diff --git a/plugins/festinger/festinger/main.py b/plugins/festinger/festinger/main.py index 19d7f34..2d32dbc 100644 --- a/plugins/festinger/festinger/main.py +++ b/plugins/festinger/festinger/main.py @@ -574,13 +574,217 @@ def _extract_request_model_config( return None +# --------------------------------------------------------------------------- +# Agent routing — cross-protocol dispatch +# --------------------------------------------------------------------------- + +def _openai_to_anthropic_body(body: dict, model_name: str) -> dict: + """ + Translate an OpenAI chat completions request to Anthropic Messages API format. + - system messages are lifted to the top-level 'system' field + - max_tokens defaults to 4096 if not specified + - temperature/top_p forwarded if present + """ + system_parts: list[str] = [] + claude_messages: list[dict] = [] + + for m in body.get("messages", []): + role = m.get("role", "") + content = m.get("content", "") + if role == "system": + if isinstance(content, str): + system_parts.append(content) + elif isinstance(content, list): + system_parts.extend( + b.get("text", "") for b in content + if isinstance(b, dict) and b.get("type") == "text" + ) + else: + claude_messages.append(m) + + anthropic_body: dict = { + "model": model_name, + "messages": claude_messages, + "max_tokens": body.get("max_tokens") or 4096, + } + if system_parts: + anthropic_body["system"] = "\n\n".join(system_parts) + for key in ("temperature", "top_p", "stop_sequences"): + if key in body: + anthropic_body[key] = body[key] + return anthropic_body + + +def _anthropic_to_openai_response(data: dict) -> dict: + """Convert an Anthropic Messages API response to OpenAI chat completions format.""" + text = "".join( + b.get("text", "") for b in data.get("content", []) + if b.get("type") == "text" + ) + usage = data.get("usage", {}) + stop_map = {"end_turn": "stop", "max_tokens": "length", "stop_sequence": "stop"} + finish = stop_map.get(data.get("stop_reason", "end_turn"), "stop") + return { + "id": data.get("id", "chatcmpl-festinger"), + "object": "chat.completion", + "created": int(time.time()), + "model": data.get("model", ""), + "choices": [{ + "index": 0, + "message": {"role": "assistant", "content": text}, + "finish_reason": finish, + }], + "usage": { + "prompt_tokens": usage.get("input_tokens", 0), + "completion_tokens": usage.get("output_tokens", 0), + "total_tokens": usage.get("input_tokens", 0) + usage.get("output_tokens", 0), + }, + } + + +def _openai_sse_from_response(raw: dict) -> bytes: + """ + Synthesise a minimal OpenAI-compatible SSE stream from a complete (non-streaming) + OpenAI-format response dict. Used when the client sent stream=true but the + upstream was called non-streaming. + """ + text = raw.get("choices", [{}])[0].get("message", {}).get("content", "") + model = raw.get("model", "") + cid = raw.get("id", "chatcmpl-festinger") + ts = int(time.time()) + + def chunk(delta: dict, finish_reason=None) -> str: + return "data: " + json.dumps({ + "id": cid, "object": "chat.completion.chunk", + "created": ts, "model": model, + "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}], + }) + "\n\n" + + parts = [ + chunk({"role": "assistant", "content": ""}), + chunk({"content": text}), + chunk({}, finish_reason="stop"), + "data: [DONE]\n\n", + ] + return "".join(parts).encode() + + +async def _get_agent_routing_model(pool, agent_name: str) -> ModelConfig | None: + """ + Look up the agent's configured model from the agent_models table. + agent_name is the normalised key (lowercase name or numeric ID string). + """ + if not agent_name: + return None + async with pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT m.provider, m.model_name, m.api_key, m.base_url + FROM agent_models am + JOIN models m ON m.id = am.model_id + WHERE am.agent_name = $1 + """, + agent_name, + ) + if not row: + return None + return ModelConfig( + provider=row["provider"], + model_name=row["model_name"], + api_key=row["api_key"], + base_url=row["base_url"] or "", + ) + + +async def _route_agent_chat( + body: dict, + agent_model: ModelConfig, + original_stream: bool, + pool, + cfg: dict, + request_headers: dict, + min_len: int, +) -> Response: + """ + Route an OpenAI-compatible chat completions request to the agent's + configured provider, handling cross-protocol translation. + + claude → Anthropic Messages API (translated in both directions) + openai → OpenAI-compatible endpoint (base_url + model swap) + lm-studio→ same as openai + """ + # Run recollection injection (same pipeline as standard path) + body = await process_prompt(body, "/v1/chat/completions", pool, cfg, request_headers) + + sess = session_key(agent_model.model_name, body.get("messages", [])) + + # Capture upstream config for use in the loop-detection re-run closure + if agent_model.provider == "claude": + anthropic_upstream = agent_model.base_url or "https://api.anthropic.com" + anthropic_headers = { + "x-api-key": agent_model.api_key, + "anthropic-version": "2023-06-01", + "content-type": "application/json", + } + else: + oai_upstream = agent_model.base_url or cfg.get("upstream_openai", "https://api.openai.com") + oai_headers = { + "authorization": f"Bearer {agent_model.api_key or 'lm-studio'}", + "content-type": "application/json", + } + + async def _call(current_body: dict) -> tuple[str, dict]: + """Call the agent's upstream; always return (text, openai_format_dict).""" + if agent_model.provider == "claude": + ab = _openai_to_anthropic_body(current_body, agent_model.model_name) + text, raw_a = await call_anthropic(ab, anthropic_upstream, anthropic_headers) + return text, _anthropic_to_openai_response(raw_a) + else: + b = dict(current_body) + b["model"] = agent_model.model_name + text, raw_o = await call_openai(b, oai_upstream, oai_headers) + return text, raw_o + + text, raw = await _call(body) + count = record_and_check(sess, text, min_len) + + if count >= 2: + log.warning( + "loop_detected (agent routed) agent_model=%s session=%s count=%d", + agent_model.model_name, sess[1], count, + ) + body, override = apply_mitigations(body, count, cfg) + if override is not None: + raw["choices"] = [{ + "index": 0, + "message": {"role": "assistant", "content": override}, + "finish_reason": "stop", + }] + raw["loop_detected"] = True + else: + text, raw = await _call(body) + record_and_check(sess, text, min_len) + + if original_stream: + return Response(content=_openai_sse_from_response(raw), media_type="text/event-stream") + return Response(content=json.dumps(raw), media_type="application/json") + + # --------------------------------------------------------------------------- # Saliency + recollection pipeline # --------------------------------------------------------------------------- def _agent_name_from_headers(headers: dict) -> str: - """Extract and normalise the agent identity from X-Agent-Name header.""" - return headers.get("x-agent-name", "").strip().lower() + """ + Extract agent identity from request headers. + Checks X-Agent-Name first (e.g. 'GUNNAR' → 'gunnar'), + then falls back to X-Agent-Id (e.g. '3' → '3'). + Both are stored as the agent_name key in the agent_models table. + """ + name = headers.get("x-agent-name", "").strip().lower() + if name: + return name + return headers.get("x-agent-id", "").strip() # numeric IDs work as-is async def process_prompt(body: dict, path: str, pool, cfg: dict, request_headers: dict | None = None) -> dict: @@ -830,10 +1034,29 @@ async def openai_chat_completions(request: Request) -> Response: model = body.get("model", "unknown") upstream = cfg["upstream_openai"] min_len = cfg["detection"]["min_length"] - log.info("chat route=/v1/chat/completions model=%s upstream=%s", model, upstream) + original_stream: bool = bool(body.get("stream", False)) + hdrs = dict(request.headers) + agent_name = _agent_name_from_headers(hdrs) + + log.info("chat route=/v1/chat/completions model=%s upstream=%s agent=%s stream=%s", + model, upstream, agent_name or "—", original_stream) try: + # Agent routing: if agent has a registered model, dispatch cross-protocol + if agent_name: + agent_model = await _get_agent_routing_model(pool, agent_name) + if agent_model: + log.info( + "agent_route agent=%s provider=%s model=%s base_url=%s", + agent_name, agent_model.provider, agent_model.model_name, + agent_model.base_url or "(default)", + ) + return await _route_agent_chat( + body, agent_model, original_stream, pool, cfg, hdrs, min_len + ) + + # Standard path — forward to configured upstream unchanged headers = _relay_headers(request, OPENAI_RELAY_HEADERS) - body = await process_prompt(body, "/v1/chat/completions", pool, cfg, headers) + body = await process_prompt(body, "/v1/chat/completions", pool, cfg, hdrs) messages = body.get("messages", []) sess = session_key(model, messages) text, raw = await call_openai(body, upstream, headers) @@ -845,9 +1068,13 @@ async def openai_chat_completions(request: Request) -> Response: if raw.get("choices"): raw["choices"][0]["message"]["content"] = override raw["loop_detected"] = True + if original_stream: + return Response(content=_openai_sse_from_response(raw), media_type="text/event-stream") return Response(content=json.dumps(raw), media_type="application/json") text, raw = await call_openai(body, upstream, headers) record_and_check(sess, text, min_len) + if original_stream: + return Response(content=_openai_sse_from_response(raw), media_type="text/event-stream") return Response(content=json.dumps(raw), media_type="application/json") except UpstreamError as exc: log.error("chat_upstream_error route=/v1/chat/completions model=%s %s", model, exc) @@ -2023,19 +2250,21 @@ ADMIN_HTML = """
- Assign a model per agent. Agent0 must send X-Agent-Name: <name> on every request.
- Takes priority over the global write model.
+ Routes the main inference request to the agent's model — full cross-protocol
+ (Claude ↔ LM Studio ↔ OpenAI). Agent must send X-Agent-Name: GUNNAR
+ or X-Agent-Id: 3 on every request. Also determines which model
+ Festinger uses for memory writing (context discovery).
| Agent | Model ID | Provider | Model name | |
|---|---|---|---|---|
| Agent name / ID | Model ID | Provider | Model name | |
| Loading… | ||||