diff --git a/README.md b/README.md index 627d6dc..ad84d53 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,8 @@ AUD,BGN,BRL,CAD,CHF,CNY,CZK,DKK,GBP,HKD,HRK,HUF,IDR,ILS,INR,JPY,KRW,MXN,MYR,NOK, ~/.virtualenvs/tap-exchangeratesapi/bin/tap-exchangeratesapi | ~/.virtualenvs/target-csv/bin/target-csv -c my-config.json ``` +The config option **rewrite_headers** (default: `false`) can be used to rewrite the header of the CSV file(s) when the header is updated during processing. This can happen when new fields are discovered while flattening records. Newly discovered header fields are appended to the end of the header. Rewriting happens at the end of the stream, so there is no performance impact during streaming. Only the header is updated; all other lines are copied as-is. + --- Copyright © 2017 Stitch diff --git a/config.sample.json b/config.sample.json index f677aae..a160915 100644 --- a/config.sample.json +++ b/config.sample.json @@ -1,5 +1,6 @@ { "delimiter": "\t", "quotechar": "'", - "destination_path": "" + "destination_path": "", + "rewrite_headers": true } diff --git a/target_csv.py b/target_csv.py index c372545..1933664 100755 --- a/target_csv.py +++ b/target_csv.py @@ -12,6 +12,7 @@ from datetime import datetime import collections import pkg_resources +import shutil from jsonschema.validators import Draft4Validator import singer @@ -34,13 +35,14 @@ def flatten(d, parent_key='', sep='__'): else: items.append((new_key, str(v) if type(v) is list else v)) return dict(items) - -def persist_messages(delimiter, quotechar, messages, destination_path): + +def persist_messages(delimiter, quotechar, messages, destination_path, rewrite_headers): state = None schemas = {} key_properties = {} headers = {} validators = {} + files_incorrect_headers = {} # Keep track of files with incorrect headers now = datetime.now().strftime('%Y%m%dT%H%M%S') @@ -52,31 +54,39 @@ def persist_messages(delimiter, quotechar, messages, destination_path): raise message_type = o['type'] if message_type == 
'RECORD': - if o['stream'] not in schemas: + stream = o['stream'] + if stream not in schemas: raise Exception("A record for stream {}" - "was encountered before a corresponding schema".format(o['stream'])) + "was encountered before a corresponding schema".format(stream)) - validators[o['stream']].validate(o['record']) + validators[stream].validate(o['record']) - filename = o['stream'] + '-' + now + '.csv' + filename = stream + '-' + now + '.csv' filename = os.path.expanduser(os.path.join(destination_path, filename)) file_is_empty = (not os.path.isfile(filename)) or os.stat(filename).st_size == 0 flattened_record = flatten(o['record']) - if o['stream'] not in headers and not file_is_empty: - with open(filename, 'r') as csvfile: - reader = csv.reader(csvfile, - delimiter=delimiter, - quotechar=quotechar) - first_line = next(reader) - headers[o['stream']] = first_line if first_line else flattened_record.keys() - else: - headers[o['stream']] = flattened_record.keys() + if stream not in headers: + first_line = None + if not file_is_empty: + with open(filename, 'r') as csv_f: + reader = csv.reader(csv_f, + delimiter=delimiter, + quotechar=quotechar) + first_line = next(reader) + headers[stream] = first_line if first_line else list(flattened_record.keys()) + + # Check current record for unseen field. 
+ missing_header_fields = set(flattened_record.keys()).difference(headers[stream]) + + if missing_header_fields: + headers[stream] += missing_header_fields + files_incorrect_headers[filename] = headers[stream] with open(filename, 'a') as csvfile: writer = csv.DictWriter(csvfile, - headers[o['stream']], + headers[stream], extrasaction='ignore', delimiter=delimiter, quotechar=quotechar) @@ -96,11 +106,43 @@ def persist_messages(delimiter, quotechar, messages, destination_path): key_properties[stream] = o['key_properties'] else: logger.warning("Unknown message type {} in message {}" - .format(o['type'], o)) + .format(o['type'], o)) + + if files_incorrect_headers and rewrite_headers: + rewrite_csv(files_incorrect_headers, delimiter, quotechar) return state +def rewrite_csv(files_incorrect_headers, delimiter, quotechar): + """ + Rewrite the CSV-file(s) to update the first line, the header. + + Thereby duplicate the data and overwrite the original file afterwards. + """ + logger.info('Rewriting {} csv file(s) with incorrect headers.' + .format(len(files_incorrect_headers))) + + for filename, correct_header in files_incorrect_headers.items(): + tmp_filename = filename + ".tmp" + + with open(filename, "r") as csv_f: + csv_f.readline() # ignore the incorrect header + + with open(tmp_filename, "w") as tmp_f: + writer = csv.DictWriter(tmp_f, + correct_header, + extrasaction='ignore', + delimiter=delimiter, + quotechar=quotechar) + # Write correct headers and rest of the content. 
+ writer.writeheader() + shutil.copyfileobj(csv_f, tmp_f) + + # Atomic move after updating file + shutil.move(tmp_filename, filename) + + def send_usage_stats(): try: version = pkg_resources.get_distribution('target-csv').version @@ -141,7 +183,8 @@ def main(): state = persist_messages(config.get('delimiter', ','), config.get('quotechar', '"'), input_messages, - config.get('destination_path', '')) + config.get('destination_path', ''), + config.get('rewrite_headers', False)) emit_state(state) logger.debug("Exiting normally")