Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ RUN pixi shell-hook -s bash > /shell-hook

ENV PYTHONUNBUFFERED=1

# below used to prevent qas-xas from writing a log file
ENV TEST=true

COPY default.py .

RUN mkdir /etc/tiled
Expand Down
45 changes: 3 additions & 42 deletions data_validation.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from prefect import task, flow, get_run_logger
import time as ttime
from prefect import task
from tiled.client import from_uri
from bluesky_tiled_plugins.writing.validator import validate
from dotenv import load_dotenv
import os

Expand Down Expand Up @@ -35,45 +33,8 @@ def get_run_migration(uid, api_key=None): # TODO remove after migration is comp


@task(retries=2, retry_delay_seconds=10)
def get_run_processed(uid, api_key=None):
def get_client_processed(api_key=None):
if not api_key:
api_key = get_api_key_from_env()
cl = from_uri("https://tiled.nsls2.bnl.gov", api_key=api_key)
run = cl[f"{BEAMLINE_OR_ENDSTATION}/processed"][uid]
return run


@task(retries=2, retry_delay_seconds=10)
def read_stream(run, stream):
return run[stream].read()


# currently configured to run only one of BTP validation or read all streams checks
@flow
def data_validation(uid, api_key=None, dry_run=False):
logger = get_run_logger()
run_client = get_run_migration(
uid, api_key=api_key
) # replace with get_run() if no SQL database
logger.info(f"Validating uid {run_client.start['uid']}")
start_time = ttime.monotonic()
try:
# the following calls to validate() only work for SQL database-backed catalogs - remove if not available
if dry_run:
validate(
run_client, fix_errors=False, try_reading=True, raise_on_error=True
)
else:
validate(run_client, fix_errors=True, try_reading=True, raise_on_error=True)
except AttributeError:
# check by reading data if not SQL database-backed
run_client = get_run(uid, api_key=api_key) # remove if no SQL database
for stream in run_client:
logger.info(f"{stream}:")
stream_start_time = ttime.monotonic()
stream_data = read_stream(run_client, stream) # noqa: F841
stream_elapsed_time = ttime.monotonic() - stream_start_time
logger.info(f"{stream} elapsed_time = {stream_elapsed_time}")
logger.info(f"{stream} nbytes = {stream_data.nbytes:_}")
elapsed_time = ttime.monotonic() - start_time
logger.info(f"{elapsed_time = }")
return cl[f"{BEAMLINE_OR_ENDSTATION}/processed"]
7 changes: 3 additions & 4 deletions end_of_run_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from prefect.context import FlowRunContext
from prefect.settings import PREFECT_UI_URL

from data_validation import data_validation, get_run, get_run_processed
from data_validation import get_run, get_client_processed

# QAS Application Specific
from xas.process import process_interpolate_bin_with_tiled
Expand Down Expand Up @@ -82,10 +82,9 @@ def log_completion(dry_run=False):
@slack
def end_of_run_workflow(stop_doc, api_key=None, dry_run=False):
uid = stop_doc["run_start"]
data_validation(uid, api_key=api_key, dry_run=dry_run)
# Processing goes here
run = get_run(uid, api_key=api_key)
run_processed = get_run_processed(uid, api_key=api_key)
process_interpolate_bin_with_tiled(run, run_processed)
client_processed = get_client_processed(api_key=api_key)
process_interpolate_bin_with_tiled(run, client_processed)
log_completion(dry_run=dry_run)
return True
Loading
Loading