This guide explains the fundamental concepts of the AgentForge framework and how they work together to create dynamic workflows.
Signals are the fundamental unit of communication in AgentForge. They carry both data and metadata about events or actions in the system.
# Basic signal creation
signal = Signal.new(:user_action, %{id: 1, data: "example"})
# Signal structure
%{
type: :user_action,
data: %{id: 1, data: "example"},
meta: %{timestamp: ~U[2025-03-22 10:00:00Z]}
}
Handlers are functions that process signals and maintain state. They follow a consistent interface:
# Handler function signature
(signal, state) -> {result, new_state}
# Example handler
def process_user(signal, state) do
new_state = Map.put(state, :last_user, signal.data)
{Signal.emit(:user_processed, signal.data), new_state}
end
Flows compose multiple handlers into processing pipelines. They manage the execution order and error handling.
# Creating a flow
flow = [
&validate_user/2,
&enrich_user_data/2,
&save_user/2
]
# Processing a signal through the flow
{:ok, result, final_state} = Flow.process(flow, signal, initial_state)
Primitives are building blocks for creating handlers. AgentForge provides several core primitives:
Conditionally executes different flows based on a condition:
branch = Primitives.branch(
fn signal, _ -> signal.data.age >= 18 end,
adult_flow,
minor_flow
)
Modifies signal data:
transform = Primitives.transform(fn data ->
Map.put(data, :processed_at, DateTime.utc_now())
end)
Iterates over items in signal data:
loop = Primitives.loop(fn item, state ->
{Signal.emit(:item_processed, item), state}
end)
Pauses execution until a condition is met:
wait = Primitives.wait(
fn _, state -> state.resource_ready end,
timeout: 5000
)
Sends notifications through configured channels:
notify = Primitives.notify(
[:console, :webhook],
format: &("User #{&1.name} registered")
)
State is maintained throughout the workflow execution:
# Initial state
state = %{counter: 0}
# Handler updating state
def count_signal(signal, state) do
new_state = Map.update(state, :counter, 1, & &1 + 1)
{signal, new_state}
end
def process_order(signal, state) do
with {:ok, validated} <- validate_order(signal.data),
{:ok, enriched} <- enrich_order(validated),
{:ok, saved} <- save_order(enriched) do
{Signal.emit(:order_processed, saved), state}
else
{:error, reason} -> {Signal.emit(:order_failed, reason), state}
end
end
def aggregate_totals(signal, state) do
total = Map.get(state, :total, 0) + signal.data.amount
{signal, Map.put(state, :total, total)}
end
def route_request(signal, state) do
case signal.data.priority do
:high -> {Signal.emit(:urgent, signal.data), state}
:low -> {Signal.emit(:routine, signal.data), state}
end
end
AgentForge supports defining workflows through configuration:
name: user_registration
steps:
- name: validate_input
type: transform
config:
validate:
- field: email
required: true
- name: process_user
type: branch
config:
condition: "age >= 18"
then_flow: adult_flow
else_flow: minor_flow
AgentForge provides several ways to handle errors:
- Return tagged tuples:
{:error, reason} -> {Signal.emit(:error, reason), state}
- Use rescue in transforms:
Primitives.transform(fn data ->
# ... risky operation ...
rescue
e -> raise "Processing failed: #{Exception.message(e)}"
end)
- Handle errors in flows:
case Flow.process(workflow, signal, state) do
{:ok, result, final_state} -> handle_success(result)
{:error, reason} -> handle_error(reason)
end
- Keep handlers small and focused
- Use appropriate primitives for common patterns
- Maintain immutable state
- Handle errors at appropriate levels
- Use clear signal types and meaningful data
- Document handlers and flows
- Test different execution paths
- Check out the examples in the
examples/
directory - Read the primitive-specific guides
- Review the test files for more usage patterns
- See the CONTRIBUTING.md file for development guidelines