diff --git a/src/main/java/io/openmessaging/connect/runtime/common/QueuePartition.java b/src/main/java/io/openmessaging/connect/runtime/common/QueuePartition.java new file mode 100644 index 0000000..257d241 --- /dev/null +++ b/src/main/java/io/openmessaging/connect/runtime/common/QueuePartition.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.connect.runtime.common; + +import java.io.Serializable; +import java.util.Objects; + +/** + * A queue name and partition number + */ +public final class QueuePartition implements Serializable { + + private int hash = 0; + private final int partition; + private final String queue; + + public QueuePartition(String queue, int partition) { + this.partition = partition; + this.queue = queue; + } + + public int partition() { + return partition; + } + + public String queue() { + return queue; + } + + @Override + public int hashCode() { + if (hash != 0) + return hash; + final int prime = 31; + int result = 1; + result = prime * result + partition; + result = prime * result + Objects.hashCode(queue); + this.hash = result; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + QueuePartition other = (QueuePartition) obj; + return partition == other.partition && Objects.equals(queue, other.queue); + } + + @Override + public String toString() { + return queue + "-" + partition; + } +} diff --git a/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/Worker.java b/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/Worker.java index 0f2d8f4..9b3c299 100644 --- a/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/Worker.java +++ b/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/Worker.java @@ -18,7 +18,6 @@ package io.openmessaging.connect.runtime.connectorwrapper; import io.netty.util.internal.ConcurrentSet; -import io.openmessaging.MessagingAccessPoint; import io.openmessaging.connect.runtime.common.ConnectKeyValue; import io.openmessaging.connect.runtime.config.ConnectConfig; import io.openmessaging.connect.runtime.config.RuntimeConfigDefine; @@ -31,15 +30,13 @@ import io.openmessaging.connector.api.data.Converter; import io.openmessaging.connector.api.data.SinkDataEntry; import io.openmessaging.connector.api.data.SourceDataEntry; +import io.openmessaging.connector.api.sink.SinkTask; import io.openmessaging.connector.api.source.SourceTask; +import io.openmessaging.consumer.PullConsumer; import io.openmessaging.producer.Producer; + import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -61,7 +58,7 @@ public class Worker { /** * Current running tasks. */ - private Set workingTasks = new ConcurrentSet<>(); + private Set workingTasks = new ConcurrentSet<>(); /** * Thread pool for connectors and tasks. @@ -85,8 +82,8 @@ public class Worker { private TaskPositionCommitService taskPositionCommitService; public Worker(ConnectConfig connectConfig, - PositionManagementService positionManagementService, - MessagingAccessWrapper messagingAccessWrapper) { + PositionManagementService positionManagementService, + MessagingAccessWrapper messagingAccessWrapper) { this.workerId = connectConfig.getWorkerId(); this.taskExecutor = Executors.newCachedThreadPool(); @@ -95,7 +92,7 @@ public Worker(ConnectConfig connectConfig, taskPositionCommitService = new TaskPositionCommitService(this); } - public void start(){ + public void start() { taskPositionCommitService.start(); } @@ -103,42 +100,43 @@ public void start(){ * Start a collection of connectors with the given configs. * If a connector is already started with the same configs, it will not start again. * If a connector is already started but not contained in the new configs, it will stop. + * * @param connectorConfigs * @throws Exception */ public synchronized void startConnectors(Map connectorConfigs) throws Exception { Set stoppedConnector = new HashSet<>(); - for(WorkerConnector workerConnector : workingConnectors){ + for (WorkerConnector workerConnector : workingConnectors) { String connectorName = workerConnector.getConnectorName(); ConnectKeyValue keyValue = connectorConfigs.get(connectorName); - if(null == keyValue || 0 != keyValue.getInt(RuntimeConfigDefine.CONFIG_DELETED)){ + if (null == keyValue || 0 != keyValue.getInt(RuntimeConfigDefine.CONFIG_DELETED)) { workerConnector.stop(); stoppedConnector.add(workerConnector); - }else if(!keyValue.equals(workerConnector.getKeyValue())){ + } else if (!keyValue.equals(workerConnector.getKeyValue())) { workerConnector.reconfigure(keyValue); } } workingConnectors.removeAll(stoppedConnector); - if(null == connectorConfigs || 0 == connectorConfigs.size()){ + if (null == connectorConfigs || 0 == connectorConfigs.size()) { return; } Map newConnectors = new HashMap<>(); - for(String connectorName : connectorConfigs.keySet()){ + for (String connectorName : connectorConfigs.keySet()) { boolean isNewConnector = true; - for(WorkerConnector workerConnector : workingConnectors){ - if(workerConnector.getConnectorName().equals(connectorName)){ + for (WorkerConnector workerConnector : workingConnectors) { + if (workerConnector.getConnectorName().equals(connectorName)) { isNewConnector = false; break; } } - if(isNewConnector){ + if (isNewConnector) { newConnectors.put(connectorName, connectorConfigs.get(connectorName)); } } - for(String connectorName : newConnectors.keySet()){ + for (String connectorName : newConnectors.keySet()) { ConnectKeyValue keyValue = newConnectors.get(connectorName); Class clazz = Class.forName(keyValue.getString(RuntimeConfigDefine.CONNECTOR_CLASS)); Connector connector = (Connector) clazz.newInstance(); @@ -152,46 +150,70 @@ public synchronized void startConnectors(Map connectorC * Start a collection of tasks with the given configs. * If a task is already started with the same configs, it will not start again. * If a task is already started but not contained in the new configs, it will stop. + * * @param taskConfigs * @throws Exception */ public synchronized void startTasks(Map> taskConfigs) throws Exception { - Set stoppedTasks = new HashSet<>(); - for(WorkerSourceTask workerSourceTask : workingTasks){ - String connectorName = workerSourceTask.getConnectorName(); + Set stoppedTasks = new HashSet<>(); + for (Runnable runnable : workingTasks) { + WorkerSourceTask workerSourceTask = null; + WorkerSinkTask workerSinkTask = null; + if (runnable instanceof WorkerSourceTask) { + workerSourceTask = (WorkerSourceTask) runnable; + } else { + workerSinkTask = (WorkerSinkTask) runnable; + } + + String connectorName = null != workerSourceTask ? workerSourceTask.getConnectorName() : workerSinkTask.getConnectorName(); + ConnectKeyValue taskConfig = null != workerSourceTask ? workerSourceTask.getTaskConfig() : workerSinkTask.getTaskConfig(); List keyValues = taskConfigs.get(connectorName); boolean needStop = true; - if(null != keyValues && keyValues.size() > 0){ - for(ConnectKeyValue keyValue : keyValues){ - if(keyValue.equals(workerSourceTask.getTaskConfig())){ + if (null != keyValues && keyValues.size() > 0) { + for (ConnectKeyValue keyValue : keyValues) { + if (keyValue.equals(taskConfig)) { needStop = false; break; } } } - if(needStop){ - workerSourceTask.stop(); - stoppedTasks.add(workerSourceTask); + if (needStop) { + if (null != workerSourceTask) { + workerSourceTask.stop(); + stoppedTasks.add(workerSourceTask); + } else { + workerSinkTask.stop(); + stoppedTasks.add(workerSinkTask); + } + } } workingTasks.removeAll(stoppedTasks); - if (null == taskConfigs || 0 == taskConfigs.size()){ + if (null == taskConfigs || 0 == taskConfigs.size()) { return; } Map> newTasks = new HashMap<>(); - for(String connectorName : taskConfigs.keySet()){ - for(ConnectKeyValue keyValue : taskConfigs.get(connectorName)){ + for (String connectorName : taskConfigs.keySet()) { + for (ConnectKeyValue keyValue : taskConfigs.get(connectorName)) { boolean isNewTask = true; - for(WorkerSourceTask workeringTask : workingTasks){ - if(keyValue.equals(workeringTask.getTaskConfig())){ + for (Runnable runnable : workingTasks) { + WorkerSourceTask workerSourceTask = null; + WorkerSinkTask workerSinkTask = null; + if (runnable instanceof WorkerSourceTask) { + workerSourceTask = (WorkerSourceTask) runnable; + } else { + workerSinkTask = (WorkerSinkTask) runnable; + } + ConnectKeyValue taskConfig = null != workerSourceTask ? workerSourceTask.getTaskConfig() : workerSinkTask.getTaskConfig(); + if (keyValue.equals(taskConfig)) { isNewTask = false; break; } } - if(isNewTask){ - if(!newTasks.containsKey(connectorName)){ + if (isNewTask) { + if (!newTasks.containsKey(connectorName)) { newTasks.put(connectorName, new ArrayList<>()); } newTasks.get(connectorName).add(keyValue); @@ -199,23 +221,29 @@ public synchronized void startTasks(Map> taskConfi } } - for(String connectorName : newTasks.keySet()){ - for(ConnectKeyValue keyValue : newTasks.get(connectorName)){ + for (String connectorName : newTasks.keySet()) { + for (ConnectKeyValue keyValue : newTasks.get(connectorName)) { Class taskClazz = Class.forName(keyValue.getString(RuntimeConfigDefine.TASK_CLASS)); Task task = (Task) taskClazz.newInstance(); Class converterClazz = Class.forName(keyValue.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER)); Converter recordConverter = (Converter) converterClazz.newInstance(); - if(task instanceof SourceTask){ + if (task instanceof SourceTask) { Producer producer = messagingAccessWrapper - .getMessageAccessPoint(keyValue.getString(RuntimeConfigDefine.OMS_DRIVER_URL)).createProducer(); + .getMessageAccessPoint(keyValue.getString(RuntimeConfigDefine.OMS_DRIVER_URL)).createProducer(); producer.startup(); WorkerSourceTask workerSourceTask = new WorkerSourceTask(connectorName, - (SourceTask) task, keyValue, - new PositionStorageReaderImpl(positionManagementService), recordConverter, producer); + (SourceTask) task, keyValue, + new PositionStorageReaderImpl(positionManagementService), recordConverter, producer); this.taskExecutor.submit(workerSourceTask); this.workingTasks.add(workerSourceTask); + } else if (task instanceof SinkTask) { + PullConsumer consumer = messagingAccessWrapper.getMessageAccessPoint(keyValue.getString(RuntimeConfigDefine.OMS_DRIVER_URL)).createPullConsumer(); + consumer.startup(); + WorkerSinkTask workerSinkTask = new WorkerSinkTask(connectorName, (SinkTask) task, keyValue, recordConverter, consumer); + this.taskExecutor.submit(workerSinkTask); + this.workingTasks.add(workerSinkTask); } } } @@ -226,8 +254,10 @@ public synchronized void startTasks(Map> taskConfi */ public void commitTaskPosition() { Map positionData = new HashMap<>(); - for(WorkerSourceTask task : workingTasks){ - positionData.putAll(task.getPositionData()); + for (Runnable task : workingTasks) { + if (task instanceof WorkerSourceTask) { + positionData.putAll(((WorkerSourceTask) task).getPositionData()); + } } positionManagementService.putPosition(positionData); } @@ -245,15 +275,15 @@ public Set getWorkingConnectors() { } public void setWorkingConnectors( - Set workingConnectors) { + Set workingConnectors) { this.workingConnectors = workingConnectors; } - public Set getWorkingTasks() { + public Set getWorkingTasks() { return workingTasks; } - public void setWorkingTasks(Set workingTasks) { + public void setWorkingTasks(Set workingTasks) { this.workingTasks = workingTasks; } } diff --git a/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerSinkTask.java b/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerSinkTask.java new file mode 100644 index 0000000..0f44917 --- /dev/null +++ b/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerSinkTask.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.connect.runtime.connectorwrapper; + +import com.alibaba.fastjson.JSON; +import io.openmessaging.KeyValue; +import io.openmessaging.Message; +import io.openmessaging.connect.runtime.common.ConnectKeyValue; +import io.openmessaging.connect.runtime.common.LoggerName; +import io.openmessaging.connect.runtime.common.QueuePartition; +import io.openmessaging.connector.api.data.Converter; +import io.openmessaging.connector.api.data.SinkDataEntry; +import io.openmessaging.connector.api.data.SourceDataEntry; +import io.openmessaging.connector.api.sink.SinkTask; +import io.openmessaging.connector.api.sink.SinkTaskContext; +import io.openmessaging.consumer.PullConsumer; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A wrapper of {@link SinkTask} for runtime. + */ +public class WorkerSinkTask implements Runnable { + + private static final Logger log = LoggerFactory.getLogger(LoggerName.OMS_RUNTIME); + + /** + * The configuration key that provides the list of queueNames that are inputs for this + * SinkTask. + */ + public static final String QUEUENAMES_CONFIG = "queueNames"; + + /** + * Connector name of current task. + */ + private String connectorName; + + /** + * The implements of the sink task. + */ + private SinkTask sinkTask; + + /** + * The configs of current sink task. + */ + private ConnectKeyValue taskConfig; + + /** + * A switch for the sink task. + */ + private AtomicBoolean isStopping; + + /** + * A OMS consumer to pull message from MQ. + */ + private PullConsumer consumer; + + /** + * A converter to parse sink data entry to object. + */ + private Converter recordConverter; + + public ConcurrentHashMap partitionStatusMap; + + public ConcurrentHashMap partitionOffsetMap; + + public WorkerSinkTask(String connectorName, + SinkTask sinkTask, + ConnectKeyValue taskConfig, + Converter recordConverter, + PullConsumer consumer) { + this.connectorName = connectorName; + this.sinkTask = sinkTask; + this.taskConfig = taskConfig; + this.isStopping = new AtomicBoolean(false); + this.consumer = consumer; + this.recordConverter = recordConverter; + } + + /** + * Start a sink task, and receive data entry from MQ cyclically. + */ + @Override + public void run() { + try { + sinkTask.initialize(new SinkTaskContext() { + @Override + public void resetOffset(String queueName, Long offset) { + //TODO oms-1.0.0-alpha支持获取, 并且queueName 是否需要换成QueuePartition + QueuePartition queuePartition = new QueuePartition(queueName, 0); + partitionOffsetMap.put(queuePartition, offset); + } + + @Override + public void resetOffset(Map offsets) { + //TODO oms-1.0.0-alpha支持获取, 并且queueName 是否需要换成QueuePartition + for (Map.Entry entry : offsets.entrySet()) { + QueuePartition queuePartition = new QueuePartition(entry.getKey(), 0); + partitionOffsetMap.put(queuePartition, entry.getValue()); + } + } + + @Override + public void pause(List queueNames) { + //TODO queueName 是否需要换成QueuePartition + if (null != queueNames && queueNames.size() > 0) { + for (String queueName : queueNames) { + QueuePartition queuePartition = new QueuePartition(queueName, 0); + partitionStatusMap.put(queuePartition, PartitionStatus.PAUSE); + } + } + } + + @Override + public void resume(List queueNames) { + //TODO queueName 是否需要换成QueuePartition + if (null != queueNames && queueNames.size() > 0) { + for (String queueName : queueNames) { + QueuePartition queuePartition = new QueuePartition(queueName, 0); + partitionStatusMap.remove(queuePartition); + } + } + } + + @Override + public KeyValue configs() { + return taskConfig; + } + }); + String queueNamesStr = taskConfig.getString(QUEUENAMES_CONFIG); + if (!StringUtils.isEmpty(queueNamesStr)) { + String[] queueNames = queueNamesStr.split(","); + for (String queueName : queueNames) { + consumer.attachQueue(queueName); + } + log.debug("{} Initializing and starting task for queueNames {}", this, queueNames); + } else { + log.error("Lack of sink comsume queueNames config"); + } + sinkTask.start(taskConfig); + + while (!isStopping.get()) { + final Message message = consumer.receive(); + List messages = new ArrayList<>(16); + messages.add(message); + checkPause(messages); + receiveMessages(messages); + for (Message message1 : messages) { + String msgId = message1.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID); + log.info("Received one message success : {}", msgId); + //TODO 更新queue offset +// partitionOffsetMap.put() + consumer.ack(msgId); + } + } + log.info("Task stop, config:{}", JSON.toJSONString(taskConfig)); + } catch (Exception e) { + log.error("Run task failed {}.", e); + } + } + + private void checkPause(List messages) { + final Iterator iterator = messages.iterator(); + while (iterator.hasNext()) { + final Message message = iterator.next(); + String queueName = message.sysHeaders().getString("DESTINATION"); + //TODO 缺失partition + QueuePartition queuePartition = new QueuePartition(queueName, 0); + if (null != partitionStatusMap.get(queuePartition)) { + String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID); + log.info("QueueName {}, partition {} pause, Discard the message {}", queueName, 0, msgId); + iterator.remove(); + } + } + } + + public void stop() { + isStopping.set(true); + consumer.shutdown(); + sinkTask.stop(); + } + + /** + * receive message from MQ. + * + * @param messages + */ + private void receiveMessages(List messages) { + List sinkDataEntries = new ArrayList<>(8); + for (Message message : messages) { + SinkDataEntry sinkDataEntry = convertToSinkDataEntry(message); + sinkDataEntries.add(sinkDataEntry); + } + sinkTask.put(sinkDataEntries); + } + + private SinkDataEntry convertToSinkDataEntry(Message message) { + String queueName = message.sysHeaders().getString("DESTINATION"); + final byte[] messageBody = message.getBody(byte[].class); + final SourceDataEntry sourceDataEntry = JSON.parseObject(new String(messageBody), SourceDataEntry.class); + final Object[] payload = sourceDataEntry.getPayload(); + final byte[] decodeBytes = Base64.getDecoder().decode((String) payload[0]); + final Object recodeObject = recordConverter.byteToObject(decodeBytes); + Object[] newObject = new Object[1]; + newObject[0] = recodeObject; + //TODO oms-1.0.0-alpha支持获取offset,并且支持批量获取消息,SinkDataEntry & SourceDataEntry 是否要增加partition相关属性 + SinkDataEntry sinkDataEntry = new SinkDataEntry(10L, sourceDataEntry.getTimestamp(), sourceDataEntry.getEntryType(), queueName, sourceDataEntry.getSchema(), newObject); + sinkDataEntry.setPayload(newObject); + return sinkDataEntry; + } + + public String getConnectorName() { + return connectorName; + } + + public ConnectKeyValue getTaskConfig() { + return taskConfig; + } + + @Override + public String toString() { + + StringBuilder sb = new StringBuilder(); + sb.append("connectorName:" + connectorName) + .append("\nConfigs:" + JSON.toJSONString(taskConfig)); + return sb.toString(); + } + + private enum PartitionStatus { + PAUSE; + } +} diff --git a/src/main/java/io/openmessaging/connect/runtime/rest/RestHandler.java b/src/main/java/io/openmessaging/connect/runtime/rest/RestHandler.java index 199d454..3f48dc7 100644 --- a/src/main/java/io/openmessaging/connect/runtime/rest/RestHandler.java +++ b/src/main/java/io/openmessaging/connect/runtime/rest/RestHandler.java @@ -55,15 +55,15 @@ public RestHandler(ConnectController connectController){ private void getAllocatedInfo(Context context){ Set workerConnectors = connectController.getWorker().getWorkingConnectors(); - Set workerSourceTasks = connectController.getWorker().getWorkingTasks(); + Set workerSourceTasks = connectController.getWorker().getWorkingTasks(); StringBuilder sb = new StringBuilder(); sb.append("working connectors:\n"); for(WorkerConnector workerConnector : workerConnectors){ sb.append(workerConnector.toString()+"\n"); } sb.append("working tasks:\n"); - for(WorkerSourceTask workerSourceTask : workerSourceTasks){ - sb.append(workerSourceTask.toString()+"\n"); + for(Runnable runnable : workerSourceTasks){ + sb.append(runnable.toString()+"\n"); } context.result(sb.toString()); } diff --git a/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerTest.java b/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerTest.java index 2e15fac..6444f4b 100644 --- a/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerTest.java +++ b/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerTest.java @@ -81,12 +81,12 @@ public void init() { worker.setWorkingConnectors(workingConnectors); assertThat(worker.getWorkingConnectors().size()).isEqualTo(3); - Set workerSourceTasks = new HashSet<>(); + Set runnables = new HashSet<>(); for (int i=0; i<3; i++) { ConnectKeyValue connectKeyValue = new ConnectKeyValue(); connectKeyValue.getProperties().put("key1", "TEST-TASK-" + i + "1"); connectKeyValue.getProperties().put("key2", "TEST-TASK-" + i + "2"); - workerSourceTasks.add(new WorkerSourceTask("TEST-CONN-" + i, + runnables.add(new WorkerSourceTask("TEST-CONN-" + i, new TestSourceTask(), connectKeyValue, new TestPositionStorageReader(), @@ -94,7 +94,7 @@ public void init() { producer )); } - worker.setWorkingTasks(workerSourceTasks); + worker.setWorkingTasks(runnables); assertThat(worker.getWorkingTasks().size()).isEqualTo(3); worker.start(); @@ -150,10 +150,18 @@ public void testStartTasks() { e.printStackTrace(); } - Set sourceTasks = worker.getWorkingTasks(); + Set sourceTasks = worker.getWorkingTasks(); assertThat(sourceTasks.size()).isEqualTo(3); - for (WorkerSourceTask wst: sourceTasks) { - assertThat(wst.getConnectorName()).isIn("TEST-CONN-1", "TEST-CONN-2", "TEST-CONN-3"); + for (Runnable runnable : sourceTasks) { + WorkerSourceTask workerSourceTask = null; + WorkerSinkTask workerSinkTask = null; + if (runnable instanceof WorkerSourceTask) { + workerSourceTask = (WorkerSourceTask) runnable; + } else { + workerSinkTask = (WorkerSinkTask) runnable; + } + String connectorName = null != workerSourceTask ? workerSourceTask.getConnectorName() : workerSinkTask.getConnectorName(); + assertThat(connectorName).isIn("TEST-CONN-1", "TEST-CONN-2", "TEST-CONN-3"); } } } diff --git a/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java b/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java index 3b9e6b1..5f72fa2 100644 --- a/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java +++ b/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java @@ -127,7 +127,7 @@ public class RestHandlerTest { private Set workerConnectors; - private Set workerSourceTasks; + private Set workerTasks; @Before public void init() throws Exception { @@ -196,7 +196,7 @@ public void init() throws Exception { }; WorkerSourceTask workerSourceTask1 = new WorkerSourceTask("testConnectorName1", sourceTask, connectKeyValue, positionStorageReader, converter, producer); WorkerSourceTask workerSourceTask2 = new WorkerSourceTask("testConnectorName2", sourceTask, connectKeyValue1, positionStorageReader, converter, producer); - workerSourceTasks = new HashSet() { + workerTasks = new HashSet() { { add(workerSourceTask1); add(workerSourceTask2); @@ -204,7 +204,7 @@ public void init() throws Exception { }; when(connectController.getWorker()).thenReturn(worker); when(worker.getWorkingConnectors()).thenReturn(workerConnectors); - when(worker.getWorkingTasks()).thenReturn(workerSourceTasks); + when(worker.getWorkingTasks()).thenReturn(workerTasks); restHandler = new RestHandler(connectController); @@ -254,8 +254,8 @@ public void testRESTful() throws Exception { sb.append(workerConnector.toString() + "\n"); } sb.append("working tasks:\n"); - for (WorkerSourceTask workerSourceTask : workerSourceTasks) { - sb.append(workerSourceTask.toString() + "\n"); + for (Runnable runnable : workerTasks) { + sb.append(runnable.toString() + "\n"); } assertEquals(sb.toString(), EntityUtils.toString(httpResponse4.getEntity(), "UTF-8")); }