From f907139b870ebfb1a1c58a880a5f229d8bf80188 Mon Sep 17 00:00:00 2001
From: Nolan Gormley
Date: Thu, 15 Jun 2023 11:11:36 -0400
Subject: [PATCH 1/5] merge tool mvp

---
 utils.py | 72 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 71 insertions(+), 1 deletion(-)

diff --git a/utils.py b/utils.py
index 881538e..11564d7 100644
--- a/utils.py
+++ b/utils.py
@@ -1,10 +1,12 @@
 import json
 import os
 import re
+import pandas as pd
 from datetime import datetime as dtime
 from datetime import timedelta
 from typing import Dict, List
 from urllib.parse import urlparse
+from sqlalchemy import create_engine
 
 import marko
 import pytz
@@ -22,6 +24,7 @@ CLEANR = re.compile("<.*?>")
 
 
 FAULT_RECORD_API_URL = os.getenv("FAULT_RECORD_API_URL")
+FAULT_RECORD_DB_URI = os.getenv("FAULT_RECORD_DB_URI")
 SHIB_MEMBER = os.getenv("SHIB_MEMBER")
 
 SHIB_FIRST_NAME = os.getenv("SHIB_FIRST_NAME")
@@ -30,6 +33,73 @@ SCRAPER_USER_ID = -1
 
 
+def merge_lag_faults():
+
+    global SCRAPER_USER_ID
+
+    if SCRAPER_USER_ID == -1:
+        SCRAPER_USER_ID = get_or_create_user(SCRAPER_EMAIL, SHIB_FIRST_NAME, SHIB_LAST_NAME, FAULT_RECORD_API_URL)
+
+    FRengine = create_engine('FAULT_RECORD_DB_URI')
+    query = f"""
+        SELECT
+            *
+        FROM fault_record.records
+    """
+    df = pd.read_sql(query , FRengine)
+
+    # Scrape information needed to merge
+    regex = r"\*(.*)\* is (\d+) days old - \(last update: (.*)\) (.*)"
+    df['split_desc'] = df.desc.apply(lambda x: re.findall(regex, x))
+    cols = ['scraped_source', 'lag', 'date', 'scraped_desc']
+    regex = r"\*(?P<scraped_source>.*)\* is (?P<lag>\d+) days old - \(last update: (?P<date>.*)\) (?P<scraped_desc>.*)"
+    regex_cols = df.desc.str.extract(regex)
+    for col in cols:
+        df[col] = regex_cols[col]
+
+    # Drop the cols that don't have the matching data
+    df = df.drop('DVC_id', axis=1).dropna()
+    df['lag'] = df.lag.astype(int)
+
+    # Aggregate the faults we want to merge
+    df_merged = df.groupby(['scraped_source', 'date'])\
+        .agg({
+            'first_occurance': 'min',
+            'last_occurance': 'max',
+            'record_date': 'min',
+            'fault_id': 'min',
+            'lag': ['max', pd.Series.nunique]
+        }).reset_index()
+
+    df_merged.columns = ['scraped_source', 'date', 'first_occurance', 'last_occurance',
+                         'record_date', 'fault_id', 'lag', 'rows']
+
+
+
+    # Update the first fault with the range of data
+    for _, row in df_merged.iterrows():
+        if row['rows'] > 1:
+            desc = df[df['fault_id']==row['fault_id']]['desc'].iloc[0]
+            url = f"{FAULT_RECORD_API_URL}/api/v1/admin/faults/{row['fault_id']}"
+            payload = {'published': '', 'user_id': SCRAPER_USER_ID}
+            payload['name'] = f"Merged lag faults for {row['scraped_source']}"
+            payload['desc'] = f"Merged faults with lag that grew to {row['lag']} with messages similar to \n {desc}"
+            payload['first_occurance'] = row['first_occurance'].isoformat()
+            payload['last_occurance'] = row['last_occurance'].isoformat()
+            payload['record_date'] = row['record_date'].isoformat()
+            payload['signals'] = get_signal_ids(desc)
+
+            r = requests.put(url, payload)
+            if not r.ok:
+                print(r.json())
+
+    # Delete the faults that were merged into the first
+    df_delete = df['fault_id'][~df['fault_id'].isin(df_merged['fault_id'])]
+    for fault_id in df_delete:
+        url = f"{FAULT_RECORD_API_URL}/api/v1/admin/faults/{fault_id}"
+        r = requests.delete(url)
+        if not r.ok:
+            print(r.json())
 
 
 def parse_timestamp(ts: float) -> str:
     """Convert Slack message timestamp to date
@@ -254,7 +324,7 @@ def get_signal_ids(message: str) -> List:
     """
     try:
         source, signals = extract_source_signal_pair(message)
-        query_signals_url = get_signals_url(source, signals)
+        query_signals_url = get_signals_url(source, FAULT_RECORD_API_URL, signals)
         signals = requests.get(query_signals_url)
         signal_ids = [sig.get("signal_id") for sig in signals.json()]
         return signal_ids

From 86f9398143adb681550d1942e01be0e35e77571a Mon Sep 17 00:00:00 2001
From: Nolan Gormley
Date: Thu, 15 Jun 2023 11:21:15 -0400
Subject: [PATCH 2/5] added logging and run file

---
 merge_tool.py | 9 +++++++++
 utils.py      | 9 +++++++--
 2 files changed, 16 insertions(+), 2 deletions(-)
 create mode 100644 merge_tool.py

diff --git a/merge_tool.py b/merge_tool.py
new file mode 100644
index 0000000..64294bf
--- /dev/null
+++ b/merge_tool.py
@@ -0,0 +1,9 @@
+from logzero import logger
+
+from utils import merge_lag_faults
+
+def main():
+    merge_lag_faults()
+
+if __name__ == "__main__":
+    main()
diff --git a/utils.py b/utils.py
index 11564d7..a394704 100644
--- a/utils.py
+++ b/utils.py
@@ -89,17 +89,22 @@ def merge_lag_faults():
             payload['record_date'] = row['record_date'].isoformat()
             payload['signals'] = get_signal_ids(desc)
 
+            logger.info(f"Merging {row['rows']} records into fault# {row['fault_id']}.")
             r = requests.put(url, payload)
+
             if not r.ok:
-                print(r.json())
+                logger.error(f"Something went wrong when trying to merge fault# {row['fault_id']}: {r.json()}")
 
     # Delete the faults that were merged into the first
     df_delete = df['fault_id'][~df['fault_id'].isin(df_merged['fault_id'])]
     for fault_id in df_delete:
         url = f"{FAULT_RECORD_API_URL}/api/v1/admin/faults/{fault_id}"
+
+        logger.info(f"Deleting fault# {row['fault_id']}.")
         r = requests.delete(url)
+
         if not r.ok:
-            print(r.json())
+            logger.error(f"Something went wrong when trying to delete fault# {fault_id}: {r.json()}")
 
 def parse_timestamp(ts: float) -> str:
     """Convert Slack message timestamp to date

From 115e1cf119c721ccb7b8f83f9d088484430752bc Mon Sep 17 00:00:00 2001
From: Nolan Gormley
Date: Thu, 15 Jun 2023 13:08:22 -0400
Subject: [PATCH 3/5] fixed sql connection and removed signal processing

---
 utils.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/utils.py b/utils.py
index a394704..d3248ae 100644
--- a/utils.py
+++ b/utils.py
@@ -40,7 +40,7 @@ def merge_lag_faults():
     if SCRAPER_USER_ID == -1:
         SCRAPER_USER_ID = get_or_create_user(SCRAPER_EMAIL, SHIB_FIRST_NAME, SHIB_LAST_NAME, FAULT_RECORD_API_URL)
 
-    FRengine = create_engine('FAULT_RECORD_DB_URI')
+    FRengine = create_engine(FAULT_RECORD_DB_URI)
     query = f"""
         SELECT
             *
@@ -87,7 +87,6 @@ def merge_lag_faults():
             payload['first_occurance'] = row['first_occurance'].isoformat()
             payload['last_occurance'] = row['last_occurance'].isoformat()
             payload['record_date'] = row['record_date'].isoformat()
-            payload['signals'] = get_signal_ids(desc)
 
             logger.info(f"Merging {row['rows']} records into fault# {row['fault_id']}.")
             r = requests.put(url, payload)

From 343fd53784e779794289dabbf6193913d05308cb Mon Sep 17 00:00:00 2001
From: Nolan Gormley
Date: Thu, 15 Jun 2023 13:40:25 -0400
Subject: [PATCH 4/5] added headers to work with shibboleth

---
 utils.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/utils.py b/utils.py
index d3248ae..22c5d8a 100644
--- a/utils.py
+++ b/utils.py
@@ -75,6 +75,7 @@ def merge_lag_faults():
                          'record_date', 'fault_id', 'lag', 'rows']
 
 
+    headers = {"Content-type": "application/json", "Accept": "text/plain", "Member": SHIB_MEMBER, "givenName": SHIB_FIRST_NAME, "sn": SHIB_LAST_NAME}
 
     # Update the first fault with the range of data
     for _, row in df_merged.iterrows():
@@ -89,7 +90,7 @@ def merge_lag_faults():
             payload['record_date'] = row['record_date'].isoformat()
logger.info(f"Merging {row['rows']} records into fault# {row['fault_id']}.") - r = requests.put(url, payload) + r = requests.put(url, payload, headers=headers) if not r.ok: logger.error(f"Something went wrong when trying to merge fault# {row['fault_id']}: {r.json()}") @@ -100,7 +101,7 @@ def merge_lag_faults(): url = f"{FAULT_RECORD_API_URL}/api/v1/admin/faults/{fault_id}" logger.info(f"Deleting fault# {row['fault_id']}.") - r = requests.delete(url) + r = requests.delete(url, headers=headers) if not r.ok: logger.error(f"Something went wrong when trying to delete fault# {fault_id}: {r.json()}") From fec8dc6486ae7830c941c32503f848f44f49e826 Mon Sep 17 00:00:00 2001 From: Nolan Gormley Date: Thu, 15 Jun 2023 13:58:53 -0400 Subject: [PATCH 5/5] Fixed request data and delete logging --- utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/utils.py b/utils.py index 22c5d8a..8b1b864 100644 --- a/utils.py +++ b/utils.py @@ -90,7 +90,7 @@ def merge_lag_faults(): payload['record_date'] = row['record_date'].isoformat() logger.info(f"Merging {row['rows']} records into fault# {row['fault_id']}.") - r = requests.put(url, payload, headers=headers) + r = requests.put(url, data=json.dumps(payload), headers=headers) if not r.ok: logger.error(f"Something went wrong when trying to merge fault# {row['fault_id']}: {r.json()}") @@ -100,7 +100,7 @@ def merge_lag_faults(): for fault_id in df_delete: url = f"{FAULT_RECORD_API_URL}/api/v1/admin/faults/{fault_id}" - logger.info(f"Deleting fault# {row['fault_id']}.") + logger.info(f"Deleting fault# {fault_id}.") r = requests.delete(url, headers=headers) if not r.ok: