Files
agent0/plugins/festinger/festinger/db.py
T
2026-04-21 18:08:22 +02:00

404 lines
15 KiB
Python

"""
Database layer — asyncpg pool, schema init, cache warm-up, flush.
"""
from __future__ import annotations
import asyncio
import logging
import math
import os
from pathlib import Path
import asyncpg
from . import cache
from .cache import SoasRow, UrdEdge
log = logging.getLogger("festinger.db")

# Shared connection pool: created lazily by get_pool(), cleared by close_pool().
_pool: asyncpg.Pool | None = None

# <parent of this package directory>/db/schema.sql
SCHEMA_PATH = Path(__file__).parent.parent.joinpath("db", "schema.sql")

# Seed dimensions as (token, is_isa) pairs; only "type" carries is_isa=True.
SEED_DIMENSIONS = [("type", True)] + [
    (dimension, False)
    for dimension in ("membership", "runs-on", "tech", "owned-by", "geography")
]
async def get_pool(dsn: str) -> asyncpg.Pool:
    """Return the module-wide asyncpg pool, creating it on the first call.

    The pool is built with min_size=2 and max_size=10. ``dsn`` is only used
    when no pool exists yet; once created, the same pool is returned no matter
    which DSN later callers pass.
    """
    global _pool
    if _pool is not None:
        return _pool
    # NOTE(review): the None check and the assignment are separated by an
    # await, so two concurrent first calls could each create a pool;
    # presumably this is only called once at startup — confirm with callers.
    _pool = await asyncpg.create_pool(dsn, min_size=2, max_size=10)
    return _pool
async def close_pool() -> None:
    """Close the module-wide pool if one is open, then forget it.

    Safe to call when no pool exists. After closing, the next get_pool() call
    creates a new pool.
    """
    global _pool
    if not _pool:
        return
    await _pool.close()
    _pool = None
# ---------------------------------------------------------------------------
# Schema init
# ---------------------------------------------------------------------------
async def init_schema(pool: asyncpg.Pool) -> None:
    """Apply ``SCHEMA_PATH`` and then the in-code column migrations.

    The schema file is read before a connection is acquired. The schema SQL
    and each migration are then executed in order on one pooled connection.
    The migrations use ``ADD COLUMN IF NOT EXISTS`` and can be re-applied;
    whether schema.sql itself is re-runnable depends on that file's contents.
    """
    schema_sql = SCHEMA_PATH.read_text()
    column_migrations = (
        # base_url column for LM Studio / custom OpenAI-compatible endpoints
        "ALTER TABLE models ADD COLUMN IF NOT EXISTS base_url TEXT NOT NULL DEFAULT ''",
        # first_seen_context stores the sentence where a novel word first appeared
        "ALTER TABLE soas ADD COLUMN IF NOT EXISTS first_seen_context TEXT NOT NULL DEFAULT ''",
    )
    async with pool.acquire() as conn:
        for statement in (schema_sql, *column_migrations):
            await conn.execute(statement)
    log.info("schema applied")
# ---------------------------------------------------------------------------
# Bootstrap helpers
# ---------------------------------------------------------------------------
async def bootstrap_dimensions(pool: asyncpg.Pool) -> None:
    """Ensure every seed dimension has a SOAS row and a self-referential URD root.

    For each token in SEED_DIMENSIONS:
      1. upsert it into ``soas`` (new rows get saliency=0, novelty=0) and take
         its id;
      2. insert a ``urd`` row whose id, parent_id and dim_id all equal that id.
    Both statements tolerate rows that already exist (ON CONFLICT), so the
    function can be re-run.
    """
    upsert_dimension_sql = (
        "INSERT INTO soas (token, saliency, novelty) VALUES ($1, 0, 0) "
        "ON CONFLICT (token) DO UPDATE SET token = EXCLUDED.token "
        "RETURNING id, token, encounter_count, saliency, novelty"
    )
    insert_root_sql = (
        "INSERT INTO urd (id, parent_id, dim_id, is_isa, confidence, source) "
        "VALUES ($1, $1, $1, $2, 1.0, 'festinger') "
        "ON CONFLICT DO NOTHING"
    )
    async with pool.acquire() as conn:
        for token, _is_isa in SEED_DIMENSIONS:
            # The no-op DO UPDATE (rather than DO NOTHING) makes RETURNING
            # yield the id even when the token is already present.
            dimension = await conn.fetchrow(upsert_dimension_sql, token)
            # NOTE(review): the root is always written with is_isa=True; the
            # per-dimension flag in SEED_DIMENSIONS is not consulted here.
            await conn.execute(insert_root_sql, dimension["id"], True)
    log.info("seed dimensions bootstrapped")
async def bootstrap_english_dictionary(pool: asyncpg.Pool) -> None:
    """
    Bulk-load /usr/share/dict/words into SOAS at saliency=0, novelty=0.

    Only purely alphabetic entries of at least 5 characters (after stripping
    whitespace) are loaded. They are lower-cased and de-duplicated first.
    Tokens already present in SOAS are left untouched (ON CONFLICT DO
    NOTHING). If the dictionary file does not exist, a warning is logged and
    nothing is loaded.
    """
    dict_file = Path("/usr/share/dict/words")
    if not dict_file.exists():
        log.warning("no system dictionary found at %s — skipping bootstrap", dict_file)
        return
    # Decode as UTF-8 rather than the locale default. Any undecodable byte
    # becomes U+FFFD, which fails isalpha() below, so a stray non-UTF-8
    # entry is skipped instead of aborting the whole bootstrap.
    text = dict_file.read_text(encoding="utf-8", errors="replace")
    stripped = (line.strip() for line in text.splitlines())
    # Filter on the stripped form, then lower-case; the set de-duplicates.
    words = list({w.lower() for w in stripped if len(w) >= 5 and w.isalpha()})
    log.info("loading %d dictionary words into soas …", len(words))
    async with pool.acquire() as conn:
        # One executemany with ON CONFLICT DO NOTHING for speed
        await conn.executemany(
            "INSERT INTO soas (token, saliency, novelty) VALUES ($1, 0, 0) "
            "ON CONFLICT (token) DO NOTHING",
            [(w,) for w in words],
        )
    log.info("dictionary bootstrap complete")
# ---------------------------------------------------------------------------
# Cache warm-up
# ---------------------------------------------------------------------------
# URD edges joined to their parent and dimension tokens. The self-referential
# dimension root nodes (id = parent_id = dim_id) are skipped.
_URD_EDGES_QUERY = """
    SELECT u.id, u.parent_id, u.dim_id, u.is_isa, u.confidence, u.source,
           p.token AS parent_token, d.token AS dim_token
    FROM urd u
    INNER JOIN soas p ON p.id = u.parent_id
    INNER JOIN soas d ON d.id = u.dim_id
    WHERE NOT (u.id = u.parent_id AND u.id = u.dim_id)
"""

_PENDING_CONFLICTS_QUERY = (
    "SELECT DISTINCT concept_id FROM resolution_queue WHERE status = 'pending'"
)


async def _refresh_urd_cache(conn: asyncpg.Connection) -> None:
    """Replace the URD indexes and the pending-conflict set with fresh DB data.

    Both queries complete before any cache is touched. The indexes are then
    cleared and refilled with no ``await`` in between, so:
      - other coroutines on this event loop never see an empty or half-built
        URD cache;
      - a failed query leaves the previous cache intact;
      - repeated calls never append duplicate edges.
    """
    urd_rows = await conn.fetch(_URD_EDGES_QUERY)
    pending_rows = await conn.fetch(_PENDING_CONFLICTS_QUERY)

    cache.urd_by_concept.clear()
    cache.urd_by_concept_dim.clear()
    cache.urd_by_parent.clear()
    for r in urd_rows:
        edge = UrdEdge(
            concept_id=r["id"],
            parent_id=r["parent_id"],
            dim_id=r["dim_id"],
            is_isa=r["is_isa"],
            confidence=r["confidence"],
            source=r["source"],
            parent_token=r["parent_token"],
            dim_token=r["dim_token"],
        )
        cache.urd_by_concept.setdefault(r["id"], []).append(edge)
        cache.urd_by_concept_dim[(r["id"], r["dim_id"])] = edge
        cache.urd_by_parent.setdefault(r["parent_id"], []).append(edge)
    cache.pending_conflicts = {r["concept_id"] for r in pending_rows}


async def warm_cache(pool: asyncpg.Pool) -> None:
    """Load all SOAS and URD rows into the in-memory cache.

    Every SOAS row is added to (or overwrites its entry in)
    ``cache.soas_by_token`` and ``cache.soas_by_id``. The URD indexes and
    ``cache.pending_conflicts`` are then rebuilt from scratch, so calling this
    more than once does not duplicate edges.
    """
    async with pool.acquire() as conn:
        soas_rows = await conn.fetch(
            "SELECT id, token, encounter_count, saliency, novelty, first_seen_context FROM soas"
        )
        for r in soas_rows:
            row = SoasRow(
                id=r["id"],
                token=r["token"],
                encounter_count=r["encounter_count"],
                saliency=r["saliency"],
                novelty=r["novelty"],
                first_seen_context=r["first_seen_context"] or "",
            )
            cache.soas_by_token[r["token"]] = row
            cache.soas_by_id[r["id"]] = r["token"]
        await _refresh_urd_cache(conn)
    log.info(
        "cache warm: %d soas, %d urd edges, %d pending conflicts",
        len(cache.soas_by_token),
        len(cache.urd_by_concept_dim),
        len(cache.pending_conflicts),
    )


async def reload_urd_cache(pool: asyncpg.Pool) -> None:
    """Rebuild only the URD cache and pending-conflict set (called after the nightly resolution job).

    The previous cache stays in place until the new data has been fetched.
    """
    async with pool.acquire() as conn:
        await _refresh_urd_cache(conn)
    log.info("urd cache reloaded")
# ---------------------------------------------------------------------------
# SOAS upsert — id always comes from Postgres
# ---------------------------------------------------------------------------
async def get_or_create_soas(pool: asyncpg.Pool, token: str) -> SoasRow:
    """Return the SoasRow for ``token``, upserting it into Postgres on a cache miss.

    A cache hit returns the cached row without any database access. On a miss
    the token is inserted with column defaults. If the token already exists,
    the no-op DO UPDATE returns the existing row instead. The row is then
    stored in ``cache.soas_by_token`` and ``cache.soas_by_id``. The id always
    comes from Postgres.
    """
    if token in cache.soas_by_token:
        return cache.soas_by_token[token]

    async with pool.acquire() as conn:
        record = await conn.fetchrow(
            "INSERT INTO soas (token) VALUES ($1) "
            "ON CONFLICT (token) DO UPDATE SET token = EXCLUDED.token "
            "RETURNING id, token, encounter_count, saliency, novelty, first_seen_context",
            token,
        )

    fresh = SoasRow(
        id=record["id"],
        token=record["token"],
        encounter_count=record["encounter_count"],
        saliency=record["saliency"],
        novelty=record["novelty"],
        first_seen_context=record["first_seen_context"] or "",
    )
    cache.soas_by_token[token] = fresh
    cache.soas_by_id[record["id"]] = token
    return fresh
# ---------------------------------------------------------------------------
# Novel-word bootstrap
# ---------------------------------------------------------------------------
# Initial saliency for a token that is seen for the first time and is not in
# the standard English dictionary. It is intended to exceed the recollection
# engine's saliency_read_threshold (0.5 per the original note; configured
# outside this module), so the word is picked up on its very first encounter.
NOVEL_INITIAL_SALIENCY = 2.0


async def create_novel_soas(pool: asyncpg.Pool, token: str, context: str = "") -> SoasRow:
    """
    Upsert a domain-specific (non-dictionary) token and cache the resulting row.

    New tokens are inserted with saliency=NOVEL_INITIAL_SALIENCY,
    novelty=1.0 (dictionary seeds use novelty=0), encounter_count=1 and
    first_seen_context=``context``, the sentence fragment where the word was
    first encountered.

    Not idempotent: if the token already exists, its encounter_count is
    incremented and last_seen is set to now(). Its saliency, novelty and
    first_seen_context are left as they are. In both cases the returned row
    replaces the token's entry in the SOAS cache.
    """
    upsert_sql = """
        INSERT INTO soas (token, saliency, novelty, encounter_count, first_seen_context)
        VALUES ($1, $2, 1.0, 1, $3)
        ON CONFLICT (token) DO UPDATE
        SET encounter_count = soas.encounter_count + 1,
            last_seen = now()
        RETURNING id, token, encounter_count, saliency, novelty, first_seen_context
    """
    async with pool.acquire() as conn:
        record = await conn.fetchrow(upsert_sql, token, NOVEL_INITIAL_SALIENCY, context)

    novel = SoasRow(
        id=record["id"],
        token=record["token"],
        encounter_count=record["encounter_count"],
        saliency=record["saliency"],
        novelty=record["novelty"],
        first_seen_context=record["first_seen_context"] or "",
    )
    cache.soas_by_token[token] = novel
    cache.soas_by_id[record["id"]] = token
    return novel
# ---------------------------------------------------------------------------
# Graph reset
# ---------------------------------------------------------------------------
async def reset_graph(pool: asyncpg.Pool) -> dict:
    """
    Wipe all learned knowledge while keeping the dictionary seed intact.

    Deleted: URD edges, domain SOAS entries, the resolution queue and the
    write log. Kept: the standard-English dictionary seed (novelty=0).

    After the wipe, dimensions are re-bootstrapped and the URD cache is
    reloaded. ``DELETE FROM urd`` also removes the self-referential dimension
    root nodes, so they are recreated before this function returns.

    Returns a dict with the number of rows deleted from each table.
    """
    async with pool.acquire() as conn:
        async with conn.transaction():
            # Delete order must respect FK constraints:
            #   kg_write_log.resolution_queue_id → resolution_queue(id)  [no cascade]
            #   kg_write_log.{concept,parent,dim}_id → soas(id)
            #   resolution_queue.{concept,existing_parent,incoming_parent,dim}_id → soas(id)
            #   urd.{id,parent_id,dim_id} → soas(id)
            kw = await conn.execute("DELETE FROM kg_write_log")
            rq = await conn.execute("DELETE FROM resolution_queue")
            urd = await conn.execute("DELETE FROM urd")
            # Keep only common-English seeds (novelty = 0); delete domain words
            soas = await conn.execute("DELETE FROM soas WHERE novelty > 0")

    def _count(result: str) -> int:
        # asyncpg returns the command status tag, e.g. "DELETE 42".
        try:
            return int(result.split()[-1])
        except (ValueError, IndexError):
            return 0

    counts = {
        "urd_deleted": _count(urd),
        "soas_deleted": _count(soas),
        "resolution_queue_deleted": _count(rq),
        "kg_write_log_deleted": _count(kw),
    }

    # Clear in-memory state
    cache.urd_by_concept.clear()
    cache.urd_by_concept_dim.clear()
    cache.urd_by_parent.clear()
    cache.pending_conflicts.clear()
    # Remove domain words from the SOAS cache (keep novelty=0 entries). The
    # comprehension finishes before any pop, so the dict is not mutated mid-iteration.
    domain_tokens = [t for t, r in cache.soas_by_token.items() if r.novelty > 0]
    for t in domain_tokens:
        row = cache.soas_by_token.pop(t, None)
        if row:
            cache.soas_by_id.pop(row.id, None)

    # Restore the dimension root nodes removed with the rest of urd, then
    # rebuild the URD cache and pending-conflict set from the new database state.
    await bootstrap_dimensions(pool)
    await reload_urd_cache(pool)

    log.info("graph reset: %s", counts)
    return counts
# ---------------------------------------------------------------------------
# Saliency recalculation (log-scale)
# ---------------------------------------------------------------------------
def recalculate_saliency(encounter_count: int, is_common_english: bool) -> float:
    """Return the log-scaled saliency for a token.

    Common-English tokens, and tokens with a non-positive encounter count,
    score 0.0. All other tokens score ``log1p(encounter_count)``, so saliency
    grows logarithmically with encounters (1 -> ~0.693, 10 -> ~2.398).
    """
    suppressed = is_common_english or encounter_count <= 0
    return 0.0 if suppressed else math.log1p(encounter_count)
# ---------------------------------------------------------------------------
# Batch saliency flush
# ---------------------------------------------------------------------------
async def flush_encounter_deltas(pool: asyncpg.Pool) -> None:
    """Flush staged encounter_count deltas to Postgres in one batched UPDATE.

    The deltas drained from ``cache`` map soas id -> increment. Inside a
    single transaction, each delta is added to ``soas.encounter_count``,
    ``last_seen`` is stamped, and ``saliency`` is rewritten using
    recalculate_saliency().

    Cached SoasRow objects receive the new saliency only after the
    transaction commits. Ids with no cached row keep their stored saliency
    rather than having it reset to 0.0.
    """
    # NOTE(review): deltas are drained before the write; if the transaction
    # fails they are not re-staged, so those increments are lost.
    deltas = cache.drain_deltas()
    if not deltas:
        return

    params = []
    cache_updates = []
    for soas_id, delta in deltas.items():
        token = cache.soas_by_id.get(soas_id)
        row = cache.soas_by_token.get(token) if token is not None else None
        if row is None:
            # No cached row means no count to score from: pass NULL so
            # COALESCE keeps the stored saliency.
            params.append((delta, None, soas_id))
            continue
        # Saliency is computed from the cached count. Presumably the cache
        # has already applied the staged delta in memory — TODO confirm.
        # novelty = 0 and saliency = 0 marks a pre-seeded common-English word.
        is_common = row.novelty == 0.0 and row.saliency == 0.0
        new_saliency = recalculate_saliency(row.encounter_count, is_common)
        params.append((delta, new_saliency, soas_id))
        cache_updates.append((row, new_saliency))

    async with pool.acquire() as conn:
        async with conn.transaction():
            await conn.executemany(
                """
                UPDATE soas
                SET encounter_count = encounter_count + $1,
                    last_seen = now(),
                    saliency = COALESCE($2, saliency)
                WHERE id = $3
                """,
                params,
            )

    # The transaction committed; bring the cache in line with the database.
    for row, new_saliency in cache_updates:
        row.saliency = new_saliency
    log.debug("flushed %d saliency deltas", len(deltas))
# ---------------------------------------------------------------------------
# Config helper
# ---------------------------------------------------------------------------
async def get_config(pool: asyncpg.Pool, key: str, default: str = "") -> str:
    """Return the ``value`` column of the ``config`` row for ``key``.

    Returns ``default`` when no row matches the key.
    """
    async with pool.acquire() as conn:
        record = await conn.fetchrow("SELECT value FROM config WHERE key = $1", key)
    return default if record is None else record["value"]