diff --git a/ocdskingfisherprocess/transform/compile_releases.py b/ocdskingfisherprocess/transform/compile_releases.py
index d66c15e7..e78d7695 100644
--- a/ocdskingfisherprocess/transform/compile_releases.py
+++ b/ocdskingfisherprocess/transform/compile_releases.py
@@ -26,8 +26,8 @@ def process(self):
             return
 
         # Do the work ...
-        for ocid in self.get_ocids():
-            self.process_ocid(ocid)
+        for ocid in self._get_ocids():
+            self._process_ocid(ocid)
             # Early return?
             if self.run_until_timestamp and self.run_until_timestamp < datetime.datetime.utcnow().timestamp():
                 return
@@ -35,10 +35,11 @@ def process(self):
         # Mark Transform as finished
         self.database.mark_collection_store_done(self.destination_collection.database_id)
 
-    def get_ocids(self):
+    def _get_ocids(self):
         ''' Gets the ocids for this collection that have not been transformed'''
         ocids = []
 
+        # get ocids from releases
         with self.database.get_engine().begin() as engine:
             query = engine.execute(
                 sa.text(
@@ -55,18 +56,110 @@ def get_ocids(self):
             for row in query:
                 ocids.append(row['ocid'])
 
+        # get ocids from records
+        # Keep the ocids found so far in a set, so that skipping duplicates does not rescan the list for every row
+        seen_ocids = set(ocids)
+        with self.database.get_engine().begin() as engine:
+            query = engine.execute(
+                sa.text(
+                    " SELECT r.ocid FROM record AS r" +
+                    " LEFT JOIN compiled_release AS cr ON " +
+                    " cr.ocid = r.ocid and cr.collection_id = :destination_collection_id" +
+                    " WHERE r.collection_id = :collection_id and cr.ocid is NULL" +
+                    " GROUP BY r.ocid "
+                ),
+                collection_id=self.source_collection.database_id,
+                destination_collection_id=self.destination_collection.database_id
+            )
+
+            for row in query:
+                if row['ocid'] not in seen_ocids:
+                    seen_ocids.add(row['ocid'])
+                    ocids.append(row['ocid'])
+
+        # done
         return ocids
 
-    def process_ocid(self, ocid):
+    def _process_ocid(self, ocid):
+        ''' Stores a compiled release for one ocid, working from a record if there is one, else from releases'''
+
+        # Load Records
+        # Only data ids are collected here; the data is loaded later, and only for the record we actually use
+        record_data_ids = []
+        with self.database.get_engine().begin() as engine:
+            query = engine.execute(sa.text(
+                " SELECT record.* FROM record " +
+                " WHERE record.collection_id = :collection_id AND record.ocid = :ocid "
+            ), collection_id=self.source_collection.database_id, ocid=ocid)
+
+            for row in query:
+                record_data_ids.append(row['data_id'])
+
+        # Decide what to do .....
+        if record_data_ids:
+
+            # If there is more than one record, this counts as taking a random record,
+            # as we have not ordered the SQL query by anything.
+            # (In practice the way postgres works I think we will always get the first record by load order,
+            # but with no ORDER BY clause that is not guaranteed)
+            # TODO log we have done this
+            self._process_record(ocid, self.database.get_data(record_data_ids[0]))
+
+        else:
+
+            self._process_releases(ocid)
+
+    def _process_record(self, ocid, record):
+        ''' Stores a compiled release for this ocid using the data of one record, if that is possible'''
+
+        # Treat a missing or null releases list as empty
+        releases = record.get('releases') or []
+        releases_linked = [r for r in releases if r.get('url')]
+
+        if releases and not releases_linked:
+            # We have releases and none are linked (have URLs).
+            # We can compile them ourselves.
+            out = ocdsmerge.merge(releases)
+            self._store_result(ocid, out)
+            return
+
+        compiled_release = record.get('compiledRelease')
+        if compiled_release:
+
+            # TODO log we have done this
+
+            self._store_result(ocid, compiled_release)
+            return
+
+        releases_compiled = \
+            [x for x in releases if 'tag' in x and isinstance(x['tag'], list) and 'compiled' in x['tag']]
+
+        if len(releases_compiled) > 1:
+            # If more than one, pick one at random. and log that.
+            warning = 'This already has multiple compiled releases in the source! ' + \
+                      'We have picked one at random and passed it through this transform unchanged.'
+            self._store_result(ocid, releases_compiled[0], warnings=[warning])
+
+        elif len(releases_compiled) == 1:
+            # There is just one compiled release - pass it through unchanged, and log that.
+            warning = 'This already has one compiled release in the source! ' + \
+                      'We have passed it through this transform unchanged.'
+            self._store_result(ocid, releases_compiled[0], warnings=[warning])
+
+        else:
+            # We can't process this ocid. Warn of that.
+            pass
+            # TODO log we have done this
+
+    def _process_releases(self, ocid):
+        ''' Stores a compiled release for this ocid using its releases in the source collection'''
 
         releases = []
 
         with self.database.get_engine().begin() as engine:
             query = engine.execute(sa.text(
                 " SELECT release.* FROM release " +
-                " JOIN collection_file_item ON collection_file_item.id = release.collection_file_item_id " +
-                " JOIN collection_file ON collection_file.id = collection_file_item.collection_file_id " +
-                " WHERE collection_file.collection_id = :collection_id AND release.ocid = :ocid "
+                " WHERE release.collection_id = :collection_id AND release.ocid = :ocid "
             ), collection_id=self.source_collection.database_id, ocid=ocid)
 
             for row in query:
@@ -80,29 +173,24 @@ def process_ocid(self, ocid):
             # If more than one, pick one at random. and log that.
             warning = 'This already has multiple compiled releases in the source! ' +\
                       'We have picked one at random and passed it through this transform unchanged.'
-            self.store.store_file_item(
-                ocid + '.json',
-                '',
-                'compiled_release',
-                releases_compiled[0],
-                1,
-                warnings=[warning])
+            self._store_result(ocid, releases_compiled[0], warnings=[warning])
 
         elif len(releases_compiled) == 1:
             # There is just one compiled release - pass it through unchanged, and log that.
             warning = 'This already has one compiled release in the source! ' +\
                       'We have passed it through this transform unchanged.'
-            self.store.store_file_item(
-                ocid + '.json',
-                '',
-                'compiled_release',
-                releases_compiled[0],
-                1,
-                warnings=[warning])
+            self._store_result(ocid, releases_compiled[0], warnings=[warning])
         else:
             # There is no compiled release - we will do it ourselves.
             out = ocdsmerge.merge(releases)
-            # In the occurrence of a race condition where two concurrent transforms have run the same ocid
-            # we rely on the fact that collection_id and filename are unique in the file_item table.
-            # Therefore this will error with a violation of unique key contraint and not cause duplicate entries.
-            self.store.store_file_item(ocid+'.json', '', 'compiled_release', out, 1)
+            self._store_result(ocid, out)
+
+    def _store_result(self, ocid, data, warnings=None):
+        ''' Stores data as a compiled_release file item named after the ocid, with any warnings'''
+        # In the occurrence of a race condition where two concurrent transforms have run the same ocid
+        # we rely on the fact that collection_id and filename are unique in the file_item table.
+        # Therefore this will error with a violation of unique key constraint and not cause duplicate entries.
+        # None (rather than []) is the default, so that calls never share one mutable list object
+        if warnings is None:
+            warnings = []
+        self.store.store_file_item(ocid+'.json', '', 'compiled_release', data, 1, warnings=warnings)
diff --git a/tests/fixtures/sample_1_1_record_linked_releases_not_compiled.json b/tests/fixtures/sample_1_1_record_linked_releases_not_compiled.json
new file mode 100644
index 00000000..55da3f41
--- /dev/null
+++ b/tests/fixtures/sample_1_1_record_linked_releases_not_compiled.json
@@ -0,0 +1,70 @@
+{
+    "uri": "https://raw.githubusercontent.com/open-contracting/sample-data/master/fictional-example/1.0/record/ocds-213czf-000-00001.json",
+    "version": "1.1",
+    "publisher": {
+        "scheme": "GB-COH",
+        "uid": "09506232",
+        "name": "Open Data Services Co-operative Limited",
+        "uri": "http://standard.open-contracting.org/examples/"
+    },
+    "publishedDate": "2014-02-02T13:02:00Z",
+    "license": "http://opendatacommons.org/licenses/pddl/1.0/",
+    "publicationPolicy": "https://github.com/open-contracting/sample-data/",
+    "packages": [
+        "http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-01-planning.json",
+        "http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-02-tender.json",
+        "http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-03-tenderAmendment.json",
+        "http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-04-award.json",
+        "http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-05-contract.json",
+        "http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-05-contract.json"
+    ],
+    "records": [
+        {
+            "ocid": "ocds-213czf-000-00001",
+            "releases": [
+                {
+                    "url": "http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-01-planning.json#ocds-213czf-000-00001-01-planning",
+                    "date": "2009-03-15T14:45:00Z",
+                    "tag": [
+                        "planning"
+                    ]
+                },
+                {
+                    "url": "http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-02-tender.json#ocds-213czf-000-00001-02-tender",
+                    "date": "2010-03-15T09:30:00Z",
+                    "tag": [
+                        "tender"
+                    ]
+                },
+                {
+                    "url": "http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-03-tenderAmendment.json#ocds-213czf-000-00001-03-tenderAmendment",
+                    "date": "2010-03-20T09:45:00Z",
+                    "tag": [
+                        "tenderAmendment"
+                    ]
+                },
+                {
+                    "url": "http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-04-award.json#ocds-213czf-000-00001-04-award",
+                    "date": "2010-05-10T09:30:00Z",
+                    "tag": [
+                        "award"
+                    ]
+                },
+                {
+                    "url": "http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-05-contract.json#ocds-213czf-000-00001-05-contract",
+                    "date": "2010-05-10T10:30:00Z",
+                    "tag": [
+                        "contract"
+                    ]
+                },
+                {
+                    "url": "http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-05-contract.json#ocds-213czf-000-00001-06-implementation",
+                    "date": "2011-01-10T09:30:00Z",
+                    "tag": [
+                        "implementation"
+                    ]
+                }
+            ]
+        }
+    ]
+}
diff --git a/tests/test_transform_compile_releases_from_records.py b/tests/test_transform_compile_releases_from_records.py
new file mode 100644
index 00000000..d14899af
--- /dev/null
+++ b/tests/test_transform_compile_releases_from_records.py
@@ -0,0 +1,97 @@
+import datetime
+import os
+
+import sqlalchemy as sa
+
+from ocdskingfisherprocess.store import Store
+from ocdskingfisherprocess.transform import TRANSFORM_TYPE_COMPILE_RELEASES
+from ocdskingfisherprocess.transform.compile_releases import CompileReleasesTransform
+from tests.base import BaseDataBaseTest
+
+
+class TestTransformCompileReleasesFromRecords(BaseDataBaseTest):
+
+    def _setup(self, filename):
+
+        # Make source collection
+        source_collection_id = self.database.get_or_create_collection_id("test", datetime.datetime.now(), False)
+        source_collection = self.database.get_collection(source_collection_id)
+
+        # Load some data
+        store = Store(self.config, self.database)
+        store.set_collection(source_collection)
+        json_filename = os.path.join(os.path.dirname(
+            os.path.realpath(__file__)), 'fixtures', filename
+        )
+        store.store_file_from_local("test.json", "http://example.com", "record_package", "utf-8", json_filename)
+
+        # Make destination collection
+        destination_collection_id = self.database.get_or_create_collection_id(
+            source_collection.source_id,
+            source_collection.data_version,
+            source_collection.sample,
+            transform_from_collection_id=source_collection_id,
+            transform_type=TRANSFORM_TYPE_COMPILE_RELEASES)
+        destination_collection = self.database.get_collection(destination_collection_id)
+
+        # transform! Nothing should happen because source is not finished
+        transform = CompileReleasesTransform(self.config, self.database, destination_collection)
+        transform.process()
+
+        # check
+        with self.database.get_engine().begin() as connection:
+            s = sa.sql.select([self.database.compiled_release_table])
+            result = connection.execute(s)
+            assert 0 == result.rowcount
+
+        # Mark source collection as finished
+        self.database.mark_collection_store_done(source_collection_id)
+
+        # transform! This should do the work.
+        transform = CompileReleasesTransform(self.config, self.database, destination_collection)
+        transform.process()
+
+        return source_collection_id, source_collection, destination_collection_id, destination_collection
+
+    def test_compiled_release(self):
+
+        source_collection_id, source_collection, destination_collection_id, destination_collection = \
+            self._setup('sample_1_1_record.json')
+
+        # check
+        with self.database.get_engine().begin() as connection:
+            s = sa.sql.select([self.database.compiled_release_table])
+            result = connection.execute(s)
+            assert 1 == result.rowcount
+
+            # Check a couple of fields just to sanity check it's the compiled release in the record table
+            compiled_release = result.fetchone()
+            data = self.database.get_data(compiled_release['data_id'])
+            assert 'ocds-213czf-000-00001-2011-01-10T09:30:00Z' == data.get('id')
+            assert '2011-01-10T09:30:00Z' == data.get('date')
+            assert 'ocds-213czf-000-00001' == data.get('ocid')
+
+        # transform again! This should be fine
+        transform = CompileReleasesTransform(self.config, self.database, destination_collection)
+        transform.process()
+
+        # check
+        with self.database.get_engine().begin() as connection:
+            s = sa.sql.select([self.database.compiled_release_table])
+            result = connection.execute(s)
+            assert 1 == result.rowcount
+
+        # destination collection should be closed
+        destination_collection = self.database.get_collection(destination_collection_id)
+        assert destination_collection.store_end_at is not None
+
+    def test_no_compiled_release_linked_records_so_cant_do_anything(self):
+
+        source_collection_id, source_collection, destination_collection_id, destination_collection = \
+            self._setup('sample_1_1_record_linked_releases_not_compiled.json')
+
+        # check
+        with self.database.get_engine().begin() as connection:
+            s = sa.sql.select([self.database.compiled_release_table])
+            result = connection.execute(s)
+            assert 0 == result.rowcount
diff --git a/tests/test_transform_compile_releases.py b/tests/test_transform_compile_releases_from_releases.py
similarity index 99%
rename from tests/test_transform_compile_releases.py
rename to tests/test_transform_compile_releases_from_releases.py
index e34491d9..2d0e781a 100644
--- a/tests/test_transform_compile_releases.py
+++ b/tests/test_transform_compile_releases_from_releases.py
@@ -9,7 +9,7 @@
 from tests.base import BaseDataBaseTest
 
 
-class TestTransformCompileReleases(BaseDataBaseTest):
+class TestTransformCompileReleasesFromReleases(BaseDataBaseTest):
 
     def test_1(self):
         # Make source collection