Skip to content
4 changes: 4 additions & 0 deletions kotlin-sdk-server/api/kotlin-sdk-server.api
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public class io/modelcontextprotocol/kotlin/sdk/server/Server {
public final fun onConnect (Lkotlin/jvm/functions/Function0;)V
public final fun onInitialized (Lkotlin/jvm/functions/Function0;)V
public final fun ping (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun removeNotificationHandler (Lio/modelcontextprotocol/kotlin/sdk/types/Method;)V
public final fun removeNotificationHandler (Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/types/Method;)V
public final fun removePrompt (Ljava/lang/String;)Z
public final fun removePrompts (Ljava/util/List;)I
public final fun removeResource (Ljava/lang/String;)Z
Expand All @@ -93,6 +95,8 @@ public class io/modelcontextprotocol/kotlin/sdk/server/Server {
public final fun sendResourceListChanged (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun sendResourceUpdated (Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/types/ResourceUpdatedNotification;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun sendToolListChanged (Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun setNotificationHandler (Lio/modelcontextprotocol/kotlin/sdk/types/Method;Lkotlin/jvm/functions/Function1;)V
public final fun setNotificationHandler (Ljava/lang/String;Lio/modelcontextprotocol/kotlin/sdk/types/Method;Lkotlin/jvm/functions/Function1;)V
}

public final class io/modelcontextprotocol/kotlin/sdk/server/ServerOptions : io/modelcontextprotocol/kotlin/sdk/shared/ProtocolOptions {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,17 @@ import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.getAndUpdate
import kotlinx.atomicfu.update
import kotlinx.collections.immutable.minus
import kotlinx.collections.immutable.persistentListOf
import kotlinx.collections.immutable.persistentMapOf
import kotlinx.collections.immutable.toPersistentSet

/**
* A listener interface for receiving notifications about feature changes in registry.
*/
internal interface FeatureListener {
fun onFeatureUpdated(featureKey: String)
}

/**
* A generic registry for managing features of a specified type. This class provides thread-safe
* operations for adding, removing, and retrieving features from the registry.
Expand All @@ -33,15 +41,28 @@ internal class FeatureRegistry<T : Feature>(private val featureType: String) {
internal val values: Map<FeatureKey, T>
get() = registry.value

private val listeners = atomic(persistentListOf<FeatureListener>())

internal fun addListener(listener: FeatureListener) {
listeners.update { it.add(listener) }
}

internal fun removeListener(listener: FeatureListener) {
listeners.update { it.remove(listener) }
}

/**
* Adds the specified feature to the registry.
*
* @param feature The feature to be added to the registry.
*/
internal fun add(feature: T) {
logger.info { "Adding $featureType: \"${feature.key}\"" }
registry.update { current -> current.put(feature.key, feature) }
val oldMap = registry.getAndUpdate { current -> current.put(feature.key, feature) }
val oldFeature = oldMap[feature.key]

logger.info { "Added $featureType: \"${feature.key}\"" }
notifyFeatureUpdated(oldFeature, feature)
}

/**
Expand All @@ -52,8 +73,13 @@ internal class FeatureRegistry<T : Feature>(private val featureType: String) {
*/
internal fun addAll(features: List<T>) {
logger.info { "Adding ${featureType}s: ${features.size}" }
registry.update { current -> current.putAll(features.associateBy { it.key }) }
val oldMap = registry.getAndUpdate { current -> current.putAll(features.associateBy { it.key }) }

logger.info { "Added ${featureType}s: ${features.size}" }
for (feature in features) {
val oldFeature = oldMap[feature.key]
notifyFeatureUpdated(oldFeature, feature)
}
}

/**
Expand All @@ -66,15 +92,17 @@ internal class FeatureRegistry<T : Feature>(private val featureType: String) {
logger.info { "Removing $featureType: \"$key\"" }
val oldMap = registry.getAndUpdate { current -> current.remove(key) }

val removed = key in oldMap
logger.info {
if (removed) {
"Removed $featureType: \"$key\""
} else {
"$featureType not found: \"$key\""
}
val removedFeature = oldMap[key]
val removed = removedFeature != null

if (removed) {
logger.info { "Removed $featureType: \"$key\"" }
notifyFeatureUpdated(removedFeature, null)
} else {
logger.info { "$featureType not found: \"$key\"" }
}
return key in oldMap

return removed
}

/**
Expand All @@ -87,13 +115,16 @@ internal class FeatureRegistry<T : Feature>(private val featureType: String) {
logger.info { "Removing ${featureType}s: ${keys.size}" }
val oldMap = registry.getAndUpdate { current -> current - keys.toPersistentSet() }

val removedCount = keys.count { it in oldMap }
logger.info {
if (removedCount > 0) {
"Removed ${featureType}s: $removedCount"
} else {
"No $featureType were removed"
}
val removedFeatures = keys.mapNotNull { oldMap[it] }
val removedCount = removedFeatures.size

if (removedCount > 0) {
logger.info { "Removed ${featureType}s: $removedCount" }
} else {
logger.info { "No $featureType were removed" }
}
removedFeatures.forEach {
notifyFeatureUpdated(it, null)
}

return removedCount
Expand All @@ -108,13 +139,22 @@ internal class FeatureRegistry<T : Feature>(private val featureType: String) {
internal fun get(key: FeatureKey): T? {
logger.info { "Getting $featureType: \"$key\"" }
val feature = registry.value[key]
logger.info {
if (feature != null) {
"Got $featureType: \"$key\""
} else {
"$featureType not found: \"$key\""
}
if (feature != null) {
logger.info { "Got $featureType: \"$key\"" }
} else {
logger.info { "$featureType not found: \"$key\"" }
}

return feature
}

private fun notifyFeatureUpdated(oldFeature: T?, newFeature: T?) {
val featureKey = (oldFeature?.key ?: newFeature?.key) ?: run {
logger.error { "Notification should have feature key, but none found" }
return
}

logger.info { "Notifying listeners on feature update" }
listeners.value.forEach { it.onFeatureUpdated(featureKey) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,24 @@ import io.modelcontextprotocol.kotlin.sdk.types.ListToolsRequest
import io.modelcontextprotocol.kotlin.sdk.types.ListToolsResult
import io.modelcontextprotocol.kotlin.sdk.types.LoggingMessageNotification
import io.modelcontextprotocol.kotlin.sdk.types.Method
import io.modelcontextprotocol.kotlin.sdk.types.Notification
import io.modelcontextprotocol.kotlin.sdk.types.Prompt
import io.modelcontextprotocol.kotlin.sdk.types.PromptArgument
import io.modelcontextprotocol.kotlin.sdk.types.ReadResourceRequest
import io.modelcontextprotocol.kotlin.sdk.types.ReadResourceResult
import io.modelcontextprotocol.kotlin.sdk.types.Resource
import io.modelcontextprotocol.kotlin.sdk.types.ResourceUpdatedNotification
import io.modelcontextprotocol.kotlin.sdk.types.ServerCapabilities
import io.modelcontextprotocol.kotlin.sdk.types.SubscribeRequest
import io.modelcontextprotocol.kotlin.sdk.types.TextContent
import io.modelcontextprotocol.kotlin.sdk.types.Tool
import io.modelcontextprotocol.kotlin.sdk.types.ToolAnnotations
import io.modelcontextprotocol.kotlin.sdk.types.ToolSchema
import io.modelcontextprotocol.kotlin.sdk.types.UnsubscribeRequest
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Deferred
import kotlinx.serialization.json.JsonObject
import kotlin.time.ExperimentalTime

private val logger = KotlinLogging.logger {}

Expand All @@ -58,6 +63,11 @@ public class ServerOptions(public val capabilities: ServerCapabilities, enforceS
* You can register tools, prompts, and resources using [addTool], [addPrompt], and [addResource].
* The server will then automatically handle listing and retrieval requests from the client.
*
* In case the server supports feature list notification or resource substitution,
* the server will automatically send notifications for all connected clients.
* Currently, after subscription to a resource, the server will NOT send the subscription confirmation
* as this response schema is not defined in the protocol.
*
* @param serverInfo Information about this server implementation (name, version).
* @param options Configuration options for the server.
* @param instructionsProvider Optional provider for instructions from the server to the client about how to use
Expand Down Expand Up @@ -86,14 +96,6 @@ public open class Server(
block: Server.() -> Unit = {},
) : this(serverInfo, options, { instructions }, block)

private val sessionRegistry = ServerSessionRegistry()

/**
* Provides a snapshot of all sessions currently registered in the server
*/
public val sessions: Map<ServerSessionKey, ServerSession>
get() = sessionRegistry.sessions

@Suppress("ktlint:standard:backing-property-naming")
private var _onInitialized: (() -> Unit) = {}

Expand All @@ -103,14 +105,51 @@ public open class Server(
@Suppress("ktlint:standard:backing-property-naming")
private var _onClose: () -> Unit = {}

private val toolRegistry = FeatureRegistry<RegisteredTool>("Tool")
private val promptRegistry = FeatureRegistry<RegisteredPrompt>("Prompt")
private val resourceRegistry = FeatureRegistry<RegisteredResource>("Resource")
@OptIn(ExperimentalTime::class)
private val notificationService = FeatureNotificationService()

private val sessionRegistry = ServerSessionRegistry()

private val toolRegistry = FeatureRegistry<RegisteredTool>("Tool").apply {
if (options.capabilities.tools?.listChanged ?: false) {
addListener(notificationService.toolListChangedListener)
}
}
private val promptRegistry = FeatureRegistry<RegisteredPrompt>("Prompt").apply {
if (options.capabilities.prompts?.listChanged ?: false) {
addListener(notificationService.promptListChangedListener)
}
}
private val resourceRegistry = FeatureRegistry<RegisteredResource>("Resource").apply {
if (options.capabilities.resources?.listChanged ?: false) {
addListener(notificationService.resourceListChangedListener)
}
if (options.capabilities.resources?.subscribe ?: false) {
addListener(notificationService.resourceUpdatedListener)
}
}

/**
* Provides a snapshot of all sessions currently registered in the server
*/
public val sessions: Map<ServerSessionKey, ServerSession>
get() = sessionRegistry.sessions

/**
* Provides a snapshot of all tools currently registered in the server
*/
public val tools: Map<String, RegisteredTool>
get() = toolRegistry.values

/**
* Provides a snapshot of all prompts currently registered in the server
*/
public val prompts: Map<String, RegisteredPrompt>
get() = promptRegistry.values

/**
* Provides a snapshot of all resources currently registered in the server
*/
public val resources: Map<String, RegisteredResource>
get() = resourceRegistry.values

Expand All @@ -120,6 +159,7 @@ public open class Server(

public suspend fun close() {
logger.debug { "Closing MCP server" }
notificationService.close()
sessions.forEach { (sessionId, session) ->
logger.info { "Closing session $sessionId" }
session.close()
Expand Down Expand Up @@ -182,17 +222,32 @@ public open class Server(
session.setRequestHandler<ListResourceTemplatesRequest>(Method.Defined.ResourcesTemplatesList) { _, _ ->
handleListResourceTemplates()
}
if (options.capabilities.resources?.subscribe ?: false) {
session.setRequestHandler<SubscribeRequest>(Method.Defined.ResourcesSubscribe) { request, _ ->
handleSubscribeResources(session, request)
// Does not return any confirmation as the structure is not stated in the protocol
null
}
session.setRequestHandler<UnsubscribeRequest>(Method.Defined.ResourcesUnsubscribe) { request, _ ->
handleUnsubscribeResources(session, request)
// Does not return any confirmation as the structure is not stated in the protocol
null
}
}
}

// Register cleanup handler to remove session from list when it closes
session.onClose {
logger.debug { "Removing closed session from active sessions list" }
notificationService.unsubscribeSession(session)
sessionRegistry.removeSession(session.sessionId)
}

logger.debug { "Server session connecting to transport" }
session.connect(transport)
logger.debug { "Server session successfully connected to transport" }
sessionRegistry.addSession(session)
notificationService.subscribeSession(session)

_onConnect()
return session
Expand Down Expand Up @@ -484,7 +539,25 @@ public open class Server(
}

// --- Internal Handlers ---
private suspend fun handleListTools(): ListToolsResult {
private fun handleSubscribeResources(session: ServerSession, request: SubscribeRequest) {
if (options.capabilities.resources?.subscribe ?: false) {
logger.debug { "Subscribing to resources" }
notificationService.subscribeToResourceUpdate(session, request.params.uri)
} else {
logger.debug { "Failed to subscribe to resources: Server does not support resources capability" }
}
}

private fun handleUnsubscribeResources(session: ServerSession, request: UnsubscribeRequest) {
if (options.capabilities.resources?.subscribe ?: false) {
logger.debug { "Unsubscribing from resources" }
notificationService.unsubscribeFromResourceUpdate(session, request.params.uri)
} else {
logger.debug { "Failed to unsubscribe from resources: Server does not support resources capability" }
}
}

private fun handleListTools(): ListToolsResult {
val toolList = tools.values.map { it.tool }
return ListToolsResult(tools = toolList, nextCursor = null)
}
Expand Down Expand Up @@ -518,7 +591,7 @@ public open class Server(
}
}

private suspend fun handleListPrompts(): ListPromptsResult {
private fun handleListPrompts(): ListPromptsResult {
logger.debug { "Handling list prompts request" }
return ListPromptsResult(prompts = prompts.values.map { it.prompt })
}
Expand All @@ -534,7 +607,7 @@ public open class Server(
return prompt.messageProvider(request)
}

private suspend fun handleListResources(): ListResourcesResult {
private fun handleListResources(): ListResourcesResult {
logger.debug { "Handling list resources request" }
return ListResourcesResult(resources = resources.values.map { it.resource })
}
Expand All @@ -550,7 +623,7 @@ public open class Server(
return resource.readHandler(request)
}

private suspend fun handleListResourceTemplates(): ListResourceTemplatesResult {
private fun handleListResourceTemplates(): ListResourceTemplatesResult {
// If you have resource templates, return them here. For now, return empty.
return ListResourceTemplatesResult(listOf())
}
Expand Down Expand Up @@ -675,4 +748,30 @@ public open class Server(
}
}
// End the ServerSession redirection section

// Start the notification handling section
public fun <T : Notification> setNotificationHandler(method: Method, handler: (notification: T) -> Deferred<Unit>) {
sessions.forEach { (_, session) ->
session.setNotificationHandler(method, handler)
}
}

public fun removeNotificationHandler(method: Method) {
sessions.forEach { (_, session) ->
session.removeNotificationHandler(method)
}
}

public fun <T : Notification> setNotificationHandler(
sessionId: String,
method: Method,
handler: (notification: T) -> Deferred<Unit>,
) {
sessionRegistry.getSessionOrNull(sessionId)?.setNotificationHandler(method, handler)
}

public fun removeNotificationHandler(sessionId: String, method: Method) {
sessionRegistry.getSessionOrNull(sessionId)?.removeNotificationHandler(method)
}
// End the notification handling section
}
Loading
Loading