diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCPollingConsumer.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCPollingConsumer.java index db780eae17..dbd56d18f2 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCPollingConsumer.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCPollingConsumer.java @@ -24,8 +24,7 @@ import java.util.Date; import java.util.Properties; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.BlockingQueue; import static org.wso2.carbon.inbound.endpoint.protocol.cdc.CDCProcessor.inboundEpEventQueueMap; @@ -38,7 +37,6 @@ public class CDCPollingConsumer { private static final Log logger = LogFactory.getLog(CDCPollingConsumer.class); private Properties cdcProperties; - private ExecutorService executorService = null; private String inboundEndpointName; private SynapseEnvironment synapseEnvironment; private long scanInterval; @@ -102,10 +100,9 @@ public ChangeEvent poll() { logger.debug("Start : listening to DB events : "); } - ConcurrentLinkedQueue> eventQueue = inboundEpEventQueueMap.get(inboundEndpointName); + BlockingQueue> eventQueue = inboundEpEventQueueMap.get(inboundEndpointName); while (!eventQueue.isEmpty()) { - injectHandler.invoke(eventQueue.peek(), inboundEndpointName); - eventQueue.remove(eventQueue.peek()); + injectHandler.invoke(eventQueue.poll(), inboundEndpointName); } if (logger.isDebugEnabled()) { diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCProcessor.java b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCProcessor.java index 8926d2c916..f5353e2acd 100644 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCProcessor.java +++ b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/cdc/CDCProcessor.java @@ -42,9 +42,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -76,7 +74,7 @@ public class CDCProcessor extends InboundRequestProcessorImpl implements TaskSta private static final String FILE_OFFSET_STORAGE_CLASS = "org.apache.kafka.connect.storage.FileOffsetBackingStore"; private static final String FILE_SCHEMA_HISTORY_STORAGE_CLASS = "io.debezium.storage.file.history.FileSchemaHistory"; private static final Log LOGGER = LogFactory.getLog(CDCProcessor.class); - protected static Map inboundEpEventQueueMap = new HashMap(); + protected static Map inboundEpEventQueueMap = new HashMap(); private ExecutorService executorService = null; public CDCProcessor(InboundProcessorParams params) { @@ -103,7 +101,7 @@ public CDCProcessor(InboundProcessorParams params) { this.coordination = Boolean.parseBoolean(cdcProperties.getProperty(PollingConstants.INBOUND_COORDINATION)); } if (!inboundEpEventQueueMap.containsKey(this.name)) { - ConcurrentLinkedQueue> eventQueue = new ConcurrentLinkedQueue(); + BlockingQueue> eventQueue = new LinkedBlockingQueue<>(); inboundEpEventQueueMap.put(this.name, eventQueue); } } @@ -226,7 +224,11 @@ public void init() { DebeziumEngine> engine = DebeziumEngine.create(Json.class) .using(this.cdcProperties) .notifying(record -> { - inboundEpEventQueueMap.get(this.name).add(record); + try { + inboundEpEventQueueMap.get(this.name).offer(record, interval, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } }).build(); executorService = Executors.newSingleThreadExecutor(); diff --git a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/resources/log4j.properties b/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/resources/log4j.properties deleted file mode 100644 index 1da8230168..0000000000 --- a/components/mediation/inbound-endpoints/org.wso2.micro.integrator.inbound.endpoint/src/main/resources/log4j.properties +++ /dev/null @@ -1,8 +0,0 @@ -# Root logger option -log4j.rootLogger=INFO, stdout - -# Direct log messages to stdout -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.Target=System.out -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n