Skip to content

Commit c04828a

Browse files
authored
Merge pull request #119 from rsocket/frame_send_optimization
remove data/metadata copy in sending frames
2 parents 3ec1da0 + 57a9fd1 commit c04828a

12 files changed

+237
-69
lines changed

performance/conftest.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import logging
22

33

4+
def pytest_configure(config):
5+
config.addinivalue_line("markers", "performance: marks performance tests")
6+
7+
48
def setup_logging(level=logging.DEBUG, use_file: bool = False):
59
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
610

@@ -19,4 +23,4 @@ def setup_logging(level=logging.DEBUG, use_file: bool = False):
1923
logging.basicConfig(level=level, handlers=handlers)
2024

2125

22-
setup_logging(logging.ERROR)
26+
setup_logging(logging.DEBUG)

performance/performance_client.py

+21-11
Original file line numberDiff line numberDiff line change
@@ -13,25 +13,28 @@
1313
from rsocket.rsocket_client import RSocketClient
1414
from rsocket.streams.stream_from_async_generator import StreamFromAsyncGenerator
1515
from rsocket.transports.tcp import TransportTCP
16-
from tests.rsocket.helpers import to_json_bytes
16+
from tests.rsocket.helpers import to_json_bytes, create_large_random_data
17+
18+
19+
data_size = 1920 # * 1080 * 3
20+
large_data = create_large_random_data(data_size)
1721

1822

1923
def sample_publisher(wait_for_requester_complete: Event,
20-
response_count: int = 3) -> Publisher:
24+
response_count: int = 3,
25+
data_generator=lambda index: ('Item to server from client on channel: %s' % index).encode('utf-8')
26+
) -> Publisher:
2127
async def generator() -> AsyncGenerator[Tuple[Fragment, bool], None]:
22-
current_response = 0
2328
for i in range(response_count):
24-
is_complete = (current_response + 1) == response_count
29+
is_complete = (i + 1) == response_count
2530

26-
message = 'Item to server from client on channel: %s' % current_response
27-
yield Fragment(message.encode('utf-8')), is_complete
31+
message = data_generator(i)
32+
yield Payload(message), is_complete
2833

2934
if is_complete:
3035
wait_for_requester_complete.set()
3136
break
3237

33-
current_response += 1
34-
3538
return StreamFromAsyncGenerator(generator)
3639

3740

@@ -47,13 +50,20 @@ async def request_response(self):
4750

4851
return await self._client.request_response(payload)
4952

53+
async def large_request(self):
54+
payload = Payload(large_data, composite(
55+
route('large'),
56+
authenticate_simple('user', '12345')
57+
))
58+
59+
return await self._client.request_response(payload)
60+
5061
async def request_channel(self):
51-
requester_completion_event = Event()
5262
payload = Payload(b'The quick brown fox', composite(
5363
route('channel'),
5464
authenticate_simple('user', '12345')
5565
))
56-
publisher = sample_publisher(requester_completion_event)
66+
publisher = sample_publisher(Event())
5767

5868
return await self._client.request_channel(payload, publisher, limit_rate=5)
5969

@@ -97,7 +107,7 @@ async def __aenter__(self):
97107
connection = await asyncio.open_connection('localhost', self._server_port)
98108

99109
self._client = AwaitableRSocket(RSocketClient(
100-
single_transport_provider(TransportTCP(*connection)),
110+
single_transport_provider(TransportTCP(*connection, read_buffer_size=data_size + 3000)),
101111
metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA)
102112
)
103113

performance/performance_server.py

+12-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
from rsocket.rsocket_server import RSocketServer
1616
from rsocket.transports.tcp import TransportTCP
1717
from performance.sample_responses import response_stream_2, response_stream_1, LoggingSubscriber
18+
from tests.rsocket.helpers import create_large_random_data
19+
20+
data_size = 1920 # * 1080 * 3
21+
large_data = create_large_random_data(data_size)
1822

1923
router = RequestRouter()
2024

@@ -34,6 +38,12 @@ async def single_request_response(payload, composite_metadata):
3438
return create_future(Payload(b'single_response'))
3539

3640

41+
@router.response('large')
42+
async def single_request_response(payload, composite_metadata):
43+
logging.info('Got single request')
44+
return create_future(Payload(large_data))
45+
46+
3747
@router.response('last_fnf')
3848
async def get_last_fnf():
3949
logging.info('Got single request')
@@ -102,9 +112,9 @@ def handler_factory():
102112

103113
def client_handler_factory(on_ready=None):
104114
def handle_client(reader, writer):
105-
RSocketServer(TransportTCP(reader, writer),
115+
RSocketServer(TransportTCP(reader, writer, read_buffer_size=data_size + 3000),
106116
handler_factory=handler_factory,
107-
fragment_size_bytes=64_000,
117+
# fragment_size_bytes=64_000,
108118
on_ready=on_ready
109119
)
110120

performance/test_performance.py

+8
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from performance.performance_client import PerformanceClient
99
from performance.performance_server import run_server
1010
from rsocket.rsocket_server import RSocketServer
11+
from tests.tools.helpers import measure_time
1112

1213

1314
@pytest.mark.timeout(5)
@@ -31,6 +32,13 @@ async def test_request_stream(unused_tcp_port):
3132
assert result is not None
3233

3334

35+
@pytest.mark.performance
36+
async def test_large_request():
37+
async with run_with_client(6565) as client:
38+
result = await measure_time(client.large_request())
39+
print(result.delta)
40+
41+
3442
@asynccontextmanager
3543
async def run_against_server(unused_tcp_port: int) -> PerformanceClient:
3644
server_ready = asyncio.Event()

0 commit comments

Comments
 (0)