|
9 | 9 |
|
10 | 10 | logger = logging.getLogger()
|
11 | 11 |
|
| 12 | +class Picamera2EoncoderMock(threading.Thread): |
| 13 | + def __init__(self, output): |
| 14 | + self.output = output |
| 15 | + self.image_jpeg = open('test/test_image.jpeg', 'rb').read() |
| 16 | + self.exit = False |
| 17 | + super().__init__() |
| 18 | + |
| 19 | + def close(self): |
| 20 | + self.output[0].close() |
| 21 | + |
| 22 | + def stop(self): |
| 23 | + self.exit = True |
| 24 | + |
| 25 | + def run(self): |
| 26 | + while(not self.exit): |
| 27 | + self.output[0].outputframe(self.image_jpeg) |
| 28 | + time.sleep(0.03) |
| 29 | + |
| 30 | +class Picamera2MJPEGEncoderMock(Picamera2EoncoderMock): |
| 31 | + def __init__(self, output=None, quality=20): |
| 32 | + super().__init__(output) |
| 33 | + self.quality = quality |
| 34 | + |
| 35 | +class Picamera2H264EncoderMock(Picamera2EoncoderMock): |
| 36 | + def __init__(self, output=None): |
| 37 | + super().__init__(output) |
| 38 | + |
12 | 39 | class Picamera2Mock(object):
|
13 | 40 | """Implements PiCamera mock class
|
14 | 41 | PiCamera is the library used to access the integrated Camera, this mock class emulates the capture functions in order to test the streamer loop.
|
@@ -53,35 +80,13 @@ def __init__(self, buffer, video):
|
53 | 80 | def start(self):
|
54 | 81 | pass
|
55 | 82 |
|
56 |
| - def start_recording(self, buffer, format, splitter_port, quality=None, bitrate=None, resize=None): |
57 |
| - """mock start_recording""" |
58 |
| - print(format) |
59 |
| - if format == "bgra" and resize: |
60 |
| - self.images[format] = cv2.resize(self.images[format], resize) |
61 |
| - if format == "h264": |
62 |
| - f = open("test/test.h264", "rb") |
63 |
| - video = f.read() |
64 |
| - f.close() |
65 |
| - self.splitter_recorders[splitter_port] = self.VideoRecorder(buffer, video) |
66 |
| - else: |
67 |
| - self.splitter_recorders[splitter_port] = self.ImageRecorder(buffer, self.images[format]) |
68 |
| - self.splitter_recorders[splitter_port].start() |
69 |
| - |
70 |
| - def stop_recording(self, splitter_port): |
71 |
| - if splitter_port < 2: |
72 |
| - self.splitter_recorders[splitter_port].go = False |
73 |
| - self.splitter_recorders[splitter_port].join() |
74 |
| - else: |
75 |
| - recorder = self.splitter_recorders[splitter_port] |
76 |
| - f = open(recorder.buffer, "wb") |
77 |
| - f.write(recorder.video) |
78 |
| - f.close() |
79 |
| - |
80 |
| - def start_encoder(self, encoder): |
81 |
| - pass |
| 83 | + def start_encoder(self, encoder, output=None): |
| 84 | + encoder.start() |
| 85 | + output |
82 | 86 |
|
83 | 87 | def stop_encoder(self, encoders):
|
84 |
| - pass |
| 88 | + for encoder in encoders: |
| 89 | + encoder.stop() |
85 | 90 |
|
86 | 91 | def capture_buffer(self):
|
87 | 92 | return self.images["bgra"]
|
|
0 commit comments