diff --git a/plugins/festinger/db/schema.sql b/plugins/festinger/db/schema.sql index 6140c7b..9e2093e 100644 --- a/plugins/festinger/db/schema.sql +++ b/plugins/festinger/db/schema.sql @@ -111,3 +111,14 @@ CREATE TABLE IF NOT EXISTS kg_write_log ( CREATE INDEX IF NOT EXISTS kwl_created_idx ON kg_write_log (created_at DESC); CREATE INDEX IF NOT EXISTS kwl_concept_idx ON kg_write_log (concept_id); CREATE INDEX IF NOT EXISTS kwl_op_idx ON kg_write_log (op); + +-- --------------------------------------------------------------------------- +-- agent_models — per-agent LLM model assignments +-- Maps an agent identity (from X-Agent-Name header) to a specific model. +-- Priority over write_model_id (global default) when agent_name is present. +-- --------------------------------------------------------------------------- +CREATE TABLE IF NOT EXISTS agent_models ( + agent_name TEXT PRIMARY KEY, -- normalised lowercase, e.g. 'gunnar', 'rind' + model_id INT NOT NULL REFERENCES models(id) ON DELETE CASCADE, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); diff --git a/plugins/festinger/festinger/db.py b/plugins/festinger/festinger/db.py index b91df73..3e82a0b 100644 --- a/plugins/festinger/festinger/db.py +++ b/plugins/festinger/festinger/db.py @@ -60,6 +60,16 @@ async def init_schema(pool: asyncpg.Pool) -> None: await conn.execute( "ALTER TABLE soas ADD COLUMN IF NOT EXISTS first_seen_context TEXT NOT NULL DEFAULT ''" ) + # Migration: per-agent model assignments (agent_name → model_id) + await conn.execute( + """ + CREATE TABLE IF NOT EXISTS agent_models ( + agent_name TEXT PRIMARY KEY, + model_id INT NOT NULL REFERENCES models(id) ON DELETE CASCADE, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() + ) + """ + ) log.info("schema applied") diff --git a/plugins/festinger/festinger/main.py b/plugins/festinger/festinger/main.py index e8d43e7..19d7f34 100644 --- a/plugins/festinger/festinger/main.py +++ b/plugins/festinger/festinger/main.py @@ -578,6 +578,11 @@ def 
_extract_request_model_config( # Saliency + recollection pipeline # --------------------------------------------------------------------------- +def _agent_name_from_headers(headers: dict) -> str: + """Extract and normalise the agent identity from X-Agent-Name header.""" + return headers.get("x-agent-name", "").strip().lower() + + async def process_prompt(body: dict, path: str, pool, cfg: dict, request_headers: dict | None = None) -> dict: """ Run the saliency + recollection pipeline over the prompt. @@ -587,9 +592,11 @@ async def process_prompt(body: dict, path: str, pool, cfg: dict, request_headers conf_floor = float(await get_config(pool, "recollection_confidence_floor", "0.6")) recency_days = int(await get_config(pool, "recollection_recency_days", "90")) + hdrs = request_headers or {} # Derive a ModelConfig from the intercepted request so context discovery can # mirror Agent0's current model without a separate write_model_id config. - request_model = _extract_request_model_config(path, body, request_headers or {}, cfg) + request_model = _extract_request_model_config(path, body, hdrs, cfg) + agent_name = _agent_name_from_headers(hdrs) # Extract only the last user message — agent responses and reasoning traces # are noise for both cue scanning and concept discovery. @@ -651,7 +658,11 @@ async def process_prompt(body: dict, path: str, pool, cfg: dict, request_headers # 3. Enqueue for LLM-driven discovery if there are candidates to evaluate. 
# ---------------------------------------------------------------------------
# /agent-models — per-agent model assignments
# ---------------------------------------------------------------------------

# Upper bound of a PostgreSQL INT (int4); agent_models.model_id is declared INT.
_MAX_MODEL_ID = 2**31 - 1

# The admin page interpolates agent names straight into innerHTML, so names
# carrying markup-significant characters are refused before they are stored.
_UNSAFE_AGENT_NAME_CHARS = frozenset("<>&\"'`")


def _parse_model_id(raw: object) -> int | None:
    """Coerce a JSON ``model_id`` value to a positive int4, or None if invalid."""
    if isinstance(raw, bool):
        # bool is an int subclass; True must not silently become model 1.
        return None
    if isinstance(raw, float) and not raw.is_integer():
        return None
    try:
        value = int(raw)
    except (TypeError, ValueError, OverflowError):
        return None
    return value if 1 <= value <= _MAX_MODEL_ID else None


@app.get("/agent-models")
async def list_agent_models(request: Request) -> dict:
    """List every agent → model assignment, joined with the model's details.

    Returns:
        ``{"agent_models": [...]}`` ordered by agent name.  Each entry holds
        ``agent_name``, ``model_id``, ``provider``, ``model_name``,
        ``base_url`` (empty string when NULL) and ``created_at`` (ISO 8601).
    """
    pool = request.app.state.pool
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT am.agent_name, am.model_id, am.created_at,
                   m.provider, m.model_name, m.base_url
            FROM agent_models am
            JOIN models m ON m.id = am.model_id
            ORDER BY am.agent_name
            """
        )
    return {"agent_models": [
        {
            "agent_name": r["agent_name"],
            "model_id": r["model_id"],
            "provider": r["provider"],
            "model_name": r["model_name"],
            "base_url": r["base_url"] or "",
            "created_at": r["created_at"].isoformat(),
        }
        for r in rows
    ]}


@app.put("/agent-models/{agent_name}")
async def set_agent_model(agent_name: str, request: Request) -> dict:
    """
    Assign a model to an agent.  agent_name is normalised to lowercase.

    Body: {"model_id": 3}

    Validation failures are reported as ``{"error": "..."}`` bodies (the admin
    page checks ``d.error``) rather than raised, so malformed input never
    surfaces as an unhandled server error.

    Returns:
        ``{"status": "ok", "agent_name": <name>, "model_id": <int>}`` on
        success, otherwise ``{"error": <message>}``.
    """
    pool = request.app.state.pool
    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
        return {"error": "request body must be valid JSON"}

    raw_model_id = data.get("model_id") if isinstance(data, dict) else None
    if raw_model_id is None or raw_model_id == "":
        return {"error": "model_id is required"}
    model_id = _parse_model_id(raw_model_id)
    if model_id is None:
        return {"error": "model_id must be a positive integer"}

    name = agent_name.strip().lower()
    if not name:
        return {"error": "agent_name must not be empty"}
    if any(ch in _UNSAFE_AGENT_NAME_CHARS or not ch.isprintable() for ch in name):
        return {"error": "agent_name contains unsupported characters"}

    async with pool.acquire() as conn:
        # Verify the model exists so the caller gets a readable error instead
        # of a foreign-key violation.
        row = await conn.fetchrow(
            "SELECT id, provider, model_name FROM models WHERE id=$1", model_id
        )
        if not row:
            return {"error": f"model {model_id} not found"}
        # Upsert: re-assigning an agent replaces its model and refreshes created_at.
        await conn.execute(
            """
            INSERT INTO agent_models (agent_name, model_id)
            VALUES ($1, $2)
            ON CONFLICT (agent_name) DO UPDATE SET model_id = EXCLUDED.model_id, created_at = now()
            """,
            name, model_id,
        )
    log.info("agent_model set agent=%s model_id=%s provider=%s model=%s",
             name, model_id, row["provider"], row["model_name"])
    return {"status": "ok", "agent_name": name, "model_id": model_id}


@app.delete("/agent-models/{agent_name}")
async def delete_agent_model(agent_name: str, request: Request) -> dict:
    """Remove the model assignment for an agent.

    The name is normalised the same way as on assignment.  No character
    validation is applied here, so any previously stored name stays removable.

    Returns:
        ``{"status": "ok", "deleted": <name>}`` when a row was removed,
        otherwise ``{"error": <message>}``.
    """
    pool = request.app.state.pool
    name = agent_name.strip().lower()
    async with pool.acquire() as conn:
        result = await conn.execute("DELETE FROM agent_models WHERE agent_name=$1", name)
    # The command status ends with the affected-row count, e.g. "DELETE 1".
    deleted = int(result.split()[-1]) if result else 0
    if not deleted:
        return {"error": f"no assignment found for agent '{name}'"}
    log.info("agent_model deleted agent=%s", name)
    return {"status": "ok", "deleted": name}

Agent models

+

+ Assign a model per agent. Agent0 must send X-Agent-Name: <name> on every request. + Takes priority over the global write model. +

+ + + +
AgentModel IDProviderModel name
Loading…
+
+ Assign model to agent… +
+ + + +
+
+

Actions

@@ -2011,6 +2122,11 @@ ADMIN_HTML = """ `).join(''); + + // Populate model dropdown in agent-models assignment form + const sel = document.getElementById('am-model'); + sel.innerHTML = '' + + md.models.map(m => ``).join(''); }} async function addModel(btn) {{ @@ -2052,6 +2168,57 @@ ADMIN_HTML = """ await loadModels(); }} + async function loadAgentModels() {{ + const r = await fetch('/agent-models'); + const d = await r.json(); + const tbody = document.getElementById('agent-models-tbody'); + if (!d.agent_models.length) {{ + tbody.innerHTML = 'No assignments yet.'; + return; + }} + tbody.innerHTML = d.agent_models.map(a => ` + + ${{a.agent_name}} + ${{a.model_id}} + ${{a.provider}} + ${{a.model_name}}${{a.base_url ? ' (' + a.base_url + ')' : ''}} + + `).join(''); + }} + + async function assignAgentModel(btn) {{ + const agent = document.getElementById('am-agent').value.trim().toLowerCase(); + const model_id = document.getElementById('am-model').value; + if (!agent) {{ alert('Enter an agent name.'); return; }} + if (!model_id) {{ alert('Select a model.'); return; }} + btn.disabled = true; + try {{ + const r = await fetch('/agent-models/' + encodeURIComponent(agent), {{ + method: 'PUT', + headers: {{'Content-Type': 'application/json'}}, + body: JSON.stringify({{model_id: parseInt(model_id)}}) + }}); + const d = await r.json(); + if (d.error) {{ showResult('Error: ' + d.error, false); return; }} + showResult('Assigned model ' + model_id + ' to agent "' + agent + '".', true); + document.getElementById('am-agent').value = ''; + await loadAgentModels(); + }} catch(e) {{ showResult('Error: ' + e.message, false); }} + finally {{ btn.disabled = false; }} + }} + + async function removeAgentModel(agent, btn) {{ + if (!confirm('Remove model assignment for "' + agent + '"?')) return; + btn.disabled = true; + try {{ + const r = await fetch('/agent-models/' + encodeURIComponent(agent), {{method: 'DELETE'}}); + const d = await r.json(); + if (d.error) {{ showResult('Error: ' + 
d.error, false); return; }} + await loadAgentModels(); + }} catch(e) {{ showResult('Error: ' + e.message, false); }} + finally {{ btn.disabled = false; }} + }} + async function loadConflicts() {{ const r = await fetch('/conflicts'); const d = await r.json(); @@ -2200,6 +2367,7 @@ ADMIN_HTML = """ loadConflicts(); loadLog(0); loadModels(); + loadAgentModels(); diff --git a/plugins/festinger/festinger/write_queue.py b/plugins/festinger/festinger/write_queue.py index 24f0e8a..44f4048 100644 --- a/plugins/festinger/festinger/write_queue.py +++ b/plugins/festinger/festinger/write_queue.py @@ -32,15 +32,18 @@ class ContextDiscoverRequest: User message text plus candidate tokens (words absent from the English dictionary) submitted for LLM-driven concept discovery. - candidate_tokens are hints for the LLM — it decides which are real domain - concepts vs typos/noise, and extracts relationship triples from the text. + candidate_tokens: hints for the LLM — it decides which are real domain + concepts vs typos/noise, and extracts relationship triples from the text. - fallback_model: if set, used when write_model_id is not configured in the - DB. Allows Festinger to mirror the model Agent0 is currently using without - any extra configuration. + agent_name: normalised agent identity from X-Agent-Name header (lowercase). + Used to look up an agent-specific model in the agent_models table. + + fallback_model: last-resort ModelConfig derived from the intercepted request. + Used when neither an agent-specific model nor write_model_id is configured. """ user_text: str candidate_tokens: list[str] + agent_name: str = "" fallback_model: Optional[ModelConfig] = None @@ -62,19 +65,23 @@ _llm_semaphore: asyncio.Semaphore | None = None async def enqueue_context_discover( user_text: str, candidate_tokens: list[str], + agent_name: str = "", fallback_model: Optional[ModelConfig] = None, ) -> None: """ Enqueue a user message for LLM-driven concept discovery and relation extraction. 
async def _process_context_discover_guarded(
    pool: asyncpg.Pool,
    user_text: str,
    candidate_tokens: list[str],
    agent_name: str = "",
    fallback_model: Optional[ModelConfig] = None,
) -> None:
    """Run concept discovery while holding the module-level LLM semaphore.

    Any exception raised by the discovery step is logged and swallowed, so a
    failing task never propagates out of the fire-and-forget task that runs it.

    Args:
        pool: Database connection pool handed through to the discovery step.
        user_text: The user message to analyse.
        candidate_tokens: Candidate tokens passed to the discovery step.
        agent_name: Normalised agent identity used for model resolution.
        fallback_model: Last-resort model derived from the intercepted request.

    Raises:
        RuntimeError: If ``_llm_semaphore`` has not been initialised.
    """
    # An explicit check instead of ``assert``: assertions are stripped under
    # ``python -O``, which would turn this into an opaque failure on ``None``.
    if _llm_semaphore is None:
        raise RuntimeError("LLM semaphore is not initialised; cannot run context discover")
    async with _llm_semaphore:
        try:
            await _process_context_discover(pool, user_text, candidate_tokens, agent_name, fallback_model)
        except Exception as e:
            log.exception("context discover task error: %s", e)


async def _resolve_discover_model(
    pool: asyncpg.Pool,
    agent_name: str,
    fallback_model: Optional[ModelConfig],
) -> Optional[ModelConfig]:
    """
    Resolve which LLM to use for context discovery.

    Priority:
      1. Agent-specific model — agent_models table, keyed by agent_name
      2. Global default       — write_model_id config key
      3. Request mirror       — fallback_model (same provider/model Agent0 used)

    A configured write_model_id that is missing from the models table is
    logged as a warning and resolution continues with the request mirror.

    Args:
        pool: Database connection pool.
        agent_name: Agent identity; stripped and lower-cased here so callers
            that bypass header normalisation still match the stored keys.
        fallback_model: Model mirrored from the intercepted request, if any.

    Returns:
        The resolved ModelConfig, or None when no source yields a model.
    """
    agent_key = (agent_name or "").strip().lower()
    if agent_key:
        async with pool.acquire() as conn:
            row = await conn.fetchrow(
                """
                SELECT m.provider, m.model_name, m.api_key, m.base_url
                FROM agent_models am
                JOIN models m ON m.id = am.model_id
                WHERE am.agent_name = $1
                """,
                agent_key,
            )
        if row:
            log.debug(
                "context discover: agent=%s → provider=%s model=%s",
                agent_key, row["provider"], row["model_name"],
            )
            # NOTE(review): api_key is passed through exactly as stored —
            # confirm ModelConfig tolerates a NULL key for keyless providers.
            return ModelConfig(
                provider=row["provider"],
                model_name=row["model_name"],
                api_key=row["api_key"],
                base_url=row["base_url"] or "",
            )
        log.debug("context discover: no model assigned to agent=%s", agent_key)

    write_model_id = await get_config(pool, "write_model_id")
    if write_model_id:
        m = await get_model_config(pool, write_model_id)
        if m:
            return m
        log.warning("write_model_id=%s not found in models table", write_model_id)

    if fallback_model:
        log.debug(
            "context discover: mirroring request model provider=%s model=%s",
            fallback_model.provider, fallback_model.model_name,
        )
        return fallback_model

    return None
Request mirror (fallback_model) Saliency feedback loop: - Confirmed concepts (triples inserted) → saliency raised to NOVEL_CONFIRMED_SALIENCY @@ -192,20 +255,9 @@ async def _process_context_discover( - Rejected concepts (no triples) → saliency stays low (NOVEL_INITIAL_SALIENCY), effectively hiding typos and noise from the recollection engine. """ - write_model_id = await get_config(pool, "write_model_id") - if write_model_id: - model = await get_model_config(pool, write_model_id) - if not model: - log.warning("write_model_id=%s not found in models table", write_model_id) - return - elif fallback_model: - model = fallback_model - log.debug( - "context discover: using request model provider=%s model=%s", - model.provider, model.model_name, - ) - else: - log.debug("no write_model_id configured and no request model available — skipping context discover") + model = await _resolve_discover_model(pool, agent_name, fallback_model) + if not model: + log.debug("no model resolved for context discover (agent=%r) — skipping", agent_name) return seed_dims = ["type", "membership", "runs-on", "tech", "owned-by", "geography"]