Skip to content

Commit

Permalink
perf: 本地文件准备过程优化 #2611
Browse files Browse the repository at this point in the history
处理Review意见。
  • Loading branch information
jsonwan committed Nov 13, 2023
1 parent 2bbb146 commit c198f68
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@

public interface FileSourceTaskLogDAO {

void saveFileSourceTaskLog(FileSourceTaskLogDTO fileSourceTaskLog);
void insertOrUpdateFileSourceTaskLog(FileSourceTaskLogDTO fileSourceTaskLog);

FileSourceTaskLogDTO getLatestFileSourceTaskLog(long stepInstanceId, int executeCount);
FileSourceTaskLogDTO getFileSourceTaskLog(long stepInstanceId, int executeCount);

FileSourceTaskLogDTO getFileSourceTaskLogByBatchTaskId(String fileSourceBatchTaskId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,36 +65,48 @@ private FileSourceTaskLogDTO extractInfo(Record record) {
}

@Override
public void saveFileSourceTaskLog(FileSourceTaskLogDTO fileSourceTaskLog) {
public void insertOrUpdateFileSourceTaskLog(FileSourceTaskLogDTO fileSourceTaskLog) {
FileSourceTaskLog t = FileSourceTaskLog.FILE_SOURCE_TASK_LOG;
defaultContext.insertInto(t, t.STEP_INSTANCE_ID, t.EXECUTE_COUNT, t.START_TIME, t.END_TIME, t.TOTAL_TIME,
t.STATUS, t.FILE_SOURCE_BATCH_TASK_ID)
.values(fileSourceTaskLog.getStepInstanceId(),
fileSourceTaskLog.getExecuteCount(),
fileSourceTaskLog.getStartTime(),
fileSourceTaskLog.getEndTime(),
fileSourceTaskLog.getTotalTime(),
JooqDataTypeUtil.toByte(fileSourceTaskLog.getStatus()),
fileSourceTaskLog.getFileSourceBatchTaskId())
.onDuplicateKeyUpdate()
defaultContext.insertInto(
t,
t.STEP_INSTANCE_ID,
t.EXECUTE_COUNT,
t.START_TIME,
t.END_TIME,
t.TOTAL_TIME,
t.STATUS,
t.FILE_SOURCE_BATCH_TASK_ID
).values(
fileSourceTaskLog.getStepInstanceId(),
fileSourceTaskLog.getExecuteCount(),
fileSourceTaskLog.getStartTime(),
fileSourceTaskLog.getEndTime(),
fileSourceTaskLog.getTotalTime(),
JooqDataTypeUtil.toByte(fileSourceTaskLog.getStatus()),
fileSourceTaskLog.getFileSourceBatchTaskId()
).onDuplicateKeyUpdate()
.set(t.START_TIME, fileSourceTaskLog.getStartTime())
.set(t.END_TIME, fileSourceTaskLog.getEndTime())
.set(t.TOTAL_TIME, fileSourceTaskLog.getTotalTime())
.set(t.STATUS, JooqDataTypeUtil.toByte(fileSourceTaskLog.getStatus())).set(t.FILE_SOURCE_BATCH_TASK_ID,
fileSourceTaskLog.getFileSourceBatchTaskId())
.set(t.STATUS, JooqDataTypeUtil.toByte(fileSourceTaskLog.getStatus()))
.set(t.FILE_SOURCE_BATCH_TASK_ID, fileSourceTaskLog.getFileSourceBatchTaskId())
.execute();
}

@Override
public FileSourceTaskLogDTO getLatestFileSourceTaskLog(long stepInstanceId, int executeCount) {
public FileSourceTaskLogDTO getFileSourceTaskLog(long stepInstanceId, int executeCount) {
FileSourceTaskLog t = FileSourceTaskLog.FILE_SOURCE_TASK_LOG;
Record record = defaultContext.select(t.STEP_INSTANCE_ID, t.EXECUTE_COUNT, t.START_TIME, t.END_TIME,
Record record = defaultContext.select(
t.STEP_INSTANCE_ID,
t.EXECUTE_COUNT,
t.START_TIME,
t.END_TIME,
t.TOTAL_TIME,
t.STATUS, t.FILE_SOURCE_BATCH_TASK_ID).from(t)
t.STATUS,
t.FILE_SOURCE_BATCH_TASK_ID
).from(t)
.where(t.STEP_INSTANCE_ID.eq(stepInstanceId))
.and(t.EXECUTE_COUNT.eq(executeCount))
.orderBy(t.ID.desc())
.limit(1)
.fetchOne();
return extractInfo(record);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ public enum StepActionEnum {
*/
REFRESH(14),
/**
* 文件准备过程在发布时被取消后通知其他实例重新准备文件
* 准备文件
*/
PREPARE_FILE_AGAIN(15);
PREPARE_FILE(15);

private final int value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void handleEvent(StepEvent stepEvent,
case CONTINUE_FILE_PUSH:
continueGseFileStep(stepInstance);
break;
case PREPARE_FILE_AGAIN:
case PREPARE_FILE:
prepareFileAgain(stepInstance);
break;
case CLEAR:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,15 +213,15 @@ public static StepEvent continueGseFileStep(long stepInstanceId) {
}

/**
* 构造`重新准备文件`事件
* 构造`准备文件`事件
*
* @param stepInstanceId 步骤实例ID
* @return 事件
*/
public static StepEvent prepareFileAgain(long stepInstanceId) {
public static StepEvent prepareFile(long stepInstanceId) {
StepEvent stepEvent = new StepEvent();
stepEvent.setStepInstanceId(stepInstanceId);
stepEvent.setAction(StepActionEnum.PREPARE_FILE_AGAIN.getValue());
stepEvent.setAction(StepActionEnum.PREPARE_FILE.getValue());
stepEvent.setTime(LocalDateTime.now());
return stepEvent;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ private void gracefulStop() {
log.info("gracefulStop begin:{}", getTaskId());
// 1.停止正在进行的所有文件准备任务
filePrepareService.stopPrepareFile(stepInstance);
// 2.MQ消息通知其他实例重新准备文件
taskExecuteMQEventDispatcher.dispatchStepEvent(StepEvent.prepareFileAgain(stepInstance.getId()));
// 2.MQ消息通知其他实例准备文件
taskExecuteMQEventDispatcher.dispatchStepEvent(StepEvent.prepareFile(stepInstance.getId()));
this.isStopped = true;
StopTaskCounter.getInstance().decrement(getTaskId());
log.info("gracefulStop end:{}", getTaskId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public void prepareThirdFileAsync(
log.debug("[{}]: fileSourceList={}", stepInstance.getUniqueKey(), fileSourceList);
// 放进文件源下载任务进度表中
FileSourceTaskLogDTO fileSourceTaskLogDTO = buildInitFileSourceTaskLog(stepInstance, batchTaskInfoDTO);
fileSourceTaskLogDAO.saveFileSourceTaskLog(fileSourceTaskLogDTO);
fileSourceTaskLogDAO.insertOrUpdateFileSourceTaskLog(fileSourceTaskLogDTO);
// 更新文件源任务状态
taskInstanceService.updateResolvedSourceFile(stepInstance.getId(), fileSourceList);
// 异步轮询文件下载任务
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ public StepExecutionDetailDTO getStepExecutionResult(String username, Long appId
if (stepInstance.isFileStep()) {
watch.start("involveFileSourceTaskLog");
FileSourceTaskLogDTO fileSourceTaskLog =
fileSourceTaskLogDAO.getLatestFileSourceTaskLog(
fileSourceTaskLogDAO.getFileSourceTaskLog(
stepInstance.getId(),
queryExecuteCount
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void checkVolumeAndClear() {
private void doCheckVolumeAndClear() {
long maxSizeBytes = workerConfig.getMaxSizeGB() * 1024L * 1024L * 1024L;
File workDirFile = new File(workerConfig.getWorkspaceDirPath());
if (!workDirFile.exists()) {
if (!workDirFile.exists() && log.isDebugEnabled()) {
log.debug("Workspace({}) not exists yet, ignore clear", workerConfig.getWorkspaceDirPath());
return;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
USE job_execute;

SET NAMES utf8mb4;

DROP PROCEDURE IF EXISTS job_schema_update;

DELIMITER <JOB_UBF>

CREATE PROCEDURE job_schema_update()
BEGIN

DECLARE db VARCHAR(100);
SET AUTOCOMMIT = 0;
SELECT DATABASE() INTO db;

IF NOT EXISTS(SELECT 1
FROM information_schema.statistics
WHERE TABLE_SCHEMA = db
AND TABLE_NAME = 'file_source_task_log'
AND INDEX_NAME = 'uk_step_instance_id_execute_count') THEN
ALTER TABLE file_source_task_log ADD UNIQUE INDEX `uk_step_instance_id_execute_count` (`step_instance_id`,`execute_count`);
END IF;

IF EXISTS(SELECT 1
FROM information_schema.statistics
WHERE TABLE_SCHEMA = db
AND TABLE_NAME = 'file_source_task_log'
AND INDEX_NAME = 'idx_step_instance_id_execute_count') THEN
ALTER TABLE file_source_task_log DROP INDEX `idx_step_instance_id_execute_count`;
END IF;

COMMIT;
END <JOB_UBF>
DELIMITER ;
COMMIT;

CALL job_schema_update();

DROP PROCEDURE IF EXISTS job_schema_update;

0 comments on commit c198f68

Please sign in to comment.