Skip to content
Open
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 189 additions & 6 deletions selfdrive/ui/tests/test_ui/raylib_screenshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,40 @@

from cereal import log
from cereal import messaging
from cereal.messaging import PubMaster
from cereal.messaging import PubMaster, log_from_bytes, sub_sock
from msgq.visionipc import VisionIpcServer, VisionStreamType
from openpilot.common.basedir import BASEDIR
from openpilot.common.params import Params
from openpilot.common.prefix import OpenpilotPrefix
from openpilot.common.transformations.camera import DEVICE_CAMERAS
from openpilot.selfdrive.test.helpers import with_processes
from openpilot.selfdrive.test.process_replay.migration import migrate, migrate_controlsState, migrate_carState
from openpilot.selfdrive.selfdrived.alertmanager import set_offroad_alert
from openpilot.system.updated.updated import parse_release_notes
from openpilot.tools.lib.cache import DEFAULT_CACHE_DIR
from openpilot.tools.lib.framereader import FrameReader
from openpilot.tools.lib.logreader import LogReader
from openpilot.tools.lib.route import Route

AlertSize = log.SelfdriveState.AlertSize
AlertStatus = log.SelfdriveState.AlertStatus

TEST_DIR = pathlib.Path(__file__).parent
TEST_OUTPUT_DIR = TEST_DIR / "raylib_report"
SCREENSHOTS_DIR = TEST_OUTPUT_DIR / "screenshots"
UI_DELAY = 0.2

# Offroad alerts to test
OFFROAD_ALERTS = ['Offroad_IsTakingSnapshot']
OFFROAD_ALERTS = ["Offroad_IsTakingSnapshot"]

# Onroad test data
TEST_ROUTE = "a2a0ccea32023010|2023-07-27--13-01-19"
TEST_ROUTE_SEGMENT = 2
STREAMS: list[tuple] = []
DATA: dict[str, messaging.capnp._DynamicStructBuilder] = dict.fromkeys(
["carParams", "deviceState", "pandaStates", "controlsState", "selfdriveState",
"liveCalibration", "modelV2", "radarState", "driverMonitoringState", "carState",
"driverStateV2", "roadCameraState", "wideRoadCameraState", "driverCameraState"], None)


def put_update_params(params: Params):
Expand Down Expand Up @@ -126,6 +145,101 @@ def setup_experimental_mode_description(click, pm: PubMaster):
click(1200, 280) # expand description for experimental mode


def setup_onroad(click, pm: PubMaster):
# Start a visionipc server to feed frames
vipc_server = VisionIpcServer("camerad")
for stream_type, cam, _ in STREAMS:
vipc_server.create_buffers(stream_type, 5, cam.width, cam.height)
vipc_server.start_listener()

uidebug_received_cnt = 0
packet_id = 0
uidebug_sock = sub_sock("uiDebug")

# Condition check for uiDebug processing
check_uidebug = DATA["deviceState"].deviceState.started and not DATA["carParams"].carParams.notCar

# Loop until some uiDebug messages or a few cycles
while uidebug_received_cnt <= 20:
for service, data in DATA.items():
if data:
pm.send(service, data)
data.clear_write_flag()

for stream_type, _, image in STREAMS:
vipc_server.send(stream_type, image, packet_id, packet_id, packet_id)

if check_uidebug:
while uidebug_sock.receive(non_blocking=True):
uidebug_received_cnt += 1
else:
uidebug_received_cnt += 1

packet_id += 1
time.sleep(0.05)


def setup_onroad_alert(click, pm: PubMaster, text1, text2, size, status=AlertStatus.normal):
state = DATA["selfdriveState"]
origin_state_bytes = state.to_bytes()
cs = state.selfdriveState
cs.alertText1 = text1
cs.alertText2 = text2
cs.alertSize = size
cs.alertStatus = status
cs.alertType = "test_onroad_alert"
setup_onroad(click, pm)
DATA["selfdriveState"] = log_from_bytes(origin_state_bytes).as_builder()

def setup_onroad_alert_small(click, pm: PubMaster):
setup_onroad_alert(click, pm, "This is a small alert message", "", AlertSize.small)

def setup_onroad_alert_mid_warning(click, pm: PubMaster):
setup_onroad_alert(click, pm, "Medium Alert", "This is a medium warning alert message", AlertSize.mid, AlertStatus.userPrompt)

def setup_onroad_alert_full_critical(click, pm: PubMaster):
setup_onroad_alert(click, pm, "Full Alert", "This is a full critical alert message", AlertSize.full, AlertStatus.critical)


def setup_onroad_disengaged(click, pm: PubMaster):
DATA["selfdriveState"].selfdriveState.enabled = False
setup_onroad(click, pm)
DATA["selfdriveState"].selfdriveState.enabled = True


def setup_onroad_override(click, pm: PubMaster):
DATA["selfdriveState"].selfdriveState.state = log.SelfdriveState.OpenpilotState.overriding
setup_onroad(click, pm)
DATA["selfdriveState"].selfdriveState.state = log.SelfdriveState.OpenpilotState.enabled


def setup_onroad_wide(click, pm: PubMaster):
# widecam only shows when in experimental mode and going slow
DATA["selfdriveState"].selfdriveState.experimentalMode = True
DATA["carState"].carState.vEgo = 5
setup_onroad(click, pm)


def setup_onroad_sidebar(click, pm: PubMaster):
setup_onroad(click, pm)
click(500, 500)
setup_onroad(click, pm)


def setup_onroad_wide_sidebar(click, pm: PubMaster):
setup_onroad_wide(click, pm)
click(500, 500)
setup_onroad_wide(click, pm)


def setup_driver_camera(click, pm: PubMaster):
setup_settings(click, pm)
click(1980, 620) # preview driver camera button
DATA["deviceState"].deviceState.started = False
setup_onroad(click, pm)
DATA["deviceState"].deviceState.started = True


CASES = {
"homescreen": setup_homescreen,
"homescreen_paired": setup_homescreen,
Expand All @@ -145,6 +259,16 @@ def setup_experimental_mode_description(click, pm: PubMaster):
"offroad_alert": setup_offroad_alert,
"confirmation_dialog": setup_confirmation_dialog,
"experimental_mode_description": setup_experimental_mode_description,
"onroad": setup_onroad,
"onroad_alert_small_normal": setup_onroad_alert_small,
"onroad_alert_mid_warning": setup_onroad_alert_mid_warning,
"onroad_alert_full_critical": setup_onroad_alert_full_critical,
"onroad_disengaged": setup_onroad_disengaged,
"onroad_override": setup_onroad_override,
"onroad_sidebar": setup_onroad_sidebar,
"onroad_wide": setup_onroad_wide,
"onroad_wide_sidebar": setup_onroad_wide_sidebar,
"driver_camera": setup_driver_camera,
}


Expand All @@ -154,12 +278,12 @@ def __init__(self):
sys.modules["mouseinfo"] = False

def setup(self):
# Seed minimal offroad state
self.pm = PubMaster(["deviceState"])
ds = messaging.new_message('deviceState')
ds = DATA["deviceState"]
ds.deviceState.networkType = log.DeviceState.NetworkType.wifi
ds.deviceState.lastAthenaPingTime = 0 # show "connect offline" instead of "connect error"
self.pm = PubMaster(list(DATA.keys()))
for _ in range(5):
self.pm.send('deviceState', ds)
self.pm.send("deviceState", ds)
ds.clear_write_flag()
time.sleep(0.05)
time.sleep(0.5)
Expand Down Expand Up @@ -187,11 +311,70 @@ def test_ui(self, name, setup_case):
self.screenshot(name)


def get_frame(path: str | None, index: int = 0):
if path is None:
raise ValueError("Missing camera frame path")
return FrameReader(path, pix_fmt="nv12").get(index)


def get_cached_frames(route: Route, segnum: int):
import pickle

# Ensure cache directory exists
os.makedirs(DEFAULT_CACHE_DIR, exist_ok=True)
frames_cache = f"{DEFAULT_CACHE_DIR}/test_ui_frames"
Copy link

Copilot AI Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cache filename is not specific to the route/segment; running tests with different routes or segments risks stale or mismatched frames. Include route name and segment (and optionally camera profile) in the cache key, e.g., DEFAULT_CACHE_DIR/test_ui_frames_{route.fullname}_{segnum}.npz.

Suggested change
frames_cache = f"{DEFAULT_CACHE_DIR}/test_ui_frames"
# Make cache filename specific to route and segment
safe_route_name = route.fullname.replace("/", "_")
frames_cache = f"{DEFAULT_CACHE_DIR}/test_ui_frames_{safe_route_name}_{segnum}.pkl"

Copilot uses AI. Check for mistakes.

# Load frames from cache if available
if os.path.isfile(frames_cache):
with open(frames_cache, "rb") as f:
# Read frames from cache
frames = pickle.load(f)
road_img = frames[0]
wide_road_img = frames[1]
driver_img = frames[2]
else:
with open(frames_cache, "wb") as f:
# No cached frames, read from route and cache them
print("no cached frames, reading from route")
road_path = route.camera_paths()[segnum]
road_img = get_frame(road_path)
wide_road_path = route.ecamera_paths()[segnum]
wide_road_img = get_frame(wide_road_path)
driver_path = route.dcamera_paths()[segnum]
driver_img = get_frame(driver_path)
pickle.dump([road_img, wide_road_img, driver_img], f)
return road_img, wide_road_img, driver_img


def prepare_onroad_data():
route = Route(TEST_ROUTE)

# Prepare route data
qpaths = route.qlog_paths()
lr = LogReader(qpaths[TEST_ROUTE_SEGMENT])
DATA["carParams"] = next((event.as_builder() for event in lr if event.which() == "carParams"), None)
for event in migrate(lr, [migrate_controlsState, migrate_carState]):
if event.which() in DATA:
DATA[event.which()] = event.as_builder()
if all(DATA.values()):
break

# Prepare camera frames
cam = DEVICE_CAMERAS.get(("tici", "ar0231"))
if cam:
road_img, wide_road_img, driver_img = get_cached_frames(route, TEST_ROUTE_SEGMENT)
STREAMS.append((VisionStreamType.VISION_STREAM_ROAD, cam.fcam, road_img.flatten().tobytes()))
STREAMS.append((VisionStreamType.VISION_STREAM_WIDE_ROAD, cam.ecam, wide_road_img.flatten().tobytes()))
STREAMS.append((VisionStreamType.VISION_STREAM_DRIVER, cam.dcam, driver_img.flatten().tobytes()))


def create_screenshots():
if TEST_OUTPUT_DIR.exists():
shutil.rmtree(TEST_OUTPUT_DIR)
SCREENSHOTS_DIR.mkdir(parents=True)

# Prepare onroad data (route + frames)
prepare_onroad_data()

t = TestUI()
for name, setup in CASES.items():
with OpenpilotPrefix():
Expand Down
Loading