From 57bdde7f2be296c8f8ff27a789d0bd4f587ccd32 Mon Sep 17 00:00:00 2001 From: Jose Mateo Date: Mon, 11 Mar 2024 21:33:08 -0400 Subject: [PATCH] Expose relay messages and connection state (#54) --- .gitignore | 1 + gradle/libs.versions.toml | 2 +- lib/api/lib.api | 4 +++ .../kotlin/app/cash/nostrino/client/Relay.kt | 15 ++++++--- .../app/cash/nostrino/client/RelayClient.kt | 18 ++++++++--- .../app/cash/nostrino/client/RelaySet.kt | 20 +++++++++--- .../app/cash/nostrino/client/RelaySetTest.kt | 32 ++++++++++++++++++- 7 files changed, 76 insertions(+), 16 deletions(-) diff --git a/.gitignore b/.gitignore index 86ff5ff..18d6f70 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ docker-sources/ /gradle/wrapper/ /.hermit local.properties +**.DS_Store diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index e79ae77..66aa7a0 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,6 +1,6 @@ [versions] acinqSecp256 = "0.10.1" -guava = "32.1.1-jre" +guava = "31.1-jre" junit = "5.9.3" kotest = "5.6.2" # @pin diff --git a/lib/api/lib.api b/lib/api/lib.api index 5d25f4c..2ca4eb5 100644 --- a/lib/api/lib.api +++ b/lib/api/lib.api @@ -18,6 +18,7 @@ public abstract class app/cash/nostrino/client/Relay { public final fun getDirectMessages ()Lkotlinx/coroutines/flow/Flow; public final fun getNotes ()Lkotlinx/coroutines/flow/Flow; public final fun getReactions ()Lkotlinx/coroutines/flow/Flow; + public abstract fun getRelayMessages ()Lkotlinx/coroutines/flow/Flow; public final fun getUserMetaData ()Lkotlinx/coroutines/flow/Flow; public abstract fun send (Lapp/cash/nostrino/model/Event;)V public abstract fun start ()V @@ -34,6 +35,8 @@ public final class app/cash/nostrino/client/RelayClient : app/cash/nostrino/clie public fun (Ljava/lang/String;Lokhttp3/OkHttpClient;)V public synthetic fun (Ljava/lang/String;Lokhttp3/OkHttpClient;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public fun getAllEvents ()Lkotlinx/coroutines/flow/Flow; + public final fun getConnectionState ()Lkotlinx/coroutines/flow/StateFlow; + public fun getRelayMessages ()Lkotlinx/coroutines/flow/Flow; public fun send (Lapp/cash/nostrino/model/Event;)V public fun start ()V public fun stop ()V @@ -68,6 +71,7 @@ public final class app/cash/nostrino/client/RelaySet : app/cash/nostrino/client/ public static synthetic fun copy$default (Lapp/cash/nostrino/client/RelaySet;Ljava/util/Set;ILjava/lang/Object;)Lapp/cash/nostrino/client/RelaySet; public fun equals (Ljava/lang/Object;)Z public fun getAllEvents ()Lkotlinx/coroutines/flow/Flow; + public fun getRelayMessages ()Lkotlinx/coroutines/flow/Flow; public final fun getRelays ()Ljava/util/Set; public fun hashCode ()I public fun send (Lapp/cash/nostrino/model/Event;)V diff --git a/lib/src/main/kotlin/app/cash/nostrino/client/Relay.kt b/lib/src/main/kotlin/app/cash/nostrino/client/Relay.kt index 3d72156..540d356 100644 --- a/lib/src/main/kotlin/app/cash/nostrino/client/Relay.kt +++ b/lib/src/main/kotlin/app/cash/nostrino/client/Relay.kt @@ -16,6 +16,8 @@ package app.cash.nostrino.client +import app.cash.nostrino.message.relay.RelayMessage +import app.cash.nostrino.message.relay.EventMessage import app.cash.nostrino.model.EncryptedDm import app.cash.nostrino.model.Event import app.cash.nostrino.model.Filter @@ -52,18 +54,21 @@ abstract class Relay { /** Unsubscribe from a subscription */ abstract fun unsubscribe(subscription: Subscription) - /** All events transmitted by this relay for our active subscriptions */ + /** All messages transmitted by this relay for our active subscriptions */ + abstract val relayMessages : Flow + + /** The subset of [RelayMessage] that only contain messages of type [EventMessage] */ abstract val allEvents: Flow - /** The subset of allEvents that are of type TextNote */ + /** The subset of [allEvents] that are of type [TextNote] */ val notes: Flow by lazy { allEvents.filter { it.kind == TextNote.kind } } - /** The subset of allEvents that are of type EncryptedDm */ + /** The subset of [allEvents] that are of type [EncryptedDm] */ val directMessages: Flow by lazy { allEvents.filter { it.kind == EncryptedDm.kind } } - /** The subset of allEvents that are of type UserMetaData */ + /** The subset of [allEvents] that are of type [UserMetaData] */ val userMetaData: Flow by lazy { allEvents.filter { it.kind == UserMetaData.kind } } - /** The subset of allEvents that are of type Reaction */ + /** The subset of [allEvents] that are of type [Reaction] */ val reactions: Flow by lazy { allEvents.filter { it.kind == Reaction.kind } } } diff --git a/lib/src/main/kotlin/app/cash/nostrino/client/RelayClient.kt b/lib/src/main/kotlin/app/cash/nostrino/client/RelayClient.kt index 6fbf06d..c88cc79 100644 --- a/lib/src/main/kotlin/app/cash/nostrino/client/RelayClient.kt +++ b/lib/src/main/kotlin/app/cash/nostrino/client/RelayClient.kt @@ -35,6 +35,9 @@ import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.filterIsInstance import kotlinx.coroutines.flow.map import kotlinx.coroutines.launch @@ -55,18 +58,21 @@ class RelayClient( private val jsonListAdapter = moshi.adapter(List::class.java) private val queuedMessages = MutableSharedFlow(replay = 512) private val listener = RelayListener(url, this) - private val receivedMessages: Flow by lazy { listener.messages() } private val subscriptions: MutableMap> = Collections.synchronizedMap(mutableMapOf()) - private var connectionState = Disconnected + private var _connectionState = MutableStateFlow(Disconnected) + val connectionState : StateFlow get() = _connectionState.asStateFlow() // TODO add tests + private var messageSendingJob: Job? = null private var socket: WebSocket? = null + override val relayMessages: Flow by lazy { listener.messages() } + override fun start() { - if (connectionState == Disconnected) { + if (connectionState.value == Disconnected) { logger.info { "Connecting to $url" } socket = client.newWebSocket(Request.Builder().url(url).build(), listener) - connectionState = Connecting + _connectionState.value = Connecting } } @@ -79,6 +85,8 @@ class RelayClient( Disconnected -> connect() Disconnecting -> stopTalking() } + + _connectionState.value = newState } override fun stop() { @@ -101,7 +109,7 @@ class RelayClient( send(listOf("CLOSE", subscription.id)) } - override val allEvents: Flow by lazy { receivedMessages.filterIsInstance().map { it.event } } + override val allEvents: Flow by lazy { relayMessages.filterIsInstance().map { it.event } } private fun send(message: List) { queuedMessages.tryEmit(jsonListAdapter.toJson(message)) diff --git a/lib/src/main/kotlin/app/cash/nostrino/client/RelaySet.kt b/lib/src/main/kotlin/app/cash/nostrino/client/RelaySet.kt index d3a3586..f301496 100644 --- a/lib/src/main/kotlin/app/cash/nostrino/client/RelaySet.kt +++ b/lib/src/main/kotlin/app/cash/nostrino/client/RelaySet.kt @@ -16,6 +16,8 @@ package app.cash.nostrino.client +import app.cash.nostrino.message.relay.EventMessage +import app.cash.nostrino.message.relay.RelayMessage import app.cash.nostrino.model.Event import app.cash.nostrino.model.Filter import com.google.common.cache.CacheBuilder @@ -23,6 +25,7 @@ import com.google.common.cache.CacheLoader import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.filterIsInstance import kotlinx.coroutines.flow.filterNot import kotlinx.coroutines.flow.flattenMerge import kotlinx.coroutines.flow.map @@ -45,16 +48,25 @@ data class RelaySet( override fun unsubscribe(subscription: Subscription) = relays.forEach { it.unsubscribe(subscription) } @OptIn(ExperimentalCoroutinesApi::class) - override val allEvents: Flow by lazy { + override val relayMessages: Flow by lazy { val cache = CacheBuilder.newBuilder() .maximumSize(4096) .build(CacheLoader.from { _ -> false }) - relays.map { it.allEvents }.asFlow() + + relays.map { it.relayMessages }.asFlow() .flattenMerge() - .filterNot { cache.get(it.id) } + .filterNot { + it is EventMessage && cache.get(it.event.id) + } .map { - cache.put(it.id, true) + if(it is EventMessage) { + cache.put(it.event.id, true) + } it } } + + override val allEvents: Flow by lazy { + relayMessages.filterIsInstance().map { it.event } + } } diff --git a/lib/src/test/kotlin/app/cash/nostrino/client/RelaySetTest.kt b/lib/src/test/kotlin/app/cash/nostrino/client/RelaySetTest.kt index 37ebc2b..3b125c4 100644 --- a/lib/src/test/kotlin/app/cash/nostrino/client/RelaySetTest.kt +++ b/lib/src/test/kotlin/app/cash/nostrino/client/RelaySetTest.kt @@ -17,11 +17,19 @@ package app.cash.nostrino.client import app.cash.nostrino.crypto.PubKeyTest.Companion.arbPubKey +import app.cash.nostrino.message.relay.EventMessage +import app.cash.nostrino.message.relay.RelayMessage import app.cash.nostrino.model.ArbEvent.arbEvent +import app.cash.nostrino.model.ArbEvent.arbEventMessage +import app.cash.nostrino.model.ArbEvent.arbRelayMessage +import app.cash.nostrino.model.ArbEvent.arbSubscriptionId import app.cash.nostrino.model.Event import app.cash.nostrino.model.Filter import io.kotest.core.spec.style.StringSpec +import io.kotest.matchers.collections.shouldBeSameSizeAs +import io.kotest.matchers.collections.shouldContainAll import io.kotest.matchers.collections.shouldContainExactly +import io.kotest.matchers.equals.shouldBeEqual import io.kotest.matchers.shouldBe import io.kotest.property.Arb import io.kotest.property.arbitrary.list @@ -29,10 +37,11 @@ import io.kotest.property.arbitrary.next import io.kotest.property.checkAll import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.filterIsInstance +import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList class RelaySetTest : StringSpec({ - fun relaySet(): RelaySet = RelaySet( setOf(FakeRelay(), FakeRelay(), FakeRelay()) ) @@ -94,6 +103,8 @@ class RelaySetTest : StringSpec({ FakeRelay(events.drop(4).take(2).toMutableList()) ) ) + set.subscribe(Filter.globalFeedNotes) + set.allEvents.toList() shouldContainExactly events } @@ -128,6 +139,19 @@ class RelaySetTest : StringSpec({ set.send(events.first()) set.allEvents.toList() shouldContainExactly events } + + "merge relay messages into a single flow" { + val events = Arb.list(arbEvent, 20..20).next().distinct().take(6).toMutableList() + val set = RelaySet( + setOf( + FakeRelay(events), + FakeRelay(events), + FakeRelay(events) + ) + ) + + set.relayMessages.toList().filterIsInstance().map { it.event } shouldBeEqual events + } }) class FakeRelay(val sent: MutableList = mutableListOf()) : Relay() { @@ -157,5 +181,11 @@ class FakeRelay(val sent: MutableList = mutableListOf()) : Relay() { unsubscriptions.add(subscription) } + override val relayMessages: Flow + get() = sent.asSequence() + .zip(arbSubscriptionId.samples()) + .map { (event, id) -> EventMessage(id.value, event) } + .asFlow() + override val allEvents: Flow = sent.asFlow() }