Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 98b115d471 | |||
| 58f903aec0 |
@@ -495,17 +495,19 @@ async def process_prompt(body: dict, path: str, pool, cfg: dict) -> dict:
|
||||
salient_for_read: list[int] = []
|
||||
salient_for_write: list[str] = []
|
||||
|
||||
# Novel words found this prompt that aren't in the cache yet.
|
||||
# We cap at MAX_NOVEL_PER_PROMPT to avoid flooding on large system prompts.
|
||||
MAX_NOVEL_PER_PROMPT = 5
|
||||
novel_this_prompt: list[str] = []
|
||||
|
||||
for token in tokens:
|
||||
soas_row = cache.soas_by_token.get(token)
|
||||
|
||||
if soas_row is None:
|
||||
# Token is absent from the dictionary entirely → novel domain word.
|
||||
# Give it an initial high saliency so recollection fires immediately
|
||||
# and instructs the model to ask the user what it is.
|
||||
soas_row = await create_novel_soas(pool, token)
|
||||
salient_for_read.append(soas_row.id)
|
||||
# Do NOT add to salient_for_write: we have no basis for LLM-inferred
|
||||
# relationships yet — let the conversation teach us via cue scanner.
|
||||
# Token absent from dictionary → candidate novel domain word.
|
||||
# Collect for batch processing; apply a per-prompt cap.
|
||||
if len(novel_this_prompt) < MAX_NOVEL_PER_PROMPT:
|
||||
novel_this_prompt.append(token)
|
||||
continue
|
||||
|
||||
if soas_row.saliency == 0.0 and soas_row.novelty == 0.0:
|
||||
@@ -517,11 +519,21 @@ async def process_prompt(body: dict, path: str, pool, cfg: dict) -> dict:
|
||||
if soas_row.saliency >= read_threshold:
|
||||
salient_for_read.append(soas_row.id)
|
||||
|
||||
if soas_row.saliency >= write_threshold and soas_row.novelty > 0.0:
|
||||
# Only enqueue domain-specific words for LLM relationship extraction,
|
||||
# not freshly-created novel words (novelty=1.0 but just inserted).
|
||||
# Only enqueue for LLM write if the concept already has URD edges —
|
||||
# i.e. we know *something* about it and may want to expand that knowledge.
|
||||
# Never enqueue freshly-novel words: let the conversation teach us instead.
|
||||
if (
|
||||
soas_row.saliency >= write_threshold
|
||||
and soas_row.novelty > 0.0
|
||||
and cache.urd_by_concept.get(soas_row.id)
|
||||
):
|
||||
salient_for_write.append(token)
|
||||
|
||||
# Create SOAS entries for novel words and add them to the read list.
|
||||
for token in novel_this_prompt:
|
||||
soas_row = await create_novel_soas(pool, token)
|
||||
salient_for_read.append(soas_row.id)
|
||||
|
||||
for token in salient_for_write:
|
||||
await enqueue_concept(token)
|
||||
|
||||
|
||||
@@ -77,21 +77,37 @@ def build_recollection_block(
|
||||
salient_concept_ids: list[int],
|
||||
confidence_floor: float,
|
||||
recency_days: int,
|
||||
max_lines: int = 12,
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
Build the full <recollection> block for a list of above-threshold concept IDs.
|
||||
Returns None if there is nothing to say.
|
||||
"""
|
||||
lines: list[str] = []
|
||||
|
||||
Concepts with known edges are shown first (most informative); zero-hit
|
||||
"don't know" lines fill the remaining budget up to max_lines.
|
||||
"""
|
||||
hit_lines: list[str] = []
|
||||
zero_lines: list[str] = []
|
||||
|
||||
# Deduplicate — same concept may appear twice if novel + known
|
||||
seen: set[int] = set()
|
||||
for cid in salient_concept_ids:
|
||||
if cid in seen:
|
||||
continue
|
||||
seen.add(cid)
|
||||
|
||||
token = cache.soas_by_id.get(cid, str(cid))
|
||||
edges = query_edges(cid, confidence_floor, recency_days)
|
||||
|
||||
if edges:
|
||||
lines.append(render_hit(token, edges, cid))
|
||||
hit_lines.append(render_hit(token, edges, cid))
|
||||
else:
|
||||
lines.append(render_zero_hit(token))
|
||||
zero_lines.append(render_zero_hit(token))
|
||||
|
||||
# Hits always included (up to max_lines); zero-hits fill the remainder
|
||||
lines = hit_lines[:max_lines]
|
||||
remaining = max_lines - len(lines)
|
||||
lines += zero_lines[:remaining]
|
||||
|
||||
if not lines:
|
||||
return None
|
||||
|
||||
@@ -42,6 +42,11 @@ class CueWriteRequest:
|
||||
_queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
|
||||
_running: bool = False
|
||||
|
||||
# Limit concurrent LM Studio / cloud-LLM calls so we don't flood a slow
|
||||
# local model server. Cue processing (pure DB ops) is never rate-limited.
|
||||
_LLM_CONCURRENCY = 2
|
||||
_llm_semaphore: asyncio.Semaphore | None = None
|
||||
|
||||
|
||||
async def enqueue_concept(token: str) -> None:
|
||||
try:
|
||||
@@ -59,13 +64,21 @@ async def enqueue_cue(triple: CueTriple) -> None:
|
||||
|
||||
async def start_worker(pool: asyncpg.Pool) -> None:
|
||||
"""Launch background worker. Call once at startup."""
|
||||
global _running
|
||||
global _running, _llm_semaphore
|
||||
_running = True
|
||||
_llm_semaphore = asyncio.Semaphore(_LLM_CONCURRENCY)
|
||||
asyncio.create_task(_worker(pool))
|
||||
log.info("write queue worker started")
|
||||
|
||||
|
||||
async def _worker(pool: asyncpg.Pool) -> None:
|
||||
"""
|
||||
Drain the queue as fast as possible.
|
||||
|
||||
Cue writes (pure DB ops) are awaited inline — they're fast.
|
||||
Concept LLM calls are dispatched as fire-and-forget asyncio tasks so
|
||||
a slow LM Studio response never stalls the worker loop.
|
||||
"""
|
||||
while _running:
|
||||
try:
|
||||
item = await asyncio.wait_for(_queue.get(), timeout=5.0)
|
||||
@@ -74,15 +87,29 @@ async def _worker(pool: asyncpg.Pool) -> None:
|
||||
|
||||
try:
|
||||
if isinstance(item, CueWriteRequest):
|
||||
# Fast path: no LLM involved, process inline.
|
||||
await _process_cue(pool, item.triple)
|
||||
elif isinstance(item, WriteRequest):
|
||||
await _process_concept(pool, item.concept_token)
|
||||
# Slow path: fire off without awaiting so the worker stays free.
|
||||
asyncio.create_task(
|
||||
_process_concept_guarded(pool, item.concept_token)
|
||||
)
|
||||
except Exception as e:
|
||||
log.exception("write queue worker error: %s", e)
|
||||
finally:
|
||||
_queue.task_done()
|
||||
|
||||
|
||||
async def _process_concept_guarded(pool: asyncpg.Pool, concept_token: str) -> None:
|
||||
"""Wrapper that acquires the LLM semaphore before calling the model."""
|
||||
assert _llm_semaphore is not None
|
||||
async with _llm_semaphore:
|
||||
try:
|
||||
await _process_concept(pool, concept_token)
|
||||
except Exception as e:
|
||||
log.exception("concept LLM task error for %s: %s", concept_token, e)
|
||||
|
||||
|
||||
async def stop_worker() -> None:
|
||||
global _running
|
||||
_running = False
|
||||
|
||||
Reference in New Issue
Block a user