Skip to content

Commit

Permalink
Optimize processor and timestamp function
Browse files Browse the repository at this point in the history
  • Loading branch information
aiwenmo committed Mar 6, 2024
1 parent 80d4ccf commit b7c421d
Show file tree
Hide file tree
Showing 19 changed files with 986 additions and 159 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.common.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.TimeZone;

/** Utility functions for datetime types: date, time, timestamp. */
public class DateTimeUtils {

    private static final Logger LOG = LoggerFactory.getLogger(DateTimeUtils.class);

    /** The julian date of the epoch, 1970-01-01. */
    public static final int EPOCH_JULIAN = 2440588;

    /**
     * The number of milliseconds in a day.
     *
     * <p>This is the modulo 'mask' used when converting TIMESTAMP values to DATE and TIME values.
     */
    public static final long MILLIS_PER_DAY = 86400000L; // = 24 * 60 * 60 * 1000

    /**
     * A ThreadLocal cache map for SimpleDateFormat, because SimpleDateFormat is not thread-safe.
     * (string_format) => formatter
     */
    private static final ThreadLocalCache<String, SimpleDateFormat> FORMATTER_CACHE =
            ThreadLocalCache.of(SimpleDateFormat::new);

    // --------------------------------------------------------------------------------------------
    // TIMESTAMP to DATE/TIME utils
    // --------------------------------------------------------------------------------------------

    /**
     * Get date from a timestamp.
     *
     * <p>The division rounds toward negative infinity, so timestamps before the epoch map to the
     * day that actually contains them (e.g. {@code -1} ms belongs to day {@code -1}, and {@code
     * -MILLIS_PER_DAY} is the first millisecond of day {@code -1}, not day {@code -2}).
     *
     * @param ts the timestamp in milliseconds.
     * @return the date in days since 1970-01-01; negative for timestamps before the epoch.
     */
    public static int timestampMillisToDate(long ts) {
        // Plain '/' truncates toward zero, which is wrong for negative timestamps; floorDiv
        // gives the containing day for both signs.
        return (int) Math.floorDiv(ts, MILLIS_PER_DAY);
    }

    /**
     * Get time from a timestamp.
     *
     * <p>The result is always in the range {@code [0, MILLIS_PER_DAY)}, also for timestamps
     * before the epoch, so it is consistent with {@link #timestampMillisToDate(long)}:
     * {@code date * MILLIS_PER_DAY + time == ts}.
     *
     * @param ts the timestamp in milliseconds.
     * @return the time of day in milliseconds.
     */
    public static int timestampMillisToTime(long ts) {
        // '%' keeps the sign of the dividend and would yield a negative time of day for
        // pre-epoch timestamps; floorMod always returns a non-negative remainder here.
        return (int) Math.floorMod(ts, MILLIS_PER_DAY);
    }

    // --------------------------------------------------------------------------------------------
    // Parsing functions
    // --------------------------------------------------------------------------------------------

    /**
     * Returns the epoch days since 1970-01-01.
     *
     * <p>Note: a string that cannot be parsed is only logged by the internal parser, which then
     * yields {@link Long#MIN_VALUE}; that sentinel is converted like any other timestamp, so the
     * value returned in that case is not a meaningful date and no failure is signalled here.
     *
     * @param dateStr the date string to parse.
     * @param fromFormat the {@link SimpleDateFormat} pattern describing {@code dateStr}.
     * @return the number of days between 1970-01-01 and the parsed date.
     */
    public static int parseDate(String dateStr, String fromFormat) {
        // It is OK to use UTC here: we only want to get the epoch days.
        // TODO: use an offset instead, for better performance.
        long ts = internalParseTimestampMillis(dateStr, fromFormat, TimeZone.getTimeZone("UTC"));
        ZoneId zoneId = ZoneId.of("UTC");
        Instant instant = Instant.ofEpochMilli(ts);
        ZonedDateTime zdt = ZonedDateTime.ofInstant(instant, zoneId);
        return ymdToUnixDate(zdt.getYear(), zdt.getMonthValue(), zdt.getDayOfMonth());
    }

    /**
     * Parses {@code dateStr} with the given pattern and time zone.
     *
     * @param dateStr the datetime string to parse.
     * @param format the {@link SimpleDateFormat} pattern.
     * @param tz the time zone applied to the formatter before parsing.
     * @return the parsed instant in epoch milliseconds, or {@link Long#MIN_VALUE} if the string
     *     does not match the pattern (the failure is logged at error level).
     */
    private static long internalParseTimestampMillis(String dateStr, String format, TimeZone tz) {
        // The formatter is cached per thread and per pattern, so the zone has to be set on every
        // call: a previous call on this thread may have left a different zone on it.
        SimpleDateFormat formatter = FORMATTER_CACHE.get(format);
        formatter.setTimeZone(tz);
        try {
            Date date = formatter.parse(dateStr);
            return date.getTime();
        } catch (ParseException e) {
            LOG.error(
                    String.format(
                            "Exception when parsing datetime string '%s' in format '%s'",
                            dateStr, format),
                    e);
            return Long.MIN_VALUE;
        }
    }

    /** Converts a year/month/day triple to the number of days since 1970-01-01. */
    private static int ymdToUnixDate(int year, int month, int day) {
        final int julian = ymdToJulian(year, month, day);
        return julian - EPOCH_JULIAN;
    }

    /**
     * Converts a year/month/day triple to a day number on the same scale as {@link
     * #EPOCH_JULIAN}.
     */
    private static int ymdToJulian(int year, int month, int day) {
        // a is 1 for January and February, 0 otherwise: those two months are counted as the
        // last months (m = 10, 11) of the previous year, so that m = 0 corresponds to March.
        int a = (14 - month) / 12;
        int y = year + 4800 - a;
        int m = month + 12 * a - 3;
        return day + (153 * m + 2) / 5 + 365 * y + y / 4 - y / 100 + y / 400 - 32045;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.common.utils;

import org.apache.flink.annotation.Internal;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * A cache that keeps a separate, size-limited map of entries for every thread that uses it.
 *
 * <p>Entries missing from the calling thread's map are produced on demand through {@link
 * #getNewInstance(Object)}.
 *
 * <p>Note: Values must not be null.
 */
@Internal
public abstract class ThreadLocalCache<K, V> {

    private static final int DEFAULT_CACHE_SIZE = 64;

    private final ThreadLocal<BoundedMap<K, V>> cache = new ThreadLocal<>();
    private final int maxSizePerThread;

    /** Creates a cache limited to {@code DEFAULT_CACHE_SIZE} entries per thread. */
    protected ThreadLocalCache() {
        this(DEFAULT_CACHE_SIZE);
    }

    /**
     * Creates a cache with an explicit per-thread limit.
     *
     * @param maxSizePerThread the largest number of entries each thread's map may hold.
     */
    protected ThreadLocalCache(int maxSizePerThread) {
        this.maxSizePerThread = maxSizePerThread;
    }

    /**
     * Builds a cache whose missing values are produced by the given function.
     *
     * @param creator invoked with the key whenever the calling thread has no value for it.
     * @return a cache using the default per-thread size.
     */
    public static <K, V> ThreadLocalCache<K, V> of(Function<K, V> creator) {
        return new ThreadLocalCache<K, V>() {
            @Override
            public V getNewInstance(K key) {
                return creator.apply(key);
            }
        };
    }

    /**
     * Returns the calling thread's value for {@code key}, creating and storing it first if the
     * thread has none yet.
     *
     * @param key the cache key.
     * @return the cached or freshly created value.
     */
    public V get(K key) {
        final BoundedMap<K, V> perThread = currentThreadMap();
        final V cached = perThread.get(key);
        if (cached != null) {
            return cached;
        }
        final V created = getNewInstance(key);
        perThread.put(key, created);
        return created;
    }

    /**
     * Produces the value for a key that is not present in the calling thread's map.
     *
     * @param key the key being looked up.
     * @return the value to cache for {@code key}.
     */
    public abstract V getNewInstance(K key);

    /** Returns the calling thread's map, installing an empty one on first use. */
    private BoundedMap<K, V> currentThreadMap() {
        BoundedMap<K, V> existing = cache.get();
        if (existing != null) {
            return existing;
        }
        BoundedMap<K, V> fresh = new BoundedMap<>(maxSizePerThread);
        cache.set(fresh);
        return fresh;
    }

    /** A {@link LinkedHashMap} that drops its eldest entry once it grows past a fixed size. */
    private static class BoundedMap<K, V> extends LinkedHashMap<K, V> {

        private static final long serialVersionUID = -211630219014422361L;

        private final int maxSize;

        private BoundedMap(int maxSize) {
            this.maxSize = maxSize;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            // Consulted by LinkedHashMap after each insertion; true evicts the eldest entry.
            return size() > maxSize;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,12 @@ public String toString() {

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TransformDef that = (TransformDef) o;
return Objects.equals(sourceTable, that.sourceTable)
&& Objects.equals(projection, that.projection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ public DataStream<Event> translateSchema(
}

public DataStream<Event> translateData(
DataStream<Event> input, List<TransformDef> transforms, OperatorID schemaOperatorID) {
DataStream<Event> input,
List<TransformDef> transforms,
OperatorID schemaOperatorID,
String timezone) {
if (transforms.isEmpty()) {
return input;
}
Expand All @@ -69,6 +72,7 @@ public DataStream<Event> translateData(
}
}
transformDataFunctionBuilder.addSchemaOperatorID(schemaOperatorID);
transformDataFunctionBuilder.addTimezone(timezone);
return input.transform(
"Transform:Data", new EventTypeInfo(), transformDataFunctionBuilder.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
// Transform Data
stream =
transformTranslator.translateData(
stream, pipelineDef.getTransforms(), schemaOperatorIDGenerator.generate());
stream,
pipelineDef.getTransforms(),
schemaOperatorIDGenerator.generate(),
pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));

// Route
RouteTranslator routeTranslator = new RouteTranslator();
Expand Down
Loading

0 comments on commit b7c421d

Please sign in to comment.