Skip to content

Commit b479682

Browse files
authored
Merge pull request #58 from rsocket/reactivex_rxpy4
Reactivex rxpy4
2 parents d362d4e + 3790dfb commit b479682

15 files changed

+850
-2
lines changed

CHANGELOG.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ v0.4.0
1111
- fragment size now includes frame header and length.
1212
- Added checking fragment size limit (minimum 64) as in java implementation
1313
- Updated examples
14+
- Added reactivex (RxPy version 4) wrapper client
1415

1516
v0.3.0
1617
======

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,16 @@ pip install rsocket
1212

1313
or install any of the extras:
1414

15-
* rx
15+
* rx (RxPy3)
16+
* reactivex (RxPy4)
1617
* aiohttp
1718
* quart
1819
* uic
1920

2021
Example:
2122

2223
```shell
23-
pip install --pre rsocket[rx]
24+
pip install --pre rsocket[reactivex]
2425
```
2526

2627
Alternatively, download the source code, build a package:

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ aiohttp==3.8.1
1010
quart==0.17.0
1111
coveralls==3.3.1
1212
aioquic==0.9.19
13+
reactivex==4.0.4

rsocket/reactivex/__init__.py

Whitespace-only changes.
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import asyncio
2+
from typing import Optional
3+
4+
import reactivex
5+
from reactivex import Observable, Observer
6+
from reactivex.notification import OnNext, OnError, OnCompleted
7+
from reactivex.operators import materialize
8+
from reactivex.subject import Subject
9+
10+
from reactivestreams.subscriber import Subscriber
11+
from rsocket.helpers import DefaultPublisherSubscription
12+
from rsocket.logger import logger
13+
from rsocket.reactivex.subscriber_adapter import SubscriberAdapter
14+
15+
16+
async def observable_to_async_event_generator(observable: Observable):
17+
queue = asyncio.Queue()
18+
19+
def on_next(i):
20+
queue.put_nowait(i)
21+
22+
observable.pipe(materialize()).subscribe(
23+
on_next=on_next
24+
)
25+
26+
while True:
27+
value = await queue.get()
28+
yield value
29+
queue.task_done()
30+
31+
32+
def from_aiter(iterator, feedback: Optional[Observable] = None):
33+
# noinspection PyUnusedLocal
34+
def on_subscribe(observer: Observer, scheduler):
35+
async def _aio_next():
36+
try:
37+
event = await iterator.__anext__()
38+
39+
if isinstance(event, OnNext):
40+
observer.on_next(event.value)
41+
elif isinstance(event, OnError):
42+
observer.on_error(event.exception)
43+
elif isinstance(event, OnCompleted):
44+
observer.on_completed()
45+
except StopAsyncIteration:
46+
pass
47+
except Exception as exception:
48+
logger().error(str(exception), exc_info=True)
49+
observer.on_error(exception)
50+
51+
def create_next_task():
52+
asyncio.create_task(_aio_next())
53+
54+
return feedback.subscribe(
55+
on_next=lambda i: create_next_task()
56+
)
57+
58+
return reactivex.create(on_subscribe)
59+
60+
61+
class BackPressurePublisher(DefaultPublisherSubscription):
62+
def __init__(self, wrapped_observable: Observable):
63+
self._wrapped_observable = wrapped_observable
64+
self._feedback = None
65+
66+
def subscribe(self, subscriber: Subscriber):
67+
super().subscribe(subscriber)
68+
self._feedback = Subject()
69+
async_iterator = observable_to_async_event_generator(self._wrapped_observable).__aiter__()
70+
from_aiter(async_iterator, self._feedback).subscribe(SubscriberAdapter(subscriber))
71+
72+
def request(self, n: int):
73+
for i in range(n):
74+
self._feedback.on_next(True)
75+
76+
def cancel(self):
77+
self._feedback.on_completed()
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
import asyncio
2+
import functools
3+
4+
import reactivex
5+
from reactivex import Observable, Observer
6+
from reactivex.disposable import Disposable
7+
8+
from reactivestreams.publisher import Publisher
9+
from reactivestreams.subscriber import Subscriber
10+
from reactivestreams.subscription import Subscription
11+
from rsocket.logger import logger
12+
13+
14+
class RxSubscriber(Subscriber):
15+
def __init__(self, observer, limit_rate: int):
16+
self.limit_rate = limit_rate
17+
self.observer = observer
18+
self._received_messages = 0
19+
self.done = asyncio.Event()
20+
self.get_next_n = asyncio.Event()
21+
self.subscription = None
22+
23+
def on_subscribe(self, subscription: Subscription):
24+
self.subscription = subscription
25+
26+
def on_next(self, value, is_complete=False):
27+
self._received_messages += 1
28+
self.observer.on_next(value)
29+
if is_complete:
30+
self.observer.on_completed()
31+
self._finish()
32+
33+
else:
34+
if self._received_messages == self.limit_rate:
35+
self._received_messages = 0
36+
self.get_next_n.set()
37+
38+
def _finish(self):
39+
self.done.set()
40+
41+
def on_error(self, exception: Exception):
42+
self.observer.on_error(exception)
43+
self._finish()
44+
45+
def on_complete(self):
46+
self.observer.on_completed()
47+
self._finish()
48+
49+
50+
async def _aio_sub(publisher: Publisher, subscriber: RxSubscriber, observer: Observer, loop):
51+
try:
52+
publisher.subscribe(subscriber)
53+
await subscriber.done.wait()
54+
55+
except asyncio.CancelledError:
56+
if not subscriber.done.is_set():
57+
subscriber.subscription.cancel()
58+
except Exception as exception:
59+
loop.call_soon(functools.partial(observer.on_error, exception))
60+
61+
62+
async def _trigger_next_request_n(subscriber, limit_rate):
63+
try:
64+
while True:
65+
await subscriber.get_next_n.wait()
66+
subscriber.subscription.request(limit_rate)
67+
subscriber.get_next_n.clear()
68+
except asyncio.CancelledError:
69+
logger().debug('Asyncio task canceled: trigger_next_request_n')
70+
71+
72+
def from_rsocket_publisher(publisher: Publisher, limit_rate=5) -> Observable:
73+
loop = asyncio.get_event_loop()
74+
75+
# noinspection PyUnusedLocal
76+
def on_subscribe(observer: Observer, scheduler):
77+
subscriber = RxSubscriber(observer, limit_rate)
78+
79+
get_next_task = asyncio.create_task(
80+
_trigger_next_request_n(subscriber, limit_rate)
81+
)
82+
task = asyncio.create_task(
83+
_aio_sub(publisher, subscriber, observer, loop)
84+
)
85+
86+
def dispose():
87+
get_next_task.cancel()
88+
task.cancel()
89+
90+
return Disposable(dispose)
91+
92+
return reactivex.create(on_subscribe)

rsocket/reactivex/reactivex_client.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
from asyncio import Future
2+
3+
from typing import Optional, cast
4+
5+
import reactivex
6+
from reactivex import Observable
7+
8+
from rsocket.frame import MAX_REQUEST_N
9+
from rsocket.payload import Payload
10+
from rsocket.rsocket import RSocket
11+
from rsocket.reactivex.back_pressure_publisher import BackPressurePublisher
12+
from rsocket.reactivex.from_rsocket_publisher import from_rsocket_publisher
13+
14+
15+
class ReactiveXClient:
16+
def __init__(self, rsocket: RSocket):
17+
self._rsocket = rsocket
18+
19+
def request_stream(self, request: Payload, request_limit: int = MAX_REQUEST_N) -> Observable:
20+
response_publisher = self._rsocket.request_stream(request).initial_request_n(request_limit)
21+
return from_rsocket_publisher(response_publisher, request_limit)
22+
23+
def request_response(self, request: Payload) -> Observable:
24+
return reactivex.from_future(cast(Future, self._rsocket.request_response(request)))
25+
26+
def request_channel(self,
27+
request: Payload,
28+
request_limit: int = MAX_REQUEST_N,
29+
observable: Optional[Observable] = None) -> Observable:
30+
if observable is not None:
31+
local_publisher = BackPressurePublisher(observable)
32+
else:
33+
local_publisher = None
34+
35+
response_publisher = self._rsocket.request_channel(
36+
request, local_publisher
37+
).initial_request_n(request_limit)
38+
return from_rsocket_publisher(response_publisher, request_limit)
39+
40+
def fire_and_forget(self, request: Payload) -> Observable:
41+
return reactivex.from_future(cast(Future, self._rsocket.fire_and_forget(request)))
42+
43+
def metadata_push(self, metadata: bytes) -> Observable:
44+
return reactivex.from_future(cast(Future, self._rsocket.metadata_push(metadata)))
45+
46+
async def connect(self):
47+
return await self._rsocket.connect()
48+
49+
async def close(self):
50+
await self._rsocket.close()
51+
52+
async def __aenter__(self):
53+
await self._rsocket.__aenter__()
54+
return self
55+
56+
async def __aexit__(self, exc_type, exc_val, exc_tb):
57+
await self._rsocket.__aexit__(exc_type, exc_val, exc_tb)
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from reactivex.abc import ObserverBase
2+
3+
from reactivestreams.subscriber import Subscriber
4+
5+
6+
class SubscriberAdapter(ObserverBase):
7+
def __init__(self, subscriber: Subscriber):
8+
self._subscriber = subscriber
9+
10+
def on_next(self, value):
11+
self._subscriber.on_next(value)
12+
13+
def on_error(self, error):
14+
self._subscriber.on_error(error)
15+
16+
def on_completed(self):
17+
self._subscriber.on_complete()

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
python_requires='>=3.8',
1919
extras_require={
2020
'rx': {'Rx >= 3.0.0'},
21+
'reactivex': {'reactivex >= 4.0.0'},
2122
'aiohttp': {'aiohttp >= 3.0.0'},
2223
'quart': {'quart >= 0.15.0'},
2324
'quic': {'aioquic >= 0.9.0'}

tests/rx_support/__init__.py

Whitespace-only changes.

tests/test_reactivex/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)