Compare commits

...

2 Commits

Author SHA1 Message Date
gitprov 98b115d471 Draining the queue 2026-04-20 18:22:57 +02:00
gitprov 58f903aec0 Adding updates to Festinger 2026-04-20 18:21:23 +02:00
3 changed files with 71 additions and 16 deletions
+22 -10
View File
@@ -495,17 +495,19 @@ async def process_prompt(body: dict, path: str, pool, cfg: dict) -> dict:
salient_for_read: list[int] = []
salient_for_write: list[str] = []
# Novel words found this prompt that aren't in the cache yet.
# We cap at MAX_NOVEL_PER_PROMPT to avoid flooding on large system prompts.
MAX_NOVEL_PER_PROMPT = 5
novel_this_prompt: list[str] = []
for token in tokens:
soas_row = cache.soas_by_token.get(token)
if soas_row is None:
# Token is absent from the dictionary entirely → novel domain word.
# Give it an initial high saliency so recollection fires immediately
# and instructs the model to ask the user what it is.
soas_row = await create_novel_soas(pool, token)
salient_for_read.append(soas_row.id)
# Do NOT add to salient_for_write: we have no basis for LLM-inferred
# relationships yet — let the conversation teach us via cue scanner.
# Token absent from dictionary → candidate novel domain word.
# Collect for batch processing; apply a per-prompt cap.
if len(novel_this_prompt) < MAX_NOVEL_PER_PROMPT:
novel_this_prompt.append(token)
continue
if soas_row.saliency == 0.0 and soas_row.novelty == 0.0:
@@ -517,11 +519,21 @@ async def process_prompt(body: dict, path: str, pool, cfg: dict) -> dict:
if soas_row.saliency >= read_threshold:
salient_for_read.append(soas_row.id)
if soas_row.saliency >= write_threshold and soas_row.novelty > 0.0:
# Only enqueue domain-specific words for LLM relationship extraction,
# not freshly-created novel words (novelty=1.0 but just inserted).
# Only enqueue for LLM write if the concept already has URD edges —
# i.e. we know *something* about it and may want to expand that knowledge.
# Never enqueue freshly-novel words: let the conversation teach us instead.
if (
soas_row.saliency >= write_threshold
and soas_row.novelty > 0.0
and cache.urd_by_concept.get(soas_row.id)
):
salient_for_write.append(token)
# Create SOAS entries for novel words and add them to the read list.
for token in novel_this_prompt:
soas_row = await create_novel_soas(pool, token)
salient_for_read.append(soas_row.id)
for token in salient_for_write:
await enqueue_concept(token)
+20 -4
View File
@@ -77,21 +77,37 @@ def build_recollection_block(
salient_concept_ids: list[int],
confidence_floor: float,
recency_days: int,
max_lines: int = 12,
) -> Optional[str]:
"""
Build the full <recollection> block for a list of above-threshold concept IDs.
Returns None if there is nothing to say.
"""
lines: list[str] = []
Concepts with known edges are shown first (most informative); zero-hit
"don't know" lines fill the remaining budget up to max_lines.
"""
hit_lines: list[str] = []
zero_lines: list[str] = []
# Deduplicate — same concept may appear twice if novel + known
seen: set[int] = set()
for cid in salient_concept_ids:
if cid in seen:
continue
seen.add(cid)
token = cache.soas_by_id.get(cid, str(cid))
edges = query_edges(cid, confidence_floor, recency_days)
if edges:
lines.append(render_hit(token, edges, cid))
hit_lines.append(render_hit(token, edges, cid))
else:
lines.append(render_zero_hit(token))
zero_lines.append(render_zero_hit(token))
# Hits always included (up to max_lines); zero-hits fill the remainder
lines = hit_lines[:max_lines]
remaining = max_lines - len(lines)
lines += zero_lines[:remaining]
if not lines:
return None
+29 -2
View File
@@ -42,6 +42,11 @@ class CueWriteRequest:
# Bounded queue (at most 1000 pending items) of write requests that the
# background _worker pulls from.
_queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
# Loop flag for the background worker: set to True by start_worker and back
# to False by stop_worker; _worker keeps iterating while it is True.
_running: bool = False
# Limit concurrent LM Studio / cloud-LLM calls so we don't flood a slow
# local model server. Cue processing (pure DB ops) is never rate-limited.
_LLM_CONCURRENCY = 2
# Semaphore sized by _LLM_CONCURRENCY; stays None until start_worker creates it.
_llm_semaphore: asyncio.Semaphore | None = None
async def enqueue_concept(token: str) -> None:
try:
@@ -59,13 +64,21 @@ async def enqueue_cue(triple: CueTriple) -> None:
async def start_worker(pool: asyncpg.Pool) -> None:
    """Launch the background write-queue worker. Call once at startup.

    Marks the worker as running, creates the semaphore that bounds the
    number of concurrent concept LLM calls to ``_LLM_CONCURRENCY``, and
    schedules ``_worker`` as a task on the running event loop.

    Args:
        pool: Connection pool passed through to ``_worker``.
    """
    global _running, _llm_semaphore, _worker_task
    _running = True
    _llm_semaphore = asyncio.Semaphore(_LLM_CONCURRENCY)
    # The event loop keeps only weak references to tasks, so a task whose
    # handle is discarded may be garbage-collected before it finishes.
    # Binding it to a module-level name keeps the worker alive.
    _worker_task = asyncio.create_task(_worker(pool))
    log.info("write queue worker started")
async def _worker(pool: asyncpg.Pool) -> None:
"""
Drain the queue as fast as possible.
Cue writes (pure DB ops) are awaited inline — they're fast.
Concept LLM calls are dispatched as fire-and-forget asyncio tasks so
a slow LM Studio response never stalls the worker loop.
"""
while _running:
try:
item = await asyncio.wait_for(_queue.get(), timeout=5.0)
@@ -74,15 +87,29 @@ async def _worker(pool: asyncpg.Pool) -> None:
try:
if isinstance(item, CueWriteRequest):
# Fast path: no LLM involved, process inline.
await _process_cue(pool, item.triple)
elif isinstance(item, WriteRequest):
await _process_concept(pool, item.concept_token)
# Slow path: fire off without awaiting so the worker stays free.
asyncio.create_task(
_process_concept_guarded(pool, item.concept_token)
)
except Exception as e:
log.exception("write queue worker error: %s", e)
finally:
_queue.task_done()
async def _process_concept_guarded(pool: asyncpg.Pool, concept_token: str) -> None:
    """Run ``_process_concept`` while holding the LLM semaphore.

    Errors raised by ``_process_concept`` are logged with their traceback and
    swallowed, so one failing concept never propagates out of the task this
    coroutine runs in.

    Args:
        pool: Connection pool passed through to ``_process_concept``.
        concept_token: Concept token to process.

    Raises:
        RuntimeError: If the semaphore has not been created yet (it is
            created by ``start_worker``).
    """
    semaphore = _llm_semaphore
    if semaphore is None:
        # Explicit check rather than ``assert``: assertions are stripped
        # under ``python -O``, which would leave ``async with None`` to fail
        # with an unhelpful error.
        raise RuntimeError(
            "LLM semaphore is not initialised; call start_worker() first"
        )
    async with semaphore:
        try:
            await _process_concept(pool, concept_token)
        except Exception as e:
            log.exception("concept LLM task error for %s: %s", concept_token, e)
async def stop_worker() -> None:
global _running
_running = False