"""Preprocessing stage: apply filters to source videos.""" import os import subprocess import sys from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed from pathlib import Path from typing import Any, Optional import shutil from .errors import PreprocessError from .models import ( VideoSource, ChromaKeyConfig, ColorGradeConfig, GnommoKeyConfig, AudioNormalizeConfig, EQBand, ) from typing import Union, Optional # Number of parallel workers for chunk processing DEFAULT_CHUNK_WORKERS = 4 # Chunk duration in seconds for parallel filter processing (avoids huge intermediate files) CHUNK_DURATION = 60 # Resolution presets for preview/proxy workflow # Each entry: (width, height, subdir_name) RES_CONFIGS: dict[str, tuple[int, int, str] | None] = { "full": None, # no downscale, no subdir "low": (490, 270, "low"), "tiny": (320, 180, "proxy"), # "proxy" subdir kept for backward compat } # Keep legacy constants pointing at "tiny" values PROXY_WIDTH, PROXY_HEIGHT = RES_CONFIGS["tiny"][:2] # type: ignore[index] def get_video_duration(video_path: Path) -> float: """Get duration of a video file using ffprobe.""" cmd = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", str(video_path), ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: return 0.0 try: return float(result.stdout.strip()) except ValueError: return 0.0 def _video_has_alpha(video_path: Path) -> bool: """Check if a video file has an alpha channel.""" cmd = [ "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=pix_fmt", "-of", "default=noprint_wrappers=1:nokey=1", str(video_path), ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: return False pix_fmt = result.stdout.strip() # Pixel formats with alpha contain 'a' (yuva, rgba, bgra, etc.) return "yuva" in pix_fmt or "rgba" in pix_fmt or "bgra" in pix_fmt def format_time(seconds: float) -> str: """Format seconds as human-readable time string.""" if seconds < 60: return f"{int(seconds)}s" elif seconds < 3600: mins = int(seconds // 60) secs = int(seconds % 60) return f"{mins}m {secs}s" else: hours = int(seconds // 3600) mins = int((seconds % 3600) // 60) return f"{hours}h {mins}m" def create_downscaled_video( source_path: Path, out_dir: Path, width: int, height: int, force: bool = False, ) -> Path: """Downscale a video to the given resolution, preserving audio.""" out_dir.mkdir(parents=True, exist_ok=True) out_path = out_dir / source_path.name if out_path.exists() and not force: return out_path cmd = [ "ffmpeg", "-y", "-i", str(source_path), "-vf", f"scale={width}:{height}", "-c:v", "libx264", "-preset", "ultrafast", "-crf", "28", "-c:a", "copy", str(out_path), ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise PreprocessError( f"Failed to downscale {source_path.name} to {width}x{height}", filter_type="downscale", command=" ".join(cmd), stderr=result.stderr, ) return out_path # Keep legacy name as alias def create_proxy_video(source_path: Path, proxy_dir: Path, force: bool = False) -> Path: w, h, _ = RES_CONFIGS["tiny"] # type: ignore[misc] return create_downscaled_video(source_path, proxy_dir, w, h, force) def create_downscaled_videos( videos_dir: Path, videos: dict[str, VideoSource], res: str, force: bool = False, verbose: bool = False, ) -> Path: """ Create downscaled copies of all source videos for the given res preset. Returns the path to the output subdirectory. """ cfg = RES_CONFIGS[res] if cfg is None: return videos_dir # full res — no subdir width, height, subdir = cfg out_dir = videos_dir / subdir out_dir.mkdir(parents=True, exist_ok=True) source_files: set[str] = set(v.source_file for v in videos.values()) print(f" Creating {res} copies ({width}x{height})...") for source_file in sorted(source_files): source_path = videos_dir / source_file if not source_path.exists(): if verbose: print(f" Skipping {source_file} (not found)") continue out_path = out_dir / source_file if out_path.exists() and not force: if verbose: print(f" {source_file}: exists, skipping") continue print(f" {source_file}...", end=" ", flush=True) create_downscaled_video(source_path, out_dir, width, height, force) print("done") return out_dir # Keep legacy name as alias def create_proxies_for_videos( videos_dir: Path, videos: dict[str, VideoSource], force: bool = False, verbose: bool = False, ) -> Path: return create_downscaled_videos(videos_dir, videos, "tiny", force, verbose) def ensure_downscaled_files_exist( source_dir: Path, res: str, force: bool = False, verbose: bool = False, skip_sources: set = None, ) -> Path: """ Ensure downscaled copies exist for all videos in source_dir for the given res preset. Creates them on-the-fly if missing. Returns the output subdirectory. skip_sources: optional set of source filenames to skip (e.g. files that have a preprocessed output_file, where the full-res processed version will be used instead). """ cfg = RES_CONFIGS[res] if cfg is None: return source_dir width, height, subdir = cfg video_extensions = {".mov", ".mp4", ".webm", ".avi", ".mkv", ".m4v"} out_dir = source_dir / subdir out_dir.mkdir(parents=True, exist_ok=True) video_files = [ f for f in source_dir.iterdir() if f.is_file() and f.suffix.lower() in video_extensions and "_processed" not in f.stem and not f.name.startswith(".") and (skip_sources is None or f.name not in skip_sources) ] if not video_files: if verbose: print(f" No video files found in {source_dir}") return out_dir missing = [f for f in video_files if not (out_dir / f.name).exists() or force] if not missing: if verbose: print(f" All {res} copies exist in {out_dir}") return out_dir print(f" Creating {len(missing)} {res} file(s) ({width}x{height})...") for video_file in missing: print(f" {video_file.name}...", end=" ", flush=True) create_downscaled_video(video_file, out_dir, width, height, force=True) print("done") return out_dir # Keep legacy name as alias def ensure_proxy_files_exist( source_dir: Path, force: bool = False, verbose: bool = False, ) -> Path: return ensure_downscaled_files_exist(source_dir, "tiny", force, verbose) import selectors, time, sys, subprocess def run_ffmpeg_with_progress(cmd, duration, description="Processing"): cmd = cmd.copy() insert_pos = cmd.index("-y") + 1 if "-y" in cmd else 1 cmd[insert_pos:insert_pos] = [ "-progress", "pipe:1", "-nostats", "-loglevel", "warning", ] p = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True, ) sel = selectors.DefaultSelector() sel.register(p.stdout, selectors.EVENT_READ) bar_width = 30 start_time = time.time() last_update = time.time() last_percent = 0 seen_any_progress = False last_log_line = "" logs = [] def draw(percent, suffix=""): filled = int(bar_width * percent / 100) bar = "█" * filled + "░" * (bar_width - filled) sys.stdout.write( f"\r {description}: [{bar}] {percent:3d}% {suffix} " ) sys.stdout.flush() draw(0, "Initializing...") while True: # If process ended and no more output, break if p.poll() is not None: # drain any remaining output quickly while True: line = p.stdout.readline() if not line: break logs.append(line) break events = sel.select(timeout=0.2) if not events: if not seen_any_progress: # Show elapsed time and last FFmpeg output line during init elapsed = time.time() - start_time hint = f" | {last_log_line[:50]}" if last_log_line else "" draw(0, f"Initializing... ({elapsed:.0f}s){hint}") elif ( seen_any_progress and last_percent >= 99 and (time.time() - last_update) > 1.0 ): draw(last_percent, "Finalizing...") continue for key, _ in events: line = key.fileobj.readline() if not line: continue logs.append(line) # Track last non-empty, non-progress-key line for init diagnostics stripped = line.strip() if stripped and "=" not in stripped: last_log_line = stripped if line.startswith("out_time_ms="): val = line.split("=", 1)[1].strip() if val != "N/A": try: t_ms = int(val) t_s = t_ms / 1_000_000 percent = ( min(99, int((t_s / duration) * 100)) if duration > 0 else 0 ) last_percent = max(last_percent, percent) last_update = time.time() seen_any_progress = True draw(last_percent, "") except ValueError: pass # Completion if p.returncode == 0: draw(100, "Done\n") else: code = p.returncode # On macOS/Linux, -9 means SIGKILL (OOM kill by OS), -6 = SIGABRT signal_hint = ( " (OOM kill)" if code == -9 else (" (abort)" if code == -6 else "") ) sys.stdout.write(f"\n FFmpeg exited with code {code}{signal_hint}\n") sys.stdout.flush() return subprocess.CompletedProcess( cmd, p.returncode, stdout="", stderr="".join(logs) ) def _has_audio_stream(video_path: Path) -> bool: """Return True if the file has a real (non-ghost) audio stream.""" result = subprocess.run( [ "ffprobe", "-v", "error", "-analyzeduration", "0", "-probesize", "1000000", "-select_streams", "a:0", "-show_entries", "stream=index,nb_frames", "-of", "csv=p=0", str(video_path), ], capture_output=True, text=True, ) output = result.stdout.strip() if not output: return False parts = output.split(",") if len(parts) >= 2 and parts[1].strip() == "0": return False # Ghost audio track — header present but no sample data return True def check_audio_channel_silent( input_path: Path, channel: str, threshold_db: float = -60.0 ) -> tuple[bool, float]: """ Quick check whether the specified audio channel is silent. Uses ffmpeg volumedetect (audio-only pass, much faster than full processing). Returns (is_silent, max_volume_db). """ pan = "pan=mono|c0=c0" if channel == "left" else "pan=mono|c0=c1" cmd = [ "ffmpeg", "-i", str(input_path), "-af", f"{pan},volumedetect", "-f", "null", "/dev/null", ] result = subprocess.run(cmd, capture_output=True, text=True) for line in result.stderr.splitlines(): if "max_volume:" in line: try: max_vol = float(line.split("max_volume:")[1].strip().replace(" dB", "")) return max_vol < threshold_db, max_vol except ValueError: pass return False, 0.0 def _resolve_auto_channel(input_path: Path, threshold_db: float = -60.0) -> str: """ Detect which audio channels have signal and return the appropriate channel setting. Logic: - One channel silent, the other not → return the active channel ("left" or "right") - Both channels have signal → return "both" """ left_silent, _ = check_audio_channel_silent(input_path, "left", threshold_db) right_silent, _ = check_audio_channel_silent(input_path, "right", threshold_db) if left_silent and not right_silent: return "right" if right_silent and not left_silent: return "left" return "both" def detect_silence_bounds( input_path: Path, noise_threshold_db: float = -40.0, min_silence_duration: float = 0.3, verbose: bool = False, ) -> tuple[float, float]: """ Detect when audio content starts and ends in a file. Uses FFmpeg's silencedetect filter to find the first and last non-silent moments. Useful for automatically computing skip/take values. Two common preamble shapes are handled: - File starts with silence → first_sound = end of that silence. - File starts with noise (e.g. clothing rustle) followed by a brief quiet gap before speech → first_sound = end of that first gap. Args: input_path: Video or audio file to analyse. noise_threshold_db: dB level below which audio is considered silent. Raise (e.g. -25) to treat low-level noise like clothing rustle as silence. min_silence_duration: Minimum gap length (seconds) that counts as silence. Shorter gaps are ignored. verbose: Print detected silence periods for debugging. Returns: (first_sound_time, last_sound_time) in seconds. first_sound_time — when the first meaningful sound begins. last_sound_time — when the last meaningful sound ends. """ total_duration = get_video_duration(input_path) cmd = [ "ffmpeg", "-i", str(input_path), "-af", f"silencedetect=noise={noise_threshold_db}dB:duration={min_silence_duration}", "-f", "null", "/dev/null", ] result = subprocess.run(cmd, capture_output=True, text=True) # Parse silence_start / silence_end lines from stderr silence_periods: list[tuple[float, float]] = [] pending_start: float | None = None for line in result.stderr.splitlines(): if "silence_start:" in line: try: pending_start = float(line.split("silence_start:")[1].strip()) except ValueError: pass elif "silence_end:" in line and pending_start is not None: try: end_t = float(line.split("silence_end:")[1].split("|")[0].strip()) silence_periods.append((pending_start, end_t)) pending_start = None except ValueError: pass # File ended while still in silence — close the period at total_duration if pending_start is not None: silence_periods.append((pending_start, total_duration)) if verbose: print(f"\n silence periods ({len(silence_periods)}):") for s, e in silence_periods: print(f" {s:.3f}s – {e:.3f}s") # --- First sound --- # Take the end of the FIRST silence period found in the preamble window # (first 60 s). This handles both: # • file starts with silence → silence[0].start ≈ 0 # • file starts with noise (crumpling etc.) then has a brief quiet gap # before speech → silence[0].start > 0 # If no silence is found at all the whole file is assumed to be content. PREAMBLE_LIMIT = 60.0 first_sound = 0.0 for s_start, s_end in silence_periods: if s_start < PREAMBLE_LIMIT: first_sound = s_end break # --- Last sound --- # Where the trailing silence begins (if the file ends with silence). last_sound = total_duration if silence_periods and silence_periods[-1][1] >= total_duration - 0.05: last_sound = silence_periods[-1][0] return first_sound, last_sound def preprocess_video( videos_dir: Path, video_id: str, video_source: VideoSource, verbose: bool = False, force: bool = False, custom_gnommo_scratch: Optional[Path] = None, res: str = "full", ) -> Path: """ Apply preprocessing filters to a video source. Video filters (chroma_key, mask) are combined into single FFmpeg passes for efficiency. Non-video filters (transcribe) are handled separately. Args: videos_dir: Directory containing videos.json and video files video_id: ID of the video being processed video_source: VideoSource with source_file, filter, and output_file custom_gnommo_scratch: Optional external directory for intermediate files (e.g., SSD) res: Resolution preset — when not "full", source is downscaled before filtering Returns: Path to the final preprocessed output file. """ if not video_source.filter: # No filters defined, return original file return videos_dir / video_source.source_file # Use custom intermediate dir if provided, otherwise default to videos_dir/intermediate if custom_gnommo_scratch: gnommo_scratch = custom_gnommo_scratch / video_id else: gnommo_scratch = videos_dir / "intermediate" gnommo_scratch.mkdir(parents=True, exist_ok=True) # Start with the source file (relative to videos_dir) current_input = videos_dir / video_source.source_file if not current_input.exists(): raise PreprocessError( f"Source video not found: {current_input}", filter_type=None, ) # For non-full res, downscale the raw source first so all subsequent # filters (chroma key, color grade, etc.) operate on the small file. if res != "full": cfg = RES_CONFIGS.get(res) if cfg: width, height, _ = cfg print(f" Downscaling source to {width}x{height} ({res})...") raw_low_dir = gnommo_scratch / f"raw_{res}" current_input = create_downscaled_video( current_input, raw_low_dir, width, height, force ) # Resolve channel setting (auto-detect if needed) and sanity check channel = video_source.use_audio_channels if channel == "auto": channel = _resolve_auto_channel(current_input) print(f" Auto channel detection: using '{channel}'") elif channel in ("left", "right"): is_silent, max_vol = check_audio_channel_silent(current_input, channel) if is_silent: raise PreprocessError( f"Audio channel '{channel}' is silent (max_volume={max_vol:.1f} dB). " f"Wrong microphone channel selected?", filter_type="audio_check", ) # Track intermediate files for cleanup intermediate_files: list[Path] = [] # Video filter types that can be combined in a single FFmpeg pass VIDEO_FILTER_TYPES = {"chroma_key", "mask", "color_grade", "gnommokey"} # Group consecutive video filters into batches filter_batches: list[list[dict]] = [] current_batch: list[dict] = [] for filter_config in video_source.filter: filter_type = filter_config.get("type") if filter_type in VIDEO_FILTER_TYPES: current_batch.append(filter_config) else: # Non-video filter breaks the batch if current_batch: filter_batches.append(current_batch) current_batch = [] # Add non-video filter as its own "batch" filter_batches.append([filter_config]) # Don't forget the last batch if current_batch: filter_batches.append(current_batch) # Process each batch batch_num = 0 for batch in filter_batches: first_filter_type = batch[0].get("type") if first_filter_type in VIDEO_FILTER_TYPES: # Combined video filter batch - use chunked processing for large files filter_names = "+".join(f.get("type") for f in batch) print(f" Video filters (combined): {filter_names}") # Output to WebM (compressed with alpha) instead of ProRes step_output = gnommo_scratch / f"{video_id}_batch{batch_num}.mov" intermediate_files.append(step_output) # Note: skip/take are NOT applied here - they're only used during concatenation apply_combined_video_filters_chunked( current_input, step_output, batch, verbose, take=None, scratch_dir=gnommo_scratch / "chunks", ) current_input = step_output batch_num += 1 elif first_filter_type == "transcribe": # Transcribe doesn't transform video print(" Filter: transcribe") apply_transcribe(current_input, batch[0], verbose, force) elif first_filter_type == "audio_normalize": # Audio normalization: denoise, compress, and normalize loudness # Note: skip/take are NOT applied here - they're only used during concatenation print(" Filter: audio_normalize") if not _has_audio_stream(current_input): raise PreprocessError( f"audio_normalize requires an audio stream, but '{current_input.name}' has none.\n" f" Check that the source file has audio, or remove audio_normalize from the filter list.", filter_type="audio_normalize", command="", stderr="", ) step_output = gnommo_scratch / f"{video_id}_batch{batch_num}_audio.mov" intermediate_files.append(step_output) apply_audio_normalize( current_input, step_output, batch[0], verbose, take=None, use_audio_channels=channel, skip_loudnorm=video_source.defer_loudnorm, ) current_input = step_output batch_num += 1 else: raise PreprocessError( f"Unknown filter type: {first_filter_type}", filter_type=first_filter_type, ) # If output_file is specified, copy/rename to final location and clean up if video_source.output_file: import shutil final_output = videos_dir / video_source.output_file final_output.parent.mkdir(parents=True, exist_ok=True) # Copy the final intermediate to the output location shutil.copy2(current_input, final_output) if verbose: print(f" Final output: {final_output}") # Clean up intermediate files for intermediate_file in intermediate_files: if intermediate_file.exists(): intermediate_file.unlink() if verbose: print(f" Removed intermediate: {intermediate_file.name}") # Remove intermediate directory if empty try: gnommo_scratch.rmdir() except OSError: pass # Directory not empty (other videos may have intermediates) return final_output # No output_file specified, return current processed file return current_input def apply_combined_video_filters( input_path: Path, output_path: Path, filters: list[dict], verbose: bool = False, take: float = None, ) -> None: """ Apply multiple video filters in a single FFmpeg pass. Combines chroma_key, mask, and other video filters into one filter chain. """ filter_parts: list[str] = [] for filter_config in filters: filter_type = filter_config.get("type") if filter_type == "chroma_key": filter_parts.append(build_chroma_key_filter(filter_config)) elif filter_type == "mask": filter_parts.append(build_mask_filter(filter_config)) elif filter_type == "color_grade": filter_parts.append(build_color_grade_filter(filter_config)) elif filter_type == "gnommokey": filter_parts.append(build_gnommokey_filter(filter_config)) video_filter = ",".join(filter_parts) # Build FFmpeg command cmd = ["ffmpeg", "-y"] if take is not None: cmd.extend(["-t", str(take)]) cmd.extend( [ "-i", str(input_path), "-vf", video_filter, "-c:v", "prores_ks", "-profile:v", "4", # ProRes 4444 "-pix_fmt", "yuva444p10le", # 10-bit with alpha "-c:a", "pcm_s16le", # Lossless audio str(output_path), ] ) if verbose: print(f" Combined filter: {video_filter}") print(f" Command: {' '.join(cmd)}") # Get duration for progress bar duration = take if take is not None else get_video_duration(input_path) result = run_ffmpeg_with_progress(cmd, duration, "Processing") if result.returncode != 0: raise PreprocessError( "Combined video filter failed", filter_type="combined", command=" ".join(cmd), stderr=result.stderr, ) def build_chroma_key_filter(config: dict) -> str: """Build FFmpeg chromakey filter string from config.""" chroma_config = parse_chroma_key_config(config) r, g, b = chroma_config.color hex_color = f"0x{r:02x}{g:02x}{b:02x}" parts = [ f"chromakey={hex_color}:{chroma_config.similarity:.3f}:{chroma_config.blend:.3f}" ] if chroma_config.spill > 0: parts.append(f"despill=type=green:mix={chroma_config.spill:.3f}") # Edge erosion: shrink alpha mask to remove green fringe # Uses erosion filter targeting only alpha channel (plane 3) # threshold0-2=65535 means Y/U/V unchanged, threshold3=0 erodes alpha if chroma_config.edge_erode > 0: erode_passes = min(chroma_config.edge_erode, 5) # Cap at 5 passes parts.append("format=yuva444p") for _ in range(erode_passes): parts.append( "erosion=threshold0=65535:threshold1=65535:threshold2=65535:threshold3=0" ) # Color protection: restore alpha for pixels matching protected color # This runs AFTER chromakey/despill/erosion to restore any incorrectly keyed pixels if chroma_config.protect_color: pr, pg, pb = chroma_config.protect_color # Convert tolerance from 0-1 range to pixel range (0-255) tol = int(chroma_config.protect_tolerance * 255) # Ensure we're in RGBA for geq to work with r/g/b/alpha functions parts.append("format=rgba") # Build condition: pixel RGB is within tolerance of protected color # between(value, min, max) returns 1 if min <= value <= max # Multiply conditions together for AND logic condition = ( f"between(r(X,Y),{max(0, pr-tol)},{min(255, pr+tol)})*" f"between(g(X,Y),{max(0, pg-tol)},{min(255, pg+tol)})*" f"between(b(X,Y),{max(0, pb-tol)},{min(255, pb+tol)})" ) # geq: if pixel matches protected color, set alpha to 255, else keep current alpha parts.append( f"geq=r='r(X,Y)':g='g(X,Y)':b='b(X,Y)':a='if({condition},255,alpha(X,Y))'" ) return ",".join(parts) def build_mask_filter(config: dict) -> str: """Build FFmpeg geq mask filter string from config.""" left = float(config.get("left", 0)) right = float(config.get("right", 0)) top = float(config.get("top", 0)) bottom = float(config.get("bottom", 0)) conditions = [] if left > 0: conditions.append(f"lt(X,W*{left})") if right > 0: conditions.append(f"gt(X,W*{1-right})") if top > 0: conditions.append(f"lt(Y,H*{top})") if bottom > 0: conditions.append(f"gt(Y,H*{1-bottom})") if not conditions: return "copy" # No-op filter alpha_expr = "+".join(conditions) alpha_expr = f"if({alpha_expr},0,alpha(X,Y))" return f"geq=lum='lum(X,Y)':cb='cb(X,Y)':cr='cr(X,Y)':a='{alpha_expr}'" def build_color_grade_filter(config: dict) -> str: """Build FFmpeg color grading filter string from config. Applies color balance, curves, and EQ adjustments while preserving alpha. The filter chain converts to RGBA for color operations, then back to yuva444p10le to preserve the alpha channel. """ grade_config = parse_color_grade_config(config) parts: list[str] = [] # Start with format conversion to RGBA for color operations parts.append("format=rgba") # Color balance (only add if any value is non-zero) colorbalance_parts = [] if grade_config.rs != 0: colorbalance_parts.append(f"rs={grade_config.rs:.3f}") if grade_config.gs != 0: colorbalance_parts.append(f"gs={grade_config.gs:.3f}") if grade_config.bs != 0: colorbalance_parts.append(f"bs={grade_config.bs:.3f}") if grade_config.rm != 0: colorbalance_parts.append(f"rm={grade_config.rm:.3f}") if grade_config.gm != 0: colorbalance_parts.append(f"gm={grade_config.gm:.3f}") if grade_config.bm != 0: colorbalance_parts.append(f"bm={grade_config.bm:.3f}") if grade_config.rh != 0: colorbalance_parts.append(f"rh={grade_config.rh:.3f}") if grade_config.gh != 0: colorbalance_parts.append(f"gh={grade_config.gh:.3f}") if grade_config.bh != 0: colorbalance_parts.append(f"bh={grade_config.bh:.3f}") if colorbalance_parts: parts.append(f"colorbalance={':'.join(colorbalance_parts)}") # Curves preset (if specified) if grade_config.curves_preset and grade_config.curves_preset != "none": parts.append(f"curves=preset={grade_config.curves_preset}") # EQ adjustments (only add if different from defaults) eq_parts = [] if grade_config.contrast != 1.0: eq_parts.append(f"contrast={grade_config.contrast:.3f}") if grade_config.brightness != 0.0: eq_parts.append(f"brightness={grade_config.brightness:.3f}") if grade_config.saturation != 1.0: eq_parts.append(f"saturation={grade_config.saturation:.3f}") if eq_parts: parts.append(f"eq={':'.join(eq_parts)}") # Custom curves (if specified) custom_curves = [] if grade_config.curves_r: custom_curves.append(f"r='{grade_config.curves_r}'") if grade_config.curves_g: custom_curves.append(f"g='{grade_config.curves_g}'") if grade_config.curves_b: custom_curves.append(f"b='{grade_config.curves_b}'") if grade_config.curves_master: custom_curves.append(f"master='{grade_config.curves_master}'") if custom_curves: parts.append(f"curves={':'.join(custom_curves)}") # Convert back to yuva444p10le to preserve alpha for downstream filters parts.append("format=yuva444p10le") return ",".join(parts) def parse_color_grade_config(config: dict) -> ColorGradeConfig: """Parse a color grade config dictionary into ColorGradeConfig.""" return ColorGradeConfig( # Shadows rs=float(config.get("rs", 0.0)), gs=float(config.get("gs", 0.0)), bs=float(config.get("bs", 0.0)), # Midtones rm=float(config.get("rm", 0.0)), gm=float(config.get("gm", 0.0)), bm=float(config.get("bm", 0.0)), # Highlights rh=float(config.get("rh", 0.0)), gh=float(config.get("gh", 0.0)), bh=float(config.get("bh", 0.0)), # Curves preset curves_preset=config.get("curves_preset", "none"), # EQ contrast=float(config.get("contrast", 1.0)), brightness=float(config.get("brightness", 0.0)), saturation=float(config.get("saturation", 1.0)), # Custom curves curves_r=config.get("curves_r", ""), curves_g=config.get("curves_g", ""), curves_b=config.get("curves_b", ""), curves_master=config.get("curves_master", ""), ) def build_gnommokey_filter(config: dict) -> str: """Build FFmpeg gnommokey filter string - Keylight-style color-difference keyer. Uses YCbCr color-difference keying algorithm: - For green screen: key signal = (Cb - Cr), high values = green - screen_gain scales the key extraction strength - screen_balance mixes luminance into the key calculation - clip_black/clip_white compress the matte range - despill shifts green spill toward the bias color """ cfg = parse_gnommokey_config(config) parts: list[str] = [] # Get screen color RGB values sr, sg, sb = cfg.screen_color # Determine if this is green or blue screen based on RGB dominance # Green screen: G is the highest channel # Blue screen: B is the highest channel is_green_screen = sg >= sb # Work in RGBA space for RGB-based color difference keying parts.append("format=rgba") # Build the alpha calculation expression gain = cfg.screen_gain / 100.0 balance = cfg.screen_balance / 100.0 # RGB-based color-difference key calculation: # For green screen: key = G - max(R, B) → measures "greenness" # For blue screen: key = B - max(R, G) → measures "blueness" # This is more reliable than YCbCr for screens that aren't pure colors if is_green_screen: # Green screen: how much does G exceed the stronger of R or B? key_signal = "max(0,g(X,Y)-max(r(X,Y),b(X,Y)))" else: # Blue screen: how much does B exceed the stronger of R or G? key_signal = "max(0,b(X,Y)-max(r(X,Y),g(X,Y)))" # Apply screen_balance: mix in luminance-based keying # At balance=0: pure color difference # At balance=1: luminance contributes (pixels matching screen luma key more) screen_y = int(0.299 * sr + 0.587 * sg + 0.114 * sb) if balance > 0: # Luma similarity: boost keying for pixels with similar luminance to screen # This helps key darker/lighter greens that might otherwise be missed luma_expr = f"(0.299*r(X,Y)+0.587*g(X,Y)+0.114*b(X,Y))" luma_boost = f"(1+{balance:.2f}*(1-abs({luma_expr}-{screen_y})/128))" key_expr = f"({key_signal})*{luma_boost}" else: key_expr = f"({key_signal})" # Apply gain: screen_gain of 100 = 1.0, 126 = 1.26 # For typical green screen, G-max(R,B) ranges 0-150 # Scale factor maps this to 0-255 range scale_factor = gain * 2.5 key_expr = f"({key_expr})*{scale_factor:.3f}" # Apply clip_black and clip_white to compress the matte # clip_black: key values below this become 0 (those pixels stay opaque) # clip_white: key values above this become 255 (fully transparent) # Default 0/100 means: 0-255 maps to 0-255 (no change) clip_b = cfg.clip_black * 2.55 # Convert 0-100 to 0-255 clip_w = cfg.clip_white * 2.55 if clip_w > clip_b: # Remap the range [clip_b, clip_w] to [0, 255] range_scale = 255.0 / (clip_w - clip_b) key_expr = f"clip(({key_expr}-{clip_b:.1f})*{range_scale:.3f},0,255)" else: key_expr = f"clip({key_expr},0,255)" # Invert: high key value (green) = low alpha (transparent) alpha_expr = f"255-{key_expr}" # Build the geq filter for alpha (in RGBA mode) parts.append(f"geq=r='r(X,Y)':g='g(X,Y)':b='b(X,Y)':a='{alpha_expr}'") # Despill: shift green/blue spill toward the bias color if cfg.despill_bias and cfg.despill_strength > 0: # Already in RGBA format br, bg, bb = cfg.despill_bias strength = cfg.despill_strength if is_green_screen: # Green spill: G exceeds max(R, B) spill_expr = "max(0,g(X,Y)-max(r(X,Y),b(X,Y)))" else: # Blue spill: B exceeds max(R, G) spill_expr = "max(0,b(X,Y)-max(r(X,Y),g(X,Y)))" # Lerp factor based on spill amount factor_expr = f"({spill_expr}/255*{strength:.2f})" # Shift colors toward the bias new_r = f"clip(r(X,Y)+({br}-r(X,Y))*{factor_expr},0,255)" new_g = f"clip(g(X,Y)+({bg}-g(X,Y))*{factor_expr},0,255)" new_b = f"clip(b(X,Y)+({bb}-b(X,Y))*{factor_expr},0,255)" parts.append(f"geq=r='{new_r}':g='{new_g}':b='{new_b}':a='alpha(X,Y)'") # Edge-aware despill: aggressively suppress green at semi-transparent edges # This targets the 2-4px green fringe that regular despill misses # edge_factor is high (1.0) at alpha=128, low (0) at alpha=0 or 255 # At edges: cap G so it never exceeds max(R, B) if is_green_screen: # Edge factor: peaks at alpha=128, falls off toward 0 and 255 # Using min(alpha, 255-alpha)/128 gives smooth 0→1→0 curve edge_factor = "min(alpha(X,Y),255-alpha(X,Y))/128" # Green excess at this pixel green_excess = "max(0,g(X,Y)-max(r(X,Y),b(X,Y)))" # Suppress green proportionally to edge_factor # At edges: G = G - excess (caps G to max(R,B)) # At interior: G unchanged new_g = f"clip(g(X,Y)-({green_excess})*({edge_factor}),0,255)" parts.append(f"geq=r='r(X,Y)':g='{new_g}':b='b(X,Y)':a='alpha(X,Y)'") else: # Blue screen edge despill edge_factor = "min(alpha(X,Y),255-alpha(X,Y))/128" blue_excess = "max(0,b(X,Y)-max(r(X,Y),g(X,Y)))" new_b = f"clip(b(X,Y)-({blue_excess})*({edge_factor}),0,255)" parts.append(f"geq=r='r(X,Y)':g='g(X,Y)':b='{new_b}':a='alpha(X,Y)'") # Edge erosion: shrink alpha channel to remove green fringe # threshold=0 means "don't change", threshold=65535 means "full erosion" # We want to erode only the alpha channel (plane 3), leave RGB unchanged if cfg.edge_erode > 0: erode_passes = min(cfg.edge_erode, 5) for _ in range(erode_passes): parts.append( "erosion=threshold0=0:threshold1=0:threshold2=0:threshold3=65535" ) # Edge softening (blur the alpha) if cfg.edge_soften > 0: # Use gblur on alpha channel only via format manipulation # First extract to a format where we can blur, then re-merge # Simpler approach: use avgblur with small radius radius = min(int(cfg.edge_soften), 5) if radius > 0: parts.append(f"alphaextract,avgblur=sizeX={radius}:sizeY={radius}[blur]") # This gets complex - for now, skip alpha blur and just use erosion # Ensure output is in a good format parts.append("format=yuva444p10le") return ",".join(parts) def parse_gnommokey_config(config: dict) -> GnommoKeyConfig: """Parse a gnommokey config dictionary into GnommoKeyConfig.""" # Parse screen_color screen_color = config.get("screen_color", [0, 177, 64]) if isinstance(screen_color, list) and len(screen_color) == 3: screen_color = tuple(screen_color) else: screen_color = (0, 177, 64) # Parse despill_bias despill_bias = config.get("despill_bias") if despill_bias: if isinstance(despill_bias, list) and len(despill_bias) == 3: despill_bias = tuple(despill_bias) else: despill_bias = None # Parse alpha_bias alpha_bias = config.get("alpha_bias") if alpha_bias: if isinstance(alpha_bias, list) and len(alpha_bias) == 3: alpha_bias = tuple(alpha_bias) else: alpha_bias = None return GnommoKeyConfig( screen_color=screen_color, screen_gain=float(config.get("screen_gain", 100.0)), screen_balance=float(config.get("screen_balance", 50.0)), clip_black=float(config.get("clip_black", 0.0)), clip_white=float(config.get("clip_white", 100.0)), despill_bias=despill_bias, despill_strength=float(config.get("despill_strength", 0.5)), alpha_bias=alpha_bias, edge_erode=int(config.get("edge_erode", 0)), edge_soften=float(config.get("edge_soften", 0.0)), ) def apply_combined_video_filters_chunked( input_path: Path, output_path: Path, filters: list[dict], verbose: bool = False, take: float = None, scratch_dir: Path = None, ) -> None: """ Apply video filters using chunk-based processing for large files. For videos longer than CHUNK_DURATION: 1. Split into chunks 2. Process each chunk with filters 3. Encode to ProRes 4444 with alpha 4. Concatenate chunks into final output Chunking allows parallel processing and avoids huge intermediate files. """ duration = take if take is not None else get_video_duration(input_path) # Short video: process directly without chunking if duration <= CHUNK_DURATION: _process_chunk_to_prores4444( input_path, output_path, filters, 0, duration, verbose, take, True ) return # Long video: process in chunks (parallel) if scratch_dir is None: scratch_dir = output_path.parent / "chunks" scratch_dir.mkdir(parents=True, exist_ok=True) num_chunks = int(duration / CHUNK_DURATION) + 1 chunk_files: list[Path] = [] chunk_tasks: list[tuple] = [] # (index, chunk_path, start_time, chunk_duration) # Build list of chunk tasks for i in range(num_chunks): start_time = i * CHUNK_DURATION chunk_duration = min(CHUNK_DURATION, duration - start_time) if chunk_duration <= 0: break chunk_path = scratch_dir / f"chunk_{i:04d}.mov" chunk_files.append(chunk_path) chunk_tasks.append((i, chunk_path, start_time, chunk_duration)) num_workers = min(DEFAULT_CHUNK_WORKERS, len(chunk_tasks)) print( f" Processing {len(chunk_tasks)} chunks in parallel ({num_workers} workers)" ) # Process chunks in parallel def process_chunk_task(task): i, chunk_path, start_time, chunk_dur = task _process_chunk_to_prores4444( input_path, chunk_path, filters, start_time, chunk_dur, verbose=False, # Suppress verbose in parallel mode take=chunk_dur, ) return i, chunk_path completed = 0 with ThreadPoolExecutor(max_workers=num_workers) as executor: futures = { executor.submit(process_chunk_task, task): task for task in chunk_tasks } for future in as_completed(futures): i, chunk_path = future.result() completed += 1 print( f" Completed chunk {i+1}/{len(chunk_tasks)} ({completed}/{len(chunk_tasks)} done)" ) # Concatenate chunks into final output concat_list = scratch_dir / "concat.txt" with open(concat_list, "w") as cf: for chunk_path in chunk_files: cf.write(f"file '{chunk_path.resolve()}'\n") if verbose: print(f" Concatenating {len(chunk_files)} chunks → {output_path.name}") concat_cmd = [ "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", str(concat_list), "-c", "copy", str(output_path), ] concat_result = run_ffmpeg_with_progress(concat_cmd, duration, "Concatenating") if concat_result.returncode != 0: raise PreprocessError( "Chunk concatenation failed", filter_type="concat", command=" ".join(concat_cmd), stderr=concat_result.stderr, ) # Clean up chunk files and concat list for chunk_path in chunk_files: if chunk_path.exists(): chunk_path.unlink() concat_list.unlink(missing_ok=True) # Remove chunks directory if empty try: scratch_dir.rmdir() except OSError: pass def _process_chunk_to_prores4444( input_path: Path, output_path: Path, filters: list[dict], start_time: float, chunk_duration: float, verbose: bool = False, take: float = None, keep_audio: bool = True, ) -> None: """ Process a video chunk with filters and encode to ProRes 4444 (MOV) with alpha. This is intended as an intermediate format for compositing: - true alpha channel (non-binary edges) - 4:4:4 chroma (better key edges than 4:2:0) - robust for concatenation and further filtering """ filter_parts: list[str] = [] for filter_config in filters: filter_type = filter_config.get("type") if filter_type == "chroma_key": filter_parts.append(build_chroma_key_filter(filter_config)) elif filter_type == "mask": filter_parts.append(build_mask_filter(filter_config)) elif filter_type == "color_grade": filter_parts.append(build_color_grade_filter(filter_config)) elif filter_type == "gnommokey": filter_parts.append(build_gnommokey_filter(filter_config)) video_filter = ",".join(filter_parts) # Ensure we end in an alpha-capable pixel format. # 10-bit 4:4:4 + alpha is ideal for keyed edges. if video_filter: video_filter += ",format=yuva444p10le" else: video_filter = "format=yuva444p10le" # Build FFmpeg command cmd: list[str] = ["ffmpeg", "-y"] # Seek to start time (before input for fast seeking) if start_time > 0: cmd.extend(["-ss", str(start_time)]) cmd.extend(["-i", str(input_path)]) # Limit duration actual_take = take if take is not None else chunk_duration if actual_take is not None: cmd.extend(["-t", str(actual_take)]) # Video encode: ProRes 4444 with alpha cmd.extend( [ "-vf", video_filter, "-c:v", "prores_ks", "-profile:v", "4", # 4 = ProRes 4444 "-pix_fmt", "yuva444p10le", # must carry alpha "-vendor", "apl0", # optional; helps some NLEs tag as Apple ProRes ] ) # Audio handling (optional) if keep_audio: # PCM is the least surprising intermediate audio. # You can also do "-c:a copy" if your source audio codec is stable across chunks. cmd.extend(["-c:a", "pcm_s16le"]) else: cmd.append("-an") cmd.append(str(output_path)) if verbose: print(f" Filter: {video_filter}") print(f" Command: {' '.join(cmd)}") result = run_ffmpeg_with_progress(cmd, actual_take or chunk_duration, "Encoding") if result.returncode != 0: raise PreprocessError( "Chunk processing failed", filter_type="chunk", command=" ".join(cmd), stderr=result.stderr, ) # Validate the output file is a readable MOV (moov atom present). # FFmpeg can return 0 but write a corrupt/incomplete file (e.g. moov atom # missing) when faststart rewrite fails or disk is under pressure. probe = subprocess.run( [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "csv=p=0", str(output_path), ], capture_output=True, text=True, ) if probe.returncode != 0 or not probe.stdout.strip(): raise PreprocessError( f"Chunk output file is unreadable or missing moov atom: {output_path.name}", filter_type="chunk", command=" ".join(cmd), stderr=probe.stderr, ) def _process_chunk_to_webm( input_path: Path, output_path: Path, filters: list[dict], start_time: float, chunk_duration: float, verbose: bool = False, take: float = None, ) -> None: """ Process a video chunk with filters and encode to VP9/WebM with alpha. VP9 with alpha uses ~10-20% of ProRes 4444 file size while maintaining good quality for compositing. """ filter_parts: list[str] = [] for filter_config in filters: filter_type = filter_config.get("type") if filter_type == "chroma_key": filter_parts.append(build_chroma_key_filter(filter_config)) elif filter_type == "mask": filter_parts.append(build_mask_filter(filter_config)) elif filter_type == "color_grade": filter_parts.append(build_color_grade_filter(filter_config)) elif filter_type == "gnommokey": filter_parts.append(build_gnommokey_filter(filter_config)) video_filter = ",".join(filter_parts) # Force output to yuva420p to preserve alpha channel through to encoder video_filter += ",format=yuva420p" # Build FFmpeg command for VP9 with alpha cmd = ["ffmpeg", "-y"] # Seek to start time (before input for fast seeking) if start_time > 0: cmd.extend(["-ss", str(start_time)]) cmd.extend(["-i", str(input_path)]) # Limit duration actual_take = take if take is not None else chunk_duration if actual_take is not None: cmd.extend(["-t", str(actual_take)]) cmd.extend( [ "-vf", video_filter, "-c:v", "libvpx-vp9", "-pix_fmt", "yuva420p", # VP9 with alpha "-auto-alt-ref", "0", # Required for alpha channel in VP9 "-crf", "25", # Quality (lower = better, 15-35 typical) "-b:v", "0", # Variable bitrate mode "-deadline", "good", # Encoding speed (good balance) "-cpu-used", "2", # Speed/quality tradeoff (0-5, lower = better) "-c:a", "libopus", # Opus audio codec "-b:a", "128k", str(output_path), ] ) if verbose: print(f" Filter: {video_filter}") print(f" Command: {' '.join(cmd)}") result = run_ffmpeg_with_progress(cmd, actual_take or chunk_duration, "Encoding") if result.returncode != 0: raise PreprocessError( "Chunk processing failed", filter_type="chunk", command=" ".join(cmd), stderr=result.stderr, ) def _concatenate_prores4444_chunks( chunk_files: list[Path], output_path: Path, verbose: bool = False, keep_audio: bool = False, ) -> None: """ Concatenate ProRes 4444 (MOV) chunks into a single ProRes 4444 output. Uses FFmpeg concat demuxer, then re-encodes once to ensure alpha and stream consistency across chunks. """ concat_list = output_path.parent / "concat_list.txt" output_path.parent.mkdir(parents=True, exist_ok=True) with open(concat_list, "w", encoding="utf-8") as f: for chunk in chunk_files: f.write(f"file '{chunk.resolve()}'\n") cmd: list[str] = [ "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", str(concat_list), # Encode to ProRes 4444 with alpha "-c:v", "prores_ks", "-profile:v", "4", # ProRes 4444 "-pix_fmt", "yuva444p10le", # preserve alpha + best key edges "-vendor", "apl0", "-movflags", "+faststart", ] if keep_audio: # safest for intermediates; alternatively "-c:a copy" if identical across chunks cmd += ["-c:a", "pcm_s16le"] else: cmd += ["-an"] cmd.append(str(output_path)) if verbose: print(f" Concat list: {concat_list}") print(f" Command: {' '.join(cmd)}") result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise PreprocessError( "Chunk concatenation failed", filter_type="concat", command=" ".join(cmd), stderr=result.stderr, ) def _concatenate_webm_chunks( chunk_files: list[Path], output_path: Path, verbose: bool = False, ) -> None: """ Concatenate WebM chunks into a single output file. Uses FFmpeg's concat demuxer for lossless concatenation. """ # Create concat file list concat_list = output_path.parent / "concat_list.txt" with open(concat_list, "w") as f: for chunk in chunk_files: # FFmpeg concat format: file 'path' f.write(f"file '{chunk.resolve()}'\n") cmd = [ "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", str(concat_list), "-c:v", "libvpx-vp9", "-pix_fmt", "yuva420p", # Stream copy (no re-encoding) str(output_path), ] if verbose: print(f" Concat list: {concat_list}") print(f" Command: {' '.join(cmd)}") result = subprocess.run(cmd, capture_output=True, text=True) # Clean up concat list concat_list.unlink() if result.returncode != 0: raise PreprocessError( "Chunk concatenation failed", filter_type="concat", command=" ".join(cmd), stderr=result.stderr, ) def apply_chroma_key( input_path: Path, output_path: Path, config: dict[str, Any], verbose: bool = False, take: float = None, ) -> None: """ Apply chroma key (green screen) filter using FFmpeg. Config options: color: [R, G, B] - Color to key out (default: [0, 255, 0] green) similarity: float - Color similarity threshold 0.0-1.0 (default: 0.4) blend: float - Edge blend/feathering 0.0-1.0 (default: 0.08) spill: float - Spill suppression 0.0-1.0 (default: 0.1) Args: take: Optional duration in seconds to limit processing (for quick iteration) Output is ProRes 4444 with alpha channel for lossless quality. """ # Parse config with defaults chroma_config = parse_chroma_key_config(config) # Convert RGB to hex format for FFmpeg r, g, b = chroma_config.color hex_color = f"0x{r:02x}{g:02x}{b:02x}" # Build FFmpeg chromakey filter # chromakey=color:similarity:blend # Using higher similarity to capture more green shades filter_parts = [ f"chromakey={hex_color}:{chroma_config.similarity:.3f}:{chroma_config.blend:.3f}" ] # Add despill to remove green spill on edges (always recommended for green screen) if chroma_config.spill > 0: filter_parts.append(f"despill=type=green:mix={chroma_config.spill:.3f}") video_filter = ",".join(filter_parts) # Build FFmpeg command # ProRes 4444 profile for alpha channel support cmd = [ "ffmpeg", "-y", # Overwrite output ] # Add duration limit if specified (before input for efficiency) if take is not None: cmd.extend(["-t", str(take)]) cmd.extend( [ "-i", str(input_path), "-vf", video_filter, "-c:v", "prores_ks", "-profile:v", "4", # ProRes 4444 "-pix_fmt", "yuva444p10le", # 10-bit with alpha "-c:a", "pcm_s16le", # Lossless audio str(output_path), ] ) if verbose: print(f" Filter: {video_filter}") if take: print(f" Duration limit: {take}s") print(f" Command: {' '.join(cmd)}") # Get duration for progress bar duration = take if take is not None else get_video_duration(input_path) result = run_ffmpeg_with_progress(cmd, duration, "Chroma key") if result.returncode != 0: raise PreprocessError( "Chroma key filter failed", filter_type="chroma_key", command=" ".join(cmd), stderr=result.stderr, ) def apply_mask( input_path: Path, output_path: Path, config: dict[str, Any], verbose: bool = False, take: float = None, ) -> None: """ Apply a mask to make edges transparent using FFmpeg. Config options: left: float - Percentage of left side to make transparent (0.0-1.0) right: float - Percentage of right side to make transparent (0.0-1.0) top: float - Percentage of top to make transparent (0.0-1.0) bottom: float - Percentage of bottom to make transparent (0.0-1.0) Args: take: Optional duration in seconds to limit processing Uses geq filter to set alpha channel to 0 for masked regions. """ left = float(config.get("left", 0)) right = float(config.get("right", 0)) top = float(config.get("top", 0)) bottom = float(config.get("bottom", 0)) # Build alpha expression for geq filter # Alpha is 255 (opaque) in the center, 0 (transparent) at edges # X < W*left -> transparent # X > W*(1-right) -> transparent # Y < H*top -> transparent # Y > H*(1-bottom) -> transparent conditions = [] if left > 0: conditions.append(f"lt(X,W*{left})") if right > 0: conditions.append(f"gt(X,W*{1-right})") if top > 0: conditions.append(f"lt(Y,H*{top})") if bottom > 0: conditions.append(f"gt(Y,H*{1-bottom})") if not conditions: # No masking needed, just copy import shutil shutil.copy2(input_path, output_path) return # Combine conditions with OR - if any condition is true, alpha = 0 alpha_expr = "+".join(conditions) # geq alpha: if any condition matches, return 0, else return alpha(X,Y) # Using: if(condition, 0, alpha(X,Y)) alpha_expr = f"if({alpha_expr},0,alpha(X,Y))" # Build the geq filter - preserve luma, chroma, modify alpha video_filter = f"geq=lum='lum(X,Y)':cb='cb(X,Y)':cr='cr(X,Y)':a='{alpha_expr}'" # Build FFmpeg command cmd = [ "ffmpeg", "-y", # Overwrite output ] if take is not None: cmd.extend(["-t", str(take)]) cmd.extend( [ "-i", str(input_path), "-vf", video_filter, "-c:v", "prores_ks", "-profile:v", "4", # ProRes 4444 "-pix_fmt", "yuva444p10le", # 10-bit with alpha "-c:a", "pcm_s16le", # Lossless audio str(output_path), ] ) if verbose: print(f" Mask: left={left}, right={right}, top={top}, bottom={bottom}") print(f" Filter: {video_filter}") print(f" Command: {' '.join(cmd)}") # Get duration for progress bar duration = take if take is not None else get_video_duration(input_path) result = run_ffmpeg_with_progress(cmd, duration, "Mask") if result.returncode != 0: raise PreprocessError( "Mask filter failed", filter_type="mask", command=" ".join(cmd), stderr=result.stderr, ) def apply_transcribe( input_path: Path, config: dict[str, Any], verbose: bool = False, force: bool = False, ) -> Path: """ Transcribe video audio using Whisper and save to JSON file. Config options: model: str - Whisper model size (tiny, base, small, medium, large). Default: "base" output: str - Output filename. Default: input filename with .transcript.json suffix This filter doesn't transform the video, it creates a sidecar transcript file. Skips if output file exists unless force=True. Returns: Path to the transcript JSON file. """ from .transcriber import transcribe_video, save_transcript model = config.get("model", "base") output_name = config.get("output") if output_name: output_path = input_path.parent / output_name else: output_path = input_path.with_suffix(".transcript.json") # Skip if exists (unless force) if output_path.exists() and not force: print(f" Transcript exists, skipping: {output_path.name}") print(" (use --force to regenerate)") return output_path if verbose: print(f" Model: {model}") print(f" Output: {output_path}") # Run transcription words = transcribe_video(input_path, model=model) save_transcript(words, output_path) print(f" Transcribed {len(words)} words -> {output_path.name}") return output_path def apply_audio_normalize( input_path: Path, output_path: Path, config: dict[str, Any], verbose: bool = False, take: float = None, use_audio_channels: str = "both", skip_loudnorm: bool = False, ) -> None: """ Apply audio normalization: denoise, compress, and loudness normalize. If skip_loudnorm=True, the loudnorm filter is skipped. Use this for segments that will be concatenated, then apply loudnorm once to the final output. Config options: # Room treatment highpass: float - High-pass filter frequency in Hz (0 = disabled, try 80-120) lowpass: float - Low-pass filter frequency in Hz (0 = disabled) room_eq: bool - Enable room resonance EQ cut room_eq_freq: float - Center frequency for room cut (default: 300) room_eq_gain: float - Gain in dB, negative = cut (default: -4) room_eq_width: float - Q/bandwidth (default: 1.5) # Gate (reverb tail reduction) gate: bool - Enable noise gate gate_threshold: float - Threshold in dB (default: -35) gate_range: float - Attenuation in dB when closed (default: -20) gate_attack: float - Attack time in ms (default: 10) gate_release: float - Release time in ms (default: 150) # Neural de-reverb dereverb_model: str - Path to RNNoise model file (empty = disabled) # Noise reduction denoise: bool - Enable noise reduction (default: True) noise_floor: float - Noise floor in dB (default: -25) # Compression compress: bool - Enable compression (default: True) threshold: float - Compression threshold in dB (default: -20) ratio: float - Compression ratio (default: 4) attack: float - Attack time in ms (default: 5) release: float - Release time in ms (default: 50) makeup: float - Makeup gain in dB (default: 2) # Loudness normalization normalize: bool - Enable loudness normalization (default: True) target_lufs: float - Target loudness in LUFS (default: -16) target_lra: float - Target loudness range (default: 11) target_tp: float - Target true peak in dB (default: -1.5) Args: use_audio_channels: "both", "left", or "right" - which channel(s) to use, output is always stereo with sound in both channels Filter chain order: channel_map -> eq_bands -> highpass -> lowpass -> room_eq -> dereverb -> denoise -> gate -> compress -> normalize """ cfg = parse_audio_normalize_config(config) # Build audio filter chain (order matters!) audio_filters: list[str] = [] # 0. Channel mapping - take specified channel(s) and output stereo if use_audio_channels == "left": # Take left channel, duplicate to both stereo channels audio_filters.append("pan=stereo|c0=c0|c1=c0") elif use_audio_channels == "right": # Take right channel, duplicate to both stereo channels audio_filters.append("pan=stereo|c0=c1|c1=c1") # 0.5. Parametric EQ bands (applied early for tonal shaping) for band in cfg.eq_bands: if band.type == "lowshelf": # Low shelf filter: boosts/cuts frequencies below the center audio_filters.append( f"lowshelf=f={band.freq:.1f}:g={band.gain:.1f}:t=q:w={band.q:.2f}" ) elif band.type == "highshelf": # High shelf filter: boosts/cuts frequencies above the center audio_filters.append( f"highshelf=f={band.freq:.1f}:g={band.gain:.1f}:t=q:w={band.q:.2f}" ) else: # Peak/parametric EQ band audio_filters.append( f"equalizer=f={band.freq:.1f}:width_type=q:width={band.q:.2f}:g={band.gain:.1f}" ) # 1. High-pass filter (remove room rumble and low-frequency buildup) if cfg.highpass > 0: audio_filters.append(f"highpass=f={cfg.highpass:.1f}") # 2. Low-pass filter (remove harsh highs if needed) if cfg.lowpass > 0: audio_filters.append(f"lowpass=f={cfg.lowpass:.1f}") # 3. Room resonance EQ cut (reduce muddy frequencies from room modes) if cfg.room_eq: # equalizer filter: f=frequency, width_type=q, width=Q, g=gain audio_filters.append( f"equalizer=f={cfg.room_eq_freq:.1f}" f":width_type=q:width={cfg.room_eq_width:.2f}" f":g={cfg.room_eq_gain:.1f}" ) # 4. Neural de-reverb (arnndn - very effective if model available) if cfg.dereverb_model: model_path = Path(cfg.dereverb_model) if model_path.exists(): audio_filters.append(f"arnndn=m={model_path}:mix={cfg.dereverb_mix:.2f}") else: print(f" Warning: dereverb model not found: {model_path}") # 5. Noise reduction (afftdn) if cfg.denoise: audio_filters.append(f"afftdn=nf={cfg.noise_floor:.1f}") # 6. Noise gate (reduce reverb tails during pauses) if cfg.gate: # agate: threshold, range (attenuation), attack, release audio_filters.append( f"agate=threshold={cfg.gate_threshold:.1f}dB" f":range={cfg.gate_range:.1f}dB" f":attack={cfg.gate_attack:.1f}" f":release={cfg.gate_release:.1f}" ) # 7. Compression (acompressor) if cfg.compress: audio_filters.append( f"acompressor=threshold={cfg.threshold:.1f}dB" f":ratio={cfg.ratio:.1f}" f":attack={cfg.attack:.1f}" f":release={cfg.release:.1f}" f":makeup={cfg.makeup:.1f}dB" ) # 8. Loudness normalization (loudnorm - EBU R128) # Skip if skip_loudnorm=True (for segments that will be concatenated) if cfg.normalize and not skip_loudnorm: audio_filters.append( f"loudnorm=I={cfg.target_lufs:.1f}" f":LRA={cfg.target_lra:.1f}" f":TP={cfg.target_tp:.1f}" ) if not audio_filters: # No filters enabled, just copy import shutil shutil.copy2(input_path, output_path) return audio_filter = ",".join(audio_filters) # Build FFmpeg command - copy video, process audio cmd = ["ffmpeg", "-y"] if take is not None: cmd.extend(["-t", str(take)]) cmd.extend( [ "-i", str(input_path), "-c:v", "copy", # Copy video stream unchanged "-af", audio_filter, "-c:a", "pcm_s16le", # Lossless audio output str(output_path), ] ) if verbose: print(f" Audio filter: {audio_filter}") print(f" Command: {' '.join(cmd)}") # Get duration for progress bar duration = take if take is not None else get_video_duration(input_path) result = run_ffmpeg_with_progress(cmd, duration, "Audio normalize") if result.returncode != 0: raise PreprocessError( "Audio normalization failed", filter_type="audio_normalize", command=" ".join(cmd), stderr=result.stderr, ) def parse_audio_normalize_config(config: dict[str, Any]) -> AudioNormalizeConfig: """Parse an audio normalize config dictionary into AudioNormalizeConfig.""" # Parse EQ bands eq_bands = [] for band in config.get("eq_bands", []): eq_bands.append( EQBand( freq=float(band.get("freq", 1000)), gain=float(band.get("gain", 0)), q=float(band.get("q", 1.0)), type=str(band.get("type", "peak")), ) ) return AudioNormalizeConfig( # Parametric EQ eq_bands=eq_bands, # Room treatment highpass=float(config.get("highpass", 0.0)), lowpass=float(config.get("lowpass", 0.0)), room_eq=bool(config.get("room_eq", False)), room_eq_freq=float(config.get("room_eq_freq", 300.0)), room_eq_gain=float(config.get("room_eq_gain", -4.0)), room_eq_width=float(config.get("room_eq_width", 1.5)), # Gate gate=bool(config.get("gate", False)), gate_threshold=float(config.get("gate_threshold", -35.0)), gate_range=float(config.get("gate_range", -20.0)), gate_attack=float(config.get("gate_attack", 10.0)), gate_release=float(config.get("gate_release", 150.0)), # Neural de-reverb dereverb_model=str(config.get("dereverb_model", "")), dereverb_mix=float(config.get("dereverb_mix", 0.8)), # Noise reduction denoise=bool(config.get("denoise", True)), noise_floor=float(config.get("noise_floor", -25.0)), # Compression compress=bool(config.get("compress", True)), threshold=float(config.get("threshold", -20.0)), ratio=float(config.get("ratio", 4.0)), attack=float(config.get("attack", 5.0)), release=float(config.get("release", 50.0)), makeup=float(config.get("makeup", 2.0)), # Loudness normalization normalize=bool(config.get("normalize", True)), target_lufs=float(config.get("target_lufs", -16.0)), target_lra=float(config.get("target_lra", 11.0)), target_tp=float(config.get("target_tp", -1.5)), ) def parse_chroma_key_config(config: dict[str, Any]) -> ChromaKeyConfig: """Parse a chroma key config dictionary into ChromaKeyConfig. Defaults are tuned for aggressive green screen removal: - similarity 0.4: Captures wide range of green shades (lighting variations) - blend 0.08: Tight edges with minimal feathering - spill 0.1: Light despill to remove green reflections on subject - edge_erode 0: No alpha erosion (set 1-3 to remove green fringe) - protect_color: Optional RGB color to protect from keying (e.g., yellow jumpsuit) - protect_tolerance: How much variation from protect_color to allow (0-1, default 0.15) """ color = config.get("color", [0, 255, 0]) if isinstance(color, list) and len(color) == 3: color = tuple(color) else: color = (0, 255, 0) # Parse protect_color if provided protect_color = config.get("protect_color") if protect_color: if isinstance(protect_color, list) and len(protect_color) == 3: protect_color = tuple(protect_color) else: protect_color = None return ChromaKeyConfig( color=color, similarity=float(config.get("similarity", 0.4)), blend=float(config.get("blend", 0.08)), spill=float(config.get("spill", 0.1)), edge_erode=int(config.get("edge_erode", 0)), protect_color=protect_color, protect_tolerance=float(config.get("protect_tolerance", 0.15)), ) def get_preprocessed_path(videos_dir: Path, video_source: VideoSource) -> Path: """ Get the path to the preprocessed video file. Returns output_file if specified, otherwise returns source_file. """ if video_source.output_file: return videos_dir / video_source.output_file return videos_dir / video_source.source_file def needs_preprocessing(videos_dir: Path, video_source: VideoSource) -> bool: """Check if preprocessing is needed (has filters and output doesn't exist).""" if not video_source.filter: return False if video_source.output_file: output_path = videos_dir / video_source.output_file if output_path.exists(): return False # Also check for WebM variant webm_path = output_path.with_suffix(".mov") if webm_path.exists(): return False return True return True def stitch_narration_segments( videos_dir: Path, segment_ids: list[str], videos: dict[str, VideoSource], output_path: Path, verbose: bool = False, default_end_trim: float = 0.0, ) -> Path: """ Stitch multiple narration video segments into a single file. Each segment's skip and take values are applied to trim dead video at the start/end of each recording. The segments are concatenated in the order specified by segment_ids. Args: videos_dir: Directory containing video files segment_ids: Ordered list of video IDs from videos.json videos: Dict of video ID -> VideoSource from videos.json output_path: Path for the concatenated output file verbose: Enable verbose output default_end_trim: Seconds to trim from the end when no explicit end/take is set Returns: Path to the stitched video file. """ if len(segment_ids) == 1: # Single segment - just return its processed path video_source = videos[segment_ids[0]] return get_preprocessed_path(videos_dir, video_source) print(f" Concatenating {len(segment_ids)} narration segments...") # Create temp directory for trimmed segments temp_dir = output_path.parent / "concat_temp" temp_dir.mkdir(parents=True, exist_ok=True) trimmed_segments: list[Path] = [] for i, video_id in enumerate(segment_ids): if video_id not in videos: raise PreprocessError( f"Narration segment '{video_id}' not found in videos.json", filter_type=None, ) video_source = videos[video_id] source_path = get_preprocessed_path(videos_dir, video_source) if not source_path.exists(): raise PreprocessError( f"Narration segment not found: {source_path}", filter_type=None, ) # Get segment duration full_duration = get_video_duration(source_path) skip = video_source.skip or 0.0 take = video_source.take # Apply default end trim if no explicit take/end was set if take is None and default_end_trim > 0: take = max(0.0, full_duration - skip - default_end_trim) # Calculate effective duration if take is not None: effective_duration = min(take, full_duration - skip) else: effective_duration = full_duration - skip if verbose: print(f" Segment {i+1}: {video_id}") print(f" Source: {source_path.name}") print( f" Skip: {skip}s, Take: {take or 'all'}s, Duration: {effective_duration:.1f}s" ) # Always re-encode every segment to normalize fps and timestamps. # Mixing un-normalized source files (e.g. 60fps camera) with # trimmed-and-re-encoded 30fps segments causes cumulative A/V drift # in the final concat. # Trim/normalize the segment trimmed_path = temp_dir / f"segment_{i:03d}.mov" # Check if source has alpha channel (for ProRes 4444, etc.) has_alpha = _video_has_alpha(source_path) # Re-encode to normalize framerate and fix timestamps # Different segments may have different framerates which breaks concatenation cmd = ["ffmpeg", "-y"] if skip > 0: cmd.extend(["-ss", str(skip)]) cmd.extend(["-i", str(source_path)]) if take is not None: cmd.extend(["-t", str(take)]) if has_alpha: # Preserve alpha with ProRes 4444 cmd.extend( [ "-vf", "fps=30,format=yuva444p10le", "-c:v", "prores_ks", "-profile:v", "4", "-pix_fmt", "yuva444p10le", "-c:a", "pcm_s16le", "-avoid_negative_ts", "make_zero", str(trimmed_path), ] ) else: # No alpha - use fast h264 encoding cmd.extend( [ "-vf", "fps=30", "-c:v", "libx264", "-preset", "fast", "-crf", "18", "-c:a", "aac", "-b:a", "192k", "-avoid_negative_ts", "make_zero", "-movflags", "+faststart", str(trimmed_path), ] ) result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise PreprocessError( f"Failed to trim segment {video_id}", filter_type="concat", command=" ".join(cmd), stderr=result.stderr, ) trimmed_segments.append(trimmed_path) # Build concat file list concat_list = temp_dir / "concat_list.txt" with open(concat_list, "w", encoding="utf-8") as f: for segment in trimmed_segments: f.write(f"file '{segment.resolve()}'\n") # Concatenate all segments print(f" Stitching {len(trimmed_segments)} segments -> {output_path.name}") cmd = [ "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", str(concat_list), "-c:v", "copy", "-c:a", "copy", "-movflags", "+faststart", str(output_path), ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: raise PreprocessError( "Segment concatenation failed", filter_type="concat", command=" ".join(cmd), stderr=result.stderr, ) # Apply loudnorm if any segment had defer_loudnorm=True needs_loudnorm = any( videos[seg_id].defer_loudnorm for seg_id in segment_ids if seg_id in videos ) if needs_loudnorm: print(" Applying loudness normalization to stitched output...") normalized_path = ( output_path.parent / f"{output_path.stem}_normalized{output_path.suffix}" ) # Use EBU R128 loudnorm targeting YouTube's recommended levels loudnorm_cmd = [ "ffmpeg", "-y", "-i", str(output_path), "-c:v", "copy", "-af", "loudnorm=I=-14:LRA=11:TP=-1.5", "-c:a", "aac", "-b:a", "192k", "-movflags", "+faststart", str(normalized_path), ] result = subprocess.run(loudnorm_cmd, capture_output=True, text=True) if result.returncode != 0: raise PreprocessError( "Loudness normalization failed", filter_type="loudnorm", command=" ".join(loudnorm_cmd), stderr=result.stderr, ) # Replace original with normalized version output_path.unlink() normalized_path.rename(output_path) print(" Loudness normalization complete.") # Clean up temp files for segment in trimmed_segments: if segment.parent == temp_dir and segment.exists(): segment.unlink() concat_list.unlink() try: temp_dir.rmdir() except OSError: pass total_duration = get_video_duration(output_path) print(f" Stitched duration: {format_time(total_duration)}") return output_path