diff --git a/data_importer/__init__.py b/data_importer/__init__.py
deleted file mode 100644
index 8486303..0000000
--- a/data_importer/__init__.py
+++ /dev/null
@@ -1,6 +0,0 @@
-#!/usr/bin/env python
-# encoding: utf-8
-"""
-Created by 'bens3' on 2013-06-21.
-Copyright (c) 2013 'bens3'. All rights reserved.
-"""
diff --git a/data_importer/commands/__init__.py b/data_importer/commands/__init__.py
deleted file mode 100644
index dd67e53..0000000
--- a/data_importer/commands/__init__.py
+++ /dev/null
@@ -1,6 +0,0 @@
-#!/usr/bin/env python
-# encoding: utf-8
-"""
-Created by Ben Scott on '20/02/2017'.
-"""
-
diff --git a/data_importer/commands/cron.py b/data_importer/commands/cron.py
deleted file mode 100644
index f6d3c5a..0000000
--- a/data_importer/commands/cron.py
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/usr/bin/env python
-# encoding: utf-8
-"""
-Created by Ben Scott on '31/03/2017'.
-"""
-
-import click
-import time
-import luigi
-
-from data_importer.tasks.specimen import SpecimenDatasetTask
-from data_importer.tasks.indexlot import IndexLotDatasetTask
-from data_importer.tasks.artefact import ArtefactDatasetTask
-
-
-@click.command()
-@click.option('--local-scheduler', default=False, help='Whether to use the luigi local scheduler.', is_flag=True)
-def run_cron(local_scheduler):
-    """
-    Run tasks on cron - gets the current date, and runs tasks for that date
-    This should be used in conjunction with a cron task, that schedules
-    this command to be run for every day a known export is produced
-    if an export is missing or corrupt (zero bytes) the tasks themselves
-    will raise an error
-    :param local_scheduler:
-    :return: None
-    """
-    # Get today's date, formatted as per keemu export files - 20170608
-    params = {
-        'date': int(time.strftime("%Y%m%d"))
-    }
-    for task in [SpecimenDatasetTask, IndexLotDatasetTask, ArtefactDatasetTask]:
-        luigi.build([task(**params)], local_scheduler=local_scheduler)
-
-
-if __name__ == "__main__":
-    run_cron()
diff --git a/data_importer/commands/drop_tables.py b/data_importer/commands/drop_tables.py
deleted file mode 100644
index 5722100..0000000
--- a/data_importer/commands/drop_tables.py
+++ /dev/null
@@ -1,53 +0,0 @@
-#!/usr/bin/env python
-# encoding: utf-8
-"""
-Created by Ben Scott on '31/03/2017'.
-"""
-
-
-import click
-import psycopg2
-import logging
-from prompter import yesno
-
-from data_importer.lib.config import Config
-from data_importer.lib.dataset import dataset_get_tasks
-from data_importer.tasks.keemu.ecatalogue import EcatalogueTask
-from data_importer.tasks.keemu.emultimedia import EMultimediaTask
-from data_importer.tasks.keemu.etaxonomy import ETaxonomyTask
-from data_importer.lib.db import db_drop_table
-from data_importer.lib.dataset import dataset_get_foreign_keys
-
-
-logger = logging.getLogger('luigi-interface')
-
-
-@click.command()
-def drop_tables():
-    """
-    Drop all tables
-    :return: None
-    """
-
-    connection = psycopg2.connect(
-        host=Config.get('database', 'host'),
-        port=Config.get('database', 'port'),
-        database=Config.get('database', 'datastore_dbname'),
-        user=Config.get('database', 'username'),
-        password=Config.get('database', 'password')
-    )
-
-    if yesno('You are dropping all tables - all data will be deleted. Are you sure you want to continue?'):
-        db_drop_table('table_updates', connection)
-        # Delete all info in the module tables
-        for task in [EcatalogueTask, ETaxonomyTask, EMultimediaTask]:
-            db_drop_table(task.module_name, connection)
-
-        for foreign_key in dataset_get_foreign_keys():
-            db_drop_table(foreign_key.table, connection)
-
-    connection.commit()
-    connection.close()
-
-if __name__ == "__main__":
-    drop_tables()
diff --git a/data_importer/commands/run.py b/data_importer/commands/run.py
deleted file mode 100644
index b4ab0cb..0000000
--- a/data_importer/commands/run.py
+++ /dev/null
@@ -1,34 +0,0 @@
-#!/usr/bin/env python
-# encoding: utf-8
-"""
-Created by Ben Scott on '31/03/2017'.
-
-    python commands/run.py 20170906
-
-"""
-
-import click
-import time
-import luigi
-
-from data_importer.tasks.specimen import SpecimenDatasetTask
-from data_importer.tasks.indexlot import IndexLotDatasetTask
-from data_importer.tasks.artefact import ArtefactDatasetTask
-
-
-@click.command()
-@click.argument('date')
-@click.option('--local-scheduler', default=False, help='Whether to use the luigi local scheduler.', is_flag=True)
-def run_cron(date, local_scheduler):
-    """
-    Helper command to run all three dataset tasks
-    :param date: date of import to run.
-    :param local_scheduler:
-    :return: None
-    """
-    for task in [SpecimenDatasetTask, IndexLotDatasetTask, ArtefactDatasetTask]:
-        luigi.build([task(date=int(date))], local_scheduler=local_scheduler)
-
-
-if __name__ == "__main__":
-    run_cron()
diff --git a/data_importer/commands/solr/__init__.py b/data_importer/commands/solr/__init__.py
deleted file mode 100644
index 1fbd318..0000000
--- a/data_importer/commands/solr/__init__.py
+++ /dev/null
@@ -1,293 +0,0 @@
-#!/usr/bin/env python
-# encoding: utf-8
-"""
-Created by Ben Scott on '04/09/2017'.
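For orientation, the deleted cron and run commands above do little more than hand the three dataset tasks to luigi with an export date. A minimal sketch of the equivalent call, assuming the task modules are importable and that the date follows the KE EMu export convention shown above (e.g. 20170608):

import luigi

from data_importer.tasks.specimen import SpecimenDatasetTask
from data_importer.tasks.indexlot import IndexLotDatasetTask
from data_importer.tasks.artefact import ArtefactDatasetTask

# Build each dataset task for a single KE EMu export date
export_date = 20170608
for task_cls in [SpecimenDatasetTask, IndexLotDatasetTask, ArtefactDatasetTask]:
    luigi.build([task_cls(date=export_date)], local_scheduler=True)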
-""" - -import click -from data_importer.lib.dataset import dataset_get_tasks -from collections import OrderedDict - - -class SolrCommand(object): - def __init__(self, dataset_name): - self.dataset_name = dataset_name - self.dataset = self._get_dataset(dataset_name) - - # Extra field names - if self.dataset_name == 'collection-specimens': - self.multimedia_field = 'associatedMedia' - self.guid_field = 'occurrenceID' - else: - self.multimedia_field = 'multimedia' - self.guid_field = 'GUID' - - @staticmethod - def _get_dataset(dataset_name): - """ - Get dataset matching the package name - @return: - """ - for dataset in dataset_get_tasks(): - if dataset_name == dataset.package_name: - return dataset - - # If we haven't found a dataset, raise an error - names = [d.package_name for d in dataset_get_tasks()] - raise click.UsageError('Unknown dataset name - allowed values are: %s' % ' '.join(names)) - - @property - def dataset_fields(self): - fields = set() - for field in self.dataset.fields: - # Multimedia and created/modified are handled separately - created/modified - # because they're dates; multimedia because it's jsonb - if field.module_name is 'emultimedia' or field.field_alias in ['created', 'modified']: - continue - fields.add(field) - return fields - - # Commands - def get_sql(self): - primary_table = 'ecatalogue' - multimedia_view = '_multimedia_view' - - # We don't want to include mammal group part in the output - these - # records - try: - self.dataset.record_types.remove('Mammal Group Part') - except ValueError: - pass - - sql = OrderedDict([ - ('SELECT', [ - '{}.irn as _id'.format(primary_table), - '{0}.guid::text as {1}'.format(primary_table, self.guid_field), - # Format created and modified so they're acceptable dates for SOLR - '{}.properties->>\'created\' || \'T00:00:00Z\' as created'.format(primary_table), - '{}.properties->>\'modified\' || \'T00:00:00Z\' as modified'.format(primary_table), - '{0}.multimedia as {1}'.format(multimedia_view, self.multimedia_field), - '{}.category as imageCategory'.format(multimedia_view), - 'CASE WHEN {}.irn IS NULL THEN false ELSE true END AS _has_multimedia'.format(multimedia_view), - ]), - ('FROM', [primary_table]), - ('WHERE', [ - '{0}.record_type IN (\'{1}\')'.format( - primary_table, - '\',\''.join(self.dataset.record_types) - ), - '({0}.embargo_date IS NULL OR {0}.embargo_date < NOW())'.format(primary_table), - '{0}.deleted IS NULL'.format(primary_table) - ]), - ]) - - # Fields to be coallesced in the specimens - join by parent catalogue properties - coalesced_fields = [ - 'class', - 'genus', - 'family', - 'phylum', - 'kingdom', - 'order', - 'scientificName', - 'specificEpithet', - 'sex', - 'locality' - ] - - for field in self.dataset_fields: - # Blank recordedBy and identifiedBy fields - if field.field_alias in ['recordedBy', 'identifiedBy']: - sql['SELECT'].append('NULL as "{0}"'.format( - field.field_alias - )) - elif self.dataset_name == 'collection-specimens' and field.field_alias in coalesced_fields: - # TODO: Check which modules these fields are from - e.g etaxonomy or ecatalogue - sql['SELECT'].append('COALESCE({0}.properties->>\'{1}\', parent_catalogue.properties->>\'{1}\', etaxonomy.properties->>\'{1}\') as "{1}"'.format( - field.module_name, - field.field_alias - )) - else: - sql['SELECT'].append('{0}.properties->>\'{1}\' as "{1}"'.format( - field.module_name, - field.field_alias - )) - - for fk in self.dataset.foreign_keys: - - # Special handling for multimedia - if fk.join_module == 'emultimedia': - sql['FROM'].append('LEFT 
JOIN _multimedia_view ON _multimedia_view.irn={primary_table}.irn'.format( - primary_table=primary_table, - )) - else: - if fk.record_type: - sql['FROM'].append('LEFT JOIN {fk_table} ON {fk_table}.irn={primary_table}.irn AND {primary_table}.record_type=\'{record_type}\''.format( - fk_table=fk.table, - primary_table=primary_table, - record_type=fk.record_type - )) - else: - sql['FROM'].append('LEFT JOIN {fk_table} ON {fk_table}.irn={primary_table}.irn'.format( - fk_table=fk.table, - primary_table=primary_table - )) - - sql['FROM'].append('LEFT JOIN {join_module} {join_alias} ON {join_on}.irn={fk_table}.rel_irn'.format( - fk_table=fk.table, - join_alias=fk.join_alias, - join_module=fk.join_module, - join_on=fk.join_alias if fk.join_alias else fk.join_module - )) - - # Special adjustments for the collection specimens dataset - if self.dataset_name == 'collection-specimens': - # Add GBIF issues - sql['FROM'].append('LEFT JOIN gbif ON {primary_table}.guid=gbif.occurrenceid'.format( - primary_table=primary_table - )) - sql['SELECT'].append('gbif.issue as "gbifIssue"') - sql['SELECT'].append('gbif.id as "gbifID"') - - # Add a few static fields - sql['SELECT'].append('\'Specimen\' as "basisOfRecord"') - sql['SELECT'].append('\'NHMUK\' as "institutionCode"') - - # Add _geom field. - # This is of type LatLonType (https://lucene.apache.org/solr/4_4_0/solr-core/org/apache/solr/schema/LatLonType.html) - # So lat/lon fields need to be concatenated: lat,lng - sql['SELECT'].append('CASE WHEN {primary_table}.properties->>\'decimalLatitude\' IS NOT NULL THEN CONCAT_WS(\',\', {primary_table}.properties->>\'decimalLatitude\', {primary_table}.properties->>\'decimalLongitude\') ELSE NULL END as "geom"'.format( - primary_table=primary_table - )) - - return sql - - def get_query(self, encode): - query = OrderedDict( - [('name', self.dataset_name.rstrip('s'))] - ) - sql = self.get_sql() - # Base query uses just the full SQL statement - query['query'] = self._sql_to_string(sql) - # Make a copy of where - we're going to needed it in the delta queries - where = sql['WHERE'] - - # For delta query, replace the where statement with delta id - # The filtering on record type etc., will be handled by the delta queries - sql['WHERE'] = ["ecatalogue.irn = '${dih.delta._id}'"] - query['deltaImportQuery'] = self._sql_to_string(sql) - # Get IRN of records created or modified since last index - # Uses same filters as default query - query['deltaQuery'] = """ - SELECT irn AS _id FROM ecatalogue WHERE (modified > '${dih.last_index_time}' OR created > '${dih.last_index_time}') AND (%s) - """ % ' AND '.join(where) - - # Get IRN of records deleted since last index time - # Filter on deleted > index time and record type - query['deletedPkQuery'] = """ - SELECT irn AS _id FROM ecatalogue WHERE deleted > '${dih.last_index_time}' AND ecatalogue.record_type IN ('%s') - """ % ('\',\''.join(self.dataset.record_types)) - - # If encoded, escape quotes so this is ready to be dropped into a solr schema doc - if encode: - for k, v in query.items(): - query[k] = self._escape_quotes(v) - - return self._dict2xml(query, 'entity') - - def get_schema(self): - # Create basic schema fields with _id and multimedia - schema_fields = [ - OrderedDict(name="_id", type="int", indexed="true", stored="true", required="true"), - OrderedDict(name=self.multimedia_field, type="string", indexed="false", stored="true", required="false"), - OrderedDict(name=self.guid_field, type="string", indexed="false", stored="true", required="false"), - 
OrderedDict(name="imageCategory", type="semiColonDelimited", indexed="true", stored="true", required="false", multiValued="true"), - OrderedDict(name="created", type="date", indexed="true", stored="true", required="true"), - OrderedDict(name="modified", type="date", indexed="true", stored="true", required="false"), - OrderedDict(name="_has_multimedia", type="boolean", indexed="true", stored="false", required="false", default="false"), - # sumPreferredCentroidLatDec is populated even when lat/lng is not centroid! - # What field denotes centroid? - # OrderedDict(name="centroid", type="boolean", indexed="true", stored="false", required="false", default="false"), - ] - - # Create a list of schema fields already added - manual_schema_fields = [f['name'] for f in schema_fields] - - # Add in all dataset defined fields - for field in self.dataset_fields: - # Do not add field if we've already added it manually - if field.field_alias not in manual_schema_fields: - schema_fields.append(OrderedDict(name=field.field_alias, type="field_text", indexed="true", stored="true", required="false")) - - # Add additional collection specimen fields - GBIF issues and static fields - if self.dataset_name == 'collection-specimens': - schema_fields.append(OrderedDict(name="geom", type="geospatial_rpt", indexed="true", stored="true", required="false")) - schema_fields.append(OrderedDict(name="gbifIssue", type="semiColonDelimited", indexed="true", stored="true", required="false", multiValued="true")) - schema_fields.append(OrderedDict(name="gbifID", type="field_text", indexed="false", stored="true", required="false")) - schema_fields.append(OrderedDict(name="basisOfRecord", type="field_text", indexed="false", stored="true", required="false")) - schema_fields.append(OrderedDict(name="institutionCode", type="field_text", indexed="false", stored="true", required="false")) - - return self._dict2xml(schema_fields, 'field') - - @staticmethod - def _sql_to_string(sql): - """ - Convert SQL dict to a string - @param sql: - @return: - """ - sql_string = '' - for key, value in sql.items(): - sql_string += ' %s ' % key - if key in ['SELECT', 'GROUP BY']: - conj = ', ' - elif key == 'WHERE': - conj = ' AND ' - else: - conj = ' ' - sql_string += conj.join(value) - - return sql_string - - def _dict2xml(self, d, root_node=None): - wrap = False if None is root_node or isinstance(d, list) else True - root = 'objects' if None is root_node else root_node - root_singular = root[:-1] if 's' == root[-1] and None == root_node else root - xml = '' - children = [] - - if isinstance(d, dict): - for key, value in dict.items(d): - if isinstance(value, dict): - children.append(self._dict2xml(value, key)) - elif isinstance(value, list): - children.append(self._dict2xml(value, key)) - else: - xml = xml + ' ' + key + '="' + str(value) + '"' - else: - for value in d: - children.append(self._dict2xml(value, root_singular)) - - end_tag = '>' if 0 < len(children) else '/>' - - if wrap or isinstance(d, dict): - xml = '<' + root + xml + end_tag - - if 0 < len(children): - for child in children: - xml = xml + child - - if wrap or isinstance(d, dict): - xml = xml + '' - - return xml - - @staticmethod - def _escape_quotes(sql): - """ - If this is to be used in a SOLR Xml schema, need to escape - Quotes - ' & " - @param sql: - @return: escaped str - """ - return sql.replace('\'', ''').replace('"', '"').replace('<', '<') diff --git a/data_importer/commands/solr/query.py b/data_importer/commands/solr/query.py deleted file mode 100644 index 4ed440b..0000000 --- 
a/data_importer/commands/solr/query.py +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '31/05/2017'. -""" - -import click -import logging -from collections import OrderedDict -from data_importer.commands.solr import SolrCommand - -logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO) -logger = logging.getLogger('luigi-interface') - - -@click.command() -@click.option('--dataset-name', help='Output a SOLR schema.', required=True) -@click.option('--encode', is_flag=True) -def solr_query(dataset_name, encode=False): - solr_cmd = SolrCommand(dataset_name) - query = solr_cmd.get_query(encode) - print(query) - - -if __name__ == "__main__": - solr_query() diff --git a/data_importer/commands/solr/schema.py b/data_importer/commands/solr/schema.py deleted file mode 100644 index 39cc7dc..0000000 --- a/data_importer/commands/solr/schema.py +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '31/05/2017'. -""" - -import click -import logging -from collections import OrderedDict - -from data_importer.commands.solr import SolrCommand - -logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO) -logger = logging.getLogger('luigi-interface') - - -@click.command() -@click.option('--dataset-name', help='Output a SOLR schema.', required=True) -def solr_schema(dataset_name): - solr_cmd = SolrCommand(dataset_name) - schema = solr_cmd.get_schema() - print(schema.replace('/>', '/>\n')) - - -if __name__ == "__main__": - solr_schema() diff --git a/data_importer/default.cfg.example b/data_importer/default.cfg.example deleted file mode 100644 index 589cbb1..0000000 --- a/data_importer/default.cfg.example +++ /dev/null @@ -1,5 +0,0 @@ -[database] -name = -host = -user = -password = \ No newline at end of file diff --git a/data_importer/lib/__init__.py b/data_importer/lib/__init__.py deleted file mode 100644 index 8486303..0000000 --- a/data_importer/lib/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by 'bens3' on 2013-06-21. -Copyright (c) 2013 'bens3'. All rights reserved. -""" diff --git a/data_importer/lib/ckan.py b/data_importer/lib/ckan.py deleted file mode 100644 index 35b456c..0000000 --- a/data_importer/lib/ckan.py +++ /dev/null @@ -1,68 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '21/03/2017'. 
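The solr_query and solr_schema click commands above are thin wrappers around the SolrCommand class defined earlier in this diff. A sketch of driving it directly, assuming 'collection-specimens' is one of the registered dataset package names:

from data_importer.commands.solr import SolrCommand

cmd = SolrCommand('collection-specimens')

# Concatenated <field .../> definitions for the Solr core schema
print(cmd.get_schema())

# <entity .../> block (quotes escaped) for the Solr data import handler config
print(cmd.get_query(encode=True))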
-""" - -import ckanapi -import psycopg2 -from data_importer.lib.config import Config - - -class CKAN(object): - """ - Interact with CKAN instance via API - """ - def __init__(self): - self.remote_ckan = ckanapi.RemoteCKAN(Config.get('ckan', 'site_url'), apikey=self.get_api_key()) - - @staticmethod - def get_api_key(): - connection = psycopg2.connect( - host=Config.get('database', 'host'), - port=Config.get('database', 'port'), - database=Config.get('database', 'ckan_dbname'), - user=Config.get('database', 'username'), - password=Config.get('database', 'password') - ) - cursor = connection.cursor() - sql = """ SELECT apikey - FROM public.user - WHERE name = %s; - """ - cursor.execute(sql, (Config.get('ckan', 'api_user'),)) - result = cursor.fetchone() - return result[0] - - def get_package(self, package_name): - """ - Get a resource id for a dataset dict - Resource ID is also use as datastore table name - :param package_name: - :return: resource id - """ - try: - return self.remote_ckan.action.package_show(id=package_name) - except ckanapi.NotFound: - return False - - def create_package(self, pkg_dict): - self.remote_ckan.action.package_create(**pkg_dict) - - def update_package(self, pkg_dict): - self.remote_ckan.action.package_update(**pkg_dict) - - def get_resource(self, resource_id): - """ - Get a resource id for a dataset dict - Resource ID is also use as datastore table name - :param resource_id: - :return: resource id - """ - try: - return self.remote_ckan.action.resource_show(id=resource_id) - except ckanapi.NotFound: - return False - - def update_resource(self, resource_dict): - self.remote_ckan.action.resource_update(**resource_dict) diff --git a/data_importer/lib/column.py b/data_importer/lib/column.py deleted file mode 100644 index ca3db5b..0000000 --- a/data_importer/lib/column.py +++ /dev/null @@ -1,33 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '22/03/2017'. -""" - - -class Column(object): - """ - Field definition - :param module_name: keemu module - :param field_name: field name - param field_name: postgres field type - :param field_alias: the keemu field - :param formatter: a function that when given a value returns a formatted - version appropriate for this column. If no formatter is provided the value - is simply returned as is. - """ - def __init__(self, field_name, field_type, ke_field_name=None, - indexed=False, formatter=lambda v: v): - self.field_name = field_name - self.field_type = field_type - self.ke_field_name = ke_field_name - self.indexed = indexed - self.formatter = formatter - - def get_index_type(self): - """ - Get index type - we use GIN for JSONB fields; BTREE for all others - @return: - """ - - return 'GIN' if self.field_type == 'JSONB' else 'BTREE' diff --git a/data_importer/lib/config.py b/data_importer/lib/config.py deleted file mode 100644 index 2e05e63..0000000 --- a/data_importer/lib/config.py +++ /dev/null @@ -1,25 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '20/02/2017'. 
-""" - -import os -import sys - -try: - import configparser -except ImportError: - from six.moves import configparser - -Config = configparser.ConfigParser() -config_files = ['default.cfg'] - -# Add test config if test flag is set -if 'unittest' in sys.argv[0]: - config_files.append('test.cfg') - -# Build config, looping through all the file options -config_dir = os.path.dirname(os.path.dirname(__file__)) -for config_file in config_files: - Config.read(os.path.join(config_dir, config_file)) diff --git a/data_importer/lib/dataset.py b/data_importer/lib/dataset.py deleted file mode 100644 index 3b422fb..0000000 --- a/data_importer/lib/dataset.py +++ /dev/null @@ -1,40 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '31/08/2017'. -""" - - -def dataset_get_tasks(): - from data_importer.tasks.indexlot import IndexLotDatasetTask - from data_importer.tasks.specimen import SpecimenDatasetTask - from data_importer.tasks.artefact import ArtefactDatasetTask - return [SpecimenDatasetTask, IndexLotDatasetTask, ArtefactDatasetTask] - - -def dataset_get_properties(module_name): - """ - Loop through all of the datasets, and extract all the of property fields - For a particular module name - @param module_name: - @return: list of properties - """ - record_properties = [] - for dataset_task in dataset_get_tasks(): - record_properties += [f for f in dataset_task.fields if f.module_name == module_name] - return record_properties - - -def dataset_get_foreign_keys(module_name=None): - """ - Build a list of all foreign key fields - :return: - """ - foreign_keys = set() - for dataset_task in dataset_get_tasks(): - for foreign_key in dataset_task.foreign_keys: - if not module_name: - foreign_keys.add(foreign_key) - elif foreign_key.module_name == module_name: - foreign_keys.add(foreign_key) - return foreign_keys \ No newline at end of file diff --git a/data_importer/lib/db.py b/data_importer/lib/db.py deleted file mode 100644 index 88742e1..0000000 --- a/data_importer/lib/db.py +++ /dev/null @@ -1,89 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '21/03/2017'. 
-""" - - -def db_view_exists(view_name, connection): - cursor = connection.cursor() - cursor.execute("select exists(select * from pg_matviews where matviewname=%s)", (view_name,)) - return cursor.fetchone()[0] - - -def db_drop_view(view_name, connection): - """ - Drop the table - :param view_name - :param connection: - :return: - """ - query = 'DROP MATERIALIZED VIEW IF EXISTS "{view_name}" CASCADE'.format(view_name=view_name) - connection.cursor().execute(query) - connection.commit() - - -def db_create_index(table_name, field_name, index_type, connection): - cursor = connection.cursor() - query = "CREATE INDEX IF NOT EXISTS {table}_{field_name}_idx ON {table} USING {index_type} ({field_name})".format( - table=table_name, - index_type=index_type, - field_name=field_name - ) - cursor.execute(query) - - -def db_table_exists(table_name, connection): - """ - Check if a table exists - :param table_name: - :param connection: - :return: - """ - cursor = connection.cursor() - cursor.execute("select exists(select * from information_schema.tables where table_name=%s)", (table_name,)) - return cursor.fetchone()[0] - - -def db_drop_table(table_name, connection): - """ - Drop the table - :param table_name - :param connection: - :return: - """ - query = "DROP TABLE IF EXISTS {table} CASCADE".format(table=table_name) - connection.cursor().execute(query) - connection.commit() - - -def db_table_has_records(table_name, connection): - """ - Drop the table - :param table_name - :param connection: - :return: - """ - if db_table_exists(table_name, connection): - cursor = connection.cursor() - cursor.execute(""" - SELECT SIGN(COUNT(*)) - FROM {table} LIMIT 1 - """.format(table=table_name) - ) - return bool(cursor.fetchone()[0]) - return False - - -def db_delete_record(table_name, irn, cursor): - """ - Mark a record as deleted - :param table_name - :param irn: record irn - :param cursor: - :return: - """ - sql = "UPDATE {table_name} SET (deleted) = (NOW()) WHERE {table_name}.irn = %s".format( - table_name=table_name, - ) - cursor.execute(sql, (irn,)) diff --git a/data_importer/lib/field.py b/data_importer/lib/field.py deleted file mode 100644 index 2fcaf35..0000000 --- a/data_importer/lib/field.py +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '22/03/2017'. 
-""" - - -class Field(object): - """ - Field definition - :param module_name: keemu module - :param field_name: keemu field name - :param field_alias: the field name in the dataset - """ - - def __init__(self, module_name, field_name, field_alias, formatter=None): - self.module_name = module_name - self.field_name = field_name if isinstance(field_name, list) else [field_name] - self.field_alias = field_alias - self.formatter = formatter - - def _get_value(self, record): - """ - Loop through all the field names, returning value if it exists - @param record: - @return: - """ - for fn in self.field_name: - if getattr(record, fn, None): - return getattr(record, fn) - return None - - def get_value(self, record): - v = self._get_value(record) - if self.formatter: - v = self.formatter(v) - if v and isinstance(v, str): - # remove any whitespace at either end of the actual data - v = v.strip() - return v - - def has_value(self, record): - return self._get_value(record) \ No newline at end of file diff --git a/data_importer/lib/filter.py b/data_importer/lib/filter.py deleted file mode 100644 index 1921441..0000000 --- a/data_importer/lib/filter.py +++ /dev/null @@ -1,34 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '22/03/2017'. -""" - - -class Filter(object): - """ - Field definition - :param field_name: keemu field name - :param filters: List of filters to apply - """ - def __init__(self, field_name, filters): - self.field_name = field_name - self.filters = filters - - def apply(self, record): - value = getattr(record, self.field_name, None) - for f in self.filters: - # Some filters have a value for comparison; others are just a function - try: - filter_operator, filter_value = f - ret = filter_operator(value, filter_value) - except TypeError: - ret = f(value) - - if not ret: - return False - - return True - - def __str__(self): - return '%s - %s' % (self.field_name, self.filters) diff --git a/data_importer/lib/foreign_key.py b/data_importer/lib/foreign_key.py deleted file mode 100644 index db6ace3..0000000 --- a/data_importer/lib/foreign_key.py +++ /dev/null @@ -1,95 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '12/09/2017'. 
-""" - -import psycopg2 -from data_importer.lib.db import db_table_exists - - -class ForeignKeyField(object): - """ - Foreign Key field - :param module_name: - :param join_module: - :param field_name: KE EMu field name - :param record_type: optional - foreign key is valid only for certain record types - """ - - def __init__(self, module_name, join_module, field_name, join_alias='', record_type=None): - self.module_name = module_name - self.join_module = join_module - self.field_name = field_name - self.join_alias = join_alias - self.record_type = record_type - - @property - def table(self): - """ - Table name is just module name, but is required by LuigiCopyToTable - :return: string - """ - return '_{}__{}'.format(self.module_name, self.join_module) - - @property - def insert_sql(self): - """ - SQL for inserting - Uses SELECT...WHERE EXISTS to ensure the IRN exists in the join table - If there's a conflict on irn/rel_irn, do not insert - """ - sql = """ - INSERT INTO {table_name}(irn, rel_irn) - SELECT %(irn)s, %(rel_irn)s - WHERE EXISTS(SELECT 1 FROM {join_module} where irn=%(rel_irn)s) - ON CONFLICT (irn, rel_irn) DO NOTHING; - """.format( - table_name=self.table, - join_module=self.join_module - ) - return sql - - @property - def delete_sql(self): - sql = """ - DELETE FROM {table_name} WHERE irn = %(irn)s - """.format( - table_name=self.table, - ) - return sql - - def create_table(self, connection): - if not db_table_exists(self.table, connection): - query = """ - CREATE TABLE {table} ( - irn int references {module_name}(irn), - rel_irn int references {join_module}(irn) - ) - """.format( - table=self.table, - module_name=self.module_name, - join_module=self.join_module - ) - connection.cursor().execute(query) - # Create indexes - postgres does not index reference fields - query = """ - CREATE UNIQUE INDEX ON {table} (irn, rel_irn) - """.format( - table=self.table, - ) - connection.cursor().execute(query) - - def delete(self, cursor, record): - cursor.execute(self.delete_sql, {'irn': record.irn}) - - def insert(self, cursor, record, rel_irn): - - # We can get a list of IRNs, so convert to list so we can easily loop - # Also, ensure all are integers - irn_list = [int(rel_irn)] if not isinstance(rel_irn, list) else map(int, rel_irn) - for irn in irn_list: - cursor.execute(self.insert_sql, { - 'irn': record.irn, - 'rel_irn': irn - }) diff --git a/data_importer/lib/operators.py b/data_importer/lib/operators.py deleted file mode 100644 index 0f71459..0000000 --- a/data_importer/lib/operators.py +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '03/03/2017'. -""" - - -def is_one_of(a, b): - """ - Helper operator a is in list b - :param a: str - :param b: list - :return: boolean - """ - return a in b - - -def is_not_one_of(a, b): - """ - Helper operator a is not in list b - :param a: str - :param b: list - :return: boolean - """ - return a not in b - - -def is_uuid(a): - """ - Helper operator a is not in list b - :param a: str - :param b: list - :return: boolean - """ - # FIXME: Better UUID checking - return len(a) == 36 diff --git a/data_importer/lib/parser.py b/data_importer/lib/parser.py deleted file mode 100644 index a944478..0000000 --- a/data_importer/lib/parser.py +++ /dev/null @@ -1,47 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '25/02/2017'. 
-""" - -import re -import gzip -from data_importer.lib.record import Record - - -class Parser(object): - """ - Iterator for parsing a KE EMu export file - Yields record objects - """ - def __init__(self, path): - self.path = path - self.export_file = gzip.open(self.path, 'rt') - self.re_field_name_index = re.compile(':([0-9]?)+') - - def __iter__(self): - return self - - def next(self): - for record in self._parse(): - return record - raise StopIteration() - - def _parse(self): - record = Record() - for line in self.export_file: - line = line.strip() - if not line: - continue - # If is a line separator, write the record - if line == '###': - yield record - record = Record() - else: - field_name, value = line.split('=', 1) - # Replace field name indexes - field_name = self.re_field_name_index.sub('', field_name) - setattr(record, field_name, value) - - # Python 3.X Compatibility - __next__ = next diff --git a/data_importer/lib/query.py b/data_importer/lib/query.py deleted file mode 100644 index 43e9b28..0000000 --- a/data_importer/lib/query.py +++ /dev/null @@ -1,14 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '22/03/2017'. -""" - - -class Query(object): - - def __init__(self, select): - self.select = select - - def to_sql(self): - return "" diff --git a/data_importer/lib/record.py b/data_importer/lib/record.py deleted file mode 100644 index 6bc6c1d..0000000 --- a/data_importer/lib/record.py +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '14/02/2017'. -""" - - -class Record(object): - """ - Object with setter overridden so multi-value fields are turned into an array - """ - def __setattr__(self, key, value): - if value: - if key in self.__dict__: - try: - self.__dict__[key].append(value) - except AttributeError: - self.__dict__[key] = [self.__dict__[key], value] - else: - self.__dict__[key] = value - - # if key in ['NhmSecEmbargoDate', 'NhmSecEmbargoExtensionDate']: - # print(key) - # print(value) diff --git a/data_importer/lib/solr_index.py b/data_importer/lib/solr_index.py deleted file mode 100644 index 23b3ff5..0000000 --- a/data_importer/lib/solr_index.py +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '04/09/2017'. 
-""" - -import os -import requests - - -class SolrIndex: - """ - Class for handling solr indexes - Public api: - full_import - run full-import command - delta_import - run delta-import command - status - get status of full-import command - """ - - def __init__(self, host, core_name): - self.host = host - self.core_name = core_name - - def _request(self, command): - # We want the JSON response - params = { - 'wt': 'json', - 'json.nl': 'map', - 'command': command - } - url = os.path.join(self.host, 'solr', self.core_name, 'dataimport') - r = requests.get(url, params=params) - r.raise_for_status() - return r.json() - - def full_import(self): - return self._request('full-import') - - def delta_import(self): - return self._request('delta-import') - - def status(self): - return self._request('status') diff --git a/data_importer/lib/stats.py b/data_importer/lib/stats.py deleted file mode 100644 index 2c58d56..0000000 --- a/data_importer/lib/stats.py +++ /dev/null @@ -1,287 +0,0 @@ -import math -from datetime import datetime as dt - -import abc -import psycopg2 -import requests - -from data_importer.lib.config import Config, configparser - - -def get_milestones(cursor): - return [ - SpecimenMilestone(cursor) - ] - - -class BaseMilestone(object): - name = '' - - try: - log = Config.get('milestones', 'log') - except configparser.NoOptionError: - log = '/var/log/import-milestones.log' - - try: - slackurl = Config.get('milestones', 'slack') - except configparser.NoOptionError: - slackurl = None - - def __init__(self, cursor): - self.every = Config.getint('milestones', self.name) - self.starting_records = self._get_database_record_count(cursor) - self.matched_records = 0 - - @property - @abc.abstractmethod - def query(self): - """ - A query to count the number of relevant records already in the database. - :return: string query - """ - return '' - - def _get_database_record_count(self, cursor): - """ - Get the number of relevant records currently in the database. - :param cursor: a cursor connected to the database - :return: int - """ - try: - cursor.execute(self.query) - row_count = cursor.fetchone()[0] - except psycopg2.ProgrammingError as e: - if e.pgcode == psycopg2.errorcodes.UNDEFINED_TABLE: - # the table we're attempting to count on doesn't exist yet, this just means this is the first import for - # this table and therefore we should return a count of 0 - row_count = 0 - # also rollback the connection otherwise any subsequent sql will fail. This is safe to do at this point - # without loss of any upserted records because this record count function is only called during - # initialisation, before any data has even been read from the keemu dumps - cursor.connection.rollback() - else: - # if it's not a undefined table issue then re-raise it - raise e - return row_count - - @abc.abstractmethod - def match(self, record_dict): - """ - Does this record match the criteria to be counted towards this milestone? - :param record_dict: the record in dict form - :return: boolean; True if it's relevant/matches, False if not - """ - return True - - @property - def next_milestone(self): - """ - The next record count to aim for. - :return: int - """ - return math.ceil(self.current_count / self.every) * self.every - - @property - def current_count(self): - """ - The combined count of records already in the database at the start of processing - and the number of records added since. 
- :return: int - """ - return self.starting_records + self.matched_records - - def _log_entry(self, record_dict): - entry = '{0}: {1} {2} (IRN: {3}, GUID: {4})'.format( - dt.now().strftime('%Y-%m-%d:%H:%M:%S'), - self.current_count, - self.name, - record_dict.get('irn', 'no IRN'), - record_dict.get('guid', 'no GUID') - ) - return entry - - def log_item(self, record_dict): - """ - Add a record to the log. - :param record_dict: the record to be logged - """ - entry = self._log_entry(record_dict) - try: - with open(self.log, 'a') as logfile: - logfile.write(entry) - except OSError: - print(entry) - - def _slack_msg(self, record_dict): - """ - Generate the slack message. - :param record_dict: the record to be posted about - :return: - """ - entry = self._log_entry(record_dict) - data = { - 'attachments': [ - { - 'fallback': entry, - 'pretext': 'The data importer just reached {0} {1}!'.format( - self.current_count, self.name), - 'title': record_dict.get('irn', 'no IRN') - } - ] - } - return data - - def slack(self, record_dict): - """ - Post the record to a slack channel. - :param record_dict: the record to be posted - """ - if self.slackurl is None: - return - try: - data = self._slack_msg(record_dict) - r = requests.post(self.slackurl, json=data) - if not r.ok: - raise requests.ConnectionError(request=r.request, response=r) - except requests.ConnectionError as e: - try: - with open(self.log + '.errors', 'a') as logfile: - logfile.write(e.response) - except OSError: - print('Could not post to slack.') - - def check(self, record_dict): - """ - Check if the current record meets the criteria and logs/notifies if it does. - :param record_dict: the record - """ - if self.match(record_dict): - self.matched_records += 1 - if self.current_count == self.next_milestone: - self.log_item(record_dict) - self.slack(record_dict) - - -class SpecimenMilestone(BaseMilestone): - name = 'specimens' - - def __init__(self, *args, **kwargs): - from data_importer.tasks import specimen_record_types - self.record_types = [i for i in specimen_record_types if 'Group Parent' not in i] - super(SpecimenMilestone, self).__init__(*args, **kwargs) - - def match(self, record_dict): - record_type = record_dict.get('record_type', None) in self.record_types - embargoed = record_dict.get('embargo_date', None) is not None - if embargoed: - try: - embargo_date = dt.strptime(record_dict.get('embargo_date'), '%Y%m%d') - except: - embargo_date = dt.now() - else: - embargo_date = dt.now() - embargo_passed = embargo_date <= dt.now() - return record_type and (not embargoed or embargo_passed) - - @staticmethod - def _emoji(record_properties): - """ - Find an emoji representing this record. 
- :param record_properties: the record's properties (as a dict) - :return: an emoji string representation (surrounded by colons) - """ - emoji_dict = { - 'BMNH(E)': { - None: ':bug:' - }, - 'BOT': { - None: ':deciduous_tree:', - 'bryophytes': ':golf:', - 'diatoms': ':eight_pointed_black_star:', - 'flowering plants': ':blossom:', - 'seed plants: brit & irish': ':seedling:' - }, - 'MIN': { - None: ':gem:' - }, - 'PAL': { - None: ':t-rex:', - 'vertebrates': ':sauropod:', - 'invertebrates': ':trilobite:', - 'palaeobotany': ':fallen_leaf:' - }, - 'ZOO': { - None: ':penguin:', - 'annelida': ':wavy_dash:', - 'aves': ':bird:', - 'cnidaria': ':space_invader:', - 'crustacea': ':crab:', - 'echinodermata': ':star:', - 'mammalia': ':monkey_face:', - 'mollusca': ':snail:', - 'pisces': ':fish:', - 'porifera': ':cloud:', - 'reptiles & amphibians': ':snake:' - }, - None: { - None: ':darwin:' - }, - } - - coll = record_properties.get('collectionCode', None) - if coll: - coll = coll.upper() - sub_dep = record_properties.get('subDepartment', None) - if sub_dep: - sub_dep = sub_dep.lower() - coll_emojis = emoji_dict.get(coll, emoji_dict.get(None, {})) - emoji = coll_emojis.get(sub_dep, coll_emojis.get(None, None)) - return emoji or ':question:' - - @staticmethod - def _colour(record_properties): - """ - Get a colour for the slack message. - :param record_properties: the record's properties (as a dict) - :return: a hex colour code - """ - colour_dict = { - 'BMNH(E)': '#563635', - 'BOT': '#f4e04d', - 'MIN': '#042d6b', - 'PAL': '#5daf57', - 'ZOO': '#af2d2d', - None: '#ca7df9', - } - - coll = record_properties.get('collectionCode', None).upper() - return colour_dict.get(coll, '#ca7df9') - - def _slack_msg(self, record_dict): - data = super(SpecimenMilestone, self)._slack_msg(record_dict) - properties = record_dict['properties'].adapted - data['attachments'][0]['color'] = self._colour(properties) - data['attachments'][0]['author_name'] = properties.get('recordedBy', - properties.get( - 'identifiedBy', - properties.get( - 'donorName', - '-'))) - data['attachments'][0]['title'] = properties.get('scientificName', - properties.get('catalogNumber', - 'Record')) - data['attachments'][0][ - 'title_link'] = 'http://data.nhm.ac.uk/object/' + record_dict.get( - 'guid') - data['attachments'][0]['footer'] = self._emoji(properties) - created = properties.get('created', None) - if created is not None: - data['attachments'][0]['ts'] = dt.strptime(created, '%Y-%m-%d').timestamp() - return data - - @property - def query(self): - record_types = "','".join(self.record_types) - return "select count(*) from ecatalogue where record_type in ('{0}') and (" \ - "embargo_date is null or embargo_date < NOW()) and deleted is " \ - "null".format(record_types) diff --git a/data_importer/migrations/add-pending-column-to-emultimedia.sql b/data_importer/migrations/add-pending-column-to-emultimedia.sql deleted file mode 100644 index 675b313..0000000 --- a/data_importer/migrations/add-pending-column-to-emultimedia.sql +++ /dev/null @@ -1,38 +0,0 @@ -/* -migration: 20180530-1 -database: datastore_default -description: adding new pending column to emultimedia and setting all existing - values to false. -*/ - --- add the new boolean column -alter table emultimedia add column pending boolean; --- set the default value to false as all entries prior to this migration's run --- will not be pending. 
Setting the value on all existing rows like this is --- better than using a default value when altering the table as we avoid locking --- the entire table whilst updating some 2million rows which will take a while. -update emultimedia set pending = false; - - --- drop the materialized view _multimedia_view so that it can be redefined -drop materialized view _multimedia_view; --- redefine the materialized view -CREATE MATERIALIZED VIEW _multimedia_view AS ( - SELECT - _ecatalogue__emultimedia.irn, - COALESCE(jsonb_agg( - jsonb_build_object('identifier', format('http://www.nhm.ac.uk/services/media-store/asset/%s/contents/preview', emultimedia.properties->>'assetID'), - 'type', 'StillImage', - 'license', 'http://creativecommons.org/licenses/by/4.0/', - 'rightsHolder', 'The Trustees of the Natural History Museum, London') || emultimedia.properties) - FILTER (WHERE emultimedia.irn IS NOT NULL), NULL)::TEXT as multimedia, - string_agg(DISTINCT emultimedia.properties->>'category', ';') as category - FROM emultimedia - INNER JOIN _ecatalogue__emultimedia ON _ecatalogue__emultimedia.rel_irn = emultimedia.irn - WHERE - (embargo_date IS NULL OR embargo_date < NOW()) - AND deleted IS NULL - AND pending IS FALSE - GROUP BY _ecatalogue__emultimedia.irn); - -CREATE UNIQUE INDEX ON _multimedia_view (irn); diff --git a/data_importer/migrations/rename-exsiccata-vicecounty.sql b/data_importer/migrations/rename-exsiccata-vicecounty.sql deleted file mode 100644 index 2c90909..0000000 --- a/data_importer/migrations/rename-exsiccata-vicecounty.sql +++ /dev/null @@ -1,19 +0,0 @@ -/* -migration: 20171214-1 -database: datastore_default -description: renaming the keys exsiccati, exsiccatiNumber, and viceCountry. -*/ - -update ecatalogue -set - properties = properties - 'exsiccatiNumber' - 'exsiccati' - 'viceCountry' - || - jsonb_build_object('exsiccataNumber', properties -> 'exsiccatiNumber', 'exsiccata', - properties -> 'exsiccati', 'viceCounty', properties -> 'viceCountry'); - --- should be 0 if successful -select count(key) as "failed changes:" -from (select jsonb_object_keys(properties) as key - from ecatalogue) as keys -where - key in ('exsiccati', 'exsiccatiNumber', 'viceCountry'); \ No newline at end of file diff --git a/data_importer/tasks/__init__.py b/data_importer/tasks/__init__.py deleted file mode 100644 index 68b8ef6..0000000 --- a/data_importer/tasks/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '20/03/2017'. -""" - -from .specimen import SpecimenDatasetTask - -specimen_record_types = SpecimenDatasetTask.record_types \ No newline at end of file diff --git a/data_importer/tasks/artefact.py b/data_importer/tasks/artefact.py deleted file mode 100644 index 13ef8a5..0000000 --- a/data_importer/tasks/artefact.py +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '14/03/2017'. 
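The migration files above are plain SQL scripts; one way they could be applied is via psycopg2, although this diff does not ship a migration runner, so treat the following purely as a sketch:

import psycopg2

from data_importer.lib.config import Config

connection = psycopg2.connect(
    host=Config.get('database', 'host'),
    port=Config.get('database', 'port'),
    database=Config.get('database', 'datastore_dbname'),
    user=Config.get('database', 'username'),
    password=Config.get('database', 'password')
)
# Execute the whole migration script in one call and commit
with open('data_importer/migrations/add-pending-column-to-emultimedia.sql') as f:
    connection.cursor().execute(f.read())
connection.commit()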
- - - -""" - -import luigi - -from operator import eq, is_not - -from data_importer.tasks.dataset import DatasetTask -from data_importer.lib.field import Field -from data_importer.lib.filter import Filter -from data_importer.lib.config import Config - - -class ArtefactDatasetTask(DatasetTask): - - package_name = 'collection-artefacts' - package_description = "Cultural and historical artefacts from The Natural History Museum" - package_title = "Artefacts" - - resource_title = 'Artefacts' - resource_id = Config.get('resource_ids', 'artefact') - resource_description = 'Museum Artefacts' - - record_types = ['Artefact'] - - fields = DatasetTask.fields + [ - Field('ecatalogue', 'PalArtObjectName', 'artefactName'), - # FIXME: Artefact type is missing?? - Field('ecatalogue', 'PalArtType', 'artefactType'), - Field('ecatalogue', 'PalArtDescription', 'artefactDescription'), - Field('ecatalogue', 'IdeCurrentScientificName', 'scientificName') - ] - -if __name__ == "__main__": - luigi.run(main_task_cls=ArtefactDatasetTask) - diff --git a/data_importer/tasks/dataset.py b/data_importer/tasks/dataset.py deleted file mode 100644 index 26e6f4d..0000000 --- a/data_importer/tasks/dataset.py +++ /dev/null @@ -1,183 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '14/03/2017'. - -""" - -import sys -import time -import logging -import abc -import luigi -import datetime -from operator import is_not, ne -from prompter import yesno - -from data_importer.lib.config import Config -from data_importer.lib.field import Field -from data_importer.lib.foreign_key import ForeignKeyField -from data_importer.lib.ckan import CKAN -from data_importer.tasks.solr.index import SolrIndexTask -from data_importer.tasks.delete import DeleteTask -from data_importer.lib.db import db_table_exists -from data_importer.tasks.keemu.ecatalogue import EcatalogueTask -from data_importer.tasks.postgres import PostgresTask - -logger = logging.getLogger('luigi-interface') - - -class DatasetTask(PostgresTask): - """ - Base Dataset Task - """ - # KE EMu export date to process - date = luigi.IntParameter() - # Limit - only used when testing - limit = luigi.IntParameter(default=None, significant=False) - - resource_type = 'csv' - priority = 1 - - # Luigi Postgres database connections - host = Config.get('database', 'host') - database = Config.get('database', 'datastore_dbname') - user = Config.get('database', 'username') - password = Config.get('database', 'password') - - # List of all fields - fields = [ - # All datasets include create and update - Field('ecatalogue', 'AdmDateInserted', 'created'), - Field('ecatalogue', 'AdmDateModified', 'modified'), - # All datasets include multimedia fields - Field('emultimedia', 'GenDigitalMediaId', 'assetID'), - Field('emultimedia', 'MulTitle', 'title'), - Field('emultimedia', 'MulMimeFormat', 'mime'), - Field('emultimedia', 'MulCreator', 'creator'), - Field('emultimedia', 'DetResourceType', 'category'), - ] - - # All datasets have a foreign key join with emultimedia - foreign_keys = [ - ForeignKeyField('ecatalogue', 'emultimedia', 'MulMultiMediaRef'), - ] - - @abc.abstractproperty - def record_types(self): - """ - Record type(s) to use to build this dataset - :return: String or List - """ - return [] - - @abc.abstractproperty - def package_name(self): - """ - Name of the package being created - :return: String - """ - return None - - @abc.abstractproperty - def resource_title(self): - """ - Title of the resource - :return: String - """ - return None - - @abc.abstractproperty - def 
resource_id(self): - """ - ID of the resource - :return: String - """ - return None - - @property - def table(self): - """ - Table name - ID of the resource - """ - return self.resource_id - - def run(self): - self.create_ckan_dataset() - connection = self.output().connect() - if not db_table_exists(self.table, connection): - self.create_table(connection) - - def create_ckan_dataset(self): - """ - Create a dataset on CKAN - :return: - """ - - ckan = CKAN() - - pkg_dict = { - 'name': self.package_name, - 'notes': self.package_description, - 'title': self.package_title, - 'author': Config.get('ckan', 'dataset_author'), - 'license_id': Config.get('ckan', 'dataset_licence'), - 'resources': [ - { - 'id': self.resource_id, - 'name': self.resource_title, - 'description': self.resource_description, - 'format': self.resource_type, - 'url': '_datastore_only_resource', - 'url_type': 'dataset' - } - ], - 'dataset_category': Config.get('ckan', 'dataset_type'), - 'owner_org': Config.get('ckan', 'owner_org') - } - - package = ckan.get_package(self.package_name) - resource = ckan.get_resource(self.resource_id) - # If we have a package, update resource modified date - if package: - logger.info('Updating CKAN dataset %s', self.package_name) - package['last_modified'] = datetime.datetime.now().isoformat() - ckan.update_package(package) - else: - if not yesno('Package {package_name} does not exist. Do you want to create it?'.format( - package_name=self.package_name - )): - sys.exit("Import cancelled") - - # Check the resource doesn't already exist - if resource: - raise Exception('Resource {resource_title} ({resource_id}) already exists - package cannot be created') - - # Create the package - ckan.create_package(pkg_dict) - - def create_table(self, connection): - query = "CREATE TABLE \"{table}\" (_id INT PRIMARY KEY)".format(table=self.table) - connection.cursor().execute(query) - connection.commit() - - def requires(self): - """ - Luigi requires - Just requires the keemu ecatalogue import - which is dependent - On etaxonomy and emultimedia - :return: - """ - full_export_date = Config.getint('keemu', 'full_export_date') - # IS the date being run a full export date? - is_full_export = full_export_date == self.date - requirements = [ - EcatalogueTask(date=self.date, limit=self.limit), - SolrIndexTask(core=self.package_name, full_import=is_full_export) - ] - # If this isn't a full export date, add the delete task - # On full exports there will be no eaudit file produced - if not is_full_export: - requirements.append(DeleteTask(date=self.date)) - - return requirements diff --git a/data_importer/tasks/delete.py b/data_importer/tasks/delete.py deleted file mode 100644 index 83e2949..0000000 --- a/data_importer/tasks/delete.py +++ /dev/null @@ -1,70 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '30/03/2017'. 
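DatasetTask above is the abstract base that ArtefactDatasetTask (earlier in this diff) and IndexLotDatasetTask (later) fill in. A hypothetical subclass showing the minimum a new dataset has to declare (the 'example' resource_ids config key is assumed, not part of the real config):

import luigi

from data_importer.tasks.dataset import DatasetTask
from data_importer.lib.field import Field
from data_importer.lib.config import Config


class ExampleDatasetTask(DatasetTask):
    package_name = 'collection-example'
    package_description = 'An example dataset'
    package_title = 'Example collection'

    resource_title = 'Example records'
    resource_id = Config.get('resource_ids', 'example')  # assumed config key
    resource_description = 'Example resource'

    record_types = ['Example']

    # DatasetTask.fields already covers created/modified and the multimedia fields
    fields = DatasetTask.fields + [
        Field('ecatalogue', 'PalArtObjectName', 'artefactName'),
    ]


if __name__ == "__main__":
    luigi.run(main_task_cls=ExampleDatasetTask)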
-""" - -import luigi -import logging -from data_importer.tasks.postgres import PostgresTask -from data_importer.tasks.file.keemu import KeemuFileTask -from data_importer.lib.parser import Parser -from data_importer.lib.dataset import dataset_get_tasks -from data_importer.lib.db import db_delete_record - -logger = logging.getLogger('luigi-interface') - - -class DeleteTask(PostgresTask): - - # Task params - date = luigi.IntParameter() - table = 'eaudit' - - # Run delete before all dataset tasks - priority = 100 - - def requires(self): - return KeemuFileTask( - file_name='eaudit.deleted-export', - date=self.date - ) - - @staticmethod - def list_all_modules(): - """ - Build a list of all unique module names - :return: - """ - dataset_tasks = dataset_get_tasks() - modules = set() - [[modules.add(field.module_name) for field in dataset_task.fields] for dataset_task in dataset_tasks] - return list(modules) - - def run(self): - logger.info('Executing task: {name}'.format(name=self.__class__)) - modules = self.list_all_modules() - connection = self.output().connect() - cursor = connection.cursor() - for record in Parser(self.input().path): - module_name = record.AudTable - # We only want to delete a record, if we're using information - # from the module - if module_name not in modules: - # Skip record if it's one of the modules - # we're not using - continue - - irn = record.AudKey - db_delete_record(module_name, irn, cursor) - - # mark as complete in same transaction - self.output().touch(connection) - - # commit and close connection - connection.commit() - connection.close() - - -if __name__ == "__main__": - luigi.run(main_task_cls=DeleteTask) \ No newline at end of file diff --git a/data_importer/tasks/file/__init__.py b/data_importer/tasks/file/__init__.py deleted file mode 100644 index a52c5fd..0000000 --- a/data_importer/tasks/file/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '08/09/2017'. -""" \ No newline at end of file diff --git a/data_importer/tasks/file/keemu.py b/data_importer/tasks/file/keemu.py deleted file mode 100644 index b23faa9..0000000 --- a/data_importer/tasks/file/keemu.py +++ /dev/null @@ -1,35 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by 'bens3' on 2013-06-21. -Copyright (c) 2013 'bens3'. All rights reserved. -""" - -import os -import luigi -from data_importer.lib.config import Config - - -class KeemuFileTask(luigi.ExternalTask): - """ - Wrapper around a KE EMu export file - Luigi requires LocalTarget tasks to be external tasks - """ - date = luigi.IntParameter() - file_name = luigi.Parameter() - - @property - def file_path(self): - file_name = '{file_name}.{date}.gz'.format( - file_name=self.file_name, - date=self.date - ) - return os.path.join(Config.get('keemu', 'export_dir'), file_name) - - def output(self): - # Check file size if greater than zero - # Will also raise an error if the file doesn't exist - file_size = os.path.getsize(self.file_path) - if file_size == 0: - raise IOError('KE EMu export file %s has zero byte length' % self.file_path) - return luigi.LocalTarget(self.file_path) diff --git a/data_importer/tasks/file/local.py b/data_importer/tasks/file/local.py deleted file mode 100644 index 5a9c798..0000000 --- a/data_importer/tasks/file/local.py +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by 'bens3' on 2013-06-21. -Copyright (c) 2013 'bens3'. All rights reserved. 
-""" - -import luigi -from luigi.format import Gzip - - -class LocalFileTask(luigi.ExternalTask): - """ - Wrapper around a local file - """ - file_path = luigi.Parameter() - - def output(self): - return luigi.LocalTarget(self.file_path, Gzip) diff --git a/data_importer/tasks/file/remote.py b/data_importer/tasks/file/remote.py deleted file mode 100644 index 79658c7..0000000 --- a/data_importer/tasks/file/remote.py +++ /dev/null @@ -1,35 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '08/09/2017'. -""" - -import os -import luigi -import requests -import tempfile -import shutil - -from luigi.format import Gzip - -class RemoteFileTask(luigi.ExternalTask): - """ - Remote File Task - downloads to local tmp and returns LocalTarget - """ - - url = luigi.Parameter() - dest = os.path.join(tempfile.gettempdir(), 'gbif.zip') - - def run(self): - # Download file - r = requests.get(self.url, stream=True) - r.raise_for_status() - with open(self.dest, 'wb') as f: - shutil.copyfileobj(r.raw, f) - - def output(self): - """ - Return a local file object - @return: - """ - return luigi.LocalTarget(self.dest, Gzip) diff --git a/data_importer/tasks/gbif.py b/data_importer/tasks/gbif.py deleted file mode 100644 index 0c1df99..0000000 --- a/data_importer/tasks/gbif.py +++ /dev/null @@ -1,112 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '08/09/2017'. -""" - -import os -import csv -import luigi -import codecs -from luigi.contrib.postgres import CopyToTable - -from data_importer.lib.config import Config -from data_importer.tasks.file.remote import RemoteFileTask -from data_importer.tasks.file.local import LocalFileTask -from data_importer.lib.db import db_create_index - - -class GBIFTask(CopyToTable): - # Luigi Postgres database connections - host = Config.get('database', 'host') - database = Config.get('database', 'datastore_dbname') - user = Config.get('database', 'username') - password = Config.get('database', 'password') - table = 'gbif' - - columns = [ - ("ID", "INT"), - ("occurrenceID", "UUID"), - ("lastInterpreted", "TEXT"), # Possible date?? 
- ("issue", "TEXT"), - ("kingdom", "TEXT"), - ("kingdomKey", "INT"), - ("phylum", "TEXT"), - ("phylumKey", "INT"), - ("class", "TEXT"), - ("classKey", "INT"), - ("taxonOrder", "TEXT"), # Using just order breaks import - ("orderKey", "INT"), - ("family", "TEXT"), - ("familyKey", "INT"), - ("genus", "TEXT"), - ("genusKey", "INT"), - ("subgenus", "TEXT"), - ("subgenusKey", "INT"), - ("species", "TEXT"), - ("speciesKey", "INT"), - ("taxonRank", "TEXT"), - ("taxonKey", "INT"), - ("identifiedBy", "TEXT"), - ("scientificName", "TEXT"), - ("recordedBy", "TEXT"), - ("eventDate", "TEXT"), - ("recordNumber", "TEXT"), - ("continent", "TEXT"), - ("country", "TEXT"), - ("countryCode", "TEXT"), - ("stateProvince", "TEXT"), - ("habitat", "TEXT"), - ("islandGroup", "TEXT"), - ("decimalLongitude", "TEXT"), - ("decimalLatitude", "TEXT"), - ] - - def requires(self): - # TODO: New GBIF API does not include many of the IDs needed to link out - tmp use local file - # return RemoteFileTask('http://api.gbif.org/v1/occurrence/download/request/0005170-170826194755519.zip') - file_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'data', 'gbif.csv.gz') - return LocalFileTask(file_path) - - @staticmethod - def _get_value(col, col_type, line): - """ - Rewrite column name taxonOrder to order (order breaksSQL import) - @param col: - @return: - """ - col = 'order' if col == 'taxonOrder' else col - value = line.get(col, None) - # Coerce empty strings to None, empty strings break on integer fields - if not value: - value = None - - return value - - def rows(self): - """ - Implementation of CopyToTable.rows() - :return: - """ - # Populate row using the same order as columns - with self.input().open('r') as f: - csvfile = csv.DictReader(codecs.iterdecode(f, 'utf-8')) - for line in csvfile: - row = [self._get_value(col, col_type, line) for col, col_type in self.columns] - yield row - - def on_success(self): - """ - On completion, create indexes - @return: - """ - self.ensure_indexes() - - def ensure_indexes(self): - connection = self.output().connect() - db_create_index(self.table, 'occurrenceID', 'btree', connection) - connection.commit() - - -if __name__ == "__main__": - luigi.run(main_task_cls=GBIFTask) diff --git a/data_importer/tasks/indexlot.py b/data_importer/tasks/indexlot.py deleted file mode 100644 index 9a22b9a..0000000 --- a/data_importer/tasks/indexlot.py +++ /dev/null @@ -1,65 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '14/03/2017'. 
- - - -""" - -import luigi -from data_importer.tasks.dataset import DatasetTask -from data_importer.lib.field import Field -from data_importer.lib.foreign_key import ForeignKeyField -from data_importer.lib.config import Config - - -class IndexLotDatasetTask(DatasetTask): - - package_name = 'collection-indexlots' - package_description = "Index Lot records from the Natural History Museum's collection" - package_title = "Index Lot collection" - - resource_title = 'Index Lots' - resource_id = Config.get('resource_ids', 'indexlot') - resource_description = 'Species level record denoting the presence of a taxon in the Museum collection' - - record_types = ['Index Lot'] - - fields = DatasetTask.fields + [ - Field('ecatalogue', 'EntIndMaterial', 'material'), - Field('ecatalogue', 'EntIndType', 'type'), - Field('ecatalogue', 'EntIndMedia', 'media'), - Field('ecatalogue', 'EntIndBritish', 'british'), - Field('ecatalogue', 'EntIndKindOfMaterial', 'kindOfMaterial'), - Field('ecatalogue', 'EntIndKindOfMedia', 'kindOfMedia'), - # Material detail - Field('ecatalogue', 'EntIndCount', 'materialCount'), - Field('ecatalogue', 'EntIndSex', 'materialSex'), - Field('ecatalogue', 'EntIndStage', 'materialStage'), - Field('ecatalogue', 'EntIndTypes', 'materialTypes'), - Field('ecatalogue', 'EntIndPrimaryTypeNo', 'materialPrimaryTypeNumber'), - # Etaxonomy - Field('etaxonomy', 'ClaScientificNameBuilt', 'scientificName'), - Field('etaxonomy', 'ClaCurrentSciNameLocal', 'currentScientificName'), - Field('etaxonomy', 'ClaKingdom', 'kingdom'), - Field('etaxonomy', 'ClaPhylum', 'phylum'), - Field('etaxonomy', 'ClaClass', 'class'), - Field('etaxonomy', 'ClaOrder', 'order'), - Field('etaxonomy', 'ClaSuborder', 'suborder'), - Field('etaxonomy', 'ClaSuperfamily', 'superfamily'), - Field('etaxonomy', 'ClaFamily', 'family'), - Field('etaxonomy', 'ClaSubfamily', 'subfamily'), - Field('etaxonomy', 'ClaGenus', 'genus'), - Field('etaxonomy', 'ClaSubgenus', 'subgenus'), - Field('etaxonomy', 'ClaSpecies', 'specificEpithet'), - Field('etaxonomy', 'ClaSubspecies', 'infraspecificEpithet'), - Field('etaxonomy', 'ClaRank', 'taxonRank'), # NB: CKAN uses rank internally - ] - - foreign_keys = DatasetTask.foreign_keys + [ - ForeignKeyField('ecatalogue', 'etaxonomy', 'EntIndIndexLotTaxonNameLocalRef') - ] - -if __name__ == "__main__": - luigi.run(main_task_cls=IndexLotDatasetTask) diff --git a/data_importer/tasks/keemu/__init__.py b/data_importer/tasks/keemu/__init__.py deleted file mode 100644 index 076394e..0000000 --- a/data_importer/tasks/keemu/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '20/03/2017'. -""" \ No newline at end of file diff --git a/data_importer/tasks/keemu/base.py b/data_importer/tasks/keemu/base.py deleted file mode 100644 index d325c54..0000000 --- a/data_importer/tasks/keemu/base.py +++ /dev/null @@ -1,337 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '03/03/2017'. 
-""" - -import abc -import logging -from psycopg2.extras import Json as PGJson -import luigi -import time -from operator import is_not -from luigi.contrib.postgres import CopyToTable as LuigiCopyToTable -from data_importer.lib.db import db_table_exists, db_delete_record, db_create_index -from data_importer.lib.operators import is_not_one_of, is_uuid -from data_importer.tasks.file.keemu import KeemuFileTask -from data_importer.lib.parser import Parser -from data_importer.lib.config import Config -from data_importer.lib.column import Column -from data_importer.lib.filter import Filter - -from data_importer.lib.dataset import ( - dataset_get_foreign_keys, - dataset_get_properties -) - -from data_importer.lib.stats import ( - get_milestones -) - -logger = logging.getLogger('luigi-interface') - - -class KeemuBaseTask(LuigiCopyToTable): - """ - Extends CopyToTable to write directly to database using Upsert statements - """ - # Task parameters - date = luigi.IntParameter() - limit = luigi.IntParameter(default=None, significant=False) - - @abc.abstractproperty - def module_name(self): - """ - Name of the KE EMu module - set in ecatalogue etc., - Base classes deriving form this one - """ - return [] - - # Base postgres columns for importing a keemu module - # These can be extended by individual module task classes - columns = [ - Column("irn", "INTEGER PRIMARY KEY"), - Column("guid", "UUID", indexed=True), - Column("created", "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"), - Column("modified", "TIMESTAMP"), - Column("deleted", "TIMESTAMP", indexed=True), - Column("properties", "JSONB", indexed=True), # The field for storing the record data - Column("import_date", "INTEGER", indexed=True), # Date of import - ] - - # Filters to apply against each record. - # If any return False, the record will not be imported - record_filters = [ - Filter('AdmPublishWebNoPasswordFlag', [ - (is_not_one_of, ['n', 'N']) - ]), - Filter('AdmGUIDPreferredValue', [ - (is_not, None), - is_uuid - ]), - ] - - # Luigi Postgres database connections - host = Config.get('database', 'host') - database = Config.get('database', 'datastore_dbname') - user = Config.get('database', 'username') - password = Config.get('database', 'password') - - @property - def table(self): - """ - Table name is just module name, but is required by LuigiCopyToTable - :return: string - """ - return self.module_name - - # Count total number of records (including skipped) - record_count = 0 - # Count of records inserted / written to CSV - insert_count = 0 - - def __init__(self, *args, **kwargs): - # Initiate a DB connection - super(KeemuBaseTask, self).__init__(*args, **kwargs) - self.connection = self.output().connect() - self.cursor = self.connection.cursor() - # Build a list of properties from the dataset fields - self.record_properties = dataset_get_properties(self.module_name) - # Get all foreign keys - self.foreign_keys = dataset_get_foreign_keys(self.module_name) - # Get current specimen record count - self.milestones = get_milestones(self.cursor) - - def create_table(self, connection): - """ - Create table for the ke emu module - This is an override of luigi.CopyToTable.create_table() to handle using - Column objects, not tuples - """ - - # Build a string of column definitions - coldefs = ','.join( - '{name} {type}'.format(name=col.field_name, type=col.field_type) for col in self.columns - ) - query = "CREATE TABLE {table} ({coldefs})".format(table=self.table, coldefs=coldefs) - connection.cursor().execute(query) - connection.commit() - - @property - def 
sql(self): - """ - SQL for insert / updates - Tries inserting, and on conflict performs update with modified date - :return: SQL - """ - # Get any extra fields defined in the base module class - # This uses a set comprehension converted into a list for deduping - extra_fields = list({col.field_name for col in self.columns if col not in KeemuBaseTask.columns}) - insert_fields = ['irn', 'guid', 'properties', 'import_date'] + extra_fields - update_fields = ['properties', 'import_date'] + extra_fields - sql = """ - INSERT INTO {table_name} ({insert_fields}, created) VALUES ({insert_fields_placeholders}, NOW()) - ON CONFLICT (irn) - DO UPDATE SET ({update_fields}, modified) = ({update_fields_placeholders}, NOW()) WHERE {table_name}.irn = %(irn)s RETURNING modified - """.format( - table_name=self.table, - insert_fields=','.join(insert_fields), - insert_fields_placeholders=','.join(map(lambda field: "%({0})s".format(field), insert_fields)), - update_fields=','.join(update_fields), - update_fields_placeholders=','.join(map(lambda field: "%({0})s".format(field), update_fields)), - ) - return sql - - def requires(self): - return KeemuFileTask( - file_name='{module_name}.export'.format(module_name=self.module_name), - date=self.date - ) - - def ensure_table(self): - connection = self.output().connect() - if not db_table_exists(self.table, connection): - self.create_table(self.connection) - # Create any foreign key tables - if self.foreign_keys: - for fk in self.foreign_keys: - fk.create_table(self.connection) - - def _apply_filters(self, record): - """ - Apply all the filters to determine if a record should be imported - Return True if it can be; return False if it should be skipped - @return: boolean - """ - for record_filter in self.record_filters: - if not record_filter.apply(record): - return False - return True - - def milestone_check(self, record_dict): - """ - Checks to see if this record reaches any of the defined milestones - :param record_dict: the current record - """ - for m in self.milestones: - m.check(record_dict) - - def run(self): - # Ensure table exists - self.ensure_table() - start_time = time.time() - - for record in self.records(): - self.insert_count += 1 - record_dict = self._record_to_dict(record) - - # Cast all dict objects to PGJson - for key in record_dict.keys(): - if type(record_dict[key]) is dict: - record_dict[key] = PGJson(record_dict[key]) - - # Insert the record - self.cursor.execute(self.sql, record_dict) - # The SQL upsert statement uses INSERT ON CONFLICT UPDATE... 
- # Which always returns INSERT 1 in status message - # So the UPDATE statement uses RETURNING modified - # If we have a modified date, this is an update; otherwise an insert - is_update = self.cursor.fetchone()[0] is not None - - if not is_update: - self.milestone_check(record_dict) - - # If we have foreign keys for this module, see if a foreign key - # has been defined - and if it has insert it - if self.foreign_keys: - for fk in self.foreign_keys: - # If this is an update (has modified date), delete all - # foreign key relations before reinserting - # Prevents relationships data from getting stale - if is_update: - fk.delete(self.cursor, record) - # If this record has a relationship, insert it - rel_irn = getattr(record, fk.field_name, None) - if rel_irn: - fk.insert(self.cursor, record, rel_irn) - - # mark as complete in same transaction - self.output().touch(self.connection) - # Create indexes now - slightly speeds up the process if it happens afterwards - self.ensure_indexes() - # And commit everything - self.connection.commit() - logger.info('Inserted %d %s records in %d seconds', self.insert_count, self.table, time.time() - start_time) - - @property - def file_input(self): - """ - Helper to get reference to file input - allows ecatalogue to override - @return: input file ref - """ - return self.input() - - def records(self): - for record in Parser(self.file_input.path): - # Iterate record counter, even if it gets filtered out - # makes debugging a bit simpler, as you can limit and test filters - self.record_count += 1 - if self._apply_filters(record): - yield record - # Record is being filtered out - # Before we continue, if the record has been marked as - # not web publishable, we try and delete it - elif not self._is_web_publishable(record): - self.delete_record(record) - - if self.limit and self.record_count >= self.limit: - break - - if self.record_count % 1000 == 0: - logger.debug('Record count: %d', self.record_count) - - def delete_record(self, record): - """ - Mark a record as deleted - :return: None - """ - db_delete_record(self.table, record.irn, self.cursor) - - def ensure_indexes(self): - for column in self.columns: - if column.indexed: - index_type = column.get_index_type() - db_create_index(self.table, column.field_name, index_type, self.connection) - - @staticmethod - def _is_web_publishable(record): - """ - Evaluate whether a record is importable - At the very least a record will need AdmPublishWebNoPasswordFlag set to Y, - Additional models will extend this to provide additional filters - :param record: - :return: boolean - false if not importable - """ - return record.AdmPublishWebNoPasswordFlag.lower() != 'n' - - def _record_to_dict(self, record): - """ - Convert record object to a dict. This function combines the outputs of - the _record_to_core_dict and _record_to_columns_dict functions. - :param record: - :return: - """ - record_dict = self._record_to_core_dict(record) - record_dict.update(self._record_to_columns_dict(record)) - return record_dict - - def _record_to_core_dict(self, record): - """ - Convert a record to a dictionary that only includes the core details. - :param record: - :return: - """ - return { - 'irn': record.irn, - 'guid': record.AdmGUIDPreferredValue, - 'properties': self._record_map_fields(record, self.record_properties), - 'import_date': int(self.date) - } - - def _record_to_columns_dict(self, record): - """ - Convert a record into a dict containing the column data. 
- :param record: - :return: - """ - column_dict = {} - # Loop through columns, adding any extra fields - # These need to be set even if null as they are part of the SQL statement - for column in self.columns: - if column.ke_field_name: - value = None - # Ke field name can be a list - if it is loop through all the values - if isinstance(column.ke_field_name, list): - for fn in column.ke_field_name: - value = getattr(record, fn, None) - # Once we have a value, break out of the loop - if value: - break - else: - value = getattr(record, column.ke_field_name, None) - - column_dict[column.field_name] = column.formatter(value) - - return column_dict - - @staticmethod - def _record_map_fields(record, fields): - """ - Helper function - pass in a list of tuples - (source field, destination field) - And return a dict of values keyed by destination field - :param record: - :param fields: - :return: - """ - return {f.field_alias: f.get_value(record) for f in fields if f.has_value(record)} diff --git a/data_importer/tasks/keemu/ecatalogue.py b/data_importer/tasks/keemu/ecatalogue.py deleted file mode 100644 index eb00035..0000000 --- a/data_importer/tasks/keemu/ecatalogue.py +++ /dev/null @@ -1,141 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '30/08/2017'. -""" - -import luigi -import logging - -from data_importer.tasks.keemu.base import KeemuBaseTask -from data_importer.tasks.keemu.etaxonomy import ETaxonomyTask -from data_importer.tasks.keemu.emultimedia import EMultimediaTask -from data_importer.lib.column import Column -from data_importer.lib.operators import is_one_of, is_not_one_of -from data_importer.lib.filter import Filter -from data_importer.lib.dataset import dataset_get_tasks -from data_importer.lib.db import db_view_exists - -logger = logging.getLogger('luigi-interface') - - -class EcatalogueTask(KeemuBaseTask): - """ - Task for importing the KE EMu ecatalogue module - """ - module_name = 'ecatalogue' - - # Additional columns for this module - columns = KeemuBaseTask.columns + [ - Column("record_type", "TEXT", "ColRecordType", indexed=True), - Column('embargo_date', "DATE", ["NhmSecEmbargoDate", "NhmSecEmbargoExtensionDate"], indexed=True), - ] - - @property - def record_filters(self): - """ - Table name is just module name, but is required by LuigiCopyToTable - :return: string - """ - return KeemuBaseTask.record_filters + [ - # Does this record have an excluded status - Stub etc., - Filter('SecRecordStatus', [ - (is_not_one_of, [ - "DELETE", - "DELETE-MERGED", - "DUPLICATION", - "Disposed of", - "FROZEN ARK", - "INVALID", - "POSSIBLE TYPE", - "PROBLEM", - "Re-registered in error", - "Reserved", - "Retired", - "Retired (see Notes)", - "SCAN_cat", - "See Notes", - "Specimen missing - see notes", - "Stub", - "Stub Record", - "Stub record" - ]) - ]), - # Make sure ecatalogue records are one of the record types - # used in one of the datasets - otherwise pulling in a load of cruft - Filter('ColRecordType', [ - (is_one_of, self.get_record_types()) - ]), - # Record must be in one of the known collection departments - # (Otherwise the home page breaks) - both Artefacts & Index Lots - # Have ColDepartment so this filter does not need to be more specific - Filter('ColDepartment', [ - (is_one_of, [ - "Botany", - "Entomology", - "Mineralogy", - "Palaeontology", - "Zoology" - ]) - ]), - ] - - @staticmethod - def get_record_types(): - """ - Loop through all of the datasets, and get all record types - @return: list of record types - """ - record_types = [] - for 
dataset_task in dataset_get_tasks(): - record_types += dataset_task.record_types - return record_types - - def requires(self): - return [ - super(EcatalogueTask, self).requires(), - ETaxonomyTask(date=self.date, limit=self.limit), - EMultimediaTask(date=self.date, limit=self.limit) - ] - - def on_success(self): - if not db_view_exists('_multimedia_view', self.connection): - logger.info('Creating multimedia view') - query = """ - CREATE MATERIALIZED VIEW _multimedia_view AS ( - SELECT - _ecatalogue__emultimedia.irn, - COALESCE(jsonb_agg( - jsonb_build_object('identifier', format('http://www.nhm.ac.uk/services/media-store/asset/%s/contents/preview', emultimedia.properties->>'assetID'), - 'type', 'StillImage', - 'license', 'http://creativecommons.org/licenses/by/4.0/', - 'rightsHolder', 'The Trustees of the Natural History Museum, London') || emultimedia.properties) - FILTER (WHERE emultimedia.irn IS NOT NULL), NULL)::TEXT as multimedia, - string_agg(DISTINCT emultimedia.properties->>'category', ';') as category FROM emultimedia - INNER JOIN _ecatalogue__emultimedia ON _ecatalogue__emultimedia.rel_irn = emultimedia.irn WHERE (embargo_date IS NULL OR embargo_date < NOW()) AND deleted IS NULL AND pending IS FALSE - GROUP BY _ecatalogue__emultimedia.irn); CREATE UNIQUE INDEX ON _multimedia_view (irn); - """ - else: - logger.info('Refreshing multimedia view') - query = """ - REFRESH MATERIALIZED VIEW _multimedia_view - """ - - self.connection.cursor().execute(query) - self.connection.commit() - self.connection.close() - - @property - def file_input(self): - """ - Ecatalogue has multiple requirements - loop through and return - The file task input - @return: input file ref - """ - for i in self.input(): - if hasattr(i, 'path'): - return i - - -if __name__ == "__main__": - luigi.run(main_task_cls=EcatalogueTask) diff --git a/data_importer/tasks/keemu/emultimedia.py b/data_importer/tasks/keemu/emultimedia.py deleted file mode 100644 index 2e04c05..0000000 --- a/data_importer/tasks/keemu/emultimedia.py +++ /dev/null @@ -1,40 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '30/08/2017'. -""" - -import luigi -from data_importer.lib.column import Column -from data_importer.lib.filter import Filter -from data_importer.tasks.keemu.base import KeemuBaseTask -from operator import is_not - - -class EMultimediaTask(KeemuBaseTask): - """ - Task for importing the KE EMu ecatalogue module - """ - module_name = 'emultimedia' - - # Additional columns for emultimedia module - add embargo date - columns = KeemuBaseTask.columns + [ - Column('embargo_date', "DATE", ["NhmSecEmbargoDate", "NhmSecEmbargoExtensionDate"], True), - # note the use of the formatter parameter to turn the value into a bool - Column('pending', 'BOOLEAN', 'GenDigitalMediaId', True, lambda g: g == 'Pending') - ] - - # Apply filters to each record, and do not import if any fail - record_filters = KeemuBaseTask.record_filters + [ - Filter('GenDigitalMediaId', [ - (is_not, None) - ]), - ] - - # Ensure emultimedia tasks runs before FileTask as both are - # requirements of EcatalogueTask - priority = 100 - - -if __name__ == "__main__": - luigi.run(main_task_cls=EMultimediaTask) diff --git a/data_importer/tasks/keemu/etaxonomy.py b/data_importer/tasks/keemu/etaxonomy.py deleted file mode 100644 index 61403d5..0000000 --- a/data_importer/tasks/keemu/etaxonomy.py +++ /dev/null @@ -1,47 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '30/08/2017'. 
-""" - -import luigi -from data_importer.lib.filter import Filter -from data_importer.lib.operators import is_uuid -from data_importer.tasks.keemu.base import KeemuBaseTask - - -class ETaxonomyTask(KeemuBaseTask): - """ - Task for importing the KE EMu etaxonomy module - """ - module_name = 'etaxonomy' - - # Ensure etaxonomy tasks runs before FileTask as both are - # requirements of EcatalogueTask - priority = 100 - - # use all the same filters as the base task but exclude the guid filter - record_filters = [f for f in KeemuBaseTask.record_filters - if f.field_name != 'AdmGUIDPreferredValue'] - # add a guid filter which allows None guids or if a guid exists, forces it - # to meet the is_uuid standard - record_filters.append(Filter('AdmGUIDPreferredValue', - [lambda g: g is None or is_uuid(g)])) - - def _record_to_core_dict(self, record): - """ - Convert record object to the core dict data allowing the GUID to be None - :param record: - :return: - """ - return { - 'irn': record.irn, - # get the guid or None - 'guid': getattr(record, 'AdmGUIDPreferredValue', None), - 'properties': self._record_map_fields(record, self.record_properties), - 'import_date': int(self.date) - } - - -if __name__ == "__main__": - luigi.run(main_task_cls=ETaxonomyTask) diff --git a/data_importer/tasks/postgres/__init__.py b/data_importer/tasks/postgres/__init__.py deleted file mode 100644 index d099a52..0000000 --- a/data_importer/tasks/postgres/__init__.py +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '19/09/2017'. -""" - -import luigi -from luigi.contrib.postgres import PostgresTarget -from data_importer.lib.config import Config - - -class PostgresTask(luigi.Task): - """ - Basic Luigi task for setting up a connection to Postgres - """ - - # Luigi Postgres database connections - host = Config.get('database', 'host') - database = Config.get('database', 'datastore_dbname') - user = Config.get('database', 'username') - password = Config.get('database', 'password') - - def output(self): - """ - Returns a PostgresTarget representing the inserted dataset. - - Normally you don't override this. - """ - return PostgresTarget( - host=self.host, - database=self.database, - user=self.user, - password=self.password, - table=self.table, - update_id=self.task_id - ) \ No newline at end of file diff --git a/data_importer/tasks/solr/__init__.py b/data_importer/tasks/solr/__init__.py deleted file mode 100644 index 58c97df..0000000 --- a/data_importer/tasks/solr/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '04/09/2017'. -""" \ No newline at end of file diff --git a/data_importer/tasks/solr/index.py b/data_importer/tasks/solr/index.py deleted file mode 100644 index 528f8d1..0000000 --- a/data_importer/tasks/solr/index.py +++ /dev/null @@ -1,91 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '04/09/2017'. -""" - -import json -import luigi -import time -import logging -from datetime import datetime - -from data_importer.lib.solr_index import SolrIndex -from data_importer.lib.config import Config -from data_importer.tasks.postgres import PostgresTask -from data_importer.lib.db import db_table_exists - -logger = logging.getLogger('luigi-interface') - - -class SolrIndexTask(PostgresTask): - # Name of the solr core - e.g. 
collection-specimens - core = luigi.Parameter() - full_import = luigi.BoolParameter() - - # Interval to wait before checking import has completed - sleep_interval = 2 - - # Run after all tasks - priority = -1 - - table = 'ecatalogue' - - def __init__(self, *args, **kwargs): - super(SolrIndexTask, self).__init__(*args, **kwargs) - solr_hosts = json.loads(Config.get('solr', 'hosts')) - self.indexes = [] - for solr_host in solr_hosts: - self.indexes.append(SolrIndex(solr_host, self.core)) - - @staticmethod - def _last_import_date(solr_index): - r = solr_index.status() - for i in ['Full Dump Started', 'Delta Dump started']: - d = r['statusMessages'].get(i, None) - if d: - return datetime.strptime(d, '%Y-%m-%d %H:%M:%S') - return None - - def complete(self): - """ - Completion is based on whether indexing has run today, and indexed at least one document - @return: - """ - connection = self.output().connect() - if not db_table_exists(self.table, connection): - return False - - cursor = connection.cursor() - query = "SELECT 1 from {table} WHERE created>%s OR modified>%s LIMIT 1".format( - table=self.table - ) - - for solr_index in self.indexes: - last_import = self._last_import_date(solr_index) - if not last_import: - return False - # Do we have new records since last index operation ran - cursor.execute(query, (last_import, last_import)) - if cursor.fetchone(): - return False - - return True - - def run(self): - for solr_index in self.indexes: - # We always call delta import - if this is the first run won't make any difference - if self.full_import: - solr_index.full_import() - else: - solr_index.delta_import() - while True: - r = solr_index.status() - if r['status'] == 'busy': - logger.info('Total Rows Fetched: %s', r['statusMessages'].get('Total Rows Fetched')) - logger.info('Time Elapsed: %s', r['statusMessages'].get('Time Elapsed')) - time.sleep(self.sleep_interval) - else: - logger.info('Total Rows: %s', r['statusMessages'].get('Total Rows Fetched')) - logger.info('Time taken: %s', r['statusMessages'].get('Time taken')) - break diff --git a/data_importer/tasks/specimen.py b/data_importer/tasks/specimen.py deleted file mode 100644 index 74ed4c2..0000000 --- a/data_importer/tasks/specimen.py +++ /dev/null @@ -1,249 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '14/03/2017'. 
- - - -""" - -import luigi -from operator import is_not -from data_importer.tasks.dataset import DatasetTask -from data_importer.lib.operators import is_one_of, is_not_one_of -from data_importer.lib.field import Field -from data_importer.lib.foreign_key import ForeignKeyField -from data_importer.lib.filter import Filter -from data_importer.lib.config import Config - - -class SpecimenDatasetTask(DatasetTask): - package_name = 'collection-specimens' - package_description = "Specimen records from the Natural History Museum\'s collection" - package_title = "Collection specimens" - - resource_title = 'Specimen records' - resource_id = Config.get('resource_ids', 'specimen') - resource_description = 'Specimen records' - resource_type = 'dwc' # Darwin Core - - record_types = [ - 'Specimen', - 'Specimen - single', - "Specimen - multiple", - 'DNA Prep', - 'Mammal Group Parent', # Not included in the actual output; added to part records - 'Mammal Group Part', - 'Bryozoa Part Specimen', - 'Silica Gel Specimen', - "Parasite Card", - ] - - fields = DatasetTask.fields + [ - # Record numbers & metadata - # Use RegRegistrationNumber if DarCatalogueNumber is empty - Field('ecatalogue', ['DarCatalogNumber', 'RegRegistrationNumber'], 'catalogNumber'), - Field('ecatalogue', 'irn', 'otherCatalogNumbers', lambda irn: 'NHMUK:ecatalogue:%s' % irn), - Field('ecatalogue', 'ColDepartment', 'collectionCode', lambda dept: 'BMNH(E)' if dept == 'Entomology' else dept[:3].upper()), - - # Taxonomy - Field('ecatalogue', 'DarScientificName', 'scientificName'), - # Rather than using the two darwin core fields DarScientificNameAuthorYear and ScientificNameAuthor - # It's easier to just use IdeFiledAsAuthors which has them both concatenated - Field('ecatalogue', 'IdeFiledAsAuthors', 'scientificNameAuthorship'), - # If DarTypeStatus is empty, we'll use sumTypeStatus which has previous determinations - Field('ecatalogue', ['DarTypeStatus', 'sumTypeStatus'], 'typeStatus'), - Field('ecatalogue', 'DarKingdom', 'kingdom'), - Field('ecatalogue', 'DarPhylum', 'phylum'), - Field('ecatalogue', 'DarClass', 'class'), - Field('ecatalogue', 'DarOrder', 'order'), - Field('ecatalogue', 'DarFamily', 'family'), - Field('ecatalogue', 'DarGenus', 'genus'), - Field('ecatalogue', 'DarSubgenus', 'subgenus'), - Field('ecatalogue', 'DarSpecies', 'specificEpithet'), - Field('ecatalogue', 'DarSubspecies', 'infraspecificEpithet'), - Field('ecatalogue', 'DarHigherTaxon', 'higherClassification'), - Field('ecatalogue', 'DarInfraspecificRank', 'taxonRank'), - - # Location - # Use nearest name place rather than precise locality https://github', 'com/NaturalHistoryMuseum/ke2mongo/issues/29 - Field('ecatalogue', ['PalNearestNamedPlaceLocal', 'sumPreciseLocation', 'MinNhmVerbatimLocalityLocal'], 'locality'), - Field('ecatalogue', 'DarCountry', 'country'), - Field('ecatalogue', 'DarWaterBody', 'waterBody'), - Field('ecatalogue', ['EntLocExpeditionNameLocal', 'CollEventExpeditionName'], 'expedition'), - Field('ecatalogue', 'CollEventVesselName', 'vessel'), - Field('ecatalogue', ['DarCollector', 'CollEventNameSummaryData'], 'recordedBy'), - Field('ecatalogue', 'CollEventCollectionMethod', 'samplingProtocol'), - Field('ecatalogue', 'DarFieldNumber', 'fieldNumber'), - Field('ecatalogue', 'DarStateProvince', 'stateProvince'), - Field('ecatalogue', 'DarContinent', 'continent'), - Field('ecatalogue', 'DarIsland', 'island'), - Field('ecatalogue', 'DarIslandGroup', 'islandGroup'), - Field('ecatalogue', 'DarHigherGeography', 'higherGeography'), - Field('ecatalogue', 
'ColHabitatVerbatim', 'habitat'), - Field('ecatalogue', 'DarDecimalLongitude', 'decimalLongitude'), - Field('ecatalogue', 'DarDecimalLatitude', 'decimalLatitude'), - Field('ecatalogue', 'sumPreferredCentroidLongitude', 'verbatimLongitude'), - Field('ecatalogue', 'sumPreferredCentroidLatitude', 'verbatimLatitude'), - # FIXME: sumPreferredCentroidLatDec is populated, even when lat/lng is not centroid! - # Field('ecatalogue', 'sumPreferredCentroidLatDec', 'centroid', lambda v: True if v else False), - Field('ecatalogue', 'DarGeodeticDatum', 'geodeticDatum'), - Field('ecatalogue', 'DarGeorefMethod', 'georeferenceProtocol'), - # Occurrence - Field('ecatalogue', 'DarMinimumElevationInMeters', 'minimumElevationInMeters'), - Field('ecatalogue', 'DarMaximumElevationInMeters', 'maximumElevationInMeters'), - Field('ecatalogue', ['DarMinimumDepthInMeters', 'CollEventFromMetres'], 'minimumDepthInMeters'), - Field('ecatalogue', ['DarMaximumDepthInMeters', 'CollEventToMetres'], 'maximumDepthInMeters'), - Field('ecatalogue', 'DarCollectorNumber', 'recordNumber'), - Field('ecatalogue', 'DarIndividualCount', 'individualCount'), - # Parasite cards use a different field for life stage - Field('ecatalogue', ['DarLifeStage', 'CardParasiteStage'], 'lifeStage'), - Field('ecatalogue', 'DarSex', 'sex'), - Field('ecatalogue', 'DarPreparations', 'preparations'), - # Identification - Field('ecatalogue', 'DarIdentifiedBy', 'identifiedBy'), - # KE Emu has 3 fields for identification date: DarDayIdentified, DarMonthIdentified and DarYearIdentified - # But EntIdeDateIdentified holds them all - which is what we want for dateIdentified - Field('ecatalogue', 'EntIdeDateIdentified', 'dateIdentified'), - Field('ecatalogue', 'DarIdentificationQualifier', 'identificationQualifier'), - Field('ecatalogue', 'DarTimeOfDay', 'eventTime'), - Field('ecatalogue', 'DarDayCollected', 'day'), - Field('ecatalogue', 'DarMonthCollected', 'month'), - Field('ecatalogue', 'DarYearCollected', 'year'), - # Geology - Field('ecatalogue', 'DarEarliestEon', 'earliestEonOrLowestEonothem'), - Field('ecatalogue', 'DarLatestEon', 'latestEonOrHighestEonothem'), - Field('ecatalogue', 'DarEarliestEra', 'earliestEraOrLowestErathem'), - Field('ecatalogue', 'DarLatestEra', 'latestEraOrHighestErathem'), - Field('ecatalogue', 'DarEarliestPeriod', 'earliestPeriodOrLowestSystem'), - Field('ecatalogue', 'DarLatestPeriod', 'latestPeriodOrHighestSystem'), - Field('ecatalogue', 'DarEarliestEpoch', 'earliestEpochOrLowestSeries'), - Field('ecatalogue', 'DarLatestEpoch', 'latestEpochOrHighestSeries'), - Field('ecatalogue', 'DarEarliestAge', 'earliestAgeOrLowestStage'), - Field('ecatalogue', 'DarLatestAge', 'latestAgeOrHighestStage'), - Field('ecatalogue', 'DarLowestBiostrat', 'lowestBiostratigraphicZone'), - Field('ecatalogue', 'DarHighestBiostrat', 'highestBiostratigraphicZone'), - Field('ecatalogue', 'DarGroup', 'group'), - Field('ecatalogue', 'DarFormation', 'formation'), - Field('ecatalogue', 'DarMember', 'member'), - Field('ecatalogue', 'DarBed', 'bed'), - # These fields do not map to DwC, but are still very useful - Field('ecatalogue', 'ColSubDepartment', 'subDepartment'), - Field('ecatalogue', 'PrtType', 'partType'), - Field('ecatalogue', 'RegCode', 'registrationCode'), - Field('ecatalogue', 'CatKindOfObject', 'kindOfObject'), - Field('ecatalogue', 'CatKindOfCollection', 'kindOfCollection'), - Field('ecatalogue', ['CatPreservative', 'EntCatPreservation'], 'preservative'), - Field('ecatalogue', 'ColKind', 'collectionKind'), - Field('ecatalogue', 
'EntPriCollectionName', 'collectionName'), - Field('ecatalogue', 'PalAcqAccLotDonorFullName', 'donorName'), - Field('ecatalogue', 'DarPreparationType', 'preparationType'), - Field('ecatalogue', 'DarObservedWeight', 'observedWeight'), - # Location - # Data is stored in sumViceCountry field in ecatalogue data - but actually this - # should be viceCountry Field(which it is in esites) - Field('ecatalogue', 'sumViceCountry', 'viceCounty'), - # DNA - Field('ecatalogue', 'DnaExtractionMethod', 'extractionMethod'), - Field('ecatalogue', 'DnaReSuspendedIn', 'resuspendedIn'), - Field('ecatalogue', 'DnaTotalVolume', 'totalVolume'), - # Parasite card - Field('ecatalogue', ['EntCatBarcode', 'CardBarcode'], 'barcode'), - # Egg - Field('ecatalogue', 'EggClutchSize', 'clutchSize'), - Field('ecatalogue', 'EggSetMark', 'setMark'), - # Nest - Field('ecatalogue', 'NesShape', 'nestShape'), - Field('ecatalogue', 'NesSite', 'nestSite'), - # Silica gel - Field('ecatalogue', 'SilPopulationCode', 'populationCode'), - # Botany - Field('ecatalogue', 'CollExsiccati', 'exsiccata'), - Field('ecatalogue', 'ColExsiccatiNumber', 'exsiccataNumber'), - Field('ecatalogue', 'ColSiteDescription', 'labelLocality'), - Field('ecatalogue', 'ColPlantDescription', 'plantDescription'), - # Paleo - Field('ecatalogue', 'PalDesDescription', 'catalogueDescription'), - Field('ecatalogue', 'PalStrChronostratLocal', 'chronostratigraphy'), - Field('ecatalogue', 'PalStrLithostratLocal', 'lithostratigraphy'), - # Mineralogy - Field('ecatalogue', 'MinDateRegistered', 'dateRegistered'), - Field('ecatalogue', 'MinIdentificationAsRegistered', 'identificationAsRegistered'), - Field('ecatalogue', 'MinIdentificationDescription', 'identificationDescription'), - Field('ecatalogue', 'MinPetOccurance', 'occurrence'), - Field('ecatalogue', 'MinOreCommodity', 'commodity'), - Field('ecatalogue', 'MinOreDepositType', 'depositType'), - Field('ecatalogue', 'MinTextureStructure', 'texture'), - Field('ecatalogue', 'MinIdentificationVariety', 'identificationVariety'), - Field('ecatalogue', 'MinIdentificationOther', 'identificationOther'), - Field('ecatalogue', 'MinHostRock', 'hostRock'), - Field('ecatalogue', 'MinAgeDataAge', 'age'), - Field('ecatalogue', 'MinAgeDataType', 'ageType'), - # Mineralogy location - Field('ecatalogue', 'MinNhmTectonicProvinceLocal', 'tectonicProvince'), - Field('ecatalogue', 'MinNhmStandardMineLocal', 'mine'), - Field('ecatalogue', 'MinNhmMiningDistrictLocal', 'miningDistrict'), - Field('ecatalogue', 'MinNhmComplexLocal', 'mineralComplex'), - Field('ecatalogue', 'MinNhmRegionLocal', 'geologyRegion'), - # Meteorite - Field('ecatalogue', 'MinMetType', 'meteoriteType'), - Field('ecatalogue', 'MinMetGroup', 'meteoriteGroup'), - Field('ecatalogue', 'MinMetChondriteAchondrite', 'chondriteAchondrite'), - Field('ecatalogue', 'MinMetClass', 'meteoriteClass'), - Field('ecatalogue', 'MinMetPetType', 'petrologyType'), - Field('ecatalogue', 'MinMetPetSubtype', 'petrologySubtype'), - Field('ecatalogue', 'MinMetRecoveryFindFall', 'recovery'), - Field('ecatalogue', 'MinMetRecoveryDate', 'recoveryDate'), - Field('ecatalogue', 'MinMetRecoveryWeight', 'recoveryWeight'), - Field('ecatalogue', 'MinMetWeightAsRegistered', 'registeredWeight'), - Field('ecatalogue', 'MinMetWeightAsRegisteredUnit', 'registeredWeightUnit'), - # Determinations - Field('ecatalogue', 'IdeCitationTypeStatus', 'determinationTypes'), - Field('ecatalogue', 'EntIdeScientificNameLocal', 'determinationNames'), - Field('ecatalogue', 'EntIdeFiledAs', 'determinationFiledAs'), - # Project - 
Field('ecatalogue', 'NhmSecProjectName', 'project'), - ] - - foreign_keys = DatasetTask.foreign_keys + [ - ForeignKeyField('ecatalogue', 'ecatalogue', 'RegRegistrationParentRef', join_alias='parent_catalogue'), - # Card parasite ref field is empty?? - ForeignKeyField('ecatalogue', 'etaxonomy', 'CardParasiteRef', record_type='Parasite Card'), - ] - - def create_table(self, connection): - """ - For the windshaft map, we need a geospatial table - @param connection: - @return: - """ - query = """ - CREATE TABLE "{table}" AS (SELECT ecatalogue.irn as _id, - cast(ecatalogue.properties->>'decimalLatitude' as FLOAT8) as "decimalLatitude", - cast(ecatalogue.properties->>'decimalLongitude' as FLOAT8) as "decimalLongitude", - st_setsrid(st_makepoint( - cast(ecatalogue.properties->>'decimalLongitude' as FLOAT8), - cast(ecatalogue.properties->>'decimalLatitude' as FLOAT8) - ), 4326) as _geom, - st_transform( - st_setsrid( - st_makepoint( - cast(ecatalogue.properties->>'decimalLongitude' as FLOAT8), - cast(ecatalogue.properties->>'decimalLatitude' as FLOAT8) - ), - 4326), - 3857) as _the_geom_webmercator - FROM ecatalogue - WHERE ecatalogue.properties->>'decimalLatitude' ~ '^-?[0-9\.]+$' - AND cast(ecatalogue.properties->>'decimalLatitude' as FLOAT8) > -90 - AND cast(ecatalogue.properties->>'decimalLatitude' as FLOAT8) < 90 - AND ecatalogue.properties->>'decimalLongitude' ~ '^-?[0-9\.]+$' - AND cast(ecatalogue.properties->>'decimalLongitude' as FLOAT8) >= -180 - AND cast(ecatalogue.properties->>'decimalLongitude' as FLOAT8) <= 180 - ) - """.format(table=self.table) - connection.cursor().execute(query) - connection.commit() - -if __name__ == "__main__": - luigi.run(main_task_cls=SpecimenDatasetTask) diff --git a/data_importer/tests/README.md b/data_importer/tests/README.md deleted file mode 100644 index 7eadd4b..0000000 --- a/data_importer/tests/README.md +++ /dev/null @@ -1,6 +0,0 @@ - - - -python -m unittest discover - -python test_delete.py \ No newline at end of file diff --git a/data_importer/tests/__init__.py b/data_importer/tests/__init__.py deleted file mode 100644 index 65d6186..0000000 --- a/data_importer/tests/__init__.py +++ /dev/null @@ -1,85 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '31/03/2017'. 
-""" - -import os -import luigi -import unittest -import psycopg2 - -from data_importer.lib.config import Config -from data_importer.lib.helpers import get_dataset_tasks, list_all_modules, get_file_export_dates -from data_importer.tasks.delete import DeleteTask -from data_importer.tests.test_task import TestTask - - -class BaseTestCase(unittest.TestCase): - - connection = None - cursor = None - - @classmethod - def setUpClass(cls): - cls.connection = psycopg2.connect( - host=Config.get('database', 'host'), - port=Config.get('database', 'port'), - database=Config.get('database', 'datastore_dbname'), - user=Config.get('database', 'username'), - password=Config.get('database', 'password') - ) - cls.cursor = cls.connection.cursor(cursor_factory=psycopg2.extras.DictCursor) - # Run test tasks for all the export dates - for export_date in get_file_export_dates(): - luigi.build([TestTask(date=int(export_date))], local_scheduler=True) - - def _get_record(self, table_name, **kwargs): - """ - Retrieve a record, building where clause from kwargs - :param table_name: - :param kwargs: - :return: - """ - sql = 'SELECT * FROM "{}" WHERE {}'.format( - table_name, - ' AND '.join(['{} = %s'.format(k) for k in kwargs.keys()]) - ) - self.cursor.execute(sql, list(kwargs.values())) - return self.cursor.fetchone() - - def _get_dataset_record(self, resource_name, irn): - table_name = Config.get('resource_ids', resource_name) - return self._get_record(table_name, _id=irn) - - def assertRecordExists(self, table_name, irn): - record = self._get_record(table_name, irn=irn) - self.assertEqual(irn, record['irn']) - - def assertDatasetRecordExists(self, dataset_name, irn): - record = self._get_dataset_record(dataset_name, irn) - self.assertEqual(irn, record['_id']) - - def assertRecordDoesNotExist(self, table_name, irn): - record = self._get_record(table_name, irn=irn) - self.assertIsNone(record) - - def assertDatasetRecordDoesNotExist(self, dataset_name, irn): - record = self._get_dataset_record(dataset_name, irn) - self.assertIsNone(record) - - def assertRecordIsDeleted(self, table_name, irn): - record = self._get_record(table_name, irn=irn) - self.assertIsNotNone(record['deleted']) - - @classmethod - def tearDownClass(cls): - # Delete all table updates - cls.cursor.execute('DELETE FROM table_updates') - # Delete all info in the module tables - for module_name in list_all_modules(): - cls.cursor.execute('DELETE FROM "{module_name}"'.format( - module_name=module_name - )) - cls.connection.commit() - cls.connection.close() diff --git a/data_importer/tests/data/eaudit.deleted-export.20170100.gz b/data_importer/tests/data/eaudit.deleted-export.20170100.gz deleted file mode 100644 index 029a2db..0000000 Binary files a/data_importer/tests/data/eaudit.deleted-export.20170100.gz and /dev/null differ diff --git a/data_importer/tests/data/eaudit.deleted-export.20170101.gz b/data_importer/tests/data/eaudit.deleted-export.20170101.gz deleted file mode 100644 index ad56a8e..0000000 Binary files a/data_importer/tests/data/eaudit.deleted-export.20170101.gz and /dev/null differ diff --git a/data_importer/tests/data/ecatalogue.export.20170100.gz b/data_importer/tests/data/ecatalogue.export.20170100.gz deleted file mode 100644 index cb851d0..0000000 Binary files a/data_importer/tests/data/ecatalogue.export.20170100.gz and /dev/null differ diff --git a/data_importer/tests/data/ecatalogue.export.20170101.gz b/data_importer/tests/data/ecatalogue.export.20170101.gz deleted file mode 100644 index 9334e8b..0000000 Binary files 
a/data_importer/tests/data/ecatalogue.export.20170101.gz and /dev/null differ diff --git a/data_importer/tests/data/emultimedia.export.20170100.gz b/data_importer/tests/data/emultimedia.export.20170100.gz deleted file mode 100644 index 0195767..0000000 Binary files a/data_importer/tests/data/emultimedia.export.20170100.gz and /dev/null differ diff --git a/data_importer/tests/data/emultimedia.export.20170101.gz b/data_importer/tests/data/emultimedia.export.20170101.gz deleted file mode 100644 index ac1eefe..0000000 Binary files a/data_importer/tests/data/emultimedia.export.20170101.gz and /dev/null differ diff --git a/data_importer/tests/data/etaxonomy.export.20170100.gz b/data_importer/tests/data/etaxonomy.export.20170100.gz deleted file mode 100644 index a82d607..0000000 Binary files a/data_importer/tests/data/etaxonomy.export.20170100.gz and /dev/null differ diff --git a/data_importer/tests/data/etaxonomy.export.20170101.gz b/data_importer/tests/data/etaxonomy.export.20170101.gz deleted file mode 100644 index 313a9ad..0000000 Binary files a/data_importer/tests/data/etaxonomy.export.20170101.gz and /dev/null differ diff --git a/data_importer/tests/test_delete.py b/data_importer/tests/test_delete.py deleted file mode 100644 index 9fada74..0000000 --- a/data_importer/tests/test_delete.py +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '31/03/2017'. -""" - -import unittest - -from data_importer.tests import BaseTestCase - - -class TestDelete(BaseTestCase): - - def test_deleted_specimen_is_removed(self): - irn = 100 - self.assertRecordIsDeleted('ecatalogue', irn) - self.assertDatasetRecordDoesNotExist('specimen', irn) - - def test_deleted_multimedia_is_removed(self): - irn = 100 - self.assertRecordIsDeleted('emultimedia', irn) - - def test_deleted_multimedia_is_removed_from_specimen(self): - irn = 16 - record = self._get_dataset_record('specimen', irn) - # Check the specimen does not have multimedia record - self.assertIsNone(record['associatedMedia']) - - def test_deleted_taxonomy_is_removed(self): - irn = 100 - self.assertRecordIsDeleted('etaxonomy', irn) - - def test_deleted_taxonomy_is_removed_from_indexlot(self): - irn = 18 - record = self._get_dataset_record('indexlot', irn) - # Check the specimen does not have a scientific name - # As the corresponding taxonomy record has been deleted - self.assertIsNone(record['properties'].get('scientificName', None)) - - -if __name__ == '__main__': - unittest.main() diff --git a/data_importer/tests/test_embargo.py b/data_importer/tests/test_embargo.py deleted file mode 100644 index a9d5432..0000000 --- a/data_importer/tests/test_embargo.py +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '31/03/2017'. 
-""" - -import unittest - - -from data_importer.tests import BaseTestCase - - -class TestEmbargo(BaseTestCase): - - def test_embargoed_specimen_exists_in_ecatalogue(self): - irn = 13 - self.assertRecordExists('ecatalogue', irn) - - def test_embargoed_specimen_is_not_in_dataset(self): - irn = 13 - self.assertDatasetRecordDoesNotExist('specimen', irn) - - def test_embargoed_by_extension_specimen_is_imported_but_not_released_in_dataset(self): - irn = 14 - self.assertRecordExists('ecatalogue', irn) - self.assertDatasetRecordDoesNotExist('specimen', irn) - - def test_expired_embargoed_specimen_is_released_in_dataset(self): - irn = 15 - self.assertRecordExists('ecatalogue', irn) - self.assertDatasetRecordExists('specimen', irn) - - def test_embargoed_multimedia_record_has_embargoed_date(self): - record = self._get_record('emultimedia', irn=3) - self.assertIsNotNone(record['embargo_date']) - - def test_specimen_record_does_not_use_embargoed_multimedia(self): - record = self._get_dataset_record('specimen', irn=20) - self.assertIsNone(record['associatedMedia']) - -if __name__ == '__main__': - unittest.main() diff --git a/data_importer/tests/test_filters.py b/data_importer/tests/test_filters.py deleted file mode 100644 index 413bfa7..0000000 --- a/data_importer/tests/test_filters.py +++ /dev/null @@ -1,64 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '31/03/2017'. -""" - -import unittest -from data_importer.tests import BaseTestCase - - -class TestFilters(BaseTestCase): - """ - Tests for checking imports work correctly - """ - def test_specimen_record_with_invalid_collection_department_is_not_imported(self): - irn = 4 - self.assertRecordDoesNotExist('ecatalogue', irn) - self.assertDatasetRecordDoesNotExist('specimen', irn) - - def test_specimen_record_with_invalid_record_type_is_not_imported(self): - irn = 5 - self.assertRecordDoesNotExist('ecatalogue', irn) - self.assertDatasetRecordDoesNotExist('specimen', irn) - - def test_specimen_record_with_stub_record_status_is_not_imported(self): - irn = 6 - self.assertRecordDoesNotExist('ecatalogue', irn) - self.assertDatasetRecordDoesNotExist('specimen', irn) - - def test_specimen_record_with_missing_guid_is_not_imported(self): - irn = 7 - self.assertRecordDoesNotExist('ecatalogue', irn) - self.assertDatasetRecordDoesNotExist('specimen', irn) - - def test_non_web_publishable_specimen_record_is_not_imported(self): - irn = 8 - self.assertRecordDoesNotExist('ecatalogue', irn) - self.assertDatasetRecordDoesNotExist('specimen', irn) - - def test_indexlot_record_with_missing_guid_is_not_imported(self): - irn = 9 - self.assertRecordDoesNotExist('ecatalogue', irn) - self.assertDatasetRecordDoesNotExist('indexlot', irn) - - def test_indexlot_record_with_inactive_record_status_is_not_imported(self): - irn = 10 - self.assertRecordDoesNotExist('ecatalogue', irn) - self.assertDatasetRecordDoesNotExist('indexlot', irn) - - def test_non_web_publishable_indexlot_record_is_not_imported(self): - irn = 11 - self.assertRecordDoesNotExist('ecatalogue', irn) - self.assertDatasetRecordDoesNotExist('indexlot', irn) - - def test_non_web_publishable_multimedia_record_is_not_imported(self): - irn = 2 - self.assertRecordDoesNotExist('emultimedia', irn) - - def test_non_web_publishable_taxonomy_record_is_not_imported(self): - irn = 2 - self.assertRecordDoesNotExist('etaxonomy', irn) - -if __name__ == '__main__': - unittest.main() diff --git a/data_importer/tests/test_import.py b/data_importer/tests/test_import.py deleted file mode 100644 index 
95995b9..0000000 --- a/data_importer/tests/test_import.py +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '31/03/2017'. -""" - -import unittest -from data_importer.tests import BaseTestCase - - -class TestImport(BaseTestCase): - """ - Tests for checking imports work correctly - """ - def test_specimen_record_exists_in_ecatalogue(self): - self.assertRecordExists('ecatalogue', 1) - - def test_specimen_record_exists_in_dataset(self): - self.assertDatasetRecordExists('specimen', 1) - - def test_artefact_record_exists_in_ecatalogue(self): - self.assertRecordExists('ecatalogue', 2) - - def test_artefact_record_exists_in_dataset(self): - self.assertDatasetRecordExists('artefact', 2) - - def test_indexlot_record_exists_in_ecatalogue(self): - self.assertRecordExists('ecatalogue', 3) - - def test_indexlot_record_exists_in_dataset(self): - self.assertDatasetRecordExists('indexlot', 3) - - def test_emultimedia_record_exists(self): - self.assertRecordExists('emultimedia', 1) - - def test_specimen_record_has_emultimedia(self): - record = self._get_dataset_record('specimen', 12) - multimedia = record['associatedMedia'][0] - self.assertEqual(multimedia['assetID'], '1234') - - def test_etaxonomy_record_exists(self): - self.assertRecordExists('etaxonomy', 1) - - def test_indexlot_record_has_etaxonomy(self): - record = self._get_dataset_record('indexlot', 17) - # Check indexlot scientific name is Tree Creeper - # (Derived from the taxonomy record) - self.assertEqual(record['properties'].get('scientificName'), 'Certhia americana') - -if __name__ == '__main__': - unittest.main() diff --git a/data_importer/tests/test_task.py b/data_importer/tests/test_task.py deleted file mode 100644 index 311f0bd..0000000 --- a/data_importer/tests/test_task.py +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '03/04/2017'. -""" - -import luigi - -from data_importer.lib.helpers import get_dataset_tasks -from data_importer.tasks.delete import DeleteTask - - -class TestTask(luigi.Task): - """ - Helper task for scheduling delete and dataset tasks - """ - date = luigi.IntParameter() - - def requires(self): - params = { - 'date': int(self.date) - } - yield DeleteTask(**params) - params['view_only'] = True - for task in get_dataset_tasks(): - yield task(**params) diff --git a/data_importer/tests/test_unpublish.py b/data_importer/tests/test_unpublish.py deleted file mode 100644 index 6f6cc73..0000000 --- a/data_importer/tests/test_unpublish.py +++ /dev/null @@ -1,20 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -""" -Created by Ben Scott on '31/03/2017'. -""" - -import unittest -from data_importer.tests import BaseTestCase - - -class TestUnpublish(BaseTestCase): - - def test_unpublished_specimen_previously_published_is_marked_deleted(self): - self.assertRecordIsDeleted('ecatalogue', 19) - - def test_unpublished_specimen_previously_published_is_not_released(self): - self.assertDatasetRecordDoesNotExist('specimen', 19) - -if __name__ == '__main__': - unittest.main()
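
DeleteTask reads the eaudit.deleted-export dump and, for every audit record whose AudTable names a module actually used by the datasets, calls db_delete_record(module_name, AudKey, cursor). db_delete_record itself lives in data_importer.lib.db and is not part of this diff; given the nullable deleted TIMESTAMP column on every module table and the assertRecordIsDeleted test helper, which expects the row to remain with deleted populated, it is assumed to perform a soft delete along these lines:

    # Assumed shape of db_delete_record (data_importer.lib.db is not in this diff):
    # a soft delete that stamps the row rather than removing it, so the dataset
    # views can drop the record while the module table keeps its history.
    def db_delete_record(table_name, irn, cursor):
        # Module table names come from a fixed internal list, not user input,
        # so interpolating the table name follows the importer's own SQL style.
        cursor.execute(
            'UPDATE "{table}" SET deleted = NOW() WHERE irn = %s'.format(table=table_name),
            (irn,)
        )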
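
Before a record is written, KeemuBaseTask._apply_filters() runs every Filter in record_filters and skips the record if any check fails. The Filter class and the operators module are not included in this diff, so the sketch below is inferred purely from how the filters are declared, for example Filter('AdmPublishWebNoPasswordFlag', [(is_not_one_of, ['n', 'N'])]) and Filter('AdmGUIDPreferredValue', [(is_not, None), is_uuid]):

    # Inferred Filter shape: a KE EMu field name plus a list of checks, where each
    # check is either a bare callable taking the value, or a (callable, argument)
    # pair such as (operator.is_not, None) or (is_not_one_of, ['n', 'N']).
    class Filter(object):
        def __init__(self, field_name, filters):
            self.field_name = field_name
            self.filters = filters

        def apply(self, record):
            value = getattr(record, self.field_name, None)
            for check in self.filters:
                if isinstance(check, tuple):
                    func, arg = check
                    if not func(value, arg):
                        return False
                elif not check(value):
                    return False
            return True

Under this reading, ETaxonomyTask's swap of the strict GUID check for lambda g: g is None or is_uuid(g) is what lets taxonomy records without a GUID still be imported.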
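
Each record then goes through a single upsert: KeemuBaseTask.sql builds an INSERT ... ON CONFLICT (irn) DO UPDATE statement, and run() uses the RETURNING modified value to tell inserts from updates (only updates have their foreign-key rows deleted and re-inserted, and only inserts are checked against the milestones). A standalone sketch of that statement for the base columns, assuming a psycopg2 cursor; the ecatalogue table name is just an example and module-specific columns such as record_type are omitted:

    from psycopg2.extras import Json

    # Sketch of the statement KeemuBaseTask.sql generates for the base columns only
    # (irn, guid, properties, import_date); created is stamped on insert, modified
    # on update, so RETURNING modified is NULL for a brand new row.
    UPSERT_SQL = """
        INSERT INTO ecatalogue (irn, guid, properties, import_date, created)
        VALUES (%(irn)s, %(guid)s, %(properties)s, %(import_date)s, NOW())
        ON CONFLICT (irn)
        DO UPDATE SET (properties, import_date, modified)
            = (%(properties)s, %(import_date)s, NOW())
        WHERE ecatalogue.irn = %(irn)s
        RETURNING modified
    """

    def upsert_record(cursor, irn, guid, properties, import_date):
        """Insert or update one record and report whether it was an update."""
        cursor.execute(UPSERT_SQL, {
            'irn': irn,
            'guid': guid,
            'properties': Json(properties),  # run() casts dict values with PGJson the same way
            'import_date': import_date,
        })
        # A newly inserted row has modified IS NULL, so this is enough to
        # distinguish the two cases - the same trick run() relies on.
        return cursor.fetchone()[0] is not None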
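
The properties JSONB blob is populated from the dataset Field definitions: each Field maps one or more KE EMu fields to an output alias, falling back through a list in order (for example ['DarCatalogNumber', 'RegRegistrationNumber'] for catalogNumber) and optionally applying a formatter (as in otherCatalogNumbers, which prefixes the irn with NHMUK:ecatalogue:). data_importer.lib.field is not part of this diff, so the class below is reconstructed from those call sites and from _record_map_fields(), and may differ from the real implementation in detail:

    # Reconstructed from usage only - the real Field class may differ.
    class Field(object):
        def __init__(self, module_name, field_name, field_alias, formatter=None):
            self.module_name = module_name
            # A single KE EMu field name, or a list tried in order (first non-empty wins)
            self.field_names = field_name if isinstance(field_name, list) else [field_name]
            self.field_alias = field_alias
            self.formatter = formatter or (lambda value: value)

        def get_value(self, record):
            for name in self.field_names:
                value = getattr(record, name, None)
                if value:
                    return self.formatter(value)
            return None

        def has_value(self, record):
            return self.get_value(record) is not None


    def record_to_properties(record, fields):
        # Mirrors KeemuBaseTask._record_map_fields(): alias -> value, skipping empties
        return {f.field_alias: f.get_value(record) for f in fields if f.has_value(record)}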
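
Finally, SolrIndexTask pushes the imported data into Solr by driving each core's DataImportHandler: it triggers a full or delta import and polls the status endpoint until the handler is no longer busy, logging the rows fetched and the time taken. The SolrIndex wrapper in data_importer.lib.solr_index is not shown in this diff; a requests-based stand-in against the standard DataImportHandler commands might look like the following, with the host and core values purely illustrative:

    import time
    import requests

    # Stand-in for data_importer.lib.solr_index.SolrIndex, using Solr's standard
    # DataImportHandler endpoint (command=full-import / delta-import / status).
    class SolrIndex(object):
        def __init__(self, host, core):
            # e.g. host='http://localhost:8983/solr', core='collection-specimens'
            self.base_url = '{host}/{core}/dataimport'.format(host=host, core=core)

        def _command(self, command):
            r = requests.get(self.base_url, params={'command': command, 'wt': 'json'})
            r.raise_for_status()
            return r.json()

        def full_import(self):
            return self._command('full-import')

        def delta_import(self):
            return self._command('delta-import')

        def status(self):
            return self._command('status')


    def wait_for_import(index, sleep_interval=2):
        """Poll status until the handler is idle, mirroring the loop in SolrIndexTask.run()."""
        while True:
            r = index.status()
            if r['status'] != 'busy':
                return r['statusMessages']
            time.sleep(sleep_interval)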