From 935fa5cec05dc3aee743f25d6f85b63a62c4c41d Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Tue, 7 Jan 2025 16:58:11 +0800 Subject: [PATCH] update --- .../jsondebezium/CdcSchemaChange.java | 5 +-- .../MongoDBJsonDebeziumSchemaSerializer.java | 7 +-- .../MongoJsonDebeziumSchemaChange.java | 44 +++++++++++-------- 3 files changed, 29 insertions(+), 27 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java index 858a5effd..ac00017d8 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/CdcSchemaChange.java @@ -20,8 +20,6 @@ import com.fasterxml.jackson.databind.JsonNode; import org.apache.doris.flink.sink.writer.ChangeEvent; -import java.io.IOException; - /** * When cdc connector captures data changes about source database schema changes, you need to * inherit this class to complete the synchronized changes to Doris schema. Supports data messages @@ -33,7 +31,8 @@ public abstract class CdcSchemaChange implements ChangeEvent { protected abstract String extractTable(JsonNode record); - public abstract boolean schemaChange(JsonNode recordRoot) throws IOException; + /** Schema change */ + public abstract boolean schemaChange(JsonNode recordRoot); protected abstract String getCdcTableIdentifier(JsonNode record); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java index d4a87ff87..296a37277 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java @@ -120,11 +120,8 @@ public DorisRecord serialize(String record) throws IOException { LOG.debug("received debezium json data {} :", record); JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class); String op = getOperateType(recordRoot); - try { - schemaChange.schemaChange(recordRoot); - } catch (Exception e) { - throw new RuntimeException(e); - } + schemaChange.schemaChange(recordRoot); + return dataChange.serialize(record, recordRoot, op); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java index 01eebd45e..c3a4a7d8d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java @@ -89,26 +89,32 @@ public String extractTable(JsonNode record) { } @Override - public boolean schemaChange(JsonNode recordRoot) throws IOException { - JsonNode logData = getFullDocument(recordRoot); - String cdcTableIdentifier = getCdcTableIdentifier(recordRoot); - String dorisTableIdentifier = - getDorisTableIdentifier(cdcTableIdentifier, dorisOptions, tableMapping); - String[] tableInfo = dorisTableIdentifier.split("\\."); - if (tableInfo.length != 2) { - throw new DorisRuntimeException(); + public boolean schemaChange(JsonNode recordRoot) { + try { + JsonNode logData = getFullDocument(recordRoot); + String cdcTableIdentifier = getCdcTableIdentifier(recordRoot); + String dorisTableIdentifier = + getDorisTableIdentifier(cdcTableIdentifier, dorisOptions, tableMapping); + String[] tableInfo = dorisTableIdentifier.split("\\."); + if (tableInfo.length != 2) { + throw new DorisRuntimeException(); + } + String dataBase = tableInfo[0]; + String table = tableInfo[1]; + // build table fields mapping for all record + buildDorisTableFieldsMapping(dataBase, table); + + // Determine whether change stream log and tableField are exactly the same, if not, + // perform + // schema change + checkAndUpdateSchemaChange(logData, dorisTableIdentifier, dataBase, table); + formatSpecialFieldData(logData); + ((ObjectNode) recordRoot).set(FIELD_DATA, logData); + return true; + } catch (Exception ex) { + LOG.warn("schema change error : ", ex); + return false; } - String dataBase = tableInfo[0]; - String table = tableInfo[1]; - // build table fields mapping for all record - buildDorisTableFieldsMapping(dataBase, table); - - // Determine whether change stream log and tableField are exactly the same, if not, perform - // schema change - checkAndUpdateSchemaChange(logData, dorisTableIdentifier, dataBase, table); - formatSpecialFieldData(logData); - ((ObjectNode) recordRoot).set(FIELD_DATA, logData); - return true; } private void formatSpecialFieldData(JsonNode logData) {