import asyncio import json import os import sys from aiohttp import web from mcp.server import Server from mcp.types import Tool, TextContent app = Server("minimal-mcp-server") @app.list_tools() async def list_tools(): return [ Tool( name="echo", description="Echo back the provided input", inputSchema={ "type": "object", "properties": { "message": {"type": "string"} }, "required": ["message"] }, ) ] @app.call_tool() async def call_tool(name: str, arguments: dict): if name != "echo": raise ValueError(f"Unknown tool: {name}") return [ TextContent( type="text", text=json.dumps( { "received": arguments, "status": "ok", }, indent=2, ), ) ] async def handle_execute(request: web.Request) -> web.Response: try: payload = await request.json() tool = payload.get("tool") arguments = payload.get("arguments", {}) if not tool: return web.json_response( {"error": "Missing 'tool' field"}, status=400 ) result = await call_tool(tool, arguments) return web.json_response( { "tool": tool, "result": [c.text for c in result], } ) except json.JSONDecodeError: return web.json_response({"error": "Invalid JSON"}, status=400) except Exception as e: return web.json_response({"error": str(e)}, status=500) async def handle_health(request: web.Request) -> web.Response: return web.json_response({"status": "healthy"}) async def run_http_server(): host = os.getenv("MCP_HTTP_HOST", "0.0.0.0") port = int(os.getenv("MCP_HTTP_PORT", "8001")) app_http = web.Application() app_http.router.add_post("/execute", handle_execute) app_http.router.add_get("/health", handle_health) runner = web.AppRunner(app_http) await runner.setup() site = web.TCPSite(runner, host, port) await site.start() print(f"HTTP server running on {host}:{port}", file=sys.stderr) await asyncio.Event().wait() if __name__ == "__main__": asyncio.run(run_http_server())