6
6
7
7
import json
8
8
import logging
9
- from typing import Any , Dict , List , Optional
9
+ from typing import Any , Callable , Dict , List , Optional
10
10
11
11
import httpx
12
12
from fastapi import FastAPI
13
13
from fastapi .openapi .utils import get_openapi
14
14
from mcp .server .fastmcp import FastMCP
15
- from pydantic import Field
16
15
17
- logger = logging .getLogger ("fastapi_mcp" )
18
-
19
-
20
- def resolve_schema_references (schema : Dict [str , Any ], openapi_schema : Dict [str , Any ]) -> Dict [str , Any ]:
21
- """
22
- Resolve schema references in OpenAPI schemas.
16
+ from .openapi_utils import (
17
+ clean_schema_for_display ,
18
+ generate_example_from_schema ,
19
+ get_python_type_and_default ,
20
+ get_single_param_type_from_schema ,
21
+ resolve_schema_references ,
22
+ PYTHON_TYPE_IMPORTS ,
23
+ )
23
24
24
- Args:
25
- schema: The schema that may contain references
26
- openapi_schema: The full OpenAPI schema to resolve references from
27
-
28
- Returns:
29
- The schema with references resolved
30
- """
31
- # Make a copy to avoid modifying the input schema
32
- schema = schema .copy ()
33
-
34
- # Handle $ref directly in the schema
35
- if "$ref" in schema :
36
- ref_path = schema ["$ref" ]
37
- # Standard OpenAPI references are in the format "#/components/schemas/ModelName"
38
- if ref_path .startswith ("#/components/schemas/" ):
39
- model_name = ref_path .split ("/" )[- 1 ]
40
- if "components" in openapi_schema and "schemas" in openapi_schema ["components" ]:
41
- if model_name in openapi_schema ["components" ]["schemas" ]:
42
- # Replace with the resolved schema
43
- ref_schema = openapi_schema ["components" ]["schemas" ][model_name ].copy ()
44
- # Remove the $ref key and merge with the original schema
45
- schema .pop ("$ref" )
46
- schema .update (ref_schema )
47
-
48
- # Handle array items
49
- if "type" in schema and schema ["type" ] == "array" and "items" in schema :
50
- schema ["items" ] = resolve_schema_references (schema ["items" ], openapi_schema )
51
-
52
- # Handle object properties
53
- if "properties" in schema :
54
- for prop_name , prop_schema in schema ["properties" ].items ():
55
- schema ["properties" ][prop_name ] = resolve_schema_references (prop_schema , openapi_schema )
56
-
57
- return schema
58
-
59
-
60
- def clean_schema_for_display (schema : Dict [str , Any ]) -> Dict [str , Any ]:
61
- """
62
- Clean up a schema for display by removing internal fields.
63
-
64
- Args:
65
- schema: The schema to clean
66
-
67
- Returns:
68
- The cleaned schema
69
- """
70
- # Make a copy to avoid modifying the input schema
71
- schema = schema .copy ()
72
-
73
- # Remove common internal fields that are not helpful for LLMs
74
- fields_to_remove = [
75
- "allOf" ,
76
- "anyOf" ,
77
- "oneOf" ,
78
- "nullable" ,
79
- "discriminator" ,
80
- "readOnly" ,
81
- "writeOnly" ,
82
- "xml" ,
83
- "externalDocs" ,
84
- ]
85
- for field in fields_to_remove :
86
- if field in schema :
87
- schema .pop (field )
88
-
89
- # Process nested properties
90
- if "properties" in schema :
91
- for prop_name , prop_schema in schema ["properties" ].items ():
92
- if isinstance (prop_schema , dict ):
93
- schema ["properties" ][prop_name ] = clean_schema_for_display (prop_schema )
94
-
95
- # Process array items
96
- if "type" in schema and schema ["type" ] == "array" and "items" in schema :
97
- if isinstance (schema ["items" ], dict ):
98
- schema ["items" ] = clean_schema_for_display (schema ["items" ])
99
-
100
- return schema
25
+ logger = logging .getLogger ("fastapi_mcp" )
101
26
102
27
103
28
def create_mcp_tools_from_openapi (
@@ -126,6 +51,9 @@ def create_mcp_tools_from_openapi(
126
51
routes = app .routes ,
127
52
)
128
53
54
+ # Resolve all references in the schema at once
55
+ resolved_openapi_schema = resolve_schema_references (openapi_schema , openapi_schema )
56
+
129
57
if not base_url :
130
58
# Try to determine the base URL from FastAPI config
131
59
if hasattr (app , "root_path" ) and app .root_path :
@@ -144,7 +72,7 @@ def create_mcp_tools_from_openapi(
144
72
base_url = base_url [:- 1 ]
145
73
146
74
# Process each path in the OpenAPI schema
147
- for path , path_item in openapi_schema .get ("paths" , {}).items ():
75
+ for path , path_item in resolved_openapi_schema .get ("paths" , {}).items ():
148
76
for method , operation in path_item .items ():
149
77
# Skip non-HTTP methods
150
78
if method not in ["get" , "post" , "put" , "delete" , "patch" ]:
@@ -167,11 +95,42 @@ def create_mcp_tools_from_openapi(
167
95
parameters = operation .get ("parameters" , []),
168
96
request_body = operation .get ("requestBody" , {}),
169
97
responses = operation .get ("responses" , {}),
170
- openapi_schema = openapi_schema ,
98
+ openapi_schema = resolved_openapi_schema ,
171
99
describe_all_responses = describe_all_responses ,
172
100
describe_full_response_schema = describe_full_response_schema ,
173
101
)
174
102
103
+ def _create_http_tool_function (function_template : Callable , properties : Dict [str , Any ], additional_variables : Dict [str , Any ]) -> Callable :
104
+ # Build parameter string with type hints
105
+ parsed_parameters = {}
106
+ parsed_parameters_with_defaults = {}
107
+ for param_name , parsed_param_schema in properties .items ():
108
+ type_hint , has_default_value = get_python_type_and_default (parsed_param_schema )
109
+ if has_default_value :
110
+ parsed_parameters_with_defaults [param_name ] = f"{ param_name } : { type_hint } "
111
+ else :
112
+ parsed_parameters [param_name ] = f"{ param_name } : { type_hint } "
113
+
114
+ parsed_parameters_keys = list (parsed_parameters .keys ()) + list (parsed_parameters_with_defaults .keys ())
115
+ parsed_parameters_values = list (parsed_parameters .values ()) + list (parsed_parameters_with_defaults .values ())
116
+ parameters_str = ", " .join (parsed_parameters_values )
117
+ kwargs_str = ', ' .join ([f"'{ k } ': { k } " for k in parsed_parameters_keys ])
118
+
119
+ dynamic_function_body = f"""async def dynamic_http_tool_function({ parameters_str } ):
120
+ kwargs = {{{ kwargs_str } }}
121
+ return await http_tool_function_template(**kwargs)
122
+ """
123
+
124
+ # Create function namespace with required imports
125
+ namespace = {
126
+ "http_tool_function_template" : function_template ,
127
+ ** PYTHON_TYPE_IMPORTS ,
128
+ ** additional_variables
129
+ }
130
+
131
+ # Execute the dynamic function definition
132
+ exec (dynamic_function_body , namespace )
133
+ return namespace ["dynamic_http_tool_function" ]
175
134
176
135
def create_http_tool (
177
136
mcp_server : FastMCP ,
@@ -247,23 +206,13 @@ def create_http_tool(
247
206
schema = content_data ["schema" ]
248
207
response_info += f"\n Content-Type: { content_type } "
249
208
250
- # Resolve any schema references
251
- resolved_schema = resolve_schema_references (schema , openapi_schema )
252
-
253
209
# Clean the schema for display
254
- display_schema = clean_schema_for_display (resolved_schema )
210
+ display_schema = clean_schema_for_display (schema )
255
211
256
212
# Get model name if it's a referenced model
257
213
model_name = None
258
214
model_examples = None
259
215
items_model_name = None
260
- if "$ref" in schema :
261
- ref_path = schema ["$ref" ]
262
- if ref_path .startswith ("#/components/schemas/" ):
263
- model_name = ref_path .split ("/" )[- 1 ]
264
- response_info += f"\n Model: { model_name } "
265
- # Try to get examples from the model
266
- model_examples = extract_model_examples_from_components (model_name , openapi_schema )
267
216
268
217
# Check if this is an array of items
269
218
if schema .get ("type" ) == "array" and "items" in schema and "$ref" in schema ["items" ]:
@@ -358,7 +307,6 @@ def create_http_tool(
358
307
query_params = []
359
308
header_params = []
360
309
body_params = []
361
-
362
310
for param in parameters :
363
311
param_name = param .get ("name" )
364
312
param_in = param .get ("in" )
@@ -416,10 +364,12 @@ def create_http_tool(
416
364
param_required = param .get ("required" , False )
417
365
418
366
properties [param_name ] = {
419
- "type" : param_schema . get ( "type" , "string" ),
367
+ "type" : get_single_param_type_from_schema ( param_schema ),
420
368
"title" : param_name ,
421
369
"description" : param_desc ,
422
370
}
371
+ if "default" in param_schema :
372
+ properties [param_name ]["default" ] = param_schema ["default" ]
423
373
424
374
if param_required :
425
375
required_props .append (param_name )
@@ -429,14 +379,25 @@ def create_http_tool(
429
379
param_schema = param .get ("schema" , {})
430
380
param_required = param .get ("required" , False )
431
381
432
- properties [param_name ] = param_schema
433
- properties [param_name ]["title" ] = param_name
382
+ # properties[param_name] = param_schema
383
+ properties [param_name ] = {
384
+ "type" : get_single_param_type_from_schema (param_schema ),
385
+ "title" : param_name ,
386
+ }
387
+ if "default" in param_schema :
388
+ properties [param_name ]["default" ] = param_schema ["default" ]
434
389
435
390
if param_required :
436
391
required_props .append (param_name )
437
392
438
- # Function to dynamically call the API endpoint
439
- async def http_tool_function (kwargs : Dict [str , Any ] = Field (default_factory = dict )):
393
+ # Create a proper input schema for the tool
394
+ input_schema = {"type" : "object" , "properties" : properties , "title" : f"{ operation_id } Arguments" }
395
+
396
+ if required_props :
397
+ input_schema ["required" ] = required_props
398
+
399
+ # Dynamically create a function to call the API endpoint
400
+ async def http_tool_function_template (** kwargs ):
440
401
# Prepare URL with path parameters
441
402
url = f"{ base_url } { path } "
442
403
for param_name , _ in path_params :
@@ -480,13 +441,9 @@ async def http_tool_function(kwargs: Dict[str, Any] = Field(default_factory=dict
480
441
except ValueError :
481
442
return response .text
482
443
483
- # Create a proper input schema for the tool
484
- input_schema = {"type" : "object" , "properties" : properties , "title" : f"{ operation_id } Arguments" }
485
-
486
- if required_props :
487
- input_schema ["required" ] = required_props
488
-
489
- # Set the function name and docstring
444
+ # Create the http_tool_function (with name and docstring)
445
+ additional_variables = {"path_params" : path_params , "query_params" : query_params , "header_params" : header_params }
446
+ http_tool_function = _create_http_tool_function (http_tool_function_template , properties , additional_variables ) # type: ignore
490
447
http_tool_function .__name__ = operation_id
491
448
http_tool_function .__doc__ = tool_description
492
449
@@ -499,116 +456,3 @@ async def http_tool_function(kwargs: Dict[str, Any] = Field(default_factory=dict
499
456
500
457
# Update the tool's parameters to use our custom schema instead of the auto-generated one
501
458
tool .parameters = input_schema
502
-
503
-
504
- def extract_model_examples_from_components (
505
- model_name : str , openapi_schema : Dict [str , Any ]
506
- ) -> Optional [List [Dict [str , Any ]]]:
507
- """
508
- Extract examples from a model definition in the OpenAPI components.
509
-
510
- Args:
511
- model_name: The name of the model to extract examples from
512
- openapi_schema: The full OpenAPI schema
513
-
514
- Returns:
515
- List of example dictionaries if found, None otherwise
516
- """
517
- if "components" not in openapi_schema or "schemas" not in openapi_schema ["components" ]:
518
- return None
519
-
520
- if model_name not in openapi_schema ["components" ]["schemas" ]:
521
- return None
522
-
523
- schema = openapi_schema ["components" ]["schemas" ][model_name ]
524
-
525
- # Look for examples in the schema
526
- examples = None
527
-
528
- # Check for examples field directly (OpenAPI 3.1.0+)
529
- if "examples" in schema :
530
- examples = schema ["examples" ]
531
- # Check for example field (older OpenAPI versions)
532
- elif "example" in schema :
533
- examples = [schema ["example" ]]
534
-
535
- return examples
536
-
537
-
538
- def generate_example_from_schema (schema : Dict [str , Any ], model_name : Optional [str ] = None ) -> Any :
539
- """
540
- Generate a simple example response from a JSON schema.
541
-
542
- Args:
543
- schema: The JSON schema to generate an example from
544
- model_name: Optional model name for special handling
545
-
546
- Returns:
547
- An example object based on the schema
548
- """
549
- if not schema or not isinstance (schema , dict ):
550
- return None
551
-
552
- # Special handling for known model types
553
- if model_name == "Item" :
554
- # Create a realistic Item example since this is commonly used
555
- return {
556
- "id" : 1 ,
557
- "name" : "Hammer" ,
558
- "description" : "A tool for hammering nails" ,
559
- "price" : 9.99 ,
560
- "tags" : ["tool" , "hardware" ],
561
- }
562
- elif model_name == "HTTPValidationError" :
563
- # Create a realistic validation error example
564
- return {"detail" : [{"loc" : ["body" , "name" ], "msg" : "field required" , "type" : "value_error.missing" }]}
565
-
566
- # Handle different types
567
- schema_type = schema .get ("type" )
568
-
569
- if schema_type == "object" :
570
- result = {}
571
- if "properties" in schema :
572
- for prop_name , prop_schema in schema ["properties" ].items ():
573
- # Generate an example for each property
574
- prop_example = generate_example_from_schema (prop_schema )
575
- if prop_example is not None :
576
- result [prop_name ] = prop_example
577
- return result
578
-
579
- elif schema_type == "array" :
580
- if "items" in schema :
581
- # Generate a single example item
582
- item_example = generate_example_from_schema (schema ["items" ])
583
- if item_example is not None :
584
- return [item_example ]
585
- return []
586
-
587
- elif schema_type == "string" :
588
- # Check if there's a format
589
- format_type = schema .get ("format" )
590
- if format_type == "date-time" :
591
- return "2023-01-01T00:00:00Z"
592
- elif format_type == "date" :
593
- return "2023-01-01"
594
- elif format_type == "email" :
595
-
596
- elif format_type == "uri" :
597
- return "https://example.com"
598
- # Use title or property name if available
599
- return schema .get ("title" , "string" )
600
-
601
- elif schema_type == "integer" :
602
- return 1
603
-
604
- elif schema_type == "number" :
605
- return 1.0
606
-
607
- elif schema_type == "boolean" :
608
- return True
609
-
610
- elif schema_type == "null" :
611
- return None
612
-
613
- # Default case
614
- return None
0 commit comments