From e4745e182f0b98e40ed51a64232026159f1d354c Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Fri, 5 Jul 2024 11:06:00 +0800 Subject: [PATCH] add lock to JdbcConnectionPools.close --- .../relational/connection/JdbcConnectionPools.java | 10 ++++++---- .../mysql/source/connection/JdbcConnectionPools.java | 10 ++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/connection/JdbcConnectionPools.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/connection/JdbcConnectionPools.java index cc703849820..9d63d8d5601 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/connection/JdbcConnectionPools.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/connection/JdbcConnectionPools.java @@ -86,10 +86,12 @@ public String getJdbcUrl( } public static void close() throws IOException { - if (instance != null) { - instance.pools.values().stream().forEach(HikariDataSource::close); - instance.pools.clear(); - POOL_FACTORY_MAP.clear(); + synchronized (instance.pools) { + if (instance != null) { + instance.pools.values().stream().forEach(HikariDataSource::close); + instance.pools.clear(); + POOL_FACTORY_MAP.clear(); + } } } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java index 0b3681a99d5..45ccf406aaf 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java @@ -53,10 +53,12 @@ public HikariDataSource getOrCreateConnectionPool( } } - public static void close() throws IOException { - if (INSTANCE != null) { - INSTANCE.pools.values().stream().forEach(HikariDataSource::close); - INSTANCE.pools.clear(); + public static synchronized void close() throws IOException { + synchronized (INSTANCE.pools) { + if (INSTANCE != null) { + INSTANCE.pools.values().stream().forEach(HikariDataSource::close); + INSTANCE.pools.clear(); + } } } }