From 98b115d4710c30de9025857ac11d184fad7449b8 Mon Sep 17 00:00:00 2001 From: jenstandstad Date: Mon, 20 Apr 2026 18:22:57 +0200 Subject: [PATCH] Draing the queue --- plugins/festinger/festinger/write_queue.py | 31 ++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/plugins/festinger/festinger/write_queue.py b/plugins/festinger/festinger/write_queue.py index 398f668..84168da 100644 --- a/plugins/festinger/festinger/write_queue.py +++ b/plugins/festinger/festinger/write_queue.py @@ -42,6 +42,11 @@ class CueWriteRequest: _queue: asyncio.Queue = asyncio.Queue(maxsize=1000) _running: bool = False +# Limit concurrent LM Studio / cloud-LLM calls so we don't flood a slow +# local model server. Cue processing (pure DB ops) is never rate-limited. +_LLM_CONCURRENCY = 2 +_llm_semaphore: asyncio.Semaphore | None = None + async def enqueue_concept(token: str) -> None: try: @@ -59,13 +64,21 @@ async def enqueue_cue(triple: CueTriple) -> None: async def start_worker(pool: asyncpg.Pool) -> None: """Launch background worker. Call once at startup.""" - global _running + global _running, _llm_semaphore _running = True + _llm_semaphore = asyncio.Semaphore(_LLM_CONCURRENCY) asyncio.create_task(_worker(pool)) log.info("write queue worker started") async def _worker(pool: asyncpg.Pool) -> None: + """ + Drain the queue as fast as possible. + + Cue writes (pure DB ops) are awaited inline — they're fast. + Concept LLM calls are dispatched as fire-and-forget asyncio tasks so + a slow LM Studio response never stalls the worker loop. + """ while _running: try: item = await asyncio.wait_for(_queue.get(), timeout=5.0) @@ -74,15 +87,29 @@ async def _worker(pool: asyncpg.Pool) -> None: try: if isinstance(item, CueWriteRequest): + # Fast path: no LLM involved, process inline. await _process_cue(pool, item.triple) elif isinstance(item, WriteRequest): - await _process_concept(pool, item.concept_token) + # Slow path: fire off without awaiting so the worker stays free. + asyncio.create_task( + _process_concept_guarded(pool, item.concept_token) + ) except Exception as e: log.exception("write queue worker error: %s", e) finally: _queue.task_done() +async def _process_concept_guarded(pool: asyncpg.Pool, concept_token: str) -> None: + """Wrapper that acquires the LLM semaphore before calling the model.""" + assert _llm_semaphore is not None + async with _llm_semaphore: + try: + await _process_concept(pool, concept_token) + except Exception as e: + log.exception("concept LLM task error for %s: %s", concept_token, e) + + async def stop_worker() -> None: global _running _running = False