222 lines
7.7 KiB
Python
222 lines
7.7 KiB
Python
"""
Recollection engine — read path.

For each salient concept found in an intercepted prompt:
  - Query URD edges (from in-memory cache, filtered by confidence)
  - Rank by effective score (recency-decayed saliency + centrality bonus)
  - Render hit or zero-hit block
  - Inject the <recollection> block into the prompt

Scoring
-------
  effective_score(c) = saliency(c) * recency_decay(c) + centrality_bonus(c)
                       (+ SESSION_BOOST when c was found in the system message)

  recency_decay = exp(-days_since_last_seen / RECENCY_HALF_LIFE)
    → 1.0 if never seen (no penalty for brand-new concepts)
    → ~0.37 at 30 days, ~0.14 at 60 days, ~0.05 at 90 days
      (with RECENCY_HALF_LIFE = 30; the factor reaches 0.5 at ~20.8 days)

  centrality_bonus = log1p(n_children)
    → 0 for leaf concepts
    → grows slowly with the number of concepts that use this
      one as a parent (hub nodes surface first)
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import math
|
|
from datetime import datetime, timezone, timedelta
|
|
from typing import Optional
|
|
|
|
from . import cache
|
|
from .cache import SoasRow, UrdEdge
|
|
|
|
# Module-level logger for the recollection read path.
log = logging.getLogger("festinger.recollection")

# Line rendered for a salient concept that has no URD edges at all.
# "{concept}" is filled in with the concept token via str.format() in
# render_zero_hit().
ZERO_HIT_TEMPLATE = "You have no memory of {concept}. Try: memory_load query='{concept}' or gutask context {concept}"
|
|
|
|
# Time constant (in days) for recency decay.
#
# NOTE(review): despite the name, recency_decay() uses this value as an
# e-folding time, not a true half-life: after this many days of silence the
# saliency is multiplied by exp(-1) ≈ 0.37.  The factor reaches 0.5 after
# ln(2) * RECENCY_HALF_LIFE ≈ 20.8 days.  The name is kept unchanged so that
# existing references to it keep working; confirm which behaviour is intended
# before changing the formula.
RECENCY_HALF_LIFE = 30.0

# Flat bonus added to effective_score for concepts found in the system message.
# Large enough to push grounding concepts above freshly-encountered domain words.
SESSION_BOOST = 2.0


# ---------------------------------------------------------------------------
# Scoring
# ---------------------------------------------------------------------------

def recency_decay(row: SoasRow) -> float:
    """
    Exponential decay factor based on the time since ``row.last_seen``.

    Computes ``exp(-days / RECENCY_HALF_LIFE)`` where ``days`` is the
    fractional number of days elapsed since ``row.last_seen``.

    Special cases:
      - ``last_seen is None`` → 1.0 (no penalty for fresh concepts).
      - A naive ``last_seen`` is interpreted as UTC.
      - A ``last_seen`` that lies in the future (e.g. clock skew) is treated
        as "seen just now" and yields 1.0, so the factor never exceeds 1.0.

    Returns a float in the range [0.0, 1.0].
    """
    last = row.last_seen
    if last is None:
        return 1.0

    # Never mix naive and aware datetimes: assume UTC for naive values.
    if last.tzinfo is None:
        last = last.replace(tzinfo=timezone.utc)

    elapsed = datetime.now(tz=timezone.utc) - last
    # Clamp at zero: a negative age would give exp(positive) > 1 and turn the
    # "decay" into an unbounded boost.
    days = max(elapsed.total_seconds() / 86400.0, 0.0)
    return math.exp(-days / RECENCY_HALF_LIFE)
|
|
|
|
|
|
def centrality_bonus(concept_id: int) -> float:
    """
    Bonus for hub concepts.

    Looks up ``concept_id`` in ``cache.urd_by_parent`` and returns
    ``log1p`` of the number of entries found there, i.e. 0.0 when nothing
    references this concept as a parent and a slowly growing value otherwise.
    """
    referencing_edges = cache.urd_by_parent.get(concept_id, [])
    fan_in = len(referencing_edges)
    return math.log1p(fan_in)
|
|
|
|
|
|
def effective_score(concept_id: int, session_boosted: bool = False) -> float:
    """
    Ranking score for one concept inside the recollection block.

    The concept's token is resolved through ``cache.soas_by_id`` and its row
    through ``cache.soas_by_token``.  When no row is found the score is 0.0
    (the session boost is not applied in that case).  Otherwise the score is
    ``saliency * recency_decay + centrality_bonus``, plus ``SESSION_BOOST``
    when ``session_boosted`` is true.
    """
    token = cache.soas_by_id.get(concept_id, "")
    soas_row = cache.soas_by_token.get(token)
    if soas_row is None:
        return 0.0

    decayed_saliency = soas_row.saliency * recency_decay(soas_row)
    score = decayed_saliency + centrality_bonus(concept_id)
    boost = SESSION_BOOST if session_boosted else 0.0
    return score + boost
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Query
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def query_edges(concept_id: int, confidence_floor: float) -> list[UrdEdge]:
    """
    Collect the URD edges cached for *concept_id* under ``cache.urd_by_concept``
    whose ``confidence`` is at least *confidence_floor*.

    Returns a new list (empty when the concept has no cached edges or none
    pass the floor); cache order is preserved.
    """
    kept: list[UrdEdge] = []
    for edge in cache.urd_by_concept.get(concept_id, []):
        if edge.confidence >= confidence_floor:
            kept.append(edge)
    return kept
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Renderers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def render_hit(
    concept_token: str,
    edges: list[UrdEdge],
    concept_id: int,
    score: float,
    reverse_edges: list[UrdEdge] | None = None,
) -> str:
    """
    Format the recollection line for a single concept, e.g.:

      gnommoweb [s=3.2]: [type] repo [membership] glitch_university [type←] omega13

    - Each forward edge becomes ``[dim] parent``.  When *concept_id* is present
      in ``cache.pending_conflicts`` every forward dimension label gets a '?'
      suffix.
    - At most the first three *reverse_edges* are shown as ``[dim←] child``;
      the child token comes from ``cache.soas_by_id`` and falls back to the
      numeric concept id.
    - The score is printed with one decimal, which helps with debugging.
    """
    conflict_mark = "?" if concept_id in cache.pending_conflicts else ""

    forward = [
        f"[{edge.dim_token + conflict_mark}] {edge.parent_token}"
        for edge in edges
    ]

    backward = []
    for edge in (reverse_edges or [])[:3]:
        child = cache.soas_by_id.get(edge.concept_id, str(edge.concept_id))
        backward.append(f"[{edge.dim_token}←] {child}")

    rendered = " ".join(forward + backward)
    return f"{concept_token} [s={score:.1f}]: {rendered}"
|
|
|
|
|
|
def render_zero_hit(concept_token: str, score: float) -> str:
    """
    Format the line for a concept that has no URD edges.

    The text is ``ZERO_HIT_TEMPLATE`` filled in with *concept_token*.  When the
    concept's row in ``cache.soas_by_token`` carries a non-empty
    ``first_seen_context`` it is appended as a ``[first seen: '...']`` hint.
    *score* is accepted for signature parity with render_hit() and is not used.
    """
    message = ZERO_HIT_TEMPLATE.format(concept=concept_token)
    row = cache.soas_by_token.get(concept_token)
    context = row.first_seen_context if row else None
    if context:
        return f"{message} [first seen: '{context}']"
    return message
|
|
|
|
|
|
def build_recollection_block(
    salient_concept_ids: list[int],
    confidence_floor: float,
    recency_days: int,  # kept for API compat, no longer used for edge filtering
    max_lines: int = 12,
    session_boost_ids: set[int] | None = None,
) -> Optional[str]:
    """
    Build the <recollection> block, ranked by effective_score (recency-decayed
    saliency + centrality bonus + optional session boost).

    Concepts with known edges are shown first; zero-hit lines fill the remainder.

    Parameters:
      salient_concept_ids: concept IDs to consider; duplicates are ignored
        (first occurrence wins).
      confidence_floor: minimum confidence for forward edges (see query_edges).
      recency_days: unused, kept only so existing callers keep working.
      max_lines: maximum number of concept lines in the block (the footer is
        not counted).  Zero or negative values produce no block.
      session_boost_ids: concept IDs found in the system message — ranked higher
        to surface grounding context (agent persona, project name) first.

    Returns the block as a string wrapped in <recollection> tags, or None when
    there is nothing to show.

    NOTE(review): reverse edges (cache.urd_by_parent) are taken as-is and are
    not filtered by confidence_floor, unlike forward edges — confirm this is
    intended.
    """
    boosted = session_boost_ids or set()
    line_budget = max(max_lines, 0)  # a negative limit must not slice from the end
    footer_limit = 6                 # max number of concept names listed in the footer

    if line_budget == 0:
        return None

    # dict.fromkeys() drops duplicate IDs while keeping first-occurrence order.
    scored = [
        (effective_score(cid, session_boosted=(cid in boosted)), cid)
        for cid in dict.fromkeys(salient_concept_ids)
    ]
    # Highest score first (stable, so ties keep their input order).
    scored.sort(key=lambda pair: pair[0], reverse=True)

    hit_lines: list[str] = []
    zero_lines: list[str] = []
    hit_tokens: list[str] = []  # footer: concepts that have actual URD data

    for score, cid in scored:
        # Once the block is full of hit lines and the footer is full, no later
        # concept can change the output.
        if len(hit_lines) >= line_budget and len(hit_tokens) >= footer_limit:
            break

        token = cache.soas_by_id.get(cid, str(cid))
        edges = query_edges(cid, confidence_floor)
        reverse_edges = cache.urd_by_parent.get(cid, [])

        if edges or reverse_edges:
            if len(hit_tokens) < footer_limit:
                hit_tokens.append(token)
            if len(hit_lines) < line_budget:
                hit_lines.append(render_hit(token, edges, cid, score, reverse_edges))
        elif len(zero_lines) < line_budget:
            zero_lines.append(render_zero_hit(token, score))

    # Hits first; zero-hit lines only use whatever budget is left.
    lines = hit_lines + zero_lines[: line_budget - len(hit_lines)]
    if not lines:
        return None

    body = "\n".join(lines)

    # Footer: list the concepts that have actual URD data so the agent knows
    # it can dig deeper via gutask recall.
    if hit_tokens:
        footer_lines = [
            "To recall more: gutask recall <concept> brief|detailed|everything",
            " concepts: " + ", ".join(hit_tokens),
        ]
        body += "\n" + "\n".join(footer_lines)

    return "<recollection>\n" + body + "\n</recollection>"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Prompt injection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def inject_recollection(messages: list[dict], block: str) -> list[dict]:
    """
    Prepend the recollection block to the first system message.

    - String (or missing/empty) content: the result is
      ``block + "\\n\\n" + content``.
    - Non-empty list content (a list of content parts): the block is inserted
      as a leading ``{"type": "text", "text": block}`` part instead of being
      concatenated, which would raise TypeError.
    - If no system message exists, one containing only the block is inserted
      at position 0.

    Returns a new messages list; neither the input list nor the message dicts
    (nor a content-parts list) are mutated.
    """
    result = list(messages)

    for index, msg in enumerate(result):
        if msg.get("role") != "system":
            continue

        content = msg.get("content")
        if isinstance(content, list) and content:
            # Build a new list so the caller's content parts stay untouched.
            new_content = [{"type": "text", "text": block}, *content]
        else:
            new_content = block + "\n\n" + (content or "")

        # Copy the message so the caller's dict is left as it was.
        result[index] = {**msg, "content": new_content}
        return result

    # No system message — insert one
    result.insert(0, {"role": "system", "content": block})
    return result
|