diff --git a/doajtest/helpers.py b/doajtest/helpers.py
index f65c11f241..1f716bcf3b 100644
--- a/doajtest/helpers.py
+++ b/doajtest/helpers.py
@@ -120,6 +120,7 @@ class DoajTestCase(TestCase):
 
     @classmethod
     def setUpClass(cls) -> None:
+        import portality.app  # noqa, needed to register routes
         cls.originals = patch_config(app, {
             "STORE_IMPL": "portality.store.StoreLocal",
             "STORE_LOCAL_DIR": paths.rel2abs(__file__, "..", "tmp", "store", "main", cls.__name__.lower()),
diff --git a/portality/events/combined.py b/portality/events/combined.py
new file mode 100644
index 0000000000..869d63ab88
--- /dev/null
+++ b/portality/events/combined.py
@@ -0,0 +1,11 @@
+from portality.events.shortcircuit import send_event as shortcircuit_send_event
+from portality.core import app
+
+
+def send_event(event):
+    try:
+        from portality.events.kafka_producer import send_event as kafka_send_event
+        kafka_send_event(event)
+    except Exception as e:
+        app.logger.exception("Failed to send event to Kafka. " + str(e))
+        shortcircuit_send_event(event)
diff --git a/portality/events/kafka_consumer.py b/portality/events/kafka_consumer.py
index 77c812b6e2..0ce1e1120e 100644
--- a/portality/events/kafka_consumer.py
+++ b/portality/events/kafka_consumer.py
@@ -11,13 +11,19 @@
 app = faust.App('events', broker=broker, value_serializer='json')
 topic = app.topic(topic_name)
 
+event_counter = 0
+
 
 @app.agent(topic)
 async def handle_event(stream):
+    global event_counter
     with doajapp.test_request_context("/"):
         svc = DOAJ.eventsService()
         async for event in stream:
-            svc.consume(Event(raw=json.loads(event)))
+            event_counter += 1
+            doajapp.logger.info(f"Kafka event count {event_counter}")
+            # TODO: uncomment the following line once the Event model is fixed for Kafka
+            # svc.consume(Event(raw=json.loads(event)))
 
 
 if __name__ == '__main__':
diff --git a/portality/lib/csv_utils.py b/portality/lib/csv_utils.py
new file mode 100644
index 0000000000..c5a46f37fd
--- /dev/null
+++ b/portality/lib/csv_utils.py
@@ -0,0 +1,9 @@
+import csv
+from typing import Iterable, Union
+
+
+def read_all(csv_path, as_dict=False) -> Iterable[Union[list, dict]]:
+    reader = csv.DictReader if as_dict else csv.reader
+    with open(csv_path, 'r') as f:
+        for row in reader(f):
+            yield row
diff --git a/portality/migrate/903_remove_blanks/README.md b/portality/migrate/903_remove_blanks/README.md
new file mode 100644
index 0000000000..833c2c102f
--- /dev/null
+++ b/portality/migrate/903_remove_blanks/README.md
@@ -0,0 +1,13 @@
+# Remove Blanks
+
+Remove leading and trailing whitespace from string fields in Journal and Application records.
+
+### Run
+```
+python portality/upgrade.py -u portality/migrate/903_remove_blanks/migrate.json
+```
+
+### Verify
+```
+python -m portality.scripts.blank_field_finder
+```
\ No newline at end of file
diff --git a/portality/migrate/903_remove_blanks/__init__.py b/portality/migrate/903_remove_blanks/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/portality/migrate/903_remove_blanks/functions.py b/portality/migrate/903_remove_blanks/functions.py
new file mode 100644
index 0000000000..f36c435314
--- /dev/null
+++ b/portality/migrate/903_remove_blanks/functions.py
@@ -0,0 +1,21 @@
+def remove_blanks(obj) -> dict:
+    if not isinstance(obj, dict):
+        return obj
+
+    for k, v in obj.items():
+        if isinstance(v, dict):
+            obj[k] = remove_blanks(v)
+
+        elif isinstance(v, list):
+            if not v:
+                continue
+            if isinstance(v[0], dict):
+                obj[k] = [remove_blanks(item) for item in v]
+            elif isinstance(v[0], str):
+                obj[k] = [item.strip() for item in v]
+
+        elif isinstance(v, str) and v != v.strip():
+            print(f'remove blanks: {k} = [{v}]')
+            obj[k] = v.strip()
+
+    return obj
diff --git a/portality/migrate/903_remove_blanks/migrate.json b/portality/migrate/903_remove_blanks/migrate.json
new file mode 100644
index 0000000000..64d4a31842
--- /dev/null
+++ b/portality/migrate/903_remove_blanks/migrate.json
@@ -0,0 +1,13 @@
+{
+    "batch" : 10000,
+    "types": [
+        {
+            "type" : "journal",
+            "init_with_model" : false,
+            "keepalive" : "10m",
+            "functions" : [
+                "portality.migrate.903_remove_blanks.functions.remove_blanks"
+            ]
+        }
+    ]
+}
\ No newline at end of file
diff --git a/portality/scripts/blank_field_finder.py b/portality/scripts/blank_field_finder.py
new file mode 100644
index 0000000000..028332b1a6
--- /dev/null
+++ b/portality/scripts/blank_field_finder.py
@@ -0,0 +1,87 @@
+import argparse
+from pathlib import Path
+from typing import Any, Iterable
+
+from portality.bll.services.journal import JournalService
+from portality.lib import csv_utils
+from portality.models import Application, Journal
+
+
+def to_k_v(item: Any, prefix: list = None):
+    if prefix is None:
+        prefix = []
+
+    if isinstance(item, dict):
+        for k, v in item.items():
+            yield from to_k_v(v, prefix=prefix + [k])
+
+    elif isinstance(item, list):
+        for k, v in enumerate(item):
+            yield from to_k_v(v, prefix=prefix + [k])
+    else:
+        yield '.'.join(map(str, prefix)), str(item)
+
+
+def tee(txt: str, out_file):
+    print(txt)
+    out_file.write(txt + '\n')
+
+
+def write_bad_data_domain_object(domain_object_class: Any, out_path):
+    with open(out_path, 'w') as f:
+        items = iter(domain_object_class.iterall())
+        while True:
+            try:
+                j = next(items, None)
+            except:
+                continue
+
+            if j is None:
+                break
+
+            for k, v in filter_bad_only(to_k_v(j.data)):
+                tee(f'{j.id} {k} [{v}]', f)
+
+
+def main2():
+    with open('/tmp/journals.csv', 'w') as f:
+        JournalService._make_journals_csv(f)
+
+
+def is_bad_str(v: str):
+    return isinstance(v, str) and v != v.strip()
+
+
+def filter_bad_only(row: Iterable):
+    return (i for i in row if is_bad_str(i[1]))
+
+
+def write_bad_data_journals_csv(csv_path, out_path):
+    with open(out_path, 'w') as out_file:
+        for row in csv_utils.read_all(csv_path, as_dict=True):
+            for k, v in filter_bad_only(row.items()):
+                tee(f'{k} [{v}]', out_file)
+
+
+def write_results(journal_csv_path, out_dir):
+    # out_dir = Path('/tmp')
+    # journal_csv_path = '/home/kk/tmp/journals.csv'
+    out_dir = Path(out_dir)
+    write_bad_data_domain_object(Application, out_dir / 'bad_app.txt')
+    write_bad_data_domain_object(Journal, out_dir / 'bad_journals.txt')
+    if journal_csv_path:
+        write_bad_data_journals_csv(journal_csv_path, out_dir / 'bad_journals_csv.txt')
+
+
+def main():
+    parser = argparse.ArgumentParser(description='Output file with bad data')
+    parser.add_argument('-i', '--input', help='Path of input CSV file', type=str, default=None)
+    parser.add_argument('-o', '--output', help='Output directory', type=str, default='.')
+    args = parser.parse_args(
+        # ['-i', '/home/kk/tmp/journals.csv', '-o', '/tmp']
+    )
+    write_results(args.input, args.output)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/portality/upgrade.py b/portality/upgrade.py
index fb600977cc..1f33ba6140 100644
--- a/portality/upgrade.py
+++ b/portality/upgrade.py
@@ -6,6 +6,8 @@
 from datetime import datetime, timedelta
 from copy import deepcopy
 from collections import OrderedDict
+from typing import TypedDict, List, Dict
+
 from portality import models
 from portality.dao import ScrollTimeoutException
 from portality.lib import plugin, dates
@@ -14,12 +16,12 @@ from portality.dao import ScrollTimeoutException
 
 
 MODELS = {
-    "journal": models.Journal, #~~->Journal:Model~~
-    "article": models.Article, #~~->Article:Model~~
-    "suggestion": models.Suggestion, #~~->Application:Model~~
+    "journal": models.Journal,  # ~~->Journal:Model~~
+    "article": models.Article,  # ~~->Article:Model~~
+    "suggestion": models.Suggestion,  # ~~->Application:Model~~
     "application": models.Application,
-    "account": models.Account, #~~->Account:Model~~
-    "background_job": models.BackgroundJob #~~->BackgroundJob:Model~~
+    "account": models.Account,  # ~~->Account:Model~~
+    "background_job": models.BackgroundJob  # ~~->BackgroundJob:Model~~
 }
 
 
@@ -29,7 +31,42 @@ def upgrade_article(self, article):
     pass
 
 
-def do_upgrade(definition, verbose, save_batches=None):
+class UpgradeType(TypedDict):
+    type: str  # name / key of the MODELS class
+    action: str  # default is "update"
+    query: dict  # ES query used to find the records to upgrade
+    keepalive: str  # ES keepalive time for the scroll, default "1m"
+    scroll_size: int  # ES scroll size, default 1000
+
+    """
+    Python paths of functions to run on each record.
+    The interface of each function should be:
+    my_function(instance: DomainObject | dict) -> DomainObject | dict
+    """
+    functions: List[str]
+
+    """
+    If True (the default), the instance passed to functions is a DomainObject;
+    otherwise it is a dict.
+    """
+    init_with_model: bool
+
+    """
+    Tasks to run on each record.
+    These only work if init_with_model is True.
+
+    Format of each task:
+    { method name of the model: kwargs }
+    """
+    tasks: List[Dict[str, dict]]
+
+
+class Definition(TypedDict):
+    batch: int
+    types: List[UpgradeType]
+
+
+def do_upgrade(definition: Definition, verbose, save_batches=None):
     # get the source and target es definitions
     # ~~->Elasticsearch:Technology~~
 
@@ -54,7 +91,8 @@ def do_upgrade(definition, verbose, save_batches=None):
 
         # Iterate through all of the records in the model class
         try:
-            for result in model_class.iterate(q=tdef.get("query", default_query), keepalive=tdef.get("keepalive", "1m"), page_size=tdef.get("scroll_size", 1000), wrap=False):
+            for result in model_class.iterate(q=tdef.get("query", default_query), keepalive=tdef.get("keepalive", "1m"),
+                                              page_size=tdef.get("scroll_size", 1000), wrap=False):
                 original = deepcopy(result)
 
                 if tdef.get("init_with_model", True):
@@ -83,7 +121,8 @@ def do_upgrade(definition, verbose, save_batches=None):
                     result.prep()
                 except AttributeError:
                     if verbose:
-                        print(tdef.get("type"), result.id, "has no prep method - no, pre-save preparation being done")
+                        print(tdef.get("type"), result.id,
+                              "has no prep method - no pre-save preparation being done")
                     pass
 
                 data = result.data
@@ -134,7 +173,8 @@ def do_upgrade(definition, verbose, save_batches=None):
                    f.write(json.dumps(batch, indent=2))
                print(dates.now(), "wrote batch to file {x}".format(x=fn))
 
-            print(dates.now(), "scroll timed out / writing ", len(batch), "to", tdef.get("type"), ";", total, "of", max)
+            print(dates.now(), "scroll timed out / writing ", len(batch), "to",
+                  tdef.get("type"), ";", total, "of", max)
             model_class.bulk(batch, action=action, req_timeout=120)
             batch = []
 
@@ -180,6 +220,7 @@ def recurse(context, c, o):
 if __name__ == "__main__":
     # ~~->Migrate:Script~~
     import argparse
+
     parser = argparse.ArgumentParser()
     parser.add_argument("-u", "--upgrade", help="path to upgrade definition")
     parser.add_argument("-v", "--verbose", action="store_true", help="verbose output to stdout during processing")
diff --git a/production.cfg b/production.cfg
index 1e6e01f2e9..897ff34004 100644
--- a/production.cfg
+++ b/production.cfg
@@ -58,7 +58,12 @@ PLAUSIBLE_URL = "https://plausible.io"
 
 # Run notifications through Kafka in production.
 #EVENT_SEND_FUNCTION = "portality.events.kafka_producer.send_event"
-EVENT_SEND_FUNCTION = "portality.events.shortcircuit.send_event"
+#EVENT_SEND_FUNCTION = "portality.events.shortcircuit.send_event"
+
+# 2023-08-03 Use the combined event sender for max traffic - doaj-kafka machine
+EVENT_SEND_FUNCTION = "portality.events.combined.send_event"
+KAFKA_BROKER = "kafka://10.131.35.14:9092"
+KAFKA_BOOTSTRAP_SERVER = "10.131.35.14:9092"
 
 # https://github.com/DOAJ/doajPM/issues/3565 2023-03-07
 PRESERVATION_PAGE_UNDER_MAINTENANCE = False
diff --git a/test.cfg b/test.cfg
index b6b18f3b15..01f2c9c5bf 100644
--- a/test.cfg
+++ b/test.cfg
@@ -64,7 +64,12 @@ PUBLIC_REGISTER = True
 LOGIN_VIA_ACCOUNT_ID = True
 
 # 2022-12-09 enable the shorcircuit handler until we can fix kafka
-EVENT_SEND_FUNCTION = "portality.events.shortcircuit.send_event"
+#EVENT_SEND_FUNCTION = "portality.events.shortcircuit.send_event"
+
+# 2023-08-02 Use the combined event sender for max traffic - doaj-kafka machine
+EVENT_SEND_FUNCTION = "portality.events.combined.send_event"
+KAFKA_BROKER = "kafka://10.131.35.14:9092"
+KAFKA_BOOTSTRAP_SERVER = "10.131.35.14:9092"
 
 # No plausible on test
 PLAUSIBLE_URL = None
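
A minimal sketch of how the new `remove_blanks` migration function behaves on a nested record. The record shape below is invented for demonstration and is not a real DOAJ document; the module is loaded via `importlib` because the `903_remove_blanks` package name starts with a digit and cannot be imported with a plain `import` statement (upgrade.py loads it by dotted string path for the same reason).

```python
import importlib

# Load the migration module by dotted path, since "903_remove_blanks"
# is not a valid identifier for a normal import statement.
functions = importlib.import_module("portality.migrate.903_remove_blanks.functions")

# Invented record for demonstration only -- not a real DOAJ journal document.
record = {
    "bibjson": {
        "title": "  Journal of Examples ",
        "keywords": ["  open access", "publishing  "],
    },
    "admin": {"notes": [{"note": " trailing blank "}]},
}

cleaned = functions.remove_blanks(record)
assert cleaned["bibjson"]["title"] == "Journal of Examples"
assert cleaned["bibjson"]["keywords"] == ["open access", "publishing"]
assert cleaned["admin"]["notes"][0]["note"] == "trailing blank"
```

Strings inside lists of dicts are handled by the recursive branch, and each stripped scalar is reported via `print`, which is also the output seen while the migration runs.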