Skip to content

Commit

Permalink
First Commit
Browse files Browse the repository at this point in the history
  • Loading branch information
SamHjelmfelt committed Nov 27, 2019
0 parents commit dde001b
Show file tree
Hide file tree
Showing 10 changed files with 749 additions and 0 deletions.
41 changes: 41 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# NiFi Prometheus Metrics
NiFi Processor that performs Prometheus service discovery and metric scraping

## Usage
All Prometheus service discovery and scrape configurations should be supported.
Prometheus configurations: https://prometheus.io/docs/prometheus/latest/configuration/configuration/
See also: https://prometheus.io/docs/instrumenting/exporters/

Metrics are output in JSON format with one metric per flowfile. For example:
{"__name__"="go_gc_duration_seconds", "instance"="localhost:9100", "job"="prometheus", "quantile"="0", "ts"=1563377239705, "value"=0.000000}

Optionally based on configuration, the processor can extract the namespace into its own field:
For example: __name__=myNamespace_my_metric_name -> __namespace__=myNamespace, __name__=my_metric_name


Note that this processor uses a native library, so it will need to be built for each target architecture. The release zip contains native libraries for both centos and mac.


## How to Build
```
#Prerequisites (in PATH): git, go, maven, gcc
#Environment Variables: JAVA_HOME
mvn clean install
```

## How to Install (HDF)
```
cp NiFi-Prometheus-Metrics/nifi-prometheus-metrics-nar/target/nifi-prometheus-metrics-nar-1.0-SNAPSHOT.nar /usr/hdf/current/nifi/lib/
#If mac
#cp NiFi-Prometheus-Metrics/nifi-prometheus-metrics-processors/target/libprommetricsapi.jnilib /usr/hdf/current/nifi/lib/
#Else if Linux
cp NiFi-Prometheus-Metrics/nifi-prometheus-metrics-processors/target/libprommetricsapi.so /usr/hdf/current/nifi/lib/
#Add to nifi-env.sh (in Ambari, Advanced nifi-env -> Template for nifi-env.sh)
#export LD_LIBRARY_PATH=/usr/hdf/current/nifi/lib/
```

## How It Works
CGO is used to compile the relevant Prometheus classes and a small wrapper into a shared object (.so) file that can be loaded into the JVM using JNI. The Java code instantiates a blocking queue and passes it as a parameter into the native libary along with the Prometheus configuration. The native library starts the service discovery and metric scraping and puts the metrics that it scrapes into this queue for the Java code to consume. The Java code converts the metrics to JSON and outputs them as NiFi FlowFiles.
41 changes: 41 additions & 0 deletions nifi-prometheus-metrics-nar/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>prometheus-metrics</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>

<artifactId>nifi-prometheus-metrics-nar</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-prometheus-metrics-processors</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>

</project>
83 changes: 83 additions & 0 deletions nifi-prometheus-metrics-processors/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>prometheus-metrics</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>

<artifactId>nifi-prometheus-metrics-processors</artifactId>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.9.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>exec-maven-plugin</artifactId>
<groupId>org.codehaus.mojo</groupId>
<executions>
<execution>
<id>Build Go binary</id>
<phase>generate-sources</phase>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<executable>${basedir}/go-build.sh</executable>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<forkMode>once</forkMode>
<argLine>-Djava.library.path=${project.build.directory}</argLine>
</configuration>
</plugin>
</plugins>
</build>
</project>
73 changes: 73 additions & 0 deletions nifi-prometheus-metrics-processors/src/main/go/cgo_exports.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package main
/*
#cgo CFLAGS: -I/Library/Java/JavaVirtualMachines/jdk1.8.0_71.jdk/Contents/Home/include
#cgo CFLAGS: -I/Library/Java/JavaVirtualMachines/jdk1.8.0_71.jdk/Contents/Home/include/darwin
#include <jni.h>
*/
import "C"
import (
"github.com/go-kit/kit/log"
"sync"
)

type channelWriter struct{
channel chan string
}
func (w channelWriter) Write(p []byte) (n int, err error) {
w.channel <- string(p[:])
return len(p), nil
}

//export Java_org_apache_nifi_processors_prometheus_GetPrometheusMetrics_Run
func Java_org_apache_nifi_processors_prometheus_GetPrometheusMetrics_Run(env *C.JNIEnv, callingObject C.jclass, configJStr C.jstring, ID C.jstring, metricQueue C.jobject, logQueue C.jobject) C.int{
nStr := getNativeStringGo(env, configJStr)
config := C.GoString(nStr)

metricChannel := make(chan string, 1000)
logChannel := make(chan string, 1000)
jvm := getJVMGo(env)

logger := log.NewLogfmtLogger(log.NewSyncWriter(channelWriter{logChannel}))

cancelFunc := GetMetricsWithCancel(config, metricChannel, logger)

register(C.GoString(getNativeStringGo(env, ID)), cancelFunc)

addToQueue(jvm, metricQueue, metricChannel, logQueue, logChannel)
return 0
}
//export Java_org_apache_nifi_processors_prometheus_GetPrometheusMetrics_Stop
func Java_org_apache_nifi_processors_prometheus_GetPrometheusMetrics_Stop(env *C.JNIEnv, callingObject C.jclass, ID C.jstring) C.jboolean {

cancelFunc, ok := lookup(C.GoString(getNativeStringGo(env, ID)))
if ok {
cancelFunc()
unregister(C.GoString(getNativeStringGo(env, ID)))
}
return C.JNI_TRUE
}

var mu sync.Mutex
var fns = make(map[string]func())

func register(ID string, fn func()) {
mu.Lock()
defer mu.Unlock()
fns[ID] = fn
}

func lookup(ID string) (func(),bool) {
mu.Lock()
defer mu.Unlock()
val, ok := fns[ID]
return val, ok
}

func unregister(ID string) {
mu.Lock()
defer mu.Unlock()
delete(fns, ID)
}

func main() {}
67 changes: 67 additions & 0 deletions nifi-prometheus-metrics-processors/src/main/go/jni.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package main
/*
#cgo CFLAGS: -I/Library/Java/JavaVirtualMachines/jdk1.8.0_71.jdk/Contents/Home/include
#cgo CFLAGS: -I/Library/Java/JavaVirtualMachines/jdk1.8.0_71.jdk/Contents/Home/include/darwin
#include <jni.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
JavaVM* getJVM(JNIEnv* env){
JavaVM* jvm;
(*env)->GetJavaVM(env, &jvm);
return jvm;
}
char* getNativeString(JNIEnv* env, jstring str) {
return (char*)((*env)->GetStringUTFChars(env, str, 0));
}
void addToQueue(JavaVM* jvm, jobject queue, char* bytes) {
JNIEnv* env;
//go does not have thread affinity, so re-attach everytime
(*jvm)->AttachCurrentThread(jvm, (void**)&env, NULL);
jclass queueClass = (*env)->GetObjectClass(env, queue);
jmethodID putMethod = (*env)->GetMethodID(env, queueClass, "put", "(Ljava/lang/Object;)V");
(*env)->DeleteLocalRef(env, queueClass);
jstring str = (*env)->NewStringUTF(env, bytes);
(*env)->CallVoidMethod(env, queue, putMethod, str);
(*env)->DeleteLocalRef(env, str);
}
*/
import "C"
import "time"

func getNativeStringGo(env *C.JNIEnv, str C.jstring) *C.char{
return C.getNativeString(env,str);
}
func getJVMGo(sourceEnv *C.JNIEnv) *C.JavaVM{
jvm := C.getJVM(sourceEnv);
return jvm
}
func addToQueue(jvm *C.JavaVM, metricQueue C.jobject, metricChannel chan string, logQueue C.jobject, logChannel chan string) {

for {
//Check for logs before pulling metrics
select {
case log := <- logChannel:
if log != "" {
C.addToQueue(jvm, logQueue, C.CString(log))
}
default:
//No log messages
}

hasMetrics := true
for hasMetrics == true { //Loop while metrics are available
select {
case metric := <- metricChannel:
if metric != "" {
C.addToQueue(jvm, metricQueue, C.CString(metric))
}
default:
//No metrics
hasMetrics = false
}
}
time.Sleep(100 * time.Millisecond)
}
}
Loading

0 comments on commit dde001b

Please sign in to comment.