14
14
15
15
import java .io .IOException ;
16
16
import java .io .InputStream ;
17
- import java .io .InterruptedIOException ;
18
- import java .io .PipedInputStream ;
19
- import java .io .PipedOutputStream ;
20
17
import java .net .Socket ;
21
- import java .util .HashSet ;
22
18
import java .util .Set ;
23
- import java .util .concurrent .ConcurrentLinkedQueue ;
24
19
import java .util .concurrent .Future ;
25
20
import java .util .concurrent .ScheduledExecutorService ;
26
21
31
26
import org .openhab .core .audio .AudioFormat ;
32
27
import org .openhab .core .audio .AudioSource ;
33
28
import org .openhab .core .audio .AudioStream ;
29
+ import org .openhab .core .audio .PipedAudioStream ;
34
30
import org .openhab .core .common .ThreadPoolManager ;
35
31
import org .slf4j .Logger ;
36
32
import org .slf4j .LoggerFactory ;
45
41
public class PulseAudioAudioSource extends PulseaudioSimpleProtocolStream implements AudioSource {
46
42
47
43
private final Logger logger = LoggerFactory .getLogger (PulseAudioAudioSource .class );
48
- private final ConcurrentLinkedQueue < PipedOutputStream > pipeOutputs = new ConcurrentLinkedQueue <>() ;
44
+ private final PipedAudioStream . Group streamGroup ;
49
45
private final ScheduledExecutorService executor ;
46
+ private final AudioFormat streamFormat ;
50
47
51
48
private @ Nullable Future <?> pipeWriteTask ;
52
49
53
50
public PulseAudioAudioSource (PulseaudioHandler pulseaudioHandler , ScheduledExecutorService scheduler ) {
54
51
super (pulseaudioHandler , scheduler );
52
+ streamFormat = pulseaudioHandler .getSourceAudioFormat ();
55
53
executor = ThreadPoolManager
56
54
.getScheduledPool ("OH-binding-" + pulseaudioHandler .getThing ().getUID () + "-source" );
55
+ streamGroup = PipedAudioStream .newGroup (streamFormat );
57
56
}
58
57
59
58
@ Override
60
59
public Set <AudioFormat > getSupportedFormats () {
61
- var supportedFormats = new HashSet <AudioFormat >();
62
- var audioFormat = pulseaudioHandler .getSourceAudioFormat ();
63
- if (audioFormat != null ) {
64
- supportedFormats .add (audioFormat );
65
- }
66
- return supportedFormats ;
60
+ return Set .of (streamFormat );
67
61
}
68
62
69
63
@ Override
@@ -76,27 +70,18 @@ public AudioStream getInputStream(AudioFormat audioFormat) throws AudioException
76
70
if (clientSocketLocal == null ) {
77
71
break ;
78
72
}
79
- var sourceFormat = pulseaudioHandler .getSourceAudioFormat ();
80
- if (sourceFormat == null ) {
81
- throw new AudioException ("Unable to get source audio format" );
82
- }
83
- if (!audioFormat .isCompatible (sourceFormat )) {
73
+ if (!audioFormat .isCompatible (streamFormat )) {
84
74
throw new AudioException ("Incompatible audio format requested" );
85
75
}
86
- var pipeOutput = new PipedOutputStream ();
87
- var pipeInput = new PipedInputStream (pipeOutput , 1024 * 10 ) {
88
- @ Override
89
- public void close () throws IOException {
90
- unregisterPipe (pipeOutput );
91
- super .close ();
92
- }
93
- };
94
- registerPipe (pipeOutput );
95
- // get raw audio from the pulse audio socket
96
- return new PulseAudioStream (sourceFormat , pipeInput , () -> {
97
- // ensure pipe is writing
98
- startPipeWrite ();
76
+ var audioStream = streamGroup .getAudioStreamInGroup ();
77
+ audioStream .onClose (() -> {
78
+ minusClientCount ();
79
+ stopPipeWriteTask ();
99
80
});
81
+ addClientCount ();
82
+ startPipeWrite ();
83
+ // get raw audio from the pulse audio socket
84
+ return audioStream ;
100
85
} catch (IOException e ) {
101
86
disconnect (); // disconnect to force clear connection in case of socket not cleanly shutdown
102
87
if (countAttempt == 2 ) { // we won't retry : log and quit
@@ -120,14 +105,6 @@ public void close() throws IOException {
120
105
throw new AudioException ("Unable to create input stream" );
121
106
}
122
107
123
- private synchronized void registerPipe (PipedOutputStream pipeOutput ) {
124
- boolean isAdded = this .pipeOutputs .add (pipeOutput );
125
- if (isAdded ) {
126
- addClientCount ();
127
- }
128
- startPipeWrite ();
129
- }
130
-
131
108
/**
132
109
* As startPipeWrite is called for every chunk read,
133
110
* this wrapper method make the test before effectively
@@ -143,35 +120,16 @@ private synchronized void startPipeWriteSynchronized() {
143
120
if (this .pipeWriteTask == null ) {
144
121
this .pipeWriteTask = executor .submit (() -> {
145
122
int lengthRead ;
146
- byte [] buffer = new byte [1024 ];
123
+ byte [] buffer = new byte [1200 ];
147
124
int readRetries = 3 ;
148
- while (!pipeOutputs .isEmpty ()) {
125
+ while (!streamGroup .isEmpty ()) {
149
126
var stream = getSourceInputStream ();
150
127
if (stream != null ) {
151
128
try {
152
129
lengthRead = stream .read (buffer );
153
130
readRetries = 3 ;
154
- for (var output : pipeOutputs ) {
155
- try {
156
- output .write (buffer , 0 , lengthRead );
157
- if (pipeOutputs .contains (output )) {
158
- output .flush ();
159
- }
160
- } catch (InterruptedIOException e ) {
161
- if (pipeOutputs .isEmpty ()) {
162
- // task has been ended while writing
163
- return ;
164
- }
165
- logger .warn ("InterruptedIOException while writing from pulse source to pipe: {}" ,
166
- getExceptionMessage (e ));
167
- } catch (IOException e ) {
168
- logger .warn ("IOException while writing from pulse source to pipe: {}" ,
169
- getExceptionMessage (e ));
170
- } catch (RuntimeException e ) {
171
- logger .warn ("RuntimeException while writing from pulse source to pipe: {}" ,
172
- getExceptionMessage (e ));
173
- }
174
- }
131
+ streamGroup .write (buffer , 0 , lengthRead );
132
+ streamGroup .flush ();
175
133
} catch (IOException e ) {
176
134
logger .warn ("IOException while reading from pulse source: {}" , getExceptionMessage (e ));
177
135
if (readRetries == 0 ) {
@@ -192,25 +150,9 @@ private synchronized void startPipeWriteSynchronized() {
192
150
}
193
151
}
194
152
195
- private synchronized void unregisterPipe (PipedOutputStream pipeOutput ) {
196
- boolean isRemoved = this .pipeOutputs .remove (pipeOutput );
197
- if (isRemoved ) {
198
- minusClientCount ();
199
- }
200
- try {
201
- Thread .sleep (0 );
202
- } catch (InterruptedException ignored ) {
203
- }
204
- stopPipeWriteTask ();
205
- try {
206
- pipeOutput .close ();
207
- } catch (IOException ignored ) {
208
- }
209
- }
210
-
211
153
private synchronized void stopPipeWriteTask () {
212
154
var pipeWriteTask = this .pipeWriteTask ;
213
- if (pipeOutputs .isEmpty () && pipeWriteTask != null ) {
155
+ if (streamGroup .isEmpty () && pipeWriteTask != null ) {
214
156
pipeWriteTask .cancel (true );
215
157
this .pipeWriteTask = null ;
216
158
}
@@ -243,58 +185,4 @@ public void disconnect() {
243
185
stopPipeWriteTask ();
244
186
super .disconnect ();
245
187
}
246
-
247
- static class PulseAudioStream extends AudioStream {
248
- private final Logger logger = LoggerFactory .getLogger (PulseAudioAudioSource .class );
249
- private final AudioFormat format ;
250
- private final InputStream input ;
251
- private final Runnable activity ;
252
- private boolean closed = false ;
253
-
254
- public PulseAudioStream (AudioFormat format , InputStream input , Runnable activity ) {
255
- this .input = input ;
256
- this .format = format ;
257
- this .activity = activity ;
258
- }
259
-
260
- @ Override
261
- public AudioFormat getFormat () {
262
- return format ;
263
- }
264
-
265
- @ Override
266
- public int read () throws IOException {
267
- byte [] b = new byte [1 ];
268
- int bytesRead = read (b );
269
- if (-1 == bytesRead ) {
270
- return bytesRead ;
271
- }
272
- Byte bb = Byte .valueOf (b [0 ]);
273
- return bb .intValue ();
274
- }
275
-
276
- @ Override
277
- public int read (byte @ Nullable [] b ) throws IOException {
278
- return read (b , 0 , b == null ? 0 : b .length );
279
- }
280
-
281
- @ Override
282
- public int read (byte @ Nullable [] b , int off , int len ) throws IOException {
283
- if (b == null ) {
284
- throw new IOException ("Buffer is null" );
285
- }
286
- logger .trace ("reading from pulseaudio stream" );
287
- if (closed ) {
288
- throw new IOException ("Stream is closed" );
289
- }
290
- activity .run ();
291
- return input .read (b , off , len );
292
- }
293
-
294
- @ Override
295
- public void close () throws IOException {
296
- closed = true ;
297
- input .close ();
298
- }
299
- }
300
188
}
0 commit comments