Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
022c1e0
first implimentation
aysim319 Nov 14, 2024
541b0aa
figuring out metric/sensor to use
aysim319 Nov 15, 2024
1c79068
first take
aysim319 Nov 18, 2024
9a9780b
working on test
aysim319 Nov 19, 2024
24a17cd
added preliminary data source
aysim319 Nov 20, 2024
dde6685
adding indicator for gitaction
aysim319 Nov 20, 2024
f5ad4c9
lint
aysim319 Nov 20, 2024
aedf0e9
replace with setup.py
aysim319 Nov 20, 2024
25c24f1
more lint
aysim319 Nov 20, 2024
4a4e169
fixed date range for test
aysim319 Nov 20, 2024
3cfcdf6
lint
aysim319 Nov 20, 2024
b818ca8
Update DETAILS.md
aysim319 Nov 20, 2024
eca8fe0
fix output data
aysim319 Nov 20, 2024
711ace3
analysis in progress
aysim319 Nov 21, 2024
48a8a76
lint and suggestions
aysim319 Nov 21, 2024
8117096
more analysis
aysim319 Nov 22, 2024
411694c
add hhs geo aggregate
aysim319 Nov 25, 2024
f42c2e9
more analysis
aysim319 Nov 25, 2024
7da9e5a
update DETAILS.md
aysim319 Nov 25, 2024
1b4c277
Update nhsn/params.json.template
aysim319 Nov 25, 2024
84f34fa
Update nhsn/params.json.template
aysim319 Nov 25, 2024
1dc6343
Merge remote-tracking branch 'origin/nhsn_indicator' into nhsn_indicator
aysim319 Nov 25, 2024
3b5fbee
cleaning up anaylsis
aysim319 Nov 25, 2024
e97c3cc
rename geo_id column name
aysim319 Nov 26, 2024
26d69f3
suggested / needed to deploy
aysim319 Nov 27, 2024
678822a
adding default locations for deployment
aysim319 Nov 27, 2024
840ebb7
fix geo aggregation for hhs
aysim319 Dec 2, 2024
de601d6
Update nhsn/params.json.template
aysim319 Dec 2, 2024
928a5c7
lint
aysim319 Dec 3, 2024
3ba877e
needed to add hhs in to geo for tests
aysim319 Dec 5, 2024
162152e
fixed and added more plots
aysim319 Dec 6, 2024
5f26ff1
cleaning up notebook and adding details
aysim319 Dec 6, 2024
0000c73
new signal name
aysim319 Dec 10, 2024
de9dc95
needed to update the type dict also
aysim319 Dec 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/python-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ jobs:
dir: "delphi_quidel_covidtest"
- package: "sir_complainsalot"
dir: "delphi_sir_complainsalot"
- package: "nhsn"
dir: "delphi_nhsn"
defaults:
run:
working-directory: ${{ matrix.package }}
Expand Down
18 changes: 18 additions & 0 deletions nhsn/DETAILS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# NHSN data

We import the NHSN Weekly Hospital Respiratory Data

There are 2 sources we grab data from for NHSN:

Primary source: https://data.cdc.gov/Public-Health-Surveillance/Weekly-Hospital-Respiratory-Data-HRD-Metrics-by-Ju/ua7e-t2fy/about_data
Secondary (preliminary source): https://data.cdc.gov/Public-Health-Surveillance/Weekly-Hospital-Respiratory-Data-HRD-Metrics-by-Ju/mpgq-jmmr/about_data

## Geographical Levels
* `state`: reported using two-letter postal code
* `national`: just `us` for now

## Metrics
* `confirmed_admissions_covid`: total number of confirmed admissions for COVID
* `confirmed_admissions_flu`: total number of confirmed admissions for flu
* `prelim_confirmed_admissions_covid`: total number of confirmed admissions for COVID from the preliminary source
* `prelim_confirmed_admissions_flu`: total number of confirmed admissions for flu from the preliminary source
32 changes: 32 additions & 0 deletions nhsn/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# .PHONY must use ':' (the '=' form defines a variable named .PHONY and
# declares nothing), targets are space-separated, and the list should name
# the actual phony targets in this file.
.PHONY: venv install install-ci lint format test clean

# Package directory (delphi_nhsn), discovered from the __init__.py location.
dir = $(shell find ./delphi_* -name __init__.py | grep -o 'delphi_[_[:alnum:]]*' | head -1)

venv:
	python3.8 -m venv env

install: venv
	. env/bin/activate; \
	pip install wheel ; \
	pip install -e ../_delphi_utils_python ;\
	pip install -e .

install-ci: venv
	. env/bin/activate; \
	pip install wheel ; \
	pip install ../_delphi_utils_python ;\
	pip install .

lint:
	. env/bin/activate; pylint $(dir) --rcfile=../pyproject.toml
	. env/bin/activate; pydocstyle $(dir)

format:
	. env/bin/activate; darker $(dir)

test:
	. env/bin/activate ;\
	(cd tests && ../env/bin/pytest --cov=$(dir) --cov-report=term-missing)

clean:
	rm -rf env
	rm -f params.json
13 changes: 13 additions & 0 deletions nhsn/delphi_nhsn/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
"""Module to pull and clean indicators from the NHSN source.

This file defines the functions that are made public by the module. As the
module is intended to be executed through the main method, these are primarily
for testing.
"""

from __future__ import absolute_import

from . import run

__version__ = "0.1.0"
13 changes: 13 additions & 0 deletions nhsn/delphi_nhsn/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# -*- coding: utf-8 -*-
"""Call the function run_module when executed.

This file indicates that calling the module (`python -m MODULE_NAME`) will
call the function `run_module` found within the run.py file. There should be
no need to change this template.
"""

from delphi_utils import read_params

from .run import run_module  # pragma: no cover

# Load params.json from the working directory and launch the indicator run.
run_module(read_params())  # pragma: no cover
31 changes: 31 additions & 0 deletions nhsn/delphi_nhsn/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Registry for signal names."""

GEOS = ["state", "nation"]

# column name from socrata
TOTAL_ADMISSION_COVID_API = "totalconfc19newadm"
TOTAL_ADMISSION_FLU_API = "totalconfflunewadm"

SIGNALS_MAP = {
"confirmed_admissions_covid": TOTAL_ADMISSION_COVID_API,
"confirmed_admissions_flu": TOTAL_ADMISSION_FLU_API,
}

TYPE_DICT = {
"timestamp": "datetime64[ns]",
"geo_id": str,
"confirmed_admissions_covid": float,
"confirmed_admissions_flu": float,
}

# signal mapping for secondary, preliminary source
PRELIM_SIGNALS_MAP = {
"prelim_confirmed_admissions_covid": TOTAL_ADMISSION_COVID_API,
"prelim_confirmed_admissions_flu": TOTAL_ADMISSION_FLU_API,
}
PRELIM_TYPE_DICT = {
"timestamp": "datetime64[ns]",
"geo_id": str,
"prelim_confirmed_admissions_covid": float,
"prelim_confirmed_admissions_flu": float,
}
119 changes: 119 additions & 0 deletions nhsn/delphi_nhsn/pull.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# -*- coding: utf-8 -*-
"""Functions for pulling NSSP ER data."""
import logging
from typing import Optional

import pandas as pd
from delphi_utils import create_backup_csv
from sodapy import Socrata

from .constants import PRELIM_SIGNALS_MAP, PRELIM_TYPE_DICT, SIGNALS_MAP, TYPE_DICT


def pull_data(socrata_token: str, dataset_id: str):
    """Pull every record of a dataset from the Socrata API, paging through results."""
    client = Socrata("data.cdc.gov", socrata_token)
    page_size = 50000  # maximum limit allowed by SODA 2.0
    current_offset = 0
    records = []
    # Keep requesting pages until the API returns an empty batch.
    while batch := client.get(dataset_id, limit=page_size, offset=current_offset):
        records.extend(batch)
        current_offset += page_size

    return pd.DataFrame.from_records(records)


def pull_nhsn_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None):
    """Pull the latest NHSN hospital admissions data, and conform it into a dataset.

    The output dataset has:

    - Each row corresponds to a single observation
    - Each row additionally has columns for the signals in SIGNALS_MAP

    Parameters
    ----------
    socrata_token: str
        My App Token for pulling the NHSN data
    backup_dir: str
        Directory to which to save raw backup data
    custom_run: bool
        Flag indicating if the current run is a patch. If so, don't save any data to disk
    logger: Optional[logging.Logger]
        logger object

    Returns
    -------
    pd.DataFrame
        Dataframe as described above.
    """
    # Pull data from Socrata API (primary weekly NHSN dataset).
    df = pull_data(socrata_token, dataset_id="ua7e-t2fy")

    # TYPE_DICT's keys double as the output column list.
    keep_columns = list(TYPE_DICT.keys())

    if not df.empty:
        # Snapshot the raw pull before any transformation (skipped on patch runs).
        create_backup_csv(df, backup_dir, custom_run, logger=logger)

        df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})

        # Copy each source column into its exported signal name.
        for signal, col_name in SIGNALS_MAP.items():
            df[signal] = df[col_name]

        df = df[keep_columns]
        df = df.astype(TYPE_DICT)
    else:
        # Empty pull: return an empty frame with the expected schema.
        df = pd.DataFrame(columns=keep_columns)

    return df


def pull_preliminary_nhsn_data(
    socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None
):
    """Pull the latest preliminary NHSN hospital admissions data, and conform it into a dataset.

    The output dataset has:

    - Each row corresponds to a single observation
    - Each row additionally has columns for the signals in PRELIM_SIGNALS_MAP

    Parameters
    ----------
    socrata_token: str
        My App Token for pulling the NHSN data
    backup_dir: str
        Directory to which to save raw backup data
    custom_run: bool
        Flag indicating if the current run is a patch. If so, don't save any data to disk
    logger: Optional[logging.Logger]
        logger object

    Returns
    -------
    pd.DataFrame
        Dataframe as described above.
    """
    # Pull data from Socrata API (secondary, preliminary NHSN dataset).
    df = pull_data(socrata_token, dataset_id="mpgq-jmmr")

    # PRELIM_TYPE_DICT's keys double as the output column list.
    keep_columns = list(PRELIM_TYPE_DICT.keys())

    if not df.empty:
        # Snapshot the raw pull, tagged "prelim" to keep it distinct from the primary backup.
        create_backup_csv(df, backup_dir, custom_run, sensor="prelim", logger=logger)

        df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})

        # Copy each source column into its exported signal name.
        for signal, col_name in PRELIM_SIGNALS_MAP.items():
            df[signal] = df[col_name]

        df = df[keep_columns]
        df = df.astype(PRELIM_TYPE_DICT)
    else:
        # Empty pull: return an empty frame with the expected schema.
        df = pd.DataFrame(columns=keep_columns)

    return df
90 changes: 90 additions & 0 deletions nhsn/delphi_nhsn/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# -*- coding: utf-8 -*-
"""Functions to call when running the function.

This module should contain a function called `run_module`, that is executed
when the module is run with `python -m MODULE_NAME`. `run_module`'s lone argument should be a
nested dictionary of parameters loaded from the params.json file. We expect the `params` to have
the following structure:
- "common":
- "export_dir": str, directory to which the results are exported
- "log_filename": (optional) str, path to log file
- "indicator": (optional)
- "wip_signal": (optional) Any[str, bool], list of signals that are works in progress, or
True if all signals in the registry are works in progress, or False if only
unpublished signals are. See `delphi_utils.add_prefix()`
- Any other indicator-specific settings
"""
import time
from datetime import date, datetime, timedelta

import numpy as np
import pandas as pd
from delphi_utils import get_structured_logger
from delphi_utils.export import create_export_csv

from .constants import GEOS, PRELIM_SIGNALS_MAP, SIGNALS_MAP
from .pull import pull_nhsn_data, pull_preliminary_nhsn_data


def run_module(params):
    """
    Run the indicator.

    Pulls the primary and preliminary NHSN datasets, then exports one CSV set
    per (signal, geo) combination via `create_export_csv`.

    Arguments
    --------
    params: Dict[str, Any]
        Nested dictionary of parameters.
    """
    start_time = time.time()
    logger = get_structured_logger(
        __name__,
        filename=params["common"].get("log_filename"),
        log_exceptions=params["common"].get("log_exceptions", True),
    )
    export_dir = params["common"]["export_dir"]
    backup_dir = params["common"]["backup_dir"]
    custom_run = params["common"].get("custom_run", False)
    socrata_token = params["indicator"]["socrata_token"]
    export_start_date = params["indicator"]["export_start_date"]
    run_stats = []

    if export_start_date == "latest":  # Find the previous Saturday
        export_start_date = date.today() - timedelta(days=date.today().weekday() + 2)
        export_start_date = export_start_date.strftime("%Y-%m-%d")

    nhsn_df = pull_nhsn_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger)
    preliminary_nhsn_df = pull_preliminary_nhsn_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger)

    for signals, df_pull in [(SIGNALS_MAP.keys(), nhsn_df), (PRELIM_SIGNALS_MAP.keys(), preliminary_nhsn_df)]:
        for geo in GEOS:
            # .copy() so the column assignments below don't write onto a slice of
            # the pulled dataframe (chained-assignment hazard / SettingWithCopyWarning),
            # and so each geo iteration starts from a clean frame.
            if geo == "nation":
                df = df_pull[df_pull["geo_id"] == "USA"].copy()
            else:
                df = df_pull[df_pull["geo_id"] != "USA"].copy()
            for signal in signals:
                df["val"] = df[signal]
                df["se"] = np.nan
                df["sample_size"] = np.nan
                dates = create_export_csv(
                    df,
                    geo_res=geo,
                    export_dir=export_dir,
                    start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
                    sensor=signal,
                    weekly_dates=True,
                )
                if len(dates) > 0:
                    run_stats.append((max(dates), len(dates)))

    elapsed_time_in_seconds = round(time.time() - start_time, 2)
    # run_stats may be empty; `and` short-circuits so these stay falsy in that case.
    min_max_date = run_stats and min(s[0] for s in run_stats)
    csv_export_count = sum(s[-1] for s in run_stats)
    max_lag_in_days = min_max_date and (datetime.now() - min_max_date).days
    formatted_min_max_date = min_max_date and min_max_date.strftime("%Y-%m-%d")
    logger.info(
        "Completed indicator run",
        elapsed_time_in_seconds=elapsed_time_in_seconds,
        csv_export_count=csv_export_count,
        max_lag_in_days=max_lag_in_days,
        oldest_final_export_date=formatted_min_max_date,
    )
22 changes: 22 additions & 0 deletions nhsn/params.json.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"common": {
"export_dir": "./receiving",
"log_filename": "NAME.log"
},
"validation": {
"common": {
"data_source": "NAME",
"span_length": 14,
"min_expected_lag": {"all": "1"},
"max_expected_lag": {"all": "3"},
"dry_run": true,
"suppressed_errors": []
},
"static": {
"minimum_sample_size": 0,
"missing_se_allowed": true,
"missing_sample_size_allowed": true
},
"dynamic": {}
}
}
Empty file added nhsn/receiving/.gitignore
Empty file.
32 changes: 32 additions & 0 deletions nhsn/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Package configuration for the delphi_nhsn indicator."""
from setuptools import find_packages, setup

# Runtime and test dependencies installed alongside the package.
required = [
    "numpy",
    "pandas",
    "pydocstyle",
    "pytest",
    "pytest-cov",
    "pylint==2.8.3",
    "delphi-utils",
    "sodapy",
    "epiweeks",
    "freezegun",
    "us",
]

setup(
    name="delphi_nhsn",
    version="0.1.0",
    description="Indicators NHSN Hospital Respiratory Data",
    author="",
    author_email="",
    url="https://github.com/cmu-delphi/covidcast-indicators",
    install_requires=required,
    classifiers=[
        "Development Status :: 1 - Planning",
        "Intended Audience :: Developers",
        "Programming Language :: Python :: 3.8",
    ],
    packages=find_packages(),
)
Empty file added nhsn/static/.gitignore
Empty file.
Empty file added nhsn/tests/backups/.gitignore
Empty file.
Loading
Loading