Dynavera/docs/orchestration-pseudocode.md
2026-03-23 15:02:03 +00:00

180 lines
6.5 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Orchestration Pseudocode
This document provides pseudocode for the core runtime components of Dynavera.
Source references point to the submitted repository.
---
## 1. Multi-Turn Orchestration Loop
**Source:** `apps/onboarding/consumers/base.py:77132`
The `orchestrate` method is the central inference loop. It accumulates a message history,
calls the GPU inference endpoint with MCP tool definitions attached, handles any tool calls
the model requests, and only returns once the model produces a final text response (and the
minimum-turn threshold has been met).
```
function ORCHESTRATE(message, config, min_turns, max_turns):
messages ← [ {role: system, content: config.system_prompt},
{role: user, content: message} ]
for turn = 1 to max_turns do
emit THOUGHT status to WebSocket client
response ← POST /v1/chat/completions {
messages: messages,
tools: MCP_ROUTER.get_tool_definitions(),
tool_choice: "auto",
max_tokens: resolved_max_tokens
}
ai_msg ← response.choices[0].message
append ai_msg to messages
if ai_msg contains tool_calls then
for each call in ai_msg.tool_calls do
emit TOOL_START {name, args} to client
result ← MCP_ROUTER.handle(call.name, call.args)
emit TOOL_RESULT {result} to client
append {role: tool, name: call.name, content: result} to messages
end for
continue // re-enter loop with updated context
else // model returned a text response
content ← censor(ai_msg.content)
if turn < min_turns then
append force_reasoning_prompt to messages
continue // force at least one reasoning pass
end if
return content
end if
end for
return last_content // fallback if max_turns reached
```
**Key design points:**
- Tool results are injected back into the message history before the next inference call,
allowing the model to reason over retrieved evidence.
- `min_turns` enforces at least one structured reasoning pass before returning, improving
output quality on complex generation tasks.
- All status events (`THOUGHT`, `TOOL_START`, `TOOL_RESULT`, `COMPLETED`) are streamed to
the client over the WebSocket, making the reasoning process inspectable in the UI.
---
## 2. MCP Tool Dispatch
**Source:** `apps/onboarding/mcp.py:42127`
The `MCPRouter` exposes a fixed set of approved tools to the model. Tool definitions are
generated at class load time from method-level `@mcp_tool` decorator metadata.
```
function MCP_ROUTER.handle(tool_name, args):
method ← tool_name_to_method_map[tool_name]
if method is None then
return {error: "Tool not found"}
end if
try
return await method(args)
catch Exception as e
return {error: e.message}
end try
// search_knowledge (lines 78127)
function search_knowledge(args):
query_vector ← POST /v1/embeddings {input: args.query}
chunks ← SELECT content, metadata
FROM KnowledgeChunk
WHERE organization = role.organization
AND (role = args.role_uuid OR role IS NULL)
AND is_active = true
ORDER BY CosineDistance(embedding, query_vector) ASC
LIMIT 5
return [{content, source, relevance: 1 - distance} for chunk in chunks]
// update_progress (lines 129159)
function update_progress(args):
session ← OnboardingSession.get(uuid=args.session_uuid)
if args.score → session.state.last_score ← args.score
if args.completed → session.state.completed_modules ← append(args.completed_module)
session.save()
return {status: "success", new_state: session.state}
```
---
## 3. Knowledge Ingestion Pipeline
**Source:** `apps/knowledge/tasks.py:45117`
```
task ingest_training_file(file_uuid):
file ← TrainingFile.get(uuid=file_uuid)
file.status ← "ingesting"; file.save()
raw_text ← extract_text(file) // PDF / DOCX / TXT
all_chunks ← []
for segment in split(raw_text, size=CHUNK_SIZE) do
response ← POST /v1/semantic-chunk {
text: segment,
threshold: SEMANTIC_CHUNK_THRESHOLD
}
for (chunk_text, embedding) in zip(response.chunks, response.embeddings) do
all_chunks.append(KnowledgeChunk {
content: chunk_text,
embedding: embedding, // 768-dim vector
role: file.role,
metadata: {source: file.file_name}
})
end for
end for
new_chunks ← [c for c in all_chunks if c.hash not in existing_hashes]
KnowledgeChunk.bulk_create(new_chunks)
file.status ← "embedded"; file.save()
trigger update_agent_prompts_from_file(file.role.uuid)
```
---
## 4. Onboarding Generation Pipeline (CA → KA → AA)
**Source:** `apps/onboarding/consumers/generate.py:34124`
```
function run_pipeline(role):
// Phase 1 — Curriculum Agent
context ← search_knowledge(role, query=role.name + " responsibilities")
topics ← ORCHESTRATE(curriculum_generation_prompt(role, context), CA_config)
→ parsed as JSON list of topic strings (max 15)
// Phase 2 — Knowledge Agent (one pass per topic)
full_structure ← []
for each topic in topics do
hits ← search_knowledge(role, query=topic)
content ← ORCHESTRATE(knowledge_generation_prompt(topic, hits), KA_config,
min_turns=2, max_tokens=3500)
full_structure.append({title: topic, body: content})
end for
// Phase 3 — Assessment Agent
quiz_fields ← ORCHESTRATE(quiz_generation_prompt(topics, module_briefs), AA_config)
→ sanitised and validated; fallback quiz generated if JSON invalid
full_structure.append({title: "Final Assessment Quiz", fields: quiz_fields,
meta: {pass_mark: 80}})
OnboardingFlow.save(role, full_structure)
emit COMPLETED to client
```
**Grading strategy:**
- Multiple-choice questions: deterministic string comparison against `correct_option`
- Free-text / textarea responses: agent-graded by the AA at session completion
- Per-question outcomes persisted in session state for audit and feedback rendering