Skip to content

Commit 6cc35c8

Browse files
harsh543 and claude committed
test: improve test_workflow_logging_replay structure and cleanup
- Add try/finally to restore full_workflow_info_on_extra after test
- Improve docstring to explain what this test validates vs parameterized test
- Add clear phase comments for first execution vs replay path

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 15f08ca commit 6cc35c8

File tree

1 file changed

+56
-45
lines changed

1 file changed

+56
-45
lines changed

tests/worker/test_workflow.py

Lines changed: 56 additions & 45 deletions
Original file line number | Diff line number | Diff line change
@@ -2051,56 +2051,67 @@ async def test_workflow_logging(client: Client, temporal_extra_mode: str):
20512051

20522052

20532053
async def test_workflow_logging_replay(client: Client):
2054-
"""Test that replayed logs are suppressed and full_workflow_info_on_extra works."""
2054+
"""Test replay log suppression and full_workflow_info_on_extra.
2055+
2056+
This test validates behavior the parameterized dict/flatten test does NOT cover:
2057+
1. Replay suppression: logs from replayed history should not be emitted again.
2058+
2. full_workflow_info_on_extra: when enabled, logs include workflow.Info object.
2059+
"""
2060+
original_full_info = workflow.logger.full_workflow_info_on_extra
20552061
workflow.logger.full_workflow_info_on_extra = True
2056-
with LogCapturer().logs_captured(
2057-
workflow.logger.base_logger, activity.logger.base_logger
2058-
) as capturer:
2059-
# Log two signals and kill worker before completing. Need to disable
2060-
# workflow cache since we restart the worker and don't want to pay the
2061-
# sticky queue penalty.
2062-
async with new_worker(
2063-
client, LoggingWorkflow, max_cached_workflows=0
2064-
) as worker:
2065-
handle = await client.start_workflow(
2066-
LoggingWorkflow.run,
2067-
id=f"workflow-{uuid.uuid4()}",
2068-
task_queue=worker.task_queue,
2069-
)
2070-
await handle.signal(LoggingWorkflow.my_signal, "signal 1")
2071-
await handle.signal(LoggingWorkflow.my_signal, "signal 2")
2072-
await handle.execute_update(
2073-
LoggingWorkflow.my_update, "update 1", id="update-1"
2074-
)
2075-
assert "signal 2" == await handle.query(LoggingWorkflow.last_signal)
2062+
try:
2063+
with LogCapturer().logs_captured(
2064+
workflow.logger.base_logger, activity.logger.base_logger
2065+
) as capturer:
2066+
# --- First execution: logs should appear ---
2067+
# Disable workflow cache so worker restart triggers replay
2068+
async with new_worker(
2069+
client, LoggingWorkflow, max_cached_workflows=0
2070+
) as worker:
2071+
handle = await client.start_workflow(
2072+
LoggingWorkflow.run,
2073+
id=f"workflow-{uuid.uuid4()}",
2074+
task_queue=worker.task_queue,
2075+
)
2076+
await handle.signal(LoggingWorkflow.my_signal, "signal 1")
2077+
await handle.signal(LoggingWorkflow.my_signal, "signal 2")
2078+
await handle.execute_update(
2079+
LoggingWorkflow.my_update, "update 1", id="update-1"
2080+
)
2081+
assert "signal 2" == await handle.query(LoggingWorkflow.last_signal)
20762082

2077-
# Confirm logs were produced
2078-
assert capturer.find_log("Signal: signal 1")
2079-
assert capturer.find_log("Signal: signal 2")
2080-
assert capturer.find_log("Update: update 1")
2081-
assert capturer.find_log("Query called")
2083+
# Verify logs from first execution
2084+
assert capturer.find_log("Signal: signal 1")
2085+
assert capturer.find_log("Signal: signal 2")
2086+
assert capturer.find_log("Update: update 1")
2087+
assert capturer.find_log("Query called")
20822088

2083-
# Since we enabled full info, make sure it's there
2084-
record = capturer.find_log("Signal: signal 1")
2085-
assert record and isinstance(record.__dict__["workflow_info"], workflow.Info)
2089+
# Verify full_workflow_info_on_extra
2090+
record = capturer.find_log("Signal: signal 1")
2091+
assert record and isinstance(record.__dict__["workflow_info"], workflow.Info)
20862092

2087-
# Clear queue and start a new one with more signals
2088-
capturer.log_queue.queue.clear()
2089-
async with new_worker(
2090-
client,
2091-
LoggingWorkflow,
2092-
task_queue=worker.task_queue,
2093-
max_cached_workflows=0,
2094-
) as worker:
2095-
await handle.signal(LoggingWorkflow.my_signal, "signal 3")
2096-
await handle.signal(LoggingWorkflow.my_signal, "finish")
2097-
await handle.result()
2093+
# --- Clear logs and continue execution (replay path) ---
2094+
# When the new worker starts, it replays the workflow history (signals 1 & 2).
2095+
# Replay suppression should prevent those logs from appearing again.
2096+
capturer.log_queue.queue.clear()
20982097

2099-
# Confirm replayed logs are not present but new ones are
2100-
assert not capturer.find_log("Signal: signal 1")
2101-
assert not capturer.find_log("Signal: signal 2")
2102-
assert capturer.find_log("Signal: signal 3")
2103-
assert capturer.find_log("Signal: finish")
2098+
async with new_worker(
2099+
client,
2100+
LoggingWorkflow,
2101+
task_queue=worker.task_queue,
2102+
max_cached_workflows=0,
2103+
) as worker:
2104+
await handle.signal(LoggingWorkflow.my_signal, "signal 3")
2105+
await handle.signal(LoggingWorkflow.my_signal, "finish")
2106+
await handle.result()
2107+
2108+
# --- Replay execution: no duplicate logs ---
2109+
assert not capturer.find_log("Signal: signal 1")
2110+
assert not capturer.find_log("Signal: signal 2")
2111+
assert capturer.find_log("Signal: signal 3")
2112+
assert capturer.find_log("Signal: finish")
2113+
finally:
2114+
workflow.logger.full_workflow_info_on_extra = original_full_info
21042115

21052116

21062117
@activity.defn

0 commit comments

Comments (0)