From 0c2d097cdf685d4130103fb53b44afd96290e71a Mon Sep 17 00:00:00 2001 From: jenstandstad Date: Sun, 10 May 2026 13:46:50 +0200 Subject: [PATCH] Adding fix to aligner --- claude.sh | 4 + gnommo/cli.py | 18 +-- gnommo/transformer.py | 318 +++++++++++++----------------------------- 3 files changed, 109 insertions(+), 231 deletions(-) create mode 100755 claude.sh diff --git a/claude.sh b/claude.sh new file mode 100755 index 0000000..8c54ba5 --- /dev/null +++ b/claude.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +claude --resume b0382a18-067d-4420-9c67-9c19b5034453 + diff --git a/gnommo/cli.py b/gnommo/cli.py index 088eaa3..8f7d9f5 100644 --- a/gnommo/cli.py +++ b/gnommo/cli.py @@ -744,12 +744,14 @@ def _generate_slides_json(directory: Path, verbose: bool) -> None: # Sort by slide number sorted_slides = dict(sorted(slides.items(), key=lambda x: int(x[0][1:]))) - # Write slides.json + # Write slides.json only if content changed output_path = directory / "slides.json" - with open(output_path, "w", encoding="utf-8") as f: - json.dump(sorted_slides, f, indent=2) - - print(f" Generated {output_path} ({len(sorted_slides)} slides)") + new_content = json.dumps(sorted_slides, indent=2) + existing_content = output_path.read_text(encoding="utf-8") if output_path.exists() else None + if new_content != existing_content: + with open(output_path, "w", encoding="utf-8") as f: + f.write(new_content) + print(f" Generated {output_path} ({len(sorted_slides)} slides)") if verbose: for slide_id in sorted_slides: print(f" [{slide_id}]") @@ -950,9 +952,9 @@ def _import_narration_segments(narration_dir: Path, config, verbose: bool) -> No added_count += 1 print(f" Added narration segment: {segment_id} (from raw_mov)") - # Always write narration.json (creates it if missing) - with open(narration_json_path, "w", encoding="utf-8") as f: - json.dump(existing_narration, f, indent=2) + if added_count > 0 or not narration_json_path.exists(): + with open(narration_json_path, "w", encoding="utf-8") as f: + 
json.dump(existing_narration, f, indent=2) if added_count > 0: print(f" Updated narration.json (+{added_count} segments)") diff --git a/gnommo/transformer.py b/gnommo/transformer.py index df32052..fbb67e2 100644 --- a/gnommo/transformer.py +++ b/gnommo/transformer.py @@ -1,5 +1,6 @@ """Transform stage: resolve timings and build render plan.""" +import difflib import re import string from dataclasses import dataclass @@ -177,208 +178,104 @@ def _strip_unknown_markers( return re.sub(r"\[([^\]]+)\]", "", text) -def _extract_marker_contexts( +def _build_sequence_alignment( manuscript_text: str, + transcription: list[TranscribedWord], slides: dict = None, videos: dict = None, audio: dict = None, -) -> list[tuple[str, str]]: +) -> tuple[list[str], list[tuple[str, int]], dict[int, int]]: """ - Extract known markers and the text immediately following them from manuscript. + Build a global word-level alignment between manuscript and transcription. - Unknown markers are filtered out and stripped from following text. - Note: [cite:...] markers are already stripped at parse time. + Strips markers from the manuscript to produce a plain word sequence, then + uses difflib.SequenceMatcher to align it against the transcript word list. + Ad-libbed words in the transcript appear as insertions and don't break the + alignment of surrounding manuscript text. - Returns list of (marker_id, following_text) tuples for known markers only. + Returns: + ms_words: normalized manuscript word list (markers stripped) + marker_positions: list of (marker_id, word_idx) in manuscript order, + where word_idx is the index of the first following word + alignment: dict mapping manuscript_word_idx → transcript_word_idx """ slides = slides or {} videos = videos or {} audio = audio or {} - # Split by markers, keeping the markers — broad pattern handles any content - # including paths with / and - (e.g. 
[vfb:pexels/7670835-uhd_3840_2160_30fps]) parts = re.split(r"\[([^\]]+)\]", manuscript_text) + ms_words: list[str] = [] + marker_positions: list[tuple[str, int]] = [] - # parts: [text_before, marker1, text_after1, marker2, text_after2, ...] - raw_contexts = [] - for i in range(1, len(parts), 2): - marker_id = parts[i] - - # Skip unknown markers entirely - if not _is_known_marker(marker_id, slides, videos, audio): - continue - - # Collect all following text, looking past unknown markers until the - # next known marker. This handles [S1][segment:1] text... where the - # text lives two parts ahead rather than immediately after S1. - text_pieces = [] - j = i + 1 - while j < len(parts): - chunk = parts[j].strip() - if chunk: - text_pieces.append(chunk) - j += 1 # advance to the marker after this text chunk - if j >= len(parts): - break - if _is_known_marker(parts[j], slides, videos, audio): - break # stop at the next known marker - j += 1 # skip the unknown marker; its following text is next - - following_text = " ".join(text_pieces) - following_text = " ".join(following_text.split()) # collapse whitespace - following_text = _strip_unknown_markers(following_text, slides, videos, audio) - following_text = " ".join(following_text.split()) - raw_contexts.append((marker_id, following_text)) - - # For markers with no following text (consecutive markers), look ahead - # Return (marker_id, following_text, is_borrowed) - is_borrowed=True means text came from look-ahead - contexts = [] - for i, (marker_id, following_text) in enumerate(raw_contexts): - if following_text: - # Take first ~10 words for matching - words = following_text.split()[:10] - contexts.append((marker_id, " ".join(words), False)) + for i, part in enumerate(parts): + if i % 2 == 0: + text = _strip_unknown_markers(part, slides, videos, audio) + for w in text.split(): + norm = _normalize_token(w) + if norm: + ms_words.append(norm) else: - # Look ahead for next marker with text, but never borrow from another - # 
slide marker — slides must align independently to avoid two consecutive - # slides matching the same transcription position simultaneously. - borrowed = False - for j in range(i + 1, len(raw_contexts)): - next_marker_id, next_text = raw_contexts[j] - if next_text: - if next_marker_id in (slides or {}): - break # Slide owns this text; give up borrowing - words = next_text.split()[:10] - contexts.append((marker_id, " ".join(words), True)) # Borrowed - borrowed = True - break - if not borrowed: - contexts.append((marker_id, "", False)) + marker_id = part + if _is_known_marker(marker_id, slides, videos, audio): + marker_positions.append((marker_id, len(ms_words))) - return contexts + tr_words = [_normalize_token(tw.word) for tw in transcription] + + matcher = difflib.SequenceMatcher(None, ms_words, tr_words, autojunk=False) + alignment: dict[int, int] = {} + for ms_start, tr_start, length in matcher.get_matching_blocks(): + for k in range(length): + alignment[ms_start + k] = tr_start + k + + return ms_words, marker_positions, alignment -def _fuzzy_match_ratio( - phrase_words: list[str], +def _timestamp_for_ms_word( + word_idx: int, + alignment: dict[int, int], + ms_len: int, transcription: list[TranscribedWord], - start_idx: int, - window_size: int = 10, -) -> tuple[float, int, int]: +) -> tuple[float, float]: """ - Calculate how many words from phrase match the transcription at start_idx. + Map a manuscript word index to a transcript timestamp and confidence. - Words are matched sequentially: each phrase word must appear at or after - the position of the previous match. This prevents false matches where - phrase words appear out of order or far into the window. - - Returns (ratio, first_match_offset, last_match_end_offset) where offsets - are relative to start_idx. last_match_end_offset points past the last - matched word. 
+ Confidence levels: + 1.0 — direct alignment hit + 0.8 — a nearby word (within 5 forward) was aligned + 0.5 — interpolated between two surrounding anchors + 0.3 — extrapolated from the nearest anchor (past the last or before the first) + 0.0 — no alignment data (timestamp is -1.0) """ - if not phrase_words: - return 0.0, 0, 0 + if not transcription or not alignment: + return -1.0, 0.0 - words_to_check = min(len(phrase_words), window_size) - transcript_end = min( - start_idx + words_to_check + 5, len(transcription) - ) # +5 for flexibility (speaker may add filler words) + word_idx = min(word_idx, ms_len) - if start_idx >= len(transcription): - return 0.0, 0, 0 + if word_idx in alignment: + return transcription[alignment[word_idx]].start, 1.0 - transcript_words = [ - _normalize_token(transcription[j].word) - for j in range(start_idx, transcript_end) - ] + for delta in range(1, 6): + idx = word_idx + delta + if idx in alignment: + return transcription[alignment[idx]].start, 0.8 - # Match phrase words sequentially against transcript window - matches = 0 - words_checked = 0 - t_pos = 0 # Current search position in transcript window - first_match_offset = 0 - last_match_end_offset = 0 + before = max((m for m in alignment if m < word_idx), default=None) + after = min((m for m in alignment if m > word_idx), default=None) - for phrase_word in phrase_words[:words_to_check]: - normalized = _normalize_token(phrase_word) - if len(normalized) < 2: - continue # skip very short words (a, I, etc.) 
- don't count them - words_checked += 1 + if before is not None and after is not None: + t_b, t_a = alignment[before], alignment[after] + ratio = (word_idx - before) / (after - before) + t_idx = round(t_b + ratio * (t_a - t_b)) + t_idx = max(0, min(t_idx, len(transcription) - 1)) + return transcription[t_idx].start, 0.5 - # Search forward from current position (preserves word order) - for j in range(t_pos, len(transcript_words)): - t_word = transcript_words[j] - matched = False - # Exact match - if normalized == t_word: - matched = True - # Allow substring match for words 4+ chars (handles plurals, tenses) - elif len(normalized) >= 4 and len(t_word) >= 4: - if normalized in t_word or t_word in normalized: - matched = True + if before is not None: + return transcription[alignment[before]].end, 0.3 - if matched: - if matches == 0: - first_match_offset = j - matches += 1 - last_match_end_offset = j + 1 - t_pos = j + 1 # Next word must appear after this one - break + if after is not None: + return transcription[alignment[after]].start, 0.3 - ratio = matches / words_checked if words_checked > 0 else 0.0 - return ratio, first_match_offset, last_match_end_offset - - -def _find_phrase_timestamp( - phrase: str, - transcription: list[TranscribedWord], - start_from: int = 0, - fuzzy_threshold: float = 0.5, -) -> tuple[int, float, float, int]: - """ - Find a phrase in the transcription using fuzzy matching. - - Returns (word_index, timestamp, confidence, match_end_idx) or - (-1, -1.0, 0.0, -1) if not found. word_index points to the first - matched word. match_end_idx points past the last matched word. - """ - # Normalize each word individually — same method as transcript tokens. - # This keeps contractions as single tokens ("haven't" stays "haven't") so - # phrase and transcript word counts stay in sync. 
Using _normalize_text on - # the whole phrase would expand "haven't" → "have not" (2 words), creating - # a phantom "not" that fails to match the transcript and corrupts the window. - phrase_words = [tok for tok in (_normalize_token(w) for w in phrase.split()) if tok] - - if not phrase_words: - return -1, -1.0, 0.0, -1 - - best_idx = -1 - best_ratio = 0.0 - best_first_offset = 0 - best_end_offset = 0 - - # Slide through transcription looking for best match - for i in range(start_from, len(transcription)): - ratio, first_offset, end_offset = _fuzzy_match_ratio( - phrase_words, transcription, i - ) - if ratio > best_ratio: - best_ratio = ratio - best_idx = i - best_first_offset = first_offset - best_end_offset = end_offset - - # If we found a very good match, stop early - if ratio >= 0.95: - break - - if best_ratio >= fuzzy_threshold and best_idx >= 0: - # Use the actual first matched word position for the timestamp, - # not the window start position - actual_idx = best_idx + best_first_offset - match_end_idx = best_idx + best_end_offset - return actual_idx, transcription[actual_idx].start, best_ratio, match_end_idx - - return -1, -1.0, 0.0, -1 + return -1.0, 0.0 def align_markers_to_transcription( @@ -390,82 +287,57 @@ def align_markers_to_transcription( fuzzy_threshold: float = 0.6, ) -> list[MarkerTiming]: """ - Align manuscript markers to transcription timestamps using fuzzy matching. + Align manuscript markers to transcription timestamps using global sequence alignment. - This is the core alignment function that matches markers in manuscript.txt - to their corresponding timecodes in the whisper transcription. + Builds a word-level alignment between the manuscript (markers stripped) and the + Whisper transcript using difflib.SequenceMatcher. Ad-libbed words in the + transcript appear as insertions and don't disrupt alignment of surrounding + manuscript text. - Unknown markers are filtered out - they aren't pronounced and shouldn't - be in the render plan. 
Note: [cite:...] markers are stripped at parse time. + Unknown markers are filtered out — they aren't pronounced and shouldn't be in + the render plan. Note: [cite:...] markers are stripped at parse time. Args: manuscript_text: Full manuscript with [S1], [video:xxx], etc. - transcription: Word-level timestamps from whisper - slides: Slide definitions (to identify valid slide markers) - videos: Video definitions (to identify valid video markers) - audio: Audio definitions (to identify valid audio markers) - fuzzy_threshold: Minimum match ratio (default 0.6 = 60% of words) + transcription: Word-level timestamps from Whisper + slides: Slide definitions (to identify valid slide markers) + videos: Video definitions (to identify valid video markers) + audio: Audio definitions (to identify valid audio markers) + fuzzy_threshold: Kept for API compatibility; unused in alignment logic Returns: List of MarkerTiming with timestamps and confidence (known markers only) """ - contexts = _extract_marker_contexts(manuscript_text, slides, videos, audio) + if not transcription: + return [] + + ms_words, marker_positions, alignment = _build_sequence_alignment( + manuscript_text, transcription, slides, videos, audio + ) + ms_len = len(ms_words) timings: list[MarkerTiming] = [] - last_idx = 0 - last_end_time = 0.0 # Track end time of last matched phrase - - for marker_id, following_text, is_borrowed in contexts: - # If no text (empty context), place 1 second after the previous marker/phrase - # This handles markers like [video:xxx] that appear after text - if not following_text.strip(): - # Use 1 second after the previous end time - marker_time = last_end_time + 1.0 - timings.append( - MarkerTiming( - marker_id=marker_id, - timestamp=marker_time, - context="(after previous)", - confidence=1.0, - ) - ) - # Update last_end_time so subsequent markers without text continue to offset - last_end_time = marker_time - continue - - idx, timestamp, confidence, match_end_idx = 
_find_phrase_timestamp( - following_text, - transcription, - start_from=last_idx, - fuzzy_threshold=fuzzy_threshold, + for marker_id, word_idx in marker_positions: + context = " ".join(ms_words[word_idx: word_idx + 10]) + timestamp, confidence = _timestamp_for_ms_word( + word_idx, alignment, ms_len, transcription ) - - if idx >= 0: - # Apply offset: marker should appear slightly before the words + if timestamp >= 0: adjusted_time = max(0.0, timestamp - 0.5) timings.append( MarkerTiming( marker_id=marker_id, timestamp=adjusted_time, - context=following_text[:50], + context=context[:50], confidence=confidence, ) ) - # Only advance last_idx if this marker owns its text (not borrowed) - # If borrowed, the next marker needs to match the same text - if not is_borrowed: - last_idx = match_end_idx - # Calculate end time of this phrase for markers with no text - if last_idx > 0 and last_idx <= len(transcription): - last_end_time = transcription[last_idx - 1].end - else: - last_end_time = transcription[-1].end if transcription else 0.0 else: timings.append( MarkerTiming( marker_id=marker_id, timestamp=-1.0, - context=following_text[:50], + context=context[:50], confidence=0.0, ) )