feat(workflow_engine): Setup the Task for process_workflow_updates #93553

Merged: 9 commits, Jun 18, 2025
1 change: 1 addition & 0 deletions src/sentry/conf/server.py
@@ -1014,6 +1014,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
Queue("demo_mode", routing_key="demo_mode"),
Queue("release_registry", routing_key="release_registry"),
Queue("seer.seer_automation", routing_key="seer.seer_automation"),
Queue("workflow_engine.process_workflows", routing_key="workflow_engine.process_workflows"),
]

from celery.schedules import crontab
13 changes: 8 additions & 5 deletions src/sentry/issues/status_change_consumer.py
@@ -148,11 +148,14 @@ def update_status(group: Group, status_change: StatusChangeMessageData) -> None:

This is used to trigger the `workflow_engine` processing status changes.
"""
latest_activity = Activity.objects.filter(
group_id=group.id, type=activity_type.value
).order_by("-datetime")
for handler in group_status_update_registry.registrations.values():
handler(group, status_change, latest_activity[0])
latest_activity = (
Activity.objects.filter(group_id=group.id, type=activity_type.value)
.order_by("-datetime")
.first()
)
if latest_activity is not None:
for handler in group_status_update_registry.registrations.values():
handler(group, status_change, latest_activity)


def get_group_from_fingerprint(project_id: int, fingerprint: Sequence[str]) -> Group | None:
4 changes: 4 additions & 0 deletions src/sentry/taskworker/namespaces.py
@@ -131,6 +131,10 @@

uptime_tasks = taskregistry.create_namespace("uptime", app_feature="crons")

workflow_engine_tasks = taskregistry.create_namespace(
"workflow_engine", app_feature="workflow_engine"
)


# Namespaces for testing taskworker tasks
exampletasks = taskregistry.create_namespace(name="examples")
2 changes: 2 additions & 0 deletions src/sentry/workflow_engine/processors/__init__.py
@@ -4,9 +4,11 @@
"process_workflows",
"process_data_packet",
"process_delayed_workflows",
"process_workflows",
"DelayedWorkflow",
]

from .data_source import process_data_sources
from .delayed_workflow import DelayedWorkflow, process_delayed_workflows
from .detector import process_detectors
from .workflow import process_workflows
71 changes: 71 additions & 0 deletions src/sentry/workflow_engine/tasks.py
@@ -0,0 +1,71 @@
from sentry.issues.status_change_consumer import group_status_update_registry
from sentry.issues.status_change_message import StatusChangeMessageData
from sentry.models.activity import Activity
from sentry.models.group import Group
from sentry.silo.base import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.taskworker import config, namespaces, retry
from sentry.types.activity import ActivityType
from sentry.utils import metrics

SUPPORTED_ACTIVITIES = [ActivityType.SET_RESOLVED.value]


@instrumented_task(
name="sentry.workflow_engine.tasks.process_workflow_activity",
queue="workflow_engine.process_workflows",
acks_late=True,
default_retry_delay=5,
max_retries=3,
soft_time_limit=50,
time_limit=60,
silo_mode=SiloMode.REGION,
taskworker_config=config.TaskworkerConfig(
namespace=namespaces.workflow_engine_tasks,
processing_deadline_duration=60,
retry=retry.Retry(
times=3,
delay=5,
),
),
)
def process_workflow_activity(activity_id: int, detector_id: int) -> None:
"""
Process a workflow task identified by the given Activity ID and Detector ID.

The task will get the Activity from the database, create a WorkflowEventData object,
and then process the data in `process_workflows`.
"""
Member: Any ideas on what kind of throughput tasks in the workflow_tasks namespace will have?

Contributor Author: Hmm, not sure about throughput. I think it will vary pretty heavily depending on configurations / products; for example, a metric issue with anomaly detection will take more computing time than evaluating whether an event is new.

In the end, the throughput would be similar to issue alerts / metric alerts / crons -- all three of those product verticals will be executing here, but at fairly different rates.

Member: Ok. Knowing that it will be similar to alerts is helpful.

Member: We can add this to the alerts taskworker pools when we get US deployed. In other regions throughput will be small enough that it isn't a concern.

# TODO - @saponifi3d - implement this in a follow-up PR. This update will require WorkflowEventData
# to allow for an activity in the `event` attribute. That refactor is a bit noisy
    # and will be done in a subsequent PR.
pass


@group_status_update_registry.register("workflow_status_update")
def workflow_status_update_handler(
group: Group, status_change_message: StatusChangeMessageData, activity: Activity
) -> None:
"""
    Hook `process_workflow_activity` into the activity creation registry.

    Since this handler is called in-process when the activity is created, we want
    to queue a task to process workflows asynchronously.
"""
if activity.type not in SUPPORTED_ACTIVITIES:
# If the activity type is not supported, we do not need to process it.
return

detector_id = status_change_message.get("detector_id")

if detector_id is None:
        # We should not hit this case; it should only occur if there is a bug
        # passing the detector_id from the workflow_engine to the issue platform.
metrics.incr("workflow_engine.error.tasks.no_detector_id")
return

    # TODO - implement in a follow-up PR. For now, just track a metric showing that we are seeing the activities.
    # process_workflow_activity.delay(activity.id, detector_id)
metrics.incr(
"workflow_engine.process_workflow.activity_update", tags={"activity_type": activity.type}
)
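
For context on the deferred implementation: the docstring and TODO above describe fetching the Activity, wrapping it in a WorkflowEventData, and handing it to `process_workflows`. A minimal sketch of how that follow-up might look is below; the import paths, the `WorkflowEventData` constructor arguments, and the `process_workflows` call signature are assumptions for illustration, not what the follow-up PR is committed to.

# Hypothetical sketch of the deferred task body; names and signatures are assumed, not confirmed by this PR.
from sentry.models.activity import Activity
from sentry.workflow_engine.processors.workflow import process_workflows
from sentry.workflow_engine.types import WorkflowEventData  # assumed location of the container type


def process_workflow_activity(activity_id: int, detector_id: int) -> None:
    activity = Activity.objects.get(id=activity_id)

    # The detector_id would presumably be resolved here too (e.g. via
    # sentry.workflow_engine.models.Detector) to scope which workflows run.
    # Assumes the refactor allowing an Activity in the `event` attribute has landed.
    event_data = WorkflowEventData(event=activity, group=activity.group)
    process_workflows(event_data)  # assumed signature: a single WorkflowEventData argument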
69 changes: 69 additions & 0 deletions tests/sentry/workflow_engine/test_tasks.py
@@ -0,0 +1,69 @@
from unittest import mock

from sentry.issues.status_change_consumer import update_status
from sentry.issues.status_change_message import StatusChangeMessageData
from sentry.models.activity import Activity
from sentry.models.group import GroupStatus
from sentry.testutils.cases import TestCase
from sentry.types.activity import ActivityType
from sentry.types.group import GroupSubStatus
from sentry.workflow_engine.tasks import workflow_status_update_handler


class IssuePlatformIntegrationTests(TestCase):
def test_handler_invoked__when_resolved(self):
"""
Integration test to ensure the `update_status` method
        will correctly invoke the `workflow_status_update_handler`
and increment the metric.
"""
detector = self.create_detector()
group = self.create_group(
project=self.project,
status=GroupStatus.UNRESOLVED,
substatus=GroupSubStatus.ESCALATING,
)

message = StatusChangeMessageData(
id="test_message_id",
project_id=self.project.id,
new_status=GroupStatus.RESOLVED,
new_substatus=None,
fingerprint=["test_fingerprint"],
detector_id=detector.id,
)

with mock.patch("sentry.workflow_engine.tasks.metrics.incr") as mock_incr:
update_status(group, message)
mock_incr.assert_called_with(
"workflow_engine.process_workflow.activity_update",
tags={"activity_type": ActivityType.SET_RESOLVED.value},
)


class WorkflowStatusUpdateHandlerTests(TestCase):
def test__no_detector_id(self):
"""
Test that the workflow_status_update_handler does not crash
when no detector_id is provided in the status change message.
"""
group = self.create_group(project=self.project)
activity = Activity(
project=self.project,
group=group,
type=ActivityType.SET_RESOLVED.value,
data={"fingerprint": ["test_fingerprint"]},
)

message = StatusChangeMessageData(
id="test_message_id",
project_id=self.project.id,
new_status=GroupStatus.RESOLVED,
new_substatus=None,
fingerprint=["test_fingerprint"],
detector_id=None, # No detector_id provided
)

with mock.patch("sentry.workflow_engine.tasks.metrics.incr") as mock_incr:
workflow_status_update_handler(group, message, activity)
mock_incr.assert_called_with("workflow_engine.error.tasks.no_detector_id")