Skip to content

Commit aff7818

Browse files
committed
added specific demo examples
1 parent 64503db commit aff7818

File tree

3 files changed

+272
-0
lines changed

3 files changed

+272
-0
lines changed

examples/cli_demo_server/server.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import asyncio
2+
import logging
3+
4+
import asyncclick as click
5+
import reactivex
6+
from aiohttp import web
7+
from reactivex import Observable
8+
9+
from rsocket.payload import Payload
10+
from rsocket.routing.request_router import RequestRouter
11+
from rsocket.routing.routing_request_handler import RoutingRequestHandler
12+
from rsocket.rsocket_server import RSocketServer
13+
from rsocket.transports.aiohttp_websocket import TransportAioHttpWebsocket
14+
from rsocket.transports.tcp import TransportTCP
15+
16+
router = RequestRouter()
17+
18+
19+
@router.response('echo')
20+
async def echo(payload: Payload) -> Observable:
21+
return reactivex.just(Payload(payload.data))
22+
23+
24+
def websocket_handler_factory(**kwargs):
25+
async def websocket_handler(request):
26+
ws = web.WebSocketResponse()
27+
await ws.prepare(request)
28+
transport = TransportAioHttpWebsocket(ws)
29+
RSocketServer(transport, **kwargs)
30+
await transport.handle_incoming_ws_messages()
31+
return ws
32+
33+
return websocket_handler
34+
35+
36+
@click.command()
37+
@click.option('--port', help='Port to listen on', default=6565, type=int)
38+
@click.option('--transport', is_flag=False, default='tcp')
39+
async def start_server(port: int, transport: str):
40+
logging.basicConfig(level=logging.DEBUG)
41+
42+
logging.info(f'Starting {transport} server at localhost:{port}')
43+
44+
if transport in ['ws']:
45+
app = web.Application()
46+
app.add_routes([web.get('/', websocket_handler_factory(
47+
handler_factory=lambda: RoutingRequestHandler(router)
48+
))])
49+
50+
await web._run_app(app, port=port)
51+
elif transport == 'tcp':
52+
def handle_client(reader, writer):
53+
RSocketServer(TransportTCP(reader, writer),
54+
handler_factory=lambda: RoutingRequestHandler(router))
55+
56+
server = await asyncio.start_server(handle_client, 'localhost', port)
57+
58+
async with server:
59+
await server.serve_forever()
60+
else:
61+
raise Exception(f'Unsupported transport {transport}')
62+
63+
64+
if __name__ == '__main__':
65+
start_server()

examples/issue_290/client.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
import asyncio
2+
import logging
3+
import sys
4+
from asyncio import Event
5+
from typing import AsyncGenerator, Tuple
6+
7+
import aiohttp
8+
from rsocket.transports.asyncwebsockets_transport import websocket_client
9+
10+
from rsocket.transports.aiohttp_websocket import TransportAioHttpClient
11+
12+
from reactivestreams.publisher import Publisher
13+
from reactivestreams.subscriber import Subscriber
14+
from reactivestreams.subscription import Subscription
15+
from rsocket.extensions.helpers import route, composite, authenticate_simple
16+
from rsocket.extensions.mimetypes import WellKnownMimeTypes
17+
from rsocket.helpers import single_transport_provider
18+
from rsocket.payload import Payload
19+
from rsocket.rsocket_client import RSocketClient
20+
from rsocket.streams.stream_from_async_generator import StreamFromAsyncGenerator
21+
from rsocket.transports.tcp import TransportTCP
22+
23+
24+
def sample_publisher(wait_for_requester_complete: Event,
25+
response_count: int = 1000) -> Publisher:
26+
async def generator() -> AsyncGenerator[Tuple[Payload, bool], None]:
27+
current_response = 0
28+
for i in range(response_count):
29+
is_complete = (current_response + 1) == response_count
30+
31+
message = 'Item to server from client on channel: %s' % current_response
32+
yield Payload(message.encode('utf-8')), is_complete
33+
34+
if is_complete:
35+
wait_for_requester_complete.set()
36+
break
37+
38+
current_response += 1
39+
40+
return StreamFromAsyncGenerator(generator)
41+
42+
43+
class ChannelSubscriber(Subscriber):
44+
45+
def __init__(self, wait_for_responder_complete: Event) -> None:
46+
super().__init__()
47+
self._wait_for_responder_complete = wait_for_responder_complete
48+
self.values = []
49+
50+
def on_subscribe(self, subscription: Subscription):
51+
self.subscription = subscription
52+
53+
def on_next(self, value: Payload, is_complete=False):
54+
logging.info('From server on channel: ' + value.data.decode('utf-8'))
55+
self.values.append(value.data)
56+
if is_complete:
57+
self._wait_for_responder_complete.set()
58+
59+
def on_error(self, exception: Exception):
60+
logging.error('Error from server on channel' + str(exception))
61+
self._wait_for_responder_complete.set()
62+
63+
def on_complete(self):
64+
logging.info('Completed from server on channel')
65+
self._wait_for_responder_complete.set()
66+
67+
async def request_channel(client: RSocketClient):
68+
69+
channel_completion_event = Event()
70+
requester_completion_event = Event()
71+
payload = Payload(b'The first item in the stream', composite(
72+
route('channel'),
73+
authenticate_simple('user', '12345')
74+
))
75+
publisher = sample_publisher(requester_completion_event)
76+
77+
requested = client.request_channel(payload, publisher)
78+
79+
subscriber = ChannelSubscriber(channel_completion_event)
80+
requested.initial_request_n(5).subscribe(subscriber)
81+
82+
await channel_completion_event.wait()
83+
await requester_completion_event.wait()
84+
85+
86+
async def application(serve_port: int):
87+
async with websocket_client('http://localhost:%s/rsocket' % serve_port,
88+
metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA,) as client:
89+
await request_channel(client)
90+
91+
92+
async def command():
93+
logging.basicConfig(level=logging.DEBUG)
94+
await application(7878)
95+
96+
97+
if __name__ == '__main__':
98+
asyncio.run(command())

examples/issue_290/server.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
import asyncio
2+
import logging
3+
4+
from aiohttp import web
5+
6+
from reactivestreams.subscriber import DefaultSubscriber
7+
from reactivestreams.subscription import Subscription
8+
from rsocket.helpers import DefaultPublisherSubscription
9+
from rsocket.payload import Payload
10+
from rsocket.routing.request_router import RequestRouter
11+
from rsocket.routing.routing_request_handler import RoutingRequestHandler
12+
from rsocket.rsocket_server import RSocketServer
13+
from rsocket.transports.aiohttp_websocket import TransportAioHttpWebsocket
14+
15+
16+
class ChannelPublisher(DefaultPublisherSubscription):
17+
def __init__(self):
18+
super().__init__()
19+
self.subscriber = None
20+
21+
def publish(self, data):
22+
print(f"ChannelPublisher - publish: {data}")
23+
print(f"ChannelPublisher - publish subscriber: {self.subscriber}")
24+
25+
if (self.subscriber is not None):
26+
self.subscriber.on_next(Payload(data.encode('utf-8')))
27+
28+
def subscribe(self, subscriber):
29+
print("ChannelPublisher - subscribe")
30+
31+
self.subscriber = subscriber
32+
33+
subscriber.on_subscribe(self)
34+
35+
36+
class ChannelSubscriber(DefaultSubscriber):
37+
38+
def __init__(self, publisher: ChannelPublisher):
39+
self.subscription = None
40+
self.publisher = publisher
41+
42+
def on_subscribe(self, subscription: Subscription):
43+
print("ChannelSubscriber - on_subscribe")
44+
subscription.request(3)
45+
46+
def on_next(self, value: Payload, is_complete=False):
47+
user_message = value.data.decode('utf-8')
48+
print(f"ChannelSubscriber - on_next: {user_message}")
49+
50+
self.publisher.publish(f"Some text and message: {user_message}")
51+
52+
53+
def handler_factory_factory():
54+
router = RequestRouter()
55+
56+
@router.channel('channel')
57+
async def channel_response(payload: Payload, composite_metadata):
58+
print('Got channel request')
59+
60+
publisher = ChannelPublisher()
61+
subscriber = ChannelSubscriber(publisher)
62+
63+
# WHY?: if passing received payload to ChannelSubscriber here can be processed before at ChannelPublisher "subscribe" method got subscriber
64+
# in other words - we do "one_next()", it will be processed with some logic and result can be published with "publish()"
65+
# subscriber.on_next(payload)
66+
67+
return publisher, subscriber
68+
69+
def handler_factory():
70+
return RoutingRequestHandler(router)
71+
72+
return handler_factory
73+
74+
75+
def websocket_handler_factory(**kwargs):
76+
async def websocket_handler(request):
77+
ws = web.WebSocketResponse()
78+
await ws.prepare(request)
79+
transport = TransportAioHttpWebsocket(ws)
80+
RSocketServer(transport, **kwargs)
81+
await transport.handle_incoming_ws_messages()
82+
return ws
83+
84+
return websocket_handler
85+
86+
87+
async def start_server():
88+
logging.basicConfig(level=logging.DEBUG)
89+
90+
print(f'Starting server at localhost: 7878')
91+
92+
app = web.Application()
93+
app.add_routes(
94+
[
95+
web.get(
96+
'/rsocket',
97+
websocket_handler_factory(
98+
handler_factory=handler_factory_factory()
99+
)
100+
),
101+
# web.static('/static', 'static'),
102+
]
103+
)
104+
105+
await web._run_app(app, port=7878, ssl_context=None)
106+
107+
108+
if __name__ == '__main__':
109+
asyncio.run(start_server())

0 commit comments

Comments
 (0)