Parallelized OMERO Upload via ExperimentController Queue #149

@beniroquai

Description

We want to add support for streaming image tiles to OMERO in parallel with acquisition inside the ExperimentController. The idea is to run imaging in headless mode while robustly pushing acquired tiles to the OMERO server, so that data is immediately available in OMERO without filling up the Raspberry Pi's storage.

Proposed Approach

  • Add an uploader thread inside ExperimentController (or as a managed service) that consumes tiles from a bounded queue.

  • Acquisition code enqueues tiles (with metadata: (ix, iy, z, c, t, ndarray)), and the uploader handles:

    • connecting to OMERO using login credentials provided by ExperimentController/ExperimentManager
    • creating/using the correct Experiment ID to associate with the OMERO image (mapping 1:1 with experiment identifiers used locally)
    • writing to OMERO via RawPixelsStore.setTile if pyramid backend is available, or via row-stripe/plane writes if only ROMIO backend is available.
  • The queue should be bounded with optional disk spillover (spooling to tmpdir) to avoid blocking acquisition if OMERO/network is slow.
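
The queue + uploader-thread shape described above could look roughly like this. This is a minimal sketch: `TileUploader`, `Tile`, and the write callback are illustrative names, not existing ImSwitch or OMERO APIs, and the disk-spillover path is only marked, not implemented.

```python
import queue
import threading
from dataclasses import dataclass

import numpy as np

@dataclass
class Tile:
    ix: int
    iy: int
    z: int
    c: int
    t: int
    data: np.ndarray

class TileUploader:
    """Consumes tiles from a bounded queue on a background thread."""

    def __init__(self, write_tile, maxsize=64):
        self._q = queue.Queue(maxsize=maxsize)   # bounded: backpressure on acquisition
        self._write_tile = write_tile            # callable(Tile) -> None, e.g. an OMERO setTile wrapper
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def enqueue(self, tile: Tile, timeout=None):
        # Blocks when the queue is full; the optional disk-spillover
        # (spooling to tmpdir) would replace this blocking put.
        self._q.put(tile, timeout=timeout)

    def close(self):
        self._q.put(None)  # sentinel: uploader stops after draining the queue
        self._thread.join()

    def _run(self):
        while True:
            item = self._q.get()
            if item is None:
                break
            self._write_tile(item)
```

Acquisition only ever touches `enqueue`, so slow uploads stall the producer at the queue bound rather than dropping frames.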

User Options

  • Users should be able to select storage backends when starting an experiment:

    • Local ome.tif
    • Local zarr
    • Raw tif
    • Direct OMERO upload
  • These choices should be combinable rather than mutually exclusive: e.g. the same experiment can be written to OME-TIFF and OMERO simultaneously.

Robustness

  • On failed OMERO connections, fall back to local spool (OME-TIFF/Zarr).
  • On reconnect, uploader can retry pending tiles.
  • Ensure images are linked to datasets in OMERO immediately (to avoid orphaned images).
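
The retry-then-spool behaviour might be sketched as follows. `omero_write` and `spool_write` are hypothetical callables supplied by the caller (e.g. a `setTile` wrapper and a local OME-TIFF/Zarr writer); this is not existing code.

```python
import time

def upload_with_fallback(tile, omero_write, spool_write, retries=3, backoff_s=1.0):
    """Try OMERO first; after repeated failures, spool the tile locally.

    Returns "omero" or "spooled" so the caller can track pending tiles
    and retry the spooled ones once the connection comes back.
    """
    for attempt in range(retries):
        try:
            omero_write(tile)
            return "omero"
        except ConnectionError:
            time.sleep(backoff_s * (2 ** attempt))  # exponential backoff
    spool_write(tile)  # local OME-TIFF/Zarr spool; re-uploaded on reconnect
    return "spooled"
```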

Deliverables

  • ExperimentController: queue + uploader thread integration.
  • ExperimentManager: passes OMERO credentials and chosen storage options.
  • Abstraction for StorageBackends (OME-TIFF, Zarr, TIFF, OMERO).
  • Tests: simulate high acquisition rate, confirm no dropped frames, confirm correct OMERO linking.
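
The StorageBackends abstraction could start from a minimal sketch like this (class names are placeholders); a fan-out wrapper then covers the "write to OME-TIFF and OMERO simultaneously" case.

```python
from abc import ABC, abstractmethod

import numpy as np

class StorageBackend(ABC):
    """Common interface for OME-TIFF, Zarr, raw TIFF and OMERO sinks (illustrative)."""

    @abstractmethod
    def write_tile(self, ix: int, iy: int, z: int, c: int, t: int,
                   data: np.ndarray) -> None: ...

    @abstractmethod
    def finalize(self) -> None: ...

class FanOutBackend(StorageBackend):
    """Forwards each tile to several backends so they record the same experiment."""

    def __init__(self, backends):
        self._backends = list(backends)

    def write_tile(self, ix, iy, z, c, t, data):
        for b in self._backends:
            b.write_tile(ix, iy, z, c, t, data)

    def finalize(self):
        for b in self._backends:
            b.finalize()
```

With this shape, the uploader thread only sees one `StorageBackend`, and the user's backend selection just determines what goes into the fan-out list.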

The current code implementation that works for uploading the tiles is this 👇

# Streams a tiled mosaic to OMERO.
# - Links image to dataset (no orphaning)
# - Sets pixel size BEFORE writing (no optimistic lock)
# - AUTO: tile writes for pyramid backend; row-stripe writes for ROMIO (small images)

import time
import numpy as np
from typing import Iterable, Tuple

import omero
from omero.gateway import BlitzGateway
from omero.rtypes import rstring
from omero.model import (
    DatasetI, ImageI, DatasetImageLinkI, LengthI,
    MapAnnotationI, NamedValue, ImageAnnotationLinkI, CommentAnnotationI,
)
from omero.model.enums import UnitsLength

# ---------- CONFIG ----------
HOST = "XXX.XXX.XXX.XXX"; PORT = 4064
USERNAME = "root"; PASSWORD = "omero"

# Mosaic/grid
nx, ny = 4, 4                   # tiles in X (cols) and Y (rows)
tile_w, tile_h = 512, 512
sizeZ, sizeC, sizeT = 1, 2, 1
pixel_type_str = "uint16"
pixel_size_xy_um = 0.65

dataset_name = "StageScan-" + str(time.time())
image_name = f"mosaic_{nx*tile_w}x{ny*tile_h}_{sizeC}c"
image_desc = "Auto-tiled upload (tile/plane fallback)"

# Example tile source: replace with your acquisition stream
# yields (ix, iy, chans) where chans is tuple/list of length sizeC; each is (tile_h x tile_w) ndarray
def tile_stream() -> Iterable[Tuple[int, int, Tuple[np.ndarray, ...]]]:
    rng = np.random.default_rng(0)
    for iy in range(ny):
        for ix in range(nx):
            # fake two-channel tiles
            print(f"Generating tile at ({ix}, {iy})")
            t0 = (rng.integers(0, 1000, size=(tile_h, tile_w))).astype(np.uint16)
            t1 = (rng.integers(0, 1000, size=(tile_h, tile_w))).astype(np.uint16)
            yield ix, iy, (t0, t1)

# ---------- CONNECT ----------
client = omero.client(HOST, PORT)
session = client.createSession(USERNAME, PASSWORD)
conn = BlitzGateway(client_obj=client)

try:
    # ---------- DATASET ----------
    ds = DatasetI()
    ds.setName(rstring(dataset_name))
    ds.setDescription(rstring("Mosaic built from streamed tiles"))
    ds = conn.getUpdateService().saveAndReturnObject(ds, conn.SERVICE_OPTS)
    ds_id = ds.getId().getValue()

    # ---------- IMAGE SHELL ----------
    sizeX = nx * tile_w
    sizeY = ny * tile_h

    pixsvc = conn.getPixelsService()
    q = conn.getQueryService()
    pixelsType = q.findByString("PixelsType", "value", pixel_type_str)

    image_id = pixsvc.createImage(
        sizeX, sizeY, sizeZ, sizeT, list(range(sizeC)), pixelsType,
        image_name, image_desc
    ).getValue()

    # Link image to dataset using id-only proxies (prevents orphaning)
    link = DatasetImageLinkI()
    link.setParent(DatasetI(ds_id, False))
    link.setChild(ImageI(image_id, False))
    conn.getUpdateService().saveAndReturnObject(link, conn.SERVICE_OPTS)

    # Set physical pixel size BEFORE writing
    img = conn.getObject("Image", image_id)
    pixels = q.get("Pixels", img.getPixelsId())
    pixels.setPhysicalSizeX(LengthI(pixel_size_xy_um, UnitsLength.MICROMETER))
    pixels.setPhysicalSizeY(LengthI(pixel_size_xy_um, UnitsLength.MICROMETER))
    conn.getUpdateService().saveAndReturnObject(pixels, conn.SERVICE_OPTS)

    # ---------- OPEN STORE & DETECT BACKEND ----------
    store = conn.createRawPixelsStore()
    try:
        store.setPixelsId(pixels.getId().getValue(), True)
        srv_tw, srv_th = store.getTileSize()
        srv_tw, srv_th = int(srv_tw), int(srv_th)
        # Heuristic:
        # - Pyramid/tiling backend → srv_tw < sizeX (e.g. 256)
        # - ROMIO backend (small images) → srv_tw >= sizeX (requires full-row writes)
        use_row_only = (srv_tw <= 0 or srv_tw >= sizeX)

        if not use_row_only:
            # ---------- TRUE TILE WRITES ----------
            def write_one_tile(ix, iy, ch, arr2d: np.ndarray):
                assert arr2d.shape == (tile_h, tile_w)
                x0 = ix * tile_w
                y0 = iy * tile_h
                # sub-tile to server tile size
                for off_y in range(0, tile_h, srv_th):
                    for off_x in range(0, tile_w, srv_tw):
                        sub = arr2d[off_y:off_y+srv_th, off_x:off_x+srv_tw]
                        h, w = sub.shape[0], sub.shape[1]
                        buf = np.ascontiguousarray(sub).tobytes()
                        store.setTile(buf, 0, ch, 0, x0 + off_x, y0 + off_y, w, h)

            for ix, iy, chans in tile_stream():
                if len(chans) != sizeC:
                    raise RuntimeError(f"Expected {sizeC} channels, got {len(chans)} at tile ({ix},{iy})")
                for ch in range(sizeC):
                    write_one_tile(ix, iy, ch, chans[ch])

            store.save()

        else:
            def iter_rows(tile_iter):
                """Group tiles by row: yields (iy, [ (ix, chans), ... ]) in x-order."""
                rows = [[] for _ in range(ny)]
                for ix, iy, chans in tile_iter:
                    rows[iy].append((ix, chans))
                for iy in range(ny):
                    rows[iy].sort(key=lambda t: t[0])
                    yield iy, rows[iy]

            # ROMIO: write stripes per row and channel
            for iy, tiles_in_row in iter_rows(tile_stream()):
                y0 = iy * tile_h
                for ch in range(sizeC):
                    # one stripe buffer: tile_h rows × full width
                    stripe = np.zeros((tile_h, sizeX), dtype=np.uint16)  # dtype must match pixelsType
                    for ix, chans in tiles_in_row:
                        x0 = ix * tile_w
                        stripe[:, x0:x0 + tile_w] = chans[ch]
                    # full-row write: x=0, w=sizeX; height = tile_h
                    buf = np.ascontiguousarray(stripe).tobytes()
                    store.setTile(buf, 0, ch, 0, 0, y0, sizeX, tile_h)

            store.save()

    finally:
        try:
            store.close()
        except Exception:
            pass

    # ---------- OPTIONAL ANNOTATIONS ----------
    meta = MapAnnotationI()
    meta.setNs(rstring("imswitch.stage.scan"))
    meta.setMapValue([
        NamedValue("scan_type", "stage_scan_mosaic"),
        NamedValue("grid_cols", str(nx)),
        NamedValue("grid_rows", str(ny)),
        NamedValue("tile_w_px", str(tile_w)),
        NamedValue("tile_h_px", str(tile_h)),
        NamedValue("pixel_size_um", str(pixel_size_xy_um)),
        NamedValue("sizeX_px", str(sizeX)),
        NamedValue("sizeY_px", str(sizeY)),
        NamedValue("sizeC", str(sizeC)),
        NamedValue("sizeZ", str(sizeZ)),
        NamedValue("sizeT", str(sizeT)),
    ])
    meta = conn.getUpdateService().saveAndReturnObject(meta, conn.SERVICE_OPTS)
    img = conn.getObject("Image", image_id)
    alink = ImageAnnotationLinkI()
    alink.setParent(img._obj)
    alink.setChild(meta)
    conn.getUpdateService().saveObject(alink, conn.SERVICE_OPTS)

    cmt = CommentAnnotationI()
    cmt.setTextValue(rstring("Auto backend: tile writes for pyramid; row-stripe writes for ROMIO."))
    cmt = conn.getUpdateService().saveAndReturnObject(cmt, conn.SERVICE_OPTS)
    clink = ImageAnnotationLinkI()
    clink.setParent(img._obj)
    clink.setChild(cmt)
    conn.getUpdateService().saveObject(clink, conn.SERVICE_OPTS)

    img.resetRDefs()
    print(f"OK: dataset {ds_id} image {image_id}  ({sizeX}x{sizeY}px, {sizeC}C)")

finally:
    try:
        conn.close()
    except Exception:
        pass
