413 lines
17 KiB
Python
413 lines
17 KiB
Python
|
|
import json
|
||
|
|
import httpx
|
||
|
|
import re
|
||
|
|
import logging
|
||
|
|
from uuid import uuid4
|
||
|
|
from channels.generic.websocket import AsyncWebsocketConsumer
|
||
|
|
from channels.db import database_sync_to_async
|
||
|
|
from django.utils import timezone
|
||
|
|
from django.conf import settings
|
||
|
|
|
||
|
|
from apps.onboarding.mcp import MCPRouter
|
||
|
|
from apps.onboarding.models import AgentConfig, OnboardingFlow, OnboardingSession
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
class OnboardingConsumer(AsyncWebsocketConsumer):
|
||
|
|
async def connect(self):
    """Accept the socket for authenticated users and close it otherwise.

    Stores the scope's user on ``self.user`` and the ``session_uuid`` URL
    kwarg on ``self.context_uuid`` (``None`` when the route has no such
    kwarg). The ``MCPRouter`` is only created for authenticated users.
    """
    scope = self.scope
    self.user = scope["user"]
    self.context_uuid = scope["url_route"]["kwargs"].get("session_uuid")

    if self.user.is_authenticated:
        self.router = MCPRouter()
        await self.accept()
        return

    # Anonymous user: close without ever calling accept().
    await self.close()
|
||
|
|
|
||
|
|
async def disconnect(self, close_code):
    """Do nothing on disconnect; *close_code* is ignored."""
    return None
|
||
|
|
|
||
|
|
def _build_system_prompt(self, config):
|
||
|
|
base_prompt = config.system_prompt or "You are a helpful onboarding assistant."
|
||
|
|
permissions = config.tool_permissions or []
|
||
|
|
if permissions:
|
||
|
|
return f"{base_prompt}\n\nAllowed tools: {', '.join(str(p) for p in permissions)}"
|
||
|
|
return base_prompt
|
||
|
|
|
||
|
|
async def receive(self, text_data=None, bytes_data=None):
    """Dispatch one incoming WebSocket frame to the matching workflow.

    The frame must be a JSON object. Its ``action`` key selects the work:

    * ``"start_full_onboarding"`` -- requires ``role_uuid`` and runs
      ``run_full_onboarding_generation``.
    * ``"progress_monitor"`` -- uses ``role_uuid`` from the payload, falling
      back to ``self.context_uuid``, and runs ``run_progress_monitor``.
    * anything else -- treats ``query`` (or ``message``) as a chat prompt,
      loads the AgentConfig identified by ``self.context_uuid`` and answers
      with a ``completed`` frame carrying the model response.

    Exceptions raised while handling the frame are logged and reported to
    the client as an ``error`` frame instead of being propagated.

    Args:
        text_data: Text payload of the frame, expected to be JSON.
        bytes_data: Binary payload. Binary frames are not supported and are
            answered with an ``error`` frame.
    """
    if text_data is None:
        # Channels delivers binary frames as receive(bytes_data=...); a
        # signature without that keyword would raise before any handler ran.
        await self.send_log("error", "Binary frames are not supported; send a JSON text frame")
        return

    try:
        data = json.loads(text_data)
        if not isinstance(data, dict):
            # Valid JSON that is not an object (list, string, number...)
            # has no "action" to dispatch on.
            await self.send_log("error", "Payload must be a JSON object")
            return

        action = data.get("action")

        if action == "start_full_onboarding":
            role_uuid = data.get("role_uuid")
            if not role_uuid:
                await self.send_log("error", "Missing role_uuid for full onboarding generation")
                return
            await self.run_full_onboarding_generation(role_uuid)
        elif action == "progress_monitor":
            role_uuid = data.get("role_uuid") or self.context_uuid
            if not role_uuid:
                await self.send_log("error", "Missing role_uuid for progress monitoring")
                return
            await self.run_progress_monitor(role_uuid)
        else:
            user_message = data.get("query") or data.get("message")
            if not user_message:
                await self.send_log("error", "Missing query/message payload")
                return
            config = await self.get_config(self.context_uuid)
            ai_response = await self.orchestrate_ai(user_message, config)

            await self.send(json.dumps({
                "type": "completed",
                "timestamp": timezone.now().isoformat(),
                "message": "Inference complete.",
                "content": {
                    "response": ai_response,
                }
            }))
    except Exception as e:
        # Top-level boundary for the socket: keep the connection alive, but
        # record the traceback so failures can be diagnosed server-side.
        logger.exception("WebSocket Receive Error: %s", e)
        await self.send_log("error", f"Consumer encountered an error: {str(e)}")
|
||
|
|
|
||
|
|
async def run_full_onboarding_generation(self, role_uuid):
    """Generate and persist a complete onboarding flow for a role.

    Pipeline: Curriculum Agent -> Knowledge Agent -> Assessment Agent.

    1. The curriculum agent is asked for a JSON array of module titles.
    2. For every title, knowledge-base search results are fetched and the
       knowledge agent writes a Markdown guide from them. The first module
       is prefixed with a table of contents built from all titles.
    3. The assessment agent writes quiz fields from the first 1000
       characters of that guide.
    4. The collected pages are stored through ``save_full_flow`` and a
       ``completed`` frame is sent.

    If any of the three AgentConfigs is missing, or the curriculum agent
    yields no topics, an ``error`` frame is sent and nothing is saved.

    Args:
        role_uuid: UUID of the role the flow is generated for.
    """
    await self.send_log("status", "Phase 1: Generating Curriculum...", "curriculum")
    ca_config = await self.get_config_by_type(role_uuid, 'curriculum')
    if not ca_config:
        await self.send_log("error", "Missing curriculum AgentConfig for this role")
        return

    ca_prompt = (
        "Based on available documentation, create an onboarding curriculum for this role. "
        "Output ONLY a valid JSON array of 3-5 strings representing module titles. "
        "Example: [\"Introduction\", \"Safety\", \"Operations\"]"
    )
    ca_response = await self.orchestrate_ai(ca_prompt, ca_config)
    topics = self._extract_json_list(ca_response)
    if not topics:
        await self.send_log("error", "Curriculum generation returned no topics")
        return

    # These configs do not depend on the topic: resolve them once instead of
    # re-querying on every iteration, and fail before any expensive
    # per-topic inference has been done.
    ka_config = await self.get_config_by_type(role_uuid, 'knowledge')
    if not ka_config:
        await self.send_log("error", "Missing knowledge AgentConfig for this role")
        return
    aa_config = await self.get_config_by_type(role_uuid, 'assessment')
    if not aa_config:
        await self.send_log("error", "Missing assessment AgentConfig for this role")
        return

    toc_lines = [f"{idx + 1}. {title}" for idx, title in enumerate(topics)]
    toc_markdown = "## Table of Contents\n\n" + "\n".join(toc_lines)

    full_structure = []

    for index, topic in enumerate(topics):
        await self.send_log("status", f"Phase 2: Researching {topic}...", "knowledge")

        knowledge_hits = await self.fetch_knowledge_context(role_uuid, topic)
        context_markdown = self.format_knowledge_context(knowledge_hits)

        page_content = await self.orchestrate_ai(
            (
                f"Write a practical onboarding training guide for the topic '{topic}'. "
                "Use the MCP search context provided below as the primary source. "
                "If the context is empty, provide a concise best-practice overview and clearly say no indexed documents were found. "
                "Use Markdown formatting and do NOT include a table of contents in this section.\n\n"
                f"Role UUID: {role_uuid}\n"
                f"MCP search context:\n{context_markdown}"
            ),
            ka_config
        )
        # The model may yield no text at all; without this guard the slice
        # below raises TypeError and the first page would embed "None".
        page_content = page_content or ""

        if index == 0:
            page_content = f"{toc_markdown}\n\n---\n\n{page_content}"

        await self.send_log("status", f"Phase 3: Creating quiz for {topic}...", "assessment")
        aa_prompt = (
            f"Based on this content: '{page_content[:1000]}', create 2 multiple choice questions. "
            "Output ONLY a JSON array of objects with keys: 'key', 'label', 'field_type' (use 'select'), "
            "'options' (array of strings), and 'required' (true)."
        )
        quiz_response = await self.orchestrate_ai(aa_prompt, aa_config)
        quiz_fields = self._extract_json_list(quiz_response)

        full_structure.append({
            "title": topic,
            "body": page_content,
            "order": index,
            "fields": quiz_fields
        })

    await self.save_full_flow(role_uuid, full_structure)

    await self.send(json.dumps({
        "type": "completed",
        "timestamp": timezone.now().isoformat(),
        "message": "Onboarding pipeline complete and structure saved."
    }))
|
||
|
|
|
||
|
|
async def run_progress_monitor(self, role_uuid):
    """Ask the monitor agent for feedback on the user's onboarding progress.

    Loads the role's ``monitor`` AgentConfig, gathers the current user's
    progress via ``get_role_progress_context``, sends it to the model and
    replies with a ``completed`` frame containing the feedback and the
    latest session status. Sends an ``error`` frame and stops when no
    monitor AgentConfig exists.

    Args:
        role_uuid: UUID of the role to analyse.
    """
    await self.send_log("status", "Progress Monitor is analyzing your onboarding progress...", "monitor")

    monitor_config = await self.get_config_by_type(role_uuid, 'monitor')
    if not monitor_config:
        await self.send_log("error", "Missing Progress Monitor AgentConfig for this role")
        return

    progress_context = await self.get_role_progress_context(role_uuid, self.user.id)

    monitor_prompt = (
        "You are a progress monitoring agent for onboarding. "
        "Analyze the role onboarding data below and provide concise feedback with:\n"
        "1) current status\n2) strengths\n3) gaps\n4) next actions\n"
        "Keep it short and practical.\n\n"
        f"Progress context JSON:\n{json.dumps(progress_context)}"
    )

    feedback = await self.orchestrate_ai(monitor_prompt, monitor_config)

    await self.send(json.dumps({
        "type": "completed",
        "timestamp": timezone.now().isoformat(),
        "message": "Progress analysis complete.",
        "content": {
            # The caller may pass the URL kwarg here; if the route uses a
            # uuid converter that is a uuid.UUID, which json.dumps rejects.
            # NOTE(review): route definition not visible -- confirm.
            "role_uuid": str(role_uuid),
            "feedback": feedback,
            "status": progress_context.get("latest_status", "unknown"),
        }
    }))
|
||
|
|
|
||
|
|
async def orchestrate_ai(self, user_message, config, max_turns=5):
    """Run the multi-turn ReAct loop (reasoning + tool use) for one prompt.

    Each turn posts the conversation to the chat-completions endpoint under
    ``settings.INFERENCE_URL`` together with the router's tool definitions.
    When the model requests tool calls they are executed through
    ``self.router.handle_tool_call``, their results are appended as ``tool``
    messages and another turn is run; otherwise the model's text is returned.

    Args:
        user_message: Prompt sent as the ``user`` message.
        config: AgentConfig supplying the system prompt and ``llm_config``.
        max_turns: Maximum number of model round-trips before giving up.

    Returns:
        The assistant's final text (``""`` when the model returned no
        content). On any failure, or when ``max_turns`` is exhausted without
        a final answer, a string starting with ``"Error: "``; an ``error``
        frame is also sent to the client in those cases.
    """
    messages = [
        {"role": "system", "content": self._build_system_prompt(config)},
        {"role": "user", "content": user_message}
    ]

    async with httpx.AsyncClient(timeout=60.0) as client:
        for turn in range(max_turns):
            await self.send_log("thought", f"Agent is thinking (Turn {turn+1})...")

            try:
                # llm_config may be unset; fall back to the default model
                # instead of failing on None.get(...).
                llm_config = config.llm_config or {}
                response = await client.post(
                    f"{settings.INFERENCE_URL}/v1/chat/completions",
                    json={
                        "model": llm_config.get("model_id", "meta-llama-3.1-8b"),
                        "messages": messages,
                        "tools": self.router.get_tool_definitions(),
                        "tool_choice": "auto"
                    }
                )
                response.raise_for_status()
                res_json = response.json()

                ai_message = res_json["choices"][0]["message"]
                messages.append(ai_message)

                tool_calls = ai_message.get("tool_calls")
                if not tool_calls:
                    # Final answer; content can be null in the response.
                    return ai_message.get("content") or ""

                for tool_call in tool_calls:
                    fn_name = tool_call["function"]["name"]
                    fn_args = json.loads(tool_call["function"]["arguments"])

                    await self.send(json.dumps({
                        "type": "tool_start",
                        "message": f"Accessing knowledge base: {fn_name}...",
                        "content": fn_args
                    }))

                    result = await self.router.handle_tool_call(fn_name, fn_args)

                    messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call["id"],
                        "name": fn_name,
                        "content": json.dumps(result)
                    })
            except Exception as e:
                await self.send_log("error", f"Inference failed: {str(e)}")
                return f"Error: {str(e)}"

    # Every turn ended in tool calls: report it explicitly rather than
    # falling off the end and handing callers an implicit None.
    exhausted = f"Agent did not produce a final answer within {max_turns} turns"
    await self.send_log("error", f"Inference failed: {exhausted}")
    return f"Error: {exhausted}"
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
async def fetch_knowledge_context(self, role_uuid, topic):
    """Search the knowledge base for *topic* and return the hits as a list.

    Announces the search with a ``tool_start`` frame, calls the router's
    ``search_knowledge`` tool and echoes the raw result in a ``tool_result``
    frame. Returns the result when it is a list and ``[]`` otherwise. Any
    exception is reported as an ``error`` frame and also yields ``[]``.
    """
    search_query = f"onboarding training content for {topic}"
    search_args = {"query": search_query, "role_uuid": role_uuid}

    announcement = {
        "type": "tool_start",
        "message": "Accessing knowledge base: search_knowledge...",
        "content": search_args,
    }
    await self.send(json.dumps(announcement))

    try:
        hits = await self.router.handle_tool_call("search_knowledge", search_args)

        got_list = isinstance(hits, list)
        chunk_count = len(hits) if got_list else 0
        report = {
            "type": "tool_result",
            "message": f"Retrieved {chunk_count} knowledge chunk(s)",
            "content": hits,
            "timestamp": timezone.now().isoformat(),
        }
        await self.send(json.dumps(report))

        if got_list:
            return hits
        return []
    except Exception as exc:
        await self.send_log("error", f"Knowledge retrieval failed for topic '{topic}': {str(exc)}")
        return []
|
||
|
|
|
||
|
|
def format_knowledge_context(self, knowledge_hits):
    """Render up to five knowledge hits as a plain-text context block.

    Each hit becomes ``[n] Source: ... | Relevance: ...`` followed by its
    stripped content, capped at 1600 characters; entries are separated by a
    blank line. Hits that are not dicts are rendered with placeholder values
    and empty content. An empty or missing hit list yields a fixed
    "no documents" sentence.
    """
    if not knowledge_hits:
        return "No indexed MCP documents found for this role/topic."

    blocks = []
    for number, hit in enumerate(knowledge_hits[:5], start=1):
        if isinstance(hit, dict):
            source = hit.get("source", "Unknown Source")
            relevance = hit.get("relevance")
            raw_text = hit.get("content", "")
        else:
            source, relevance, raw_text = "Unknown Source", None, ""

        snippet = str(raw_text).strip()[:1600]
        blocks.append(f"[{number}] Source: {source} | Relevance: {relevance}\n{snippet}")

    return "\n\n".join(blocks)
|
||
|
|
|
||
|
|
def _extract_json_list(self, text):
|
||
|
|
"""Regex helper to pull JSON out of LLM conversational filler."""
|
||
|
|
try:
|
||
|
|
if not text:
|
||
|
|
return []
|
||
|
|
match = re.search(r'\[.*\]', text, re.DOTALL)
|
||
|
|
if match:
|
||
|
|
return json.loads(match.group())
|
||
|
|
return []
|
||
|
|
except Exception:
|
||
|
|
return []
|
||
|
|
|
||
|
|
def _normalize_structure(self, structure):
|
||
|
|
normalized_pages = []
|
||
|
|
for index, page in enumerate(structure or []):
|
||
|
|
fields = []
|
||
|
|
for field_index, field in enumerate(page.get('fields', []) if isinstance(page, dict) else []):
|
||
|
|
if not isinstance(field, dict):
|
||
|
|
continue
|
||
|
|
key = str(field.get('key') or f'field_{field_index + 1}')
|
||
|
|
fields.append({
|
||
|
|
'uuid': str(uuid4()),
|
||
|
|
'key': key,
|
||
|
|
'label': str(field.get('label') or key.replace('_', ' ').title()),
|
||
|
|
'field_type': str(field.get('field_type') or 'text'),
|
||
|
|
'required': bool(field.get('required', False)),
|
||
|
|
'options': field.get('options') if isinstance(field.get('options'), list) else [],
|
||
|
|
'default_value': field.get('default_value', ''),
|
||
|
|
})
|
||
|
|
|
||
|
|
page_title = page.get('title') if isinstance(page, dict) else None
|
||
|
|
page_body = page.get('body') if isinstance(page, dict) else ''
|
||
|
|
page_order = page.get('order') if isinstance(page, dict) else index
|
||
|
|
normalized_pages.append({
|
||
|
|
'uuid': str(uuid4()),
|
||
|
|
'title': str(page_title or f'Module {index + 1}'),
|
||
|
|
'body': str(page_body or ''),
|
||
|
|
'order': int(page_order if isinstance(page_order, int) else index),
|
||
|
|
'fields': fields,
|
||
|
|
})
|
||
|
|
return normalized_pages
|
||
|
|
|
||
|
|
@database_sync_to_async
def save_full_flow(self, role_uuid, structure):
    """Store the generated pages as the role's OnboardingFlow.

    Looks up the Role by ``uuid``, normalizes *structure* with
    ``_normalize_structure`` and creates or updates the flow for that role,
    setting its title, structure and ``is_active=True``. Returns the flow.
    """
    from apps.accounts.models import Role

    target_role = Role.objects.get(uuid=role_uuid)
    pages = self._normalize_structure(structure)
    flow_defaults = {
        'title': f"AI Onboarding: {target_role.name}",
        'structure': pages,
        'is_active': True
    }
    saved_flow, _created = OnboardingFlow.objects.update_or_create(
        role=target_role,
        defaults=flow_defaults,
    )
    return saved_flow
|
||
|
|
|
||
|
|
async def send_log(self, log_type, message, content=None):
    """Send one JSON frame with ``type``, ``message``, ``content`` and a timestamp.

    Args:
        log_type: Value for the frame's ``type`` key (e.g. "status", "error").
        message: Human-readable text for the ``message`` key.
        content: Optional extra payload; serialized as ``null`` when omitted.
    """
    frame = {
        "type": log_type,
        "message": message,
        "content": content,
        "timestamp": timezone.now().isoformat(),
    }
    await self.send(json.dumps(frame))
|
||
|
|
|
||
|
|
@database_sync_to_async
def get_config(self, config_uuid):
    """Fetch the AgentConfig whose ``uuid`` equals *config_uuid*.

    Uses ``objects.get``, so lookup errors are raised to the caller rather
    than turned into ``None``.
    """
    agent_configs = AgentConfig.objects
    return agent_configs.get(uuid=config_uuid)
|
||
|
|
|
||
|
|
@database_sync_to_async
def get_config_by_type(self, role_uuid, agent_type):
    """Return the most recently updated AgentConfig of *agent_type* for a role.

    Matches configs whose organization has a role with ``uuid == role_uuid``.
    Returns ``None`` when nothing matches.
    """
    matching = AgentConfig.objects.filter(
        organization__roles__uuid=role_uuid,
        agent_type=agent_type,
    )
    newest_first = matching.order_by('-updated_at')
    return newest_first.first()
|
||
|
|
|
||
|
|
@database_sync_to_async
def get_role_progress_context(self, role_uuid, user_id):
    """Summarize a user's onboarding progress for one role as a plain dict.

    Loads the Role, the user's sessions for it (newest first) and the
    newest active flow. Without any session a "not_started" summary with
    zeroed counters is returned. Otherwise the summary is built from the
    latest session's status, ``state`` and ``updated_at``, plus the total
    number of sessions.
    """
    from apps.accounts.models import Role

    role = Role.objects.get(uuid=role_uuid)
    user_sessions = OnboardingSession.objects.filter(user_id=user_id, role=role).order_by('-updated_at')
    newest = user_sessions.first()
    current_flow = (
        OnboardingFlow.objects
        .filter(role=role, is_active=True)
        .order_by('-updated_at')
        .first()
    )

    # Both summaries open with the same two identifying keys.
    identity = {
        "role_uuid": str(role.uuid),
        "role_name": role.name,
    }

    if not newest:
        return {
            **identity,
            "latest_status": "not_started",
            "session_count": 0,
            "flow_exists": bool(current_flow),
            "progress": 0,
            "responses_count": 0,
            "completed_modules": [],
        }

    session_state = newest.state or {}
    answers = session_state.get("responses", {})
    done_modules = session_state.get("completed_modules", [])
    # Prefer "progress_percentage"; fall back to "progress", then 0.
    percent = session_state.get("progress_percentage", session_state.get("progress", 0))

    if not isinstance(done_modules, list):
        done_modules = []
    answer_count = len(answers) if isinstance(answers, dict) else 0

    return {
        **identity,
        "latest_status": newest.status,
        "session_count": user_sessions.count(),
        "flow_exists": bool(current_flow),
        "progress": percent,
        "responses_count": answer_count,
        "completed_modules": done_modules,
        "updated_at": newest.updated_at.isoformat() if newest.updated_at else None,
    }
|