from pathlib import Path
import pendulum
import zipfile
import os

from airflow.decorators import dag, task
from airflow.hooks.subprocess import SubprocessHook

from lxml import etree

from sagerx import create_path, load_df_to_pg

@dag(
    schedule="0 0 10 * *",
    start_date=pendulum.yesterday(),
    catchup=False
)
def dailymed():
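    """Extract DailyMed SPL zip archives from the NLM public FTP server,
    load the transformed XML into Postgres, and run the downstream dbt models."""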
    dag_id = "dailymed"

    ds_folder = Path("/opt/airflow/dags") / dag_id
    data_folder = Path("/opt/airflow/data") / dag_id

    # NOTE: "dm_spl_release_human" accounts for both
    # rx and otc SPLs (but no other types of SPLs)
    # - "dm_spl_release_human_rx" for rx meds only
    # - "dm_spl_release_human_otc" for otc meds only
    # - "dm_spl_release_human_rx_part1" for a given part
    # - "dm_spl_daily_update_MMDDYYYY" for a given date
    #   (replace MMDDYYYY with your month, day, and year)
    file_set = "dm_spl_release_human_rx"

    def connect_to_ftp_dir(ftp_str: str, dir: str):
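        """Open an anonymous FTP connection and change into the given directory."""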
        import ftplib

        ftp = ftplib.FTP(ftp_str)
        ftp.login()

        ftp.cwd(dir)

        return ftp

    def obtain_ftp_file_list(ftp):
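        """Return the files in the current FTP directory that match the configured file set."""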
        import fnmatch

        file_list = []
        for file in ftp.nlst():
            if fnmatch.fnmatch(file, f"*{file_set}*"):
                file_list.append(file)
        return file_list

    def get_dailymed_files(ftp, file_name: str):
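        """Download one zip archive over FTP, extract it into the data folder, and delete the archive."""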
        zip_path = create_path(data_folder) / file_name

        with open(zip_path, "wb") as file:
            ftp.retrbinary(f"RETR {file_name}", file.write)

        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(data_folder)

        os.remove(zip_path)

    def transform_xml(input_xml, xslt):
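        """Apply an XSLT stylesheet to an XML document and return the transformed XML as a string."""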
        # load xml input
        dom = etree.parse(input_xml, etree.XMLParser(huge_tree=True))
        # load XSLT
        xslt_doc = etree.parse(xslt)
        xslt_transformer = etree.XSLT(xslt_doc)
        # apply XSLT on loaded dom
        new_xml = xslt_transformer(dom)
        return etree.tostring(new_xml, pretty_print=True).decode("utf-8")

    def load_xml_data(spl_type_data_folder: Path):
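        """Transform each SPL XML file in the downloaded zips and load the rows into sagerx_lake.dailymed."""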
        import re
        import pandas as pd

        xslt = ds_folder / "template.xsl"

        data = []
        for zip_path in spl_type_data_folder.iterdir():
            with zipfile.ZipFile(zip_path) as unzipped_folder:
                zip_file = zip_path.stem
                set_id = zip_file.split("_")[1]
                for subfile in unzipped_folder.infolist():
                    if re.search(r"\.xml$", subfile.filename):
                        xml_file = subfile.filename

                        # xslt transform
                        temp_xml_file = unzipped_folder.extract(subfile, spl_type_data_folder)
                        xml_content = transform_xml(temp_xml_file, xslt)
                        os.remove(temp_xml_file)

                        # append row to the data list
                        data.append({
                            "set_id": set_id,
                            "zip_file": zip_file,
                            "xml_file": xml_file,
                            "xml_content": xml_content,
                        })

        df = pd.DataFrame(
            data,
            columns=["set_id", "zip_file", "xml_file", "xml_content"],
        )

        load_df_to_pg(
            df,
            schema_name="sagerx_lake",
            table_name="dailymed",
            if_exists="append",  # TODO: make this better - maybe don't put stuff in multiple folders?
            index=False,
        )

    @task
    def extract():
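        """Download and unpack all matching DailyMed archives from the NLM FTP server."""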
        dailymed_ftp = "public.nlm.nih.gov"
        ftp_dir = "/nlmdata/.dailymed/"

        ftp = connect_to_ftp_dir(dailymed_ftp, ftp_dir)

        file_list = obtain_ftp_file_list(ftp)
        print(f"Extracting {file_list}")

        for file_name in file_list:
            get_dailymed_files(ftp, file_name)

    @task
    def load():
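        """Load any extracted prescription and OTC SPL folders into Postgres."""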
        spl_types = ["prescription", "otc"]

        for spl_type in spl_types:
            spl_type_data_folder = data_folder / spl_type
            if os.path.exists(spl_type_data_folder):
                print(f"Loading {spl_type} SPLs...")
                load_xml_data(spl_type_data_folder)


    # Task to transform data using dbt
    @task
    def transform():
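        """Run the DailyMed staging and intermediate dbt models."""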
        subprocess = SubprocessHook()
        result = subprocess.run_command(
            ["dbt", "run", "--select", "models/staging/dailymed", "models/intermediate/dailymed"],
            cwd="/dbt/sagerx",
        )
        print("Result from dbt:", result)

    extract() >> load() >> transform()

dailymed()