2489 lines
82 KiB
Python
2489 lines
82 KiB
Python
"""Preprocessing stage: apply filters to source videos."""
|
||
|
||
import os
|
||
import subprocess
|
||
import sys
|
||
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
|
||
from pathlib import Path
|
||
from typing import Any, Optional
|
||
import shutil
|
||
from .errors import PreprocessError
|
||
from .models import (
|
||
VideoSource,
|
||
ChromaKeyConfig,
|
||
ColorGradeConfig,
|
||
GnommoKeyConfig,
|
||
AudioNormalizeConfig,
|
||
EQBand,
|
||
)
|
||
from typing import Union, Optional
|
||
|
||
|
||
def _tc() -> str:
    """Return the FFmpeg thread count as a string, ready for a command line.

    The number comes from ``get_ffmpeg_thread_count`` in the sibling ``cache``
    module (per the original note: the ``[performance] cpu_limit`` setting in
    ``~/.gnommo.conf``).
    """
    # Imported lazily, at call time, exactly as the original did.
    from .cache import get_ffmpeg_thread_count as _thread_count

    thread_count = _thread_count()
    return str(thread_count)
|
||
|
||
|
||
# How many workers process chunks in parallel.
DEFAULT_CHUNK_WORKERS = 1

# Length (seconds) of one chunk for parallel filter processing; chunking keeps
# intermediate files from growing huge.
CHUNK_DURATION = 60

# Resolution presets for the preview/proxy workflow.
# A preset is either None (no downscale, no subdir) or a
# (width, height, subdir_name) tuple.
RES_CONFIGS: dict[str, Optional[tuple]] = dict(
    full=None,  # no downscale, no subdir
    low=(490, 270, "low"),
    tiny=(320, 180, "proxy"),  # "proxy" subdir kept for backward compat
)

# Legacy constants mirror the dimensions of the "tiny" preset.
PROXY_WIDTH, PROXY_HEIGHT = RES_CONFIGS["tiny"][:2]  # type: ignore[index]
|
||
|
||
|
||
def get_video_duration(video_path: Path) -> float:
    """Return the duration of *video_path* in seconds, as reported by ffprobe.

    Yields ``0.0`` when ffprobe exits with a non-zero status or when its output
    cannot be parsed as a float.
    """
    probe = subprocess.run(
        [
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format=duration",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            str(video_path),
        ],
        capture_output=True,
        text=True,
    )

    if probe.returncode == 0:
        try:
            return float(probe.stdout.strip())
        except ValueError:
            # ffprobe printed something that is not a number; fall through.
            pass
    return 0.0
|
||
|
||
|
||
def _video_has_alpha(video_path: Path) -> bool:
    """Check whether the first video stream of *video_path* has an alpha channel.

    The pixel format name reported by ffprobe is matched against the FFmpeg
    pixel-format families that carry alpha. Besides the ``yuva*``, ``rgba*``
    and ``bgra*`` families, this also recognises packed ``argb``/``abgr``,
    planar ``gbrap*``, gray+alpha ``ya8``/``ya16*`` and ``ayuv``/``vuya``,
    which a check for only the first three would miss.

    Returns:
        True when the pixel format carries alpha; False otherwise, including
        when ffprobe exits with a non-zero status.
    """
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-select_streams",
        "v:0",
        "-show_entries",
        "stream=pix_fmt",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        str(video_path),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        return False

    pix_fmt = result.stdout.strip().lower()

    # Substrings that only occur in alpha-carrying pixel format names.
    alpha_markers = ("yuva", "rgba", "bgra", "argb", "abgr", "gbrap", "ayuv", "vuya")
    # Gray+alpha names are short, so match them as prefixes to avoid
    # accidental hits inside unrelated names.
    alpha_prefixes = ("ya8", "ya16")

    return pix_fmt.startswith(alpha_prefixes) or any(
        marker in pix_fmt for marker in alpha_markers
    )
|
||
|
||
|
||
def format_time(seconds: float) -> str:
    """Render a duration in seconds as a short human-readable string.

    Under a minute   -> ``"42s"``
    Under an hour    -> ``"3m 7s"``
    An hour or more  -> ``"2h 15m"`` (seconds are dropped)
    """
    if seconds < 60:
        return f"{int(seconds)}s"

    if seconds < 3600:
        whole_minutes, leftover_seconds = divmod(seconds, 60)
        return f"{int(whole_minutes)}m {int(leftover_seconds)}s"

    whole_hours, leftover_seconds = divmod(seconds, 3600)
    return f"{int(whole_hours)}h {int(leftover_seconds // 60)}m"
|
||
|
||
|
||
def create_downscaled_video(
    source_path: Path,
    out_dir: Path,
    width: int,
    height: int,
    force: bool = False,
) -> Path:
    """Downscale a video to the given resolution, preserving audio.

    The result is written to ``out_dir / source_path.name``. An existing
    output is reused unless *force* is set.

    Args:
        source_path: Video to downscale.
        out_dir: Directory for the downscaled copy (created if missing).
        width: Target width in pixels.
        height: Target height in pixels.
        force: Re-encode even when the output file already exists.

    Returns:
        Path of the downscaled file.

    Raises:
        PreprocessError: If ffmpeg exits with a non-zero status. Any partial
            output is removed first, so a later call with ``force=False``
            cannot mistake a truncated file for a valid cached result.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / source_path.name

    if out_path.exists() and not force:
        return out_path

    cmd = [
        "ffmpeg",
        "-y",
        "-i",
        str(source_path),
        "-vf",
        f"scale={width}:{height}",
        "-c:v",
        "libx264",
        "-preset",
        "ultrafast",
        "-crf",
        "28",
        "-vsync",
        "cfr",
        "-c:a",
        "aac",  # re-encode audio so both streams share the same PTS origin,
        "-ar",  # avoiding the lip-sync drift caused by libx264 encoder delay
        "48000",  # when audio is copied with its original timestamps
        str(out_path),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        # Drop whatever ffmpeg managed to write; otherwise the exists() check
        # above would hand the broken file back on the next run. Never remove
        # the file if it is the source itself (out_dir == source directory).
        try:
            if not out_path.samefile(source_path):
                out_path.unlink()
        except OSError:
            pass  # nothing was written, or cleanup is not possible

        raise PreprocessError(
            f"Failed to downscale {source_path.name} to {width}x{height}",
            filter_type="downscale",
            command=" ".join(cmd),
            stderr=result.stderr,
        )
    return out_path
|
||
|
||
|
||
# Keep legacy name as alias
|
||
def create_proxy_video(source_path: Path, proxy_dir: Path, force: bool = False) -> Path:
    """Legacy entry point: downscale *source_path* using the "tiny" preset."""
    proxy_width, proxy_height, _subdir = RES_CONFIGS["tiny"]  # type: ignore[misc]
    return create_downscaled_video(
        source_path, proxy_dir, proxy_width, proxy_height, force
    )
|
||
|
||
|
||
def create_downscaled_videos(
    videos_dir: Path,
    videos: dict[str, VideoSource],
    res: str,
    force: bool = False,
    verbose: bool = False,
) -> Path:
    """
    Create downscaled copies of all source videos for the given res preset.

    Args:
        videos_dir: Directory holding the source files.
        videos: Video sources; only each entry's ``source_file`` is read.
        res: Key into ``RES_CONFIGS`` ("full" means no downscaling).
        force: Re-encode even when a downscaled copy already exists.
        verbose: Also report skipped files.

    Returns the path to the output subdirectory (``videos_dir`` itself for
    the full-resolution preset).
    """
    cfg = RES_CONFIGS[res]
    if cfg is None:
        return videos_dir  # full res — no subdir
    width, height, subdir = cfg

    out_dir = videos_dir / subdir
    out_dir.mkdir(parents=True, exist_ok=True)

    source_files: set[str] = {v.source_file for v in videos.values()}
    print(f" Creating {res} copies ({width}x{height})...")

    for source_file in sorted(source_files):
        source_path = videos_dir / source_file
        if not source_path.exists():
            if verbose:
                print(f" Skipping {source_file} (not found)")
            continue

        # create_downscaled_video() writes to out_dir / <file name>, so the
        # cache check must look at that same path; using the raw source_file
        # would never match when it contains a directory component.
        out_path = out_dir / source_path.name
        if out_path.exists() and not force:
            if verbose:
                print(f" {source_file}: exists, skipping")
            continue

        print(f" {source_file}...", end=" ", flush=True)
        create_downscaled_video(source_path, out_dir, width, height, force)
        print("done")

    return out_dir
|
||
|
||
|
||
# Keep legacy name as alias
|
||
def create_proxies_for_videos(
    videos_dir: Path,
    videos: dict[str, VideoSource],
    force: bool = False,
    verbose: bool = False,
) -> Path:
    """Legacy entry point: create "tiny" downscaled copies of all sources."""
    proxy_dir = create_downscaled_videos(videos_dir, videos, "tiny", force, verbose)
    return proxy_dir
|
||
|
||
|
||
def ensure_downscaled_files_exist(
    source_dir: Path,
    res: str,
    force: bool = False,
    verbose: bool = False,
    skip_sources: set = None,
) -> Path:
    """
    Make sure every video in source_dir has a downscaled copy for the res preset.

    Missing copies are encoded on the fly. The return value is the output
    subdirectory (or source_dir itself for a preset without downscaling).

    skip_sources: optional set of source filenames to leave out (e.g. files that
    have a preprocessed output_file, where the full-res processed version will
    be used instead).
    """
    preset = RES_CONFIGS[res]
    if preset is None:
        return source_dir
    width, height, subdir = preset

    recognised_suffixes = {".mov", ".mp4", ".webm", ".avi", ".mkv", ".m4v"}
    out_dir = source_dir / subdir
    out_dir.mkdir(parents=True, exist_ok=True)

    # Collect candidate sources: regular, non-hidden video files that are not
    # already-processed outputs and are not explicitly skipped.
    video_files = []
    for entry in source_dir.iterdir():
        if not entry.is_file():
            continue
        if entry.suffix.lower() not in recognised_suffixes:
            continue
        if "_processed" in entry.stem:
            continue
        if entry.name.startswith("."):
            continue
        if skip_sources is not None and entry.name in skip_sources:
            continue
        video_files.append(entry)

    if not video_files:
        if verbose:
            print(f" No video files found in {source_dir}")
        return out_dir

    # With force every file is redone; otherwise only those lacking a copy.
    missing = []
    for candidate in video_files:
        if not (out_dir / candidate.name).exists() or force:
            missing.append(candidate)

    if not missing:
        if verbose:
            print(f" All {res} copies exist in {out_dir}")
        return out_dir

    print(f" Creating {len(missing)} {res} file(s) ({width}x{height})...")
    for video_file in missing:
        print(f" {video_file.name}...", end=" ", flush=True)
        create_downscaled_video(video_file, out_dir, width, height, force=True)
        print("done")

    return out_dir
|
||
|
||
|
||
# Keep legacy name as alias
|
||
def ensure_proxy_files_exist(
    source_dir: Path,
    force: bool = False,
    verbose: bool = False,
) -> Path:
    """Legacy entry point: ensure "tiny" downscaled copies exist in source_dir."""
    proxy_dir = ensure_downscaled_files_exist(source_dir, "tiny", force, verbose)
    return proxy_dir
|
||
|
||
|
||
import selectors, time, sys, subprocess
|
||
|
||
|
||
def run_ffmpeg_with_progress(cmd, duration, description="Processing"):
    """Run an FFmpeg command while drawing a progress bar on stdout.

    ``-progress pipe:1 -nostats -loglevel warning`` is spliced into a copy of
    *cmd* (right after ``-y`` when present, otherwise right after the program
    name). FFmpeg's stdout and stderr are merged and read line by line;
    ``out_time_ms=`` lines drive the bar.

    Args:
        cmd: FFmpeg argument list; the caller's list is not modified.
        duration: Expected output duration in seconds, used to turn the
            reported output time into a percentage. With a value <= 0 the bar
            stays at 0% until completion.
        description: Label printed in front of the bar.

    Returns:
        A ``subprocess.CompletedProcess`` whose ``args`` is the modified
        command, whose ``stdout`` is empty and whose ``stderr`` holds every
        line FFmpeg printed (progress keys included).
    """
    cmd = cmd.copy()

    insert_pos = cmd.index("-y") + 1 if "-y" in cmd else 1
    cmd[insert_pos:insert_pos] = [
        "-progress",
        "pipe:1",
        "-nostats",
        "-loglevel",
        "warning",
    ]

    p = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
    )
    sel = selectors.DefaultSelector()

    bar_width = 30
    start_time = time.time()
    last_update = time.time()
    last_percent = 0
    seen_any_progress = False
    last_log_line = ""
    logs = []

    def draw(percent, suffix=""):
        filled = int(bar_width * percent / 100)
        bar = "█" * filled + "░" * (bar_width - filled)
        sys.stdout.write(
            f"\r {description}: [{bar}] {percent:3d}% {suffix} "
        )
        sys.stdout.flush()

    try:
        sel.register(p.stdout, selectors.EVENT_READ)
        draw(0, "Initializing...")

        while True:
            # Once the process has ended, drain whatever is left and stop.
            if p.poll() is not None:
                while True:
                    line = p.stdout.readline()
                    if not line:
                        break
                    logs.append(line)
                break

            events = sel.select(timeout=0.2)
            if not events:
                if not seen_any_progress:
                    # Show elapsed time and last FFmpeg output line during init
                    elapsed = time.time() - start_time
                    hint = f" | {last_log_line[:50]}" if last_log_line else ""
                    draw(0, f"Initializing... ({elapsed:.0f}s){hint}")
                elif (
                    seen_any_progress
                    and last_percent >= 99
                    and (time.time() - last_update) > 1.0
                ):
                    draw(last_percent, "Finalizing...")
                continue

            for key, _ in events:
                line = key.fileobj.readline()
                if not line:
                    continue
                logs.append(line)

                # Track last non-empty, non-progress-key line for init diagnostics
                stripped = line.strip()
                if stripped and "=" not in stripped:
                    last_log_line = stripped

                if line.startswith("out_time_ms="):
                    val = line.split("=", 1)[1].strip()
                    if val != "N/A":
                        try:
                            t_ms = int(val)
                            # Dividing by 1e6 matches the original handling of
                            # this field (i.e. it is treated as microseconds).
                            t_s = t_ms / 1_000_000
                            # Capped at 99 so 100% is only shown on success.
                            percent = (
                                min(99, int((t_s / duration) * 100))
                                if duration > 0
                                else 0
                            )
                            last_percent = max(last_percent, percent)
                            last_update = time.time()
                            seen_any_progress = True
                            draw(last_percent, "")
                        except ValueError:
                            pass
    finally:
        # Release the selector and the pipe on every exit path, and do not
        # leave an FFmpeg child running if the loop was interrupted.
        sel.close()
        if p.poll() is None:
            p.kill()
        p.stdout.close()
        p.wait()

    # Completion
    if p.returncode == 0:
        draw(100, "Done\n")
    else:
        code = p.returncode
        # On macOS/Linux, -9 means SIGKILL (OOM kill by OS), -6 = SIGABRT
        signal_hint = (
            " (OOM kill)" if code == -9 else (" (abort)" if code == -6 else "")
        )
        sys.stdout.write(f"\n FFmpeg exited with code {code}{signal_hint}\n")
        sys.stdout.flush()

    return subprocess.CompletedProcess(
        cmd, p.returncode, stdout="", stderr="".join(logs)
    )
|
||
|
||
|
||
def _has_audio_stream(video_path: Path) -> bool:
    """Tell whether the file has a real (non-ghost) audio stream.

    ffprobe is asked for the index and frame count of the first audio stream.
    No output means there is no audio stream; a reported frame count of
    exactly ``0`` is treated as a ghost track (header present, no samples).
    """
    probe_cmd = [
        "ffprobe",
        "-v",
        "error",
        "-analyzeduration",
        "0",
        "-probesize",
        "1000000",
        "-select_streams",
        "a:0",
        "-show_entries",
        "stream=index,nb_frames",
        "-of",
        "csv=p=0",
        str(video_path),
    ]
    probe = subprocess.run(probe_cmd, capture_output=True, text=True)

    record = probe.stdout.strip()
    if not record:
        return False

    columns = record.split(",")
    # Second CSV column is nb_frames; "0" marks a ghost audio track.
    is_ghost_track = len(columns) >= 2 and columns[1].strip() == "0"
    return not is_ghost_track
|
||
|
||
|
||
def check_audio_channel_silent(
    input_path: Path, channel: str, threshold_db: float = -60.0
) -> tuple[bool, float]:
    """
    Quick check whether the specified audio channel is silent.

    Runs ffmpeg's volumedetect on a mono extraction of the channel. Video is
    disabled with ``-vn`` so this really is an audio-only pass; without it
    ffmpeg would also decode the video stream for the null output.

    Args:
        input_path: File to analyse.
        channel: "left" selects input channel 0; any other value selects
            input channel 1.
        threshold_db: A max volume below this level counts as silent.

    Returns (is_silent, max_volume_db). When no ``max_volume`` line can be
    parsed from ffmpeg's output the result is ``(False, 0.0)``.
    """
    pan = "pan=mono|c0=c0" if channel == "left" else "pan=mono|c0=c1"
    cmd = [
        "ffmpeg",
        "-i",
        str(input_path),
        "-vn",  # audio only: skip decoding the video stream
        "-af",
        f"{pan},volumedetect",
        "-f",
        "null",
        "/dev/null",
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)

    # volumedetect reports on stderr, e.g. "... max_volume: -23.4 dB"
    for line in result.stderr.splitlines():
        if "max_volume:" in line:
            try:
                max_vol = float(line.split("max_volume:")[1].strip().replace(" dB", ""))
                return max_vol < threshold_db, max_vol
            except ValueError:
                pass  # unparsable value; keep scanning
    return False, 0.0
|
||
|
||
|
||
def _resolve_auto_channel(input_path: Path, threshold_db: float = -60.0) -> str:
    """
    Probe both audio channels and pick the channel setting to use.

    Outcome:
    - exactly one channel is silent → the other one ("left" or "right")
    - anything else (both carry signal, or both are silent) → "both"
    """
    # Left is probed first, then right — same order as before.
    silent = {
        side: check_audio_channel_silent(input_path, side, threshold_db)[0]
        for side in ("left", "right")
    }

    if silent["left"] != silent["right"]:
        return "right" if silent["left"] else "left"
    return "both"
|
||
|
||
|
||
def detect_silence_bounds(
    input_path: Path,
    noise_threshold_db: float = -40.0,
    min_silence_duration: float = 0.3,
    verbose: bool = False,
) -> tuple[float, float]:
    """
    Detect when audio content starts and ends in a file.

    Uses FFmpeg's silencedetect filter to find the first and last
    non-silent moments. Useful for automatically computing skip/take values.
    The analysis runs with ``-vn`` so only the audio stream is decoded.

    Two common preamble shapes are handled:
    - File starts with silence → first_sound = end of that silence.
    - File starts with noise (e.g. clothing rustle) followed by a brief
      quiet gap before speech → first_sound = end of that first gap.

    Args:
        input_path: Video or audio file to analyse.
        noise_threshold_db: dB level below which audio is considered silent.
            Raise (e.g. -25) to treat low-level noise like clothing rustle
            as silence.
        min_silence_duration: Minimum gap length (seconds) that counts as
            silence. Shorter gaps are ignored.
        verbose: Print detected silence periods for debugging.

    Returns:
        (first_sound_time, last_sound_time) in seconds.
        first_sound_time — when the first meaningful sound begins
        (0.0 when no silence starts within the first 60 s).
        last_sound_time — when the last meaningful sound ends
        (the total duration unless the file ends in silence).
    """
    total_duration = get_video_duration(input_path)

    cmd = [
        "ffmpeg",
        "-i",
        str(input_path),
        "-vn",  # audio only: no need to decode video for silence detection
        "-af",
        f"silencedetect=noise={noise_threshold_db}dB:duration={min_silence_duration}",
        "-f",
        "null",
        "/dev/null",
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)

    # Parse silence_start / silence_end lines from stderr
    silence_periods: list[tuple[float, float]] = []
    pending_start: float | None = None

    for line in result.stderr.splitlines():
        if "silence_start:" in line:
            try:
                pending_start = float(line.split("silence_start:")[1].strip())
            except ValueError:
                pass
        elif "silence_end:" in line and pending_start is not None:
            try:
                # The end line looks like "silence_end: 5.2 | silence_duration: 3.1"
                end_t = float(line.split("silence_end:")[1].split("|")[0].strip())
                silence_periods.append((pending_start, end_t))
                pending_start = None
            except ValueError:
                pass

    # File ended while still in silence — close the period at total_duration
    if pending_start is not None:
        silence_periods.append((pending_start, total_duration))

    if verbose:
        print(f"\n silence periods ({len(silence_periods)}):")
        for s, e in silence_periods:
            print(f" {s:.3f}s – {e:.3f}s")

    # --- First sound ---
    # Take the end of the FIRST silence period that starts inside the
    # preamble window (first 60 s). This handles both:
    #   • file starts with silence → silence[0].start ≈ 0
    #   • file starts with noise (crumpling etc.) then has a brief quiet gap
    #     before speech → silence[0].start > 0
    # If no such silence exists, the file is assumed to be content from 0.0.
    PREAMBLE_LIMIT = 60.0
    first_sound = next(
        (s_end for s_start, s_end in silence_periods if s_start < PREAMBLE_LIMIT),
        0.0,
    )

    # --- Last sound ---
    # Where the trailing silence begins (if the file ends with silence).
    last_sound = total_duration
    if silence_periods and silence_periods[-1][1] >= total_duration - 0.05:
        last_sound = silence_periods[-1][0]

    return first_sound, last_sound
|
||
|
||
|
||
def preprocess_video(
    videos_dir: Path,
    video_id: str,
    video_source: VideoSource,
    verbose: bool = False,
    force: bool = False,
    custom_gnommo_scratch: Optional[Path] = None,
    res: str = "full",
) -> Path:
    """
    Run a video source through its configured preprocessing filters.

    Consecutive video filters (chroma_key, mask, color_grade, gnommokey) are
    grouped and applied together in one combined pass. Other filters
    (transcribe, audio_normalize) are each handled on their own.

    Args:
        videos_dir: Directory containing videos.json and video files
        video_id: ID of the video being processed
        video_source: VideoSource with source_file, filter, and output_file
        verbose: Print extra detail about outputs and cleanup
        force: Passed on to the downscale and transcribe steps
        custom_gnommo_scratch: Optional external directory for intermediate files (e.g., SSD)
        res: Resolution preset — when not "full", source is downscaled before filtering

    Returns:
        Path to the final preprocessed output file: the untouched source when
        no filters are configured, ``videos_dir / output_file`` when an output
        file is configured, otherwise the last intermediate produced.

    Raises:
        PreprocessError: Source file missing, selected audio channel silent,
            audio_normalize requested without an audio stream, or an unknown
            filter type.
    """
    source_path = videos_dir / video_source.source_file

    if not video_source.filter:
        # Nothing to apply: hand back the original file.
        return source_path

    # Intermediates live either under the caller-supplied scratch location
    # (one subdirectory per video) or in videos_dir/intermediate.
    if custom_gnommo_scratch:
        gnommo_scratch = custom_gnommo_scratch / video_id
    else:
        gnommo_scratch = videos_dir / "intermediate"
    gnommo_scratch.mkdir(parents=True, exist_ok=True)

    # The pipeline starts from the source file (relative to videos_dir).
    current_input = source_path
    if not current_input.exists():
        raise PreprocessError(
            f"Source video not found: {current_input}",
            filter_type=None,
        )

    # For a reduced-resolution preset, shrink the raw source first so every
    # later filter (chroma key, color grade, ...) works on the small file.
    if res != "full":
        preset = RES_CONFIGS.get(res)
        if preset:
            width, height, _ = preset
            print(f" Downscaling source to {width}x{height} ({res})...")
            current_input = create_downscaled_video(
                current_input, gnommo_scratch / f"raw_{res}", width, height, force
            )

    # Settle the audio channel setting: auto-detect when asked, and refuse an
    # explicitly selected channel that turns out to be silent.
    channel = video_source.use_audio_channels
    if channel == "auto":
        channel = _resolve_auto_channel(current_input)
        print(f" Auto channel detection: using '{channel}'")
    elif channel in ("left", "right"):
        is_silent, max_vol = check_audio_channel_silent(current_input, channel)
        if is_silent:
            raise PreprocessError(
                f"Audio channel '{channel}' is silent (max_volume={max_vol:.1f} dB). "
                f"Wrong microphone channel selected?",
                filter_type="audio_check",
            )

    # Files produced along the way, removed once the final output is copied.
    intermediate_files: list[Path] = []

    # Filter types that may share a single combined pass.
    VIDEO_FILTER_TYPES = {"chroma_key", "mask", "color_grade", "gnommokey"}

    # Split the filter list into batches: each run of consecutive video
    # filters forms one batch, every other filter is a batch of its own.
    filter_batches: list[list[dict]] = []
    pending_video_filters: list[dict] = []
    for filter_config in video_source.filter:
        if filter_config.get("type") in VIDEO_FILTER_TYPES:
            pending_video_filters.append(filter_config)
            continue
        if pending_video_filters:
            filter_batches.append(pending_video_filters)
            pending_video_filters = []
        filter_batches.append([filter_config])
    if pending_video_filters:
        filter_batches.append(pending_video_filters)

    batch_num = 0
    for batch in filter_batches:
        first_filter_type = batch[0].get("type")

        if first_filter_type in VIDEO_FILTER_TYPES:
            # Combined video filter batch, run through the chunked processor.
            filter_names = "+".join(f.get("type") for f in batch)
            print(f" Video filters (combined): {filter_names}")

            # Intermediate result is written as a .mov file in the scratch dir.
            step_output = gnommo_scratch / f"{video_id}_batch{batch_num}.mov"
            intermediate_files.append(step_output)

            # skip/take are NOT applied here; they only matter at concatenation.
            apply_combined_video_filters_chunked(
                current_input,
                step_output,
                batch,
                verbose,
                take=None,
                scratch_dir=gnommo_scratch / "chunks",
            )
            current_input = step_output
            batch_num += 1
            continue

        if first_filter_type == "transcribe":
            # Transcription leaves the video itself untouched.
            print(" Filter: transcribe")
            apply_transcribe(current_input, batch[0], verbose, force)
            continue

        if first_filter_type == "audio_normalize":
            # Audio normalization: denoise, compress, and normalize loudness.
            # skip/take are NOT applied here; they only matter at concatenation.
            print(" Filter: audio_normalize")
            if not _has_audio_stream(current_input):
                raise PreprocessError(
                    f"audio_normalize requires an audio stream, but '{current_input.name}' has none.\n"
                    f" Check that the source file has audio, or remove audio_normalize from the filter list.",
                    filter_type="audio_normalize",
                    command="",
                    stderr="",
                )
            step_output = gnommo_scratch / f"{video_id}_batch{batch_num}_audio.mov"
            intermediate_files.append(step_output)
            apply_audio_normalize(
                current_input,
                step_output,
                batch[0],
                verbose,
                take=None,
                use_audio_channels=channel,
                skip_loudnorm=video_source.defer_loudnorm,
            )
            current_input = step_output
            batch_num += 1
            continue

        raise PreprocessError(
            f"Unknown filter type: {first_filter_type}",
            filter_type=first_filter_type,
        )

    if not video_source.output_file:
        # No output_file configured: the last processed file is the result.
        return current_input

    import shutil

    final_output = videos_dir / video_source.output_file
    final_output.parent.mkdir(parents=True, exist_ok=True)

    # Copy the last intermediate to its final location.
    shutil.copy2(current_input, final_output)
    if verbose:
        print(f" Final output: {final_output}")

    # Remove the intermediates recorded above.
    for intermediate_file in intermediate_files:
        if intermediate_file.exists():
            intermediate_file.unlink()
            if verbose:
                print(f" Removed intermediate: {intermediate_file.name}")

    # Drop the scratch directory too, but only if nothing else is left in it.
    try:
        gnommo_scratch.rmdir()
    except OSError:
        pass  # Directory not empty (other videos may have intermediates)

    return final_output
|
||
|
||
|
||
def apply_combined_video_filters(
    input_path: Path,
    output_path: Path,
    filters: list[dict],
    verbose: bool = False,
    take: float = None,
) -> None:
    """
    Run several video filters over a file in one FFmpeg invocation.

    Each chroma_key, mask, color_grade and gnommokey entry is turned into its
    FFmpeg filter string; the strings are joined into one ``-vf`` chain.
    Entries of any other type are ignored. The output is encoded with
    prores_ks (profile 4, yuva444p10le) and pcm_s16le audio.

    Args:
        input_path: File to read.
        output_path: File to write.
        filters: Filter config dicts, applied in list order.
        verbose: Print the filter chain and the full command.
        take: Optional ``-t`` value (seconds), placed before ``-i``.

    Raises:
        PreprocessError: If FFmpeg exits with a non-zero status.
    """
    builders = {
        "chroma_key": build_chroma_key_filter,
        "mask": build_mask_filter,
        "color_grade": build_color_grade_filter,
        "gnommokey": build_gnommokey_filter,
    }
    video_filter = ",".join(
        builders[spec.get("type")](spec)
        for spec in filters
        if spec.get("type") in builders
    )

    # Thread limits go before -i (after -i they become output options and
    # don't limit filter threads).
    cmd = ["ffmpeg", "-y", "-threads", _tc(), "-filter_threads", _tc()]
    if take is not None:
        cmd += ["-t", str(take)]
    cmd += [
        "-probesize",
        "50000000",
        "-analyzeduration",
        "50000000",
        "-i",
        str(input_path),
        "-vf",
        video_filter,
        "-c:v",
        "prores_ks",
        "-profile:v",
        "4",  # ProRes 4444
        "-pix_fmt",
        "yuva444p10le",  # 10-bit with alpha
        "-c:a",
        "pcm_s16le",  # Lossless audio
        str(output_path),
    ]

    if verbose:
        print(f" Combined filter: {video_filter}")
        print(f" Command: {' '.join(cmd)}")

    # The progress bar needs the expected duration of the output.
    if take is not None:
        duration = take
    else:
        duration = get_video_duration(input_path)

    result = run_ffmpeg_with_progress(cmd, duration, "Processing")
    if result.returncode == 0:
        return

    raise PreprocessError(
        "Combined video filter failed",
        filter_type="combined",
        command=" ".join(cmd),
        stderr=result.stderr,
    )
|
||
|
||
|
||
def build_chroma_key_filter(config: dict) -> str:
    """Turn a chroma key config into an FFmpeg filter chain string.

    The chain always starts with ``chromakey``; despill, alpha erosion and
    protected-color restoration are appended, in that order, when the parsed
    config enables them.
    """
    settings = parse_chroma_key_config(config)

    red, green, blue = settings.color
    key_hex = f"0x{red:02x}{green:02x}{blue:02x}"

    chain = [f"chromakey={key_hex}:{settings.similarity:.3f}:{settings.blend:.3f}"]

    if settings.spill > 0:
        chain.append(f"despill=type=green:mix={settings.spill:.3f}")

    # Edge erosion shrinks the alpha mask to remove the green fringe. Only the
    # alpha plane (plane 3) is eroded: threshold0-2=65535 leaves Y/U/V
    # unchanged, threshold3=0 erodes alpha.
    if settings.edge_erode > 0:
        passes = min(settings.edge_erode, 5)  # never more than 5 passes
        erode_alpha = (
            "erosion=threshold0=65535:threshold1=65535:threshold2=65535:threshold3=0"
        )
        chain.append("format=yuva444p")
        chain.extend(erode_alpha for _ in range(passes))

    # Color protection comes last, after chromakey/despill/erosion, so pixels
    # of the protected color that were keyed out get their alpha restored.
    if settings.protect_color:
        keep_r, keep_g, keep_b = settings.protect_color
        # Tolerance is given in the 0-1 range; convert to 0-255 pixel units.
        spread = int(settings.protect_tolerance * 255)

        # geq's r/g/b/alpha functions need RGBA input.
        chain.append("format=rgba")

        # between(value, min, max) is 1 when min <= value <= max; multiplying
        # the three channel tests gives a logical AND.
        matches_protected = (
            f"between(r(X,Y),{max(0, keep_r-spread)},{min(255, keep_r+spread)})*"
            f"between(g(X,Y),{max(0, keep_g-spread)},{min(255, keep_g+spread)})*"
            f"between(b(X,Y),{max(0, keep_b-spread)},{min(255, keep_b+spread)})"
        )

        # Matching pixels become fully opaque; all others keep their alpha.
        chain.append(
            f"geq=r='r(X,Y)':g='g(X,Y)':b='b(X,Y)':a='if({matches_protected},255,alpha(X,Y))'"
        )

    return ",".join(chain)
|
||
|
||
|
||
def build_mask_filter(config: dict) -> str:
    """Turn a mask config into an FFmpeg geq filter string.

    ``left``, ``right``, ``top`` and ``bottom`` are fractions of the frame to
    make transparent, measured from the respective edge. With no positive
    margin the no-op filter ``copy`` is returned.
    """
    margins = {
        edge: float(config.get(edge, 0))
        for edge in ("left", "right", "top", "bottom")
    }

    # (edge, comparison, coordinate, frame extent, measured from the far side?)
    edge_rules = (
        ("left", "lt", "X", "W", False),
        ("right", "gt", "X", "W", True),
        ("top", "lt", "Y", "H", False),
        ("bottom", "gt", "Y", "H", True),
    )

    hidden_regions = []
    for edge, compare, coord, extent, from_far_side in edge_rules:
        fraction = margins[edge]
        if fraction > 0:
            boundary = 1 - fraction if from_far_side else fraction
            hidden_regions.append(f"{compare}({coord},{extent}*{boundary})")

    if not hidden_regions:
        return "copy"  # No-op filter

    # Summing the comparisons acts as OR: any hit zeroes the alpha.
    inside_any = "+".join(hidden_regions)
    alpha_expr = f"if({inside_any},0,alpha(X,Y))"

    # r/g/b are passed through so the filter works directly in rgba space
    # without forcing an rgba→yuv conversion.
    return f"geq=r='r(X,Y)':g='g(X,Y)':b='b(X,Y)':a='{alpha_expr}'"
|
||
|
||
|
||
def build_color_grade_filter(config: dict) -> str:
    """Turn a color grade config into an FFmpeg filter chain string.

    The chain opens with ``format=rgba`` and closes with
    ``format=yuva444p10le``. In between, colorbalance, a curves preset, eq and
    custom curves are added, in that order, each only when the parsed config
    differs from its neutral value.
    """
    grade = parse_color_grade_config(config)

    # Color operations run in RGBA.
    chain: list[str] = ["format=rgba"]

    # Color balance: shadows, midtones, highlights; zero values are omitted.
    balance_fields = ("rs", "gs", "bs", "rm", "gm", "bm", "rh", "gh", "bh")
    balance_args = [
        f"{field}={getattr(grade, field):.3f}"
        for field in balance_fields
        if getattr(grade, field) != 0
    ]
    if balance_args:
        chain.append(f"colorbalance={':'.join(balance_args)}")

    # Named curves preset, unless unset or "none".
    if grade.curves_preset and grade.curves_preset != "none":
        chain.append(f"curves=preset={grade.curves_preset}")

    # EQ: only settings that differ from their neutral value.
    eq_neutral = (("contrast", 1.0), ("brightness", 0.0), ("saturation", 1.0))
    eq_args = [
        f"{field}={getattr(grade, field):.3f}"
        for field, neutral in eq_neutral
        if getattr(grade, field) != neutral
    ]
    if eq_args:
        chain.append(f"eq={':'.join(eq_args)}")

    # Custom per-channel curves, when provided.
    curve_sources = (
        ("r", grade.curves_r),
        ("g", grade.curves_g),
        ("b", grade.curves_b),
        ("master", grade.curves_master),
    )
    curve_args = [f"{target}='{points}'" for target, points in curve_sources if points]
    if curve_args:
        chain.append(f"curves={':'.join(curve_args)}")

    # Back to yuva444p10le so the alpha channel reaches downstream filters.
    chain.append("format=yuva444p10le")

    return ",".join(chain)
|
||
|
||
|
||
def parse_color_grade_config(config: dict) -> ColorGradeConfig:
    """Build a ColorGradeConfig from a raw color-grade filter dictionary.

    Missing keys fall back to neutral values: 0.0 for every colorbalance
    channel and for brightness, 1.0 for contrast and saturation, "none" for
    the curves preset and an empty string for each custom curve.

    Args:
        config: Raw filter settings keyed by option name.

    Returns:
        A ColorGradeConfig populated from ``config``.
    """
    # Numeric options and the value used when the key is absent.
    # Order: shadows, midtones, highlights, then the eq adjustments.
    numeric_defaults = {
        "rs": 0.0,
        "gs": 0.0,
        "bs": 0.0,
        "rm": 0.0,
        "gm": 0.0,
        "bm": 0.0,
        "rh": 0.0,
        "gh": 0.0,
        "bh": 0.0,
        "contrast": 1.0,
        "brightness": 0.0,
        "saturation": 1.0,
    }
    # Options passed through untouched (no type coercion).
    passthrough_defaults = {
        "curves_preset": "none",
        "curves_r": "",
        "curves_g": "",
        "curves_b": "",
        "curves_master": "",
    }

    fields: dict = {}
    for key, fallback in numeric_defaults.items():
        fields[key] = float(config.get(key, fallback))
    for key, fallback in passthrough_defaults.items():
        fields[key] = config.get(key, fallback)

    return ColorGradeConfig(**fields)
|
||
|
||
|
||
def build_gnommokey_filter(config: dict) -> str:
    """Build the FFmpeg filter chain for the gnommokey colour-difference keyer.

    The chain works in RGB (``format=rgba``) and is assembled from these stages:

    - key signal: ``G - max(R, B)`` for a green screen, ``B - max(R, G)`` for
      a blue screen (the screen type is chosen by comparing the G and B
      components of ``screen_color``)
    - screen_balance: optional boost for pixels whose luma is close to the
      screen colour's luma
    - screen_gain: scales the key signal (100 = 1.0)
    - shadow_boost: extra gain for dark pixels
    - clip_black / clip_white: remap the matte range (0-100 -> 0-255)
    - protect_luma: pixels brighter than this stay fully opaque (-1 disables)
    - despill: lerp spill pixels toward ``despill_bias``
    - edge despill: remove channel excess on semi-transparent pixels
    - edge_erode: up to 5 erosion passes on the alpha plane
    - edge_soften: box blur of the alpha plane only (radius capped at 5)

    Args:
        config: Raw gnommokey filter dictionary (see parse_gnommokey_config).

    Returns:
        A comma-joined filter chain that ends in ``rgba``; the caller is
        expected to append the final output pixel format.
    """
    cfg = parse_gnommokey_config(config)
    parts: list[str] = []

    sr, sg, sb = cfg.screen_color

    # Green screen unless blue is the stronger of the two candidate channels.
    is_green_screen = sg >= sb

    # Work in RGBA space for RGB-based colour-difference keying.
    parts.append("format=rgba")

    gain = cfg.screen_gain / 100.0
    balance = cfg.screen_balance / 100.0

    # Per-pixel luma, shared by the balance, shadow-boost and protect stages.
    luma_expr = "(0.299*r(X,Y)+0.587*g(X,Y)+0.114*b(X,Y))"

    # How far the screen channel exceeds the stronger of the other two.
    # The same expression doubles as the spill/excess measure further down.
    if is_green_screen:
        key_signal = "max(0,g(X,Y)-max(r(X,Y),b(X,Y)))"
    else:
        key_signal = "max(0,b(X,Y)-max(r(X,Y),g(X,Y)))"

    # screen_balance: at 0 the key is the pure colour difference; above 0,
    # pixels whose luma is close to the screen's luma are keyed harder.
    screen_y = int(0.299 * sr + 0.587 * sg + 0.114 * sb)
    if balance > 0:
        luma_boost = f"(1+{balance:.2f}*(1-abs({luma_expr}-{screen_y})/128))"
        key_expr = f"({key_signal})*{luma_boost}"
    else:
        key_expr = f"({key_signal})"

    # screen_gain of 100 = 1.0; the extra 2.5 stretches the colour
    # difference toward the full 0-255 matte range.
    scale_factor = gain * 2.5
    key_expr = f"({key_expr})*{scale_factor:.3f}"

    # Shadow boost: multiplier 1 + shadow_boost * (1 - luma/255), so dark
    # screen areas key out as strongly as bright ones.
    if cfg.shadow_boost > 0:
        shadow_factor = f"(1-{luma_expr}/255)"
        key_expr = f"({key_expr})*(1+{cfg.shadow_boost:.3f}*{shadow_factor})"

    # clip_black / clip_white: key values at or below clip_black map to 0
    # (opaque), values at or above clip_white map to 255 (transparent).
    clip_b = cfg.clip_black * 2.55  # 0-100 -> 0-255
    clip_w = cfg.clip_white * 2.55
    if clip_w > clip_b:
        range_scale = 255.0 / (clip_w - clip_b)
        key_expr = f"clip(({key_expr}-{clip_b:.1f})*{range_scale:.3f},0,255)"
    else:
        # Degenerate range: just clamp.
        key_expr = f"clip({key_expr},0,255)"

    # Invert: high key value (screen colour) = low alpha (transparent).
    alpha_expr = f"255-{key_expr}"

    # Luminance protection: lock bright pixels to fully opaque so white
    # objects are never keyed. protect_luma=-1 disables this.
    if cfg.protect_luma >= 0:
        alpha_expr = f"if(gt({luma_expr},{cfg.protect_luma}),255,{alpha_expr})"

    parts.append(f"geq=r='r(X,Y)':g='g(X,Y)':b='b(X,Y)':a='{alpha_expr}'")

    # Despill: shift spill pixels toward the bias colour, proportionally to
    # how much the screen channel exceeds the others.
    if cfg.despill_bias and cfg.despill_strength > 0:
        br, bg, bb = cfg.despill_bias
        strength = cfg.despill_strength

        factor_expr = f"({key_signal}/255*{strength:.2f})"

        new_r = f"clip(r(X,Y)+({br}-r(X,Y))*{factor_expr},0,255)"
        new_g = f"clip(g(X,Y)+({bg}-g(X,Y))*{factor_expr},0,255)"
        new_b = f"clip(b(X,Y)+({bb}-b(X,Y))*{factor_expr},0,255)"

        parts.append(f"geq=r='{new_r}':g='{new_g}':b='{new_b}':a='alpha(X,Y)'")

    # Edge-aware despill: suppress the screen channel on semi-transparent
    # pixels. This stage has no config gate of its own and is emitted
    # whether or not despill_bias is set. edge_factor is ~1 at alpha=128 and
    # 0 at alpha=0 or 255, so fully opaque interior pixels are untouched
    # while edge pixels have the screen channel capped at max(other channels).
    edge_factor = "min(alpha(X,Y),255-alpha(X,Y))/128"
    if is_green_screen:
        new_g = f"clip(g(X,Y)-({key_signal})*({edge_factor}),0,255)"
        parts.append(f"geq=r='r(X,Y)':g='{new_g}':b='b(X,Y)':a='alpha(X,Y)'")
    else:
        new_b = f"clip(b(X,Y)-({key_signal})*({edge_factor}),0,255)"
        parts.append(f"geq=r='r(X,Y)':g='g(X,Y)':b='{new_b}':a='alpha(X,Y)'")

    # Edge erosion: shrink the alpha plane only. threshold=0 leaves a plane
    # unchanged, threshold=65535 allows full erosion; plane 3 is alpha.
    if cfg.edge_erode > 0:
        erode_passes = min(cfg.edge_erode, 5)
        for _ in range(erode_passes):
            parts.append(
                "erosion=threshold0=0:threshold1=0:threshold2=0:threshold3=65535"
            )

    # Edge softening: box-blur the alpha plane only. The previous
    # "alphaextract,avgblur=...[blur]" fragment replaced the picture with
    # the matte and left a dangling output label, which cannot be part of a
    # linear -vf chain. avgblur can restrict itself to selected planes, so
    # convert to planar gbrap (alpha is plane 3 -> bitmask 8), blur that
    # plane, and return to rgba to honour the "stay in rgba" contract below.
    if cfg.edge_soften > 0:
        radius = min(int(cfg.edge_soften), 5)
        if radius > 0:
            parts.append(
                f"format=gbrap,avgblur=sizeX={radius}:sizeY={radius}:planes=8,format=rgba"
            )

    # Stay in rgba so downstream filters (color_grade, mask) don't trigger a
    # redundant yuva444p10le->rgba round-trip. The caller
    # (_process_chunk_to_prores4444) appends format=yuva444p10le at the end.
    return ",".join(parts)
|
||
|
||
|
||
def _coerce_rgb_triplet(value, fallback):
    """Return ``value`` as a 3-tuple if it is a 3-item list/tuple, else ``fallback``."""
    if isinstance(value, (list, tuple)) and len(value) == 3:
        return tuple(value)
    return fallback


def parse_gnommokey_config(config: dict) -> GnommoKeyConfig:
    """Parse a gnommokey config dictionary into GnommoKeyConfig.

    Colour options (``screen_color``, ``despill_bias``, ``alpha_bias``) are
    accepted as either a list or a tuple of three components. Previously only
    lists were recognised, so a tuple ``screen_color`` was silently replaced
    by the default green and tuple biases were dropped.

    Args:
        config: Raw filter settings keyed by option name.

    Returns:
        A GnommoKeyConfig. ``screen_color`` falls back to ``(0, 177, 64)``
        when missing or malformed; the two bias colours fall back to ``None``.
    """
    screen_color = _coerce_rgb_triplet(config.get("screen_color"), (0, 177, 64))
    despill_bias = _coerce_rgb_triplet(config.get("despill_bias"), None)
    alpha_bias = _coerce_rgb_triplet(config.get("alpha_bias"), None)

    return GnommoKeyConfig(
        screen_color=screen_color,
        screen_gain=float(config.get("screen_gain", 100.0)),
        screen_balance=float(config.get("screen_balance", 50.0)),
        clip_black=float(config.get("clip_black", 0.0)),
        clip_white=float(config.get("clip_white", 100.0)),
        despill_bias=despill_bias,
        despill_strength=float(config.get("despill_strength", 0.5)),
        alpha_bias=alpha_bias,
        protect_luma=int(config.get("protect_luma", -1)),
        shadow_boost=float(config.get("shadow_boost", 0.0)),
        edge_erode=int(config.get("edge_erode", 0)),
        edge_soften=float(config.get("edge_soften", 0.0)),
    )
|
||
|
||
|
||
def apply_combined_video_filters_chunked(
    input_path: Path,
    output_path: Path,
    filters: list[dict],
    verbose: bool = False,
    take: Optional[float] = None,
    scratch_dir: Optional[Path] = None,
) -> None:
    """Apply video filters, splitting long inputs into CHUNK_DURATION pieces.

    Inputs no longer than CHUNK_DURATION are encoded in a single pass.
    Longer inputs are:

    1. split into chunks of at most CHUNK_DURATION seconds,
    2. filtered and encoded to ProRes 4444 chunk files in ``scratch_dir``,
    3. stitched together with the concat demuxer using stream copy.

    Chunking keeps intermediate files small and allows several chunks to be
    encoded concurrently (bounded by DEFAULT_CHUNK_WORKERS).

    Args:
        input_path: Source video.
        output_path: Final ProRes 4444 output.
        filters: Filter dictionaries, applied in order.
        verbose: Print extra progress details.
        take: Only process this many seconds from the start; ``None`` means
            the full duration reported by ffprobe.
        scratch_dir: Directory for chunk files; defaults to a ``chunks``
            directory next to ``output_path``.

    Raises:
        PreprocessError: If a chunk fails to encode or concatenation fails.
    """
    duration = take if take is not None else get_video_duration(input_path)

    # Short video: process directly without chunking.
    if duration <= CHUNK_DURATION:
        _process_chunk_to_prores4444(
            input_path, output_path, filters, 0, duration, verbose, take, True
        )
        return

    if scratch_dir is None:
        scratch_dir = output_path.parent / "chunks"
    scratch_dir.mkdir(parents=True, exist_ok=True)

    # (index, chunk_path, start_time, chunk_duration)
    chunk_tasks: list[tuple] = []
    num_chunks = int(duration / CHUNK_DURATION) + 1
    for i in range(num_chunks):
        start_time = i * CHUNK_DURATION
        chunk_duration = min(CHUNK_DURATION, duration - start_time)
        if chunk_duration <= 0:
            # Duration was an exact multiple of CHUNK_DURATION.
            break
        chunk_path = scratch_dir / f"chunk_{i:04d}.mov"
        chunk_tasks.append((i, chunk_path, start_time, chunk_duration))
    chunk_files: list[Path] = [task[1] for task in chunk_tasks]

    num_workers = min(DEFAULT_CHUNK_WORKERS, len(chunk_tasks))
    print(
        f" Processing {len(chunk_tasks)} chunks in parallel ({num_workers} workers)"
    )

    def process_chunk_task(task):
        i, chunk_path, start_time, chunk_dur = task
        _process_chunk_to_prores4444(
            input_path,
            chunk_path,
            filters,
            start_time,
            chunk_dur,
            verbose=False,  # Suppress verbose in parallel mode
            take=chunk_dur,
        )
        return i, chunk_path

    concat_list = scratch_dir / "concat.txt"
    try:
        completed = 0
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            futures = [
                executor.submit(process_chunk_task, task) for task in chunk_tasks
            ]
            try:
                for future in as_completed(futures):
                    i, _ = future.result()
                    completed += 1
                    print(
                        f" Completed chunk {i+1}/{len(chunk_tasks)} ({completed}/{len(chunk_tasks)} done)"
                    )
            except BaseException:
                # Leaving the executor context waits for every queued task.
                # Without cancelling, one failed chunk would still cost the
                # full encode time of all remaining chunks before the error
                # reaches the caller.
                for pending in futures:
                    pending.cancel()
                raise

        # Concatenate chunks into final output.
        with open(concat_list, "w") as cf:
            for chunk_path in chunk_files:
                # Inside a single-quoted concat entry a literal quote must
                # be written as '\'' or the path is cut short.
                escaped = str(chunk_path.resolve()).replace("'", "'\\''")
                cf.write(f"file '{escaped}'\n")

        if verbose:
            print(f" Concatenating {len(chunk_files)} chunks → {output_path.name}")

        concat_cmd = [
            "ffmpeg",
            "-y",
            "-f",
            "concat",
            "-safe",
            "0",
            "-i",
            str(concat_list),
            "-c",
            "copy",
            str(output_path),
        ]
        concat_result = run_ffmpeg_with_progress(
            concat_cmd, duration, "Concatenating"
        )
        if concat_result.returncode != 0:
            raise PreprocessError(
                "Chunk concatenation failed",
                filter_type="concat",
                command=" ".join(concat_cmd),
                stderr=concat_result.stderr,
            )
    finally:
        # Chunk files are large intermediates: remove them on failure too so
        # an aborted run does not leave them filling the scratch disk.
        for chunk_path in chunk_files:
            chunk_path.unlink(missing_ok=True)
        concat_list.unlink(missing_ok=True)

        # Remove the scratch directory if nothing else lives in it.
        try:
            scratch_dir.rmdir()
        except OSError:
            pass
|
||
|
||
|
||
def _process_chunk_to_prores4444(
    input_path: Path,
    output_path: Path,
    filters: list[dict],
    start_time: float,
    chunk_duration: float,
    verbose: bool = False,
    take: Optional[float] = None,
    keep_audio: bool = True,
) -> None:
    """Filter one time range of a video and encode it as ProRes 4444 with alpha.

    ProRes 4444 is used as the compositing intermediate because it carries a
    true (non-binary) alpha channel and 4:4:4 chroma.

    Args:
        input_path: Source video.
        output_path: Destination file.
        filters: Filter dictionaries; recognised ``type`` values are
            ``chroma_key``, ``mask``, ``color_grade`` and ``gnommokey``.
            Other types are ignored here.
        start_time: Seek offset in seconds (input-side ``-ss``).
        chunk_duration: Length of the range in seconds; used for ``-t`` when
            ``take`` is None and for the progress display.
        verbose: Print the filter chain and the full command.
        take: Explicit duration limit; overrides ``chunk_duration``.
        keep_audio: Encode audio as PCM s16le when True, drop it otherwise.

    Raises:
        PreprocessError: If FFmpeg fails or the written file cannot be probed.
    """
    filter_parts: list[str] = []
    for filter_config in filters:
        filter_type = filter_config.get("type")
        if filter_type == "chroma_key":
            filter_parts.append(build_chroma_key_filter(filter_config))
        elif filter_type == "mask":
            filter_parts.append(build_mask_filter(filter_config))
        elif filter_type == "color_grade":
            filter_parts.append(build_color_grade_filter(filter_config))
        elif filter_type == "gnommokey":
            filter_parts.append(build_gnommokey_filter(filter_config))

    # Always end in an alpha-capable pixel format: 10-bit 4:4:4 + alpha.
    filter_parts.append("format=yuva444p10le")
    video_filter = ",".join(filter_parts)

    cmd: list[str] = ["ffmpeg", "-y"]

    # Thread limits MUST come before the first -i. After -i they become
    # output-stream options and are ignored for the filter graph; each geq
    # stage then spawns one thread per CPU core and the resulting frame
    # buffers can OOM-kill the process.
    cmd.extend(["-threads", _tc(), "-filter_threads", _tc()])

    # Seek before the input for fast seeking.
    if start_time > 0:
        cmd.extend(["-ss", str(start_time)])

    # Cap initial file analysis at 50 MB. Without this FFmpeg scans the whole
    # source when the moov atom sits at the end of the file, reading
    # gigabytes when several chunk workers run at once.
    cmd.extend(["-probesize", "50000000", "-analyzeduration", "50000000"])
    cmd.extend(["-i", str(input_path)])

    # Limit duration. A non-positive value means the duration is unknown
    # (get_video_duration returns 0.0 when ffprobe fails); passing "-t 0"
    # would write an empty file and surface as a misleading "missing moov
    # atom" error below, so the limit is omitted in that case.
    actual_take = take if take is not None else chunk_duration
    if actual_take is not None and actual_take > 0:
        cmd.extend(["-t", str(actual_take)])

    cmd.extend(
        [
            "-vf",
            video_filter,
            "-c:v",
            "prores_ks",
            "-profile:v",
            "4",  # 4 = ProRes 4444
            "-pix_fmt",
            "yuva444p10le",  # must carry alpha
            "-vendor",
            "apl0",  # optional; helps some NLEs tag as Apple ProRes
        ]
    )

    if keep_audio:
        # PCM is the least surprising intermediate audio.
        cmd.extend(["-c:a", "pcm_s16le"])
    else:
        cmd.append("-an")

    cmd.append(str(output_path))

    if verbose:
        print(f" Filter: {video_filter}")
        print(f" Command: {' '.join(cmd)}")

    result = run_ffmpeg_with_progress(cmd, actual_take or chunk_duration, "Encoding")

    if result.returncode != 0:
        raise PreprocessError(
            "Chunk processing failed",
            filter_type="chunk",
            command=" ".join(cmd),
            stderr=result.stderr,
        )

    # FFmpeg can exit 0 yet leave a corrupt or incomplete MOV (for example
    # under disk pressure), so confirm the container reports a duration.
    probe = subprocess.run(
        [
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format=duration",
            "-of",
            "csv=p=0",
            str(output_path),
        ],
        capture_output=True,
        text=True,
    )
    if probe.returncode != 0 or not probe.stdout.strip():
        raise PreprocessError(
            f"Chunk output file is unreadable or missing moov atom: {output_path.name}",
            filter_type="chunk",
            command=" ".join(cmd),
            stderr=probe.stderr,
        )
|
||
|
||
|
||
def _process_chunk_to_webm(
    input_path: Path,
    output_path: Path,
    filters: list[dict],
    start_time: float,
    chunk_duration: float,
    verbose: bool = False,
    take: Optional[float] = None,
) -> None:
    """Filter one time range of a video and encode it as VP9/WebM with alpha.

    VP9 with alpha is far smaller than ProRes 4444 while remaining usable
    for compositing.

    Args:
        input_path: Source video.
        output_path: Destination ``.webm`` file.
        filters: Filter dictionaries; recognised ``type`` values are
            ``chroma_key``, ``mask``, ``color_grade`` and ``gnommokey``.
            Other types are ignored here.
        start_time: Seek offset in seconds (input-side ``-ss``).
        chunk_duration: Length of the range in seconds; used for ``-t`` when
            ``take`` is None and for the progress display.
        verbose: Print the filter chain and the full command.
        take: Explicit duration limit; overrides ``chunk_duration``.

    Raises:
        PreprocessError: If FFmpeg exits with a non-zero status.
    """
    filter_parts: list[str] = []
    for filter_config in filters:
        filter_type = filter_config.get("type")
        if filter_type == "chroma_key":
            filter_parts.append(build_chroma_key_filter(filter_config))
        elif filter_type == "mask":
            filter_parts.append(build_mask_filter(filter_config))
        elif filter_type == "color_grade":
            filter_parts.append(build_color_grade_filter(filter_config))
        elif filter_type == "gnommokey":
            filter_parts.append(build_gnommokey_filter(filter_config))

    # Force yuva420p so alpha reaches the encoder. Appending to the list
    # (instead of concatenating ",format=..." onto the joined string) keeps
    # the chain valid when no filter was recognised; the old form produced a
    # chain starting with a comma in that case.
    filter_parts.append("format=yuva420p")
    video_filter = ",".join(filter_parts)

    cmd = ["ffmpeg", "-y"]

    # Same input-side limits as the ProRes chunk encoder: thread caps must
    # precede -i to apply to the filter graph, and the probe limits stop
    # FFmpeg from scanning the whole source.
    cmd.extend(["-threads", _tc(), "-filter_threads", _tc()])

    # Seek before the input for fast seeking.
    if start_time > 0:
        cmd.extend(["-ss", str(start_time)])

    cmd.extend(["-probesize", "50000000", "-analyzeduration", "50000000"])
    cmd.extend(["-i", str(input_path)])

    # Limit duration; skip the limit when the duration is unknown (<= 0),
    # because "-t 0" would produce an empty file.
    actual_take = take if take is not None else chunk_duration
    if actual_take is not None and actual_take > 0:
        cmd.extend(["-t", str(actual_take)])

    cmd.extend(
        [
            "-vf",
            video_filter,
            "-c:v",
            "libvpx-vp9",
            "-pix_fmt",
            "yuva420p",  # VP9 with alpha
            "-auto-alt-ref",
            "0",  # Required for alpha channel in VP9
            "-crf",
            "25",  # Quality (lower = better, 15-35 typical)
            "-b:v",
            "0",  # Variable bitrate mode
            "-deadline",
            "good",  # Encoding speed (good balance)
            "-cpu-used",
            "2",  # Speed/quality tradeoff (0-5, lower = better)
            "-c:a",
            "libopus",  # Opus audio codec
            "-b:a",
            "128k",
            str(output_path),
        ]
    )

    if verbose:
        print(f" Filter: {video_filter}")
        print(f" Command: {' '.join(cmd)}")

    result = run_ffmpeg_with_progress(cmd, actual_take or chunk_duration, "Encoding")

    if result.returncode != 0:
        raise PreprocessError(
            "Chunk processing failed",
            filter_type="chunk",
            command=" ".join(cmd),
            stderr=result.stderr,
        )
|
||
|
||
|
||
def _concatenate_prores4444_chunks(
    chunk_files: list[Path],
    output_path: Path,
    verbose: bool = False,
    keep_audio: bool = False,
) -> None:
    """Concatenate ProRes 4444 (MOV) chunks into a single ProRes 4444 output.

    The chunks are read through FFmpeg's concat demuxer and re-encoded once
    so alpha and stream parameters are consistent across the whole output.

    Args:
        chunk_files: Chunk paths in playback order.
        output_path: Destination file; its parent directory is created.
        verbose: Print the concat list path and the full command.
        keep_audio: Encode audio as PCM s16le when True, drop it otherwise.

    Raises:
        PreprocessError: If FFmpeg exits with a non-zero status.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    concat_list = output_path.parent / "concat_list.txt"

    cmd: list[str] = [
        "ffmpeg",
        "-y",
        "-f",
        "concat",
        "-safe",
        "0",
        "-i",
        str(concat_list),
        # Encode to ProRes 4444 with alpha
        "-c:v",
        "prores_ks",
        "-profile:v",
        "4",  # ProRes 4444
        "-pix_fmt",
        "yuva444p10le",  # preserve alpha + best key edges
        "-vendor",
        "apl0",
        "-movflags",
        "+faststart",
    ]

    if keep_audio:
        # safest for intermediates
        cmd += ["-c:a", "pcm_s16le"]
    else:
        cmd += ["-an"]

    cmd.append(str(output_path))

    try:
        with open(concat_list, "w", encoding="utf-8") as f:
            for chunk in chunk_files:
                # Inside a single-quoted concat entry a literal quote must
                # be written as '\'' or the path is cut short.
                escaped = str(chunk.resolve()).replace("'", "'\\''")
                f.write(f"file '{escaped}'\n")

        if verbose:
            print(f" Concat list: {concat_list}")
            print(f" Command: {' '.join(cmd)}")

        result = subprocess.run(cmd, capture_output=True, text=True)
    finally:
        # The list is only needed while FFmpeg runs; previously it was left
        # behind next to the output on every call.
        concat_list.unlink(missing_ok=True)

    if result.returncode != 0:
        raise PreprocessError(
            "Chunk concatenation failed",
            filter_type="concat",
            command=" ".join(cmd),
            stderr=result.stderr,
        )
|
||
|
||
|
||
def _concatenate_webm_chunks(
    chunk_files: list[Path],
    output_path: Path,
    verbose: bool = False,
) -> None:
    """Concatenate WebM chunks into a single VP9/WebM output file.

    The chunks are read through FFmpeg's concat demuxer and re-encoded with
    libvpx-vp9 in yuva420p. This is NOT a stream copy: the command selects
    an encoder, so the video is decoded and encoded again.

    NOTE(review): earlier documentation described this step as lossless
    stream copy, which the command never did. Confirm whether ``-c copy``
    was intended, and verify that alpha survives the decode/re-encode here.

    Args:
        chunk_files: Chunk paths in playback order.
        output_path: Destination ``.webm`` file.
        verbose: Print the concat list path and the full command.

    Raises:
        PreprocessError: If FFmpeg exits with a non-zero status.
    """
    concat_list = output_path.parent / "concat_list.txt"

    cmd = [
        "ffmpeg",
        "-y",
        "-f",
        "concat",
        "-safe",
        "0",
        "-i",
        str(concat_list),
        "-c:v",
        "libvpx-vp9",
        "-pix_fmt",
        "yuva420p",  # alpha-capable pixel format for the re-encode
        str(output_path),
    ]

    try:
        with open(concat_list, "w") as f:
            for chunk in chunk_files:
                # FFmpeg concat format: file 'path'. A literal quote inside
                # the single-quoted path must be written as '\''.
                escaped = str(chunk.resolve()).replace("'", "'\\''")
                f.write(f"file '{escaped}'\n")

        if verbose:
            print(f" Concat list: {concat_list}")
            print(f" Command: {' '.join(cmd)}")

        result = subprocess.run(cmd, capture_output=True, text=True)
    finally:
        # Remove the list even if launching FFmpeg itself raised.
        concat_list.unlink(missing_ok=True)

    if result.returncode != 0:
        raise PreprocessError(
            "Chunk concatenation failed",
            filter_type="concat",
            command=" ".join(cmd),
            stderr=result.stderr,
        )
|
||
|
||
|
||
def apply_chroma_key(
    input_path: Path,
    output_path: Path,
    config: dict[str, Any],
    verbose: bool = False,
    take: Optional[float] = None,
) -> None:
    """Apply a chroma key (green/blue screen) filter using FFmpeg.

    Config options:
        color: [R, G, B] - Color to key out
        similarity: float - Color similarity threshold
        blend: float - Edge blend/feathering
        spill: float - Spill suppression mix (0 disables despill)

    Defaults for missing options come from parse_chroma_key_config.

    Args:
        input_path: Source video.
        output_path: Destination file (ProRes 4444 with alpha, PCM audio).
        config: Raw chroma key settings, see above.
        verbose: Print the filter chain and the full command.
        take: Optional duration in seconds to limit processing (for quick
            iteration).

    Raises:
        PreprocessError: If FFmpeg exits with a non-zero status.
    """
    chroma_config = parse_chroma_key_config(config)

    # FFmpeg takes the key colour as 0xRRGGBB. Coerce to int first: the "x"
    # format code raises on floats such as 255.0.
    r, g, b = (int(component) for component in chroma_config.color)
    hex_color = f"0x{r:02x}{g:02x}{b:02x}"

    # chromakey=color:similarity:blend
    filter_parts = [
        f"chromakey={hex_color}:{chroma_config.similarity:.3f}:{chroma_config.blend:.3f}"
    ]

    # Despill removes screen-coloured spill on edges. Pick the despill type
    # from the key colour so a blue key is not treated with green
    # suppression; green remains the choice whenever blue is not dominant.
    if chroma_config.spill > 0:
        despill_type = "blue" if b > g else "green"
        filter_parts.append(
            f"despill=type={despill_type}:mix={chroma_config.spill:.3f}"
        )

    video_filter = ",".join(filter_parts)

    cmd = ["ffmpeg", "-y"]
    # Thread limits go before -i so they apply to the filter graph.
    cmd.extend(["-threads", _tc(), "-filter_threads", _tc()])

    # Duration limit as an input option, so only that much is read.
    if take is not None:
        cmd.extend(["-t", str(take)])

    cmd.extend(
        [
            "-i",
            str(input_path),
            "-vf",
            video_filter,
            "-c:v",
            "prores_ks",
            "-profile:v",
            "4",  # ProRes 4444
            "-pix_fmt",
            "yuva444p10le",  # 10-bit with alpha
            "-c:a",
            "pcm_s16le",  # Lossless audio
            str(output_path),
        ]
    )

    if verbose:
        print(f" Filter: {video_filter}")
        if take:
            print(f" Duration limit: {take}s")
        print(f" Command: {' '.join(cmd)}")

    # Duration drives the progress bar.
    duration = take if take is not None else get_video_duration(input_path)

    result = run_ffmpeg_with_progress(cmd, duration, "Chroma key")

    if result.returncode != 0:
        raise PreprocessError(
            "Chroma key filter failed",
            filter_type="chroma_key",
            command=" ".join(cmd),
            stderr=result.stderr,
        )
|
||
|
||
|
||
def apply_mask(
    input_path: Path,
    output_path: Path,
    config: dict[str, Any],
    verbose: bool = False,
    take: float = None,
) -> None:
    """Make the borders of a video transparent using FFmpeg's geq filter.

    Config options (each a fraction of the frame, 0.0-1.0):
        left: width of the transparent band on the left edge
        right: width of the transparent band on the right edge
        top: height of the transparent band on the top edge
        bottom: height of the transparent band on the bottom edge

    When no band is requested the input is copied to the output unchanged.

    Args:
        input_path: Source video.
        output_path: Destination file (ProRes 4444 with alpha, PCM audio).
        config: Mask settings, see above.
        verbose: Print the mask values, filter and full command.
        take: Optional duration in seconds to limit processing.

    Raises:
        PreprocessError: If FFmpeg exits with a non-zero status.
    """
    left, right, top, bottom = (
        float(config.get(side, 0)) for side in ("left", "right", "top", "bottom")
    )

    # One geq test per edge; a test is only emitted for a non-zero band.
    edge_tests = (
        (left, f"lt(X,W*{left})"),
        (right, f"gt(X,W*{1-right})"),
        (top, f"lt(Y,H*{top})"),
        (bottom, f"gt(Y,H*{1-bottom})"),
    )
    conditions = [test for amount, test in edge_tests if amount > 0]

    if not conditions:
        # Nothing to mask: plain file copy.
        shutil.copy2(input_path, output_path)
        return

    # Summing the 0/1 tests acts as a logical OR: any hit forces alpha to 0,
    # otherwise the pixel keeps its existing alpha.
    any_edge = "+".join(conditions)
    alpha_expr = f"if({any_edge},0,alpha(X,Y))"

    # Colour channels pass through untouched; only alpha is rewritten.
    video_filter = f"geq=r='r(X,Y)':g='g(X,Y)':b='b(X,Y)':a='{alpha_expr}'"

    # Thread limits are placed before -i.
    cmd = ["ffmpeg", "-y", "-threads", _tc(), "-filter_threads", _tc()]
    if take is not None:
        cmd += ["-t", str(take)]
    cmd += [
        "-i",
        str(input_path),
        "-vf",
        video_filter,
        "-c:v",
        "prores_ks",
        "-profile:v",
        "4",  # ProRes 4444
        "-pix_fmt",
        "yuva444p10le",  # 10-bit with alpha
        "-c:a",
        "pcm_s16le",  # Lossless audio
        str(output_path),
    ]

    if verbose:
        print(f" Mask: left={left}, right={right}, top={top}, bottom={bottom}")
        print(f" Filter: {video_filter}")
        print(f" Command: {' '.join(cmd)}")

    # Duration drives the progress bar.
    if take is not None:
        duration = take
    else:
        duration = get_video_duration(input_path)

    result = run_ffmpeg_with_progress(cmd, duration, "Mask")
    if result.returncode == 0:
        return

    raise PreprocessError(
        "Mask filter failed",
        filter_type="mask",
        command=" ".join(cmd),
        stderr=result.stderr,
    )
|
||
|
||
|
||
def apply_transcribe(
    input_path: Path,
    config: dict[str, Any],
    verbose: bool = False,
    force: bool = False,
) -> Path:
    """Transcribe a video's audio with Whisper and save it as a JSON sidecar.

    Config options:
        model: str - Whisper model size (tiny, base, small, medium, large).
            Default: "base"
        output: str - Output filename, placed next to the input.
            Default: the input filename with a ``.transcript.json`` suffix

    This filter does not transform the video; it only writes the transcript
    file. An existing transcript is reused unless ``force`` is True.

    Args:
        input_path: Video to transcribe.
        config: Transcription settings, see above.
        verbose: Print the model and output path before transcribing.
        force: Regenerate the transcript even if the file already exists.

    Returns:
        Path to the transcript JSON file.
    """
    model = config.get("model", "base")
    output_name = config.get("output")

    if output_name:
        output_path = input_path.parent / output_name
    else:
        output_path = input_path.with_suffix(".transcript.json")

    # Skip if exists (unless force). This check runs before the transcriber
    # import so the skip path neither pays for nor depends on loading it.
    if output_path.exists() and not force:
        print(f" Transcript exists, skipping: {output_path.name}")
        print(" (use --force to regenerate)")
        return output_path

    from .transcriber import transcribe_video, save_transcript

    if verbose:
        print(f" Model: {model}")
        print(f" Output: {output_path}")

    words = transcribe_video(input_path, model=model)
    save_transcript(words, output_path)

    print(f" Transcribed {len(words)} words -> {output_path.name}")

    return output_path
|
||
|
||
|
||
def apply_audio_normalize(
|
||
input_path: Path,
|
||
output_path: Path,
|
||
config: dict[str, Any],
|
||
verbose: bool = False,
|
||
take: float = None,
|
||
use_audio_channels: str = "both",
|
||
skip_loudnorm: bool = False,
|
||
) -> None:
|
||
"""
|
||
Apply audio normalization: denoise, compress, and loudness normalize.
|
||
|
||
If skip_loudnorm=True, the loudnorm filter is skipped. Use this for segments
|
||
that will be concatenated, then apply loudnorm once to the final output.
|
||
|
||
Config options:
|
||
# Room treatment
|
||
highpass: float - High-pass filter frequency in Hz (0 = disabled, try 80-120)
|
||
lowpass: float - Low-pass filter frequency in Hz (0 = disabled)
|
||
room_eq: bool - Enable room resonance EQ cut
|
||
room_eq_freq: float - Center frequency for room cut (default: 300)
|
||
room_eq_gain: float - Gain in dB, negative = cut (default: -4)
|
||
room_eq_width: float - Q/bandwidth (default: 1.5)
|
||
|
||
# Gate (reverb tail reduction)
|
||
gate: bool - Enable noise gate
|
||
gate_threshold: float - Threshold in dB (default: -35)
|
||
gate_range: float - Attenuation in dB when closed (default: -20)
|
||
gate_attack: float - Attack time in ms (default: 10)
|
||
gate_release: float - Release time in ms (default: 150)
|
||
|
||
# Neural de-reverb
|
||
dereverb_model: str - Path to RNNoise model file (empty = disabled)
|
||
|
||
# Noise reduction
|
||
denoise: bool - Enable noise reduction (default: True)
|
||
noise_floor: float - Noise floor in dB (default: -25)
|
||
|
||
# Compression
|
||
compress: bool - Enable compression (default: True)
|
||
threshold: float - Compression threshold in dB (default: -20)
|
||
ratio: float - Compression ratio (default: 4)
|
||
attack: float - Attack time in ms (default: 5)
|
||
release: float - Release time in ms (default: 50)
|
||
makeup: float - Makeup gain in dB (default: 2)
|
||
|
||
# Loudness normalization
|
||
normalize: bool - Enable loudness normalization (default: True)
|
||
target_lufs: float - Target loudness in LUFS (default: -16)
|
||
target_lra: float - Target loudness range (default: 11)
|
||
target_tp: float - Target true peak in dB (default: -1.5)
|
||
|
||
Args:
|
||
use_audio_channels: "both", "left", or "right" - which channel(s) to use,
|
||
output is always stereo with sound in both channels
|
||
|
||
Filter chain order:
|
||
channel_map -> eq_bands -> highpass -> lowpass -> room_eq -> dereverb -> denoise -> gate -> compress -> normalize
|
||
"""
|
||
cfg = parse_audio_normalize_config(config)
|
||
if not cfg.enabled:
|
||
# No audio processing, just copy
|
||
import shutil
|
||
|
||
shutil.copy2(input_path, output_path)
|
||
return
|
||
# Build audio filter chain (order matters!)
|
||
audio_filters: list[str] = []
|
||
|
||
# 0. Channel mapping - take specified channel(s) and output stereo
|
||
if use_audio_channels == "left":
|
||
# Take left channel, duplicate to both stereo channels
|
||
audio_filters.append("pan=stereo|c0=c0|c1=c0")
|
||
elif use_audio_channels == "right":
|
||
# Take right channel, duplicate to both stereo channels
|
||
audio_filters.append("pan=stereo|c0=c1|c1=c1")
|
||
|
||
# 0.5. Parametric EQ bands (applied early for tonal shaping)
|
||
for band in cfg.eq_bands:
|
||
if band.type == "lowshelf":
|
||
# Low shelf filter: boosts/cuts frequencies below the center
|
||
audio_filters.append(
|
||
f"lowshelf=f={band.freq:.1f}:g={band.gain:.1f}:t=q:w={band.q:.2f}"
|
||
)
|
||
elif band.type == "highshelf":
|
||
# High shelf filter: boosts/cuts frequencies above the center
|
||
audio_filters.append(
|
||
f"highshelf=f={band.freq:.1f}:g={band.gain:.1f}:t=q:w={band.q:.2f}"
|
||
)
|
||
else:
|
||
# Peak/parametric EQ band
|
||
audio_filters.append(
|
||
f"equalizer=f={band.freq:.1f}:width_type=q:width={band.q:.2f}:g={band.gain:.1f}"
|
||
)
|
||
|
||
# 1. High-pass filter (remove room rumble and low-frequency buildup)
|
||
if cfg.highpass > 0:
|
||
audio_filters.append(f"highpass=f={cfg.highpass:.1f}")
|
||
|
||
# 2. Low-pass filter (remove harsh highs if needed)
|
||
if cfg.lowpass > 0:
|
||
audio_filters.append(f"lowpass=f={cfg.lowpass:.1f}")
|
||
|
||
# 3. Room resonance EQ cut (reduce muddy frequencies from room modes)
|
||
if cfg.room_eq:
|
||
# equalizer filter: f=frequency, width_type=q, width=Q, g=gain
|
||
audio_filters.append(
|
||
f"equalizer=f={cfg.room_eq_freq:.1f}"
|
||
f":width_type=q:width={cfg.room_eq_width:.2f}"
|
||
f":g={cfg.room_eq_gain:.1f}"
|
||
)
|
||
|
||
# 4. Neural de-reverb (arnndn - very effective if model available)
|
||
if cfg.dereverb_model:
|
||
model_path = Path(cfg.dereverb_model)
|
||
if model_path.exists():
|
||
audio_filters.append(f"arnndn=m={model_path}:mix={cfg.dereverb_mix:.2f}")
|
||
else:
|
||
print(f" Warning: dereverb model not found: {model_path}")
|
||
|
||
# 5. Noise reduction (afftdn)
|
||
if cfg.denoise:
|
||
audio_filters.append(f"afftdn=nf={cfg.noise_floor:.1f}")
|
||
|
||
# 6. Noise gate (reduce reverb tails during pauses)
|
||
if cfg.gate:
|
||
# agate: threshold, range (attenuation), attack, release
|
||
audio_filters.append(
|
||
f"agate=threshold={cfg.gate_threshold:.1f}dB"
|
||
f":range={cfg.gate_range:.1f}dB"
|
||
f":attack={cfg.gate_attack:.1f}"
|
||
f":release={cfg.gate_release:.1f}"
|
||
)
|
||
|
||
# 7. Compression (acompressor)
|
||
if cfg.compress:
|
||
audio_filters.append(
|
||
f"acompressor=threshold={cfg.threshold:.1f}dB"
|
||
f":ratio={cfg.ratio:.1f}"
|
||
f":attack={cfg.attack:.1f}"
|
||
f":release={cfg.release:.1f}"
|
||
f":makeup={cfg.makeup:.1f}dB"
|
||
)
|
||
|
||
# 8. Loudness normalization (loudnorm - EBU R128)
|
||
# Skip if skip_loudnorm=True (for segments that will be concatenated)
|
||
if cfg.normalize and not skip_loudnorm:
|
||
audio_filters.append(
|
||
f"loudnorm=I={cfg.target_lufs:.1f}"
|
||
f":LRA={cfg.target_lra:.1f}"
|
||
f":TP={cfg.target_tp:.1f}"
|
||
)
|
||
|
||
if not audio_filters:
|
||
# No filters enabled, just copy
|
||
import shutil
|
||
|
||
shutil.copy2(input_path, output_path)
|
||
return
|
||
|
||
audio_filter = ",".join(audio_filters)
|
||
|
||
# Build FFmpeg command - copy video, process audio
|
||
cmd = ["ffmpeg", "-y"]
|
||
|
||
if take is not None:
|
||
cmd.extend(["-t", str(take)])
|
||
|
||
cmd.extend(
|
||
[
|
||
"-i",
|
||
str(input_path),
|
||
"-c:v",
|
||
"copy", # Copy video stream unchanged
|
||
"-af",
|
||
audio_filter,
|
||
"-c:a",
|
||
"pcm_s16le", # Lossless audio output
|
||
str(output_path),
|
||
]
|
||
)
|
||
|
||
if verbose:
|
||
print(f" Audio filter: {audio_filter}")
|
||
print(f" Command: {' '.join(cmd)}")
|
||
|
||
# Get duration for progress bar
|
||
duration = take if take is not None else get_video_duration(input_path)
|
||
|
||
result = run_ffmpeg_with_progress(cmd, duration, "Audio normalize")
|
||
|
||
if result.returncode != 0:
|
||
raise PreprocessError(
|
||
"Audio normalization failed",
|
||
filter_type="audio_normalize",
|
||
command=" ".join(cmd),
|
||
stderr=result.stderr,
|
||
)
|
||
|
||
|
||
def parse_audio_normalize_config(config: dict[str, Any]) -> AudioNormalizeConfig:
    """Build an AudioNormalizeConfig from a raw settings dictionary.

    Every key is optional; a missing key falls back to the default written
    next to it below. Values are coerced with ``float``/``bool``/``str``, so
    a value that cannot be coerced raises the usual ``ValueError`` or
    ``TypeError`` from the conversion.

    Args:
        config: Raw audio-normalize settings. ``eq_bands`` is read as an
            iterable of dicts with optional ``freq``, ``gain``, ``q`` and
            ``type`` keys.

    Returns:
        The populated AudioNormalizeConfig.
    """

    def number(key: str, fallback: float) -> float:
        # Numeric setting, coerced to float.
        return float(config.get(key, fallback))

    def flag(key: str, fallback: bool) -> bool:
        # Boolean switch, coerced by truthiness.
        return bool(config.get(key, fallback))

    # Parametric EQ bands, kept in the order they were configured.
    bands = [
        EQBand(
            freq=float(entry.get("freq", 1000)),
            gain=float(entry.get("gain", 0)),
            q=float(entry.get("q", 1.0)),
            type=str(entry.get("type", "peak")),
        )
        for entry in config.get("eq_bands", [])
    ]

    return AudioNormalizeConfig(
        enabled=flag("enabled", True),
        # Parametric EQ
        eq_bands=bands,
        # Room treatment
        highpass=number("highpass", 0.0),
        lowpass=number("lowpass", 0.0),
        room_eq=flag("room_eq", False),
        room_eq_freq=number("room_eq_freq", 300.0),
        room_eq_gain=number("room_eq_gain", -4.0),
        room_eq_width=number("room_eq_width", 1.5),
        # Gate
        gate=flag("gate", False),
        gate_threshold=number("gate_threshold", -35.0),
        gate_range=number("gate_range", -20.0),
        gate_attack=number("gate_attack", 10.0),
        gate_release=number("gate_release", 150.0),
        # Neural de-reverb
        dereverb_model=str(config.get("dereverb_model", "")),
        dereverb_mix=number("dereverb_mix", 0.8),
        # Noise reduction
        denoise=flag("denoise", True),
        noise_floor=number("noise_floor", -25.0),
        # Compression
        compress=flag("compress", True),
        threshold=number("threshold", -20.0),
        ratio=number("ratio", 4.0),
        attack=number("attack", 5.0),
        release=number("release", 50.0),
        makeup=number("makeup", 2.0),
        # Loudness normalization
        normalize=flag("normalize", True),
        target_lufs=number("target_lufs", -16.0),
        target_lra=number("target_lra", 11.0),
        target_tp=number("target_tp", -1.5),
    )
|
||
|
||
|
||
def parse_chroma_key_config(config: dict[str, Any]) -> ChromaKeyConfig:
    """Parse a chroma key config dictionary into ChromaKeyConfig.

    Defaults are tuned for aggressive green screen removal:
    - similarity 0.4: Captures wide range of green shades (lighting variations)
    - blend 0.08: Tight edges with minimal feathering
    - spill 0.1: Light despill to remove green reflections on subject
    - edge_erode 0: No alpha erosion (set 1-3 to remove green fringe)
    - protect_color: Optional RGB color to protect from keying (e.g., yellow jumpsuit)
    - protect_tolerance: How much variation from protect_color to allow (0-1, default 0.15)

    Args:
        config: Raw chroma key settings; every key is optional.

    Returns:
        The populated ChromaKeyConfig. ``color`` falls back to (0, 255, 0)
        and ``protect_color`` to None whenever the configured value is not a
        3-element list or tuple.
    """

    def as_rgb(value: Any) -> Optional[tuple]:
        # Accept any 3-element list/tuple; everything else is "not a color".
        if isinstance(value, (list, tuple)) and len(value) == 3:
            return tuple(value)
        return None

    color = as_rgb(config.get("color", [0, 255, 0]))
    if color is None:
        color = (0, 255, 0)

    # Validate unconditionally: a truthiness guard here would let falsy but
    # malformed values such as [] or 0 through to ChromaKeyConfig untouched.
    protect_color = as_rgb(config.get("protect_color"))

    return ChromaKeyConfig(
        color=color,
        similarity=float(config.get("similarity", 0.4)),
        blend=float(config.get("blend", 0.08)),
        spill=float(config.get("spill", 0.1)),
        edge_erode=int(config.get("edge_erode", 0)),
        protect_color=protect_color,
        protect_tolerance=float(config.get("protect_tolerance", 0.15)),
    )
|
||
|
||
|
||
def get_preprocessed_path(videos_dir: Path, video_source: VideoSource) -> Path:
    """
    Resolve the file that downstream stages should read for a video source.

    Args:
        videos_dir: Directory the file names are relative to.
        video_source: Source entry providing ``output_file`` and ``source_file``.

    Returns:
        ``videos_dir / output_file`` when an output file is set (truthy),
        otherwise ``videos_dir / source_file``.
    """
    chosen_name = video_source.output_file or video_source.source_file
    return videos_dir / chosen_name
|
||
|
||
|
||
def needs_preprocessing(videos_dir: Path, video_source: VideoSource) -> bool:
    """Report whether a video source still has to be run through its filters.

    Args:
        videos_dir: Directory that ``output_file`` is relative to.
        video_source: Source entry providing ``filter`` and ``output_file``.

    Returns:
        False when the source has no filters, or when its output file (or a
        file with the same name and a ``.mov`` suffix) already exists in
        ``videos_dir``. True otherwise, including when filters are set but
        no output file is named.
    """
    if not video_source.filter:
        return False

    if not video_source.output_file:
        return True

    target = videos_dir / video_source.output_file
    # The output may have been written with a .mov suffix instead of the
    # configured one, so that variant counts as "already done" too.
    already_done = target.exists() or target.with_suffix(".mov").exists()
    return not already_done
|
||
|
||
|
||
def stitch_narration_segments(
    videos_dir: Path,
    segment_ids: list[str],
    videos: dict[str, VideoSource],
    output_path: Path,
    verbose: bool = False,
    default_end_trim: float = 0.0,
    loudnorm_config: Optional[dict] = None,
) -> Path:
    """
    Stitch multiple narration video segments into a single file.

    Each segment's skip and take values are applied to trim dead video at the
    start/end of each recording. The segments are concatenated in the order
    specified by segment_ids.

    Args:
        videos_dir: Directory containing video files
        segment_ids: Ordered list of video IDs from videos.json
        videos: Dict of video ID -> VideoSource from videos.json
        output_path: Path for the concatenated output file
        verbose: Enable verbose output
        default_end_trim: Seconds to trim from the end when no explicit end/take is set
        loudnorm_config: Optional dict with ``target_lufs``, ``target_lra`` and
            ``target_tp`` used for the post-stitch loudnorm pass. Missing keys
            (or None) fall back to -14 / 11 / -1.5.

    Returns:
        Path to the stitched video file. With exactly one segment nothing is
        written and that segment's preprocessed path is returned instead.

    Raises:
        PreprocessError: If segment_ids is empty, a segment ID or file is
            missing (multi-segment case), or any ffmpeg step fails.
        KeyError: If the only segment ID is not present in ``videos``.
    """
    if not segment_ids:
        # Fail early with a clear message instead of letting ffmpeg choke on
        # an empty concat list further down.
        raise PreprocessError(
            "No narration segments to stitch",
            filter_type="concat",
        )

    if len(segment_ids) == 1:
        # Single segment - just return its processed path
        video_source = videos[segment_ids[0]]
        return get_preprocessed_path(videos_dir, video_source)

    print(f" Concatenating {len(segment_ids)} narration segments...")

    # Create temp directory for trimmed segments
    temp_dir = output_path.parent / "concat_temp"
    temp_dir.mkdir(parents=True, exist_ok=True)

    trimmed_segments: list[Path] = []
    concat_list = temp_dir / "concat_list.txt"

    try:
        for i, video_id in enumerate(segment_ids):
            if video_id not in videos:
                raise PreprocessError(
                    f"Narration segment '{video_id}' not found in videos.json",
                    filter_type=None,
                )

            video_source = videos[video_id]
            source_path = get_preprocessed_path(videos_dir, video_source)

            if not source_path.exists():
                raise PreprocessError(
                    f"Narration segment not found: {source_path}",
                    filter_type=None,
                )

            # Get segment duration
            full_duration = get_video_duration(source_path)
            skip = video_source.skip or 0.0
            take = video_source.take

            # Apply default end trim if no explicit take/end was set
            if take is None and default_end_trim > 0:
                take = max(0.0, full_duration - skip - default_end_trim)

            # Effective duration is only used for the verbose report below.
            if take is not None:
                effective_duration = min(take, full_duration - skip)
            else:
                effective_duration = full_duration - skip

            if verbose:
                print(f" Segment {i+1}: {video_id}")
                print(f" Source: {source_path.name}")
                print(
                    f" Skip: {skip}s, Take: {take or 'all'}s, Duration: {effective_duration:.1f}s"
                )

            # Always re-encode every segment to normalize fps and timestamps.
            # Mixing un-normalized source files (e.g. 60fps camera) with
            # trimmed-and-re-encoded 30fps segments causes cumulative A/V drift
            # in the final concat.
            trimmed_path = temp_dir / f"segment_{i:03d}.mov"

            # Check if source has alpha channel (for ProRes 4444, etc.)
            # NOTE(review): the codec is chosen per segment, so a mix of alpha
            # and non-alpha segments reaches the stream-copy concat with
            # different codecs - confirm callers never mix them.
            has_alpha = _video_has_alpha(source_path)

            cmd = ["ffmpeg", "-y"]
            if skip > 0:
                cmd.extend(["-ss", str(skip)])
            cmd.extend(["-i", str(source_path)])
            if take is not None:
                cmd.extend(["-t", str(take)])

            if has_alpha:
                # Preserve alpha with ProRes 4444
                cmd.extend(
                    [
                        "-vf",
                        "fps=30,format=yuva444p10le",
                        "-c:v",
                        "prores_ks",
                        "-profile:v",
                        "4",
                        "-pix_fmt",
                        "yuva444p10le",
                        "-c:a",
                        "pcm_s16le",
                        "-avoid_negative_ts",
                        "make_zero",
                        str(trimmed_path),
                    ]
                )
            else:
                # No alpha - use fast h264 encoding
                cmd.extend(
                    [
                        "-vf",
                        "fps=30",
                        "-c:v",
                        "libx264",
                        "-preset",
                        "fast",
                        "-crf",
                        "18",
                        "-c:a",
                        "aac",
                        "-b:a",
                        "192k",
                        "-avoid_negative_ts",
                        "make_zero",
                        "-movflags",
                        "+faststart",
                        str(trimmed_path),
                    ]
                )

            # Register before running so a partially written file is still
            # removed by the cleanup below if ffmpeg fails.
            trimmed_segments.append(trimmed_path)

            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode != 0:
                raise PreprocessError(
                    f"Failed to trim segment {video_id}",
                    filter_type="concat",
                    command=" ".join(cmd),
                    stderr=result.stderr,
                )

        # Build concat file list. Paths are wrapped in single quotes, so any
        # single quote inside a path must be written as '\'' or the concat
        # demuxer mis-parses the entry.
        with open(concat_list, "w", encoding="utf-8") as f:
            for segment in trimmed_segments:
                escaped = str(segment.resolve()).replace("'", "'\\''")
                f.write(f"file '{escaped}'\n")

        # Concatenate all segments
        print(f" Stitching {len(trimmed_segments)} segments -> {output_path.name}")

        cmd = [
            "ffmpeg",
            "-y",
            "-f",
            "concat",
            "-safe",
            "0",
            "-i",
            str(concat_list),
            "-c:v",
            "copy",
            "-c:a",
            "copy",
            "-movflags",
            "+faststart",
            str(output_path),
        ]

        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            raise PreprocessError(
                "Segment concatenation failed",
                filter_type="concat",
                command=" ".join(cmd),
                stderr=result.stderr,
            )

        # Apply loudnorm if any segment had defer_loudnorm=True
        needs_loudnorm = any(
            videos[seg_id].defer_loudnorm for seg_id in segment_ids if seg_id in videos
        )
        if needs_loudnorm:
            print(" Applying loudness normalization to stitched output...")
            normalized_path = (
                output_path.parent / f"{output_path.stem}_normalized{output_path.suffix}"
            )

            # Build loudnorm filter string from project config (or fall back to defaults)
            # NOTE(review): the -14 LUFS fallback differs from the -16 default in
            # parse_audio_normalize_config - confirm this is intentional.
            _cfg = loudnorm_config or {}
            _lufs = float(_cfg.get("target_lufs", -14))
            _lra = float(_cfg.get("target_lra", 11))
            _tp = float(_cfg.get("target_tp", -1.5))
            loudnorm_filter = f"loudnorm=I={_lufs:.1f}:LRA={_lra:.1f}:TP={_tp:.1f}"

            loudnorm_cmd = [
                "ffmpeg",
                "-y",
                "-i",
                str(output_path),
                "-c:v",
                "copy",
                "-af",
                loudnorm_filter,
                "-c:a",
                "aac",
                "-b:a",
                "192k",
                "-movflags",
                "+faststart",
                str(normalized_path),
            ]

            result = subprocess.run(loudnorm_cmd, capture_output=True, text=True)
            if result.returncode != 0:
                # Do not leave a half-written "_normalized" file behind.
                normalized_path.unlink(missing_ok=True)
                raise PreprocessError(
                    "Loudness normalization failed",
                    filter_type="loudnorm",
                    command=" ".join(loudnorm_cmd),
                    stderr=result.stderr,
                )

            # Replace original with normalized version in one step, so there
            # is never a moment where output_path does not exist.
            normalized_path.replace(output_path)
            print(" Loudness normalization complete.")
    finally:
        # Clean up temp files on success and on failure alike.
        for leftover in (*trimmed_segments, concat_list):
            leftover.unlink(missing_ok=True)
        try:
            temp_dir.rmdir()
        except OSError:
            # Directory not empty (or already gone) - leave it in place.
            pass

    total_duration = get_video_duration(output_path)
    print(f" Stitched duration: {format_time(total_duration)}")

    return output_path
|