Skip to content

Commit

Permalink
Merge pull request #4 from lensesio/feat/support_record_timestamp
Browse files Browse the repository at this point in the history
Enhancement: Support Record Timestamp for Data Partitioning
  • Loading branch information
stheppi authored Feb 21, 2024
2 parents e5d09ee + 40a1e1a commit e5e85df
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 9 deletions.
13 changes: 12 additions & 1 deletion TimestampConverter.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ The SMT adds a few more features to the original:
| Name | Description | Type | Default | Valid Values |
|-----------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|--------------|--------------------------------------------------|
| `header.name` | The name of the header to insert the timestamp into. | String | | |
| `field` | The field path containing the timestamp, or empty if the entire value is a timestamp. Prefix the path with the literal string `_key` or `_value` to specify the record Key or Value is used as source. If not specified `_value` is implied. | String | | |
| `field` | The field path containing the timestamp, or empty if the entire value is a timestamp. Prefix the path with the literal string `_key`, `_value` or `_timestamp`, to specify the record Key, Value or Timestamp is used as source. If not specified `_value` is implied. | String | | |
| `target.type` | Sets the desired timestamp representation. | String | | string,unix,date,time,timestamp |
| `format.from.pattern` | Sets the format of the timestamp when the input is a string. The format requires a Java DateTimeFormatter-compatible pattern. | String | | |
| `format.to.pattern` | Sets the format of the timestamp when the output is a string. The format requires a Java DateTimeFormatter-compatible pattern. | String | | |
Expand Down Expand Up @@ -82,4 +82,15 @@ transforms.TimestampConverter.header.name=wallclock
transforms.TimestampConverter.field=_key.ts
transforms.TimestampConverter.target.type=unix
transforms.TimestampConverter.unix.precision=milliseconds
```

Here is an example using the record timestamp field:

```properties
transforms=TimestampConverter
transforms.TimestampConverter.type=io.lenses.connect.smt.header.TimestampConverter
transforms.TimestampConverter.header.name=wallclock
transforms.TimestampConverter.field=_timestamp
transforms.TimestampConverter.target.type=unix
transforms.TimestampConverter.unix.precision=milliseconds
```
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

<groupId>io.lenses</groupId>
<artifactId>kafka-connect-smt</artifactId>
<version>1.0.4-SNAPSHOT</version>
<version>1.0.5-SNAPSHOT</version>
<packaging>jar</packaging>

<properties>
Expand Down
51 changes: 44 additions & 7 deletions src/main/java/io/lenses/connect/smt/header/TimestampConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public final class TimestampConverter<R extends ConnectRecord<R>> implements Tra

private static final String KEY_FIELD = "_key";
private static final String VALUE_FIELD = "_value";
private static final String TIMESTAMP_FIELD = "_timestamp";

public static final String UNIX_PRECISION_CONFIG = "unix.precision";
private static final String UNIX_PRECISION_DEFAULT = "milliseconds";
Expand Down Expand Up @@ -194,7 +195,6 @@ public Date toRaw(Config config, Object orig) {
try {
final LocalDateTime localDateTime =
LocalDateTime.parse((String) orig, config.fromFormat);
localDateTime.atZone(ZoneOffset.UTC);
return Date.from(localDateTime.atZone(ZoneOffset.UTC).toInstant());
} catch (DateTimeParseException e) {
throw new DataException(
Expand Down Expand Up @@ -344,6 +344,22 @@ public Date toType(Config config, Date orig) {
new TimestampTranslator() {
@Override
public Date toRaw(Config config, Object orig) {
// long epoch to date
if (orig instanceof Long) {
// if precision is not specified, assume milliseconds
// otherwise convert to milliseconds
switch (config.unixPrecision) {
case UNIX_PRECISION_SECONDS:
return new Date(TimeUnit.SECONDS.toMillis((Long) orig));
case UNIX_PRECISION_MICROS:
return new Date(TimeUnit.MICROSECONDS.toMillis((Long) orig));
case UNIX_PRECISION_NANOS:
return new Date(TimeUnit.NANOSECONDS.toMillis((Long) orig));
case UNIX_PRECISION_MILLIS:
default:
return new Date((Long) orig);
}
}
if (!(orig instanceof Date)) {
throw new DataException(
"Expected Timestamp to be a java.util.Date, but found " + orig.getClass());
Expand Down Expand Up @@ -400,7 +416,13 @@ private static class Config {
Optional<RollingWindowDetails> rollingWindow;
}

private boolean isKey;
private static enum FieldType {
KEY,
VALUE,
TIMESTAMP
}

private FieldType fieldType;
private Config config;

@Override
Expand Down Expand Up @@ -432,18 +454,26 @@ public void configure(Map<String, ?> configs) {
String[] fields = fieldConfig.split("\\.");
if (fields.length > 0) {
if (fields[0].equalsIgnoreCase(KEY_FIELD)) {
isKey = true;
fieldType = FieldType.KEY;
// drop the first element
fields = Arrays.copyOfRange(fields, 1, fields.length);
} else if (fields[0].equalsIgnoreCase(TIMESTAMP_FIELD)) {
fieldType = FieldType.TIMESTAMP;
// if fields length is > 1, then it is an error since the timestamp is a primitive
if (fields.length > 1) {
throw new ConfigException(
"When using the record timestamp field, the field path should only be '_timestamp'.");
}
fields = new String[0];
} else {
isKey = false;
fieldType = FieldType.VALUE;
if (fields[0].equalsIgnoreCase(VALUE_FIELD)) {
// drop the first element
fields = Arrays.copyOfRange(fields, 1, fields.length);
}
}
} else {
isKey = false;
fieldType = FieldType.VALUE;
}

// ignore NONE as a rolling window type
Expand Down Expand Up @@ -504,14 +534,21 @@ public ConfigDef config() {
public void close() {}

private Schema operatingSchema(R record) {
if (isKey) {
if (fieldType == FieldType.TIMESTAMP) {
// A record timestamp is epoch time and corresponds to the Logical Type Timestamp
return Timestamp.SCHEMA;
}
if (fieldType == FieldType.KEY) {
return record.keySchema();
}
return record.valueSchema();
}

private Object operatingValue(R record) {
if (isKey) {
if (fieldType == FieldType.TIMESTAMP) {
return record.timestamp();
}
if (fieldType == FieldType.KEY) {
return record.key();
}
return record.value();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.time.Instant;
import java.time.ZoneOffset;
Expand Down Expand Up @@ -75,6 +77,7 @@ public class TimestampConverterTest {
DATE_PLUS_TIME_UNIX_NANOS = DATE_PLUS_TIME_UNIX_MICROS * 1000 + 456;
// 86401 seconds
DATE_PLUS_TIME_UNIX_SECONDS = DATE_PLUS_TIME.getTimeInMillis() / 1000;

DATE_PLUS_TIME_STRING = "1970 01 02 00 00 01 234 UTC";
}

Expand Down Expand Up @@ -883,6 +886,66 @@ void testWithSchemaValuePrefixedFieldConversion_Seconds() {
assertEquals(expectedDate.getTime(), header.value());
}

@Test
void testWithRecordMetadataPrefixedFieldConversion_Seconds() {
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
config.put(TimestampConverter.FIELD_CONFIG, "_timestamp");
config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "seconds");
config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header");
// ts field is a unix timestamp with seconds precision
Schema structWithTimestampFieldSchema =
SchemaBuilder.struct().field("ts", Schema.INT64_SCHEMA).build();
Struct original = new Struct(structWithTimestampFieldSchema);
original.put("ts", DATE_PLUS_TIME_UNIX_SECONDS);

final TimestampConverter<SourceRecord> transformer = new TimestampConverter<>();
transformer.configure(config);
final SourceRecord initial =
new SourceRecord(
null,
null,
"topic",
0,
null,
null,
structWithTimestampFieldSchema,
original,
DATE_PLUS_TIME_UNIX_SECONDS);
final SourceRecord transformed = transformer.apply(initial);

Calendar expectedDate = GregorianCalendar.getInstance(UTC);
expectedDate.setTimeInMillis(0L);
expectedDate.add(Calendar.DATE, 1);
expectedDate.add(Calendar.SECOND, 1);

Header header = transformed.headers().lastWithName("ts_header");
assertNotNull(header);
assertEquals(Timestamp.SCHEMA.type(), header.schema().type());
assertEquals(expectedDate.getTime(), header.value());
}

@Test
void testRaiseExceptionIfTimestampMetadataIsUsedWithAPath() {
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
config.put(TimestampConverter.FIELD_CONFIG, "_timestamp.incorrect.path");
config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "seconds");
config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header");

final TimestampConverter<SourceRecord> transformer = new TimestampConverter<>();

try {
transformer.configure(config);
fail("Expected a ConfigException");
} catch (ConfigException e) {
assertTrue(
e.getMessage()
.contains(
"When using the record timestamp field, the field path should only be '_timestamp'."));
}
}

@Test
void testWithSchemaKeyPrefixedFieldConversion_Seconds() {
Map<String, String> config = new HashMap<>();
Expand Down

0 comments on commit e5e85df

Please sign in to comment.