diff --git a/compose/dev/docker-compose.yml b/compose/dev/docker-compose.yml index ca47412..c700d13 100644 --- a/compose/dev/docker-compose.yml +++ b/compose/dev/docker-compose.yml @@ -3,7 +3,7 @@ services: fyp-redis: image: redis:7-alpine - container_name: fyp-redis + container_name: ${REDIS_CONTAINER_NAME:-fyp-redis} ports: - "0.0.0.0:6379:6379" volumes: @@ -36,31 +36,64 @@ services: build: context: ../.. dockerfile: compose/dev/python/Dockerfile + container_name: dynavera-api ports: - "0.0.0.0:8000:8000" volumes: - ../../:/app - venv:/venv environment: - CELERY_BROKER_URL: redis://fyp-redis:6379/0 + DJANGO_SECRET_KEY: ${DJANGO_SECRET_KEY:-dev-secret-key-change-in-production} + DJANGO_DEBUG: "true" + DJANGO_ALLOWED_HOSTS: "*" + DJANGO_CELERY_BROKER_URL: redis://${REDIS_CONTAINER_NAME:-fyp-redis}:6379/0 + DJANGO_CORS_ALLOWED_ORIGINS: http://localhost:5173,http://127.0.0.1:5173 DJANGO_SETTINGS_MODULE: config.settings + MCP_AGENT_URL: http://mcp-agent-server:8001 depends_on: fyp-redis: condition: service_healthy web: condition: service_started + mcp-agent-server: + condition: service_started celery: build: context: ../.. dockerfile: compose/dev/celery/Dockerfile + container_name: dynavera-celery volumes: - ../../:/app - venv:/venv - - ${USERPROFILE}/.cache/gpt4all:/root/.cache/gpt4all:ro + - ${USERPROFILE}/.cache/gpt4all:/root/.cache/gpt4all:rw environment: - CELERY_BROKER_URL: redis://fyp-redis:6379/0 + DJANGO_SECRET_KEY: ${DJANGO_SECRET_KEY:-dev-secret-key-change-in-production} + DJANGO_CELERY_BROKER_URL: redis://${REDIS_CONTAINER_NAME:-fyp-redis}:6379/0 DJANGO_SETTINGS_MODULE: config.settings + MCP_AGENT_URL: http://mcp-agent-server:8001 + depends_on: + fyp-redis: + condition: service_healthy + mcp-agent-server: + condition: service_started + + mcp-agent-server: + build: + context: ../.. + dockerfile: compose/dev/mcp/Dockerfile + container_name: dynavera-mcp-agent + ports: + - "0.0.0.0:8001:8001" + volumes: + - ../../:/app + - ${USERPROFILE}/.cache/gpt4all:/root/.cache/gpt4all:rw + - ../../build/rag_db:/app/build/rag_db:ro + environment: + DJANGO_SECRET_KEY: ${DJANGO_SECRET_KEY:-dev-secret-key-change-in-production} + DJANGO_SETTINGS_MODULE: config.settings + PYTHONUNBUFFERED: "1" + HOME: /root depends_on: fyp-redis: condition: service_healthy diff --git a/compose/dev/mcp/Dockerfile b/compose/dev/mcp/Dockerfile new file mode 100644 index 0000000..69fc861 --- /dev/null +++ b/compose/dev/mcp/Dockerfile @@ -0,0 +1,24 @@ +FROM python:3.11-slim + +WORKDIR /app + +RUN apt-get update && apt-get install -y \ + build-essential \ + git \ + && rm -rf /var/lib/apt/lists/* + +COPY requirements/base.txt requirements/base.txt + +RUN pip install --no-cache-dir -r requirements/base.txt && \ + pip install --no-cache-dir mcp gpt4all sentence-transformers chromadb + +COPY apps /app/apps +COPY config /app/config +COPY mcp_agent /app/mcp_agent +COPY manage.py /app/ + +ENV PYTHONUNBUFFERED=1 +ENV DJANGO_SETTINGS_MODULE=config.settings +EXPOSE 8001 + +CMD ["python", "-m", "mcp_agent.mcp_server"] diff --git a/config/__pycache__/__init__.cpython-313.pyc b/config/__pycache__/__init__.cpython-313.pyc index b15c2cc..b7eb80b 100644 Binary files a/config/__pycache__/__init__.cpython-313.pyc and b/config/__pycache__/__init__.cpython-313.pyc differ diff --git a/config/__pycache__/settings.cpython-313.pyc b/config/__pycache__/settings.cpython-313.pyc index 7b7298b..cde58dd 100644 Binary files a/config/__pycache__/settings.cpython-313.pyc and b/config/__pycache__/settings.cpython-313.pyc differ diff --git a/config/settings.py b/config/settings.py index 855472b..10d31f6 100644 --- a/config/settings.py +++ b/config/settings.py @@ -65,11 +65,13 @@ ROOT_URLCONF = f'{PARENT_NAME}.urls' WSGI_APPLICATION = f'{PARENT_NAME}.wsgi.application' ASGI_APPLICATION = f'{PARENT_NAME}.asgi.application' +DJANGO_CELERY_BROKER_URL = os.getenv('DJANGO_CELERY_BROKER_URL', 'redis://localhost:6379/0') + CHANNEL_LAYERS = { 'default': { 'BACKEND': 'channels_redis.core.RedisChannelLayer', 'CONFIG': { - 'hosts': [os.getenv('CELERY_BROKER_URL')], + 'hosts': [DJANGO_CELERY_BROKER_URL], }, }, } @@ -152,7 +154,7 @@ REST_FRAMEWORK = { ], } -CELERY_BROKER_URL = os.getenv('DJANGO_CELERY_BROKER_URL') +CELERY_BROKER_URL = DJANGO_CELERY_BROKER_URL CELERY_RESULT_BACKEND = 'django-db' CELERY_CACHE_BACKEND = 'django-cache' CELERY_ACCEPT_CONTENT = ['json'] @@ -162,7 +164,10 @@ CELERY_TIMEZONE = 'UTC' CELERY_TASK_TRACK_STARTED = True CELERY_TASK_TIME_LIMIT = 30 * 60 -CORS_ALLOWED_ORIGINS = os.getenv('DJANGO_CORS_ALLOWED_ORIGINS').split(',') if os.getenv('DJANGO_CORS_ALLOWED_ORIGINS') else [] +MCP_AGENT_URL = os.getenv('MCP_AGENT_URL', 'http://localhost:8001') + +if origins:=os.getenv('DJANGO_CORS_ALLOWED_ORIGINS'): + CORS_ALLOWED_ORIGINS = origins.split(',') CORS_ALLOW_CREDENTIALS = True if DEBUG: diff --git a/mcp_agent/__init__.py b/mcp_agent/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mcp_agent/mcp_client.py b/mcp_agent/mcp_client.py new file mode 100644 index 0000000..1a29b35 --- /dev/null +++ b/mcp_agent/mcp_client.py @@ -0,0 +1,129 @@ +import httpx +import json +from typing import Optional, Dict, Any, List +from django.conf import settings +import asyncio +import logging + +logger = logging.getLogger(__name__) + + +class MCPAgentClient: + + def __init__(self, server_url: Optional[str] = None): + self.server_url = server_url or getattr(settings, 'MCP_AGENT_URL', 'http://localhost:8001') + self.http_client = httpx.AsyncClient( + timeout=httpx.Timeout(300.0), + follow_redirects=True + ) + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + if self.http_client: + await self.http_client.aclose() + + async def execute_agent( + self, + agent_id: str, + agent_name: str, + execution_id: str, + query: str, + input_data: Optional[Dict[str, Any]] = None + ) -> Dict[str, Any]: + arguments = { + "agent_id": agent_id, + "agent_name": agent_name, + "execution_id": execution_id, + "query": query, + "input_data": input_data or {} + } + + return await self._execute_via_http(arguments) + + async def _execute_via_http(self, arguments: Dict[str, Any]) -> Dict[str, Any]: + if not self.http_client: + raise RuntimeError("HTTP client not initialized") + + try: + response = await self.http_client.post( + f"{self.server_url}/execute", + json={ + "tool": "execute_agent", + "arguments": arguments + }, + headers={"Content-Type": "application/json"} + ) + response.raise_for_status() + return response.json() + + except httpx.HTTPStatusError as e: + logger.error(f"HTTP error from MCP server: {e.response.status_code} - {e.response.text}") + return { + "status": "failed", + "error": f"Server returned {e.response.status_code}", + "error_type": "HTTPError", + "details": e.response.text + } + except httpx.RequestError as e: + logger.error(f"Request error to MCP server: {e}") + return { + "status": "failed", + "error": f"Failed to connect to MCP server at {self.server_url}", + "error_type": "ConnectionError" + } + except Exception as e: + logger.error(f"Unexpected error in HTTP execution: {e}") + return { + "status": "failed", + "error": str(e), + "error_type": type(e).__name__ + } + + async def health_check(self) -> Dict[str, Any]: + try: + response = await self.http_client.get(f"{self.server_url}/health") + response.raise_for_status() + return response.json() + except Exception as e: + return {"status": "unhealthy", "error": str(e)} + + async def list_tools(self) -> List[Dict[str, Any]]: + return [ + { + "name": "execute_agent", + "description": "Execute an AI agent with given query and input data" + }, + { + "name": "health_check", + "description": "Check if the agent server is healthy" + } + ] + + async def close(self): + if self.http_client: + await self.http_client.aclose() + +_mcp_client_instance: Optional[MCPAgentClient] = None +_client_lock = asyncio.Lock() + + +async def get_mcp_client() -> MCPAgentClient: + global _mcp_client_instance + + async with _client_lock: + if _mcp_client_instance is None: + server_url = getattr(settings, 'MCP_AGENT_URL', 'http://localhost:8001') + _mcp_client_instance = MCPAgentClient(server_url=server_url) + + return _mcp_client_instance + + +async def close_mcp_client(): + global _mcp_client_instance + + async with _client_lock: + if _mcp_client_instance is not None: + await _mcp_client_instance.close() + _mcp_client_instance = None diff --git a/mcp_agent/mcp_server.py b/mcp_agent/mcp_server.py new file mode 100644 index 0000000..f44fde1 --- /dev/null +++ b/mcp_agent/mcp_server.py @@ -0,0 +1,308 @@ +import asyncio +import json +import os +import sys +from pathlib import Path +from aiohttp import web + +if __name__ == "__main__": + project_root = Path(__file__).resolve().parent.parent + sys.path.insert(0, str(project_root)) + os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings') + import django + django.setup() + +from mcp.server import Server +from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource +from django.utils import timezone +from typing import Any, Dict + + +app = Server("dynavera-agent-runtime") + + +async def handle_health(request: web.Request) -> web.Response: + try: + result = await check_health() + return web.json_response(result) + except Exception as e: + return web.json_response({"status": "unhealthy", "error": str(e)}, status=500) + + +async def handle_execute(request: web.Request) -> web.Response: + try: + payload = await request.json() + tool = payload.get("tool") + arguments = payload.get("arguments", {}) or {} + if tool not in {"execute_agent", "health_check"}: + return web.json_response({"status": "failed", "error": f"Unknown tool: {tool}"}, status=400) + + if tool == "execute_agent": + result = await run_agent_execution(arguments) + else: + result = await check_health() + + return web.json_response(result) + except json.JSONDecodeError: + return web.json_response({"status": "failed", "error": "Invalid JSON payload"}, status=400) + except Exception as e: + print(f"[MCP Server] HTTP execute error: {e}") + return web.json_response({"status": "failed", "error": str(e)}, status=500) + + +@app.list_tools() +async def list_tools() -> list[Tool]: + return [ + Tool( + name="execute_agent", + description="Execute an AI agent with given query and input data. Supports RAG-enabled responses using local knowledge base.", + inputSchema={ + "type": "object", + "properties": { + "agent_id": { + "type": "string", + "description": "UUID of the agent to execute" + }, + "agent_name": { + "type": "string", + "description": "Name of the agent" + }, + "execution_id": { + "type": "string", + "description": "UUID of the execution record" + }, + "query": { + "type": "string", + "description": "User query to process" + }, + "input_data": { + "type": "object", + "description": "Additional input parameters" + } + }, + "required": ["agent_id", "agent_name", "execution_id", "query"] + } + ), + Tool( + name="health_check", + description="Check if the agent server is healthy and ready to process requests", + inputSchema={ + "type": "object", + "properties": {} + } + ) + ] + + +@app.call_tool() +async def call_tool(name: str, arguments: dict) -> list[TextContent | ImageContent | EmbeddedResource]: + if name == "execute_agent": + result = await run_agent_execution(arguments) + return [TextContent( + type="text", + text=json.dumps(result, indent=2) + )] + + elif name == "health_check": + health_info = await check_health() + return [TextContent( + type="text", + text=json.dumps(health_info, indent=2) + )] + + raise ValueError(f"Unknown tool: {name}") + + +async def check_health() -> Dict[str, Any]: + import platform + + MODEL_NAME = "Meta-Llama-3-8B-Instruct.Q4_0.gguf" + DEFAULT_MODEL_DIR = os.path.join(os.path.expanduser("~"), ".cache", "gpt4all") + MODEL_PATH = os.path.join(DEFAULT_MODEL_DIR, MODEL_NAME) + RAG_PATH = "./build/rag_db" + + return { + "status": "healthy", + "timestamp": timezone.now().isoformat(), + "platform": platform.platform(), + "python_version": platform.python_version(), + "model_available": os.path.exists(MODEL_PATH), + "model_path": MODEL_PATH, + "rag_available": os.path.exists(RAG_PATH), + "rag_path": RAG_PATH + } + + +async def run_agent_execution(arguments: dict) -> Dict[str, Any]: + agent_id = arguments["agent_id"] + agent_name = arguments["agent_name"] + execution_id = arguments["execution_id"] + query = arguments.get("query", "") + input_data = arguments.get("input_data", {}) + + print(f"[MCP Server] Executing agent {agent_name} (ID: {agent_id})") + print(f"[MCP Server] Execution ID: {execution_id}") + print(f"[MCP Server] Query: {query}") + + if not query: + return { + "status": "error", + "message": "No query provided", + "execution_id": execution_id, + "timestamp": timezone.now().isoformat() + } + + try: + from gpt4all import GPT4All + from sentence_transformers import SentenceTransformer + from chromadb import PersistentClient + + MODEL_NAME = "Meta-Llama-3-8B-Instruct.Q4_0.gguf" + EMBEDDER_NAME = "all-MiniLM-L6-v2" + RAG_PATH = "./build/rag_db" + CONTEXT_SIZE = 8192 + DEFAULT_MODEL_DIR = os.path.join(os.path.expanduser("~"), ".cache", "gpt4all") + MODEL_PATH = os.path.join(DEFAULT_MODEL_DIR, MODEL_NAME) + + print(f"[MCP Server] MODEL_PATH={MODEL_PATH}") + + # Check if model exists, fail if not + if not os.path.exists(MODEL_PATH): + error_msg = f"Model not found at {MODEL_PATH}" + print(f"[MCP Server] {error_msg}") + return { + "status": "failed", + "error": error_msg, + "error_type": "ModelNotFound", + "execution_id": execution_id, + "timestamp": timezone.now().isoformat() + } + + print("[MCP Server] Full pipeline - loading models") + events = [] + + # Initialize AI model + events.append({ + "type": "progress", + "stage": "initializing", + "message": "Initializing AI model...", + "timestamp": timezone.now().isoformat() + }) + + # RAG retrieval if available + if os.path.exists(RAG_PATH): + print(f"[MCP Server] RAG path found at {RAG_PATH}") + try: + embedder = SentenceTransformer(EMBEDDER_NAME) + client = PersistentClient(path=RAG_PATH) + collection = client.get_collection("documents") + + events.append({ + "type": "progress", + "stage": "retrieval", + "message": "Retrieving relevant context...", + "timestamp": timezone.now().isoformat() + }) + + query_embedding = embedder.encode(query).tolist() + results = collection.query(query_embeddings=[query_embedding], n_results=3) + + retrieved_docs = [] + if results and results.get('documents'): + retrieved_docs = results['documents'][0] + + context = "\n\n".join(retrieved_docs) if retrieved_docs else "" + + events.append({ + "type": "progress", + "stage": "retrieved", + "message": f"Retrieved {len(retrieved_docs)} relevant documents", + "timestamp": timezone.now().isoformat() + }) + + except Exception as rag_error: + print(f"[MCP Server] RAG error: {rag_error}") + context = "" + events.append({ + "type": "warning", + "message": f"RAG retrieval failed: {str(rag_error)}", + "timestamp": timezone.now().isoformat() + }) + else: + context = "" + + # Load and run LLM + events.append({ + "type": "progress", + "stage": "generating", + "message": "Generating response...", + "timestamp": timezone.now().isoformat() + }) + + model = GPT4All(MODEL_NAME, model_path=DEFAULT_MODEL_DIR, allow_download=False) + + if context: + prompt = f"Context:\n{context}\n\nQuestion: {query}\n\nAnswer:" + else: + prompt = f"Question: {query}\n\nAnswer:" + + print(f"[MCP Server] Running model inference...") + response = model.generate(prompt, max_tokens=512, temp=0.7) + + print(f"[MCP Server] Generated response: {response[:100]}...") + + events.append({ + "type": "progress", + "stage": "completed", + "message": "Response generated successfully", + "timestamp": timezone.now().isoformat() + }) + + return { + "status": "completed", + "query": query, + "response": response, + "method": "rag" if context else "direct", + "context_used": bool(context), + "timestamp": timezone.now().isoformat(), + "agent_name": agent_name, + "execution_id": execution_id, + "events": events + } + + except Exception as e: + print(f"[MCP Server] Error during execution: {e}") + import traceback + traceback.print_exc() + + return { + "status": "failed", + "error": str(e), + "error_type": type(e).__name__, + "execution_id": execution_id, + "timestamp": timezone.now().isoformat() + } + + +async def run_http_server(): + host = os.getenv("MCP_HTTP_HOST", "0.0.0.0") + port = int(os.getenv("MCP_HTTP_PORT", "8001")) + app_http = web.Application() + app_http.router.add_post("/execute", handle_execute) + app_http.router.add_get("/health", handle_health) + + runner = web.AppRunner(app_http) + await runner.setup() + site = web.TCPSite(runner, host=host, port=port) + await site.start() + print(f"[MCP Server] HTTP server listening on {host}:{port}", file=sys.stderr) + + await asyncio.Event().wait() + + +async def main(): + await run_http_server() + + +if __name__ == "__main__": + asyncio.run(main())