Skip to content

Commit

Permalink
Merge pull request #51 from TeoMeWhy/fix/date_range
Browse files Browse the repository at this point in the history
Fix/date range
  • Loading branch information
TeoCalvo authored Aug 15, 2023
2 parents 23fcf53 + b8ed93c commit e16149a
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 23 deletions.
12 changes: 12 additions & 0 deletions src/01.raw/datasus/datasources.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"sihsus": {
"origin": "ftp://ftp.datasus.gov.br/dissemin/publicos/SIHSUS/200801_/Dados/RD{uf}{ano}{mes}.dbc",
"target": "/dbfs/mnt/datalake/datasus/rd/dbc/landing/RD{uf}{ano}{mes}.dbc",
"period": "monthly"
},
"sinasc": {
"origin": "ftp://ftp.datasus.gov.br/dissemin/publicos/SINASC/1996_/Dados/DNRES/DN{uf}{ano}.dbc",
"target": "/dbfs/mnt/datalake/datasus/sinasc/dbc/landing/DN{uf}{ano}.dbc",
"period": "yearly"
}
}
82 changes: 63 additions & 19 deletions src/01.raw/datasus/ingestao.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import urllib.request
from multiprocessing import Pool
from tqdm import tqdm
import json

import datetime
from dateutil.relativedelta import relativedelta
Expand All @@ -15,20 +16,47 @@

import dttools

def get_data_uf_ano_mes(uf, ano, mes):
url = f"ftp://ftp.datasus.gov.br/dissemin/publicos/SIHSUS/200801_/Dados/RD{uf}{ano}{mes}.dbc"
class IngestionRawSUS:

file_path = f"/dbfs/mnt/datalake/datasus/rd/dbc/landing/RD{uf}{ano}{mes}.dbc"
try:
resp = urllib.request.urlretrieve(url, file_path)
except:
print("Não foi possível coletar o arquivo.")
def __init__(self, ufs, date_range, source, n_jobs=2):
self.ufs = ufs
self.n_jobs = n_jobs

def get_data_uf(uf, datas):
for i in tqdm(datas):
ano, mes, dia = i.split("-")
ano = ano[-2:]
get_data_uf_ano_mes(uf, ano, mes)
with open('datasources.json', 'r') as open_file:
datasources = json.load(open_file)

self.origin = datasources[source]['origin']
self.target = datasources[source]['target']
self.period = datasources[source]['period']

self.dates=[]
self.set_dates(date_range)


def set_dates(self, date_range):
    """Populate ``self.dates`` from the endpoints of *date_range*.

    Only the first and last items of *date_range* are used; they are
    forwarded to ``dttools.date_range`` together with ``self.period``.
    """
    first_day = date_range[0]
    last_day = date_range[-1]
    self.dates = dttools.date_range(first_day, last_day, period=self.period)


def get_data_uf_ano_mes(self, uf, ano, mes):
    """Download a single file for one state and period (best effort).

    The source URL and the destination path are built by filling the
    ``self.origin`` and ``self.target`` templates with *uf*, *ano* and
    *mes*. ``str.format`` ignores keyword arguments a template does not
    reference, so templates without a ``{mes}`` placeholder work too.

    Args:
        uf: State code substituted for ``{uf}``.
        ano: Year string substituted for ``{ano}``.
        mes: Month string substituted for ``{mes}``.

    Returns:
        None. A failed download is reported on stdout and otherwise
        ignored so that the remaining files can still be fetched.
    """
    url = self.origin.format(uf=uf, ano=ano, mes=mes)
    file_path = self.target.format(uf=uf, ano=ano, mes=mes)

    try:
        urllib.request.urlretrieve(url, file_path)
    except Exception:
        # Deliberately best effort, but a bare ``except:`` would also
        # swallow KeyboardInterrupt/SystemExit and make the job
        # impossible to interrupt cleanly.
        print(f"Não foi possível coletar o arquivo. {uf} | {ano}-{mes}-01")

def get_data_uf(self, uf):
    """Fetch the file of every date in ``self.dates`` for state *uf*.

    Each entry of ``self.dates`` is split on ``-`` into year, month and
    day; the last two characters of the year and the month are passed to
    ``get_data_uf_ano_mes``. Progress is displayed through ``tqdm``.
    """
    for stamp in tqdm(self.dates):
        year, month, _day = stamp.split("-")
        self.get_data_uf_ano_mes(uf, year[-2:], month)


def auto_execute(self):
    """Run ``get_data_uf`` for every entry of ``self.ufs`` in a process pool.

    The pool is created with ``self.n_jobs`` workers and is cleaned up
    when the ``with`` block exits.
    """
    workers = Pool(self.n_jobs)
    with workers:
        workers.map(self.get_data_uf, self.ufs)


ufs = ["RO", "AC", "AM", "RR","PA",
Expand All @@ -41,14 +69,30 @@ def get_data_uf(uf, datas):

# COMMAND ----------

dt_start = dbutils.widgets.get("dt_start")
dt_stop = dbutils.widgets.get("dt_stop")
delay = int(dbutils.widgets.get("delay"))
# datasource = dbutils.widgets.get("datasource")
datasource = 'sihsus'

# dt_start = dbutils.widgets.get("dt_start")
dt_start = '2023-08-01'

# dt_stop = dbutils.widgets.get("dt_stop")
dt_stop = '2023-08-01'

# delay = int(dbutils.widgets.get("delay"))
delay = 3


dt_start = (datetime.datetime.strptime(dt_start, "%Y-%m-%d") - relativedelta(months=delay)).strftime("%Y-%m-01")

datas = dttools.date_range(dt_start, dt_stop, monthly=True)
to_download = [(uf, datas) for uf in ufs]
ing = IngestionRawSUS(ufs=ufs,
date_range=[dt_start, dt_stop],
source=datasource,
n_jobs=10)

# COMMAND ----------

ing.auto_execute()

# COMMAND ----------


with Pool(10) as pool:
pool.starmap(get_data_uf, to_download)
7 changes: 5 additions & 2 deletions src/03.silver/igdb/feature_store/fs_ingestao.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

date_start = dbutils.widgets.get('date_start') # parametro
date_stop = dbutils.widgets.get('date_stop') # parametro
monthly = dbutils.widgets.get('monthly') == 'True' # parametro
dates = dttools.date_range(date_start, date_stop, monthly=monthly)
period = 'monthly' if dbutils.widgets.get('monthly') == 'True' else 'daily'


# parametro
dates = dttools.date_range(date_start, date_stop, period=period)

query = dbtools.import_query(f'etl/{table}.sql')

Expand Down
7 changes: 5 additions & 2 deletions src/lib/dttools.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import datetime

def date_range(start, stop, monthly=False):
def date_range(start, stop, period='monthly'):
dt_start = datetime.datetime.strptime(start, '%Y-%m-%d')
dt_stop = datetime.datetime.strptime(stop, '%Y-%m-%d')
dates = []
Expand All @@ -9,7 +9,10 @@ def date_range(start, stop, monthly=False):
dates.append(dt_start.strftime("%Y-%m-%d"))
dt_start += datetime.timedelta(days=1)

if monthly:
if period=='monthly':
return [i for i in dates if i.endswith("01")]

elif period=='yearly':
return [i for i in dates if i.endswith("01") and i.split('-')[-2]=='01']

return dates

0 comments on commit e16149a

Please sign in to comment.