|
9 | 9 | from urllib.error import HTTPError |
10 | 10 |
|
11 | 11 | import pandas as pd |
12 | | -from delphi_epidata import Epidata |
13 | 12 | from delphi_utils import create_backup_csv |
14 | | -from epiweeks import Week |
15 | 13 | from sodapy import Socrata |
16 | 14 |
|
17 | 15 | from .constants import MAIN_DATASET_ID, PRELIM_DATASET_ID, PRELIM_SIGNALS_MAP, PRELIM_TYPE_DICT, SIGNALS_MAP, TYPE_DICT |
18 | 16 |
|
19 | 17 |
|
20 | | -def get_latest_nhsn_issue_date() -> datetime: |
21 | | - """Check the api and return approxiate latest issue date available.""" |
22 | | - response = Epidata.covidcast_meta() |
23 | | - meta_df = pd.DataFrame(response["epidata"]) |
24 | | - nhsn_df = meta_df[meta_df["data_source"] == "nhsn"] |
25 | | - last_updated = datetime.utcfromtimestamp(min(nhsn_df.last_update)) |
26 | | - max_issue = Week.fromstring(str(min(nhsn_df.max_issue))) |
27 | | - # last_updated can be multiple days off from max issue from patching / other processes |
28 | | - if max_issue.startdate() <= last_updated.date() <= max_issue.enddate(): |
29 | | - return last_updated |
30 | | - # defaults to start of epiweek if last_update is vastly different from max_issue |
31 | | - return datetime.combine(max_issue.startdate(), datetime.min.time()) |
32 | | - |
33 | | - |
34 | | -def check_last_updated(client, dataset_id, logger): |
| 18 | +def check_last_updated(socrata_token, dataset_id, logger): |
35 | 19 | """Check last updated timestamp to determine data should be pulled or not.""" |
36 | | - # retry logic for 500 error |
37 | | - try: |
38 | | - response = client.get_metadata(dataset_id) |
39 | | - except HTTPError as err: |
40 | | - if err.code == 503: |
41 | | - time.sleep(2 + random.randint(0, 1000) / 1000.0) |
42 | | - response = client.get_metadata(dataset_id) |
43 | | - else: |
44 | | - raise err |
| 20 | + client = Socrata("data.cdc.gov", socrata_token) |
| 21 | + response = client.get_metadata(dataset_id) |
45 | 22 |
|
46 | 23 | updated_timestamp = datetime.utcfromtimestamp(int(response["rowsUpdatedAt"])) |
47 | 24 | now = datetime.utcnow() |
48 | 25 | recently_updated_source = (now - updated_timestamp) < timedelta(days=1) |
49 | 26 |
|
50 | | - latest_issue_date = get_latest_nhsn_issue_date() |
51 | | - # generally expect one issue per week |
52 | | - recently_updated_api = (updated_timestamp - latest_issue_date) < timedelta(days=6) |
53 | | - |
54 | 27 | prelim_prefix = "Preliminary " if dataset_id == PRELIM_DATASET_ID else "" |
55 | 28 | if recently_updated_source: |
56 | 29 | logger.info(f"{prelim_prefix}NHSN data was recently updated; Pulling data", updated_timestamp=updated_timestamp) |
57 | | - elif not recently_updated_api: |
58 | | - logger.info( |
59 | | - f"{prelim_prefix}NHSN data is missing issue; Pulling data", |
60 | | - updated_timestamp=updated_timestamp, |
61 | | - issue=latest_issue_date, |
62 | | - ) |
63 | 30 | else: |
64 | 31 | logger.info(f"{prelim_prefix}NHSN data is stale; Skipping", updated_timestamp=updated_timestamp) |
65 | | - return recently_updated_source or not recently_updated_api |
| 32 | + return recently_updated_source |
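The simplified check above keys entirely off Socrata's rowsUpdatedAt metadata field, a Unix epoch timestamp. A minimal standalone sketch of the same comparison (the helper name and sample timestamp are illustrative, not from the codebase):

    from datetime import datetime, timedelta, timezone

    def is_recently_updated(rows_updated_at: int, max_age: timedelta = timedelta(days=1)) -> bool:
        """True if the metadata timestamp is within max_age of the current UTC time."""
        updated = datetime.fromtimestamp(int(rows_updated_at), tz=timezone.utc)
        return datetime.now(timezone.utc) - updated < max_age

    # A timestamp six hours old passes the one-day threshold.
    assert is_recently_updated(int((datetime.now(timezone.utc) - timedelta(hours=6)).timestamp()))

The sketch uses timezone-aware datetimes to sidestep the utcnow deprecation in newer Python; the diff's naive UTC arithmetic is equivalent.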
66 | 33 |
|
67 | 34 |
|
68 | | -def pull_data(socrata_token: str, dataset_id: str, logger): |
| 35 | +def pull_data(socrata_token: str, dataset_id: str, backup_dir: str, logger): |
69 | 36 | """Pull data from Socrata API.""" |
70 | 37 | client = Socrata("data.cdc.gov", socrata_token) |
71 | | - recently_updated = check_last_updated(client, dataset_id, logger) |
72 | | - df = pd.DataFrame() |
73 | | - if recently_updated: |
74 | | - results = [] |
75 | | - offset = 0 |
76 | | - limit = 50000 # maximum limit allowed by SODA 2.0 |
| 38 | + logger.info("Pulling data from Socrata API") |
| 39 | + results = [] |
| 40 | + offset = 0 |
| 41 | + limit = 50000 # maximum limit allowed by SODA 2.0 |
| 42 | + # retry logic for 503 errors |
| 43 | + try: |
77 | 44 | page = client.get(dataset_id, limit=limit, offset=offset) |
78 | | - while page: |
79 | | - results.extend(page) |
80 | | - offset += limit |
| 45 | + except HTTPError as err: |
| 46 | + if err.code == 503: |
| 47 | + time.sleep(2 + random.randint(0, 1000) / 1000.0) |
81 | 48 | page = client.get(dataset_id, limit=limit, offset=offset) |
| 49 | + else: |
| 50 | + raise err |
| 51 | + |
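The block above retries exactly once on a 503, sleeping two seconds plus up to a second of random jitter. A hedged generalization to a configurable attempt count (the helper is illustrative and keeps the diff's choice of urllib.error.HTTPError):

    import random
    import time
    from urllib.error import HTTPError

    def get_with_retry(client, dataset_id, attempts=3, **kwargs):
        """Call client.get, retrying on HTTP 503 with a jittered delay between attempts."""
        for attempt in range(attempts):
            try:
                return client.get(dataset_id, **kwargs)
            except HTTPError as err:
                if err.code != 503 or attempt == attempts - 1:
                    raise
                time.sleep(2 + random.randint(0, 1000) / 1000.0)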
| 52 | + while len(page) > 0: |
| 53 | + results.extend(page) |
| 54 | + offset += limit |
| 55 | + page = client.get(dataset_id, limit=limit, offset=offset) |
| 56 | + |
| 57 | + if results: |
82 | 58 | df = pd.DataFrame.from_records(results) |
| 59 | + create_backup_csv(df, backup_dir, False, logger=logger) |
| 60 | + else: |
| 61 | + df = pd.DataFrame() |
83 | 62 | return df |
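The paging loop walks the dataset in 50,000-row steps until an empty page signals the end. The same pattern reads naturally as a generator; a sketch assuming sodapy's limit/offset keywords exactly as used above:

    def iter_pages(client, dataset_id, limit=50000):
        """Yield successive pages of records until the API returns an empty page."""
        offset = 0
        while True:
            page = client.get(dataset_id, limit=limit, offset=offset)
            if not page:
                return
            yield page
            offset += limit

    # results = [row for page in iter_pages(client, dataset_id) for row in page]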
84 | 63 |
|
85 | 64 |
|
@@ -144,16 +123,16 @@ def pull_nhsn_data( |
144 | 123 | """ |
145 | 124 | # Pull data from Socrata API |
146 | 125 | df = ( |
147 | | - pull_data(socrata_token, MAIN_DATASET_ID, logger) |
| 126 | + pull_data(socrata_token, MAIN_DATASET_ID, backup_dir, logger) |
148 | 127 | if not custom_run |
149 | 128 | else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=False) |
150 | 129 | ) |
151 | 130 |
|
152 | | - keep_columns = list(TYPE_DICT.keys()) |
| 131 | + recently_updated = True if custom_run else check_last_updated(socrata_token, MAIN_DATASET_ID, logger) |
153 | 132 |
|
154 | | - if not df.empty: |
155 | | - create_backup_csv(df, backup_dir, custom_run, logger=logger) |
| 133 | + keep_columns = list(TYPE_DICT.keys()) |
156 | 134 |
|
| 135 | + if not df.empty and recently_updated: |
157 | 136 | df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"}) |
158 | 137 |
|
159 | 138 | for signal, col_name in SIGNALS_MAP.items(): |
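The transform step renames the raw Socrata columns and then walks SIGNALS_MAP (its loop body falls outside this hunk) to lift source columns into signal-named columns. A toy sketch of that shape, with a made-up one-entry map and column name standing in for the real constants:

    import pandas as pd

    # Hypothetical stand-in for SIGNALS_MAP from constants.py.
    SIGNALS_MAP_EXAMPLE = {"confirmed_admissions_covid": "totalconfc19newadm"}

    df = pd.DataFrame(
        {"weekendingdate": ["2024-11-16"], "jurisdiction": ["US"], "totalconfc19newadm": ["123"]}
    )
    df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})
    for signal, col_name in SIGNALS_MAP_EXAMPLE.items():
        df[signal] = df[col_name]  # assumed loop body; the diff elides it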
@@ -209,16 +188,15 @@ def pull_preliminary_nhsn_data( |
209 | 188 | """ |
210 | 189 | # Pull data from Socrata API |
211 | 190 | df = ( |
212 | | - pull_data(socrata_token, PRELIM_DATASET_ID, logger) |
| 191 | + pull_data(socrata_token, PRELIM_DATASET_ID, backup_dir, logger) |
213 | 192 | if not custom_run |
214 | 193 | else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=True) |
215 | 194 | ) |
216 | 195 |
|
217 | 196 | keep_columns = list(PRELIM_TYPE_DICT.keys()) |
| 197 | + recently_updated = True if custom_run else check_last_updated(socrata_token, PRELIM_DATASET_ID, logger) |
218 | 198 |
|
219 | | - if not df.empty: |
220 | | - create_backup_csv(df, backup_dir, custom_run, sensor="prelim", logger=logger) |
221 | | - |
| 199 | + if not df.empty and recently_updated: |
222 | 200 | df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"}) |
223 | 201 |
|
224 | 202 | for signal, col_name in PRELIM_SIGNALS_MAP.items(): |
|
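Taken together, a normal run now pulls and backs up unconditionally, then gates the transform on freshness, while a custom run replays a backup and skips the API check. A sketch of the call shape, with keyword names taken from the parameters visible in this diff (the full signature lives in the source):

    df = pull_nhsn_data(
        socrata_token=token,    # Socrata app token (placeholder)
        backup_dir="./backups",
        custom_run=False,       # True replays backup_dir instead of hitting the API
        issue_date=None,        # only consulted on custom runs
        logger=logger,
    )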