Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite files with correct header at end of stream #27

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ AUD,BGN,BRL,CAD,CHF,CNY,CZK,DKK,GBP,HKD,HRK,HUF,IDR,ILS,INR,JPY,KRW,MXN,MYR,NOK,
~/.virtualenvs/tap-exchangeratesapi/bin/tap-exchangeratesapi | ~/.virtualenvs/target-csv/bin/target-csv -c my-config.json
```

Config option **rewrite_headers** (default: `false`) can be used to rewrite the header of the CSV file(s) when the header is updated during processing. This can happen when new fields are discovered while flattening records. Newly discovered header fields are appended to the end of the header. Rewriting happens at the end of the stream, so there is no performance impact while streaming. Only the header line is rewritten; all other lines are copied as-is.

---

Copyright © 2017 Stitch
Expand Down
3 changes: 2 additions & 1 deletion config.sample.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"delimiter": "\t",
"quotechar": "'",
"destination_path": ""
"destination_path": "",
"rewrite_headers": true
}
79 changes: 61 additions & 18 deletions target_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from datetime import datetime
import collections
import pkg_resources
import shutil

from jsonschema.validators import Draft4Validator
import singer
Expand All @@ -34,13 +35,14 @@ def flatten(d, parent_key='', sep='__'):
else:
items.append((new_key, str(v) if type(v) is list else v))
return dict(items)
def persist_messages(delimiter, quotechar, messages, destination_path):

def persist_messages(delimiter, quotechar, messages, destination_path, rewrite_headers):
state = None
schemas = {}
key_properties = {}
headers = {}
validators = {}
files_incorrect_headers = {} # Keep track of files with incorrect headers

now = datetime.now().strftime('%Y%m%dT%H%M%S')

Expand All @@ -52,31 +54,39 @@ def persist_messages(delimiter, quotechar, messages, destination_path):
raise
message_type = o['type']
if message_type == 'RECORD':
if o['stream'] not in schemas:
stream = o['stream']
if stream not in schemas:
raise Exception("A record for stream {}"
"was encountered before a corresponding schema".format(o['stream']))
"was encountered before a corresponding schema".format(stream))

validators[o['stream']].validate(o['record'])
validators[stream].validate(o['record'])

filename = o['stream'] + '-' + now + '.csv'
filename = stream + '-' + now + '.csv'
filename = os.path.expanduser(os.path.join(destination_path, filename))
file_is_empty = (not os.path.isfile(filename)) or os.stat(filename).st_size == 0

flattened_record = flatten(o['record'])

if o['stream'] not in headers and not file_is_empty:
with open(filename, 'r') as csvfile:
reader = csv.reader(csvfile,
delimiter=delimiter,
quotechar=quotechar)
first_line = next(reader)
headers[o['stream']] = first_line if first_line else flattened_record.keys()
else:
headers[o['stream']] = flattened_record.keys()
if stream not in headers:
first_line = None
if not file_is_empty:
with open(filename, 'r') as csv_f:
reader = csv.reader(csv_f,
delimiter=delimiter,
quotechar=quotechar)
first_line = next(reader)
headers[stream] = first_line if first_line else list(flattened_record.keys())

# Check current record for unseen field.
missing_header_fields = set(flattened_record.keys()).difference(headers[stream])

if missing_header_fields:
headers[stream] += missing_header_fields
files_incorrect_headers[filename] = headers[stream]

with open(filename, 'a') as csvfile:
writer = csv.DictWriter(csvfile,
headers[o['stream']],
headers[stream],
extrasaction='ignore',
delimiter=delimiter,
quotechar=quotechar)
Expand All @@ -96,11 +106,43 @@ def persist_messages(delimiter, quotechar, messages, destination_path):
key_properties[stream] = o['key_properties']
else:
logger.warning("Unknown message type {} in message {}"
.format(o['type'], o))
.format(o['type'], o))

if files_incorrect_headers and rewrite_headers:
rewrite_csv(files_incorrect_headers, delimiter, quotechar)

return state


def rewrite_csv(files_incorrect_headers, delimiter, quotechar):
    """
    Rewrite CSV file(s) whose header is missing fields discovered later.

    For each file, write the corrected (extended) header followed by the
    original data rows — copied verbatim — into a temporary sibling file,
    then atomically replace the original.

    Args:
        files_incorrect_headers: mapping of filename -> full header list
            (original fields first, newly discovered fields appended).
        delimiter: CSV field delimiter the file was written with.
        quotechar: CSV quote character the file was written with.
    """
    logger.info('Rewriting {} csv file(s) with incorrect headers.'
                .format(len(files_incorrect_headers)))

    for filename, correct_header in files_incorrect_headers.items():
        tmp_filename = filename + ".tmp"
        try:
            with open(filename, "r") as csv_f:
                # Skip the stale header. NOTE(review): assumes the header
                # occupies exactly one physical line (no quoted newlines
                # inside header fields) — matches how it was written here.
                csv_f.readline()

                with open(tmp_filename, "w") as tmp_f:
                    writer = csv.DictWriter(tmp_f,
                                            correct_header,
                                            extrasaction='ignore',
                                            delimiter=delimiter,
                                            quotechar=quotechar)
                    # Emit the corrected header, then stream the remaining
                    # rows through unchanged (old rows simply have fewer
                    # columns; the shared prefix of columns is identical).
                    writer.writeheader()
                    shutil.copyfileobj(csv_f, tmp_f)

            # Atomic same-directory replace of the original file.
            os.replace(tmp_filename, filename)
        except Exception:
            # Don't leave a half-written temp file behind on failure.
            try:
                os.remove(tmp_filename)
            except OSError:
                pass
            raise


def send_usage_stats():
try:
version = pkg_resources.get_distribution('target-csv').version
Expand Down Expand Up @@ -141,7 +183,8 @@ def main():
state = persist_messages(config.get('delimiter', ','),
config.get('quotechar', '"'),
input_messages,
config.get('destination_path', ''))
config.get('destination_path', ''),
config.get('rewrite_headers', False))

emit_state(state)
logger.debug("Exiting normally")
Expand Down