2026-03-18 20:07:24 +00:00
|
|
|
import json
|
|
|
|
|
import re
|
|
|
|
|
|
|
|
|
|
import httpx
|
|
|
|
|
|
|
|
|
|
from channels.db import database_sync_to_async
|
|
|
|
|
from django.conf import settings
|
|
|
|
|
from django.utils import timezone
|
|
|
|
|
|
|
|
|
|
from apps.onboarding.consumers.base import BaseOnboardingConsumer, LogType
|
|
|
|
|
from apps.onboarding.consumers.prompts import OnboardingPrompts
|
|
|
|
|
from apps.onboarding.models import AgentInteractionLog, OnboardingSession
|
|
|
|
|
|
|
|
|
|
__all__ = ['OnboardingKnowledgeConsumer']
|
|
|
|
|
|
|
|
|
|
class OnboardingKnowledgeConsumer(BaseOnboardingConsumer):
|
|
|
|
|
"""
|
|
|
|
|
Route: /ws/onboarding/knowledge/<uuid:session_uuid>/
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
session_uuid: str
|
|
|
|
|
|
|
|
|
|
def parse_extra(self):
|
|
|
|
|
self.session_uuid = self.scope['url_route']['kwargs'].get('session_uuid')
|
|
|
|
|
|
|
|
|
|
async def action_ask(self, data: dict):
|
|
|
|
|
page_uuid = data.get('page_uuid')
|
|
|
|
|
user_message = data.get('message')
|
|
|
|
|
mode = str(data.get('mode', 'separate'))
|
|
|
|
|
|
|
|
|
|
if not page_uuid or not user_message:
|
|
|
|
|
return await self.send_error('page_uuid and message are required.')
|
|
|
|
|
|
|
|
|
|
session = await self.get_session(self.session_uuid, self.user.id)
|
|
|
|
|
if not session:
|
|
|
|
|
return await self.send_error('Session not found or access denied.')
|
|
|
|
|
|
|
|
|
|
if not session.flow:
|
|
|
|
|
return await self.send_error('Onboarding flow not found.')
|
|
|
|
|
|
|
|
|
|
page = self._get_page(session.flow, str(page_uuid))
|
|
|
|
|
if not isinstance(page, dict):
|
|
|
|
|
return await self.send_error('Page not found in this flow.')
|
|
|
|
|
|
|
|
|
|
page_title = str(page.get('title') or 'Onboarding Page')
|
|
|
|
|
page_body = str(page.get('body') or '')
|
|
|
|
|
role_name = session.role.name
|
|
|
|
|
role_uuid = str(session.role.uuid)
|
|
|
|
|
|
|
|
|
|
config = await self.get_config_by_type(role_uuid, 'knowledge')
|
|
|
|
|
|
|
|
|
|
updated_page = False
|
|
|
|
|
revised_body = None
|
|
|
|
|
assistant_message = ''
|
|
|
|
|
|
|
|
|
|
if mode == 'update_page':
|
|
|
|
|
await self.send_log(LogType.STATUS, 'Revising page content...')
|
|
|
|
|
revised_body = await self._call_llm(
|
|
|
|
|
config,
|
|
|
|
|
OnboardingPrompts.ka_page_revision_prompt(role_name, page_title, page_body, str(user_message)),
|
|
|
|
|
max_tokens=3000,
|
|
|
|
|
stop=['\n[END]', '[END]'],
|
|
|
|
|
)
|
|
|
|
|
if revised_body:
|
|
|
|
|
await self.save_page_override(session, str(page_uuid), revised_body)
|
|
|
|
|
updated_page = True
|
|
|
|
|
assistant_message = (
|
|
|
|
|
'Updated this page by integrating your clarification request into the core content. '
|
|
|
|
|
'Please review the revised page text above.'
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if not assistant_message:
|
|
|
|
|
await self.send_log(LogType.STATUS, 'Thinking...')
|
|
|
|
|
if config:
|
|
|
|
|
assistant_message = await self._call_llm(
|
|
|
|
|
config,
|
|
|
|
|
OnboardingPrompts.ka_help_prompt(role_name, page_title, page_body, str(user_message)),
|
|
|
|
|
max_tokens=1024,
|
|
|
|
|
) or OnboardingPrompts.KA_HELP_FALLBACK
|
|
|
|
|
else:
|
|
|
|
|
assistant_message = OnboardingPrompts.KA_HELP_FALLBACK
|
|
|
|
|
|
|
|
|
|
await self.save_page_help(session, str(page_uuid), str(user_message), assistant_message)
|
2026-03-22 17:40:23 +00:00
|
|
|
await self.log_interaction(session, str(user_message), assistant_message, str(page_uuid), mode, updated_page, config=config)
|
2026-03-18 20:07:24 +00:00
|
|
|
|
|
|
|
|
await self.send_log(LogType.COMPLETED, assistant_message, {
|
|
|
|
|
'updated_page': updated_page,
|
|
|
|
|
'revised_page_body': revised_body if mode == 'update_page' else None,
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
async def _call_llm(
|
|
|
|
|
self,
|
|
|
|
|
config,
|
|
|
|
|
prompt: str,
|
|
|
|
|
*,
|
|
|
|
|
max_tokens: int = 1024,
|
|
|
|
|
stop: list[str] | None = None,
|
|
|
|
|
) -> str | None:
|
|
|
|
|
if not config:
|
|
|
|
|
return None
|
|
|
|
|
system_prompt = config.system_prompt or OnboardingPrompts.FALLBACK_SYSTEM_PROMPT
|
|
|
|
|
payload: dict = {
|
|
|
|
|
'messages': [
|
|
|
|
|
{'role': 'system', 'content': system_prompt},
|
|
|
|
|
{'role': 'user', 'content': prompt},
|
|
|
|
|
],
|
|
|
|
|
'max_tokens': max_tokens,
|
|
|
|
|
'stream': True,
|
|
|
|
|
}
|
|
|
|
|
if stop:
|
|
|
|
|
payload['stop'] = stop
|
|
|
|
|
try:
|
|
|
|
|
chunks: list[str] = []
|
2026-03-22 20:04:14 +00:00
|
|
|
async with httpx.AsyncClient(timeout=settings.INFERENCE_STREAM_TIMEOUT, auth=settings.INFERENCE_AUTH) as client:
|
2026-03-18 20:07:24 +00:00
|
|
|
async with client.stream('POST', settings.INFERENCE_CHAT_COMPLETIONS_ENDPOINT, json=payload) as response:
|
|
|
|
|
response.raise_for_status()
|
|
|
|
|
async for line in response.aiter_lines():
|
|
|
|
|
if not line.startswith('data: '):
|
|
|
|
|
continue
|
|
|
|
|
data = line[6:].strip()
|
|
|
|
|
if data == '[DONE]':
|
|
|
|
|
break
|
|
|
|
|
try:
|
|
|
|
|
chunk_obj = json.loads(data)
|
|
|
|
|
choice = chunk_obj['choices'][0]
|
|
|
|
|
delta = choice.get('delta', {}).get('content', '')
|
|
|
|
|
if delta:
|
|
|
|
|
chunks.append(delta)
|
|
|
|
|
await self.send_log(LogType.STREAM_CHUNK, delta)
|
|
|
|
|
if choice.get('finish_reason') == 'length':
|
|
|
|
|
self.logger.warning('Knowledge LLM response truncated (finish_reason=length)')
|
|
|
|
|
except Exception:
|
|
|
|
|
continue
|
|
|
|
|
result = ''.join(chunks).strip()
|
|
|
|
|
result = re.sub(r'\n?\[END\]\s*$', '', result).strip()
|
|
|
|
|
return result or None
|
|
|
|
|
except Exception as e:
|
|
|
|
|
self.logger.exception('Knowledge LLM call failed: %s', e)
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def _get_page(self, flow, page_uuid: str) -> dict | None:
|
|
|
|
|
pages = flow.structure if isinstance(flow.structure, list) else []
|
|
|
|
|
return next(
|
|
|
|
|
(p for p in pages if isinstance(p, dict) and str(p.get('uuid')) == page_uuid),
|
|
|
|
|
None,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
@database_sync_to_async
|
|
|
|
|
def get_session(self, session_uuid: str, user_id: int):
|
|
|
|
|
return (
|
|
|
|
|
OnboardingSession.objects
|
|
|
|
|
.select_related('flow', 'role')
|
|
|
|
|
.filter(uuid=session_uuid, user_id=user_id)
|
|
|
|
|
.first()
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
@database_sync_to_async
|
|
|
|
|
def save_page_help(self, session, page_uuid: str, user_message: str, assistant_message: str):
|
|
|
|
|
state = session.state or {}
|
|
|
|
|
page_help = state.get('page_help', {})
|
|
|
|
|
if not isinstance(page_help, dict):
|
|
|
|
|
page_help = {}
|
|
|
|
|
|
|
|
|
|
thread = page_help.get(page_uuid, [])
|
|
|
|
|
if not isinstance(thread, list):
|
|
|
|
|
thread = []
|
|
|
|
|
|
|
|
|
|
thread.append({
|
|
|
|
|
'question': user_message,
|
|
|
|
|
'answer': assistant_message,
|
|
|
|
|
'timestamp': timezone.now().isoformat(),
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
page_help[page_uuid] = thread[-20:]
|
|
|
|
|
state['page_help'] = page_help
|
|
|
|
|
session.state = state
|
|
|
|
|
session.save(update_fields=['state', 'updated_at'])
|
|
|
|
|
|
|
|
|
|
@database_sync_to_async
|
|
|
|
|
def save_page_override(self, session, page_uuid: str, new_body: str):
|
|
|
|
|
state = session.state if isinstance(session.state, dict) else {}
|
|
|
|
|
overrides = state.get('page_overrides', {})
|
|
|
|
|
if not isinstance(overrides, dict):
|
|
|
|
|
overrides = {}
|
|
|
|
|
overrides[page_uuid] = new_body
|
|
|
|
|
state['page_overrides'] = overrides
|
|
|
|
|
session.state = state
|
|
|
|
|
session.save(update_fields=['state', 'updated_at'])
|
|
|
|
|
|
|
|
|
|
@database_sync_to_async
|
|
|
|
|
def log_interaction(
|
|
|
|
|
self, session, user_message: str, assistant_message: str,
|
2026-03-22 17:40:23 +00:00
|
|
|
page_uuid: str, mode: str, updated_page: bool, config=None,
|
2026-03-18 20:07:24 +00:00
|
|
|
):
|
|
|
|
|
AgentInteractionLog.objects.create(
|
|
|
|
|
session=session,
|
2026-03-22 17:40:23 +00:00
|
|
|
agent_config=config,
|
2026-03-18 20:07:24 +00:00
|
|
|
sender_type='user',
|
|
|
|
|
content=user_message,
|
|
|
|
|
tool_call_metadata={'action': 'ask_ka', 'page_uuid': page_uuid, 'mode': mode},
|
|
|
|
|
)
|
|
|
|
|
AgentInteractionLog.objects.create(
|
|
|
|
|
session=session,
|
2026-03-22 17:40:23 +00:00
|
|
|
agent_config=config,
|
2026-03-18 20:07:24 +00:00
|
|
|
sender_type='ai',
|
|
|
|
|
content=assistant_message,
|
|
|
|
|
tool_call_metadata={
|
|
|
|
|
'action': 'ask_ka_response',
|
|
|
|
|
'page_uuid': page_uuid,
|
|
|
|
|
'mode': mode,
|
|
|
|
|
'updated_page': updated_page,
|
|
|
|
|
},
|
|
|
|
|
)
|