Commit
Merge remote-tracking branch 'origin/feature/issue39' into staging
RuslanBergenov committed Mar 21, 2023
2 parents afed95b + c798879 commit 54a745a
Showing 5 changed files with 63 additions and 13 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -323,6 +323,7 @@ To configure partitioning and clustering in BigQuery destination tables, we crea
"streams": {
"charges": {
"partition_field": "updated_at",
"partition_type": "DAY", // optional, DAY by default, must be one of [DAY, HOUR, MONTH, YEAR]
"cluster_fields": ["type", "status", "customer_id", "transaction_id"]
}
}
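For reference, the new `partition_type` key sits alongside the existing `partition_field` and `cluster_fields` keys in the per-stream tables config. A minimal sketch of building and writing such a config from Python, assuming a hypothetical output file name (`tables.json`) and reusing the stream from the README example above:

```python
import json

# Hypothetical helper: assemble a per-stream tables config that requests
# monthly partitioning; mirrors the README snippet above.
tables_config = {
    "streams": {
        "charges": {
            "partition_field": "updated_at",
            "partition_type": "MONTH",  # one of DAY, HOUR, MONTH, YEAR; DAY when omitted
            "cluster_fields": ["type", "status", "customer_id", "transaction_id"],
        }
    }
}

with open("tables.json", "w") as f:  # output file name is an assumption
    json.dump(tables_config, f, indent=2)
```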
30 changes: 20 additions & 10 deletions target_bigquery/processhandler.py
@@ -179,17 +179,17 @@ def handle_record_message(self, msg):

schema = self.schemas[stream]
bq_schema = self.bq_schema_dicts[stream]
nr = cleanup_record(schema, msg.record, force_fields=self.table_configs.get(msg.stream, {}).get("force_fields", {}))
nr = cleanup_record(schema, msg.record,
force_fields=self.table_configs.get(msg.stream, {}).get("force_fields", {}))

try:
nr = format_record_to_schema(nr, self.bq_schema_dicts[stream])
except Exception as e:
extra={"record" : msg.record, "schema": schema, "bq_schema": bq_schema}
self.logger.critical(f"Cannot format a record for stream {msg.stream} to its corresponding BigQuery schema. Details: {extra}")
extra = {"record": msg.record, "schema": schema, "bq_schema": bq_schema}
self.logger.critical(
f"Cannot format a record for stream {msg.stream} to its corresponding BigQuery schema. Details: {extra}")
raise e



# schema validation may fail if data doesn't match schema in terms of data types
# in this case, we validate schema again on data which has been forced to match schema
# nr is based on msg.record, but all data from msg.record has been forced to match schema
@@ -226,8 +226,9 @@ def primary_key_condition(self, stream):
if len(keys) < 1:
raise Exception(f"No primary keys specified from the tap and Incremental option selected")
return " and ".join(keys)
#TODO: test it with multiple ids (an array of ids, if there are multiple key_properties in JSON schema)
#TODO: test it with dupe ids in the data

# TODO: test it with multiple ids (an array of ids, if there are multiple key_properties in JSON schema)
# TODO: test it with dupe ids in the data
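The visible part of `primary_key_condition` joins one equality expression per key with `and`, producing the `ON` clause used by the MERGE statement further down. A standalone sketch of the same idea, under the assumption that each key expands to a `t.<key> = s.<key>` comparison (the lines that build `keys` are collapsed in this view):

```python
def primary_key_condition_sketch(key_properties):
    """Build a MERGE ON clause from Singer key_properties.

    The "t.<key> = s.<key>" format is an assumption; the actual
    construction of `keys` is not shown in this diff.
    """
    keys = [f"t.{k} = s.{k}" for k in key_properties]
    if len(keys) < 1:
        raise Exception("No primary keys specified from the tap and Incremental option selected")
    return " and ".join(keys)

# Example: primary_key_condition_sketch(["customer_id", "transaction_id"])
# -> "t.customer_id = s.customer_id and t.transaction_id = s.transaction_id"
```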

def _do_temp_table_based_load(self, rows):
assert isinstance(rows, dict)
@@ -268,13 +269,14 @@ def _do_temp_table_based_load(self, rows):
incremental_success = False
if self.incremental:
self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by INCREMENTAL")
self.logger.warning(f"INCREMENTAL replication method (MERGE SQL statement) is not recommended. It might result in loss of production data, because historical records get updated during the sync operation. Instead, we recommend using the APPEND replication method, which will preserve historical data.")
self.logger.warning(
f"INCREMENTAL replication method (MERGE SQL statement) is not recommended. It might result in loss of production data, because historical records get updated during the sync operation. Instead, we recommend using the APPEND replication method, which will preserve historical data.")
table_id = f"{self.project_id}.{self.dataset.dataset_id}.{self.tables[stream]}"
try:
self.client.get_table(table_id)
column_names = [x.name for x in self.bq_schemas[stream]]

query ="""MERGE `{table}` t
query = """MERGE `{table}` t
USING `{temp_table}` s
ON {primary_key_condition}
WHEN MATCHED THEN
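The MERGE template is cut off at this point in the diff view. Purely as an illustration of the upsert pattern it implements (not the repository's exact SQL), a temp-table MERGE of this shape updates matched rows and inserts unmatched ones; all table and column names below are placeholders:

```python
# Illustrative only: the real template's column handling is not visible here.
merge_sql = """MERGE `{table}` t
USING `{temp_table}` s
ON {primary_key_condition}
WHEN MATCHED THEN
    UPDATE SET t.status = s.status, t.updated_at = s.updated_at
WHEN NOT MATCHED THEN
    INSERT (id, status, updated_at) VALUES (s.id, s.status, s.updated_at)
""".format(
    table="my-project.my_dataset.charges",            # placeholder
    temp_table="my-project.my_dataset.charges_tmp",   # placeholder
    primary_key_condition="t.id = s.id",               # from primary_key_condition()
)
```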
@@ -348,9 +350,17 @@ def _load_to_bq(self,
"""
logger = self.logger
partition_field = table_config.get("partition_field", None)
partition_type = table_config.get("partition_type", bigquery.table.TimePartitioningType.DAY)
cluster_fields = table_config.get("cluster_fields", None)
force_fields = table_config.get("force_fields", {})

if partition_type not in [bigquery.table.TimePartitioningType.DAY,
bigquery.table.TimePartitioningType.HOUR,
bigquery.table.TimePartitioningType.MONTH,
bigquery.table.TimePartitioningType.YEAR]:
raise NotImplementedError(
f"Table name '{dataset.dataset_id}.{table_name}' was set to partition by '{partition_type}' which is not supported! Use one of [DAY, HOUR, MONTH, YEAR]. If empty, DAY will be used as default if partition_field is set.")

# schema_simplified = simplify(table_schema)
# schema = build_schema(schema_simplified, key_properties=key_props, add_metadata=metadata_columns,
# force_fields=force_fields)
@@ -361,7 +371,7 @@
# partitioning
if partition_field:
load_config.time_partitioning = bigquery.table.TimePartitioning(
type_=bigquery.table.TimePartitioningType.DAY,
type_=partition_type,
field=partition_field
)
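Taken together, the validated `partition_type` feeds straight into the load job configuration. A minimal standalone sketch of the same google-cloud-bigquery calls, with file, dataset, and field names assumed for illustration:

```python
from google.cloud import bigquery

client = bigquery.Client()

# Sketch: configure a load job with time partitioning and clustering,
# as the handler above does; destination and field names are placeholders.
load_config = bigquery.LoadJobConfig()
load_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
load_config.time_partitioning = bigquery.table.TimePartitioning(
    type_=bigquery.table.TimePartitioningType.MONTH,  # DAY / HOUR / MONTH / YEAR
    field="updated_at",
)
load_config.clustering_fields = ["type", "status"]

with open("rows.ndjson", "rb") as fh:  # placeholder NDJSON data file
    job = client.load_table_from_file(
        fh,
        "my-project.my_dataset.charges",  # placeholder destination table
        job_config=load_config,
    )
job.result()  # wait for the load job to finish
```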

11 changes: 11 additions & 0 deletions tests/rsc/config/simple_stream_table_config_with_partition_type.json
@@ -0,0 +1,11 @@
{
"streams": {
"simple_stream": {
"partition_field": "date",
"partition_type": "YEAR",
"cluster_fields": [
"name"
]
}
}
}
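This fixture is the file the new test below passes via the `tables` argument. A small sketch of how such a config is read, paralleling the `_load_to_bq` change above, with `DAY` as the fallback when `partition_type` is omitted (the file path here is assumed to be resolved already):

```python
import json

from google.cloud import bigquery

# Sketch: read the per-stream table config and apply the DAY default,
# mirroring the .get() calls added in _load_to_bq above.
with open("simple_stream_table_config_with_partition_type.json") as f:
    streams = json.load(f)["streams"]

cfg = streams.get("simple_stream", {})
partition_field = cfg.get("partition_field")  # "date"
partition_type = cfg.get("partition_type",
                         bigquery.table.TimePartitioningType.DAY)  # "YEAR" here
cluster_fields = cfg.get("cluster_fields")  # ["name"]
```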
4 changes: 2 additions & 2 deletions tests/test.py
@@ -8,5 +8,5 @@

runner = unittest.TextTestRunner()

if __name__ == "__main__": # this line prevents tests from running twice
runner.run(suite)
if __name__ == "__main__": # this line prevents tests from running twice
runner.run(suite)
30 changes: 29 additions & 1 deletion tests/test_simplestream.py
@@ -120,7 +120,35 @@ def test_simple_stream_with_tables_config(self):
table = self.client.get_table("{}.simple_stream".format(self.dataset_id))
self.assertEqual(3, table.num_rows, msg="Number of rows mismatch")
self.assertIsNotNone(table.clustering_fields)
self.assertIsNotNone(table.partitioning_type)
self.assertIsNotNone(table.time_partitioning.type_ == 'DAY')

def test_simple_stream_with_tables_config_with_partition_type(self):
from target_bigquery import main

self.set_cli_args(
stdin=os.path.join(os.path.join(
os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'tests'), 'rsc'),
'data'), 'simple_stream.json'),
config=os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'sandbox'),
'target-config.json'),
tables=os.path.join(
os.path.join(os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
'tests'), 'rsc'), 'config'), 'simple_stream_table_config_with_partition_type.json'),
processhandler="load-job"
)

ret = main()
state = self.get_state()[-1]
print(state)

self.assertEqual(ret, 0, msg="Exit code is not 0!")
self.assertDictEqual(state, {"bookmarks": {"simple_stream": {"timestamp": "2020-01-11T00:00:00.000000Z"}}})

table = self.client.get_table("{}.simple_stream".format(self.dataset_id))
self.assertEqual(3, table.num_rows, msg="Number of rows mismatch")
self.assertIsNotNone(table.clustering_fields)
self.assertTrue(table.time_partitioning.type_ == 'YEAR')


def test_simple_stream_with_tables_config_passed_inside_target_config_file(self):
"""
