"""Load stage: generate and execute FFmpeg commands."""
|
|
|
|
import math
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
from .errors import RenderError
|
|
from .parser import _read_json
|
|
from .preprocessor import _resolve_auto_channel
|
|
from .models import (
|
|
AudioEvent,
|
|
CameraEvent,
|
|
CameraState,
|
|
CutoutDefinition,
|
|
RenderPlan,
|
|
SlideEvent,
|
|
SLIDE_LAYOUTS,
|
|
VideoEvent,
|
|
VideoSource,
|
|
)
|
|
from .preprocessor import run_ffmpeg_with_progress
|
|
|
|
|
|
def _get_audio_duration(audio_path: Path) -> float:
    """Return the duration of an audio file in seconds, as reported by ffprobe.

    For ``.mp3`` files ffprobe is first run with ``-count_packets`` on the
    first audio stream and the stream-level ``duration`` entry is used when it
    is a positive number.  If that probe fails or reports no usable duration
    (for example ``N/A``), or for any other file type, the container-level
    ``format=duration`` entry is used instead.

    Args:
        audio_path: Path of the audio file to probe.

    Returns:
        Duration in seconds.

    Raises:
        RenderError: If the fallback ffprobe call exits with a non-zero status
            or prints something that is not a number.
    """
    if audio_path.suffix.lower() == ".mp3":
        # -count_packets makes ffprobe read every packet of the stream.
        # This is slower, but the intent is to avoid trusting a header-only
        # estimate on MP3s without a Xing/VBRI header.
        cmd = [
            "ffprobe",
            "-v",
            "error",
            "-count_packets",
            "-show_entries",
            "stream=nb_read_packets,duration",
            "-select_streams",
            "a:0",
            "-of",
            # Keys are kept (no nokey=1) so the duration line can be told
            # apart from the packet count.  Without keys, a stream whose
            # duration is "N/A" would have its packet count returned as seconds.
            "default=noprint_wrappers=1",
            str(audio_path),
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode == 0:
            for line in result.stdout.splitlines():
                key, sep, value = line.strip().partition("=")
                if not sep or key != "duration":
                    continue
                try:
                    val = float(value)
                except ValueError:
                    # "N/A" or other non-numeric value: try the fallback probe.
                    continue
                if val > 0:
                    return val

    # Generic path: container-level duration.
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        str(audio_path),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RenderError(f"Failed to get duration for {audio_path}: {result.stderr}")

    raw_duration = result.stdout.strip()
    try:
        return float(raw_duration)
    except ValueError as err:
        # ffprobe can exit 0 and still print "N/A" or nothing at all.
        raise RenderError(
            f"Failed to get duration for {audio_path}: "
            f"unexpected ffprobe output {raw_duration!r}"
        ) from err
|
|
|
|
|
|
def _build_crossfade_loop_filter(
    input_label: str,
    output_label: str,
    audio_duration: float,
    overlap: float,
    needed_duration: float,
    volume: float,
    delay_ms: int,
) -> list[str]:
    """
    Build FFmpeg filter chain for crossfade looping.

    Creates a seamless loop by overlapping copies of the audio with fade in/out.
    Each loop iteration crossfades with the next for `overlap` seconds, so a new
    copy starts every ``audio_duration - overlap`` seconds.

    If the overlap is at least as long as the audio itself there is no positive
    loop period; in that case (and when no extra copies are required) the audio
    is played once and trimmed to `needed_duration`.

    Args:
        input_label: Input stream label (e.g., "[0:a]")
        output_label: Output stream label (e.g., "[aud0]")
        audio_duration: Duration of the source audio in seconds
        overlap: Crossfade overlap duration in seconds
        needed_duration: Total duration needed
        volume: Volume multiplier
        delay_ms: Initial delay in milliseconds

    Returns:
        List of filter strings to append to the filter_complex
    """
    loop_len = audio_duration - overlap

    if loop_len > 0:
        # One extra iteration for safety, capped at 100 copies so the filter
        # graph does not explode for very short loops.
        n_loops = min(math.ceil(needed_duration / loop_len) + 1, 100)
    else:
        # overlap >= audio_duration: no usable loop period.  Dividing by a
        # zero loop length would raise ZeroDivisionError, so fall back to
        # single play (which is what negative loop lengths already did).
        n_loops = 1

    if n_loops <= 1:
        # Single play, no looping needed
        return [
            f"{input_label}atrim=0:{needed_duration:.3f},"
            f"asetpts=PTS-STARTPTS,"
            f"adelay={delay_ms}|{delay_ms},"
            f"volume={volume:.2f}{output_label}"
        ]

    # Label stem without the surrounding brackets, used to keep the
    # intermediate labels unique per output stream.
    stem = output_label[1:-1]
    split_labels = [f"[xfloop_{stem}_{i}]" for i in range(n_loops)]
    mix_labels = [f"[xfl_{stem}_{i}]" for i in range(n_loops)]

    # Split input into n_loops copies
    filters = [f"{input_label}asplit={n_loops}{''.join(split_labels)}"]

    # Process each copy with appropriate delay and fades
    for i, (copy_label, out_label) in enumerate(zip(split_labels, mix_labels)):
        total_delay_ms = delay_ms + int(i * loop_len * 1000)

        chain_parts = []
        # Fade in at start (except first copy)
        if i > 0:
            chain_parts.append(f"afade=t=in:d={overlap:.3f}")
        # Fade out over the last `overlap` seconds, i.e. starting at loop_len
        # (always > 0 on this path), so it overlaps with the next copy.
        chain_parts.append(f"afade=t=out:st={loop_len:.3f}:d={overlap:.3f}")
        chain_parts.append(f"adelay={total_delay_ms}|{total_delay_ms}")
        chain_parts.append(f"volume={volume:.2f}")

        filters.append(f"{copy_label}{','.join(chain_parts)}{out_label}")

    # Mix all copies together, then trim to needed duration (plus the initial
    # delay, which adelay has already baked into the timeline).
    filters.append(
        f"{''.join(mix_labels)}amix=inputs={n_loops}:duration=longest:normalize=0,"
        f"atrim=0:{needed_duration + delay_ms/1000:.3f},"
        f"asetpts=PTS-STARTPTS{output_label}"
    )

    return filters
|
|
|
|
|
|
def render(plan: RenderPlan, output_path: Path, verbose: bool = False) -> None:
    """
    Render the final video by running the FFmpeg command built for `plan`.

    The output directory is created if missing, the command is built with
    build_ffmpeg_command(), optionally echoed, and then executed through
    run_ffmpeg_with_progress() using the plan's total duration for progress.

    Args:
        plan: Render plan describing inputs, events and configuration.
        output_path: Destination file for the rendered video.
        verbose: When True, print the full FFmpeg command before running it.

    Raises:
        RenderError: If FFmpeg exits with a non-zero return code.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)

    ffmpeg_argv = build_ffmpeg_command(plan, output_path)
    command_line = " ".join(ffmpeg_argv)

    if verbose:
        # Header, command, then one blank line.
        print("FFmpeg command:", command_line, "", sep="\n")

    outcome = run_ffmpeg_with_progress(
        ffmpeg_argv, duration=plan.total_duration, description="Rendering"
    )
    if outcome.returncode == 0:
        return

    raise RenderError(
        "FFmpeg rendering failed",
        command=command_line,
        stderr=outcome.stderr,
    )
|
|
|
|
|
|
def _resolve_video_path(
    videos_dir: Path,
    video_source: VideoSource,
    shared_assets_dir: Path = None,
    project_path: Path = None,
) -> Path:
    """Resolve the actual video file path (output_file if exists, else source_file).

    Lookup order:

    1. ``video_source.output_file`` in the base directory, then in its parent.
       For each location the ``.webm`` sibling is also tried, since
       preprocessing now outputs WebM for compressed alpha channel support.
    2. ``video_source.source_file`` in the base directory.
    3. ``PlaceholderVideo.mp4`` from `shared_assets_dir` (a warning is printed
       to stderr) when the source file does not exist either.

    The base directory is `shared_assets_dir` when ``video_source.is_shared``
    is true and a shared directory was given, otherwise `videos_dir`.  When
    `project_path` is provided every candidate goes through
    ``resolve_with_cache`` first.

    Args:
        videos_dir: Directory holding the project's videos.
        video_source: Video definition (output_file, source_file, is_shared).
        shared_assets_dir: Optional shared assets directory.
        project_path: Optional project path enabling the cache fallback.

    Returns:
        The first existing candidate; otherwise the (non-existent) resolved
        source path when no placeholder is available.
    """
    from .cache import resolve_with_cache

    def _existing(candidate: Path):
        """Return the usable path for `candidate`, or None if it does not exist."""
        if project_path:
            resolved_candidate, _ = resolve_with_cache(candidate, project_path)
            return resolved_candidate if resolved_candidate.exists() else None
        return candidate if candidate.exists() else None

    # Determine base directory based on is_shared flag
    if video_source.is_shared and shared_assets_dir:
        base_dir = shared_assets_dir
    else:
        base_dir = videos_dir

    if video_source.output_file:
        for candidate_dir in [base_dir, base_dir.parent]:
            video_path = candidate_dir / video_source.output_file
            found = _existing(video_path)
            if found is not None:
                return found
            # Check for WebM variant (preprocessing outputs compressed WebM
            # instead of ProRes).  This previously probed ".mov", so the WebM
            # file was never looked for.
            found = _existing(video_path.with_suffix(".webm"))
            if found is not None:
                return found

    # Fall back to source_file with cache fallback
    source_path = base_dir / video_source.source_file
    if project_path:
        resolved, _ = resolve_with_cache(source_path, project_path)
    else:
        resolved = source_path

    if not resolved.exists():
        # File not found anywhere — substitute PlaceholderVideo so FFmpeg doesn't crash
        placeholder = None
        if shared_assets_dir:
            p = shared_assets_dir / "PlaceholderVideo.mp4"
            if project_path:
                p, _ = resolve_with_cache(p, project_path)
            if p.exists():
                placeholder = p
        if placeholder:
            import sys

            print(
                f" Warning: {video_source.source_file} not found — using PlaceholderVideo",
                file=sys.stderr,
            )
            return placeholder

    return resolved
|
|
|
|
|
|
def _has_audio_stream(video_path: Path) -> bool:
    """Report whether a video file carries a usable first audio stream.

    ffprobe is asked for the index and nb_frames of stream ``a:0`` with
    ``-analyzeduration 0`` and a 1 MB probe size, which avoids the slow
    avformat_find_stream_info() scan on files that declare an audio track
    without any actual frames.

    Returns False when ffprobe prints nothing (no audio stream) or when
    nb_frames is exactly "0" (a ghost track: header present in the moov atom
    but no sample data).  Any other output counts as having audio.
    """
    probe_cmd = [
        "ffprobe",
        "-v", "error",
        "-analyzeduration", "0",
        "-probesize", "1000000",
        "-select_streams", "a:0",
        "-show_entries", "stream=index,nb_frames",
        "-of", "csv=p=0",
        str(video_path),
    ]
    probe = subprocess.run(probe_cmd, capture_output=True, text=True)

    # Expected shape: "index" or "index,nb_frames"
    fields = probe.stdout.strip()
    if not fields:
        return False

    columns = fields.split(",")
    is_ghost_track = len(columns) >= 2 and columns[1].strip() == "0"
    return not is_ghost_track
|
|
|
|
|
|
def _build_audio_channel_filter(use_audio_channels: str) -> str:
    """Return the ffmpeg ``pan`` filter selecting the requested audio channel.

    Args:
        use_audio_channels: "both", "left", or "right"

    Returns:
        "pan=mono|c0=c0" for "left", "pan=mono|c0=c1" for "right", and an
        empty string for anything else ("both" needs no filter).
    """
    pan_by_channel = {
        "left": "pan=mono|c0=c0",
        "right": "pan=mono|c0=c1",
    }
    return pan_by_channel.get(use_audio_channels, "")
|
|
|
|
|
|
def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
    """Build the complete FFmpeg command as a list of arguments.

    Inputs are added in a fixed order, and the index of each one is recorded
    so build_filter_complex() can reference it:

    1. narration ("always visible") videos
    2. background image/video, looked up by handle in shared_assets/videos.json
    3. slide images (one input per distinct slide_id)
    4. triggered videos (one input per event)
    5. outro videos (one input per event)
    6. audio files (one input per distinct audio_id)

    After the inputs come ``-filter_complex``, the ``-map`` options and the
    output encoding settings.

    Args:
        plan: Render plan supplying paths, events and configuration.
        output_path: Destination file; resolved to an absolute path.

    Returns:
        The argument list, starting with "ffmpeg".

    Raises:
        RenderError: If a background handle is configured but
            shared_assets/videos.json, the handle, or the background file
            is missing.
    """
    cmd = ["ffmpeg", "-y"]  # -y to overwrite output

    # Global thread limits before any -i. Without this, each format=rgba conversion
    # in the filter graph (one per video layer) spawns one swscaler thread per CPU core,
    # causing OOM on Apple Silicon where av_cpu_count() returns 10-11.
    from .cache import get_ffmpeg_thread_count

    _tc = str(get_ffmpeg_thread_count())
    cmd.extend(["-threads", _tc, "-filter_threads", _tc])

    # Resolve paths to absolute
    project_path = plan.project_path.resolve()
    output_path = output_path.resolve()
    videos_dir = plan.videos_dir.resolve() if plan.videos_dir else project_path
    shared_assets_dir = (
        plan.shared_assets_dir.resolve() if plan.shared_assets_dir else None
    )

    # Track input indices
    input_idx = 0

    # Input: always_visible videos (like talking head)
    # Add -ss seek BEFORE -i for skip parameter and/or partial rendering
    always_visible_inputs: list[int] = []
    for video_id, video_source, cutout in plan.narration_videos:
        video_path = _resolve_video_path(
            videos_dir, video_source, shared_assets_dir, project_path
        )
        # Combine video skip setting with partial render offset
        # NOTE(review): unlike the triggered/outro loops below there is no
        # "or 0.0" guard here, so this assumes skip is never None — confirm.
        total_seek = video_source.skip + plan.input_seek_time
        if total_seek > 0:
            cmd.extend(["-ss", f"{total_seek:.3f}"])
        # Skip stream analysis — codec params are in the container header, and
        # duration is already known by gnommo via ffprobe (plan.total_duration).
        # Without this, FFmpeg reads 100MB+ of compressed data per input at 4K
        # bitrates before encoding starts ("Estimating duration from bitrate").
        cmd.extend(["-analyzeduration", "0", "-probesize", "1000"])
        cmd.extend(["-i", str(video_path)])
        always_visible_inputs.append(input_idx)
        input_idx += 1

    from .cache import resolve_with_cache

    # Input: background — resolved via handle in shared_assets/videos.json
    bg_handle = plan.config.background
    has_background = bool(bg_handle)
    bg_idx = None
    bg_is_image = False
    if has_background:
        # NOTE(review): this rebinds shared_assets_dir, so the triggered/outro
        # video lookups below use <project parent>/shared_assets instead of
        # plan.shared_assets_dir whenever a background is set — confirm intended.
        shared_assets_dir = project_path.parent / "shared_assets"
        videos_json_bg = shared_assets_dir / "videos.json"
        if not videos_json_bg.exists():
            raise RenderError(
                f"shared_assets/videos.json not found (needed for background handle '{bg_handle}')"
            )
        bg_videos = _read_json(videos_json_bg)
        if bg_handle not in bg_videos:
            raise RenderError(
                f"Background handle '{bg_handle}' not found in shared_assets/videos.json"
            )
        bg_path = shared_assets_dir / bg_videos[bg_handle]["source_file"]
        # NOTE(review): passes the unresolved plan.project_path, while every
        # other resolve_with_cache call here uses the resolved project_path.
        bg_path, _ = resolve_with_cache(bg_path, plan.project_path)
        if not bg_path.exists():
            raise RenderError(
                f"Background file not found: {bg_path} (from handle '{bg_handle}')"
            )
        image_extensions = {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".webp"}
        bg_is_image = bg_path.suffix.lower() in image_extensions
        # Loop background videos infinitely
        if not bg_is_image:
            cmd.extend(["-stream_loop", "-1"])
        # Duration of background video is irrelevant (looped or image) — skip analysis
        cmd.extend(["-analyzeduration", "0", "-probesize", "1000"])
        cmd.extend(["-i", str(bg_path)])
        bg_idx = input_idx
        input_idx += 1

    # Input: slide images
    slides_dir = (
        plan.slides_dir.resolve()
        if plan.slides_dir
        else project_path / "media" / "slides"
    )
    slide_inputs: dict[str, int] = {}  # slide_id -> input_idx

    for event in plan.slide_events:
        # A slide shown several times is only added as an input once.
        if event.slide_id not in slide_inputs:
            image_path = slides_dir / event.slide_def.image
            image_path, _ = resolve_with_cache(image_path, project_path)
            cmd.extend(["-i", str(image_path)])
            slide_inputs[event.slide_id] = input_idx
            input_idx += 1

    # Input: triggered videos
    # Each video event needs its own input because they may have different skip times
    # video_inputs maps (video_id, event_index) -> input_idx
    video_inputs: dict[int, int] = {}  # event_index -> input_idx
    video_events_with_audio: set[int] = set()  # event indices whose files have audio

    for i, event in enumerate(plan.video_events):
        video_path = _resolve_video_path(
            videos_dir, event.video_source, shared_assets_dir, project_path
        )
        skip = event.video_source.skip or 0.0

        # How long this clip needs to play in the output
        clip_duration = event.end_time - event.start_time
        if event.video_source.take is not None:
            clip_duration = min(clip_duration, event.video_source.take)

        # Loop the clip if the file is shorter than the display window.
        # Don't loop pause-narration videos — they intentionally play once and stop.
        needs_loop = False
        if event.video_source.duration is not None and not event.video_source.pause_narration:
            remaining = event.video_source.duration - skip
            needs_loop = remaining < clip_duration - 0.1  # 0.1 s tolerance

        # All of these are per-input options and must precede this input's -i.
        if needs_loop:
            cmd.extend(["-stream_loop", "-1"])
        if skip > 0:
            cmd.extend(["-ss", f"{skip:.3f}"])
        cmd.extend(["-analyzeduration", "0", "-probesize", "1000"])
        # Use pre-probed duration (or loop-limited duration) to tell FFmpeg exactly
        # how much to read, preventing scans of ghost audio tracks on empty streams.
        if needs_loop:
            cmd.extend(["-t", f"{clip_duration:.3f}"])
        elif event.video_source.duration is not None:
            remaining = event.video_source.duration - skip
            if remaining > 0:
                cmd.extend(["-t", f"{remaining:.3f}"])
        cmd.extend(["-i", str(video_path)])
        video_inputs[i] = input_idx
        input_idx += 1
        # Prefer the cached has_audio flag; probe the file only when it is unknown.
        has_audio = event.video_source.has_audio
        if has_audio is None:
            print(
                f" Warning: no cached metadata for '{event.video_source.source_file}' — run 'gnommo import' to avoid slow probing"
            )
            has_audio = _has_audio_stream(video_path)
        if has_audio:
            video_events_with_audio.add(i)

    # Input: outro videos (play after narration ends)
    outro_inputs: dict[int, int] = {}  # event_index -> input_idx
    outro_events_with_audio: set[int] = set()

    for i, event in enumerate(plan.outro_events):
        video_path = _resolve_video_path(
            videos_dir, event.video_source, shared_assets_dir, project_path
        )
        skip = event.video_source.skip or 0.0
        if skip > 0:
            cmd.extend(["-ss", f"{skip:.3f}"])
        cmd.extend(["-analyzeduration", "0", "-probesize", "1000"])
        # Outro clips are never looped; limit reading to what is left after the skip.
        if event.video_source.duration is not None:
            remaining = event.video_source.duration - skip
            if remaining > 0:
                cmd.extend(["-t", f"{remaining:.3f}"])
        cmd.extend(["-i", str(video_path)])
        outro_inputs[i] = input_idx
        input_idx += 1
        # Same cached-flag-then-probe logic as for triggered videos.
        has_audio = event.video_source.has_audio
        if has_audio is None:
            print(
                f" Warning: no cached metadata for '{event.video_source.source_file}' — run 'gnommo import' to avoid slow probing"
            )
            has_audio = _has_audio_stream(video_path)
        if has_audio:
            outro_events_with_audio.add(i)

    # Track where audio inputs start
    num_inputs_before_audio = input_idx

    # Input: audio files
    audio_dir = plan.audio_dir.resolve() if plan.audio_dir else project_path
    audio_inputs: dict[str, int] = {}  # audio_id -> input_idx
    audio_durations: dict[str, float] = {}  # audio_id -> duration (for crossfade loops)

    for event in plan.audio_events:
        # An audio file triggered several times is only added as an input once.
        if event.audio_id not in audio_inputs:
            if event.audio_def.is_shared and plan.shared_assets_dir:
                audio_path = (
                    plan.shared_assets_dir / "media" / "audio" / event.audio_def.file
                )
            else:
                audio_path = audio_dir / event.audio_def.file
            audio_path, _ = resolve_with_cache(audio_path, project_path)
            # Use pre-probed duration from audio.json if available (set by import).
            # For MP3 without Xing/VBRI headers this is critical — FFmpeg otherwise
            # scans the whole file to estimate duration (100s+ for large files).
            # Fall back to live probe only for MP3 when duration wasn't pre-cached.
            file_duration = event.audio_def.duration
            if file_duration is None and audio_path.suffix.lower() == ".mp3":
                file_duration = _get_audio_duration(audio_path)
            if file_duration is not None:
                cmd.extend(["-t", str(file_duration)])
            cmd.extend(["-i", str(audio_path)])
            audio_inputs[event.audio_id] = input_idx
            input_idx += 1
            # Cache duration for crossfade loop filter
            if event.audio_def.loop and event.audio_def.overlap:
                audio_durations[event.audio_id] = (
                    file_duration
                    if file_duration is not None
                    else _get_audio_duration(audio_path)
                )

    # Build filter_complex
    filter_complex = build_filter_complex(
        plan,
        has_background,
        bg_idx,
        bg_is_image,
        always_visible_inputs,
        slide_inputs,
        video_inputs,
        num_inputs_before_audio,
        audio_inputs,
        audio_durations,
        video_events_with_audio,
        outro_inputs,
        outro_events_with_audio,
    )
    cmd.extend(["-filter_complex", filter_complex])

    # Map output video and audio
    cmd.extend(["-map", "[vout]"])

    # Determine audio source
    # Priority: [aout] from filter > triggered video > no audio
    # Note: we always create [aout] when always_visible_inputs exists
    if always_visible_inputs:
        cmd.extend(
            ["-map", "[aout]"]
        )  # Audio from filter (may be segmented or simple copy)
    elif video_inputs:
        # Get first triggered video's input index
        first_video_idx = next(iter(video_inputs.values()))
        cmd.extend(
            ["-map", f"{first_video_idx}:a?"]
        )  # Audio from first triggered video (? = optional)
    # else: no audio source available, output will be silent

    # Output settings
    cmd.extend(
        [
            "-t",
            str(plan.total_duration),
            "-c:v",
            "libx264",
            "-preset",
            "fast",
            "-crf",
            "20",
            "-c:a",
            "aac",
            "-b:a",
            "192k",
            "-r",
            str(plan.config.fps),
            str(output_path),
        ]
    )

    return cmd
|
|
|
|
|
|
def _calculate_cutout_position(
    cutout: CutoutDefinition, frame_width: int, frame_height: int
) -> tuple[int, int, int, int]:
    """Convert a cutout definition into pixel geometry.

    Each of x, y, width and height is taken as-is when it is non-negative;
    a negative value means "use the matching *_percent of the frame".
    A cutout with neither an absolute width nor a positive width_percent is
    made square (width = height).

    Returns: (x, y, width, height)
    """
    box_h = (
        cutout.height
        if cutout.height >= 0
        else int(frame_height * cutout.height_percent)
    )

    if cutout.width >= 0:
        box_w = cutout.width
    elif cutout.width_percent > 0:
        box_w = int(frame_width * cutout.width_percent)
    else:
        # No width information at all: fall back to a square cutout.
        box_w = box_h

    left = cutout.x if cutout.x >= 0 else int(frame_width * cutout.x_percent)
    top = cutout.y if cutout.y >= 0 else int(frame_height * cutout.y_percent)

    return left, top, box_w, box_h
|
|
|
|
|
|
def build_camera_transform(
    camera_events: list[CameraEvent],
    width: int,
    height: int,
    fps: int,
    initial_state: CameraState = None,
    output_label: str = "vout",
) -> str:
    """
    Build the FFmpeg filter string that applies camera moves to [scene].

    The composed [scene] stream is padded, scaled (zoom), rotated and cropped
    back to the output size, with every property animated over time through
    expressions produced by _build_animated_expr().

    Args:
        camera_events: Camera keyframes.
        width: Output frame width in pixels.
        height: Output frame height in pixels.
        fps: Accepted for interface compatibility; not used here.
        initial_state: Camera state at t=0 (for partial rendering).
                       When given and not the default state, an instant
                       virtual event at t=0 is put in front of the list.
        output_label: Label for the output stream (default: "vout")

    Returns:
        A ";"-joined filter chain, or a plain copy filter when there are no
        camera events at all.
    """
    events = camera_events
    if initial_state and not initial_state.is_default():
        # Seed the timeline with the starting state (zero duration = no transition).
        seed_event = CameraEvent(
            time=0.0,
            target_state=initial_state,
            duration=0.0,
            easing="linear",
        )
        events = [seed_event] + camera_events

    if not events:
        # Nothing to animate: identity transform.
        return f"[scene]copy[{output_label}]"

    # One time-based expression per camera property.
    zoom = _build_animated_expr(events, "zoom", 1.0)
    rotation = _build_animated_expr(events, "rotation", 0.0)
    pan_x = _build_animated_expr(events, "pan_x", 0.0)
    pan_y = _build_animated_expr(events, "pan_y", 0.0)
    focal_x = _build_animated_expr(events, "focal_x", 0.5)
    focal_y = _build_animated_expr(events, "focal_y", 0.5)

    # Square canvas a bit larger than the frame diagonal, so rotation never
    # exposes a corner.  The side is floored to an even number.
    frame_diagonal = int(math.ceil(math.sqrt(width**2 + height**2)))
    canvas_side = ((frame_diagonal + 100) // 2) * 2
    offset_x = (canvas_side - width) // 2
    offset_y = (canvas_side - height) // 2

    # Degrees -> radians, sign flipped.
    angle = f"(-({rotation})*PI/180)"

    # Crop origin: focal point picks where the zoom centres (0.5 = centre,
    # 0 = left/top, 1 = right/bottom); pan shifts it further.
    origin_x = f"((iw-{width})*({focal_x}) + ({pan_x})*(iw-{width})/2)"
    origin_y = f"((ih-{height})*({focal_y}) + ({pan_y})*(ih-{height})/2)"

    stages = [
        # 1. Pad with transparent black so rotation has room.
        f"[scene]pad={canvas_side}:{canvas_side}:{offset_x}:{offset_y}:color=black@0[padded]",
        # 2. Zoom; max(1, zoom) keeps the canvas from shrinking below the pad
        #    size, and the trunc(.../2+0.5)*2 pattern keeps dimensions even.
        (
            f"[padded]scale=eval=frame:"
            f"w='trunc(iw*max(1,{zoom})/2+0.5)*2':"
            f"h='trunc(ih*max(1,{zoom})/2+0.5)*2'[zoomed]"
        ),
        # 3. Rotate with a transparent fill.
        (
            f"[zoomed]format=rgba,"
            f"rotate=a='{angle}':ow=iw:oh=ih:c='black@0',"
            f"format=yuva444p10le[rotated]"
        ),
        # 4. Crop back to the output size.
        f"[rotated]crop={width}:{height}:{origin_x}:{origin_y}[{output_label}]",
    ]
    return ";".join(stages)
|
|
|
|
|
|
def ff_escape_expr(expr: str) -> str:
    """Backslash-escape filtergraph separators inside an FFmpeg expression.

    Backslash, colon and comma each get a leading backslash.  The mapping is
    applied in one pass over the input, so backslashes added for ':' and ','
    are never escaped a second time.
    """
    escapes = str.maketrans({"\\": "\\\\", ":": "\\:", ",": "\\,"})
    return expr.translate(escapes)
|
|
|
|
|
|
def _build_animated_expr(
    camera_events: list[CameraEvent],
    property_name: str,
    default_value: float,
) -> str:
    """
    Build an FFmpeg expression animating one camera property over time.

    The result is a piecewise function made of nested if() calls:
    - hold segments keep the previous value until the next event starts
    - transition segments interpolate linearly towards the event's target
    - after the last segment the final target value is held

    Events with a non-positive duration change the value instantly and
    contribute no segment of their own.  The nesting is assembled from the
    last segment backwards, so the final held value ends up innermost.
    The finished expression is escaped with ff_escape_expr().
    """
    if not camera_events:
        return str(default_value)

    # Timeline pieces as (t_start, t_end, value_at_start, value_at_end).
    pieces: list[tuple[float, float, float, float]] = []
    held_value = default_value
    cursor = 0.0

    for ev in camera_events:
        goal = getattr(ev.target_state, property_name)
        begins = ev.time
        length = ev.duration

        if begins > cursor:
            # Gap before this event: keep holding the previous value.
            pieces.append((cursor, begins, held_value, held_value))

        if length > 0:
            cursor = begins + length
            pieces.append((begins, cursor, held_value, goal))
        else:
            # Instant change: no segment, the value simply jumps.
            cursor = begins

        held_value = goal

    # Innermost fallback is the value held after the final event.
    expr = str(held_value)

    for t0, t1, v0, v1 in pieces[::-1]:
        if t0 != t1:
            if v0 == v1:
                piece_expr = str(v0)
            else:
                # start + (end - start) * (t - t0) / duration
                piece_expr = f"({v0}+({v1}-{v0})*(t-{t0:.3f})/{t1 - t0:.3f})"
            expr = f"if(between(t,{t0:.3f},{t1:.3f}),{piece_expr},{expr})"

    # Guard for a timeline whose first segment starts after t=0.
    if pieces and pieces[0][0] > 0:
        expr = f"if(lt(t,{pieces[0][0]:.3f}),{default_value},{expr})"

    return ff_escape_expr(expr)
|
|
|
|
|
|
def _build_narration_segments(
    pauses: list, total_duration: float
) -> list[tuple[float, float, float, float]]:
    """
    Split the narration timeline into segments around its pauses.

    Returns list of (source_start, source_end, output_start, output_end) tuples.
    Source times are positions in the narration video; output times are
    positions in the rendered video, shifted by the pauses seen so far.

    Example with pause at narration_time=30 for 5 seconds:
    - Segment 1: source 0-30 -> output 0-30
    - Segment 2: source 30-end -> output 35-end
    """
    if not pauses:
        return [(0.0, total_duration, 0.0, total_duration)]

    timeline = []
    source_cursor = 0.0
    paused_so_far = 0.0

    for pause in pauses:
        pause_at = pause.narration_time
        resumes_in_output = pause.output_time

        # Narration played between the previous pause and this one.
        if pause_at > source_cursor:
            timeline.append(
                (
                    source_cursor,
                    pause_at,
                    source_cursor + paused_so_far,
                    resumes_in_output,
                )
            )

        source_cursor = pause_at
        paused_so_far += pause.duration

    # Tail after the last pause: narration length is the output length minus
    # everything spent paused.
    narration_length = total_duration - sum(p.duration for p in pauses)
    if narration_length > source_cursor:
        timeline.append(
            (
                source_cursor,
                narration_length,
                source_cursor + paused_so_far,
                total_duration,
            )
        )

    return timeline
|
|
|
|
|
|
def _fc_delay_ms(seconds: float) -> int:
    """Convert seconds to whole milliseconds for the ``adelay`` filter.

    Rounds to the nearest millisecond. Plain ``int()`` truncation is off by
    one for values whose product lands just below an integer (for example
    1.001 s becomes 1000 ms), which disagrees with the ``:.3f`` timestamps
    used for the matching video overlays.
    """
    return int(round(seconds * 1000))


def _fc_event_duration(event) -> float:
    """Return how long a video event plays, capped by its source's ``take``."""
    duration = event.end_time - event.start_time
    if event.video_source.take is not None:
        duration = min(duration, event.video_source.take)
    return duration


def _fc_fit_to_cutout(cut_width: int, cut_height: int, zoom: float) -> str:
    """Return the scale+crop chain that fills a cutout at the given zoom."""
    zoomed_width = int(cut_width * zoom)
    zoomed_height = int(cut_height * zoom)
    return (
        f"scale={zoomed_width}:{zoomed_height}:force_original_aspect_ratio=increase,"
        f"crop={cut_width}:{cut_height}:(iw-{cut_width})/2:(ih-{cut_height})/2"
    )


def _fc_overlay_triggered_layer(
    filters: list[str],
    video_events: list,
    video_inputs: dict[int, int],
    layer: str,
    label_prefix: str,
    current_label: str,
    width: int,
    height: int,
    rgba_input: bool = False,
) -> str:
    """Overlay every triggered video of one layer; return the new top label.

    Labels are ``{label_prefix}{i}`` for the prepared video and
    ``{label_prefix}base{i}`` for the composite, where ``i`` is the event's
    index in ``video_events``. With ``rgba_input`` the source is converted to
    rgba up front and the overlay runs with ``format=auto``; otherwise the
    chain goes through yuva444p10le and converts to rgba at the end.
    """
    for i, event in enumerate(video_events):
        if event.layer != layer:
            continue
        video_idx = video_inputs[i]
        cut_x, cut_y, cut_width, cut_height = _calculate_cutout_position(
            event.cutout, width, height
        )
        effective_end = event.start_time + _fc_event_duration(event)
        fit = _fc_fit_to_cutout(cut_width, cut_height, event.video_source.zoom)

        video_label = f"{label_prefix}{i}"
        # Shift the clip so its first frame lands on the trigger time.
        timed_fit = f"setpts=PTS-STARTPTS+{event.start_time:.3f}/TB,{fit}"
        if rgba_input:
            filters.append(f"[{video_idx}:v]format=rgba,{timed_fit}[{video_label}]")
            overlay_tail = ":format=auto:eof_action=pass"
        else:
            filters.append(
                f"[{video_idx}:v]format=yuva444p10le,{timed_fit},"
                f"format=rgba[{video_label}]"
            )
            overlay_tail = ":eof_action=pass"

        next_label = f"{label_prefix}base{i}"
        enable_expr = f"between(t\\,{event.start_time:.3f}\\,{effective_end:.3f})"
        filters.append(
            f"[{current_label}][{video_label}]overlay="
            f"x={cut_x}:y={cut_y}:enable={enable_expr}{overlay_tail}"
            f"[{next_label}]"
        )
        current_label = next_label
    return current_label


def _fc_loop_segment(
    audio_idx: int,
    src_pos: float,
    seg_dur: float,
    delay_ms: int,
    volume: float,
    label: str,
) -> str:
    """Return the filter chain for one stretch of an endlessly looped sound."""
    return (
        f"[{audio_idx}:a]aloop=loop=-1:size=2e+09,"
        f"atrim={src_pos:.3f}:{src_pos + seg_dur:.3f},"
        f"asetpts=PTS-STARTPTS,"
        f"adelay={delay_ms}|{delay_ms},"
        f"volume={volume:.2f}[{label}]"
    )


def _fc_mix_video_event_audio(
    filters: list[str],
    audio_labels: list[str],
    events: list,
    inputs: dict[int, int],
    events_with_audio: set,
    label_prefix: str,
) -> None:
    """Add the trimmed, delayed audio track of each listed video event."""
    for i, event in enumerate(events):
        if i not in events_with_audio:
            continue
        video_idx = inputs[i]
        # Same effective duration as the video side.
        duration = _fc_event_duration(event)
        delay_ms = _fc_delay_ms(event.start_time)
        label = f"{label_prefix}{i}"

        vol = event.video_source.volume
        vol_filter = f",volume={vol:.2f}" if vol != 1.0 else ""
        filters.append(
            f"[{video_idx}:a]atrim=0:{duration:.3f},"
            f"asetpts=PTS-STARTPTS,"
            f"adelay={delay_ms}|{delay_ms}"
            f"{vol_filter}[{label}]"
        )
        audio_labels.append(f"[{label}]")


def build_filter_complex(
    plan: RenderPlan,
    has_background: bool,
    bg_idx: int,
    bg_is_image: bool,
    always_visible_inputs: list[int],
    slide_inputs: dict[str, int],
    video_inputs: dict[int, int],  # event_index -> input_idx
    num_inputs_before_audio: int,
    audio_inputs: dict[str, int],
    audio_durations: dict[str, float],  # audio_id -> duration (for crossfade loops)
    video_events_with_audio: set[int] = None,
    outro_inputs: dict[int, int] = None,  # outro event_index -> input_idx
    outro_events_with_audio: set[int] = None,
) -> str:
    """
    Build the filter_complex string for FFmpeg.

    Video is composed in this order (bottom to top), ending in ``[vout]``:
    - Layer 1: Background (black color source, looped image, or video)
    - Layer 2: "below" triggered videos (vfb/vf2b/vsb)
    - Layer 3: Narration videos (talking head), segmented around pauses
    - Layer 4: "mid" triggered videos (vfm/vsm) - above talking head, below slides
    - Layer 5: Slides - on top of the talking head so fullscreen slides cover it
    - Layer 6: "above" triggered videos (vft/vf2t/vst) - topmost scene layer
    - Camera transform over the composed scene (only when needed)
    - Outro videos, overlaid after the camera transform

    Audio is only produced when ``always_visible_inputs`` is non-empty: the
    first narration input is mixed with sound effects, triggered-video audio
    and outro audio into ``[aout]``.

    Args:
        plan: Render plan supplying config, events, pauses and durations.
        has_background: Whether input ``bg_idx`` is a background source.
        bg_idx: FFmpeg input index of the background.
        bg_is_image: True when the background is a still image to be looped.
        always_visible_inputs: Input indices of the narration videos, in the
            order of ``plan.narration_videos``.
        slide_inputs: slide_id -> input index.
        video_inputs: index into ``plan.video_events`` -> input index.
        num_inputs_before_audio: Not used by this function.
        audio_inputs: audio_id -> input index.
        audio_durations: audio_id -> duration, used for crossfade loops.
        video_events_with_audio: Indices of video events whose audio is mixed.
        outro_inputs: index into ``plan.outro_events`` -> input index.
        outro_events_with_audio: Indices of outro events whose audio is mixed.

    Returns:
        The filter descriptions joined with ``;``.
    """
    outro_inputs = outro_inputs or {}
    outro_events_with_audio = outro_events_with_audio or set()
    width, height = plan.config.resolution
    fps = plan.config.fps
    filters: list[str] = []

    # Layer 1: background
    if not has_background:
        filters.append(f"color=c=black:s={width}x{height}:r={fps}[bg]")
    elif bg_is_image:
        filters.append(
            f"[{bg_idx}:v]loop=loop=-1:size=1:start=0,"
            f"scale={width}:{height}:force_original_aspect_ratio=increase,"
            f"crop={width}:{height},fps={fps}[bg]"
        )
    else:
        filters.append(
            f"[{bg_idx}:v]fps={fps},"
            f"scale={width}:{height}:force_original_aspect_ratio=increase,"
            f"crop={width}:{height}[bg]"
        )

    current_label = "bg"

    # Layer 2: "below" triggered videos - behind slides and talking head
    current_label = _fc_overlay_triggered_layer(
        filters, plan.video_events, video_inputs, "below", "tvb",
        current_label, width, height,
    )

    # Layer 3: talking head - above below-videos, under slides
    for i, (_video_id, video_source, cutout) in enumerate(plan.narration_videos):
        input_idx = always_visible_inputs[i]
        cut_x, cut_y, cut_width, cut_height = _calculate_cutout_position(
            cutout, width, height
        )
        fit = _fc_fit_to_cutout(cut_width, cut_height, video_source.zoom)

        if not plan.narration_pauses:
            video_label = f"av{i}"
            filters.append(
                f"[{input_idx}:v]fps={fps},setpts=PTS-STARTPTS,"
                f"format=yuva444p10le,{fit},"
                f"format=rgba[{video_label}]"
            )
            next_label = f"avbase{i}"
            filters.append(
                f"[{current_label}][{video_label}]overlay=x={cut_x}:y={cut_y}[{next_label}]"
            )
            current_label = next_label
            continue

        # With pauses, cut the source into pieces and re-time each piece to
        # its position on the output timeline.
        segments = _build_narration_segments(
            plan.narration_pauses, plan.total_duration
        )
        for seg_idx, (src_start, src_end, out_start, out_end) in enumerate(segments):
            seg_label = f"av{i}_seg{seg_idx}"
            filters.append(
                f"[{input_idx}:v]trim={src_start:.3f}:{src_end:.3f},"
                f"setpts=PTS-STARTPTS+{out_start:.3f}/TB,"
                f"format=yuva444p10le,{fit},"
                f"format=rgba[{seg_label}]"
            )
            next_label = f"avbase{i}_seg{seg_idx}"
            enable_expr = f"between(t\\,{out_start:.3f}\\,{out_end:.3f})"
            filters.append(
                f"[{current_label}][{seg_label}]overlay=x={cut_x}:y={cut_y}:"
                f"enable={enable_expr}[{next_label}]"
            )
            current_label = next_label

    # Layer 4: "mid" triggered videos - above talking head, below slides.
    # Use case: content that should show through a slide's transparent hole.
    current_label = _fc_overlay_triggered_layer(
        filters, plan.video_events, video_inputs, "mid", "tvm",
        current_label, width, height,
    )

    # Layer 5: slides - letterboxed onto a transparent full-frame canvas
    for i, event in enumerate(plan.slide_events):
        slide_idx = slide_inputs[event.slide_id]
        slide_label = f"s{i}"
        filters.append(
            f"[{slide_idx}:v]scale={width}:{height}:"
            f"force_original_aspect_ratio=decrease,pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:color=0x00000000[{slide_label}]"
        )
        next_label = f"sbase{i}"
        enable_expr = f"between(t\\,{event.start_time:.3f}\\,{event.end_time:.3f})"
        filters.append(
            f"[{current_label}][{slide_label}]overlay="
            f"x=0:y=0:enable={enable_expr}"
            f"[{next_label}]"
        )
        current_label = next_label

    # Layer 6: "above" triggered videos - topmost, covers slides and talking head
    current_label = _fc_overlay_triggered_layer(
        filters, plan.video_events, video_inputs, "above", "tv",
        current_label, width, height, rgba_input=True,
    )

    # Scene composition complete. A camera transform is needed when camera
    # events exist or the initial camera state is non-default.
    needs_camera_transform = plan.camera_events or (
        plan.initial_camera_state and not plan.initial_camera_state.is_default()
    )

    # With outro overlays still to come, this stage feeds [cam_out];
    # otherwise it is the final video output.
    has_outro = bool(plan.outro_events and outro_inputs)
    cam_output_label = "cam_out" if has_outro else "vout"

    if needs_camera_transform:
        filters.append(f"[{current_label}]copy[scene]")
        filters.append(
            build_camera_transform(
                plan.camera_events,
                width,
                height,
                fps,
                initial_state=plan.initial_camera_state,
                output_label=cam_output_label,
            )
        )
    else:
        filters.append(f"[{current_label}]copy[{cam_output_label}]")
    current_label = cam_output_label

    # Outro video overlays (after the camera transform)
    if has_outro:
        for i, event in enumerate(plan.outro_events):
            video_idx = outro_inputs[i]
            effective_end = event.start_time + _fc_event_duration(event)

            if event.cutout:
                cut_x, cut_y, cut_width, cut_height = _calculate_cutout_position(
                    event.cutout, width, height
                )
            else:
                # Fullscreen
                cut_x, cut_y, cut_width, cut_height = 0, 0, width, height

            fit = _fc_fit_to_cutout(cut_width, cut_height, event.video_source.zoom)

            video_label = f"outro{i}"
            filters.append(
                f"[{video_idx}:v]format=yuva444p10le,"
                f"setpts=PTS-STARTPTS+{event.start_time:.3f}/TB,"
                f"{fit},"
                f"format=rgba[{video_label}]"
            )

            next_label = f"outrobase{i}"
            enable_expr = f"between(t\\,{event.start_time:.3f}\\,{effective_end:.3f})"
            filters.append(
                f"[{current_label}][{video_label}]overlay="
                f"x={cut_x}:y={cut_y}:enable={enable_expr}"
                f"[{next_label}]"
            )
            current_label = next_label

        # Final output
        filters.append(f"[{current_label}]copy[vout]")

    # Audio mixing: combine main audio with sound effects
    if always_visible_inputs:
        main_audio_idx = always_visible_inputs[0]
        audio_labels_to_mix = []

        # Channel selection and volume come from the first narration video.
        channel_filter = ""
        narration_volume = 1.0
        if plan.narration_videos:
            _, first_video_source, _ = plan.narration_videos[0]
            use_channels = first_video_source.use_audio_channels
            if use_channels == "auto":
                # NOTE(review): videos_dir, shared_assets_dir, project_path and
                # _resolve_video_path are neither parameters nor locals of this
                # function. Unless they exist at module scope this branch
                # raises NameError - confirm, and pass them in if needed.
                narration_path = _resolve_video_path(
                    videos_dir, first_video_source, shared_assets_dir, project_path
                )
                use_channels = _resolve_auto_channel(narration_path)
            channel_filter = _build_audio_channel_filter(use_channels)
            narration_volume = first_video_source.volume

        volume_filter = (
            f"volume={narration_volume:.2f}" if narration_volume != 1.0 else ""
        )

        # Stop narration-bound audio before the outro, if there is one.
        audio_end_time = (
            plan.narration_end_time if plan.outro_events else plan.total_duration
        )

        if not plan.narration_pauses:
            # Simple case: optional channel/volume filters, plus a trim when
            # an outro follows.
            filter_parts = []
            if channel_filter:
                filter_parts.append(channel_filter)
            if volume_filter:
                filter_parts.append(volume_filter)
            if plan.outro_events:
                filter_parts.append(f"atrim=0:{audio_end_time:.3f}")
                filter_parts.append("asetpts=PTS-STARTPTS")

            if filter_parts:
                chain = ",".join(filter_parts)
                filters.append(f"[{main_audio_idx}:a]{chain}[main_aud]")
                audio_labels_to_mix.append("[main_aud]")
            else:
                # Nothing to apply: feed the raw input stream to the mixer.
                audio_labels_to_mix.append(f"[{main_audio_idx}:a]")
        else:
            # Complex case: cut the narration audio around the pauses and
            # delay each piece to its output position.
            segments = _build_narration_segments(plan.narration_pauses, audio_end_time)
            for seg_idx, (src_start, src_end, out_start, _out_end) in enumerate(
                segments
            ):
                seg_label = f"narr_aud{seg_idx}"
                delay_ms = _fc_delay_ms(out_start)
                filter_parts = []
                if channel_filter:
                    filter_parts.append(channel_filter)
                filter_parts.append(f"atrim={src_start:.3f}:{src_end:.3f}")
                filter_parts.append("asetpts=PTS-STARTPTS")
                filter_parts.append(f"adelay={delay_ms}|{delay_ms}")
                if volume_filter:
                    filter_parts.append(volume_filter)
                chain = ",".join(filter_parts)
                filters.append(f"[{main_audio_idx}:a]{chain}[{seg_label}]")
                audio_labels_to_mix.append(f"[{seg_label}]")

        # Sound-effect events: delay to the trigger time and apply volume
        if plan.audio_events and audio_inputs:
            for i, event in enumerate(plan.audio_events):
                audio_idx = audio_inputs[event.audio_id]
                volume = event.audio_def.volume

                if not event.audio_def.loop:
                    # One-shot audio: delay to trigger time
                    label = f"aud{i}"
                    delay_ms = _fc_delay_ms(event.start_time)
                    filters.append(
                        f"[{audio_idx}:a]adelay={delay_ms}|{delay_ms},volume={volume:.2f}[{label}]"
                    )
                    audio_labels_to_mix.append(f"[{label}]")
                    continue

                # Looping audio stops at narration end when an outro follows.
                loop_end_time = audio_end_time
                remaining = loop_end_time - event.start_time

                if plan.narration_pauses and not event.audio_def.ignore_pauses:
                    # Play only between pauses, resuming the loop where it
                    # left off (src_pos tracks the consumed loop time).
                    relevant_pauses = [
                        p
                        for p in plan.narration_pauses
                        if p.output_time > event.start_time
                    ]
                    src_pos = 0.0
                    seg_start = event.start_time
                    seg_count = 0

                    for pause in relevant_pauses:
                        seg_end = pause.output_time
                        if seg_end > seg_start:
                            seg_dur = seg_end - seg_start
                            seg_label = f"aud{i}_seg{seg_count}"
                            filters.append(
                                _fc_loop_segment(
                                    audio_idx,
                                    src_pos,
                                    seg_dur,
                                    _fc_delay_ms(seg_start),
                                    volume,
                                    seg_label,
                                )
                            )
                            audio_labels_to_mix.append(f"[{seg_label}]")
                            src_pos += seg_dur
                            seg_count += 1
                        seg_start = pause.output_time + pause.duration

                    # Final segment after the last pause
                    if seg_start < loop_end_time:
                        seg_dur = loop_end_time - seg_start
                        seg_label = f"aud{i}_seg{seg_count}"
                        filters.append(
                            _fc_loop_segment(
                                audio_idx,
                                src_pos,
                                seg_dur,
                                _fc_delay_ms(seg_start),
                                volume,
                                seg_label,
                            )
                        )
                        audio_labels_to_mix.append(f"[{seg_label}]")
                else:
                    # Simple loop: no pauses or ignore_pauses=True
                    label = f"aud{i}"
                    delay_ms = _fc_delay_ms(event.start_time)

                    if event.audio_def.overlap and event.audio_id in audio_durations:
                        # Crossfade loop: overlap copies with fade in/out
                        filters.extend(
                            _build_crossfade_loop_filter(
                                input_label=f"[{audio_idx}:a]",
                                output_label=f"[{label}]",
                                audio_duration=audio_durations[event.audio_id],
                                overlap=event.audio_def.overlap,
                                needed_duration=remaining,
                                volume=volume,
                                delay_ms=delay_ms,
                            )
                        )
                    else:
                        # Standard loop without crossfade
                        filters.append(
                            f"[{audio_idx}:a]aloop=loop=-1:size=2e+09,"
                            f"atrim=0:{remaining:.3f},"
                            f"asetpts=PTS-STARTPTS,"
                            f"adelay={delay_ms}|{delay_ms},"
                            f"volume={volume:.2f}[{label}]"
                        )
                    audio_labels_to_mix.append(f"[{label}]")

        # Audio carried by triggered videos, then by outro videos
        _fc_mix_video_event_audio(
            filters,
            audio_labels_to_mix,
            plan.video_events,
            video_inputs,
            video_events_with_audio or set(),
            "tvaud",
        )
        _fc_mix_video_event_audio(
            filters,
            audio_labels_to_mix,
            plan.outro_events,
            outro_inputs,
            outro_events_with_audio,
            "outroaud",
        )

        # Mix all audio tracks together
        if len(audio_labels_to_mix) > 1:
            num_audio_tracks = len(audio_labels_to_mix)
            audio_mix_inputs = "".join(audio_labels_to_mix)
            # normalize=0 prevents amix from dividing volume by number of inputs
            filters.append(
                f"{audio_mix_inputs}amix=inputs={num_audio_tracks}:duration=longest:dropout_transition=0:normalize=0[aout]"
            )
        elif len(audio_labels_to_mix) == 1:
            # Single audio track, just copy it
            label = audio_labels_to_mix[0].strip("[]")
            filters.append(f"[{label}]acopy[aout]")

    return ";".join(filters)
|
|
|
|
|
|
def generate_ffmpeg_command_string(plan: RenderPlan, output_path: Path) -> str:
    """Generate a human-readable FFmpeg command string (for debugging).

    Builds the command with ``build_ffmpeg_command``, prints the raw
    ``-filter_complex`` argument (when present) to stdout, and returns the
    arguments as one string. Each option is kept together with its value, and
    the filter graph is quoted and broken onto one line per ``;``-separated
    filter.

    Args:
        plan: Render plan passed through to ``build_ffmpeg_command``.
        output_path: Output file passed through to ``build_ffmpeg_command``.

    Returns:
        The formatted command text.
    """
    cmd = build_ffmpeg_command(plan, output_path)
    filter_flag = "-filter_complex"

    # Debug aid: show the exact filter graph. Skipped instead of raising when
    # the command carries no filter graph (or the flag has no value).
    if filter_flag in cmd[:-1]:
        print("FILTER_COMPLEX repr:", repr(cmd[cmd.index(filter_flag) + 1]))

    pieces = []
    i = 0
    while i < len(cmd):
        token = cmd[i]
        following = cmd[i + 1] if i + 1 < len(cmd) else None

        if token == filter_flag and following is not None:
            # One filter per line inside a quoted block.
            graph = following.replace(";", ";\n ")
            pieces.append(f" -filter_complex \"\n {graph}\n \"")
            i += 2
        elif (
            token.startswith("-")
            and following is not None
            and not following.startswith("-")
        ):
            # Option followed by its value.
            pieces.append(f" {token} {following}")
            i += 2
        else:
            # Bare flag or positional argument.
            pieces.append(f" {token}")
            i += 1

    return "".join(pieces)
|