From 3d4615e2fe916d5cb92763736a97b1573d36f2ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Stankiewicz?= Date: Wed, 29 Jan 2025 20:50:12 +0100 Subject: [PATCH] support dbcp2.BasicDataSource in jdbcio lineage (#33801) --- .../org/apache/beam/sdk/io/jdbc/JdbcUtil.java | 54 ++++++++++++------- .../apache/beam/sdk/io/jdbc/JdbcUtilTest.java | 15 ++++++ 2 files changed, 51 insertions(+), 18 deletions(-) diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java index 128f21a81097..7a0558e6ca92 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java @@ -75,6 +75,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Files; +import org.apache.commons.dbcp2.BasicDataSource; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.joda.time.Duration; @@ -712,24 +713,41 @@ void reportLineage(Lineage lineage, @Nullable KV<@Nullable String, String> table String maybeSqlInstance; String url; try { - Class hikariClass = Class.forName("com.zaxxer.hikari.HikariDataSource"); - if (!hikariClass.isInstance(dataSource)) { - return null; - } - Method getProperties = hikariClass.getMethod("getDataSourceProperties"); - Properties properties = (Properties) getProperties.invoke(dataSource); - if (properties == null) { - return null; - } - maybeSqlInstance = properties.getProperty("cloudSqlInstance"); - if (maybeSqlInstance == null) { - // not a cloudSqlInstance - return null; - } - Method getUrl = hikariClass.getMethod("getJdbcUrl"); - url = (String) getUrl.invoke(dataSource); - if (url == null) { - return null; + if (dataSource instanceof BasicDataSource) { + // try default data source implementation + BasicDataSource source = (BasicDataSource) dataSource; + Method getProperties = source.getClass().getDeclaredMethod("getConnectionProperties"); + getProperties.setAccessible(true); + Properties properties = (Properties) getProperties.invoke(dataSource); + if (properties == null) { + return null; + } + maybeSqlInstance = properties.getProperty("cloudSqlInstance"); + if (maybeSqlInstance == null) { + // not a cloudSqlInstance + return null; + } + url = source.getUrl(); + } else { // try recommended as per best practice + Class hikariClass = Class.forName("com.zaxxer.hikari.HikariDataSource"); + if (!hikariClass.isInstance(dataSource)) { + return null; + } + Method getProperties = hikariClass.getMethod("getDataSourceProperties"); + Properties properties = (Properties) getProperties.invoke(dataSource); + if (properties == null) { + return null; + } + maybeSqlInstance = properties.getProperty("cloudSqlInstance"); + if (maybeSqlInstance == null) { + // not a cloudSqlInstance + return null; + } + Method getUrl = hikariClass.getMethod("getJdbcUrl"); + url = (String) getUrl.invoke(dataSource); + if (url == null) { + return null; + } } } catch (ClassNotFoundException | InvocationTargetException diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java index 118eaa4df7ef..da0faad46bc6 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcUtilTest.java @@ -47,6 +47,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.commons.dbcp2.BasicDataSource; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.junit.Rule; @@ -329,6 +330,20 @@ public void testFqnFromHikariDataSourcePostgreSql() { components.getSegments()); } + @Test + public void testFqnFromBasicDataSourcePostgreSql() { + BasicDataSource source = new BasicDataSource(); + source.setUrl("jdbc:postgresql:///postgres"); + source.setUsername("postgres"); + source.setConnectionProperties( + "cloudSqlInstance=example.com:project:some-region:instance-name"); + JdbcUtil.FQNComponents components = JdbcUtil.FQNComponents.of(source); + assertEquals("cloudsql_postgresql", components.getScheme()); + assertEquals( + ImmutableList.of("example.com:project", "some-region", "instance-name", "postgres"), + components.getSegments()); + } + @Test public void testFqnFromHikariDataSourceMySql() { HikariConfig config = new HikariConfig();