Skip to content

Commit

Permalink
Merge pull request #139 from liip/feat/handle-error-deleting-old-resource-for-main
Browse files Browse the repository at this point in the history

Feat/handle error deleting old resource for main
  • Loading branch information
bellisk authored Dec 4, 2024
2 parents fec4d20 + 74c1912 commit 0e21d01
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 50 deletions.
4 changes: 2 additions & 2 deletions bin/install_test_requirements.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ pip install -e /__w/ckanext-switzerland/ckanext-switzerland/
# Install ckanext dependencies
pip install -e git+https://github.com/ckan/ckanext-dcat.git@v1.5.1#egg=ckanext-dcat
pip install -r https://raw.githubusercontent.com/ckan/ckanext-dcat/v1.5.1/requirements.txt
pip install -e git+https://gitlab.liip.ch/odp_oev_schweiz/ckanext-harvest.git#egg=ckanext-harvest
pip install -r https://gitlab.liip.ch/odp_oev_schweiz/ckanext-harvest/-/raw/main/requirements.txt
pip install -e git+https://gitlab.liip.ch/odp_oev_schweiz/ckanext-harvest.git@v1.0.2#egg=ckanext-harvest
pip install -r https://gitlab.liip.ch/odp_oev_schweiz/ckanext-harvest/-/raw/v1.0.2/requirements.txt
pip install -e git+https://github.com/ckan/[email protected]#egg=ckanext-scheming
pip install -e git+https://github.com/ckan/ckanext-fluent.git#egg=ckanext-fluent
pip install -r https://raw.githubusercontent.com/ckan/ckanext-fluent/master/requirements.txt
Expand Down
130 changes: 82 additions & 48 deletions ckanext/switzerland/harvester/base_sbb_harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@

from ckanext.harvest.harvesters.base import HarvesterBase
from ckanext.switzerland.harvester.storage_adapter_factory import StorageAdapterFactory
from ckanext.switzerland.helpers import resource_filename

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -445,7 +444,7 @@ def fetch_stage(self, harvest_object):
)
return False

def _fetch_stage(self, harvest_object): # noqa
def _fetch_stage(self, harvest_object): # noqa: C901
"""
Fetching of resources. Runs once for each gathered resource.
Expand Down Expand Up @@ -603,7 +602,7 @@ def import_stage(self, harvest_object):
)
return False

def _import_stage(self, harvest_object): # noqa
def _import_stage(self, harvest_object): # noqa: C901
"""
Importing the fetched files into CKAN storage.
Runs once for each fetched resource.
Expand Down Expand Up @@ -939,15 +938,17 @@ def _import_stage(self, harvest_object): # noqa
if old_resource_id:
log.info("Deleting old resource: %s", old_resource_id)

# delete the datastore table
try:
get_action("datastore_delete")(
context, {"resource_id": old_resource_id, "force": True}
)
self._fully_delete_resource(context, old_resource_meta)
except NotFound:
pass # Sometimes importing the data into the datastore fails

get_action("resource_delete")(context, {"id": old_resource_id})
self._save_object_error(
f"Error deleting old resource {old_resource_id} for "
f"filename {file_name}. This could be due to a failed "
f"connection to the database. "
f"{traceback.format_exc()}",
harvest_object,
stage,
)

Session.commit()

Expand Down Expand Up @@ -995,8 +996,10 @@ def _get_ordered_resources(self, package):

return ordered_resources, unmatched_resources

def finalize(self, harvest_object, harvest_object_data):
context = {"model": model, "session": Session, "user": self._get_user_name()}
def finalize(self, harvest_object, harvest_object_data): # noqa: C901
# TODO: Simplify this method.
user_name = self._get_user_name()
context = {"model": model, "session": Session, "user": user_name}
stage = "Import"

log.info("Running finalizing tasks:")
Expand Down Expand Up @@ -1029,9 +1032,23 @@ def finalize(self, harvest_object, harvest_object_data):
log.exception(message)
self._save_object_error(message, harvest_object, "Import")

return True
return False

ordered_resources, unmatched_resources = self._get_ordered_resources(package)
try:
ordered_resources, unmatched_resources = self._get_ordered_resources(
package
)
except NotFound:
self._save_object_error(
f"Error reordering resources for dataset "
f"{harvest_object_data['dataset']}. "
f"This could be due to a failed connection to the database. "
f"{traceback.format_exc()}",
harvest_object,
stage,
)

return False

# ----------------------------------------------------------------------------
# delete old resources
Expand All @@ -1048,9 +1065,16 @@ def finalize(self, harvest_object, harvest_object_data):

for resource in ordered_resources[max_resources:]:
try:
self._delete_version(
context, package, resource_filename(resource["url"])
)
# We need a new context each time: otherwise, if there is an
# exception deleting the resource, there will be auth data left in
# the context that won't get deleted. Then all subsequent calls to
# resource_delete will seem unauthorized and fail.
delete_context = {
"model": model,
"session": Session,
"user": user_name,
}
self._fully_delete_resource(delete_context, resource)
except Exception as e:
self._save_object_error(
"Error deleting resource {} in finalizing tasks: {}".format(
Expand Down Expand Up @@ -1080,15 +1104,27 @@ def finalize(self, harvest_object, harvest_object_data):
},
)

# reorder resources
# not matched resources come first in the list, then the ordered
get_action("package_resource_reorder")(
context,
{
"id": package["id"],
"order": [r["id"] for r in unmatched_resources + ordered_resources],
},
)
try:
# reorder resources
# not matched resources come first in the list, then the ordered
get_action("package_resource_reorder")(
context,
{
"id": package["id"],
"order": [r["id"] for r in unmatched_resources + ordered_resources],
},
)
except ValidationError:
self._save_object_error(
f"Error reordering resources for dataset "
f"{harvest_object_data['dataset']}. "
f"This could be due to a failed connection to the database. "
f"{traceback.format_exc()}",
harvest_object,
stage,
)

return False

from ckanext.harvest.model import harvest_object_table

Expand All @@ -1114,27 +1150,25 @@ def finalize(self, harvest_object, harvest_object_data):

search.rebuild(package["id"])

def _delete_version(self, context, package, filename):
"""Fully delete the resource with the given filename"""
for resource in package["resources"]:
if resource_filename(resource["url"]) == filename:
log.debug(
"Deleting resource {} with filename {}".format(
resource["id"], filename
)
)
# delete the file from the filestore
path = uploader.ResourceUpload(resource).get_path(resource["id"])
if os.path.exists(path):
os.remove(path)
def _fully_delete_resource(self, context, resource):
"""Fully delete a resource and its file."""
log.debug(
"Deleting resource {} with filename {}".format(
resource["id"], resource["url"]
)
)
# delete the file from the filestore
path = uploader.ResourceUpload(resource).get_path(resource["id"])
if os.path.exists(path):
os.remove(path)

# delete the datastore table
try:
get_action("datastore_delete")(
context, {"resource_id": resource["id"], "force": True}
)
except NotFound:
pass # Sometimes importing the data into the datastore fails
# delete the datastore table
try:
get_action("datastore_delete")(
context, {"resource_id": resource["id"], "force": True}
)
except NotFound:
pass # Sometimes importing the data into the datastore fails

# delete the resource itself
get_action("resource_delete")(context, {"id": resource["id"]})
# delete the resource itself
get_action("resource_delete")(context, {"id": resource["id"]})

0 comments on commit 0e21d01

Please sign in to comment.