309 lines
10 KiB
Python
309 lines
10 KiB
Python
|
|
import asyncio
|
||
|
|
import json
|
||
|
|
import os
|
||
|
|
import sys
|
||
|
|
from pathlib import Path
|
||
|
|
from aiohttp import web
|
||
|
|
|
||
|
|
if __name__ == "__main__":
    # Running as a script: put the directory two levels above this file on
    # sys.path and initialise Django (settings module defaults to
    # config.settings) before the code below uses django.utils.timezone.
    project_root = Path(__file__).resolve().parent.parent
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
    sys.path.insert(0, os.fspath(project_root))

    import django
    django.setup()
|
||
|
|
|
||
|
|
from mcp.server import Server
|
||
|
|
from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource
|
||
|
|
from django.utils import timezone
|
||
|
|
from typing import Any, Dict
|
||
|
|
|
||
|
|
|
||
|
|
# MCP server instance; the list_tools / call_tool handlers register on it.
app = Server(name="dynavera-agent-runtime")
|
||
|
|
|
||
|
|
|
||
|
|
async def handle_health(request: web.Request) -> web.Response:
    """HTTP ``GET /health``: return the ``check_health()`` report as JSON.

    Any exception (while gathering or serialising the report) is turned into
    ``{"status": "unhealthy", "error": <message>}`` with HTTP status 500.
    """
    try:
        report = await check_health()
        response = web.json_response(report)
    except Exception as exc:
        response = web.json_response({"status": "unhealthy", "error": str(exc)}, status=500)
    return response
|
||
|
|
|
||
|
|
|
||
|
|
async def handle_execute(request: web.Request) -> web.Response:
    """HTTP ``POST /execute``: dispatch an MCP-style tool call.

    Expects a JSON object body ``{"tool": <name>, "arguments": {...}}`` where
    ``<name>`` is ``"execute_agent"`` or ``"health_check"``; a missing/null
    ``arguments`` is treated as ``{}``.

    Returns:
        200 with the tool's result dict. ``run_agent_execution`` reports its
        own pipeline failures inside the body, not via the status code.
        400 for malformed requests: invalid JSON, a payload or ``arguments``
        that is not a JSON object, an unknown tool, or missing required
        ``execute_agent`` arguments.
        500 for any other unexpected error while running the tool.
    """
    try:
        payload = await request.json()
    except ValueError:
        # json.JSONDecodeError, or a body that cannot be decoded as text;
        # both are ValueError subclasses.
        return web.json_response({"status": "failed", "error": "Invalid JSON payload"}, status=400)

    # A valid JSON array/string/number would otherwise crash on .get() and be
    # misreported as a server error.
    if not isinstance(payload, dict):
        return web.json_response(
            {"status": "failed", "error": "Payload must be a JSON object"}, status=400
        )

    tool = payload.get("tool")
    arguments = payload.get("arguments") or {}
    if not isinstance(arguments, dict):
        return web.json_response(
            {"status": "failed", "error": "'arguments' must be a JSON object"}, status=400
        )

    if tool not in {"execute_agent", "health_check"}:
        return web.json_response({"status": "failed", "error": f"Unknown tool: {tool}"}, status=400)

    if tool == "execute_agent":
        # run_agent_execution indexes these keys directly; report their absence
        # as a client error instead of a KeyError-driven 500. A missing query is
        # left to run_agent_execution, which answers with its own error result.
        missing = [key for key in ("agent_id", "agent_name", "execution_id") if key not in arguments]
        if missing:
            return web.json_response(
                {"status": "failed", "error": f"Missing required arguments: {', '.join(missing)}"},
                status=400,
            )

    try:
        if tool == "execute_agent":
            result = await run_agent_execution(arguments)
        else:
            result = await check_health()
        return web.json_response(result)
    except Exception as e:
        print(f"[MCP Server] HTTP execute error: {e}")
        return web.json_response({"status": "failed", "error": str(e)}, status=500)
|
||
|
|
|
||
|
|
|
||
|
|
@app.list_tools()
async def list_tools() -> list[Tool]:
    """Advertise the tools exposed by this MCP server.

    ``execute_agent`` requires the string fields agent_id, agent_name,
    execution_id and query, plus an optional ``input_data`` object.
    ``health_check`` takes no arguments.
    """
    def string_property(description: str) -> dict:
        return {"type": "string", "description": description}

    execute_schema = {
        "type": "object",
        "properties": {
            "agent_id": string_property("UUID of the agent to execute"),
            "agent_name": string_property("Name of the agent"),
            "execution_id": string_property("UUID of the execution record"),
            "query": string_property("User query to process"),
            "input_data": {
                "type": "object",
                "description": "Additional input parameters",
            },
        },
        "required": ["agent_id", "agent_name", "execution_id", "query"],
    }
    health_schema = {"type": "object", "properties": {}}

    execute_tool = Tool(
        name="execute_agent",
        description="Execute an AI agent with given query and input data. Supports RAG-enabled responses using local knowledge base.",
        inputSchema=execute_schema,
    )
    health_tool = Tool(
        name="health_check",
        description="Check if the agent server is healthy and ready to process requests",
        inputSchema=health_schema,
    )
    return [execute_tool, health_tool]
|
||
|
|
|
||
|
|
|
||
|
|
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent | ImageContent | EmbeddedResource]:
    """Run the named MCP tool and return its result as pretty-printed JSON text.

    Raises:
        ValueError: if ``name`` is not one of the tools from ``list_tools``.
    """
    if name == "execute_agent":
        outcome = await run_agent_execution(arguments)
    elif name == "health_check":
        outcome = await check_health()
    else:
        raise ValueError(f"Unknown tool: {name}")

    return [TextContent(type="text", text=json.dumps(outcome, indent=2))]
|
||
|
|
|
||
|
|
|
||
|
|
async def check_health() -> Dict[str, Any]:
    """Report platform info and whether the LLM file and RAG store exist.

    ``status`` is always ``"healthy"``. Callers must inspect
    ``model_available`` / ``rag_available`` to learn whether an execution can
    actually use the model and knowledge base. The RAG path is relative to the
    current working directory.
    """
    import platform

    gpt4all_dir = os.path.join(os.path.expanduser("~"), ".cache", "gpt4all")
    model_file = os.path.join(gpt4all_dir, "Meta-Llama-3-8B-Instruct.Q4_0.gguf")
    rag_dir = "./build/rag_db"

    # Built incrementally; insertion order matches the reported JSON key order.
    report: Dict[str, Any] = {"status": "healthy", "timestamp": timezone.now().isoformat()}
    report["platform"] = platform.platform()
    report["python_version"] = platform.python_version()
    report["model_available"] = os.path.exists(model_file)
    report["model_path"] = model_file
    report["rag_available"] = os.path.exists(rag_dir)
    report["rag_path"] = rag_dir
    return report
|
||
|
|
|
||
|
|
|
||
|
|
# Serialises pipeline runs. Previously the blocking work ran on the event loop,
# so executions were implicitly processed one at a time. Keeping that guarantee
# prevents several copies of the LLM from being loaded into memory at once,
# while waiting requests no longer block the loop.
_PIPELINE_LOCK = asyncio.Lock()


def _pipeline_event(event_type: str, message: str, stage: str | None = None) -> Dict[str, Any]:
    """Build one progress/warning event; key order is type, [stage], message, timestamp."""
    event: Dict[str, Any] = {"type": event_type}
    if stage is not None:
        event["stage"] = stage
    event["message"] = message
    event["timestamp"] = timezone.now().isoformat()
    return event


def _run_agent_pipeline(query: str, agent_name: str, execution_id: str) -> Dict[str, Any]:
    """Synchronously run optional RAG retrieval and LLM generation.

    Blocking: imports heavy libraries, loads the embedder and LLM, and runs
    inference. Intended to be executed in a worker thread. Errors are not
    raised; they are reported as a ``"status": "failed"`` result.
    """
    try:
        # Imported lazily so a missing package is reported as a failed
        # execution rather than preventing the server from starting.
        from gpt4all import GPT4All
        from sentence_transformers import SentenceTransformer
        from chromadb import PersistentClient

        model_name = "Meta-Llama-3-8B-Instruct.Q4_0.gguf"
        embedder_name = "all-MiniLM-L6-v2"
        # NOTE(review): relative to the process working directory.
        rag_path = "./build/rag_db"
        default_model_dir = os.path.join(os.path.expanduser("~"), ".cache", "gpt4all")
        model_path = os.path.join(default_model_dir, model_name)

        print(f"[MCP Server] MODEL_PATH={model_path}")

        # Fail with a clear error when the model file is absent.
        if not os.path.exists(model_path):
            error_msg = f"Model not found at {model_path}"
            print(f"[MCP Server] {error_msg}")
            return {
                "status": "failed",
                "error": error_msg,
                "error_type": "ModelNotFound",
                "execution_id": execution_id,
                "timestamp": timezone.now().isoformat(),
            }

        print("[MCP Server] Full pipeline - loading models")
        events = [_pipeline_event("progress", "Initializing AI model...", stage="initializing")]

        # RAG retrieval is best-effort: any failure falls back to no context.
        context = ""
        if os.path.exists(rag_path):
            print(f"[MCP Server] RAG path found at {rag_path}")
            try:
                embedder = SentenceTransformer(embedder_name)
                client = PersistentClient(path=rag_path)
                collection = client.get_collection("documents")

                events.append(
                    _pipeline_event("progress", "Retrieving relevant context...", stage="retrieval")
                )

                query_embedding = embedder.encode(query).tolist()
                results = collection.query(query_embeddings=[query_embedding], n_results=3)

                # documents holds one list of hits per query embedding; only
                # one embedding is sent, so take the first list.
                retrieved_docs = []
                if results and results.get("documents"):
                    retrieved_docs = results["documents"][0]

                context = "\n\n".join(retrieved_docs) if retrieved_docs else ""

                events.append(
                    _pipeline_event(
                        "progress",
                        f"Retrieved {len(retrieved_docs)} relevant documents",
                        stage="retrieved",
                    )
                )
            except Exception as rag_error:
                print(f"[MCP Server] RAG error: {rag_error}")
                context = ""
                events.append(_pipeline_event("warning", f"RAG retrieval failed: {str(rag_error)}"))

        events.append(_pipeline_event("progress", "Generating response...", stage="generating"))

        model = GPT4All(model_name, model_path=default_model_dir, allow_download=False)

        if context:
            prompt = f"Context:\n{context}\n\nQuestion: {query}\n\nAnswer:"
        else:
            prompt = f"Question: {query}\n\nAnswer:"

        print("[MCP Server] Running model inference...")
        response = model.generate(prompt, max_tokens=512, temp=0.7)
        print(f"[MCP Server] Generated response: {response[:100]}...")

        events.append(
            _pipeline_event("progress", "Response generated successfully", stage="completed")
        )

        return {
            "status": "completed",
            "query": query,
            "response": response,
            "method": "rag" if context else "direct",
            "context_used": bool(context),
            "timestamp": timezone.now().isoformat(),
            "agent_name": agent_name,
            "execution_id": execution_id,
            "events": events,
        }

    except Exception as e:
        print(f"[MCP Server] Error during execution: {e}")
        import traceback
        traceback.print_exc()

        return {
            "status": "failed",
            "error": str(e),
            "error_type": type(e).__name__,
            "execution_id": execution_id,
            "timestamp": timezone.now().isoformat(),
        }


async def run_agent_execution(arguments: dict) -> Dict[str, Any]:
    """Run the ``execute_agent`` tool: optional RAG retrieval, then LLM generation.

    Args:
        arguments: Tool arguments. ``agent_id``, ``agent_name`` and
            ``execution_id`` are required; a missing one raises ``KeyError``.
            ``query`` defaults to ``""``. ``input_data`` is accepted by the
            tool schema but is not used here.

    Returns:
        On success, a dict with ``"status": "completed"``, the model
        ``response``, ``method`` ("rag" or "direct"), ``context_used`` and the
        progress ``events``. An empty query yields ``"status": "error"``. Any
        pipeline failure (missing model, import error, inference error) yields
        ``"status": "failed"`` with ``error`` / ``error_type``.
    """
    agent_id = arguments["agent_id"]
    agent_name = arguments["agent_name"]
    execution_id = arguments["execution_id"]
    query = arguments.get("query", "")

    print(f"[MCP Server] Executing agent {agent_name} (ID: {agent_id})")
    print(f"[MCP Server] Execution ID: {execution_id}")
    print(f"[MCP Server] Query: {query}")

    if not query:
        return {
            "status": "error",
            "message": "No query provided",
            "execution_id": execution_id,
            "timestamp": timezone.now().isoformat(),
        }

    # Model loading and inference are synchronous and slow. Run them in a
    # worker thread so the event loop keeps serving other requests (e.g.
    # /health) in the meantime.
    async with _PIPELINE_LOCK:
        return await asyncio.to_thread(_run_agent_pipeline, query, agent_name, execution_id)
|
||
|
|
|
||
|
|
|
||
|
|
async def run_http_server():
    """Serve ``POST /execute`` and ``GET /health`` until cancelled.

    The bind address comes from ``MCP_HTTP_HOST`` (default ``"0.0.0.0"``) and
    ``MCP_HTTP_PORT`` (default ``8001``; a non-integer value raises
    ``ValueError``). The coroutine never returns normally. When it is
    cancelled (e.g. by ``asyncio.run`` on Ctrl+C), or if binding fails, the
    runner is cleaned up so the listening socket and open connections are
    closed.
    """
    host = os.getenv("MCP_HTTP_HOST", "0.0.0.0")
    port = int(os.getenv("MCP_HTTP_PORT", "8001"))

    app_http = web.Application()
    app_http.router.add_post("/execute", handle_execute)
    app_http.router.add_get("/health", handle_health)

    runner = web.AppRunner(app_http)
    await runner.setup()
    try:
        site = web.TCPSite(runner, host=host, port=port)
        await site.start()
        print(f"[MCP Server] HTTP server listening on {host}:{port}", file=sys.stderr)

        # Park forever; the server runs on the event loop until cancellation.
        await asyncio.Event().wait()
    finally:
        await runner.cleanup()
|
||
|
|
|
||
|
|
|
||
|
|
async def main():
    """Program entry point: run the HTTP server (it does not return on its own)."""
    return await run_http_server()


if __name__ == "__main__":
    asyncio.run(main())
|