Skip to content

Commit

Permalink
Add mariadb support (#950)
Browse files Browse the repository at this point in the history
  • Loading branch information
kellen authored Jan 3, 2025
1 parent 8d07bf4 commit e4b739f
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 40 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ simply streams the table contents via JDBC into target location as Avro.

## dbeam-core package features

- Supports both PostgreSQL and MySQL JDBC connectors
- Supports both PostgreSQL, MySQL, MariaDB, and H2 JDBC connectors
- Supports [Google CloudSQL](https://cloud.google.com/sql/) managed databases
- Currently outputs only to Avro format
- Reads database from an external password file (`--passwordFile`) or an external [KMS](https://cloud.google.com/kms/) encrypted password file (`--passwordFileKmsEncrypted`)
Expand Down
8 changes: 8 additions & 0 deletions dbeam-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud.sql</groupId>
<artifactId>postgres-socket-factory</artifactId>
Expand All @@ -64,6 +68,10 @@
<groupId>com.google.cloud.sql</groupId>
<artifactId>mysql-socket-factory-connector-j-8</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud.sql</groupId>
<artifactId>mariadb-socket-factory</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,27 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.stream.Collectors;

public class JdbcConnectionUtil {

private static Map<String, String> driverMapping =
ImmutableMap.of(
"postgresql", "org.postgresql.Driver",
"mysql", "com.mysql.cj.jdbc.Driver",
"mariadb", "org.mariadb.jdbc.Driver",
"h2", "org.h2.Driver");

private static String driverPrefixes =
driverMapping.keySet().stream().map(k -> "jdbc:" + k).collect(Collectors.joining(", "));

public static String getDriverClass(final String url) throws ClassNotFoundException {
final String[] parts = url.split(":", 3);
Preconditions.checkArgument(
parts.length > 1 && "jdbc".equals(parts[0]) && driverMapping.get(parts[1]) != null,
"Invalid jdbc connection URL: %s. Expect jdbc:postgresql or jdbc:mysql as prefix.",
url);
parts.length > 1 && "jdbc".equals(parts[0]) && driverMapping.containsKey(parts[1]),
"Invalid jdbc connection URL: %s. Expected one of %s as prefix.",
url,
driverPrefixes);
return Class.forName(driverMapping.get(parts[1])).getCanonicalName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ static String getDatabaseTableName(final ResultSetMetaData meta) throws SQLExcep

private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
final ResultSet resultSet,
final SchemaBuilder.FieldAssembler<Schema> builder,
final SchemaBuilder.FieldAssembler<Schema> builder,
final boolean useLogicalTypes)
throws SQLException {

Expand Down Expand Up @@ -162,8 +162,8 @@ private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>>>
fieldSchemaBuilder = field.type().unionOf().nullBuilder().endNull().and();

Integer arrayItemType = resultSet.isFirst() && columnType == ARRAY
? resultSet.getArray(i).getBaseType() : null;
Integer arrayItemType =
resultSet.isFirst() && columnType == ARRAY ? resultSet.getArray(i).getBaseType() : null;

final SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>> schemaFieldAssembler =
setAvroColumnType(
Expand All @@ -189,7 +189,6 @@ private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
* <li>{@link com.mysql.cj.MysqlType }
* <li>org.h2.value.Value
* </ul>
*
*/
private static SchemaBuilder.UnionAccumulator<SchemaBuilder.NullDefault<Schema>>
setAvroColumnType(
Expand Down Expand Up @@ -240,8 +239,13 @@ private static SchemaBuilder.FieldAssembler<Schema> createAvroFields(
return field.bytesType();
}
case ARRAY:
return setAvroColumnType(arrayItemType, null, precision, columnClassName,
useLogicalTypes, field.array().items());
return setAvroColumnType(
arrayItemType,
null,
precision,
columnClassName,
useLogicalTypes,
field.array().items());
case BINARY:
case VARBINARY:
case LONGVARBINARY:
Expand Down
56 changes: 31 additions & 25 deletions dbeam-core/src/test/java/com/spotify/dbeam/Coffee.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.Locale;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

// A fictitious DB model to test different SQL types
@AutoValue
Expand Down Expand Up @@ -96,8 +95,9 @@ public static Coffee create(
public String insertStatement() {
return String.format(
Locale.ENGLISH,
"INSERT INTO COFFEES " + "VALUES ('%s', %s, '%s', %f, %f, %b, %d, %d, '%s', %s, '%s', %d,"
+ " ARRAY [%s], ARRAY ['%s'])",
"INSERT INTO COFFEES "
+ "VALUES ('%s', %s, '%s', %f, %f, %b, %d, %d, '%s', %s, '%s', %d,"
+ " ARRAY [%s], ARRAY ['%s'])",
name(),
supId().orElse(null),
price().toString(),
Expand Down Expand Up @@ -147,17 +147,20 @@ public static String ddl() {
Optional.empty(),
UUID.fromString("123e4567-e89b-12d3-a456-426655440000"),
1L,
new ArrayList<Integer>() {{
add(5);
add(7);
add(11);
}},
new ArrayList<String>() {{
add("rock");
add("scissors");
add("paper");
}}
);
new ArrayList<Integer>() {
{
add(5);
add(7);
add(11);
}
},
new ArrayList<String>() {
{
add("rock");
add("scissors");
add("paper");
}
});

public static Coffee COFFEE2 =
create(
Expand All @@ -173,15 +176,18 @@ public static String ddl() {
Optional.empty(),
UUID.fromString("123e4567-e89b-a456-12d3-426655440000"),
2L,
new ArrayList<Integer>() {{
add(7);
add(11);
add(23);
}},
new ArrayList<String>() {{
add("scissors");
add("paper");
add("rock");
}}
);
new ArrayList<Integer>() {
{
add(7);
add(11);
add(23);
}
},
new ArrayList<String>() {
{
add("scissors");
add("paper");
add("rock");
}
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -207,8 +205,8 @@ public void shouldEncodeResultSetToValidAvro()
Assert.assertEquals(14, record.getSchema().getFields().size());
Assert.assertEquals(schema, record.getSchema());
List<String> actualTxtArray =
((GenericData.Array<Utf8>) record.get(13)).stream().map(x -> x.toString()).collect(
Collectors.toList());
((GenericData.Array<Utf8>) record.get(13))
.stream().map(x -> x.toString()).collect(Collectors.toList());
final Coffee actual =
Coffee.create(
record.get(0).toString(),
Expand Down
2 changes: 1 addition & 1 deletion e2e/ddl.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

-- This file contatins psql views with complex types to validate and troubleshoot dbeam
-- This file contains psql views with complex types to validate and troubleshoot dbeam
-- import with:
-- psql -f ddl.sql postgres

Expand Down
11 changes: 11 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
<bouncycastle.version>1.78.1</bouncycastle.version>
<junit.version>4.13.2</junit.version>
<mysql.version>8.4.0</mysql.version>
<mariadb.version>3.5.1</mariadb.version>
<postgresql.version>42.7.4</postgresql.version>
<socket-factory.version>1.18.0</socket-factory.version>
</properties>
Expand Down Expand Up @@ -365,6 +366,11 @@
<artifactId>mysql-connector-j</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>${mariadb.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud.sql</groupId>
<artifactId>postgres-socket-factory</artifactId>
Expand All @@ -376,6 +382,11 @@
<artifactId>mysql-socket-factory-connector-j-8</artifactId>
<version>${socket-factory.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud.sql</groupId>
<artifactId>mariadb-socket-factory</artifactId>
<version>${socket-factory.version}</version>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
Expand Down

0 comments on commit e4b739f

Please sign in to comment.