Skip to content

Commit 1b58d12

Browse files
authored
Ensure subject removes cancelled consumer (#17)
* Ensure subject removes cancelled consumer Fixes #11 * fixing formatting
1 parent 3862105 commit 1b58d12

File tree

2 files changed

+87
-3
lines changed

2 files changed

+87
-3
lines changed

src/main/kotlin/hu/akarnokd/kotlin/flow/ResumableCollector.kt

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616

1717
package hu.akarnokd.kotlin.flow
1818

19+
import kotlinx.coroutines.CancellationException
1920
import kotlinx.coroutines.flow.FlowCollector
21+
import kotlinx.coroutines.isActive
22+
import kotlin.coroutines.coroutineContext
2023

2124
/**
2225
* A collector that hosts a signal (value/error/completion)
@@ -66,7 +69,7 @@ open class ResumableCollector<T> : Resumable() {
6669
consumerReady.resume()
6770
}
6871

69-
suspend fun drain(collector: FlowCollector<T>, onCrash: ((ResumableCollector<T>) -> Unit)? = null) {
72+
suspend fun drain(collector: FlowCollector<T>, onComplete: ((ResumableCollector<T>) -> Unit)? = null) {
7073
while (true) {
7174

7275
readyConsumer()
@@ -80,9 +83,14 @@ open class ResumableCollector<T> : Resumable() {
8083
hasValue = false
8184

8285
try {
83-
collector.emit(v)
86+
if (coroutineContext.isActive) {
87+
collector.emit(v)
88+
} else {
89+
onComplete?.invoke(this)
90+
throw CancellationException()
91+
}
8492
} catch (exc: Throwable) {
85-
onCrash?.invoke(this)
93+
onComplete?.invoke(this)
8694

8795
readyConsumer() // unblock waiters
8896
throw exc

src/test/kotlin/hu/akarnokd/kotlin/flow/PublishSubjectTest.kt

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,4 +274,80 @@ class PublishSubjectTest {
274274

275275
assertEquals(0, counter1.get())
276276
}
277+
278+
@Test
279+
fun cancelledConsumer() = runBlocking {
280+
withSingle {
281+
val subject = PublishSubject<Int>()
282+
283+
val expected = 3
284+
val n = 10
285+
286+
val counter1 = AtomicInteger()
287+
288+
val job1 = launch(it.asCoroutineDispatcher()) {
289+
subject.collect {
290+
if (counter1.incrementAndGet() == expected) {
291+
cancel()
292+
}
293+
}
294+
}
295+
296+
while (!subject.hasCollectors()) {
297+
delay(1)
298+
}
299+
300+
for (i in 1..n) {
301+
subject.emit(i)
302+
}
303+
304+
assertEquals(true, job1.isCancelled)
305+
assertEquals(expected, counter1.get())
306+
assertEquals(0, subject.collectorCount())
307+
}
308+
309+
}
310+
311+
@Test
312+
fun cancelledOneCollectorSecondCompletes() = runBlocking {
313+
withSingle {
314+
val subject = PublishSubject<Int>()
315+
316+
val expected = 3
317+
val n = 10
318+
319+
val counter1 = AtomicInteger()
320+
val counter2 = AtomicInteger()
321+
322+
val job1 = launch(it.asCoroutineDispatcher()) {
323+
subject.collect {
324+
if (counter1.incrementAndGet() == expected) {
325+
cancel()
326+
}
327+
}
328+
}
329+
330+
val job2 = launch(it.asCoroutineDispatcher()) {
331+
subject.collect { counter2.incrementAndGet() }
332+
}
333+
334+
while (subject.collectorCount() != 2) {
335+
delay(1)
336+
}
337+
338+
for (i in 1..n) {
339+
subject.emit(i)
340+
}
341+
342+
subject.complete()
343+
job2.join()
344+
345+
assertEquals(true, job1.isCancelled)
346+
assertEquals(true, job2.isCompleted)
347+
assertEquals(expected, counter1.get())
348+
assertEquals(n, counter2.get())
349+
assertEquals(0, subject.collectorCount())
350+
}
351+
352+
}
277353
}

0 commit comments

Comments
 (0)