diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 1168bee7a..580e668b5 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.4.7 +current_version = 0.4.8 commit = False tag = False diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 5070eae06..438a805fd 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -57,11 +57,13 @@ jobs: docker build -t delphi_web_epidata -f ./devops/Dockerfile . cd ../../../ + # MODULE_NAME specifies the location of the `app` variable, the actual WSGI application object to run. + # see https://github.com/tiangolo/meinheld-gunicorn-docker#module_name - name: Start services run: | docker network create --driver bridge delphi-net docker run --rm -d -p 13306:3306 --network delphi-net --name delphi_database_epidata --cap-add=sys_nice delphi_database_epidata - docker run --rm -d -p 10080:80 --env "SQLALCHEMY_DATABASE_URI=mysql+mysqldb://user:pass@delphi_database_epidata:3306/epidata" --env "FLASK_SECRET=abc" --env "FLASK_PREFIX=/epidata" --network delphi-net --name delphi_web_epidata delphi_web_epidata + docker run --rm -d -p 10080:80 --env "MODULE_NAME=delphi.epidata.server.main" --env "SQLALCHEMY_DATABASE_URI=mysql+mysqldb://user:pass@delphi_database_epidata:3306/epidata" --env "FLASK_SECRET=abc" --env "FLASK_PREFIX=/epidata" --network delphi-net --name delphi_web_epidata delphi_web_epidata docker ps - run: | diff --git a/.github/workflows/performance-tests.yml b/.github/workflows/performance-tests.yml index d515f77e0..596d0a348 100644 --- a/.github/workflows/performance-tests.yml +++ b/.github/workflows/performance-tests.yml @@ -71,7 +71,7 @@ jobs: run: | cd delphi-admin/load-testing/locust docker build -t locust . - export CSV=v4-requests-small.csv + export CSV=v4-requests-as_of.csv touch output_stats.csv && chmod 666 output_stats.csv touch output_stats_history.csv && chmod 666 output_stats_history.csv touch output_failures.csv && chmod 666 output_failures.csv diff --git a/deploy.json b/deploy.json index 425ddef6d..59d141ba4 100644 --- a/deploy.json +++ b/deploy.json @@ -17,6 +17,15 @@ "add-header-comment": true }, + "// common", + { + "type": "move", + "src": "src/common/", + "dst": "[[package]]/common/", + "match": "^.*\\.(py)$", + "add-header-comment": true + }, + "// server", { "type": "move", @@ -47,15 +56,6 @@ "add-header-comment": true }, - "// acquisition - common", - { - "type": "move", - "src": "src/acquisition/common/", - "dst": "[[package]]/acquisition/common/", - "match": "^.*\\.(py)$", - "add-header-comment": true - }, - "// acquisition - fluview", { "type": "move", diff --git a/dev/local/Makefile b/dev/local/Makefile index 75b10554c..55210d790 100644 --- a/dev/local/Makefile +++ b/dev/local/Makefile @@ -91,8 +91,11 @@ web: cd - @# Run the web server + @# MODULE_NAME specifies the location of the `app` variable, the actual WSGI application object to run. 
+ @# see https://github.com/tiangolo/meinheld-gunicorn-docker#module_name @docker run --rm -p 127.0.0.1:10080:80 \ $(M1) \ + --env "MODULE_NAME=delphi.epidata.server.main" \ --env "SQLALCHEMY_DATABASE_URI=$(sqlalchemy_uri)" \ --env "FLASK_SECRET=abc" --env "FLASK_PREFIX=/epidata" --env "LOG_DEBUG" \ --network delphi-net --name delphi_web_epidata \ diff --git a/dev/local/setup.cfg b/dev/local/setup.cfg index e43d90ade..990e8e890 100644 --- a/dev/local/setup.cfg +++ b/dev/local/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = Delphi Development -version = 0.4.7 +version = 0.4.8 [options] packages = diff --git a/devops/Dockerfile b/devops/Dockerfile index 158fb9589..97dc0e2c8 100644 --- a/devops/Dockerfile +++ b/devops/Dockerfile @@ -8,14 +8,16 @@ LABEL org.opencontainers.image.source=https://github.com/cmu-delphi/delphi-epida COPY ./devops/gunicorn_conf.py /app COPY ./devops/start_wrapper.sh / -COPY ./src/server/ /app/app/ -COPY --from=builder ./src/build/lib/ /app/app/lib/ +RUN mkdir -p /app/delphi/epidata +COPY ./src/server /app/delphi/epidata/server +COPY ./src/common /app/delphi/epidata/common +COPY --from=builder ./src/build/lib/ /app/delphi/epidata/lib/ COPY requirements.api.txt /app/requirements_also.txt RUN ln -s -f /usr/share/zoneinfo/America/New_York /etc/localtime \ - && rm -rf /app/app/__pycache__ /app/app/*.php \ - && chmod -R o+r /app/app \ + && rm -rf /app/delphi/epidata/__pycache__ \ + && chmod -R o+r /app/delphi/epidata \ && chmod 755 /start_wrapper.sh \ && pip install --no-cache-dir -r /tmp/requirements.txt -r requirements_also.txt # the file /tmp/requirements.txt is created in the parent docker definition. (see: diff --git a/docs/Gemfile.lock b/docs/Gemfile.lock index 88877f914..417d76219 100644 --- a/docs/Gemfile.lock +++ b/docs/Gemfile.lock @@ -1,11 +1,12 @@ GEM remote: https://rubygems.org/ specs: - activesupport (5.2.5) + activesupport (6.0.6.1) concurrent-ruby (~> 1.0, >= 1.0.2) i18n (>= 0.7, < 2) minitest (~> 5.1) tzinfo (~> 1.1) + zeitwerk (~> 2.2, >= 2.2.2) addressable (2.8.0) public_suffix (>= 2.0.2, < 5.0) coffee-script (2.4.1) @@ -15,7 +16,7 @@ GEM colorator (1.1.0) commonmarker (0.17.13) ruby-enum (~> 0.5) - concurrent-ruby (1.1.8) + concurrent-ruby (1.2.0) dnsruby (1.61.5) simpleidn (~> 0.1) em-websocket (0.5.2) @@ -209,7 +210,7 @@ GEM jekyll (>= 3.5, < 5.0) jekyll-feed (~> 0.9) jekyll-seo-tag (~> 2.1) - minitest (5.14.4) + minitest (5.17.0) multipart-post (2.1.1) nokogiri (1.13.10) mini_portile2 (~> 2.8.0) @@ -255,6 +256,7 @@ GEM unf_ext (0.0.7.7) unicode-display_width (1.7.0) wdm (0.1.1) + zeitwerk (2.6.6) PLATFORMS ruby diff --git a/docs/symptom-survey/publications.md b/docs/symptom-survey/publications.md index bff92fc4f..e015bbe4e 100644 --- a/docs/symptom-survey/publications.md +++ b/docs/symptom-survey/publications.md @@ -26,10 +26,14 @@ Pandemic"](https://www.pnas.org/topic/548) in *PNAS*: Research publications using the survey data include: +- Taube JC, Susswein Z, Bansal S (2023). [Spatiotemporal Trends in Self-Reported + Mask-Wearing Behavior in the United States: Analysis of a Large + Cross-sectional Survey](https://doi.org/10.2196/42128). *JMIR Public Health + and Surveillance* 9:e42128. - Rebecca L. Weintraub et al (2023). [Identifying COVID-19 Vaccine Deserts and Ways to Reduce Them: A Digital Tool to Support Public Health Decision-Making](https://doi.org/10.2105/AJPH.2022.307198). *American Journal - of Public Health*. + of Public Health* 113 (4), 363-367. - Anzalone AJ, Sun J, Vinson AJ, Beasley WH, Hillegass WB, Murray K, et al. (2023). 
[Community risks for SARS-CoV-2 infection among fully vaccinated US adults by rurality: A retrospective cohort study from the National COVID @@ -38,7 +42,7 @@ Research publications using the survey data include: - Rufino, J., Baquero, C., Frey, D. et al (2023). [Using survey data to estimate the impact of the omicron variant on vaccine efficacy against COVID-19 infection](https://doi.org/10.1038/s41598-023-27951-3). *Scientific Reports* - 13, 900 (2023). + 13, 900. - Rader, B., Astley, C.M., Sewalk, K. et al (2022). [Spatial modeling of vaccine deserts as barriers to controlling SARS-CoV-2](https://doi.org/10.1038/s43856-022-00183-8). *Communications diff --git a/integrations/acquisition/covidcast/delete_batch.csv b/integrations/acquisition/covidcast/delete_batch.csv index 5c1602218..14cf71e16 100644 --- a/integrations/acquisition/covidcast/delete_batch.csv +++ b/integrations/acquisition/covidcast/delete_batch.csv @@ -1,4 +1,4 @@ geo_id,value,stderr,sample_size,issue,time_value,geo_type,signal,source -d_nonlatest,0,0,0,1,0,county,sig,src -d_latest, 0,0,0,3,0,county,sig,src -d_justone, 0,0,0,1,0,county,sig,src \ No newline at end of file +d_nonlatest,0,0,0,1,0,msa,sig,src +d_latest, 0,0,0,3,0,msa,sig,src +d_justone, 0,0,0,1,0,msa,sig,src \ No newline at end of file diff --git a/integrations/acquisition/covidcast/test_csv_uploading.py b/integrations/acquisition/covidcast/test_csv_uploading.py index f975ecfa0..040eb5f1a 100644 --- a/integrations/acquisition/covidcast/test_csv_uploading.py +++ b/integrations/acquisition/covidcast/test_csv_uploading.py @@ -4,7 +4,7 @@ from datetime import date import os import unittest -from unittest.mock import MagicMock +import argparse # third party import mysql.connector @@ -14,7 +14,7 @@ # first party from delphi_utils import Nans from delphi.epidata.client.delphi_epidata import Epidata -from delphi.epidata.acquisition.covidcast.csv_to_database import main +from delphi.epidata.acquisition.covidcast.csv_to_database import main, get_argument_parser import delphi.operations.secrets as secrets # py3tester coverage target (equivalent to `import *`) @@ -92,16 +92,12 @@ def test_uploading(self): # make some fake data files data_dir = 'covid/data' - source_receiving_dir = data_dir + '/receiving/src-name' + indicator_dir = 'src-name' + source_receiving_dir = data_dir + '/receiving/' + indicator_dir log_file_directory = "/var/log/" os.makedirs(source_receiving_dir, exist_ok=True) os.makedirs(log_file_directory, exist_ok=True) - # TODO: use an actual argparse object for the args instead of a MagicMock - args = MagicMock( - log_file=log_file_directory + - "output.log", - data_dir=data_dir, - specific_issue_date=False) + args = get_argument_parser().parse_args(["--log_file", log_file_directory + "output.log", "--data_dir", data_dir, "--indicator_name", indicator_dir]) uploader_column_rename = {"geo_id": "geo_value", "val": "value", "se": "stderr", "missing_val": "missing_value", "missing_se": "missing_stderr"} diff --git a/integrations/client/test_delphi_epidata.py b/integrations/client/test_delphi_epidata.py index 82c1452ec..0c8c3e35d 100644 --- a/integrations/client/test_delphi_epidata.py +++ b/integrations/client/test_delphi_epidata.py @@ -12,11 +12,10 @@ # third party import delphi.operations.secrets as secrets from delphi.epidata.acquisition.covidcast.covidcast_meta_cache_updater import main as update_covidcast_meta_cache -from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase, CovidcastTestRow +from 
delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase, CovidcastTestRow, FIPS, MSA from delphi.epidata.client.delphi_epidata import Epidata from delphi_utils import Nans - # py3tester coverage target __test_target__ = 'delphi.epidata.client.delphi_epidata' # all the Nans we use here are just one value, so this is a shortcut to it: @@ -219,10 +218,10 @@ def test_geo_value(self): # insert placeholder data: three counties, three MSAs N = 3 rows = [ - CovidcastTestRow.make_default_row(geo_type="county", geo_value=str(i)*5, value=i) + CovidcastTestRow.make_default_row(geo_type="fips", geo_value=FIPS[i], value=i) for i in range(N) ] + [ - CovidcastTestRow.make_default_row(geo_type="msa", geo_value=str(i)*5, value=i*10) + CovidcastTestRow.make_default_row(geo_type="msa", geo_value=MSA[i], value=i*10) for i in range(N) ] self._insert_rows(rows) @@ -241,26 +240,28 @@ def fetch(geo): self.assertEqual(request['message'], 'success') self.assertEqual(request['epidata'], counties) # test fetch a specific region - request = fetch('11111') + request = fetch([FIPS[0]]) self.assertEqual(request['message'], 'success') - self.assertEqual(request['epidata'], [counties[1]]) + self.assertEqual(request['epidata'], [counties[0]]) # test fetch a specific yet not existing region request = fetch('55555') - self.assertEqual(request['message'], 'no results') + self.assertEqual(request['message'], 'Invalid geo_value(s) 55555 for the requested geo_type fips') # test fetch a multiple regions - request = fetch(['11111', '22222']) + request = fetch([FIPS[0], FIPS[1]]) self.assertEqual(request['message'], 'success') - self.assertEqual(request['epidata'], [counties[1], counties[2]]) + self.assertEqual(request['epidata'], [counties[0], counties[1]]) # test fetch a multiple regions in another variant - request = fetch(['00000', '22222']) + request = fetch([FIPS[0], FIPS[2]]) self.assertEqual(request['message'], 'success') self.assertEqual(request['epidata'], [counties[0], counties[2]]) # test fetch a multiple regions but one is not existing - request = fetch(['11111', '55555']) - self.assertEqual(request['message'], 'success') - self.assertEqual(request['epidata'], [counties[1]]) + request = fetch([FIPS[0], '55555']) + self.assertEqual(request['message'], 'Invalid geo_value(s) 55555 for the requested geo_type fips') # test fetch a multiple regions but specify no region request = fetch([]) + self.assertEqual(request['message'], 'geo_value is empty for the requested geo_type fips!') + # test fetch a region with no results + request = fetch([FIPS[3]]) self.assertEqual(request['message'], 'no results') def test_covidcast_meta(self): @@ -325,10 +326,10 @@ def test_async_epidata(self): # insert placeholder data: three counties, three MSAs N = 3 rows = [ - CovidcastTestRow.make_default_row(geo_type="county", geo_value=str(i)*5, value=i) + CovidcastTestRow.make_default_row(geo_type="fips", geo_value=FIPS[i-1], value=i) for i in range(N) ] + [ - CovidcastTestRow.make_default_row(geo_type="msa", geo_value=str(i)*5, value=i*10) + CovidcastTestRow.make_default_row(geo_type="msa", geo_value=MSA[i-1], value=i*10) for i in range(N) ] self._insert_rows(rows) diff --git a/integrations/server/test_covidcast.py b/integrations/server/test_covidcast.py index 5a8df96f0..01d81bf29 100644 --- a/integrations/server/test_covidcast.py +++ b/integrations/server/test_covidcast.py @@ -10,7 +10,7 @@ # first party from delphi_utils import Nans -from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase, CovidcastTestRow +from 
delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase, CovidcastTestRow, FIPS, MSA from delphi.epidata.client.delphi_epidata import Epidata # use the local instance of the Epidata API @@ -37,11 +37,10 @@ def _insert_placeholder_set_one(self): def _insert_placeholder_set_two(self): rows = [ - CovidcastTestRow.make_default_row(geo_type='county', geo_value=str(i)*5, value=i*1., stderr=i*10., sample_size=i*100.) + CovidcastTestRow.make_default_row(geo_type='msa', geo_value=MSA[i-1], value=i*1., stderr=i*10., sample_size=i*100.) for i in [1, 2, 3] ] + [ - # geo value intended to overlap with counties above - CovidcastTestRow.make_default_row(geo_type='msa', geo_value=str(i-3)*5, value=i*1., stderr=i*10., sample_size=i*100.) + CovidcastTestRow.make_default_row(geo_type='fips', geo_value=FIPS[i-4], value=i*1., stderr=i*10., sample_size=i*100.) for i in [4, 5, 6] ] self._insert_rows(rows) @@ -49,11 +48,11 @@ def _insert_placeholder_set_two(self): def _insert_placeholder_set_three(self): rows = [ - CovidcastTestRow.make_default_row(geo_type='county', geo_value='11111', time_value=2000_01_01+i, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03, lag=2-i) + CovidcastTestRow.make_default_row(time_value=2000_01_01+i, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03, lag=2-i) for i in [1, 2, 3] ] + [ - # time value intended to overlap with 11111 above, with disjoint geo values - CovidcastTestRow.make_default_row(geo_type='county', geo_value=str(i)*5, time_value=2000_01_01+i-3, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03, lag=5-i) + # time value intended to overlap with the time values above, with disjoint geo values + CovidcastTestRow.make_default_row(geo_value=MSA[i-3], time_value=2000_01_01+i-3, value=i*1., stderr=i*10., sample_size=i*100., issue=2000_01_03, lag=5-i) for i in [4, 5, 6] ] self._insert_rows(rows) @@ -295,7 +294,7 @@ def test_signal_wildcard(self): }) def test_geo_value(self): - """test different variants of geo types: single, *, multi.""" + """test whether geo values are valid for specific geo types""" # insert placeholder data rows = self._insert_placeholder_set_two() @@ -308,26 +307,28 @@ def fetch(geo_value): return response # test fetch a specific region - r = fetch('11111') + r = fetch(MSA[0]) self.assertEqual(r['message'], 'success') self.assertEqual(r['epidata'], expected[0:1]) # test fetch a specific yet not existing region - r = fetch('55555') - self.assertEqual(r['message'], 'no results') + r = fetch('11111') + self.assertEqual(r['message'], 'Invalid geo_value(s) 11111 for the requested geo_type msa') # test fetch multiple regions - r = fetch('11111,22222') + r = fetch(f'{MSA[0]},{MSA[1]}') self.assertEqual(r['message'], 'success') self.assertEqual(r['epidata'], expected[0:2]) # test fetch multiple noncontiguous regions - r = fetch('11111,33333') + r = fetch(f'{MSA[0]},{MSA[2]}') self.assertEqual(r['message'], 'success') self.assertEqual(r['epidata'], [expected[0], expected[2]]) # test fetch multiple regions but one is not existing - r = fetch('11111,55555') - self.assertEqual(r['message'], 'success') - self.assertEqual(r['epidata'], expected[0:1]) + r = fetch(f'{MSA[0]},11111') + self.assertEqual(r['message'], 'Invalid geo_value(s) 11111 for the requested geo_type msa') # test fetch empty region r = fetch('') + self.assertEqual(r['message'], 'geo_value is empty for the requested geo_type msa!') + # test a region that has no results + r = fetch(MSA[3]) self.assertEqual(r['message'], 'no results') def 
test_location_timeline(self): diff --git a/integrations/server/test_covidcast_endpoints.py b/integrations/server/test_covidcast_endpoints.py index 41d942456..1f7e7ade5 100644 --- a/integrations/server/test_covidcast_endpoints.py +++ b/integrations/server/test_covidcast_endpoints.py @@ -182,32 +182,6 @@ def match_row(trend, row): self.assertEqual(trend["max_value"], first.value) self.assertEqual(trend["max_trend"], "decreasing") - def test_correlation(self): - """Request a signal from the /correlation endpoint.""" - - num_rows = 30 - reference_rows = [CovidcastTestRow.make_default_row(signal="ref", time_value=20200401 + i, value=i) for i in range(num_rows)] - first = reference_rows[0] - self._insert_rows(reference_rows) - other_rows = [CovidcastTestRow.make_default_row(signal="other", time_value=20200401 + i, value=i) for i in range(num_rows)] - other = other_rows[0] - self._insert_rows(other_rows) - max_lag = 3 - - out = self._fetch("/correlation", reference=first.signal_pair(), others=other.signal_pair(), geo=first.geo_pair(), window="20200401-20201212", lag=max_lag) - self.assertEqual(out["result"], 1) - df = pd.DataFrame(out["epidata"]) - self.assertEqual(len(df), max_lag * 2 + 1) # -...0...+ - self.assertEqual(df["geo_type"].unique().tolist(), [first.geo_type]) - self.assertEqual(df["geo_value"].unique().tolist(), [first.geo_value]) - self.assertEqual(df["signal_source"].unique().tolist(), [other.source]) - self.assertEqual(df["signal_signal"].unique().tolist(), [other.signal]) - - self.assertEqual(df["lag"].tolist(), list(range(-max_lag, max_lag + 1))) - self.assertEqual(df["r2"].unique().tolist(), [1.0]) - self.assertEqual(df["slope"].unique().tolist(), [1.0]) - self.assertEqual(df["intercept"].tolist(), [3.0, 2.0, 1.0, 0.0, -1.0, -2.0, -3.0]) - self.assertEqual(df["samples"].tolist(), [num_rows - abs(l) for l in range(-max_lag, max_lag + 1)]) def test_csv(self): """Request a signal from the /csv endpoint.""" diff --git a/requirements.api.txt b/requirements.api.txt index 6ccafc1e1..fe6a5b213 100644 --- a/requirements.api.txt +++ b/requirements.api.txt @@ -1,3 +1,4 @@ +delphi_utils==0.3.6 epiweeks==2.1.2 Flask==2.2.2 itsdangerous<2.1 @@ -13,4 +14,4 @@ SQLAlchemy==1.4.40 structlog==22.1.0 tenacity==7.0.0 typing-extensions -werkzeug==2.2.2 +werkzeug==2.2.3 diff --git a/src/acquisition/common/logger.py b/src/acquisition/common/logger.py deleted file mode 100644 index ad3b3679f..000000000 --- a/src/acquisition/common/logger.py +++ /dev/null @@ -1,106 +0,0 @@ -"""Structured logger utility for creating JSON logs in Delphi pipelines.""" -import logging -import os -import sys -import threading -import structlog - -def handle_exceptions(logger): - """Handle exceptions using the provided logger.""" - def exception_handler(etype, value, traceback): - logger.exception("Top-level exception occurred", - exc_info=(etype, value, traceback)) - - def multithread_exception_handler(args): - exception_handler(args.exc_type, args.exc_value, args.exc_traceback) - - sys.excepthook = exception_handler - threading.excepthook = multithread_exception_handler - - -def get_structured_logger(name=__name__, - filename=None, - log_exceptions=True): - """Create a new structlog logger. - - Use the logger returned from this in indicator code using the standard - wrapper calls, e.g.: - - logger = get_structured_logger(__name__) - logger.warning("Error", type="Signal too low"). - - The output will be rendered as JSON which can easily be consumed by logs - processors. - - See the structlog documentation for details. 
- - Parameters - --------- - name: Name to use for logger (included in log lines), __name__ from caller - is a good choice. - filename: An (optional) file to write log output. - """ - # Configure the underlying logging configuration - handlers = [logging.StreamHandler()] - if filename: - handlers.append(logging.FileHandler(filename)) - - if "LOG_DEBUG" in os.environ: - log_level = logging.DEBUG - else: - log_level = logging.INFO - - logging.basicConfig( - format="%(message)s", - level=log_level, - handlers=handlers - ) - - def add_pid(_logger, _method_name, event_dict): - """ - Add current PID to the event dict. - """ - event_dict["pid"] = os.getpid() - return event_dict - - # Configure structlog. This uses many of the standard suggestions from - # the structlog documentation. - structlog.configure( - processors=[ - # Filter out log levels we are not tracking. - structlog.stdlib.filter_by_level, - # Include logger name in output. - structlog.stdlib.add_logger_name, - # Include log level in output. - structlog.stdlib.add_log_level, - # Include PID in output. - add_pid, - # Allow formatting into arguments e.g., logger.info("Hello, %s", - # name) - structlog.stdlib.PositionalArgumentsFormatter(), - # Add timestamps. - structlog.processors.TimeStamper(fmt="iso"), - # Match support for exception logging in the standard logger. - structlog.processors.StackInfoRenderer(), - structlog.processors.format_exc_info, - # Decode unicode characters - structlog.processors.UnicodeDecoder(), - # Render as JSON - structlog.processors.JSONRenderer() - ], - # Use a dict class for keeping track of data. - context_class=dict, - # Use a standard logger for the actual log call. - logger_factory=structlog.stdlib.LoggerFactory(), - # Use a standard wrapper class for utilities like log.warning() - wrapper_class=structlog.stdlib.BoundLogger, - # Cache the logger - cache_logger_on_first_use=True, - ) - - logger = structlog.get_logger(name) - - if log_exceptions: - handle_exceptions(logger) - - return logger diff --git a/src/acquisition/covid_hosp/common/database.py b/src/acquisition/covid_hosp/common/database.py index ed308e7a0..4fd0981a1 100644 --- a/src/acquisition/covid_hosp/common/database.py +++ b/src/acquisition/covid_hosp/common/database.py @@ -11,7 +11,7 @@ # first party import delphi.operations.secrets as secrets -from delphi.epidata.acquisition.common.logger import get_structured_logger +from delphi.epidata.common.logger import get_structured_logger Columndef = namedtuple("Columndef", "csv_name sql_name dtype") diff --git a/src/acquisition/covid_hosp/common/utils.py b/src/acquisition/covid_hosp/common/utils.py index fcf956f66..5f718ad69 100644 --- a/src/acquisition/covid_hosp/common/utils.py +++ b/src/acquisition/covid_hosp/common/utils.py @@ -160,7 +160,8 @@ def merge_by_key_cols(dfs, key_cols, logger=False): ## repeated concatenation in pandas is expensive, but (1) we don't expect ## batch sizes to be terribly large (7 files max) and (2) this way we can ## more easily capture the next iteration's updates to any new keys - new_rows = df.loc[[i for i in df.index.to_list() if i not in result.index.to_list()]] + result_index_set = set(result.index.to_list()) + new_rows = df.loc[[i for i in df.index.to_list() if i not in result_index_set]] result = pd.concat([result, new_rows]) # convert the index rows back to columns diff --git a/src/acquisition/covidcast/covidcast_meta_cache_updater.py b/src/acquisition/covidcast/covidcast_meta_cache_updater.py index b4eff0d08..c5f7fe3e8 100644 --- 
a/src/acquisition/covidcast/covidcast_meta_cache_updater.py +++ b/src/acquisition/covidcast/covidcast_meta_cache_updater.py @@ -7,7 +7,7 @@ # first party from delphi.epidata.acquisition.covidcast.database import Database -from delphi.epidata.acquisition.common.logger import get_structured_logger +from delphi.epidata.common.logger import get_structured_logger from delphi.epidata.client.delphi_epidata import Epidata def get_argument_parser(): diff --git a/src/acquisition/covidcast/csv_importer.py b/src/acquisition/covidcast/csv_importer.py index 3eaec7d2a..f6122e610 100644 --- a/src/acquisition/covidcast/csv_importer.py +++ b/src/acquisition/covidcast/csv_importer.py @@ -15,8 +15,8 @@ # first party from delphi_utils import Nans from delphi.utils.epiweek import delta_epiweeks -from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow -from delphi.epidata.acquisition.common.logger import get_structured_logger +from delphi.epidata.common.covidcast_row import CovidcastRow +from delphi.epidata.common.logger import get_structured_logger DataFrameRow = NamedTuple('DFRow', [ ('geo_id', str), @@ -133,10 +133,11 @@ def find_issue_specific_csv_files(scan_dir): @staticmethod - def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today()))): + def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today())), indicator_dir= "*"): """Recursively search for and yield covidcast-format CSV files. scan_dir: the directory to scan (recursively) + indicator_dir: specify one indicator with .csv files inside The return value is a tuple of (path, details), where, if the path was valid, details is a tuple of (source, signal, time_type, geo_type, @@ -149,7 +150,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() issue_value=-1 lag_value=-1 - for path in sorted(glob(os.path.join(scan_dir, '*', '*'))): + for path in sorted(glob(os.path.join(scan_dir, indicator_dir, '*'))): # safe to ignore this file if not path.lower().endswith('.csv'): continue diff --git a/src/acquisition/covidcast/csv_to_database.py b/src/acquisition/covidcast/csv_to_database.py index 90270cb27..be9dad86c 100644 --- a/src/acquisition/covidcast/csv_to_database.py +++ b/src/acquisition/covidcast/csv_to_database.py @@ -11,7 +11,7 @@ from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter, PathDetails from delphi.epidata.acquisition.covidcast.database import Database, DBLoadStateException from delphi.epidata.acquisition.covidcast.file_archiver import FileArchiver -from delphi.epidata.acquisition.common.logger import get_structured_logger +from delphi.epidata.common.logger import get_structured_logger def get_argument_parser(): @@ -28,16 +28,21 @@ def get_argument_parser(): parser.add_argument( '--log_file', help="filename for log output (defaults to stdout)") + parser.add_argument( + '--indicator_name', + nargs='?', + default='*', + help='Name of one indicator directory to run acquisition on') return parser -def collect_files(data_dir: str, specific_issue_date: bool): +def collect_files(data_dir: str, specific_issue_date: bool, indicator_name="*"): """Fetch path and data profile details for each file to upload.""" logger= get_structured_logger('collect_files') if specific_issue_date: results = list(CsvImporter.find_issue_specific_csv_files(data_dir)) else: - results = list(CsvImporter.find_csv_files(os.path.join(data_dir, 'receiving'))) + results = list(CsvImporter.find_csv_files(os.path.join(data_dir, 'receiving'), indicator_dir=indicator_name)) 
logger.info(f'found {len(results)} files') return results @@ -146,9 +151,8 @@ def main(args): logger = get_structured_logger("csv_ingestion", filename=args.log_file) start_time = time.time() - # shortcut escape without hitting db if nothing to do - path_details = collect_files(args.data_dir, args.specific_issue_date) + path_details=collect_files(args.data_dir, args.specific_issue_date, indicator_name = args.indicator_name) if not path_details: logger.info('nothing to do; exiting...') return diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 347c85841..31dd3f77e 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -14,8 +14,8 @@ # first party import delphi.operations.secrets as secrets -from delphi.epidata.acquisition.common.logger import get_structured_logger -from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow +from delphi.epidata.common.logger import get_structured_logger +from delphi.epidata.common.covidcast_row import CovidcastRow class DBLoadStateException(Exception): diff --git a/src/acquisition/covidcast/delete_batch.py b/src/acquisition/covidcast/delete_batch.py index ae6ddc487..31a25ef2a 100644 --- a/src/acquisition/covidcast/delete_batch.py +++ b/src/acquisition/covidcast/delete_batch.py @@ -8,7 +8,7 @@ # first party from delphi.epidata.acquisition.covidcast.database import Database -from delphi.epidata.acquisition.common.logger import get_structured_logger +from delphi.epidata.common.logger import get_structured_logger def get_argument_parser(): diff --git a/src/acquisition/covidcast/file_archiver.py b/src/acquisition/covidcast/file_archiver.py index 368677133..802590871 100644 --- a/src/acquisition/covidcast/file_archiver.py +++ b/src/acquisition/covidcast/file_archiver.py @@ -6,7 +6,7 @@ import shutil # first party -from delphi.epidata.acquisition.common.logger import get_structured_logger +from delphi.epidata.common.logger import get_structured_logger class FileArchiver: """Archives files by moving and compressing.""" diff --git a/src/acquisition/covidcast/signal_dash_data_generator.py b/src/acquisition/covidcast/signal_dash_data_generator.py index 431dae9fd..6eea06579 100644 --- a/src/acquisition/covidcast/signal_dash_data_generator.py +++ b/src/acquisition/covidcast/signal_dash_data_generator.py @@ -15,7 +15,7 @@ # first party import covidcast import delphi.operations.secrets as secrets -from delphi.epidata.acquisition.common.logger import get_structured_logger +from delphi.epidata.common.logger import get_structured_logger LOOKBACK_DAYS_FOR_COVERAGE = 56 diff --git a/src/acquisition/covidcast/test_utils.py b/src/acquisition/covidcast/test_utils.py index 96db2c164..6e77aba22 100644 --- a/src/acquisition/covidcast/test_utils.py +++ b/src/acquisition/covidcast/test_utils.py @@ -6,7 +6,7 @@ import pandas as pd from delphi_utils import Nans -from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow +from delphi.epidata.common.covidcast_row import CovidcastRow from delphi.epidata.acquisition.covidcast.database import Database from delphi.epidata.server.utils.dates import day_to_time_value, time_value_to_day import delphi.operations.secrets as secrets @@ -14,6 +14,11 @@ # all the Nans we use here are just one value, so this is a shortcut to it: nmv = Nans.NOT_MISSING.value +# TODO replace these real geo_values with fake values, and use patch and mock to mock the return values of +# delphi_utils.geomap.GeoMapper().get_geo_values(geo_type) in 
parse_geo_sets() of _params.py + +FIPS = ['04019', '19143', '29063', '36083'] # Example list of valid FIPS codes as strings +MSA = ['40660', '44180', '48620', '49420'] # Example list of valid MSA codes as strings class CovidcastTestRow(CovidcastRow): @staticmethod @@ -22,9 +27,9 @@ def make_default_row(**kwargs) -> "CovidcastTestRow": "source": "src", "signal": "sig", "time_type": "day", - "geo_type": "county", + "geo_type": "msa", "time_value": 2020_02_02, - "geo_value": "01234", + "geo_value": MSA[0], "value": 10.0, "stderr": 10.0, "sample_size": 10.0, diff --git a/src/client/delphi_epidata.R b/src/client/delphi_epidata.R index 01f75068d..5af428bba 100644 --- a/src/client/delphi_epidata.R +++ b/src/client/delphi_epidata.R @@ -15,7 +15,7 @@ Epidata <- (function() { # API base url BASE_URL <- 'https://delphi.cmu.edu/epidata/api.php' - client_version <- '0.4.7' + client_version <- '0.4.8' # Helper function to cast values and/or ranges to strings .listitem <- function(value) { diff --git a/src/client/delphi_epidata.js b/src/client/delphi_epidata.js index 588ab7eb3..da6566f9c 100644 --- a/src/client/delphi_epidata.js +++ b/src/client/delphi_epidata.js @@ -22,7 +22,7 @@ } })(this, function (exports, fetchImpl, jQuery) { const BASE_URL = "https://delphi.cmu.edu/epidata/"; - const client_version = "0.4.7"; + const client_version = "0.4.8"; // Helper function to cast values and/or ranges to strings function _listitem(value) { diff --git a/src/client/packaging/npm/package-lock.json b/src/client/packaging/npm/package-lock.json index b6aec99d9..243a3a46c 100644 --- a/src/client/packaging/npm/package-lock.json +++ b/src/client/packaging/npm/package-lock.json @@ -1,12 +1,12 @@ { "name": "delphi_epidata", - "version": "0.3.14", + "version": "0.4.4", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "delphi_epidata", - "version": "0.3.14", + "version": "0.4.4", "license": "MIT", "dependencies": { "cross-fetch": "^3.1.4" @@ -2896,13 +2896,10 @@ } }, "node_modules/json5": { - "version": "2.2.0", - "resolved": "https://registry.npmjs.org/json5/-/json5-2.2.0.tgz", - "integrity": "sha512-f+8cldu7X/y7RAJurMEJmdoKXGB/X550w2Nr3tTbezL6RwEE/iMcm+tZnXeoZtKuOq6ft8+CqzEkrIgx1fPoQA==", + "version": "2.2.3", + "resolved": "https://registry.npmjs.org/json5/-/json5-2.2.3.tgz", + "integrity": "sha512-XmOWe7eyHYH14cLdVPoyg+GOH3rYX++KpzrylJwSW98t3Nk+U8XOl8FWKOgwtzdb8lXGf6zYwDUzeHMWfxasyg==", "dev": true, - "dependencies": { - "minimist": "^1.2.5" - }, "bin": { "json5": "lib/cli.js" }, @@ -6304,13 +6301,10 @@ "dev": true }, "json5": { - "version": "2.2.0", - "resolved": "https://registry.npmjs.org/json5/-/json5-2.2.0.tgz", - "integrity": "sha512-f+8cldu7X/y7RAJurMEJmdoKXGB/X550w2Nr3tTbezL6RwEE/iMcm+tZnXeoZtKuOq6ft8+CqzEkrIgx1fPoQA==", - "dev": true, - "requires": { - "minimist": "^1.2.5" - } + "version": "2.2.3", + "resolved": "https://registry.npmjs.org/json5/-/json5-2.2.3.tgz", + "integrity": "sha512-XmOWe7eyHYH14cLdVPoyg+GOH3rYX++KpzrylJwSW98t3Nk+U8XOl8FWKOgwtzdb8lXGf6zYwDUzeHMWfxasyg==", + "dev": true }, "kleur": { "version": "3.0.3", diff --git a/src/client/packaging/npm/package.json b/src/client/packaging/npm/package.json index 1c1c31b58..84c7664a7 100644 --- a/src/client/packaging/npm/package.json +++ b/src/client/packaging/npm/package.json @@ -2,7 +2,7 @@ "name": "delphi_epidata", "description": "Delphi Epidata API Client", "authors": "Delphi Group", - "version": "0.4.7", + "version": "0.4.8", "license": "MIT", "homepage": "https://github.com/cmu-delphi/delphi-epidata", "bugs": { diff --git 
a/src/client/packaging/pypi/delphi_epidata/__init__.py b/src/client/packaging/pypi/delphi_epidata/__init__.py index d92fb10d4..c5fffe5f5 100644 --- a/src/client/packaging/pypi/delphi_epidata/__init__.py +++ b/src/client/packaging/pypi/delphi_epidata/__init__.py @@ -1,4 +1,4 @@ from .delphi_epidata import Epidata name = 'delphi_epidata' -__version__ = '0.4.7' +__version__ = '0.4.8' diff --git a/src/client/packaging/pypi/setup.py b/src/client/packaging/pypi/setup.py index 557784c6a..ff86f08f3 100644 --- a/src/client/packaging/pypi/setup.py +++ b/src/client/packaging/pypi/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="delphi_epidata", - version="0.4.7", + version="0.4.8", author="David Farrow", author_email="dfarrow0@gmail.com", description="A programmatic interface to Delphi's Epidata API.", diff --git a/src/acquisition/covidcast/covidcast_row.py b/src/common/covidcast_row.py similarity index 100% rename from src/acquisition/covidcast/covidcast_row.py rename to src/common/covidcast_row.py diff --git a/src/server/utils/logger.py b/src/common/logger.py similarity index 100% rename from src/server/utils/logger.py rename to src/common/logger.py diff --git a/src/server/_common.py b/src/server/_common.py index d8e2bc068..2d2d3059f 100644 --- a/src/server/_common.py +++ b/src/server/_common.py @@ -6,7 +6,7 @@ from sqlalchemy.engine import Connection, Engine from werkzeug.local import LocalProxy -from .utils.logger import get_structured_logger +from delphi.epidata.common.logger import get_structured_logger from ._config import SECRET, SQLALCHEMY_DATABASE_URI, SQLALCHEMY_ENGINE_OPTIONS from ._exceptions import DatabaseErrorException, EpiDataException diff --git a/src/server/_config.py b/src/server/_config.py index 618407f75..b85a2eef5 100644 --- a/src/server/_config.py +++ b/src/server/_config.py @@ -4,7 +4,7 @@ load_dotenv() -VERSION = "0.4.7" +VERSION = "0.4.8" MAX_RESULTS = int(10e6) MAX_COMPATIBILITY_RESULTS = int(3650) diff --git a/src/server/_params.py b/src/server/_params.py index d0b1cda6d..41f5ce494 100644 --- a/src/server/_params.py +++ b/src/server/_params.py @@ -2,6 +2,7 @@ import re from dataclasses import dataclass from typing import List, Optional, Sequence, Tuple, Union +import delphi_utils from flask import request @@ -53,6 +54,17 @@ class GeoSet: geo_type: str geo_values: Union[bool, Sequence[str]] + def __init__(self, geo_type: str, geo_values: Union[bool, Sequence[str]]): + if not isinstance(geo_values, bool): + if geo_values == ['']: + raise ValidationFailedException(f"geo_value is empty for the requested geo_type {geo_type}!") + allowed_values = delphi_utils.geomap.GeoMapper().get_geo_values(geo_type) + invalid_values = set(geo_values) - set(allowed_values) + if invalid_values: + raise ValidationFailedException(f"Invalid geo_value(s) {', '.join(invalid_values)} for the requested geo_type {geo_type}") + self.geo_type = geo_type + self.geo_values = geo_values + def matches(self, geo_type: str, geo_value: str) -> bool: return self.geo_type == geo_type and (self.geo_values is True or (not isinstance(self.geo_values, bool) and geo_value in self.geo_values)) @@ -460,6 +472,7 @@ def parse_source_signal_sets() -> List[SourceSignalSet]: def parse_geo_sets() -> List[GeoSet]: geo_type = request.values.get("geo_type") + if geo_type: # old version require_any(request, "geo_value", "geo_values", empty=True) diff --git a/src/server/_printer.py b/src/server/_printer.py index 52f959968..162ba2e36 100644 --- a/src/server/_printer.py +++ b/src/server/_printer.py @@ -8,7 +8,7 @@ from ._config 
import MAX_RESULTS, MAX_COMPATIBILITY_RESULTS from ._common import is_compatibility_mode -from .utils.logger import get_structured_logger +from delphi.epidata.common.logger import get_structured_logger def print_non_standard(format: str, data): diff --git a/src/server/endpoints/covidcast.py b/src/server/endpoints/covidcast.py index 09b3d3740..c1350b490 100644 --- a/src/server/endpoints/covidcast.py +++ b/src/server/endpoints/covidcast.py @@ -32,7 +32,7 @@ from .._printer import create_printer, CSVPrinter from .._validate import require_all from .._pandas import as_pandas, print_pandas -from .covidcast_utils import compute_trend, compute_trends, compute_correlations, compute_trend_value, CovidcastMetaEntry +from .covidcast_utils import compute_trend, compute_trends, compute_trend_value, CovidcastMetaEntry from ..utils import shift_day_value, day_to_time_value, time_value_to_iso, time_value_to_day, shift_week_value, time_value_to_week, guess_time_value_is_day, week_to_time_value, TimeValues from .covidcast_utils.model import TimeType, count_signal_time_types, data_sources, create_source_signal_alias_mapper @@ -206,79 +206,6 @@ def gen(rows): return p(filter_fields(gen(r))) -@bp.route("/correlation", methods=("GET", "POST")) -def handle_correlation(): - require_all(request, "reference", "window", "others", "geo") - reference = parse_single_source_signal_arg("reference") - other_sets = parse_source_signal_arg("others") - daily_signals, weekly_signals = count_signal_time_types(other_sets + [reference]) - source_signal_sets, alias_mapper = create_source_signal_alias_mapper(other_sets + [reference]) - geo_sets = parse_geo_arg() - time_window = parse_day_or_week_range_arg("window") - is_day = time_window.is_day - _verify_argument_time_type_matches(is_day, daily_signals, weekly_signals) - - lag = extract_integer("lag") - if lag is None: - lag = 28 - - # `lag` above is used in post-processing, not in the database query, so we can use latest here - q = QueryBuilder(latest_table, "t") - - fields_string = ["geo_type", "geo_value", "source", "signal"] - fields_int = ["time_value"] - fields_float = ["value"] - q.set_fields(fields_string, fields_int, fields_float) - q.set_sort_order("geo_type", "geo_value", "source", "signal", "time_value") - - q.apply_source_signal_filters( - "source", - "signal", - source_signal_sets, - ) - q.apply_geo_filters("geo_type", "geo_value", geo_sets) - q.apply_time_filter("time_type", "time_value", time_window) - - df = as_pandas(str(q), q.params) - if is_day: - df["time_value"] = to_datetime(df["time_value"], format="%Y%m%d") - else: - # week but convert to date for simpler shifting - df["time_value"] = to_datetime(df["time_value"].apply(lambda v: time_value_to_week(v).startdate())) - - p = create_printer(request.values.get("format")) - - def prepare_data_frame(df): - return df[["time_value", "value"]].set_index("time_value") - - def gen(): - by_geo = df.groupby(["geo_type", "geo_value"]) - for (geo_type, geo_value), group in by_geo: - # group by source, signal - by_signal = group.groupby(["source", "signal"]) - - # find reference group - # dataframe structure: index=time_value, value=value - reference_group = next((prepare_data_frame(group) for (source, signal), group in by_signal if source == reference.source and signal == reference.signal[0]), None) - - if reference_group is None or reference_group.empty: - continue # no data for reference - - # dataframe structure: index=time_value, value=value - other_groups = [((source, signal), prepare_data_frame(group)) for (source, 
signal), group in by_signal if not (source == reference.source and signal == reference.signal[0])] - if not other_groups: - continue # no other signals - - for (source, signal), other_group in other_groups: - if alias_mapper: - source = alias_mapper(source, signal) - for cor in compute_correlations(geo_type, geo_value, source, signal, lag, reference_group, other_group, is_day): - yield cor.asdict() - - # now use a generator for sending the rows and execute all the other queries - return p(filter_fields(gen())) - - @bp.route("/csv", methods=("GET", "POST")) def handle_export(): source, signal = request.values.get("signal", "jhu-csse:confirmed_incidence_num").split(":") diff --git a/src/server/endpoints/covidcast_meta.py b/src/server/endpoints/covidcast_meta.py index 92c78017f..d10de18db 100644 --- a/src/server/endpoints/covidcast_meta.py +++ b/src/server/endpoints/covidcast_meta.py @@ -8,7 +8,7 @@ from .._params import extract_strings from .._printer import create_printer from .._query import filter_fields -from ..utils.logger import get_structured_logger +from delphi.epidata.common.logger import get_structured_logger bp = Blueprint("covidcast_meta", __name__) diff --git a/src/server/endpoints/covidcast_utils/__init__.py b/src/server/endpoints/covidcast_utils/__init__.py index 965b62cf0..c36fc55a2 100644 --- a/src/server/endpoints/covidcast_utils/__init__.py +++ b/src/server/endpoints/covidcast_utils/__init__.py @@ -1,3 +1,2 @@ from .trend import compute_trend, compute_trend_value, compute_trends -from .correlation import compute_correlations from .meta import CovidcastMetaEntry \ No newline at end of file diff --git a/src/server/endpoints/covidcast_utils/correlation.py b/src/server/endpoints/covidcast_utils/correlation.py deleted file mode 100644 index 3f8268b25..000000000 --- a/src/server/endpoints/covidcast_utils/correlation.py +++ /dev/null @@ -1,93 +0,0 @@ -from dataclasses import dataclass, asdict -from typing import Iterable -from scipy.stats import linregress -import pandas as pd - - -@dataclass -class CorrelationResult: - geo_type: str - geo_value: str - signal_source: str - signal_signal: str - - lag: int - r2: float - - slope: float - """ - y = slope * x + intercept - """ - intercept: float - """ - y = slope * x + intercept - """ - samples: int - """ - number of dates used for the regression line - """ - - def asdict(self): - return asdict(self) - - -@dataclass -class Correlation: - r2: float - - slope: float - """ - y = slope * x + intercept - """ - intercept: float - """ - y = slope * x + intercept - """ - samples: int - """ - number of dates used for the regression line - """ - - -def lag_join(lag: int, x: pd.DataFrame, y: pd.DataFrame, is_day = True) -> pd.DataFrame: - # x_t_i ~ y_t_(i-lag) - # aka x_t_(i+lag) ~ y_t_i - - if lag == 0: - x_shifted = x - y_shifted = y - elif lag > 0: - # x_t_i ~ y_shifted_t_i - # shift y such that y_t(i - lag) -> y_shifted_t_i - x_shifted = x - y_shifted = y.shift(lag, freq="D" if is_day else 'W') - else: # lag < 0 - # x_shifted_t_i ~ y_t_i - # shift x such that x_t(i+lag) -> x_shifted_t_i - # lag < 0 -> - - lag = + lag - x_shifted = x.shift(-lag, freq="D" if is_day else 'W') - y_shifted = y - # inner join to remove invalid pairs - r = x_shifted.join(y_shifted, how="inner", lsuffix="_x", rsuffix="_y") - return r.rename(columns=dict(value_x="x", value_y="y")) - - -def compute_correlations(geo_type: str, geo_value: str, signal_source: str, signal_signal: str, lag: int, x: pd.DataFrame, y: pd.DataFrame, is_day = True) -> 
Iterable[CorrelationResult]: - """ - x,y ... DataFrame with "time_value" (Date) index and "value" (float) column - """ - for current_lag in range(-lag, lag + 1): - xy = lag_join(current_lag, x, y, is_day) - c = compute_correlation(xy) - - yield CorrelationResult(geo_type, geo_value, signal_source, signal_signal, current_lag, r2=c.r2, intercept=c.intercept, slope=c.slope, samples=c.samples) - - -def compute_correlation(xy: pd.DataFrame) -> Correlation: - if len(xy) < 2: - # won't be a useful one - return Correlation(0, 0, 0, len(xy)) - - model = linregress(xy.to_numpy()) - r2 = float(model.rvalue) ** 2 - return Correlation(r2, float(model.slope), float(model.intercept), len(xy)) diff --git a/src/server/utils/dates.py b/src/server/utils/dates.py index 126f79383..e810e2146 100644 --- a/src/server/utils/dates.py +++ b/src/server/utils/dates.py @@ -10,7 +10,7 @@ from epiweeks import Week, Year from typing_extensions import TypeAlias -from .logger import get_structured_logger +from delphi.epidata.common.logger import get_structured_logger # Alias for a sequence of date ranges (int, int) or date integers IntRange: TypeAlias = Union[Tuple[int, int], int] diff --git a/tests/acquisition/covidcast/test_covidcast_row.py b/tests/common/test_covidcast_row.py similarity index 91% rename from tests/acquisition/covidcast/test_covidcast_row.py rename to tests/common/test_covidcast_row.py index 9462fd4ed..834a7852d 100644 --- a/tests/acquisition/covidcast/test_covidcast_row.py +++ b/tests/common/test_covidcast_row.py @@ -4,7 +4,7 @@ from pandas.testing import assert_frame_equal from delphi_utils.nancodes import Nans -from delphi.epidata.acquisition.covidcast.covidcast_row import CovidcastRow, set_df_dtypes +from delphi.epidata.common.covidcast_row import CovidcastRow, set_df_dtypes from delphi.epidata.acquisition.covidcast.test_utils import ( CovidcastTestRow, covidcast_rows_as_api_compatibility_row_df, @@ -12,9 +12,10 @@ covidcast_rows_from_args, transpose_dict, ) +from delphi.epidata.acquisition.covidcast.test_utils import MSA # py3tester coverage target (equivalent to `import *`) -__test_target__ = 'delphi.epidata.acquisition.covidcast.covidcast_row' +__test_target__ = 'delphi.epidata.common.covidcast_row' class TestCovidcastRows(unittest.TestCase): @@ -22,9 +23,9 @@ class TestCovidcastRows(unittest.TestCase): "source": ["src"] * 10, "signal": ["sig_base"] * 5 + ["sig_other"] * 5, "time_type": ["day"] * 10, - "geo_type": ["county"] * 10, + "geo_type": ["msa"] * 10, "time_value": [2021_05_01 + i for i in range(5)] * 2, - "geo_value": ["01234"] * 10, + "geo_value": [MSA[0]] * 10, "value": range(10), "stderr": [10.0] * 10, "sample_size": [10.0] * 10, diff --git a/tests/server/endpoints/covidcast_utils/test_correlation.py b/tests/server/endpoints/covidcast_utils/test_correlation.py deleted file mode 100644 index 82b79ed2d..000000000 --- a/tests/server/endpoints/covidcast_utils/test_correlation.py +++ /dev/null @@ -1,69 +0,0 @@ -from typing import Tuple -import unittest -import pandas as pd - -from delphi.epidata.server.endpoints.covidcast_utils.correlation import CorrelationResult, lag_join, compute_correlations, compute_correlation, Correlation - - -def as_df(*tuples: Tuple[int, float]) -> pd.DataFrame: - df = pd.DataFrame.from_records(tuples, columns=["time_value", "value"]) - df["time_value"] = pd.to_datetime(df["time_value"], format="%Y%m%d") - return df.set_index("time_value") - - -def as_xy_df(*tuples: Tuple[float, float]) -> pd.DataFrame: - return pd.DataFrame.from_records(tuples, columns=["x", "y"]) - 
- -class UnitTests(unittest.TestCase): - def test_lag_join(self): - with self.subTest("all data"): - x = as_df((20201010, 1), (20201011, 2), (20201012, 3)) - y = as_df((20201010, 11), (20201011, 12), (20201012, 13)) - with self.subTest("lag = 0"): - # xxx - # yyy - self.assertEqual(lag_join(0, x, y).to_dict("records"), [dict(x=1, y=11), dict(x=2, y=12), dict(x=3, y=13)]) - with self.subTest("lag = 1"): - # xxx - # yyy - self.assertEqual(lag_join(1, x, y).to_dict("records"), [dict(x=2, y=11), dict(x=3, y=12)]) - with self.subTest("lag = -1"): - # xxx - # yyy - self.assertEqual(lag_join(-1, x, y).to_dict("records"), [dict(x=1, y=12), dict(x=2, y=13)]) - with self.subTest("missing entry"): - x = as_df((20201010, 1), (20201011, 2), (20201012, 3), (20201013, 4)) - y = as_df((20201010, 11), (20201012, 13), (20201013, 14)) - with self.subTest("lag = 0"): - # xxxx - # y yy - self.assertEqual(lag_join(0, x, y).to_dict("records"), [dict(x=1, y=11), dict(x=3, y=13), dict(x=4, y=14)]) - with self.subTest("lag = 1"): - # xxxx - # y yy - self.assertEqual(lag_join(1, x, y).to_dict("records"), [dict(x=2, y=11), dict(x=4, y=13)]) - with self.subTest("lag = -1"): - # xxxx - # y yy - self.assertEqual(lag_join(-1, x, y).to_dict("records"), [dict(x=2, y=13), dict(x=3, y=14)]) - - def test_compute_correlation(self): - with self.subTest("simple"): - xy = as_xy_df((1, 11), (2, 12), (3, 13)) - self.assertEqual(compute_correlation(xy), Correlation(r2=1, intercept=10, slope=1, samples=3)) - with self.subTest("inverted"): - xy = as_xy_df((1, 13), (2, 12), (3, 11)) - self.assertEqual(compute_correlation(xy), Correlation(r2=1, intercept=14, slope=-1, samples=3)) - with self.subTest("none"): - xy = as_xy_df((1, 0), (2, 0), (3, 0)) - self.assertEqual(compute_correlation(xy), Correlation(r2=0, intercept=0, slope=0, samples=3)) - - def test_compute_correlations(self): - x = as_df((20201010, 1), (20201011, 2), (20201012, 3)) - y = as_df((20201010, 11), (20201011, 12), (20201012, 13)) - - r = list(compute_correlations("gt", "gv", "so", "si", 2, x, y)) - self.assertEqual(len(r), 5) - # lag 0 - self.assertEqual(r[2], CorrelationResult("gt", "gv", "so", "si", 0, r2=1, intercept=10, slope=1, samples=3)) \ No newline at end of file diff --git a/tests/server/test_pandas.py b/tests/server/test_pandas.py index 12a9c18cd..e1c3ee8fe 100644 --- a/tests/server/test_pandas.py +++ b/tests/server/test_pandas.py @@ -25,7 +25,7 @@ def setUp(self): @patch("delphi.epidata.server._pandas.text") @patch("pandas.read_sql_query") def test_as_pandas(self, mock_read_sql_query, mock_sqlalch_text): - with app.test_request_context('/correlation'): + with app.test_request_context('covidcast/'): mock_sqlalch_text.return_value = sentinel.default_limit as_pandas("", params=None, db_engine=None) diff --git a/tests/server/test_params.py b/tests/server/test_params.py index 177ff5cba..1a401efe2 100644 --- a/tests/server/test_params.py +++ b/tests/server/test_params.py @@ -28,6 +28,7 @@ from delphi.epidata.server._exceptions import ( ValidationFailedException, ) +from delphi.epidata.acquisition.covidcast.test_utils import FIPS, MSA # py3tester coverage target __test_target__ = "delphi.epidata.server._params" @@ -45,19 +46,19 @@ def setUp(self): def test_geo_set(self): with self.subTest("*"): - p = GeoSet("hrr", True) - self.assertTrue(p.matches("hrr", "any")) + p = GeoSet("fips", True) + self.assertTrue(p.matches("fips", "any")) self.assertFalse(p.matches("msa", "any")) with self.subTest("subset"): - p = GeoSet("hrr", ["a", "b"]) - 
self.assertTrue(p.matches("hrr", "a")) - self.assertTrue(p.matches("hrr", "b")) - self.assertFalse(p.matches("hrr", "c")) + p = GeoSet("fips", [FIPS[0], FIPS[1]]) + self.assertTrue(p.matches("fips", FIPS[0])) + self.assertTrue(p.matches("fips", FIPS[1])) + self.assertFalse(p.matches("fips", "c")) self.assertFalse(p.matches("msa", "any")) with self.subTest("count"): self.assertEqual(GeoSet("a", True).count(), inf) self.assertEqual(GeoSet("a", False).count(), 0) - self.assertEqual(GeoSet("a", ["a", "b"]).count(), 2) + self.assertEqual(GeoSet("fips", [FIPS[0], FIPS[1]]).count(), 2) def test_source_signal_set(self): with self.subTest("*"): @@ -89,43 +90,43 @@ def test_parse_geo_arg(self): with app.test_request_context("/"): self.assertEqual(parse_geo_arg(), []) with self.subTest("single"): - with app.test_request_context("/?geo=state:*"): - self.assertEqual(parse_geo_arg(), [GeoSet("state", True)]) - with app.test_request_context("/?geo=state:AK"): - self.assertEqual(parse_geo_arg(), [GeoSet("state", ["ak"])]) + with app.test_request_context("/?geo=fips:*"): + self.assertEqual(parse_geo_arg(), [GeoSet("fips", True)]) + with app.test_request_context(f"/?geo=fips:{FIPS[0]}"): + self.assertEqual(parse_geo_arg(), [GeoSet("fips", [FIPS[0]])]) with self.subTest("single list"): - with app.test_request_context("/?geo=state:AK,TK"): - self.assertEqual(parse_geo_arg(), [GeoSet("state", ["ak", "tk"])]) + with app.test_request_context(f"/?geo=fips:{FIPS[0]},{FIPS[1]}"): + self.assertEqual(parse_geo_arg(), [GeoSet("fips", [FIPS[0], FIPS[1]])]) with self.subTest("multi"): - with app.test_request_context("/?geo=state:*;nation:*"): - self.assertEqual(parse_geo_arg(), [GeoSet("state", True), GeoSet("nation", True)]) - with app.test_request_context("/?geo=state:AK;nation:US"): + with app.test_request_context("/?geo=fips:*;msa:*"): + self.assertEqual(parse_geo_arg(), [GeoSet("fips", True), GeoSet("msa", True)]) + with app.test_request_context(f"/?geo=fips:{FIPS[0]};msa:{MSA[0]}"): self.assertEqual( parse_geo_arg(), - [GeoSet("state", ["ak"]), GeoSet("nation", ["us"])], + [GeoSet("fips", [FIPS[0]]), GeoSet("msa", [MSA[0]])], ) - with app.test_request_context("/?geo=state:AK;state:KY"): + with app.test_request_context(f"/?geo=fips:{FIPS[0]};fips:{FIPS[1]}"): self.assertEqual( parse_geo_arg(), - [GeoSet("state", ["ak"]), GeoSet("state", ["ky"])], + [GeoSet("fips", [FIPS[0]]), GeoSet("fips", [FIPS[1]])], ) with self.subTest("multi list"): - with app.test_request_context("/?geo=state:AK,TK;county:42003,40556"): + with app.test_request_context(f"/?geo=fips:{FIPS[0]},{FIPS[1]};msa:{MSA[0]},{MSA[1]}"): self.assertEqual( parse_geo_arg(), [ - GeoSet("state", ["ak", "tk"]), - GeoSet("county", ["42003", "40556"]), + GeoSet("fips", [FIPS[0], FIPS[1]]), + GeoSet("msa", [MSA[0], MSA[1]]), ], ) with self.subTest("hybrid"): - with app.test_request_context("/?geo=nation:*;state:PA;county:42003,42002"): + with app.test_request_context(f"/?geo=nation:*;fips:{FIPS[0]};msa:{MSA[0]},{MSA[1]}"): self.assertEqual( parse_geo_arg(), [ GeoSet("nation", True), - GeoSet("state", ["pa"]), - GeoSet("county", ["42003", "42002"]), + GeoSet("fips", [FIPS[0]]), + GeoSet("msa", [MSA[0], MSA[1]]), ], ) @@ -140,10 +141,10 @@ def test_single_parse_geo_arg(self): with app.test_request_context("/"): self.assertRaises(ValidationFailedException, parse_single_geo_arg, "geo") with self.subTest("single"): - with app.test_request_context("/?geo=state:AK"): - self.assertEqual(parse_single_geo_arg("geo"), GeoSet("state", ["ak"])) + with 
app.test_request_context(f"/?geo=fips:{FIPS[0]}"): + self.assertEqual(parse_single_geo_arg("geo"), GeoSet("fips", [FIPS[0]])) with self.subTest("single list"): - with app.test_request_context("/?geo=state:AK,TK"): + with app.test_request_context(f"/?geo=fips:{FIPS[0]},{FIPS[1]}"): self.assertRaises(ValidationFailedException, parse_single_geo_arg, "geo") with self.subTest("multi"): with app.test_request_context("/?geo=state:*;nation:*"): diff --git a/tests/server/test_query.py b/tests/server/test_query.py index 53aca5621..ec07d3e8b 100644 --- a/tests/server/test_query.py +++ b/tests/server/test_query.py @@ -21,6 +21,7 @@ TimeSet, SourceSignalSet, ) +from delphi.epidata.acquisition.covidcast.test_utils import FIPS, MSA # py3tester coverage target __test_target__ = "delphi.epidata.server._query" @@ -145,17 +146,17 @@ def test_filter_geo_sets(self): with self.subTest("single"): params = {} self.assertEqual( - filter_geo_sets("t", "v", [GeoSet("state", ["KY"])], "p", params), + filter_geo_sets("t", "v", [GeoSet("fips", [FIPS[0]])], "p", params), "((t = :p_0t AND (v = :p_0t_0)))", ) - self.assertEqual(params, {"p_0t": "state", "p_0t_0": "KY"}) + self.assertEqual(params, {"p_0t": "fips", "p_0t_0": FIPS[0]}) with self.subTest("multi"): params = {} self.assertEqual( - filter_geo_sets("t", "v", [GeoSet("state", ["KY", "AK"])], "p", params), + filter_geo_sets("t", "v", [GeoSet("fips", [FIPS[0], FIPS[1]])], "p", params), "((t = :p_0t AND (v = :p_0t_0 OR v = :p_0t_1)))", ) - self.assertEqual(params, {"p_0t": "state", "p_0t_0": "KY", "p_0t_1": "AK"}) + self.assertEqual(params, {"p_0t": "fips", "p_0t_0": FIPS[0], "p_0t_1": FIPS[1]}) with self.subTest("multiple pairs"): params = {} self.assertEqual( @@ -175,7 +176,7 @@ def test_filter_geo_sets(self): filter_geo_sets( "t", "v", - [GeoSet("state", ["AK"]), GeoSet("nation", ["US"])], + [GeoSet("fips", [FIPS[0]]), GeoSet("msa", [MSA[0]])], "p", params, ), @@ -183,7 +184,7 @@ def test_filter_geo_sets(self): ) self.assertEqual( params, - {"p_0t": "state", "p_0t_0": "AK", "p_1t": "nation", "p_1t_0": "US"}, + {"p_0t": "fips", "p_0t_0": FIPS[0], "p_1t": "msa", "p_1t_0": MSA[0]}, ) def test_filter_source_signal_sets(self):
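Not part of the diff above: a minimal, standalone sketch of the geo_value validation this release adds to GeoSet.__init__ in src/server/_params.py, which the updated integration tests expect (explicit error messages instead of "no results" for unknown geo values). It assumes only what the diff itself references: that delphi_utils is installed and that delphi_utils.geomap.GeoMapper().get_geo_values(geo_type) returns the known codes for a geo type; ValueError stands in for the server's ValidationFailedException.

import delphi_utils

def validate_geo_values(geo_type, geo_values):
    # Mirrors the check added to GeoSet.__init__: empty input and unknown codes are
    # rejected up front rather than producing an empty "no results" response.
    if geo_values == ['']:
        raise ValueError(f"geo_value is empty for the requested geo_type {geo_type}!")
    allowed_values = delphi_utils.geomap.GeoMapper().get_geo_values(geo_type)
    invalid_values = set(geo_values) - set(allowed_values)
    if invalid_values:
        raise ValueError(f"Invalid geo_value(s) {', '.join(invalid_values)} for the requested geo_type {geo_type}")

# For example, validate_geo_values("msa", ["11111"]) raises with the same
# "Invalid geo_value(s) 11111 for the requested geo_type msa" message asserted in
# integrations/server/test_covidcast.py, while valid MSA codes pass through.

The same release also lets covidcast acquisition be scoped to a single indicator directory via the new --indicator_name argument to csv_to_database (defaulting to '*', i.e. all subdirectories of receiving/).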