Skip to content

Commit

Permalink
print stack
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Xiaohua Cai <[email protected]>
  • Loading branch information
kevincai committed Feb 26, 2025
1 parent 776379e commit 8908cbf
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@
import com.starrocks.thrift.TStatusCode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -77,6 +79,7 @@ public class RoutineLoadTaskScheduler extends FrontendDaemon {

private static final Logger LOG = LogManager.getLogger(RoutineLoadTaskScheduler.class);

public static boolean PRINT_STACETRACE = false;
private static final long BACKEND_SLOT_UPDATE_INTERVAL_MS = 10000; // 10s
private static final long SLOT_FULL_SLEEP_MS = 10000; // 10s
private static final long POLL_TIMEOUT_SEC = 10; // 10s
Expand All @@ -91,11 +94,19 @@ public class RoutineLoadTaskScheduler extends FrontendDaemon {
@VisibleForTesting
public RoutineLoadTaskScheduler() {
super("Routine load task scheduler", 0);
if (PRINT_STACETRACE) {
LOG.warn("RoutineLoadTaskScheduler created at: {}", Strings.join(
Arrays.asList(Thread.currentThread().getStackTrace()), ' '));
}
this.routineLoadManager = GlobalStateMgr.getCurrentState().getRoutineLoadMgr();
}

public RoutineLoadTaskScheduler(RoutineLoadMgr routineLoadManager) {
super("Routine load task scheduler", 0);
if (PRINT_STACETRACE) {
LOG.warn("RoutineLoadTaskScheduler created at: {}", Strings.join(
Arrays.asList(Thread.currentThread().getStackTrace()), ' '));
}
this.routineLoadManager = routineLoadManager;
}

Expand All @@ -109,6 +120,7 @@ protected void runAfterCatalogReady() {
}

private void process() throws InterruptedException {
LOG.warn("schedule a job 0...");
updateBackendSlotIfNecessary();

int idleSlotNum = routineLoadManager.getClusterIdleSlotNum();
Expand All @@ -122,26 +134,27 @@ private void process() throws InterruptedException {
}

try {
LOG.warn("schedule a job A...");
// This step will be blocked until timeout when queue is empty
RoutineLoadTaskInfo routineLoadTaskInfo = needScheduleTasksQueue.poll(POLL_TIMEOUT_SEC, TimeUnit.SECONDS);
if (routineLoadTaskInfo == null) {
return;
}

LOG.warn("schedule a job B ...");
if (routineLoadTaskInfo.getTimeToExecuteMs() > System.currentTimeMillis()) {
// delay adding to queue to avoid endless loop
delayPutToQueue(routineLoadTaskInfo, null);
return;
}

LOG.warn("schedule a job C ...");
// try to delay scheduling this task for scheduleInterval, to void too many failure
if (System.currentTimeMillis() - routineLoadTaskInfo.getLastScheduledTime() <
routineLoadTaskInfo.getTaskScheduleIntervalMs()) {
// delay adding to queue to avoid endless loop
delayPutToQueue(routineLoadTaskInfo, null);
return;
}

LOG.warn("schedule a job D ...");
submitToSchedule(routineLoadTaskInfo);
} catch (Exception e) {
LOG.warn("Taking routine load task from queue has been interrupted", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,29 @@
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.Map;
import java.util.Queue;
import java.util.UUID;

public class RoutineLoadTaskSchedulerTest {
private static final Logger LOG = LogManager.getLogger(RoutineLoadTaskSchedulerTest.class);

@Mocked
private RoutineLoadMgr routineLoadManager;
@Mocked
private GlobalStateMgr globalStateMgr;

@BeforeClass
public static void beforeSetup() {
RoutineLoadTaskScheduler.PRINT_STACETRACE = true;
}

@Test
public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1,
@Injectable KafkaRoutineLoadJob routineLoadJob) {
Expand Down Expand Up @@ -133,7 +142,7 @@ public void testSchedulerOneTaskTxnNotFound() {
Deencapsulation.setField(kafkaProgress, "partitionIdToOffset", partitionIdToOffset);

Queue<RoutineLoadTaskInfo> routineLoadTaskInfoQueue = Queues.newLinkedBlockingQueue();
KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), routineLoadJob, 20000,
KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), routineLoadJob, 2000,
System.currentTimeMillis(), partitionIdToOffset, Config.routine_load_task_timeout_second * 1000);
routineLoadTaskInfoQueue.add(routineLoadTaskInfo1);

Expand Down Expand Up @@ -218,6 +227,7 @@ public void testSchedulerOneTaskDbNotFound() {
KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1), routineLoadJob, 20000,
System.currentTimeMillis(), partitionIdToOffset, Config.routine_load_task_timeout_second * 1000);
routineLoadTaskInfoQueue.add(routineLoadTaskInfo1);
LOG.warn("task info: {}", routineLoadTaskInfo1);

Map<Long, RoutineLoadTaskInfo> idToRoutineLoadTask = Maps.newHashMap();
idToRoutineLoadTask.put(1L, routineLoadTaskInfo1);
Expand Down

0 comments on commit 8908cbf

Please sign in to comment.