|
| 1 | +import json |
| 2 | + |
| 3 | +from cloudevents.conversion import to_json, from_json |
| 4 | +from cloudevents.pydantic import CloudEvent |
| 5 | + |
| 6 | +from rsocket.cloudevents.serialize import cloud_event_deserialize, cloud_event_serialize |
| 7 | +from rsocket.extensions.helpers import composite, route |
| 8 | +from rsocket.extensions.mimetypes import WellKnownMimeTypes |
| 9 | +from rsocket.payload import Payload |
| 10 | +from rsocket.routing.request_router import RequestRouter |
| 11 | +from rsocket.routing.routing_request_handler import RoutingRequestHandler |
| 12 | + |
| 13 | + |
| 14 | +async def test_routed_cloudevents(lazy_pipe): |
| 15 | + router = RequestRouter(cloud_event_deserialize, |
| 16 | + cloud_event_serialize) |
| 17 | + |
| 18 | + def handler_factory(): |
| 19 | + return RoutingRequestHandler(router) |
| 20 | + |
| 21 | + @router.response('cloud_event') |
| 22 | + async def response_request(value: CloudEvent) -> CloudEvent: |
| 23 | + return CloudEvent.create(attributes={ |
| 24 | + 'type': 'io.spring.event.Foo', |
| 25 | + 'source': 'https://spring.io/foos' |
| 26 | + }, data=json.dumps(json.loads(value.data))) |
| 27 | + |
| 28 | + async with lazy_pipe( |
| 29 | + client_arguments={'metadata_encoding': WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA}, |
| 30 | + server_arguments={'handler_factory': handler_factory}) as (server, client): |
| 31 | + event = CloudEvent.create(attributes={ |
| 32 | + 'type': 'io.spring.event.Foo', |
| 33 | + 'source': 'https://spring.io/foos' |
| 34 | + }, data=json.dumps({'value': 'Dave'})) |
| 35 | + |
| 36 | + response = await client.request_response(Payload(data=to_json(event), metadata=composite(route('cloud_event')))) |
| 37 | + |
| 38 | + response_event = from_json(CloudEvent, response.data) |
| 39 | + response_data = json.loads(response_event.data) |
| 40 | + |
| 41 | + assert response_data['value'] == 'Dave' |
0 commit comments