Files
gitprov 07329d8672 Switch Agent0 from MCP to REST API (/api/api_message)
MCP endpoint returns 403 (wrong token) and the mcp library's TaskGroup
error is opaque. The REST API (X-API-KEY header, /api/api_message) is
already validated in connectivity tests and returns proper responses.

mcp_key field is reused as the X-API-KEY value.
context_id replaces chat_id for conversation continuity.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-02 19:44:11 +02:00

693 lines
29 KiB
Python

#!/usr/bin/env python3
"""Agent Inference Service
A generic LLM inference service that can serve any configured agent.
Receives the full agent profile (identity, knowledge, guardrails) and model
configuration in each request.
Three connection modes are supported, selected by agent_type (primary) or
the endpoint's type field:
agent_type = "standard" / type = anthropic|openai|ollama|lm_studio
Assembles a full system prompt from the agent profile and calls the model
via litellm. The LLM must respond with JSON {"message", "pose"}.
agent_type = "agent0" / type = "agent0"
Connects to a live Agent Zero instance via its REST API
(POST {endpoint}/api/api_message, authenticated with an X-API-KEY header whose
value is taken from the mcp_key field). Agent Zero has full tool use, memory,
and persistent context; continuity across turns is kept via its context_id.
Pose is chosen heuristically from the plain-text reply.
agent_type = "hermes"
Connects to a Hermes Agent dashboard via JSON-RPC 2.0 WebSocket.
Fetches the ephemeral session token from the dashboard HTML automatically.
WebSocket: {endpoint}/api/ws?token={token}
Session: session.create → prompt.submit → stream message.delta events.
Nothing is summarised or truncated by this service. Context-window management
for LLM calls is the responsibility of the caller (gnommoweb).
"""
import asyncio
import os
import json
import re
import time
import logging
from fastapi import FastAPI, HTTPException, Header
from pydantic import BaseModel, Field
from typing import Optional
import httpx
import litellm
# --- Config ---
# Shared secret; callers must send it as "Authorization: Bearer <key>".
# NOTE(review): the built-in default looks like a development key — presumably
# overridden via AGENT_INFERENCE_KEY in real deployments; confirm.
API_KEY = os.getenv("AGENT_INFERENCE_KEY", "agent-inference-dev-key")
# Message count at which the system prompt tells the agent to end the
# conversation; a softer wrap-up hint is injected a few messages earlier.
AGENT_MAX_MESSAGES = int(os.getenv("AGENT_INFERENCE_MAX_MESSAGES", "10"))
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("agent-inference")
# ── In-memory Agent0 session map ──────────────────────────────────────────────
# Maps gnommoweb conversation_id → Agent Zero context_id.
# Allows conversation continuity across turns for Agent0 REST connections.
# Cleared on service restart — acceptable for current single-user usage.
_a0_sessions: dict[int, str] = {}
# ── In-memory Hermes session map ───────────────────────────────────────────────
# Maps gnommoweb conversation_id → Hermes session_id.
# Also caches the dashboard token per endpoint (re-fetched on 401).
_hermes_sessions: dict[int, str] = {}
_hermes_token_cache: dict[str, str] = {} # endpoint → session token
# ── Request / Response models ─────────────────────────────────────────────────
class ModelConfig(BaseModel):
    """Connection details for one model or agent backend."""
    name: Optional[str] = None
    model_id: str = ""  # litellm model string; required for the standard LLM route
    endpoint: Optional[str] = None  # base URL; api_base for litellm, or the Agent0 / Hermes host
    api_key: Optional[str] = None  # LLM API key passed to litellm; not used for Agent0
    type: Optional[str] = None  # anthropic | openai | ollama | lm_studio | hermes | agent0
    mcp_key: Optional[str] = None  # Agent0 key; sent as the X-API-KEY header of the REST call
class ModelsPayload(BaseModel):
    """Per-tier model configurations plus the tier requested for this call."""
    heaviness: int = 1  # 1=light, 2=medium, 3=heavy
    # Any tier may be missing; select_model falls back to the next lighter one.
    light: Optional[ModelConfig] = None
    medium: Optional[ModelConfig] = None
    heavy: Optional[ModelConfig] = None
class AgentProfile(BaseModel):
    """Full agent profile received from gnommoweb. All fields sent verbatim."""
    name: str  # used in the identity anchor and as the log prefix
    role: str = ""  # appended to the identity anchor when set
    from_name: str = ""  # "known as" alias; omitted from the prompt when it merely repeats name / role
    poses: list[str] = Field(default_factory=list)  # valid pose identifiers; callers use ["neutral"] when empty
    # The following free-text fields each become one section of the system
    # prompt when non-empty (standard LLM route only).
    identity_document: str = ""
    job_description: str = ""
    guardrails: str = ""
    best_practices: str = ""
    knowledge_base: str = ""
    mock_responses: list[str] = Field(default_factory=list)  # NOTE(review): not read by any code visible in this file
    agent_type: str = "standard"  # standard | agent0 | hermes
class ChatRequest(BaseModel):
    """Body of POST /v1/agent/chat."""
    user_id: int
    agent_id: int = 1
    message: str  # the current user message
    conversation_id: Optional[int] = None  # keys the in-memory Agent0 / Hermes session maps
    # Earlier turns; the standard LLM route reads "role" and "content" from each
    # item via .get(), so items are presumably dicts — verify against the caller.
    history: list = Field(default_factory=list)
    agent: Optional[AgentProfile] = None  # a built-in fallback profile is used when omitted
    models: Optional[ModelsPayload] = None
class ChatResponse(BaseModel):
    """Reply returned by POST /v1/agent/chat."""
    letter_id: int = 0  # every response built in this file sets 0
    timestamp: int  # int(time.time()) at response construction
    message: str
    pose: str
    conversation_id: Optional[int] = None  # echoed from the request
    actions: Optional[list] = None  # optional [{"emoji": ..., "label": ...}] entries (standard LLM route only)
# ── System prompt (LLM path only) ─────────────────────────────────────────────
def build_system_prompt(agent: AgentProfile, message_count: int = 0) -> str:
    """
    Build the full system prompt for a single LLM call.

    The prompt is a sequence of sections separated by blank lines:
      1. Identity anchor   — always present; name, optional role, optional alias
      2. Identity document — included verbatim when set
      3. Job description   — included verbatim when set
      4. Guardrails        — included verbatim when set
      5. Best practices    — included verbatim when set
      6. Knowledge base    — included verbatim when set
      7. Poses             — the valid pose identifiers (["neutral"] if none given)
      8. Response format   — the JSON structure the model must emit
      9. Wrap-up hint      — only once message_count nears AGENT_MAX_MESSAGES

    message_count is the number of messages in the conversation including the
    current one; it only influences section 9.
    """
    # ── 1. Identity anchor ───────────────────────────────────────────────────
    headline = f"You are {agent.name}"
    if agent.role:
        headline += f", {agent.role}"
    anchor = [headline]
    # Skip the alias when it would only repeat the headline information.
    redundant_aliases = (agent.name, f"{agent.name}, {agent.role}")
    if agent.from_name and agent.from_name not in redundant_aliases:
        anchor.append(f"You are known as: {agent.from_name}")
    anchor.append(
        "You MUST maintain this character and persona throughout the entire conversation. "
        "Do NOT break character. Do NOT acknowledge being an AI or a language model."
    )
    sections = ["\n".join(anchor)]

    # ── 2–6. Verbatim profile sections (each skipped when empty) ─────────────
    verbatim_sections = (
        ("IDENTITY", agent.identity_document),
        ("YOUR ROLE", agent.job_description),
        ("GUARDRAILS — FOLLOW EXACTLY", agent.guardrails),
        ("BEST PRACTICES", agent.best_practices),
        ("KNOWLEDGE BASE", agent.knowledge_base),
    )
    sections.extend(
        f"## {heading}\n\n{body}" for heading, body in verbatim_sections if body
    )

    # ── 7. Poses ─────────────────────────────────────────────────────────────
    pose_names = agent.poses or ["neutral"]
    pose_block = [
        "## POSES",
        "",
        "Every response MUST include a pose field set to one of these exact identifiers:",
        "",
        *(f" {pose_name}" for pose_name in pose_names),
        "",
        "Choose the pose that best reflects your character's genuine emotional state in this moment.",
        "Pose names are descriptive — use them accordingly.",
        "Any identifier not in the list above will be rejected.",
    ]
    sections.append("\n".join(pose_block))

    # ── 8. Response format ───────────────────────────────────────────────────
    format_block = [
        "## RESPONSE FORMAT",
        "",
        "Respond ONLY with a valid JSON object. No prose before or after it.",
        "",
        "Required fields:",
        ' "message" — your in-character response text',
        ' "pose" — one identifier from the POSES list above',
        "",
        "Optional field:",
        ' "actions" — array of action buttons to show the user, e.g.:',
        ' [{"emoji": "🫳🏽", "label": "Do"}, {"emoji": "👁️", "label": "Look"}]',
        " Omit entirely to keep the current action buttons unchanged.",
        " Maximum 4 actions. The Leave action is always implicit.",
        "",
        "Example:",
        "{",
        ' "message": "Your response here.",',
        ' "pose": "neutral"',
        "}",
    ]
    sections.append("\n".join(format_block))

    # ── 9. Wrap-up hint ──────────────────────────────────────────────────────
    # NOTE(review): the hard-stop text asks for pose 'closed' even if that pose
    # is not in the agent's list — confirm every profile defines it.
    if message_count >= AGENT_MAX_MESSAGES:
        sections.append(
            "## END THIS CONVERSATION NOW\n\n"
            "This conversation has reached its limit. You MUST wrap up in this response. "
            "Thank the user, guide them toward whatever next step suits your role, "
            "and set your pose to 'closed'. This is your final message."
        )
    elif message_count >= max(AGENT_MAX_MESSAGES - 3, 4):
        sections.append(
            "NOTE: This conversation is approaching its limit. "
            "Begin guiding the user toward a natural conclusion."
        )

    return "\n\n".join(sections)
# ── Model selection ───────────────────────────────────────────────────────────
def select_model(payload: Optional[ModelsPayload]) -> Optional[ModelConfig]:
    """
    Return the model configuration matching the requested heaviness.

    Tiers are tried from the requested level downwards (heavy → medium →
    light); the first configured one wins. Returns None when there is no
    payload or no eligible tier is configured, in which case a warning is
    logged.
    """
    if not payload:
        return None

    level = payload.heaviness
    # (eligible at this heaviness?, config) — ordered heaviest first.
    tiers = (
        (level >= 3, payload.heavy),
        (level >= 2, payload.medium),
        (True, payload.light),
    )
    chosen = next((cfg for eligible, cfg in tiers if eligible and cfg), None)
    if not chosen:
        log.warning(f"No model configured for heaviness={level}, falling back to env model")
    return chosen
# ── Pose heuristic (Agent0 / Hermes plain-text replies) ───────────────────────
def pick_pose(text: str, valid_poses: list[str]) -> str:
    """
    Guess a pose for a plain-text reply using keyword heuristics.

    Used for Agent0 and Hermes replies, which are plain text rather than the
    JSON {"message", "pose"} structure. Rules are tried in order; a rule
    applies when any of its trigger substrings occurs in the lower-cased text,
    and yields its first preferred pose that is also in valid_poses. If no
    rule yields a pose, a neutral-ish default is used, then the first valid
    pose, then the literal "neutral".
    """
    lowered = text.lower()
    rules = (
        (
            ("error", "fail", "sorry", "cannot", "unable", "permission denied"),
            ("sorry", "annoyed", "bored"),
        ),
        (
            ("done", "complete", "finished", "success", "pulled", "updated", "deployed", "pushed"),
            ("engaged", "interested", "naughty"),
        ),
        (
            ("?", "what", "which", "how", "why", "could you"),
            ("inquisitive",),
        ),
        (
            ("warning", "careful", "note", "attention"),
            ("annoyed", "bored"),
        ),
    )
    for triggers, preferred in rules:
        if not any(trigger in lowered for trigger in triggers):
            continue
        # A matching rule with no usable pose falls through to the next rule.
        hit = next((pose for pose in preferred if pose in valid_poses), None)
        if hit is not None:
            return hit

    default = next(
        (pose for pose in ("neutral", "neutral2", "engaged") if pose in valid_poses),
        None,
    )
    if default is not None:
        return default
    return valid_poses[0] if valid_poses else "neutral"
# ── Agent0 REST connection ────────────────────────────────────────────────────
async def handle_agent0_mcp(
    req: ChatRequest,
    agent: AgentProfile,
    model: ModelConfig,
    valid_poses: list[str],
) -> ChatResponse:
    """
    Send a message to a live Agent Zero instance via its REST API
    (POST {endpoint}/api/api_message, X-API-KEY header).

    The function name is historical: the transport used to be MCP.

    Conversation continuity is maintained via Agent Zero's context_id, which
    is stored per gnommoweb conversation_id in _a0_sessions (in-memory).

    Args:
        req: the incoming chat request; message and conversation_id are used.
        agent: agent profile; only its name is used (log prefix).
        model: backend configuration; endpoint is the Agent Zero base URL and
            mcp_key holds the X-API-KEY value. May be None when the route was
            chosen by agent_type alone.
        valid_poses: pose identifiers the reply may use.

    Returns:
        A ChatResponse. Never raises for transport or protocol errors: a
        missing endpoint yields a "configuration error" message, any other
        failure yields the generic "connection interference" message.
    """
    # The route handler selects this path on agent_type == "agent0" even when
    # no model was resolved, so model itself can be None here.
    endpoint = ((model.endpoint if model else None) or "").strip().rstrip("/")
    if not endpoint:
        log.error(f"[{agent.name}] Agent0 agent has no model with endpoint configured")
        return ChatResponse(
            letter_id=0,
            timestamp=int(time.time()),
            message="*configuration error* No Agent0 endpoint configured for this agent.",
            pose=valid_poses[0] if valid_poses else "neutral",
            conversation_id=req.conversation_id,
        )

    api_key = model.mcp_key or ""  # stored in mcp_key field; used as X-API-KEY
    api_url = f"{endpoint}/api/api_message"

    # Retrieve existing Agent0 context_id for this conversation (if any).
    # "is not None" matches the store below, so conversation_id 0 also works.
    a0_context_id = (
        _a0_sessions.get(req.conversation_id)
        if req.conversation_id is not None
        else None
    )
    log.info(
        f"[{agent.name}] → Agent0 REST url={api_url} "
        f"conv={req.conversation_id} a0_ctx={a0_context_id or 'new'} "
        f"msg='{req.message[:60]}'"
    )

    try:
        payload: dict = {
            "message": req.message,
            "lifetime_hours": 24,
        }
        if a0_context_id:
            payload["context_id"] = a0_context_id

        async with httpx.AsyncClient(timeout=120.0) as client:
            r = await client.post(
                api_url,
                headers={"X-API-KEY": api_key, "Content-Type": "application/json"},
                json=payload,
            )
        r.raise_for_status()
        data = r.json()
        log.info(f"[{agent.name}] ← Agent0 REST: {str(data)[:300]}")

        response_text = data.get("response", "")
        new_context_id = data.get("context_id", "")
        # Persist context_id for conversation continuity
        if new_context_id and req.conversation_id is not None:
            _a0_sessions[req.conversation_id] = new_context_id

        return ChatResponse(
            letter_id=0,
            timestamp=int(time.time()),
            message=response_text,
            pose=pick_pose(response_text, valid_poses),
            conversation_id=req.conversation_id,
        )
    except Exception as e:
        # Deliberate best-effort boundary: any failure becomes an in-character
        # fallback reply instead of an HTTP error.
        log.error(f"[{agent.name}] Agent0 REST error: {e}")
        preferred = [p for p in ("sorry", "annoyed", "neutral") if p in valid_poses]
        # Computed lazily so an empty valid_poses cannot raise inside the handler.
        fallback_pose = (preferred or valid_poses or ["neutral"])[0]
        return ChatResponse(
            letter_id=0,
            timestamp=int(time.time()),
            message="*connection interference* The response channel is temporarily disrupted. Try again in a moment.",
            pose=fallback_pose,
            conversation_id=req.conversation_id,
        )
# ── Hermes Agent connection ───────────────────────────────────────────────────
async def _fetch_hermes_token(endpoint: str) -> str:
    """
    Return the Hermes dashboard session token for *endpoint*.

    The dashboard HTML embeds the token as
    window.__HERMES_SESSION_TOKEN__="<token>"; it is generated fresh on every
    Hermes server start (secrets.token_urlsafe(32)). The first successful
    fetch is cached in _hermes_token_cache per endpoint; callers are expected
    to drop the cache entry when the token is rejected.

    Raises:
        RuntimeError: the dashboard page does not contain a token.
        httpx.HTTPError: the dashboard request itself failed.
    """
    if known := _hermes_token_cache.get(endpoint):
        return known

    dashboard_url = f"{endpoint}/"
    log.info(
        f"[hermes] fetching token — GET {dashboard_url!r} "
        f"(len={len(dashboard_url)}, bytes={dashboard_url.encode()!r})"
    )
    async with httpx.AsyncClient(timeout=10.0) as http:
        page = await http.get(dashboard_url)
    page.raise_for_status()

    found = re.search(r'__HERMES_SESSION_TOKEN__\s*=\s*"([^"]+)"', page.text)
    if found is None:
        raise RuntimeError(
            f"Could not extract Hermes session token from {endpoint}/ "
            "(is the dashboard running?)"
        )

    _hermes_token_cache[endpoint] = found.group(1)
    log.info(f"[hermes] fetched new session token from {endpoint}")
    return found.group(1)
async def handle_hermes(
    req: "ChatRequest",
    agent: AgentProfile,
    model: ModelConfig,
    valid_poses: list[str],
) -> "ChatResponse":
    """
    Send a message to a Hermes Agent dashboard via JSON-RPC 2.0 WebSocket.

    Protocol:
      1. Fetch ephemeral session token from dashboard HTML (cached; re-fetched
         once when the token is rejected).
      2. Open WebSocket at {endpoint}/api/ws?token={token}
      3. JSON-RPC session.create {} → { session_id }
      4. JSON-RPC prompt.submit { session_id, text } → (ack)
      5. Accumulate message.delta payloads until message.complete
      6. Return assembled text with pose heuristic.

    Conversation continuity: gnommoweb conversation_id → Hermes session_id
    stored in _hermes_sessions (in-memory, cleared on restart).

    Args:
        req: the incoming chat request; message and conversation_id are used.
        agent: agent profile; only its name is used (log prefix).
        model: backend configuration; endpoint is the dashboard base URL.
        valid_poses: pose identifiers the reply may use.

    Returns:
        A ChatResponse. Token, connection, protocol and timeout failures are
        logged and turned into the generic "connection interference" reply.

    Raises:
        ImportError: the websockets package is not installed.
    """
    try:
        from websockets.asyncio.client import connect as ws_connect
    except ImportError:
        from websockets.client import connect as ws_connect  # type: ignore

    # Upper bound for any single wait on the socket, so neither the RPC
    # handshake nor the event stream can hang the request forever.
    recv_timeout = 120.0

    endpoint = (model.endpoint or "").strip().rstrip("/")
    # "is not None" matches the store below, so conversation_id 0 also works.
    hermes_session_id = (
        _hermes_sessions.get(req.conversation_id)
        if req.conversation_id is not None
        else None
    )
    ws_scheme = "wss" if endpoint.startswith("https") else "ws"
    ws_host = endpoint.split("://", 1)[-1]
    log.info(
        f"[{agent.name}] → Hermes url={endpoint} "
        f"conv={req.conversation_id} hermes_sess={hermes_session_id or 'new'} "
        f"msg='{req.message[:60]}'"
    )

    def _rpc_failure(frame: dict, default: str) -> RuntimeError:
        """Build the exception for a JSON-RPC frame that carries an error."""
        err = frame.get("error")
        if isinstance(err, dict):
            return RuntimeError(err.get("message", default))
        return RuntimeError(str(err) or default)

    async def _do_chat(tok: str) -> ChatResponse:
        nonlocal hermes_session_id
        async with ws_connect(f"{ws_scheme}://{ws_host}/api/ws?token={tok}") as ws:
            req_id = 0

            async def rpc(method: str, params: dict) -> dict:
                nonlocal req_id
                req_id += 1
                rid = f"h{req_id}"
                await ws.send(json.dumps({"jsonrpc": "2.0", "id": rid, "method": method, "params": params}))
                # Wait for the matching response; event frames that arrive
                # first are skipped (no message.delta can be lost here because
                # prompt.submit is only sent after session.create returns).
                while True:
                    raw = await asyncio.wait_for(ws.recv(), timeout=recv_timeout)
                    msg = json.loads(raw)
                    if msg.get("id") == rid:
                        if msg.get("error") is not None:
                            raise _rpc_failure(msg, "RPC error")
                        return msg.get("result") or {}

            # ── 1. Create or reuse session ────────────────────────────────
            if not hermes_session_id:
                created = await rpc("session.create", {})
                hermes_session_id = created.get("session_id") or created.get("id", "")
                if req.conversation_id is not None and hermes_session_id:
                    _hermes_sessions[req.conversation_id] = hermes_session_id
                log.info(f"[{agent.name}] created Hermes session {hermes_session_id}")

            # ── 2. Submit prompt ──────────────────────────────────────────
            # Send prompt.submit but do NOT wait for its ack via rpc() —
            # the ack arrives interleaved with events (after message.start).
            # We drain all frames below until message.complete arrives.
            req_id += 1
            submit_id = f"h{req_id}"
            await ws.send(json.dumps({
                "jsonrpc": "2.0", "id": submit_id,
                "method": "prompt.submit",
                "params": {"session_id": hermes_session_id, "text": req.message},
            }))

            # ── 3. Stream events until message.complete ───────────────────
            full_text = ""
            while True:
                raw = await asyncio.wait_for(ws.recv(), timeout=recv_timeout)
                msg = json.loads(raw)
                if msg.get("id") == submit_id:
                    # A rejected prompt never produces message.complete, so
                    # surface the error now instead of waiting for the timeout.
                    if msg.get("error") is not None:
                        raise _rpc_failure(msg, "Hermes rejected prompt.submit")
                    continue  # plain ack — keep reading
                if msg.get("method") != "event":
                    continue
                params = msg.get("params") or {}
                etype = params.get("type", "")
                payload = params.get("payload") or {}
                if etype == "message.delta":
                    full_text += payload.get("text", "")
                elif etype == "message.complete":
                    # payload.text holds the full assembled response
                    complete_text = payload.get("text", "")
                    if complete_text:
                        full_text = complete_text
                    if payload.get("status") == "error":
                        raise RuntimeError(full_text or "Hermes returned an error response")
                    break
                elif etype == "error":
                    raise RuntimeError(payload.get("message", "Hermes error"))

            return ChatResponse(
                letter_id=0,
                timestamp=int(time.time()),
                message=full_text or "*silence*",
                pose=pick_pose(full_text, valid_poses),
                conversation_id=req.conversation_id,
            )

    async def _attempt() -> ChatResponse:
        # Token fetch is part of the guarded attempt so that an unreachable
        # dashboard produces the fallback reply rather than an HTTP 500.
        return await _do_chat(await _fetch_hermes_token(endpoint))

    try:
        return await _attempt()
    except Exception as e:
        failure: Exception = e

    # 401 / 403 / "unauthorized" — token likely regenerated by a Hermes
    # restart; clear the cached token and session, then retry exactly once.
    lowered = str(failure).lower()
    if "401" in lowered or "403" in lowered or "unauthorized" in lowered:
        log.warning(f"[{agent.name}] Hermes token rejected, re-fetching…")
        _hermes_token_cache.pop(endpoint, None)
        if req.conversation_id is not None:
            _hermes_sessions.pop(req.conversation_id, None)
        hermes_session_id = None  # force session.create on retry
        try:
            return await _attempt()
        except Exception as retry_err:
            log.error(f"[{agent.name}] Hermes retry failed: {retry_err}")
            failure = retry_err

    log.error(f"[{agent.name}] Hermes error: {failure}")
    preferred = [p for p in ("sorry", "annoyed", "neutral") if p in valid_poses]
    # Computed lazily so an empty valid_poses cannot raise here.
    fallback_pose = (preferred or valid_poses or ["neutral"])[0]
    return ChatResponse(
        letter_id=0,
        timestamp=int(time.time()),
        message="*connection interference* The response channel is temporarily disrupted. Try again in a moment.",
        pose=fallback_pose,
        conversation_id=req.conversation_id,
    )
# ── Fallback agent ────────────────────────────────────────────────────────────
def make_fallback_agent() -> AgentProfile:
    """Build the generic profile used when a request carries no agent profile."""
    defaults = {
        "name": "Agent",
        "role": "University Agent",
        "poses": ["neutral", "bored", "annoyed", "engaged", "closed"],
        "identity_document": "You are a helpful university agent.",
    }
    return AgentProfile(**defaults)
# ── App ───────────────────────────────────────────────────────────────────────
# FastAPI application object; the route handlers in this file register on it.
app = FastAPI(title="Agent Inference Service", version="1.2.0")
@app.get("/health")
async def health():
    """Liveness probe: returns a fixed status payload and checks no backend."""
    status_report = dict(status="ok", service="agent-inference")
    return status_report
def _describe_model(model: Optional[ModelConfig]) -> str:
    """Summarise a model config for logging without exposing api_key / mcp_key."""
    if model is None:
        return "None"
    return (
        f"ModelConfig(name={model.name!r}, model_id={model.model_id!r}, "
        f"endpoint={model.endpoint!r}, type={model.type!r})"
    )


def _config_error_response(req: ChatRequest, valid_poses: list[str], message: str) -> ChatResponse:
    """Build the reply returned when an agent's backend is not configured."""
    return ChatResponse(
        letter_id=0,
        timestamp=int(time.time()),
        message=message,
        pose=valid_poses[0],
        conversation_id=req.conversation_id,
    )


def _parse_llm_reply(raw: str) -> dict:
    """Decode the model's reply into a dict, degrading to plain text if it is not JSON."""
    plain = {"message": raw, "pose": "neutral"}
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        # Models often wrap the JSON in prose or code fences: retry on the
        # outermost {...} span before giving up on structured output.
        start = raw.find("{")
        end = raw.rfind("}") + 1
        if not (start >= 0 and end > start):
            return plain
        try:
            parsed = json.loads(raw[start:end])
        except json.JSONDecodeError:
            return plain
    # Valid JSON that is not an object (a bare string, list, number …) carries
    # no message/pose fields, so treat the raw text as the message.
    return parsed if isinstance(parsed, dict) else plain


@app.post("/v1/agent/chat", response_model=ChatResponse)
async def agent_chat(req: ChatRequest, authorization: str = Header(default="")):
    """
    Answer one chat turn for the agent described in the request.

    Routing:
      * agent_type == "hermes"                   → handle_hermes
      * agent_type == "agent0" / type == "agent0" → handle_agent0_mcp (REST)
      * otherwise                                → litellm completion; the model
        must answer with JSON {"message", "pose"[, "actions"]}

    Args:
        req: the chat request (message, history, agent profile, models).
        authorization: Authorization header; must equal "Bearer <API_KEY>".

    Returns:
        A ChatResponse. Missing backend configuration and LLM failures are
        reported as in-character messages, not as HTTP errors.

    Raises:
        HTTPException: 401 when the bearer token does not match.
    """
    import hmac

    # Constant-time comparison so the shared secret cannot be probed through
    # response timing. Compared as bytes because compare_digest rejects
    # non-ASCII str arguments.
    expected = f"Bearer {API_KEY}"
    if not hmac.compare_digest(authorization.encode("utf-8"), expected.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Unauthorised.")

    agent = req.agent or make_fallback_agent()
    valid_poses = agent.poses if agent.poses else ["neutral"]
    model = select_model(req.models)
    # Only a key-free summary is logged: the raw payload contains credentials.
    log.info(
        f"[route] agent={agent.name!r} agent_type={agent.agent_type!r} "
        f"heaviness={req.models.heaviness if req.models else None} "
        f"selected_model={_describe_model(model)}"
    )

    # ── Route: Hermes Agent ───────────────────────────────────────────────────
    if agent.agent_type == "hermes":
        if not model or not model.endpoint:
            log.error(f"[{agent.name}] Hermes agent has no model with endpoint configured")
            return _config_error_response(
                req,
                valid_poses,
                "*configuration error* No Hermes endpoint configured for this agent.",
            )
        try:
            return await handle_hermes(req, agent, model, valid_poses)
        except Exception:
            import traceback
            log.error(f"[hermes] unhandled exception:\n{traceback.format_exc()}")
            raise

    # ── Route: Agent0 REST ────────────────────────────────────────────────────
    if agent.agent_type == "agent0" or (model and model.type == "agent0"):
        # agent_type alone can select this route, so model may still be None.
        if not model or not model.endpoint:
            log.error(f"[{agent.name}] Agent0 agent has no model with endpoint configured")
            return _config_error_response(
                req,
                valid_poses,
                "*configuration error* No Agent0 endpoint configured for this agent.",
            )
        return await handle_agent0_mcp(req, agent, model, valid_poses)

    # ── Route: standard LLM via litellm ──────────────────────────────────────
    if not model or not model.model_id:
        log.error(f"[{agent.name}] No inference endpoint configured for this agent")
        return _config_error_response(
            req,
            valid_poses,
            "*configuration error* No inference endpoint configured for this agent.",
        )

    model_id = model.model_id
    api_base = model.endpoint
    api_key = model.api_key
    total_messages = len(req.history) + 1
    system_prompt = build_system_prompt(agent, message_count=total_messages)

    # Structure: [system] + history + [current user message]
    messages = [{"role": "system", "content": system_prompt}]
    for msg in req.history:
        role = msg.get("role", "user")
        content = msg.get("content", "")
        if role not in ("user", "assistant"):
            role = "user"
        messages.append({"role": role, "content": content})
    messages.append({"role": "user", "content": req.message})

    log.info(
        f"[{agent.name}] → {model_id} "
        f"heaviness={req.models.heaviness if req.models else 1} "
        f"history={len(req.history)} msgs "
        f"prompt={len(system_prompt)} chars "
        f"user={req.user_id} "
        f"msg='{req.message[:60]}'"
    )

    completion_kwargs = dict(
        model=model_id,
        messages=messages,
        temperature=0.8,
        max_tokens=400,
    )
    if api_base:
        completion_kwargs["api_base"] = api_base
    if api_key:
        completion_kwargs["api_key"] = api_key

    try:
        response = await litellm.acompletion(**completion_kwargs)
        raw = response.choices[0].message.content.strip()
        log.info(f"[{agent.name}] ← {raw[:200]}")

        parsed = _parse_llm_reply(raw)

        # Validate pose
        pose = parsed.get("pose", "neutral")
        if pose not in valid_poses:
            log.warning(f"[{agent.name}] Invalid pose '{pose}', defaulting to '{valid_poses[0]}'")
            pose = valid_poses[0]

        # Validate optional actions: keep only well-formed entries, and only
        # their emoji/label keys.
        raw_actions = parsed.get("actions")
        actions = None
        if isinstance(raw_actions, list) and raw_actions:
            actions = [
                {"emoji": a["emoji"], "label": a["label"]}
                for a in raw_actions
                if isinstance(a, dict) and "emoji" in a and "label" in a
            ] or None

        # A non-string "message" would fail response validation and discard
        # the whole reply, so fall back to the raw text instead.
        message = parsed.get("message", raw)
        if not isinstance(message, str):
            message = raw

        return ChatResponse(
            letter_id=0,
            timestamp=int(time.time()),
            message=message,
            pose=pose,
            conversation_id=req.conversation_id,
            actions=actions,
        )
    except Exception as e:
        # Deliberate best-effort boundary: LLM/provider failures become an
        # in-character fallback reply instead of an HTTP error.
        log.error(f"[{agent.name}] LLM error: {e}")
        fallback_pose = next(
            (p for p in ("annoyed", "bored", "sorry", "neutral") if p in valid_poses),
            valid_poses[0],
        )
        return ChatResponse(
            letter_id=0,
            timestamp=int(time.time()),
            message="*connection interference* The response channel is temporarily disrupted. Try again in a moment.",
            pose=fallback_pose,
            conversation_id=req.conversation_id,
        )
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces.
    # The port comes from AGENT_INFERENCE_PORT (default 8089).
    import uvicorn

    listen_port = int(os.environ.get("AGENT_INFERENCE_PORT", "8089"))
    log.info(f"Starting agent inference service on port {listen_port}")
    uvicorn.run(app, host="0.0.0.0", port=listen_port)