Skip to content

Commit

Permalink
Get the param as allowed.operations instead of skipped.operations
Browse files Browse the repository at this point in the history
Change the property names of the sequence
Add isDebugEnable check where string concaterntions are done.
  • Loading branch information
RusJaI committed Aug 10, 2023
1 parent affa13a commit a379f91
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@
import java.util.Map;
import java.util.Properties;

import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DATABASE_NAME;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.OPERATIONS;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.TABLES;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.TS_MS;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.CDC_DATABASE_NAME;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.CDC_OPERATIONS;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.CDC_TABLES;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.CDC_TS_MS;

public class CDCInjectHandler {

Expand Down Expand Up @@ -73,9 +73,7 @@ public boolean invoke(Object object, String inboundEndpointName) throws SynapseE

ChangeEvent<String, String> eventRecord = (ChangeEvent<String, String>) object;
if (eventRecord == null || eventRecord.value() == null) {
if (logger.isDebugEnabled()) {
logger.debug("CDC Source Handler received empty event record");
}
logger.debug("CDC Source Handler received empty event record");
} else {
InputStream in = null;
try {
Expand All @@ -87,10 +85,10 @@ public boolean invoke(Object object, String inboundEndpointName) throws SynapseE
CustomLogSetter.getInstance().setLogAppender(inboundEndpoint.getArtifactContainerName());

CDCEventOutput cdcEventOutput = new CDCEventOutput(eventRecord);
msgCtx.setProperty(DATABASE_NAME, cdcEventOutput.getDatabase());
msgCtx.setProperty(TABLES, cdcEventOutput.getTable().toString());
msgCtx.setProperty(OPERATIONS, cdcEventOutput.getOp());
msgCtx.setProperty(TS_MS, cdcEventOutput.getTs_ms().toString());
msgCtx.setProperty(CDC_DATABASE_NAME, cdcEventOutput.getDatabase());
msgCtx.setProperty(CDC_TABLES, cdcEventOutput.getTable().toString());
msgCtx.setProperty(CDC_OPERATIONS, cdcEventOutput.getOp());
msgCtx.setProperty(CDC_TS_MS, cdcEventOutput.getTs_ms().toString());

if (logger.isDebugEnabled()) {
logger.debug("Processed event : " + eventRecord);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,9 @@ public void execute() {
* according to the registered handler
*/
public ChangeEvent<String, String> poll() {

if (logger.isDebugEnabled()) {
logger.debug("Start : listening to DB events : ");
}
logger.debug("Start : listening to DB events : ");
listenDataChanges();
if (logger.isDebugEnabled()) {
logger.debug("End : Listening to DB events : ");
}
logger.debug("End : Listening to DB events : ");
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@

package org.wso2.carbon.inbound.endpoint.protocol.cdc;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -39,10 +36,15 @@

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_ALLOWED_OPERATIONS;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_DATABASE_ALLOW_PUBLIC_KEY_RETRIEVAL;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_DATABASE_PASSWORD;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_KEY_CONVERTER;
Expand All @@ -55,6 +57,7 @@
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_TOPIC_PREFIX;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_VALUE_CONVERTER;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_VALUE_CONVERTER_SCHEMAS_ENABLE;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.DEBEZIUM_SKIPPED_OPERATIONS;
import static org.wso2.carbon.inbound.endpoint.protocol.cdc.InboundCDCConstants.TRUE;

public class CDCProcessor extends InboundRequestProcessorImpl implements TaskStartupObserver, InboundTaskProcessor {
Expand All @@ -72,6 +75,9 @@ public class CDCProcessor extends InboundRequestProcessorImpl implements TaskSta
private static final String FILE_SCHEMA_HISTORY_STORAGE_CLASS = "io.debezium.storage.file.history.FileSchemaHistory";
private static final Log LOGGER = LogFactory.getLog(CDCProcessor.class);

private enum operations {create, update, delete, truncate};
private enum opCodes {c, u, d, t};

public CDCProcessor(InboundProcessorParams params) {
this.name = params.getName();
this.injectingSeq = params.getInjectingSeq();
Expand Down Expand Up @@ -128,6 +134,11 @@ private void setProperties () {
this.cdcProperties.setProperty(DEBEZIUM_DATABASE_ALLOW_PUBLIC_KEY_RETRIEVAL, TRUE);
}

if (this.cdcProperties.getProperty(DEBEZIUM_ALLOWED_OPERATIONS) != null) {
this.cdcProperties.setProperty(DEBEZIUM_SKIPPED_OPERATIONS,
getSkippedOperationsString(this.cdcProperties.getProperty(DEBEZIUM_ALLOWED_OPERATIONS)));
}

if (this.cdcProperties.getProperty(DEBEZIUM_TOPIC_PREFIX) == null) {
this.cdcProperties.setProperty(DEBEZIUM_TOPIC_PREFIX, this.name +"_topic");
}
Expand Down Expand Up @@ -233,4 +244,34 @@ public void destroy(boolean removeTask) {
public void update() {
// This will not be called for inbound endpoints
}

private String getOpCode(String op) {
if (op != null) {
switch (operations.valueOf(op)) {
case create:
return opCodes.c.toString();
case update:
return opCodes.u.toString();
case delete:
return opCodes.d.toString();
case truncate:
return opCodes.t.toString();
}
}
return "";
}

/**
* Get the comma separated list containing allowed operations and returns the string of skipped operation codes
* @param allowedOperationsString string
* @return the coma separated string of skipped operation codes
*/
private String getSkippedOperationsString(String allowedOperationsString) {
List<String> allOperations = Stream.of(opCodes.values()).map(Enum :: toString).collect(Collectors.toList());
Set<String> allowedOperationsSet = Stream.of(allowedOperationsString.split(",")).
map(String :: trim).map(String :: toLowerCase).map(op -> getOpCode(op)).
collect(Collectors.toSet());
allOperations.removeAll(allowedOperationsSet);
return String.join(",", allOperations);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ public CDCTask(CDCPollingConsumer pollingConsumer, long interval) {
}

protected void taskExecute() {
if (logger.isDebugEnabled()) {
logger.debug("CDC Task executing.");
}
logger.debug("CDC Task executing.");
pollingConsumer.execute();
}

Expand All @@ -54,14 +52,10 @@ public Properties getInboundProperties() {
}

public void init(SynapseEnvironment synapseEnvironment) {
if (logger.isDebugEnabled()) {
logger.debug("Initializing Task.");
}
logger.debug("Initializing Task.");
}

public void destroy() {
if (logger.isDebugEnabled()) {
logger.debug("Destroying Task. ");
}
logger.debug("Destroying Task. ");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@ class InboundCDCConstants {

public static final String DEBEZIUM_SCHEMA_HISTORY_INTERNAL = "schema.history.internal";
public static final String DEBEZIUM_SCHEMA_HISTORY_INTERNAL_FILE_FILENAME = "schema.history.internal.file.filename";
public static final String DEBEZIUM_SKIPPED_OPERATIONS = "skipped.operations";
public static final String DEBEZIUM_ALLOWED_OPERATIONS = "allowed.operations";


/** Output Properties **/
public static final String DATABASE_NAME = "database";
public static final String TABLES ="tables";
public static final String OPERATIONS ="operations";
public static final String CDC_DATABASE_NAME = "cdc.database";
public static final String CDC_TABLES ="cdc.tables";
public static final String CDC_OPERATIONS ="cdc.operations";
public static final String CDC_TS_MS = "cdc.ts_ms";
public static final String TS_MS = "ts_ms";

public static final String BEFORE = "before";
Expand Down

0 comments on commit a379f91

Please sign in to comment.