diff --git a/databases/create.sql b/databases/create.sql
index 8218b9c..33cf30f 100644
--- a/databases/create.sql
+++ b/databases/create.sql
@@ -14,3 +14,8 @@ CREATE DATABASE IF NOT EXISTS silver.igdb;
 -- DBTITLE 1,LinuxTips
 CREATE DATABASE IF NOT EXISTS bronze.linuxtips;
 CREATE DATABASE IF NOT EXISTS silver.pizza_query;
+
+-- COMMAND ----------
+
+-- DBTITLE 1,DataSUS
+CREATE DATABASE IF NOT EXISTS bronze.datasus;
diff --git a/databases/grant.sql b/databases/grant.sql
index ecc0205..cb49e27 100644
--- a/databases/grant.sql
+++ b/databases/grant.sql
@@ -1,5 +1,6 @@
 -- Databricks notebook source
 
+
 -- COMMAND ----------
 
 -- DBTITLE 1,Olist
@@ -10,4 +11,3 @@ GRANT READ_METADATA ON DATABASE `bronze.olist` TO `twitch`;
 GRANT USAGE ON DATABASE `silver.olist` TO `twitch`;
 GRANT SELECT ON DATABASE `silver.olist` TO `twitch`;
 GRANT READ_METADATA ON DATABASE `silver.olist` TO `twitch`;
-
diff --git a/src/01.raw/datasus/ingestao.py b/src/01.raw/datasus/ingestao.py
new file mode 100644
index 0000000..79aaab3
--- /dev/null
+++ b/src/01.raw/datasus/ingestao.py
@@ -0,0 +1,65 @@
+# Databricks notebook source
+# MAGIC %pip install tqdm
+
+# COMMAND ----------
+
+import urllib.request
+from multiprocessing import Pool
+from tqdm import tqdm
+
+import datetime
+from dateutil.relativedelta import relativedelta
+
+import sys
+sys.path.insert(0,"../../lib/")
+
+import dttools
+
+def get_data_uf_ano_mes(uf, ano, mes):
+    """Download one monthly RD file for a state from the DATASUS FTP server.
+
+    uf, ano and mes are strings interpolated into the remote file name
+    (RD{uf}{ano}{mes}.dbc); the file is saved under the same name in the
+    landing folder. A failed download is reported and skipped so that the
+    remaining files of the batch are still attempted.
+    """
+    url = f"ftp://ftp.datasus.gov.br/dissemin/publicos/SIHSUS/200801_/Dados/RD{uf}{ano}{mes}.dbc"
+
+    file_path = f"/dbfs/mnt/datalake/datasus/rd/dbc/landing/RD{uf}{ano}{mes}.dbc"
+    try:
+        urllib.request.urlretrieve(url, file_path)
+    except Exception as err:
+        # Not a bare except: KeyboardInterrupt/SystemExit must still stop the
+        # worker. The URL and error are printed to show which file failed.
+        print("Não foi possível coletar o arquivo.", url, err)
+
+def get_data_uf(uf, datas):
+    """Download, for one state, the file of every "YYYY-MM-DD" date in datas."""
+    for i in tqdm(datas):
+        ano, mes, _ = i.split("-")
+        ano = ano[-2:]  # only the last two characters of the year go into the file name
+        get_data_uf_ano_mes(uf, ano, mes)
+
+
+ufs = ["RO", "AC", "AM", "RR","PA",
+       "AP", "TO", "MA", "PI", "CE",
+       "RN", "PB", "PE", "AL", "SE",
+       "BA", "MG", "ES", "RJ", "SP",
+       "PR", "SC", "RS", "MS", "MT",
+       "GO","DF"]
+
+
+# COMMAND ----------
+
+dt_start = dbutils.widgets.get("dt_start")
+dt_stop = dbutils.widgets.get("dt_stop")
+delay = int(dbutils.widgets.get("delay"))
+
+# Move the start back by `delay` months and snap it to the first day of that month.
+dt_start = (datetime.datetime.strptime(dt_start, "%Y-%m-%d") - relativedelta(months=delay)).strftime("%Y-%m-01")
+
+datas = dttools.date_range(dt_start, dt_stop, monthly=True)
+to_download = [(uf, datas) for uf in ufs]
+
+with Pool(10) as pool:
+    pool.starmap(get_data_uf, to_download)
diff --git a/src/01.raw/datasus/transform.r b/src/01.raw/datasus/transform.r
new file mode 100644
index 0000000..060eb9f
--- /dev/null
+++ b/src/01.raw/datasus/transform.r
@@ -0,0 +1,25 @@
+# Databricks notebook source
+install.packages("read.dbc")
+
+# COMMAND ----------
+
+library(read.dbc)
+
+# Run date (YYYYMMDD) that becomes part of every exported CSV file name.
+date = format(Sys.time(), "%Y%m%d")
+
+dbc_folder <- "/dbfs/mnt/datalake/datasus/rd/dbc/landing"
+csv_folder <- "/dbfs/mnt/datalake/datasus/rd/csv"
+
+# Convert every landed .dbc file to CSV, then move it out of the landing folder.
+files <- list.files(dbc_folder, full.names=TRUE)
+for(f in files) {
+    print(f)
+    df= read.dbc(f)
+    lista = strsplit(f, "/")[[1]]
+
+    # fixed=TRUE: ".dbc" is a literal suffix, not a regex in which "." matches any character.
+    file = gsub(".dbc", paste(date, "csv", sep="."), lista[length(lista)], fixed=TRUE)
+    write.csv2(df, paste(csv_folder, file, sep="/"), row.names=FALSE)
+    file.rename(from=f, to=gsub("landing", "proceeded", f))
+}
diff --git a/src/02.bronze/datasus/ingestao.py b/src/02.bronze/datasus/ingestao.py
new file mode 100644
index 0000000..82704ed
--- /dev/null
+++ b/src/02.bronze/datasus/ingestao.py
@@ -0,0 +1,50 @@
+# Databricks notebook source
+# DBTITLE 1,Imports
+import sys
+
+sys.path.insert(0, '../../lib')
+
+from ingestors import IngestaoBronze
+import dbtools
+
+# COMMAND ----------
+
+# DBTITLE 1,Setup
+table = "rd_sih"
+path_full_load='/mnt/datalake/datasus/rd/csv'
+path_incremental='/mnt/datalake/datasus/rd/csv'
+file_format='csv'
+table_name=table
+database_name='bronze.datasus'
+id_fields= ["N_AIH", "DT_SAIDA", "IDENT"]
+timestamp_field='DT_SAIDA'
+partition_fields=["ANO_CMPT","MES_CMPT"]
+read_options = {'sep': ';','header': "true"}
+
+ingestao = IngestaoBronze(
+    path_full_load=path_full_load,
+    path_incremental=path_incremental,
+    file_format=file_format,
+    table_name=table_name,
+    database_name=database_name,
+    id_fields=id_fields,
+    timestamp_field=timestamp_field,
+    partition_fields=partition_fields,
+    read_options=read_options,
+    spark=spark,
+)
+
+# COMMAND ----------
+
+# DBTITLE 1,Criação da tabela
+# When the table does not exist yet, save an empty DataFrame built from the
+# ingestor schema and remove the ingestor's checkpoint path.
+if not dbtools.table_exists(spark, database_name, table):
+    df_null = spark.createDataFrame(data=[], schema=ingestao.schema)
+    ingestao.save_full(df_null)
+    dbutils.fs.rm(ingestao.checkpoint_path, True)
+
+# COMMAND ----------
+
+# DBTITLE 1,Ingestão por streaming
+ingestao.process_stream()
diff --git a/src/04.workflows/data4u_datasus.json b/src/04.workflows/data4u_datasus.json
new file mode 100644
index 0000000..78b095a
--- /dev/null
+++ b/src/04.workflows/data4u_datasus.json
@@ -0,0 +1,79 @@
+{
+  "settings": {
+    "name": "data4u_datasus",
+    "email_notifications": {
+      "no_alert_for_skipped_runs": false
+    },
+    "webhook_notifications": {},
+    "timeout_seconds": 0,
+    "max_concurrent_runs": 1,
+    "tasks": [
+      {
+        "task_key": "raw_ingestion",
+        "notebook_task": {
+          "notebook_path": "src/01.raw/datasus/ingestao",
+          "base_parameters": {
+            "dt_start": "{{start_date}}",
+            "dt_stop": "{{start_date}}",
+            "delay": "13"
+          },
+          "source": "GIT"
+        },
+        "existing_cluster_id": "0523-124114-7ef5b8u0",
+        "timeout_seconds": 0,
+        "email_notifications": {},
+        "notification_settings": {
+          "no_alert_for_skipped_runs": false,
+          "no_alert_for_canceled_runs": false,
+          "alert_on_last_attempt": false
+        }
+      },
+      {
+        "task_key": "raw_transform",
+        "depends_on": [
+          {
+            "task_key": "raw_ingestion"
+          }
+        ],
+        "notebook_task": {
+          "notebook_path": "src/01.raw/datasus/transform",
+          "source": "GIT"
+        },
+        "existing_cluster_id": "0523-124114-7ef5b8u0",
+        "timeout_seconds": 0,
+        "email_notifications": {},
+        "notification_settings": {
+          "no_alert_for_skipped_runs": false,
+          "no_alert_for_canceled_runs": false,
+          "alert_on_last_attempt": false
+        }
+      },
+      {
+        "task_key": "bronze_ingestion",
+        "depends_on": [
+          {
+            "task_key": "raw_transform"
+          }
+        ],
+        "notebook_task": {
+          "notebook_path": "src/02.bronze/datasus/ingestao",
+          "source": "GIT"
+        },
+        "existing_cluster_id": "0523-124114-7ef5b8u0",
+        "timeout_seconds": 0,
+        "email_notifications": {},
+        "notification_settings": {
+          "no_alert_for_skipped_runs": false,
+          "no_alert_for_canceled_runs": false,
+          "alert_on_last_attempt": false
+        }
+      }
+    ],
+    "git_source": {
+      "git_url": "https://github.com/TeoMeWhy/data-4u",
+      "git_provider": "gitHub",
+      "git_branch": "feat/raw_datasus"
+    },
+    "format": "MULTI_TASK"
+  }
+}
\ No newline at end of file