Merge branch 'release/2023-08-03_remove_blanks_kafka_combined'
Steven-Eardley committed Aug 3, 2023
2 parents 3836d04 + 8e8c892 commit 93b43ee
Showing 12 changed files with 224 additions and 12 deletions.
1 change: 1 addition & 0 deletions doajtest/helpers.py
@@ -120,6 +120,7 @@ class DoajTestCase(TestCase):

@classmethod
def setUpClass(cls) -> None:
import portality.app  # noqa, needed to register routes
cls.originals = patch_config(app, {
"STORE_IMPL": "portality.store.StoreLocal",
"STORE_LOCAL_DIR": paths.rel2abs(__file__, "..", "tmp", "store", "main", cls.__name__.lower()),
11 changes: 11 additions & 0 deletions portality/events/combined.py
@@ -0,0 +1,11 @@
from portality.events.shortcircuit import send_event as shortcircuit_send_event
from portality.core import app


def send_event(event):
try:
from portality.events.kafka_producer import send_event as kafka_send_event
kafka_send_event(event)
except Exception as e:
app.logger.exception("Failed to send event to Kafka. " + str(e))
shortcircuit_send_event(event)
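
The combined sender first tries the Kafka producer and, if anything goes wrong, logs the exception and falls back to the in-process short-circuit sender. It is selected via the `EVENT_SEND_FUNCTION` setting, as in the `production.cfg` and `test.cfg` changes further down; a minimal configuration sketch (the broker address here is illustrative, not the production one):

```
# settings sketch — DOAJ .cfg files are Python; broker address is illustrative
EVENT_SEND_FUNCTION = "portality.events.combined.send_event"
KAFKA_BROKER = "kafka://localhost:9092"
KAFKA_BOOTSTRAP_SERVER = "localhost:9092"
```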
8 changes: 7 additions & 1 deletion portality/events/kafka_consumer.py
@@ -11,13 +11,19 @@
app = faust.App('events', broker=broker, value_serializer='json')
topic = app.topic(topic_name)

event_counter = 0


@app.agent(topic)
async def handle_event(stream):
global event_counter
with doajapp.test_request_context("/"):
svc = DOAJ.eventsService()
async for event in stream:
svc.consume(Event(raw=json.loads(event)))
event_counter += 1
doajapp.logger.info(f"Kafka event count {event_counter}")
# TODO uncomment the following line once the Event model is fixed to Kafka
# svc.consume(Event(raw=json.loads(event)))


if __name__ == '__main__':
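
The consumer is a Faust app, so it runs as a worker process rather than being imported. A hedged sketch of starting it — the exact invocation and log level depend on the deployment, and the module's own `__main__` block may provide an equivalent entry point:

```
faust -A portality.events.kafka_consumer worker -l info
```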
9 changes: 9 additions & 0 deletions portality/lib/csv_utils.py
@@ -0,0 +1,9 @@
import csv
from typing import Iterable, Union


def read_all(csv_path, as_dict=False) -> Iterable[Union[list, dict]]:
reader = csv.DictReader if as_dict else csv.reader
with open(csv_path, 'r') as f:
for row in reader(f):
yield row
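
A minimal usage sketch for the new helper — the file path and column name below are hypothetical:

```
from portality.lib import csv_utils

# rows come back as dicts keyed by the CSV header when as_dict=True,
# otherwise as plain lists of column values
for row in csv_utils.read_all("journals.csv", as_dict=True):
    print(row.get("Journal title"))
```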
13 changes: 13 additions & 0 deletions portality/migrate/903_remove_blanks/README.md
@@ -0,0 +1,13 @@
# Remove Blanks

Remove blanks (leading and trailing whitespace) from string fields in Journal and Application records.

### Run
```
python portality/upgrade.py -u portality/migrate/903_remove_blanks/migrate.json
```

### Verify
```
python -m portality.scripts.blank_field_finder
```
Empty file.
21 changes: 21 additions & 0 deletions portality/migrate/903_remove_blanks/functions.py
@@ -0,0 +1,21 @@
def remove_blanks(obj) -> dict:
if not isinstance(obj, dict):
return obj

for k, v in obj.items():
if isinstance(v, dict):
obj[k] = remove_blanks(v)

elif isinstance(v, list):
if not v:
continue
if isinstance(v[0], dict):
obj[k] = [remove_blanks(item) for item in v]
elif isinstance(v[0], str):
obj[k] = [item.strip() for item in v]

elif isinstance(v, str) and v != v.strip():
print(f'remove blanks: {k} = [{v}]')
obj[k] = v.strip()

return obj
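
To illustrate the behaviour, a small sketch with made-up data — nested dicts are cleaned recursively, lists of strings are stripped element-wise, and each changed scalar field is logged with `print`:

```
record = {"bibjson": {"title": " My Journal ", "keywords": ["  oa", "science "]}}

cleaned = remove_blanks(record)
# prints: remove blanks: title = [ My Journal ]
# cleaned == {"bibjson": {"title": "My Journal", "keywords": ["oa", "science"]}}
```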
13 changes: 13 additions & 0 deletions portality/migrate/903_remove_blanks/migrate.json
@@ -0,0 +1,13 @@
{
"batch" : 10000,
"types": [
{
"type" : "journal",
"init_with_model" : false,
"keepalive" : "10m",
"functions" : [
"portality.migrate.903_remove_blanks.functions.remove_blanks"
]
}
]
}
87 changes: 87 additions & 0 deletions portality/scripts/blank_field_finder.py
@@ -0,0 +1,87 @@
import argparse
from pathlib import Path
from typing import Any, Iterable

from portality.bll.services.journal import JournalService
from portality.lib import csv_utils
from portality.models import Application, Journal


def to_k_v(item: Any, prefix: list = None):
if prefix is None:
prefix = []

if isinstance(item, dict):
for k, v in item.items():
yield from to_k_v(v, prefix=prefix + [k])

elif isinstance(item, list):
for k, v in enumerate(item):
yield from to_k_v(v, prefix=prefix + [k])
else:
yield '.'.join(map(str, prefix)), str(item)


def tee(txt: str, out_file):
print(txt)
out_file.write(txt + '\n')


def write_bad_data_domain_object(domain_object_class: Any, out_path):
with open(out_path, 'w') as f:
items = iter(domain_object_class.iterall())
while True:
try:
j = next(items, None)
except:
continue

if j is None:
break

for k, v in filter_bad_only(to_k_v(j.data)):
tee(f'{j.id} {k} [{v}]', f)


def main2():
with open('/tmp/journals.csv', 'w') as f:
JournalService._make_journals_csv(f)


def is_bad_str(v: str):
return isinstance(v, str) and v != v.strip()


def filter_bad_only(row: Iterable):
return (i for i in row if is_bad_str(i[1]))


def write_bad_data_journals_csv(csv_path, out_path):
with open(out_path, 'w') as out_file:
for row in csv_utils.read_all(csv_path, as_dict=True):
for k, v in filter_bad_only(row.items()):
tee(f'{k} [{v}]', out_file)


def write_results(journal_csv_path, out_dir):
# out_dir = Path('/tmp')
# journal_csv_path = '/home/kk/tmp/journals.csv'
out_dir = Path(out_dir)
write_bad_data_domain_object(Application, out_dir / 'bad_app.txt')
write_bad_data_domain_object(Journal, out_dir / 'bad_journals.txt')
if journal_csv_path:
write_bad_data_journals_csv(journal_csv_path, out_dir / 'bad_journals_csv.txt')


def main():
parser = argparse.ArgumentParser(description='Output file with bad data')
parser.add_argument('-i', '--input', help='Path of input CSV file', type=str, default=None)
parser.add_argument('-o', '--output', help='Output directory', type=str, default='.')
args = parser.parse_args(
# ['-i', '/home/kk/tmp/journals.csv', '-o', '/tmp']
)
write_results(args.input, args.output)


if __name__ == '__main__':
main()
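
Usage sketch for the finder script (paths are illustrative) — results are written to `bad_app.txt`, `bad_journals.txt` and, if a CSV is supplied, `bad_journals_csv.txt` in the output directory, as well as echoed to stdout:

```
# scan Application and Journal records only
python -m portality.scripts.blank_field_finder -o /tmp

# additionally scan an exported journals CSV
python -m portality.scripts.blank_field_finder -i /tmp/journals.csv -o /tmp
```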
59 changes: 50 additions & 9 deletions portality/upgrade.py
@@ -6,6 +6,8 @@
from datetime import datetime, timedelta
from copy import deepcopy
from collections import OrderedDict
from typing import TypedDict, List, Dict

from portality import models
from portality.dao import ScrollTimeoutException
from portality.lib import plugin, dates
@@ -14,12 +16,12 @@
from portality.dao import ScrollTimeoutException

MODELS = {
"journal": models.Journal, #~~->Journal:Model~~
"article": models.Article, #~~->Article:Model~~
"suggestion": models.Suggestion, #~~->Application:Model~~
"journal": models.Journal, # ~~->Journal:Model~~
"article": models.Article, # ~~->Article:Model~~
"suggestion": models.Suggestion, # ~~->Application:Model~~
"application": models.Application,
"account": models.Account, #~~->Account:Model~~
"background_job": models.BackgroundJob #~~->BackgroundJob:Model~~
"account": models.Account, # ~~->Account:Model~~
"background_job": models.BackgroundJob # ~~->BackgroundJob:Model~~
}


@@ -29,7 +31,42 @@ def upgrade_article(self, article):
pass


def do_upgrade(definition, verbose, save_batches=None):
class UpgradeType(TypedDict):
type: str # name / key of the MODELS class
action: str # default is update
query: dict # ES query to use to find the records to upgrade
keepalive: str # ES keepalive time for the scroll, default 1m
scroll_size: int # ES scroll size, default 1000

"""
python path of functions to run on the record
interface of the function should be:
my_function(instance: DomainObject | dict) -> DomainObject | dict
"""
functions: List[str]

"""
instance would be a DomainObject if True, otherwise a dict
default is True
"""
init_with_model: bool

"""
tasks to run on the record
that will only work if init_with_model is True
format of each task:
{ function name of model : kwargs }
"""
tasks: List[Dict[str, dict]]


class Definition(TypedDict):
batch: int
types: List[UpgradeType]


def do_upgrade(definition: Definition, verbose, save_batches=None):
# get the source and target es definitions
# ~~->Elasticsearch:Technology~~

@@ -54,7 +91,8 @@ def do_upgrade(definition, verbose, save_batches=None):

# Iterate through all of the records in the model class
try:
for result in model_class.iterate(q=tdef.get("query", default_query), keepalive=tdef.get("keepalive", "1m"), page_size=tdef.get("scroll_size", 1000), wrap=False):
for result in model_class.iterate(q=tdef.get("query", default_query), keepalive=tdef.get("keepalive", "1m"),
page_size=tdef.get("scroll_size", 1000), wrap=False):

original = deepcopy(result)
if tdef.get("init_with_model", True):
@@ -83,7 +121,8 @@ def do_upgrade(definition, verbose, save_batches=None):
result.prep()
except AttributeError:
if verbose:
print(tdef.get("type"), result.id, "has no prep method - no, pre-save preparation being done")
print(tdef.get("type"), result.id,
"has no prep method - no, pre-save preparation being done")
pass

data = result.data
@@ -134,7 +173,8 @@ def do_upgrade(definition, verbose, save_batches=None):
f.write(json.dumps(batch, indent=2))
print(dates.now(), "wrote batch to file {x}".format(x=fn))

print(dates.now(), "scroll timed out / writing ", len(batch), "to", tdef.get("type"), ";", total, "of", max)
print(dates.now(), "scroll timed out / writing ", len(batch), "to",
tdef.get("type"), ";", total, "of", max)
model_class.bulk(batch, action=action, req_timeout=120)
batch = []

@@ -180,6 +220,7 @@ def recurse(context, c, o):
if __name__ == "__main__":
# ~~->Migrate:Script~~
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("-u", "--upgrade", help="path to upgrade definition")
parser.add_argument("-v", "--verbose", action="store_true", help="verbose output to stdout during processing")
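
As a reading aid for the new `Definition`/`UpgradeType` typing, a hypothetical upgrade definition that exercises the documented keys — the values are illustrative and not part of this migration:

```
definition = {
    "batch": 1000,
    "types": [
        {
            "type": "journal",            # key into MODELS
            "init_with_model": True,      # pass model instances (not raw dicts) to functions
            "keepalive": "5m",            # ES scroll keepalive
            "scroll_size": 500,           # ES scroll page size
            "functions": [
                "portality.migrate.903_remove_blanks.functions.remove_blanks"
            ],
            "tasks": [
                {"prep": {}}              # {model method name: kwargs}; only applied when init_with_model is True
            ]
        }
    ]
}
```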
7 changes: 6 additions & 1 deletion production.cfg
@@ -58,7 +58,12 @@ PLAUSIBLE_URL = "https://plausible.io"

# Run notifications through Kafka in production.
#EVENT_SEND_FUNCTION = "portality.events.kafka_producer.send_event"
EVENT_SEND_FUNCTION = "portality.events.shortcircuit.send_event"
#EVENT_SEND_FUNCTION = "portality.events.shortcircuit.send_event"

# 2023-08-03 Use the combined event sender for max traffic - doaj-kafka machine
EVENT_SEND_FUNCTION = "portality.events.combined.send_event"
KAFKA_BROKER = "kafka://10.131.35.14:9092"
KAFKA_BOOTSTRAP_SERVER = "10.131.35.14:9092"

# https://github.com/DOAJ/doajPM/issues/3565 2023-03-07
PRESERVATION_PAGE_UNDER_MAINTENANCE = False
7 changes: 6 additions & 1 deletion test.cfg
@@ -64,7 +64,12 @@ PUBLIC_REGISTER = True
LOGIN_VIA_ACCOUNT_ID = True

# 2022-12-09 enable the shortcircuit handler until we can fix kafka
EVENT_SEND_FUNCTION = "portality.events.shortcircuit.send_event"
#EVENT_SEND_FUNCTION = "portality.events.shortcircuit.send_event"

# 2023-08-02 Use the combined event sender for max traffic - doaj-kafka machine
EVENT_SEND_FUNCTION = "portality.events.combined.send_event"
KAFKA_BROKER = "kafka://10.131.35.14:9092"
KAFKA_BOOTSTRAP_SERVER = "10.131.35.14:9092"

# No plausible on test
PLAUSIBLE_URL = None
