# --- airflow/dags/atcddd/dag.py ---
import pendulum
from airflow.decorators import dag
from atcddd.dag_tasks import extract, load
from common_dag_tasks import transform

dag_id = "atcddd"


@dag(
    dag_id=dag_id,
    schedule_interval="0 3 15 * *",  # Runs on the 15th of each month at 3 AM
    # Use a static start_date: Airflow's docs warn that a dynamic start date
    # (e.g. pendulum.today().add(days=-1)) shifts on every parse and can
    # confuse the scheduler. With catchup=False only the latest interval runs.
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
)
def atc_ddd():
    """Monthly pipeline: scrape the WHO ATC/DDD index, load it into
    Postgres (sagerx_lake.atc_ddd), then run the dbt staging/intermediate
    models that join it to RxNorm."""
    extract_task = extract(dag_id)
    load_task = load(extract_task)
    transform_task = transform(dag_id, models_subdir=['staging', 'intermediate'])

    extract_task >> load_task >> transform_task


dag = atc_ddd()


# --- airflow/dags/atcddd/dag_tasks.py ---
from airflow.decorators import task
import pandas as pd
import requests
from requests.adapters import HTTPAdapter
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
import numpy as np
import logging
from functools import lru_cache
from io import StringIO
from collections import deque
from sagerx import write_json_file, read_json_file, create_path, load_df_to_pg
from common_dag_tasks import get_data_folder

logger = logging.getLogger(__name__)
class ATCScraper:
    """Scraper for the WHO ATC/DDD index at https://www.whocc.no/atc_ddd_index/.

    Walks the ATC hierarchy breadth-first from each anatomical root code and
    collects the DDD tables that the site publishes on the level-4 pages
    (those tables list the level-5 substance codes with their DDD/uom/route).
    """

    # ATC code length -> hierarchy level. Level-4 pages (5-char codes) carry
    # the DDD tables for their level-5 (7-char) children. Any other length
    # maps to -1 ("not an ATC code we recognise").
    _LEVEL_BY_LENGTH = {1: 1, 3: 2, 4: 3, 5: 4, 7: 5}

    def __init__(self):
        self.base_url = "https://www.whocc.no/atc_ddd_index/"
        self.atc_roots = ['A', 'B', 'C', 'D', 'G', 'H', 'J', 'L', 'M', 'N', 'P', 'R', 'S', 'V']

        # One pooled session: we issue many small requests against one host.
        self.session = requests.Session()
        adapter = HTTPAdapter(pool_connections=20, pool_maxsize=20)
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

        # Browser-like UA; the site serves plain HTML either way, but this
        # avoids being filtered as an anonymous client.
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

    def get_atc_level(self, code: str) -> int:
        """Return the ATC hierarchy level (1-5) implied by the code length, or -1.

        NOTE: previously an lru_cache'd if/elif chain. functools.lru_cache on
        an instance method pins every instance in the cache (ruff B019), and a
        dict lookup is already O(1), so the cache is dropped entirely.
        """
        return self._LEVEL_BY_LENGTH.get(len(code), -1)

    def fetch_page(self, code: str) -> str:
        """Fetch the index page for ``code``; return "" on request failure.

        Successful responses are memoised per instance. Failures are
        deliberately NOT cached: the previous @lru_cache pinned the "" failure
        result forever, so one transient network error silently hid an entire
        ATC branch with no chance of a retry. (CPython dict get/set are atomic
        enough for the thread-per-root usage in scrape_all.)
        """
        cache = self.__dict__.setdefault('_page_cache', {})
        if code in cache:
            return cache[code]
        url = f"{self.base_url}?code={code}&showdescription=no"
        try:
            response = self.session.get(url, timeout=20)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.error(f"Request failed for {code} at {url}: {e}")
            return ""  # not cached -> a later visit may retry
        cache[code] = response.text
        return cache[code]

    def parse_subcodes(self, html: str, parent_code: str) -> pd.DataFrame:
        """Extract the child ATC codes listed on a level-1/2/3 page.

        Returns a DataFrame with columns atc_code / atc_name / atc_level,
        empty when the page has no recognisable listing.
        """
        if not html:
            return pd.DataFrame()

        soup = BeautifulSoup(html, 'html.parser')
        # The listing usually sits in the second <p> under #content…
        content_p = soup.select_one("#content > p:nth-of-type(2)")

        # …but fall back to whatever paragraphs exist (page layout varies).
        if not content_p:
            content_paragraphs = soup.select("#content p")
            if len(content_paragraphs) > 1:
                content_p = content_paragraphs[1]
            elif content_paragraphs:
                content_p = content_paragraphs[0]

        if not content_p:
            return pd.DataFrame()

        text = content_p.get_text(separator='\n').strip()
        if not text:
            return pd.DataFrame()

        lines = [line.strip() for line in text.split('\n') if line.strip()]
        data = []
        current_parent_level = self.get_atc_level(parent_code)

        for line in lines:
            parts = line.split(maxsplit=1)
            if not parts: continue

            potential_code = parts[0]

            # ATC codes start with a letter and are alphanumeric.
            if not (potential_code and potential_code[0].isalpha() and potential_code.isalnum()):
                continue

            code_level = self.get_atc_level(potential_code)
            # Keep only genuine descendants of the page we came from.
            if code_level > current_parent_level and potential_code.startswith(parent_code):
                name = parts[1].strip() if len(parts) > 1 else "N/A"
                # Truncate the name at the first numeric token: on some pages
                # the DDD value runs into the name text.
                name_parts_cleaned = []
                for word in name.split():
                    if word.replace('.', '', 1).replace(',', '', 1).isdigit() and \
                       any(char.isdigit() for char in word):
                        break
                    name_parts_cleaned.append(word)
                name = ' '.join(name_parts_cleaned)

                data.append({
                    'atc_code': potential_code,
                    'atc_name': name,
                    'atc_level': code_level
                })

        return pd.DataFrame(data)

    def parse_ddd_table(self, html: str, page_code: str) -> pd.DataFrame:
        """Parse the DDD table on a level-4 page into a normalised DataFrame.

        Returns rows for 7-character (level-5) codes that carry at least one
        of ddd / uom / adm_route / note; empty DataFrame when no usable table
        is found.
        """
        if not html:
            return pd.DataFrame()

        soup = BeautifulSoup(html, 'html.parser')

        # Table location varies; try the most specific selector first.
        table = soup.select_one("#content ul li table")
        if not table: table = soup.select_one("#content table")
        if not table: table = soup.select_one("table")

        if not table:
            return pd.DataFrame()

        try:
            html_string = str(table)
            dfs = pd.read_html(StringIO(html_string), header=0)

            if not dfs:
                logger.warning(f"Page {page_code}: pd.read_html found no tables from HTML string.")
                return pd.DataFrame()

            df = dfs[0]

            # Normalise the site's header spellings to our column names.
            expected_cols_map = {
                'atc code': 'atc_code', 'name': 'atc_name',
                'ddd': 'ddd', 'u': 'uom', 'adm.r': 'adm_route', 'note': 'note'
            }
            df.columns = df.columns.str.lower().str.strip()
            df = df.rename(columns=expected_cols_map)

            if 'atc_code' not in df.columns:
                logger.error(f"Page {page_code}: 'atc_code' column MISSING after renaming. Cols: {df.columns.tolist()}")
                return pd.DataFrame()

            # The site leaves code/name blank on continuation rows of a
            # multi-route substance -> forward-fill them.
            if 'atc_name' in df.columns:
                df['atc_name'] = df['atc_name'].ffill()
            df['atc_code'] = df['atc_code'].ffill().astype(str)

            # Only level-5 substance codes (7 chars) carry DDD values.
            df = df[df['atc_code'].str.len() == 7]
            if df.empty:
                return pd.DataFrame()

            df = df.replace('', np.nan)

            # Drop rows that carry no DDD information at all.
            ddd_columns_to_check = ['ddd', 'uom', 'adm_route', 'note']
            existing_ddd_cols = [col for col in ddd_columns_to_check if col in df.columns]

            if existing_ddd_cols:
                df = df.dropna(subset=existing_ddd_cols, how='all')

            return df

        except Exception as e:
            logger.error(f"Page {page_code}: Error parsing DDD table: {e}", exc_info=True)
            return pd.DataFrame()

    def scrape_code_iterative(self, root_code: str) -> pd.DataFrame:
        """BFS one ATC root: descend levels 1-3, harvest DDD tables at level 4.

        Iterative (deque-based) rather than recursive, with a visited set and
        a depth cap as guards against cycles or malformed listings.
        """
        all_data_for_root = []
        visited_codes = set()
        queue = deque([(root_code, 1)])
        max_depth = 6  # hierarchy is 5 levels deep; 6 leaves headroom

        while queue:
            current_code, depth = queue.popleft()

            if current_code in visited_codes: continue
            visited_codes.add(current_code)

            if depth > max_depth:
                logger.warning(f"Max depth {max_depth} reached for {current_code}, stopping branch.")
                continue

            try:
                html_content = self.fetch_page(current_code)
                if not html_content:
                    continue

                current_atc_level = self.get_atc_level(current_code)

                if current_atc_level in [1, 2, 3]:
                    # Interior node: enqueue children for later visits.
                    subcodes_df = self.parse_subcodes(html_content, current_code)
                    if not subcodes_df.empty:
                        for _, row in subcodes_df.iterrows():
                            subcode = row['atc_code']
                            if subcode not in visited_codes:
                                queue.append((subcode, depth + 1))

                elif current_atc_level == 4:
                    # Leaf page for our purposes: this is where DDDs live.
                    ddd_df = self.parse_ddd_table(html_content, current_code)
                    if not ddd_df.empty:
                        all_data_for_root.append(ddd_df)

            except Exception as e:
                # Keep the branch alive: one bad page must not kill the root.
                logger.error(f"Unhandled error scraping {current_code}: {e}", exc_info=True)

        return pd.concat(all_data_for_root, ignore_index=True) if all_data_for_root else pd.DataFrame()

    def scrape_all(self, max_workers: int = 10) -> pd.DataFrame:
        """Scrape every ATC root in parallel and return one deduplicated frame.

        Each worker thread handles one root (the work is I/O-bound, so the
        GIL is not a bottleneck). The combined result is reduced to the
        essential columns, deduplicated on (atc_code, ddd, uom, adm_route)
        and sorted by atc_code.
        """
        all_scraped_data = []

        logger.info(f"Starting parallel scrape with {max_workers} workers for {len(self.atc_roots)} roots...")

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_root_code = {
                executor.submit(self.scrape_code_iterative, root): root
                for root in self.atc_roots
            }

            for future in as_completed(future_to_root_code):
                root_code_val = future_to_root_code[future]
                try:
                    data_from_root = future.result()
                    if data_from_root is not None and not data_from_root.empty:
                        all_scraped_data.append(data_from_root)
                        logger.info(f"Completed {root_code_val}: {len(data_from_root)} DDD entries found.")
                    else:
                        logger.info(f"Completed {root_code_val}: No DDD entries found.")
                except Exception as e:
                    logger.error(f"Error processing root {root_code_val}: {e}")
                    logger.error(f"Exception from worker for root {root_code_val}", exc_info=True)

        if all_scraped_data:
            logger.info("Combining and cleaning all scraped data...")
            final_df = pd.concat(all_scraped_data, ignore_index=True)

            # Guarantee a stable schema even when some pages lacked columns.
            essential_cols = ['atc_code', 'atc_name', 'ddd', 'uom', 'adm_route', 'note']
            for col in essential_cols:
                if col not in final_df.columns:
                    final_df[col] = np.nan
            final_df = final_df[essential_cols]

            # A substance may legitimately appear with several routes/units,
            # so dedupe on the full DDD key, not just the code.
            key_cols_for_dedup = ['atc_code']
            for col in ['ddd', 'uom', 'adm_route']:
                if col in final_df.columns:
                    key_cols_for_dedup.append(col)

            final_df = final_df.drop_duplicates(subset=key_cols_for_dedup, keep='first')
            final_df = final_df.sort_values('atc_code').reset_index(drop=True)

            logger.info(f"Final combined DataFrame shape: {final_df.shape}")
            return final_df
        else:
            logger.warning("No data was scraped from any ATC root.")
            return pd.DataFrame()
@task
def extract(dag_id: str) -> str:
    """Scrape the WHO ATC/DDD index and persist the results as JSON.

    Returns the resolved path of the written JSON file, which Airflow passes
    to the downstream `load` task via XCom.
    """
    # Use the module logger, not the root logger / print, so task output is
    # formatted consistently with the scraper's own logging.
    logger.info("Starting ATC DDD scraping...")

    scraper = ATCScraper()
    atcddd_df = scraper.scrape_all(max_workers=10)

    # Convert DataFrame to records format for JSON storage
    results = atcddd_df.to_dict('records')

    data_folder = get_data_folder(dag_id)
    file_path = create_path(data_folder) / 'data.json'
    file_path_str = file_path.resolve().as_posix()

    write_json_file(file_path_str, results)

    logger.info(f"Extraction Completed! Data saved to file: {file_path_str}")
    logger.info(f"Total records scraped: {len(results)}")

    return file_path_str


@task
def load(file_path_str: str):
    """Read the scraped JSON file and load it into sagerx_lake.atc_ddd."""
    results = read_json_file(file_path_str)

    # Convert back to DataFrame
    df = pd.DataFrame(results)

    logger.info(f'DataFrame created of {len(df)} length.')
    logger.info(f'Columns: {df.columns.tolist()}')

    # Load to PostgreSQL; "replace" keeps the lake table in sync with the
    # latest monthly scrape.
    load_df_to_pg(df, "sagerx_lake", "atc_ddd", "replace", index=False)

    logger.info(f"Successfully loaded {len(df)} records to sagerx_lake.atc_ddd")


# --- dbt/sagerx/models/intermediate/atcddd/_int_atcddd__models.yml ---
# version: 2
#
# models:
#   - name: int_atcddd_clinical_products_to_ddd
#     columns:
#       - name: clinical_product_rxcui
#       - name: clinical_product_name
#       - name: clinical_product_tty
#       - name: clinical_product_component_rxcui
#       # NOTE(review): 'compnent' below looks like a typo for 'component' —
#       # confirm against the actual column name emitted by
#       # int_rxnorm_clinical_products_to_ingredient_components before renaming.
#       - name: clinical_product_compnent_name
#       - name: clinical_product_component_tty
#       - name: dose_form_rxcui
#       - name: dose_form_name
#       - name: dose_form_tty
#       - name: ingredient_rxcui
#       - name: ingredient_name
#       - name: ingredient_tty
#       - name: ingredient_component_rxcui
#       - name: ingredient_component_name
#       - name: ingredient_component_tty
#       - name: active
#       - name: prescribable
#       - name: ddd
#       - name: uom
#       - name: adm_route
#       - name: note
-- dbt/sagerx/models/intermediate/atcddd/int_atcddd_clinical_products_to_ddd.sql
-- Attach WHO DDD values to RxNorm clinical products by joining on the
-- ingredient component RxCUI and on a manual mapping from WHO administration
-- routes to RxNorm dose-form names.

with route_dose_form_map as (
    -- map adm_route codes to the dose_form_names in the clinical products
    -- table (same pairs as before, expressed as a single VALUES list)
    select route, form
    from (
        values
            ('Chewing gum', 'Chewing Gum'),
            ('Inhal', 'Dry Powder Inhaler'),
            ('Inhal', 'Inhalation Powder'),
            ('Inhal', 'Inhalation Solution'),
            ('Inhal', 'Inhalation Spray'),
            ('Inhal', 'Inhalation Suspension'),
            ('Inhal', 'Metered Dose Inhaler'),
            ('Inhal.aerosol', 'Inhalation Spray'),
            ('Inhal.aerosol', 'Metered Dose Inhaler'),
            ('Inhal.powder', 'Dry Powder Inhaler'),
            ('Inhal.powder', 'Inhalation Powder'),
            ('Inhal.solution', 'Inhalation Solution'),
            ('N', 'Metered Dose Nasal Spray'),
            ('N', 'Nasal Gel'),
            ('N', 'Nasal Inhalant'),
            ('N', 'Nasal Powder'),
            ('N', 'Nasal Solution'),
            ('N', 'Nasal Spray'),
            ('N', 'Nasal Suspension'),
            ('N', 'Powder for Nasal Solution'),
            ('O', 'Chewable Bar'),
            ('O', 'Chewable Extended Release Oral Tablet'),
            ('O', 'Chewable Tablet'),
            ('O', 'Delayed Release Oral Capsule'),
            ('O', 'Delayed Release Oral Granules'),
            ('O', 'Delayed Release Oral Tablet'),
            ('O', 'Disintegrating Oral Tablet'),
            ('O', 'Effervescent Oral Tablet'),
            ('O', 'Extended Release Oral Capsule'),
            ('O', 'Extended Release Oral Tablet'),
            ('O', 'Extended Release Suspension'),
            ('O', 'Granules for Oral Solution'),
            ('O', 'Granules for Oral Suspension'),
            ('O', 'Oral Capsule'),
            ('O', 'Oral Cream'),
            ('O', 'Oral Film'),
            ('O', 'Oral Foam'),
            ('O', 'Oral Gel'),
            ('O', 'Oral Granules'),
            ('O', 'Oral Lozenge'),
            ('O', 'Oral Paste'),
            ('O', 'Oral Pellet'),
            ('O', 'Oral Powder'),
            ('O', 'Oral Solution'),
            ('O', 'Oral Spray'),
            ('O', 'Oral Suspension'),
            ('O', 'Oral Tablet'),
            ('O', 'Oral Wafer'),
            ('O', 'Powder for Oral Solution'),
            ('O', 'Powder for Oral Suspension'),
            ('O', 'Tablet for Oral Suspension'),
            ('P', 'Auto-Injector'),
            ('P', 'Cartridge'),
            ('P', 'Injectable Solution'),
            ('P', 'Injectable Suspension'),
            ('P', 'Injection'),
            ('P', 'Intraperitoneal Solution'),
            ('P', 'Jet Injector'),
            ('P', 'Pen Injector'),
            ('P', 'Prefilled Syringe'),
            ('R', 'Enema'),
            ('R', 'Rectal Cream'),
            ('R', 'Rectal Foam'),
            ('R', 'Rectal Gel'),
            ('R', 'Rectal Ointment'),
            ('R', 'Rectal Solution'),
            ('R', 'Rectal Spray'),
            ('R', 'Rectal Suppository'),
            ('SL', 'Buccal Film'),
            ('SL', 'Buccal Tablet'),
            ('SL', 'Sublingual Film'),
            ('SL', 'Sublingual Powder'),
            ('SL', 'Sublingual Tablet'),
            ('SL', 'Sustained Release Buccal Tablet'),
            ('SL', 'Mucosal Spray'),
            ('SL', 'Mucous Membrane Topical Solution'),
            ('TD', 'Medicated Patch'),
            ('TD', 'Transdermal System'),
            ('V', 'Douche'),
            ('V', 'Vaginal Cream'),
            ('V', 'Vaginal Gel'),
            ('V', 'Vaginal Insert'),
            ('V', 'Vaginal Ointment'),
            ('V', 'Vaginal System'),
            ('implant', 'Drug Implant'),
            ('implant', 'Intrauterine System'),
            ('s.c. implant', 'Drug Implant'),
            ('urethral', 'Urethral Gel'),
            ('urethral', 'Urethral Suppository'),
            ('oral aerosol', 'Oral Spray')
    ) as v(route, form)
),

ingredient_to_ddd as (
    select *
    from {{ ref('stg_atcddd__ingredient_to_ddd') }}
),

clinical_product_to_ddd as (
    -- Inner joins: products keep only DDD rows whose WHO route maps to the
    -- product's dose form.
    select
        c.*,
        i.ddd,
        i.uom,
        i.adm_route,
        i.note
    from {{ ref('int_rxnorm_clinical_products_to_ingredient_components') }} c
    join ingredient_to_ddd i on i.rxcui = c.ingredient_component_rxcui
    join route_dose_form_map m on i.adm_route = m.route and c.dose_form_name = m.form
)

select * from clinical_product_to_ddd


-- --- dbt/sagerx/models/staging/atcddd/_atcddd__models.yml ---
-- version: 2
--
-- models:
--   - name: stg_atcddd__ingredient_to_ddd
--     description: All RxCUIs that have DDD according to the World Health Organization
--     columns:
--       - name: rxcui
--       - name: atc_code
--       - name: name
--       - name: tty
--       - name: ddd
--       - name: uom
--       - name: adm_route
--       - name: note


-- --- dbt/sagerx/models/staging/atcddd/_atcddd__sources.yml ---
-- version: 2
--
-- sources:
--   - name: atc_ddd
--     schema: sagerx_lake
--     tables:
--       - name: atc_ddd


-- --- dbt/sagerx/models/staging/atcddd/stg_atcddd__ingredient_to_ddd.sql ---
-- Map WHO ATC codes (with their DDDs) to RxNorm concepts via RXNCONSO rows
-- whose source vocabulary is ATC.
select
    r.rxcui,
    a.atc_code,
    r.str as name,
    r.tty,
    a.ddd,
    a.uom,
    a.adm_route,
    a.note
from {{ source('atc_ddd', 'atc_ddd') }} a
join {{ source('rxnorm', 'rxnorm_rxnconso') }} r on a.atc_code = r.code
where r.sab = 'ATC'