Skip to content

Commit

Permalink
Add support for epoch timestamps and configurable output format
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Dec 19, 2023
1 parent f04ef41 commit be5820a
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ default void parse(
Consumer<Record<Event>> eventConsumer) throws IOException {
Objects.requireNonNull(inputFile);
Objects.requireNonNull(eventConsumer);
System.out.println("======InputFile==="+inputFile);
try (InputStream inputStream = inputFile.newStream()) {
parse(decompressionEngine.createInputStream(inputStream), eventConsumer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public List<String> getPatterns() {
return patterns;
}

@JsonIgnore
public boolean isValidPatterns() {
for (final String pattern: patterns) {
if (!isValidPattern(pattern)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ class DateProcessorTests {
private Map<String, Object> testData;
private LocalDateTime expectedDateTime;
private Instant expectedInstant;
private Random random;

private static final String TIMESTAMP_KEY = "@timestamp";
private final String messageInput = UUID.randomUUID().toString();
Expand All @@ -84,7 +83,6 @@ class DateProcessorTests {

@BeforeEach
void setup() {
random = new Random();
final DateProcessorConfig dateProcessorConfig = new DateProcessorConfig();
lenient().when(mockDateProcessorConfig.getDestination()).thenReturn(dateProcessorConfig.getDestination());
lenient().when(pluginMetrics.counter(DateProcessor.DATE_PROCESSING_MATCH_SUCCESS)).thenReturn(dateProcessingMatchSuccessCounter);
Expand Down Expand Up @@ -277,28 +275,42 @@ void match_with_missing_hours_minutes_seconds_adds_zeros_test() {
}

private static Stream<Arguments> getInputOutputFormats() {
return Stream.of(
Arguments.of("epoch_second", "epoch_milli"),
Arguments.of("epoch_second", "epoch_nano"),
Arguments.of("epoch_second", "yyyy-MMM-dd HH:mm:ss.SSS"),
Arguments.of("epoch_second", DateProcessorConfig.DEFAULT_OUTPUT_FORMAT),
Arguments.of("epoch_milli", "epoch_second"),
Arguments.of("epoch_milli", "epoch_nano"),
Arguments.of("epoch_milli", "yyyy-MMM-dd HH:mm:ss.SSS"),
Arguments.of("epoch_milli", DateProcessorConfig.DEFAULT_OUTPUT_FORMAT),
Arguments.of("epoch_nano", "epoch_second"),
Arguments.of("epoch_nano", "epoch_milli"),
Arguments.of("epoch_nano", "yyyy-MM-dd'T'HH:mm:ss.nnnnnnnnnXXX"),
Arguments.of("epoch_nano", DateProcessorConfig.DEFAULT_OUTPUT_FORMAT),
Arguments.of("yyyy-MMM-dd HH:mm:ss.SSS", "epoch_second"),
Arguments.of("yyyy-MMM-dd HH:mm:ss.SSS", "epoch_milli"),
Arguments.of("yyyy-MMM-dd HH:mm:ss.nnnnnnnnnXXX", "epoch_nano")
);
Instant now = Instant.now();
long epochSeconds = now.getEpochSecond();
Random random = new Random();
long millis = random.nextInt(1000);
long nanos = random.nextInt(1000_000_000);
long epochMillis = epochSeconds * 1000L + millis;
long epochNanos = epochSeconds * 1000_000_000L + nanos;

ZonedDateTime zdtSeconds = ZonedDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds), java.time.ZoneId.of("UTC"));
ZonedDateTime zdtMillis = ZonedDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), java.time.ZoneId.of("UTC"));
ZonedDateTime zdtNanos = ZonedDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds, nanos), java.time.ZoneId.of("UTC"));
String testFormat = "yyyy-MMM-dd HH:mm:ss.SSS";
String testNanosFormat = "yyyy-MMM-dd HH:mm:ss.nnnnnnnnnXXX";
String defaultFormat = DateProcessorConfig.DEFAULT_OUTPUT_FORMAT;
return Stream.of(
Arguments.of("epoch_second", epochSeconds, "epoch_milli", epochSeconds * 1000L),
Arguments.of("epoch_second", epochSeconds, "epoch_nano", epochSeconds * 1000_000_000L),
Arguments.of("epoch_second", epochSeconds, testFormat, zdtSeconds.format(DateTimeFormatter.ofPattern(testFormat))),
Arguments.of("epoch_second", epochSeconds, defaultFormat, zdtSeconds.format(DateTimeFormatter.ofPattern(defaultFormat))),
Arguments.of("epoch_milli", epochMillis, "epoch_second", epochSeconds),
Arguments.of("epoch_milli", epochMillis, "epoch_nano", epochMillis * 1000_000),
Arguments.of("epoch_milli", epochMillis, testFormat, zdtMillis.format(DateTimeFormatter.ofPattern(testFormat))),
Arguments.of("epoch_milli", epochMillis, defaultFormat, zdtMillis.format(DateTimeFormatter.ofPattern(defaultFormat))),
Arguments.of("epoch_nano", epochNanos, "epoch_second", epochSeconds),
Arguments.of("epoch_nano", epochNanos, "epoch_milli", epochNanos/1000_000),
Arguments.of("epoch_nano", epochNanos, testNanosFormat, zdtNanos.format(DateTimeFormatter.ofPattern(testNanosFormat))),
Arguments.of("epoch_nano", epochNanos, defaultFormat, zdtNanos.format(DateTimeFormatter.ofPattern(defaultFormat))),
Arguments.of(testNanosFormat, zdtNanos.format(DateTimeFormatter.ofPattern(testNanosFormat)), "epoch_second", epochSeconds),
Arguments.of(testNanosFormat, zdtNanos.format(DateTimeFormatter.ofPattern(testNanosFormat)), "epoch_milli", epochNanos/1000_000),
Arguments.of(testNanosFormat, zdtNanos.format(DateTimeFormatter.ofPattern(testNanosFormat)), "epoch_nano", epochNanos),
Arguments.of(testNanosFormat, zdtNanos.format(DateTimeFormatter.ofPattern(testNanosFormat)), defaultFormat, zdtNanos.format(DateTimeFormatter.ofPattern(defaultFormat)))
);
}

@ParameterizedTest
@MethodSource("getInputOutputFormats")
void match_with_different_input_output_formats(String inputFormat, String outputFormat) {
void match_with_different_input_output_formats(String inputFormat, Object input, String outputFormat, Object expectedOutput) {
when(mockDateMatch.getKey()).thenReturn("logDate");
when(mockDateMatch.getPatterns()).thenReturn(Collections.singletonList(inputFormat));
when(mockDateProcessorConfig.getOutputFormat()).thenReturn(outputFormat);
Expand All @@ -312,62 +324,19 @@ void match_with_different_input_output_formats(String inputFormat, String output
when(mockDateProcessorConfig.getSourceLocale()).thenReturn(Locale.ROOT);
}
dateProcessor = createObjectUnderTest();

LocalDate localDate = LocalDate.now(ZoneId.of("UTC"));
testData = getTestData();
Instant now = Instant.now();
long epochSeconds = now.getEpochSecond();
long epochMillis = 0L;
long epochNanos = 0L;
ZonedDateTime zonedDateTime;
if (inputFormat.equals("epoch_second")) {
testData.put("logDate", epochSeconds);
epochMillis = epochSeconds * 1000;
epochNanos = epochSeconds * 1000_000_000;
zonedDateTime = ZonedDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds), java.time.ZoneId.of("UTC"));
} else if (inputFormat.equals("epoch_milli")) {
long millis = random.nextInt(1000);
epochMillis = ((long)epochSeconds)*1000L + millis;
epochNanos = epochMillis * 1000_000;
zonedDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), java.time.ZoneId.of("UTC"));
testData.put("logDate", epochMillis);
} else if (inputFormat.equals("epoch_nano")) {
long nanos = random.nextInt(1000_000_000);
epochNanos = ((long)epochSeconds)*1000_000_000L + nanos;
epochMillis = epochNanos / 1000_000;
zonedDateTime = ZonedDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds, nanos), java.time.ZoneId.of("UTC"));
testData.put("logDate", epochNanos);
} else {
when(mockDateProcessorConfig.getSourceZoneId()).thenReturn(ZoneId.of("UTC"));
long nanos = random.nextInt(1000_000_000);
epochNanos = ((long)epochSeconds)*1000_000_000L + nanos;
epochMillis = epochNanos / 1000_000;
zonedDateTime = ZonedDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds, nanos), java.time.ZoneId.of("UTC"));
testData.put("logDate", zonedDateTime.format(DateTimeFormatter.ofPattern(inputFormat)));
}
testData.put("logDate", input);
final Record<Event> record = buildRecordWithEvent(testData);
final List<Record<Event>> processedRecords = (List<Record<Event>>) dateProcessor.doExecute(Collections.singletonList(record));
String actualOutput, expectedOutput;
if (outputFormat.equals("epoch_second") ||
outputFormat.equals("epoch_milli") ||
outputFormat.equals("epoch_nano")) {
Long tsval = processedRecords.get(0).getData().get(TIMESTAMP_KEY, Long.class);
actualOutput = Long.toString(tsval);
} else {
actualOutput= processedRecords.get(0).getData().get(TIMESTAMP_KEY, String.class);
}

ZonedDateTime expectedZonedDateTime;
if (outputFormat.equals("epoch_second")) {
expectedOutput = Long.toString(epochSeconds);
} else if (outputFormat.equals("epoch_milli")) {
expectedOutput = Long.toString(epochMillis);
} else if (outputFormat.equals("epoch_nano")) {
expectedOutput = Long.toString(epochNanos);
Long actualOutput = processedRecords.get(0).getData().get(TIMESTAMP_KEY, Long.class);
assertThat(actualOutput, equalTo((Long)expectedOutput));
} else {
expectedOutput = zonedDateTime.format(DateTimeFormatter.ofPattern(outputFormat));
String actualOutput= processedRecords.get(0).getData().get(TIMESTAMP_KEY, String.class);
assertThat(actualOutput, equalTo((String)expectedOutput));
}
assertThat(actualOutput, equalTo(expectedOutput));
verify(dateProcessingMatchSuccessCounter, times(1)).increment();
}

Expand Down

0 comments on commit be5820a

Please sign in to comment.