diff --git a/bin/start-server.sh b/bin/start-server.sh index e3510e3943..b4edca5f0a 100755 --- a/bin/start-server.sh +++ b/bin/start-server.sh @@ -26,7 +26,7 @@ echo "CHUNJUN_HOME:"$CHUNJUN_HOME >&2 HO_HEAP_SIZE="${HO_HEAP_SIZE:=1024m}" JAVA_OPTS="$JAVA_OPTS -Xmx${HO_HEAP_SIZE}" -JAVA_OPTS="$JAVA_OPTS -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=10006" +#JAVA_OPTS="$JAVA_OPTS -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=10006" JAVA_OPTS="$JAVA_OPTS -Xms${HO_HEAP_SIZE}" @@ -78,7 +78,7 @@ CLASS_NAME=com.dtstack.chunjun.server.ServerLauncher start(){ echo "ChunJun server starting ..." - nice -n ${CHUNJUN_NICE} $JAVA_RUN $JAVA_OPTS -cp $JAR_DIR $CLASS_NAME $@ 1> "${CHUNJUN_LOG_DIR}/chunjun.stdout" 2> "${CHUNJUN_LOG_DIR}/chunjun.err" & + nice -n ${CHUNJUN_NICE} $JAVA_RUN $JAVA_OPTS -cp $JAR_DIR $CLASS_NAME $@ 1> "${CHUNJUN_LOG_DIR}/chunjun.stdout" 2>&1 & echo $! > $pidfile ret=$? return 0 diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/lookup/JdbcLruTableFunction.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/lookup/JdbcLruTableFunction.java index b6c15981d2..9e8dc46860 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/lookup/JdbcLruTableFunction.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/lookup/JdbcLruTableFunction.java @@ -100,7 +100,7 @@ public JdbcLruTableFunction( String[] fieldNames, String[] keyNames, RowType rowType) { - super(lookupConfig, jdbcDialect.getRowConverter(rowType), keyNames, rowType); + super(lookupConfig, jdbcDialect.getRowConverter(rowType)); this.jdbcConfig = jdbcConfig; this.jdbcDialect = jdbcDialect; this.asyncPoolSize = ((JdbcLookupConfig) lookupConfig).getAsyncPoolSize(); diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/lookup/AbstractLruTableFunction.java b/chunjun-core/src/main/java/com/dtstack/chunjun/lookup/AbstractLruTableFunction.java index d0fd54329f..e5c82c4f8a 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/lookup/AbstractLruTableFunction.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/lookup/AbstractLruTableFunction.java @@ -33,12 +33,9 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.functions.AsyncLookupFunction; import org.apache.flink.table.functions.FunctionContext; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.RowType; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.tuple.Pair; import java.lang.reflect.InvocationTargetException; import java.util.Arrays; @@ -64,25 +61,9 @@ public abstract class AbstractLruTableFunction extends AsyncLookupFunction { private static final int TIMEOUT_LOG_FLUSH_NUM = 10; private int timeOutNum = 0; - protected final RowData.FieldGetter[] fieldGetters; - - public AbstractLruTableFunction( - LookupConfig lookupConfig, - AbstractRowConverter rowConverter, - String[] keyNames, - RowType rowType) { + public AbstractLruTableFunction(LookupConfig lookupConfig, AbstractRowConverter rowConverter) { this.lookupConfig = lookupConfig; this.rowConverter = rowConverter; - - this.fieldGetters = new RowData.FieldGetter[keyNames.length]; - List> fieldTypeAndPositionOfKeyField = - getFieldTypeAndPositionOfKeyField(keyNames, rowType); - for (int i = 0; i < fieldTypeAndPositionOfKeyField.size(); i++) { - Pair typeAndPosition = fieldTypeAndPositionOfKeyField.get(i); - fieldGetters[i] = - RowData.createFieldGetter( - typeAndPosition.getLeft(), typeAndPosition.getRight()); - } } @Override @@ -217,19 +198,15 @@ protected void preInvoke(CompletableFuture> future, Object.. public CompletableFuture> asyncLookup(RowData keyRow) { CompletableFuture> lookupFuture = new CompletableFuture<>(); try { - Object[] keyData = new Object[keyRow.getArity()]; - for (int i = 0; i < keyRow.getArity(); i++) { - keyData[i] = fieldGetters[i].getFieldOrNull(keyRow); - } + preInvoke(lookupFuture, keyRow); - preInvoke(lookupFuture, keyData); - String cacheKey = buildCacheKey(keyData); + String cacheKey = buildCacheKey(keyRow); // 缓存判断 if (isUseCache(cacheKey)) { invokeWithCache(cacheKey, lookupFuture); return lookupFuture; } - handleAsyncInvoke(lookupFuture, keyData); + handleAsyncInvoke(lookupFuture, keyRow); } catch (Exception e) { // todo 优化 log.error(e.getMessage()); @@ -331,14 +308,4 @@ protected void dealFillDataError(CompletableFuture> future, dealMissKey(future); } } - - protected List> getFieldTypeAndPositionOfKeyField( - String[] keyNames, RowType rowType) { - List> typeAndPosition = Lists.newLinkedList(); - for (int i = 0; i < keyNames.length; i++) { - LogicalType type = rowType.getTypeAt(rowType.getFieldIndex(keyNames[i])); - typeAndPosition.add(Pair.of(type, i)); - } - return typeAndPosition; - } } diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/util/JsonUtil.java b/chunjun-core/src/main/java/com/dtstack/chunjun/util/JsonUtil.java index e9bd724c71..0db2597056 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/util/JsonUtil.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/util/JsonUtil.java @@ -110,7 +110,9 @@ public static String toPrintJson(Object obj) { Map result = objectMapper.readValue(objectMapper.writeValueAsString(obj), HashMap.class); MapUtil.replaceAllElement( - result, Lists.newArrayList("pwd", "password", "druid.password", "secretKey"), "******"); + result, + Lists.newArrayList("pwd", "password", "druid.password", "secretKey"), + "******"); return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(result); } catch (Exception e) { throw new RuntimeException("error parse [" + obj + "] to json", e); diff --git a/chunjun-core/src/test/java/com/dtstack/chunjun/lookup/AbstractLruTableFunctionTest.java b/chunjun-core/src/test/java/com/dtstack/chunjun/lookup/AbstractLruTableFunctionTest.java index c0b4a9f6a9..64eb8ae5b2 100644 --- a/chunjun-core/src/test/java/com/dtstack/chunjun/lookup/AbstractLruTableFunctionTest.java +++ b/chunjun-core/src/test/java/com/dtstack/chunjun/lookup/AbstractLruTableFunctionTest.java @@ -36,7 +36,6 @@ import org.apache.flink.runtime.taskexecutor.rpc.RpcGlobalAggregateManager; import org.apache.flink.table.data.RowData; import org.apache.flink.table.functions.FunctionContext; -import org.apache.flink.table.types.logical.RowType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -60,11 +59,7 @@ public class AbstractLruTableFunctionTest { public void setup() { LookupConfig lookupConfig = new LookupConfig(); MockRowConverter mockRowConverter = new MockRowConverter(); - - String[] keyNames = new String[] {"id"}; - RowType rowType = mockRowConverter.getRowType(); - this.lruTableFunction = - new MockLruTableFunction(lookupConfig, mockRowConverter, keyNames, rowType); + this.lruTableFunction = new MockLruTableFunction(lookupConfig, mockRowConverter); } @Test diff --git a/chunjun-core/src/test/java/com/dtstack/chunjun/lookup/MockLruTableFunction.java b/chunjun-core/src/test/java/com/dtstack/chunjun/lookup/MockLruTableFunction.java index 3829353aff..a75acf0371 100644 --- a/chunjun-core/src/test/java/com/dtstack/chunjun/lookup/MockLruTableFunction.java +++ b/chunjun-core/src/test/java/com/dtstack/chunjun/lookup/MockLruTableFunction.java @@ -22,19 +22,14 @@ import com.dtstack.chunjun.lookup.config.LookupConfig; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.logical.RowType; import java.util.Collection; import java.util.concurrent.CompletableFuture; public class MockLruTableFunction extends AbstractLruTableFunction { - public MockLruTableFunction( - LookupConfig lookupConfig, - AbstractRowConverter rowConverter, - String[] keyNames, - RowType rowType) { - super(lookupConfig, rowConverter, keyNames, rowType); + public MockLruTableFunction(LookupConfig lookupConfig, AbstractRowConverter rowConverter) { + super(lookupConfig, rowConverter); } @Override diff --git a/pom.xml b/pom.xml index 9d16b94163..7ef0517a4a 100755 --- a/pom.xml +++ b/pom.xml @@ -40,8 +40,8 @@ chunjun-restore chunjun-ddl chunjun-assembly - chunjun-e2e - chunjun-local-test + + chunjun-server