From c79887930658043e4d354d836941f457a8135035 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Anz=CC=8Ce=20Kravanja?=
Date: Wed, 8 Feb 2023 14:06:22 -0800
Subject: [PATCH] Added the ability to select a partition type (HOUR, DAY,
 MONTH, YEAR) for each stream, in addition to the existing partition field
 and clustering fields.

---
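Note on the new option: the partition_type value read from the table config is
compared directly against bigquery.table.TimePartitioningType members, which
works because those members are plain strings ("DAY", "HOUR", "MONTH", "YEAR").
A minimal illustrative sketch of how the option flows into the load job config
(not part of the patch itself; the config values here are only examples):

    from google.cloud import bigquery

    table_config = {"partition_field": "updated_at", "partition_type": "YEAR"}

    # TimePartitioningType members are plain strings, so the raw config value
    # can be validated against them and passed straight to TimePartitioning.
    partition_type = table_config.get("partition_type", bigquery.table.TimePartitioningType.DAY)
    assert partition_type in ("DAY", "HOUR", "MONTH", "YEAR")

    load_config = bigquery.LoadJobConfig()
    load_config.time_partitioning = bigquery.table.TimePartitioning(
        type_=partition_type,                    # e.g. "YEAR"
        field=table_config["partition_field"],   # e.g. "updated_at"
    )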
 README.md                                     |  1 +
 target_bigquery/processhandler.py             | 30 ++++++++++++-------
 ...ream_table_config_with_partition_type.json | 11 +++++++
 tests/test.py                                 |  4 +--
 tests/test_simplestream.py                    | 30 ++++++++++++++++++-
 5 files changed, 63 insertions(+), 13 deletions(-)
 create mode 100644 tests/rsc/config/simple_stream_table_config_with_partition_type.json

diff --git a/README.md b/README.md
index f84d7cd..6823b3f 100644
--- a/README.md
+++ b/README.md
@@ -323,6 +323,7 @@ To configure partitioning and clustering in BigQuery destination tables, we crea
 "streams": {
   "charges": {
     "partition_field": "updated_at",
+    "partition_type": "DAY", // optional, DAY by default, must be one of [DAY, HOUR, MONTH, YEAR]
     "cluster_fields": ["type", "status", "customer_id", "transaction_id"]
   }
 }
diff --git a/target_bigquery/processhandler.py b/target_bigquery/processhandler.py
index 1f9b5d2..e9c3f95 100644
--- a/target_bigquery/processhandler.py
+++ b/target_bigquery/processhandler.py
@@ -179,17 +179,17 @@ def handle_record_message(self, msg):
         schema = self.schemas[stream]
         bq_schema = self.bq_schema_dicts[stream]
 
-        nr = cleanup_record(schema, msg.record, force_fields=self.table_configs.get(msg.stream, {}).get("force_fields", {}))
+        nr = cleanup_record(schema, msg.record,
+                            force_fields=self.table_configs.get(msg.stream, {}).get("force_fields", {}))
 
         try:
             nr = format_record_to_schema(nr, self.bq_schema_dicts[stream])
         except Exception as e:
-            extra={"record" : msg.record, "schema": schema, "bq_schema": bq_schema}
-            self.logger.critical(f"Cannot format a record for stream {msg.stream} to its corresponding BigQuery schema. Details: {extra}")
+            extra = {"record": msg.record, "schema": schema, "bq_schema": bq_schema}
+            self.logger.critical(
+                f"Cannot format a record for stream {msg.stream} to its corresponding BigQuery schema. Details: {extra}")
             raise e
-
-
         # schema validation may fail if data doesn't match schema in terms of data types
         # in this case, we validate schema again on data which has been forced to match schema
         # nr is based on msg.record, but all data from msg.record has been forced to match schema
@@ -226,8 +226,9 @@ def primary_key_condition(self, stream):
         if len(keys) < 1:
             raise Exception(f"No primary keys specified from the tap and Incremental option selected")
         return " and ".join(keys)
-        #TODO: test it with multiple ids (an array of ids, if there are multiple key_properties in JSON schema)
-        #TODO: test it with dupe ids in the data
+
+    # TODO: test it with multiple ids (an array of ids, if there are multiple key_properties in JSON schema)
+    # TODO: test it with dupe ids in the data
 
     def _do_temp_table_based_load(self, rows):
         assert isinstance(rows, dict)
@@ -268,13 +269,14 @@ def _do_temp_table_based_load(self, rows):
             incremental_success = False
             if self.incremental:
                 self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by INCREMENTAL")
-                self.logger.warning(f"INCREMENTAL replication method (MERGE SQL statement) is not recommended. It might result in loss of production data, because historical records get updated during the sync operation. Instead, we recommend using the APPEND replication method, which will preserve historical data.")
+                self.logger.warning(
+                    f"INCREMENTAL replication method (MERGE SQL statement) is not recommended. It might result in loss of production data, because historical records get updated during the sync operation. Instead, we recommend using the APPEND replication method, which will preserve historical data.")
                 table_id = f"{self.project_id}.{self.dataset.dataset_id}.{self.tables[stream]}"
                 try:
                     self.client.get_table(table_id)
                     column_names = [x.name for x in self.bq_schemas[stream]]
-                    query ="""MERGE `{table}` t
+                    query = """MERGE `{table}` t
                     USING `{temp_table}` s
                     ON {primary_key_condition}
                     WHEN MATCHED THEN
@@ -348,9 +350,17 @@ def _load_to_bq(self,
         """
         logger = self.logger
         partition_field = table_config.get("partition_field", None)
+        partition_type = table_config.get("partition_type", bigquery.table.TimePartitioningType.DAY)
         cluster_fields = table_config.get("cluster_fields", None)
         force_fields = table_config.get("force_fields", {})
 
+        if partition_type not in [bigquery.table.TimePartitioningType.DAY,
+                                  bigquery.table.TimePartitioningType.HOUR,
+                                  bigquery.table.TimePartitioningType.MONTH,
+                                  bigquery.table.TimePartitioningType.YEAR]:
+            raise NotImplementedError(
+                f"Table '{dataset.dataset_id}.{table_name}' is set to partition by '{partition_type}', which is not supported. Use one of [DAY, HOUR, MONTH, YEAR]; if omitted, DAY is used by default when partition_field is set.")
+
         # schema_simplified = simplify(table_schema)
         # schema = build_schema(schema_simplified, key_properties=key_props, add_metadata=metadata_columns,
         #                       force_fields=force_fields)
@@ -361,7 +371,7 @@ def _load_to_bq(self,
         # partitioning
         if partition_field:
             load_config.time_partitioning = bigquery.table.TimePartitioning(
-                type_=bigquery.table.TimePartitioningType.DAY,
+                type_=partition_type,
                 field=partition_field
             )
diff --git a/tests/rsc/config/simple_stream_table_config_with_partition_type.json b/tests/rsc/config/simple_stream_table_config_with_partition_type.json
new file mode 100644
index 0000000..4afe312
--- /dev/null
+++ b/tests/rsc/config/simple_stream_table_config_with_partition_type.json
@@ -0,0 +1,11 @@
+{
+  "streams": {
+    "simple_stream": {
+      "partition_field": "date",
+      "partition_type": "YEAR",
+      "cluster_fields": [
+        "name"
+      ]
+    }
+  }
+}
diff --git a/tests/test.py b/tests/test.py
index ce43556..947660b 100644
--- a/tests/test.py
+++ b/tests/test.py
@@ -8,5 +8,5 @@
 
 runner = unittest.TextTestRunner()
 
-if __name__ == "__main__": # this line prevents tests from running twice
-    runner.run(suite)
\ No newline at end of file
+if __name__ == "__main__":  # this line prevents tests from running twice
+    runner.run(suite)
diff --git a/tests/test_simplestream.py b/tests/test_simplestream.py
index a948bee..42d565f 100644
--- a/tests/test_simplestream.py
+++ b/tests/test_simplestream.py
@@ -120,7 +120,35 @@ def test_simple_stream_with_tables_config(self):
         table = self.client.get_table("{}.simple_stream".format(self.dataset_id))
         self.assertEqual(3, table.num_rows, msg="Number of rows mismatch")
         self.assertIsNotNone(table.clustering_fields)
-        self.assertIsNotNone(table.partitioning_type)
+        self.assertTrue(table.time_partitioning.type_ == 'DAY')
+
+    def test_simple_stream_with_tables_config_with_partition_type(self):
+        from target_bigquery import main
+
+        self.set_cli_args(
+            stdin=os.path.join(os.path.join(
+                os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'tests'), 'rsc'),
+                'data'), 'simple_stream.json'),
+            config=os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'),
+                                'target-config.json'),
+            tables=os.path.join(
+                os.path.join(os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
+                                                       'tests'), 'rsc'), 'config'), 'simple_stream_table_config_with_partition_type.json'),
+            processhandler="load-job"
+        )
+
+        ret = main()
+        state = self.get_state()[-1]
+        print(state)
+
+        self.assertEqual(ret, 0, msg="Exit code is not 0!")
+        self.assertDictEqual(state, {"bookmarks": {"simple_stream": {"timestamp": "2020-01-11T00:00:00.000000Z"}}})
+
+        table = self.client.get_table("{}.simple_stream".format(self.dataset_id))
+        self.assertEqual(3, table.num_rows, msg="Number of rows mismatch")
+        self.assertIsNotNone(table.clustering_fields)
+        self.assertTrue(table.time_partitioning.type_ == 'YEAR')
+
     def test_simple_stream_with_tables_config_passed_inside_target_config_file(self):
         """