Skip to content

Commit

Permalink
Use blockingqueue and remove log4j properties file
Browse files Browse the repository at this point in the history
  • Loading branch information
RusJaI committed Jul 26, 2023
1 parent 59cf982 commit b6249da
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.BlockingQueue;

import static org.wso2.carbon.inbound.endpoint.protocol.cdc.CDCProcessor.inboundEpEventQueueMap;

Expand All @@ -38,7 +37,6 @@ public class CDCPollingConsumer {

private static final Log logger = LogFactory.getLog(CDCPollingConsumer.class);
private Properties cdcProperties;
private ExecutorService executorService = null;
private String inboundEndpointName;
private SynapseEnvironment synapseEnvironment;
private long scanInterval;
Expand Down Expand Up @@ -102,10 +100,9 @@ public ChangeEvent<String, String> poll() {
logger.debug("Start : listening to DB events : ");
}

ConcurrentLinkedQueue<ChangeEvent<String, String>> eventQueue = inboundEpEventQueueMap.get(inboundEndpointName);
BlockingQueue<ChangeEvent<String, String>> eventQueue = inboundEpEventQueueMap.get(inboundEndpointName);
while (!eventQueue.isEmpty()) {
injectHandler.invoke(eventQueue.peek(), inboundEndpointName);
eventQueue.remove(eventQueue.peek());
injectHandler.invoke(eventQueue.poll(), inboundEndpointName);
}

if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -76,7 +74,7 @@ public class CDCProcessor extends InboundRequestProcessorImpl implements TaskSta
private static final String FILE_OFFSET_STORAGE_CLASS = "org.apache.kafka.connect.storage.FileOffsetBackingStore";
private static final String FILE_SCHEMA_HISTORY_STORAGE_CLASS = "io.debezium.storage.file.history.FileSchemaHistory";
private static final Log LOGGER = LogFactory.getLog(CDCProcessor.class);
protected static Map<String, ConcurrentLinkedQueue> inboundEpEventQueueMap = new HashMap();
protected static Map<String, BlockingQueue> inboundEpEventQueueMap = new HashMap();
private ExecutorService executorService = null;

public CDCProcessor(InboundProcessorParams params) {
Expand All @@ -103,7 +101,7 @@ public CDCProcessor(InboundProcessorParams params) {
this.coordination = Boolean.parseBoolean(cdcProperties.getProperty(PollingConstants.INBOUND_COORDINATION));
}
if (!inboundEpEventQueueMap.containsKey(this.name)) {
ConcurrentLinkedQueue<ChangeEvent<String, String>> eventQueue = new ConcurrentLinkedQueue();
BlockingQueue<ChangeEvent<String, String>> eventQueue = new LinkedBlockingQueue<>();
inboundEpEventQueueMap.put(this.name, eventQueue);
}
}
Expand Down Expand Up @@ -226,7 +224,11 @@ public void init() {
DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
.using(this.cdcProperties)
.notifying(record -> {
inboundEpEventQueueMap.get(this.name).add(record);
try {
inboundEpEventQueueMap.get(this.name).offer(record, interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).build();

executorService = Executors.newSingleThreadExecutor();
Expand Down

This file was deleted.

0 comments on commit b6249da

Please sign in to comment.