Dynavera/apps/onboarding/consumers/progress.py

143 lines
6.2 KiB
Python
Raw Normal View History

from django.db.models import Q
from channels.db import database_sync_to_async
from apps.accounts.models import Role
from apps.onboarding.consumers.base import BaseOnboardingConsumer, LogType
from apps.onboarding.consumers.prompts import OnboardingPrompts
from apps.onboarding.models import OnboardingFlow, OnboardingSession
__all__ = ["OnboardingProgressConsumer"]
class OnboardingProgressConsumer(BaseOnboardingConsumer):
    """
    Consumer that analyzes a learner's onboarding progress for a role.

    Route: /ws/onboarding/progress/<role_uuid>/<flow_uuid>/<user_uuid>/

    Helpers such as ``can_access_role``, ``send_error``, ``send_log``,
    ``get_config_by_type`` and ``stream_llm`` are not defined in this class;
    they are presumably inherited from ``BaseOnboardingConsumer``.
    """

    # Populated from the URL route kwargs by ``parse_extra``.
    role_uuid: str
    flow_uuid: str
    target_user_uuid: str

    def parse_extra(self):
        """Copy the route kwargs from the connection scope onto the consumer."""
        route_kwargs = self.scope["url_route"]["kwargs"]
        # ``.get`` leaves an attribute as None when the route omits that kwarg.
        self.role_uuid = route_kwargs.get("role_uuid")
        self.flow_uuid = route_kwargs.get("flow_uuid")
        self.target_user_uuid = route_kwargs.get("user_uuid")

    async def action_progress_monitor(self, data: dict):
        """
        Analyzes the learner's progress and provides strengths, gaps, and next actions.

        Args:
            data: Incoming action payload. It is not read by this handler.

        Sends an error and stops when the requester cannot access the role,
        may not monitor the requested user, or the role has no ``monitor``
        agent configuration. Otherwise streams the LLM feedback and finishes
        with a ``COMPLETED`` log carrying the feedback and progress summary.
        """
        if not await self.can_access_role(self.role_uuid):
            return await self.send_error("Forbidden: You do not have access to this role.")

        # Default to the requester's own progress; only a different user in the
        # route requires the privilege check in ``resolve_target_user_id``.
        target_user_id = self.user.id
        if self.target_user_uuid and str(self.target_user_uuid) != str(self.user.uuid):
            target_user_id = await self.resolve_target_user_id(
                self.role_uuid,
                self.user.id,
                self.target_user_uuid,
            )
            if not target_user_id:
                return await self.send_error("Forbidden: You cannot monitor this user.")

        await self.send_log(
            LogType.STATUS,
            "Progress Monitor is analyzing your onboarding progress...",
            "monitor",
        )

        monitor_config = await self.get_config_by_type(self.role_uuid, 'monitor')
        if not monitor_config:
            return await self.send_error("Missing Progress Monitor AgentConfig for this role")

        progress_context = await self.get_role_progress_context(
            self.role_uuid,
            target_user_id,
            flow_uuid=self.flow_uuid,
        )
        feedback = await self.stream_llm(
            monitor_config,
            OnboardingPrompts.progress_monitoring_prompt(progress_context),
            max_tokens=640,
        )
        await self.send_log(LogType.COMPLETED, "Progress analysis complete.", {
            "role_uuid": self.role_uuid,
            "feedback": feedback,
            "status": progress_context.get("latest_status", "unknown"),
            "user_id": target_user_id,
            "flow_uuid": self.flow_uuid,
            "is_completed": progress_context.get("is_completed", False),
        })

    @database_sync_to_async
    def get_role_progress_context(self, role_uuid, user_id, flow_uuid=None):
        """
        Build the progress summary that is handed to the progress-monitor prompt.

        Args:
            role_uuid: UUID of the role whose onboarding is being inspected.
            user_id: Primary key of the learner.
            flow_uuid: Optional flow UUID used to narrow the session lookup.

        Returns:
            A dict with the keys ``role_uuid``, ``role_name``, ``latest_status``,
            ``flow_uuid``, ``is_completed``, ``progress``, ``completed_modules``
            and ``final_quiz_qa``. Both the "not started" and the "has session"
            paths return the same key set.

        Raises:
            Role.DoesNotExist: If no role matches ``role_uuid``.
        """
        role = Role.objects.get(uuid=role_uuid)
        active_flow = (
            OnboardingFlow.objects.filter(role=role, is_active=True)
            .order_by('-updated_at')
            .first()
        )
        scoped_flow = (
            OnboardingFlow.objects.filter(role=role, uuid=flow_uuid).first()
            if flow_uuid
            else None
        )

        sessions = OnboardingSession.objects.filter(user_id=user_id, role=role).order_by('-updated_at')
        if flow_uuid:
            # Match sessions linked to the flow, plus sessions with no flow
            # relation whose state records the same flow UUID.
            sessions = sessions.filter(
                Q(flow__uuid=flow_uuid)
                | Q(flow__isnull=True, state__flow_uuid=str(flow_uuid))
            )
        latest_session = sessions.first()

        if not latest_session:
            fallback_flow = scoped_flow or active_flow
            return {
                "role_uuid": str(role.uuid),
                "role_name": role.name,
                "latest_status": "not_started",
                "flow_uuid": str(fallback_flow.uuid) if fallback_flow else None,
                "is_completed": False,
                "progress": 0,
                "completed_modules": [],
                "final_quiz_qa": [],
            }

        # Treat a missing or non-dict state as empty instead of failing on .get().
        raw_state = latest_session.state
        state = raw_state if isinstance(raw_state, dict) else {}

        flow_for_context = latest_session.flow or scoped_flow or active_flow
        structure = []
        if flow_for_context and isinstance(flow_for_context.structure, list):
            structure = flow_for_context.structure

        quiz_page = self._find_quiz_page(structure)
        return {
            "role_uuid": str(role.uuid),
            "role_name": role.name,
            "latest_status": latest_session.status,
            "flow_uuid": str(flow_for_context.uuid) if flow_for_context else None,
            "is_completed": latest_session.status == 'completed',
            "progress": state.get("progress_percentage", 0),
            "completed_modules": state.get("completed_modules", []),
            "final_quiz_qa": self._build_final_quiz_qa(quiz_page, state),
        }

    @staticmethod
    def _find_quiz_page(structure):
        """Return the page dict used for quiz grading details, or None.

        Prefers the first page whose ``meta.page_type`` is ``"final_quiz"`` and
        falls back to the last entry of ``structure``. Entries that are not
        dicts (and a ``meta`` that is not a dict) are ignored rather than
        raising.
        """
        for page in structure:
            if not isinstance(page, dict):
                continue
            meta = page.get("meta")
            if isinstance(meta, dict) and meta.get("page_type") == "final_quiz":
                return page

        last_entry = structure[-1] if structure else None
        return last_entry if isinstance(last_entry, dict) else None

    @staticmethod
    def _build_final_quiz_qa(quiz_page, state):
        """Pair each quiz field with the learner's answer and its grading detail.

        Returns a list of dicts with the keys ``label``, ``answer``,
        ``marked_correct`` and ``reason``; the list is empty when there is no
        quiz page. Malformed (non-dict / non-list) pieces of ``state`` or of
        the page are treated as absent.
        """
        if not quiz_page:
            return []

        quiz_page_uuid = str(quiz_page.get("uuid") or "")

        responses = state.get("responses", {})
        quiz_responses = responses.get(quiz_page_uuid, {}) if isinstance(responses, dict) else {}
        if not isinstance(quiz_responses, dict):
            quiz_responses = {}

        final_quiz_result = state.get("final_quiz_result", {})
        grading_details = (
            final_quiz_result.get("grading_details", [])
            if isinstance(final_quiz_result, dict)
            else []
        )
        if not isinstance(grading_details, (list, tuple)):
            grading_details = []
        grading_details = [d for d in grading_details if isinstance(d, dict)]

        fields = quiz_page.get("fields", [])
        if not isinstance(fields, (list, tuple)):
            fields = []

        final_quiz_qa = []
        for field in fields:
            if not isinstance(field, dict):
                continue
            key = field.get("key")
            # First grading detail with a matching key wins; {} when none match.
            detail = next((d for d in grading_details if d.get("key") == key), {})
            final_quiz_qa.append({
                "label": field.get("label"),
                "answer": quiz_responses.get(key),
                "marked_correct": detail.get("correct"),
                "reason": detail.get("reason", ""),
            })
        return final_quiz_qa

    @database_sync_to_async
    def resolve_target_user_id(self, role_uuid, requester_id, target_user_uuid):
        """
        Return the target user's id when the requester may monitor that user.

        Args:
            role_uuid: UUID of the role the monitoring request is scoped to.
            requester_id: Primary key of the user making the request.
            target_user_uuid: UUID of the user whose progress is requested.

        Returns:
            The target user's primary key when the requester is the owner of
            the role's organization, or a manager who is a member of it, and
            the target is a member of the role; otherwise ``None``.
        """
        # ``User`` is not imported at module level in this file; ``Role`` is.
        from apps.accounts.models import User

        role = Role.objects.filter(uuid=role_uuid).first()
        requester = User.objects.filter(id=requester_id).first()
        target = User.objects.filter(uuid=target_user_uuid).first()
        if not all([role, requester, target]):
            return None

        # A role without an organization cannot grant monitoring privileges.
        organization = role.organization
        if organization is None:
            return None

        # Check if requester is Owner or Manager
        owner = organization.owner
        is_owner = owner is not None and owner.id == requester_id
        is_privileged = is_owner or (
            bool(requester.is_manager)
            and organization.members.filter(id=requester_id).exists()
        )
        if is_privileged and role.members.filter(id=target.id).exists():
            return target.id
        return None