From 6aa98b283909599574c74d2fef13d26e6e878ae1 Mon Sep 17 00:00:00 2001 From: Viswamedha Nalabotu Date: Wed, 18 Mar 2026 00:37:38 +0000 Subject: [PATCH] Refactored consumers, added profanity filters and rewrote endpoints --- apps/onboarding/admin.py | 2 +- apps/onboarding/consumers.py | 912 --------------------- apps/onboarding/consumers/__init__.py | 4 + apps/onboarding/consumers/base.py | 214 +++++ apps/onboarding/consumers/chat.py | 25 + apps/onboarding/consumers/generate.py | 402 +++++++++ apps/onboarding/consumers/progress.py | 146 ++++ apps/onboarding/consumers/prompts.py | 71 ++ apps/onboarding/mcp.py | 4 +- apps/onboarding/migrations/0001_initial.py | 1 - apps/onboarding/models.py | 3 +- apps/onboarding/routing.py | 6 +- apps/onboarding/serializers.py | 2 +- apps/onboarding/tests/test_api.py | 3 +- apps/onboarding/tests/test_models.py | 4 +- apps/onboarding/utils/content_moderator.py | 14 + apps/onboarding/viewsets.py | 11 +- requirements/django.txt | 1 + site/src/stores/agentStore.ts | 4 +- site/src/stores/onboardingAgentStore.ts | 2 +- site/src/views/AgentDetailView.vue | 5 - site/src/views/ProgressDetailView.vue | 15 +- 22 files changed, 898 insertions(+), 953 deletions(-) delete mode 100644 apps/onboarding/consumers.py create mode 100644 apps/onboarding/consumers/__init__.py create mode 100644 apps/onboarding/consumers/base.py create mode 100644 apps/onboarding/consumers/chat.py create mode 100644 apps/onboarding/consumers/generate.py create mode 100644 apps/onboarding/consumers/progress.py create mode 100644 apps/onboarding/consumers/prompts.py create mode 100644 apps/onboarding/utils/content_moderator.py diff --git a/apps/onboarding/admin.py b/apps/onboarding/admin.py index 188a706..5ee1cd6 100644 --- a/apps/onboarding/admin.py +++ b/apps/onboarding/admin.py @@ -19,7 +19,7 @@ class AgentConfigAdmin(admin.ModelAdmin): search_fields = ('name', 'system_prompt') readonly_fields = ('uuid', 'created_at', 'updated_at') fieldsets = ( - (None, {'fields': ('name', 'agent_type', 'organization', 'uuid', 'system_prompt', 'llm_config', 'tool_permissions')}), + (None, {'fields': ('name', 'agent_type', 'organization', 'uuid', 'system_prompt', 'llm_config')}), (_('Agent Logic'), {'fields': ()}), (_('Metadata'), {'fields': ('created_at', 'updated_at')}), ) diff --git a/apps/onboarding/consumers.py b/apps/onboarding/consumers.py deleted file mode 100644 index c6df7a0..0000000 --- a/apps/onboarding/consumers.py +++ /dev/null @@ -1,912 +0,0 @@ -import json -import logging -import re -from uuid import uuid4 - -import httpx -from channels.db import database_sync_to_async -from channels.generic.websocket import AsyncWebsocketConsumer -from django.conf import settings -from django.db.models import Q -from django.utils import timezone - -from apps.onboarding.mcp import MCPRouter -from apps.onboarding.models import AgentConfig, OnboardingFlow, OnboardingSession - -logger = logging.getLogger(__name__) - -class OnboardingConsumer(AsyncWebsocketConsumer): - async def connect(self): - self.user = self.scope["user"] - - self.context_uuid = self.scope["url_route"]["kwargs"].get("session_uuid") - - if not self.user.is_authenticated: - logger.warning("WebSocket connect denied: unauthenticated user") - await self.close() - return - - self.router = MCPRouter() - logger.info("WebSocket connected: user_id=%s context_uuid=%s", self.user.id, self.context_uuid) - await self.accept() - - async def disconnect(self, close_code): - logger.info("WebSocket disconnected: user_id=%s context_uuid=%s close_code=%s", getattr(self.user, 'id', None), self.context_uuid, close_code) - - def _build_system_prompt(self, config): - base_prompt = config.system_prompt or "You are a helpful onboarding assistant." - permissions = config.tool_permissions or [] - if permissions: - return f"{base_prompt}\n\nAllowed tools: {', '.join(str(p) for p in permissions)}" - return base_prompt - - async def receive(self, text_data): - try: - data = json.loads(text_data) - action = data.get("action") - logger.info("WebSocket received action=%s user_id=%s context_uuid=%s", action, self.user.id, self.context_uuid) - - if action == "start_full_onboarding": - role_uuid = data.get("role_uuid") - if not role_uuid: - await self.send_log("error", "Missing role_uuid for full onboarding generation") - return - if not await self.can_manage_role(role_uuid, self.user.id): - await self.send_log("error", "Forbidden") - return - await self.run_full_onboarding_generation(role_uuid) - elif action == "progress_monitor": - role_uuid = data.get("role_uuid") or self.context_uuid - target_user_uuid = data.get("user_uuid") - flow_uuid = data.get("flow_uuid") - if not role_uuid: - await self.send_log("error", "Missing role_uuid for progress monitoring") - return - if not await self.can_access_role(role_uuid, self.user.id): - await self.send_log("error", "Forbidden") - return - target_user_id = self.user.id - if target_user_uuid and str(target_user_uuid) != str(self.user.uuid): - target_user_id = await self.resolve_target_user_id(role_uuid, self.user.id, target_user_uuid) - if not target_user_id: - await self.send_log("error", "Forbidden") - return - - await self.run_progress_monitor(role_uuid, target_user_id=target_user_id, flow_uuid=flow_uuid) - else: - - user_message = data.get("query") or data.get("message") - requested_max_tokens = data.get("max_tokens") - if not user_message: - await self.send_log("error", "Missing query/message payload") - return - config = await self.get_config_for_user(self.context_uuid, self.user.id) - if config is None: - await self.send_log("error", "Forbidden") - return - ai_response = await self.orchestrate_ai( - user_message, - config, - max_tokens=requested_max_tokens, - ) - - await self.send(json.dumps({ - "type": "completed", - "timestamp": timezone.now().isoformat(), - "message": "Inference complete.", - "content": { - "response": ai_response, - } - })) - except Exception as e: - logger.exception("WebSocket receive error: user_id=%s context_uuid=%s", getattr(self.user, 'id', None), self.context_uuid) - await self.send_log("error", f"Consumer encountered an error: {str(e)}") - - async def run_full_onboarding_generation(self, role_uuid): - """ - The Master Script that builds the JSON structure sequentially. - Pipeline: Curriculum Agent -> Knowledge Agent -> Assessment Agent - """ - - logger.info("Starting full onboarding generation: role_uuid=%s user_id=%s", role_uuid, self.user.id) - await self.send_log("status", "Phase 1: Generating Curriculum...", "curriculum") - ca_config = await self.get_config_by_type(role_uuid, 'curriculum') - if not ca_config: - await self.send_log("error", "Missing curriculum AgentConfig for this role") - return - - ca_prompt = ( - "Based on available documentation, create an onboarding curriculum for this role. " - "Output ONLY a valid JSON array of 3-5 strings representing module titles. " - "Example: [\"Introduction\", \"Safety\", \"Operations\"]" - ) - ca_response = await self.orchestrate_ai( - ca_prompt, - ca_config, - min_internal_turns=1, - max_tokens=384, - ) - topics = self._extract_json_list(ca_response) - if not topics: - logger.warning("Curriculum generation produced no topics: role_uuid=%s", role_uuid) - await self.send_log("error", "Curriculum generation returned no topics") - return - - full_structure = [] - module_briefs = [] - - - for index, topic in enumerate(topics): - - await self.send_log("status", f"Phase 2: Researching {topic}...", "knowledge") - ka_config = await self.get_config_by_type(role_uuid, 'knowledge') - if not ka_config: - await self.send_log("error", "Missing knowledge AgentConfig for this role") - return - - knowledge_hits = await self.fetch_knowledge_context(role_uuid, topic) - context_markdown = self.format_knowledge_context(knowledge_hits) - - page_content = await self.orchestrate_ai( - ( - f"Write a practical onboarding training guide for the topic '{topic}'. " - "Think step-by-step internally before writing the final answer. " - "Use the MCP search context below as your primary source, and call additional tools if needed. " - "If no indexed documents are available, provide a concise best-practice overview and clearly say no indexed documents were found. " - "Use Markdown formatting and do NOT include a table of contents in this section. " - "Generate substantial depth: target 900-1400 words. " - "Include these sections in order: Overview, Core Concepts, Role-Specific Workflow, Practical Examples, Common Pitfalls, and Action Checklist. " - "In Practical Examples, provide at least 2 concrete examples relevant to this role/topic. " - "In Action Checklist, provide at least 8 actionable checklist items.\n\n" - f"Role UUID: {role_uuid}\n" - f"Topic: {topic}\n" - f"MCP search context:\n{context_markdown}" - ), - ka_config, - min_internal_turns=2, - max_tokens=2400, - ) - - - full_structure.append({ - "title": topic, - "body": page_content, - "order": index, - "fields": [], - "meta": { - "topic_index": index, - "table_of_contents": [str(item) for item in topics], - }, - }) - - module_briefs.append({ - "topic": str(topic), - "summary_excerpt": str(page_content)[:1200], - }) - - await self.send_log("status", "Phase 3: Creating final assessment quiz...", "assessment") - aa_config = await self.get_config_by_type(role_uuid, 'assessment') - if not aa_config: - await self.send_log("error", "Missing assessment AgentConfig for this role") - return - - question_count = 8 - try: - random_result = await self.router.handle_tool_call("random_int", {"min": 6, "max": 10}) - if isinstance(random_result, dict) and isinstance(random_result.get("value"), int): - question_count = int(random_result["value"]) - except Exception: - question_count = 8 - - quiz_prompt = ( - "Create a final onboarding quiz that assesses all generated modules. " - f"Output ONLY a valid JSON array of exactly {question_count} question objects. " - "Use a mix of question types: at least 2 short-answer questions and at least 2 multiple-choice questions. " - "For multiple-choice objects: field_type='select', options (4 unique strings), and validation.correct_option. " - "For short-answer objects: field_type='textarea' (or 'text') and validation.accepted_answers (array of valid answers/keywords). " - "Each object MUST include key, label, field_type, required=true, and validation.explanation. " - "Cover all topics with balanced difficulty and avoid ambiguous wording.\n\n" - f"Modules JSON:\n{json.dumps(module_briefs, ensure_ascii=False)}" - ) - quiz_response = await self.orchestrate_ai( - quiz_prompt, - aa_config, - min_internal_turns=1, - max_tokens=1600, - ) - quiz_fields = self._sanitize_quiz_fields(self._extract_json_list(quiz_response)) - - if not quiz_fields: - await self.send_log("status", "Assessment output invalid, retrying quiz generation...", "assessment") - retry_response = await self.orchestrate_ai( - f"{quiz_prompt}\n\nReturn ONLY raw JSON. Do not use markdown fences. Do not include explanations outside JSON.", - aa_config, - min_internal_turns=1, - max_tokens=1600, - ) - quiz_fields = self._sanitize_quiz_fields(self._extract_json_list(retry_response)) - - if not quiz_fields: - await self.send_log("status", "Assessment output still invalid. Using fallback final quiz.", "assessment") - quiz_fields = self._build_fallback_quiz_fields([str(topic) for topic in topics], count=question_count) - - full_structure.append({ - "title": "Final Assessment Quiz", - "body": ( - "### Final Quiz\n" - "Answer all questions below. You need **80%** to complete onboarding. " - "You can update answers and submit when ready." - ), - "order": len(full_structure), - "fields": quiz_fields, - "meta": { - "page_type": "final_quiz", - "pass_mark": 80, - }, - }) - - - await self.save_full_flow(role_uuid, full_structure) - logger.info("Full onboarding generation completed: role_uuid=%s pages=%s", role_uuid, len(full_structure)) - - - await self.send(json.dumps({ - "type": "completed", - "timestamp": timezone.now().isoformat(), - "message": "Onboarding pipeline complete and structure saved." - })) - - async def run_progress_monitor(self, role_uuid, target_user_id=None, flow_uuid=None): - logger.info( - "Starting progress monitor: role_uuid=%s requester_id=%s target_user_id=%s flow_uuid=%s", - role_uuid, - self.user.id, - target_user_id or self.user.id, - flow_uuid, - ) - await self.send_log("status", "Progress Monitor is analyzing your onboarding progress...", "monitor") - - monitor_config = await self.get_config_by_type(role_uuid, 'monitor') - if not monitor_config: - await self.send_log("error", "Missing Progress Monitor AgentConfig for this role") - return - - progress_context = await self.get_role_progress_context( - role_uuid, - target_user_id or self.user.id, - flow_uuid=flow_uuid, - ) - - monitor_prompt = ( - "You are a progress monitoring agent for onboarding. " - "Analyze the role onboarding data below and provide concise feedback with:\n" - "1) current status\n2) strengths\n3) gaps\n4) next actions\n" - "Use prior learner question/answer evidence and any saved marking details when available. " - "If evidence is insufficient, explicitly state what is missing.\n" - "Keep it short and practical.\n\n" - f"Progress context JSON:\n{json.dumps(progress_context)}" - ) - - try: - feedback = await self.orchestrate_ai( - monitor_prompt, - monitor_config, - min_internal_turns=1, - max_tokens=640, - raise_on_error=True, - ) - except Exception as exc: - await self.send_log("error", f"Inference failed: {str(exc)}") - return - - if str(feedback).startswith("Error:"): - await self.send_log("error", str(feedback)) - return - - await self.send(json.dumps({ - "type": "completed", - "timestamp": timezone.now().isoformat(), - "message": "Progress analysis complete.", - "content": { - "role_uuid": role_uuid, - "feedback": feedback, - "status": progress_context.get("latest_status", "unknown"), - "user_id": target_user_id or self.user.id, - "flow_uuid": flow_uuid, - "is_completed": progress_context.get("is_completed", False), - } - })) - logger.info("Progress monitor completed: role_uuid=%s target_user_id=%s", role_uuid, target_user_id or self.user.id) - - async def orchestrate_ai( - self, - user_message, - config, - min_internal_turns=2, - max_turns=6, - max_tokens=None, - raise_on_error=False, - ): - """ - Handles the multi-turn ReAct loop (Reasoning + Tool Use). - """ - messages = [ - {"role": "system", "content": self._build_system_prompt(config)}, - {"role": "user", "content": user_message} - ] - - llm_config = config.llm_config if isinstance(config.llm_config, dict) else {} - - resolved_max_tokens = max_tokens - if resolved_max_tokens is None: - resolved_max_tokens = llm_config.get("max_tokens", 1024) - - try: - resolved_max_tokens = max(64, int(resolved_max_tokens)) - except Exception: - resolved_max_tokens = 1024 - - last_content = "" - min_internal_turns = max(1, int(min_internal_turns or 1)) - max_turns = max(min_internal_turns, int(max_turns or 1)) - - async with httpx.AsyncClient(timeout=60.0) as client: - for turn in range(max_turns): - await self.send_log("thought", f"Agent is thinking (Turn {turn+1})...") - - try: - response = await client.post( - settings.INFERENCE_CHAT_COMPLETIONS_ENDPOINT, - json={ - "model": llm_config.get("model_id", "meta-llama-3.1-8b"), - "messages": messages, - "tools": self.router.get_tool_definitions(), - "tool_choice": "auto", - "max_tokens": resolved_max_tokens, - } - ) - response.raise_for_status() - res_json = response.json() - - ai_message = res_json["choices"][0]["message"] - messages.append(ai_message) - - if ai_message.get("tool_calls"): - for tool_call in ai_message["tool_calls"]: - fn_name = tool_call["function"]["name"] - fn_args = json.loads(tool_call["function"]["arguments"]) - - await self.send(json.dumps({ - "type": "tool_start", - "message": f"Accessing knowledge base: {fn_name}...", - "content": fn_args - })) - - - result = await self.router.handle_tool_call(fn_name, fn_args) - - messages.append({ - "role": "tool", - "tool_call_id": tool_call["id"], - "name": fn_name, - "content": json.dumps(result) - }) - continue - - else: - last_content = str(ai_message.get("content") or "").strip() - - if (turn + 1) < min_internal_turns: - messages.append({ - "role": "user", - "content": ( - "Run one more internal reasoning pass before finalizing. " - "If additional evidence is needed, call tools. " - "Then return only the improved final answer." - ), - }) - continue - - return last_content - - except Exception as e: - await self.send_log("error", f"Inference failed: {str(e)}") - logger.exception("Inference failed: user_id=%s context_uuid=%s", self.user.id, self.context_uuid) - if raise_on_error: - raise - return f"Error: {str(e)}" - - return last_content - - - - async def fetch_knowledge_context(self, role_uuid, topic): - query = f"onboarding training content for {topic}" - await self.send(json.dumps({ - "type": "tool_start", - "message": "Accessing knowledge base: search_knowledge...", - "content": {"query": query, "role_uuid": role_uuid} - })) - - try: - result = await self.router.handle_tool_call( - "search_knowledge", - { - "query": query, - "role_uuid": role_uuid, - }, - ) - - await self.send(json.dumps({ - "type": "tool_result", - "message": f"Retrieved {len(result) if isinstance(result, list) else 0} knowledge chunk(s)", - "content": result, - "timestamp": timezone.now().isoformat(), - })) - - return result if isinstance(result, list) else [] - except Exception as exc: - await self.send_log("error", f"Knowledge retrieval failed for topic '{topic}': {str(exc)}") - return [] - - def format_knowledge_context(self, knowledge_hits): - if not knowledge_hits: - return "No indexed MCP documents found for this role/topic." - - lines = [] - for idx, item in enumerate(knowledge_hits[:5]): - source = item.get("source", "Unknown Source") if isinstance(item, dict) else "Unknown Source" - relevance = item.get("relevance") if isinstance(item, dict) else None - content = item.get("content", "") if isinstance(item, dict) else "" - safe_content = str(content).strip()[:1600] - lines.append( - f"[{idx + 1}] Source: {source} | Relevance: {relevance}\n{safe_content}" - ) - - return "\n\n".join(lines) - - def _coerce_list_payload(self, payload): - if isinstance(payload, list): - return payload - if isinstance(payload, dict): - for key in ('questions', 'items', 'fields', 'quiz'): - value = payload.get(key) - if isinstance(value, list): - return value - return [] - - def _extract_json_list(self, text): - """Extracts a JSON list from model output, tolerating wrappers and markdown fences.""" - if not text: - return [] - - candidate_texts = [str(text).strip()] - - for block in re.findall(r'```(?:json)?\s*([\s\S]*?)```', str(text), re.IGNORECASE): - candidate_texts.append(block.strip()) - - decoder = json.JSONDecoder() - - for candidate in candidate_texts: - if not candidate: - continue - - try: - parsed = json.loads(candidate) - coerced = self._coerce_list_payload(parsed) - if coerced: - return coerced - except Exception: - pass - - for idx, char in enumerate(candidate): - if char not in '[{': - continue - try: - parsed, _ = decoder.raw_decode(candidate[idx:]) - except Exception: - continue - - coerced = self._coerce_list_payload(parsed) - if coerced: - return coerced - - return [] - - def _sanitize_quiz_fields(self, raw_fields): - sanitized = [] - seen_keys = set() - - for index, field in enumerate(raw_fields or []): - if not isinstance(field, dict): - continue - - key = str(field.get('key') or f'final_quiz_q_{index + 1}').strip().lower().replace(' ', '_') - if not key: - key = f'final_quiz_q_{index + 1}' - - if key in seen_keys: - key = f'{key}_{index + 1}' - seen_keys.add(key) - - label = str(field.get('label') or '').strip() - if not label: - continue - - field_type = str(field.get('field_type') or 'select').strip().lower() - if field_type not in ('select', 'text', 'textarea'): - field_type = 'select' - - raw_options = field.get('options') if isinstance(field.get('options'), list) else [] - options = [] - for option in raw_options: - option_text = str(option).strip() - if option_text and option_text not in options: - options.append(option_text) - - validation = field.get('validation') if isinstance(field.get('validation'), dict) else {} - if field_type == 'select': - if len(options) < 2: - continue - - correct_option = str(validation.get('correct_option') or '').strip() - if correct_option not in options: - correct_option = options[0] - - sanitized.append({ - 'key': key, - 'label': label, - 'field_type': 'select', - 'options': options[:5], - 'required': True, - 'validation': { - 'correct_option': correct_option, - 'explanation': str(validation.get('explanation') or ''), - }, - }) - continue - - accepted_answers_raw = validation.get('accepted_answers') - if isinstance(accepted_answers_raw, list): - accepted_answers = [str(item).strip() for item in accepted_answers_raw if str(item).strip()] - else: - accepted_single = str(validation.get('correct_answer') or '').strip() - accepted_answers = [accepted_single] if accepted_single else [] - - if not accepted_answers: - continue - - sanitized.append({ - 'key': key, - 'label': label, - 'field_type': 'textarea' if field_type == 'textarea' else 'text', - 'options': [], - 'required': True, - 'validation': { - 'accepted_answers': accepted_answers, - 'explanation': str(validation.get('explanation') or ''), - }, - }) - - return sanitized - - def _build_fallback_quiz_fields(self, topics, count=8): - safe_topics = [str(topic).strip() for topic in (topics or []) if str(topic).strip()] - if not safe_topics: - safe_topics = ['onboarding fundamentals'] - - fallback_fields = [] - for index in range(count): - topic = safe_topics[index % len(safe_topics)] - key = f'final_quiz_q_{index + 1}' - - if index % 3 == 0: - fallback_fields.append({ - 'key': key, - 'label': f"In one or two sentences, what is the safest approach when handling {topic}?", - 'field_type': 'textarea', - 'options': [], - 'required': True, - 'validation': { - 'accepted_answers': [ - 'best practices', - 'documentation', - 'quality', - 'compliance', - ], - 'explanation': 'Good answers reference documented best practices, quality checks, and compliance.', - }, - }) - continue - - correct = f"Use documented best practices for {topic}." - options = [ - correct, - f"Skip review steps for {topic} to move faster.", - f"Rely only on assumptions instead of evidence for {topic}.", - f"Ignore quality and compliance checks in {topic} tasks.", - ] - fallback_fields.append({ - 'key': key, - 'label': f"Which approach is most appropriate when working on {topic}?", - 'field_type': 'select', - 'options': options, - 'required': True, - 'validation': { - 'correct_option': correct, - 'explanation': f"{correct} balances reliability, quality, and role expectations.", - }, - }) - - return fallback_fields - - def _normalize_structure(self, structure): - normalized_pages = [] - for index, page in enumerate(structure or []): - fields = [] - for field_index, field in enumerate(page.get('fields', []) if isinstance(page, dict) else []): - if not isinstance(field, dict): - continue - key = str(field.get('key') or f'field_{field_index + 1}') - raw_options = field.get('options') if isinstance(field.get('options'), list) else [] - options = [str(option) for option in raw_options if str(option).strip()] - - validation = field.get('validation') if isinstance(field.get('validation'), dict) else {} - correct_option = validation.get('correct_option') - if correct_option is not None: - correct_option = str(correct_option) - - normalized_validation = { - 'correct_option': correct_option if correct_option in options else None, - 'accepted_answers': [ - str(item).strip() - for item in (validation.get('accepted_answers') if isinstance(validation.get('accepted_answers'), list) else []) - if str(item).strip() - ], - 'explanation': str(validation.get('explanation') or ''), - } - - fields.append({ - 'uuid': str(uuid4()), - 'key': key, - 'label': str(field.get('label') or key.replace('_', ' ').title()), - 'field_type': str(field.get('field_type') or 'text'), - 'required': bool(field.get('required', False)), - 'options': options, - 'default_value': field.get('default_value', ''), - 'validation': normalized_validation, - }) - - page_title = page.get('title') if isinstance(page, dict) else None - page_body = page.get('body') if isinstance(page, dict) else '' - page_order = page.get('order') if isinstance(page, dict) else index - normalized_pages.append({ - 'uuid': str(uuid4()), - 'title': str(page_title or f'Module {index + 1}'), - 'body': str(page_body or ''), - 'order': int(page_order if isinstance(page_order, int) else index), - 'fields': fields, - 'meta': page.get('meta') if isinstance(page.get('meta'), dict) else {}, - }) - return normalized_pages - - @database_sync_to_async - def save_full_flow(self, role_uuid, structure): - """Saves the final nested structure to the OnboardingFlow model.""" - from apps.accounts.models import Role - role = Role.objects.get(uuid=role_uuid) - normalized_structure = self._normalize_structure(structure) - flow, _ = OnboardingFlow.objects.update_or_create( - role=role, - defaults={ - 'title': f"AI Onboarding: {role.name}", - 'structure': normalized_structure, - 'is_active': True - } - ) - return flow - - async def send_log(self, log_type, message, content=None): - if log_type == "error": - logger.error("Consumer log event: type=%s message=%s content=%s", log_type, message, content) - elif log_type == "status": - logger.info("Consumer log event: type=%s message=%s content=%s", log_type, message, content) - else: - logger.debug("Consumer log event: type=%s message=%s content=%s", log_type, message, content) - await self.send(json.dumps({ - "type": log_type, - "message": message, - "content": content, - "timestamp": timezone.now().isoformat() - })) - - @database_sync_to_async - def get_config(self, config_uuid): - return AgentConfig.objects.get(uuid=config_uuid) - - @database_sync_to_async - def get_config_for_user(self, config_uuid, user_id): - return AgentConfig.objects.filter( - uuid=config_uuid, - ).filter( - Q(organization__owner__id=user_id) | Q(organization__members__id=user_id) - ).first() - - @database_sync_to_async - def can_access_role(self, role_uuid, user_id): - from apps.accounts.models import Role - - role = Role.objects.filter(uuid=role_uuid).first() - if role is None: - return False - - if role.organization.owner.id == user_id: - return True - - return role.organization.members.filter(id=user_id).exists() - - @database_sync_to_async - def can_manage_role(self, role_uuid, user_id): - from apps.accounts.models import Role, User - - role = Role.objects.filter(uuid=role_uuid).first() - user = User.objects.filter(id=user_id).first() - if role is None or user is None: - return False - - if role.organization.owner.id == user_id: - return True - - return bool(user.is_manager) and role.organization.members.filter(id=user_id).exists() - - @database_sync_to_async - def get_config_by_type(self, role_uuid, agent_type): - role_specific = AgentConfig.objects.filter( - role__uuid=role_uuid, - agent_type=agent_type, - ).order_by('-updated_at').first() - - if role_specific: - return role_specific - - return AgentConfig.objects.filter( - organization__roles__uuid=role_uuid, - role__isnull=True, - agent_type=agent_type, - ).order_by('-updated_at').first() - - @database_sync_to_async - def get_role_progress_context(self, role_uuid, user_id, flow_uuid=None): - from apps.accounts.models import Role - - role = Role.objects.get(uuid=role_uuid) - active_flow = OnboardingFlow.objects.filter(role=role, is_active=True).order_by('-updated_at').first() - scoped_flow = None - if flow_uuid: - scoped_flow = OnboardingFlow.objects.filter(role=role, uuid=flow_uuid).first() - - sessions = OnboardingSession.objects.filter(user_id=user_id, role=role).order_by('-updated_at') - if flow_uuid: - sessions = sessions.filter(Q(flow__uuid=flow_uuid) | Q(flow__isnull=True, state__flow_uuid=str(flow_uuid))) - - latest_session = sessions.first() - - if not latest_session: - return { - "role_uuid": str(role.uuid), - "role_name": role.name, - "latest_status": "not_started", - "session_count": 0, - "flow_exists": bool(scoped_flow or active_flow), - "flow_uuid": str((scoped_flow or active_flow).uuid) if (scoped_flow or active_flow) else None, - "progress": 0, - "responses_count": 0, - "completed_modules": [], - "is_completed": False, - "final_quiz_result": {}, - "final_quiz_qa": [], - } - - state = latest_session.state or {} - responses = state.get("responses", {}) - completed_modules = state.get("completed_modules", []) - progress = state.get("progress_percentage", state.get("progress", 0)) - final_quiz_result = state.get("final_quiz_result", {}) - - flow_for_context = latest_session.flow or scoped_flow or active_flow - structure = flow_for_context.structure if flow_for_context and isinstance(flow_for_context.structure, list) else [] - - quiz_page = None - for page in structure: - if not isinstance(page, dict): - continue - meta = page.get("meta") if isinstance(page.get("meta"), dict) else {} - if str(meta.get("page_type") or "").strip() == "final_quiz": - quiz_page = page - break - - if quiz_page is None and structure: - quiz_page = structure[-1] if isinstance(structure[-1], dict) else None - - quiz_fields = quiz_page.get("fields") if isinstance(quiz_page, dict) and isinstance(quiz_page.get("fields"), list) else [] - quiz_page_uuid = str(quiz_page.get("uuid") or "") if isinstance(quiz_page, dict) else "" - - quiz_responses = {} - if isinstance(responses, dict) and quiz_page_uuid: - candidate = responses.get(quiz_page_uuid, {}) - if isinstance(candidate, dict): - quiz_responses = candidate - - grading_details = ( - final_quiz_result.get("grading_details", []) - if isinstance(final_quiz_result, dict) - else [] - ) - grading_by_key = {} - if isinstance(grading_details, list): - for detail in grading_details: - if not isinstance(detail, dict): - continue - key = str(detail.get("key") or "").strip() - if not key: - continue - grading_by_key[key] = { - "correct": bool(detail.get("correct")), - "reason": str(detail.get("reason") or ""), - } - - final_quiz_qa = [] - for field in quiz_fields: - if not isinstance(field, dict): - continue - key = str(field.get("key") or "").strip() - if not key: - continue - - detail = grading_by_key.get(key, {}) - final_quiz_qa.append( - { - "key": key, - "label": str(field.get("label") or key), - "field_type": str(field.get("field_type") or ""), - "answer": quiz_responses.get(key), - "marked_correct": detail.get("correct"), - "marking_reason": detail.get("reason", ""), - } - ) - - return { - "role_uuid": str(role.uuid), - "role_name": role.name, - "latest_status": latest_session.status, - "session_count": sessions.count(), - "flow_exists": bool(scoped_flow or active_flow), - "flow_uuid": str(latest_session.flow.uuid) if latest_session.flow_id else str((scoped_flow or active_flow).uuid) if (scoped_flow or active_flow) else None, - "progress": progress, - "responses_count": len(responses) if isinstance(responses, dict) else 0, - "completed_modules": completed_modules if isinstance(completed_modules, list) else [], - "updated_at": latest_session.updated_at.isoformat() if latest_session.updated_at else None, - "is_completed": latest_session.status == 'completed', - "final_quiz_result": final_quiz_result if isinstance(final_quiz_result, dict) else {}, - "final_quiz_qa": final_quiz_qa, - } - - @database_sync_to_async - def resolve_target_user_id(self, role_uuid, requester_id, target_user_uuid): - from apps.accounts.models import Role, User - - role = Role.objects.filter(uuid=role_uuid).first() - requester = User.objects.filter(id=requester_id).first() - target = User.objects.filter(uuid=target_user_uuid).first() - if role is None or requester is None or target is None: - return None - - is_owner = role.organization.owner.id == requester_id - is_manager_member = bool(requester.is_manager) and role.organization.members.filter(id=requester_id).exists() - if not (is_owner or is_manager_member): - return None - - if not role.members.filter(id=target.id).exists(): - return None - - return target.id \ No newline at end of file diff --git a/apps/onboarding/consumers/__init__.py b/apps/onboarding/consumers/__init__.py new file mode 100644 index 0000000..93e6ec0 --- /dev/null +++ b/apps/onboarding/consumers/__init__.py @@ -0,0 +1,4 @@ +from .base import * +from .chat import * +from .generate import * +from .progress import * \ No newline at end of file diff --git a/apps/onboarding/consumers/base.py b/apps/onboarding/consumers/base.py new file mode 100644 index 0000000..364e943 --- /dev/null +++ b/apps/onboarding/consumers/base.py @@ -0,0 +1,214 @@ +import json +import logging +from enum import Enum + +import httpx +from channels.db import database_sync_to_async +from channels.generic.websocket import AsyncWebsocketConsumer +from django.conf import settings +from django.db.models import Q +from django.utils import timezone + +from apps.accounts.models import User, Role +from apps.accounts.permissions import get_organization_from_object, can_manage_organization +from apps.onboarding.consumers.prompts import OnboardingPrompts +from apps.onboarding.mcp import mcp_router, MCPRouter +from apps.onboarding.models import AgentConfig +from apps.onboarding.utils.content_moderator import ContentModerator + +__all__ = ["BaseOnboardingConsumer", "LogType"] + +logger = logging.getLogger(__name__) + +class LogType(Enum): + STATUS = "status" # General progress updates + ERROR = "error" # Failures + INFO = "info" # Debug/Verbose logs + THOUGHT = "thought" # AI internal reasoning turns + TOOL_START = "tool_start" # When an MCP tool is called + TOOL_RESULT = "tool_result" # When data comes back from a tool + COMPLETED = "completed" # The final completion signal + +class BaseOnboardingConsumer(AsyncWebsocketConsumer): + """ + Base consumer for all onboarding-related WebSocket consumers. + """ + + user: User + router: MCPRouter + logger: logging.Logger = logger + moderator: ContentModerator = ContentModerator() + + ### Connection Management ### + async def connect(self): + self.user = self.scope["user"] + if not self.user.is_authenticated: + self.logger.warning("WebSocket connection rejected: unauthenticated user attempted to connect") + return await self.close() + self.parse_extra() + self.router = mcp_router + self.logger.info(f"WebSocket connected: user={self.user.full_name}") + return await self.accept() + + async def disconnect(self, close_code: int): + self.logger.info(f"WebSocket disconnected: user={self.user.full_name} close_code={close_code}") + + ### Event Handling ### + async def receive(self, text_data: str): + """ + Main entry point for incoming messages. + """ + try: + data = self.from_json(text_data) + for field in ("query", "message"): + value = data.get(field) + if value and not self.moderator.is_clean(str(value)): + return await self.send_error("Message blocked: content did not pass moderation.") + action_name = data.get("action", "message") + method = getattr(self, f"action_{action_name}", None) + if method: + self.logger.info(f"Dispatching action: {action_name}") + await method(data) + else: + await self.send_error(f"Action '{action_name}' not supported on this endpoint.") + except Exception as e: + await self.send_error(f"An unexpected error occurred when processing the event.") + self.logger.exception(f"WebSocket receive critical failure: {str(e)}") + + ### MCP Handling ### + async def orchestrate(self, message: str, config: AgentConfig, minimum_turns: int = 2, maximum_turns: int = 5, + max_tokens: int | None = None, raise_on_error: bool = False, request_timeout: int = 60.0) -> str: + """ + Orchestrates a multi-turn conversation with the agent, including tool calls and reasoning steps. + """ + llm_config = config.llm_config if isinstance(config.llm_config, dict) else {} + resolved_max_tokens = max_tokens or llm_config.get("max_tokens", 1024) + messages = [ + {"role": "system", "content": config.system_prompt or OnboardingPrompts.default_system_prompt()}, + {"role": "user", "content": message} + ] + last_content = "" + async with httpx.AsyncClient(timeout = request_timeout) as client: + for turn in range(1, maximum_turns + 1): + await self.send_log(LogType.THOUGHT, f"Agent reasoning (Turn {turn})...") + try: + response = await client.post( + settings.INFERENCE_CHAT_COMPLETIONS_ENDPOINT, + json={ + "model": llm_config.get("model_id", "meta-llama-3.1-8b"), + "messages": messages, + "tools": self.router.get_tool_definitions(), + "tool_choice": "auto", + "max_tokens": resolved_max_tokens, + } + ) + response.raise_for_status() + ai_message = response.json()["choices"][0]["message"] + messages.append(ai_message) + if ai_message.get("tool_calls"): + for tool_call in ai_message["tool_calls"]: + fn_name = tool_call["function"]["name"] + fn_args = json.loads(tool_call["function"]["arguments"]) + await self.send_log(LogType.TOOL_START, f"Calling tool: {fn_name}", content=fn_args) + result = await self.router.handle_tool_call(fn_name, fn_args) + await self.send_log(LogType.TOOL_RESULT, f"Tool {fn_name} returned data", content=result) + messages.append({ + "role": "tool", + "tool_call_id": tool_call["id"], + "name": fn_name, + "content": self.to_json(result) + }) + continue + else: + last_content = self.moderator.censor(str(ai_message.get("content") or "").strip()) + if turn < minimum_turns: + messages.append({ + "role": "user", + "content": OnboardingPrompts.force_reasoning_prompt() + }) + continue + return last_content + except Exception as e: + await self.send_error(f"AI Orchestration failed: {str(e)}") + if raise_on_error: + raise e + return f"Error: {str(e)}" + return last_content + + ### Regular Helpers ### + async def send_log(self, log_type: LogType, message: str, content: str | dict | None = None): + if log_type == LogType.ERROR: + self.logger.error(f"[{log_type.value}]: message={str(message)[:100]} content={str(content)[:60]}") + else: + self.logger.info(f"[{log_type.value}]: message={str(message)[:100]} content={str(content)[:60]}") + await self.send(self.to_json({ "type": log_type.value, "message": message, "content": content, "timestamp": self.get_timestamp()})) + + async def send_error(self, message: str): + await self.send_log(LogType.ERROR, message) + + def parse_extra(self): + """ + Override for custom parsing + """ + pass + + def to_json(self, data: dict | list) -> str: + return json.dumps(data, default=str) + + def from_json(self, data: str) -> dict | list: + return json.loads(data) + + def get_timestamp(self): + return timezone.now().isoformat() + + def parse_max_tokens(self, max_tokens: int | None) -> int: + if max_tokens is None: + return None + if isinstance(max_tokens, int) and max_tokens > 0: + return max_tokens + return None + + ### Database Helpers ### + @database_sync_to_async + def get_config(self, config_uuid): + return AgentConfig.objects.get(uuid = config_uuid) + + @database_sync_to_async + def get_role(self, role_uuid): + return Role.objects.get(uuid = role_uuid) + + @database_sync_to_async + def get_config_by_type(self, role_uuid, agent_type): + role_specific = AgentConfig.objects.filter( + role__uuid=role_uuid, + agent_type=agent_type, + ).order_by('-updated_at').first() + + if role_specific: + return role_specific + + return AgentConfig.objects.filter( + organization__roles__uuid=role_uuid, + role__isnull=True, + agent_type=agent_type, + ).order_by('-updated_at').first() + + @database_sync_to_async + def get_config_for_user(self, config_uuid): + return AgentConfig.objects.filter(uuid = config_uuid).filter( + Q(organization__owner__id=self.user.id) | Q(organization__members__id=self.user.id) + ).first() + + @database_sync_to_async + def can_manage_role(self, role_uuid): + role = Role.objects.filter(uuid=role_uuid).first() + if role is None: + return False + return can_manage_organization(self.user, get_organization_from_object(role)) + + @database_sync_to_async + def can_access_role(self, role_uuid): + role = Role.objects.filter(uuid=role_uuid).first() + if role is None: + return False + return role.members.filter(id=self.user.id).exists() or can_manage_organization(self.user, get_organization_from_object(role)) diff --git a/apps/onboarding/consumers/chat.py b/apps/onboarding/consumers/chat.py new file mode 100644 index 0000000..1cc947b --- /dev/null +++ b/apps/onboarding/consumers/chat.py @@ -0,0 +1,25 @@ +from apps.onboarding.consumers.base import BaseOnboardingConsumer, LogType + +__all__ = ["OnboardingChatConsumer"] + +class OnboardingChatConsumer(BaseOnboardingConsumer): + """ + Route: /ws/onboarding/chat// + """ + + config_uuid: str + + def parse_extra(self): + self.config_uuid = self.scope["url_route"]["kwargs"].get("config_uuid") + + async def action_message(self, data: dict): + user_query = data.get("query") + if not user_query: + return await self.send_error("Missing 'query' field in payload.") + max_tokens = self.parse_max_tokens(data.get("max_tokens")) + config = await self.get_config_for_user(self.config_uuid) + if config is None: + await self.send_error("Forbidden or Invalid Config UUID") + return + response = await self.orchestrate(user_query, config, max_tokens=max_tokens) + await self.send_log(LogType.COMPLETED, "Inferenced complete.", {"response": response}) diff --git a/apps/onboarding/consumers/generate.py b/apps/onboarding/consumers/generate.py new file mode 100644 index 0000000..7254272 --- /dev/null +++ b/apps/onboarding/consumers/generate.py @@ -0,0 +1,402 @@ +import json +import re +from uuid import uuid4 + +from channels.db import database_sync_to_async + +from apps.accounts.models import Role +from apps.onboarding.consumers.base import BaseOnboardingConsumer, LogType +from apps.onboarding.consumers.prompts import OnboardingPrompts + +from apps.onboarding.models import OnboardingFlow + +__all__ = ["OnboardingGenerateConsumer"] + +class OnboardingGenerateConsumer(BaseOnboardingConsumer): + """ + Route: /ws/onboarding/generate// + """ + + role_uuid: str + + def parse_extra(self): + self.role_uuid = self.scope["url_route"]["kwargs"].get("role_uuid") + + async def action_start_full_onboarding(self, data: dict) -> None: + user_message = data.get("message") + if user_message and not self.moderator.is_clean(user_message): + return await self.send_error("Your message did not pass moderation. Please revise your input.") + if not await self.can_access_role(self.role_uuid): + return await self.send_error("You are not a member of this role or do not have permission to access it") + await self.run_pipeline(await self.get_role(self.role_uuid)) + + async def run_pipeline(self, role: Role) -> None: + """ + Full orchestration pipeline for generating curriculum, knowledge content, and assessment quiz for a given role + """ + self.logger.info(f"Starting onboarding generation pipeline for role={role.name} (uuid={role.uuid}) by user={self.user.full_name}") + await self.send_log(LogType.STATUS, f"Phase 1: Generating Curriculum...", "curriculum") + ca_config = await self.get_config_by_type(role.uuid, 'curriculum') + if not ca_config: + return await self.send_error("Missing curriculum AgentConfig for this role") + ca_response = await self.orchestrate(OnboardingPrompts.curriculum_generation_prompt(), ca_config, minimum_turns=1, max_tokens=384) + topics = self._extract_json_list(ca_response) + if not topics: + return await self.send_log(LogType.ERROR, "Curriculum generation returned no topics", f"Curriculum generation produced no topics for role={role.name} (uuid={role.uuid})") + full_structure = [] + module_briefs = [] + for index, topic in enumerate(topics): + await self.send_log(LogType.STATUS, f"Phase 2: Researching {topic}...", "knowledge") + ka_config = await self.get_config_by_type(role.uuid, 'knowledge') + if not ka_config: + return await self.send_error("Missing knowledge AgentConfig for this role") + knowledge_hits = await self.fetch_knowledge_context(role.uuid, topic) + context_markdown = self.format_knowledge_context(knowledge_hits) + ka_response = await self.orchestrate( + OnboardingPrompts.knowledge_generation_prompt(topic, context_markdown), + ka_config, minimum_turns=2, max_tokens=2400 + ) + full_structure.append({ + "title": topic, + "body": ka_response, + "order": index, + "fields": [], + "meta": { + "topic_index": index, + "table_of_contents": [str(item) for item in topics], + }, + }) + module_briefs.append({ + "topic": str(topic), + "summary_excerpt": str(ka_response)[:1200], + }) + + await self.send_log(LogType.STATUS, "Phase 3: Creating final assessment quiz...", "assessment") + aa_config = await self.get_config_by_type(role.uuid, 'assessment') + if not aa_config: + return await self.send_error("Missing assessment AgentConfig for this role") + question_count = 8 + try: + random_result = await self.router.handle_tool_call("random_int", {"min": 6, "max": 10}) + if isinstance(random_result, dict) and isinstance(random_result.get("value"), int): + question_count = int(random_result["value"]) + except Exception: + question_count = 8 + quiz_response = await self.orchestrate( + OnboardingPrompts.quiz_generation_prompt(question_count, module_briefs), + aa_config, + minimum_turns=1, + max_tokens=1600, + ) + quiz_fields = self._sanitize_quiz_fields(self._extract_json_list(quiz_response)) + + if not quiz_fields: + await self.send_log(LogType.STATUS, "Assessment output invalid, retrying quiz generation...", "assessment") + retry_response = await self.orchestrate( + OnboardingPrompts.quiz_generation_retry_prompt(question_count, module_briefs), + aa_config, + minimum_turns=1, + max_tokens=1600, + ) + quiz_fields = self._sanitize_quiz_fields(self._extract_json_list(retry_response)) + + if not quiz_fields: + await self.send_log(LogType.STATUS, "Assessment output still invalid. Using fallback final quiz.", "assessment") + quiz_fields = self._build_fallback_quiz_fields([str(topic) for topic in topics], count=question_count) + + full_structure.append({ + "title": "Final Assessment Quiz", + "body": ( + "### Final Quiz\n" + "Answer all questions below. You need **80%** to complete onboarding. " + "You can update answers and submit when ready." + ), + "order": len(full_structure), + "fields": quiz_fields, + "meta": { + "page_type": "final_quiz", + "pass_mark": 80, + }, + }) + await self.save_full_flow(role.uuid, full_structure) + self.logger.info("Full onboarding generation completed: role_uuid=%s pages=%s", role.uuid, len(full_structure)) + await self.send_log(LogType.COMPLETED, "Onboarding pipeline complete and structure saved.") + + async def fetch_knowledge_context(self, role_uuid, topic): + query = f"onboarding training content for {topic}" + await self.send_log(LogType.TOOL_START, "Accessing knowledge base: search_knowledge...", {"query": query, "role_uuid": str(role_uuid)}) + + try: + result = await self.router.handle_tool_call( + "search_knowledge", + { + "query": query, + "role_uuid": str(role_uuid), + }, + ) + + await self.send_log(LogType.TOOL_RESULT, f"Retrieved {len(result) if isinstance(result, list) else 0} knowledge chunk(s)", result) + + return result if isinstance(result, list) else [] + except Exception as exc: + await self.send_log(LogType.ERROR, f"Knowledge retrieval failed for topic '{topic}': {str(exc)}") + return [] + + def _coerce_list_payload(self, payload): + if isinstance(payload, list): + return payload + if isinstance(payload, dict): + for key in ('questions', 'items', 'fields', 'quiz'): + value = payload.get(key) + if isinstance(value, list): + return value + return [] + + def _extract_json_list(self, text: str) -> list: + """ + Extracts a JSON list from model output, tolerating wrappers and markdown fences + """ + if not text: + return [] + candidate_texts = [str(text).strip()] + + for block in re.findall(r'```(?:json)?\s*([\s\S]*?)```', str(text), re.IGNORECASE): + candidate_texts.append(block.strip()) + + decoder = json.JSONDecoder() + + for candidate in candidate_texts: + if not candidate: + continue + + try: + parsed = json.loads(candidate) + coerced = self._coerce_list_payload(parsed) + if coerced: + return coerced + except Exception: + pass + + for idx, char in enumerate(candidate): + if char not in '[{': + continue + try: + parsed, _ = decoder.raw_decode(candidate[idx:]) + except Exception: + continue + + coerced = self._coerce_list_payload(parsed) + if coerced: + return coerced + + return [] + + def format_knowledge_context(self, knowledge_hits): + + if not knowledge_hits: + return "No indexed MCP documents found for this role/topic." + + lines = [] + for idx, item in enumerate(knowledge_hits[:5]): + source = item.get("source", "Unknown Source") if isinstance(item, dict) else "Unknown Source" + relevance = item.get("relevance") if isinstance(item, dict) else None + content = item.get("content", "") if isinstance(item, dict) else "" + safe_content = str(content).strip()[:1600] + lines.append( + f"[{idx + 1}] Source: {source} | Relevance: {relevance}\n{safe_content}" + ) + + return "\n\n".join(lines) + + def _sanitize_quiz_fields(self, raw_fields): + sanitized = [] + seen_keys = set() + + for index, field in enumerate(raw_fields or []): + if not isinstance(field, dict): + continue + + key = str(field.get('key') or f'final_quiz_q_{index + 1}').strip().lower().replace(' ', '_') + if not key: + key = f'final_quiz_q_{index + 1}' + + if key in seen_keys: + key = f'{key}_{index + 1}' + seen_keys.add(key) + + label = str(field.get('label') or '').strip() + if not label: + continue + + field_type = str(field.get('field_type') or 'select').strip().lower() + if field_type not in ('select', 'text', 'textarea'): + field_type = 'select' + + raw_options = field.get('options') if isinstance(field.get('options'), list) else [] + options = [] + for option in raw_options: + option_text = str(option).strip() + if option_text and option_text not in options: + options.append(option_text) + + validation = field.get('validation') if isinstance(field.get('validation'), dict) else {} + if field_type == 'select': + if len(options) < 2: + continue + + correct_option = str(validation.get('correct_option') or '').strip() + if correct_option not in options: + correct_option = options[0] + + sanitized.append({ + 'key': key, + 'label': label, + 'field_type': 'select', + 'options': options[:5], + 'required': True, + 'validation': { + 'correct_option': correct_option, + 'explanation': str(validation.get('explanation') or ''), + }, + }) + continue + + accepted_answers_raw = validation.get('accepted_answers') + if isinstance(accepted_answers_raw, list): + accepted_answers = [str(item).strip() for item in accepted_answers_raw if str(item).strip()] + else: + accepted_single = str(validation.get('correct_answer') or '').strip() + accepted_answers = [accepted_single] if accepted_single else [] + + if not accepted_answers: + continue + + sanitized.append({ + 'key': key, + 'label': label, + 'field_type': 'textarea' if field_type == 'textarea' else 'text', + 'options': [], + 'required': True, + 'validation': { + 'accepted_answers': accepted_answers, + 'explanation': str(validation.get('explanation') or ''), + }, + }) + + return sanitized + + def _build_fallback_quiz_fields(self, topics, count=8): + safe_topics = [str(topic).strip() for topic in (topics or []) if str(topic).strip()] + if not safe_topics: + safe_topics = ['onboarding fundamentals'] + + fallback_fields = [] + for index in range(count): + topic = safe_topics[index % len(safe_topics)] + key = f'final_quiz_q_{index + 1}' + + if index % 3 == 0: + fallback_fields.append({ + 'key': key, + 'label': f"In one or two sentences, what is the safest approach when handling {topic}?", + 'field_type': 'textarea', + 'options': [], + 'required': True, + 'validation': { + 'accepted_answers': [ + 'best practices', + 'documentation', + 'quality', + 'compliance', + ], + 'explanation': 'Good answers reference documented best practices, quality checks, and compliance.', + }, + }) + continue + + correct = f"Use documented best practices for {topic}." + options = [ + correct, + f"Skip review steps for {topic} to move faster.", + f"Rely only on assumptions instead of evidence for {topic}.", + f"Ignore quality and compliance checks in {topic} tasks.", + ] + fallback_fields.append({ + 'key': key, + 'label': f"Which approach is most appropriate when working on {topic}?", + 'field_type': 'select', + 'options': options, + 'required': True, + 'validation': { + 'correct_option': correct, + 'explanation': f"{correct} balances reliability, quality, and role expectations.", + }, + }) + + return fallback_fields + + def _normalize_structure(self, structure): + normalized_pages = [] + for index, page in enumerate(structure or []): + fields = [] + for field_index, field in enumerate(page.get('fields', []) if isinstance(page, dict) else []): + if not isinstance(field, dict): + continue + key = str(field.get('key') or f'field_{field_index + 1}') + raw_options = field.get('options') if isinstance(field.get('options'), list) else [] + options = [str(option) for option in raw_options if str(option).strip()] + + validation = field.get('validation') if isinstance(field.get('validation'), dict) else {} + correct_option = validation.get('correct_option') + if correct_option is not None: + correct_option = str(correct_option) + + normalized_validation = { + 'correct_option': correct_option if correct_option in options else None, + 'accepted_answers': [ + str(item).strip() + for item in (validation.get('accepted_answers') if isinstance(validation.get('accepted_answers'), list) else []) + if str(item).strip() + ], + 'explanation': str(validation.get('explanation') or ''), + } + + fields.append({ + 'uuid': str(uuid4()), + 'key': key, + 'label': str(field.get('label') or key.replace('_', ' ').title()), + 'field_type': str(field.get('field_type') or 'text'), + 'required': bool(field.get('required', False)), + 'options': options, + 'default_value': field.get('default_value', ''), + 'validation': normalized_validation, + }) + + page_title = page.get('title') if isinstance(page, dict) else None + page_body = page.get('body') if isinstance(page, dict) else '' + page_order = page.get('order') if isinstance(page, dict) else index + normalized_pages.append({ + 'uuid': str(uuid4()), + 'title': str(page_title or f'Module {index + 1}'), + 'body': str(page_body or ''), + 'order': int(page_order if isinstance(page_order, int) else index), + 'fields': fields, + 'meta': page.get('meta') if isinstance(page.get('meta'), dict) else {}, + }) + return normalized_pages + + @database_sync_to_async + def save_full_flow(self, role_uuid, structure): + """Saves the final nested structure to the OnboardingFlow model.""" + from apps.accounts.models import Role + role = Role.objects.get(uuid=role_uuid) + normalized_structure = self._normalize_structure(structure) + flow, _ = OnboardingFlow.objects.update_or_create( + role=role, + defaults={ + 'title': f"AI Onboarding: {role.name}", + 'structure': normalized_structure, + 'is_active': True + } + ) + return flow \ No newline at end of file diff --git a/apps/onboarding/consumers/progress.py b/apps/onboarding/consumers/progress.py new file mode 100644 index 0000000..fe59437 --- /dev/null +++ b/apps/onboarding/consumers/progress.py @@ -0,0 +1,146 @@ +from django.db.models import Q +from channels.db import database_sync_to_async + +from apps.accounts.models import Role +from apps.onboarding.consumers.base import BaseOnboardingConsumer, LogType +from apps.onboarding.consumers.prompts import OnboardingPrompts +from apps.onboarding.models import OnboardingFlow, OnboardingSession + +__all__ = ["OnboardingProgressConsumer"] + +class OnboardingProgressConsumer(BaseOnboardingConsumer): + """ + Route: /ws/onboarding/progress//// + """ + + role_uuid: str + flow_uuid: str + target_user_uuid: str + + def parse_extra(self): + self.role_uuid = self.scope["url_route"]["kwargs"].get("role_uuid") + self.flow_uuid = self.scope["url_route"]["kwargs"].get("flow_uuid") + self.target_user_uuid = self.scope["url_route"]["kwargs"].get("user_uuid") + + async def action_progress_monitor(self, data: dict): + """ + Analyzes the learner's progress and provides strengths, gaps, and next actions. + """ + if not await self.can_access_role(self.role_uuid): + return await self.send_error("Forbidden: You do not have access to this role.") + + target_user_id = self.user.id + if self.target_user_uuid and str(self.target_user_uuid) != str(self.user.uuid): + target_user_id = await self.resolve_target_user_id(self.role_uuid, self.user.id, self.target_user_uuid) + if not target_user_id: + return await self.send_error("Forbidden: You cannot monitor this user.") + + await self.send_log(LogType.STATUS, "Progress Monitor is analyzing your onboarding progress...", "monitor") + + monitor_config = await self.get_config_by_type(self.role_uuid, 'monitor') + if not monitor_config: + return await self.send_error("Missing Progress Monitor AgentConfig for this role") + + progress_context = await self.get_role_progress_context( + self.role_uuid, + target_user_id, + flow_uuid=self.flow_uuid, + ) + + feedback = await self.orchestrate( + OnboardingPrompts.progress_monitoring_prompt(progress_context), + monitor_config, + minimum_turns=1, + max_tokens=640, + raise_on_error=True + ) + + await self.send_log(LogType.COMPLETED, "Progress analysis complete.", { + "role_uuid": self.role_uuid, + "feedback": feedback, + "status": progress_context.get("latest_status", "unknown"), + "user_id": target_user_id, + "flow_uuid": self.flow_uuid, + "is_completed": progress_context.get("is_completed", False), + }) + + ### Database Helpers ### + @database_sync_to_async + def get_role_progress_context(self, role_uuid, user_id, flow_uuid=None): + + role = Role.objects.get(uuid=role_uuid) + active_flow = OnboardingFlow.objects.filter(role=role, is_active=True).order_by('-updated_at').first() + scoped_flow = OnboardingFlow.objects.filter(role=role, uuid=flow_uuid).first() if flow_uuid else None + + sessions = OnboardingSession.objects.filter(user_id=user_id, role=role).order_by('-updated_at') + if flow_uuid: + sessions = sessions.filter(Q(flow__uuid=flow_uuid) | Q(flow__isnull=True, state__flow_uuid=str(flow_uuid))) + + latest_session = sessions.first() + + if not latest_session: + return { + "role_uuid": str(role.uuid), + "role_name": role.name, + "latest_status": "not_started", + "flow_uuid": str((scoped_flow or active_flow).uuid) if (scoped_flow or active_flow) else None, + "is_completed": False, + "progress": 0, + } + + state = latest_session.state or {} + flow_for_context = latest_session.flow or scoped_flow or active_flow + structure = flow_for_context.structure if flow_for_context and isinstance(flow_for_context.structure, list) else [] + + # Find the Final Quiz page for grading details + quiz_page = next((p for p in structure if isinstance(p, dict) and p.get("meta", {}).get("page_type") == "final_quiz"), None) + if quiz_page is None and structure: + quiz_page = structure[-1] + + quiz_page_uuid = str(quiz_page.get("uuid") or "") if quiz_page else "" + responses = state.get("responses", {}) + quiz_responses = responses.get(quiz_page_uuid, {}) if isinstance(responses, dict) else {} + + final_quiz_result = state.get("final_quiz_result", {}) + grading_details = final_quiz_result.get("grading_details", []) if isinstance(final_quiz_result, dict) else [] + + # Build QA map for the AI + final_quiz_qa = [] + if quiz_page: + for field in quiz_page.get("fields", []): + key = field.get("key") + detail = next((d for d in grading_details if d.get("key") == key), {}) + final_quiz_qa.append({ + "label": field.get("label"), + "answer": quiz_responses.get(key), + "marked_correct": detail.get("correct"), + "reason": detail.get("reason", "") + }) + + return { + "role_name": role.name, + "latest_status": latest_session.status, + "progress": state.get("progress_percentage", 0), + "completed_modules": state.get("completed_modules", []), + "is_completed": latest_session.status == 'completed', + "final_quiz_qa": final_quiz_qa, + } + + @database_sync_to_async + def resolve_target_user_id(self, role_uuid, requester_id, target_user_uuid): + from apps.accounts.models import Role, User + + role = Role.objects.filter(uuid=role_uuid).first() + requester = User.objects.filter(id=requester_id).first() + target = User.objects.filter(uuid=target_user_uuid).first() + + if not all([role, requester, target]): + return None + + # Check if requester is Owner or Manager + is_privileged = (role.organization.owner.id == requester_id or + (bool(requester.is_manager) and role.organization.members.filter(id=requester_id).exists())) + + if is_privileged and role.members.filter(id=target.id).exists(): + return target.id + return None \ No newline at end of file diff --git a/apps/onboarding/consumers/prompts.py b/apps/onboarding/consumers/prompts.py new file mode 100644 index 0000000..272cd5c --- /dev/null +++ b/apps/onboarding/consumers/prompts.py @@ -0,0 +1,71 @@ +import json + +__all__ = ["OnboardingPrompts"] + +class OnboardingPrompts: + + @staticmethod + def default_system_prompt(): + return ( + "You are a helpful onboarding assistant that helps new employees get onboarded to their new company." + "You may use relevant tools to assist you to provide the best support." + ) + + @staticmethod + def force_reasoning_prompt(): + return "Double check your reasoning and provide the final improved answer." + + @staticmethod + def curriculum_generation_prompt(): + return ( + "Based on available documentation, create an onboarding curriculum for this role. " + "Output ONLY a valid JSON array of 3-5 strings representing module titles. " + "Example: [\"Introduction\", \"Safety\", \"Operations\"]" + ) + + @staticmethod + def knowledge_generation_prompt(topic, context_markdown): + return ( + f"Write a practical onboarding training guide for the topic '{topic}'. " + "Think step-by-step internally before writing the final answer. " + "Use the MCP search context below as your primary source, and call additional tools if needed. " + "If no indexed documents are available, provide a concise best-practice overview and clearly say no indexed documents were found. " + "Use Markdown formatting and do NOT include a table of contents in this section. " + "Generate substantial depth: target 900-1400 words. " + "Include these sections in order: Overview, Core Concepts, Role-Specific Workflow, Practical Examples, Common Pitfalls, and Action Checklist. " + "In Practical Examples, provide at least 2 concrete examples relevant to this role/topic. " + "In Action Checklist, provide at least 8 actionable checklist items.\n\n" + f"Topic: {topic}\n" + f"MCP search context:\n{context_markdown}" + ) + + @staticmethod + def quiz_generation_prompt(question_count, module_briefs): + return ( + "Create a final onboarding quiz that assesses all generated modules. " + f"Output ONLY a valid JSON array of exactly {question_count} question objects. " + "Use a mix of question types: at least 2 short-answer questions and at least 2 multiple-choice questions. " + "For multiple-choice objects: field_type='select', options (4 unique strings), and validation.correct_option. " + "For short-answer objects: field_type='textarea' (or 'text') and validation.accepted_answers (array of valid answers/keywords). " + "Each object MUST include key, label, field_type, required=true, and validation.explanation. " + "Cover all topics with balanced difficulty and avoid ambiguous wording.\n\n" + f"Modules JSON:\n{json.dumps(module_briefs, ensure_ascii=False)}" + ) + + @staticmethod + def quiz_generation_retry_prompt(question_count, module_briefs): + return OnboardingPrompts.quiz_generation_prompt(question_count, module_briefs) + ( + "Return ONLY raw JSON. Do not use markdown fences. Do not include explanations outside JSON." + ) + + @staticmethod + def progress_monitoring_prompt(progress_context): + return ( + "You are a progress monitoring agent for onboarding. " + "Analyze the role onboarding data below and provide concise feedback with:\n" + "1) current status\n2) strengths\n3) gaps\n4) next actions\n" + "Use prior learner question/answer evidence and any saved marking details when available. " + "If evidence is insufficient, explicitly state what is missing.\n" + "Keep it short and practical.\n\n" + f"Progress context JSON:\n{json.dumps(progress_context)}" + ) \ No newline at end of file diff --git a/apps/onboarding/mcp.py b/apps/onboarding/mcp.py index 900eb2d..37a3f94 100644 --- a/apps/onboarding/mcp.py +++ b/apps/onboarding/mcp.py @@ -203,4 +203,6 @@ class MCPRouter: return {'value': value, 'min': min_value, 'max': max_value} tools = _collect_tools(locals()) - _tool_name_to_method = {tool['name']: tool['method'] for tool in tools} \ No newline at end of file + _tool_name_to_method = {tool['name']: tool['method'] for tool in tools} + +mcp_router = MCPRouter() \ No newline at end of file diff --git a/apps/onboarding/migrations/0001_initial.py b/apps/onboarding/migrations/0001_initial.py index 112dd0e..4959842 100644 --- a/apps/onboarding/migrations/0001_initial.py +++ b/apps/onboarding/migrations/0001_initial.py @@ -25,7 +25,6 @@ class Migration(migrations.Migration): ('agent_type', models.CharField(choices=[('curriculum', 'Curriculum Agent (CA)'), ('knowledge', 'Knowledge Agent (KA)'), ('assessment', 'Assessment Agent (AA)'), ('monitor', 'Progress Monitor Agent (PMA)')], max_length=40, verbose_name='Agent Type')), ('llm_config', models.JSONField(blank=True, default=dict, null=True, verbose_name='LLM Configuration')), ('system_prompt', models.TextField(blank=True, default='', verbose_name='System Prompt')), - ('tool_permissions', models.JSONField(blank=True, default=list, null=True, verbose_name='Tool Permissions')), ('organization', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='agent_configs', to='accounts.organization', verbose_name='Organization')), ('role', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='agent_configs', to='accounts.role', verbose_name='Role')), ], diff --git a/apps/onboarding/models.py b/apps/onboarding/models.py index ddc7431..dc6a9dd 100644 --- a/apps/onboarding/models.py +++ b/apps/onboarding/models.py @@ -19,14 +19,13 @@ class AgentConfig(IdentifierMixin, TimeStampMixin, Model): llm_config = JSONField(default=dict, blank=True, null=True, verbose_name=_("LLM Configuration")) system_prompt = TextField(verbose_name=_("System Prompt"), blank=True, default='') - tool_permissions = JSONField(default=list, blank=True, null=True, verbose_name=_("Tool Permissions")) class Meta: verbose_name = _('Agent Config') verbose_name_plural = _('Agent Configs') def __str__(self): - return f"{self.name} ({self.get_agent_type_display()})" + return f"{self.name} ({self.organization.name})" class OnboardingFlow(IdentifierMixin, TimeStampMixin, Model): diff --git a/apps/onboarding/routing.py b/apps/onboarding/routing.py index 38343af..411ed84 100644 --- a/apps/onboarding/routing.py +++ b/apps/onboarding/routing.py @@ -1,7 +1,9 @@ from django.urls import path -from .consumers import OnboardingConsumer +from apps.onboarding.consumers import OnboardingChatConsumer, OnboardingGenerateConsumer, OnboardingProgressConsumer websocket_urlpatterns = [ - path("ws/onboarding//", OnboardingConsumer.as_asgi()), + path("ws/onboarding/chat//", OnboardingChatConsumer.as_asgi()), + path("ws/onboarding/generate//", OnboardingGenerateConsumer.as_asgi()), + path("ws/onboarding/progress////", OnboardingProgressConsumer.as_asgi()), ] \ No newline at end of file diff --git a/apps/onboarding/serializers.py b/apps/onboarding/serializers.py index a496d54..e2bf20b 100644 --- a/apps/onboarding/serializers.py +++ b/apps/onboarding/serializers.py @@ -11,7 +11,7 @@ class AgentConfigSerializer(ModelSerializer): model = AgentConfig fields = [ 'id', 'uuid', 'organization', 'role', 'name', 'agent_type', - 'system_prompt', 'llm_config', 'tool_permissions', + 'system_prompt', 'llm_config', 'created_at', 'updated_at' ] read_only_fields = ['id', 'uuid', 'created_at', 'updated_at'] diff --git a/apps/onboarding/tests/test_api.py b/apps/onboarding/tests/test_api.py index bf67114..3e147c3 100644 --- a/apps/onboarding/tests/test_api.py +++ b/apps/onboarding/tests/test_api.py @@ -72,7 +72,6 @@ class OnboardingApiTests(TestCase): 'agent_type': 'monitor', 'system_prompt': 'Monitor progress', 'llm_config': {'model': 'local'}, - 'tool_permissions': ['read'], }, format='json') self.assertEqual(response.status_code, HTTP_201_CREATED) @@ -90,7 +89,7 @@ class OnboardingApiTests(TestCase): 'agent_type': 'knowledge', 'system_prompt': 'Updated', 'llm_config': {'model': 'local'}, - 'tool_permissions': ['read'], + }, format='json', ) diff --git a/apps/onboarding/tests/test_models.py b/apps/onboarding/tests/test_models.py index 073088a..ed7cf80 100644 --- a/apps/onboarding/tests/test_models.py +++ b/apps/onboarding/tests/test_models.py @@ -26,8 +26,7 @@ class OnboardingModelTests(TestCase): name='Operator Knowledge Agent', agent_type='knowledge', llm_config={'model_id': 'x'}, - system_prompt='Assist user', - tool_permissions=['search'], + system_prompt='Assist user' ) self.assertEqual(config.organization, self.org) @@ -35,7 +34,6 @@ class OnboardingModelTests(TestCase): self.assertEqual(config.agent_type, 'knowledge') self.assertEqual(config.llm_config, {'model_id': 'x'}) self.assertEqual(config.system_prompt, 'Assist user') - self.assertEqual(config.tool_permissions, ['search']) self.assertIsNotNone(config.id) self.assertIsNotNone(config.uuid) self.assertIsNotNone(config.created_at) diff --git a/apps/onboarding/utils/content_moderator.py b/apps/onboarding/utils/content_moderator.py new file mode 100644 index 0000000..5d6f601 --- /dev/null +++ b/apps/onboarding/utils/content_moderator.py @@ -0,0 +1,14 @@ +from better_profanity import profanity + +profanity.load_censor_words() + +class ContentModerator: + def is_clean(self, text: str) -> bool: + if not isinstance(text, str): + return True + return not profanity.contains_profanity(text.strip()) + + def censor(self, text: str) -> str: + if not isinstance(text, str): + return text + return profanity.censor(text) diff --git a/apps/onboarding/viewsets.py b/apps/onboarding/viewsets.py index fa750a2..25da142 100644 --- a/apps/onboarding/viewsets.py +++ b/apps/onboarding/viewsets.py @@ -255,7 +255,6 @@ class AgentConfigViewSet(ModelViewSet): agent_type=agent_type, system_prompt=str(request.data.get('system_prompt') or ''), llm_config=request.data.get('llm_config') or {}, - tool_permissions=request.data.get('tool_permissions') or [], ) serializer = self.get_serializer(config) @@ -269,14 +268,13 @@ class AgentConfigViewSet(ModelViewSet): 'agent_type': request.data.get('agent_type'), 'system_prompt': request.data.get('system_prompt'), 'llm_config': request.data.get('llm_config'), - 'tool_permissions': request.data.get('tool_permissions'), } for field, value in updatable_fields.items(): if value is not None: setattr(config, field, value) - config.save(update_fields=['name', 'agent_type', 'system_prompt', 'llm_config', 'tool_permissions', 'updated_at']) + config.save(update_fields=['name', 'agent_type', 'system_prompt', 'llm_config', 'updated_at']) serializer = self.get_serializer(config) return Response(serializer.data, status=HTTP_200_OK) @@ -291,10 +289,8 @@ class AgentConfigViewSet(ModelViewSet): config.system_prompt = request.data.get('system_prompt') if 'llm_config' in request.data: config.llm_config = request.data.get('llm_config') - if 'tool_permissions' in request.data: - config.tool_permissions = request.data.get('tool_permissions') - config.save(update_fields=['name', 'agent_type', 'system_prompt', 'llm_config', 'tool_permissions', 'updated_at']) + config.save(update_fields=['name', 'agent_type', 'system_prompt', 'llm_config', 'updated_at']) serializer = self.get_serializer(config) return Response(serializer.data, status=HTTP_200_OK) @@ -528,9 +524,6 @@ class OnboardingSessionViewSet(ModelViewSet): if not config: return "You are a helpful onboarding assistant." base_prompt = config.system_prompt or "You are a helpful onboarding assistant." - permissions = config.tool_permissions or [] - if permissions: - return f"{base_prompt}\n\nAllowed tools: {', '.join(str(p) for p in permissions)}" return base_prompt def _get_knowledge_agent_config(self, session): diff --git a/requirements/django.txt b/requirements/django.txt index 078be5e..33c09a4 100644 --- a/requirements/django.txt +++ b/requirements/django.txt @@ -1,3 +1,4 @@ +better-profanity==0.7.0 celery[redis]==5.6.2 channels==4.3.2 channels-redis==4.3.0 diff --git a/site/src/stores/agentStore.ts b/site/src/stores/agentStore.ts index ffad414..aab08c7 100644 --- a/site/src/stores/agentStore.ts +++ b/site/src/stores/agentStore.ts @@ -30,7 +30,7 @@ export const useAgentStore = defineStore('agent', () => { } const wsProtocol = window.location.protocol === 'https:' ? 'wss' : 'ws' - const wsUrl = `${wsProtocol}://${window.location.host}/ws/onboarding/${id}/` + const wsUrl = `${wsProtocol}://${window.location.host}/ws/onboarding/chat/${id}/` socket.value = new WebSocket(wsUrl) socket.value.onopen = () => { @@ -102,8 +102,8 @@ export const useAgentStore = defineStore('agent', () => { executionStatus.value = 'running' socket.value.send( JSON.stringify({ + action: 'message', query: data.query, - role_uuid: data.role_uuid, max_tokens: data.max_tokens, }), ) diff --git a/site/src/stores/onboardingAgentStore.ts b/site/src/stores/onboardingAgentStore.ts index 411a0e4..db1a483 100644 --- a/site/src/stores/onboardingAgentStore.ts +++ b/site/src/stores/onboardingAgentStore.ts @@ -30,7 +30,7 @@ export const useOnboardingAgentStore = defineStore('onboarding-agent', () => { } const wsProtocol = window.location.protocol === 'https:' ? 'wss' : 'ws' - const wsUrl = `${wsProtocol}://${window.location.host}/ws/onboarding/${id}/` + const wsUrl = `${wsProtocol}://${window.location.host}/ws/onboarding/generate/${id}/` socket.value = new WebSocket(wsUrl) socket.value.onopen = () => { diff --git a/site/src/views/AgentDetailView.vue b/site/src/views/AgentDetailView.vue index 2f8b4cb..9df3fce 100644 --- a/site/src/views/AgentDetailView.vue +++ b/site/src/views/AgentDetailView.vue @@ -27,7 +27,6 @@ const agentUuid = route.params.agentUuid as string const agent = ref({ name: 'Loading...', - description: '', status: 'idle', uuid: agentUuid, agent_type: 'knowledge', @@ -193,10 +192,6 @@ onUnmounted(() => { - - {{ agent.description || 'No description available' }} - - Configuration
diff --git a/site/src/views/ProgressDetailView.vue b/site/src/views/ProgressDetailView.vue index 38e9cc5..eff8aed 100644 --- a/site/src/views/ProgressDetailView.vue +++ b/site/src/views/ProgressDetailView.vue @@ -79,9 +79,9 @@ const buildProgressReport = (session: ProgressSessionApi | null) => { ].join('\n') } -const websocketUrl = (id: string) => { +const websocketUrl = (id: string, userUuid?: string, flowUuid?: string) => { const protocol = window.location.protocol === 'https:' ? 'wss' : 'ws' - return `${protocol}://${window.location.host}/ws/onboarding/${id}/` + return `${protocol}://${window.location.host}/ws/onboarding/progress/${id}/${flowUuid}/${userUuid}/` } const closeMonitorSocket = () => { @@ -107,7 +107,7 @@ const runProgressMonitor = async () => { try { closeMonitorSocket() - const ws = new WebSocket(websocketUrl(roleId.value)) + const ws = new WebSocket(websocketUrl(roleId.value, selectedUserUuid.value || undefined, selectedFlowUuid.value || undefined)) monitorSocket.value = ws await new Promise((resolve, reject) => { @@ -120,14 +120,7 @@ const runProgressMonitor = async () => { }, 30000) ws.onopen = () => { - ws.send( - JSON.stringify({ - action: 'progress_monitor', - role_uuid: roleId.value, - user_uuid: selectedUserUuid.value || undefined, - flow_uuid: selectedFlowUuid.value || undefined, - }), - ) + ws.send(JSON.stringify({action: 'progress_monitor'}),) } ws.onmessage = (event) => {