diff --git a/alephclient/cli.py b/alephclient/cli.py
index 1d373c8..1204c50 100644
--- a/alephclient/cli.py
+++ b/alephclient/cli.py
@@ -1,13 +1,15 @@
 import json
-import click
 import logging
-from pathlib import Path
+
+import click
+from requests import HTTPError
 
 from alephclient import settings
 from alephclient.api import AlephAPI
-from alephclient.errors import AlephException
 from alephclient.crawldir import crawl_dir
+from alephclient.errors import AlephException
 from alephclient.fetchdir import fetch_collection, fetch_entity
+from alephclient.load_catalog import load_catalog
 
 log = logging.getLogger(__name__)
 
@@ -296,6 +298,60 @@ def read_json_stream(stream):
         raise click.Abort()
 
 
+@cli.command("load-catalog")
+@click.argument("url")
+@click.option(
+    "-c",
+    "--chunksize",
+    default=1000,
+    type=click.INT,
+    help="chunk size when sending to API",
+)
+@click.option(
+    "--force", is_flag=True, default=False, help="continue after server errors"
+)
+@click.option(
+    "--unsafe", is_flag=True, default=False, help="disable server-side validation"
+)
+@click.option("--frequency", help="Add frequency label to collections")
+@click.option("--exclude", help="Exclude dataset(s)", multiple=True)
+@click.option("--include", help="Include dataset(s)", multiple=True)
+@click.pass_context
+def _load_catalog(
+    ctx,
+    url,
+    chunksize=1000,
+    force=False,
+    unsafe=False,
+    frequency=None,
+    exclude=[],
+    include=[],
+):
+    """Import a catalog from a given url"""
+    api = ctx.obj["api"]
+    try:
+        for collection_id, loader in load_catalog(
+            api,
+            url,
+            exclude_datasets=exclude,
+            include_datasets=include,
+            frequency=frequency,
+        ):
+            api.write_entities(
+                collection_id,
+                loader,
+                chunk_size=chunksize,
+                unsafe=unsafe,
+                force=force,
+            )
+    except AlephException as exc:
+        raise click.ClickException(exc.message)
+    except HTTPError as exc:
+        raise click.ClickException(str(exc))
+    except BrokenPipeError:
+        raise click.Abort()
+
+
 @cli.command("stream-entities")
 @click.option("-o", "--outfile", type=click.File("w"), default="-")  # noqa
 @click.option("-s", "--schema", multiple=True, default=[])  # noqa
diff --git a/alephclient/load_catalog.py b/alephclient/load_catalog.py
new file mode 100644
index 0000000..5047f56
--- /dev/null
+++ b/alephclient/load_catalog.py
@@ -0,0 +1,89 @@
+import json
+import logging
+from typing import Any, Dict, Generator, List, Optional, Tuple
+
+import requests
+from banal import ensure_list
+
+from alephclient.api import AlephAPI
+
+log = logging.getLogger(__name__)
+
+EntityData = Dict[str, Any]
+Loader = Tuple[str, Generator[EntityData, None, None]]
+
+MIME_TYPE = "application/json+ftm"
+
+
+def stream_resource(url: str, foreign_id: str) -> Generator[EntityData, None, None]:
+    res = requests.get(url, stream=True)
+    if not res.ok:
+        log.error(f"[{foreign_id}] {res.status_code}")
+        return
+
+    for ix, data in enumerate(res.iter_lines()):
+        if ix and ix % 1000 == 0:
+            log.info("[%s] Bulk load entities: %s...", foreign_id, ix)
+        yield json.loads(data)
+
+
+def load_catalog(
+    api: AlephAPI,
+    url: str,
+    exclude_datasets: Optional[List[str]] = [],
+    include_datasets: Optional[List[str]] = [],
+    frequency: Optional[str] = None,
+) -> Generator[Loader, None, None]:
+    res = requests.get(url)
+    if not res.ok:
+        raise requests.HTTPError(f"Fetch catalog failed: {res.status_code}")
+
+    catalog = res.json()
+    for dataset in catalog["datasets"]:
+        foreign_id = dataset["name"]
+
+        if "type" in dataset and dataset["type"] == "collection":
+            continue
+        if dataset.get("children") or dataset.get("datasets"):
+            continue
+        if exclude_datasets and foreign_id in exclude_datasets:
+            continue
+        if include_datasets and foreign_id not in include_datasets:
+            continue
+
+        aleph_collection = api.get_collection_by_foreign_id(foreign_id)
+        data = {
+            "label": dataset["title"],
+            "summary": (
+                dataset.get("description", "")
+                or "" "\n\n" + dataset.get("summary", "")  # noqa
+                or ""  # noqa
+            ).strip(),
+            "publisher": dataset.get("publisher", {}).get("name"),
+            "publisher_url": dataset.get("publisher", {}).get("url"),
+            "countries": ensure_list(dataset.get("publisher", {}).get("country")),
+            "data_url": dataset.get("data", {}).get("url"),
+            "category": dataset.get("category", "other"),
+        }
+        if "frequency" in dataset or frequency is not None:
+            data["frequency"] = dataset.get("frequency", frequency)
+
+        if aleph_collection is not None:
+            log.info("[%s] Updating collection metadata ..." % foreign_id)
+            data.pop(
+                "category", None
+            )  # don't overwrite existing (probably user changed) category
+            aleph_collection = api.update_collection(
+                aleph_collection["collection_id"], data
+            )
+        else:
+            log.info("[%s] Creating collection ..." % foreign_id)
+            aleph_collection = api.create_collection(
+                {**data, **{"foreign_id": dataset["name"]}}
+            )
+
+        for resource in ensure_list(dataset.get("resources")):
+            if resource["mime_type"] == MIME_TYPE:
+                loader = stream_resource(resource["url"], foreign_id)
+                if loader is not None:
+                    yield aleph_collection["collection_id"], loader
diff --git a/alephclient/tests/test_load_catalog.py b/alephclient/tests/test_load_catalog.py
new file mode 100644
index 0000000..bfbe47f
--- /dev/null
+++ b/alephclient/tests/test_load_catalog.py
@@ -0,0 +1,21 @@
+from alephclient.api import AlephAPI
+from alephclient.load_catalog import load_catalog
+
+
+class TestLoadCatalog:
+    fake_url = "http://aleph.test/api/2/"
+    catalog_url = "https://data.opensanctions.org/datasets/latest/index.json"
+
+    def setup_method(self, mocker):
+        self.api = AlephAPI(host=self.fake_url, api_key="fake_key")
+
+    def test_load_catalog(self, mocker):
+        mocker.patch.object(self.api, "_request")
+        for collection_id, loader in load_catalog(self.api, self.catalog_url):
+            self.api._request.assert_called_with(
+                "GET", "{}collections/{}".format(self.fake_url, collection_id)
+            )
+            for data in loader:
+                assert isinstance(data, dict)
+                break
+            break