Skip to content

Commit

Permalink
Merge branch 'master' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
AliceXiaoLu authored Feb 23, 2024
2 parents 0f91dd5 + 8d80354 commit 4909f40
Show file tree
Hide file tree
Showing 12 changed files with 1,443 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

Expand All @@ -47,6 +48,7 @@
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.when;

@PowerMockIgnore("javax.management.*")
@RunWith(PowerMockRunner.class)
@PrepareForTest({JdbcUtil.class, Connection.class})
public class SinkFactoryTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;

@PowerMockIgnore("javax.management.*")
@RunWith(PowerMockRunner.class)
@PrepareForTest({JdbcInputFormat.class, JdbcConfig.class, JdbcDialect.class})
public class JdbcInputFormatBuilderTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

Expand Down Expand Up @@ -72,6 +73,7 @@
import static org.powermock.api.mockito.PowerMockito.when;
import static org.powermock.reflect.Whitebox.setInternalState;

@PowerMockIgnore("javax.management.*")
@RunWith(PowerMockRunner.class)
@PrepareForTest({
JdbcInputFormat.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

Expand All @@ -45,6 +46,7 @@
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.when;

@PowerMockIgnore("javax.management.*")
@RunWith(PowerMockRunner.class)
@PrepareForTest({JdbcUtil.class, Connection.class})
public class SourceFactoryTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
Expand All @@ -68,6 +69,7 @@
import static org.powermock.api.mockito.PowerMockito.when;
import static org.powermock.reflect.Whitebox.setInternalState;

@PowerMockIgnore("javax.management.*")
@RunWith(PowerMockRunner.class)
@PrepareForTest({
DistributedJdbcInputFormat.class,
Expand Down
29 changes: 1 addition & 28 deletions chunjun-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink sql -->
Expand Down Expand Up @@ -376,33 +376,6 @@

<build>
<plugins>
<!-- Scala Compiler -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
                        <!-- Run scala compiler in the process-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) compile phase -->
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>

<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) test-compile phase -->
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down
2 changes: 2 additions & 0 deletions chunjun-core/src/main/java/com/dtstack/chunjun/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
Expand Down Expand Up @@ -330,6 +331,7 @@ private static void configStreamExecutionEnvironment(
Thread.currentThread().getContextClassLoader(),
ConstantValue.DIRTY_DATA_DIR_NAME);
// TODO sql 支持restore.
FactoryUtil.setFactoryHelper(factoryHelper);
}
PluginUtil.registerShipfileToCachedFile(options.getAddShipfile(), env);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.dtstack.chunjun.util.PropertiesUtil;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down Expand Up @@ -50,11 +51,15 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(Option
flinkConf = GlobalConfiguration.loadConfiguration(options.getFlinkConfDir());
}
StreamExecutionEnvironment env;
if (StringUtils.equalsIgnoreCase(ClusterMode.local.name(), options.getMode())) {
if (StringUtils.equalsIgnoreCase(ClusterMode.localTest.name(), options.getMode())) {
flinkConf.addAll(cfg);
env = new MyLocalStreamEnvironment(flinkConf);
} else {
env = StreamExecutionEnvironment.getExecutionEnvironment(cfg);
// 如果没有配置默认的并行度,那么ChunJun 默认设置并行度为1
if (!cfg.contains(CoreOptions.DEFAULT_PARALLELISM)) {
env.setParallelism(1);
}
}
env.getConfig().disableClosureCleaner();
env.getConfig().setGlobalJobParameters(cfg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,31 @@
*/
package com.dtstack.chunjun.util;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

import java.util.HashMap;
import java.util.Map;

public class ConnectorNameConvertUtil {

// tuple f0 package name && directory name,f1 class name
private static Map<String, Tuple2<String, String>> connectorNameMap = new HashMap<>();
private static final Map<String, Tuple3<String, String, String>> connectorNameMap =
new HashMap<>();

static {
connectorNameMap.put("es", new Tuple2<>("elasticsearch6", "elasticsearch6"));
connectorNameMap.put("hbase", new Tuple2<>("hbase14", "HBase14"));
connectorNameMap.put("tidb", new Tuple2<>("mysql", "mysql"));
connectorNameMap.put("restapi", new Tuple2<>("http", "http"));
connectorNameMap.put("adbpostgresql", new Tuple2<>("postgresql", "postgresql"));
connectorNameMap.put("dorisbatch", new Tuple2<>("doris", "doris"));
connectorNameMap.put("gbase", new Tuple2<>("gBase", "gBase"));
connectorNameMap.put("protobuf", new Tuple2<>("pbformat", "pbformat"));
connectorNameMap.put("starrocks", new Tuple2<>("starrocks", "starRocks"));
connectorNameMap.put(
"es", new Tuple3<>("elasticsearch7", "elasticsearch7", "elasticsearch7"));
connectorNameMap.put("hbase", new Tuple3<>("hbase14", "HBase14", "hbase14"));
connectorNameMap.put("hbase2", new Tuple3<>("hbase2", "HBase2", null));
connectorNameMap.put("tidb", new Tuple3<>("mysql", "mysql", "mysql"));
connectorNameMap.put("restapi", new Tuple3<>("http", "http", "http"));
connectorNameMap.put(
"adbpostgresql", new Tuple3<>("postgresql", "postgresql", "postgresql"));

connectorNameMap.put("dorisbatch", new Tuple3<>("doris", "doris", "doris"));
connectorNameMap.put("starrocks", new Tuple3<>("starrocks", "starRocks", null));
connectorNameMap.put("gbase", new Tuple3<>("gbase", "gBase", null));
connectorNameMap.put("protobuf", new Tuple3<>("pbformat", "pbformat", null));
}

public static String convertClassPrefix(String originName) {
Expand All @@ -52,4 +57,12 @@ public static String convertPackageName(String originName) {
}
return connectorNameMap.get(originName).f0;
}

public static String convertPluginName(String originName) {
Tuple3<String, String, String> tuple3 = connectorNameMap.get(originName);
if (tuple3 == null) {
return originName;
}
return tuple3.f2 != null ? tuple3.f2 : originName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -484,17 +484,26 @@ public static List<String> setPipelineOptionsToEnvConfig(
jarList.addAll(urlList);

List<String> pipelineJars = new ArrayList<>();

List<String> classpathList = configuration.get(PipelineOptions.CLASSPATHS);
if (classpathList == null) {
classpathList = new ArrayList<>(urlList.size());
}

log.info("ChunJun executionMode: " + executionMode);
if (ClusterMode.getByName(executionMode) == ClusterMode.kubernetesApplication) {
for (String jarUrl : jarList) {
String newJarUrl = jarUrl;
if (StringUtils.startsWith(jarUrl, File.separator)) {
newJarUrl = "file:" + jarUrl;
}
if (pipelineJars.contains(newJarUrl)) {
continue;
if (!pipelineJars.contains(newJarUrl)) {
pipelineJars.add(newJarUrl);
}

if (!classpathList.contains(newJarUrl)) {
classpathList.add(newJarUrl);
}
pipelineJars.add(newJarUrl);
}
} else {
pipelineJars.addAll(jarList);
Expand All @@ -503,11 +512,6 @@ public static List<String> setPipelineOptionsToEnvConfig(
log.info("ChunJun reset pipeline.jars: " + pipelineJars);
configuration.set(PipelineOptions.JARS, pipelineJars);

List<String> classpathList = configuration.get(PipelineOptions.CLASSPATHS);
if (classpathList == null) {
classpathList = new ArrayList<>(urlList.size());
}
classpathList.addAll(pipelineJars);
configuration.set(PipelineOptions.CLASSPATHS, classpathList);
return pipelineJars;
} catch (Exception e) {
Expand Down
Loading

0 comments on commit 4909f40

Please sign in to comment.