Dynavera/apps/knowledge/tasks.py
2026-03-08 13:16:26 +00:00

101 lines
3.5 KiB
Python

import hashlib
from celery import shared_task
from django.conf import settings
from django.db import transaction
from docx import Document
from httpx import Client, Timeout
from pypdf import PdfReader
from apps.knowledge.models import RoleRagDocument, TrainingFile
def _decode_text_bytes(raw_bytes: bytes) -> str:
try:
return raw_bytes.decode('utf-8')
except UnicodeDecodeError:
return raw_bytes.decode('latin-1', errors='ignore')
def _extract_text_from_training_file(file_obj: TrainingFile) -> str:
    """Return the stripped plain text stored in a training file.

    The extraction strategy is picked from the lower-cased ``file_name``
    suffix:

    * ``.pdf``  -- text of every page (via ``PdfReader``), joined by newlines.
    * ``.docx`` -- text of every non-empty paragraph (via ``Document``),
      joined by newlines.
    * anything else -- the raw bytes decoded by ``_decode_text_bytes``.

    Args:
        file_obj: Training file whose ``file`` is opened in binary mode and
            whose ``file_name`` (possibly empty or ``None``) selects the
            strategy.

    Returns:
        The extracted text with leading and trailing whitespace removed.
    """
    lowered_name = (file_obj.file_name or '').lower()
    is_pdf = lowered_name.endswith('.pdf')
    is_docx = lowered_name.endswith('.docx')

    with file_obj.file.open('rb') as stream:
        if is_pdf:
            # A page without extractable text contributes an empty string.
            page_texts = (
                page.extract_text() or ''
                for page in PdfReader(stream).pages
            )
            extracted = '\n'.join(page_texts)
        elif is_docx:
            # Paragraphs with empty text are skipped entirely.
            paragraph_texts = (
                para.text
                for para in Document(stream).paragraphs
                if para.text
            )
            extracted = '\n'.join(paragraph_texts)
        else:
            extracted = _decode_text_bytes(stream.read())

    return extracted.strip()
def _get_text_chunks(text: str, size: int = 10000):
"""Slices text into rough blocks to prevent HTTP timeouts."""
for i in range(0, len(text), size):
yield text[i:i + size]
@shared_task(
    name="apps.knowledge.tasks.ingest_training_file_task",
    bind=True,
    soft_time_limit=900,
    time_limit=1200,
)
def ingest_training_file_task(self, file_uuid):
    """Extract, chunk, embed and store the contents of one training file.

    The file's text is sent slice by slice to the endpoint configured as
    ``settings.INFERENCE_SEMANTIC_CHUNK_ENDPOINT``. Each response must carry
    ``chunks`` and ``embeddings`` lists of equal length; every pair becomes a
    ``RoleRagDocument`` linked to the file and its role.

    The file's ``status`` moves to ``'ingesting'`` when work starts, then to
    ``'embedded'`` (with ``is_processed = True``) on success or ``'failed'``
    on any error.

    Args:
        file_uuid: Value matched against ``TrainingFile.uuid``.

    Returns:
        A status message: either that the file was not found, or how many
        chunks were stored.

    Raises:
        ValueError: If no text could be extracted, or if a response contains
            a different number of chunks and embeddings.
        Exception: Any other error from extraction, the HTTP calls or the
            database writes. In every failure case the file is first marked
            ``'failed'`` and the error text is written to its
            ``description`` before the exception is re-raised.
    """
    try:
        file_obj = TrainingFile.objects.get(uuid=file_uuid)
    except TrainingFile.DoesNotExist:
        return f"File {file_uuid} not found."

    file_obj.status = 'ingesting'
    file_obj.save()

    try:
        raw_text = _extract_text_from_training_file(file_obj)
        if not raw_text:
            raise ValueError('No extractable text found.')

        all_documents = []
        with Client(timeout=Timeout(60.0)) as client:
            for text_segment in _get_text_chunks(raw_text):
                response = client.post(
                    settings.INFERENCE_SEMANTIC_CHUNK_ENDPOINT,
                    json={"text": text_segment, "threshold": 95},
                )
                response.raise_for_status()
                result = response.json()
                chunks = result['chunks']
                embeddings = result['embeddings']

                # zip() stops at the shorter input, so a mismatched response
                # would silently drop content; fail loudly instead.
                if len(chunks) != len(embeddings):
                    raise ValueError(
                        'Chunk/embedding count mismatch: '
                        f'{len(chunks)} chunks, {len(embeddings)} embeddings.'
                    )

                for chunk_text, embedding in zip(chunks, embeddings):
                    content_hash = hashlib.sha256(
                        chunk_text.encode('utf-8')
                    ).hexdigest()
                    all_documents.append(RoleRagDocument(
                        role=file_obj.role,
                        training_file=file_obj,
                        content=chunk_text,
                        content_hash=content_hash,
                        embedding=embedding,
                        # Index runs across all slices of the file, not per
                        # HTTP response.
                        chunk_index=len(all_documents),
                        metadata={"source": file_obj.file_name},
                    ))

        # Persist the documents and the success status in one transaction so
        # the file is never left with stored documents but a stale status.
        with transaction.atomic():
            RoleRagDocument.objects.bulk_create(all_documents)
            file_obj.status = 'embedded'
            file_obj.is_processed = True
            file_obj.save()

        return f"Processed {len(all_documents)} chunks via batching."
    except Exception as e:
        # Record the failure on the file, then let the error propagate so
        # the task itself is reported as failed.
        file_obj.status = 'failed'
        file_obj.description = str(e)
        file_obj.save()
        # Bare raise re-raises the active exception with its traceback.
        raise