import json
import re
from uuid import uuid4

from channels.db import database_sync_to_async

from apps.accounts.models import Role
from apps.onboarding.consumers.base import BaseOnboardingConsumer, LogType
from apps.onboarding.consumers.prompts import OnboardingPrompts
from apps.onboarding.models import OnboardingFlow

__all__ = ["OnboardingGenerateConsumer"]


class OnboardingGenerateConsumer(BaseOnboardingConsumer):
    """
    Websocket consumer that generates a complete onboarding flow for a role.

    Route: /ws/onboarding/generate/{role_uuid}/
    (NOTE(review): the placeholder was missing from the original docstring;
    ``role_uuid`` is the URL kwarg read by ``parse_extra`` - confirm against
    the routing table.)

    The pipeline runs three phases (curriculum, knowledge, assessment) and
    persists the resulting page structure on the role's ``OnboardingFlow``.
    """

    # Upper bound on the number of options stored for a ``select`` question.
    MAX_SELECT_OPTIONS = 5

    role_uuid: str

    def parse_extra(self):
        """Read ``role_uuid`` from the URL route kwargs (``None`` if absent)."""
        self.role_uuid = self.scope["url_route"]["kwargs"].get("role_uuid")

    async def action_start_full_onboarding(self, data: dict) -> None:
        """
        Entry point for the "start full onboarding" action.

        Rejects the request when the optional ``message`` fails moderation or
        when the role access check fails; otherwise runs the generation
        pipeline for the role identified by ``self.role_uuid``.
        """
        user_message = data.get("message")
        if user_message and not self.moderator.is_clean(user_message):
            return await self.send_error("Your message did not pass moderation. Please revise your input.")

        if not await self.can_access_role(self.role_uuid):
            return await self.send_error("You are not a member of this role or do not have permission to access it")

        await self.run_pipeline(await self.get_role(self.role_uuid))

    async def run_pipeline(self, role: Role) -> None:
        """
        Full orchestration pipeline for generating curriculum, knowledge
        content, and assessment quiz for a given role.

        Phase 1 asks the curriculum agent for a list of topics, phase 2
        generates one content page per topic (grounded in knowledge-base
        search results), and phase 3 generates the final quiz, retrying once
        and falling back to a built-in quiz when the output cannot be used.
        The assembled structure is saved via ``save_full_flow``.
        """
        self.logger.info(
            "Starting onboarding generation pipeline for role=%s (uuid=%s) by user=%s",
            role.name,
            role.uuid,
            self.user.full_name,
        )

        # ---- Phase 1: curriculum (list of topics) ----
        await self.send_log(LogType.STATUS, "Phase 1: Generating Curriculum...", "curriculum")
        ca_config = await self.get_config_by_type(role.uuid, 'curriculum')
        if not ca_config:
            return await self.send_error("Missing curriculum AgentConfig for this role")

        ca_response = await self.orchestrate(
            OnboardingPrompts.curriculum_generation_prompt(),
            ca_config,
            minimum_turns=1,
            max_tokens=384,
        )
        topics = self._extract_json_list(ca_response)
        if not topics:
            return await self.send_log(
                LogType.ERROR,
                "Curriculum generation returned no topics",
                f"Curriculum generation produced no topics for role={role.name} (uuid={role.uuid})",
            )

        # ---- Phase 2: one knowledge page per topic ----
        full_structure = []
        module_briefs = []
        for index, topic in enumerate(topics):
            await self.send_log(LogType.STATUS, f"Phase 2: Researching {topic}...", "knowledge")
            ka_config = await self.get_config_by_type(role.uuid, 'knowledge')
            if not ka_config:
                return await self.send_error("Missing knowledge AgentConfig for this role")

            knowledge_hits = await self.fetch_knowledge_context(role.uuid, topic)
            context_markdown = self.format_knowledge_context(knowledge_hits)
            ka_response = await self.orchestrate(
                OnboardingPrompts.knowledge_generation_prompt(topic, context_markdown),
                ka_config,
                minimum_turns=2,
                max_tokens=2400,
            )

            full_structure.append({
                "title": topic,
                "body": ka_response,
                "order": index,
                "fields": [],
                "meta": {
                    "topic_index": index,
                    "table_of_contents": [str(item) for item in topics],
                },
            })
            # Short excerpt of each module, passed to the quiz prompt below.
            module_briefs.append({
                "topic": str(topic),
                "summary_excerpt": str(ka_response)[:1200],
            })

        # ---- Phase 3: final assessment quiz ----
        await self.send_log(LogType.STATUS, "Phase 3: Creating final assessment quiz...", "assessment")
        aa_config = await self.get_config_by_type(role.uuid, 'assessment')
        if not aa_config:
            return await self.send_error("Missing assessment AgentConfig for this role")

        # Best effort: ask the tool router for a question count, default to 8.
        question_count = 8
        try:
            random_result = await self.router.handle_tool_call("random_int", {"min": 6, "max": 10})
            if isinstance(random_result, dict) and isinstance(random_result.get("value"), int):
                question_count = int(random_result["value"])
        except Exception as exc:
            # A tool failure must not abort the pipeline; record it and
            # continue with the default instead of swallowing it silently.
            self.logger.warning("random_int tool call failed, using default question count: %s", exc)
            question_count = 8

        quiz_response = await self.orchestrate(
            OnboardingPrompts.quiz_generation_prompt(question_count, module_briefs),
            aa_config,
            minimum_turns=1,
            max_tokens=1600,
        )
        quiz_fields = self._sanitize_quiz_fields(self._extract_json_list(quiz_response))

        if not quiz_fields:
            await self.send_log(
                LogType.STATUS,
                "Assessment output invalid, retrying quiz generation...",
                "assessment",
            )
            retry_response = await self.orchestrate(
                OnboardingPrompts.quiz_generation_retry_prompt(question_count, module_briefs),
                aa_config,
                minimum_turns=1,
                max_tokens=1600,
            )
            quiz_fields = self._sanitize_quiz_fields(self._extract_json_list(retry_response))

            if not quiz_fields:
                await self.send_log(
                    LogType.STATUS,
                    "Assessment output still invalid. Using fallback final quiz.",
                    "assessment",
                )
                quiz_fields = self._build_fallback_quiz_fields(
                    [str(topic) for topic in topics],
                    count=question_count,
                )

        full_structure.append({
            "title": "Final Assessment Quiz",
            "body": (
                "### Final Quiz\n"
                "Answer all questions below. You need **80%** to complete onboarding. "
                "You can update answers and submit when ready."
            ),
            "order": len(full_structure),
            "fields": quiz_fields,
            "meta": {
                "page_type": "final_quiz",
                "pass_mark": 80,
            },
        })

        await self.save_full_flow(role.uuid, full_structure)
        self.logger.info(
            "Full onboarding generation completed: role_uuid=%s pages=%s",
            role.uuid,
            len(full_structure),
        )
        await self.send_log(LogType.COMPLETED, "Onboarding pipeline complete and structure saved.")

    async def fetch_knowledge_context(self, role_uuid, topic):
        """
        Query the ``search_knowledge`` tool for content related to ``topic``.

        Returns the tool result when it is a list, otherwise an empty list.
        Any exception is reported to the client as an ERROR log and also
        results in an empty list, so a failed lookup never aborts the pipeline.
        """
        query = f"onboarding training content for {topic}"
        await self.send_log(
            LogType.TOOL_START,
            "Accessing knowledge base: search_knowledge...",
            {"query": query, "role_uuid": str(role_uuid)},
        )
        try:
            result = await self.router.handle_tool_call(
                "search_knowledge",
                {
                    "query": query,
                    "role_uuid": str(role_uuid),
                },
            )
            await self.send_log(
                LogType.TOOL_RESULT,
                f"Retrieved {len(result) if isinstance(result, list) else 0} knowledge chunk(s)",
                result,
            )
            return result if isinstance(result, list) else []
        except Exception as exc:
            await self.send_log(LogType.ERROR, f"Knowledge retrieval failed for topic '{topic}': {exc}")
            return []

    def _coerce_list_payload(self, payload):
        """
        Return the list carried by ``payload``.

        A list is returned as-is. For a dict, the first of the keys
        ``questions``, ``items``, ``fields``, ``quiz`` holding a list is
        returned. Anything else yields an empty list.
        """
        if isinstance(payload, list):
            return payload
        if isinstance(payload, dict):
            for key in ('questions', 'items', 'fields', 'quiz'):
                value = payload.get(key)
                if isinstance(value, list):
                    return value
        return []

    def _extract_json_list(self, text: str) -> list:
        """
        Extracts a JSON list from model output, tolerating wrappers and
        markdown fences.

        Candidates are the whole text followed by the contents of every
        fenced code block. Each candidate is first parsed as a whole; if that
        yields nothing, it is scanned for the first ``[`` or ``{`` from which
        a JSON value decoding to a non-empty list can be read. Returns ``[]``
        when nothing usable is found.
        """
        if not text:
            return []

        raw_text = str(text)
        candidate_texts = [raw_text.strip()]
        for block in re.findall(r'```(?:json)?\s*([\s\S]*?)```', raw_text, re.IGNORECASE):
            candidate_texts.append(block.strip())

        decoder = json.JSONDecoder()
        # JSONDecodeError is a ValueError; RecursionError covers pathologically
        # nested input. Anything else would be a genuine bug and should surface.
        decode_errors = (ValueError, RecursionError)

        for candidate in candidate_texts:
            if not candidate:
                continue

            try:
                parsed = json.loads(candidate)
            except decode_errors:
                parsed = None
            coerced = self._coerce_list_payload(parsed)
            if coerced:
                return coerced

            # Fall back to decoding from each possible JSON start position,
            # which tolerates prose before/after the payload.
            for idx, char in enumerate(candidate):
                if char not in '[{':
                    continue
                try:
                    parsed, _ = decoder.raw_decode(candidate[idx:])
                except decode_errors:
                    continue
                coerced = self._coerce_list_payload(parsed)
                if coerced:
                    return coerced

        return []

    def format_knowledge_context(self, knowledge_hits):
        """
        Render up to five knowledge hits as a text block for the prompt.

        Each hit becomes ``[n] Source: ... | Relevance: ...`` followed by its
        content truncated to 1600 characters. Non-dict hits are rendered with
        placeholder values. Returns a fixed notice when there are no hits.
        """
        if not knowledge_hits:
            return "No indexed MCP documents found for this role/topic."

        lines = []
        for idx, item in enumerate(knowledge_hits[:5]):
            hit = item if isinstance(item, dict) else {}
            source = hit.get("source", "Unknown Source")
            relevance = hit.get("relevance")
            safe_content = str(hit.get("content", "")).strip()[:1600]
            lines.append(
                f"[{idx + 1}] Source: {source} | Relevance: {relevance}\n{safe_content}"
            )
        return "\n\n".join(lines)

    def _sanitize_quiz_fields(self, raw_fields):
        """
        Validate and normalise model-generated quiz fields.

        Non-dict entries, entries without a label, ``select`` questions with
        fewer than two distinct options and text questions without any
        accepted answer are dropped. Keys are lower-cased, space-free and
        unique within the returned list. Unknown field types become
        ``select``.

        Returns a list of field dicts with ``key``, ``label``, ``field_type``,
        ``options``, ``required`` and ``validation``.
        """
        sanitized = []
        seen_keys = set()

        for index, field in enumerate(raw_fields or []):
            if not isinstance(field, dict):
                continue

            default_key = f'final_quiz_q_{index + 1}'
            key = str(field.get('key') or default_key).strip().lower().replace(' ', '_')
            if not key:
                key = default_key
            if key in seen_keys:
                # Suffix with the position; keep bumping if that name is also
                # taken so two fields can never share a key.
                base_key = key
                key = f'{base_key}_{index + 1}'
                bump = 1
                while key in seen_keys:
                    bump += 1
                    key = f'{base_key}_{index + 1}_{bump}'
            seen_keys.add(key)

            label = str(field.get('label') or '').strip()
            if not label:
                continue

            field_type = str(field.get('field_type') or 'select').strip().lower()
            if field_type not in ('select', 'text', 'textarea'):
                field_type = 'select'

            raw_options = field.get('options') if isinstance(field.get('options'), list) else []
            options = []
            for option in raw_options:
                option_text = str(option).strip()
                if option_text and option_text not in options:
                    options.append(option_text)

            validation = field.get('validation') if isinstance(field.get('validation'), dict) else {}
            explanation = str(validation.get('explanation') or '')

            if field_type == 'select':
                if len(options) < 2:
                    continue

                correct_option = str(validation.get('correct_option') or '').strip()
                if correct_option not in options:
                    # The model did not name a valid answer; fall back to the
                    # first option so the question stays gradable.
                    correct_option = options[0]

                # Cap the option count, but never drop the correct answer:
                # if truncation removed it, it replaces the last kept option.
                limited_options = options[:self.MAX_SELECT_OPTIONS]
                if correct_option not in limited_options:
                    limited_options[-1] = correct_option

                sanitized.append({
                    'key': key,
                    'label': label,
                    'field_type': 'select',
                    'options': limited_options,
                    'required': True,
                    'validation': {
                        'correct_option': correct_option,
                        'explanation': explanation,
                    },
                })
                continue

            accepted_answers_raw = validation.get('accepted_answers')
            if isinstance(accepted_answers_raw, list):
                accepted_answers = [
                    str(item).strip() for item in accepted_answers_raw if str(item).strip()
                ]
            else:
                accepted_single = str(validation.get('correct_answer') or '').strip()
                accepted_answers = [accepted_single] if accepted_single else []

            if not accepted_answers:
                continue

            sanitized.append({
                'key': key,
                'label': label,
                'field_type': 'textarea' if field_type == 'textarea' else 'text',
                'options': [],
                'required': True,
                'validation': {
                    'accepted_answers': accepted_answers,
                    'explanation': explanation,
                },
            })

        return sanitized

    def _build_fallback_quiz_fields(self, topics, count=8):
        """
        Build a deterministic quiz used when model output is unusable.

        Produces ``count`` questions cycling through ``topics`` (or a generic
        topic when none are usable). Every third question, starting with the
        first, is a free-text ``textarea``; the rest are ``select`` questions
        whose first option is the correct one.
        """
        safe_topics = [str(topic).strip() for topic in (topics or []) if str(topic).strip()]
        if not safe_topics:
            safe_topics = ['onboarding fundamentals']

        fallback_fields = []
        for index in range(count):
            topic = safe_topics[index % len(safe_topics)]
            key = f'final_quiz_q_{index + 1}'

            if index % 3 == 0:
                fallback_fields.append({
                    'key': key,
                    'label': f"In one or two sentences, what is the safest approach when handling {topic}?",
                    'field_type': 'textarea',
                    'options': [],
                    'required': True,
                    'validation': {
                        'accepted_answers': [
                            'best practices',
                            'documentation',
                            'quality',
                            'compliance',
                        ],
                        'explanation': 'Good answers reference documented best practices, quality checks, and compliance.',
                    },
                })
                continue

            correct = f"Use documented best practices for {topic}."
            options = [
                correct,
                f"Skip review steps for {topic} to move faster.",
                f"Rely only on assumptions instead of evidence for {topic}.",
                f"Ignore quality and compliance checks in {topic} tasks.",
            ]
            fallback_fields.append({
                'key': key,
                'label': f"Which approach is most appropriate when working on {topic}?",
                'field_type': 'select',
                'options': options,
                'required': True,
                'validation': {
                    'correct_option': correct,
                    'explanation': f"{correct} balances reliability, quality, and role expectations.",
                },
            })

        return fallback_fields

    def _normalize_structure(self, structure):
        """
        Convert the generated page list into the shape that gets persisted.

        Every page and field receives a fresh UUID, and all values are coerced
        to predictable types. A page that is not a dict is normalised to an
        empty page with default title/order instead of raising, and a page
        whose ``fields`` is not a list/tuple is treated as having no fields.
        """
        normalized_pages = []

        for index, page in enumerate(structure or []):
            # Treat malformed pages as empty so one bad entry cannot abort
            # the save with an AttributeError/TypeError.
            page_data = page if isinstance(page, dict) else {}
            raw_fields = page_data.get('fields')
            if not isinstance(raw_fields, (list, tuple)):
                raw_fields = []

            fields = []
            for field_index, field in enumerate(raw_fields):
                if not isinstance(field, dict):
                    continue

                key = str(field.get('key') or f'field_{field_index + 1}')
                raw_options = field.get('options') if isinstance(field.get('options'), list) else []
                options = [str(option) for option in raw_options if str(option).strip()]
                validation = field.get('validation') if isinstance(field.get('validation'), dict) else {}

                correct_option = validation.get('correct_option')
                if correct_option is not None:
                    correct_option = str(correct_option)

                accepted_raw = validation.get('accepted_answers')
                if not isinstance(accepted_raw, list):
                    accepted_raw = []

                normalized_validation = {
                    # A correct option that is not selectable is discarded.
                    'correct_option': correct_option if correct_option in options else None,
                    'accepted_answers': [
                        str(item).strip() for item in accepted_raw if str(item).strip()
                    ],
                    'explanation': str(validation.get('explanation') or ''),
                }

                fields.append({
                    'uuid': str(uuid4()),
                    'key': key,
                    'label': str(field.get('label') or key.replace('_', ' ').title()),
                    'field_type': str(field.get('field_type') or 'text'),
                    'required': bool(field.get('required', False)),
                    'options': options,
                    'default_value': field.get('default_value', ''),
                    'validation': normalized_validation,
                })

            page_title = page_data.get('title')
            page_body = page_data.get('body')
            page_order = page_data.get('order')
            page_meta = page_data.get('meta')

            normalized_pages.append({
                'uuid': str(uuid4()),
                'title': str(page_title or f'Module {index + 1}'),
                'body': str(page_body or ''),
                'order': int(page_order if isinstance(page_order, int) else index),
                'fields': fields,
                'meta': page_meta if isinstance(page_meta, dict) else {},
            })

        return normalized_pages

    @database_sync_to_async
    def save_full_flow(self, role_uuid, structure):
        """
        Saves the final nested structure to the OnboardingFlow model.

        Looks up the role by UUID, normalises ``structure`` and creates or
        updates the role's ``OnboardingFlow``, marking it active. Returns the
        flow instance. ``Role.objects.get`` raises if the role does not exist.
        """
        role = Role.objects.get(uuid=role_uuid)
        normalized_structure = self._normalize_structure(structure)

        flow, _ = OnboardingFlow.objects.update_or_create(
            role=role,
            defaults={
                'title': f"AI Onboarding: {role.name}",
                'structure': normalized_structure,
                'is_active': True,
            },
        )
        return flow