From a52f93717dc686bd35fd31690cc5993f9a668294 Mon Sep 17 00:00:00 2001 From: Colton Loftus <70598503+C-Loftus@users.noreply.github.com> Date: Thu, 6 Feb 2025 13:36:59 -0500 Subject: [PATCH 1/2] More speedups for pipeline --- Docker/Docker-compose.yaml | 6 +++++- userCode/main.py | 19 +++++++++++-------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/Docker/Docker-compose.yaml b/Docker/Docker-compose.yaml index 05060b3e..496bc06d 100644 --- a/Docker/Docker-compose.yaml +++ b/Docker/Docker-compose.yaml @@ -15,6 +15,8 @@ services: - "../.env" networks: - dagster_network + profiles: + - localInfra # Creates buckets for MinIO createbuckets: @@ -180,7 +182,7 @@ services: - production # Headless browser for testing - headless: + scheduler_headless: image: chromedp/headless-shell:latest container_name: scheduler_headless ports: @@ -189,6 +191,8 @@ services: - dagster_network environment: - SERVICE_PORTS=9222 + profiles: + - production volumes: scheduler_minio_data: diff --git a/userCode/main.py b/userCode/main.py index 405766ed..30f960ec 100644 --- a/userCode/main.py +++ b/userCode/main.py @@ -195,9 +195,7 @@ def gleaner_config(context: AssetExecutionContext): sources = [] names: set[str] = set() - assert ( - len(Lines) > 0 - ), f"No sitemaps found in sitemap index {REMOTE_GLEANER_SITEMAP}" + assert len(Lines) > 0, f"No sitemaps found in index {REMOTE_GLEANER_SITEMAP}" for line in Lines: basename = REMOTE_GLEANER_SITEMAP.removesuffix(".xml") @@ -314,9 +312,14 @@ def can_contact_headless(): if RUNNING_AS_TEST_OR_DEV(): portNumber = GLEANER_HEADLESS_ENDPOINT.removeprefix("http://").split(":")[1] url = f"http://localhost:{portNumber}" + get_dagster_logger().warning( + f"Skipping headless check in test mode. Check would have pinged {url}" + ) + # Dagster does not support skipping asset checks so must return a valid result + return AssetCheckResult(passed=True) # the Host header needs to be set for Chromium due to an upstream security requirement - result = requests.get(url, timeout=TWO_SECONDS) + result = requests.get(url, headers={"Host": "localhost"}, timeout=TWO_SECONDS) return AssetCheckResult( passed=result.status_code == 200, metadata={ @@ -427,7 +430,7 @@ def nabu_prov_release(context): ) -@asset(partitions_def=sources_partitions_def, deps=[nabu_prov_release]) +@asset(partitions_def=sources_partitions_def, deps=[gleaner]) def nabu_prov_clear(context: OpExecutionContext): """Clears the prov graph before putting the new nq in""" source = context.partition_key @@ -447,7 +450,7 @@ def nabu_prov_clear(context: OpExecutionContext): ) -@asset(partitions_def=sources_partitions_def, deps=[nabu_prov_clear]) +@asset(partitions_def=sources_partitions_def, deps=[nabu_prov_clear, nabu_prov_release]) def nabu_prov_object(context): """Take the nq file from s3 and use the sparql API to upload it into the prov graph repository""" source = context.partition_key @@ -492,7 +495,7 @@ def nabu_orgs_release(context: OpExecutionContext): @asset(partitions_def=sources_partitions_def, deps=[nabu_orgs_release]) -def nabu_orgs(context: OpExecutionContext): +def nabu_orgs_prefix(context: OpExecutionContext): """Move the orgs nq file(s) into the graphdb""" source = context.partition_key ARGS = [ @@ -515,7 +518,7 @@ def nabu_orgs(context: OpExecutionContext): @asset( partitions_def=sources_partitions_def, - deps=[nabu_orgs, nabu_prov_object, nabu_prune], + deps=[nabu_orgs_prefix, nabu_prov_object, nabu_prune], ) def finished_individual_crawl(context: OpExecutionContext): """Dummy asset signifying the geoconnex crawl is completed once the orgs and prov nq files are in the graphdb and the graph is synced with the s3 bucket""" From 1781ee9edc4645da05ff49e20652c6c4c1f78814 Mon Sep 17 00:00:00 2001 From: Colton Loftus <70598503+C-Loftus@users.noreply.github.com> Date: Thu, 6 Feb 2025 13:44:30 -0500 Subject: [PATCH 2/2] get rid of prov dependency on export --- userCode/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/userCode/main.py b/userCode/main.py index e4006cf8..8cd3f627 100644 --- a/userCode/main.py +++ b/userCode/main.py @@ -529,7 +529,7 @@ def nabu_orgs_prefix(context: OpExecutionContext): @asset( partitions_def=sources_partitions_def, - deps=[nabu_orgs_prefix, nabu_prov_object, nabu_prune], + deps=[nabu_orgs_prefix, nabu_prune], ) def finished_individual_crawl(context: OpExecutionContext): """Dummy asset signifying the geoconnex crawl is completed once the orgs and prov nq files are in the graphdb and the graph is synced with the s3 bucket"""