-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Expose relay messages and connection state #54
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,3 +13,4 @@ docker-sources/ | |
/gradle/wrapper/ | ||
/.hermit | ||
local.properties | ||
**.DS_Store |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,8 @@ | |
|
||
package app.cash.nostrino.client | ||
|
||
import app.cash.nostrino.message.relay.RelayMessage | ||
import app.cash.nostrino.message.relay.EventMessage | ||
import app.cash.nostrino.model.EncryptedDm | ||
import app.cash.nostrino.model.Event | ||
import app.cash.nostrino.model.Filter | ||
|
@@ -52,18 +54,21 @@ abstract class Relay { | |
/** Unsubscribe from a subscription */ | ||
abstract fun unsubscribe(subscription: Subscription) | ||
|
||
/** All events transmitted by this relay for our active subscriptions */ | ||
/** All messages transmitted by this relay for our active subscriptions */ | ||
abstract val relayMessages : Flow<RelayMessage> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exposing all messages so we can handle them on the app-side. |
||
|
||
/** The subset of [RelayMessage] that only contain messages of type [EventMessage] */ | ||
abstract val allEvents: Flow<Event> | ||
|
||
/** The subset of allEvents that are of type TextNote */ | ||
/** The subset of [allEvents] that are of type [TextNote] */ | ||
val notes: Flow<Event> by lazy { allEvents.filter { it.kind == TextNote.kind } } | ||
|
||
/** The subset of allEvents that are of type EncryptedDm */ | ||
/** The subset of [allEvents] that are of type [EncryptedDm] */ | ||
val directMessages: Flow<Event> by lazy { allEvents.filter { it.kind == EncryptedDm.kind } } | ||
|
||
/** The subset of allEvents that are of type UserMetaData */ | ||
/** The subset of [allEvents] that are of type [UserMetaData] */ | ||
val userMetaData: Flow<Event> by lazy { allEvents.filter { it.kind == UserMetaData.kind } } | ||
|
||
/** The subset of allEvents that are of type Reaction */ | ||
/** The subset of [allEvents] that are of type [Reaction] */ | ||
val reactions: Flow<Event> by lazy { allEvents.filter { it.kind == Reaction.kind } } | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,9 @@ import kotlinx.coroutines.coroutineScope | |
import kotlinx.coroutines.delay | ||
import kotlinx.coroutines.flow.Flow | ||
import kotlinx.coroutines.flow.MutableSharedFlow | ||
import kotlinx.coroutines.flow.MutableStateFlow | ||
import kotlinx.coroutines.flow.StateFlow | ||
import kotlinx.coroutines.flow.asStateFlow | ||
import kotlinx.coroutines.flow.filterIsInstance | ||
import kotlinx.coroutines.flow.map | ||
import kotlinx.coroutines.launch | ||
|
@@ -55,18 +58,21 @@ class RelayClient( | |
private val jsonListAdapter = moshi.adapter(List::class.java) | ||
private val queuedMessages = MutableSharedFlow<String>(replay = 512) | ||
private val listener = RelayListener(url, this) | ||
private val receivedMessages: Flow<RelayMessage> by lazy { listener.messages() } | ||
private val subscriptions: MutableMap<Subscription, Set<Filter>> = Collections.synchronizedMap(mutableMapOf()) | ||
|
||
private var connectionState = Disconnected | ||
private var _connectionState = MutableStateFlow(Disconnected) | ||
val connectionState : StateFlow<ConnectionState> get() = _connectionState.asStateFlow() // TODO add tests | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is important so we can show users the current state of the relays |
||
|
||
private var messageSendingJob: Job? = null | ||
private var socket: WebSocket? = null | ||
|
||
override val relayMessages: Flow<RelayMessage> by lazy { listener.messages() } | ||
|
||
override fun start() { | ||
if (connectionState == Disconnected) { | ||
if (connectionState.value == Disconnected) { | ||
logger.info { "Connecting to $url" } | ||
socket = client.newWebSocket(Request.Builder().url(url).build(), listener) | ||
connectionState = Connecting | ||
_connectionState.value = Connecting | ||
} | ||
} | ||
|
||
|
@@ -79,6 +85,8 @@ class RelayClient( | |
Disconnected -> connect() | ||
Disconnecting -> stopTalking() | ||
} | ||
|
||
_connectionState.value = newState | ||
} | ||
|
||
override fun stop() { | ||
|
@@ -101,7 +109,7 @@ class RelayClient( | |
send(listOf("CLOSE", subscription.id)) | ||
} | ||
|
||
override val allEvents: Flow<Event> by lazy { receivedMessages.filterIsInstance<EventMessage>().map { it.event } } | ||
override val allEvents: Flow<Event> by lazy { relayMessages.filterIsInstance<EventMessage>().map { it.event } } | ||
|
||
private fun send(message: List<Any>) { | ||
queuedMessages.tryEmit(jsonListAdapter.toJson(message)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,13 +16,16 @@ | |
|
||
package app.cash.nostrino.client | ||
|
||
import app.cash.nostrino.message.relay.EventMessage | ||
import app.cash.nostrino.message.relay.RelayMessage | ||
import app.cash.nostrino.model.Event | ||
import app.cash.nostrino.model.Filter | ||
import com.google.common.cache.CacheBuilder | ||
import com.google.common.cache.CacheLoader | ||
import kotlinx.coroutines.ExperimentalCoroutinesApi | ||
import kotlinx.coroutines.flow.Flow | ||
import kotlinx.coroutines.flow.asFlow | ||
import kotlinx.coroutines.flow.filterIsInstance | ||
import kotlinx.coroutines.flow.filterNot | ||
import kotlinx.coroutines.flow.flattenMerge | ||
import kotlinx.coroutines.flow.map | ||
|
@@ -45,16 +48,25 @@ data class RelaySet( | |
override fun unsubscribe(subscription: Subscription) = relays.forEach { it.unsubscribe(subscription) } | ||
|
||
@OptIn(ExperimentalCoroutinesApi::class) | ||
override val allEvents: Flow<Event> by lazy { | ||
override val relayMessages: Flow<RelayMessage> by lazy { | ||
val cache = CacheBuilder.newBuilder() | ||
.maximumSize(4096) | ||
.build<ByteString, Boolean>(CacheLoader.from { _ -> false }) | ||
relays.map { it.allEvents }.asFlow() | ||
|
||
relays.map { it.relayMessages }.asFlow() | ||
.flattenMerge() | ||
.filterNot { cache.get(it.id) } | ||
.filterNot { | ||
it is EventMessage && cache.get(it.event.id) | ||
} | ||
.map { | ||
cache.put(it.id, true) | ||
if(it is EventMessage) { | ||
cache.put(it.event.id, true) | ||
} | ||
it | ||
Comment on lines
+58
to
65
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll have to find a better way to do this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Raised as #58 |
||
} | ||
} | ||
|
||
override val allEvents: Flow<Event> by lazy { | ||
relayMessages.filterIsInstance<EventMessage>().map { it.event } | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My Android builds were hanging on the newer version of guava. (historically, android and guava don't get along very well)
Any appetite for finding a different solution for the cache?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added #57