Skip to content

Commit

Permalink
Fix otel log format issue opensearch-project#3123
Browse files Browse the repository at this point in the history
Covert KVList into a regular json string format.

Signed-off-by: Gong Yi <[email protected]>
  • Loading branch information
topikachu authored and gongy committed Aug 9, 2023
1 parent 64875ab commit dd8dc3d
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.opentelemetry.proto.trace.v1.Status;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.dataprepper.model.log.JacksonOtelLog;
import org.opensearch.dataprepper.model.log.OpenTelemetryLog;
import org.opensearch.dataprepper.model.metric.Bucket;
Expand Down Expand Up @@ -643,17 +644,9 @@ public static Object convertAnyValue(final AnyValue value) {
* as Json string.
*/
case ARRAY_VALUE:
try {
return OBJECT_MAPPER.writeValueAsString(value.getArrayValue().getValuesList().stream()
.map(OTelProtoCodec::convertAnyValue)
.collect(Collectors.toList()));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
case KVLIST_VALUE:
try {
return OBJECT_MAPPER.writeValueAsString(value.getKvlistValue().getValuesList().stream()
.collect(Collectors.toMap(i -> REPLACE_DOT_WITH_AT.apply(i.getKey()), i -> convertAnyValue(i.getValue()))));
return OBJECT_MAPPER.writeValueAsString(asPlanObject(value));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
Expand All @@ -662,6 +655,33 @@ public static Object convertAnyValue(final AnyValue value) {
}
}

private static Object asPlanObject(AnyValue anyValue){
switch (anyValue.getValueCase()) {
case VALUE_NOT_SET:
case STRING_VALUE:
return anyValue.getStringValue();
case BOOL_VALUE:
return anyValue.getBoolValue();
case INT_VALUE:
return anyValue.getIntValue();
case DOUBLE_VALUE:
return anyValue.getDoubleValue();
case ARRAY_VALUE:
return anyValue.getArrayValue().getValuesList()
.stream()
.map(OTelProtoCodec::asPlanObject)
.collect(Collectors.toList());
case KVLIST_VALUE:
return anyValue.getKvlistValue().getValuesList()
.stream()
.map(
keyValue -> Pair.of(keyValue.getKey(), asPlanObject(keyValue.getValue()))
)
.collect(Collectors.toMap(i -> REPLACE_DOT_WITH_AT.apply(i.getKey()), Pair::getValue));
}
return null;
}

/**
* Converts the keys of all attributes in the {@link NumberDataPoint}.
* Also, casts the underlying data into its actual type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class OTelProtoCodecTest {

private static final String TEST_REQUEST_LOGS_IS_JSON_FILE = "test-request-log-is.json";

private static final String TEST_REQUEST_COMPLEX_LOGS_JSON_FILE = "test-request-complex-log.json";

private static final Long TIME = TimeUnit.MILLISECONDS.toNanos(ZonedDateTime.of(
LocalDateTime.of(2020, 5, 24, 14, 1, 0),
Expand Down Expand Up @@ -427,6 +428,15 @@ public void testParseExportLogsServiceRequest_ScopedLogs() throws IOException {
validateLog(logs.get(0));
}

@Test
public void testParseExportComplexLogsServiceRequest_ScopedLogs() throws IOException {
final ExportLogsServiceRequest exportLogsServiceRequest = buildExportLogsServiceRequestFromJsonFile(TEST_REQUEST_COMPLEX_LOGS_JSON_FILE);
List<OpenTelemetryLog> logs = decoderUnderTest.parseExportLogsServiceRequest(exportLogsServiceRequest);

assertThat(logs.size() , is(equalTo(1)));
assertThat(logs.get(0).getBody(), is("{\"key1\":\"value1\",\"key2\":\"value2\",\"key3\":{\"nestedKey1\":\"nestedValue1\"}}"));
}

@Test
public void testParseExportLogsServiceRequest_InstrumentationLibraryLogs() throws IOException {
final ExportLogsServiceRequest exportLogsServiceRequest = buildExportLogsServiceRequestFromJsonFile(TEST_REQUEST_LOGS_IS_JSON_FILE);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
{
"resourceLogs": [{
"resource": {
"attributes": [{
"key": "service.name",
"value": {
"stringValue": "service"
}
}]
},
"scopeLogs": [{
"logRecords": [{
"timeUnixNano": "1590328800000000000",
"severityNumber": "SEVERITY_NUMBER_DEBUG",
"body": {
"kvlistValue": {
"values": [{
"key": "key1",
"value": {
"stringValue": "value1"
}
}, {
"key": "key2",
"value": {
"stringValue": "value2"
}
}, {
"key": "key3",
"value": {
"kvlistValue": {
"values": [{
"key": "nestedKey1",
"value": {
"stringValue": "nestedValue1"
}
}]
}
}
}]
}
},
"attributes": [{
"key": "statement.params",
"value": {
"stringValue": "us-east-1"
}
}],
"droppedAttributesCount": 3,
"flags": 1,
"traceId": "uhocI7QJO2M=",
"spanId": "LMg6yQ68Rpw=",
"observedTimeUnixNano": "1590328802000000000"
}]
}],
"schemaUrl": "schemaurl"
}]
}

0 comments on commit dd8dc3d

Please sign in to comment.