From 17809438e819a64b75fe6704adec533ad459dbf7 Mon Sep 17 00:00:00 2001 From: "teocalvo2@gmail.com" Date: Tue, 11 Jul 2023 22:59:50 +0000 Subject: [PATCH 01/12] =?UTF-8?q?Primeira=20vers=C3=A3o=20para=20coleta=20?= =?UTF-8?q?de=20dados=20autom=C3=A1tica?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/01.raw/datasus/ingestao.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 src/01.raw/datasus/ingestao.py diff --git a/src/01.raw/datasus/ingestao.py b/src/01.raw/datasus/ingestao.py new file mode 100644 index 0000000..d27a1fa --- /dev/null +++ b/src/01.raw/datasus/ingestao.py @@ -0,0 +1,34 @@ +# Databricks notebook source +# MAGIC %pip install tqdm + +# COMMAND ---------- + +import urllib.request + +from tqdm import tqdm + +def get_data_uf_ano_mes(uf, ano, mes): + url = f"ftp://ftp.datasus.gov.br/dissemin/publicos/SIHSUS/200801_/Dados/RD{uf}{ano}{mes}.dbc" + + file_path = f"/dbfs/mnt/datalake/datasus/rd/dbc/RD{uf}{ano}{mes}.dbc" + + resp = urllib.request.urlretrieve(url, file_path) + +def get_data_uf(uf, datas): + for i in tqdm(datas): + ano, mes, dia = i.split("-") + ano = ano[-2:] + get_data_uf_ano_mes(uf, ano, mes) + +ufs = ["RO", "AC", "AM", "RR","PA", + "AP", "TO", "MA", "PI", "CE", + "RN", "PB", "PE", "AL", "SE", + "BA", "MG", "ES", "RJ", "SP", + "PR", "SC", "RS", "MS", "MT", + "GO","DF"] + +datas = ['2023-01-01'] + +for uf in ufs: + print(uf) + get_data_uf(uf, datas) From e966f1136204734b6bdf0b3955625e29933dbdde Mon Sep 17 00:00:00 2001 From: "teocalvo2@gmail.com" Date: Tue, 11 Jul 2023 23:16:51 +0000 Subject: [PATCH 02/12] =?UTF-8?q?Vers=C3=A3o=20multithread?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/01.raw/datasus/ingestao.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/01.raw/datasus/ingestao.py b/src/01.raw/datasus/ingestao.py index d27a1fa..60a5ca1 100644 --- 
a/src/01.raw/datasus/ingestao.py +++ b/src/01.raw/datasus/ingestao.py @@ -4,6 +4,7 @@ # COMMAND ---------- import urllib.request +from multiprocessing import Pool from tqdm import tqdm @@ -27,8 +28,11 @@ def get_data_uf(uf, datas): "PR", "SC", "RS", "MS", "MT", "GO","DF"] -datas = ['2023-01-01'] +datas = ['2023-01-01', '2023-02-01'] -for uf in ufs: - print(uf) - get_data_uf(uf, datas) +to_download = [(uf, datas) for uf in ufs] + +# COMMAND ---------- + +with Pool(8) as pool: + pool.starmap(get_data_uf, to_download) From 38edbcab691818d1aa3d61cb04db27518685d0f4 Mon Sep 17 00:00:00 2001 From: "teocalvo2@gmail.com" Date: Wed, 12 Jul 2023 00:33:51 +0000 Subject: [PATCH 03/12] =?UTF-8?q?Tranforma=C3=A7=C3=A3o=20dos=20dados=20co?= =?UTF-8?q?m=20R?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/01.raw/datasus/ingestao.py | 15 +++++++++++---- src/01.raw/datasus/transform.r | 18 ++++++++++++++++++ 2 files changed, 29 insertions(+), 4 deletions(-) create mode 100644 src/01.raw/datasus/transform.r diff --git a/src/01.raw/datasus/ingestao.py b/src/01.raw/datasus/ingestao.py index 60a5ca1..e10ab23 100644 --- a/src/01.raw/datasus/ingestao.py +++ b/src/01.raw/datasus/ingestao.py @@ -5,9 +5,13 @@ import urllib.request from multiprocessing import Pool - from tqdm import tqdm +import sys +sys.path.insert(0,"../../lib/") + +import dttools + def get_data_uf_ano_mes(uf, ano, mes): url = f"ftp://ftp.datasus.gov.br/dissemin/publicos/SIHSUS/200801_/Dados/RD{uf}{ano}{mes}.dbc" @@ -28,11 +32,14 @@ def get_data_uf(uf, datas): "PR", "SC", "RS", "MS", "MT", "GO","DF"] -datas = ['2023-01-01', '2023-02-01'] - -to_download = [(uf, datas) for uf in ufs] # COMMAND ---------- +dt_start = dbutils.widgets.get("dt_start") +dt_stop = dbutils.widgets.get("dt_stop") + +datas = dttools.date_range(dt_start, dt_stop, monthly=True) +to_download = [(uf, datas) for uf in ufs] + with Pool(8) as pool: pool.starmap(get_data_uf, to_download) diff --git 
a/src/01.raw/datasus/transform.r b/src/01.raw/datasus/transform.r new file mode 100644 index 0000000..6a825ac --- /dev/null +++ b/src/01.raw/datasus/transform.r @@ -0,0 +1,18 @@ +# Databricks notebook source +install.packages("read.dbc") + +# COMMAND ---------- + +library(read.dbc) + +dbc_folder <- "/dbfs/mnt/datalake/datasus/rd/dbc" +csv_folder <- "/dbfs/mnt/datalake/datasus/rd/csv" + +files <- list.files(dbc_folder, full.names=TRUE) +for(f in files) { + print(f) + df= read.dbc(f) + lista = strsplit(f, "/")[[1]] + file = gsub(".dbc", ".csv", lista[length(lista)]) + write.csv2(df, paste(csv_folder, file, sep="/"), row.names=FALSE) +} From b379501efe7c61f9692b5e62c5882f86e72403e6 Mon Sep 17 00:00:00 2001 From: "teocalvo2@gmail.com" Date: Wed, 12 Jul 2023 22:19:15 +0000 Subject: [PATCH 04/12] Adicionando pasta de landing no S3 em raw --- src/01.raw/datasus/ingestao.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/01.raw/datasus/ingestao.py b/src/01.raw/datasus/ingestao.py index e10ab23..5dda8ec 100644 --- a/src/01.raw/datasus/ingestao.py +++ b/src/01.raw/datasus/ingestao.py @@ -15,7 +15,7 @@ def get_data_uf_ano_mes(uf, ano, mes): url = f"ftp://ftp.datasus.gov.br/dissemin/publicos/SIHSUS/200801_/Dados/RD{uf}{ano}{mes}.dbc" - file_path = f"/dbfs/mnt/datalake/datasus/rd/dbc/RD{uf}{ano}{mes}.dbc" + file_path = f"/dbfs/mnt/datalake/datasus/rd/dbc/landing/RD{uf}{ano}{mes}.dbc" resp = urllib.request.urlretrieve(url, file_path) @@ -25,6 +25,7 @@ def get_data_uf(uf, datas): ano = ano[-2:] get_data_uf_ano_mes(uf, ano, mes) + ufs = ["RO", "AC", "AM", "RR","PA", "AP", "TO", "MA", "PI", "CE", "RN", "PB", "PE", "AL", "SE", @@ -41,5 +42,5 @@ def get_data_uf(uf, datas): datas = dttools.date_range(dt_start, dt_stop, monthly=True) to_download = [(uf, datas) for uf in ufs] -with Pool(8) as pool: +with Pool(10) as pool: pool.starmap(get_data_uf, to_download) From 63a11259f2a41c3b3539694c8c35fa1bbd893605 Mon Sep 17 00:00:00 2001 From: "teocalvo2@gmail.com" 
Date: Wed, 12 Jul 2023 23:20:21 +0000 Subject: [PATCH 05/12] Move dados processados em R para outro folder --- src/01.raw/datasus/transform.r | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/01.raw/datasus/transform.r b/src/01.raw/datasus/transform.r index 6a825ac..def6dcd 100644 --- a/src/01.raw/datasus/transform.r +++ b/src/01.raw/datasus/transform.r @@ -5,7 +5,7 @@ install.packages("read.dbc") library(read.dbc) -dbc_folder <- "/dbfs/mnt/datalake/datasus/rd/dbc" +dbc_folder <- "/dbfs/mnt/datalake/datasus/rd/dbc/landing" csv_folder <- "/dbfs/mnt/datalake/datasus/rd/csv" files <- list.files(dbc_folder, full.names=TRUE) @@ -15,4 +15,5 @@ for(f in files) { lista = strsplit(f, "/")[[1]] file = gsub(".dbc", ".csv", lista[length(lista)]) write.csv2(df, paste(csv_folder, file, sep="/"), row.names=FALSE) + file.rename(from=f, to=gsub("landing", "proceeded", f)) } From cd2f8a74abfe7d195ed496b221ac25193ad1cf72 Mon Sep 17 00:00:00 2001 From: "teocalvo2@gmail.com" Date: Fri, 14 Jul 2023 00:21:23 +0000 Subject: [PATCH 06/12] Ingestor com spark streaming --- databases/create.sql | 5 ++ databases/grant.sql | 2 +- src/02.bronze/datasus/ingestao.py | 91 +++++++++++++++++++++++++++++++ 3 files changed, 97 insertions(+), 1 deletion(-) create mode 100644 src/02.bronze/datasus/ingestao.py diff --git a/databases/create.sql b/databases/create.sql index 8218b9c..33cf30f 100644 --- a/databases/create.sql +++ b/databases/create.sql @@ -14,3 +14,8 @@ CREATE DATABASE IF NOT EXISTS silver.igdb; -- DBTITLE 1,LinuxTips CREATE DATABASE IF NOT EXISTS bronze.linuxtips; CREATE DATABASE IF NOT EXISTS silver.pizza_query; + +-- COMMAND ---------- + +-- DBTITLE 1,DataSUS +CREATE DATABASE IF NOT EXISTS bronze.datasus; diff --git a/databases/grant.sql b/databases/grant.sql index ecc0205..cb49e27 100644 --- a/databases/grant.sql +++ b/databases/grant.sql @@ -1,5 +1,6 @@ -- Databricks notebook source + -- COMMAND ---------- -- DBTITLE 1,Olist @@ -10,4 +11,3 @@ GRANT READ_METADATA ON 
DATABASE `bronze.olist` TO `twitch`; GRANT USAGE ON DATABASE `silver.olist` TO `twitch`; GRANT SELECT ON DATABASE `silver.olist` TO `twitch`; GRANT READ_METADATA ON DATABASE `silver.olist` TO `twitch`; - diff --git a/src/02.bronze/datasus/ingestao.py b/src/02.bronze/datasus/ingestao.py new file mode 100644 index 0000000..0533c7f --- /dev/null +++ b/src/02.bronze/datasus/ingestao.py @@ -0,0 +1,91 @@ +# Databricks notebook source +import sys + +sys.path.insert(0, '../../lib') + +from ingestors import IngestaoBronze +import dbtools + +# COMMAND ---------- + +table = "rd_sih" +path_full_load=f'/mnt/datalake/datasus/rd/csv' +path_incremental=f'/mnt/datalake/datasus/rd/csv' +file_format='csv' +table_name=table +database_name='bronze.datasus' +id_fields= ["N_AIH", "DT_SAIDA", "IDENT"] +timestamp_field='DT_SAIDA' +partition_fields=["ANO_CMPT","MES_CMPT"] +read_options = {'sep': ';','header': "true"} + +ingestao = IngestaoBronze( + path_full_load=path_full_load, + path_incremental=path_incremental, + file_format=file_format, + table_name=table_name, + database_name=database_name, + id_fields=id_fields, + timestamp_field=timestamp_field, + partition_fields=partition_fields, + read_options=read_options, + spark=spark, +) + +# COMMAND ---------- + +if not dbtools.table_exists(spark, database_name, table): + df_null = spark.createDataFrame(data=[], schema=ingestao.schema) + ingestao.save_full(df_null) + dbutils.fs.rm(ingestao.checkpoint_path, True) + +# COMMAND ---------- + +ingestao.process_stream() + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC +# MAGIC -- 53834305 +# MAGIC -- 53834305 +# MAGIC +# MAGIC SELECT * +# MAGIC FROM bronze.datasus.rd_sih + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC +# MAGIC SELECT count(*), +# MAGIC count(distinct N_AIH, IDENT), +# MAGIC count(distinct N_AIH, DT_SAIDA, IDENT) +# MAGIC FROM bronze.datasus.sih +# MAGIC + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC with tb_group As ( +# MAGIC SELECT +# MAGIC N_AIH, DT_SAIDA, +# MAGIC COUNT(*) 
+# MAGIC FROM bronze.datasus.sih +# MAGIC +# MAGIC GROUP BY N_AIH, DT_SAIDA +# MAGIC +# MAGIC having COUNT(*) > 1 +# MAGIC +# MAGIC order by 3 desc +# MAGIC +# MAGIC ) +# MAGIC +# MAGIC SELECT * FROM bronze.datasus.sih +# MAGIC WHERE N_AIH IN (select N_AIH from tb_group) +# MAGIC and DT_SAIDA in (select DT_SAIDA from tb_group) + +# COMMAND ---------- + +# MAGIC %sql +# MAGIC +# MAGIC SELECT * FROM TB_SUS WHERE N_AIH = 2708103094323 From 343b761496db637a7a86a408c03d307eb80f9dfa Mon Sep 17 00:00:00 2001 From: "teocalvo2@gmail.com" Date: Fri, 14 Jul 2023 22:54:10 +0000 Subject: [PATCH 07/12] =?UTF-8?q?Mudan=C3=A7as=20na=20ingest=C3=A3o=20e=20?= =?UTF-8?q?transforma=C3=A7=C3=A3o?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Delay de data com 1 ano de default - Arquivo csv com data de ingestão no nome --- src/01.raw/datasus/ingestao.py | 6 ++++++ src/01.raw/datasus/transform.r | 5 ++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/01.raw/datasus/ingestao.py b/src/01.raw/datasus/ingestao.py index 5dda8ec..9ed5262 100644 --- a/src/01.raw/datasus/ingestao.py +++ b/src/01.raw/datasus/ingestao.py @@ -7,6 +7,9 @@ from multiprocessing import Pool from tqdm import tqdm +import datetime +from dateutil.relativedelta import relativedelta + import sys sys.path.insert(0,"../../lib/") @@ -38,6 +41,9 @@ def get_data_uf(uf, datas): dt_start = dbutils.widgets.get("dt_start") dt_stop = dbutils.widgets.get("dt_stop") +delay = dbutils.widgets.get("delay") + +dt_start = (datetime.strptime(dt_start, "%Y-%d-01") - relativedelta(months=delay)).strftime("%Y-%d-01") datas = dttools.date_range(dt_start, dt_stop, monthly=True) to_download = [(uf, datas) for uf in ufs] diff --git a/src/01.raw/datasus/transform.r b/src/01.raw/datasus/transform.r index def6dcd..060eb9f 100644 --- a/src/01.raw/datasus/transform.r +++ b/src/01.raw/datasus/transform.r @@ -5,6 +5,8 @@ install.packages("read.dbc") library(read.dbc) +date = 
format(Sys.time(), "%Y%m%d") + dbc_folder <- "/dbfs/mnt/datalake/datasus/rd/dbc/landing" csv_folder <- "/dbfs/mnt/datalake/datasus/rd/csv" @@ -13,7 +15,8 @@ for(f in files) { print(f) df= read.dbc(f) lista = strsplit(f, "/")[[1]] - file = gsub(".dbc", ".csv", lista[length(lista)]) + + file = gsub(".dbc", paste(date, "csv", sep="."), lista[length(lista)]) write.csv2(df, paste(csv_folder, file, sep="/"), row.names=FALSE) file.rename(from=f, to=gsub("landing", "proceeded", f)) } From 45b392651ab15f6d373aa681f75bff326d447041 Mon Sep 17 00:00:00 2001 From: "teocalvo2@gmail.com" Date: Fri, 14 Jul 2023 22:55:43 +0000 Subject: [PATCH 08/12] Fix typo --- src/01.raw/datasus/ingestao.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/01.raw/datasus/ingestao.py b/src/01.raw/datasus/ingestao.py index 9ed5262..4fee481 100644 --- a/src/01.raw/datasus/ingestao.py +++ b/src/01.raw/datasus/ingestao.py @@ -41,9 +41,9 @@ def get_data_uf(uf, datas): dt_start = dbutils.widgets.get("dt_start") dt_stop = dbutils.widgets.get("dt_stop") -delay = dbutils.widgets.get("delay") +delay = int(dbutils.widgets.get("delay")) -dt_start = (datetime.strptime(dt_start, "%Y-%d-01") - relativedelta(months=delay)).strftime("%Y-%d-01") +dt_start = (datetime.datetime.strptime(dt_start, "%Y-%d-01") - relativedelta(months=delay)).strftime("%Y-%d-01") datas = dttools.date_range(dt_start, dt_stop, monthly=True) to_download = [(uf, datas) for uf in ufs] From 9e6dbc43a23b26b380e6f2748833f898a200c85b Mon Sep 17 00:00:00 2001 From: "teocalvo2@gmail.com" Date: Fri, 14 Jul 2023 22:56:51 +0000 Subject: [PATCH 09/12] Fix date --- src/01.raw/datasus/ingestao.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/01.raw/datasus/ingestao.py b/src/01.raw/datasus/ingestao.py index 4fee481..76d6216 100644 --- a/src/01.raw/datasus/ingestao.py +++ b/src/01.raw/datasus/ingestao.py @@ -43,7 +43,7 @@ def get_data_uf(uf, datas): dt_stop = dbutils.widgets.get("dt_stop") delay = 
int(dbutils.widgets.get("delay")) -dt_start = (datetime.datetime.strptime(dt_start, "%Y-%d-01") - relativedelta(months=delay)).strftime("%Y-%d-01") +dt_start = (datetime.datetime.strptime(dt_start, "%Y-%m-01") - relativedelta(months=delay)).strftime("%Y-%m-01") datas = dttools.date_range(dt_start, dt_stop, monthly=True) to_download = [(uf, datas) for uf in ufs] From dc28f82da30072cb06c5c65dbf037861787ae7a5 Mon Sep 17 00:00:00 2001 From: "teocalvo2@gmail.com" Date: Fri, 14 Jul 2023 22:58:25 +0000 Subject: [PATCH 10/12] fix date --- src/01.raw/datasus/ingestao.py | 2 +- src/02.bronze/datasus/ingestao.py | 51 +++---------------------------- 2 files changed, 5 insertions(+), 48 deletions(-) diff --git a/src/01.raw/datasus/ingestao.py b/src/01.raw/datasus/ingestao.py index 76d6216..03a8518 100644 --- a/src/01.raw/datasus/ingestao.py +++ b/src/01.raw/datasus/ingestao.py @@ -43,7 +43,7 @@ def get_data_uf(uf, datas): dt_stop = dbutils.widgets.get("dt_stop") delay = int(dbutils.widgets.get("delay")) -dt_start = (datetime.datetime.strptime(dt_start, "%Y-%m-01") - relativedelta(months=delay)).strftime("%Y-%m-01") +dt_start = (datetime.datetime.strptime(dt_start, "%Y-%m-%d") - relativedelta(months=delay)).strftime("%Y-%m-01") datas = dttools.date_range(dt_start, dt_stop, monthly=True) to_download = [(uf, datas) for uf in ufs] diff --git a/src/02.bronze/datasus/ingestao.py b/src/02.bronze/datasus/ingestao.py index 0533c7f..82704ed 100644 --- a/src/02.bronze/datasus/ingestao.py +++ b/src/02.bronze/datasus/ingestao.py @@ -1,4 +1,5 @@ # Databricks notebook source +# DBTITLE 1,Imports import sys sys.path.insert(0, '../../lib') @@ -8,6 +9,7 @@ # COMMAND ---------- +# DBTITLE 1,Setup table = "rd_sih" path_full_load=f'/mnt/datalake/datasus/rd/csv' path_incremental=f'/mnt/datalake/datasus/rd/csv' @@ -34,6 +36,7 @@ # COMMAND ---------- +# DBTITLE 1,Criação da tabela if not dbtools.table_exists(spark, database_name, table): df_null = spark.createDataFrame(data=[], schema=ingestao.schema) 
ingestao.save_full(df_null) @@ -41,51 +44,5 @@ # COMMAND ---------- +# DBTITLE 1,Ingestão por streaming ingestao.process_stream() - -# COMMAND ---------- - -# MAGIC %sql -# MAGIC -# MAGIC -- 53834305 -# MAGIC -- 53834305 -# MAGIC -# MAGIC SELECT * -# MAGIC FROM bronze.datasus.rd_sih - -# COMMAND ---------- - -# MAGIC %sql -# MAGIC -# MAGIC SELECT count(*), -# MAGIC count(distinct N_AIH, IDENT), -# MAGIC count(distinct N_AIH, DT_SAIDA, IDENT) -# MAGIC FROM bronze.datasus.sih -# MAGIC - -# COMMAND ---------- - -# MAGIC %sql -# MAGIC with tb_group As ( -# MAGIC SELECT -# MAGIC N_AIH, DT_SAIDA, -# MAGIC COUNT(*) -# MAGIC FROM bronze.datasus.sih -# MAGIC -# MAGIC GROUP BY N_AIH, DT_SAIDA -# MAGIC -# MAGIC having COUNT(*) > 1 -# MAGIC -# MAGIC order by 3 desc -# MAGIC -# MAGIC ) -# MAGIC -# MAGIC SELECT * FROM bronze.datasus.sih -# MAGIC WHERE N_AIH IN (select N_AIH from tb_group) -# MAGIC and DT_SAIDA in (select DT_SAIDA from tb_group) - -# COMMAND ---------- - -# MAGIC %sql -# MAGIC -# MAGIC SELECT * FROM TB_SUS WHERE N_AIH = 2708103094323 From 526221a5a103d8a68ae7836200751b69854f1b56 Mon Sep 17 00:00:00 2001 From: "teocalvo2@gmail.com" Date: Fri, 14 Jul 2023 23:05:33 +0000 Subject: [PATCH 11/12] try --- src/01.raw/datasus/ingestao.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/01.raw/datasus/ingestao.py b/src/01.raw/datasus/ingestao.py index 03a8518..79aaab3 100644 --- a/src/01.raw/datasus/ingestao.py +++ b/src/01.raw/datasus/ingestao.py @@ -19,8 +19,10 @@ def get_data_uf_ano_mes(uf, ano, mes): url = f"ftp://ftp.datasus.gov.br/dissemin/publicos/SIHSUS/200801_/Dados/RD{uf}{ano}{mes}.dbc" file_path = f"/dbfs/mnt/datalake/datasus/rd/dbc/landing/RD{uf}{ano}{mes}.dbc" - - resp = urllib.request.urlretrieve(url, file_path) + try: + resp = urllib.request.urlretrieve(url, file_path) + except: + print("Não foi possível coletar o arquivo.") def get_data_uf(uf, datas): for i in tqdm(datas): From 8c83ee486789b40b00460abee9a7f2ab8be08e43 Mon Sep 17 
00:00:00 2001 From: "teocalvo2@gmail.com" Date: Fri, 14 Jul 2023 23:51:37 +0000 Subject: [PATCH 12/12] Task com setup --- src/04.workflows/data4u_datasus.json | 79 ++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 src/04.workflows/data4u_datasus.json diff --git a/src/04.workflows/data4u_datasus.json b/src/04.workflows/data4u_datasus.json new file mode 100644 index 0000000..78b095a --- /dev/null +++ b/src/04.workflows/data4u_datasus.json @@ -0,0 +1,79 @@ +{ + "settings": { + "name": "data4u_datasus", + "email_notifications": { + "no_alert_for_skipped_runs": false + }, + "webhook_notifications": {}, + "timeout_seconds": 0, + "max_concurrent_runs": 1, + "tasks": [ + { + "task_key": "raw_ingestion", + "notebook_task": { + "notebook_path": "src/01.raw/datasus/ingestao", + "base_parameters": { + "dt_start": "{{start_date}}", + "dt_stop": "{{start_date}}", + "delay": "13" + }, + "source": "GIT" + }, + "existing_cluster_id": "0523-124114-7ef5b8u0", + "timeout_seconds": 0, + "email_notifications": {}, + "notification_settings": { + "no_alert_for_skipped_runs": false, + "no_alert_for_canceled_runs": false, + "alert_on_last_attempt": false + } + }, + { + "task_key": "raw_transform", + "depends_on": [ + { + "task_key": "raw_ingestion" + } + ], + "notebook_task": { + "notebook_path": "src/01.raw/datasus/transform", + "source": "GIT" + }, + "existing_cluster_id": "0523-124114-7ef5b8u0", + "timeout_seconds": 0, + "email_notifications": {}, + "notification_settings": { + "no_alert_for_skipped_runs": false, + "no_alert_for_canceled_runs": false, + "alert_on_last_attempt": false + } + }, + { + "task_key": "bronze_ingestion", + "depends_on": [ + { + "task_key": "raw_transform" + } + ], + "notebook_task": { + "notebook_path": "src/02.bronze/datasus/ingestao", + "source": "GIT" + }, + "existing_cluster_id": "0523-124114-7ef5b8u0", + "timeout_seconds": 0, + "email_notifications": {}, + "notification_settings": { + "no_alert_for_skipped_runs": false, + 
"no_alert_for_canceled_runs": false, + "alert_on_last_attempt": false + } + } + ], + "git_source": { + "git_url": "https://github.com/TeoMeWhy/data-4u", + "git_provider": "gitHub", + "git_branch": "feat/raw_datasus" + }, + "format": "MULTI_TASK" + } +} \ No newline at end of file