9814d18e8c
- test_connectivity.py: connectivity tests for all four endpoint types (anthropic, openai, lm_studio, hermes, agent0) — treats no-credits as success - test_hermes.py: raw WebSocket frame logger used to reverse-engineer protocol - Fix handle_hermes: skip prompt.submit ack frame, read full text from message.complete payload.text, always raise on status==error - Fix requirements.txt: use >= pins (fastapi/uvicorn versions didn't exist) - Fix dev.sh: prefer python3.12 for venv (mcp>=1.9.0 requires 3.10+) - Remove ANTHROPIC_KEY env var dependency from server.py (keys come from DB) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
351 lines
14 KiB
Python
351 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
agent-inference connectivity tests.
|
|
|
|
Tests reachability for all five supported inference endpoint types:
|
|
- anthropic : Anthropic cloud API (via litellm)
|
|
- openai : OpenAI or compatible cloud API (via litellm)
|
|
- lm_studio : Local LM Studio (OpenAI-compatible, no key)
|
|
- hermes : Hermes Agent dashboard (JSON-RPC WebSocket)
|
|
- agent0 : Agent Zero (MCP streamable-http)
|
|
|
|
A test PASSES if the endpoint is reachable — even if the API key has
|
|
no credits. A 400/402/429 from the provider still means connectivity works.
|
|
A test FAILS only on network errors (connection refused, timeout, DNS failure)
|
|
or bad credentials (401/403).
|
|
|
|
Configuration via environment variables (or a local .env file):
|
|
ANTHROPIC_API_KEY — Anthropic API key
|
|
OPENAI_API_KEY — OpenAI API key
|
|
OPENAI_BASE_URL — optional, override for OpenAI-compatible endpoint
|
|
LM_STUDIO_URL — LM Studio base URL (default: http://localhost:1234)
|
|
LM_STUDIO_MODEL — model to use (default: first available)
|
|
HERMES_URL — Hermes dashboard URL (default: http://localhost:50007)
|
|
AGENT0_URL — Agent Zero base URL
|
|
AGENT0_MCP_KEY — Agent Zero MCP token
|
|
|
|
Usage:
|
|
python test_connectivity.py
|
|
python test_connectivity.py --anthropic --hermes # run specific tests only
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from typing import Optional
|
|
|
|
# Load .env if present
def load_env_file(path: str) -> None:
    """Populate ``os.environ`` from a simple ``KEY=VALUE`` file.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored.  Each line is split on the first ``=``; key and value are
    stripped of surrounding whitespace, and one pair of matching single or
    double quotes around the value is removed.  Variables that already exist
    in the environment are never overwritten.  A missing file is not an error.

    Args:
        path: Location of the env file to read.
    """
    if not os.path.exists(path):
        return
    with open(path, encoding="utf-8") as env_fh:
        for raw_line in env_fh:
            entry = raw_line.strip()
            if not entry or entry.startswith("#") or "=" not in entry:
                continue
            key, _, value = entry.partition("=")
            key = key.strip()
            value = value.strip()
            if not key:
                # "=value" has no variable name; setting an empty name in
                # os.environ raises, so skip the line instead of crashing.
                continue
            if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
                # KEY="secret" should yield secret, not "secret".
                value = value[1:-1]
            os.environ.setdefault(key, value)


_env_file = os.path.join(os.path.dirname(__file__), ".env")
load_env_file(_env_file)
|
|
|
|
|
|
# ── ANSI colours ──────────────────────────────────────────────────────────────
|
|
|
|
# ANSI escape sequences used by the ok/fail/skip printers below.
GREEN = "\033[32m"  # colour of the ✓ marker printed by ok()
RED = "\033[31m"  # colour of the ✗ marker printed by fail()
YELLOW = "\033[33m"  # colour of the ○ marker printed by skip()
GREY = "\033[90m"  # colour of the trailing detail / reason text
RESET = "\033[0m"  # emitted after each coloured span to end it
|
|
|
|
def ok(label: str, detail: str = ""):
    """Print a success line: a green ✓, the label, and an optional grey detail."""
    line = f" {GREEN}✓{RESET} {label}"
    if detail:
        line += f" {GREY}{detail}{RESET}"
    print(line)
|
|
|
|
def fail(label: str, detail: str = ""):
    """Print a failure line: a red ✗, the label, and an optional grey detail."""
    line = f" {RED}✗{RESET} {label}"
    if detail:
        line += f" {GREY}{detail}{RESET}"
    print(line)
|
|
|
|
def skip(label: str, reason: str = "not configured"):
    """Print a skipped line: a yellow ○, the label, and the reason in grey parentheses."""
    marker = f"{YELLOW}○{RESET}"
    note = f"{GREY}({reason}){RESET}"
    print(f" {marker} {label} {note}")
|
|
|
|
def section(title: str):
    """Print a section heading: a blank line, the title, then a rule of equal length."""
    underline = "─" * len(title)
    print(f"\n{title}\n{underline}")
|
|
|
|
|
|
# ── Anthropic ─────────────────────────────────────────────────────────────────
|
|
|
|
def _anthropic_error_message(response) -> str:
    """Return ``error.message`` from an error response, tolerating odd bodies.

    Falls back to the raw response text when the body is not JSON, and to an
    empty string when the JSON has no string ``error.message``.
    """
    try:
        body = response.json()
    except ValueError:
        return response.text
    error = body.get("error") if isinstance(body, dict) else None
    message = error.get("message") if isinstance(error, dict) else None
    return message if isinstance(message, str) else ""


async def test_anthropic():
    """Check that the Anthropic Messages API is reachable with the configured key.

    Reads ``ANTHROPIC_API_KEY`` from the environment and skips the check when
    it is unset.  Sends one minimal ``POST /v1/messages`` request and reports
    the outcome through ``ok`` / ``fail`` / ``skip``:

    * 200 — reachable and responded.
    * 400, 402, 429, 529 — reachable; the request was rejected for a reason
      other than credentials (no credits, rate limit, provider overloaded, ...).
    * 401, 403 — bad API key.
    * any other status, or a network error — failure.
    """
    section("Anthropic")
    api_key = os.environ.get("ANTHROPIC_API_KEY", "")
    if not api_key:
        skip("API reachability", "ANTHROPIC_API_KEY not set")
        return

    import httpx

    try:
        async with httpx.AsyncClient(timeout=15.0) as client:
            r = await client.post(
                "https://api.anthropic.com/v1/messages",
                headers={
                    "x-api-key": api_key,
                    "anthropic-version": "2023-06-01",
                    "content-type": "application/json",
                },
                json={
                    "model": "claude-haiku-4-5-20251001",
                    "max_tokens": 16,
                    "messages": [{"role": "user", "content": "ping"}],
                },
            )

        if r.status_code == 200:
            ok("API reachable + responded", f"HTTP {r.status_code}")
        elif r.status_code in (400, 402, 429, 529):
            # The provider answered, so connectivity works even though the
            # request itself was rejected.
            err = _anthropic_error_message(r)
            if "credit" in err.lower() or "balance" in err.lower():
                ok("API reachable (no credits)", f"HTTP {r.status_code} — {err[:80]}")
            else:
                ok("API reachable", f"HTTP {r.status_code} — {err[:80]}")
        elif r.status_code in (401, 403):
            fail("Bad API key", f"HTTP {r.status_code}")
        else:
            fail("Unexpected response", f"HTTP {r.status_code} — {r.text[:120]}")

    except httpx.ConnectError as e:
        fail("Connection failed", str(e))
    except httpx.TimeoutException:
        fail("Timeout (15s)")
    except Exception as e:
        # Best-effort diagnostic script: report and carry on with other checks.
        fail("Error", str(e))
|
|
|
|
|
|
# ── OpenAI ────────────────────────────────────────────────────────────────────
|
|
|
|
def _openai_error_message(response) -> str:
    """Return a human-readable error message from an OpenAI-style error response.

    Handles ``{"error": {"message": "..."}}`` as well as ``{"error": "..."}``.
    Falls back to the raw response text when the body is not JSON, and to an
    empty string when no message can be found.
    """
    try:
        body = response.json()
    except ValueError:
        return response.text
    error = body.get("error") if isinstance(body, dict) else None
    if isinstance(error, str):
        return error
    message = error.get("message") if isinstance(error, dict) else None
    return message if isinstance(message, str) else ""


async def test_openai():
    """Check that the OpenAI (or compatible) chat-completions API is reachable.

    Environment variables read:

    * ``OPENAI_API_KEY`` — required; the check is skipped when unset.
    * ``OPENAI_BASE_URL`` — optional base URL (default
      ``https://api.openai.com/v1``).
    * ``OPENAI_MODEL`` — optional model name to request (default
      ``gpt-4o-mini``); useful when the base URL points at a compatible
      server that does not serve the default model.

    Outcomes reported through ``ok`` / ``fail`` / ``skip``:

    * 200 — reachable and responded.
    * 400, 402, 429 — reachable; request rejected for a non-credential reason.
    * 401, 403 — bad API key.
    * any other status, or a network error — failure.
    """
    section("OpenAI")
    api_key = os.environ.get("OPENAI_API_KEY", "")
    base_url = os.environ.get("OPENAI_BASE_URL", "https://api.openai.com/v1")
    model = os.environ.get("OPENAI_MODEL", "") or "gpt-4o-mini"
    if not api_key:
        skip("API reachability", "OPENAI_API_KEY not set")
        return

    import httpx

    try:
        async with httpx.AsyncClient(timeout=15.0) as client:
            r = await client.post(
                f"{base_url.rstrip('/')}/chat/completions",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "content-type": "application/json",
                },
                json={
                    "model": model,
                    "max_tokens": 16,
                    "messages": [{"role": "user", "content": "ping"}],
                },
            )

        if r.status_code == 200:
            ok("API reachable + responded", f"HTTP {r.status_code}")
        elif r.status_code in (400, 402, 429):
            # The provider answered, so connectivity works even though the
            # request itself was rejected.
            err = _openai_error_message(r)
            lowered = err.lower()
            if any(w in lowered for w in ("quota", "credit", "balance", "insufficient")):
                ok("API reachable (quota/credits issue)", f"HTTP {r.status_code} — {err[:80]}")
            else:
                ok("API reachable", f"HTTP {r.status_code} — {err[:80]}")
        elif r.status_code in (401, 403):
            fail("Bad API key", f"HTTP {r.status_code}")
        else:
            fail("Unexpected response", f"HTTP {r.status_code} — {r.text[:120]}")

    except httpx.ConnectError as e:
        fail("Connection failed", str(e)[:120])
    except httpx.TimeoutException:
        fail("Timeout (15s)")
    except Exception as e:
        # Best-effort diagnostic script: report and carry on with other checks.
        fail("Error", str(e))
|
|
|
|
|
|
# ── LM Studio ─────────────────────────────────────────────────────────────────
|
|
|
|
async def test_lm_studio():
    """Check that a local LM Studio server is reachable and can run inference.

    Environment variables read:

    * ``LM_STUDIO_URL`` — base URL (default ``http://localhost:1234``).
    * ``LM_STUDIO_MODEL`` — model to use; when unset every model returned by
      ``/v1/models`` is tried in order until one answers.

    Step 1 lists the models; step 2 sends a minimal chat completion.  A model
    whose error body says it is not loaded is skipped in favour of the next
    candidate; any other non-200 response is reported as a failure and stops
    the search.  Results are reported through ``ok`` / ``fail``.
    """
    section("LM Studio (local)")
    base_url = os.environ.get("LM_STUDIO_URL", "http://localhost:1234").rstrip("/")
    model = os.environ.get("LM_STUDIO_MODEL", "")

    import httpx

    # Substrings in an error body that mean "model is in the catalog but not loaded".
    not_loaded_markers = ("Failed to load model", "No models loaded", "not loaded")

    try:
        # Step 1: list models
        async with httpx.AsyncClient(timeout=10.0) as client:
            r = await client.get(f"{base_url}/v1/models")

        if r.status_code != 200:
            fail("Models endpoint", f"HTTP {r.status_code}")
            return

        # ``or []`` tolerates a body whose "data" is null.
        models = [m["id"] for m in (r.json().get("data") or [])]
        if not models:
            fail("No models loaded in LM Studio")
            return

        ok(f"Reachable — {len(models)} model(s)", ", ".join(models[:3]))

        # Step 2: minimal inference — try each model until one works
        candidates = [model] if model else models
        async with httpx.AsyncClient(timeout=30.0) as client:
            for chosen in candidates:
                r2 = await client.post(
                    f"{base_url}/v1/chat/completions",
                    headers={"content-type": "application/json"},
                    json={
                        "model": chosen,
                        "max_tokens": 16,
                        "messages": [{"role": "user", "content": "ping"}],
                    },
                )
                if r2.status_code == 200:
                    # ``content`` may be null in the response; treat it as empty.
                    content = r2.json()["choices"][0]["message"]["content"] or ""
                    ok(f"Inference OK ({chosen})", repr(content[:60]))
                    break
                full_body = r2.text
                # Search the whole body: the marker can sit past the 120
                # characters that are shown in the failure line.
                if any(marker in full_body for marker in not_loaded_markers):
                    continue  # model in catalog but not loaded — try next
                fail(f"Inference failed ({chosen})", f"HTTP {r2.status_code} — {full_body[:120]}")
                break  # stop trying, this is a real error
            else:
                # Every candidate was skipped as "not loaded".
                fail("No model currently loaded in LM Studio", f"tried: {', '.join(candidates[:3])}")

    except httpx.ConnectError:
        fail("Not reachable", f"{base_url} — is LM Studio running?")
    except httpx.TimeoutException:
        fail("Timeout")
    except Exception as e:
        # Best-effort diagnostic script: report and carry on with other checks.
        fail("Error", str(e))
|
|
|
|
|
|
# ── Hermes ────────────────────────────────────────────────────────────────────
|
|
|
|
async def test_hermes():
    """Check that the Hermes dashboard is reachable and accepts a WebSocket session.

    Reads ``HERMES_URL`` (default ``http://localhost:50007``).

    Step 1 fetches the dashboard HTML and extracts ``__HERMES_SESSION_TOKEN__``.
    Step 2 opens ``/api/ws?token=...`` and sends a JSON-RPC ``session.create``
    request, reading up to 10 frames while waiting for the reply with the
    matching id.  Results are reported through ``ok`` / ``fail`` / ``skip``;
    step 2 is skipped when the ``websockets`` package is not installed.
    """
    section("Hermes (WebSocket)")
    endpoint = os.environ.get("HERMES_URL", "http://localhost:50007").rstrip("/")

    import httpx

    # Try both import locations; if neither exists, keep going so the
    # dashboard check (and the remaining endpoint tests) still run.
    try:
        from websockets.asyncio.client import connect as ws_connect
    except ImportError:
        try:
            from websockets.client import connect as ws_connect  # type: ignore
        except ImportError:
            ws_connect = None

    # Step 1: fetch token from dashboard HTML
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            r = await client.get(f"{endpoint}/")
        m = re.search(r'__HERMES_SESSION_TOKEN__\s*=\s*"([^"]+)"', r.text)
        if not m:
            fail("Token fetch", "Could not find __HERMES_SESSION_TOKEN__ in dashboard HTML")
            return
        token = m.group(1)
        # Only a prefix of the token is printed.
        ok("Dashboard reachable + token fetched", f"{token[:12]}…")
    except httpx.ConnectError:
        fail("Dashboard not reachable", f"{endpoint} — is gerhard-dashboard running?")
        return
    except Exception as e:
        fail("Dashboard error", str(e))
        return

    if ws_connect is None:
        skip("WebSocket + session.create", "websockets package not installed")
        return

    # Step 2: WebSocket + session.create
    ws_scheme = "wss" if endpoint.startswith("https") else "ws"
    ws_url = f"{ws_scheme}://{endpoint.split('://', 1)[-1]}/api/ws?token={token}"

    try:
        async with ws_connect(ws_url) as ws:
            await ws.send(json.dumps({"jsonrpc": "2.0", "id": "c1",
                                      "method": "session.create", "params": {}}))
            session_id = None
            rpc_error = None
            for _ in range(10):  # drain up to 10 frames looking for the ack
                raw = await asyncio.wait_for(ws.recv(), timeout=10.0)
                msg = json.loads(raw)
                if not isinstance(msg, dict) or msg.get("id") != "c1":
                    continue  # not the reply to our request
                rpc_error = msg.get("error")
                result = msg.get("result")
                if isinstance(result, dict):
                    session_id = result.get("session_id") or result.get("id")
                break

            if session_id:
                ok("WebSocket + session.create", f"session={session_id}")
            elif rpc_error:
                # Surface the server's own explanation instead of dropping it.
                message = rpc_error.get("message") if isinstance(rpc_error, dict) else None
                fail("session.create — server returned an error", str(message or rpc_error)[:120])
            else:
                fail("session.create — no session_id in response")

    except Exception as e:
        # Some exceptions (e.g. timeouts) have an empty str(); show the type then.
        fail("WebSocket error", str(e)[:120] or type(e).__name__)
|
|
|
|
|
|
# ── Agent Zero ────────────────────────────────────────────────────────────────
|
|
|
|
async def test_agent0():
    """Check that the Agent Zero MCP endpoint is reachable.

    Reads ``AGENT0_URL`` and ``AGENT0_MCP_KEY``; the check is skipped unless
    both are set.  Sends a ``GET`` to ``<base>/mcp/t-<key>/http`` and reports
    through ``ok`` / ``fail`` / ``skip``:

    * 401, 403 — bad MCP key.
    * any other status — the endpoint answered, so it is reachable.
    * network error or timeout — failure.

    The MCP key is part of the URL path, so it is masked in everything
    that is printed.
    """
    section("Agent Zero (MCP)")
    base_url = os.environ.get("AGENT0_URL", "")
    mcp_key = os.environ.get("AGENT0_MCP_KEY", "")

    if not base_url or not mcp_key:
        skip("MCP endpoint", "AGENT0_URL and AGENT0_MCP_KEY not set")
        return

    root = base_url.rstrip("/")
    mcp_url = f"{root}/mcp/t-{mcp_key}/http"
    shown_url = f"{root}/mcp/t-***/http"  # safe to print: key masked

    import httpx

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            r = await client.get(mcp_url, headers={"Accept": "application/json"})

        if r.status_code in (401, 403):
            fail("Bad MCP key", f"HTTP {r.status_code}")
        elif r.status_code in (200, 405):
            ok("MCP endpoint reachable", f"HTTP {r.status_code} at {shown_url}")
        else:
            ok("MCP endpoint reachable", f"HTTP {r.status_code}")

    except httpx.ConnectError as e:
        fail("Not reachable", str(e).replace(mcp_key, "***")[:120])
    except httpx.TimeoutException:
        fail("Timeout (10s)")
    except Exception as e:
        # Best-effort diagnostic script: report and carry on; keep the key
        # out of the message in case the exception text embeds the URL.
        fail("Error", str(e).replace(mcp_key, "***"))
|
|
|
|
|
|
# ── Runner ────────────────────────────────────────────────────────────────────
|
|
|
|
async def main(args):
    """Run the selected connectivity checks one after another.

    ``args`` carries one boolean per endpoint type (``anthropic``, ``openai``,
    ``lm_studio``, ``hermes``, ``agent0``).  When none of them is set, every
    check runs.  Checks always execute in the order listed here.
    """
    suites = (
        (args.anthropic, test_anthropic),
        (args.openai, test_openai),
        (args.lm_studio, test_lm_studio),
        (args.hermes, test_hermes),
        (args.agent0, test_agent0),
    )
    run_all = not any(selected for selected, _ in suites)

    print("\nagent-inference connectivity tests")
    print("=" * 40)

    for selected, check in suites:
        if run_all or selected:
            await check()

    print()
|
|
|
|
|
|
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="agent-inference connectivity tests")
    # One boolean switch per endpoint type.  argparse derives the attribute
    # name from the flag, turning inner dashes into underscores
    # (--lm-studio -> args.lm_studio).
    for flag in ("--anthropic", "--openai", "--lm-studio", "--hermes", "--agent0"):
        parser.add_argument(flag, action="store_true")
    args = parser.parse_args()
    asyncio.run(main(args))
|