From 81ea8f68a893f60f26f8f2a501f4efc3eb11e5c2 Mon Sep 17 00:00:00 2001 From: Darsh Date: Wed, 14 Jan 2026 14:44:13 -0500 Subject: [PATCH 1/4] feat: Add cataloging of images for VENUS instrument --- pixi.lock | 12 +- postprocessing/processors/oncat_processor.py | 67 +++++ pyproject.toml | 2 +- tests/integration/Dockerfile | 8 + tests/integration/oncat_server.py | 39 +++ tests/integration/test_cataloging.py | 57 ++++ .../processors/test_oncat_processor.py | 263 ++++++++++++++++++ 7 files changed, 437 insertions(+), 11 deletions(-) create mode 100644 tests/unit/postprocessing/processors/test_oncat_processor.py diff --git a/pixi.lock b/pixi.lock index df2a79f..0583ae0 100644 --- a/pixi.lock +++ b/pixi.lock @@ -6,8 +6,6 @@ environments: - url: https://conda.anaconda.org/oncat/ indexes: - https://pypi.org/simple - options: - pypi-prerelease-mode: if-necessary-or-explicit packages: linux-64: - conda: https://conda.anaconda.org/conda-forge/linux-64/_libgcc_mutex-0.1-conda_forge.tar.bz2 @@ -303,8 +301,6 @@ environments: - url: https://conda.anaconda.org/oncat/ indexes: - https://pypi.org/simple - options: - pypi-prerelease-mode: if-necessary-or-explicit packages: linux-64: - conda: https://conda.anaconda.org/conda-forge/linux-64/_libgcc_mutex-0.1-conda_forge.tar.bz2 @@ -684,8 +680,6 @@ environments: - url: https://conda.anaconda.org/oncat/ indexes: - https://pypi.org/simple - options: - pypi-prerelease-mode: if-necessary-or-explicit packages: linux-64: - conda: https://conda.anaconda.org/conda-forge/linux-64/_libgcc_mutex-0.1-conda_forge.tar.bz2 @@ -1065,8 +1059,6 @@ environments: - url: https://conda.anaconda.org/oncat/ indexes: - https://pypi.org/simple - options: - pypi-prerelease-mode: if-necessary-or-explicit packages: linux-64: - conda: https://conda.anaconda.org/conda-forge/linux-64/_libgcc_mutex-0.1-conda_forge.tar.bz2 @@ -3483,8 +3475,8 @@ packages: timestamp: 1747339794916 - pypi: ./ name: postprocessing - version: 4.3.0 - sha256: f41b77096b7deb425e5d7d411fad9b0411b40f00879181daae412c1a22e95391 + version: 4.4.0 + sha256: 2fcc06b6becb7bcf57b54c59988fd7ada94406c5e01c09f73c0436387c65725b requires_dist: - requests - stomp-py diff --git a/postprocessing/processors/oncat_processor.py b/postprocessing/processors/oncat_processor.py index 215a5be..fe2c626 100644 --- a/postprocessing/processors/oncat_processor.py +++ b/postprocessing/processors/oncat_processor.py @@ -11,6 +11,15 @@ import pyoncat +# Metadata paths where image file paths may be stored +IMAGE_FILEPATH_METADATA_PATHS = [ + "metadata.entry.daslogs.bl10:exp:im:imagefilepath.value", +] + +# Batch size for image ingestion (must be less than max of 100) +IMAGE_BATCH_SIZE = 50 + + class ONCatProcessor(BaseProcessor): """ Define post-processing task @@ -60,6 +69,26 @@ def ingest(self, location): logging.info("Calling ONCat for %s", related_file) oncat.Datafile.ingest(related_file) + # Catalog image files using batch API for efficiency + images = image_files(datafile) + for batch in batches(images, IMAGE_BATCH_SIZE): + logging.info("Batch ingesting %d image files", len(batch)) + oncat.Datafile.batch(batch) + + +def batches(items, size): + """Yield successive batches of items. + + Args: + items: List of items to batch + size: Size of each batch + + Yields: + List slices of the specified size + """ + for i in range(0, len(items), size): + yield items[i : i + size] + def related_files(datafile): """Given a datafile, return a list of related files to also catalog. @@ -89,3 +118,41 @@ def related_files(datafile): ) if path != location ] + + +def image_files(datafile): + """Find image files from metadata paths. + + Iterates through the known metadata paths, retrieves values from + the datafile metadata, and globs for image files in the discovered + subdirectories. + + Args: + datafile: ONCat datafile object with metadata + + Returns: + List of absolute paths to image files (FITS and TIFF) + """ + facility = datafile.facility + instrument = datafile.instrument + experiment = datafile.experiment + image_file_paths = [] + + for metadata_path in IMAGE_FILEPATH_METADATA_PATHS: + value = datafile.get(metadata_path) + if value is None: + continue + + subdirs = value if isinstance(value, list) else [value] + + for subdir in subdirs: + full_path = os.path.join("/", facility, instrument, experiment, subdir) + + if not os.path.isdir(full_path): + continue + + fits_files = glob.glob(os.path.join(full_path, "*.fits")) + tiff_files = glob.glob(os.path.join(full_path, "*.tiff")) + image_file_paths.extend(fits_files + tiff_files) + + return image_file_paths diff --git a/pyproject.toml b/pyproject.toml index 3cb30b4..b668500 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "postprocessing" description = "Post-processing agent to automatically catalog and reduce neutron data" -version = "4.3.0" +version = "4.4.0" requires-python = ">=3.9" dependencies = [ "requests", diff --git a/tests/integration/Dockerfile b/tests/integration/Dockerfile index 39ae752..041439d 100644 --- a/tests/integration/Dockerfile +++ b/tests/integration/Dockerfile @@ -43,6 +43,14 @@ RUN mkdir -p /opt/postprocessing/log && \ echo "a=1" > /SNS/TOPAZ/shared/autoreduce/reduce_TOPAZ_default.py && \ echo "a=\${value}" > /SNS/TOPAZ/shared/autoreduce/reduce_TOPAZ.py.template && \ \ + mkdir -p /SNS/VENUS/IPTS-99999/nexus && \ + mkdir -p /SNS/VENUS/IPTS-99999/images && \ + mkdir -p /SNS/VENUS/IPTS-99999/shared/autoreduce && \ + touch /SNS/VENUS/IPTS-99999/nexus/VENUS_12345.nxs.h5 && \ + touch /SNS/VENUS/IPTS-99999/images/image_001.fits && \ + touch /SNS/VENUS/IPTS-99999/images/image_002.fits && \ + touch /SNS/VENUS/IPTS-99999/images/image_003.tiff && \ + \ chown -R root:root /opt/postprocessing /SNS WORKDIR /opt/postprocessing diff --git a/tests/integration/oncat_server.py b/tests/integration/oncat_server.py index 782fcb4..9eacd44 100644 --- a/tests/integration/oncat_server.py +++ b/tests/integration/oncat_server.py @@ -18,6 +18,40 @@ def do_POST(self): self.wfile.write(json.dumps(response).encode("utf-8")) return + # Handle batch cataloging endpoint + if self.path == "/api/datafiles/batch": + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length) + try: + file_paths = json.loads(body.decode("utf-8")) + if not isinstance(file_paths, list): + self.send_response(400) + self.send_header("Content-type", "application/json") + self.end_headers() + response = {"error": "Expected array of file paths"} + self.wfile.write(json.dumps(response).encode("utf-8")) + return + + logging.info("Received batch datafile ingest request for %d files", len(file_paths)) + for file_path in file_paths: + logging.info(" - %s", file_path) + + # Send success response + self.send_response(200) + self.send_header("Content-type", "application/json") + self.end_headers() + response = {"ingested": len(file_paths)} + self.wfile.write(json.dumps(response).encode("utf-8")) + return + except Exception as e: + self.send_response(400) + self.send_header("Content-type", "application/json") + self.end_headers() + logging.error("Batch ingestion error: %s", str(e)) + response = {"error": str(e)} + self.wfile.write(json.dumps(response).encode("utf-8")) + return + if self.path.startswith("/api/datafiles/"): location = self.path.replace("/api/datafiles", "").replace("/ingest", "") logging.info("Received datafile ingest request for %s", location) @@ -67,6 +101,11 @@ def do_POST(self): "experiment": experiment, "indexed": {"run_number": run_number}, } + + # Add metadata for VENUS instrument to support image cataloging + if instrument == "VENUS": + response["metadata"] = {"entry": {"daslogs": {"bl10:exp:im:imagefilepath": {"value": "images"}}}} + self.wfile.write(json.dumps(response).encode("utf-8")) diff --git a/tests/integration/test_cataloging.py b/tests/integration/test_cataloging.py index 02dcf7f..1d22bea 100644 --- a/tests/integration/test_cataloging.py +++ b/tests/integration/test_cataloging.py @@ -101,6 +101,63 @@ def test_oncat_catalog_error(): assert log[-1].endswith("ERROR Invalid path format: /bin/true") +def test_oncat_catalog_venus_images(): + """This should run ONCatProcessor and catalog VENUS image files using batch API""" + message = { + "run_number": "12345", + "instrument": "VENUS", + "ipts": "IPTS-99999", + "facility": "SNS", + "data_file": "/SNS/VENUS/IPTS-99999/nexus/VENUS_12345.nxs.h5", + } + + conn = stomp.Connection(host_and_ports=[("localhost", 61613)]) + + listener = stomp.listener.TestListener() + conn.set_listener("", listener) + + try: + conn.connect("icat", "icat") + except stomp.exception.ConnectFailedException: + pytest.skip("Requires activemq running") + + # expect a message on CATALOG.ONCAT.COMPLETE + conn.subscribe("/queue/CATALOG.ONCAT.COMPLETE", id="123", ack="auto") + + # send data ready + conn.send("/queue/CATALOG.ONCAT.DATA_READY", json.dumps(message).encode()) + + listener.wait_for_message() + + conn.disconnect() + + header, body = listener.get_latest_message() + + msg = json.loads(body) + assert msg["run_number"] == message["run_number"] + assert msg["instrument"] == message["instrument"] + assert msg["ipts"] == message["ipts"] + assert msg["facility"] == message["facility"] + assert msg["data_file"] == message["data_file"] + + time.sleep(1) # give oncat_server time to write its log + log = docker_exec_and_cat("/oncat_server.log", "oncat").splitlines() + + # Check that the NeXus file was ingested + assert any( + "INFO Received datafile ingest request for /SNS/VENUS/IPTS-99999/nexus/VENUS_12345.nxs.h5" in line + for line in log + ) + + # Check that batch ingestion was called with the image files + assert any("INFO Received batch datafile ingest request for 3 files" in line for line in log) + + # Verify all three image files were logged + assert any("INFO - /SNS/VENUS/IPTS-99999/images/image_001.fits" in line for line in log) + assert any("INFO - /SNS/VENUS/IPTS-99999/images/image_002.fits" in line for line in log) + assert any("INFO - /SNS/VENUS/IPTS-99999/images/image_003.tiff" in line for line in log) + + def test_oncat_reduction_catalog(): """This should run reduction ONCatProcessor""" message = { diff --git a/tests/unit/postprocessing/processors/test_oncat_processor.py b/tests/unit/postprocessing/processors/test_oncat_processor.py new file mode 100644 index 0000000..e9c84f5 --- /dev/null +++ b/tests/unit/postprocessing/processors/test_oncat_processor.py @@ -0,0 +1,263 @@ +from unittest.mock import Mock, patch + +from postprocessing.processors.oncat_processor import ( + ONCatProcessor, + batches, + related_files, + image_files, +) + + +def test_batches_empty_list(): + """Test batches function with empty list""" + result = list(batches([], 50)) + assert result == [] + + +def test_batches_single_batch(): + """Test batches function with items that fit in one batch""" + items = list(range(10)) + result = list(batches(items, 50)) + assert len(result) == 1 + assert result[0] == items + + +def test_batches_multiple_batches(): + """Test batches function with items that require multiple batches""" + items = list(range(125)) + result = list(batches(items, 50)) + assert len(result) == 3 + assert result[0] == list(range(0, 50)) + assert result[1] == list(range(50, 100)) + assert result[2] == list(range(100, 125)) + + +def test_batches_exact_multiple(): + """Test batches function when items are exact multiple of batch size""" + items = list(range(100)) + result = list(batches(items, 50)) + assert len(result) == 2 + assert result[0] == list(range(0, 50)) + assert result[1] == list(range(50, 100)) + + +def test_related_files_no_run_number(): + """Test related_files when datafile has no run_number""" + mock_datafile = Mock() + mock_datafile.get.return_value = None + + result = related_files(mock_datafile) + assert result == [] + + +def test_related_files_with_run_number(): + """Test related_files finds matching files""" + mock_datafile = Mock() + mock_datafile.location = "/SNS/CORELLI/IPTS-15526/nexus/CORELLI_29666.nxs.h5" + mock_datafile.facility = "SNS" + mock_datafile.instrument = "CORELLI" + mock_datafile.experiment = "IPTS-15526" + mock_datafile.get.return_value = "29666" + + with patch("glob.glob") as mock_glob: + mock_glob.return_value = [ + "/SNS/CORELLI/IPTS-15526/images/det_main/CORELLI_29666_det_main_000001.tiff", + "/SNS/CORELLI/IPTS-15526/nexus/CORELLI_29666.nxs.h5", # This should be excluded + ] + + result = related_files(mock_datafile) + + assert len(result) == 1 + assert "/SNS/CORELLI/IPTS-15526/images/det_main/CORELLI_29666_det_main_000001.tiff" in result + assert mock_datafile.location not in result + + +def test_image_files_no_metadata(): + """Test image_files when metadata path doesn't exist""" + mock_datafile = Mock() + mock_datafile.facility = "SNS" + mock_datafile.instrument = "VENUS" + mock_datafile.experiment = "IPTS-99999" + mock_datafile.get.return_value = None # No metadata found + + result = image_files(mock_datafile) + assert result == [] + + +def test_image_files_metadata_not_a_directory(): + """Test image_files when metadata points to non-existent directory""" + mock_datafile = Mock() + mock_datafile.facility = "SNS" + mock_datafile.instrument = "VENUS" + mock_datafile.experiment = "IPTS-99999" + mock_datafile.get.return_value = "images" + + with patch("os.path.isdir") as mock_isdir: + mock_isdir.return_value = False + + result = image_files(mock_datafile) + assert result == [] + + +def test_image_files_single_directory(): + """Test image_files with single directory containing FITS and TIFF files""" + mock_datafile = Mock() + mock_datafile.facility = "SNS" + mock_datafile.instrument = "VENUS" + mock_datafile.experiment = "IPTS-99999" + mock_datafile.get.return_value = "images" + + with patch("os.path.isdir") as mock_isdir, patch("glob.glob") as mock_glob: + mock_isdir.return_value = True + + def glob_side_effect(pattern): + if pattern.endswith("*.fits"): + return [ + "/SNS/VENUS/IPTS-99999/images/image_001.fits", + "/SNS/VENUS/IPTS-99999/images/image_002.fits", + ] + elif pattern.endswith("*.tiff"): + return ["/SNS/VENUS/IPTS-99999/images/image_003.tiff"] + return [] + + mock_glob.side_effect = glob_side_effect + + result = image_files(mock_datafile) + + assert len(result) == 3 + assert "/SNS/VENUS/IPTS-99999/images/image_001.fits" in result + assert "/SNS/VENUS/IPTS-99999/images/image_002.fits" in result + assert "/SNS/VENUS/IPTS-99999/images/image_003.tiff" in result + + +def test_image_files_multiple_directories(): + """Test image_files with multiple directories (list of subdirectories)""" + mock_datafile = Mock() + mock_datafile.facility = "SNS" + mock_datafile.instrument = "VENUS" + mock_datafile.experiment = "IPTS-99999" + mock_datafile.get.return_value = ["images/batch1", "images/batch2"] + + with patch("os.path.isdir") as mock_isdir, patch("glob.glob") as mock_glob: + mock_isdir.return_value = True + + def glob_side_effect(pattern): + if "batch1" in pattern and pattern.endswith("*.fits"): + return ["/SNS/VENUS/IPTS-99999/images/batch1/image_001.fits"] + elif "batch2" in pattern and pattern.endswith("*.tiff"): + return ["/SNS/VENUS/IPTS-99999/images/batch2/image_002.tiff"] + return [] + + mock_glob.side_effect = glob_side_effect + + result = image_files(mock_datafile) + + assert len(result) == 2 + assert "/SNS/VENUS/IPTS-99999/images/batch1/image_001.fits" in result + assert "/SNS/VENUS/IPTS-99999/images/batch2/image_002.tiff" in result + + +def test_oncat_processor_ingest_with_images(): + """Test ONCatProcessor.ingest method catalogs images using batch API""" + test_message = { + "run_number": "12345", + "instrument": "VENUS", + "ipts": "IPTS-99999", + "facility": "SNS", + "data_file": "/SNS/VENUS/IPTS-99999/nexus/VENUS_12345.nxs.h5", + } + + mock_conf = Mock() + mock_conf.oncat_url = "http://oncat:8000" + mock_conf.oncat_api_token = "test-token" + + mock_send_function = Mock() + + with patch("postprocessing.processors.base_processor.open", create=True), patch( + "postprocessing.processors.oncat_processor.pyoncat.ONCat" + ) as mock_oncat_class, patch("postprocessing.processors.oncat_processor.related_files") as mock_related, patch( + "postprocessing.processors.oncat_processor.image_files" + ) as mock_images: + # Setup mocks + mock_oncat = Mock() + mock_oncat_class.return_value = mock_oncat + + mock_datafile = Mock() + mock_oncat.Datafile.ingest.return_value = mock_datafile + + mock_related.return_value = [] + mock_images.return_value = [ + "/SNS/VENUS/IPTS-99999/images/image_001.fits", + "/SNS/VENUS/IPTS-99999/images/image_002.fits", + "/SNS/VENUS/IPTS-99999/images/image_003.tiff", + ] + + # Create processor and call ingest + processor = ONCatProcessor(test_message, mock_conf, mock_send_function) + processor.ingest(test_message["data_file"]) + + # Verify ONCat was initialized correctly + mock_oncat_class.assert_called_once_with( + "http://oncat:8000", + api_token="test-token", + ) + + # Verify the main file was ingested + mock_oncat.Datafile.ingest.assert_called_once() + + # Verify batch was called with the image files + mock_oncat.Datafile.batch.assert_called_once_with( + [ + "/SNS/VENUS/IPTS-99999/images/image_001.fits", + "/SNS/VENUS/IPTS-99999/images/image_002.fits", + "/SNS/VENUS/IPTS-99999/images/image_003.tiff", + ] + ) + + +def test_oncat_processor_ingest_with_many_images(): + """Test ONCatProcessor.ingest batches large number of images correctly""" + test_message = { + "run_number": "12345", + "instrument": "VENUS", + "ipts": "IPTS-99999", + "facility": "SNS", + "data_file": "/SNS/VENUS/IPTS-99999/nexus/VENUS_12345.nxs.h5", + } + + mock_conf = Mock() + mock_conf.oncat_url = "http://oncat:8000" + mock_conf.oncat_api_token = "test-token" + + mock_send_function = Mock() + + # Create 125 image files (should be split into 3 batches: 50, 50, 25) + many_images = [f"/SNS/VENUS/IPTS-99999/images/image_{i:04d}.fits" for i in range(125)] + + with patch("postprocessing.processors.base_processor.open", create=True), patch( + "postprocessing.processors.oncat_processor.pyoncat.ONCat" + ) as mock_oncat_class, patch("postprocessing.processors.oncat_processor.related_files") as mock_related, patch( + "postprocessing.processors.oncat_processor.image_files" + ) as mock_images: + # Setup mocks + mock_oncat = Mock() + mock_oncat_class.return_value = mock_oncat + + mock_datafile = Mock() + mock_oncat.Datafile.ingest.return_value = mock_datafile + + mock_related.return_value = [] + mock_images.return_value = many_images + + # Create processor and call ingest + processor = ONCatProcessor(test_message, mock_conf, mock_send_function) + processor.ingest(test_message["data_file"]) + + # Verify batch was called 3 times + assert mock_oncat.Datafile.batch.call_count == 3 + + # Verify batch sizes + calls = mock_oncat.Datafile.batch.call_args_list + assert len(calls[0][0][0]) == 50 + assert len(calls[1][0][0]) == 50 + assert len(calls[2][0][0]) == 25 From 6b9be59a95a21ab3157754faf0c966a415433433 Mon Sep 17 00:00:00 2001 From: Darsh Date: Thu, 15 Jan 2026 10:37:28 -0500 Subject: [PATCH 2/4] Fix integration test failures --- tests/integration/test_cataloging.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_cataloging.py b/tests/integration/test_cataloging.py index 1d22bea..876c797 100644 --- a/tests/integration/test_cataloging.py +++ b/tests/integration/test_cataloging.py @@ -127,13 +127,23 @@ def test_oncat_catalog_venus_images(): # send data ready conn.send("/queue/CATALOG.ONCAT.DATA_READY", json.dumps(message).encode()) - listener.wait_for_message() + # Wait for the correct message, skipping any stale messages from previous tests + max_attempts = 10 + for attempt in range(max_attempts): + listener.wait_for_message(timeout=5) + header, body = listener.get_latest_message() + msg = json.loads(body) - conn.disconnect() + # Check if this is our VENUS message + if msg.get("instrument") == "VENUS" and msg.get("run_number") == "12345": + break - header, body = listener.get_latest_message() + # If not our message, keep waiting for the next one + if attempt == max_attempts - 1: + pytest.fail(f"Did not receive VENUS message after {max_attempts} attempts. Last message: {msg}") + + conn.disconnect() - msg = json.loads(body) assert msg["run_number"] == message["run_number"] assert msg["instrument"] == message["instrument"] assert msg["ipts"] == message["ipts"] From c1e0d50b1be523883915efce21326867a5eb11de Mon Sep 17 00:00:00 2001 From: Darsh Date: Fri, 16 Jan 2026 09:44:14 -0500 Subject: [PATCH 3/4] Another fix for integration tests --- tests/integration/test_cataloging.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_cataloging.py b/tests/integration/test_cataloging.py index 876c797..a196538 100644 --- a/tests/integration/test_cataloging.py +++ b/tests/integration/test_cataloging.py @@ -130,7 +130,8 @@ def test_oncat_catalog_venus_images(): # Wait for the correct message, skipping any stale messages from previous tests max_attempts = 10 for attempt in range(max_attempts): - listener.wait_for_message(timeout=5) + time.sleep(0.5) # Wait briefly for message + listener.wait_for_message() header, body = listener.get_latest_message() msg = json.loads(body) From 6062c283de800605d2a15d7fed7da969ed6fad90 Mon Sep 17 00:00:00 2001 From: Darsh Date: Tue, 20 Jan 2026 12:58:00 -0500 Subject: [PATCH 4/4] Make image filepath metadata PV configurable --- README.md | 12 +++++++ SPECS/postprocessing.spec | 2 +- postprocessing/Configuration.py | 5 +++ postprocessing/processors/oncat_processor.py | 14 +++----- tests/integration/Dockerfile.oncat | 10 +++++- tests/integration/test_cataloging.py | 33 ++++++++++--------- .../processors/test_oncat_processor.py | 14 +++++--- .../unit/postprocessing/test_Configuration.py | 33 +++++++++++++++++++ 8 files changed, 92 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index aaff166..6272f97 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,18 @@ usage limit. The post processing agent handles cataloging raw and reduced data files in ONCat https://oncat.ornl.gov/ by calling scripts hosted on the analysis cluster. +##### Image File Cataloging + +For instruments that produce image files (e.g., FITS or TIFF format), the agent can automatically discover +and catalog these files along with the main data file. The metadata path where image file locations are +stored can be configured: + + "image_filepath_metadata_paths": ["metadata.entry.daslogs.bl10:exp:im:imagefilepath.value"] + +If not specified, this defaults to the VENUS instrument metadata path. For other instruments, configure +this parameter to match the appropriate metadata path(s) in your NeXus files. Multiple paths can be specified +as an array. + Installation ------------ diff --git a/SPECS/postprocessing.spec b/SPECS/postprocessing.spec index 87632dd..882813c 100644 --- a/SPECS/postprocessing.spec +++ b/SPECS/postprocessing.spec @@ -3,7 +3,7 @@ %define release 1 Name: %{srcname} -Version: 4.3.0 +Version: 4.4.0 Release: %{release}%{?dist} Summary: %{summary} diff --git a/postprocessing/Configuration.py b/postprocessing/Configuration.py index 86cdf75..2cdd385 100644 --- a/postprocessing/Configuration.py +++ b/postprocessing/Configuration.py @@ -91,6 +91,11 @@ def __init__(self, config_file): self.oncat_url = config.get("oncat_url", "") self.oncat_api_token = config.get("oncat_api_token", "") + # Image filepath metadata paths for cataloging image files + # Default is for VENUS instrument, but can be configured per instrument + default_image_metadata_paths = ["metadata.entry.daslogs.bl10:exp:im:imagefilepath.value"] + self.image_filepath_metadata_paths = config.get("image_filepath_metadata_paths", default_image_metadata_paths) + sys.path.insert(0, self.sw_dir) # Configure processor plugins default_processors = [ diff --git a/postprocessing/processors/oncat_processor.py b/postprocessing/processors/oncat_processor.py index fe2c626..6b12fce 100644 --- a/postprocessing/processors/oncat_processor.py +++ b/postprocessing/processors/oncat_processor.py @@ -11,11 +11,6 @@ import pyoncat -# Metadata paths where image file paths may be stored -IMAGE_FILEPATH_METADATA_PATHS = [ - "metadata.entry.daslogs.bl10:exp:im:imagefilepath.value", -] - # Batch size for image ingestion (must be less than max of 100) IMAGE_BATCH_SIZE = 50 @@ -70,7 +65,7 @@ def ingest(self, location): oncat.Datafile.ingest(related_file) # Catalog image files using batch API for efficiency - images = image_files(datafile) + images = image_files(datafile, self.configuration.image_filepath_metadata_paths) for batch in batches(images, IMAGE_BATCH_SIZE): logging.info("Batch ingesting %d image files", len(batch)) oncat.Datafile.batch(batch) @@ -120,15 +115,16 @@ def related_files(datafile): ] -def image_files(datafile): +def image_files(datafile, metadata_paths): """Find image files from metadata paths. - Iterates through the known metadata paths, retrieves values from + Iterates through the configured metadata paths, retrieves values from the datafile metadata, and globs for image files in the discovered subdirectories. Args: datafile: ONCat datafile object with metadata + metadata_paths: List of metadata paths to check for image directory locations Returns: List of absolute paths to image files (FITS and TIFF) @@ -138,7 +134,7 @@ def image_files(datafile): experiment = datafile.experiment image_file_paths = [] - for metadata_path in IMAGE_FILEPATH_METADATA_PATHS: + for metadata_path in metadata_paths: value = datafile.get(metadata_path) if value is None: continue diff --git a/tests/integration/Dockerfile.oncat b/tests/integration/Dockerfile.oncat index 47f3967..6fdb07a 100644 --- a/tests/integration/Dockerfile.oncat +++ b/tests/integration/Dockerfile.oncat @@ -5,5 +5,13 @@ RUN mkdir -p /SNS/CORELLI/IPTS-15526/nexus && \ mkdir -p /SNS/CORELLI/IPTS-15526/images/det_main && \ touch /SNS/CORELLI/IPTS-15526/images/det_main/CORELLI_29666_det_main_000001.tiff && \ mkdir -p /SNS/CORELLI/IPTS-15526/shared/autoreduce && \ - echo '{"output_files":[], "input_files": []}' > /SNS/CORELLI/IPTS-15526/shared/autoreduce/CORELLI_29666.json + echo '{"output_files":[], "input_files": []}' > /SNS/CORELLI/IPTS-15526/shared/autoreduce/CORELLI_29666.json && \ + \ + mkdir -p /SNS/VENUS/IPTS-99999/nexus && \ + mkdir -p /SNS/VENUS/IPTS-99999/images && \ + mkdir -p /SNS/VENUS/IPTS-99999/shared/autoreduce && \ + touch /SNS/VENUS/IPTS-99999/nexus/VENUS_12345.nxs.h5 && \ + touch /SNS/VENUS/IPTS-99999/images/image_001.fits && \ + touch /SNS/VENUS/IPTS-99999/images/image_002.fits && \ + touch /SNS/VENUS/IPTS-99999/images/image_003.tiff CMD ["python", "oncat_server.py"] diff --git a/tests/integration/test_cataloging.py b/tests/integration/test_cataloging.py index a196538..7f426a2 100644 --- a/tests/integration/test_cataloging.py +++ b/tests/integration/test_cataloging.py @@ -31,13 +31,19 @@ def test_oncat_catalog(): # send data ready conn.send("/queue/CATALOG.ONCAT.DATA_READY", json.dumps(message).encode()) - listener.wait_for_message() + # Wait for messages until we get the one for this run + max_attempts = 10 + for _ in range(max_attempts): + listener.wait_for_message() + header, body = listener.get_latest_message() + msg = json.loads(body) + if msg["run_number"] == message["run_number"]: + break + else: + pytest.fail(f"Did not receive COMPLETE message for CORELLI run {message['run_number']}") conn.disconnect() - header, body = listener.get_latest_message() - - msg = json.loads(body) assert msg["run_number"] == message["run_number"] assert msg["instrument"] == message["instrument"] assert msg["ipts"] == message["ipts"] @@ -113,7 +119,7 @@ def test_oncat_catalog_venus_images(): conn = stomp.Connection(host_and_ports=[("localhost", 61613)]) - listener = stomp.listener.TestListener() + listener = stomp.listener.TestListener(10) # 10 second timeout conn.set_listener("", listener) try: @@ -122,26 +128,21 @@ def test_oncat_catalog_venus_images(): pytest.skip("Requires activemq running") # expect a message on CATALOG.ONCAT.COMPLETE - conn.subscribe("/queue/CATALOG.ONCAT.COMPLETE", id="123", ack="auto") + conn.subscribe("/queue/CATALOG.ONCAT.COMPLETE", id="venus123", ack="auto") # send data ready conn.send("/queue/CATALOG.ONCAT.DATA_READY", json.dumps(message).encode()) - # Wait for the correct message, skipping any stale messages from previous tests + # Wait for messages until we get the one for this run max_attempts = 10 - for attempt in range(max_attempts): - time.sleep(0.5) # Wait briefly for message + for _ in range(max_attempts): listener.wait_for_message() header, body = listener.get_latest_message() msg = json.loads(body) - - # Check if this is our VENUS message - if msg.get("instrument") == "VENUS" and msg.get("run_number") == "12345": + if msg["run_number"] == message["run_number"]: break - - # If not our message, keep waiting for the next one - if attempt == max_attempts - 1: - pytest.fail(f"Did not receive VENUS message after {max_attempts} attempts. Last message: {msg}") + else: + pytest.fail(f"Did not receive COMPLETE message for VENUS run {message['run_number']}") conn.disconnect() diff --git a/tests/unit/postprocessing/processors/test_oncat_processor.py b/tests/unit/postprocessing/processors/test_oncat_processor.py index e9c84f5..b670137 100644 --- a/tests/unit/postprocessing/processors/test_oncat_processor.py +++ b/tests/unit/postprocessing/processors/test_oncat_processor.py @@ -80,7 +80,8 @@ def test_image_files_no_metadata(): mock_datafile.experiment = "IPTS-99999" mock_datafile.get.return_value = None # No metadata found - result = image_files(mock_datafile) + metadata_paths = ["metadata.entry.daslogs.bl10:exp:im:imagefilepath.value"] + result = image_files(mock_datafile, metadata_paths) assert result == [] @@ -95,7 +96,8 @@ def test_image_files_metadata_not_a_directory(): with patch("os.path.isdir") as mock_isdir: mock_isdir.return_value = False - result = image_files(mock_datafile) + metadata_paths = ["metadata.entry.daslogs.bl10:exp:im:imagefilepath.value"] + result = image_files(mock_datafile, metadata_paths) assert result == [] @@ -122,7 +124,8 @@ def glob_side_effect(pattern): mock_glob.side_effect = glob_side_effect - result = image_files(mock_datafile) + metadata_paths = ["metadata.entry.daslogs.bl10:exp:im:imagefilepath.value"] + result = image_files(mock_datafile, metadata_paths) assert len(result) == 3 assert "/SNS/VENUS/IPTS-99999/images/image_001.fits" in result @@ -150,7 +153,8 @@ def glob_side_effect(pattern): mock_glob.side_effect = glob_side_effect - result = image_files(mock_datafile) + metadata_paths = ["metadata.entry.daslogs.bl10:exp:im:imagefilepath.value"] + result = image_files(mock_datafile, metadata_paths) assert len(result) == 2 assert "/SNS/VENUS/IPTS-99999/images/batch1/image_001.fits" in result @@ -170,6 +174,7 @@ def test_oncat_processor_ingest_with_images(): mock_conf = Mock() mock_conf.oncat_url = "http://oncat:8000" mock_conf.oncat_api_token = "test-token" + mock_conf.image_filepath_metadata_paths = ["metadata.entry.daslogs.bl10:exp:im:imagefilepath.value"] mock_send_function = Mock() @@ -228,6 +233,7 @@ def test_oncat_processor_ingest_with_many_images(): mock_conf = Mock() mock_conf.oncat_url = "http://oncat:8000" mock_conf.oncat_api_token = "test-token" + mock_conf.image_filepath_metadata_paths = ["metadata.entry.daslogs.bl10:exp:im:imagefilepath.value"] mock_send_function = Mock() diff --git a/tests/unit/postprocessing/test_Configuration.py b/tests/unit/postprocessing/test_Configuration.py index ab251f1..ca790a6 100644 --- a/tests/unit/postprocessing/test_Configuration.py +++ b/tests/unit/postprocessing/test_Configuration.py @@ -36,6 +36,39 @@ def test_log_configuration(self, data_server, test_logger): log_contents = open(test_logger.log_file).read() assert "LOCAL execution" in log_contents + def test_image_filepath_metadata_paths_default(self, data_server): + """Test that image_filepath_metadata_paths has correct default value""" + conf = Configuration(data_server.path_to("post_processing.conf")) + # Should use default VENUS metadata path + assert conf.image_filepath_metadata_paths == ["metadata.entry.daslogs.bl10:exp:im:imagefilepath.value"] + + def test_image_filepath_metadata_paths_custom(self, tmp_path): + """Test that image_filepath_metadata_paths can be configured""" + custom_paths = [ + "metadata.entry.daslogs.custom:path:1.value", + "metadata.entry.daslogs.custom:path:2.value", + ] + config_data = { + "failover_uri": "failover:(tcp://localhost:61613)", + "brokers": [["localhost", 61613]], + "amq_user": "test", + "amq_pwd": "test", + "sw_dir": "/tmp", + "log_file": "/tmp/test.log", + "postprocess_error": "ERROR", + "reduction_started": "STARTED", + "reduction_complete": "COMPLETE", + "reduction_error": "ERROR", + "reduction_disabled": "DISABLED", + "heart_beat": "/topic/HEARTBEAT", + "image_filepath_metadata_paths": custom_paths, + } + tmp_conf_file = tmp_path / "test_config.conf" + tmp_conf_file.write_text(json.dumps(config_data)) + + conf = Configuration(tmp_conf_file.as_posix()) + assert conf.image_filepath_metadata_paths == custom_paths + def test_read_configuration(data_server, caplog): caplog.set_level(logging.INFO)