Merge pull request #54 from TeoMeWhy/feat/datasus
Feat/datasus
TeoCalvo authored Aug 15, 2023
2 parents a8c2a7b + 1482fbb commit c20eecd
Showing 3 changed files with 85 additions and 35 deletions.
40 changes: 17 additions & 23 deletions src/01.raw/datasus/ingestao.py
@@ -32,7 +32,6 @@ def __init__(self, ufs, date_range, source, n_jobs=2):
        self.dates=[]
        self.set_dates(date_range)


    def set_dates(self, date_range):
        self.dates = dttools.date_range(date_range[0], date_range[-1], period=self.period)

@@ -44,13 +43,17 @@ def get_data_uf_ano_mes(self, uf, ano, mes):

        try:
            resp = urllib.request.urlretrieve(url, file_path)

        except:
            print(f"Não foi possível coletar o arquivo. {uf} | {ano}-{mes}-01")

    def get_data_uf(self, uf):
        for i in tqdm(self.dates):
            ano, mes, dia = i.split("-")
            ano = ano[-2:]

            if self.period == 'monthly':
                ano = ano[-2:]

            self.get_data_uf_ano_mes(uf, ano, mes)
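
The hunk above gates the two-digit year truncation behind the source's period. A minimal sketch of the idea, assuming the usual DATASUS naming in which monthly SIHSUS files carry UF + two-digit year + month (e.g. RDSP2308.dbc) while yearly sources keep the four-digit year; the actual URL/file pattern used by IngestionRawSUS sits outside this diff:

# Illustrative only: build_filename and the name patterns are assumptions,
# not the class's real implementation.
def build_filename(prefix: str, uf: str, ano: str, mes: str, period: str) -> str:
    if period == "monthly":
        return f"{prefix}{uf}{ano[-2:]}{mes}.dbc"   # e.g. RDSP2308.dbc
    return f"{prefix}{uf}{ano}.dbc"                 # e.g. DNSP2023.dbc

print(build_filename("RD", "SP", "2023", "08", "monthly"))  # RDSP2308.dbc
print(build_filename("DN", "SP", "2023", "08", "yearly"))   # DNSP2023.dbc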


@@ -59,30 +62,25 @@ def auto_execute(self):
            pool.map(self.get_data_uf, self.ufs)


ufs = ["RO", "AC", "AM", "RR","PA",
       "AP", "TO", "MA", "PI", "CE",
       "RN", "PB", "PE", "AL", "SE",
       "BA", "MG", "ES", "RJ", "SP",
       "PR", "SC", "RS", "MS", "MT",
       "GO","DF"]


# COMMAND ----------

# datasource = dbutils.widgets.get("datasource")
datasource = 'sihsus'
datasource = dbutils.widgets.get("datasource")
dt_start = dbutils.widgets.get("dt_start")
dt_stop = dbutils.widgets.get("dt_stop")
delay = int(dbutils.widgets.get("delay"))

# dt_start = dbutils.widgets.get("dt_start")
dt_start = '2023-08-01'
dt_start = (datetime.datetime.strptime(dt_start, "%Y-%m-%d") - relativedelta(months=delay)).strftime("%Y-%m-01")

# dt_stop = dbutils.widgets.get("dt_stop")
dt_stop = '2023-08-01'

# delay = int(dbutils.widgets.get("delay"))
delay = 3
ufs = ["RO", "AC", "AM", "RR","PA",
"AP", "TO", "MA", "PI", "CE",
"RN", "PB", "PE", "AL", "SE",
"BA", "MG", "ES", "RJ", "SP",
"PR", "SC", "RS", "MS", "MT",
"GO", "DF"]

ufs.sort(reverse=True)

dt_start = (datetime.datetime.strptime(dt_start, "%Y-%m-%d") - relativedelta(months=delay)).strftime("%Y-%m-01")

ing = IngestionRawSUS(ufs=ufs,
                      date_range=[dt_start, dt_stop],
@@ -92,7 +90,3 @@ def auto_execute(self):
# COMMAND ----------

ing.auto_execute()

# COMMAND ----------
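
With this change the notebook takes datasource, dt_start, dt_stop and delay from job widgets instead of hardcoded values, shifts the start date back by delay months, and only then builds the download window. A minimal sketch of that date handling outside Databricks, with plain arguments standing in for dbutils.widgets (resolve_window is an illustrative name, not part of the repo):

import datetime
from dateutil.relativedelta import relativedelta

def resolve_window(dt_start: str, dt_stop: str, delay: int):
    # Shift dt_start back by `delay` months and snap it to the first day,
    # mirroring the widget handling in ingestao.py.
    start = (datetime.datetime.strptime(dt_start, "%Y-%m-%d")
             - relativedelta(months=delay)).strftime("%Y-%m-01")
    return start, dt_stop

# The workflow passes delay="13", so an August 2023 trigger reaches back to July 2022.
print(resolve_window("2023-08-01", "2023-08-01", 13))  # ('2022-07-01', '2023-08-01')

The resulting pair is what feeds IngestionRawSUS(ufs=ufs, date_range=[dt_start, dt_stop], ...) above.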


26 changes: 17 additions & 9 deletions src/01.raw/datasus/transform.r
@@ -1,17 +1,25 @@
# Databricks notebook source
install.packages("read.dbc")
install.packages("doParallel")
install.packages("jsonlite")

# COMMAND ----------

library(read.dbc)
library(foreach)
library(doParallel)
library(jsonlite)

date = format(Sys.time(), "%Y%m%d")

dbc_folder <- "/dbfs/mnt/datalake/datasus/rd/dbc/landing"
csv_folder <- "/dbfs/mnt/datalake/datasus/rd/csv"
datasource <- dbutils.widgets.get("datasource")
datasources <- fromJSON("datasources.json")

path = datasources[datasource][[1]]['target'][[1]]
partes <- unlist(strsplit(path, "/"))
partes <- partes[-length(partes)]
dbc_folder <- paste(partes, collapse = "/")
csv_folder <- sub('/dbc/landing', '/csv', dbc_folder)

files <- list.files(dbc_folder, full.names=TRUE)

@@ -31,12 +39,12 @@ etl <- function(f) {

registerDoParallel(8)
while (sum(is.na(files)) != length(files)) {
batch = files[1:min(8, length(files))]
files = files[1+min(8, length(files)):length(files)]
foreach (i=batch) %dopar% {
print(i)
if (is.na(i) == FALSE) {
etl(i)
}
batch = files[1:min(8, length(files))]
files = files[1+min(8, length(files)):length(files)]
foreach (i=batch) %dopar% {
print(i)
if (is.na(i) == FALSE) {
etl(i)
}
}
}
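
transform.r now resolves its folders from datasources.json instead of hardcoding the SIHSUS paths: it reads the configured target, drops the trailing file-name segment to get the .dbc landing folder, and swaps /dbc/landing for /csv to get the output folder. A minimal sketch of that derivation in Python, using a hypothetical datasources.json entry (the real file's keys and paths are not shown in this diff):

import json

# Hypothetical datasources.json content; only the "target" lookup is inferred
# from transform.r, and the concrete sinasc path is an assumption.
datasources = json.loads("""
{
  "sihsus": {"target": "/dbfs/mnt/datalake/datasus/rd/dbc/landing/{uf}{ano}{mes}.dbc"},
  "sinasc": {"target": "/dbfs/mnt/datalake/datasus/sinasc/dbc/landing/{uf}{ano}.dbc"}
}
""")

def resolve_folders(datasource: str):
    target = datasources[datasource]["target"]
    dbc_folder = "/".join(target.split("/")[:-1])           # drop the file-name part
    csv_folder = dbc_folder.replace("/dbc/landing", "/csv")
    return dbc_folder, csv_folder

print(resolve_folders("sihsus"))
# ('/dbfs/mnt/datalake/datasus/rd/dbc/landing', '/dbfs/mnt/datalake/datasus/rd/csv')

The conversion itself still walks that folder in batches of eight files under doParallel, as in the loop above.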
54 changes: 51 additions & 3 deletions src/06.workflows/data4u_datasus.json
@@ -8,10 +8,11 @@
"max_concurrent_runs": 1,
"tasks": [
{
"task_key": "raw_ingestion",
"task_key": "raw_ingestion_sihsus",
"notebook_task": {
"notebook_path": "src/01.raw/datasus/ingestao",
"base_parameters": {
"datasource": "sihsus",
"dt_start": "{{start_date}}",
"dt_stop": "{{start_date}}",
"delay": "13"
@@ -28,14 +29,61 @@
}
},
{
"task_key": "raw_transform",
"task_key": "raw_ingestion_sinasc",
"notebook_task": {
"notebook_path": "src/01.raw/datasus/ingestao",
"base_parameters": {
"datasource": "sinasc",
"dt_start": "{{start_date}}",
"dt_stop": "{{start_date}}",
"delay": "13"
},
"source": "GIT"
},
"existing_cluster_id": "0523-124114-7ef5b8u0",
"timeout_seconds": 0,
"email_notifications": {},
"notification_settings": {
"no_alert_for_skipped_runs": false,
"no_alert_for_canceled_runs": false,
"alert_on_last_attempt": false
}
},
{
"task_key": "raw_transform_sihsus",
"depends_on": [
{
"task_key": "raw_ingestion"
"task_key": "raw_ingestion_sihsus"
}
],
"notebook_task": {
"notebook_path": "src/01.raw/datasus/transform",
"base_parameters": {
"datasource": "sihsus"
},
"source": "GIT"
},
"existing_cluster_id": "0523-124114-7ef5b8u0",
"timeout_seconds": 0,
"email_notifications": {},
"notification_settings": {
"no_alert_for_skipped_runs": false,
"no_alert_for_canceled_runs": false,
"alert_on_last_attempt": false
}
},
{
"task_key": "raw_transform_sinasc",
"depends_on": [
{
"task_key": "raw_ingestion_sinasc"
}
],
"notebook_task": {
"notebook_path": "src/01.raw/datasus/transform",
"base_parameters": {
"datasource": "sinasc"
},
"source": "GIT"
},
"existing_cluster_id": "0523-124114-7ef5b8u0",
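
The job now fans out per datasource: raw_ingestion_sihsus and raw_ingestion_sinasc run the same ingestao notebook with different base_parameters, and each raw_transform_* task depends on its matching ingestion via depends_on. A minimal sketch of how these task pairs could be generated instead of written by hand, reusing the notebook paths and cluster id shown above (the generator itself is illustrative, not part of the repo):

import json

CLUSTER_ID = "0523-124114-7ef5b8u0"

def tasks_for(datasource: str) -> list:
    # One ingestion task and one transform task per DATASUS source,
    # wired together through depends_on.
    ingest = {
        "task_key": f"raw_ingestion_{datasource}",
        "notebook_task": {
            "notebook_path": "src/01.raw/datasus/ingestao",
            "base_parameters": {
                "datasource": datasource,
                "dt_start": "{{start_date}}",
                "dt_stop": "{{start_date}}",
                "delay": "13",
            },
            "source": "GIT",
        },
        "existing_cluster_id": CLUSTER_ID,
    }
    transform = {
        "task_key": f"raw_transform_{datasource}",
        "depends_on": [{"task_key": f"raw_ingestion_{datasource}"}],
        "notebook_task": {
            "notebook_path": "src/01.raw/datasus/transform",
            "base_parameters": {"datasource": datasource},
            "source": "GIT",
        },
        "existing_cluster_id": CLUSTER_ID,
    }
    return [ingest, transform]

tasks = [t for ds in ("sihsus", "sinasc") for t in tasks_for(ds)]
print(json.dumps({"tasks": tasks}, indent=2))

Adding a third DATASUS source would then be a one-element change to the datasource tuple.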
