Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/Tencent/bk-job into issue…
Browse files Browse the repository at this point in the history
…_3305
  • Loading branch information
wangyu096 committed Dec 27, 2024
2 parents 5d5151d + 31667b1 commit faa18b6
Show file tree
Hide file tree
Showing 20 changed files with 173 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,35 @@

package com.tencent.bk.job.common.util;

import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

/**
* 随机工具类
*/
public class RandomUtil {

private static final Random random = new Random();

/**
* 生成随机的正整数,可用于多线程环境
*
* @return 随机数
*/
public static long getRandomPositiveLong() {
long value = random.nextLong();
long value = ThreadLocalRandom.current().nextLong();
if (value == Long.MIN_VALUE || value == 1L) {
return 1;
} else if (value < 0) {
return -value;
}
return value;
}

/**
* 生成[0,bound)范围内的随机正整数,可用于多线程环境
*
* @param bound 边界值,不包含
* @return 随机数
*/
public static int nextInt(int bound) {
return ThreadLocalRandom.current().nextInt(bound);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.tencent.bk.job.common.util;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;

public class RandomUtilTest {

@Test
void testNextInt() {
for (int i = 0; i < 100; i++) {
int bound = RandomUtil.nextInt(1000) + 1;
assertThat(RandomUtil.nextInt(bound)).isGreaterThanOrEqualTo(0);
assertThat(RandomUtil.nextInt(bound)).isLessThan(bound);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,8 @@ private void reScheduleTimeoutTasks() {
}
long currentTime = System.currentTimeMillis();
runningTasks.forEach(runningTask -> {
// 如果归档任务没有正常结束,通过当前时间减去任务创建(修改)时间计算执行时长,判断是否超过合理的执行时长
if (currentTime - runningTask.getCreateTime() > TIMEOUT_MILLS ||
currentTime - runningTask.getLastUpdateTime() > TIMEOUT_MILLS) {
// 如果归档任务没有正常结束,通过当前时间减去任务最后修改时间计算执行时长,判断是否超过合理的执行时长
if (currentTime - runningTask.getLastUpdateTime() > TIMEOUT_MILLS) {
log.info("Found timeout archive task, and set archive task status to pending. taskId: {}",
runningTask.buildTaskUniqueId());
// 设置为 pending 状态,会被重新调度
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public class JobInstanceArchiveTaskScheduler implements SmartLifecycle {
*/
private final Map<String, JobInstanceMainDataArchiveTask> scheduledTasks = new ConcurrentHashMap<>();

/**
* 调度器线程挂起 object monitor
*/
private final Object schedulerHangMonitor = new Object();


public JobInstanceArchiveTaskScheduler(ArchiveTaskService archiveTaskService,
JobInstanceHotRecordDAO taskInstanceRecordDAO,
Expand Down Expand Up @@ -118,12 +123,12 @@ public void schedule() {
return;
}
this.scheduling = true;
if (!isActive()) {
log.info("JobInstanceArchiveTaskScheduler is not active, skip");
return;
}

while (true) {
if (!isActive()) {
log.info("JobInstanceArchiveTaskScheduler is not active, skip");
return;
}
StopWatch watch = new StopWatch("archive-task-schedule");
boolean locked = false;
try {
Expand All @@ -135,7 +140,6 @@ public void schedule() {
continue;
}


// 获取待调度的任务信息(按照 DB 节点计数)
watch.start("countScheduleTasks");
Map<String, Integer> scheduleTasksGroupByDb =
Expand All @@ -162,12 +166,14 @@ public void schedule() {
watch.stop();
int taskConcurrent = archiveProperties.getTasks().getJobInstance().getConcurrent();
if (highestPriorityDbNodeTasksInfo.getRunningTaskCount() >= taskConcurrent) {
// 休眠5分钟,等待并行任务减少
log.info("Running archive task count exceed concurrent limit : {}, wait 300s", taskConcurrent);
// 休眠1分钟,等待并行任务减少
log.info("Running archive task count exceed concurrent limit : {}, wait 60s", taskConcurrent);
// 释放锁
jobInstanceArchiveTaskScheduleLock.unlock();
locked = false;
ThreadUtils.sleep(1000 * 300L);
synchronized (schedulerHangMonitor) {
schedulerHangMonitor.wait(1000 * 60L);
}
continue;
}

Expand Down Expand Up @@ -260,6 +266,10 @@ public void stop() {
}
this.active = false;
}
synchronized (schedulerHangMonitor) {
schedulerHangMonitor.notify();
log.info("Try notify scheduler when stopping");
}
stopTasksGraceful();
log.info("JobInstanceArchiveTaskScheduler stop successfully!");
}
Expand All @@ -280,8 +290,8 @@ private void stopTasksGraceful() {
}
try {
if (taskCountDownLatch != null) {
// 等待任务结束,最多等待 2min
boolean isAllTaskStopped = taskCountDownLatch.waitingForAllTasksDone(120);
// 等待任务结束,最多等待 30s(等待时间太长进程会被k8s kill掉)
boolean isAllTaskStopped = taskCountDownLatch.waitingForAllTasksDone(30);
if (!isAllTaskStopped) {
for (JobInstanceArchiveTask task : scheduledTasks.values()) {
task.forceStopAtOnce();
Expand All @@ -307,10 +317,13 @@ private static final class StopTask implements Runnable {
@Override
public void run() {
try {
log.info("[{}] Run stop task begin", task.getTaskId());
task.stop(() -> taskCountDownLatch.decrement(task.getTaskId()));
} catch (Throwable e) {
String errorMsg = "Stop archive task caught exception, task: " + task;
String errorMsg = "Stop archive task caught exception, task: " + task.getTaskId();
log.warn(errorMsg, e);
} finally {
log.info("[{}] Run stop task end", task.getTaskId());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,39 +32,7 @@
import com.tencent.bk.job.backup.archive.JobInstanceArchiveTaskScheduler;
import com.tencent.bk.job.backup.archive.JobInstanceSubTableArchivers;
import com.tencent.bk.job.backup.archive.dao.JobInstanceColdDAO;
import com.tencent.bk.job.backup.archive.dao.impl.FileSourceTaskLogRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.GseFileAgentTaskRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.GseFileExecuteObjTaskRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.GseScriptAgentTaskRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.GseScriptExecuteObjTaskRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.GseTaskRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.JobInstanceHotRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.OperationLogRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.RollingConfigRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.StepInstanceConfirmRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.StepInstanceFileRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.StepInstanceRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.StepInstanceRollingTaskRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.StepInstanceScriptRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.StepInstanceVariableRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.TaskInstanceHostRecordDAO;
import com.tencent.bk.job.backup.archive.dao.impl.TaskInstanceVariableRecordDAO;
import com.tencent.bk.job.backup.archive.impl.FileSourceTaskLogArchiver;
import com.tencent.bk.job.backup.archive.impl.GseFileAgentTaskArchiver;
import com.tencent.bk.job.backup.archive.impl.GseFileExecuteObjTaskArchiver;
import com.tencent.bk.job.backup.archive.impl.GseScriptAgentTaskArchiver;
import com.tencent.bk.job.backup.archive.impl.GseScriptExecuteObjTaskArchiver;
import com.tencent.bk.job.backup.archive.impl.GseTaskArchiver;
import com.tencent.bk.job.backup.archive.impl.OperationLogArchiver;
import com.tencent.bk.job.backup.archive.impl.RollingConfigArchiver;
import com.tencent.bk.job.backup.archive.impl.StepInstanceArchiver;
import com.tencent.bk.job.backup.archive.impl.StepInstanceConfirmArchiver;
import com.tencent.bk.job.backup.archive.impl.StepInstanceFileArchiver;
import com.tencent.bk.job.backup.archive.impl.StepInstanceRollingTaskArchiver;
import com.tencent.bk.job.backup.archive.impl.StepInstanceScriptArchiver;
import com.tencent.bk.job.backup.archive.impl.StepInstanceVariableArchiver;
import com.tencent.bk.job.backup.archive.impl.TaskInstanceHostArchiver;
import com.tencent.bk.job.backup.archive.impl.TaskInstanceVariableArchiver;
import com.tencent.bk.job.backup.archive.metrics.ArchiveTasksGauge;
import com.tencent.bk.job.backup.archive.service.ArchiveTaskService;
import com.tencent.bk.job.backup.archive.util.lock.ArchiveTaskExecuteLock;
Expand All @@ -73,7 +41,6 @@
import com.tencent.bk.job.backup.archive.util.lock.JobInstanceArchiveTaskScheduleLock;
import com.tencent.bk.job.backup.metrics.ArchiveErrorTaskCounter;
import com.tencent.bk.job.common.WatchableThreadPoolExecutor;
import com.tencent.bk.job.common.mysql.dynamic.ds.DSLContextProvider;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
Expand Down Expand Up @@ -144,7 +111,7 @@ public ThreadPoolExecutor archiveTaskStopExecutor(MeterRegistry meterRegistry) {
5,
20,
120L,
TimeUnit.MILLISECONDS,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setNameFormat("archive-task-stop-thread-pool-%d").build()
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.tencent.bk.job.backup.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

import java.util.concurrent.ScheduledThreadPoolExecutor;

@Configuration
@Slf4j
public class SpringScheduleConfig implements SchedulingConfigurer {
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
log.info("Configure spring cron task scheduler");
//设定一个长度5的定时任务线程池
taskRegistrar.setScheduler(new ScheduledThreadPoolExecutor(5, (r, executor) -> log.error(
"ScheduledThreadPoolExecutor rejected a runnable")));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
Expand All @@ -43,6 +45,8 @@ public class LoadCronJobRunner implements CommandLineRunner {
private final CronJobLoadingService cronJobLoadingService;
private final ThreadPoolExecutor crontabInitRunnerExecutor;

private Future<?> loadCronJobFuture;

@Autowired
public LoadCronJobRunner(CronJobLoadingService cronJobLoadingService,
@Qualifier("crontabInitRunnerExecutor") ThreadPoolExecutor crontabInitRunnerExecutor) {
Expand All @@ -52,9 +56,18 @@ public LoadCronJobRunner(CronJobLoadingService cronJobLoadingService,

@Override
public void run(String... args) {
crontabInitRunnerExecutor.submit(() -> {
loadCronJobFuture = crontabInitRunnerExecutor.submit(() -> {
log.info("loadCronToQuartzOnStartup");
cronJobLoadingService.loadAllCronJob();
});
}

@PreDestroy
public void destroy() {
log.info("destroy LoadCronJobRunner");
if (loadCronJobFuture != null) {
boolean result = loadCronJobFuture.cancel(true);
log.info("loadCronJobFuture cancel result:{}", result);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public interface CronJobBatchLoadService {
* @param limit 一批定时任务的数量
* @return 加载结果数据
*/
CronLoadResult batchLoadCronToQuartz(int start, int limit);
CronLoadResult batchLoadCronToQuartz(int start, int limit) throws InterruptedException;

@Data
class CronLoadResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ public CronJobBatchLoadServiceImpl(CronJobService cronJobService) {

@Override
@JobTransactional(transactionManager = "jobCrontabTransactionManager", timeout = 30)
public CronLoadResult batchLoadCronToQuartz(int start, int limit) {
public CronLoadResult batchLoadCronToQuartz(int start, int limit) throws InterruptedException {
int successNum = 0;
int failedNum = 0;
List<CronJobBasicInfoDTO> failedCronList = new ArrayList<>();
List<CronJobBasicInfoDTO> cronJobBasicInfoList = cronJobService.listEnabledCronBasicInfoForUpdate(start, limit);
for (CronJobBasicInfoDTO cronJobBasicInfoDTO : cronJobBasicInfoList) {
checkInterrupt();
boolean result = false;
try {
result = cronJobService.addJobToQuartz(
Expand Down Expand Up @@ -93,4 +94,10 @@ public CronLoadResult batchLoadCronToQuartz(int start, int limit) {
cronLoadResult.setFailedCronList(failedCronList);
return cronLoadResult;
}

private void checkInterrupt() throws InterruptedException {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("batchLoadCronToQuartz thread is interrupted, exit");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public void loadAllCronJob() {
}
loadingCronToQuartz = true;
loadAllCronJobToQuartz();
} catch (InterruptedException e) {
log.info("loadAllCronJob interrupted, application may be closing");
} catch (Exception e) {
log.warn("Fail to loadAllCronJob", e);
} finally {
Expand All @@ -65,7 +67,7 @@ public void loadAllCronJob() {
}
}

private void loadAllCronJobToQuartz() {
private void loadAllCronJobToQuartz() throws InterruptedException {
int start = 0;
int limit = 100;
int currentFetchNum;
Expand Down
Loading

0 comments on commit faa18b6

Please sign in to comment.