Skip to content

Commit b1570ce

Browse files
committed
Fix review comments
Fix logging levels Extract send event handling
1 parent fde0d7a commit b1570ce

File tree

1 file changed

+66
-53
lines changed

1 file changed

+66
-53
lines changed

kotlin-sdk-server/src/commonMain/kotlin/io/modelcontextprotocol/kotlin/sdk/server/FeatureNotificationService.kt

Lines changed: 66 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -82,52 +82,7 @@ private class SessionNotificationJob {
8282
events.takeWhile { it !is EndEvent }.collect { event ->
8383
when (event) {
8484
is SendEvent -> {
85-
if (event.timestamp >= fromTimestamp) {
86-
when (val notification = event.notification) {
87-
is PromptListChangedNotification,
88-
is ResourceListChangedNotification,
89-
is ToolListChangedNotification,
90-
-> {
91-
logger.info {
92-
"Sending list changed notification for sessionId: ${session.sessionId}"
93-
}
94-
session.notification(notification)
95-
}
96-
97-
is ResourceUpdatedNotification -> {
98-
resourceSubscriptions.value[notification.params.uri]?.let { resourceFromTimestamp ->
99-
if (event.timestamp >= resourceFromTimestamp) {
100-
logger.info {
101-
"Sending resource updated notification for resource " +
102-
"${notification.params.uri} " +
103-
"to sessionId: ${session.sessionId}"
104-
}
105-
session.notification(notification)
106-
} else {
107-
logger.info {
108-
"Skipping resource updated notification for resource " +
109-
"${notification.params.uri} " +
110-
"as it is older than subscription timestamp $resourceFromTimestamp"
111-
}
112-
}
113-
} ?: run {
114-
logger.info {
115-
"No subscription for resource ${notification.params.uri}. " +
116-
"Skipping notification: $notification"
117-
}
118-
}
119-
}
120-
121-
else -> {
122-
logger.warn { "Skipping notification: $notification" }
123-
}
124-
}
125-
} else {
126-
logger.info {
127-
"Skipping event with id: ${event.timestamp} " +
128-
"as it is older than startingEventId $fromTimestamp: $event"
129-
}
130-
}
85+
handleSendNotificationEvent(event, session, fromTimestamp)
13186
}
13287

13388
else -> {
@@ -138,6 +93,58 @@ private class SessionNotificationJob {
13893
}
13994
}
14095

96+
/**
97+
* Handles sending a notification event to a specific server session.
98+
*
99+
* @param event The notification event to be processed.
100+
* @param session The server session to which the notification should be sent.
101+
* @param fromTimestamp The timestamp to filter events.
102+
* Notifications with timestamps older than this value are skipped.
103+
*/
104+
private suspend fun handleSendNotificationEvent(event: SendEvent, session: ServerSession, fromTimestamp: Long) {
105+
if (event.timestamp < fromTimestamp) {
106+
logger.info {
107+
"Skipping event with id: ${event.timestamp} as it is older than startingEventId $fromTimestamp: $event"
108+
}
109+
return
110+
}
111+
when (val notification = event.notification) {
112+
is PromptListChangedNotification,
113+
is ResourceListChangedNotification,
114+
is ToolListChangedNotification,
115+
-> {
116+
logger.info { "Sending list changed notification for sessionId: ${session.sessionId}" }
117+
session.notification(notification)
118+
}
119+
120+
is ResourceUpdatedNotification -> {
121+
resourceSubscriptions.value[notification.params.uri]?.let { resourceFromTimestamp ->
122+
if (event.timestamp >= resourceFromTimestamp) {
123+
logger.info {
124+
"Sending resource updated notification for resource ${notification.params.uri} " +
125+
"to sessionId: ${session.sessionId}"
126+
}
127+
session.notification(notification)
128+
} else {
129+
logger.info {
130+
"Skipping resource updated notification for resource ${notification.params.uri} " +
131+
"as it is older than subscription timestamp $resourceFromTimestamp"
132+
}
133+
}
134+
} ?: {
135+
logger.info {
136+
"No subscription for resource ${notification.params.uri}. " +
137+
"Skipping notification: $notification"
138+
}
139+
}
140+
}
141+
142+
else -> {
143+
logger.warn { "Skipping notification: $notification" }
144+
}
145+
}
146+
}
147+
141148
/**
142149
* Subscribes to a resource identified by the given feature key.
143150
*
@@ -157,10 +164,16 @@ private class SessionNotificationJob {
157164
resourceSubscriptions.getAndUpdate { it.remove(resourceKey) }
158165
}
159166

167+
/**
168+
* Waits for the notification service to complete its operations.
169+
*/
160170
suspend fun join() {
161171
job.join()
162172
}
163173

174+
/**
175+
* Cancels the notification service job.
176+
*/
164177
fun cancel() {
165178
job.cancel()
166179
}
@@ -216,7 +229,7 @@ internal class FeatureNotificationService(
216229
object : FeatureListener {
217230
override fun onFeatureUpdated(featureKey: FeatureKey) {
218231
val notification = notificationProvider(featureKey)
219-
logger.info { "Emitting notification: ${notification.method.value}" }
232+
logger.debug { "Emitting notification: ${notification.method.value}" }
220233
emit(notification)
221234
}
222235
}
@@ -247,7 +260,7 @@ internal class FeatureNotificationService(
247260

248261
val timestamp = getCurrentTimestamp()
249262
if (closingService.value) {
250-
logger.warn { "Skipping subscription notification as service is closing: ${session.sessionId}" }
263+
logger.debug { "Skipping subscription notification as service is closing: ${session.sessionId}" }
251264
return
252265
}
253266

@@ -321,17 +334,17 @@ internal class FeatureNotificationService(
321334
// Create a timestamp before emit to ensure notifications are processed in order
322335
val timestamp = getCurrentTimestamp()
323336
if (closingService.value) {
324-
logger.warn { "Skipping emitting notification as service is closing: $notification" }
337+
logger.debug { "Skipping emitting notification as service is closing: $notification" }
325338
return
326339
}
327340

328341
logger.info { "Emitting notification $timestamp: $notification" }
329342

330343
// Launching emit lazily to put it to the jobs queue before the completion
331344
val job = notificationScope.launch(start = CoroutineStart.LAZY) {
332-
logger.info { "Actually emitting notification $timestamp: $notification" }
345+
logger.debug { "Actually emitting notification $timestamp: $notification" }
333346
notificationEvents.emit(SendEvent(timestamp, notification))
334-
logger.info { "Notification emitted $timestamp: $notification" }
347+
logger.debug { "Notification emitted $timestamp: $notification" }
335348
}
336349

337350
// Add job to set before starting
@@ -354,7 +367,7 @@ internal class FeatureNotificationService(
354367
logger.info { "Closing feature notification service" }
355368
closingService.compareAndSet(false, update = true)
356369

357-
// Making sure all emit jobs are completed
370+
// Making sure all emitting jobs are completed
358371
activeEmitJobs.value.joinAll()
359372

360373
// Emitting end event to complete all session notification jobs
@@ -364,7 +377,7 @@ internal class FeatureNotificationService(
364377
logger.info { "End event emitted" }
365378
}.join()
366379

367-
// Making sure all session notification jobs are completed (after receiving end event)
380+
// Making sure all session notification jobs are completed (after receiving the end event)
368381
sessionNotificationJobs.value.values.forEach { it.join() }
369382
// Cancelling notification scope to stop processing further events
370383
notificationScope.cancel()

0 commit comments

Comments
 (0)