Skip to content

Commit 3862105

Browse files
committed
Add onBackpressureDrop
1 parent 8b1fc2c commit 3862105

File tree

4 files changed

+111
-1
lines changed

4 files changed

+111
-1
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ repositories {
3030
dependencies {
3131
compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
3232
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.3'
33-
testCompile 'junit:junit:4.12'
33+
testCompile 'junit:junit:4.13'
3434
testCompile "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version"
3535
}
3636

src/main/kotlin/hu/akarnokd/kotlin/flow/FlowExtensions.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,12 @@ fun <T> Flow<T>.toList() : Flow<List<T>> {
158158
}
159159
}
160160

161+
/**
162+
* Drops items from the upstream when the downstream is not ready to receive them.
163+
*/
164+
@FlowPreview
165+
fun <T> Flow<T>.onBackpressurureDrop() : Flow<T> = FlowOnBackpressureDrop(this)
166+
161167
// -----------------------------------------------------------------------------------------
162168
// Parallel Extensions
163169
// -----------------------------------------------------------------------------------------
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package hu.akarnokd.kotlin.flow.impl
2+
3+
import hu.akarnokd.kotlin.flow.Resumable
4+
import kotlinx.coroutines.*
5+
import kotlinx.coroutines.flow.AbstractFlow
6+
import kotlinx.coroutines.flow.Flow
7+
import kotlinx.coroutines.flow.FlowCollector
8+
import kotlinx.coroutines.flow.collect
9+
import java.util.concurrent.atomic.AtomicBoolean
10+
import java.util.concurrent.atomic.AtomicReference
11+
12+
@FlowPreview
13+
internal class FlowOnBackpressureDrop<T>(private val source: Flow<T>) : AbstractFlow<T>() {
14+
@ExperimentalCoroutinesApi
15+
@InternalCoroutinesApi
16+
override suspend fun collectSafely(collector: FlowCollector<T>) {
17+
coroutineScope {
18+
val consumerReady = AtomicBoolean()
19+
val producerReady = Resumable()
20+
val value = AtomicReference<T>()
21+
val done = AtomicBoolean()
22+
val error = AtomicReference<Throwable>();
23+
24+
launch {
25+
try {
26+
source.collect {
27+
if (consumerReady.get()) {
28+
value.set(it);
29+
consumerReady.set(false);
30+
producerReady.resume();
31+
}
32+
}
33+
done.set(true)
34+
} catch (ex: Throwable) {
35+
error.set(ex)
36+
}
37+
producerReady.resume()
38+
}
39+
40+
while (true) {
41+
consumerReady.set(true)
42+
producerReady.await()
43+
44+
val d = done.get()
45+
val ex = error.get()
46+
val v = value.getAndSet(null)
47+
48+
if (ex != null) {
49+
throw ex;
50+
}
51+
if (d) {
52+
break;
53+
}
54+
55+
collector.emit(v)
56+
}
57+
}
58+
}
59+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2019 David Karnok
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package hu.akarnokd.kotlin.flow.impl
18+
19+
import hu.akarnokd.kotlin.flow.assertResult
20+
import hu.akarnokd.kotlin.flow.onBackpressurureDrop
21+
import hu.akarnokd.kotlin.flow.startCollectOn
22+
import kotlinx.coroutines.*
23+
import kotlinx.coroutines.flow.*
24+
import org.junit.Test
25+
26+
@FlowPreview
27+
class FlowOnBackpressureDropTest {
28+
@InternalCoroutinesApi
29+
@Test
30+
fun basic() = runBlocking {
31+
32+
flow {
33+
for (i in 0 until 10) {
34+
emit(i)
35+
delay(100)
36+
}
37+
}
38+
.onBackpressurureDrop()
39+
.map {
40+
delay(130)
41+
it
42+
}
43+
.assertResult(0, 2, 4, 6, 8)
44+
}
45+
}

0 commit comments

Comments
 (0)