Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync dota and from main #58

Merged
merged 24 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c5196b3
Merge pull request #40 from TeoMeWhy/feat/dota
TeoCalvo Aug 4, 2023
516f7bc
Merge pull request #41 from TeoMeWhy/feat/dota
TeoCalvo Aug 4, 2023
790237b
Merge pull request #42 from TeoMeWhy/feat/dota
TeoCalvo Aug 4, 2023
d0c5c86
Merge pull request #43 from TeoMeWhy/feat/dota
TeoCalvo Aug 5, 2023
9feaff0
Merge pull request #44 from TeoMeWhy/feat/dota
TeoCalvo Aug 5, 2023
e2ac19d
Merge pull request #45 from TeoMeWhy/feat/dota
TeoCalvo Aug 5, 2023
529f838
Merge pull request #46 from TeoMeWhy/feat/dota
TeoCalvo Aug 5, 2023
d2a979a
Merge pull request #47 from TeoMeWhy/feat/dota
TeoCalvo Aug 5, 2023
ac78baf
Merge pull request #48 from TeoMeWhy/feat/dota
TeoCalvo Aug 5, 2023
821e789
[ibge] Dados de municipios brasileiros
TeoCalvo Aug 9, 2023
bfc5f81
Municipios brasileiros em silver
TeoCalvo Aug 9, 2023
42347c9
Merge pull request #49 from TeoMeWhy/feat/ibge
TeoCalvo Aug 9, 2023
0ce94cd
Adição da descrição do CID no Diagnóstico Principal e secundário
TeoCalvo Aug 9, 2023
23fcf53
Merge pull request #50 from TeoMeWhy/feat/ibge
TeoCalvo Aug 9, 2023
d9a7c69
Add sinasc
Aug 15, 2023
b8ed93c
Fix date_range
TeoCalvo Aug 15, 2023
e16149a
Merge pull request #51 from TeoMeWhy/fix/date_range
TeoCalvo Aug 15, 2023
82ea8df
Merge pull request #52 from TeoMeWhy/main
TeoCalvo Aug 15, 2023
a8c2a7b
Merge pull request #53 from TeoMeWhy/feat/datasus
TeoCalvo Aug 15, 2023
79e3f80
Auto execute com sinasc
Aug 15, 2023
1482fbb
Add parameter to jobs
Aug 15, 2023
c20eecd
Merge pull request #54 from TeoMeWhy/feat/datasus
TeoCalvo Aug 15, 2023
2ac9217
fix datasus jobs
TeoCalvo Aug 15, 2023
8085f61
Merge pull request #57 from TeoMeWhy/feat/dota
TeoCalvo Aug 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions databases/create.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
-- Databricks notebook source
-- DBTITLE 1,Olist
CREATE DATABASE IF NOT EXISTS bronze.olist;
CREATE DATABASE IF NOT EXISTS silver.olist;
-- DBTITLE 1,DataSUS
CREATE DATABASE IF NOT EXISTS bronze.datasus;
CREATE DATABASE IF NOT EXISTS silver.datasus;

-- COMMAND ----------

-- DBTITLE 1,Dota
CREATE DATABASE IF NOT EXISTS bronze.dota;
CREATE DATABASE IF NOT EXISTS silver.dota;

-- COMMAND ----------

-- DBTITLE 1,IBGE
CREATE DATABASE IF NOT EXISTS bronze.ibge;
CREATE DATABASE IF NOT EXISTS silver.ibge;

-- COMMAND ----------

Expand All @@ -17,12 +29,6 @@ CREATE DATABASE IF NOT EXISTS silver.pizza_query;

-- COMMAND ----------

-- DBTITLE 1,DataSUS
CREATE DATABASE IF NOT EXISTS bronze.datasus;
CREATE DATABASE IF NOT EXISTS silver.datasus;

-- COMMAND ----------

-- DBTITLE 1,Dota
CREATE DATABASE IF NOT EXISTS bronze.dota;
CREATE DATABASE IF NOT EXISTS silver.dota;
-- DBTITLE 1,Olist
CREATE DATABASE IF NOT EXISTS bronze.olist;
CREATE DATABASE IF NOT EXISTS silver.olist;
12 changes: 12 additions & 0 deletions src/01.raw/datasus/datasources.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"sihsus": {
"origin": "ftp://ftp.datasus.gov.br/dissemin/publicos/SIHSUS/200801_/Dados/RD{uf}{ano}{mes}.dbc",
"target": "/dbfs/mnt/datalake/datasus/rd/dbc/landing/RD{uf}{ano}{mes}.dbc",
"period": "monthly"
},
"sinasc": {
"origin": "ftp://ftp.datasus.gov.br/dissemin/publicos/SINASC/1996_/Dados/DNRES/DN{uf}{ano}.dbc",
"target": "/dbfs/mnt/datalake/datasus/sinasc/dbc/landing/DN{uf}{ano}.dbc",
"period": "yearly"
}
}
82 changes: 60 additions & 22 deletions src/01.raw/datasus/ingestao.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import urllib.request
from multiprocessing import Pool
from tqdm import tqdm
import json

import datetime
from dateutil.relativedelta import relativedelta
Expand All @@ -15,40 +16,77 @@

import dttools

def get_data_uf_ano_mes(uf, ano, mes):
url = f"ftp://ftp.datasus.gov.br/dissemin/publicos/SIHSUS/200801_/Dados/RD{uf}{ano}{mes}.dbc"
class IngestionRawSUS:
    """Download raw DATASUS ``.dbc`` files for a set of UFs over a date range.

    The datasource definition (FTP URL template, datalake landing-path
    template and periodicity) is read from ``datasources.json``; downloads
    are fanned out over a process pool, one task per UF.
    """

    def __init__(self, ufs, date_range, source, n_jobs=2):
        """
        ufs: list of two-letter federative-unit codes ("SP", "RJ", ...).
        date_range: [start, stop] dates as "YYYY-MM-DD" strings.
        source: key into datasources.json (e.g. "sihsus", "sinasc").
        n_jobs: number of parallel download processes.
        """
        self.ufs = ufs
        self.n_jobs = n_jobs

        # Resolve the URL/target templates and periodicity for this source.
        with open('datasources.json', 'r') as open_file:
            datasources = json.load(open_file)

        self.origin = datasources[source]['origin']
        self.target = datasources[source]['target']
        self.period = datasources[source]['period']

        self.dates = []
        self.set_dates(date_range)

    def set_dates(self, date_range):
        # dttools.date_range yields "YYYY-MM-DD" strings at the source's
        # periodicity (monthly or yearly) — project helper; see dttools.
        self.dates = dttools.date_range(date_range[0], date_range[-1], period=self.period)

    def get_data_uf_ano_mes(self, uf, ano, mes):
        """Download a single file; best-effort — a failed fetch only logs."""
        url = self.origin.format(uf=uf, ano=ano, mes=mes)
        file_path = self.target.format(uf=uf, ano=ano, mes=mes)

        try:
            urllib.request.urlretrieve(url, file_path)
        except Exception:
            # Catch Exception rather than a bare `except:` so that
            # KeyboardInterrupt/SystemExit still propagate and can stop the run.
            print(f"Não foi possível coletar o arquivo. {uf} | {ano}-{mes}-01")

    def get_data_uf(self, uf):
        """Download every date in self.dates for one UF, with a progress bar."""
        for i in tqdm(self.dates):
            ano, mes, dia = i.split("-")

            # Monthly sources (e.g. SIHSUS) use two-digit years in file names.
            if self.period == 'monthly':
                ano = ano[-2:]

            self.get_data_uf_ano_mes(uf, ano, mes)

    def auto_execute(self):
        """Fan the per-UF downloads out over a process pool."""
        with Pool(self.n_jobs) as pool:
            pool.map(self.get_data_uf, self.ufs)


# COMMAND ----------

# Notebook widgets: which datasource to ingest and the date window to cover.
datasource = dbutils.widgets.get("datasource")
dt_start = dbutils.widgets.get("dt_start")
dt_stop = dbutils.widgets.get("dt_stop")
# Months to shift the start date back — presumably to account for the
# source's publication delay; confirm with the job configuration.
delay = int(dbutils.widgets.get("delay"))

# Rewind dt_start by `delay` months and snap it to the 1st of that month.
dt_start = (datetime.datetime.strptime(dt_start, "%Y-%m-%d") - relativedelta(months=delay)).strftime("%Y-%m-01")

datas = dttools.date_range(dt_start, dt_stop, monthly=True)
to_download = [(uf, datas) for uf in ufs]

with Pool(10) as pool:
pool.starmap(get_data_uf, to_download)
# All 27 Brazilian federative-unit (state) codes used to template file names.
ufs = ["RO", "AC", "AM", "RR","PA",
"AP", "TO", "MA", "PI", "CE",
"RN", "PB", "PE", "AL", "SE",
"BA", "MG", "ES", "RJ", "SP",
"PR", "SC", "RS", "MS", "MT",
"GO", "DF"]

# Process UFs in reverse alphabetical order — NOTE(review): reason not
# visible here; confirm whether ordering matters for the pool.
ufs.sort(reverse=True)


# Build the ingestion driver for the requested datasource and window,
# downloading with 10 parallel workers.
ing = IngestionRawSUS(ufs=ufs,
date_range=[dt_start, dt_stop],
source=datasource,
n_jobs=10)

# COMMAND ----------

ing.auto_execute()
20 changes: 20 additions & 0 deletions src/01.raw/datasus/ingestion_cid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Databricks notebook source
# MAGIC %pip install lxml

# COMMAND ----------

import pandas as pd

# TabNet page listing CID-10 codes used by SIH (mxcid10lm).
url = 'http://tabnet.datasus.gov.br/cgi/sih/mxcid10lm.htm'

# read_html returns every <table> on the page; index 2 is the one we want —
# NOTE(review): depends on the current page layout; confirm if it changes.
dfs = pd.read_html(url)
df = dfs[2]
# The table comes with a two-level header; keep only the inner level.
df.columns = df.columns.levels[1]

# COMMAND ----------

# Persist the raw table to the datalake for the bronze ingestion step.
df.to_csv("/dbfs/mnt/datalake/datasus/cid/volume1.csv", index=False, sep=";")

# COMMAND ----------

# Display the dataframe in the notebook output.
df
26 changes: 17 additions & 9 deletions src/01.raw/datasus/transform.r
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
# Databricks notebook source
install.packages("read.dbc")
install.packages("doParallel")
install.packages("jsonlite")

# COMMAND ----------

library(read.dbc)
library(foreach)
library(doParallel)
library(jsonlite)

date = format(Sys.time(), "%Y%m%d")

dbc_folder <- "/dbfs/mnt/datalake/datasus/rd/dbc/landing"
csv_folder <- "/dbfs/mnt/datalake/datasus/rd/csv"
datasource <- dbutils.widgets.get("datasource")
datasources <- fromJSON("datasources.json")

path = datasources[datasource][[1]]['target'][[1]]
partes <- unlist(strsplit(path, "/"))
partes <- partes[-length(partes)]
dbc_folder <- paste(partes, collapse = "/")
csv_folder <- sub('/dbc/landing', '/csv', dbc_folder)

files <- list.files(dbc_folder, full.names=TRUE)

Expand All @@ -31,12 +39,12 @@ etl <- function(f) {

registerDoParallel(8)
while (sum(is.na(files)) != length(files)) {
batch = files[1:min(8, length(files))]
files = files[1+min(8, length(files)):length(files)]
foreach (i=batch) %dopar% {
print(i)
if (is.na(i) == FALSE) {
etl(i)
}
batch = files[1:min(8, length(files))]
files = files[1+min(8, length(files)):length(files)]
foreach (i=batch) %dopar% {
print(i)
if (is.na(i) == FALSE) {
etl(i)
}
}
}
73 changes: 73 additions & 0 deletions src/02.bronze/datasus/ingestao_cid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Databricks notebook source
import pandas as pd
import string

def range_letter(x, stop_number=None):
    """Expand a single-letter CID code range such as 'A01-A05'.

    x: range string whose bounds share one leading letter ('A01-A05').
    stop_number: optional explicit upper bound; when given, only the part
        of ``x`` before '-' is parsed (``x`` may then be e.g. 'A01').

    Returns a list of zero-padded codes, e.g. ['A01', 'A02', ..., 'A05'].
    """
    letter = x[0]
    start_number = int(x.split("-")[0][1:])
    # Use the explicit stop when provided, otherwise parse it from the range.
    stop_number = int(x.split("-")[1][1:]) if stop_number is None else stop_number

    return [f'{letter}{i:02d}' for i in range(start_number, stop_number + 1)]


def make_range(x):
    """Expand a CID-10 range that may span letters, e.g. 'B90-C05'.

    Returns the stripped input unchanged when it is not a single range
    (no '-' or more than one), otherwise a flat list of individual codes.
    """
    x = x.strip(" ")

    try:
        start, stop = x.split("-")
    except ValueError:
        # Not exactly one '-': treat it as a single code, not a range.
        return x

    letter_start = start[0]
    letter_start_pos = string.ascii_uppercase.find(letter_start)
    number_start = int(float(start[1:]))

    letter_stop = stop[0]
    letter_stop_pos = string.ascii_uppercase.find(letter_stop)
    number_stop = int(float(stop[1:]))

    values = []
    letter_pos = letter_start_pos
    # Walk every full letter before the stop letter, expanding each one up
    # to its 99 upper bound; later letters restart at 01.
    while letter_pos < letter_stop_pos:
        letter = string.ascii_uppercase[letter_pos]
        values.extend(range_letter(f'{letter}{number_start:02d}-{letter}99'))
        letter_pos += 1
        number_start = 1

    # Finally expand the stop letter up to its own bound.
    values.extend(range_letter(f'{letter_stop}{number_start}-{letter_stop}{number_stop}'))

    return values

# COMMAND ----------

# Raw CID-10 table previously scraped from TabNet and landed in the datalake.
df = pd.read_csv("/dbfs/mnt/datalake/datasus/cid/volume1.csv", sep=";")

# COMMAND ----------

# 'Descrição' holds comma-separated code ranges; split into a list per row.
df['DescricaoLista'] = df['Descrição'].fillna("").apply(lambda x: x.split(","))

# One row per range entry; drop the last row — presumably a footer row;
# confirm against the source table.
df_explode = df.explode("DescricaoLista").iloc[:-1]
df_explode = df_explode[df_explode["Código"] != '-']

# Expand each range (e.g. 'A00-A05') into the individual CID-10 codes.
df_explode["DescricaoListaRange"] = df_explode["DescricaoLista"].apply(make_range)
df_completa = df_explode.explode("DescricaoListaRange")

# Map the original (Portuguese) column names to the bronze-layer names,
# and use the mapping's values to select/order the output columns.
columns = {
"Capítulo": "descCapituloCID" ,
"Código": "codCID",
"Códigos da CID-10": "descCID" ,
"Descrição": "codCID10" ,
"DescricaoListaRange": "codCID10dst",
}

df_completa = df_completa.rename(columns=columns)[columns.values()]
# Convert to Spark for the Delta write.
sdf = spark.createDataFrame(df_completa)

# COMMAND ----------

sdf.write.format("delta").mode("overwrite").saveAsTable("bronze.datasus.cid")
10 changes: 10 additions & 0 deletions src/02.bronze/ibge/ingestao_municipios.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Databricks notebook source
# Ingest the Brazilian municipalities CSV from the datalake into the
# bronze layer as a Delta table (full overwrite on each run).
path = "/mnt/datalake/ibge/municipios_brasileiros.csv"

# Semicolon-separated file with a header row.
df = (spark.read
.csv(path, header=True, sep=";"))

(df.write
.format("delta")
.mode("overwrite")
.saveAsTable("bronze.ibge.municipios_brasileiros"))
7 changes: 7 additions & 0 deletions src/03.silver/datasus/cid.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Databricks notebook source
-- Full refresh: drop and rebuild the silver CID table on every run.
DROP TABLE IF EXISTS silver.datasus.cid;

-- Straight copy of the bronze table; no transformation applied yet.
CREATE TABLE IF NOT EXISTS silver.datasus.cid AS (
SELECT *
FROM bronze.datasus.cid
);
29 changes: 27 additions & 2 deletions src/03.silver/datasus/rd_sih.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,22 @@ DROP TABLE IF EXISTS silver.datasus.rd_sih;

CREATE TABLE IF NOT EXISTS silver.datasus.rd_sih AS

WITH tb_uf AS (
select distinct codUF, descUF
from silver.ibge.municipios_brasileiros
),

tb_cid AS (
select *
from silver.datasus.cid
where codCID not like '%-%'
and codCID10 not like '%.%'
qualify row_number() over (partition by codCID10dst order by codCID desc) = 1
)

SELECT
t1.UF_ZI,
-- t1.UF_ZI,
t32.descUF,
t1.ANO_CMPT,
t1.MES_CMPT,
t4.descEspecialidade,
Expand Down Expand Up @@ -44,7 +58,9 @@ SELECT
TO_DATE(t1.DT_INTER, 'yyyymmdd') AS DtInternacao,
TO_DATE(t1.DT_SAIDA, 'yyyymmdd') AS DtSaida,
t1.DIAG_PRINC,
t33.descCID AS descDiagnosticoCIDPrinc,
t1.DIAG_SECUN,
t34.descCID AS descDiagnosticoCIDSec,
t17.`descTipoCobrança` AS descTipoCobranca,
t10.descNaturezaHospitalSUS,
t11.descNaturezaJuridica,
Expand Down Expand Up @@ -160,7 +176,8 @@ ON t1.RUBRICA = t14.codRubrica
LEFT JOIN bronze.datasus.seqaih_5 AS t15
ON t1.SEQ_AIH5 = t15.codSeqAIH

LEFT JOIN bronze.datasus.subtipo_financ AS t16
LEFT JOIN bronze.datasus.subtipo_financ AS t16
ON t1.FAEC_TP = t16.codSubTipoFinanciamento

LEFT JOIN bronze.datasus.tipo_cobranca AS t17
Expand Down Expand Up @@ -208,4 +225,12 @@ ON t1.DIAGSEC8 = t30.codTpDiagSecundario
LEFT JOIN bronze.datasus.tp_diagsecundario as t31
ON t1.DIAGSEC9 = t31.codTpDiagSecundario

LEFT JOIN tb_uf AS t32
ON substring(t1.UF_ZI,0,2) = t32.codUF

LEFT JOIN tb_cid AS t33
ON substring(t1.DIAG_PRINC,0,3) = t33.codCID10dst

LEFT JOIN tb_cid AS t34
ON substring(t1.DIAG_SECUN,0,3) = t34.codCID10dst
;
9 changes: 9 additions & 0 deletions src/03.silver/ibge/municipios_brasileiros.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Databricks notebook source
-- Full refresh: drop and rebuild the silver municipalities table each run.
DROP TABLE IF EXISTS silver.ibge.municipios_brasileiros;

-- Straight copy of the bronze table; no transformation applied yet.
CREATE TABLE IF NOT EXISTS silver.ibge.municipios_brasileiros AS (

SELECT *
FROM bronze.ibge.municipios_brasileiros

);
Loading
Loading