# Source: gnommo/gnommo/transformer.py (Python, 1286 lines, 45 KiB; 2026-05-12 08:07:12 +02:00)

"""Transform stage: resolve timings and build render plan."""
import re
import string
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from .models import (
AudioDefinition,
AudioEvent,
CameraEvent,
CameraState,
CutoutDefinition,
CAMERA_PRESETS,
NarrationPause,
OutroEvent,
ProjectConfig,
RenderPlan,
SlideDefinition,
SlideEvent,
VideoEvent,
VideoSource,
)
from .parser import get_video_duration, resolve_missing_videos
from .transcriber import TranscribedWord
# Audio trigger offset: an audio cue starts this many seconds before its
# marker's aligned timestamp (the consumer clamps the result at 0).
AUDIO_OFFSET_SECONDS = 1.0
@dataclass
class MarkerTiming:
    """Alignment result for a single manuscript marker.

    Attributes:
        marker_id: The marker text as written between the brackets.
        timestamp: Aligned time in seconds; -1 when the marker was not found.
        context: The text following the marker (the anchor used for matching).
        confidence: How confident the match is, from 0 to 1.
    """

    marker_id: str
    timestamp: float
    context: str
    confidence: float
def _normalize_text(text: str) -> str:
"""Normalize text for matching (lowercase, expand contractions, remove punctuation)."""
text = text.lower()
# Expand common contractions before removing punctuation
# This ensures "I'm" matches "I am" in transcripts
contractions = {
"i'm": "i am",
"you're": "you are",
"we're": "we are",
"they're": "they are",
"he's": "he is",
"she's": "she is",
"it's": "it is",
"that's": "that is",
"what's": "what is",
"there's": "there is",
"here's": "here is",
"who's": "who is",
"how's": "how is",
"let's": "let us",
"i've": "i have",
"you've": "you have",
"we've": "we have",
"they've": "they have",
"i'd": "i would",
"you'd": "you would",
"he'd": "he would",
"she'd": "she would",
"we'd": "we would",
"they'd": "they would",
"i'll": "i will",
"you'll": "you will",
"he'll": "he will",
"she'll": "she will",
"we'll": "we will",
"they'll": "they will",
"isn't": "is not",
"aren't": "are not",
"wasn't": "was not",
"weren't": "were not",
"haven't": "have not",
"hasn't": "has not",
"hadn't": "had not",
"won't": "will not",
"wouldn't": "would not",
"don't": "do not",
"doesn't": "does not",
"didn't": "did not",
"can't": "cannot",
"couldn't": "could not",
"shouldn't": "should not",
"mightn't": "might not",
"mustn't": "must not",
}
for contraction, expansion in contractions.items():
text = re.sub(r"\b" + re.escape(contraction) + r"\b", expansion, text)
text = re.sub(r"[^\w\s]", "", text)
text = re.sub(r"\s+", " ", text)
return text.strip()
def _normalize_token(word: str) -> str:
"""Normalize a single word token for comparison.
Strips leading/trailing punctuation and lowercases. Interior characters
(e.g. apostrophes in contractions) are preserved so "don't" stays "don't".
Applied to both transcript tokens and phrase words at comparison time.
"""
return word.lower().strip(string.punctuation)
def _is_known_marker(
marker_id: str, slides: dict = None, videos: dict = None, audio: dict = None
) -> bool:
"""
Check if a marker is a known type that should be processed.
Known markers:
- Slide markers (S1, S2, etc.) - must be in slides dict
- video:xxx - video triggers
- narration:xxx - narration triggers
- Camera presets (Zoom1, TiltLeft, etc.)
- Audio markers (A1, A2, etc.)
Unknown markers are ignored (not part of the render plan).
"""
slides = slides or {}
videos = videos or {}
audio = audio or {}
# Slide markers
if marker_id in slides:
return True
# Video/narration triggers (all supported prefixes)
_VIDEO_PREFIXES = (
"video:",
"narration:",
"vft:",
"vfb:",
"vf2t:",
"vf2b:",
"vst:",
"vsb:",
"vftp:",
"vfbp:",
"vf2tp:",
"vf2bp:",
"vstp:",
"vsbp:",
)
if any(marker_id.startswith(p) for p in _VIDEO_PREFIXES):
return True
# Camera presets
if marker_id in CAMERA_PRESETS:
return True
# Audio markers (A followed by id, e.g., Awoosh) or audio: prefix (e.g., audio:woosh)
if marker_id.startswith("A") and len(marker_id) > 1:
audio_id = marker_id[1:]
if audio_id in audio or audio_id.isdigit():
return True
if marker_id.startswith("audio:") and audio is not None:
audio_id = marker_id[6:]
if audio_id in audio:
return True
return False
def _strip_unknown_markers(
text: str, slides: dict = None, videos: dict = None, audio: dict = None
) -> str:
"""
Remove all [...] markers from context text — none are pronounced aloud.
Note: [cite:...] markers are already stripped at parse time by parse_manuscript().
"""
return re.sub(r"\[([^\]]+)\]", "", text)
def _extract_marker_contexts(
manuscript_text: str,
slides: dict = None,
videos: dict = None,
audio: dict = None,
) -> list[tuple[str, str, bool, str]]:
"""
Extract known markers and the text immediately following them from manuscript.
Unknown markers are filtered out and stripped from following text.
Note: [cite:...] markers are already stripped at parse time.
Returns list of (marker_id, anchor_text, is_borrowed, anchor_type) tuples.
anchor_type is "before" (default — place before the matched phrase) or
"after" (place at the end of the matched phrase — used for markers that
trail a narration block and have no following text of their own).
"""
slides = slides or {}
videos = videos or {}
audio = audio or {}
parts = re.split(r"\[([^\]]+)\]", manuscript_text)
raw_contexts = []
for i in range(1, len(parts), 2):
marker_id = parts[i]
if not _is_known_marker(marker_id, slides, videos, audio):
continue
text_pieces = []
j = i + 1
while j < len(parts):
chunk = parts[j].strip()
if chunk:
text_pieces.append(chunk)
j += 1
if j >= len(parts):
break
if _is_known_marker(parts[j], slides, videos, audio):
break
j += 1
following_text = " ".join(text_pieces)
following_text = " ".join(following_text.split())
following_text = _strip_unknown_markers(following_text, slides, videos, audio)
following_text = " ".join(following_text.split())
raw_contexts.append((marker_id, following_text))
contexts = []
for i, (marker_id, following_text) in enumerate(raw_contexts):
if following_text:
words = following_text.split()[:10]
contexts.append((marker_id, " ".join(words), False, "before"))
else:
borrowed = False
for j in range(i + 1, len(raw_contexts)):
next_marker_id, next_text = raw_contexts[j]
if next_text:
if next_marker_id in (slides or {}):
break
words = next_text.split()[:10]
contexts.append((marker_id, " ".join(words), True, "before"))
borrowed = True
break
if not borrowed:
# No following text and blocked by a slide boundary — look
# backward for the tail of the preceding narration block and
# anchor to the END of those words instead of extrapolating.
preceding_text = ""
for k in range(i - 1, -1, -1):
if raw_contexts[k][1]:
preceding_text = raw_contexts[k][1]
break
if preceding_text:
words = preceding_text.split()
tail = " ".join(words[-6:])
contexts.append((marker_id, tail, False, "after"))
else:
contexts.append((marker_id, "", False, "before"))
return contexts
def _fuzzy_match_ratio(
phrase_words: list[str],
transcription: list[TranscribedWord],
start_idx: int,
window_size: int = 10,
pre_filler: int = 30,
inter_filler: int = 3,
) -> tuple[float, int, int]:
"""
Calculate how many words from phrase match the transcription at start_idx.
Words are matched sequentially. Two separate filler tolerances:
- pre_filler: max words before the FIRST phrase word (absorbs ad-libs)
- inter_filler: max words between consecutive phrase words (keeps the
match tight so common words don't stretch the window far
into later text, which would push last_idx past subsequent
markers' positions)
Returns (ratio, first_match_offset, last_match_end_offset) where offsets
are relative to start_idx. last_match_end_offset points past the last
matched word.
"""
if not phrase_words:
return 0.0, 0, 0
if start_idx >= len(transcription):
return 0.0, 0, 0
words_to_check = min(len(phrase_words), window_size)
# Window only needs to cover pre_filler + phrase words + inter_filler slack
transcript_end = min(start_idx + pre_filler + words_to_check + inter_filler, len(transcription))
transcript_words = [
_normalize_token(transcription[j].word)
for j in range(start_idx, transcript_end)
]
matches = 0
words_checked = 0
t_pos = 0
first_match_offset = 0
last_match_end_offset = 0
for phrase_word in phrase_words[:words_to_check]:
normalized = _normalize_token(phrase_word)
if len(normalized) < 2:
continue
words_checked += 1
# First phrase word may be preceded by a long ad-lib; subsequent words
# should appear within a few positions of each other.
if matches == 0:
search_end = min(t_pos + pre_filler + 1, len(transcript_words))
else:
search_end = min(t_pos + inter_filler + 1, len(transcript_words))
for j in range(t_pos, search_end):
t_word = transcript_words[j]
matched = False
if normalized == t_word:
matched = True
elif len(normalized) >= 4 and len(t_word) >= 4:
if normalized in t_word or t_word in normalized:
matched = True
if matched:
if matches == 0:
first_match_offset = j
matches += 1
last_match_end_offset = j + 1
t_pos = j + 1
break
ratio = matches / words_checked if words_checked > 0 else 0.0
return ratio, first_match_offset, last_match_end_offset
def _find_phrase_timestamp(
phrase: str,
transcription: list[TranscribedWord],
start_from: int = 0,
fuzzy_threshold: float = 0.5,
) -> tuple[int, float, float, int]:
"""
Find a phrase in the transcription using fuzzy matching.
Returns (word_index, timestamp, confidence, match_end_idx) or
(-1, -1.0, 0.0, -1) if not found. word_index points to the first
matched word. match_end_idx points past the last matched word.
"""
phrase_words = [tok for tok in (_normalize_token(w) for w in phrase.split()) if tok]
if not phrase_words:
return -1, -1.0, 0.0, -1
best_idx = -1
best_ratio = 0.0
best_first_offset = 0
best_end_offset = 0
for i in range(start_from, len(transcription)):
ratio, first_offset, end_offset = _fuzzy_match_ratio(
phrase_words, transcription, i
)
if ratio > best_ratio:
best_ratio = ratio
best_idx = i
best_first_offset = first_offset
best_end_offset = end_offset
# Sequential alignment: stop at the first position that clears the
# threshold. Continuing to scan the full transcript risks jumping
# to a higher-ratio match much later and skipping over subsequent
# markers' positions entirely.
if best_ratio >= fuzzy_threshold:
break
if best_ratio >= fuzzy_threshold and best_idx >= 0:
actual_idx = best_idx + best_first_offset
match_end_idx = best_idx + best_end_offset
return actual_idx, transcription[actual_idx].start, best_ratio, match_end_idx
return -1, -1.0, 0.0, -1
def align_markers_to_transcription(
    manuscript_text: str,
    transcription: list[TranscribedWord],
    slides: dict = None,
    videos: dict = None,
    audio: dict = None,
    fuzzy_threshold: float = 0.6,
) -> list[MarkerTiming]:
    """
    Give every known manuscript marker a timestamp from the transcript.

    Each marker's anchor phrase (see _extract_marker_contexts) is searched
    for in the word-level transcript. Markers are processed in manuscript
    order and every search starts where the previous non-borrowed match
    ended. Markers without any anchor text are placed one second after the
    previous marker's end time. Unknown markers never reach this stage.

    Args:
        manuscript_text: Full manuscript with [S1], [video:xxx], etc.
        transcription: Word-level timestamps from Whisper.
        slides: Slide definitions (identify valid slide markers).
        videos: Video definitions (identify valid video markers).
        audio: Audio definitions (identify valid audio markers).
        fuzzy_threshold: Minimum match ratio (0.6 = 60% of words must match).

    Returns:
        One MarkerTiming per known marker, with repeated slide ids collapsed
        into their first occurrence. Unmatched markers carry timestamp -1.0
        and confidence 0.0.
    """
    contexts = _extract_marker_contexts(manuscript_text, slides, videos, audio)

    aligned: list[MarkerTiming] = []
    cursor = 0  # transcript index at which the next search begins
    prev_end = 0.0  # end time associated with the latest placed marker

    for marker_id, anchor_text, is_borrowed, anchor_type in contexts:
        # No anchor at all: stack the marker one second after the last one.
        if not anchor_text.strip():
            prev_end = prev_end + 1.0
            aligned.append(
                MarkerTiming(
                    marker_id=marker_id,
                    timestamp=prev_end,
                    context="(after previous)",
                    confidence=1.0,
                )
            )
            continue

        idx, timestamp, confidence, match_end_idx = _find_phrase_timestamp(
            anchor_text,
            transcription,
            start_from=cursor,
            fuzzy_threshold=fuzzy_threshold,
        )

        if idx < 0:
            aligned.append(
                MarkerTiming(
                    marker_id=marker_id,
                    timestamp=-1.0,
                    context=anchor_text[:50],
                    confidence=0.0,
                )
            )
            continue

        if anchor_type == "after":
            # The marker trails a narration block, so it lands where the
            # matched words finish being spoken.
            tail_idx = min(match_end_idx - 1, len(transcription) - 1)
            prev_end = transcription[tail_idx].end if transcription else 0.0
            aligned.append(
                MarkerTiming(
                    marker_id=marker_id,
                    timestamp=prev_end,
                    context=f"(end of: {anchor_text[:40]})",
                    confidence=confidence,
                )
            )
            cursor = match_end_idx
            continue

        # Default placement: half a second ahead of the matched phrase.
        aligned.append(
            MarkerTiming(
                marker_id=marker_id,
                timestamp=max(0.0, timestamp - 0.5),
                context=anchor_text[:50],
                confidence=confidence,
            )
        )
        # A borrowed anchor belongs to a later marker, so it must not
        # consume transcript position.
        if not is_borrowed:
            cursor = match_end_idx
            if 0 < cursor <= len(transcription):
                prev_end = transcription[cursor - 1].end
            elif transcription:
                prev_end = transcription[-1].end
            else:
                prev_end = 0.0

    # Collapse repeated slide markers. The pattern [SN]\n\n[SN] text... is
    # common: the blank first occurrence is a visual-transition cue and the
    # second carries the narration used for alignment. The first entry keeps
    # its manuscript position but inherits the matched timing; later
    # duplicates are dropped.
    slide_ids = set(slides or {})
    first_seen: dict[str, int] = {}  # marker_id -> index in `unique`
    unique: list[MarkerTiming] = []
    for timing in aligned:
        if timing.marker_id not in slide_ids:
            unique.append(timing)
            continue

        slot = first_seen.get(timing.marker_id)
        if slot is None:
            first_seen[timing.marker_id] = len(unique)
            unique.append(timing)
            continue

        earlier = unique[slot]
        if (
            earlier.context == "(after previous)"
            and timing.context != "(after previous)"
        ):
            unique[slot] = MarkerTiming(
                marker_id=earlier.marker_id,
                timestamp=timing.timestamp,
                context=timing.context,
                confidence=timing.confidence,
            )

    return unique
def _apply_narration_pauses(
    slide_events: list[SlideEvent],
    video_events: list[VideoEvent],
    audio_events: list[AudioEvent],
    camera_events: list[CameraEvent],
) -> tuple[list[NarrationPause], float]:
    """Record narration pauses and push later events back by their length.

    Every video event whose source has a truthy ``pause_narration`` halts
    the narration for that long. All events positioned strictly after the
    pause are shifted later, in place.

    Returns:
        (pauses, total_pause_duration).
    """
    pause_events = sorted(
        (e for e in video_events if e.video_source.pause_narration),
        key=lambda e: e.start_time,
    )
    # Snapshot the narration-source times up front: the shifting below
    # mutates start_time on these very event objects, so reading it later
    # would yield an already-shifted (output timeline) value.
    source_times = [e.start_time for e in pause_events]

    pauses: list[NarrationPause] = []
    cumulative_offset = 0.0
    for event, narration_time in zip(pause_events, source_times):
        pause_duration = event.video_source.pause_narration
        # Position of this pause in the current, partly shifted timeline;
        # all event times are compared against this.
        shifted_time = event.start_time

        pauses.append(
            NarrationPause(
                output_time=narration_time + cumulative_offset,
                narration_time=narration_time,
                duration=pause_duration,
                video_id=event.video_id,
            )
        )

        for timed in (*slide_events, *video_events):
            if timed.start_time > shifted_time:
                timed.start_time += pause_duration
            if timed.end_time > shifted_time:
                timed.end_time += pause_duration
        for aud_event in audio_events:
            if aud_event.start_time > shifted_time:
                aud_event.start_time += pause_duration
        for cam_event in camera_events:
            if cam_event.time > shifted_time:
                cam_event.time += pause_duration

        cumulative_offset += pause_duration

    return pauses, cumulative_offset


def build_render_plan(
    project_path: Path,
    config: ProjectConfig,
    slides: dict[str, SlideDefinition],
    videos: dict[str, VideoSource],
    videos_dir: Path,
    manuscript_text: str,
    transcription: list[TranscribedWord],
    audio: Optional[dict[str, AudioDefinition]] = None,
    audio_dir: Optional[Path] = None,
    slide_range: Optional[tuple[str, Optional[str]]] = None,
) -> tuple[RenderPlan, list[MarkerTiming]]:
    """
    Build a complete render plan from manuscript and transcription.

    Aligns the manuscript markers to the transcription on the fly, converts
    them into slide/video/audio/camera events, applies partial-render
    offsets and narration pauses, and appends the outro.

    Args:
        project_path: Project root; also searched (with its parent) for a
            ``shared_assets`` directory.
        config: Project configuration (main video, cutouts, outro, slides path).
        slides: Slide definitions keyed by slide id.
        videos: Video sources keyed by video id. Updated in place with any
            referenced videos resolved via resolve_missing_videos.
        videos_dir: Directory holding the project's video files.
        manuscript_text: The manuscript.txt content (source of truth for markers).
        transcription: Word-level timestamps from whisper transcription.
        audio: Audio definitions keyed by audio id (defaults to none).
        audio_dir: Directory holding audio files (defaults to project_path).
        slide_range: Optional (start_slide, end_slide) for partial rendering;
            end_slide may be None to render through the end.

    Returns:
        Tuple of (RenderPlan, list of MarkerTiming for display).

    Raises:
        ValueError: If the main video is missing from ``videos``, or a slide
            named in ``slide_range`` has no aligned timestamp.
    """
    audio = audio or {}
    audio_dir = audio_dir or project_path

    # Find the main narration video first (its skip value adjusts timings).
    narration_video_id = config.main_video
    if isinstance(narration_video_id, list):
        narration_video_id = narration_video_id[0] if narration_video_id else None
    if not (narration_video_id and narration_video_id in videos):
        raise ValueError(
            f"Main video '{narration_video_id}' not specified or not found in videos. "
            f"Available: {list(videos.keys())}"
        )
    narration_video = videos[narration_video_id]

    # Align markers to transcription timestamps.
    marker_timings = align_markers_to_transcription(
        manuscript_text, transcription, slides=slides, videos=videos, audio=audio
    )

    # The video will start at `skip` seconds rather than 0, so shift every
    # aligned timestamp back by that amount.
    narration_skip = narration_video.skip
    if narration_skip > 0:
        for timing in marker_timings:
            if timing.timestamp >= 0:
                timing.timestamp = max(0.0, timing.timestamp - narration_skip)

    # Marker -> timestamp lookup (the last aligned occurrence of an id wins).
    marker_times: dict[str, float] = {}
    for timing in marker_timings:
        if timing.timestamp >= 0:
            marker_times[timing.marker_id] = timing.timestamp

    # Find the shared_assets directory: project first, then its parent.
    shared_assets_dir = None
    if (project_path / "shared_assets").exists():
        shared_assets_dir = project_path / "shared_assets"
    elif (project_path.parent / "shared_assets").exists():
        shared_assets_dir = project_path.parent / "shared_assets"

    cutout = config.cutouts[narration_video.cutout]

    # Track which files are loaded from the external cache.
    cached_files: set[str] = set()
    narration_videos: list[tuple[str, VideoSource, CutoutDefinition]] = []

    video_path, is_cached = _resolve_video_path(
        videos_dir, narration_video, shared_assets_dir, project_path
    )
    if is_cached:
        cached_files.add(narration_video_id)
    full_duration = get_video_duration(video_path)
    # Content starts at skip, so the usable duration is shorter.
    effective_duration = full_duration - narration_skip
    narration_videos.append((narration_video_id, narration_video, cutout))

    # Resolve the slide range to a time range.
    time_offset = 0.0
    render_end_time = effective_duration
    if slide_range:
        start_slide, end_slide = slide_range
        if start_slide not in marker_times:
            raise ValueError(
                f"Start slide '{start_slide}' not found in aligned markers"
            )
        time_offset = marker_times[start_slide]
        if end_slide:
            if end_slide not in marker_times:
                raise ValueError(
                    f"End slide '{end_slide}' not found in aligned markers"
                )
            render_end_time = marker_times[end_slide]

    time_range = (time_offset, render_end_time) if slide_range else None

    # Build events from aligned markers.
    slide_events = _extract_slide_events(
        marker_timings,
        slides,
        effective_duration,
        time_range=time_range,
    )

    # Before extracting video events, resolve any referenced videos that are
    # missing from the project's videos.json via shared_assets/videos.json.
    _VIDEO_MARKER_PREFIXES = (
        "video:", "narration:", "vft:", "vfb:", "vf2t:", "vf2b:", "vst:", "vsb:",
        "vftp:", "vfbp:", "vf2tp:", "vf2bp:", "vstp:", "vsbp:",
    )
    missing_video_ids = [
        timing.marker_id[len(prefix):]
        for timing in marker_timings
        if timing.timestamp >= 0
        for prefix in _VIDEO_MARKER_PREFIXES
        if timing.marker_id.startswith(prefix)
        and timing.marker_id[len(prefix):] not in videos
    ]
    if missing_video_ids:
        found = resolve_missing_videos(missing_video_ids, project_path, config)
        videos.update(found)

    video_events, video_warnings = _extract_video_events(
        marker_timings,
        videos,
        config.cutouts,
        slides,
        effective_duration,
        time_range=time_range,
    )
    if video_warnings:
        import sys

        print("\nWarnings:", file=sys.stderr)
        for w in video_warnings:
            print(f"{w}", file=sys.stderr)
        print("", file=sys.stderr)

    # Track cached files for triggered videos.
    for event in video_events:
        _, is_cached = _resolve_video_path(
            videos_dir, event.video_source, shared_assets_dir, project_path
        )
        if is_cached:
            cached_files.add(event.video_id)

    audio_events = _extract_audio_events(
        marker_timings,
        audio,
        time_range=time_range,
    )
    camera_events, initial_camera_state = _extract_camera_events(
        marker_timings,
        time_range=time_range,
    )

    # Apply the time offset to all events (for partial rendering).
    if time_offset > 0:
        for event in slide_events:
            event.start_time -= time_offset
            event.end_time -= time_offset
        for event in video_events:
            event.start_time -= time_offset
            event.end_time -= time_offset
        for event in audio_events:
            event.start_time = max(0, event.start_time - time_offset)
        for event in camera_events:
            event.time -= time_offset
    total_duration = render_end_time - time_offset

    # Handle narration pauses (videos that pause the narration track).
    narration_pauses, paused_total = _apply_narration_pauses(
        slide_events, video_events, audio_events, camera_events
    )
    total_duration += paused_total

    # Save the narration end time (before the outro).
    narration_end_time = total_duration

    # Resolve any outro videos missing from videos.json via shared_assets.
    if config.outro:
        missing_outro_ids = [vid_id for vid_id in config.outro if vid_id not in videos]
        if missing_outro_ids:
            found = resolve_missing_videos(missing_outro_ids, project_path, config)
            videos.update(found)
        still_missing = [vid_id for vid_id in config.outro if vid_id not in videos]
        for vid_id in still_missing:
            print(f" WARNING: outro video '{vid_id}' not found in videos.json or shared_assets — skipped", flush=True)

    # Build outro events (they play after the narration ends).
    outro_events = _extract_outro_events(
        config.outro,
        videos,
        config.cutouts,
        total_duration,
        videos_dir,
        shared_assets_dir,
        project_path,
        cached_files,
    )
    # Extend the total duration to include the outro.
    if outro_events:
        total_duration = outro_events[-1].end_time

    # Derive slides directory — lowercase path for case-sensitive filesystems (WSL/Linux).
    slides_json_path = project_path / config.slides_path.lower()
    slides_dir = slides_json_path.parent

    plan = RenderPlan(
        project_path=project_path,
        config=config,
        slide_events=slide_events,
        total_duration=total_duration,
        slides=slides,
        videos=videos,
        video_events=video_events,
        narration_videos=narration_videos,
        slides_dir=slides_dir,
        videos_dir=videos_dir,
        audio_events=audio_events,
        audio=audio,
        audio_dir=audio_dir,
        camera_events=camera_events,
        time_offset=time_offset,
        initial_camera_state=initial_camera_state,
        input_seek_time=time_offset,
        shared_assets_dir=shared_assets_dir,
        narration_pauses=narration_pauses,
        outro_events=outro_events,
        narration_end_time=narration_end_time,
        cached_files=cached_files,
    )
    return plan, marker_timings
def _resolve_video_path(
    videos_dir: Path,
    video_source: VideoSource,
    shared_assets_dir: Path = None,
    project_path: Path = None,
) -> tuple[Path, bool]:
    """Pick the file to use for a video source, with cache fallback.

    Lookup order inside the base directory (``shared_assets_dir`` for shared
    sources when it is available, otherwise ``videos_dir``):
    1. the source's ``output_file``,
    2. the same path with a ``.mov`` suffix,
    3. the source's ``source_file`` (returned even if it does not exist).

    When ``project_path`` is given, each candidate goes through
    resolve_with_cache; otherwise only the local path is checked.

    Returns:
        Tuple of (resolved_path, is_cached) where is_cached=True if the file
        was found in the external cache.
    """
    from .cache import resolve_with_cache

    use_shared = video_source.is_shared and shared_assets_dir
    base_dir = shared_assets_dir if use_shared else videos_dir

    if video_source.output_file:
        rendered = base_dir / video_source.output_file

        def candidates():
            yield rendered
            yield rendered.with_suffix(".mov")

        for candidate in candidates():
            if project_path:
                located, from_cache = resolve_with_cache(candidate, project_path)
                if located.exists():
                    return located, from_cache
            elif candidate.exists():
                return candidate, False

    fallback = base_dir / video_source.source_file
    if project_path:
        return resolve_with_cache(fallback, project_path)
    return fallback, False
def _extract_slide_events(
marker_timings: list[MarkerTiming],
slides: dict[str, SlideDefinition],
total_duration: float,
time_range: Optional[tuple[float, float]] = None,
) -> list[SlideEvent]:
"""Extract slide events from aligned marker timings.
Each slide starts at its own marker timestamp and ends when the next
slide's marker appears. Before the first slide, no slide is shown.
Slides that could not be aligned (timestamp < 0) have their position
interpolated evenly between the surrounding aligned slides rather than
being excluded.
"""
range_start, range_end = time_range if time_range else (0.0, float("inf"))
# Get ALL slide markers in manuscript order (aligned and unaligned)
all_slide_markers: list[tuple[float, str]] = []
for timing in marker_timings:
if timing.marker_id in slides:
all_slide_markers.append((timing.timestamp, timing.marker_id))
if not all_slide_markers:
return []
# Interpolate timestamps for unaligned slides (timestamp < 0).
# For each run of consecutive unaligned slides, spread them evenly between
# the nearest aligned slides before and after in manuscript order.
n = len(all_slide_markers)
resolved: list[tuple[float, str]] = list(all_slide_markers)
i = 0
while i < n:
if resolved[i][0] < 0:
run_start = i
while i < n and resolved[i][0] < 0:
i += 1
run_end = i # exclusive
prev_time = resolved[run_start - 1][0] if run_start > 0 else 0.0
next_time = resolved[run_end][0] if run_end < n else total_duration
count = run_end - run_start
for j, idx in enumerate(range(run_start, run_end)):
frac = (j + 1) / (count + 1)
resolved[idx] = (
prev_time + (next_time - prev_time) * frac,
resolved[idx][1],
)
else:
i += 1
events: list[SlideEvent] = []
for i, (marker_time, marker_id) in enumerate(resolved):
# First slide always starts at 0 — it's the opening state of the presentation.
start_time = 0.0 if i == 0 else marker_time
# End time is when the NEXT slide's marker appears, or end of video
if i + 1 < len(resolved):
end_time = resolved[i + 1][0]
else:
end_time = total_duration
# Filter by time range
if end_time <= range_start or start_time >= range_end:
continue
start_time = max(start_time, range_start)
end_time = min(end_time, range_end)
events.append(
SlideEvent(
slide_id=marker_id,
start_time=start_time,
end_time=end_time,
slide_def=slides[marker_id],
)
)
return events
def _extract_video_events(
marker_timings: list[MarkerTiming],
videos: dict[str, VideoSource],
cutouts: dict[str, CutoutDefinition],
slides: dict[str, SlideDefinition],
total_duration: float,
time_range: Optional[tuple[float, float]] = None,
) -> tuple[list[VideoEvent], list[str]]:
"""
Extract video events from aligned marker timings.
- [video:xxx] events end at the next SLIDE marker
- [narration:xxx] events run until end
Returns (events, warnings). Invalid markers are skipped and reported in warnings.
"""
warnings: list[str] = []
range_start, range_end = time_range if time_range else (0.0, float("inf"))
# Collect slide times for video: end time calculation
slide_times: list[float] = sorted(
[
t.timestamp
for t in marker_timings
if t.marker_id in slides and t.timestamp >= 0
]
)
# Mapping from shorthand marker prefix → (implied_cutout_name, implied_layer)
# These are the defaults; videos.json values act as a base but the marker wins.
_SHORTHAND: dict[str, tuple[str, str]] = {
"vft:": ("fullscreen", "above"),
"vfb:": ("fullscreen", "below"),
"vf2t:": ("fullscreen2", "above"),
"vf2b:": ("fullscreen2", "below"),
"vst:": ("square", "above"),
"vsb:": ("square", "below"),
"vftp:": ("fullscreen", "above", "pause_narration"),
"vfbp:": ("fullscreen", "below", "pause_narration"),
"vf2tp:": ("fullscreen2", "above", "pause_narration"),
"vf2bp:": ("fullscreen2", "below", "pause_narration"),
"vstp:": ("square", "above", "pause_narration"),
"vsbp:": ("square", "below", "pause_narration"),
}
# Collect video markers: (time, video_id, event_type, cutout_name_override, layer_override)
# event_type is "video" (ends at next slide) or "narration" (runs to end)
video_markers: list[tuple[float, str, str, str | None, str | None]] = []
for timing in marker_timings:
if timing.timestamp < 0:
continue
mid = timing.marker_id
# --- shorthand markers: vft/vfb/vst/vsb ---
shorthand_match = next((p for p in _SHORTHAND if mid.startswith(p)), None)
if shorthand_match:
video_id = mid[len(shorthand_match) :]
if video_id not in videos:
warnings.append(
f"[{mid}] references unknown video '{video_id}' — skipped. "
f"Add it to videos.json or remove the marker."
)
continue
implied_cutout, implied_layer = _SHORTHAND[shorthand_match]
if implied_cutout not in cutouts:
warnings.append(
f"[{mid}] requires cutout '{implied_cutout}' which is not defined in project config — skipped. "
f"Available cutouts: {list(cutouts.keys())}"
)
continue
video_markers.append(
(timing.timestamp, video_id, "video", implied_cutout, implied_layer)
)
continue
# --- legacy [video:xxx] ---
if mid.startswith("video:"):
video_id = mid[6:]
if video_id not in videos:
warnings.append(
f"[video:{video_id}] references unknown video '{video_id}' — skipped. "
f"Add it to videos.json or remove the marker."
)
continue
video_source = videos[video_id]
if not video_source.cutout:
warnings.append(
f"[video:{video_id}] has no 'cutout' set in videos.json — skipped."
)
continue
if video_source.cutout not in cutouts:
warnings.append(
f"[video:{video_id}] cutout '{video_source.cutout}' is not defined in project config — skipped. "
f"Available: {list(cutouts.keys())}"
)
continue
video_markers.append((timing.timestamp, video_id, "video", None, None))
continue
# --- [narration:xxx] ---
if mid.startswith("narration:"):
video_id = mid[10:]
if video_id not in videos:
warnings.append(
f"[narration:{video_id}] references unknown video '{video_id}' — skipped. "
f"Add it to videos.json or remove the marker."
)
continue
video_source = videos[video_id]
if not video_source.cutout:
warnings.append(
f"[narration:{video_id}] has no 'cutout' set in videos.json — skipped."
)
continue
if video_source.cutout not in cutouts:
warnings.append(
f"[narration:{video_id}] cutout '{video_source.cutout}' is not defined in project config — skipped. "
f"Available: {list(cutouts.keys())}"
)
continue
video_markers.append((timing.timestamp, video_id, "narration", None, None))
events: list[VideoEvent] = []
for (
start_time,
video_id,
marker_type,
cutout_override,
layer_override,
) in video_markers:
video_source = videos[video_id]
# Resolve cutout: marker override > videos.json cutout
# (validation already ensured cutout exists — this is a safety assertion)
cutout_name = cutout_override or video_source.cutout
cutout = cutouts[cutout_name]
# Resolve layer: marker override > videos.json layer
layer = layer_override if layer_override is not None else video_source.layer
end_on = video_source.end_on
if end_on == "take" and video_source.take is not None:
end_time = start_time + video_source.take
elif end_on == "end":
end_time = total_duration
elif end_on == "next_slide" or (end_on is None and marker_type == "video"):
# End at next slide marker
end_time = total_duration
for slide_time in slide_times:
if slide_time > start_time:
end_time = slide_time
break
else:
# end_on is None and marker_type == "narration": runs to end
end_time = total_duration
# Filter by time range
if start_time < range_start or start_time >= range_end:
continue
end_time = min(end_time, range_end)
events.append(
VideoEvent(
video_id=video_id,
start_time=start_time,
end_time=end_time,
video_source=video_source,
cutout=cutout,
cutout_name=cutout_name,
layer=layer,
)
)
return events, warnings
def _extract_audio_events(
marker_timings: list[MarkerTiming],
audio: dict[str, AudioDefinition],
time_range: Optional[tuple[float, float]] = None,
) -> list[AudioEvent]:
"""Extract audio events from aligned marker timings."""
range_start, range_end = time_range if time_range else (0.0, float("inf"))
events: list[AudioEvent] = []
for timing in marker_timings:
if timing.timestamp < 0:
continue
marker_id = timing.marker_id
audio_id = None
if marker_id.startswith("A") and len(marker_id) > 1:
audio_id = marker_id[1:]
elif marker_id.startswith("audio:"):
audio_id = marker_id[6:]
if audio_id is not None and audio_id in audio:
if timing.timestamp < range_start or timing.timestamp >= range_end:
continue
start_time = max(0, timing.timestamp - AUDIO_OFFSET_SECONDS)
events.append(
AudioEvent(
audio_id=audio_id,
start_time=start_time,
audio_def=audio[audio_id],
)
)
return events
def _extract_camera_events(
    marker_timings: list[MarkerTiming],
    time_range: Optional[tuple[float, float]] = None,
) -> tuple[list[CameraEvent], CameraState]:
    """
    Extract camera events from aligned marker timings.

    Camera state is cumulative: a Zoom*/Tilt*/Pan* marker replaces only its
    own component(s) of the current state, "Reset" and "NoTilt" return to
    the default state, and any other preset replaces the state wholesale.

    Args:
        marker_timings: Aligned markers; entries with timestamp < 0 and
            markers that are not in CAMERA_PRESETS are ignored.
        time_range: Optional (start, end); only markers with
            start <= timestamp < end produce events.

    Returns:
        (events, initial_state), where initial_state is the camera state
        accumulated from all markers that precede the start of the range.
    """
    range_start, range_end = time_range if time_range else (0.0, float("inf"))
    events: list[CameraEvent] = []
    current_state = CameraState()
    initial_state = CameraState()
    found_range_start = False

    for timing in marker_timings:
        if timing.timestamp < 0:
            continue
        marker_id = timing.marker_id
        if marker_id not in CAMERA_PRESETS:
            continue
        preset = CAMERA_PRESETS[marker_id]

        # Work out the state this marker leads to.
        if marker_id in ("Reset", "NoTilt"):
            new_state = CameraState()
        elif marker_id.startswith("Zoom"):
            new_state = CameraState(
                zoom=preset.zoom,
                rotation=current_state.rotation,
                pan_x=current_state.pan_x,
                pan_y=current_state.pan_y,
                focal_x=current_state.focal_x,
                focal_y=current_state.focal_y,
            )
        elif marker_id.startswith("Tilt"):
            new_state = CameraState(
                zoom=current_state.zoom,
                rotation=preset.rotation,
                pan_x=current_state.pan_x,
                pan_y=current_state.pan_y,
                focal_x=current_state.focal_x,
                focal_y=current_state.focal_y,
            )
        elif marker_id.startswith("Pan"):
            new_state = CameraState(
                zoom=current_state.zoom,
                rotation=current_state.rotation,
                pan_x=preset.pan_x,
                pan_y=preset.pan_y,
                focal_x=current_state.focal_x,
                focal_y=current_state.focal_y,
            )
        else:
            new_state = preset

        # The first marker at or after the range start fixes the initial
        # state: whatever had accumulated before it.
        if not found_range_start and timing.timestamp >= range_start:
            initial_state = current_state
            found_range_start = True

        # Only emit events within range.
        if range_start <= timing.timestamp < range_end:
            events.append(
                CameraEvent(
                    time=timing.timestamp,
                    target_state=new_state,
                    duration=0.2,
                    easing="ease-out",
                )
            )
        current_state = new_state

    # Every camera marker came before the range (or there were none): the
    # render must still begin in the accumulated state, not the default one.
    if not found_range_start:
        initial_state = current_state
    return events, initial_state
def _extract_outro_events(
outro_video_ids: list[str],
videos: dict[str, VideoSource],
cutouts: dict[str, CutoutDefinition],
narration_end_time: float,
videos_dir: Path,
shared_assets_dir: Path = None,
project_path: Path = None,
cached_files: set = None,
) -> list[OutroEvent]:
"""
Extract outro events that play after the narration ends.
Outro videos play in sequence, starting from narration_end_time.
Each video plays for its `take` duration (or full source duration if no take).
"""
events: list[OutroEvent] = []
current_time = narration_end_time
for video_id in outro_video_ids:
if video_id not in videos:
continue
video_source = videos[video_id]
# Get the video duration
video_path, is_cached = _resolve_video_path(
videos_dir, video_source, shared_assets_dir, project_path
)
if is_cached and cached_files is not None:
cached_files.add(video_id)
if video_path.exists():
full_duration = get_video_duration(video_path)
else:
full_duration = 10.0 # Fallback
# Use take if specified, otherwise use full duration
duration = video_source.take if video_source.take is not None else full_duration
# Account for skip
duration = max(0, duration)
# Resolve cutout (None = fullscreen)
cutout = None
if video_source.cutout and video_source.cutout in cutouts:
cutout = cutouts[video_source.cutout]
events.append(
OutroEvent(
video_id=video_id,
start_time=current_time,
end_time=current_time + duration,
video_source=video_source,
cutout=cutout,
)
)
current_time += duration
return events