Skip to content

Commit e3590d4

Browse files
authored
Integrate dummy capture loop and publish events to MQTT topic (#2)
* integrate dummy capture loop and publish events to MQTT topic * adjust test data * rename topic * refactor broker configs
1 parent 9b8cca9 commit e3590d4

17 files changed

+218
-89
lines changed

Makefile

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ install: ## Install dependencies in local virtualenv
1212
test ! -e requirements.txt || ($(VENV_RUN); $(PIP_CMD) install -r requirements.txt)
1313

1414
test: ## Run local unit and integration tests
15-
$(VENV_RUN); PYTHONPATH=`pwd` nosetests $(NOSE_ARGS) --with-timer --with-coverage --logging-level=WARNING --nocapture --no-skip --exe --cover-erase --cover-tests --cover-inclusive --cover-package=cpopserver --with-xunit --exclude='$(VENV_DIR).*' $(TEST_PATH)
15+
$(VENV_RUN); PYTHONPATH=`pwd` nosetests $(NOSE_ARGS) --with-timer --with-coverage --logging-level=WARNING --nocapture --no-skip --exe --cover-erase --cover-tests --cover-inclusive --cover-package=cpopservice --with-xunit --exclude='$(VENV_DIR).*' $(TEST_PATH)
1616

1717
start: ## Start up the CPOP server
18-
$(VENV_RUN); PYTHONPATH=`pwd` python cpopserver/server.py
18+
$(VENV_RUN); PYTHONPATH=`pwd` python cpopservice/server.py
1919

2020
lint: ## Run code linter to check code style
2121
($(VENV_RUN); flake8 --inline-quotes=single --show-source --max-line-length=120 --ignore=E128,W504 --exclude=$(VENV_DIR)* .)

adabins.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,4 @@
2121
depth_img = pred[0, 0]/10
2222
cv2.imshow('frame', depth_img)
2323
if cv2.waitKey(1) & 0xFF == ord('q'):
24-
break
24+
break

cpopserver/config.py

-13
This file was deleted.

cpopserver/server.py

-69
This file was deleted.
File renamed without changes.

cpopservice/capture_loop.py

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import cv2
2+
import torch
3+
from time import time
4+
from PIL import Image
5+
from adabins.infer import InferenceHelper
6+
from util.timer import Timer
7+
8+
9+
def run_capture_loop(capture, timer=None, result_queue=None, headless=True):
10+
# infer_helper = InferenceHelper()
11+
while True:
12+
ret, frame = capture.read()
13+
frame_rgb = frame[:, :, ::-1]
14+
im_pil = Image.fromarray(frame_rgb)
15+
timer and timer.start()
16+
# centers, pred = inferHelper.predict_pil(im_pil)
17+
timer and timer.stop()
18+
# depth_img = pred[0, 0]/10
19+
if result_queue:
20+
message = {
21+
'Timestamp': time(),
22+
'Type': 'person',
23+
'Position': {'X': 1, 'Y': 2, 'Z': 3},
24+
'Shape': []
25+
}
26+
result_queue.put(message)
27+
if not headless:
28+
cv2.imshow('frame', depth_img)
29+
if cv2.waitKey(1) & 0xFF == ord('q'):
30+
break

cpopservice/config.py

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import os
2+
import logging
3+
import tempfile
4+
5+
6+
# host and port of local MQTT broker
7+
BROKER_HOST = os.environ.get('BROKER_HOST') or 'localhost'
8+
BROKER_PORT = int(os.environ.get('BROKER_PORT') or 1883)
9+
BROKER_STARTUP = os.environ.get('BROKER_STARTUP') not in ['false', '0', False]
10+
11+
# MQTT topic name
12+
MQTT_TOPIC_NAME = 'cpop'
13+
14+
# local folder to store temporary files
15+
TMP_FOLDER = os.path.join(tempfile.gettempdir(), 'cpopserver')
16+
17+
# configure logging
18+
logging.basicConfig(level=logging.INFO)
File renamed without changes.
File renamed without changes.

cpopserver/core/models.py cpopservice/core/models.py

+11
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33

44
class BaseModel(object):
5+
def __init__(self, params={}):
6+
self.__dict__ = params
7+
58
def to_bson(self):
69
result = bson.dumps(self.__dict__)
710
return result
@@ -20,6 +23,14 @@ def __hash__(self, other):
2023
# TODO: implement hash function if needed
2124
return 0
2225

26+
def __repr__(self):
27+
return '%s(%s)' % (self.__class__.__name__, self.__dict__)
28+
2329

2430
class Event(BaseModel):
31+
# attributes: timestamp, shape (list of Coordinates), position (Coordinates)
32+
pass
33+
34+
35+
class Coordinates(BaseModel):
2536
pass

cpopservice/core/pubsub.py

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import logging
2+
import paho.mqtt.client as mqtt
3+
from cpopservice import config
4+
5+
LOG = logging.getLogger(__name__)
6+
7+
8+
class CPOPPublisher:
9+
""" Publisher that publishes CPOP events to the pub/sub broker """
10+
11+
def publish_event(self, event):
12+
raise Exception('Not implemented')
13+
14+
@staticmethod
15+
def get(impl_type=None):
16+
subclasses = CPOPPublisher.__subclasses__()
17+
subclasses = {subclass.name(): subclass for subclass in subclasses}
18+
if not impl_type and len(subclasses) != 1:
19+
raise Exception('Multiple CPOPPublisher implemtations found and type not specified')
20+
subclass = subclasses.get(impl_type) or list(subclasses.values())[0]
21+
return subclass()
22+
23+
24+
class CPOPPublisherMQTT(CPOPPublisher):
25+
""" Publisher based on MQTT broker """
26+
27+
@staticmethod
28+
def name():
29+
return 'mqtt'
30+
31+
def __init__(self):
32+
self.client = mqtt.Client()
33+
self.client.connect(config.BROKER_HOST, config.BROKER_PORT)
34+
35+
def publish_event(self, event):
36+
LOG.info('Publishing message to topic %s: %s' % (config.MQTT_TOPIC_NAME, event))
37+
self.client.publish(config.MQTT_TOPIC_NAME, event.to_bson())

cpopservice/server.py

+114
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
import os
2+
import glob
3+
import time
4+
import queue
5+
import logging
6+
import cv2
7+
from cpopservice import config
8+
from cpopservice.constants import MOSQUITTO_URL_LINUX
9+
from cpopservice.core.models import Event
10+
from cpopservice.core.pubsub import CPOPPublisher
11+
from cpopservice.capture_loop import run_capture_loop
12+
from cpopservice.utils.common import (
13+
ShellCommandThread, FuncThread, get_os_type, mkdir, download, untar, find_command, sleep_forever)
14+
15+
LOG = logging.getLogger(__name__)
16+
THREADS = []
17+
LOCAL_HOSTS = ['localhost', '0.0.0.0', '127.0.0.1']
18+
19+
20+
class CPOPServer(FuncThread):
21+
def __init__(self):
22+
FuncThread.__init__(self, self.run_loop)
23+
self.publisher = CPOPPublisher.get()
24+
25+
def run_loop(self):
26+
LOG.info('Starting CPOP server capture loop')
27+
result_queue = queue.Queue()
28+
capture = get_capture_device()
29+
QueueSubscriber(result_queue, self.publisher).start()
30+
run_capture_loop(capture, result_queue=result_queue)
31+
32+
33+
class QueueSubscriber(FuncThread):
34+
def __init__(self, queue, publisher):
35+
self.queue = queue
36+
self.publisher = publisher
37+
FuncThread.__init__(self, self.run_loop)
38+
39+
def run_loop(self):
40+
LOG.info('Starting CPOP queue subscriber loop')
41+
while True:
42+
message = self.queue.get()
43+
message = self._prepare_message(message)
44+
self.publisher.publish_event(message)
45+
46+
def _prepare_message(self, message):
47+
# TODO: for testing only
48+
if isinstance(message, dict):
49+
import time
50+
time.sleep(0.5)
51+
message = Event(message)
52+
return message
53+
54+
55+
def get_capture_device():
56+
# TODO review this
57+
capture = cv2.VideoCapture(0)
58+
return capture
59+
60+
61+
def start_mqtt_broker():
62+
broker_host = config.BROKER_HOST
63+
if not config.BROKER_STARTUP or broker_host not in LOCAL_HOSTS:
64+
return
65+
mosquitto_bin = install_mqtt()
66+
LOG.info('Starting MQTT broker on port %s' % config.BROKER_PORT)
67+
t = ShellCommandThread('%s -p %s' % (mosquitto_bin, config.BROKER_PORT))
68+
t.start()
69+
time.sleep(1)
70+
THREADS.append(t)
71+
72+
73+
def install_mqtt():
74+
bin_path = find_command('mosquitto')
75+
if bin_path:
76+
return bin_path
77+
os_type = get_os_type()
78+
target_dir = os.path.join(config.TMP_FOLDER, 'mosquitto')
79+
archive = os.path.join(target_dir, 'archive.tgz')
80+
mkdir(target_dir)
81+
if not os.path.exists(archive):
82+
if os_type == 'linux':
83+
url = MOSQUITTO_URL_LINUX
84+
else:
85+
raise Exception('Unsupported OS type: %s' % os_type)
86+
download(url, archive)
87+
untar(archive, target_dir)
88+
result = glob.glob('%s/**/*bin/mosquitto' % target_dir, recursive=True)
89+
return result[0]
90+
91+
92+
def startup_servers():
93+
start_mqtt_broker()
94+
CPOPServer().start()
95+
96+
97+
def shutdown_servers():
98+
LOG.info('Shutting down server threads ...')
99+
while THREADS:
100+
t = THREADS[0]
101+
del THREADS[0]
102+
t.stop()
103+
104+
105+
def main():
106+
startup_servers()
107+
try:
108+
sleep_forever()
109+
except KeyboardInterrupt:
110+
shutdown_servers()
111+
112+
113+
if __name__ == '__main__':
114+
main()
File renamed without changes.
File renamed without changes.

requirements.txt

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ pandas
2828
# extras --------------------------------------
2929
thop # FLOPS computation
3030
pycocotools>=2.0 # COCO mAP
31+
opencv-python
3132

3233
# CPOP Server dependencies
3334
bson
File renamed without changes.

tests/cpopserver/test_base_server.py tests/cpopservice/test_mqtt_communication.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import queue
22
import unittest
33
import paho.mqtt.client as mqtt
4-
from cpopserver import config
5-
from cpopserver.core import models
6-
from cpopserver.server import startup_servers, shutdown_servers
7-
from cpopserver.utils.common import short_uid, FuncThread, retry
4+
from cpopservice import config
5+
from cpopservice.core import models
6+
from cpopservice.server import startup_servers, shutdown_servers
7+
from cpopservice.utils.common import short_uid, FuncThread, retry
88

99

1010
class TestCPOPServer(unittest.TestCase):

0 commit comments

Comments
 (0)