From a4e85033414828e56d015776c6046721f79c9bed Mon Sep 17 00:00:00 2001 From: zhoubo0404 <877036922@qq.com> Date: Tue, 16 Apr 2019 20:50:13 +0800 Subject: [PATCH 1/3] Add workerSinkTask --- .../runtime/connectorwrapper/Worker.java | 124 ++++++---- .../connectorwrapper/WorkerSinkTask.java | 222 ++++++++++++++++++ .../connect/runtime/rest/RestHandler.java | 6 +- .../runtime/connectorwrapper/WorkerTest.java | 20 +- .../connect/runtime/rest/RestHandlerTest.java | 10 +- 5 files changed, 321 insertions(+), 61 deletions(-) create mode 100644 src/main/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerSinkTask.java diff --git a/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/Worker.java b/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/Worker.java index 0f2d8f4..9b3c299 100644 --- a/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/Worker.java +++ b/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/Worker.java @@ -18,7 +18,6 @@ package io.openmessaging.connect.runtime.connectorwrapper; import io.netty.util.internal.ConcurrentSet; -import io.openmessaging.MessagingAccessPoint; import io.openmessaging.connect.runtime.common.ConnectKeyValue; import io.openmessaging.connect.runtime.config.ConnectConfig; import io.openmessaging.connect.runtime.config.RuntimeConfigDefine; @@ -31,15 +30,13 @@ import io.openmessaging.connector.api.data.Converter; import io.openmessaging.connector.api.data.SinkDataEntry; import io.openmessaging.connector.api.data.SourceDataEntry; +import io.openmessaging.connector.api.sink.SinkTask; import io.openmessaging.connector.api.source.SourceTask; +import io.openmessaging.consumer.PullConsumer; import io.openmessaging.producer.Producer; + import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -61,7 +58,7 @@ public class Worker { /** * Current running tasks. */ - private Set workingTasks = new ConcurrentSet<>(); + private Set workingTasks = new ConcurrentSet<>(); /** * Thread pool for connectors and tasks. @@ -85,8 +82,8 @@ public class Worker { private TaskPositionCommitService taskPositionCommitService; public Worker(ConnectConfig connectConfig, - PositionManagementService positionManagementService, - MessagingAccessWrapper messagingAccessWrapper) { + PositionManagementService positionManagementService, + MessagingAccessWrapper messagingAccessWrapper) { this.workerId = connectConfig.getWorkerId(); this.taskExecutor = Executors.newCachedThreadPool(); @@ -95,7 +92,7 @@ public Worker(ConnectConfig connectConfig, taskPositionCommitService = new TaskPositionCommitService(this); } - public void start(){ + public void start() { taskPositionCommitService.start(); } @@ -103,42 +100,43 @@ public void start(){ * Start a collection of connectors with the given configs. * If a connector is already started with the same configs, it will not start again. * If a connector is already started but not contained in the new configs, it will stop. + * * @param connectorConfigs * @throws Exception */ public synchronized void startConnectors(Map connectorConfigs) throws Exception { Set stoppedConnector = new HashSet<>(); - for(WorkerConnector workerConnector : workingConnectors){ + for (WorkerConnector workerConnector : workingConnectors) { String connectorName = workerConnector.getConnectorName(); ConnectKeyValue keyValue = connectorConfigs.get(connectorName); - if(null == keyValue || 0 != keyValue.getInt(RuntimeConfigDefine.CONFIG_DELETED)){ + if (null == keyValue || 0 != keyValue.getInt(RuntimeConfigDefine.CONFIG_DELETED)) { workerConnector.stop(); stoppedConnector.add(workerConnector); - }else if(!keyValue.equals(workerConnector.getKeyValue())){ + } else if (!keyValue.equals(workerConnector.getKeyValue())) { workerConnector.reconfigure(keyValue); } } workingConnectors.removeAll(stoppedConnector); - if(null == connectorConfigs || 0 == connectorConfigs.size()){ + if (null == connectorConfigs || 0 == connectorConfigs.size()) { return; } Map newConnectors = new HashMap<>(); - for(String connectorName : connectorConfigs.keySet()){ + for (String connectorName : connectorConfigs.keySet()) { boolean isNewConnector = true; - for(WorkerConnector workerConnector : workingConnectors){ - if(workerConnector.getConnectorName().equals(connectorName)){ + for (WorkerConnector workerConnector : workingConnectors) { + if (workerConnector.getConnectorName().equals(connectorName)) { isNewConnector = false; break; } } - if(isNewConnector){ + if (isNewConnector) { newConnectors.put(connectorName, connectorConfigs.get(connectorName)); } } - for(String connectorName : newConnectors.keySet()){ + for (String connectorName : newConnectors.keySet()) { ConnectKeyValue keyValue = newConnectors.get(connectorName); Class clazz = Class.forName(keyValue.getString(RuntimeConfigDefine.CONNECTOR_CLASS)); Connector connector = (Connector) clazz.newInstance(); @@ -152,46 +150,70 @@ public synchronized void startConnectors(Map connectorC * Start a collection of tasks with the given configs. * If a task is already started with the same configs, it will not start again. * If a task is already started but not contained in the new configs, it will stop. + * * @param taskConfigs * @throws Exception */ public synchronized void startTasks(Map> taskConfigs) throws Exception { - Set stoppedTasks = new HashSet<>(); - for(WorkerSourceTask workerSourceTask : workingTasks){ - String connectorName = workerSourceTask.getConnectorName(); + Set stoppedTasks = new HashSet<>(); + for (Runnable runnable : workingTasks) { + WorkerSourceTask workerSourceTask = null; + WorkerSinkTask workerSinkTask = null; + if (runnable instanceof WorkerSourceTask) { + workerSourceTask = (WorkerSourceTask) runnable; + } else { + workerSinkTask = (WorkerSinkTask) runnable; + } + + String connectorName = null != workerSourceTask ? workerSourceTask.getConnectorName() : workerSinkTask.getConnectorName(); + ConnectKeyValue taskConfig = null != workerSourceTask ? workerSourceTask.getTaskConfig() : workerSinkTask.getTaskConfig(); List keyValues = taskConfigs.get(connectorName); boolean needStop = true; - if(null != keyValues && keyValues.size() > 0){ - for(ConnectKeyValue keyValue : keyValues){ - if(keyValue.equals(workerSourceTask.getTaskConfig())){ + if (null != keyValues && keyValues.size() > 0) { + for (ConnectKeyValue keyValue : keyValues) { + if (keyValue.equals(taskConfig)) { needStop = false; break; } } } - if(needStop){ - workerSourceTask.stop(); - stoppedTasks.add(workerSourceTask); + if (needStop) { + if (null != workerSourceTask) { + workerSourceTask.stop(); + stoppedTasks.add(workerSourceTask); + } else { + workerSinkTask.stop(); + stoppedTasks.add(workerSinkTask); + } + } } workingTasks.removeAll(stoppedTasks); - if (null == taskConfigs || 0 == taskConfigs.size()){ + if (null == taskConfigs || 0 == taskConfigs.size()) { return; } Map> newTasks = new HashMap<>(); - for(String connectorName : taskConfigs.keySet()){ - for(ConnectKeyValue keyValue : taskConfigs.get(connectorName)){ + for (String connectorName : taskConfigs.keySet()) { + for (ConnectKeyValue keyValue : taskConfigs.get(connectorName)) { boolean isNewTask = true; - for(WorkerSourceTask workeringTask : workingTasks){ - if(keyValue.equals(workeringTask.getTaskConfig())){ + for (Runnable runnable : workingTasks) { + WorkerSourceTask workerSourceTask = null; + WorkerSinkTask workerSinkTask = null; + if (runnable instanceof WorkerSourceTask) { + workerSourceTask = (WorkerSourceTask) runnable; + } else { + workerSinkTask = (WorkerSinkTask) runnable; + } + ConnectKeyValue taskConfig = null != workerSourceTask ? workerSourceTask.getTaskConfig() : workerSinkTask.getTaskConfig(); + if (keyValue.equals(taskConfig)) { isNewTask = false; break; } } - if(isNewTask){ - if(!newTasks.containsKey(connectorName)){ + if (isNewTask) { + if (!newTasks.containsKey(connectorName)) { newTasks.put(connectorName, new ArrayList<>()); } newTasks.get(connectorName).add(keyValue); @@ -199,23 +221,29 @@ public synchronized void startTasks(Map> taskConfi } } - for(String connectorName : newTasks.keySet()){ - for(ConnectKeyValue keyValue : newTasks.get(connectorName)){ + for (String connectorName : newTasks.keySet()) { + for (ConnectKeyValue keyValue : newTasks.get(connectorName)) { Class taskClazz = Class.forName(keyValue.getString(RuntimeConfigDefine.TASK_CLASS)); Task task = (Task) taskClazz.newInstance(); Class converterClazz = Class.forName(keyValue.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER)); Converter recordConverter = (Converter) converterClazz.newInstance(); - if(task instanceof SourceTask){ + if (task instanceof SourceTask) { Producer producer = messagingAccessWrapper - .getMessageAccessPoint(keyValue.getString(RuntimeConfigDefine.OMS_DRIVER_URL)).createProducer(); + .getMessageAccessPoint(keyValue.getString(RuntimeConfigDefine.OMS_DRIVER_URL)).createProducer(); producer.startup(); WorkerSourceTask workerSourceTask = new WorkerSourceTask(connectorName, - (SourceTask) task, keyValue, - new PositionStorageReaderImpl(positionManagementService), recordConverter, producer); + (SourceTask) task, keyValue, + new PositionStorageReaderImpl(positionManagementService), recordConverter, producer); this.taskExecutor.submit(workerSourceTask); this.workingTasks.add(workerSourceTask); + } else if (task instanceof SinkTask) { + PullConsumer consumer = messagingAccessWrapper.getMessageAccessPoint(keyValue.getString(RuntimeConfigDefine.OMS_DRIVER_URL)).createPullConsumer(); + consumer.startup(); + WorkerSinkTask workerSinkTask = new WorkerSinkTask(connectorName, (SinkTask) task, keyValue, recordConverter, consumer); + this.taskExecutor.submit(workerSinkTask); + this.workingTasks.add(workerSinkTask); } } } @@ -226,8 +254,10 @@ public synchronized void startTasks(Map> taskConfi */ public void commitTaskPosition() { Map positionData = new HashMap<>(); - for(WorkerSourceTask task : workingTasks){ - positionData.putAll(task.getPositionData()); + for (Runnable task : workingTasks) { + if (task instanceof WorkerSourceTask) { + positionData.putAll(((WorkerSourceTask) task).getPositionData()); + } } positionManagementService.putPosition(positionData); } @@ -245,15 +275,15 @@ public Set getWorkingConnectors() { } public void setWorkingConnectors( - Set workingConnectors) { + Set workingConnectors) { this.workingConnectors = workingConnectors; } - public Set getWorkingTasks() { + public Set getWorkingTasks() { return workingTasks; } - public void setWorkingTasks(Set workingTasks) { + public void setWorkingTasks(Set workingTasks) { this.workingTasks = workingTasks; } } diff --git a/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerSinkTask.java b/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerSinkTask.java new file mode 100644 index 0000000..4f587a5 --- /dev/null +++ b/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerSinkTask.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.connect.runtime.connectorwrapper; + +import com.alibaba.fastjson.JSON; +import io.openmessaging.KeyValue; +import io.openmessaging.Message; +import io.openmessaging.connect.runtime.common.ConnectKeyValue; +import io.openmessaging.connect.runtime.common.LoggerName; +import io.openmessaging.connector.api.data.Converter; +import io.openmessaging.connector.api.data.SinkDataEntry; +import io.openmessaging.connector.api.data.SourceDataEntry; +import io.openmessaging.connector.api.sink.SinkTask; +import io.openmessaging.connector.api.sink.SinkTaskContext; +import io.openmessaging.consumer.PullConsumer; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A wrapper of {@link SinkTask} for runtime. + */ +public class WorkerSinkTask implements Runnable { + + private static final Logger log = LoggerFactory.getLogger(LoggerName.OMS_RUNTIME); + + /** + *

+ * The configuration key that provides the list of topics that are inputs for this + * SinkTask. + *

+ */ + public static final String TOPICS_CONFIG = "topics"; + + /** + *

+ * The configuration key that provides a regex specifying which topics to include as inputs + * for this SinkTask. + *

+ */ + public static final String TOPICS_REGEX_CONFIG = "topics.regex"; + + /** + * Connector name of current task. + */ + private String connectorName; + + /** + * The implements of the sink task. + */ + private SinkTask sinkTask; + + /** + * The configs of current sink task. + */ + private ConnectKeyValue taskConfig; + + /** + * A switch for the sink task. + */ + private AtomicBoolean isStopping; + + /** + * A OMS consumer to pull message from MQ. + */ + private PullConsumer consumer; + + /** + * A converter to parse sink data entry to byte[]. + */ + private Converter recordConverter; + + /** + * Current position info of the sink task. + */ + private Map positionData = new HashMap<>(); + + public WorkerSinkTask(String connectorName, + SinkTask sinkTask, + ConnectKeyValue taskConfig, + Converter recordConverter, + PullConsumer consumer) { + this.connectorName = connectorName; + this.sinkTask = sinkTask; + this.taskConfig = taskConfig; + this.isStopping = new AtomicBoolean(false); + this.consumer = consumer; + this.recordConverter = recordConverter; + } + + /** + * Start a sink task, and send data entry to MQ cyclically. + */ + @Override + public void run() { + try { + sinkTask.initialize(new SinkTaskContext() { + @Override + public void resetOffset(String queueName, Long offset) { + + } + + @Override + public void resetOffset(Map offsets) { + + } + + @Override + public void pause(List queueName) { + + } + + @Override + public void resume(List queueName) { + + } + + @Override + public KeyValue configs() { + return taskConfig; + } + }); + String topicsStr = taskConfig.getString(TOPICS_CONFIG); + if (!StringUtils.isEmpty(topicsStr)) { + String[] topics = topicsStr.split(","); + for (String topic : topics) { + consumer.attachQueue(topic); + } + log.debug("{} Initializing and starting task for topics {}", this, topics); + } else { + // TODO 通过正则表达式订阅topic + /* String topicsRegexStr = taskConfig.getString(TOPICS_REGEX_CONFIG); + Pattern pattern = Pattern.compile(topicsRegexStr); + consumer.attachQueue(pattern) + log.debug("{} Initializing and starting task for topics regex {}", this, topicsRegexStr);*/ + } + sinkTask.start(taskConfig); + log.info("Task start, config:{}", JSON.toJSONString(taskConfig)); + + while (!isStopping.get()) { + final Message message = consumer.receive(); + if (null != message) { + receiveRecord(message); + String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID); + log.info("Received one message: %s%n", msgId); + consumer.ack(msgId); + } + } + log.info("Task stop, config:{}", JSON.toJSONString(taskConfig)); + } catch (Exception e) { + log.error("Run task failed.", e); + } + } + + public Map getPositionData() { + return positionData; + } + + public void stop() { + isStopping.set(true); + consumer.shutdown(); + sinkTask.stop(); + } + + /** + * receive message from MQ. + * + * @param message + */ + private void receiveRecord(Message message) { + String queueName = message.sysHeaders().getString("DESTINATION"); + final byte[] messageBody = message.getBody(byte[].class); + final SourceDataEntry sourceDataEntry = JSON.parseObject(new String(messageBody), SourceDataEntry.class); + final Object[] payload = sourceDataEntry.getPayload(); + final byte[] decodeBytes = Base64.getDecoder().decode((String) payload[0]); + final Object recodeObject = recordConverter.byteToObject(decodeBytes); + Object[] newObject = new Object[1]; + newObject[0] = recodeObject; + //TODO queueOffset如何获得 + SinkDataEntry sinkDataEntry = new SinkDataEntry(10L, sourceDataEntry.getTimestamp(), sourceDataEntry.getEntryType(), queueName, sourceDataEntry.getSchema(), newObject); + sinkDataEntry.setPayload(newObject); + List sinkDataEntries = new ArrayList<>(8); + sinkDataEntries.add(sinkDataEntry); + sinkTask.put(sinkDataEntries); + } + + public String getConnectorName() { + return connectorName; + } + + public ConnectKeyValue getTaskConfig() { + return taskConfig; + } + + @Override + public String toString() { + + StringBuilder sb = new StringBuilder(); + sb.append("connectorName:" + connectorName) + .append("\nConfigs:" + JSON.toJSONString(taskConfig)); + return sb.toString(); + } +} diff --git a/src/main/java/io/openmessaging/connect/runtime/rest/RestHandler.java b/src/main/java/io/openmessaging/connect/runtime/rest/RestHandler.java index 199d454..3f48dc7 100644 --- a/src/main/java/io/openmessaging/connect/runtime/rest/RestHandler.java +++ b/src/main/java/io/openmessaging/connect/runtime/rest/RestHandler.java @@ -55,15 +55,15 @@ public RestHandler(ConnectController connectController){ private void getAllocatedInfo(Context context){ Set workerConnectors = connectController.getWorker().getWorkingConnectors(); - Set workerSourceTasks = connectController.getWorker().getWorkingTasks(); + Set workerSourceTasks = connectController.getWorker().getWorkingTasks(); StringBuilder sb = new StringBuilder(); sb.append("working connectors:\n"); for(WorkerConnector workerConnector : workerConnectors){ sb.append(workerConnector.toString()+"\n"); } sb.append("working tasks:\n"); - for(WorkerSourceTask workerSourceTask : workerSourceTasks){ - sb.append(workerSourceTask.toString()+"\n"); + for(Runnable runnable : workerSourceTasks){ + sb.append(runnable.toString()+"\n"); } context.result(sb.toString()); } diff --git a/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerTest.java b/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerTest.java index 2e15fac..6444f4b 100644 --- a/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerTest.java +++ b/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerTest.java @@ -81,12 +81,12 @@ public void init() { worker.setWorkingConnectors(workingConnectors); assertThat(worker.getWorkingConnectors().size()).isEqualTo(3); - Set workerSourceTasks = new HashSet<>(); + Set runnables = new HashSet<>(); for (int i=0; i<3; i++) { ConnectKeyValue connectKeyValue = new ConnectKeyValue(); connectKeyValue.getProperties().put("key1", "TEST-TASK-" + i + "1"); connectKeyValue.getProperties().put("key2", "TEST-TASK-" + i + "2"); - workerSourceTasks.add(new WorkerSourceTask("TEST-CONN-" + i, + runnables.add(new WorkerSourceTask("TEST-CONN-" + i, new TestSourceTask(), connectKeyValue, new TestPositionStorageReader(), @@ -94,7 +94,7 @@ public void init() { producer )); } - worker.setWorkingTasks(workerSourceTasks); + worker.setWorkingTasks(runnables); assertThat(worker.getWorkingTasks().size()).isEqualTo(3); worker.start(); @@ -150,10 +150,18 @@ public void testStartTasks() { e.printStackTrace(); } - Set sourceTasks = worker.getWorkingTasks(); + Set sourceTasks = worker.getWorkingTasks(); assertThat(sourceTasks.size()).isEqualTo(3); - for (WorkerSourceTask wst: sourceTasks) { - assertThat(wst.getConnectorName()).isIn("TEST-CONN-1", "TEST-CONN-2", "TEST-CONN-3"); + for (Runnable runnable : sourceTasks) { + WorkerSourceTask workerSourceTask = null; + WorkerSinkTask workerSinkTask = null; + if (runnable instanceof WorkerSourceTask) { + workerSourceTask = (WorkerSourceTask) runnable; + } else { + workerSinkTask = (WorkerSinkTask) runnable; + } + String connectorName = null != workerSourceTask ? workerSourceTask.getConnectorName() : workerSinkTask.getConnectorName(); + assertThat(connectorName).isIn("TEST-CONN-1", "TEST-CONN-2", "TEST-CONN-3"); } } } diff --git a/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java b/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java index 3b9e6b1..5f72fa2 100644 --- a/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java +++ b/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java @@ -127,7 +127,7 @@ public class RestHandlerTest { private Set workerConnectors; - private Set workerSourceTasks; + private Set workerTasks; @Before public void init() throws Exception { @@ -196,7 +196,7 @@ public void init() throws Exception { }; WorkerSourceTask workerSourceTask1 = new WorkerSourceTask("testConnectorName1", sourceTask, connectKeyValue, positionStorageReader, converter, producer); WorkerSourceTask workerSourceTask2 = new WorkerSourceTask("testConnectorName2", sourceTask, connectKeyValue1, positionStorageReader, converter, producer); - workerSourceTasks = new HashSet() { + workerTasks = new HashSet() { { add(workerSourceTask1); add(workerSourceTask2); @@ -204,7 +204,7 @@ public void init() throws Exception { }; when(connectController.getWorker()).thenReturn(worker); when(worker.getWorkingConnectors()).thenReturn(workerConnectors); - when(worker.getWorkingTasks()).thenReturn(workerSourceTasks); + when(worker.getWorkingTasks()).thenReturn(workerTasks); restHandler = new RestHandler(connectController); @@ -254,8 +254,8 @@ public void testRESTful() throws Exception { sb.append(workerConnector.toString() + "\n"); } sb.append("working tasks:\n"); - for (WorkerSourceTask workerSourceTask : workerSourceTasks) { - sb.append(workerSourceTask.toString() + "\n"); + for (Runnable runnable : workerTasks) { + sb.append(runnable.toString() + "\n"); } assertEquals(sb.toString(), EntityUtils.toString(httpResponse4.getEntity(), "UTF-8")); } From 773b0f31259c83dcc5a14a680fc584909299bb9d Mon Sep 17 00:00:00 2001 From: odbozhou <877036922@qq.com> Date: Wed, 17 Apr 2019 00:10:09 +0800 Subject: [PATCH 2/3] Adjust WorkerSinkTask --- .../connectorwrapper/WorkerSinkTask.java | 33 +++++++------------ 1 file changed, 11 insertions(+), 22 deletions(-) diff --git a/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerSinkTask.java b/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerSinkTask.java index 4f587a5..53bcd55 100644 --- a/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerSinkTask.java +++ b/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerSinkTask.java @@ -32,8 +32,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -44,18 +46,14 @@ public class WorkerSinkTask implements Runnable { private static final Logger log = LoggerFactory.getLogger(LoggerName.OMS_RUNTIME); /** - *

* The configuration key that provides the list of topics that are inputs for this * SinkTask. - *

*/ public static final String TOPICS_CONFIG = "topics"; /** - *

* The configuration key that provides a regex specifying which topics to include as inputs * for this SinkTask. - *

*/ public static final String TOPICS_REGEX_CONFIG = "topics.regex"; @@ -85,15 +83,10 @@ public class WorkerSinkTask implements Runnable { private PullConsumer consumer; /** - * A converter to parse sink data entry to byte[]. + * A converter to parse sink data entry to object. */ private Converter recordConverter; - /** - * Current position info of the sink task. - */ - private Map positionData = new HashMap<>(); - public WorkerSinkTask(String connectorName, SinkTask sinkTask, ConnectKeyValue taskConfig, @@ -108,7 +101,7 @@ public WorkerSinkTask(String connectorName, } /** - * Start a sink task, and send data entry to MQ cyclically. + * Start a sink task, and receive data entry from MQ cyclically. */ @Override public void run() { @@ -154,27 +147,23 @@ public KeyValue configs() { log.debug("{} Initializing and starting task for topics regex {}", this, topicsRegexStr);*/ } sinkTask.start(taskConfig); - log.info("Task start, config:{}", JSON.toJSONString(taskConfig)); + log.info("Task start, config: {}", JSON.toJSONString(taskConfig)); while (!isStopping.get()) { final Message message = consumer.receive(); if (null != message) { - receiveRecord(message); + receiveMessage(message); String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID); - log.info("Received one message: %s%n", msgId); + log.info("Received one message: {}", msgId); consumer.ack(msgId); } } log.info("Task stop, config:{}", JSON.toJSONString(taskConfig)); } catch (Exception e) { - log.error("Run task failed.", e); + log.error("Run task failed {}.", e); } } - public Map getPositionData() { - return positionData; - } - public void stop() { isStopping.set(true); consumer.shutdown(); @@ -186,7 +175,7 @@ public void stop() { * * @param message */ - private void receiveRecord(Message message) { + private void receiveMessage(Message message) { String queueName = message.sysHeaders().getString("DESTINATION"); final byte[] messageBody = message.getBody(byte[].class); final SourceDataEntry sourceDataEntry = JSON.parseObject(new String(messageBody), SourceDataEntry.class); From c9b9bccfe1dcae86f14867a1510641d46ac728d2 Mon Sep 17 00:00:00 2001 From: zhoubo0404 <877036922@qq.com> Date: Mon, 22 Apr 2019 15:12:13 +0800 Subject: [PATCH 3/3] Complete WorkerSinkTask --- .../runtime/common/QueuePartition.java | 72 +++++++++++ .../connectorwrapper/WorkerSinkTask.java | 121 ++++++++++++------ 2 files changed, 153 insertions(+), 40 deletions(-) create mode 100644 src/main/java/io/openmessaging/connect/runtime/common/QueuePartition.java diff --git a/src/main/java/io/openmessaging/connect/runtime/common/QueuePartition.java b/src/main/java/io/openmessaging/connect/runtime/common/QueuePartition.java new file mode 100644 index 0000000..257d241 --- /dev/null +++ b/src/main/java/io/openmessaging/connect/runtime/common/QueuePartition.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.connect.runtime.common; + +import java.io.Serializable; +import java.util.Objects; + +/** + * A queue name and partition number + */ +public final class QueuePartition implements Serializable { + + private int hash = 0; + private final int partition; + private final String queue; + + public QueuePartition(String queue, int partition) { + this.partition = partition; + this.queue = queue; + } + + public int partition() { + return partition; + } + + public String queue() { + return queue; + } + + @Override + public int hashCode() { + if (hash != 0) + return hash; + final int prime = 31; + int result = 1; + result = prime * result + partition; + result = prime * result + Objects.hashCode(queue); + this.hash = result; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + QueuePartition other = (QueuePartition) obj; + return partition == other.partition && Objects.equals(queue, other.queue); + } + + @Override + public String toString() { + return queue + "-" + partition; + } +} diff --git a/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerSinkTask.java b/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerSinkTask.java index 53bcd55..0f44917 100644 --- a/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerSinkTask.java +++ b/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerSinkTask.java @@ -22,6 +22,7 @@ import io.openmessaging.Message; import io.openmessaging.connect.runtime.common.ConnectKeyValue; import io.openmessaging.connect.runtime.common.LoggerName; +import io.openmessaging.connect.runtime.common.QueuePartition; import io.openmessaging.connector.api.data.Converter; import io.openmessaging.connector.api.data.SinkDataEntry; import io.openmessaging.connector.api.data.SourceDataEntry; @@ -32,10 +33,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Base64; -import java.util.List; -import java.util.Map; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -46,16 +45,10 @@ public class WorkerSinkTask implements Runnable { private static final Logger log = LoggerFactory.getLogger(LoggerName.OMS_RUNTIME); /** - * The configuration key that provides the list of topics that are inputs for this + * The configuration key that provides the list of queueNames that are inputs for this * SinkTask. */ - public static final String TOPICS_CONFIG = "topics"; - - /** - * The configuration key that provides a regex specifying which topics to include as inputs - * for this SinkTask. - */ - public static final String TOPICS_REGEX_CONFIG = "topics.regex"; + public static final String QUEUENAMES_CONFIG = "queueNames"; /** * Connector name of current task. @@ -87,6 +80,10 @@ public class WorkerSinkTask implements Runnable { */ private Converter recordConverter; + public ConcurrentHashMap partitionStatusMap; + + public ConcurrentHashMap partitionOffsetMap; + public WorkerSinkTask(String connectorName, SinkTask sinkTask, ConnectKeyValue taskConfig, @@ -109,22 +106,40 @@ public void run() { sinkTask.initialize(new SinkTaskContext() { @Override public void resetOffset(String queueName, Long offset) { - + //TODO oms-1.0.0-alpha支持获取, 并且queueName 是否需要换成QueuePartition + QueuePartition queuePartition = new QueuePartition(queueName, 0); + partitionOffsetMap.put(queuePartition, offset); } @Override public void resetOffset(Map offsets) { - + //TODO oms-1.0.0-alpha支持获取, 并且queueName 是否需要换成QueuePartition + for (Map.Entry entry : offsets.entrySet()) { + QueuePartition queuePartition = new QueuePartition(entry.getKey(), 0); + partitionOffsetMap.put(queuePartition, entry.getValue()); + } } @Override - public void pause(List queueName) { - + public void pause(List queueNames) { + //TODO queueName 是否需要换成QueuePartition + if (null != queueNames && queueNames.size() > 0) { + for (String queueName : queueNames) { + QueuePartition queuePartition = new QueuePartition(queueName, 0); + partitionStatusMap.put(queuePartition, PartitionStatus.PAUSE); + } + } } @Override - public void resume(List queueName) { - + public void resume(List queueNames) { + //TODO queueName 是否需要换成QueuePartition + if (null != queueNames && queueNames.size() > 0) { + for (String queueName : queueNames) { + QueuePartition queuePartition = new QueuePartition(queueName, 0); + partitionStatusMap.remove(queuePartition); + } + } } @Override @@ -132,29 +147,29 @@ public KeyValue configs() { return taskConfig; } }); - String topicsStr = taskConfig.getString(TOPICS_CONFIG); - if (!StringUtils.isEmpty(topicsStr)) { - String[] topics = topicsStr.split(","); - for (String topic : topics) { - consumer.attachQueue(topic); + String queueNamesStr = taskConfig.getString(QUEUENAMES_CONFIG); + if (!StringUtils.isEmpty(queueNamesStr)) { + String[] queueNames = queueNamesStr.split(","); + for (String queueName : queueNames) { + consumer.attachQueue(queueName); } - log.debug("{} Initializing and starting task for topics {}", this, topics); + log.debug("{} Initializing and starting task for queueNames {}", this, queueNames); } else { - // TODO 通过正则表达式订阅topic - /* String topicsRegexStr = taskConfig.getString(TOPICS_REGEX_CONFIG); - Pattern pattern = Pattern.compile(topicsRegexStr); - consumer.attachQueue(pattern) - log.debug("{} Initializing and starting task for topics regex {}", this, topicsRegexStr);*/ + log.error("Lack of sink comsume queueNames config"); } sinkTask.start(taskConfig); - log.info("Task start, config: {}", JSON.toJSONString(taskConfig)); while (!isStopping.get()) { final Message message = consumer.receive(); - if (null != message) { - receiveMessage(message); - String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID); - log.info("Received one message: {}", msgId); + List messages = new ArrayList<>(16); + messages.add(message); + checkPause(messages); + receiveMessages(messages); + for (Message message1 : messages) { + String msgId = message1.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID); + log.info("Received one message success : {}", msgId); + //TODO 更新queue offset +// partitionOffsetMap.put() consumer.ack(msgId); } } @@ -164,6 +179,21 @@ public KeyValue configs() { } } + private void checkPause(List messages) { + final Iterator iterator = messages.iterator(); + while (iterator.hasNext()) { + final Message message = iterator.next(); + String queueName = message.sysHeaders().getString("DESTINATION"); + //TODO 缺失partition + QueuePartition queuePartition = new QueuePartition(queueName, 0); + if (null != partitionStatusMap.get(queuePartition)) { + String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID); + log.info("QueueName {}, partition {} pause, Discard the message {}", queueName, 0, msgId); + iterator.remove(); + } + } + } + public void stop() { isStopping.set(true); consumer.shutdown(); @@ -173,9 +203,18 @@ public void stop() { /** * receive message from MQ. * - * @param message + * @param messages */ - private void receiveMessage(Message message) { + private void receiveMessages(List messages) { + List sinkDataEntries = new ArrayList<>(8); + for (Message message : messages) { + SinkDataEntry sinkDataEntry = convertToSinkDataEntry(message); + sinkDataEntries.add(sinkDataEntry); + } + sinkTask.put(sinkDataEntries); + } + + private SinkDataEntry convertToSinkDataEntry(Message message) { String queueName = message.sysHeaders().getString("DESTINATION"); final byte[] messageBody = message.getBody(byte[].class); final SourceDataEntry sourceDataEntry = JSON.parseObject(new String(messageBody), SourceDataEntry.class); @@ -184,12 +223,10 @@ private void receiveMessage(Message message) { final Object recodeObject = recordConverter.byteToObject(decodeBytes); Object[] newObject = new Object[1]; newObject[0] = recodeObject; - //TODO queueOffset如何获得 + //TODO oms-1.0.0-alpha支持获取offset,并且支持批量获取消息,SinkDataEntry & SourceDataEntry 是否要增加partition相关属性 SinkDataEntry sinkDataEntry = new SinkDataEntry(10L, sourceDataEntry.getTimestamp(), sourceDataEntry.getEntryType(), queueName, sourceDataEntry.getSchema(), newObject); sinkDataEntry.setPayload(newObject); - List sinkDataEntries = new ArrayList<>(8); - sinkDataEntries.add(sinkDataEntry); - sinkTask.put(sinkDataEntries); + return sinkDataEntry; } public String getConnectorName() { @@ -208,4 +245,8 @@ public String toString() { .append("\nConfigs:" + JSON.toJSONString(taskConfig)); return sb.toString(); } + + private enum PartitionStatus { + PAUSE; + } }