Skip to content

Commit 6cc67f4

Browse files
authored
Merge pull request #291 from rsocket/send_channel_request_to_subscriber
Updated example use cases. Fixed a reactivex request_response error
2 parents 41706b4 + 2fcc39f commit 6cc67f4

File tree

4 files changed

+26
-4
lines changed

4 files changed

+26
-4
lines changed

examples/server_with_routing.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,10 @@ async def metadata_push(payload: Payload, composite_metadata: CompositeMetadata)
7373
storage.last_metadata_push = item.content
7474

7575
@router.channel('channel')
76-
async def channel_response(payload, composite_metadata):
76+
async def channel_response(payload:Payload, composite_metadata):
7777
logging.info('Got channel request')
7878
subscriber = LoggingSubscriber()
79+
subscriber.on_next(payload)
7980
channel = sample_async_response_stream(local_subscriber=subscriber)
8081
return channel, subscriber
8182

examples/tutorial/reactivex/chat_server.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ async def receive_statistics(statistics: ClientStatistics):
180180
self._session.statistics = statistics
181181

182182
@router.channel('statistics')
183-
async def send_statistics() -> ReactivexChannel:
183+
async def send_statistics(initial_payload: Payload) -> ReactivexChannel:
184184

185185
async def statistics_generator():
186186
while True:
@@ -201,6 +201,8 @@ def on_next(payload: Payload):
201201
if request.period_seconds is not None:
202202
self._session.requested_statistics.period_seconds = request.period_seconds
203203

204+
on_next(initial_payload)
205+
204206
return ReactivexChannel(
205207
from_observable_with_backpressure(
206208
lambda backpressure: observable_from_async_generator(

rsocket/reactivex/reactivex_handler_adapter.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,13 @@ async def request_fire_and_forget(self, payload: Payload):
4848
await self.delegate.request_fire_and_forget(payload)
4949

5050
async def request_response(self, payload: Payload) -> asyncio.Future:
51-
observable = await self.delegate.request_response(payload)
51+
response = await self.delegate.request_response(payload)
52+
53+
if isinstance(response, asyncio.Future):
54+
observable = await response
55+
else:
56+
observable = response
57+
5258
return observable.pipe(
5359
operators.default_if_empty(Payload()),
5460
operators.to_future()

tests/test_reactivex/test_reactivex_server_side.py

+14-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ class Handler(BaseReactivexHandler):
1919

2020
def __init__(self, server_done: Optional[asyncio.Event] = None):
2121
self._server_done = server_done
22+
self.received_payloads = []
2223

2324
async def request_stream(self, payload: Payload) -> Observable:
2425
return reactivex.from_iterable((Payload(ensure_bytes('Feed Item: {}'.format(index))) for index in range(3)))
@@ -29,6 +30,9 @@ async def request_channel(self, payload: Payload) -> ReactivexChannel:
2930

3031
def observer(value: Payload):
3132
logging.info(f'Received by test server: {value.data}')
33+
self.received_payloads.append(value.data)
34+
35+
observer(payload)
3236

3337
return ReactivexChannel(observable,
3438
Observer(observer,
@@ -62,9 +66,12 @@ async def test_serve_reactivex_channel(pipe: Tuple[RSocketServer, RSocketClient]
6266
server, client = pipe
6367

6468
server_done_event = asyncio.Event()
69+
handler = None
6570

6671
def handler_factory():
67-
return Handler(server_done_event)
72+
nonlocal handler
73+
handler = Handler(server_done_event)
74+
return handler
6875

6976
server.set_handler_using_factory(reactivex_handler_factory(handler_factory))
7077

@@ -84,6 +91,12 @@ def handler_factory():
8491

8592
await server_done_event.wait()
8693

94+
assert len(handler.received_payloads) == 4
95+
assert handler.received_payloads[0] == b'request text'
96+
assert handler.received_payloads[1] == b'Client item: 0'
97+
assert handler.received_payloads[2] == b'Client item: 1'
98+
assert handler.received_payloads[3] == b'Client item: 2'
99+
87100

88101
async def test_serve_reactivex_response(pipe: Tuple[RSocketServer, RSocketClient]):
89102
server, client = pipe

0 commit comments

Comments
 (0)