Files

1411 lines
54 KiB
Python

"""Load stage: generate and execute FFmpeg commands."""
import math
import subprocess
from pathlib import Path
from .errors import RenderError
from .parser import _read_json
from .preprocessor import _resolve_auto_channel
from .models import (
AudioEvent,
CameraEvent,
CameraState,
CutoutDefinition,
RenderPlan,
SlideEvent,
SLIDE_LAYOUTS,
VideoEvent,
VideoSource,
)
from .preprocessor import run_ffmpeg_with_progress
def _get_audio_duration(audio_path: Path) -> float:
    """Return the duration of an audio file in seconds, as reported by ffprobe.

    For ``.mp3`` files the first audio stream is probed with
    ``-count_packets`` and its ``duration`` field is used when it is a
    positive number. For every other format, or when the MP3 stream probe
    yields nothing usable, the container-level ``format=duration`` is used.

    Args:
        audio_path: Path to the audio file to probe.

    Returns:
        Duration in seconds.

    Raises:
        RenderError: If the fallback ffprobe call fails or prints something
            that is not a number (e.g. ``N/A``).
    """
    if audio_path.suffix.lower() == ".mp3":
        # NOTE(review): -count_packets makes ffprobe read every packet; this is
        # presumably what keeps the stream duration trustworthy for MP3s
        # without a Xing/VBRI header — confirm. It is slower than a header read.
        cmd = [
            "ffprobe",
            "-v",
            "error",
            "-count_packets",
            "-show_entries",
            "stream=nb_read_packets,duration",
            "-select_streams",
            "a:0",
            "-of",
            # Keep the "key=" prefixes so duration cannot be confused with
            # nb_read_packets when duration is reported as N/A.
            "default=noprint_wrappers=1",
            str(audio_path),
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 0:
            for line in result.stdout.splitlines():
                key, sep, value = line.strip().partition("=")
                if not sep or key != "duration":
                    continue
                try:
                    seconds = float(value)
                except ValueError:
                    continue  # e.g. "N/A"
                if seconds > 0:
                    return seconds

    # Generic path: container-level duration.
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        str(audio_path),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RenderError(f"Failed to get duration for {audio_path}: {result.stderr}")
    raw = result.stdout.strip()
    try:
        return float(raw)
    except ValueError as exc:
        # ffprobe succeeded but printed nothing numeric (empty output or N/A).
        raise RenderError(
            f"Failed to get duration for {audio_path}: unexpected ffprobe output {raw!r}"
        ) from exc
def _build_crossfade_loop_filter(
input_label: str,
output_label: str,
audio_duration: float,
overlap: float,
needed_duration: float,
volume: float,
delay_ms: int,
) -> list[str]:
"""
Build FFmpeg filter chain for crossfade looping.
Creates a seamless loop by overlapping copies of the audio with fade in/out.
Each loop iteration crossfades with the next for `overlap` seconds.
Args:
input_label: Input stream label (e.g., "[0:a]")
output_label: Output stream label (e.g., "[aud0]")
audio_duration: Duration of the source audio in seconds
overlap: Crossfade overlap duration in seconds
needed_duration: Total duration needed
volume: Volume multiplier
delay_ms: Initial delay in milliseconds
Returns:
List of filter strings to append to the filter_complex
"""
filters = []
loop_len = audio_duration - overlap
# Calculate number of loop iterations needed (add 1 extra for safety)
n_loops = math.ceil(needed_duration / loop_len) + 1
# Limit to reasonable number of loops to avoid filter complexity explosion
n_loops = min(n_loops, 100)
if n_loops <= 1:
# Single play, no looping needed
filters.append(
f"{input_label}atrim=0:{needed_duration:.3f},"
f"asetpts=PTS-STARTPTS,"
f"adelay={delay_ms}|{delay_ms},"
f"volume={volume:.2f}{output_label}"
)
return filters
# Split input into n_loops copies
split_labels = [f"[xfloop_{output_label[1:-1]}_{i}]" for i in range(n_loops)]
filters.append(f"{input_label}asplit={n_loops}{''.join(split_labels)}")
# Process each copy with appropriate delay and fades
mix_labels = []
for i in range(n_loops):
copy_label = split_labels[i]
out_label = f"[xfl_{output_label[1:-1]}_{i}]"
mix_labels.append(out_label)
loop_delay = i * loop_len
total_delay_ms = delay_ms + int(loop_delay * 1000)
# Build filter chain for this copy
chain_parts = []
# Fade in at start (except first copy)
if i > 0:
chain_parts.append(f"afade=t=in:d={overlap:.3f}")
# Fade out at end (for overlap with next copy)
# Calculate fade start time
fade_out_start = audio_duration - overlap
if fade_out_start > 0:
chain_parts.append(f"afade=t=out:st={fade_out_start:.3f}:d={overlap:.3f}")
chain_parts.append(f"adelay={total_delay_ms}|{total_delay_ms}")
chain_parts.append(f"volume={volume:.2f}")
filter_chain = ",".join(chain_parts)
filters.append(f"{copy_label}{filter_chain}{out_label}")
# Mix all copies together, then trim to needed duration
filters.append(
f"{''.join(mix_labels)}amix=inputs={n_loops}:duration=longest:normalize=0,"
f"atrim=0:{needed_duration + delay_ms/1000:.3f},"
f"asetpts=PTS-STARTPTS{output_label}"
)
return filters
def render(plan: RenderPlan, output_path: Path, verbose: bool = False) -> None:
    """Render the final video by running the FFmpeg command for *plan*.

    The command comes from ``build_ffmpeg_command`` and is executed through
    ``run_ffmpeg_with_progress`` so the user sees a progress bar.

    Args:
        plan: Render plan describing inputs, events and output settings.
        output_path: Destination file; its parent directory is created
            if missing.
        verbose: When true, print the full FFmpeg command before running it.

    Raises:
        RenderError: If FFmpeg exits with a non-zero return code.
    """
    # The output directory must exist before FFmpeg tries to write into it.
    output_path.parent.mkdir(parents=True, exist_ok=True)

    cmd = build_ffmpeg_command(plan, output_path)
    command_line = " ".join(cmd)

    if verbose:
        # Header, the command itself, then one blank line.
        print("FFmpeg command:", command_line, "", sep="\n")

    # Progress bar / ETA are driven by the plan's total duration.
    outcome = run_ffmpeg_with_progress(
        cmd, duration=plan.total_duration, description="Rendering"
    )
    if outcome.returncode == 0:
        return

    raise RenderError(
        "FFmpeg rendering failed",
        command=command_line,
        stderr=outcome.stderr,
    )
def _resolve_video_path(
    videos_dir: Path,
    video_source: VideoSource,
    shared_assets_dir: Path = None,
    project_path: Path = None,
) -> Path:
    """Resolve the actual file to feed FFmpeg for a video source.

    Lookup order:

    1. ``video_source.output_file`` in the base directory, then in the base
       directory's parent. For each location the ``.webm`` variant of the
       name is tried as well, since preprocessing outputs compressed WebM
       (alpha channel support) instead of ProRes.
    2. ``video_source.source_file`` in the base directory.
    3. ``PlaceholderVideo.mp4`` in ``shared_assets_dir`` (with a warning on
       stderr) so that FFmpeg does not crash on a missing input.

    The base directory is ``shared_assets_dir`` when ``video_source.is_shared``
    is true and a shared directory was given, otherwise ``videos_dir``.
    When ``project_path`` is provided every candidate goes through
    ``resolve_with_cache`` (gnommocache fallback) before the existence check.

    Args:
        videos_dir: Project-local videos directory.
        video_source: Source definition (``output_file``, ``source_file``,
            ``is_shared`` are read).
        shared_assets_dir: Optional shared assets directory.
        project_path: Optional project root used for cache resolution.

    Returns:
        The first existing candidate. If nothing exists and no placeholder is
        available, the (non-existent) resolved source path is returned.
    """
    from .cache import resolve_with_cache

    def _existing(candidate: Path):
        """Return where *candidate* actually lives, or None if it is missing."""
        if project_path:
            located, _ = resolve_with_cache(candidate, project_path)
            return located if located.exists() else None
        return candidate if candidate.exists() else None

    # Shared sources live in the shared assets directory when one is known.
    if video_source.is_shared and shared_assets_dir:
        base_dir = shared_assets_dir
    else:
        base_dir = videos_dir

    if video_source.output_file:
        for candidate_dir in (base_dir, base_dir.parent):
            video_path = candidate_dir / video_source.output_file
            # Exact name first, then the WebM variant of the same name.
            for candidate in (video_path, video_path.with_suffix(".webm")):
                found = _existing(candidate)
                if found is not None:
                    return found

    # Fall back to the original source file.
    source_path = base_dir / video_source.source_file
    if project_path:
        resolved, _ = resolve_with_cache(source_path, project_path)
    else:
        resolved = source_path
    if resolved.exists():
        return resolved

    # File not found anywhere — substitute PlaceholderVideo if we have one.
    if shared_assets_dir:
        placeholder = shared_assets_dir / "PlaceholderVideo.mp4"
        if project_path:
            placeholder, _ = resolve_with_cache(placeholder, project_path)
        if placeholder.exists():
            import sys

            print(
                f" Warning: {video_source.source_file} not found — using PlaceholderVideo",
                file=sys.stderr,
            )
            return placeholder

    # Nothing usable: hand back the missing path and let the caller/FFmpeg report it.
    return resolved
def _has_audio_stream(video_path: Path) -> bool:
    """Report whether *video_path* has a usable first audio stream.

    ffprobe is asked for the index and ``nb_frames`` of stream ``a:0`` in CSV
    form. ``-analyzeduration 0`` and a small ``-probesize`` are passed to
    avoid the slow stream-info scan that occurs when an MP4 declares an audio
    track that contains no actual frames.

    Returns:
        False when ffprobe prints nothing (no audio stream, or the probe
        failed) or when ``nb_frames`` is exactly ``0`` (a "ghost" track that
        is declared but has no sample data); True otherwise.
    """
    probe_cmd = [
        "ffprobe",
        "-v",
        "error",
        "-analyzeduration",
        "0",
        "-probesize",
        "1000000",
        "-select_streams",
        "a:0",
        "-show_entries",
        "stream=index,nb_frames",
        "-of",
        "csv=p=0",
        str(video_path),
    ]
    probe = subprocess.run(probe_cmd, capture_output=True, text=True)

    fields = probe.stdout.strip()
    if not fields:
        return False

    # The line is either "index" or "index,nb_frames".
    columns = fields.split(",")
    is_ghost_track = len(columns) >= 2 and columns[1].strip() == "0"
    return not is_ghost_track
def _build_audio_channel_filter(use_audio_channels: str) -> str:
"""Build ffmpeg audio filter for channel selection.
Args:
use_audio_channels: "both", "left", or "right"
Returns:
Filter string (e.g., "pan=mono|c0=c1") or empty string for "both"
"""
if use_audio_channels == "left":
return "pan=mono|c0=c0"
elif use_audio_channels == "right":
return "pan=mono|c0=c1"
return "" # "both" - no filter needed
def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
    """Build the complete FFmpeg command as a list of arguments.

    Inputs are added in a fixed order, and the index of each one is recorded
    so the filter graph can refer to it:

    1. narration ("always visible") videos,
    2. the background (optional; looked up by handle in
       ``shared_assets/videos.json``),
    3. slide images (one input per distinct slide id),
    4. triggered videos (one input per event, since skip/loop differ),
    5. outro videos,
    6. audio files (one input per distinct audio id).

    After the inputs come ``-filter_complex`` (from ``build_filter_complex``),
    the stream mapping and the encoder/output settings.

    Args:
        plan: Render plan to turn into a command.
        output_path: Destination file (resolved to an absolute path).

    Returns:
        The argv list, starting with ``"ffmpeg"``.

    Raises:
        RenderError: If a background handle is configured but
            ``videos.json``, the handle, or the background file is missing.
    """
    from .cache import get_ffmpeg_thread_count, resolve_with_cache

    cmd = ["ffmpeg", "-y"]  # -y to overwrite output

    # Global thread limits before any -i. Without this, each format=rgba conversion
    # in the filter graph (one per video layer) spawns one swscaler thread per CPU core,
    # causing OOM on Apple Silicon where av_cpu_count() returns 10-11.
    _tc = str(get_ffmpeg_thread_count())
    cmd.extend(["-threads", _tc, "-filter_threads", _tc])

    # Skip stream analysis — codec params are in the container header, and
    # durations are already known (plan / pre-probed metadata). Without this,
    # FFmpeg reads 100MB+ of compressed data per input at 4K bitrates before
    # encoding starts ("Estimating duration from bitrate").
    fast_probe = ["-analyzeduration", "0", "-probesize", "1000"]

    # Resolve paths to absolute
    project_path = plan.project_path.resolve()
    output_path = output_path.resolve()
    videos_dir = plan.videos_dir.resolve() if plan.videos_dir else project_path
    shared_assets_dir = (
        plan.shared_assets_dir.resolve() if plan.shared_assets_dir else None
    )

    # Track input indices
    input_idx = 0

    # Input: always_visible videos (like talking head)
    # -ss goes BEFORE -i so it seeks the input (skip and/or partial rendering).
    always_visible_inputs: list[int] = []
    for _video_id, video_source, _cutout in plan.narration_videos:
        video_path = _resolve_video_path(
            videos_dir, video_source, shared_assets_dir, project_path
        )
        # Combine video skip setting with partial render offset. skip may be
        # unset, so treat a missing value as 0 like the other input loops do.
        total_seek = (video_source.skip or 0.0) + plan.input_seek_time
        if total_seek > 0:
            cmd.extend(["-ss", f"{total_seek:.3f}"])
        cmd.extend(fast_probe)
        cmd.extend(["-i", str(video_path)])
        always_visible_inputs.append(input_idx)
        input_idx += 1

    # Input: background — resolved via handle in shared_assets/videos.json
    bg_handle = plan.config.background
    has_background = bool(bg_handle)
    bg_idx = None
    bg_is_image = False
    if has_background:
        # The background is always looked up next to the project. Keep this in
        # its own variable so a shared assets dir configured on the plan is
        # not overwritten for the video lookups below.
        bg_assets_dir = project_path.parent / "shared_assets"
        if shared_assets_dir is None:
            # No shared dir on the plan: later lookups use this default.
            shared_assets_dir = bg_assets_dir
        videos_json_bg = bg_assets_dir / "videos.json"
        if not videos_json_bg.exists():
            raise RenderError(
                f"shared_assets/videos.json not found (needed for background handle '{bg_handle}')"
            )
        bg_videos = _read_json(videos_json_bg)
        if bg_handle not in bg_videos:
            raise RenderError(
                f"Background handle '{bg_handle}' not found in shared_assets/videos.json"
            )
        bg_path = bg_assets_dir / bg_videos[bg_handle]["source_file"]
        bg_path, _ = resolve_with_cache(bg_path, plan.project_path)
        if not bg_path.exists():
            raise RenderError(
                f"Background file not found: {bg_path} (from handle '{bg_handle}')"
            )
        image_extensions = {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".webp"}
        bg_is_image = bg_path.suffix.lower() in image_extensions
        # Loop background videos infinitely
        if not bg_is_image:
            cmd.extend(["-stream_loop", "-1"])
        # Duration of background is irrelevant (looped or image) — skip analysis
        cmd.extend(fast_probe)
        cmd.extend(["-i", str(bg_path)])
        bg_idx = input_idx
        input_idx += 1

    # Input: slide images
    slides_dir = (
        plan.slides_dir.resolve()
        if plan.slides_dir
        else project_path / "media" / "slides"
    )
    slide_inputs: dict[str, int] = {}  # slide_id -> input_idx
    for event in plan.slide_events:
        if event.slide_id not in slide_inputs:
            image_path = slides_dir / event.slide_def.image
            image_path, _ = resolve_with_cache(image_path, project_path)
            cmd.extend(["-i", str(image_path)])
            slide_inputs[event.slide_id] = input_idx
            input_idx += 1

    # Input: triggered videos
    # Each video event needs its own input because they may have different skip times
    video_inputs: dict[int, int] = {}  # event_index -> input_idx
    video_events_with_audio: set[int] = set()  # event indices whose files have audio
    for i, event in enumerate(plan.video_events):
        video_path = _resolve_video_path(
            videos_dir, event.video_source, shared_assets_dir, project_path
        )
        skip = event.video_source.skip or 0.0
        # How long this clip needs to play in the output
        clip_duration = event.end_time - event.start_time
        if event.video_source.take is not None:
            clip_duration = min(clip_duration, event.video_source.take)
        # Loop the clip if the file is shorter than the display window.
        # Don't loop pause-narration videos — they intentionally play once and stop.
        needs_loop = False
        if event.video_source.duration is not None and not event.video_source.pause_narration:
            remaining = event.video_source.duration - skip
            needs_loop = remaining < clip_duration - 0.1  # 0.1 s tolerance
        if needs_loop:
            cmd.extend(["-stream_loop", "-1"])
        if skip > 0:
            cmd.extend(["-ss", f"{skip:.3f}"])
        cmd.extend(fast_probe)
        # Use pre-probed duration (or loop-limited duration) to tell FFmpeg exactly
        # how much to read, preventing scans of ghost audio tracks on empty streams.
        if needs_loop:
            cmd.extend(["-t", f"{clip_duration:.3f}"])
        elif event.video_source.duration is not None:
            remaining = event.video_source.duration - skip
            if remaining > 0:
                cmd.extend(["-t", f"{remaining:.3f}"])
        cmd.extend(["-i", str(video_path)])
        video_inputs[i] = input_idx
        input_idx += 1
        has_audio = event.video_source.has_audio
        if has_audio is None:
            # No cached answer: probe the file now (slow).
            print(
                f" Warning: no cached metadata for '{event.video_source.source_file}' — run 'gnommo import' to avoid slow probing"
            )
            has_audio = _has_audio_stream(video_path)
        if has_audio:
            video_events_with_audio.add(i)

    # Input: outro videos (play after narration ends)
    outro_inputs: dict[int, int] = {}  # event_index -> input_idx
    outro_events_with_audio: set[int] = set()
    for i, event in enumerate(plan.outro_events):
        video_path = _resolve_video_path(
            videos_dir, event.video_source, shared_assets_dir, project_path
        )
        skip = event.video_source.skip or 0.0
        if skip > 0:
            cmd.extend(["-ss", f"{skip:.3f}"])
        cmd.extend(fast_probe)
        if event.video_source.duration is not None:
            remaining = event.video_source.duration - skip
            if remaining > 0:
                cmd.extend(["-t", f"{remaining:.3f}"])
        cmd.extend(["-i", str(video_path)])
        outro_inputs[i] = input_idx
        input_idx += 1
        has_audio = event.video_source.has_audio
        if has_audio is None:
            print(
                f" Warning: no cached metadata for '{event.video_source.source_file}' — run 'gnommo import' to avoid slow probing"
            )
            has_audio = _has_audio_stream(video_path)
        if has_audio:
            outro_events_with_audio.add(i)

    # Track where audio inputs start
    num_inputs_before_audio = input_idx

    # Input: audio files
    audio_dir = plan.audio_dir.resolve() if plan.audio_dir else project_path
    audio_inputs: dict[str, int] = {}  # audio_id -> input_idx
    audio_durations: dict[str, float] = {}  # audio_id -> duration (for crossfade loops)
    for event in plan.audio_events:
        if event.audio_id not in audio_inputs:
            if event.audio_def.is_shared and plan.shared_assets_dir:
                audio_path = (
                    plan.shared_assets_dir / "media" / "audio" / event.audio_def.file
                )
            else:
                audio_path = audio_dir / event.audio_def.file
            audio_path, _ = resolve_with_cache(audio_path, project_path)
            # Use pre-probed duration from audio.json if available (set by import).
            # For MP3 without Xing/VBRI headers this is critical — FFmpeg otherwise
            # scans the whole file to estimate duration (100s+ for large files).
            # Fall back to live probe only for MP3 when duration wasn't pre-cached.
            file_duration = event.audio_def.duration
            if file_duration is None and audio_path.suffix.lower() == ".mp3":
                file_duration = _get_audio_duration(audio_path)
            if file_duration is not None:
                cmd.extend(["-t", str(file_duration)])
            cmd.extend(["-i", str(audio_path)])
            audio_inputs[event.audio_id] = input_idx
            input_idx += 1
            # Cache duration for crossfade loop filter
            if event.audio_def.loop and event.audio_def.overlap:
                audio_durations[event.audio_id] = (
                    file_duration
                    if file_duration is not None
                    else _get_audio_duration(audio_path)
                )

    # Build filter_complex
    filter_complex = build_filter_complex(
        plan,
        has_background,
        bg_idx,
        bg_is_image,
        always_visible_inputs,
        slide_inputs,
        video_inputs,
        num_inputs_before_audio,
        audio_inputs,
        audio_durations,
        video_events_with_audio,
        outro_inputs,
        outro_events_with_audio,
    )
    cmd.extend(["-filter_complex", filter_complex])

    # Map output video and audio
    cmd.extend(["-map", "[vout]"])
    # Determine audio source
    # Priority: [aout] from filter > triggered video > no audio
    # Note: we always create [aout] when always_visible_inputs exists
    if always_visible_inputs:
        cmd.extend(
            ["-map", "[aout]"]
        )  # Audio from filter (may be segmented or simple copy)
    elif video_inputs:
        # Get first triggered video's input index
        first_video_idx = next(iter(video_inputs.values()))
        cmd.extend(
            ["-map", f"{first_video_idx}:a?"]
        )  # Audio from first triggered video (? = optional)
    # else: no audio source available, output will be silent

    # Output settings
    cmd.extend(
        [
            "-t",
            str(plan.total_duration),
            "-c:v",
            "libx264",
            "-preset",
            "fast",
            "-crf",
            "20",
            "-c:a",
            "aac",
            "-b:a",
            "192k",
            "-r",
            str(plan.config.fps),
            str(output_path),
        ]
    )
    return cmd
def _calculate_cutout_position(
cutout: CutoutDefinition, frame_width: int, frame_height: int
) -> tuple[int, int, int, int]:
"""Calculate pixel position, width, and height from cutout definition.
Returns: (x, y, width, height)
"""
# Calculate height
if cutout.height >= 0:
cut_height = cutout.height
else:
cut_height = int(frame_height * cutout.height_percent)
# Calculate width (defaults to height if not specified)
if cutout.width >= 0:
cut_width = cutout.width
elif cutout.width_percent > 0:
cut_width = int(frame_width * cutout.width_percent)
else:
cut_width = cut_height # Square by default
# Calculate x position
if cutout.x >= 0:
cut_x = cutout.x
else:
cut_x = int(frame_width * cutout.x_percent)
# Calculate y position
if cutout.y >= 0:
cut_y = cutout.y
else:
cut_y = int(frame_height * cutout.y_percent)
return cut_x, cut_y, cut_width, cut_height
def build_camera_transform(
    camera_events: list[CameraEvent],
    width: int,
    height: int,
    fps: int,
    initial_state: CameraState = None,
    output_label: str = "vout",
) -> str:
    """Build the FFmpeg filter string for camera transforms (zoom, rotate, pan).

    Reads the composed ``[scene]`` stream and writes the transformed result
    to ``[output_label]`` via pad -> scale -> rotate -> crop.

    Args:
        camera_events: Camera keyframes to animate through.
        width: Output frame width in pixels.
        height: Output frame height in pixels.
        fps: Frame rate (not used by this function).
        initial_state: Camera state at t=0 (for partial rendering). When
            given and not the default state, an instant virtual keyframe at
            t=0 is placed in front of ``camera_events``.
        output_label: Label for the output stream (default: "vout").

    Returns:
        A ``;``-joined filter chain, or a plain ``copy`` from ``[scene]`` when
        there is nothing to animate.
    """
    if initial_state and not initial_state.is_default():
        # Instant (zero-duration) keyframe carrying the starting state.
        opening_keyframe = CameraEvent(
            time=0.0,
            target_state=initial_state,
            duration=0.0,
            easing="linear",
        )
        camera_events = [opening_keyframe] + camera_events

    # Identity transform: nothing to animate, just pass the scene through.
    if not camera_events:
        return f"[scene]copy[{output_label}]"

    # One time-based expression per animated camera property.
    zoom_expr = _build_animated_expr(camera_events, "zoom", 1.0)
    rotation_expr = _build_animated_expr(camera_events, "rotation", 0.0)
    pan_x_expr = _build_animated_expr(camera_events, "pan_x", 0.0)
    pan_y_expr = _build_animated_expr(camera_events, "pan_y", 0.0)
    focal_x_expr = _build_animated_expr(camera_events, "focal_x", 0.5)
    focal_y_expr = _build_animated_expr(camera_events, "focal_y", 0.5)

    # Square canvas of frame diagonal + 100 px, rounded down to an even
    # number, so the corners stay inside the canvas while rotating.
    diagonal = int(math.ceil(math.sqrt(width**2 + height**2)))
    canvas = ((diagonal + 100) // 2) * 2
    pad_w = canvas
    pad_h = canvas
    # Integer offsets that centre the scene on the canvas.
    pad_x = (pad_w - width) // 2
    pad_y = (pad_h - height) // 2

    pad_stage = f"[scene]pad={pad_w}:{pad_h}:{pad_x}:{pad_y}:color=black@0[padded]"

    # Zoom: max(1, zoom) prevents shrinking below the padded size; the
    # trunc(x/2+0.5)*2 pattern keeps the output dimensions even.
    scale_stage = (
        f"[padded]scale=eval=frame:"
        f"w='trunc(iw*max(1,{zoom_expr})/2+0.5)*2':"
        f"h='trunc(ih*max(1,{zoom_expr})/2+0.5)*2'[zoomed]"
    )

    # Rotation: degrees -> radians (negated), transparent fill.
    rotation_rad = f"(-({rotation_expr})*PI/180)"
    rotate_stage = (
        f"[zoomed]format=rgba,"
        f"rotate=a='{rotation_rad}':ow=iw:oh=ih:c='black@0',"
        f"format=yuva444p10le[rotated]"
    )

    # Crop back to the output size. focal_x/focal_y pick where the window
    # sits (0.5 = centre, 0 = left/top, 1 = right/bottom); pan offsets it.
    crop_x = f"((iw-{width})*({focal_x_expr}) + ({pan_x_expr})*(iw-{width})/2)"
    crop_y = f"((ih-{height})*({focal_y_expr}) + ({pan_y_expr})*(ih-{height})/2)"
    crop_stage = f"[rotated]crop={width}:{height}:{crop_x}:{crop_y}[{output_label}]"

    return ";".join([pad_stage, scale_stage, rotate_stage, crop_stage])
def ff_escape_expr(expr: str) -> str:
    """Escape filtergraph separators that appear inside an FFmpeg expression.

    Backslash, colon and comma each get a backslash put in front of them.
    The work is done in a single pass over the original characters, so the
    backslashes added for ``:`` and ``,`` are never escaped a second time.
    """
    escapes = str.maketrans({"\\": "\\\\", ":": "\\:", ",": "\\,"})
    return expr.translate(escapes)
def _build_animated_expr(
camera_events: list[CameraEvent],
property_name: str,
default_value: float,
) -> str:
"""
Build an FFmpeg expression that animates a camera property over time.
Creates a piecewise function using nested if() statements:
- Before first keyframe: default value
- During transition: linear interpolation
- After transition: hold value until next keyframe
The expression structure is built backwards (inside-out) so the final
value is the innermost default, and earlier time checks wrap around it.
"""
if not camera_events:
return str(default_value)
# Build list of (start_time, end_time, start_value, end_value) segments
segments: list[tuple[float, float, float, float]] = []
prev_value = default_value
prev_end_time = 0.0
for event in camera_events:
target_value = getattr(event.target_state, property_name)
start_time = event.time
duration = event.duration
# Hold segment: from previous end to this start (if gap exists)
if start_time > prev_end_time:
segments.append((prev_end_time, start_time, prev_value, prev_value))
# Transition segment
if duration > 0:
end_time = start_time + duration
segments.append((start_time, end_time, prev_value, target_value))
else:
# Instant change - represented as a very short segment
end_time = start_time
prev_value = target_value
prev_end_time = end_time
# Build expression from the last segment backwards
# Start with the final held value
expr = str(prev_value)
# Process segments in reverse order
for start_time, end_time, start_val, end_val in reversed(segments):
if start_time == end_time:
# Point change (instant)
continue
if start_val == end_val:
# Hold segment: constant value
segment_expr = str(start_val)
else:
# Transition segment: linear interpolation
# lerp = start + (end - start) * (t - start_time) / duration
duration = end_time - start_time
segment_expr = f"({start_val}+({end_val}-{start_val})*(t-{start_time:.3f})/{duration:.3f})"
# Wrap with time check
expr = f"if(between(t,{start_time:.3f},{end_time:.3f}),{segment_expr},{expr})"
# Handle time before first segment
if segments and segments[0][0] > 0:
expr = f"if(lt(t,{segments[0][0]:.3f}),{default_value},{expr})"
# Escape special characters for FFmpeg filtergraph
escaped = ff_escape_expr(expr)
return escaped
def _build_narration_segments(
pauses: list, total_duration: float
) -> list[tuple[float, float, float, float]]:
"""
Build narration video segments accounting for pauses.
Returns list of (source_start, source_end, output_start, output_end) tuples.
Example with pause at narration_time=30 for 5 seconds:
- Segment 1: source 0-30 -> output 0-30
- Segment 2: source 30-end -> output 35-end
"""
if not pauses:
return [(0.0, total_duration, 0.0, total_duration)]
segments = []
cumulative_pause = 0.0
prev_narration_end = 0.0
for pause in pauses:
# Segment before this pause
src_start = prev_narration_end
src_end = pause.narration_time
out_start = prev_narration_end + cumulative_pause
out_end = pause.output_time
if src_end > src_start:
segments.append((src_start, src_end, out_start, out_end))
# Update for next segment
prev_narration_end = pause.narration_time
cumulative_pause += pause.duration
# Final segment after all pauses
# Calculate total narration duration (total_duration minus all pause durations)
total_pause_duration = sum(p.duration for p in pauses)
narration_end = total_duration - total_pause_duration
if narration_end > prev_narration_end:
src_start = prev_narration_end
src_end = narration_end
out_start = prev_narration_end + cumulative_pause
out_end = total_duration
segments.append((src_start, src_end, out_start, out_end))
return segments
def build_filter_complex(
plan: RenderPlan,
has_background: bool,
bg_idx: int,
bg_is_image: bool,
always_visible_inputs: list[int],
slide_inputs: dict[str, int],
video_inputs: dict[int, int], # event_index -> input_idx
num_inputs_before_audio: int,
audio_inputs: dict[str, int],
audio_durations: dict[str, float], # audio_id -> duration (for crossfade loops)
video_events_with_audio: set[int] = None,
outro_inputs: dict[int, int] = None, # outro event_index -> input_idx
outro_events_with_audio: set[int] = None,
) -> str:
"""
Build the filter_complex string for FFmpeg.
Layer structure (bottom to top):
- Layer 1: Background (solid color, image, or video)
- Layer 2: "below" triggered videos (vfb/vf2b/vsb) — behind slides, use with slide on top to mask
- Layer 3: Slides (transparent in talking-head cutout area)
- Layer 4: Always visible videos (talking head) — above slides, visible through cutout
- Layer 5: "above" triggered videos (vft/vf2t/vst) — topmost, covers everything including talking head
- Layer 6: Camera transform
- Layer 7: Outro videos (fullscreen, after narration ends)
- Audio: Main audio mixed with triggered sound effects and outro audio
"""
outro_inputs = outro_inputs or {}
outro_events_with_audio = outro_events_with_audio or set()
width, height = plan.config.resolution
filters: list[str] = []
# Create base layer (background)
if has_background:
if bg_is_image:
filters.append(
f"[{bg_idx}:v]loop=loop=-1:size=1:start=0,"
f"scale={width}:{height}:force_original_aspect_ratio=increase,"
f"crop={width}:{height},fps={plan.config.fps}[bg]"
)
else:
filters.append(
f"[{bg_idx}:v]fps={plan.config.fps},"
f"scale={width}:{height}:force_original_aspect_ratio=increase,"
f"crop={width}:{height}[bg]"
)
else:
filters.append(f"color=c=black:s={width}x{height}:r={plan.config.fps}[bg]")
current_label = "bg"
# Layer 2: "below" triggered video overlays (vfb/vsb) — behind slides and talking head
for i, event in enumerate(plan.video_events):
if event.layer != "below":
continue
video_idx = video_inputs[i]
cut_x, cut_y, cut_width, cut_height = _calculate_cutout_position(
event.cutout, width, height
)
duration = event.end_time - event.start_time
if event.video_source.take is not None:
duration = min(duration, event.video_source.take)
effective_end = event.start_time + duration
zoom = event.video_source.zoom
zoomed_width = int(cut_width * zoom)
zoomed_height = int(cut_height * zoom)
video_label = f"tvb{i}"
start_pts = event.start_time
filters.append(
f"[{video_idx}:v]format=yuva444p10le,"
f"setpts=PTS-STARTPTS+{start_pts:.3f}/TB,"
f"scale={zoomed_width}:{zoomed_height}:force_original_aspect_ratio=increase,"
f"crop={cut_width}:{cut_height}:(iw-{cut_width})/2:(ih-{cut_height})/2,"
f"format=rgba[{video_label}]"
)
next_label = f"tvbbase{i}"
enable_expr = f"between(t\\,{event.start_time:.3f}\\,{effective_end:.3f})"
filters.append(
f"[{current_label}][{video_label}]overlay="
f"x={cut_x}:y={cut_y}:enable={enable_expr}:eof_action=pass"
f"[{next_label}]"
)
current_label = next_label
# Layer 3: Talking head — above below-videos, but under slides so fullscreen slides cover it
for i, (video_id, video_source, cutout) in enumerate(plan.narration_videos):
input_idx = always_visible_inputs[i]
cut_x, cut_y, cut_width, cut_height = _calculate_cutout_position(
cutout, width, height
)
zoom = video_source.zoom
zoomed_width = int(cut_width * zoom)
zoomed_height = int(cut_height * zoom)
if not plan.narration_pauses:
video_label = f"av{i}"
filters.append(
f"[{input_idx}:v]fps={plan.config.fps},setpts=PTS-STARTPTS,"
f"format=yuva444p10le,"
f"scale={zoomed_width}:{zoomed_height}:force_original_aspect_ratio=increase,"
f"crop={cut_width}:{cut_height}:(iw-{cut_width})/2:(ih-{cut_height})/2,"
f"format=rgba[{video_label}]"
)
next_label = f"avbase{i}"
filters.append(
f"[{current_label}][{video_label}]overlay=x={cut_x}:y={cut_y}[{next_label}]"
)
current_label = next_label
else:
segments = _build_narration_segments(
plan.narration_pauses, plan.total_duration
)
for seg_idx, (src_start, src_end, out_start, out_end) in enumerate(
segments
):
seg_label = f"av{i}_seg{seg_idx}"
pts_offset = out_start
filters.append(
f"[{input_idx}:v]trim={src_start:.3f}:{src_end:.3f},"
f"setpts=PTS-STARTPTS+{pts_offset:.3f}/TB,"
f"format=yuva444p10le,"
f"scale={zoomed_width}:{zoomed_height}:force_original_aspect_ratio=increase,"
f"crop={cut_width}:{cut_height}:(iw-{cut_width})/2:(ih-{cut_height})/2,"
f"format=rgba[{seg_label}]"
)
next_label = f"avbase{i}_seg{seg_idx}"
enable_expr = f"between(t\\,{out_start:.3f}\\,{out_end:.3f})"
filters.append(
f"[{current_label}][{seg_label}]overlay=x={cut_x}:y={cut_y}:"
f"enable={enable_expr}[{next_label}]"
)
current_label = next_label
# Layer 4: "mid" triggered videos (vfm/vsm) — above talking head, below slides
# Use case: content that should show through a slide's transparent "screen hole"
for i, event in enumerate(plan.video_events):
if event.layer != "mid":
continue
video_idx = video_inputs[i]
cut_x, cut_y, cut_width, cut_height = _calculate_cutout_position(
event.cutout, width, height
)
duration = event.end_time - event.start_time
if event.video_source.take is not None:
duration = min(duration, event.video_source.take)
effective_end = event.start_time + duration
zoom = event.video_source.zoom
zoomed_width = int(cut_width * zoom)
zoomed_height = int(cut_height * zoom)
video_label = f"tvm{i}"
start_pts = event.start_time
filters.append(
f"[{video_idx}:v]format=yuva444p10le,"
f"setpts=PTS-STARTPTS+{start_pts:.3f}/TB,"
f"scale={zoomed_width}:{zoomed_height}:force_original_aspect_ratio=increase,"
f"crop={cut_width}:{cut_height}:(iw-{cut_width})/2:(ih-{cut_height})/2,"
f"format=rgba[{video_label}]"
)
next_label = f"tvmbase{i}"
enable_expr = f"between(t\\,{event.start_time:.3f}\\,{effective_end:.3f})"
filters.append(
f"[{current_label}][{video_label}]overlay="
f"x={cut_x}:y={cut_y}:enable={enable_expr}:eof_action=pass"
f"[{next_label}]"
)
current_label = next_label
# Layer 5: Slides — on top of talking head so fullscreen slides cover the narrator
for i, event in enumerate(plan.slide_events):
slide_idx = slide_inputs[event.slide_id]
slide_label = f"s{i}"
filters.append(
f"[{slide_idx}:v]scale={width}:{height}:"
f"force_original_aspect_ratio=decrease,pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:color=0x00000000[{slide_label}]"
)
next_label = f"sbase{i}"
enable_expr = f"between(t\\,{event.start_time:.3f}\\,{event.end_time:.3f})"
filters.append(
f"[{current_label}][{slide_label}]overlay="
f"x=0:y=0:enable={enable_expr}"
f"[{next_label}]"
)
current_label = next_label
# Layer 6: "above" triggered videos (vft/vf2t/vst) — topmost, covers slides and talking head
# Use case: fullscreen video that intentionally masks the narrator
for i, event in enumerate(plan.video_events):
if event.layer != "above":
continue
video_idx = video_inputs[i]
cut_x, cut_y, cut_width, cut_height = _calculate_cutout_position(
event.cutout, width, height
)
duration = event.end_time - event.start_time
if event.video_source.take is not None:
duration = min(duration, event.video_source.take)
effective_end = event.start_time + duration
zoom = event.video_source.zoom
zoomed_width = int(cut_width * zoom)
zoomed_height = int(cut_height * zoom)
video_label = f"tv{i}"
start_pts = event.start_time
filters.append(
f"[{video_idx}:v]format=rgba,"
f"setpts=PTS-STARTPTS+{start_pts:.3f}/TB,"
f"scale={zoomed_width}:{zoomed_height}:force_original_aspect_ratio=increase,"
f"crop={cut_width}:{cut_height}:(iw-{cut_width})/2:(ih-{cut_height})/2"
f"[{video_label}]"
)
next_label = f"tvbase{i}"
enable_expr = f"between(t\\,{event.start_time:.3f}\\,{effective_end:.3f})"
filters.append(
f"[{current_label}][{video_label}]overlay="
f"x={cut_x}:y={cut_y}:enable={enable_expr}:format=auto:eof_action=pass"
f"[{next_label}]"
)
current_label = next_label
# Scene composition complete - now apply camera transform
# Check if we need camera transform (events exist OR initial state is non-default)
needs_camera_transform = plan.camera_events or (
plan.initial_camera_state and not plan.initial_camera_state.is_default()
)
# Determine output label based on whether we have outro events
has_outro = bool(plan.outro_events and outro_inputs)
cam_output_label = "cam_out" if has_outro else "vout"
if needs_camera_transform:
# Output to [scene], then camera transform will produce [cam_out] or [vout]
filters.append(f"[{current_label}]copy[scene]")
camera_filter = build_camera_transform(
plan.camera_events,
width,
height,
plan.config.fps,
initial_state=plan.initial_camera_state,
output_label=cam_output_label,
)
filters.append(camera_filter)
current_label = cam_output_label
else:
# No camera events
if has_outro:
filters.append(f"[{current_label}]copy[cam_out]")
current_label = "cam_out"
else:
filters.append(f"[{current_label}]copy[vout]")
# Add outro video overlays (fullscreen, after narration ends)
if has_outro:
for i, event in enumerate(plan.outro_events):
video_idx = outro_inputs[i]
# Calculate effective duration (respecting 'take' parameter)
duration = event.end_time - event.start_time
if event.video_source.take is not None:
duration = min(duration, event.video_source.take)
effective_end = event.start_time + duration
# Determine if fullscreen or in cutout
if event.cutout:
cut_x, cut_y, cut_width, cut_height = _calculate_cutout_position(
event.cutout, width, height
)
else:
# Fullscreen
cut_x, cut_y, cut_width, cut_height = 0, 0, width, height
# Apply zoom factor
zoom = event.video_source.zoom
zoomed_width = int(cut_width * zoom)
zoomed_height = int(cut_height * zoom)
# Scale and crop video
video_label = f"outro{i}"
start_pts = event.start_time
filters.append(
f"[{video_idx}:v]format=yuva444p10le,"
f"setpts=PTS-STARTPTS+{start_pts:.3f}/TB,"
f"scale={zoomed_width}:{zoomed_height}:force_original_aspect_ratio=increase,"
f"crop={cut_width}:{cut_height}:(iw-{cut_width})/2:(ih-{cut_height})/2,"
f"format=rgba[{video_label}]"
)
# Overlay with time-based enable
next_label = f"outrobase{i}"
enable_expr = f"between(t\\,{event.start_time:.3f}\\,{effective_end:.3f})"
filters.append(
f"[{current_label}][{video_label}]overlay="
f"x={cut_x}:y={cut_y}:enable={enable_expr}"
f"[{next_label}]"
)
current_label = next_label
# Final output
filters.append(f"[{current_label}]copy[vout]")
# Audio mixing: combine main audio with sound effects
if always_visible_inputs:
main_audio_idx = always_visible_inputs[0]
audio_labels_to_mix = []
# Get audio channel setting and volume from first narration video
channel_filter = ""
narration_volume = 1.0
if plan.narration_videos:
_, first_video_source, _ = plan.narration_videos[0]
use_channels = first_video_source.use_audio_channels
if use_channels == "auto":
narration_path = _resolve_video_path(
videos_dir, first_video_source, shared_assets_dir, project_path
)
use_channels = _resolve_auto_channel(narration_path)
channel_filter = _build_audio_channel_filter(use_channels)
narration_volume = first_video_source.volume
# Build volume filter if not 1.0
volume_filter = (
f"volume={narration_volume:.2f}" if narration_volume != 1.0 else ""
)
# Use narration_end_time to stop audio before outro (if outro exists)
audio_end_time = (
plan.narration_end_time if plan.outro_events else plan.total_duration
)
if not plan.narration_pauses:
# Simple case: trim main audio to end before outro (with optional channel and volume filters)
filter_parts = []
if channel_filter:
filter_parts.append(channel_filter)
if volume_filter:
filter_parts.append(volume_filter)
if plan.outro_events:
# Trim narration audio to stop before outro
filter_parts.append(f"atrim=0:{audio_end_time:.3f}")
filter_parts.append("asetpts=PTS-STARTPTS")
filters.append(
f"[{main_audio_idx}:a]{','.join(filter_parts)}[main_aud]"
)
audio_labels_to_mix.append("[main_aud]")
elif filter_parts:
filters.append(
f"[{main_audio_idx}:a]{','.join(filter_parts)}[main_aud]"
)
audio_labels_to_mix.append("[main_aud]")
else:
audio_labels_to_mix.append(f"[{main_audio_idx}:a]")
else:
# Complex case: segment the narration audio for pauses
segments = _build_narration_segments(plan.narration_pauses, audio_end_time)
for seg_idx, (src_start, src_end, out_start, out_end) in enumerate(
segments
):
seg_label = f"narr_aud{seg_idx}"
delay_ms = int(out_start * 1000)
# Trim audio to source range, then delay to output position
# Apply channel filter, volume filter if needed
filter_parts = []
if channel_filter:
filter_parts.append(channel_filter)
filter_parts.append(f"atrim={src_start:.3f}:{src_end:.3f}")
filter_parts.append("asetpts=PTS-STARTPTS")
filter_parts.append(f"adelay={delay_ms}|{delay_ms}")
if volume_filter:
filter_parts.append(volume_filter)
filters.append(
f"[{main_audio_idx}:a]{','.join(filter_parts)}[{seg_label}]"
)
audio_labels_to_mix.append(f"[{seg_label}]")
# Process each audio event with delay and volume
if plan.audio_events and audio_inputs:
for i, event in enumerate(plan.audio_events):
audio_idx = audio_inputs[event.audio_id]
volume = event.audio_def.volume
if event.audio_def.loop:
# Looping audio: loop source, then trim/segment
# Stop at narration end if there's an outro
loop_end_time = audio_end_time
remaining = loop_end_time - event.start_time
if plan.narration_pauses and not event.audio_def.ignore_pauses:
# Build segments that skip narration pauses (pauses by default)
relevant_pauses = [
p
for p in plan.narration_pauses
if p.output_time > event.start_time
]
src_pos = 0.0
seg_start = event.start_time
seg_count = 0
for pause in relevant_pauses:
seg_end = pause.output_time
if seg_end > seg_start:
seg_dur = seg_end - seg_start
seg_label = f"aud{i}_seg{seg_count}"
delay_ms = int(seg_start * 1000)
filters.append(
f"[{audio_idx}:a]aloop=loop=-1:size=2e+09,"
f"atrim={src_pos:.3f}:{src_pos + seg_dur:.3f},"
f"asetpts=PTS-STARTPTS,"
f"adelay={delay_ms}|{delay_ms},"
f"volume={volume:.2f}[{seg_label}]"
)
audio_labels_to_mix.append(f"[{seg_label}]")
src_pos += seg_dur
seg_count += 1
seg_start = pause.output_time + pause.duration
# Final segment after last pause (stop at narration end if outro)
if seg_start < loop_end_time:
seg_dur = loop_end_time - seg_start
seg_label = f"aud{i}_seg{seg_count}"
delay_ms = int(seg_start * 1000)
filters.append(
f"[{audio_idx}:a]aloop=loop=-1:size=2e+09,"
f"atrim={src_pos:.3f}:{src_pos + seg_dur:.3f},"
f"asetpts=PTS-STARTPTS,"
f"adelay={delay_ms}|{delay_ms},"
f"volume={volume:.2f}[{seg_label}]"
)
audio_labels_to_mix.append(f"[{seg_label}]")
else:
# Simple loop: no pauses or ignore_pauses=True
label = f"aud{i}"
delay_ms = int(event.start_time * 1000)
if (
event.audio_def.overlap
and event.audio_id in audio_durations
):
# Crossfade loop: overlap copies with fade in/out
audio_dur = audio_durations[event.audio_id]
crossfade_filters = _build_crossfade_loop_filter(
input_label=f"[{audio_idx}:a]",
output_label=f"[{label}]",
audio_duration=audio_dur,
overlap=event.audio_def.overlap,
needed_duration=remaining,
volume=volume,
delay_ms=delay_ms,
)
filters.extend(crossfade_filters)
else:
# Standard loop without crossfade
filters.append(
f"[{audio_idx}:a]aloop=loop=-1:size=2e+09,"
f"atrim=0:{remaining:.3f},"
f"asetpts=PTS-STARTPTS,"
f"adelay={delay_ms}|{delay_ms},"
f"volume={volume:.2f}[{label}]"
)
audio_labels_to_mix.append(f"[{label}]")
else:
# One-shot audio: delay to trigger time
label = f"aud{i}"
delay_ms = int(event.start_time * 1000)
filters.append(
f"[{audio_idx}:a]adelay={delay_ms}|{delay_ms},volume={volume:.2f}[{label}]"
)
audio_labels_to_mix.append(f"[{label}]")
# Extract and mix audio from triggered video events
_have_audio = video_events_with_audio or set()
for i, event in enumerate(plan.video_events):
if i not in _have_audio:
continue
video_idx = video_inputs[i]
# Calculate effective duration (same logic as video side)
duration = event.end_time - event.start_time
if event.video_source.take is not None:
duration = min(duration, event.video_source.take)
delay_ms = int(event.start_time * 1000)
label = f"tvaud{i}"
vol = event.video_source.volume
vol_filter = f",volume={vol:.2f}" if vol != 1.0 else ""
filters.append(
f"[{video_idx}:a]atrim=0:{duration:.3f},"
f"asetpts=PTS-STARTPTS,"
f"adelay={delay_ms}|{delay_ms}"
f"{vol_filter}[{label}]"
)
audio_labels_to_mix.append(f"[{label}]")
# Extract and mix audio from outro video events
for i, event in enumerate(plan.outro_events):
if i not in outro_events_with_audio:
continue
video_idx = outro_inputs[i]
# Calculate effective duration (same logic as video side)
duration = event.end_time - event.start_time
if event.video_source.take is not None:
duration = min(duration, event.video_source.take)
delay_ms = int(event.start_time * 1000)
label = f"outroaud{i}"
vol = event.video_source.volume
vol_filter = f",volume={vol:.2f}" if vol != 1.0 else ""
filters.append(
f"[{video_idx}:a]atrim=0:{duration:.3f},"
f"asetpts=PTS-STARTPTS,"
f"adelay={delay_ms}|{delay_ms}"
f"{vol_filter}[{label}]"
)
audio_labels_to_mix.append(f"[{label}]")
# Mix all audio tracks together
if len(audio_labels_to_mix) > 1:
num_audio_tracks = len(audio_labels_to_mix)
audio_mix_inputs = "".join(audio_labels_to_mix)
# normalize=0 prevents amix from dividing volume by number of inputs
filters.append(
f"{audio_mix_inputs}amix=inputs={num_audio_tracks}:duration=longest:dropout_transition=0:normalize=0[aout]"
)
elif len(audio_labels_to_mix) == 1:
# Single audio track, just copy it
label = audio_labels_to_mix[0].strip("[]")
filters.append(f"[{label}]acopy[aout]")
return ";".join(filters)
def generate_ffmpeg_command_string(plan: RenderPlan, output_path: Path) -> str:
    """Generate a human-readable FFmpeg command string (for debugging).

    The argument list is obtained from ``build_ffmpeg_command`` and rendered
    as text:

    * every argument group is prefixed with a single space;
    * an option (an argument starting with ``-``) is kept together with the
      argument that follows it, unless that argument also starts with ``-``;
    * the ``-filter_complex`` graph is wrapped in double quotes and a line
      break is inserted after every ``;`` so each filter chain sits on its
      own line.

    Arguments are not shell-quoted, so the result is meant for reading, not
    for pasting into a shell verbatim.

    Args:
        plan: Render plan forwarded to ``build_ffmpeg_command``.
        output_path: Output file path forwarded to ``build_ffmpeg_command``.

    Returns:
        The formatted command as a single string. Nothing is written to
        stdout; the caller decides where the text goes.
    """
    cmd = build_ffmpeg_command(plan, output_path)

    parts: list[str] = []
    i = 0
    while i < len(cmd):
        arg = cmd[i]
        # Look ahead once; None means ``arg`` is the final argument.
        next_arg = cmd[i + 1] if i + 1 < len(cmd) else None

        if arg == "-filter_complex" and next_arg is not None:
            # One filter chain per line makes large graphs reviewable.
            graph = next_arg.replace(";", ";\n ")
            parts.append(f" -filter_complex \"\n {graph}\n \"")
            i += 2
        elif (
            arg.startswith("-")
            and next_arg is not None
            and not next_arg.startswith("-")
        ):
            # Option followed by its value: keep the pair together.
            parts.append(f" {arg} {next_arg}")
            i += 2
        else:
            # Bare flag, positional argument, or a trailing option with no value.
            parts.append(f" {arg}")
            i += 1
    return "".join(parts)