Commit 633284d: edit docs

danielgafni committed Oct 4, 2024
1 parent d1dfcd4

Showing 10 changed files with 143 additions and 34 deletions.
Binary file added dagster_pipes.zip
Binary file modified docs/content/api/modules.json.gz
Binary file modified docs/content/api/searchindex.json.gz
Binary file modified docs/content/api/sections.json.gz
Binary file modified docs/next/public/objects.inv
@@ -0,0 +1,27 @@
# this Dockerfile can be used to create a venv archive for PySpark on AWS EMR

FROM amazonlinux:2 AS builder

RUN yum install -y python3

WORKDIR /build

COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv

ENV VIRTUAL_ENV=/build/.venv
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

RUN uv python install --python-preference only-managed 3.9.16 && uv python pin 3.9.16

RUN uv venv .venv

RUN uv pip install pex dagster-pipes boto3 pyspark

RUN pex dagster-pipes boto3 pyspark -o /output/venv.pex && chmod +x /output/venv.pex

# test imports
RUN /output/venv.pex -c "import dagster_pipes, pyspark, boto3;"

FROM scratch AS export

COPY --from=builder /output/venv.pex /venv.pex
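The export stage contains only /venv.pex, so the image has to be built with BuildKit's local exporter to land the archive on disk. A minimal sketch of that build plus a re-run of the Dockerfile's own import test, assuming it is executed next to this Dockerfile (the ./output destination mirrors the packing script at the end of this diff):

import subprocess

# build the export stage and write its contents (venv.pex) to ./output
subprocess.run(
    "DOCKER_BUILDKIT=1 docker build --output type=local,dest=./output -f Dockerfile .",
    shell=True,
    check=True,
)
# re-run the Dockerfile's import smoke test against the exported archive
subprocess.run(
    ["./output/venv.pex", "-c", "import dagster_pipes, pyspark, boto3"],
    check=True,
)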
@@ -2,35 +2,57 @@
 import os

 import boto3
-from dagster_aws.pipes import PipesEMRClient
+from dagster_aws.pipes import PipesEMRClient, PipesS3MessageReader
 from mypy_boto3_emr.type_defs import InstanceFleetTypeDef

 from dagster import AssetExecutionContext, asset


 @asset
-def glue_pipes_asset(context: AssetExecutionContext, pipes_emr_client: PipesEMRClient):
+def emr_pipes_asset(context: AssetExecutionContext, pipes_emr_client: PipesEMRClient):
     return pipes_emr_client.run(
         context=context,
         run_job_flow_params={
-            "Name": "Example Job",
+            "Name": "Dagster Pipes",
             "LogUri": "s3://aws-glue-assets-467123434025-eu-north-1/emr/logs",
             "JobFlowRole": "arn:aws:iam::467123434025:instance-profile/AmazonEMR-InstanceProfile-20241001T134828",
             "ServiceRole": "arn:aws:iam::467123434025:role/service-role/AmazonEMR-ServiceRole-20241001T134845",
             "ReleaseLabel": "emr-7.3.0",
             "Instances": {
                 "MasterInstanceType": "m5.xlarge",
                 "SlaveInstanceType": "m5.xlarge",
                 "InstanceCount": 3,
                 "Ec2KeyName": "YubiKey",
             },
             "Applications": [{"Name": "Hadoop"}, {"Name": "Spark"}],
             "StepConcurrencyLevel": 1,
             "Steps": [
                 {
-                    "Name": "Example Step",
-                    "ActionOnFailure": "CONTINUE",
+                    "Name": "Main",
+                    "ActionOnFailure": "TERMINATE_CLUSTER",
                     "HadoopJarStep": {
                         "Jar": "command-runner.jar",
                         "Args": [
                             "spark-submit",
                             "--deploy-mode",
                             "cluster",
-                            "example.py",
+                            "--master",
+                            "yarn",
+                            "--files",
+                            "s3://aws-glue-assets-467123434025-eu-north-1/envs/emr/pipes/venv.pex",
+                            "--conf",
+                            "spark.pyspark.python=./venv.pex",
+                            "--conf",
+                            "spark.yarn.submit.waitAppCompletion=true",
+                            "s3://aws-glue-assets-467123434025-eu-north-1/envs/emr/pipes/script.py",
                         ],
                     },
                 },
             ],
+            "Tags": [
+                {
+                    "Key": "for-use-with-amazon-emr-managed-policies",
+                    "Value": "true",
+                }
+            ],
         },
@@ -45,8 +67,14 @@ def glue_pipes_asset(context: AssetExecutionContext, pipes_emr_client: PipesEMRC


 defs = Definitions(
-    assets=[glue_pipes_asset],
-    resources={"pipes_emr_client": PipesEMRClient()},
+    assets=[emr_pipes_asset],
+    resources={
+        "pipes_emr_client": PipesEMRClient(
+            message_reader=PipesS3MessageReader(
+                bucket=os.environ["DAGSTER_PIPES_BUCKET"]
+            )
+        )
+    },
 )

 # end_definitions_marker
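To smoke-test these definitions without the UI, Dagster's in-process materialize helper can drive the asset directly. A minimal sketch, assuming AWS credentials are configured, DAGSTER_PIPES_BUCKET names a bucket both sides can reach, and you accept that this launches a real EMR job flow:

import os

from dagster import materialize
from dagster_aws.pipes import PipesEMRClient, PipesS3MessageReader

# emr_pipes_asset is the asset defined in the module above
result = materialize(
    [emr_pipes_asset],
    resources={
        "pipes_emr_client": PipesEMRClient(
            message_reader=PipesS3MessageReader(bucket=os.environ["DAGSTER_PIPES_BUCKET"])
        )
    },
)
assert result.success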

This file was deleted.

@@ -0,0 +1,34 @@
import boto3
from dagster_pipes import PipesS3MessageWriter, open_dagster_pipes
from pyspark.sql import SparkSession


def main():
    with open_dagster_pipes(
        message_writer=PipesS3MessageWriter(client=boto3.client("s3"))
    ) as pipes:
        pipes.log.info("Hello from AWS EMR!")

        spark = SparkSession.builder.appName("HelloWorld").getOrCreate()

        df = spark.createDataFrame(
            [(1, "Alice", 34), (2, "Bob", 45), (3, "Charlie", 56)],
            ["id", "name", "age"],
        )

        # calculate a really important statistic
        avg_age = float(df.agg({"age": "avg"}).collect()[0][0])

        # attach it to the asset materialization in Dagster
        pipes.report_asset_materialization(
            metadata={"average_age": {"raw_value": avg_age, "type": "float"}},
            data_version="alpha",
        )

        spark.stop()

        print("Hello from stdout!")  # noqa: T201


if __name__ == "__main__":
    main()
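On the Dagster side, the average_age reported here surfaces as metadata on the asset materialization event. Continuing the materialize sketch above (asset_materializations_for_node is the in-process result accessor; for the three sample rows the average works out to 45.0):

# `result` comes from the materialize(...) sketch shown earlier
mats = result.asset_materializations_for_node("emr_pipes_asset")
print(mats[0].metadata["average_age"].value)  # 45.0 for the sample rows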
@@ -0,0 +1,46 @@
# this script can be used to pack and upload a python .pex file to an s3 bucket
# requires docker and AWS CLI

import argparse
import os
import subprocess
import sys
import tempfile
from pathlib import Path

SCRIPT_DIR = Path(__file__).parent

REQUIREMENTS_TXT = SCRIPT_DIR / "requirements.txt"
DAGSTER_DIR = Path(*SCRIPT_DIR.parts[: SCRIPT_DIR.parts.index("examples")])

DAGSTER_PIPES_DIR = DAGSTER_DIR / "python_modules/dagster-pipes"

parser = argparse.ArgumentParser(description="Upload a python virtualenv to an s3 path")
parser.add_argument(
    "--python", type=str, help="python version to use", default="3.9.16"
)
parser.add_argument(
    "--s3-dir", type=str, help="s3 directory to copy files into", required=True
)


def main():
    args = parser.parse_args()

    with tempfile.TemporaryDirectory() as temp_dir:
        os.chdir(temp_dir)
        subprocess.run(
            " && \\\n".join(
                [
                    f"DOCKER_BUILDKIT=1 docker build --output type=local,dest=./output -f {SCRIPT_DIR}/Dockerfile .",
                    f"aws s3 cp ./output/venv.pex {os.path.join(args.s3_dir, 'venv.pex')}",
                    f"aws s3 cp {SCRIPT_DIR / 'script.py'} {os.path.join(args.s3_dir, 'script.py')}",
                ]
            ),
            shell=True,
            check=True,
        )


if __name__ == "__main__":
    main()
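Assuming this helper is saved alongside the Dockerfile and script.py above (the filename below is hypothetical), a single invocation packs and uploads both artifacts under one S3 prefix, which must match the URIs referenced by the spark-submit step earlier in this diff:

python pack_and_upload_pex.py --s3-dir s3://aws-glue-assets-467123434025-eu-north-1/envs/emr/pipes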
