Files
gnommo/gnommo/cli.py
T

4446 lines
153 KiB
Python

"""CLI entry point for GnommoEditor."""
import argparse
import json
from logging import config
import re
import time
import shutil
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from gnommo.parser import _read_json
from . import __version__
from .errors import GnommoError, ParseError, ValidationError, RenderError
from .cache import get_cache_info, resolve_with_cache
from typing import Optional, Union
class NotImplementedException(GnommoError):
    """Raised when a requested feature has not been implemented yet."""
def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the ``gnommo`` command-line parser (global flags plus the action)."""
    epilog = """
Examples:
gnommo -p video1 render Render the full project
gnommo -p video1 render --slides S1:S10 Render only slides S1-S10
gnommo -p video1 render --slides S10: Render from S10 to end
gnommo -p video1 validate Validate only
gnommo -p video1 import Generate slides.json from images
gnommo -p video1 pre Preprocess videos (chroma key, etc.)
gnommo -p video1 stitch --res tiny -f Fast stitch with new begin/end values
gnommo -p video1 trim Auto-detect silence and set skip/take in narration.json
gnommo -p video1 trim --force Redo trim even for segments that already have skip/take
gnommo -p video1 trim --threshold -25 Raise threshold to ignore clothing/room noise
gnommo -p video1 trim -v Show detected silence periods for debugging
gnommo -p video1 transcode Transcode narration folder to H.265 (1st pass, before preprocess)
gnommo -p video1 transcode --replace Delete originals after successful transcode
gnommo -p video1 transcode --crf 28 Lower quality / smaller files (default CRF: 23)
gnommo -p video1 transcode --processed Compress _processed.mov files to HEVC+alpha (2nd pass, after preprocess)
gnommo -p video1 transcode --processed --alpha-quality 0.5 More aggressive alpha compression
gnommo -p video1 transcode --processed --dry-run Preview what would be compressed
gnommo -p video1 transcode --force Re-transcode even if output already exists
gnommo -p video1 all Full pipeline: import → preprocess → trim → stitch → render → push → handoff → up
gnommo -p video1 render --dry-run Show FFmpeg command without running
gnommo -p video1 description Generate YouTube description file
gnommo -p video1 transcribe Narration file for timing of slides
gnommo -p video1 transcribe --final Transcribe outputted file and generate SRT for YouTube
gnommo -p video1 archive Sync project to external cache storage
gnommo -p video1 archive --dry-run Preview what would be synced
gnommo -p video1 up Upload project files to remote server
gnommo -p video1 down Download project files from remote server
gnommo -p video1 extract-audio --combined Extract audio from narration_combined.mov
gnommo -p video1 extract-audio --combined --channel left Extract left channel only
gnommo -p video1 extract-audio --segment seg01 Extract from a specific segment
gnommo -p video1 master Extract raw + processed audio for A/B comparison
"""
    parser = argparse.ArgumentParser(
        prog="gnommo",
        description="GnommoEditor - A code-first video editing pipeline",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=epilog,
    )
    parser.add_argument(
        "--version",
        action="version",
        version=f"%(prog)s {__version__}",
    )
    # Required arguments
    parser.add_argument(
        "-p",
        "--project",
        type=str,
        required=True,
        help="Project directory",
    )
    parser.add_argument(
        "action",
        type=str,
        nargs="?",
        default="render",
        choices=[
            "validate",
            "preprocess",
            "pre",
            "stitch",
            "trim",
            "render",
            "all",
            "transcribe",
            "align",
            "import",
            "description",
            "archive",
            "load",
            "up",
            "down",
            "extract-audio",
            "master",
            "push",
            "pull",
            "handoff",
            "transcode",
            "pexels",
        ],
        help="Action to perform (default: render)",
    )
    # Optional arguments
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="Verbose output",
    )
    parser.add_argument(
        "-f",
        "--force",
        action="store_true",
        help="Force overwrite existing files",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be done without executing",
    )
    parser.add_argument(
        "--slides",
        type=str,
        help="Render only a range of slides (e.g., S1:S10, S5:, S10:S20)",
    )
    parser.add_argument(
        "--chunk-slides",
        type=int,
        default=0,
        dest="chunk_slides",
        help="Split render into chunks of N slides each and concatenate (overrides render_chunk_slides in .gnommo.conf)",
    )
    parser.add_argument(
        "--res",
        type=str,
        choices=["full", "low", "tiny"],
        default="full",
        help="Resolution: 'full' (project res), 'low' (490x270), 'tiny' (320x180 ultrafast)",
    )
    parser.add_argument(
        "-w",
        "--workers",
        type=int,
        default=1,
        help="Number of parallel workers for preprocessing (default: 1)",
    )
    parser.add_argument(
        "--final",
        action="store_true",
        help="For transcribe: transcribe the final rendered video and generate SRT captions for YouTube",
    )
    parser.add_argument(
        "--segment",
        type=str,
        help="For extract-audio: specific segment ID to extract (default: all segments)",
    )
    parser.add_argument(
        "--channel",
        type=str,
        choices=["auto", "left", "right", "both"],
        default="both",
        help="For extract-audio: which audio channel(s) to extract (default: both)",
    )
    parser.add_argument(
        "--combined",
        action="store_true",
        help="For extract-audio: extract from narration_combined.mov instead of individual segments",
    )
    parser.add_argument(
        "--file",
        default=None,
        help="For handoff: path to video file (overrides output_video in project.json)",
    )
    parser.add_argument(
        "--prod",
        action="store_true",
        help="Target production server (GNOMMOWEB_PROD_URL / GNOMMOWEB_PROD_API_KEY)",
    )
    parser.add_argument(
        "--threshold",
        type=float,
        default=-40.0,
        help="For trim: silence threshold in dB (default: -40). Raise (e.g. -25) to ignore clothing/room noise.",
    )
    parser.add_argument(
        "--crf",
        type=int,
        default=23,
        help="For transcode: H.265 quality (CRF, default: 23; lower=better quality, larger file)",
    )
    parser.add_argument(
        "--replace",
        action="store_true",
        help="For transcode: delete original files after successful transcode",
    )
    parser.add_argument(
        "--processed",
        action="store_true",
        help="For transcode: compress _processed.mov files (with alpha) using HEVC+alpha instead of narration files",
    )
    parser.add_argument(
        "--alpha-quality",
        type=float,
        default=1.0,
        dest="alpha_quality",
        # The help text previously advertised 0.75 while the real default is 1.0.
        help="For transcode --processed: HEVC alpha quality 0.0-1.0 (default: 1.0; lower=smaller file)",
    )
    return parser


def main(argv: Optional[list[str]] = None) -> int:
    """Main entry point.

    Parses the command line, resolves the project directory to an absolute
    path and dispatches to the ``cmd_*`` handler for the requested action.

    Args:
        argv: Arguments to parse, without the program name. ``None`` (the
            default) lets argparse read ``sys.argv[1:]``, so existing
            ``main()`` callers behave exactly as before.

    Returns:
        The value returned by the selected handler; 1 if a ``GnommoError``
        was raised, 130 on ``KeyboardInterrupt``, 0 if no handler matched.
    """
    args = _build_arg_parser().parse_args(argv)

    # Resolve project path (relative paths are taken from the current directory)
    project_path = Path(args.project)
    if not project_path.is_absolute():
        project_path = Path.cwd() / project_path

    try:
        action = args.action
        if action == "import":
            return cmd_import(project_path, args.force, args.verbose)
        elif action == "validate":
            return cmd_validate(project_path, args.verbose)
        elif action in ("preprocess", "pre"):
            return cmd_preprocess(
                project_path,
                args.verbose,
                args.dry_run,
                args.force,
                args.workers,
                args.res,
            )
        elif action == "trim":
            return cmd_trim(
                project_path, args.verbose, args.force, args.threshold, args.res
            )
        elif action == "transcode":
            return cmd_transcode(
                project_path,
                args.verbose,
                args.dry_run,
                args.replace,
                args.crf,
                args.force,
                args.processed,
                args.alpha_quality,
            )
        # Was `action in ("stitch")`: a substring test on a plain string, which
        # only worked because argparse restricts `action` to known choices.
        elif action == "stitch":
            return cmd_stitch(
                project_path,
                args.verbose,
                args.force,
                args.res,
            )
        elif action == "render":
            return cmd_render(
                project_path,
                args.verbose,
                args.dry_run,
                args.slides,
                args.res,
                args.force,
                chunk_slides=args.chunk_slides,
            )
        elif action == "transcribe":
            return cmd_transcribe(project_path, args.verbose, args.res, args.final)
        elif action == "align":
            return cmd_align(project_path, args.verbose)
        elif action == "all":
            return cmd_all(
                project_path, args.verbose, args.dry_run, args.res, args.force
            )
        elif action == "description":
            return cmd_description(project_path, args.verbose)
        elif action == "archive":
            return cmd_archive(project_path, args.verbose, args.dry_run)
        elif action == "load":
            return cmd_load(project_path, args.verbose, args.dry_run)
        elif action == "up":
            return cmd_sync(project_path, args.verbose, args.dry_run, download=False)
        elif action == "down":
            return cmd_sync(project_path, args.verbose, args.dry_run, download=True)
        elif action == "extract-audio":
            return cmd_extract_audio(
                project_path, args.verbose, args.segment, args.channel, args.combined
            )
        elif action == "master":
            return cmd_master(project_path, args.verbose, args.channel)
        elif action == "push":
            # Imported lazily, only when the action is actually requested.
            from .push import cmd_push

            return cmd_push(project_path, args.verbose, args.force, args.prod)
        elif action == "pull":
            from .pull import cmd_pull

            return cmd_pull(project_path, args.verbose, args.force, args.prod)
        elif action == "handoff":
            from .handoff import cmd_handoff

            return cmd_handoff(
                project_path, args.verbose, args.file, args.prod, args.res
            )
        elif action == "pexels":
            return cmd_pexels(project_path, args.verbose)
    except GnommoError as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1
    except KeyboardInterrupt:
        print("\nAborted.", file=sys.stderr)
        return 130
    return 0
# =============================================================================
# Import Command
# =============================================================================
def cmd_import(project_path: Path, force: bool, verbose: bool) -> int:
    """Import assets and generate metadata JSON files.

    Runs the import stages in order: local videos, narration segments,
    Keynote presenter notes plus slides.json generation, shared assets,
    audio/video probing, and finally projection of manuscript markers into
    videos.json.

    Args:
        project_path: Project directory.
        force: Passed to the probe stages to re-probe already cached entries.
        verbose: Print per-item progress.

    Returns:
        0 on success, 1 if the project directory does not exist.
    """
    from .parser import parse_project_config, _read_json

    print(f"Importing assets for: {project_path.name}")
    if not project_path.exists():
        print(f"Error: Project directory not found: {project_path}", file=sys.stderr)
        return 1

    # Load project config if it exists (for videos_path and default_filters)
    config = None
    if (project_path / "project.json").exists():
        config = parse_project_config(project_path)

    # Resolve videos.json once, with the same fallback the other import stages
    # use, so the manuscript stage below cannot fail on an unset videos_path.
    if config and config.videos_path:
        videos_json_path = project_path / config.videos_path
    else:
        videos_json_path = project_path / "media" / "videos" / "videos.json"
    videos_dir = videos_json_path.parent
    if videos_dir.exists():
        _import_videos(videos_dir, config, verbose)

    # Import narration segments from media/narration directory
    narration_dir = project_path / "media" / "narration"
    if narration_dir.exists():
        _import_narration_segments(narration_dir, config, verbose)

    # Import presenter notes from Keynote file (also exports slide PNGs).
    # Sorted so that "first .key file" is the same file on every run.
    keynote_files = sorted(project_path.glob("*.key"))
    if keynote_files:
        keynote_file = keynote_files[0]
        if len(keynote_files) > 1:
            print(f" Warning: Multiple .key files found, using {keynote_file.name}")
        _import_presenter_notes(project_path, keynote_file, verbose, config)
        # Generate slides.json for each slide directory (after Keynote export)
        slides_base = project_path / "media" / "slides"
        slides_dirs = (
            sorted(d for d in slides_base.iterdir() if d.is_dir())
            if slides_base.exists()
            else []
        )
        for slides_dir in slides_dirs:
            _generate_slides_json(slides_dir, verbose)
    elif verbose:
        print(" No .key file found, skipping presenter notes import")

    # Import shared assets (pexels, etc.) from shared_assets directory.
    # Looked up inside the project first, then next to it.
    shared_assets_dir = _find_shared_assets(project_path)
    if shared_assets_dir:
        _import_shared_assets(shared_assets_dir, verbose)
        _import_shared_audio(shared_assets_dir, project_path, config, verbose)
        _sync_shared_videos_to_local(project_path, config, shared_assets_dir, verbose)

    # Probe and cache audio file durations into audio.json
    _probe_audio_durations(project_path, config, force, verbose, shared_assets_dir)
    # Probe and cache video metadata (duration, has_audio) into videos.json
    _probe_video_metadata(project_path, config, shared_assets_dir, force, verbose)

    # ETL: if a manuscript exists, project shorthand marker semantics (cutout/layer)
    # into videos.json so the render stage is always data-driven from the manuscript.
    # Run AFTER sync so newly-added shared videos are already present when we write
    # their cutout/layer. Also warn about any referenced video that is still missing.
    manuscript_path = project_path / "manuscript.txt"
    if manuscript_path.exists() and config:
        from .parser import parse_manuscript
        from .transformer import _SHORTHAND_PREFIXES

        _, markers, _, _ = parse_manuscript(project_path)
        if markers:
            _project_markers_to_videos(
                markers,
                videos_json_path,
                config,
                project_path,
            )
            # Warn about shorthand-referenced videos still absent from videos.json
            local_vids: dict = (
                _read_json(videos_json_path) if videos_json_path.exists() else {}
            )
            seen_missing: set[str] = set()
            for marker in markers:
                for prefix in _SHORTHAND_PREFIXES:
                    if not marker.startswith(prefix):
                        continue
                    vid_id = marker[len(prefix):]
                    if vid_id not in local_vids and vid_id not in seen_missing:
                        print(
                            f" ⚠ [{marker}] video '{vid_id}' not found in "
                            f"videos.json or shared_assets — add it manually"
                        )
                        seen_missing.add(vid_id)
                    # Only the first matching prefix is considered per marker.
                    break

    print("Import complete.")
    return 0
def _import_shared_audio(
    shared_assets_dir: Path,
    project_path: Path,
    config,
    verbose: bool,
) -> None:
    """Register audio found in shared_assets/media/audio in the project's audio.json.

    Each new file is added under its stem as ID, flagged ``is_shared`` with a
    volume of 1.0. IDs already present in audio.json are left untouched, and
    the file is only rewritten when at least one entry was added.
    """
    source_dir = shared_assets_dir / "media" / "audio"
    if not source_dir.exists():
        if verbose:
            print(f" No shared audio dir found at {source_dir}")
        return

    known_suffixes = {".mp3", ".wav", ".aac", ".m4a", ".ogg", ".flac"}
    candidates = []
    for entry in source_dir.iterdir():
        if not entry.is_file():
            continue
        if entry.suffix.lower() not in known_suffixes:
            continue
        if entry.name.startswith("."):
            continue
        candidates.append(entry)
    candidates.sort()

    if not candidates:
        if verbose:
            print(f" No audio files found in {source_dir}")
        return

    # Resolve project audio.json path
    audio_json_path = (
        project_path / config.audio_path
        if config and config.audio_path
        else project_path / "media" / "audio" / "audio.json"
    )
    audio_json_path.parent.mkdir(parents=True, exist_ok=True)

    catalogue: dict = {}
    if audio_json_path.exists():
        catalogue = _read_json(audio_json_path)

    new_entries = 0
    for audio_file in candidates:
        audio_id = audio_file.stem
        if audio_id in catalogue:
            if verbose:
                print(f" Skipping {audio_id} (already in audio.json)")
            continue
        catalogue[audio_id] = {
            "file": audio_file.name,
            "is_shared": True,
            "volume": 1.0,
        }
        new_entries += 1
        if verbose:
            print(f" Added shared audio: {audio_id}")

    if new_entries == 0:
        if verbose:
            print(" No new shared audio files to add")
        return

    with open(audio_json_path, "w", encoding="utf-8") as fh:
        json.dump(catalogue, fh, indent=2)
    print(
        f" Updated {audio_json_path.relative_to(project_path)} (+{new_entries} shared audio files)"
    )
def _probe_audio_durations(
    project_path: Path,
    config,
    force: bool,
    verbose: bool,
    shared_assets_dir: Optional[Path] = None,
) -> None:
    """Probe and cache audio file durations into audio.json.

    Runs once at import time so the render stage never needs to scan audio files.
    Skips entries that already have a numeric duration unless --force is set.

    Args:
        project_path: Project directory.
        config: Project config or None; ``config.audio_path`` (when set) is the
            location of audio.json relative to the project.
        force: Re-probe entries that already carry a duration.
        verbose: Report cached and missing entries as well.
        shared_assets_dir: Where ``is_shared`` entries resolve their file
            (``<shared_assets_dir>/media/audio``); None resolves them next to
            audio.json like local entries.
    """
    from .renderer import _get_audio_duration

    if config and config.audio_path:
        audio_json_path = project_path / config.audio_path
    else:
        # Without an explicit audio_path, _import_shared_audio writes to
        # media/audio/audio.json. A root-level audio.json (the previous
        # fallback here) still wins when present, so older layouts keep working.
        legacy_path = project_path / "audio.json"
        audio_json_path = (
            legacy_path
            if legacy_path.exists()
            else project_path / "media" / "audio" / "audio.json"
        )
    if not audio_json_path.exists():
        return

    audio_dir = audio_json_path.parent
    data = _read_json(audio_json_path)
    updated = False
    for audio_id, audio_data in data.items():
        if not isinstance(audio_data, dict) or "file" not in audio_data:
            continue
        cached = audio_data.get("duration")
        # A non-numeric cached value (null, string) is treated as missing
        # rather than crashing the ':.1f' format below.
        if isinstance(cached, (int, float)) and not force:
            if verbose:
                print(f" Audio '{audio_id}': cached ({cached:.1f}s)")
            continue
        if audio_data.get("is_shared") and shared_assets_dir:
            audio_path = shared_assets_dir / "media" / "audio" / audio_data["file"]
        else:
            audio_path = audio_dir / audio_data["file"]
        if not audio_path.exists():
            if verbose:
                print(f" Audio '{audio_id}': file not found, skipping")
            continue
        print(
            f" Probing audio '{audio_id}' ({audio_path.name})...", end=" ", flush=True
        )
        try:
            duration = _get_audio_duration(audio_path)
            audio_data["duration"] = round(duration, 3)
            updated = True
            print(f"{duration:.1f}s")
        except Exception as e:
            # Best effort: one unreadable file must not abort the whole import.
            print(f"failed ({e})")

    if updated:
        # Explicit encoding, matching the other JSON writers in this module.
        with open(audio_json_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=4)
        print(f" Saved durations to {audio_json_path.name}")
def _find_probe_target(video_data: dict, base_dir: Path) -> Optional[Path]:
    """Return the existing file to probe for a videos.json entry, or None.

    Tries ``output_file`` first (in ``base_dir`` and its parent, also with a
    ``.mov`` suffix), then falls back to ``source_file`` in ``base_dir``.
    """
    output_file = video_data.get("output_file")
    if output_file:
        for candidate_dir in (base_dir, base_dir.parent):
            candidate = candidate_dir / output_file
            if candidate.exists():
                return candidate
            mov_candidate = candidate.with_suffix(".mov")
            if mov_candidate.exists():
                return mov_candidate
    source_candidate = base_dir / video_data["source_file"]
    if source_candidate.exists():
        return source_candidate
    return None


def _probe_video_metadata(
    project_path: Path,
    config,
    shared_assets_dir: Optional[Path],
    force: bool,
    verbose: bool,
) -> None:
    """Probe and cache video file duration and audio presence into videos.json.

    Runs once at import time so the render stage never needs to probe video files.
    Shared entries are written back to shared_assets/videos.json (canonical source)
    and mirrored into the project's entry; local entries are written to the
    project's videos.json only.
    Skips entries that already have both fields unless --force is set.

    Args:
        project_path: Project directory.
        config: Project config or None; ``config.videos_path`` (when set) is
            the location of videos.json relative to the project.
        shared_assets_dir: shared_assets directory, or None when there is none.
        force: Re-probe entries that already carry both fields.
        verbose: Report cached and missing entries as well.
    """
    from .preprocessor import get_video_duration
    from .renderer import _has_audio_stream

    if config and config.videos_path:
        videos_json_path = project_path / config.videos_path
    else:
        videos_json_path = project_path / "media" / "videos" / "videos.json"
    if not videos_json_path.exists():
        return

    videos_dir = videos_json_path.parent
    local_data = _read_json(videos_json_path)
    # Load shared_assets/videos.json separately — shared probes write there
    shared_json_path = shared_assets_dir / "videos.json" if shared_assets_dir else None
    shared_data = (
        _read_json(shared_json_path)
        if shared_json_path and shared_json_path.exists()
        else {}
    )
    local_updated = False
    shared_updated = False

    for video_id, video_data in local_data.items():
        if "source_file" not in video_data:
            continue
        is_shared = video_data.get("is_shared", False)
        in_shared_catalogue = is_shared and video_id in shared_data
        # For shared entries, the cached values live in shared_assets/videos.json
        canonical = shared_data[video_id] if in_shared_catalogue else video_data
        if not force and "duration" in canonical and "has_audio" in canonical:
            if verbose:
                print(
                    f" Video '{video_id}': cached ({canonical['duration']:.1f}s, audio={canonical['has_audio']})"
                )
            continue

        base_dir = (
            shared_assets_dir if (is_shared and shared_assets_dir) else videos_dir
        )
        video_path = _find_probe_target(video_data, base_dir)
        if video_path is None:
            if verbose:
                print(f" Video '{video_id}': file not found, skipping")
            continue

        print(
            f" Probing video '{video_id}' ({video_path.name})...", end=" ", flush=True
        )
        try:
            duration = get_video_duration(video_path)
            has_audio = _has_audio_stream(video_path)
            result = {"duration": round(duration, 3), "has_audio": has_audio}
            print(f"{duration:.1f}s, audio={has_audio}")
        except Exception as e:
            # Best effort: one unreadable file must not abort the whole import.
            print(f"failed ({e})")
            continue

        if in_shared_catalogue:
            # Write back to shared_assets/videos.json — canonical source for shared assets
            shared_data[video_id].update(result)
            shared_updated = True
            # Mirror into the project entry now. The shared-to-local sync runs
            # before probing, so without this the project's videos.json would
            # only receive these values on the next import.
            if any(video_data.get(key) != value for key, value in result.items()):
                video_data.update(result)
                local_updated = True
        else:
            video_data.update(result)
            local_updated = True

    # Explicit encoding, matching the other JSON writers in this module.
    if local_updated:
        with open(videos_json_path, "w", encoding="utf-8") as f:
            json.dump(local_data, f, indent=4)
        print(f" Saved metadata to {videos_json_path.name}")
    if shared_updated and shared_json_path:
        with open(shared_json_path, "w", encoding="utf-8") as f:
            json.dump(shared_data, f, indent=4)
        print(f" Saved shared metadata to {shared_json_path.name}")
def _sync_shared_videos_to_local(
    project_path: Path, config, shared_assets_dir: Path, verbose: bool
) -> None:
    """Merge shared_assets/videos.json into the project's local videos.json.

    Entries unknown to the project are copied over and flagged is_shared=true.
    Entries the project already has keep all their own settings; only the
    probed fields (duration, has_audio) are refreshed from the shared copy.
    The local file is rewritten only when something actually changed.
    """
    catalogue_path = shared_assets_dir / "videos.json"
    if not catalogue_path.exists():
        return
    catalogue = _read_json(catalogue_path)
    if not catalogue:
        return

    if config and config.videos_path:
        target_path = project_path / config.videos_path
    else:
        target_path = project_path / "media" / "videos" / "videos.json"
    project_videos: dict = _read_json(target_path) if target_path.exists() else {}

    probed_keys = ("duration", "has_audio")
    new_ids = []
    refreshed_ids = []
    for vid, shared_entry in catalogue.items():
        if vid not in project_videos:
            # Unknown to the project: take a copy and flag its origin.
            copied = dict(shared_entry)
            copied["is_shared"] = True
            project_videos[vid] = copied
            new_ids.append(vid)
            continue

        # Known entry: only pull in probed values that differ.
        mine = project_videos[vid]
        stale = [
            key
            for key in probed_keys
            if key in shared_entry and mine.get(key) != shared_entry[key]
        ]
        for key in stale:
            mine[key] = shared_entry[key]
        if stale:
            refreshed_ids.append(vid)
        elif verbose:
            print(f" Shared '{vid}': already in local videos.json, skipping")

    if not (new_ids or refreshed_ids):
        if verbose:
            print(" No new shared assets to sync to local videos.json")
        return

    target_path.parent.mkdir(parents=True, exist_ok=True)
    with open(target_path, "w", encoding="utf-8") as f:
        json.dump(project_videos, f, indent=4)
    if new_ids:
        print(
            f" Synced {len(new_ids)} shared asset(s) to local videos.json: {', '.join(new_ids)}"
        )
    if refreshed_ids:
        print(
            f" Updated metadata for {len(refreshed_ids)} shared asset(s): {', '.join(refreshed_ids)}"
        )
def _find_shared_assets(project_path: Path) -> Optional[Path]:
    """Locate the shared_assets directory for a project.

    Candidates, in priority order:
    1. project_path/shared_assets
    2. project_path/../shared_assets (sibling to project)

    Returns the first candidate that exists, or None.
    """
    for base in (project_path, project_path.parent):
        candidate = base / "shared_assets"
        if candidate.exists():
            return candidate
    return None
def _import_shared_assets(shared_assets_dir: Path, verbose: bool) -> None:
    """Import video files from shared_assets directory into videos.json.

    Scans the root level and all subdirectories for video files and creates
    a unified videos.json in shared_assets/. When a cache base is configured,
    its ``shared_assets`` mirror is scanned as well.

    Video IDs use the filename for root-level files (e.g., "Logo") or
    are prefixed with the subfolder name for subdirectory files
    (e.g., "pexels/filename"). IDs and ``source_file`` always use forward
    slashes so the generated JSON does not depend on the operating system.
    Existing entries are never overwritten. Hidden files and directories at
    any depth, and ``*_processed.mov`` / ``*_processed.webm`` outputs, are
    ignored.

    Args:
        shared_assets_dir: Existing shared_assets directory to scan.
        verbose: Print every added or skipped entry.
    """
    from .cache import load_cache_config

    video_extensions = {".mov", ".mp4", ".webm", ".avi", ".mkv", ".m4v"}
    processed_suffixes = ("_processed.mov", "_processed.webm")

    # Find all video files in shared_assets (root level and subdirectories).
    # Also scan the GnommoDisk cache mirror so files placed there are registered.
    scan_roots: list[Path] = [shared_assets_dir]
    cache_base = load_cache_config()
    if cache_base:
        cache_shared = cache_base / "shared_assets"
        if cache_shared.exists() and cache_shared != shared_assets_dir:
            scan_roots.append(cache_shared)

    video_files: list[tuple[Path, Path]] = []  # (relative_path, absolute_path)
    seen_rel: set[str] = set()  # deduplicate by relative path; first root wins

    def _consider(path: Path, root: Path) -> None:
        """Record ``path`` if it is an importable, not yet seen video file."""
        rel_path = path.relative_to(root)
        # Hidden at any depth: previously only top-level dot-entries were
        # skipped, so nested '._clip.mov' files were registered as videos.
        if any(part.startswith(".") for part in rel_path.parts):
            return
        if not path.is_file():
            return
        if path.suffix.lower() not in video_extensions:
            return
        if path.name.endswith(processed_suffixes):
            return
        rel_key = rel_path.as_posix()
        if rel_key in seen_rel:
            return
        seen_rel.add(rel_key)
        video_files.append((rel_path, path))

    for scan_root in scan_roots:
        for item in scan_root.iterdir():
            if item.name.startswith("."):
                continue
            if item.is_file():
                _consider(item, scan_root)
            elif item.is_dir():
                for nested in item.rglob("*"):
                    _consider(nested, scan_root)

    if not video_files:
        if verbose:
            print(f" No video files found in {shared_assets_dir}")
        return

    # Load existing videos.json if it exists
    videos_json_path = shared_assets_dir / "videos.json"
    existing_videos: dict = {}
    if videos_json_path.exists():
        existing_videos = _read_json(videos_json_path)

    # Add new videos (don't overwrite existing)
    added_count = 0
    for rel_path, _abs_path in sorted(video_files):
        # Path relative to shared_assets without extension, in POSIX form,
        # e.g. "Logo" for root files, "pexels/6759604-hd" for nested files
        video_id = rel_path.with_suffix("").as_posix()
        if video_id in existing_videos:
            if verbose:
                print(f" Skipping {video_id} (already exists)")
            continue
        existing_videos[video_id] = {
            "source_file": rel_path.as_posix(),
        }
        added_count += 1
        if verbose:
            print(f" Added: {video_id}")

    if added_count > 0:
        # Write updated videos.json
        with open(videos_json_path, "w", encoding="utf-8") as f:
            json.dump(existing_videos, f, indent=2)
        print(f" Updated {videos_json_path} (+{added_count} shared assets)")
    else:
        print(" No new shared assets to add")
def _generate_slides_json(directory: Path, verbose: bool) -> None:
    """Generate slides.json from Keynote export folder.

    Every image named like ``Video1.001.png`` becomes slide ``S1`` (the
    numeric part before the extension, leading zeros dropped) with type
    "fullscreen". Directories and hidden files are ignored. Files are
    processed in sorted order, so when two files map to the same slide number
    the alphabetically last one wins on every run.

    slides.json is only rewritten when its content would change.

    Args:
        directory: Folder containing the exported slide images.
        verbose: Also list the resulting slide IDs.
    """
    extensions = {".png", ".gif", ".pdf", ".jpg", ".jpeg"}
    files = sorted(
        f
        for f in directory.iterdir()
        if f.is_file()
        and not f.name.startswith(".")
        and f.suffix.lower() in extensions
    )
    if not files:
        print(f" Warning: No image files in {directory}")
        return

    # Extract numeric suffix from filenames like "Video1.001.png"
    pattern = re.compile(r"\.(\d+)\.[^.]+$")
    slides = {}
    for file in files:
        match = pattern.search(file.name)
        if not match:
            continue
        slides[f"S{int(match.group(1))}"] = {
            "image": file.name,
            "type": "fullscreen",
        }
    if not slides:
        print(f" Warning: No valid slide files in {directory}")
        return

    # Sort by slide number (numeric, so S10 comes after S2)
    sorted_slides = dict(sorted(slides.items(), key=lambda x: int(x[0][1:])))

    # Write slides.json only if content changed
    output_path = directory / "slides.json"
    new_content = json.dumps(sorted_slides, indent=2)
    existing_content = (
        output_path.read_text(encoding="utf-8") if output_path.exists() else None
    )
    if new_content != existing_content:
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(new_content)
        print(f" Generated {output_path} ({len(sorted_slides)} slides)")
    if verbose:
        for slide_id in sorted_slides:
            print(f" [{slide_id}]")
def _import_videos(videos_dir: Path, config, verbose: bool) -> None:
    """Import video files into videos.json.

    Scans the videos directory (direct children only) for video files and adds
    them to videos.json, creating the directory and an empty videos.json first
    if they are missing. Uses the filename (without extension) as the video_id.
    Does not overwrite existing entries - only adds new ones.

    Skipped: hidden files, anything with ``_processed`` or ``_fixed`` in its
    stem, and files whose name starts with ``narration_combined``.

    A remaining file whose stem contains ``narration_combined``
    (case-insensitive) is registered as an always-visible talking-head
    segment; every other file becomes a regular "square" cutout video.

    Args:
        videos_dir: Directory holding the video files and videos.json.
        config: Project config or None. Currently not consulted by this
            function; kept for signature compatibility with the other importers.
        verbose: Print every added or skipped regular entry.
    """
    video_extensions = {".mov", ".mp4", ".webm", ".avi", ".mkv", ".m4v"}

    # Create the directory before scanning it; scanning a missing directory
    # would raise before the empty videos.json below could be written.
    videos_dir.mkdir(parents=True, exist_ok=True)

    # Find all video files (exclude hidden files, processed outputs and
    # intermediate files). iterdir() only yields direct children, so files in
    # proxy/ or intermediate/ subfolders are never considered.
    video_files = sorted(
        f
        for f in videos_dir.iterdir()
        if f.is_file()
        and not f.name.startswith(".")
        and f.suffix.lower() in video_extensions
        and "_processed" not in f.stem
        and "_fixed" not in f.stem
        and not f.name.startswith("narration_combined")
    )

    # Ensure videos.json exists even if there are no video files yet
    videos_json_path = videos_dir / "videos.json"
    created = not videos_json_path.exists()
    if created:
        with open(videos_json_path, "w", encoding="utf-8") as f:
            json.dump({}, f, indent=2)
        print(
            f" Created empty {videos_json_path.relative_to(videos_dir.parent.parent)}"
        )
    if not video_files:
        if verbose:
            print(f" No new video files found in {videos_dir}")
        return

    # A file we just created is known to be empty; no need to read it back.
    existing_videos: dict = {} if created else _read_json(videos_json_path)

    # Add new videos (don't overwrite existing)
    added_count = 0
    for video_file in video_files:
        video_id = video_file.stem
        if video_id in existing_videos:
            if verbose:
                print(f" Skipping {video_id} (already exists)")
            continue
        video_entry = {
            "source_file": video_file.name,
        }
        if "narration_combined" in video_file.stem.lower():
            # Talking-head segment: no separate output file, always on screen
            video_entry["output_file"] = None
            video_entry["cutout"] = "talkinghead"
            video_entry["always_visible"] = True
            video_entry["skip"] = 0
            video_entry["filter"] = []
            print(f" Added talking head segment: {video_id}")
        else:
            # Regular video
            video_entry["output_file"] = video_file.name
            video_entry["cutout"] = "square"
            video_entry["filter"] = []
            if verbose:
                print(f" Added: {video_id}")
        existing_videos[video_id] = video_entry
        added_count += 1

    if added_count > 0:
        # Write updated videos.json
        with open(videos_json_path, "w", encoding="utf-8") as f:
            json.dump(existing_videos, f, indent=2)
        print(f" Updated {videos_json_path.name} (+{added_count} videos)")
    else:
        print(" No new videos to add")
def _import_narration_segments(narration_dir: Path, config, verbose: bool) -> None:
    """Import narration video files into narration.json.

    Folder structure (both subfolders are created if missing):
        media/narration/raw_mov/    ← raw recordings from iPhone/QuickTime
        media/narration/processed/  ← chroma-keyed output (preprocess)
        media/narration/narration.json

    Two passes add segments that narration.json does not know yet:

    1. processed/ — a file becomes a segment (ID = stem without a trailing
       ``_processed``) only when raw_mov/ holds no recording with the same
       stem; otherwise pass 2 creates the entry from the raw file.
    2. raw_mov/ — each new raw file becomes a segment pointing at the raw
       file, with ``output_file`` preset to ``processed/<stem>_processed.mov``
       and, when the config defines a "talkinghead" filter preset, the
       talkinghead cutout and filter.

    Existing entries are never modified. narration.json is written when
    segments were added or when the file did not exist yet.

    Args:
        narration_dir: The media/narration directory.
        config: Project config or None; only ``config.default_filters`` is read.
        verbose: Also report segments skipped because they already exist.
    """
    video_extensions = {".mov", ".mp4", ".webm", ".avi", ".mkv", ".m4v"}
    # Extensions that count as a raw equivalent of a processed file.
    raw_video_exts = {".mov", ".mp4", ".avi", ".mkv", ".m4v"}
    processed_dir = narration_dir / "processed"
    raw_dir = narration_dir / "raw_mov"
    processed_dir.mkdir(parents=True, exist_ok=True)
    raw_dir.mkdir(parents=True, exist_ok=True)

    # Load / create narration.json
    narration_json_path = narration_dir / "narration.json"
    json_existed = narration_json_path.exists()
    existing_narration: dict = _read_json(narration_json_path) if json_existed else {}
    default_filters = config.default_filters if config else {}
    added_count = 0

    def _scan(directory: Path) -> list[Path]:
        """Return the visible video files directly inside ``directory``, sorted."""
        if not directory.exists():
            return []
        return sorted(
            f
            for f in directory.iterdir()
            if f.is_file()
            and f.suffix.lower() in video_extensions
            and not f.name.startswith(".")
        )

    raw_files = _scan(raw_dir)
    # Stems with a raw recording. Compared on the lower-cased extension so
    # 'seg01.MOV' is recognised on case-sensitive filesystems too.
    raw_stems = {f.stem for f in raw_files if f.suffix.lower() in raw_video_exts}

    # 1. Scan processed/ — only add entries when NO raw_mov equivalent exists.
    #    If raw_mov has the source, step 2 will create the entry pointing there
    #    (with the filter chain), which is better for re-processing later.
    for video_file in _scan(processed_dir):
        # Strip _processed suffix for cleaner segment IDs if present
        segment_id = video_file.stem.removesuffix("_processed")
        if segment_id in existing_narration:
            if verbose:
                print(f" Skipping {segment_id} (already exists)")
            continue
        if segment_id in raw_stems:
            continue
        existing_narration[segment_id] = {
            "source_file": f"processed/{video_file.name}",
            "use_audio_channels": "auto",
            "defer_loudnorm": True,
        }
        added_count += 1
        print(f" Added narration segment: {segment_id} (from processed/)")

    # 2. Scan raw_mov/ — add entries for raw files not yet in narration.json
    for video_file in raw_files:
        segment_id = video_file.stem
        if segment_id in existing_narration:
            if verbose:
                print(f" Skipping {segment_id} (already exists)")
            continue
        narration_entry = {
            "source_file": f"raw_mov/{video_file.name}",
            "output_file": f"processed/{video_file.stem}_processed.mov",
        }
        if "talkinghead" in default_filters:
            narration_entry["cutout"] = "talkinghead"
            narration_entry["filter"] = "talkinghead"
        narration_entry["use_audio_channels"] = "auto"
        narration_entry["defer_loudnorm"] = True
        existing_narration[segment_id] = narration_entry
        added_count += 1
        print(f" Added narration segment: {segment_id} (from raw_mov)")

    if added_count > 0 or not json_existed:
        with open(narration_json_path, "w", encoding="utf-8") as f:
            json.dump(existing_narration, f, indent=2)

    if added_count > 0:
        print(f" Updated narration.json (+{added_count} segments)")
    elif not json_existed:
        # Only claim "created" when the file was really created by this run,
        # and point at the folder that actually exists (raw_mov/, not raw/).
        print(" narration.json created (empty — add files to processed/ or raw_mov/)")
    else:
        print(" No new narration segments to add")
def _write_youtube_meta(
    project_path: Path, config, citations: list[str]
) -> None:
    """Create youtube_meta.txt from the project description and citations.

    Args:
        project_path: Project directory; the file is written directly inside it.
        config: Project config object (may be None). Only its ``description``
            attribute is read.
        citations: Citation strings, emitted as a 1-based numbered list.

    The file is always (re)written, even when both sections are empty.
    """
    sections: list[str] = []

    description = config.description if config else None
    if description:
        sections += ["== Description ==", description, ""]

    if citations:
        sections.append("== References ==")
        sections.extend(f"{number}. {ref}" for number, ref in enumerate(citations, 1))
        sections.append("")

    target = project_path / "youtube_meta.txt"
    target.write_text("\n".join(sections), encoding="utf-8")
    print(f" Wrote {target.name} ({len(citations)} reference(s))")
def _import_presenter_notes(
    project_path: Path, keynote_file: Path, verbose: bool, config=None
) -> None:
    """Pull presenter notes out of a Keynote deck into manuscript.txt.

    The extraction is delegated to the JXA script ``extract_keynote_notes.js``
    (located next to this module), run through ``osascript``. The script is
    also handed a slides directory, ``media/slides/{project_name}/``, as its
    second argument.

    Each slide becomes an ``[S<index>]`` header followed by its notes.
    ``[cite:...]`` markers are removed from the notes; their texts are
    collected (first occurrence only, in order) and passed to
    ``_write_youtube_meta`` together with ``config``.

    An existing manuscript.txt is copied to a timestamped ``.bak`` file before
    being overwritten. On any failure (no osascript, missing script, non-zero
    exit, invalid JSON) a message is printed to stderr and the function
    returns without writing the manuscript.
    """
    # osascript only exists on macOS; bail out quietly elsewhere.
    if shutil.which("osascript") is None:
        print(
            f" Warning: osascript not available (not macOS) — skipping Keynote import for {keynote_file.name}.",
            file=sys.stderr,
        )
        return

    print(f" Extracting presenter notes from {keynote_file.name}...")

    # The extractor script ships alongside this module.
    jxa_script = Path(__file__).parent / "extract_keynote_notes.js"
    if not jxa_script.exists():
        print(f" Error: JXA script not found at {jxa_script}", file=sys.stderr)
        return

    # Keep a timestamped copy of the current manuscript before replacing it.
    manuscript_path = project_path / "manuscript.txt"
    if manuscript_path.exists():
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = project_path / f"manuscript.txt.{stamp}.bak"
        shutil.copy2(manuscript_path, backup_path)
        if verbose:
            print(f" Backed up manuscript.txt to {backup_path.name}")

    # Lowercased project name keeps the path stable on case-sensitive filesystems.
    slides_dir = project_path / "media" / "slides" / project_path.name.lower()
    print(f" Exporting slides to {slides_dir}...")

    command = [
        "osascript",
        "-l",
        "JavaScript",
        str(jxa_script),
        str(keynote_file.resolve()),
        str(slides_dir.resolve()),
    ]
    proc = subprocess.run(command, capture_output=True, text=True)
    if proc.returncode != 0:
        print(" Error extracting presenter notes:", file=sys.stderr)
        print(f" {proc.stderr}", file=sys.stderr)
        return

    # The script prints a JSON array; empty output is treated as "no slides".
    try:
        notes_data = json.loads(proc.stdout) if proc.stdout.strip() else []
    except json.JSONDecodeError as e:
        print(f" Error parsing notes JSON: {e}", file=sys.stderr)
        return

    cite_pattern = re.compile(r"\[cite:([^\]]+)\]")
    manuscript_lines = []
    citations: list[str] = []
    known_citations: set[str] = set()

    for slide in notes_data:
        slide_index = slide.get("slide_index")
        slide_notes = (slide.get("notes") or "").rstrip()
        manuscript_lines.append(f"[S{slide_index}]")

        # splitlines() on empty notes yields nothing, so no extra guard is needed.
        for raw_line in slide_notes.splitlines():
            for captured in cite_pattern.findall(raw_line):
                cite_text = captured.strip()
                if cite_text in known_citations:
                    continue
                known_citations.add(cite_text)
                citations.append(cite_text)
            stripped_line = cite_pattern.sub("", raw_line).strip()
            if stripped_line:
                manuscript_lines.append(stripped_line)

        manuscript_lines.append("")  # blank line between slides

    # Normalise to Unix newlines and end with exactly one trailing newline.
    body = "\n".join(manuscript_lines).rstrip() + "\n"
    body = body.replace("\r\n", "\n").replace("\r", "\n")
    manuscript_path.write_text(body, encoding="utf-8")
    print(f" Wrote {manuscript_path} ({len(notes_data)} slides)")

    _write_youtube_meta(project_path, config, citations)

    if verbose:
        for i, cite in enumerate(citations, 1):
            print(f" {i}. {cite}")
        non_empty = len([slide for slide in notes_data if slide.get("notes")])
        print(f" {non_empty} slides have presenter notes")
# =============================================================================
# Tasks File
# =============================================================================
# Manuscript marker prefixes that reference a video, mapped to the prefix
# length (the offset at which the video id starts inside the marker).
# Lengths are derived from the prefixes so the two can never drift apart.
_TASKS_VIDEO_PREFIXES = {
    prefix: len(prefix)
    for prefix in (
        "video:",
        "vft:",
        "vfb:",
        "vf2t:",
        "vf2b:",
        "vst:",
        "vsb:",
        "vftp:",
        "vfbp:",
        "vf2tp:",
        "vf2bp:",
        "vstp:",
        "vsbp:",
        "narration:",
    )
}
def _collect_missing_video_markers(
    markers: list[str], videos: dict
) -> list[tuple[str, str]]:
    """List video markers whose id has no entry in ``videos``.

    A marker counts as a video marker when it starts with one of the prefixes
    in ``_TASKS_VIDEO_PREFIXES``; the video id is whatever follows the prefix.

    Returns:
        ``(marker_text, video_id)`` pairs in first-seen order, one per distinct
        missing id (the first marker referencing that id is reported).
    """
    reported: set[str] = set()
    result: list[tuple[str, str]] = []

    for marker in markers:
        # Find the first known prefix for this marker; skip non-video markers.
        for prefix, id_offset in _TASKS_VIDEO_PREFIXES.items():
            if marker.startswith(prefix):
                break
        else:
            continue

        video_id = marker[id_offset:]
        if video_id in videos or video_id in reported:
            continue
        reported.add(video_id)
        result.append((marker, video_id))

    return result
def _write_tasks_file(
    project_path: Path,
    missing_videos: list[tuple[str, str]],
    alignment_issues: list[tuple[str, str]],
) -> None:
    """Generate tasks.md, a Markdown checklist of outstanding project work.

    Args:
        project_path: Project directory; tasks.md is written inside it.
        missing_videos: ``(marker_text, video_id)`` pairs for videos referenced
            in the manuscript but not defined in videos.json.
        alignment_issues: ``(marker_id, context)`` pairs for slide markers that
            could not be matched to the narration.

    The file is always rewritten; with nothing to report it contains a
    "no outstanding tasks" note.
    """
    generated_on = datetime.now().strftime("%Y-%m-%d")
    doc = [f"# Tasks: {project_path.name}", f"_Generated: {generated_on}_", ""]

    if missing_videos:
        doc.append(f"## Missing Video Assets ({len(missing_videos)})")
        doc.append("Referenced in manuscript.txt but not defined in videos.json.")
        doc.append("")
        doc.extend(
            f"- [ ] `{video_id}` — referenced as `[{marker}]`"
            for marker, video_id in missing_videos
        )
        doc.append("")

    if alignment_issues:
        doc.append(f"## Slide Alignment Issues ({len(alignment_issues)})")
        doc.append(
            "Slide markers that could not be matched to the spoken narration (likely adlibbed)."
        )
        doc.append("")
        doc.extend(
            f'- [ ] `{marker_id}` — _"{context}"_'
            for marker_id, context in alignment_issues
        )
        doc.append("")

    if not (missing_videos or alignment_issues):
        doc.extend(["_No outstanding tasks._", ""])

    (project_path / "tasks.md").write_text("\n".join(doc), encoding="utf-8")

    # One-line console summary; each count is mentioned only when non-zero.
    summary = " Tasks written → tasks.md"
    if missing_videos:
        summary += f" ({len(missing_videos)} missing videos)"
    if alignment_issues:
        summary += f" ({len(alignment_issues)} alignment issues)"
    print(summary)
# =============================================================================
# Pexels Download Command
# =============================================================================
def cmd_pexels(project_path: Path, verbose: bool) -> int:
    """Fetch Pexels videos the manuscript needs and fill in missing metadata.

    Steps:
      1. Download every video reported by ``find_missing_pexels_videos`` and
         record its metadata in both the project's and the shared videos.json.
      2. Call ``enrich_missing_descriptions`` on the shared assets directory.

    ``verbose`` is accepted for interface symmetry with the other commands but
    is not used here.

    Returns:
        0 on success; 1 when the API key or shared_assets directory is missing,
        or when at least one download failed.
    """
    from .parser import parse_manuscript, parse_project_config, parse_videos
    from .pexels import (
        get_pexels_api_key,
        find_missing_pexels_videos,
        download_video,
        update_videos_json,
        enrich_missing_descriptions,
    )

    api_key = get_pexels_api_key()
    if not api_key:
        print(
            "Error: Pexels API key not configured.\n"
            "Add to ~/.gnommo.conf:\n"
            " [pexels]\n"
            " api_key = YOUR_KEY_HERE\n"
            "Get a free key at https://www.pexels.com/api/",
            file=sys.stderr,
        )
        return 1

    config = parse_project_config(project_path)
    _, markers, _, _ = parse_manuscript(project_path)
    videos, _ = parse_videos(project_path, config)

    shared_assets_dir = _find_shared_assets(project_path)
    if not shared_assets_dir:
        print("Error: shared_assets directory not found.", file=sys.stderr)
        return 1

    # Every successful download is recorded in both of these files.
    json_targets = (
        project_path / config.videos_path,
        shared_assets_dir / "videos.json",
    )

    # 1. Download missing files
    missing = find_missing_pexels_videos(markers, videos, shared_assets_dir)
    failed = 0
    if not missing:
        print("No missing Pexels videos.")
    else:
        total = len(missing)
        print(f"Downloading {total} missing Pexels video(s)...")
        for video_id, source_file in missing:
            meta = download_video(source_file, shared_assets_dir, api_key)
            if meta is None:
                failed += 1
            else:
                for target in json_targets:
                    update_videos_json(target, video_id, meta)
        if failed:
            print(f"\n {failed}/{total} download(s) failed.")
        else:
            print(f"\n {total} video(s) downloaded.")

    # 2. Enrich descriptions for existing files that have none
    enrich_missing_descriptions(shared_assets_dir, api_key)

    return 1 if failed else 0
# =============================================================================
# Validate Command
# =============================================================================
def cmd_validate(project_path: Path, verbose: bool) -> int:
    """Check a project's configuration files and refresh tasks.md.

    Parses the manuscript, project config, slides and videos, runs
    ``validate_project`` and prints every warning it returns. tasks.md is then
    rewritten with the videos referenced by the manuscript but absent from
    videos.json (no alignment data exists at validate time).

    Returns:
        1 when project.json is missing, otherwise 0 (warnings do not change
        the exit code).
    """
    from .parser import (
        parse_manuscript,
        parse_project_config,
        parse_slides,
        parse_videos,
    )
    from .validator import validate_project

    print(f"Validating: {project_path.name}")

    project_json = project_path / "project.json"
    if not project_json.exists():
        print(f"Error: project.json not found in {project_path}", file=sys.stderr)
        return 1

    # Load everything the validator needs.
    _text, markers, malformed, _rest = parse_manuscript(project_path)
    config = parse_project_config(project_path)
    slides = parse_slides(project_path, config)
    videos, videos_dir = parse_videos(project_path, config)

    if verbose:
        print(f" - Markers in manuscript: {len(markers)}")
        print(f" - Slides defined: {len(slides)}")
        print(f" - Videos defined: {len(videos)}")

    found_warnings = validate_project(
        project_path, markers, config, slides, videos, videos_dir, malformed
    )
    for warning in found_warnings:
        print(f" Warning: {warning}")

    # Tasks file lists missing assets only at this stage.
    _write_tasks_file(
        project_path,
        _collect_missing_video_markers(markers, videos),
        alignment_issues=[],
    )

    print("Validation passed.")
    return 0
# =============================================================================
# Preprocess Command
# =============================================================================
def _resolve_process_cache(project_path: Path, config) -> Optional[Path]:
    """Locate this project's directory inside the configured process cache.

    Returns None when ``config`` is falsy or has no ``process_cache`` value.
    A relative ``process_cache`` is resolved against ``project_path``. The
    result is always ``<cache base>/<project directory name>``.
    """
    configured = config.process_cache if config else None
    if not configured:
        return None

    cache_base = Path(configured)
    if cache_base.is_absolute():
        return cache_base / project_path.name
    return (project_path / cache_base).resolve() / project_path.name
def _narration_combined_hint(project_path: Path, config) -> str:
    """Explain to the user why narration_combined.mov could not be found.

    Two storage locations are checked, in this order: the cache base returned
    by ``load_cache_config()`` and the project's ``process_cache`` (resolved
    against ``project_path`` when relative). If either is configured but does
    not exist, the hint reports the first such path as a disconnected disk —
    running stitch would not help in that case. Otherwise the hint suggests
    running the stitch command.
    """
    from .cache import load_cache_config

    # Collect configured storage locations in priority order.
    configured_locations = []

    global_cache = load_cache_config()
    if global_cache is not None:
        configured_locations.append(global_cache)

    process_cache = config.process_cache if config else None
    if process_cache:
        process_cache_path = Path(process_cache)
        if not process_cache_path.is_absolute():
            process_cache_path = (project_path / process_cache_path).resolve()
        configured_locations.append(process_cache_path)

    first_missing = next(
        (location for location in configured_locations if not location.exists()),
        None,
    )
    if first_missing is None:
        return "Run 'gnommo -p <project> stitch' first."
    return (
        f"External disk not connected (expected at {first_missing}).\n"
        "Connect the disk and try again."
    )
def _resolve_narration_combined(
    project_path: Path, videos_dir: Path, config
) -> Optional[Path]:
    """Return the first existing narration_combined.mov, or None.

    Lookup order:
      1. ``videos_dir`` itself,
      2. the path produced by ``resolve_with_cache`` for that local path,
      3. ``<process cache>/media/videos/`` when a process cache is configured.
    """
    filename = "narration_combined.mov"

    local_copy = videos_dir / filename
    if local_copy.exists():
        return local_copy

    cached_copy, _ignored = resolve_with_cache(local_copy, project_path)
    if cached_copy.exists():
        return cached_copy

    process_cache_root = _resolve_process_cache(project_path, config)
    if not process_cache_root:
        return None

    candidate = process_cache_root / "media" / "videos" / filename
    return candidate if candidate.exists() else None
def _preprocess_filtered_videos(
    project_path: Path,
    config,
    verbose: bool,
    dry_run: bool,
    force: bool,
    gnommo_scratch: Optional[Path],
    res: str,
) -> None:
    """Preprocess non-shared videos.json entries that declare a filter chain.

    Entries whose output file already exists are skipped unless ``force`` is
    set. With ``dry_run`` the work is only listed, never executed.
    """
    from .parser import parse_videos
    from .preprocessor import preprocess_video

    videos, videos_dir = parse_videos(project_path, config)
    videos_to_process = [
        (vid_id, vid_src)
        for vid_id, vid_src in videos.items()
        if vid_src.filter and not vid_src.is_shared
    ]
    if not videos_to_process:
        return

    print(f"\n Processing {len(videos_to_process)} video(s) from videos.json:")
    for video_id, video_source in videos_to_process:
        if video_source.output_file:
            output_path = videos_dir / video_source.output_file
            if output_path.exists() and not force:
                print(
                    f" {video_id}: output exists, skipping (use --force to reprocess)"
                )
                continue
        if dry_run:
            print(
                f" Would preprocess: {video_id} ({len(video_source.filter)} filter(s))"
            )
            continue
        print(f" Processing: {video_id}")
        preprocess_video(
            videos_dir,
            video_id,
            video_source,
            verbose,
            force,
            gnommo_scratch,
            res=res,
        )


def cmd_preprocess(
    project_path: Path,
    verbose: bool,
    dry_run: bool,
    force: bool = False,
    workers: int = 1,
    res: str = "full",
) -> int:
    """Run preprocessing pipeline on narration segments and videos.

    Discovers source files directly from raw_mov/ (preferred) or raw_mp4/
    (fallback when raw_mov/ is empty). Does NOT require narration.json to
    exist — it writes/updates narration.json after processing.

    After the narration step, filtered videos from videos.json are processed
    as well. This also happens in dry-run mode and when every narration
    segment is already up to date; previously both cases returned early and
    never reached the videos.json step.

    Args:
        project_path: Project directory.
        verbose: Passed to ``preprocess_video`` for sequential processing.
        dry_run: Only list what would be processed; narration.json is not
            written.
        force: Reprocess even when the output file already exists.
        workers: Number of threads for narration segments; values above 1
            enable parallel processing when more than one segment is pending.
        res: Resolution key; anything other than "full" that is present in
            ``RES_CONFIGS`` writes into that config's subdirectory.

    Returns:
        1 when no 'talkinghead' filter is configured or no source files are
        found; 0 otherwise.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed
    from .parser import parse_project_config
    from .preprocessor import preprocess_video, RES_CONFIGS
    from .models import VideoSource as _VideoSource

    mode_str = f" ({res.upper()})" if res != "full" else ""
    print(f"Preprocessing narration: {project_path.name}{mode_str}")
    config = parse_project_config(project_path)

    # Narration directory — source files always in project media/narration/
    narration_dir = project_path / "media" / "narration"
    narration_dir.mkdir(parents=True, exist_ok=True)
    raw_dir = narration_dir / "raw_mov"
    compressed_dir = narration_dir / "raw_mp4"

    # process_cache: write processed outputs to external disk to save laptop space
    cache_root = _resolve_process_cache(project_path, config)
    if cache_root:
        # Mirror the project's media/ structure so GnommoCache (resolve_with_cache)
        # finds these files transparently during render/stitch.
        cache_narration_dir = cache_root / "media" / "narration"
        cache_narration_dir.mkdir(parents=True, exist_ok=True)
        (cache_narration_dir / "processed").mkdir(parents=True, exist_ok=True)
        print(f" Using process cache: {cache_root}")
    else:
        cache_narration_dir = None

    # All processed outputs are written relative to this directory.
    output_base = cache_narration_dir or narration_dir
    (output_base / "processed").mkdir(parents=True, exist_ok=True)

    # Resolve intermediate directory
    gnommo_scratch = None
    if config.gnommo_scratch:
        gnommo_scratch = Path(config.gnommo_scratch)
        if not gnommo_scratch.is_absolute():
            gnommo_scratch = project_path / gnommo_scratch
        print(f" Using intermediate dir: {gnommo_scratch}")

    # --- Filter pipeline ---
    talkinghead_filter = (config.default_filters or {}).get("talkinghead", [])
    if not talkinghead_filter:
        print(
            " ERROR: No 'talkinghead' filter defined in project.json default_filters."
        )
        print(" Add a 'talkinghead' entry under 'default_filters' in project.json.")
        return 1

    # --- Source discovery ---
    _video_exts = {".mov", ".mp4", ".avi", ".mkv", ".m4v"}

    def _scan_dir(d: Path) -> list[Path]:
        if not d.exists():
            return []
        return sorted(
            f
            for f in d.iterdir()
            if f.is_file()
            and f.suffix.lower() in _video_exts
            and not f.name.startswith(".")
        )

    raw_mov_files = _scan_dir(raw_dir)
    raw_mp4_files = _scan_dir(compressed_dir)
    if raw_mov_files:
        source_files = raw_mov_files
        using_compressed = False
    elif raw_mp4_files:
        source_files = raw_mp4_files
        using_compressed = True
        print(
            " WARNING: raw_mov/ is empty — using compressed files from raw_mp4/ instead. Quality may be reduced."
        )
    else:
        print(f" No source files found in raw_mov/ or raw_mp4/.")
        print(f" Place .mov recordings in {raw_dir}")
        return 1

    # --- Load existing narration.json to preserve per-segment settings ---
    narration_json_path = narration_dir / "narration.json"
    existing_narration: dict = {}
    if narration_json_path.exists():
        existing_narration = _read_json(narration_json_path)

    # For non-full res, write into the res subdir so stitch --res low finds the
    # files at narration/low/processed/ (narration.json still records the plain
    # "processed/..." path; stitch shifts the base dir itself).
    res_cfg = RES_CONFIGS.get(res) if res != "full" else None
    if res_cfg:
        _, _, res_subdir = res_cfg
        output_prefix = f"{res_subdir}/processed"
    else:
        output_prefix = "processed"

    # --- Build segments list ---
    segments_to_process: list[tuple[str, _VideoSource]] = []
    skipped_count = 0
    for source_file in source_files:
        segment_id = source_file.stem
        # Strip _compressed suffix (raw_mp4 naming convention)
        if using_compressed and segment_id.endswith("_compressed"):
            segment_id = segment_id[: -len("_compressed")]

        # When process_cache is set, output goes to the cache dir; narration.json
        # still records the relative path so stitch (also using cache) can find it.
        output_file = f"{output_prefix}/{segment_id}_processed.mov"
        if (output_base / output_file).exists() and not force:
            print(f" {segment_id}: output exists, skipping (use --force to reprocess)")
            skipped_count += 1
            continue

        # Filter: from existing narration.json entry (if explicitly set), else talkinghead
        existing_entry = existing_narration.get(segment_id, {})
        raw_filter = existing_entry.get("filter")
        if not raw_filter:
            filter_list = talkinghead_filter
        elif isinstance(raw_filter, str):
            # A string names an entry in default_filters; unknown names fall
            # back to the talkinghead chain.
            filter_list = (config.default_filters or {}).get(
                raw_filter, talkinghead_filter
            )
        else:
            filter_list = raw_filter

        video_source = _VideoSource(
            source_file=source_file,
            filter=filter_list,
            output_file=output_file,
            use_audio_channels=existing_entry.get("use_audio_channels", "auto"),
            defer_loudnorm=existing_entry.get("defer_loudnorm", True),
        )
        segments_to_process.append((segment_id, video_source))

    if not segments_to_process:
        # Nothing to do for narration; still fall through to videos.json below.
        if skipped_count:
            print(
                f"\n All {skipped_count} segment(s) already preprocessed. Use --force to reprocess."
            )
        else:
            print("\n No segments to preprocess.")
    elif dry_run:
        for segment_id, segment_source in segments_to_process:
            print(f"\n Would preprocess: {segment_id}")
            print(f" Source: {segment_source.source_file}")
            print(f" Output: {segment_source.output_file}")
            print(f" Filters: {len(segment_source.filter)} step(s)")
    else:
        # --- Process segments ---
        # A segment counts as processed only if its output file exists afterwards.
        successfully_processed: list[tuple[str, _VideoSource]] = []
        if workers > 1 and len(segments_to_process) > 1:
            num_workers = min(workers, len(segments_to_process))
            print(
                f"\n Processing {len(segments_to_process)} segments in parallel ({num_workers} workers)"
            )

            def process_segment_task(task):
                seg_id, seg_source = task
                preprocess_video(
                    output_base,
                    seg_id,
                    seg_source,
                    verbose=False,
                    force=force,
                    custom_gnommo_scratch=gnommo_scratch,
                    res=res,
                )
                return task

            completed = 0
            with ThreadPoolExecutor(max_workers=num_workers) as executor:
                futures = {
                    executor.submit(process_segment_task, t): t
                    for t in segments_to_process
                }
                for future in as_completed(futures):
                    # result() re-raises any exception from the worker thread.
                    seg_id, seg_source = future.result()
                    completed += 1
                    print(
                        f" Completed: {seg_id} ({completed}/{len(segments_to_process)})"
                    )
                    if (output_base / seg_source.output_file).exists():
                        successfully_processed.append((seg_id, seg_source))
        else:
            for segment_id, segment_source in segments_to_process:
                output_path = output_base / segment_source.output_file
                print(f"\n Processing: {segment_id}")
                print(f" Source: {segment_source.source_file}")
                print(f" Output: {output_path}")
                print(f" Filters: {len(segment_source.filter)} step(s)")
                preprocess_video(
                    output_base,
                    segment_id,
                    segment_source,
                    verbose,
                    force,
                    gnommo_scratch,
                    res=res,
                )
                if output_path.exists():
                    successfully_processed.append((segment_id, segment_source))

        # --- Update narration.json ---
        # Write processed segments; preserve any existing per-segment settings (skip/take/etc.)
        _PRESERVE_KEYS = (
            "skip",
            "take",
            "begin",
            "end",
            "cutout",
            "use_audio_channels",
            "defer_loudnorm",
            "volume",
            "zoom",
        )
        for segment_id, _segment_source in successfully_processed:
            existing_entry = existing_narration.get(segment_id, {})
            # Preserve settings the user may have set (trim points, cutout, etc.)
            entry: dict = {
                key: existing_entry[key]
                for key in _PRESERVE_KEYS
                if key in existing_entry
            }
            # Always record the plain path; stitch shifts the base dir for low/tiny.
            entry["source_file"] = f"processed/{segment_id}_processed.mov"
            entry.setdefault("use_audio_channels", "auto")
            entry.setdefault("defer_loudnorm", True)
            existing_narration[segment_id] = entry

        with open(narration_json_path, "w", encoding="utf-8") as f:
            json.dump(existing_narration, f, indent=2)
        if successfully_processed:
            print(
                f"\n Updated narration.json ({len(successfully_processed)} segment(s))"
            )
        print(
            f"\n Run 'gnommo -p <project> stitch' to stitch narration segments into one full length narration file."
        )

    # Also preprocess videos from videos.json (e.g. chroma key, color grade)
    _preprocess_filtered_videos(
        project_path, config, verbose, dry_run, force, gnommo_scratch, res
    )

    print("\nPreprocessing complete.")
    return 0
# =============================================================================
# Trim Command — auto-detect silence bounds for narration segments
# =============================================================================
def cmd_trim(
    project_path: Path,
    verbose: bool,
    force: bool = False,
    threshold_db: float = -40.0,
    res: str = "full",
) -> int:
    """
    Auto-detect silence bounds for all narration segments and write skip/take
    values into narration.json.

    For each segment:
        skip = max(0, first_sound_time - 0.5)
        take = last_sound_time + 3.0 - skip (capped at file duration)

    Segments that already have explicit skip or take values are left unchanged
    unless --force is passed. Segments that have no entry in the raw
    narration.json are skipped with a message instead of aborting the run
    (which would lose every trim computed so far).

    Use --threshold to adjust sensitivity, e.g. -25 to ignore clothing/room
    noise that sits above -40 dB.

    ``res`` is accepted for interface symmetry with other commands and is not
    used by this function.

    Returns:
        1 when narration.json has no segments, otherwise 0.
    """
    from .parser import parse_project_config, parse_narration
    from .preprocessor import detect_silence_bounds, get_video_duration

    print(f"Auto-trimming narration: {project_path.name}")
    config = parse_project_config(project_path)
    narration, narration_dir = parse_narration(project_path, config)
    if not narration:
        print(" No narration segments found in narration.json")
        print(" Run 'gnommo -p <project> import' first.")
        return 1

    # Build a lookup of raw source files by segment ID. Raw files give cleaner
    # silence detection — loudnorm can introduce early peaks in processed audio.
    _video_exts = {".mov", ".mp4", ".avi", ".mkv", ".m4v"}
    raw_dir = narration_dir / "raw_mov"
    compressed_dir = narration_dir / "raw_mp4"
    raw_lookup: dict[str, Path] = {}
    for search_dir in (raw_dir, compressed_dir):
        if not search_dir.exists():
            continue
        for f in search_dir.iterdir():
            if (
                f.is_file()
                and f.suffix.lower() in _video_exts
                and not f.name.startswith(".")
            ):
                stem = f.stem
                if stem.endswith("_compressed"):
                    stem = stem[: -len("_compressed")]
                raw_lookup[stem] = f

    narration_json_path = narration_dir / "narration.json"
    raw_data: dict = _read_json(narration_json_path)

    updated = 0
    for seg_id in sorted(narration.keys()):
        seg = narration[seg_id]

        # skip/take are written back into the raw JSON entry, so it must exist.
        # Check before the (slow) audio analysis rather than crashing after it.
        entry = raw_data.get(seg_id)
        if not isinstance(entry, dict):
            print(f" {seg_id}: no entry in narration.json, skipping")
            continue

        has_explicit = "skip" in entry or "take" in entry
        if has_explicit and not force:
            print(f" {seg_id}: already trimmed, skipping (use --force to redo)")
            continue

        # Prefer raw file; fall back to processed if raw not available.
        source_path = raw_lookup.get(seg_id)
        if source_path is None:
            source_path = narration_dir / seg.source_file
        if not source_path.exists():
            print(f" {seg_id}: source file not found, skipping")
            continue

        print(
            f" {seg_id}: analysing {source_path.parent.name}/{source_path.name}...",
            end="",
            flush=True,
        )
        first_sound, last_sound = detect_silence_bounds(
            source_path, noise_threshold_db=threshold_db, verbose=verbose
        )
        total_dur = get_video_duration(source_path)

        # Half a second of lead-in before the first sound, three seconds of
        # tail after the last one, never extending past the end of the file.
        new_skip = max(0.0, round(first_sound - 0.5, 3))
        new_take = round(min(total_dur - new_skip, last_sound + 3.0 - new_skip), 3)
        new_take = max(0.0, new_take)
        print(
            f" first={first_sound:.2f}s last={last_sound:.2f}s"
            f" → skip={new_skip:.3f}s take={new_take:.3f}s"
        )
        entry["skip"] = new_skip
        entry["take"] = new_take
        updated += 1

    if updated > 0:
        with open(narration_json_path, "w", encoding="utf-8") as f:
            json.dump(raw_data, f, indent=2)
        print(f"\n Updated {updated} segment(s) in narration.json")
    else:
        print(f"\n No segments updated")
    return 0
# =============================================================================
# Transcode Command — compress narration folder to H.265
# =============================================================================
def _get_video_codec(path: Path) -> str:
    """Ask ffprobe for the codec of the first video stream of ``path``.

    Returns ffprobe's stdout, stripped and lower-cased (e.g. 'hevc',
    'prores', 'h264'). The exit status is not checked, so the result is an
    empty string when ffprobe prints nothing.
    """
    probe_cmd = ["ffprobe", "-v", "error"]
    probe_cmd += ["-select_streams", "v:0"]
    probe_cmd += ["-show_entries", "stream=codec_name"]
    # Bare value only: no section wrappers, no "codec_name=" key.
    probe_cmd += ["-of", "default=noprint_wrappers=1:nokey=1"]
    probe_cmd.append(str(path))

    probe = subprocess.run(probe_cmd, capture_output=True, text=True)
    codec_name = probe.stdout.strip()
    return codec_name.lower()
def _transcode_processed_files(
    project_path: Path,
    verbose: bool,
    dry_run: bool,
    replace: bool,
    force: bool,
    alpha_quality: float,
) -> int:
    """
    Compress _processed.mov files (ProRes 4444 + alpha) to HEVC+alpha via
    Apple VideoToolbox.

    For each _processed.mov:
      1. Transcode to a temp file using hevc_videotoolbox with alpha.
      2. Without --replace: move the original into a prores/ subdirectory next
         to it. With --replace: delete the original.
      3. Give the compressed file the original _processed.mov name so
         stitch/render find it unchanged.

    Directories scanned (non-recursively, so prores/ archives are never
    candidates): the narration directory, its processed/ subdirectory (where
    the preprocess command writes its outputs), the directory containing
    videos.json, and — when a process cache is configured — the cache's
    media/narration/processed/ directory.

    Files already encoded as HEVC are always skipped. Without --replace, a
    file whose prores/<filename> archive already exists is skipped unless
    --force is given.

    Returns:
        Always 0; per-file ffmpeg failures are reported and skipped.
    """
    from .parser import parse_project_config, parse_narration

    print(f"Transcoding processed files (HEVC+alpha): {project_path.name}")
    config = parse_project_config(project_path)

    _narration, narration_dir = parse_narration(project_path, config)
    videos_dir = (project_path / config.videos_path).parent

    # Preprocess writes narration outputs to processed/ (optionally on the
    # process cache), so a top-level glob of narration_dir alone misses them.
    search_dirs = [narration_dir, narration_dir / "processed", videos_dir]
    cache_root = _resolve_process_cache(project_path, config)
    if cache_root:
        search_dirs.append(cache_root / "media" / "narration" / "processed")

    candidates: list[Path] = []
    seen: set[Path] = set()
    for directory in search_dirs:
        if not directory.exists():
            continue
        for path in directory.glob("*_processed.mov"):
            if path in seen or "_prores" in path.stem or not path.exists():
                continue
            seen.add(path)
            candidates.append(path)

    if not candidates:
        print(" No _processed.mov files found.")
        return 0

    # Smallest first
    candidates.sort(key=lambda f: f.stat().st_size)

    total_original = 0
    total_compressed = 0
    transcoded = 0
    skipped = 0
    for src in candidates:
        # Archive goes into prores/ subdirectory alongside the source file
        prores_dir = src.parent / "prores"
        archive = prores_dir / src.name

        # Always skip files already encoded as HEVC — regardless of --replace or --force
        if _get_video_codec(src) == "hevc":
            print(f" {src.name}: already HEVC, skipping")
            skipped += 1
            continue

        # Without --replace, skip if the archive already exists in prores/
        if not replace and archive.exists() and not force:
            size_mb = src.stat().st_size / 1_048_576
            print(
                f" {src.name}: already compressed ({size_mb:.1f} MB), skipping (use --force to redo)"
            )
            skipped += 1
            continue

        # Capture the size now; src is renamed or deleted after a successful run.
        src_size = src.stat().st_size
        src_mb = src_size / 1_048_576
        print(f" {src.name} ({src_mb:.1f} MB) → HEVC+alpha", end="")
        if dry_run:
            print(" [dry-run]")
            continue
        print(" ...", end="", flush=True)

        tmp_out = src.with_name(src.stem + "_hevc_tmp.mov")
        cmd = [
            "ffmpeg",
            "-i",
            str(src),
            "-c:v",
            "hevc_videotoolbox",
            "-allow_sw",
            "1",
            "-alpha_quality",
            str(alpha_quality),
            "-tag:v",
            "hvc1",
            "-c:a",
            "copy",
            "-y",
            str(tmp_out),
        ]
        if verbose:
            print()
            print(" " + " ".join(cmd))
        result = subprocess.run(
            cmd,
            capture_output=not verbose,
            text=True,
        )
        if result.returncode != 0:
            print(f"\n ERROR transcoding {src.name}")
            # Never leave a partial temp file behind.
            if tmp_out.exists():
                tmp_out.unlink()
            if not verbose and result.stderr:
                for line in result.stderr.strip().splitlines()[-5:]:
                    print(f" {line}", file=sys.stderr)
            continue

        out_size = tmp_out.stat().st_size
        out_mb = out_size / 1_048_576
        ratio = (1.0 - out_size / src_size) * 100 if src_size else 0.0
        if replace:
            # Atomically overwrite the ProRes original with the compressed file,
            # so there is no moment at which neither file exists.
            tmp_out.replace(src)
            print(
                f"\r {src.name} ({src_mb:.1f} MB) → HEVC+alpha"
                f" ({out_mb:.1f} MB, -{ratio:.0f}%)"
            )
        else:
            # Move ProRes original into prores/ subdirectory, compressed takes its place
            prores_dir.mkdir(exist_ok=True)
            src.rename(archive)
            tmp_out.rename(src)
            print(
                f"\r {src.name} ({src_mb:.1f} MB) → HEVC+alpha"
                f" ({out_mb:.1f} MB, -{ratio:.0f}%)"
                f" [ProRes → prores/{archive.name}]"
            )
        total_original += src_size
        total_compressed += out_size
        transcoded += 1

    print()
    if dry_run:
        print(f" [dry-run] Would compress {len(candidates) - skipped} file(s)")
        return 0
    if transcoded > 0:
        orig_mb = total_original / 1_048_576
        comp_mb = total_compressed / 1_048_576
        saved_mb = orig_mb - comp_mb
        ratio = (saved_mb / orig_mb * 100) if orig_mb else 0
        print(
            f" Compressed {transcoded} file(s): {orig_mb:.1f} MB → {comp_mb:.1f} MB"
            f" (saved {saved_mb:.1f} MB, -{ratio:.0f}%)"
        )
    if skipped:
        print(f" Skipped {skipped} already-compressed file(s)")
    return 0
def cmd_transcode(
    project_path: Path,
    verbose: bool,
    dry_run: bool = False,
    replace: bool = False,
    crf: int = 23,
    force: bool = False,
    processed: bool = False,
    alpha_quality: float = 0.75,
) -> int:
    """
    Transcode project video files to save disk space.

    Default (1st pass, before preprocess):
        Compress raw narration recordings from raw_mov/ to H.265 (scaled to
        1080 lines, AAC audio). Output: raw_mp4/{stem}.mp4.
        Files whose output already exists are skipped unless --force.
        Each file is encoded to a hidden temp file and renamed on success, so
        a failed or interrupted run never leaves a partial output that a later
        run would mistake for a finished one.
        Use --replace to delete each original after its transcode succeeded.

    With --processed (2nd pass, after preprocess):
        Compress _processed.mov files (ProRes 4444 + alpha) to HEVC+alpha;
        see ``_transcode_processed_files`` for details.
        Uses Apple VideoToolbox (hevc_videotoolbox) with --alpha-quality.

    Returns:
        1 when raw_mov/ does not exist; otherwise 0 (per-file ffmpeg failures
        are reported and skipped).
    """
    if processed:
        return _transcode_processed_files(
            project_path, verbose, dry_run, replace, force, alpha_quality
        )

    from .parser import parse_project_config, parse_narration

    print(f"Transcoding narration: {project_path.name}")
    config = parse_project_config(project_path)
    _narration, narration_dir = parse_narration(project_path, config)
    raw_dir = narration_dir / "raw_mov"
    compressed_dir = narration_dir / "raw_mp4"
    if not raw_dir.exists():
        print(f" raw/ directory not found: {raw_dir}", file=sys.stderr)
        print(f" Place raw recordings in {raw_dir} and run 'import' first.")
        return 1
    compressed_dir.mkdir(parents=True, exist_ok=True)

    # Collect eligible video files from raw/ only
    video_extensions = {".mp4", ".mov", ".avi", ".mkv", ".m4v", ".mts", ".webm"}
    candidates = [
        f
        for f in raw_dir.iterdir()
        if f.is_file()
        and f.suffix.lower() in video_extensions
        and not f.name.startswith(".")
    ]
    if not candidates:
        print(f" No video files found in {raw_dir}.")
        return 0

    # Process smallest files first
    candidates.sort(key=lambda f: f.stat().st_size)

    total_original = 0
    total_compressed = 0
    transcoded = 0
    skipped = 0
    deleted = 0
    for src in candidates:
        output = compressed_dir / f"{src.stem}.mp4"
        if output.exists() and not force:
            size_mb = output.stat().st_size / 1_048_576
            print(
                f" {src.name}: already transcoded ({size_mb:.1f} MB), skipping (use --force to redo)"
            )
            skipped += 1
            continue

        # Capture the size now; src may be deleted below when --replace is set.
        src_size = src.stat().st_size
        src_mb = src_size / 1_048_576
        print(
            f" raw/{src.name} ({src_mb:.1f} MB) → compressed/{output.name}", end=""
        )
        if dry_run:
            print(" [dry-run]")
            continue
        print(" ...", end="", flush=True)

        # Encode into a dot-file (ignored by the directory scanners in this
        # module) and move it into place only after ffmpeg succeeded.
        tmp_out = output.with_name(f".{output.stem}.partial.mp4")
        cmd = [
            "ffmpeg",
            "-i",
            str(src),
            "-vf",
            "scale=-2:1080",
            "-c:v",
            "libx265",
            "-crf",
            str(crf),
            "-preset",
            "medium",
            "-c:a",
            "aac",
            "-b:a",
            "128k",
            "-tag:v",
            "hvc1",
            "-y",
            str(tmp_out),
        ]
        if verbose:
            print()
            print(" " + " ".join(cmd))
        result = subprocess.run(
            cmd,
            capture_output=not verbose,
            text=True,
        )
        if result.returncode != 0:
            print(f"\n ERROR transcoding {src.name}")
            tmp_out.unlink(missing_ok=True)
            if not verbose and result.stderr:
                # Print last few lines of ffmpeg stderr for diagnosis
                for line in result.stderr.strip().splitlines()[-5:]:
                    print(f" {line}", file=sys.stderr)
            continue

        tmp_out.replace(output)
        out_size = output.stat().st_size
        out_mb = out_size / 1_048_576
        ratio = (1.0 - out_size / src_size) * 100 if src_size else 0.0
        print(
            f"\r raw/{src.name} ({src_mb:.1f} MB) → compressed/{output.name} ({out_mb:.1f} MB, -{ratio:.0f}%)"
        )
        total_original += src_size
        total_compressed += out_size
        transcoded += 1

        if replace:
            # Honour --replace: the original goes only after its transcode succeeded.
            src.unlink()
            deleted += 1

    print()
    if dry_run:
        print(f" [dry-run] Would transcode {len(candidates) - skipped} file(s)")
        return 0
    if transcoded > 0:
        orig_mb = total_original / 1_048_576
        comp_mb = total_compressed / 1_048_576
        saved_mb = orig_mb - comp_mb
        ratio = (saved_mb / orig_mb * 100) if orig_mb else 0
        print(
            f" Transcoded {transcoded} file(s): {orig_mb:.1f} MB → {comp_mb:.1f} MB (saved {saved_mb:.1f} MB, -{ratio:.0f}%)"
        )
        if deleted:
            print(f" Originals deleted.")
    if skipped:
        print(f" Skipped {skipped} already-transcoded file(s)")
    return 0
# =============================================================================
# Stitch Command (fast iteration on narration segments)
# =============================================================================
def cmd_stitch(
    project_path: Path,
    verbose: bool,
    force: bool = False,
    res: str = "full",
) -> int:
    """
    Stitch narration segments from narration.json.

    Reads segments from media/narration/narration.json, applies begin/end
    trimming during concatenation, and writes the result to
    ``narration_combined.mov`` in the videos directory (or in the process
    cache / downscaled sub-directory when those are in effect). Afterwards the
    ``narration_combined`` entry in videos.json is created or refreshed and
    ``cmd_transcribe`` is run so the transcript matches the new narration.

    Args:
        project_path: Root directory of the project.
        verbose: Forwarded to the downscale, stitch and import helpers.
        force: Regenerate the combined file even when it already exists.
        res: Resolution key; any value other than "full" selects the
            downscaled directories described by ``RES_CONFIGS``.

    Returns:
        1 when no narration segments are defined, otherwise the exit code
        returned by ``cmd_transcribe``.
    """
    from .parser import parse_project_config, parse_narration
    from .preprocessor import (
        stitch_narration_segments,
        ensure_downscaled_files_exist,
        RES_CONFIGS,
    )

    mode_str = f" ({res.upper()})" if res != "full" else ""
    print(f"Stitching narration: {project_path.name}{mode_str}")

    config = parse_project_config(project_path)
    narration, narration_dir = parse_narration(project_path, config)
    if not narration:
        print(" No narration segments found in media/narration/narration.json")
        print(" Run 'gnommo -p <project> import' first to populate narration.json")
        return 1

    # Get videos_dir for output
    if config and config.videos_path:
        videos_dir = (project_path / config.videos_path).parent
    else:
        videos_dir = project_path / "media" / "videos"

    # When process_cache is set, redirect processed segment reads and combined output.
    # Mirror media/ structure so GnommoCache (resolve_with_cache) finds files during render.
    cache_root = _resolve_process_cache(project_path, config)
    if cache_root:
        narration_dir = cache_root / "media" / "narration"
        narration_dir.mkdir(parents=True, exist_ok=True)
        videos_dir_out = cache_root / "media" / "videos"
        videos_dir_out.mkdir(parents=True, exist_ok=True)
        print(f" Using process cache: {cache_root}")
    else:
        videos_dir_out = videos_dir

    # Use downscaled dirs for non-full res
    if res != "full":
        cfg = RES_CONFIGS[res]
        narration_dir = ensure_downscaled_files_exist(
            narration_dir, res, force=False, verbose=verbose
        )
        videos_dir_out = videos_dir_out / cfg[2]
        videos_dir_out.mkdir(parents=True, exist_ok=True)
        print(f" Using {res} dirs: {narration_dir}, {videos_dir_out}")

    def _natural_key(segment_id: str) -> list:
        # Split into digit / non-digit runs so Segment2 sorts before Segment10.
        return [
            int(token) if token.isdigit() else token.lower()
            for token in re.split(r'(\d+)', segment_id)
        ]

    segment_ids = sorted(narration.keys(), key=_natural_key)

    # Show what we're stitching
    print(f"\n Segments ({len(segment_ids)}):")
    for segment_id in segment_ids:
        seg = narration[segment_id]
        skip_str = f"skip={seg.skip:.1f}s" if seg.skip else ""
        take_str = f"take={seg.take:.1f}s" if seg.take else ""
        trim_info = ", ".join(filter(None, [skip_str, take_str]))
        trim_str = f" ({trim_info})" if trim_info else ""
        print(f" - {segment_id}{trim_str}")

    stitch_output = videos_dir_out / "narration_combined.mov"
    if stitch_output.exists() and not force:
        print(f"\n Combined narration exists: {stitch_output.name}")
        print(" (use --force to regenerate)")
    else:
        # Extract loudnorm config from talkinghead filter so stitch uses
        # per-project settings instead of hardcoded defaults.
        _loudnorm_cfg = None
        if config and config.default_filters:
            for _f in config.default_filters.get("talkinghead") or []:
                if isinstance(_f, dict) and _f.get("type") == "audio_normalize":
                    _loudnorm_cfg = _f
                    break
        stitch_narration_segments(
            narration_dir,
            segment_ids,
            narration,
            stitch_output,
            verbose=verbose,
            default_end_trim=config.default_end_trim if config else 0.0,
            loudnorm_config=_loudnorm_cfg,
        )

    # Run import videos again to update duration metadata (skip when using cache
    # since narration_combined.mov lives on the external disk, not in videos_dir).
    if not cache_root:
        _import_videos(videos_dir_out, config, verbose)

    # Always update the MAIN videos.json (parent of subdir when using low/tiny res)
    # Downscaled dirs only affect file paths, not JSON metadata updates
    main_videos_dir = (
        videos_dir_out.parent if (res != "full" and not cache_root) else videos_dir
    )
    videos_json_path = main_videos_dir / "videos.json"
    existing_videos: dict = {}
    if videos_json_path.exists():
        existing_videos = _read_json(videos_json_path)

    # Keep a volume the user already tuned in videos.json: stitch is meant to be
    # re-run while iterating, and resetting to 1.0 each time would silently undo
    # the manual edit that the hint printed below invites.
    volume = 1.0
    previous_entry = existing_videos.get("narration_combined")
    if isinstance(previous_entry, dict):
        previous_volume = previous_entry.get("volume")
        if isinstance(previous_volume, (int, float)) and not isinstance(
            previous_volume, bool
        ):
            volume = previous_volume

    # Get cutout from first narration segment
    first_seg = narration[segment_ids[0]]
    cutout = first_seg.cutout or "talkinghead"

    # Create/update narration_combined entry
    existing_videos["narration_combined"] = {
        "source_file": "narration_combined.mov",
        "cutout": cutout,
        "always_visible": True,
        "volume": volume,
    }
    with open(videos_json_path, "w", encoding="utf-8") as f:
        json.dump(existing_videos, f, indent=2)
    print(
        f"\n Updated videos.json with narration_combined entry (volume={volume})"
    )
    print(" Edit videos.json to adjust volume if needed.")
    print("\nConcatenation complete.")

    # Automatically transcribe to keep transcript in sync with narration
    print("\n" + "=" * 60)
    print("Auto-running transcribe to sync with new narration...")
    print("=" * 60 + "\n")
    return cmd_transcribe(project_path, verbose, res=res)
# =============================================================================
# Render Command
# =============================================================================
def _format_time(seconds: float) -> str:
"""Format seconds as MM:SS.ms"""
if seconds < 0:
return "??:??.??"
mins = int(seconds // 60)
secs = seconds % 60
return f"{mins:02d}:{secs:05.2f}"
def _print_render_plan_details(plan, marker_timings, slides: dict) -> None:
    """
    Print a detailed render plan showing each marker with its aligned time.

    Uses marker_timings from the transformer which contains alignment info.

    Args:
        plan: Render plan; this function reads ``video_events``,
            ``slide_events``, ``narration_videos``, ``outro_events``,
            ``cached_files`` and ``total_duration`` from it.
        marker_timings: Timing records exposing ``marker_id``, ``timestamp``
            (negative when the marker is unaligned), ``confidence`` and
            ``context``.
        slides: Mapping keyed by slide id; only membership is tested.
    """
    from .models import CAMERA_PRESETS

    # Marker prefixes that trigger a video. No prefix in this tuple is a prefix
    # of another (each ends in ':'), so at most one can match a given marker.
    # NOTE(review): _project_markers_to_videos in this file also lists "vfmp:",
    # "vf2mp:" and "vsmp:" as pause prefixes; those are not recognised here and
    # fall through to the generic branch — confirm whether that is intended.
    video_prefixes = (
        "video:",
        "vft:",
        "vfb:",
        "vf2t:",
        "vf2b:",
        "vst:",
        "vsb:",
        "vftp:",
        "vfbp:",
        "vf2tp:",
        "vf2bp:",
        "vstp:",
        "vsbp:",
    )

    print("\n RENDER PLAN:")
    print(" " + "-" * 76)

    # Build lookup for video events by video_id (the last event per id wins)
    video_events_by_id = {event.video_id: event for event in plan.video_events}

    # Detect slide markers that share a timestamp with the adjacent slide marker.
    # Two slides at the same time means alignment is ambiguous — treat as an error.
    slide_timings = [
        t for t in marker_timings if t.marker_id in slides and t.timestamp >= 0
    ]
    collision_ids: set[str] = set()
    for a, b in zip(slide_timings, slide_timings[1:]):
        if abs(a.timestamp - b.timestamp) < 0.1:
            collision_ids.add(a.marker_id)
            collision_ids.add(b.marker_id)

    # Print each marker timing
    aligned_count = 0
    unaligned_count = 0
    collision_count = 0
    for timing in marker_timings:
        marker_id = timing.marker_id
        context = timing.context
        if len(context) > 50:
            context = context[:47] + "..."

        if timing.timestamp < 0:
            unaligned_count += 1
            # A slide may still have been interpolated into the plan
            interp_event = None
            if marker_id in slides:
                interp_event = next(
                    (e for e in plan.slide_events if e.slide_id == marker_id), None
                )
            if interp_event:
                interp_str = _format_time(interp_event.start_time)
                print(f' {marker_id:6} ~{interp_str} INTERPOLATED - "{context}"')
            else:
                print(f' {marker_id:6} ??:??.?? NOT ALIGNED - "{context}"')
            continue

        time_str = _format_time(timing.timestamp)
        # Show confidence if fuzzy match
        conf_str = f" ({timing.confidence:.0%})" if timing.confidence < 1.0 else ""
        video_prefix = next(
            (p for p in video_prefixes if marker_id.startswith(p)), None
        )

        # Determine marker type for display
        if marker_id in slides:
            if marker_id in collision_ids:
                # Collisions are reported separately and not counted as aligned
                collision_count += 1
                print(
                    f' {marker_id:6} {time_str}{conf_str} COLLISION - same time as adjacent slide - "{context}"'
                )
            else:
                aligned_count += 1
                print(f' {marker_id:6} {time_str}{conf_str} "{context}"')
        elif video_prefix is not None:
            aligned_count += 1
            video_id = marker_id[len(video_prefix):]
            # Find corresponding event by video_id
            event = video_events_by_id.get(video_id)
            if event:
                cutout_name = event.cutout_name
                end_on = event.video_source.end_on or "next_slide"
                layer_tag = f" [{event.layer}]"
            else:
                cutout_name = "?"
                end_on = "next_slide"
                layer_tag = ""
            cache_ind = " 📁" if video_id in plan.cached_files else ""
            print(
                f" {marker_id:20} {time_str} in '{cutout_name}' [{end_on}]{layer_tag}{cache_ind}"
            )
        elif marker_id.startswith("narration:"):
            aligned_count += 1
            video_id = marker_id[len("narration:"):]
            cache_ind = " 📁" if video_id in plan.cached_files else ""
            print(f" {marker_id:20} {time_str} (continuous){cache_ind}")
        elif marker_id in CAMERA_PRESETS or marker_id.startswith("audio:"):
            aligned_count += 1
            print(f" {time_str} [{marker_id}]")
        else:
            aligned_count += 1
            print(f' {marker_id:6} {time_str} "{context}"')

    print(" " + "-" * 76)

    # Summary
    total_markers = len(marker_timings)
    slide_markers = [t for t in marker_timings if t.marker_id in slides]
    good_slides = sum(
        1
        for t in slide_markers
        if t.timestamp >= 0 and t.marker_id not in collision_ids
    )
    total_slides = len(slide_markers)
    issues = []
    if unaligned_count:
        issues.append(f"{unaligned_count} UNALIGNED")
    if collision_count:
        issues.append(f"{collision_count} COLLISION")
    status = "OK" if not issues else ", ".join(issues)
    print(f" Markers: {aligned_count}/{total_markers} aligned ({status})")
    print(f" Slides: {good_slides}/{total_slides}")
    print(
        f" Videos: {len(plan.video_events)} triggered, {len(plan.narration_videos)} always-visible"
    )
    if plan.outro_events:
        print(f" Outro: {len(plan.outro_events)} video(s)")
        for event in plan.outro_events:
            print(
                f" - {event.video_id}: {_format_time(event.start_time)} - {_format_time(event.end_time)}"
            )
    print(f" Duration: {_format_time(plan.total_duration)}")
def _parse_slide_range(slides_arg: str) -> tuple[str, Optional[str]]:
"""Parse slide range argument like 'S1:S10' or 'S5:' into a tuple."""
if ":" not in slides_arg:
raise ValueError(
f"Invalid slide range '{slides_arg}'. Expected format: S1:S10 or S5:"
)
parts = slides_arg.split(":", 1)
start_slide = parts[0].strip()
end_slide = parts[1].strip() if parts[1].strip() else None
if not start_slide:
raise ValueError(
f"Invalid slide range '{slides_arg}'. Start slide is required."
)
return start_slide, end_slide
def _project_markers_to_videos(
    markers: list[str], videos_json_path: Path, config, project_path: Path = None
) -> None:
    """ETL step: persist what shorthand markers imply into videos.json.

    Each manuscript marker that starts with one of the shorthand prefixes from
    ``transformer._SHORTHAND_PREFIXES`` implies a cutout and a layer for the
    video id that follows the prefix. Those values are written into the
    project's videos.json and, when ``project_path`` is given and a shared
    assets directory is found, into ``shared_assets/videos.json`` as well.
    Only ids already present in a file are touched, and a file is rewritten
    only when something in it changed.

    When the same video id is referenced several times, the LAST marker in the
    manuscript decides the projected values.

    For pause-variant prefixes, ``pause_narration`` is additionally set to the
    entry's ``duration`` — but only if ``pause_narration`` is not already set
    and a duration is present.

    Nothing happens if ``videos_json_path`` does not exist. ``config`` is
    accepted but not read by this function.
    """
    if not videos_json_path.exists():
        return

    from .transformer import _SHORTHAND_PREFIXES  # (cutout, layer) lookup table

    pause_prefixes = {
        "vftp:", "vfbp:", "vfmp:",
        "vf2tp:", "vf2bp:", "vf2mp:",
        "vstp:", "vsbp:", "vsmp:",
    }

    # video_id → {cutout, layer, _auto_pause}; later markers overwrite earlier ones.
    projection: dict[str, dict] = {}
    for marker in markers:
        hit = next(
            (
                item
                for item in _SHORTHAND_PREFIXES.items()
                if marker.startswith(item[0])
            ),
            None,
        )
        if hit is None:
            continue
        prefix, implied = hit
        projection[marker[len(prefix):]] = {
            "cutout": implied[0],
            "layer": implied[1],
            "_auto_pause": prefix in pause_prefixes,
        }

    if not projection:
        return

    def _apply_projection(json_path: Path) -> list[str]:
        """Project into a single videos.json; return the ids that were modified."""
        if not json_path.exists():
            return []
        with open(json_path, "r", encoding="utf-8") as handle:
            raw = json.load(handle)

        touched_ids = []
        for video_id, fields in projection.items():
            if video_id not in raw:
                continue
            entry = raw[video_id]
            touched = False

            for key in ("cutout", "layer"):
                if entry.get(key) != fields[key]:
                    entry[key] = fields[key]
                    touched = True

            # Pause variant: default pause_narration to the clip duration while
            # leaving a manually set value alone; skipped when no duration is stored.
            if (
                fields["_auto_pause"]
                and not entry.get("pause_narration")
                and entry.get("duration")
            ):
                entry["pause_narration"] = entry["duration"]
                touched = True

            if touched:
                touched_ids.append(video_id)

        if touched_ids:
            with open(json_path, "w", encoding="utf-8") as handle:
                json.dump(raw, handle, indent=2, ensure_ascii=False)
        return touched_ids

    local_ids = _apply_projection(videos_json_path)
    if local_ids:
        print(f" Projected marker semantics → videos.json: {', '.join(local_ids)}")

    # Also project into shared_assets/videos.json for pexels/library videos
    shared_root = _find_shared_assets(project_path) if project_path else None
    if not shared_root:
        return
    shared_ids = _apply_projection(shared_root / "videos.json")
    if shared_ids:
        print(f" Projected marker semantics → shared_assets/videos.json: {', '.join(shared_ids)}")
def _writeback_video_metadata(plan, project_path, config) -> None:
"""Write back cutout/layer derived from shorthand markers to videos.json.
When a shorthand like [vfb:FARTSection1] is used and FARTSection1 has no
'cutout' set in videos.json, this persists the resolved cutout (and layer if
the shorthand implies a non-default layer) back to the file. Once written,
subsequent renders read the value directly and no further write-back occurs.
"""
import json
videos_json_path = project_path / config.videos_path
if not videos_json_path.exists():
return
# Collect field updates per video_id
writebacks: dict[str, dict] = {}
for event in plan.video_events:
video_id = event.video_id
source = event.video_source
if source.is_shared:
continue # shared videos live in their own file
updates = {}
if source.cutout is None and event.cutout_name:
updates["cutout"] = event.cutout_name
if event.layer != source.layer:
updates["layer"] = event.layer
if updates:
writebacks.setdefault(video_id, {}).update(updates)
if not writebacks:
return
with open(videos_json_path, "r", encoding="utf-8") as f:
raw = json.load(f)
changed = False
for video_id, updates in writebacks.items():
if video_id not in raw:
continue
for field, value in updates.items():
if raw[video_id].get(field) != value:
raw[video_id][field] = value
changed = True
if changed:
with open(videos_json_path, "w", encoding="utf-8") as f:
json.dump(raw, f, indent=2, ensure_ascii=False)
written = ", ".join(
f"{vid}({', '.join(upd)})" for vid, upd in writebacks.items()
)
print(f" Updated videos.json: {written}")
def _chunked_render(
    project_path: Path,
    verbose: bool,
    dry_run: bool,
    res: str,
    force: bool,
    chunk_size: int,
    slide_ids: list[str],
    out_dir: Path,
    final_output: Path,
) -> int:
    """Render in slide-based chunks then concatenate — avoids filter graph OOM.

    ``slide_ids`` is cut into consecutive groups of at most ``chunk_size``.
    Each group is rendered through ``cmd_render`` into ``out_dir/chunks`` with
    a ``--slides`` range running from the group's first slide to the first
    slide of the next group (open-ended for the last group). The chunk files
    are then joined with ffmpeg's concat demuxer using stream copy, and the
    intermediate files are removed after a successful concatenation.

    Args:
        project_path, verbose, dry_run, res, force: Forwarded to ``cmd_render``.
        chunk_size: Maximum slides per chunk; the caller passes a value > 0.
        slide_ids: Slide ids in render order.
        out_dir: Directory that receives the temporary ``chunks`` folder.
        final_output: Path of the concatenated result.

    Returns:
        0 on success (or after a dry run), the failing chunk's exit code if a
        chunk render fails, or 1 if the concatenation fails.
    """
    # Split slide IDs into groups of chunk_size
    groups = [
        slide_ids[i : i + chunk_size] for i in range(0, len(slide_ids), chunk_size)
    ]
    print(
        f"\n Auto-chunking: {len(slide_ids)} slides → {len(groups)} chunks of ≤{chunk_size}"
    )
    chunks_dir = out_dir / "chunks"
    chunks_dir.mkdir(parents=True, exist_ok=True)

    chunk_paths: list[Path] = []
    for number, group in enumerate(groups, start=1):
        start = group[0]
        # NOTE(review): the end of each range is the first slide of the NEXT
        # group. That is only gap/overlap-free if cmd_render treats the end of
        # a slide range as exclusive — confirm against build_render_plan.
        end = groups[number][0] if number < len(groups) else None
        slides_arg = f"{start}:{end}" if end else f"{start}:"
        chunk_path = chunks_dir / f"chunk_{number:03d}_{start}-{end or 'end'}.mp4"
        print(f"\n {'='*56}")
        print(f" Chunk {number}/{len(groups)}: {slides_arg} → {chunk_path.name}")
        print(f" {'='*56}")
        result = cmd_render(
            project_path,
            verbose,
            dry_run,
            slides_arg=slides_arg,
            res=res,
            force=force,
            _output_path_override=chunk_path,
        )
        if result != 0:
            print(f"\n Chunk {number} failed — aborting.", file=sys.stderr)
            return result
        chunk_paths.append(chunk_path)

    if dry_run:
        print(
            f"\n [dry-run] Would concatenate {len(chunk_paths)} chunks → {final_output}"
        )
        return 0

    # Concatenate chunks
    print(f"\n Concatenating {len(chunk_paths)} chunks → {final_output.name}...")
    concat_list = chunks_dir / "concat.txt"
    with open(concat_list, "w", encoding="utf-8") as f:
        for p in chunk_paths:
            # Inside a single-quoted concat entry a literal quote must be
            # written as '\'' or ffmpeg misparses paths containing apostrophes.
            escaped = str(p.resolve()).replace("'", "'\\''")
            f.write(f"file '{escaped}'\n")
    concat_cmd = [
        "ffmpeg",
        "-y",
        "-f",
        "concat",
        "-safe",
        "0",
        "-i",
        str(concat_list),
        "-c",
        "copy",
        str(final_output),
    ]
    result = subprocess.run(concat_cmd, capture_output=True, text=True)
    if result.returncode != 0:
        # Chunk files are left in place here so they can be inspected.
        print(f" Concatenation failed:\n{result.stderr}", file=sys.stderr)
        return 1

    # Clean up chunk files
    for p in chunk_paths:
        p.unlink(missing_ok=True)
    concat_list.unlink(missing_ok=True)
    try:
        chunks_dir.rmdir()
    except OSError:
        # Directory is not empty (or otherwise not removable); leave it.
        pass
    print(f" Output: {final_output}")
    return 0
def cmd_render(
    project_path: Path,
    verbose: bool,
    dry_run: bool,
    slides_arg: Optional[str] = None,
    res: str = "full",
    force: bool = False,
    chunk_slides: int = 0,
    _output_path_override: Optional[Path] = None,
) -> int:
    """Render final video.

    Runs the four pipeline stages in order: parse project files, validate,
    build the render plan (which aligns manuscript markers against the
    transcript), and render with ffmpeg.

    Args:
        project_path: Root directory of the project.
        verbose: Print extra detail and forward verbosity to helpers.
        dry_run: Print the ffmpeg command instead of rendering.
        slides_arg: Optional slide range such as "S1:S10" or "S5:"; parsed by
            ``_parse_slide_range`` (which raises ValueError on bad input).
        res: Resolution key; values other than "full" switch to the
            downscaled assets and resolution from ``RES_CONFIGS``.
        force: Render even when some markers could not be aligned.
        chunk_slides: Slides per chunk for chunked rendering; 0 falls back to
            ``get_render_chunk_size()``.
        _output_path_override: Explicit output file; passed by
            ``_chunked_render`` for individual chunks.

    Returns:
        0 on success (including dry run), 1 when narration/transcript inputs
        are missing or markers are unaligned without ``force``; for chunked
        renders, whatever ``_chunked_render`` returns.
    """
    from .parser import (
        parse_audio,
        parse_manuscript,
        parse_project_config,
        parse_slides,
        parse_videos,
        save_citations,
    )
    from .transcriber import load_transcript
    from .validator import validate_project
    from .transformer import build_render_plan
    from .renderer import render, generate_ffmpeg_command_string
    from .preprocessor import RES_CONFIGS, ensure_downscaled_files_exist

    # Parse slide range if provided
    slide_range = None
    if slides_arg:
        slide_range = _parse_slide_range(slides_arg)
        print(f"Rendering: {project_path.name} (slides {slides_arg})")
    else:
        print(f"Rendering: {project_path.name}")

    # Show resolution mode
    if res != "full":
        cfg = RES_CONFIGS[res]
        print(f" Resolution: {res.upper()} ({cfg[0]}x{cfg[1]})")

    # Show cache status
    cache_info = get_cache_info()
    if cache_info:
        print(f" Cache: {cache_info}")

    # Stage 1: Parse
    print("\n[1/4] Parsing...")
    manuscript_text, markers, malformed, citations = parse_manuscript(project_path)

    # Save citations for later use (e.g., description generation)
    if citations:
        citations_path = project_path / "citations.json"
        save_citations(citations, citations_path)

    config = parse_project_config(project_path)

    # ETL: project shorthand marker semantics (cutout/layer) into videos.json
    # before parse_videos reads it, so the render pass is purely data-driven.
    _project_markers_to_videos(markers, project_path / config.videos_path, config, project_path)

    # Override resolution for preview modes
    if res != "full":
        cfg = RES_CONFIGS[res]
        config.resolution = (cfg[0], cfg[1])

    slides = parse_slides(project_path, config)
    videos, videos_dir = parse_videos(project_path, config)
    source_videos_dir = videos_dir  # keep original for validation (pre-downscale)

    # Non-full res: use downscaled video directory, create on-the-fly if needed
    if res != "full":
        # Skip downscaling sources that have a preprocessed output_file — the
        # renderer will use the full-res processed version instead, saving disk space.
        sources_with_output = {v.source_file for v in videos.values() if v.output_file}
        videos_dir = ensure_downscaled_files_exist(
            videos_dir,
            res,
            force=False,
            verbose=verbose,
            skip_sources=sources_with_output,
        )
        if verbose:
            print(f" Using {res} dir: {videos_dir}")

    audio, audio_dir = parse_audio(project_path, config)

    # Load whisper transcription JSON
    # Check for narration_combined in videos.json (new workflow) or multi-segment in config (legacy)
    combined_path = videos_dir / "narration_combined.mov"
    resolved_combined = _resolve_narration_combined(project_path, videos_dir, config)
    if resolved_combined and resolved_combined != combined_path:
        # File lives on external disk — point the VideoSource at the absolute path so
        # the renderer doesn't re-resolve it via the local (missing) videos_dir.
        if "narration_combined" in videos:
            videos["narration_combined"].source_file = str(resolved_combined)

    # Three mutually exclusive ways of locating the narration and its
    # transcript; each branch sets transcript_path (or returns 1).
    if (
        "narration_combined" in videos
        and resolved_combined
        and resolved_combined.exists()
    ):
        # New workflow: narration_combined was created by 'gnommo concat' and is in videos.json
        # This entry has the correct volume setting from videos.json
        transcript_path = resolved_combined.with_suffix(".transcript.json")
        config.main_video = "narration_combined"
        if verbose:
            print(
                f" Using combined narration: {resolved_combined.name} (volume={videos['narration_combined'].volume})"
            )
    elif isinstance(config.main_video, list) and len(config.main_video) > 1:
        # Legacy: Multi-segment narration with main_video array in project.json
        resolved_combined, _ = resolve_with_cache(combined_path, project_path)
        transcript_path = resolved_combined.with_suffix(".transcript.json")
        if not resolved_combined.exists():
            print(
                f"Error: Combined narration not found: {combined_path}", file=sys.stderr
            )
            print(
                "Run 'gnommo -p <project> concat' first to concatenate segments.",
                file=sys.stderr,
            )
            return 1
        # Create a synthetic video entry for the combined narration
        # Inherit settings from the first segment
        first_segment_id = config.main_video[0]
        if first_segment_id in videos:
            first_segment = videos[first_segment_id]
            from .models import VideoSource
            combined_video = VideoSource(
                source_file="narration_combined.mov",
                filter=first_segment.filter,
                output_file=None,  # Already processed
                cutout=first_segment.cutout,
                always_visible=True,
                skip=0.0,  # Already trimmed during concatenation
                take=None,
            )
            videos["_narration_combined"] = combined_video
            config.main_video = "_narration_combined"
        if verbose:
            print(f" Using combined narration: {combined_path.name}")
    else:
        # Check if narration.json exists with segments (new workflow) - if so, require narration_combined
        narration_json = project_path / "media" / "narration" / "narration.json"
        if narration_json.exists() and _read_json(narration_json):
            print(
                f"Error: narration_combined not found in videos.json", file=sys.stderr
            )
            print(
                f"You have narration segments in narration.json but haven't stitched them.",
                file=sys.stderr,
            )
            print(
                f"Run 'gnommo -p {project_path.name} stitch' first.",
                file=sys.stderr,
            )
            return 1
        # Single video - look for .transcript.json next to the narration video
        result = _find_narration_video(config, videos)
        if result:
            video_id, narration_source = result
            config.main_video = video_id  # Ensure main_video is set to the found video
            video_path = videos_dir / narration_source.source_file
            transcript_path = video_path.with_suffix(".transcript.json")
        else:
            transcript_path = project_path / "transcript.json"

    # If project.json specifies a transcript path, prefer it (always local)
    if config.transcript_path:
        local_transcript = project_path / config.transcript_path
        if local_transcript.exists():
            transcript_path = local_transcript

    # Try cache fallback for transcript
    transcript_path, _ = resolve_with_cache(transcript_path, project_path)
    if not transcript_path.exists():
        print(f"Error: Transcription not found: {transcript_path}", file=sys.stderr)
        print(f"Run 'gnommo -p {project_path.name} transcribe' first.", file=sys.stderr)
        return 1
    transcription = load_transcript(transcript_path, project_path)

    if verbose:
        print(f" - Markers in manuscript: {len(markers)}")
        print(f" - Slides defined: {len(slides)}")
        print(f" - Audio clips: {len(audio)}")
        print(f" - Transcription words: {len(transcription)}")

    # Stage 2: Validate
    # Validation receives the pre-downscale video directory (source_videos_dir).
    print("\n[2/4] Validating...")
    warnings = validate_project(
        project_path, markers, config, slides, videos, source_videos_dir, malformed
    )
    for w in warnings:
        print(f" Warning: {w}")
    print(" Passed.")

    # Stage 3: Transform (includes on-the-fly alignment)
    print("\n[3/4] Building render plan...")
    plan, marker_timings = build_render_plan(
        project_path,
        config,
        slides,
        videos,
        videos_dir,
        manuscript_text,
        transcription,
        audio,
        audio_dir,
        slide_range=slide_range,
    )
    if plan.time_offset > 0:
        print(f" Time offset: {plan.time_offset:.1f}s (partial render)")

    # Print detailed render plan with alignment info
    _print_render_plan_details(plan, marker_timings, slides)

    if plan.audio_events:
        print(f"\n Audio effects:")
        for event in plan.audio_events:
            loop_str = " (loop)" if event.audio_def.loop else ""
            pause_str = " [ignores pauses]" if event.audio_def.ignore_pauses else ""
            print(
                f" - {event.audio_id}: '{event.audio_def.file}' @ {_format_time(event.start_time)}{loop_str}{pause_str}"
            )

    # Show always-visible videos
    if plan.narration_videos:
        print(f"\n Always-visible videos:")
        # NOTE(review): the unpacked `cutout` is unused; the line below prints
        # video_source.cutout instead — confirm which value is meant.
        for video_id, video_source, cutout in plan.narration_videos:
            skip_str = (
                f" (skip: {video_source.skip:.1f}s)" if video_source.skip > 0 else ""
            )
            cache_ind = " 📁" if video_id in plan.cached_files else ""
            print(f" - {video_id} in '{video_source.cutout}'{skip_str}{cache_ind}")

    # Show narration pauses
    if plan.narration_pauses:
        print(f"\n Narration pauses:")
        for pause in plan.narration_pauses:
            print(
                f" - {pause.video_id} at {_format_time(pause.output_time)} "
                f"for {pause.duration:.1f}s (narration freezes at {_format_time(pause.narration_time)})"
            )

    # Write tasks file with both missing assets and alignment issues
    missing_videos = _collect_missing_video_markers(markers, videos)
    # Same adjacent-slide collision rule as _print_render_plan_details:
    # two consecutive aligned slides less than 0.1s apart.
    slide_timings_for_collision = [
        t for t in marker_timings if t.marker_id in slides and t.timestamp >= 0
    ]
    collision_ids_render = set()
    for _a, _b in zip(slide_timings_for_collision, slide_timings_for_collision[1:]):
        if abs(_a.timestamp - _b.timestamp) < 0.1:
            collision_ids_render.add(_a.marker_id)
            collision_ids_render.add(_b.marker_id)
    alignment_issues = [
        (t.marker_id, t.context)
        for t in marker_timings
        if t.marker_id in slides
        and (t.timestamp < 0 or t.marker_id in collision_ids_render)
    ]
    _write_tasks_file(project_path, missing_videos, alignment_issues)

    # Check for unaligned markers
    unaligned = [t for t in marker_timings if t.timestamp < 0]
    if unaligned:
        print(f"\n WARNING: {len(unaligned)} marker(s) could not be aligned!")
        for t in unaligned:
            print(f' [{t.marker_id}] - "{t.context}"')
        if not force:
            print(f"\n Run with -f/--force to render anyway.")
            return 1
        else:
            print(f"\n Continuing anyway due to --force flag...")

    # Stage 4: Render
    # Determine output filename and directory
    # Precedence: explicit override > config.output_video > slide-range name > default.
    # NOTE(review): because config.output_video is checked before slide_range,
    # a partial (--slides) render writes to the same file as a full render
    # whenever output_video is configured — confirm this is intended.
    if _output_path_override:
        output_path = _output_path_override
        out_dir = output_path.parent
        out_filename = output_path.name
    elif config.output_video:
        out_filename = config.output_video
        out_dir = project_path / "out" / res if res != "full" else project_path / "out"
        output_path = out_dir / out_filename
    elif slide_range:
        start, end = slide_range
        range_suffix = f"_{start}-{end}" if end else f"_{start}-end"
        out_filename = f"final{range_suffix}.mp4"
        out_dir = project_path / "out" / res if res != "full" else project_path / "out"
        output_path = out_dir / out_filename
    else:
        # NOTE(review): `config.co` looks like a truncated attribute name —
        # verify it exists on the project config object.
        out_filename = f"{config.co}.mp4"
        out_dir = project_path / "out" / res if res != "full" else project_path / "out"
        output_path = out_dir / out_filename

    # Check if chunked rendering is needed (avoids filter graph OOM on long videos)
    from .cache import get_render_chunk_size
    _chunk_size = chunk_slides or get_render_chunk_size() or 0
    _slide_ids = [e.slide_id for e in plan.slide_events]
    # Chunking is skipped when a slide range was given, which is also what stops
    # the per-chunk cmd_render calls made by _chunked_render from chunking again.
    if _chunk_size > 0 and not slide_range and len(_slide_ids) > _chunk_size:
        return _chunked_render(
            project_path,
            verbose,
            dry_run,
            res,
            force,
            _chunk_size,
            _slide_ids,
            out_dir,
            output_path,
        )

    plan.output_path = output_path
    if dry_run:
        print("\n[4/4] FFmpeg command (dry run):")
        print(generate_ffmpeg_command_string(plan, output_path))
        return 0
    print("\n[4/4] Rendering...")
    render(plan, output_path, verbose=verbose)
    print(f" Output: {output_path}")
    print("\nDone.")
    return 0
# =============================================================================
# Transcribe Command
# =============================================================================
def _find_narration_video(config, videos: dict) -> Optional[tuple[str, "VideoSource"]]:
"""
Find the video to use for transcription/narration.
Priority:
1. config.audio_source if set
2. First video with always_visible=True
3. First video in dict
"""
from .models import VideoSource
# 1. Check audio_source config
if config.audio_source and config.audio_source in videos:
return config.audio_source, videos[config.audio_source]
# 2. Find always_visible video (main talking head)
for video_id, video_source in videos.items():
if video_source.always_visible:
return video_id, video_source
# 3. Fall back to first video
if videos:
video_id = next(iter(videos.keys()))
return video_id, videos[video_id]
return None
def cmd_transcribe(
    project_path: Path, verbose: bool, res: str = "full", final: bool = False
) -> int:
    """Transcribe video audio using Whisper.

    Args:
        project_path: Root directory of the project.
        verbose: Print a short preview of the transcribed words.
        res: Resolution key; values other than "full" read the video from the
            downscaled directory.
        final: Transcribe the rendered output in ``out/`` instead of the
            narration video, and produce SRT captions (see
            ``_transcribe_final``).

    Returns:
        0 on success, 1 when no usable video is found. With ``final`` set, the
        exit code of ``_transcribe_final``.
    """
    from .transcriber import transcribe_video, save_transcript
    from .parser import parse_project_config, parse_videos
    from .preprocessor import ensure_downscaled_files_exist

    config = parse_project_config(project_path)

    # Handle --final mode: transcribe the rendered output for YouTube captions
    if final:
        # cmd_render uses config.output_video verbatim as the output filename,
        # so only append ".mp4" when the configured name has no extension;
        # otherwise "video.mp4" would be looked up as "video.mp4.mp4".
        final_name = str(config.output_video)
        if not Path(final_name).suffix:
            final_name = f"{final_name}.mp4"
        return _transcribe_final(project_path / "out" / final_name, verbose)

    mode_str = f" ({res.upper()})" if res != "full" else ""
    print(f"Transcribing: {project_path.name}{mode_str}")

    videos, videos_dir = parse_videos(project_path, config)
    if not videos:
        print("Error: No videos defined in videos.json", file=sys.stderr)
        return 1

    # Non-full res: use downscaled video directory
    if res != "full":
        videos_dir = ensure_downscaled_files_exist(
            videos_dir, res, force=False, verbose=verbose
        )

    # Check for multi-segment narration (concatenated file)
    if isinstance(config.main_video, list) and len(config.main_video) > 1:
        video_path = videos_dir / "narration_combined.mov"
        if not video_path.exists():
            print(f"Error: Combined narration not found: {video_path}", file=sys.stderr)
            print(
                "Run 'gnommo -p <project> pre' first to concatenate segments.",
                file=sys.stderr,
            )
            return 1
        print(f" Using combined narration: {video_path.name}")
    else:
        # Single video - find it using existing logic
        result = _find_narration_video(config, videos)
        if not result:
            print("Error: No suitable video found for transcription", file=sys.stderr)
            return 1
        video_id, video_source = result
        video_path = videos_dir / video_source.source_file
        # The combined narration may have been written somewhere other than
        # videos_dir; let the resolver look for it before the cache fallback.
        if (
            not video_path.exists()
            and video_source.source_file == "narration_combined.mov"
        ):
            found = _resolve_narration_combined(project_path, videos_dir, config)
            if found:
                video_path = found
        if not video_path.exists():
            video_path, _ = resolve_with_cache(video_path, project_path)
        if not video_path.exists():
            print(f"Error: Video not found: {video_path}", file=sys.stderr)
            return 1
        print(f" Video: {video_path.name}")

    words = transcribe_video(video_path, model="base")

    # Save to project-local path if configured in project.json (keeps transcript off external drives)
    if config.transcript_path:
        output_path = project_path / config.transcript_path
        output_path.parent.mkdir(parents=True, exist_ok=True)
    else:
        output_path = video_path.with_suffix(".transcript.json")
    save_transcript(words, output_path)

    print(f" - Transcribed {len(words)} words")
    print(f" - Duration: {words[-1].end:.1f}s" if words else " - No words found")
    print(f" - Saved: {output_path}")
    if verbose and words:
        preview = " ".join(w.word for w in words[:10])
        print(f" - Preview: {preview}...")
    return 0
def _transcribe_final(final_video: Path, verbose: bool) -> int:
    """
    Transcribe the final rendered video and generate SRT captions for YouTube.

    Writes two files next to ``final_video``: ``<name>.transcript.json`` with
    the word-level transcript and ``<name>.srt`` with the captions.

    Returns:
        0 on success; 1 when the video file is missing or no words were
        transcribed.
    """
    from .transcriber import transcribe_video, save_transcript, words_to_srt

    print(f"Transcribing final output: {final_video}")
    if not final_video.exists():
        for message in (
            f"Error: Final video not found: {final_video}",
            "Run 'gnommo render' first.",
        ):
            print(message, file=sys.stderr)
        return 1
    print(f" Video: {final_video.name}")

    # Transcribe with word-level timestamps
    words = transcribe_video(final_video, model="base")
    if not words:
        print("Error: No words transcribed from video", file=sys.stderr)
        return 1

    # Save JSON transcript
    json_out = final_video.with_suffix(".transcript.json")
    save_transcript(words, json_out)

    # Generate SRT captions
    srt_out = final_video.with_suffix(".srt")
    srt_text = words_to_srt(words)
    srt_out.write_text(srt_text, encoding="utf-8")

    # Caption segments are counted as blank-line separators plus one
    segment_total = srt_text.count("\n\n") + 1

    report = [
        f" - Transcribed {len(words)} words",
        f" - Duration: {words[-1].end:.1f}s",
        f" - Transcript: {json_out}",
        f" - Captions: {srt_out}",
        f" - Caption segments: {segment_total}",
    ]
    for line in report:
        print(line)

    # `words` is known to be non-empty here, so verbosity alone decides
    if verbose:
        preview = " ".join(w.word for w in words[:15])
        print(f" - Preview: {preview}...")
    print("\nSRT file ready for YouTube upload.")
    return 0
# =============================================================================
# Align Command
# =============================================================================
def cmd_align(project_path: Path, verbose: bool) -> int:
    """Preview manuscript marker alignment against the narration transcript.

    Prints, per marker, where it was found in the transcription (exact or
    fuzzy) or that it was not found, followed by a summary. The only file
    written is ``citations.json``, and only when the manuscript has citations.

    Args:
        project_path: Path to the project directory.
        verbose: When True, exact matches are listed too (fuzzy matches and
            misses are always shown).

    Returns:
        0 when the preview ran (even if some markers were not found);
        1 if no narration video or no transcript could be located.
    """
    from .transcriber import load_transcript
    from .transformer import align_markers_to_transcription
    from .parser import (
        parse_project_config,
        parse_videos,
        parse_slides,
        parse_audio,
        parse_manuscript,
        save_citations,
    )

    print(f"Alignment preview: {project_path.name}")
    print(" (This is a preview - alignment happens automatically during render)")

    # Load manuscript (cites are stripped at parse time)
    manuscript_text, _, _, citations = parse_manuscript(project_path)

    # Save citations for later use (e.g., description generation)
    if citations:
        citations_path = project_path / "citations.json"
        save_citations(citations, citations_path)

    # Load project config and resources
    config = parse_project_config(project_path)
    slides = parse_slides(project_path, config)
    videos, videos_dir = parse_videos(project_path, config)
    audio, _ = parse_audio(project_path, config)

    # Find transcription (from narration video)
    result = _find_narration_video(config, videos)
    if not result:
        print("Error: No suitable video found for transcription", file=sys.stderr)
        return 1
    _, video_source = result
    video_path = videos_dir / video_source.source_file
    transcript_path = video_path.with_suffix(".transcript.json")

    # The transcribe step saves to project_path / config.transcript_path when
    # that setting is present, so look there first. Otherwise keep the
    # video-adjacent location with its cache fallback.
    configured = getattr(config, "transcript_path", None)
    local_transcript = project_path / configured if configured else None
    if local_transcript is not None and local_transcript.exists():
        transcript_path = local_transcript
    else:
        # Try cache fallback for transcript
        transcript_path, _ = resolve_with_cache(transcript_path, project_path)

    if not transcript_path.exists():
        print(f"Error: Transcription not found: {transcript_path}", file=sys.stderr)
        print(f"Run 'gnommo -p {project_path.name} transcribe' first.", file=sys.stderr)
        return 1

    print(f" Loading: {transcript_path.name}")
    transcription = load_transcript(transcript_path, project_path)
    print(f" - {len(transcription)} words")

    # Align (cite markers already stripped at parse time)
    print("\n Aligning markers to transcription...")
    timings = align_markers_to_transcription(
        manuscript_text, transcription, slides=slides, videos=videos, audio=audio
    )

    # Report alignment results. A negative timestamp marks an unaligned marker;
    # confidence below 1.0 marks a fuzzy match.
    unmatched = 0
    fuzzy_matched = 0
    exact_matched = 0
    for t in timings:
        if t.timestamp >= 0:
            if t.confidence >= 1.0:
                exact_matched += 1
                if verbose:
                    print(f" [{t.marker_id}] @ {_format_time(t.timestamp)}")
            else:
                fuzzy_matched += 1
                # Always show fuzzy matches so user can verify
                print(
                    f" [{t.marker_id}] @ {_format_time(t.timestamp)} (fuzzy {t.confidence:.0%})"
                )
        else:
            print(f' [{t.marker_id}] NOT FOUND - "{t.context}"')
            unmatched += 1

    # Summary
    total = len(timings)
    print("\n Alignment summary:")
    print(f" - Exact matches: {exact_matched}/{total}")
    if fuzzy_matched > 0:
        print(f" - Fuzzy matches (60%+ words): {fuzzy_matched}/{total}")
    if unmatched > 0:
        print(f" - NOT FOUND: {unmatched}/{total}")
        print(
            "\n Some markers could not be aligned. Check manuscript.txt matches the spoken audio."
        )
    return 0
# =============================================================================
# All Command (Full Pipeline)
# =============================================================================
def _files_modified_since(root: Path, since: float, pattern: str) -> bool:
"""Return True if any file matching pattern under root has mtime > since."""
try:
for p in root.rglob(pattern):
if p.is_file() and p.stat().st_mtime > since:
return True
except (OSError, PermissionError):
pass
return False
def cmd_all(
    project_path: Path,
    verbose: bool,
    dry_run: bool,
    res: str = "full",
    force: bool = False,
) -> int:
    """Run the eight pipeline stages in order, stopping at the first failure.

    Stages: import, preprocess, trim, stitch, render, push, handoff, upload.

    Cascade rule: the force flag handed to import, preprocess, trim, stitch and
    render starts as ``force``. After each of the first four stages the project
    tree is checked for that stage's output files; if any were modified during
    the stage, the flag becomes True for every later stage.

    Returns:
        0 if every stage succeeded, otherwise the first non-zero stage result.
    """
    from .handoff import cmd_handoff
    from .push import cmd_push

    def _any_written(since: float, *patterns: str) -> bool:
        """True if a file matching any pattern under the project is newer than `since`."""
        return any(
            _files_modified_since(project_path, since, pattern)
            for pattern in patterns
        )

    print(f"=== Full Pipeline: {project_path.name} ===\n")

    # Begins at --force; flips to True once a stage has produced output.
    rerun_downstream = force

    print(">>> Step 1/8: Import\n")
    stage_start = time.time()
    status = cmd_import(project_path, rerun_downstream, verbose)
    if status != 0:
        return status
    if _any_written(stage_start, "slides.json", "narration.json"):
        rerun_downstream = True

    print("\n>>> Step 2/8: Preprocess\n")
    stage_start = time.time()
    status = cmd_preprocess(
        project_path, verbose, dry_run, rerun_downstream, workers=1, res=res
    )
    if status != 0:
        return status
    if _any_written(stage_start, "*_processed.mov", "*_processed.webm"):
        rerun_downstream = True

    print("\n>>> Step 3/8: Trim\n")
    stage_start = time.time()
    status = cmd_trim(
        project_path, verbose, force=rerun_downstream, threshold_db=-40.0
    )
    if status != 0:
        return status
    # Trim rewrites skip/take values in narration.json; a change invalidates stitch.
    if _any_written(stage_start, "narration.json"):
        rerun_downstream = True

    print("\n>>> Step 4/8: Stitch\n")
    stage_start = time.time()
    status = cmd_stitch(project_path, verbose, rerun_downstream, res=res)
    if status != 0:
        return status
    if _any_written(stage_start, "narration_combined.mov"):
        rerun_downstream = True

    print("\n>>> Step 5/8: Render\n")
    status = cmd_render(
        project_path, verbose, dry_run, res=res, force=rerun_downstream
    )
    if status != 0:
        return status

    print("\n>>> Step 6/8: Push\n")
    status = cmd_push(project_path, verbose, force=False, prod=True)
    if status != 0:
        return status

    print("\n>>> Step 7/8: Handoff\n")
    status = cmd_handoff(
        project_path, verbose, file_override=None, prod=True, res=res
    )
    if status != 0:
        return status

    print("\n>>> Step 8/8: Upload\n")
    return cmd_sync(project_path, verbose, dry_run, download=False)
# =============================================================================
# Description Command
# =============================================================================
def cmd_description(project_path: Path, verbose: bool) -> int:
    """Generate YouTube description file with chapters, citations, and attributions.

    Writes ``out/description_youtube.txt`` under the project. Marker timings
    come from aligning the manuscript to the narration transcript when one is
    found; without a transcript the alignment runs on an empty word list and a
    warning is printed.

    Args:
        project_path: Path to the project directory.
        verbose: When True, print alignment counts and a 20-line preview.

    Returns:
        0 (always, unless a called helper raises).
    """
    from .parser import (
        parse_audio,
        parse_manuscript,
        parse_project_config,
        parse_slides,
        parse_videos,
        load_citations,
    )
    from .transcriber import load_transcript
    from .transformer import align_markers_to_transcription
    from .description import write_description_file

    print(f"Generating description: {project_path.name}")

    # Parse all project files
    manuscript_text, _, _, _ = parse_manuscript(project_path)

    # Load citations from file (saved during parse/render/align stages)
    citations_path = project_path / "citations.json"
    citations = load_citations(citations_path)

    config = parse_project_config(project_path)
    slides = parse_slides(project_path, config)
    videos, videos_dir = parse_videos(project_path, config)
    audio, _ = parse_audio(project_path, config)

    # Load transcription for alignment (optional but recommended)
    transcription = None
    result = _find_narration_video(config, videos)
    if result:
        _, narration_source = result
        video_path = videos_dir / narration_source.source_file
        transcript_path = video_path.with_suffix(".transcript.json")

        # The transcribe step saves to project_path / config.transcript_path
        # when that setting is present, so look there first. Otherwise keep the
        # video-adjacent location with its cache fallback.
        configured = getattr(config, "transcript_path", None)
        local_transcript = project_path / configured if configured else None
        if local_transcript is not None and local_transcript.exists():
            transcript_path = local_transcript
        else:
            # Try cache fallback for transcript
            transcript_path, _ = resolve_with_cache(transcript_path, project_path)

        if transcript_path.exists():
            transcription = load_transcript(transcript_path, project_path)
            if verbose:
                print(f" Loaded transcription: {len(transcription)} words")
        else:
            print(f" Warning: No transcription found at {transcript_path}")
            print(
                f" Run 'gnommo -p {project_path.name} transcribe' for better timestamps."
            )

    # Align markers to get timings
    print(" Aligning markers...")
    marker_timings = align_markers_to_transcription(
        manuscript_text,
        transcription or [],
        slides=slides,
        videos=videos,
        audio=audio,
    )
    if verbose:
        # A non-negative timestamp means the marker was placed.
        aligned = sum(1 for t in marker_timings if t.timestamp >= 0)
        print(f" Aligned {aligned}/{len(marker_timings)} markers")

    # Generate description
    output_path = project_path / "out" / "description_youtube.txt"
    description = write_description_file(
        output_path=output_path,
        config=config,
        manuscript_text=manuscript_text,
        slides=slides,
        videos=videos,
        marker_timings=marker_timings,
        transcription=transcription,
        citations=citations,
    )

    # Print summary
    lines = description.split("\n")
    print(f"\n Output: {output_path}")
    print(f" Length: {len(description)} characters, {len(lines)} lines")

    # Show sections found (detected from config fields and heading text)
    sections = []
    if config.description:
        sections.append("description")
    if "CHAPTERS" in description:
        sections.append("chapters")
    if "REFERENCES" in description:
        sections.append("references")
    if "STOCK FOOTAGE" in description:
        sections.append("attributions")
    if config.footer:
        sections.append("footer")
    print(f" Sections: {', '.join(sections)}")

    if verbose:
        print("\n --- Preview ---")
        for line in lines[:20]:
            print(f" {line}")
        if len(lines) > 20:
            print(f" ... ({len(lines) - 20} more lines)")

    print("\nDone.")
    return 0
# Files and directories excluded from all sync/archive/load operations.
# Covers intermediate processing artifacts, chunk scratch dirs, venv, and
# common OS/editor noise.
_RSYNC_EXCLUDES = [
# Intermediate processing files
"media/narration/intermediate/",
"media/narration/intermediate/**",
"media/videos/intermediate/",
"media/videos/intermediate/**",
"media/narration/processed/",
"media/narration/processed/**",
"media/videos/narration_combined.mov",
# Low-res preview files (generated locally, not synced)
"media/narration/low/",
"media/narration/low/**",
"media/videos/low/",
"media/videos/low/**",
# Chunk scratch directories
"**/chunks/",
"**/chunks/**",
# Python
"*.py",
"__pycache__/",
"venv/",
# Version control / OS noise
".git/",
".DS_Store",
"*.tmp",
]
def cmd_archive(project_path: Path, verbose: bool, dry_run: bool) -> int:
    """Copy the project into ``<cache>/<project name>/`` with rsync.

    On a real (non-dry) run that succeeds, ``project.json`` gets a
    ``synced_time`` field holding the current local time in ISO format.
    With ``dry_run`` rsync is invoked with ``--dry-run`` and nothing is
    created or rewritten by this function.

    Returns:
        0 on success; 1 if the cache is not configured, not reachable, or
        rsync exits non-zero.
    """
    from .cache import load_cache_config

    print(f"Archiving: {project_path.name}")

    # The cache location must be configured and currently reachable.
    cache_root = load_cache_config()
    if cache_root is None:
        print("Error: Cache not configured. Create ~/.gnommo.conf with:")
        print(" [cache]")
        print(" path = /Volumes/YourDisk/gnommo")
        return 1
    if not cache_root.exists():
        print(f"Error: Cache path not accessible: {cache_root}")
        print("Make sure the external drive is connected.")
        return 1

    archive_dir = cache_root / project_path.name
    print(f" Source: {project_path}")
    print(f" Destination: {archive_dir}")

    if not dry_run:
        archive_dir.mkdir(parents=True, exist_ok=True)

    command = ["rsync", "-av", "--progress"]
    command.extend(f"--exclude={pattern}" for pattern in _RSYNC_EXCLUDES)
    # Trailing slashes are part of the arguments handed to rsync.
    command.extend([f"{project_path}/", f"{archive_dir}/"])

    if dry_run:
        command.insert(1, "--dry-run")
        print("\n [DRY RUN] Would execute:")
        print(f" {' '.join(command)}")
    else:
        print("\n Syncing files...")
        if verbose:
            print(f" Command: {' '.join(command)}")

    completed = subprocess.run(command)
    if completed.returncode != 0:
        print(f"Error: rsync failed with code {completed.returncode}")
        return 1

    # Stamp project.json with the sync time (real runs only, best effort).
    project_json = project_path / "project.json"
    if not dry_run and project_json.exists():
        try:
            payload = _read_json(project_json.read_text(encoding="utf-8"))
            payload["synced_time"] = datetime.now().isoformat()
            serialized = json.dumps(payload, indent=2, ensure_ascii=False) + "\n"
            project_json.write_text(serialized, encoding="utf-8")
            print(
                f"\n Updated project.json with synced_time: {payload['synced_time']}"
            )
        except (json.JSONDecodeError, IOError) as e:
            print(f"Warning: Could not update project.json: {e}")

    print("\nDone.")
    return 0
def cmd_load(project_path: Path, verbose: bool, dry_run: bool) -> int:
    """Copy ``<cache>/<project name>/`` back into the local project with rsync.

    With ``dry_run`` rsync is invoked with ``--dry-run`` and the local project
    directory is not created.

    Returns:
        0 on success; 1 if the cache is not configured or reachable, the
        project is absent from the cache, or rsync exits non-zero.
    """
    from .cache import load_cache_config

    print(f"Loading: {project_path.name}")

    # The cache location must be configured and currently reachable.
    cache_root = load_cache_config()
    if cache_root is None:
        print("Error: Cache not configured. Create ~/.gnommo.conf with:")
        print(" [cache]")
        print(" path = /Volumes/YourDisk/gnommo")
        return 1
    if not cache_root.exists():
        print(f"Error: Cache path not accessible: {cache_root}")
        print("Make sure the external drive is connected.")
        return 1

    # The project must already exist on the external drive.
    cached_project = cache_root / project_path.name
    if not cached_project.exists():
        print(f"Error: Project not found on external drive: {cached_project}")
        return 1

    print(f" Source: {cached_project}")
    print(f" Destination: {project_path}")

    if not dry_run:
        project_path.mkdir(parents=True, exist_ok=True)

    command = ["rsync", "-av", "--progress"]
    command.extend(f"--exclude={pattern}" for pattern in _RSYNC_EXCLUDES)
    # Trailing slashes are part of the arguments handed to rsync.
    command.extend([f"{cached_project}/", f"{project_path}/"])

    if dry_run:
        command.insert(1, "--dry-run")
        print("\n [DRY RUN] Would execute:")
        print(f" {' '.join(command)}")
    else:
        print("\n Copying files...")
        if verbose:
            print(f" Command: {' '.join(command)}")

    completed = subprocess.run(command)
    if completed.returncode != 0:
        print(f"Error: rsync failed with code {completed.returncode}")
        return 1

    print("\nDone.")
    return 0
def cmd_sync(project_path: Path, verbose: bool, dry_run: bool, download: bool) -> int:
    """Sync project files to/from the remote server via rsync over SSH.

    Args:
        project_path: Local project directory; its name is also the remote
            directory name under the configured server path.
        verbose: Print the commands being run.
        dry_run: Pass ``--dry-run`` to rsync and skip creating the destination
            directory.
        download: True to copy server -> local; False to copy local -> server.
            Uploads add ``--delete`` so the remote mirrors the local project.

    Returns:
        0 on success; 1 if the server is not configured, the remote directory
        cannot be created, or rsync exits non-zero.
    """
    import shlex

    from .cache import load_server_config

    server = load_server_config()
    if server is None:
        print("Error: Server not configured. Add to ~/.gnommo.conf:")
        print(" [server]")
        print(" host = 76.13.144.52")
        print(" user = root")
        print(" path = /gnommo/project")
        return 1

    direction = "Downloading from" if download else "Uploading to"
    print(f"{direction} server: {project_path.name}")

    # argv entries must be strings; the config value may not be one.
    port = str(server["port"])
    login = f"{server['user']}@{server['host']}"
    remote = f"{login}:{server['path']}/{project_path.name}/"
    local = f"{project_path}/"
    if download:
        src, dest = remote, local
    else:
        src, dest = local, remote
    print(f" Source: {src}")
    print(f" Destination: {dest}")

    # Ensure destination directory exists
    if not dry_run:
        if download:
            project_path.mkdir(parents=True, exist_ok=True)
        else:
            remote_dir = f"{server['path']}/{project_path.name}"
            # The last argument is interpreted by the remote shell, so the
            # project name is quoted to survive spaces and metacharacters.
            # The configured base path is left as written so that anything the
            # user relies on there (e.g. a leading ~) keeps working.
            remote_mkdir = (
                f"mkdir -p {server['path']}/{shlex.quote(project_path.name)}"
            )
            ssh_cmd = ["ssh", "-p", port, login, remote_mkdir]
            if verbose:
                print(f" Creating remote dir: {remote_dir}")
            result = subprocess.run(ssh_cmd)
            if result.returncode != 0:
                print(f"Error: could not create remote directory {remote_dir}")
                return 1

    rsync_cmd = [
        "rsync",
        "-av",
        "--progress",
        "-e",
        f"ssh -p {port}",
        *[f"--exclude={p}" for p in _RSYNC_EXCLUDES],
        # On upload: delete server-side files that no longer exist locally so
        # the remote stays an exact mirror of the local project.
        *(["--delete"] if not download else []),
        src,
        dest,
    ]

    if dry_run:
        rsync_cmd.insert(1, "--dry-run")
        print("\n [DRY RUN] Would execute:")
        print(f" {' '.join(rsync_cmd)}")
    else:
        print("\n Syncing files...")
        if verbose:
            print(f" Command: {' '.join(rsync_cmd)}")

    result = subprocess.run(rsync_cmd)
    if result.returncode != 0:
        print(f"Error: rsync failed with code {result.returncode}")
        return 1

    print("\nDone.")
    return 0
# =============================================================================
# Extract Audio Command
# =============================================================================
def _extract_audio_file(
    source_path: Path,
    output_dir: Path,
    name: str,
    channel: str,
    verbose: bool,
) -> int:
    """
    Run ffmpeg to write one video's audio track as a 48 kHz 16-bit PCM WAV.

    The output is ``<name>.wav`` for channel "both" and ``<name>_<channel>.wav``
    otherwise. "left" / "right" add a ``pan`` filter producing a mono file from
    input channel 0 / 1; any other value adds no filter.

    Args:
        source_path: Path to the source video file
        output_dir: Directory to save the WAV file
        name: Base name for the output file (without extension)
        channel: "left", "right", or "both"
        verbose: Print the ffmpeg command line
    Returns:
        0 on success, 1 if ffmpeg exits non-zero
    """
    suffix = "" if channel == "both" else f"_{channel}"
    output_path = output_dir / f"{name}{suffix}.wav"

    print(f" Channel: {channel}")
    print(f" Source: {source_path}")
    print(f" Output: {output_path}")

    # Channel name -> pan filter; absent entries mean "no -af argument".
    pan_by_channel = {
        "left": "pan=mono|c0=c0",
        "right": "pan=mono|c0=c1",
    }
    pan = pan_by_channel.get(channel)

    # -y overwrites an existing output, -vn drops the video stream.
    cmd = ["ffmpeg", "-y", "-i", str(source_path), "-vn"]
    if pan is not None:
        cmd += ["-af", pan]
    cmd += ["-ar", "48000", "-acodec", "pcm_s16le", str(output_path)]

    if verbose:
        print(f" Command: {' '.join(cmd)}")

    print(" Extracting...", end=" ", flush=True)
    extraction = subprocess.run(cmd, capture_output=True, text=True)
    if extraction.returncode != 0:
        print("Error!")
        print(f" {extraction.stderr}", file=sys.stderr)
        return 1

    # Ask ffprobe for the duration; the result only decorates the "Done" line.
    probe = subprocess.run(
        [
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format=duration",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            str(output_path),
        ],
        capture_output=True,
        text=True,
    )
    duration_str = ""
    if probe.returncode == 0:
        try:
            duration_str = f" ({float(probe.stdout.strip()):.1f}s)"
        except ValueError:
            pass
    print(f"Done{duration_str}")

    print("\n Open in Audition to experiment with:")
    for effect in (
        "Noise Reduction",
        "Compressor",
        "Filter Curve EQ",
        "Loudness Normalization",
    ):
        print(f" - Effect > {effect}")
    print(
        "\n Once you find good settings, update narration.json with matching filter config."
    )
    return 0
def cmd_extract_audio(
    project_path: Path,
    verbose: bool,
    segment: Optional[str] = None,
    channel: str = "both",
    combined: bool = False,
) -> int:
    """
    Export narration audio as WAV files for experimenting in an external editor.

    In combined mode the audio of narration_combined.mov is written to the
    project's ``out/`` directory. Otherwise each selected narration segment is
    written to an ``audio/`` directory inside the narration directory as
    ``<segment_id>.wav`` (or ``<segment_id>_<channel>.wav`` for a single channel).

    Args:
        project_path: Path to the project directory
        verbose: Enable verbose output
        segment: Specific segment ID to extract, or None for all segments
        channel: Which channel(s) to extract: "left", "right", or "both"
        combined: If True, extract from narration_combined.mov instead of segments
    Returns:
        0 on success; 1 if the combined file or requested segment is missing,
        no segments are defined, or ffmpeg fails for a segment.
    """
    from .parser import parse_project_config, parse_narration, parse_videos

    print(f"Extracting audio: {project_path.name}")
    config = parse_project_config(project_path)

    # --combined: a single extraction from narration_combined.mov
    if combined:
        videos, videos_dir = parse_videos(project_path, config)
        combined_path = _resolve_narration_combined(project_path, videos_dir, config)
        if not combined_path:
            combined_path = videos_dir / "narration_combined.mov"
        if not combined_path.exists():
            print(
                f"Error: narration_combined.mov not found at {combined_path}",
                file=sys.stderr,
            )
            print(_narration_combined_hint(project_path, config), file=sys.stderr)
            return 1
        out_dir = project_path / "out"
        out_dir.mkdir(parents=True, exist_ok=True)
        return _extract_audio_file(
            combined_path, out_dir, "narration_combined", channel, verbose
        )

    # Per-segment mode
    narration, narration_dir = parse_narration(project_path, config)
    if not narration:
        print(" No narration segments found in media/narration/narration.json")
        print(" Run 'gnommo -p <project> import' first to populate narration.json")
        return 1

    audio_dir = narration_dir / "audio"
    audio_dir.mkdir(parents=True, exist_ok=True)

    # Either the one requested segment or all of them in sorted order.
    if not segment:
        selected = sorted(narration.items())
    elif segment in narration:
        selected = [(segment, narration[segment])]
    else:
        print(
            f"Error: Segment '{segment}' not found in narration.json",
            file=sys.stderr,
        )
        print(
            f"Available segments: {', '.join(sorted(narration.keys()))}",
            file=sys.stderr,
        )
        return 1

    print(f" Channel: {channel}")
    print(f" Output: {audio_dir}/")
    print(f" Segments: {len(selected)}")

    # These depend only on `channel`, so compute them once for all segments.
    name_suffix = "" if channel == "both" else f"_{channel}"
    if channel == "left":
        channel_args = ["-af", "pan=mono|c0=c0"]
    elif channel == "right":
        channel_args = ["-af", "pan=mono|c0=c1"]
    else:
        channel_args = []  # stereo kept as-is
    # 48 kHz, 16-bit PCM output
    format_args = ["-ar", "48000", "-acodec", "pcm_s16le"]

    for segment_id, segment_source in selected:
        source_path = narration_dir / segment_source.source_file
        if not source_path.exists():
            print(f" Warning: Source not found: {source_path.name}, skipping")
            continue

        output_name = f"{segment_id}{name_suffix}.wav"
        output_path = audio_dir / output_name

        print(f"\n {segment_id}:")
        print(f" Source: {source_path.name}")
        print(f" Output: {output_name}")

        # -y overwrites an existing output, -vn drops the video stream.
        cmd = [
            "ffmpeg",
            "-y",
            "-i",
            str(source_path),
            "-vn",
            *channel_args,
            *format_args,
            str(output_path),
        ]
        if verbose:
            print(f" Command: {' '.join(cmd)}")

        extraction = subprocess.run(cmd, capture_output=True, text=True)
        if extraction.returncode != 0:
            print(f" Error: {extraction.stderr}", file=sys.stderr)
            return 1

        # Report the duration when ffprobe returns a parseable number.
        probe = subprocess.run(
            [
                "ffprobe",
                "-v",
                "error",
                "-show_entries",
                "format=duration",
                "-of",
                "default=noprint_wrappers=1:nokey=1",
                str(output_path),
            ],
            capture_output=True,
            text=True,
        )
        if probe.returncode == 0:
            try:
                seconds = float(probe.stdout.strip())
                print(f" Duration: {seconds:.1f}s")
            except ValueError:
                pass
        print(" Done")

    print(f"\n Audio files saved to: {audio_dir}")
    print("\n Open in Audacity to experiment with:")
    for effect in (
        "Noise Reduction",
        "Compressor",
        "Filter Curve EQ",
        "Loudness Normalization",
    ):
        print(f" - Effect > {effect}")
    print(
        "\n Once you find good settings, update narration.json with matching filter config."
    )
    return 0
# =============================================================================
# Master Command (A/B audio comparison)
# =============================================================================
def _build_master_filter_chain(cfg, channel_pan: str) -> str:
    """Return the comma-joined ffmpeg ``-af`` chain for cmd_master's processed pass.

    Stage order: channel mapping, EQ bands, high-pass, low-pass, room EQ,
    denoise, gate, compressor, loudness normalization. Disabled stages are
    skipped, so the result may be an empty string.
    """
    stages = []

    # Channel mapping
    if channel_pan:
        stages.append(channel_pan)

    # EQ bands
    for band in cfg.eq_bands:
        if band.type == "lowshelf":
            stages.append(
                f"lowshelf=f={band.freq:.1f}:g={band.gain:.1f}:t=q:w={band.q:.2f}"
            )
        elif band.type == "highshelf":
            stages.append(
                f"highshelf=f={band.freq:.1f}:g={band.gain:.1f}:t=q:w={band.q:.2f}"
            )
        else:
            stages.append(
                f"equalizer=f={band.freq:.1f}:width_type=q:width={band.q:.2f}:g={band.gain:.1f}"
            )

    # High-pass
    if cfg.highpass > 0:
        stages.append(f"highpass=f={cfg.highpass:.1f}")

    # Low-pass
    if cfg.lowpass > 0:
        stages.append(f"lowpass=f={cfg.lowpass:.1f}")

    # Room EQ
    if cfg.room_eq:
        stages.append(
            f"equalizer=f={cfg.room_eq_freq:.1f}:width_type=q:width={cfg.room_eq_width:.2f}:g={cfg.room_eq_gain:.1f}"
        )

    # Denoise
    if cfg.denoise:
        stages.append(f"afftdn=nf={cfg.noise_floor:.1f}")

    # Gate
    if cfg.gate:
        stages.append(
            f"agate=threshold={cfg.gate_threshold:.1f}dB"
            f":range={cfg.gate_range:.1f}dB"
            f":attack={cfg.gate_attack:.1f}"
            f":release={cfg.gate_release:.1f}"
        )

    # Compressor
    if cfg.compress:
        stages.append(
            f"acompressor=threshold={cfg.threshold:.1f}dB"
            f":ratio={cfg.ratio:.1f}"
            f":attack={cfg.attack:.1f}"
            f":release={cfg.release:.1f}"
            f":makeup={cfg.makeup:.1f}dB"
        )

    # Loudness normalization
    if cfg.normalize:
        stages.append(
            f"loudnorm=I={cfg.target_lufs:.1f}"
            f":LRA={cfg.target_lra:.1f}"
            f":TP={cfg.target_tp:.1f}"
        )

    return ",".join(stages)


def cmd_master(
    project_path: Path,
    verbose: bool,
    channel: str = "both",
) -> int:
    """
    Extract raw and processed audio from narration_combined for A/B comparison.

    Outputs:
        out/narration_combined.wav - Raw audio (no processing)
        out/narration_combined_processed.wav - With audio filters applied
            (only when an ``audio_normalize`` filter exists in the project's
            ``default_filters``)

    Args:
        project_path: Path to the project directory.
        verbose: Print the ffmpeg commands and the assembled filter chain.
        channel: "left" or "right" to extract that channel as mono; any other
            value leaves the channel layout untouched.

    Returns:
        0 on success; 1 if narration_combined.mov is missing or ffmpeg fails.
    """
    from .parser import parse_project_config, parse_videos
    from .preprocessor import parse_audio_normalize_config

    print(f"Audio mastering: {project_path.name}")
    config = parse_project_config(project_path)
    videos, videos_dir = parse_videos(project_path, config)

    # Find narration_combined.mov
    combined_path = _resolve_narration_combined(project_path, videos_dir, config) or (
        videos_dir / "narration_combined.mov"
    )
    if not combined_path.exists():
        print(
            f"Error: narration_combined.mov not found at {combined_path}",
            file=sys.stderr,
        )
        print(_narration_combined_hint(project_path, config), file=sys.stderr)
        return 1

    # Output directory
    out_dir = project_path / "out"
    out_dir.mkdir(parents=True, exist_ok=True)
    raw_output = out_dir / "narration_combined.wav"
    processed_output = out_dir / "narration_combined_processed.wav"

    # Use the first audio_normalize entry found in default_filters.
    audio_config = None
    if config.default_filters:
        for preset_name, filters in config.default_filters.items():
            for f in filters:
                if f.get("type") == "audio_normalize":
                    audio_config = f
                    print(f" Using audio config from: default_filters.{preset_name}")
                    break
            if audio_config:
                break
    if not audio_config:
        print(" Warning: No audio_normalize filter found in default_filters")
        print(" Will only extract raw audio.")

    # Channel selection filter (empty string = no channel mapping)
    channel_pan = ""
    if channel == "left":
        channel_pan = "pan=mono|c0=c0"
    elif channel == "right":
        channel_pan = "pan=mono|c0=c1"

    # Step 1: Extract raw audio
    print("\n Extracting raw audio...")
    raw_cmd = ["ffmpeg", "-y", "-i", str(combined_path), "-vn"]
    if channel_pan:
        raw_cmd.extend(["-af", channel_pan])
    raw_cmd.extend(["-ar", "48000", "-acodec", "pcm_s16le", str(raw_output)])
    if verbose:
        print(f" Command: {' '.join(raw_cmd)}")
    result = subprocess.run(raw_cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print(f" Error extracting raw audio: {result.stderr}", file=sys.stderr)
        return 1
    print(f" Saved: {raw_output.name}")

    # Step 2: Extract processed audio (if we have config)
    processed_written = False
    if audio_config:
        print("\n Applying audio filters...")
        cfg = parse_audio_normalize_config(audio_config)

        # Build filter chain (same order as apply_audio_normalize)
        filter_chain = _build_master_filter_chain(cfg, channel_pan)
        if verbose:
            print(f" Filter chain: {filter_chain}")

        # Print filter summary
        print(" Filters applied:")
        if cfg.eq_bands:
            print(f" - EQ: {len(cfg.eq_bands)} bands")
        if cfg.highpass > 0:
            print(f" - Highpass: {cfg.highpass}Hz")
        if cfg.lowpass > 0:
            print(f" - Lowpass: {cfg.lowpass}Hz")
        if cfg.room_eq:
            print(f" - Room EQ: {cfg.room_eq_freq}Hz, gain={cfg.room_eq_gain}dB")
        if cfg.denoise:
            print(f" - Denoise: floor={cfg.noise_floor}dB")
        if cfg.gate:
            print(f" - Gate: threshold={cfg.gate_threshold}dB")
        if cfg.compress:
            print(f" - Compressor: ratio={cfg.ratio}:1, attack={cfg.attack}ms")
        if cfg.normalize:
            print(f" - Loudnorm: target={cfg.target_lufs} LUFS")

        processed_cmd = ["ffmpeg", "-y", "-i", str(combined_path), "-vn"]
        # With every stage disabled the chain is empty; omit -af rather than
        # hand ffmpeg an empty filter description.
        if filter_chain:
            processed_cmd.extend(["-af", filter_chain])
        processed_cmd.extend(
            ["-ar", "48000", "-acodec", "pcm_s16le", str(processed_output)]
        )
        if verbose:
            print(f" Command: {' '.join(processed_cmd)}")
        result = subprocess.run(processed_cmd, capture_output=True, text=True)
        if result.returncode != 0:
            print(f" Error applying filters: {result.stderr}", file=sys.stderr)
            return 1
        print(f" Saved: {processed_output.name}")
        processed_written = True

    # Get durations
    def get_duration(path):
        """Return the duration ffprobe reports for `path`, or 0 if it is not a number."""
        cmd = [
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format=duration",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            str(path),
        ]
        r = subprocess.run(cmd, capture_output=True, text=True)
        try:
            return float(r.stdout.strip())
        except ValueError:
            return 0

    duration = get_duration(raw_output)
    print(f"\n Output files ({duration:.1f}s):")
    print(f" {raw_output}")
    # Only mention the processed file (and the A/B hint) when this run made it.
    if processed_written:
        print(f" {processed_output}")
        print("\n Open both in Audition to A/B compare the processing.")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())