diff --git a/config.sample.json b/config.sample.json index 98bcfdd..365f1c5 100644 --- a/config.sample.json +++ b/config.sample.json @@ -1,5 +1,6 @@ { "project_id": "bigquery-public-data", "dataset_id": "samples", - "table_id": "github_timeline" + "table_id": "github_timeline", + "validate_records": true } diff --git a/target_bigquery.py b/target_bigquery.py index b520223..4486c4e 100644 --- a/target_bigquery.py +++ b/target_bigquery.py @@ -102,7 +102,7 @@ def build_schema(schema): return SCHEMA -def persist_lines_job(project_id, dataset_id, lines=None, truncate=False): +def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, validate_records=True): state = None schemas = {} key_properties = {} @@ -130,7 +130,8 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False): schema = schemas[msg.stream] - validate(msg.record, schema) + if validate_records: + validate(msg.record, schema) # NEWLINE_DELIMITED_JSON expects literal JSON formatted data, with a newline character splitting each row. dat = bytes(json.dumps(msg.record) + '\n', 'UTF-8') @@ -189,7 +190,7 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False): return state -def persist_lines_stream(project_id, dataset_id, lines=None): +def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=True): state = None schemas = {} key_properties = {} @@ -219,7 +220,8 @@ def persist_lines_stream(project_id, dataset_id, lines=None): schema = schemas[msg.stream] - validate(msg.record, schema) + if validate_records: + validate(msg.record, schema) errors[msg.stream] = bigquery_client.insert_rows_json(tables[msg.stream], [msg.record]) rows[msg.stream] += 1 @@ -290,12 +292,14 @@ def main(): else: truncate = False + validate_records = config.get('validate_records', True) + input = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8') if config.get('stream_data', True): - state = persist_lines_stream(config['project_id'], config['dataset_id'], input) + state = persist_lines_stream(config['project_id'], config['dataset_id'], input, validate_records=validate_records) else: - state = persist_lines_job(config['project_id'], config['dataset_id'], input, truncate=truncate) + state = persist_lines_job(config['project_id'], config['dataset_id'], input, truncate=truncate, validate_records=validate_records) emit_state(state) logger.debug("Exiting normally")