Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Jan 7, 2025
1 parent 1023235 commit 935fa5c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.doris.flink.sink.writer.ChangeEvent;

import java.io.IOException;

/**
* When cdc connector captures data changes about source database schema changes, you need to
* inherit this class to complete the synchronized changes to Doris schema. Supports data messages
Expand All @@ -33,7 +31,8 @@ public abstract class CdcSchemaChange implements ChangeEvent {

protected abstract String extractTable(JsonNode record);

public abstract boolean schemaChange(JsonNode recordRoot) throws IOException;
/** Schema change */
public abstract boolean schemaChange(JsonNode recordRoot);

protected abstract String getCdcTableIdentifier(JsonNode record);
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,8 @@ public DorisRecord serialize(String record) throws IOException {
LOG.debug("received debezium json data {} :", record);
JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
String op = getOperateType(recordRoot);
try {
schemaChange.schemaChange(recordRoot);
} catch (Exception e) {
throw new RuntimeException(e);
}
schemaChange.schemaChange(recordRoot);

return dataChange.serialize(record, recordRoot, op);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,26 +89,32 @@ public String extractTable(JsonNode record) {
}

@Override
public boolean schemaChange(JsonNode recordRoot) throws IOException {
JsonNode logData = getFullDocument(recordRoot);
String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
String dorisTableIdentifier =
getDorisTableIdentifier(cdcTableIdentifier, dorisOptions, tableMapping);
String[] tableInfo = dorisTableIdentifier.split("\\.");
if (tableInfo.length != 2) {
throw new DorisRuntimeException();
public boolean schemaChange(JsonNode recordRoot) {
try {
JsonNode logData = getFullDocument(recordRoot);
String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
String dorisTableIdentifier =
getDorisTableIdentifier(cdcTableIdentifier, dorisOptions, tableMapping);
String[] tableInfo = dorisTableIdentifier.split("\\.");
if (tableInfo.length != 2) {
throw new DorisRuntimeException();
}
String dataBase = tableInfo[0];
String table = tableInfo[1];
// build table fields mapping for all record
buildDorisTableFieldsMapping(dataBase, table);

// Determine whether change stream log and tableField are exactly the same, if not,
// perform
// schema change
checkAndUpdateSchemaChange(logData, dorisTableIdentifier, dataBase, table);
formatSpecialFieldData(logData);
((ObjectNode) recordRoot).set(FIELD_DATA, logData);
return true;
} catch (Exception ex) {
LOG.warn("schema change error : ", ex);
return false;
}
String dataBase = tableInfo[0];
String table = tableInfo[1];
// build table fields mapping for all record
buildDorisTableFieldsMapping(dataBase, table);

// Determine whether change stream log and tableField are exactly the same, if not, perform
// schema change
checkAndUpdateSchemaChange(logData, dorisTableIdentifier, dataBase, table);
formatSpecialFieldData(logData);
((ObjectNode) recordRoot).set(FIELD_DATA, logData);
return true;
}

private void formatSpecialFieldData(JsonNode logData) {
Expand Down

0 comments on commit 935fa5c

Please sign in to comment.