Draing the queue
This commit is contained in:
@@ -42,6 +42,11 @@ class CueWriteRequest:
|
|||||||
_queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
|
_queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
|
||||||
_running: bool = False
|
_running: bool = False
|
||||||
|
|
||||||
|
# Limit concurrent LM Studio / cloud-LLM calls so we don't flood a slow
|
||||||
|
# local model server. Cue processing (pure DB ops) is never rate-limited.
|
||||||
|
_LLM_CONCURRENCY = 2
|
||||||
|
_llm_semaphore: asyncio.Semaphore | None = None
|
||||||
|
|
||||||
|
|
||||||
async def enqueue_concept(token: str) -> None:
|
async def enqueue_concept(token: str) -> None:
|
||||||
try:
|
try:
|
||||||
@@ -59,13 +64,21 @@ async def enqueue_cue(triple: CueTriple) -> None:
|
|||||||
|
|
||||||
async def start_worker(pool: asyncpg.Pool) -> None:
|
async def start_worker(pool: asyncpg.Pool) -> None:
|
||||||
"""Launch background worker. Call once at startup."""
|
"""Launch background worker. Call once at startup."""
|
||||||
global _running
|
global _running, _llm_semaphore
|
||||||
_running = True
|
_running = True
|
||||||
|
_llm_semaphore = asyncio.Semaphore(_LLM_CONCURRENCY)
|
||||||
asyncio.create_task(_worker(pool))
|
asyncio.create_task(_worker(pool))
|
||||||
log.info("write queue worker started")
|
log.info("write queue worker started")
|
||||||
|
|
||||||
|
|
||||||
async def _worker(pool: asyncpg.Pool) -> None:
|
async def _worker(pool: asyncpg.Pool) -> None:
|
||||||
|
"""
|
||||||
|
Drain the queue as fast as possible.
|
||||||
|
|
||||||
|
Cue writes (pure DB ops) are awaited inline — they're fast.
|
||||||
|
Concept LLM calls are dispatched as fire-and-forget asyncio tasks so
|
||||||
|
a slow LM Studio response never stalls the worker loop.
|
||||||
|
"""
|
||||||
while _running:
|
while _running:
|
||||||
try:
|
try:
|
||||||
item = await asyncio.wait_for(_queue.get(), timeout=5.0)
|
item = await asyncio.wait_for(_queue.get(), timeout=5.0)
|
||||||
@@ -74,15 +87,29 @@ async def _worker(pool: asyncpg.Pool) -> None:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
if isinstance(item, CueWriteRequest):
|
if isinstance(item, CueWriteRequest):
|
||||||
|
# Fast path: no LLM involved, process inline.
|
||||||
await _process_cue(pool, item.triple)
|
await _process_cue(pool, item.triple)
|
||||||
elif isinstance(item, WriteRequest):
|
elif isinstance(item, WriteRequest):
|
||||||
await _process_concept(pool, item.concept_token)
|
# Slow path: fire off without awaiting so the worker stays free.
|
||||||
|
asyncio.create_task(
|
||||||
|
_process_concept_guarded(pool, item.concept_token)
|
||||||
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.exception("write queue worker error: %s", e)
|
log.exception("write queue worker error: %s", e)
|
||||||
finally:
|
finally:
|
||||||
_queue.task_done()
|
_queue.task_done()
|
||||||
|
|
||||||
|
|
||||||
|
async def _process_concept_guarded(pool: asyncpg.Pool, concept_token: str) -> None:
|
||||||
|
"""Wrapper that acquires the LLM semaphore before calling the model."""
|
||||||
|
assert _llm_semaphore is not None
|
||||||
|
async with _llm_semaphore:
|
||||||
|
try:
|
||||||
|
await _process_concept(pool, concept_token)
|
||||||
|
except Exception as e:
|
||||||
|
log.exception("concept LLM task error for %s: %s", concept_token, e)
|
||||||
|
|
||||||
|
|
||||||
async def stop_worker() -> None:
|
async def stop_worker() -> None:
|
||||||
global _running
|
global _running
|
||||||
_running = False
|
_running = False
|
||||||
|
|||||||
Reference in New Issue
Block a user