From 821e789e97598588bc7bf91cff52001e63267b03 Mon Sep 17 00:00:00 2001
From: "teocalvo2@gmail.com"
Date: Wed, 9 Aug 2023 15:40:15 +0000
Subject: [PATCH 1/8] [ibge] Brazilian municipalities data

---
 databases/create.sql                      | 30 ++++++++++++++++++------------
 src/02.bronze/ibge/ingestao_municipios.py | 10 ++++++++++
 2 files changed, 28 insertions(+), 12 deletions(-)
 create mode 100644 src/02.bronze/ibge/ingestao_municipios.py

diff --git a/databases/create.sql b/databases/create.sql
index e21e63a..74e3add 100644
--- a/databases/create.sql
+++ b/databases/create.sql
@@ -1,7 +1,19 @@
 -- Databricks notebook source
--- DBTITLE 1,Olist
-CREATE DATABASE IF NOT EXISTS bronze.olist;
-CREATE DATABASE IF NOT EXISTS silver.olist;
+-- DBTITLE 1,DataSUS
+CREATE DATABASE IF NOT EXISTS bronze.datasus;
+CREATE DATABASE IF NOT EXISTS silver.datasus;
+
+-- COMMAND ----------
+
+-- DBTITLE 1,Dota
+CREATE DATABASE IF NOT EXISTS bronze.dota;
+CREATE DATABASE IF NOT EXISTS silver.dota;
+
+-- COMMAND ----------
+
+-- DBTITLE 1,IBGE
+CREATE DATABASE IF NOT EXISTS bronze.ibge;
+CREATE DATABASE IF NOT EXISTS silver.ibge;
 
 -- COMMAND ----------
 
@@ -17,12 +29,6 @@ CREATE DATABASE IF NOT EXISTS silver.pizza_query;
 
 -- COMMAND ----------
 
--- DBTITLE 1,DataSUS
-CREATE DATABASE IF NOT EXISTS bronze.datasus;
-CREATE DATABASE IF NOT EXISTS silver.datasus;
-
--- COMMAND ----------
-
--- DBTITLE 1,Dota
-CREATE DATABASE IF NOT EXISTS bronze.dota;
-CREATE DATABASE IF NOT EXISTS silver.dota;
+-- DBTITLE 1,Olist
+CREATE DATABASE IF NOT EXISTS bronze.olist;
+CREATE DATABASE IF NOT EXISTS silver.olist;
diff --git a/src/02.bronze/ibge/ingestao_municipios.py b/src/02.bronze/ibge/ingestao_municipios.py
new file mode 100644
index 0000000..626b3a9
--- /dev/null
+++ b/src/02.bronze/ibge/ingestao_municipios.py
@@ -0,0 +1,10 @@
+# Databricks notebook source
+path = "/mnt/datalake/ibge/municipios_brasileiros.csv"
+
+df = (spark.read
+      .csv(path, header=True, sep=";"))
+
+(df.write
+   .format("delta")
+   .mode("overwrite")
+   .saveAsTable("bronze.ibge.municipios_brasileiros"))

From bfc5f8101de851c0157f8fae4e8ffce308263915 Mon Sep 17 00:00:00 2001
From: "teocalvo2@gmail.com"
Date: Wed, 9 Aug 2023 15:42:21 +0000
Subject: [PATCH 2/8] Brazilian municipalities in silver

---
 src/03.silver/ibge/municipios_brasileiros.sql | 9 +++++++++
 1 file changed, 9 insertions(+)
 create mode 100644 src/03.silver/ibge/municipios_brasileiros.sql

diff --git a/src/03.silver/ibge/municipios_brasileiros.sql b/src/03.silver/ibge/municipios_brasileiros.sql
new file mode 100644
index 0000000..c79a328
--- /dev/null
+++ b/src/03.silver/ibge/municipios_brasileiros.sql
@@ -0,0 +1,9 @@
+-- Databricks notebook source
+DROP TABLE IF EXISTS silver.ibge.municipios_brasileiros;
+
+CREATE TABLE IF NOT EXISTS silver.ibge.municipios_brasileiros AS (
+
+  SELECT *
+  FROM bronze.ibge.municipios_brasileiros
+
+);

From 0ce94cda2de39ce69a17597dc86fa3cf7627866f Mon Sep 17 00:00:00 2001
From: "teocalvo2@gmail.com"
Date: Wed, 9 Aug 2023 20:49:58 +0000
Subject: [PATCH 3/8] Add CID description to primary and secondary diagnoses
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 src/01.raw/datasus/ingestion_cid.py   | 20 ++++++++++
 src/02.bronze/datasus/ingestao_cid.py | 73 ++++++++++++++++++++++++++++++++++
 src/03.silver/datasus/cid.sql         |  7 ++++
 src/03.silver/datasus/rd_sih.sql      | 26 +++++++++++++++++++++++++-
 4 files changed, 125 insertions(+), 1 deletion(-)
 create mode 100644 src/01.raw/datasus/ingestion_cid.py
 create mode 100644 src/02.bronze/datasus/ingestao_cid.py
 create mode 100644 src/03.silver/datasus/cid.sql

diff --git a/src/01.raw/datasus/ingestion_cid.py b/src/01.raw/datasus/ingestion_cid.py
new file mode 100644
index 0000000..d551117
--- /dev/null
+++ b/src/01.raw/datasus/ingestion_cid.py
@@ -0,0 +1,20 @@
+# Databricks notebook source
+# MAGIC %pip install lxml
+
+# COMMAND ----------
+
+import pandas as pd
+
+url = 'http://tabnet.datasus.gov.br/cgi/sih/mxcid10lm.htm'
+
+dfs = pd.read_html(url)
+df = dfs[2]
+df.columns = df.columns.levels[1]
+
+# COMMAND ----------
+
+df.to_csv("/dbfs/mnt/datalake/datasus/cid/volume1.csv", index=False, sep=";")
+
+# COMMAND ----------
+
+df
diff --git a/src/02.bronze/datasus/ingestao_cid.py b/src/02.bronze/datasus/ingestao_cid.py
new file mode 100644
index 0000000..98a1b3e
--- /dev/null
+++ b/src/02.bronze/datasus/ingestao_cid.py
@@ -0,0 +1,73 @@
+# Databricks notebook source
+import pandas as pd
+import string
+
+def range_letter(x, stop_number=None):
+    letter = x[0]
+    start_number = int(x.split("-")[0][1:])
+    stop_number = int(x.split("-")[1][1:]) if stop_number is None else stop_number
+
+    values = [f'{letter}{i:02d}' for i in range(start_number, stop_number + 1)]
+    return values
+
+
+def make_range(x):
+
+    x = x.strip(" ")
+
+    try:
+        start, stop = x.split("-")
+    except ValueError:
+        return x
+
+    letter_start = start[0]
+    letter_start_pos = string.ascii_uppercase.find(letter_start)
+    number_start = int(float(start[1:]))
+
+    letter_stop = stop[0]
+    letter_stop_pos = string.ascii_uppercase.find(letter_stop)
+    number_stop = int(float(stop[1:]))
+
+    values = []
+    letter_pos = letter_start_pos
+    while letter_pos < letter_stop_pos:
+
+        letter = string.ascii_uppercase[letter_pos]
+
+        values.extend(range_letter(f'{letter}{number_start:02d}-{letter}99'))
+
+        letter_pos += 1
+        number_start = 1
+
+    values.extend(range_letter(f'{letter_stop}{number_start}-{letter_stop}{number_stop}'))
+
+    return values
+
+# COMMAND ----------
+
+df = pd.read_csv("/dbfs/mnt/datalake/datasus/cid/volume1.csv", sep=";")
+
+# COMMAND ----------
+
+df['DescricaoLista'] = df['Descrição'].fillna("").apply(lambda x: x.split(","))
+
+df_explode = df.explode("DescricaoLista").iloc[:-1]
+df_explode = df_explode[df_explode["Código"] != '-']
+
+df_explode["DescricaoListaRange"] = df_explode["DescricaoLista"].apply(make_range)
+df_completa = df_explode.explode("DescricaoListaRange")
+
+columns = {
+    "Capítulo": "descCapituloCID",
+    "Código": "codCID",
+    "Códigos da CID-10": "descCID",
+    "Descrição": "codCID10",
+    "DescricaoListaRange": "codCID10dst",
+}
+
+df_completa = df_completa.rename(columns=columns)[columns.values()]
+sdf = spark.createDataFrame(df_completa)
+
+# COMMAND ----------
+
+sdf.write.format("delta").mode("overwrite").saveAsTable("bronze.datasus.cid")
diff --git a/src/03.silver/datasus/cid.sql b/src/03.silver/datasus/cid.sql
new file mode 100644
index 0000000..e60c758
--- /dev/null
+++ b/src/03.silver/datasus/cid.sql
@@ -0,0 +1,7 @@
+-- Databricks notebook source
+DROP TABLE IF EXISTS silver.datasus.cid;
+
+CREATE TABLE IF NOT EXISTS silver.datasus.cid AS (
+    SELECT *
+    FROM bronze.datasus.cid
+);
diff --git a/src/03.silver/datasus/rd_sih.sql b/src/03.silver/datasus/rd_sih.sql
index b1e1786..8a9f2c1 100644
--- a/src/03.silver/datasus/rd_sih.sql
+++ b/src/03.silver/datasus/rd_sih.sql
@@ -3,8 +3,22 @@ DROP TABLE IF EXISTS silver.datasus.rd_sih;
 
 CREATE TABLE IF NOT EXISTS silver.datasus.rd_sih AS
 
+WITH tb_uf AS (
+    select distinct codUF, descUF
+    from silver.ibge.municipios_brasileiros
+),
+
+tb_cid AS (
+    select *
+    from silver.datasus.cid
+    where codCID not like '%-%'
+    and codCID10 not like '%.%'
+    qualify row_number() over (partition by codCID10dst order by codCID desc) = 1
+)
+
 SELECT
-  t1.UF_ZI,
+  -- t1.UF_ZI,
+  t32.descUF,
   t1.ANO_CMPT,
   t1.MES_CMPT,
   t4.descEspecialidade,
@@ -44,7 +58,9 @@ SELECT
   TO_DATE(t1.DT_INTER, 'yyyymmdd') AS DtInternacao,
   TO_DATE(t1.DT_SAIDA, 'yyyymmdd') AS DtSaida,
   t1.DIAG_PRINC,
+  t33.descCID AS descDiagnosticoCIDPrinc,
   t1.DIAG_SECUN,
+  t34.descCID AS descDiagnosticoCIDSec,
   t17.`descTipoCobrança` AS descTipoCobranca,
   t10.descNaturezaHospitalSUS,
   t11.descNaturezaJuridica,
@@ -208,4 +224,12 @@ ON t1.DIAGSEC8 = t30.codTpDiagSecundario
 
 LEFT JOIN bronze.datasus.tp_diagsecundario as t31
 ON t1.DIAGSEC9 = t31.codTpDiagSecundario
+
+LEFT JOIN tb_uf AS t32
+ON substring(t1.UF_ZI,0,2) = t32.codUF
+
+LEFT JOIN tb_cid AS t33
+ON substring(t1.DIAG_PRINC,0,3) = t33.codCID10dst
+
+LEFT JOIN tb_cid AS t34
+ON substring(t1.DIAG_SECUN,0,3) = t34.codCID10dst
 ;

From d9a7c69eff6580500e5e0bfdf51025d4f0b072d9 Mon Sep 17 00:00:00 2001
From: TeoCalvo
Date: Tue, 15 Aug 2023 13:21:12 +0000
Subject: [PATCH 4/8] Add sinasc

---
 src/01.raw/datasus/datasources.json | 12 ++++++
 src/01.raw/datasus/ingestao.py      | 82 ++++++++++++++++++++++++++---------
 src/lib/dttools.py                  |  7 +++-
 3 files changed, 80 insertions(+), 21 deletions(-)
 create mode 100644 src/01.raw/datasus/datasources.json

diff --git a/src/01.raw/datasus/datasources.json b/src/01.raw/datasus/datasources.json
new file mode 100644
index 0000000..f38ccb4
--- /dev/null
+++ b/src/01.raw/datasus/datasources.json
@@ -0,0 +1,12 @@
+{
+    "sihsus": {
+        "origin": "ftp://ftp.datasus.gov.br/dissemin/publicos/SIHSUS/200801_/Dados/RD{uf}{ano}{mes}.dbc",
+        "target": "/dbfs/mnt/datalake/datasus/rd/dbc/landing/RD{uf}{ano}{mes}.dbc",
+        "period": "monthly"
+    },
+    "sinasc": {
+        "origin": "ftp://ftp.datasus.gov.br/dissemin/publicos/SINASC/1996_/Dados/DNRES/DN{uf}{ano}.dbc",
+        "target": "/dbfs/mnt/datalake/datasus/sinasc/dbc/landing/DN{uf}{ano}.dbc",
+        "period": "yearly"
+    }
+}
\ No newline at end of file
diff --git a/src/01.raw/datasus/ingestao.py b/src/01.raw/datasus/ingestao.py
index 79aaab3..a346f54 100644
--- a/src/01.raw/datasus/ingestao.py
+++ b/src/01.raw/datasus/ingestao.py
@@ -6,6 +6,7 @@ import urllib.request
 from multiprocessing import Pool
 from tqdm import tqdm
 
+import json
 import datetime
 from dateutil.relativedelta import relativedelta
 
@@ -15,20 +16,47 @@ import dttools
 
-def get_data_uf_ano_mes(uf, ano, mes):
-    url = f"ftp://ftp.datasus.gov.br/dissemin/publicos/SIHSUS/200801_/Dados/RD{uf}{ano}{mes}.dbc"
-
-    file_path = f"/dbfs/mnt/datalake/datasus/rd/dbc/landing/RD{uf}{ano}{mes}.dbc"
-    try:
-        resp = urllib.request.urlretrieve(url, file_path)
-    except:
-        print("Não foi possível coletar o arquivo.")
-
-def get_data_uf(uf, datas):
-    for i in tqdm(datas):
-        ano, mes, dia = i.split("-")
-        ano = ano[-2:]
-        get_data_uf_ano_mes(uf, ano, mes)
+class IngestionRawSUS:
+
+    def __init__(self, ufs, date_range, source, n_jobs=2):
+        self.ufs = ufs
+        self.n_jobs = n_jobs
+
+        with open('datasources.json', 'r') as open_file:
+            datasources = json.load(open_file)
+
+        self.origin = datasources[source]['origin']
+        self.target = datasources[source]['target']
+        self.period = datasources[source]['period']
+
+        self.dates = []
+        self.set_dates(date_range)
+
+
+    def set_dates(self, date_range):
+        self.dates = dttools.date_range(date_range[0], date_range[-1], period=self.period)
+
+
+    def get_data_uf_ano_mes(self, uf, ano, mes):
+
+        url = self.origin.format(uf=uf, ano=ano, mes=mes)
+        file_path = self.target.format(uf=uf, ano=ano, mes=mes)
+
+        try:
+            resp = urllib.request.urlretrieve(url, file_path)
+        except Exception:
+            print(f"Não foi possível coletar o arquivo. {uf} | {ano}-{mes}-01")
+
+    def get_data_uf(self, uf):
+        for i in tqdm(self.dates):
+            ano, mes, dia = i.split("-")
+            ano = ano[-2:]
+            self.get_data_uf_ano_mes(uf, ano, mes)
+
+
+    def auto_execute(self):
+        with Pool(self.n_jobs) as pool:
+            pool.map(self.get_data_uf, self.ufs)
 
 
 ufs = ["RO", "AC", "AM", "RR","PA",
        "AP", "TO", "MA", "PI", "CE",
        "RN", "PB", "PE", "AL", "SE",
        "BA", "MG", "ES", "RJ", "SP",
        "PR", "SC", "RS", "MS", "MT",
        "GO","DF"]
 
 
 # COMMAND ----------
 
-dt_start = dbutils.widgets.get("dt_start")
-dt_stop = dbutils.widgets.get("dt_stop")
-delay = int(dbutils.widgets.get("delay"))
+# datasource = dbutils.widgets.get("datasource")
+datasource = 'sihsus'
+
+# dt_start = dbutils.widgets.get("dt_start")
+dt_start = '2023-08-01'
+
+# dt_stop = dbutils.widgets.get("dt_stop")
+dt_stop = '2023-08-01'
+
+# delay = int(dbutils.widgets.get("delay"))
+delay = 3
+
 
 dt_start = (datetime.datetime.strptime(dt_start, "%Y-%m-%d") - relativedelta(months=delay)).strftime("%Y-%m-01")
 
-datas = dttools.date_range(dt_start, dt_stop, monthly=True)
-to_download = [(uf, datas) for uf in ufs]
+ing = IngestionRawSUS(ufs=ufs,
+                      date_range=[dt_start, dt_stop],
+                      source=datasource,
+                      n_jobs=10)
+
+# COMMAND ----------
+
+ing.auto_execute()
+
+# COMMAND ----------
+
 
-with Pool(10) as pool:
-    pool.starmap(get_data_uf, to_download)
diff --git a/src/lib/dttools.py b/src/lib/dttools.py
index 3eadeb6..f5b61df 100644
--- a/src/lib/dttools.py
+++ b/src/lib/dttools.py
@@ -1,6 +1,6 @@
 import datetime
 
-def date_range(start, stop, monthly=False):
+def date_range(start, stop, period='monthly'):
     dt_start = datetime.datetime.strptime(start, '%Y-%m-%d')
     dt_stop = datetime.datetime.strptime(stop, '%Y-%m-%d')
     dates = []
@@ -9,7 +9,10 @@ def date_range(start, stop, monthly=False):
         dates.append(dt_start.strftime("%Y-%m-%d"))
         dt_start += datetime.timedelta(days=1)
 
-    if monthly:
+    if period == 'monthly':
         return [i for i in dates if i.endswith("01")]
+    elif period == 'yearly':
+        return [i for i in dates if i.endswith("01") and i.split('-')[-2] == '01']
 
     return dates
\ No newline at end of file

From b8ed93c98ed70792f4c5701d435f53c31527361f Mon Sep 17 00:00:00 2001
From: Téo Calvo
Date: Tue, 15 Aug 2023 11:06:46 -0300
Subject: [PATCH 5/8] Fix date_range

---
 src/03.silver/igdb/feature_store/fs_ingestao.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/03.silver/igdb/feature_store/fs_ingestao.py b/src/03.silver/igdb/feature_store/fs_ingestao.py
index 2f63e73..c5f9db8 100644
--- a/src/03.silver/igdb/feature_store/fs_ingestao.py
+++ b/src/03.silver/igdb/feature_store/fs_ingestao.py
@@ -17,7 +17,7 @@
 
 date_start = dbutils.widgets.get('date_start') # parametro
 date_stop = dbutils.widgets.get('date_stop') # parametro
-monthly = dbutils.widgets.get('monthly') == 'True' # parametro
-dates = dttools.date_range(date_start, date_stop, monthly=monthly)
+period = 'monthly' if dbutils.widgets.get('monthly') == 'True' else 'daily' # parametro
+dates = dttools.date_range(date_start, date_stop, period=period)
 
 query = dbtools.import_query(f'etl/{table}.sql')

From 79e3f8067a4b7415eb6dfbe22c02dcf8353cd0a8 Mon Sep 17 00:00:00 2001
From: TeoCalvo
Date: Tue, 15 Aug 2023 18:23:27 +0000
Subject: [PATCH 6/8] Auto execute with sinasc

---
 src/01.raw/datasus/ingestao.py | 40 +++++++++++++++++-----------------------
 1 file changed, 17 insertions(+), 23 deletions(-)

diff --git a/src/01.raw/datasus/ingestao.py b/src/01.raw/datasus/ingestao.py
index a346f54..c0462d6 100644
--- a/src/01.raw/datasus/ingestao.py
+++ b/src/01.raw/datasus/ingestao.py
@@ -32,7 +32,6 @@ def __init__(self, ufs, date_range, source, n_jobs=2):
 
         self.dates = []
         self.set_dates(date_range)
 
-
     def set_dates(self, date_range):
         self.dates = dttools.date_range(date_range[0], date_range[-1], period=self.period)
 
@@ -44,13 +43,17 @@ def get_data_uf_ano_mes(self, uf, ano, mes):
         url = self.origin.format(uf=uf, ano=ano, mes=mes)
         file_path = self.target.format(uf=uf, ano=ano, mes=mes)
 
         try:
             resp = urllib.request.urlretrieve(url, file_path)
+
         except Exception:
             print(f"Não foi possível coletar o arquivo. {uf} | {ano}-{mes}-01")
 
     def get_data_uf(self, uf):
         for i in tqdm(self.dates):
             ano, mes, dia = i.split("-")
-            ano = ano[-2:]
+
+            if self.period == 'monthly':
+                ano = ano[-2:]
+
             self.get_data_uf_ano_mes(uf, ano, mes)
 
@@ -59,30 +62,25 @@ def auto_execute(self):
         with Pool(self.n_jobs) as pool:
             pool.map(self.get_data_uf, self.ufs)
 
 
-ufs = ["RO", "AC", "AM", "RR","PA",
-       "AP", "TO", "MA", "PI", "CE",
-       "RN", "PB", "PE", "AL", "SE",
-       "BA", "MG", "ES", "RJ", "SP",
-       "PR", "SC", "RS", "MS", "MT",
-       "GO","DF"]
-
-
 # COMMAND ----------
 
-# datasource = dbutils.widgets.get("datasource")
-datasource = 'sihsus'
+datasource = dbutils.widgets.get("datasource")
+dt_start = dbutils.widgets.get("dt_start")
+dt_stop = dbutils.widgets.get("dt_stop")
+delay = int(dbutils.widgets.get("delay"))
 
-# dt_start = dbutils.widgets.get("dt_start")
-dt_start = '2023-08-01'
+dt_start = (datetime.datetime.strptime(dt_start, "%Y-%m-%d") - relativedelta(months=delay)).strftime("%Y-%m-01")
 
-# dt_stop = dbutils.widgets.get("dt_stop")
-dt_stop = '2023-08-01'
 
-# delay = int(dbutils.widgets.get("delay"))
-delay = 3
+ufs = ["RO", "AC", "AM", "RR","PA",
+       "AP", "TO", "MA", "PI", "CE",
+       "RN", "PB", "PE", "AL", "SE",
+       "BA", "MG", "ES", "RJ", "SP",
+       "PR", "SC", "RS", "MS", "MT",
+       "GO", "DF"]
+ufs.sort(reverse=True)
 
-dt_start = (datetime.datetime.strptime(dt_start, "%Y-%m-%d") - relativedelta(months=delay)).strftime("%Y-%m-01")
 
 ing = IngestionRawSUS(ufs=ufs,
                       date_range=[dt_start, dt_stop],
@@ -92,7 +90,3 @@ ing = IngestionRawSUS(ufs=ufs,
 # COMMAND ----------
 
 ing.auto_execute()
-
-# COMMAND ----------
-
-

From 1482fbb4a05b8bcedec227a78323f934f234014e Mon Sep 17 00:00:00 2001
From: TeoCalvo
Date: Tue, 15 Aug 2023 19:00:02 +0000
Subject: [PATCH 7/8] Add parameter to jobs

---
 src/01.raw/datasus/transform.r       | 26 +++++++++++++++++---------
 src/06.workflows/data4u_datasus.json | 54 +++++++++++++++++++++++++++++++---
 2 files changed, 68 insertions(+), 12 deletions(-)

diff --git a/src/01.raw/datasus/transform.r b/src/01.raw/datasus/transform.r
index 8be6cf0..337cd63 100644
--- a/src/01.raw/datasus/transform.r
+++ b/src/01.raw/datasus/transform.r
@@ -1,17 +1,25 @@
 # Databricks notebook source
 install.packages("read.dbc")
 install.packages("doParallel")
+install.packages("jsonlite")
 
 # COMMAND ----------
 
 library(read.dbc)
 library(foreach)
 library(doParallel)
+library(jsonlite)
 
 date = format(Sys.time(), "%Y%m%d")
 
-dbc_folder <- "/dbfs/mnt/datalake/datasus/rd/dbc/landing"
-csv_folder <- "/dbfs/mnt/datalake/datasus/rd/csv"
+datasource <- dbutils.widgets.get("datasource")
+datasources <- fromJSON("datasources.json")
+
+path = datasources[datasource][[1]]['target'][[1]]
+partes <- unlist(strsplit(path, "/"))
"/")) +partes <- partes[-length(partes)] +dbc_folder <- paste(partes, collapse = "/") +csv_folder <- sub('/dbc/landing', '/csv', dbc_folder) files <- list.files(dbc_folder, full.names=TRUE) @@ -31,12 +39,12 @@ etl <- function(f) { registerDoParallel(8) while (sum(is.na(files)) != length(files)) { - batch = files[1:min(8, length(files))] - files = files[1+min(8, length(files)):length(files)] - foreach (i=batch) %dopar% { - print(i) - if (is.na(i) == FALSE) { - etl(i) - } + batch = files[1:min(8, length(files))] + files = files[1+min(8, length(files)):length(files)] + foreach (i=batch) %dopar% { + print(i) + if (is.na(i) == FALSE) { + etl(i) } + } } diff --git a/src/06.workflows/data4u_datasus.json b/src/06.workflows/data4u_datasus.json index fdacb86..42598c6 100644 --- a/src/06.workflows/data4u_datasus.json +++ b/src/06.workflows/data4u_datasus.json @@ -8,10 +8,11 @@ "max_concurrent_runs": 1, "tasks": [ { - "task_key": "raw_ingestion", + "task_key": "raw_ingestion_sihsus", "notebook_task": { "notebook_path": "src/01.raw/datasus/ingestao", "base_parameters": { + "datasource": "sihsus", "dt_start": "{{start_date}}", "dt_stop": "{{start_date}}", "delay": "13" @@ -28,14 +29,61 @@ } }, { - "task_key": "raw_transform", + "task_key": "raw_ingestion_sinasc", + "notebook_task": { + "notebook_path": "src/01.raw/datasus/ingestao", + "base_parameters": { + "datasource": "sinasc", + "dt_start": "{{start_date}}", + "dt_stop": "{{start_date}}", + "delay": "13" + }, + "source": "GIT" + }, + "existing_cluster_id": "0523-124114-7ef5b8u0", + "timeout_seconds": 0, + "email_notifications": {}, + "notification_settings": { + "no_alert_for_skipped_runs": false, + "no_alert_for_canceled_runs": false, + "alert_on_last_attempt": false + } + }, + { + "task_key": "raw_transform_sihsus", "depends_on": [ { - "task_key": "raw_ingestion" + "task_key": "raw_ingestion_sihsus" } ], "notebook_task": { "notebook_path": "src/01.raw/datasus/transform", + "base_parameters": { + "datasource": "sihsus" + }, + "source": "GIT" + }, + "existing_cluster_id": "0523-124114-7ef5b8u0", + "timeout_seconds": 0, + "email_notifications": {}, + "notification_settings": { + "no_alert_for_skipped_runs": false, + "no_alert_for_canceled_runs": false, + "alert_on_last_attempt": false + } + }, + { + "task_key": "raw_transform_sinasc", + "depends_on": [ + { + "task_key": "raw_ingestion_sinasc" + } + ], + "notebook_task": { + "notebook_path": "src/01.raw/datasus/transform", + "base_parameters": { + "datasource": "sinasc" + }, "source": "GIT" }, "existing_cluster_id": "0523-124114-7ef5b8u0", From 2ac9217e50931dface87000a4ee2a19e1892230f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?T=C3=A9o=20Calvo?= Date: Tue, 15 Aug 2023 16:26:23 -0300 Subject: [PATCH 8/8] fix datasus jobs --- src/06.workflows/data4u_datasus.json | 8 ++++---- src/06.workflows/sync_jobs.py | 5 ++++- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/06.workflows/data4u_datasus.json b/src/06.workflows/data4u_datasus.json index 42598c6..d969c56 100644 --- a/src/06.workflows/data4u_datasus.json +++ b/src/06.workflows/data4u_datasus.json @@ -96,10 +96,10 @@ } }, { - "task_key": "bronze_ingestion", + "task_key": "bronze_ingestion_sihsus", "depends_on": [ { - "task_key": "raw_transform" + "task_key": "raw_transform_sihsus" } ], "notebook_task": { @@ -119,7 +119,7 @@ "task_key": "silver_rd_sih", "depends_on": [ { - "task_key": "bronze_ingestion" + "task_key": "bronze_ingestion_sihsus" } ], "notebook_task": { @@ -139,7 +139,7 @@ "task_key": "csv_cleaner", "depends_on": [ { - 
"task_key": "bronze_ingestion" + "task_key": "bronze_ingestion_sihsus" } ], "notebook_task": { diff --git a/src/06.workflows/sync_jobs.py b/src/06.workflows/sync_jobs.py index 4d5c421..95fd55d 100644 --- a/src/06.workflows/sync_jobs.py +++ b/src/06.workflows/sync_jobs.py @@ -9,7 +9,7 @@ # %% -dotenv.load_dotenv(".env") +dotenv.load_dotenv(dotenv.find_dotenv(".env")) DATABRICKS_WORKSPACE_TOKEN = os.getenv("DATABRICKS_WORKSPACE_TOKEN") DATABRICKS_WORKSPACE_URL = os.getenv("DATABRICKS_WORKSPACE_URL") @@ -77,3 +77,6 @@ def create_or_update_job( job_settings = import_json(i) job_name = i.split(".")[0] resp = create_or_update_job(job_name, job_settings, df_jobs, job_client) + + if resp.status_code != 200: + print(i, resp.text)