From 40a1e1a827483c0cba83d58f452aede7d7bf9bb6 Mon Sep 17 00:00:00 2001 From: stheppi Date: Tue, 20 Feb 2024 22:15:30 +0000 Subject: [PATCH] Enhancement: Support Record Timestamp for Data Partitioning This update addresses a common user request to utilize the record timestamp for data partitioning purposes. Previously, the timestamp converter exclusively worked with the record Key or Value. Users had to resort to additional SMTs to transfer the record timestamp into the payload (Key or Value), or handle it during data production, leading to redundancy. To streamline this process, this PR introduces support for utilizing the record timestamp directly. It introduces a new `_timestamp` prefix. When specified, the converter will read the record timestamp as the source value for conversion, eliminating the need for additional data manipulation steps. --- TimestampConverter.md | 13 +++- pom.xml | 2 +- .../smt/header/TimestampConverter.java | 51 ++++++++++++--- .../smt/header/TimestampConverterTest.java | 63 +++++++++++++++++++ 4 files changed, 120 insertions(+), 9 deletions(-) diff --git a/TimestampConverter.md b/TimestampConverter.md index 685dca8..fc2bbbe 100644 --- a/TimestampConverter.md +++ b/TimestampConverter.md @@ -19,7 +19,7 @@ The SMT adds a few more features to the original: | Name | Description | Type | Default | Valid Values | |-----------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|--------------|--------------------------------------------------| | `header.name` | The name of the header to insert the timestamp into. | String | | | -| `field` | The field path containing the timestamp, or empty if the entire value is a timestamp. Prefix the path with the literal string `_key` or `_value` to specify the record Key or Value is used as source. If not specified `_value` is implied. | String | | | +| `field` | The field path containing the timestamp, or empty if the entire value is a timestamp. Prefix the path with the literal string `_key`, `_value` or `_timestamp`, to specify the record Key, Value or Timestamp is used as source. If not specified `_value` is implied. | String | | | | `target.type` | Sets the desired timestamp representation. | String | | string,unix,date,time,timestamp | | `format.from.pattern` | Sets the format of the timestamp when the input is a string. The format requires a Java DateTimeFormatter-compatible pattern. | String | | | | `format.to.pattern` | Sets the format of the timestamp when the output is a string. The format requires a Java DateTimeFormatter-compatible pattern. | String | | | @@ -82,4 +82,15 @@ transforms.TimestampConverter.header.name=wallclock transforms.TimestampConverter.field=_key.ts transforms.TimestampConverter.target.type=unix transforms.TimestampConverter.unix.precision=milliseconds +``` + +Here is an example using the record timestamp field: + +```properties +transforms=TimestampConverter +transforms.TimestampConverter.type=io.lenses.connect.smt.header.TimestampConverter +transforms.TimestampConverter.header.name=wallclock +transforms.TimestampConverter.field=_timestamp +transforms.TimestampConverter.target.type=unix +transforms.TimestampConverter.unix.precision=milliseconds ``` \ No newline at end of file diff --git a/pom.xml b/pom.xml index ab23265..abdf997 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ io.lenses kafka-connect-smt - 1.0.4-SNAPSHOT + 1.0.5-SNAPSHOT jar diff --git a/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java b/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java index e161325..f160011 100644 --- a/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java +++ b/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java @@ -63,6 +63,7 @@ public final class TimestampConverter> implements Tra private static final String KEY_FIELD = "_key"; private static final String VALUE_FIELD = "_value"; + private static final String TIMESTAMP_FIELD = "_timestamp"; public static final String UNIX_PRECISION_CONFIG = "unix.precision"; private static final String UNIX_PRECISION_DEFAULT = "milliseconds"; @@ -194,7 +195,6 @@ public Date toRaw(Config config, Object orig) { try { final LocalDateTime localDateTime = LocalDateTime.parse((String) orig, config.fromFormat); - localDateTime.atZone(ZoneOffset.UTC); return Date.from(localDateTime.atZone(ZoneOffset.UTC).toInstant()); } catch (DateTimeParseException e) { throw new DataException( @@ -344,6 +344,22 @@ public Date toType(Config config, Date orig) { new TimestampTranslator() { @Override public Date toRaw(Config config, Object orig) { + // long epoch to date + if (orig instanceof Long) { + // if precision is not specified, assume milliseconds + // otherwise convert to milliseconds + switch (config.unixPrecision) { + case UNIX_PRECISION_SECONDS: + return new Date(TimeUnit.SECONDS.toMillis((Long) orig)); + case UNIX_PRECISION_MICROS: + return new Date(TimeUnit.MICROSECONDS.toMillis((Long) orig)); + case UNIX_PRECISION_NANOS: + return new Date(TimeUnit.NANOSECONDS.toMillis((Long) orig)); + case UNIX_PRECISION_MILLIS: + default: + return new Date((Long) orig); + } + } if (!(orig instanceof Date)) { throw new DataException( "Expected Timestamp to be a java.util.Date, but found " + orig.getClass()); @@ -400,7 +416,13 @@ private static class Config { Optional rollingWindow; } - private boolean isKey; + private static enum FieldType { + KEY, + VALUE, + TIMESTAMP + } + + private FieldType fieldType; private Config config; @Override @@ -432,18 +454,26 @@ public void configure(Map configs) { String[] fields = fieldConfig.split("\\."); if (fields.length > 0) { if (fields[0].equalsIgnoreCase(KEY_FIELD)) { - isKey = true; + fieldType = FieldType.KEY; // drop the first element fields = Arrays.copyOfRange(fields, 1, fields.length); + } else if (fields[0].equalsIgnoreCase(TIMESTAMP_FIELD)) { + fieldType = FieldType.TIMESTAMP; + // if fields length is > 1, then it is an error since the timestamp is a primitive + if (fields.length > 1) { + throw new ConfigException( + "When using the record timestamp field, the field path should only be '_timestamp'."); + } + fields = new String[0]; } else { - isKey = false; + fieldType = FieldType.VALUE; if (fields[0].equalsIgnoreCase(VALUE_FIELD)) { // drop the first element fields = Arrays.copyOfRange(fields, 1, fields.length); } } } else { - isKey = false; + fieldType = FieldType.VALUE; } // ignore NONE as a rolling window type @@ -504,14 +534,21 @@ public ConfigDef config() { public void close() {} private Schema operatingSchema(R record) { - if (isKey) { + if (fieldType == FieldType.TIMESTAMP) { + // A record timestamp is epoch time and corresponds to the Logical Type Timestamp + return Timestamp.SCHEMA; + } + if (fieldType == FieldType.KEY) { return record.keySchema(); } return record.valueSchema(); } private Object operatingValue(R record) { - if (isKey) { + if (fieldType == FieldType.TIMESTAMP) { + return record.timestamp(); + } + if (fieldType == FieldType.KEY) { return record.key(); } return record.value(); diff --git a/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java b/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java index 554d076..542f44c 100644 --- a/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java +++ b/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java @@ -15,6 +15,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import java.time.Instant; import java.time.ZoneOffset; @@ -75,6 +77,7 @@ public class TimestampConverterTest { DATE_PLUS_TIME_UNIX_NANOS = DATE_PLUS_TIME_UNIX_MICROS * 1000 + 456; // 86401 seconds DATE_PLUS_TIME_UNIX_SECONDS = DATE_PLUS_TIME.getTimeInMillis() / 1000; + DATE_PLUS_TIME_STRING = "1970 01 02 00 00 01 234 UTC"; } @@ -883,6 +886,66 @@ void testWithSchemaValuePrefixedFieldConversion_Seconds() { assertEquals(expectedDate.getTime(), header.value()); } + @Test + void testWithRecordMetadataPrefixedFieldConversion_Seconds() { + Map config = new HashMap<>(); + config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); + config.put(TimestampConverter.FIELD_CONFIG, "_timestamp"); + config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "seconds"); + config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); + // ts field is a unix timestamp with seconds precision + Schema structWithTimestampFieldSchema = + SchemaBuilder.struct().field("ts", Schema.INT64_SCHEMA).build(); + Struct original = new Struct(structWithTimestampFieldSchema); + original.put("ts", DATE_PLUS_TIME_UNIX_SECONDS); + + final TimestampConverter transformer = new TimestampConverter<>(); + transformer.configure(config); + final SourceRecord initial = + new SourceRecord( + null, + null, + "topic", + 0, + null, + null, + structWithTimestampFieldSchema, + original, + DATE_PLUS_TIME_UNIX_SECONDS); + final SourceRecord transformed = transformer.apply(initial); + + Calendar expectedDate = GregorianCalendar.getInstance(UTC); + expectedDate.setTimeInMillis(0L); + expectedDate.add(Calendar.DATE, 1); + expectedDate.add(Calendar.SECOND, 1); + + Header header = transformed.headers().lastWithName("ts_header"); + assertNotNull(header); + assertEquals(Timestamp.SCHEMA.type(), header.schema().type()); + assertEquals(expectedDate.getTime(), header.value()); + } + + @Test + void testRaiseExceptionIfTimestampMetadataIsUsedWithAPath() { + Map config = new HashMap<>(); + config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); + config.put(TimestampConverter.FIELD_CONFIG, "_timestamp.incorrect.path"); + config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "seconds"); + config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); + + final TimestampConverter transformer = new TimestampConverter<>(); + + try { + transformer.configure(config); + fail("Expected a ConfigException"); + } catch (ConfigException e) { + assertTrue( + e.getMessage() + .contains( + "When using the record timestamp field, the field path should only be '_timestamp'.")); + } + } + @Test void testWithSchemaKeyPrefixedFieldConversion_Seconds() { Map config = new HashMap<>();