diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/pom.xml new file mode 100644 index 0000000000..285597836e --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/pom.xml @@ -0,0 +1,200 @@ + + + + + + org.apache.flink + flink-cdc-pipeline-connectors + ${revision} + + 4.0.0 + + flink-cdc-pipeline-connector-postgres + + + + + + + org.apache.flink + flink-connector-postgres-cdc + ${project.version} + + + + org.apache.flink + flink-connector-test-util + ${project.version} + test + + + + io.debezium + debezium-core + ${debezium.version} + test-jar + test + + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test + + + + org.apache.flink + flink-table-runtime + ${flink.version} + test + + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + + org.apache.flink + flink-connector-test-utils + ${flink.version} + test + + + + org.apache.flink + flink-core + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-streaming-java + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-table-common + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-tests + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test-jar + test + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + shade-flink + package + + shade + + + false + + + io.debezium:debezium-api + io.debezium:debezium-embedded + io.debezium:debezium-core + io.debezium:debezium-connector-postgres + org.apache.flink:flink-cdc-base + org.apache.flink:flink-connector-debezium + org.apache.flink:flink-connector-postgres-cdc + com.zaxxer:HikariCP + com.google.protobuf:protobuf-java + com.google.guava:* + org.apache.kafka:* + org.postgresql:postgresql + com.fasterxml.*:* + + org.apache.flink:flink-shaded-guava + + + + + org.apache.kafka:* + + kafka/kafka-version.properties + LICENSE + + NOTICE + common/** + + + + + + org.apache.kafka + + org.apache.flink.cdc.connectors.shaded.org.apache.kafka + + + + com.google + + org.apache.flink.cdc.connectors.shaded.com.google + + + + com.fasterxml + + org.apache.flink.cdc.connectors.shaded.com.fasterxml + + + + + + + + + + \ No newline at end of file diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java new file mode 100644 index 0000000000..52f9207522 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.postgres.factory; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.factories.DataSourceFactory; +import org.apache.flink.cdc.common.factories.Factory; +import org.apache.flink.cdc.common.schema.Selectors; +import org.apache.flink.cdc.common.source.DataSource; +import org.apache.flink.cdc.connectors.base.options.SourceOptions; +import org.apache.flink.cdc.connectors.base.options.StartupOptions; +import org.apache.flink.cdc.connectors.postgres.source.PostgresDataSource; +import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder; +import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig; +import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory; +import org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.data.RowData; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.CHUNK_META_GROUP_SIZE; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.CONNECTION_POOL_SIZE; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.CONNECT_MAX_RETRIES; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.CONNECT_TIMEOUT; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.DECODING_PLUGIN_NAME; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.HEARTBEAT_INTERVAL; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.HOSTNAME; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.PASSWORD; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.PG_PORT; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; +import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_STARTUP_MODE; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SLOT_NAME; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.TABLES_EXCLUDE; +import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.USERNAME; +import static org.apache.flink.cdc.debezium.table.DebeziumOptions.getDebeziumProperties; +import static org.apache.flink.util.Preconditions.checkState; + +/** A {@link Factory} to create {@link PostgresDataSource}. */ +@Internal +public class PostgresDataSourceFactory implements DataSourceFactory { + + private static final Logger LOG = LoggerFactory.getLogger(PostgresDataSourceFactory.class); + + public static final String IDENTIFIER = "postgres"; + + @Override + public DataSource createDataSource(Context context) { + final Configuration config = context.getFactoryConfiguration(); + String hostname = config.get(HOSTNAME); + int port = config.get(PG_PORT); + String pluginName = config.get(DECODING_PLUGIN_NAME); + String slotName = config.get(SLOT_NAME); + String username = config.get(USERNAME); + String password = config.get(PASSWORD); + String chunkKeyColumn = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN); + String tables = config.get(TABLES); + String tablesExclude = config.get(TABLES_EXCLUDE); + Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL); + StartupOptions startupOptions = getStartupOptions(config); + + int fetchSize = config.get(SCAN_SNAPSHOT_FETCH_SIZE); + int splitSize = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE); + int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE); + + double distributionFactorUpper = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); + double distributionFactorLower = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); + + boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); + + Duration connectTimeout = config.get(CONNECT_TIMEOUT); + int connectMaxRetries = config.get(CONNECT_MAX_RETRIES); + int connectionPoolSize = config.get(CONNECTION_POOL_SIZE); + boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); + + validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); + validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1); + validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1); + validateIntegerOption(CONNECTION_POOL_SIZE, connectionPoolSize, 1); + 
validateIntegerOption(CONNECT_MAX_RETRIES, connectMaxRetries, 0); + validateDistributionFactorUpper(distributionFactorUpper); + validateDistributionFactorLower(distributionFactorLower); + + Map configMap = config.toMap(); + String firstTable = tables.split(",")[0]; + TableId tableId = TableId.parse(firstTable); + + PostgresSourceConfigFactory configFactory = + PostgresSourceBuilder.PostgresIncrementalSource.builder() + .hostname(hostname) + .port(port) + .database(tableId.getNamespace()) + .schemaList(".*") + .tableList(".*") + .username(username) + .password(password) + .decodingPluginName(pluginName) + .slotName(slotName) + .debeziumProperties(getDebeziumProperties(configMap)) + .splitSize(splitSize) + .splitMetaGroupSize(splitMetaGroupSize) + .distributionFactorUpper(distributionFactorUpper) + .distributionFactorLower(distributionFactorLower) + .fetchSize(fetchSize) + .connectTimeout(connectTimeout) + .connectMaxRetries(connectMaxRetries) + .connectionPoolSize(connectionPoolSize) + .startupOptions(startupOptions) + .chunkKeyColumn(chunkKeyColumn) + .heartbeatInterval(heartbeatInterval) + .closeIdleReaders(closeIdleReaders) + .skipSnapshotBackfill(skipSnapshotBackfill) + .getConfigFactory(); + + Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build(); + List capturedTables = getTableList(configFactory.create(0), selectors); + if (capturedTables.isEmpty()) { + throw new IllegalArgumentException( + "Cannot find any table by the option 'tables' = " + tables); + } + if (tablesExclude != null) { + Selectors selectExclude = + new Selectors.SelectorsBuilder().includeTables(tablesExclude).build(); + List excludeTables = getTableList(configFactory.create(0), selectExclude); + if (!excludeTables.isEmpty()) { + capturedTables.removeAll(excludeTables); + } + if (capturedTables.isEmpty()) { + throw new IllegalArgumentException( + "Cannot find any table with by the option 'tables.exclude' = " + + tablesExclude); + } + } + configFactory.tableList(capturedTables.toArray(new String[0])); + + return new PostgresDataSource(configFactory); + } + + @Override + public Set> requiredOptions() { + Set> options = new HashSet<>(); + options.add(HOSTNAME); + options.add(USERNAME); + options.add(PASSWORD); + options.add(TABLES); + options.add(SLOT_NAME); + return options; + } + + @Override + public Set> optionalOptions() { + Set> options = new HashSet<>(); + options.add(PG_PORT); + options.add(TABLES_EXCLUDE); + options.add(DECODING_PLUGIN_NAME); + options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE); + options.add(SCAN_SNAPSHOT_FETCH_SIZE); + options.add(SCAN_STARTUP_MODE); + options.add(SCAN_STARTUP_TIMESTAMP_MILLIS); + options.add(SCAN_STARTUP_SPECIFIC_OFFSET_FILE); + options.add(SCAN_STARTUP_SPECIFIC_OFFSET_POS); + options.add(CONNECT_TIMEOUT); + options.add(CONNECT_MAX_RETRIES); + options.add(CONNECTION_POOL_SIZE); + options.add(HEARTBEAT_INTERVAL); + options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); + options.add(CHUNK_META_GROUP_SIZE); + options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); + options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); + return options; + } + + @Override + public String identifier() { + return IDENTIFIER; + } + + private static List getTableList( + PostgresSourceConfig sourceConfig, Selectors selectors) { + return PostgresSchemaUtils.listTables(sourceConfig, null).stream() + .filter(selectors::isMatch) + .map(TableId::toString) + .collect(Collectors.toList()); + } + + /** Checks the value of given integer option is valid. 
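* The bound is exclusive: the given value must be strictly greater than {@code exclusiveMin}.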
*/ + private void validateIntegerOption( + ConfigOption option, int optionValue, int exclusiveMin) { + checkState( + optionValue > exclusiveMin, + String.format( + "The value of option '%s' must larger than %d, but is %d", + option.key(), exclusiveMin, optionValue)); + } + + private static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial"; + + private static final String SCAN_STARTUP_MODE_VALUE_SNAPSHOT = "snapshot"; + private static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset"; + + private static StartupOptions getStartupOptions(Configuration config) { + String modeString = config.get(SCAN_STARTUP_MODE); + + switch (modeString.toLowerCase()) { + case SCAN_STARTUP_MODE_VALUE_INITIAL: + return StartupOptions.initial(); + case SCAN_STARTUP_MODE_VALUE_SNAPSHOT: + return StartupOptions.snapshot(); + case SCAN_STARTUP_MODE_VALUE_LATEST: + return StartupOptions.latest(); + + default: + throw new ValidationException( + String.format( + "Invalid value for option '%s'. Supported values are [%s, %s, %s], but was: %s", + SourceOptions.SCAN_STARTUP_MODE.key(), + SCAN_STARTUP_MODE_VALUE_INITIAL, + SCAN_STARTUP_MODE_VALUE_SNAPSHOT, + SCAN_STARTUP_MODE_VALUE_LATEST, + modeString)); + } + } + + /** Checks the value of given evenly distribution factor upper bound is valid. */ + private void validateDistributionFactorUpper(double distributionFactorUpper) { + checkState( + doubleCompare(distributionFactorUpper, 1.0d) >= 0, + String.format( + "The value of option '%s' must larger than or equals %s, but is %s", + SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.key(), + 1.0d, + distributionFactorUpper)); + } + + /** Checks the value of given evenly distribution factor lower bound is valid. */ + private void validateDistributionFactorLower(double distributionFactorLower) { + checkState( + doubleCompare(distributionFactorLower, 0.0d) >= 0 + && doubleCompare(distributionFactorLower, 1.0d) <= 0, + String.format( + "The value of option '%s' must between %s and %s inclusively, but is %s", + SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.key(), + 0.0d, + 1.0d, + distributionFactorLower)); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java new file mode 100644 index 0000000000..208468432e --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.postgres.source; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.annotation.VisibleForTesting; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.source.DataSource; +import org.apache.flink.cdc.common.source.EventSourceProvider; +import org.apache.flink.cdc.common.source.FlinkSourceProvider; +import org.apache.flink.cdc.common.source.MetadataAccessor; +import org.apache.flink.cdc.connectors.base.config.SourceConfig; +import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource; +import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords; +import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState; +import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics; +import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig; +import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory; +import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory; +import org.apache.flink.cdc.connectors.postgres.source.reader.PostgresPipelineRecordEmitter; +import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; +import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema; +import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode; +import org.apache.flink.connector.base.source.reader.RecordEmitter; + +/** A {@link DataSource} for Postgres cdc connector. */ +@Internal +public class PostgresDataSource implements DataSource { + + private final PostgresSourceConfigFactory configFactory; + private final PostgresSourceConfig postgresSourceConfig; + + public PostgresDataSource(PostgresSourceConfigFactory configFactory) { + this.configFactory = configFactory; + this.postgresSourceConfig = configFactory.create(0); + } + + @Override + public EventSourceProvider getEventSourceProvider() { + DebeziumEventDeserializationSchema deserializer = + new PostgresEventDeserializer( + DebeziumChangelogMode.ALL, postgresSourceConfig.isIncludeSchemaChanges()); + + PostgresOffsetFactory postgresOffsetFactory = new PostgresOffsetFactory(); + PostgresDialect postgresDialect = new PostgresDialect(postgresSourceConfig); + + PostgresSourceBuilder.PostgresIncrementalSource source = + new PostgresPipelineSource<>( + configFactory, + deserializer, + postgresOffsetFactory, + postgresDialect, + postgresSourceConfig); + + return FlinkSourceProvider.of(source); + } + + @Override + public MetadataAccessor getMetadataAccessor() { + return new PostgresMetadataAccessor(postgresSourceConfig); + } + + @VisibleForTesting + public PostgresSourceConfig getPostgresSourceConfig() { + return postgresSourceConfig; + } + + /** The {@link JdbcIncrementalSource} implementation for Postgres. 
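* It overrides {@code createRecordEmitter} to install a {@link PostgresPipelineRecordEmitter}, so that a {@link org.apache.flink.cdc.common.event.CreateTableEvent} is emitted for each captured table before its change events.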
*/ + public static class PostgresPipelineSource + extends PostgresSourceBuilder.PostgresIncrementalSource { + private final PostgresSourceConfig sourceConfig; + private final PostgresDialect dataSourceDialect; + + public PostgresPipelineSource( + PostgresSourceConfigFactory configFactory, + DebeziumDeserializationSchema deserializationSchema, + PostgresOffsetFactory offsetFactory, + PostgresDialect dataSourceDialect, + PostgresSourceConfig sourceConfig) { + super(configFactory, deserializationSchema, offsetFactory, dataSourceDialect); + this.sourceConfig = sourceConfig; + this.dataSourceDialect = dataSourceDialect; + } + + @Override + protected RecordEmitter createRecordEmitter( + SourceConfig sourceConfig, SourceReaderMetrics sourceReaderMetrics) { + return new PostgresPipelineRecordEmitter<>( + deserializationSchema, + sourceReaderMetrics, + this.sourceConfig, + offsetFactory, + this.dataSourceDialect); + } + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java new file mode 100644 index 0000000000..8c6e26d63c --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.postgres.source; + +import org.apache.flink.cdc.common.annotation.Experimental; +import org.apache.flink.cdc.common.annotation.PublicEvolving; +import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.configuration.ConfigOptions; +import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode; + +import java.time.Duration; + +/** Configurations for {@link PostgresDataSource}. 
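* These options are read by {@code PostgresDataSourceFactory} when building the pipeline source.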
*/ +@PublicEvolving +public class PostgresDataSourceOptions { + + public static final ConfigOption HOSTNAME = + ConfigOptions.key("hostname") + .stringType() + .noDefaultValue() + .withDescription("IP address or hostname of the PostgreSQL database server."); + public static final ConfigOption PG_PORT = + ConfigOptions.key("port") + .intType() + .defaultValue(5432) + .withDescription("Integer port number of the PostgreSQL database server."); + + public static final ConfigOption USERNAME = + ConfigOptions.key("username") + .stringType() + .noDefaultValue() + .withDescription( + "Name of the PostgreSQL database to use when connecting to the PostgreSQL database server."); + + public static final ConfigOption PASSWORD = + ConfigOptions.key("password") + .stringType() + .noDefaultValue() + .withDescription( + "Password to use when connecting to the PostgreSQL database server."); + public static final ConfigOption TABLES = + ConfigOptions.key("tables") + .stringType() + .noDefaultValue() + .withDescription( + "Table names of the PostgreSQL tables to monitor. Regular expressions are supported. " + + "It is important to note that the dot (.) is treated as a delimiter for database and table names. " + + "If there is a need to use a dot (.) in a regular expression to match any character, " + + "it is necessary to escape the dot with a backslash." + + "eg. db0.\\.*, db1.user_table_[0-9]+, db[1-2].[app|web]_order_\\.*"); + + public static final ConfigOption DECODING_PLUGIN_NAME = + ConfigOptions.key("decoding.plugin.name") + .stringType() + .defaultValue("decoderbufs") + .withDescription( + "The name of the Postgres logical decoding plug-in installed on the server.\n" + + "Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming,\n" + + "wal2json_rds_streaming and pgoutput."); + + public static final ConfigOption SLOT_NAME = + ConfigOptions.key("slot.name") + .stringType() + .noDefaultValue() + .withDescription( + "The name of the PostgreSQL logical decoding slot that was created for streaming changes " + + "from a particular plug-in for a particular database/schema. The server uses this slot " + + "to stream events to the connector that you are configuring."); + + public static final ConfigOption CHANGELOG_MODE = + ConfigOptions.key("changelog-mode") + .enumType(DebeziumChangelogMode.class) + .defaultValue(DebeziumChangelogMode.ALL) + .withDescription( + "The changelog mode used for encoding streaming changes.\n" + + "\"all\": Encodes changes as retract stream using all RowKinds. This is the default mode.\n" + + "\"upsert\": Encodes changes as upsert stream that describes idempotent updates on a key. It can be used for tables with primary keys when replica identity FULL is not an option."); + + public static final ConfigOption SCAN_INCREMENTAL_SNAPSHOT_ENABLED = + ConfigOptions.key("scan.incremental.snapshot.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Incremental snapshot is a new mechanism to read snapshot of a table. 
" + + "Compared to the old snapshot mechanism, the incremental snapshot has many advantages, including:\n" + + "(1) source can be parallel during snapshot reading, \n" + + "(2) source can perform checkpoints in the chunk granularity during snapshot reading, \n" + + "(3) source doesn't need to acquire global read lock before snapshot reading."); + + public static final ConfigOption SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN = + ConfigOptions.key("scan.incremental.snapshot.chunk.key-column") + .stringType() + .noDefaultValue() + .withDescription( + "The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table." + + "By default, the chunk key is the first column of the primary key and the chunk key is the RowId in oracle." + + "This column must be a column of the primary key."); + + public static final ConfigOption SERVER_TIME_ZONE = + ConfigOptions.key("server-time-zone") + .stringType() + .noDefaultValue() + .withDescription( + "The session time zone in database server. If not set, then " + + "ZoneId.systemDefault() is used to determine the server time zone."); + + public static final ConfigOption SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE = + ConfigOptions.key("scan.incremental.snapshot.chunk.size") + .intType() + .defaultValue(8096) + .withDescription( + "The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table."); + + public static final ConfigOption SCAN_SNAPSHOT_FETCH_SIZE = + ConfigOptions.key("scan.snapshot.fetch.size") + .intType() + .defaultValue(1024) + .withDescription( + "The maximum fetch size for per poll when read table snapshot."); + + public static final ConfigOption CONNECT_TIMEOUT = + ConfigOptions.key("connect.timeout") + .durationType() + .defaultValue(Duration.ofSeconds(30)) + .withDescription( + "The maximum time that the connector should wait after trying to connect to the PostgreSQL database server before timing out."); + + public static final ConfigOption CONNECTION_POOL_SIZE = + ConfigOptions.key("connection.pool.size") + .intType() + .defaultValue(20) + .withDescription("The connection pool size."); + + public static final ConfigOption CONNECT_MAX_RETRIES = + ConfigOptions.key("connect.max-retries") + .intType() + .defaultValue(3) + .withDescription( + "The max retry times that the connector should retry to build PostgreSQL database server connection."); + + public static final ConfigOption SCAN_STARTUP_MODE = + ConfigOptions.key("scan.startup.mode") + .stringType() + .defaultValue("initial") + .withDescription( + "Optional startup mode for PostgreSQL CDC consumer, valid enumerations are " + + "\"initial\", \"earliest-offset\", \"latest-offset\", \"timestamp\"\n" + + "or \"specific-offset\""); + + public static final ConfigOption SCAN_STARTUP_SPECIFIC_OFFSET_FILE = + ConfigOptions.key("scan.startup.specific-offset.file") + .stringType() + .noDefaultValue() + .withDescription( + "Optional binlog file name used in case of \"specific-offset\" startup mode"); + + public static final ConfigOption SCAN_STARTUP_SPECIFIC_OFFSET_POS = + ConfigOptions.key("scan.startup.specific-offset.pos") + .longType() + .noDefaultValue() + .withDescription( + "Optional binlog file position used in case of \"specific-offset\" startup mode"); + + public static final ConfigOption SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET = + ConfigOptions.key("scan.startup.specific-offset.gtid-set") + .stringType() + .noDefaultValue() + .withDescription( + "Optional GTID set used 
in case of \"specific-offset\" startup mode"); + + public static final ConfigOption SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS = + ConfigOptions.key("scan.startup.specific-offset.skip-events") + .longType() + .noDefaultValue() + .withDescription( + "Optional number of events to skip after the specific starting offset"); + + public static final ConfigOption SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS = + ConfigOptions.key("scan.startup.specific-offset.skip-rows") + .longType() + .noDefaultValue() + .withDescription("Optional number of rows to skip after the specific offset"); + + public static final ConfigOption SCAN_STARTUP_TIMESTAMP_MILLIS = + ConfigOptions.key("scan.startup.timestamp-millis") + .longType() + .noDefaultValue() + .withDescription( + "Optional timestamp used in case of \"timestamp\" startup mode"); + + public static final ConfigOption HEARTBEAT_INTERVAL = + ConfigOptions.key("heartbeat.interval") + .durationType() + .defaultValue(Duration.ofSeconds(30)) + .withDescription( + "Optional interval of sending heartbeat event for tracing the latest available binlog offsets"); + + public static final org.apache.flink.configuration.ConfigOption + SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND = + org.apache.flink.configuration.ConfigOptions.key( + "chunk-key.even-distribution.factor.upper-bound") + .doubleType() + .defaultValue(1000.0d) + .withFallbackKeys("split-key.even-distribution.factor.upper-bound") + .withDescription( + "The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the" + + " table is evenly distribution or not." + + " The table chunks would use evenly calculation optimization when the data distribution is even," + + " and the query for splitting would happen when it is uneven." + + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount."); + + public static final org.apache.flink.configuration.ConfigOption + SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND = + org.apache.flink.configuration.ConfigOptions.key( + "chunk-key.even-distribution.factor.lower-bound") + .doubleType() + .defaultValue(0.05d) + .withFallbackKeys("split-key.even-distribution.factor.lower-bound") + .withDescription( + "The lower bound of chunk key distribution factor. The distribution factor is used to determine whether the" + + " table is evenly distribution or not." + + " The table chunks would use evenly calculation optimization when the data distribution is even," + + " and the query for splitting would happen when it is uneven." + + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount."); + public static final ConfigOption SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP = + ConfigOptions.key("scan.incremental.snapshot.backfill.skip") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot.WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. 
These replayed change log events should be handled specially."); + + // ---------------------------------------------------------------------------- + // experimental options, won't add them to documentation + // ---------------------------------------------------------------------------- + @Experimental + public static final ConfigOption CHUNK_META_GROUP_SIZE = + ConfigOptions.key("chunk-meta.group.size") + .intType() + .defaultValue(1000) + .withDescription( + "The group size of chunk meta, if the meta size exceeds the group size, the meta will be divided into multiple groups."); + + @Experimental + public static final ConfigOption CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND = + ConfigOptions.key("chunk-key.even-distribution.factor.upper-bound") + .doubleType() + .defaultValue(1000.0d) + .withFallbackKeys("split-key.even-distribution.factor.upper-bound") + .withDescription( + "The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the" + + " table is evenly distribution or not." + + " The table chunks would use evenly calculation optimization when the data distribution is even," + + " and the query PostgreSQL for splitting would happen when it is uneven." + + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount."); + + @Experimental + public static final ConfigOption CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND = + ConfigOptions.key("chunk-key.even-distribution.factor.lower-bound") + .doubleType() + .defaultValue(0.05d) + .withFallbackKeys("split-key.even-distribution.factor.lower-bound") + .withDescription( + "The lower bound of chunk key distribution factor. The distribution factor is used to determine whether the" + + " table is evenly distribution or not." + + " The table chunks would use evenly calculation optimization when the data distribution is even," + + " and the query PostgreSQL for splitting would happen when it is uneven." + + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount."); + + @Experimental + public static final ConfigOption SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED = + ConfigOptions.key("scan.incremental.close-idle-reader.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to close idle readers at the end of the snapshot phase. This feature depends on " + + "FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be " + + "greater than or equal to 1.14 when enabling this feature."); + + @Experimental + public static final ConfigOption SCHEMA_CHANGE_ENABLED = + ConfigOptions.key("schema-change.enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether send schema change events, by default is true. If set to false, the schema changes will not be sent."); + + @Experimental + public static final ConfigOption TABLES_EXCLUDE = + ConfigOptions.key("tables.exclude") + .stringType() + .noDefaultValue() + .withDescription( + "Table names of the PostgreSQL tables to Exclude. Regular expressions are supported. " + + "It is important to note that the dot (.) is treated as a delimiter for database and table names. " + + "If there is a need to use a dot (.) in a regular expression to match any character, " + + "it is necessary to escape the dot with a backslash." + + "eg. 
db0.\\.*, db1.user_table_[0-9]+, db[1-2].[app|web]_order_\\.*"); +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java new file mode 100644 index 0000000000..3702bb7959 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.postgres.source; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema; +import org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference; +import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode; + +import io.debezium.data.Envelope; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** Event deserializer for {@link PostgresDataSource}. 
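* Only data change records are deserialized; schema change records are not expected from this source and yield no events.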
*/ +@Internal +public class PostgresEventDeserializer extends DebeziumEventDeserializationSchema { + + private static final long serialVersionUID = 1L; + private final boolean includeSchemaChanges; + + public PostgresEventDeserializer( + DebeziumChangelogMode changelogMode, boolean includeSchemaChanges) { + super(new DebeziumSchemaDataTypeInference(), changelogMode); + this.includeSchemaChanges = includeSchemaChanges; + } + + @Override + protected List deserializeSchemaChangeRecord(SourceRecord record) { + return Collections.emptyList(); + } + + @Override + protected boolean isDataChangeRecord(SourceRecord record) { + Schema valueSchema = record.valueSchema(); + Struct value = (Struct) record.value(); + return value != null + && valueSchema != null + && valueSchema.field(Envelope.FieldName.OPERATION) != null + && value.getString(Envelope.FieldName.OPERATION) != null; + } + + @Override + protected boolean isSchemaChangeRecord(SourceRecord record) { + return false; + } + + @Override + protected TableId getTableId(SourceRecord record) { + String[] parts = record.topic().split("\\."); + return TableId.tableId(parts[1], parts[2]); + } + + @Override + protected Map getMetadata(SourceRecord record) { + return Collections.emptyMap(); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresMetadataAccessor.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresMetadataAccessor.java new file mode 100644 index 0000000000..ea94fd0ae4 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresMetadataAccessor.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.postgres.source; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.source.MetadataAccessor; +import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig; +import org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils; + +import io.debezium.connector.postgresql.PostgresPartition; + +import javax.annotation.Nullable; + +import java.util.List; + +/** {@link MetadataAccessor} for {@link PostgresDataSource}. 
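* Metadata is looked up through {@link PostgresSchemaUtils} using the connector's {@link PostgresSourceConfig}.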
*/ +@Internal +public class PostgresMetadataAccessor implements MetadataAccessor { + + private final PostgresSourceConfig sourceConfig; + + private final PostgresPartition partition; + + public PostgresMetadataAccessor(PostgresSourceConfig sourceConfig) { + this.sourceConfig = sourceConfig; + this.partition = + new PostgresPartition(sourceConfig.getDbzConnectorConfig().getLogicalName()); + } + + /** + * Always throw {@link UnsupportedOperationException} because postgres does not support + * namespace. + */ + @Override + public List listNamespaces() { + return PostgresSchemaUtils.listNamespaces(sourceConfig); + } + + /** + * List all database from postgres. + * + * @param namespace This parameter is ignored because postgres does not support namespace. + * @return The list of database + */ + @Override + public List listSchemas(@Nullable String namespace) { + return PostgresSchemaUtils.listSchemas(sourceConfig, namespace); + } + + /** + * List tables from postgres. + * + * @param namespace This parameter is ignored because postgres does not support namespace. + * @param dbName The database to list tables from. If null, list tables from all databases. + * @return The list of {@link TableId}s. + */ + @Override + public List listTables(@Nullable String namespace, @Nullable String dbName) { + return PostgresSchemaUtils.listTables(sourceConfig, dbName); + } + + /** + * Get the {@link Schema} of the given table. + * + * @param tableId The {@link TableId} of the given table. + * @return The {@link Schema} of the table. + */ + @Override + public Schema getTableSchema(TableId tableId) { + return PostgresSchemaUtils.getTableSchema(sourceConfig, partition, tableId); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java new file mode 100644 index 0000000000..e84f6a4d26 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.postgres.source.reader; + +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.connectors.base.options.StartupMode; +import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory; +import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState; +import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics; +import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter; +import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect; +import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig; +import org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils; +import org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils; +import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; +import org.apache.flink.connector.base.source.reader.RecordEmitter; + +import io.debezium.connector.postgresql.connection.PostgresConnection; +import io.debezium.relational.TableId; +import org.apache.kafka.connect.source.SourceRecord; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isLowWatermarkEvent; + +/** The {@link RecordEmitter} implementation for pipeline oracle connector. */ +public class PostgresPipelineRecordEmitter extends IncrementalSourceRecordEmitter { + private final PostgresSourceConfig sourceConfig; + private final PostgresDialect postgresDialect; + + // Used when startup mode is initial + private Set alreadySendCreateTableTables; + + // Used when startup mode is not initial + private boolean alreadySendCreateTableForBinlogSplit = false; + private final List createTableEventCache; + + public PostgresPipelineRecordEmitter( + DebeziumDeserializationSchema debeziumDeserializationSchema, + SourceReaderMetrics sourceReaderMetrics, + PostgresSourceConfig sourceConfig, + OffsetFactory offsetFactory, + PostgresDialect postgresDialect) { + super( + debeziumDeserializationSchema, + sourceReaderMetrics, + sourceConfig.isIncludeSchemaChanges(), + offsetFactory); + this.sourceConfig = sourceConfig; + this.postgresDialect = postgresDialect; + this.alreadySendCreateTableTables = new HashSet<>(); + this.createTableEventCache = new ArrayList<>(); + + if (!sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) { + try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) { + List capturedTableIds = + TableDiscoveryUtils.listTables( + sourceConfig.getDatabaseList().get(0), + jdbc, + sourceConfig.getTableFilters()); + for (TableId tableId : capturedTableIds) { + Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc); + createTableEventCache.add( + new CreateTableEvent( + org.apache.flink.cdc.common.event.TableId.tableId( + tableId.schema(), tableId.table()), + schema)); + } + } catch (SQLException e) { + throw new RuntimeException("Cannot start emitter to fetch table schema.", e); + } + } + } + + @Override + protected void processElement( + SourceRecord element, SourceOutput output, SourceSplitState splitState) + throws Exception { + if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) 
{ + TableId tableId = splitState.asSnapshotSplitState().toSourceSplit().getTableId(); + if (!alreadySendCreateTableTables.contains(tableId)) { + try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) { + sendCreateTableEvent(jdbc, tableId, (SourceOutput) output); + alreadySendCreateTableTables.add(tableId); + } + } + } else if (splitState.isStreamSplitState() + && !alreadySendCreateTableForBinlogSplit + && !sourceConfig.getStartupOptions().startupMode.equals(StartupMode.INITIAL)) { + for (CreateTableEvent createTableEvent : createTableEventCache) { + output.collect((T) createTableEvent); + } + alreadySendCreateTableForBinlogSplit = true; + } + super.processElement(element, output, splitState); + } + + private void sendCreateTableEvent( + PostgresConnection jdbc, TableId tableId, SourceOutput output) { + Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc); + output.collect( + new CreateTableEvent( + org.apache.flink.cdc.common.event.TableId.tableId( + tableId.schema(), tableId.table()), + schema)); + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java new file mode 100644 index 0000000000..a057f53970 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresSchemaUtils.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.postgres.utils; + +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Column; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect; +import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig; +import org.apache.flink.cdc.connectors.postgres.source.utils.CustomPostgresSchema; + +import io.debezium.connector.postgresql.PostgresPartition; +import io.debezium.connector.postgresql.connection.PostgresConnection; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.Table; +import io.debezium.relational.history.TableChanges; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** Utilities for converting from debezium {@link Table} types to {@link Schema}. 
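* Each lookup opens a short-lived JDBC connection through a lazily initialized, shared {@link PostgresDialect}.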
*/ +public class PostgresSchemaUtils { + + private static final Logger LOG = LoggerFactory.getLogger(PostgresSchemaUtils.class); + + private static volatile PostgresDialect postgresDialect; + + public static List listSchemas(PostgresSourceConfig sourceConfig, String namespace) { + try (JdbcConnection jdbc = getPostgresDialect(sourceConfig).openJdbcConnection()) { + return listSchemas(jdbc, namespace); + } catch (SQLException e) { + throw new RuntimeException("Error to list schemas: " + e.getMessage(), e); + } + } + + public static List listNamespaces(PostgresSourceConfig sourceConfig) { + try (JdbcConnection jdbc = getPostgresDialect(sourceConfig).openJdbcConnection()) { + return listNamespaces(jdbc); + } catch (SQLException e) { + throw new RuntimeException("Error to list namespaces: " + e.getMessage(), e); + } + } + + public static List listTables( + PostgresSourceConfig sourceConfig, @Nullable String dbName) { + try (PostgresConnection jdbc = getPostgresDialect(sourceConfig).openJdbcConnection()) { + + List databases = + dbName != null + ? Collections.singletonList(dbName) + : Collections.singletonList(sourceConfig.getDatabaseList().get(0)); + + List tableIds = new ArrayList<>(); + for (String database : databases) { + List tableIdList = + jdbc.getAllTableIds(database).stream() + .map(PostgresSchemaUtils::toCdcTableId) + .collect(Collectors.toList()); + tableIds.addAll(tableIdList); + } + return tableIds; + } catch (SQLException e) { + throw new RuntimeException("Error to list databases: " + e.getMessage(), e); + } + } + + public static Schema getTableSchema( + PostgresSourceConfig sourceConfig, PostgresPartition partition, TableId tableId) { + try (PostgresConnection jdbc = getPostgresDialect(sourceConfig).openJdbcConnection()) { + return getTableSchema(partition, tableId, sourceConfig, jdbc); + } + } + + public static PostgresDialect getPostgresDialect(PostgresSourceConfig sourceConfig) { + if (postgresDialect == null) { // + synchronized (PostgresSchemaUtils.class) { + if (postgresDialect == null) { // + postgresDialect = new PostgresDialect(sourceConfig); + } + } + } + return postgresDialect; + } + + public static List listSchemas(JdbcConnection jdbc, String namespace) + throws SQLException { + LOG.info("Read list of available schemas"); + final List schemaNames = new ArrayList<>(); + + String querySql = + String.format( + "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE CATALOG_NAME = %s", + quote(namespace)); + + jdbc.query( + querySql, + rs -> { + while (rs.next()) { + schemaNames.add(rs.getString(1)); + } + }); + LOG.info("\t list of available schemas are: {}", schemaNames); + return schemaNames; + } + + public static List listNamespaces(JdbcConnection jdbc) throws SQLException { + LOG.info("Read list of available namespaces"); + final List namespaceNames = new ArrayList<>(); + jdbc.query( + "SELECT DATNAME FROM PG_DATABASE", + rs -> { + while (rs.next()) { + namespaceNames.add(rs.getString(1)); + } + }); + LOG.info("\t list of available namespaces are: {}", namespaceNames); + return namespaceNames; + } + + public static String quote(String dbOrTableName) { + return "\"" + dbOrTableName + "\""; + } + + public static Schema getTableSchema( + PostgresPartition partition, + TableId tableId, + PostgresSourceConfig sourceConfig, + PostgresConnection jdbc) { + // fetch table schemas + CustomPostgresSchema postgresSchema = new CustomPostgresSchema(jdbc, sourceConfig); + TableChanges.TableChange tableSchema = postgresSchema.getTableSchema(toDbzTableId(tableId)); + return 
toSchema(tableSchema.getTable()); + } + + public static Schema getTableSchema( + io.debezium.relational.TableId tableId, + PostgresSourceConfig sourceConfig, + PostgresConnection jdbc) { + // fetch table schemas + CustomPostgresSchema postgresSchema = new CustomPostgresSchema(jdbc, sourceConfig); + + TableChanges.TableChange tableSchema = postgresSchema.getTableSchema(tableId); + return toSchema(tableSchema.getTable()); + } + + public static Schema toSchema(Table table) { + List columns = + table.columns().stream() + .map(PostgresSchemaUtils::toColumn) + .collect(Collectors.toList()); + + return Schema.newBuilder() + .setColumns(columns) + .primaryKey(table.primaryKeyColumnNames()) + .comment(table.comment()) + .build(); + } + + public static Column toColumn(io.debezium.relational.Column column) { + return Column.physicalColumn( + column.name(), PostgresTypeUtils.fromDbzColumn(column), column.comment()); + } + + public static io.debezium.relational.TableId toDbzTableId(TableId tableId) { + return new io.debezium.relational.TableId( + tableId.getSchemaName(), null, tableId.getTableName()); + } + + public static org.apache.flink.cdc.common.event.TableId toCdcTableId( + io.debezium.relational.TableId dbzTableId) { + return org.apache.flink.cdc.common.event.TableId.tableId( + dbzTableId.schema(), dbzTableId.table()); + } + + private PostgresSchemaUtils() {} +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java new file mode 100644 index 0000000000..f58ac2d8bd --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.postgres.utils; + +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.table.types.logical.DecimalType; + +import io.debezium.relational.Column; + +/** A utility class for converting Postgres types to Flink types. 
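* Array types are mapped element-wise; an unsupported type name results in an {@link UnsupportedOperationException}.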
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
new file mode 100644
index 0000000000..f58ac2d8bd
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.utils;
+
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.table.types.logical.DecimalType;
+
+import io.debezium.relational.Column;
+
+/** A utility class for converting Postgres types to Flink types. */
+public class PostgresTypeUtils {
+    private static final String PG_SMALLSERIAL = "smallserial";
+    private static final String PG_SERIAL = "serial";
+    private static final String PG_BIGSERIAL = "bigserial";
+    private static final String PG_BYTEA = "bytea";
+    private static final String PG_BYTEA_ARRAY = "_bytea";
+    private static final String PG_SMALLINT = "int2";
+    private static final String PG_SMALLINT_ARRAY = "_int2";
+    private static final String PG_INTEGER = "int4";
+    private static final String PG_INTEGER_ARRAY = "_int4";
+    private static final String PG_BIGINT = "int8";
+    private static final String PG_BIGINT_ARRAY = "_int8";
+    private static final String PG_REAL = "float4";
+    private static final String PG_REAL_ARRAY = "_float4";
+    private static final String PG_DOUBLE_PRECISION = "float8";
+    private static final String PG_DOUBLE_PRECISION_ARRAY = "_float8";
+    private static final String PG_NUMERIC = "numeric";
+    private static final String PG_NUMERIC_ARRAY = "_numeric";
+    private static final String PG_BOOLEAN = "bool";
+    private static final String PG_BOOLEAN_ARRAY = "_bool";
+    private static final String PG_TIMESTAMP = "timestamp";
+    private static final String PG_TIMESTAMP_ARRAY = "_timestamp";
+    private static final String PG_TIMESTAMPTZ = "timestamptz";
+    private static final String PG_TIMESTAMPTZ_ARRAY = "_timestamptz";
+    private static final String PG_DATE = "date";
+    private static final String PG_DATE_ARRAY = "_date";
+    private static final String PG_TIME = "time";
+    private static final String PG_TIME_ARRAY = "_time";
+    private static final String PG_TEXT = "text";
+    private static final String PG_TEXT_ARRAY = "_text";
+    private static final String PG_CHAR = "bpchar";
+    private static final String PG_CHAR_ARRAY = "_bpchar";
+    private static final String PG_CHARACTER = "character";
+    private static final String PG_CHARACTER_ARRAY = "_character";
+    private static final String PG_CHARACTER_VARYING = "varchar";
+    private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
+
+    /** Returns a corresponding Flink data type from a debezium {@link Column}. */
+    public static DataType fromDbzColumn(Column column) {
+        DataType dataType = convertFromColumn(column);
+        if (column.isOptional()) {
+            return dataType;
+        } else {
+            return dataType.notNull();
+        }
+    }
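fromDbzColumn only decides nullability; the actual type mapping is delegated to convertFromColumn below. A minimal sketch of the expected behaviour for a NOT NULL numeric(10,2) column follows. It assumes Debezium's Column.editor() builder for constructing the column, which is how Debezium columns are typically assembled in tests, so treat the exact builder calls as an assumption rather than part of this change.

    import io.debezium.relational.Column;

    import org.apache.flink.cdc.common.types.DataType;
    import org.apache.flink.cdc.connectors.postgres.utils.PostgresTypeUtils;

    // Sketch (assumes Debezium's Column.editor() builder): a NOT NULL numeric(10,2) column
    // is expected to map to DECIMAL(10, 2) NOT NULL, since fromDbzColumn(...) adds the
    // constraint on top of the nullable type returned by convertFromColumn(...).
    public class NullabilitySketch {
        public static void main(String[] args) {
            Column priceColumn =
                    Column.editor()
                            .name("price")
                            .type("numeric")
                            .length(10)
                            .scale(2)
                            .optional(false)
                            .create();

            DataType mapped = PostgresTypeUtils.fromDbzColumn(priceColumn);
            System.out.println(mapped); // expected: DECIMAL(10, 2) NOT NULL
        }
    }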
+
+    /**
+     * Returns a corresponding Flink data type from a debezium {@link Column}. The returned type
+     * is always nullable; {@link #fromDbzColumn(Column)} applies the NOT NULL constraint.
+     */
+    private static DataType convertFromColumn(Column column) {
+        String typeName = column.typeName();
+
+        int precision = column.length();
+        int scale = column.scale().orElse(0);
+
+        switch (typeName) {
+            case PG_BOOLEAN:
+                return DataTypes.BOOLEAN();
+            case PG_BOOLEAN_ARRAY:
+                return DataTypes.ARRAY(DataTypes.BOOLEAN());
+            case PG_BYTEA:
+                return DataTypes.BYTES();
+            case PG_BYTEA_ARRAY:
+                return DataTypes.ARRAY(DataTypes.BYTES());
+            case PG_SMALLINT:
+            case PG_SMALLSERIAL:
+                return DataTypes.SMALLINT();
+            case PG_SMALLINT_ARRAY:
+                return DataTypes.ARRAY(DataTypes.SMALLINT());
+            case PG_INTEGER:
+            case PG_SERIAL:
+                return DataTypes.INT();
+            case PG_INTEGER_ARRAY:
+                return DataTypes.ARRAY(DataTypes.INT());
+            case PG_BIGINT:
+            case PG_BIGSERIAL:
+                return DataTypes.BIGINT();
+            case PG_BIGINT_ARRAY:
+                return DataTypes.ARRAY(DataTypes.BIGINT());
+            case PG_REAL:
+                return DataTypes.FLOAT();
+            case PG_REAL_ARRAY:
+                return DataTypes.ARRAY(DataTypes.FLOAT());
+            case PG_DOUBLE_PRECISION:
+                return DataTypes.DOUBLE();
+            case PG_DOUBLE_PRECISION_ARRAY:
+                return DataTypes.ARRAY(DataTypes.DOUBLE());
+            case PG_NUMERIC:
+                // see SPARK-26538: handle numeric without explicit precision and scale.
+                if (precision > 0) {
+                    return DataTypes.DECIMAL(precision, scale);
+                }
+                return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18);
+            case PG_NUMERIC_ARRAY:
+                // see SPARK-26538: handle numeric without explicit precision and scale.
+                if (precision > 0) {
+                    return DataTypes.ARRAY(DataTypes.DECIMAL(precision, scale));
+                }
+                return DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18));
+            case PG_CHAR:
+            case PG_CHARACTER:
+                return DataTypes.CHAR(precision);
+            case PG_CHAR_ARRAY:
+            case PG_CHARACTER_ARRAY:
+                return DataTypes.ARRAY(DataTypes.CHAR(precision));
+            case PG_CHARACTER_VARYING:
+                return DataTypes.VARCHAR(precision);
+            case PG_CHARACTER_VARYING_ARRAY:
+                return DataTypes.ARRAY(DataTypes.VARCHAR(precision));
+            case PG_TEXT:
+                return DataTypes.STRING();
+            case PG_TEXT_ARRAY:
+                return DataTypes.ARRAY(DataTypes.STRING());
+            case PG_TIMESTAMP:
+                return DataTypes.TIMESTAMP(scale);
+            case PG_TIMESTAMP_ARRAY:
+                return DataTypes.ARRAY(DataTypes.TIMESTAMP(scale));
+            case PG_TIME:
+                return DataTypes.TIME(scale);
+            case PG_TIME_ARRAY:
+                return DataTypes.ARRAY(DataTypes.TIME(scale));
+            case PG_DATE:
+                return DataTypes.DATE();
+            case PG_DATE_ARRAY:
+                return DataTypes.ARRAY(DataTypes.DATE());
+            default:
+                throw new UnsupportedOperationException(
+                        String.format("Doesn't support Postgres type '%s' yet", typeName));
+        }
+    }
+}
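One detail worth calling out in the switch above is the NUMERIC branch: PostgreSQL allows numeric columns without an explicit precision, in which case Debezium reports a non-positive length and the mapping falls back to DECIMAL(DecimalType.MAX_PRECISION, 18), i.e. DECIMAL(38, 18). The isolated sketch below shows both branches.

    import org.apache.flink.cdc.common.types.DataType;
    import org.apache.flink.cdc.common.types.DataTypes;
    import org.apache.flink.table.types.logical.DecimalType;

    // Sketch of the NUMERIC fallback above: an explicit precision keeps the declared
    // precision/scale; a bare "numeric" column (precision <= 0) falls back to
    // DECIMAL(38, 18), since DecimalType.MAX_PRECISION is 38.
    public class NumericFallbackSketch {

        static DataType mapNumeric(int precision, int scale) {
            if (precision > 0) {
                return DataTypes.DECIMAL(precision, scale);
            }
            return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18);
        }

        public static void main(String[] args) {
            System.out.println(mapNumeric(10, 2)); // numeric(10,2)  -> DECIMAL(10, 2)
            System.out.println(mapNumeric(0, 0));  // bare "numeric" -> DECIMAL(38, 18)
        }
    }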
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory
new file mode 100644
index 0000000000..97346f89f5
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/resources/META-INF/services/org.apache.flink.cdc.common.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.cdc.connectors.postgres.factory.PostgresDataSourceFactory
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml
index 424372f4c1..ea2c50a533 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/pom.xml
@@ -30,6 +30,7 @@ limitations under the License.
         <module>flink-cdc-pipeline-connector-values</module>
         <module>flink-cdc-pipeline-connector-mysql</module>
+        <module>flink-cdc-pipeline-connector-postgres</module>
         <module>flink-cdc-pipeline-connector-doris</module>
         <module>flink-cdc-pipeline-connector-starrocks</module>
         <module>flink-cdc-pipeline-connector-kafka</module>
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
index ee991a70f8..2f06487f6a 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
@@ -298,6 +298,10 @@ public PostgresIncrementalSource<T> build() {
                 configFactory, checkNotNull(deserializer), offsetFactory, dialect);
     }
 
+    public PostgresSourceConfigFactory getConfigFactory() {
+        return configFactory;
+    }
+
     /** The Postgres source based on the incremental snapshot framework. */
     @Experimental
     public static class PostgresIncrementalSource<T> extends JdbcIncrementalSource<T> {
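The META-INF/services entry registers PostgresDataSourceFactory with the JDK ServiceLoader, which is how the pipeline framework discovers connector factories at runtime, and the new getConfigFactory() accessor exposes the PostgresSourceConfigFactory assembled by the builder so the pipeline connector can reuse it when constructing the data source. The sketch below shows the ServiceLoader lookup; it assumes Factory exposes an identifier() method and that the Postgres factory registers under an identifier such as "postgres", which is determined by PostgresDataSourceFactory itself.

    import java.util.ServiceLoader;

    import org.apache.flink.cdc.common.factories.Factory;

    // Sketch of the SPI lookup enabled by the META-INF/services entry above. It assumes
    // Factory exposes identifier() and that the Postgres factory registers under an
    // identifier such as "postgres"; the exact value comes from PostgresDataSourceFactory.
    public class FactoryDiscoverySketch {
        public static void main(String[] args) {
            for (Factory factory : ServiceLoader.load(Factory.class)) {
                if ("postgres".equals(factory.identifier())) {
                    System.out.println(
                            "Found pipeline source factory: " + factory.getClass().getName());
                }
            }
        }
    }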