1515 }
1616}
1717
18- It will generate data for the range of issue dates corresponding to source data files available in "backup_dir" specified under "common", and store them in batch issue format under "patch_dir":
18+ It will generate data for the range of issue dates corresponding to source data files available in "backup_dir"
19+ specified under "common", and store them in batch issue format under "patch_dir":
1920[name-of-patch]/issue_[issue-date]/nhsn/actual_data_file.csv
2021"""
2122
2223from datetime import datetime
2324from os import makedirs
2425from pathlib import Path
26+ from typing import List
2527
2628from delphi_utils import get_structured_logger , read_params
2729from epiweeks import Week
2830
2931from .run import run_module
3032
3133
def filter_source_files(source_files: List[Path]) -> List[Path]:
    """
    Filter patch files so that each epiweek is represented by a single file: the one with the latest issue date.

    Files whose stem contains "prelim" are skipped. The issue date is parsed from the part of
    the file name before the first ".", which must be in YYYYMMDD format.

    Parameters
    ----------
    source_files
        source data files to consider; they do not need to be sorted

    Returns
    -------
    list of source files, one per epiweek, ordered by the first appearance of each epiweek in source_files

    Raises
    ------
    ValueError
        if a non-prelim file name does not start with a YYYYMMDD date
    """
    latest_by_epiweek = {}

    for file in source_files:
        if "prelim" in file.stem:
            continue

        issue_date = datetime.strptime(file.name.split(".")[0], "%Y%m%d")
        epiweek = Week.fromdate(issue_date)
        kept = latest_by_epiweek.get(epiweek)
        # Compare the dates explicitly so the result does not rely on the caller having sorted
        # the files; ">=" keeps the last file seen when two files share an issue date.
        if kept is None or issue_date >= kept[0]:
            latest_by_epiweek[epiweek] = (issue_date, file)

    return [file for _, file in latest_by_epiweek.values()]
6957
7058
@@ -74,41 +62,31 @@ def patch(params):
7462
7563 The range of issue dates is specified in params.json using the following keys:
7664 - "patch": Only used for patching data
77- - "start_date": str, YYYY-MM-DD format, first issue date
78- - "end_date": str, YYYY-MM-DD format, last issue date
7965 - "patch_dir": str, directory to write all issues output
8066 """
8167 logger = get_structured_logger ("delphi_nhsn.patch" , filename = params ["common" ]["log_filename" ])
8268
8369 source_files = sorted (Path (params ["common" ]["backup_dir" ]).glob ("*.csv.gz" ))
84-
85- patch_directory_prefix = params ["patch" ]["patch_dir" ]
86- patch_list = group_source_files (source_files )
87- for idx , patch_dates in enumerate (patch_list ):
88- start_issue = patch_dates [0 ]
89- end_issue = patch_dates [- 1 ]
90-
91- patch_directory = f"{ patch_directory_prefix } _{ idx } "
92- params ["patch" ]["patch_dir" ] = patch_directory
93-
94- logger .info (
95- "Starting patching" ,
96- patch_directory = patch_directory ,
97- start_issue = start_issue .strftime ("%Y-%m-%d" ),
98- end_issue = end_issue .strftime ("%Y-%m-%d" ),
99- )
100-
101- makedirs (patch_directory , exist_ok = True )
102-
103- for issue_date in patch_dates :
104- current_issue_ew = Week .fromdate (issue_date )
105- logger .info ("Running issue" , issue_date = issue_date .strftime ("%Y-%m-%d" ))
106- params ["patch" ]["issue_date" ] = issue_date .strftime ("%Y%m%d" )
107- current_issue_dir = f"{ params ['patch' ]['patch_dir' ]} /issue_{ current_issue_ew } /nhsn"
108- makedirs (current_issue_dir , exist_ok = True )
109- params ["common" ]["export_dir" ] = current_issue_dir
110- params ["common" ]["custom_run" ] = True
111- run_module (params , logger )
70+ makedirs (params ["patch" ]["patch_dir" ], exist_ok = True )
71+
72+ logger .info (
73+ "Starting patching" ,
74+ patch_directory = params ["patch" ]["patch_dir" ],
75+ start_issue = source_files [0 ].name .split ("." )[0 ],
76+ end_issue = source_files [- 1 ].name .split ("." )[0 ],
77+ )
78+
79+ patch_list = filter_source_files (source_files )
80+ for file in patch_list :
81+ issue_date = datetime .strptime (file .name .split ("." )[0 ], "%Y%m%d" )
82+ current_issue_ew = Week .fromdate (issue_date )
83+ logger .info ("Running issue" , issue_date = issue_date .strftime ("%Y-%m-%d" ))
84+ params ["patch" ]["issue_date" ] = issue_date .strftime ("%Y%m%d" )
85+ current_issue_dir = f"{ params ['patch' ]['patch_dir' ]} /issue_{ current_issue_ew } /nhsn"
86+ makedirs (current_issue_dir , exist_ok = True )
87+ params ["common" ]["export_dir" ] = current_issue_dir
88+ params ["common" ]["custom_run" ] = True
89+ run_module (params , logger )
11290
11391
11492if __name__ == "__main__" :
0 commit comments