Skip to content

Commit

Permalink
Merge pull request #8 from seizethedave/master
Browse files Browse the repository at this point in the history
Add ability to bypass record validation.
  • Loading branch information
Michael Dunn authored Mar 8, 2019
2 parents 366f85b + 8444462 commit a969f10
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
3 changes: 2 additions & 1 deletion config.sample.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"project_id": "bigquery-public-data",
"dataset_id": "samples",
"table_id": "github_timeline"
"table_id": "github_timeline",
"validate_records": true
}
16 changes: 10 additions & 6 deletions target_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def build_schema(schema):

return SCHEMA

def persist_lines_job(project_id, dataset_id, lines=None, truncate=False):
def persist_lines_job(project_id, dataset_id, lines=None, truncate=False, validate_records=True):
state = None
schemas = {}
key_properties = {}
Expand Down Expand Up @@ -130,7 +130,8 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False):

schema = schemas[msg.stream]

validate(msg.record, schema)
if validate_records:
validate(msg.record, schema)

# NEWLINE_DELIMITED_JSON expects literal JSON formatted data, with a newline character splitting each row.
dat = bytes(json.dumps(msg.record) + '\n', 'UTF-8')
Expand Down Expand Up @@ -189,7 +190,7 @@ def persist_lines_job(project_id, dataset_id, lines=None, truncate=False):

return state

def persist_lines_stream(project_id, dataset_id, lines=None):
def persist_lines_stream(project_id, dataset_id, lines=None, validate_records=True):
state = None
schemas = {}
key_properties = {}
Expand Down Expand Up @@ -219,7 +220,8 @@ def persist_lines_stream(project_id, dataset_id, lines=None):

schema = schemas[msg.stream]

validate(msg.record, schema)
if validate_records:
validate(msg.record, schema)

errors[msg.stream] = bigquery_client.insert_rows_json(tables[msg.stream], [msg.record])
rows[msg.stream] += 1
Expand Down Expand Up @@ -290,12 +292,14 @@ def main():
else:
truncate = False

validate_records = config.get('validate_records', True)

input = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')

if config.get('stream_data', True):
state = persist_lines_stream(config['project_id'], config['dataset_id'], input)
state = persist_lines_stream(config['project_id'], config['dataset_id'], input, validate_records=validate_records)
else:
state = persist_lines_job(config['project_id'], config['dataset_id'], input, truncate=truncate)
state = persist_lines_job(config['project_id'], config['dataset_id'], input, truncate=truncate, validate_records=validate_records)

emit_state(state)
logger.debug("Exiting normally")
Expand Down

0 comments on commit a969f10

Please sign in to comment.