Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test otel proto exporter v2 #268

Draft
wants to merge 20 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions hypertrace-ingester/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ hypertraceDocker {

dependencies {
implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21")
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.26")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.26")
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT")
implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT")
implementation("org.hypertrace.core.datamodel:data-model:0.1.18")
implementation("org.hypertrace.core.viewgenerator:view-generator-framework:0.3.1")
implementation("com.typesafe:config:1.4.1")
Expand All @@ -40,6 +40,9 @@ dependencies {
implementation(project(":raw-spans-grouper:raw-spans-grouper"))
implementation(project(":hypertrace-trace-enricher:hypertrace-trace-enricher"))
implementation(project(":hypertrace-view-generator:hypertrace-view-generator"))
implementation(project(":hypertrace-metrics-generator:hypertrace-metrics-generator"))
implementation(project(":hypertrace-metrics-processor:hypertrace-metrics-processor"))
implementation(project(":hypertrace-metrics-exporter:hypertrace-metrics-exporter"))

testImplementation("org.junit.jupiter:junit-jupiter:5.7.1")
testImplementation("org.mockito:mockito-core:3.8.0")
Expand All @@ -64,7 +67,10 @@ tasks.register<Copy>("copyServiceConfigs") {
createCopySpec("span-normalizer", "span-normalizer", "main", "common"),
createCopySpec("raw-spans-grouper", "raw-spans-grouper", "main", "common"),
createCopySpec("hypertrace-trace-enricher", "hypertrace-trace-enricher", "main", "common"),
createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "main", "common")
createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "main", "common"),
createCopySpec("hypertrace-metrics-generator", "hypertrace-metrics-generator", "main", "common"),
createCopySpec("hypertrace-metrics-processor", "hypertrace-metrics-processor", "main", "common"),
createCopySpec("hypertrace-metrics-exporter", "hypertrace-metrics-exporter", "main", "common")
).into("./build/resources/main/configs/")
}

Expand Down Expand Up @@ -101,10 +107,13 @@ tasks.test {

tasks.register<Copy>("copyServiceConfigsTest") {
with(
createCopySpec("span-normalizer", "span-normalizer", "test", "span-normalizer"),
createCopySpec("raw-spans-grouper", "raw-spans-grouper", "test", "raw-spans-grouper"),
createCopySpec("hypertrace-trace-enricher", "hypertrace-trace-enricher", "test", "hypertrace-trace-enricher"),
createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "test", "hypertrace-view-generator")
createCopySpec("span-normalizer", "span-normalizer", "test", "span-normalizer"),
createCopySpec("raw-spans-grouper", "raw-spans-grouper", "test", "raw-spans-grouper"),
createCopySpec("hypertrace-trace-enricher", "hypertrace-trace-enricher", "test", "hypertrace-trace-enricher"),
createCopySpec("hypertrace-view-generator", "hypertrace-view-generator", "test", "hypertrace-view-generator"),
createCopySpec("hypertrace-metrics-generator", "hypertrace-metrics-generator", "test", "hypertrace-metrics-generator"),
createCopySpec("hypertrace-metrics-processor", "hypertrace-metrics-processor", "test", "hypertrace-metrics-processor"),
createCopySpec("hypertrace-metrics-exporter", "hypertrace-metrics-exporter", "test", "hypertrace-metrics-exporter")
).into("./build/resources/test/configs/")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import org.hypertrace.core.serviceframework.config.ConfigUtils;
import org.hypertrace.core.spannormalizer.SpanNormalizer;
import org.hypertrace.core.viewgenerator.service.MultiViewGeneratorLauncher;
import org.hypertrace.metrics.exporter.MetricsExporterEntryService;
import org.hypertrace.metrics.generator.MetricsGenerator;
import org.hypertrace.metrics.processor.MetricsProcessor;
import org.hypertrace.traceenricher.trace.enricher.TraceEnricher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -29,9 +32,34 @@ public class HypertraceIngester extends KafkaStreamsApp {
private static final String HYPERTRACE_INGESTER_JOB_CONFIG = "hypertrace-ingester-job-config";

private Map<String, Pair<String, KafkaStreamsApp>> jobNameToSubTopology = new HashMap<>();
private MetricsExporterEntryService metricsExporter;
private Thread metricsExporterThread;

// Builds the ingester and its embedded metrics exporter. The exporter is not a
// Kafka Streams sub-topology, so it is managed separately (see doInit/doStart/doStop).
public HypertraceIngester(ConfigClient configClient) {
super(configClient);
// NOTE(review): getSubJobConfig is an instance method invoked during construction —
// assumes it only reads configClient state already initialized by super(); confirm.
metricsExporter =
new MetricsExporterEntryService(
configClient, getSubJobConfig("hypertrace-metrics-exporter"));
}

// Initializes the Kafka Streams topology first, then the metrics exporter,
// mirroring the start/stop ordering used in doStart/doStop.
@Override
protected void doInit() {
super.doInit();
metricsExporter.doInit();
}

/**
 * Starts the streams topology, then runs the metrics exporter on its own
 * thread so its (presumably blocking) doStart() cannot stall this service's
 * startup sequence.
 */
@Override
protected void doStart() {
  super.doStart();
  // Name the thread so it is identifiable in thread dumps and logs.
  metricsExporterThread = new Thread(() -> metricsExporter.doStart(), "metrics-exporter");
  metricsExporterThread.start();
}

/**
 * Stops the streams topology and the metrics exporter. The exporter thread is
 * interrupted and joined with a timeout instead of using Thread.stop(), which
 * is deprecated, unsafe (can leave shared state corrupt), and throws
 * UnsupportedOperationException on modern JVMs.
 */
@Override
protected void doStop() {
  super.doStop();
  metricsExporter.doStop();
  if (metricsExporterThread != null) {
    metricsExporterThread.interrupt();
    try {
      // Bounded wait for a cooperative shutdown; don't hang service stop forever.
      metricsExporterThread.join(10_000);
    } catch (InterruptedException e) {
      // Preserve the caller's interrupt status.
      Thread.currentThread().interrupt();
    }
  }
}

private KafkaStreamsApp getSubTopologyInstance(String name) {
Expand All @@ -49,6 +77,12 @@ private KafkaStreamsApp getSubTopologyInstance(String name) {
case "all-views":
kafkaStreamsApp = new MultiViewGeneratorLauncher(ConfigClientFactory.getClient());
break;
case "hypertrace-metrics-generator":
kafkaStreamsApp = new MetricsGenerator(ConfigClientFactory.getClient());
break;
case "hypertrace-metrics-processor":
kafkaStreamsApp = new MetricsProcessor(ConfigClientFactory.getClient());
break;
default:
throw new RuntimeException(String.format("Invalid configured sub-topology : [%s]", name));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@ main.class = org.hypertrace.ingester.HypertraceIngester
service.name = hypertrace-ingester
service.admin.port = 8099

sub.topology.names = ["span-normalizer", "raw-spans-grouper", "hypertrace-trace-enricher", "all-views"]
sub.topology.names = [
"span-normalizer",
"raw-spans-grouper",
"hypertrace-trace-enricher",
"all-views",
"hypertrace-metrics-generator",
"hypertrace-metrics-processor"
]

precreate.topics = false
precreate.topics = ${?PRE_CREATE_TOPICS}
Expand Down
29 changes: 29 additions & 0 deletions hypertrace-ingester/src/main/resources/log4j2.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
status = error
name = PropertiesConfig

appender.console.type = Console
appender.console.name = STDOUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n

appender.rolling.type = RollingFile
appender.rolling.name = ROLLING_FILE
appender.rolling.fileName = ${sys:service.name:-hypertrace-ingester}.log
appender.rolling.filePattern = ${sys:service.name:-hypertrace-ingester}-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %c{1.} - %msg%n
appender.rolling.policies.type = Policies
appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
appender.rolling.policies.time.interval = 3600
appender.rolling.policies.time.modulate = true
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 20MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 5

rootLogger.level = INFO
rootLogger.appenderRef.stdout.ref = STDOUT
rootLogger.appenderRef.rolling.ref = ROLLING_FILE



3 changes: 3 additions & 0 deletions hypertrace-metrics-exporter/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// All subprojects under hypertrace-metrics-exporter publish under this Maven group id.
subprojects {
group = "org.hypertrace.metrics.exporter"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Standard Hypertrace Java service build: docker image build/publish plus jacoco coverage.
plugins {
java
application
jacoco
id("org.hypertrace.docker-java-application-plugin")
id("org.hypertrace.docker-publish-plugin")
id("org.hypertrace.jacoco-report-plugin")
}

// Services are launched through the shared platform service launcher,
// which reads this module's application.conf to find the real main class.
application {
mainClass.set("org.hypertrace.core.serviceframework.PlatformServiceLauncher")
}

// Docker image configuration; admin/health endpoint is exposed on 8099.
hypertraceDocker {
defaultImage {
javaApplication {
serviceName.set("${project.name}")
adminPort.set(8099)
}
}
}

tasks.test {
useJUnitPlatform()
}

dependencies {
  // common and framework
  implementation(project(":hypertrace-view-generator:hypertrace-view-generator-api"))
  // NOTE(review): SNAPSHOT versions must be pinned to released versions before merge.
  implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.30-SNAPSHOT")
  implementation("org.hypertrace.core.serviceframework:platform-metrics:0.1.30-SNAPSHOT")
  implementation("org.hypertrace.core.kafkastreams.framework:kafka-streams-framework:0.1.21")

  // open telemetry
  implementation("io.opentelemetry:opentelemetry-api:1.7.0-SNAPSHOT")
  implementation("io.opentelemetry:opentelemetry-api-metrics:1.7.0-alpha-SNAPSHOT")
  implementation("io.opentelemetry:opentelemetry-sdk:1.7.0-SNAPSHOT")
  implementation("io.opentelemetry:opentelemetry-exporter-otlp-common:1.7.0-SNAPSHOT")
  // fixed version typo: "1.7.0-alpah-SNAPSHOT" -> "1.7.0-alpha-SNAPSHOT"
  // (the misspelled coordinate cannot resolve in any repository)
  implementation("io.opentelemetry:opentelemetry-sdk-metrics:1.7.0-alpha-SNAPSHOT")
  implementation("io.opentelemetry:opentelemetry-exporter-otlp-metrics:1.7.0-alpha-SNAPSHOT")
  implementation("io.opentelemetry:opentelemetry-exporter-prometheus:1.7.0-alpha-SNAPSHOT")

  // open telemetry proto
  implementation("io.opentelemetry:opentelemetry-proto:1.6.0-alpha")

  // jetty server
  implementation("org.eclipse.jetty:jetty-server:9.4.42.v20210604")
  implementation("org.eclipse.jetty:jetty-servlet:9.4.42.v20210604")

  // prometheus metrics servlet
  implementation("io.prometheus:simpleclient_servlet:0.12.0")

  // kafka
  implementation("org.apache.kafka:kafka-clients:2.6.0")

  // test
  testImplementation("org.junit.jupiter:junit-jupiter:5.7.1")
  testImplementation("org.mockito:mockito-core:3.8.0")
  testImplementation("com.google.code.gson:gson:2.8.7")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package org.hypertrace.metrics.exporter;

import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricProducer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InMemoryMetricsProducer implements MetricProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryMetricsProducer.class);
private BlockingQueue<MetricData> metricDataQueue;
private final AtomicBoolean commitOffset = new AtomicBoolean(false);

public InMemoryMetricsProducer(int maxQueueSize) {
this.metricDataQueue = new ArrayBlockingQueue<MetricData>(maxQueueSize);
}

public void addMetricData(List<ResourceMetrics> resourceMetrics) {
try {
for (ResourceMetrics rm : resourceMetrics) {
List<MetricData> metricData = OtlpToObjectConverter.toMetricData(rm);
for (MetricData md : metricData) {
this.metricDataQueue.put(md);
}
}
} catch (InterruptedException exception) {
LOGGER.info("This thread was intruppted, so we might loose copying some metrics ");
}
}

public Collection<MetricData> collectAllMetrics() {
List<MetricData> metricDataList = new ArrayList<>();
int max = 0;
while (max < 100 && this.metricDataQueue.peek() != null) {
metricDataList.add(this.metricDataQueue.poll());
max++;
}
return metricDataList;
}

public void setCommitOffset() {
commitOffset.set(true);
}

public void clearCommitOffset() {
commitOffset.set(false);
}

public boolean isCommitOffset() {
return commitOffset.get();
}
}
Loading