Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add load-catalog cli command #45

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 59 additions & 3 deletions alephclient/cli.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import json
import click
import logging
from pathlib import Path

import click
from requests import HTTPError

from alephclient import settings
from alephclient.api import AlephAPI
from alephclient.errors import AlephException
from alephclient.crawldir import crawl_dir
from alephclient.errors import AlephException
from alephclient.fetchdir import fetch_collection, fetch_entity
from alephclient.load_catalog import load_catalog

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -296,6 +298,60 @@ def read_json_stream(stream):
raise click.Abort()


@cli.command("load-catalog")
@click.argument("url")
@click.option(
    "-c",
    "--chunksize",
    default=1000,
    type=click.INT,
    help="chunk size when sending to API",
)
@click.option(
    "--force", is_flag=True, default=False, help="continue after server errors"
)
@click.option(
    "--unsafe", is_flag=True, default=False, help="disable server-side validation"
)
@click.option("--frequency", help="Add frequency label to collections")
@click.option("--exclude", help="Exclude dataset(s)", multiple=True)
@click.option("--include", help="Include dataset(s)", multiple=True)
@click.pass_context
def _load_catalog(
    ctx,
    url,
    chunksize=1000,
    force=False,
    unsafe=False,
    frequency=None,
    exclude=(),
    include=(),
):
    """Import a catalog from a given url

    For each dataset yielded by ``load_catalog``, its collection is
    created/updated on the Aleph instance and the dataset's entity
    stream is written in chunks of ``chunksize``.
    """
    # NOTE: click passes `multiple=True` options as tuples; the defaults
    # here use immutable tuples rather than mutable `[]` defaults.
    api = ctx.obj["api"]
    try:
        for collection_id, loader in load_catalog(
            api,
            url,
            exclude_datasets=exclude,
            include_datasets=include,
            frequency=frequency,
        ):
            api.write_entities(
                collection_id,
                loader,
                chunk_size=chunksize,
                unsafe=unsafe,
                force=force,
            )
    except AlephException as exc:
        # AlephException carries a user-facing message attribute.
        raise click.ClickException(exc.message)
    except HTTPError as exc:
        raise click.ClickException(str(exc))
    except BrokenPipeError:
        # Downstream pipe closed (e.g. piping into `head`) — abort quietly.
        raise click.Abort()


@cli.command("stream-entities")
@click.option("-o", "--outfile", type=click.File("w"), default="-") # noqa
@click.option("-s", "--schema", multiple=True, default=[]) # noqa
Expand Down
89 changes: 89 additions & 0 deletions alephclient/load_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import json
import logging
from typing import Any, Dict, Generator, List, Optional, Tuple

import requests
from banal import ensure_list

from alephclient.api import AlephAPI

log = logging.getLogger(__name__)

# A single FollowTheMoney entity as a parsed JSON mapping.
EntityData = Dict[str, Any]
# A (collection_id, entity-stream) pair yielded by `load_catalog`.
Loader = Tuple[str, Generator[EntityData, None, None]]

# Content type that marks a catalog resource as an FtM entity stream.
MIME_TYPE = "application/json+ftm"


def stream_resource(url: str, foreign_id: str) -> Generator[EntityData, None, None]:
    """Stream newline-delimited JSON entities from ``url``.

    Yields one parsed entity dict per line. If the resource cannot be
    fetched, an error is logged and the generator yields nothing.
    ``foreign_id`` is used only for log context.
    """
    res = requests.get(url, stream=True)
    if not res.ok:
        log.error(f"[{foreign_id}] {res.status_code}")
        return

    for ix, data in enumerate(res.iter_lines()):
        # iter_lines() can yield empty keep-alive chunks when streaming;
        # json.loads("") would raise, so skip them.
        if not data:
            continue
        if ix and ix % 1000 == 0:
            log.info("[%s] Bulk load entities: %s...", foreign_id, ix)
        yield json.loads(data)


def load_catalog(
    api: AlephAPI,
    url: str,
    exclude_datasets: Optional[List[str]] = None,
    include_datasets: Optional[List[str]] = None,
    frequency: Optional[str] = None,
) -> Generator[Loader, None, None]:
    """Fetch a dataset catalog from ``url`` and prepare entity loaders.

    For every leaf dataset (collections and datasets with children are
    skipped), the matching Aleph collection is created or its metadata
    updated, and a ``(collection_id, entity_stream)`` pair is yielded for
    each FtM resource of the dataset.

    :param exclude_datasets: dataset foreign_ids to skip (None = skip none)
    :param include_datasets: restrict to these foreign_ids (None = all)
    :param frequency: fallback frequency label when the dataset has none
    :raises requests.HTTPError: if the catalog cannot be fetched
    """
    res = requests.get(url)
    if not res.ok:
        raise requests.HTTPError(f"Fetch catalog failed: {res.status_code}")

    catalog = res.json()
    for dataset in catalog["datasets"]:
        foreign_id = dataset["name"]

        # Skip container datasets and apply include/exclude filters.
        if dataset.get("type") == "collection":
            continue
        if dataset.get("children") or dataset.get("datasets"):
            continue
        if exclude_datasets and foreign_id in exclude_datasets:
            continue
        if include_datasets and foreign_id not in include_datasets:
            continue

        aleph_collection = api.get_collection_by_foreign_id(foreign_id)
        publisher = dataset.get("publisher", {})
        # Join description and summary with a blank line between them.
        # (The previous `or`-chain dropped the summary whenever a
        # description was present, due to `+` binding tighter than `or`.)
        parts = (dataset.get("description") or "", dataset.get("summary") or "")
        data = {
            "label": dataset["title"],
            "summary": "\n\n".join(p for p in parts if p).strip(),
            "publisher": publisher.get("name"),
            "publisher_url": publisher.get("url"),
            "countries": ensure_list(publisher.get("country")),
            "data_url": dataset.get("data", {}).get("url"),
            "category": dataset.get("category", "other"),
        }
        if "frequency" in dataset or frequency is not None:
            data["frequency"] = dataset.get("frequency", frequency)

        if aleph_collection is not None:
            log.info("[%s] Updating collection metadata ...", foreign_id)
            # Don't overwrite an existing (probably user-changed) category.
            data.pop("category", None)
            aleph_collection = api.update_collection(
                aleph_collection["collection_id"], data
            )
        else:
            log.info("[%s] Creating collection ...", foreign_id)
            aleph_collection = api.create_collection(
                {**data, "foreign_id": foreign_id}
            )

        for resource in ensure_list(dataset.get("resources")):
            if resource["mime_type"] == MIME_TYPE:
                # stream_resource is a generator function; it always
                # returns a generator object, so no None check is needed.
                loader = stream_resource(resource["url"], foreign_id)
                yield aleph_collection["collection_id"], loader
21 changes: 21 additions & 0 deletions alephclient/tests/test_load_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from alephclient.api import AlephAPI
from alephclient.load_catalog import load_catalog


class TestLoadCatalog:
    """Exercise load_catalog against the OpenSanctions catalog index.

    NOTE(review): test_load_catalog fetches ``catalog_url`` over the
    network — consider mocking ``requests.get`` for hermetic CI runs.
    """

    fake_url = "http://aleph.test/api/2/"
    catalog_url = "https://data.opensanctions.org/datasets/latest/index.json"

    def setup_method(self, method):
        # pytest passes the test *method* as the second positional
        # argument here — the previous name `mocker` wrongly suggested
        # the pytest-mock fixture, which is not available in hooks.
        self.api = AlephAPI(host=self.fake_url, api_key="fake_key")

    def test_load_catalog(self, mocker):
        # Patch the low-level request so nothing hits a real Aleph server.
        mocker.patch.object(self.api, "_request")
        for collection_id, loader in load_catalog(self.api, self.catalog_url):
            self.api._request.assert_called_with(
                "GET", "{}collections/{}".format(self.fake_url, collection_id)
            )
            # Only sample the first entity of the first dataset to keep
            # the test fast.
            for data in loader:
                assert isinstance(data, dict)
                break
            break