Skip to content

Commit

Permalink
[hotfix-#1745][connector][hdfs][hive] Data loss occurs when multiple …
Browse files Browse the repository at this point in the history
…tasks write to the same hdfs (or hive) data source
  • Loading branch information
gaoliang committed Jul 19, 2023
1 parent a05900e commit c9c8c8a
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ private HdfsConfig getHdfsConfig(ReadableConfig config) {
hdfsConfig.setEncoding(config.get(BaseFileOptions.ENCODING));
hdfsConfig.setMaxFileSize(config.get(BaseFileOptions.MAX_FILE_SIZE));
hdfsConfig.setNextCheckRows(config.get(BaseFileOptions.NEXT_CHECK_ROWS));
hdfsConfig.setJobTimeStamp(config.get(BaseFileOptions.JOB_TIMESTAMP));

hdfsConfig.setDefaultFS(config.get(HdfsOptions.DEFAULT_FS));
hdfsConfig.setFileType(config.get(HdfsOptions.FILE_TYPE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ private HiveConfig getHiveConf(ReadableConfig config) {
hiveConf.setEncoding(config.get(BaseFileOptions.ENCODING));
hiveConf.setMaxFileSize(config.get(BaseFileOptions.MAX_FILE_SIZE));
hiveConf.setNextCheckRows(config.get(BaseFileOptions.NEXT_CHECK_ROWS));
hiveConf.setJobTimeStamp(config.get(BaseFileOptions.JOB_TIMESTAMP));

hiveConf.setDefaultFS(config.get(HdfsOptions.DEFAULT_FS));
hiveConf.setFileType(config.get(HdfsOptions.FILE_TYPE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,6 @@ public class BaseFileConfig extends CommonConfig {
private long nextCheckRows = 5000;

private String suffix;

private String jobTimeStamp = "";
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,12 @@ protected void initVariableFields() {
} else {
outputFilePath = baseFileConfig.getPath();
}
tmpPath = outputFilePath + File.separatorChar + TMP_DIR_NAME;
tmpPath =
outputFilePath
+ File.separatorChar
+ TMP_DIR_NAME
+ baseFileConfig.getJobTimeStamp();
log.info("initVariableFields get tmpPath: {}", tmpPath);
nextNumForCheckDataSize = baseFileConfig.getNextCheckRows();
openSource();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,11 @@ public class BaseFileOptions {
.longType()
.defaultValue(5000L)
.withDescription("The number of data written in the next file size check");

public static final ConfigOption<String> JOB_TIMESTAMP =
ConfigOptions.key("properties.job-timestamp")
.stringType()
.defaultValue(String.valueOf(System.currentTimeMillis()))
.withDescription(
"To solve the problem of file loss caused by multiple tasks writing to the same output source at the same time");
}

0 comments on commit c9c8c8a

Please sign in to comment.