
Conversation

@Cyb3rWard0g
Collaborator

Overview

This PR extends Dapr’s native workflow model to support LLM-powered and agent-based activity execution through new unified decorators. Developers can now define, register, and run workflows using standard Dapr patterns (@runtime.workflow, @runtime.activity) while seamlessly integrating reasoning and automation via @llm_activity and @agent_activity. This approach preserves full control over the workflow runtime while enabling declarative, composable AI-driven orchestration.

Key Changes

  • Introduced @llm_activity for direct LLM-powered activity execution
  • Added @agent_activity for integrating autonomous agents in workflows
  • Preserved native Dapr workflow definitions, enabling flexible orchestration with full runtime control
  • Unified prompt formatting, structured output validation, and input normalization under workflow utils
  • Extended convert_result() to handle both BaseMessage and agent message types
  • Supported context-free and templated prompts across all activity types
  • Updated LLM-based and Agent-based Dapr workflow examples to reflect new decorators and best practices

Examples

LLM-based Single Task Workflow

import time

import dapr.ext.workflow as wf
from dapr.ext.workflow import DaprWorkflowContext
from dotenv import load_dotenv

from dapr_agents.llm.dapr import DaprChatClient
from dapr_agents.workflow.decorators import llm_activity

# Load environment variables (e.g., API keys, secrets)
load_dotenv()

# Initialize the Dapr workflow runtime and LLM client
runtime = wf.WorkflowRuntime()
llm = DaprChatClient(component_name="openai")


@runtime.workflow(name="single_task_workflow")
def single_task_workflow(ctx: DaprWorkflowContext, name: str):
    """Ask the LLM about a single historical figure and return a short bio."""
    response = yield ctx.call_activity(describe_person, input={"name": name})
    return response


@runtime.activity(name="describe_person")
@llm_activity(
    prompt="Who was {name}?",
    llm=llm,
)
async def describe_person(ctx, name: str) -> str:
    pass


if __name__ == "__main__":
    runtime.start()
    time.sleep(5)

    client = wf.DaprWorkflowClient()
    instance_id = client.schedule_new_workflow(
        workflow=single_task_workflow,
        input="Grace Hopper",
    )
    print(f"Workflow started: {instance_id}")

    state = client.wait_for_workflow_completion(instance_id)
    if not state:
        print("No state returned (instance may not exist).")
    elif state.runtime_status.name == "COMPLETED":
        print(f"Grace Hopper bio:\n{state.serialized_output}")
    else:
        print(f"Workflow ended with status: {state.runtime_status}")
        if state.failure_details:
            fd = state.failure_details
            print("Failure type:", fd.error_type)
            print("Failure message:", fd.message)
            print("Stack trace:\n", fd.stack_trace)
        else:
            print("Custom status:", state.serialized_custom_status)

    runtime.shutdown()

LLM-based Parallel Tasks Workflow

import logging
import time
from typing import List

import dapr.ext.workflow as wf
from dapr.ext.workflow import DaprWorkflowContext
from dotenv import load_dotenv
from pydantic import BaseModel, Field

from dapr_agents.llm.dapr import DaprChatClient
from dapr_agents.workflow.decorators import llm_activity

# Load environment variables (API keys, etc.)
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)

# Initialize the Dapr workflow runtime and LLM client
runtime = wf.WorkflowRuntime()
llm = DaprChatClient(component_name="openai")


# ----- Models -----

class Question(BaseModel):
    """Represents a single research question."""
    text: str = Field(..., description="A research question related to the topic.")


class Questions(BaseModel):
    """Encapsulates a list of research questions."""
    questions: List[Question] = Field(
        ..., description="A list of research questions generated for the topic."
    )


# ----- Workflow -----

@runtime.workflow(name="research_workflow")
def research_workflow(ctx: DaprWorkflowContext, topic: str):
    """Defines a Dapr workflow for researching a given topic."""
    # 1) Generate research questions
    questions: Questions = yield ctx.call_activity(
        generate_questions, input={"topic": topic}
    )

    # Handle both dict and model cases gracefully
    q_list = (
        [q["text"] for q in questions["questions"]]
        if isinstance(questions, dict)
        else [q.text for q in questions.questions]
    )

    # 2) Gather information for each question in parallel
    parallel_tasks = [
        ctx.call_activity(gather_information, input={"question": q})
        for q in q_list
    ]
    research_results: List[str] = yield wf.when_all(parallel_tasks)

    # 3) Synthesize final report
    final_report: str = yield ctx.call_activity(
        synthesize_results, input={"topic": topic, "research_results": research_results}
    )

    return final_report


# ----- Activities -----

@runtime.activity(name="generate_questions")
@llm_activity(
    prompt="""
You are a research assistant. Generate exactly 3 focused research questions about the topic: {topic}.
Return ONLY a JSON object matching this schema (no prose):

{{
  "questions": [
    {{ "text": "..." }},
    {{ "text": "..." }},
    {{ "text": "..." }}
  ]
}}
""",
    llm=llm,
)
def generate_questions(ctx, topic: str) -> Questions:
    # Implemented by llm_activity via the prompt above.
    pass


@runtime.activity(name="gather_information")
@llm_activity(
    prompt="""
Research the following question and provide a detailed, well-cited answer (paragraphs + bullet points where helpful).
Question: {question}
""",
    llm=llm,
)
def gather_information(ctx, question: str) -> str:
    # Implemented by llm_activity via the prompt above.
    pass


@runtime.activity(name="synthesize_results")
@llm_activity(
    prompt="""
Create a comprehensive research report on the topic "{topic}" using the following research findings:

{research_results}

Requirements:
- Clear executive summary (3-5 sentences)
- Key findings (bulleted)
- Risks/unknowns
- Short conclusion

Return plain text (no JSON).
""",
    llm=llm,
)
def synthesize_results(ctx, topic: str, research_results: List[str]) -> str:
    # Implemented by llm_activity via the prompt above.
    pass


# ----- Entrypoint -----

if __name__ == "__main__":
    runtime.start()
    time.sleep(5)  # small grace period for runtime readiness

    client = wf.DaprWorkflowClient()
    research_topic = "The environmental impact of quantum computing"

    logging.info(f"Starting research workflow on: {research_topic}")
    instance_id = client.schedule_new_workflow(
        workflow=research_workflow,
        input=research_topic,
    )
    logging.info(f"Workflow started: {instance_id}")

    state = client.wait_for_workflow_completion(instance_id)
    if not state:
        logging.error("No state returned (instance may not exist).")
    elif state.runtime_status.name == "COMPLETED":
        logging.info(f"\nResearch Report:\n{state.serialized_output}")
    else:
        logging.error(f"Workflow ended with status: {state.runtime_status}")
        if state.failure_details:
            fd = state.failure_details
            logging.error("Failure type: %s", fd.error_type)
            logging.error("Failure message: %s", fd.message)
            logging.error("Stack trace:\n%s", fd.stack_trace)
        else:
            logging.error("Custom status: %s", state.serialized_custom_status)

    runtime.shutdown()

Agent-Based Workflow

from __future__ import annotations

import logging
import time

import dapr.ext.workflow as wf
from dapr.ext.workflow import DaprWorkflowContext
from dotenv import load_dotenv

from dapr_agents import Agent
from dapr_agents.llm.dapr import DaprChatClient
from dapr_agents.workflow.decorators import agent_activity

# -----------------------------------------------------------------------------
# Setup
# -----------------------------------------------------------------------------
load_dotenv()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
runtime = wf.WorkflowRuntime()
llm = DaprChatClient(component_name="openai")

# -----------------------------------------------------------------------------
# Agents
# -----------------------------------------------------------------------------
extractor = Agent(
    name="DestinationExtractor",
    role="Extract destination",
    instructions=[
        "Extract the main city from the user's message.",
        "Return only the city name, nothing else.",
    ],
    llm=llm,
)

planner = Agent(
    name="PlannerAgent",
    role="Trip planner",
    instructions=[
        "Create a concise 3-day outline for the given destination.",
        "Balance culture, food, and leisure activities.",
    ],
    llm=llm,
)

expander = Agent(
    name="ItineraryAgent",
    role="Itinerary expander",
    llm=llm,
    instructions=[
        "Expand a 3-day outline into a detailed itinerary.",
        "Include Morning, Afternoon, and Evening sections each day.",
    ],
)


# -----------------------------------------------------------------------------
# Workflow
# -----------------------------------------------------------------------------

@runtime.workflow(name="chained_planner_workflow")
def chained_planner_workflow(ctx: DaprWorkflowContext, user_msg: str) -> str:
    """Plan a 3-day trip using chained agent activities."""
    dest = yield ctx.call_activity(extract_destination, input=user_msg)
    outline = yield ctx.call_activity(plan_outline, input=dest["content"])
    itinerary = yield ctx.call_activity(expand_itinerary, input=outline["content"])
    return itinerary["content"]


# -----------------------------------------------------------------------------
# Activities (no explicit params, no prompts)
# -----------------------------------------------------------------------------

@runtime.activity(name="extract_destination")
@agent_activity(agent=extractor)
def extract_destination(ctx) -> dict:
    """Extract destination city."""
    pass


@runtime.activity(name="plan_outline")
@agent_activity(agent=planner)
def plan_outline(ctx) -> dict:
    """Generate a 3-day outline for the destination."""
    pass


@runtime.activity(name="expand_itinerary")
@agent_activity(agent=expander)
def expand_itinerary(ctx) -> dict:
    """Expand the outline into a full detailed itinerary."""
    pass


# -----------------------------------------------------------------------------
# Entrypoint
# -----------------------------------------------------------------------------

if __name__ == "__main__":
    runtime.start()
    time.sleep(5)

    client = wf.DaprWorkflowClient()
    user_input = "Plan a trip to Paris."

    logger.info("Starting workflow: %s", user_input)
    instance_id = client.schedule_new_workflow(
        workflow=chained_planner_workflow,
        input=user_input,
    )

    logger.info("Workflow started: %s", instance_id)
    state = client.wait_for_workflow_completion(instance_id)

    if not state:
        logger.error("No state returned (instance may not exist).")
    elif state.runtime_status.name == "COMPLETED":
        logger.info("Trip Itinerary:\n%s", state.serialized_output)
    else:
        logger.error("Workflow ended with status: %s", state.runtime_status)
        if state.failure_details:
            fd = state.failure_details
            logger.error("Failure type: %s", fd.error_type)
            logger.error("Failure message: %s", fd.message)
            logger.error("Stack trace:\n%s", fd.stack_trace)
        else:
            logger.error("Custom status: %s", state.serialized_custom_status)

    runtime.shutdown()

Signed-off-by: Roberto Rodriguez <[email protected]>
Contributor

@sicoyle sicoyle left a comment


amazingggggggg - thank you for your efforts on this! Few comments for my own understanding/clarity please 🙏 🙌

Comment on lines +57 to +61
# Scalar / positional fallback: bind to the first parameter if present.
if not signature.parameters:
    return {}
first_param = next(iter(signature.parameters))
return {first_param: raw_input}
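
For context, the excerpt above is the tail of an input-normalization helper; the full shape such a helper typically takes is roughly the following (an illustrative sketch with an assumed name, not the PR's exact code):

```python
import inspect
from typing import Any, Dict


def normalize_activity_input(signature: inspect.Signature, raw_input: Any) -> Dict[str, Any]:
    """Map an activity payload onto the wrapped function's parameters."""
    if isinstance(raw_input, dict):
        # Dict payloads bind by parameter name, so multiple parameters are covered here.
        return {name: raw_input[name] for name in signature.parameters if name in raw_input}
    # Scalar / positional fallback: bind to the first parameter if present.
    if not signature.parameters:
        return {}
    first_param = next(iter(signature.parameters))
    return {first_param: raw_input}
```
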
Contributor

Would it make sense to put this inside a loop in case there are multiple parameters?

)


def strip_context_parameter(signature: inspect.Signature) -> inspect.Signature:
Contributor

still reading through the rest of the changes, but want to post in case I forget later on... do we add back the ctx? Or is it not needed in the case of llm_activity or agent_activity?
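
For context, a helper with this name usually just rebuilds the signature without the leading context argument so that prompt/input binding ignores it; a minimal sketch under that assumption (the ctx/context name check is illustrative, not necessarily the PR's implementation):

```python
import inspect


def strip_context_parameter(signature: inspect.Signature) -> inspect.Signature:
    """Return a copy of the signature without the leading workflow context parameter."""
    params = list(signature.parameters.values())
    if params and params[0].name in ("ctx", "context"):
        params = params[1:]
    return signature.replace(parameters=params)
```
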

Comment on lines +107 to +118
### Agent-based Workflow Patterns

Learn to orchestrate **autonomous, role-driven agents** inside Dapr Workflows using the `@agent_activity` decorator.
These patterns focus on chaining and coordinating specialized agents that reason, plan, and act within durable, stateful workflows.

- **Agent-driven Tasks**: Execute workflow activities through autonomous agents with defined roles and instructions
- **Sequential & Composed Flows**: Chain multiple agents together, passing context and results between steps
- **Resilient Orchestration**: Combine agent reasoning with Dapr’s durable state, recovery, and execution guarantees

This quickstart demonstrates how to design and run **agent-based workflows**, starting with a sequential chain of agents collaborating to complete a shared objective.

[Go to Agent-based Workflow Patterns](./04-agent-based-workflows/)
Contributor

can you please add whether this applies to the Agent and DurableAgent classes?

Collaborator Author

Got it! I will add that. Currently, it does not process Durable Agents; it only works with the native Agent class. I asked a similar question above: can an Activity trigger a child workflow?

Collaborator

DurableAgent within workflow will be great! Tracking that here

Here is an example of how to trigger it, though I'm not sure if it can be done from within an activity:
https://github.com/dapr/python-sdk/blob/main/examples/demo_workflow/app.py#L58
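
For reference, in the Dapr Python workflow extension child workflows are scheduled from the orchestrator context rather than from inside an activity; a minimal sketch of that pattern (whether an activity decorator could wrap this is the open question above):

```python
import dapr.ext.workflow as wf
from dapr.ext.workflow import DaprWorkflowContext

runtime = wf.WorkflowRuntime()


@runtime.workflow(name="child_workflow")
def child_workflow(ctx: DaprWorkflowContext, city: str):
    return f"Plan for {city}"


@runtime.workflow(name="parent_workflow")
def parent_workflow(ctx: DaprWorkflowContext, city: str):
    # Child workflows are scheduled from the orchestrator, not from an activity.
    plan = yield ctx.call_child_workflow(child_workflow, input=city)
    return plan
```
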

Comment on lines +112 to +178
### 1. Single LLM Activity (01_single_activity_workflow.py)

A simple example where the workflow executes one LLM-powered activity that returns a short biography.

```python
import time

import dapr.ext.workflow as wf
from dapr.ext.workflow import DaprWorkflowContext
from dotenv import load_dotenv

from dapr_agents.llm.dapr import DaprChatClient
from dapr_agents.workflow.decorators import llm_activity

# Load environment variables (e.g., API keys, secrets)
load_dotenv()

# Initialize the Dapr workflow runtime and LLM client
runtime = wf.WorkflowRuntime()
llm = DaprChatClient(component_name="openai")


@runtime.workflow(name="single_task_workflow")
def single_task_workflow(ctx: DaprWorkflowContext, name: str):
    """Ask the LLM about a single historical figure and return a short bio."""
    response = yield ctx.call_activity(describe_person, input={"name": name})
    return response


@runtime.activity(name="describe_person")
@llm_activity(
    prompt="Who was {name}?",
    llm=llm,
)
async def describe_person(ctx, name: str) -> str:
    pass


if __name__ == "__main__":
    runtime.start()
    time.sleep(5)

    client = wf.DaprWorkflowClient()
    instance_id = client.schedule_new_workflow(
        workflow=single_task_workflow,
        input="Grace Hopper",
    )
    print(f"Workflow started: {instance_id}")

    state = client.wait_for_workflow_completion(instance_id)
    if not state:
        print("No state returned (instance may not exist).")
    elif state.runtime_status.name == "COMPLETED":
        print(f"Grace Hopper bio:\n{state.serialized_output}")
    else:
        print(f"Workflow ended with status: {state.runtime_status}")
        if state.failure_details:
            fd = state.failure_details
            print("Failure type:", fd.error_type)
            print("Failure message:", fd.message)
            print("Stack trace:\n", fd.stack_trace)
        else:
            print("Custom status:", state.serialized_custom_status)

    runtime.shutdown()
```

Contributor

can we link to the file and not put the code in the readmes please?

Collaborator Author

Yes! I was following other README examples, but yeah, too much 😅

Comment on lines +52 to +57
# Handle both dict and model cases gracefully
q_list = (
    [q["text"] for q in questions["questions"]]
    if isinstance(questions, dict)
    else [q.text for q in questions.questions]
)
Contributor

i see that we have to support both data forms often in our code. Do you by chance know why? Like in this quickstart can we just support one? I think it's bc the underlying state implementation supports just dictionaries, but I'm not sure if that is because of our implementation in dapr agents or because of limitations within the python sdk or something else tbh

Collaborator Author

You are right, we should only support dictionaries here. I will fix that.
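
For example, one way to keep a single code path in the quickstart, assuming activity results come back as plain dicts after JSON serialization by the workflow runtime (a sketch with a simplified version of the Questions model):

```python
from typing import List

from pydantic import BaseModel


class Question(BaseModel):
    text: str


class Questions(BaseModel):
    questions: List[Question]


def to_question_texts(raw: dict) -> List[str]:
    """Re-validate the JSON dict returned by the activity into the typed model."""
    return [q.text for q in Questions.model_validate(raw).questions]


# Inside the workflow, the single code path becomes:
#   raw = yield ctx.call_activity(generate_questions, input={"topic": topic})
#   q_list = to_question_texts(raw)
```
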

@sicoyle
Contributor

sicoyle commented Oct 20, 2025

A few thoughts I wrote down while reviewing:

  • Once we merge this, I suggest we create a follow-up issue to support LLM activity prompts through a filename input.

  • How will the decorators apply to orchestrators? I’m thinking they might not work since we check for the AgentBase class, but I’d like to double-check my understanding.

tagging @bibryam too, do you have any thoughts on how this might look when applied to orchestrators? We’re seeing more users on Discord starting to use them...

@Cyb3rWard0g
Collaborator Author

Cyb3rWard0g commented Oct 21, 2025

A few thoughts I wrote down while reviewing:

  • Once we merge this, I suggest we create a follow-up issue to support LLM activity prompts through a filename input.
  • How will the decorators apply to orchestrators? I’m thinking they might not work since we check for the AgentBase class, but I’d like to double-check my understanding.

tagging @bibryam too, do you have any thoughts on how this might look when applied to orchestrators? We’re seeing more users on Discord starting to use them...

Yes! I like the idea of supporting prompts from files 💯.
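
Even before decorator support lands, something along these lines could work as a rough sketch (the prompts/describe_person.txt path is illustrative, and the decorator still receives the template as a plain string):

```python
from pathlib import Path

import dapr.ext.workflow as wf

from dapr_agents.llm.dapr import DaprChatClient
from dapr_agents.workflow.decorators import llm_activity

runtime = wf.WorkflowRuntime()
llm = DaprChatClient(component_name="openai")


@runtime.activity(name="describe_person")
@llm_activity(
    # Illustrative: load the prompt template (e.g., "Who was {name}?") from a file.
    prompt=Path("prompts/describe_person.txt").read_text(encoding="utf-8"),
    llm=llm,
)
def describe_person(ctx, name: str) -> str:
    pass
```
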

Regarding the Orchestrator classes, and basically any class inheriting the current WorkflowApp -> AgenticWorkflow classes, using the decorators will be challenging. I am working on a new WorkflowApp that should only be used to start or stop workflows, register multiple workflows and activities with the same runtime, handle HTTP requests to start workflows, and subscribe pub/sub and HTTP routes to trigger workflows. We should let users define workflows as native Dapr workflows in their own separate scripts and use the new WorkflowApp to aggregate them and start the runtime. In this scenario, the LLM activity decorators work well with LLM clients using the DaprChatClient component because they do not rely on an environment variable or a Python class attribute such as llm. Technically, LLM client requirements such as API key values are set in the Dapr component file. As long as the native Dapr workflow has an LLM activity decorator pointing to an LLM variable that references the Dapr chat client / Conversation API component with the right name, we are good. However, the user will not be able to do the same with an LLM client such as llm=OpenAIChatClient() passed to a DurableAgent(llm=llm) and expect that to propagate to all LLM variables defined in native Dapr workflows living in separate Python files. To handle that, we would need to add some additional wrappers, but it starts to get too complex.

I will push a PR on this hopefully tomorrow; I am almost done. After that PR, I will have to figure out the "Observability" challenge. Moving away from a WorkflowApp class that encapsulates only one workflow and uses the legacy Task and Workflow decorators will bring some extra challenges for tracing native Dapr workflows, and I need help on that. I wanted to separate the PRs to make them easy to review and not disrupt anything. This is why I am currently marking the current Task and Workflow decorators as legacy and showing warnings rather than removing anything yet ;)

@sicoyle
Contributor

sicoyle commented Oct 21, 2025

I will push a PR on this hopefully tomorrow; I am almost done. After that PR, I will have to figure out the "Observability" challenge. Moving away from a WorkflowApp class that encapsulates only one workflow and uses the legacy Task and Workflow decorators will bring some extra challenges for tracing native Dapr workflows, and I need help on that. I wanted to separate the PRs to make them easy to review and not disrupt anything. This is why I am currently marking the current Task and Workflow decorators as legacy and showing warnings rather than removing anything yet ;)

There will be efforts in Dapr upstream in the SDKs to bring telemetry tracing to the clients. That is currently missing and something dapr agents will benefit from, since we can then just propagate the trace context instead of having our wrapper classes. It will also give us the full story in our workflow activities: right now, if the underlying activity calls something like a state store, you do not see that state trace within the activity.

IMO tracing will not be the best until we get that from the SDK side of things, so don't spend tooooooo much time there; it does not have to be perfect and will have to be updated once the SDKs give us the trace context. I also say that because I have spent a fair amount of time on tracing myself, so I know it gets quite complex there 🙃 Furthermore, IMO, if tracing gets broken on the "legacy" bits, I don't think we really need to worry about it... just call it out in the PR please, because before we cut any release I go through and check all the quickstarts (manually until I automate it), so I can confirm things on the tracing side are g2g too.

I also really appreciate the PRs separated out some to help with reviewing 🤗

@bibryam
Collaborator

bibryam commented Oct 21, 2025

@Cyb3rWard0g this is a fantastic PR, thank you!
@sicoyle I'm still not very familiar with orchestrators, but IMO orchestrators don't have to be aligned as much with native dapr workflows, and as a result we don't need to address orchestrators as part of this PR.

@sicoyle
Contributor

sicoyle commented Oct 22, 2025

any chance we can add a few tests for the decorators too pls? 🙏
