How to unit test socketio AsyncServer deployed with Sanic using unittest.TestCase? #848
-
I would like to unit test my simple service, which is implemented using socketio.AsyncServer and Sanic.

class EventService:
def __init__(self, port=8000, listener_timeout=5):
    """Build the Socket.IO server and Sanic app and wire them together.

    Args:
        port: Port number that ``run()`` hands to the Sanic app.
        listener_timeout: Value passed to ``sio.sleep`` between polls of
            the event listener.
    """
    self.port = port
    self.event_listener = EventListener()
    self.listener_timeout = listener_timeout

    # Socket.IO server options, kept together so they are easy to scan.
    server_options = {
        'async_mode': 'sanic',
        'logger': True,
        'engineio_logger': True,
    }
    self.sio = socketio.AsyncServer(**server_options)
    self.app = Sanic('event_service')
    self.sio.attach(self.app)

    # Register the Socket.IO handlers first, then the app lifecycle hooks.
    self.__setup_sio_event_handlers()
    self.__register_app_listeners()
def run(self):
    """Run the Sanic app on the port given at construction time."""
    listen_port = self.port
    self.app.run(port=listen_port)
async def __gather_and_send_events(self):
    """Poll the event listener forever and forward what it returns.

    Each iteration sleeps for ``listener_timeout`` via ``sio.sleep`` and
    then emits every event from ``event_listener.get_events()`` as an
    ``'event'`` message, once per topic, using the topic as the room.
    """
    while True:
        await self.sio.sleep(self.listener_timeout)
        pending = self.event_listener.get_events()
        for item in pending:
            for room_name in item.topics:
                await self.sio.emit('event', item.data, room=room_name)
def __setup_sio_event_handlers(self):
    """Register the ``join`` and ``leave`` Socket.IO event handlers."""
    server = self.sio

    @server.event
    async def join(sid, msg):
        # Put the caller into the requested room, then confirm to that caller only.
        server.enter_room(sid, msg['room'])
        confirmation = 'Entered room: ' + msg['room']
        await server.emit('message', confirmation, room=sid)

    @server.event
    async def leave(sid, msg):
        # Remove the caller from the room, then confirm to that caller only.
        server.leave_room(sid, msg['room'])
        confirmation = 'Left room: ' + msg['room']
        await server.emit('message', confirmation, room=sid)
def __register_app_listeners(self):
    """Hook the event-forwarding loop into the app's ``after_server_start`` stage."""
    on_started = self.app.listener('after_server_start')

    @on_started
    def __start_event_listener_task(app, loop):
        # Hand the polling coroutine function to Socket.IO as a background task.
        poller = self.__gather_and_send_events
        self.sio.start_background_task(poller)
if __name__ == '__main__':
service = EventService()
service.run()

I'm struggling to understand how to launch the app instance so that I can connect to it from several test cases. I'd like to have the following tests working:

class TestEventService(TestCase):
def setUp(self):
    """Create a fresh service and a fresh mock client for each test."""
    # Evaluated left to right: the service is built before the client.
    self.service, self.client = EventService(), MockSIOClient()
def test_service_can_run(self):
    """The service's app should report that it is running."""
    running = self.service.app.is_running
    self.assertTrue(running)
@async_test
async def test_service_is_reachable_by_clients(self):
    """A client should be able to connect to the service on localhost:8000."""
    await self.client.sio.connect('http://localhost:8000')
    self.assertTrue(self.client.is_connected)
    # connect() is awaited above, so the client is asynchronous; its
    # disconnect() is a coroutine as well and must be awaited to actually run.
    await self.client.sio.disconnect()
@async_test
async def test_client_can_join_room(self):
    """Emitting ``join`` should put the client into the requested room."""
    # The test case has no ``sio`` attribute of its own; the Socket.IO
    # client lives on the mock client created in setUp().
    # NOTE(review): this assumes the client is already connected when the
    # test runs -- TODO confirm, setUp() does not call connect().
    await self.client.sio.emit('join', {'room': 'some_room'})
    self.assertTrue(self.client.is_in_room('some_room'))
@async_test
async def test_client_can_leave_room(self):
    """Emitting ``leave`` should take the client out of the room."""
    # The test case has no ``sio`` attribute of its own; the Socket.IO
    # client lives on the mock client created in setUp().
    # NOTE(review): this assumes the client is already connected when the
    # test runs -- TODO confirm, setUp() does not call connect().
    await self.client.sio.emit('leave', {'room': 'some_room'})
    self.assertFalse(self.client.is_in_room('some_room'))
@async_test
async def test_client_receives_events(self):
    """An event written for a room should end up delivered to the client."""
    w = EventWriter()
    # ``topics['some_room']`` subscripted an undefined name; the topic list
    # is passed as a keyword argument instead.
    # NOTE(review): presumably Event accepts a ``topics`` keyword -- verify
    # against the Event constructor.
    w.write_event(Event(topics=['some_room'], number=101, hash_str='xoxo'))
    # EventService picks up the event and sends it to the client
    self.assertEqual(len(self.client.events), 1)

I would really appreciate your advice on how to make this work. Thanks in advance!
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 3 replies
-
I have no experience unit testing with Sanic, so I cannot help you there. For the Socket.IO server you have two options: you can start a real server and then connect to it with a real client, or you can call your Socket.IO handlers directly and make sure they do what they need to do (mocking any emits or other things that need to be suppressed during tests). Related: #465
Beta Was this translation helpful? Give feedback.
I have no experience unit testing with Sanic, so I cannot help you there. For the Socket.IO server you have two options: you can start a real server and then connect to it with a real client, or you can call your Socket.IO handlers directly and make sure they do what they need to do (mocking any emits or other things that need to be suppressed during tests).
Related: #465