This repository has been archived by the owner on Aug 9, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.py
executable file
·106 lines (89 loc) · 3.12 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# -*- coding: utf-8 -*-
"""
Server to which clients connect to get updates on data embeddings.
It organizes computations and exposes a websocket API.
"""
import json
import asyncio
from concurrent.futures import ThreadPoolExecutor
from numpy import random
import websockets
from embedding import tsne, mappers
from data import s_buffer, stream
# Shared thread pool (at most 10 workers) that the coroutines below hand their
# blocking tsne/mappers calls to through loop.run_in_executor.
executor = ThreadPoolExecutor(max_workers=10)
# updates for mapping learners are triggered by a new tSNE
# (set by compute_tsne; cleared and awaited by compute_mappers)
needs_remapping = asyncio.Event()
async def compute_tsne():
    """Periodically re-embed the buffer and push the new 2-D positions.

    Each pass snapshots the buffer, runs ``tsne`` on the shared executor,
    stores the result back with ``s_buffer.update2d``, broadcasts the
    returned updates to all clients and finally sets ``needs_remapping``
    so the mapper learner knows a new embedding is available.

    Runs forever; it never returns.
    """
    while True:
        print("[compute_tsne] start")

        features, first_index = s_buffer.X()
        # tsne is a blocking call: run it on the executor, off the event loop.
        embedding = await loop.run_in_executor(executor, tsne, features)

        changed = s_buffer.update2d(embedding, first_index)
        payload = {
            "status": "update",
            "samples": changed,
        }
        broadcast(json.dumps(payload))

        # Tell compute_mappers that a fresh embedding exists.
        needs_remapping.set()
        print("[compute_tsne] done")

        await asyncio.sleep(10)  # debug
async def compute_mappers():
    """Re-learn the mappers whenever a new embedding is available.

    Each pass fits ``mappers`` on the buffer's current data and 2-D
    positions (on the shared executor), publishes the fitted pair,
    re-applies it to the buffer with ``s_buffer.update_xys``, broadcasts
    the returned updates, and then sleeps until ``needs_remapping`` is set
    again.

    The fitted pair is published both on ``s_buffer`` (``mapper_x`` /
    ``mapper_y`` attributes) and on the module-level ``mapper_x`` /
    ``mapper_y`` names, so that every reader of those names uses the latest
    fit rather than the one computed at start-up.

    Runs forever; it never returns.
    """
    global mapper_x, mapper_y

    while True:
        print("[compute_mappers] start")

        # Clear *before* reading the inputs: if a new embedding lands while
        # the fit below is running, the event stays set and the wait() at the
        # bottom returns immediately. Clearing after the fit would silently
        # drop that signal.
        needs_remapping.clear()

        fitted_x, fitted_y = await loop.run_in_executor(
            executor,
            mappers,
            s_buffer.X()[0],
            s_buffer.X_2d(),
        )

        # Publish the new mappers everywhere they are read from.
        s_buffer.mapper_x, s_buffer.mapper_y = fitted_x, fitted_y
        mapper_x, mapper_y = fitted_x, fitted_y
        print("[compute_mappers] done")

        # Use the mappers that were just fitted, not the start-up ones.
        updates = s_buffer.update_xys(fitted_x, fitted_y)
        broadcast(json.dumps({
            "status": "update",
            "samples": updates,
        }))

        await needs_remapping.wait()
async def in_stream():
    """Simulate a stream of incoming data and buffer it.

    Iterates over ``stream``, pausing for an exponentially distributed
    random delay before each sample, adds the sample to ``s_buffer`` using
    the current module-level mappers, and broadcasts whatever
    ``s_buffer.extend`` returns as a ``"new"`` message.
    """
    for sample in stream:  # we fake it...
        delay = random.exponential()
        await asyncio.sleep(delay)

        added = s_buffer.extend(sample, mapper_x, mapper_y)
        payload = {
            "status": "new",
            "samples": added,
        }
        broadcast(json.dumps(payload))

        # NOTE(review): presumably expires samples older than 10 minutes, and
        # is assumed to run once per incoming sample -- confirm both.
        s_buffer.remove_old(60 * 10)
def broadcast(message):
    """Send a message to all connected clients.

    The sends are scheduled on the event loop and not awaited, so this
    returns immediately. It must be called while the loop is running.
    Does nothing when no client is connected.

    Args:
        message: payload handed unchanged to each client's ``send``.
    """
    if not connected:
        return

    # Build the list eagerly so later changes to ``connected`` cannot affect
    # this broadcast.
    sends = [ws.send(message) for ws in connected]

    # gather() wraps each coroutine in a task itself; passing bare coroutines
    # to asyncio.wait() is rejected on Python 3.11+. return_exceptions=True
    # keeps one failed send (e.g. a client that just disconnected) from
    # surfacing as a "exception was never retrieved" error or affecting the
    # other sends.
    asyncio.ensure_future(asyncio.gather(*sends, return_exceptions=True))
async def handler(websocket, path):
    """Handle a websocket connection.

    Registers the client in ``connected`` so it receives broadcasts, sends
    it the buffer's current content as a ``'load'`` message, then stays
    parked until the connection is closed. The client is always
    unregistered on the way out.

    Args:
        websocket: the connection object supplied by the websockets server.
        path: request path supplied by the websockets server (unused).
    """
    connected.add(websocket)
    try:
        # send the buffer's content to new clients
        snapshot = {
            'status': 'load',
            'samples': s_buffer.to_dict(),
        }
        await websocket.send(json.dumps(snapshot))

        # Wait for the peer to go away instead of sleeping forever: a sleep
        # loop never notices a closed connection, so the dead socket would
        # stay registered and keep receiving broadcasts.
        await websocket.wait_closed()
    finally:
        # discard() so that cleanup cannot raise if the entry is already gone.
        connected.discard(websocket)
if __name__ == '__main__':
    # --- initial state: embed the buffer once and fit the first mappers,
    # synchronously, before any client can connect.
    X, index_first = s_buffer.X()
    X_2d = tsne(X)
    s_buffer.update2d(X_2d, index_first)
    mapper_x, mapper_y = mappers(X, X_2d)
    print("[init] done")

    # --- shared runtime state used by handler() and broadcast().
    connected = set()

    # --- websocket endpoint, listening on every interface, port 8765.
    start_server = websockets.serve(handler, '0.0.0.0', 8765)

    # --- schedule the background coroutines (same order as before) and run
    # them together with the server start-up.
    loop = asyncio.get_event_loop()
    tasks = [
        asyncio.ensure_future(job())
        for job in (in_stream, compute_mappers, compute_tsne)
    ]
    tasks.append(start_server)

    everything = asyncio.gather(*tasks)
    loop.run_until_complete(everything)
    loop.close()