From 2817f445151c8dc324b56b055978712686cee1a0 Mon Sep 17 00:00:00 2001 From: doug tilley Date: Wed, 10 Dec 2025 22:20:26 +0000 Subject: [PATCH] feat: Add Newton physics backend with XPBD solver support Complete Newton physics backend implementation for the ARK simulator framework. This adds Newton as a fourth backend option alongside PyBullet, MuJoCo, and Genesis. The Newton backend provides GPU-accelerated physics simulation using Newton's XPBD (Extended Position-Based Dynamics) solver with full integration into ARK's existing architecture. Core Framework Changes: - simulator_node.py: Added Newton backend option to backend selection logic. When backend_type: "newton" is specified, the simulator imports and instantiates NewtonBackend. - subscriber.py: Minor enhancement to subscriber handling for improved message processing. Newton Backend Implementation: - newton_backend.py: Main backend class implementing SimulatorBackend ABC. Handles initialization, physics stepping, collision detection, and component lifecycle management. Configurable via YAML with gravity, substeps, solver iterations, and joint control parameters. - newton_builder.py: Wrapper around Newton's ModelBuilder with comprehensive defensive programming. Provides safe URDF loading, joint configuration defaults, and scene metadata tracking. - newton_robot_driver.py: Robot driver implementing RobotDriver interface. Supports TARGET_POSITION, TARGET_VELOCITY, and FORCE joint control modes. Publishes joint_state_t and subscribes to joint_group_command_t for arm/gripper control. - newton_camera_driver.py: Camera sensor driver for RGB and depth capture using Newton's viewer for GPU-accelerated rendering. - newton_lidar_driver.py: LiDAR sensor driver using Newton's ray-casting capabilities with configurable scan parameters. - newton_multibody.py: Rigid body management for non-articulated objects (cubes, spheres, etc.). - newton_viewer.py: Viewer manager supporting GUI mode (OpenGL) and headless mode for render loop and state visualization. - geometry_descriptors.py: Solver-agnostic geometry representation for shape descriptors adaptable to different Newton solvers. Solver Adapters: - scene_adapters/base_adapter.py: Abstract base class defining the adapter interface for solver-specific scene building. - scene_adapters/xpbd_adapter.py: XPBD solver adapter using Newton's native add_ground_plane() and standard contact handling. - scene_adapters/mujoco_adapter.py: MuJoCo solver adapter with workaround for ground plane limitations using box geometry. The Newton backend is 100% Newton-native - it uses only Newton APIs for physics, kinematics, and collision detection. --- ark/client/comm_handler/subscriber.py | 13 + ark/system/newton/geometry_descriptors.py | 175 +++ ark/system/newton/newton_backend.py | 559 ++++++++ ark/system/newton/newton_builder.py | 1161 +++++++++++++++++ ark/system/newton/newton_camera_driver.py | 342 +++++ ark/system/newton/newton_lidar_driver.py | 262 ++++ ark/system/newton/newton_multibody.py | 212 +++ ark/system/newton/newton_robot_driver.py | 516 ++++++++ ark/system/newton/newton_viewer.py | 206 +++ ark/system/newton/scene_adapters/__init__.py | 27 + .../newton/scene_adapters/base_adapter.py | 163 +++ .../newton/scene_adapters/mujoco_adapter.py | 180 +++ .../newton/scene_adapters/xpbd_adapter.py | 157 +++ ark/system/simulation/simulator_node.py | 97 +- 14 files changed, 4027 insertions(+), 43 deletions(-) create mode 100644 ark/system/newton/geometry_descriptors.py create mode 100644 ark/system/newton/newton_backend.py create mode 100644 ark/system/newton/newton_builder.py create mode 100644 ark/system/newton/newton_camera_driver.py create mode 100644 ark/system/newton/newton_lidar_driver.py create mode 100644 ark/system/newton/newton_multibody.py create mode 100644 ark/system/newton/newton_robot_driver.py create mode 100644 ark/system/newton/newton_viewer.py create mode 100644 ark/system/newton/scene_adapters/__init__.py create mode 100644 ark/system/newton/scene_adapters/base_adapter.py create mode 100644 ark/system/newton/scene_adapters/mujoco_adapter.py create mode 100644 ark/system/newton/scene_adapters/xpbd_adapter.py diff --git a/ark/client/comm_handler/subscriber.py b/ark/client/comm_handler/subscriber.py index 1021741..69ae9cb 100644 --- a/ark/client/comm_handler/subscriber.py +++ b/ark/client/comm_handler/subscriber.py @@ -65,6 +65,19 @@ def subscribe(self): @return: ``None`` """ self._sub = self._lcm.subscribe(self.channel_name, self.subscriber_callback) + + if self._sub is None: + log.error( + f"Failed to subscribe to channel '{self.channel_name}'. " + f"LCM subscription returned None - this usually indicates:\n" + f" - LCM networking issue (check firewall, routing, ttl settings)\n" + f" - Invalid channel name\n" + f" - LCM not properly initialized\n" + f"Current LCM URL: {getattr(self._lcm, '_url', 'unknown')}" + ) + self.active = False + return + self._sub.set_queue_capacity(1) # TODO: configurable log.ok(f"subscribed to {self}") self.active = True diff --git a/ark/system/newton/geometry_descriptors.py b/ark/system/newton/geometry_descriptors.py new file mode 100644 index 0000000..100e095 --- /dev/null +++ b/ark/system/newton/geometry_descriptors.py @@ -0,0 +1,175 @@ +"""Solver-agnostic geometry descriptions for scene building. + +This module provides unified data structures for describing scene geometry +in a way that can be adapted to any Newton physics solver (XPBD, MuJoCo, etc.). + +The key insight is that users should describe geometry semantically (e.g., "ground plane") +without worrying about solver-specific implementation details. Adapters then translate +these descriptions to solver-compatible representations. + +Example: + >>> descriptor = GeometryDescriptor.from_ground_plane_config({ + ... "friction": 0.8, + ... "restitution": 0.0, + ... "thickness": 0.02 + ... }) + >>> # Adapter decides whether to use native plane or box geometry +""" + +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Dict + + +class GeometryType(Enum): + """Solver-agnostic geometry types. + + These represent semantic geometry types that may have different + implementations depending on the physics solver being used. + """ + + INFINITE_PLANE = "infinite_plane" # Ground plane, collision floor + BOX = "box" # Rectangular prism + SPHERE = "sphere" # Sphere + CAPSULE = "capsule" # Capsule (cylinder with hemispherical ends) + CYLINDER = "cylinder" # Cylinder + MESH = "mesh" # Triangle mesh + SDF = "sdf" # Signed distance field + URDF = "urdf" # URDF-defined articulation + + +@dataclass +class GeometryDescriptor: + """Unified geometry description that can be adapted to any solver. + + This class encapsulates all information needed to create geometry in a + solver-agnostic way. Adapters translate these descriptions to solver-specific + API calls. + + Attributes: + geometry_type: Semantic type of geometry (plane, box, sphere, etc.) + parameters: Geometric parameters (size, radius, etc.) - type-specific + physics: Physical properties (friction, restitution, mass, etc.) + metadata: Optional metadata (name, tags, etc.) + + Example: + Creating a ground plane descriptor: + + >>> ground = GeometryDescriptor( + ... geometry_type=GeometryType.INFINITE_PLANE, + ... parameters={"thickness": 0.02}, # For box fallback + ... physics={"friction": 0.8, "restitution": 0.0} + ... ) + """ + + geometry_type: GeometryType + parameters: Dict[str, Any] = field(default_factory=dict) + physics: Dict[str, Any] = field(default_factory=dict) + metadata: Dict[str, Any] = field(default_factory=dict) + + @classmethod + def from_ground_plane_config(cls, cfg: Dict[str, Any]) -> "GeometryDescriptor": + """Create ground plane descriptor from ARK config format. + + This factory method handles the standard ARK ground_plane configuration + format and converts it to a unified descriptor. + + Args: + cfg: Ground plane configuration dict, typically from YAML: + { + "enabled": true, + "friction": 0.8, + "restitution": 0.0, + "thickness": 0.02 # For box fallback if solver needs it + } + + Returns: + GeometryDescriptor representing an infinite ground plane + + Example: + >>> cfg = {"friction": 0.8, "restitution": 0.0, "thickness": 0.02} + >>> descriptor = GeometryDescriptor.from_ground_plane_config(cfg) + >>> descriptor.geometry_type + + """ + return cls( + geometry_type=GeometryType.INFINITE_PLANE, + parameters={ + "size": cfg.get("size", 100.0), # Half-extent for box fallback + "thickness": cfg.get("thickness", 0.02), # Half-height for box fallback + }, + physics={ + "friction": cfg.get("friction", 0.8), + "restitution": cfg.get("restitution", 0.0), + "density": 0.0, # Static body (infinite mass) + }, + metadata={ + "name": "ground", + "semantic_type": "ground_plane", + } + ) + + @classmethod + def from_primitive_config(cls, cfg: Dict[str, Any]) -> "GeometryDescriptor": + """Create primitive shape descriptor from ARK object config. + + This factory method handles primitive object configurations (boxes, spheres, etc.) + and converts them to unified descriptors. + + Args: + cfg: Primitive configuration dict: + { + "shape": "box", # or "sphere", "capsule", etc. + "size": [0.1, 0.1, 0.1], # Full extents for box + "radius": 0.05, # For sphere/capsule + "height": 0.2, # For cylinder/capsule + "mass": 0.1, + "friction": 0.8, + "restitution": 0.0 + } + + Returns: + GeometryDescriptor representing the primitive shape + + Example: + >>> cfg = {"shape": "box", "size": [0.1, 0.1, 0.1], "mass": 0.5} + >>> descriptor = GeometryDescriptor.from_primitive_config(cfg) + >>> descriptor.geometry_type + + """ + shape = cfg.get("shape", "box").lower() + + # Map shape string to GeometryType + shape_map = { + "box": GeometryType.BOX, + "sphere": GeometryType.SPHERE, + "capsule": GeometryType.CAPSULE, + "cylinder": GeometryType.CYLINDER, + } + + geometry_type = shape_map.get(shape, GeometryType.BOX) + + # Extract shape-specific parameters + parameters = {} + if geometry_type == GeometryType.BOX: + size = cfg.get("size", [1.0, 1.0, 1.0]) + # Convert full extents to half-extents for Newton API + parameters["half_extents"] = [abs(float(s)) * 0.5 for s in size] + elif geometry_type == GeometryType.SPHERE: + parameters["radius"] = cfg.get("radius", 0.5) + elif geometry_type in (GeometryType.CAPSULE, GeometryType.CYLINDER): + parameters["radius"] = cfg.get("radius", 0.05) + parameters["half_height"] = cfg.get("height", 1.0) * 0.5 + + return cls( + geometry_type=geometry_type, + parameters=parameters, + physics={ + "mass": cfg.get("mass", 1.0), + "friction": cfg.get("friction", 0.8), + "restitution": cfg.get("restitution", 0.0), + }, + metadata={ + "name": cfg.get("name", "primitive"), + } + ) diff --git a/ark/system/newton/newton_backend.py b/ark/system/newton/newton_backend.py new file mode 100644 index 0000000..0864f69 --- /dev/null +++ b/ark/system/newton/newton_backend.py @@ -0,0 +1,559 @@ +"""Newton backend integration for the ARK simulator.""" + +from __future__ import annotations + +import ast +import importlib.util +import sys +from pathlib import Path +from typing import Any, Callable, Optional + +import newton +import numpy as np +import warp as wp + +from ark.tools.log import log +from ark.system.simulation.simulator_backend import SimulatorBackend +from ark.system.newton.newton_builder import NewtonBuilder +from ark.system.newton.newton_camera_driver import NewtonCameraDriver +from ark.system.newton.newton_lidar_driver import NewtonLiDARDriver +from ark.system.newton.newton_multibody import NewtonMultiBody +from ark.system.newton.newton_robot_driver import NewtonRobotDriver +from ark.system.newton.newton_viewer import NewtonViewerManager +from ark.client.frequencies.rate import Rate +from arktypes import * + + +def import_class_from_directory(path: Path) -> tuple[type[Any], Optional[type[Any]]]: + """Import a class (and optional driver enum) from ``path``. + The helper searches for ``.py`` inside ``path`` and imports the + class with the same name. If a ``Drivers`` class is present in the module + its ``NEWTON_DRIVER`` attribute is returned alongside the main class. + + @param path Path to the directory containing the module. + @return Tuple ``(cls, driver_cls)`` where ``driver_cls`` is ``None`` when no + driver is defined. + @rtype Tuple[type, Optional[type]] + + """ + + ## Extract the class name from the last part of the directory path (last directory name) + class_name = path.name + file_path = (path / f"{class_name}.py").resolve() ##just add the resolve here instead of newline + ## Defensive check for the filepath, raise error if not found + if not file_path.exists(): + raise FileNotFoundError(f"The file {file_path} does not exist.") + + with open(file_path, "r", encoding="utf-8") as handle: + tree = ast.parse(handle.read(), filename=str(file_path)) + + module_dir = str(file_path.parent) + sys.path.insert(0, module_dir) + ## Import the module dynamically and extract class names defensively + try: + class_names = [node.name for node in ast.walk(tree) if isinstance(node, ast.ClassDef)] + drivers_attr: Optional[type[Any]] = None + + spec = importlib.util.spec_from_file_location(class_name, file_path) + if spec is None or spec.loader is None: + raise ImportError(f"Could not create module spec for {file_path}") + module = importlib.util.module_from_spec(spec) + sys.modules[class_name] = module + spec.loader.exec_module(module) + + if "Drivers" in class_names: + drivers_cls = getattr(module, "Drivers", None) + drivers_attr = getattr(drivers_cls, "NEWTON_DRIVER", None) if drivers_cls else None + class_names.remove("Drivers") + + target_name = class_names[0] if class_names else class_name + target_cls = getattr(module, target_name) + finally: + sys.path.pop(0) + sys.modules.pop(class_name, None) + + return target_cls, drivers_attr + + +class NewtonBackend(SimulatorBackend): + """Simulation backend using the Newton physics engine.""" + + def _determine_up_axis(self, gravity: tuple[float, float, float]) -> newton.Axis: + """Determine up axis from gravity vector.""" + gx, gy, gz = gravity + components = np.array([gx, gy, gz], dtype=float) + if not np.any(components): + return newton.Axis.Z # Default to Z-up if no gravity + idx = int(np.argmax(np.abs(components))) + axis_map = {0: newton.Axis.X, 1: newton.Axis.Y, 2: newton.Axis.Z} + return axis_map[idx] + + def _extract_gravity_magnitude(self, gravity: tuple[float, float, float]) -> float: + """Extract gravity magnitude from gravity vector.""" + gx, gy, gz = gravity + components = np.array([gx, gy, gz], dtype=float) + if not np.any(components): + return -9.81 # Default gravity + idx = int(np.argmax(np.abs(components))) + return float(components[idx]) + + def _apply_joint_defaults(self, sim_cfg: dict[str, Any]) -> None: + """Apply Newton-specific joint defaults from configuration. + + This must be called BEFORE loading any robots so defaults apply to all URDFs. + """ + newton_cfg = sim_cfg.get("newton_physics", {}) + joint_cfg = newton_cfg.get("joint_defaults", {}) + + if not joint_cfg: + log.info("Newton backend: No joint_defaults in config, using Newton defaults") + return + + # Map string mode to Newton enum + mode_str = joint_cfg.get("mode", "TARGET_POSITION").upper() + mode_value = None + if mode_str == "TARGET_POSITION": + mode_value = newton.JointMode.TARGET_POSITION + elif mode_str == "TARGET_VELOCITY": + mode_value = newton.JointMode.TARGET_VELOCITY + elif mode_str == "FORCE": + mode_value = newton.JointMode.FORCE + + # Build kwargs dict for set_default_joint_config + joint_defaults = {} + if mode_value is not None: + joint_defaults["mode"] = mode_value + if "target_ke" in joint_cfg: + joint_defaults["target_ke"] = float(joint_cfg["target_ke"]) + if "target_kd" in joint_cfg: + joint_defaults["target_kd"] = float(joint_cfg["target_kd"]) + if "limit_ke" in joint_cfg: + joint_defaults["limit_ke"] = float(joint_cfg["limit_ke"]) + if "limit_kd" in joint_cfg: + joint_defaults["limit_kd"] = float(joint_cfg["limit_kd"]) + if "armature" in joint_cfg: + joint_defaults["armature"] = float(joint_cfg["armature"]) + + # Apply via NewtonBuilder + self.scene_builder.set_default_joint_config(**joint_defaults) + + log.info( + f"Newton backend: Applied joint defaults - " + f"ke={self.scene_builder.builder.default_joint_cfg.target_ke}, " + f"kd={self.scene_builder.builder.default_joint_cfg.target_kd}, " + f"armature={self.scene_builder.builder.default_joint_cfg.armature}" + ) + + def initialize(self) -> None: + self.ready = False + sim_cfg = self.global_config.get("simulator", {}).get("config", {}) + # Ensure namespace and channel metadata exist for downstream components + if "namespace" not in self.global_config: + namespace = self.global_config.get("simulator", {}).get("namespace", "ark") + self.global_config["namespace"] = namespace + self.global_config.setdefault("observation_channels", None) + self.global_config.setdefault("action_channels", None) + + self.sim_frequency = float(sim_cfg.get("sim_frequency", 240.0)) + self.sim_substeps = max(int(sim_cfg.get("substeps", 1)), 1) + base_dt = 1.0 / self.sim_frequency if self.sim_frequency > 0 else 0.005 + self.set_time_step(base_dt) + + # Create NewtonBuilder instead of raw ModelBuilder + gravity = tuple(sim_cfg.get("gravity", [0.0, 0.0, -9.81])) + up_axis = self._determine_up_axis(gravity) + gravity_magnitude = self._extract_gravity_magnitude(gravity) + + self.scene_builder = NewtonBuilder( + model_name="ark_world", + up_axis=up_axis, + gravity=gravity_magnitude + ) + + # Create solver-specific adapter early (before scene building) + solver_name = sim_cfg.get("solver", "xpbd") or "xpbd" + self.adapter = self._create_scene_adapter(solver_name) + log.info(f"Newton backend: Using {self.adapter.solver_name} solver adapter") + + device_name = sim_cfg.get("device") + + if device_name: + try: + wp.set_device(device_name) + except Exception as exc: # noqa: BLE001 + log.warning(f"Newton backend: unable to select device '{device_name}': {exc}") + + self._apply_joint_defaults(sim_cfg) + + # Add ground plane if requested (adapter handles solver-specific implementation) + ground_cfg = self.global_config.get("ground_plane", {}) + if ground_cfg.get("enabled", False): + from ark.system.newton.geometry_descriptors import GeometryDescriptor + descriptor = GeometryDescriptor.from_ground_plane_config(ground_cfg) + self.adapter.adapt_ground_plane(descriptor) + + # Add robots FIRST - URDF loader must come before primitives + # Otherwise Newton's add_urdf overwrites primitive body indices + if self.global_config.get("robots"): + for robot_name, robot_cfg in self.global_config["robots"].items(): + self.add_robot(robot_name, robot_cfg) + + # Add objects AFTER robots to preserve body indices + if self.global_config.get("objects"): + for obj_name, obj_cfg in self.global_config["objects"].items(): + obj_type = obj_cfg.get("type", "primitive") + self.add_sim_component(obj_name, obj_type, obj_cfg) + + if self.global_config.get("sensors"): + for sensor_name, sensor_cfg in self.global_config["sensors"].items(): + from ark.system.driver.sensor_driver import SensorType + sensor_type = SensorType(sensor_cfg.get("type", "camera").upper()) + self.add_sensor(sensor_name, sensor_type, sensor_cfg) + + # Finalize model via NewtonBuilder and get metadata + self.model, self.scene_metadata = self.scene_builder.finalize( + device=device_name or "cuda:0" + ) + + if self.model is None: + log.error("Newton backend: Model finalization failed, returned None") + raise RuntimeError("Failed to finalize Newton model") + + log.ok( + f"Newton backend: Model finalized successfully - " + f"{self.model.joint_count} joints, {self.model.body_count} bodies" + ) + + # Apply safety multiplier to rigid_contact_max to handle complex mesh collisions + # Newton calculates a conservative contact limit, but complex mesh-mesh interactions + # (like Franka Panda's 10 STL collision geometries) can exceed this estimate. + # The multiplier provides headroom for contact-rich scenarios without mesh simplification. + newton_cfg = sim_cfg.get("newton_physics", {}) + contact_multiplier = float(newton_cfg.get("rigid_contact_multiplier", 4.0)) + original_max = self.model.rigid_contact_max + self.model.rigid_contact_max = int(original_max * contact_multiplier) + log.info( + f"Newton backend: Increased rigid_contact_max from {original_max} to " + f"{self.model.rigid_contact_max} (multiplier={contact_multiplier})" + ) + + # Create solver through adapter (handles solver-specific configuration) + self.solver = self.adapter.create_solver(self.model, sim_cfg) + + self.state_current = self.model.state() + self.state_next = self.model.state() + self.control = self.model.control() + self.contacts = self.model.collide(self.state_current) + + # Use model arrays for initial FK (these contain initial config from builder) + # This matches Newton's own examples (see test_franka_standalone.py) + newton.eval_fk(self.model, self.model.joint_q, self.model.joint_qd, self.state_current) + newton.eval_fk(self.model, self.model.joint_q, self.model.joint_qd, self.state_next) + + self._state_accessor: Callable[[], newton.State] = lambda: self.state_current + self._bind_runtime_handles() + + # NOTE: No state sync needed here - both state_current and state_next were already + # initialized with eval_fk using model.joint_q above. Since drivers no longer call + # _apply_initial_configuration() during bind_runtime, there's no state modification + # to synchronize. The initial config was applied to builder before finalize. + + # CRITICAL FIX: Initialize control.joint_target from current state + # Without this, control.joint_target starts at zeros and PD controller + # drives all joints toward zero instead of maintaining current positions! + # This follows Newton's own examples (see example_basic_urdf.py:72) + if self.control.joint_target is not None and self.state_current.joint_q is not None: + self.control.joint_target.assign(self.state_current.joint_q) + target_sample = self.control.joint_target.numpy()[:min(7, len(self.control.joint_target))] + log.ok(f"Newton backend: Initialized control.joint_target from state: {target_sample}") + else: + log.error("Newton backend: FAILED to initialize control.joint_target - array is None!") + + # Initialize viewer manager + self.viewer_manager = NewtonViewerManager(sim_cfg, self.model) + if self.viewer_manager.gui_enabled: + # When GUI is active, step physics from the main thread to keep GL interop happy. + self.custom_event_loop = self._viewer_event_loop + + # Log successful initialization + log.ok( + f"Newton backend: Initialized with " + f"{len(self.robot_ref)} robot(s), " + f"{len(self.sensor_ref)} sensor(s), " + f"{len(self.object_ref)} object(s)" + ) + + self.ready = True + + def _bind_runtime_handles(self) -> None: + state_accessor = lambda: self.state_current + bound_robots = 0 + bound_objects = 0 + bound_sensors = 0 + + for robot in self.robot_ref.values(): + driver = getattr(robot, "_driver", None) + if isinstance(driver, NewtonRobotDriver): + driver.bind_runtime(self.model, self.control, state_accessor, self._substep_dt) + bound_robots += 1 + + for obj in self.object_ref.values(): + if isinstance(obj, NewtonMultiBody): + obj.bind_runtime(self.model, state_accessor) + bound_objects += 1 + + for sensor in self.sensor_ref.values(): + driver = getattr(sensor, "_driver", None) + if isinstance(driver, NewtonCameraDriver): + # Pass viewer_manager for RGB capture capability + driver.bind_runtime(self.model, state_accessor, viewer_manager=self.viewer_manager) + bound_sensors += 1 + elif isinstance(driver, NewtonLiDARDriver): + driver.bind_runtime(self.model, state_accessor) + bound_sensors += 1 + + log.info( + f"Newton backend: Bound runtime handles - " + f"{bound_robots} robots, {bound_objects} objects, {bound_sensors} sensors" + ) + + def _create_scene_adapter(self, solver_name: str): + """Factory method to create solver-specific scene adapter. + + Creates the appropriate adapter based on solver name. Adapters handle + solver-specific scene building requirements (e.g., MuJoCo ground plane + workaround) in a transparent, maintainable way. + + Args: + solver_name: Name of the solver ("xpbd", "mujoco", "featherstone", etc.) + + Returns: + Solver-specific adapter instance + + Example: + >>> adapter = self._create_scene_adapter("mujoco") + >>> adapter.adapt_ground_plane(descriptor) # Uses box workaround + """ + from ark.system.newton.scene_adapters import ( + XPBDAdapter, + MuJoCoAdapter, + ) + + # Map solver names to adapter classes + adapter_map = { + "xpbd": XPBDAdapter, + "solverxpbd": XPBDAdapter, + "mujoco": MuJoCoAdapter, + "solvermujoco": MuJoCoAdapter, + # TODO: Add Featherstone and SemiImplicit adapters in future + } + + # Get adapter class (default to XPBD if unknown) + solver_key = solver_name.lower() + adapter_cls = adapter_map.get(solver_key) + + if not adapter_cls: + log.warning( + f"Unknown solver '{solver_name}', falling back to XPBD adapter" + ) + adapter_cls = XPBDAdapter + + return adapter_cls(self.scene_builder) + + def set_gravity(self, gravity: tuple[float, float, float]) -> None: + """Set gravity (only works before builder is created).""" + gx, gy, gz = gravity + axis_names = {0: "X", 1: "Y", 2: "Z"} + components = np.array([gx, gy, gz], dtype=float) + idx = int(np.argmax(np.abs(components))) + magnitude = float(components[idx]) if np.any(components) else -9.81 + + # This method is called before builder is created, so we just log + # The actual gravity is set during NewtonBuilder initialization + log.info( + f"Newton backend: Gravity set to {magnitude:.2f} m/s² along {axis_names[idx]}-axis " + f"(input: [{gx}, {gy}, {gz}])" + ) + + def set_time_step(self, time_step: float) -> None: + self._time_step = time_step + self._substep_dt = time_step / self.sim_substeps + + def reset_simulator(self) -> None: + for robot in list(self.robot_ref.values()): + robot.kill_node() + for obj in list(self.object_ref.values()): + obj.kill_node() + for sensor in list(self.sensor_ref.values()): + sensor.kill_node() + + self.robot_ref.clear() + self.object_ref.clear() + self.sensor_ref.clear() + + self._simulation_time = 0.0 + self.initialize() + log.ok("Newton simulator reset complete.") + + def add_robot(self, name: str, robot_config: dict[str, Any]) -> None: + """Add a robot to the simulation. + + Args: + name: Name of the robot. + robot_config: Configuration dictionary for the robot. + """ + # NOTE: Don't call add_articulation() here - Newton's add_urdf() creates + # articulations automatically. Calling it here would create duplicates. + + class_path = Path(robot_config["class_dir"]) + if class_path.is_file(): + class_path = class_path.parent + RobotClass, driver_enum = import_class_from_directory(class_path) + driver_cls = getattr(driver_enum, "value", driver_enum) or NewtonRobotDriver + + # Pass scene_builder.builder (raw Newton ModelBuilder) to driver + driver = driver_cls(name, robot_config, builder=self.scene_builder.builder) + robot = RobotClass(name=name, global_config=self.global_config, driver=driver) + self.robot_ref[name] = robot + + def add_sim_component(self, name: str, _type: str, _obj_config: dict[str, Any]) -> None: + """Add a generic simulation object. + + Args: + name: Name of the object. + _type: Type identifier (unused, NewtonMultiBody reads from global_config). + _obj_config: Configuration dictionary (unused, read from global_config). + """ + component = NewtonMultiBody( + name=name, + builder=self.scene_builder.builder, + global_config=self.global_config + ) + self.object_ref[name] = component + + def add_sensor(self, name: str, _sensor_type: Any, sensor_config: dict[str, Any]) -> None: + """Add a sensor to the simulation. + + Args: + name: Name of the sensor. + _sensor_type: SensorType enum (unused, type determined from sensor_config). + sensor_config: Configuration dictionary for the sensor. + """ + class_path = Path(sensor_config["class_dir"]) + if class_path.is_file(): + class_path = class_path.parent + SensorClass, driver_enum = import_class_from_directory(class_path) + + # Determine driver class based on sensor type in config + config_type = sensor_config.get("type", "camera").lower() + if driver_enum is not None: + # Use custom driver if specified in module + driver_cls = getattr(driver_enum, "value", driver_enum) + elif config_type == "lidar": + driver_cls = NewtonLiDARDriver + else: + # Default to camera driver + driver_cls = NewtonCameraDriver + + driver = driver_cls(name, sensor_config) + sensor = SensorClass(name=name, driver=driver, global_config=self.global_config) + self.sensor_ref[name] = sensor + + def remove(self, name: str) -> None: + if name in self.robot_ref: + self.robot_ref[name].kill_node() + del self.robot_ref[name] + return + if name in self.sensor_ref: + self.sensor_ref[name].kill_node() + del self.sensor_ref[name] + return + if name in self.object_ref: + self.object_ref[name].kill_node() + del self.object_ref[name] + return + log.warning(f"Newton backend: component '{name}' does not exist.") + + def _all_available(self) -> bool: + for robot in self.robot_ref.values(): + if robot._is_suspended: # noqa: SLF001 + return False + for obj in self.object_ref.values(): + if obj._is_suspended: # noqa: SLF001 + return False + return True + + def _viewer_event_loop(self, sim_node) -> None: + """Run Newton simulation when GUI viewer requires main-thread ownership.""" + sim_cfg = self.global_config.get("simulator", {}).get("config", {}) + node_hz = float(sim_cfg.get("node_frequency", 240.0)) + rate = Rate(node_hz, reset=True) if node_hz > 0 else None + + lcm_handles = [sim_node._lcm] + for registry in (self.robot_ref, self.sensor_ref, self.object_ref): + for component in registry.values(): + lc = getattr(component, "_lcm", None) + if lc is not None: + lcm_handles.append(lc) + + log.info( + "Newton backend: GUI viewer active - running simulation loop on main thread" + ) + + while not sim_node._done: + if not self.viewer_manager or not self.viewer_manager.is_running(): + log.info("Newton backend: Viewer closed - stopping simulation loop") + break + + try: + for lc in lcm_handles: + lc.handle_timeout(0) + except OSError as exc: + log.warning(f"Newton backend: LCM error in viewer loop: {exc}") + break + + self._spin_sim_components() + sim_node._step_simulation() + + if rate is not None: + rate.sleep() + + sim_node._done = True + + def step(self) -> None: + if not self._all_available(): + return + + self._step_sim_components() + + for _ in range(self.sim_substeps): + self.state_current.clear_forces() + self.contacts = self.model.collide(self.state_current) + self.solver.step( + self.state_current, + self.state_next, + self.control, + self.contacts, + self._substep_dt, + ) + self.state_current, self.state_next = self.state_next, self.state_current + + self._simulation_time += self._time_step + + # Note: Do NOT call eval_fk() here - Newton's viewer.log_state() internally + # handles FK computation when updating shape transforms for rendering. + self.viewer_manager.render(self.state_current, self.contacts, self._simulation_time) + + def shutdown_backend(self) -> None: + if hasattr(self, 'viewer_manager'): + self.viewer_manager.shutdown() + for robot in list(self.robot_ref.values()): + robot.kill_node() + for obj in list(self.object_ref.values()): + obj.kill_node() + for sensor in list(self.sensor_ref.values()): + sensor.kill_node() + self.robot_ref.clear() + self.object_ref.clear() + self.sensor_ref.clear() + self.ready = False diff --git a/ark/system/newton/newton_builder.py b/ark/system/newton/newton_builder.py new file mode 100644 index 0000000..9940982 --- /dev/null +++ b/ark/system/newton/newton_builder.py @@ -0,0 +1,1161 @@ +# newton_builder.py +""" +Newton scene builder following ARK's design patterns. + +Provides a fluent API for building Newton physics scenes while tracking metadata +for integration with ARK's messaging layer. Similar in spirit to mjcf_builder.py +but designed around Newton's articulation-based architecture and conventions. + +Key features: +- Articulation-centric scene construction +- Method chaining for clean, readable scene descriptions +- Automatic tracking of bodies, joints, and articulations for spawn state generation +- Support for Newton-specific features (particles, USD/URDF/MJCF loading, env replication) +- Follows Newton conventions: Z-up, (x,y,z,w) quaternions, generalized coordinates +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple, Union + +import newton +import numpy as np +import warp as wp + +from ark.tools.log import log + + +# ----------------------------- Data Structures ----------------------------- + + +@dataclass +class ArticulationSpec: + """Metadata for a single articulation (robot/mechanism).""" + + name: str + key: Optional[str] = None # Newton articulation key + root_body_idx: Optional[int] = None + body_indices: List[int] = field(default_factory=list) + joint_indices: List[int] = field(default_factory=list) + joint_q_start: Optional[int] = None + joint_q_count: int = 0 + joint_qd_start: Optional[int] = None + joint_qd_count: int = 0 + + +@dataclass +class JointSpec: + """Metadata for joint tracking (for spawn state generation).""" + + name: str + joint_type: newton.JointType + articulation: Optional[str] = None + parent_body_idx: int = -1 + child_body_idx: int = -1 + q_start: int = 0 # Index into joint_q array + q_size: int = 0 # Number of position DOFs + qd_start: int = 0 # Index into joint_qd array + qd_size: int = 0 # Number of velocity DOFs + default_q: Optional[List[float]] = None + + +@dataclass +class BodySpec: + """Metadata for body tracking.""" + + name: str + body_idx: int + articulation: Optional[str] = None + xform: wp.transform = field(default_factory=lambda: wp.transform([0, 0, 0], [0, 0, 0, 1])) + mass: float = 0.0 + + +# --------------------------- Newton Builder ---------------------------- + + +class NewtonBuilder: + """ + Fluent API for building Newton physics scenes with ARK integration. + + Wraps Newton's ModelBuilder with: + - Named entity tracking (bodies, joints, articulations) + - Spawn state generation (joint_q, body_q initialization) + - Method chaining for readable scene construction + - Integration with ARK's YAML config system + + Example: + builder = ( + NewtonBuilder("robot_scene") + .set_gravity(-9.81) + .add_ground_plane() + .add_articulation("panda") + .load_urdf("panda", "panda.urdf", xform=wp.transform([0,0,0.5])) + .add_articulation("cube") + .add_simple_object("cube", shape="box", size=[0.05,0.05,0.05], xform=...) + ) + + model, metadata = builder.finalize() + spawn_state = builder.make_spawn_state() + """ + + def __init__( + self, + model_name: str = "world", + up_axis: newton.Axis = newton.Axis.Z, + gravity: float = -9.81 + ): + """ + Initialize Newton scene builder. + + Args: + model_name: Name for the scene/model + up_axis: Up direction (default: Z-up following Newton conventions) + gravity: Gravity magnitude along up axis (default: -9.81) + """ + self.model_name = model_name + self.up_axis = up_axis + self.gravity_magnitude = gravity + + # Create underlying Newton builder + self.builder = newton.ModelBuilder(up_axis=up_axis, gravity=gravity) + + # Articulation tracking + self._articulations: Dict[str, ArticulationSpec] = {} + self._current_articulation: Optional[str] = None + + # Entity tracking for spawn state generation + self._bodies: Dict[str, BodySpec] = {} + self._joints: List[JointSpec] = [] + self._joint_defaults: Dict[str, List[float]] = {} + + # Counter for auto-naming + self._unnamed_body_count = 0 + self._unnamed_joint_count = 0 + + # ---------- Configuration ---------- + + def set_gravity(self, magnitude: float) -> "NewtonBuilder": + """Set gravity magnitude along up axis.""" + self.gravity_magnitude = magnitude + # Newton's builder doesn't have a setter, gravity is set at init + # So we'll track it and use it when creating bodies if needed + return self + + def set_default_shape_config(self, **kwargs) -> "NewtonBuilder": + """ + Set default shape configuration. + + Args: + **kwargs: Shape config parameters (density, friction, restitution, etc.) + """ + for key, value in kwargs.items(): + if hasattr(self.builder.default_shape_cfg, key): + setattr(self.builder.default_shape_cfg, key, value) + return self + + def set_default_joint_config(self, **kwargs) -> "NewtonBuilder": + """ + Set default joint configuration. + + Args: + **kwargs: Joint config parameters (target_ke, target_kd, armature, etc.) + """ + for key, value in kwargs.items(): + if hasattr(self.builder.default_joint_cfg, key): + setattr(self.builder.default_joint_cfg, key, value) + return self + + # ---------- Articulation Management ---------- + + def add_articulation(self, name: str, key: Optional[str] = None) -> "NewtonBuilder": + """ + Start a new articulation group (robot/mechanism). + + Subsequent add_body and add_joint calls will be associated with this articulation + until a new articulation is started. + + Args: + name: Unique name for this articulation (for ARK tracking) + key: Optional Newton articulation key (auto-generated if None) + + Returns: + Self for method chaining + """ + if name in self._articulations: + log.warning(f"Articulation '{name}' already exists, switching context") + else: + # Create Newton articulation + artic_key = key if key is not None else name + self.builder.add_articulation(artic_key) + + # Track metadata + self._articulations[name] = ArticulationSpec( + name=name, + key=artic_key, + joint_q_start=len(self.builder.joint_q), + joint_qd_start=len(self.builder.joint_qd) + ) + + self._current_articulation = name + return self + + def set_current_articulation(self, name: str) -> "NewtonBuilder": + """Switch to an existing articulation context.""" + if name not in self._articulations: + raise ValueError(f"Articulation '{name}' not found. Call add_articulation('{name}') first.") + self._current_articulation = name + return self + + # ---------- Bodies ---------- + + def add_body( + self, + name: Optional[str] = None, + xform: Union[wp.transform, Tuple, List, None] = None, + mass: float = 1.0, + com: Optional[Vec3] = None, + inertia: Optional[Mat33] = None, + **kwargs + ) -> "NewtonBuilder": + """ + Add a rigid body to the scene. + + Args: + name: Body name (auto-generated if None) + xform: Initial transform (position + quaternion xyzw) + mass: Body mass in kg + com: Center of mass offset + inertia: 3x3 inertia tensor + **kwargs: Additional args passed to ModelBuilder.add_body() + + Returns: + Self for method chaining + """ + # Handle transform input + if xform is None: + xform = wp.transform([0, 0, 0], [0, 0, 0, 1]) + elif isinstance(xform, (tuple, list)): + # Convert list/tuple to wp.transform + if len(xform) == 3: + xform = wp.transform(xform, [0, 0, 0, 1]) + elif len(xform) == 7: + xform = wp.transform(xform[:3], xform[3:]) + + # Auto-generate name if needed + if name is None: + name = f"body_{self._unnamed_body_count}" + self._unnamed_body_count += 1 + + # Add to Newton builder + body_idx = self.builder.add_body( + xform=xform, + mass=mass, + com=com, + I_m=inertia, + **kwargs + ) + + # Track metadata + self._bodies[name] = BodySpec( + name=name, + body_idx=body_idx, + articulation=self._current_articulation, + xform=xform, + mass=mass + ) + + # Associate with current articulation + if self._current_articulation: + artic = self._articulations[self._current_articulation] + artic.body_indices.append(body_idx) + if artic.root_body_idx is None: + artic.root_body_idx = body_idx + + return self + + # ---------- Shapes ---------- + + def add_shape_plane( + self, + body: Union[str, int], + width: float = 10.0, + length: float = 10.0, + **kwargs + ) -> "NewtonBuilder": + """Add plane shape to body.""" + body_idx = self._resolve_body_idx(body) + self.builder.add_shape_plane(body=body_idx, width=width, length=length, **kwargs) + return self + + def add_ground_plane( + self, + size: float = 1000.0, + **kwargs + ) -> "NewtonBuilder": + """Add infinite ground plane.""" + self.builder.add_ground_plane(size=size, **kwargs) + return self + + def add_shape_box( + self, + body: Union[str, int], + hx: float = 0.5, + hy: float = 0.5, + hz: float = 0.5, + **kwargs + ) -> "NewtonBuilder": + """ + Add box shape to body. + + Args: + body: Body name or index + hx, hy, hz: Half-extents along each axis (Newton convention) + **kwargs: Additional shape parameters + """ + body_idx = self._resolve_body_idx(body) + self.builder.add_shape_box(body=body_idx, hx=hx, hy=hy, hz=hz, **kwargs) + return self + + def add_shape_sphere( + self, + body: Union[str, int], + radius: float = 0.5, + **kwargs + ) -> "NewtonBuilder": + """Add sphere shape to body.""" + body_idx = self._resolve_body_idx(body) + self.builder.add_shape_sphere(body=body_idx, radius=radius, **kwargs) + return self + + def add_shape_capsule( + self, + body: Union[str, int], + radius: float = 0.5, + half_height: float = 1.0, + **kwargs + ) -> "NewtonBuilder": + """ + Add capsule shape to body. + + Args: + body: Body name or index + radius: Capsule radius + half_height: Half-height excluding hemispherical caps (Newton convention) + **kwargs: Additional shape parameters + """ + body_idx = self._resolve_body_idx(body) + self.builder.add_shape_capsule( + body=body_idx, + radius=radius, + half_height=half_height, + **kwargs + ) + return self + + def add_shape_cylinder( + self, + body: Union[str, int], + radius: float = 0.5, + half_height: float = 1.0, + **kwargs + ) -> "NewtonBuilder": + """Add cylinder shape to body.""" + body_idx = self._resolve_body_idx(body) + self.builder.add_shape_cylinder( + body=body_idx, + radius=radius, + half_height=half_height, + **kwargs + ) + return self + + def add_shape_mesh( + self, + body: Union[str, int], + mesh: newton.Mesh, + **kwargs + ) -> "NewtonBuilder": + """Add mesh shape to body.""" + body_idx = self._resolve_body_idx(body) + self.builder.add_shape_mesh(body=body_idx, mesh=mesh, **kwargs) + return self + + def add_shape_sdf( + self, + body: Union[str, int], + sdf: newton.SDF, + **kwargs + ) -> "NewtonBuilder": + """Add SDF (signed distance field) shape to body.""" + body_idx = self._resolve_body_idx(body) + self.builder.add_shape_sdf(body=body_idx, sdf=sdf, **kwargs) + return self + + # ---------- Joints ---------- + + def add_joint_revolute( + self, + name: Optional[str] = None, + parent: Union[str, int] = -1, + child: Union[str, int] = -1, + axis: Union[newton.Axis, wp.vec3] = newton.Axis.Z, + limit_lower: float = -1e6, + limit_upper: float = 1e6, + **kwargs + ) -> "NewtonBuilder": + """ + Add revolute (hinge) joint. + + Args: + name: Joint name (auto-generated if None) + parent: Parent body name or index (-1 for world) + child: Child body name or index + axis: Rotation axis + limit_lower: Lower joint limit (radians) + limit_upper: Upper joint limit (radians) + **kwargs: Additional joint parameters (target_ke, target_kd, etc.) + """ + parent_idx = self._resolve_body_idx(parent) if parent != -1 else -1 + child_idx = self._resolve_body_idx(child) + + if name is None: + name = f"joint_{self._unnamed_joint_count}" + self._unnamed_joint_count += 1 + + # Add to Newton builder + joint_idx = self.builder.add_joint_revolute( + parent=parent_idx, + child=child_idx, + axis=axis, + limit_lower=limit_lower, + limit_upper=limit_upper, + **kwargs + ) + + # Track metadata + self._track_joint( + name=name, + joint_type=newton.JointType.REVOLUTE, + parent_idx=parent_idx, + child_idx=child_idx, + q_dofs=1, + qd_dofs=1 + ) + + return self + + def add_joint_prismatic( + self, + name: Optional[str] = None, + parent: Union[str, int] = -1, + child: Union[str, int] = -1, + axis: Union[newton.Axis, wp.vec3] = newton.Axis.Z, + limit_lower: float = -1e6, + limit_upper: float = 1e6, + **kwargs + ) -> "NewtonBuilder": + """ + Add prismatic (slider) joint. + + Args: + name: Joint name + parent: Parent body + child: Child body + axis: Sliding axis + limit_lower: Lower position limit (meters) + limit_upper: Upper position limit (meters) + **kwargs: Additional joint parameters + """ + parent_idx = self._resolve_body_idx(parent) if parent != -1 else -1 + child_idx = self._resolve_body_idx(child) + + if name is None: + name = f"joint_{self._unnamed_joint_count}" + self._unnamed_joint_count += 1 + + joint_idx = self.builder.add_joint_prismatic( + parent=parent_idx, + child=child_idx, + axis=axis, + limit_lower=limit_lower, + limit_upper=limit_upper, + **kwargs + ) + + self._track_joint( + name=name, + joint_type=newton.JointType.PRISMATIC, + parent_idx=parent_idx, + child_idx=child_idx, + q_dofs=1, + qd_dofs=1 + ) + + return self + + def add_joint_ball( + self, + name: Optional[str] = None, + parent: Union[str, int] = -1, + child: Union[str, int] = -1, + **kwargs + ) -> "NewtonBuilder": + """ + Add ball (spherical) joint. + + Ball joints have 3 rotational DOFs represented as quaternions. + Position DOFs: 4 (quaternion), Velocity DOFs: 3 (angular velocity). + """ + parent_idx = self._resolve_body_idx(parent) if parent != -1 else -1 + child_idx = self._resolve_body_idx(child) + + if name is None: + name = f"joint_{self._unnamed_joint_count}" + self._unnamed_joint_count += 1 + + joint_idx = self.builder.add_joint_ball( + parent=parent_idx, + child=child_idx, + **kwargs + ) + + self._track_joint( + name=name, + joint_type=newton.JointType.BALL, + parent_idx=parent_idx, + child_idx=child_idx, + q_dofs=4, # Quaternion + qd_dofs=3 # Angular velocity + ) + + return self + + def add_joint_free( + self, + name: Optional[str] = None, + parent: Union[str, int] = -1, + child: Union[str, int] = -1, + **kwargs + ) -> "NewtonBuilder": + """ + Add free (6-DOF) joint for floating base robots/objects. + + Free joints have 6 DOFs: 3 translation + 3 rotation. + Position DOFs: 7 (pos + quat), Velocity DOFs: 6 (linear + angular). + """ + parent_idx = self._resolve_body_idx(parent) if parent != -1 else -1 + child_idx = self._resolve_body_idx(child) + + if name is None: + name = f"joint_{self._unnamed_joint_count}" + self._unnamed_joint_count += 1 + + joint_idx = self.builder.add_joint_free( + parent=parent_idx, + child=child_idx, + **kwargs + ) + + self._track_joint( + name=name, + joint_type=newton.JointType.FREE, + parent_idx=parent_idx, + child_idx=child_idx, + q_dofs=7, # Position + quaternion + qd_dofs=6 # Linear + angular velocity + ) + + return self + + def add_joint_fixed( + self, + name: Optional[str] = None, + parent: Union[str, int] = -1, + child: Union[str, int] = -1, + **kwargs + ) -> "NewtonBuilder": + """Add fixed joint (rigid connection between bodies).""" + parent_idx = self._resolve_body_idx(parent) if parent != -1 else -1 + child_idx = self._resolve_body_idx(child) + + if name is None: + name = f"joint_{self._unnamed_joint_count}" + self._unnamed_joint_count += 1 + + joint_idx = self.builder.add_joint_fixed( + parent=parent_idx, + child=child_idx, + **kwargs + ) + + self._track_joint( + name=name, + joint_type=newton.JointType.FIXED, + parent_idx=parent_idx, + child_idx=child_idx, + q_dofs=0, + qd_dofs=0 + ) + + return self + + def add_joint_d6( + self, + name: Optional[str] = None, + parent: Union[str, int] = -1, + child: Union[str, int] = -1, + linear_axes: Optional[List] = None, + angular_axes: Optional[List] = None, + **kwargs + ) -> "NewtonBuilder": + """ + Add D6 (generic 6-DOF configurable) joint. + + Args: + name: Joint name + parent: Parent body + child: Child body + linear_axes: List of linear axis configs (up to 3) + angular_axes: List of angular axis configs (up to 3) + **kwargs: Additional joint parameters + """ + parent_idx = self._resolve_body_idx(parent) if parent != -1 else -1 + child_idx = self._resolve_body_idx(child) + + if name is None: + name = f"joint_{self._unnamed_joint_count}" + self._unnamed_joint_count += 1 + + joint_idx = self.builder.add_joint_d6( + parent=parent_idx, + child=child_idx, + linear_axes=linear_axes or [], + angular_axes=angular_axes or [], + **kwargs + ) + + # D6 DOF count depends on axes configuration + num_linear = len(linear_axes) if linear_axes else 0 + num_angular = len(angular_axes) if angular_axes else 0 + total_dofs = num_linear + num_angular + + self._track_joint( + name=name, + joint_type=newton.JointType.D6, + parent_idx=parent_idx, + child_idx=child_idx, + q_dofs=total_dofs, + qd_dofs=total_dofs + ) + + return self + + # ---------- Asset Loading ---------- + + def load_urdf( + self, + name: str, + file: Union[str, Path], + xform: Optional[wp.transform] = None, + floating: bool = False, + collapse_fixed_joints: bool = False, + **kwargs + ) -> "NewtonBuilder": + """ + Load a URDF file as an articulation. + + Args: + name: Articulation name for tracking + file: Path to URDF file + xform: Initial transform + floating: If True, add free joint for floating base + collapse_fixed_joints: Merge fixed-joint-connected bodies + **kwargs: Additional args for add_urdf + + Returns: + Self for method chaining + """ + # Ensure articulation context + if name not in self._articulations: + self.add_articulation(name) + else: + self.set_current_articulation(name) + + xform = xform or wp.transform([0, 0, 0], [0, 0, 0, 1]) + + # Load URDF + self.builder.add_urdf( + source=str(file), + xform=xform, + floating=floating, + collapse_fixed_joints=collapse_fixed_joints, + **kwargs + ) + + # Update articulation metadata + artic = self._articulations[name] + artic.joint_q_count = len(self.builder.joint_q) - artic.joint_q_start + artic.joint_qd_count = len(self.builder.joint_qd) - artic.joint_qd_start + + log.info(f"Loaded URDF '{file}' as articulation '{name}' " + f"({artic.joint_q_count} DOFs)") + + return self + + def load_usd( + self, + name: str, + file: Union[str, Path], + xform: Optional[wp.transform] = None, + **kwargs + ) -> "NewtonBuilder": + """ + Load a USD (Universal Scene Description) file. + + Args: + name: Articulation name + file: Path to USD file + xform: Initial transform + **kwargs: Additional args for add_usd + """ + if name not in self._articulations: + self.add_articulation(name) + else: + self.set_current_articulation(name) + + xform = xform or wp.transform([0, 0, 0], [0, 0, 0, 1]) + + self.builder.add_usd( + source=str(file), + xform=xform, + **kwargs + ) + + artic = self._articulations[name] + artic.joint_q_count = len(self.builder.joint_q) - artic.joint_q_start + artic.joint_qd_count = len(self.builder.joint_qd) - artic.joint_qd_start + + log.info(f"Loaded USD '{file}' as articulation '{name}'") + + return self + + def load_mjcf( + self, + name: str, + file: Union[str, Path], + xform: Optional[wp.transform] = None, + ignore_names: Optional[List[str]] = None, + **kwargs + ) -> "NewtonBuilder": + """ + Load a MuJoCo MJCF XML file. + + Args: + name: Articulation name + file: Path to MJCF file + xform: Initial transform + ignore_names: List of body names to skip + **kwargs: Additional args for add_mjcf + """ + if name not in self._articulations: + self.add_articulation(name) + else: + self.set_current_articulation(name) + + xform = xform or wp.transform([0, 0, 0], [0, 0, 0, 1]) + + self.builder.add_mjcf( + source=str(file), + xform=xform, + ignore_names=ignore_names or [], + **kwargs + ) + + artic = self._articulations[name] + artic.joint_q_count = len(self.builder.joint_q) - artic.joint_q_start + artic.joint_qd_count = len(self.builder.joint_qd) - artic.joint_qd_start + + log.info(f"Loaded MJCF '{file}' as articulation '{name}'") + + return self + + # ---------- High-Level Convenience Methods ---------- + + def add_simple_object( + self, + name: str, + shape: str = "box", + size: Union[List[float], float] = 0.1, + xform: Optional[wp.transform] = None, + mass: float = 1.0, + free: bool = True, + **shape_kwargs + ) -> "NewtonBuilder": + """ + Convenience method to add a simple geometric object. + + Args: + name: Object name + shape: Shape type ("box", "sphere", "capsule", "cylinder") + size: Size parameter (half-extents for box, radius for sphere, etc.) + xform: Initial transform + mass: Object mass + free: If True, add free joint for 6-DOF motion + **shape_kwargs: Additional shape parameters (friction, restitution, etc.) + + Returns: + Self for method chaining + """ + # Create articulation for this object + self.add_articulation(name) + + # Add body + self.add_body(name=name, xform=xform, mass=mass) + + # Add free joint if requested + if free: + self.add_joint_free(name=f"{name}_root", parent=-1, child=name) + + # Add shape + if shape == "box": + if isinstance(size, (list, tuple)): + hx, hy, hz = size[0], size[1], size[2] + else: + hx = hy = hz = size + self.add_shape_box(body=name, hx=hx, hy=hy, hz=hz, **shape_kwargs) + elif shape == "sphere": + radius = size if isinstance(size, (int, float)) else size[0] + self.add_shape_sphere(body=name, radius=radius, **shape_kwargs) + elif shape == "capsule": + if isinstance(size, (list, tuple)): + radius, half_height = size[0], size[1] + else: + radius = half_height = size + self.add_shape_capsule(body=name, radius=radius, half_height=half_height, **shape_kwargs) + elif shape == "cylinder": + if isinstance(size, (list, tuple)): + radius, half_height = size[0], size[1] + else: + radius = half_height = size + self.add_shape_cylinder(body=name, radius=radius, half_height=half_height, **shape_kwargs) + else: + raise ValueError(f"Unknown shape type: {shape}") + + return self + + # ---------- Particles ---------- + + def add_particle( + self, + pos: Union[wp.vec3, Tuple, List], + vel: Union[wp.vec3, Tuple, List] = (0, 0, 0), + mass: float = 1.0, + radius: Optional[float] = None, + **kwargs + ) -> "NewtonBuilder": + """Add a single particle to the scene.""" + radius = radius or self.builder.default_particle_radius + self.builder.add_particle(pos=pos, vel=vel, mass=mass, radius=radius, **kwargs) + return self + + def add_particles( + self, + positions: np.ndarray, + velocities: Optional[np.ndarray] = None, + masses: Optional[np.ndarray] = None, + radii: Optional[np.ndarray] = None, + **kwargs + ) -> "NewtonBuilder": + """Add multiple particles at once.""" + self.builder.add_particles( + positions=positions, + velocities=velocities, + masses=masses, + radii=radii, + **kwargs + ) + return self + + def add_particle_grid( + self, + lower: Union[wp.vec3, Tuple, List], + upper: Union[wp.vec3, Tuple, List], + spacing: float, + **kwargs + ) -> "NewtonBuilder": + """Add a regular grid of particles.""" + self.builder.add_particle_grid(lower=lower, upper=upper, spacing=spacing, **kwargs) + return self + + # ---------- Environment Replication ---------- + + def replicate_articulation( + self, + num_envs: int, + spacing: Union[float, Tuple[float, float, float]] = 2.0 + ) -> "NewtonBuilder": + """ + Replicate the current scene for parallel environments. + + Newton's replicate() creates multiple copies of the scene for vectorized + simulation (useful for RL training). + + Args: + num_envs: Number of environment copies + spacing: Spacing between environments (scalar or (x,y,z) tuple) + + Returns: + Self for method chaining + """ + if isinstance(spacing, (int, float)): + spacing = (spacing, spacing, 0.0) + + # Note: Newton's replicate works on the whole builder, not per-articulation + # For now, we'll just call it directly + # TODO: Implement per-articulation replication if needed + log.warning("replicate_articulation() replicates the entire scene, not just one articulation") + + # We can't actually replicate here because it would duplicate everything + # This should be called after all articulations are added + # Store parameters for later + self._replicate_params = { + "num_envs": num_envs, + "spacing": spacing + } + + return self + + # ---------- Spawn State Generation ---------- + + def set_joint_defaults( + self, + joint_defaults: Dict[str, Union[float, List[float]]] + ) -> "NewtonBuilder": + """ + Set default joint positions for spawn state generation. + + Args: + joint_defaults: Dict mapping joint names to default positions + (scalar for 1-DOF, list for multi-DOF joints) + + Returns: + Self for method chaining + """ + for name, value in joint_defaults.items(): + if isinstance(value, (int, float)): + self._joint_defaults[name] = [float(value)] + else: + self._joint_defaults[name] = [float(v) for v in value] + return self + + def make_spawn_state( + self, + name: str = "spawn", + joint_positions: Optional[Dict[str, Union[float, List[float]]]] = None + ) -> Dict[str, np.ndarray]: + """ + Generate initial spawn state for the scene. + + Similar to mjcf_builder's make_spawn_keyframe(), but returns numpy arrays + instead of XML. Includes both generalized (joint_q) and maximal (body_q) + coordinates. + + Args: + name: State name (for logging/debugging) + joint_positions: Optional dict of joint positions (overrides defaults) + + Returns: + Dict with keys: + - joint_q: Generalized positions (numpy array) + - joint_qd: Generalized velocities (numpy array, all zeros) + - body_q: Maximal coordinates (numpy array, computed via FK) + - body_qd: Body velocities (numpy array, all zeros) + """ + # Merge defaults with overrides + merged_defaults = dict(self._joint_defaults) + if joint_positions: + for k, v in joint_positions.items(): + if isinstance(v, (int, float)): + merged_defaults[k] = [float(v)] + else: + merged_defaults[k] = [float(x) for x in v] + + # Build joint_q array + joint_q = [] + for joint_spec in self._joints: + jname = joint_spec.name + q_size = joint_spec.q_size + + if jname in merged_defaults: + vals = merged_defaults[jname] + if len(vals) != q_size: + raise ValueError( + f"Joint '{jname}' expects {q_size} DOFs, got {len(vals)}" + ) + joint_q.extend(vals) + else: + # Default values based on joint type + if joint_spec.joint_type == newton.JointType.FREE: + # Free joint: [x, y, z, qx, qy, qz, qw] + # Use body's initial transform if available + body_spec = self._bodies.get( + next((b.name for b in self._bodies.values() + if b.body_idx == joint_spec.child_body_idx), None) + ) + if body_spec: + xform = body_spec.xform + joint_q.extend([ + xform.p[0], xform.p[1], xform.p[2], # position + xform.q[0], xform.q[1], xform.q[2], xform.q[3] # quat (xyzw) + ]) + else: + joint_q.extend([0, 0, 0, 0, 0, 0, 1]) + elif joint_spec.joint_type == newton.JointType.BALL: + # Ball joint: [qx, qy, qz, qw] + joint_q.extend([0, 0, 0, 1]) + else: + # Revolute/prismatic: single value + joint_q.extend([0.0] * q_size) + + joint_q = np.array(joint_q, dtype=np.float32) + joint_qd = np.zeros_like(joint_q) + + log.ok(f"Generated spawn state '{name}' with {len(joint_q)} generalized DOFs") + + return { + "name": name, + "joint_q": joint_q, + "joint_qd": joint_qd, + # Note: body_q and body_qd would require calling eval_fk after finalize + # We'll add that capability after finalize() is implemented + } + + # ---------- Utilities ---------- + + def get_body_idx(self, name: str) -> int: + """Get body index by name.""" + if name not in self._bodies: + raise ValueError(f"Body '{name}' not found") + return self._bodies[name].body_idx + + def get_articulation(self, name: str) -> ArticulationSpec: + """Get articulation metadata by name.""" + if name not in self._articulations: + raise ValueError(f"Articulation '{name}' not found") + return self._articulations[name] + + def joint_order(self) -> List[str]: + """Return list of joint names in qpos order.""" + return [j.name for j in self._joints] + + def _resolve_body_idx(self, body: Union[str, int]) -> int: + """Convert body name or index to index.""" + if isinstance(body, str): + return self.get_body_idx(body) + return body + + def _track_joint( + self, + name: str, + joint_type: newton.JointType, + parent_idx: int, + child_idx: int, + q_dofs: int, + qd_dofs: int + ): + """Internal helper to track joint metadata.""" + q_start = len(self._joints) # Simplified - would need actual index tracking + + joint_spec = JointSpec( + name=name, + joint_type=joint_type, + articulation=self._current_articulation, + parent_body_idx=parent_idx, + child_body_idx=child_idx, + q_start=q_start, + q_size=q_dofs, + qd_start=q_start, # Simplified + qd_size=qd_dofs + ) + + self._joints.append(joint_spec) + + # Update articulation metadata + if self._current_articulation: + artic = self._articulations[self._current_articulation] + artic.joint_indices.append(len(self._joints) - 1) + + # ---------- Finalization ---------- + + def finalize(self, device: str = "cuda:0") -> Tuple[newton.Model, Dict]: + """ + Finalize the builder and return Newton model + metadata. + + Args: + device: Compute device (e.g., "cuda:0", "cpu") + + Returns: + Tuple of (Newton Model, metadata dict) + """ + # Apply environment replication if requested + if hasattr(self, '_replicate_params'): + params = self._replicate_params + log.info(f"Replicating scene {params['num_envs']} times...") + # Newton's replicate needs to be called before finalize + # but after all entities are added + # For now, skip as it requires more complex integration + log.warning("Environment replication not yet implemented in finalize()") + + # Finalize Newton model + log.info(f"Finalizing Newton model '{self.model_name}' on {device}...") + model = self.builder.finalize(device=device) + + # Build metadata dict + metadata = { + "model_name": self.model_name, + "articulations": { + name: { + "key": spec.key, + "num_bodies": len(spec.body_indices), + "num_joints": len(spec.joint_indices), + "joint_q_start": spec.joint_q_start, + "joint_q_count": spec.joint_q_count, + "joint_qd_start": spec.joint_qd_start, + "joint_qd_count": spec.joint_qd_count, + } + for name, spec in self._articulations.items() + }, + "bodies": { + name: { + "idx": spec.body_idx, + "articulation": spec.articulation, + "mass": spec.mass, + } + for name, spec in self._bodies.items() + }, + "joints": [ + { + "name": spec.name, + "type": spec.joint_type.name, + "articulation": spec.articulation, + "q_start": spec.q_start, + "q_size": spec.q_size, + } + for spec in self._joints + ], + "num_bodies": model.body_count, + "num_joints": model.joint_count, + "num_dofs": model.joint_dof_count, + } + + log.ok(f"Finalized model with {model.body_count} bodies, " + f"{model.joint_count} joints, {model.joint_dof_count} DOFs") + + return model, metadata + + +# Type aliases for convenience +Vec3 = Union[wp.vec3, Tuple[float, float, float], List[float]] +Mat33 = Union[wp.mat33, np.ndarray] diff --git a/ark/system/newton/newton_camera_driver.py b/ark/system/newton/newton_camera_driver.py new file mode 100644 index 0000000..20d774c --- /dev/null +++ b/ark/system/newton/newton_camera_driver.py @@ -0,0 +1,342 @@ +"""Newton camera driver with RaycastSensor for depth and ViewerGL for RGB.""" + +from __future__ import annotations + +import math +from enum import Enum +from typing import Any, Callable, Dict, Optional, Sequence + +import numpy as np +import warp as wp +from scipy.spatial.transform import Rotation as R + +from ark.tools.log import log +from ark.system.driver.sensor_driver import CameraDriver + +try: + from newton.sensors import RaycastSensor + RAYCAST_AVAILABLE = True +except ImportError: + log.warning("Newton RaycastSensor not available - depth images will be placeholders") + RAYCAST_AVAILABLE = False + + +class CameraType(Enum): + """Supported camera models.""" + + FIXED = "fixed" + ATTACHED = "attached" + + +class NewtonCameraDriver(CameraDriver): + """Camera driver using Newton RaycastSensor for depth and ViewerGL for RGB. + + Supports two camera modes: + - FIXED: Static camera with position defined by yaw/pitch/roll around a target + - ATTACHED: Camera attached to a robot link with offset transform + + Note: Segmentation is not supported by Newton - returns zeros with warning. + """ + + def __init__( + self, + component_name: str, + component_config: Dict[str, Any], + attached_body_id: Optional[int] = None, + ) -> None: + super().__init__(component_name, component_config, True) + + sim_cfg = component_config.get("sim_config", {}) + + # Image dimensions + self.width = int(component_config.get("width", sim_cfg.get("width", 640))) + self.height = int(component_config.get("height", sim_cfg.get("height", 480))) + + # Camera intrinsics + self.fov = float(sim_cfg.get("fov", 60.0)) + self.near = float(sim_cfg.get("near", sim_cfg.get("near_val", 0.1))) + self.far = float(sim_cfg.get("far", sim_cfg.get("far_val", 100.0))) + self.fov_radians = math.radians(self.fov) + + # Determine camera type + try: + camera_type_str = component_config.get("camera_type", "fixed") + self.camera_type = CameraType(camera_type_str.lower()) + except ValueError: + log.warning(f"Newton camera '{component_name}': Unknown camera_type '{camera_type_str}', defaulting to FIXED") + self.camera_type = CameraType.FIXED + + # Stream configuration + streams = component_config.get("streams", {}) + self.color_stream = streams.get("color", {}).get("enable", True) + self.depth_stream = streams.get("depth", {}).get("enable", True) + self.segmentation_stream = streams.get("segmentation", {}).get("enable", False) + self._segmentation_warned = False + + # Fixed camera parameters + if self.camera_type == CameraType.FIXED: + fix_cfg = sim_cfg.get("fix", {}) + self.camera_target_position = fix_cfg.get("camera_target_position", [0, 0, 0]) + self.distance = fix_cfg.get("distance", 1.5) + self.yaw = fix_cfg.get("yaw", 0.0) + self.pitch = fix_cfg.get("pitch", -30.0) + self.roll = fix_cfg.get("roll", 0.0) + self.up_axis_index = fix_cfg.get("up_axis_index", 2) # Z-up default + + # Compute initial camera pose from yaw/pitch/roll + self._compute_fixed_camera_pose() + + # Attached camera parameters + else: + attach_cfg = sim_cfg.get("attach", {}) + self._parent_name = attach_cfg.get("parent_name") + self._parent_link = attach_cfg.get("parent_link") + self._offset_position = np.array(attach_cfg.get("position", attach_cfg.get("offset_translation", [0.0, 0.0, 0.0])), dtype=np.float32) + offset_rot = attach_cfg.get("orientation", attach_cfg.get("offset_rotation", [0.0, 0.0, 0.0, 1.0])) + if len(offset_rot) == 3: + # Euler angles (degrees) + self._offset_rotation = R.from_euler('xyz', offset_rot, degrees=True) + else: + # Quaternion (xyzw) + self._offset_rotation = R.from_quat(offset_rot) + self._rel_camera_target = attach_cfg.get("rel_camera_target", [1, 0, 0]) + + # Runtime bindings (set in bind_runtime) + self._model = None + self._state_accessor: Callable[[], Any] = lambda: None + self._viewer_manager = None + self._raycast_sensor: Optional[Any] = None + self._body_index: Optional[int] = attached_body_id + + # Current camera pose (updated for ATTACHED mode) + self.current_position = np.array([0.0, 0.0, 0.0], dtype=np.float32) + self.current_direction = np.array([1.0, 0.0, 0.0], dtype=np.float32) + self.current_up = np.array([0.0, 0.0, 1.0], dtype=np.float32) + + log.info( + f"Newton camera driver '{component_name}': Initialized " + f"(size={self.width}x{self.height}, type={self.camera_type.value})" + ) + + def _compute_fixed_camera_pose(self) -> None: + """Compute camera position and orientation from yaw/pitch/roll parameters.""" + # Convert angles to radians + yaw_rad = math.radians(self.yaw) + pitch_rad = math.radians(self.pitch) + + # Compute camera position using spherical coordinates + # Camera looks at target from distance, with yaw rotating around vertical axis + # and pitch tilting up/down + cos_pitch = math.cos(pitch_rad) + sin_pitch = math.sin(pitch_rad) + cos_yaw = math.cos(yaw_rad) + sin_yaw = math.sin(yaw_rad) + + # Position offset from target (spherical coords) + offset_x = self.distance * cos_pitch * cos_yaw + offset_y = self.distance * cos_pitch * sin_yaw + offset_z = self.distance * sin_pitch + + target = np.array(self.camera_target_position, dtype=np.float32) + self.current_position = target + np.array([offset_x, offset_y, offset_z], dtype=np.float32) + + # Direction is from camera to target (normalized) + self.current_direction = target - self.current_position + norm = np.linalg.norm(self.current_direction) + if norm > 1e-6: + self.current_direction = self.current_direction / norm + else: + self.current_direction = np.array([1.0, 0.0, 0.0], dtype=np.float32) + + # Up vector based on up_axis_index + self.current_up = np.array([0.0, 0.0, 1.0], dtype=np.float32) # Default Z-up + + def bind_runtime( + self, + model: Any, + state_accessor: Callable[[], Any], + viewer_manager: Optional[Any] = None, + ) -> None: + """Bind to Newton model after finalization. + + Args: + model: Finalized newton.Model + state_accessor: Callable returning current newton.State + viewer_manager: Optional NewtonViewerManager for RGB capture + """ + self._model = model + self._state_accessor = state_accessor + self._viewer_manager = viewer_manager + + # Resolve parent body index for ATTACHED mode + if self.camera_type == CameraType.ATTACHED and self._body_index is None: + if self._parent_name and hasattr(model, 'body_key'): + key_to_index = {name: idx for idx, name in enumerate(model.body_key)} + # Try parent_link first, then parent_name + search_key = self._parent_link or self._parent_name + self._body_index = key_to_index.get(search_key) + + if self._body_index is None: + log.warning( + f"Newton camera driver '{self.component_name}': " + f"Parent body '{search_key}' not found in model. " + f"Camera will not track body motion." + ) + + # Create RaycastSensor for depth + if RAYCAST_AVAILABLE and self._model is not None: + try: + self._raycast_sensor = RaycastSensor( + model=self._model, + camera_position=tuple(self.current_position), + camera_direction=tuple(self.current_direction), + camera_up=tuple(self.current_up), + fov_radians=self.fov_radians, + width=self.width, + height=self.height, + max_distance=self.far, + ) + log.ok(f"Newton camera driver '{self.component_name}': RaycastSensor created") + except Exception as e: + log.warning(f"Newton camera driver '{self.component_name}': Failed to create RaycastSensor: {e}") + self._raycast_sensor = None + + def _update_camera_pose(self) -> None: + """Update camera pose for ATTACHED mode from body state.""" + if self.camera_type != CameraType.ATTACHED or self._body_index is None: + return + + state = self._state_accessor() + if state is None or state.body_q is None: + return + + # Get body transform from state + body_q = state.body_q.numpy() + if self._body_index >= len(body_q): + return + + # body_q is array of transforms, each is [px, py, pz, qx, qy, qz, qw] + body_transform = body_q[self._body_index] + body_pos = body_transform[:3] + body_quat = body_transform[3:7] # xyzw format + + # Apply offset transform + body_rot = R.from_quat(body_quat) + offset_pos_world = body_rot.apply(self._offset_position) + self.current_position = (body_pos + offset_pos_world).astype(np.float32) + + # Compute camera direction (forward in local frame transformed to world) + combined_rot = body_rot * self._offset_rotation + local_forward = np.array(self._rel_camera_target, dtype=np.float32) + self.current_direction = combined_rot.apply(local_forward).astype(np.float32) + norm = np.linalg.norm(self.current_direction) + if norm > 1e-6: + self.current_direction = self.current_direction / norm + + # Up vector + local_up = np.array([0.0, 0.0, 1.0], dtype=np.float32) + self.current_up = combined_rot.apply(local_up).astype(np.float32) + + # Update RaycastSensor pose + if self._raycast_sensor is not None: + self._raycast_sensor.update_camera_pose( + position=tuple(self.current_position), + direction=tuple(self.current_direction), + up=tuple(self.current_up), + ) + + def get_images(self) -> Dict[str, np.ndarray]: + """Get camera images. + + Returns dict with optional keys: + - "color": RGB image (H, W, 3) uint8 + - "depth": Depth image (H, W) float32 in meters + - "segmentation": Zeros (H, W) int32 (not supported by Newton) + """ + images = {} + + # Update pose for attached cameras + if self.camera_type == CameraType.ATTACHED: + self._update_camera_pose() + + # Get depth from RaycastSensor + if self.depth_stream: + depth = self._get_depth_image() + images["depth"] = depth + + # Get RGB from ViewerGL (if available) + if self.color_stream: + color = self._get_color_image() + images["color"] = color + + # Segmentation not supported - return zeros with warning + if self.segmentation_stream: + if not self._segmentation_warned: + log.warning( + f"Newton camera '{self.component_name}': Segmentation masks are not " + f"supported by Newton. Returning zeros. Use PyBullet backend if " + f"segmentation is required." + ) + self._segmentation_warned = True + images["segmentation"] = np.zeros((self.height, self.width), dtype=np.int32) + + return images + + def _get_depth_image(self) -> np.ndarray: + """Get depth image from RaycastSensor.""" + if self._raycast_sensor is None: + # Return placeholder (infinity = no depth data) + return np.full((self.height, self.width), np.inf, dtype=np.float32) + + state = self._state_accessor() + if state is None: + return np.full((self.height, self.width), np.inf, dtype=np.float32) + + # Evaluate raycast + self._raycast_sensor.eval(state) + depth = self._raycast_sensor.get_depth_image_numpy() + + # Convert -1.0 (no hit) to infinity for consistency with PyBullet + depth = depth.copy() + depth[depth < 0] = np.inf + + return depth.astype(np.float32) + + def _get_color_image(self) -> np.ndarray: + """Get RGB image from ViewerGL if available.""" + if self._viewer_manager is None or not getattr(self._viewer_manager, '_gui_enabled', False): + # Return black placeholder in headless mode + return np.zeros((self.height, self.width, 3), dtype=np.uint8) + + viewer = getattr(self._viewer_manager, 'viewer', None) + if viewer is None or not hasattr(viewer, 'get_frame'): + return np.zeros((self.height, self.width, 3), dtype=np.uint8) + + try: + # Get frame from ViewerGL (returns Warp array) + frame = viewer.get_frame() + if frame is not None: + rgb = frame.numpy() + # Resize if dimensions don't match + if rgb.shape[0] != self.height or rgb.shape[1] != self.width: + # Simple resize by cropping/padding or using scipy + try: + from scipy.ndimage import zoom + scale_h = self.height / rgb.shape[0] + scale_w = self.width / rgb.shape[1] + rgb = zoom(rgb, (scale_h, scale_w, 1), order=1).astype(np.uint8) + except ImportError: + # Fall back to simple resize + rgb = rgb[:self.height, :self.width, :] + return rgb + except Exception as e: + log.warning(f"Newton camera '{self.component_name}': Failed to get frame from viewer: {e}") + + return np.zeros((self.height, self.width, 3), dtype=np.uint8) + + def shutdown_driver(self) -> None: + """Clean up resources.""" + self._raycast_sensor = None + self._model = None + self._viewer_manager = None + super().shutdown_driver() diff --git a/ark/system/newton/newton_lidar_driver.py b/ark/system/newton/newton_lidar_driver.py new file mode 100644 index 0000000..a8f0f48 --- /dev/null +++ b/ark/system/newton/newton_lidar_driver.py @@ -0,0 +1,262 @@ +"""Newton LiDAR driver using RaycastSensor for range measurements.""" + +from __future__ import annotations + +import math +from enum import Enum +from typing import Any, Callable, Dict, Optional + +import numpy as np +from scipy.spatial.transform import Rotation as R + +from ark.tools.log import log +from ark.system.driver.sensor_driver import LiDARDriver + +try: + from newton.sensors import RaycastSensor + RAYCAST_AVAILABLE = True +except ImportError: + log.warning("Newton RaycastSensor not available - LiDAR scans will be placeholders") + RAYCAST_AVAILABLE = False + + +class LiDARType(Enum): + """Supported LiDAR mounting types.""" + + FIXED = "fixed" + ATTACHED = "attached" + + +class NewtonLiDARDriver(LiDARDriver): + """LiDAR driver using Newton's RaycastSensor for range measurements. + + Simulates a 2D planar LiDAR by using RaycastSensor with height=1. + The sensor casts rays horizontally in a fan pattern and returns + range measurements. + + Supports two mounting modes: + - FIXED: Static position and orientation + - ATTACHED: LiDAR attached to a robot body with offset transform + """ + + def __init__( + self, + component_name: str, + component_config: Dict[str, Any], + attached_body_id: Optional[int] = None, + ) -> None: + super().__init__(component_name, component_config, True) + + sim_cfg = component_config.get("sim_config", {}) + + # LiDAR parameters + self.num_rays = int(sim_cfg.get("num_rays", 360)) + self.linear_range = float(sim_cfg.get("linear_range", 10.0)) + self.angular_range = float(sim_cfg.get("angular_range", 360.0)) # degrees + + # Validate parameters + if self.num_rays <= 0: + raise ValueError(f"num_rays must be > 0 for {component_name}") + if self.linear_range <= 0: + raise ValueError(f"linear_range must be > 0 for {component_name}") + if not (0 < self.angular_range <= 360): + raise ValueError(f"angular_range must be > 0 and <= 360 for {component_name}") + + # Determine LiDAR type + lidar_type_str = sim_cfg.get("lidar_type", "fixed") + try: + self.lidar_type = LiDARType(lidar_type_str.lower()) + except ValueError: + log.warning(f"Newton LiDAR '{component_name}': Unknown lidar_type '{lidar_type_str}', defaulting to FIXED") + self.lidar_type = LiDARType.FIXED + + # Pre-compute angles array (in radians, relative to sensor frame) + # Angles are centered around 0, spanning from -angular_range/2 to +angular_range/2 + angular_range_rad = math.radians(self.angular_range) + if self.angular_range == 360: + # Don't repeat the same angle + self._angles = np.linspace(-angular_range_rad / 2, angular_range_rad / 2, self.num_rays, endpoint=False) + else: + self._angles = np.linspace(-angular_range_rad / 2, angular_range_rad / 2, self.num_rays, endpoint=True) + + # Fixed LiDAR parameters + if self.lidar_type == LiDARType.FIXED: + fix_cfg = sim_cfg.get("fix", {}) + self.current_position = np.array(fix_cfg.get("position", [0.0, 0.0, 0.0]), dtype=np.float32) + yaw_deg = fix_cfg.get("yaw", 0.0) + self.current_yaw = math.radians(yaw_deg) + + # Attached LiDAR parameters + else: + attach_cfg = sim_cfg.get("attach", {}) + self._parent_name = attach_cfg.get("parent_name") + self._parent_link = attach_cfg.get("parent_link") + self._offset_position = np.array(attach_cfg.get("offset_translation", [0.0, 0.0, 0.0]), dtype=np.float32) + self._offset_yaw = math.radians(attach_cfg.get("offset_yaw", 0.0)) + self.current_position = np.array([0.0, 0.0, 0.0], dtype=np.float32) + self.current_yaw = 0.0 + + # Runtime bindings + self._model = None + self._state_accessor: Callable[[], Any] = lambda: None + self._raycast_sensor: Optional[Any] = None + self._body_index: Optional[int] = attached_body_id + + log.info( + f"Newton LiDAR driver '{component_name}': Initialized " + f"(rays={self.num_rays}, range={self.linear_range}m, " + f"angular={self.angular_range}deg, type={self.lidar_type.value})" + ) + + def bind_runtime( + self, + model: Any, + state_accessor: Callable[[], Any], + ) -> None: + """Bind to Newton model after finalization. + + Args: + model: Finalized newton.Model + state_accessor: Callable returning current newton.State + """ + self._model = model + self._state_accessor = state_accessor + + # Resolve parent body index for ATTACHED mode + if self.lidar_type == LiDARType.ATTACHED and self._body_index is None: + if self._parent_name and hasattr(model, 'body_key'): + key_to_index = {name: idx for idx, name in enumerate(model.body_key)} + search_key = self._parent_link or self._parent_name + self._body_index = key_to_index.get(search_key) + + if self._body_index is None: + log.warning( + f"Newton LiDAR driver '{self.component_name}': " + f"Parent body '{search_key}' not found in model." + ) + + # Create RaycastSensor configured for planar scanning + # Use height=1 for 2D LiDAR, width=num_rays + if RAYCAST_AVAILABLE and self._model is not None: + try: + # Compute initial camera direction based on yaw + direction = np.array([ + math.cos(self.current_yaw), + math.sin(self.current_yaw), + 0.0 + ], dtype=np.float32) + + # Up vector is Z (planar LiDAR scans horizontally) + up = np.array([0.0, 0.0, 1.0], dtype=np.float32) + + # FOV in radians for horizontal scan + fov_rad = math.radians(self.angular_range) + + self._raycast_sensor = RaycastSensor( + model=self._model, + camera_position=tuple(self.current_position), + camera_direction=tuple(direction), + camera_up=tuple(up), + fov_radians=fov_rad, + width=self.num_rays, + height=1, # Single row for 2D LiDAR + max_distance=self.linear_range, + ) + log.ok(f"Newton LiDAR driver '{self.component_name}': RaycastSensor created") + except Exception as e: + log.warning(f"Newton LiDAR driver '{self.component_name}': Failed to create RaycastSensor: {e}") + self._raycast_sensor = None + + def _update_position(self) -> None: + """Update LiDAR position and orientation for ATTACHED mode.""" + if self.lidar_type != LiDARType.ATTACHED or self._body_index is None: + return + + state = self._state_accessor() + if state is None or state.body_q is None: + return + + # Get body transform + body_q = state.body_q.numpy() + if self._body_index >= len(body_q): + return + + body_transform = body_q[self._body_index] + body_pos = body_transform[:3] + body_quat = body_transform[3:7] # xyzw + + # Get body yaw + body_rot = R.from_quat(body_quat) + body_euler = body_rot.as_euler('xyz') + body_yaw = body_euler[2] + + # Apply offset + offset_pos_rotated = np.array([ + self._offset_position[0] * math.cos(body_yaw) - self._offset_position[1] * math.sin(body_yaw), + self._offset_position[0] * math.sin(body_yaw) + self._offset_position[1] * math.cos(body_yaw), + self._offset_position[2] + ], dtype=np.float32) + + self.current_position = (body_pos + offset_pos_rotated).astype(np.float32) + self.current_yaw = body_yaw + self._offset_yaw + + # Update RaycastSensor pose + if self._raycast_sensor is not None: + direction = np.array([ + math.cos(self.current_yaw), + math.sin(self.current_yaw), + 0.0 + ], dtype=np.float32) + up = np.array([0.0, 0.0, 1.0], dtype=np.float32) + + self._raycast_sensor.update_camera_pose( + position=tuple(self.current_position), + direction=tuple(direction), + up=tuple(up), + ) + + def get_scan(self) -> Dict[str, np.ndarray]: + """Get LiDAR scan data. + + Returns: + Dictionary with keys: + - "angles": 1D array of angles in radians (in LiDAR's reference frame) + - "ranges": 1D array of range values in meters (-1 for no hit) + """ + # Update position for attached LiDAR + if self.lidar_type == LiDARType.ATTACHED: + self._update_position() + + # Get range measurements + ranges = self._get_ranges() + + return { + "angles": self._angles.copy(), + "ranges": ranges, + } + + def _get_ranges(self) -> np.ndarray: + """Get range measurements from RaycastSensor.""" + if self._raycast_sensor is None: + # Return placeholder (all -1 = no hit) + return np.full(self.num_rays, -1.0, dtype=np.float32) + + state = self._state_accessor() + if state is None: + return np.full(self.num_rays, -1.0, dtype=np.float32) + + # Evaluate raycast + self._raycast_sensor.eval(state) + depth = self._raycast_sensor.get_depth_image_numpy() + + # depth is shape (1, num_rays), flatten to (num_rays,) + ranges = depth.flatten().astype(np.float32) + + # RaycastSensor uses -1.0 for no hit, which matches expected LiDAR output + return ranges + + def shutdown_driver(self) -> None: + """Clean up resources.""" + self._raycast_sensor = None + self._model = None + super().shutdown_driver() diff --git a/ark/system/newton/newton_multibody.py b/ark/system/newton/newton_multibody.py new file mode 100644 index 0000000..0252703 --- /dev/null +++ b/ark/system/newton/newton_multibody.py @@ -0,0 +1,212 @@ +"""Newton multi-body component integration.""" + +from __future__ import annotations + +from enum import Enum +from pathlib import Path +from typing import Any, Dict, Sequence + +import newton +import numpy as np +import warp as wp + +from ark.tools.log import log +from ark.system.component.sim_component import SimComponent +from arktypes import flag_t, rigid_body_state_t + + +def _as_transform(position: Sequence[float], orientation: Sequence[float]) -> wp.transform: + pos = wp.vec3(*(float(v) for v in (position[:3] if len(position) >= 3 else [0.0, 0.0, 0.0]))) + if len(orientation) == 4: + quat = wp.quat(float(orientation[0]), float(orientation[1]), float(orientation[2]), float(orientation[3])) + elif len(orientation) == 3: + quat = wp.quat_from_euler(wp.vec3(float(orientation[0]), float(orientation[1]), float(orientation[2])), 0, 1, 2) + else: + quat = wp.quat_identity() + return wp.transform(pos, quat) + + +class SourceType(Enum): + URDF = "urdf" + PRIMITIVE = "primitive" + GROUND_PLANE = "ground_plane" + + +class NewtonMultiBody(SimComponent): + """Simulation object backed by Newton physics.""" + + def __init__( + self, + name: str, + builder: newton.ModelBuilder, + global_config: Dict[str, Any] | None = None, + ) -> None: + super().__init__(name, global_config) + + self.builder = builder + self._model: newton.Model | None = None + self._state_accessor = lambda: None + self._body_names: list[str] = [] + self._body_indices: list[int] = [] # Pre-stored body indices (stable across finalization) + + source_str = self.config.get("source", "primitive").lower() + source = SourceType(source_str) if source_str in SourceType._value2member_map_ else SourceType.PRIMITIVE + + if source is SourceType.URDF: + self._load_urdf() + elif source is SourceType.PRIMITIVE: + self._load_primitive() + elif source is SourceType.GROUND_PLANE: + self._load_ground_plane() + else: + log.warning(f"Newton multi-body '{self.name}': unsupported source '{source_str}'.") + + self.publisher_name = f"{self.name}/ground_truth/sim" + if self.publish_ground_truth: + self.component_channels_init({self.publisher_name: rigid_body_state_t}) + + def _load_urdf(self) -> None: + urdf_path = Path(self.config.get("urdf_path", "")) + if not urdf_path.is_absolute(): + base_dir = Path(self.config.get("class_dir", "")) + if base_dir.is_file(): + base_dir = base_dir.parent + urdf_path = base_dir / urdf_path + + # Validate URDF file exists + if not urdf_path.exists(): + log.error(f"Newton multi-body '{self.name}': URDF file not found at '{urdf_path}'") + log.error(f" Full path: {urdf_path.resolve()}") + return + + pre_body_count = self.builder.body_count + try: + self.builder.add_urdf(str(urdf_path), floating=False, enable_self_collisions=False) + log.ok(f"Newton multi-body '{self.name}': Loaded URDF '{urdf_path.name}' successfully") + except Exception as exc: # noqa: BLE001 + log.error(f"Newton multi-body '{self.name}': Failed to load URDF '{urdf_path}': {exc}") + return + + self._body_names = list(self.builder.body_key[pre_body_count : self.builder.body_count]) + log.info(f"Newton multi-body '{self.name}': Loaded {len(self._body_names)} bodies from URDF") + + def _load_primitive(self) -> None: + pos = self.config.get("base_position", [0.0, 0.0, 0.0]) + orn = self.config.get("base_orientation", [0.0, 0.0, 0.0, 1.0]) + mass = float(self.config.get("mass", 0.0)) + transform = _as_transform(pos, orn) + + pre_body_count = self.builder.body_count + body_idx = self.builder.add_body(xform=transform, mass=mass, key=self.name) + size = self.config.get("size", [1.0, 1.0, 1.0]) + + # Validate size array + if not isinstance(size, (list, tuple)) or len(size) != 3: + log.warning( + f"Newton multi-body '{self.name}': 'size' must be a list of 3 elements, " + f"got {size}. Using default [1.0, 1.0, 1.0]" + ) + size = [1.0, 1.0, 1.0] + + hx, hy, hz = [abs(float(component)) * 0.5 for component in size] + self.builder.add_shape_box(body_idx, hx=hx, hy=hy, hz=hz) + + # Store the body index directly (stable across finalization) + self._body_indices = [body_idx] + self._body_names = [self.name] # Keep for logging + + log.info( + f"Newton multi-body '{self.name}': Created primitive box " + f"(size=[{hx*2:.3f}, {hy*2:.3f}, {hz*2:.3f}], mass={mass:.3f}), " + f"body_idx={body_idx}" + ) + + def _load_ground_plane(self) -> None: + """Add a ground plane collision surface to the scene.""" + self.builder.add_ground_plane() + # Ground plane doesn't create a named body, so no body_names to track + self._body_names = [] + log.info(f"Newton multi-body '{self.name}': Added ground plane") + + def bind_runtime(self, model: newton.Model, state_accessor) -> None: + self._model = model + self._state_accessor = state_accessor + + # If body indices were already stored during loading (primitives), use them directly + if self._body_indices: + log.info( + f"Newton multi-body '{self.name}': " + f"Using pre-stored body indices: {self._body_indices}" + ) + return + + # Fallback: Resolve indices from body names (for URDF-loaded bodies) + if not self._body_names: + log.debug(f"Newton multi-body '{self.name}': No bodies to bind (e.g., ground plane)") + return + + key_to_index = {name: idx for idx, name in enumerate(model.body_key)} + self._body_indices = [key_to_index[name] for name in self._body_names if name in key_to_index] + + # Warn if bodies weren't found in model + missing_bodies = [name for name in self._body_names if name not in key_to_index] + if missing_bodies: + log.warning( + f"Newton multi-body '{self.name}': {len(missing_bodies)} body/bodies " + f"not found in finalized model: {missing_bodies}. " + f"This may happen if collapse_fixed_joints renamed bodies." + ) + + log.info( + f"Newton multi-body '{self.name}': " + f"Bound {len(self._body_indices)}/{len(self._body_names)} bodies to model" + ) + + def get_object_data(self) -> Dict[str, Any]: + state = self._state_accessor() + if state is None: + log.warning(f"Newton multi-body '{self.name}': State accessor returned None") + return { + "name": self.name, + "position": [0.0, 0.0, 0.0], + "orientation": [0.0, 0.0, 0.0, 1.0], + "lin_velocity": [0.0, 0.0, 0.0], + "ang_velocity": [0.0, 0.0, 0.0], + } + if not self._body_indices: + # No bodies to query (e.g., ground plane or unbound object) + return { + "name": self.name, + "position": [0.0, 0.0, 0.0], + "orientation": [0.0, 0.0, 0.0, 1.0], + "lin_velocity": [0.0, 0.0, 0.0], + "ang_velocity": [0.0, 0.0, 0.0], + } + + body_idx = self._body_indices[0] + pose = state.body_q.numpy()[body_idx] + vel = state.body_qd.numpy()[body_idx] if state.body_qd is not None else np.zeros(6) + position = pose[:3].tolist() + orientation = pose[3:].tolist() + linear_velocity = vel[:3].tolist() + angular_velocity = vel[3:].tolist() + return { + "name": self.name, + "position": position, + "orientation": orientation, + "lin_velocity": linear_velocity, + "ang_velocity": angular_velocity, + } + + def pack_data(self, data_dict: Dict[str, Any]) -> Dict[str, rigid_body_state_t]: + msg = rigid_body_state_t() + msg.name = data_dict["name"] + msg.position = data_dict["position"] + msg.orientation = data_dict["orientation"] + msg.lin_velocity = data_dict["lin_velocity"] + msg.ang_velocity = data_dict["ang_velocity"] + return {self.publisher_name: msg} + + def reset_component(self, channel, msg) -> flag_t: + log.warning(f"Reset not implemented for Newton multi-body '{self.name}'.") + return flag_t() diff --git a/ark/system/newton/newton_robot_driver.py b/ark/system/newton/newton_robot_driver.py new file mode 100644 index 0000000..fd56068 --- /dev/null +++ b/ark/system/newton/newton_robot_driver.py @@ -0,0 +1,516 @@ +"""Newton robot driver bridging ARK control with the Newton simulator.""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any, Callable, Dict, Iterable, Sequence + +import newton +import numpy as np +import warp as wp +from newton.selection import ArticulationView + +from ark.tools.log import log +from ark.system.driver.robot_driver import ControlType, SimRobotDriver + + +def _as_quaternion(values: Sequence[float]) -> wp.quat: + if len(values) == 4: + return wp.quat(float(values[0]), float(values[1]), float(values[2]), float(values[3])) + if len(values) == 3: + return wp.quat_from_euler(wp.vec3(float(values[0]), float(values[1]), float(values[2])), 0, 1, 2) + return wp.quat_identity() + + +def _as_transform(position: Sequence[float], orientation: Sequence[float]) -> wp.transform: + pos = wp.vec3(float(position[0]), float(position[1]), float(position[2])) if len(position) >= 3 else wp.vec3() + quat = _as_quaternion(orientation) + return wp.transform(pos, quat) + + +class NewtonRobotDriver(SimRobotDriver): + """Driver that exposes Newton articulation controls to ARK.""" + + def __init__( + self, + component_name: str, + component_config: Dict[str, Any], + builder: newton.ModelBuilder, + ) -> None: + super().__init__(component_name, component_config, True) + + self.builder = builder + self._model: newton.Model | None = None + self._control: newton.Control | None = None + self._state_accessor: Callable[[], newton.State] = lambda: None + self._dt: float = 0.0 + + self._joint_names: list[str] = [] + self._joint_index_map: dict[str, int] = {} + self._body_names: list[str] = [] + + self._joint_q_start: np.ndarray | None = None + self._joint_qd_start: np.ndarray | None = None + self._joint_dof_dim: np.ndarray | None = None + + # ArticulationView for proper Newton control API (UR10 pattern) + self._articulation_view: ArticulationView | None = None + self._control_handle: wp.array | None = None + + self._last_commanded_torque: dict[str, float] = {} + self._torque_groups = { + name + for name, cfg in self.config.get("joint_groups", {}).items() + if cfg.get("control_mode") == ControlType.TORQUE.value + } + + self.base_position = self.config.get("base_position", [0.0, 0.0, 0.0]) + self.base_orientation = self.config.get("base_orientation", [0.0, 0.0, 0.0, 1.0]) + self.initial_configuration = list(self.config.get("initial_configuration", [])) + + self._load_into_builder() + + def _resolve_path(self, key: str) -> Path: + raw = self.config.get(key) + if raw is None: + raise ValueError(f"Newton robot driver requires '{key}' in config.") + path = Path(raw) + class_dir = Path(self.config.get("class_dir", "")) + if class_dir.is_file(): + class_dir = class_dir.parent + if not path.is_absolute(): + path = class_dir / path + return path + + def _load_into_builder(self) -> None: + pre_joint_count = self.builder.joint_count + pre_body_count = self.builder.body_count + + urdf_path = self._resolve_path("urdf_path") + + # Validate URDF file exists + if not urdf_path.exists(): + log.error(f"Newton robot driver: URDF file not found at '{urdf_path}'") + log.error(f" Full path: {urdf_path.resolve()}") + raise FileNotFoundError(f"URDF file not found: {urdf_path}") + + xform = _as_transform(self.base_position, self.base_orientation) + floating = bool(self.config.get("floating", False)) + enable_self_collisions = bool(self.config.get("enable_self_collisions", False)) + collapse_fixed = bool(self.config.get("collapse_fixed_joints", False)) + + # Store pre-load joint/body counts to identify which joints were added + pre_joint_count = self.builder.joint_count + pre_joint_dof_count = len(self.builder.joint_target_ke) + + try: + self.builder.add_urdf( + str(urdf_path), + xform=xform, + floating=floating, + enable_self_collisions=enable_self_collisions, + collapse_fixed_joints=collapse_fixed, + ) + log.ok(f"Newton robot driver: Loaded URDF '{urdf_path.name}' successfully") + except Exception as exc: # noqa: BLE001 + log.error(f"Newton robot driver: Failed to load URDF '{urdf_path}': {exc}") + raise + + # CRITICAL FIX: Apply joint defaults to URDF joints via post-processing + # Newton's add_urdf() ignores default_joint_cfg, so we must manually apply it + # to all joints that were just loaded (from pre_joint_dof_count to current) + post_joint_dof_count = len(self.builder.joint_target_ke) + num_new_dofs = post_joint_dof_count - pre_joint_dof_count + + if num_new_dofs > 0: + # Get defaults from builder + default_cfg = self.builder.default_joint_cfg + + # CRITICAL: Apply initial_configuration to builder.joint_q BEFORE finalize + # This ensures the model starts in the correct configuration, avoiding + # the "explosion on load" problem where robot tries to jump from zero to config + new_joint_count = self.builder.joint_count - pre_joint_count + if self.initial_configuration: + for j_idx in range(new_joint_count): + if j_idx >= len(self.initial_configuration): + break + # Get DOF range for this joint + joint_idx = pre_joint_count + j_idx + q_start = int(self.builder.joint_q_start[joint_idx]) + q_end = int(self.builder.joint_q_start[joint_idx + 1]) if (joint_idx + 1) < len(self.builder.joint_q_start) else len(self.builder.joint_q) + width = q_end - q_start + if width <= 0: + continue + target = self.initial_configuration[j_idx] + values = target if isinstance(target, Sequence) else [target] * width + for offset in range(min(width, len(values))): + self.builder.joint_q[q_start + offset] = float(values[offset]) + log.info( + f"Newton robot driver: Applied initial_configuration to builder.joint_q " + f"({len(self.initial_configuration)} values)" + ) + + # Apply to all newly loaded joint DOFs + for i in range(pre_joint_dof_count, post_joint_dof_count): + self.builder.joint_dof_mode[i] = default_cfg.mode + self.builder.joint_target_ke[i] = default_cfg.target_ke + self.builder.joint_target_kd[i] = default_cfg.target_kd + self.builder.joint_limit_ke[i] = default_cfg.limit_ke + self.builder.joint_limit_kd[i] = default_cfg.limit_kd + self.builder.joint_armature[i] = default_cfg.armature + + # CRITICAL: Initialize joint_target to match joint_q (which now has initial config) + # Without this, PD controller has no target to track! + # This follows Newton's own examples (see example_basic_urdf.py:72) + self.builder.joint_target[i] = self.builder.joint_q[i] + + log.ok( + f"Newton robot driver: Applied joint defaults to {num_new_dofs} DOFs " + f"(ke={default_cfg.target_ke}, kd={default_cfg.target_kd}, mode={default_cfg.mode})" + ) + + self._joint_names = list( + self.builder.joint_key[pre_joint_count : self.builder.joint_count] + ) + self._body_names = list( + self.builder.body_key[pre_body_count : self.builder.body_count] + ) + + def bind_runtime( + self, + model: newton.Model, + control: newton.Control, + state_accessor: Callable[[], newton.State], + dt: float, + ) -> None: + self._model = model + self._control = control + self._state_accessor = state_accessor + self._dt = dt + + key_to_index = {name: idx for idx, name in enumerate(model.joint_key)} + missing_joints = [] + for name in self._joint_names: + if name in key_to_index: + self._joint_index_map[name] = key_to_index[name] + else: + missing_joints.append(name) + + # Warn about joint mapping issues + if missing_joints: + log.warning( + f"Newton robot driver '{self.component_name}': " + f"{len(missing_joints)} joint(s) from URDF not found in model: {missing_joints}" + ) + + log.info( + f"Newton robot driver '{self.component_name}': " + f"Mapped {len(self._joint_index_map)}/{len(self._joint_names)} joints to model" + ) + + self._joint_q_start = model.joint_q_start.numpy() + self._joint_qd_start = model.joint_qd_start.numpy() + self._joint_dof_dim = model.joint_dof_dim.numpy() + + # Create ArticulationView for proper Newton control API (UR10 example pattern) + # This is CRITICAL for TARGET_POSITION mode to work correctly + try: + # Use component name as pattern (e.g., "panda" matches bodies with "panda" in name) + pattern = f"*{self.component_name}*" + self._articulation_view = ArticulationView( + model, + pattern, + exclude_joint_types=[newton.JointType.FREE, newton.JointType.DISTANCE] + ) + + # Get control handle for joint targets + self._control_handle = self._articulation_view.get_attribute("joint_target", control) + + log.ok( + f"Newton robot driver '{self.component_name}': " + f"Created ArticulationView (pattern='{pattern}', count={self._articulation_view.count}, " + f"dofs={self._control_handle.shape if self._control_handle is not None else 'N/A'})" + ) + except Exception as exc: + log.error( + f"Newton robot driver '{self.component_name}': " + f"Failed to create ArticulationView: {exc}. Falling back to direct control." + ) + self._articulation_view = None + self._control_handle = None + + # NOTE: _apply_initial_configuration() is NOT called here because initial config + # is already applied to builder.joint_q and builder.joint_target in _load_into_builder() + # BEFORE finalize(). After model.state(), state.joint_q inherits these values, and + # the backend's eval_fk with model.joint_q correctly computes body transforms. + # Calling _apply_initial_configuration() here would be redundant and uses state.joint_q + # for eval_fk instead of model.joint_q, causing inconsistency. + # The method is kept for sim_reset() which needs to re-apply config at runtime. + + def _apply_initial_configuration(self) -> None: + """Apply initial joint configuration to runtime state and control. + + CRITICAL: This writes to state.joint_q (runtime simulation state), + NOT model.joint_q (static template). The physics solver reads/writes + state.joint_q, so that's where initial config must be applied. + """ + if not self.initial_configuration or not self._joint_names: + log.info(f"Newton robot driver '{self.component_name}': No initial configuration to apply") + return + + state = self._state_accessor() + if state is None or state.joint_q is None: + log.warning( + f"Newton robot driver '{self.component_name}': " + f"Cannot apply initial config - state not available yet" + ) + return + + # Validate configuration length + if len(self.initial_configuration) != len(self._joint_names): + log.warning( + f"Newton robot driver '{self.component_name}': " + f"initial_configuration length ({len(self.initial_configuration)}) != " + f"joint count ({len(self._joint_names)}). Using available values." + ) + + # Get state arrays as numpy (mutable copies) + joint_q_np = state.joint_q.numpy().copy() + joint_qd_np = state.joint_qd.numpy().copy() + + # Apply initial positions to STATE (not model!) + for idx, joint_name in enumerate(self._joint_names): + if idx >= len(self.initial_configuration): + break + + model_idx = self._joint_index_map.get(joint_name) + if model_idx is None: + continue + + if model_idx >= len(self._joint_q_start) - 1: + continue + + start = int(self._joint_q_start[model_idx]) + end = int(self._joint_q_start[model_idx + 1]) + width = end - start + if width <= 0: + continue + + target = self.initial_configuration[idx] + values = target if isinstance(target, Sequence) else [target] * width + + # Write to state.joint_q + for offset in range(width): + if offset < len(values): + joint_q_np[start + offset] = float(values[offset]) + + # Zero out velocities in state.joint_qd + vel_start = int(self._joint_qd_start[model_idx]) + vel_end = int(self._joint_qd_start[model_idx + 1]) + for offset in range(vel_end - vel_start): + joint_qd_np[vel_start + offset] = 0.0 + + # Set control target + self._write_joint_target(joint_name, target) + + # Write modified arrays back to state + state.joint_q.assign(joint_q_np) + state.joint_qd.assign(joint_qd_np) + + # Update forward kinematics with STATE arrays + try: + newton.eval_fk( + self._model, + state.joint_q, # KEY FIX: Use state.joint_q, not model.joint_q + state.joint_qd, + state + ) + log.ok( + f"Newton robot driver '{self.component_name}': " + f"Applied initial configuration to runtime state and updated FK" + ) + except Exception as exc: # noqa: BLE001 + log.error( + f"Newton robot driver '{self.component_name}': " + f"Failed to evaluate FK after initial configuration: {exc}" + ) + + def _write_joint_target(self, joint_name: str, value: float | Sequence[float]) -> None: + """Write joint target using ArticulationView API (UR10 example pattern). + + This follows Newton's official pattern from example_robot_ur10.py which uses + ArticulationView.set_attribute() instead of direct array assignment. + """ + if self._control is None or self._control.joint_target is None: + log.warning(f"Newton robot driver: Cannot write joint target, control not initialized") + return + idx = self._joint_index_map.get(joint_name) + if idx is None: + return # Joint not mapped, already warned in bind_runtime + # CRITICAL: Use joint_q_start (position indices) not joint_qd_start (velocity indices) + # Joint targets are POSITION targets, so they must use position DOF indices + if self._joint_q_start is None: + log.warning(f"Newton robot driver: joint_q_start array not initialized") + return + + # Validate array bounds + if idx >= len(self._joint_q_start) - 1: + log.error( + f"Newton robot driver: Joint index {idx} out of bounds for joint_q_start " + f"(size {len(self._joint_q_start)})" + ) + return + + start = int(self._joint_q_start[idx]) + end = int(self._joint_q_start[idx + 1]) + width = end - start + if width <= 0: + return + values = value if isinstance(value, Sequence) else [value] * width + + # APPROACH 1: ArticulationView API (if available) + if self._articulation_view is not None and self._control_handle is not None: + # Get current control handle values (shape: [num_envs, num_dofs]) + handle_np = self._control_handle.numpy().copy() + + # Single environment, so index [0, dof] + env_idx = 0 + for offset in range(width): + if offset < len(values): + handle_np[env_idx, start + offset] = float(values[offset]) + + # Write back via ArticulationView + self._control_handle.assign(handle_np) + self._articulation_view.set_attribute("joint_target", self._control, self._control_handle) + + # APPROACH 2: Direct assignment fallback (if ArticulationView failed) + else: + joint_target_np = self._control.joint_target.numpy().copy() + for offset in range(width): + if offset < len(values): + joint_target_np[start + offset] = float(values[offset]) + self._control.joint_target.assign(joint_target_np) + + if not hasattr(self, '_direct_write_warned'): + self._direct_write_warned = True + log.warning(f"Newton robot driver: Using direct .assign() (ArticulationView not available)") + + def _write_joint_force(self, joint_name: str, value: float | Sequence[float]) -> None: + if self._control is None or self._control.joint_f is None: + log.warning(f"Newton robot driver: Cannot write joint force, control not initialized") + return + idx = self._joint_index_map.get(joint_name) + if idx is None: + return # Joint not mapped, already warned in bind_runtime + if self._joint_qd_start is None: + log.warning(f"Newton robot driver: joint_qd_start array not initialized") + return + + # Validate array bounds + if idx >= len(self._joint_qd_start) - 1: + log.error( + f"Newton robot driver: Joint index {idx} out of bounds for joint_qd_start " + f"(size {len(self._joint_qd_start)})" + ) + return + + start = int(self._joint_qd_start[idx]) + end = int(self._joint_qd_start[idx + 1]) + width = end - start + if width <= 0: + return + values = value if isinstance(value, Sequence) else [value] * width + # Get numpy copy, modify, and assign back to device + joint_f_np = self._control.joint_f.numpy().copy() + for offset in range(width): + if offset < len(values): + joint_f_np[start + offset] = float(values[offset]) + self._control.joint_f.assign(joint_f_np) + self._last_commanded_torque[joint_name] = float(values[0]) + + def check_torque_status(self) -> bool: + return bool(self._torque_groups) + + def _gather_joint_values( + self, + joints: Iterable[str], + array_getter: Callable[[int], float | Sequence[float]], + ) -> Dict[str, float | Sequence[float]]: + result: Dict[str, float | Sequence[float]] = {} + for joint in joints: + idx = self._joint_index_map.get(joint) + if idx is None: + continue + result[joint] = array_getter(idx) + return result + + def pass_joint_positions(self, joints: list[str]) -> dict[str, float | Sequence[float]]: + state = self._state_accessor() + if state is None or state.joint_q is None or self._joint_q_start is None: + return {joint: 0.0 for joint in joints} + + # Convert Warp array to numpy once for efficient indexing + joint_q_np = state.joint_q.numpy() + + def getter(idx: int) -> float | Sequence[float]: + start = int(self._joint_q_start[idx]) + end = int(self._joint_q_start[idx + 1]) + width = end - start + if width <= 0: + return 0.0 + if width == 1: + return float(joint_q_np[start]) + return [float(joint_q_np[start + k]) for k in range(width)] + + return self._gather_joint_values(joints, getter) + + def pass_joint_velocities(self, joints: list[str]) -> dict[str, float | Sequence[float]]: + state = self._state_accessor() + if state is None or state.joint_qd is None or self._joint_qd_start is None: + return {joint: 0.0 for joint in joints} + + # Convert Warp array to numpy once for efficient indexing + joint_qd_np = state.joint_qd.numpy() + + def getter(idx: int) -> float | Sequence[float]: + start = int(self._joint_qd_start[idx]) + end = int(self._joint_qd_start[idx + 1]) + width = end - start + if width <= 0: + return 0.0 + if width == 1: + return float(joint_qd_np[start]) + return [float(joint_qd_np[start + k]) for k in range(width)] + + return self._gather_joint_values(joints, getter) + + def pass_joint_efforts(self, joints: list[str]) -> dict[str, float]: + return {joint: self._last_commanded_torque.get(joint, 0.0) for joint in joints} + + def pass_joint_group_control_cmd( + self, + control_mode: str, + cmd: dict[str, float | Sequence[float]], + **_: Any, + ) -> None: + mode = ControlType(control_mode.lower()) + if mode in {ControlType.POSITION, ControlType.VELOCITY}: + for joint, value in cmd.items(): + self._write_joint_target(joint, value) + elif mode == ControlType.TORQUE: + for joint, value in cmd.items(): + self._write_joint_force(joint, value) + else: + log.warning(f"Newton robot driver: unsupported control mode '{control_mode}'.") + + def sim_reset( + self, + base_pos: list[float], + base_orn: list[float], + init_pos: list[float], + ) -> None: + self.base_position = base_pos + self.base_orientation = base_orn + if init_pos: + self.initial_configuration = list(init_pos) + self._apply_initial_configuration() diff --git a/ark/system/newton/newton_viewer.py b/ark/system/newton/newton_viewer.py new file mode 100644 index 0000000..8d83dea --- /dev/null +++ b/ark/system/newton/newton_viewer.py @@ -0,0 +1,206 @@ +"""Newton viewer manager for ARK framework. + +This module encapsulates all GUI/visualization logic for the Newton backend, +keeping the physics backend clean and modular. +""" + +from __future__ import annotations + +import os +from typing import Any, Optional + +import newton +import warp as wp + +from ark.tools.log import log + + +class NewtonViewerManager: + """Manages Newton viewer lifecycle and rendering. + + This class handles all visualization concerns for the Newton backend, + allowing the physics simulation code to remain minimal and focused. + The viewer can be completely disabled by setting connection_mode to "DIRECT". + """ + + def __init__( + self, + sim_config: dict[str, Any], + model: newton.Model, + ) -> None: + """Initialize Newton viewer based on configuration. + + Args: + sim_config: Simulator configuration dictionary containing: + - connection_mode: "GUI" for interactive viewer, "DIRECT" for headless + - viewer_width: Window width in pixels (default: 1280) + - viewer_height: Window height in pixels (default: 800) + - viewer_vsync: Enable vsync (default: False) + - show_contacts: Show contact forces (default: True) + - mp4: Optional path to record MP4 video + model: Finalized Newton physics model + """ + from newton.viewer import ViewerGL, ViewerNull + + self.model = model + self.sim_config = sim_config + self.viewer: Optional[newton.viewer.ViewerBase] = None + self._gui_enabled = False + + # Get configuration parameters + connection_mode = sim_config.get("connection_mode", "DIRECT").upper() + viewer_width = int(sim_config.get("viewer_width", 1280)) + viewer_height = int(sim_config.get("viewer_height", 800)) + viewer_vsync = bool(sim_config.get("viewer_vsync", False)) + show_contacts = bool(sim_config.get("show_contacts", True)) + + # Check for GUI mode + if connection_mode == "GUI": + # Check if DISPLAY is available (for X11-based systems) + display = os.environ.get("DISPLAY") + headless = display in (None, "") + + if headless: + log.warning( + "Newton viewer: DISPLAY environment variable not set, " + "GUI mode unavailable. Falling back to headless mode." + ) + + try: + if headless: + # Skip ViewerGL if we know display is unavailable + raise RuntimeError("DISPLAY unavailable") + + # Create interactive OpenGL viewer + self.viewer = ViewerGL( + width=viewer_width, + height=viewer_height, + vsync=viewer_vsync, + headless=False, + ) + self.viewer.set_model(model) + + # Position camera to view scene (matches standalone test) + # Camera should be positioned to see robot at origin + self.viewer.set_camera( + pos=wp.vec3(-3.0, 3.0, 2.0), # Back-left, elevated (matching standalone) + pitch=-20.0, # Look down slightly + yaw=225.0 # Face toward origin (matching standalone) + ) + + self.viewer.show_contacts = show_contacts + self.viewer.show_collision = True # Show collision geometry (robot meshes) + self.viewer.show_static = True # Show static shapes (ground plane) + self._gui_enabled = True + + log.ok( + f"Newton viewer: ViewerGL initialized successfully " + f"({viewer_width}x{viewer_height}, vsync={viewer_vsync}, " + f"contacts={show_contacts})" + ) + + except Exception as exc: # noqa: BLE001 + # Fall back to null viewer if ViewerGL fails + log.warning( + f"Newton viewer: ViewerGL initialization failed ({exc}). " + "Falling back to headless mode (ViewerNull)." + ) + self.viewer = ViewerNull() + self.viewer.set_model(model) + self._gui_enabled = False + log.info("Newton viewer: Running in headless mode (no visualization)") + + else: + # DIRECT mode - headless operation + self.viewer = ViewerNull() + self.viewer.set_model(model) + self._gui_enabled = False + log.info("Newton viewer: Running in DIRECT mode (headless, no visualization)") + + # Check for MP4 recording + mp4_path = sim_config.get("mp4") + if mp4_path: + log.info(f"Newton viewer: MP4 recording requested: {mp4_path}") + log.warning("Newton viewer: MP4 recording not yet implemented") + # TODO: Implement MP4 recording support + + def render( + self, + state: newton.State, + contacts: Optional[newton.Contacts], + sim_time: float, + ) -> None: + """Render current simulation state to the viewer. + + This method should be called once per simulation step (not per substep) + to update the visualization with the current physics state. + + Args: + state: Current Newton simulation state + contacts: Current contact information (optional) + sim_time: Current simulation time in seconds + """ + if self.viewer is None: + return + + try: + # Begin frame with current simulation time + self.viewer.begin_frame(sim_time) + + # Log current state (updates all bodies, joints, etc.) + self.viewer.log_state(state) + + # Optionally render contact forces + if contacts is not None and hasattr(self.viewer, "show_contacts"): + if self.viewer.show_contacts: + self.viewer.log_contacts(contacts, state) + + # Finalize frame + self.viewer.end_frame() + + except Exception as exc: # noqa: BLE001 + # Log error but don't crash simulation if rendering fails + log.warning(f"Newton viewer: Frame render failed: {exc}") + + def is_running(self) -> bool: + """Check if the viewer window is still open. + + Returns: + True if viewer is running (or in headless mode), False if window was closed. + """ + if self.viewer is None: + return True + + try: + return self.viewer.is_running() + except Exception: # noqa: BLE001, S110 + # If checking fails, assume viewer is still running to avoid premature exit + return True + + def shutdown(self) -> None: + """Clean up viewer resources. + + This should be called when the simulation is shutting down to properly + release OpenGL contexts and other viewer resources. + """ + if self.viewer is None: + return + + try: + # ViewerGL has a close() method + if hasattr(self.viewer, "close"): + self.viewer.close() + log.info("Newton viewer: Shutdown complete") + except Exception as exc: # noqa: BLE001 + log.warning(f"Newton viewer: Shutdown failed: {exc}") + finally: + self.viewer = None + + @property + def gui_enabled(self) -> bool: + """Check if interactive GUI is enabled. + + Returns: + True if ViewerGL is active, False if running headless (ViewerNull). + """ + return self._gui_enabled diff --git a/ark/system/newton/scene_adapters/__init__.py b/ark/system/newton/scene_adapters/__init__.py new file mode 100644 index 0000000..5ffc926 --- /dev/null +++ b/ark/system/newton/scene_adapters/__init__.py @@ -0,0 +1,27 @@ +"""Scene adapters for solver-specific geometry translation. + +This package provides adapters that translate solver-agnostic scene descriptions +(GeometryDescriptors) into solver-specific Newton API calls. + +Available adapters: +- base_adapter.SolverSceneAdapter: Abstract base class +- xpbd_adapter.XPBDAdapter: XPBD solver adapter (native ground plane) +- mujoco_adapter.MuJoCoAdapter: MuJoCo solver adapter (box-based ground) +- featherstone_adapter.FeatherstoneAdapter: Featherstone solver adapter +- semiimplicit_adapter.SemiImplicitAdapter: SemiImplicit solver adapter + +Usage: + >>> from ark.system.newton.scene_adapters import XPBDAdapter, MuJoCoAdapter + >>> adapter = XPBDAdapter(builder) + >>> adapter.adapt_ground_plane(descriptor) +""" + +from ark.system.newton.scene_adapters.base_adapter import SolverSceneAdapter +from ark.system.newton.scene_adapters.xpbd_adapter import XPBDAdapter +from ark.system.newton.scene_adapters.mujoco_adapter import MuJoCoAdapter + +__all__ = [ + "SolverSceneAdapter", + "XPBDAdapter", + "MuJoCoAdapter", +] diff --git a/ark/system/newton/scene_adapters/base_adapter.py b/ark/system/newton/scene_adapters/base_adapter.py new file mode 100644 index 0000000..b437323 --- /dev/null +++ b/ark/system/newton/scene_adapters/base_adapter.py @@ -0,0 +1,163 @@ +"""Base abstract class for solver-specific scene adapters. + +This module defines the interface that all solver adapters must implement. +Adapters translate solver-agnostic GeometryDescriptors into solver-specific +geometry creation API calls. + +The adapter pattern allows ARK to support multiple Newton physics solvers +(XPBD, MuJoCo, Featherstone, SemiImplicit) with the same user configuration. + +Example: + >>> from ark.system.newton.scene_adapters import XPBDAdapter, MuJoCoAdapter + >>> # Both adapters implement the same interface + >>> xpbd_adapter = XPBDAdapter(builder) + >>> mujoco_adapter = MuJoCoAdapter(builder) + >>> # But they handle ground planes differently + >>> xpbd_adapter.adapt_ground_plane(descriptor) # Uses native plane + >>> mujoco_adapter.adapt_ground_plane(descriptor) # Uses box geometry +""" + +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Any, Dict + +if TYPE_CHECKING: + import newton + from ark.system.newton.newton_builder import NewtonBuilder + from ark.system.newton.geometry_descriptors import GeometryDescriptor + + +class SolverSceneAdapter(ABC): + """Abstract base class for solver-specific scene adaptation. + + Each Newton physics solver (XPBD, MuJoCo, Featherstone, etc.) gets a + concrete adapter subclass that knows how to: + + 1. Translate generic geometry descriptions to solver-compatible forms + 2. Handle solver-specific constraints (e.g., MuJoCo ground plane limitation) + 3. Create and configure the appropriate solver instance + + The adapter pattern keeps solver-specific logic isolated, making it easy + to add new solvers without modifying existing code. + + Attributes: + builder: NewtonBuilder instance that provides access to Newton's + ModelBuilder for adding geometry + """ + + def __init__(self, builder: "NewtonBuilder"): + """Initialize adapter with a scene builder. + + Args: + builder: NewtonBuilder instance for scene construction + """ + self.builder = builder + + @property + @abstractmethod + def solver_name(self) -> str: + """Human-readable name of the solver (for logging). + + Returns: + Solver name (e.g., "XPBD", "MuJoCo", "Featherstone") + """ + pass + + @abstractmethod + def adapt_ground_plane(self, descriptor: "GeometryDescriptor") -> None: + """Add ground plane using solver-compatible geometry. + + Different solvers may require different implementations: + - XPBD, Featherstone: Can use builder.add_ground_plane() natively + - MuJoCo: Requires explicit box geometry as workaround + + This method must implement the appropriate strategy for this solver. + + Args: + descriptor: Unified ground plane description containing: + - parameters: size, thickness + - physics: friction, restitution + - metadata: name, semantic_type + + Example: + XPBD implementation: + >>> def adapt_ground_plane(self, descriptor): + ... self.builder.builder.add_ground_plane() + ... log.ok("XPBD: Added native ground plane") + + MuJoCo implementation: + >>> def adapt_ground_plane(self, descriptor): + ... thickness = descriptor.parameters["thickness"] + ... self.builder.builder.add_shape_box( + ... body=-1, hx=100, hy=thickness, hz=100, ... + ... ) + ... log.ok("MuJoCo: Added ground as box geometry") + """ + pass + + @abstractmethod + def create_solver( + self, + model: "newton.Model", + sim_cfg: Dict[str, Any] + ) -> "newton.solvers.SolverBase": + """Create and configure the solver instance. + + This method constructs the appropriate solver type with solver-specific + parameters extracted from the simulation configuration. + + Args: + model: Finalized Newton model + sim_cfg: Simulation configuration dictionary, typically containing: + - solver: Solver name (already known by adapter) + - solver_iterations: Number of solver iterations + - substeps: Number of substeps per frame + - newton_physics: Newton-specific parameters + + Returns: + Configured solver instance (SolverXPBD, SolverMuJoCo, etc.) + + Example: + XPBD implementation: + >>> def create_solver(self, model, sim_cfg): + ... iterations = int(sim_cfg.get("solver_iterations", 1)) + ... # XPBD needs at least 8 iterations for position control + ... iterations = max(iterations, 8) + ... return newton.solvers.SolverXPBD(model, iterations=iterations) + + MuJoCo implementation: + >>> def create_solver(self, model, sim_cfg): + ... # MuJoCo uses default parameters (20 iterations) + ... return newton.solvers.SolverMuJoCo(model) + """ + pass + + def validate_scene(self, global_config: Dict[str, Any]) -> list[str]: + """Check for solver-specific incompatibilities in scene configuration. + + This method is called early in initialization to detect configuration + issues before attempting to build the scene. It can return warnings + or error messages that help users fix their configs. + + Args: + global_config: Complete ARK global configuration dictionary + + Returns: + List of warning/error messages. Empty list if scene is valid. + Messages should be prefixed with "WARNING:" or "ERROR:" for severity. + + Example: + >>> def validate_scene(self, global_config): + ... issues = [] + ... sim_cfg = global_config.get("simulator", {}).get("config", {}) + ... iterations = sim_cfg.get("solver_iterations", 1) + ... substeps = sim_cfg.get("substeps", 1) + ... + ... if iterations * substeps < 20: + ... issues.append( + ... "WARNING: XPBD with TARGET_POSITION needs 20+ " + ... f"effective iterations. Current: {iterations * substeps}" + ... ) + ... return issues + """ + # Base implementation - subclasses can override to add solver-specific checks + return [] diff --git a/ark/system/newton/scene_adapters/mujoco_adapter.py b/ark/system/newton/scene_adapters/mujoco_adapter.py new file mode 100644 index 0000000..8075e4d --- /dev/null +++ b/ark/system/newton/scene_adapters/mujoco_adapter.py @@ -0,0 +1,180 @@ +"""MuJoCo solver adapter with box-based ground plane workaround. + +MuJoCo is a direct, global solver with quadratic convergence, excellent for +articulated robot control. However, it cannot use Newton's native ground_plane +API - it requires explicit box geometry as a workaround. + +Key characteristics of MuJoCo solver: +- Direct/Newton solver (quadratic convergence, <10 iterations needed) +- Excellent for rigid articulated systems (robots, manipulation) +- Validated gradient support (production-ready for RL training) +- Ground plane: Requires box geometry workaround (planes must be on body=-1) +- Default 20 iterations is sufficient (no tuning needed) + +Why the ground plane limitation exists: +MuJoCo converts Newton geometry to MJCF format. Newton's add_ground_plane() +creates a procedural infinite plane, but MuJoCo's MJCF requires explicit geom +elements. The converter can't translate procedural planes, causing a geometry +count mismatch error. Solution: Use a large, thin box as ground substitute. +""" + +from typing import TYPE_CHECKING, Any, Dict + +import warp as wp + +from ark.tools.log import log +from ark.system.newton.scene_adapters.base_adapter import SolverSceneAdapter + +if TYPE_CHECKING: + import newton + from ark.system.newton.geometry_descriptors import GeometryDescriptor + + +class MuJoCoAdapter(SolverSceneAdapter): + """Adapter for Newton's MuJoCo solver. + + MuJoCo requires a workaround for ground planes - we substitute a large, + thin box geometry placed at the world origin. This provides equivalent + collision behavior for most scenarios. + + Example: + >>> adapter = MuJoCoAdapter(builder) + >>> adapter.adapt_ground_plane(descriptor) + # Creates 100m × 100m × 0.02m box instead of infinite plane + >>> solver = adapter.create_solver(model, sim_cfg) + # Returns SolverMuJoCo with default parameters + """ + + @property + def solver_name(self) -> str: + """Return solver display name.""" + return "MuJoCo" + + def adapt_ground_plane(self, descriptor: "GeometryDescriptor") -> None: + """Add ground plane using box geometry workaround for MuJoCo. + + MuJoCo solver cannot handle builder.add_ground_plane() due to MJCF + conversion limitations. Instead, we create a large, thin box attached + to the world body (body=-1) positioned so its top surface is at z=0. + + This workaround is transparent to users and provides equivalent collision + behavior for most robotic scenarios (flat ground, no edge effects). + + Args: + descriptor: Ground plane description containing: + - parameters.thickness: Half-height of ground box (default 0.02m) + - parameters.size: Half-extent in x/z (default 100m) + - physics.friction: Ground friction coefficient + - physics.restitution: Ground restitution coefficient + + Implementation details: + Box dimensions: 200m × 0.04m × 200m (default) + Position: [0, -thickness, 0] (top surface at y=0 for up_axis=Y) + Body: -1 (world-fixed, infinite mass) + """ + import newton + + params = descriptor.parameters + physics = descriptor.physics + + # Extract ground properties from descriptor + thickness = params.get("thickness", 0.02) # Half-height (2cm default) + size = params.get("size", 100.0) # Half-extent in x/z (100m default) + friction = physics.get("friction", 0.8) + restitution = physics.get("restitution", 0.0) + + # Create shape configuration + ground_cfg = newton.ModelBuilder.ShapeConfig() + ground_cfg.density = 0.0 # Static body (infinite mass) + ground_cfg.ke = 1.0e5 # Contact stiffness + ground_cfg.kd = 1.0e3 # Contact damping + ground_cfg.mu = friction + ground_cfg.restitution = restitution + + # Add large flat box as ground substitute + # Position: top surface at origin (assuming up_axis is Y) + # Box center is at y = -thickness, so top face is at y = 0 + self.builder.builder.add_shape_box( + body=-1, # World body (static) + hx=size, # Half-extent in x (100m → 200m total) + hy=thickness, # Half-height in y (0.02m → 0.04m total) + hz=size, # Half-extent in z (100m → 200m total) + xform=wp.transform( + wp.vec3(0.0, -thickness, 0.0), # Position (center below origin) + wp.quat_identity() # No rotation + ), + cfg=ground_cfg + ) + + log.ok( + f"MuJoCo adapter: Added ground as box geometry " + f"({size*2:.0f}m × {thickness*2:.3f}m × {size*2:.0f}m, " + f"friction={friction:.2f})" + ) + + def create_solver( + self, + model: "newton.Model", + sim_cfg: Dict[str, Any] + ) -> "newton.solvers.SolverBase": + """Create MuJoCo solver with default parameters. + + MuJoCo solver uses robust defaults (20 iterations, CG solver, + implicitfast integrator) that work well for most robotic scenarios. + Unlike XPBD, MuJoCo doesn't require iteration tuning because its + direct solver achieves quadratic convergence. + + Args: + model: Finalized Newton model + sim_cfg: Simulation config (solver_iterations ignored - MuJoCo + uses its own defaults) + + Returns: + Configured SolverMuJoCo instance with: + - iterations: 20 (default, sufficient for robots) + - ls_iterations: 10 (line search for Newton solver) + - solver: "cg" (Conjugate Gradient, fast) + - integrator: "implicitfast" (recommended for control) + """ + import newton + + log.info("MuJoCo adapter: Creating solver (recommended for articulated robots)") + + # MuJoCo solver uses default parameters (20 iterations, CG solver) + # These are robust for most scenarios and don't require tuning + return newton.solvers.SolverMuJoCo(model) + + def validate_scene(self, global_config: Dict[str, Any]) -> list[str]: + """Validate MuJoCo-specific configuration requirements. + + MuJoCo is production-ready for rigid articulated systems but doesn't + support soft bodies or particles. This method checks for incompatible + features. + + Args: + global_config: Complete ARK configuration + + Returns: + List of warning/error messages (empty if scene is compatible) + """ + issues = super().validate_scene(global_config) + + # Check if user is trying to use XPBD-specific features + sim_cfg = global_config.get("simulator", {}).get("config", {}) + newton_cfg = sim_cfg.get("newton_physics", {}) + + # Soft bodies warning + if "soft_bodies" in newton_cfg: + issues.append( + "WARNING: MuJoCo solver doesn't support soft bodies " + "(XPBD feature). Use solver='xpbd' for deformables." + ) + + # Particles warning + if "particles" in newton_cfg: + issues.append( + "WARNING: MuJoCo solver doesn't support particle systems. " + "Use solver='xpbd' or 'implicitm pm' for particles." + ) + + return issues diff --git a/ark/system/newton/scene_adapters/xpbd_adapter.py b/ark/system/newton/scene_adapters/xpbd_adapter.py new file mode 100644 index 0000000..28eb4b7 --- /dev/null +++ b/ark/system/newton/scene_adapters/xpbd_adapter.py @@ -0,0 +1,157 @@ +"""XPBD solver adapter with native ground plane support. + +XPBD (Extended Position-Based Dynamics) is an iterative, position-based solver +that natively supports infinite ground planes. This adapter uses the standard +builder.add_ground_plane() API without workarounds. + +Key characteristics of XPBD solver: +- Iterative constraint solver (Gauss-Seidel-like) +- Linear convergence (needs 20+ effective iterations for stiff constraints) +- Excellent for soft bodies, cloth, deformables +- GPU-parallel (fast for batch simulations) +- Ground plane: Native support via builder.add_ground_plane() + +For robot position control, XPBD requires careful iteration tuning: +- Minimum 8 iterations enforced by backend +- Research shows 20+ effective iterations needed (e.g., 2 iter × 10 substeps) +- Lower damping (target_kd ~ 1.0) works better than standard PD gains +""" + +from typing import TYPE_CHECKING, Any, Dict + +from ark.tools.log import log +from ark.system.newton.scene_adapters.base_adapter import SolverSceneAdapter + +if TYPE_CHECKING: + import newton + from ark.system.newton.geometry_descriptors import GeometryDescriptor + + +class XPBDAdapter(SolverSceneAdapter): + """Adapter for Newton's XPBD solver. + + XPBD uses native ground plane implementation, making this adapter + straightforward. The main complexity is in solver validation - XPBD + requires many iterations for stiff position control. + + Example: + >>> adapter = XPBDAdapter(builder) + >>> adapter.adapt_ground_plane(descriptor) + # Uses builder.add_ground_plane() natively + >>> solver = adapter.create_solver(model, sim_cfg) + # Returns SolverXPBD with appropriate iteration count + """ + + @property + def solver_name(self) -> str: + """Return solver display name.""" + return "XPBD" + + def adapt_ground_plane(self, descriptor: "GeometryDescriptor") -> None: + """Add ground plane using XPBD's native support. + + XPBD natively supports infinite collision planes, so we can use + the standard builder.add_ground_plane() API directly. No workarounds + or geometry substitution needed. + + Args: + descriptor: Ground plane description (physics properties applied to plane) + """ + # XPBD supports native ground plane + # Note: Newton's add_ground_plane() doesn't accept size parameter - it's always infinite + self.builder.builder.add_ground_plane() + + log.ok("XPBD adapter: Added native ground plane") + + def create_solver( + self, + model: "newton.Model", + sim_cfg: Dict[str, Any] + ) -> "newton.solvers.SolverBase": + """Create XPBD solver with appropriate iteration count. + + XPBD is an iterative solver that needs sufficient iterations for + convergence, especially for stiff constraints like position-controlled + robot joints. + + Research findings (from investigation): + - 4 iterations (ARK default): Robot appears "frozen", ~0.3 rad error + - 20 effective iterations: Converges successfully + - "Small Steps" paper: Many substeps better than many iterations + + This method enforces a minimum of 8 iterations for TARGET_POSITION mode + to prevent the "frozen robot" problem. + + Args: + model: Finalized Newton model + sim_cfg: Simulation config containing: + - solver_iterations: Requested iteration count + - newton_physics.joint_defaults.mode: Control mode + + Returns: + Configured SolverXPBD instance + """ + import newton + + iterations = int(sim_cfg.get("solver_iterations", 1)) + + # Check if using TARGET_POSITION control mode + newton_cfg = sim_cfg.get("newton_physics", {}) + joint_mode = newton_cfg.get("joint_defaults", {}).get("mode", "") + + if joint_mode == "TARGET_POSITION": + # Enforce minimum 8 iterations for position control stability + # (Full solution needs 20+ effective iterations via substeps) + if iterations < 8: + old_iterations = iterations + iterations = 8 + log.warning( + f"XPBD adapter: Increased solver_iterations from {old_iterations} " + f"to {iterations} (minimum for TARGET_POSITION mode)" + ) + + log.info(f"XPBD adapter: Creating solver with {iterations} iterations") + + return newton.solvers.SolverXPBD(model, iterations=iterations) + + def validate_scene(self, global_config: Dict[str, Any]) -> list[str]: + """Validate XPBD-specific configuration requirements. + + XPBD's main gotcha is iteration count for position control. This method + checks for common misconfigurations and provides helpful warnings. + + Args: + global_config: Complete ARK configuration + + Returns: + List of warning messages (empty if config is optimal) + """ + issues = super().validate_scene(global_config) + + sim_cfg = global_config.get("simulator", {}).get("config", {}) + iterations = sim_cfg.get("solver_iterations", 1) + substeps = sim_cfg.get("substeps", 1) + effective_iters = iterations * substeps + + # Check for TARGET_POSITION mode with insufficient iterations + newton_cfg = sim_cfg.get("newton_physics", {}) + joint_mode = newton_cfg.get("joint_defaults", {}).get("mode", "") + + if joint_mode == "TARGET_POSITION" and effective_iters < 20: + issues.append( + f"WARNING: XPBD with TARGET_POSITION mode needs ~20 effective iterations " + f"for stability (research-validated). Current: {iterations} × {substeps} = {effective_iters}. " + f"Consider using: solver_iterations=2, substeps=10 (Small Steps approach) " + f"or solver: 'mujoco' for better robot control convergence." + ) + + # Check for high damping (common mistake when porting from MuJoCo) + target_kd = newton_cfg.get("joint_defaults", {}).get("target_kd", 0) + if target_kd > 20.0: + issues.append( + f"WARNING: XPBD target_kd={target_kd} is high. XPBD interprets damping " + f"differently than MuJoCo. Research shows target_kd=1.0-5.0 works better " + f"for XPBD. High damping can cause 'frozen robot' behavior." + ) + + return issues diff --git a/ark/system/simulation/simulator_node.py b/ark/system/simulation/simulator_node.py index cf13943..1c95c2d 100644 --- a/ark/system/simulation/simulator_node.py +++ b/ark/system/simulation/simulator_node.py @@ -11,6 +11,9 @@ from abc import ABC, abstractmethod from pathlib import Path from typing import Any +import sys +import traceback +import threading from ark.client.comm_infrastructure.base_node import BaseNode from ark.tools.log import log @@ -28,13 +31,7 @@ class SimulatorNode(BaseNode, ABC): tick. """ - def __init__( - self, - global_config, - observation_channels: dict[str, type] | None = None, - action_channels: dict[str, type] | None = None, - namespace: str = "ark", - ): + def __init__(self, global_config): """!Construct the simulator node. The constructor loads the global configuration, instantiates the @@ -47,10 +44,6 @@ def __init__( self._load_config(global_config) self.name = self.global_config["simulator"].get("name", "simulator") - self.global_config["observation_channels"] = observation_channels - self.global_config["action_channels"] = action_channels - self.global_config["namespace"] = namespace - super().__init__(self.name, global_config=global_config) log.info( @@ -72,27 +65,24 @@ def __init__( elif self.backend_type == "genesis": from ark.system.genesis.genesis_backend import GenesisBackend self.backend = GenesisBackend(self.global_config) - elif self.backend_type in ["isaacsim", "isaac_sim", "isaac"]: - from ark.system.isaac.isaac_backend import IsaacSimBackend - self.backend = IsaacSimBackend(self.global_config) + elif self.backend_type == "newton": + from ark.system.newton.newton_backend import NewtonBackend + self.backend = NewtonBackend(self.global_config) else: raise ValueError(f"Unsupported backend '{self.backend_type}'") # to initialize a scene with objects that dont need to publish, e.g. for visuals self.initialize_scene() - self.step_physics() ## Reset Backend Service - reset_service_name = f"{namespace}/" + self.name + "/backend/reset/sim" + reset_service_name = self.name + "/backend/reset/sim" self.create_service(reset_service_name, flag_t, flag_t, self._reset_backend) - custom_loop = getattr(self.backend, "custom_event_loop", None) - self.custom_loop = True if callable(custom_loop) else False - if not self.custom_loop: - freq = self.global_config["simulator"]["config"].get( - "node_frequency", 240.0 - ) - self.create_stepper(freq, self._step_simulation) + freq = self.global_config["simulator"]["config"].get("node_frequency", 240.0) + # self.create_stepper(freq, self._step_simulation) + + self.spin_thread = threading.Thread(target=self.spin, daemon=True) + self.spin_thread.start() def _load_config(self, global_config) -> None: """!Load and merge the global configuration. @@ -152,6 +142,10 @@ def _load_config(self, global_config) -> None: config["objects"] = self._load_section(cfg, global_config, "objects") except KeyError as e: config["objects"] = {} + try: + config["ground_plane"] = cfg.get("ground_plane", {}) + except KeyError: + config["ground_plane"] = {} log.ok("Config file under " + global_config.str + " loaded successfully.") self.global_config = config @@ -206,25 +200,12 @@ def _step_simulation(self) -> None: self.step() self.backend.step() - def step_physics(self, num_steps: int = 25, call_step_hook: bool = False) -> None: - """ - Advance the physics backend - Args: - num_steps: Number of physics ticks to run. - call_step_hook: If True, also invoke the subclass step() each tick. - - Returns: - None - """ - for _ in range(max(0, num_steps)): - if call_step_hook: - self.step() - self.backend.step() - + @abstractmethod def initialize_scene(self) -> None: """!Create the initial simulation scene.""" pass + @abstractmethod def step(self) -> None: """!Hook executed every simulation step.""" pass @@ -237,11 +218,6 @@ def spin(self) -> None: backend for spinning all components. It terminates when an ``OSError`` occurs or :attr:`_done` is set to ``True``. """ - # Allow backends to provide their own event loop (e.g., IsaacSim main thread) - if self.custom_loop: - self.backend.custom_event_loop(sim_node=self) - return - while not self._done: try: self._lcm.handle_timeout(0) @@ -255,3 +231,38 @@ def kill_node(self) -> None: """!Shut down the node and the underlying backend.""" self.backend.shutdown_backend() super().kill_node() + +def main(node_cls: type[SimulatorNode], *args) -> None: + """! + Initializes and runs a node. + + This function creates an instance of the specified `node_cls`, spins the node to handle messages, + and handles exceptions that occur during the node's execution. + + @param node_cls: The class of the node to run. + @type node_cls: Type[BaseNode] + """ + + if "--help" in sys.argv or "-h" in sys.argv: + print(node_cls.get_cli_doc()) + sys.exit(0) + + node = None + log.ok(f"Initializing {node_cls.__name__} type node") + try: + node = node_cls(*args) + log.ok(f"Initialized {node.name}") + while not node._done: + node._step_simulation() + except KeyboardInterrupt: + log.warning(f"User killed node {node_cls.__name__}") + except Exception: + tb = traceback.format_exc() + div = "=" * 30 + log.error(f"Exception thrown during node execution:\n{div}\n{tb}\n{div}") + finally: + if node is not None: + node.kill_node() + log.ok(f"Finished running node {node_cls.__name__}") + else: + log.warning(f"Node {node_cls.__name__} failed during initialization")