Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion data_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def get_run(uid, api_key=None):

# SQL database-backed - remove if this does not exist on the beamline
@task(retries=2, retry_delay_seconds=10)
def get_run_migration(uid, api_key=None):
def get_run_migration(uid, api_key=None): # TODO remove after migration is complete
if not api_key:
api_key = get_api_key_from_env()
cl = from_uri("https://tiled.nsls2.bnl.gov", api_key=api_key)
Expand Down
68 changes: 67 additions & 1 deletion end_of_run_workflow.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,75 @@
import traceback

from prefect import task, flow, get_run_logger
from prefect.blocks.notifications import SlackWebhook
from prefect.context import FlowRunContext
from prefect.settings import PREFECT_UI_URL

from data_validation import data_validation, get_run, get_run_processed

# QAS Application Specific
from xas.process import process_interpolate_bin_with_tiled

tiled_inst = "https://tiled.nsls2.bnl.gov"
CATALOG_NAME = "qas"


def slack(func):
"""
Send a message to mon-prefect and mon-prefect-spec slack channels if the flow-run failed.
Send a message to mon-prefect-qas slack channel with the flow-run status.
Send a message to mon-bluesky slack channel if the bluesky-run failed.

NOTE: the name of this inner function is the same as the real end_of_workflow() function because
when the decorator is used, Prefect sees the name of this inner function as the name of
the flow. To keep the naming of workflows consistent, the name of this inner function had to match the expected name.
"""

def end_of_run_workflow(stop_doc, api_key=None, dry_run=False):
flow_run_name = FlowRunContext.get().flow_run.dict().get("name")

# Load slack credentials that are saved in Prefect.
mon_prefect = SlackWebhook.load("mon-prefect")
mon_bluesky = SlackWebhook.load("mon-bluesky")
mon_prefect_qas = SlackWebhook.load("mon-prefect-qas")
mon_prefect_spec = SlackWebhook.load("mon-prefect-spec")

# Get the uid.
uid = stop_doc["run_start"]

# Get the scan_id.
run = get_run(uid, api_key=api_key)
scan_id = run.start["scan_id"]

# Send a message to mon-bluesky if bluesky-run failed.
if stop_doc.get("exit_status") == "fail":
mon_bluesky.notify(
f":bangbang: {CATALOG_NAME} bluesky-run failed. (*{flow_run_name}*)\n ```run_start: {uid}\nscan_id: {scan_id}``` ```reason: {stop_doc.get('reason', 'none')}```"
)

try:
result = func(stop_doc, api_key=api_key, dry_run=dry_run)

# Send a message to mon-prefect-qas if flow-run is successful.
message = f":white_check_mark: {CATALOG_NAME} flow-run successful. (*{flow_run_name}*)\n ```run_start: {uid}\nscan_id: {scan_id}```"
mon_prefect_qas.notify(message)
return result
except Exception as e:
tb = traceback.format_exception_only(e)

# Send a message to mon-prefect-qas, mon-prefect if flow-run failed.
message = f":bangbang: {CATALOG_NAME} flow-run failed. (*{flow_run_name}*)\n ```run_start: {uid}\nscan_id: {scan_id}``` ```{tb[-1]}```"
mon_prefect.notify(message)
mon_prefect_qas.notify(message)
flow_run = FlowRunContext.get().flow_run
# Add link to flow-run for the message to mon-prefect-spec.
program_message = (
f":bangbang: {CATALOG_NAME} flow-run failed. <{PREFECT_UI_URL.value()}/flow-runs/"
+ f"flow-run/{flow_run.id}|the flow run link> (*{flow_run_name}*)\n ```run_start: {uid}\nscan_id: {scan_id}``` ```{tb[-1]}```"
)
mon_prefect_spec.notify(program_message)
raise

return end_of_run_workflow


@task
Expand All @@ -14,6 +79,7 @@ def log_completion(dry_run=False):


@flow
@slack
def end_of_run_workflow(stop_doc, api_key=None, dry_run=False):
uid = stop_doc["run_start"]
data_validation(uid, api_key=api_key, dry_run=dry_run)
Expand Down
Loading
Loading