Skip to content

Commit

Permalink
start rclone
Browse files Browse the repository at this point in the history
  • Loading branch information
C-Loftus committed Dec 16, 2024
1 parent 28a1d31 commit 914f6ad
Show file tree
Hide file tree
Showing 15 changed files with 294 additions and 95 deletions.
5 changes: 5 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,8 @@ POSTGRES_DB=postgres_db
# Integrations / Notifications
# https://docs.dagster.io/_apidocs/libraries/dagster-slack
DAGSTER_SLACK_TOKEN=xoxb-1111111111111-1111111111111-111111111111111111111111

# rclone
RCLONE_ENDPOINT_URL=https://lakefs.example.org/api/v1
RCLONE_ACCESS_KEY_ID=ABCDEFGHIJK123456789
RCLONE_SECRET_ACCESS_KEY=123456789ABCDEF12334566666
10 changes: 7 additions & 3 deletions Docker/Dockerfile_user_code
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
FROM python:3.10-slim

# Base requirements
COPY Docker/user_code_requirements.txt user_code_requirements.txt
RUN pip install -r user_code_requirements.txt

COPY Docker/installRclone.py installRclone.py
RUN python installRclone.py
# install rclone
RUN apt-get -y update; apt-get -y install curl unzip
COPY Docker/rclone.sh rclone.sh
RUN bash rclone.sh

# Check if we are debugging
ARG DAGSTER_DEBUG
# dependencies needed for dagster dev
RUN if [ "$DAGSTER_DEBUG" = "true" ]; then pip install debugpy dagster-webserver; fi
Expand All @@ -21,7 +25,7 @@ COPY templates /opt/dagster/app/templates

# Expose the necessary ports
EXPOSE 4000

# Set the home variable so the code locations are loaded properly
ENV DAGSTER_HOME=/opt/dagster/app

COPY Docker/entrypoint.sh /opt/dagster/app/entrypoint.sh
Expand Down
48 changes: 0 additions & 48 deletions Docker/installRclone.py

This file was deleted.

7 changes: 7 additions & 0 deletions Docker/rclone.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/usr/bin/env bash
# Install the latest rclone release for linux/amd64 into /usr/bin.
# Intended to be run as root during the user-code image build (`RUN bash rclone.sh`).

# Abort on the first failing command, on unset variables, and on failures inside pipelines,
# so a broken download cannot silently produce an image without a working rclone.
set -euo pipefail

# --fail makes curl exit non-zero on HTTP errors instead of saving the error page;
# --location follows redirects from the download host.
curl --fail --location -O https://downloads.rclone.org/rclone-current-linux-amd64.zip
unzip rclone-current-linux-amd64.zip

# The archive extracts to a versioned directory (rclone-vX.Y.Z-linux-amd64), hence the glob.
cd rclone-*-linux-amd64
cp rclone /usr/bin/
chown root:root /usr/bin/rclone
chmod 755 /usr/bin/rclone

# Remove the archive and the extracted directory so they do not bloat the image layer.
cd ..
rm -rf rclone-current-linux-amd64.zip rclone-*-linux-amd64

# Smoke test: fails the build if the installed binary cannot run.
rclone version
1 change: 1 addition & 0 deletions Docker/user_code_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ dagster-docker
# Addon code dependencies
dagster-slack
minio
requests
jinja2 # used for templating common variables into gleaner/nabu configs
lxml # used for parsing sitemaps
pyyaml # used for processing gleaner/nabu configs
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

Tear down the docker swarm stack with `python3 main.py down`

As you develop, run `python3 main.py refresh` to rebuild the user code image and `python3 main.py test` to run tests inside the container.

## Architecture

- an .env file is sourced for all secrets and configuration for dagster
Expand Down
3 changes: 3 additions & 0 deletions dagster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ run_launcher:
- DAGSTER_SLACK_TOKEN
- DAGSTER_HOME
- REMOTE_GLEANER_SITEMAP
- RCLONE_ENDPOINT_URL
- RCLONE_ACCESS_KEY_ID
- RCLONE_SECRET_ACCESS_KEY

network: dagster_network
container_kwargs:
Expand Down
28 changes: 18 additions & 10 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,24 +117,24 @@ def refresh():
)


def test(*args):
    """Run pytest inside the user code container with optional arguments.

    Args:
        *args: Extra command-line arguments forwarded verbatim to pytest
            (for example ``"-k", "special_fn or other_fn"``).

    Raises:
        RuntimeError: If no running user code container can be found.
    """
    # Imported locally so this function does not depend on the module's import block.
    import shlex

    # Get the name of the container
    docker_ps_output = run_subprocess(
        "docker ps --filter name=geoconnex_crawler_dagster_user_code --format '{{.Names}}'",
        returnStdoutInsteadOfPrint=True,
    )
    # Normalise before checking: the output may be None, only a trailing newline,
    # or several names (one per line) when more than one container matches.
    matching_names = (docker_ps_output or "").split()
    if not matching_names:
        raise RuntimeError("Could not find the user code container to run pytest")
    containerName = matching_names[0]

    # Quote every argument so values containing spaces or shell metacharacters
    # (e.g. -k "a or b") reach pytest as a single argument.
    pytest_command = shlex.join(["pytest", "-vvvxs", *args])

    # If we are in CI/CD there is no TTY, so skip the interactive / terminal flags
    if not sys.stdin.isatty():
        run_subprocess(f"docker exec {containerName} {pytest_command}")
    else:
        run_subprocess(f"docker exec -it {containerName} {pytest_command}")


def main():
Expand Down Expand Up @@ -171,7 +171,15 @@ def main():
help="Spin up the docker swarm stack with remote s3 and graphdb",
)

subparsers.add_parser("test", help="Run pytest inside the user code container")
test_parser = subparsers.add_parser(
"test",
help="Run pytest inside the user code container. Pass additional pytest arguments as needed",
)
test_parser.add_argument(
"pytest_args",
nargs="*",
help="Additional arguments to pass to pytest (e.g., -- -k 'special_fn')",
)

subparsers.add_parser(
"login", help="Log into the user code container (interactive shell)"
Expand All @@ -187,7 +195,7 @@ def main():
elif args.command == "refresh":
refresh()
elif args.command == "test":
test()
test(*args.pytest_args)
elif args.command == "login":
login()
else:
Expand Down
16 changes: 16 additions & 0 deletions templates/rclone.conf.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{# rclone remotes, rendered from environment variables. The section names are the remote names used on the rclone command line (e.g. `rclone copy minio:<bucket>/<path> lakefs:geoconnex/main/<path>`). #}
[lakefs]
type = s3
provider = Other
env_auth = false
access_key_id = {{ RCLONE_ACCESS_KEY_ID}}
secret_access_key = {{ RCLONE_SECRET_ACCESS_KEY}}
endpoint = {{ RCLONE_ENDPOINT_URL }}

[minio]
type = s3
provider = Minio
env_auth = false
access_key_id = {{ MINIO_ACCESS_KEY }}
secret_access_key = {{ MINIO_SECRET_KEY }}
endpoint = {{ GLEANERIO_MINIO_ADDRESS }}:{{ GLEANERIO_MINIO_PORT }}

55 changes: 55 additions & 0 deletions userCode/lib/classes.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import io
from pathlib import Path
import subprocess
import sys
from typing import Any
from dagster import get_dagster_logger
from minio import Minio
from urllib3 import BaseHTTPResponse

from .env import (
GLEANER_MINIO_SECRET_KEY,
GLEANER_MINIO_ACCESS_KEY,
GLEANER_MINIO_BUCKET,
GLEANER_MINIO_ADDRESS,
GLEANER_MINIO_PORT,
GLEANER_MINIO_USE_SSL,
RCLONE_ENDPOINT_URL,
)


Expand All @@ -24,6 +29,7 @@ def __init__(self):
)

def load(self, data: Any, remote_path: str):
"""Load arbitrary data into s3 bucket"""
f = io.BytesIO()
length = f.write(data)
f.seek(0) # Reset the stream position to the beginning for reading
Expand All @@ -50,3 +56,52 @@ def read(self, remote_path: str):
response.close()
response.release_conn()
return data


class RClone:
    """Helper around the ``rclone`` command-line tool.

    Creating an instance writes the supplied configuration text to the path that
    rclone reports as its config file, so the ``minio:`` and ``lakefs:`` remotes
    used by :meth:`copy` can be resolved by later ``rclone`` invocations.
    """

    @classmethod
    def get_config_path(cls) -> Path:
        """Return the path of the config file that rclone will use.

        Runs ``rclone config file`` and returns the first output line that looks
        like an absolute path.

        Raises:
            RuntimeError: If the command fails or prints no absolute path.
        """
        result = subprocess.run(
            ["rclone", "config", "file"],
            text=True,  # Ensure output is returned as a string
            stdout=subprocess.PIPE,  # Capture standard output
            stderr=subprocess.PIPE,  # Capture standard error
        )
        if result.returncode == 0:
            # The output contains a human-readable message followed by the path
            for line in result.stdout.splitlines():
                if line.startswith("/"):  # Configuration paths typically start with '/'
                    return Path(line.strip())

        raise RuntimeError("Error finding rclone config file path:", result.stderr)

    def __init__(self, config_data: str):
        """Write ``config_data`` to rclone's config file, replacing any existing content.

        Args:
            config_data: Full text of the rclone configuration to install.
        """
        rclone_conf_location = self.get_config_path()
        # rclone reports the path it *would* use even when neither the file nor its
        # directory exists yet, so make sure the directory is there before writing.
        rclone_conf_location.parent.mkdir(parents=True, exist_ok=True)
        with open(str(rclone_conf_location), "w") as f:
            f.write(config_data)

    def _run_subprocess(self, command: str):
        """Run a shell command and stream the output in realtime.

        Args:
            command: Command line, executed through the shell.

        Returns:
            A ``(returncode, stdout, stderr)`` tuple. ``stdout`` and ``stderr`` are
            always ``None`` because the child writes directly to this process's
            streams instead of to pipes.

        Raises:
            SystemExit: If the command exits with a non-zero status.
        """
        process = subprocess.Popen(
            command,
            shell=True,
            stdout=sys.stdout,
            stderr=sys.stderr,
        )
        stdout, stderr = process.communicate()
        if process.returncode != 0:
            sys.exit(
                f"{command} failed with exit code {process.returncode} {stderr=} {stdout=}"
            )

        return process.returncode, stdout, stderr

    def copy(self, path_to_file: str):
        """Copy ``path_to_file`` from the minio bucket to the lakefs ``geoconnex/main`` branch.

        Args:
            path_to_file: Object path relative to the minio bucket root; the same
                relative path is used as the destination.

        Raises:
            Exception: If rclone reports a non-zero exit code.
        """
        # Imported locally so this class does not depend on the module's import block.
        import shlex

        get_dagster_logger().info(f"Uploading {path_to_file} to {RCLONE_ENDPOINT_URL}")

        # The command runs through a shell, so quote both remote paths; for paths made
        # only of safe characters the quoting is a no-op.
        source = shlex.quote(f"minio:{GLEANER_MINIO_BUCKET}/{path_to_file}")
        destination = shlex.quote(f"lakefs:geoconnex/main/{path_to_file}")

        # NOTE(review): `rclone copy` treats the destination as a directory; if
        # path_to_file names a single object, `copyto` may be what is intended - confirm.
        # _run_subprocess returns a tuple; comparing the whole tuple with 0 (as before)
        # was always unequal and made every successful copy raise.
        returncode, _, _ = self._run_subprocess(f"rclone copy {source} {destination}")

        if returncode != 0:
            raise Exception(f"Error copying {path_to_file} to {RCLONE_ENDPOINT_URL}.")
4 changes: 4 additions & 0 deletions userCode/lib/dagster_env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from dagster import DynamicPartitionsDefinition

# This is the list of sources to crawl that is dynamically generated at runtime by parsing the geoconnex config.
# It is defined in its own module and imported by lib/utils.py, which reads its partition keys
# from the Dagster instance when checking whether every source has been materialized.
sources_partitions_def = DynamicPartitionsDefinition(name="sources_partitions_def")
6 changes: 6 additions & 0 deletions userCode/lib/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ def assert_all_vars():
"GLEANER_HEADLESS_ENDPOINT", # note this is named differently, confusingly so does not have the IO
"GLEANERIO_GRAPH_URL",
"GLEANERIO_GRAPH_NAMESPACE",
"RCLONE_ENDPOINT_URL",
"RCLONE_ACCESS_KEY_ID",
"RCLONE_SECRET_ACCESS_KEY",
]
errors = ""
for var in vars:
Expand Down Expand Up @@ -77,3 +80,6 @@ def strict_get_tag(context: OpExecutionContext, key: str) -> str:
GLEANERIO_DATAGRAPH_ENDPOINT = strict_env("GLEANERIO_DATAGRAPH_ENDPOINT")
GLEANERIO_PROVGRAPH_ENDPOINT = strict_env("GLEANERIO_PROVGRAPH_ENDPOINT")
REMOTE_GLEANER_SITEMAP = strict_env("REMOTE_GLEANER_SITEMAP")
RCLONE_ENDPOINT_URL = strict_env("RCLONE_ENDPOINT_URL")
RCLONE_ACCESS_KEY_ID = strict_env("RCLONE_ACCESS_KEY_ID")
RCLONE_SECRET_ACCESS_KEY = strict_env("RCLONE_SECRET_ACCESS_KEY")
61 changes: 59 additions & 2 deletions userCode/lib/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
import re
import time
from typing import List, Optional, Sequence
from dagster import OpExecutionContext, RunFailureSensorContext, get_dagster_logger
from dagster import (
AssetKey,
OpExecutionContext,
RunFailureSensorContext,
get_dagster_logger,
)
from dagster_docker import DockerRunLauncher
import docker
import docker.errors
Expand All @@ -12,6 +17,7 @@
import docker.models.services
from jinja2 import Environment, FileSystemLoader
import jinja2
from .dagster_env import sources_partitions_def

from .classes import S3
from .env import (
Expand Down Expand Up @@ -188,7 +194,33 @@ def slack_error_fn(context: RunFailureSensorContext) -> str:
return f"Error: {context.failure_event.message}"


def template_config(input_template_file_path: str) -> str:
def template_rclone(input_template_file_path: str) -> str:
    """Render the rclone config template and return the resulting text.

    Every variable listed below is read with ``strict_env`` and passed to the
    template under its own name. Rendering uses ``StrictUndefined``, so a
    template placeholder without a matching variable is an error rather than
    an empty string.
    """
    required_env_vars = (
        "RCLONE_ENDPOINT_URL",
        "RCLONE_ACCESS_KEY_ID",
        "RCLONE_SECRET_ACCESS_KEY",
        "GLEANERIO_MINIO_ADDRESS",
        "GLEANERIO_MINIO_PORT",
        "GLEANERIO_MINIO_USE_SSL",
        "GLEANERIO_MINIO_BUCKET",
        "MINIO_SECRET_KEY",
        "MINIO_ACCESS_KEY",
    )
    render_context = {name: strict_env(name) for name in required_env_vars}

    # Split the path once: the directory feeds the loader, the file name selects the template
    template_dir, template_name = os.path.split(input_template_file_path)
    jinja_env = Environment(
        loader=FileSystemLoader(template_dir),
        undefined=jinja2.StrictUndefined,
    )

    return jinja_env.get_template(template_name).render(**render_context)


def template_gleaner_or_nabu(input_template_file_path: str) -> str:
"""Fill in a template with shared env vars and return the templated data"""
vars_in_both_nabu_and_gleaner_configs = {
var: strict_env(var)
Expand All @@ -215,3 +247,28 @@ def template_config(input_template_file_path: str) -> str:

# Render the template with the context
return template.render(**vars_in_both_nabu_and_gleaner_configs)


def all_dependencies_materialized(
    context: OpExecutionContext, dependency_asset_key: str
) -> bool:
    """Check if all partitions of a given asset are materialized.

    Args:
        context: Execution context whose Dagster instance is queried.
        dependency_asset_key: Key of the asset whose partitions are inspected.

    Returns:
        True when every partition key currently defined in
        ``sources_partitions_def`` has a materialization for the asset,
        False otherwise.
    """
    instance = context.instance
    all_partitions = sources_partitions_def.get_partition_keys(
        dynamic_partitions_store=instance
    )
    materialized_partitions = instance.get_materialized_partitions(
        asset_key=AssetKey(dependency_asset_key)
    )

    # Compare the actual keys rather than just the counts: materializations of
    # partitions that were since removed would otherwise skew the comparison
    # (equal counts with a missing partition, or unequal counts with none missing).
    missing_partitions = set(all_partitions) - set(materialized_partitions)

    if missing_partitions:
        get_dagster_logger().warning(
            f"Not all partitions of {dependency_asset_key} are materialized, so nq generation will be skipped"
        )
        return False

    get_dagster_logger().info(
        f"All partitions of {dependency_asset_key} are detected as having been materialized"
    )
    return True
Loading

0 comments on commit 914f6ad

Please sign in to comment.