From 372d8e01d013deda4770e08e72b278ea0b47dbe8 Mon Sep 17 00:00:00 2001 From: jenstandstad Date: Mon, 20 Apr 2026 16:38:21 +0200 Subject: [PATCH] Updating main for error --- plugins/festinger/festinger/main.py | 1026 +++++++++++++++++++++++---- 1 file changed, 888 insertions(+), 138 deletions(-) diff --git a/plugins/festinger/festinger/main.py b/plugins/festinger/festinger/main.py index fb04578..727431d 100644 --- a/plugins/festinger/festinger/main.py +++ b/plugins/festinger/festinger/main.py @@ -111,6 +111,53 @@ async def lifespan(app: FastAPI): app = FastAPI(title="Festinger", lifespan=lifespan) +# --------------------------------------------------------------------------- +# Upstream error — preserves the real HTTP status code from upstream +# --------------------------------------------------------------------------- + +class UpstreamError(Exception): + """ + Raised when an upstream (Anthropic / Ollama / OpenAI) returns a non-2xx + response. Carries the original status code and body so route handlers can + forward them verbatim instead of turning everything into a 500. + """ + def __init__(self, status_code: int, content: bytes, content_type: str, provider: str): + self.status_code = status_code + self.content = content + self.content_type = content_type or "application/json" + self.provider = provider + super().__init__(f"{provider} upstream returned {status_code}") + + +# --------------------------------------------------------------------------- +# Request / response logging middleware +# --------------------------------------------------------------------------- + +@app.middleware("http") +async def log_requests(request: Request, call_next): + """Log every inbound request and its outcome. Catches unhandled exceptions.""" + t0 = time.perf_counter() + method = request.method + path = request.url.path + qs = ("?" + str(request.url.query)) if request.url.query else "" + size = request.headers.get("content-length", "?") + + log.info("→ %s %s%s bytes=%s", method, path, qs, size) + try: + response = await call_next(request) + ms = (time.perf_counter() - t0) * 1000 + log.info("← %d %s %.0fms", response.status_code, path, ms) + return response + except Exception as exc: + ms = (time.perf_counter() - t0) * 1000 + log.exception("✗ %s %s %.0fms unhandled %s: %s", method, path, ms, type(exc).__name__, exc) + return Response( + content=json.dumps({"error": str(exc), "type": type(exc).__name__}), + status_code=500, + media_type="application/json", + ) + + # --------------------------------------------------------------------------- # Ollama forwarding helpers # --------------------------------------------------------------------------- @@ -118,10 +165,28 @@ app = FastAPI(title="Festinger", lifespan=lifespan) async def call_ollama(path: str, body: dict, upstream: str) -> tuple[str, dict]: body = dict(body) body["stream"] = False - async with httpx.AsyncClient(timeout=300.0) as client: - r = await client.post(f"{upstream}{path}", json=body) - r.raise_for_status() - data = r.json() + model = body.get("model", "?") + url = f"{upstream}{path}" + log.info("upstream_call provider=ollama model=%s url=%s", model, url) + t0 = time.perf_counter() + try: + async with httpx.AsyncClient(timeout=300.0) as client: + r = await client.post(url, json=body) + except httpx.TimeoutException as exc: + log.error("upstream_timeout provider=ollama model=%s url=%s after=%.0fs %s", + model, url, time.perf_counter() - t0, exc) + raise + except httpx.RequestError as exc: + log.error("upstream_connect_error provider=ollama model=%s url=%s %s: %s", + model, url, type(exc).__name__, exc) + raise + ms = (time.perf_counter() - t0) * 1000 + if not r.is_success: + log.error("upstream_error provider=ollama model=%s url=%s status=%d %.0fms body=%.500s", + model, url, r.status_code, ms, r.text) + raise UpstreamError(r.status_code, r.content, r.headers.get("content-type", ""), "ollama") + log.info("upstream_ok provider=ollama model=%s status=%d %.0fms", model, r.status_code, ms) + data = r.json() if path == "/api/chat": text = data.get("message", {}).get("content", "") else: @@ -163,20 +228,31 @@ async def call_anthropic(body: dict, upstream: str, headers: dict) -> tuple[str, """ body = dict(body) body["stream"] = False - # Anthropic requires anthropic-version header; add default if caller omitted it if "anthropic-version" not in {k.lower() for k in headers}: headers = {**headers, "anthropic-version": "2023-06-01"} - async with httpx.AsyncClient(timeout=300.0) as client: - r = await client.post( - f"{upstream}/v1/messages", - json=body, - headers=headers, - ) - r.raise_for_status() - data = r.json() - - # Extract text from Anthropic content blocks + model = body.get("model", "?") + url = f"{upstream}/v1/messages" + log.info("upstream_call provider=anthropic model=%s url=%s", model, url) + t0 = time.perf_counter() + try: + async with httpx.AsyncClient(timeout=300.0) as client: + r = await client.post(url, json=body, headers=headers) + except httpx.TimeoutException as exc: + log.error("upstream_timeout provider=anthropic model=%s url=%s after=%.0fs %s", + model, url, time.perf_counter() - t0, exc) + raise + except httpx.RequestError as exc: + log.error("upstream_connect_error provider=anthropic model=%s url=%s %s: %s", + model, url, type(exc).__name__, exc) + raise + ms = (time.perf_counter() - t0) * 1000 + if not r.is_success: + log.error("upstream_error provider=anthropic model=%s url=%s status=%d %.0fms body=%.500s", + model, url, r.status_code, ms, r.text) + raise UpstreamError(r.status_code, r.content, r.headers.get("content-type", ""), "anthropic") + log.info("upstream_ok provider=anthropic model=%s status=%d %.0fms", model, r.status_code, ms) + data = r.json() text = "" for block in data.get("content", []): if block.get("type") == "text": @@ -191,16 +267,28 @@ async def call_openai(body: dict, upstream: str, headers: dict) -> tuple[str, di """ body = dict(body) body["stream"] = False - - async with httpx.AsyncClient(timeout=300.0) as client: - r = await client.post( - f"{upstream}/v1/chat/completions", - json=body, - headers=headers, - ) - r.raise_for_status() - data = r.json() - + model = body.get("model", "?") + url = f"{upstream}/v1/chat/completions" + log.info("upstream_call provider=openai model=%s url=%s", model, url) + t0 = time.perf_counter() + try: + async with httpx.AsyncClient(timeout=300.0) as client: + r = await client.post(url, json=body, headers=headers) + except httpx.TimeoutException as exc: + log.error("upstream_timeout provider=openai model=%s url=%s after=%.0fs %s", + model, url, time.perf_counter() - t0, exc) + raise + except httpx.RequestError as exc: + log.error("upstream_connect_error provider=openai model=%s url=%s %s: %s", + model, url, type(exc).__name__, exc) + raise + ms = (time.perf_counter() - t0) * 1000 + if not r.is_success: + log.error("upstream_error provider=openai model=%s url=%s status=%d %.0fms body=%.500s", + model, url, r.status_code, ms, r.text) + raise UpstreamError(r.status_code, r.content, r.headers.get("content-type", ""), "openai") + log.info("upstream_ok provider=openai model=%s status=%d %.0fms", model, r.status_code, ms) + data = r.json() text = data.get("choices", [{}])[0].get("message", {}).get("content", "") return text, data @@ -209,36 +297,52 @@ async def call_openai(body: dict, upstream: str, headers: dict) -> tuple[str, di # Text extraction helpers (unified across API formats) # --------------------------------------------------------------------------- +def _extract_text_strings(content) -> list[str]: + """ + Normalise any Anthropic content shape into a list of plain strings. + Handles: bare string, list of content-block dicts, or anything unexpected. + """ + if isinstance(content, str): + return [content] if content else [] + if isinstance(content, list): + out = [] + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + text = block.get("text", "") + if isinstance(text, str) and text: + out.append(text) + return out + return [] + + def extract_prompt_text(body: dict, path: str) -> str: """Extract a flat string from a request body for saliency processing.""" - if path in ("/api/chat", "/v1/chat/completions"): - messages = body.get("messages", []) - parts = [] - for m in messages: - content = m.get("content", "") - if isinstance(content, str): - parts.append(content) - elif isinstance(content, list): - # Anthropic-style content blocks - for block in content: - if isinstance(block, dict) and block.get("type") == "text": - parts.append(block.get("text", "")) - # Include top-level system field (Anthropic format) - if body.get("system"): - parts.insert(0, body["system"]) + if path in ("/api/chat", "/v1/chat/completions", "/v1/messages"): + parts: list[str] = [] + # system can be a plain string OR a list of content-block dicts (Anthropic format) + system = body.get("system") + if system: + parts.extend(_extract_text_strings(system)) + for m in body.get("messages", []): + parts.extend(_extract_text_strings(m.get("content", ""))) return " ".join(parts) - if path == "/v1/messages": - return extract_prompt_text(body, "/v1/chat/completions") return body.get("prompt", "") def inject_recollection_anthropic(body: dict, block: str) -> dict: """ Inject a recollection block into an Anthropic Messages API request. - Anthropic uses a top-level 'system' string field rather than a system message. + Anthropic uses a top-level 'system' field — either a plain string or a list of + content-block dicts. Normalise to a plain string before prepending the block. """ body = dict(body) existing = body.get("system") or "" + if isinstance(existing, list): + # Flatten content-block list to plain text + existing = " ".join( + b.get("text", "") for b in existing + if isinstance(b, dict) and b.get("type") == "text" + ) body["system"] = block + ("\n\n" + existing if existing else "") return body @@ -322,25 +426,29 @@ async def chat(request: Request) -> Response: model = body.get("model", "unknown") upstream = cfg["upstream_ollama"] min_len = cfg["detection"]["min_length"] - - body = await process_prompt(body, "/api/chat", pool, cfg) - - text, raw = await call_ollama("/api/chat", body, upstream) - sess = session_key(model, body.get("messages", [])) - count = record_and_check(sess, text, min_len) - - if count >= 2: - log.warning("loop detected model=%s session=%s count=%d", model, sess[1], count) - body, override = apply_mitigations(body, count, cfg) - if override is not None: - raw["message"] = {"role": "assistant", "content": override} - raw["loop_detected"] = True - return Response(content=json.dumps(raw), media_type="application/json") + log.info("chat route=/api/chat model=%s", model) + try: + body = await process_prompt(body, "/api/chat", pool, cfg) text, raw = await call_ollama("/api/chat", body, upstream) - record_and_check(sess, text, min_len) - - raw["message"] = {"role": "assistant", "content": text} - return Response(content=json.dumps(raw), media_type="application/json") + sess = session_key(model, body.get("messages", [])) + count = record_and_check(sess, text, min_len) + if count >= 2: + log.warning("loop_detected model=%s session=%s count=%d", model, sess[1], count) + body, override = apply_mitigations(body, count, cfg) + if override is not None: + raw["message"] = {"role": "assistant", "content": override} + raw["loop_detected"] = True + return Response(content=json.dumps(raw), media_type="application/json") + text, raw = await call_ollama("/api/chat", body, upstream) + record_and_check(sess, text, min_len) + raw["message"] = {"role": "assistant", "content": text} + return Response(content=json.dumps(raw), media_type="application/json") + except UpstreamError as exc: + log.error("chat_upstream_error route=/api/chat model=%s %s", model, exc) + return Response(content=exc.content, status_code=exc.status_code, media_type=exc.content_type) + except Exception as exc: + log.exception("chat_error route=/api/chat model=%s %s: %s", model, type(exc).__name__, exc) + raise @app.post("/api/generate") @@ -351,27 +459,30 @@ async def generate(request: Request) -> Response: model = body.get("model", "unknown") upstream = cfg["upstream_ollama"] min_len = cfg["detection"]["min_length"] - - body = await process_prompt(body, "/api/generate", pool, cfg) - - messages = [{"role": "user", "content": body.get("prompt", "")}] - sess = session_key(model, messages) - - text, raw = await call_ollama("/api/generate", body, upstream) - count = record_and_check(sess, text, min_len) - - if count >= 2: - log.warning("loop detected model=%s session=%s count=%d", model, sess[1], count) - body, override = apply_mitigations(body, count, cfg) - if override is not None: - raw["response"] = override - raw["loop_detected"] = True - return Response(content=json.dumps(raw), media_type="application/json") + log.info("chat route=/api/generate model=%s", model) + try: + body = await process_prompt(body, "/api/generate", pool, cfg) + messages = [{"role": "user", "content": body.get("prompt", "")}] + sess = session_key(model, messages) text, raw = await call_ollama("/api/generate", body, upstream) - record_and_check(sess, text, min_len) - - raw["response"] = text - return Response(content=json.dumps(raw), media_type="application/json") + count = record_and_check(sess, text, min_len) + if count >= 2: + log.warning("loop_detected model=%s session=%s count=%d", model, sess[1], count) + body, override = apply_mitigations(body, count, cfg) + if override is not None: + raw["response"] = override + raw["loop_detected"] = True + return Response(content=json.dumps(raw), media_type="application/json") + text, raw = await call_ollama("/api/generate", body, upstream) + record_and_check(sess, text, min_len) + raw["response"] = text + return Response(content=json.dumps(raw), media_type="application/json") + except UpstreamError as exc: + log.error("chat_upstream_error route=/api/generate model=%s %s", model, exc) + return Response(content=exc.content, status_code=exc.status_code, media_type=exc.content_type) + except Exception as exc: + log.exception("chat_error route=/api/generate model=%s %s: %s", model, type(exc).__name__, exc) + raise # --------------------------------------------------------------------------- @@ -386,33 +497,32 @@ async def anthropic_messages(request: Request) -> Response: model = body.get("model", "unknown") upstream = cfg["upstream_anthropic"] min_len = cfg["detection"]["min_length"] - - headers = _relay_headers(request, ANTHROPIC_RELAY_HEADERS) - # Ensure anthropic-version is present - if "anthropic-version" not in {k.lower() for k in headers}: - headers["anthropic-version"] = "2023-06-01" - - body = await process_prompt(body, "/v1/messages", pool, cfg) - - # Use messages list as session key (same logic as /api/chat) - messages = body.get("messages", []) - sess = session_key(model, messages) - - text, raw = await call_anthropic(body, upstream, headers) - count = record_and_check(sess, text, min_len) - - if count >= 2: - log.warning("loop detected model=%s session=%s count=%d", model, sess[1], count) - body, override = apply_mitigations(body, count, cfg) - if override is not None: - # Return a minimal Anthropic-format response with the override message - raw["content"] = [{"type": "text", "text": override}] - raw["loop_detected"] = True - return Response(content=json.dumps(raw), media_type="application/json") + log.info("chat route=/v1/messages model=%s upstream=%s", model, upstream) + try: + headers = _relay_headers(request, ANTHROPIC_RELAY_HEADERS) + if "anthropic-version" not in {k.lower() for k in headers}: + headers["anthropic-version"] = "2023-06-01" + body = await process_prompt(body, "/v1/messages", pool, cfg) + messages = body.get("messages", []) + sess = session_key(model, messages) text, raw = await call_anthropic(body, upstream, headers) - record_and_check(sess, text, min_len) - - return Response(content=json.dumps(raw), media_type="application/json") + count = record_and_check(sess, text, min_len) + if count >= 2: + log.warning("loop_detected model=%s session=%s count=%d", model, sess[1], count) + body, override = apply_mitigations(body, count, cfg) + if override is not None: + raw["content"] = [{"type": "text", "text": override}] + raw["loop_detected"] = True + return Response(content=json.dumps(raw), media_type="application/json") + text, raw = await call_anthropic(body, upstream, headers) + record_and_check(sess, text, min_len) + return Response(content=json.dumps(raw), media_type="application/json") + except UpstreamError as exc: + log.error("chat_upstream_error route=/v1/messages model=%s %s", model, exc) + return Response(content=exc.content, status_code=exc.status_code, media_type=exc.content_type) + except Exception as exc: + log.exception("chat_error route=/v1/messages model=%s %s: %s", model, type(exc).__name__, exc) + raise # --------------------------------------------------------------------------- @@ -427,29 +537,31 @@ async def openai_chat_completions(request: Request) -> Response: model = body.get("model", "unknown") upstream = cfg["upstream_openai"] min_len = cfg["detection"]["min_length"] - - headers = _relay_headers(request, OPENAI_RELAY_HEADERS) - - body = await process_prompt(body, "/v1/chat/completions", pool, cfg) - - messages = body.get("messages", []) - sess = session_key(model, messages) - - text, raw = await call_openai(body, upstream, headers) - count = record_and_check(sess, text, min_len) - - if count >= 2: - log.warning("loop detected model=%s session=%s count=%d", model, sess[1], count) - body, override = apply_mitigations(body, count, cfg) - if override is not None: - if raw.get("choices"): - raw["choices"][0]["message"]["content"] = override - raw["loop_detected"] = True - return Response(content=json.dumps(raw), media_type="application/json") + log.info("chat route=/v1/chat/completions model=%s upstream=%s", model, upstream) + try: + headers = _relay_headers(request, OPENAI_RELAY_HEADERS) + body = await process_prompt(body, "/v1/chat/completions", pool, cfg) + messages = body.get("messages", []) + sess = session_key(model, messages) text, raw = await call_openai(body, upstream, headers) - record_and_check(sess, text, min_len) - - return Response(content=json.dumps(raw), media_type="application/json") + count = record_and_check(sess, text, min_len) + if count >= 2: + log.warning("loop_detected model=%s session=%s count=%d", model, sess[1], count) + body, override = apply_mitigations(body, count, cfg) + if override is not None: + if raw.get("choices"): + raw["choices"][0]["message"]["content"] = override + raw["loop_detected"] = True + return Response(content=json.dumps(raw), media_type="application/json") + text, raw = await call_openai(body, upstream, headers) + record_and_check(sess, text, min_len) + return Response(content=json.dumps(raw), media_type="application/json") + except UpstreamError as exc: + log.error("chat_upstream_error route=/v1/chat/completions model=%s %s", model, exc) + return Response(content=exc.content, status_code=exc.status_code, media_type=exc.content_type) + except Exception as exc: + log.exception("chat_error route=/v1/chat/completions model=%s %s: %s", model, type(exc).__name__, exc) + raise # --------------------------------------------------------------------------- @@ -696,6 +808,626 @@ async def test_reset(scenario_id: str, request: Request) -> dict: return await reset_scenario(pool, scenario_id) +# --------------------------------------------------------------------------- +# /graph — knowledge graph explorer +# --------------------------------------------------------------------------- + +@app.get("/graph/data") +async def graph_data( + request: Request, + dim: str = "", + min_saliency: float = 1.0, + limit: int = 400, + center: str = "", +) -> dict: + """ + Return nodes and edges for the knowledge graph explorer. + - dim: filter to a single dimension (empty = all) + - min_saliency: only include concepts above this threshold + - limit: max edges to return + - center: if set, return the neighbourhood of this concept token + """ + pool = request.app.state.pool + base_query = """ + SELECT u.id AS concept_id, u.parent_id, u.dim_id, + u.is_isa, u.confidence, + sc.token AS concept_token, + COALESCE(sc.saliency, 0.0) AS concept_saliency, + sp.token AS parent_token, + COALESCE(sp.saliency, 0.0) AS parent_saliency, + sd.token AS dim_token + FROM urd u + JOIN soas sc ON sc.id = u.id + JOIN soas sp ON sp.id = u.parent_id + JOIN soas sd ON sd.id = u.dim_id + WHERE u.id != u.parent_id + """ + + async with pool.acquire() as conn: + if center: + row = await conn.fetchrow("SELECT id FROM soas WHERE token = $1", center) + if not row: + return {"nodes": [], "edges": [], "dim_list": [], "total_nodes": 0, "total_edges": 0} + center_id = row["id"] + rows = await conn.fetch( + base_query + " AND (u.id = $1 OR u.parent_id = $1) AND ($2 = '' OR sd.token = $2) LIMIT $3", + center_id, dim, limit, + ) + else: + rows = await conn.fetch( + base_query + " AND ($1 = '' OR sd.token = $1) AND sc.saliency >= $2 ORDER BY sc.saliency DESC LIMIT $3", + dim, min_saliency, limit, + ) + + nodes_map: dict = {} + edges = [] + + for r in rows: + cid, pid, dtok = r["concept_id"], r["parent_id"], r["dim_token"] + + if cid not in nodes_map: + nodes_map[cid] = {"id": cid, "token": r["concept_token"], + "saliency": round(float(r["concept_saliency"]), 3), "dc": {}} + if pid not in nodes_map: + nodes_map[pid] = {"id": pid, "token": r["parent_token"], + "saliency": round(float(r["parent_saliency"]), 3), "dc": {}} + + nodes_map[cid]["dc"][dtok] = nodes_map[cid]["dc"].get(dtok, 0) + 1 + + edges.append({"source": cid, "target": pid, "dim": dtok, + "is_isa": r["is_isa"], "confidence": round(float(r["confidence"]), 3)}) + + nodes = [] + for n in nodes_map.values(): + primary = max(n["dc"], key=n["dc"].get) if n["dc"] else "other" + nodes.append({"id": n["id"], "token": n["token"], "saliency": n["saliency"], + "primary_dim": primary, "dims": list(n["dc"].keys())}) + + dim_list = sorted({e["dim"] for e in edges}) + return {"nodes": nodes, "edges": edges, "dim_list": dim_list, + "total_nodes": len(nodes), "total_edges": len(edges)} + + +GRAPH_HTML = """ + + + +Festinger — Knowledge Graph + + + + + + + + +
+ +
+
Loading graph…
+
+

No nodes match the current filters.

+

Try lowering the saliency threshold or reloading after the agent has run.

+
+
+
+ + + +
+
+ + + + +""" + + +@app.get("/graph", response_class=HTMLResponse) +async def graph_explorer() -> str: + return GRAPH_HTML + + # --------------------------------------------------------------------------- # /admin — minimal HTML UI # --------------------------------------------------------------------------- @@ -739,7 +1471,9 @@ ADMIN_HTML = """

Festinger

-

Ollama-compatible inference middleware — loop detection & Recollections world model

+

Ollama-compatible inference middleware — loop detection & Recollections world model +  — Knowledge Graph Explorer +

World model stats

@@ -924,13 +1658,17 @@ async def admin() -> str: @app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "HEAD"]) async def passthrough(path: str, request: Request) -> Response: cfg = request.app.state.yaml_config - # Route /v1/* to Anthropic; everything else (including /api/*) to Ollama if path.startswith("v1/"): upstream = cfg["upstream_anthropic"] relay_headers = ANTHROPIC_RELAY_HEADERS + provider = "anthropic" else: upstream = cfg["upstream_ollama"] relay_headers = None + provider = "ollama" + + url = f"{upstream}/{path}" + log.info("passthrough %s %s → %s", request.method, path, url) body = await request.body() if relay_headers: @@ -938,13 +1676,25 @@ async def passthrough(path: str, request: Request) -> Response: else: headers = {k: v for k, v in request.headers.items() if k.lower() != "host"} - async with httpx.AsyncClient(timeout=120.0) as client: - r = await client.request( - request.method, - f"{upstream}/{path}", - content=body, - headers=headers, - ) + t0 = time.perf_counter() + try: + async with httpx.AsyncClient(timeout=120.0) as client: + r = await client.request(request.method, url, content=body, headers=headers) + except httpx.TimeoutException as exc: + log.error("passthrough_timeout provider=%s url=%s after=%.0fs %s", + provider, url, time.perf_counter() - t0, exc) + raise + except httpx.RequestError as exc: + log.error("passthrough_connect_error provider=%s url=%s %s: %s", + provider, url, type(exc).__name__, exc) + raise + + ms = (time.perf_counter() - t0) * 1000 + if not r.is_success: + log.warning("passthrough_error provider=%s url=%s status=%d %.0fms body=%.300s", + provider, url, r.status_code, ms, r.text) + else: + log.info("passthrough_ok provider=%s status=%d %.0fms", provider, r.status_code, ms) return Response( content=r.content,