diff --git a/README.md b/README.md index d4268c32..2761939f 100644 --- a/README.md +++ b/README.md @@ -125,17 +125,23 @@ com.spotify.dbeam.options.JdbcExportPipelineOptions: Controls whether generated Avro schema will contain logicalTypes or not. ``` -#### Input Avro schema file +#### Input (provided) Avro schema file -If provided an input Avro schema file, dbeam will read input schema file and use some of the +If there is a provided input Avro schema file (using the `--avroSchemaFilePath` parameter), dbeam will read the input schema file and use some of the properties when an output Avro schema is created. -#### Following fields will be propagated from input into output schema: +#### The following fields will be propagated from a provided input schema into the output schema: -* `record.doc` +* `record.name` * `record.namespace` +* `record.doc` * `record.field.doc` +#### Precedence rules: Avro record `doc` value +Avro record `doc` value can be set using three means (in order of precedence): +1. From a provided Avro schema `record.doc` +2. From a command line parameter (`--avroDoc`) +3. 
An automatically generated value #### DBeam Parallel Mode diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/args/JdbcExportArgs.java b/dbeam-core/src/main/java/com/spotify/dbeam/args/JdbcExportArgs.java index 8209ce2d..ecf9978f 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/args/JdbcExportArgs.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/args/JdbcExportArgs.java @@ -26,7 +26,6 @@ import java.sql.Connection; import java.time.Duration; import java.util.Optional; -import org.apache.avro.Schema; @AutoValue public abstract class JdbcExportArgs implements Serializable { @@ -47,8 +46,6 @@ public abstract class JdbcExportArgs implements Serializable { public abstract Duration exportTimeout(); - public abstract Optional inputAvroSchema(); - @AutoValue.Builder abstract static class Builder { @@ -66,8 +63,6 @@ abstract static class Builder { abstract Builder setExportTimeout(Duration exportTimeout); - abstract Builder setInputAvroSchema(Optional inputAvroSchema); - abstract JdbcExportArgs build(); } @@ -81,8 +76,7 @@ static JdbcExportArgs create( Optional.empty(), Optional.empty(), false, - Duration.ofDays(7), - Optional.empty()); + Duration.ofDays(7)); } public static JdbcExportArgs create( @@ -92,8 +86,7 @@ public static JdbcExportArgs create( final Optional avroSchemaName, final Optional avroDoc, final Boolean useAvroLogicalTypes, - final Duration exportTimeout, - final Optional inputAvroSchema) { + final Duration exportTimeout) { return new AutoValue_JdbcExportArgs.Builder() .setJdbcAvroOptions(jdbcAvroArgs) .setQueryBuilderArgs(queryBuilderArgs) @@ -102,7 +95,6 @@ public static JdbcExportArgs create( .setAvroDoc(avroDoc) .setUseAvroLogicalTypes(useAvroLogicalTypes) .setExportTimeout(exportTimeout) - .setInputAvroSchema(inputAvroSchema) .build(); } diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/avro/AvroSchemaMetadataProvider.java b/dbeam-core/src/main/java/com/spotify/dbeam/avro/AvroSchemaMetadataProvider.java new file mode 100644 index 
00000000..3348541c --- /dev/null +++ b/dbeam-core/src/main/java/com/spotify/dbeam/avro/AvroSchemaMetadataProvider.java @@ -0,0 +1,85 @@ +/*- + * -\-\- + * DBeam Core + * -- + * Copyright (C) 2016 - 2018 Spotify AB + * -- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * -/-/- + */ + +package com.spotify.dbeam.avro; + +import org.apache.avro.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AvroSchemaMetadataProvider { + + private static Logger LOGGER = LoggerFactory.getLogger(AvroSchemaMetadataProvider.class); + + // provided schema has a priority over single arguments' values. + private final Schema provided; + private final String avroSchemaName; + private final String avroSchemaNamespace; + private final String avroDoc; + + public AvroSchemaMetadataProvider( + final String avroSchemaName, final String avroSchemaNamespace, final String avroDoc) { + this(null, avroSchemaName, avroSchemaNamespace, avroDoc); + } + + public AvroSchemaMetadataProvider( + final Schema provided, + final String avroSchemaName, + final String avroSchemaNamespace, + final String avroDoc) { + this.provided = provided; + this.avroSchemaName = avroSchemaName; + this.avroSchemaNamespace = avroSchemaNamespace; + this.avroDoc = avroDoc; + } + + public String avroDoc(final String defaultVal) { + return (provided != null && provided.getDoc() != null) + ? provided.getDoc() + : (avroDoc != null) ? 
avroDoc : defaultVal; + } + + public String avroSchemaName(final String defaultVal) { + return (provided != null && provided.getName() != null) + ? provided.getName() + : (avroSchemaName != null) ? avroSchemaName : defaultVal; + } + + public String avroSchemaNamespace() { + return (provided != null && provided.getNamespace() != null) + ? provided.getNamespace() + : avroSchemaNamespace; + } + + public String getFieldDoc(final String fieldName, final String defaultVal) { + if (provided != null) { + final Schema.Field field = provided.getField(fieldName); + if (field != null) { + return field.doc(); + } else { + LOGGER.warn( + "Field [{}] not found in the provided schema [{}]", fieldName, provided.getName()); + return defaultVal; + } + } else { + return defaultVal; + } + } +} diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/avro/BeamJdbcAvroSchema.java b/dbeam-core/src/main/java/com/spotify/dbeam/avro/BeamJdbcAvroSchema.java index ddbc92a8..83b11fa4 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/avro/BeamJdbcAvroSchema.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/avro/BeamJdbcAvroSchema.java @@ -27,7 +27,6 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.Collections; -import java.util.Optional; import org.apache.avro.Schema; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.FileSystems; @@ -42,7 +41,7 @@ public class BeamJdbcAvroSchema { - private static Logger LOGGER = LoggerFactory.getLogger(BeamJdbcAvroSchema.class); + private static final Logger LOGGER = LoggerFactory.getLogger(BeamJdbcAvroSchema.class); /** * Generate Avro schema by reading one row. Expose Beam metrics via a Beam PTransform. 
@@ -54,10 +53,13 @@ public class BeamJdbcAvroSchema { * @throws Exception in case of failure to query database */ public static Schema createSchema( - final Pipeline pipeline, final JdbcExportArgs args, final Connection connection) + final Pipeline pipeline, + final JdbcExportArgs args, + final Connection connection, + final AvroSchemaMetadataProvider provider) throws Exception { final long startTime = System.nanoTime(); - final Schema generatedSchema = generateAvroSchema(args, connection); + final Schema generatedSchema = generateAvroSchema(args, connection, provider); final long elapsedTimeSchema = (System.nanoTime() - startTime) / 1000000; LOGGER.info("Elapsed time to schema {} seconds", elapsedTimeSchema / 1000.0); @@ -78,35 +80,24 @@ public static Schema createSchema( return generatedSchema; } - private static Schema generateAvroSchema(final JdbcExportArgs args, final Connection connection) + private static Schema generateAvroSchema( + final JdbcExportArgs args, + final Connection connection, + final AvroSchemaMetadataProvider provider) throws SQLException { - final String dbUrl = connection.getMetaData().getURL(); - final String avroDoc = - args.avroDoc() - .orElseGet(() -> String.format("Generate schema from JDBC ResultSet from %s", dbUrl)); return JdbcAvroSchema.createSchemaByReadingOneRow( - connection, - args.queryBuilderArgs(), - args.avroSchemaNamespace(), - args.avroSchemaName(), - avroDoc, - args.useAvroLogicalTypes()); - } - - public static Optional parseOptionalInputAvroSchemaFile(final String filename) - throws IOException { - - if (filename == null || filename.isEmpty()) { - return Optional.empty(); - } - - return Optional.of(parseInputAvroSchemaFile(filename)); + connection, args.queryBuilderArgs(), args.useAvroLogicalTypes(), provider); } public static Schema parseInputAvroSchemaFile(final String filename) throws IOException { final MatchResult.Metadata m = FileSystems.matchSingleFileSpec(filename); final InputStream inputStream = 
Channels.newInputStream(FileSystems.open(m.resourceId())); - return new Schema.Parser().parse(inputStream); + final Schema schema = new Schema.Parser().parse(inputStream); + + LOGGER.info("Parsed the provided schema from: [{}]", filename); + + return schema; } + } diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java index 291cea9d..738fbce4 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroSchema.java @@ -52,7 +52,6 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; -import java.util.Optional; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.slf4j.Logger; @@ -65,23 +64,15 @@ public class JdbcAvroSchema { public static Schema createSchemaByReadingOneRow( final Connection connection, final QueryBuilderArgs queryBuilderArgs, - final String avroSchemaNamespace, - final Optional schemaName, - final String avroDoc, - final boolean useLogicalTypes) + final boolean useLogicalTypes, + final AvroSchemaMetadataProvider provider) throws SQLException { LOGGER.debug("Creating Avro schema based on the first read row from the database"); try (Statement statement = connection.createStatement()) { final ResultSet resultSet = statement.executeQuery(queryBuilderArgs.sqlQueryWithLimitOne()); final Schema schema = - createAvroSchema( - resultSet, - avroSchemaNamespace, - connection.getMetaData().getURL(), - schemaName, - avroDoc, - useLogicalTypes); + createAvroSchema(resultSet, connection.getMetaData().getURL(), useLogicalTypes, provider); LOGGER.info("Schema created successfully. 
Generated schema: {}", schema.toString()); return schema; } @@ -89,25 +80,27 @@ public static Schema createSchemaByReadingOneRow( public static Schema createAvroSchema( final ResultSet resultSet, - final String avroSchemaNamespace, final String connectionUrl, - final Optional maybeSchemaName, - final String avroDoc, - final boolean useLogicalTypes) + final boolean useLogicalTypes, + final AvroSchemaMetadataProvider provider) throws SQLException { final ResultSetMetaData meta = resultSet.getMetaData(); final String tableName = getDatabaseTableName(meta); - final String schemaName = maybeSchemaName.orElse(tableName); + final String recordName = provider.avroSchemaName(tableName); + final String namespace = provider.avroSchemaNamespace(); + final String recordDoc = + provider.avroDoc( + String.format("Generate schema from JDBC ResultSet from %s", connectionUrl)); final SchemaBuilder.FieldAssembler builder = - SchemaBuilder.record(schemaName) - .namespace(avroSchemaNamespace) - .doc(avroDoc) + SchemaBuilder.record(recordName) + .namespace(namespace) + .doc(recordDoc) .prop("tableName", tableName) .prop("connectionUrl", connectionUrl) .fields(); - return createAvroFields(meta, builder, useLogicalTypes).endRecord(); + return createAvroFields(meta, builder, useLogicalTypes, provider).endRecord(); } static String getDatabaseTableName(final ResultSetMetaData meta) throws SQLException { @@ -125,25 +118,26 @@ static String getDatabaseTableName(final ResultSetMetaData meta) throws SQLExcep private static SchemaBuilder.FieldAssembler createAvroFields( final ResultSetMetaData meta, final SchemaBuilder.FieldAssembler builder, - final boolean useLogicalTypes) + final boolean useLogicalTypes, + final AvroSchemaMetadataProvider provider) throws SQLException { + final StringBuilder sqlMetadataLog = new StringBuilder("Sql ResultSet metadata: { "); + for (int i = 1; i <= meta.getColumnCount(); i++) { - final String columnName; - if (meta.getColumnName(i).isEmpty()) { - columnName = 
meta.getColumnLabel(i); - } else { - columnName = meta.getColumnName(i); - } - + final String columnName = getColumnName(meta, i); final int columnType = meta.getColumnType(i); final String typeName = JDBCType.valueOf(columnType).getName(); final String columnClassName = meta.getColumnClassName(i); final SchemaBuilder.FieldBuilder field = builder .name(normalizeForAvro(columnName)) - .doc(String.format("From sqlType %d %s (%s)", columnType, typeName, columnClassName)) + .doc( + provider.getFieldDoc( + columnName, + String.format( + "From sqlType %d %s (%s)", columnType, typeName, columnClassName))) .prop("columnName", columnName) .prop("sqlCode", String.valueOf(columnType)) .prop("typeName", typeName) @@ -162,10 +156,25 @@ private static SchemaBuilder.FieldAssembler createAvroFields( fieldSchemaBuilder); schemaFieldAssembler.endUnion().nullDefault(); + + sqlMetadataLog.append(String.format("#[%d] name[%s] type[%s], ", i, columnName, typeName)); } + + LOGGER.info(sqlMetadataLog.append(" }").toString()); + return builder; } + private static String getColumnName(ResultSetMetaData meta, int i) throws SQLException { + final String columnName; + if (meta.getColumnName(i).isEmpty()) { + columnName = meta.getColumnLabel(i); + } else { + columnName = meta.getColumnName(i); + } + return columnName; + } + /** * Creates Avro field schema based on JDBC MetaData * @@ -176,7 +185,6 @@ private static SchemaBuilder.FieldAssembler createAvroFields( *
  • {@link com.mysql.cj.MysqlType } *
  • {@link org.h2.value.TypeInfo } * - * */ private static SchemaBuilder.UnionAccumulator> setAvroColumnType( diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java b/dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java index 7d74421a..074e1c98 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.spotify.dbeam.args.JdbcExportArgs; +import com.spotify.dbeam.avro.AvroSchemaMetadataProvider; import com.spotify.dbeam.avro.BeamJdbcAvroSchema; import com.spotify.dbeam.avro.JdbcAvroIO; import com.spotify.dbeam.avro.JdbcAvroMetering; @@ -143,12 +144,30 @@ public void prepareExport() throws Exception { output, ".avro", generatedSchema, jdbcExportArgs.jdbcAvroOptions())); } - private Schema createSchema(final Connection connection) throws Exception { - if (this.jdbcExportArgs.inputAvroSchema().isPresent()) { - return this.jdbcExportArgs.inputAvroSchema().get(); - } else { - return BeamJdbcAvroSchema.createSchema(this.pipeline, jdbcExportArgs, connection); + Schema createSchema(final Connection connection) throws Exception { + + AvroSchemaMetadataProvider metadataProvider = + new AvroSchemaMetadataProvider( + getProvidedSchema(), + jdbcExportArgs.avroSchemaName().orElse(null), + jdbcExportArgs.avroSchemaNamespace(), + jdbcExportArgs.avroDoc().orElse(null)); + + Schema generatedSchema = + BeamJdbcAvroSchema.createSchema( + this.pipeline, jdbcExportArgs, connection, metadataProvider); + + return generatedSchema; + } + + Schema getProvidedSchema() throws IOException { + String avroSchemaFilePath = + pipelineOptions.as(JdbcExportPipelineOptions.class).getAvroSchemaFilePath(); + if (avroSchemaFilePath != null && !avroSchemaFilePath.isEmpty()) { + return BeamJdbcAvroSchema.parseInputAvroSchemaFile(avroSchemaFilePath); } + + return null; } public Pipeline getPipeline() { diff --git 
a/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportArgsFactory.java b/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportArgsFactory.java index 37e056e6..5238cb76 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportArgsFactory.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportArgsFactory.java @@ -26,7 +26,6 @@ import com.spotify.dbeam.args.JdbcConnectionArgs; import com.spotify.dbeam.args.JdbcExportArgs; import com.spotify.dbeam.args.QueryBuilderArgs; -import com.spotify.dbeam.avro.BeamJdbcAvroSchema; import com.spotify.dbeam.beam.BeamHelper; import java.io.IOException; import java.time.Duration; @@ -77,8 +76,7 @@ public static JdbcExportArgs fromPipelineOptions(final PipelineOptions options) Optional.ofNullable(exportOptions.getAvroSchemaName()), Optional.ofNullable(exportOptions.getAvroDoc()), exportOptions.isUseAvroLogicalTypes(), - Duration.parse(exportOptions.getExportTimeout()), - BeamJdbcAvroSchema.parseOptionalInputAvroSchemaFile(exportOptions.getAvroSchemaFilePath())); + Duration.parse(exportOptions.getExportTimeout())); } public static QueryBuilderArgs createQueryArgs(final JdbcExportPipelineOptions options) diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/JavaSqlHelper.java b/dbeam-core/src/test/java/com/spotify/dbeam/JavaSqlHelper.java new file mode 100644 index 00000000..e46599de --- /dev/null +++ b/dbeam-core/src/test/java/com/spotify/dbeam/JavaSqlHelper.java @@ -0,0 +1,61 @@ +/*- + * -\-\- + * DBeam Core + * -- + * Copyright (C) 2016 - 2018 Spotify AB + * -- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * -/-/- + */ + +package com.spotify.dbeam; + +import static org.mockito.Mockito.when; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Types; +import org.mockito.Mockito; + +public class JavaSqlHelper { + + public static class DummyResultSetMetaData { + + public static ResultSetMetaData create(int columnCount) throws SQLException { + + String[] tableNames = new String[columnCount]; + String[] columnNames = new String[columnCount]; + int[] columnTypes = new int[columnCount]; + for (int i = 0; i < columnCount; i++) { + tableNames[i] = "dummyTableName" + i; + columnNames[i] = "dummyColumnName" + i; + columnTypes[i] = Types.CHAR; + } + return create(columnCount, tableNames, columnNames, columnTypes); + } + + public static ResultSetMetaData create( + int columnCount, String[] tableNames, String[] columnNames, int[] columnTypes) + throws SQLException { + ResultSetMetaData rsMetaData = Mockito.mock(ResultSetMetaData.class); + when(rsMetaData.getColumnCount()).thenReturn(columnCount); + for (int i = 0; i < columnCount; i++) { + when(rsMetaData.getTableName(i + 1)).thenReturn(tableNames[i]); + when(rsMetaData.getColumnName(i + 1)).thenReturn(columnNames[i]); + when(rsMetaData.getColumnType(i + 1)).thenReturn(columnTypes[i]); + when(rsMetaData.getColumnClassName(i + 1)).thenReturn("dummy"); + } + return rsMetaData; + } + } +} diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/avro/AvroSchemaMetadataProviderTest.java b/dbeam-core/src/test/java/com/spotify/dbeam/avro/AvroSchemaMetadataProviderTest.java new file mode 
100644 index 00000000..0de9a074 --- /dev/null +++ b/dbeam-core/src/test/java/com/spotify/dbeam/avro/AvroSchemaMetadataProviderTest.java @@ -0,0 +1,177 @@ +/*- + * -\-\- + * DBeam Core + * -- + * Copyright (C) 2016 - 2019 Spotify AB + * -- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * -/-/- + */ + +package com.spotify.dbeam.avro; + +import java.util.Arrays; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class AvroSchemaMetadataProviderTest { + + @BeforeClass + public static void beforeAll() {} + + public static Schema createProvidedSchemaWithDoc( + final List fieldNames, final String schemaDoc) { + return createProvidedSchema(fieldNames, "providedSchemaName", schemaDoc); + } + + public static Schema createProvidedSchemaWithName( + final List fieldNames, final String schemaName) { + return createProvidedSchema(fieldNames, schemaName, "providedSchemaDoc"); + } + + public static Schema createProvidedSchema(final List fieldNames) { + return createProvidedSchema(fieldNames, "providedSchemaName", "providedSchemaDoc"); + } + + public static Schema createProvidedSchema( + final List fieldNames, final String schemaName, final String schemaDoc) { + final SchemaBuilder.FieldAssembler builder = + SchemaBuilder.record(schemaName) + .namespace("providedSchemaNamespace") + .doc(schemaDoc) + .fields(); + + fieldNames.forEach( + 
fieldName -> + builder.name(fieldName).doc("Doc for " + fieldName).type().stringType().noDefault()); + return builder.endRecord(); + } + + @Test + public void verifySimpleProvider() { + + AvroSchemaMetadataProvider provider = + new AvroSchemaMetadataProvider("schemaName", "schemaNamespace", "avroDoc"); + + Assert.assertEquals("schemaName", provider.avroSchemaName("")); + Assert.assertEquals("schemaNamespace", provider.avroSchemaNamespace()); + Assert.assertEquals("avroDoc", provider.avroDoc("")); + } + + @Test + public void verifyEmptyProvider() { + + AvroSchemaMetadataProvider provider = + new AvroSchemaMetadataProvider(null, "schemaNamespace", null); + + Assert.assertEquals("temp1", provider.avroSchemaName("temp1")); + Assert.assertEquals("schemaNamespace", provider.avroSchemaNamespace()); + Assert.assertEquals("temp2", provider.avroDoc("temp2")); + } + + @Test + public void verifyProviderWithSchema() { + + List fieldNames = Arrays.asList("field1", "field2"); + Schema providedSchema = createProvidedSchema(fieldNames); + + AvroSchemaMetadataProvider provider = + new AvroSchemaMetadataProvider(providedSchema, null, "schemaNamespace", null); + + Assert.assertEquals("providedSchemaName", provider.avroSchemaName("temp1")); + Assert.assertEquals("providedSchemaNamespace", provider.avroSchemaNamespace()); + Assert.assertEquals("providedSchemaDoc", provider.avroDoc("temp2")); + Assert.assertEquals("Doc for field1", provider.getFieldDoc("field1", "dummy")); + Assert.assertEquals("Doc for field2", provider.getFieldDoc("field2", "dummy")); + Assert.assertEquals("default", provider.getFieldDoc("field3", "default")); + } + + private static AvroSchemaMetadataProvider getAvroSchemaMetadataProviderWithDoc( + String providedSchemaDoc, String commandLineSchemaDoc) { + List fieldNames = Arrays.asList("field1", "field2"); + Schema providedSchema = createProvidedSchemaWithDoc(fieldNames, providedSchemaDoc); + + return new AvroSchemaMetadataProvider( + providedSchema, null, 
"schemaNamespace", commandLineSchemaDoc); + } + + @Test + public void verifyProviderWithSchemaAndNoSchemaDocUsesCommandLineDoc() { + + final String providedSchemaDoc = null; // not provided + final String commandLineSchemaDoc = "Custom Doc"; + final String generatedSchemaDoc = "GeneratedDoc"; + final AvroSchemaMetadataProvider provider = + getAvroSchemaMetadataProviderWithDoc(providedSchemaDoc, commandLineSchemaDoc); + + Assert.assertEquals(commandLineSchemaDoc, provider.avroDoc(generatedSchemaDoc)); + } + + @Test + public void verifyProviderWithSchemaAndEmptySchemaDocUsesSchemaDoc() { + + final String providedSchemaDoc = ""; // empty + final String commandLineSchemaDoc = "Custom Doc"; + final String generatedSchemaDoc = "GeneratedDoc"; + final AvroSchemaMetadataProvider provider = + getAvroSchemaMetadataProviderWithDoc(providedSchemaDoc, commandLineSchemaDoc); + + Assert.assertEquals(providedSchemaDoc, provider.avroDoc(generatedSchemaDoc)); + } + + @Test + public void verifyProviderWithSchemaAndNoSchemaDocAndNoCommandLineDocUsesGeneratedDoc() { + + final String providedSchemaDoc = null; // not provided + final String commandLineSchemaDoc = null; + final String generatedSchemaDoc = "GeneratedDoc"; + final AvroSchemaMetadataProvider provider = + getAvroSchemaMetadataProviderWithDoc(providedSchemaDoc, commandLineSchemaDoc); + + Assert.assertEquals(generatedSchemaDoc, provider.avroDoc(generatedSchemaDoc)); + } + + private static AvroSchemaMetadataProvider getAvroSchemaMetadataProviderWithName( + String providedSchemaName, String commandLineSchemaName) { + List fieldNames = Arrays.asList("field1", "field2"); + Schema providedSchema = createProvidedSchemaWithName(fieldNames, providedSchemaName); + + return new AvroSchemaMetadataProvider( + providedSchema, commandLineSchemaName, "schemaNamespace", "DummyDoc"); + } + + @Test(expected = NullPointerException.class) + public void verifyProviderWithSchemaAndNoSchemaNameThrowsException() { + + final String providedSchemaName = 
null; // not provided + final String commandLineSchemaName = "Custom Name"; + final String generatedSchemaName = "GeneratedName"; + getAvroSchemaMetadataProviderWithName(providedSchemaName, commandLineSchemaName); + } + + @Test + public void verifyProviderWithSchemaAndNoSchemaNameAndNoCommandLineNameUsesGeneratedName() { + + final String providedSchemaName = "ProvidedName"; + final String commandLineSchemaName = "Custom Name"; + final String generatedSchemaName = "GeneratedName"; + final AvroSchemaMetadataProvider provider = + getAvroSchemaMetadataProviderWithName(providedSchemaName, commandLineSchemaName); + + Assert.assertEquals(providedSchemaName, provider.avroSchemaName(generatedSchemaName)); + } +} diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroRecordTest.java b/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroRecordTest.java index 2004b4fd..bad979cd 100644 --- a/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroRecordTest.java +++ b/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroRecordTest.java @@ -20,6 +20,8 @@ package com.spotify.dbeam.avro; +import static com.spotify.dbeam.avro.AvroSchemaMetadataProviderTest.createProvidedSchema; +import static com.spotify.dbeam.jobs.InputAvroSchemaTest.getMockConnection; import static org.mockito.Mockito.when; import com.google.common.collect.Lists; @@ -30,10 +32,12 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.sql.Connection; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Timestamp; +import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -52,7 +56,7 @@ public class JdbcAvroRecordTest { - private static String CONNECTION_URL = + private static final String CONNECTION_URL = "jdbc:h2:mem:test;MODE=PostgreSQL;DATABASE_TO_UPPER=false;DB_CLOSE_DELAY=-1"; @BeforeClass @@ -63,14 +67,19 @@ public static 
void beforeAll() throws SQLException, ClassNotFoundException { @Test public void shouldCreateSchema() throws ClassNotFoundException, SQLException { final int fieldCount = 12; + + AvroSchemaMetadataProvider provider = + new AvroSchemaMetadataProvider( + null, + "dbeam_generated", + "Generate schema from JDBC ResultSet from COFFEES jdbc:h2:mem:test"); + final Schema actual = JdbcAvroSchema.createSchemaByReadingOneRow( DbTestHelper.createConnection(CONNECTION_URL), QueryBuilderArgs.create("COFFEES"), - "dbeam_generated", - Optional.empty(), - "Generate schema from JDBC ResultSet from COFFEES jdbc:h2:mem:test", - false); + false, + provider); Assert.assertNotNull(actual); Assert.assertEquals("dbeam_generated", actual.getNamespace()); @@ -129,14 +138,19 @@ public void shouldCreateSchema() throws ClassNotFoundException, SQLException { @Test public void shouldCreateSchemaWithLogicalTypes() throws ClassNotFoundException, SQLException { final int fieldCount = 12; + + AvroSchemaMetadataProvider provider = + new AvroSchemaMetadataProvider( + "dbeam_generated", + null, + "Generate schema from JDBC ResultSet from COFFEES jdbc:h2:mem:test"); + final Schema actual = JdbcAvroSchema.createSchemaByReadingOneRow( DbTestHelper.createConnection(CONNECTION_URL), QueryBuilderArgs.create("COFFEES"), - "dbeam_generated", - Optional.empty(), - "Generate schema from JDBC ResultSet from COFFEES jdbc:h2:mem:test", - true); + true, + provider); Assert.assertEquals(fieldCount, actual.getFields().size()); Assert.assertEquals( @@ -146,18 +160,50 @@ public void shouldCreateSchemaWithLogicalTypes() throws ClassNotFoundException, @Test public void shouldCreateSchemaWithCustomSchemaName() throws ClassNotFoundException, SQLException { + + AvroSchemaMetadataProvider provider = + new AvroSchemaMetadataProvider( + "CustomSchemaName", + "dbeam_generated", + "Generate schema from JDBC ResultSet from COFFEES jdbc:h2:mem:test"); + final Schema actual = JdbcAvroSchema.createSchemaByReadingOneRow( 
DbTestHelper.createConnection(CONNECTION_URL), QueryBuilderArgs.create("COFFEES"), - "dbeam_generated", - Optional.of("CustomSchemaName"), - "Generate schema from JDBC ResultSet from COFFEES jdbc:h2:mem:test", - false); + false, + provider); Assert.assertEquals("CustomSchemaName", actual.getName()); } + @Test + public void shouldCreateSchemaWithUserProvidedSchemaAndBackupDocForMissingField() + throws SQLException { + + List fieldNames = Arrays.asList("dummyColumnName0", "dummyColumnName1"); + Schema providedSchema = createProvidedSchema(fieldNames); + + AvroSchemaMetadataProvider provider = + new AvroSchemaMetadataProvider( + providedSchema, "Default schema name", "Default schema namespace", "Default avro doc"); + + Connection mockConnection = getMockConnection(3); + final Schema actualSchema = + JdbcAvroSchema.createSchemaByReadingOneRow( + mockConnection, QueryBuilderArgs.create("dummy_table"), false, provider); + + // value provided by a user-supplied schema + Assert.assertEquals( + "Doc for dummyColumnName0", actualSchema.getField("dummyColumnName0").doc()); + // value provided by a user-supplied schema + Assert.assertEquals( + "Doc for dummyColumnName1", actualSchema.getField("dummyColumnName1").doc()); + // back-up value generated by default + Assert.assertEquals( + "From sqlType 1 CHAR (dummy)", actualSchema.getField("dummyColumnName2").doc()); + } + @Test public void shouldEncodeResultSetToValidAvro() throws ClassNotFoundException, SQLException, IOException { @@ -165,9 +211,11 @@ public void shouldEncodeResultSetToValidAvro() DbTestHelper.createConnection(CONNECTION_URL) .createStatement() .executeQuery("SELECT * FROM COFFEES"); - final Schema schema = - JdbcAvroSchema.createAvroSchema( - rs, "dbeam_generated", "connection", Optional.empty(), "doc", false); + + AvroSchemaMetadataProvider provider = + new AvroSchemaMetadataProvider("dbeam_generated", null, "doc"); + + final Schema schema = JdbcAvroSchema.createAvroSchema(rs, "connection", false, provider); 
final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(rs); final DataFileWriter dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<>(schema)); diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroSchemaTest.java b/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroSchemaTest.java index afa9d2fe..cc76f2c8 100644 --- a/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroSchemaTest.java +++ b/dbeam-core/src/test/java/com/spotify/dbeam/avro/JdbcAvroSchemaTest.java @@ -26,7 +26,6 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Types; -import java.util.Optional; import org.apache.avro.Schema; import org.junit.Assert; import org.junit.Test; @@ -151,9 +150,14 @@ public void shouldDefaultConversionToStringType() throws SQLException { private Schema createAvroSchemaForSingleField( final ResultSet resultSet, final boolean useLogicalTypes) throws SQLException { + final String avroSchemaNamespace = "namespace1"; + final String connectionUrl = "url1"; + final String schemaName = null; + final String avroDoc = "doc1"; + AvroSchemaMetadataProvider provider = + new AvroSchemaMetadataProvider(avroSchemaNamespace, schemaName, avroDoc); Schema avroSchema = - JdbcAvroSchema.createAvroSchema( - resultSet, "namespace1", "url1", Optional.empty(), "doc1", useLogicalTypes); + JdbcAvroSchema.createAvroSchema(resultSet, connectionUrl, useLogicalTypes, provider); return avroSchema.getField("column1").schema().getTypes().get(COLUMN_NUM); } diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/options/InputAvroSchemaTest.java b/dbeam-core/src/test/java/com/spotify/dbeam/jobs/InputAvroSchemaTest.java similarity index 69% rename from dbeam-core/src/test/java/com/spotify/dbeam/options/InputAvroSchemaTest.java rename to dbeam-core/src/test/java/com/spotify/dbeam/jobs/InputAvroSchemaTest.java index 1b03dcc9..213966cf 100644 --- a/dbeam-core/src/test/java/com/spotify/dbeam/options/InputAvroSchemaTest.java +++ 
b/dbeam-core/src/test/java/com/spotify/dbeam/jobs/InputAvroSchemaTest.java @@ -18,11 +18,18 @@ * -/-/- */ -package com.spotify.dbeam.options; +package com.spotify.dbeam.jobs; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; + +import com.spotify.dbeam.JavaSqlHelper; +import com.spotify.dbeam.args.JdbcAvroArgs; +import com.spotify.dbeam.args.JdbcExportArgs; import com.spotify.dbeam.args.QueryBuilderArgs; import com.spotify.dbeam.args.QueryBuilderArgsTest; import com.spotify.dbeam.avro.BeamJdbcAvroSchema; +import com.spotify.dbeam.options.JdbcExportPipelineOptions; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -30,18 +37,27 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Optional; import org.apache.avro.Schema; import org.apache.avro.SchemaParseException; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.TestPipeline; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; +import org.mockito.Mockito; public class InputAvroSchemaTest { @@ -86,6 +102,11 @@ private static File createTestAvroSchemaFile(final String jsonSchema) throws IOE return newTempFile; } + @AfterClass + public static void afterAll() throws IOException { + Files.delete(avroSchemaFile.toPath()); + } + private Schema createRecordSchema( final String recordName, final String recordDoc, @@ -104,11 +125,6 @@ private Schema createRecordSchema( return inputSchema; } - @AfterClass - 
public static void afterAll() throws IOException { - Files.delete(avroSchemaFile.toPath()); - } - @Test public void checkReadAvroSchema() throws IOException { final JdbcExportPipelineOptions options = @@ -125,18 +141,64 @@ public void checkReadAvroSchema() throws IOException { } @Test - @Ignore - public void checkFullPath() { - // TODO - // Check provide input string to args and verify final schema - - final String[] fieldNames = null; - final String[] fieldDocs = null; - final String recordName = "COFFEE"; - final String recordDoc = "Input record doc"; - final String recordNamespace = "Input record namespace"; - final Schema inputSchema = - createRecordSchema(recordName, recordDoc, recordNamespace, fieldNames, fieldDocs); + public void checkSuppliedAvroSchemaValidationPassed() throws Exception { + + int columnCountInResultSet = 2; + // Since our supplied schema also has 2 fields, validation should pass. + + Schema schema = createAndVerifySuppliedSchema(columnCountInResultSet); + + Assert.assertEquals(2, schema.getFields().size()); + } + + private Schema createAndVerifySuppliedSchema(int columnCount) throws Exception { + final JdbcExportPipelineOptions options = + PipelineOptionsFactory.create().as(JdbcExportPipelineOptions.class); + options.setAvroSchemaFilePath(avroSchemaFilePathStr); + + PipelineOptions pipelineOptions = options; + final Pipeline pipeline = TestPipeline.create(); + + final JdbcExportArgs jdbcExportArgs = + JdbcExportArgs.create( + Mockito.mock(JdbcAvroArgs.class), + QueryBuilderArgs.create("dummyTable"), + "avroSchemaNamespace", + Optional.empty(), + Optional.empty(), + false, + Duration.ofSeconds(30)); + + // mocks set-up + Connection connection = getMockConnection(columnCount); + + final String output = "output"; + final boolean dataOnly = true; + final long minRows = -1L; + + final JdbcAvroJob job = + new JdbcAvroJob(pipelineOptions, pipeline, jdbcExportArgs, output, dataOnly, 
minRows); + + return job.createSchema(connection); + } + + public static Connection getMockConnection(int columnCount) throws SQLException { + DatabaseMetaData meta = Mockito.mock(DatabaseMetaData.class); + when(meta.getURL()).thenReturn("dummyUrl"); + + ResultSetMetaData rsMetaData = JavaSqlHelper.DummyResultSetMetaData.create(columnCount); + + ResultSet resultSet = Mockito.mock(ResultSet.class); + when(resultSet.getMetaData()).thenReturn(rsMetaData); + + Statement statement = Mockito.mock(Statement.class); + when(statement.executeQuery(anyString())).thenReturn(resultSet); + + Connection connection = Mockito.mock(Connection.class); + when(connection.getMetaData()).thenReturn(meta); + when(connection.createStatement()).thenReturn(statement); + return connection; } @Test @@ -147,8 +209,6 @@ public void checkReadAvroSchemaWithEmptyParameter() throws IOException { options.setAvroSchemaFilePath(path); Assert.assertEquals(path, options.getAvroSchemaFilePath()); - Assert.assertEquals( - Optional.empty(), BeamJdbcAvroSchema.parseOptionalInputAvroSchemaFile(path)); } @Test @@ -159,8 +219,6 @@ public void checkReadAvroSchemaWithNullParameter() throws IOException { options.setAvroSchemaFilePath(path); Assert.assertEquals(path, options.getAvroSchemaFilePath()); - Assert.assertEquals( - Optional.empty(), BeamJdbcAvroSchema.parseOptionalInputAvroSchemaFile(path)); } @Test(expected = SchemaParseException.class) @@ -173,7 +231,7 @@ public void checkReadAvroSchemaWithInvalidFormat() throws IOException { options.setAvroSchemaFilePath(path); Assert.assertEquals(path, options.getAvroSchemaFilePath()); - BeamJdbcAvroSchema.parseOptionalInputAvroSchemaFile(path); + BeamJdbcAvroSchema.parseInputAvroSchemaFile(path); } @Test(expected = FileNotFoundException.class) @@ -184,7 +242,7 @@ public void checkReadAvroSchemaWithNonExistentFile() throws IOException { options.setAvroSchemaFilePath(path); Assert.assertEquals(path, options.getAvroSchemaFilePath()); - 
BeamJdbcAvroSchema.parseOptionalInputAvroSchemaFile(path); + BeamJdbcAvroSchema.parseInputAvroSchemaFile(path); } @Test @@ -206,14 +264,4 @@ public void checkEmptyCommandLineArgIsParsedAsOptions() throws IOException, SQLE Assert.assertNull(options.getAvroSchemaFilePath()); } - - private QueryBuilderArgs pareOptions(String cmdLineArgs) throws IOException { - PipelineOptionsFactory.register(JdbcExportPipelineOptions.class); - final JdbcExportPipelineOptions opts = - PipelineOptionsFactory.fromArgs(cmdLineArgs.split(" ")) - .withValidation() - .create() - .as(JdbcExportPipelineOptions.class); - return JdbcExportArgsFactory.createQueryArgs(opts); - } } diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/jobs/PsqlReplicationCheckTest.java b/dbeam-core/src/test/java/com/spotify/dbeam/jobs/PsqlReplicationCheckTest.java index b5491463..9a659b88 100644 --- a/dbeam-core/src/test/java/com/spotify/dbeam/jobs/PsqlReplicationCheckTest.java +++ b/dbeam-core/src/test/java/com/spotify/dbeam/jobs/PsqlReplicationCheckTest.java @@ -44,8 +44,7 @@ private static JdbcExportArgs createArgs(String url, QueryBuilderArgs queryBuild Optional.empty(), Optional.empty(), false, - Duration.ZERO, - Optional.empty()); + Duration.ZERO); } @Test(expected = IllegalArgumentException.class) diff --git a/docs/provided_schema.md b/docs/provided_schema.md new file mode 100644 index 00000000..861a238c --- /dev/null +++ b/docs/provided_schema.md @@ -0,0 +1,77 @@ +# Output Avro schema + +## Generated Avro schema + +`dbeam` generates an Avro schema using an SQL query result. +There are a number of settings to configure a few aspects of the resulting schema. + * `--avroDoc=` + * `--avroSchemaName=` + * `--avroSchemaNamespace=` + * `--useAvroLogicalTypes=` + +However, there is no way to supply your own fields' `doc` properties. +They will be auto-generated. + +## Custom provided schema +A user can provide their own Avro schema using a parameter + * `--avroSchemaFilePath`. + +E.g. 
`--avroSchemaFilePath=/temp/my_record.avsc` + +The main purpose of using this approach is to supply your own values for Avro record fields' `doc` attribute for a generated Avro schema used in output Avro files. + +### Fields look-up +Avro record fields in the provided schema are looked up using field `name`, so it is important that the fields' names are the same as in the generated schema. +Thus, it is recommended to copy the generated schema, update `doc` fields and use it as `--avroSchemaFilePath` input. + +### A generated Avro schema example +``` +{ + "type" : "record", + "name" : "table_numbers", + "namespace" : "dbeam_generated", + "doc" : "Generate schema from JDBC ResultSet from jdbc:mysql://localhost/dev_db", + "fields" : [ { + "name" : "id", + "type" : [ "null", "string" ], + "doc" : "From sqlType -5 BIGINT", + "default" : null, + "typeName" : "BIGINT", + "sqlCode" : "-5", + "columnName" : "id" + }, { + "name" : "seq_num", + "type" : [ "null", "int" ], + "doc" : "From sqlType 4 INTEGER", + "default" : null, + "typeName" : "INTEGER", + "sqlCode" : "4", + "columnName" : "seq_num" + } ], + "connectionUrl" : "jdbc:mysql://localhost/dev_db", + "tableName" : "table_numbers" +} +``` +### A user-provided schema example +``` +{ + "type" : "record", + "name" : "Your own record name", + "namespace" : "Your own namespace", + "doc" : "Your own description", + "fields" : [ + { + "name" : "id", + "type" : [ "null", "string" ], + "default" : null, + "doc" : "Your own field description here. #DATA_LEVEL7" + }, + { + "name" : "seq_num", + "type" : [ "null", "int" ], + "default" : null, + "doc" : "Your own field description here. #DATA_LEVEL8" + } + ] +} +```