Skip to content

Commit de3217c

Browse files
committed
tests(worker): consolidate workflow logging test; merge trace identifier into task-failure; restore logger state in finally
1 parent 6cc35c8 commit de3217c

File tree

1 file changed

+27
-73
lines changed

1 file changed

+27
-73
lines changed

tests/worker/test_workflow.py

Lines changed: 27 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1993,12 +1993,16 @@ def last_signal(self) -> str:
19931993

19941994
@pytest.mark.parametrize("temporal_extra_mode", ["dict", "flatten"])
19951995
async def test_workflow_logging(client: Client, temporal_extra_mode: str):
1996-
"""Test that workflow logger produces correct log records for each extra mode."""
1996+
"""Test workflow logging: extra mode formatting, replay suppression, and full_workflow_info."""
19971997
original_mode = workflow.logger.temporal_extra_mode
1998+
original_full_info = workflow.logger.full_workflow_info_on_extra
19981999
workflow.logger.temporal_extra_mode = temporal_extra_mode
2000+
workflow.logger.full_workflow_info_on_extra = True
19992001

20002002
try:
20012003
with LogCapturer().logs_captured(workflow.logger.base_logger) as capturer:
2004+
# --- First execution: logs should appear ---
2005+
# Disable workflow cache so worker restart triggers replay
20022006
async with new_worker(
20032007
client, LoggingWorkflow, max_cached_workflows=0
20042008
) as worker:
@@ -2008,19 +2012,26 @@ async def test_workflow_logging(client: Client, temporal_extra_mode: str):
20082012
task_queue=worker.task_queue,
20092013
)
20102014
await handle.signal(LoggingWorkflow.my_signal, "signal 1")
2015+
await handle.signal(LoggingWorkflow.my_signal, "signal 2")
20112016
await handle.execute_update(
20122017
LoggingWorkflow.my_update, "update 1", id="update-1"
20132018
)
2014-
await handle.signal(LoggingWorkflow.my_signal, "finish")
2015-
await handle.result()
2019+
assert "signal 2" == await handle.query(LoggingWorkflow.last_signal)
20162020

2021+
# Verify logs from first execution
20172022
record = capturer.find_log("Signal: signal 1")
20182023
assert record is not None
20192024
assert record.funcName == "my_signal"
2025+
assert capturer.find_log("Signal: signal 2")
2026+
assert capturer.find_log("Query called")
20202027

20212028
update_record = capturer.find_log("Update: update 1")
20222029
assert update_record is not None
20232030

2031+
# Verify full_workflow_info_on_extra
2032+
assert isinstance(record.__dict__["workflow_info"], workflow.Info)
2033+
2034+
# Verify extra mode formatting
20242035
if temporal_extra_mode == "dict":
20252036
# Dict mode appends context to message and uses nested dict
20262037
assert "({'attempt':" in record.message
@@ -2046,49 +2057,6 @@ async def test_workflow_logging(client: Client, temporal_extra_mode: str):
20462057
assert isinstance(
20472058
value, (str, int, float, bool, type(None))
20482059
), f"Key {key} has non-primitive value: {type(value)}"
2049-
finally:
2050-
workflow.logger.temporal_extra_mode = original_mode
2051-
2052-
2053-
async def test_workflow_logging_replay(client: Client):
2054-
"""Test replay log suppression and full_workflow_info_on_extra.
2055-
2056-
This test validates behavior the parameterized dict/flatten test does NOT cover:
2057-
1. Replay suppression: logs from replayed history should not be emitted again.
2058-
2. full_workflow_info_on_extra: when enabled, logs include workflow.Info object.
2059-
"""
2060-
original_full_info = workflow.logger.full_workflow_info_on_extra
2061-
workflow.logger.full_workflow_info_on_extra = True
2062-
try:
2063-
with LogCapturer().logs_captured(
2064-
workflow.logger.base_logger, activity.logger.base_logger
2065-
) as capturer:
2066-
# --- First execution: logs should appear ---
2067-
# Disable workflow cache so worker restart triggers replay
2068-
async with new_worker(
2069-
client, LoggingWorkflow, max_cached_workflows=0
2070-
) as worker:
2071-
handle = await client.start_workflow(
2072-
LoggingWorkflow.run,
2073-
id=f"workflow-{uuid.uuid4()}",
2074-
task_queue=worker.task_queue,
2075-
)
2076-
await handle.signal(LoggingWorkflow.my_signal, "signal 1")
2077-
await handle.signal(LoggingWorkflow.my_signal, "signal 2")
2078-
await handle.execute_update(
2079-
LoggingWorkflow.my_update, "update 1", id="update-1"
2080-
)
2081-
assert "signal 2" == await handle.query(LoggingWorkflow.last_signal)
2082-
2083-
# Verify logs from first execution
2084-
assert capturer.find_log("Signal: signal 1")
2085-
assert capturer.find_log("Signal: signal 2")
2086-
assert capturer.find_log("Update: update 1")
2087-
assert capturer.find_log("Query called")
2088-
2089-
# Verify full_workflow_info_on_extra
2090-
record = capturer.find_log("Signal: signal 1")
2091-
assert record and isinstance(record.__dict__["workflow_info"], workflow.Info)
20922060

20932061
# --- Clear logs and continue execution (replay path) ---
20942062
# When the new worker starts, it replays the workflow history (signals 1 & 2).
@@ -2111,6 +2079,7 @@ async def test_workflow_logging_replay(client: Client):
21112079
assert capturer.find_log("Signal: signal 3")
21122080
assert capturer.find_log("Signal: finish")
21132081
finally:
2082+
workflow.logger.temporal_extra_mode = original_mode
21142083
workflow.logger.full_workflow_info_on_extra = original_full_info
21152084

21162085

@@ -2175,6 +2144,18 @@ async def test_workflow_logging_task_fail(client: Client):
21752144
== "task_fail_once_activity"
21762145
)
21772146

2147+
def workflow_failure_with_identifier(l: logging.LogRecord):
2148+
if (
2149+
hasattr(l, "__temporal_error_identifier")
2150+
and getattr(l, "__temporal_error_identifier")
2151+
== "WorkflowTaskFailure"
2152+
):
2153+
assert l.msg.startswith("Failed activation on workflow")
2154+
return True
2155+
return False
2156+
2157+
assert capturer.find(workflow_failure_with_identifier) is not None
2158+
21782159

21792160
@workflow.defn
21802161
class StackTraceWorkflow:
@@ -8034,33 +8015,6 @@ async def test_quick_activity_swallows_cancellation(client: Client):
80348015
assert cause.message == "Workflow cancelled"
80358016

80368017

8037-
async def test_workflow_logging_trace_identifier(client: Client):
8038-
with LogCapturer().logs_captured(
8039-
temporalio.worker._workflow_instance.logger
8040-
) as capturer:
8041-
async with new_worker(
8042-
client,
8043-
TaskFailOnceWorkflow,
8044-
activities=[task_fail_once_activity],
8045-
) as worker:
8046-
await client.execute_workflow(
8047-
TaskFailOnceWorkflow.run,
8048-
id="workflow_failure_trace_identifier",
8049-
task_queue=worker.task_queue,
8050-
)
8051-
8052-
def workflow_failure(l: logging.LogRecord):
8053-
if (
8054-
hasattr(l, "__temporal_error_identifier")
8055-
and getattr(l, "__temporal_error_identifier") == "WorkflowTaskFailure"
8056-
):
8057-
assert l.msg.startswith("Failed activation on workflow")
8058-
return True
8059-
return False
8060-
8061-
assert capturer.find(workflow_failure) is not None
8062-
8063-
80648018
@activity.defn
80658019
def use_in_workflow() -> bool:
80668020
return workflow.in_workflow()

0 commit comments

Comments (0)