Add cross-protocol agent routing at /v1/chat/completions
When X-Agent-Name or X-Agent-Id is present and matches an agent_models
entry, Festinger routes the main inference request to the configured
provider — not just the memory-writing utility model.
Protocol translation:
- Incoming OpenAI → outgoing Claude: system-message extraction,
max_tokens defaulting, response translated back to OpenAI format
- Incoming OpenAI → outgoing LM Studio/OpenAI: model + base_url swap
- All responses returned as OpenAI-compatible JSON or SSE
Also adds streaming synthesis for /v1/chat/completions (OpenAI SSE)
and X-Agent-Id fallback in _agent_name_from_headers so numeric
AGENT_ID env vars work without needing AGENT_NAME.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -574,13 +574,217 @@ def _extract_request_model_config(
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Agent routing — cross-protocol dispatch
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _openai_to_anthropic_body(body: dict, model_name: str) -> dict:
|
||||
"""
|
||||
Translate an OpenAI chat completions request to Anthropic Messages API format.
|
||||
- system messages are lifted to the top-level 'system' field
|
||||
- max_tokens defaults to 4096 if not specified
|
||||
- temperature/top_p forwarded if present
|
||||
"""
|
||||
system_parts: list[str] = []
|
||||
claude_messages: list[dict] = []
|
||||
|
||||
for m in body.get("messages", []):
|
||||
role = m.get("role", "")
|
||||
content = m.get("content", "")
|
||||
if role == "system":
|
||||
if isinstance(content, str):
|
||||
system_parts.append(content)
|
||||
elif isinstance(content, list):
|
||||
system_parts.extend(
|
||||
b.get("text", "") for b in content
|
||||
if isinstance(b, dict) and b.get("type") == "text"
|
||||
)
|
||||
else:
|
||||
claude_messages.append(m)
|
||||
|
||||
anthropic_body: dict = {
|
||||
"model": model_name,
|
||||
"messages": claude_messages,
|
||||
"max_tokens": body.get("max_tokens") or 4096,
|
||||
}
|
||||
if system_parts:
|
||||
anthropic_body["system"] = "\n\n".join(system_parts)
|
||||
for key in ("temperature", "top_p", "stop_sequences"):
|
||||
if key in body:
|
||||
anthropic_body[key] = body[key]
|
||||
return anthropic_body
|
||||
|
||||
|
||||
def _anthropic_to_openai_response(data: dict) -> dict:
|
||||
"""Convert an Anthropic Messages API response to OpenAI chat completions format."""
|
||||
text = "".join(
|
||||
b.get("text", "") for b in data.get("content", [])
|
||||
if b.get("type") == "text"
|
||||
)
|
||||
usage = data.get("usage", {})
|
||||
stop_map = {"end_turn": "stop", "max_tokens": "length", "stop_sequence": "stop"}
|
||||
finish = stop_map.get(data.get("stop_reason", "end_turn"), "stop")
|
||||
return {
|
||||
"id": data.get("id", "chatcmpl-festinger"),
|
||||
"object": "chat.completion",
|
||||
"created": int(time.time()),
|
||||
"model": data.get("model", ""),
|
||||
"choices": [{
|
||||
"index": 0,
|
||||
"message": {"role": "assistant", "content": text},
|
||||
"finish_reason": finish,
|
||||
}],
|
||||
"usage": {
|
||||
"prompt_tokens": usage.get("input_tokens", 0),
|
||||
"completion_tokens": usage.get("output_tokens", 0),
|
||||
"total_tokens": usage.get("input_tokens", 0) + usage.get("output_tokens", 0),
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _openai_sse_from_response(raw: dict) -> bytes:
|
||||
"""
|
||||
Synthesise a minimal OpenAI-compatible SSE stream from a complete (non-streaming)
|
||||
OpenAI-format response dict. Used when the client sent stream=true but the
|
||||
upstream was called non-streaming.
|
||||
"""
|
||||
text = raw.get("choices", [{}])[0].get("message", {}).get("content", "")
|
||||
model = raw.get("model", "")
|
||||
cid = raw.get("id", "chatcmpl-festinger")
|
||||
ts = int(time.time())
|
||||
|
||||
def chunk(delta: dict, finish_reason=None) -> str:
|
||||
return "data: " + json.dumps({
|
||||
"id": cid, "object": "chat.completion.chunk",
|
||||
"created": ts, "model": model,
|
||||
"choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
|
||||
}) + "\n\n"
|
||||
|
||||
parts = [
|
||||
chunk({"role": "assistant", "content": ""}),
|
||||
chunk({"content": text}),
|
||||
chunk({}, finish_reason="stop"),
|
||||
"data: [DONE]\n\n",
|
||||
]
|
||||
return "".join(parts).encode()
|
||||
|
||||
|
||||
async def _get_agent_routing_model(pool, agent_name: str) -> ModelConfig | None:
|
||||
"""
|
||||
Look up the agent's configured model from the agent_models table.
|
||||
agent_name is the normalised key (lowercase name or numeric ID string).
|
||||
"""
|
||||
if not agent_name:
|
||||
return None
|
||||
async with pool.acquire() as conn:
|
||||
row = await conn.fetchrow(
|
||||
"""
|
||||
SELECT m.provider, m.model_name, m.api_key, m.base_url
|
||||
FROM agent_models am
|
||||
JOIN models m ON m.id = am.model_id
|
||||
WHERE am.agent_name = $1
|
||||
""",
|
||||
agent_name,
|
||||
)
|
||||
if not row:
|
||||
return None
|
||||
return ModelConfig(
|
||||
provider=row["provider"],
|
||||
model_name=row["model_name"],
|
||||
api_key=row["api_key"],
|
||||
base_url=row["base_url"] or "",
|
||||
)
|
||||
|
||||
|
||||
async def _route_agent_chat(
|
||||
body: dict,
|
||||
agent_model: ModelConfig,
|
||||
original_stream: bool,
|
||||
pool,
|
||||
cfg: dict,
|
||||
request_headers: dict,
|
||||
min_len: int,
|
||||
) -> Response:
|
||||
"""
|
||||
Route an OpenAI-compatible chat completions request to the agent's
|
||||
configured provider, handling cross-protocol translation.
|
||||
|
||||
claude → Anthropic Messages API (translated in both directions)
|
||||
openai → OpenAI-compatible endpoint (base_url + model swap)
|
||||
lm-studio→ same as openai
|
||||
"""
|
||||
# Run recollection injection (same pipeline as standard path)
|
||||
body = await process_prompt(body, "/v1/chat/completions", pool, cfg, request_headers)
|
||||
|
||||
sess = session_key(agent_model.model_name, body.get("messages", []))
|
||||
|
||||
# Capture upstream config for use in the loop-detection re-run closure
|
||||
if agent_model.provider == "claude":
|
||||
anthropic_upstream = agent_model.base_url or "https://api.anthropic.com"
|
||||
anthropic_headers = {
|
||||
"x-api-key": agent_model.api_key,
|
||||
"anthropic-version": "2023-06-01",
|
||||
"content-type": "application/json",
|
||||
}
|
||||
else:
|
||||
oai_upstream = agent_model.base_url or cfg.get("upstream_openai", "https://api.openai.com")
|
||||
oai_headers = {
|
||||
"authorization": f"Bearer {agent_model.api_key or 'lm-studio'}",
|
||||
"content-type": "application/json",
|
||||
}
|
||||
|
||||
async def _call(current_body: dict) -> tuple[str, dict]:
|
||||
"""Call the agent's upstream; always return (text, openai_format_dict)."""
|
||||
if agent_model.provider == "claude":
|
||||
ab = _openai_to_anthropic_body(current_body, agent_model.model_name)
|
||||
text, raw_a = await call_anthropic(ab, anthropic_upstream, anthropic_headers)
|
||||
return text, _anthropic_to_openai_response(raw_a)
|
||||
else:
|
||||
b = dict(current_body)
|
||||
b["model"] = agent_model.model_name
|
||||
text, raw_o = await call_openai(b, oai_upstream, oai_headers)
|
||||
return text, raw_o
|
||||
|
||||
text, raw = await _call(body)
|
||||
count = record_and_check(sess, text, min_len)
|
||||
|
||||
if count >= 2:
|
||||
log.warning(
|
||||
"loop_detected (agent routed) agent_model=%s session=%s count=%d",
|
||||
agent_model.model_name, sess[1], count,
|
||||
)
|
||||
body, override = apply_mitigations(body, count, cfg)
|
||||
if override is not None:
|
||||
raw["choices"] = [{
|
||||
"index": 0,
|
||||
"message": {"role": "assistant", "content": override},
|
||||
"finish_reason": "stop",
|
||||
}]
|
||||
raw["loop_detected"] = True
|
||||
else:
|
||||
text, raw = await _call(body)
|
||||
record_and_check(sess, text, min_len)
|
||||
|
||||
if original_stream:
|
||||
return Response(content=_openai_sse_from_response(raw), media_type="text/event-stream")
|
||||
return Response(content=json.dumps(raw), media_type="application/json")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Saliency + recollection pipeline
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _agent_name_from_headers(headers: dict) -> str:
|
||||
"""Extract and normalise the agent identity from X-Agent-Name header."""
|
||||
return headers.get("x-agent-name", "").strip().lower()
|
||||
"""
|
||||
Extract agent identity from request headers.
|
||||
Checks X-Agent-Name first (e.g. 'GUNNAR' → 'gunnar'),
|
||||
then falls back to X-Agent-Id (e.g. '3' → '3').
|
||||
Both are stored as the agent_name key in the agent_models table.
|
||||
"""
|
||||
name = headers.get("x-agent-name", "").strip().lower()
|
||||
if name:
|
||||
return name
|
||||
return headers.get("x-agent-id", "").strip() # numeric IDs work as-is
|
||||
|
||||
|
||||
async def process_prompt(body: dict, path: str, pool, cfg: dict, request_headers: dict | None = None) -> dict:
|
||||
@@ -830,10 +1034,29 @@ async def openai_chat_completions(request: Request) -> Response:
|
||||
model = body.get("model", "unknown")
|
||||
upstream = cfg["upstream_openai"]
|
||||
min_len = cfg["detection"]["min_length"]
|
||||
log.info("chat route=/v1/chat/completions model=%s upstream=%s", model, upstream)
|
||||
original_stream: bool = bool(body.get("stream", False))
|
||||
hdrs = dict(request.headers)
|
||||
agent_name = _agent_name_from_headers(hdrs)
|
||||
|
||||
log.info("chat route=/v1/chat/completions model=%s upstream=%s agent=%s stream=%s",
|
||||
model, upstream, agent_name or "—", original_stream)
|
||||
try:
|
||||
# Agent routing: if agent has a registered model, dispatch cross-protocol
|
||||
if agent_name:
|
||||
agent_model = await _get_agent_routing_model(pool, agent_name)
|
||||
if agent_model:
|
||||
log.info(
|
||||
"agent_route agent=%s provider=%s model=%s base_url=%s",
|
||||
agent_name, agent_model.provider, agent_model.model_name,
|
||||
agent_model.base_url or "(default)",
|
||||
)
|
||||
return await _route_agent_chat(
|
||||
body, agent_model, original_stream, pool, cfg, hdrs, min_len
|
||||
)
|
||||
|
||||
# Standard path — forward to configured upstream unchanged
|
||||
headers = _relay_headers(request, OPENAI_RELAY_HEADERS)
|
||||
body = await process_prompt(body, "/v1/chat/completions", pool, cfg, headers)
|
||||
body = await process_prompt(body, "/v1/chat/completions", pool, cfg, hdrs)
|
||||
messages = body.get("messages", [])
|
||||
sess = session_key(model, messages)
|
||||
text, raw = await call_openai(body, upstream, headers)
|
||||
@@ -845,9 +1068,13 @@ async def openai_chat_completions(request: Request) -> Response:
|
||||
if raw.get("choices"):
|
||||
raw["choices"][0]["message"]["content"] = override
|
||||
raw["loop_detected"] = True
|
||||
if original_stream:
|
||||
return Response(content=_openai_sse_from_response(raw), media_type="text/event-stream")
|
||||
return Response(content=json.dumps(raw), media_type="application/json")
|
||||
text, raw = await call_openai(body, upstream, headers)
|
||||
record_and_check(sess, text, min_len)
|
||||
if original_stream:
|
||||
return Response(content=_openai_sse_from_response(raw), media_type="text/event-stream")
|
||||
return Response(content=json.dumps(raw), media_type="application/json")
|
||||
except UpstreamError as exc:
|
||||
log.error("chat_upstream_error route=/v1/chat/completions model=%s %s", model, exc)
|
||||
@@ -2023,19 +2250,21 @@ ADMIN_HTML = """<!DOCTYPE html>
|
||||
|
||||
<h2>Agent models</h2>
|
||||
<p style="font-size:0.83em;color:#666;margin-bottom:0.8em">
|
||||
Assign a model per agent. Agent0 must send <code>X-Agent-Name: <name></code> on every request.
|
||||
Takes priority over the global write model.
|
||||
Routes the main inference request to the agent's model — full cross-protocol
|
||||
(Claude ↔ LM Studio ↔ OpenAI). Agent must send <code>X-Agent-Name: GUNNAR</code>
|
||||
or <code>X-Agent-Id: 3</code> on every request. Also determines which model
|
||||
Festinger uses for memory writing (context discovery).
|
||||
</p>
|
||||
<table id="agent-models-table" style="margin-bottom:0.8em">
|
||||
<thead><tr><th>Agent</th><th>Model ID</th><th>Provider</th><th>Model name</th><th></th></tr></thead>
|
||||
<thead><tr><th>Agent name / ID</th><th>Model ID</th><th>Provider</th><th>Model name</th><th></th></tr></thead>
|
||||
<tbody id="agent-models-tbody"><tr><td colspan="5">Loading…</td></tr></tbody>
|
||||
</table>
|
||||
<details style="margin-bottom:1em">
|
||||
<summary style="cursor:pointer;font-size:0.9em;color:#555">Assign model to agent…</summary>
|
||||
<div style="margin-top:0.6em;display:flex;gap:0.7em;flex-wrap:wrap;align-items:flex-end">
|
||||
<label style="font-size:0.85em">Agent name (e.g. gunnar)
|
||||
<input id="am-agent" type="text" placeholder="gunnar"
|
||||
style="font-family:monospace;padding:5px 8px;border:1px solid #ccc;border-radius:3px;display:block;margin-top:2px;width:140px">
|
||||
<label style="font-size:0.85em">Agent name or ID (e.g. gunnar or 3)
|
||||
<input id="am-agent" type="text" placeholder="gunnar or 3"
|
||||
style="font-family:monospace;padding:5px 8px;border:1px solid #ccc;border-radius:3px;display:block;margin-top:2px;width:160px">
|
||||
</label>
|
||||
<label style="font-size:0.85em">Model
|
||||
<select id="am-model" style="font-family:monospace;padding:5px 8px;border:1px solid #ccc;border-radius:3px;display:block;margin-top:2px">
|
||||
|
||||
Reference in New Issue
Block a user