diff --git a/plugins/festinger/festinger/llm_client.py b/plugins/festinger/festinger/llm_client.py index 346b139..b79e411 100644 --- a/plugins/festinger/festinger/llm_client.py +++ b/plugins/festinger/festinger/llm_client.py @@ -84,27 +84,67 @@ async def _call_openai(model: ModelConfig, prompt: str) -> str: # Structured prompts # --------------------------------------------------------------------------- -WRITE_PROMPT_TEMPLATE = """You are a knowledge extraction assistant. +CONTEXT_EXTRACT_PROMPT_TEMPLATE = """You are a knowledge extraction assistant helping build a semantic memory graph. -Given the concept: "{concept}" +The following words appear in the conversation below and are NOT in the standard English dictionary — they are likely domain-specific terms that should be remembered: +{concepts} -And these known dimensions: {dimensions} +Read the conversation excerpt and, for each concept above, generate relationship triples IF the excerpt provides enough context to make a confident assertion. Only assert what is clearly evidenced in the text — do not guess or hallucinate. -Return a JSON array of triples describing what you know about this concept. -Each triple must have these fields: - - "parent": the containing concept (string) - - "dimension": one of the known dimensions above, or a new specific one if none fit - - "is_isa": true if this is a classification (ISA), false if membership/containment (ISPART) - - "confidence": 0.0 to 1.0 +CONVERSATION EXCERPT: +{context} -Return ONLY the JSON array. No explanation. Example: +Return a JSON array of triples. Each triple must have: + - "concept": one of the domain words listed above (exactly as spelled) + - "parent": the broader category or container (e.g. "software-repository", "glitch-university") + - "dimension": one of [{dimensions}] or a new specific one if none fit + - "is_isa": true if concept IS A type of parent (classification), false if concept BELONGS TO parent (containment) + - "confidence": 0.0–1.0 reflecting how clearly the excerpt supports this assertion + +Return ONLY the JSON array. No explanation. If no confident assertions can be made, return []. +Example: [ - {{"parent": "software-repository", "dimension": "type", "is_isa": true, "confidence": 0.9}}, - {{"parent": "glitch-university", "dimension": "membership", "is_isa": false, "confidence": 0.85}} + {{"concept": "gnommoweb", "parent": "software-repository", "dimension": "type", "is_isa": true, "confidence": 0.9}}, + {{"concept": "gnommoweb", "parent": "glitch-university", "dimension": "membership", "is_isa": false, "confidence": 0.85}} ] """ +@dataclass +class ContextTriple: + concept: str + parent: str + dimension: str + is_isa: bool + confidence: float + + +def parse_context_triples(response: str) -> list[ContextTriple]: + """Parse JSON array of context-aware triples from LLM response.""" + try: + start = response.find("[") + end = response.rfind("]") + 1 + if start == -1 or end == 0: + return [] + data = json.loads(response[start:end]) + triples = [] + for item in data: + if not isinstance(item, dict): + continue + triples.append(ContextTriple( + concept=str(item.get("concept", "")).strip().lower(), + parent=str(item.get("parent", "")).strip().lower(), + dimension=str(item.get("dimension", "type")).strip().lower(), + is_isa=bool(item.get("is_isa", True)), + confidence=float(item.get("confidence", 0.7)), + )) + return triples + except (json.JSONDecodeError, ValueError) as e: + log.warning("failed to parse context triples: %s", e) + return [] + + +# Kept for backwards-compatibility with the resolution job prompts below. @dataclass class LLMTriple: parent: str @@ -114,9 +154,8 @@ class LLMTriple: def parse_llm_triples(response: str) -> list[LLMTriple]: - """Parse JSON array of triples from LLM response.""" + """Parse JSON array of single-concept triples (used by resolution job only).""" try: - # Find the JSON array in the response start = response.find("[") end = response.rfind("]") + 1 if start == -1 or end == 0: diff --git a/plugins/festinger/festinger/main.py b/plugins/festinger/festinger/main.py index d15429b..4698773 100644 --- a/plugins/festinger/festinger/main.py +++ b/plugins/festinger/festinger/main.py @@ -44,7 +44,7 @@ from .cue_scanner import scan_cues from .recollection import build_recollection_block, inject_recollection from .resolution_job import run_resolution_job, last_run_timestamp from .tokenizer import tokenize -from .write_queue import enqueue_concept, enqueue_cue, start_worker, stop_worker +from .write_queue import enqueue_context_extract, enqueue_cue, start_worker, stop_worker from .urd_writer import InsertRequest, insert_urd_edge from .wordnet import import_wordnet, CITATION as WORDNET_CITATION from .test_scenarios import SCENARIOS, seed_scenario, reset_scenario @@ -528,11 +528,10 @@ async def process_prompt(body: dict, path: str, pool, cfg: dict) -> dict: # 2. Tokenise + update saliency tokens = tokenize(prompt_text) salient_for_read: list[int] = [] - salient_for_write: list[str] = [] - # Novel words found this prompt that aren't in the cache yet. - # We cap at MAX_NOVEL_PER_PROMPT to avoid flooding on large system prompts. - MAX_NOVEL_PER_PROMPT = 5 + # Novel domain words found in this turn — not in the standard dictionary. + # Capped to avoid flooding on unexpectedly large turns. + MAX_NOVEL_PER_PROMPT = 8 novel_this_prompt: list[str] = [] for token in tokens: @@ -540,7 +539,7 @@ async def process_prompt(body: dict, path: str, pool, cfg: dict) -> dict: if soas_row is None: # Token absent from dictionary → candidate novel domain word. - # Skip structural tokens (paths, versions, numbers) and apply a per-prompt cap. + # Skip structural tokens (paths, versions, numbers). if not _is_structural_token(token) and len(novel_this_prompt) < MAX_NOVEL_PER_PROMPT: novel_this_prompt.append(token) continue @@ -554,16 +553,6 @@ async def process_prompt(body: dict, path: str, pool, cfg: dict) -> dict: if soas_row.saliency >= read_threshold: salient_for_read.append(soas_row.id) - # Only enqueue for LLM write if the concept already has URD edges — - # i.e. we know *something* about it and may want to expand that knowledge. - # Never enqueue freshly-novel words: let the conversation teach us instead. - if ( - soas_row.saliency >= write_threshold - and soas_row.novelty > 0.0 - and cache.urd_by_concept.get(soas_row.id) - ): - salient_for_write.append(token) - # Create SOAS entries for novel words and add them to the read list. # Capture first-seen context so zero-hit recollection can include a hint. for token in novel_this_prompt: @@ -571,8 +560,11 @@ async def process_prompt(body: dict, path: str, pool, cfg: dict) -> dict: soas_row = await create_novel_soas(pool, token, context=ctx) salient_for_read.append(soas_row.id) - for token in salient_for_write: - await enqueue_concept(token) + # Enqueue context-aware LLM extraction for all novel words found this turn. + # The LLM reads the actual conversation text and extracts relationships from + # evidence — one call per turn, not one per concept. + if novel_this_prompt: + await enqueue_context_extract(novel_this_prompt, prompt_text) if not salient_for_read: return body diff --git a/plugins/festinger/festinger/write_queue.py b/plugins/festinger/festinger/write_queue.py index 84168da..d064b0b 100644 --- a/plugins/festinger/festinger/write_queue.py +++ b/plugins/festinger/festinger/write_queue.py @@ -18,7 +18,7 @@ from .cue_scanner import CueTriple from .db import get_or_create_soas, get_config from .llm_client import ( ModelConfig, get_model_config, call_llm, - WRITE_PROMPT_TEMPLATE, parse_llm_triples, + CONTEXT_EXTRACT_PROMPT_TEMPLATE, parse_context_triples, ) from .urd_writer import InsertRequest, insert_urd_edge from . import cache @@ -27,10 +27,14 @@ log = logging.getLogger("festinger.write_queue") @dataclass -class WriteRequest: - """A concept that crossed the write threshold — needs LLM-assisted classification.""" - concept_token: str - trigger: str = "saliency" # 'saliency' | 'cue' +class ContextExtractRequest: + """ + Novel concepts seen in a single conversation turn, with the turn text as context. + The LLM reads the context and extracts relationship triples from evidence in the text — + much more accurate than asking about concepts in isolation. + """ + concept_tokens: list[str] + context_text: str @dataclass @@ -48,11 +52,12 @@ _LLM_CONCURRENCY = 2 _llm_semaphore: asyncio.Semaphore | None = None -async def enqueue_concept(token: str) -> None: +async def enqueue_context_extract(tokens: list[str], context: str) -> None: + """Enqueue a context-aware extraction request for a batch of novel concepts.""" try: - _queue.put_nowait(WriteRequest(concept_token=token)) + _queue.put_nowait(ContextExtractRequest(concept_tokens=tokens, context_text=context)) except asyncio.QueueFull: - log.warning("write queue full — dropping concept: %s", token) + log.warning("write queue full — dropping context extract for: %s", tokens) async def enqueue_cue(triple: CueTriple) -> None: @@ -89,10 +94,12 @@ async def _worker(pool: asyncpg.Pool) -> None: if isinstance(item, CueWriteRequest): # Fast path: no LLM involved, process inline. await _process_cue(pool, item.triple) - elif isinstance(item, WriteRequest): + elif isinstance(item, ContextExtractRequest): # Slow path: fire off without awaiting so the worker stays free. asyncio.create_task( - _process_concept_guarded(pool, item.concept_token) + _process_context_extract_guarded( + pool, item.concept_tokens, item.context_text + ) ) except Exception as e: log.exception("write queue worker error: %s", e) @@ -100,14 +107,18 @@ async def _worker(pool: asyncpg.Pool) -> None: _queue.task_done() -async def _process_concept_guarded(pool: asyncpg.Pool, concept_token: str) -> None: - """Wrapper that acquires the LLM semaphore before calling the model.""" +async def _process_context_extract_guarded( + pool: asyncpg.Pool, + concept_tokens: list[str], + context_text: str, +) -> None: + """Wrapper that acquires the LLM semaphore before context extraction.""" assert _llm_semaphore is not None async with _llm_semaphore: try: - await _process_concept(pool, concept_token) + await _process_context_extract(pool, concept_tokens, context_text) except Exception as e: - log.exception("concept LLM task error for %s: %s", concept_token, e) + log.exception("context extract task error for %s: %s", concept_tokens, e) async def stop_worker() -> None: @@ -138,11 +149,22 @@ async def _process_cue(pool: asyncpg.Pool, triple: CueTriple) -> None: log.info("cue triple collision: %s", collision) -async def _process_concept(pool: asyncpg.Pool, concept_token: str) -> None: - """Call cloud LLM to classify the concept, then insert all returned triples.""" +async def _process_context_extract( + pool: asyncpg.Pool, + concept_tokens: list[str], + context_text: str, +) -> None: + """ + Ask the local LLM to extract relationships for novel concepts from the + actual conversation context. One LLM call per prompt turn, not per concept. + + This is fundamentally better than asking about concepts in isolation: + the model reads what was actually said about e.g. 'gnommoweb' and asserts + only what the text supports — no hallucination, evidence-grounded confidence. + """ write_model_id = await get_config(pool, "write_model_id") if not write_model_id: - log.debug("no write_model_id configured — skipping LLM write for %s", concept_token) + log.debug("no write_model_id configured — skipping context extract") return model = await get_model_config(pool, write_model_id) @@ -150,32 +172,35 @@ async def _process_concept(pool: asyncpg.Pool, concept_token: str) -> None: log.warning("write_model_id=%s not found in models table", write_model_id) return - known_dims = list(cache.soas_by_token.keys()) - # Keep only seed dimensions + short list for prompt brevity seed_dims = ["type", "membership", "runs-on", "tech", "owned-by", "geography"] - dimensions_str = ", ".join(seed_dims) - - prompt = WRITE_PROMPT_TEMPLATE.format( - concept=concept_token, - dimensions=dimensions_str, + prompt = CONTEXT_EXTRACT_PROMPT_TEMPLATE.format( + concepts=", ".join(concept_tokens), + context=context_text[:3000], # cap to avoid exceeding model context + dimensions=", ".join(seed_dims), ) try: response = await call_llm(model, prompt) except Exception as e: - log.warning("LLM call failed for concept %s: %s", concept_token, e) + log.warning("LLM call failed for context extract %s: %s", concept_tokens, e) return - triples = parse_llm_triples(response) + triples = parse_context_triples(response) if not triples: - log.info("LLM returned no triples for concept: %s", concept_token) + log.info("LLM returned no context triples for: %s", concept_tokens) return - subj_row = await get_or_create_soas(pool, concept_token) - + concept_set = set(concept_tokens) + inserted = 0 for t in triples: + # Reject any concept the LLM invented that wasn't in our list + if not t.concept or t.concept not in concept_set: + log.debug("context extract: ignoring hallucinated concept %r", t.concept) + continue if not t.parent or not t.dimension: continue + + subj_row = await get_or_create_soas(pool, t.concept) parent_row = await get_or_create_soas(pool, t.parent) dim_row = await get_or_create_soas(pool, t.dimension) @@ -185,14 +210,13 @@ async def _process_concept(pool: asyncpg.Pool, concept_token: str) -> None: dim_id=dim_row.id, is_isa=t.is_isa, confidence=t.confidence, - source="cloud_llm", + source="context_llm", ) - await insert_urd_edge(pool, req) + collision = await insert_urd_edge(pool, req) + if not collision: + inserted += 1 - # Mark concept as confirmed — set novelty=1.0 - async with pool.acquire() as conn: - await conn.execute( - "UPDATE soas SET novelty = 1.0 WHERE id = $1", subj_row.id - ) - if concept_token in cache.soas_by_token: - cache.soas_by_token[concept_token].novelty = 1.0 + log.info( + "context extract complete: concepts=%s → %d triples → %d inserted", + concept_tokens, len(triples), inserted, + )