first pass at handling the new nhsn format for weekly data
elray1 committed Nov 15, 2024
1 parent 24e736a commit 106f185
Showing 1 changed file with 55 additions and 16 deletions.
71 changes: 55 additions & 16 deletions src/iddata/loader.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
 from itertools import product
 from urllib.parse import urljoin
 
+import datetime
 import numpy as np
 import pandas as pd
 import pymmwr
@@ -106,7 +107,7 @@ def load_one_us_census_file(self, f):
     def load_us_census(self, fillna = True):
         files = [
             self._construct_data_raw_url("us-census/nst-est2019-alldata.csv"),
-            self._construct_data_raw_url("us-census/NST-EST2022-ALLDATA.csv")]
+            self._construct_data_raw_url("us-census/NST-EST2023-ALLDATA.csv")]
         us_pops = pd.concat([self.load_one_us_census_file(f) for f in files], axis=0)
 
         fips_mappings = pd.read_csv(self._construct_data_raw_url("fips-mappings/fips_mappings.csv"))
@@ -128,7 +129,7 @@ def load_us_census(self, fillna = True):
 
         if fillna:
             all_locations = dat["location"].unique()
-            all_seasons = [str(y) + "/" + str(y+1)[-2:] for y in range(1997, 2024)]
+            all_seasons = [str(y) + "/" + str(y+1)[-2:] for y in range(1997, 2025)]
             full_result = pd.DataFrame.from_records(product(all_locations, all_seasons))
             full_result.columns = ["location", "season"]
             dat = full_result.merge(dat, how="left", on=["location", "season"]) \
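The season labels produced by that comprehension are strings like "1997/98", so bumping the range's end from 2024 to 2025 is what adds the new 2024/25 season to the filled-out location-by-season grid. A quick illustrative check (standalone, not part of the diff):

```python
# Illustrative check of the updated season-label range.
all_seasons = [str(y) + "/" + str(y+1)[-2:] for y in range(1997, 2025)]
print(all_seasons[0])   # "1997/98"
print(all_seasons[-1])  # "2024/25" -- the season this commit adds
```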
@@ -303,21 +304,26 @@ def load_ilinet(self,
 
 
     def load_nhsn(self, rates=True, drop_pandemic_seasons=True, as_of=None):
-        if drop_pandemic_seasons:
-            if as_of is None:
-                file_path = "influenza-hhs/hhs.csv"
-            else:
-                # find the largest stored file dated on or before the as_of date
-                as_of_file_path = f"influenza-hhs/hhs-{str(as_of)}.csv"
-                glob_results = s3fs.S3FileSystem(anon=True) \
-                    .glob("infectious-disease-data/data-raw/influenza-hhs/hhs-????-??-??.csv")
-                all_file_paths = sorted([f[len("infectious-disease-data/data-raw/"):] for f in glob_results])
-                all_file_paths = [f for f in all_file_paths if f <= as_of_file_path]
-                file_path = all_file_paths[-1]
+        if not drop_pandemic_seasons:
+            raise NotImplementedError("Functionality for loading all seasons of NHSN data with specified as_of date is not implemented.")
+
+        if as_of is None:
+            as_of = datetime.date.today().isoformat()
+
+        if as_of < '2024-11-15':
+            return self.load_nhsn_from_hhs(rates=rates, as_of=as_of)
         else:
-            if as_of is not None:
-                raise NotImplementedError("Functionality for loading all seasons of NHSN data with specified as_of date is not implemented.")
-            file_path = "influenza-hhs/hhs_complete.csv"
+            return self.load_nhsn_from_nhsn(rates=rates, as_of=as_of)
+
+
+    def load_nhsn_from_hhs(self, rates=True, as_of=None):
+        # find the largest stored file dated on or before the as_of date
+        as_of_file_path = f"influenza-hhs/hhs-{str(as_of)}.csv"
+        glob_results = s3fs.S3FileSystem(anon=True) \
+            .glob("infectious-disease-data/data-raw/influenza-hhs/hhs-????-??-??.csv")
+        all_file_paths = sorted([f[len("infectious-disease-data/data-raw/"):] for f in glob_results])
+        all_file_paths = [f for f in all_file_paths if f <= as_of_file_path]
+        file_path = all_file_paths[-1]
 
         dat = pd.read_csv(self._construct_data_raw_url(file_path))
         dat.rename(columns={"date": "wk_end_date"}, inplace=True)
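The rewritten load_nhsn leans on the fact that zero-padded ISO-8601 date strings sort lexicographically in chronological order, so a plain string comparison against the '2024-11-15' cutoff is enough to route older requests to the HHS snapshots and newer ones (including the default, today's date) to the NHSN files. A minimal sketch of that routing; the `loader` calls are hypothetical since the loader class's name isn't shown in this diff:

```python
import datetime

# ISO-8601 strings compare in date order, so the cutoff test is safe
# as long as as_of is a zero-padded "YYYY-MM-DD" string.
print("2024-11-09" < "2024-11-15")  # True
print("2024-12-01" < "2024-11-15")  # False

as_of = datetime.date.today().isoformat()  # the default the new code fills in

# Hypothetical calls against a loader instance:
# loader.load_nhsn(as_of="2024-11-09")  # routed to load_nhsn_from_hhs
# loader.load_nhsn()                    # as_of defaults to today -> load_nhsn_from_nhsn
```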
@@ -340,6 +346,39 @@ def load_nhsn(self, rates=True, drop_pandemic_seasons=True, as_of=None):
         return dat
 
 
+    def load_nhsn_from_nhsn(self, rates=True, as_of=None):
+        # find the largest stored file dated on or before the as_of date
+        as_of_file_path = f"influenza-nhsn/nhsn-{str(as_of)}.csv"
+        glob_results = s3fs.S3FileSystem(anon=True) \
+            .glob("infectious-disease-data/data-raw/influenza-nhsn/nhsn-????-??-??.csv")
+        all_file_paths = sorted([f[len("infectious-disease-data/data-raw/"):] for f in glob_results])
+        all_file_paths = [f for f in all_file_paths if f <= as_of_file_path]
+        file_path = all_file_paths[-1]
+
+        dat = pd.read_csv(self._construct_data_raw_url(file_path))
+        # Keeping Percent Hospitals Reporting field for now in case it's useful later.
+        dat = dat[['Geographic aggregation', 'Week Ending Date', 'Total Influenza Admissions', 'Percent Hospitals Reporting Influenza Admissions']]
+        dat.columns = ['abbreviation', 'wk_end_date', 'inc', 'pct_report']
+        fips_mappings = self.load_fips_mappings()
+        dat = dat.merge(fips_mappings, on=["abbreviation"], how="left")
+
+        ew_str = dat.apply(utils.date_to_ew_str, axis=1)
+        dat["season"] = utils.convert_epiweek_to_season(ew_str)
+        dat["season_week"] = utils.convert_epiweek_to_season_week(ew_str)
+        dat = dat.sort_values(by=["season", "season_week"])
+
+        if rates:
+            pops = self.load_us_census()
+            dat = dat.merge(pops, on = ["location", "season"], how="left") \
+                .assign(inc=lambda x: x["inc"] / x["pop"] * 100000)
+
+        dat["wk_end_date"] = pd.to_datetime(dat["wk_end_date"])
+
+        dat["agg_level"] = np.where(dat["location"] == "US", "national", "state")
+        dat = dat[["agg_level", "location", "season", "season_week", "wk_end_date", "inc"]]
+        dat["source"] = "nhsn"
+        return dat
+
     def load_agg_transform_ilinet(self, fips_mappings, **ilinet_kwargs):
         df_ilinet_full = self.load_ilinet(**ilinet_kwargs)
         # df_ilinet_full.loc[df_ilinet_full['inc'] < np.exp(-7), 'inc'] = np.exp(-7)
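Both loaders share the same snapshot-selection idiom: glob the dated files in the public S3 bucket, sort them, and take the last one at or before as_of, again relying on ISO dates sorting lexicographically. A standalone sketch of that pattern, assuming anonymous read access to the `infectious-disease-data` bucket as in the diff:

```python
import s3fs

# Pick the newest dated NHSN snapshot at or before an as_of date.
as_of = "2024-11-15"
fs = s3fs.S3FileSystem(anon=True)
paths = sorted(fs.glob("infectious-disease-data/data-raw/influenza-nhsn/nhsn-????-??-??.csv"))
eligible = [p for p in paths
            if p <= f"infectious-disease-data/data-raw/influenza-nhsn/nhsn-{as_of}.csv"]
latest = eligible[-1]  # IndexError here means no snapshot exists on or before as_of
```

Because every path shares the same prefix and embeds a zero-padded date, comparing whole path strings is equivalent to comparing the dates themselves.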

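One more note on the rates branch in load_nhsn_from_nhsn: dividing admissions by population and multiplying by 100,000 converts raw counts to weekly admissions per 100k residents. A toy example of the same pandas pattern, with made-up numbers and column names matching the diff:

```python
import pandas as pd

# Toy data: 500 admissions in a state of 5,000,000 people.
dat = pd.DataFrame({"location": ["06"], "inc": [500], "pop": [5_000_000]})
dat = dat.assign(inc=lambda x: x["inc"] / x["pop"] * 100000)
print(dat["inc"].iloc[0])  # 10.0 admissions per 100k
```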