Skip to content

Commit

Permalink
[Feature](cdc) add MongoDB cdc (#343)
Browse files Browse the repository at this point in the history
  • Loading branch information
bingquanzhao authored May 8, 2024
1 parent 863fe75 commit ca358a8
Show file tree
Hide file tree
Showing 15 changed files with 1,266 additions and 2 deletions.
13 changes: 13 additions & 0 deletions flink-doris-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,19 @@ under the License.
</exclusion>
</exclusions>
</dependency>
<!-- MongoDB CDC connector dependency, versioned through the flink.sql.cdc.version property. -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mongodb-cdc</artifactId>
<!-- the dependency is available only for stable releases. -->
<version>${flink.sql.cdc.version}</version>
<!-- provided scope: needed to compile, but must be supplied on the classpath at runtime. -->
<scope>provided</scope>
<exclusions>
<!-- NOTE(review): presumably excluded to avoid clashing with the guava shaded by the Flink runtime; confirm. -->
<exclusion>
<artifactId>flink-shaded-guava</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -241,6 +242,32 @@ public static String buildCreateTableDDL(TableSchema schema) {
return sb.toString();
}

/**
 * Reads the column names and data types of a table from {@code information_schema.COLUMNS}.
 *
 * @param databaseName value matched against {@code TABLE_SCHEMA}
 * @param tableName value matched against {@code TABLE_NAME}
 * @return a map from {@code COLUMN_NAME} to {@code DATA_TYPE}; empty when the query returns no
 *     rows. The map is a {@link HashMap}, so column order is not preserved.
 * @throws DorisRuntimeException if {@code databaseExists(databaseName)} is false
 * @throws DorisSystemException if the query cannot be prepared or executed
 */
public Map<String, String> getTableFieldNames(String databaseName, String tableName) {
    if (!databaseExists(databaseName)) {
        throw new DorisRuntimeException("database " + databaseName + " does not exist");
    }
    // Bind the names as statement parameters rather than formatting them into the SQL text, so
    // that quotes or other special characters in a name can neither break nor alter the query.
    String sql =
            "SELECT COLUMN_NAME,DATA_TYPE "
                    + "FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA`= ? AND `TABLE_NAME`= ?";

    Map<String, String> columnValues = new HashMap<>();
    try (PreparedStatement ps =
            jdbcConnectionProvider.getOrEstablishConnection().prepareStatement(sql)) {
        ps.setString(1, databaseName);
        ps.setString(2, tableName);
        // Close the result set explicitly instead of relying on the statement to do it.
        try (ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                // Column 1 is COLUMN_NAME, column 2 is DATA_TYPE.
                columnValues.put(rs.getString(1), rs.getString(2));
            }
        }
        return columnValues;
    } catch (Exception e) {
        throw new DorisSystemException(
                String.format(
                        "The following SQL query could not be executed: %s (database=%s, table=%s)",
                        sql, databaseName, tableName),
                e);
    }
}

private static void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey) {
String fieldType = field.getTypeString();
if (isKey && DorisType.STRING.equals(fieldType)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.sink.writer.serializer;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcDataChange;
import org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcSchemaChange;
import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
import org.apache.doris.flink.sink.writer.serializer.jsondebezium.MongoJsonDebeziumDataChange;
import org.apache.doris.flink.sink.writer.serializer.jsondebezium.MongoJsonDebeziumSchemaChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
import java.util.regex.Pattern;

import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;

/**
 * {@link DorisRecordSerializer} for JSON change records coming from a MongoDB CDC source.
 *
 * <p>Each incoming record is parsed into a {@link JsonNode}, handed to a {@link
 * MongoJsonDebeziumSchemaChange} and then serialized by a {@link MongoJsonDebeziumDataChange}.
 * Instances are normally created through {@link #builder()}.
 */
public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerializer<String> {

    private static final Logger LOG =
            LoggerFactory.getLogger(MongoDBJsonDebeziumSchemaSerializer.class);
    /** Name of the record field that carries the change operation type. */
    private static final String OPERATION_TYPE_FIELD = "operationType";

    private final Pattern pattern;
    private final DorisOptions dorisOptions;
    private final ObjectMapper objectMapper = new ObjectMapper();
    // table name of the cdc upstream, format is db.tbl
    private final String sourceTableName;
    private String lineDelimiter = LINE_DELIMITER_DEFAULT;
    private boolean ignoreUpdateBefore = true;
    // <cdc db.schema.table, doris db.table>
    private final Map<String, String> tableMapping;
    // create table properties
    private final Map<String, String> tableProperties;
    private final String targetDatabase;

    private CdcDataChange dataChange;
    private CdcSchemaChange schemaChange;

    private final String targetTablePrefix;
    private final String targetTableSuffix;

    /**
     * Creates the serializer and its data-change / schema-change handlers.
     *
     * @param dorisOptions Doris connection options, passed through to the change context
     * @param pattern pattern passed through to the change context
     * @param sourceTableName upstream table name, format is db.tbl
     * @param executionOptions optional; when non-null it supplies the line delimiter (from the
     *     stream load properties) and the ignore-update-before flag, otherwise the defaults
     *     {@code LINE_DELIMITER_DEFAULT} and {@code true} are used
     * @param tableMapping mapping from upstream table identifier to Doris table identifier
     * @param tableProperties create table properties
     * @param targetDatabase target database, passed through to the change context
     * @param targetTablePrefix target table prefix, passed through to the change context
     * @param targetTableSuffix target table suffix, passed through to the change context
     */
    public MongoDBJsonDebeziumSchemaSerializer(
            DorisOptions dorisOptions,
            Pattern pattern,
            String sourceTableName,
            DorisExecutionOptions executionOptions,
            Map<String, String> tableMapping,
            Map<String, String> tableProperties,
            String targetDatabase,
            String targetTablePrefix,
            String targetTableSuffix) {
        this.dorisOptions = dorisOptions;
        this.pattern = pattern;
        this.sourceTableName = sourceTableName;
        // Prevent loss of decimal data precision
        this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
        JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
        this.objectMapper.setNodeFactory(jsonNodeFactory);
        this.tableMapping = tableMapping;
        this.tableProperties = tableProperties;
        this.targetDatabase = targetDatabase;
        this.targetTablePrefix = targetTablePrefix;
        this.targetTableSuffix = targetTableSuffix;
        if (executionOptions != null) {
            this.lineDelimiter =
                    executionOptions
                            .getStreamLoadProp()
                            .getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
            this.ignoreUpdateBefore = executionOptions.getIgnoreUpdateBefore();
        }
        init();
    }

    /** Builds the shared change context and the two handlers that consume it. */
    private void init() {
        JsonDebeziumChangeContext changeContext =
                new JsonDebeziumChangeContext(
                        dorisOptions,
                        tableMapping,
                        sourceTableName,
                        targetDatabase,
                        tableProperties,
                        objectMapper,
                        pattern,
                        lineDelimiter,
                        ignoreUpdateBefore,
                        targetTablePrefix,
                        targetTableSuffix);
        this.dataChange = new MongoJsonDebeziumDataChange(changeContext);
        this.schemaChange = new MongoJsonDebeziumSchemaChange(changeContext);
    }

    /**
     * Parses one JSON change record, runs the schema-change handler on it and returns the record
     * produced by the data-change handler.
     *
     * @param record the raw JSON change record
     * @return the result of {@code dataChange.serialize(...)} for this record
     * @throws IOException if the record cannot be parsed as JSON, if it has no {@code
     *     operationType} field, or if the data-change handler fails
     * @throws RuntimeException wrapping any exception thrown by the schema-change handler
     */
    @Override
    public DorisRecord serialize(String record) throws IOException {
        LOG.debug("received debezium json data {} :", record);
        JsonNode recordRoot = objectMapper.readValue(record, JsonNode.class);
        String op = getOperateType(recordRoot);
        try {
            schemaChange.schemaChange(recordRoot);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return dataChange.serialize(record, recordRoot, op);
    }

    /**
     * Extracts the {@code operationType} field of a change record as text.
     *
     * @throws IOException if the field is absent, instead of failing with a bare
     *     NullPointerException that gives no hint about the malformed record
     */
    private String getOperateType(JsonNode recordRoot) throws IOException {
        JsonNode opNode = recordRoot == null ? null : recordRoot.get(OPERATION_TYPE_FIELD);
        if (opNode == null) {
            throw new IOException(
                    "Change record does not contain the '" + OPERATION_TYPE_FIELD + "' field");
        }
        return opNode.asText();
    }

    /** Returns a new, empty {@link Builder}. */
    public static MongoDBJsonDebeziumSchemaSerializer.Builder builder() {
        return new MongoDBJsonDebeziumSchemaSerializer.Builder();
    }

    /**
     * Fluent builder for {@link MongoDBJsonDebeziumSchemaSerializer}. The target table prefix and
     * suffix default to the empty string; every other value defaults to {@code null}.
     */
    public static class Builder {
        private DorisOptions dorisOptions;
        private Pattern addDropDDLPattern;
        private String sourceTableName;
        private DorisExecutionOptions executionOptions;
        private Map<String, String> tableMapping;
        private Map<String, String> tableProperties;
        private String targetDatabase;
        private String targetTablePrefix = "";
        private String targetTableSuffix = "";

        public MongoDBJsonDebeziumSchemaSerializer.Builder setDorisOptions(
                DorisOptions dorisOptions) {
            this.dorisOptions = dorisOptions;
            return this;
        }

        public MongoDBJsonDebeziumSchemaSerializer.Builder setPattern(Pattern addDropDDLPattern) {
            this.addDropDDLPattern = addDropDDLPattern;
            return this;
        }

        public MongoDBJsonDebeziumSchemaSerializer.Builder setSourceTableName(
                String sourceTableName) {
            this.sourceTableName = sourceTableName;
            return this;
        }

        public MongoDBJsonDebeziumSchemaSerializer.Builder setExecutionOptions(
                DorisExecutionOptions executionOptions) {
            this.executionOptions = executionOptions;
            return this;
        }

        public MongoDBJsonDebeziumSchemaSerializer.Builder setTableMapping(
                Map<String, String> tableMapping) {
            this.tableMapping = tableMapping;
            return this;
        }

        public MongoDBJsonDebeziumSchemaSerializer.Builder setTableProperties(
                Map<String, String> tableProperties) {
            this.tableProperties = tableProperties;
            return this;
        }

        public MongoDBJsonDebeziumSchemaSerializer.Builder setTargetDatabase(
                String targetDatabase) {
            this.targetDatabase = targetDatabase;
            return this;
        }

        /** Sets the target table prefix; without this call the prefix stays empty. */
        public MongoDBJsonDebeziumSchemaSerializer.Builder setTargetTablePrefix(
                String targetTablePrefix) {
            this.targetTablePrefix = targetTablePrefix;
            return this;
        }

        /** Sets the target table suffix; without this call the suffix stays empty. */
        public MongoDBJsonDebeziumSchemaSerializer.Builder setTargetTableSuffix(
                String targetTableSuffix) {
            this.targetTableSuffix = targetTableSuffix;
            return this;
        }

        /** Creates the serializer from the values collected so far. */
        public MongoDBJsonDebeziumSchemaSerializer build() {
            return new MongoDBJsonDebeziumSchemaSerializer(
                    dorisOptions,
                    addDropDDLPattern,
                    sourceTableName,
                    executionOptions,
                    tableMapping,
                    tableProperties,
                    targetDatabase,
                    targetTablePrefix,
                    targetTableSuffix);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
*/
public abstract class CdcDataChange implements ChangeEvent {

protected abstract DorisRecord serialize(String record, JsonNode recordRoot, String op)
public abstract DorisRecord serialize(String record, JsonNode recordRoot, String op)
throws IOException;

protected abstract Map<String, Object> extractBeforeRow(JsonNode record);
Expand Down
Loading

0 comments on commit ca358a8

Please sign in to comment.