From cd02f326aaac05a537f77b6092e50f76c4d88acf Mon Sep 17 00:00:00 2001 From: nicochen Date: Thu, 21 Sep 2023 10:47:18 +0800 Subject: [PATCH] optimizer report tm name --- .../arctic/ams/api/OptimizeManager.java | 595 +++++++++++------- .../thrift/arctic_optimize_manager.thrift | 2 +- .../ams/api/MockArcticMetastoreServer.java | 2 +- .../handler/impl/OptimizeManagerHandler.java | 4 +- .../mapper/OptimizeTaskRuntimesMapper.java | 1 + .../ams/server/mapper/TaskHistoryMapper.java | 5 +- .../ams/server/model/OptimizeTaskRuntime.java | 10 + .../ams/server/model/TableTaskHistory.java | 10 + .../ams/server/optimize/OptimizeTaskItem.java | 4 +- .../service/impl/OptimizeQueueService.java | 12 +- .../arctic/optimizer/flink/FlinkConsumer.java | 2 +- .../optimizer/local/LocalOptimizer.java | 2 +- .../optimizer/operator/BaseTaskConsumer.java | 10 +- .../optimizer/operator/BaseTaskExecutor.java | 2 +- .../optimizer/operator/FakeBaseConsumer.java | 2 +- 15 files changed, 398 insertions(+), 265 deletions(-) diff --git a/ams/ams-api/src/main/gen-java/com/netease/arctic/ams/api/OptimizeManager.java b/ams/ams-api/src/main/gen-java/com/netease/arctic/ams/api/OptimizeManager.java index 1c19bb54b0..152b069282 100644 --- a/ams/ams-api/src/main/gen-java/com/netease/arctic/ams/api/OptimizeManager.java +++ b/ams/ams-api/src/main/gen-java/com/netease/arctic/ams/api/OptimizeManager.java @@ -7,18 +7,18 @@ package com.netease.arctic.ams.api; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2023-02-24") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2023-09-20") public class OptimizeManager { /** * replace TableContainer - * + * */ public interface Iface { public void ping() throws org.apache.thrift.TException; - public OptimizeTask pollTask(int queueId, JobId jobId, java.lang.String attemptId, long waitTime) throws com.netease.arctic.ams.api.NoSuchObjectException, org.apache.thrift.TException; + public OptimizeTask pollTask(int queueId, JobId jobId, java.lang.String attemptId, long waitTime, java.lang.String container) throws com.netease.arctic.ams.api.NoSuchObjectException, org.apache.thrift.TException; public void reportOptimizeResult(OptimizeTaskStat optimizeTaskStat) throws org.apache.thrift.TException; @@ -36,7 +36,7 @@ public interface AsyncIface { public void ping(org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; - public void pollTask(int queueId, JobId jobId, java.lang.String attemptId, long waitTime, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; + public void pollTask(int queueId, JobId jobId, java.lang.String attemptId, long waitTime, java.lang.String container, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; public void reportOptimizeResult(OptimizeTaskStat optimizeTaskStat, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; @@ -89,19 +89,20 @@ public void recv_ping() throws org.apache.thrift.TException return; } - public OptimizeTask pollTask(int queueId, JobId jobId, java.lang.String attemptId, long waitTime) throws com.netease.arctic.ams.api.NoSuchObjectException, org.apache.thrift.TException + public OptimizeTask pollTask(int queueId, JobId jobId, java.lang.String attemptId, long waitTime, java.lang.String container) throws com.netease.arctic.ams.api.NoSuchObjectException, org.apache.thrift.TException { - send_pollTask(queueId, jobId, attemptId, waitTime); + send_pollTask(queueId, jobId, attemptId, waitTime, container); return recv_pollTask(); } - public void send_pollTask(int queueId, JobId jobId, java.lang.String attemptId, long waitTime) throws org.apache.thrift.TException + public void send_pollTask(int queueId, JobId jobId, java.lang.String attemptId, long waitTime, java.lang.String container) throws org.apache.thrift.TException { pollTask_args args = new pollTask_args(); args.setQueueId(queueId); args.setJobId(jobId); args.setAttemptId(attemptId); args.setWaitTime(waitTime); + args.setContainer(container); sendBase("pollTask", args); } @@ -274,9 +275,9 @@ public Void getResult() throws org.apache.thrift.TException { } } - public void pollTask(int queueId, JobId jobId, java.lang.String attemptId, long waitTime, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { + public void pollTask(int queueId, JobId jobId, java.lang.String attemptId, long waitTime, java.lang.String container, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { checkReady(); - pollTask_call method_call = new pollTask_call(queueId, jobId, attemptId, waitTime, resultHandler, this, ___protocolFactory, ___transport); + pollTask_call method_call = new pollTask_call(queueId, jobId, attemptId, waitTime, container, resultHandler, this, ___protocolFactory, ___transport); this.___currentMethod = method_call; ___manager.call(method_call); } @@ -286,12 +287,14 @@ public static class pollTask_call extends org.apache.thrift.async.TAsyncMethodCa private JobId jobId; private java.lang.String attemptId; private long waitTime; - public pollTask_call(int queueId, JobId jobId, java.lang.String attemptId, long waitTime, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + private java.lang.String container; + public pollTask_call(int queueId, JobId jobId, java.lang.String attemptId, long waitTime, java.lang.String container, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { super(client, protocolFactory, transport, resultHandler, false); this.queueId = queueId; this.jobId = jobId; this.attemptId = attemptId; this.waitTime = waitTime; + this.container = container; } public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { @@ -301,6 +304,7 @@ public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apa args.setJobId(jobId); args.setAttemptId(attemptId); args.setWaitTime(waitTime); + args.setContainer(container); args.write(prot); prot.writeMessageEnd(); } @@ -544,7 +548,7 @@ protected boolean rethrowUnhandledExceptions() { public pollTask_result getResult(I iface, pollTask_args args) throws org.apache.thrift.TException { pollTask_result result = new pollTask_result(); try { - result.success = iface.pollTask(args.queueId, args.jobId, args.attemptId, args.waitTime); + result.success = iface.pollTask(args.queueId, args.jobId, args.attemptId, args.waitTime, args.container); } catch (com.netease.arctic.ams.api.NoSuchObjectException e1) { result.e1 = e1; } @@ -719,7 +723,7 @@ public ping_args getEmptyArgsInstance() { public org.apache.thrift.async.AsyncMethodCallback getResultHandler(final org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) { final org.apache.thrift.AsyncProcessFunction fcall = this; - return new org.apache.thrift.async.AsyncMethodCallback() { + return new org.apache.thrift.async.AsyncMethodCallback() { public void onComplete(Void o) { ping_result result = new ping_result(); try { @@ -779,7 +783,7 @@ public pollTask_args getEmptyArgsInstance() { public org.apache.thrift.async.AsyncMethodCallback getResultHandler(final org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) { final org.apache.thrift.AsyncProcessFunction fcall = this; - return new org.apache.thrift.async.AsyncMethodCallback() { + return new org.apache.thrift.async.AsyncMethodCallback() { public void onComplete(OptimizeTask o) { pollTask_result result = new pollTask_result(); result.success = o; @@ -829,7 +833,7 @@ protected boolean isOneway() { } public void start(I iface, pollTask_args args, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { - iface.pollTask(args.queueId, args.jobId, args.attemptId, args.waitTime,resultHandler); + iface.pollTask(args.queueId, args.jobId, args.attemptId, args.waitTime, args.container,resultHandler); } } @@ -844,7 +848,7 @@ public reportOptimizeResult_args getEmptyArgsInstance() { public org.apache.thrift.async.AsyncMethodCallback getResultHandler(final org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) { final org.apache.thrift.AsyncProcessFunction fcall = this; - return new org.apache.thrift.async.AsyncMethodCallback() { + return new org.apache.thrift.async.AsyncMethodCallback() { public void onComplete(Void o) { reportOptimizeResult_result result = new reportOptimizeResult_result(); try { @@ -904,7 +908,7 @@ public reportOptimizerState_args getEmptyArgsInstance() { public org.apache.thrift.async.AsyncMethodCallback getResultHandler(final org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) { final org.apache.thrift.AsyncProcessFunction fcall = this; - return new org.apache.thrift.async.AsyncMethodCallback() { + return new org.apache.thrift.async.AsyncMethodCallback() { public void onComplete(Void o) { reportOptimizerState_result result = new reportOptimizerState_result(); try { @@ -964,7 +968,7 @@ public registerOptimizer_args getEmptyArgsInstance() { public org.apache.thrift.async.AsyncMethodCallback getResultHandler(final org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) { final org.apache.thrift.AsyncProcessFunction fcall = this; - return new org.apache.thrift.async.AsyncMethodCallback() { + return new org.apache.thrift.async.AsyncMethodCallback() { public void onComplete(OptimizerDescriptor o) { registerOptimizer_result result = new registerOptimizer_result(); result.success = o; @@ -1025,7 +1029,7 @@ public stopOptimize_args getEmptyArgsInstance() { public org.apache.thrift.async.AsyncMethodCallback getResultHandler(final org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) { final org.apache.thrift.AsyncProcessFunction fcall = this; - return new org.apache.thrift.async.AsyncMethodCallback() { + return new org.apache.thrift.async.AsyncMethodCallback() { public void onComplete(Void o) { stopOptimize_result result = new stopOptimize_result(); try { @@ -1089,7 +1093,7 @@ public startOptimize_args getEmptyArgsInstance() { public org.apache.thrift.async.AsyncMethodCallback getResultHandler(final org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) { final org.apache.thrift.AsyncProcessFunction fcall = this; - return new org.apache.thrift.async.AsyncMethodCallback() { + return new org.apache.thrift.async.AsyncMethodCallback() { public void onComplete(Void o) { startOptimize_result result = new startOptimize_result(); try { @@ -1154,7 +1158,7 @@ public static class ping_args implements org.apache.thrift.TBase byName = new java.util.HashMap(); @@ -1349,7 +1353,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, ping_args struct) t while (true) { schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { break; } switch (schemeField.id) { @@ -1408,7 +1412,7 @@ public static class ping_result implements org.apache.thrift.TBase byName = new java.util.HashMap(); @@ -1557,7 +1561,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.t public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { scheme(oprot).write(oprot, this); - } + } @Override public java.lang.String toString() { @@ -1603,7 +1607,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, ping_result struct) while (true) { schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { break; } switch (schemeField.id) { @@ -1659,6 +1663,7 @@ public static class pollTask_args implements org.apache.thrift.TBase byName = new java.util.HashMap(); @@ -1697,6 +1704,8 @@ public static _Fields findByThriftId(int fieldId) { return ATTEMPT_ID; case 4: // WAIT_TIME return WAIT_TIME; + case 5: // CONTAINER + return CONTAINER; default: return null; } @@ -1744,14 +1753,16 @@ public java.lang.String getFieldName() { public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); - tmpMap.put(_Fields.QUEUE_ID, new org.apache.thrift.meta_data.FieldMetaData("queueId", org.apache.thrift.TFieldRequirementType.DEFAULT, + tmpMap.put(_Fields.QUEUE_ID, new org.apache.thrift.meta_data.FieldMetaData("queueId", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); - tmpMap.put(_Fields.JOB_ID, new org.apache.thrift.meta_data.FieldMetaData("jobId", org.apache.thrift.TFieldRequirementType.DEFAULT, + tmpMap.put(_Fields.JOB_ID, new org.apache.thrift.meta_data.FieldMetaData("jobId", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, JobId.class))); - tmpMap.put(_Fields.ATTEMPT_ID, new org.apache.thrift.meta_data.FieldMetaData("attemptId", org.apache.thrift.TFieldRequirementType.DEFAULT, + tmpMap.put(_Fields.ATTEMPT_ID, new org.apache.thrift.meta_data.FieldMetaData("attemptId", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); - tmpMap.put(_Fields.WAIT_TIME, new org.apache.thrift.meta_data.FieldMetaData("waitTime", org.apache.thrift.TFieldRequirementType.DEFAULT, + tmpMap.put(_Fields.WAIT_TIME, new org.apache.thrift.meta_data.FieldMetaData("waitTime", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.CONTAINER, new org.apache.thrift.meta_data.FieldMetaData("container", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(pollTask_args.class, metaDataMap); } @@ -1760,10 +1771,11 @@ public pollTask_args() { } public pollTask_args( - int queueId, - JobId jobId, - java.lang.String attemptId, - long waitTime) + int queueId, + JobId jobId, + java.lang.String attemptId, + long waitTime, + java.lang.String container) { this(); this.queueId = queueId; @@ -1772,6 +1784,7 @@ public pollTask_args( this.attemptId = attemptId; this.waitTime = waitTime; setWaitTimeIsSet(true); + this.container = container; } /** @@ -1787,6 +1800,9 @@ public pollTask_args(pollTask_args other) { this.attemptId = other.attemptId; } this.waitTime = other.waitTime; + if (other.isSetContainer()) { + this.container = other.container; + } } public pollTask_args deepCopy() { @@ -1801,6 +1817,7 @@ public void clear() { this.attemptId = null; setWaitTimeIsSet(false); this.waitTime = 0; + this.container = null; } public int getQueueId() { @@ -1899,39 +1916,72 @@ public void setWaitTimeIsSet(boolean value) { __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __WAITTIME_ISSET_ID, value); } + @org.apache.thrift.annotation.Nullable + public java.lang.String getContainer() { + return this.container; + } + + public pollTask_args setContainer(@org.apache.thrift.annotation.Nullable java.lang.String container) { + this.container = container; + return this; + } + + public void unsetContainer() { + this.container = null; + } + + /** Returns true if field container is set (has been assigned a value) and false otherwise */ + public boolean isSetContainer() { + return this.container != null; + } + + public void setContainerIsSet(boolean value) { + if (!value) { + this.container = null; + } + } + public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { - case QUEUE_ID: - if (value == null) { - unsetQueueId(); - } else { - setQueueId((java.lang.Integer)value); - } - break; + case QUEUE_ID: + if (value == null) { + unsetQueueId(); + } else { + setQueueId((java.lang.Integer)value); + } + break; - case JOB_ID: - if (value == null) { - unsetJobId(); - } else { - setJobId((JobId)value); - } - break; + case JOB_ID: + if (value == null) { + unsetJobId(); + } else { + setJobId((JobId)value); + } + break; - case ATTEMPT_ID: - if (value == null) { - unsetAttemptId(); - } else { - setAttemptId((java.lang.String)value); - } - break; + case ATTEMPT_ID: + if (value == null) { + unsetAttemptId(); + } else { + setAttemptId((java.lang.String)value); + } + break; - case WAIT_TIME: - if (value == null) { - unsetWaitTime(); - } else { - setWaitTime((java.lang.Long)value); - } - break; + case WAIT_TIME: + if (value == null) { + unsetWaitTime(); + } else { + setWaitTime((java.lang.Long)value); + } + break; + + case CONTAINER: + if (value == null) { + unsetContainer(); + } else { + setContainer((java.lang.String)value); + } + break; } } @@ -1939,17 +1989,20 @@ public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable @org.apache.thrift.annotation.Nullable public java.lang.Object getFieldValue(_Fields field) { switch (field) { - case QUEUE_ID: - return getQueueId(); + case QUEUE_ID: + return getQueueId(); - case JOB_ID: - return getJobId(); + case JOB_ID: + return getJobId(); - case ATTEMPT_ID: - return getAttemptId(); + case ATTEMPT_ID: + return getAttemptId(); - case WAIT_TIME: - return getWaitTime(); + case WAIT_TIME: + return getWaitTime(); + + case CONTAINER: + return getContainer(); } throw new java.lang.IllegalStateException(); @@ -1962,14 +2015,16 @@ public boolean isSet(_Fields field) { } switch (field) { - case QUEUE_ID: - return isSetQueueId(); - case JOB_ID: - return isSetJobId(); - case ATTEMPT_ID: - return isSetAttemptId(); - case WAIT_TIME: - return isSetWaitTime(); + case QUEUE_ID: + return isSetQueueId(); + case JOB_ID: + return isSetJobId(); + case ATTEMPT_ID: + return isSetAttemptId(); + case WAIT_TIME: + return isSetWaitTime(); + case CONTAINER: + return isSetContainer(); } throw new java.lang.IllegalStateException(); } @@ -2025,6 +2080,15 @@ public boolean equals(pollTask_args that) { return false; } + boolean this_present_container = true && this.isSetContainer(); + boolean that_present_container = true && that.isSetContainer(); + if (this_present_container || that_present_container) { + if (!(this_present_container && that_present_container)) + return false; + if (!this.container.equals(that.container)) + return false; + } + return true; } @@ -2044,6 +2108,10 @@ public int hashCode() { hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(waitTime); + hashCode = hashCode * 8191 + ((isSetContainer()) ? 131071 : 524287); + if (isSetContainer()) + hashCode = hashCode * 8191 + container.hashCode(); + return hashCode; } @@ -2095,6 +2163,16 @@ public int compareTo(pollTask_args other) { return lastComparison; } } + lastComparison = java.lang.Boolean.valueOf(isSetContainer()).compareTo(other.isSetContainer()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetContainer()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.container, other.container); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -2139,6 +2217,14 @@ public java.lang.String toString() { sb.append("waitTime:"); sb.append(this.waitTime); first = false; + if (!first) sb.append(", "); + sb.append("container:"); + if (this.container == null) { + sb.append("null"); + } else { + sb.append(this.container); + } + first = false; sb.append(")"); return sb.toString(); } @@ -2183,7 +2269,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, pollTask_args struc while (true) { schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { break; } switch (schemeField.id) { @@ -2191,7 +2277,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, pollTask_args struc if (schemeField.type == org.apache.thrift.protocol.TType.I32) { struct.queueId = iprot.readI32(); struct.setQueueIdIsSet(true); - } else { + } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; @@ -2200,7 +2286,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, pollTask_args struc struct.jobId = new JobId(); struct.jobId.read(iprot); struct.setJobIdIsSet(true); - } else { + } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; @@ -2208,7 +2294,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, pollTask_args struc if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { struct.attemptId = iprot.readString(); struct.setAttemptIdIsSet(true); - } else { + } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; @@ -2216,7 +2302,15 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, pollTask_args struc if (schemeField.type == org.apache.thrift.protocol.TType.I64) { struct.waitTime = iprot.readI64(); struct.setWaitTimeIsSet(true); - } else { + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 5: // CONTAINER + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.container = iprot.readString(); + struct.setContainerIsSet(true); + } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; @@ -2251,6 +2345,11 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, pollTask_args stru oprot.writeFieldBegin(WAIT_TIME_FIELD_DESC); oprot.writeI64(struct.waitTime); oprot.writeFieldEnd(); + if (struct.container != null) { + oprot.writeFieldBegin(CONTAINER_FIELD_DESC); + oprot.writeString(struct.container); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -2281,7 +2380,10 @@ public void write(org.apache.thrift.protocol.TProtocol prot, pollTask_args struc if (struct.isSetWaitTime()) { optionals.set(3); } - oprot.writeBitSet(optionals, 4); + if (struct.isSetContainer()) { + optionals.set(4); + } + oprot.writeBitSet(optionals, 5); if (struct.isSetQueueId()) { oprot.writeI32(struct.queueId); } @@ -2294,12 +2396,15 @@ public void write(org.apache.thrift.protocol.TProtocol prot, pollTask_args struc if (struct.isSetWaitTime()) { oprot.writeI64(struct.waitTime); } + if (struct.isSetContainer()) { + oprot.writeString(struct.container); + } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, pollTask_args struct) throws org.apache.thrift.TException { org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - java.util.BitSet incoming = iprot.readBitSet(4); + java.util.BitSet incoming = iprot.readBitSet(5); if (incoming.get(0)) { struct.queueId = iprot.readI32(); struct.setQueueIdIsSet(true); @@ -2317,6 +2422,10 @@ public void read(org.apache.thrift.protocol.TProtocol prot, pollTask_args struct struct.waitTime = iprot.readI64(); struct.setWaitTimeIsSet(true); } + if (incoming.get(4)) { + struct.container = iprot.readString(); + struct.setContainerIsSet(true); + } } } @@ -2404,9 +2513,9 @@ public java.lang.String getFieldName() { public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); - tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, + tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, OptimizeTask.class))); - tmpMap.put(_Fields.E1, new org.apache.thrift.meta_data.FieldMetaData("e1", org.apache.thrift.TFieldRequirementType.DEFAULT, + tmpMap.put(_Fields.E1, new org.apache.thrift.meta_data.FieldMetaData("e1", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, com.netease.arctic.ams.api.NoSuchObjectException.class))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(pollTask_result.class, metaDataMap); @@ -2416,8 +2525,8 @@ public pollTask_result() { } public pollTask_result( - OptimizeTask success, - com.netease.arctic.ams.api.NoSuchObjectException e1) + OptimizeTask success, + com.netease.arctic.ams.api.NoSuchObjectException e1) { this(); this.success = success; @@ -2498,21 +2607,21 @@ public void setE1IsSet(boolean value) { public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { - case SUCCESS: - if (value == null) { - unsetSuccess(); - } else { - setSuccess((OptimizeTask)value); - } - break; + case SUCCESS: + if (value == null) { + unsetSuccess(); + } else { + setSuccess((OptimizeTask)value); + } + break; - case E1: - if (value == null) { - unsetE1(); - } else { - setE1((com.netease.arctic.ams.api.NoSuchObjectException)value); - } - break; + case E1: + if (value == null) { + unsetE1(); + } else { + setE1((com.netease.arctic.ams.api.NoSuchObjectException)value); + } + break; } } @@ -2520,11 +2629,11 @@ public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable @org.apache.thrift.annotation.Nullable public java.lang.Object getFieldValue(_Fields field) { switch (field) { - case SUCCESS: - return getSuccess(); + case SUCCESS: + return getSuccess(); - case E1: - return getE1(); + case E1: + return getE1(); } throw new java.lang.IllegalStateException(); @@ -2537,10 +2646,10 @@ public boolean isSet(_Fields field) { } switch (field) { - case SUCCESS: - return isSetSuccess(); - case E1: - return isSetE1(); + case SUCCESS: + return isSetSuccess(); + case E1: + return isSetE1(); } throw new java.lang.IllegalStateException(); } @@ -2638,7 +2747,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.t public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { scheme(oprot).write(oprot, this); - } + } @Override public java.lang.String toString() { @@ -2702,7 +2811,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, pollTask_result str while (true) { schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { break; } switch (schemeField.id) { @@ -2711,7 +2820,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, pollTask_result str struct.success = new OptimizeTask(); struct.success.read(iprot); struct.setSuccessIsSet(true); - } else { + } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; @@ -2720,7 +2829,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, pollTask_result str struct.e1 = new com.netease.arctic.ams.api.NoSuchObjectException(); struct.e1.read(iprot); struct.setE1IsSet(true); - } else { + } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; @@ -2878,7 +2987,7 @@ public java.lang.String getFieldName() { public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); - tmpMap.put(_Fields.OPTIMIZE_TASK_STAT, new org.apache.thrift.meta_data.FieldMetaData("optimizeTaskStat", org.apache.thrift.TFieldRequirementType.DEFAULT, + tmpMap.put(_Fields.OPTIMIZE_TASK_STAT, new org.apache.thrift.meta_data.FieldMetaData("optimizeTaskStat", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, OptimizeTaskStat.class))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(reportOptimizeResult_args.class, metaDataMap); @@ -2888,7 +2997,7 @@ public reportOptimizeResult_args() { } public reportOptimizeResult_args( - OptimizeTaskStat optimizeTaskStat) + OptimizeTaskStat optimizeTaskStat) { this(); this.optimizeTaskStat = optimizeTaskStat; @@ -2939,13 +3048,13 @@ public void setOptimizeTaskStatIsSet(boolean value) { public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { - case OPTIMIZE_TASK_STAT: - if (value == null) { - unsetOptimizeTaskStat(); - } else { - setOptimizeTaskStat((OptimizeTaskStat)value); - } - break; + case OPTIMIZE_TASK_STAT: + if (value == null) { + unsetOptimizeTaskStat(); + } else { + setOptimizeTaskStat((OptimizeTaskStat)value); + } + break; } } @@ -2953,8 +3062,8 @@ public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable @org.apache.thrift.annotation.Nullable public java.lang.Object getFieldValue(_Fields field) { switch (field) { - case OPTIMIZE_TASK_STAT: - return getOptimizeTaskStat(); + case OPTIMIZE_TASK_STAT: + return getOptimizeTaskStat(); } throw new java.lang.IllegalStateException(); @@ -2967,8 +3076,8 @@ public boolean isSet(_Fields field) { } switch (field) { - case OPTIMIZE_TASK_STAT: - return isSetOptimizeTaskStat(); + case OPTIMIZE_TASK_STAT: + return isSetOptimizeTaskStat(); } throw new java.lang.IllegalStateException(); } @@ -3099,7 +3208,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, reportOptimizeResul while (true) { schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { break; } switch (schemeField.id) { @@ -3108,7 +3217,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, reportOptimizeResul struct.optimizeTaskStat = new OptimizeTaskStat(); struct.optimizeTaskStat.read(iprot); struct.setOptimizeTaskStatIsSet(true); - } else { + } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; @@ -3186,7 +3295,7 @@ public static class reportOptimizeResult_result implements org.apache.thrift.TBa /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { -; + ; private static final java.util.Map byName = new java.util.HashMap(); @@ -3335,7 +3444,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.t public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { scheme(oprot).write(oprot, this); - } + } @Override public java.lang.String toString() { @@ -3381,7 +3490,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, reportOptimizeResul while (true) { schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { break; } switch (schemeField.id) { @@ -3504,7 +3613,7 @@ public java.lang.String getFieldName() { public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); - tmpMap.put(_Fields.REPORT_DATA, new org.apache.thrift.meta_data.FieldMetaData("reportData", org.apache.thrift.TFieldRequirementType.DEFAULT, + tmpMap.put(_Fields.REPORT_DATA, new org.apache.thrift.meta_data.FieldMetaData("reportData", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, OptimizerStateReport.class))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(reportOptimizerState_args.class, metaDataMap); @@ -3514,7 +3623,7 @@ public reportOptimizerState_args() { } public reportOptimizerState_args( - OptimizerStateReport reportData) + OptimizerStateReport reportData) { this(); this.reportData = reportData; @@ -3565,13 +3674,13 @@ public void setReportDataIsSet(boolean value) { public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { - case REPORT_DATA: - if (value == null) { - unsetReportData(); - } else { - setReportData((OptimizerStateReport)value); - } - break; + case REPORT_DATA: + if (value == null) { + unsetReportData(); + } else { + setReportData((OptimizerStateReport)value); + } + break; } } @@ -3579,8 +3688,8 @@ public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable @org.apache.thrift.annotation.Nullable public java.lang.Object getFieldValue(_Fields field) { switch (field) { - case REPORT_DATA: - return getReportData(); + case REPORT_DATA: + return getReportData(); } throw new java.lang.IllegalStateException(); @@ -3593,8 +3702,8 @@ public boolean isSet(_Fields field) { } switch (field) { - case REPORT_DATA: - return isSetReportData(); + case REPORT_DATA: + return isSetReportData(); } throw new java.lang.IllegalStateException(); } @@ -3725,7 +3834,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, reportOptimizerStat while (true) { schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { break; } switch (schemeField.id) { @@ -3734,7 +3843,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, reportOptimizerStat struct.reportData = new OptimizerStateReport(); struct.reportData.read(iprot); struct.setReportDataIsSet(true); - } else { + } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; @@ -3812,7 +3921,7 @@ public static class reportOptimizerState_result implements org.apache.thrift.TBa /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { -; + ; private static final java.util.Map byName = new java.util.HashMap(); @@ -3961,7 +4070,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.t public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { scheme(oprot).write(oprot, this); - } + } @Override public java.lang.String toString() { @@ -4007,7 +4116,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, reportOptimizerStat while (true) { schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { break; } switch (schemeField.id) { @@ -4130,7 +4239,7 @@ public java.lang.String getFieldName() { public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); - tmpMap.put(_Fields.REGISTER_INFO, new org.apache.thrift.meta_data.FieldMetaData("registerInfo", org.apache.thrift.TFieldRequirementType.DEFAULT, + tmpMap.put(_Fields.REGISTER_INFO, new org.apache.thrift.meta_data.FieldMetaData("registerInfo", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, OptimizerRegisterInfo.class))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(registerOptimizer_args.class, metaDataMap); @@ -4140,7 +4249,7 @@ public registerOptimizer_args() { } public registerOptimizer_args( - OptimizerRegisterInfo registerInfo) + OptimizerRegisterInfo registerInfo) { this(); this.registerInfo = registerInfo; @@ -4191,13 +4300,13 @@ public void setRegisterInfoIsSet(boolean value) { public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { - case REGISTER_INFO: - if (value == null) { - unsetRegisterInfo(); - } else { - setRegisterInfo((OptimizerRegisterInfo)value); - } - break; + case REGISTER_INFO: + if (value == null) { + unsetRegisterInfo(); + } else { + setRegisterInfo((OptimizerRegisterInfo)value); + } + break; } } @@ -4205,8 +4314,8 @@ public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable @org.apache.thrift.annotation.Nullable public java.lang.Object getFieldValue(_Fields field) { switch (field) { - case REGISTER_INFO: - return getRegisterInfo(); + case REGISTER_INFO: + return getRegisterInfo(); } throw new java.lang.IllegalStateException(); @@ -4219,8 +4328,8 @@ public boolean isSet(_Fields field) { } switch (field) { - case REGISTER_INFO: - return isSetRegisterInfo(); + case REGISTER_INFO: + return isSetRegisterInfo(); } throw new java.lang.IllegalStateException(); } @@ -4351,7 +4460,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, registerOptimizer_a while (true) { schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { break; } switch (schemeField.id) { @@ -4360,7 +4469,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, registerOptimizer_a struct.registerInfo = new OptimizerRegisterInfo(); struct.registerInfo.read(iprot); struct.setRegisterInfoIsSet(true); - } else { + } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; @@ -4502,7 +4611,7 @@ public java.lang.String getFieldName() { public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); - tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, + tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, OptimizerDescriptor.class))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(registerOptimizer_result.class, metaDataMap); @@ -4512,7 +4621,7 @@ public registerOptimizer_result() { } public registerOptimizer_result( - OptimizerDescriptor success) + OptimizerDescriptor success) { this(); this.success = success; @@ -4563,13 +4672,13 @@ public void setSuccessIsSet(boolean value) { public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { - case SUCCESS: - if (value == null) { - unsetSuccess(); - } else { - setSuccess((OptimizerDescriptor)value); - } - break; + case SUCCESS: + if (value == null) { + unsetSuccess(); + } else { + setSuccess((OptimizerDescriptor)value); + } + break; } } @@ -4577,8 +4686,8 @@ public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable @org.apache.thrift.annotation.Nullable public java.lang.Object getFieldValue(_Fields field) { switch (field) { - case SUCCESS: - return getSuccess(); + case SUCCESS: + return getSuccess(); } throw new java.lang.IllegalStateException(); @@ -4591,8 +4700,8 @@ public boolean isSet(_Fields field) { } switch (field) { - case SUCCESS: - return isSetSuccess(); + case SUCCESS: + return isSetSuccess(); } throw new java.lang.IllegalStateException(); } @@ -4667,7 +4776,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.t public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { scheme(oprot).write(oprot, this); - } + } @Override public java.lang.String toString() { @@ -4723,7 +4832,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, registerOptimizer_r while (true) { schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { break; } switch (schemeField.id) { @@ -4732,7 +4841,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, registerOptimizer_r struct.success = new OptimizerDescriptor(); struct.success.read(iprot); struct.setSuccessIsSet(true); - } else { + } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; @@ -4874,7 +4983,7 @@ public java.lang.String getFieldName() { public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); - tmpMap.put(_Fields.TABLE_IDENTIFIER, new org.apache.thrift.meta_data.FieldMetaData("tableIdentifier", org.apache.thrift.TFieldRequirementType.DEFAULT, + tmpMap.put(_Fields.TABLE_IDENTIFIER, new org.apache.thrift.meta_data.FieldMetaData("tableIdentifier", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, com.netease.arctic.ams.api.TableIdentifier.class))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(stopOptimize_args.class, metaDataMap); @@ -4884,7 +4993,7 @@ public stopOptimize_args() { } public stopOptimize_args( - com.netease.arctic.ams.api.TableIdentifier tableIdentifier) + com.netease.arctic.ams.api.TableIdentifier tableIdentifier) { this(); this.tableIdentifier = tableIdentifier; @@ -4935,13 +5044,13 @@ public void setTableIdentifierIsSet(boolean value) { public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { - case TABLE_IDENTIFIER: - if (value == null) { - unsetTableIdentifier(); - } else { - setTableIdentifier((com.netease.arctic.ams.api.TableIdentifier)value); - } - break; + case TABLE_IDENTIFIER: + if (value == null) { + unsetTableIdentifier(); + } else { + setTableIdentifier((com.netease.arctic.ams.api.TableIdentifier)value); + } + break; } } @@ -4949,8 +5058,8 @@ public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable @org.apache.thrift.annotation.Nullable public java.lang.Object getFieldValue(_Fields field) { switch (field) { - case TABLE_IDENTIFIER: - return getTableIdentifier(); + case TABLE_IDENTIFIER: + return getTableIdentifier(); } throw new java.lang.IllegalStateException(); @@ -4963,8 +5072,8 @@ public boolean isSet(_Fields field) { } switch (field) { - case TABLE_IDENTIFIER: - return isSetTableIdentifier(); + case TABLE_IDENTIFIER: + return isSetTableIdentifier(); } throw new java.lang.IllegalStateException(); } @@ -5095,7 +5204,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, stopOptimize_args s while (true) { schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { break; } switch (schemeField.id) { @@ -5104,7 +5213,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, stopOptimize_args s struct.tableIdentifier = new com.netease.arctic.ams.api.TableIdentifier(); struct.tableIdentifier.read(iprot); struct.setTableIdentifierIsSet(true); - } else { + } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; @@ -5246,7 +5355,7 @@ public java.lang.String getFieldName() { public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); - tmpMap.put(_Fields.E, new org.apache.thrift.meta_data.FieldMetaData("e", org.apache.thrift.TFieldRequirementType.DEFAULT, + tmpMap.put(_Fields.E, new org.apache.thrift.meta_data.FieldMetaData("e", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, com.netease.arctic.ams.api.OperationErrorException.class))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(stopOptimize_result.class, metaDataMap); @@ -5256,7 +5365,7 @@ public stopOptimize_result() { } public stopOptimize_result( - com.netease.arctic.ams.api.OperationErrorException e) + com.netease.arctic.ams.api.OperationErrorException e) { this(); this.e = e; @@ -5307,13 +5416,13 @@ public void setEIsSet(boolean value) { public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { - case E: - if (value == null) { - unsetE(); - } else { - setE((com.netease.arctic.ams.api.OperationErrorException)value); - } - break; + case E: + if (value == null) { + unsetE(); + } else { + setE((com.netease.arctic.ams.api.OperationErrorException)value); + } + break; } } @@ -5321,8 +5430,8 @@ public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable @org.apache.thrift.annotation.Nullable public java.lang.Object getFieldValue(_Fields field) { switch (field) { - case E: - return getE(); + case E: + return getE(); } throw new java.lang.IllegalStateException(); @@ -5335,8 +5444,8 @@ public boolean isSet(_Fields field) { } switch (field) { - case E: - return isSetE(); + case E: + return isSetE(); } throw new java.lang.IllegalStateException(); } @@ -5411,7 +5520,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.t public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { scheme(oprot).write(oprot, this); - } + } @Override public java.lang.String toString() { @@ -5464,7 +5573,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, stopOptimize_result while (true) { schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { break; } switch (schemeField.id) { @@ -5473,7 +5582,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, stopOptimize_result struct.e = new com.netease.arctic.ams.api.OperationErrorException(); struct.e.read(iprot); struct.setEIsSet(true); - } else { + } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; @@ -5615,7 +5724,7 @@ public java.lang.String getFieldName() { public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); - tmpMap.put(_Fields.TABLE_IDENTIFIER, new org.apache.thrift.meta_data.FieldMetaData("tableIdentifier", org.apache.thrift.TFieldRequirementType.DEFAULT, + tmpMap.put(_Fields.TABLE_IDENTIFIER, new org.apache.thrift.meta_data.FieldMetaData("tableIdentifier", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, com.netease.arctic.ams.api.TableIdentifier.class))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(startOptimize_args.class, metaDataMap); @@ -5625,7 +5734,7 @@ public startOptimize_args() { } public startOptimize_args( - com.netease.arctic.ams.api.TableIdentifier tableIdentifier) + com.netease.arctic.ams.api.TableIdentifier tableIdentifier) { this(); this.tableIdentifier = tableIdentifier; @@ -5676,13 +5785,13 @@ public void setTableIdentifierIsSet(boolean value) { public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { - case TABLE_IDENTIFIER: - if (value == null) { - unsetTableIdentifier(); - } else { - setTableIdentifier((com.netease.arctic.ams.api.TableIdentifier)value); - } - break; + case TABLE_IDENTIFIER: + if (value == null) { + unsetTableIdentifier(); + } else { + setTableIdentifier((com.netease.arctic.ams.api.TableIdentifier)value); + } + break; } } @@ -5690,8 +5799,8 @@ public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable @org.apache.thrift.annotation.Nullable public java.lang.Object getFieldValue(_Fields field) { switch (field) { - case TABLE_IDENTIFIER: - return getTableIdentifier(); + case TABLE_IDENTIFIER: + return getTableIdentifier(); } throw new java.lang.IllegalStateException(); @@ -5704,8 +5813,8 @@ public boolean isSet(_Fields field) { } switch (field) { - case TABLE_IDENTIFIER: - return isSetTableIdentifier(); + case TABLE_IDENTIFIER: + return isSetTableIdentifier(); } throw new java.lang.IllegalStateException(); } @@ -5836,7 +5945,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, startOptimize_args while (true) { schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { break; } switch (schemeField.id) { @@ -5845,7 +5954,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, startOptimize_args struct.tableIdentifier = new com.netease.arctic.ams.api.TableIdentifier(); struct.tableIdentifier.read(iprot); struct.setTableIdentifierIsSet(true); - } else { + } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; @@ -5987,7 +6096,7 @@ public java.lang.String getFieldName() { public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); - tmpMap.put(_Fields.E, new org.apache.thrift.meta_data.FieldMetaData("e", org.apache.thrift.TFieldRequirementType.DEFAULT, + tmpMap.put(_Fields.E, new org.apache.thrift.meta_data.FieldMetaData("e", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, com.netease.arctic.ams.api.OperationErrorException.class))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(startOptimize_result.class, metaDataMap); @@ -5997,7 +6106,7 @@ public startOptimize_result() { } public startOptimize_result( - com.netease.arctic.ams.api.OperationErrorException e) + com.netease.arctic.ams.api.OperationErrorException e) { this(); this.e = e; @@ -6048,13 +6157,13 @@ public void setEIsSet(boolean value) { public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { - case E: - if (value == null) { - unsetE(); - } else { - setE((com.netease.arctic.ams.api.OperationErrorException)value); - } - break; + case E: + if (value == null) { + unsetE(); + } else { + setE((com.netease.arctic.ams.api.OperationErrorException)value); + } + break; } } @@ -6062,8 +6171,8 @@ public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable @org.apache.thrift.annotation.Nullable public java.lang.Object getFieldValue(_Fields field) { switch (field) { - case E: - return getE(); + case E: + return getE(); } throw new java.lang.IllegalStateException(); @@ -6076,8 +6185,8 @@ public boolean isSet(_Fields field) { } switch (field) { - case E: - return isSetE(); + case E: + return isSetE(); } throw new java.lang.IllegalStateException(); } @@ -6152,7 +6261,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.t public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { scheme(oprot).write(oprot, this); - } + } @Override public java.lang.String toString() { @@ -6205,7 +6314,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, startOptimize_resul while (true) { schemeField = iprot.readFieldBegin(); - if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { break; } switch (schemeField.id) { @@ -6214,7 +6323,7 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, startOptimize_resul struct.e = new com.netease.arctic.ams.api.OperationErrorException(); struct.e.read(iprot); struct.setEIsSet(true); - } else { + } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; diff --git a/ams/ams-api/src/main/thrift/arctic_optimize_manager.thrift b/ams/ams-api/src/main/thrift/arctic_optimize_manager.thrift index 98e725db4f..c4c0d159a6 100644 --- a/ams/ams-api/src/main/thrift/arctic_optimize_manager.thrift +++ b/ams/ams-api/src/main/thrift/arctic_optimize_manager.thrift @@ -119,7 +119,7 @@ service OptimizeManager { void ping() - OptimizeTask pollTask(1:i32 queueId, 2:JobId jobId, 3:string attemptId, 4:i64 waitTime) + OptimizeTask pollTask(1:i32 queueId, 2:JobId jobId, 3:string attemptId, 4:i64 waitTime, 5:string container) throws (1: arctic_commons.NoSuchObjectException e1) void reportOptimizeResult(1:OptimizeTaskStat optimizeTaskStat) diff --git a/ams/ams-api/src/test/java/com/netease/arctic/ams/api/MockArcticMetastoreServer.java b/ams/ams-api/src/test/java/com/netease/arctic/ams/api/MockArcticMetastoreServer.java index 57903852c3..7959961036 100644 --- a/ams/ams-api/src/test/java/com/netease/arctic/ams/api/MockArcticMetastoreServer.java +++ b/ams/ams-api/src/test/java/com/netease/arctic/ams/api/MockArcticMetastoreServer.java @@ -383,7 +383,7 @@ public void ping() throws TException { } @Override - public OptimizeTask pollTask(int queueId, JobId jobId, String attemptId, long waitTime) + public OptimizeTask pollTask(int queueId, JobId jobId, String attemptId, long waitTime, String container) throws NoSuchObjectException, TException { return null; } diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/handler/impl/OptimizeManagerHandler.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/handler/impl/OptimizeManagerHandler.java index c811bdad38..c380aa539f 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/handler/impl/OptimizeManagerHandler.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/handler/impl/OptimizeManagerHandler.java @@ -42,9 +42,9 @@ public void ping() throws TException { } @Override - public OptimizeTask pollTask(int queueId, JobId jobId, String attemptId, long waitTime) + public OptimizeTask pollTask(int queueId, JobId jobId, String attemptId, long waitTime, String container) throws TException { - return ServiceContainer.getOptimizeQueueService().pollTask(queueId, jobId, attemptId, waitTime); + return ServiceContainer.getOptimizeQueueService().pollTask(queueId, jobId, attemptId, waitTime, container); } @Override diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/mapper/OptimizeTaskRuntimesMapper.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/mapper/OptimizeTaskRuntimesMapper.java index b8e05787ed..b25c5f9668 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/mapper/OptimizeTaskRuntimesMapper.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/mapper/OptimizeTaskRuntimesMapper.java @@ -76,6 +76,7 @@ public interface OptimizeTaskRuntimesMapper { " job_type = #{optimizeTaskRuntime.jobId.type, jdbcType=VARCHAR}," + " job_id = #{optimizeTaskRuntime.jobId.id, jdbcType=VARCHAR}," + " attempt_id = #{optimizeTaskRuntime.attemptId, jdbcType=VARCHAR}," + + " container = #{optimizeTaskRuntime.container, jdbcType=VARCHAR}," + " retry = #{optimizeTaskRuntime.retry}," + " fail_reason = #{optimizeTaskRuntime.failReason, jdbcType=VARCHAR}," + " fail_time = #{optimizeTaskRuntime.failTime, " + diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/mapper/TaskHistoryMapper.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/mapper/TaskHistoryMapper.java index b1dbe6eb5b..f03e01b3cc 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/mapper/TaskHistoryMapper.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/mapper/TaskHistoryMapper.java @@ -75,7 +75,7 @@ List selectTaskHistory(@Param("tableIdentifier") TableIdentifi List selectTaskHistoryByTraceId(@Param("taskTraceId") String taskTraceId); @Insert("insert into " + TABLE_NAME + "(task_trace_id, retry, catalog_name, db_name, table_name, " + - "task_plan_group, start_time, end_time, cost_time, queue_id) values ( " + + "task_plan_group, start_time, end_time, cost_time, queue_id, container) values ( " + "#{taskHistory.taskTraceId}, " + "#{taskHistory.retry}, " + "#{taskHistory.tableIdentifier.catalog}, " + @@ -87,7 +87,8 @@ List selectTaskHistory(@Param("tableIdentifier") TableIdentifi "#{taskHistory.endTime, " + "typeHandler=com.netease.arctic.ams.server.mybatis.Long2TsConvertor}," + "#{taskHistory.costTime}, " + - "#{taskHistory.queueId}) ") + "#{taskHistory.queueId}, " + + "#{taskHistory.container}) ") void insertTaskHistory(@Param("taskHistory") TableTaskHistory taskHistory); @Update("update " + TABLE_NAME + " set " + diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/model/OptimizeTaskRuntime.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/model/OptimizeTaskRuntime.java index c45c8f7846..4abdf44cb1 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/model/OptimizeTaskRuntime.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/model/OptimizeTaskRuntime.java @@ -40,6 +40,8 @@ public class OptimizeTaskRuntime implements Cloneable { private JobId jobId; private String attemptId; + private String container; + private int retry = 0; private ErrorMessage errorMessage = null; @@ -166,6 +168,14 @@ public void setAttemptId(String attemptId) { this.attemptId = attemptId; } + public String getContainer() { + return container; + } + + public void setContainer(String container) { + this.container = container; + } + public long getCostTime() { return costTime; } diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/model/TableTaskHistory.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/model/TableTaskHistory.java index 8b73ae8af7..b12aa6982d 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/model/TableTaskHistory.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/model/TableTaskHistory.java @@ -32,6 +32,8 @@ public class TableTaskHistory { private String failReason; private long failTime; + private String container; + public TableIdentifier getTableIdentifier() { return tableIdentifier; } @@ -112,6 +114,14 @@ public void setFailTime(long failTime) { this.failTime = failTime; } + public String getContainer() { + return container; + } + + public void setContainer(String container) { + this.container = container; + } + @Override public String toString() { return "TableTaskHistory{" + diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/OptimizeTaskItem.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/OptimizeTaskItem.java index 8d723d9029..e8b779e5cb 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/OptimizeTaskItem.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/OptimizeTaskItem.java @@ -106,7 +106,7 @@ public void onPending() { } } - public TableTaskHistory onExecuting(JobId jobId, String attemptId) { + public TableTaskHistory onExecuting(JobId jobId, String attemptId, String container) { lock.lock(); try { Preconditions.checkArgument(optimizeRuntime.getStatus() != OptimizeStatus.Prepared, @@ -120,6 +120,7 @@ public TableTaskHistory onExecuting(JobId jobId, String attemptId) { newRuntime.setPreparedTime(OptimizeTaskRuntime.INVALID_TIME); newRuntime.setCostTime(0); newRuntime.setErrorMessage(null); + newRuntime.setContainer(container); persistTaskRuntime(newRuntime, false); optimizeRuntime = newRuntime; return constructNewTableTaskHistory(currentTime); @@ -346,6 +347,7 @@ private TableTaskHistory constructNewTableTaskHistory(long currentTime) { tableTaskHistory.setRetry(optimizeRuntime.getRetry()); tableTaskHistory.setStartTime(currentTime); tableTaskHistory.setQueueId(optimizeTask.getQueueId()); + tableTaskHistory.setContainer(optimizeRuntime.getContainer()); return tableTaskHistory; } diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OptimizeQueueService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OptimizeQueueService.java index 3807bebc22..305873bb27 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OptimizeQueueService.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OptimizeQueueService.java @@ -332,10 +332,10 @@ public List getQueues() throws NoSuchObjectException { } } - public OptimizeTask pollTask(int queueId, JobId jobId, String attemptId, long waitTime) + public OptimizeTask pollTask(int queueId, JobId jobId, String attemptId, long waitTime, String container) throws NoSuchObjectException, TException { try { - OptimizeTask task = getQueue(queueId).poll(jobId, attemptId, waitTime); + OptimizeTask task = getQueue(queueId).poll(jobId, attemptId, waitTime, container); if (task != null) { LOG.info("{} pollTask success, {}", jobId, task); } else { @@ -535,7 +535,7 @@ public String queueName() { optimizeQueue.getOptimizeQueueMeta().getQueueId(); } - public OptimizeTask poll(JobId jobId, final String attemptId, long waitTime) { + public OptimizeTask poll(JobId jobId, final String attemptId, long waitTime, String container) { long startTime = System.currentTimeMillis(); OptimizeTaskItem task = pollValidTask(); if (task == null) { @@ -545,17 +545,17 @@ public OptimizeTask poll(JobId jobId, final String attemptId, long waitTime) { return null; } } - return onExecuteOptimizeTask(task, jobId, attemptId); + return onExecuteOptimizeTask(task, jobId, attemptId, container); } - private OptimizeTask onExecuteOptimizeTask(OptimizeTaskItem task, JobId jobId, String attemptId) { + private OptimizeTask onExecuteOptimizeTask(OptimizeTaskItem task, JobId jobId, String attemptId, String container) { TableTaskHistory tableTaskHistory; try { // load files from sysdb task.setFiles(); // update max execute time task.setMaxExecuteTime(); - tableTaskHistory = task.onExecuting(jobId, attemptId); + tableTaskHistory = task.onExecuting(jobId, attemptId, container); } catch (Exception e) { task.clearFiles(); LOG.error("{} handle sysdb failed, try put task back into queue", task.getTaskId(), e); diff --git a/optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkConsumer.java b/optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkConsumer.java index b00e7e9666..d8483bade7 100644 --- a/optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkConsumer.java +++ b/optimizer/src/main/java/com/netease/arctic/optimizer/flink/FlinkConsumer.java @@ -51,7 +51,7 @@ public void open(Configuration parameters) throws Exception { public void run(SourceContext sourceContext) throws Exception { while (running) { try { - TaskWrapper task = taskConsumer.pollTask(0); + TaskWrapper task = taskConsumer.pollTask(0, getRuntimeContext().getTaskNameWithSubtasks()); if (task != null) { sourceContext.collect(task); } else { diff --git a/optimizer/src/main/java/com/netease/arctic/optimizer/local/LocalOptimizer.java b/optimizer/src/main/java/com/netease/arctic/optimizer/local/LocalOptimizer.java index 39e8302b9c..d5fe19bd38 100644 --- a/optimizer/src/main/java/com/netease/arctic/optimizer/local/LocalOptimizer.java +++ b/optimizer/src/main/java/com/netease/arctic/optimizer/local/LocalOptimizer.java @@ -246,7 +246,7 @@ public Consumer() { public TaskWrapper pollTask() throws InterruptedException { while (!stopped) { try { - TaskWrapper task = baseTaskConsumer.pollTask(0); + TaskWrapper task = baseTaskConsumer.pollTask(0, ""); if (task != null) { LOG.info("poll task {}", task); return task; diff --git a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/BaseTaskConsumer.java b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/BaseTaskConsumer.java index a3f87291bd..efc827c59f 100644 --- a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/BaseTaskConsumer.java +++ b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/BaseTaskConsumer.java @@ -57,7 +57,7 @@ public BaseTaskConsumer(OptimizerConfig config) { * @return - return null if got no task */ public TaskWrapper pollTask() throws TException { - return pollTask(DEFAULT_POLL_WAIT_TIMEOUT); + return pollTask(DEFAULT_POLL_WAIT_TIMEOUT, ""); } /** @@ -65,16 +65,16 @@ public TaskWrapper pollTask() throws TException { * * @return - return null if got no task */ - public TaskWrapper pollTask(long timeout) throws TException { + public TaskWrapper pollTask(long timeout, String container) throws TException { int attemptId = Math.abs(ThreadLocalRandom.current().nextInt()); - OptimizeTask task = pollTask(attemptId, timeout); + OptimizeTask task = pollTask(attemptId, timeout, container); return task == null ? null : new TaskWrapper(task, attemptId); } - private OptimizeTask pollTask(int attemptId, long timeout) throws TException { + private OptimizeTask pollTask(int attemptId, long timeout, String container) throws TException { try { OptimizeManager.Iface optimizeManager = OptimizeManagerClientPools.getClient(config.getAmsUrl()); - return optimizeManager.pollTask(config.getQueueId(), jobId, attemptId + "", timeout); + return optimizeManager.pollTask(config.getQueueId(), jobId, attemptId + "", timeout, container); } catch (NoSuchObjectException e) { return null; } diff --git a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/BaseTaskExecutor.java b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/BaseTaskExecutor.java index 2018437215..0d258659ed 100644 --- a/optimizer/src/main/java/com/netease/arctic/optimizer/operator/BaseTaskExecutor.java +++ b/optimizer/src/main/java/com/netease/arctic/optimizer/operator/BaseTaskExecutor.java @@ -38,9 +38,9 @@ import com.netease.arctic.optimizer.operator.executor.NodeTask; import com.netease.arctic.optimizer.operator.executor.OptimizeTaskResult; import com.netease.arctic.optimizer.operator.executor.TableIdentificationInfo; -import com.netease.arctic.utils.ExceptionUtil; import com.netease.arctic.table.ArcticTable; import com.netease.arctic.table.TableProperties; +import com.netease.arctic.utils.ExceptionUtil; import com.netease.arctic.utils.SerializationUtils; import com.netease.arctic.utils.TableTypeUtil; import org.apache.commons.collections.CollectionUtils; diff --git a/optimizer/src/test/java/com/netease/arctic/optimizer/operator/FakeBaseConsumer.java b/optimizer/src/test/java/com/netease/arctic/optimizer/operator/FakeBaseConsumer.java index 2ce10806b1..a1b5f55109 100644 --- a/optimizer/src/test/java/com/netease/arctic/optimizer/operator/FakeBaseConsumer.java +++ b/optimizer/src/test/java/com/netease/arctic/optimizer/operator/FakeBaseConsumer.java @@ -50,7 +50,7 @@ public OptimizeTask feedTask() { } @Override - public TaskWrapper pollTask(long timeout) throws TException { + public TaskWrapper pollTask(long timeout, String container) throws TException { synchronized (this) { try { if (this.nextTaskToConsume == null) {