12 changes: 12 additions & 0 deletions README.md
@@ -87,6 +87,18 @@ usage limit.
The post-processing agent handles cataloging raw and reduced data files in ONCat (https://oncat.ornl.gov/) by
calling scripts hosted on the analysis cluster.

##### Image File Cataloging

For instruments that produce image files (e.g., FITS or TIFF), the agent can automatically discover
and catalog these files along with the main data file. The metadata path(s) where image file locations are
stored can be configured:

"image_filepath_metadata_paths": ["metadata.entry.daslogs.bl10:exp:im:imagefilepath.value"]

If not specified, this defaults to the VENUS instrument metadata path. For other instruments, configure
this parameter to match the appropriate metadata path(s) in your NeXus files. Multiple paths can be specified
as an array.
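
As a sketch, the key might sit alongside the other ONCat settings in the agent's JSON configuration file (the `oncat_url` key is one the agent already reads; the surrounding layout here is illustrative):

    {
        "oncat_url": "https://oncat.ornl.gov",
        "image_filepath_metadata_paths": [
            "metadata.entry.daslogs.bl10:exp:im:imagefilepath.value"
        ]
    }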


Installation
------------
2 changes: 1 addition & 1 deletion SPECS/postprocessing.spec
@@ -3,7 +3,7 @@
%define release 1

Name: %{srcname}
Version: 4.3.0
Version: 4.4.0
Release: %{release}%{?dist}
Summary: %{summary}

12 changes: 2 additions & 10 deletions pixi.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions postprocessing/Configuration.py
@@ -91,6 +91,11 @@ def __init__(self, config_file):
self.oncat_url = config.get("oncat_url", "")
self.oncat_api_token = config.get("oncat_api_token", "")

# Image filepath metadata paths for cataloging image files
# Default is for VENUS instrument, but can be configured per instrument
default_image_metadata_paths = ["metadata.entry.daslogs.bl10:exp:im:imagefilepath.value"]
self.image_filepath_metadata_paths = config.get("image_filepath_metadata_paths", default_image_metadata_paths)

sys.path.insert(0, self.sw_dir)
# Configure processor plugins
default_processors = [
63 changes: 63 additions & 0 deletions postprocessing/processors/oncat_processor.py
@@ -11,6 +11,10 @@
import pyoncat


# Batch size for image ingestion (must stay below the API maximum of 100)
IMAGE_BATCH_SIZE = 50


class ONCatProcessor(BaseProcessor):
"""
Define post-processing task
@@ -60,6 +64,26 @@ def ingest(self, location):
logging.info("Calling ONCat for %s", related_file)
oncat.Datafile.ingest(related_file)

# Catalog image files using the batch API for efficiency
images = image_files(datafile, self.configuration.image_filepath_metadata_paths)
for batch in batches(images, IMAGE_BATCH_SIZE):
logging.info("Batch ingesting %d image files", len(batch))
oncat.Datafile.batch(batch)


def batches(items, size):
"""Yield successive batches of items.

Args:
items: List of items to batch
size: Size of each batch

Yields:
List slices of the specified size
"""
for i in range(0, len(items), size):
yield items[i : i + size]
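
# Example: list(batches([1, 2, 3, 4, 5], 2)) -> [[1, 2], [3, 4], [5]];
# the final batch may be shorter than `size`.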


def related_files(datafile):
"""Given a datafile, return a list of related files to also catalog.
@@ -89,3 +113,42 @@ def related_files(datafile):
)
if path != location
]


def image_files(datafile, metadata_paths):
"""Find image files from metadata paths.

Iterates through the configured metadata paths, retrieves values from
the datafile metadata, and globs for image files in the discovered
subdirectories.

Args:
datafile: ONCat datafile object with metadata
metadata_paths: List of metadata paths to check for image directory locations

Returns:
List of absolute paths to image files (FITS and TIFF)
"""
facility = datafile.facility
instrument = datafile.instrument
experiment = datafile.experiment
image_file_paths = []

for metadata_path in metadata_paths:
value = datafile.get(metadata_path)
if value is None:
continue

subdirs = value if isinstance(value, list) else [value]

for subdir in subdirs:
full_path = os.path.join("/", facility, instrument, experiment, subdir)

if not os.path.isdir(full_path):
continue

fits_files = glob.glob(os.path.join(full_path, "*.fits"))
tiff_files = glob.glob(os.path.join(full_path, "*.tiff"))
image_file_paths.extend(fits_files + tiff_files)

return image_file_paths
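
To make the path construction concrete, here is a small sketch using the VENUS fixture values from the integration tests below (all names here are illustrative of those fixtures; "images" is the metadata value the mock server returns):

    import glob
    import os

    # Field values as in the VENUS integration-test fixtures
    facility, instrument, experiment, subdir = "SNS", "VENUS", "IPTS-99999", "images"
    full_path = os.path.join("/", facility, instrument, experiment, subdir)
    print(full_path)  # /SNS/VENUS/IPTS-99999/images
    # In the test container this directory holds image_001.fits, image_002.fits,
    # and image_003.tiff, which the globs below pick up.
    print(sorted(glob.glob(os.path.join(full_path, "*.fits"))))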
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,7 +1,7 @@
[project]
name = "postprocessing"
description = "Post-processing agent to automatically catalog and reduce neutron data"
version = "4.3.0"
version = "4.4.0"
requires-python = ">=3.9"
dependencies = [
"requests",
8 changes: 8 additions & 0 deletions tests/integration/Dockerfile
@@ -43,6 +43,14 @@ RUN mkdir -p /opt/postprocessing/log && \
echo "a=1" > /SNS/TOPAZ/shared/autoreduce/reduce_TOPAZ_default.py && \
echo "a=\${value}" > /SNS/TOPAZ/shared/autoreduce/reduce_TOPAZ.py.template && \
\
mkdir -p /SNS/VENUS/IPTS-99999/nexus && \
mkdir -p /SNS/VENUS/IPTS-99999/images && \
mkdir -p /SNS/VENUS/IPTS-99999/shared/autoreduce && \
touch /SNS/VENUS/IPTS-99999/nexus/VENUS_12345.nxs.h5 && \
touch /SNS/VENUS/IPTS-99999/images/image_001.fits && \
touch /SNS/VENUS/IPTS-99999/images/image_002.fits && \
touch /SNS/VENUS/IPTS-99999/images/image_003.tiff && \
\
chown -R root:root /opt/postprocessing /SNS

WORKDIR /opt/postprocessing
10 changes: 9 additions & 1 deletion tests/integration/Dockerfile.oncat
@@ -5,5 +5,13 @@ RUN mkdir -p /SNS/CORELLI/IPTS-15526/nexus && \
mkdir -p /SNS/CORELLI/IPTS-15526/images/det_main && \
touch /SNS/CORELLI/IPTS-15526/images/det_main/CORELLI_29666_det_main_000001.tiff && \
mkdir -p /SNS/CORELLI/IPTS-15526/shared/autoreduce && \
echo '{"output_files":[], "input_files": []}' > /SNS/CORELLI/IPTS-15526/shared/autoreduce/CORELLI_29666.json
echo '{"output_files":[], "input_files": []}' > /SNS/CORELLI/IPTS-15526/shared/autoreduce/CORELLI_29666.json && \
\
mkdir -p /SNS/VENUS/IPTS-99999/nexus && \
mkdir -p /SNS/VENUS/IPTS-99999/images && \
mkdir -p /SNS/VENUS/IPTS-99999/shared/autoreduce && \
touch /SNS/VENUS/IPTS-99999/nexus/VENUS_12345.nxs.h5 && \
touch /SNS/VENUS/IPTS-99999/images/image_001.fits && \
touch /SNS/VENUS/IPTS-99999/images/image_002.fits && \
touch /SNS/VENUS/IPTS-99999/images/image_003.tiff
CMD ["python", "oncat_server.py"]
39 changes: 39 additions & 0 deletions tests/integration/oncat_server.py
@@ -18,6 +18,40 @@ def do_POST(self):
self.wfile.write(json.dumps(response).encode("utf-8"))
return

# Handle batch cataloging endpoint
if self.path == "/api/datafiles/batch":
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length)
try:
file_paths = json.loads(body.decode("utf-8"))
if not isinstance(file_paths, list):
self.send_response(400)
self.send_header("Content-type", "application/json")
self.end_headers()
response = {"error": "Expected array of file paths"}
self.wfile.write(json.dumps(response).encode("utf-8"))
return

logging.info("Received batch datafile ingest request for %d files", len(file_paths))
for file_path in file_paths:
logging.info(" - %s", file_path)

# Send success response
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
response = {"ingested": len(file_paths)}
self.wfile.write(json.dumps(response).encode("utf-8"))
return
except Exception as e:
self.send_response(400)
self.send_header("Content-type", "application/json")
self.end_headers()
logging.error("Batch ingestion error: %s", str(e))
response = {"error": str(e)}
self.wfile.write(json.dumps(response).encode("utf-8"))
return

if self.path.startswith("/api/datafiles/"):
location = self.path.replace("/api/datafiles", "").replace("/ingest", "")
logging.info("Received datafile ingest request for %s", location)
@@ -67,6 +101,11 @@ def do_POST(self):
"experiment": experiment,
"indexed": {"run_number": run_number},
}

# Add metadata for VENUS instrument to support image cataloging
if instrument == "VENUS":
response["metadata"] = {"entry": {"daslogs": {"bl10:exp:im:imagefilepath": {"value": "images"}}}}

self.wfile.write(json.dumps(response).encode("utf-8"))


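For reference, a sketch of the raw exchange this mock handler accepts (the port is illustrative; the real ONCat batch endpoint is reached through pyoncat and may differ):

    import json
    import requests

    # POST a JSON array of absolute file paths to the mock batch endpoint.
    paths = [
        "/SNS/VENUS/IPTS-99999/images/image_001.fits",
        "/SNS/VENUS/IPTS-99999/images/image_002.fits",
    ]
    resp = requests.post("http://localhost:8080/api/datafiles/batch", data=json.dumps(paths))
    print(resp.json())  # {"ingested": 2}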
77 changes: 73 additions & 4 deletions tests/integration/test_cataloging.py
@@ -31,13 +31,19 @@ def test_oncat_catalog():
# send data ready
conn.send("/queue/CATALOG.ONCAT.DATA_READY", json.dumps(message).encode())

listener.wait_for_message()
# Wait for messages until we get the one for this run
max_attempts = 10
for _ in range(max_attempts):
listener.wait_for_message()
header, body = listener.get_latest_message()
msg = json.loads(body)
if msg["run_number"] == message["run_number"]:
break
else:
pytest.fail(f"Did not receive COMPLETE message for CORELLI run {message['run_number']}")

conn.disconnect()

header, body = listener.get_latest_message()

msg = json.loads(body)
assert msg["run_number"] == message["run_number"]
assert msg["instrument"] == message["instrument"]
assert msg["ipts"] == message["ipts"]
@@ -101,6 +107,69 @@ def test_oncat_catalog_error():
assert log[-1].endswith("ERROR Invalid path format: /bin/true")


def test_oncat_catalog_venus_images():
"""This should run ONCatProcessor and catalog VENUS image files using batch API"""
message = {
"run_number": "12345",
"instrument": "VENUS",
"ipts": "IPTS-99999",
"facility": "SNS",
"data_file": "/SNS/VENUS/IPTS-99999/nexus/VENUS_12345.nxs.h5",
}

conn = stomp.Connection(host_and_ports=[("localhost", 61613)])

listener = stomp.listener.TestListener(10) # 10 second timeout
conn.set_listener("", listener)

try:
conn.connect("icat", "icat")
except stomp.exception.ConnectFailedException:
pytest.skip("Requires activemq running")

# expect a message on CATALOG.ONCAT.COMPLETE
conn.subscribe("/queue/CATALOG.ONCAT.COMPLETE", id="venus123", ack="auto")

# send data ready
conn.send("/queue/CATALOG.ONCAT.DATA_READY", json.dumps(message).encode())

# Wait for messages until we get the one for this run
max_attempts = 10
for _ in range(max_attempts):
listener.wait_for_message()
header, body = listener.get_latest_message()
msg = json.loads(body)
if msg["run_number"] == message["run_number"]:
break
else:
pytest.fail(f"Did not receive COMPLETE message for VENUS run {message['run_number']}")

conn.disconnect()

assert msg["run_number"] == message["run_number"]
assert msg["instrument"] == message["instrument"]
assert msg["ipts"] == message["ipts"]
assert msg["facility"] == message["facility"]
assert msg["data_file"] == message["data_file"]

time.sleep(1) # give oncat_server time to write its log
log = docker_exec_and_cat("/oncat_server.log", "oncat").splitlines()

# Check that the NeXus file was ingested
assert any(
"INFO Received datafile ingest request for /SNS/VENUS/IPTS-99999/nexus/VENUS_12345.nxs.h5" in line
for line in log
)

# Check that batch ingestion was called with the image files
assert any("INFO Received batch datafile ingest request for 3 files" in line for line in log)

# Verify all three image files were logged
assert any("INFO - /SNS/VENUS/IPTS-99999/images/image_001.fits" in line for line in log)
assert any("INFO - /SNS/VENUS/IPTS-99999/images/image_002.fits" in line for line in log)
assert any("INFO - /SNS/VENUS/IPTS-99999/images/image_003.tiff" in line for line in log)


def test_oncat_reduction_catalog():
"""This should run reduction ONCatProcessor"""
message = {