Skip to content

Commit

Permalink
Resolve concurrency issues in polling scenarios
Browse files Browse the repository at this point in the history
  • Loading branch information
RusJaI committed Jul 31, 2023
1 parent f8ffc98 commit 4804ccc
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@
package org.wso2.carbon.inbound.endpoint.protocol.cdc;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.core.SynapseEnvironment;

import java.io.IOException;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;

import static org.wso2.carbon.inbound.endpoint.protocol.cdc.CDCProcessor.inboundEpEventQueueMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* This class implement the processing logic related to inbound CDC protocol.
Expand All @@ -42,6 +44,8 @@ public class CDCPollingConsumer {
private long scanInterval;
private Long lastRanTime;
private CDCInjectHandler injectHandler;
private ExecutorService executorService = null;
private DebeziumEngine<ChangeEvent<String, String>> engine = null;

public CDCPollingConsumer(Properties cdcProperties, String inboundEndpointName, SynapseEnvironment synapseEnvironment,
long scanInterval) {
Expand Down Expand Up @@ -99,20 +103,42 @@ public ChangeEvent<String, String> poll() {
if (logger.isDebugEnabled()) {
logger.debug("Start : listening to DB events : ");
}

BlockingQueue<ChangeEvent<String, String>> eventQueue = inboundEpEventQueueMap.get(inboundEndpointName);
while (!eventQueue.isEmpty()) {
injectHandler.invoke(eventQueue.poll(), inboundEndpointName);
}

listenDataChanges();
if (logger.isDebugEnabled()) {
logger.debug("End : Listening to DB events : ");
}
return null;
}

private void listenDataChanges () {
executorService = Executors.newSingleThreadExecutor();

if (engine == null || executorService.isShutdown()) {
engine = DebeziumEngine.create(Json.class)
.using(this.cdcProperties)
.notifying(record -> {
injectHandler.invoke(record, this.inboundEndpointName);
}).build();

executorService.execute(engine);
}
}

protected Properties getInboundProperties() {
return cdcProperties;
}

protected void destroy () {
if (!executorService.isShutdown()) {
executorService.shutdown();
}
try {
if (engine != null) {
engine.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -74,8 +71,6 @@ public class CDCProcessor extends InboundRequestProcessorImpl implements TaskSta
private static final String FILE_OFFSET_STORAGE_CLASS = "org.apache.kafka.connect.storage.FileOffsetBackingStore";
private static final String FILE_SCHEMA_HISTORY_STORAGE_CLASS = "io.debezium.storage.file.history.FileSchemaHistory";
private static final Log LOGGER = LogFactory.getLog(CDCProcessor.class);
protected static Map<String, BlockingQueue> inboundEpEventQueueMap = new HashMap();
private ExecutorService executorService = null;

public CDCProcessor(InboundProcessorParams params) {
this.name = params.getName();
Expand All @@ -100,10 +95,6 @@ public CDCProcessor(InboundProcessorParams params) {
if (cdcProperties.getProperty(PollingConstants.INBOUND_COORDINATION) != null) {
this.coordination = Boolean.parseBoolean(cdcProperties.getProperty(PollingConstants.INBOUND_COORDINATION));
}
if (!inboundEpEventQueueMap.containsKey(this.name)) {
BlockingQueue<ChangeEvent<String, String>> eventQueue = new LinkedBlockingQueue<>();
inboundEpEventQueueMap.put(this.name, eventQueue);
}
}

private void setProperties () {
Expand Down Expand Up @@ -213,19 +204,6 @@ public void init() {
pollingConsumer = new CDCPollingConsumer(cdcProperties, name, synapseEnvironment, interval);
pollingConsumer.registerHandler(new CDCInjectHandler(injectingSeq, onErrorSeq, sequential,
synapseEnvironment, cdcProperties));

DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
.using(this.cdcProperties)
.notifying(record -> {
try {
inboundEpEventQueueMap.get(this.name).offer(record, interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).build();

executorService = Executors.newSingleThreadExecutor();
executorService.execute(engine);
start();
}

Expand All @@ -247,7 +225,7 @@ public void start() {
public void destroy(boolean removeTask) {
if (removeTask) {
destroy();
executorService.shutdown();
pollingConsumer.destroy();
}
}

Expand Down

0 comments on commit 4804ccc

Please sign in to comment.