diff --git a/plugins/festinger/festinger/main.py b/plugins/festinger/festinger/main.py index 14c833d..35c8667 100644 --- a/plugins/festinger/festinger/main.py +++ b/plugins/festinger/festinger/main.py @@ -669,6 +669,25 @@ def _openai_sse_from_response(raw: dict) -> bytes: return "".join(parts).encode() +# --------------------------------------------------------------------------- +# Per-upstream concurrency control for local models +# --------------------------------------------------------------------------- + +# Local inference servers (LM Studio, Ollama) typically run a single model +# on one GPU and queue or crash under concurrent requests. We serialize all +# agent-routed calls that share the same base URL through a semaphore. +# Cloud providers (claude, openai) are excluded — they handle concurrency fine. +_local_upstream_semaphores: dict[str, asyncio.Semaphore] = {} + + +def _get_upstream_semaphore(base_url: str) -> asyncio.Semaphore: + """Return (creating if needed) a semaphore that limits concurrency to 1 for this upstream.""" + key = base_url.rstrip("/").lower() + if key not in _local_upstream_semaphores: + _local_upstream_semaphores[key] = asyncio.Semaphore(1) + return _local_upstream_semaphores[key] + + async def _get_agent_routing_model(pool, agent_name: str) -> ModelConfig | None: """ Look up the agent's configured model from the agent_models table. @@ -711,10 +730,19 @@ async def _route_agent_chat( claude → Anthropic Messages API (translated in both directions) openai → OpenAI-compatible endpoint (base_url + model swap) - lm-studio→ same as openai + lm-studio→ same as openai, serialized through a per-upstream semaphore """ - # Run recollection injection (same pipeline as standard path) - body = await process_prompt(body, "/v1/chat/completions", pool, cfg, request_headers) + is_local = agent_model.provider in ("lm-studio", "ollama") + + # Local single-GPU models can't handle concurrent requests. + # Skip context discovery so we don't fire a second LM Studio call + # at the same time as the main inference. + # (Configure write_model_id or a per-agent model for a cloud/separate + # model if you want memory building alongside local inference.) + body = await process_prompt( + body, "/v1/chat/completions", pool, cfg, request_headers, + skip_discovery=is_local, + ) sess = session_key(agent_model.model_name, body.get("messages", [])) @@ -738,6 +766,8 @@ async def _route_agent_chat( "authorization": f"Bearer {agent_model.api_key or 'lm-studio'}", "content-type": "application/json", } + # One request at a time to local inference servers. + _local_sem = _get_upstream_semaphore(oai_upstream) if is_local else None async def _call(current_body: dict) -> tuple[str, dict]: """Call the agent's upstream; always return (text, openai_format_dict).""" @@ -748,7 +778,11 @@ async def _route_agent_chat( else: b = dict(current_body) b["model"] = agent_model.model_name - text, raw_o = await call_openai(b, oai_upstream, oai_headers) + if _local_sem is not None: + async with _local_sem: + text, raw_o = await call_openai(b, oai_upstream, oai_headers) + else: + text, raw_o = await call_openai(b, oai_upstream, oai_headers) return text, raw_o text, raw = await _call(body) @@ -806,10 +840,22 @@ def _extract_agent_name(body: dict, headers: dict) -> tuple[str, dict]: return identity, clean_body -async def process_prompt(body: dict, path: str, pool, cfg: dict, request_headers: dict | None = None) -> dict: +async def process_prompt( + body: dict, + path: str, + pool, + cfg: dict, + request_headers: dict | None = None, + skip_discovery: bool = False, +) -> dict: """ Run the saliency + recollection pipeline over the prompt. Returns a (possibly modified) body dict with the recollection block injected. + + skip_discovery: when True, novel-word candidates are still registered in + SOAS (low saliency) but the LLM context-discover call is suppressed. + Used when the agent is routed to a local single-GPU model that cannot + handle simultaneous requests. """ read_threshold = float(await get_config(pool, "saliency_read_threshold", "0.5")) conf_floor = float(await get_config(pool, "recollection_confidence_floor", "0.6")) @@ -880,7 +926,7 @@ async def process_prompt(body: dict, path: str, pool, cfg: dict, request_headers # Do NOT add to salient_for_read — no zero-hit recollection until confirmed. # 3. Enqueue for LLM-driven discovery if there are candidates to evaluate. - if novel_candidates and len(user_text) >= 20: + if novel_candidates and len(user_text) >= 20 and not skip_discovery: await enqueue_context_discover( user_text, novel_candidates, agent_name=agent_name, diff --git a/plugins/festinger/festinger/write_queue.py b/plugins/festinger/festinger/write_queue.py index 44f4048..0e3a30c 100644 --- a/plugins/festinger/festinger/write_queue.py +++ b/plugins/festinger/festinger/write_queue.py @@ -58,7 +58,9 @@ _running: bool = False # Limit concurrent LM Studio / cloud-LLM calls so we don't flood a slow # local model server. Cue processing (pure DB ops) is never rate-limited. -_LLM_CONCURRENCY = 2 +# Local models (LM Studio, Ollama) can typically only handle one request at a +# time. Keep this at 1 to avoid overwhelming a single-GPU inference server. +_LLM_CONCURRENCY = 1 _llm_semaphore: asyncio.Semaphore | None = None