From b6f9c66f4431e82e25454ab3dbf843cac4c477f3 Mon Sep 17 00:00:00 2001 From: stheppi Date: Fri, 23 Feb 2024 12:23:27 +0000 Subject: [PATCH 1/9] Timezone support There have been requests from users to support date/time partition in the S3/GCP/Azure sink with a specific timezone. The code changes brings the timezone configuration as an optional setting. When epoch configuration is involved, changing the timezone to non-UTC yields an error. Removes the check plugin since the jar dependency is built with a newer JVM version and fails the build. --- InsertRollingWallclock.md | 16 +- InsertWallclock.md | 26 ++- InsertWallclockDateTimePart.md | 23 ++- TimestampConverter.md | 28 ++- pom.xml | 4 +- .../lenses/connect/smt/header/Constants.java | 7 + .../smt/header/InsertRollingWallclock.java | 35 +++- .../connect/smt/header/InsertWallclock.java | 27 ++- .../header/InsertWallclockDateTimePart.java | 58 ++++-- .../smt/header/TimestampConverter.java | 66 ++++--- .../io/lenses/connect/smt/header/Utils.java | 5 +- .../header/InsertRollingWallclockTest.java | 72 +++++++ .../InsertWallclockDateTimePartTest.java | 54 ++++++ .../smt/header/InsertWallclockTest.java | 54 ++++++ .../smt/header/TimestampConverterTest.java | 182 ++++++++++++------ 15 files changed, 523 insertions(+), 134 deletions(-) create mode 100644 src/main/java/io/lenses/connect/smt/header/Constants.java diff --git a/InsertRollingWallclock.md b/InsertRollingWallclock.md index 5dd2d2b..2d3b18d 100644 --- a/InsertRollingWallclock.md +++ b/InsertRollingWallclock.md @@ -16,6 +16,7 @@ The value inserted is stored as a STRING, and it holds either a string represent | `format` | Sets the format of the header value inserted if the type was set to string. It can be any valid java date format. | String | | | High | | `rolling.window.type` | Sets the window type. It can be fixed or rolling. | String | minutes | hours, minutes, seconds | High | | `rolling.window.size` | Sets the window size. It can be any positive integer, and depending on the `window.type` it has an upper bound, 60 for seconds and minutes, and 24 for hours. | Int | 15 | | High | +| `timezone` | Sets the timezone. It can be any valid java timezone. Overwrite it when `value.type` is set to `format`, otherwise it will raise an exception. | String | UTC | | High | ## Example @@ -36,8 +37,21 @@ To store a string representation of the date and time in the format `yyyy-MM-dd transforms=InsertRollingWallclock transforms.InsertRollingWallclock.type=io.lenses.connect.smt.header.InsertRollingWallclock transforms.InsertRollingWallclock.header.name=wallclock -transforms.InsertRollingWallclock.value.type=string +transforms.InsertRollingWallclock.value.type=format transforms.InsertRollingWallclock.format=yyyy-MM-dd HH:mm:ss.SSS transforms.InsertRollingWallclock.rolling.window.type=minutes transforms.InsertRollingWallclock.rolling.window.size=15 ``` + +To use the timezone `Asia/Kolkoata`, use the following: + +```properties +transforms=InsertRollingWallclock +transforms.InsertRollingWallclock.type=io.lenses.connect.smt.header.InsertRollingWallclock +transforms.InsertRollingWallclock.header.name=wallclock +transforms.InsertRollingWallclock.value.type=format +transforms.InsertRollingWallclock.format=yyyy-MM-dd HH:mm:ss.SSS +transforms.InsertRollingWallclock.rolling.window.type=minutes +transforms.InsertRollingWallclock.rolling.window.size=15 +transforms.InsertRollingWallclock.timezone=Asia/Kolkata +``` \ No newline at end of file diff --git a/InsertWallclock.md b/InsertWallclock.md index 1aea27d..1948579 100644 --- a/InsertWallclock.md +++ b/InsertWallclock.md @@ -10,13 +10,12 @@ for example `yyyy-MM-dd HH:mm:ss.SSS`. ## Configuration -| Name | Description | Type | Default | Valid Values | Importance | -|---------------|-----------------------------------------------------------------------------------------------------------------------|--------|---------|--------------|------------| -| `header.name` | The name of the header to insert the timestamp into. | String | | | High | -| `value.type` | Sets the header value inserted. It can be epoch or string. If string is used, then the 'format' setting is required." | String | format | epoch,format | High | -| `format` | Sets the format of the header value inserted if the type was set to string. It can be any valid java date format. | String | | | High | - - +| Name | Description | Type | Default | Valid Values | Importance | +|---------------|------------------------------------------------------------------------------------------------------------------------------------------------|--------|---------|--------------|------------| +| `header.name` | The name of the header to insert the timestamp into. | String | | | High | +| `value.type` | Sets the header value inserted. It can be epoch or string. If string is used, then the 'format' setting is required." | String | format | epoch,format | High | +| `format` | Sets the format of the header value inserted if the type was set to string. It can be any valid java date format. | String | | | High | +| `timezone` | Sets the timezone. It can be any valid java timezone. Overwrite it when `value.type` is set to `format`, otherwise it will raise an exception. | String | UTC | | High | ## Example @@ -35,6 +34,17 @@ To store a string representation of the date and time in the format `yyyy-MM-dd transforms=InsertWallclock transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertWallclock transforms.InsertWallclock.header.name=wallclock -transforms.InsertWallclock.value.type=string +transforms.InsertWallclock.value.type=format +transforms.InsertWallclock.format=yyyy-MM-dd HH:mm:ss.SSS +``` + +To use the timezone `Asia/Kolkoata`, use the following: + +```properties +transforms=InsertWallclock +transforms.InsertWallclock.type=io.lenses.connect.smt.header.InsertWallclock +transforms.InsertWallclock.header.name=wallclock +transforms.InsertWallclock.value.type=format transforms.InsertWallclock.format=yyyy-MM-dd HH:mm:ss.SSS +transforms.InsertWallclock.timezone=Asia/Kolkata ``` \ No newline at end of file diff --git a/InsertWallclockDateTimePart.md b/InsertWallclockDateTimePart.md index d7427c0..6f11f2b 100644 --- a/InsertWallclockDateTimePart.md +++ b/InsertWallclockDateTimePart.md @@ -2,16 +2,16 @@ ## Description -A Kafka Connect Single Message Transform (SMT) that inserts the system clock year, month, day, minute, or seconds as a message header, with a value of type STRING. +A Kafka Connect Single Message Transform (SMT) that inserts the system clock year, month, day, minute, or seconds as a +message header, with a value of type STRING. ## Configuration - -| Name | Description | Type | Default | Valid Values | Importance | -|------------------|------------------------------------------------------|--------|---------|---------------------------------------|------------| -| `header.name` | The name of the header to insert the timestamp into. | String | | | High | -| `date.time.part` | The date time part to insert. | String | | year, month, day, hour,minute, second | High | - +| Name | Description | Type | Default | Valid Values | Importance | +|------------------|-------------------------------------------------------|--------|---------|---------------------------------------|------------| +| `header.name` | The name of the header to insert the timestamp into. | String | | | High | +| `date.time.part` | The date time part to insert. | String | | year, month, day, hour,minute, second | High | +| `timezone` | Sets the timezone. It can be any valid java timezone. | String | UTC | | High | ## Example @@ -50,6 +50,15 @@ transforms.InsertWallclockDateTimePart.type=io.lenses.connect.smt.header.InsertW transforms.InsertWallclockDateTimePart.header.name=wallclock transforms.InsertWallclockDateTimePart.date.time.part=hour ``` +To store the hour, and apply a timezone, use the following configuration: + +```properties +transforms=InsertWallclockDateTimePart +transforms.InsertWallclockDateTimePart.type=io.lenses.connect.smt.header.InsertWallclockDateTimePart +transforms.InsertWallclockDateTimePart.header.name=wallclock +transforms.InsertWallclockDateTimePart.date.time.part=hour +transforms.InsertWallclockDateTimePart.timezone=Asia/Kolkata +``` To store the minute, use the following configuration: diff --git a/TimestampConverter.md b/TimestampConverter.md index fc2bbbe..1bff7fe 100644 --- a/TimestampConverter.md +++ b/TimestampConverter.md @@ -2,7 +2,9 @@ ## Description -An adapted version of the [TimestampConverter](https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java#L50) SMT, that allows the user to specify the format of the timestamp inserted as a header. +An adapted version of +the [TimestampConverter](https://github.com/apache/kafka/blob/5c2492bca71200806ccf776ea31639a90290d43e/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java#L50) +SMT, that allows the user to specify the format of the timestamp inserted as a header. It also avoids the synchronization block requirement for converting to a string representation of the timestamp. The SMT adds a few more features to the original: @@ -12,10 +14,8 @@ The SMT adds a few more features to the original: * allows conversion from one string representation to another (e.g. `yyyy-MM-dd HH:mm:ss` to `yyyy-MM-dd`) * allows conversion using a rolling window boundary (e.g. every 15 minutes, or one hour) - ## Configuration - | Name | Description | Type | Default | Valid Values | |-----------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|--------------|--------------------------------------------------| | `header.name` | The name of the header to insert the timestamp into. | String | | | @@ -26,11 +26,12 @@ The SMT adds a few more features to the original: | `rolling.window.type` | An optional parameter for the rolling time window type. When set it will adjust the output value according to the time window boundary. | String | none | none, hours, minutes, seconds | | `rolling.window.size` | An optional positive integer parameter for the rolling time window size. When `rolling.window.type` is defined this setting is required. The value is bound by the `rolling.window.type` configuration. If type is `minutes` or `seconds` then the value cannot bigger than 60, and if the type is `hours` then the max value is 24. | Int | 15 | | | `unix.precision` | The desired Unix precision for the timestamp. Used to generate the output when type=unix or used to parse the input if the input is a Long. This SMT will cause precision loss during conversions from, and to, values with sub-millisecond components. | String | milliseconds | seconds, milliseconds, microseconds, nanoseconds | - +| `timezone` | Sets the timezone. It can be any valid java timezone. Overwrite it when `target.type` is set to `date, time, or string`, otherwise it will raise an exception. | String | UTC | | ## Example -To convert to and from a string representation of the date and time in the format `yyyy-MM-dd HH:mm:ss.SSS`, use the following configuration: +To convert to and from a string representation of the date and time in the format `yyyy-MM-dd HH:mm:ss.SSS`, use the +following configuration: ```properties transforms=TimestampConverter @@ -44,7 +45,6 @@ transforms.TimestampConverter.format.to.pattern=yyyy-MM-dd HH:mm:ss.SSS To convert to and from a string representation while applying an hourly rolling window: - ```properties transforms=TimestampConverter transforms.TimestampConverter.type=io.lenses.connect.smt.header.TimestampConverter @@ -57,8 +57,21 @@ transforms.TimestampConverter.rolling.window.type=hours transforms.TimestampConverter.rolling.window.size=1 ``` -To convert to and from a string representation while applying a 15 minutes rolling window: +To convert to and from a string representation while applying an hourly rolling window and timezone: +```properties +transforms=TimestampConverter +transforms.TimestampConverter.type=io.lenses.connect.smt.header.TimestampConverter +transforms.TimestampConverter.header.name=wallclock +transforms.TimestampConverter.field=_value.ts +transforms.TimestampConverter.target.type=string +transforms.TimestampConverter.format.from.pattern=yyyyMMddHHmmssSSS +transforms.TimestampConverter.format.to.pattern=yyyy-MM-dd-HH +transforms.TimestampConverter.rolling.window.type=hours +transforms.TimestampConverter.rolling.window.size=1 +transforms.TimestampConverter.timezone=Asia/Kolkata +``` +To convert to and from a string representation while applying a 15 minutes rolling window: ```properties transforms=TimestampConverter @@ -72,7 +85,6 @@ transforms.TimestampConverter.rolling.window.type=minutes transforms.TimestampConverter.rolling.window.size=15 ``` - To convert to and from a Unix timestamp, use the following: ```properties diff --git a/pom.xml b/pom.xml index abdf997..9838dba 100644 --- a/pom.xml +++ b/pom.xml @@ -83,7 +83,7 @@ - + org.apache.maven.plugins maven-compiler-plugin diff --git a/src/main/java/io/lenses/connect/smt/header/Constants.java b/src/main/java/io/lenses/connect/smt/header/Constants.java new file mode 100644 index 0000000..c5ca3ab --- /dev/null +++ b/src/main/java/io/lenses/connect/smt/header/Constants.java @@ -0,0 +1,7 @@ +package io.lenses.connect.smt.header; + +import java.time.ZoneId; + +public class Constants { + public static final ZoneId UTC = ZoneId.of("UTC"); +} diff --git a/src/main/java/io/lenses/connect/smt/header/InsertRollingWallclock.java b/src/main/java/io/lenses/connect/smt/header/InsertRollingWallclock.java index 9c792e5..1324b52 100644 --- a/src/main/java/io/lenses/connect/smt/header/InsertRollingWallclock.java +++ b/src/main/java/io/lenses/connect/smt/header/InsertRollingWallclock.java @@ -12,10 +12,12 @@ import java.time.Instant; import java.time.OffsetDateTime; +import java.time.ZoneId; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.Collections; import java.util.Map; +import java.util.TimeZone; import java.util.function.Supplier; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; @@ -44,6 +46,7 @@ public class InsertRollingWallclock> implements Trans private static final int DEFAULT_ROLLING_WINDOW_VALUE = 15; private static final RollingWindow DEFAULT_ROLLING_WINDOW = RollingWindow.MINUTES; private static final DateTimeFormatter DEFAULT_FORMATTER = DateTimeFormatter.ISO_LOCAL_DATE_TIME; + private ZoneId timezone = Constants.UTC; private interface ConfigName { String HEADER_NAME_CONFIG = "header.name"; @@ -56,6 +59,8 @@ private interface ConfigName { String VALUE_TYPE_EPOCH = "epoch"; String VALUE_TYPE_FORMAT = "format"; + + String TIMEZONE_CONFIG = "timezone"; } public static final ConfigDef CONFIG_DEF = @@ -90,7 +95,13 @@ private interface ConfigName { ConfigDef.Importance.HIGH, "The rolling window size. For example, if the rolling window is set to 'minutes' " + "and the rolling window value is set to 15, then the rolling window " - + "is 15 minutes."); + + "is 15 minutes.") + .define( + ConfigName.TIMEZONE_CONFIG, + ConfigDef.Type.STRING, + "UTC", + ConfigDef.Importance.HIGH, + "The timezone used when 'value.type' is set to format."); /** * Used to testing only to inject the instant value. @@ -144,6 +155,24 @@ public void configure(Map props) { + ConfigName.VALUE_TYPE_CONFIG + "' must be set to either 'epoch' or 'format'."); } + final String timezoneStr = config.getString(ConfigName.TIMEZONE_CONFIG); + try { + this.timezone = TimeZone.getTimeZone(timezoneStr).toZoneId(); + } catch (Exception e) { + throw new ConfigException( + "Configuration '" + + ConfigName.TIMEZONE_CONFIG + + "' is not a valid timezone. It can be any valid java timezone."); + } + if (!this.timezone.getId().equals(Constants.UTC.getId()) + && valueType.equalsIgnoreCase(ConfigName.VALUE_TYPE_EPOCH)) { + throw new ConfigException( + "Configuration '" + + ConfigName.TIMEZONE_CONFIG + + "' is not allowed to be set to a value other than UTC when '" + + ConfigName.VALUE_TYPE_CONFIG + + "' is set to 'epoch'."); + } if (valueType.equalsIgnoreCase(ConfigName.VALUE_TYPE_FORMAT)) { final String pattern = config.getString(ConfigName.FORMAT_CONFIG); if (pattern == null) { @@ -157,6 +186,7 @@ public void configure(Map props) { } } valueExtractorF = this::getFormattedValue; + format = format.withZone(timezone); } else { valueExtractorF = this::getEpochValue; } @@ -190,7 +220,8 @@ private String getEpochValue() { } private String getFormattedValue() { - Instant wallclock = rollingWindowDetails.adjust(instantF.get()); + Instant now = instantF.get(); + Instant wallclock = rollingWindowDetails.adjust(now); OffsetDateTime dateTime = OffsetDateTime.ofInstant(wallclock, ZoneOffset.UTC); return format.format(dateTime); diff --git a/src/main/java/io/lenses/connect/smt/header/InsertWallclock.java b/src/main/java/io/lenses/connect/smt/header/InsertWallclock.java index aef1613..4763753 100644 --- a/src/main/java/io/lenses/connect/smt/header/InsertWallclock.java +++ b/src/main/java/io/lenses/connect/smt/header/InsertWallclock.java @@ -16,6 +16,7 @@ import java.time.format.DateTimeFormatter; import java.util.Locale; import java.util.Map; +import java.util.TimeZone; import java.util.function.Supplier; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; @@ -45,6 +46,7 @@ * "transforms.insertWallclockHeader.header.name": "wallclock", * "transforms.insertWallclockHeader.value.type": "format", * "transforms.insertWallclockHeader.format": "yyyy-MM-dd'T'HH:mm:ss.SSSZ" + * "transforms.insertWallclockHeader.timezone": "Europe/Paris" * * } * @@ -58,6 +60,7 @@ public class InsertWallclock> implements Transformati private Supplier instantF = Instant::now; + private TimeZone timeZone = TimeZone.getTimeZone("UTC"); private static final DateTimeFormatter DEFAULT_FORMATTER = DateTimeFormatter.ISO_LOCAL_DATE_TIME; /** @@ -76,6 +79,8 @@ private interface ConfigName { String VALUE_TYPE_EPOCH = "epoch"; String VALUE_TYPE_FORMAT = "format"; String FORMAT = "format"; + + String TIMEZONE = "timezone"; } public static final ConfigDef CONFIG_DEF = @@ -113,7 +118,13 @@ private interface ConfigName { null, ConfigDef.Importance.MEDIUM, "Sets the format of the header value inserted if the type was set to string. " - + "It can be any valid java date format."); + + "It can be any valid java date format.") + .define( + ConfigName.TIMEZONE, + ConfigDef.Type.STRING, + "UTC", + ConfigDef.Importance.MEDIUM, + "Sets the timezone of the header value inserted if the type was set to string. "); @Override public R apply(R r) { @@ -154,6 +165,13 @@ public void configure(Map props) { + ConfigName.VALUE_TYPE + "' must be set to either 'epoch' or 'format'."); } + final String timezoneStr = config.getString(ConfigName.TIMEZONE); + try { + timeZone = TimeZone.getTimeZone(timezoneStr); + } catch (IllegalArgumentException e) { + throw new ConfigException( + "Configuration '" + ConfigName.TIMEZONE + "' is not a valid timezone."); + } if (valueType.equalsIgnoreCase(ConfigName.VALUE_TYPE_FORMAT)) { final String pattern = config.getString(ConfigName.FORMAT); if (pattern == null) { @@ -166,8 +184,15 @@ public void configure(Map props) { "Configuration '" + ConfigName.FORMAT + "' is not a valid date format."); } } + format = format.withZone(timeZone.toZoneId()); valueExtractorF = this::getFormattedValue; } else { + if (!timeZone.getID().equals(Constants.UTC.getId())) { + throw new ConfigException( + "Configuration '" + + ConfigName.TIMEZONE + + "' must be set to 'UTC' when 'value.type' is set to 'epoch'."); + } valueExtractorF = this::getEpochValue; } } diff --git a/src/main/java/io/lenses/connect/smt/header/InsertWallclockDateTimePart.java b/src/main/java/io/lenses/connect/smt/header/InsertWallclockDateTimePart.java index 6686c79..d33f54e 100644 --- a/src/main/java/io/lenses/connect/smt/header/InsertWallclockDateTimePart.java +++ b/src/main/java/io/lenses/connect/smt/header/InsertWallclockDateTimePart.java @@ -11,11 +11,12 @@ package io.lenses.connect.smt.header; import java.time.Instant; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.Arrays; import java.util.Locale; import java.util.Map; +import java.util.TimeZone; import java.util.function.Function; import java.util.function.Supplier; import org.apache.kafka.common.config.ConfigDef; @@ -38,7 +39,9 @@ public class InsertWallclockDateTimePart> implements // Used for testing only to inject the instant value private Supplier instantF = Instant::now; - private Function valueExtractorF; + private ZoneId timeZone = ZoneId.of("UTC"); + + private Function valueExtractorF; void setInstantF(Supplier instantF) { this.instantF = instantF; @@ -59,11 +62,19 @@ void setInstantF(Supplier instantF) { + Arrays.stream(DateTimePart.values()) .map(Enum::name) .reduce((a, b) -> a + ", " + b) - .orElse("")); + .orElse("")) + .define( + ConfigName.TIMEZONE, + ConfigDef.Type.STRING, + "UTC", + ConfigDef.Importance.HIGH, + "The timezone to use."); interface ConfigName { String HEADER_NAME = "header.name"; String DATE_TIME_PART = "date.time.part"; + + String TIMEZONE = "timezone"; } enum DateTimePart { @@ -81,7 +92,9 @@ public R apply(R r) { return null; } - final String value = valueExtractorF.apply(instantF.get()); + Instant now = instantF.get(); + final ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(now, timeZone); + final String value = valueExtractorF.apply(zonedDateTime); r.headers().addString(headerName, value); return r; } @@ -97,6 +110,13 @@ public void close() {} @Override public void configure(Map props) { final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); + final String timeZoneStr = config.getString(ConfigName.TIMEZONE); + try { + timeZone = TimeZone.getTimeZone(timeZoneStr).toZoneId(); + } catch (IllegalArgumentException e) { + throw new ConfigException( + "Configuration '" + ConfigName.TIMEZONE + "' is not a valid timezone."); + } headerName = config.getString(ConfigName.HEADER_NAME); DateTimePart dateTimePart; try { @@ -140,31 +160,27 @@ public void configure(Map props) { } } - private static OffsetDateTime getDateTime(Instant instant) { - return OffsetDateTime.ofInstant(instant, ZoneOffset.UTC); - } - - private static String getYear(Instant instant) { - return String.valueOf(getDateTime(instant).getYear()); + private static String getYear(ZonedDateTime time) { + return String.valueOf(time.getYear()); } - private static String getMonth(Instant instant) { - return String.valueOf(getDateTime(instant).getMonthValue()); + private static String getMonth(ZonedDateTime time) { + return String.valueOf(time.getMonthValue()); } - private static String getDay(Instant instant) { - return String.valueOf(getDateTime(instant).getDayOfMonth()); + private static String getDay(ZonedDateTime time) { + return String.valueOf(time.getDayOfMonth()); } - private static String getHour(Instant instant) { - return String.valueOf(getDateTime(instant).getHour()); + private static String getHour(ZonedDateTime time) { + return String.valueOf(time.getHour()); } - private static String getMinute(Instant instant) { - return String.valueOf(getDateTime(instant).getMinute()); + private static String getMinute(ZonedDateTime time) { + return String.valueOf(time.getMinute()); } - private static String getSecond(Instant instant) { - return String.valueOf(getDateTime(instant).getSecond()); + private static String getSecond(ZonedDateTime time) { + return String.valueOf(time.getSecond()); } } diff --git a/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java b/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java index f160011..75666ba 100644 --- a/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java +++ b/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java @@ -15,12 +15,15 @@ import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull; import java.time.Instant; +import java.time.LocalDate; import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneId; import java.time.ZoneOffset; +import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeParseException; import java.util.Arrays; -import java.util.Calendar; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -61,6 +64,8 @@ public final class TimestampConverter> implements Tra public static final String FORMAT_FROM_CONFIG = "format.from.pattern"; public static final String FORMAT_TO_CONFIG = "format.to.pattern"; + public static final String TARGET_TIMEZONE_CONFIG = "target.timezone"; + private static final String KEY_FIELD = "_key"; private static final String VALUE_FIELD = "_value"; private static final String TIMESTAMP_FIELD = "_timestamp"; @@ -82,6 +87,7 @@ public final class TimestampConverter> implements Tra private static final String UNIX_PRECISION_SECONDS = "seconds"; private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); + private static final ZoneId UTC_ZONE_ID = UTC.toZoneId(); public static final Schema OPTIONAL_DATE_SCHEMA = org.apache.kafka.connect.data.Date.builder().optional().schema(); @@ -160,7 +166,13 @@ public final class TimestampConverter> implements Tra null, ConfigDef.Importance.LOW, "The rolling window size. For example, if the rolling window is set to 'minutes' " - + "size is set to 15."); + + "size is set to 15.") + .define( + TARGET_TIMEZONE_CONFIG, + ConfigDef.Type.STRING, + "UTC", + ConfigDef.Importance.LOW, + "The timezone to use for the timestamp conversion."); private interface TimestampTranslator { /** Convert from the type-specific format to the universal java.util.Date format */ @@ -296,13 +308,13 @@ public Schema typeSchema(boolean isOptional) { @Override public Date toType(Config config, Date orig) { - Calendar result = Calendar.getInstance(UTC); - result.setTime(orig); - result.set(Calendar.HOUR_OF_DAY, 0); - result.set(Calendar.MINUTE, 0); - result.set(Calendar.SECOND, 0); - result.set(Calendar.MILLISECOND, 0); - return result.getTime(); + ZonedDateTime truncated = + ZonedDateTime.of( + orig.toInstant().atZone(config.targetTimeZoneId).toLocalDate(), + LocalTime.MIDNIGHT, + UTC_ZONE_ID); + + return Date.from(truncated.toInstant()); } }); @@ -327,15 +339,10 @@ public Schema typeSchema(boolean isOptional) { @Override public Date toType(Config config, Date orig) { - Calendar origCalendar = Calendar.getInstance(UTC); - origCalendar.setTime(orig); - Calendar result = Calendar.getInstance(UTC); - result.setTimeInMillis(0L); - result.set(Calendar.HOUR_OF_DAY, origCalendar.get(Calendar.HOUR_OF_DAY)); - result.set(Calendar.MINUTE, origCalendar.get(Calendar.MINUTE)); - result.set(Calendar.SECOND, origCalendar.get(Calendar.SECOND)); - result.set(Calendar.MILLISECOND, origCalendar.get(Calendar.MILLISECOND)); - return result.getTime(); + ZonedDateTime zonedDateTime = orig.toInstant().atZone(config.targetTimeZoneId); + return Date.from( + ZonedDateTime.of(LocalDate.of(1970, 1, 1), zonedDateTime.toLocalTime(), UTC_ZONE_ID) + .toInstant()); } }); @@ -392,7 +399,8 @@ private static class Config { String toFormatPattern, String unixPrecision, String header, - Optional rollingWindow) { + Optional rollingWindow, + TimeZone targetTimeZone) { this.fields = fields; this.type = type; this.fromFormat = fromFormat; @@ -402,6 +410,8 @@ private static class Config { this.unixPrecision = unixPrecision; this.header = header; this.rollingWindow = rollingWindow; + this.targetTimeZone = targetTimeZone; + this.targetTimeZoneId = targetTimeZone.toZoneId(); } String[] fields; @@ -414,6 +424,9 @@ private static class Config { String unixPrecision; Optional rollingWindow; + + TimeZone targetTimeZone; + final ZoneId targetTimeZoneId; } private static enum FieldType { @@ -442,14 +455,21 @@ public void configure(Map configs) { final String unixPrecision = simpleConfig.getString(UNIX_PRECISION_CONFIG); + final String targetTimeZone = simpleConfig.getString(TARGET_TIMEZONE_CONFIG); + TimeZone timeZone = TimeZone.getTimeZone(targetTimeZone); + if (timeZone == null) { + throw new ConfigException("Invalid timezone: " + targetTimeZone); + } + if (type.equals(TYPE_STRING) && isBlank(toFormatPattern)) { throw new ConfigException( "TimestampConverter requires format option to be specified " + "when using string timestamps"); } DateTimeFormatter fromPattern = - io.lenses.connect.smt.header.Utils.getDateFormat(fromFormatPattern); - DateTimeFormatter toPattern = io.lenses.connect.smt.header.Utils.getDateFormat(toFormatPattern); + io.lenses.connect.smt.header.Utils.getDateFormat(fromFormatPattern, UTC.toZoneId()); + DateTimeFormatter toPattern = + io.lenses.connect.smt.header.Utils.getDateFormat(toFormatPattern, timeZone.toZoneId()); String[] fields = fieldConfig.split("\\."); if (fields.length > 0) { @@ -494,6 +514,7 @@ public void configure(Map configs) { "TimestampConverter requires a window size to be specified " + "when using rolling window timestamps."); } + config = new Config( fields, @@ -504,7 +525,8 @@ public void configure(Map configs) { toFormatPattern, unixPrecision, header, - rollingWindowDetails); + rollingWindowDetails, + timeZone); } @Override diff --git a/src/main/java/io/lenses/connect/smt/header/Utils.java b/src/main/java/io/lenses/connect/smt/header/Utils.java index 4a5bdfd..36ed260 100644 --- a/src/main/java/io/lenses/connect/smt/header/Utils.java +++ b/src/main/java/io/lenses/connect/smt/header/Utils.java @@ -10,6 +10,7 @@ */ package io.lenses.connect.smt.header; +import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.TimeZone; import org.apache.kafka.common.config.ConfigException; @@ -17,14 +18,14 @@ class Utils { private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); - static DateTimeFormatter getDateFormat(String formatPattern) { + static DateTimeFormatter getDateFormat(String formatPattern, ZoneId zoneId) { if (formatPattern == null) { return null; } DateTimeFormatter format = null; if (!isBlank(formatPattern)) { try { - format = DateTimeFormatter.ofPattern(formatPattern).withZone(UTC.toZoneId()); + format = DateTimeFormatter.ofPattern(formatPattern).withZone(zoneId); } catch (IllegalArgumentException e) { throw new ConfigException( "TimestampConverter requires a DateTimeFormatter-compatible pattern " diff --git a/src/test/java/io/lenses/connect/smt/header/InsertRollingWallclockTest.java b/src/test/java/io/lenses/connect/smt/header/InsertRollingWallclockTest.java index 821c2ba..87d0bcb 100644 --- a/src/test/java/io/lenses/connect/smt/header/InsertRollingWallclockTest.java +++ b/src/test/java/io/lenses/connect/smt/header/InsertRollingWallclockTest.java @@ -11,6 +11,7 @@ package io.lenses.connect.smt.header; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; import java.time.Instant; import java.time.LocalDateTime; @@ -19,6 +20,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.header.Headers; @@ -105,6 +107,55 @@ public void testRollingWindowEvery15Minutes() { }); } + @Test + public void testRollingWindowEvery15MinutesAndTimezoneIsParis() { + ArrayList> scenarios = new ArrayList<>(); + + scenarios.add(new Tuple3<>(("2020-01-01T01:00:00.999Z"), 15, "2020-01-01 02:00")); + scenarios.add(new Tuple3<>(("2020-01-01T01:00:01.000Z"), 15, "2020-01-01 02:00")); + scenarios.add(new Tuple3<>(("2020-01-01T01:14:59.000Z"), 15, "2020-01-01 02:00")); + scenarios.add(new Tuple3<>(("2020-01-01T01:15:00.000Z"), 15, "2020-01-01 02:15")); + scenarios.add(new Tuple3<>(("2020-01-01T01:15:01.000Z"), 15, "2020-01-01 02:15")); + scenarios.add(new Tuple3<>(("2020-01-01T01:29:59.000Z"), 15, "2020-01-01 02:15")); + scenarios.add(new Tuple3<>(("2020-01-01T01:30:00.000Z"), 15, "2020-01-01 02:30")); + scenarios.add(new Tuple3<>(("2020-01-01T01:30:01.000Z"), 15, "2020-01-01 02:30")); + scenarios.add(new Tuple3<>(("2020-01-01T01:44:59.000Z"), 15, "2020-01-01 02:30")); + scenarios.add(new Tuple3<>(("2020-01-01T01:45:00.000Z"), 15, "2020-01-01 02:45")); + scenarios.add(new Tuple3<>(("2020-01-01T01:45:01.000Z"), 15, "2020-01-01 02:45")); + scenarios.add(new Tuple3<>(("2020-01-01T01:59:59.000Z"), 15, "2020-01-01 02:45")); + + scenarios.forEach( + scenario -> { + Map configs = new HashMap<>(); + configs.put("header.name", "wallclock"); + configs.put("value.type", "format"); + configs.put("format", "yyyy-MM-dd HH:mm"); + configs.put("window.size", scenario.second.toString()); + configs.put("window.type", "minutes"); + configs.put("timezone", "Europe/Paris"); + final InsertRollingWallclock transformer = new InsertRollingWallclock<>(); + transformer.configure(configs); + transformer.setInstantF(() -> Instant.parse(scenario.first)); + + final Headers headers = new ConnectHeaders(); + final SourceRecord record = + new SourceRecord( + null, + null, + "topic", + 0, + Schema.STRING_SCHEMA, + "key", + Schema.STRING_SCHEMA, + "value", + Instant.parse(scenario.first).toEpochMilli(), + headers); + final SourceRecord transformed = transformer.apply(record); + final String actual = transformed.headers().lastWithName("wallclock").value().toString(); + assertEquals(actual, scenario.third); + }); + } + @Test public void testRollingWindowEvery5Minutes() { ArrayList> scenarios = new ArrayList<>(); @@ -445,6 +496,27 @@ public void testRollingWindowEvery12Seconds() { }); } + @Test + public void testRaiseExceptionWhenTimezoneIsNotUTCAndFormatIsEpoch() { + Map configs = new HashMap<>(); + configs.put("header.name", "wallclock"); + configs.put("value.type", "epoch"); + configs.put("format", "yyyy-MM-dd HH:mm"); + configs.put("rolling.window.size", "15"); + configs.put("rolling.window.type", "minutes"); + configs.put("timezone", "Europe/Paris"); + final InsertRollingWallclock transformer = new InsertRollingWallclock<>(); + try { + transformer.configure(configs); + } catch (ConfigException e) { + assertEquals( + "Configuration 'timezone' is not allowed to be set to a value other than UTC when 'value.type' is set to 'epoch'.", + e.getMessage()); + } catch (Exception e) { + fail("Expected ConfigException but got " + e.getClass().getName()); + } + } + private static String convertToEpochMillis(String date, String pattern) { DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern); LocalDateTime localDateTime = LocalDateTime.parse(date, formatter); diff --git a/src/test/java/io/lenses/connect/smt/header/InsertWallclockDateTimePartTest.java b/src/test/java/io/lenses/connect/smt/header/InsertWallclockDateTimePartTest.java index 88d75df..da9d7e4 100644 --- a/src/test/java/io/lenses/connect/smt/header/InsertWallclockDateTimePartTest.java +++ b/src/test/java/io/lenses/connect/smt/header/InsertWallclockDateTimePartTest.java @@ -129,6 +129,60 @@ public void testInsertHour() { assertEquals("13", transformed.headers().lastWithName("wallclock").value()); } + @Test + public void testInsertHourAndTimezoneIsKalkota() { + InsertWallclockDateTimePart transformer = new InsertWallclockDateTimePart<>(); + Map configs = new HashMap<>(); + configs.put("header.name", "wallclock"); + configs.put("date.time.part", "hour"); + configs.put("timezone", "Asia/Kolkata"); + transformer.configure(configs); + transformer.setInstantF(() -> Instant.parse("2020-01-01T13:00:00.000Z")); + + final Headers headers = new ConnectHeaders(); + final SourceRecord record = + new SourceRecord( + null, + null, + "topic", + 0, + Schema.STRING_SCHEMA, + "key", + Schema.STRING_SCHEMA, + "value", + System.currentTimeMillis(), + headers); + final SourceRecord transformed = transformer.apply(record); + assertEquals("18", transformed.headers().lastWithName("wallclock").value()); + } + + @Test + public void testInsertYearAndTimezoneIsKalkota() { + InsertWallclockDateTimePart transformer = new InsertWallclockDateTimePart<>(); + Map configs = new HashMap<>(); + configs.put("header.name", "wallclock"); + configs.put("date.time.part", "year"); + configs.put("timezone", "Asia/Kolkata"); + transformer.configure(configs); + transformer.setInstantF(() -> Instant.parse("2020-12-31T23:00:00.000Z")); + + final Headers headers = new ConnectHeaders(); + final SourceRecord record = + new SourceRecord( + null, + null, + "topic", + 0, + Schema.STRING_SCHEMA, + "key", + Schema.STRING_SCHEMA, + "value", + System.currentTimeMillis(), + headers); + final SourceRecord transformed = transformer.apply(record); + assertEquals("2021", transformed.headers().lastWithName("wallclock").value()); + } + @Test public void testInsertMinute() { InsertWallclockDateTimePart transformer = new InsertWallclockDateTimePart<>(); diff --git a/src/test/java/io/lenses/connect/smt/header/InsertWallclockTest.java b/src/test/java/io/lenses/connect/smt/header/InsertWallclockTest.java index 2ffffb6..4ca7d14 100644 --- a/src/test/java/io/lenses/connect/smt/header/InsertWallclockTest.java +++ b/src/test/java/io/lenses/connect/smt/header/InsertWallclockTest.java @@ -58,6 +58,23 @@ public void testApplyWithEpochValue() { transformedHeaders.iterator().next().value(), String.valueOf(instant.toEpochMilli())); } + @Test + public void testRaiseConfigExceptionIfTimezoneIsNotUTCAndEpochModeIsSet() { + Map configs = new HashMap<>(); + configs.put("header.name", "wallclock"); + configs.put("value.type", "epoch"); + configs.put("timezone", "Europe/Paris"); + InsertWallclock transformer = new InsertWallclock<>(); + try { + transformer.configure(configs); + fail("It should have raised a ConfigException"); + } catch (Exception e) { + assertEquals( + e.getMessage(), + "Configuration 'timezone' must be set to 'UTC' when 'value.type' is set to 'epoch'."); + } + } + @Test public void testApplyWithStringValue() { Map configs = new HashMap<>(); @@ -130,6 +147,43 @@ public void testApplyWithStringValueAndCustomFormatter() { assertEquals(transformedHeaders.iterator().next().value(), "2020-01-01 00:00:00"); } + @Test + public void testApplyWithStringValueAndCustomFormatterAndTimezoneIsKolkata() { + Map configs = new HashMap<>(); + configs.put("header.name", "wallclock"); + configs.put("value.type", "format"); + configs.put("format", "yyyy-MM-dd HH:mm:ss"); + configs.put("timezone", "Asia/Kolkata"); + InsertWallclock transformer = new InsertWallclock<>(); + transformer.configure(configs); + final Instant instant = Instant.parse("2020-01-01T00:00:00.000Z"); + transformer.setInstantF(() -> instant); + + // Create a source record + Headers headers = new ConnectHeaders(); + SourceRecord record = + new SourceRecord( + null, + null, + "topic", + 0, + Schema.STRING_SCHEMA, + "value", + null, + null, + System.currentTimeMillis(), + headers); + + // Apply the transformation + SourceRecord transformedRecord = transformer.apply(record); + + // Verify the header is inserted + Headers transformedHeaders = transformedRecord.headers(); + assertEquals(1, transformedHeaders.size()); + assertEquals(transformedHeaders.iterator().next().key(), "wallclock"); + assertEquals(transformedHeaders.iterator().next().value(), "2020-01-01 05:30:00"); + } + @Test public void testIgnoreFormatIfEpochModeIsSet() { Map configs = new HashMap<>(); diff --git a/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java b/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java index 542f44c..68b8dab 100644 --- a/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java +++ b/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java @@ -84,14 +84,14 @@ public class TimestampConverterTest { // Configuration @Test - void testConfigNoTargetType() { + public void testConfigNoTargetType() { TimestampConverter transformer = new TimestampConverter<>(); assertThrows( ConfigException.class, () -> transformer.configure(Collections.emptyMap())); } @Test - void testConfigInvalidTargetType() { + public void testConfigInvalidTargetType() { TimestampConverter transformer = new TimestampConverter<>(); assertThrows( ConfigException.class, @@ -101,7 +101,7 @@ void testConfigInvalidTargetType() { } @Test - void testConfigInvalidUnixPrecision() { + public void testConfigInvalidUnixPrecision() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix"); config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "invalid"); @@ -110,7 +110,7 @@ void testConfigInvalidUnixPrecision() { } @Test - void testConfigValidUnixPrecision() { + public void testConfigValidUnixPrecision() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix"); config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "seconds"); @@ -120,7 +120,7 @@ void testConfigValidUnixPrecision() { } @Test - void testConfigMissingFormat() { + public void testConfigMissingFormat() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); @@ -129,7 +129,7 @@ void testConfigMissingFormat() { } @Test - void testConfigInvalidFormat() { + public void testConfigInvalidFormat() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string"); config.put(TimestampConverter.FORMAT_TO_CONFIG, "bad-format"); @@ -141,7 +141,7 @@ void testConfigInvalidFormat() { // Conversions without schemas (most flexible Timestamp -> other types) @Test - void testSchemalessIdentity() { + public void testSchemalessIdentity() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); @@ -157,7 +157,7 @@ void testSchemalessIdentity() { } @Test - void testSchemalessTimestampToDate() { + public void testSchemalessTimestampToDate() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Date"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); @@ -173,7 +173,29 @@ void testSchemalessTimestampToDate() { } @Test - void testSchemalessTimestampToTime() { + public void testSchemalessTimestampToDateOnNonUTC() { + Map config = new HashMap<>(); + config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Date"); + config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); + config.put(TimestampConverter.TARGET_TIMEZONE_CONFIG, "America/Chicago"); + + final TimestampConverter transformer = new TimestampConverter<>(); + transformer.configure(config); + System.out.println("DATE_PLUS_TIME: " + DATE_PLUS_TIME.getTime()); + SourceRecord transformed = transformer.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime())); + + Header header = transformed.headers().lastWithName("ts_header"); + assertNotNull(header); + assertEquals(Date.SCHEMA.type(), header.schema().type()); + + Calendar expected = GregorianCalendar.getInstance(UTC); + expected.setTimeInMillis(0L); + expected.set(1970, Calendar.JANUARY, 1, 0, 0, 0); + assertEquals(expected.getTime(), header.value()); + } + + @Test + public void testSchemalessTimestampToTime() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Time"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); @@ -189,7 +211,30 @@ void testSchemalessTimestampToTime() { } @Test - void testSchemalessTimestampToUnix() { + public void testSchemalessTimestampToTimeNonUtc() { + Map config = new HashMap<>(); + config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Time"); + config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); + config.put(TimestampConverter.TARGET_TIMEZONE_CONFIG, "America/Chicago"); + + final TimestampConverter transformer = new TimestampConverter<>(); + transformer.configure(config); + SourceRecord transformed = transformer.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime())); + + Header header = transformed.headers().lastWithName("ts_header"); + assertNotNull(header); + assertEquals(Time.SCHEMA.type(), header.schema().type()); + + Calendar expected = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + expected.setTimeInMillis(0L); + expected.add(Calendar.HOUR, 18); + expected.add(Calendar.MILLISECOND, 1234); + java.util.Date computedValue = (java.util.Date) header.value(); + assertEquals(expected.getTime(), computedValue); + } + + @Test + public void testSchemalessTimestampToUnix() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); @@ -205,7 +250,7 @@ void testSchemalessTimestampToUnix() { } @Test - void testSchemalessTimestampToString() { + public void testSchemalessTimestampToString() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string"); config.put(TimestampConverter.FORMAT_TO_CONFIG, STRING_DATE_FMT); @@ -220,10 +265,27 @@ void testSchemalessTimestampToString() { assertEquals(DATE_PLUS_TIME_STRING, header.value()); } + @Test + public void testSchemalessTimestampToStringTargeting() { + Map config = new HashMap<>(); + config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string"); + config.put(TimestampConverter.FORMAT_TO_CONFIG, STRING_DATE_FMT); + config.put(TimestampConverter.HEADER_NAME_CONFIG, "str_header"); + config.put(TimestampConverter.TARGET_TIMEZONE_CONFIG, "America/Chicago"); + TimestampConverter transformer = new TimestampConverter<>(); + transformer.configure(config); + SourceRecord transformed = transformer.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime())); + + Header header = transformed.headers().lastWithName("str_header"); + assertNotNull(header); + + assertEquals("1970 01 01 18 00 01 234 CST", header.value()); + } + // Conversions without schemas (core types -> most flexible Timestamp format) @Test - void testSchemalessDateToTimestamp() { + public void testSchemalessDateToTimestamp() { final Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); @@ -239,7 +301,7 @@ void testSchemalessDateToTimestamp() { } @Test - void testSchemalessTimeToTimestamp() { + public void testSchemalessTimeToTimestamp() { final Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); @@ -255,7 +317,7 @@ void testSchemalessTimeToTimestamp() { } @Test - void testSchemalessUnixToTimestamp() { + public void testSchemalessUnixToTimestamp() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); @@ -271,7 +333,7 @@ void testSchemalessUnixToTimestamp() { } @Test - void testSchemalessUnixAsStringToTimestamp() { + public void testSchemalessUnixAsStringToTimestamp() { TimestampConverter transformer = new TimestampConverter<>(); Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); @@ -287,7 +349,7 @@ void testSchemalessUnixAsStringToTimestamp() { } @Test - void testSchemalessStringToTimestamp() { + public void testSchemalessStringToTimestamp() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FORMAT_FROM_CONFIG, STRING_DATE_FMT); @@ -305,7 +367,7 @@ void testSchemalessStringToTimestamp() { // Conversions with schemas (most flexible Timestamp -> other types) @Test - void testWithSchemaIdentity() { + public void testWithSchemaIdentity() { final TimestampConverter transformer = new TimestampConverter<>(); final Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); @@ -321,7 +383,7 @@ void testWithSchemaIdentity() { } @Test - void testWithSchemaTimestampToDate() { + public void testWithSchemaTimestampToDate() { final TimestampConverter transformer = new TimestampConverter<>(); Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Date"); @@ -337,7 +399,7 @@ void testWithSchemaTimestampToDate() { } @Test - void testWithSchemaTimestampToTime() { + public void testWithSchemaTimestampToTime() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Time"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "tm_header"); @@ -354,7 +416,7 @@ void testWithSchemaTimestampToTime() { } @Test - void testWithSchemaTimestampToUnix() { + public void testWithSchemaTimestampToUnix() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "unix_header"); @@ -371,7 +433,7 @@ void testWithSchemaTimestampToUnix() { } @Test - void testWithSchemaTimestampToString() { + public void testWithSchemaTimestampToString() { final Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string"); config.put(TimestampConverter.FORMAT_TO_CONFIG, STRING_DATE_FMT); @@ -390,31 +452,31 @@ void testWithSchemaTimestampToString() { // Null-value conversions schemaless @Test - void testSchemalessNullValueToString() { + public void testSchemalessNullValueToString() { testSchemalessNullValueConversion("string"); testSchemalessNullFieldConversion("string"); } @Test - void testSchemalessNullValueToDate() { + public void testSchemalessNullValueToDate() { testSchemalessNullValueConversion("Date"); testSchemalessNullFieldConversion("Date"); } @Test - void testSchemalessNullValueToTimestamp() { + public void testSchemalessNullValueToTimestamp() { testSchemalessNullValueConversion("Timestamp"); testSchemalessNullFieldConversion("Timestamp"); } @Test - void testSchemalessNullValueToUnix() { + public void testSchemalessNullValueToUnix() { testSchemalessNullValueConversion("unix"); testSchemalessNullFieldConversion("unix"); } @Test - void testSchemalessNullValueToTime() { + public void testSchemalessNullValueToTime() { testSchemalessNullValueConversion("Time"); testSchemalessNullFieldConversion("Time"); } @@ -453,7 +515,7 @@ private void testSchemalessNullFieldConversion(String targetType) { // Conversions with schemas (core types -> most flexible Timestamp format) @Test - void testWithSchemaDateToTimestamp() { + public void testWithSchemaDateToTimestamp() { final Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.HEADER_NAME_CONFIG, "ts_header"); @@ -470,7 +532,7 @@ void testWithSchemaDateToTimestamp() { } @Test - void testWithSchemaTimeToTimestamp() { + public void testWithSchemaTimeToTimestamp() { final TimestampConverter transformer = new TimestampConverter<>(); final Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); @@ -486,7 +548,7 @@ void testWithSchemaTimeToTimestamp() { } @Test - void testWithSchemaUnixToTimestamp() { + public void testWithSchemaUnixToTimestamp() { final TimestampConverter transformer = new TimestampConverter<>(); final Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); @@ -502,7 +564,7 @@ void testWithSchemaUnixToTimestamp() { } @Test - void testWithSchemaStringToTimestamp() { + public void testWithSchemaStringToTimestamp() { final Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FORMAT_FROM_CONFIG, STRING_DATE_FMT); @@ -521,7 +583,7 @@ void testWithSchemaStringToTimestamp() { // Null-value conversions with schema @Test - void testWithSchemaNullValueToTimestamp() { + public void testWithSchemaNullValueToTimestamp() { testWithSchemaNullValueConversion( "Timestamp", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA); testWithSchemaNullValueConversion( @@ -541,7 +603,7 @@ void testWithSchemaNullValueToTimestamp() { } @Test - void testWithSchemaNullFieldToTimestamp() { + public void testWithSchemaNullFieldToTimestamp() { testWithSchemaNullFieldConversion( "Timestamp", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA); testWithSchemaNullFieldConversion( @@ -561,7 +623,7 @@ void testWithSchemaNullFieldToTimestamp() { } @Test - void testWithSchemaNullValueToUnix() { + public void testWithSchemaNullValueToUnix() { testWithSchemaNullValueConversion( "unix", Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA); testWithSchemaNullValueConversion( @@ -575,7 +637,7 @@ void testWithSchemaNullValueToUnix() { } @Test - void testWithSchemaNullFieldToUnix() { + public void testWithSchemaNullFieldToUnix() { testWithSchemaNullFieldConversion( "unix", Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA); testWithSchemaNullFieldConversion( @@ -589,7 +651,7 @@ void testWithSchemaNullFieldToUnix() { } @Test - void testWithSchemaNullValueToTime() { + public void testWithSchemaNullValueToTime() { testWithSchemaNullValueConversion( "Time", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA); testWithSchemaNullValueConversion( @@ -605,7 +667,7 @@ void testWithSchemaNullValueToTime() { } @Test - void testWithSchemaNullFieldToTime() { + public void testWithSchemaNullFieldToTime() { testWithSchemaNullFieldConversion( "Time", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA); testWithSchemaNullFieldConversion( @@ -621,7 +683,7 @@ void testWithSchemaNullFieldToTime() { } @Test - void testWithSchemaNullValueToDate() { + public void testWithSchemaNullValueToDate() { testWithSchemaNullValueConversion( "Date", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA); testWithSchemaNullValueConversion( @@ -637,7 +699,7 @@ void testWithSchemaNullValueToDate() { } @Test - void testWithSchemaNullFieldToDate() { + public void testWithSchemaNullFieldToDate() { testWithSchemaNullFieldConversion( "Date", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA); testWithSchemaNullFieldConversion( @@ -653,7 +715,7 @@ void testWithSchemaNullFieldToDate() { } @Test - void testWithSchemaNullValueToString() { + public void testWithSchemaNullValueToString() { testWithSchemaNullValueConversion( "string", Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA); testWithSchemaNullValueConversion( @@ -667,7 +729,7 @@ void testWithSchemaNullValueToString() { } @Test - void testWithSchemaNullFieldToString() { + public void testWithSchemaNullFieldToString() { testWithSchemaNullFieldConversion( "string", Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA); testWithSchemaNullFieldConversion( @@ -733,7 +795,7 @@ private void testWithSchemaNullFieldConversion( // Convert field instead of entire key/value @Test - void testSchemalessFieldConversion() { + public void testSchemalessFieldConversion() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Date"); config.put(TimestampConverter.FIELD_CONFIG, "ts"); @@ -751,7 +813,7 @@ void testSchemalessFieldConversion() { } @Test - void testWithSchemaFieldConversion() { + public void testWithSchemaFieldConversion() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "ts"); @@ -779,7 +841,7 @@ void testWithSchemaFieldConversion() { } @Test - void testWithSchemaFieldConversion_Micros() { + public void testWithSchemaFieldConversion_Micros() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "ts"); @@ -804,7 +866,7 @@ void testWithSchemaFieldConversion_Micros() { } @Test - void testWithSchemaFieldConversion_Nanos() { + public void testWithSchemaFieldConversion_Nanos() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "ts"); @@ -829,7 +891,7 @@ void testWithSchemaFieldConversion_Nanos() { } @Test - void testWithSchemaFieldConversion_Seconds() { + public void testWithSchemaFieldConversion_Seconds() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "ts"); @@ -858,7 +920,7 @@ void testWithSchemaFieldConversion_Seconds() { } @Test - void testWithSchemaValuePrefixedFieldConversion_Seconds() { + public void testWithSchemaValuePrefixedFieldConversion_Seconds() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "_value.ts"); @@ -887,7 +949,7 @@ void testWithSchemaValuePrefixedFieldConversion_Seconds() { } @Test - void testWithRecordMetadataPrefixedFieldConversion_Seconds() { + public void testWithRecordMetadataPrefixedFieldConversion_Seconds() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "_timestamp"); @@ -926,7 +988,7 @@ void testWithRecordMetadataPrefixedFieldConversion_Seconds() { } @Test - void testRaiseExceptionIfTimestampMetadataIsUsedWithAPath() { + public void testRaiseExceptionIfTimestampMetadataIsUsedWithAPath() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "_timestamp.incorrect.path"); @@ -947,7 +1009,7 @@ void testRaiseExceptionIfTimestampMetadataIsUsedWithAPath() { } @Test - void testWithSchemaKeyPrefixedFieldConversion_Seconds() { + public void testWithSchemaKeyPrefixedFieldConversion_Seconds() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "_key.ts"); @@ -978,7 +1040,7 @@ void testWithSchemaKeyPrefixedFieldConversion_Seconds() { } @Test - void testWithSchemaNestedFieldConversion_Seconds() { + public void testWithSchemaNestedFieldConversion_Seconds() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "level1.ts"); @@ -1011,7 +1073,7 @@ void testWithSchemaNestedFieldConversion_Seconds() { } @Test - void testWithSchemaNestedKeyFieldConversion_Seconds() { + public void testWithSchemaNestedKeyFieldConversion_Seconds() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "_key.level1.ts"); @@ -1046,7 +1108,7 @@ void testWithSchemaNestedKeyFieldConversion_Seconds() { } @Test - void testSchemalessStringToUnix_Micros() { + public void testSchemalessStringToUnix_Micros() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix"); config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "microseconds"); @@ -1063,7 +1125,7 @@ void testSchemalessStringToUnix_Micros() { } @Test - void testSchemalessStringToUnix_Nanos() { + public void testSchemalessStringToUnix_Nanos() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix"); config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "nanoseconds"); @@ -1080,7 +1142,7 @@ void testSchemalessStringToUnix_Nanos() { } @Test - void testSchemalessStringToUnix_Seconds() { + public void testSchemalessStringToUnix_Seconds() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "unix"); config.put(TimestampConverter.UNIX_PRECISION_CONFIG, "seconds"); @@ -1099,7 +1161,7 @@ void testSchemalessStringToUnix_Seconds() { // Validate Key implementation in addition to Value @Test - void testKey() { + public void testKey() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "_key"); @@ -1117,7 +1179,7 @@ void testKey() { } @Test - void testWithSchemaNestedKeyFieldConversion15SecondsWindow() { + public void testWithSchemaNestedKeyFieldConversion15SecondsWindow() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "_key.level1.ts"); @@ -1155,7 +1217,7 @@ void testWithSchemaNestedKeyFieldConversion15SecondsWindow() { } @Test - void testWithSchemaNestedKeyFieldConversion2HoursTimestampWindow() { + public void testWithSchemaNestedKeyFieldConversion2HoursTimestampWindow() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "_key.level1.ts"); @@ -1193,7 +1255,7 @@ void testWithSchemaNestedKeyFieldConversion2HoursTimestampWindow() { } @Test - void testWithSchemaNestedKeyFieldConversion2HoursStringWindow() { + public void testWithSchemaNestedKeyFieldConversion2HoursStringWindow() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string"); @@ -1235,7 +1297,7 @@ void testWithSchemaNestedKeyFieldConversion2HoursStringWindow() { } @Test - void testWithSchemaNestedKeyFieldConversion2HoursStringWindowWhenSourceIsString() { + public void testWithSchemaNestedKeyFieldConversion2HoursStringWindowWhenSourceIsString() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string"); @@ -1277,7 +1339,7 @@ void testWithSchemaNestedKeyFieldConversion2HoursStringWindowWhenSourceIsString( } @Test - void testWithSchemaNestedKeyFieldConversion10MinutesWindow() { + public void testWithSchemaNestedKeyFieldConversion10MinutesWindow() { Map config = new HashMap<>(); config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"); config.put(TimestampConverter.FIELD_CONFIG, "_key.level1.ts"); From 5194eb82ef6c5241a4c6762d7e690dfa433daa46 Mon Sep 17 00:00:00 2001 From: stheppi Date: Fri, 23 Feb 2024 14:32:37 +0000 Subject: [PATCH 2/9] Address the PR comments/suggestions --- .../lenses/connect/smt/header/Constants.java | 4 ++- .../smt/header/InsertRollingWallclock.java | 16 +++------ .../connect/smt/header/InsertWallclock.java | 11 ++----- .../header/InsertWallclockDateTimePart.java | 33 ++++++++----------- .../smt/header/TimestampConverter.java | 17 ++++------ .../io/lenses/connect/smt/header/Utils.java | 2 -- .../InsertWallclockDateTimePartTest.java | 16 ++++----- .../smt/header/TimestampConverterTest.java | 4 +-- 8 files changed, 42 insertions(+), 61 deletions(-) diff --git a/src/main/java/io/lenses/connect/smt/header/Constants.java b/src/main/java/io/lenses/connect/smt/header/Constants.java index c5ca3ab..aebebcd 100644 --- a/src/main/java/io/lenses/connect/smt/header/Constants.java +++ b/src/main/java/io/lenses/connect/smt/header/Constants.java @@ -1,7 +1,9 @@ package io.lenses.connect.smt.header; import java.time.ZoneId; +import java.util.TimeZone; public class Constants { - public static final ZoneId UTC = ZoneId.of("UTC"); + public static final TimeZone UTC = TimeZone.getTimeZone("UTC"); + public static final ZoneId UTC_ZONE_ID = UTC.toZoneId(); } diff --git a/src/main/java/io/lenses/connect/smt/header/InsertRollingWallclock.java b/src/main/java/io/lenses/connect/smt/header/InsertRollingWallclock.java index 1324b52..736861b 100644 --- a/src/main/java/io/lenses/connect/smt/header/InsertRollingWallclock.java +++ b/src/main/java/io/lenses/connect/smt/header/InsertRollingWallclock.java @@ -46,7 +46,6 @@ public class InsertRollingWallclock> implements Trans private static final int DEFAULT_ROLLING_WINDOW_VALUE = 15; private static final RollingWindow DEFAULT_ROLLING_WINDOW = RollingWindow.MINUTES; private static final DateTimeFormatter DEFAULT_FORMATTER = DateTimeFormatter.ISO_LOCAL_DATE_TIME; - private ZoneId timezone = Constants.UTC; private interface ConfigName { String HEADER_NAME_CONFIG = "header.name"; @@ -156,15 +155,10 @@ public void configure(Map props) { + "' must be set to either 'epoch' or 'format'."); } final String timezoneStr = config.getString(ConfigName.TIMEZONE_CONFIG); - try { - this.timezone = TimeZone.getTimeZone(timezoneStr).toZoneId(); - } catch (Exception e) { - throw new ConfigException( - "Configuration '" - + ConfigName.TIMEZONE_CONFIG - + "' is not a valid timezone. It can be any valid java timezone."); - } - if (!this.timezone.getId().equals(Constants.UTC.getId()) + // getTimeZone docs: the specified TimeZone, or the GMT zone if the given ID cannot be + // understood. + ZoneId zoneId = TimeZone.getTimeZone(timezoneStr).toZoneId(); + if (!zoneId.getId().equals(Constants.UTC_ZONE_ID.getId()) && valueType.equalsIgnoreCase(ConfigName.VALUE_TYPE_EPOCH)) { throw new ConfigException( "Configuration '" @@ -186,7 +180,7 @@ public void configure(Map props) { } } valueExtractorF = this::getFormattedValue; - format = format.withZone(timezone); + format = format.withZone(zoneId); } else { valueExtractorF = this::getEpochValue; } diff --git a/src/main/java/io/lenses/connect/smt/header/InsertWallclock.java b/src/main/java/io/lenses/connect/smt/header/InsertWallclock.java index 4763753..c04157a 100644 --- a/src/main/java/io/lenses/connect/smt/header/InsertWallclock.java +++ b/src/main/java/io/lenses/connect/smt/header/InsertWallclock.java @@ -60,7 +60,6 @@ public class InsertWallclock> implements Transformati private Supplier instantF = Instant::now; - private TimeZone timeZone = TimeZone.getTimeZone("UTC"); private static final DateTimeFormatter DEFAULT_FORMATTER = DateTimeFormatter.ISO_LOCAL_DATE_TIME; /** @@ -166,12 +165,8 @@ public void configure(Map props) { + "' must be set to either 'epoch' or 'format'."); } final String timezoneStr = config.getString(ConfigName.TIMEZONE); - try { - timeZone = TimeZone.getTimeZone(timezoneStr); - } catch (IllegalArgumentException e) { - throw new ConfigException( - "Configuration '" + ConfigName.TIMEZONE + "' is not a valid timezone."); - } + TimeZone timeZone = TimeZone.getTimeZone(timezoneStr); + if (valueType.equalsIgnoreCase(ConfigName.VALUE_TYPE_FORMAT)) { final String pattern = config.getString(ConfigName.FORMAT); if (pattern == null) { @@ -187,7 +182,7 @@ public void configure(Map props) { format = format.withZone(timeZone.toZoneId()); valueExtractorF = this::getFormattedValue; } else { - if (!timeZone.getID().equals(Constants.UTC.getId())) { + if (!timeZone.getID().equals(Constants.UTC_ZONE_ID.getId())) { throw new ConfigException( "Configuration '" + ConfigName.TIMEZONE diff --git a/src/main/java/io/lenses/connect/smt/header/InsertWallclockDateTimePart.java b/src/main/java/io/lenses/connect/smt/header/InsertWallclockDateTimePart.java index d33f54e..96ebab8 100644 --- a/src/main/java/io/lenses/connect/smt/header/InsertWallclockDateTimePart.java +++ b/src/main/java/io/lenses/connect/smt/header/InsertWallclockDateTimePart.java @@ -37,14 +37,14 @@ public class InsertWallclockDateTimePart> implements private String headerName; // Used for testing only to inject the instant value - private Supplier instantF = Instant::now; + private Supplier instantSupplier = Instant::now; private ZoneId timeZone = ZoneId.of("UTC"); - private Function valueExtractorF; + private Function datePartExtractor; - void setInstantF(Supplier instantF) { - this.instantF = instantF; + void setInstantSupplier(Supplier instantSupplier) { + this.instantSupplier = instantSupplier; } public static ConfigDef CONFIG_DEF = @@ -92,10 +92,10 @@ public R apply(R r) { return null; } - Instant now = instantF.get(); + Instant now = instantSupplier.get(); final ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(now, timeZone); - final String value = valueExtractorF.apply(zonedDateTime); - r.headers().addString(headerName, value); + final String extractedDatePart = datePartExtractor.apply(zonedDateTime); + r.headers().addString(headerName, extractedDatePart); return r; } @@ -111,12 +111,7 @@ public void close() {} public void configure(Map props) { final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); final String timeZoneStr = config.getString(ConfigName.TIMEZONE); - try { - timeZone = TimeZone.getTimeZone(timeZoneStr).toZoneId(); - } catch (IllegalArgumentException e) { - throw new ConfigException( - "Configuration '" + ConfigName.TIMEZONE + "' is not a valid timezone."); - } + timeZone = TimeZone.getTimeZone(timeZoneStr).toZoneId(); headerName = config.getString(ConfigName.HEADER_NAME); DateTimePart dateTimePart; try { @@ -138,22 +133,22 @@ public void configure(Map props) { // initialize the value extractor switch (dateTimePart) { case YEAR: - valueExtractorF = InsertWallclockDateTimePart::getYear; + datePartExtractor = InsertWallclockDateTimePart::getYear; break; case MONTH: - valueExtractorF = InsertWallclockDateTimePart::getMonth; + datePartExtractor = InsertWallclockDateTimePart::getMonth; break; case DAY: - valueExtractorF = InsertWallclockDateTimePart::getDay; + datePartExtractor = InsertWallclockDateTimePart::getDay; break; case HOUR: - valueExtractorF = InsertWallclockDateTimePart::getHour; + datePartExtractor = InsertWallclockDateTimePart::getHour; break; case MINUTE: - valueExtractorF = InsertWallclockDateTimePart::getMinute; + datePartExtractor = InsertWallclockDateTimePart::getMinute; break; case SECOND: - valueExtractorF = InsertWallclockDateTimePart::getSecond; + datePartExtractor = InsertWallclockDateTimePart::getSecond; break; default: throw new IllegalStateException("Unexpected value: " + dateTimePart); diff --git a/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java b/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java index 75666ba..6f281a5 100644 --- a/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java +++ b/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java @@ -86,9 +86,6 @@ public final class TimestampConverter> implements Tra private static final String UNIX_PRECISION_NANOS = "nanoseconds"; private static final String UNIX_PRECISION_SECONDS = "seconds"; - private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); - private static final ZoneId UTC_ZONE_ID = UTC.toZoneId(); - public static final Schema OPTIONAL_DATE_SCHEMA = org.apache.kafka.connect.data.Date.builder().optional().schema(); public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().schema(); @@ -312,7 +309,7 @@ public Date toType(Config config, Date orig) { ZonedDateTime.of( orig.toInstant().atZone(config.targetTimeZoneId).toLocalDate(), LocalTime.MIDNIGHT, - UTC_ZONE_ID); + Constants.UTC_ZONE_ID); return Date.from(truncated.toInstant()); } @@ -341,7 +338,10 @@ public Schema typeSchema(boolean isOptional) { public Date toType(Config config, Date orig) { ZonedDateTime zonedDateTime = orig.toInstant().atZone(config.targetTimeZoneId); return Date.from( - ZonedDateTime.of(LocalDate.of(1970, 1, 1), zonedDateTime.toLocalTime(), UTC_ZONE_ID) + ZonedDateTime.of( + LocalDate.of(1970, 1, 1), + zonedDateTime.toLocalTime(), + Constants.UTC_ZONE_ID) .toInstant()); } }); @@ -457,17 +457,14 @@ public void configure(Map configs) { final String targetTimeZone = simpleConfig.getString(TARGET_TIMEZONE_CONFIG); TimeZone timeZone = TimeZone.getTimeZone(targetTimeZone); - if (timeZone == null) { - throw new ConfigException("Invalid timezone: " + targetTimeZone); - } - if (type.equals(TYPE_STRING) && isBlank(toFormatPattern)) { throw new ConfigException( "TimestampConverter requires format option to be specified " + "when using string timestamps"); } DateTimeFormatter fromPattern = - io.lenses.connect.smt.header.Utils.getDateFormat(fromFormatPattern, UTC.toZoneId()); + io.lenses.connect.smt.header.Utils.getDateFormat( + fromFormatPattern, Constants.UTC.toZoneId()); DateTimeFormatter toPattern = io.lenses.connect.smt.header.Utils.getDateFormat(toFormatPattern, timeZone.toZoneId()); diff --git a/src/main/java/io/lenses/connect/smt/header/Utils.java b/src/main/java/io/lenses/connect/smt/header/Utils.java index 36ed260..55f6ad1 100644 --- a/src/main/java/io/lenses/connect/smt/header/Utils.java +++ b/src/main/java/io/lenses/connect/smt/header/Utils.java @@ -12,11 +12,9 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; -import java.util.TimeZone; import org.apache.kafka.common.config.ConfigException; class Utils { - private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); static DateTimeFormatter getDateFormat(String formatPattern, ZoneId zoneId) { if (formatPattern == null) { diff --git a/src/test/java/io/lenses/connect/smt/header/InsertWallclockDateTimePartTest.java b/src/test/java/io/lenses/connect/smt/header/InsertWallclockDateTimePartTest.java index da9d7e4..17b4a51 100644 --- a/src/test/java/io/lenses/connect/smt/header/InsertWallclockDateTimePartTest.java +++ b/src/test/java/io/lenses/connect/smt/header/InsertWallclockDateTimePartTest.java @@ -32,7 +32,7 @@ public void testInsertYear() { configs.put("header.name", "wallclock"); configs.put("date.time.part", "year"); transformer.configure(configs); - transformer.setInstantF(() -> Instant.parse("2020-01-01T00:00:00.000Z")); + transformer.setInstantSupplier(() -> Instant.parse("2020-01-01T00:00:00.000Z")); final Headers headers = new ConnectHeaders(); final SourceRecord record = @@ -58,7 +58,7 @@ public void testInsertMonth() { configs.put("header.name", "wallclock"); configs.put("date.time.part", "month"); transformer.configure(configs); - transformer.setInstantF(() -> Instant.parse("2020-01-01T00:00:00.000Z")); + transformer.setInstantSupplier(() -> Instant.parse("2020-01-01T00:00:00.000Z")); final Headers headers = new ConnectHeaders(); final SourceRecord record = @@ -84,7 +84,7 @@ public void testInsertDay() { configs.put("header.name", "wallclock"); configs.put("date.time.part", "day"); transformer.configure(configs); - transformer.setInstantF(() -> Instant.parse("2020-01-13T00:00:00.000Z")); + transformer.setInstantSupplier(() -> Instant.parse("2020-01-13T00:00:00.000Z")); final Headers headers = new ConnectHeaders(); final SourceRecord record = @@ -110,7 +110,7 @@ public void testInsertHour() { configs.put("header.name", "wallclock"); configs.put("date.time.part", "hour"); transformer.configure(configs); - transformer.setInstantF(() -> Instant.parse("2020-01-01T13:00:00.000Z")); + transformer.setInstantSupplier(() -> Instant.parse("2020-01-01T13:00:00.000Z")); final Headers headers = new ConnectHeaders(); final SourceRecord record = @@ -137,7 +137,7 @@ public void testInsertHourAndTimezoneIsKalkota() { configs.put("date.time.part", "hour"); configs.put("timezone", "Asia/Kolkata"); transformer.configure(configs); - transformer.setInstantF(() -> Instant.parse("2020-01-01T13:00:00.000Z")); + transformer.setInstantSupplier(() -> Instant.parse("2020-01-01T13:00:00.000Z")); final Headers headers = new ConnectHeaders(); final SourceRecord record = @@ -164,7 +164,7 @@ public void testInsertYearAndTimezoneIsKalkota() { configs.put("date.time.part", "year"); configs.put("timezone", "Asia/Kolkata"); transformer.configure(configs); - transformer.setInstantF(() -> Instant.parse("2020-12-31T23:00:00.000Z")); + transformer.setInstantSupplier(() -> Instant.parse("2020-12-31T23:00:00.000Z")); final Headers headers = new ConnectHeaders(); final SourceRecord record = @@ -190,7 +190,7 @@ public void testInsertMinute() { configs.put("header.name", "wallclock"); configs.put("date.time.part", "minute"); transformer.configure(configs); - transformer.setInstantF(() -> Instant.parse("2020-01-01T00:13:00.000Z")); + transformer.setInstantSupplier(() -> Instant.parse("2020-01-01T00:13:00.000Z")); final Headers headers = new ConnectHeaders(); final SourceRecord record = @@ -216,7 +216,7 @@ public void testInsertSecond() { configs.put("header.name", "wallclock"); configs.put("date.time.part", "second"); transformer.configure(configs); - transformer.setInstantF(() -> Instant.parse("2020-01-01T00:00:13.000Z")); + transformer.setInstantSupplier(() -> Instant.parse("2020-01-01T00:00:13.000Z")); final Headers headers = new ConnectHeaders(); final SourceRecord record = diff --git a/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java b/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java index 68b8dab..8b0491b 100644 --- a/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java +++ b/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java @@ -40,7 +40,7 @@ /** Test for {@link TimestampConverter}. */ public class TimestampConverterTest { - private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); + private static final TimeZone UTC = Constants.UTC; private static final Calendar EPOCH; private static final Calendar TIME; private static final Calendar DATE; @@ -225,7 +225,7 @@ public void testSchemalessTimestampToTimeNonUtc() { assertNotNull(header); assertEquals(Time.SCHEMA.type(), header.schema().type()); - Calendar expected = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + Calendar expected = Calendar.getInstance(Constants.UTC); expected.setTimeInMillis(0L); expected.add(Calendar.HOUR, 18); expected.add(Calendar.MILLISECOND, 1234); From b36ec725a0dc0360b09309e533a5b16ed1d0d99a Mon Sep 17 00:00:00 2001 From: stheppi Date: Fri, 23 Feb 2024 14:48:45 +0000 Subject: [PATCH 3/9] separate release from build github actiions --- .github/workflows/build.yaml | 63 +----------------------- .github/workflows/release.yaml | 88 ++++++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 62 deletions(-) create mode 100644 .github/workflows/release.yaml diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 0f6ea48..101ba8e 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -3,11 +3,7 @@ name: CI/CD on: push: branches: - - main - - create: - tags: - - 'v*' + - * jobs: build: @@ -24,69 +20,12 @@ jobs: java-version: '11' distribution: 'temurin' - - name: Extract tag name - id: extract_tag - run: echo ::set-output name=TAG_NAME::${GITHUB_REF#refs/tags/} - - - name: Update Maven version - run: mvn versions:set -DnewVersion=${{ steps.extract_tag.outputs.TAG_NAME }} - - name: Check License run: mvn license:check - - name: Checkstyle - run: mvn checkstyle:checkstyle - - name: Build run: mvn clean package -B - name: Create JAR run: mvn jar:jar - release: - name: Create Release - needs: build - if: startsWith(github.ref, 'refs/tags/v') - - runs-on: ubuntu-latest - - steps: - - name: Checkout code - uses: actions/checkout@v3 - - - name: Set up JDK - uses: actions/setup-java@v3 - with: - java-version: '11' # Or the desired Java version - distribution: 'temurin' - - - name: Extract tag name - id: extract_tag - run: echo ::set-output name=TAG_NAME::${GITHUB_REF#refs/tags/} - - - name: Update Maven version - run: mvn versions:set -DnewVersion=${{ steps.extract_tag.outputs.TAG_NAME }} - - - name: Build Jar - run: mvn -B package --file pom.xml -DskipTests - - - name: Create Release - id: create_release - uses: actions/create-release@v1 - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - with: - tag_name: ${{ github.ref }} - release_name: Release ${{ github.ref }} - draft: false - prerelease: false - - - name: Upload JAR - uses: actions/upload-release-asset@v1 - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - with: - upload_url: ${{ steps.create_release.outputs.upload_url }} - asset_path: ./target/kafka-connect-smt-${{ steps.extract_tag.outputs.TAG_NAME }}.jar - asset_name: kafka-connect-smt-${{ steps.extract_tag.outputs.TAG_NAME }}.jar - asset_content_type: application/java-archive \ No newline at end of file diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml new file mode 100644 index 0000000..77fdc8e --- /dev/null +++ b/.github/workflows/release.yaml @@ -0,0 +1,88 @@ +name: CI/CD + +on: + create: + tags: + - 'v*' + +jobs: + build: + name: Build + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up JDK + uses: actions/setup-java@v3 + with: + java-version: '11' + distribution: 'temurin' + + - name: Extract tag name + id: extract_tag + run: echo ::set-output name=TAG_NAME::${GITHUB_REF#refs/tags/} + + - name: Update Maven version + run: mvn versions:set -DnewVersion=${{ steps.extract_tag.outputs.TAG_NAME }} + + - name: Check License + run: mvn license:check + + - name: Checkstyle + run: mvn checkstyle:checkstyle + + - name: Build + run: mvn clean package -B + + - name: Create JAR + run: mvn jar:jar + + release: + name: Create Release + needs: build + if: startsWith(github.ref, 'refs/tags/v') + + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up JDK + uses: actions/setup-java@v3 + with: + java-version: '11' # Or the desired Java version + distribution: 'temurin' + + - name: Extract tag name + id: extract_tag + run: echo ::set-output name=TAG_NAME::${GITHUB_REF#refs/tags/} + + - name: Update Maven version + run: mvn versions:set -DnewVersion=${{ steps.extract_tag.outputs.TAG_NAME }} + + - name: Build Jar + run: mvn -B package --file pom.xml -DskipTests + + - name: Create Release + id: create_release + uses: actions/create-release@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + tag_name: ${{ github.ref }} + release_name: Release ${{ github.ref }} + draft: false + prerelease: false + + - name: Upload JAR + uses: actions/upload-release-asset@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + upload_url: ${{ steps.create_release.outputs.upload_url }} + asset_path: ./target/kafka-connect-smt-${{ steps.extract_tag.outputs.TAG_NAME }}.jar + asset_name: kafka-connect-smt-${{ steps.extract_tag.outputs.TAG_NAME }}.jar + asset_content_type: application/java-archive \ No newline at end of file From 31918976074cb79148a140652b05241edd6a7d54 Mon Sep 17 00:00:00 2001 From: stheppi Date: Fri, 23 Feb 2024 14:49:40 +0000 Subject: [PATCH 4/9] Add the license header --- .../java/io/lenses/connect/smt/header/Constants.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/main/java/io/lenses/connect/smt/header/Constants.java b/src/main/java/io/lenses/connect/smt/header/Constants.java index aebebcd..c455f6b 100644 --- a/src/main/java/io/lenses/connect/smt/header/Constants.java +++ b/src/main/java/io/lenses/connect/smt/header/Constants.java @@ -1,3 +1,13 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ package io.lenses.connect.smt.header; import java.time.ZoneId; From f312366b3c1f3c65b2d7704cef9b1cf3d3dd0be7 Mon Sep 17 00:00:00 2001 From: stheppi Date: Fri, 23 Feb 2024 14:51:19 +0000 Subject: [PATCH 5/9] Fix the github action script --- .github/workflows/build.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 101ba8e..0b32744 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -1,9 +1,9 @@ name: CI/CD on: - push: - branches: - - * + push: + branches: + - "*" jobs: build: From 62ee49ba20992db9be19479f6d6d45ec7f1f2246 Mon Sep 17 00:00:00 2001 From: stheppi Date: Fri, 23 Feb 2024 14:53:17 +0000 Subject: [PATCH 6/9] Remove the on in the build.yaml --- .github/workflows/build.yaml | 5 ----- 1 file changed, 5 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 0b32744..0058d67 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -1,10 +1,5 @@ name: CI/CD -on: - push: - branches: - - "*" - jobs: build: name: Build From 5b4959108317c48179986688e51e643264a398e9 Mon Sep 17 00:00:00 2001 From: stheppi Date: Fri, 23 Feb 2024 14:53:47 +0000 Subject: [PATCH 7/9] Add the trigger back --- .github/workflows/build.yaml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 0058d67..0b32744 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -1,5 +1,10 @@ name: CI/CD +on: + push: + branches: + - "*" + jobs: build: name: Build From fec3c1a393f2b8e2d6e949cafc604526918edc83 Mon Sep 17 00:00:00 2001 From: stheppi Date: Fri, 23 Feb 2024 14:54:56 +0000 Subject: [PATCH 8/9] Set the trigger on PRs as well --- .github/workflows/build.yaml | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 0b32744..26fa29a 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -1,9 +1,11 @@ name: CI/CD on: - push: - branches: - - "*" + push: + branches: [ "*" ] + pull_request: + branches: [ "*" ] + jobs: build: From 4cbd0decdf365c18bcb00dd44be5d9da977d47d4 Mon Sep 17 00:00:00 2001 From: stheppi Date: Mon, 26 Feb 2024 14:01:13 +0000 Subject: [PATCH 9/9] Remove the system.out --- .../io/lenses/connect/smt/header/TimestampConverterTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java b/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java index 8b0491b..c33d243 100644 --- a/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java +++ b/src/test/java/io/lenses/connect/smt/header/TimestampConverterTest.java @@ -181,7 +181,6 @@ public void testSchemalessTimestampToDateOnNonUTC() { final TimestampConverter transformer = new TimestampConverter<>(); transformer.configure(config); - System.out.println("DATE_PLUS_TIME: " + DATE_PLUS_TIME.getTime()); SourceRecord transformed = transformer.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime())); Header header = transformed.headers().lastWithName("ts_header");