|
| 1 | +from vuer import Vuer |
| 2 | +from vuer.schemas import ImageBackground, Hands |
| 3 | +from multiprocessing import Process, Array, Value, shared_memory |
| 4 | +import numpy as np |
| 5 | +import asyncio |
| 6 | + |
| 7 | +class TeleVision: |
| 8 | + def __init__(self, img_shape, stereo=False): |
| 9 | + self.stereo = stereo |
| 10 | + |
| 11 | + self.app = Vuer(host='0.0.0.0', cert="./cert.pem", key="./key.pem", queries=dict(grid=False)) |
| 12 | + |
| 13 | + self.app.add_handler("HAND_MOVE")(self.on_hand_move) |
| 14 | + self.app.add_handler("CAMERA_MOVE")(self.on_cam_move) |
| 15 | + self.app.spawn(start=False)(self.main) |
| 16 | + |
| 17 | + self.img_shape = (2*img_shape[0], img_shape[1], 3) |
| 18 | + self.img_height, self.img_width = img_shape[:2] |
| 19 | + self.shm = shared_memory.SharedMemory(create=True, size=np.prod(self.img_shape) * np.uint8().itemsize) |
| 20 | + self.shm_name = self.shm.name |
| 21 | + self.shared_image = np.ndarray(self.img_shape, dtype=np.uint8, buffer=self.shm.buf) |
| 22 | + self.shared_image[:] = np.zeros(self.img_shape, dtype=np.uint8) |
| 23 | + |
| 24 | + self.left_hand_shared = Array('d', 16, lock=True) |
| 25 | + self.right_hand_shared = Array('d', 16, lock=True) |
| 26 | + self.left_landmarks_shared = Array('d', 75, lock=True) |
| 27 | + self.right_landmarks_shared = Array('d', 75, lock=True) |
| 28 | + |
| 29 | + self.head_matrix_shared = Array('d', 16, lock=True) |
| 30 | + self.aspect_shared = Value('d', 1.0, lock=True) |
| 31 | + |
| 32 | + self.process = Process(target=self.run) |
| 33 | + self.process.start() |
| 34 | + |
| 35 | + def run(self): |
| 36 | + self.app.run() |
| 37 | + |
| 38 | + async def on_cam_move(self, event, session): |
| 39 | + try: |
| 40 | + with self.head_matrix_shared.get_lock(): # Use the lock to ensure thread-safe updates |
| 41 | + self.head_matrix_shared[:] = event.value["camera"]["matrix"] |
| 42 | + with self.aspect_shared.get_lock(): |
| 43 | + self.aspect_shared.value = event.value['camera']['aspect'] |
| 44 | + except: |
| 45 | + pass |
| 46 | + |
| 47 | + async def on_hand_move(self, event, session): |
| 48 | + try: |
| 49 | + with self.left_hand_shared.get_lock(): # Use the lock to ensure thread-safe updates |
| 50 | + self.left_hand_shared[:] = event.value["leftHand"] |
| 51 | + with self.right_hand_shared.get_lock(): |
| 52 | + self.right_hand_shared[:] = event.value["rightHand"] |
| 53 | + with self.left_landmarks_shared.get_lock(): |
| 54 | + self.left_landmarks_shared[:] = np.array(event.value["leftLandmarks"]).flatten() |
| 55 | + with self.right_landmarks_shared.get_lock(): |
| 56 | + self.right_landmarks_shared[:] = np.array(event.value["rightLandmarks"]).flatten() |
| 57 | + except: |
| 58 | + pass |
| 59 | + |
| 60 | + async def main(self, session, fps=60): |
| 61 | + session.upsert @ Hands(fps=fps, stream=True, key="hands") |
| 62 | + while True: |
| 63 | + display_image = self.shared_image |
| 64 | + |
| 65 | + if not self.stereo: |
| 66 | + session.upsert( |
| 67 | + ImageBackground( |
| 68 | + display_image[:self.img_height], |
| 69 | + format="jpeg", |
| 70 | + quality=80, |
| 71 | + key="left-image", |
| 72 | + interpolate=True, |
| 73 | + aspect=1.778, |
| 74 | + distanceToCamera=2, |
| 75 | + position=[0, -0.5, -2], |
| 76 | + rotation=[0, 0, 0], |
| 77 | + ), |
| 78 | + to="bgChildren", |
| 79 | + ) |
| 80 | + else: |
| 81 | + session.upsert( |
| 82 | + [ImageBackground( |
| 83 | + display_image[:self.img_height], |
| 84 | + format="jpeg", |
| 85 | + quality=40, |
| 86 | + key="left-image", |
| 87 | + interpolate=True, |
| 88 | + aspect=1.778, |
| 89 | + distanceToCamera=2, |
| 90 | + layers=1 |
| 91 | + ), |
| 92 | + ImageBackground( |
| 93 | + display_image[self.img_height:], |
| 94 | + format="jpeg", |
| 95 | + quality=40, |
| 96 | + key="right-image", |
| 97 | + interpolate=True, |
| 98 | + aspect=1.778, |
| 99 | + distanceToCamera=2, |
| 100 | + layers=2 |
| 101 | + )], |
| 102 | + to="bgChildren", |
| 103 | + ) |
| 104 | + await asyncio.sleep(1/fps) |
| 105 | + |
| 106 | + def modify_shared_image(self, img, random=False): |
| 107 | + assert img.shape == self.img_shape, f"Image shape must be {self.img_shape}, got {img.shape}" |
| 108 | + existing_shm = shared_memory.SharedMemory(name=self.shm_name) |
| 109 | + shared_image = np.ndarray(self.img_shape, dtype=np.uint8, buffer=existing_shm.buf) |
| 110 | + shared_image[:] = img[:] if not random else np.random.randint(0, 256, self.img_shape, dtype=np.uint8) |
| 111 | + existing_shm.close() |
| 112 | + |
| 113 | + @property |
| 114 | + def left_hand(self): |
| 115 | + with self.left_hand_shared.get_lock(): |
| 116 | + return np.array(self.left_hand_shared[:]).reshape(4, 4, order="F") |
| 117 | + |
| 118 | + @property |
| 119 | + def right_hand(self): |
| 120 | + with self.right_hand_shared.get_lock(): |
| 121 | + return np.array(self.right_hand_shared[:]).reshape(4, 4, order="F") |
| 122 | + |
| 123 | + @property |
| 124 | + def left_landmarks(self): |
| 125 | + with self.left_landmarks_shared.get_lock(): |
| 126 | + return np.array(self.left_landmarks_shared[:]).reshape(25, 3) |
| 127 | + |
| 128 | + @property |
| 129 | + def right_landmarks(self): |
| 130 | + with self.right_landmarks_shared.get_lock(): |
| 131 | + return np.array(self.right_landmarks_shared[:]).reshape(25, 3) |
| 132 | + |
| 133 | + @property |
| 134 | + def head_matrix(self): |
| 135 | + with self.head_matrix_shared.get_lock(): |
| 136 | + return np.array(self.head_matrix_shared[:]).reshape(4, 4, order="F") |
| 137 | + |
| 138 | + @property |
| 139 | + def aspect(self): |
| 140 | + with self.aspect_shared.get_lock(): |
| 141 | + return float(self.aspect_shared.value) |
0 commit comments