View of rewrites

This commit is contained in:
2026-04-20 16:16:46 +02:00
parent 8ff73d32ae
commit 2f8880d785
4 changed files with 233 additions and 1 deletions
+30
View File
@@ -81,3 +81,33 @@ CREATE TABLE IF NOT EXISTS resolution_queue (
CREATE INDEX IF NOT EXISTS rq_status_idx ON resolution_queue (status);
CREATE INDEX IF NOT EXISTS rq_concept_idx ON resolution_queue (concept_id);
-- ---------------------------------------------------------------------------
-- kg_write_log — immutable audit log of every knowledge graph write
-- ---------------------------------------------------------------------------
-- op values:
-- 'insert' — new URD edge written for the first time
-- 'rewrite' — existing edge replaced (ispart_ispart update)
-- 'decompose' — isa_isa collision resolved by splitting into two dimensions
-- 'reclassify' — concept re-inserted in the correct dimension
CREATE TABLE IF NOT EXISTS kg_write_log (
id SERIAL PRIMARY KEY,
op VARCHAR(16) NOT NULL,
concept_id INT NOT NULL REFERENCES soas(id),
concept_token TEXT NOT NULL,
parent_id INT NOT NULL REFERENCES soas(id),
parent_token TEXT NOT NULL,
prev_parent_id INT REFERENCES soas(id),
prev_parent_token TEXT,
dim_id INT NOT NULL REFERENCES soas(id),
dim_token TEXT NOT NULL,
is_isa BOOLEAN NOT NULL DEFAULT false,
confidence FLOAT NOT NULL DEFAULT 1.0,
source VARCHAR(32) NOT NULL DEFAULT 'cloud_llm',
resolution_queue_id INT REFERENCES resolution_queue(id),
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS kwl_created_idx ON kg_write_log (created_at DESC);
CREATE INDEX IF NOT EXISTS kwl_concept_idx ON kg_write_log (concept_id);
CREATE INDEX IF NOT EXISTS kwl_op_idx ON kg_write_log (op);
+132
View File
@@ -611,6 +611,49 @@ async def conflicts(request: Request) -> dict:
return {"conflicts": [format_row(r) for r in rows]}
@app.get("/kg-log")
async def kg_log(request: Request, limit: int = 100, offset: int = 0, op: str = "") -> dict:
"""Return recent knowledge graph write log entries, newest first."""
pool = request.app.state.pool
query = """
SELECT id, op, concept_token, parent_token, prev_parent_token,
dim_token, is_isa, confidence, source, resolution_queue_id, created_at
FROM kg_write_log
{where}
ORDER BY created_at DESC
LIMIT $1 OFFSET $2
"""
count_query = "SELECT COUNT(*) FROM kg_write_log {where}"
if op:
where = "WHERE op = $3"
async with pool.acquire() as conn:
rows = await conn.fetch(query.format(where=where), limit, offset, op)
total = await conn.fetchval(count_query.format(where=where), op)
else:
where = ""
async with pool.acquire() as conn:
rows = await conn.fetch(query.format(where=where), limit, offset)
total = await conn.fetchval(count_query.format(where=where))
def fmt(r):
return {
"id": r["id"],
"op": r["op"],
"concept": r["concept_token"],
"parent": r["parent_token"],
"prev_parent": r["prev_parent_token"],
"dimension": r["dim_token"],
"is_isa": r["is_isa"],
"confidence": round(r["confidence"], 3),
"source": r["source"],
"resolution_queue_id": r["resolution_queue_id"],
"created_at": r["created_at"].isoformat() if r["created_at"] else None,
}
return {"total": total, "offset": offset, "limit": limit, "entries": [fmt(r) for r in rows]}
# ---------------------------------------------------------------------------
# /test — scenario seeding and reset (for integration testing)
# ---------------------------------------------------------------------------
@@ -679,6 +722,17 @@ ADMIN_HTML = """<!DOCTYPE html>
pre {{ background: #f4f4f4; border: 1px solid #e0e0e0; border-radius: 3px; padding: 1em; overflow: auto; font-size: 0.85em; max-height: 400px; }}
.status-ok {{ color: #2a7a2a; }}
.status-err {{ color: #b00; }}
table {{ width: 100%; border-collapse: collapse; font-size: 0.82em; }}
th {{ text-align: left; border-bottom: 2px solid #ddd; padding: 4px 8px; font-size: 0.75em; text-transform: uppercase; letter-spacing: 0.04em; color: #666; }}
td {{ border-bottom: 1px solid #f0f0f0; padding: 4px 8px; vertical-align: top; }}
tr:hover td {{ background: #fafafa; }}
.op-insert {{ color: #2a7a2a; font-weight: bold; }}
.op-rewrite {{ color: #c07000; font-weight: bold; }}
.op-decompose {{ color: #5050cc; font-weight: bold; }}
.op-reclassify {{ color: #c02060; font-weight: bold; }}
.log-controls {{ display: flex; gap: 1em; align-items: center; margin: 0.5em 0 1em; }}
.log-controls select, .log-controls button {{ font-family: monospace; padding: 4px 10px; }}
.log-nav {{ margin-top: 0.5em; display: flex; gap: 1em; align-items: center; font-size: 0.85em; }}
footer {{ margin-top: 3em; padding-top: 1em; border-top: 1px solid #ddd; font-size: 0.78em; color: #888; }}
footer a {{ color: #888; }}
</style>
@@ -705,6 +759,26 @@ ADMIN_HTML = """<!DOCTYPE html>
<h2>Pending conflicts</h2>
<pre id="conflicts-pre">Loading…</pre>
<h2>Knowledge graph write log</h2>
<div class="log-controls">
<label>Op filter:
<select id="log-op-filter" onchange="loadLog(0)">
<option value="">all</option>
<option value="insert">insert</option>
<option value="rewrite">rewrite</option>
<option value="decompose">decompose</option>
<option value="reclassify">reclassify</option>
</select>
</label>
<button onclick="loadLog(0)">Refresh</button>
</div>
<div id="log-table-wrap">Loading…</div>
<div class="log-nav" id="log-nav" style="display:none">
<button id="log-prev" onclick="logPage(-1)" disabled>← prev</button>
<span id="log-page-info"></span>
<button id="log-next" onclick="logPage(1)">next →</button>
</div>
<footer>
<strong>Vocabulary source:</strong>
Princeton University &ldquo;About WordNet.&rdquo; <em>WordNet.</em> Princeton University. 2010.
@@ -772,8 +846,66 @@ ADMIN_HTML = """<!DOCTYPE html>
}}
}}
const LOG_PAGE_SIZE = 50;
let logOffset = 0;
let logTotal = 0;
async function loadLog(offset) {{
logOffset = offset;
const op = document.getElementById('log-op-filter').value;
const params = new URLSearchParams({{limit: LOG_PAGE_SIZE, offset: logOffset}});
if (op) params.set('op', op);
const r = await fetch('/kg-log?' + params);
const d = await r.json();
logTotal = d.total;
const opClass = {{insert:'op-insert', rewrite:'op-rewrite', decompose:'op-decompose', reclassify:'op-reclassify'}};
if (!d.entries.length) {{
document.getElementById('log-table-wrap').innerHTML = '<em>No entries yet.</em>';
document.getElementById('log-nav').style.display = 'none';
return;
}}
let html = '<table><thead><tr>'
+ '<th>#</th><th>Time</th><th>Op</th><th>Concept</th>'
+ '<th>Dimension</th><th>Parent</th><th>Prev parent</th>'
+ '<th>is-a</th><th>Conf</th><th>Source</th>'
+ '</tr></thead><tbody>';
for (const e of d.entries) {{
const ts = e.created_at ? e.created_at.replace('T',' ').slice(0,19) : '';
const cls = opClass[e.op] || '';
html += `<tr>
<td>${{e.id}}</td>
<td>${{ts}}</td>
<td><span class="${{cls}}">${{e.op}}</span></td>
<td>${{e.concept}}</td>
<td>${{e.dimension}}</td>
<td>${{e.parent}}</td>
<td>${{e.prev_parent || ''}}</td>
<td>${{e.is_isa ? 'yes' : ''}}</td>
<td>${{e.confidence}}</td>
<td>${{e.source}}</td>
</tr>`;
}}
html += '</tbody></table>';
document.getElementById('log-table-wrap').innerHTML = html;
const nav = document.getElementById('log-nav');
nav.style.display = 'flex';
document.getElementById('log-prev').disabled = logOffset === 0;
document.getElementById('log-next').disabled = logOffset + LOG_PAGE_SIZE >= logTotal;
document.getElementById('log-page-info').textContent =
`${{logOffset + 1}}${{Math.min(logOffset + LOG_PAGE_SIZE, logTotal)}} of ${{logTotal}}`;
}}
function logPage(dir) {{
loadLog(Math.max(0, logOffset + dir * LOG_PAGE_SIZE));
}}
loadStats();
loadConflicts();
loadLog(0);
</script>
</body>
</html>
+30 -1
View File
@@ -24,7 +24,7 @@ from .llm_client import (
RESOLVE_ISA_ISA_PROMPT, RESOLVE_ISPART_ISPART_PROMPT,
RESOLVE_MISCLASS_PROMPT, parse_resolution_decision,
)
from .urd_writer import InsertRequest, insert_urd_edge
from .urd_writer import InsertRequest, insert_urd_edge, log_kg_write
log = logging.getLogger("festinger.resolution")
@@ -176,6 +176,17 @@ async def _apply_decompose(pool, item, decision, concept_token, existing_parent,
source="festinger",
)
await insert_urd_edge(pool, req)
# Override the 'insert' op written by insert_urd_edge with 'decompose'
# by appending a dedicated decompose log row that captures the full context.
await log_kg_write(
pool, "decompose", req,
concept_token=cache.soas_by_id.get(item["concept_id"], str(item["concept_id"])),
parent_token=cache.soas_by_id.get(parent_row.id, parent_row.token if hasattr(parent_row, "token") else str(parent_row.id)),
dim_token=dim_row.token if hasattr(dim_row, "token") else str(dim_row.id),
prev_parent_id=item["existing_parent_id"],
prev_parent_token=cache.soas_by_id.get(item["existing_parent_id"], str(item["existing_parent_id"])),
resolution_queue_id=item["id"],
)
log.info("decompose applied: %s → [%s, %s]", item["concept_id"], existing_dim_token, new_dim_token)
@@ -196,6 +207,15 @@ async def _apply_update(pool, item):
source="festinger",
)
await insert_urd_edge(pool, req)
await log_kg_write(
pool, "rewrite", req,
concept_token=cache.soas_by_id.get(item["concept_id"], str(item["concept_id"])),
parent_token=cache.soas_by_id.get(item["incoming_parent_id"], str(item["incoming_parent_id"])),
dim_token=cache.soas_by_id.get(item["dim_id"], str(item["dim_id"])),
prev_parent_id=item["existing_parent_id"],
prev_parent_token=cache.soas_by_id.get(item["existing_parent_id"], str(item["existing_parent_id"])),
resolution_queue_id=item["id"],
)
log.info("update applied: concept=%d dim=%d → parent=%d",
item["concept_id"], item["dim_id"], item["incoming_parent_id"])
@@ -213,6 +233,15 @@ async def _apply_reclassify(pool, item, decision):
source="festinger",
)
await insert_urd_edge(pool, req)
await log_kg_write(
pool, "reclassify", req,
concept_token=cache.soas_by_id.get(item["concept_id"], str(item["concept_id"])),
parent_token=cache.soas_by_id.get(item["incoming_parent_id"], str(item["incoming_parent_id"])),
dim_token=correct_dim_token,
prev_parent_id=item["existing_parent_id"],
prev_parent_token=cache.soas_by_id.get(item["existing_parent_id"], str(item["existing_parent_id"])),
resolution_queue_id=item["id"],
)
log.info("reclassify applied: concept=%d → dim=%s", item["concept_id"], correct_dim_token)
+41
View File
@@ -127,9 +127,50 @@ async def insert_urd_edge(
"urd insert concept=%d parent=%d dim=%d is_isa=%s source=%s",
req.concept_id, req.parent_id, req.dim_id, req.is_isa, req.source,
)
await log_kg_write(
pool, "insert", req,
concept_token=cache.soas_by_id.get(req.concept_id, str(req.concept_id)),
parent_token=parent_token,
dim_token=dim_token,
)
return None
async def log_kg_write(
pool: asyncpg.Pool,
op: str,
req: InsertRequest,
*,
concept_token: str,
parent_token: str,
dim_token: str,
prev_parent_id: Optional[int] = None,
prev_parent_token: Optional[str] = None,
resolution_queue_id: Optional[int] = None,
) -> None:
"""Append one row to kg_write_log. Fire-and-forget — never raises."""
try:
async with pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO kg_write_log
(op, concept_id, concept_token, parent_id, parent_token,
prev_parent_id, prev_parent_token, dim_id, dim_token,
is_isa, confidence, source, resolution_queue_id)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13)
""",
op,
req.concept_id, concept_token,
req.parent_id, parent_token,
prev_parent_id, prev_parent_token,
req.dim_id, dim_token,
req.is_isa, req.confidence, req.source,
resolution_queue_id,
)
except Exception:
log.exception("kg_write_log insert failed (op=%s concept=%d)", op, req.concept_id)
async def _queue_collision(
pool: asyncpg.Pool,
col: CollisionInfo,