-
Notifications
You must be signed in to change notification settings - Fork 110
/
Copy paththread_safe_buffer.py
107 lines (89 loc) · 4.17 KB
/
thread_safe_buffer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import queue
from software.logger.logger import create_logger
from typing import Type, Optional
from google.protobuf.message import Message
class ThreadSafeBuffer:
MIN_DROPPED_BEFORE_LOG = 20
"""Multiple producer, multiple consumer buffer.
│ buffer_size │
├──────────────────────────────────────────┤
│ │
┌──────┬──────┬──────┬──────┬──────┬───────┐
put() │ │ │ │ │ │ │ get()
└──────┴──────┴──────┴──────┴──────┴───────┘
ThreadSafeBuffer
"""
def __init__(
self, buffer_size: int, protobuf_type: Type[Message], log_overrun: bool = False
) -> None:
"""A buffer to hold data to be consumed.
:param buffer size: The size of the buffer.
:param protobuf_type: To buffer
:param log_overrun: False
"""
self.logger = create_logger(protobuf_type.DESCRIPTOR.name + " Buffer")
self.queue = queue.Queue(buffer_size)
self.protobuf_type = protobuf_type
self.log_overrun = log_overrun
self.cached_msg = protobuf_type()
self.protos_dropped = 0
self.last_logged_protos_dropped = 0
def get(
self, block: bool = False, timeout: float = None, return_cached: bool = True
) -> Optional[Message]:
"""Get data from the buffer.
If the buffer is empty:
- If block is True
- wait until a new msg is received.
- If a timeout is supplied, wait for timeout seconds
- Then throw an error, or return cached message if return_cached is True
- If block is False
- Return None if return_cached is False
- Return cached message if return_cached is True
:param block: If block is True, then block until a new message
comes through, or returned the cached msg if return_cached = True
:param timeout: If block is True, then wait for this many seconds before
throwing an error or returning cached
:param return_cached: If queue is empty, decides whether to
return cached value (True) or return None / throw an error (false)
:return: protobuf (cached if block is False and there is no data
in the buffer)
"""
if (
self.log_overrun
and self.protos_dropped > self.last_logged_protos_dropped
and self.protos_dropped > self.MIN_DROPPED_BEFORE_LOG
):
self.logger.warn(
"packets dropped; thunderscope did not show {} protos".format(
self.protos_dropped
)
)
self.last_logged_protos_dropped = self.protos_dropped
if block:
try:
self.cached_msg = self.queue.get(timeout=timeout)
except queue.Empty as empty:
if not return_cached:
raise empty
else:
try:
self.cached_msg = self.queue.get_nowait()
except queue.Empty:
if not return_cached:
return None
return self.cached_msg
def put(self, proto: Message, block: bool = False, timeout: float = None) -> None:
"""Put data into the buffer. If the buffer is full, then
the proto will be logged.
:param proto: The proto to place in the buffer
:param block: Should block until there is space in the buffer
:param timeout: If block is True, then wait for this many seconds
"""
if block:
self.queue.put(proto, block, timeout)
return
try:
self.queue.put_nowait(proto)
except queue.Full:
self.protos_dropped += 1