172 lines
5.2 KiB
Python
172 lines
5.2 KiB
Python
|
|
"""
|
||
|
|
Async write queue — background processing of saliency-triggered and
|
||
|
|
cue-triggered write requests.
|
||
|
|
|
||
|
|
Cloud LLM calls and URD inserts never block the proxy response path.
|
||
|
|
"""
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import asyncio
|
||
|
|
import logging
|
||
|
|
from dataclasses import dataclass
|
||
|
|
from typing import Optional
|
||
|
|
|
||
|
|
import asyncpg
|
||
|
|
|
||
|
|
from .cache import SoasRow
|
||
|
|
from .cue_scanner import CueTriple
|
||
|
|
from .db import get_or_create_soas, get_config
|
||
|
|
from .llm_client import (
|
||
|
|
ModelConfig, get_model_config, call_llm,
|
||
|
|
WRITE_PROMPT_TEMPLATE, parse_llm_triples,
|
||
|
|
)
|
||
|
|
from .urd_writer import InsertRequest, insert_urd_edge
|
||
|
|
from . import cache
|
||
|
|
|
||
|
|
log = logging.getLogger("festinger.write_queue")
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class WriteRequest:
|
||
|
|
"""A concept that crossed the write threshold — needs LLM-assisted classification."""
|
||
|
|
concept_token: str
|
||
|
|
trigger: str = "saliency" # 'saliency' | 'cue'
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class CueWriteRequest:
|
||
|
|
"""A directly extracted CueTriple — bypasses LLM, goes straight to URD insert."""
|
||
|
|
triple: CueTriple
|
||
|
|
|
||
|
|
|
||
|
|
_queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
|
||
|
|
_running: bool = False
|
||
|
|
|
||
|
|
|
||
|
|
async def enqueue_concept(token: str) -> None:
|
||
|
|
try:
|
||
|
|
_queue.put_nowait(WriteRequest(concept_token=token))
|
||
|
|
except asyncio.QueueFull:
|
||
|
|
log.warning("write queue full — dropping concept: %s", token)
|
||
|
|
|
||
|
|
|
||
|
|
async def enqueue_cue(triple: CueTriple) -> None:
|
||
|
|
try:
|
||
|
|
_queue.put_nowait(CueWriteRequest(triple=triple))
|
||
|
|
except asyncio.QueueFull:
|
||
|
|
log.warning("write queue full — dropping cue: %s", triple)
|
||
|
|
|
||
|
|
|
||
|
|
async def start_worker(pool: asyncpg.Pool) -> None:
|
||
|
|
"""Launch background worker. Call once at startup."""
|
||
|
|
global _running
|
||
|
|
_running = True
|
||
|
|
asyncio.create_task(_worker(pool))
|
||
|
|
log.info("write queue worker started")
|
||
|
|
|
||
|
|
|
||
|
|
async def _worker(pool: asyncpg.Pool) -> None:
|
||
|
|
while _running:
|
||
|
|
try:
|
||
|
|
item = await asyncio.wait_for(_queue.get(), timeout=5.0)
|
||
|
|
except asyncio.TimeoutError:
|
||
|
|
continue
|
||
|
|
|
||
|
|
try:
|
||
|
|
if isinstance(item, CueWriteRequest):
|
||
|
|
await _process_cue(pool, item.triple)
|
||
|
|
elif isinstance(item, WriteRequest):
|
||
|
|
await _process_concept(pool, item.concept_token)
|
||
|
|
except Exception as e:
|
||
|
|
log.exception("write queue worker error: %s", e)
|
||
|
|
finally:
|
||
|
|
_queue.task_done()
|
||
|
|
|
||
|
|
|
||
|
|
async def stop_worker() -> None:
|
||
|
|
global _running
|
||
|
|
_running = False
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
# Processing
|
||
|
|
# ---------------------------------------------------------------------------
|
||
|
|
|
||
|
|
async def _process_cue(pool: asyncpg.Pool, triple: CueTriple) -> None:
|
||
|
|
"""Insert a cue-extracted triple directly into URD."""
|
||
|
|
subj_row = await get_or_create_soas(pool, triple.subject)
|
||
|
|
parent_row = await get_or_create_soas(pool, triple.parent)
|
||
|
|
dim_row = await get_or_create_soas(pool, triple.dimension)
|
||
|
|
|
||
|
|
req = InsertRequest(
|
||
|
|
concept_id=subj_row.id,
|
||
|
|
parent_id=parent_row.id,
|
||
|
|
dim_id=dim_row.id,
|
||
|
|
is_isa=triple.is_isa,
|
||
|
|
confidence=triple.confidence,
|
||
|
|
source="inferred",
|
||
|
|
)
|
||
|
|
collision = await insert_urd_edge(pool, req)
|
||
|
|
if collision:
|
||
|
|
log.info("cue triple collision: %s", collision)
|
||
|
|
|
||
|
|
|
||
|
|
async def _process_concept(pool: asyncpg.Pool, concept_token: str) -> None:
|
||
|
|
"""Call cloud LLM to classify the concept, then insert all returned triples."""
|
||
|
|
write_model_id = await get_config(pool, "write_model_id")
|
||
|
|
if not write_model_id:
|
||
|
|
log.debug("no write_model_id configured — skipping LLM write for %s", concept_token)
|
||
|
|
return
|
||
|
|
|
||
|
|
model = await get_model_config(pool, write_model_id)
|
||
|
|
if not model:
|
||
|
|
log.warning("write_model_id=%s not found in models table", write_model_id)
|
||
|
|
return
|
||
|
|
|
||
|
|
known_dims = list(cache.soas_by_token.keys())
|
||
|
|
# Keep only seed dimensions + short list for prompt brevity
|
||
|
|
seed_dims = ["type", "membership", "runs-on", "tech", "owned-by", "geography"]
|
||
|
|
dimensions_str = ", ".join(seed_dims)
|
||
|
|
|
||
|
|
prompt = WRITE_PROMPT_TEMPLATE.format(
|
||
|
|
concept=concept_token,
|
||
|
|
dimensions=dimensions_str,
|
||
|
|
)
|
||
|
|
|
||
|
|
try:
|
||
|
|
response = await call_llm(model, prompt)
|
||
|
|
except Exception as e:
|
||
|
|
log.warning("LLM call failed for concept %s: %s", concept_token, e)
|
||
|
|
return
|
||
|
|
|
||
|
|
triples = parse_llm_triples(response)
|
||
|
|
if not triples:
|
||
|
|
log.info("LLM returned no triples for concept: %s", concept_token)
|
||
|
|
return
|
||
|
|
|
||
|
|
subj_row = await get_or_create_soas(pool, concept_token)
|
||
|
|
|
||
|
|
for t in triples:
|
||
|
|
if not t.parent or not t.dimension:
|
||
|
|
continue
|
||
|
|
parent_row = await get_or_create_soas(pool, t.parent)
|
||
|
|
dim_row = await get_or_create_soas(pool, t.dimension)
|
||
|
|
|
||
|
|
req = InsertRequest(
|
||
|
|
concept_id=subj_row.id,
|
||
|
|
parent_id=parent_row.id,
|
||
|
|
dim_id=dim_row.id,
|
||
|
|
is_isa=t.is_isa,
|
||
|
|
confidence=t.confidence,
|
||
|
|
source="cloud_llm",
|
||
|
|
)
|
||
|
|
await insert_urd_edge(pool, req)
|
||
|
|
|
||
|
|
# Mark concept as confirmed — set novelty=1.0
|
||
|
|
async with pool.acquire() as conn:
|
||
|
|
await conn.execute(
|
||
|
|
"UPDATE soas SET novelty = 1.0 WHERE id = $1", subj_row.id
|
||
|
|
)
|
||
|
|
if concept_token in cache.soas_by_token:
|
||
|
|
cache.soas_by_token[concept_token].novelty = 1.0
|