Skip to content

Commit

Permalink
Compile Releases Transform - include records
Browse files Browse the repository at this point in the history
#63

Also, in ocdskingfisherprocess/transform/compile_releases.py marked private methods as private with a _ at start
  • Loading branch information
jarofgreen committed Apr 14, 2020
1 parent 9a2020c commit c292571
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 8 deletions.
118 changes: 111 additions & 7 deletions ocdskingfisherprocess/transform/compile_releases.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,20 @@ def process(self):
return

# Do the work ...
for ocid in self.get_ocids():
self.process_ocid(ocid)
for ocid in self._get_ocids():
self._process_ocid(ocid)
# Early return?
if self.run_until_timestamp and self.run_until_timestamp < datetime.datetime.utcnow().timestamp():
return

# Mark Transform as finished
self.database.mark_collection_store_done(self.destination_collection.database_id)

def get_ocids(self):
def _get_ocids(self):
''' Gets the ocids for this collection that have not been transformed'''
ocids = []

# get ocids from releases
with self.database.get_engine().begin() as engine:
query = engine.execute(
sa.text(
Expand All @@ -55,18 +56,121 @@ def get_ocids(self):
for row in query:
ocids.append(row['ocid'])

# get ocids from records
with self.database.get_engine().begin() as engine:
query = engine.execute(
sa.text(
" SELECT r.ocid FROM record AS r" +
" LEFT JOIN compiled_release AS cr ON " +
" cr.ocid = r.ocid and cr.collection_id = :destination_collection_id" +
" WHERE r.collection_id = :collection_id and cr.ocid is NULL" +
" GROUP BY r.ocid "
),
collection_id=self.source_collection.database_id,
destination_collection_id=self.destination_collection.database_id
)

for row in query:
if row['ocid'] not in ocids:
ocids.append(row['ocid'])

# done
return ocids

def process_ocid(self, ocid):
def _process_ocid(self, ocid):

# Load Records
records = []
with self.database.get_engine().begin() as engine:
query = engine.execute(sa.text(
" SELECT record.* FROM record " +
" WHERE record.collection_id = :collection_id AND record.ocid = :ocid "
), collection_id=self.source_collection.database_id, ocid=ocid)

for row in query:
records.append(self.database.get_data(row['data_id']))

# Decide what to do .....
if len(records) > 1:

# This counts as taking a random record, as we have not ordered the SQL query by anything.
# (In practice the way postgres works I think we will always get the first record by load order,
# but with no ORDER BY clause that is not guaranteed)
self._process_record(ocid, records[0])

elif len(records) == 1:

self._process_record(ocid, records[0])

else:

self._process_releases(ocid)

def _process_record(self, ocid, record):

releases = record.get('releases', [])
releases_linked = [r for r in releases if 'url' in r and r['url']]

if len(releases) > 0 and len(releases_linked) == 0:
# We have releases and none are URL's.
# We can compile them ourselves.
out = ocdsmerge.merge(releases)

# In the occurrence of a race condition where two concurrent transforms have run the same ocid
# we rely on the fact that collection_id and filename are unique in the file_item table.
# Therefore this will error with a violation of unique key contraint and not cause duplicate entries.
self.store.store_file_item(ocid + '.json', '', 'compiled_release', out, 1)
return

compiled_release = record.get('compiledRelease')
if compiled_release:

# TODO log we have done this

# In the occurrence of a race condition where two concurrent transforms have run the same ocid
# we rely on the fact that collection_id and filename are unique in the file_item table.
# Therefore this will error with a violation of unique key contraint and not cause duplicate entries.
self.store.store_file_item(ocid + '.json', '', 'compiled_release', compiled_release, 1)
return

releases_compiled = \
[x for x in releases if 'tag' in x and isinstance(x['tag'], list) and 'compiled' in x['tag']]

if len(releases_compiled) > 1:
# If more than one, pick one at random. and log that.
warning = 'This already has multiple compiled releases in the source! ' + \
'We have picked one at random and passed it through this transform unchanged.'
self.store.store_file_item(
ocid + '.json',
'',
'compiled_release',
releases_compiled[0],
1,
warnings=[warning])
elif len(releases_compiled) == 1:
# There is just one compiled release - pass it through unchanged, and log that.
warning = 'This already has one compiled release in the source! ' + \
'We have passed it through this transform unchanged.'
self.store.store_file_item(
ocid + '.json',
'',
'compiled_release',
releases_compiled[0],
1,
warnings=[warning])
else:
# We can't process this ocid. Warn of that.
pass
# TODO

def _process_releases(self, ocid):

releases = []

with self.database.get_engine().begin() as engine:
query = engine.execute(sa.text(
" SELECT release.* FROM release " +
" JOIN collection_file_item ON collection_file_item.id = release.collection_file_item_id " +
" JOIN collection_file ON collection_file.id = collection_file_item.collection_file_id " +
" WHERE collection_file.collection_id = :collection_id AND release.ocid = :ocid "
" WHERE release.collection_id = :collection_id AND release.ocid = :ocid "
), collection_id=self.source_collection.database_id, ocid=ocid)

for row in query:
Expand Down
70 changes: 70 additions & 0 deletions tests/fixtures/sample_1_1_record_linked_releases_not_compiled.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
{
"uri": "https://raw.githubusercontent.com/open-contracting/sample-data/master/fictional-example/1.0/record/ocds-213czf-000-00001.json",
"version": "1.1",
"publisher": {
"scheme": "GB-COH",
"uid": "09506232",
"name": "Open Data Services Co-operative Limited",
"uri": "http://standard.open-contracting.org/examples/"
},
"publishedDate": "2014-02-02T13:02:00Z",
"license": "http://opendatacommons.org/licenses/pddl/1.0/",
"publicationPolicy": "https://github.com/open-contracting/sample-data/",
"packages": [
"http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-01-planning.json",
"http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-02-tender.json",
"http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-03-tenderAmendment.json",
"http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-04-award.json",
"http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-05-contract.json",
"http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-05-contract.json"
],
"records": [
{
"ocid": "ocds-213czf-000-00001",
"releases": [
{
"url": "http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-01-planning.json#ocds-213czf-000-00001-01-planning",
"date": "2009-03-15T14:45:00Z",
"tag": [
"planning"
]
},
{
"url": "http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-02-tender.json#ocds-213czf-000-00001-02-tender",
"date": "2010-03-15T09:30:00Z",
"tag": [
"tender"
]
},
{
"url": "http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-03-tenderAmendment.json#ocds-213czf-000-00001-03-tenderAmendment",
"date": "2010-03-20T09:45:00Z",
"tag": [
"tenderAmendment"
]
},
{
"url": "http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-04-award.json#ocds-213czf-000-00001-04-award",
"date": "2010-05-10T09:30:00Z",
"tag": [
"award"
]
},
{
"url": "http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-05-contract.json#ocds-213czf-000-00001-05-contract",
"date": "2010-05-10T10:30:00Z",
"tag": [
"contract"
]
},
{
"url": "http://standard.open-contracting.org/examples/1.0/releases/ocds-213czf-000-00001-05-contract.json#ocds-213czf-000-00001-06-implementation",
"date": "2011-01-10T09:30:00Z",
"tag": [
"implementation"
]
}
]
}
]
}
98 changes: 98 additions & 0 deletions tests/test_transform_compile_releases_from_records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import datetime
import os

import sqlalchemy as sa

from ocdskingfisherprocess.store import Store
from ocdskingfisherprocess.transform import TRANSFORM_TYPE_COMPILE_RELEASES
from ocdskingfisherprocess.transform.compile_releases import CompileReleasesTransform
from tests.base import BaseDataBaseTest


class TestTransformCompileReleasesFromRecords(BaseDataBaseTest):


def _setup(self, filename):

# Make source collection
source_collection_id = self.database.get_or_create_collection_id("test", datetime.datetime.now(), False)
source_collection = self.database.get_collection(source_collection_id)

# Load some data
store = Store(self.config, self.database)
store.set_collection(source_collection)
json_filename = os.path.join(os.path.dirname(
os.path.realpath(__file__)), 'fixtures', filename
)
store.store_file_from_local("test.json", "http://example.com", "record_package", "utf-8", json_filename)

# Make destination collection
destination_collection_id = self.database.get_or_create_collection_id(
source_collection.source_id,
source_collection.data_version,
source_collection.sample,
transform_from_collection_id=source_collection_id,
transform_type=TRANSFORM_TYPE_COMPILE_RELEASES)
destination_collection = self.database.get_collection(destination_collection_id)

# transform! Nothing should happen because source is not finished
transform = CompileReleasesTransform(self.config, self.database, destination_collection)
transform.process()

# check
with self.database.get_engine().begin() as connection:
s = sa.sql.select([self.database.compiled_release_table])
result = connection.execute(s)
assert 0 == result.rowcount

# Mark source collection as finished
self.database.mark_collection_store_done(source_collection_id)

# transform! This should do the work.
transform = CompileReleasesTransform(self.config, self.database, destination_collection)
transform.process()

return source_collection_id, source_collection, destination_collection_id, destination_collection

def test_compiled_release(self):

source_collection_id, source_collection, destination_collection_id, destination_collection = \
self._setup('sample_1_1_record.json')

# check
with self.database.get_engine().begin() as connection:
s = sa.sql.select([self.database.compiled_release_table])
result = connection.execute(s)
assert 1 == result.rowcount

# Check a couple of fields just to sanity check it's the compiled release in the record table
complied_release = result.fetchone()
data = self.database.get_data(complied_release['data_id'])
assert 'ocds-213czf-000-00001-2011-01-10T09:30:00Z' == data.get('id')
assert '2011-01-10T09:30:00Z' == data.get('date')
assert 'ocds-213czf-000-00001' == data.get('ocid')

# transform again! This should be fine
transform = CompileReleasesTransform(self.config, self.database, destination_collection)
transform.process()

# check
with self.database.get_engine().begin() as connection:
s = sa.sql.select([self.database.compiled_release_table])
result = connection.execute(s)
assert 1 == result.rowcount

# destination collection should be closed
destination_collection = self.database.get_collection(destination_collection_id)
assert destination_collection.store_end_at != None # noqa

def test_no_compiled_release_linked_records_so_cant_do_anything(self):

source_collection_id, source_collection, destination_collection_id, destination_collection = \
self._setup('sample_1_1_record_linked_releases_not_compiled.json')

# check
with self.database.get_engine().begin() as connection:
s = sa.sql.select([self.database.compiled_release_table])
result = connection.execute(s)
assert 0 == result.rowcount
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from tests.base import BaseDataBaseTest


class TestTransformCompileReleases(BaseDataBaseTest):
class TestTransformCompileReleasesFromReleases(BaseDataBaseTest):

def test_1(self):
# Make source collection
Expand Down

0 comments on commit c292571

Please sign in to comment.