Dynavera/mcp_agent/mcp_server.py

309 lines
10 KiB
Python
Raw Normal View History

2025-12-17 14:27:39 +00:00
import asyncio
import json
import os
import sys
from pathlib import Path
from aiohttp import web
# Standalone bootstrap: make the project importable and initialise Django
# before the Django-dependent imports further down the module.
# NOTE(review): the original indentation was lost in this copy of the file.
# All five statements are placed under the guard here, on the assumption that
# Django is already configured when this module is imported rather than run
# directly -- confirm against the repository version.
if __name__ == "__main__":
    # The project root is taken to be two directory levels above this file.
    project_root = Path(__file__).resolve().parent.parent
    sys.path.insert(0, str(project_root))
    # setdefault keeps any DJANGO_SETTINGS_MODULE already set in the environment.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
    import django
    django.setup()
from mcp.server import Server
from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource
from django.utils import timezone
from typing import Any, Dict
# MCP server instance; the list_tools / call_tool handlers below register
# themselves on it through its decorators.
app = Server("dynavera-agent-runtime")
async def handle_health(request: web.Request) -> web.Response:
    """HTTP GET handler that reports the health snapshot as JSON.

    Returns the dictionary produced by ``check_health`` on success. Any
    exception raised while building or serialising it is turned into a
    500 response with ``status`` set to ``"unhealthy"``.
    """
    try:
        return web.json_response(await check_health())
    except Exception as exc:
        failure_body = {"status": "unhealthy", "error": str(exc)}
        return web.json_response(failure_body, status=500)
async def handle_execute(request: web.Request) -> web.Response:
    """HTTP POST handler that dispatches a tool call described by a JSON body.

    Expected body: ``{"tool": "<name>", "arguments": {...}}`` where ``tool`` is
    ``"execute_agent"`` or ``"health_check"`` and ``arguments`` is optional.

    Responses:
        200 -- the dictionary returned by the selected tool.
        400 -- body is not valid JSON, is not a JSON object, carries a
               non-object ``arguments`` value, or names an unknown tool.
        500 -- any other exception raised while handling the request.
    """
    try:
        payload = await request.json()

        # Valid JSON is not necessarily an object (e.g. a list or a string);
        # reject it as a client error instead of failing on ``.get`` below.
        if not isinstance(payload, dict):
            return web.json_response(
                {"status": "failed", "error": "JSON payload must be an object"},
                status=400,
            )

        tool = payload.get("tool")
        # A missing or null "arguments" value is treated as "no arguments".
        arguments = payload.get("arguments", {}) or {}
        if not isinstance(arguments, dict):
            return web.json_response(
                {"status": "failed", "error": "'arguments' must be an object"},
                status=400,
            )

        # Tuple membership (rather than a set) so that an unhashable "tool"
        # value such as a list is reported as unknown instead of raising.
        if tool not in ("execute_agent", "health_check"):
            return web.json_response({"status": "failed", "error": f"Unknown tool: {tool}"}, status=400)

        if tool == "execute_agent":
            result = await run_agent_execution(arguments)
        else:
            result = await check_health()
        return web.json_response(result)
    except json.JSONDecodeError:
        return web.json_response({"status": "failed", "error": "Invalid JSON payload"}, status=400)
    except Exception as e:
        print(f"[MCP Server] HTTP execute error: {e}")
        return web.json_response({"status": "failed", "error": str(e)}, status=500)
@app.list_tools()
async def list_tools() -> list[Tool]:
    """Describe the tools this server exposes: ``execute_agent`` and ``health_check``."""

    def string_field(description: str) -> dict:
        # JSON-schema fragment for a plain string property.
        return {"type": "string", "description": description}

    execute_agent_schema = {
        "type": "object",
        "properties": {
            "agent_id": string_field("UUID of the agent to execute"),
            "agent_name": string_field("Name of the agent"),
            "execution_id": string_field("UUID of the execution record"),
            "query": string_field("User query to process"),
            "input_data": {
                "type": "object",
                "description": "Additional input parameters",
            },
        },
        "required": ["agent_id", "agent_name", "execution_id", "query"],
    }
    # health_check takes no arguments.
    health_check_schema = {"type": "object", "properties": {}}

    execute_agent_tool = Tool(
        name="execute_agent",
        description="Execute an AI agent with given query and input data. Supports RAG-enabled responses using local knowledge base.",
        inputSchema=execute_agent_schema,
    )
    health_check_tool = Tool(
        name="health_check",
        description="Check if the agent server is healthy and ready to process requests",
        inputSchema=health_check_schema,
    )
    return [execute_agent_tool, health_check_tool]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent | ImageContent | EmbeddedResource]:
    """Run the named tool and return its result as one pretty-printed JSON text item.

    Raises:
        ValueError: if ``name`` is neither ``"execute_agent"`` nor ``"health_check"``.
    """
    if name == "execute_agent":
        outcome = await run_agent_execution(arguments)
    elif name == "health_check":
        outcome = await check_health()
    else:
        raise ValueError(f"Unknown tool: {name}")

    serialized = json.dumps(outcome, indent=2)
    return [TextContent(type="text", text=serialized)]
async def check_health() -> Dict[str, Any]:
    """Build a health snapshot of the runtime.

    The snapshot always reports ``"status": "healthy"`` together with the
    current timestamp, platform / Python version strings, and whether the
    model file and the RAG database directory exist on disk.
    """
    import platform

    # Model file location under the user's home directory.
    model_file = os.path.join(
        os.path.expanduser("~"), ".cache", "gpt4all", "Meta-Llama-3-8B-Instruct.Q4_0.gguf"
    )
    # Relative path: resolved against the process's current working directory.
    rag_dir = "./build/rag_db"

    snapshot: Dict[str, Any] = {"status": "healthy"}
    snapshot["timestamp"] = timezone.now().isoformat()
    snapshot["platform"] = platform.platform()
    snapshot["python_version"] = platform.python_version()
    snapshot["model_available"] = os.path.exists(model_file)
    snapshot["model_path"] = model_file
    snapshot["rag_available"] = os.path.exists(rag_dir)
    snapshot["rag_path"] = rag_dir
    return snapshot
# Serialises pipeline runs. Inference is executed in a worker thread so the
# event loop stays responsive; the lock keeps concurrent requests from each
# loading their own copy of the model at the same time.
_INFERENCE_LOCK = asyncio.Lock()


def _run_pipeline_blocking(query: str, agent_name: str, execution_id: str) -> Dict[str, Any]:
    """Synchronously run optional RAG retrieval followed by LLM generation.

    This function blocks for the duration of model loading and inference and
    is meant to be called from a worker thread, not from the event loop.

    Returns a result dictionary whose ``status`` is ``"completed"`` on success
    or ``"failed"`` (``error_type`` ``"ModelNotFound"``) when the model file is
    absent. Other errors propagate to the caller.
    """
    # All three heavy dependencies are imported up front so that a missing
    # package fails the whole run, regardless of whether RAG ends up being used.
    from gpt4all import GPT4All
    from sentence_transformers import SentenceTransformer
    from chromadb import PersistentClient

    model_name = "Meta-Llama-3-8B-Instruct.Q4_0.gguf"
    embedder_name = "all-MiniLM-L6-v2"
    rag_path = "./build/rag_db"
    # NOTE(review): the original defined CONTEXT_SIZE = 8192 but never passed
    # it to the model; it was removed as unused. Confirm whether a larger
    # context window was intended.
    model_dir = os.path.join(os.path.expanduser("~"), ".cache", "gpt4all")
    model_path = os.path.join(model_dir, model_name)
    print(f"[MCP Server] MODEL_PATH={model_path}")

    # Fail early: allow_download=False below means a missing file cannot be fetched.
    if not os.path.exists(model_path):
        error_msg = f"Model not found at {model_path}"
        print(f"[MCP Server] {error_msg}")
        return {
            "status": "failed",
            "error": error_msg,
            "error_type": "ModelNotFound",
            "execution_id": execution_id,
            "timestamp": timezone.now().isoformat(),
        }

    print("[MCP Server] Full pipeline - loading models")
    events: list = []

    def record(event_type: str, message: str, stage: str | None = None) -> None:
        # Append a timestamped event; "stage" is only present on progress events.
        event: Dict[str, Any] = {"type": event_type}
        if stage is not None:
            event["stage"] = stage
        event["message"] = message
        event["timestamp"] = timezone.now().isoformat()
        events.append(event)

    record("progress", "Initializing AI model...", stage="initializing")

    # RAG retrieval is best-effort: any failure degrades to a plain prompt
    # and is surfaced as a warning event rather than failing the execution.
    context = ""
    if os.path.exists(rag_path):
        print(f"[MCP Server] RAG path found at {rag_path}")
        try:
            embedder = SentenceTransformer(embedder_name)
            client = PersistentClient(path=rag_path)
            collection = client.get_collection("documents")
            record("progress", "Retrieving relevant context...", stage="retrieval")

            query_embedding = embedder.encode(query).tolist()
            results = collection.query(query_embeddings=[query_embedding], n_results=3)
            retrieved_docs = []
            if results and results.get('documents'):
                # One query embedding was sent, so only the first result list is read.
                retrieved_docs = results['documents'][0]
            context = "\n\n".join(retrieved_docs) if retrieved_docs else ""
            record(
                "progress",
                f"Retrieved {len(retrieved_docs)} relevant documents",
                stage="retrieved",
            )
        except Exception as rag_error:
            print(f"[MCP Server] RAG error: {rag_error}")
            context = ""
            record("warning", f"RAG retrieval failed: {str(rag_error)}")

    record("progress", "Generating response...", stage="generating")
    model = GPT4All(model_name, model_path=model_dir, allow_download=False)
    if context:
        prompt = f"Context:\n{context}\n\nQuestion: {query}\n\nAnswer:"
    else:
        prompt = f"Question: {query}\n\nAnswer:"

    print("[MCP Server] Running model inference...")
    response = model.generate(prompt, max_tokens=512, temp=0.7)
    print(f"[MCP Server] Generated response: {response[:100]}...")
    record("progress", "Response generated successfully", stage="completed")

    return {
        "status": "completed",
        "query": query,
        "response": response,
        "method": "rag" if context else "direct",
        "context_used": bool(context),
        "timestamp": timezone.now().isoformat(),
        "agent_name": agent_name,
        "execution_id": execution_id,
        "events": events,
    }


async def run_agent_execution(arguments: dict) -> Dict[str, Any]:
    """Execute an agent query and return a JSON-serialisable result dictionary.

    Args:
        arguments: Mapping with required keys ``agent_id``, ``agent_name`` and
            ``execution_id``, plus ``query``. An ``input_data`` key may be
            present but is not used by the pipeline.

    Returns:
        A dictionary with a ``status`` key:
        ``"completed"`` with the generated ``response`` and progress ``events``;
        ``"error"`` when no query was supplied;
        ``"failed"`` when required arguments are missing, the model file is
        absent, or the pipeline raised an exception.

    The blocking model work runs in a worker thread so that other requests
    (for example health checks) are still served while inference is running.
    """
    # Report missing identifiers as a structured failure, like every other
    # failure path here, instead of letting a bare KeyError escape.
    missing = [key for key in ("agent_id", "agent_name", "execution_id") if key not in arguments]
    if missing:
        return {
            "status": "failed",
            "error": f"Missing required argument(s): {', '.join(missing)}",
            "error_type": "InvalidArguments",
            "execution_id": arguments.get("execution_id"),
            "timestamp": timezone.now().isoformat(),
        }

    agent_id = arguments["agent_id"]
    agent_name = arguments["agent_name"]
    execution_id = arguments["execution_id"]
    query = arguments.get("query", "")

    print(f"[MCP Server] Executing agent {agent_name} (ID: {agent_id})")
    print(f"[MCP Server] Execution ID: {execution_id}")
    print(f"[MCP Server] Query: {query}")

    if not query:
        return {
            "status": "error",
            "message": "No query provided",
            "execution_id": execution_id,
            "timestamp": timezone.now().isoformat(),
        }

    try:
        async with _INFERENCE_LOCK:
            return await asyncio.to_thread(
                _run_pipeline_blocking, query, agent_name, execution_id
            )
    except Exception as e:
        print(f"[MCP Server] Error during execution: {e}")
        import traceback
        traceback.print_exc()
        return {
            "status": "failed",
            "error": str(e),
            "error_type": type(e).__name__,
            "execution_id": execution_id,
            "timestamp": timezone.now().isoformat(),
        }
async def run_http_server():
    """Start the aiohttp server and serve until the task is cancelled.

    Routes:
        POST /execute -> handle_execute
        GET  /health  -> handle_health

    The bind address and port are read from the ``MCP_HTTP_HOST`` and
    ``MCP_HTTP_PORT`` environment variables (defaults ``0.0.0.0`` and ``8001``).

    Raises:
        ValueError: if ``MCP_HTTP_PORT`` is not an integer string.
    """
    # The default host 0.0.0.0 listens on all network interfaces.
    host = os.getenv("MCP_HTTP_HOST", "0.0.0.0")
    port = int(os.getenv("MCP_HTTP_PORT", "8001"))

    app_http = web.Application()
    app_http.router.add_post("/execute", handle_execute)
    app_http.router.add_get("/health", handle_health)

    runner = web.AppRunner(app_http)
    await runner.setup()
    try:
        site = web.TCPSite(runner, host=host, port=port)
        await site.start()
        print(f"[MCP Server] HTTP server listening on {host}:{port}", file=sys.stderr)
        # Park this coroutine forever; the event is never set, so the wait
        # only ends when the task is cancelled (e.g. on interpreter shutdown).
        await asyncio.Event().wait()
    finally:
        # Release the listening socket and run the application's shutdown
        # hooks even when startup fails or the task is cancelled.
        await runner.cleanup()
async def main():
    """Entry-point coroutine: run the HTTP server until it stops."""
    await run_http_server()


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())