478 lines
18 KiB
Python
478 lines
18 KiB
Python
"""
|
|
Database layer — asyncpg pool, schema init, cache warm-up, flush.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import math
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import asyncpg
|
|
|
|
from . import cache
|
|
from .cache import SoasRow, UrdEdge
|
|
|
|
# Module-level logger for the database layer.
log = logging.getLogger("festinger.db")

# Process-wide connection pool. Created lazily by get_pool() and set back to
# None by close_pool().
_pool: asyncpg.Pool | None = None

# Base schema file, located relative to this module: two directories up,
# then db/schema.sql. Read by init_schema().
SCHEMA_PATH = Path(__file__).parent.parent / "db" / "schema.sql"

# Seed dimension tokens, each paired with a boolean flag.
# bootstrap_dimensions() iterates these pairs; it binds the flag to
# `_is_isa` and does not use it.
SEED_DIMENSIONS: list[tuple[str, bool]] = [
    ("type", True),
    ("membership", False),
    ("runs-on", False),
    ("tech", False),
    ("owned-by", False),
    ("geography", False),
]
|
|
|
|
|
|
async def get_pool(dsn: str) -> asyncpg.Pool:
    """Return the shared asyncpg pool, creating it on first use.

    The pool lives in the module-level ``_pool``. ``dsn`` is only consulted
    when no pool exists yet; once a pool has been created, later calls return
    it regardless of the ``dsn`` they pass.
    """
    global _pool
    if _pool is not None:
        return _pool
    _pool = await asyncpg.create_pool(dsn, min_size=2, max_size=10)
    return _pool
|
|
|
|
|
|
async def close_pool() -> None:
    """Close the shared pool, if there is one, and reset ``_pool`` to None."""
    global _pool
    if not _pool:
        return
    await _pool.close()
    _pool = None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Schema init
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def init_schema(pool: asyncpg.Pool) -> None:
    """Apply the base schema file, then the in-code migrations, in order.

    The contents of ``SCHEMA_PATH`` are executed first. Every migration
    statement that follows uses ``IF NOT EXISTS``, so the function can be
    run again against an already-migrated database. All statements run on a
    single connection acquired from ``pool``.
    """
    base_schema = SCHEMA_PATH.read_text()

    migrations = (
        # base_url column for LM Studio / custom OpenAI-compatible endpoints
        "ALTER TABLE models ADD COLUMN IF NOT EXISTS base_url TEXT NOT NULL DEFAULT ''",
        # first_seen_context stores the sentence where a novel word first appeared
        "ALTER TABLE soas ADD COLUMN IF NOT EXISTS first_seen_context TEXT NOT NULL DEFAULT ''",
        # per-agent model assignments (agent_name → model_id)
        """
        CREATE TABLE IF NOT EXISTS agent_models (
            agent_name  TEXT PRIMARY KEY,
            model_id    INT NOT NULL REFERENCES models(id) ON DELETE CASCADE,
            created_at  TIMESTAMPTZ NOT NULL DEFAULT now()
        )
        """,
        # recollection event log
        """
        CREATE TABLE IF NOT EXISTS recollection_log (
            id                  BIGSERIAL PRIMARY KEY,
            created_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
            agent_name          TEXT NOT NULL DEFAULT '',
            salient_tokens      TEXT[] NOT NULL DEFAULT '{}',
            recollection_block  TEXT NOT NULL,
            messages_json       JSONB NOT NULL DEFAULT '[]'
        )
        """,
        "CREATE INDEX IF NOT EXISTS rl_created_idx ON recollection_log (created_at DESC)",
    )

    async with pool.acquire() as conn:
        await conn.execute(base_schema)
        for statement in migrations:
            await conn.execute(statement)
    log.info("schema applied")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Bootstrap helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def bootstrap_dimensions(pool: asyncpg.Pool) -> None:
    """Make sure every seed dimension has a SOAS row and a URD root node.

    For each entry of ``SEED_DIMENSIONS`` the token is upserted into ``soas``
    (the no-op ``DO UPDATE`` exists so that ``RETURNING`` yields the row even
    when it was already present). A URD row whose ``id``, ``parent_id`` and
    ``dim_id`` all equal that SOAS id is then inserted unless it already
    exists.
    """
    upsert_soas = (
        "INSERT INTO soas (token, saliency, novelty) VALUES ($1, 0, 0) "
        "ON CONFLICT (token) DO UPDATE SET token = EXCLUDED.token "
        "RETURNING id, token, encounter_count, saliency, novelty"
    )
    insert_root = (
        "INSERT INTO urd (id, parent_id, dim_id, is_isa, confidence, source) "
        "VALUES ($1, $1, $1, $2, 1.0, 'festinger') "
        "ON CONFLICT DO NOTHING"
    )

    async with pool.acquire() as conn:
        # NOTE(review): the seed's boolean flag is not used; every root node
        # is written with is_isa = TRUE. Confirm this is intended.
        for token, _seed_flag in SEED_DIMENSIONS:
            soas_record = await conn.fetchrow(upsert_soas, token)
            root_id = soas_record["id"]
            # Self-referential root node: id = parent_id = dim_id
            await conn.execute(insert_root, root_id, True)
    log.info("seed dimensions bootstrapped")
|
|
|
|
|
|
async def bootstrap_english_dictionary(pool: asyncpg.Pool) -> None:
    """Bulk-load /usr/share/dict/words into SOAS at saliency=0, novelty=0.

    A line qualifies when, after stripping whitespace, it is at least five
    characters long and purely alphabetic; it is stored lower-cased.
    Duplicates are collapsed before inserting, and tokens already in the
    table are skipped via ``ON CONFLICT DO NOTHING``. If the dictionary file
    does not exist, a warning is logged and nothing is loaded.
    """
    dict_file = Path("/usr/share/dict/words")
    if not dict_file.exists():
        log.warning("no system dictionary found at %s — skipping bootstrap", dict_file)
        return

    # Collect straight into a set so duplicates (e.g. differing only in
    # case) collapse as we go.
    unique_words = set()
    for line in dict_file.read_text().splitlines():
        candidate = line.strip()
        if len(candidate) >= 5 and candidate.isalpha():
            unique_words.add(candidate.lower())
    log.info("loading %d dictionary words into soas …", len(unique_words))

    insert_sql = (
        "INSERT INTO soas (token, saliency, novelty) VALUES ($1, 0, 0) "
        "ON CONFLICT (token) DO NOTHING"
    )
    async with pool.acquire() as conn:
        # executemany with ON CONFLICT DO NOTHING for speed
        await conn.executemany(insert_sql, [(word,) for word in unique_words])
    log.info("dictionary bootstrap complete")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Cache warm-up
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# URD edges joined with their parent and dimension tokens.
_URD_EDGES_SQL = """
    SELECT u.id, u.parent_id, u.dim_id, u.is_isa, u.confidence, u.source,
           p.token AS parent_token, d.token AS dim_token
    FROM urd u
    INNER JOIN soas p ON p.id = u.parent_id
    INNER JOIN soas d ON d.id = u.dim_id
    -- skip self-referential dimension root nodes
    WHERE NOT (u.id = u.parent_id AND u.id = u.dim_id)
"""

_PENDING_CONFLICTS_SQL = (
    "SELECT DISTINCT concept_id FROM resolution_queue WHERE status = 'pending'"
)


async def _refresh_urd_cache(conn) -> None:
    """Reload the URD edge indexes and the pending-conflict set from Postgres.

    ``conn`` is a connection acquired from the pool. Both queries finish
    before any cache structure is touched, and the clear-and-fill step below
    contains no ``await``. A failing query therefore leaves the previous
    cache contents in place, and other coroutines never observe a
    half-empty cache.
    """
    urd_rows = await conn.fetch(_URD_EDGES_SQL)
    pending = await conn.fetch(_PENDING_CONFLICTS_SQL)

    # Clearing first also keeps a repeated load from appending duplicates to
    # the per-concept / per-parent lists.
    cache.urd_by_concept.clear()
    cache.urd_by_concept_dim.clear()
    cache.urd_by_parent.clear()

    for r in urd_rows:
        edge = UrdEdge(
            concept_id=r["id"],
            parent_id=r["parent_id"],
            dim_id=r["dim_id"],
            is_isa=r["is_isa"],
            confidence=r["confidence"],
            source=r["source"],
            parent_token=r["parent_token"],
            dim_token=r["dim_token"],
        )
        cache.urd_by_concept.setdefault(r["id"], []).append(edge)
        cache.urd_by_concept_dim[(r["id"], r["dim_id"])] = edge
        cache.urd_by_parent.setdefault(r["parent_id"], []).append(edge)

    cache.pending_conflicts = {r["concept_id"] for r in pending}


async def warm_cache(pool: asyncpg.Pool) -> None:
    """Load all SOAS and URD rows into the in-memory cache.

    SOAS rows are written into ``cache.soas_by_token`` / ``cache.soas_by_id``
    (existing entries for the same token or id are overwritten). The URD
    indexes and ``cache.pending_conflicts`` are then rebuilt from scratch, so
    calling this more than once does not duplicate edges.
    """
    async with pool.acquire() as conn:
        soas_rows = await conn.fetch(
            "SELECT id, token, encounter_count, saliency, novelty, first_seen_context, last_seen FROM soas"
        )
        for r in soas_rows:
            row = SoasRow(
                id=r["id"],
                token=r["token"],
                encounter_count=r["encounter_count"],
                saliency=r["saliency"],
                novelty=r["novelty"],
                first_seen_context=r["first_seen_context"] or "",
                last_seen=r["last_seen"],
            )
            cache.soas_by_token[r["token"]] = row
            cache.soas_by_id[r["id"]] = r["token"]

        await _refresh_urd_cache(conn)

    log.info(
        "cache warm: %d soas, %d urd edges, %d pending conflicts",
        len(cache.soas_by_token),
        len(cache.urd_by_concept_dim),
        len(cache.pending_conflicts),
    )


async def reload_urd_cache(pool: asyncpg.Pool) -> None:
    """Rebuild only URD cache (called after nightly resolution job).

    The SOAS cache is left untouched. The old URD contents stay visible until
    the fresh rows have been fetched; if the fetch raises, the cache is left
    as it was.
    """
    async with pool.acquire() as conn:
        await _refresh_urd_cache(conn)
    log.info("urd cache reloaded")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SOAS upsert — id always comes from Postgres
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def get_or_create_soas(pool: asyncpg.Pool, token: str) -> SoasRow:
    """Return the cached SoasRow for ``token``, creating it if necessary.

    On a cache miss the token is upserted into Postgres (the id always comes
    from the database). The no-op ``DO UPDATE`` makes ``RETURNING`` yield the
    row even when it already existed. The resulting row is stored in both
    SOAS cache indexes before being returned.
    """
    cached = cache.soas_by_token.get(token)
    if cached is not None:
        return cached

    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "INSERT INTO soas (token) VALUES ($1) "
            "ON CONFLICT (token) DO UPDATE SET token = EXCLUDED.token "
            "RETURNING id, token, encounter_count, saliency, novelty, "
            "first_seen_context, last_seen",
            token,
        )

    soas_row = SoasRow(
        id=row["id"],
        token=row["token"],
        encounter_count=row["encounter_count"],
        saliency=row["saliency"],
        novelty=row["novelty"],
        first_seen_context=row["first_seen_context"] or "",
        # Carry last_seen over so rows created here match the ones
        # warm_cache() loads from the same table.
        last_seen=row["last_seen"],
    )
    cache.soas_by_token[token] = soas_row
    cache.soas_by_id[row["id"]] = token
    return soas_row
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Novel-word bootstrap
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Saliency for a word first seen that is not in the standard English dictionary.
# Kept BELOW saliency_read_threshold (default 0.5) so the concept doesn't
# surface in recollection until the LLM has confirmed it is meaningful.
# Once the LLM inserts a URD edge, saliency is raised to NOVEL_CONFIRMED_SALIENCY.
# Used by create_novel_soas() as the saliency of a newly inserted row.
NOVEL_INITIAL_SALIENCY: float = 0.1

# Saliency set after the LLM confirms a concept by generating a URD triple.
# Must be well above saliency_read_threshold so the concept becomes a
# recollection attractor on subsequent turns.
# NOTE(review): not referenced in the code visible here; presumably applied
# by the URD writer — confirm.
NOVEL_CONFIRMED_SALIENCY: float = 2.0
|
|
|
|
|
|
async def create_novel_soas(pool: asyncpg.Pool, token: str, context: str = "") -> SoasRow:
    """Upsert a domain-specific (non-dictionary) token and cache the result.

    A token that is not yet in ``soas`` is inserted with
    ``saliency=NOVEL_INITIAL_SALIENCY``, ``novelty=1.0`` (which distinguishes
    it from common-English seeds), ``encounter_count=1`` and
    ``first_seen_context=context`` — the sentence fragment where the word
    was first encountered.

    If the token already exists, the existing row is kept and returned;
    only ``encounter_count`` (+1) and ``last_seen`` (now()) are updated.
    Saliency, novelty and the stored context are not overwritten. The call
    is therefore safe to repeat, but each repeat counts as one more
    encounter.

    The returned row replaces any cached entry for ``token``.
    """
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            INSERT INTO soas (token, saliency, novelty, encounter_count, first_seen_context)
            VALUES ($1, $2, 1.0, 1, $3)
            ON CONFLICT (token) DO UPDATE
                SET encounter_count = soas.encounter_count + 1,
                    last_seen = now()
            RETURNING id, token, encounter_count, saliency, novelty,
                      first_seen_context, last_seen
            """,
            token, NOVEL_INITIAL_SALIENCY, context,
        )

    soas_row = SoasRow(
        id=row["id"],
        token=row["token"],
        encounter_count=row["encounter_count"],
        saliency=row["saliency"],
        novelty=row["novelty"],
        first_seen_context=row["first_seen_context"] or "",
        # Keep the cached timestamp in step with the value just written,
        # matching what warm_cache() loads.
        last_seen=row["last_seen"],
    )
    cache.soas_by_token[token] = soas_row
    cache.soas_by_id[row["id"]] = token
    return soas_row
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Graph reset
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _deleted_row_count(status: str) -> int:
    """Extract the row count from a command status string such as 'DELETE 42'."""
    try:
        return int(status.split()[-1])
    except (ValueError, IndexError):
        return 0


async def reset_graph(pool: asyncpg.Pool) -> dict:
    """Wipe all learned knowledge while keeping the common-English seed.

    Deletes, in one transaction: the write log, the resolution queue, every
    URD row, and every SOAS row with ``novelty > 0``. SOAS rows with
    ``novelty = 0`` (the dictionary seed) are kept.

    After the commit, the in-memory URD indexes and pending-conflict set are
    emptied and the deleted domain tokens are pruned from the SOAS cache, so
    the cache matches the wiped database without a full reload. Finally the
    seed dimensions are re-bootstrapped, because ``DELETE FROM urd`` also
    removed their self-referential root nodes.

    Returns a dict with the number of rows deleted per table.
    """
    async with pool.acquire() as conn:
        async with conn.transaction():
            # Delete order must respect FK constraints:
            #   kg_write_log.resolution_queue_id → resolution_queue(id)  [no cascade]
            #   kg_write_log.{concept,parent,dim}_id → soas(id)
            #   resolution_queue.{concept,existing_parent,incoming_parent,dim}_id → soas(id)
            #   urd.{id,parent_id,dim_id} → soas(id)
            kw = await conn.execute("DELETE FROM kg_write_log")
            rq = await conn.execute("DELETE FROM resolution_queue")
            urd = await conn.execute("DELETE FROM urd")
            # Keep only common-English seeds (novelty = 0); delete domain words
            soas = await conn.execute("DELETE FROM soas WHERE novelty > 0")

    counts = {
        "urd_deleted": _deleted_row_count(urd),
        "soas_deleted": _deleted_row_count(soas),
        "resolution_queue_deleted": _deleted_row_count(rq),
        "kg_write_log_deleted": _deleted_row_count(kw),
    }

    # Clear in-memory state
    cache.urd_by_concept.clear()
    cache.urd_by_concept_dim.clear()
    cache.urd_by_parent.clear()
    cache.pending_conflicts.clear()

    # Remove domain words from SOAS cache (keep novelty=0 entries). The token
    # list is materialised first so the dict is not mutated while iterating.
    domain_tokens = [t for t, r in cache.soas_by_token.items() if r.novelty > 0]
    for token in domain_tokens:
        removed = cache.soas_by_token.pop(token, None)
        if removed:
            cache.soas_by_id.pop(removed.id, None)

    # Restore the dimension root nodes wiped with the rest of URD. This runs
    # after the connection above is released, since bootstrap_dimensions()
    # acquires its own. It is idempotent, so a caller that also bootstraps
    # afterwards does no harm.
    await bootstrap_dimensions(pool)

    log.info("graph reset: %s", counts)
    return counts
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Saliency recalculation (log-scale)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def recalculate_saliency(encounter_count: int, is_common_english: bool) -> float:
    """Return the log-scaled saliency for a token.

    Common-English tokens, and tokens with a non-positive encounter count,
    always score ``0.0``. Any other token scores ``log1p(encounter_count)``,
    so saliency grows logarithmically with the number of encounters.
    """
    if not is_common_english and not encounter_count <= 0:
        return math.log1p(encounter_count)
    return 0.0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Batch saliency flush
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def flush_encounter_deltas(pool: asyncpg.Pool) -> None:
    """Flush staged encounter_count deltas to Postgres.

    For every drained ``(soas_id, delta)`` pair whose cached row has a
    non-zero saliency, ``encounter_count`` is increased by ``delta``,
    ``last_seen`` is set to now() and ``saliency`` is set to the cached
    saliency plus ``delta``.

    Rows that are not in the cache, or whose cached saliency is exactly 0
    (the common-English sentinel), are skipped entirely. Any other saliency
    value, including ``NOVEL_INITIAL_SALIENCY``, is incremented.

    All updates are written in one transaction. The cached rows are only
    updated after that transaction has committed, so a failed flush cannot
    leave the cache ahead of the database.

    NOTE(review): the deltas are drained from the cache before the write; if
    the transaction fails they are not re-staged here.
    """
    deltas = cache.drain_deltas()
    if not deltas:
        return

    # (cached row, delta, new saliency, soas id) for every row to be written.
    staged = []
    for soas_id, delta in deltas.items():
        token = cache.soas_by_id.get(soas_id, "")
        row = cache.soas_by_token.get(token)
        if not row or row.saliency == 0.0:
            # saliency=0 is the English-word sentinel — never increment.
            continue
        staged.append((row, delta, row.saliency + delta, soas_id))

    if staged:
        async with pool.acquire() as conn:
            async with conn.transaction():
                await conn.executemany(
                    """
                    UPDATE soas
                    SET encounter_count = encounter_count + $1,
                        last_seen       = now(),
                        saliency        = $2
                    WHERE id = $3
                    """,
                    [
                        (delta, new_saliency, soas_id)
                        for _row, delta, new_saliency, soas_id in staged
                    ],
                )

        # Committed: now it is safe to mirror the new values into the cache.
        flushed_at = datetime.now(timezone.utc)
        for row, _delta, new_saliency, _soas_id in staged:
            row.saliency = new_saliency
            row.last_seen = flushed_at

    log.debug("flushed %d saliency deltas", len(staged))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config helper
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def get_config(pool: asyncpg.Pool, key: str, default: str = "") -> str:
    """Return the ``value`` stored for ``key`` in the ``config`` table.

    ``default`` is returned when no row matches ``key``.
    """
    async with pool.acquire() as conn:
        record = await conn.fetchrow("SELECT value FROM config WHERE key = $1", key)
    if not record:
        return default
    return record["value"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Recollection log
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def log_recollection(
    pool: asyncpg.Pool,
    agent_name: str,
    salient_tokens: list[str],
    recollection_block: str,
    messages: list[dict],
) -> None:
    """Persist one recollection event as a row of ``recollection_log``.

    ``messages`` is serialised with ``json.dumps`` for the ``messages_json``
    column, and a falsy ``agent_name`` is stored as an empty string.
    Best-effort: any ``Exception`` raised while serialising or inserting is
    logged with its traceback and swallowed, so callers can fire and forget.
    """
    insert_sql = """
        INSERT INTO recollection_log
            (agent_name, salient_tokens, recollection_block, messages_json)
        VALUES ($1, $2, $3, $4)
    """
    try:
        async with pool.acquire() as conn:
            params = (
                agent_name or "",
                salient_tokens,
                recollection_block,
                json.dumps(messages),
            )
            await conn.execute(insert_sql, *params)
    except Exception:
        log.exception("recollection_log insert failed")
|