-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path01_basic_workflow.py
107 lines (82 loc) · 3.18 KB
/
01_basic_workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
from llama_index.core.workflow import Workflow, step, Event, Context
from llama_index.core.workflow.events import (
StartEvent,
StopEvent,
InputRequiredEvent,
HumanResponseEvent,
)
from llama_index.core.workflow.handler import WorkflowHandler
from llama_index.llms.ollama import Ollama
from llama_index.core.llms import ChatMessage
# Small local model served by Ollama; assumes the model has been pulled
# (`ollama pull qwen2.5:0.5b`) and an Ollama server is running — TODO confirm.
model_name = "qwen2.5:0.5b"
# Generous request timeout: small local models can still be slow on CPU.
llm = Ollama(model=model_name, request_timeout=120.0)
class ProgressEvent(Event):
    """Streaming event carrying a human-readable progress message.

    Declaring ``msg`` as a typed field is the idiomatic way to extend a
    llama-index ``Event`` (a pydantic model). The original hand-written
    ``__init__`` assigned ``self.msg`` after ``super().__init__()``, which
    is fragile against pydantic attribute validation; the field keeps the
    same constructor interface (``ProgressEvent(msg=...)``) and the same
    ``event.msg`` attribute access.
    """

    msg: str
class RetryEvent(Event):
    """Signals that the query should be re-run after a human rejection.

    Callers construct it with ``RetryEvent(query=...)`` and read
    ``ev.query``; declaring the field makes that contract explicit
    instead of relying on Event's dynamic attribute storage.
    """

    query: str
class FinalResultEvent(Event):
    """Carries the final result string produced after human approval.

    Callers construct it with ``FinalResultEvent(result=...)`` and read
    ``ev.result``; declaring the field makes that contract explicit
    instead of relying on Event's dynamic attribute storage.
    """

    result: str
class HumanInteractionWorkflow(Workflow):
    """Human-in-the-loop workflow.

    Flow: run the LLM on the user's query, pause for a human to review
    the answer, then either retry the query or emit the final result.
    """

    @step
    async def ask_question(
        self, ctx: Context, ev: StartEvent | RetryEvent
    ) -> InputRequiredEvent:
        """Run the LLM on the query and request human review of the answer."""
        if isinstance(ev, StartEvent):
            ctx.write_event_to_stream(
                ProgressEvent(msg=f"I am doing query on '{ev.query}'")
            )
        else:
            # RetryEvent: the human rejected the previous answer.
            ctx.write_event_to_stream(
                ProgressEvent(msg=f"I am doing query again on '{ev.query}'")
            )
        # Remember the query so the later steps can reference it.
        await ctx.set("original_query", ev.query)
        payload = llm.chat([ChatMessage(role="user", content=ev.query)])
        # InputRequiredEvent suspends the workflow until the driver sends
        # a HumanResponseEvent back in through the handler's context.
        return InputRequiredEvent(prefix="", query=ev.query, payload=payload)

    @step
    async def human_review(
        self, ctx: Context, ev: HumanResponseEvent
    ) -> RetryEvent | FinalResultEvent:
        """Route on the human's verdict: 'yes' accepts, anything else retries."""
        ctx.write_event_to_stream(
            ProgressEvent(msg=f"The human has responded {ev.response}")
        )
        original_query = await ctx.get("original_query")
        # Bugfix: the original compared the raw stdin text against "yes"
        # exactly, so "Yes" or "yes " silently triggered a retry.
        # Normalize whitespace and case before comparing.
        if ev.response.strip().casefold() == "yes":
            return FinalResultEvent(result=f"Result of {original_query}")
        return RetryEvent(query=original_query)

    @step
    async def final_result(self, ctx: Context, ev: FinalResultEvent) -> StopEvent:
        """Wrap the approved result in a StopEvent to finish the workflow."""
        ctx.write_event_to_stream(ProgressEvent(msg="final result is preparing"))
        original_query = await ctx.get("original_query")
        print(original_query, ev.result)
        return StopEvent(result=ev.result)
async def executor(query: str):
    """Drive the workflow, bridging LLM output and human stdin feedback.

    Streams workflow events: prints progress messages, and when the
    workflow pauses on an InputRequiredEvent, collects the human's
    verdict and feeds it back as a HumanResponseEvent.
    """
    workflow = HumanInteractionWorkflow(timeout=None, verbose=False)
    try:
        handler: WorkflowHandler = workflow.run(query=query)
        async for event in handler.stream_events():
            if isinstance(event, InputRequiredEvent):
                payload = event.payload
                print("[Result]:", payload)
                print(
                    "[Assessment]: If you are satisfied type 'yes' for retry type 'no'"
                )
                # Bugfix: input() is blocking; running it in a worker
                # thread keeps the event loop (and event streaming) alive
                # while waiting for the human.
                user_input = await asyncio.to_thread(input, "[Review]:")
                handler.ctx.send_event(HumanResponseEvent(response=user_input))
            elif isinstance(event, ProgressEvent):
                print("[Progress]", event.msg)
        final_result = await handler
        print("[Final Result]", final_result)
    except Exception as e:
        # Top-level boundary for this demo script: report and exit cleanly.
        print("[Error]", e)
import asyncio
def main():
    """Entry point: read the user's query, then run the async executor."""
    user_query = input("[Input]: ")
    asyncio.run(executor(query=user_query))


if __name__ == "__main__":
    main()
# Run : python hitl-workflow/01_basic_workflow.py
# Enter a query, then give feedback ('yes' to accept, 'no' to retry) based on the response