diff --git a/imswitch/imcontrol/controller/controllers/VirtualMicroscopeController.py b/imswitch/imcontrol/controller/controllers/VirtualMicroscopeController.py
new file mode 100644
index 000000000..30cc0434d
--- /dev/null
+++ b/imswitch/imcontrol/controller/controllers/VirtualMicroscopeController.py
@@ -0,0 +1,491 @@
+from imswitch.imcommon.model import APIExport, initLogger
+from imswitch.imcontrol.controller.basecontrollers import LiveUpdatedController
+from imswitch import IS_HEADLESS
+import numpy as np
+from typing import Dict, Optional, Tuple, List
+from pydantic import BaseModel
+
+
+class VirtualMicroscopeConfig(BaseModel):
+ """Configuration for virtual microscope simulation parameters"""
+ # Stage drift simulation
+ drift_enabled: bool = False
+ drift_rate_x: float = 0.1 # pixels per second
+ drift_rate_y: float = 0.1
+ drift_rate_z: float = 0.05
+
+ # Objective parameters
+ objectives: Dict[str, Dict] = {
+ "20x_0.75": {
+ "magnification": 20,
+ "NA": 0.75,
+ "pixel_scale": 0.325, # um per pixel
+ "type": "air"
+ },
+ "60x_1.42": {
+ "magnification": 60,
+ "NA": 1.42,
+ "pixel_scale": 0.108, # um per pixel
+ "type": "oil"
+ }
+ }
+ current_objective: str = "20x_0.75"
+
+ # Exposure and gain simulation
+ exposure_time: float = 100.0 # ms
+ gain: float = 1.0
+
+ # Photobleaching simulation
+ bleaching_enabled: bool = False
+ bleaching_rate: float = 0.01 # fraction per exposure
+
+ # Multi-channel simulation
+ channels: Dict[str, Dict] = {
+ "488": {"wavelength": 488, "intensity": 1.0, "color": "cyan"},
+ "561": {"wavelength": 561, "intensity": 1.0, "color": "green"},
+ "640": {"wavelength": 640, "intensity": 1.0, "color": "red"}
+ }
+ active_channels: List[str] = ["488"]
+
+ # Noise parameters
+ readout_noise: float = 50.0
+ shot_noise_enabled: bool = True
+ dark_current: float = 0.1
+
+ # Sampling and aliasing
+ nyquist_sampling: bool = True
+ aliasing_enabled: bool = False
+
+
+class VirtualMicroscopeController(LiveUpdatedController):
+ """Controller for enhanced Virtual Microscope simulation with API endpoints"""
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._logger = initLogger(self, tryInheritParent=False)
+
+ # Initialize simulation configuration
+ self._config = VirtualMicroscopeConfig()
+
+ # Get virtual microscope manager from RS232 devices
+ self._virtualMicroscopeManager = None
+ if hasattr(self._master, 'rs232sManager'):
+ rs232_managers = self._master.rs232sManager
+ for name, manager in rs232_managers:
+ if 'VirtualMicroscope' in name:
+ self._virtualMicroscopeManager = manager
+ break
+
+ if self._virtualMicroscopeManager is None:
+ self._logger.warning("Virtual Microscope Manager not found")
+ return
+
+ # Initialize simulation state
+ self._last_drift_time = None
+ self._bleaching_factor = 1.0
+ self._frame_count = 0
+
+ # Connect to existing controllers for integration
+ self._connectToExistingControllers()
+
+ @APIExport(runOnUIThread=True)
+ def getConfig(self) -> Dict:
+ """Get current virtual microscope configuration"""
+ return self._config.dict()
+
+ @APIExport(runOnUIThread=True)
+ def updateConfig(self, config: Dict) -> Dict:
+ """Update virtual microscope configuration"""
+ try:
+ # Update configuration with provided values
+ for key, value in config.items():
+ if hasattr(self._config, key):
+ setattr(self._config, key, value)
+
+ self._logger.info(f"Updated virtual microscope config: {config}")
+ return {"status": "success", "config": self._config.dict()}
+ except Exception as e:
+ self._logger.error(f"Failed to update config: {str(e)}")
+ return {"status": "error", "message": str(e)}
+
+
+
+ @APIExport(runOnUIThread=True)
+ def enableStageDrift(self, enabled: bool, drift_rate_x: float = None,
+ drift_rate_y: float = None, drift_rate_z: float = None) -> Dict:
+ """Enable/disable stage drift simulation with relative increments"""
+ try:
+ self._config.drift_enabled = enabled
+ if drift_rate_x is not None:
+ self._config.drift_rate_x = drift_rate_x
+ if drift_rate_y is not None:
+ self._config.drift_rate_y = drift_rate_y
+ if drift_rate_z is not None:
+ self._config.drift_rate_z = drift_rate_z
+
+ if enabled:
+ # Reset drift timing for relative increments
+ import time
+ self._last_drift_time = time.time()
+ else:
+ self._last_drift_time = None
+
+ self._logger.info(f"Stage drift {'enabled' if enabled else 'disabled'} with rates: X={self._config.drift_rate_x}, Y={self._config.drift_rate_y}, Z={self._config.drift_rate_z}")
+ return {"status": "success", "drift_enabled": enabled}
+ except Exception as e:
+ self._logger.error(f"Failed to set stage drift: {str(e)}")
+ return {"status": "error", "message": str(e)}
+
+ @APIExport(runOnUIThread=True)
+ def enablePhotobleaching(self, enabled: bool, bleaching_rate: float = None) -> Dict:
+ """Enable/disable photobleaching simulation"""
+ try:
+ self._config.bleaching_enabled = enabled
+ if bleaching_rate is not None:
+ self._config.bleaching_rate = bleaching_rate
+
+ if not enabled:
+ self._bleaching_factor = 1.0 # Reset bleaching
+ self._frame_count = 0
+
+ self._logger.info(f"Photobleaching {'enabled' if enabled else 'disabled'}")
+ return {"status": "success", "bleaching_enabled": enabled}
+ except Exception as e:
+ self._logger.error(f"Failed to set photobleaching: {str(e)}")
+ return {"status": "error", "message": str(e)}
+
+
+
+ @APIExport(runOnUIThread=True)
+ def setActiveChannels(self, channels: List[str]) -> Dict:
+ """Set active laser channels for multi-channel imaging"""
+ try:
+ available_channels = list(self._config.channels.keys())
+ invalid_channels = [ch for ch in channels if ch not in available_channels]
+
+ if invalid_channels:
+ return {
+ "status": "error",
+ "message": f"Invalid channels: {invalid_channels}. Available: {available_channels}"
+ }
+
+ self._config.active_channels = channels
+
+ # Calculate combined intensity from active channels
+ if self._virtualMicroscopeManager:
+ total_intensity = sum(
+ self._config.channels[ch]["intensity"]
+ for ch in channels
+ ) * 1000 # Base scaling
+ self._virtualMicroscopeManager._illuminator.set_intensity(1, total_intensity)
+
+ self._logger.info(f"Set active channels: {channels}")
+ return {"status": "success", "active_channels": channels}
+ except Exception as e:
+ self._logger.error(f"Failed to set channels: {str(e)}")
+ return {"status": "error", "message": str(e)}
+
+ @APIExport(runOnUIThread=True)
+ def simulateAliasing(self, enabled: bool) -> Dict:
+ """Enable/disable aliasing artifacts for educational purposes"""
+ try:
+ self._config.aliasing_enabled = enabled
+ self._config.nyquist_sampling = not enabled # Inverse relationship
+
+ self._logger.info(f"Aliasing simulation {'enabled' if enabled else 'disabled'}")
+ return {"status": "success", "aliasing_enabled": enabled}
+ except Exception as e:
+ self._logger.error(f"Failed to set aliasing: {str(e)}")
+ return {"status": "error", "message": str(e)}
+
+ @APIExport(runOnUIThread=True)
+ def getStatus(self) -> Dict:
+ """Get current virtual microscope status and parameters"""
+ try:
+ status = {
+ "config": self._config.dict(),
+ "drift_active": self._last_drift_time is not None,
+ "frame_count": self._frame_count,
+ "bleaching_factor": self._bleaching_factor
+ }
+
+ if self._virtualMicroscopeManager:
+ position = self._virtualMicroscopeManager._positioner.get_position()
+ intensity = self._virtualMicroscopeManager._illuminator.get_intensity(1)
+ objective_state = getattr(self._virtualMicroscopeManager, 'currentObjective', 1)
+
+ status.update({
+ "position": position,
+ "illuminator_intensity": intensity,
+ "current_objective_slot": objective_state
+ })
+
+ # Add SLM status if available
+ if hasattr(self._virtualMicroscopeManager._virtualMicroscope, 'slm'):
+ slm_status = self._virtualMicroscopeManager._virtualMicroscope.slm.get_status()
+ status["slm"] = slm_status
+
+ return status
+ except Exception as e:
+ self._logger.error(f"Failed to get status: {str(e)}")
+ return {"status": "error", "message": str(e)}
+
+ @APIExport(runOnUIThread=True)
+ def setSLMPattern(self, pattern_type: str, **kwargs) -> Dict:
+ """Set SLM pattern for structured illumination and beam shaping"""
+ try:
+ if not self._virtualMicroscopeManager:
+ return {"status": "error", "message": "Virtual Microscope Manager not available"}
+
+ if not hasattr(self._virtualMicroscopeManager._virtualMicroscope, 'slm'):
+ return {"status": "error", "message": "Virtual SLM not available"}
+
+ slm = self._virtualMicroscopeManager._virtualMicroscope.slm
+ success = slm.set_pattern(pattern_type, **kwargs)
+
+ if success:
+ slm.set_active(True)
+ return {
+ "status": "success",
+ "pattern_type": pattern_type,
+ "parameters": kwargs,
+ "slm_status": slm.get_status()
+ }
+ else:
+ return {"status": "error", "message": "Failed to set SLM pattern"}
+
+ except Exception as e:
+ self._logger.error(f"Failed to set SLM pattern: {str(e)}")
+ return {"status": "error", "message": str(e)}
+
+ @APIExport(runOnUIThread=True)
+ def applySLMAberrationCorrection(self, **zernike_coeffs) -> Dict:
+ """Apply aberration correction using Zernike polynomials on SLM"""
+ try:
+ if not self._virtualMicroscopeManager:
+ return {"status": "error", "message": "Virtual Microscope Manager not available"}
+
+ if not hasattr(self._virtualMicroscopeManager._virtualMicroscope, 'slm'):
+ return {"status": "error", "message": "Virtual SLM not available"}
+
+ slm = self._virtualMicroscopeManager._virtualMicroscope.slm
+ success = slm.apply_aberration_correction(**zernike_coeffs)
+
+ if success:
+ return {
+ "status": "success",
+ "zernike_coefficients": zernike_coeffs,
+ "slm_status": slm.get_status()
+ }
+ else:
+ return {"status": "error", "message": "Failed to apply aberration correction"}
+
+ except Exception as e:
+ self._logger.error(f"Failed to apply SLM aberration correction: {str(e)}")
+ return {"status": "error", "message": str(e)}
+
+ @APIExport(runOnUIThread=True)
+ def getSLMPattern(self) -> Dict:
+ """Get current SLM pattern as base64 encoded image"""
+ try:
+ if not self._virtualMicroscopeManager:
+ return {"status": "error", "message": "Virtual Microscope Manager not available"}
+
+ if not hasattr(self._virtualMicroscopeManager._virtualMicroscope, 'slm'):
+ return {"status": "error", "message": "Virtual SLM not available"}
+
+ slm = self._virtualMicroscopeManager._virtualMicroscope.slm
+ pattern = slm.get_pattern()
+
+ # Convert to base64 for web transmission
+ import base64
+ from io import BytesIO
+ try:
+ from PIL import Image
+ img = Image.fromarray(pattern)
+ buffer = BytesIO()
+ img.save(buffer, format='PNG')
+ img_str = base64.b64encode(buffer.getvalue()).decode()
+
+ return {
+ "status": "success",
+ "pattern_base64": img_str,
+ "pattern_shape": pattern.shape,
+ "slm_status": slm.get_status()
+ }
+ except ImportError:
+ # Fallback without PIL
+ return {
+ "status": "success",
+ "pattern_shape": pattern.shape,
+ "pattern_available": True,
+ "slm_status": slm.get_status(),
+ "note": "Pattern data available but PIL not installed for base64 encoding"
+ }
+
+ except Exception as e:
+ self._logger.error(f"Failed to get SLM pattern: {str(e)}")
+ return {"status": "error", "message": str(e)}
+
+ @APIExport(runOnUIThread=True)
+ def resetSLM(self) -> Dict:
+ """Reset SLM to blank state"""
+ try:
+ if not self._virtualMicroscopeManager:
+ return {"status": "error", "message": "Virtual Microscope Manager not available"}
+
+ if not hasattr(self._virtualMicroscopeManager._virtualMicroscope, 'slm'):
+ return {"status": "error", "message": "Virtual SLM not available"}
+
+ slm = self._virtualMicroscopeManager._virtualMicroscope.slm
+ slm.reset()
+
+ return {
+ "status": "success",
+ "message": "SLM reset to blank state",
+ "slm_status": slm.get_status()
+ }
+
+ except Exception as e:
+ self._logger.error(f"Failed to reset SLM: {str(e)}")
+ return {"status": "error", "message": str(e)}
+
+ def _applyDrift(self):
+ """Internal method to apply stage drift if enabled using relative increments"""
+ if not self._config.drift_enabled or not hasattr(self, '_last_drift_time') or self._last_drift_time is None:
+ return
+
+ if not self._virtualMicroscopeManager:
+ return
+
+ try:
+ import time
+ current_time = time.time()
+ time_interval = current_time - self._last_drift_time
+
+ # Apply relative drift increments based on time interval
+ drift_x = self._config.drift_rate_x * time_interval
+ drift_y = self._config.drift_rate_y * time_interval
+ drift_z = self._config.drift_rate_z * time_interval
+
+ # Apply relative position changes
+ self._virtualMicroscopeManager._positioner.move(
+ x=drift_x, y=drift_y, z=drift_z, is_absolute=False
+ )
+
+ # Update timing for next interval
+ self._last_drift_time = current_time
+
+ except Exception as e:
+ self._logger.error(f"Error applying drift: {str(e)}")
+
+ def _applyPhotobleaching(self):
+ """Internal method to apply photobleaching if enabled"""
+ if not self._config.bleaching_enabled:
+ return
+
+ self._frame_count += 1
+
+ # Apply exponential decay
+ self._bleaching_factor *= (1 - self._config.bleaching_rate)
+
+ # Update illuminator intensity to reflect bleaching
+ if self._virtualMicroscopeManager:
+ # Get current intensity and apply bleaching factor
+ current_intensity = self._virtualMicroscopeManager._illuminator.get_intensity(1)
+ if current_intensity > 0:
+ bleached_intensity = current_intensity * self._bleaching_factor
+ self._virtualMicroscopeManager._illuminator.set_intensity(1, bleached_intensity)
+
+ def update(self):
+ """Update method called periodically by the framework"""
+ super().update()
+
+ # Apply ongoing simulations
+ self._applyDrift()
+ self._applyPhotobleaching()
+
+ def _connectToExistingControllers(self):
+ """Connect to existing controllers to listen for changes"""
+ try:
+ # Connect to SettingsController changes via shared attributes
+ if hasattr(self._commChannel, 'sharedAttrs'):
+ self._commChannel.sharedAttrs.sigAttributeSet.connect(self._onDetectorSettingChanged)
+
+ # Connect to ObjectiveController if available
+ if hasattr(self._master, 'objectiveController'):
+ objective_controller = self._master.objectiveController
+ if hasattr(objective_controller, 'sigObjectiveChanged'):
+ objective_controller.sigObjectiveChanged.connect(self._onObjectiveChanged)
+
+ except Exception as e:
+ self._logger.warning(f"Could not fully connect to existing controllers: {str(e)}")
+
+ def _onDetectorSettingChanged(self, key, value):
+ """Handle detector setting changes from SettingsController"""
+ try:
+ if not isinstance(key, tuple) or len(key) < 3:
+ return
+
+ category, detector_name, param_category = key[:3]
+ if category != 'Detector':
+ return
+
+ # Handle exposure time changes
+ if len(key) == 4 and param_category == 'Param' and key[3] == 'exposure':
+ self._config.exposure_time = value
+ if self._virtualMicroscopeManager:
+ self._virtualMicroscopeManager.updateExposureGain(
+ exposure_time=value,
+ gain=self._config.gain
+ )
+ self._logger.info(f"Virtual microscope updated exposure: {value}")
+
+ # Handle gain changes
+ elif len(key) == 4 and param_category == 'Param' and key[3] == 'gain':
+ self._config.gain = value
+ if self._virtualMicroscopeManager:
+ self._virtualMicroscopeManager.updateExposureGain(
+ exposure_time=self._config.exposure_time,
+ gain=value
+ )
+ self._logger.info(f"Virtual microscope updated gain: {value}")
+
+ except Exception as e:
+ self._logger.error(f"Error handling detector setting change: {str(e)}")
+
+ def _onObjectiveChanged(self, status_dict):
+ """Handle objective changes from ObjectiveController"""
+ try:
+ if 'state' in status_dict:
+ objective_slot = status_dict['state']
+ if self._virtualMicroscopeManager:
+ self._virtualMicroscopeManager.setObjective(objective_slot)
+
+ # Update our config to reflect the change
+ if objective_slot == 1:
+ self._config.current_objective = "20x_0.75"
+ elif objective_slot == 2:
+ self._config.current_objective = "60x_1.42"
+
+ self._logger.info(f"Virtual microscope updated objective: slot {objective_slot}")
+
+ except Exception as e:
+ self._logger.error(f"Error handling objective change: {str(e)}")
+
+
+# Copyright (C) 2020-2024 ImSwitch developers
+# This file is part of ImSwitch.
+#
+# ImSwitch is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# ImSwitch is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
\ No newline at end of file
diff --git a/imswitch/imcontrol/model/SetupInfo.py b/imswitch/imcontrol/model/SetupInfo.py
index 719c00643..ae109cce7 100644
--- a/imswitch/imcontrol/model/SetupInfo.py
+++ b/imswitch/imcontrol/model/SetupInfo.py
@@ -484,6 +484,81 @@ class PyroServerInfo:
active: Optional[bool] = False
+@dataclass(frozen=False)
+class VirtualMicroscopeInfo:
+ """ Configuration for Virtual Microscope simulation parameters """
+
+ # Stage drift simulation parameters
+ drift_enabled: bool = False
+ drift_rate_x: float = 0.1 # pixels per second
+ drift_rate_y: float = 0.1
+ drift_rate_z: float = 0.05
+
+ # Objective parameters
+ objectives: dict = field(default_factory=lambda: {
+ "20x_0.75": {
+ "magnification": 20,
+ "NA": 0.75,
+ "pixel_scale": 0.325, # um per pixel
+ "type": "air"
+ },
+ "60x_1.42": {
+ "magnification": 60,
+ "NA": 1.42,
+ "pixel_scale": 0.108, # um per pixel
+ "type": "oil"
+ }
+ })
+ current_objective: str = "20x_0.75"
+
+ # Exposure and gain simulation
+ exposure_time: float = 100.0 # ms
+ gain: float = 1.0
+
+ # Photobleaching simulation
+ bleaching_enabled: bool = False
+ bleaching_rate: float = 0.01 # fraction per exposure
+
+ # Multi-channel simulation
+ channels: dict = field(default_factory=lambda: {
+ "488": {"wavelength": 488, "intensity": 1.0, "color": "cyan"},
+ "561": {"wavelength": 561, "intensity": 1.0, "color": "green"},
+ "640": {"wavelength": 640, "intensity": 1.0, "color": "red"}
+ })
+ active_channels: list = field(default_factory=lambda: ["488"])
+
+ # Noise parameters
+ readout_noise: float = 50.0
+ shot_noise_enabled: bool = True
+ dark_current: float = 0.1
+
+ # Sampling and aliasing for education
+ nyquist_sampling: bool = True
+ aliasing_enabled: bool = False
+
+ # SLM (Spatial Light Modulator) parameters
+ slm_enabled: bool = False
+ slm_pattern_type: str = "blank"
+ slm_pattern_params: dict = field(default_factory=lambda: {
+ "frequency": 10,
+ "phase": 0,
+ "amplitude": 255,
+ "angle": 0,
+ "center_x": 960, # Half of typical SLM width
+ "center_y": 576 # Half of typical SLM height
+ })
+ slm_zernike_coeffs: dict = field(default_factory=lambda: {
+ "tip": 0.0,
+ "tilt": 0.0,
+ "defocus": 0.0,
+ "astig_0": 0.0,
+ "astig_45": 0.0,
+ "coma_x": 0.0,
+ "coma_y": 0.0,
+ "spherical": 0.0
+ })
+
+
@dataclass_json(undefined=Undefined.INCLUDE)
@dataclass
class SetupInfo:
@@ -582,6 +657,9 @@ class SetupInfo:
""" Focus lock settings. Required to be defined to use focus lock
functionality. """
+ virtualMicroscope: Optional[VirtualMicroscopeInfo] = field(default_factory=lambda: None)
+ """ Virtual Microscope settings. Required to be defined to use enhanced Virtual Microscope functionality. """
+
fovLock: Optional[FOVLockInfo] = field(default_factory=lambda: None)
""" Focus lock settings. Required to be defined to use fov lock
functionality. """
diff --git a/imswitch/imcontrol/model/managers/rs232/VirtualMicroscopeManager.py b/imswitch/imcontrol/model/managers/rs232/VirtualMicroscopeManager.py
index 69cf06769..325d694cf 100644
--- a/imswitch/imcontrol/model/managers/rs232/VirtualMicroscopeManager.py
+++ b/imswitch/imcontrol/model/managers/rs232/VirtualMicroscopeManager.py
@@ -2,18 +2,23 @@
import cv2
import math
import time
-from imswitch import IS_HEADLESS, __file__
import threading
import numpy as np
import matplotlib.pyplot as plt
-from skimage.draw import line
-from scipy.signal import convolve2d
+from imswitch import IS_HEADLESS, __file__
from imswitch.imcommon.model import initLogger
try:
- import NanoImagingPack as nip
+ from .VirtualSLM import VirtualSLM
+except ImportError:
+ # Fallback if VirtualSLM is not available
+ class VirtualSLM:
+ def __init__(self, parent):
+ self.parent = parent
+try:
+ import NanoImagingPack as nip
IS_NIP = True
except:
IS_NIP = False
@@ -27,21 +32,20 @@
def njit(*args, **kwargs):
def wrapper(func):
return func
-
return wrapper
class VirtualMicroscopeManager:
- """A low-level wrapper for TCP-IP communication (ESP32 REST API)"""
+ """A low-level wrapper for Virtual Microscope simulation"""
def __init__(self, rs232Info, name, **_lowLevelManagers):
self.__logger = initLogger(self, instanceName=name)
self._settings = rs232Info.managerProperties
self._name = name
- availableImageModalities = ["simplant", "smlm"]
+
try:
self._imagePath = rs232Info.managerProperties["imagePath"]
- if not self._imagePath in availableImageModalities:
+ if self._imagePath not in ["simplant", "smlm"]:
raise NameError
except:
package_dir = os.path.dirname(os.path.abspath(__file__))
@@ -51,33 +55,61 @@ def __init__(self, rs232Info, name, **_lowLevelManagers):
self.__logger.info(
"If you want to use the plant, use 'imagePath': 'simplant' in your setup.json"
)
- defaultJSON = {
- "rs232devices": {
- "VirtualMicroscope": {
- "managerName": "VirtualMicroscopeManager",
- "managerProperties": {"imagePath": "simplant"},
- }
- }
- }
- self.__logger.info("Default JSON:" + str(defaultJSON))
self._virtualMicroscope = VirtualMicroscopy(self._imagePath)
self._positioner = self._virtualMicroscope.positioner
self._camera = self._virtualMicroscope.camera
self._illuminator = self._virtualMicroscope.illuminator
- self._objective = None
+ self._objective = self._virtualMicroscope.objective
+ # Initialize objective state: 1 (default) => no binning, 2 => binned image (2x magnification)
+ self.currentObjective = 1
+ self._camera.binning = False
+
+ # Base illuminator intensity for exposure/gain calculations
+ self._base_intensity = 1000
+
+ def toggleObjective(self):
"""
- # Test the functionality
- for i in range(10):
- microscope.positioner.move(x=5, y=5)
- microscope.illuminator.set_intensity(intensity=1.5)
- frame = microscope.get_frame()
- cv2.imshow("Microscope View", frame)
- cv2.waitKey(100)
-
- cv2.destroyAllWindows()
+ Toggle the objective lens.
+ When toggled, the virtual objective move is simulated,
+ and the image magnification is changed by binning the pixels.
"""
+ if self.currentObjective == 1:
+ # Move to objective 2: simulate move and apply 2x binning
+ self.__logger.info("Switching to Objective 2: Applying 2x binning")
+ self.currentObjective = 2
+ self._camera.binning = True
+ else:
+ # Move back to objective 1: remove binning
+ self.__logger.info("Switching to Objective 1: Removing binning")
+ self.currentObjective = 1
+ self._camera.binning = False
+
+ def setObjective(self, objective_slot: int):
+ """Set objective based on slot number (1 or 2)"""
+ if objective_slot == 1:
+ self.__logger.info("Switching to Objective 1: Removing binning")
+ self.currentObjective = 1
+ self._camera.binning = False
+ elif objective_slot == 2:
+ self.__logger.info("Switching to Objective 2: Applying 2x binning")
+ self.currentObjective = 2
+ self._camera.binning = True
+ else:
+ self.__logger.warning(f"Invalid objective slot {objective_slot}, only 1 or 2 supported")
+
+ def updateExposureGain(self, exposure_time: float = None, gain: float = None):
+ """Update illuminator intensity based on exposure and gain settings"""
+ if exposure_time is None:
+ exposure_time = 100.0 # Default exposure
+ if gain is None:
+ gain = 1.0 # Default gain
+
+ # Scale illuminator intensity based on exposure and gain
+ scaled_intensity = self._base_intensity * (exposure_time / 100.0) * gain
+ self._illuminator.set_intensity(1, scaled_intensity)
+ self.__logger.info(f"Updated virtual microscope intensity to {scaled_intensity} (exposure: {exposure_time}, gain: {gain})")
def finalize(self):
self._virtualMicroscope.stop()
@@ -109,6 +141,8 @@ def __init__(self, parent, filePath="path_to_image.jpeg"):
self.PixelSize = 1.0
self.isRGB = False
self.frameNumber = 0
+ # Binning control
+ self.binning = False
# precompute noise so that we will save energy and trees
self.noiseStack = np.abs(
np.random.randn(self.SensorHeight, self.SensorWidth, 100) * 2
@@ -118,25 +152,45 @@ def produce_frame(
self, x_offset=0, y_offset=0, light_intensity=1.0, defocusPSF=None
):
"""Generate a frame based on the current settings."""
- if self.filePath == "smlm": # There is likely a better way of handling this
+ if self.filePath == "smlm": # There is likely a better way of handling this
return self.produce_smlm_frame(x_offset, y_offset, light_intensity)
else:
with self.lock:
- # add moise
+ # add noise
image = self.image.copy()
# Adjust image based on offsets
image = np.roll(
np.roll(image, int(x_offset), axis=1), int(y_offset), axis=0
)
- image = nip.extract(image, (self.SensorHeight, self.SensorWidth)) # extract the image to the sensor size
+ if IS_NIP:
+ image = nip.extract(image, (self.SensorHeight, self.SensorWidth))
+ else:
+ # Fallback cropping if NIP is not available
+ h, w = image.shape
+ start_h = max(0, h // 2 - self.SensorHeight // 2)
+ start_w = max(0, w // 2 - self.SensorWidth // 2)
+ image = image[start_h:start_h + self.SensorHeight,
+ start_w:start_w + self.SensorWidth]
# do all post-processing on cropped image
if IS_NIP and defocusPSF is not None and not defocusPSF.shape == ():
print("Defocus:" + str(defocusPSF.shape))
image = np.array(np.real(nip.convolve(image, defocusPSF)))
+
image = np.float32(image) * np.float32(light_intensity)
image += self.noiseStack[:, :, np.random.randint(0, 100)]
-
+
+ # Apply binning if enabled (simulates higher magnification objective)
+ if self.binning:
+ # 2x2 binning by taking every 2nd pixel
+ image = image[::2, ::2]
+ # Scale back up to original size for display
+ image = np.repeat(np.repeat(image, 2, axis=0), 2, axis=1)
+ # Ensure we maintain original size
+ if image.shape[0] > self.SensorHeight:
+ image = image[:self.SensorHeight, :]
+ if image.shape[1] > self.SensorWidth:
+ image = image[:, :self.SensorWidth]
# Adjust illumination
image = image.astype(np.uint16)
@@ -146,25 +200,33 @@ def produce_frame(
def produce_smlm_frame(self, x_offset=0, y_offset=0, light_intensity=5000):
"""Generate a SMLM frame based on the current settings."""
with self.lock:
- # add moise
+ # add noise
image = self.image.copy()
# Adjust image based on offsets
image = np.roll(
np.roll(image, int(x_offset), axis=1), int(y_offset), axis=0
)
- image = nip.extract(image, (self.SensorHeight, self.SensorWidth))
+ if IS_NIP:
+ image = nip.extract(image, (self.SensorHeight, self.SensorWidth))
+ else:
+ # Fallback cropping if NIP is not available
+ h, w = image.shape
+ start_h = max(0, h // 2 - self.SensorHeight // 2)
+ start_w = max(0, w // 2 - self.SensorWidth // 2)
+ image = image[start_h:start_h + self.SensorHeight,
+ start_w:start_w + self.SensorWidth]
yc_array, xc_array = binary2locs(image, density=0.05)
photon_array = np.random.normal(
light_intensity * 5, light_intensity * 0.05, size=len(xc_array)
)
- wavelenght = 6 # change to get it from microscope settings
- wavelenght_std = 0.5 # change to get it from microscope settings
+ wavelength = 6 # change to get it from microscope settings
+ wavelength_std = 0.5 # change to get it from microscope settings
NA = 1.2 # change to get it from microscope settings
- sigma = 0.21 * wavelenght / NA # change to get it from microscope settings
+ sigma = 0.21 * wavelength / NA # change to get it from microscope settings
sigma_std = (
- 0.21 * wavelenght_std / NA
+ 0.21 * wavelength_std / NA
) # change to get it from microscope settings
sigma_array = np.random.normal(sigma, sigma_std, size=len(xc_array))
@@ -192,7 +254,7 @@ def produce_smlm_frame(self, x_offset=0, y_offset=0, light_intensity=5000):
def getLast(self, returnFrameNumber=False):
position = self._parent.positioner.get_position()
- defocusPSF = np.squeeze(self._parent.positioner.get_psf())
+ defocusPSF = np.squeeze(self._parent.positioner.get_psf()) if self._parent.positioner.get_psf() is not None else None
intensity = self._parent.illuminator.get_intensity(1)
self.frameNumber += 1
if returnFrameNumber:
@@ -213,11 +275,10 @@ def getLast(self, returnFrameNumber=False):
defocusPSF=defocusPSF,
)
-
def getLastChunk(self):
mFrame = self.getLast()
- return np.expand_dims(mFrame, axis=0), [self.frameNumber] # we only provide one chunk, so we return a list with one element
-
+ return np.expand_dims(mFrame, axis=0), [self.frameNumber] # we only provide one chunk, so we return a list with one element
+
def setPropertyValue(self, propertyName, propertyValue):
pass
@@ -284,152 +345,6 @@ def get_psf(self):
return self.psf
-class Illuminator:import os
-import cv2
-import math
-import time
-import threading
-import numpy as np
-import matplotlib.pyplot as plt
-
-from imswitch import IS_HEADLESS, __file__
-from imswitch.imcommon.model import initLogger
-
-try:
- import NanoImagingPack as nip
- IS_NIP = True
-except:
- IS_NIP = False
-
-try:
- from numba import njit, prange
-except ModuleNotFoundError:
- prange = range
- def njit(*args, **kwargs):
- def wrapper(func):
- return func
- return wrapper
-
-
-class VirtualMicroscopeManager:
- """A low-level wrapper for TCP-IP communication (ESP32 REST API)
- with added objective control that toggles the objective lens.
- Toggling the objective will double the image magnification by
- binning the pixels (2x2 binning).
- """
-
- def __init__(self, rs232Info, name, **_lowLevelManagers):
- self.__logger = initLogger(self, instanceName=name)
- self._settings = rs232Info.managerProperties
- self._name = name
-
- try:
- self._imagePath = rs232Info.managerProperties["imagePath"]
- if self._imagePath not in ["simplant", "smlm"]:
- raise NameError
- except:
- package_dir = os.path.dirname(os.path.abspath(__file__))
- self._imagePath = os.path.join(
- package_dir, "_data/images/histoASHLARStitch.jpg"
- )
- self.__logger.info(
- "If you want to use the plant, use 'imagePath': 'simplant' in your setup.json"
- )
-
- self._virtualMicroscope = VirtualMicroscopy(self._imagePath)
- self._positioner = self._virtualMicroscope.positioner
- self._camera = self._virtualMicroscope.camera
- self._illuminator = self._virtualMicroscope.illuminator
- self._objective = self._virtualMicroscope.objective
-
- # Initialize objective state: 1 (default) => no binning, 2 => binned image (2x magnification)
- self.currentObjective = 1
- self._camera.binning = False
-
- def toggleObjective(self):
- """
- Toggle the objective lens.
- When toggled, the virtual objective move is simulated,
- and the image magnification is changed by binning the pixels.
- """
- if self.currentObjective == 1:
- # Move to objective 2: simulate move and apply 2x binning
- self.__logger.info("Switching to Objective 2: Applying 2x binning")
- # Here one could call a REST API endpoint like:
- # /ObjectiveController/moveToObjective?slot=2
- self.currentObjective = 2
- self._camera.binning = True
- else:
- # Move back to objective 1: remove binning
- self.__logger.info("Switching to Objective 1: Removing binning")
- # Here one could call a REST API endpoint like:
- # /ObjectiveController/moveToObjective?slot=1
- self.currentObjective = 1
- self._camera.binning = False
-
- def finalize(self):
- self._virtualMicroscope.stop()
-
-
-
-class Positioner:
- def __init__(self, parent):
- self._parent = parent
- self.position = {"X": 0, "Y": 0, "Z": 0, "A": 0}
- self.mDimensions = (self._parent.camera.SensorHeight, self._parent.camera.SensorWidth)
- self.lock = threading.Lock()
- if IS_NIP:
- self.psf = self.compute_psf(dz=0)
- else:
- self.psf = None
-
- def move(self, x=None, y=None, z=None, a=None, is_absolute=False):
- with self.lock:
- if is_absolute:
- if x is not None:
- self.position["X"] = x
- if y is not None:
- self.position["Y"] = y
- if z is not None:
- self.position["Z"] = z
- self.compute_psf(self.position["Z"])
- if a is not None:
- self.position["A"] = a
- else:
- if x is not None:
- self.position["X"] += x
- if y is not None:
- self.position["Y"] += y
- if z is not None:
- self.position["Z"] += z
- self.compute_psf(self.position["Z"])
- if a is not None:
- self.position["A"] += a
-
- def get_position(self):
- with self.lock:
- return self.position.copy()
-
- def compute_psf(self, dz):
- dz = np.float32(dz)
- print("Defocus:" + str(dz))
- if IS_NIP and dz != 0:
- obj = nip.image(np.zeros(self.mDimensions))
- obj.pixelsize = (100.0, 100.0)
- paraAbber = nip.PSF_PARAMS()
- paraAbber.aberration_types = [paraAbber.aberration_zernikes.spheric]
- paraAbber.aberration_strength = [np.float32(dz) / 10]
- psf = nip.psf(obj, paraAbber)
- self.psf = psf.copy()
- del psf
- del obj
- else:
- self.psf = None
-
- def get_psf(self):
- return self.psf
-
-
class Illuminator:
def __init__(self, parent):
self._parent = parent
@@ -457,6 +372,7 @@ def __init__(self, filePath="path_to_image.jpeg"):
self.positioner = Positioner(self)
self.illuminator = Illuminator(self)
self.objective = Objective(self)
+ self.slm = VirtualSLM(self)
def stop(self):
pass
@@ -466,114 +382,6 @@ def stop(self):
def FromLoc2Image_MultiThreaded(
xc_array: np.ndarray, yc_array: np.ndarray, photon_array: np.ndarray,
sigma_array: np.ndarray, image_height: int, image_width: int, pixel_size: float
-):
- Image = np.zeros((image_height, image_width))
- for ij in prange(image_height * image_width):
- j = int(ij / image_width)
- i = ij - j * image_width
- for xc, yc, photon, sigma in zip(xc_array, yc_array, photon_array, sigma_array):
- if (photon > 0) and (sigma > 0):
- S = sigma * math.sqrt(2)
- x = i * pixel_size - xc
- y = j * pixel_size - yc
- if (x + pixel_size / 2) ** 2 + (y + pixel_size / 2) ** 2 < 16 * sigma**2:
- ErfX = math.erf((x + pixel_size) / S) - math.erf(x / S)
- ErfY = math.erf((y + pixel_size) / S) - math.erf(y / S)
- Image[j][i] += 0.25 * photon * ErfX * ErfY
- return Image
-
-
-def binary2locs(img: np.ndarray, density: float):
- all_locs = np.nonzero(img == 1)
- n_points = int(len(all_locs[0]) * density)
- selected_idx = np.random.choice(len(all_locs[0]), n_points, replace=False)
- filtered_locs = all_locs[0][selected_idx], all_locs[1][selected_idx]
- return filtered_locs
-
-
-def createBranchingTree(width=5000, height=5000, lineWidth=3):
- np.random.seed(0)
- image = np.ones((height, width), dtype=np.uint8) * 255
-
- def draw_vessel(start, end, image):
- rr, cc = line(start[0], start[1], end[0], end[1])
- try:
- image[rr, cc] = 0
- except:
- return
-
- def draw_tree(start, angle, length, depth, image, reducer, max_angle=40):
- if depth == 0:
- return
- end = (int(start[0] + length * np.sin(np.radians(angle))),
- int(start[1] + length * np.cos(np.radians(angle))))
- draw_vessel(start, end, image)
- angle += np.random.uniform(-10, 10)
- new_length = length * reducer
- new_depth = depth - 1
- draw_tree(end, angle - max_angle * np.random.uniform(-1, 1), new_length, new_depth, image, reducer)
- draw_tree(end, angle + max_angle * np.random.uniform(-1, 1), new_length, new_depth, image, reducer)
-
- start_point = (height - 1, width // 2)
- initial_angle = -90
- initial_length = np.max((width, height)) * 0.15
- depth = 7
- reducer = 0.9
- draw_tree(start_point, initial_angle, initial_length, depth, image, reducer)
- rectangle = np.ones((lineWidth, lineWidth))
- from scipy.signal import convolve2d
- image = convolve2d(image, rectangle, mode="same", boundary="fill", fillvalue=0)
- return image
-
-
-if __name__ == "__main__":
- imagePath = "smlm"
- microscope = VirtualMicroscopy(filePath=imagePath)
- vmManager = VirtualMicroscopeManager(rs232Info=type("RS232", (), {"managerProperties": {"imagePath": "smlm"}})(), name="VirtualScope")
- microscope.illuminator.set_intensity(intensity=1000)
-
- # Toggle objective to simulate switching and doubling magnification via binning
- vmManager.toggleObjective()
- for i in range(5):
- microscope.positioner.move(
- x=1400 + i * (-200), y=-800 + i * (-10), z=0, is_absolute=True
- )
- frame = microscope.camera.getLast()
- plt.imsave(f"frame_{i}.png", frame)
- cv2.destroyAllWindows()
-
- def __init__(self, parent):
- self._parent = parent
- self.intensity = 0
- self.lock = threading.Lock()
-
- def set_intensity(self, channel=1, intensity=0):
- with self.lock:
- self.intensity = intensity
-
- def get_intensity(self, channel):
- with self.lock:
- return self.intensity
-
-class Objective:
- def __init__(self, parent):
- self._parent = parent
-
-
-class VirtualMicroscopy:
- def __init__(self, filePath="path_to_image.jpeg"):
- self.camera = Camera(self, filePath)
- self.positioner = Positioner(self)
- self.illuminator = Illuminator(self)
- self.objective = Objective(self)
-
- def stop(self):
- pass
-
-
-@njit(parallel=True)
-def FromLoc2Image_MultiThreaded(
- xc_array: np.ndarray, yc_array: np.ndarray, photon_array: np.ndarray, sigma_array: np.ndarray, image_height: int, image_width: int, pixel_size: float
):
"""
Generate an image from localized emitters using multi-threading.
@@ -667,18 +475,18 @@ def binary2locs(img: np.ndarray, density: float):
def createBranchingTree(width=5000, height=5000, lineWidth=3):
np.random.seed(0) # Set a random seed for reproducibility
# Define the dimensions of the image
- width, height = 5000, 5000
+ width, height = width, height
# Create a blank white image
image = np.ones((height, width), dtype=np.uint8) * 255
# Function to draw a line (blood vessel) on the image
def draw_vessel(start, end, image):
+ from skimage.draw import line
rr, cc = line(start[0], start[1], end[0], end[1])
try:
image[rr, cc] = 0 # Draw a black line
except:
- end = 0
return
# Recursive function to draw a tree-like structure
@@ -729,20 +537,20 @@ def draw_tree(start, angle, length, depth, image, reducer, max_angle=40):
# convolve image with rectangle
rectangle = np.ones((lineWidth, lineWidth))
+ from scipy.signal import convolve2d
image = convolve2d(image, rectangle, mode="same", boundary="fill", fillvalue=0)
return image
if __name__ == "__main__":
-
- # Read the image locally
- # mFWD = os.path.dirname(os.path.realpath(__file__)).split("imswitch")[0]
- # imagePath = mFWD + "imswitch/_data/images/histoASHLARStitch.jpg"
imagePath = "smlm"
microscope = VirtualMicroscopy(filePath=imagePath)
+ vmManager = VirtualMicroscopeManager(rs232Info=type("RS232", (), {"managerProperties": {"imagePath": "smlm"}})(), name="VirtualScope")
microscope.illuminator.set_intensity(intensity=1000)
+ # Toggle objective to simulate switching and doubling magnification via binning
+ vmManager.toggleObjective()
for i in range(5):
microscope.positioner.move(
x=1400 + i * (-200), y=-800 + i * (-10), z=0, is_absolute=True
@@ -765,4 +573,4 @@ def draw_tree(start, angle, length, depth, image, reducer, max_angle=40):
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
-# along with this program. If not, see .
+# along with this program. If not, see .
\ No newline at end of file
diff --git a/imswitch/imcontrol/model/managers/rs232/VirtualMicroscopeManager_backup.py b/imswitch/imcontrol/model/managers/rs232/VirtualMicroscopeManager_backup.py
new file mode 100644
index 000000000..69cf06769
--- /dev/null
+++ b/imswitch/imcontrol/model/managers/rs232/VirtualMicroscopeManager_backup.py
@@ -0,0 +1,768 @@
+import os
+import cv2
+import math
+import time
+from imswitch import IS_HEADLESS, __file__
+import threading
+import numpy as np
+import matplotlib.pyplot as plt
+
+from skimage.draw import line
+from scipy.signal import convolve2d
+from imswitch.imcommon.model import initLogger
+
+try:
+ import NanoImagingPack as nip
+
+ IS_NIP = True
+except:
+ IS_NIP = False
+
+# Makes sure code still executes without numba, albeit extremely slow
+try:
+ from numba import njit, prange
+except ModuleNotFoundError:
+ prange = range
+
+ def njit(*args, **kwargs):
+ def wrapper(func):
+ return func
+
+ return wrapper
+
+
class VirtualMicroscopeManager:
    """A low-level wrapper simulating an RS232/TCP-IP device (ESP32 REST API).

    Builds a :class:`VirtualMicroscopy` instance and exposes its sub-devices
    (positioner, camera, illuminator) to the rest of ImSwitch.

    Parameters
    ----------
    rs232Info : object
        Setup-info entry; ``rs232Info.managerProperties`` may contain an
        ``"imagePath"`` key selecting the simulated sample modality
        (``"simplant"`` or ``"smlm"``).
    name : str
        Manager instance name, used for logging.
    """

    def __init__(self, rs232Info, name, **_lowLevelManagers):
        self.__logger = initLogger(self, instanceName=name)
        self._settings = rs232Info.managerProperties
        self._name = name
        availableImageModalities = ["simplant", "smlm"]
        try:
            self._imagePath = rs232Info.managerProperties["imagePath"]
            if self._imagePath not in availableImageModalities:
                raise NameError(f"Unknown imagePath: {self._imagePath}")
        except (KeyError, TypeError, NameError):
            # Missing or unknown modality: fall back to the bundled demo image.
            package_dir = os.path.dirname(os.path.abspath(__file__))
            self._imagePath = os.path.join(
                package_dir, "_data/images/histoASHLARStitch.jpg"
            )
            self.__logger.info(
                "If you want to use the plant, use 'imagePath': 'simplant' in your setup.json"
            )
        # Log an example configuration to help users set up their setup.json.
        defaultJSON = {
            "rs232devices": {
                "VirtualMicroscope": {
                    "managerName": "VirtualMicroscopeManager",
                    "managerProperties": {"imagePath": "simplant"},
                }
            }
        }
        self.__logger.info("Default JSON:" + str(defaultJSON))

        self._virtualMicroscope = VirtualMicroscopy(self._imagePath)
        self._positioner = self._virtualMicroscope.positioner
        self._camera = self._virtualMicroscope.camera
        self._illuminator = self._virtualMicroscope.illuminator
        self._objective = None

    def finalize(self):
        """Stop the virtual microscope when the manager shuts down."""
        self._virtualMicroscope.stop()
+
+
class Camera:
    """Virtual camera that synthesizes frames from a precomputed sample image.

    The source image is selected by ``filePath``: ``"simplant"`` renders a
    branching-vessel phantom, ``"smlm"`` builds a binary localization mask
    used for simulated single-molecule frames, and any other value is loaded
    from disk and averaged to grayscale. Frames are shifted by the stage
    offsets, cropped to the sensor size, optionally convolved with a defocus
    PSF, scaled by illumination intensity and corrupted with noise.
    """

    def __init__(self, parent, filePath="path_to_image.jpeg"):
        self._parent = parent
        self.filePath = filePath

        if self.filePath == "simplant":
            self.image = createBranchingTree(width=5000, height=5000)
            self.image /= np.max(self.image)
        elif self.filePath == "smlm":
            tmp = createBranchingTree(width=5000, height=5000)
            tmp_min = np.min(tmp)
            tmp_max = np.max(tmp)
            self.image = (
                1 - ((tmp - tmp_min) / (tmp_max - tmp_min)) > 0
            )  # generating binary image (True where vessels are)
        else:
            # Fallback: load an image file and average its channels to grayscale.
            self.image = np.mean(cv2.imread(filePath), axis=2)
            self.image /= np.max(self.image)

        self.lock = threading.Lock()
        # NOTE(review): sensor size is hard-coded rather than derived from the
        # image; the commented expressions suggest it once was — confirm intent.
        self.SensorHeight = 300  # self.image.shape[1]
        self.SensorWidth = 400  # self.image.shape[0]
        self.model = "VirtualCamera"
        self.PixelSize = 1.0
        self.isRGB = False
        self.frameNumber = 0
        # Precompute a stack of 100 noise frames once so per-frame noise is a
        # cheap random slice lookup instead of a fresh randn() call.
        self.noiseStack = np.abs(
            np.random.randn(self.SensorHeight, self.SensorWidth, 100) * 2
        )

    def produce_frame(
        self, x_offset=0, y_offset=0, light_intensity=1.0, defocusPSF=None
    ):
        """Generate a frame based on the current settings."""
        if self.filePath == "smlm":  # There is likely a better way of handling this
            return self.produce_smlm_frame(x_offset, y_offset, light_intensity)
        else:
            with self.lock:
                # Work on a copy so the cached source image is never modified.
                image = self.image.copy()
                # Adjust image based on offsets (stage position -> pixel shift)
                image = np.roll(
                    np.roll(image, int(x_offset), axis=1), int(y_offset), axis=0
                )
                # NOTE(review): nip.extract is called even when IS_NIP is False,
                # which would raise NameError without NanoImagingPack — confirm.
                image = nip.extract(image, (self.SensorHeight, self.SensorWidth))  # extract the image to the sensor size

                # do all post-processing on cropped image
                if IS_NIP and defocusPSF is not None and not defocusPSF.shape == ():
                    print("Defocus:" + str(defocusPSF.shape))
                    image = np.array(np.real(nip.convolve(image, defocusPSF)))
                image = np.float32(image) * np.float32(light_intensity)
                # Add a randomly chosen precomputed noise frame.
                image += self.noiseStack[:, :, np.random.randint(0, 100)]


                # Convert to a camera-like integer format.
                image = image.astype(np.uint16)
                time.sleep(0.1)  # simulate sensor exposure/readout latency
                return np.array(image)

    def produce_smlm_frame(self, x_offset=0, y_offset=0, light_intensity=5000):
        """Generate a SMLM frame based on the current settings."""
        with self.lock:
            # Work on a copy so the cached source image is never modified.
            image = self.image.copy()
            # Adjust image based on offsets (stage position -> pixel shift)
            image = np.roll(
                np.roll(image, int(x_offset), axis=1), int(y_offset), axis=0
            )
            image = nip.extract(image, (self.SensorHeight, self.SensorWidth))

            # Sample emitter positions from the binary mask; photon counts are
            # drawn around 5x the requested intensity with 1% relative spread.
            yc_array, xc_array = binary2locs(image, density=0.05)
            photon_array = np.random.normal(
                light_intensity * 5, light_intensity * 0.05, size=len(xc_array)
            )

            wavelenght = 6  # change to get it from microscope settings
            wavelenght_std = 0.5  # change to get it from microscope settings
            NA = 1.2  # change to get it from microscope settings
            sigma = 0.21 * wavelenght / NA  # change to get it from microscope settings
            sigma_std = (
                0.21 * wavelenght_std / NA
            )  # change to get it from microscope settings
            sigma_array = np.random.normal(sigma, sigma_std, size=len(xc_array))

            ADC_per_photon_conversion = 1.0  # change to get it from microscope settings
            readout_noise = 50  # change to get it from microscope settings
            ADC_offset = 100  # change to get it from microscope settings

            out = FromLoc2Image_MultiThreaded(
                xc_array,
                yc_array,
                photon_array,
                sigma_array,
                self.SensorHeight,
                self.SensorWidth,
                self.PixelSize,
            )
            # Camera model: shot noise (Poisson) + Gaussian readout noise + offset.
            out = (
                ADC_per_photon_conversion * np.random.poisson(out)
                + readout_noise
                * np.random.normal(size=(self.SensorHeight, self.SensorWidth))
                + ADC_offset
            )
            time.sleep(0.1)  # simulate sensor exposure/readout latency
            return np.array(out)

    def getLast(self, returnFrameNumber=False):
        """Produce a frame for the current stage/illumination state.

        Returns the frame, or ``(frame, frameNumber)`` when
        *returnFrameNumber* is True.
        """
        position = self._parent.positioner.get_position()
        defocusPSF = np.squeeze(self._parent.positioner.get_psf())
        intensity = self._parent.illuminator.get_intensity(1)
        self.frameNumber += 1
        if returnFrameNumber:
            return (
                self.produce_frame(
                    x_offset=position["X"],
                    y_offset=position["Y"],
                    light_intensity=intensity,
                    defocusPSF=defocusPSF,
                ),
                self.frameNumber,
            )
        else:
            return self.produce_frame(
                x_offset=position["X"],
                y_offset=position["Y"],
                light_intensity=intensity,
                defocusPSF=defocusPSF,
            )


    def getLastChunk(self):
        """Return the newest frame as a 1-frame chunk plus its frame number."""
        mFrame = self.getLast()
        return np.expand_dims(mFrame, axis=0), [self.frameNumber]  # we only provide one chunk, so we return a list with one element

    def setPropertyValue(self, propertyName, propertyValue):
        # The virtual camera has no hardware properties; accept and ignore.
        pass
+
+
class Positioner:
    """Simulated X/Y/Z/A stage.

    Tracks the current position and, when NanoImagingPack is available,
    recomputes a defocus PSF whenever Z changes so the camera can blur
    frames accordingly. All public methods are thread-safe via a lock.
    """

    def __init__(self, parent):
        self._parent = parent
        self.position = {"X": 0, "Y": 0, "Z": 0, "A": 0}
        # Sensor dimensions define the grid on which the PSF is computed.
        self.mDimensions = (
            self._parent.camera.SensorHeight,
            self._parent.camera.SensorWidth,
        )
        self.lock = threading.Lock()
        if IS_NIP:
            # compute_psf() stores its result in self.psf as a side effect and
            # returns None, so its return value must not be assigned (the old
            # `self.psf = self.compute_psf(dz=0)` clobbered self.psf with None).
            self.compute_psf(dz=0)
        else:
            self.psf = None

    def move(self, x=None, y=None, z=None, a=None, is_absolute=False):
        """Move the stage.

        Parameters
        ----------
        x, y, z, a : float or None
            Target (absolute) or delta (relative) per axis; None leaves the
            axis untouched.
        is_absolute : bool
            When True, values are absolute coordinates; otherwise offsets.
        """
        with self.lock:
            if is_absolute:
                if x is not None:
                    self.position["X"] = x
                if y is not None:
                    self.position["Y"] = y
                if z is not None:
                    self.position["Z"] = z
                    # Z changes alter focus, so refresh the defocus PSF.
                    self.compute_psf(self.position["Z"])
                if a is not None:
                    self.position["A"] = a
            else:
                if x is not None:
                    self.position["X"] += x
                if y is not None:
                    self.position["Y"] += y
                if z is not None:
                    self.position["Z"] += z
                    self.compute_psf(self.position["Z"])
                if a is not None:
                    self.position["A"] += a

    def get_position(self):
        """Return a snapshot copy of the current position dict."""
        with self.lock:
            return self.position.copy()

    def compute_psf(self, dz):
        """Recompute ``self.psf`` for defocus *dz*.

        Sets ``self.psf`` to a spherically aberrated PSF when NanoImagingPack
        is available and dz != 0, otherwise to None. Always returns None.
        """
        dz = np.float32(dz)
        print("Defocus:" + str(dz))
        if IS_NIP and dz != 0:
            obj = nip.image(np.zeros(self.mDimensions))
            obj.pixelsize = (100.0, 100.0)
            paraAbber = nip.PSF_PARAMS()
            paraAbber.aberration_types = [paraAbber.aberration_zernikes.spheric]
            # Empirical scaling: defocus strength grows linearly with dz.
            paraAbber.aberration_strength = [np.float32(dz) / 10]
            psf = nip.psf(obj, paraAbber)
            self.psf = psf.copy()
            del psf
            del obj
        else:
            self.psf = None

    def get_psf(self):
        """Return the most recently computed PSF (or None)."""
        return self.psf
+
+
+class Illuminator:import os
+import cv2
+import math
+import time
+import threading
+import numpy as np
+import matplotlib.pyplot as plt
+
+from imswitch import IS_HEADLESS, __file__
+from imswitch.imcommon.model import initLogger
+
+try:
+ import NanoImagingPack as nip
+ IS_NIP = True
+except:
+ IS_NIP = False
+
+try:
+ from numba import njit, prange
+except ModuleNotFoundError:
+ prange = range
+ def njit(*args, **kwargs):
+ def wrapper(func):
+ return func
+ return wrapper
+
+
class VirtualMicroscopeManager:
    """A low-level wrapper for TCP-IP communication (ESP32 REST API)
    with added objective control that toggles the objective lens.
    Toggling the objective will double the image magnification by
    binning the pixels (2x2 binning).
    """

    def __init__(self, rs232Info, name, **_lowLevelManagers):
        """Build the virtual microscope and expose its sub-devices.

        Parameters
        ----------
        rs232Info : object
            Setup entry whose ``managerProperties`` may contain an
            ``"imagePath"`` key (``"simplant"`` or ``"smlm"``).
        name : str
            Instance name used for logging.
        """
        self.__logger = initLogger(self, instanceName=name)
        self._settings = rs232Info.managerProperties
        self._name = name

        try:
            self._imagePath = rs232Info.managerProperties["imagePath"]
            if self._imagePath not in ["simplant", "smlm"]:
                raise NameError(f"Unknown imagePath: {self._imagePath}")
        except (KeyError, TypeError, NameError):
            # Missing or unknown modality: fall back to the bundled demo image.
            package_dir = os.path.dirname(os.path.abspath(__file__))
            self._imagePath = os.path.join(
                package_dir, "_data/images/histoASHLARStitch.jpg"
            )
            self.__logger.info(
                "If you want to use the plant, use 'imagePath': 'simplant' in your setup.json"
            )

        self._virtualMicroscope = VirtualMicroscopy(self._imagePath)
        self._positioner = self._virtualMicroscope.positioner
        self._camera = self._virtualMicroscope.camera
        self._illuminator = self._virtualMicroscope.illuminator
        self._objective = self._virtualMicroscope.objective

        # Objective state: 1 (default) => no binning, 2 => 2x2 binning (2x magnification)
        self.currentObjective = 1
        self._camera.binning = False

    def toggleObjective(self):
        """Switch between objective slots 1 and 2.

        Slot 2 simulates a higher-magnification objective by enabling 2x2
        pixel binning on the virtual camera; slot 1 disables it again.
        """
        if self.currentObjective == 1:
            # Move to objective 2: simulate move and apply 2x binning.
            # A real setup would call /ObjectiveController/moveToObjective?slot=2
            self.__logger.info("Switching to Objective 2: Applying 2x binning")
            self.currentObjective = 2
            self._camera.binning = True
        else:
            # Move back to objective 1: remove binning.
            # A real setup would call /ObjectiveController/moveToObjective?slot=1
            self.__logger.info("Switching to Objective 1: Removing binning")
            self.currentObjective = 1
            self._camera.binning = False

    def finalize(self):
        """Stop the virtual microscope when the manager shuts down."""
        self._virtualMicroscope.stop()
+
+
+
class Positioner:
    """Virtual stage holding an X/Y/Z/A position and a derived defocus PSF."""

    def __init__(self, parent):
        self._parent = parent
        self.position = {"X": 0, "Y": 0, "Z": 0, "A": 0}
        cam = self._parent.camera
        self.mDimensions = (cam.SensorHeight, cam.SensorWidth)
        self.lock = threading.Lock()
        self.psf = self.compute_psf(dz=0) if IS_NIP else None

    def move(self, x=None, y=None, z=None, a=None, is_absolute=False):
        """Move the stage, absolutely or relatively depending on *is_absolute*."""
        requested = {"X": x, "Y": y, "Z": z, "A": a}
        with self.lock:
            for axis, value in requested.items():
                if value is None:
                    continue
                if is_absolute:
                    self.position[axis] = value
                else:
                    self.position[axis] += value
                if axis == "Z":
                    # A focus change invalidates the cached defocus PSF.
                    self.compute_psf(self.position["Z"])

    def get_position(self):
        """Return a snapshot copy of the current position dict."""
        with self.lock:
            return dict(self.position)

    def compute_psf(self, dz):
        """Refresh ``self.psf`` for defocus *dz* (None unless NIP and dz != 0)."""
        dz = np.float32(dz)
        print("Defocus:" + str(dz))
        if not (IS_NIP and dz != 0):
            self.psf = None
            return
        obj = nip.image(np.zeros(self.mDimensions))
        obj.pixelsize = (100.0, 100.0)
        params = nip.PSF_PARAMS()
        params.aberration_types = [params.aberration_zernikes.spheric]
        params.aberration_strength = [np.float32(dz) / 10]
        psf = nip.psf(obj, params)
        self.psf = psf.copy()
        del psf
        del obj

    def get_psf(self):
        """Return the most recently computed PSF (or None)."""
        return self.psf
+
+
class Illuminator:
    """Virtual light source exposing a single, lock-protected intensity value."""

    def __init__(self, parent):
        self._parent = parent
        self.intensity = 0
        self.lock = threading.Lock()

    def set_intensity(self, channel=1, intensity=0):
        """Store the new intensity (the channel argument is accepted but unused)."""
        with self.lock:
            self.intensity = intensity

    def get_intensity(self, channel):
        """Return the current intensity (the channel argument is unused)."""
        with self.lock:
            return self.intensity
+
+
class Objective:
    """Placeholder for virtual objective-lens state and control."""

    def __init__(self, parent):
        # Keep a handle to the owning VirtualMicroscopy instance.
        self._parent = parent
+
+
class VirtualMicroscopy:
    """Facade bundling the simulated camera, stage, illuminator and objective."""

    def __init__(self, filePath="path_to_image.jpeg"):
        # Each sub-device receives `self` so it can query its siblings
        # (e.g. the camera reads the positioner's position per frame).
        self.camera = Camera(self, filePath)
        self.positioner = Positioner(self)
        self.illuminator = Illuminator(self)
        self.objective = Objective(self)

    def stop(self):
        # Nothing to tear down: the simulation starts no background threads.
        pass
+
+
@njit(parallel=True)
def FromLoc2Image_MultiThreaded(
    xc_array: np.ndarray, yc_array: np.ndarray, photon_array: np.ndarray,
    sigma_array: np.ndarray, image_height: int, image_width: int, pixel_size: float
):
    """Render an image from emitter localizations, parallelized over pixels.

    Each pixel integrates the Gaussian PSF of every emitter via error
    functions; emitters with non-positive photons/sigma or farther than
    4 sigma from the pixel are skipped.
    """
    Image = np.zeros((image_height, image_width))
    for ij in prange(image_height * image_width):
        # Recover 2D pixel coordinates (j = row, i = column) from the flat index.
        j = int(ij / image_width)
        i = ij - j * image_width
        for xc, yc, photon, sigma in zip(xc_array, yc_array, photon_array, sigma_array):
            if (photon > 0) and (sigma > 0):
                S = sigma * math.sqrt(2)
                x = i * pixel_size - xc
                y = j * pixel_size - yc
                # Only evaluate emitters within ~4 sigma of this pixel.
                if (x + pixel_size / 2) ** 2 + (y + pixel_size / 2) ** 2 < 16 * sigma**2:
                    ErfX = math.erf((x + pixel_size) / S) - math.erf(x / S)
                    ErfY = math.erf((y + pixel_size) / S) - math.erf(y / S)
                    Image[j][i] += 0.25 * photon * ErfX * ErfY
    return Image
+
+
def binary2locs(img: np.ndarray, density: float):
    """Randomly sample a fraction *density* of the pixels equal to 1 in *img*.

    Returns a ``(rows, cols)`` pair of index arrays for the sampled pixels.
    """
    rows, cols = np.nonzero(img == 1)
    n_points = int(len(rows) * density)
    chosen = np.random.choice(len(rows), n_points, replace=False)
    return rows[chosen], cols[chosen]
+
+
def createBranchingTree(width=5000, height=5000, lineWidth=3):
    """Draw a reproducible random branching "blood vessel" tree.

    Parameters
    ----------
    width, height : int
        Size of the generated image in pixels.
    lineWidth : int
        Side of the square kernel used to thicken the 1-px vessel skeleton.

    Returns
    -------
    np.ndarray
        The convolved vessel image (dark vessels on a bright background).
    """
    np.random.seed(0)  # fixed seed so the generated sample is reproducible
    image = np.ones((height, width), dtype=np.uint8) * 255

    def draw_vessel(start, end, image):
        # Rasterize one branch segment; branches may run off the canvas, in
        # which case the out-of-range segment is simply ignored.
        rr, cc = line(start[0], start[1], end[0], end[1])
        try:
            image[rr, cc] = 0
        except IndexError:
            return

    def draw_tree(start, angle, length, depth, image, reducer, max_angle=40):
        # Recursively draw two child branches per node, shrinking the length
        # by `reducer` and jittering the branch angles at every level.
        if depth == 0:
            return
        end = (int(start[0] + length * np.sin(np.radians(angle))),
               int(start[1] + length * np.cos(np.radians(angle))))
        draw_vessel(start, end, image)
        angle += np.random.uniform(-10, 10)
        new_length = length * reducer
        new_depth = depth - 1
        draw_tree(end, angle - max_angle * np.random.uniform(-1, 1), new_length, new_depth, image, reducer)
        draw_tree(end, angle + max_angle * np.random.uniform(-1, 1), new_length, new_depth, image, reducer)

    start_point = (height - 1, width // 2)
    initial_angle = -90  # grow upwards from the bottom edge
    initial_length = np.max((width, height)) * 0.15
    depth = 7  # number of branching levels
    reducer = 0.9
    draw_tree(start_point, initial_angle, initial_length, depth, image, reducer)
    # Thicken the skeleton by convolving with a lineWidth x lineWidth box.
    rectangle = np.ones((lineWidth, lineWidth))
    from scipy.signal import convolve2d
    image = convolve2d(image, rectangle, mode="same", boundary="fill", fillvalue=0)
    return image
+
+
if __name__ == "__main__":
    # Demo driver: build the virtual microscope with the SMLM phantom,
    # switch objectives, then save five frames at different stage positions.
    imagePath = "smlm"
    microscope = VirtualMicroscopy(filePath=imagePath)
    vmManager = VirtualMicroscopeManager(rs232Info=type("RS232", (), {"managerProperties": {"imagePath": "smlm"}})(), name="VirtualScope")
    microscope.illuminator.set_intensity(intensity=1000)

    # Toggle objective to simulate switching and doubling magnification via binning
    vmManager.toggleObjective()
    for i in range(5):
        microscope.positioner.move(
            x=1400 + i * (-200), y=-800 + i * (-10), z=0, is_absolute=True
        )
        frame = microscope.camera.getLast()
        plt.imsave(f"frame_{i}.png", frame)
    cv2.destroyAllWindows()

    # NOTE(review): the defs below look like Illuminator methods pasted at the
    # wrong scope (they reference `self` but belong to no class) — presumably
    # leftover from a merge of two file versions; confirm and remove.
    def __init__(self, parent):
        self._parent = parent
        self.intensity = 0
        self.lock = threading.Lock()

    def set_intensity(self, channel=1, intensity=0):
        with self.lock:
            self.intensity = intensity

    def get_intensity(self, channel):
        with self.lock:
            return self.intensity
+
class Objective:
    """Placeholder for the virtual objective lens; state lives in the manager."""

    def __init__(self, parent):
        # Back-reference to the owning VirtualMicroscopy instance.
        self._parent = parent
+
+
class VirtualMicroscopy:
    """Container wiring together camera, positioner, illuminator and objective."""

    def __init__(self, filePath="path_to_image.jpeg"):
        # Each sub-device receives `self` so it can reach its siblings.
        self.camera = Camera(self, filePath)
        self.positioner = Positioner(self)
        self.illuminator = Illuminator(self)
        self.objective = Objective(self)

    def stop(self):
        # Nothing to tear down: the simulation starts no background threads.
        pass
+
+
@njit(parallel=True)
def FromLoc2Image_MultiThreaded(
    xc_array: np.ndarray, yc_array: np.ndarray, photon_array: np.ndarray, sigma_array: np.ndarray, image_height: int, image_width: int, pixel_size: float
):
    """
    Generate an image from localized emitters using multi-threading.

    Parameters
    ----------
    xc_array : array_like
        Array of x-coordinates of the emitters.
    yc_array : array_like
        Array of y-coordinates of the emitters.
    photon_array : array_like
        Array of photon counts for each emitter.
    sigma_array : array_like
        Array of standard deviations (sigmas) for each emitter.
    image_height : int
        Height of the output image in pixels.
    image_width : int
        Width of the output image in pixels.
    pixel_size : float
        Size of each pixel in the image.

    Returns
    -------
    Image : ndarray
        2D array representing the generated image.

    Notes
    -----
    The function utilizes multi-threading for parallel processing using Numba's
    `njit` decorator with `parallel=True`. Emitters with non-positive photon
    counts or non-positive sigma values are ignored. Only emitters within a
    distance of 4 sigma from the center of the pixel are considered to save
    computation time.

    The calculation involves error functions (`erf`) to determine the contribution
    of each emitter to the pixel intensity.

    Originally from: https://colab.research.google.com/github/HenriquesLab/ZeroCostDL4Mic/blob/master/Colab_notebooks/Deep-STORM_2D_ZeroCostDL4Mic.ipynb
    """
    Image = np.zeros((image_height, image_width))
    for ij in prange(image_height * image_width):
        # Recover 2D pixel coordinates (j = row, i = column) from the flat index.
        j = int(ij / image_width)
        i = ij - j * image_width
        for xc, yc, photon, sigma in zip(xc_array, yc_array, photon_array, sigma_array):
            # Don't bother if the emitter has photons <= 0 or if Sigma <= 0
            if (photon > 0) and (sigma > 0):
                S = sigma * math.sqrt(2)
                x = i * pixel_size - xc
                y = j * pixel_size - yc
                # Don't bother if the emitter is further than 4 sigma from the centre of the pixel
                if (x + pixel_size / 2) ** 2 + (
                    y + pixel_size / 2
                ) ** 2 < 16 * sigma**2:
                    # Integrate the Gaussian over the pixel via error functions.
                    ErfX = math.erf((x + pixel_size) / S) - math.erf(x / S)
                    ErfY = math.erf((y + pixel_size) / S) - math.erf(y / S)
                    Image[j][i] += 0.25 * photon * ErfX * ErfY
    return Image
+
+
def binary2locs(img: np.ndarray, density: float):
    """
    Selects a subset of locations from a binary image based on a specified density.

    Parameters
    ----------
    img : np.ndarray
        2D binary image array where 1s indicate points of interest.
    density : float
        Proportion of points to randomly select from the points of interest.
        Should be a value between 0 and 1.

    Returns
    -------
    filtered_locs : tuple of np.ndarray
        Tuple containing two arrays. The first array contains the row indices
        and the second array contains the column indices of the selected points.

    Raises
    ------
    ValueError
        If density > 1: `np.random.choice` cannot draw more samples than
        available points without replacement.

    Notes
    -----
    The function identifies all locations in the binary image where the value is 1.
    It then randomly selects a subset of these locations based on the specified
    density and returns their coordinates.
    """
    all_locs = np.nonzero(img == 1)
    n_points = int(len(all_locs[0]) * density)
    # Sample without replacement so no location is returned twice.
    selected_idx = np.random.choice(len(all_locs[0]), n_points, replace=False)
    filtered_locs = all_locs[0][selected_idx], all_locs[1][selected_idx]
    return filtered_locs
+
+
def createBranchingTree(width=5000, height=5000, lineWidth=3):
    """Draw a reproducible random branching vessel tree.

    Parameters
    ----------
    width, height : int
        Size of the generated image in pixels. (A previous version ignored
        these arguments and hard-coded 5000x5000; they are honored now.)
    lineWidth : int
        Side of the square kernel used to thicken the vessel skeleton.

    Returns
    -------
    np.ndarray
        The convolved vessel image (dark vessels on a bright background).
    """
    np.random.seed(0)  # Set a random seed for reproducibility

    # Create a blank white image
    image = np.ones((height, width), dtype=np.uint8) * 255

    # Function to draw a line (blood vessel) on the image
    def draw_vessel(start, end, image):
        rr, cc = line(start[0], start[1], end[0], end[1])
        try:
            image[rr, cc] = 0  # Draw a black line
        except IndexError:
            # Branch ran off the canvas; skip the out-of-range segment.
            return

    # Recursive function to draw a tree-like structure
    def draw_tree(start, angle, length, depth, image, reducer, max_angle=40):
        if depth == 0:
            return

        # Calculate the end point of the branch
        end = (
            int(start[0] + length * np.sin(np.radians(angle))),
            int(start[1] + length * np.cos(np.radians(angle))),
        )

        # Draw the branch
        draw_vessel(start, end, image)

        # change the angle slightly to add some randomness
        angle += np.random.uniform(-10, 10)

        # Recursively draw the next level of branches
        new_length = length * reducer  # Reduce the length for the next level
        new_depth = depth - 1
        draw_tree(
            end,
            angle - max_angle * np.random.uniform(-1, 1),
            new_length,
            new_depth,
            image,
            reducer,
        )
        draw_tree(
            end,
            angle + max_angle * np.random.uniform(-1, 1),
            new_length,
            new_depth,
            image,
            reducer,
        )

    # Starting point and parameters
    start_point = (height - 1, width // 2)
    initial_angle = -90  # Start by pointing upwards
    initial_length = np.max((width, height)) * 0.15  # Length of the first branch
    depth = 7  # Number of branching levels
    reducer = 0.9
    # Draw the tree structure
    draw_tree(start_point, initial_angle, initial_length, depth, image, reducer)

    # convolve image with rectangle to thicken the 1-px skeleton
    rectangle = np.ones((lineWidth, lineWidth))
    image = convolve2d(image, rectangle, mode="same", boundary="fill", fillvalue=0)

    return image
+
+
if __name__ == "__main__":
    # Demo driver: build the virtual microscope with the SMLM phantom and
    # save five frames at different stage positions.

    # Read the image locally
    # mFWD = os.path.dirname(os.path.realpath(__file__)).split("imswitch")[0]
    # imagePath = mFWD + "imswitch/_data/images/histoASHLARStitch.jpg"
    imagePath = "smlm"
    microscope = VirtualMicroscopy(filePath=imagePath)
    microscope.illuminator.set_intensity(intensity=1000)

    for i in range(5):
        # Step the stage diagonally and capture a frame at each position.
        microscope.positioner.move(
            x=1400 + i * (-200), y=-800 + i * (-10), z=0, is_absolute=True
        )
        frame = microscope.camera.getLast()
        plt.imsave(f"frame_{i}.png", frame)
    cv2.destroyAllWindows()
+
+# Copyright (C) 2020-2024 ImSwitch developers
+# This file is part of ImSwitch.
+#
+# ImSwitch is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# ImSwitch is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
diff --git a/imswitch/imcontrol/model/managers/rs232/VirtualSLM.py b/imswitch/imcontrol/model/managers/rs232/VirtualSLM.py
new file mode 100644
index 000000000..b17f0ed00
--- /dev/null
+++ b/imswitch/imcontrol/model/managers/rs232/VirtualSLM.py
@@ -0,0 +1,382 @@
+"""
+Virtual SLM (Spatial Light Modulator) simulation for ImSwitch Virtual Microscope
+
+This module provides simulation capabilities for SLM-based microscopy techniques
+such as structured illumination, beam shaping, and holographic manipulation.
+"""
+
+import numpy as np
+import threading
+from typing import Dict, List, Optional, Tuple, Union
+from imswitch.imcommon.model import initLogger
+
+
class VirtualSLM:
    """
    Virtual Spatial Light Modulator for advanced microscopy simulation

    Supports various SLM patterns and operations for educational and research purposes:
    - Structured illumination patterns (sinusoidal, square wave)
    - Beam shaping (donut, top-hat, Gaussian)
    - Holographic patterns for optical trapping
    - Aberration correction via Zernike polynomials
    - Custom pattern upload and display
    """

    def __init__(self, parent, width=1920, height=1152):
        """
        Initialize Virtual SLM

        Parameters:
        -----------
        parent : VirtualMicroscopy
            Parent virtual microscopy instance
        width : int
            SLM width in pixels (default: 1920 for typical SLM)
        height : int
            SLM height in pixels (default: 1152 for typical SLM)
        """
        self._parent = parent
        self._logger = initLogger(self, tryInheritParent=True)

        self.width = width
        self.height = height
        # Serializes all pattern reads/writes so the SLM can be driven
        # from multiple controller threads.
        self.lock = threading.Lock()

        # Current SLM state
        self._current_pattern = np.zeros((height, width), dtype=np.uint8)
        self._is_active = False
        self._pattern_type = "blank"

        # Pattern parameters shared by all built-in generators
        self._pattern_params = {
            "frequency": 10,   # lines per mm
            "phase": 0,        # phase offset in radians
            "amplitude": 255,  # pattern amplitude (0-255)
            "angle": 0,        # pattern rotation angle in degrees
            "center_x": width // 2,
            "center_y": height // 2
        }

        # Aberration correction parameters (Zernike coefficients)
        self._zernike_coeffs = {
            "tip": 0.0,       # Z2 - tip
            "tilt": 0.0,      # Z3 - tilt
            "defocus": 0.0,   # Z4 - defocus
            "astig_0": 0.0,   # Z5 - astigmatism 0°
            "astig_45": 0.0,  # Z6 - astigmatism 45°
            "coma_x": 0.0,    # Z7 - coma x
            "coma_y": 0.0,    # Z8 - coma y
            "spherical": 0.0  # Z9 - spherical aberration
        }

        self._logger.info(f"Initialized Virtual SLM ({width}x{height} pixels)")

    def set_pattern(self, pattern_type: str, **kwargs) -> bool:
        """
        Set SLM pattern type and parameters

        Parameters:
        -----------
        pattern_type : str
            Pattern type: 'blank', 'sinusoidal', 'square', 'donut', 'tophat', 'gaussian', 'custom'
        **kwargs : dict
            Pattern-specific parameters (only keys already present in the
            parameter dict are accepted; unknown keys are silently ignored)

        Returns:
        --------
        bool : Success status
        """
        with self.lock:
            try:
                # Update only recognized pattern parameters
                for key, value in kwargs.items():
                    if key in self._pattern_params:
                        self._pattern_params[key] = value

                self._pattern_type = pattern_type
                self._current_pattern = self._generate_pattern(pattern_type)

                self._logger.info(f"Set SLM pattern: {pattern_type} with params {kwargs}")
                return True

            except Exception as e:
                self._logger.error(f"Failed to set SLM pattern: {e}")
                return False

    def _generate_pattern(self, pattern_type: str) -> np.ndarray:
        """Generate the specified pattern; unknown types fall back to blank."""

        if pattern_type == "blank":
            return np.zeros((self.height, self.width), dtype=np.uint8)

        elif pattern_type == "sinusoidal":
            return self._generate_sinusoidal()

        elif pattern_type == "square":
            return self._generate_square_wave()

        elif pattern_type == "donut":
            return self._generate_donut()

        elif pattern_type == "tophat":
            return self._generate_tophat()

        elif pattern_type == "gaussian":
            return self._generate_gaussian()

        elif pattern_type == "custom":
            # 'custom' refers to a previously uploaded pattern; re-generating
            # would silently blank it, so keep the current buffer instead.
            return self._current_pattern

        else:
            self._logger.warning(f"Unknown pattern type: {pattern_type}, using blank")
            return np.zeros((self.height, self.width), dtype=np.uint8)

    def _generate_sinusoidal(self) -> np.ndarray:
        """Generate sinusoidal grating pattern for structured illumination"""
        y, x = np.ogrid[:self.height, :self.width]

        # Create coordinate system centered at pattern center
        x_centered = x - self._pattern_params["center_x"]
        y_centered = y - self._pattern_params["center_y"]

        # Apply rotation so the grating lines can be oriented at any angle
        angle = np.radians(self._pattern_params["angle"])
        x_rot = x_centered * np.cos(angle) - y_centered * np.sin(angle)

        # Generate sinusoidal pattern around mid-gray (127)
        frequency = self._pattern_params["frequency"] * 2 * np.pi / self.width
        phase = self._pattern_params["phase"]
        amplitude = self._pattern_params["amplitude"]

        pattern = 127 + (amplitude / 2) * np.sin(frequency * x_rot + phase)
        return np.clip(pattern, 0, 255).astype(np.uint8)

    def _generate_square_wave(self) -> np.ndarray:
        """Generate square wave (binary grating) pattern"""
        y, x = np.ogrid[:self.height, :self.width]

        x_centered = x - self._pattern_params["center_x"]
        y_centered = y - self._pattern_params["center_y"]

        angle = np.radians(self._pattern_params["angle"])
        x_rot = x_centered * np.cos(angle) - y_centered * np.sin(angle)

        frequency = self._pattern_params["frequency"] * 2 * np.pi / self.width
        phase = self._pattern_params["phase"]
        amplitude = self._pattern_params["amplitude"]

        # sign() binarizes the sinusoid into a square wave around mid-gray
        pattern = 127 + (amplitude / 2) * np.sign(np.sin(frequency * x_rot + phase))
        return np.clip(pattern, 0, 255).astype(np.uint8)

    def _generate_donut(self) -> np.ndarray:
        """Generate donut pattern for STED-like applications"""
        y, x = np.ogrid[:self.height, :self.width]

        center_x = self._pattern_params["center_x"]
        center_y = self._pattern_params["center_y"]

        # Calculate distance from center
        r = np.sqrt((x - center_x)**2 + (y - center_y)**2)

        # Generate donut profile (zero at center, peaks at the normalized radius)
        r_norm = r / (min(self.width, self.height) / 4)  # Normalize radius
        donut = np.sin(np.pi * r_norm)**2  # Donut shape

        pattern = self._pattern_params["amplitude"] * donut
        return np.clip(pattern, 0, 255).astype(np.uint8)

    def _generate_tophat(self) -> np.ndarray:
        """Generate top-hat (flat circular) pattern"""
        y, x = np.ogrid[:self.height, :self.width]

        center_x = self._pattern_params["center_x"]
        center_y = self._pattern_params["center_y"]

        r = np.sqrt((x - center_x)**2 + (y - center_y)**2)
        radius = min(self.width, self.height) / 6

        pattern = np.where(r <= radius, self._pattern_params["amplitude"], 0)
        # Clip before the uint8 cast so amplitudes > 255 saturate instead of
        # wrapping around (unclipped astype would overflow).
        return np.clip(pattern, 0, 255).astype(np.uint8)

    def _generate_gaussian(self) -> np.ndarray:
        """Generate Gaussian beam pattern"""
        y, x = np.ogrid[:self.height, :self.width]

        center_x = self._pattern_params["center_x"]
        center_y = self._pattern_params["center_y"]

        sigma = min(self.width, self.height) / 8  # Beam waist

        r_sq = (x - center_x)**2 + (y - center_y)**2
        gaussian = np.exp(-r_sq / (2 * sigma**2))

        pattern = self._pattern_params["amplitude"] * gaussian
        return np.clip(pattern, 0, 255).astype(np.uint8)

    def apply_aberration_correction(self, **zernike_coeffs) -> bool:
        """
        Apply aberration correction using Zernike polynomials

        Parameters:
        -----------
        **zernike_coeffs : dict
            Zernike coefficients for aberration correction (only modes already
            present in the coefficient dict are accepted)

        Returns:
        --------
        bool : Success status
        """
        with self.lock:
            try:
                # Update Zernike coefficients
                for mode, coeff in zernike_coeffs.items():
                    if mode in self._zernike_coeffs:
                        self._zernike_coeffs[mode] = coeff

                # Additively apply the correction to the displayed pattern
                correction = self._generate_zernike_correction()
                self._current_pattern = np.clip(
                    self._current_pattern.astype(np.float32) + correction, 0, 255
                ).astype(np.uint8)

                self._logger.info(f"Applied aberration correction: {zernike_coeffs}")
                return True

            except Exception as e:
                self._logger.error(f"Failed to apply aberration correction: {e}")
                return False

    def _generate_zernike_correction(self) -> np.ndarray:
        """Generate Zernike aberration correction pattern (Noll-style modes Z2-Z9)."""
        y, x = np.ogrid[:self.height, :self.width]

        # Normalize coordinates to the unit circle
        x_norm = (2 * x / self.width) - 1
        y_norm = (2 * y / self.height) - 1

        # Calculate polar coordinates
        rho = np.sqrt(x_norm**2 + y_norm**2)
        phi = np.arctan2(y_norm, x_norm)

        # Zernike polynomials are only defined on the unit disk
        mask = rho <= 1

        # Initialize correction pattern
        correction = np.zeros((self.height, self.width))

        # Add Zernike modes (simplified, unnormalized radial polynomials)
        if self._zernike_coeffs["tip"] != 0:
            correction += self._zernike_coeffs["tip"] * rho * np.cos(phi) * mask

        if self._zernike_coeffs["tilt"] != 0:
            correction += self._zernike_coeffs["tilt"] * rho * np.sin(phi) * mask

        if self._zernike_coeffs["defocus"] != 0:
            correction += self._zernike_coeffs["defocus"] * (2 * rho**2 - 1) * mask

        if self._zernike_coeffs["astig_0"] != 0:
            correction += self._zernike_coeffs["astig_0"] * rho**2 * np.cos(2 * phi) * mask

        if self._zernike_coeffs["astig_45"] != 0:
            correction += self._zernike_coeffs["astig_45"] * rho**2 * np.sin(2 * phi) * mask

        # Coma and spherical were accepted via apply_aberration_correction but
        # previously ignored; include their standard radial polynomials.
        if self._zernike_coeffs["coma_x"] != 0:
            correction += self._zernike_coeffs["coma_x"] * (3 * rho**3 - 2 * rho) * np.cos(phi) * mask

        if self._zernike_coeffs["coma_y"] != 0:
            correction += self._zernike_coeffs["coma_y"] * (3 * rho**3 - 2 * rho) * np.sin(phi) * mask

        if self._zernike_coeffs["spherical"] != 0:
            correction += self._zernike_coeffs["spherical"] * (6 * rho**4 - 6 * rho**2 + 1) * mask

        # Scale correction to appropriate range
        return 20 * correction  # Adjust scaling as needed

    def upload_custom_pattern(self, pattern: np.ndarray) -> bool:
        """
        Upload custom pattern to SLM

        Parameters:
        -----------
        pattern : np.ndarray
            Custom pattern array (will be resized if needed)

        Returns:
        --------
        bool : Success status
        """
        with self.lock:
            try:
                # Resize pattern if needed
                if pattern.shape != (self.height, self.width):
                    from scipy.ndimage import zoom
                    scale_y = self.height / pattern.shape[0]
                    scale_x = self.width / pattern.shape[1]
                    pattern = zoom(pattern, (scale_y, scale_x), order=1)
                    # zoom rounds the output size, which can leave the result
                    # one pixel off the SLM dimensions; trim/pad to exact size.
                    pattern = pattern[:self.height, :self.width]
                    if pattern.shape != (self.height, self.width):
                        pad_y = self.height - pattern.shape[0]
                        pad_x = self.width - pattern.shape[1]
                        pattern = np.pad(pattern, ((0, pad_y), (0, pad_x)), mode="edge")

                # Ensure proper data type and range
                pattern = np.clip(pattern, 0, 255).astype(np.uint8)

                self._current_pattern = pattern
                self._pattern_type = "custom"

                self._logger.info(f"Uploaded custom pattern: {pattern.shape}")
                return True

            except Exception as e:
                self._logger.error(f"Failed to upload custom pattern: {e}")
                return False

    def get_pattern(self) -> np.ndarray:
        """Get a copy of the current SLM pattern"""
        with self.lock:
            return self._current_pattern.copy()

    def set_active(self, active: bool):
        """Set SLM active state"""
        with self.lock:
            self._is_active = active
            self._logger.info(f"SLM {'activated' if active else 'deactivated'}")

    def is_active(self) -> bool:
        """Check if SLM is active"""
        with self.lock:
            return self._is_active

    def get_status(self) -> Dict:
        """Get comprehensive SLM status (copies of mutable state)"""
        with self.lock:
            return {
                "active": self._is_active,
                "pattern_type": self._pattern_type,
                "pattern_params": self._pattern_params.copy(),
                "zernike_coeffs": self._zernike_coeffs.copy(),
                "dimensions": (self.height, self.width)
            }

    def reset(self):
        """Reset SLM to blank state and restore default parameters"""
        with self.lock:
            self._current_pattern = np.zeros((self.height, self.width), dtype=np.uint8)
            self._pattern_type = "blank"
            self._is_active = False

            # Reset parameters to defaults
            self._pattern_params = {
                "frequency": 10,
                "phase": 0,
                "amplitude": 255,
                "angle": 0,
                "center_x": self.width // 2,
                "center_y": self.height // 2
            }

            self._zernike_coeffs = {key: 0.0 for key in self._zernike_coeffs}

            self._logger.info("SLM reset to blank state")
+
+
+# Copyright (C) 2020-2024 ImSwitch developers
+# This file is part of ImSwitch.
+#
+# ImSwitch is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# ImSwitch is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
\ No newline at end of file