-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathboundedblockingqueue.py
85 lines (68 loc) · 1.78 KB
/
boundedblockingqueue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import collections
import time
from threading import *
class Node:
    """Doubly linked list node carrying a single value.

    Attributes:
        val: The payload stored in this node.
        next: The following node, or ``None`` while the node is unlinked.
        prev: The preceding node, or ``None`` while the node is unlinked.
    """

    def __init__(self, val):
        """Create a detached node holding ``val``."""
        self.val = val
        # Both links start empty; the code that owns the list wires them up.
        self.next = None
        self.prev = None

    def __repr__(self):
        # Show only the payload: following next/prev here would walk (and,
        # for a doubly linked list, endlessly bounce through) the whole list.
        return f"{type(self).__name__}(val={self.val!r})"
class BoundedBlockingQueue:
    """Thread-safe FIFO queue that holds at most ``capacity`` items.

    ``enqueue`` blocks while the queue is full and ``dequeue`` blocks while
    it is empty.  A single lock guards all state.  Two condition variables
    share that lock so producers and consumers wait separately: a dequeue
    wakes a waiting producer, an enqueue wakes a waiting consumer.
    """

    def __init__(self, capacity):
        """Create an empty queue.

        Args:
            capacity: Maximum number of items held at once; must be >= 1.

        Raises:
            ValueError: If ``capacity`` is less than 1.  A capacity of zero
                would make every ``enqueue`` block forever, and a negative
                one would never block, silently making the queue unbounded.
        """
        if capacity < 1:
            raise ValueError(f"capacity must be at least 1, got {capacity!r}")
        self.capacity = capacity
        self.queue_lock = Lock()
        # Both conditions wrap the same lock, so waiting on either one
        # releases the lock that protects the item store.
        self.enqueue_condition = Condition(self.queue_lock)
        self.dequeue_condition = Condition(self.queue_lock)
        # collections.deque gives O(1) append and popleft.
        self._items = collections.deque()

    @property
    def size(self):
        """Current number of items, read without taking the lock.

        Kept so code that reads ``queue.size`` directly keeps working; use
        ``get_size()`` for a read that is synchronized with writers.
        """
        return len(self._items)

    def enqueue(self, v):
        """Append ``v`` to the back of the queue, blocking while it is full.

        Args:
            v: The item to store.
        """
        with self.queue_lock:
            # Re-check in a loop: another producer may have taken the free
            # slot between the notify and this thread re-acquiring the lock.
            while len(self._items) >= self.capacity:
                self.enqueue_condition.wait()
            self._items.append(v)
            # Exactly one item was added, so waking one consumer is enough.
            self.dequeue_condition.notify()

    def dequeue(self):
        """Remove and return the oldest item, blocking while the queue is empty.

        Returns:
            The item that has been in the queue the longest.
        """
        with self.queue_lock:
            # Same loop-on-wake pattern as enqueue: another consumer may
            # have taken the item before this thread got the lock back.
            while not self._items:
                self.dequeue_condition.wait()
            v = self._items.popleft()
            # Exactly one slot was freed, so waking one producer is enough.
            self.enqueue_condition.notify()
            return v

    # Original (misspelled) method name, kept so existing callers still work.
    deque = dequeue

    def get_size(self):
        """Return the number of items currently queued, read under the lock."""
        with self.queue_lock:
            return len(self._items)
if __name__ == "__main__":
    # Demo: one producer thread and one consumer thread share a five-slot
    # queue.  The producer sleeps less per item than the consumer, so a
    # backlog builds up in the queue while both are running.

    def producer(shared_queue, workload):
        """Announce and enqueue every item in ``workload``, pausing after each."""
        for entry in workload:
            print(f"Producing {entry}")
            shared_queue.enqueue(entry)
            time.sleep(0.1)  # Simulate production time

    def consumer(shared_queue, count):
        """Dequeue and announce ``count`` items, pausing after each."""
        for _ in range(count):
            entry = shared_queue.deque()
            print(f"Consumed {entry}")
            time.sleep(0.2)  # Simulate consumption time

    demo_queue = BoundedBlockingQueue(5)
    workload = range(10)

    # Producer first, then consumer; the same order is used to start and join.
    workers = [
        Thread(target=producer, args=(demo_queue, workload)),
        Thread(target=consumer, args=(demo_queue, len(workload))),
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()