Files
gnommo/gnommo/cli.py
T
2026-03-26 10:46:05 +01:00

3528 lines
121 KiB
Python

"""CLI entry point for GnommoEditor."""
import argparse
import json
from logging import config
import re
import time
import shutil
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from gnommo.parser import _read_json
from . import __version__
from .errors import GnommoError, ParseError, ValidationError, RenderError
from .cache import get_cache_info, resolve_with_cache
from typing import Optional, Union
class NotImplementedException(GnommoError):
    """Raised when a requested feature has not been implemented yet."""
def _build_parser() -> argparse.ArgumentParser:
    """Build the argument parser for the ``gnommo`` command line."""
    parser = argparse.ArgumentParser(
        prog="gnommo",
        description="GnommoEditor - A code-first video editing pipeline",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
gnommo -p video1 render Render the full project
gnommo -p video1 render --slides S1:S10 Render only slides S1-S10
gnommo -p video1 render --slides S10: Render from S10 to end
gnommo -p video1 validate Validate only
gnommo -p video1 import Generate slides.json from images
gnommo -p video1 pre Preprocess videos (chroma key, etc.)
gnommo -p video1 stitch --res tiny -f Fast stitch with new begin/end values
gnommo -p video1 trim Auto-detect silence and set skip/take in narration.json
gnommo -p video1 trim --force Redo trim even for segments that already have skip/take
gnommo -p video1 trim --threshold -25 Raise threshold to ignore clothing/room noise
gnommo -p video1 trim -v Show detected silence periods for debugging
gnommo -p video1 transcode Transcode narration folder to H.265 (1st pass, before preprocess)
gnommo -p video1 transcode --replace Delete originals after successful transcode
gnommo -p video1 transcode --crf 28 Lower quality / smaller files (default CRF: 23)
gnommo -p video1 transcode --processed Compress _processed.mov files to HEVC+alpha (2nd pass, after preprocess)
gnommo -p video1 transcode --processed --alpha-quality 0.5 More aggressive alpha compression
gnommo -p video1 transcode --processed --dry-run Preview what would be compressed
gnommo -p video1 transcode --force Re-transcode even if output already exists
gnommo -p video1 all Full pipeline: transcribe → align → render
gnommo -p video1 render --dry-run Show FFmpeg command without running
gnommo -p video1 description Generate YouTube description file
gnommo -p video1 transcribe Narration file for timing of slides
gnommo -p video1 transcribe --final Transcribe outputted file and generate SRT for YouTube
gnommo -p video1 archive Sync project to external cache storage
gnommo -p video1 archive --dry-run Preview what would be synced
gnommo -p video1 extract-audio --combined Extract audio from narration_combined.mov
gnommo -p video1 extract-audio --combined --channel left Extract left channel only
gnommo -p video1 extract-audio --segment seg01 Extract from a specific segment
gnommo -p video1 master Extract raw + processed audio for A/B comparison
""",
    )
    parser.add_argument(
        "--version",
        action="version",
        version=f"%(prog)s {__version__}",
    )
    # Required arguments
    parser.add_argument(
        "-p",
        "--project",
        type=str,
        required=True,
        help="Project directory",
    )
    parser.add_argument(
        "action",
        type=str,
        nargs="?",
        default="render",
        choices=[
            "validate",
            "preprocess",
            "pre",
            "stitch",
            "trim",
            "render",
            "all",
            "transcribe",
            "align",
            "import",
            "description",
            "archive",
            "extract-audio",
            "master",
            "push",
            "pull",
            "handoff",
            "transcode",
        ],
        help="Action to perform (default: render)",
    )
    # Optional arguments
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="Verbose output",
    )
    parser.add_argument(
        "-f",
        "--force",
        action="store_true",
        help="Force overwrite existing files",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be done without executing",
    )
    parser.add_argument(
        "--slides",
        type=str,
        help="Render only a range of slides (e.g., S1:S10, S5:, S10:S20)",
    )
    parser.add_argument(
        "--res",
        type=str,
        choices=["full", "low", "tiny"],
        default="full",
        help="Resolution: 'full' (project res), 'low' (490x270), 'tiny' (320x180 ultrafast)",
    )
    parser.add_argument(
        "-w",
        "--workers",
        type=int,
        default=1,
        help="Number of parallel workers for preprocessing (default: 1)",
    )
    parser.add_argument(
        "--final",
        action="store_true",
        help="For transcribe: transcribe the final rendered video and generate SRT captions for YouTube",
    )
    parser.add_argument(
        "--segment",
        type=str,
        help="For extract-audio: specific segment ID to extract (default: all segments)",
    )
    parser.add_argument(
        "--channel",
        type=str,
        choices=["auto", "left", "right", "both"],
        default="both",
        help="For extract-audio: which audio channel(s) to extract (default: both)",
    )
    parser.add_argument(
        "--combined",
        action="store_true",
        help="For extract-audio: extract from narration_combined.mov instead of individual segments",
    )
    parser.add_argument(
        "--file",
        default=None,
        help="For handoff: path to video file (overrides output_video in project.json)",
    )
    parser.add_argument(
        "--prod",
        action="store_true",
        help="Target production server (GNOMMOWEB_PROD_URL / GNOMMOWEB_PROD_API_KEY)",
    )
    parser.add_argument(
        "--threshold",
        type=float,
        default=-40.0,
        help="For trim: silence threshold in dB (default: -40). Raise (e.g. -25) to ignore clothing/room noise.",
    )
    parser.add_argument(
        "--crf",
        type=int,
        default=23,
        help="For transcode: H.265 quality (CRF, default: 23; lower=better quality, larger file)",
    )
    parser.add_argument(
        "--replace",
        action="store_true",
        help="For transcode: delete original files after successful transcode",
    )
    parser.add_argument(
        "--processed",
        action="store_true",
        help="For transcode: compress _processed.mov files (with alpha) using HEVC+alpha instead of narration files",
    )
    parser.add_argument(
        "--alpha-quality",
        type=float,
        default=1.0,
        dest="alpha_quality",
        # The help text previously advertised 0.75; it now reports the value
        # that is actually used when the flag is omitted.
        help="For transcode --processed: HEVC alpha quality 0.0-1.0 (default: 1.0; lower=smaller file)",
    )
    return parser


def main(argv: Optional[list[str]] = None) -> int:
    """Main entry point.

    Parses the command line, resolves the project directory and dispatches to
    the ``cmd_*`` function for the selected action.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is the behavior callers relied on
            before this parameter existed.

    Returns:
        The exit code returned by the selected command, ``1`` when a
        ``GnommoError`` is raised, or ``130`` when interrupted with Ctrl-C.
    """
    args = _build_parser().parse_args(argv)

    # Resolve project path (relative paths are taken from the working directory)
    project_path = Path(args.project)
    if not project_path.is_absolute():
        project_path = Path.cwd() / project_path

    try:
        action = args.action
        if action == "import":
            return cmd_import(project_path, args.force, args.verbose)
        elif action == "validate":
            return cmd_validate(project_path, args.verbose)
        elif action in ("preprocess", "pre"):
            return cmd_preprocess(
                project_path,
                args.verbose,
                args.dry_run,
                args.force,
                args.workers,
                args.res,
            )
        elif action == "trim":
            return cmd_trim(project_path, args.verbose, args.force, args.threshold)
        elif action == "transcode":
            return cmd_transcode(
                project_path,
                args.verbose,
                args.dry_run,
                args.replace,
                args.crf,
                args.force,
                args.processed,
                args.alpha_quality,
            )
        # Plain equality: `action in ("stitch")` was a substring test against
        # a str, because the parentheses do not create a tuple.
        elif action == "stitch":
            return cmd_stitch(
                project_path,
                args.verbose,
                args.force,
                args.res,
            )
        elif action == "render":
            return cmd_render(
                project_path,
                args.verbose,
                args.dry_run,
                args.slides,
                args.res,
                args.force,
            )
        elif action == "transcribe":
            return cmd_transcribe(project_path, args.verbose, args.res, args.final)
        elif action == "align":
            return cmd_align(project_path, args.verbose)
        elif action == "all":
            return cmd_all(
                project_path, args.verbose, args.dry_run, args.res, args.force
            )
        elif action == "description":
            return cmd_description(project_path, args.verbose)
        elif action == "archive":
            return cmd_archive(project_path, args.verbose, args.dry_run)
        elif action == "extract-audio":
            return cmd_extract_audio(
                project_path, args.verbose, args.segment, args.channel, args.combined
            )
        elif action == "master":
            return cmd_master(project_path, args.verbose, args.channel)
        elif action == "push":
            from .push import cmd_push

            return cmd_push(project_path, args.verbose, args.force, args.prod)
        elif action == "pull":
            from .pull import cmd_pull

            return cmd_pull(project_path, args.verbose, args.force, args.prod)
        elif action == "handoff":
            from .handoff import cmd_handoff

            return cmd_handoff(
                project_path, args.verbose, args.file, args.prod, args.res
            )
    except GnommoError as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1
    except KeyboardInterrupt:
        print("\nAborted.", file=sys.stderr)
        return 130
    return 0
# =============================================================================
# Import Command
# =============================================================================
def cmd_import(project_path: Path, force: bool, verbose: bool) -> int:
    """Scan the project's assets and (re)generate its metadata JSON files.

    Steps, in order: local videos, narration segments, Keynote presenter
    notes plus slides.json (only when a .key file is present), shared assets,
    then audio and video metadata probing.

    Returns 1 when the project directory does not exist, otherwise 0.
    """
    from .parser import parse_project_config, _read_json

    print(f"Importing assets for: {project_path.name}")
    if not project_path.exists():
        print(f"Error: Project directory not found: {project_path}", file=sys.stderr)
        return 1

    # project.json is optional here; it supplies videos_path among other things.
    has_project_file = (project_path / "project.json").exists()
    config = parse_project_config(project_path) if has_project_file else None

    # Local videos: next to the configured videos.json, else media/videos.
    if config and config.videos_path:
        videos_dir = (project_path / config.videos_path).parent
    else:
        videos_dir = project_path / "media" / "videos"
    if videos_dir.exists():
        _import_videos(videos_dir, config, verbose)

    # Narration segments.
    narration_dir = project_path / "media" / "narration"
    if narration_dir.exists():
        _import_narration_segments(narration_dir, config, verbose)

    # Keynote: presenter notes + slide PNG export, then slides.json per folder.
    key_files = list(project_path.glob("*.key"))
    if not key_files:
        if verbose:
            print(" No .key file found, skipping presenter notes import")
    else:
        chosen_key = key_files[0]  # first match wins
        if len(key_files) > 1:
            print(f" Warning: Multiple .key files found, using {chosen_key.name}")
        _import_presenter_notes(project_path, chosen_key, verbose)

        slides_root = project_path / "media" / "slides"
        slide_folders = []
        if slides_root.exists():
            slide_folders = [
                entry for entry in slides_root.glob("*/") if entry.is_dir()
            ]
        for folder in slide_folders:
            _generate_slides_json(folder, verbose)

    # Shared assets live inside the project or next to it.
    shared_assets_dir = _find_shared_assets(project_path)
    if shared_assets_dir:
        _import_shared_assets(shared_assets_dir, verbose)
        _sync_shared_videos_to_local(project_path, config, shared_assets_dir, verbose)

    # Cache durations / audio presence so later stages can read them from JSON.
    _probe_audio_durations(project_path, config, force, verbose)
    _probe_video_metadata(project_path, config, shared_assets_dir, force, verbose)

    print("Import complete.")
    return 0
def _probe_audio_durations(
    project_path: Path, config, force: bool, verbose: bool
) -> None:
    """Probe and cache audio file durations into audio.json.

    Runs once at import time so the render stage never needs to scan audio
    files. Entries that already have a ``duration`` are skipped unless
    *force* is set. Probe failures are reported and do not abort the run.

    Args:
        project_path: Project root directory.
        config: Parsed project config, or None. When it has a truthy
            ``audio_path`` that path (relative to the project) is used,
            otherwise ``<project>/audio.json``.
        force: Re-probe entries that already have a cached duration.
        verbose: Also report cached and missing entries.
    """
    if config and config.audio_path:
        audio_json_path = project_path / config.audio_path
    else:
        audio_json_path = project_path / "audio.json"
    if not audio_json_path.exists():
        return

    # Imported only once there is something to probe, so a project without
    # audio.json does not need the renderer module at all.
    from .renderer import _get_audio_duration

    audio_dir = audio_json_path.parent
    data = _read_json(audio_json_path)
    updated = False
    for audio_id, audio_data in data.items():
        if "file" not in audio_data:
            continue
        if "duration" in audio_data and not force:
            if verbose:
                print(f" Audio '{audio_id}': cached ({audio_data['duration']:.1f}s)")
            continue
        audio_path = audio_dir / audio_data["file"]
        if not audio_path.exists():
            if verbose:
                print(f" Audio '{audio_id}': file not found, skipping")
            continue
        print(f" Probing audio '{audio_id}' ({audio_path.name})...", end=" ", flush=True)
        try:
            duration = _get_audio_duration(audio_path)
            data[audio_id]["duration"] = round(duration, 3)
            updated = True
            print(f"{duration:.1f}s")
        except Exception as e:
            # Best effort: one unreadable file must not stop the import.
            print(f"failed ({e})")

    if updated:
        # Explicit UTF-8, matching the other JSON writers in this module.
        with open(audio_json_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=4)
        print(f" Saved durations to {audio_json_path.name}")
def _locate_video_file(base_dir: Path, video_data: dict) -> Optional[Path]:
    """Return the first existing file for a videos.json entry, or None.

    Tries ``output_file`` (as given, then with a ``.mov`` suffix) in
    *base_dir* and then in its parent, and finally ``source_file`` in
    *base_dir*. Per the original comment this is meant to mirror
    ``renderer._resolve_video_path``.
    """
    output_file = video_data.get("output_file")
    if output_file:
        for candidate_dir in (base_dir, base_dir.parent):
            candidate = candidate_dir / output_file
            if candidate.exists():
                return candidate
            mov_candidate = candidate.with_suffix(".mov")
            if mov_candidate.exists():
                return mov_candidate
    source_candidate = base_dir / video_data["source_file"]
    return source_candidate if source_candidate.exists() else None


def _probe_video_metadata(
    project_path: Path, config, shared_assets_dir: Optional[Path], force: bool, verbose: bool
) -> None:
    """Probe and cache video file duration and audio presence into videos.json.

    Runs once at import time so the render stage never needs to probe video
    files. Results for shared entries that exist in shared_assets/videos.json
    are written back there; every other result goes to the project's
    videos.json. Entries that already have both ``duration`` and
    ``has_audio`` are skipped unless *force* is set. Probe failures are
    reported and do not abort the run.

    Args:
        project_path: Project root directory.
        config: Parsed project config, or None. A truthy ``videos_path``
            selects the videos.json location; the fallback is
            ``media/videos/videos.json``.
        shared_assets_dir: shared_assets directory, or None when not found.
        force: Re-probe entries that already have cached metadata.
        verbose: Also report cached and missing entries.
    """
    if config and config.videos_path:
        videos_json_path = project_path / config.videos_path
    else:
        videos_json_path = project_path / "media" / "videos" / "videos.json"
    if not videos_json_path.exists():
        return

    # Imported only once there is something to probe.
    from .preprocessor import get_video_duration
    from .renderer import _has_audio_stream

    videos_dir = videos_json_path.parent
    local_data = _read_json(videos_json_path)

    # Load shared_assets/videos.json separately — shared probes write there
    shared_json_path = shared_assets_dir / "videos.json" if shared_assets_dir else None
    if shared_json_path and shared_json_path.exists():
        shared_data = _read_json(shared_json_path)
    else:
        shared_data = {}

    local_updated = False
    shared_updated = False
    for video_id, video_data in local_data.items():
        if "source_file" not in video_data:
            continue
        is_shared = video_data.get("is_shared", False)
        writes_to_shared = is_shared and video_id in shared_data
        # The cache check looks at whichever file the result would be saved to.
        canonical = shared_data[video_id] if writes_to_shared else video_data
        if not force and "duration" in canonical and "has_audio" in canonical:
            if verbose:
                print(f" Video '{video_id}': cached ({canonical['duration']:.1f}s, audio={canonical['has_audio']})")
            continue

        base_dir = shared_assets_dir if (is_shared and shared_assets_dir) else videos_dir
        video_path = _locate_video_file(base_dir, video_data)
        if video_path is None:
            if verbose:
                print(f" Video '{video_id}': file not found, skipping")
            continue

        print(f" Probing video '{video_id}' ({video_path.name})...", end=" ", flush=True)
        try:
            duration = get_video_duration(video_path)
            has_audio = _has_audio_stream(video_path)
            result = {"duration": round(duration, 3), "has_audio": has_audio}
            print(f"{duration:.1f}s, audio={has_audio}")
            if writes_to_shared:
                shared_data[video_id].update(result)
                shared_updated = True
            else:
                local_data[video_id].update(result)
                local_updated = True
        except Exception as e:
            # Best effort: one unreadable file must not stop the import.
            print(f"failed ({e})")

    # Explicit UTF-8, matching the other writers of these same files.
    if local_updated:
        with open(videos_json_path, "w", encoding="utf-8") as f:
            json.dump(local_data, f, indent=4)
        print(f" Saved metadata to {videos_json_path.name}")
    if shared_updated and shared_json_path:
        with open(shared_json_path, "w", encoding="utf-8") as f:
            json.dump(shared_data, f, indent=4)
        print(f" Saved shared metadata to {shared_json_path.name}")
def _sync_shared_videos_to_local(
    project_path: Path, config, shared_assets_dir: Path, verbose: bool
) -> None:
    """Mirror shared_assets/videos.json entries into the project's videos.json.

    Entries missing locally are copied over with ``is_shared`` set to True.
    For entries that already exist locally only the probed metadata fields
    (``duration``, ``has_audio``) are refreshed from the shared copy; every
    other local field is left as it is. The local file is rewritten only when
    something was added or refreshed.
    """
    shared_index_path = shared_assets_dir / "videos.json"
    if not shared_index_path.exists():
        return
    shared_index = _read_json(shared_index_path)
    if not shared_index:
        return

    if config and config.videos_path:
        local_index_path = project_path / config.videos_path
    else:
        local_index_path = project_path / "media" / "videos" / "videos.json"
    local_index: dict = _read_json(local_index_path) if local_index_path.exists() else {}

    synced_fields = ("duration", "has_audio")
    newly_added = []
    refreshed = []
    for video_id, shared_entry in shared_index.items():
        if video_id not in local_index:
            # Unknown locally: copy the shared entry and flag it as shared.
            local_index[video_id] = {**shared_entry, "is_shared": True}
            newly_added.append(video_id)
            continue

        # Known locally: pull in metadata that differs from the shared copy.
        local_entry = local_index[video_id]
        stale = [
            key
            for key in synced_fields
            if key in shared_entry and local_entry.get(key) != shared_entry[key]
        ]
        for key in stale:
            local_entry[key] = shared_entry[key]
        if stale:
            refreshed.append(video_id)
        elif verbose:
            print(f" Shared '{video_id}': already in local videos.json, skipping")

    if not (newly_added or refreshed):
        if verbose:
            print(" No new shared assets to sync to local videos.json")
        return

    local_index_path.parent.mkdir(parents=True, exist_ok=True)
    with open(local_index_path, "w", encoding="utf-8") as handle:
        json.dump(local_index, handle, indent=4)
    if newly_added:
        print(f" Synced {len(newly_added)} shared asset(s) to local videos.json: {', '.join(newly_added)}")
    if refreshed:
        print(f" Updated metadata for {len(refreshed)} shared asset(s): {', '.join(refreshed)}")
def _find_shared_assets(project_path: Path) -> Optional[Path]:
    """Locate the shared_assets directory for a project.

    Candidates are checked in this order and the first one that exists wins:
      1. project_path/shared_assets
      2. project_path/../shared_assets (sibling of the project)

    Returns None when neither exists.
    """
    for parent in (project_path, project_path.parent):
        candidate = parent / "shared_assets"
        if candidate.exists():
            return candidate
    return None
def _import_shared_assets(shared_assets_dir: Path, verbose: bool) -> None:
    """Register video files found under shared_assets in its videos.json.

    Files directly in shared_assets and files anywhere below its
    subdirectories are considered; top-level entries whose name starts with
    "." are ignored, as are ``*_processed.mov`` / ``*_processed.webm`` files.

    The video id is the path relative to shared_assets without its extension
    (e.g. "Logo", "pexels/filename"). Ids already present in videos.json are
    left untouched; the file is rewritten only when something was added.
    """
    video_suffixes = {".mov", ".mp4", ".webm", ".avi", ".mkv", ".m4v"}
    processed_endings = ("_processed.mov", "_processed.webm")

    def is_source_video(path: Path) -> bool:
        return (
            path.is_file()
            and path.suffix.lower() in video_suffixes
            and not path.name.endswith(processed_endings)
        )

    # (relative_path, absolute_path) for every source video found
    discovered = []
    for entry in shared_assets_dir.iterdir():
        if entry.name.startswith("."):
            continue
        if entry.is_file():
            candidates = [entry]
        elif entry.is_dir():
            candidates = entry.rglob("*")
        else:
            continue
        for path in candidates:
            if is_source_video(path):
                discovered.append((path.relative_to(shared_assets_dir), path))

    if not discovered:
        if verbose:
            print(f" No video files found in {shared_assets_dir}")
        return

    index_path = shared_assets_dir / "videos.json"
    index: dict = _read_json(index_path) if index_path.exists() else {}

    new_entries = 0
    for relative, _absolute in sorted(discovered):
        video_id = str(relative.with_suffix(""))
        if video_id in index:
            if verbose:
                print(f" Skipping {video_id} (already exists)")
            continue
        index[video_id] = {"source_file": str(relative)}
        new_entries += 1
        if verbose:
            print(f" Added: {video_id}")

    if new_entries == 0:
        print(" No new shared assets to add")
        return

    with open(index_path, "w", encoding="utf-8") as handle:
        json.dump(index, handle, indent=2)
    print(f" Updated {index_path} (+{new_entries} shared assets)")
def _generate_slides_json(directory: Path, verbose: bool) -> None:
    """Write slides.json for a folder of exported slide images.

    Image files named like "Video1.001.png" (a numeric part right before the
    extension) become entries "S<number>" with type "fullscreen", ordered by
    slide number. Nothing is written when the folder has no image files or
    none of them carries a slide number.
    """
    image_suffixes = {".png", ".gif", ".pdf", ".jpg", ".jpeg"}
    images = [p for p in directory.iterdir() if p.suffix.lower() in image_suffixes]
    if not images:
        print(f" Warning: No image files in {directory}")
        return

    # Numeric part between the last two dots, e.g. "001" in "Video1.001.png"
    numbered = re.compile(r"\.(\d+)\.[^.]+$")
    image_by_number = {}
    for image in images:
        found = numbered.search(image.name)
        if found is None:
            continue
        image_by_number[int(found.group(1))] = image.name

    if not image_by_number:
        print(f" Warning: No valid slide files in {directory}")
        return

    ordered = {
        f"S{number}": {"image": name, "type": "fullscreen"}
        for number, name in sorted(image_by_number.items())
    }

    target = directory / "slides.json"
    with open(target, "w", encoding="utf-8") as handle:
        json.dump(ordered, handle, indent=2)
    print(f" Generated {target} ({len(ordered)} slides)")
    if verbose:
        for slide_id in ordered:
            print(f" [{slide_id}]")
def _import_videos(videos_dir: Path, config, verbose: bool) -> None:
    """Import video files into videos.json.

    Scans *videos_dir* (direct children only) for video files and adds an
    entry per file to ``videos_dir/videos.json``, keyed by the filename
    without extension. Existing entries are never overwritten. An empty
    videos.json is created when the file does not exist yet.

    Skipped files: anything with "_processed" or "_fixed" in the stem, and
    files whose name starts with "narration_combined".

    Args:
        videos_dir: Directory containing the project's video files.
        config: Parsed project config or None. Currently not consulted by
            this function; kept so the call signature stays unchanged.
        verbose: Also report skipped and regular added entries.
    """
    video_extensions = {".mov", ".mp4", ".webm", ".avi", ".mkv", ".m4v"}

    # iterdir() yields direct children only, so files in subdirectories
    # (proxy/, intermediate/, ...) are never picked up.
    video_files = [
        f
        for f in videos_dir.iterdir()
        if f.is_file()
        and f.suffix.lower() in video_extensions
        and "_processed" not in f.stem  # Exclude any _processed files
        and "_fixed" not in f.stem  # Exclude any _fixed files
        and not f.name.startswith("narration_combined")
    ]

    # Ensure videos.json exists even if there are no video files yet
    videos_json_path = videos_dir / "videos.json"
    created = not videos_json_path.exists()
    if created:
        with open(videos_json_path, "w", encoding="utf-8") as f:
            json.dump({}, f, indent=2)
        print(
            f" Created empty {videos_json_path.relative_to(videos_dir.parent.parent)}"
        )

    if not video_files:
        if verbose:
            print(f" No new video files found in {videos_dir}")
        return

    # A file created a moment ago is known to be empty; no need to re-read it.
    existing_videos: dict = {} if created else _read_json(videos_json_path)

    # Add new videos (don't overwrite existing)
    added_count = 0
    for video_file in sorted(video_files):
        video_id = video_file.stem
        if video_id in existing_videos:
            if verbose:
                print(f" Skipping {video_id} (already exists)")
            continue

        video_entry = {
            "source_file": video_file.name,
        }
        # Names that *start* with "narration_combined" were filtered out
        # above, so this only matches stems that contain it elsewhere or in
        # a different letter case.
        if "narration_combined" in video_file.stem.lower():
            video_entry["output_file"] = None
            video_entry["cutout"] = "talkinghead"
            video_entry["always_visible"] = True
            video_entry["skip"] = 0
            video_entry["filter"] = []
            print(f" Added talking head segment: {video_id}")
        else:
            # Regular video
            video_entry["output_file"] = video_file.name
            video_entry["cutout"] = "square"
            video_entry["filter"] = []
            if verbose:
                print(f" Added: {video_id}")
        existing_videos[video_id] = video_entry
        added_count += 1

    if added_count > 0:
        with open(videos_json_path, "w", encoding="utf-8") as f:
            json.dump(existing_videos, f, indent=2)
        print(f" Updated {videos_json_path.name} (+{added_count} videos)")
    else:
        print(" No new videos to add")
def _import_narration_segments(narration_dir: Path, config, verbose: bool) -> None:
    """Import narration video files into narration.json.

    Folder structure:
        media/narration/raw_mov/     ← raw recordings
        media/narration/compressed/  ← H.265 copies (transcode 1st pass)
        media/narration/processed/   ← chroma-keyed output (preprocess)
        media/narration/narration.json

    ``processed/`` and ``raw_mov/`` are created if missing, and
    narration.json is always (re)written, even when nothing was added.
    Existing entries are never modified.

    1. Files in ``processed/`` get an entry only when no raw recording with
       the same segment id exists in ``raw_mov/``.
    2. Files in ``raw_mov/`` get an entry pointing at the raw file, with
       ``output_file`` preset to ``processed/<stem>_processed.mov``. When the
       config defines a "talkinghead" filter preset, that cutout and filter
       are attached.

    Args:
        narration_dir: The media/narration directory.
        config: Parsed project config or None; only ``default_filters`` is
            read.
        verbose: Also report entries skipped because they already exist.
    """
    video_extensions = {".mov", ".mp4", ".webm", ".avi", ".mkv", ".m4v"}
    # Extensions that count as a raw recording when deciding whether a
    # processed file already has a source in raw_mov/.
    raw_video_exts = {".mov", ".mp4", ".avi", ".mkv", ".m4v"}

    processed_dir = narration_dir / "processed"
    raw_dir = narration_dir / "raw_mov"
    processed_dir.mkdir(parents=True, exist_ok=True)
    raw_dir.mkdir(parents=True, exist_ok=True)

    # Load / create narration.json
    narration_json_path = narration_dir / "narration.json"
    existing_narration: dict = {}
    if narration_json_path.exists():
        existing_narration = _read_json(narration_json_path)

    default_filters = config.default_filters if config else {}
    added_count = 0

    def _scan(directory: Path) -> list[Path]:
        if not directory.exists():
            return []
        return sorted(
            f
            for f in directory.iterdir()
            if f.is_file()
            and f.suffix.lower() in video_extensions
            and not f.name.startswith(".")
        )

    raw_files = _scan(raw_dir)
    # Match raw recordings by stem with a case-insensitive suffix, so that
    # e.g. "seg01.MOV" is recognised on case-sensitive filesystems too.
    raw_stems = {f.stem for f in raw_files if f.suffix.lower() in raw_video_exts}

    # 1. Scan processed/ — only add entries when NO raw_mov equivalent exists.
    #    If raw_mov has the source, step 2 creates the entry pointing there
    #    (with the filter chain), which is better for re-processing later.
    for video_file in _scan(processed_dir):
        segment_id = video_file.stem.removesuffix("_processed")
        if segment_id in existing_narration:
            if verbose:
                print(f" Skipping {segment_id} (already exists)")
            continue
        if segment_id in raw_stems:
            continue
        existing_narration[segment_id] = {
            "source_file": f"processed/{video_file.name}",
            "use_audio_channels": "auto",
            "defer_loudnorm": True,
        }
        added_count += 1
        print(f" Added narration segment: {segment_id} (from processed/)")

    # 2. Scan raw_mov/ — add entries for raw files not yet in narration.json
    for video_file in raw_files:
        segment_id = video_file.stem
        if segment_id in existing_narration:
            if verbose:
                print(f" Skipping {segment_id} (already exists)")
            continue
        narration_entry = {
            "source_file": f"raw_mov/{video_file.name}",
            "output_file": f"processed/{video_file.stem}_processed.mov",
        }
        if "talkinghead" in default_filters:
            narration_entry["cutout"] = "talkinghead"
            narration_entry["filter"] = "talkinghead"
        narration_entry["use_audio_channels"] = "auto"
        narration_entry["defer_loudnorm"] = True
        existing_narration[segment_id] = narration_entry
        added_count += 1
        print(f" Added narration segment: {segment_id} (from raw_mov)")

    # Always write narration.json (creates it if missing)
    with open(narration_json_path, "w", encoding="utf-8") as f:
        json.dump(existing_narration, f, indent=2)

    if added_count > 0:
        print(f" Updated narration.json (+{added_count} segments)")
    elif not existing_narration:
        print(" narration.json created (empty — add files to processed/ or raw_mov/)")
    else:
        print(" No new narration segments to add")
def _import_presenter_notes(
    project_path: Path, keynote_file: Path, verbose: bool
) -> None:
    """Extract presenter notes from Keynote and write them to manuscript.txt.

    Runs the JXA script ``extract_keynote_notes.js`` (expected next to this
    module) through ``osascript``, passing the Keynote file and the slide
    export directory ``media/slides/<project_name>/``. The script's stdout is
    parsed as a JSON list of objects from which ``slide_index`` and ``notes``
    are read; each becomes a ``[S<index>]`` section in manuscript.txt.

    An existing manuscript.txt is copied to a timestamped ``.bak`` file
    before the extractor runs. Every failure (missing script, osascript
    unavailable or failing, invalid JSON) is reported on stderr and the
    function returns without writing manuscript.txt.
    """
    print(f" Extracting presenter notes from {keynote_file.name}...")

    # Find the JXA script (in the same directory as this module)
    script_dir = Path(__file__).parent
    jxa_script = script_dir / "extract_keynote_notes.js"
    if not jxa_script.exists():
        print(f" Error: JXA script not found at {jxa_script}", file=sys.stderr)
        return

    # Backup existing manuscript.txt if it exists
    manuscript_path = project_path / "manuscript.txt"
    if manuscript_path.exists():
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = project_path / f"manuscript.txt.{timestamp}.bak"
        shutil.copy2(manuscript_path, backup_path)
        if verbose:
            print(f" Backed up manuscript.txt to {backup_path.name}")

    # Slides export directory: {project}/media/slides/{project_name}/
    slides_dir = project_path / "media" / "slides" / project_path.name
    print(f" Exporting slides to {slides_dir}...")

    # Run JXA extractor via osascript (also exports slides)
    try:
        proc = subprocess.run(
            [
                "osascript",
                "-l",
                "JavaScript",
                str(jxa_script),
                str(keynote_file.resolve()),
                str(slides_dir.resolve()),
            ],
            capture_output=True,
            text=True,
        )
    except OSError as e:
        # osascript is absent or not executable (e.g. not on macOS): report
        # it like the other failure paths instead of raising.
        print(f" Error: could not run osascript: {e}", file=sys.stderr)
        return
    if proc.returncode != 0:
        print(" Error extracting presenter notes:", file=sys.stderr)
        print(f" {proc.stderr}", file=sys.stderr)
        return

    # Parse JSON output from JXA script (empty output means no slides)
    try:
        notes_data = json.loads(proc.stdout) if proc.stdout.strip() else []
    except json.JSONDecodeError as e:
        print(f" Error parsing notes JSON: {e}", file=sys.stderr)
        return

    # Convert to manuscript.txt format
    lines = []
    for item in notes_data:
        idx = item.get("slide_index")
        notes = (item.get("notes") or "").rstrip()
        lines.append(f"[S{idx}]")
        if notes:
            lines.append(notes)
        lines.append("")  # blank line between slides

    # Write manuscript.txt
    manuscript_path.write_text("\n".join(lines).rstrip() + "\n", encoding="utf-8")
    print(f" Wrote {manuscript_path} ({len(notes_data)} slides)")
    if verbose:
        non_empty = sum(1 for item in notes_data if item.get("notes"))
        print(f" {non_empty} slides have presenter notes")
# =============================================================================
# Tasks File
# =============================================================================
# Marker prefixes that reference a video id, mapped to the prefix length
# (the number of leading characters to strip to obtain the id).
_TASKS_VIDEO_PREFIXES = {
    prefix: len(prefix)
    for prefix in ("video:", "vft:", "vfb:", "vst:", "vsb:", "narration:")
}
def _collect_missing_video_markers(
    markers: list[str], videos: dict
) -> list[tuple[str, str]]:
    """List video markers whose id has no entry in *videos*.

    Only markers starting with one of the ``_TASKS_VIDEO_PREFIXES`` keys are
    considered; the id is whatever follows the prefix. Each missing id is
    reported once, paired with the first marker that referenced it, in the
    order the markers were given.
    """
    unresolved = []
    reported = set()
    for marker in markers:
        for prefix, prefix_len in _TASKS_VIDEO_PREFIXES.items():
            if marker.startswith(prefix):
                break
        else:
            # Not a video-type marker
            continue
        video_id = marker[prefix_len:]
        if video_id in videos or video_id in reported:
            continue
        reported.add(video_id)
        unresolved.append((marker, video_id))
    return unresolved
def _write_tasks_file(
    project_path: Path,
    missing_videos: list[tuple[str, str]],
    alignment_issues: list[tuple[str, str]],
) -> None:
    """Write ``tasks.md`` in the project folder as a Markdown checklist.

    Args:
        project_path: Project root; tasks.md is (over)written inside it.
        missing_videos: ``(marker_text, video_id)`` pairs to list under
            "Missing Video Assets".
        alignment_issues: ``(marker_id, context)`` pairs to list under
            "Slide Alignment Issues".

    When both lists are empty the file states that no tasks are outstanding.
    A one-line summary is printed afterwards.
    """
    stamp = datetime.now().strftime("%Y-%m-%d")
    out = [f"# Tasks: {project_path.name}", f"_Generated: {stamp}_", ""]

    if missing_videos:
        out.append(f"## Missing Video Assets ({len(missing_videos)})")
        out.append("Referenced in manuscript.txt but not defined in videos.json.")
        out.append("")
        out.extend(
            f"- [ ] `{video_id}` — referenced as `[{marker}]`"
            for marker, video_id in missing_videos
        )
        out.append("")

    if alignment_issues:
        out.append(f"## Slide Alignment Issues ({len(alignment_issues)})")
        out.append(
            "Slide markers that could not be matched to the spoken narration (likely adlibbed)."
        )
        out.append("")
        out.extend(
            f'- [ ] `{marker_id}` — _"{context}"_'
            for marker_id, context in alignment_issues
        )
        out.append("")

    if not (missing_videos or alignment_issues):
        out.extend(["_No outstanding tasks._", ""])

    (project_path / "tasks.md").write_text("\n".join(out), encoding="utf-8")

    summary = " Tasks written → tasks.md"
    if missing_videos:
        summary += f" ({len(missing_videos)} missing videos)"
    if alignment_issues:
        summary += f" ({len(alignment_issues)} alignment issues)"
    print(summary)
# =============================================================================
# Validate Command
# =============================================================================
def cmd_validate(project_path: Path, verbose: bool) -> int:
    """Parse the project's files, run validation and refresh tasks.md.

    Args:
        project_path: Project directory; must contain ``project.json``.
        verbose: When true, print how many markers, slides and videos were parsed.

    Returns:
        1 if ``project.json`` is missing, otherwise 0. Warnings returned by
        the validator are printed but do not change the exit code.
    """
    from .parser import (
        parse_manuscript,
        parse_project_config,
        parse_slides,
        parse_videos,
    )
    from .validator import validate_project

    print(f"Validating: {project_path.name}")

    project_file = project_path / "project.json"
    if not project_file.exists():
        print(f"Error: project.json not found in {project_path}", file=sys.stderr)
        return 1

    # Load every input the validator needs.
    _text, markers, malformed, _citations = parse_manuscript(project_path)
    config = parse_project_config(project_path)
    slides = parse_slides(project_path, config)
    videos, videos_dir = parse_videos(project_path, config)

    if verbose:
        counts = (
            f" - Markers in manuscript: {len(markers)}",
            f" - Slides defined: {len(slides)}",
            f" - Videos defined: {len(videos)}",
        )
        print("\n".join(counts))

    found_warnings = validate_project(
        project_path, markers, config, slides, videos, videos_dir, malformed
    )
    for warning in found_warnings:
        print(f" Warning: {warning}")

    # Only missing assets can be reported here; alignment needs a transcript,
    # which is not available at validate time.
    _write_tasks_file(
        project_path,
        _collect_missing_video_markers(markers, videos),
        alignment_issues=[],
    )

    print("Validation passed.")
    return 0
# =============================================================================
# Preprocess Command
# =============================================================================
def cmd_preprocess(
    project_path: Path,
    verbose: bool,
    dry_run: bool,
    force: bool = False,
    workers: int = 1,
    res: str = "full",
) -> int:
    """Run preprocessing pipeline on narration segments and videos.

    Discovers source files directly from raw_mov/ (preferred) or raw_mp4/
    (fallback when raw_mov/ is empty). Does NOT require narration.json to
    exist — it writes/updates narration.json after processing.

    Args:
        project_path: Project directory.
        verbose: Passed to ``preprocess_video`` on the sequential path (the
            parallel path always passes ``verbose=False``).
        dry_run: Print what would be processed and return without running
            any filter.
        force: Reprocess segments/videos whose output file already exists.
        workers: When greater than 1 (and more than one segment is pending),
            segments are processed in a thread pool of at most this size.
        res: Only used for the banner text in this function.

    Returns:
        1 when no 'talkinghead' filter is configured or no source files are
        found; 0 otherwise.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed
    from .parser import parse_project_config, parse_videos
    from .preprocessor import preprocess_video
    from .models import VideoSource as _VideoSource
    mode_str = f" ({res.upper()})" if res != "full" else ""
    print(f"Preprocessing narration: {project_path.name}{mode_str}")
    config = parse_project_config(project_path)
    # Narration directory — always media/narration/
    narration_dir = project_path / "media" / "narration"
    narration_dir.mkdir(parents=True, exist_ok=True)
    raw_dir = narration_dir / "raw_mov"
    compressed_dir = narration_dir / "raw_mp4"
    processed_dir = narration_dir / "processed"
    processed_dir.mkdir(parents=True, exist_ok=True)
    # Resolve intermediate directory (relative paths are taken relative to the project)
    gnommo_scratch = None
    if config.gnommo_scratch:
        gnommo_scratch = Path(config.gnommo_scratch)
        if not gnommo_scratch.is_absolute():
            gnommo_scratch = project_path / gnommo_scratch
        print(f" Using intermediate dir: {gnommo_scratch}")
    # --- Filter pipeline ---
    # The 'talkinghead' filter is the default for every narration segment, so
    # it must exist before anything else is attempted.
    talkinghead_filter = (config.default_filters or {}).get("talkinghead", [])
    if not talkinghead_filter:
        print(" ERROR: No 'talkinghead' filter defined in project.json default_filters.")
        print(" Add a 'talkinghead' entry under 'default_filters' in project.json.")
        return 1
    # --- Source discovery ---
    _video_exts = {".mov", ".mp4", ".avi", ".mkv", ".m4v"}
    def _scan_dir(d: Path) -> list[Path]:
        # Sorted, non-hidden video files directly inside d (no recursion).
        if not d.exists():
            return []
        return sorted(
            f for f in d.iterdir()
            if f.is_file() and f.suffix.lower() in _video_exts and not f.name.startswith(".")
        )
    raw_mov_files = _scan_dir(raw_dir)
    raw_mp4_files = _scan_dir(compressed_dir)
    if raw_mov_files:
        source_files = raw_mov_files
        using_compressed = False
    elif raw_mp4_files:
        source_files = raw_mp4_files
        using_compressed = True
        print(" WARNING: raw_mov/ is empty — using compressed files from raw_mp4/ instead. Quality may be reduced.")
    else:
        print(f" No source files found in raw_mov/ or raw_mp4/.")
        print(f" Place .mov recordings in {raw_dir}")
        return 1
    # --- Load existing narration.json to preserve per-segment settings ---
    narration_json_path = narration_dir / "narration.json"
    existing_narration: dict = {}
    if narration_json_path.exists():
        existing_narration = _read_json(narration_json_path)
    # --- Build segments list ---
    segments_to_process: list[tuple[str, _VideoSource]] = []
    skipped_count = 0
    for source_file in source_files:
        segment_id = source_file.stem
        # Strip _compressed suffix (raw_mp4 naming convention)
        if using_compressed and segment_id.endswith("_compressed"):
            segment_id = segment_id[: -len("_compressed")]
        # output_file is kept relative to narration_dir; it is what gets
        # written to narration.json as the segment's source_file.
        output_file = f"processed/{segment_id}_processed.mov"
        output_path = narration_dir / output_file
        if output_path.exists() and not force:
            print(f" {segment_id}: output exists, skipping (use --force to reprocess)")
            skipped_count += 1
            continue
        # Filter: from existing narration.json entry (if explicitly set), else talkinghead
        existing_entry = existing_narration.get(segment_id, {})
        raw_filter = existing_entry.get("filter")
        if raw_filter:
            if isinstance(raw_filter, str):
                # A string names an entry in default_filters; unknown names
                # fall back to the talkinghead filter.
                filter_list = (config.default_filters or {}).get(raw_filter, talkinghead_filter)
            else:
                filter_list = raw_filter
        else:
            filter_list = talkinghead_filter
        video_source = _VideoSource(
            source_file=source_file,
            filter=filter_list,
            output_file=output_file,
            use_audio_channels=existing_entry.get("use_audio_channels", "auto"),
            defer_loudnorm=existing_entry.get("defer_loudnorm", True),
        )
        segments_to_process.append((segment_id, video_source))
    # NOTE(review): both early returns below exit before the videos.json pass
    # at the end of this function, so that pass (and its dry-run branch) only
    # runs when narration segments were actually processed — confirm intended.
    if not segments_to_process:
        if skipped_count:
            print(f"\n All {skipped_count} segment(s) already preprocessed. Use --force to reprocess.")
        else:
            print("\n No segments to preprocess.")
        return 0
    if dry_run:
        for segment_id, segment_source in segments_to_process:
            print(f"\n Would preprocess: {segment_id}")
            print(f" Source: {segment_source.source_file}")
            print(f" Output: {segment_source.output_file}")
            print(f" Filters: {len(segment_source.filter)} step(s)")
        return 0
    # --- Process segments ---
    # A segment counts as successful only if its output file exists afterwards.
    successfully_processed: list[tuple[str, _VideoSource]] = []
    if workers > 1 and len(segments_to_process) > 1:
        num_workers = min(workers, len(segments_to_process))
        print(f"\n Processing {len(segments_to_process)} segments in parallel ({num_workers} workers)")
        def process_segment_task(task):
            # Runs in a worker thread; returns the task so the caller can
            # identify which segment finished.
            seg_id, seg_source = task
            preprocess_video(
                narration_dir, seg_id, seg_source,
                verbose=False, force=force, custom_gnommo_scratch=gnommo_scratch,
            )
            return task
        completed = 0
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            futures = {executor.submit(process_segment_task, t): t for t in segments_to_process}
            for future in as_completed(futures):
                # future.result() re-raises any exception from the worker,
                # which aborts this loop before narration.json is updated.
                seg_id, seg_source = future.result()
                completed += 1
                print(f" Completed: {seg_id} ({completed}/{len(segments_to_process)})")
                output_path = narration_dir / seg_source.output_file
                if output_path.exists():
                    successfully_processed.append((seg_id, seg_source))
    else:
        for segment_id, segment_source in segments_to_process:
            print(f"\n Processing: {segment_id}")
            print(f" Source: {segment_source.source_file}")
            print(f" Output: {segment_source.output_file}")
            print(f" Filters: {len(segment_source.filter)} step(s)")
            preprocess_video(
                narration_dir, segment_id, segment_source,
                verbose, force, gnommo_scratch,
            )
            output_path = narration_dir / segment_source.output_file
            if output_path.exists():
                successfully_processed.append((segment_id, segment_source))
    # --- Update narration.json ---
    # Write processed segments; preserve any existing per-segment settings (skip/take/etc.)
    # NOTE(review): "filter" is not in this list, so a per-segment filter set
    # in narration.json is dropped once the segment is processed — confirm.
    _PRESERVE_KEYS = ("skip", "take", "begin", "end", "cutout", "use_audio_channels",
                      "defer_loudnorm", "volume", "zoom")
    for segment_id, segment_source in successfully_processed:
        existing_entry = existing_narration.get(segment_id, {})
        entry: dict = {}
        # Preserve settings the user may have set (trim points, cutout, etc.)
        for key in _PRESERVE_KEYS:
            if key in existing_entry:
                entry[key] = existing_entry[key]
        # Point source_file to the processed output
        entry["source_file"] = segment_source.output_file
        entry.setdefault("use_audio_channels", "auto")
        entry.setdefault("defer_loudnorm", True)
        existing_narration[segment_id] = entry
    # The file is rewritten even when no segment succeeded.
    with open(narration_json_path, "w", encoding="utf-8") as f:
        json.dump(existing_narration, f, indent=2)
    if successfully_processed:
        print(f"\n Updated narration.json ({len(successfully_processed)} segment(s))")
        print(
            f"\n Run 'gnommo -p <project> stitch' to stitch narration segments into one full length narration file."
        )
    # Also preprocess videos from videos.json (e.g. chroma key, color grade)
    videos, videos_dir = parse_videos(project_path, config)
    # Only non-shared videos that declare a filter are processed here.
    videos_to_process = [
        (vid_id, vid_src)
        for vid_id, vid_src in videos.items()
        if vid_src.filter and not vid_src.is_shared
    ]
    if videos_to_process:
        print(f"\n Processing {len(videos_to_process)} video(s) from videos.json:")
        for video_id, video_source in videos_to_process:
            if video_source.output_file:
                output_path = videos_dir / video_source.output_file
                if output_path.exists() and not force:
                    print(
                        f" {video_id}: output exists, skipping (use --force to reprocess)"
                    )
                    continue
            if dry_run:
                print(
                    f" Would preprocess: {video_id} ({len(video_source.filter)} filter(s))"
                )
                continue
            print(f" Processing: {video_id}")
            preprocess_video(
                videos_dir, video_id, video_source, verbose, force, gnommo_scratch
            )
    print("\nPreprocessing complete.")
    return 0
# =============================================================================
# Trim Command — auto-detect silence bounds for narration segments
# =============================================================================
def cmd_trim(
    project_path: Path,
    verbose: bool,
    force: bool = False,
    threshold_db: float = -40.0,
) -> int:
    """Auto-detect silence bounds and write skip/take into narration.json.

    For each segment:
        skip = max(0, first_sound_time - 0.5)
        take = last_sound_time + 3.0 - skip   (capped at file duration)

    Segments that already have an explicit skip or take value are left
    unchanged unless ``force`` is set. Segments whose source file is missing
    are reported and skipped.

    Args:
        project_path: Project directory.
        verbose: Forwarded to ``detect_silence_bounds`` (prints the detected
            silence periods for debugging).
        force: Recompute skip/take even for already-trimmed segments.
        threshold_db: Noise threshold in dB; raise it (e.g. -25) to ignore
            clothing/room noise that sits above -40 dB.

    Returns:
        1 when narration.json contains no segments, otherwise 0.
    """
    from .parser import parse_project_config, parse_narration
    from .preprocessor import detect_silence_bounds, get_video_duration

    print(f"Auto-trimming narration: {project_path.name}")
    config = parse_project_config(project_path)
    narration, narration_dir = parse_narration(project_path, config)
    if not narration:
        print(" No narration segments found in narration.json")
        print(" Run 'gnommo -p <project> import' first.")
        return 1

    narration_json_path = narration_dir / "narration.json"
    raw_data: dict = _read_json(narration_json_path)
    updated = 0

    for seg_id in sorted(narration):
        seg = narration[seg_id]
        existing = raw_data.get(seg_id, {})
        has_explicit = "skip" in existing or "take" in existing
        if has_explicit and not force:
            print(f" {seg_id}: already trimmed, skipping (use --force to redo)")
            continue

        # Analyse whatever file the segment's source_file currently points at
        # (this is the processed output once 'pre' has run).
        source_path = narration_dir / seg.source_file
        if not source_path.exists():
            print(f" {seg_id}: source file not found ({seg.source_file}), skipping")
            continue

        print(f" {seg_id}: analysing...", end="", flush=True)
        first_sound, last_sound = detect_silence_bounds(
            source_path, noise_threshold_db=threshold_db, verbose=verbose
        )
        total_dur = get_video_duration(source_path)

        # Keep 0.5 s of lead-in and 3.0 s of tail, clamped to the file bounds.
        new_skip = max(0.0, round(first_sound - 0.5, 3))
        new_take = round(min(total_dur - new_skip, last_sound + 3.0 - new_skip), 3)
        new_take = max(0.0, new_take)
        print(
            f" first={first_sound:.2f}s last={last_sound:.2f}s"
            f" → skip={new_skip:.3f}s take={new_take:.3f}s"
        )

        # setdefault (not raw_data[seg_id]) so a segment known to the parser
        # but absent from the raw JSON gets an entry instead of a KeyError.
        entry = raw_data.setdefault(seg_id, {})
        entry["skip"] = new_skip
        entry["take"] = new_take
        updated += 1

    if updated > 0:
        with open(narration_json_path, "w", encoding="utf-8") as f:
            json.dump(raw_data, f, indent=2)
        print(f"\n Updated {updated} segment(s) in narration.json")
    else:
        print(f"\n No segments updated")
    return 0
# =============================================================================
# Transcode Command — compress narration folder to H.265
# =============================================================================
def _get_video_codec(path: Path) -> str:
    """Ask ffprobe for the codec name of the first video stream of ``path``.

    Returns:
        ffprobe's stdout, stripped and lower-cased (e.g. 'hevc', 'prores',
        'h264'). ffprobe's exit status is not checked, so the result is an
        empty string when it prints nothing.
    """
    probe_cmd = ["ffprobe", "-v", "error"]
    probe_cmd += ["-select_streams", "v:0"]
    probe_cmd += ["-show_entries", "stream=codec_name"]
    probe_cmd += ["-of", "default=noprint_wrappers=1:nokey=1"]
    probe_cmd.append(str(path))

    probe = subprocess.run(probe_cmd, capture_output=True, text=True)
    codec_name = probe.stdout.strip()
    return codec_name.lower()
def _transcode_processed_files(
    project_path: Path,
    verbose: bool,
    dry_run: bool,
    replace: bool,
    force: bool,
    alpha_quality: float,
) -> int:
    """
    Compress _processed.mov files (ProRes 4444 + alpha) to HEVC+alpha via
    Apple VideoToolbox.

    For each _processed.mov:
      1. Transcode to a temp file using hevc_videotoolbox with alpha.
      2. Without ``replace``: move the original into a prores/ subdirectory.
         With ``replace``: delete the original instead.
      3. Rename the compressed file to the original _processed.mov name
         so stitch/render find it unchanged.

    The prores/ subdirectory is never scanned — only top-level files are candidates.
    Files whose first video stream is already HEVC are always skipped. If
    prores/<filename> already exists the file is treated as already
    compressed and skipped, unless ``force`` or ``replace`` is set.

    Args:
        project_path: Project directory.
        verbose: Print the ffmpeg command and let ffmpeg write to the console.
        dry_run: List what would be compressed without running ffmpeg.
        replace: Delete the original instead of archiving it.
        force: Redo files that already have an archive in prores/.
        alpha_quality: Value passed to ffmpeg's ``-alpha_quality`` option.

    Returns:
        Always 0; per-file ffmpeg failures are reported and skipped.
    """
    from .parser import parse_project_config, parse_narration
    print(f"Transcoding processed files (HEVC+alpha): {project_path.name}")
    config = parse_project_config(project_path)
    # Resolve narration_dir and videos_dir — processed files live in both
    _narration, narration_dir = parse_narration(project_path, config)
    videos_json_path = project_path / config.videos_path
    videos_dir = videos_json_path.parent
    # Glob both directories for *_processed.mov; skip any _prores.mov archives
    # NOTE(review): only the top level of each directory is globbed, while
    # cmd_preprocess writes narration outputs under a processed/ subdirectory
    # — confirm those files are meant to be picked up here.
    search_dirs = [d for d in [narration_dir, videos_dir] if d.exists()]
    candidates: list[Path] = []
    seen: set[Path] = set()
    for d in search_dirs:
        for p in d.glob("*_processed.mov"):
            if p not in seen and "_prores" not in p.stem:
                seen.add(p)
                candidates.append(p)
    if not candidates:
        print(" No _processed.mov files found.")
        return 0
    # Smallest first
    candidates = [c for c in candidates if c.exists()]
    candidates.sort(key=lambda f: f.stat().st_size)
    total_original = 0
    total_compressed = 0
    transcoded = 0
    skipped = 0
    for src in candidates:
        # Archive goes into prores/ subdirectory alongside the source file
        prores_dir = src.parent / "prores"
        archive = prores_dir / src.name
        # Always skip files already encoded as HEVC — regardless of --replace or --force
        if _get_video_codec(src) == "hevc":
            print(f" {src.name}: already HEVC, skipping")
            skipped += 1
            continue
        # Without --replace, skip if the archive already exists in prores/
        if not replace and archive.exists() and not force:
            size_mb = src.stat().st_size / 1_048_576
            print(
                f" {src.name}: already compressed ({size_mb:.1f} MB), skipping (use --force to redo)"
            )
            skipped += 1
            continue
        src_mb = src.stat().st_size / 1_048_576
        print(f" {src.name} ({src_mb:.1f} MB) → HEVC+alpha", end="")
        if dry_run:
            print(" [dry-run]")
            continue
        print(" ...", end="", flush=True)
        # Encode next to the source under a temporary name; the source is only
        # touched after ffmpeg has succeeded.
        tmp_out = src.with_name(src.stem + "_hevc_tmp.mov")
        cmd = [
            "ffmpeg",
            "-i",
            str(src),
            "-c:v",
            "hevc_videotoolbox",
            "-allow_sw",
            "1",
            "-alpha_quality",
            str(alpha_quality),
            "-tag:v",
            "hvc1",
            "-c:a",
            "copy",
            "-y",
            str(tmp_out),
        ]
        if verbose:
            print()
            print(" " + " ".join(cmd))
        result = subprocess.run(
            cmd,
            capture_output=not verbose,
            text=True,
        )
        if result.returncode != 0:
            print(f"\n ERROR transcoding {src.name}")
            # Remove the partial temp file so it cannot be mistaken for output.
            if tmp_out.exists():
                tmp_out.unlink()
            if not verbose and result.stderr:
                last_lines = result.stderr.strip().splitlines()[-5:]
                for line in last_lines:
                    print(f" {line}", file=sys.stderr)
            continue
        # Sizes must be read before src is renamed/unlinked below.
        # NOTE(review): a zero-byte source would raise ZeroDivisionError here.
        out_mb = tmp_out.stat().st_size / 1_048_576
        ratio = (1.0 - tmp_out.stat().st_size / src.stat().st_size) * 100
        if replace:
            # Delete ProRes original, move compressed into its place
            src.unlink()
            tmp_out.rename(src)
            print(
                f"\r {src.name} ({src_mb:.1f} MB) → HEVC+alpha"
                f" ({out_mb:.1f} MB, -{ratio:.0f}%)"
            )
        else:
            # Move ProRes original into prores/ subdirectory, compressed takes its place
            prores_dir.mkdir(exist_ok=True)
            src.rename(archive)
            tmp_out.rename(src)
            print(
                f"\r {src.name} ({src_mb:.1f} MB) → HEVC+alpha"
                f" ({out_mb:.1f} MB, -{ratio:.0f}%)"
                f" [ProRes → prores/{archive.name}]"
            )
        total_original += int(src_mb * 1_048_576)
        total_compressed += int(out_mb * 1_048_576)
        transcoded += 1
    print()
    if dry_run:
        print(f" [dry-run] Would compress {len(candidates) - skipped} file(s)")
        return 0
    if transcoded > 0:
        orig_mb = total_original / 1_048_576
        comp_mb = total_compressed / 1_048_576
        saved_mb = orig_mb - comp_mb
        ratio = (saved_mb / orig_mb * 100) if orig_mb else 0
        print(
            f" Compressed {transcoded} file(s): {orig_mb:.1f} MB → {comp_mb:.1f} MB"
            f" (saved {saved_mb:.1f} MB, -{ratio:.0f}%)"
        )
    if skipped:
        print(f" Skipped {skipped} already-compressed file(s)")
    return 0
def cmd_transcode(
    project_path: Path,
    verbose: bool,
    dry_run: bool = False,
    replace: bool = False,
    crf: int = 23,
    force: bool = False,
    processed: bool = False,
    alpha_quality: float = 0.75,
) -> int:
    """
    Transcode project video files to save disk space.

    Default (1st pass, before preprocess):
        Compress every video file in ``<narration_dir>/raw_mov`` to H.265
        (scaled to 1080p height, AAC audio) and write it to
        ``<narration_dir>/raw_mp4/<stem>.mp4``. Existing outputs are skipped
        unless ``force`` is set. With ``replace`` the original is deleted
        after its transcode succeeded.

    With ``processed`` (2nd pass, after preprocess):
        Delegate to ``_transcode_processed_files``, which compresses
        _processed.mov files to HEVC+alpha using ``alpha_quality``.

    Args:
        project_path: Project directory.
        verbose: Print the ffmpeg command and let ffmpeg write to the console.
        dry_run: List what would be transcoded without running ffmpeg.
        replace: Delete each original after a successful transcode.
        crf: libx265 constant rate factor.
        force: Redo files whose output already exists.
        processed: Run the 2nd-pass mode instead of the default.
        alpha_quality: Only used in the 2nd-pass mode.

    Returns:
        1 when the raw_mov directory does not exist, otherwise 0. Per-file
        ffmpeg failures are reported and skipped.
    """
    if processed:
        return _transcode_processed_files(
            project_path, verbose, dry_run, replace, force, alpha_quality
        )

    from .parser import parse_project_config, parse_narration

    print(f"Transcoding narration: {project_path.name}")
    config = parse_project_config(project_path)
    _narration, narration_dir = parse_narration(project_path, config)
    raw_dir = narration_dir / "raw_mov"
    compressed_dir = narration_dir / "raw_mp4"
    if not raw_dir.exists():
        print(f" raw/ directory not found: {raw_dir}", file=sys.stderr)
        print(f" Place raw recordings in {raw_dir} and run 'import' first.")
        return 1
    compressed_dir.mkdir(parents=True, exist_ok=True)

    # Collect eligible video files from raw/ only
    video_extensions = {".mp4", ".mov", ".avi", ".mkv", ".m4v", ".mts", ".webm"}
    candidates = [
        f
        for f in raw_dir.iterdir()
        if f.is_file()
        and f.suffix.lower() in video_extensions
        and not f.name.startswith(".")
    ]
    if not candidates:
        print(f" No video files found in {raw_dir}.")
        return 0

    # Process smallest files first
    candidates.sort(key=lambda f: f.stat().st_size)
    total_original = 0
    total_compressed = 0
    transcoded = 0
    skipped = 0
    deleted = 0

    for src in candidates:
        # Output: compressed/<stem>.mp4 (clean name, no _compressed suffix)
        output = compressed_dir / f"{src.stem}.mp4"
        if output.exists() and not force:
            size_mb = output.stat().st_size / 1_048_576
            print(
                f" {src.name}: already transcoded ({size_mb:.1f} MB), skipping (use --force to redo)"
            )
            skipped += 1
            continue

        # Read the source size once, up front: the file may be deleted below.
        src_size = src.stat().st_size
        src_mb = src_size / 1_048_576
        print(
            f" raw/{src.name} ({src_mb:.1f} MB) → compressed/{output.name}", end=""
        )
        if dry_run:
            print(" [dry-run]")
            continue
        print(" ...", end="", flush=True)

        cmd = [
            "ffmpeg",
            "-i",
            str(src),
            "-vf",
            "scale=-2:1080",
            "-c:v",
            "libx265",
            "-crf",
            str(crf),
            "-preset",
            "medium",
            "-c:a",
            "aac",
            "-b:a",
            "128k",
            "-tag:v",
            "hvc1",
            "-y",
            str(output),
        ]
        if verbose:
            print()
            print(" " + " ".join(cmd))
        result = subprocess.run(
            cmd,
            capture_output=not verbose,
            text=True,
        )
        if result.returncode != 0:
            print(f"\n ERROR transcoding {src.name}")
            # Drop the partial output; otherwise the next run would see it
            # and report the file as "already transcoded".
            if output.exists():
                output.unlink()
            if not verbose and result.stderr:
                # Print last few lines of ffmpeg stderr for diagnosis
                last_lines = result.stderr.strip().splitlines()[-5:]
                for line in last_lines:
                    print(f" {line}", file=sys.stderr)
            continue

        out_size = output.stat().st_size
        out_mb = out_size / 1_048_576
        # Guard against an empty source file (would divide by zero).
        ratio = (1.0 - out_size / src_size) * 100 if src_size else 0.0
        print(
            f"\r raw/{src.name} ({src_mb:.1f} MB) → compressed/{output.name} ({out_mb:.1f} MB, -{ratio:.0f}%)"
        )
        total_original += src_size
        total_compressed += out_size
        transcoded += 1

        # --replace: remove the original only once a non-empty output exists.
        if replace and out_size > 0:
            src.unlink()
            deleted += 1

    print()
    if dry_run:
        print(f" [dry-run] Would transcode {len(candidates) - skipped} file(s)")
        return 0
    if transcoded > 0:
        orig_mb = total_original / 1_048_576
        comp_mb = total_compressed / 1_048_576
        saved_mb = orig_mb - comp_mb
        ratio = (saved_mb / orig_mb * 100) if orig_mb else 0
        print(
            f" Transcoded {transcoded} file(s): {orig_mb:.1f} MB → {comp_mb:.1f} MB (saved {saved_mb:.1f} MB, -{ratio:.0f}%)"
        )
        # Only claim deletion when files were actually removed.
        if deleted:
            print(f" Originals deleted.")
    if skipped:
        print(f" Skipped {skipped} already-transcoded file(s)")
    return 0
# =============================================================================
# Stitch Command (fast iteration on narration segments)
# =============================================================================
def cmd_stitch(
    project_path: Path,
    verbose: bool,
    force: bool = False,
    res: str = "full",
) -> int:
    """
    Stitch narration segments from narration.json.

    Reads the segments parsed from narration.json, concatenates them (trim
    values are applied by ``stitch_narration_segments``) and writes
    ``narration_combined.mov`` into the videos directory. Then creates or
    updates the ``narration_combined`` entry in the main videos.json and
    finally runs ``cmd_transcribe``.

    Args:
        project_path: Project directory.
        verbose: Forwarded to the helpers that accept it.
        force: Regenerate the combined file even if it already exists.
        res: "full", or a key of ``RES_CONFIGS`` to work on downscaled copies.

    Returns:
        1 when there are no narration segments; otherwise the return value
        of ``cmd_transcribe``.
    """
    from .parser import parse_project_config, parse_narration, parse_videos
    from .preprocessor import (
        stitch_narration_segments,
        ensure_downscaled_files_exist,
        RES_CONFIGS,
    )

    mode_str = f" ({res.upper()})" if res != "full" else ""
    print(f"Stitching narration: {project_path.name}{mode_str}")
    config = parse_project_config(project_path)
    narration, narration_dir = parse_narration(project_path, config)
    if not narration:
        print(" No narration segments found in media/narration/narration.json")
        print(" Run 'gnommo -p <project> import' first to populate narration.json")
        return 1

    # Get videos_dir for output
    if config and config.videos_path:
        videos_json_path = project_path / config.videos_path
        videos_dir = videos_json_path.parent
    else:
        videos_dir = project_path / "media" / "videos"

    # Use downscaled dirs for non-full res
    if res != "full":
        cfg = RES_CONFIGS[res]
        narration_dir = ensure_downscaled_files_exist(
            narration_dir, res, force=False, verbose=verbose
        )
        videos_dir = videos_dir / cfg[2]
        videos_dir.mkdir(parents=True, exist_ok=True)
        print(f" Using {res} dirs: {narration_dir}, {videos_dir}")

    # Get segment IDs in sorted order
    segment_ids = sorted(narration.keys())

    # Show what we're stitching
    print(f"\n Segments ({len(segment_ids)}):")
    for segment_id in segment_ids:
        seg = narration[segment_id]
        skip_str = f"skip={seg.skip:.1f}s" if seg.skip else ""
        take_str = f"take={seg.take:.1f}s" if seg.take else ""
        trim_info = ", ".join(filter(None, [skip_str, take_str]))
        trim_str = f" ({trim_info})" if trim_info else ""
        print(f" - {segment_id}{trim_str}")

    stitch_output = videos_dir / "narration_combined.mov"
    if stitch_output.exists() and not force:
        print(f"\n Combined narration exists: {stitch_output.name}")
        print(" (use --force to regenerate)")
    else:
        stitch_narration_segments(
            narration_dir,
            segment_ids,
            narration,
            stitch_output,
            verbose=verbose,
            default_end_trim=config.default_end_trim if config else 0.0,
        )

    # Run import videos again, because at this point narration_combined might have been created.
    _import_videos(videos_dir, config, verbose)

    # Always update the MAIN videos.json (parent of subdir when using low/tiny res)
    # Downscaled dirs only affect file paths, not JSON metadata updates
    main_videos_dir = videos_dir.parent if res != "full" else videos_dir
    videos_json_path = main_videos_dir / "videos.json"
    existing_videos: dict = {}
    if videos_json_path.exists():
        existing_videos = _read_json(videos_json_path)

    # Get cutout from first narration segment
    first_seg = narration[segment_ids[0]]
    cutout = first_seg.cutout or "talkinghead"

    # Keep a volume the user already set in videos.json; resetting it to 1.0
    # on every stitch would silently undo the edit the message below invites.
    previous_entry = existing_videos.get("narration_combined")
    volume = 1.0
    if isinstance(previous_entry, dict) and "volume" in previous_entry:
        volume = previous_entry["volume"]

    # Create/update narration_combined entry
    existing_videos["narration_combined"] = {
        "source_file": "narration_combined.mov",
        "output_file": "narration_combined.mov",
        "cutout": cutout,
        "always_visible": True,
        "volume": volume,
    }
    with open(videos_json_path, "w", encoding="utf-8") as f:
        json.dump(existing_videos, f, indent=2)
    print(f"\n Updated videos.json with narration_combined entry (volume={volume})")
    print(" Edit videos.json to adjust volume if needed.")
    print("\nConcatenation complete.")

    # Automatically transcribe to keep transcript in sync with narration
    print("\n" + "=" * 60)
    print("Auto-running transcribe to sync with new narration...")
    print("=" * 60 + "\n")
    return cmd_transcribe(project_path, verbose, res=res)
# =============================================================================
# Render Command
# =============================================================================
def _format_time(seconds: float) -> str:
    """Format a duration in seconds as ``MM:SS.cc`` (centisecond precision).

    Negative values mean "unknown" and render as ``??:??.??``. Minutes are
    not wrapped into hours, so 3600 s renders as ``60:00.00``.
    """
    if seconds < 0:
        return "??:??.??"
    # Round to whole centiseconds BEFORE splitting into minutes and seconds;
    # rounding only the seconds part would turn 59.999 into "00:60.00".
    total_cs = int(seconds * 100 + 0.5)
    mins, cs = divmod(total_cs, 6000)
    return f"{mins:02d}:{cs // 100:02d}.{cs % 100:02d}"
def _print_render_plan_details(plan, marker_timings, slides: dict) -> None:
    """
    Print a detailed render plan showing each marker with its aligned time.

    Uses marker_timings from the transformer which contains alignment info.
    One line is printed per marker, followed by a summary of aligned,
    unaligned and colliding markers plus video/outro counts and the total
    duration.

    Args:
        plan: Render plan; this function reads ``video_events``,
            ``cached_files``, ``narration_videos``, ``outro_events`` and
            ``total_duration``.
        marker_timings: Timing records with ``marker_id``, ``context``,
            ``timestamp`` (negative = not aligned) and ``confidence``.
        slides: Slide definitions keyed by marker id; used only for
            membership tests.
    """
    from .models import CAMERA_PRESETS

    # Marker prefixes that reference a video. "vftp:" is presumably what the
    # second, duplicated "vft:" entry was meant to be (it sits with the other
    # "...p:" variants) — TODO confirm against the marker grammar.
    video_prefixes = (
        "video:", "vft:", "vfb:", "vst:", "vsb:",
        "vftp:", "vfbp:", "vstp:", "vsbp:",
    )
    audio_prefix = "audio:"
    narration_prefix = "narration:"

    print("\n RENDER PLAN:")
    print(" " + "-" * 76)

    # Build lookup for video events by video_id (last event wins per id)
    video_events_by_id = {}
    for event in plan.video_events:
        video_events_by_id[event.video_id] = event

    # Detect slide markers that share a timestamp with the adjacent slide marker.
    # Two slides at the same time means alignment is ambiguous — treat as an error.
    slide_timings = [
        t for t in marker_timings if t.marker_id in slides and t.timestamp >= 0
    ]
    collision_ids: set[str] = set()
    for a, b in zip(slide_timings, slide_timings[1:]):
        if abs(a.timestamp - b.timestamp) < 0.1:
            collision_ids.add(a.marker_id)
            collision_ids.add(b.marker_id)

    # Print each marker timing
    aligned_count = 0
    unaligned_count = 0
    collision_count = 0
    for timing in marker_timings:
        marker_id = timing.marker_id
        context = timing.context
        if len(context) > 50:
            context = context[:47] + "..."

        if timing.timestamp < 0:
            unaligned_count += 1
            print(f' {marker_id:6} ??:??.?? NOT ALIGNED - "{context}"')
            continue

        time_str = _format_time(timing.timestamp)
        # Show confidence if fuzzy match
        conf_str = ""
        if timing.confidence < 1.0:
            conf_str = f" ({timing.confidence:.0%})"

        # Determine marker type for display
        video_prefix = next(
            (p for p in video_prefixes if marker_id.startswith(p)), None
        )
        if marker_id in slides:
            if marker_id in collision_ids:
                collision_count += 1
                print(
                    f' {marker_id:6} {time_str}{conf_str} COLLISION - same time as adjacent slide - "{context}"'
                )
            else:
                aligned_count += 1
                print(f' {marker_id:6} {time_str}{conf_str} "{context}"')
        elif video_prefix is not None:
            aligned_count += 1
            video_id = marker_id[len(video_prefix):]
            # Find corresponding event by video_id
            event = video_events_by_id.get(video_id)
            if event:
                cutout_name = event.cutout_name
                end_on = event.video_source.end_on or "next_slide"
                layer_tag = f" [{event.layer}]"
            else:
                cutout_name = "?"
                end_on = "next_slide"
                layer_tag = ""
            cache_ind = " 📁" if video_id in plan.cached_files else ""
            print(
                f" {marker_id:20} {time_str} in '{cutout_name}' [{end_on}]{layer_tag}{cache_ind}"
            )
        elif marker_id.startswith(narration_prefix):
            aligned_count += 1
            video_id = marker_id[len(narration_prefix):]
            cache_ind = " 📁" if video_id in plan.cached_files else ""
            print(f" {marker_id:20} {time_str} (continuous){cache_ind}")
        elif marker_id in CAMERA_PRESETS:
            aligned_count += 1
            print(f" {time_str} [{marker_id}]")
        elif marker_id.startswith(audio_prefix):
            aligned_count += 1
            # Strip the whole "audio:" prefix (slicing off a single character
            # printed "[audio:udio:<id>]").
            print(f" {time_str} [audio:{marker_id[len(audio_prefix):]}]")
        else:
            aligned_count += 1
            print(f' {marker_id:6} {time_str} "{context}"')

    print(" " + "-" * 76)

    # Summary
    total_markers = len(marker_timings)
    slide_markers = [t for t in marker_timings if t.marker_id in slides]
    good_slides = sum(
        1
        for t in slide_markers
        if t.timestamp >= 0 and t.marker_id not in collision_ids
    )
    total_slides = len(slide_markers)
    issues = []
    if unaligned_count:
        issues.append(f"{unaligned_count} UNALIGNED")
    if collision_count:
        issues.append(f"{collision_count} COLLISION")
    status = "OK" if not issues else ", ".join(issues)
    print(f" Markers: {aligned_count}/{total_markers} aligned ({status})")
    print(f" Slides: {good_slides}/{total_slides}")
    print(
        f" Videos: {len(plan.video_events)} triggered, {len(plan.narration_videos)} always-visible"
    )
    if plan.outro_events:
        print(f" Outro: {len(plan.outro_events)} video(s)")
        for event in plan.outro_events:
            print(
                f" - {event.video_id}: {_format_time(event.start_time)} - {_format_time(event.end_time)}"
            )
    print(f" Duration: {_format_time(plan.total_duration)}")
def _parse_slide_range(slides_arg: str) -> tuple[str, Optional[str]]:
    """Split a ``START:END`` slide range into ``(start, end)``.

    The argument is split at the first colon and both halves are stripped.
    An empty end (``'S5:'``) yields ``None``, meaning "to the end".

    Raises:
        ValueError: If there is no colon, or the start part is empty.
    """
    head, colon, tail = slides_arg.partition(":")
    if not colon:
        raise ValueError(
            f"Invalid slide range '{slides_arg}'. Expected format: S1:S10 or S5:"
        )

    start_slide = head.strip()
    if not start_slide:
        raise ValueError(
            f"Invalid slide range '{slides_arg}'. Start slide is required."
        )

    return start_slide, (tail.strip() or None)
def _writeback_video_metadata(plan, project_path, config) -> None:
    """Write back cutout/layer derived from shorthand markers to videos.json.

    When a shorthand like [vfb:FARTSection1] is used and FARTSection1 has no
    'cutout' set in videos.json, this persists the resolved cutout (and the
    layer, if the event's layer differs from the source's) back to the file.
    Once written, subsequent renders read the value directly and no further
    write-back occurs.

    Shared videos are skipped, as are video ids that have no entry in
    videos.json. The file is rewritten only if at least one field actually
    changed, and the printed summary lists exactly the fields that were
    written.

    Args:
        plan: Render plan; only ``video_events`` is read.
        project_path: Project directory.
        config: Project config; only ``videos_path`` is read.
    """
    import json

    videos_json_path = project_path / config.videos_path
    if not videos_json_path.exists():
        return

    # Collect field updates per video_id
    writebacks: dict[str, dict] = {}
    for event in plan.video_events:
        source = event.video_source
        if source.is_shared:
            continue  # shared videos live in their own file
        updates = {}
        if source.cutout is None and event.cutout_name:
            updates["cutout"] = event.cutout_name
        if event.layer != source.layer:
            updates["layer"] = event.layer
        if updates:
            writebacks.setdefault(event.video_id, {}).update(updates)
    if not writebacks:
        return

    with open(videos_json_path, "r", encoding="utf-8") as f:
        raw = json.load(f)

    # Track what is really modified so the summary does not mention videos
    # missing from the file or fields that already had the right value.
    applied: dict[str, list[str]] = {}
    for video_id, updates in writebacks.items():
        if video_id not in raw:
            continue
        entry = raw[video_id]
        for field, value in updates.items():
            if entry.get(field) != value:
                entry[field] = value
                applied.setdefault(video_id, []).append(field)

    if applied:
        with open(videos_json_path, "w", encoding="utf-8") as f:
            json.dump(raw, f, indent=2, ensure_ascii=False)
        written = ", ".join(
            f"{vid}({', '.join(fields)})" for vid, fields in applied.items()
        )
        print(f" Updated videos.json: {written}")
def cmd_render(
    project_path: Path,
    verbose: bool,
    dry_run: bool,
    slides_arg: Optional[str] = None,
    res: str = "full",
    force: bool = False,
) -> int:
    """Render final video.

    Runs four printed stages: parse project files, validate, build the render
    plan (alignment happens while building the plan), then render.

    Args:
        project_path: Project root directory.
        verbose: Print extra detail (counts, chosen narration file, ...).
        dry_run: Print the FFmpeg command string instead of rendering.
        slides_arg: Optional slide range string, parsed by _parse_slide_range.
        res: Resolution mode; any value other than "full" is looked up in
            RES_CONFIGS and switches to downscaled video sources.
        force: Continue rendering even when some markers could not be aligned.

    Returns:
        0 on success (including dry run); 1 when the combined narration or the
        transcript is missing, or when markers are unaligned without ``force``.
    """
    from .parser import (
        parse_audio,
        parse_manuscript,
        parse_project_config,
        parse_slides,
        parse_videos,
        save_citations,
    )
    from .transcriber import load_transcript
    from .validator import validate_project
    from .transformer import build_render_plan
    from .renderer import render, generate_ffmpeg_command_string
    from .preprocessor import RES_CONFIGS, ensure_downscaled_files_exist

    # Parse slide range if provided
    slide_range = None
    if slides_arg:
        slide_range = _parse_slide_range(slides_arg)
        print(f"Rendering: {project_path.name} (slides {slides_arg})")
    else:
        print(f"Rendering: {project_path.name}")

    # Show resolution mode
    if res != "full":
        cfg = RES_CONFIGS[res]
        print(f" Resolution: {res.upper()} ({cfg[0]}x{cfg[1]})")

    # Show cache status
    cache_info = get_cache_info()
    if cache_info:
        print(f" Cache: {cache_info}")

    # Stage 1: Parse
    print("\n[1/4] Parsing...")
    manuscript_text, markers, malformed, citations = parse_manuscript(project_path)

    # Save citations for later use (e.g., description generation)
    if citations:
        citations_path = project_path / "citations.json"
        save_citations(citations, citations_path)

    config = parse_project_config(project_path)

    # Override resolution for preview modes
    if res != "full":
        cfg = RES_CONFIGS[res]
        config.resolution = (cfg[0], cfg[1])

    slides = parse_slides(project_path, config)
    videos, videos_dir = parse_videos(project_path, config)

    # Non-full res: use downscaled video directory, create on-the-fly if needed
    if res != "full":
        # Skip downscaling sources that have a preprocessed output_file — the
        # renderer will use the full-res processed version instead, saving disk space.
        sources_with_output = {v.source_file for v in videos.values() if v.output_file}
        videos_dir = ensure_downscaled_files_exist(
            videos_dir,
            res,
            force=False,
            verbose=verbose,
            skip_sources=sources_with_output,
        )
        if verbose:
            print(f" Using {res} dir: {videos_dir}")

    audio, audio_dir = parse_audio(project_path, config)

    # Load whisper transcription JSON
    # Check for narration_combined in videos.json (new workflow) or multi-segment in config (legacy)
    combined_path = videos_dir / "narration_combined.mov"
    # Try cache fallback for combined narration
    resolved_combined, _ = resolve_with_cache(combined_path, project_path)

    if "narration_combined" in videos and resolved_combined.exists():
        # New workflow: narration_combined was created by 'gnommo concat' and is in videos.json
        # This entry has the correct volume setting from videos.json
        transcript_path = resolved_combined.with_suffix(".transcript.json")
        config.main_video = "narration_combined"
        if verbose:
            print(
                f" Using combined narration: {resolved_combined.name} (volume={videos['narration_combined'].volume})"
            )
    elif isinstance(config.main_video, list) and len(config.main_video) > 1:
        # Legacy: Multi-segment narration with main_video array in project.json
        # (resolves the same path again as the lookup before the if-chain)
        resolved_combined, _ = resolve_with_cache(combined_path, project_path)
        transcript_path = resolved_combined.with_suffix(".transcript.json")
        if not resolved_combined.exists():
            print(
                f"Error: Combined narration not found: {combined_path}", file=sys.stderr
            )
            print(
                "Run 'gnommo -p <project> concat' first to concatenate segments.",
                file=sys.stderr,
            )
            return 1
        # Create a synthetic video entry for the combined narration
        # Inherit settings from the first segment
        first_segment_id = config.main_video[0]
        if first_segment_id in videos:
            first_segment = videos[first_segment_id]
            from .models import VideoSource

            combined_video = VideoSource(
                source_file="narration_combined.mov",
                filter=first_segment.filter,
                output_file=None,  # Already processed
                cutout=first_segment.cutout,
                always_visible=True,
                skip=0.0,  # Already trimmed during concatenation
                take=None,
            )
            videos["_narration_combined"] = combined_video
            config.main_video = "_narration_combined"
        if verbose:
            print(f" Using combined narration: {combined_path.name}")
    else:
        # Check if narration.json exists (new workflow) - if so, require narration_combined
        narration_json = project_path / "media" / "narration" / "narration.json"
        if narration_json.exists():
            print(
                f"Error: narration_combined not found in videos.json", file=sys.stderr
            )
            print(
                f"You have narration segments in narration.json but haven't stitched them.",
                file=sys.stderr,
            )
            print(
                f"Run 'gnommo -p {project_path.name} stitch' first.",
                file=sys.stderr,
            )
            return 1
        # Single video - look for .transcript.json next to the narration video
        result = _find_narration_video(config, videos)
        if result:
            video_id, narration_source = result
            config.main_video = video_id  # Ensure main_video is set to the found video
            video_path = videos_dir / narration_source.source_file
            transcript_path = video_path.with_suffix(".transcript.json")
        else:
            transcript_path = project_path / "transcript.json"

    # Try cache fallback for transcript
    transcript_path, _ = resolve_with_cache(transcript_path, project_path)
    if not transcript_path.exists():
        print(f"Error: Transcription not found: {transcript_path}", file=sys.stderr)
        print(f"Run 'gnommo -p {project_path.name} transcribe' first.", file=sys.stderr)
        return 1
    transcription = load_transcript(transcript_path, project_path)

    if verbose:
        print(f" - Markers in manuscript: {len(markers)}")
        print(f" - Slides defined: {len(slides)}")
        print(f" - Audio clips: {len(audio)}")
        print(f" - Transcription words: {len(transcription)}")

    # Stage 2: Validate
    print("\n[2/4] Validating...")
    warnings = validate_project(
        project_path, markers, config, slides, videos, videos_dir, malformed
    )
    for w in warnings:
        print(f" Warning: {w}")
    # Warnings do not stop the run; "Passed." is printed regardless of them.
    print(" Passed.")

    # Stage 3: Transform (includes on-the-fly alignment)
    print("\n[3/4] Building render plan...")
    plan, marker_timings = build_render_plan(
        project_path,
        config,
        slides,
        videos,
        videos_dir,
        manuscript_text,
        transcription,
        audio,
        audio_dir,
        slide_range=slide_range,
    )
    if plan.time_offset > 0:
        print(f" Time offset: {plan.time_offset:.1f}s (partial render)")

    # Persist shorthand-derived cutout/layer back to videos.json (idempotent)
    _writeback_video_metadata(plan, project_path, config)

    # Print detailed render plan with alignment info
    _print_render_plan_details(plan, marker_timings, slides)

    if plan.audio_events:
        print(f"\n Audio effects:")
        for event in plan.audio_events:
            loop_str = " (loop)" if event.audio_def.loop else ""
            pause_str = " [ignores pauses]" if event.audio_def.ignore_pauses else ""
            print(
                f" - {event.audio_id}: '{event.audio_def.file}' @ {_format_time(event.start_time)}{loop_str}{pause_str}"
            )

    # Show always-visible videos
    if plan.narration_videos:
        print(f"\n Always-visible videos:")
        # NOTE(review): the unpacked 'cutout' is unused; the source's own
        # cutout attribute is printed instead — confirm this is intended.
        for video_id, video_source, cutout in plan.narration_videos:
            skip_str = (
                f" (skip: {video_source.skip:.1f}s)" if video_source.skip > 0 else ""
            )
            cache_ind = " 📁" if video_id in plan.cached_files else ""
            print(f" - {video_id} in '{video_source.cutout}'{skip_str}{cache_ind}")

    # Show narration pauses
    if plan.narration_pauses:
        print(f"\n Narration pauses:")
        for pause in plan.narration_pauses:
            print(
                f" - {pause.video_id} at {_format_time(pause.output_time)} "
                f"for {pause.duration:.1f}s (narration freezes at {_format_time(pause.narration_time)})"
            )

    # Write tasks file with both missing assets and alignment issues
    missing_videos = _collect_missing_video_markers(markers, videos)
    slide_timings_for_collision = [
        t for t in marker_timings if t.marker_id in slides and t.timestamp >= 0
    ]
    # Neighbouring aligned slide markers less than 0.1s apart are treated as a
    # collision and reported as alignment issues alongside unaligned slides.
    collision_ids_render = set()
    for _a, _b in zip(slide_timings_for_collision, slide_timings_for_collision[1:]):
        if abs(_a.timestamp - _b.timestamp) < 0.1:
            collision_ids_render.add(_a.marker_id)
            collision_ids_render.add(_b.marker_id)
    alignment_issues = [
        (t.marker_id, t.context)
        for t in marker_timings
        if t.marker_id in slides
        and (t.timestamp < 0 or t.marker_id in collision_ids_render)
    ]
    _write_tasks_file(project_path, missing_videos, alignment_issues)

    # Check for unaligned markers
    unaligned = [t for t in marker_timings if t.timestamp < 0]
    if unaligned:
        print(f"\n WARNING: {len(unaligned)} marker(s) could not be aligned!")
        for t in unaligned:
            print(f' [{t.marker_id}] - "{t.context}"')
        if not force:
            print(f"\n Run with -f/--force to render anyway.")
            return 1
        else:
            print(f"\n Continuing anyway due to --force flag...")

    # Stage 4: Render
    # Determine output filename and directory
    if config.output_video:
        # Used verbatim as the file name, also for partial (slide range) renders.
        out_filename = config.output_video
    elif slide_range:
        start, end = slide_range
        range_suffix = f"_{start}-{end}" if end else f"_{start}-end"
        out_filename = f"final{range_suffix}.mp4"
    else:
        # NOTE(review): 'config.co' looks like a truncated attribute name —
        # confirm it exists on the project config object.
        out_filename = f"{config.co}.mp4"
    # Non-full resolutions render into out/<res>/, full renders into out/.
    out_dir = project_path / "out" / res if res != "full" else project_path / "out"
    output_path = out_dir / out_filename
    plan.output_path = output_path

    if dry_run:
        print("\n[4/4] FFmpeg command (dry run):")
        print(generate_ffmpeg_command_string(plan, output_path))
        return 0

    print("\n[4/4] Rendering...")
    render(plan, output_path, verbose=verbose)
    print(f" Output: {output_path}")
    print("\nDone.")
    return 0
# =============================================================================
# Transcribe Command
# =============================================================================
def _find_narration_video(config, videos: dict) -> Optional[tuple[str, "VideoSource"]]:
"""
Find the video to use for transcription/narration.
Priority:
1. config.audio_source if set
2. First video with always_visible=True
3. First video in dict
"""
from .models import VideoSource
# 1. Check audio_source config
if config.audio_source and config.audio_source in videos:
return config.audio_source, videos[config.audio_source]
# 2. Find always_visible video (main talking head)
for video_id, video_source in videos.items():
if video_source.always_visible:
return video_id, video_source
# 3. Fall back to first video
if videos:
video_id = next(iter(videos.keys()))
return video_id, videos[video_id]
return None
def cmd_transcribe(
    project_path: Path, verbose: bool, res: str = "full", final: bool = False
) -> int:
    """Transcribe video audio using Whisper.

    Args:
        project_path: Project root directory.
        verbose: Print a short preview of the transcribed words.
        res: Resolution mode; any value other than "full" transcribes the
            downscaled copy of the narration video.
        final: Transcribe the rendered output in ``out/`` instead of the
            narration source (delegates to _transcribe_final).

    Returns:
        0 on success, 1 when no usable video is found.
    """
    from .transcriber import transcribe_video, save_transcript
    from .parser import parse_project_config, parse_videos
    from .preprocessor import ensure_downscaled_files_exist

    config = parse_project_config(project_path)

    # Handle --final mode: transcribe the rendered output for YouTube captions
    if final:
        out_dir = project_path / "out"
        path = out_dir / f"{config.output_video}.mp4"
        # cmd_render uses config.output_video verbatim as the output file name,
        # so a value that already carries its extension would never be found
        # at "<name>.mp4". Fall back to the verbatim name when that file exists.
        if config.output_video and not path.exists():
            verbatim = out_dir / config.output_video
            if verbatim.is_file():
                path = verbatim
        return _transcribe_final(path, verbose)

    mode_str = f" ({res.upper()})" if res != "full" else ""
    print(f"Transcribing: {project_path.name}{mode_str}")

    videos, videos_dir = parse_videos(project_path, config)
    if not videos:
        print("Error: No videos defined in videos.json", file=sys.stderr)
        return 1

    # Non-full res: use downscaled video directory
    if res != "full":
        videos_dir = ensure_downscaled_files_exist(
            videos_dir, res, force=False, verbose=verbose
        )

    # Check for multi-segment narration (concatenated file)
    if isinstance(config.main_video, list) and len(config.main_video) > 1:
        video_path = videos_dir / "narration_combined.mov"
        if not video_path.exists():
            print(f"Error: Combined narration not found: {video_path}", file=sys.stderr)
            print(
                "Run 'gnommo -p <project> pre' first to concatenate segments.",
                file=sys.stderr,
            )
            return 1
        print(f" Using combined narration: {video_path.name}")
    else:
        # Single video - find it using existing logic
        result = _find_narration_video(config, videos)
        if not result:
            print("Error: No suitable video found for transcription", file=sys.stderr)
            return 1
        _, video_source = result
        video_path = videos_dir / video_source.source_file
        if not video_path.exists():
            print(f"Error: Video not found: {video_path}", file=sys.stderr)
            return 1
        print(f" Video: {video_path.name}")

    words = transcribe_video(video_path, model="base")

    # The transcript is written next to the video that was transcribed
    output_path = video_path.with_suffix(".transcript.json")
    save_transcript(words, output_path)

    print(f" - Transcribed {len(words)} words")
    print(f" - Duration: {words[-1].end:.1f}s" if words else " - No words found")
    print(f" - Saved: {output_path}")
    if verbose and words:
        preview = " ".join(w.word for w in words[:10])
        print(f" - Preview: {preview}...")
    return 0
def _transcribe_final(final_video: Path, verbose: bool) -> int:
    """
    Transcribe the final rendered video and generate SRT captions for YouTube.

    Writes ``<name>.transcript.json`` and ``<name>.srt`` next to
    ``final_video``.

    Returns:
        0 on success, 1 if the video is missing or no words were transcribed.
    """
    from .transcriber import transcribe_video, save_transcript, words_to_srt

    print(f"Transcribing final output: {final_video}")
    if not final_video.exists():
        for message in (
            f"Error: Final video not found: {final_video}",
            "Run 'gnommo render' first.",
        ):
            print(message, file=sys.stderr)
        return 1
    print(f" Video: {final_video.name}")

    # The same word list feeds both the JSON transcript and the captions
    word_list = transcribe_video(final_video, model="base")
    if not word_list:
        print("Error: No words transcribed from video", file=sys.stderr)
        return 1

    json_target = final_video.with_suffix(".transcript.json")
    save_transcript(word_list, json_target)

    captions_target = final_video.with_suffix(".srt")
    captions_text = words_to_srt(word_list)
    captions_target.write_text(captions_text, encoding="utf-8")

    last_word = word_list[-1]
    print(f" - Transcribed {len(word_list)} words")
    print(f" - Duration: {last_word.end:.1f}s")
    print(f" - Transcript: {json_target}")
    print(f" - Captions: {captions_target}")

    # Caption blocks are counted via the blank lines between them
    blank_separators = captions_text.count("\n\n")
    print(f" - Caption segments: {blank_separators + 1}")

    if verbose and word_list:
        leading_words = [entry.word for entry in word_list[:15]]
        print(f" - Preview: {' '.join(leading_words)}...")

    print("\nSRT file ready for YouTube upload.")
    return 0
# =============================================================================
# Align Command
# =============================================================================
def cmd_align(project_path: Path, verbose: bool) -> int:
    """Preview how manuscript markers align to the narration transcript.

    Prints one line per fuzzy or unmatched marker (exact matches only with
    ``verbose``) followed by a summary. The only file written is
    citations.json, and only when the manuscript contains citations.

    Returns:
        0 after printing the summary, 1 when no narration video or no
        transcript could be found.
    """
    from .transcriber import load_transcript
    from .transformer import align_markers_to_transcription
    from .parser import (
        parse_project_config,
        parse_videos,
        parse_slides,
        parse_audio,
        parse_manuscript,
        save_citations,
    )

    print(f"Alignment preview: {project_path.name}")
    print(" (This is a preview - alignment happens automatically during render)")

    # Manuscript first; cite markers are already stripped by the parser
    manuscript_text, _, _, citations = parse_manuscript(project_path)
    if citations:
        # Kept on disk for later stages such as description generation
        save_citations(citations, project_path / "citations.json")

    # Project config and the resources alignment needs
    config = parse_project_config(project_path)
    slides = parse_slides(project_path, config)
    videos, videos_dir = parse_videos(project_path, config)
    audio, _ = parse_audio(project_path, config)

    # The transcript lives next to the narration video
    narration = _find_narration_video(config, videos)
    if not narration:
        print("Error: No suitable video found for transcription", file=sys.stderr)
        return 1
    narration_source = narration[1]
    narration_path = videos_dir / narration_source.source_file
    transcript_path = narration_path.with_suffix(".transcript.json")

    # Try cache fallback for transcript
    transcript_path, _ = resolve_with_cache(transcript_path, project_path)
    if not transcript_path.exists():
        print(f"Error: Transcription not found: {transcript_path}", file=sys.stderr)
        print(f"Run 'gnommo -p {project_path.name} transcribe' first.", file=sys.stderr)
        return 1

    print(f" Loading: {transcript_path.name}")
    transcription = load_transcript(transcript_path, project_path)
    print(f" - {len(transcription)} words")

    print("\n Aligning markers to transcription...")
    timings = align_markers_to_transcription(
        manuscript_text, transcription, slides=slides, videos=videos, audio=audio
    )

    # Classify every marker: not found, exact, or fuzzy
    exact_count = 0
    fuzzy_count = 0
    missing_count = 0
    for timing in timings:
        if timing.timestamp < 0:
            print(f' [{timing.marker_id}] NOT FOUND - "{timing.context}"')
            missing_count += 1
        elif timing.confidence >= 1.0:
            exact_count += 1
            if verbose:
                print(f" [{timing.marker_id}] @ {_format_time(timing.timestamp)}")
        else:
            fuzzy_count += 1
            # Fuzzy matches are always shown so the user can verify them
            print(
                f" [{timing.marker_id}] @ {_format_time(timing.timestamp)} (fuzzy {timing.confidence:.0%})"
            )

    # Summary
    marker_total = len(timings)
    print(f"\n Alignment summary:")
    print(f" - Exact matches: {exact_count}/{marker_total}")
    if fuzzy_count > 0:
        print(f" - Fuzzy matches (60%+ words): {fuzzy_count}/{marker_total}")
    if missing_count > 0:
        print(f" - NOT FOUND: {missing_count}/{marker_total}")
        print(
            f"\n Some markers could not be aligned. Check manuscript.txt matches the spoken audio."
        )
    return 0
# =============================================================================
# All Command (Full Pipeline)
# =============================================================================
def _files_modified_since(root: Path, since: float, pattern: str) -> bool:
"""Return True if any file matching pattern under root has mtime > since."""
try:
for p in root.rglob(pattern):
if p.is_file() and p.stat().st_mtime > since:
return True
except (OSError, PermissionError):
pass
return False
def cmd_all(
    project_path: Path,
    verbose: bool,
    dry_run: bool,
    res: str = "full",
    force: bool = False,
) -> int:
    """Run full pipeline: import → transcode → preprocess → transcode --processed → trim → stitch → render → handoff.

    Cascade rule: if a stage produces output, the subsequent cache-aware
    stages (preprocess, transcode --processed, trim, stitch) are forced to
    re-run (cascade_force=True), regardless of whether --force was passed.
    This keeps downstream caches consistent with upstream changes.

    Render is different: its ``force`` flag means "render even though some
    markers could not be aligned", so it receives the user's ``force`` only.
    Upstream work must not silently switch that check off.

    Args:
        project_path: Project root directory.
        verbose: Passed through to every stage.
        dry_run: Passed to the stages that accept it (transcode, preprocess,
            render).
        res: Resolution mode passed to preprocess, stitch, render and handoff.
        force: User-supplied --force flag.

    Returns:
        The first non-zero stage result, otherwise the result of handoff.
    """
    from .handoff import cmd_handoff

    print(f"=== Full Pipeline: {project_path.name} ===\n")

    # cascade_force starts at --force. Once any stage does real work it flips to
    # True so the downstream cache-aware stages re-run unconditionally.
    cascade_force = force

    print(">>> Step 1/8: Import\n")
    result = cmd_import(project_path, cascade_force, verbose)
    if result != 0:
        return result

    print("\n>>> Step 2/8: Transcode narration (H.265)\n")
    result = cmd_transcode(
        project_path, verbose, dry_run, replace=False, crf=23, force=cascade_force
    )
    if result != 0:
        return result
    # Step 2 does not cascade: preprocess already checks its own output existence.
    # A broad *_compressed.mp4 pattern would falsely match pre-existing raw_mp4/ sources.

    print("\n>>> Step 3/8: Preprocess\n")
    t0 = time.time()
    result = cmd_preprocess(project_path, verbose, dry_run, cascade_force, workers=1, res=res)
    if result != 0:
        return result
    if (
        _files_modified_since(project_path, t0, "*_processed.mov")
        or _files_modified_since(project_path, t0, "*_processed.webm")
    ):
        cascade_force = True

    print("\n>>> Step 4/8: Transcode processed (HEVC+alpha)\n")
    t0 = time.time()
    result = cmd_transcode(
        project_path,
        verbose,
        dry_run,
        replace=False,
        crf=23,
        force=cascade_force,
        processed=True,
        alpha_quality=1.0,
    )
    if result != 0:
        return result
    if _files_modified_since(project_path, t0, "*_processed.mov"):
        cascade_force = True

    print("\n>>> Step 5/8: Trim\n")
    t0 = time.time()
    result = cmd_trim(project_path, verbose, force=cascade_force, threshold_db=-40.0)
    if result != 0:
        return result
    # Trim modifies narration.json skip/take values; any change invalidates stitch
    if _files_modified_since(project_path, t0, "narration.json"):
        cascade_force = True

    print("\n>>> Step 6/8: Stitch\n")
    result = cmd_stitch(project_path, verbose, cascade_force, res=res)
    if result != 0:
        return result

    print("\n>>> Step 7/8: Render\n")
    # Pass the user's --force, not cascade_force: in cmd_render the flag only
    # decides whether unaligned markers abort the render.
    result = cmd_render(project_path, verbose, dry_run, res=res, force=force)
    if result != 0:
        return result

    print("\n>>> Step 8/8: Handoff\n")
    return cmd_handoff(project_path, verbose, file_override=None, prod=False, res=res)
# =============================================================================
# Description Command
# =============================================================================
def cmd_description(project_path: Path, verbose: bool) -> int:
    """Write out/description_youtube.txt for the project and print a summary.

    Marker timings come from aligning the manuscript against the narration
    transcript when one is found; without a transcript the alignment runs on
    an empty word list. Citations are read from citations.json.

    Returns:
        Always 0.
    """
    from .parser import (
        parse_audio,
        parse_manuscript,
        parse_project_config,
        parse_slides,
        parse_videos,
        load_citations,
    )
    from .transcriber import load_transcript
    from .transformer import align_markers_to_transcription
    from .description import write_description_file

    print(f"Generating description: {project_path.name}")

    # Project inputs
    manuscript_text, _markers, _, _ = parse_manuscript(project_path)
    # Citations were saved to this file by the render/align commands
    citations = load_citations(project_path / "citations.json")
    config = parse_project_config(project_path)
    slides = parse_slides(project_path, config)
    videos, videos_dir = parse_videos(project_path, config)
    audio, _ = parse_audio(project_path, config)

    # Transcript is optional; it sits next to the narration video
    transcription = None
    narration = _find_narration_video(config, videos)
    if narration:
        narration_source = narration[1]
        transcript_path = (videos_dir / narration_source.source_file).with_suffix(
            ".transcript.json"
        )
        # Try cache fallback for transcript
        transcript_path, _ = resolve_with_cache(transcript_path, project_path)
        if not transcript_path.exists():
            print(f" Warning: No transcription found at {transcript_path}")
            print(
                f" Run 'gnommo -p {project_path.name} transcribe' for better timestamps."
            )
        else:
            transcription = load_transcript(transcript_path, project_path)
            if verbose:
                print(f" Loaded transcription: {len(transcription)} words")

    # Marker timings
    print(" Aligning markers...")
    marker_timings = align_markers_to_transcription(
        manuscript_text,
        transcription or [],
        slides=slides,
        videos=videos,
        audio=audio,
    )
    if verbose:
        aligned_total = len([t for t in marker_timings if t.timestamp >= 0])
        print(f" Aligned {aligned_total}/{len(marker_timings)} markers")

    # Description file
    output_path = project_path / "out" / "description_youtube.txt"
    description = write_description_file(
        output_path=output_path,
        config=config,
        manuscript_text=manuscript_text,
        slides=slides,
        videos=videos,
        marker_timings=marker_timings,
        transcription=transcription,
        citations=citations,
    )

    # Summary
    text_lines = description.split("\n")
    print(f"\n Output: {output_path}")
    print(f" Length: {len(description)} characters, {len(text_lines)} lines")

    # Sections are detected from config fields and heading keywords in the text
    section_checks = (
        (config.description, "description"),
        ("CHAPTERS" in description, "chapters"),
        ("REFERENCES" in description, "references"),
        ("STOCK FOOTAGE" in description, "attributions"),
        (config.footer, "footer"),
    )
    sections = [label for present, label in section_checks if present]
    print(f" Sections: {', '.join(sections)}")

    if verbose:
        preview_limit = 20
        print("\n --- Preview ---")
        for text_line in text_lines[:preview_limit]:
            print(f" {text_line}")
        if len(text_lines) > preview_limit:
            print(f" ... ({len(text_lines) - 20} more lines)")

    print("\nDone.")
    return 0
def cmd_archive(project_path: Path, verbose: bool, dry_run: bool) -> int:
    """Archive project files to external cache storage.

    Syncs the project directory to ``<cache>/<project name>/`` with rsync and,
    after a successful sync, records ``synced_time`` in project.json.

    Args:
        project_path: Project root directory.
        verbose: Print the rsync command before running it.
        dry_run: Only print the rsync command; nothing is created or synced.

    Returns:
        0 on success (or dry run); 1 when the cache is not configured or not
        accessible, rsync is not installed, or rsync exits non-zero.
    """
    from .cache import load_cache_config

    print(f"Archiving: {project_path.name}")

    # Check cache is configured
    cache_base = load_cache_config()
    if cache_base is None:
        print(
            "Error: Cache not configured. Create ~/.gnommo.conf with:", file=sys.stderr
        )
        print(" [cache]", file=sys.stderr)
        print(" path = /Volumes/YourDisk/gnommo", file=sys.stderr)
        return 1
    if not cache_base.exists():
        print(f"Error: Cache path not accessible: {cache_base}", file=sys.stderr)
        print("Make sure the external drive is connected.", file=sys.stderr)
        return 1

    # Build destination path
    dest_path = cache_base / project_path.name
    print(f" Source: {project_path}")
    print(f" Destination: {dest_path}")

    # Create destination if needed
    if not dry_run:
        dest_path.mkdir(parents=True, exist_ok=True)

    # Use rsync to sync media files
    # -a: archive mode (preserves permissions, timestamps, etc.)
    # -v: verbose
    # --progress: show progress
    # --exclude: skip files we don't want to sync
    rsync_cmd = [
        "rsync",
        "-av",
        "--progress",
        "--exclude=*.py",
        "--exclude=__pycache__",
        "--exclude=.git",
        "--exclude=.DS_Store",
        f"{project_path}/",
        f"{dest_path}/",
    ]

    if dry_run:
        rsync_cmd.insert(1, "--dry-run")
        print("\n [DRY RUN] Would execute:")
        print(f" {' '.join(rsync_cmd)}")
    else:
        print("\n Syncing files...")
        if verbose:
            print(f" Command: {' '.join(rsync_cmd)}")
        try:
            result = subprocess.run(rsync_cmd)
        except FileNotFoundError:
            # Raised when the rsync executable itself is missing
            print("Error: rsync not found. Install rsync and retry.", file=sys.stderr)
            return 1
        if result.returncode != 0:
            print(f"Error: rsync failed with code {result.returncode}", file=sys.stderr)
            return 1

    # Update project.json with synced_time
    if not dry_run:
        project_json_path = project_path / "project.json"
        if project_json_path.exists():
            try:
                data = _read_json(project_json_path.read_text(encoding="utf-8"))
                data["synced_time"] = datetime.now().isoformat()
                project_json_path.write_text(
                    json.dumps(data, indent=2, ensure_ascii=False) + "\n",
                    encoding="utf-8",
                )
                print(
                    f"\n Updated project.json with synced_time: {data['synced_time']}"
                )
            except (json.JSONDecodeError, IOError) as e:
                # Best effort: the sync itself already succeeded
                print(f"Warning: Could not update project.json: {e}")

    print("\nDone.")
    return 0
# =============================================================================
# Extract Audio Command
# =============================================================================
def _extract_audio_file(
    source_path: Path,
    output_dir: Path,
    name: str,
    channel: str,
    verbose: bool,
) -> int:
    """
    Extract audio from a single video file to WAV.

    Runs ffmpeg to write a 48kHz 16-bit PCM WAV, then ffprobe to report the
    duration of the result.

    Args:
        source_path: Path to the source video file
        output_dir: Directory to save the WAV file
        name: Base name for the output file (without extension)
        channel: "left", "right", or "both"
        verbose: Print verbose output

    Returns:
        0 on success, 1 on error
    """
    # Build output filename; single-channel extracts carry the channel name
    if channel == "both":
        output_name = f"{name}.wav"
    else:
        output_name = f"{name}_{channel}.wav"
    output_path = output_dir / output_name

    print(f" Channel: {channel}")
    print(f" Source: {source_path}")
    print(f" Output: {output_path}")

    # Build ffmpeg command
    cmd = [
        "ffmpeg",
        "-y",  # Overwrite
        "-i",
        str(source_path),
        "-vn",  # No video
    ]
    # Channel selection
    if channel == "left":
        cmd.extend(["-af", "pan=mono|c0=c0"])
    elif channel == "right":
        cmd.extend(["-af", "pan=mono|c0=c1"])
    # "both" keeps stereo, no filter needed

    # Output format: 48kHz 16-bit WAV (standard for audio editing)
    cmd.extend(
        [
            "-ar",
            "48000",  # 48kHz sample rate
            "-acodec",
            "pcm_s16le",  # 16-bit PCM
            str(output_path),
        ]
    )
    if verbose:
        print(f" Command: {' '.join(cmd)}")

    print(" Extracting...", end=" ", flush=True)
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print("Error!")
        print(f" {result.stderr}", file=sys.stderr)
        return 1

    # Get duration info (cosmetic; a failed probe just omits the duration)
    duration_cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        str(output_path),
    ]
    duration_result = subprocess.run(duration_cmd, capture_output=True, text=True)
    duration_str = ""
    if duration_result.returncode == 0:
        try:
            duration = float(duration_result.stdout.strip())
            duration_str = f" ({duration:.1f}s)"
        except ValueError:
            pass
    print(f"Done{duration_str}")

    # The hint names Audacity, matching cmd_extract_audio and the menu items
    # listed below.
    print("\n Open in Audacity to experiment with:")
    print(" - Effect > Noise Reduction")
    print(" - Effect > Compressor")
    print(" - Effect > Filter Curve EQ")
    print(" - Effect > Loudness Normalization")
    print(
        "\n Once you find good settings, update narration.json with matching filter config."
    )
    return 0
def cmd_extract_audio(
    project_path: Path,
    verbose: bool,
    segment: Optional[str] = None,
    channel: str = "both",
    combined: bool = False,
) -> int:
    """
    Dump narration audio to WAV files so it can be edited in Audacity.

    Useful for trying out audio processing (EQ, compression, noise reduction)
    in external software before configuring it in the pipeline.

    Args:
        project_path: Path to the project directory
        verbose: Enable verbose output
        segment: Specific segment ID to extract, or None for all segments
        channel: Which channel(s) to extract: "left", "right", or "both"
        combined: If True, extract from narration_combined.mov instead of segments

    Returns:
        0 on success, 1 when inputs are missing or ffmpeg fails on a segment.
    """
    from .parser import parse_project_config, parse_narration, parse_videos

    print(f"Extracting audio: {project_path.name}")
    config = parse_project_config(project_path)

    # --combined mode: a single extract from narration_combined.mov into out/
    if combined:
        _, videos_dir = parse_videos(project_path, config)
        combined_path = videos_dir / "narration_combined.mov"
        if not combined_path.exists():
            print(
                f"Error: narration_combined.mov not found at {combined_path}",
                file=sys.stderr,
            )
            print("Run 'gnommo -p <project> stitch' first.", file=sys.stderr)
            return 1
        audio_dir = project_path / "out"
        audio_dir.mkdir(parents=True, exist_ok=True)
        return _extract_audio_file(
            combined_path, audio_dir, "narration_combined", channel, verbose
        )

    # Normal mode: one WAV per narration segment
    narration, narration_dir = parse_narration(project_path, config)
    if not narration:
        print(" No narration segments found in media/narration/narration.json")
        print(" Run 'gnommo -p <project> import' first to populate narration.json")
        return 1

    audio_dir = narration_dir / "audio"
    audio_dir.mkdir(parents=True, exist_ok=True)

    # Pick the segments to process: everything (sorted by id) or just one
    if not segment:
        selected = sorted(narration.items())
    elif segment in narration:
        selected = [(segment, narration[segment])]
    else:
        print(
            f"Error: Segment '{segment}' not found in narration.json",
            file=sys.stderr,
        )
        print(
            f"Available segments: {', '.join(sorted(narration.keys()))}",
            file=sys.stderr,
        )
        return 1

    print(f" Channel: {channel}")
    print(f" Output: {audio_dir}/")
    print(f" Segments: {len(selected)}")

    # "both" keeps stereo: no pan filter and no channel suffix in the file name
    pan_filter = {"left": "pan=mono|c0=c0", "right": "pan=mono|c0=c1"}.get(channel)
    name_suffix = "" if channel == "both" else f"_{channel}"

    for segment_id, segment_source in selected:
        source_path = narration_dir / segment_source.source_file
        if not source_path.exists():
            print(f" Warning: Source not found: {source_path.name}, skipping")
            continue

        output_name = f"{segment_id}{name_suffix}.wav"
        output_path = audio_dir / output_name
        print(f"\n {segment_id}:")
        print(f" Source: {source_path.name}")
        print(f" Output: {output_name}")

        # ffmpeg: overwrite, drop video, optional channel pick, 48kHz 16-bit PCM
        ffmpeg_cmd = ["ffmpeg", "-y", "-i", str(source_path), "-vn"]
        if pan_filter:
            ffmpeg_cmd += ["-af", pan_filter]
        ffmpeg_cmd += ["-ar", "48000", "-acodec", "pcm_s16le", str(output_path)]
        if verbose:
            print(f" Command: {' '.join(ffmpeg_cmd)}")

        extraction = subprocess.run(ffmpeg_cmd, capture_output=True, text=True)
        if extraction.returncode != 0:
            print(f" Error: {extraction.stderr}", file=sys.stderr)
            return 1

        # Duration is informational only; it is skipped when ffprobe fails
        probe_cmd = [
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format=duration",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            str(output_path),
        ]
        probe = subprocess.run(probe_cmd, capture_output=True, text=True)
        if probe.returncode == 0:
            try:
                seconds = float(probe.stdout.strip())
            except ValueError:
                pass
            else:
                print(f" Duration: {seconds:.1f}s")
        print(f" Done")

    print(f"\n Audio files saved to: {audio_dir}")
    for hint in (
        f"\n Open in Audacity to experiment with:",
        f" - Effect > Noise Reduction",
        f" - Effect > Compressor",
        f" - Effect > Filter Curve EQ",
        f" - Effect > Loudness Normalization",
        f"\n Once you find good settings, update narration.json with matching filter config.",
    ):
        print(hint)
    return 0
# =============================================================================
# Master Command (A/B audio comparison)
# =============================================================================
def _master_build_filter_chain(cfg, channel_filter: str = "") -> str:
    """Assemble the ffmpeg ``-af`` filtergraph string for the processed render.

    Args:
        cfg: Parsed audio-normalize settings (the object returned by
            ``parse_audio_normalize_config``). Only the attributes read
            below are required.
        channel_filter: Optional ``pan=...`` expression selecting a single
            channel; placed first so every later stage sees the mono signal.

    Returns:
        Comma-joined filter chain, or ``""`` when no stage is enabled.
    """
    # NOTE(review): the stage order is meant to mirror apply_audio_normalize
    # in the preprocessor -- keep both in sync when changing either.
    stages = []
    if channel_filter:
        stages.append(channel_filter)

    # Parametric EQ bands: shelves use their dedicated filters, anything
    # else is treated as a peaking band.
    for band in cfg.eq_bands:
        if band.type in ("lowshelf", "highshelf"):
            stages.append(
                f"{band.type}=f={band.freq:.1f}:g={band.gain:.1f}:t=q:w={band.q:.2f}"
            )
        else:
            stages.append(
                f"equalizer=f={band.freq:.1f}:width_type=q:width={band.q:.2f}:g={band.gain:.1f}"
            )

    if cfg.highpass > 0:
        stages.append(f"highpass=f={cfg.highpass:.1f}")
    if cfg.lowpass > 0:
        stages.append(f"lowpass=f={cfg.lowpass:.1f}")
    if cfg.room_eq:
        stages.append(
            f"equalizer=f={cfg.room_eq_freq:.1f}:width_type=q:width={cfg.room_eq_width:.2f}:g={cfg.room_eq_gain:.1f}"
        )
    if cfg.denoise:
        stages.append(f"afftdn=nf={cfg.noise_floor:.1f}")
    if cfg.gate:
        stages.append(
            f"agate=threshold={cfg.gate_threshold:.1f}dB"
            f":range={cfg.gate_range:.1f}dB"
            f":attack={cfg.gate_attack:.1f}"
            f":release={cfg.gate_release:.1f}"
        )
    if cfg.compress:
        stages.append(
            f"acompressor=threshold={cfg.threshold:.1f}dB"
            f":ratio={cfg.ratio:.1f}"
            f":attack={cfg.attack:.1f}"
            f":release={cfg.release:.1f}"
            f":makeup={cfg.makeup:.1f}dB"
        )
    if cfg.normalize:
        stages.append(
            f"loudnorm=I={cfg.target_lufs:.1f}"
            f":LRA={cfg.target_lra:.1f}"
            f":TP={cfg.target_tp:.1f}"
        )
    return ",".join(stages)


def _master_extract_wav(
    source_path: Path,
    output_path: Path,
    audio_filter: str,
    verbose: bool,
    error_label: str,
) -> bool:
    """Run ffmpeg to write the audio of *source_path* as 48 kHz 16-bit PCM WAV.

    Args:
        source_path: Input media file.
        output_path: WAV file to (over)write.
        audio_filter: ``-af`` filtergraph; the option is omitted when empty.
        verbose: Echo the full ffmpeg command line before running it.
        error_label: Text inserted into the error message on failure.

    Returns:
        True when ffmpeg exited with status 0, False otherwise (the captured
        stderr has then been printed to stderr).
    """
    cmd = ["ffmpeg", "-y", "-i", str(source_path), "-vn"]
    if audio_filter:
        # Only pass -af when there is something to apply, so ffmpeg never
        # receives an empty filtergraph string.
        cmd.extend(["-af", audio_filter])
    cmd.extend(["-ar", "48000", "-acodec", "pcm_s16le", str(output_path)])

    if verbose:
        print(f" Command: {' '.join(cmd)}")
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print(f" Error {error_label}: {result.stderr}", file=sys.stderr)
        return False
    print(f" Saved: {output_path.name}")
    return True


def _master_probe_duration(path: Path) -> float:
    """Return the duration ffprobe reports for *path* in seconds, or 0.0 if unparsable."""
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        str(path),
    ]
    probe = subprocess.run(cmd, capture_output=True, text=True)
    try:
        return float(probe.stdout.strip())
    except ValueError:
        # ffprobe failed or printed something that is not a number; the
        # duration is informational only, so fall back to zero.
        return 0.0


def cmd_master(
    project_path: Path,
    verbose: bool,
    channel: str = "both",
) -> int:
    """
    Extract raw and processed audio from narration_combined for A/B comparison.

    Outputs:
        out/narration_combined.wav - Raw audio (no processing)
        out/narration_combined_processed.wav - With audio filters applied
            (only written when an audio_normalize filter is configured)

    This lets you compare the effect of your audio processing chain.

    Args:
        project_path: Root directory of the project.
        verbose: Print the ffmpeg command lines and the full filter chain.
        channel: "left" or "right" to extract a single channel as mono;
            any other value (default "both") keeps the source channels.

    Returns:
        0 on success, 1 when narration_combined.mov is missing or an
        ffmpeg invocation fails.
    """
    from .parser import parse_project_config, parse_videos
    from .preprocessor import parse_audio_normalize_config

    print(f"Audio mastering: {project_path.name}")
    config = parse_project_config(project_path)
    _, videos_dir = parse_videos(project_path, config)

    # Find narration_combined.mov
    combined_path = videos_dir / "narration_combined.mov"
    if not combined_path.exists():
        print(
            f"Error: narration_combined.mov not found at {combined_path}",
            file=sys.stderr,
        )
        print("Run 'gnommo -p <project> stitch' first.", file=sys.stderr)
        return 1

    # Output directory
    out_dir = project_path / "out"
    out_dir.mkdir(parents=True, exist_ok=True)
    raw_output = out_dir / "narration_combined.wav"
    processed_output = out_dir / "narration_combined_processed.wav"

    # Use the first audio_normalize filter found in any default_filters preset.
    audio_config = None
    if config.default_filters:
        for preset_name, filters in config.default_filters.items():
            audio_config = next(
                (f for f in filters if f.get("type") == "audio_normalize"),
                None,
            )
            if audio_config:
                print(f" Using audio config from: default_filters.{preset_name}")
                break
    if not audio_config:
        print(" Warning: No audio_normalize filter found in default_filters")
        print(" Will only extract raw audio.")

    # Optional single-channel selection, applied to both renders.
    channel_filter = {
        "left": "pan=mono|c0=c0",
        "right": "pan=mono|c0=c1",
    }.get(channel, "")

    # Step 1: Extract raw audio
    print("\n Extracting raw audio...")
    if not _master_extract_wav(
        combined_path, raw_output, channel_filter, verbose, "extracting raw audio"
    ):
        return 1

    # Step 2: Extract processed audio (if we have config)
    processed_written = False
    if audio_config:
        print("\n Applying audio filters...")
        cfg = parse_audio_normalize_config(audio_config)
        filter_chain = _master_build_filter_chain(cfg, channel_filter)
        if verbose:
            print(f" Filter chain: {filter_chain}")

        # Print filter summary
        print(" Filters applied:")
        if cfg.eq_bands:
            print(f" - EQ: {len(cfg.eq_bands)} bands")
        if cfg.highpass > 0:
            print(f" - Highpass: {cfg.highpass}Hz")
        if cfg.lowpass > 0:
            print(f" - Lowpass: {cfg.lowpass}Hz")
        if cfg.room_eq:
            print(f" - Room EQ: {cfg.room_eq_freq}Hz, gain={cfg.room_eq_gain}dB")
        if cfg.denoise:
            print(f" - Denoise: floor={cfg.noise_floor}dB")
        if cfg.gate:
            print(f" - Gate: threshold={cfg.gate_threshold}dB")
        if cfg.compress:
            print(f" - Compressor: ratio={cfg.ratio}:1, attack={cfg.attack}ms")
        if cfg.normalize:
            print(f" - Loudnorm: target={cfg.target_lufs} LUFS")
        if not filter_chain:
            print(" - (none enabled; processed output will match raw)")

        if not _master_extract_wav(
            combined_path,
            processed_output,
            filter_chain,
            verbose,
            "applying filters",
        ):
            return 1
        processed_written = True

    # Final summary: only list files that were actually produced in this run.
    duration = _master_probe_duration(raw_output)
    print(f"\n Output files ({duration:.1f}s):")
    print(f" {raw_output}")
    if processed_written:
        print(f" {processed_output}")
        print("\n Open both in Audition to A/B compare the processing.")
    return 0
# Script entry point: run the CLI and hand its integer status to the shell.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)