Skip to content

Commit

Permalink
optimizer report tm name
Browse files Browse the repository at this point in the history
  • Loading branch information
nicochen committed Sep 21, 2023
1 parent 888db83 commit cd02f32
Show file tree
Hide file tree
Showing 15 changed files with 398 additions and 265 deletions.

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion ams/ams-api/src/main/thrift/arctic_optimize_manager.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ service OptimizeManager {

void ping()

OptimizeTask pollTask(1:i32 queueId, 2:JobId jobId, 3:string attemptId, 4:i64 waitTime)
OptimizeTask pollTask(1:i32 queueId, 2:JobId jobId, 3:string attemptId, 4:i64 waitTime, 5:string container)
throws (1: arctic_commons.NoSuchObjectException e1)

void reportOptimizeResult(1:OptimizeTaskStat optimizeTaskStat)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ public void ping() throws TException {
}

@Override
public OptimizeTask pollTask(int queueId, JobId jobId, String attemptId, long waitTime)
public OptimizeTask pollTask(int queueId, JobId jobId, String attemptId, long waitTime, String container)
throws NoSuchObjectException, TException {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ public void ping() throws TException {
}

@Override
public OptimizeTask pollTask(int queueId, JobId jobId, String attemptId, long waitTime)
public OptimizeTask pollTask(int queueId, JobId jobId, String attemptId, long waitTime, String container)
throws TException {
return ServiceContainer.getOptimizeQueueService().pollTask(queueId, jobId, attemptId, waitTime);
return ServiceContainer.getOptimizeQueueService().pollTask(queueId, jobId, attemptId, waitTime, container);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public interface OptimizeTaskRuntimesMapper {
" job_type = #{optimizeTaskRuntime.jobId.type, jdbcType=VARCHAR}," +
" job_id = #{optimizeTaskRuntime.jobId.id, jdbcType=VARCHAR}," +
" attempt_id = #{optimizeTaskRuntime.attemptId, jdbcType=VARCHAR}," +
" container = #{optimizeTaskRuntime.container, jdbcType=VARCHAR}," +
" retry = #{optimizeTaskRuntime.retry}," +
" fail_reason = #{optimizeTaskRuntime.failReason, jdbcType=VARCHAR}," +
" fail_time = #{optimizeTaskRuntime.failTime, " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ List<TableTaskHistory> selectTaskHistory(@Param("tableIdentifier") TableIdentifi
List<TableTaskHistory> selectTaskHistoryByTraceId(@Param("taskTraceId") String taskTraceId);

@Insert("insert into " + TABLE_NAME + "(task_trace_id, retry, catalog_name, db_name, table_name, " +
"task_plan_group, start_time, end_time, cost_time, queue_id) values ( " +
"task_plan_group, start_time, end_time, cost_time, queue_id, container) values ( " +
"#{taskHistory.taskTraceId}, " +
"#{taskHistory.retry}, " +
"#{taskHistory.tableIdentifier.catalog}, " +
Expand All @@ -87,7 +87,8 @@ List<TableTaskHistory> selectTaskHistory(@Param("tableIdentifier") TableIdentifi
"#{taskHistory.endTime, " +
"typeHandler=com.netease.arctic.ams.server.mybatis.Long2TsConvertor}," +
"#{taskHistory.costTime}, " +
"#{taskHistory.queueId}) ")
"#{taskHistory.queueId}, " +
"#{taskHistory.container}) ")
void insertTaskHistory(@Param("taskHistory") TableTaskHistory taskHistory);

@Update("update " + TABLE_NAME + " set " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class OptimizeTaskRuntime implements Cloneable {
private JobId jobId;
private String attemptId;

private String container;

private int retry = 0;
private ErrorMessage errorMessage = null;

Expand Down Expand Up @@ -166,6 +168,14 @@ public void setAttemptId(String attemptId) {
this.attemptId = attemptId;
}

public String getContainer() {
return container;
}

public void setContainer(String container) {
this.container = container;
}

public long getCostTime() {
return costTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class TableTaskHistory {
private String failReason;
private long failTime;

private String container;

public TableIdentifier getTableIdentifier() {
return tableIdentifier;
}
Expand Down Expand Up @@ -112,6 +114,14 @@ public void setFailTime(long failTime) {
this.failTime = failTime;
}

public String getContainer() {
return container;
}

public void setContainer(String container) {
this.container = container;
}

@Override
public String toString() {
return "TableTaskHistory{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void onPending() {
}
}

public TableTaskHistory onExecuting(JobId jobId, String attemptId) {
public TableTaskHistory onExecuting(JobId jobId, String attemptId, String container) {
lock.lock();
try {
Preconditions.checkArgument(optimizeRuntime.getStatus() != OptimizeStatus.Prepared,
Expand All @@ -120,6 +120,7 @@ public TableTaskHistory onExecuting(JobId jobId, String attemptId) {
newRuntime.setPreparedTime(OptimizeTaskRuntime.INVALID_TIME);
newRuntime.setCostTime(0);
newRuntime.setErrorMessage(null);
newRuntime.setContainer(container);
persistTaskRuntime(newRuntime, false);
optimizeRuntime = newRuntime;
return constructNewTableTaskHistory(currentTime);
Expand Down Expand Up @@ -346,6 +347,7 @@ private TableTaskHistory constructNewTableTaskHistory(long currentTime) {
tableTaskHistory.setRetry(optimizeRuntime.getRetry());
tableTaskHistory.setStartTime(currentTime);
tableTaskHistory.setQueueId(optimizeTask.getQueueId());
tableTaskHistory.setContainer(optimizeRuntime.getContainer());

return tableTaskHistory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,10 @@ public List<OptimizeQueueMeta> getQueues() throws NoSuchObjectException {
}
}

public OptimizeTask pollTask(int queueId, JobId jobId, String attemptId, long waitTime)
public OptimizeTask pollTask(int queueId, JobId jobId, String attemptId, long waitTime, String container)
throws NoSuchObjectException, TException {
try {
OptimizeTask task = getQueue(queueId).poll(jobId, attemptId, waitTime);
OptimizeTask task = getQueue(queueId).poll(jobId, attemptId, waitTime, container);
if (task != null) {
LOG.info("{} pollTask success, {}", jobId, task);
} else {
Expand Down Expand Up @@ -535,7 +535,7 @@ public String queueName() {
optimizeQueue.getOptimizeQueueMeta().getQueueId();
}

public OptimizeTask poll(JobId jobId, final String attemptId, long waitTime) {
public OptimizeTask poll(JobId jobId, final String attemptId, long waitTime, String container) {
long startTime = System.currentTimeMillis();
OptimizeTaskItem task = pollValidTask();
if (task == null) {
Expand All @@ -545,17 +545,17 @@ public OptimizeTask poll(JobId jobId, final String attemptId, long waitTime) {
return null;
}
}
return onExecuteOptimizeTask(task, jobId, attemptId);
return onExecuteOptimizeTask(task, jobId, attemptId, container);
}

private OptimizeTask onExecuteOptimizeTask(OptimizeTaskItem task, JobId jobId, String attemptId) {
private OptimizeTask onExecuteOptimizeTask(OptimizeTaskItem task, JobId jobId, String attemptId, String container) {
TableTaskHistory tableTaskHistory;
try {
// load files from sysdb
task.setFiles();
// update max execute time
task.setMaxExecuteTime();
tableTaskHistory = task.onExecuting(jobId, attemptId);
tableTaskHistory = task.onExecuting(jobId, attemptId, container);
} catch (Exception e) {
task.clearFiles();
LOG.error("{} handle sysdb failed, try put task back into queue", task.getTaskId(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void open(Configuration parameters) throws Exception {
public void run(SourceContext<TaskWrapper> sourceContext) throws Exception {
while (running) {
try {
TaskWrapper task = taskConsumer.pollTask(0);
TaskWrapper task = taskConsumer.pollTask(0, getRuntimeContext().getTaskNameWithSubtasks());
if (task != null) {
sourceContext.collect(task);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public Consumer() {
public TaskWrapper pollTask() throws InterruptedException {
while (!stopped) {
try {
TaskWrapper task = baseTaskConsumer.pollTask(0);
TaskWrapper task = baseTaskConsumer.pollTask(0, "");
if (task != null) {
LOG.info("poll task {}", task);
return task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,24 @@ public BaseTaskConsumer(OptimizerConfig config) {
* @return - return null if got no task
*/
public TaskWrapper pollTask() throws TException {
return pollTask(DEFAULT_POLL_WAIT_TIMEOUT);
return pollTask(DEFAULT_POLL_WAIT_TIMEOUT, "");
}

/**
* poll task from Ams with timeout.
*
* @return - return null if got no task
*/
public TaskWrapper pollTask(long timeout) throws TException {
public TaskWrapper pollTask(long timeout, String container) throws TException {
int attemptId = Math.abs(ThreadLocalRandom.current().nextInt());
OptimizeTask task = pollTask(attemptId, timeout);
OptimizeTask task = pollTask(attemptId, timeout, container);
return task == null ? null : new TaskWrapper(task, attemptId);
}

private OptimizeTask pollTask(int attemptId, long timeout) throws TException {
private OptimizeTask pollTask(int attemptId, long timeout, String container) throws TException {
try {
OptimizeManager.Iface optimizeManager = OptimizeManagerClientPools.getClient(config.getAmsUrl());
return optimizeManager.pollTask(config.getQueueId(), jobId, attemptId + "", timeout);
return optimizeManager.pollTask(config.getQueueId(), jobId, attemptId + "", timeout, container);
} catch (NoSuchObjectException e) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
import com.netease.arctic.optimizer.operator.executor.NodeTask;
import com.netease.arctic.optimizer.operator.executor.OptimizeTaskResult;
import com.netease.arctic.optimizer.operator.executor.TableIdentificationInfo;
import com.netease.arctic.utils.ExceptionUtil;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.TableProperties;
import com.netease.arctic.utils.ExceptionUtil;
import com.netease.arctic.utils.SerializationUtils;
import com.netease.arctic.utils.TableTypeUtil;
import org.apache.commons.collections.CollectionUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public OptimizeTask feedTask() {
}

@Override
public TaskWrapper pollTask(long timeout) throws TException {
public TaskWrapper pollTask(long timeout, String container) throws TException {
synchronized (this) {
try {
if (this.nextTaskToConsume == null) {
Expand Down

0 comments on commit cd02f32

Please sign in to comment.