Adding fix to aligner

This commit is contained in:
2026-05-10 13:46:50 +02:00
parent 2dff8f45b9
commit 0c2d097cdf
3 changed files with 109 additions and 231 deletions
Executable
+4
View File
@@ -0,0 +1,4 @@
#!/bin/bash
# Launch the `claude` CLI with --resume and a hard-coded session ID.
# NOTE(review): presumably this reopens that specific saved session; the ID is
# machine/user specific, so confirm this script is meant to be committed.
claude --resume b0382a18-067d-4420-9c67-9c19b5034453
+6 -4
View File
@@ -744,11 +744,13 @@ def _generate_slides_json(directory: Path, verbose: bool) -> None:
# Sort by slide number
sorted_slides = dict(sorted(slides.items(), key=lambda x: int(x[0][1:])))
# Write slides.json
# Write slides.json only if content changed
output_path = directory / "slides.json"
new_content = json.dumps(sorted_slides, indent=2)
existing_content = output_path.read_text(encoding="utf-8") if output_path.exists() else None
if new_content != existing_content:
with open(output_path, "w", encoding="utf-8") as f:
json.dump(sorted_slides, f, indent=2)
f.write(new_content)
print(f" Generated {output_path} ({len(sorted_slides)} slides)")
if verbose:
for slide_id in sorted_slides:
@@ -950,7 +952,7 @@ def _import_narration_segments(narration_dir: Path, config, verbose: bool) -> No
added_count += 1
print(f" Added narration segment: {segment_id} (from raw_mov)")
# Always write narration.json (creates it if missing)
if added_count > 0 or not narration_json_path.exists():
with open(narration_json_path, "w", encoding="utf-8") as f:
json.dump(existing_narration, f, indent=2)
+92 -220
View File
@@ -1,5 +1,6 @@
"""Transform stage: resolve timings and build render plan."""
import difflib
import re
import string
from dataclasses import dataclass
@@ -177,208 +178,104 @@ def _strip_unknown_markers(
return re.sub(r"\[([^\]]+)\]", "", text)
def _extract_marker_contexts(
def _build_sequence_alignment(
manuscript_text: str,
transcription: list[TranscribedWord],
slides: dict = None,
videos: dict = None,
audio: dict = None,
) -> list[tuple[str, str]]:
) -> tuple[list[str], list[tuple[str, int]], dict[int, int]]:
"""
Extract known markers and the text immediately following them from manuscript.
Build a global word-level alignment between manuscript and transcription.
Unknown markers are filtered out and stripped from following text.
Note: [cite:...] markers are already stripped at parse time.
Strips markers from the manuscript to produce a plain word sequence, then
uses difflib.SequenceMatcher to align it against the transcript word list.
Ad-libbed words in the transcript appear as insertions and don't break the
alignment of surrounding manuscript text.
Returns list of (marker_id, following_text) tuples for known markers only.
Returns:
ms_words: normalized manuscript word list (markers stripped)
marker_positions: list of (marker_id, word_idx) in manuscript order,
where word_idx is the index of the first following word
alignment: dict mapping manuscript_word_idx → transcript_word_idx
"""
slides = slides or {}
videos = videos or {}
audio = audio or {}
# Split by markers, keeping the markers — broad pattern handles any content
# including paths with / and - (e.g. [vfb:pexels/7670835-uhd_3840_2160_30fps])
parts = re.split(r"\[([^\]]+)\]", manuscript_text)
ms_words: list[str] = []
marker_positions: list[tuple[str, int]] = []
# parts: [text_before, marker1, text_after1, marker2, text_after2, ...]
raw_contexts = []
for i in range(1, len(parts), 2):
marker_id = parts[i]
# Skip unknown markers entirely
if not _is_known_marker(marker_id, slides, videos, audio):
continue
# Collect all following text, looking past unknown markers until the
# next known marker. This handles [S1][segment:1] text... where the
# text lives two parts ahead rather than immediately after S1.
text_pieces = []
j = i + 1
while j < len(parts):
chunk = parts[j].strip()
if chunk:
text_pieces.append(chunk)
j += 1 # advance to the marker after this text chunk
if j >= len(parts):
break
if _is_known_marker(parts[j], slides, videos, audio):
break # stop at the next known marker
j += 1 # skip the unknown marker; its following text is next
following_text = " ".join(text_pieces)
following_text = " ".join(following_text.split()) # collapse whitespace
following_text = _strip_unknown_markers(following_text, slides, videos, audio)
following_text = " ".join(following_text.split())
raw_contexts.append((marker_id, following_text))
# For markers with no following text (consecutive markers), look ahead
# Return (marker_id, following_text, is_borrowed) - is_borrowed=True means text came from look-ahead
contexts = []
for i, (marker_id, following_text) in enumerate(raw_contexts):
if following_text:
# Take first ~10 words for matching
words = following_text.split()[:10]
contexts.append((marker_id, " ".join(words), False))
for i, part in enumerate(parts):
if i % 2 == 0:
text = _strip_unknown_markers(part, slides, videos, audio)
for w in text.split():
norm = _normalize_token(w)
if norm:
ms_words.append(norm)
else:
# Look ahead for next marker with text, but never borrow from another
# slide marker — slides must align independently to avoid two consecutive
# slides matching the same transcription position simultaneously.
borrowed = False
for j in range(i + 1, len(raw_contexts)):
next_marker_id, next_text = raw_contexts[j]
if next_text:
if next_marker_id in (slides or {}):
break # Slide owns this text; give up borrowing
words = next_text.split()[:10]
contexts.append((marker_id, " ".join(words), True)) # Borrowed
borrowed = True
break
if not borrowed:
contexts.append((marker_id, "", False))
marker_id = part
if _is_known_marker(marker_id, slides, videos, audio):
marker_positions.append((marker_id, len(ms_words)))
return contexts
tr_words = [_normalize_token(tw.word) for tw in transcription]
matcher = difflib.SequenceMatcher(None, ms_words, tr_words, autojunk=False)
alignment: dict[int, int] = {}
for ms_start, tr_start, length in matcher.get_matching_blocks():
for k in range(length):
alignment[ms_start + k] = tr_start + k
return ms_words, marker_positions, alignment
def _fuzzy_match_ratio(
phrase_words: list[str],
def _timestamp_for_ms_word(
word_idx: int,
alignment: dict[int, int],
ms_len: int,
transcription: list[TranscribedWord],
start_idx: int,
window_size: int = 10,
) -> tuple[float, int, int]:
) -> tuple[float, float]:
"""
Calculate how many words from phrase match the transcription at start_idx.
Map a manuscript word index to a transcript timestamp and confidence.
Words are matched sequentially: each phrase word must appear at or after
the position of the previous match. This prevents false matches where
phrase words appear out of order or far into the window.
Returns (ratio, first_match_offset, last_match_end_offset) where offsets
are relative to start_idx. last_match_end_offset points past the last
matched word.
Confidence levels:
1.0 — direct alignment hit
0.8 — a nearby word (within 5 forward) was aligned
0.5 — interpolated between two surrounding anchors
0.3 — extrapolated from the nearest anchor (past the last one, or before the first)
0.0 — no alignment data
"""
if not phrase_words:
return 0.0, 0, 0
if not transcription or not alignment:
return -1.0, 0.0
words_to_check = min(len(phrase_words), window_size)
transcript_end = min(
start_idx + words_to_check + 5, len(transcription)
) # +5 for flexibility (speaker may add filler words)
word_idx = min(word_idx, ms_len)
if start_idx >= len(transcription):
return 0.0, 0, 0
if word_idx in alignment:
return transcription[alignment[word_idx]].start, 1.0
transcript_words = [
_normalize_token(transcription[j].word)
for j in range(start_idx, transcript_end)
]
for delta in range(1, 6):
idx = word_idx + delta
if idx in alignment:
return transcription[alignment[idx]].start, 0.8
# Match phrase words sequentially against transcript window
matches = 0
words_checked = 0
t_pos = 0 # Current search position in transcript window
first_match_offset = 0
last_match_end_offset = 0
before = max((m for m in alignment if m < word_idx), default=None)
after = min((m for m in alignment if m > word_idx), default=None)
for phrase_word in phrase_words[:words_to_check]:
normalized = _normalize_token(phrase_word)
if len(normalized) < 2:
continue # skip very short words (a, I, etc.) - don't count them
words_checked += 1
if before is not None and after is not None:
t_b, t_a = alignment[before], alignment[after]
ratio = (word_idx - before) / (after - before)
t_idx = round(t_b + ratio * (t_a - t_b))
t_idx = max(0, min(t_idx, len(transcription) - 1))
return transcription[t_idx].start, 0.5
# Search forward from current position (preserves word order)
for j in range(t_pos, len(transcript_words)):
t_word = transcript_words[j]
matched = False
# Exact match
if normalized == t_word:
matched = True
# Allow substring match for words 4+ chars (handles plurals, tenses)
elif len(normalized) >= 4 and len(t_word) >= 4:
if normalized in t_word or t_word in normalized:
matched = True
if before is not None:
return transcription[alignment[before]].end, 0.3
if matched:
if matches == 0:
first_match_offset = j
matches += 1
last_match_end_offset = j + 1
t_pos = j + 1 # Next word must appear after this one
break
if after is not None:
return transcription[alignment[after]].start, 0.3
ratio = matches / words_checked if words_checked > 0 else 0.0
return ratio, first_match_offset, last_match_end_offset
def _find_phrase_timestamp(
phrase: str,
transcription: list[TranscribedWord],
start_from: int = 0,
fuzzy_threshold: float = 0.5,
) -> tuple[int, float, float, int]:
"""
Find a phrase in the transcription using fuzzy matching.
Returns (word_index, timestamp, confidence, match_end_idx) or
(-1, -1.0, 0.0, -1) if not found. word_index points to the first
matched word. match_end_idx points past the last matched word.
"""
# Normalize each word individually — same method as transcript tokens.
# This keeps contractions as single tokens ("haven't" stays "haven't") so
# phrase and transcript word counts stay in sync. Using _normalize_text on
# the whole phrase would expand "haven't" → "have not" (2 words), creating
# a phantom "not" that fails to match the transcript and corrupts the window.
phrase_words = [tok for tok in (_normalize_token(w) for w in phrase.split()) if tok]
if not phrase_words:
return -1, -1.0, 0.0, -1
best_idx = -1
best_ratio = 0.0
best_first_offset = 0
best_end_offset = 0
# Slide through transcription looking for best match
for i in range(start_from, len(transcription)):
ratio, first_offset, end_offset = _fuzzy_match_ratio(
phrase_words, transcription, i
)
if ratio > best_ratio:
best_ratio = ratio
best_idx = i
best_first_offset = first_offset
best_end_offset = end_offset
# If we found a very good match, stop early
if ratio >= 0.95:
break
if best_ratio >= fuzzy_threshold and best_idx >= 0:
# Use the actual first matched word position for the timestamp,
# not the window start position
actual_idx = best_idx + best_first_offset
match_end_idx = best_idx + best_end_offset
return actual_idx, transcription[actual_idx].start, best_ratio, match_end_idx
return -1, -1.0, 0.0, -1
return -1.0, 0.0
def align_markers_to_transcription(
@@ -390,82 +287,57 @@ def align_markers_to_transcription(
fuzzy_threshold: float = 0.6,
) -> list[MarkerTiming]:
"""
Align manuscript markers to transcription timestamps using fuzzy matching.
Align manuscript markers to transcription timestamps using global sequence alignment.
This is the core alignment function that matches markers in manuscript.txt
to their corresponding timecodes in the whisper transcription.
Builds a word-level alignment between the manuscript (markers stripped) and the
Whisper transcript using difflib.SequenceMatcher. Ad-libbed words in the
transcript appear as insertions and don't disrupt alignment of surrounding
manuscript text.
Unknown markers are filtered out - they aren't pronounced and shouldn't
be in the render plan. Note: [cite:...] markers are stripped at parse time.
Unknown markers are filtered out — they aren't pronounced and shouldn't be in
the render plan. Note: [cite:...] markers are stripped at parse time.
Args:
manuscript_text: Full manuscript with [S1], [video:xxx], etc.
transcription: Word-level timestamps from whisper
transcription: Word-level timestamps from Whisper
slides: Slide definitions (to identify valid slide markers)
videos: Video definitions (to identify valid video markers)
audio: Audio definitions (to identify valid audio markers)
fuzzy_threshold: Minimum match ratio (default 0.6 = 60% of words)
fuzzy_threshold: Kept for API compatibility; unused in alignment logic
Returns:
List of MarkerTiming with timestamps and confidence (known markers only)
"""
contexts = _extract_marker_contexts(manuscript_text, slides, videos, audio)
if not transcription:
return []
ms_words, marker_positions, alignment = _build_sequence_alignment(
manuscript_text, transcription, slides, videos, audio
)
ms_len = len(ms_words)
timings: list[MarkerTiming] = []
last_idx = 0
last_end_time = 0.0 # Track end time of last matched phrase
for marker_id, following_text, is_borrowed in contexts:
# If no text (empty context), place 1 second after the previous marker/phrase
# This handles markers like [video:xxx] that appear after text
if not following_text.strip():
# Use 1 second after the previous end time
marker_time = last_end_time + 1.0
timings.append(
MarkerTiming(
marker_id=marker_id,
timestamp=marker_time,
context="(after previous)",
confidence=1.0,
for marker_id, word_idx in marker_positions:
context = " ".join(ms_words[word_idx: word_idx + 10])
timestamp, confidence = _timestamp_for_ms_word(
word_idx, alignment, ms_len, transcription
)
)
# Update last_end_time so subsequent markers without text continue to offset
last_end_time = marker_time
continue
idx, timestamp, confidence, match_end_idx = _find_phrase_timestamp(
following_text,
transcription,
start_from=last_idx,
fuzzy_threshold=fuzzy_threshold,
)
if idx >= 0:
# Apply offset: marker should appear slightly before the words
if timestamp >= 0:
adjusted_time = max(0.0, timestamp - 0.5)
timings.append(
MarkerTiming(
marker_id=marker_id,
timestamp=adjusted_time,
context=following_text[:50],
context=context[:50],
confidence=confidence,
)
)
# Only advance last_idx if this marker owns its text (not borrowed)
# If borrowed, the next marker needs to match the same text
if not is_borrowed:
last_idx = match_end_idx
# Calculate end time of this phrase for markers with no text
if last_idx > 0 and last_idx <= len(transcription):
last_end_time = transcription[last_idx - 1].end
else:
last_end_time = transcription[-1].end if transcription else 0.0
else:
timings.append(
MarkerTiming(
marker_id=marker_id,
timestamp=-1.0,
context=following_text[:50],
context=context[:50],
confidence=0.0,
)
)