def _anthropic_to_sse(data: dict) -> bytes:
    """
    Convert a non-streaming Anthropic Messages API response into SSE bytes.

    Used when the client asked for ``stream=True`` but the upstream call was
    made without streaming: the complete JSON response is replayed as the
    event sequence a streaming parser expects:

        message_start → ping → (content_block_start → content_block_delta(s)
        → content_block_stop) per content block → message_delta → message_stop

    Args:
        data: Parsed JSON body of a non-streaming Messages API response.
            Missing or ``null`` ``usage`` / ``content`` / ``stop_reason``
            fields are tolerated and replaced with neutral defaults.

    Returns:
        The UTF-8 encoded SSE payload (all events concatenated).
    """
    def sse(event: str, payload: dict) -> str:
        # One SSE frame: event name, JSON data line, blank-line terminator.
        return f"event: {event}\ndata: {json.dumps(payload)}\n\n"

    # ``or`` (rather than a .get default) also covers keys present as null.
    usage = data.get("usage") or {}
    content_blocks = data.get("content") or []

    events: list[str] = [
        sse("message_start", {
            "type": "message_start",
            "message": {
                "id": data.get("id", "msg_festinger"),
                "type": "message",
                "role": "assistant",
                "content": [],
                "model": data.get("model", ""),
                "stop_reason": None,
                "stop_sequence": None,
                "usage": usage or {"input_tokens": 0, "output_tokens": 0},
            },
        }),
        sse("ping", {"type": "ping"}),
    ]

    for idx, block in enumerate(content_blocks):
        btype = block.get("type", "text")

        # Each branch yields the "empty" start block plus the deltas that
        # rebuild the full block on the receiving side.
        if btype == "text":
            start_block = {"type": "text", "text": ""}
            deltas = [{"type": "text_delta", "text": block.get("text", "")}]
        elif btype == "tool_use":
            start_block = {
                "type": "tool_use",
                "id": block.get("id", ""),
                "name": block.get("name", ""),
                "input": {},
            }
            # Tool input is streamed as a JSON string fragment; send it whole.
            deltas = [{
                "type": "input_json_delta",
                "partial_json": json.dumps(block.get("input", {})),
            }]
        elif btype == "thinking":
            start_block = {"type": "thinking", "thinking": ""}
            deltas = [{
                "type": "thinking_delta",
                "thinking": block.get("thinking", ""),
            }]
            if block.get("signature"):
                deltas.append({
                    "type": "signature_delta",
                    "signature": block["signature"],
                })
        else:
            # Unknown / delta-less block types (e.g. redacted_thinking) are
            # passed through intact in content_block_start so that every
            # content_block_stop has a matching start and nothing is dropped.
            start_block = block
            deltas = []

        events.append(sse("content_block_start", {
            "type": "content_block_start",
            "index": idx,
            "content_block": start_block,
        }))
        events.extend(
            sse("content_block_delta", {
                "type": "content_block_delta",
                "index": idx,
                "delta": delta,
            })
            for delta in deltas
        )
        events.append(sse("content_block_stop", {
            "type": "content_block_stop",
            "index": idx,
        }))

    events.append(sse("message_delta", {
        "type": "message_delta",
        "delta": {
            # A completed response always has a stop reason; fall back to
            # "end_turn" when it is missing or null so the stream terminates
            # with a usable finish reason.
            "stop_reason": data.get("stop_reason") or "end_turn",
            "stop_sequence": data.get("stop_sequence"),
        },
        "usage": {"output_tokens": usage.get("output_tokens", 0)},
    }))
    events.append(sse("message_stop", {"type": "message_stop"}))

    return "".join(events).encode()
@@ -528,10 +614,12 @@ async def anthropic_messages(request: Request) -> Response: cfg = request.app.state.yaml_config pool = request.app.state.pool body = await request.json() + # Capture streaming intent BEFORE call_anthropic forces stream=False + original_stream: bool = bool(body.get("stream", False)) model = body.get("model", "unknown") upstream = cfg["upstream_anthropic"] min_len = cfg["detection"]["min_length"] - log.info("chat route=/v1/messages model=%s upstream=%s", model, upstream) + log.info("chat route=/v1/messages model=%s upstream=%s stream=%s", model, upstream, original_stream) try: headers = _relay_headers(request, ANTHROPIC_RELAY_HEADERS) if "anthropic-version" not in {k.lower() for k in headers}: @@ -547,9 +635,14 @@ async def anthropic_messages(request: Request) -> Response: if override is not None: raw["content"] = [{"type": "text", "text": override}] raw["loop_detected"] = True + if original_stream: + return Response(content=_anthropic_to_sse(raw), media_type="text/event-stream") return Response(content=json.dumps(raw), media_type="application/json") text, raw = await call_anthropic(body, upstream, headers) record_and_check(sess, text, min_len) + if original_stream: + log.info("streaming_response provider=anthropic model=%s converting json→sse", model) + return Response(content=_anthropic_to_sse(raw), media_type="text/event-stream") return Response(content=json.dumps(raw), media_type="application/json") except UpstreamError as exc: log.error("chat_upstream_error route=/v1/messages model=%s %s", model, exc)