-
Notifications
You must be signed in to change notification settings - Fork 110
/
Copy pathproto_unix_io.py
187 lines (158 loc) · 7.71 KB
/
proto_unix_io.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
from threading import Thread
import queue
import os
from software.networking.unix.threaded_unix_listener import ThreadedUnixListener
from software.networking.unix.threaded_unix_sender import ThreadedUnixSender
from software.thunderscope.thread_safe_buffer import ThreadSafeBuffer
from typing import Type
from google.protobuf.message import Message
class ProtoUnixIO:
"""The ProtoUnixIO is responsible for communicating protobufs over unix sockets.
Register Protobuf
to a buffer to
receive data
│
│
▼
ProtoUnixReceiver ┌───────────────────┐ ┌──┬──┬──┬──┬──┐
──────────> ├──────────>│ │ │ │ │ │
│ │ └──┴──┴──┴──┴──┘
│ ProtoUnixIO │ ThreadSafeBuffer
ProtoUnixSender │ │
<─────────┤ <──────────── send_proto
└───────────────────┘
Observing Protobuf:
- Classes can register as an observer by providing a protobuf type to
observe and a ThreadSafeBuffer to place incoming data. We pass in a
ThreadSafeBuffer rather than a callback to not bog down this class.
We don't want slow callbacks from a single observer to hold up other
observers from receiving new data.
Sending Protobuf:
- Classes can also call send_proto to send data to any registered observers
Unix IO:
- attach_unix_receiver() configures a unix receiver to receive protobufs
over a unix socket and send data to all observers of that proto type.
- attach_unix_sender() configures a unix sender (it is an observer as well)
and relays data from send_proto over the socket.
TL;DR This class manages inter-thread communication through register_observer
and send_proto calls. If unix senders/receivers are attached to a proto type,
then the data is also sent/received over the sockets.
"""
def __init__(self) -> None:
# Mapping from ProtoType.DESCRIPTOR.full_name -> buffer
self.proto_observers = {}
self.all_proto_observers = []
self.unix_senders = {}
self.unix_listeners = {}
self.send_proto_to_observer_threads = {}
self.running = True
def __send_proto_to_observers(self, receive_buffer: ThreadSafeBuffer) -> None:
"""Given a ThreadSafeBuffer (receive_buffer) consume it and
send place it in the other buffers.
:param receive_buffer: The queue to consume from
"""
while self.running:
proto = receive_buffer.get()
if proto.DESCRIPTOR.full_name in self.proto_observers:
for buffer in self.proto_observers[proto.DESCRIPTOR.full_name]:
try:
buffer.put(proto, block=False)
except queue.Full:
pass
for buffer in self.all_proto_observers:
try:
buffer.put(proto, block=False)
except queue.Full:
print("Buffer registered to receive everything dropped data")
def register_observer(
self, proto_class: Type[Message], buffer: ThreadSafeBuffer
) -> None:
"""Register a widget to consume from a given protobuf class
:param proto_class: Class of protobuf to consume
:param buffer: buffer from the widget to register
"""
if proto_class.DESCRIPTOR.full_name in self.proto_observers:
self.proto_observers[proto_class.DESCRIPTOR.full_name].append(buffer)
else:
self.proto_observers[proto_class.DESCRIPTOR.full_name] = [buffer]
def register_to_observe_everything(self, buffer: ThreadSafeBuffer) -> None:
"""Register a buffer to observe all incoming protobufs
:param buffer: buffer to push protos onto
"""
self.all_proto_observers.append(buffer)
def send_proto(
self,
proto_class: Type[Message],
data: Message,
block: bool = False,
timeout: int = None,
) -> None:
"""Send the data to all register_observers
:param proto_class: The class to send
:param data: The data to send
:param block: If block is True, then block until a free space opens up
to put the proto. Otherwise, proto will be dropped if queue is full.
:param timeout: If block is True, then wait for this many seconds
"""
if proto_class.DESCRIPTOR.full_name in self.proto_observers:
for buffer in self.proto_observers[proto_class.DESCRIPTOR.full_name]:
buffer.put(data, block, timeout)
for buffer in self.all_proto_observers:
try:
buffer.put(data, block, timeout)
except queue.Full:
print("Buffer registered to receive everything dropped data")
def attach_unix_sender(
self,
runtime_dir: os.PathLike,
unix_path: os.PathLike,
proto_class: Type[Message],
) -> None:
"""Creates a unix sender and registers an observer
of the proto_class to send the data over the unix_path socket.
:param runtime_dir: The runtime_dir where all protos will be sent to
:param unix_path: The unix socket path within the runtime_dir to open
:param proto_class: The protobuf type to send
"""
sender = ThreadedUnixSender(
unix_path=runtime_dir + unix_path, proto_type=proto_class
)
self.unix_senders[proto_class.DESCRIPTOR.full_name] = sender
self.register_observer(proto_class, sender.proto_buffer)
def attach_unix_receiver(
self,
runtime_dir: os.PathLike,
unix_path: os.PathLike = "",
proto_class: Type[Message] = None,
from_log_visualize: bool = False,
) -> None:
"""Creates a unix listener of that protobuf type and provides
incoming data to registered observers.
:param runtime_dir: The runtime_dir where all protos will be sent to
:param unix_path: The unix path within the runtime_dir to send data over
:param proto_class: The prototype to send
:param from_log_visualize: If the protobuf is coming from LOG(VISUALIZE)
"""
listener = ThreadedUnixListener(
(
runtime_dir + f"/{proto_class.DESCRIPTOR.full_name}"
if from_log_visualize and not unix_path
else runtime_dir + unix_path
),
proto_class=proto_class,
)
key = proto_class.DESCRIPTOR.full_name
self.unix_listeners[key] = listener
self.send_proto_to_observer_threads[key] = Thread(
target=self.__send_proto_to_observers,
args=(listener.proto_buffer,),
daemon=True,
)
self.send_proto_to_observer_threads[key].start()
def force_close(self) -> None:
"""Closes all unix senders and receivers"""
self.running = False
for sender in self.unix_senders.items():
sender[1].force_stop()
for listener in self.unix_listeners.items():
listener[1].force_stop()