1
+ import asyncio
2
+ from typing import AsyncGenerator , List
3
+ from fastapi import FastAPI
4
+ from starlette .responses import StreamingResponse
5
+ import uvicorn
6
+ from threading import Thread
7
+ import httpx
8
+ from mcp .client .sse import aconnect_sse
9
+
1
10
app = FastAPI ()
2
11
3
12
@app .get ("/sse" )
4
13
async def sse_endpoint () -> StreamingResponse :
5
14
async def event_stream () -> AsyncGenerator [str , None ]:
6
15
for i in range (3 ):
7
- yield f"data: Hello { i + 1 } \n \n "
16
+ yield f"data: Hello { i + 1 } \\ n \ \ n"
8
17
await asyncio .sleep (0.1 )
9
18
return StreamingResponse (event_stream (), media_type = "text/event-stream" )
10
19
11
20
def run_mock_server () -> None :
12
21
uvicorn .run (app , host = "127.0.0.1" , port = 8012 , log_level = "warning" )
13
22
14
- async def test_aconnect_sse_server_response () -> None :
23
+ async def run_sse_test () -> None :
15
24
server_thread = Thread (target = run_mock_server , daemon = True )
16
25
server_thread .start ()
17
26
await asyncio .sleep (1 )
18
27
19
28
messages : List [str ] = []
20
-
21
29
async with httpx .AsyncClient () as client :
22
30
async with aconnect_sse (client , "GET" , "http://127.0.0.1:8012/sse" ) as event_source :
23
31
async for event in event_source .aiter_sse ():
@@ -27,5 +35,11 @@ async def test_aconnect_sse_server_response() -> None:
27
35
if len (messages ) == 3 :
28
36
break
29
37
30
- assert messages == ["Hello 1" , "Hello 2" , "Hello 3" ]
31
- print ("\n Test passed! SSE connection via aconnect_sse worked correctly." )
38
+ if messages == ["Hello 1" , "Hello 2" , "Hello 3" ]:
39
+ print ("\\ n Test passed!" )
40
+ else :
41
+ print ("\\ n Test failed:" , messages )
42
+
43
+ if __name__ == "__main__" :
44
+ asyncio .run (run_sse_test ())
45
+
0 commit comments