Files
2026-04-19 16:16:13 +02:00

285 lines
10 KiB
Python

"""
Collision detection tests — exercises the in-memory URD insert pipeline.
All tests inject state directly into the cache module dicts and call
insert_urd_edge() with a mock asyncpg pool that records calls but never
touches a real database.
"""
from __future__ import annotations
import asyncio
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from festinger import cache
from festinger.urd_writer import InsertRequest, insert_urd_edge, CollisionInfo
from tests.helpers import reset_cache, add_soas, add_urd
# ---------------------------------------------------------------------------
# Mock pool — captures INSERT attempts, never hits Postgres
# ---------------------------------------------------------------------------
def make_mock_pool():
    """Build a stand-in asyncpg pool together with the connection it yields.

    Returns:
        A ``(pool, conn)`` tuple. ``pool.acquire()`` returns ``conn``, and
        ``conn`` serves as its own async context manager, so
        ``async with pool.acquire() as c`` binds ``c`` to ``conn``.
        Awaiting ``conn.execute(...)`` gives ``"INSERT 0 1"`` and awaiting
        ``conn.fetchrow(...)`` gives ``None``; neither stub raises.
    """
    connection = AsyncMock()
    # Attribute name -> value the awaitable stub resolves to.
    canned_results = {
        "execute": "INSERT 0 1",
        "fetchrow": None,
        "__aenter__": connection,  # entering the context hands back conn
        "__aexit__": False,        # falsy: never suppress exceptions
    }
    for attr_name, result in canned_results.items():
        setattr(connection, attr_name, AsyncMock(return_value=result))

    pool = MagicMock()
    # acquire() is a plain (non-async) call that returns the context manager.
    pool.acquire = MagicMock(return_value=connection)
    return pool, connection
# ---------------------------------------------------------------------------
# Scenario A — misclassification (ISPART existing, ISA incoming, same dim)
#
# Pre-state: michigan ISPART usa in dim:usa (parent_id = dim_id — degenerate edge)
# Trigger: michigan ISA state in dim:usa
# Expected: misclassification collision, no URD modification
# ---------------------------------------------------------------------------
class TestScenarioA:
    """Scenario A: incoming ISA edge collides with an existing ISPART edge.

    The seed edge is michigan ISPART usa in dim:usa; every test then tries
    to insert michigan ISA state into that same dimension.
    """

    def setup_method(self):
        """Reset the cache, then seed the SOAS entries and the existing edge."""
        reset_cache()
        # SOAS entries
        self.michigan = add_soas(101, "michigan", saliency=1.5)
        self.usa = add_soas(102, "usa", saliency=0.8)
        self.state = add_soas(103, "state", saliency=0.7)
        # Degenerate seed edge: michigan ISPART usa, dim=usa (parent_id = dim_id)
        self.existing = add_urd(
            concept_id=101, parent_id=102, dim_id=102,  # dim_id = parent_id = usa
            is_isa=False, confidence=0.85, source="test"
        )

    @staticmethod
    def _incoming_request():
        """Return the colliding request shared by the tests in this class."""
        return InsertRequest(
            concept_id=101,  # michigan
            parent_id=103,   # state
            dim_id=102,      # usa (same dim as existing ISPART)
            is_isa=True,
            confidence=0.85,
            source="gutask",
        )

    def test_degenerate_edge_stored(self):
        """Confirm the seed has parent_id == dim_id."""
        edge = cache.urd_by_concept_dim.get((101, 102))
        assert edge is not None
        assert edge.parent_id == edge.dim_id == 102

    @pytest.mark.asyncio
    async def test_misclassification_detected(self):
        """Incoming ISA in the same dim as existing ISPART → misclassification."""
        pool, _ = make_mock_pool()
        # Patch _queue_collision so we can inspect it without hitting Postgres
        queued: list[CollisionInfo] = []

        async def fake_queue(pool, col, priority):
            queued.append(col)
            cache.pending_conflicts.add(col.concept_id)

        with patch("festinger.urd_writer._queue_collision", side_effect=fake_queue):
            collision = await insert_urd_edge(pool, self._incoming_request())

        assert collision is not None
        assert collision.collision_type == "misclassification"
        assert collision.existing_is_isa is False
        assert collision.incoming_is_isa is True
        # The capture list was previously filled but never checked; verify
        # the collision was handed to the queue, as Scenario B does.
        assert len(queued) == 1

    @pytest.mark.asyncio
    async def test_urd_not_modified_on_collision(self):
        """URD in-memory cache is unchanged after a collision."""
        pool, _ = make_mock_pool()
        with patch("festinger.urd_writer._queue_collision", new_callable=AsyncMock):
            await insert_urd_edge(pool, self._incoming_request())

        # Original edge still in place
        edge = cache.urd_by_concept_dim.get((101, 102))
        assert edge is not None
        assert edge.parent_id == 102  # usa, unchanged
        assert edge.is_isa is False

    @pytest.mark.asyncio
    async def test_postgres_not_written_on_collision(self):
        """No INSERT to Postgres is attempted when collision is detected in cache."""
        pool, conn = make_mock_pool()
        with patch("festinger.urd_writer._queue_collision", new_callable=AsyncMock):
            await insert_urd_edge(pool, self._incoming_request())

        conn.execute.assert_not_called()

    @pytest.mark.asyncio
    async def test_pending_conflicts_marked(self):
        """concept_id is added to pending_conflicts after collision."""
        pool, _ = make_mock_pool()

        # The stand-in performs the bookkeeping itself, so this test checks
        # that the queue hook is invoked with the colliding concept.
        async def fake_queue(pool, col, priority):
            cache.pending_conflicts.add(col.concept_id)

        with patch("festinger.urd_writer._queue_collision", side_effect=fake_queue):
            await insert_urd_edge(pool, self._incoming_request())

        assert 101 in cache.pending_conflicts
# ---------------------------------------------------------------------------
# Scenario B — ISA + ISA collision (dimension too coarse → decompose)
#
# Pre-state: gnommoweb ISA container in dim:type
# Trigger: gnommoweb ISA repo in dim:type
# Expected: isa_isa collision
# ---------------------------------------------------------------------------
class TestScenarioB:
    """Scenario B: a second ISA edge arrives for a (concept, dim) pair.

    The seed edge is gnommoweb ISA container in dim:type; the tests then
    try to insert gnommoweb ISA repo into the same dimension.
    """

    def setup_method(self):
        """Reset the cache, then seed the SOAS entries and the existing edge."""
        reset_cache()
        self.gnommoweb = add_soas(201, "gnommoweb", saliency=1.5)
        self.container = add_soas(202, "container", saliency=0.9)
        self.repo = add_soas(203, "repo", saliency=0.8)
        self.type_dim = add_soas(1, "type", saliency=0.0)
        self.existing = add_urd(
            concept_id=201, parent_id=202, dim_id=1,
            is_isa=True, confidence=0.9, source="cloud_llm"
        )

    @pytest.mark.asyncio
    async def test_isa_isa_collision_type(self):
        """ISA over ISA in one dim is reported and queued as ``isa_isa``."""
        pool, _ = make_mock_pool()
        queued: list[CollisionInfo] = []

        # Stand-in for _queue_collision: record the collision instead of
        # writing it anywhere.
        async def fake_queue(pool, col, priority):
            queued.append(col)
            cache.pending_conflicts.add(col.concept_id)

        with patch("festinger.urd_writer._queue_collision", side_effect=fake_queue):
            req = InsertRequest(
                concept_id=201, parent_id=203, dim_id=1,
                is_isa=True, confidence=0.85, source="gutask",
            )
            collision = await insert_urd_edge(pool, req)

        assert collision is not None
        assert collision.collision_type == "isa_isa"
        assert len(queued) == 1
        assert queued[0].existing_parent_id == 202  # container
        assert queued[0].incoming_parent_id == 203  # repo

    @pytest.mark.asyncio
    async def test_existing_edge_unchanged_after_isa_isa(self):
        """The cached edge still points at container after the collision."""
        pool, _ = make_mock_pool()
        with patch("festinger.urd_writer._queue_collision", new_callable=AsyncMock):
            req = InsertRequest(
                concept_id=201, parent_id=203, dim_id=1,
                is_isa=True, confidence=0.85, source="gutask",
            )
            await insert_urd_edge(pool, req)

        edge = cache.urd_by_concept_dim.get((201, 1))
        # Guard the lookup so a dropped edge fails as an assertion rather
        # than as an AttributeError on None.
        assert edge is not None
        assert edge.parent_id == 202  # still container
# ---------------------------------------------------------------------------
# Scenario C — ISPART + ISPART contradiction (host migration)
#
# Pre-state: dobby ISPART docker_host_1 in dim:runs-on
# Trigger: dobby ISPART docker_host_2 in dim:runs-on
# Expected: ispart_ispart collision
# ---------------------------------------------------------------------------
class TestScenarioC:
    """Scenario C: a second ISPART edge arrives for a (concept, dim) pair.

    The seed edge is dobby ISPART docker_host_1 in dim:runs-on; the test
    then tries to insert dobby ISPART docker_host_2 into the same dimension.
    """

    def setup_method(self):
        """Reset the cache, then seed the SOAS entries and the existing edge."""
        reset_cache()
        self.dobby = add_soas(301, "dobby", saliency=1.5)
        self.host1 = add_soas(302, "docker_host_1", saliency=0.5)
        self.host2 = add_soas(303, "docker_host_2", saliency=0.5)
        self.runs_on_dim = add_soas(4, "runs-on", saliency=0.0)
        self.existing = add_urd(
            concept_id=301, parent_id=302, dim_id=4,
            is_isa=False, confidence=0.9, source="cloud_llm"
        )

    @pytest.mark.asyncio
    async def test_ispart_ispart_collision_type(self):
        """ISPART over ISPART in one dim is reported as ``ispart_ispart``."""
        pool, _ = make_mock_pool()
        queued: list[CollisionInfo] = []

        # Stand-in for _queue_collision: record the collision instead of
        # writing it anywhere.
        async def fake_queue(pool, col, priority):
            queued.append(col)

        with patch("festinger.urd_writer._queue_collision", side_effect=fake_queue):
            req = InsertRequest(
                concept_id=301, parent_id=303, dim_id=4,
                is_isa=False, confidence=1.0, source="gutask",
            )
            collision = await insert_urd_edge(pool, req)

        assert collision is not None
        assert collision.collision_type == "ispart_ispart"
        # Check the queue before indexing it, so a missing entry fails as an
        # assertion rather than as an IndexError (same check as Scenario B).
        assert len(queued) == 1
        assert queued[0].existing_parent_id == 302  # docker_host_1
        assert queued[0].incoming_parent_id == 303  # docker_host_2
# ---------------------------------------------------------------------------
# Clean insert (no collision)
# ---------------------------------------------------------------------------
class TestCleanInsert:
    """Insert path when no edge is seeded for the (concept, dim) pair."""

    def setup_method(self):
        """Reset the cache and register the SOAS entries the tests refer to."""
        reset_cache()
        soas_rows = (
            (401, "festinger", 1.2),
            (402, "middleware", 0.8),
            (1, "type", 0.0),
        )
        for soas_id, label, weight in soas_rows:
            add_soas(soas_id, label, saliency=weight)

    @pytest.mark.asyncio
    async def test_successful_insert_updates_cache(self):
        """A collision-free insert returns None and the edge is cached."""
        mock_pool, _conn = make_mock_pool()
        request = InsertRequest(
            concept_id=401,
            parent_id=402,
            dim_id=1,
            is_isa=True,
            confidence=0.9,
            source="test",
        )

        outcome = await insert_urd_edge(mock_pool, request)
        assert outcome is None

        stored = cache.urd_by_concept_dim.get((401, 1))
        assert stored is not None
        assert stored.parent_id == 402
        assert stored.is_isa is True

    @pytest.mark.asyncio
    async def test_successful_insert_calls_postgres(self):
        """A collision-free insert issues exactly one execute() with INSERT SQL."""
        mock_pool, mock_conn = make_mock_pool()
        request = InsertRequest(
            concept_id=401,
            parent_id=402,
            dim_id=1,
            is_isa=True,
            confidence=0.9,
            source="test",
        )

        await insert_urd_edge(mock_pool, request)

        mock_conn.execute.assert_called_once()
        # First positional argument of the recorded call is the SQL text.
        sql_text = mock_conn.execute.call_args[0][0]
        assert "INSERT INTO urd" in sql_text