@@ -176,4 +176,104 @@ extension AnomaliesTest {
176
176
performSharingOperatorsTest ( share: op)
177
177
}
178
178
}
179
+
180
+ func testShareReplayOneInitialEmissionDeadlock( ) {
181
+ let immediatelyEmittingSource = Observable < Void > . create { observer in
182
+ observer. on ( . next( ( ) ) )
183
+ return Disposables . create ( )
184
+ }
185
+ . share ( replay: 1 )
186
+
187
+ let exp = createInitialEmissionsDeadlockExpectation (
188
+ sourceName: " `share(replay: 1)` " ,
189
+ immediatelyEmittingSource: immediatelyEmittingSource
190
+ )
191
+
192
+ wait ( for: [ exp] , timeout: 1 )
193
+ }
194
+
195
+ func testIdleBehaviorSubjectInitialEmissionDeadlock( ) {
196
+ let immediatelyEmittingSource = BehaviorSubject < Void > ( value: ( ) )
197
+
198
+ let exp = createInitialEmissionsDeadlockExpectation (
199
+ sourceName: " 'Idle BehaviorSubject' " ,
200
+ immediatelyEmittingSource: immediatelyEmittingSource
201
+ )
202
+
203
+ wait ( for: [ exp] , timeout: 1 )
204
+ }
205
+
206
+ func testCompletedBehaviorSubjectInitialEmissionDeadlock( ) {
207
+ let immediatelyEmittingSource = BehaviorSubject < Void > ( value: ( ) )
208
+ immediatelyEmittingSource. on ( . completed)
209
+
210
+ let exp = createInitialEmissionsDeadlockExpectation (
211
+ sourceName: " 'BehaviorSubject with completed event' " ,
212
+ immediatelyEmittingSource: immediatelyEmittingSource
213
+ )
214
+
215
+ wait ( for: [ exp] , timeout: 1 )
216
+ }
217
+
218
+ func testCompletedPublishSubjectInitialEmissionDeadlock( ) {
219
+ let immediatelyEmittingSource = PublishSubject < Void > ( )
220
+ immediatelyEmittingSource. on ( . completed)
221
+
222
+ let exp = createInitialEmissionsDeadlockExpectation (
223
+ sourceName: " 'PublishSubject with completed event' " ,
224
+ immediatelyEmittingSource: immediatelyEmittingSource
225
+ )
226
+
227
+ wait ( for: [ exp] , timeout: 1 )
228
+ }
229
+
230
+ func testIdleReplaySubjectInitialEmissionDeadlock( ) {
231
+ let immediatelyEmittingSource = ReplaySubject< Void> . create( bufferSize: 1 )
232
+ immediatelyEmittingSource. on ( . next( ( ) ) )
233
+
234
+ let exp = createInitialEmissionsDeadlockExpectation (
235
+ sourceName: " 'Idle ReplaySubject' " ,
236
+ immediatelyEmittingSource: immediatelyEmittingSource
237
+ )
238
+
239
+ wait ( for: [ exp] , timeout: 1 )
240
+ }
241
+
242
+ func testCompletedReplaySubjectInitialEmissionDeadlock( ) {
243
+ let immediatelyEmittingSource = ReplaySubject< Void> . create( bufferSize: 1 )
244
+ immediatelyEmittingSource. on ( . completed)
245
+
246
+ let exp = createInitialEmissionsDeadlockExpectation (
247
+ sourceName: " 'ReplaySubject with completed event' " ,
248
+ immediatelyEmittingSource: immediatelyEmittingSource
249
+ )
250
+
251
+ wait ( for: [ exp] , timeout: 1 )
252
+ }
253
+
254
+ private func createInitialEmissionsDeadlockExpectation(
255
+ sourceName: String ,
256
+ immediatelyEmittingSource: Observable < Void >
257
+ ) -> XCTestExpectation {
258
+ let exp = expectation ( description: " ` \( sourceName) ` doesn't cause a deadlock in multithreaded environment because it replays with its own lock acquired " )
259
+
260
+ let triggerRange = 0 ..< 100
261
+
262
+ let concurrentScheduler = ConcurrentDispatchQueueScheduler ( qos: . userInitiated)
263
+
264
+ let multipleSubscriptions = Observable . zip ( triggerRange. map { _ in
265
+ Observable . just ( ( ) )
266
+ . observe ( on: concurrentScheduler)
267
+ . flatMap { _ in
268
+ immediatelyEmittingSource
269
+ }
270
+ . take ( 1 )
271
+ } )
272
+
273
+ _ = multipleSubscriptions. subscribe ( onCompleted: {
274
+ exp. fulfill ( )
275
+ } )
276
+
277
+ return exp
278
+ }
179
279
}
0 commit comments