From e4b739f36248c2d772fc5788c32a1a95bf73169e Mon Sep 17 00:00:00 2001 From: kellen Date: Fri, 3 Jan 2025 14:28:29 -0500 Subject: [PATCH] Add mariadb support (#950) --- README.md | 2 +- dbeam-core/pom.xml | 8 +++ .../dbeam/args/JdbcConnectionUtil.java | 12 +++- .../spotify/dbeam/avro/JdbcAvroSchema.java | 16 ++++-- .../test/java/com/spotify/dbeam/Coffee.java | 56 ++++++++++--------- .../dbeam/avro/JdbcAvroRecordTest.java | 6 +- e2e/ddl.sql | 2 +- pom.xml | 11 ++++ 8 files changed, 73 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index 3fad3ad4..7a766917 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ simply streams the table contents via JDBC into target location as Avro. ## dbeam-core package features -- Supports both PostgreSQL and MySQL JDBC connectors +- Supports both PostgreSQL, MySQL, MariaDB, and H2 JDBC connectors - Supports [Google CloudSQL](https://cloud.google.com/sql/) managed databases - Currently outputs only to Avro format - Reads database from an external password file (`--passwordFile`) or an external [KMS](https://cloud.google.com/kms/) encrypted password file (`--passwordFileKmsEncrypted`) diff --git a/dbeam-core/pom.xml b/dbeam-core/pom.xml index ea4980a4..b58e9371 100644 --- a/dbeam-core/pom.xml +++ b/dbeam-core/pom.xml @@ -56,6 +56,10 @@ com.mysql mysql-connector-j + + org.mariadb.jdbc + mariadb-java-client + com.google.cloud.sql postgres-socket-factory @@ -64,6 +68,10 @@ com.google.cloud.sql mysql-socket-factory-connector-j-8 + + com.google.cloud.sql + mariadb-socket-factory + com.google.guava diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/args/JdbcConnectionUtil.java b/dbeam-core/src/main/java/com/spotify/dbeam/args/JdbcConnectionUtil.java index 6f70b7bd..fc1791f0 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/args/JdbcConnectionUtil.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/args/JdbcConnectionUtil.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import java.util.Map; +import java.util.stream.Collectors; public class JdbcConnectionUtil { @@ -30,14 +31,19 @@ public class JdbcConnectionUtil { ImmutableMap.of( "postgresql", "org.postgresql.Driver", "mysql", "com.mysql.cj.jdbc.Driver", + "mariadb", "org.mariadb.jdbc.Driver", "h2", "org.h2.Driver"); + private static String driverPrefixes = + driverMapping.keySet().stream().map(k -> "jdbc:" + k).collect(Collectors.joining(", ")); + public static String getDriverClass(final String url) throws ClassNotFoundException { final String[] parts = url.split(":", 3); Preconditions.checkArgument( - parts.length > 1 && "jdbc".equals(parts[0]) && driverMapping.get(parts[1]) != null, - "Invalid jdbc connection URL: %s. Expect jdbc:postgresql or jdbc:mysql as prefix.", - url); + parts.length > 1 && "jdbc".equals(parts[0]) && driverMapping.containsKey(parts[1]), + "Invalid jdbc connection URL: %s. Expected one of %s as prefix.", + url, + driverPrefixes); return Class.forName(driverMapping.get(parts[1])).getCanonicalName(); } } diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java index 1e846b6a..4fe5686e 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java @@ -126,7 +126,7 @@ static String getDatabaseTableName(final ResultSetMetaData meta) throws SQLExcep private static SchemaBuilder.FieldAssembler createAvroFields( final ResultSet resultSet, - final SchemaBuilder.FieldAssembler builder, + final SchemaBuilder.FieldAssembler builder, final boolean useLogicalTypes) throws SQLException { @@ -162,8 +162,8 @@ private static SchemaBuilder.FieldAssembler createAvroFields( SchemaBuilder.UnionAccumulator>> fieldSchemaBuilder = field.type().unionOf().nullBuilder().endNull().and(); - Integer arrayItemType = resultSet.isFirst() && columnType == ARRAY - ? resultSet.getArray(i).getBaseType() : null; + Integer arrayItemType = + resultSet.isFirst() && columnType == ARRAY ? resultSet.getArray(i).getBaseType() : null; final SchemaBuilder.UnionAccumulator> schemaFieldAssembler = setAvroColumnType( @@ -189,7 +189,6 @@ private static SchemaBuilder.FieldAssembler createAvroFields( *
  • {@link com.mysql.cj.MysqlType } *
  • org.h2.value.Value * - * */ private static SchemaBuilder.UnionAccumulator> setAvroColumnType( @@ -240,8 +239,13 @@ private static SchemaBuilder.FieldAssembler createAvroFields( return field.bytesType(); } case ARRAY: - return setAvroColumnType(arrayItemType, null, precision, columnClassName, - useLogicalTypes, field.array().items()); + return setAvroColumnType( + arrayItemType, + null, + precision, + columnClassName, + useLogicalTypes, + field.array().items()); case BINARY: case VARBINARY: case LONGVARBINARY: diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/Coffee.java b/dbeam-core/src/test/java/com/spotify/dbeam/Coffee.java index af181643..f9c52d79 100644 --- a/dbeam-core/src/test/java/com/spotify/dbeam/Coffee.java +++ b/dbeam-core/src/test/java/com/spotify/dbeam/Coffee.java @@ -27,7 +27,6 @@ import java.util.Locale; import java.util.Optional; import java.util.UUID; -import java.util.stream.Collectors; // A fictitious DB model to test different SQL types @AutoValue @@ -96,8 +95,9 @@ public static Coffee create( public String insertStatement() { return String.format( Locale.ENGLISH, - "INSERT INTO COFFEES " + "VALUES ('%s', %s, '%s', %f, %f, %b, %d, %d, '%s', %s, '%s', %d," - + " ARRAY [%s], ARRAY ['%s'])", + "INSERT INTO COFFEES " + + "VALUES ('%s', %s, '%s', %f, %f, %b, %d, %d, '%s', %s, '%s', %d," + + " ARRAY [%s], ARRAY ['%s'])", name(), supId().orElse(null), price().toString(), @@ -147,17 +147,20 @@ public static String ddl() { Optional.empty(), UUID.fromString("123e4567-e89b-12d3-a456-426655440000"), 1L, - new ArrayList() {{ - add(5); - add(7); - add(11); - }}, - new ArrayList() {{ - add("rock"); - add("scissors"); - add("paper"); - }} - ); + new ArrayList() { + { + add(5); + add(7); + add(11); + } + }, + new ArrayList() { + { + add("rock"); + add("scissors"); + add("paper"); + } + }); public static Coffee COFFEE2 = create( @@ -173,15 +176,18 @@ public static String ddl() { Optional.empty(), UUID.fromString("123e4567-e89b-a456-12d3-426655440000"), 2L, - new ArrayList() {{ - add(7); - add(11); - add(23); - }}, - new ArrayList() {{ - add("scissors"); - add("paper"); - add("rock"); - }} - ); + new ArrayList() { + { + add(7); + add(11); + add(23); + } + }, + new ArrayList() { + { + add("scissors"); + add("paper"); + add("rock"); + } + }); } diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroRecordTest.java b/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroRecordTest.java index 0ea2b6f3..6caac904 100644 --- a/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroRecordTest.java +++ b/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroRecordTest.java @@ -35,8 +35,6 @@ import java.sql.SQLException; import java.sql.Timestamp; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -207,8 +205,8 @@ public void shouldEncodeResultSetToValidAvro() Assert.assertEquals(14, record.getSchema().getFields().size()); Assert.assertEquals(schema, record.getSchema()); List actualTxtArray = - ((GenericData.Array) record.get(13)).stream().map(x -> x.toString()).collect( - Collectors.toList()); + ((GenericData.Array) record.get(13)) + .stream().map(x -> x.toString()).collect(Collectors.toList()); final Coffee actual = Coffee.create( record.get(0).toString(), diff --git a/e2e/ddl.sql b/e2e/ddl.sql index 29d2f866..760eff4f 100644 --- a/e2e/ddl.sql +++ b/e2e/ddl.sql @@ -1,5 +1,5 @@ --- This file contatins psql views with complex types to validate and troubleshoot dbeam +-- This file contains psql views with complex types to validate and troubleshoot dbeam -- import with: -- psql -f ddl.sql postgres diff --git a/pom.xml b/pom.xml index 880c26ad..58505ef7 100644 --- a/pom.xml +++ b/pom.xml @@ -135,6 +135,7 @@ 1.78.1 4.13.2 8.4.0 + 3.5.1 42.7.4 1.18.0 @@ -365,6 +366,11 @@ mysql-connector-j ${mysql.version} + + org.mariadb.jdbc + mariadb-java-client + ${mariadb.version} + com.google.cloud.sql postgres-socket-factory @@ -376,6 +382,11 @@ mysql-socket-factory-connector-j-8 ${socket-factory.version} + + com.google.cloud.sql + mariadb-socket-factory + ${socket-factory.version} + org.hamcrest hamcrest