-
Notifications
You must be signed in to change notification settings - Fork 27
/
Copy pathspark_simple.py
32 lines (26 loc) · 1017 Bytes
/
spark_simple.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import datetime
import os
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
# GCS prefix for job output, of the form
#   <gcs_bucket>/dataproc_simple/<YYYYmmdd-HHMMSS>/
# Object-store URIs always use '/' as the separator, so the path is assembled
# with '/' explicitly rather than os.path.join/os.sep, which would emit '\'
# when this file is parsed on a non-POSIX host.
# NOTE(review): nothing in this DAG file references output_file (the Spark task
# below takes no output argument); presumably a leftover -- confirm before
# removing or wiring it into the job.
# NOTE(review): Variable.get and datetime.now() run at import time, i.e. on
# every parse of this DAG file, so the timestamp differs between parses.
output_file = '/'.join([
    # Drop trailing slashes so a bucket value like 'gs://b/' does not yield '//'.
    models.Variable.get('gcs_bucket').rstrip('/'),
    'dataproc_simple',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S'),
]) + '/'
# DAG start_date: 00:00:00 at the start of the previous calendar day, as a
# naive datetime in the local time of the process parsing this file.
# NOTE(review): this start_date is dynamic -- it moves forward whenever the file
# is re-parsed. Airflow's best-practice docs recommend a fixed start_date;
# consider pinning one if scheduling behaves unexpectedly.
yesterday = datetime.datetime.combine(
    (datetime.datetime.today() - datetime.timedelta(days=1)).date(),
    datetime.time.min,
)
# default_args applied to every task in the DAG below: start from `yesterday`,
# retry a failed task once after 5 minutes, and send no e-mails.
# NOTE(review): Airflow only forwards a default_arg to an operator whose
# constructor accepts a parameter of that name; confirm that the installed
# Dataproc operator version actually consumes 'project_id'.
args = dict(
    start_date=yesterday,
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=datetime.timedelta(minutes=5),
    # GCP project id, read from an Airflow Variable at parse time.
    project_id=models.Variable.get('gcp_project'),
)
# DAG 'spark_simple', scheduled every day (timedelta(days=1)), with a single
# task that submits a PySpark job to an existing Dataproc cluster. The job's
# main script and a helper module are read from GCS.
with models.DAG(
        'spark_simple',
        schedule_interval=datetime.timedelta(days=1),
        default_args=args) as dag:
    # The operator must be indented under `with` so it is created inside the
    # DAG context (and so the file parses at all).
    run_step = dataproc_operator.DataProcPySparkOperator(
        task_id='run_spark',
        # NOTE(review): hard-coded, console-generated cluster name; the task
        # fails if that cluster is deleted or recreated under another name.
        # Consider reading it from an Airflow Variable like the region.
        cluster_name='cluster-9c11',
        # Region is read from an Airflow Variable at parse time.
        region=models.Variable.get('gcp_region'),
        main='gs://bigdataupv_code/compras_top_ten_countries.py',
        # NOTE(review): `files` copies helpers.py into the job's working
        # directory; if the main script does `import helpers`, the operator's
        # `pyfiles` parameter may be the intended one -- confirm.
        files=['gs://bigdataupv_code/helpers.py'])