Files
agent0/plugins/festinger/festinger/write_queue.py
T
2026-04-21 18:32:21 +02:00

223 lines
7.3 KiB
Python

"""
Async write queue — background processing of saliency-triggered and
cue-triggered write requests.
Cloud LLM calls and URD inserts never block the proxy response path.
"""
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass
from typing import Optional
import asyncpg
from .cache import SoasRow
from .cue_scanner import CueTriple
from .db import get_or_create_soas, get_config
from .llm_client import (
ModelConfig, get_model_config, call_llm,
CONTEXT_EXTRACT_PROMPT_TEMPLATE, parse_context_triples,
)
from .urd_writer import InsertRequest, insert_urd_edge
from . import cache
log = logging.getLogger("festinger.write_queue")
@dataclass
class ContextExtractRequest:
"""
Novel concepts seen in a single conversation turn, with the turn text as context.
The LLM reads the context and extracts relationship triples from evidence in the text —
much more accurate than asking about concepts in isolation.
"""
concept_tokens: list[str]
context_text: str
@dataclass
class CueWriteRequest:
"""A directly extracted CueTriple — bypasses LLM, goes straight to URD insert."""
triple: CueTriple
_queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
_running: bool = False
# Limit concurrent LM Studio / cloud-LLM calls so we don't flood a slow
# local model server. Cue processing (pure DB ops) is never rate-limited.
_LLM_CONCURRENCY = 2
_llm_semaphore: asyncio.Semaphore | None = None
async def enqueue_context_extract(tokens: list[str], context: str) -> None:
"""Enqueue a context-aware extraction request for a batch of novel concepts."""
try:
_queue.put_nowait(ContextExtractRequest(concept_tokens=tokens, context_text=context))
except asyncio.QueueFull:
log.warning("write queue full — dropping context extract for: %s", tokens)
async def enqueue_cue(triple: CueTriple) -> None:
try:
_queue.put_nowait(CueWriteRequest(triple=triple))
except asyncio.QueueFull:
log.warning("write queue full — dropping cue: %s", triple)
async def start_worker(pool: asyncpg.Pool) -> None:
"""Launch background worker. Call once at startup."""
global _running, _llm_semaphore
_running = True
_llm_semaphore = asyncio.Semaphore(_LLM_CONCURRENCY)
asyncio.create_task(_worker(pool))
log.info("write queue worker started")
async def _worker(pool: asyncpg.Pool) -> None:
"""
Drain the queue as fast as possible.
Cue writes (pure DB ops) are awaited inline — they're fast.
Concept LLM calls are dispatched as fire-and-forget asyncio tasks so
a slow LM Studio response never stalls the worker loop.
"""
while _running:
try:
item = await asyncio.wait_for(_queue.get(), timeout=5.0)
except asyncio.TimeoutError:
continue
try:
if isinstance(item, CueWriteRequest):
# Fast path: no LLM involved, process inline.
await _process_cue(pool, item.triple)
elif isinstance(item, ContextExtractRequest):
# Slow path: fire off without awaiting so the worker stays free.
asyncio.create_task(
_process_context_extract_guarded(
pool, item.concept_tokens, item.context_text
)
)
except Exception as e:
log.exception("write queue worker error: %s", e)
finally:
_queue.task_done()
async def _process_context_extract_guarded(
pool: asyncpg.Pool,
concept_tokens: list[str],
context_text: str,
) -> None:
"""Wrapper that acquires the LLM semaphore before context extraction."""
assert _llm_semaphore is not None
async with _llm_semaphore:
try:
await _process_context_extract(pool, concept_tokens, context_text)
except Exception as e:
log.exception("context extract task error for %s: %s", concept_tokens, e)
async def stop_worker() -> None:
global _running
_running = False
# ---------------------------------------------------------------------------
# Processing
# ---------------------------------------------------------------------------
async def _process_cue(pool: asyncpg.Pool, triple: CueTriple) -> None:
"""Insert a cue-extracted triple directly into URD."""
subj_row = await get_or_create_soas(pool, triple.subject)
parent_row = await get_or_create_soas(pool, triple.parent)
dim_row = await get_or_create_soas(pool, triple.dimension)
req = InsertRequest(
concept_id=subj_row.id,
parent_id=parent_row.id,
dim_id=dim_row.id,
is_isa=triple.is_isa,
confidence=triple.confidence,
source="inferred",
)
collision = await insert_urd_edge(pool, req)
if collision:
log.info("cue triple collision: %s", collision)
async def _process_context_extract(
pool: asyncpg.Pool,
concept_tokens: list[str],
context_text: str,
) -> None:
"""
Ask the local LLM to extract relationships for novel concepts from the
actual conversation context. One LLM call per prompt turn, not per concept.
This is fundamentally better than asking about concepts in isolation:
the model reads what was actually said about e.g. 'gnommoweb' and asserts
only what the text supports — no hallucination, evidence-grounded confidence.
"""
write_model_id = await get_config(pool, "write_model_id")
if not write_model_id:
log.debug("no write_model_id configured — skipping context extract")
return
model = await get_model_config(pool, write_model_id)
if not model:
log.warning("write_model_id=%s not found in models table", write_model_id)
return
seed_dims = ["type", "membership", "runs-on", "tech", "owned-by", "geography"]
prompt = CONTEXT_EXTRACT_PROMPT_TEMPLATE.format(
concepts=", ".join(concept_tokens),
context=context_text[:3000], # cap to avoid exceeding model context
dimensions=", ".join(seed_dims),
)
try:
response = await call_llm(model, prompt)
except Exception as e:
log.warning("LLM call failed for context extract %s: %s", concept_tokens, e)
return
triples = parse_context_triples(response)
if not triples:
log.info("LLM returned no context triples for: %s", concept_tokens)
return
concept_set = set(concept_tokens)
inserted = 0
for t in triples:
# Reject any concept the LLM invented that wasn't in our list
if not t.concept or t.concept not in concept_set:
log.debug("context extract: ignoring hallucinated concept %r", t.concept)
continue
if not t.parent or not t.dimension:
continue
subj_row = await get_or_create_soas(pool, t.concept)
parent_row = await get_or_create_soas(pool, t.parent)
dim_row = await get_or_create_soas(pool, t.dimension)
req = InsertRequest(
concept_id=subj_row.id,
parent_id=parent_row.id,
dim_id=dim_row.id,
is_isa=t.is_isa,
confidence=t.confidence,
source="context_llm",
)
collision = await insert_urd_edge(pool, req)
if not collision:
inserted += 1
log.info(
"context extract complete: concepts=%s%d triples → %d inserted",
concept_tokens, len(triples), inserted,
)