Merged (changes from 14 commits)
39 changes: 29 additions & 10 deletions nhsn/delphi_nhsn/constants.py
@@ -6,29 +6,48 @@
 PRELIM_DATASET_ID = "mpgq-jmmr"
 
 # column name from socrata
-TOTAL_ADMISSION_COVID_API = "totalconfc19newadm"
-TOTAL_ADMISSION_FLU_API = "totalconfflunewadm"
+TOTAL_ADMISSION_COVID_COL = "totalconfc19newadm"
+TOTAL_ADMISSION_FLU_COL = "totalconfflunewadm"
+NUM_HOSP_REPORTING_COVID_COL = "totalconfc19newadmhosprep"
+NUM_HOSP_REPORTING_FLU_COL = "totalconfflunewadmhosprep"
+
+# signal name
+TOTAL_ADMISSION_COVID = "confirmed_admissions_covid_ew"
+TOTAL_ADMISSION_FLU = "confirmed_admissions_flu_ew"
+NUM_HOSP_REPORTING_COVID = "hosprep_confirmed_admissions_covid_ew"
+NUM_HOSP_REPORTING_FLU = "hosprep_confirmed_admissions_flu_ew"
 
 SIGNALS_MAP = {
-    "confirmed_admissions_covid_ew": TOTAL_ADMISSION_COVID_API,
-    "confirmed_admissions_flu_ew": TOTAL_ADMISSION_FLU_API,
+    TOTAL_ADMISSION_COVID: TOTAL_ADMISSION_COVID_COL,
+    TOTAL_ADMISSION_FLU: TOTAL_ADMISSION_FLU_COL,
+    NUM_HOSP_REPORTING_COVID: NUM_HOSP_REPORTING_COVID_COL,
+    NUM_HOSP_REPORTING_FLU: NUM_HOSP_REPORTING_FLU_COL,
 }
 
 TYPE_DICT = {
     "timestamp": "datetime64[ns]",
     "geo_id": str,
-    "confirmed_admissions_covid_ew": float,
-    "confirmed_admissions_flu_ew": float,
+    TOTAL_ADMISSION_COVID: float,
+    TOTAL_ADMISSION_FLU: float,
+    NUM_HOSP_REPORTING_COVID: float,
+    NUM_HOSP_REPORTING_FLU: float,
 }
 
 # signal mapping for secondary, preliminary source
 # made copy in case things would diverge
 
 PRELIM_SIGNALS_MAP = {
-    "confirmed_admissions_covid_ew_prelim": TOTAL_ADMISSION_COVID_API,
-    "confirmed_admissions_flu_ew_prelim": TOTAL_ADMISSION_FLU_API,
+    f"{TOTAL_ADMISSION_COVID}_prelim": TOTAL_ADMISSION_COVID_COL,
+    f"{TOTAL_ADMISSION_FLU}_prelim": TOTAL_ADMISSION_FLU_COL,
+    f"{NUM_HOSP_REPORTING_COVID}_prelim": NUM_HOSP_REPORTING_COVID_COL,
+    f"{NUM_HOSP_REPORTING_FLU}_prelim": NUM_HOSP_REPORTING_FLU_COL,
 }
 
 PRELIM_TYPE_DICT = {
     "timestamp": "datetime64[ns]",
     "geo_id": str,
-    "confirmed_admissions_covid_ew_prelim": float,
-    "confirmed_admissions_flu_ew_prelim": float,
+    f"{TOTAL_ADMISSION_COVID}_prelim": float,
+    f"{TOTAL_ADMISSION_FLU}_prelim": float,
+    f"{NUM_HOSP_REPORTING_COVID}_prelim": float,
+    f"{NUM_HOSP_REPORTING_FLU}_prelim": float,
 }
109 changes: 91 additions & 18 deletions nhsn/delphi_nhsn/pull.py
@@ -1,30 +1,87 @@
 # -*- coding: utf-8 -*-
 """Functions for pulling NSSP ER data."""
 import logging
+import random
+import time
+from datetime import datetime, timedelta
 from pathlib import Path
 from typing import Optional
+from urllib.error import HTTPError
 
 import pandas as pd
+from delphi_epidata import Epidata
 from delphi_utils import create_backup_csv
+from epiweeks import Week
 from sodapy import Socrata
 
 from .constants import MAIN_DATASET_ID, PRELIM_DATASET_ID, PRELIM_SIGNALS_MAP, PRELIM_TYPE_DICT, SIGNALS_MAP, TYPE_DICT
 
 
-def pull_data(socrata_token: str, dataset_id: str):
+def get_latest_nhsn_issue_date() -> datetime:
+    """Check the API and return the approximate latest issue date available."""
+    response = Epidata.covidcast_meta()
+    meta_df = pd.DataFrame(response["epidata"])
+    nhsn_df = meta_df[meta_df["data_source"] == "nhsn"]
+    last_updated = datetime.utcfromtimestamp(min(nhsn_df.last_update))
+    max_issue = Week.fromstring(str(min(nhsn_df.max_issue)))
+    # last_updated can be multiple days off from max_issue due to patching / other processes
+    if max_issue.startdate() <= last_updated.date() <= max_issue.enddate():
+        return last_updated
+    # default to the start of the epiweek if last_updated is vastly different from max_issue
+    return datetime.combine(max_issue.startdate(), datetime.min.time())
+
+
+def check_last_updated(client, dataset_id, logger):
+    """Check the last-updated timestamp to determine whether data should be pulled."""
+    # retry logic for 503 errors
+    try:
+        response = client.get_metadata(dataset_id)
+    except HTTPError as err:
+        if err.code == 503:
+            time.sleep(2 + random.randint(0, 1000) / 1000.0)
+            response = client.get_metadata(dataset_id)
+        else:
+            raise err
+
+    updated_timestamp = datetime.utcfromtimestamp(int(response["rowsUpdatedAt"]))
+    now = datetime.utcnow()
+    recently_updated_source = (now - updated_timestamp) < timedelta(days=1)
+
+    latest_issue_date = get_latest_nhsn_issue_date()
+    # generally expect one issue per week
+    recently_updated_api = (updated_timestamp - latest_issue_date) < timedelta(days=6)
+
+    prelim_prefix = "Preliminary " if dataset_id == PRELIM_DATASET_ID else ""
+    if recently_updated_source:
+        logger.info(f"{prelim_prefix}NHSN data was recently updated; Pulling data", updated_timestamp=updated_timestamp)
+    elif not recently_updated_api:
+        logger.info(
+            f"{prelim_prefix}NHSN data is missing issue; Pulling data",
+            updated_timestamp=updated_timestamp,
+            issue=latest_issue_date,
+        )
+    else:
+        logger.info(f"{prelim_prefix}NHSN data is stale; Skipping", updated_timestamp=updated_timestamp)
+    return recently_updated_source or not recently_updated_api
+
+
+def pull_data(socrata_token: str, dataset_id: str, logger):
     """Pull data from Socrata API."""
     client = Socrata("data.cdc.gov", socrata_token)
-    results = []
-    offset = 0
-    limit = 50000  # maximum limit allowed by SODA 2.0
-    while True:
-        page = client.get(dataset_id, limit=limit, offset=offset)
-        if not page:
-            break  # exit the loop if no more results
-        results.extend(page)
-        offset += limit
-
-    df = pd.DataFrame.from_records(results)
+    recently_updated = check_last_updated(client, dataset_id, logger)
+    df = pd.DataFrame()
+    if recently_updated:
+        results = []
+        offset = 0
+        limit = 50000  # maximum limit allowed by SODA 2.0
+        while True:
+            page = client.get(dataset_id, limit=limit, offset=offset)
+            if not page:
+                break  # exit the loop if no more results
+            results.extend(page)
+            offset += limit
+
+        df = pd.DataFrame.from_records(results)
     return df
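For context, a short usage sketch of the gated pull: when check_last_updated decides there is nothing fresh, pull_data now returns an empty DataFrame, so callers can skip processing cheaply. This is a sketch under assumptions; the token value is a placeholder, and the log message is illustrative:

```python
from delphi_utils import get_structured_logger

logger = get_structured_logger(__name__)
# returns an empty DataFrame when no fresh issue is available
df = pull_data("MY_SOCRATA_APP_TOKEN", MAIN_DATASET_ID, logger)
if df.empty:
    logger.info("no new NHSN data; skipping processing")
```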


@@ -89,7 +146,7 @@ def pull_nhsn_data(
"""
     # Pull data from Socrata API
     df = (
-        pull_data(socrata_token, dataset_id=MAIN_DATASET_ID)
+        pull_data(socrata_token, MAIN_DATASET_ID, logger)
         if not custom_run
         else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=False)
     )
@@ -102,12 +159,20 @@ def pull_nhsn_data(
         df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})
 
         for signal, col_name in SIGNALS_MAP.items():
-            df[signal] = df[col_name]
+            # older backups don't have certain columns
+            try:
+                df[signal] = df[col_name]
+            except KeyError:
+                logger.info("column not available in data", col_name=col_name)
+                keep_columns.remove(signal)
 
         df = df[keep_columns]
         df["geo_id"] = df["geo_id"].str.lower()
         df.loc[df["geo_id"] == "usa", "geo_id"] = "us"
-        df = df.astype(TYPE_DICT)
+        try:
+            df = df.astype(TYPE_DICT)
+        except KeyError:
Review comment (Contributor):

Why does this just pass?

Reply (@aysim319, author, Feb 7, 2025):

The idea was that some of the older data didn't have newer signals (rsv, reporting hospitals) in the source backup; it would log, but resume the patching process.

Previously I tried modifying TYPE_DICT, but being mutable caused some issues in patching runs. So this was the next solution... is it a good one? Ehhh... I log that the signal is unavailable earlier (line 150) and I thought I shouldn't log basically the same message twice.

Since you brought it up... I should also check if the rest of the columns actually changed data types and maybe look into a less janky way.

+            pass
     else:
         df = pd.DataFrame(columns=keep_columns)
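A less janky shape for this, hinted at in the reply above, is to cast only the columns that are actually present rather than catching the KeyError. A minimal sketch, assuming TYPE_DICT keeps its current form; the helper name cast_available_columns is hypothetical:

```python
import pandas as pd

from .constants import TYPE_DICT


def cast_available_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Apply TYPE_DICT only to the columns present in df.

    Sidesteps the KeyError that df.astype(TYPE_DICT) raises when older
    backups lack newer signal columns, without mutating the shared
    TYPE_DICT constant (which caused issues in patching runs).
    """
    available = {col: dtype for col, dtype in TYPE_DICT.items() if col in df.columns}
    return df.astype(available)
```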

@@ -144,8 +209,9 @@ def pull_preliminary_nhsn_data(
     pd.DataFrame
         Dataframe as described above.
     """
     # Pull data from Socrata API
Review comment (Contributor):

issue: pull_preliminary_nhsn_data and pull_nhsn_data are really similar. I think it will become a maintenance issue to keep both. We should probably keep these two functions as wrappers of a shared fn that takes an is_prelim flag (or similar).

Diff of the two fns: [screenshot: Screen Shot 2025-02-11 at 15 20 55]

Reply (@aysim319, author, Feb 11, 2025):

I know they're similar; I thought about it and went back and forth, but figured something different might come up in the future, so I kept them separate. I'm not too concerned about this, since we'll be slowly deprecating this codebase.
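For illustration, a rough sketch of the wrapper shape suggested above. The parameter list is assumed from the calls visible in this diff, and _pull_nhsn_data_impl / is_prelim are hypothetical names; pull_data and pull_data_from_file are the existing helpers in this module:

```python
from .constants import MAIN_DATASET_ID, PRELIM_DATASET_ID, PRELIM_SIGNALS_MAP, SIGNALS_MAP


def _pull_nhsn_data_impl(socrata_token, backup_dir, custom_run, issue_date, logger, is_prelim):
    """Shared body; the two public functions become thin wrappers."""
    dataset_id = PRELIM_DATASET_ID if is_prelim else MAIN_DATASET_ID
    signals_map = PRELIM_SIGNALS_MAP if is_prelim else SIGNALS_MAP
    df = (
        pull_data(socrata_token, dataset_id, logger)
        if not custom_run
        else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=is_prelim)
    )
    # ... shared rename / keep_columns / astype handling, parameterized on signals_map ...
    return df


def pull_nhsn_data(socrata_token, backup_dir, custom_run, issue_date, logger):
    return _pull_nhsn_data_impl(socrata_token, backup_dir, custom_run, issue_date, logger, is_prelim=False)


def pull_preliminary_nhsn_data(socrata_token, backup_dir, custom_run, issue_date, logger):
    return _pull_nhsn_data_impl(socrata_token, backup_dir, custom_run, issue_date, logger, is_prelim=True)
```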

     df = (
-        pull_data(socrata_token, dataset_id=PRELIM_DATASET_ID)
+        pull_data(socrata_token, PRELIM_DATASET_ID, logger)
         if not custom_run
         else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=True)
     )
@@ -158,10 +224,17 @@
         df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})
 
         for signal, col_name in PRELIM_SIGNALS_MAP.items():
-            df[signal] = df[col_name]
+            try:
+                df[signal] = df[col_name]
+            except KeyError:
+                logger.info("column not available in data", col_name=col_name, signal=signal)
+                keep_columns.remove(signal)
 
         df = df[keep_columns]
-        df = df.astype(PRELIM_TYPE_DICT)
+        try:
+            df = df.astype(PRELIM_TYPE_DICT)
+        except KeyError:
+            pass
         df["geo_id"] = df["geo_id"].str.lower()
         df.loc[df["geo_id"] == "usa", "geo_id"] = "us"
     else:
24 changes: 19 additions & 5 deletions nhsn/delphi_nhsn/run.py
@@ -14,8 +14,10 @@
       unpublished signals are. See `delphi_utils.add_prefix()`
     - Any other indicator-specific settings
 """
+import re
 import time
 from datetime import date, datetime, timedelta
+from itertools import product
 
 import numpy as np
 from delphi_utils import GeoMapper, get_structured_logger
@@ -59,16 +61,20 @@ def run_module(params, logger=None):
     )
 
     geo_mapper = GeoMapper()
-    signal_df_dict = {signal: nhsn_df for signal in SIGNALS_MAP}
-    # some of the source backups do not include preliminary data TODO remove after first patch
+    signal_df_dict = dict()
+    if not nhsn_df.empty:
+        signal_df_dict.update({signal: nhsn_df for signal in SIGNALS_MAP})
+    # some of the source backups do not include preliminary data
     if not preliminary_nhsn_df.empty:
         signal_df_dict.update({signal: preliminary_nhsn_df for signal in PRELIM_SIGNALS_MAP})
 
-    for signal, df_pull in signal_df_dict.items():
-        for geo in GEOS:
-            df = df_pull.copy()
+    for geo, signals_df in product(GEOS, signal_df_dict.items()):
+        signal, df_pull = signals_df
+        df = df_pull.copy()
+        try:
             df = df[["timestamp", "geo_id", signal]]
             df.rename({signal: "val"}, axis=1, inplace=True)
 
             if geo == "nation":
                 df = df[df["geo_id"] == "us"]
             elif geo == "hhs":
@@ -96,6 +102,14 @@
             )
             if len(dates) > 0:
                 run_stats.append((max(dates), len(dates)))
+        # some signal columns are unavailable for patching
+        except KeyError as e:
+            missing_signal = re.search(r"'([^']*)'", str(e)).group(1)
+            full_signal_list = list(SIGNALS_MAP.keys()) + list(PRELIM_SIGNALS_MAP.keys())
+            if missing_signal in full_signal_list:
+                logger.info("signal not available in data", signal=missing_signal)
+            else:
+                raise RuntimeError("Column(s) that shouldn't be missing is missing") from e
 
     elapsed_time_in_seconds = round(time.time() - start_time, 2)
     min_max_date = run_stats and min(s[0] for s in run_stats)
5 changes: 4 additions & 1 deletion nhsn/tests/conftest.py
@@ -1,5 +1,6 @@
 import copy
 import json
+import time
 from unittest.mock import patch
 
 import pytest
@@ -60,7 +61,8 @@ def params_w_patch(params):

 @pytest.fixture(scope="function")
 def run_as_module(params):
-    with patch('sodapy.Socrata.get') as mock_get:
+    with patch('sodapy.Socrata.get') as mock_get, \
+         patch('sodapy.Socrata.get_metadata') as mock_get_metadata:
         def side_effect(*args, **kwargs):
             if kwargs['offset'] == 0:
                 if "ua7e-t2fy" in args[0]:
@@ -70,5 +72,6 @@ def side_effect(*args, **kwargs):
             else:
                 return []
         mock_get.side_effect = side_effect
+        mock_get_metadata.return_value = {"rowsUpdatedAt": time.time()}
         run_module(params)
Empty file added nhsn/tests/patch_dir/.gitignore
Empty file.
Binary file added nhsn/tests/test_data/20241119.csv.gz
Binary file not shown.