Skip to content

Commit

Permalink
Merge pull request #32 from TeoMeWhy/feat/raw_datasus
Browse files Browse the repository at this point in the history
Feat/raw datasus
  • Loading branch information
TeoCalvo authored Jul 14, 2023
2 parents 25096c9 + 8c83ee4 commit 545bac1
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 1 deletion.
5 changes: 5 additions & 0 deletions databases/create.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,8 @@ CREATE DATABASE IF NOT EXISTS silver.igdb;
-- DBTITLE 1,LinuxTips
CREATE DATABASE IF NOT EXISTS bronze.linuxtips;
CREATE DATABASE IF NOT EXISTS silver.pizza_query;

-- COMMAND ----------

-- DBTITLE 1,DataSUS
-- Bronze-layer database that receives the raw DataSUS (SIHSUS) ingestion tables.
CREATE DATABASE IF NOT EXISTS bronze.datasus;
2 changes: 1 addition & 1 deletion databases/grant.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
-- Databricks notebook source


-- COMMAND ----------

-- DBTITLE 1,Olist
Expand All @@ -10,4 +11,3 @@ GRANT READ_METADATA ON DATABASE `bronze.olist` TO `twitch`;
GRANT USAGE ON DATABASE `silver.olist` TO `twitch`;
GRANT SELECT ON DATABASE `silver.olist` TO `twitch`;
GRANT READ_METADATA ON DATABASE `silver.olist` TO `twitch`;

54 changes: 54 additions & 0 deletions src/01.raw/datasus/ingestao.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Databricks notebook source
# MAGIC %pip install tqdm

# COMMAND ----------

import urllib.request
from multiprocessing import Pool
from tqdm import tqdm

import datetime
from dateutil.relativedelta import relativedelta

import sys
sys.path.insert(0,"../../lib/")

import dttools

def get_data_uf_ano_mes(uf, ano, mes):
    """Download one monthly SIHSUS .dbc file for a given UF/year/month.

    Args:
        uf: two-letter state code (e.g. "SP").
        ano: two-digit year string (e.g. "23").
        mes: two-digit month string (e.g. "01").

    The file is fetched from the public DataSUS FTP server and written to the
    DBFS landing folder. A failed download is reported but does not abort the
    run, so the remaining UFs/months can still be collected.
    """
    url = f"ftp://ftp.datasus.gov.br/dissemin/publicos/SIHSUS/200801_/Dados/RD{uf}{ano}{mes}.dbc"
    file_path = f"/dbfs/mnt/datalake/datasus/rd/dbc/landing/RD{uf}{ano}{mes}.dbc"
    try:
        urllib.request.urlretrieve(url, file_path)
    # URLError/ContentTooShortError are OSError subclasses; a bare `except:`
    # would also swallow KeyboardInterrupt/SystemExit and hide the real cause.
    except OSError as exc:
        print(f"Não foi possível coletar o arquivo. ({url}: {exc})")

def get_data_uf(uf, datas):
    """Download every monthly file of a single state (UF).

    `datas` is an iterable of ISO date strings (YYYY-MM-DD); progress is
    shown with tqdm.
    """
    for data_ref in tqdm(datas):
        ano, mes, _dia = data_ref.split("-")
        # DataSUS file names use two-digit years.
        get_data_uf_ano_mes(uf, ano[-2:], mes)


# All 27 Brazilian federative units (states + DF) covered by the ingestion.
ufs = (
    "RO AC AM RR PA AP TO MA PI CE "
    "RN PB PE AL SE BA MG ES RJ SP "
    "PR SC RS MS MT GO DF"
).split()


# COMMAND ----------

# Read the job parameters supplied through Databricks widgets.
dt_start = dbutils.widgets.get("dt_start")
dt_stop = dbutils.widgets.get("dt_stop")
delay = int(dbutils.widgets.get("delay"))

# Shift the window start back by `delay` months and snap it to the 1st,
# since DataSUS publishes monthly files with a lag.
start_month = datetime.datetime.strptime(dt_start, "%Y-%m-%d") - relativedelta(months=delay)
dt_start = start_month.strftime("%Y-%m-01")

# One work item per UF; each walks the whole list of monthly reference dates.
datas = dttools.date_range(dt_start, dt_stop, monthly=True)
to_download = [(uf, datas) for uf in ufs]

# Download up to 10 states in parallel.
with Pool(10) as pool:
    pool.starmap(get_data_uf, to_download)
22 changes: 22 additions & 0 deletions src/01.raw/datasus/transform.r
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Databricks notebook source
install.packages("read.dbc")

# COMMAND ----------

library(read.dbc)

# Timestamp (YYYYMMDD) appended to every converted file name.
date <- format(Sys.time(), "%Y%m%d")

dbc_folder <- "/dbfs/mnt/datalake/datasus/rd/dbc/landing"
csv_folder <- "/dbfs/mnt/datalake/datasus/rd/csv"

# Convert each landed .dbc file to csv2 and move the original out of landing.
files <- list.files(dbc_folder, full.names=TRUE)
for(f in files) {
  print(f)
  df <- read.dbc(f)
  lista <- strsplit(f, "/")[[1]]

  # fixed=TRUE: match the literal ".dbc" suffix — without it '.' is a regex
  # wildcard and would also match e.g. "Xdbc" anywhere in the name.
  file <- gsub(".dbc", paste(date, "csv", sep="."), lista[length(lista)], fixed=TRUE)
  write.csv2(df, paste(csv_folder, file, sep="/"), row.names=FALSE)
  # NOTE(review): target folder is spelled "proceeded" (sic) — confirm the
  # downstream cleanup expects this exact name before renaming it.
  file.rename(from=f, to=gsub("landing", "proceeded", f))
}
48 changes: 48 additions & 0 deletions src/02.bronze/datasus/ingestao.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Databricks notebook source
# DBTITLE 1,Imports
import sys

sys.path.insert(0, '../../lib')

from ingestors import IngestaoBronze
import dbtools

# COMMAND ----------

# DBTITLE 1,Setup
table = "rd_sih"

# Full load and incremental stream read from the same csv landing folder
# (plain strings — the original f-prefixes had no placeholders).
path_full_load = '/mnt/datalake/datasus/rd/csv'
path_incremental = '/mnt/datalake/datasus/rd/csv'
file_format = 'csv'
table_name = table
database_name = 'bronze.datasus'

# Composite business key used for deduplication; DT_SAIDA orders record versions.
id_fields = ["N_AIH", "DT_SAIDA", "IDENT"]
timestamp_field = 'DT_SAIDA'
partition_fields = ["ANO_CMPT", "MES_CMPT"]

# The R transform step writes csv2 output: ';'-separated with a header row.
read_options = {'sep': ';', 'header': "true"}

# Assemble the ingestor configuration, then instantiate it in one shot.
ingest_params = dict(
    path_full_load=path_full_load,
    path_incremental=path_incremental,
    file_format=file_format,
    table_name=table_name,
    database_name=database_name,
    id_fields=id_fields,
    timestamp_field=timestamp_field,
    partition_fields=partition_fields,
    read_options=read_options,
    spark=spark,
)
ingestao = IngestaoBronze(**ingest_params)

# COMMAND ----------

# DBTITLE 1,Criação da tabela
# Bootstrap: if the bronze table does not exist yet, materialize it from an
# empty DataFrame using the ingestor's schema, then remove any stale
# streaming checkpoint so the stream below starts reading from scratch.
if not dbtools.table_exists(spark, database_name, table):
    df_null = spark.createDataFrame(data=[], schema=ingestao.schema)
    ingestao.save_full(df_null)
    dbutils.fs.rm(ingestao.checkpoint_path, True)

# COMMAND ----------

# DBTITLE 1,Ingestão por streaming
# Incrementally ingest newly-landed csv files into the bronze table.
ingestao.process_stream()
79 changes: 79 additions & 0 deletions src/04.workflows/data4u_datasus.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
{
"settings": {
"name": "data4u_datasus",
"email_notifications": {
"no_alert_for_skipped_runs": false
},
"webhook_notifications": {},
"timeout_seconds": 0,
"max_concurrent_runs": 1,
"tasks": [
{
"task_key": "raw_ingestion",
"notebook_task": {
"notebook_path": "src/01.raw/datasus/ingestao",
"base_parameters": {
"dt_start": "{{start_date}}",
"dt_stop": "{{start_date}}",
"delay": "13"
},
"source": "GIT"
},
"existing_cluster_id": "0523-124114-7ef5b8u0",
"timeout_seconds": 0,
"email_notifications": {},
"notification_settings": {
"no_alert_for_skipped_runs": false,
"no_alert_for_canceled_runs": false,
"alert_on_last_attempt": false
}
},
{
"task_key": "raw_transform",
"depends_on": [
{
"task_key": "raw_ingestion"
}
],
"notebook_task": {
"notebook_path": "src/01.raw/datasus/transform",
"source": "GIT"
},
"existing_cluster_id": "0523-124114-7ef5b8u0",
"timeout_seconds": 0,
"email_notifications": {},
"notification_settings": {
"no_alert_for_skipped_runs": false,
"no_alert_for_canceled_runs": false,
"alert_on_last_attempt": false
}
},
{
"task_key": "bronze_ingestion",
"depends_on": [
{
"task_key": "raw_transform"
}
],
"notebook_task": {
"notebook_path": "src/02.bronze/datasus/ingestao",
"source": "GIT"
},
"existing_cluster_id": "0523-124114-7ef5b8u0",
"timeout_seconds": 0,
"email_notifications": {},
"notification_settings": {
"no_alert_for_skipped_runs": false,
"no_alert_for_canceled_runs": false,
"alert_on_last_attempt": false
}
}
],
"git_source": {
"git_url": "https://github.com/TeoMeWhy/data-4u",
"git_provider": "gitHub",
"git_branch": "feat/raw_datasus"
},
"format": "MULTI_TASK"
}
}

0 comments on commit 545bac1

Please sign in to comment.