Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose relay messages and connection state #54

Merged
merged 1 commit into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ docker-sources/
/gradle/wrapper/
/.hermit
local.properties
**.DS_Store
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[versions]
acinqSecp256 = "0.10.1"
guava = "32.1.1-jre"
guava = "31.1-jre"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My Android builds were hanging on the newer version of guava. (historically, android and guava don't get along very well)
Any appetite for finding a different solution for the cache?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added #57

junit = "5.9.3"
kotest = "5.6.2"
# @pin
Expand Down
4 changes: 4 additions & 0 deletions lib/api/lib.api
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public abstract class app/cash/nostrino/client/Relay {
public final fun getDirectMessages ()Lkotlinx/coroutines/flow/Flow;
public final fun getNotes ()Lkotlinx/coroutines/flow/Flow;
public final fun getReactions ()Lkotlinx/coroutines/flow/Flow;
public abstract fun getRelayMessages ()Lkotlinx/coroutines/flow/Flow;
public final fun getUserMetaData ()Lkotlinx/coroutines/flow/Flow;
public abstract fun send (Lapp/cash/nostrino/model/Event;)V
public abstract fun start ()V
Expand All @@ -34,6 +35,8 @@ public final class app/cash/nostrino/client/RelayClient : app/cash/nostrino/clie
public fun <init> (Ljava/lang/String;Lokhttp3/OkHttpClient;)V
public synthetic fun <init> (Ljava/lang/String;Lokhttp3/OkHttpClient;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun getAllEvents ()Lkotlinx/coroutines/flow/Flow;
public final fun getConnectionState ()Lkotlinx/coroutines/flow/StateFlow;
public fun getRelayMessages ()Lkotlinx/coroutines/flow/Flow;
public fun send (Lapp/cash/nostrino/model/Event;)V
public fun start ()V
public fun stop ()V
Expand Down Expand Up @@ -68,6 +71,7 @@ public final class app/cash/nostrino/client/RelaySet : app/cash/nostrino/client/
public static synthetic fun copy$default (Lapp/cash/nostrino/client/RelaySet;Ljava/util/Set;ILjava/lang/Object;)Lapp/cash/nostrino/client/RelaySet;
public fun equals (Ljava/lang/Object;)Z
public fun getAllEvents ()Lkotlinx/coroutines/flow/Flow;
public fun getRelayMessages ()Lkotlinx/coroutines/flow/Flow;
public final fun getRelays ()Ljava/util/Set;
public fun hashCode ()I
public fun send (Lapp/cash/nostrino/model/Event;)V
Expand Down
15 changes: 10 additions & 5 deletions lib/src/main/kotlin/app/cash/nostrino/client/Relay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package app.cash.nostrino.client

import app.cash.nostrino.message.relay.RelayMessage
import app.cash.nostrino.message.relay.EventMessage
import app.cash.nostrino.model.EncryptedDm
import app.cash.nostrino.model.Event
import app.cash.nostrino.model.Filter
Expand Down Expand Up @@ -52,18 +54,21 @@ abstract class Relay {
/** Unsubscribe from a subscription */
abstract fun unsubscribe(subscription: Subscription)

/** All events transmitted by this relay for our active subscriptions */
/** All messages transmitted by this relay for our active subscriptions */
abstract val relayMessages : Flow<RelayMessage>
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exposing all messages so we can handle them on the app-side.
It's also important to expose the subscription ID where the message was received so that the correct observer knows the message is for them.


/** The subset of [RelayMessage] that only contain messages of type [EventMessage] */
abstract val allEvents: Flow<Event>

/** The subset of allEvents that are of type TextNote */
/** The subset of [allEvents] that are of type [TextNote] */
val notes: Flow<Event> by lazy { allEvents.filter { it.kind == TextNote.kind } }

/** The subset of allEvents that are of type EncryptedDm */
/** The subset of [allEvents] that are of type [EncryptedDm] */
val directMessages: Flow<Event> by lazy { allEvents.filter { it.kind == EncryptedDm.kind } }

/** The subset of allEvents that are of type UserMetaData */
/** The subset of [allEvents] that are of type [UserMetaData] */
val userMetaData: Flow<Event> by lazy { allEvents.filter { it.kind == UserMetaData.kind } }

/** The subset of allEvents that are of type Reaction */
/** The subset of [allEvents] that are of type [Reaction] */
val reactions: Flow<Event> by lazy { allEvents.filter { it.kind == Reaction.kind } }
}
18 changes: 13 additions & 5 deletions lib/src/main/kotlin/app/cash/nostrino/client/RelayClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
Expand All @@ -55,18 +58,21 @@ class RelayClient(
private val jsonListAdapter = moshi.adapter(List::class.java)
private val queuedMessages = MutableSharedFlow<String>(replay = 512)
private val listener = RelayListener(url, this)
private val receivedMessages: Flow<RelayMessage> by lazy { listener.messages() }
private val subscriptions: MutableMap<Subscription, Set<Filter>> = Collections.synchronizedMap(mutableMapOf())

private var connectionState = Disconnected
private var _connectionState = MutableStateFlow(Disconnected)
val connectionState : StateFlow<ConnectionState> get() = _connectionState.asStateFlow() // TODO add tests
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is important so we can show users the current state of the relays


private var messageSendingJob: Job? = null
private var socket: WebSocket? = null

override val relayMessages: Flow<RelayMessage> by lazy { listener.messages() }

override fun start() {
if (connectionState == Disconnected) {
if (connectionState.value == Disconnected) {
logger.info { "Connecting to $url" }
socket = client.newWebSocket(Request.Builder().url(url).build(), listener)
connectionState = Connecting
_connectionState.value = Connecting
}
}

Expand All @@ -79,6 +85,8 @@ class RelayClient(
Disconnected -> connect()
Disconnecting -> stopTalking()
}

_connectionState.value = newState
}

override fun stop() {
Expand All @@ -101,7 +109,7 @@ class RelayClient(
send(listOf("CLOSE", subscription.id))
}

override val allEvents: Flow<Event> by lazy { receivedMessages.filterIsInstance<EventMessage>().map { it.event } }
override val allEvents: Flow<Event> by lazy { relayMessages.filterIsInstance<EventMessage>().map { it.event } }

private fun send(message: List<Any>) {
queuedMessages.tryEmit(jsonListAdapter.toJson(message))
Expand Down
20 changes: 16 additions & 4 deletions lib/src/main/kotlin/app/cash/nostrino/client/RelaySet.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@

package app.cash.nostrino.client

import app.cash.nostrino.message.relay.EventMessage
import app.cash.nostrino.message.relay.RelayMessage
import app.cash.nostrino.model.Event
import app.cash.nostrino.model.Filter
import com.google.common.cache.CacheBuilder
import com.google.common.cache.CacheLoader
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.filterNot
import kotlinx.coroutines.flow.flattenMerge
import kotlinx.coroutines.flow.map
Expand All @@ -45,16 +48,25 @@ data class RelaySet(
override fun unsubscribe(subscription: Subscription) = relays.forEach { it.unsubscribe(subscription) }

@OptIn(ExperimentalCoroutinesApi::class)
override val allEvents: Flow<Event> by lazy {
override val relayMessages: Flow<RelayMessage> by lazy {
val cache = CacheBuilder.newBuilder()
.maximumSize(4096)
.build<ByteString, Boolean>(CacheLoader.from { _ -> false })
relays.map { it.allEvents }.asFlow()

relays.map { it.relayMessages }.asFlow()
.flattenMerge()
.filterNot { cache.get(it.id) }
.filterNot {
it is EventMessage && cache.get(it.event.id)
}
.map {
cache.put(it.id, true)
if(it is EventMessage) {
cache.put(it.event.id, true)
}
it
Comment on lines +58 to 65
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll have to find a better way to do this.
If multiple subscriptions share the same message, only one of them will receive it

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raised as #58

}
}

override val allEvents: Flow<Event> by lazy {
relayMessages.filterIsInstance<EventMessage>().map { it.event }
}
}
32 changes: 31 additions & 1 deletion lib/src/test/kotlin/app/cash/nostrino/client/RelaySetTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,31 @@
package app.cash.nostrino.client

import app.cash.nostrino.crypto.PubKeyTest.Companion.arbPubKey
import app.cash.nostrino.message.relay.EventMessage
import app.cash.nostrino.message.relay.RelayMessage
import app.cash.nostrino.model.ArbEvent.arbEvent
import app.cash.nostrino.model.ArbEvent.arbEventMessage
import app.cash.nostrino.model.ArbEvent.arbRelayMessage
import app.cash.nostrino.model.ArbEvent.arbSubscriptionId
import app.cash.nostrino.model.Event
import app.cash.nostrino.model.Filter
import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.collections.shouldBeSameSizeAs
import io.kotest.matchers.collections.shouldContainAll
import io.kotest.matchers.collections.shouldContainExactly
import io.kotest.matchers.equals.shouldBeEqual
import io.kotest.matchers.shouldBe
import io.kotest.property.Arb
import io.kotest.property.arbitrary.list
import io.kotest.property.arbitrary.next
import io.kotest.property.checkAll
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList

class RelaySetTest : StringSpec({

fun relaySet(): RelaySet = RelaySet(
setOf(FakeRelay(), FakeRelay(), FakeRelay())
)
Expand Down Expand Up @@ -94,6 +103,8 @@ class RelaySetTest : StringSpec({
FakeRelay(events.drop(4).take(2).toMutableList())
)
)
set.subscribe(Filter.globalFeedNotes)

set.allEvents.toList() shouldContainExactly events
}

Expand Down Expand Up @@ -128,6 +139,19 @@ class RelaySetTest : StringSpec({
set.send(events.first())
set.allEvents.toList() shouldContainExactly events
}

"merge relay messages into a single flow" {
val events = Arb.list(arbEvent, 20..20).next().distinct().take(6).toMutableList()
val set = RelaySet(
setOf(
FakeRelay(events),
FakeRelay(events),
FakeRelay(events)
)
)

set.relayMessages.toList().filterIsInstance<EventMessage>().map { it.event } shouldBeEqual events
}
})

class FakeRelay(val sent: MutableList<Event> = mutableListOf()) : Relay() {
Expand Down Expand Up @@ -157,5 +181,11 @@ class FakeRelay(val sent: MutableList<Event> = mutableListOf()) : Relay() {
unsubscriptions.add(subscription)
}

override val relayMessages: Flow<RelayMessage>
get() = sent.asSequence()
.zip(arbSubscriptionId.samples())
.map { (event, id) -> EventMessage(id.value, event) }
.asFlow()

override val allEvents: Flow<Event> = sent.asFlow()
}
Loading