Files
agent0/plugins/festinger/festinger/db.py
T

515 lines
20 KiB
Python

"""
Database layer — asyncpg pool, schema init, cache warm-up, flush.
"""
from __future__ import annotations
import asyncio
import json
import logging
import math
import os
from datetime import datetime, timezone
from pathlib import Path
import asyncpg
from . import cache
from .cache import SoasRow, UrdEdge
# Module logger; everything in this file logs under the "festinger.db" name.
log = logging.getLogger("festinger.db")
# Process-wide asyncpg pool. None until get_pool() creates it; close_pool()
# sets it back to None.
_pool: asyncpg.Pool | None = None
# Base schema DDL file, located at ../db/schema.sql relative to this module's
# directory. Read and executed by init_schema().
SCHEMA_PATH = Path(__file__).parent.parent / "db" / "schema.sql"
# Seed dimensions as (token, is_isa) pairs, iterated by bootstrap_dimensions().
SEED_DIMENSIONS = [
    ("type", True),
    ("membership", False),
    ("runs-on", False),
    ("tech", False),
    ("owned-by", False),
    ("geography", False),
]
async def get_pool(dsn: str) -> asyncpg.Pool:
    """
    Return the module-wide asyncpg pool, creating it on first use.

    The pool is created with min_size=2 and max_size=10. Once a pool exists,
    ``dsn`` is not consulted again: the existing pool is returned as-is.
    """
    global _pool
    if _pool is not None:
        return _pool
    _pool = await asyncpg.create_pool(dsn, min_size=2, max_size=10)
    return _pool
async def close_pool() -> None:
    """Close the module-wide pool, if one exists, and forget it."""
    global _pool
    if not _pool:
        # Nothing was ever opened (or it was already closed).
        return
    await _pool.close()
    # Reset only after a successful close so get_pool() can build a new pool.
    _pool = None
# ---------------------------------------------------------------------------
# Schema init
# ---------------------------------------------------------------------------
async def init_schema(pool: asyncpg.Pool) -> None:
    """
    Apply the base schema file, then the in-code migrations and seed rows.

    The migrations listed here are guarded with IF NOT EXISTS and the seed
    insert with ON CONFLICT DO NOTHING. Statements run one at a time, in the
    order written, on a single pooled connection; no explicit transaction is
    opened by this function.
    """
    base_schema_sql = SCHEMA_PATH.read_text()

    # Ordered migrations. Keep the order: e.g. the recollection_log index
    # must follow the statement that creates the table.
    migrations = (
        # base_url column for LM Studio / custom OpenAI-compatible endpoints
        "ALTER TABLE models ADD COLUMN IF NOT EXISTS base_url TEXT NOT NULL DEFAULT ''",
        # first_seen_context stores the sentence where a novel word first appeared
        "ALTER TABLE soas ADD COLUMN IF NOT EXISTS first_seen_context TEXT NOT NULL DEFAULT ''",
        # per-agent model assignments (agent_name → model_id)
        """
        CREATE TABLE IF NOT EXISTS agent_models (
            agent_name TEXT PRIMARY KEY,
            model_id INT NOT NULL REFERENCES models(id) ON DELETE CASCADE,
            created_at TIMESTAMPTZ NOT NULL DEFAULT now()
        )
        """,
        # recollection event log
        """
        CREATE TABLE IF NOT EXISTS recollection_log (
            id BIGSERIAL PRIMARY KEY,
            created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
            agent_name TEXT NOT NULL DEFAULT '',
            salient_tokens TEXT[] NOT NULL DEFAULT '{}',
            recollection_block TEXT NOT NULL,
            messages_json JSONB NOT NULL DEFAULT '[]'
        )
        """,
        "CREATE INDEX IF NOT EXISTS rl_created_idx ON recollection_log (created_at DESC)",
        # per-model context length (0 = don't inject num_ctx)
        "ALTER TABLE models ADD COLUMN IF NOT EXISTS ctx_length INT NOT NULL DEFAULT 0",
        # agent framework routing (agent_id → Agent Zero endpoint + key)
        """
        CREATE TABLE IF NOT EXISTS agent_frameworks (
            agent_id INTEGER PRIMARY KEY,
            endpoint_url TEXT NOT NULL,
            api_key TEXT NOT NULL DEFAULT '',
            label TEXT NOT NULL DEFAULT '',
            created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
            updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
        )
        """,
    )

    # Default agent frameworks as (agent_id, label, endpoint_url, api_key).
    # API keys are mcp_server_token values derived from each agent's fixed
    # A0_PERSISTENT_RUNTIME_ID + AUTH_LOGIN + AUTH_PASSWORD (see agents/<name>/.env).
    # Endpoint URLs use Docker container names on the internal network.
    # NOTE(review): these tokens are hard-coded in source; consider loading
    # them from the environment or a secret store instead.
    default_frameworks = [
        (1, "dobby", "http://dobby:80", "-d1yhCLT72cEFpiD"),
        (2, "gemma", "http://gemma:80", "71I61Jd54p9wy20P"),
        (3, "gunnar", "http://gunnar:80", "00oDLpLbWuS16IzE"),
        (5, "rind", "http://rind:80", "3GRS5iP91Y2qQNLr"),
        (6, "abyssinthia", "http://abyssinthia:80", "_XxQlg7qAxhmlyJh"),
        (8, "hermes", "http://hermes:80", "7giYJjjk38c7IsYj"),
    ]
    # Insert-or-ignore: a row that already exists (possibly edited by hand)
    # is never overwritten.
    seed_framework_sql = """
        INSERT INTO agent_frameworks (agent_id, label, endpoint_url, api_key)
        VALUES ($1, $2, $3, $4)
        ON CONFLICT (agent_id) DO NOTHING
        """

    async with pool.acquire() as conn:
        await conn.execute(base_schema_sql)
        for statement in migrations:
            await conn.execute(statement)
        for seed_row in default_frameworks:
            await conn.execute(seed_framework_sql, *seed_row)
    log.info("schema applied")
# ---------------------------------------------------------------------------
# Bootstrap helpers
# ---------------------------------------------------------------------------
async def bootstrap_dimensions(pool: asyncpg.Pool) -> None:
    """
    Make sure every seed dimension has a SOAS row and a URD root node.

    For each entry of SEED_DIMENSIONS the token is upserted into ``soas``
    (saliency=0, novelty=0 on insert) and a self-referential ``urd`` row
    (id = parent_id = dim_id) is inserted if it is not already there.
    """
    # The DO UPDATE is a no-op write whose only purpose is to make RETURNING
    # yield the row's id even when the token already exists.
    upsert_dimension_sql = (
        "INSERT INTO soas (token, saliency, novelty) VALUES ($1, 0, 0) "
        "ON CONFLICT (token) DO UPDATE SET token = EXCLUDED.token "
        "RETURNING id, token, encounter_count, saliency, novelty"
    )
    # $1 is bound three times: the root node is its own parent and dimension.
    insert_root_sql = (
        "INSERT INTO urd (id, parent_id, dim_id, is_isa, confidence, source) "
        "VALUES ($1, $1, $1, $2, 1.0, 'festinger') "
        "ON CONFLICT DO NOTHING"
    )
    async with pool.acquire() as conn:
        for dimension_token, _seed_is_isa in SEED_DIMENSIONS:
            soas_record = await conn.fetchrow(upsert_dimension_sql, dimension_token)
            root_id = soas_record["id"]
            # NOTE(review): the per-dimension is_isa flag from SEED_DIMENSIONS
            # is not used; every root node is written with is_isa=True.
            # Confirm whether that is intentional.
            await conn.execute(insert_root_sql, root_id, True)
    log.info("seed dimensions bootstrapped")
async def bootstrap_english_dictionary(pool: asyncpg.Pool) -> None:
    """
    Bulk-load /usr/share/dict/words into SOAS at saliency=0, novelty=0.

    Each line is stripped and kept only if it is at least 5 characters long
    and purely alphabetic; kept words are lower-cased and de-duplicated.
    Tokens already present in ``soas`` are left untouched (ON CONFLICT DO
    NOTHING). If the dictionary file does not exist, a warning is logged and
    nothing is loaded.
    """
    dict_file = Path("/usr/share/dict/words")
    if not dict_file.exists():
        log.warning("no system dictionary found at %s — skipping bootstrap", dict_file)
        return

    # Decode explicitly as UTF-8 instead of relying on the process locale.
    # Undecodable bytes become U+FFFD, which fails isalpha(), so a word that
    # cannot be decoded is skipped rather than aborting the whole bootstrap.
    raw_text = dict_file.read_text(encoding="utf-8", errors="replace")

    # Strip each line once, then filter and lower-case into a set (dedupe).
    candidates = (line.strip() for line in raw_text.splitlines())
    unique_words = {w.lower() for w in candidates if len(w) >= 5 and w.isalpha()}
    # Sorted so the insertion order is the same on every run.
    words = sorted(unique_words)

    log.info("loading %d dictionary words into soas …", len(words))
    async with pool.acquire() as conn:
        # Use executemany with ON CONFLICT DO NOTHING for speed
        await conn.executemany(
            "INSERT INTO soas (token, saliency, novelty) VALUES ($1, 0, 0) "
            "ON CONFLICT (token) DO NOTHING",
            [(w,) for w in words],
        )
    log.info("dictionary bootstrap complete")
# ---------------------------------------------------------------------------
# Cache warm-up
# ---------------------------------------------------------------------------
# URD edge query shared by warm_cache() and reload_urd_cache().
# The WHERE clause skips self-referential dimension root nodes
# (id = parent_id = dim_id).
_URD_EDGES_SQL = """
    SELECT u.id, u.parent_id, u.dim_id, u.is_isa, u.confidence, u.source,
           p.token AS parent_token, d.token AS dim_token
    FROM urd u
    INNER JOIN soas p ON p.id = u.parent_id
    INNER JOIN soas d ON d.id = u.dim_id
    WHERE NOT (u.id = u.parent_id AND u.id = u.dim_id)
"""
_PENDING_CONFLICTS_SQL = (
    "SELECT DISTINCT concept_id FROM resolution_queue WHERE status = 'pending'"
)


def _replace_urd_cache(urd_rows, pending_rows) -> None:
    """
    Replace the URD indexes and the pending-conflict set with fetched rows.

    Deliberately synchronous: there is no await between clearing and
    refilling, so no other task on the event loop can observe empty or
    half-built indexes. The three index dicts are mutated in place;
    ``cache.pending_conflicts`` is rebound to a new set.
    """
    cache.urd_by_concept.clear()
    cache.urd_by_concept_dim.clear()
    cache.urd_by_parent.clear()
    for r in urd_rows:
        edge = UrdEdge(
            concept_id=r["id"],
            parent_id=r["parent_id"],
            dim_id=r["dim_id"],
            is_isa=r["is_isa"],
            confidence=r["confidence"],
            source=r["source"],
            parent_token=r["parent_token"],
            dim_token=r["dim_token"],
        )
        cache.urd_by_concept.setdefault(r["id"], []).append(edge)
        cache.urd_by_concept_dim[(r["id"], r["dim_id"])] = edge
        cache.urd_by_parent.setdefault(r["parent_id"], []).append(edge)
    cache.pending_conflicts = {r["concept_id"] for r in pending_rows}


async def warm_cache(pool: asyncpg.Pool) -> None:
    """
    Load all SOAS and URD rows into the in-memory cache.

    SOAS entries are written by key, so existing entries for the same token
    or id are overwritten. The URD indexes and the pending-conflict set are
    replaced wholesale, which makes a repeated call safe: edges are not
    appended a second time to the per-concept / per-parent lists.
    """
    # Fetch everything first; the cache is only touched once all queries
    # have succeeded.
    async with pool.acquire() as conn:
        soas_rows = await conn.fetch(
            "SELECT id, token, encounter_count, saliency, novelty, first_seen_context, last_seen FROM soas"
        )
        urd_rows = await conn.fetch(_URD_EDGES_SQL)
        pending_rows = await conn.fetch(_PENDING_CONFLICTS_SQL)

    for r in soas_rows:
        row = SoasRow(
            id=r["id"],
            token=r["token"],
            encounter_count=r["encounter_count"],
            saliency=r["saliency"],
            novelty=r["novelty"],
            first_seen_context=r["first_seen_context"] or "",
            last_seen=r["last_seen"],
        )
        cache.soas_by_token[r["token"]] = row
        cache.soas_by_id[r["id"]] = r["token"]

    _replace_urd_cache(urd_rows, pending_rows)
    log.info(
        "cache warm: %d soas, %d urd edges, %d pending conflicts",
        len(cache.soas_by_token),
        len(cache.urd_by_concept_dim),
        len(cache.pending_conflicts),
    )


async def reload_urd_cache(pool: asyncpg.Pool) -> None:
    """
    Rebuild only the URD cache (called after nightly resolution job).

    The rows are fetched before the existing indexes are cleared, so a
    failing query leaves the current cache intact and concurrent readers
    never see an empty graph while the fetch is in flight.
    """
    async with pool.acquire() as conn:
        urd_rows = await conn.fetch(_URD_EDGES_SQL)
        pending_rows = await conn.fetch(_PENDING_CONFLICTS_SQL)
    _replace_urd_cache(urd_rows, pending_rows)
    log.info("urd cache reloaded")
# ---------------------------------------------------------------------------
# SOAS upsert — id always comes from Postgres
# ---------------------------------------------------------------------------
async def get_or_create_soas(pool: asyncpg.Pool, token: str) -> SoasRow:
    """
    Return the cached SoasRow for ``token``, creating it in Postgres if needed.

    On a cache miss the token is upserted into ``soas`` (the id always comes
    from Postgres), and the resulting row is stored in both cache indexes and
    returned. Column values other than ``token`` come from the table defaults
    for a new row, or from the existing row if the token was already in the
    database.
    """
    cached = cache.soas_by_token.get(token)
    if cached is not None:
        return cached

    async with pool.acquire() as conn:
        # The DO UPDATE is a no-op write so RETURNING also yields the row
        # when the token already exists.
        row = await conn.fetchrow(
            "INSERT INTO soas (token) VALUES ($1) "
            "ON CONFLICT (token) DO UPDATE SET token = EXCLUDED.token "
            "RETURNING id, token, encounter_count, saliency, novelty, "
            "first_seen_context, last_seen",
            token,
        )

    # Another task may have cached this token while we were awaiting
    # Postgres. Keep that object so all callers share one SoasRow.
    raced = cache.soas_by_token.get(token)
    if raced is not None:
        return raced

    soas_row = SoasRow(
        id=row["id"],
        token=row["token"],
        encounter_count=row["encounter_count"],
        saliency=row["saliency"],
        novelty=row["novelty"],
        first_seen_context=row["first_seen_context"] or "",
        # Populated here as in warm_cache(), so cached rows are uniform.
        last_seen=row["last_seen"],
    )
    cache.soas_by_token[token] = soas_row
    cache.soas_by_id[row["id"]] = token
    return soas_row
# ---------------------------------------------------------------------------
# Novel-word bootstrap
# ---------------------------------------------------------------------------
# Saliency for a word first seen that is not in the standard English dictionary.
# create_novel_soas() writes this value when it inserts a new row.
# Kept BELOW saliency_read_threshold (default 0.5) so the concept doesn't
# surface in recollection until the LLM has confirmed it is meaningful.
# Once the LLM inserts a URD edge, saliency is raised to NOVEL_CONFIRMED_SALIENCY.
NOVEL_INITIAL_SALIENCY = 0.1
# Saliency set after the LLM confirms a concept by generating a URD triple.
# Must be well above saliency_read_threshold so the concept becomes a
# recollection attractor on subsequent turns.
# NOTE(review): not referenced in the visible part of this module; presumably
# applied by the code that writes URD edges — confirm.
NOVEL_CONFIRMED_SALIENCY = 2.0
async def create_novel_soas(pool: asyncpg.Pool, token: str, context: str = "") -> SoasRow:
    """
    Upsert a domain-specific (non-dictionary) token and refresh its cache entry.

    New token: inserted with saliency=NOVEL_INITIAL_SALIENCY, novelty=1.0,
    encounter_count=1 and first_seen_context=``context`` (the sentence
    fragment where the word was first encountered). novelty=1.0 distinguishes
    these rows from the common-English seeds, which are inserted with
    novelty=0.

    Existing token: only encounter_count is incremented and last_seen is set
    to now(); saliency, novelty and first_seen_context keep their stored
    values. The call is therefore safe to repeat, but not strictly
    idempotent — every call counts as one more encounter.

    Returns the row as stored in Postgres after the upsert; the cache entry
    for ``token`` is replaced with it.
    """
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            INSERT INTO soas (token, saliency, novelty, encounter_count, first_seen_context)
            VALUES ($1, $2, 1.0, 1, $3)
            ON CONFLICT (token) DO UPDATE
                SET encounter_count = soas.encounter_count + 1,
                    last_seen = now()
            RETURNING id, token, encounter_count, saliency, novelty,
                      first_seen_context, last_seen
            """,
            token, NOVEL_INITIAL_SALIENCY, context,
        )
    soas_row = SoasRow(
        id=row["id"],
        token=row["token"],
        encounter_count=row["encounter_count"],
        saliency=row["saliency"],
        novelty=row["novelty"],
        first_seen_context=row["first_seen_context"] or "",
        # The upsert may have just bumped last_seen; mirror it in the cache.
        last_seen=row["last_seen"],
    )
    cache.soas_by_token[token] = soas_row
    cache.soas_by_id[row["id"]] = token
    return soas_row
# ---------------------------------------------------------------------------
# Graph reset
# ---------------------------------------------------------------------------
async def reset_graph(pool: asyncpg.Pool) -> dict:
    """
    Wipe all learned knowledge while keeping the standard-English seed.

    Deletes, in one transaction: the write log, the resolution queue, all URD
    edges, and every SOAS row with novelty > 0. The matching in-memory state
    (URD indexes, pending conflicts, cached SOAS rows with novelty > 0) is
    dropped afterwards.

    This function does not itself re-bootstrap dimensions or re-warm the
    cache; callers are expected to do that after the wipe.

    Returns a dict of deleted-row counts keyed ``urd_deleted``,
    ``soas_deleted``, ``resolution_queue_deleted``, ``kg_write_log_deleted``.
    """

    def _affected(status: str) -> int:
        # The status string ends with the affected-row count (e.g. "DELETE 42");
        # anything unparsable counts as 0.
        try:
            return int(status.split()[-1])
        except (ValueError, IndexError):
            return 0

    # Delete order must respect FK constraints:
    #   kg_write_log.resolution_queue_id → resolution_queue(id)  [no cascade]
    #   kg_write_log.{concept,parent,dim}_id → soas(id)
    #   resolution_queue.{concept,existing_parent,incoming_parent,dim}_id → soas(id)
    #   urd.{id,parent_id,dim_id} → soas(id)
    wipe_steps = (
        ("kg_write_log_deleted", "DELETE FROM kg_write_log"),
        ("resolution_queue_deleted", "DELETE FROM resolution_queue"),
        ("urd_deleted", "DELETE FROM urd"),
        # Keep only common-English seeds (novelty = 0); delete domain words
        ("soas_deleted", "DELETE FROM soas WHERE novelty > 0"),
    )
    statuses = {}
    async with pool.acquire() as conn:
        async with conn.transaction():
            for label, statement in wipe_steps:
                statuses[label] = await conn.execute(statement)

    # Reported key order is independent of the execution order above.
    report_order = (
        "urd_deleted",
        "soas_deleted",
        "resolution_queue_deleted",
        "kg_write_log_deleted",
    )
    counts = {label: _affected(statuses[label]) for label in report_order}

    # Clear in-memory graph state.
    for index in (cache.urd_by_concept, cache.urd_by_concept_dim, cache.urd_by_parent):
        index.clear()
    cache.pending_conflicts.clear()

    # Evict domain words from the SOAS cache (novelty=0 entries stay).
    evicted_tokens = [tok for tok, entry in cache.soas_by_token.items() if entry.novelty > 0]
    for tok in evicted_tokens:
        entry = cache.soas_by_token.pop(tok, None)
        if entry:
            cache.soas_by_id.pop(entry.id, None)

    log.info("graph reset: %s", counts)
    return counts
# ---------------------------------------------------------------------------
# Saliency recalculation (log-scale)
# ---------------------------------------------------------------------------
def recalculate_saliency(encounter_count: int, is_common_english: bool) -> float:
    """
    Return the log-scaled saliency for a token.

    Common-English words, and any token whose encounter count is zero or
    negative, score 0.0. Every other token scores ``log1p(encounter_count)``,
    so saliency grows logarithmically with the number of encounters.
    """
    counts_as_domain_token = not is_common_english and not encounter_count <= 0
    return math.log1p(encounter_count) if counts_as_domain_token else 0.0
# ---------------------------------------------------------------------------
# Batch saliency flush
# ---------------------------------------------------------------------------
async def flush_encounter_deltas(pool: asyncpg.Pool) -> None:
    """
    Flush staged encounter_count deltas to Postgres.

    For every drained ``(soas_id, delta)`` pair whose cached row has a
    non-zero saliency, the database row gets ``encounter_count += delta``,
    ``last_seen = now()`` and ``saliency = cached saliency + delta``.

    Rows are skipped when the id is no longer in the cache, or when the
    cached saliency is exactly 0 — saliency=0 is the sentinel for common
    English dictionary words and is never incremented.

    All updates are written in one transaction. The cached rows are only
    modified after that transaction has committed, so a failed flush does
    not leave the cache ahead of the database.

    NOTE(review): the deltas are drained before the write; if the write
    fails they are not re-staged here and that batch of encounters is lost.
    """
    deltas = cache.drain_deltas()
    if not deltas:
        return

    # Decide what to write before touching the database.
    staged = []  # (cached row, soas_id, delta, new_saliency)
    for soas_id, delta in deltas.items():
        token = cache.soas_by_id.get(soas_id, "")
        row = cache.soas_by_token.get(token)
        if not row or row.saliency == 0.0:
            # saliency=0 is the English-word sentinel — never increment.
            continue
        staged.append((row, soas_id, delta, row.saliency + delta))

    if staged:
        async with pool.acquire() as conn:
            async with conn.transaction():
                await conn.executemany(
                    """
                    UPDATE soas
                    SET encounter_count = encounter_count + $1,
                        last_seen = now(),
                        saliency = $2
                    WHERE id = $3
                    """,
                    [
                        (delta, new_saliency, soas_id)
                        for _row, soas_id, delta, new_saliency in staged
                    ],
                )

        # Committed: now bring the cache in line with what was written.
        flushed_at = datetime.now(timezone.utc)
        for row, _soas_id, _delta, new_saliency in staged:
            row.saliency = new_saliency
            row.last_seen = flushed_at

    # Count only rows actually written, not skipped ones.
    log.debug("flushed %d saliency deltas", len(staged))
# ---------------------------------------------------------------------------
# Config helper
# ---------------------------------------------------------------------------
async def get_config(pool: asyncpg.Pool, key: str, default: str = "") -> str:
    """
    Look up ``key`` in the ``config`` table.

    Returns the stored ``value`` column when a row for ``key`` exists,
    otherwise ``default``.
    """
    async with pool.acquire() as conn:
        record = await conn.fetchrow("SELECT value FROM config WHERE key = $1", key)
    if not record:
        return default
    return record["value"]
# ---------------------------------------------------------------------------
# Recollection log
# ---------------------------------------------------------------------------
async def log_recollection(
    pool: asyncpg.Pool,
    agent_name: str,
    salient_tokens: list[str],
    recollection_block: str,
    messages: list[dict],
) -> None:
    """
    Persist one recollection event to ``recollection_log``.

    Best-effort: any exception raised while serialising or inserting is
    logged with its traceback and swallowed, so this call never raises.
    A falsy ``agent_name`` is stored as the empty string; ``messages`` is
    stored as its JSON serialisation.
    """
    insert_sql = """
        INSERT INTO recollection_log
            (agent_name, salient_tokens, recollection_block, messages_json)
        VALUES ($1, $2, $3, $4)
        """
    try:
        async with pool.acquire() as conn:
            params = (
                agent_name or "",
                salient_tokens,
                recollection_block,
                json.dumps(messages),
            )
            await conn.execute(insert_sql, *params)
    except Exception:
        log.exception("recollection_log insert failed")