Files
agent0/plugins/festinger/festinger/resolution_job.py
T
2026-04-20 16:16:46 +02:00

260 lines
9.9 KiB
Python

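"""
Nightly resolution job — drain the resolution queue using a cloud LLM.

For each pending conflict the LLM chooses one of:
- ISA+ISA → decompose (split into two new dimensions) or dismiss
- ISPART+ISPART → update (swap to the incoming parent) or dismiss
- misclassification → reclassify (insert the incoming fact in the correct
  dimension) or dismiss
A decompose/update proposed for the wrong conflict type, or any unrecognized
decision, is recorded as a dismissal.

Triggered by APScheduler using the cron schedule stored in the config table,
or manually via POST /resolve/run.
"""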
from __future__ import annotations
import json
import logging
from datetime import datetime, timezone
import asyncpg
from . import cache
from .db import get_or_create_soas, get_config, reload_urd_cache
from .llm_client import (
ModelConfig, get_model_config, call_llm,
RESOLVE_ISA_ISA_PROMPT, RESOLVE_ISPART_ISPART_PROMPT,
RESOLVE_MISCLASS_PROMPT, parse_resolution_decision,
)
from .urd_writer import InsertRequest, insert_urd_edge, log_kg_write
# Completion time (UTC) of the most recent run_resolution_job() call that got
# past its abort checks and finished its batch; None until that happens in
# this process.
_last_run: datetime | None = None

# Logger shared by every function in this module.
log = logging.getLogger("festinger.resolution")
async def run_resolution_job(pool: asyncpg.Pool) -> dict:
    """Drain every ``pending`` row of ``resolution_queue`` through the resolve LLM.

    Rows are handled one at a time, highest ``priority`` first and oldest
    ``created_at`` first within a priority. After the batch, the URD cache is
    reloaded once and the module's last-run timestamp is updated.

    A row whose processing raises is logged and counted as ``"error"``. It is
    not marked here, so it stays ``pending`` and is selected again next run.
    NOTE(review): nothing in this function serializes the cron trigger and
    the manual POST /resolve/run trigger; overlapping runs could select the
    same pending rows.

    Returns:
        ``{"status": "aborted", "reason": ...}`` if ``resolve_model_id`` is
        unset or not found. Otherwise ``{"status": "ok", "counts": {...},
        "processed": <rows fetched>}``, where ``counts`` tallies each row's
        outcome.
    """
    global _last_run
    log.info("resolution job starting")

    model_id = await get_config(pool, "resolve_model_id")
    if not model_id:
        log.warning("no resolve_model_id configured — resolution job aborted")
        return {"status": "aborted", "reason": "no resolve_model_id"}

    model = await get_model_config(pool, model_id)
    if not model:
        log.warning("resolve_model_id=%s not found in models table", model_id)
        return {"status": "aborted", "reason": "model not found"}

    query = """
        SELECT id, concept_id, existing_parent_id, incoming_parent_id,
               dim_id, collision_type
        FROM resolution_queue
        WHERE status = 'pending'
        ORDER BY priority DESC, created_at ASC
    """
    async with pool.acquire() as conn:
        pending = await conn.fetch(query)

    tally = dict.fromkeys(("decompose", "update", "dismiss", "reclassify", "error"), 0)
    for row in pending:
        try:
            result = await _resolve_item(pool, model, row)
        except Exception as exc:
            log.exception("resolution error for queue item %d: %s", row["id"], exc)
            tally["error"] += 1
        else:
            tally[result] = tally.get(result, 0) + 1

    # One cache refresh for the whole batch rather than per item.
    await reload_urd_cache(pool)
    _last_run = datetime.now(tz=timezone.utc)
    log.info("resolution job complete: %s", tally)
    return {"status": "ok", "counts": tally, "processed": len(pending)}
async def _resolve_item(pool: asyncpg.Pool, model: ModelConfig, item) -> str:
    """Ask the resolve LLM about one queued conflict and apply its decision.

    Args:
        pool: asyncpg connection pool.
        model: model configuration passed through to ``call_llm``.
        item: ``resolution_queue`` row with ``id``, ``concept_id``,
            ``existing_parent_id``, ``incoming_parent_id``, ``dim_id`` and
            ``collision_type``.

    Returns:
        The outcome actually applied and recorded: ``"decompose"``,
        ``"update"``, ``"reclassify"``, ``"dismiss"``, or ``"error"`` when the
        LLM reply could not be parsed (the row is then marked ``error``).

    The row is not marked if an ``_apply_*`` helper raises; the exception
    propagates to the caller.

    A decompose/update that does not match the collision type, or any
    unrecognized decision, is downgraded to ``dismiss``. The stored
    resolution then also carries the LLM's original ``"proposed"`` decision,
    so the audit record does not pair the model's reasoning with an action it
    never chose.
    """
    concept_token = cache.soas_by_id.get(item["concept_id"], str(item["concept_id"]))
    # "?" tells the LLM the parent name is unknown (cache miss).
    existing_parent = cache.soas_by_id.get(item["existing_parent_id"], "?")
    incoming_parent = cache.soas_by_id.get(item["incoming_parent_id"], "?")
    dim_token = cache.soas_by_id.get(item["dim_id"], "?")
    collision_type = item["collision_type"]

    prompt = _build_resolution_prompt(
        collision_type, concept_token, existing_parent, incoming_parent, dim_token,
    )
    response = await call_llm(model, prompt)
    decision = parse_resolution_decision(response)
    if not decision:
        await _mark_resolved(pool, item["id"], "error", {"raw_response": response[:500]})
        return "error"

    outcome = decision.decision
    if outcome == "decompose" and collision_type == "isa_isa":
        await _apply_decompose(pool, item, decision, concept_token, existing_parent, incoming_parent)
    elif outcome == "update" and collision_type == "ispart_ispart":
        await _apply_update(pool, item)
    elif outcome == "reclassify":
        await _apply_reclassify(pool, item, decision)
    else:
        outcome = "dismiss"

    resolution = {"decision": outcome, "reasoning": decision.reasoning}
    if outcome != decision.decision:
        # Keep what the model actually proposed when we overrode it.
        resolution["proposed"] = decision.decision
    await _mark_resolved(pool, item["id"], "resolved", resolution)
    return outcome


def _build_resolution_prompt(collision_type, concept, existing_parent, incoming_parent, dimension) -> str:
    """Format the resolve prompt for *collision_type*.

    ``isa_isa`` and ``ispart_ispart`` get their dedicated prompts; every other
    collision type is treated as a misclassification. Only the fixed seed
    dimensions are offered to the LLM as known dimension names.
    """
    known_dimensions = ", ".join(
        ("type", "membership", "runs-on", "tech", "owned-by", "geography")
    )
    if collision_type == "isa_isa":
        return RESOLVE_ISA_ISA_PROMPT.format(
            concept=concept,
            existing_parent=existing_parent,
            incoming_parent=incoming_parent,
            dimension=dimension,
            known_dimensions=known_dimensions,
        )
    if collision_type == "ispart_ispart":
        return RESOLVE_ISPART_ISPART_PROMPT.format(
            concept=concept,
            existing_parent=existing_parent,
            incoming_parent=incoming_parent,
            dimension=dimension,
        )
    return RESOLVE_MISCLASS_PROMPT.format(
        concept=concept,
        existing_parent=existing_parent,
        incoming_parent=incoming_parent,
        dimension=dimension,
        known_dimensions=known_dimensions,
    )
async def _apply_decompose(pool, item, decision, concept_token, existing_parent, incoming_parent):
    """Split an ISA/ISA conflict across two dimensions.

    The concept's edge in ``item["dim_id"]`` is deleted. The existing parent
    is then re-attached under ``decision.existing_dimension`` and the
    incoming parent under ``decision.new_dimension``. When the LLM omitted a
    name, these fall back to ``"type-a"`` / ``"type-b"``. Each dimension gets
    a self-referencing root row if it has none. Each new edge is written via
    ``insert_urd_edge`` plus a ``decompose`` KG-log entry.

    Args:
        pool: asyncpg connection pool.
        item: ``resolution_queue`` row (``id``, ``concept_id``,
            ``existing_parent_id``, ``incoming_parent_id``, ``dim_id``).
        decision: parsed LLM decision carrying the two dimension names.
        concept_token: display token of the concept, used in the KG log.
        existing_parent, incoming_parent: caller-side display tokens. They are
            "?" on a cache miss, so they are deliberately NOT used to look up
            or create parent nodes; parents are taken by id from ``item``.

    Raises:
        ValueError: if both dimension names resolve to the same token. Putting
            both parents in one dimension would recreate the collision, so
            this is raised before anything is written.
    """
    existing_dim_token = decision.existing_dimension or "type-a"
    new_dim_token = decision.new_dimension or "type-b"
    if existing_dim_token == new_dim_token:
        raise ValueError(
            f"decompose for queue item {item['id']} named the same dimension "
            f"twice: {existing_dim_token!r}"
        )

    existing_dim_row = await get_or_create_soas(pool, existing_dim_token)
    new_dim_row = await get_or_create_soas(pool, new_dim_token)

    async with pool.acquire() as conn:
        # Root node for each dimension: a row whose id, parent and dimension
        # are all the dimension itself.
        for dim_row in (existing_dim_row, new_dim_row):
            await conn.execute(
                "INSERT INTO urd (id, parent_id, dim_id, is_isa, confidence, source) "
                "VALUES ($1, $1, $1, true, 1.0, 'festinger') ON CONFLICT DO NOTHING",
                dim_row.id,
            )
        # Remove the concept's edge from the dimension where the conflict arose.
        await conn.execute(
            "DELETE FROM urd WHERE id=$1 AND dim_id=$2",
            item["concept_id"], item["dim_id"],
        )

    prev_parent_id = item["existing_parent_id"]
    prev_parent_token = cache.soas_by_id.get(prev_parent_id, str(prev_parent_id))
    for parent_id, dim_row, dim_token in (
        (item["existing_parent_id"], existing_dim_row, existing_dim_token),
        (item["incoming_parent_id"], new_dim_row, new_dim_token),
    ):
        req = InsertRequest(
            concept_id=item["concept_id"],
            parent_id=parent_id,
            dim_id=dim_row.id,
            is_isa=True,
            confidence=0.9,
            source="festinger",
        )
        await insert_urd_edge(pool, req)
        # insert_urd_edge records a plain 'insert' op; append a dedicated
        # 'decompose' row that captures the full resolution context.
        await log_kg_write(
            pool, "decompose", req,
            concept_token=concept_token,
            parent_token=cache.soas_by_id.get(parent_id, str(parent_id)),
            dim_token=dim_token,
            prev_parent_id=prev_parent_id,
            prev_parent_token=prev_parent_token,
            resolution_queue_id=item["id"],
        )
    log.info("decompose applied: %s → [%s, %s]", item["concept_id"], existing_dim_token, new_dim_token)
async def _apply_update(pool, item):
    """Swap the concept's parent in ``item["dim_id"]`` for the incoming one.

    Every ``urd`` row for (concept, dimension) is deleted. The incoming
    parent is then inserted as a non-ISA edge with confidence 0.85, and a
    ``rewrite`` KG-log entry is written that points back at the previous
    parent.
    """
    concept_id = item["concept_id"]
    dim_id = item["dim_id"]
    new_parent_id = item["incoming_parent_id"]
    old_parent_id = item["existing_parent_id"]

    async with pool.acquire() as conn:
        await conn.execute("DELETE FROM urd WHERE id=$1 AND dim_id=$2", concept_id, dim_id)

    edge = InsertRequest(
        concept_id=concept_id,
        parent_id=new_parent_id,
        dim_id=dim_id,
        is_isa=False,
        confidence=0.85,
        source="festinger",
    )
    await insert_urd_edge(pool, edge)

    # Token lookups happen after the insert, against the current cache.
    known = cache.soas_by_id
    await log_kg_write(
        pool, "rewrite", edge,
        concept_token=known.get(concept_id, str(concept_id)),
        parent_token=known.get(new_parent_id, str(new_parent_id)),
        dim_token=known.get(dim_id, str(dim_id)),
        prev_parent_id=old_parent_id,
        prev_parent_token=known.get(old_parent_id, str(old_parent_id)),
        resolution_queue_id=item["id"],
    )
    log.info("update applied: concept=%d dim=%d → parent=%d", concept_id, dim_id, new_parent_id)
async def _apply_reclassify(pool, item, decision):
    """Attach the incoming parent under the dimension the LLM says is correct.

    ``decision.correct_dimension`` is looked up or created as a SOAS node; it
    defaults to ``"membership"`` when the LLM left it empty. The incoming
    parent is inserted there as a non-ISA edge with confidence 0.8. The
    concept's existing edge is left untouched. A ``reclassify`` KG-log entry
    records the previous parent.

    NOTE(review): is_isa is always False here, even if the chosen dimension
    is an ISA-style one such as "type" — confirm this is intended.
    """
    target_dim = decision.correct_dimension or "membership"
    target_row = await get_or_create_soas(pool, target_dim)

    concept_id = item["concept_id"]
    parent_id = item["incoming_parent_id"]
    edge = InsertRequest(
        concept_id=concept_id,
        parent_id=parent_id,
        dim_id=target_row.id,
        is_isa=False,
        confidence=0.8,
        source="festinger",
    )
    await insert_urd_edge(pool, edge)

    known = cache.soas_by_id
    prev_id = item["existing_parent_id"]
    await log_kg_write(
        pool, "reclassify", edge,
        concept_token=known.get(concept_id, str(concept_id)),
        parent_token=known.get(parent_id, str(parent_id)),
        dim_token=target_dim,
        prev_parent_id=prev_id,
        prev_parent_token=known.get(prev_id, str(prev_id)),
        resolution_queue_id=item["id"],
    )
    log.info("reclassify applied: concept=%d → dim=%s", concept_id, target_dim)
async def _mark_resolved(pool, queue_id: int, status: str, resolution: dict) -> None:
    """Close out a ``resolution_queue`` row.

    Sets ``status``, stores *resolution* serialized as a JSON string, and
    stamps ``resolved_at`` with the database's ``now()``.
    """
    payload = json.dumps(resolution)
    statement = "UPDATE resolution_queue SET status=$1, resolution=$2, resolved_at=now() WHERE id=$3"
    async with pool.acquire() as conn:
        await conn.execute(statement, status, payload, queue_id)
def last_run_timestamp() -> str | None:
    """Return when the last completed resolution run finished, as ISO-8601 (UTC).

    Returns ``None`` if no run has completed in this process yet.
    """
    return None if _last_run is None else _last_run.isoformat()