From dc2e452739b3d5f2d38db262aedb91ee513847d5 Mon Sep 17 00:00:00 2001 From: Vishwanath Martur <64204611+vishwamartur@users.noreply.github.com> Date: Mon, 20 Jan 2025 09:16:39 +0530 Subject: [PATCH] Add recording smoke test Related to #775 Add a recording smoke test to ensure the recording functionality works as expected. * **New Test File**: Add `tests/openadapt/test_record.py`. * **Setup and Teardown**: Use `pytest` fixtures to set up and tear down the test environment. * **Recording Test**: - Call `openadapt.record` and wait for the `recording.started` signal. - Stop the recording by emitting the stop sequence. - Assert the state of the database to ensure the recording is created. - Assert the state of the filesystem to ensure the video and performance plots are created. --- tests/openadapt/test_record.py | 59 ++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 tests/openadapt/test_record.py diff --git a/tests/openadapt/test_record.py b/tests/openadapt/test_record.py new file mode 100644 index 000000000..331d315de --- /dev/null +++ b/tests/openadapt/test_record.py @@ -0,0 +1,59 @@ +import pytest +import time +import multiprocessing +from openadapt import record, db, config + +@pytest.fixture +def setup_environment(): + # Setup code before the test + config.RECORD_VIDEO = True + config.RECORD_IMAGES = True + config.RECORD_BROWSER_EVENTS = False + config.RECORD_AUDIO = False + config.PLOT_PERFORMANCE = False + + yield + + # Teardown code after the test + db_path = config.RECORDING_DIR_PATH / "recording.db" + if db_path.exists(): + db_path.unlink() + +def test_recording_smoke_test(setup_environment): + terminate_processing = multiprocessing.Event() + terminate_recording = multiprocessing.Event() + status_pipe_parent, status_pipe_child = multiprocessing.Pipe() + + record_process = multiprocessing.Process( + target=record.record, + args=("Test Task", terminate_processing, terminate_recording, status_pipe_child) + ) + record_process.start() + + # Wait for the recording to start + start_time = time.time() + while time.time() - start_time < 30: + if status_pipe_parent.poll(): + message = status_pipe_parent.recv() + if message.get("type") == "record.started": + break + time.sleep(1) + else: + pytest.fail("Recording did not start within 30 seconds") + + # Stop the recording by emitting the stop sequence + terminate_processing.set() + terminate_recording.wait() + + # Assert the state of the database + session = db.get_new_session(read_and_write=True) + recording = db.crud.get_latest_recording(session) + assert recording is not None, "Recording was not created in the database" + + # Assert the state of the filesystem + video_file_path = config.RECORDING_DIR_PATH / f"{recording.timestamp}.mp4" + assert video_file_path.exists(), "Video file was not created" + performance_plot_path = config.RECORDING_DIR_PATH / f"{recording.timestamp}_performance.png" + assert performance_plot_path.exists(), "Performance plot was not created" + + record_process.join()