Skip to content

Commit

Permalink
[hotfix-#1745][connector][hdfs][hive] Data loss occurs when multiple …
Browse files Browse the repository at this point in the history
…tasks write to the same HDFS (or Hive) data source
  • Loading branch information
gaoliang committed Oct 30, 2023
1 parent e5c0066 commit 69824c8
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,21 @@ protected void deleteDataDir() {

@Override
protected void deleteTmpDataDir() {
    // Cleans up temporary data directories under the job's output path.
    // Instead of deleting only this job's own tmpPath, it lists every child
    // directory whose name starts with TMP_DIR_NAME and removes each one.
    // NOTE(review): the pre-image line `deleteDirectory(tmpPath);` from the
    // diff was removed here — it duplicated (and raced with) the scan below.
    // NOTE(review): because tmp dirs are suffixed with a per-job timestamp,
    // this prefix match also deletes OTHER jobs' in-flight tmp dirs when
    // several jobs write to the same output path concurrently — confirm this
    // stale-dir sweep is intended and cannot run while a sibling job is live.
    try {
        Path dir = new Path(outputFilePath);
        // Lazily (re)open the FileSystem handle if a prior close left it null.
        if (fs == null) {
            openSource();
        }
        // Match only children whose name carries the tmp-dir prefix.
        FileStatus[] fileStatuses =
                fs.listStatus(dir, path -> path.getName().startsWith(TMP_DIR_NAME));
        for (FileStatus fileStatus : fileStatuses) {
            // Skip plain files that happen to share the prefix.
            if (fileStatus.isDirectory()) {
                deleteDirectory(fileStatus.getPath().toString());
            }
        }
    } catch (IOException e) {
        // Wrap as the project's runtime exception, preserving the cause.
        throw new ChunJunRuntimeException("cannot delete tmpDir: ", e);
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ private HdfsConfig getHdfsConfig(ReadableConfig config) {
hdfsConfig.setEncoding(config.get(BaseFileOptions.ENCODING));
hdfsConfig.setMaxFileSize(config.get(BaseFileOptions.MAX_FILE_SIZE));
hdfsConfig.setNextCheckRows(config.get(BaseFileOptions.NEXT_CHECK_ROWS));
hdfsConfig.setJobTimeStamp(config.get(BaseFileOptions.JOB_TIMESTAMP));

hdfsConfig.setDefaultFS(config.get(HdfsOptions.DEFAULT_FS));
hdfsConfig.setFileType(config.get(HdfsOptions.FILE_TYPE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ private HiveConfig getHiveConf(ReadableConfig config) {
hiveConf.setEncoding(config.get(BaseFileOptions.ENCODING));
hiveConf.setMaxFileSize(config.get(BaseFileOptions.MAX_FILE_SIZE));
hiveConf.setNextCheckRows(config.get(BaseFileOptions.NEXT_CHECK_ROWS));
hiveConf.setJobTimeStamp(config.get(BaseFileOptions.JOB_TIMESTAMP));

hiveConf.setDefaultFS(config.get(HdfsOptions.DEFAULT_FS));
hiveConf.setFileType(config.get(HdfsOptions.FILE_TYPE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,21 @@ protected void deleteDataDir() {

@Override
protected void deleteTmpDataDir() {
    // Cleans up temporary data directories under the job's output path by
    // listing every child directory whose name starts with TMP_DIR_NAME and
    // removing each one.
    // NOTE(review): the pre-image line `deleteDirectory(tmpPath);` from the
    // diff was removed here — it duplicated (and raced with) the scan below.
    // NOTE(review): tmp dirs carry a per-job timestamp suffix, so this prefix
    // match also matches OTHER jobs' in-flight tmp dirs when multiple jobs
    // share the same output path — confirm this sweep only runs when no
    // sibling job can still be writing.
    try {
        Path dir = new Path(outputFilePath);
        // Lazily (re)open the FileSystem handle if a prior close left it null.
        if (fs == null) {
            openSource();
        }
        // Match only children whose name carries the tmp-dir prefix.
        FileStatus[] fileStatuses =
                fs.listStatus(dir, path -> path.getName().startsWith(TMP_DIR_NAME));
        for (FileStatus fileStatus : fileStatuses) {
            // Skip plain files that happen to share the prefix.
            if (fileStatus.isDirectory()) {
                deleteDirectory(fileStatus.getPath().toString());
            }
        }
    } catch (IOException e) {
        // Wrap as the project's runtime exception, preserving the cause.
        throw new ChunJunRuntimeException("cannot delete tmpDir: ", e);
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,6 @@ public class BaseFileConfig extends CommonConfig {
private long nextCheckRows = 5000;

private String suffix;

private String jobTimeStamp = "";
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,12 @@ protected void initVariableFields() {
} else {
outputFilePath = baseFileConfig.getPath();
}
tmpPath = outputFilePath + File.separatorChar + TMP_DIR_NAME;
tmpPath =
outputFilePath
+ File.separatorChar
+ TMP_DIR_NAME
+ baseFileConfig.getJobTimeStamp();
log.info("initVariableFields get tmpPath: {}", tmpPath);
nextNumForCheckDataSize = baseFileConfig.getNextCheckRows();
openSource();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,11 @@ public class BaseFileOptions {
.longType()
.defaultValue(5000L)
.withDescription("The number of data written in the next file size check");

public static final ConfigOption<String> JOB_TIMESTAMP =
ConfigOptions.key("properties.job-timestamp")
.stringType()
.defaultValue(String.valueOf(System.currentTimeMillis()))
.withDescription(
"To solve the problem of file loss caused by multiple tasks writing to the same output source at the same time");
}

0 comments on commit 69824c8

Please sign in to comment.