diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 00000000..b5158981 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,18 @@ +version: 2 +registries: + python-index-pypi-org: + type: python-index + url: https://pypi.org/ + replaces-base: true + username: "${{secrets.PYTHON_INDEX_PYPI_ORG_USERNAME}}" + password: "${{secrets.PYTHON_INDEX_PYPI_ORG_PASSWORD}}" + +updates: +- package-ecosystem: pip + directory: "/" + schedule: + interval: daily + time: "19:00" + open-pull-requests-limit: 10 + registries: + - python-index-pypi-org diff --git a/README.rst b/README.rst index 0586f336..95c3015e 100644 --- a/README.rst +++ b/README.rst @@ -196,6 +196,12 @@ This setting is shared with other plugins that download resource files, such as ckan.download_proxy = http://my-proxy:1234/ +You may also wish to configure the database to use your preferred date input style on COPY. +For example, to make [PostgreSQL](https://www.postgresql.org/docs/current/runtime-config-client.html#RUNTIME-CONFIG-CLIENT-FORMAT) +expect European (day-first) dates, you could add to ``postgresql.conf``: + + datestyle=ISO,DMY + ------------------------ Developer installation ------------------------ diff --git a/ckanext/xloader/action.py b/ckanext/xloader/action.py index 3fa26803..aabc8148 100644 --- a/ckanext/xloader/action.py +++ b/ckanext/xloader/action.py @@ -152,10 +152,17 @@ def xloader_submit(context, data_dict): 'original_url': resource_dict.get('url'), } } - timeout = config.get('ckanext.xloader.job_timeout', '3600') + # Expand timeout for resources that have to be type-guessed + timeout = config.get( + 'ckanext.xloader.job_timeout', + '3600' if utils.datastore_resource_exists(res_id) else '10800') + log.debug("Timeout for XLoading resource %s is %s", res_id, timeout) + try: job = enqueue_job( - jobs.xloader_data_into_datastore, [data], rq_kwargs=dict(timeout=timeout) + jobs.xloader_data_into_datastore, [data], + title="xloader_submit: package: {} resource: {}".format(resource_dict.get('package_id'), res_id), + rq_kwargs=dict(timeout=timeout) ) except Exception: log.exception('Unable to enqueued xloader res_id=%s', res_id) diff --git a/ckanext/xloader/config_declaration.yaml b/ckanext/xloader/config_declaration.yaml index b31f12e2..feb1cc9c 100644 --- a/ckanext/xloader/config_declaration.yaml +++ b/ckanext/xloader/config_declaration.yaml @@ -29,9 +29,7 @@ groups: default: 1_000_000_000 example: 100000 description: | - The connection string for the jobs database used by XLoader. The - default of an sqlite file is fine for development. For production use a - Postgresql database. + The maximum file size that XLoader will attempt to load. type: int required: false - key: ckanext.xloader.use_type_guessing @@ -48,6 +46,15 @@ groups: type: bool required: false legacy_key: ckanext.xloader.just_load_with_messytables + - key: ckanext.xloader.max_type_guessing_length + default: 0 + example: 100000 + description: | + The maximum file size that will be passed to Tabulator if the + use_type_guessing flag is enabled. Larger files will use COPY even if + the flag is set. Defaults to 1/10 of the maximum content length. + type: int + required: false - key: ckanext.xloader.parse_dates_dayfirst default: False example: False diff --git a/ckanext/xloader/db.py b/ckanext/xloader/db.py index a3078ea4..a93eb0d8 100644 --- a/ckanext/xloader/db.py +++ b/ckanext/xloader/db.py @@ -191,9 +191,7 @@ def add_pending_job(job_id, job_type, api_key, if not metadata: metadata = {} - conn = ENGINE.connect() - trans = conn.begin() - try: + with ENGINE.begin() as conn: conn.execute(JOBS_TABLE.insert().values( job_id=job_id, job_type=job_type, @@ -225,12 +223,6 @@ def add_pending_job(job_id, job_type, api_key, ) if inserts: conn.execute(METADATA_TABLE.insert(), inserts) - trans.commit() - except Exception: - trans.rollback() - raise - finally: - conn.close() class InvalidErrorObjectError(Exception): diff --git a/ckanext/xloader/helpers.py b/ckanext/xloader/helpers.py index 6c4b8b9b..90c70933 100644 --- a/ckanext/xloader/helpers.py +++ b/ckanext/xloader/helpers.py @@ -28,13 +28,17 @@ def xloader_status_description(status): return _('Not Uploaded Yet') -def is_resource_supported_by_xloader(res_dict, check_access = True): +def is_resource_supported_by_xloader(res_dict, check_access=True): is_supported_format = XLoaderFormats.is_it_an_xloader_format(res_dict.get('format')) is_datastore_active = res_dict.get('datastore_active', False) user_has_access = not check_access or toolkit.h.check_access('package_update', {'id':res_dict.get('package_id')}) - try: - is_supported_url_type = res_dict.get('url_type') not in toolkit.h.datastore_rw_resource_url_types() - except AttributeError: - is_supported_url_type = (res_dict.get('url_type') == 'upload' or not res_dict.get('url_type')) + url_type = res_dict.get('url_type') + if url_type: + try: + is_supported_url_type = url_type not in toolkit.h.datastore_rw_resource_url_types() + except AttributeError: + is_supported_url_type = (url_type == 'upload') + else: + is_supported_url_type = True return (is_supported_format or is_datastore_active) and user_has_access and is_supported_url_type diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py index 7b96b993..8393c970 100644 --- a/ckanext/xloader/jobs.py +++ b/ckanext/xloader/jobs.py @@ -7,6 +7,7 @@ import tempfile import json import datetime +import os import traceback import sys @@ -21,7 +22,7 @@ from . import db, loader from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError -from .utils import set_resource_metadata +from .utils import datastore_resource_exists, set_resource_metadata try: from ckan.lib.api_token import get_user_from_token @@ -35,10 +36,13 @@ requests.packages.urllib3.disable_warnings() MAX_CONTENT_LENGTH = int(config.get('ckanext.xloader.max_content_length') or 1e9) +# Don't try Tabulator load on large files +MAX_TYPE_GUESSING_LENGTH = int(config.get('ckanext.xloader.max_type_guessing_length') or MAX_CONTENT_LENGTH / 10) MAX_EXCERPT_LINES = int(config.get('ckanext.xloader.max_excerpt_lines') or 0) CHUNK_SIZE = 16 * 1024 # 16kb DOWNLOAD_TIMEOUT = 30 +MAX_RETRIES = 1 RETRYABLE_ERRORS = ( errors.DeadlockDetected, errors.LockNotAvailable, @@ -89,18 +93,21 @@ def xloader_data_into_datastore(input): db.mark_job_as_errored(job_id, str(e)) job_dict['status'] = 'error' job_dict['error'] = str(e) - log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc())) + log.error('xloader error: %s, %s', e, traceback.format_exc()) errored = True except Exception as e: if isinstance(e, RETRYABLE_ERRORS): tries = job_dict['metadata'].get('tries', 0) - if tries == 0: + if tries < MAX_RETRIES: + tries = tries + 1 log.info("Job %s failed due to temporary error [%s], retrying", job_id, e) job_dict['status'] = 'pending' - job_dict['metadata']['tries'] = tries + 1 + job_dict['metadata']['tries'] = tries enqueue_job( xloader_data_into_datastore, [input], + title="retry xloader_data_into_datastore: resource: {} attempt {}".format( + job_dict['metadata']['resource_id'], tries), rq_kwargs=dict(timeout=RETRIED_JOB_TIMEOUT) ) return None @@ -109,7 +116,7 @@ def xloader_data_into_datastore(input): job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e)) job_dict['status'] = 'error' job_dict['error'] = str(e) - log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc())) + log.error('xloader error: %s, %s', e, traceback.format_exc()) errored = True finally: # job_dict is defined in xloader_hook's docstring @@ -226,11 +233,12 @@ def tabulator_load(): logger.info('Loading CSV') # If ckanext.xloader.use_type_guessing is not configured, fall back to # deprecated ckanext.xloader.just_load_with_messytables - use_type_guessing = asbool(config.get( - 'ckanext.xloader.use_type_guessing', config.get( - 'ckanext.xloader.just_load_with_messytables', False))) - logger.info("'use_type_guessing' mode is: %s", - use_type_guessing) + use_type_guessing = asbool( + config.get('ckanext.xloader.use_type_guessing', config.get( + 'ckanext.xloader.just_load_with_messytables', False))) \ + and not datastore_resource_exists(resource['id']) \ + and os.path.getsize(tmp_file.name) <= MAX_TYPE_GUESSING_LENGTH + logger.info("'use_type_guessing' mode is: %s", use_type_guessing) try: if use_type_guessing: tabulator_load() @@ -558,8 +566,7 @@ def __init__(self, task_id, input): self.input = input def emit(self, record): - conn = db.ENGINE.connect() - try: + with db.ENGINE.connect() as conn: # Turn strings into unicode to stop SQLAlchemy # "Unicode type received non-unicode bind param value" warnings. message = str(record.getMessage()) @@ -575,8 +582,6 @@ def emit(self, record): module=module, funcName=funcName, lineno=record.lineno)) - finally: - conn.close() class DatetimeJsonEncoder(json.JSONEncoder): diff --git a/ckanext/xloader/loader.py b/ckanext/xloader/loader.py index e64802e8..85be3f34 100644 --- a/ckanext/xloader/loader.py +++ b/ckanext/xloader/loader.py @@ -9,15 +9,16 @@ from decimal import Decimal import psycopg2 +from chardet.universaldetector import UniversalDetector from six.moves import zip -from tabulator import config as tabulator_config, Stream, TabulatorException +from tabulator import config as tabulator_config, EncodingError, Stream, TabulatorException from unidecode import unidecode import ckan.plugins as p from .job_exceptions import FileCouldNotBeLoadedError, LoaderError -from .parser import CSV_SAMPLE_LINES, XloaderCSVParser -from .utils import headers_guess, type_guess +from .parser import CSV_SAMPLE_LINES, TypeConverter +from .utils import datastore_resource_exists, headers_guess, type_guess from ckan.plugins.toolkit import config @@ -30,6 +31,52 @@ MAX_COLUMN_LENGTH = 63 tabulator_config.CSV_SAMPLE_LINES = CSV_SAMPLE_LINES +SINGLE_BYTE_ENCODING = 'cp1252' + + +class UnknownEncodingStream(object): + """ Provides a context manager that wraps a Tabulator stream + and tries multiple encodings if one fails. + + This is particularly relevant in cases like Latin-1 encoding, + which is usually ASCII and thus the sample could be sniffed as UTF-8, + only to run into problems later in the file. + """ + + def __init__(self, filepath, file_format, decoding_result, **kwargs): + self.filepath = filepath + self.file_format = file_format + self.stream_args = kwargs + self.decoding_result = decoding_result # {'encoding': 'EUC-JP', 'confidence': 0.99} + + def __enter__(self): + try: + + if (self.decoding_result and self.decoding_result['confidence'] and self.decoding_result['confidence'] > 0.7): + self.stream = Stream(self.filepath, format=self.file_format, encoding=self.decoding_result['encoding'], + ** self.stream_args).__enter__() + else: + self.stream = Stream(self.filepath, format=self.file_format, ** self.stream_args).__enter__() + + except (EncodingError, UnicodeDecodeError): + self.stream = Stream(self.filepath, format=self.file_format, + encoding=SINGLE_BYTE_ENCODING, **self.stream_args).__enter__() + return self.stream + + def __exit__(self, *args): + return self.stream.__exit__(*args) + + +def detect_encoding(file_path): + detector = UniversalDetector() + with open(file_path, 'rb') as file: + for line in file: + detector.feed(line) + if detector.done: + break + detector.close() + return detector.result # e.g. {'encoding': 'EUC-JP', 'confidence': 0.99} + def _fields_match(fields, existing_fields, logger): ''' Check whether all columns have the same names and types as previously, @@ -77,15 +124,17 @@ def _clear_datastore_resource(resource_id): def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None): '''Loads a CSV into DataStore. Does not create the indexes.''' + decoding_result = detect_encoding(csv_filepath) + logger.info("load_csv: Decoded encoding: %s", decoding_result) # Determine the header row try: file_format = os.path.splitext(csv_filepath)[1].strip('.') - with Stream(csv_filepath, format=file_format) as stream: + with UnknownEncodingStream(csv_filepath, file_format, decoding_result) as stream: header_offset, headers = headers_guess(stream.sample) except TabulatorException: try: file_format = mimetype.lower().split('/')[-1] - with Stream(csv_filepath, format=file_format) as stream: + with UnknownEncodingStream(csv_filepath, file_format, decoding_result) as stream: header_offset, headers = headers_guess(stream.sample) except TabulatorException as e: raise LoaderError('Tabulator error: {}'.format(e)) @@ -116,10 +165,16 @@ def load_csv(csv_filepath, resource_id, mimetype='text/csv', logger=None): logger.info('Ensuring character coding is UTF8') f_write = tempfile.NamedTemporaryFile(suffix=file_format, delete=False) try: - with Stream(csv_filepath, format=file_format, skip_rows=skip_rows) as stream: - stream.save(target=f_write.name, format='csv', encoding='utf-8', - delimiter=delimiter) - csv_filepath = f_write.name + save_args = {'target': f_write.name, 'format': 'csv', 'encoding': 'utf-8', 'delimiter': delimiter} + try: + with UnknownEncodingStream(csv_filepath, file_format, decoding_result, + skip_rows=skip_rows) as stream: + stream.save(**save_args) + except (EncodingError, UnicodeDecodeError): + with Stream(csv_filepath, format=file_format, encoding=SINGLE_BYTE_ENCODING, + skip_rows=skip_rows) as stream: + stream.save(**save_args) + csv_filepath = f_write.name # datastore db connection engine = get_write_engine() @@ -287,16 +342,18 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None): # Determine the header row logger.info('Determining column names and types') + decoding_result = detect_encoding(table_filepath) + logger.info("load_table: Decoded encoding: %s", decoding_result) try: file_format = os.path.splitext(table_filepath)[1].strip('.') - with Stream(table_filepath, format=file_format, - custom_parsers={'csv': XloaderCSVParser}) as stream: + with UnknownEncodingStream(table_filepath, file_format, decoding_result, + post_parse=[TypeConverter().convert_types]) as stream: header_offset, headers = headers_guess(stream.sample) except TabulatorException: try: file_format = mimetype.lower().split('/')[-1] - with Stream(table_filepath, format=file_format, - custom_parsers={'csv': XloaderCSVParser}) as stream: + with UnknownEncodingStream(table_filepath, file_format, decoding_result, + post_parse=[TypeConverter().convert_types]) as stream: header_offset, headers = headers_guess(stream.sample) except TabulatorException as e: raise LoaderError('Tabulator error: {}'.format(e)) @@ -332,9 +389,11 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None): for t, h in zip(types, headers)] headers = [header.strip()[:MAX_COLUMN_LENGTH] for header in headers if header.strip()] + type_converter = TypeConverter(types=types) - with Stream(table_filepath, format=file_format, skip_rows=skip_rows, - custom_parsers={'csv': XloaderCSVParser}) as stream: + with UnknownEncodingStream(table_filepath, file_format, decoding_result, + skip_rows=skip_rows, + post_parse=[type_converter.convert_types]) as stream: def row_iterator(): for row in stream: data_row = {} @@ -457,17 +516,6 @@ def send_resource_to_datastore(resource_id, headers, records): .format(str(e))) -def datastore_resource_exists(resource_id): - from ckan import model - context = {'model': model, 'ignore_auth': True} - try: - response = p.toolkit.get_action('datastore_search')(context, dict( - id=resource_id, limit=0)) - except p.toolkit.ObjectNotFound: - return False - return response or {'fields': []} - - def delete_datastore_resource(resource_id): from ckan import model context = {'model': model, 'user': '', 'ignore_auth': True} diff --git a/ckanext/xloader/parser.py b/ckanext/xloader/parser.py index 82539f4d..11e756cd 100644 --- a/ckanext/xloader/parser.py +++ b/ckanext/xloader/parser.py @@ -1,161 +1,73 @@ # -*- coding: utf-8 -*- -import csv +import datetime from decimal import Decimal, InvalidOperation -from itertools import chain +import re +import six from ckan.plugins.toolkit import asbool -from dateutil.parser import isoparser, parser -from dateutil.parser import ParserError - -from tabulator import helpers -from tabulator.parser import Parser +from dateutil.parser import isoparser, parser, ParserError from ckan.plugins.toolkit import config CSV_SAMPLE_LINES = 1000 +DATE_REGEX = re.compile(r'''^\d{1,4}[-/.\s]\S+[-/.\s]\S+''') -class XloaderCSVParser(Parser): - """Extends tabulator CSVParser to detect datetime and numeric values. +class TypeConverter: + """ Post-process table cells to convert strings into numbers and timestamps + as desired. """ - # Public - - options = [ - 'delimiter', - 'doublequote', - 'escapechar', - 'quotechar', - 'quoting', - 'skipinitialspace', - 'lineterminator' - ] - - def __init__(self, loader, force_parse=False, **options): - super(XloaderCSVParser, self).__init__(loader, force_parse, **options) - # Set attributes - self.__loader = loader - self.__options = options - self.__force_parse = force_parse - self.__extended_rows = None - self.__encoding = None - self.__dialect = None - self.__chars = None - - @property - def closed(self): - return self.__chars is None or self.__chars.closed - - def open(self, source, encoding=None): - # Close the character stream, if necessary, before reloading it. - self.close() - self.__chars = self.__loader.load(source, encoding=encoding) - self.__encoding = getattr(self.__chars, 'encoding', encoding) - if self.__encoding: - self.__encoding.lower() - self.reset() - - def close(self): - if not self.closed: - self.__chars.close() - - def reset(self): - helpers.reset_stream(self.__chars) - self.__extended_rows = self.__iter_extended_rows() - - @property - def encoding(self): - return self.__encoding - - @property - def dialect(self): - if self.__dialect: - dialect = { - 'delimiter': self.__dialect.delimiter, - 'doubleQuote': self.__dialect.doublequote, - 'lineTerminator': self.__dialect.lineterminator, - 'quoteChar': self.__dialect.quotechar, - 'skipInitialSpace': self.__dialect.skipinitialspace, - } - if self.__dialect.escapechar is not None: - dialect['escapeChar'] = self.__dialect.escapechar - return dialect - - @property - def extended_rows(self): - return self.__extended_rows - - # Private - - def __iter_extended_rows(self): - - def type_value(value): - """Returns numeric values as Decimal(). Uses dateutil to parse - date values. Otherwise, returns values as it receives them - (strings). - """ - if value in ('', None): - return '' - - try: - return Decimal(value) - except InvalidOperation: - pass - - try: - i = isoparser() - return i.isoparse(value) - except ValueError: - pass - - try: - p = parser() - yearfirst = asbool(config.get( - 'ckanext.xloader.parse_dates_yearfirst', False)) - dayfirst = asbool(config.get( - 'ckanext.xloader.parse_dates_dayfirst', False)) - return p.parse(value, yearfirst=yearfirst, dayfirst=dayfirst) - except ParserError: - pass - - return value - - sample, dialect = self.__prepare_dialect(self.__chars) - items = csv.reader(chain(sample, self.__chars), dialect=dialect) - for row_number, item in enumerate(items, start=1): - values = [] - for value in item: - value = type_value(value) - values.append(value) - yield row_number, None, list(values) - - def __prepare_dialect(self, stream): - - # Get sample - sample = [] - while True: - try: - sample.append(next(stream)) - except StopIteration: - break - if len(sample) >= CSV_SAMPLE_LINES: - break - - # Get dialect + def __init__(self, types=None): + self.types = types + + def convert_types(self, extended_rows): + """ Try converting cells to numbers or timestamps if applicable. + If a list of types was supplied, use that. + If not, then try converting each column to numeric first, + then to a timestamp. If both fail, just keep it as a string. + """ + for row_number, headers, row in extended_rows: + for cell_index, cell_value in enumerate(row): + if cell_value is None: + row[cell_index] = '' + if not cell_value: + continue + cell_type = self.types[cell_index] if self.types else None + if cell_type in [Decimal, None]: + converted_value = to_number(cell_value) + # Can't do a simple truthiness check, + # because 0 is a valid numeric result. + if converted_value is not None: + row[cell_index] = converted_value + continue + if cell_type in [datetime.datetime, None]: + converted_value = to_timestamp(cell_value) + if converted_value: + row[cell_index] = converted_value + yield (row_number, headers, row) + + +def to_number(value): + if not isinstance(value, six.string_types): + return None + try: + return Decimal(value) + except InvalidOperation: + return None + + +def to_timestamp(value): + if not isinstance(value, six.string_types) or not DATE_REGEX.search(value): + return None + try: + i = isoparser() + return i.isoparse(value) + except ValueError: try: - separator = '' - delimiter = self.__options.get('delimiter', ',\t;|') - dialect = csv.Sniffer().sniff(separator.join(sample), delimiter) - if not dialect.escapechar: - dialect.doublequote = True - except csv.Error: - class dialect(csv.excel): - pass - for key, value in self.__options.items(): - setattr(dialect, key, value) - # https://github.com/frictionlessdata/FrictionlessDarwinCore/issues/1 - if getattr(dialect, 'quotechar', None) == '': - setattr(dialect, 'quoting', csv.QUOTE_NONE) - - self.__dialect = dialect - return sample, dialect + p = parser() + yearfirst = asbool(config.get('ckanext.xloader.parse_dates_yearfirst', False)) + dayfirst = asbool(config.get('ckanext.xloader.parse_dates_dayfirst', False)) + return p.parse(value, yearfirst=yearfirst, dayfirst=dayfirst) + except ParserError: + return None diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index 392b1cf5..6e65e466 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -78,8 +78,8 @@ def notify(self, entity, operation): See: ckan/model/modification.py.DomainObjectModificationExtension """ if operation != DomainObjectOperation.changed \ - or not isinstance(entity, Resource) \ - or not getattr(entity, 'url_changed', False): + or not isinstance(entity, Resource) \ + or not getattr(entity, 'url_changed', False): return context = { "ignore_auth": True, diff --git a/ckanext/xloader/templates/xloader/resource_data.html b/ckanext/xloader/templates/xloader/resource_data.html index 74a5f715..98027508 100644 --- a/ckanext/xloader/templates/xloader/resource_data.html +++ b/ckanext/xloader/templates/xloader/resource_data.html @@ -23,7 +23,7 @@ {% set delete_action = h.url_for('xloader.delete_datastore_table', id=pkg.id, resource_id=res.id) %}