Skip to content

Commit

Permalink
Merge pull request #8 from odbozhou/base-oms-1.0.0-alpha
Browse files Browse the repository at this point in the history
[issues#6] Implement WorkSinkTask
  • Loading branch information
duhenglucky authored May 31, 2019
2 parents 91e2281 + c9b9bcc commit 9734c3b
Show file tree
Hide file tree
Showing 6 changed files with 423 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.connect.runtime.common;

import java.io.Serializable;
import java.util.Objects;

/**
* A queue name and partition number
*/
public final class QueuePartition implements Serializable {

private int hash = 0;
private final int partition;
private final String queue;

public QueuePartition(String queue, int partition) {
this.partition = partition;
this.queue = queue;
}

public int partition() {
return partition;
}

public String queue() {
return queue;
}

@Override
public int hashCode() {
if (hash != 0)
return hash;
final int prime = 31;
int result = 1;
result = prime * result + partition;
result = prime * result + Objects.hashCode(queue);
this.hash = result;
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
QueuePartition other = (QueuePartition) obj;
return partition == other.partition && Objects.equals(queue, other.queue);
}

@Override
public String toString() {
return queue + "-" + partition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package io.openmessaging.connect.runtime.connectorwrapper;

import io.netty.util.internal.ConcurrentSet;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.connect.runtime.common.ConnectKeyValue;
import io.openmessaging.connect.runtime.config.ConnectConfig;
import io.openmessaging.connect.runtime.config.RuntimeConfigDefine;
Expand All @@ -31,15 +30,13 @@
import io.openmessaging.connector.api.data.Converter;
import io.openmessaging.connector.api.data.SinkDataEntry;
import io.openmessaging.connector.api.data.SourceDataEntry;
import io.openmessaging.connector.api.sink.SinkTask;
import io.openmessaging.connector.api.source.SourceTask;
import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.producer.Producer;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand All @@ -61,7 +58,7 @@ public class Worker {
/**
* Current running tasks.
*/
private Set<WorkerSourceTask> workingTasks = new ConcurrentSet<>();
private Set<Runnable> workingTasks = new ConcurrentSet<>();

/**
* Thread pool for connectors and tasks.
Expand All @@ -85,8 +82,8 @@ public class Worker {
private TaskPositionCommitService taskPositionCommitService;

public Worker(ConnectConfig connectConfig,
PositionManagementService positionManagementService,
MessagingAccessWrapper messagingAccessWrapper) {
PositionManagementService positionManagementService,
MessagingAccessWrapper messagingAccessWrapper) {

this.workerId = connectConfig.getWorkerId();
this.taskExecutor = Executors.newCachedThreadPool();
Expand All @@ -95,50 +92,51 @@ public Worker(ConnectConfig connectConfig,
taskPositionCommitService = new TaskPositionCommitService(this);
}

public void start(){
public void start() {
taskPositionCommitService.start();
}

/**
* Start a collection of connectors with the given configs.
* If a connector is already started with the same configs, it will not start again.
* If a connector is already started but not contained in the new configs, it will stop.
*
* @param connectorConfigs
* @throws Exception
*/
public synchronized void startConnectors(Map<String, ConnectKeyValue> connectorConfigs) throws Exception {

Set<WorkerConnector> stoppedConnector = new HashSet<>();
for(WorkerConnector workerConnector : workingConnectors){
for (WorkerConnector workerConnector : workingConnectors) {
String connectorName = workerConnector.getConnectorName();
ConnectKeyValue keyValue = connectorConfigs.get(connectorName);
if(null == keyValue || 0 != keyValue.getInt(RuntimeConfigDefine.CONFIG_DELETED)){
if (null == keyValue || 0 != keyValue.getInt(RuntimeConfigDefine.CONFIG_DELETED)) {
workerConnector.stop();
stoppedConnector.add(workerConnector);
}else if(!keyValue.equals(workerConnector.getKeyValue())){
} else if (!keyValue.equals(workerConnector.getKeyValue())) {
workerConnector.reconfigure(keyValue);
}
}
workingConnectors.removeAll(stoppedConnector);

if(null == connectorConfigs || 0 == connectorConfigs.size()){
if (null == connectorConfigs || 0 == connectorConfigs.size()) {
return;
}
Map<String, ConnectKeyValue> newConnectors = new HashMap<>();
for(String connectorName : connectorConfigs.keySet()){
for (String connectorName : connectorConfigs.keySet()) {
boolean isNewConnector = true;
for(WorkerConnector workerConnector : workingConnectors){
if(workerConnector.getConnectorName().equals(connectorName)){
for (WorkerConnector workerConnector : workingConnectors) {
if (workerConnector.getConnectorName().equals(connectorName)) {
isNewConnector = false;
break;
}
}
if(isNewConnector){
if (isNewConnector) {
newConnectors.put(connectorName, connectorConfigs.get(connectorName));
}
}

for(String connectorName : newConnectors.keySet()){
for (String connectorName : newConnectors.keySet()) {
ConnectKeyValue keyValue = newConnectors.get(connectorName);
Class clazz = Class.forName(keyValue.getString(RuntimeConfigDefine.CONNECTOR_CLASS));
Connector connector = (Connector) clazz.newInstance();
Expand All @@ -152,70 +150,100 @@ public synchronized void startConnectors(Map<String, ConnectKeyValue> connectorC
* Start a collection of tasks with the given configs.
* If a task is already started with the same configs, it will not start again.
* If a task is already started but not contained in the new configs, it will stop.
*
* @param taskConfigs
* @throws Exception
*/
public synchronized void startTasks(Map<String, List<ConnectKeyValue>> taskConfigs) throws Exception {

Set<WorkerSourceTask> stoppedTasks = new HashSet<>();
for(WorkerSourceTask workerSourceTask : workingTasks){
String connectorName = workerSourceTask.getConnectorName();
Set<Runnable> stoppedTasks = new HashSet<>();
for (Runnable runnable : workingTasks) {
WorkerSourceTask workerSourceTask = null;
WorkerSinkTask workerSinkTask = null;
if (runnable instanceof WorkerSourceTask) {
workerSourceTask = (WorkerSourceTask) runnable;
} else {
workerSinkTask = (WorkerSinkTask) runnable;
}

String connectorName = null != workerSourceTask ? workerSourceTask.getConnectorName() : workerSinkTask.getConnectorName();
ConnectKeyValue taskConfig = null != workerSourceTask ? workerSourceTask.getTaskConfig() : workerSinkTask.getTaskConfig();
List<ConnectKeyValue> keyValues = taskConfigs.get(connectorName);
boolean needStop = true;
if(null != keyValues && keyValues.size() > 0){
for(ConnectKeyValue keyValue : keyValues){
if(keyValue.equals(workerSourceTask.getTaskConfig())){
if (null != keyValues && keyValues.size() > 0) {
for (ConnectKeyValue keyValue : keyValues) {
if (keyValue.equals(taskConfig)) {
needStop = false;
break;
}
}
}
if(needStop){
workerSourceTask.stop();
stoppedTasks.add(workerSourceTask);
if (needStop) {
if (null != workerSourceTask) {
workerSourceTask.stop();
stoppedTasks.add(workerSourceTask);
} else {
workerSinkTask.stop();
stoppedTasks.add(workerSinkTask);
}

}
}
workingTasks.removeAll(stoppedTasks);

if (null == taskConfigs || 0 == taskConfigs.size()){
if (null == taskConfigs || 0 == taskConfigs.size()) {
return;
}
Map<String, List<ConnectKeyValue>> newTasks = new HashMap<>();
for(String connectorName : taskConfigs.keySet()){
for(ConnectKeyValue keyValue : taskConfigs.get(connectorName)){
for (String connectorName : taskConfigs.keySet()) {
for (ConnectKeyValue keyValue : taskConfigs.get(connectorName)) {
boolean isNewTask = true;
for(WorkerSourceTask workeringTask : workingTasks){
if(keyValue.equals(workeringTask.getTaskConfig())){
for (Runnable runnable : workingTasks) {
WorkerSourceTask workerSourceTask = null;
WorkerSinkTask workerSinkTask = null;
if (runnable instanceof WorkerSourceTask) {
workerSourceTask = (WorkerSourceTask) runnable;
} else {
workerSinkTask = (WorkerSinkTask) runnable;
}
ConnectKeyValue taskConfig = null != workerSourceTask ? workerSourceTask.getTaskConfig() : workerSinkTask.getTaskConfig();
if (keyValue.equals(taskConfig)) {
isNewTask = false;
break;
}
}
if(isNewTask){
if(!newTasks.containsKey(connectorName)){
if (isNewTask) {
if (!newTasks.containsKey(connectorName)) {
newTasks.put(connectorName, new ArrayList<>());
}
newTasks.get(connectorName).add(keyValue);
}
}
}

for(String connectorName : newTasks.keySet()){
for(ConnectKeyValue keyValue : newTasks.get(connectorName)){
for (String connectorName : newTasks.keySet()) {
for (ConnectKeyValue keyValue : newTasks.get(connectorName)) {
Class taskClazz = Class.forName(keyValue.getString(RuntimeConfigDefine.TASK_CLASS));
Task task = (Task) taskClazz.newInstance();

Class converterClazz = Class.forName(keyValue.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER));
Converter recordConverter = (Converter) converterClazz.newInstance();

if(task instanceof SourceTask){
if (task instanceof SourceTask) {
Producer producer = messagingAccessWrapper
.getMessageAccessPoint(keyValue.getString(RuntimeConfigDefine.OMS_DRIVER_URL)).createProducer();
.getMessageAccessPoint(keyValue.getString(RuntimeConfigDefine.OMS_DRIVER_URL)).createProducer();
producer.startup();
WorkerSourceTask workerSourceTask = new WorkerSourceTask(connectorName,
(SourceTask) task, keyValue,
new PositionStorageReaderImpl(positionManagementService), recordConverter, producer);
(SourceTask) task, keyValue,
new PositionStorageReaderImpl(positionManagementService), recordConverter, producer);
this.taskExecutor.submit(workerSourceTask);
this.workingTasks.add(workerSourceTask);
} else if (task instanceof SinkTask) {
PullConsumer consumer = messagingAccessWrapper.getMessageAccessPoint(keyValue.getString(RuntimeConfigDefine.OMS_DRIVER_URL)).createPullConsumer();
consumer.startup();
WorkerSinkTask workerSinkTask = new WorkerSinkTask(connectorName, (SinkTask) task, keyValue, recordConverter, consumer);
this.taskExecutor.submit(workerSinkTask);
this.workingTasks.add(workerSinkTask);
}
}
}
Expand All @@ -226,8 +254,10 @@ public synchronized void startTasks(Map<String, List<ConnectKeyValue>> taskConfi
*/
public void commitTaskPosition() {
Map<ByteBuffer, ByteBuffer> positionData = new HashMap<>();
for(WorkerSourceTask task : workingTasks){
positionData.putAll(task.getPositionData());
for (Runnable task : workingTasks) {
if (task instanceof WorkerSourceTask) {
positionData.putAll(((WorkerSourceTask) task).getPositionData());
}
}
positionManagementService.putPosition(positionData);
}
Expand All @@ -245,15 +275,15 @@ public Set<WorkerConnector> getWorkingConnectors() {
}

public void setWorkingConnectors(
Set<WorkerConnector> workingConnectors) {
Set<WorkerConnector> workingConnectors) {
this.workingConnectors = workingConnectors;
}

public Set<WorkerSourceTask> getWorkingTasks() {
public Set<Runnable> getWorkingTasks() {
return workingTasks;
}

public void setWorkingTasks(Set<WorkerSourceTask> workingTasks) {
public void setWorkingTasks(Set<Runnable> workingTasks) {
this.workingTasks = workingTasks;
}
}
Loading

0 comments on commit 9734c3b

Please sign in to comment.