Skip to content

Commit

Permalink
Merge pull request TencentBlueKing#3365 from wangyu096/issue_3364
Browse files Browse the repository at this point in the history
fix: 执行历史归档任务重调度逻辑不正确 TencentBlueKing#3364
  • Loading branch information
wangyu096 authored Dec 27, 2024
2 parents 09deee6 + eb7bbc3 commit a407060
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,8 @@ private void reScheduleTimeoutTasks() {
}
long currentTime = System.currentTimeMillis();
runningTasks.forEach(runningTask -> {
// 如果归档任务没有正常结束,通过当前时间减去任务创建(修改)时间计算执行时长,判断是否超过合理的执行时长
if (currentTime - runningTask.getCreateTime() > TIMEOUT_MILLS ||
currentTime - runningTask.getLastUpdateTime() > TIMEOUT_MILLS) {
// 如果归档任务没有正常结束,通过当前时间减去任务最后修改时间计算执行时长,判断是否超过合理的执行时长
if (currentTime - runningTask.getLastUpdateTime() > TIMEOUT_MILLS) {
log.info("Found timeout archive task, and set archive task status to pending. taskId: {}",
runningTask.buildTaskUniqueId());
// 设置为 pending 状态,会被重新调度
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public class JobInstanceArchiveTaskScheduler implements SmartLifecycle {
*/
private final Map<String, JobInstanceMainDataArchiveTask> scheduledTasks = new ConcurrentHashMap<>();

/**
* 调度器线程挂起 object monitor
*/
private final Object schedulerHangMonitor = new Object();


public JobInstanceArchiveTaskScheduler(ArchiveTaskService archiveTaskService,
JobInstanceHotRecordDAO taskInstanceRecordDAO,
Expand Down Expand Up @@ -118,12 +123,12 @@ public void schedule() {
return;
}
this.scheduling = true;
if (!isActive()) {
log.info("JobInstanceArchiveTaskScheduler is not active, skip");
return;
}

while (true) {
if (!isActive()) {
log.info("JobInstanceArchiveTaskScheduler is not active, skip");
return;
}
StopWatch watch = new StopWatch("archive-task-schedule");
boolean locked = false;
try {
Expand Down Expand Up @@ -166,7 +171,9 @@ public void schedule() {
// 释放锁
jobInstanceArchiveTaskScheduleLock.unlock();
locked = false;
ThreadUtils.sleep(1000 * 60L);
synchronized (schedulerHangMonitor) {
schedulerHangMonitor.wait(1000 * 60L);
}
continue;
}

Expand Down Expand Up @@ -259,6 +266,10 @@ public void stop() {
}
this.active = false;
}
synchronized (schedulerHangMonitor) {
schedulerHangMonitor.notify();
log.info("Try notify scheduler when stopping");
}
stopTasksGraceful();
log.info("JobInstanceArchiveTaskScheduler stop successfully!");
}
Expand All @@ -279,8 +290,8 @@ private void stopTasksGraceful() {
}
try {
if (taskCountDownLatch != null) {
// 等待任务结束,最多等待 10s(等待时间太长进程会被k8s kill掉)
boolean isAllTaskStopped = taskCountDownLatch.waitingForAllTasksDone(10);
// 等待任务结束,最多等待 30s(等待时间太长进程会被k8s kill掉)
boolean isAllTaskStopped = taskCountDownLatch.waitingForAllTasksDone(30);
if (!isAllTaskStopped) {
for (JobInstanceArchiveTask task : scheduledTasks.values()) {
task.forceStopAtOnce();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ spec:
annotations:
{{ include "annotations.sha256sum.configmap" ( dict "service" "job-backup" "context" . ) | nindent 8 }}
spec:
{{- include "job.podTerminationGracePeriodSeconds" . | nindent 6 }}
{{- include "job.imagePullSecrets" . | nindent 6 }}
hostAliases: {{- include "common.tplvalues.render" (dict "value" .Values.hostAliases "context" $) | nindent 8 }}
{{- if .Values.backupConfig.affinity }}
Expand Down Expand Up @@ -104,6 +103,7 @@ spec:
- name: redis
mountPath: /etc/secrets/redis
readOnly: true
terminationGracePeriodSeconds: 80
volumes:
- name: job-storage
persistentVolumeClaim:
Expand Down

0 comments on commit a407060

Please sign in to comment.