Description
Problem Statement
With the recent surge in Large Language Models (LLMs) and AI agents, there is an increasing demand for AI systems to access up-to-date data from various sources, including databases and public APIs. The Model Context Protocol (MCP) has emerged as a universal, open standard for these connections. This gives OpenSearch an opportunity to support MCP servers and expose OpenSearch APIs as MCP tools, so that diverse AI systems can interact with OpenSearch with minimal modifications.
Proposed Solutions
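MCP is based on JSON-RPC 2.0, so exposing the index/_mapping API as an MCP tool would produce an exchange like the one below. The example is simplified: a complete tools/call request also names the tool in params.name, and the tool output is normally wrapped in a result.content array.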
Request
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "arguments": {
      "parameters": {
        "indices": "test_index"
      }
    }
  }
}
Response
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "mappings": {
      "properties": {
        "text": {
          "type": "text"
        }
      }
    },
    "settings": {
      "index": {
        "translog": {
          "sync_interval": "30s",
          "durability": "async"
        },
        "number_of_replicas": "1",
        "number_of_shards": "2"
      }
    }
  }
}
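For comparison, the MCP specification's tools/call request names the tool in params.name. Its result carries the tool output as a content array of typed items, together with an isError flag. The sketch below builds both messages with plain string concatenation so that it has no dependencies. The class name McpToolCallMessages and the tool name index_mapping are hypothetical. A real implementation would use a JSON library such as Jackson instead of the hand-written escaping shown here.

```java
import java.util.Locale;

// Hypothetical helper: builds spec-shaped MCP tools/call messages without a JSON library
public class McpToolCallMessages {

    // Minimal JSON string encoding: quotes, backslashes and control characters are escaped
    public static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"' -> sb.append("\\\"");
                case '\\' -> sb.append("\\\\");
                case '\n' -> sb.append("\\n");
                case '\r' -> sb.append("\\r");
                case '\t' -> sb.append("\\t");
                default -> {
                    if (c < 0x20) {
                        sb.append(String.format(Locale.ROOT, "\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
                }
            }
        }
        return sb.append('"').toString();
    }

    // tools/call request: params = { name: <tool>, arguments: { parameters: { indices: ... } } }
    public static String toolCallRequest(int id, String toolName, String indices) {
        return "{\"jsonrpc\":\"2.0\",\"id\":" + id + ",\"method\":\"tools/call\","
            + "\"params\":{\"name\":" + jsonString(toolName)
            + ",\"arguments\":{\"parameters\":{\"indices\":" + jsonString(indices) + "}}}}";
    }

    // tools/call result: the raw tool output (e.g. the mapping JSON) becomes one text content item
    public static String toolCallResult(int id, String toolOutput, boolean isError) {
        return "{\"jsonrpc\":\"2.0\",\"id\":" + id
            + ",\"result\":{\"content\":[{\"type\":\"text\",\"text\":" + jsonString(toolOutput)
            + "}],\"isError\":" + isError + "}}";
    }

    public static void main(String[] args) {
        System.out.println(toolCallRequest(1, "index_mapping", "test_index"));
        System.out.println(toolCallResult(1, "{\"mappings\":{\"properties\":{\"text\":{\"type\":\"text\"}}}}", false));
    }
}
```

With this shape, the mapping JSON from the response above travels as an escaped string inside content[0].text. This lets generic MCP clients display or forward it without knowing OpenSearch's response format.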
We are considering two approaches to supporting an MCP server in OpenSearch:
Option 1: Use the official MCP SDK
Implementing MCP with the official MCP Java SDK is straightforward. The SDK offers APIs for managing the MCP server, such as adding and updating tools. A simple POC looks like this:
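import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.server.McpServer;
import io.modelcontextprotocol.server.McpServerFeatures;
import io.modelcontextprotocol.server.transport.WebFluxSseServerTransportProvider;
import io.modelcontextprotocol.spec.McpSchema;
import org.opensearch.action.support.PlainActionFuture;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.web.reactive.function.server.RouterFunctions;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
// WebSearchTool (the tool wrapped by this POC) comes from its own module; its import is omitted.

public class MCPWebFlux {
    private static final int PORT = 8080;
    private static final String MESSAGE_ENDPOINT = "/sse/message";

    public static void main(String[] args) {
        runMCPServer(getWebSearchToolSpecification());
    }

    public static void runMCPServer(McpServerFeatures.AsyncToolSpecification... toolSpecifications) {
        // The SDK transport handles the SSE stream and the messages clients POST to MESSAGE_ENDPOINT
        WebFluxSseServerTransportProvider mcpServerTransportProvider =
            new WebFluxSseServerTransportProvider(new ObjectMapper(), MESSAGE_ENDPOINT);
        HttpHandler httpHandler = RouterFunctions.toHttpHandler(mcpServerTransportProvider.getRouterFunction());
        ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
        // Standalone Reactor Netty server on its own port, separate from the OpenSearch HTTP port
        DisposableServer httpServer = HttpServer.create().port(PORT).handle(adapter).bindNow();
        System.out.println("http server started on port " + PORT);

        // Advertise tool and logging capabilities and register the tools
        McpServer
            .async(mcpServerTransportProvider)
            .capabilities(McpSchema.ServerCapabilities.builder()
                .tools(true)
                .logging()
                .build())
            .serverInfo("test-server", "1.0.0")
            .tools(toolSpecifications)
            .build();

        // Keep the process alive until the HTTP server is disposed
        httpServer.onDispose().block();
    }

    @SuppressWarnings("unchecked")
    private static McpServerFeatures.AsyncToolSpecification getWebSearchToolSpecification() {
        // JSON Schema of the tool input: {"parameters": {"question": "<string>"}}
        var schema = """
            {
              "type": "object",
              "properties": {
                "parameters": {
                  "type": "object",
                  "properties": {
                    "question": { "type": "string" }
                  }
                }
              }
            }
            """;
        return new McpServerFeatures.AsyncToolSpecification(
            new McpSchema.Tool("websearch", "web search and crawl", schema),
            (exchange, arguments) -> Mono.fromCallable(() -> {
                    WebSearchTool webSearchTool = WebSearchTool.Factory.getInstance()
                        .create(Map.of("engine", "duckduckgo"));
                    Map<String, String> params = (Map<String, String>) arguments.get("parameters");
                    PlainActionFuture<String> actionFuture = PlainActionFuture.newFuture();
                    webSearchTool.run(params, actionFuture);
                    // Blocking wait for the tool result (up to 50 s)
                    String result = actionFuture.get(50, TimeUnit.SECONDS);
                    return new McpSchema.CallToolResult(List.of(new McpSchema.TextContent(result)), false);
                })
                // Run the blocking call off the Netty event loop
                .subscribeOn(Schedulers.boundedElastic())
                // Timeouts, interruptions and tool failures are returned as an MCP tool error
                .onErrorResume(e -> Mono.just(new McpSchema.CallToolResult(
                    List.of(new McpSchema.TextContent(String.valueOf(e.getMessage()))), true)))
        );
    }
}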
pros
- Easy to implement, as the official SDK handles the SSE transport and protocol details and exposes high-level APIs.
- Easy to maintain: when a new MCP SDK version is released, we can upgrade the dependency to get its improvements.
cons
- The MCP SDK currently doesn’t implement authentication or authorization, so we would need to wait for a release that does or work around this manually.
Option 2: Implement the MCP protocol manually in an OpenSearch plugin
Another approach is to implement MCP with an OpenSearch REST handler. In this approach we implement the MCP protocol methods ourselves, and the following are mandatory:
- start session: sse (the SSE connection endpoint)
- initiate session: initialize
- initialized: notifications/initialized
- tool list: tools/list
- tool call: tools/call
- cancel: notifications/cancelled
Some methods are nice to have, such as:
- prompt list: prompts/list
- prompt get: prompts/get
- prompt change notification: notifications/prompts/list_changed
The above is only the minimal set of methods for an MCP server that supports tool calls and prompts. A complete MCP server needs more methods, although some of them, such as resources, are unnecessary in OpenSearch because there is no use case for them.
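The following is a minimal sketch of how a hand-written server could route the JSON-RPC methods above. It assumes the request body has already been parsed into a method name and a numeric id. The id is null for notifications, which must not receive a response. The sketch also assumes protocol version 2024-11-05, which is the MCP revision that defines the HTTP+SSE transport. The results are placeholders, and the class name and server info are hypothetical. The sketch answers tools/list, which clients call to discover tools. Unimplemented methods such as resources/list receive the standard JSON-RPC -32601 "Method not found" error.

```java
// Hypothetical JSON-RPC dispatcher for a hand-written MCP server (placeholder results)
public class McpMethodDispatcher {
    // Assumed protocol revision: 2024-11-05 defines the HTTP+SSE transport
    public static final String PROTOCOL_VERSION = "2024-11-05";

    /**
     * Routes a parsed JSON-RPC message by method name. Returns the response JSON, or null
     * for notifications (no id), which per JSON-RPC 2.0 must not be answered.
     */
    public static String dispatch(String method, Integer id) {
        if (id == null) {
            // e.g. notifications/initialized, notifications/cancelled: update session state only
            return null;
        }
        return switch (method) {
            // Capability negotiation; the server name and version are hypothetical
            case "initialize" -> result(id, "{\"protocolVersion\":\"" + PROTOCOL_VERSION + "\","
                + "\"capabilities\":{\"tools\":{},\"prompts\":{}},"
                + "\"serverInfo\":{\"name\":\"opensearch-mcp\",\"version\":\"0.1.0\"}}");
            // Placeholder: a real server lists each tool's name, description and inputSchema
            case "tools/list" -> result(id, "{\"tools\":[]}");
            // Placeholder: a real server runs the named tool and returns its output as content
            case "tools/call" -> result(id, "{\"content\":[],\"isError\":false}");
            case "prompts/list" -> result(id, "{\"prompts\":[]}");
            // Anything not implemented (e.g. resources/*) gets the standard JSON-RPC error
            default -> error(id, -32601, "Method not found");
        };
    }

    private static String result(int id, String resultJson) {
        return "{\"jsonrpc\":\"2.0\",\"id\":" + id + ",\"result\":" + resultJson + "}";
    }

    private static String error(int id, int code, String message) {
        return "{\"jsonrpc\":\"2.0\",\"id\":" + id + ",\"error\":{\"code\":" + code
            + ",\"message\":\"" + message + "\"}}";
    }

    public static void main(String[] args) {
        System.out.println(dispatch("initialize", 1));
        System.out.println(dispatch("notifications/initialized", null));
    }
}
```

In the REST-handler design, the string returned here would be written to the session's SSE stream as a message event rather than returned on the POST request itself.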
A simple POC looks like this:
// Members of a BaseRestHandler subclass in the plugin (imports omitted).
// 'log' is assumed to be a Log4j2 logger, e.g. from Lombok's @Log4j2.

// Open SSE channels keyed by session id
private static final Map<String, StreamingRestChannel> CHANNELS = new ConcurrentHashMap<>();

@Override
public List<Route> routes() {
    return List.of(
        // GET opens the SSE stream; POST delivers client messages for an existing session
        new Route(GET, "/mcp/sse"),
        new Route(POST, "/mcp/sse/message")
    );
}

@Override
public String getName() {
    return "streaming_mcp_sse";
}

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
    final String sessionId = request.param("sessionId");
    final StreamingRestChannelConsumer consumer = (channel) -> {
        channel.prepareResponse(RestStatus.OK,
            Map.of(
                "Content-Type", List.of("text/event-stream"),
                "Cache-Control", List.of("no-cache"),
                "Connection", List.of("keep-alive"),
                "Transfer-Encoding", List.of("chunked")
            ));
        if (sessionId == null) {
            // New connection: register the channel and send the endpoint event that tells
            // the client where to POST its JSON-RPC messages (must match a route above)
            String generatedId = UUID.randomUUID().toString();
            CHANNELS.put(generatedId, channel);
            Mono.just(String.format(Locale.ROOT, "event: endpoint\ndata:/mcp/sse/message?sessionId=%s\n\n", generatedId))
                .delayElement(Duration.ofSeconds(1))
                .flatMap(message -> Mono.fromRunnable(() -> sendSseEvent(channel, message)))
                .onErrorResume(ex -> {
                    log.error("Error sending endpoint event: {}", ex.getMessage());
                    return Mono.empty();
                }).subscribe();
        } else {
            // Message for an existing session: the reply goes out on that session's SSE stream.
            // A complete server would also complete this POST request's own response.
            StreamingRestChannel sessionChannel = CHANNELS.get(sessionId);
            if (sessionChannel == null) {
                log.error("Unknown MCP session id: {}", sessionId);
                return;
            }
            // Placeholder reply; a real server would parse the JSON-RPC request and send a JSON-RPC response
            Mono.just("event: message\ndata:initialized successfully\n\n")
                .delayElement(Duration.ofSeconds(1))
                .flatMap(message -> Mono.fromRunnable(() -> sendSseEvent(sessionChannel, message)))
                .onErrorResume(ex -> {
                    log.error("Error sending message event: {}", ex.getMessage());
                    return Mono.empty();
                }).subscribe();
        }
    };
    return channel -> {
        if (channel instanceof StreamingRestChannel) {
            consumer.accept((StreamingRestChannel) channel);
        } else {
            // SSE requires a streaming-capable HTTP channel
            final ActionRequestValidationException validationError = new ActionRequestValidationException();
            validationError.addValidationError("Unable to initiate request / response streaming over non-streaming channel");
            channel.sendResponse(new BytesRestResponse(channel, validationError));
        }
    };
}

@Override
public boolean supportsContentStream() {
    return true;
}

@Override
public boolean supportsStreaming() {
    return true;
}

@Override
public boolean allowsUnsafeBuffers() {
    return true;
}

private void sendSseEvent(StreamingRestChannel channel, String result) {
    BytesReference content = BytesReference.fromByteBuffer(ByteBuffer.wrap(result.getBytes(StandardCharsets.UTF_8)));
    // isLast() returns false so the SSE response stays open after this chunk
    channel.sendChunk(new HttpChunk() {
        @Override
        public void close() {
        }

        @Override
        public boolean isLast() {
            return false;
        }

        @Override
        public BytesReference content() {
            return content;
        }
    });
}
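Handling SSE ourselves means framing every event correctly. Each line of the payload needs its own data: field, and the event ends with a blank line. The POC only hard-codes single-line payloads, so it sidesteps this rule. It also never removes entries from its static CHANNELS map. The sketch below pairs a framing helper with a session registry that handles unknown ids and disconnects. SseSupport and SessionRegistry are hypothetical names. The registry is generic over the channel type so that the sketch runs without OpenSearch on the classpath.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical SSE helpers for a hand-written MCP transport
public class SseSupport {

    // Formats one Server-Sent Event: optional "event:" line, one "data:" line per payload line,
    // and a terminating blank line
    public static String sseEvent(String eventName, String data) {
        StringBuilder sb = new StringBuilder();
        if (eventName != null) {
            sb.append("event: ").append(eventName).append('\n');
        }
        // A raw newline inside a data field would end it early, so split the payload into lines
        for (String line : data.split("\r\n|\r|\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        return sb.append('\n').toString();
    }

    // Maps generated session ids to open SSE channels (C is the channel type)
    public static final class SessionRegistry<C> {
        private final Map<String, C> sessions = new ConcurrentHashMap<>();

        public String register(C channel) {
            String id = UUID.randomUUID().toString();
            sessions.put(id, channel);
            return id;
        }

        // Returns null for missing, unknown or already-removed sessions
        public C lookup(String sessionId) {
            return sessionId == null ? null : sessions.get(sessionId);
        }

        // Call when the client disconnects so stale channels do not accumulate
        public void remove(String sessionId) {
            if (sessionId != null) {
                sessions.remove(sessionId);
            }
        }
    }

    public static void main(String[] args) {
        SessionRegistry<String> registry = new SessionRegistry<>();
        String id = registry.register("channel-1");
        System.out.print(sseEvent("endpoint", "/mcp/sse/message?sessionId=" + id));
    }
}
```

Keeping the registry separate from the REST handler also makes it straightforward to unit-test session handling without a running node.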
pros
- No new port is needed in the plugin; MCP requests are served on the default OpenSearch HTTP port.
- Authentication and authorization follow OpenSearch’s existing REST-layer security, so there is no need to implement auth manually.
cons
- More effort, as we need to implement the protocol details ourselves.
- We need to implement and maintain the SSE transport on our own.
Related component
Other