Adding handoff functionality for reviews

This commit is contained in:
2026-03-13 11:10:32 +01:00
parent fdd275ac0e
commit 3dcd7961c6
35 changed files with 7181 additions and 326 deletions
+211 -37
View File
@@ -19,6 +19,110 @@ from .models import (
from .preprocessor import run_ffmpeg_with_progress
def _get_audio_duration(audio_path: Path) -> float:
    """Return the duration of an audio file in seconds, as reported by ffprobe.

    Args:
        audio_path: Path of the audio file to probe.

    Returns:
        The container duration printed by ffprobe, parsed as a float.

    Raises:
        RenderError: If ffprobe exits with a non-zero status, or if it exits
            successfully but prints something that is not a number (for
            example an empty string or a non-numeric placeholder).
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        # Print only the bare value: no section wrappers, no "duration=" key.
        "-of", "default=noprint_wrappers=1:nokey=1",
        str(audio_path),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RenderError(f"Failed to get duration for {audio_path}: {result.stderr}")

    raw_duration = result.stdout.strip()
    try:
        return float(raw_duration)
    except ValueError as err:
        # A zero exit status does not guarantee a numeric value on stdout.
        # Surface the problem with the same error type as the failure path
        # above instead of leaking a bare ValueError to the caller.
        raise RenderError(
            f"Failed to get duration for {audio_path}: "
            f"unexpected ffprobe output {raw_duration!r}"
        ) from err
def _build_crossfade_loop_filter(
input_label: str,
output_label: str,
audio_duration: float,
overlap: float,
needed_duration: float,
volume: float,
delay_ms: int,
) -> list[str]:
"""
Build FFmpeg filter chain for crossfade looping.
Creates a seamless loop by overlapping copies of the audio with fade in/out.
Each loop iteration crossfades with the next for `overlap` seconds.
Args:
input_label: Input stream label (e.g., "[0:a]")
output_label: Output stream label (e.g., "[aud0]")
audio_duration: Duration of the source audio in seconds
overlap: Crossfade overlap duration in seconds
needed_duration: Total duration needed
volume: Volume multiplier
delay_ms: Initial delay in milliseconds
Returns:
List of filter strings to append to the filter_complex
"""
filters = []
loop_len = audio_duration - overlap
# Calculate number of loop iterations needed (add 1 extra for safety)
n_loops = math.ceil(needed_duration / loop_len) + 1
# Limit to reasonable number of loops to avoid filter complexity explosion
n_loops = min(n_loops, 100)
if n_loops <= 1:
# Single play, no looping needed
filters.append(
f"{input_label}atrim=0:{needed_duration:.3f},"
f"asetpts=PTS-STARTPTS,"
f"adelay={delay_ms}|{delay_ms},"
f"volume={volume:.2f}{output_label}"
)
return filters
# Split input into n_loops copies
split_labels = [f"[xfloop_{output_label[1:-1]}_{i}]" for i in range(n_loops)]
filters.append(f"{input_label}asplit={n_loops}{''.join(split_labels)}")
# Process each copy with appropriate delay and fades
mix_labels = []
for i in range(n_loops):
copy_label = split_labels[i]
out_label = f"[xfl_{output_label[1:-1]}_{i}]"
mix_labels.append(out_label)
loop_delay = i * loop_len
total_delay_ms = delay_ms + int(loop_delay * 1000)
# Build filter chain for this copy
chain_parts = []
# Fade in at start (except first copy)
if i > 0:
chain_parts.append(f"afade=t=in:d={overlap:.3f}")
# Fade out at end (for overlap with next copy)
# Calculate fade start time
fade_out_start = audio_duration - overlap
if fade_out_start > 0:
chain_parts.append(f"afade=t=out:st={fade_out_start:.3f}:d={overlap:.3f}")
chain_parts.append(f"adelay={total_delay_ms}|{total_delay_ms}")
chain_parts.append(f"volume={volume:.2f}")
filter_chain = ",".join(chain_parts)
filters.append(f"{copy_label}{filter_chain}{out_label}")
# Mix all copies together, then trim to needed duration
filters.append(
f"{''.join(mix_labels)}amix=inputs={n_loops}:duration=longest:normalize=0,"
f"atrim=0:{needed_duration + delay_ms/1000:.3f},"
f"asetpts=PTS-STARTPTS{output_label}"
)
return filters
def render(plan: RenderPlan, output_path: Path, verbose: bool = False) -> None:
"""
Render the final video using FFmpeg.
@@ -56,6 +160,7 @@ def _resolve_video_path(
videos_dir: Path,
video_source: VideoSource,
shared_assets_dir: Path = None,
project_path: Path = None,
) -> Path:
"""Resolve the actual video file path (output_file if exists, else source_file).
@@ -63,7 +168,10 @@ def _resolve_video_path(
compressed alpha channel support.
If video_source.is_shared is True, looks in shared_assets_dir instead of videos_dir.
Uses gnommocache fallback if configured and project_path is provided.
"""
from .cache import resolve_with_cache
# Determine base directory based on is_shared flag
if video_source.is_shared and shared_assets_dir:
base_dir = shared_assets_dir
@@ -72,26 +180,47 @@ def _resolve_video_path(
if video_source.output_file:
video_path = base_dir / video_source.output_file
if video_path.exists():
# Check with cache fallback
if project_path:
resolved, _ = resolve_with_cache(video_path, project_path)
if resolved.exists():
return resolved
elif video_path.exists():
return video_path
# Check for WebM variant (preprocessing outputs compressed WebM instead of ProRes)
webm_path = video_path.with_suffix(".mov")
if webm_path.exists():
if project_path:
resolved, _ = resolve_with_cache(webm_path, project_path)
if resolved.exists():
return resolved
elif webm_path.exists():
return webm_path
return base_dir / video_source.source_file
# Fall back to source_file with cache fallback
source_path = base_dir / video_source.source_file
if project_path:
resolved, _ = resolve_with_cache(source_path, project_path)
return resolved
return source_path
def _has_audio_stream(video_path: Path) -> bool:
    """Check if a video file contains an audio stream using ffprobe.

    Args:
        video_path: Path of the media file to inspect.

    Returns:
        True if ffprobe printed at least one audio stream index, False
        otherwise. The exit status is not inspected, so a probe that fails
        without writing to stdout is also reported as False.
    """
    cmd = [
        "ffprobe",
        "-v",
        "error",
        # Restrict the report to audio streams only.
        "-select_streams",
        "a",
        "-show_entries",
        "stream=index",
        # Bare CSV values without the section name prefix.
        "-of",
        "csv=p=0",
        str(video_path),
    ]
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
    )
    # Any non-whitespace output means at least one audio stream was listed.
    return bool(result.stdout.strip())
@@ -131,7 +260,7 @@ def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
# Add -ss seek BEFORE -i for skip parameter and/or partial rendering
always_visible_inputs: list[int] = []
for video_id, video_source, cutout in plan.narration_videos:
video_path = _resolve_video_path(videos_dir, video_source, shared_assets_dir)
video_path = _resolve_video_path(videos_dir, video_source, shared_assets_dir, project_path)
# Combine video skip setting with partial render offset
total_seek = video_source.skip + plan.input_seek_time
if total_seek > 0:
@@ -141,12 +270,14 @@ def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
input_idx += 1
# Input: background image/video (if specified)
from .cache import resolve_with_cache
bg_file = plan.config.background or plan.config.background_video
has_background = bool(bg_file)
bg_idx = None
bg_is_image = False
if has_background:
bg_path = project_path / bg_file
bg_path, _ = resolve_with_cache(bg_path, project_path)
if not bg_path.exists():
bg_path = project_path.parent / bg_file
image_extensions = {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".webp"}
@@ -169,6 +300,7 @@ def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
for event in plan.slide_events:
if event.slide_id not in slide_inputs:
image_path = slides_dir / event.slide_def.image
image_path, _ = resolve_with_cache(image_path, project_path)
cmd.extend(["-i", str(image_path)])
slide_inputs[event.slide_id] = input_idx
input_idx += 1
@@ -181,7 +313,7 @@ def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
for i, event in enumerate(plan.video_events):
video_path = _resolve_video_path(
videos_dir, event.video_source, shared_assets_dir
videos_dir, event.video_source, shared_assets_dir, project_path
)
# Seek to skip point before loading input
skip = event.video_source.skip
@@ -199,7 +331,7 @@ def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
for i, event in enumerate(plan.outro_events):
video_path = _resolve_video_path(
videos_dir, event.video_source, shared_assets_dir
videos_dir, event.video_source, shared_assets_dir, project_path
)
# Seek to skip point before loading input
skip = event.video_source.skip
@@ -217,13 +349,18 @@ def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
# Input: audio files
audio_dir = plan.audio_dir.resolve() if plan.audio_dir else project_path
audio_inputs: dict[str, int] = {} # audio_id -> input_idx
audio_durations: dict[str, float] = {} # audio_id -> duration (for crossfade loops)
for event in plan.audio_events:
if event.audio_id not in audio_inputs:
audio_path = audio_dir / event.audio_def.file
audio_path, _ = resolve_with_cache(audio_path, project_path)
cmd.extend(["-i", str(audio_path)])
audio_inputs[event.audio_id] = input_idx
input_idx += 1
# Cache duration if this audio uses crossfade looping
if event.audio_def.loop and event.audio_def.overlap:
audio_durations[event.audio_id] = _get_audio_duration(audio_path)
# Build filter_complex
filter_complex = build_filter_complex(
@@ -236,6 +373,7 @@ def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
video_inputs,
num_inputs_before_audio,
audio_inputs,
audio_durations,
video_events_with_audio,
outro_inputs,
outro_events_with_audio,
@@ -541,6 +679,7 @@ def build_filter_complex(
video_inputs: dict[int, int], # event_index -> input_idx
num_inputs_before_audio: int,
audio_inputs: dict[str, int],
audio_durations: dict[str, float], # audio_id -> duration (for crossfade loops)
video_events_with_audio: set[int] = None,
outro_inputs: dict[int, int] = None, # outro event_index -> input_idx
outro_events_with_audio: set[int] = None,
@@ -790,48 +929,65 @@ def build_filter_complex(
main_audio_idx = always_visible_inputs[0]
audio_labels_to_mix = []
# Get audio channel setting from first narration video
# Get audio channel setting and volume from first narration video
channel_filter = ""
narration_volume = 1.0
if plan.narration_videos:
_, first_video_source, _ = plan.narration_videos[0]
channel_filter = _build_audio_channel_filter(
first_video_source.use_audio_channels
)
narration_volume = first_video_source.volume
# Build volume filter if not 1.0
volume_filter = f"volume={narration_volume:.2f}" if narration_volume != 1.0 else ""
# Use narration_end_time to stop audio before outro (if outro exists)
audio_end_time = plan.narration_end_time if plan.outro_events else plan.total_duration
audio_end_time = (
plan.narration_end_time if plan.outro_events else plan.total_duration
)
if not plan.narration_pauses:
# Simple case: trim main audio to end before outro (with optional channel filter)
# Simple case: trim main audio to end before outro (with optional channel and volume filters)
filter_parts = []
if channel_filter:
filter_parts.append(channel_filter)
if volume_filter:
filter_parts.append(volume_filter)
if plan.outro_events:
# Trim narration audio to stop before outro
if channel_filter:
filters.append(f"[{main_audio_idx}:a]{channel_filter}atrim=0:{audio_end_time:.3f},asetpts=PTS-STARTPTS[main_aud]")
else:
filters.append(f"[{main_audio_idx}:a]atrim=0:{audio_end_time:.3f},asetpts=PTS-STARTPTS[main_aud]")
filter_parts.append(f"atrim=0:{audio_end_time:.3f}")
filter_parts.append("asetpts=PTS-STARTPTS")
filters.append(
f"[{main_audio_idx}:a]{','.join(filter_parts)}[main_aud]"
)
audio_labels_to_mix.append("[main_aud]")
elif channel_filter:
filters.append(f"[{main_audio_idx}:a]{channel_filter}[main_aud]")
elif filter_parts:
filters.append(f"[{main_audio_idx}:a]{','.join(filter_parts)}[main_aud]")
audio_labels_to_mix.append("[main_aud]")
else:
audio_labels_to_mix.append(f"[{main_audio_idx}:a]")
else:
# Complex case: segment the narration audio for pauses
segments = _build_narration_segments(
plan.narration_pauses, audio_end_time
)
segments = _build_narration_segments(plan.narration_pauses, audio_end_time)
for seg_idx, (src_start, src_end, out_start, out_end) in enumerate(
segments
):
seg_label = f"narr_aud{seg_idx}"
delay_ms = int(out_start * 1000)
# Trim audio to source range, then delay to output position
# Apply channel filter if needed
channel_part = f"{channel_filter}," if channel_filter else ""
# Apply channel filter, volume filter if needed
filter_parts = []
if channel_filter:
filter_parts.append(channel_filter)
filter_parts.append(f"atrim={src_start:.3f}:{src_end:.3f}")
filter_parts.append("asetpts=PTS-STARTPTS")
filter_parts.append(f"adelay={delay_ms}|{delay_ms}")
if volume_filter:
filter_parts.append(volume_filter)
filters.append(
f"[{main_audio_idx}:a]{channel_part}atrim={src_start:.3f}:{src_end:.3f},"
f"asetpts=PTS-STARTPTS,"
f"adelay={delay_ms}|{delay_ms}[{seg_label}]"
f"[{main_audio_idx}:a]{','.join(filter_parts)}[{seg_label}]"
)
audio_labels_to_mix.append(f"[{seg_label}]")
@@ -850,7 +1006,8 @@ def build_filter_complex(
if plan.narration_pauses and not event.audio_def.ignore_pauses:
# Build segments that skip narration pauses (pauses by default)
relevant_pauses = [
p for p in plan.narration_pauses
p
for p in plan.narration_pauses
if p.output_time > event.start_time
]
src_pos = 0.0
@@ -892,13 +1049,29 @@ def build_filter_complex(
# Simple loop: no pauses or ignore_pauses=True
label = f"aud{i}"
delay_ms = int(event.start_time * 1000)
filters.append(
f"[{audio_idx}:a]aloop=loop=-1:size=2e+09,"
f"atrim=0:{remaining:.3f},"
f"asetpts=PTS-STARTPTS,"
f"adelay={delay_ms}|{delay_ms},"
f"volume={volume:.2f}[{label}]"
)
if event.audio_def.overlap and event.audio_id in audio_durations:
# Crossfade loop: overlap copies with fade in/out
audio_dur = audio_durations[event.audio_id]
crossfade_filters = _build_crossfade_loop_filter(
input_label=f"[{audio_idx}:a]",
output_label=f"[{label}]",
audio_duration=audio_dur,
overlap=event.audio_def.overlap,
needed_duration=remaining,
volume=volume,
delay_ms=delay_ms,
)
filters.extend(crossfade_filters)
else:
# Standard loop without crossfade
filters.append(
f"[{audio_idx}:a]aloop=loop=-1:size=2e+09,"
f"atrim=0:{remaining:.3f},"
f"asetpts=PTS-STARTPTS,"
f"adelay={delay_ms}|{delay_ms},"
f"volume={volume:.2f}[{label}]"
)
audio_labels_to_mix.append(f"[{label}]")
else:
# One-shot audio: delay to trigger time
@@ -952,8 +1125,9 @@ def build_filter_complex(
if len(audio_labels_to_mix) > 1:
num_audio_tracks = len(audio_labels_to_mix)
audio_mix_inputs = "".join(audio_labels_to_mix)
# normalize=0 prevents amix from dividing volume by number of inputs
filters.append(
f"{audio_mix_inputs}amix=inputs={num_audio_tracks}:duration=longest:dropout_transition=0[aout]"
f"{audio_mix_inputs}amix=inputs={num_audio_tracks}:duration=longest:dropout_transition=0:normalize=0[aout]"
)
elif len(audio_labels_to_mix) == 1:
# Single audio track, just copy it