Serialize local model calls and skip concurrent context discovery
LM Studio and Ollama run one model on one GPU — concurrent requests cause crashes. Two fixes: 1. Per-upstream semaphore (concurrency=1) in _route_agent_chat for lm-studio/ollama providers. All agent-routed calls to the same base URL queue instead of hitting the GPU simultaneously. 2. skip_discovery=True when routing to a local model. Context discovery would fire a second LM Studio call alongside the main inference. Novel words are still registered in SOAS (low saliency) but the LLM confirmation step waits. Configure write_model_id or a separate agent model pointing at a cloud/remote model to re-enable live context discovery. 3. _LLM_CONCURRENCY 2 → 1 in write_queue for the same reason. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -669,6 +669,25 @@ def _openai_sse_from_response(raw: dict) -> bytes:
|
|||||||
return "".join(parts).encode()
|
return "".join(parts).encode()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Per-upstream concurrency control for local models
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
# Local inference servers (LM Studio, Ollama) typically run a single model
|
||||||
|
# on one GPU and queue or crash under concurrent requests. We serialize all
|
||||||
|
# agent-routed calls that share the same base URL through a semaphore.
|
||||||
|
# Cloud providers (claude, openai) are excluded — they handle concurrency fine.
|
||||||
|
_local_upstream_semaphores: dict[str, asyncio.Semaphore] = {}
|
||||||
|
|
||||||
|
|
||||||
|
def _get_upstream_semaphore(base_url: str) -> asyncio.Semaphore:
|
||||||
|
"""Return (creating if needed) a semaphore that limits concurrency to 1 for this upstream."""
|
||||||
|
key = base_url.rstrip("/").lower()
|
||||||
|
if key not in _local_upstream_semaphores:
|
||||||
|
_local_upstream_semaphores[key] = asyncio.Semaphore(1)
|
||||||
|
return _local_upstream_semaphores[key]
|
||||||
|
|
||||||
|
|
||||||
async def _get_agent_routing_model(pool, agent_name: str) -> ModelConfig | None:
|
async def _get_agent_routing_model(pool, agent_name: str) -> ModelConfig | None:
|
||||||
"""
|
"""
|
||||||
Look up the agent's configured model from the agent_models table.
|
Look up the agent's configured model from the agent_models table.
|
||||||
@@ -711,10 +730,19 @@ async def _route_agent_chat(
|
|||||||
|
|
||||||
claude → Anthropic Messages API (translated in both directions)
|
claude → Anthropic Messages API (translated in both directions)
|
||||||
openai → OpenAI-compatible endpoint (base_url + model swap)
|
openai → OpenAI-compatible endpoint (base_url + model swap)
|
||||||
lm-studio→ same as openai
|
lm-studio→ same as openai, serialized through a per-upstream semaphore
|
||||||
"""
|
"""
|
||||||
# Run recollection injection (same pipeline as standard path)
|
is_local = agent_model.provider in ("lm-studio", "ollama")
|
||||||
body = await process_prompt(body, "/v1/chat/completions", pool, cfg, request_headers)
|
|
||||||
|
# Local single-GPU models can't handle concurrent requests.
|
||||||
|
# Skip context discovery so we don't fire a second LM Studio call
|
||||||
|
# at the same time as the main inference.
|
||||||
|
# (Configure write_model_id or a per-agent model for a cloud/separate
|
||||||
|
# model if you want memory building alongside local inference.)
|
||||||
|
body = await process_prompt(
|
||||||
|
body, "/v1/chat/completions", pool, cfg, request_headers,
|
||||||
|
skip_discovery=is_local,
|
||||||
|
)
|
||||||
|
|
||||||
sess = session_key(agent_model.model_name, body.get("messages", []))
|
sess = session_key(agent_model.model_name, body.get("messages", []))
|
||||||
|
|
||||||
@@ -738,6 +766,8 @@ async def _route_agent_chat(
|
|||||||
"authorization": f"Bearer {agent_model.api_key or 'lm-studio'}",
|
"authorization": f"Bearer {agent_model.api_key or 'lm-studio'}",
|
||||||
"content-type": "application/json",
|
"content-type": "application/json",
|
||||||
}
|
}
|
||||||
|
# One request at a time to local inference servers.
|
||||||
|
_local_sem = _get_upstream_semaphore(oai_upstream) if is_local else None
|
||||||
|
|
||||||
async def _call(current_body: dict) -> tuple[str, dict]:
|
async def _call(current_body: dict) -> tuple[str, dict]:
|
||||||
"""Call the agent's upstream; always return (text, openai_format_dict)."""
|
"""Call the agent's upstream; always return (text, openai_format_dict)."""
|
||||||
@@ -748,7 +778,11 @@ async def _route_agent_chat(
|
|||||||
else:
|
else:
|
||||||
b = dict(current_body)
|
b = dict(current_body)
|
||||||
b["model"] = agent_model.model_name
|
b["model"] = agent_model.model_name
|
||||||
text, raw_o = await call_openai(b, oai_upstream, oai_headers)
|
if _local_sem is not None:
|
||||||
|
async with _local_sem:
|
||||||
|
text, raw_o = await call_openai(b, oai_upstream, oai_headers)
|
||||||
|
else:
|
||||||
|
text, raw_o = await call_openai(b, oai_upstream, oai_headers)
|
||||||
return text, raw_o
|
return text, raw_o
|
||||||
|
|
||||||
text, raw = await _call(body)
|
text, raw = await _call(body)
|
||||||
@@ -806,10 +840,22 @@ def _extract_agent_name(body: dict, headers: dict) -> tuple[str, dict]:
|
|||||||
return identity, clean_body
|
return identity, clean_body
|
||||||
|
|
||||||
|
|
||||||
async def process_prompt(body: dict, path: str, pool, cfg: dict, request_headers: dict | None = None) -> dict:
|
async def process_prompt(
|
||||||
|
body: dict,
|
||||||
|
path: str,
|
||||||
|
pool,
|
||||||
|
cfg: dict,
|
||||||
|
request_headers: dict | None = None,
|
||||||
|
skip_discovery: bool = False,
|
||||||
|
) -> dict:
|
||||||
"""
|
"""
|
||||||
Run the saliency + recollection pipeline over the prompt.
|
Run the saliency + recollection pipeline over the prompt.
|
||||||
Returns a (possibly modified) body dict with the recollection block injected.
|
Returns a (possibly modified) body dict with the recollection block injected.
|
||||||
|
|
||||||
|
skip_discovery: when True, novel-word candidates are still registered in
|
||||||
|
SOAS (low saliency) but the LLM context-discover call is suppressed.
|
||||||
|
Used when the agent is routed to a local single-GPU model that cannot
|
||||||
|
handle simultaneous requests.
|
||||||
"""
|
"""
|
||||||
read_threshold = float(await get_config(pool, "saliency_read_threshold", "0.5"))
|
read_threshold = float(await get_config(pool, "saliency_read_threshold", "0.5"))
|
||||||
conf_floor = float(await get_config(pool, "recollection_confidence_floor", "0.6"))
|
conf_floor = float(await get_config(pool, "recollection_confidence_floor", "0.6"))
|
||||||
@@ -880,7 +926,7 @@ async def process_prompt(body: dict, path: str, pool, cfg: dict, request_headers
|
|||||||
# Do NOT add to salient_for_read — no zero-hit recollection until confirmed.
|
# Do NOT add to salient_for_read — no zero-hit recollection until confirmed.
|
||||||
|
|
||||||
# 3. Enqueue for LLM-driven discovery if there are candidates to evaluate.
|
# 3. Enqueue for LLM-driven discovery if there are candidates to evaluate.
|
||||||
if novel_candidates and len(user_text) >= 20:
|
if novel_candidates and len(user_text) >= 20 and not skip_discovery:
|
||||||
await enqueue_context_discover(
|
await enqueue_context_discover(
|
||||||
user_text, novel_candidates,
|
user_text, novel_candidates,
|
||||||
agent_name=agent_name,
|
agent_name=agent_name,
|
||||||
|
|||||||
@@ -58,7 +58,9 @@ _running: bool = False
|
|||||||
|
|
||||||
# Limit concurrent LM Studio / cloud-LLM calls so we don't flood a slow
|
# Limit concurrent LM Studio / cloud-LLM calls so we don't flood a slow
|
||||||
# local model server. Cue processing (pure DB ops) is never rate-limited.
|
# local model server. Cue processing (pure DB ops) is never rate-limited.
|
||||||
_LLM_CONCURRENCY = 2
|
# Local models (LM Studio, Ollama) can typically only handle one request at a
|
||||||
|
# time. Keep this at 1 to avoid overwhelming a single-GPU inference server.
|
||||||
|
_LLM_CONCURRENCY = 1
|
||||||
_llm_semaphore: asyncio.Semaphore | None = None
|
_llm_semaphore: asyncio.Semaphore | None = None
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user