"""Celery tasks that ingest training files into KnowledgeChunk rows and refresh agent prompts."""
import hashlib
import logging

from celery import shared_task
from django.conf import settings
from django.db import transaction
from docx import Document
from httpx import Client, Timeout
from pypdf import PdfReader

from apps.knowledge.models import KnowledgeChunk, TrainingFile

logger = logging.getLogger(__name__)


def _decode_text_bytes(raw_bytes: bytes) -> str:
    """Decode raw file bytes to text.

    UTF-8 is tried first; ``utf-8-sig`` is used so that a leading byte-order
    mark is dropped instead of leaking into the text as ``\\ufeff`` (which
    ``str.strip()`` does not remove). Bytes that are not valid UTF-8 fall
    back to latin-1, which maps every byte value and therefore cannot fail.
    """
    try:
        return raw_bytes.decode('utf-8-sig')
    except UnicodeDecodeError:
        return raw_bytes.decode('latin-1')


def _extract_text_from_training_file(file_obj: TrainingFile) -> str:
    """Return the stripped plain text of a training file.

    The extractor is chosen from the lower-cased ``file_name`` suffix:
    ``.pdf`` is read with ``PdfReader``, ``.docx`` with ``Document``, and
    anything else is read as raw bytes and decoded as text.
    """
    file_name = (file_obj.file_name or '').lower()

    if file_name.endswith('.pdf'):
        with file_obj.file.open('rb') as f:
            reader = PdfReader(f)
            # extract_text() may return nothing for a page; treat that as empty.
            pages = [page.extract_text() or '' for page in reader.pages]
        return '\n'.join(pages).strip()

    if file_name.endswith('.docx'):
        with file_obj.file.open('rb') as f:
            document = Document(f)
            paragraphs = [paragraph.text for paragraph in document.paragraphs if paragraph.text]
        return '\n'.join(paragraphs).strip()

    with file_obj.file.open('rb') as f:
        raw_bytes = f.read()
    return _decode_text_bytes(raw_bytes).strip()


def _get_text_chunks(text: str, size: int = 10000):
    """Yield consecutive slices of ``text`` that are at most ``size`` characters long.

    Raises:
        ValueError: if ``size`` is not positive (raised on first iteration).
    """
    if size <= 0:
        # A negative step would silently yield nothing; fail loudly instead.
        raise ValueError('size must be a positive integer.')
    for i in range(0, len(text), size):
        yield text[i:i + size]


@shared_task(name="apps.knowledge.tasks.ingest_training_file_task", bind=True, soft_time_limit=900, time_limit=1200)
def ingest_training_file_task(self, file_uuid):
    """
    Ingests a training file by extracting text, chunking it, generating embeddings
    via an external service, and saving KnowledgeChunk entries.
    Updates the file status accordingly and triggers prompt refinement.

    Args:
        file_uuid: UUID of the TrainingFile to ingest.

    Returns:
        A human-readable status string (also used when the file is not found).

    Raises:
        Exception: any error during extraction, the inference call or
            persistence is re-raised after the file is marked ``'failed'`` and
            the error text is stored in ``description``.
    """
    try:
        file_obj = TrainingFile.objects.get(uuid=file_uuid)
    except TrainingFile.DoesNotExist:
        return f"File {file_uuid} not found."

    file_obj.status = 'ingesting'
    file_obj.save()

    try:
        raw_text = _extract_text_from_training_file(file_obj)
        if not raw_text:
            raise ValueError('No extractable text found.')

        all_documents = []
        chunk_counter = 0
        timeout = Timeout(60.0)

        with Client(timeout=timeout, auth=settings.INFERENCE_AUTH) as client:
            for text_segment in _get_text_chunks(raw_text):
                response = client.post(
                    settings.INFERENCE_SEMANTIC_CHUNK_ENDPOINT,
                    json={
                        "text": text_segment,
                        "threshold": 95,
                    },
                )
                response.raise_for_status()
                result = response.json()
                chunks = result['chunks']
                embeddings = result['embeddings']

                # zip() would silently drop the unmatched tail; a mismatch
                # means the response is unusable, so fail the ingestion.
                if len(chunks) != len(embeddings):
                    raise ValueError(
                        f"Inference service returned {len(chunks)} chunks "
                        f"but {len(embeddings)} embeddings."
                    )

                for chunk_text, embedding in zip(chunks, embeddings):
                    all_documents.append(KnowledgeChunk(
                        organization=file_obj.organization,
                        role=file_obj.role,
                        training_file=file_obj,
                        content=chunk_text,
                        content_hash=hashlib.sha256(chunk_text.encode('utf-8')).hexdigest(),
                        embedding=embedding,
                        chunk_index=chunk_counter,
                        metadata={
                            "source": file_obj.file_name,
                            "file_name": file_obj.file_name,
                            "scope": "role" if file_obj.role_id else "organization",
                        },
                    ))
                    chunk_counter += 1

        # Skip chunks whose hash is already stored for this file (e.g. on re-ingestion).
        existing_hashes = set(
            KnowledgeChunk.objects.filter(training_file=file_obj).values_list('content_hash', flat=True)
        )
        new_documents = [d for d in all_documents if d.content_hash not in existing_hashes]

        # Insert the chunks and flip the status together so they cannot diverge.
        with transaction.atomic():
            KnowledgeChunk.objects.bulk_create(new_documents)
            file_obj.status = 'embedded'
            file_obj.save()

        # Dispatched after the atomic block so the follow-up task reads committed rows.
        if file_obj.role_id:
            update_agent_prompts_from_file_task.delay(str(file_obj.role.uuid))

        return f"Processed {chunk_counter} chunks via batching."
    except Exception as e:
        file_obj.status = 'failed'
        file_obj.description = str(e)
        file_obj.save()
        raise


@shared_task(name="apps.knowledge.tasks.update_agent_prompts_from_file_task", bind=True, soft_time_limit=120, time_limit=180)
def update_agent_prompts_from_file_task(self, role_uuid: str):
    """
    After a training file is ingested (or deleted), refine the curriculum and
    assessment AgentConfig system prompts using document content.
    Resets to canonical base prompts when no files remain.

    Args:
        role_uuid: UUID of the Role whose agent prompts should be refreshed.

    Returns:
        None. A missing role is logged and ignored; failures of the
        chat-completions call are logged and swallowed.
    """
    # Imported inside the task body, as in the original code.
    from apps.accounts.models import Role
    from apps.onboarding.consumers.prompts import OnboardingPrompts
    from apps.onboarding.models import AgentConfig

    try:
        role = Role.objects.get(uuid=role_uuid)
    except Role.DoesNotExist:
        logger.warning("update_agent_prompts_from_file_task: role %s not found", role_uuid)
        return

    configs = {
        cfg.agent_type: cfg
        for cfg in AgentConfig.objects.filter(role=role, agent_type__in=['curriculum', 'assessment'])
    }

    chunk_texts = list(
        KnowledgeChunk.objects.filter(role=role, is_active=True)
        .order_by('training_file_id', 'chunk_index')
        .values_list('content', flat=True)[:30]
    )

    # No files left — reset both to their canonical base prompts
    if not chunk_texts:
        to_update = []
        if 'curriculum' in configs:
            configs['curriculum'].system_prompt = OnboardingPrompts.default_curriculum_prompt(role.name)
            to_update.append(configs['curriculum'])
        if 'assessment' in configs:
            configs['assessment'].system_prompt = OnboardingPrompts.default_assessment_prompt(role.name)
            to_update.append(configs['assessment'])
        for cfg in to_update:
            cfg.save(update_fields=['system_prompt', 'updated_at'])
        logger.info("update_agent_prompts_from_file_task: reset to base prompts for role %s", role_uuid)
        return

    # Cap the document context passed to the refinement prompts at 6000 characters.
    combined_text = '\n\n'.join(chunk_texts)[:6000]

    refine_calls = [
        (
            'curriculum',
            OnboardingPrompts.refine_curriculum_prompt(
                role.name, OnboardingPrompts.default_curriculum_prompt(role.name), combined_text
            ),
        ),
        (
            'assessment',
            OnboardingPrompts.refine_assessment_prompt(
                role.name, OnboardingPrompts.default_assessment_prompt(role.name), combined_text
            ),
        ),
    ]

    try:
        with Client(timeout=Timeout(60.0), auth=settings.INFERENCE_AUTH) as client:
            for agent_type, user_prompt in refine_calls:
                if agent_type not in configs:
                    continue
                response = client.post(
                    settings.INFERENCE_CHAT_COMPLETIONS_ENDPOINT,
                    json={
                        "model": "meta-llama-3.1-8b-instruct",
                        "messages": [{"role": "user", "content": user_prompt}],
                        "max_tokens": 600,
                    },
                )
                response.raise_for_status()

                refined_prompt = (response.json()["choices"][0]["message"]["content"] or '').strip()
                if not refined_prompt:
                    # A null/blank completion must not wipe out the stored prompt.
                    logger.warning(
                        "update_agent_prompts_from_file_task: empty %s prompt returned for role %s; "
                        "keeping existing prompt",
                        agent_type,
                        role_uuid,
                    )
                    continue

                configs[agent_type].system_prompt = refined_prompt
                configs[agent_type].save(update_fields=['system_prompt', 'updated_at'])
                logger.info("update_agent_prompts_from_file_task: refined %s prompt for role %s", agent_type, role_uuid)
    except Exception as e:
        # Best-effort refresh: log and keep whatever prompts are already stored.
        logger.exception("update_agent_prompts_from_file_task: LLM call failed for role %s: %s", role_uuid, e)