From 1a19861cd84c9e559e8b8b2369b712cf9b2c361e Mon Sep 17 00:00:00 2001 From: Colton Loftus <70598503+C-Loftus@users.noreply.github.com> Date: Tue, 4 Feb 2025 10:55:50 -0500 Subject: [PATCH] add proper tmp paths --- Docker/Docker-compose.yaml | 7 ++++--- main.py | 16 +++++++++------- userCode/lib/env.py | 9 ++++++--- userCode/lib/utils.py | 11 ++++++++++- userCode/main.py | 31 ++++++++++++++++--------------- 5 files changed, 45 insertions(+), 29 deletions(-) diff --git a/Docker/Docker-compose.yaml b/Docker/Docker-compose.yaml index 3050b5f3..a69fa72f 100644 --- a/Docker/Docker-compose.yaml +++ b/Docker/Docker-compose.yaml @@ -15,8 +15,6 @@ services: - "../.env" networks: - dagster_network - profiles: - - localInfra # Creates buckets for MinIO createbuckets: @@ -139,6 +137,7 @@ services: - dagster-daemon - run volumes: + # we mount the docker sock to allow us to spawn nabu / gleaner containers - /var/run/docker.sock:/var/run/docker.sock - /tmp/io_manager_storage:/tmp/io_manager_storage depends_on: @@ -171,8 +170,10 @@ services: - GLEANER_IMAGE=${GLEANER_IMAGE:-internetofwater/gleaner:latest} - DAGSTER_POSTGRES_HOST=dagster_postgres volumes: - - ../userCode:/opt/dagster/app/userCode - /var/run/docker.sock:/var/run/docker.sock + # we mount the temp dir to allow us to access the nabu / gleaner configs from host + # since the docker socket mounts volumes relative to the host + - /tmp/geoconnex/:/tmp/geoconnex/ env_file: - "../.env" profiles: diff --git a/main.py b/main.py index 1f1c0a53..4d26a7dc 100644 --- a/main.py +++ b/main.py @@ -4,15 +4,13 @@ import sys import argparse -BUILD_DIR = os.path.join(os.path.dirname(__file__), "build") -TEMPLATE_DIR = os.path.join(os.path.dirname(__file__), "templates") """ This file is the CLI for managing Docker Compose-based infrastructure. """ -def run_subprocess(command: str, returnStdoutAsValue: bool = False, wait: bool = True): +def run_subprocess(command: str, returnStdoutAsValue: bool = False): """Run a shell command and stream the output in realtime""" process = subprocess.Popen( command, @@ -22,7 +20,10 @@ def run_subprocess(command: str, returnStdoutAsValue: bool = False, wait: bool = ) stdout, stderr = process.communicate() if process.returncode != 0: - print(stderr.decode("utf-8")) + if stderr: + print(stderr.decode("utf-8")) + if stdout: + print(stdout.decode("utf-8")) sys.exit(process.returncode) return stdout.decode("utf-8") if returnStdoutAsValue else None @@ -144,7 +145,8 @@ def main(): if __name__ == "__main__": - assert ( - os.path.dirname(os.path.abspath(__file__)) == os.getcwd() - ), "Please run this script from the root of the repository" + assert os.path.dirname(os.path.abspath(__file__)) == os.getcwd(), ( + "Please run this script from the root of the repository" + ) + os.makedirs("/tmp/geoconnex", exist_ok=True) main() diff --git a/userCode/lib/env.py b/userCode/lib/env.py index 22c7b6ba..94abf1ea 100644 --- a/userCode/lib/env.py +++ b/userCode/lib/env.py @@ -101,6 +101,9 @@ def strict_env(key: str): DAGSTER_YAML_CONFIG: str = os.path.join(userCodeRoot, "dagster.yaml") -assert Path( - DAGSTER_YAML_CONFIG -).exists(), f"the dagster.yaml file does not exist at {DAGSTER_YAML_CONFIG}" +assert Path(DAGSTER_YAML_CONFIG).exists(), ( + f"the dagster.yaml file does not exist at {DAGSTER_YAML_CONFIG}" +) +assert Path("/tmp/geoconnex/").exists(), ( + "the /tmp/geoconnex directory does not exist. This must exist for us to share configs with the docker socket on the host" +) diff --git a/userCode/lib/utils.py b/userCode/lib/utils.py index 6dafe088..8b1e33cd 100644 --- a/userCode/lib/utils.py +++ b/userCode/lib/utils.py @@ -31,7 +31,7 @@ from dagster_docker.utils import validate_docker_image -def remove_non_alphanumeric(string): +def remove_non_alphanumeric(string: str): return re.sub(r"[^a-zA-Z0-9_]+", "", string) @@ -65,6 +65,15 @@ def run_scheduler_docker_image( client = docker.DockerClient() + if volumeMapping: + for volume in volumeMapping: + src = volume.split(":")[0] + assert os.path.exists(src), f"volume {src} does not exist" + if src.endswith("/"): + assert os.path.isdir(src), f"volume {src} is not a directory" + else: + assert os.path.isfile(src), f"volume {src} is not a file" + container = client.containers.run( image_name, name=container_name, diff --git a/userCode/main.py b/userCode/main.py index 50187145..ea3c11a1 100644 --- a/userCode/main.py +++ b/userCode/main.py @@ -75,7 +75,7 @@ def nabu_config(): encoded_as_bytes, "configs/nabuconfig.yaml", ) - with open("/tmp/nabuconfig.yaml", "w") as f: + with open("/tmp/geoconnex/nabuconfig.yaml", "w") as f: f.write(templated_data) @@ -195,9 +195,9 @@ def gleaner_config(context: AssetExecutionContext): sources = [] names: set[str] = set() - assert ( - len(Lines) > 0 - ), f"No sitemaps found in sitemap index {REMOTE_GLEANER_SITEMAP}" + assert len(Lines) > 0, ( + f"No sitemaps found in sitemap index {REMOTE_GLEANER_SITEMAP}" + ) for line in Lines: basename = REMOTE_GLEANER_SITEMAP.removesuffix(".xml") @@ -251,7 +251,7 @@ def gleaner_config(context: AssetExecutionContext): encoded_as_bytes = yaml.dump(templated_base).encode() s3_client = S3() s3_client.load(encoded_as_bytes, "configs/gleanerconfig.yaml") - with open("/tmp/gleanerconfig.yaml", "w") as f: + with open("/tmp/geoconnex/gleanerconfig.yaml", "w") as f: f.write(yaml.dump(templated_base)) @@ -296,7 +296,7 @@ async def main(urls): @asset(deps=[gleaner_config, nabu_config, rclone_config]) def docker_client_environment(): """Set up dagster by pulling both the gleaner and nabu images and moving the config files into docker configs""" - get_dagster_logger().info("Getting docker client and pulling images: ") + get_dagster_logger().info("Initializing docker client and pulling images: ") client = docker.DockerClient() # check if the docker socket is available @@ -337,7 +337,7 @@ def gleaner(context: OpExecutionContext): GLEANER_IMAGE, ARGS, "gleaner", - volumeMapping=["/tmp/gleanerconfig.yaml:/app/gleanerconfig.yaml"], + volumeMapping=["/tmp/geoconnex/gleanerconfig.yaml:/app/gleanerconfig.yaml"], ) get_dagster_logger().info(f"Gleaner returned value: '{returned_value}'") @@ -358,7 +358,7 @@ def nabu_release(context: OpExecutionContext): NABU_IMAGE, ARGS, "release", - volumeMapping=["/tmp/nabuconfig.yaml:/nabuconfig.yaml"], + volumeMapping=["/tmp/geoconnex/nabuconfig.yaml:/app/nabuconfig.yaml"], ) @@ -379,7 +379,7 @@ def nabu_object(context: OpExecutionContext): NABU_IMAGE, ARGS, "object", - volumeMapping=["/tmp/nabuconfig.yaml:/nabuconfig.yaml"], + volumeMapping=["/tmp/geoconnex/nabuconfig.yaml:/app/nabuconfig.yaml"], ) @@ -401,7 +401,7 @@ def nabu_prune(context: OpExecutionContext): NABU_IMAGE, ARGS, "prune", - volumeMapping=["/tmp/nabuconfig.yaml:/nabuconfig.yaml"], + volumeMapping=["/tmp/geoconnex/nabuconfig.yaml:/app/nabuconfig.yaml"], ) @@ -422,7 +422,7 @@ def nabu_prov_release(context): NABU_IMAGE, ARGS, "prov-release", - volumeMapping=["/tmp/nabuconfig.yaml:/nabuconfig.yaml"], + volumeMapping=["/tmp/geoconnex/nabuconfig.yaml:/app/nabuconfig.yaml"], ) @@ -434,6 +434,7 @@ def nabu_prov_clear(context: OpExecutionContext): "--cfg", "nabuconfig.yaml", "clear", + "--dangerous", "--endpoint", GLEANERIO_PROVGRAPH_ENDPOINT, ] @@ -442,7 +443,7 @@ def nabu_prov_clear(context: OpExecutionContext): NABU_IMAGE, ARGS, "prov-clear", - volumeMapping=["/tmp/nabuconfig.yaml:/nabuconfig.yaml"], + volumeMapping=["/tmp/geoconnex/nabuconfig.yaml:/app/nabuconfig.yaml"], ) @@ -463,7 +464,7 @@ def nabu_prov_object(context): NABU_IMAGE, ARGS, "prov-object", - volumeMapping=["/tmp/nabuconfig.yaml:/nabuconfig.yaml"], + volumeMapping=["/tmp/geoconnex/nabuconfig.yaml:/app/nabuconfig.yaml"], ) @@ -486,7 +487,7 @@ def nabu_orgs_release(context: OpExecutionContext): NABU_IMAGE, ARGS, "orgs-release", - volumeMapping=["/tmp/nabuconfig.yaml:/nabuconfig.yaml"], + volumeMapping=["/tmp/geoconnex/nabuconfig.yaml:/app/nabuconfig.yaml"], ) @@ -508,7 +509,7 @@ def nabu_orgs(context: OpExecutionContext): NABU_IMAGE, ARGS, "orgs", - volumeMapping=["/tmp/nabuconfig.yaml:/nabuconfig.yaml"], + volumeMapping=["/tmp/geoconnex/nabuconfig.yaml:/app/nabuconfig.yaml"], )