Skip to content

Commit 8932498

Browse files
authored
Merge pull request #57 from rsocket/correct_fragmentation_implementation
Correct fragmentation implementation
2 parents 08f9e82 + 05da218 commit 8932498

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+1484
-425
lines changed

CHANGELOG.rst

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,21 @@
11
Changelog
22
---------
33

4-
v0.3.1
4+
v0.4.0
55
======
66

7-
- Added ability to await fire_and_forget and push_metadata. Waits until the client finishes sending the frame.
7+
- Breaking change: Added ability to await fire_and_forget and push_metadata:
8+
- Both now return a future which resolves when the frame (or all fragments) finished sending.
9+
- Fixed fragmentation implementation (misunderstood spec):
10+
- fragments after first one are now correctly of type PayloadFrame
11+
- fragment size now includes frame header and length.
12+
- Added checking fragment size limit (minimum 64) as in java implementation
13+
- Updated examples
14+
15+
v0.3.0
16+
======
17+
Initial mostly complete implementation after long time from previous release (0.2.0)
18+
19+
v0.2.0
20+
======
21+
Legacy. Unknown history.

README.md

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@ pip install rsocket
1111
```
1212

1313
or install any of the extras:
14-
* rx
15-
* aiohttp
16-
* quart
17-
* uic
14+
15+
* rx
16+
* aiohttp
17+
* quart
18+
* uic
1819

1920
Example:
21+
2022
```shell
2123
pip install --pre rsocket[rx]
2224
```
@@ -47,17 +49,18 @@ The **examples/test_examples.py** shows which pairs of client/server work with e
4749
all the examples
4850
(except for the client_springboot.py which is set up to work against https://github.com/benwilcock/spring-rsocket-demo)
4951

50-
| server (python) | server (java) | client (python) | client(java) |
51-
|-----------------------------|---------------|------------------------------------|-----------------|
52-
| server.py | | client.py | |
53-
| server_quic.py | | client_quic.py | |
54-
| server_with_lease.py | | | ClientWithLease |
55-
| server_with_routing.py | | client_with_routing.py | Client |
56-
| server_with_routing.py | | client_rx.py | |
57-
| server_with_routing.py | | client_reconnect.py | |
58-
| | Server | run_against_example_java_server.py | |
59-
| server_quart_websocket.py | | client_websocket.py | |
60-
| server_aiohttp_websocket.py | | client_websocket.py | |
52+
| server (python) | server (java) | client (python) | client(java) |
53+
|-----------------------------|-------------------------|------------------------------------|-----------------|
54+
| server.py | | client.py | |
55+
| server_quic.py | | client_quic.py | |
56+
| server_with_lease.py | | | ClientWithLease |
57+
| server_with_routing.py | | client_with_routing.py | Client |
58+
| server_with_routing.py | | client_rx.py | |
59+
| server_with_routing.py | | client_reconnect.py | |
60+
| | Server | run_against_example_java_server.py | |
61+
| | ServerWithFragmentation | client_with_routing.py | |
62+
| server_quart_websocket.py | | client_websocket.py | |
63+
| server_aiohttp_websocket.py | | client_websocket.py | |
6164

6265
# Build Status
6366

examples/client.py

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
import logging
33
import sys
44

5+
from examples.shared_tests import simple_client_server_test
56
from reactivestreams.subscriber import DefaultSubscriber
67
from rsocket.helpers import single_transport_provider
7-
from rsocket.payload import Payload
88
from rsocket.rsocket_client import RSocketClient
99
from rsocket.transports.tcp import TransportTCP
1010

@@ -22,22 +22,7 @@ async def main(server_port):
2222
connection = await asyncio.open_connection('localhost', server_port)
2323

2424
async with RSocketClient(single_transport_provider(TransportTCP(*connection))) as client:
25-
payload = Payload(b'%Y-%m-%d %H:%M:%S')
26-
27-
async def run_request_response():
28-
try:
29-
while True:
30-
result = await client.request_response(payload)
31-
logging.info('Response: {}'.format(result.data))
32-
await asyncio.sleep(1)
33-
except asyncio.CancelledError:
34-
pass
35-
36-
task = asyncio.create_task(run_request_response())
37-
38-
await asyncio.sleep(5)
39-
task.cancel()
40-
await task
25+
await simple_client_server_test(client)
4126

4227

4328
if __name__ == '__main__':

examples/client_quic.py

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55

66
from aioquic.quic.configuration import QuicConfiguration
77

8+
from examples.shared_tests import simple_client_server_test
89
from rsocket.helpers import single_transport_provider
9-
from rsocket.payload import Payload
1010
from rsocket.rsocket_client import RSocketClient
1111
from rsocket.transports.aioquic_transport import rsocket_connect
1212

@@ -23,22 +23,7 @@ async def main(server_port):
2323
async with rsocket_connect('localhost', server_port,
2424
configuration=client_configuration) as transport:
2525
async with RSocketClient(single_transport_provider(transport)) as client:
26-
payload = Payload(b'%Y-%m-%d %H:%M:%S')
27-
28-
async def run_request_response():
29-
try:
30-
while True:
31-
result = await client.request_response(payload)
32-
logging.info('Response: {}'.format(result.data))
33-
await asyncio.sleep(1)
34-
except asyncio.CancelledError:
35-
pass
36-
37-
task = asyncio.create_task(run_request_response())
38-
39-
await asyncio.sleep(5)
40-
task.cancel()
41-
await task
26+
await simple_client_server_test(client)
4227

4328

4429
if __name__ == '__main__':

examples/client_rx.py

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@
55
from typing import AsyncGenerator, Tuple
66

77
from rx import operators
8+
from rx.core.operators.map import _map
89

10+
from examples.shared_tests import assert_result_data
911
from reactivestreams.subscriber import Subscriber
1012
from reactivestreams.subscription import Subscription
1113
from rsocket.extensions.helpers import route, composite, authenticate_simple, metadata_item
1214
from rsocket.extensions.mimetypes import WellKnownMimeTypes
13-
from rsocket.fragment import Fragment
1415
from rsocket.helpers import single_transport_provider
1516
from rsocket.payload import Payload
1617
from rsocket.rsocket_client import RSocketClient
@@ -21,13 +22,13 @@
2122

2223
def sample_publisher(wait_for_requester_complete: Event,
2324
response_count: int = 3):
24-
async def generator() -> AsyncGenerator[Tuple[Fragment, bool], None]:
25+
async def generator() -> AsyncGenerator[Tuple[Payload, bool], None]:
2526
current_response = 0
2627
for i in range(response_count):
2728
is_complete = (current_response + 1) == response_count
2829

2930
message = 'Item to server from client on channel: %s' % current_response
30-
yield Fragment(message.encode('utf-8')), is_complete
31+
yield Payload(message.encode('utf-8')), is_complete
3132

3233
if is_complete:
3334
wait_for_requester_complete.set()
@@ -96,7 +97,9 @@ async def request_response(client: RxRSocket):
9697
authenticate_simple('user', '12345')
9798
))
9899

99-
await client.request_response(payload).pipe()
100+
result = await client.request_response(payload).pipe()
101+
102+
assert_result_data(result, b'single_response')
100103

101104

102105
async def request_last_metadata(client: RxRSocket):
@@ -107,7 +110,7 @@ async def request_last_metadata(client: RxRSocket):
107110

108111
result = await client.request_response(payload).pipe()
109112

110-
assert result.data == b'audit info'
113+
assert_result_data(result, b'audit info')
111114

112115

113116
async def request_last_fnf(client: RxRSocket):
@@ -118,11 +121,10 @@ async def request_last_fnf(client: RxRSocket):
118121

119122
result = await client.request_response(payload).pipe()
120123

121-
assert result.data == b'aux data'
124+
assert_result_data(result, b'aux data')
122125

123126

124127
async def metadata_push(client: RxRSocket, metadata: bytes):
125-
126128
await client.metadata_push(composite(
127129
route('metadata_push'),
128130
authenticate_simple('user', '12345'),
@@ -173,8 +175,12 @@ async def request_stream(client: RxRSocket):
173175
route('stream'),
174176
authenticate_simple('user', '12345')
175177
))
176-
result = await client.request_stream(payload).pipe(operators.to_list())
177-
print(result)
178+
result = await client.request_stream(payload).pipe(_map(lambda p: p.data), operators.to_list())
179+
180+
if result != [b'Item on channel: 0',
181+
b'Item on channel: 1',
182+
b'Item on channel: 2']:
183+
raise Exception(result)
178184

179185

180186
async def request_slow_stream(client: RxRSocket):
@@ -186,15 +192,6 @@ async def request_slow_stream(client: RxRSocket):
186192
print(result)
187193

188194

189-
async def request_fragmented_stream(client: RxRSocket):
190-
payload = Payload(b'The quick brown fox', composite(
191-
route('fragmented_stream'),
192-
authenticate_simple('user', '12345')
193-
))
194-
result = await client.request_stream(payload).pipe(operators.to_list())
195-
print(result)
196-
197-
198195
async def main(server_port):
199196
logging.info('Connecting to server at localhost:%s', server_port)
200197

@@ -208,7 +205,6 @@ async def main(server_port):
208205
await request_slow_stream(rx_client)
209206
await request_channel(rx_client)
210207
await request_stream_invalid_login(rx_client)
211-
await request_fragmented_stream(rx_client)
212208

213209
await metadata_push(rx_client, b'audit info')
214210
await request_last_metadata(rx_client)

0 commit comments

Comments
 (0)