vikramsoni2/microflow

Microflow

A lightweight, event-driven workflow system for building AI agents and processing pipelines, implemented in just 60 lines of Python.

Installation

pip install microflow

Quick Start

import asyncio
from microflow import WorkflowManager, WorkflowEvent

# Create a workflow manager
workflow = WorkflowManager()

# Define a handler
async def greeting_handler(event, ctx):
    name = event.data.get("name", "World")
    yield WorkflowEvent.progress("greeting", f"Processing greeting for {name}")
    yield WorkflowEvent(name="completed", data={"message": f"Hello, {name}!"})

# Register the handler
workflow.register("greet", greeting_handler)

# Run the workflow
async def main():
    initial_event = WorkflowEvent(name="greet", data={"name": "Alice"})
    
    async for event in workflow.process(initial_event):
        if event.name == "progress":
            print(f"Progress: {event.data['step']} - {event.data['description']}")
        elif event.name == "completed":
            print(f"Result: {event.data['message']}")

if __name__ == "__main__":
    asyncio.run(main())

Features

  • Event-Driven Architecture: Chain operations through events
  • Progress Reporting: Built-in support for progress updates
  • Error Handling: Graceful error propagation
  • Flexible Workflows: Easily modify and extend workflows
  • Minimal Dependencies: No external dependencies required
  • Global State: Shared global state throughout the workflow

Guide

Core Components

The entire framework is just 60 lines of Python code, consisting of two main components:

WorkflowEvent

The fundamental data structure that flows through the system:

from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class WorkflowEvent:
    name: str                                  # Event type identifier
    data: Optional[Dict[str, Any]] = None      # Event payload
    metadata: Optional[Dict[str, Any]] = None  # Context information
    error: Optional[str] = None                # Error information if applicable

Special events include:

  • progress events: For reporting status updates
  • error events: For handling failures
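
To make the two special event types concrete, here is a minimal sketch of what they might look like as values. `SketchEvent` is a hypothetical stand-in, not the library's class, and the shape of the progress payload (`step` and `description` keys) is an assumption inferred from the Quick Start consumer, which reads exactly those keys.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class SketchEvent:
    """Hypothetical stand-in for WorkflowEvent; not the library's actual class."""
    name: str
    data: Optional[Dict[str, Any]] = None
    metadata: Optional[Dict[str, Any]] = None
    error: Optional[str] = None

    @classmethod
    def progress(cls, step: str, description: str) -> "SketchEvent":
        # Assumed payload shape: the Quick Start consumer reads
        # event.data["step"] and event.data["description"].
        return cls(name="progress", data={"step": step, "description": description})


# A progress event carries its details in `data`.
progress_event = SketchEvent.progress("search", "Searching for information...")

# An error event carries its message in `error` and may have no payload.
error_event = SketchEvent(name="error", error="connection timed out")

print(progress_event.name, progress_event.data)
print(error_event.name, error_event.error)
```

Both are ordinary events distinguished only by their `name`, so a consumer can branch on `event.name` as the Quick Start does.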

WorkflowManager

Orchestrates the flow of events through registered handlers:

# Interface summary (method bodies omitted)
class WorkflowManager:
    def register(self, event_name: str, handler: EventHandler) -> Self: ...
    async def process(self, initial_event: WorkflowEvent) -> AsyncGenerator[WorkflowEvent, None]: ...

How It Works

Registering Handlers

Handlers are registered for specific event types:

workflow = WorkflowManager()

# Register a handler for the "start" event
workflow.register("start", start_handler)

# Register multiple handlers for the same event (they'll run in sequence)
workflow.register("process_data", validation_handler)
workflow.register("process_data", transformation_handler)

# Chained registration is supported, since register() returns the manager
workflow.register("event1", handler1).register("event2", handler2)

Creating Event Handlers

An event handler is an async generator function that takes a WorkflowEvent and a context dictionary (ctx) and yields one or more events:

from typing import Any, AsyncGenerator, Dict

from microflow import WorkflowEvent

async def search_handler(event: WorkflowEvent, ctx: Dict[str, Any]) -> AsyncGenerator[WorkflowEvent, None]:
    # Report progress
    yield WorkflowEvent.progress("search", "Searching for information...")

    # Perform work (perform_search is a placeholder for your own coroutine)
    search_results = await perform_search(event.data["query"])

    # Return results with a new event type, passing the metadata along
    yield WorkflowEvent(
        name="search_completed",
        data={"search_results": search_results},
        metadata=event.metadata
    )
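
Because a handler is just an async generator, it can be exercised on its own, without a manager, which is handy for unit tests. The sketch below drives a handler directly and shows state carried between calls through the `ctx` dictionary. `SketchEvent`, `counting_handler`, and `collect` are hypothetical names introduced for illustration, and whether microflow shares `ctx` across handlers in exactly this way is an assumption based on the "Global State" feature listed above.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Dict, List, Optional


@dataclass
class SketchEvent:
    """Hypothetical stand-in for WorkflowEvent, so the sketch runs without microflow."""
    name: str
    data: Optional[Dict[str, Any]] = None


async def counting_handler(
    event: SketchEvent, ctx: Dict[str, Any]
) -> AsyncGenerator[SketchEvent, None]:
    # Record in the shared context how many times this handler has run.
    ctx["calls"] = ctx.get("calls", 0) + 1
    yield SketchEvent(name="counted", data={"calls": ctx["calls"]})


async def collect(handler, event: SketchEvent, ctx: Dict[str, Any]) -> List[SketchEvent]:
    # Drain the async generator into a list so the output can be inspected.
    return [out async for out in handler(event, ctx)]


ctx: Dict[str, Any] = {}
first = asyncio.run(collect(counting_handler, SketchEvent(name="count"), ctx))
second = asyncio.run(collect(counting_handler, SketchEvent(name="count"), ctx))

print(first[0].data, second[0].data)
```

The second call sees the counter written by the first because both receive the same dictionary object.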

Chaining Handlers

Handlers chain together by yielding events that trigger other handlers:

  1. Handler A processes an event and yields a new event with name="process_data"
  2. WorkflowManager sees this event and finds handlers registered for "process_data"
  3. Those handlers run in sequence, potentially yielding more events
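
The steps above can be sketched as a small dispatch loop. This is a minimal illustration under stated assumptions, not microflow's actual implementation: `SketchEvent` and `SketchManager` are hypothetical names, and the queue-based, first-in-first-out ordering is one plausible choice that the library may or may not share.

```python
import asyncio
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Callable, DefaultDict, Deque, Dict, List, Optional


@dataclass
class SketchEvent:
    """Hypothetical stand-in for WorkflowEvent."""
    name: str
    data: Optional[Dict[str, Any]] = None


Handler = Callable[[SketchEvent, Dict[str, Any]], AsyncGenerator[SketchEvent, None]]


class SketchManager:
    """Hypothetical dispatcher; not microflow's actual implementation."""

    def __init__(self) -> None:
        self.handlers: DefaultDict[str, List[Handler]] = defaultdict(list)
        self.ctx: Dict[str, Any] = {}

    def register(self, event_name: str, handler: Handler) -> "SketchManager":
        self.handlers[event_name].append(handler)
        return self  # returning self allows chained register() calls

    async def process(self, initial_event: SketchEvent) -> AsyncGenerator[SketchEvent, None]:
        queue: Deque[SketchEvent] = deque([initial_event])
        while queue:
            event = queue.popleft()
            # Handlers for one event name run in registration order.
            for handler in self.handlers.get(event.name, []):
                async for produced in handler(event, self.ctx):
                    yield produced          # surface every event to the caller
                    queue.append(produced)  # and let it trigger further handlers


async def handler_a(event: SketchEvent, ctx: Dict[str, Any]):
    yield SketchEvent(name="process_data", data={"value": 2})


async def handler_b(event: SketchEvent, ctx: Dict[str, Any]):
    yield SketchEvent(name="completed", data={"value": event.data["value"] * 10})


async def run() -> List[str]:
    manager = SketchManager().register("start", handler_a).register("process_data", handler_b)
    return [e.name async for e in manager.process(SketchEvent(name="start"))]


names = asyncio.run(run())
print(names)
```

In this sketch an event with no registered handlers, such as `completed` here, is still yielded to the caller but triggers nothing further, which is how the chain ends.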

Progress Reporting

Special handling for progress events allows for status updates without breaking the chain:

async def complex_handler(event: WorkflowEvent, ctx: Dict[str, Any]):
    # Report progress without changing workflow direction
    yield WorkflowEvent.progress("step1", "Starting processing...")
    
    # Do some work...
    
    yield WorkflowEvent.progress("step2", "Halfway done...")
    
    # Continue the workflow with a new event
    yield WorkflowEvent(name="next_step", data={"result": "success"})

Error Handling

Handlers can catch exceptions and propagate them through the workflow as error events:

async def risky_handler(event: WorkflowEvent, ctx: Dict[str, Any]):
    try:
        # risky_operation is a placeholder for your own coroutine
        result = await risky_operation()
        yield WorkflowEvent(name="success", data={"result": result})
    except Exception as e:
        # Report the failure as an event instead of raising
        yield WorkflowEvent(name="error", error=str(e), metadata=event.metadata)
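
On the consuming side, error events arrive in the same stream as every other event, so the caller decides how to react. The sketch below stops at the first error event. `SketchEvent`, `fake_stream`, and `consume` are hypothetical names; `fake_stream` stands in for `workflow.process(initial_event)` so the example runs without microflow, and stopping on the first error is a policy choice for illustration, not behaviour the library is known to enforce.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Dict, List, Optional


@dataclass
class SketchEvent:
    """Hypothetical stand-in for WorkflowEvent."""
    name: str
    data: Optional[Dict[str, Any]] = None
    error: Optional[str] = None


async def fake_stream() -> AsyncGenerator[SketchEvent, None]:
    # Stands in for workflow.process(initial_event).
    yield SketchEvent(name="progress", data={"step": "fetch", "description": "Fetching..."})
    yield SketchEvent(name="error", error="upstream service unavailable")
    yield SketchEvent(name="completed", data={"result": "never reached"})


async def consume(stream: AsyncGenerator[SketchEvent, None]) -> List[str]:
    log: List[str] = []
    async for event in stream:
        if event.name == "error":
            log.append(f"failed: {event.error}")
            break  # stop consuming after the first error event
        log.append(event.name)
    return log


log = asyncio.run(consume(fake_stream()))
print(log)
```

A caller that prefers to keep going could log the error and continue the loop instead of breaking.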

Example Workflow

The workflow below processes the user's query, searches for results, and then generates a response from them.

[Figure: Flow Diagram]

[Figure: Process Diagram]

import asyncio
from microflow import WorkflowManager, WorkflowEvent

# Initialize workflow manager
workflow = WorkflowManager()

# Query handler
async def query_handler(event, ctx):
    yield WorkflowEvent.progress("query", f"Processing query: {event.data['query']}")
    yield WorkflowEvent(name="search", data={"query": event.data["query"]})

# Search handler
async def search_handler(event, ctx):
    yield WorkflowEvent.progress("search", f"Searching for: {event.data['query']}")
    results = ["Result 1", "Result 2", "Result 3"]
    yield WorkflowEvent(name="generate", data={"query": event.data["query"], "results": results})

# Generate handler
async def generate_handler(event, ctx):
    yield WorkflowEvent.progress("generate", "Generating response")
    response = f"Answer to '{event.data['query']}' based on {len(event.data['results'])} results"
    yield WorkflowEvent(name="completed", data={"response": response})

# Register handlers
workflow.register("query", query_handler)
workflow.register("search", search_handler)
workflow.register("generate", generate_handler)

# Run workflow
async def main():
    initial_event = WorkflowEvent(name="query", data={"query": "How do LLMs work?"})
    
    async for event in workflow.process(initial_event):
        if event.name == "progress":
            print(f"Progress: {event.data['step']} - {event.data['description']}")
        elif event.name == "completed":
            print(f"Result: {event.data['response']}")

if __name__ == "__main__":
    asyncio.run(main())

For more complex examples, see the examples directory, which includes:

  1. A simple agent workflow that processes queries, searches for information, and generates responses
  2. A weather assistant that analyzes queries, fetches weather data, and generates streaming responses

Benefits

  • Decoupling: Components communicate through events without direct dependencies
  • Extensibility: Easy to add new handlers or modify workflow without changing existing code
  • Observability: Progress events provide visibility into the workflow state
  • Error Handling: Centralized error management through error events

License

MIT
