Make minio local only; add a speedup; make headless prod only #92

Merged 3 commits on Feb 6, 2025
6 changes: 5 additions & 1 deletion Docker/Docker-compose.yaml
@@ -18,6 +18,8 @@ services:
- "../.env"
networks:
- dagster_network
profiles:
- localInfra

# GraphDB service for storage
graphdb:
@@ -165,7 +167,7 @@ services:
- production

# Headless browser for testing
headless:
scheduler_headless:
Member Author: Made the service name match the container name purely for clarity; to my understanding this shouldn't change any behavior.

image: chromedp/headless-shell:latest
container_name: scheduler_headless
ports:
@@ -174,6 +176,8 @@
- dagster_network
environment:
- SERVICE_PORTS=9222
profiles:
- production

volumes:
scheduler_minio_data:
19 changes: 11 additions & 8 deletions userCode/main.py
@@ -199,9 +199,7 @@ def gleaner_config(context: AssetExecutionContext):
sources = []
names: set[str] = set()

assert (
len(Lines) > 0
), f"No sitemaps found in sitemap index {REMOTE_GLEANER_SITEMAP}"
assert len(Lines) > 0, f"No sitemaps found in index {REMOTE_GLEANER_SITEMAP}"
Member Author: Made this shorter since my local ruff formatter differs from the ruff in pre-commit for some reason and reformats this line when it is too long.


for line in Lines:
basename = REMOTE_GLEANER_SITEMAP.removesuffix(".xml")
@@ -323,9 +321,14 @@ def can_contact_headless():
if RUNNING_AS_TEST_OR_DEV():
portNumber = GLEANER_HEADLESS_ENDPOINT.removeprefix("http://").split(":")[1]
url = f"http://localhost:{portNumber}"
get_dagster_logger().warning(
f"Skipping headless check in test mode. Check would have pinged {url}"
)
# Dagster does not support skipping asset checks, so we must return a valid result
return AssetCheckResult(passed=True)

# the Host header needs to be set for Chromium due to an upstream security requirement
result = requests.get(url, timeout=TWO_SECONDS)
result = requests.get(url, headers={"Host": "localhost"}, timeout=TWO_SECONDS)
return AssetCheckResult(
passed=result.status_code == 200,
metadata={
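As the inline comment notes, Chromium's DevTools server rejects requests whose Host header is neither an IP address nor localhost. The same probe can be sketched standalone with only the standard library; the endpoint URL, port, and timeout below are illustrative assumptions, not taken from this repo:

```python
from urllib.error import URLError
from urllib.request import Request, urlopen


def can_reach_headless(endpoint: str = "http://localhost:9222",
                       timeout: float = 2.0) -> bool:
    """Probe a chromedp/headless-shell instance.

    Chromium's DevTools server rejects requests whose Host header is not
    an IP address or localhost, so the header is pinned explicitly.
    """
    req = Request(endpoint, headers={"Host": "localhost"})
    try:
        with urlopen(req, timeout=timeout) as resp:
            return resp.status == 200
    except (URLError, OSError):
        return False
```

Unlike the asset check in the diff, this sketch swallows connection errors and returns False instead of raising, which may or may not be the desired behavior inside a Dagster check.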
@@ -438,7 +441,7 @@ def nabu_prov_release(context):
)


@asset(partitions_def=sources_partitions_def, deps=[nabu_prov_release])
@asset(partitions_def=sources_partitions_def, deps=[gleaner])
Member Author: prov_clear clears the triplestore, whereas prov_release generates an nq file but does not operate on the triplestore. Thus they can run in parallel and are not dependent on each other.

Member Author: The dependency graph looks like this now: [image]
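The reordering described in the comment above can be sanity-checked with a topological sort over the asset names in this diff. A minimal sketch using Python's stdlib graphlib; that nabu_prov_release also depends on gleaner is an assumption here, since its deps are not shown in this hunk:

```python
from graphlib import TopologicalSorter

# Downstream asset -> set of upstream deps, per this PR's changes.
# nabu_prov_release's dependency on gleaner is an assumption.
graph = {
    "nabu_prov_clear": {"gleaner"},
    "nabu_prov_release": {"gleaner"},
    "nabu_prov_object": {"nabu_prov_clear", "nabu_prov_release"},
}

ts = TopologicalSorter(graph)
ts.prepare()
waves = []
while ts.is_active():
    ready = sorted(ts.get_ready())  # everything runnable right now
    waves.append(ready)
    ts.done(*ready)

# The middle wave holds the two assets that can now run in parallel:
# [['gleaner'], ['nabu_prov_clear', 'nabu_prov_release'], ['nabu_prov_object']]
```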

def nabu_prov_clear(context: OpExecutionContext):
"""Clears the prov graph before putting the new nq in"""
source = context.partition_key
@@ -458,7 +461,7 @@ def nabu_prov_clear(context: OpExecutionContext):
)


@asset(partitions_def=sources_partitions_def, deps=[nabu_prov_clear])
@asset(partitions_def=sources_partitions_def, deps=[nabu_prov_clear, nabu_prov_release])
def nabu_prov_object(context):
"""Take the nq file from s3 and use the sparql API to upload it into the prov graph repository"""
source = context.partition_key
@@ -503,7 +506,7 @@ def nabu_orgs_release(context: OpExecutionContext):


@asset(partitions_def=sources_partitions_def, deps=[nabu_orgs_release])
def nabu_orgs(context: OpExecutionContext):
def nabu_orgs_prefix(context: OpExecutionContext):
Member Author: Added the word 'prefix' to make it clear that the nabu prefix subcommand is being run.

"""Move the orgs nq file(s) into the graphdb"""
source = context.partition_key
ARGS = [
@@ -526,7 +529,7 @@ def nabu_orgs(context: OpExecutionContext):

@asset(
partitions_def=sources_partitions_def,
deps=[nabu_orgs, nabu_prune],
deps=[nabu_orgs_prefix, nabu_prune],
)
def finished_individual_crawl(context: OpExecutionContext):
"""Dummy asset signifying the geoconnex crawl is completed once the orgs and prov nq files are in the graphdb and the graph is synced with the s3 bucket"""