Removing streaming as this caused errors

This commit is contained in:
2026-04-20 17:19:05 +02:00
parent de907fd29a
commit ce67d3ab1a
+94 -1
View File
@@ -221,6 +221,92 @@ OPENAI_RELAY_HEADERS = (
)
def _anthropic_to_sse(data: dict) -> bytes:
    """
    Convert a non-streaming Anthropic Messages API response into SSE bytes
    that litellm expects when it sent stream=True.

    Emits the minimal event sequence litellm's streaming parser requires:
    message_start → content_block_start → content_block_delta(s)
    → content_block_stop → message_delta → message_stop

    Only ``text`` and ``tool_use`` content blocks are translated. Any other
    block type (or a non-dict entry) is skipped entirely, so a
    content_block_stop is never emitted without its matching
    content_block_start. Emitted blocks are numbered contiguously from 0.

    Args:
        data: Parsed JSON body of a non-streaming Messages API response.
            Missing or ``None`` values for ``content``, ``usage`` and
            ``stop_reason`` are tolerated and replaced by neutral defaults.

    Returns:
        The full SSE stream, UTF-8 encoded, as a single bytes object.
    """

    def sse(event: str, payload: dict) -> str:
        # json.dumps escapes embedded newlines, so every payload stays on a
        # single "data:" line as the SSE framing requires.
        return f"event: {event}\ndata: {json.dumps(payload)}\n\n"

    # "usage" may be absent or explicitly null; normalise once so the
    # message_start and message_delta events agree with each other.
    usage = data.get("usage") or {"input_tokens": 0, "output_tokens": 0}

    events: list[str] = [
        sse("message_start", {
            "type": "message_start",
            "message": {
                "id": data.get("id", "msg_festinger"),
                "type": "message",
                "role": "assistant",
                "content": [],
                "model": data.get("model", ""),
                "stop_reason": None,
                "stop_sequence": None,
                "usage": usage,
            },
        }),
        sse("ping", {"type": "ping"}),
    ]

    # Index of the next *emitted* block. It is kept separate from the
    # position in data["content"] so that skipped blocks leave no gaps.
    index = 0
    for block in data.get("content") or []:
        if not isinstance(block, dict):
            continue

        btype = block.get("type", "text")
        if btype == "text":
            content_block = {"type": "text", "text": ""}
            delta = {"type": "text_delta", "text": block.get("text", "")}
        elif btype == "tool_use":
            content_block = {
                "type": "tool_use",
                "id": block.get("id", ""),
                "name": block.get("name", ""),
                "input": {},
            }
            # The whole tool input is delivered as one partial_json fragment.
            delta = {
                "type": "input_json_delta",
                "partial_json": json.dumps(block.get("input", {})),
            }
        else:
            # Unsupported block type: emit nothing rather than an orphan
            # content_block_stop.
            continue

        events.append(sse("content_block_start", {
            "type": "content_block_start",
            "index": index,
            "content_block": content_block,
        }))
        events.append(sse("content_block_delta", {
            "type": "content_block_delta",
            "index": index,
            "delta": delta,
        }))
        events.append(sse("content_block_stop", {
            "type": "content_block_stop",
            "index": index,
        }))
        index += 1

    # message_delta + message_stop
    events.append(sse("message_delta", {
        "type": "message_delta",
        "delta": {
            # A null stop_reason would leave the stream without a finish
            # reason; fall back to the normal end-of-turn value.
            "stop_reason": data.get("stop_reason") or "end_turn",
            "stop_sequence": data.get("stop_sequence"),
        },
        "usage": {"output_tokens": usage.get("output_tokens", 0)},
    }))
    events.append(sse("message_stop", {"type": "message_stop"}))
    return "".join(events).encode()
async def call_anthropic(body: dict, upstream: str, headers: dict) -> tuple[str, dict]:
"""
Forward a request to the Anthropic Messages API (non-streaming).
@@ -528,10 +614,12 @@ async def anthropic_messages(request: Request) -> Response:
cfg = request.app.state.yaml_config
pool = request.app.state.pool
body = await request.json()
# Capture streaming intent BEFORE call_anthropic forces stream=False
original_stream: bool = bool(body.get("stream", False))
model = body.get("model", "unknown")
upstream = cfg["upstream_anthropic"]
min_len = cfg["detection"]["min_length"]
log.info("chat route=/v1/messages model=%s upstream=%s", model, upstream)
log.info("chat route=/v1/messages model=%s upstream=%s stream=%s", model, upstream, original_stream)
try:
headers = _relay_headers(request, ANTHROPIC_RELAY_HEADERS)
if "anthropic-version" not in {k.lower() for k in headers}:
@@ -547,9 +635,14 @@ async def anthropic_messages(request: Request) -> Response:
if override is not None:
raw["content"] = [{"type": "text", "text": override}]
raw["loop_detected"] = True
if original_stream:
return Response(content=_anthropic_to_sse(raw), media_type="text/event-stream")
return Response(content=json.dumps(raw), media_type="application/json")
text, raw = await call_anthropic(body, upstream, headers)
record_and_check(sess, text, min_len)
if original_stream:
log.info("streaming_response provider=anthropic model=%s converting json→sse", model)
return Response(content=_anthropic_to_sse(raw), media_type="text/event-stream")
return Response(content=json.dumps(raw), media_type="application/json")
except UpstreamError as exc:
log.error("chat_upstream_error route=/v1/messages model=%s %s", model, exc)