-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathparticipant.py
87 lines (70 loc) · 3.06 KB
/
participant.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from multiprocessing import Process, Queue
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR, IPPROTO_TCP, TCP_NODELAY
import logging
import sys
import time
from roast import pre_round, sign_round
from transport import send_obj, recv_obj
# Capacity of the multiprocessing queue that buffers precomputed nonces;
# passed as the size limit to Queue() when the script starts.
MAX_NONCE_QUEUE = 32
class Participant:
    """Per-run signer state: key material plus the currently held nonce pair.

    Every item taken from ``nonce_queue`` is unpacked as ``(spre_i, pre_i)``.
    ``spre_i`` is handed to the module-level ``sign_round`` function and
    ``pre_i`` is what gets reported back to the caller.
    NOTE(review): presumably these are the secret and public parts of a
    precomputed nonce -- confirm against ``roast.pre_round``.
    """

    def __init__(self, X, i, sk_i, nonce_queue):
        """Record the run parameters and fetch the first nonce pair.

        Args:
            X: Forwarded unchanged as the first argument of ``sign_round``.
            i: Identifier of this participant.
            sk_i: This participant's key share, forwarded to ``sign_round``.
            nonce_queue: Object whose ``get()`` returns ``(spre_i, pre_i)``
                pairs; the call blocks until one is available.
        """
        self.X, self.i, self.sk_i = X, i, sk_i
        self.nonce_queue = nonce_queue
        self._load_next_nonce()

    def _load_next_nonce(self):
        """Take the next ``(spre_i, pre_i)`` pair off the queue and store it."""
        fresh_spre, fresh_pre = self.nonce_queue.get()
        self.spre_i = fresh_spre
        self.pre_i = fresh_pre

    def sign_round(self, msg, T, pre):
        """Produce a signature share, then rotate to a fresh nonce pair.

        Args:
            msg, T, pre: Forwarded unchanged to the module-level
                ``sign_round`` function.

        Returns:
            ``(s_i, pre_i)``: the value returned by ``sign_round`` for the
            nonce held on entry, and the ``pre_i`` of the newly fetched pair.
        """
        # Inside a method the bare name resolves to the module-level function
        # imported from roast, not to this method.
        share = sign_round(self.X, msg, T, pre, self.i, self.sk_i, self.spre_i)
        # The nonce used above is replaced before returning.
        self._load_next_nonce()
        return share, self.pre_i
def handle_requests(connection, nonce_queue):
    """Serve requests on one connection until ``recv_obj`` reports its end.

    Every received object is unpacked as ``(run_id, data)`` and dispatched by
    comparing ``run_id`` with the id of the run currently being served:

    * lower  -- the message is logged and ignored;
    * higher -- ``data`` is ``(X, i, sk_i)``: a new ``Participant`` is created
      and ``(run_id, (i, None, pre_i))`` is sent back;
    * equal  -- ``data`` is ``(msg, T, pre, is_malicious)``: unless
      ``is_malicious`` is truthy, a signature share is computed and
      ``(run_id, (i, s_i, pre_i))`` is sent back. When ``is_malicious`` is
      truthy nothing is sent (presumably to simulate an unresponsive signer).

    Args:
        connection: Connected object passed through to ``recv_obj`` and
            ``send_obj``.
        nonce_queue: Queue of precomputed nonce pairs handed to each new
            ``Participant``.

    Returns:
        None, once ``recv_obj`` returns ``None``.
    """
    curr_run_id = -1
    # Bound up front: a message that arrives before any initialization data
    # (run_id <= -1) must be logged and skipped, not raise UnboundLocalError.
    i = None
    participant = None

    while True:
        obj = recv_obj(connection)
        if obj is None:
            logging.debug('Connection closed')
            break

        run_id, data = obj

        if run_id < curr_run_id:
            logging.debug(f'Participant {i}: Ignoring incoming message from outdated run (run_id = {run_id}, curr_run_id = {curr_run_id})')
        elif run_id > curr_run_id:
            X, i, sk_i = data
            logging.debug(f'Participant {i}: Received initialization data for new run (run_id = {run_id}, curr_run_id = {curr_run_id})')
            curr_run_id = run_id
            participant = Participant(X, i, sk_i, nonce_queue)
            send_obj(connection, (run_id, (i, None, participant.pre_i)))
            logging.debug(f'Participant {i}: Sent initial pre_i value')
        elif participant is None:
            # run_id equals the initial sentinel and no run has been set up yet.
            logging.warning(f'Participant: Ignoring request received before any initialization data (run_id = {run_id})')
        else:
            msg, T, pre, is_malicious = data
            logging.info(f'Participant {i}: Received sign_round request, run_id = {run_id}, is_malicious = {is_malicious}')
            if not is_malicious:
                # perf_counter is the clock intended for measuring intervals;
                # unlike time.time it is unaffected by system clock changes.
                start = time.perf_counter()
                s_i, pre_i = participant.sign_round(msg, T, pre)
                elapsed = time.perf_counter() - start
                send_obj(connection, (run_id, (i, s_i, pre_i)))
                logging.info(f'Participant {i}: Sent sign_round response and next pre_i value in {elapsed:.4f} seconds')
def compute_nonce_loop(nonce_queue):
    """Feed ``nonce_queue`` with results of ``pre_round()`` forever.

    The function never returns. With a bounded queue, ``put()`` blocks while
    the queue is full, so new values are only produced as fast as they are
    consumed.

    Args:
        nonce_queue: Queue that receives each value returned by ``pre_round()``.
    """
    while True:
        fresh = pre_round()
        nonce_queue.put(fresh)
# Script entry point: start the background nonce producer, then accept
# connections on the given TCP port and serve them one at a time.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    if len(sys.argv) != 2:
        print(f'usage: {sys.argv[0]} <port>')
        sys.exit(1)

    # Nonces are produced in a separate daemon process so that they are
    # already queued when a signing request arrives.
    nonce_queue = Queue(MAX_NONCE_QUEUE)
    Process(target=compute_nonce_loop, args=[nonce_queue], daemon=True).start()

    port = int(sys.argv[1])
    addr = ('0.0.0.0', port)

    # The context manager closes the listening socket on any exit path.
    with socket(AF_INET, SOCK_STREAM) as sock:
        sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
        # NOTE(review): TCP_NODELAY is set on the listening socket; whether
        # accepted sockets inherit it is platform-dependent -- confirm.
        sock.setsockopt(IPPROTO_TCP, TCP_NODELAY, True)
        sock.bind(addr)
        sock.listen()

        while True:
            logging.debug(f'Listening for incoming connections on {addr}')
            connection, src = sock.accept()
            # Fixed: the original lacked the f prefix and logged a literal '{src}'.
            logging.debug(f'Accepted connection from {src}')
            try:
                # Close each accepted connection when its session ends, whether
                # normally or by exception, so sockets do not accumulate.
                with connection:
                    handle_requests(connection, nonce_queue)
            except ConnectionResetError:
                logging.info('Participant: Connection reset by coordinator (which most likely has terminated).')