""" Collision detection tests — exercises the in-memory URD insert pipeline. All tests inject state directly into the cache module dicts and call insert_urd_edge() with a mock asyncpg pool that records calls but never touches a real database. """ from __future__ import annotations import asyncio import pytest from unittest.mock import AsyncMock, MagicMock, patch from festinger import cache from festinger.urd_writer import InsertRequest, insert_urd_edge, CollisionInfo from tests.helpers import reset_cache, add_soas, add_urd # --------------------------------------------------------------------------- # Mock pool — captures INSERT attempts, never hits Postgres # --------------------------------------------------------------------------- def make_mock_pool(): """Return a mock asyncpg pool where execute() succeeds and never raises.""" conn = AsyncMock() conn.execute = AsyncMock(return_value="INSERT 0 1") conn.fetchrow = AsyncMock(return_value=None) conn.__aenter__ = AsyncMock(return_value=conn) conn.__aexit__ = AsyncMock(return_value=False) pool = MagicMock() pool.acquire = MagicMock(return_value=conn) return pool, conn # --------------------------------------------------------------------------- # Scenario A — misclassification (ISPART existing, ISA incoming, same dim) # # Pre-state: michigan ISPART usa in dim:usa (parent_id = dim_id — degenerate edge) # Trigger: michigan ISA state in dim:usa # Expected: misclassification collision, no URD modification # --------------------------------------------------------------------------- class TestScenarioA: def setup_method(self): reset_cache() # SOAS entries self.michigan = add_soas(101, "michigan", saliency=1.5) self.usa = add_soas(102, "usa", saliency=0.8) self.state = add_soas(103, "state", saliency=0.7) # Degenerate seed edge: michigan ISPART usa, dim=usa (parent_id = dim_id) self.existing = add_urd( concept_id=101, parent_id=102, dim_id=102, # dim_id = parent_id = usa is_isa=False, confidence=0.85, source="test" ) def test_degenerate_edge_stored(self): """Confirm the seed has parent_id == dim_id.""" edge = cache.urd_by_concept_dim.get((101, 102)) assert edge is not None assert edge.parent_id == edge.dim_id == 102 @pytest.mark.asyncio async def test_misclassification_detected(self): """Incoming ISA in the same dim as existing ISPART → misclassification.""" pool, conn = make_mock_pool() # Patch _queue_collision so we can inspect it without hitting Postgres queued: list[CollisionInfo] = [] async def fake_queue(pool, col, priority): queued.append(col) cache.pending_conflicts.add(col.concept_id) with patch("festinger.urd_writer._queue_collision", side_effect=fake_queue): req = InsertRequest( concept_id=101, # michigan parent_id=103, # state dim_id=102, # usa (same dim as existing ISPART) is_isa=True, confidence=0.85, source="gutask", ) collision = await insert_urd_edge(pool, req) assert collision is not None assert collision.collision_type == "misclassification" assert collision.existing_is_isa is False assert collision.incoming_is_isa is True @pytest.mark.asyncio async def test_urd_not_modified_on_collision(self): """URD in-memory cache is unchanged after a collision.""" pool, conn = make_mock_pool() with patch("festinger.urd_writer._queue_collision", new_callable=AsyncMock): req = InsertRequest( concept_id=101, parent_id=103, dim_id=102, is_isa=True, confidence=0.85, source="gutask", ) await insert_urd_edge(pool, req) # Original edge still in place edge = cache.urd_by_concept_dim.get((101, 102)) assert edge is not None assert edge.parent_id == 102 # usa, unchanged assert edge.is_isa is False @pytest.mark.asyncio async def test_postgres_not_written_on_collision(self): """No INSERT to Postgres is attempted when collision is detected in cache.""" pool, conn = make_mock_pool() with patch("festinger.urd_writer._queue_collision", new_callable=AsyncMock): req = InsertRequest( concept_id=101, parent_id=103, dim_id=102, is_isa=True, confidence=0.85, source="gutask", ) await insert_urd_edge(pool, req) conn.execute.assert_not_called() @pytest.mark.asyncio async def test_pending_conflicts_marked(self): """concept_id is added to pending_conflicts after collision.""" pool, _ = make_mock_pool() async def fake_queue(pool, col, priority): cache.pending_conflicts.add(col.concept_id) with patch("festinger.urd_writer._queue_collision", side_effect=fake_queue): req = InsertRequest( concept_id=101, parent_id=103, dim_id=102, is_isa=True, confidence=0.85, source="gutask", ) await insert_urd_edge(pool, req) assert 101 in cache.pending_conflicts # --------------------------------------------------------------------------- # Scenario B — ISA + ISA collision (dimension too coarse → decompose) # # Pre-state: gnommoweb ISA container in dim:type # Trigger: gnommoweb ISA repo in dim:type # Expected: isa_isa collision # --------------------------------------------------------------------------- class TestScenarioB: def setup_method(self): reset_cache() self.gnommoweb = add_soas(201, "gnommoweb", saliency=1.5) self.container = add_soas(202, "container", saliency=0.9) self.repo = add_soas(203, "repo", saliency=0.8) self.type_dim = add_soas(1, "type", saliency=0.0) self.existing = add_urd( concept_id=201, parent_id=202, dim_id=1, is_isa=True, confidence=0.9, source="cloud_llm" ) @pytest.mark.asyncio async def test_isa_isa_collision_type(self): pool, _ = make_mock_pool() queued: list[CollisionInfo] = [] async def fake_queue(pool, col, priority): queued.append(col) cache.pending_conflicts.add(col.concept_id) with patch("festinger.urd_writer._queue_collision", side_effect=fake_queue): req = InsertRequest( concept_id=201, parent_id=203, dim_id=1, is_isa=True, confidence=0.85, source="gutask", ) collision = await insert_urd_edge(pool, req) assert collision is not None assert collision.collision_type == "isa_isa" assert len(queued) == 1 assert queued[0].existing_parent_id == 202 # container assert queued[0].incoming_parent_id == 203 # repo @pytest.mark.asyncio async def test_existing_edge_unchanged_after_isa_isa(self): pool, _ = make_mock_pool() with patch("festinger.urd_writer._queue_collision", new_callable=AsyncMock): req = InsertRequest( concept_id=201, parent_id=203, dim_id=1, is_isa=True, confidence=0.85, source="gutask", ) await insert_urd_edge(pool, req) edge = cache.urd_by_concept_dim.get((201, 1)) assert edge.parent_id == 202 # still container # --------------------------------------------------------------------------- # Scenario C — ISPART + ISPART contradiction (host migration) # # Pre-state: dobby ISPART docker_host_1 in dim:runs-on # Trigger: dobby ISPART docker_host_2 in dim:runs-on # Expected: ispart_ispart collision # --------------------------------------------------------------------------- class TestScenarioC: def setup_method(self): reset_cache() self.dobby = add_soas(301, "dobby", saliency=1.5) self.host1 = add_soas(302, "docker_host_1", saliency=0.5) self.host2 = add_soas(303, "docker_host_2", saliency=0.5) self.runs_on_dim = add_soas(4, "runs-on", saliency=0.0) self.existing = add_urd( concept_id=301, parent_id=302, dim_id=4, is_isa=False, confidence=0.9, source="cloud_llm" ) @pytest.mark.asyncio async def test_ispart_ispart_collision_type(self): pool, _ = make_mock_pool() queued: list[CollisionInfo] = [] async def fake_queue(pool, col, priority): queued.append(col) with patch("festinger.urd_writer._queue_collision", side_effect=fake_queue): req = InsertRequest( concept_id=301, parent_id=303, dim_id=4, is_isa=False, confidence=1.0, source="gutask", ) collision = await insert_urd_edge(pool, req) assert collision is not None assert collision.collision_type == "ispart_ispart" assert queued[0].existing_parent_id == 302 # docker_host_1 assert queued[0].incoming_parent_id == 303 # docker_host_2 # --------------------------------------------------------------------------- # Clean insert (no collision) # --------------------------------------------------------------------------- class TestCleanInsert: def setup_method(self): reset_cache() add_soas(401, "festinger", saliency=1.2) add_soas(402, "middleware", saliency=0.8) add_soas(1, "type", saliency=0.0) @pytest.mark.asyncio async def test_successful_insert_updates_cache(self): pool, conn = make_mock_pool() req = InsertRequest( concept_id=401, parent_id=402, dim_id=1, is_isa=True, confidence=0.9, source="test", ) collision = await insert_urd_edge(pool, req) assert collision is None edge = cache.urd_by_concept_dim.get((401, 1)) assert edge is not None assert edge.parent_id == 402 assert edge.is_isa is True @pytest.mark.asyncio async def test_successful_insert_calls_postgres(self): pool, conn = make_mock_pool() req = InsertRequest( concept_id=401, parent_id=402, dim_id=1, is_isa=True, confidence=0.9, source="test", ) await insert_urd_edge(pool, req) conn.execute.assert_called_once() call_args = conn.execute.call_args[0] assert "INSERT INTO urd" in call_args[0]