Dynavera/apps/onboarding/viewsets.py
Viswamedha Nalabotu 5280433c8f Removed llm config
2026-03-22 19:35:07 +00:00

965 lines
38 KiB
Python

import httpx
import json
import re
from django.conf import settings
from django.db import transaction
from django.db.models import Q
from django.utils import timezone
from rest_framework.decorators import action
from rest_framework.exceptions import NotFound, PermissionDenied, ValidationError
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.status import (
HTTP_200_OK,
HTTP_201_CREATED,
HTTP_400_BAD_REQUEST,
HTTP_403_FORBIDDEN,
)
from rest_framework.viewsets import ModelViewSet, ReadOnlyModelViewSet
from apps.accounts.models import Organization, Role, User
from apps.accounts.permissions import CanManageOrganization, can_manage_organization
from apps.onboarding.consumers.prompts import OnboardingPrompts
from apps.onboarding.mixins import RequestParamMixin
from apps.onboarding.models import AgentConfig, AgentInteractionLog, OnboardingFlow, OnboardingSession
from apps.onboarding.serializers import (
AgentConfigSerializer,
AgentInteractionLogSerializer,
OnboardingFlowSerializer,
OnboardingSessionSerializer,
)
class OnboardingFlowViewSet(RequestParamMixin, ModelViewSet):
queryset = OnboardingFlow.objects.all()
serializer_class = OnboardingFlowSerializer
permission_classes = [IsAuthenticated]
lookup_field = 'uuid'
def get_permissions(self):
permissions = super().get_permissions()
if self.action in ['update', 'partial_update', 'destroy']:
return [*permissions, CanManageOrganization()]
return permissions
def get_queryset(self):
user = self.request.user
queryset = OnboardingFlow.objects.filter(
Q(role__organization__owner=user) |
Q(role__organization__members=user)
).distinct().order_by('-created_at')
if organization_uuid := self._get_param('organization_uuid'):
queryset = queryset.filter(role__organization__uuid=organization_uuid)
if role_uuid := self._get_param('role_uuid'):
queryset = queryset.filter(role__uuid=role_uuid)
return queryset
def create(self, request, *args, **kwargs):
role_uuid = request.data.get('role_uuid')
if not role_uuid:
raise ValidationError({'role_uuid': 'role_uuid is required.'})
role = Role.objects.filter(uuid=role_uuid).first()
if not role:
raise NotFound('Role not found')
if not can_manage_organization(request.user, role.organization):
raise PermissionDenied('Forbidden')
is_active = self._parse_bool(request.data.get('is_active'), default=True, field_name='is_active')
flow = OnboardingFlow.objects.create(
title=(request.data.get('title') or f'Onboarding Flow: {role.name}').strip(),
role=role,
structure=request.data.get('structure') or [],
is_active=is_active,
)
serializer = self.get_serializer(flow)
return Response(serializer.data, status=HTTP_201_CREATED)
def update(self, request, *args, **kwargs):
flow = self.get_object()
title = request.data.get('title')
structure = request.data.get('structure')
is_active = request.data.get('is_active')
if title is not None:
flow.title = str(title).strip()
if structure is not None:
flow.structure = structure if isinstance(structure, list) else flow.structure
if is_active is not None:
flow.is_active = self._parse_bool(is_active, field_name='is_active')
flow.save(update_fields=['title', 'structure', 'is_active', 'updated_at'])
serializer = self.get_serializer(flow)
return Response(serializer.data, status=HTTP_200_OK)
def partial_update(self, request, *args, **kwargs):
flow = self.get_object()
if 'title' in request.data:
flow.title = str(request.data.get('title') or '').strip()
if 'structure' in request.data:
structure = request.data.get('structure')
if not isinstance(structure, list):
raise ValidationError({'structure': 'structure must be a list'})
flow.structure = structure
if 'is_active' in request.data:
flow.is_active = self._parse_bool(request.data.get('is_active'), field_name='is_active')
flow.save(update_fields=['title', 'structure', 'is_active', 'updated_at'])
serializer = self.get_serializer(flow)
return Response(serializer.data, status=HTTP_200_OK)
def destroy(self, request, *args, **kwargs):
flow = self.get_object()
with transaction.atomic():
OnboardingSession.objects.filter(flow=flow).delete()
self.perform_destroy(flow)
return Response(status=204)
def _parse_bool(self, value, *, default=None, field_name='value'):
if value is None:
if default is not None:
return default
raise ValidationError({field_name: f'{field_name} is required'})
if isinstance(value, bool):
return value
if isinstance(value, int):
if value in (0, 1):
return bool(value)
raise ValidationError({field_name: f'{field_name} must be a boolean value'})
if isinstance(value, str):
normalized = value.strip().lower()
if normalized in ['true', '1', 'yes', 'on']:
return True
if normalized in ['false', '0', 'no', 'off']:
return False
raise ValidationError({field_name: f'{field_name} must be a boolean value'})
@action(detail=True, methods=['post'], url_path='start-session')
def start_session(self, request, uuid=None):
flow = self.get_object()
if not request.user.is_manager and not flow.role.members.filter(id=request.user.id).exists():
return Response(
{'error': 'Join this role before starting onboarding.'},
status=HTTP_403_FORBIDDEN,
)
session = OnboardingSession.objects.filter(user=request.user, role=flow.role, flow=flow).first()
if not session:
session = OnboardingSession.objects.filter(user=request.user, role=flow.role).first()
if session:
state = session.state or {}
state['flow_uuid'] = str(flow.uuid)
session.flow = flow
session.state = state
session.save(update_fields=['flow', 'state', 'updated_at'])
serializer = OnboardingSessionSerializer(session)
return Response(serializer.data, status=HTTP_200_OK)
session = OnboardingSession.objects.create(
user=request.user,
role=flow.role,
flow=flow,
status='active',
state={
'progress': 0,
'current_step': 'intro',
},
)
serializer = OnboardingSessionSerializer(session)
return Response(serializer.data, status=HTTP_201_CREATED)
serializer = OnboardingSessionSerializer(session)
return Response(serializer.data, status=HTTP_200_OK)
class AgentConfigViewSet(RequestParamMixin, ModelViewSet):
queryset = AgentConfig.objects.all()
serializer_class = AgentConfigSerializer
permission_classes = [IsAuthenticated]
lookup_field = 'uuid'
def get_permissions(self):
permissions = super().get_permissions()
if self.action in ['update', 'partial_update', 'destroy']:
return [*permissions, CanManageOrganization()]
return permissions
def get_queryset(self):
queryset = AgentConfig.objects.filter(
Q(organization__owner=self.request.user) | Q(organization__members=self.request.user)
).distinct().order_by('-updated_at')
if organization_uuid := self._get_param('organization_uuid'):
queryset = queryset.filter(organization__uuid=organization_uuid)
if role_uuid := self._get_param('role_uuid'):
queryset = queryset.filter(role__uuid=role_uuid)
return queryset
def create(self, request, *args, **kwargs):
organization_uuid = self._get_param('organization_uuid')
if not organization_uuid:
raise ValidationError({'organization_uuid': 'organization_uuid is required.'})
organization = Organization.objects.filter(uuid=organization_uuid).filter(
Q(owner=request.user) | Q(members=request.user),
).first()
if not organization:
raise NotFound('Organization not found')
if not can_manage_organization(request.user, organization):
raise PermissionDenied('Forbidden')
name = str(request.data.get('name') or '').strip()
if not name:
raise ValidationError({'name': 'Name is required.'})
agent_type = str(request.data.get('agent_type') or '').strip()
if not agent_type:
raise ValidationError({'agent_type': 'agent_type is required.'})
role_uuid = request.data.get('role_uuid')
role = None
if role_uuid:
role = Role.objects.filter(uuid=role_uuid, organization=organization).first()
if not role:
raise NotFound('Role not found in this organization')
config = AgentConfig.objects.create(
organization=organization,
role=role,
name=name,
agent_type=agent_type,
system_prompt=str(request.data.get('system_prompt') or '')
)
serializer = self.get_serializer(config)
return Response(serializer.data, status=HTTP_201_CREATED)
def update(self, request, *args, **kwargs):
return self.partial_update(request, *args, **kwargs)
def partial_update(self, request, *args, **kwargs):
config = self.get_object()
fields = ['name', 'agent_type', 'system_prompt']
for field in fields:
if field in request.data and request.data.get(field) is not None:
setattr(config, field, request.data[field])
config.save(update_fields=fields + ['updated_at'])
serializer = self.get_serializer(config)
return Response(serializer.data, status=HTTP_200_OK)
class OnboardingSessionViewSet(RequestParamMixin, ModelViewSet):
queryset = OnboardingSession.objects.all()
serializer_class = OnboardingSessionSerializer
permission_classes = [IsAuthenticated]
lookup_field = 'uuid'
def get_queryset(self):
user: User = self.request.user
if user.is_manager:
queryset = OnboardingSession.objects.filter(role__organization__members=user).distinct()
else:
queryset = OnboardingSession.objects.filter(user=user)
if organization_uuid := self._get_param('organization_uuid'):
queryset = queryset.filter(role__organization__uuid=organization_uuid)
if role_uuid := self._get_param('role_uuid'):
queryset = queryset.filter(role__uuid=role_uuid)
if user_uuid := self._get_param('user_uuid'):
if not user.is_manager and str(user.uuid) != str(user_uuid):
raise PermissionDenied('You can only view your own progress sessions.')
queryset = queryset.filter(user__uuid=user_uuid)
if flow_uuid := self._get_param('flow_uuid'):
queryset = queryset.filter(flow__uuid=flow_uuid)
if status_value := self.request.query_params.get('status'):
queryset = queryset.filter(status=status_value)
return queryset.order_by('-created_at')
@action(detail=False, methods=['get'], url_path='progress-overview')
def progress_overview(self, request):
user: User = request.user
role_uuid = request.query_params.get('role_uuid')
if user.is_manager:
roles_qs = Role.objects.filter(Q(organization__owner=user) | Q(organization__members=user)).distinct()
else:
roles_qs = Role.objects.filter(members=user)
if role_uuid:
roles_qs = roles_qs.filter(uuid=role_uuid)
rows = []
for role in roles_qs.order_by('name'):
flows = list(OnboardingFlow.objects.filter(role=role).order_by('-updated_at'))
if not flows:
continue
if user.is_manager:
learners = list(role.members.all().order_by('first_name', 'last_name', 'email_address'))
else:
learners = [user] if role.members.filter(id=user.id).exists() else []
if not learners:
continue
role_sessions = list(
OnboardingSession.objects.filter(role=role, user__in=learners)
.select_related('user').order_by('-updated_at')
)
latest_by_user_flow = {}
for session in role_sessions:
if not session.flow_id:
continue
key = (session.user_id, str(session.flow.uuid))
if key not in latest_by_user_flow:
latest_by_user_flow[key] = session
for flow in flows:
flow_uuid_str = str(flow.uuid)
for learner in learners:
latest_session = latest_by_user_flow.get((learner.id, flow_uuid_str))
latest_status = latest_session.status if latest_session else 'not_started'
state = latest_session.state if latest_session and isinstance(latest_session.state, dict) else {}
progress = state.get('progress_percentage', state.get('progress', 0))
rows.append({
'role': {
'uuid': str(role.uuid),
'name': role.name,
},
'user': {
'uuid': str(learner.uuid),
'name': str(getattr(learner, 'full_name', '')).strip() or learner.email_address,
'email': learner.email_address,
},
'flow': {
'uuid': flow_uuid_str,
'title': flow.title,
'is_active': flow.is_active,
},
'latest_status': latest_status,
'progress': int(progress) if isinstance(progress, (int, float)) else 0,
'is_completed': latest_status == 'completed',
'latest_session_uuid': str(latest_session.uuid) if latest_session else None,
'updated_at': latest_session.updated_at.isoformat() if latest_session and latest_session.updated_at else None,
})
rows.sort(
key=lambda row: (
str(row.get('role', {}).get('name', '')),
str(row.get('user', {}).get('name', '')),
str(row.get('flow', {}).get('title', '')),
)
)
return Response(rows, status=HTTP_200_OK)
def create(self, request, *args, **kwargs):
return Response(
{'error': 'Use onboarding-flow/<uuid>/start-session/ to create a session.'},
status=HTTP_400_BAD_REQUEST,
)
def _has_attempt(self, value):
if value is None:
return False
if isinstance(value, str):
return bool(value.strip())
if isinstance(value, (list, dict, tuple, set)):
return bool(value)
return True
def _get_flow_for_session(self, session):
return session.flow
def _get_page_from_flow(self, flow, page_uuid):
pages = flow.structure if isinstance(flow.structure, list) else []
page = next(
(
item for item in pages
if isinstance(item, dict) and str(item.get('uuid')) == str(page_uuid)
),
None,
)
return page, pages
def _upsert_page_responses(self, session, page_uuid, responses):
state = session.state or {}
stored_responses = state.get('responses', {})
if not isinstance(stored_responses, dict):
stored_responses = {}
stored_responses[str(page_uuid)] = responses
state['responses'] = stored_responses
state['last_page_uuid'] = str(page_uuid)
session.state = state
session.save(update_fields=['state', 'updated_at'])
def _record_page_visit(self, session, page_uuid):
state = session.state or {}
visited_pages = state.get('visited_pages', [])
if not isinstance(visited_pages, list):
visited_pages = []
page_uuid_str = str(page_uuid)
if page_uuid_str not in visited_pages:
visited_pages.append(page_uuid_str)
state['visited_pages'] = visited_pages
state['last_page_uuid'] = page_uuid_str
session.state = state
session.save(update_fields=['state', 'updated_at'])
def _mark_page_progress(self, session, page_uuid, total_pages, is_final_quiz=False):
if is_final_quiz:
return
state = session.state or {}
completed_modules = state.get('completed_modules', [])
if not isinstance(completed_modules, list):
completed_modules = []
page_uuid_str = str(page_uuid)
if page_uuid_str not in completed_modules:
completed_modules.append(page_uuid_str)
state['completed_modules'] = completed_modules
completed_count = len(completed_modules)
state['progress_percentage'] = int(round((completed_count / total_pages) * 100)) if total_pages else 0
session.state = state
session.save(update_fields=['state', 'updated_at'])
def _build_system_prompt(self, config):
return (config and config.system_prompt) or OnboardingPrompts.FALLBACK_SYSTEM_PROMPT
def _get_agent_config(self, session, agent_type):
role_specific = AgentConfig.objects.filter(
role=session.role,
agent_type=agent_type,
).order_by('-updated_at').first()
if role_specific:
return role_specific
return AgentConfig.objects.filter(
organization=session.role.organization,
role__isnull=True,
agent_type=agent_type,
).order_by('-updated_at').first()
def _extract_json_object(self, text):
if not text:
return None
candidate = str(text).strip()
try:
return json.loads(candidate)
except Exception:
pass
matches = re.findall(r'```(?:json)?\s*([\s\S]*?)```', candidate, re.IGNORECASE)
for block in matches:
try:
return json.loads(block.strip())
except Exception:
continue
decoder = json.JSONDecoder()
for idx, char in enumerate(candidate):
if char != '{':
continue
try:
obj, _ = decoder.raw_decode(candidate[idx:])
if isinstance(obj, dict):
return obj
except Exception:
continue
return None
def _grade_final_quiz_with_assessment_agent(self, session, quiz_fields, page_responses, pass_mark):
select_results = []
ai_fields = []
select_correct_count = 0
for field in quiz_fields:
if not isinstance(field, dict):
continue
key = str(field.get('key') or '').strip()
if not key:
continue
field_type = str(field.get('field_type') or '').strip().lower()
validation = field.get('validation') if isinstance(field.get('validation'), dict) else {}
if field_type == 'select':
correct_option = str(validation.get('correct_option') or '').strip()
if correct_option:
answer = page_responses.get(key)
attempted = self._has_attempt(answer)
answer_text = str(answer).strip() if attempted else ''
is_correct = attempted and answer_text == correct_option
if is_correct:
select_correct_count += 1
select_results.append(
{
'key': key,
'correct': is_correct,
'reason': '' if is_correct else 'Selected option does not match the expected choice.',
}
)
continue
ai_fields.append(field)
ai_correct_count = 0
ai_gradable_count = 0
ai_per_question = []
if ai_fields:
config = self._get_agent_config(session, 'assessment') or self._get_agent_config(session, 'knowledge')
if not config:
return None, {'error': 'No assessment/knowledge agent configured for grading.'}
prompt = OnboardingPrompts.grading_prompt(ai_fields, page_responses)
try:
with httpx.Client(timeout=60.0, auth=settings.INFERENCE_AUTH) as client:
response = client.post(
settings.INFERENCE_CHAT_COMPLETIONS_ENDPOINT,
json={
'messages': [
{'role': 'system', 'content': self._build_system_prompt(config)},
{'role': 'user', 'content': prompt},
],
'max_tokens': 1000,
},
)
response.raise_for_status()
content = response.json().get('choices', [{}])[0].get('message', {}).get('content', '')
except Exception:
return None, {'error': 'Assessment grading model unavailable.'}
parsed = self._extract_json_object(content)
if not isinstance(parsed, dict):
return None, {'error': 'Assessment grading returned invalid JSON.'}
try:
ai_correct_count = int(parsed.get('correct_count', 0))
ai_gradable_count = int(parsed.get('gradable_count', len(ai_fields)))
except Exception:
return None, {'error': 'Assessment grading returned invalid counts.'}
if ai_gradable_count < 0:
ai_gradable_count = 0
if ai_correct_count < 0:
ai_correct_count = 0
if ai_correct_count > ai_gradable_count:
ai_correct_count = ai_gradable_count
ai_per_question = parsed.get('per_question', []) if isinstance(parsed.get('per_question', []), list) else []
correct_count = select_correct_count + ai_correct_count
gradable_count = len(select_results) + ai_gradable_count
score_percentage = int(round((correct_count / gradable_count) * 100)) if gradable_count else 0
merged_per_question = list(select_results) + list(ai_per_question)
return {
'correct_count': correct_count,
'gradable_count': gradable_count,
'score_percentage': score_percentage,
'pass_mark': pass_mark,
'per_question': merged_per_question,
}, None
def _sanitize_grading_details(self, quiz_fields, per_question):
if not isinstance(per_question, list):
return []
validation_tokens_by_key = {}
for field in quiz_fields:
if not isinstance(field, dict):
continue
key = str(field.get('key') or '').strip()
if not key:
continue
validation = field.get('validation') if isinstance(field.get('validation'), dict) else {}
tokens = []
correct_option = str(validation.get('correct_option') or '').strip()
if correct_option:
tokens.append(correct_option.lower())
accepted_answers = validation.get('accepted_answers')
if isinstance(accepted_answers, list):
for answer in accepted_answers:
answer_text = str(answer or '').strip().lower()
if answer_text:
tokens.append(answer_text)
validation_tokens_by_key[key] = tokens
sanitized = []
for item in per_question:
if not isinstance(item, dict):
continue
key = str(item.get('key') or '').strip()
correct = bool(item.get('correct'))
raw_reason = str(item.get('reason') or '').strip()
reason = raw_reason
if not correct and reason:
lowered = reason.lower()
has_leak = any(token and token in lowered for token in validation_tokens_by_key.get(key, []))
if has_leak:
reason = 'Your response missed key requirements from the prompt. Review the guidance and try again.'
sanitized.append(
{
'key': key,
'correct': correct,
'reason': reason,
}
)
return sanitized
def _evaluate_final_quiz(self, session):
flow = self._get_flow_for_session(session)
if not flow:
return None, {'error': 'Onboarding flow not found for this session.'}
pages = flow.structure if isinstance(flow.structure, list) else []
if not pages:
return None, {'error': 'Onboarding flow has no pages.'}
quiz_page = None
for page in pages:
if not isinstance(page, dict):
continue
meta = page.get('meta') if isinstance(page.get('meta'), dict) else {}
if str(meta.get('page_type') or '').strip() == 'final_quiz':
quiz_page = page
break
if quiz_page is None:
quiz_page = pages[-1] if isinstance(pages[-1], dict) else None
if not isinstance(quiz_page, dict):
return None, {'error': 'Final quiz page could not be identified.'}
quiz_fields = quiz_page.get('fields') if isinstance(quiz_page.get('fields'), list) else []
if not quiz_fields:
return None, {'error': 'Final quiz has no questions.'}
meta = quiz_page.get('meta') if isinstance(quiz_page.get('meta'), dict) else {}
pass_mark = meta.get('pass_mark', 80)
try:
pass_mark = int(pass_mark)
except Exception:
pass_mark = 80
state = session.state or {}
responses = state.get('responses', {})
if not isinstance(responses, dict):
responses = {}
page_uuid = str(quiz_page.get('uuid') or '')
page_responses = responses.get(page_uuid, {})
if not isinstance(page_responses, dict):
page_responses = {}
required_count = 0
missing_required_keys = []
for field in quiz_fields:
if not isinstance(field, dict):
continue
key = str(field.get('key') or '').strip()
if not key:
continue
answer = page_responses.get(key)
attempted = self._has_attempt(answer)
if bool(field.get('required', False)):
required_count += 1
if not attempted:
missing_required_keys.append(key)
grading, grading_error = self._grade_final_quiz_with_assessment_agent(
session,
quiz_fields,
page_responses,
pass_mark,
)
if grading_error:
return None, grading_error
score_percentage = int(grading.get('score_percentage', 0))
correct_count = int(grading.get('correct_count', 0))
gradable_count = int(grading.get('gradable_count', 0))
passed = len(missing_required_keys) == 0 and gradable_count > 0 and score_percentage >= pass_mark
grading_details = self._sanitize_grading_details(quiz_fields, grading.get('per_question', []))
quiz_result = {
'page_uuid': page_uuid,
'pass_mark': pass_mark,
'required_count': required_count,
'missing_required_keys': missing_required_keys,
'gradable_count': gradable_count,
'correct_count': correct_count,
'score_percentage': score_percentage,
'passed': passed,
'grading_details': grading_details,
}
state['final_quiz_result'] = quiz_result
session.state = state
session.save(update_fields=['state', 'updated_at'])
return quiz_result, None
def _call_ka(self, session, page, message):
config = self._get_agent_config(session, 'knowledge')
system_prompt = self._build_system_prompt(config)
page_body = str(page.get('body') or '') if isinstance(page, dict) else ''
page_title = str(page.get('title') or '') if isinstance(page, dict) else ''
context = f"Page: {page_title}\n\n{page_body}" if page_body else page_title
prompt = f"Context:\n{context}\n\nQuestion: {message}"
try:
with httpx.Client(timeout=60.0, auth=settings.INFERENCE_AUTH) as client:
response = client.post(
settings.INFERENCE_CHAT_COMPLETIONS_ENDPOINT,
json={
'messages': [
{'role': 'system', 'content': system_prompt},
{'role': 'user', 'content': prompt},
],
'max_tokens': 512,
},
)
response.raise_for_status()
return response.json().get('choices', [{}])[0].get('message', {}).get('content', '') or 'No answer available.'
except Exception:
return 'Knowledge agent is temporarily unavailable. Please try again later.'
def _run_ka_page_revision(self, session, page, message):
config = self._get_agent_config(session, 'knowledge')
system_prompt = self._build_system_prompt(config)
page_body = str(page.get('body') or '') if isinstance(page, dict) else ''
page_title = str(page.get('title') or '') if isinstance(page, dict) else ''
prompt = (
f"You are revising onboarding page content to incorporate a learner's clarification request.\n\n"
f"Page Title: {page_title}\n"
f"Original Body:\n{page_body}\n\n"
f"Learner's request: {message}\n\n"
f"Rewrite the page body to address the learner's request while preserving the original content structure. "
f"Return only the revised page body."
)
try:
with httpx.Client(timeout=60.0, auth=settings.INFERENCE_AUTH) as client:
response = client.post(
settings.INFERENCE_CHAT_COMPLETIONS_ENDPOINT,
json={
'messages': [
{'role': 'system', 'content': system_prompt},
{'role': 'user', 'content': prompt},
],
'max_tokens': 1024,
},
)
response.raise_for_status()
return response.json().get('choices', [{}])[0].get('message', {}).get('content', '') or page_body
except Exception:
return page_body
@action(detail=True, methods=['post'], url_path='ask-ka')
def ask_ka(self, request, uuid=None):
session = self.get_object()
page_uuid = request.data.get('page_uuid')
message = request.data.get('message')
mode = request.data.get('mode', 'separate')
if not page_uuid or not message:
return Response({'error': 'page_uuid and message are required.'}, status=HTTP_400_BAD_REQUEST)
flow = self._get_flow_for_session(session)
page = None
if flow:
page, _ = self._get_page_from_flow(flow, page_uuid)
if mode == 'update_page':
revised_body = self._run_ka_page_revision(session, page, message)
state = session.state or {}
overrides = state.get('page_overrides', {})
if not isinstance(overrides, dict):
overrides = {}
overrides[str(page_uuid)] = revised_body
state['page_overrides'] = overrides
session.state = state
session.save(update_fields=['state', 'updated_at'])
return Response({
'status': 'ok',
'updated_page': True,
'revised_page_body': revised_body,
'session_state': session.state,
})
answer = self._call_ka(session, page, message)
state = session.state or {}
page_help = state.get('page_help', {})
if not isinstance(page_help, dict):
page_help = {}
page_help[str(page_uuid)] = answer
state['page_help'] = page_help
session.state = state
session.save(update_fields=['state', 'updated_at'])
return Response({
'status': 'ok',
'answer': answer,
'session_state': session.state,
})
@action(detail=True, methods=['post'], url_path='interact')
def interact(self, request, uuid=None):
session = self.get_object()
user_message = request.data.get('message')
page_uuid = request.data.get('page_uuid')
responses = request.data.get('responses')
if not user_message and not page_uuid:
return Response({'error': 'Message or page_uuid is required'}, status=HTTP_400_BAD_REQUEST)
if page_uuid:
self._record_page_visit(session, page_uuid)
if isinstance(responses, dict):
flow = self._get_flow_for_session(session)
total_pages = len(flow.structure) if flow and isinstance(flow.structure, list) else 0
if page_uuid:
self._upsert_page_responses(session, page_uuid, responses)
is_final_quiz = False
if flow:
page, _ = self._get_page_from_flow(flow, page_uuid)
if isinstance(page, dict):
meta = page.get('meta') if isinstance(page.get('meta'), dict) else {}
is_final_quiz = str(meta.get('page_type') or '').strip() == 'final_quiz'
if total_pages:
self._mark_page_progress(session, page_uuid, total_pages, is_final_quiz=is_final_quiz)
else:
state = session.state or {}
stored_responses = state.get('responses', {})
if not isinstance(stored_responses, dict):
stored_responses = {}
stored_responses.update(responses)
state['responses'] = stored_responses
session.state = state
session.save(update_fields=['state', 'updated_at'])
if user_message or isinstance(responses, dict):
AgentInteractionLog.objects.create(
session=session,
sender_type='user',
content=user_message or f'Submitted onboarding responses for page {page_uuid or "unknown"}',
tool_call_metadata={'page_uuid': page_uuid, 'has_responses': isinstance(responses, dict)}
)
return Response({
'status': 'received',
'session_state': session.state,
})
@action(detail=True, methods=['get'], url_path='history')
def history(self, request, uuid=None):
session = self.get_object()
logs = session.logs.all().order_by('created_at')
serializer = AgentInteractionLogSerializer(logs, many=True)
return Response(serializer.data)
@action(detail=True, methods=['post'], url_path='complete')
def complete(self, request, uuid=None):
session = self.get_object()
quiz_result, quiz_error = self._evaluate_final_quiz(session)
if quiz_error:
return Response(quiz_error, status=HTTP_400_BAD_REQUEST)
if not quiz_result.get('passed'):
return Response({
'error': 'Final quiz pass mark not met.',
'quiz_result': quiz_result,
}, status=HTTP_400_BAD_REQUEST)
state = session.state or {}
completed_modules = state.get('completed_modules', [])
if not isinstance(completed_modules, list):
completed_modules = []
if quiz_result.get('page_uuid') and quiz_result['page_uuid'] not in completed_modules:
completed_modules.append(quiz_result['page_uuid'])
state['completed_modules'] = completed_modules
flow = self._get_flow_for_session(session)
total_pages = len(flow.structure) if flow and isinstance(flow.structure, list) else 0
state['progress_percentage'] = int(round((len(completed_modules) / total_pages) * 100)) if total_pages else 100
session.state = state
session.status = 'completed'
session.completed_at = timezone.now()
session.save(update_fields=['status', 'completed_at', 'state', 'updated_at'])
return Response({'message': 'Session marked as completed', 'quiz_result': quiz_result})
class AgentInteractionLogViewSet(RequestParamMixin, ReadOnlyModelViewSet):
queryset = AgentInteractionLog.objects.all()
serializer_class = AgentInteractionLogSerializer
permission_classes = [IsAuthenticated]
lookup_field = 'uuid'
def get_queryset(self):
user = self.request.user
manager_scope = Q()
if user.is_manager:
manager_scope = Q(session__role__organization__members=user)
queryset = AgentInteractionLog.objects.filter(
Q(session__user=user) |
Q(session__role__organization__owner=user) |
manager_scope
).distinct()
if session_uuid := self._get_param('session_uuid'):
queryset = queryset.filter(session__uuid=session_uuid)
if role_uuid := self._get_param('role_uuid'):
queryset = queryset.filter(session__role__uuid=role_uuid)
if organization_uuid := self._get_param('organization_uuid'):
queryset = queryset.filter(session__role__organization__uuid=organization_uuid)
return queryset.order_by('created_at')