 """
 import time
 from datetime import date, datetime, timedelta
+from itertools import product
 
 import numpy as np
-from delphi_utils import get_structured_logger
+from delphi_utils import get_structured_logger, GeoMapper
 from delphi_utils.export import create_export_csv
 
 from .constants import GEOS, PRELIM_SIGNALS_MAP, SIGNALS_MAP
@@ -54,26 +55,39 @@ def run_module(params):
     nhsn_df = pull_nhsn_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger)
     preliminary_nhsn_df = pull_preliminary_nhsn_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger)
 
-    for signals, df_pull in [(SIGNALS_MAP.keys(), nhsn_df), (PRELIM_SIGNALS_MAP.keys(), preliminary_nhsn_df)]:
+    geo_mapper = GeoMapper()
+    signal_df_dict = {signal: nhsn_df for signal in SIGNALS_MAP.keys()}
+    signal_df_dict.update({signal: preliminary_nhsn_df for signal in PRELIM_SIGNALS_MAP.keys()})
+
+    for signal, df_pull in signal_df_dict.items():
         for geo in GEOS:
+            df = df_pull.copy()
+            df = df[["timestamp", "geo_id", signal]]
+            df.rename({signal: "val"}, axis=1, inplace=True)
             if geo == "nation":
-                df = df_pull[df_pull["geo_id"] == "USA"]
+                df = df[df["geo_id"] == "us"]
+            elif geo == "hhs":
+                df = df[df["geo_id"] != "us"]
+                df = geo_mapper.add_population_column(df, geocode_type="state_id", geocode_col="geo_id")
+                df = geo_mapper.add_geocode(df, "state_id", "state_code", from_col="state_id")
+                df = geo_mapper.add_geocode(df, "state_code", "hhs", from_col="state_code", new_col="geo_id")
+                df = geo_mapper.aggregate_by_weighted_sum(df, "geo_id_y", "val", "timestamp", "population")
+                df = df.rename(columns={"weighted_val": "val"})
             else:
-                df = df_pull[df_pull["geo_id"] != "USA"]
-            for signal in signals:
-                df["val"] = df[signal]
-                df["se"] = np.nan
-                df["sample_size"] = np.nan
-                dates = create_export_csv(
-                    df,
-                    geo_res=geo,
-                    export_dir=export_dir,
-                    start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
-                    sensor=signal,
-                    weekly_dates=True,
-                )
-                if len(dates) > 0:
-                    run_stats.append((max(dates), len(dates)))
+                df = df[df["geo_id"] != "us"]
+            df["se"] = np.nan
+            df["sample_size"] = np.nan
+            logger.info("Generating signal at geo level", signal=signal, geo_level=geo)
+            dates = create_export_csv(
+                df,
+                geo_res=geo,
+                export_dir=export_dir,
+                start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
+                sensor=signal,
+                weekly_dates=True,
+            )
+            if len(dates) > 0:
+                run_stats.append((max(dates), len(dates)))
 
     elapsed_time_in_seconds = round(time.time() - start_time, 2)
     min_max_date = run_stats and min(s[0] for s in run_stats)
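
For reviewers unfamiliar with the GeoMapper chain, here is a minimal sketch of the state-to-HHS roll-up in isolation. It reuses exactly the calls added in the hhs branch above and assumes a delphi_utils version that provides aggregate_by_weighted_sum; the toy states, dates, and values are invented, and the geo_id_y / weighted_val column names come from the merge and aggregation behavior the new code relies on.

import pandas as pd
from delphi_utils import GeoMapper

# Toy frame shaped like `df` at the top of the hhs branch: one row per
# (week, state), with lowercase state ids in geo_id. Values are invented.
toy = pd.DataFrame({
    "timestamp": pd.to_datetime(["2024-11-02", "2024-11-02"]),
    "geo_id": ["pa", "nj"],
    "val": [10.0, 20.0],
})

gm = GeoMapper()
# Attach state populations; the merge also carries a state_id column along.
toy = gm.add_population_column(toy, geocode_type="state_id", geocode_col="geo_id")
# state id -> two-digit state code -> HHS region. The last hop writes into
# geo_id, which collides with the existing column, hence geo_id_y below.
toy = gm.add_geocode(toy, "state_id", "state_code", from_col="state_id")
toy = gm.add_geocode(toy, "state_code", "hhs", from_col="state_code", new_col="geo_id")
# Population-weighted aggregation per (region, week); result in weighted_val.
toy = gm.aggregate_by_weighted_sum(toy, "geo_id_y", "val", "timestamp", "population")
print(toy.rename(columns={"weighted_val": "val"}))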
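
The export step can likewise be exercised on its own. A minimal sketch, assuming only what the loop above already relies on: create_export_csv takes a frame with geo_id / timestamp / val / se / sample_size columns and returns the dates it wrote, which is what feeds run_stats. The signal name here is a placeholder, not a real SIGNALS_MAP key.

import os
import tempfile

import numpy as np
import pandas as pd
from delphi_utils.export import create_export_csv

toy = pd.DataFrame({
    "geo_id": ["us"],
    "timestamp": pd.to_datetime(["2024-11-02"]),
    "val": [12.0],
    "se": [np.nan],
    "sample_size": [np.nan],
})

with tempfile.TemporaryDirectory() as export_dir:
    # weekly_dates=True names the output files by week rather than by day,
    # matching the weekly cadence of NHSN reporting.
    dates = create_export_csv(
        toy,
        geo_res="nation",
        export_dir=export_dir,
        sensor="hypothetical_signal",  # placeholder name
        weekly_dates=True,
    )
    print(sorted(os.listdir(export_dir)), len(dates))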