Adding updates to gnommo

This commit is contained in:
2026-05-11 08:23:21 +02:00
parent 0c2d097cdf
commit b9376cd650
4 changed files with 375 additions and 113 deletions
+2 -1
View File
@@ -1,4 +1,5 @@
#!/bin/bash #!/bin/bash
claude --resume b0382a18-067d-4420-9c67-9c19b5034453 claude --resume df8f915f-0f99-4e0f-b345-3562a49fcb06
+1 -1
View File
@@ -1992,7 +1992,6 @@ def cmd_stitch(
# Create/update narration_combined entry # Create/update narration_combined entry
existing_videos["narration_combined"] = { existing_videos["narration_combined"] = {
"source_file": "narration_combined.mov", "source_file": "narration_combined.mov",
"output_file": "narration_combined.mov",
"cutout": cutout, "cutout": cutout,
"always_visible": True, "always_visible": True,
"volume": 1.0, "volume": 1.0,
@@ -3038,6 +3037,7 @@ _RSYNC_EXCLUDES = [
"media/videos/intermediate/**", "media/videos/intermediate/**",
"media/narration/processed/", "media/narration/processed/",
"media/narration/processed/**", "media/narration/processed/**",
"media/videos/narration_combined.mov",
# Chunk scratch directories # Chunk scratch directories
"**/chunks/", "**/chunks/",
"**/chunks/**", "**/chunks/**",
+103
View File
@@ -720,3 +720,106 @@ def resolve_video_file(
# Direct video file reference # Direct video file reference
return ref_path, None return ref_path, None
def resolve_missing_videos(
    missing_ids: list[str],
    project_path: Path,
    config: Optional[ProjectConfig] = None,
) -> dict[str, VideoSource]:
    """
    Resolve video IDs missing from the project's videos.json via shared_assets.

    Each ID in ``missing_ids`` is looked up in ``shared_assets/videos.json``
    (searched first inside ``project_path``, then next to it). When a match is
    found, the entry is written back into the project's videos.json with
    ``is_shared: true`` so subsequent runs find it without another lookup.

    Args:
        missing_ids: Video IDs referenced by the project but not defined in it.
        project_path: Root directory of the project.
        config: Optional project config; when it carries a ``videos_path`` that
            path (relative to ``project_path``) is used instead of
            ``videos.json``.

    Returns:
        A dict of newly resolved VideoSource objects (only the ones found).
        IDs that aren't in the shared library are silently ignored; shared
        entries that are not objects or lack ``source_file`` are skipped with
        a warning.

    The project's videos.json is only rewritten when it could be read (or does
    not exist yet). If it exists but cannot be parsed, it is left untouched so
    that a syntax error in the file never causes its contents to be replaced
    by just the shared entries; the resolved videos are still returned for the
    current run.
    """
    if not missing_ids:
        return {}

    # Locate shared_assets: inside the project first, then beside it.
    shared_dir: Optional[Path] = None
    if (project_path / "shared_assets").exists():
        shared_dir = project_path / "shared_assets"
    elif (project_path.parent / "shared_assets").exists():
        shared_dir = project_path.parent / "shared_assets"
    if shared_dir is None:
        return {}

    shared_videos_path = shared_dir / "videos.json"
    if not shared_videos_path.exists():
        return {}

    try:
        shared_data = _read_json(shared_videos_path)
    except (json.JSONDecodeError, OSError):
        return {}
    # The lookups below index by video ID, so anything but an object is unusable.
    if not isinstance(shared_data, dict):
        return {}

    found = {vid_id for vid_id in missing_ids if vid_id in shared_data}
    if not found:
        return {}

    # Load the project's videos.json so we can append to it
    if config and config.videos_path:
        local_videos_path = project_path / config.videos_path
    else:
        local_videos_path = project_path / "videos.json"

    # can_persist stays True only when rewriting the file cannot lose data:
    # either the file does not exist yet, or it was read successfully.
    can_persist = True
    local_data: dict = {}
    if local_videos_path.exists():
        try:
            loaded = _read_json(local_videos_path)
        except (json.JSONDecodeError, OSError) as e:
            can_persist = False
            print(f" Warning: could not read videos.json, leaving it unchanged: {e}")
        else:
            if isinstance(loaded, dict):
                local_data = loaded
            else:
                can_persist = False
                print(" Warning: videos.json is not a JSON object, leaving it unchanged")

    resolved: dict[str, VideoSource] = {}
    for video_id in sorted(found):
        raw_entry = shared_data[video_id]
        if not isinstance(raw_entry, dict) or "source_file" not in raw_entry:
            print(
                f" Warning: shared video '{video_id}' has no usable definition "
                f"in shared_assets/videos.json — skipped"
            )
            continue

        # Copy so the shared library's in-memory data is not mutated.
        entry = dict(raw_entry)
        entry["is_shared"] = True
        local_data[video_id] = entry

        # Build the in-memory VideoSource
        attribution = None
        if "attribution" in entry:
            attr = entry["attribution"]
            attribution = Attribution(
                source=attr.get("source", "unknown"),
                creator=attr.get("creator", "Unknown"),
                url=attr.get("url"),
            )

        raw_duration = entry.get("duration")
        raw_has_audio = entry.get("has_audio")
        resolved[video_id] = VideoSource(
            source_file=entry["source_file"],
            filter=entry.get("filter", []),
            output_file=entry.get("output_file"),
            take=entry.get("take"),
            skip=float(entry.get("skip", 0.0)),
            zoom=float(entry.get("zoom", 1.0)),
            cutout=entry.get("cutout"),
            always_visible=bool(entry.get("always_visible", False)),
            is_shared=True,
            pause_narration=float(entry.get("pause_narration", 0)),
            attribution=attribution,
            use_audio_channels=entry.get("use_audio_channels", "both"),
            defer_loudnorm=bool(entry.get("defer_loudnorm", False)),
            volume=float(entry.get("volume", 1.0)),
            layer=entry.get("layer", "above"),
            duration=float(raw_duration) if raw_duration is not None else None,
            has_audio=bool(raw_has_audio) if raw_has_audio is not None else None,
            end_on=entry.get("end_on"),
        )

    if not resolved or not can_persist:
        return resolved

    try:
        with open(local_videos_path, "w", encoding="utf-8") as fh:
            json.dump(local_data, fh, indent=4)
            fh.write("\n")
    except OSError as e:
        print(f" Warning: could not update videos.json: {e}")
    else:
        # Only claim the copy once the file has actually been written.
        for video_id in resolved:
            print(f" → Copied shared video '{video_id}' into videos.json (is_shared=true)")

    return resolved
+269 -111
View File
@@ -1,6 +1,5 @@
"""Transform stage: resolve timings and build render plan.""" """Transform stage: resolve timings and build render plan."""
import difflib
import re import re
import string import string
from dataclasses import dataclass from dataclasses import dataclass
@@ -23,7 +22,7 @@ from .models import (
VideoEvent, VideoEvent,
VideoSource, VideoSource,
) )
from .parser import get_video_duration from .parser import get_video_duration, resolve_missing_videos
from .transcriber import TranscribedWord from .transcriber import TranscribedWord
# Audio trigger offset: play sound this many seconds before the marker # Audio trigger offset: play sound this many seconds before the marker
@@ -178,104 +177,182 @@ def _strip_unknown_markers(
return re.sub(r"\[([^\]]+)\]", "", text) return re.sub(r"\[([^\]]+)\]", "", text)
def _build_sequence_alignment( def _extract_marker_contexts(
manuscript_text: str, manuscript_text: str,
transcription: list[TranscribedWord],
slides: dict = None, slides: dict = None,
videos: dict = None, videos: dict = None,
audio: dict = None, audio: dict = None,
) -> tuple[list[str], list[tuple[str, int]], dict[int, int]]: ) -> list[tuple[str, str, bool]]:
""" """
Build a global word-level alignment between manuscript and transcription. Extract known markers and the text immediately following them from manuscript.
Strips markers from the manuscript to produce a plain word sequence, then Unknown markers are filtered out and stripped from following text.
uses difflib.SequenceMatcher to align it against the transcript word list. Note: [cite:...] markers are already stripped at parse time.
Ad-libbed words in the transcript appear as insertions and don't break the
alignment of surrounding manuscript text.
Returns: Returns list of (marker_id, following_text, is_borrowed) tuples for known markers only.
ms_words: normalized manuscript word list (markers stripped)
marker_positions: list of (marker_id, word_idx) in manuscript order,
where word_idx is the index of the first following word
alignment: dict mapping manuscript_word_idx → transcript_word_idx
""" """
slides = slides or {} slides = slides or {}
videos = videos or {} videos = videos or {}
audio = audio or {} audio = audio or {}
parts = re.split(r"\[([^\]]+)\]", manuscript_text) parts = re.split(r"\[([^\]]+)\]", manuscript_text)
ms_words: list[str] = []
marker_positions: list[tuple[str, int]] = []
for i, part in enumerate(parts): raw_contexts = []
if i % 2 == 0: for i in range(1, len(parts), 2):
text = _strip_unknown_markers(part, slides, videos, audio) marker_id = parts[i]
for w in text.split():
norm = _normalize_token(w) if not _is_known_marker(marker_id, slides, videos, audio):
if norm: continue
ms_words.append(norm)
text_pieces = []
j = i + 1
while j < len(parts):
chunk = parts[j].strip()
if chunk:
text_pieces.append(chunk)
j += 1
if j >= len(parts):
break
if _is_known_marker(parts[j], slides, videos, audio):
break
j += 1
following_text = " ".join(text_pieces)
following_text = " ".join(following_text.split())
following_text = _strip_unknown_markers(following_text, slides, videos, audio)
following_text = " ".join(following_text.split())
raw_contexts.append((marker_id, following_text))
contexts = []
for i, (marker_id, following_text) in enumerate(raw_contexts):
if following_text:
words = following_text.split()[:10]
contexts.append((marker_id, " ".join(words), False))
else: else:
marker_id = part borrowed = False
if _is_known_marker(marker_id, slides, videos, audio): for j in range(i + 1, len(raw_contexts)):
marker_positions.append((marker_id, len(ms_words))) next_marker_id, next_text = raw_contexts[j]
if next_text:
if next_marker_id in (slides or {}):
break
words = next_text.split()[:10]
contexts.append((marker_id, " ".join(words), True))
borrowed = True
break
if not borrowed:
contexts.append((marker_id, "", False))
tr_words = [_normalize_token(tw.word) for tw in transcription] return contexts
matcher = difflib.SequenceMatcher(None, ms_words, tr_words, autojunk=False)
alignment: dict[int, int] = {}
for ms_start, tr_start, length in matcher.get_matching_blocks():
for k in range(length):
alignment[ms_start + k] = tr_start + k
return ms_words, marker_positions, alignment
def _timestamp_for_ms_word( def _fuzzy_match_ratio(
word_idx: int, phrase_words: list[str],
alignment: dict[int, int],
ms_len: int,
transcription: list[TranscribedWord], transcription: list[TranscribedWord],
) -> tuple[float, float]: start_idx: int,
window_size: int = 10,
) -> tuple[float, int, int]:
""" """
Map a manuscript word index to a transcript timestamp and confidence. Calculate how many words from phrase match the transcription at start_idx.
Confidence levels: Words are matched sequentially: each phrase word must appear at or after
1.0 — direct alignment hit the position of the previous match. This prevents false matches where
0.8 — a nearby word (within 5 forward) was aligned phrase words appear out of order or far into the window.
0.5 — interpolated between two surrounding anchors
0.3 — extrapolated past the last anchor Returns (ratio, first_match_offset, last_match_end_offset) where offsets
0.0 — no alignment data are relative to start_idx. last_match_end_offset points past the last
matched word.
""" """
if not transcription or not alignment: if not phrase_words:
return -1.0, 0.0 return 0.0, 0, 0
word_idx = min(word_idx, ms_len) words_to_check = min(len(phrase_words), window_size)
# +30 filler allowance: absorbs ad-libbed words spoken before or between
# the manuscript cue words without breaking the match ratio.
transcript_end = min(start_idx + words_to_check + 30, len(transcription))
if word_idx in alignment: if start_idx >= len(transcription):
return transcription[alignment[word_idx]].start, 1.0 return 0.0, 0, 0
for delta in range(1, 6): transcript_words = [
idx = word_idx + delta _normalize_token(transcription[j].word)
if idx in alignment: for j in range(start_idx, transcript_end)
return transcription[alignment[idx]].start, 0.8 ]
before = max((m for m in alignment if m < word_idx), default=None) matches = 0
after = min((m for m in alignment if m > word_idx), default=None) words_checked = 0
t_pos = 0
first_match_offset = 0
last_match_end_offset = 0
if before is not None and after is not None: for phrase_word in phrase_words[:words_to_check]:
t_b, t_a = alignment[before], alignment[after] normalized = _normalize_token(phrase_word)
ratio = (word_idx - before) / (after - before) if len(normalized) < 2:
t_idx = round(t_b + ratio * (t_a - t_b)) continue
t_idx = max(0, min(t_idx, len(transcription) - 1)) words_checked += 1
return transcription[t_idx].start, 0.5
if before is not None: for j in range(t_pos, len(transcript_words)):
return transcription[alignment[before]].end, 0.3 t_word = transcript_words[j]
matched = False
if normalized == t_word:
matched = True
elif len(normalized) >= 4 and len(t_word) >= 4:
if normalized in t_word or t_word in normalized:
matched = True
if after is not None: if matched:
return transcription[alignment[after]].start, 0.3 if matches == 0:
first_match_offset = j
matches += 1
last_match_end_offset = j + 1
t_pos = j + 1
break
return -1.0, 0.0 ratio = matches / words_checked if words_checked > 0 else 0.0
return ratio, first_match_offset, last_match_end_offset
def _find_phrase_timestamp(
    phrase: str,
    transcription: list[TranscribedWord],
    start_from: int = 0,
    fuzzy_threshold: float = 0.5,
) -> tuple[int, float, float, int]:
    """
    Locate ``phrase`` in the transcription by fuzzy matching.

    Every transcript position from ``start_from`` onward is scored with
    ``_fuzzy_match_ratio``; the first position with the highest score wins,
    and scanning stops early once a score of at least 0.95 is seen.

    Returns (word_index, timestamp, confidence, match_end_idx), where
    word_index is the transcript index of the first matched word, timestamp
    is that word's start time, confidence is the winning ratio, and
    match_end_idx points one past the last matched word. Returns
    (-1, -1.0, 0.0, -1) when the phrase has no usable tokens or the best
    score is below ``fuzzy_threshold``.
    """
    not_found = (-1, -1.0, 0.0, -1)

    # Normalise the phrase and discard tokens that normalise to nothing.
    tokens = []
    for raw_word in phrase.split():
        token = _normalize_token(raw_word)
        if token:
            tokens.append(token)
    if not tokens:
        return not_found

    top_ratio = 0.0
    top_start = -1
    top_first = 0
    top_end = 0

    for candidate in range(start_from, len(transcription)):
        score, first_off, end_off = _fuzzy_match_ratio(tokens, transcription, candidate)
        if score > top_ratio:
            top_ratio, top_start = score, candidate
            top_first, top_end = first_off, end_off
            # A near-perfect match will not be improved on; stop scanning.
            if score >= 0.95:
                break

    if top_start < 0 or top_ratio < fuzzy_threshold:
        return not_found

    # Offsets are relative to the winning window start; convert to absolute.
    first_word_idx = top_start + top_first
    return (
        first_word_idx,
        transcription[first_word_idx].start,
        top_ratio,
        top_start + top_end,
    )
def align_markers_to_transcription( def align_markers_to_transcription(
@@ -287,12 +364,14 @@ def align_markers_to_transcription(
fuzzy_threshold: float = 0.6, fuzzy_threshold: float = 0.6,
) -> list[MarkerTiming]: ) -> list[MarkerTiming]:
""" """
Align manuscript markers to transcription timestamps using global sequence alignment. Align manuscript markers to transcription timestamps using fuzzy phrase matching.
Builds a word-level alignment between the manuscript (markers stripped) and the For each known marker, extracts the text immediately following it in the
Whisper transcript using difflib.SequenceMatcher. Ad-libbed words in the manuscript and searches for that phrase in the Whisper transcript. Markers are
transcript appear as insertions and don't disrupt alignment of surrounding matched in manuscript order, each starting its search after the previous match.
manuscript text.
The filler-word window is intentionally large (+30 words) so that ad-libbed
words spoken before or between the manuscript cue words do not prevent a match.
Unknown markers are filtered out — they aren't pronounced and shouldn't be in Unknown markers are filtered out — they aren't pronounced and shouldn't be in
the render plan. Note: [cite:...] markers are stripped at parse time. the render plan. Note: [cite:...] markers are stripped at parse time.
@@ -303,46 +382,91 @@ def align_markers_to_transcription(
slides: Slide definitions (to identify valid slide markers) slides: Slide definitions (to identify valid slide markers)
videos: Video definitions (to identify valid video markers) videos: Video definitions (to identify valid video markers)
audio: Audio definitions (to identify valid audio markers) audio: Audio definitions (to identify valid audio markers)
fuzzy_threshold: Kept for API compatibility; unused in alignment logic fuzzy_threshold: Minimum match ratio (default 0.6 = 60% of words must match)
Returns: Returns:
List of MarkerTiming with timestamps and confidence (known markers only) List of MarkerTiming with timestamps and confidence (known markers only)
""" """
if not transcription: contexts = _extract_marker_contexts(manuscript_text, slides, videos, audio)
return []
ms_words, marker_positions, alignment = _build_sequence_alignment(
manuscript_text, transcription, slides, videos, audio
)
ms_len = len(ms_words)
timings: list[MarkerTiming] = [] timings: list[MarkerTiming] = []
for marker_id, word_idx in marker_positions: last_idx = 0
context = " ".join(ms_words[word_idx: word_idx + 10]) last_end_time = 0.0
timestamp, confidence = _timestamp_for_ms_word(
word_idx, alignment, ms_len, transcription for marker_id, following_text, is_borrowed in contexts:
if not following_text.strip():
marker_time = last_end_time + 1.0
timings.append(
MarkerTiming(
marker_id=marker_id,
timestamp=marker_time,
context="(after previous)",
confidence=1.0,
)
)
last_end_time = marker_time
continue
idx, timestamp, confidence, match_end_idx = _find_phrase_timestamp(
following_text,
transcription,
start_from=last_idx,
fuzzy_threshold=fuzzy_threshold,
) )
if timestamp >= 0:
if idx >= 0:
adjusted_time = max(0.0, timestamp - 0.5) adjusted_time = max(0.0, timestamp - 0.5)
timings.append( timings.append(
MarkerTiming( MarkerTiming(
marker_id=marker_id, marker_id=marker_id,
timestamp=adjusted_time, timestamp=adjusted_time,
context=context[:50], context=following_text[:50],
confidence=confidence, confidence=confidence,
) )
) )
if not is_borrowed:
last_idx = match_end_idx
if last_idx > 0 and last_idx <= len(transcription):
last_end_time = transcription[last_idx - 1].end
else:
last_end_time = transcription[-1].end if transcription else 0.0
else: else:
timings.append( timings.append(
MarkerTiming( MarkerTiming(
marker_id=marker_id, marker_id=marker_id,
timestamp=-1.0, timestamp=-1.0,
context=context[:50], context=following_text[:50],
confidence=0.0, confidence=0.0,
) )
) )
return timings # Deduplicate slide markers. The manuscript pattern [SN]\n\n[SN] text... is
# common: the first blank occurrence is a visual-transition cue and the second
# carries the narration text used for alignment. We keep the first entry in
# order (preserving manuscript position) but upgrade its timestamp to the
# best-matched value found for that ID, then drop subsequent duplicates.
slides_set = set(slides or {})
seen: dict[str, int] = {} # marker_id → index in deduped list
deduped: list[MarkerTiming] = []
for timing in timings:
if timing.marker_id not in slides_set:
deduped.append(timing)
continue
if timing.marker_id not in seen:
seen[timing.marker_id] = len(deduped)
deduped.append(timing)
else:
prev_idx = seen[timing.marker_id]
prev = deduped[prev_idx]
if prev.context == "(after previous)" and timing.context != "(after previous)":
deduped[prev_idx] = MarkerTiming(
marker_id=prev.marker_id,
timestamp=timing.timestamp,
context=timing.context,
confidence=timing.confidence,
)
return deduped
def build_render_plan( def build_render_plan(
@@ -453,7 +577,25 @@ def build_render_plan(
time_range=(time_offset, render_end_time) if slide_range else None, time_range=(time_offset, render_end_time) if slide_range else None,
) )
video_events = _extract_video_events( # Before extracting video events, resolve any referenced videos that are missing
# from the project's videos.json by looking them up in shared_assets/videos.json.
_VIDEO_MARKER_PREFIXES = (
"video:", "narration:", "vft:", "vfb:", "vst:", "vsb:",
"vftp:", "vfbp:", "vstp:", "vsbp:",
)
missing_video_ids = [
timing.marker_id[len(prefix):]
for timing in marker_timings
if timing.timestamp >= 0
for prefix in _VIDEO_MARKER_PREFIXES
if timing.marker_id.startswith(prefix)
and timing.marker_id[len(prefix):] not in videos
]
if missing_video_ids:
found = resolve_missing_videos(missing_video_ids, project_path, config)
videos.update(found)
video_events, video_warnings = _extract_video_events(
marker_timings, marker_timings,
videos, videos,
config.cutouts, config.cutouts,
@@ -461,6 +603,12 @@ def build_render_plan(
effective_duration, effective_duration,
time_range=(time_offset, render_end_time) if slide_range else None, time_range=(time_offset, render_end_time) if slide_range else None,
) )
if video_warnings:
import sys
print("\nWarnings:", file=sys.stderr)
for w in video_warnings:
print(f"{w}", file=sys.stderr)
print("", file=sys.stderr)
# Track cached files for triggered videos # Track cached files for triggered videos
for event in video_events: for event in video_events:
@@ -726,13 +874,16 @@ def _extract_video_events(
slides: dict[str, SlideDefinition], slides: dict[str, SlideDefinition],
total_duration: float, total_duration: float,
time_range: Optional[tuple[float, float]] = None, time_range: Optional[tuple[float, float]] = None,
) -> list[VideoEvent]: ) -> tuple[list[VideoEvent], list[str]]:
""" """
Extract video events from aligned marker timings. Extract video events from aligned marker timings.
- [video:xxx] events end at the next SLIDE marker - [video:xxx] events end at the next SLIDE marker
- [narration:xxx] events run until end - [narration:xxx] events run until end
Returns (events, warnings). Invalid markers are skipped and reported in warnings.
""" """
warnings: list[str] = []
range_start, range_end = time_range if time_range else (0.0, float("inf")) range_start, range_end = time_range if time_range else (0.0, float("inf"))
# Collect slide times for video: end time calculation # Collect slide times for video: end time calculation
@@ -772,17 +923,18 @@ def _extract_video_events(
if shorthand_match: if shorthand_match:
video_id = mid[len(shorthand_match) :] video_id = mid[len(shorthand_match) :]
if video_id not in videos: if video_id not in videos:
raise ValueError( warnings.append(
f"Marker [{mid}] references unknown video '{video_id}'. " f"[{mid}] references unknown video '{video_id}' — skipped. "
f"Add it to videos.json or remove the marker." f"Add it to videos.json or remove the marker."
) )
continue
implied_cutout, implied_layer = _SHORTHAND[shorthand_match] implied_cutout, implied_layer = _SHORTHAND[shorthand_match]
if implied_cutout not in cutouts: if implied_cutout not in cutouts:
raise ValueError( warnings.append(
f"Marker [{mid}] uses shorthand '{shorthand_match}' which requires " f"[{mid}] requires cutout '{implied_cutout}' which is not defined in project config — skipped. "
f"cutout '{implied_cutout}' but it is not defined in project config. "
f"Available cutouts: {list(cutouts.keys())}" f"Available cutouts: {list(cutouts.keys())}"
) )
continue
video_markers.append( video_markers.append(
(timing.timestamp, video_id, "video", implied_cutout, implied_layer) (timing.timestamp, video_id, "video", implied_cutout, implied_layer)
) )
@@ -792,20 +944,23 @@ def _extract_video_events(
if mid.startswith("video:"): if mid.startswith("video:"):
video_id = mid[6:] video_id = mid[6:]
if video_id not in videos: if video_id not in videos:
raise ValueError( warnings.append(
f"Marker [video:{video_id}] references unknown video '{video_id}'. " f"[video:{video_id}] references unknown video '{video_id}' — skipped. "
f"Add it to videos.json or remove the marker." f"Add it to videos.json or remove the marker."
) )
continue
video_source = videos[video_id] video_source = videos[video_id]
if not video_source.cutout: if not video_source.cutout:
raise ValueError( warnings.append(
f"Marker [video:{video_id}] — video '{video_id}' has no 'cutout' set in videos.json." f"[video:{video_id}] has no 'cutout' set in videos.json — skipped."
) )
continue
if video_source.cutout not in cutouts: if video_source.cutout not in cutouts:
raise ValueError( warnings.append(
f"Marker [video:{video_id}] cutout '{video_source.cutout}' is not defined in project config. " f"[video:{video_id}] cutout '{video_source.cutout}' is not defined in project config — skipped. "
f"Available: {list(cutouts.keys())}" f"Available: {list(cutouts.keys())}"
) )
continue
video_markers.append((timing.timestamp, video_id, "video", None, None)) video_markers.append((timing.timestamp, video_id, "video", None, None))
continue continue
@@ -813,20 +968,23 @@ def _extract_video_events(
if mid.startswith("narration:"): if mid.startswith("narration:"):
video_id = mid[10:] video_id = mid[10:]
if video_id not in videos: if video_id not in videos:
raise ValueError( warnings.append(
f"Marker [narration:{video_id}] references unknown video '{video_id}'. " f"[narration:{video_id}] references unknown video '{video_id}' — skipped. "
f"Add it to videos.json or remove the marker." f"Add it to videos.json or remove the marker."
) )
continue
video_source = videos[video_id] video_source = videos[video_id]
if not video_source.cutout: if not video_source.cutout:
raise ValueError( warnings.append(
f"Marker [narration:{video_id}] — video '{video_id}' has no 'cutout' set in videos.json." f"[narration:{video_id}] has no 'cutout' set in videos.json — skipped."
) )
continue
if video_source.cutout not in cutouts: if video_source.cutout not in cutouts:
raise ValueError( warnings.append(
f"Marker [narration:{video_id}] cutout '{video_source.cutout}' is not defined in project config. " f"[narration:{video_id}] cutout '{video_source.cutout}' is not defined in project config — skipped. "
f"Available: {list(cutouts.keys())}" f"Available: {list(cutouts.keys())}"
) )
continue
video_markers.append((timing.timestamp, video_id, "narration", None, None)) video_markers.append((timing.timestamp, video_id, "narration", None, None))
events: list[VideoEvent] = [] events: list[VideoEvent] = []
@@ -880,7 +1038,7 @@ def _extract_video_events(
) )
) )
return events return events, warnings
def _extract_audio_events( def _extract_audio_events(