Skip to content

Commit

Permalink
Expose relay messages and connection state (#54)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmateoac authored Mar 12, 2024
1 parent 17ca6a1 commit 57bdde7
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 16 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ docker-sources/
/gradle/wrapper/
/.hermit
local.properties
**.DS_Store
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[versions]
acinqSecp256 = "0.10.1"
guava = "32.1.1-jre"
guava = "31.1-jre"
junit = "5.9.3"
kotest = "5.6.2"
# @pin
Expand Down
4 changes: 4 additions & 0 deletions lib/api/lib.api
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public abstract class app/cash/nostrino/client/Relay {
public final fun getDirectMessages ()Lkotlinx/coroutines/flow/Flow;
public final fun getNotes ()Lkotlinx/coroutines/flow/Flow;
public final fun getReactions ()Lkotlinx/coroutines/flow/Flow;
public abstract fun getRelayMessages ()Lkotlinx/coroutines/flow/Flow;
public final fun getUserMetaData ()Lkotlinx/coroutines/flow/Flow;
public abstract fun send (Lapp/cash/nostrino/model/Event;)V
public abstract fun start ()V
Expand All @@ -34,6 +35,8 @@ public final class app/cash/nostrino/client/RelayClient : app/cash/nostrino/clie
public fun <init> (Ljava/lang/String;Lokhttp3/OkHttpClient;)V
public synthetic fun <init> (Ljava/lang/String;Lokhttp3/OkHttpClient;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun getAllEvents ()Lkotlinx/coroutines/flow/Flow;
public final fun getConnectionState ()Lkotlinx/coroutines/flow/StateFlow;
public fun getRelayMessages ()Lkotlinx/coroutines/flow/Flow;
public fun send (Lapp/cash/nostrino/model/Event;)V
public fun start ()V
public fun stop ()V
Expand Down Expand Up @@ -68,6 +71,7 @@ public final class app/cash/nostrino/client/RelaySet : app/cash/nostrino/client/
public static synthetic fun copy$default (Lapp/cash/nostrino/client/RelaySet;Ljava/util/Set;ILjava/lang/Object;)Lapp/cash/nostrino/client/RelaySet;
public fun equals (Ljava/lang/Object;)Z
public fun getAllEvents ()Lkotlinx/coroutines/flow/Flow;
public fun getRelayMessages ()Lkotlinx/coroutines/flow/Flow;
public final fun getRelays ()Ljava/util/Set;
public fun hashCode ()I
public fun send (Lapp/cash/nostrino/model/Event;)V
Expand Down
15 changes: 10 additions & 5 deletions lib/src/main/kotlin/app/cash/nostrino/client/Relay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package app.cash.nostrino.client

import app.cash.nostrino.message.relay.RelayMessage
import app.cash.nostrino.message.relay.EventMessage
import app.cash.nostrino.model.EncryptedDm
import app.cash.nostrino.model.Event
import app.cash.nostrino.model.Filter
Expand Down Expand Up @@ -52,18 +54,21 @@ abstract class Relay {
/** Unsubscribe from a subscription */
abstract fun unsubscribe(subscription: Subscription)

/** All events transmitted by this relay for our active subscriptions */
/** All messages transmitted by this relay for our active subscriptions */
abstract val relayMessages : Flow<RelayMessage>

/** The subset of [RelayMessage] that only contain messages of type [EventMessage] */
abstract val allEvents: Flow<Event>

/** The subset of allEvents that are of type TextNote */
/** The subset of [allEvents] that are of type [TextNote] */
val notes: Flow<Event> by lazy { allEvents.filter { it.kind == TextNote.kind } }

/** The subset of allEvents that are of type EncryptedDm */
/** The subset of [allEvents] that are of type [EncryptedDm] */
val directMessages: Flow<Event> by lazy { allEvents.filter { it.kind == EncryptedDm.kind } }

/** The subset of allEvents that are of type UserMetaData */
/** The subset of [allEvents] that are of type [UserMetaData] */
val userMetaData: Flow<Event> by lazy { allEvents.filter { it.kind == UserMetaData.kind } }

/** The subset of allEvents that are of type Reaction */
/** The subset of [allEvents] that are of type [Reaction] */
val reactions: Flow<Event> by lazy { allEvents.filter { it.kind == Reaction.kind } }
}
18 changes: 13 additions & 5 deletions lib/src/main/kotlin/app/cash/nostrino/client/RelayClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
Expand All @@ -55,18 +58,21 @@ class RelayClient(
private val jsonListAdapter = moshi.adapter(List::class.java)
private val queuedMessages = MutableSharedFlow<String>(replay = 512)
private val listener = RelayListener(url, this)
private val receivedMessages: Flow<RelayMessage> by lazy { listener.messages() }
private val subscriptions: MutableMap<Subscription, Set<Filter>> = Collections.synchronizedMap(mutableMapOf())

private var connectionState = Disconnected
private var _connectionState = MutableStateFlow(Disconnected)
val connectionState : StateFlow<ConnectionState> get() = _connectionState.asStateFlow() // TODO add tests

private var messageSendingJob: Job? = null
private var socket: WebSocket? = null

override val relayMessages: Flow<RelayMessage> by lazy { listener.messages() }

override fun start() {
if (connectionState == Disconnected) {
if (connectionState.value == Disconnected) {
logger.info { "Connecting to $url" }
socket = client.newWebSocket(Request.Builder().url(url).build(), listener)
connectionState = Connecting
_connectionState.value = Connecting
}
}

Expand All @@ -79,6 +85,8 @@ class RelayClient(
Disconnected -> connect()
Disconnecting -> stopTalking()
}

_connectionState.value = newState
}

override fun stop() {
Expand All @@ -101,7 +109,7 @@ class RelayClient(
send(listOf("CLOSE", subscription.id))
}

override val allEvents: Flow<Event> by lazy { receivedMessages.filterIsInstance<EventMessage>().map { it.event } }
override val allEvents: Flow<Event> by lazy { relayMessages.filterIsInstance<EventMessage>().map { it.event } }

private fun send(message: List<Any>) {
queuedMessages.tryEmit(jsonListAdapter.toJson(message))
Expand Down
20 changes: 16 additions & 4 deletions lib/src/main/kotlin/app/cash/nostrino/client/RelaySet.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@

package app.cash.nostrino.client

import app.cash.nostrino.message.relay.EventMessage
import app.cash.nostrino.message.relay.RelayMessage
import app.cash.nostrino.model.Event
import app.cash.nostrino.model.Filter
import com.google.common.cache.CacheBuilder
import com.google.common.cache.CacheLoader
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.filterNot
import kotlinx.coroutines.flow.flattenMerge
import kotlinx.coroutines.flow.map
Expand All @@ -45,16 +48,25 @@ data class RelaySet(
override fun unsubscribe(subscription: Subscription) = relays.forEach { it.unsubscribe(subscription) }

@OptIn(ExperimentalCoroutinesApi::class)
override val allEvents: Flow<Event> by lazy {
override val relayMessages: Flow<RelayMessage> by lazy {
val cache = CacheBuilder.newBuilder()
.maximumSize(4096)
.build<ByteString, Boolean>(CacheLoader.from { _ -> false })
relays.map { it.allEvents }.asFlow()

relays.map { it.relayMessages }.asFlow()
.flattenMerge()
.filterNot { cache.get(it.id) }
.filterNot {
it is EventMessage && cache.get(it.event.id)
}
.map {
cache.put(it.id, true)
if(it is EventMessage) {
cache.put(it.event.id, true)
}
it
}
}

override val allEvents: Flow<Event> by lazy {
relayMessages.filterIsInstance<EventMessage>().map { it.event }
}
}
32 changes: 31 additions & 1 deletion lib/src/test/kotlin/app/cash/nostrino/client/RelaySetTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,31 @@
package app.cash.nostrino.client

import app.cash.nostrino.crypto.PubKeyTest.Companion.arbPubKey
import app.cash.nostrino.message.relay.EventMessage
import app.cash.nostrino.message.relay.RelayMessage
import app.cash.nostrino.model.ArbEvent.arbEvent
import app.cash.nostrino.model.ArbEvent.arbEventMessage
import app.cash.nostrino.model.ArbEvent.arbRelayMessage
import app.cash.nostrino.model.ArbEvent.arbSubscriptionId
import app.cash.nostrino.model.Event
import app.cash.nostrino.model.Filter
import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.collections.shouldBeSameSizeAs
import io.kotest.matchers.collections.shouldContainAll
import io.kotest.matchers.collections.shouldContainExactly
import io.kotest.matchers.equals.shouldBeEqual
import io.kotest.matchers.shouldBe
import io.kotest.property.Arb
import io.kotest.property.arbitrary.list
import io.kotest.property.arbitrary.next
import io.kotest.property.checkAll
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList

class RelaySetTest : StringSpec({

fun relaySet(): RelaySet = RelaySet(
setOf(FakeRelay(), FakeRelay(), FakeRelay())
)
Expand Down Expand Up @@ -94,6 +103,8 @@ class RelaySetTest : StringSpec({
FakeRelay(events.drop(4).take(2).toMutableList())
)
)
set.subscribe(Filter.globalFeedNotes)

set.allEvents.toList() shouldContainExactly events
}

Expand Down Expand Up @@ -128,6 +139,19 @@ class RelaySetTest : StringSpec({
set.send(events.first())
set.allEvents.toList() shouldContainExactly events
}

"merge relay messages into a single flow" {
val events = Arb.list(arbEvent, 20..20).next().distinct().take(6).toMutableList()
val set = RelaySet(
setOf(
FakeRelay(events),
FakeRelay(events),
FakeRelay(events)
)
)

set.relayMessages.toList().filterIsInstance<EventMessage>().map { it.event } shouldBeEqual events
}
})

class FakeRelay(val sent: MutableList<Event> = mutableListOf()) : Relay() {
Expand Down Expand Up @@ -157,5 +181,11 @@ class FakeRelay(val sent: MutableList<Event> = mutableListOf()) : Relay() {
unsubscriptions.add(subscription)
}

override val relayMessages: Flow<RelayMessage>
get() = sent.asSequence()
.zip(arbSubscriptionId.samples())
.map { (event, id) -> EventMessage(id.value, event) }
.asFlow()

override val allEvents: Flow<Event> = sent.asFlow()
}

0 comments on commit 57bdde7

Please sign in to comment.