Added agents module with consumers and routes

This commit is contained in:
Viswamedha Nalabotu 2025-12-17 14:11:23 +00:00
parent 8398a9aba4
commit ae33676109
9 changed files with 557 additions and 14 deletions

23
apps/agents/admin.py Normal file
View file

@ -0,0 +1,23 @@
from django.contrib import admin
from apps.agents.models import Agent, AgentExecution, AgentEvent
@admin.register(Agent)
class AgentAdmin(admin.ModelAdmin):
list_display = ('name', 'user', 'status', 'created_at')
list_filter = ('status', 'created_at')
search_fields = ('name', 'description')
@admin.register(AgentExecution)
class AgentExecutionAdmin(admin.ModelAdmin):
list_display = ('agent', 'user', 'status', 'created_at')
list_filter = ('status', 'created_at')
search_fields = ('agent__name',)
@admin.register(AgentEvent)
class AgentEventAdmin(admin.ModelAdmin):
list_display = ('event_type', 'execution', 'timestamp')
list_filter = ('event_type', 'timestamp')
search_fields = ('execution__agent__name',)

163
apps/agents/consumers.py Normal file
View file

@ -0,0 +1,163 @@
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from apps.agents.models import Agent, AgentExecution, AgentEvent
from apps.agents.tasks import start_agent_task_mcp
class AgentConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.user = self.scope["user"]
self.agent_id = self.scope['url_route']['kwargs'].get('agent_id')
self.room_group_name = f"agent_{self.agent_id}"
if not self.user.is_authenticated:
await self.close()
return
await self.channel_layer.group_add(self.room_group_name, self.channel_name)
await self.accept()
await self.send(json.dumps({
"type": "connection",
"message": "Connected to agent stream",
"agent_id": str(self.agent_id)
}))
async def disconnect(self, close_code):
await self.channel_layer.group_discard(self.room_group_name, self.channel_name)
async def receive(self, text_data):
try:
data = json.loads(text_data)
action = data.get('action')
if action == 'start_agent':
await self.handle_start_agent(data)
elif action == 'stop_agent':
await self.handle_stop_agent(data)
else:
await self.send(json.dumps({
"type": "error",
"message": f"Unknown action: {action}"
}))
except json.JSONDecodeError:
await self.send(json.dumps({
"type": "error",
"message": "Invalid JSON"
}))
except Exception as e:
await self.send(json.dumps({
"type": "error",
"message": str(e)
}))
async def handle_start_agent(self, data):
input_data = data.get('input_data', {})
agent = await self.get_agent(self.agent_id, self.user)
if not agent:
await self.send(json.dumps({
"type": "error",
"message": "Agent not found"
}))
return
execution = await self.create_execution(agent, self.user, input_data)
await self.send(json.dumps({
"type": "execution_started",
"execution_id": str(execution.uuid),
"agent_id": str(agent.uuid),
"message": f"Agent execution {execution.uuid} queued"
}))
try:
from apps.agents.tasks import start_agent_task_mcp
print(f"[Consumer] Queuing MCP execution for {execution.uuid}")
start_agent_task_mcp.delay(str(execution.uuid))
except Exception as e:
print(f"Error queuing agent task: {e}")
await self.send(json.dumps({
"type": "execution_error",
"execution_id": str(execution.uuid),
"error_message": str(e)
}))
async def handle_stop_agent(self, data):
execution_id = data.get('execution_id')
execution = await self.get_execution(execution_id, self.user)
if not execution:
await self.send(json.dumps({
"type": "error",
"message": "Execution not found"
}))
return
await self.update_execution_status(execution, 'failed')
await self.send(json.dumps({
"type": "execution_stopped",
"execution_id": str(execution.uuid),
"message": "Agent execution stopped by user"
}))
async def agent_event(self, event):
await self.send(json.dumps({
"type": "agent_event",
"event_type": event['event_type'],
"content": event['content'],
"timestamp": event['timestamp']
}))
async def agent_completed(self, event):
await self.send(json.dumps({
"type": "execution_completed",
"execution_id": event['execution_id'],
"output_data": event['output_data'],
"message": "Agent execution completed"
}))
async def agent_error(self, event):
await self.send(json.dumps({
"type": "execution_error",
"execution_id": event['execution_id'],
"error_message": event['error_message']
}))
@database_sync_to_async
def get_agent(self, agent_id, user):
try:
return Agent.objects.get(uuid=agent_id, user=user)
except Agent.DoesNotExist:
return None
@database_sync_to_async
def get_execution(self, execution_id, user):
try:
return AgentExecution.objects.get(uuid=execution_id, user=user)
except AgentExecution.DoesNotExist:
return None
@database_sync_to_async
def create_execution(self, agent, user, input_data):
return AgentExecution.objects.create(
agent=agent,
user=user,
input_data=input_data
)
@database_sync_to_async
def update_execution_status(self, execution, status):
execution.status = status
execution.save()
return execution
@database_sync_to_async
def create_event(self, execution, event_type, content):
return AgentEvent.objects.create(
execution=execution,
event_type=event_type,
content=content
)

View file

@ -0,0 +1,66 @@
# Generated by Django 5.2.8 on 2025-12-17 14:05
import django.db.models.deletion
import uuid
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name='Agent',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('uuid', models.UUIDField(default=uuid.uuid4, editable=False, unique=True)),
('name', models.CharField(max_length=255)),
('description', models.TextField(blank=True, default='')),
('status', models.CharField(choices=[('idle', 'Idle'), ('running', 'Running'), ('paused', 'Paused'), ('completed', 'Completed'), ('failed', 'Failed')], default='idle', max_length=20)),
('task_id', models.CharField(blank=True, max_length=255, null=True)),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('started_at', models.DateTimeField(blank=True, null=True)),
('completed_at', models.DateTimeField(blank=True, null=True)),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='agents', to=settings.AUTH_USER_MODEL)),
],
),
migrations.CreateModel(
name='AgentExecution',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('uuid', models.UUIDField(default=uuid.uuid4, editable=False, unique=True)),
('status', models.CharField(choices=[('queued', 'Queued'), ('running', 'Running'), ('completed', 'Completed'), ('failed', 'Failed')], default='queued', max_length=20)),
('input_data', models.JSONField(default=dict)),
('output_data', models.JSONField(blank=True, default=dict)),
('error_message', models.TextField(blank=True, default='')),
('created_at', models.DateTimeField(auto_now_add=True)),
('started_at', models.DateTimeField(blank=True, null=True)),
('completed_at', models.DateTimeField(blank=True, null=True)),
('agent', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='executions', to='agents.agent')),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='agent_executions', to=settings.AUTH_USER_MODEL)),
],
),
migrations.CreateModel(
name='AgentEvent',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('uuid', models.UUIDField(default=uuid.uuid4, editable=False, unique=True)),
('event_type', models.CharField(choices=[('started', 'Started'), ('message', 'Message'), ('progress', 'Progress'), ('completed', 'Completed'), ('error', 'Error'), ('step', 'Step')], max_length=20)),
('content', models.JSONField()),
('timestamp', models.DateTimeField(auto_now_add=True)),
('execution', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='events', to='agents.agentexecution')),
],
options={
'verbose_name': 'Agent Event',
'verbose_name_plural': 'Agent Events',
'ordering': ['timestamp'],
},
),
]

View file

View file

@ -1,22 +1,88 @@
from django.db import models
from django.utils import timezone
from apps.users.models import User
import uuid
class AgentRun(models.Model):
agent_name = models.CharField(max_length=255)
input_text = models.TextField()
output_text = models.TextField(blank=True, null=True)
created_at = models.DateTimeField(default=timezone.now)
class Agent(models.Model):
STATUS_CHOICES = [
('idle', 'Idle'),
('running', 'Running'),
('paused', 'Paused'),
('completed', 'Completed'),
('failed', 'Failed'),
]
uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='agents')
name = models.CharField(max_length=255)
description = models.TextField(blank=True, default="")
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='idle')
task_id = models.CharField(max_length=255, blank=True, null=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
started_at = models.DateTimeField(null=True, blank=True)
completed_at = models.DateTimeField(null=True, blank=True)
def __str__(self) -> str:
return f"{self.name} ({self.status})"
class Meta:
ordering = ["-created_at"]
verbose_name = "Agent"
verbose_name_plural = "Agents"
class ModelMeta(models.Model):
name = models.CharField(max_length=255)
path = models.CharField(max_length=1024, blank=True, null=True)
framework = models.CharField(max_length=100, blank=True, null=True)
created_at = models.DateTimeField(default=timezone.now)
class AgentExecution(models.Model):
uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
agent = models.ForeignKey(Agent, on_delete=models.CASCADE, related_name='executions')
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='agent_executions')
status = models.CharField(max_length=20, choices=[
('queued', 'Queued'),
('running', 'Running'),
('completed', 'Completed'),
('failed', 'Failed'),
], default='queued')
input_data = models.JSONField(default=dict)
output_data = models.JSONField(default=dict, blank=True)
error_message = models.TextField(blank=True, default="")
created_at = models.DateTimeField(auto_now_add=True)
started_at = models.DateTimeField(null=True, blank=True)
completed_at = models.DateTimeField(null=True, blank=True)
def __str__(self) -> str:
return f"Execution {self.uuid} - {self.agent.name} ({self.status})"
class Meta:
verbose_name = "Agent Execution"
verbose_name_plural = "Agent Executions"
class AgentEvent(models.Model):
EVENT_TYPES = [
('started', 'Started'),
('message', 'Message'),
('progress', 'Progress'),
('completed', 'Completed'),
('error', 'Error'),
('step', 'Step'),
]
uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True)
execution = models.ForeignKey(AgentExecution, on_delete=models.CASCADE, related_name='events')
event_type = models.CharField(max_length=20, choices=EVENT_TYPES)
content = models.JSONField()
timestamp = models.DateTimeField(auto_now_add=True)
def __str__(self) -> str:
return f"{self.id} - {self.event_type} - {self.execution.agent.name}"
class Meta:
ordering = ['timestamp']
verbose_name = "Agent Event"
verbose_name_plural = "Agent Events"
def __str__(self) -> str: # pragma: no cover - trivial
return self.name

7
apps/agents/routing.py Normal file
View file

@ -0,0 +1,7 @@
from django.urls import path
from apps.agents import consumers
websocket_urlpatterns = [
path('ws/agents/<str:agent_id>/', consumers.AgentConsumer.as_asgi()),
]

View file

@ -0,0 +1,26 @@
from rest_framework import serializers
from apps.agents.models import Agent, AgentExecution, AgentEvent
class AgentEventSerializer(serializers.ModelSerializer):
class Meta:
model = AgentEvent
fields = ['uuid', 'event_type', 'content', 'timestamp']
class AgentExecutionSerializer(serializers.ModelSerializer):
events = AgentEventSerializer(many=True, read_only=True)
class Meta:
model = AgentExecution
fields = ['uuid', 'agent', 'user', 'status', 'input_data', 'output_data', 'error_message', 'created_at', 'started_at', 'completed_at', 'events']
read_only_fields = ['uuid', 'created_at', 'started_at', 'completed_at', 'events']
class AgentSerializer(serializers.ModelSerializer):
executions = AgentExecutionSerializer(many=True, read_only=True)
class Meta:
model = Agent
fields = ['uuid', 'user', 'name', 'description', 'status', 'task_id', 'created_at', 'updated_at', 'started_at', 'completed_at', 'executions']
read_only_fields = ['uuid', 'user', 'created_at', 'updated_at']

135
apps/agents/tasks.py Normal file
View file

@ -0,0 +1,135 @@
from celery import shared_task
from django.utils import timezone
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from apps.agents.models import Agent, AgentExecution, AgentEvent
import json
from django.conf import settings
import asyncio
@shared_task
def start_agent_task_mcp(execution_id):
print(f"[start_agent_task_mcp] invoked with execution_id={execution_id}")
try:
execution = AgentExecution.objects.get(uuid=execution_id)
print(f"[start_agent_task_mcp] execution record loaded: agent={execution.agent.uuid}")
execution.status = 'running'
execution.started_at = timezone.now()
execution.save()
channel_layer = get_channel_layer()
room_group_name = f"agent_{execution.agent.uuid}"
try:
async_to_sync(channel_layer.group_send)(
room_group_name,
{
"type": "agent_event",
"event_type": "started",
"content": {
"execution_id": str(execution.uuid),
"agent_id": str(execution.agent.uuid),
"message": "Agent execution started (via MCP)"
},
"timestamp": timezone.now().isoformat()
}
)
except Exception as channel_error:
print(f"Channel layer error: {channel_error}")
AgentEvent.objects.create(
execution=execution,
event_type='started',
content={"execution_id": str(execution.uuid), "method": "mcp"}
)
from mcp_agent.mcp_client import MCPAgentClient
async def execute_remote():
async with MCPAgentClient() as client:
return await client.execute_agent(
agent_id=str(execution.agent.uuid),
agent_name=execution.agent.name,
execution_id=str(execution.uuid),
query=execution.input_data.get("query", ""),
input_data=execution.input_data
)
result = asyncio.run(execute_remote())
print(f"[start_agent_task_mcp] MCP result: {result.get('status')}")
if result.get('events'):
for event in result['events']:
try:
async_to_sync(channel_layer.group_send)(
room_group_name,
{
"type": "agent_event",
"event_type": event.get('type', 'message'),
"content": event,
"timestamp": event.get('timestamp', timezone.now().isoformat())
}
)
except Exception as e:
print(f"Error forwarding event: {e}")
if result.get('status') == 'completed':
execution.status = 'completed'
execution.output_data = result
elif result.get('status') in ['failed', 'error']:
execution.status = 'failed'
execution.error_message = result.get('error', 'Unknown error')
execution.output_data = result
else:
execution.status = 'completed'
execution.output_data = result
execution.completed_at = timezone.now()
execution.save()
try:
async_to_sync(channel_layer.group_send)(
room_group_name,
{
"type": "agent_completed",
"execution_id": str(execution.uuid),
"output_data": result
}
)
except Exception as channel_error:
print(f"Channel layer error: {channel_error}")
AgentEvent.objects.create(
execution=execution,
event_type='completed',
content={"execution_id": str(execution.uuid), "output": result}
)
except AgentExecution.DoesNotExist:
print(f"Execution {execution_id} not found")
except Exception as e:
print(f"[start_agent_task_mcp] exception: {e}")
import traceback
traceback.print_exc()
try:
execution = AgentExecution.objects.get(uuid=execution_id)
execution.status = 'failed'
execution.error_message = str(e)
execution.completed_at = timezone.now()
execution.save()
channel_layer = get_channel_layer()
room_group_name = f"agent_{execution.agent.uuid}"
async_to_sync(channel_layer.group_send)(
room_group_name,
{
"type": "agent_error",
"execution_id": str(execution.uuid),
"error_message": str(e)
}
)
except:
pass

57
apps/agents/viewsets.py Normal file
View file

@ -0,0 +1,57 @@
from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from apps.agents.models import Agent, AgentExecution
from apps.agents.serializers import AgentSerializer, AgentExecutionSerializer
from apps.agents.tasks import start_agent_task_mcp
class AgentViewSet(viewsets.ModelViewSet):
serializer_class = AgentSerializer
permission_classes = [IsAuthenticated]
lookup_field = 'uuid'
def get_queryset(self):
return Agent.objects.filter(user=self.request.user)
def perform_create(self, serializer):
serializer.save(user=self.request.user)
@action(detail=True, methods=['post'])
def start(self, request, uuid=None):
agent = self.get_object()
input_data = request.data.get('input_data', {})
execution = AgentExecution.objects.create(
agent=agent,
user=request.user,
input_data=input_data
)
start_agent_task_mcp.delay(str(execution.uuid))
serializer = AgentExecutionSerializer(execution)
return Response({
"status": "queued",
"execution": serializer.data,
"message": "Agent task queued for execution"
})
class AgentExecutionViewSet(viewsets.ReadOnlyModelViewSet):
serializer_class = AgentExecutionSerializer
permission_classes = [IsAuthenticated]
lookup_field = 'uuid'
def get_queryset(self):
return AgentExecution.objects.filter(user=self.request.user)
@action(detail=True, methods=['get'])
def events(self, request, uuid=None):
execution = self.get_object()
events = execution.events.all().values()
return Response({
"execution_id": str(execution.uuid),
"events": list(events)
})