Hybrid OpenCensusProtobufInputFormat in opencensus-extensions (#69)
* Support OpenTelemetry payloads in OpenCensusProtobufInputFormat
Support reading mixed OpenTelemetry and OpenCensus topics based on the Kafka version header.
 
* Work around classloader isolation
Work around classloader isolation by using method handles to access
KafkaRecordEntity-related methods and check record headers.

Co-authored-by: Xavier Léauté <[email protected]>
2 people authored and harinirajendran committed Mar 1, 2022
1 parent 4b93e7c commit 788b94d
Showing 9 changed files with 613 additions and 8 deletions.
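For context, the new format dispatch keys off a Kafka record header named "v" carrying a little-endian 32-bit integer: a value of 1 selects the OpenTelemetry reader, and a missing header or any other value falls back to the OpenCensus reader. A minimal producer-side sketch of how a record might be tagged follows; the class name, topic, and payload are illustrative, while the header key, version value, and byte order mirror the constants added in this commit.

import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Illustrative sketch only: tag a record as an OpenTelemetry payload so the hybrid
// OpenCensusProtobufInputFormat selects OpenTelemetryMetricsProtobufReader.
public class OtelRecordTagger
{
  public static ProducerRecord<byte[], byte[]> tag(String topic, byte[] otlpPayload)
  {
    // "v" mirrors VERSION_HEADER_KEY; 1 mirrors OPENTELEMETRY_FORMAT_VERSION (little-endian)
    byte[] version = ByteBuffer.allocate(Integer.BYTES)
        .order(ByteOrder.LITTLE_ENDIAN)
        .putInt(1)
        .array();
    ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, otlpPayload);
    record.headers().add("v", version);
    return record;
  }
}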
26 changes: 26 additions & 0 deletions extensions-contrib/opencensus-extensions/pom.xml
@@ -55,6 +55,15 @@
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry.proto</groupId>
<artifactId>opentelemetry-proto</artifactId>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions.contrib</groupId>
<artifactId>druid-opentelemetry-extensions</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
@@ -74,6 +83,11 @@
<artifactId>guice</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
@@ -85,6 +99,18 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-kafka-indexing-service</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${apache.kafka.version}</version>
<scope>test</scope>
</dependency>
<!-- jmh -->
<dependency>
<groupId>org.openjdk.jmh</groupId>
KafkaUtils.java (new file)
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.Objects;

public class KafkaUtils
{
/**
* Creates a MethodHandle that, when invoked on a KafkaRecordEntity, returns the value of the
* given header from the underlying Kafka record.
*
* The method handle is roughly equivalent to the following function
*
* (KafkaRecordEntity input) -> {
* Header h = input.getRecord().headers().lastHeader(header);
* if (h != null) {
* return h.value();
* } else {
* return null;
* }
* }
*
* Since KafkaRecordEntity only exists in the kafka-indexing-service plugin classloader,
* we need to look up the relevant classes in the classloader where the InputEntity was instantiated.
*
* The handle returned by this method should be cached for the classloader it was invoked with.
*
* If the lookup fails for any reason, the returned method handle will always return null.
*
* @param classLoader the kafka-indexing-service classloader
* @param header the name of the header to look up
* @return a MethodHandle
*/
public static MethodHandle lookupGetHeaderMethod(ClassLoader classLoader, String header)
{
try {
Class<?> entityType = Class.forName("org.apache.druid.data.input.kafka.KafkaRecordEntity", true, classLoader);
Class<?> recordType = Class.forName("org.apache.kafka.clients.consumer.ConsumerRecord", true, classLoader);
Class<?> headersType = Class.forName("org.apache.kafka.common.header.Headers", true, classLoader);
Class<?> headerType = Class.forName("org.apache.kafka.common.header.Header", true, classLoader);

final MethodHandles.Lookup lookup = MethodHandles.lookup();
MethodHandle nonNullTest = lookup.findStatic(Objects.class, "nonNull",
MethodType.methodType(boolean.class, Object.class)
).asType(MethodType.methodType(boolean.class, headerType));

final MethodHandle getRecordMethod = lookup.findVirtual(
entityType,
"getRecord",
MethodType.methodType(recordType)
);
final MethodHandle headersMethod = lookup.findVirtual(recordType, "headers", MethodType.methodType(headersType));
final MethodHandle lastHeaderMethod = lookup.findVirtual(
headersType,
"lastHeader",
MethodType.methodType(headerType, String.class)
);
final MethodHandle valueMethod = lookup.findVirtual(headerType, "value", MethodType.methodType(byte[].class));

return MethodHandles.filterReturnValue(
MethodHandles.filterReturnValue(
MethodHandles.filterReturnValue(getRecordMethod, headersMethod),
MethodHandles.insertArguments(lastHeaderMethod, 1, header)
),
// return null byte array if header is not present
MethodHandles.guardWithTest(
nonNullTest,
valueMethod,
// match valueMethod signature by dropping the header instance argument
MethodHandles.dropArguments(MethodHandles.constant(byte[].class, null), 0, headerType)
)
);
}
catch (ReflectiveOperationException e) {
// If the lookup fails in the classloader where the InputEntity is defined, the source may not
// come from the kafka-indexing-service classloader, or the method signatures did not match.
// In that case, return a method handle that always returns null.
return noopMethodHandle();
}
}

static MethodHandle noopMethodHandle()
{
return MethodHandles.dropArguments(MethodHandles.constant(byte[].class, null), 0, InputEntity.class);
}
}
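A minimal usage sketch follows (class, method, and variable names are illustrative); it mirrors how the input format below consumes the handle. A null result means the header was absent or the lookup fell back to the no-op handle, while invoking a successfully resolved handle on a non-Kafka entity throws ClassCastException, as KafkaUtilsTest exercises.

import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.KafkaUtils;

import java.lang.invoke.MethodHandle;

// Illustrative only: resolve the handle against the classloader that loaded the concrete
// InputEntity implementation, then invoke it on that entity.
public class HeaderLookupSketch
{
  public static byte[] readHeader(InputEntity entity, String headerName) throws Throwable
  {
    MethodHandle getHeader = KafkaUtils.lookupGetHeaderMethod(entity.getClass().getClassLoader(), headerName);
    return (byte[]) getHeader.invoke(entity); // null if the header is absent or the lookup failed
  }
}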
OpenCensusProtobufInputFormat.java
@@ -24,28 +24,42 @@
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.KafkaUtils;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.opentelemetry.protobuf.OpenTelemetryMetricsProtobufReader;
import org.apache.druid.java.util.common.StringUtils;

import javax.annotation.Nullable;
import java.io.File;
import java.lang.invoke.MethodHandle;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Objects;

public class OpenCensusProtobufInputFormat implements InputFormat
{
private static final String DEFAULT_METRIC_DIMENSION = "name";
private static final String DEFAULT_RESOURCE_PREFIX = "resource.";
private static final String DEFAULT_VALUE_DIMENSION = "value";
private static final String VERSION_HEADER_KEY = "v";
private static final int OPENTELEMETRY_FORMAT_VERSION = 1;

private final String metricDimension;
private final String valueDimension;
private final String metricLabelPrefix;
private final String resourceLabelPrefix;

private volatile MethodHandle getHeaderMethod = null;

public OpenCensusProtobufInputFormat(
@JsonProperty("metricDimension") String metricDimension,
@JsonProperty("valueDimension") @Nullable String valueDimension,
@JsonProperty("metricLabelPrefix") String metricLabelPrefix,
@JsonProperty("resourceLabelPrefix") String resourceLabelPrefix
)
{
this.metricDimension = metricDimension != null ? metricDimension : DEFAULT_METRIC_DIMENSION;
this.valueDimension = valueDimension != null ? valueDimension : DEFAULT_VALUE_DIMENSION;
this.metricLabelPrefix = StringUtils.nullToEmptyNonDruidDataString(metricLabelPrefix);
this.resourceLabelPrefix = resourceLabelPrefix != null ? resourceLabelPrefix : DEFAULT_RESOURCE_PREFIX;
}
@@ -59,6 +73,37 @@ public boolean isSplittable()
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
// Assume the InputEntity is always defined in a single classloader (the kafka-indexing-service classloader),
// so we only have to look the method up once. To be completely correct we should cache one handle per classloader.
if (getHeaderMethod == null) {
getHeaderMethod = KafkaUtils.lookupGetHeaderMethod(
source.getClass().getClassLoader(),
OpenCensusProtobufInputFormat.VERSION_HEADER_KEY
);
}

try {
byte[] versionHeader = (byte[]) getHeaderMethod.invoke(source);
if (versionHeader != null) {
int version =
ByteBuffer.wrap(versionHeader).order(ByteOrder.LITTLE_ENDIAN).getInt();
if (version == OPENTELEMETRY_FORMAT_VERSION) {
return new OpenTelemetryMetricsProtobufReader(
inputRowSchema.getDimensionsSpec(),
(ByteEntity) source,
metricDimension,
valueDimension,
metricLabelPrefix,
resourceLabelPrefix
);
}
}
}
catch (Throwable t) {
// assume the input is OpenCensus if anything went wrong reading the version header
}


return new OpenCensusProtobufReader(
inputRowSchema.getDimensionsSpec(),
(ByteEntity) source,
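The comment in createReader above notes that caching a single handle is only correct if every InputEntity comes from one classloader; a stricter variant would cache one handle per classloader. A sketch of that alternative follows; the class and field names are hypothetical.

import org.apache.druid.data.input.KafkaUtils;

import java.lang.invoke.MethodHandle;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical variant of the caching discussed above: one handle per classloader, so
// entities loaded by different extension classloaders each resolve KafkaRecordEntity
// in their own loader.
public class PerClassLoaderHeaderLookup
{
  private static final String VERSION_HEADER_KEY = "v"; // mirrors OpenCensusProtobufInputFormat

  private final Map<ClassLoader, MethodHandle> handles = new ConcurrentHashMap<>();

  public MethodHandle handleFor(ClassLoader classLoader)
  {
    // computeIfAbsent performs the lookup once per classloader and reuses the handle afterwards
    return handles.computeIfAbsent(classLoader, cl -> KafkaUtils.lookupGetHeaderMethod(cl, VERSION_HEADER_KEY));
  }
}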
KafkaUtilsTest.java (new file)
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input;


import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.junit.Assert;
import org.junit.Test;

import java.lang.invoke.MethodHandle;
import java.nio.ByteBuffer;

public class KafkaUtilsTest
{

private static final byte[] BYTES = ByteBuffer.allocate(Integer.BYTES).putInt(42).array();

@Test
public void testNoopMethodHandle() throws Throwable
{
Assert.assertNull(
KafkaUtils.noopMethodHandle().invoke(new ByteEntity(new byte[]{}))
);
}

@Test
public void testKafkaRecordEntity() throws Throwable
{
final MethodHandle handle = KafkaUtils.lookupGetHeaderMethod(KafkaUtilsTest.class.getClassLoader(), "version");
KafkaRecordEntity input = new KafkaRecordEntity(
new ConsumerRecord<>(
"test",
0,
0,
0,
TimestampType.CREATE_TIME,
-1L,
-1,
-1,
null,
new byte[]{},
new RecordHeaders(ImmutableList.of(new Header()
{
@Override
public String key()
{
return "version";
}

@Override
public byte[] value()
{
return BYTES;
}
}))
)
);
Assert.assertArrayEquals(BYTES, (byte[]) handle.invoke(input));
}

@Test(expected = ClassCastException.class)
public void testNonKafkaEntity() throws Throwable
{
final MethodHandle handle = KafkaUtils.lookupGetHeaderMethod(KafkaUtilsTest.class.getClassLoader(), "version");
handle.invoke(new ByteEntity(new byte[]{}));
}
}
OpenCensusInputFormatTest.java
@@ -29,7 +29,7 @@ public class OpenCensusInputFormatTest
@Test
public void testSerde() throws Exception
{
OpenCensusProtobufInputFormat inputFormat = new OpenCensusProtobufInputFormat("metric.name", "descriptor.", "custom.");
OpenCensusProtobufInputFormat inputFormat = new OpenCensusProtobufInputFormat("metric.name", null, "descriptor.", "custom.");

final ObjectMapper jsonMapper = new ObjectMapper();
jsonMapper.registerModules(new OpenCensusProtobufExtensionsModule().getJacksonModules());
@@ -47,7 +47,7 @@ public void testSerde() throws Exception
@Test
public void testDefaults()
{
OpenCensusProtobufInputFormat inputFormat = new OpenCensusProtobufInputFormat(null, null, null);
OpenCensusProtobufInputFormat inputFormat = new OpenCensusProtobufInputFormat(null, null, null, null);

Assert.assertEquals("name", inputFormat.getMetricDimension());
Assert.assertEquals("", inputFormat.getMetricLabelPrefix());