diff --git a/ezidapp/management/commands/diag-db-stats.py b/ezidapp/management/commands/diag-db-stats.py
index 9376e2613..ce0c07ab0 100644
--- a/ezidapp/management/commands/diag-db-stats.py
+++ b/ezidapp/management/commands/diag-db-stats.py
@@ -14,6 +14,7 @@
 import contextlib
 import logging
+import argparse

 import django.apps
 import django.conf
diff --git a/ezidapp/management/commands/diag-queue-stats.py b/ezidapp/management/commands/diag-queue-stats.py
new file mode 100644
index 000000000..e4c21bbb7
--- /dev/null
+++ b/ezidapp/management/commands/diag-queue-stats.py
@@ -0,0 +1,70 @@
+# Copyright©2021, Regents of the University of California
+# http://creativecommons.org/licenses/BSD
+
+"""Report queue statuses
+
+For each queue, report the number of entries at each status level.
+
+For queues other than download:
+  U = Unsubmitted
+  C = Unchecked
+  S = Submitted
+  W = Warning
+  F = Failure
+  I = Ignored
+  O = Success
+"""
+
+import json
+import logging
+
+import django.apps
+import django.conf
+import django.contrib.auth.models
+import django.core.management
+import django.db.models
+import django.db.transaction
+
+import ezidapp.models.async_queue
+import ezidapp.models.identifier
+
+_L = logging.getLogger(__name__)
+
+class Command(django.core.management.BaseCommand):
+    help = __doc__
+
+    def __init__(self):
+        super(Command, self).__init__()
+
+    def handle(self, *_, **opt):
+        queue_classes = [
+            ("binder", ezidapp.models.async_queue.BinderQueue),
+            ("datacite", ezidapp.models.async_queue.DataciteQueue),
+            ("crossref", ezidapp.models.async_queue.CrossrefQueue),
+            ("searchindexer", ezidapp.models.async_queue.SearchIndexerQueue),
+        ]
+        queue_stats = {
+            'download': {}
+        }
+        # The download queue tracks a processing stage rather than a status, so handle it separately
+        _L.info("Processing queue: download...")
+        res = ezidapp.models.async_queue.DownloadQueue.objects\
+            .all()\
+            .values('stage')\
+            .annotate(total=django.db.models.Count('stage'))\
+            .order_by()
+        for row in res:
+            queue_stats['download'][row['stage']] = row['total']
+
+        for q_class in queue_classes:
+            q_name = q_class[0]
+            _L.info(f"Processing queue: {q_name}")
+            res = q_class[1].objects\
+                .all()\
+                .values('status')\
+                .annotate(total=django.db.models.Count('status'))\
+                .order_by()
+            queue_stats[q_name] = {}
+            for row in res:
+                queue_stats[q_name][row['status']] = row['total']
+        print(json.dumps(queue_stats, indent=2))
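The new diag-queue-stats command above can be exercised from a Django shell. A minimal sketch: call_command should accept the hyphenated name, as it does for the existing diag-db-stats command, but the counts and the exact output below are illustrative, not captured from a real deployment.

    # Sketch only: keys under each queue are the single-character status
    # codes documented in the module docstring; all counts are made up.
    import django.core.management

    django.core.management.call_command("diag-queue-stats")
    # {
    #   "download": {...},              <- keyed by stage rather than status
    #   "binder": {"U": 3, "O": 15210},
    #   "datacite": {"O": 820},
    #   "crossref": {},
    #   "searchindexer": {"O": 44107}
    # }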
diff --git a/ezidapp/management/commands/proc-download.py b/ezidapp/management/commands/proc-download.py
index f6f5f50cb..9038fe8ea 100644
--- a/ezidapp/management/commands/proc-download.py
+++ b/ezidapp/management/commands/proc-download.py
@@ -13,13 +13,13 @@
 """

 import csv
-import logging
 import os
 import os.path
 import pathlib
 import re
 import subprocess
 import time
+import typing

 import django.conf
 import django.core.mail
@@ -41,8 +41,6 @@
 import impl.util
 import impl.util2

-log = logging.getLogger(__name__)
-

 SUFFIX_FORMAT_DICT = {
     ezidapp.models.async_queue.DownloadQueue.ANVL: "txt",
@@ -50,6 +48,7 @@
     ezidapp.models.async_queue.DownloadQueue.XML: "xml",
 }

+
 class Command(ezidapp.management.commands.proc_base.AsyncProcessingCommand):
     help = __doc__
     name = __name__
@@ -66,20 +65,25 @@ def run(self):
         if doSleep:
             self.sleep(django.conf.settings.DAEMONS_DOWNLOAD_PROCESSING_IDLE_SLEEP)
         try:
-            r = ezidapp.models.async_queue.DownloadQueue.objects.all().order_by("seq")[:1]
-            if len(r) == 0:
+            rs = ezidapp.models.async_queue.DownloadQueue.objects.all().order_by("seq")[:1]
+            if len(rs) == 0:
+                # The queue is empty; sleep before polling again
                 doSleep = True
                 continue
-            self._proc_stage(r)
+            self._proc_stage(rs)
             self._remove_expired_files()
             doSleep = False
         except Exception as e:
-            log.exception('Exception')
+            self.log.exception('Exception')
             impl.log.otherError("download.run", e)
             doSleep = True

-    def _proc_stage(self, r):
-        r = r[0]
+    def _proc_stage(self, rs):
+        # rs is a list of ezidapp.models.async_queue.DownloadQueue entries.
+        # Only one download request is processed at a time.
+        # Once the current request completes, it is deleted,
+        # so the next request becomes index 0.
+        r = rs[0]
         if r.stage == ezidapp.models.async_queue.DownloadQueue.CREATE:
             self._createFile(r)
         elif r.stage == ezidapp.models.async_queue.DownloadQueue.HARVEST:
@@ -122,7 +126,7 @@
         m = ": " + m
         return Exception(f"batch download error: {context}: {type(exception).__name__}{m}")

-    def _path(self, r, i):
+    def _path(self, r: ezidapp.models.async_queue.DownloadQueue, i: int):
         # i=1: uncompressed work file
         # i=2: compressed work file
         # i=3: compressed delivery file
@@ -142,12 +146,13 @@
     def _csvEncode(self, s):
         return impl.util.oneLine(s).encode("utf-8")

-    def _flushFile(self, f):
+    def _flushFile(self, f: typing.TextIO):
         f.flush()
         os.fsync(f.fileno())

-    def _createFile(self, r):
+    def _createFile(self, r: ezidapp.models.async_queue.DownloadQueue):
         f = None
+        self.log.debug("createFile: %s", self._path(r, 1))
         try:
             f = open(self._path(r, 1), "w", newline='', encoding="utf-8")
             if r.format == ezidapp.models.async_queue.DownloadQueue.CSV:
@@ -162,7 +167,7 @@
             # probe the file to find its size.
             n = f.tell()
         except Exception as e:
-            log.exception('Exception')
+            self.log.exception('Exception')
             raise self._wrapException("error creating file", e)
         else:
             r.stage = ezidapp.models.async_queue.DownloadQueue.HARVEST
@@ -213,9 +218,9 @@ def _satisfiesConstraints(self, id_model, constraints):

     def _prepareMetadata(
         self,
-        id_model: ezidapp.models.identifier.Identifier,
+        id_model: ezidapp.models.identifier.SearchIdentifier,
         convertTimestamps: object,
-    ) -> object:
+    ) -> dict:
         d = id_model.toLegacy()
         ezidapp.models.model_util.convertLegacyToExternal(d)
         if id_model.isDoi:
@@ -225,13 +230,21 @@
             d["_updated"] = impl.util.formatTimestampZulu(int(d["_updated"]))
         return d

-    def _writeAnvl(self, f, id_model, metadata):
+    def _writeAnvl(
+        self, f: typing.TextIO, id_model: ezidapp.models.identifier.SearchIdentifier, metadata: dict
+    ):
         if f.tell() > 0:
             f.write("\n")
         f.write(f":: {id_model.identifier}\n")
         f.write(impl.anvl.format(metadata).encode("utf-8"))

-    def _writeCsv(self, f, columns, id_model, metadata):
+    def _writeCsv(
+        self,
+        f: typing.TextIO,
+        columns,
+        id_model: ezidapp.models.identifier.SearchIdentifier,
+        metadata: dict,
+    ):
         w = csv.writer(f)
         l = []
         for c in columns:
@@ -251,20 +264,23 @@
             l.append(metadata.get(c, ""))
         w.writerow([self._csvEncode(c).decode('utf-8', errors='replace') for c in l])

-    def _writeXml(self, f, id, metadata):
+    def _writeXml(
+        self, f: typing.TextIO, id: ezidapp.models.identifier.SearchIdentifier, metadata: dict
+    ):
         f.write(f'<record identifier="{impl.util.xmlEscape(id.identifier)}">')
         for k, v in list(metadata.items()):
             if k in ["datacite", "crossref"]:
                 v = impl.util.removeXmlDeclaration(v)
             else:
                 v = impl.util.xmlEscape(v)
-            f.write(f'<element name="{impl.util.xmlEscape(k)}">{v}</element>'.encode("utf-8"))
+            f.write(f'<element name="{impl.util.xmlEscape(k)}">{v}</element>')
         f.write("</record>")

-    def _harvest1(self, r, f):
+    def _harvest1(self, r: ezidapp.models.async_queue.DownloadQueue, f: typing.TextIO):
         columns = self._decode(r.columns)
         constraints = self._decode(r.constraints)
         options = self._decode(r.options)
+        _total = 0
         while not self.terminated():
             qs = (
                 ezidapp.models.identifier.SearchIdentifier.objects.filter(identifier__gt=r.lastId)
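The encode/decode churn above follows one rule: the work files are now opened in text mode with an explicit UTF-8 encoding, and a text-mode file object rejects bytes outright, which is why the .encode("utf-8") on the XML element line had to go. A minimal standalone illustration (not EZID code); note that the unchanged _writeAnvl line above still passes encoded bytes and looks like it would trip over the same TypeError.

    import tempfile

    # A text-mode file accepts str and raises TypeError for bytes.
    with tempfile.TemporaryFile(mode="w", encoding="utf-8") as f:
        f.write("str is fine\n")
        try:
            f.write("bytes are not\n".encode("utf-8"))
        except TypeError as e:
            print(e)  # write() argument must be str, not bytes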
@@ -272,7 +288,9 @@
                 .select_related("owner", "ownergroup", "datacenter", "profile")
                 .order_by("identifier")
             )
+            # self.log.debug("Query issued: %s", str(qs.query))
             ids = list(qs[:1000])
+            self.log.debug("Total query matches: %s", len(ids))
             if len(ids) == 0:
                 break
             try:
@@ -287,15 +305,20 @@
                         self._writeXml(f, id, m)
                     else:
                         assert False, "unhandled case"
+                    _total += 1
                 self._flushFile(f)
             except Exception as e:
-                log.exception('Exception')
+                self.log.exception('Exception')
                 raise self._wrapException("error writing file", e)
             r.lastId = ids[-1].identifier
             r.fileSize = f.tell()
             r.save()
+        if self.terminated():
+            self.log.warning("Harvest terminated.")
+        else:
+            self.log.info("Total records exported: %s", _total)

-    def _harvest(self, r):
+    def _harvest(self, r: ezidapp.models.async_queue.DownloadQueue):
         f = None
         try:
             try:
@@ -304,7 +327,7 @@
                 f.seek(r.fileSize)
                 f.truncate()
             except Exception as e:
-                log.exception('Exception')
+                self.log.exception('Exception')
                 raise self._wrapException("error re-opening/seeking/truncating file", e)
             start = r.currentIndex
             for i in range(r.currentIndex, len(r.toHarvest.split(","))):
@@ -318,7 +341,7 @@
                     f.write("</records>")
                     self._flushFile(f)
                 except Exception as e:
-                    log.exception('Exception')
+                    self.log.exception('Exception')
                     raise self._wrapException("error writing file footer", e)
             r.stage = ezidapp.models.async_queue.DownloadQueue.COMPRESS
             r.save()
@@ -326,7 +349,7 @@
             if f:
                 f.close()

-    def _compressFile(self, r):
+    def _compressFile(self, r: ezidapp.models.async_queue.DownloadQueue):
         infile = None
         outfile = None
         try:
@@ -369,7 +392,7 @@
                 p.returncode == 0 and stderr == b''
             ), f"compression command returned status code {p.returncode:d}, stderr '{stderr}'"
         except Exception as e:
-            log.exception('Exception')
+            self.log.exception('Exception')
             raise self._wrapException("error compressing file", e)
         else:
             r.stage = ezidapp.models.async_queue.DownloadQueue.DELETE
@@ -380,39 +403,37 @@
         finally:
             if outfile:
                 outfile.close()

-    def _deleteUncompressedFile(self, r):
+    def _deleteUncompressedFile(self, r: ezidapp.models.async_queue.DownloadQueue):
         try:
             if os.path.exists(self._path(r, 1)):
                 os.unlink(self._path(r, 1))
         except Exception as e:
-            log.exception('Exception')
+            self.log.exception('Exception')
             raise self._wrapException("error deleting uncompressed file", e)
         else:
             r.stage = ezidapp.models.async_queue.DownloadQueue.MOVE
             r.save()

-    def _moveCompressedFile(self, r):
+    def _moveCompressedFile(self, r: ezidapp.models.async_queue.DownloadQueue):
         try:
             if os.path.exists(self._path(r, 2)):
                 os.rename(self._path(r, 2), self._path(r, 3))
             else:
                 assert os.path.exists(self._path(r, 3)), "file has disappeared"
         except Exception as e:
-            log.exception('Exception')
+            self.log.exception('Exception')
             raise self._wrapException("error moving compressed file", e)
         else:
             r.stage = ezidapp.models.async_queue.DownloadQueue.NOTIFY
             r.save()

-    def _notifyRequestor(self, r):
+    def _notifyRequestor(self, r: ezidapp.models.async_queue.DownloadQueue):
         f = None
         try:
-            f = open(self._path(r, 4), "w")
-            f.write(
-                f"{ezidapp.models.util.getUserByPid(r.requestor).username}\n{r.rawRequest.encode('utf-8')}\n"
-            )
+            f = open(self._path(r, 4), mode="w", encoding="utf-8")
+            f.write(f"{ezidapp.models.util.getUserByPid(r.requestor).username}\n{r.rawRequest}\n")
         except Exception as e:
-            log.exception('Exception')
+            self.log.exception('Exception')
             raise self._wrapException("error writing sidecar file", e)
         finally:
             if f:
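For orientation, the handlers annotated above (_createFile, _harvest, _compressFile, _deleteUncompressedFile, _moveCompressedFile, _notifyRequestor) form a resumable pipeline: each stage does its work, advances r.stage, and saves, while _harvest1 checkpoints lastId and fileSize after every batch. A schematic of that checkpointing loop; fetch_batch and write_records are hypothetical stand-ins, not EZID helpers.

    # Keyset pagination with persisted checkpoints, as in _harvest1 above.
    def harvest(request, f):
        while True:
            batch = fetch_batch(after=request.lastId, limit=1000)
            if not batch:
                break
            write_records(f, batch)
            request.lastId = batch[-1].identifier  # resume key
            request.fileSize = f.tell()            # truncation point on restart
            request.save()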
f.write(f"{ezidapp.models.util.getUserByPid(r.requestor).username}\n{r.rawRequest}\n") except Exception as e: - log.exception('Exception') + self.log.exception('Exception') raise self._wrapException("error writing sidecar file", e) finally: if f: @@ -448,27 +469,34 @@ def _notifyRequestor(self, r): fail_silently=True, ) except Exception as e: - log.exception('Exception') + self.log.exception('Exception') raise self._wrapException("error sending email", e) r.delete() - def _unescape(self, s): + def _unescape(self, s: str) -> str: return re.sub("%([0-9A-F][0-9A-F])", lambda m: chr(int(m.group(1), 16)), s) - - def _decode(self, s): + def _decode(self, s: str): + ''' + Decodes DownloadQueue.constraint + ''' if s[0] == "B": + # boolean return s[1:] == "True" elif s[0] == "I": + # integer return int(s[1:]) elif s[0] == "S": + # string return s[1:] elif s[0] == "L": + # list, from comma separated string of constraints if len(s) > 1: return [self._decode(self._unescape(i)) for i in s[1:].split(",")] else: return [] elif s[0] == "D": + # dict, from comma separated list of k=v if len(s) > 1: return dict( list( @@ -485,8 +513,7 @@ def _decode(self, s): else: assert False, "unhandled case" - - def _fileSuffix(self, r): + def _fileSuffix(self, r: ezidapp.models.async_queue.DownloadQueue): if r.compression == ezidapp.models.async_queue.DownloadQueue.GZIP: return SUFFIX_FORMAT_DICT[r.format] + ".gz" else: diff --git a/ezidapp/models/validation.py b/ezidapp/models/validation.py index 7f6b93c09..74e64c2be 100644 --- a/ezidapp/models/validation.py +++ b/ezidapp/models/validation.py @@ -114,7 +114,7 @@ def publicationDate(date): # return ("%04d", "%04d-%02d", "%04d-%02d-%02d")[numComponents - 1] % t[:numComponents] return ("{:04d}", "{:04d}-{:02d}", "{:04d}-{:02d}-{:02d}")[ numComponents - 1 - ].format(t[:numComponents]) + ].format(*t[:numComponents]) except Exception: pass raise django.core.exceptions.ValidationError( diff --git a/impl/mapping.py b/impl/mapping.py index a0125abe5..b0be56860 100644 --- a/impl/mapping.py +++ b/impl/mapping.py @@ -21,7 +21,7 @@ import re -# import ezidapp.models.validation +import ezidapp.models.validation import impl.datacite import impl.erc import impl.util @@ -52,8 +52,8 @@ def __init__( def validatedDate(self): if self.date is not None: try: - import ezidapp.models.validation - + #2022-06-22 Not clear why this import was within the method instead of module level + #import ezidapp.models.validation return ezidapp.models.validation.publicationDate(self.date) except Exception: return None diff --git a/tests/test_docs/datacite_metadata_01.txt b/tests/test_docs/datacite_metadata_01.txt new file mode 100644 index 000000000..af3ee31c6 --- /dev/null +++ b/tests/test_docs/datacite_metadata_01.txt @@ -0,0 +1,7 @@ +# For input to client testing datacite minting +# client.py l admin mint doi:10.5072/FK2 @datacite_metadata_01.txt +datacite.creator: Dave +datacite.title: Test doc +datacite.publicationyear: 1961 +datacite.resourcetype: Event +datacite.publisher: Tester diff --git a/tests/test_validation.py b/tests/test_validation.py new file mode 100644 index 000000000..1fdcfd6ef --- /dev/null +++ b/tests/test_validation.py @@ -0,0 +1,20 @@ +# Copyright©2021, Regents of the University of California +# http://creativecommons.org/licenses/BSD + +"""Test ezidapp.models.validation +""" + +import pytest + +import ezidapp.models.validation + +#TODO: Flesh out the test cases to match all the possibilities in the tested method +@pytest.mark.parametrize("test,expected",[ + ('1961', '1961'), + 
diff --git a/tests/test_validation.py b/tests/test_validation.py
new file mode 100644
index 000000000..1fdcfd6ef
--- /dev/null
+++ b/tests/test_validation.py
@@ -0,0 +1,20 @@
+# Copyright©2021, Regents of the University of California
+# http://creativecommons.org/licenses/BSD
+
+"""Test ezidapp.models.validation
+"""
+
+import pytest
+
+import ezidapp.models.validation
+
+
+# TODO: Flesh out the test cases to match all the possibilities in the tested method
+@pytest.mark.parametrize("test,expected", [
+    ('1961', '1961'),
+    ('196104', '1961-04'),
+    ('20201201', '2020-12-01'),
+])
+def test_publicationDate(test, expected):
+    res = ezidapp.models.validation.publicationDate(test)
+    assert res == expected
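The one-character validation.py fix above is easy to miss but load-bearing: passing the tuple itself to str.format applies a numeric format spec to a tuple, which raises TypeError, so publicationDate appears to have always fallen through to its ValidationError branch. Unpacking gives each component its own placeholder, which is what the new test exercises.

    t = (2020, 12, 1)
    try:
        "{:04d}-{:02d}-{:02d}".format(t)  # the old form: whole tuple into one field
    except TypeError as e:
        print(e)  # unsupported format string passed to tuple.__format__
    print("{:04d}-{:02d}-{:02d}".format(*t))  # 2020-12-01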