diff --git a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/table/HdfsDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/table/HdfsDynamicTableFactory.java
index 0d69b31349..0051dfd231 100644
--- a/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/table/HdfsDynamicTableFactory.java
+++ b/chunjun-connectors/chunjun-connector-hdfs/src/main/java/com/dtstack/chunjun/connector/hdfs/table/HdfsDynamicTableFactory.java
@@ -135,6 +135,7 @@ private HdfsConfig getHdfsConfig(ReadableConfig config) {
         hdfsConfig.setEncoding(config.get(BaseFileOptions.ENCODING));
         hdfsConfig.setMaxFileSize(config.get(BaseFileOptions.MAX_FILE_SIZE));
         hdfsConfig.setNextCheckRows(config.get(BaseFileOptions.NEXT_CHECK_ROWS));
+        hdfsConfig.setJobIdentifier(config.get(BaseFileOptions.JOB_IDENTIFIER));
 
         hdfsConfig.setDefaultFS(config.get(HdfsOptions.DEFAULT_FS));
         hdfsConfig.setFileType(config.get(HdfsOptions.FILE_TYPE));
diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/config/BaseFileConfig.java b/chunjun-core/src/main/java/com/dtstack/chunjun/config/BaseFileConfig.java
index c382ce9993..04741b9ffa 100644
--- a/chunjun-core/src/main/java/com/dtstack/chunjun/config/BaseFileConfig.java
+++ b/chunjun-core/src/main/java/com/dtstack/chunjun/config/BaseFileConfig.java
@@ -50,4 +50,6 @@ public class BaseFileConfig extends CommonConfig {
     private long nextCheckRows = 5000;
 
     private String suffix;
+
+    private String jobIdentifier = "";
 }
diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/sink/format/BaseFileOutputFormat.java b/chunjun-core/src/main/java/com/dtstack/chunjun/sink/format/BaseFileOutputFormat.java
index 2f84d08fc6..3dba41f14b 100644
--- a/chunjun-core/src/main/java/com/dtstack/chunjun/sink/format/BaseFileOutputFormat.java
+++ b/chunjun-core/src/main/java/com/dtstack/chunjun/sink/format/BaseFileOutputFormat.java
@@ -107,7 +107,12 @@ protected void initVariableFields() {
         } else {
             outputFilePath = baseFileConfig.getPath();
         }
-        tmpPath = outputFilePath + File.separatorChar + TMP_DIR_NAME;
+        tmpPath =
+                outputFilePath
+                        + File.separatorChar
+                        + TMP_DIR_NAME
+                        + baseFileConfig.getJobIdentifier();
+        log.info("[initVariableFields] tmpPath: {}", tmpPath);
         nextNumForCheckDataSize = baseFileConfig.getNextCheckRows();
         openSource();
     }
diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/table/options/BaseFileOptions.java b/chunjun-core/src/main/java/com/dtstack/chunjun/table/options/BaseFileOptions.java
index cb5ac1c269..5c9c333078 100644
--- a/chunjun-core/src/main/java/com/dtstack/chunjun/table/options/BaseFileOptions.java
+++ b/chunjun-core/src/main/java/com/dtstack/chunjun/table/options/BaseFileOptions.java
@@ -62,4 +62,11 @@ public class BaseFileOptions {
                     .longType()
                     .defaultValue(5000L)
                     .withDescription("The number of data written in the next file size check");
+
+    public static final ConfigOption<String> JOB_IDENTIFIER =
+            ConfigOptions.key("properties.job-identifier")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription(
+                            "Unique, stable job identifier appended to the temp directory name, to avoid file loss when multiple jobs write to the same output source at the same time");
 }
diff --git a/chunjun-examples/sql/hdfs/stream_sink_hdfs.sql b/chunjun-examples/sql/hdfs/stream_sink_hdfs.sql
new file mode 100644
index 0000000000..79c1cc7948
--- /dev/null
+++ b/chunjun-examples/sql/hdfs/stream_sink_hdfs.sql
@@ -0,0 +1,42 @@
+CREATE TABLE source
+(
+    id int,
+    col_boolean boolean,
+    col_tinyint tinyint
+) WITH (
+    'connector' = 'stream-x'
+    ,'number-of-rows' = '10000'
+);
+
+CREATE TABLE sink
+(
+    id int,
+    col_boolean boolean
+) WITH (
+    'connector' = 'hdfs-x'
+    ,'path' = 'hdfs://ns/user/hive/warehouse/tudou.db/type_txt'
+    ,'file-name' = 'pt=1'
+    ,'properties.hadoop.user.name' = 'root'
+    ,'properties.dfs.ha.namenodes.ns' = 'nn1,nn2'
+    ,'properties.fs.defaultFS' = 'hdfs://ns'
+    ,'properties.dfs.namenode.rpc-address.ns.nn2' = 'ip:9000'
+    ,'properties.dfs.client.failover.proxy.provider.ns' = 'org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
+    ,'properties.dfs.namenode.rpc-address.ns.nn1' = 'ip:9000'
+    ,'properties.dfs.nameservices' = 'ns'
+    ,'properties.fs.hdfs.impl.disable.cache' = 'true'
+    ,'properties.fs.hdfs.impl' = 'org.apache.hadoop.hdfs.DistributedFileSystem'
+    ,'default-fs' = 'hdfs://ns'
+    ,'field-delimiter' = ','
+    ,'encoding' = 'utf-8'
+    ,'max-file-size' = '10485760'
+    ,'next-check-rows' = '20000'
+    ,'write-mode' = 'overwrite'
+    ,'file-type' = 'text'
+    -- To avoid data loss when multiple jobs (HDFS, HIVE) write to the same
+    -- output source concurrently, give each job a unique identifier that never changes.
+    ,'properties.job-identifier' = 'job_id_tmp'
+);
+
+insert into sink
+select *
+from source u;
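
Note for reviewers (not part of the patch): a minimal, self-contained sketch of the effect of the `BaseFileOutputFormat` change above. The `TMP_DIR_NAME = ".data"` value and the standalone `tmpPath` helper are assumptions for illustration; in ChunJun the real logic lives in `initVariableFields()` as shown in the hunk.

```java
import java.io.File;

public class TmpPathSketch {
    // Assumption: the hidden temp directory name used by BaseFileOutputFormat.
    private static final String TMP_DIR_NAME = ".data";

    // Mirrors the patched initVariableFields(): the job identifier is appended
    // to the temp directory name, so concurrent jobs writing to the same output
    // path each get their own temp directory.
    static String tmpPath(String outputFilePath, String jobIdentifier) {
        return outputFilePath + File.separatorChar + TMP_DIR_NAME + jobIdentifier;
    }

    public static void main(String[] args) {
        String path = "/user/hive/warehouse/tudou.db/type_txt";
        // Before this patch both jobs shared one ".data" directory: when one job
        // moved its files and cleaned up, it could delete the other job's
        // in-flight files.
        System.out.println(tmpPath(path, "job_a")); // .../.datajob_a
        System.out.println(tmpPath(path, "job_b")); // .../.datajob_b
        System.out.println(tmpPath(path, ""));      // default: .../.data (old behavior)
    }
}
```

Because the option defaults to an empty string, jobs that do not set `properties.job-identifier` keep the pre-patch temp directory, so the change is backward compatible.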