Dynavera/apps/accounts/management/commands/benchmark.py
2026-03-24 17:05:46 +00:00

484 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import datetime
import json
import statistics
import time
from pathlib import Path
import httpx
from django.conf import settings
from django.core.management.base import BaseCommand
from django.db.models import Q
from pgvector.django import CosineDistance
from apps.accounts.models import Organization, Role, User
from apps.knowledge.models import KnowledgeChunk, TrainingFile
from apps.onboarding.models import OnboardingSession
# Django management command that benchmarks the inference server, pgvector
# retrieval and the database, then prints a summary and writes a report file.
class Command(BaseCommand):
# Description of the command; Django management commands expose `help` in their usage text.
help = "Benchmark Dynavera system components: GPU inference server, pgvector retrieval, and database."
def add_arguments(self, parser):
    """Register the command-line options understood by the benchmark command.

    Options added to *parser*:
        --runs      int, default 5
        --out       str, default "benchmarks"
        --skip-llm  boolean flag, off by default
    """
    # One (flag, keyword-arguments) pair per option, registered in order.
    option_specs = (
        ("--runs", {"type": int, "default": 5, "help": "Repetitions per latency benchmark (default: 5)"}),
        (
            "--out",
            {
                "type": str,
                "default": "benchmarks",
                "help": "Output directory for the results file (default: benchmarks/)",
            },
        ),
        (
            "--skip-llm",
            {"action": "store_true", "help": "Skip LLM inference benchmarks (each prompt takes ~30 s)"},
        ),
    )
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)
def handle(self, *args, **options):
    """Run every benchmark stage in order, then print the summary and save the report.

    Reads ``runs``, ``skip_llm`` and ``out`` from *options*, creates the output
    directory, and resets ``self.results`` before the stages run. The LLM stage
    is skipped when ``--skip-llm`` was given.

    Raises:
        CommandError: if ``--runs`` is smaller than 1.
    """
    # Local import: the module-level import only brings in BaseCommand.
    from django.core.management.base import CommandError

    self.runs = options["runs"]
    if self.runs < 1:
        # With no repetitions the latency stages would have no samples to
        # summarise and would fail later with an opaque statistics error.
        raise CommandError("--runs must be at least 1")
    self.skip_llm = options["skip_llm"]
    self.out_dir = Path(options["out"])
    # parents=True allows --out to name a nested path whose parents do not exist yet.
    self.out_dir.mkdir(parents=True, exist_ok=True)
    self.results = {}

    self.stdout.write(self.style.SUCCESS("\n=== Dynavera System Benchmark ==="))
    self.stdout.write(f" Inference endpoint : {settings.INFERENCE_URL}")
    self.stdout.write(f" Repetitions : {self.runs}")
    self.stdout.write(f" LLM benchmarks : {'SKIPPED (--skip-llm)' if self.skip_llm else 'ENABLED'}\n")

    self._bench_health()
    self._bench_embeddings()
    self._bench_chunking()
    if not self.skip_llm:
        self._bench_llm()
    self._bench_database()
    self._bench_retrieval()
    self._print_summary()
    self._save_report()
def _req(self, method, path, **kwargs):
    """Send one authenticated request to the inference server and return its JSON body.

    *path* is appended to ``settings.INFERENCE_URL``. Extra keyword arguments
    are forwarded to ``httpx.request``. The request uses
    ``settings.INFERENCE_AUTH`` and a 180-second timeout;
    ``raise_for_status()`` is called before the body is decoded.
    """
    endpoint = f"{settings.INFERENCE_URL}{path}"
    response = httpx.request(
        method,
        endpoint,
        auth=settings.INFERENCE_AUTH,
        timeout=180,
        **kwargs,
    )
    response.raise_for_status()
    return response.json()
def _time_fn(self, fn):
t0 = time.perf_counter()
result = fn()
return result, (time.perf_counter() - t0) * 1000
def _stats(self, times_ms):
s = sorted(times_ms)
n = len(s)
p95_idx = min(n - 1, int(-(-(0.95 * n) // 1)) - 1)
return {
"mean_ms": round(statistics.mean(s), 1),
"median_ms": round(statistics.median(s), 1),
"p95_ms": round(s[p95_idx], 1),
"min_ms": round(s[0], 1),
"max_ms": round(s[-1], 1),
}
def _bench_health(self):
    """Time a single ``GET /health`` call and record the outcome in ``self.results["health"]``.

    The entry holds a status ("OK" when the response's ``status`` field is
    "ok", otherwise "DEGRADED"), the two readiness flags and the round-trip
    latency. Any exception is caught, stored as an "ERROR" entry and printed.
    """
    self.stdout.write("[ 1/6 ] GPU server health check ...")
    try:
        payload, elapsed_ms = self._time_fn(lambda: self._req("GET", "/health"))
        status = "OK" if payload.get("status") == "ok" else "DEGRADED"
        llm_ready = payload.get("llm_ready", False)
        embed_ready = payload.get("embedding_ready", False)
        self.results["health"] = {
            "status": status,
            "llm_ready": llm_ready,
            "embed_ready": embed_ready,
            "latency_ms": round(elapsed_ms, 1),
        }
        llm_text = "ready" if llm_ready else "unloaded"
        embed_text = "ready" if embed_ready else "not ready"
        self.stdout.write(
            f" {status} | LLM: {llm_text} "
            f"| Embed: {embed_text} | {elapsed_ms:.0f} ms"
        )
    except Exception as exc:
        self.results["health"] = {"status": "ERROR", "error": str(exc)}
        self.stdout.write(self.style.ERROR(f" FAILED: {exc}"))
def _bench_embeddings(self):
    """Measure embedding latency for a short, a medium and a long query.

    Each query is posted to ``/v1/embeddings`` ``self.runs`` times. The
    statistics from ``self._stats`` plus the query length are stored in
    ``self.results["embeddings"]`` under the stripped label.

    A query whose requests fail is recorded as ``{"error": ...}`` and
    reported, instead of letting the exception abort the whole command
    before the remaining stages run and the report is saved.
    """
    self.stdout.write(f"\n[ 2/6 ] Embedding latency ({self.runs} runs × 3 query lengths) ...")
    queries = {
        "short ": "What is onboarding?",
        "medium ": (
            "Explain the process for configuring access control policies for a new software engineer "
            "joining the platform team, including approval workflows and tool provisioning steps."
        ),
        "long ": (
            "A new hire on the infrastructure team needs to understand our CI/CD pipeline, deployment "
            "procedures, incident response protocols, monitoring dashboards, on-call rotation policy, "
            "and how to request access to production systems. Provide a comprehensive overview of all "
            "these areas including the relevant tools, key contacts, and escalation procedures they "
            "should be aware of during their first week and first month at the company."
        ),
    }
    embed_results = {}
    for label, query in queries.items():
        try:
            times = []
            for _ in range(self.runs):
                # q=query binds the current query at lambda-creation time.
                _, ms = self._time_fn(lambda q=query: self._req("POST", "/v1/embeddings", json={"input": q}))
                times.append(ms)
            st = self._stats(times)
        except Exception as exc:
            # Error entries carry no "mean_ms" key, so consumers that filter
            # on that key skip them.
            embed_results[label.strip()] = {"error": str(exc)}
            self.stdout.write(self.style.ERROR(f" {label.strip()} FAILED: {exc}"))
            continue
        embed_results[label.strip()] = {"query_chars": len(query), **st}
        self.stdout.write(
            f" {label}({len(query):4d} chars) mean={st['mean_ms']:.0f} ms "
            f"p95={st['p95_ms']:.0f} ms min={st['min_ms']:.0f} ms max={st['max_ms']:.0f} ms"
        )
    self.results["embeddings"] = embed_results
def _bench_chunking(self):
    """Time one ``POST /v1/semantic-chunk`` call for a small, a medium and a large text.

    For each text the character count, the number of chunks in the response
    and the latency are stored in ``self.results["chunking"]`` under the
    stripped label. A failing request is recorded as ``{"error": ...}`` and
    reported without stopping the remaining sizes.
    """
    self.stdout.write("\n[ 3/6 ] Semantic chunking latency ...")
    texts = {
        "small (~200 c)": "a " * 100,
        "medium (~2k c) ": (
            "This section covers the onboarding process for new employees joining the engineering team. "
            "You will learn about code review practices, deployment procedures, incident response, and "
            "team communication protocols. Each topic is covered in depth with examples and references "
            "to internal documentation. All engineers are expected to complete this module in week one. "
        ) * 5,
        "large (~8k c) ": (
            "The infrastructure team manages all cloud resources, CI/CD pipelines, and production environments. "
            "New members are expected to understand Kubernetes cluster management, Terraform IaC, "
            "GitLab CI pipeline authoring, monitoring with Grafana and Prometheus, and incident response procedures. "
            "This document provides a comprehensive guide to each area including runbooks and escalation paths. "
        ) * 20,
    }
    # Align the output with an explicit width rather than relying on padding
    # inside the dict keys; the keys are not padded uniformly, which let the
    # label and the chunk count run together.
    label_width = max(len(key.strip()) for key in texts)
    chunk_results = {}
    for label, text in texts.items():
        name = label.strip()
        try:
            result, ms = self._time_fn(lambda t=text: self._req("POST", "/v1/semantic-chunk", json={"text": t}))
            n = len(result.get("chunks", []))
            chunk_results[name] = {"chars": len(text), "chunks_produced": n, "latency_ms": round(ms, 1)}
            self.stdout.write(f" {name:<{label_width}} {n} chunks | {ms:.0f} ms")
        except Exception as exc:
            chunk_results[name] = {"error": str(exc)}
            self.stdout.write(self.style.ERROR(f" {name} FAILED: {exc}"))
    self.results["chunking"] = chunk_results
def _bench_llm(self):
    """Time one non-streaming chat completion for each of five fixed prompts.

    For every prompt the wall-clock time, the token counts from the response's
    ``usage`` field, the derived tokens-per-second rate and a 100-character
    preview of the reply are stored in ``self.results["llm"]`` under the
    prompt label. A failing prompt is stored as ``{"error": ...}`` and the
    loop moves on to the next one.
    """
    self.stdout.write("\n[ 4/6 ] LLM inference latency (each prompt is a single non-streaming call) ...")
    # Each case: (label, system prompt, user prompt, max_tokens).
    cases = (
        (
            "short_qa",
            "You are an onboarding assistant.",
            "What does a Kubernetes pod do? Answer in 2 sentences.",
            128,
        ),
        (
            "progress_summary",
            "You are an onboarding assistant.",
            (
                "A trainee has completed: Git Basics, CI/CD Pipelines, Code Review. Score: 85%. "
                "Write a 2-sentence progress summary."
            ),
            128,
        ),
        (
            "curriculum_gen",
            "You are an onboarding assistant. Output only a valid JSON array of strings.",
            (
                "Create a 6-module onboarding curriculum for a Software Engineer role focused on "
                "backend services. Output ONLY a JSON array of module title strings."
            ),
            256,
        ),
        (
            "assessment_gen",
            "You are an onboarding assistant. Output only valid JSON.",
            (
                "Generate 3 multiple-choice questions to assess understanding of CI/CD pipelines. "
                "Output as a JSON array of objects with keys: question, options (array of 4), answer."
            ),
            512,
        ),
        (
            "knowledge_explanation",
            "You are an onboarding assistant.",
            (
                "Explain Git branching strategy best practices for a new engineer. "
                "Cover: feature branches, naming conventions, merge vs rebase, and PR workflow. "
                "Use clear headings and bullet points. Target ~400 words."
            ),
            700,
        ),
    )
    outcomes = {}
    for name, system_msg, user_msg, token_cap in cases:
        # ending="" keeps the cursor on this line so the result is appended to it.
        self.stdout.write(f" {name} (max_tokens={token_cap}) ...", ending="")
        self.stdout.flush()
        try:
            started = time.perf_counter()
            reply = self._req(
                "POST",
                "/v1/chat/completions",
                json={
                    "messages": [
                        {"role": "system", "content": system_msg},
                        {"role": "user", "content": user_msg},
                    ],
                    "max_tokens": token_cap,
                    "stream": False,
                },
            )
            elapsed_s = time.perf_counter() - started
            usage = reply.get("usage", {})
            completion_tokens = usage.get("completion_tokens", 0)
            prompt_tokens = usage.get("prompt_tokens", 0)
            # Report 0 when no tokens were produced or no time elapsed.
            if elapsed_s > 0 and completion_tokens > 0:
                rate = round(completion_tokens / elapsed_s, 1)
            else:
                rate = 0
            content = reply["choices"][0]["message"]["content"] or ""
            preview = content[:100].replace("\n", " ")
            outcomes[name] = {
                "elapsed_s": round(elapsed_s, 2),
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "tokens_per_sec": rate,
                "response_preview": preview,
            }
            self.stdout.write(f" {elapsed_s:.1f} s | {completion_tokens} tokens | {rate} tok/s")
        except Exception as exc:
            outcomes[name] = {"error": str(exc)}
            self.stdout.write(self.style.ERROR(f" FAILED: {exc}"))
    self.results["llm"] = outcomes
def _bench_database(self):
    """Collect row counts for the main tables into ``self.results["database"]``.

    A probe query against ``knowledge_knowledgechunk`` runs first. If it fails
    for any reason the stage is marked as skipped and nothing else is queried.
    A failure while counting is stored as ``{"error": ...}`` and reported.
    """
    self.stdout.write("\n[ 5/6 ] Database statistics ...")
    try:
        from django.db import connection

        with connection.cursor() as cursor:
            cursor.execute("SELECT 1 FROM knowledge_knowledgechunk LIMIT 1")
    except Exception:
        self.stdout.write(self.style.WARNING(" Tables missing — run 'manage.py migrate' first. Skipping."))
        self.results["database"] = {"skipped": "Migrations not applied."}
        return

    try:
        embedded_files = TrainingFile.objects.filter(status="embedded")
        embedded_chunks = KnowledgeChunk.objects.filter(embedding__isnull=False, is_active=True)
        counts = {
            "organizations": Organization.objects.count(),
            "roles": Role.objects.count(),
            "users": User.objects.count(),
            "training_files_total": TrainingFile.objects.count(),
            "training_files_embedded": embedded_files.count(),
            "knowledge_chunks_with_embeddings": embedded_chunks.count(),
            "onboarding_sessions": OnboardingSession.objects.count(),
        }
        self.results["database"] = counts
        report_lines = (
            f" Orgs: {counts['organizations']} | Roles: {counts['roles']} | Users: {counts['users']}",
            f" Training files: {counts['training_files_total']} total ({counts['training_files_embedded']} embedded)",
            f" Knowledge chunks (with embeddings): {counts['knowledge_chunks_with_embeddings']}",
            f" Onboarding sessions: {counts['onboarding_sessions']}",
        )
        for text in report_lines:
            self.stdout.write(text)
    except Exception as exc:
        self.results["database"] = {"error": str(exc)}
        self.stdout.write(self.style.ERROR(f" FAILED: {exc}"))
def _bench_retrieval(self):
    """Measure pgvector cosine-distance retrieval latency for top-k of 5, 10 and 20.

    Picks the first role that has at least one chunk with an embedding, embeds
    a fixed query through ``/v1/embeddings``, then runs the ranked query
    ``self.runs`` times per top-k. Results are stored in
    ``self.results["retrieval"]``.

    The stage is recorded as ``{"skipped": ...}`` when the database is not
    usable or no suitable role exists, and as ``{"error": ...}`` when the
    query embedding or the retrieval queries fail. No exception propagates
    from the embedding request or the timed queries, so results collected by
    earlier stages still reach the summary and the report.
    """
    self.stdout.write(f"\n[ 6/6 ] pgvector retrieval latency ({self.runs} runs × top-k ∈ [5, 10, 20]) ...")
    try:
        role = Role.objects.filter(knowledge_chunks__embedding__isnull=False).distinct().first()
    except Exception as exc:
        self.stdout.write(self.style.WARNING(f" DB not ready ({exc}). Skipping."))
        self.results["retrieval"] = {"skipped": str(exc)}
        return
    if role is None:
        self.stdout.write(self.style.WARNING(" No role with embedded chunks — skipping."))
        self.results["retrieval"] = {"skipped": "No embedded chunks found in database."}
        return

    query = "What are the key responsibilities, tools, and procedures for this role?"
    self.stdout.write(f" Role: {role.name} (org: {role.organization.name})")
    self.stdout.write(f" Query: \"{query}\"")
    try:
        embed_data = self._req("POST", "/v1/embeddings", json={"input": query})
        query_vector = embed_data["data"][0]["embedding"]
    except Exception as exc:
        self.results["retrieval"] = {"error": f"Could not generate query embedding: {exc}"}
        self.stdout.write(self.style.ERROR(f" FAILED to get embedding: {exc}"))
        return

    try:
        total_chunks = KnowledgeChunk.objects.filter(embedding__isnull=False, is_active=True).count()
        retrieval_results = {}
        for top_k in [5, 10, 20]:
            times = []
            n_returned = 0
            for _ in range(self.runs):
                t0 = time.perf_counter()
                # list() forces the query to execute inside the timed region.
                chunks = list(
                    KnowledgeChunk.objects.filter(
                        organization=role.organization,
                        embedding__isnull=False,
                        is_active=True,
                    ).filter(
                        # Chunks for this role plus chunks not tied to any role.
                        Q(role=role) | Q(role__isnull=True)
                    ).annotate(
                        distance=CosineDistance("embedding", query_vector)
                    ).order_by("distance")[:top_k]
                )
                times.append((time.perf_counter() - t0) * 1000)
                n_returned = len(chunks)
            st = self._stats(times)
            retrieval_results[f"top_{top_k}"] = {"results_returned": n_returned, **st}
            self.stdout.write(
                f" top-{top_k:2d}: mean={st['mean_ms']:.1f} ms "
                f"p95={st['p95_ms']:.1f} ms min={st['min_ms']:.1f} ms max={st['max_ms']:.1f} ms"
            )
    except Exception as exc:
        # The error entry deliberately has no "results" key, so consumers
        # that check for that key skip this stage.
        self.results["retrieval"] = {"error": f"Retrieval query failed: {exc}"}
        self.stdout.write(self.style.ERROR(f" FAILED: {exc}"))
        return

    self.results["retrieval"] = {
        "role": role.name,
        "organization": role.organization.name,
        "query": query,
        "total_chunks_in_db": total_chunks,
        "results": retrieval_results,
    }
def _print_summary(self):
self.stdout.write(self.style.SUCCESS("\n=== Summary ===\n"))
h = self.results.get("health", {})
self.stdout.write(f" GPU Server : {h.get('status', 'N/A')} — LLM {'ready' if h.get('llm_ready') else 'unloaded'}, embed {'ready' if h.get('embed_ready') else 'N/A'}")
emb = self.results.get("embeddings", {})
means = [v["mean_ms"] for v in emb.values() if "mean_ms" in v]
if means:
self.stdout.write(f" Embedding : {min(means):.0f}{max(means):.0f} ms (mean across query lengths)")
chnk = self.results.get("chunking", {})
lats = [v["latency_ms"] for v in chnk.values() if "latency_ms" in v]
if lats:
self.stdout.write(f" Chunking : {min(lats):.0f}{max(lats):.0f} ms range by text size")
llm = self.results.get("llm", {})
elapsed = [v["elapsed_s"] for v in llm.values() if "elapsed_s" in v]
tps_all = [v["tokens_per_sec"] for v in llm.values() if "tokens_per_sec" in v and v["tokens_per_sec"] > 0]
if elapsed:
self.stdout.write(
f" LLM inference : {min(elapsed):.1f}{max(elapsed):.1f} s range"
+ (f" | {statistics.mean(tps_all):.1f} tok/s avg" if tps_all else "")
)
ret = self.results.get("retrieval", {})
r5 = ret.get("results", {}).get("top_5", {})
if r5.get("mean_ms"):
self.stdout.write(f" RAG retrieval : {r5['mean_ms']:.1f} ms mean (top-5, {ret.get('total_chunks_in_db', '?')} total chunks)")
db = self.results.get("database", {})
if "knowledge_chunks_with_embeddings" in db:
self.stdout.write(
f" Knowledge base : {db['knowledge_chunks_with_embeddings']} chunks from "
f"{db['training_files_embedded']} embedded files"
)
def _save_report(self):
    """Write the collected results to a timestamped Markdown file in ``self.out_dir``.

    The file is named ``results_<YYYY-MM-DD_HH-MM-SS>.md``. It contains one
    section per stage that produced results, followed by the complete
    ``self.results`` dict as JSON. The path is echoed to stdout afterwards.
    """
    # Read the clock once so the file name and the "Date" field always agree.
    now = datetime.datetime.now()
    ts = now.strftime("%Y-%m-%d_%H-%M-%S")
    path = self.out_dir / f"results_{ts}.md"
    # Two trailing spaces form a Markdown hard line break. Keeping them in a
    # variable stops trailing-whitespace stripping from silently removing them.
    br = "  "

    lines = [
        "# Dynavera Benchmark Results",
        "",
        f"**Date:** {now.strftime('%Y-%m-%d %H:%M:%S')}{br}",
        f"**Inference endpoint:** `{settings.INFERENCE_URL}`{br}",
        f"**Repetitions per benchmark:** {self.runs}{br}",
        "",
    ]

    h = self.results.get("health", {})
    lines += [
        "## 1. GPU Server Health",
        "",
        "| Field | Value |",
        "|---|---|",
        f"| Status | {h.get('status', 'N/A')} |",
        f"| LLM Ready | {h.get('llm_ready', 'N/A')} |",
        f"| Embed Ready | {h.get('embed_ready', 'N/A')} |",
        f"| Health check RTT | {h.get('latency_ms', 'N/A')} ms |",
        "",
    ]

    emb = self.results.get("embeddings", {})
    if emb:
        lines += [
            "## 2. Embedding Latency",
            "",
            "| Query type | Chars | Mean (ms) | Median (ms) | P95 (ms) | Min (ms) | Max (ms) |",
            "|---|---|---|---|---|---|---|",
        ]
        for label, v in emb.items():
            # Entries without statistics (e.g. failures) get no table row.
            if "mean_ms" in v:
                lines.append(
                    f"| {label} | {v['query_chars']} | {v['mean_ms']} | {v['median_ms']} "
                    f"| {v['p95_ms']} | {v['min_ms']} | {v['max_ms']} |"
                )
        lines.append("")

    chnk = self.results.get("chunking", {})
    if chnk:
        lines += [
            "## 3. Semantic Chunking Latency",
            "",
            "| Input size | Chars | Chunks produced | Latency (ms) |",
            "|---|---|---|---|",
        ]
        for label, v in chnk.items():
            if "latency_ms" in v:
                lines.append(f"| {label} | {v['chars']} | {v['chunks_produced']} | {v['latency_ms']} |")
        lines.append("")

    llm = self.results.get("llm", {})
    if llm:
        lines += [
            "## 4. LLM Inference Latency",
            "",
            "| Prompt type | Elapsed (s) | Prompt tokens | Completion tokens | Tok/s |",
            "|---|---|---|---|---|",
        ]
        for label, v in llm.items():
            if "elapsed_s" in v:
                lines.append(
                    f"| {label} | {v['elapsed_s']} | {v['prompt_tokens']} | {v['completion_tokens']} | {v['tokens_per_sec']} |"
                )
            else:
                lines.append(f"| {label} | ERROR | — | — | — |")
        lines.append("")
        # NOTE(review): the note is emitted only when LLM results exist, since it
        # comments on the LLM table; confirm this matches the intended nesting.
        lines += [
            "> **Note on end-to-end session time:** A full onboarding session invokes multiple sequential",
            "> inference calls (curriculum generation → knowledge explanation × N modules → assessment generation → progress summary).",
            "> Total wall-clock time accumulates across all turns plus retrieval and tool-call overhead.",
            "",
        ]

    db = self.results.get("database", {})
    if db and "error" not in db:
        lines += [
            "## 5. Database Statistics",
            "",
            "| Entity | Count |",
            "|---|---|",
        ]
        labels = {
            "organizations": "Organizations",
            "roles": "Roles",
            "users": "Users",
            "training_files_total": "Training Files (total)",
            "training_files_embedded": "Training Files (embedded)",
            "knowledge_chunks_with_embeddings": "Knowledge Chunks (with embeddings)",
            "onboarding_sessions": "Onboarding Sessions",
        }
        for key, label in labels.items():
            # A skipped stage has none of these keys, leaving the table empty.
            if key in db:
                lines.append(f"| {label} | {db[key]} |")
        lines.append("")

    ret = self.results.get("retrieval", {})
    if "results" in ret:
        lines += [
            "## 6. pgvector Retrieval Latency",
            "",
            f"**Role:** {ret.get('role')}{br}",
            f"**Organisation:** {ret.get('organization')}{br}",
            f'**Query:** "{ret.get("query")}"{br}',
            f"**Total chunks in DB:** {ret.get('total_chunks_in_db')}{br}",
            "",
            "| Top-K | Results returned | Mean (ms) | Median (ms) | P95 (ms) | Min (ms) | Max (ms) |",
            "|---|---|---|---|---|---|---|",
        ]
        for k, v in ret["results"].items():
            lines.append(
                f"| {k} | {v['results_returned']} | {v['mean_ms']} | {v['median_ms']} | {v['p95_ms']} | {v['min_ms']} | {v['max_ms']} |"
            )
        lines.append("")

    lines += [
        "## Raw JSON",
        "",
        "```json",
        # default=str stringifies any value json cannot serialise natively.
        json.dumps(self.results, indent=2, default=str),
        "```",
        "",
    ]
    path.write_text("\n".join(lines), encoding="utf-8")
    self.stdout.write(self.style.SUCCESS(f"\nResults saved → {path}"))