"""CLI entry point for GnommoEditor.""" import argparse import json from logging import config import re import time import shutil import subprocess import sys from datetime import datetime from pathlib import Path from gnommo.parser import _read_json from . import __version__ from .errors import GnommoError, ParseError, ValidationError, RenderError from .cache import get_cache_info, resolve_with_cache from typing import Optional, Union class NotImplementedException(GnommoError): """Feature not yet implemented.""" pass def main() -> int: """Main entry point.""" parser = argparse.ArgumentParser( prog="gnommo", description="GnommoEditor - A code-first video editing pipeline", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: gnommo -p video1 render Render the full project gnommo -p video1 render --slides S1:S10 Render only slides S1-S10 gnommo -p video1 render --slides S10: Render from S10 to end gnommo -p video1 validate Validate only gnommo -p video1 import Generate slides.json from images gnommo -p video1 pre Preprocess videos (chroma key, etc.) gnommo -p video1 stitch --res tiny -f Fast stitch with new begin/end values gnommo -p video1 trim Auto-detect silence and set skip/take in narration.json gnommo -p video1 trim --force Redo trim even for segments that already have skip/take gnommo -p video1 trim --threshold -25 Raise threshold to ignore clothing/room noise gnommo -p video1 trim -v Show detected silence periods for debugging gnommo -p video1 transcode Transcode narration folder to H.265 (1st pass, before preprocess) gnommo -p video1 transcode --replace Delete originals after successful transcode gnommo -p video1 transcode --crf 28 Lower quality / smaller files (default CRF: 23) gnommo -p video1 transcode --processed Compress _processed.mov files to HEVC+alpha (2nd pass, after preprocess) gnommo -p video1 transcode --processed --alpha-quality 0.5 More aggressive alpha compression gnommo -p video1 transcode --processed --dry-run Preview what would be compressed gnommo -p video1 transcode --force Re-transcode even if output already exists gnommo -p video1 all Full pipeline: down → import → preprocess → trim → stitch → render → push → handoff → up gnommo -p video1 render --dry-run Show FFmpeg command without running gnommo -p video1 description Generate YouTube description file gnommo -p video1 transcribe Narration file for timing of slides gnommo -p video1 transcribe --final Transcribe outputted file and generate SRT for YouTube gnommo -p video1 archive Sync project to external cache storage gnommo -p video1 archive --dry-run Preview what would be synced gnommo -p video1 up Upload project files to remote server gnommo -p video1 down Download project files from remote server gnommo -p video1 extract-audio --combined Extract audio from narration_combined.mov gnommo -p video1 extract-audio --combined --channel left Extract left channel only gnommo -p video1 extract-audio --segment seg01 Extract from a specific segment gnommo -p video1 master Extract raw + processed audio for A/B comparison """, ) parser.add_argument( "--version", action="version", version=f"%(prog)s {__version__}", ) # Required arguments parser.add_argument( "-p", "--project", type=str, required=True, help="Project directory", ) parser.add_argument( "action", type=str, nargs="?", default="render", choices=[ "validate", "preprocess", "pre", "stitch", "trim", "render", "all", "transcribe", "align", "import", "description", "archive", "load", "up", "down", "extract-audio", "master", "push", "pull", "handoff", "transcode", ], help="Action to perform (default: render)", ) # Optional arguments parser.add_argument( "-v", "--verbose", action="store_true", help="Verbose output", ) parser.add_argument( "-f", "--force", action="store_true", help="Force overwrite existing files", ) parser.add_argument( "--dry-run", action="store_true", help="Show what would be done without executing", ) parser.add_argument( "--slides", type=str, help="Render only a range of slides (e.g., S1:S10, S5:, S10:S20)", ) parser.add_argument( "--chunk-slides", type=int, default=0, dest="chunk_slides", help="Split render into chunks of N slides each and concatenate (overrides render_chunk_slides in .gnommo.conf)", ) parser.add_argument( "--res", type=str, choices=["full", "low", "tiny"], default="full", help="Resolution: 'full' (project res), 'low' (490x270), 'tiny' (320x180 ultrafast)", ) parser.add_argument( "-w", "--workers", type=int, default=1, help="Number of parallel workers for preprocessing (default: 1)", ) parser.add_argument( "--final", action="store_true", help="For transcribe: transcribe the final rendered video and generate SRT captions for YouTube", ) parser.add_argument( "--segment", type=str, help="For extract-audio: specific segment ID to extract (default: all segments)", ) parser.add_argument( "--channel", type=str, choices=["auto", "left", "right", "both"], default="both", help="For extract-audio: which audio channel(s) to extract (default: both)", ) parser.add_argument( "--combined", action="store_true", help="For extract-audio: extract from narration_combined.mov instead of individual segments", ) parser.add_argument( "--file", default=None, help="For handoff: path to video file (overrides output_video in project.json)", ) parser.add_argument( "--prod", action="store_true", help="Target production server (GNOMMOWEB_PROD_URL / GNOMMOWEB_PROD_API_KEY)", ) parser.add_argument( "--threshold", type=float, default=-40.0, help="For trim: silence threshold in dB (default: -40). Raise (e.g. -25) to ignore clothing/room noise.", ) parser.add_argument( "--crf", type=int, default=23, help="For transcode: H.265 quality (CRF, default: 23; lower=better quality, larger file)", ) parser.add_argument( "--replace", action="store_true", help="For transcode: delete original files after successful transcode", ) parser.add_argument( "--processed", action="store_true", help="For transcode: compress _processed.mov files (with alpha) using HEVC+alpha instead of narration files", ) parser.add_argument( "--alpha-quality", type=float, default=1.0, dest="alpha_quality", help="For transcode --processed: HEVC alpha quality 0.0-1.0 (default: 0.75; lower=smaller file)", ) args = parser.parse_args() # Resolve project path project_path = Path(args.project) if not project_path.is_absolute(): project_path = Path.cwd() / project_path try: # Handle actions action = args.action if action == "import": return cmd_import(project_path, args.force, args.verbose) elif action == "validate": return cmd_validate(project_path, args.verbose) elif action in ("preprocess", "pre"): return cmd_preprocess( project_path, args.verbose, args.dry_run, args.force, args.workers, args.res, ) elif action == "trim": return cmd_trim(project_path, args.verbose, args.force, args.threshold, args.res) elif action == "transcode": return cmd_transcode( project_path, args.verbose, args.dry_run, args.replace, args.crf, args.force, args.processed, args.alpha_quality, ) elif action in ("stitch"): return cmd_stitch( project_path, args.verbose, args.force, args.res, ) elif action == "render": return cmd_render( project_path, args.verbose, args.dry_run, args.slides, args.res, args.force, chunk_slides=args.chunk_slides, ) elif action == "transcribe": return cmd_transcribe(project_path, args.verbose, args.res, args.final) elif action == "align": return cmd_align(project_path, args.verbose) elif action == "all": return cmd_all( project_path, args.verbose, args.dry_run, args.res, args.force ) elif action == "description": return cmd_description(project_path, args.verbose) elif action == "archive": return cmd_archive(project_path, args.verbose, args.dry_run) elif action == "load": return cmd_load(project_path, args.verbose, args.dry_run) elif action == "up": return cmd_sync(project_path, args.verbose, args.dry_run, download=False) elif action == "down": return cmd_sync(project_path, args.verbose, args.dry_run, download=True) elif action == "extract-audio": return cmd_extract_audio( project_path, args.verbose, args.segment, args.channel, args.combined ) elif action == "master": return cmd_master(project_path, args.verbose, args.channel) elif action == "push": from .push import cmd_push return cmd_push(project_path, args.verbose, args.force, args.prod) elif action == "pull": from .pull import cmd_pull return cmd_pull(project_path, args.verbose, args.force, args.prod) elif action == "handoff": from .handoff import cmd_handoff return cmd_handoff( project_path, args.verbose, args.file, args.prod, args.res ) except GnommoError as e: print(f"Error: {e}", file=sys.stderr) return 1 except KeyboardInterrupt: print("\nAborted.", file=sys.stderr) return 130 return 0 # ============================================================================= # Import Command # ============================================================================= def cmd_import(project_path: Path, force: bool, verbose: bool) -> int: """Import assets and generate metadata JSON files.""" from .parser import parse_project_config, _read_json print(f"Importing assets for: {project_path.name}") if not project_path.exists(): print(f"Error: Project directory not found: {project_path}", file=sys.stderr) return 1 # Load project config if it exists (for videos_path and default_filters) config = None if (project_path / "project.json").exists(): config = parse_project_config(project_path) # Import videos from media/videos directory if config and config.videos_path: videos_json_path = project_path / config.videos_path videos_dir = videos_json_path.parent else: videos_dir = project_path / "media" / "videos" if videos_dir.exists(): _import_videos(videos_dir, config, verbose) # Import narration segments from media/narration directory narration_dir = project_path / "media" / "narration" if narration_dir.exists(): _import_narration_segments(narration_dir, config, verbose) # Import presenter notes from Keynote file (also exports slide PNGs) keynote_files = list(project_path.glob("*.key")) if keynote_files: keynote_file = keynote_files[0] # Use first .key file found if len(keynote_files) > 1: print(f" Warning: Multiple .key files found, using {keynote_file.name}") _import_presenter_notes(project_path, keynote_file, verbose) # Generate slides.json for each slide directory (after Keynote export) slides_base = project_path / "media" / "slides" slides_dirs = ( [d for d in slides_base.glob("*/") if d.is_dir()] if slides_base.exists() else [] ) for slides_dir in slides_dirs: _generate_slides_json(slides_dir, verbose) else: if verbose: print(" No .key file found, skipping presenter notes import") # Import shared assets (pexels, etc.) from shared_assets directory # Look for shared_assets relative to project or in parent directories shared_assets_dir = _find_shared_assets(project_path) if shared_assets_dir: _import_shared_assets(shared_assets_dir, verbose) _import_shared_audio(shared_assets_dir, project_path, config, verbose) _sync_shared_videos_to_local(project_path, config, shared_assets_dir, verbose) # Probe and cache audio file durations into audio.json _probe_audio_durations(project_path, config, force, verbose, shared_assets_dir) # Probe and cache video metadata (duration, has_audio) into videos.json _probe_video_metadata(project_path, config, shared_assets_dir, force, verbose) print("Import complete.") return 0 def _import_shared_audio( shared_assets_dir: Path, project_path: Path, config, verbose: bool, ) -> None: """Import audio files from shared_assets/media/audio into the project's audio.json.""" audio_extensions = {".mp3", ".wav", ".aac", ".m4a", ".ogg", ".flac"} shared_audio_dir = shared_assets_dir / "media" / "audio" if not shared_audio_dir.exists(): if verbose: print(f" No shared audio dir found at {shared_audio_dir}") return audio_files = sorted( f for f in shared_audio_dir.iterdir() if f.is_file() and f.suffix.lower() in audio_extensions and not f.name.startswith(".") ) if not audio_files: if verbose: print(f" No audio files found in {shared_audio_dir}") return # Resolve project audio.json path if config and config.audio_path: audio_json_path = project_path / config.audio_path else: audio_json_path = project_path / "media" / "audio" / "audio.json" audio_json_path.parent.mkdir(parents=True, exist_ok=True) existing: dict = _read_json(audio_json_path) if audio_json_path.exists() else {} added = 0 for f in audio_files: audio_id = f.stem if audio_id in existing: if verbose: print(f" Skipping {audio_id} (already in audio.json)") continue existing[audio_id] = { "file": f.name, "is_shared": True, "volume": 1.0, } added += 1 if verbose: print(f" Added shared audio: {audio_id}") if added > 0: with open(audio_json_path, "w", encoding="utf-8") as fh: json.dump(existing, fh, indent=2) print(f" Updated {audio_json_path.relative_to(project_path)} (+{added} shared audio files)") else: if verbose: print(f" No new shared audio files to add") def _probe_audio_durations( project_path: Path, config, force: bool, verbose: bool, shared_assets_dir: Optional[Path] = None, ) -> None: """Probe and cache audio file durations into audio.json. Runs once at import time so the render stage never needs to scan audio files. Skips entries that already have a duration unless --force is set. """ from .renderer import _get_audio_duration if config and config.audio_path: audio_json_path = project_path / config.audio_path else: audio_json_path = project_path / "audio.json" if not audio_json_path.exists(): return audio_dir = audio_json_path.parent data = _read_json(audio_json_path) updated = False for audio_id, audio_data in data.items(): if "file" not in audio_data: continue if "duration" in audio_data and not force: if verbose: print(f" Audio '{audio_id}': cached ({audio_data['duration']:.1f}s)") continue if audio_data.get("is_shared") and shared_assets_dir: audio_path = shared_assets_dir / "media" / "audio" / audio_data["file"] else: audio_path = audio_dir / audio_data["file"] if not audio_path.exists(): if verbose: print(f" Audio '{audio_id}': file not found, skipping") continue print( f" Probing audio '{audio_id}' ({audio_path.name})...", end=" ", flush=True ) try: duration = _get_audio_duration(audio_path) data[audio_id]["duration"] = round(duration, 3) updated = True print(f"{duration:.1f}s") except Exception as e: print(f"failed ({e})") if updated: with open(audio_json_path, "w") as f: json.dump(data, f, indent=4) print(f" Saved durations to {audio_json_path.name}") def _probe_video_metadata( project_path: Path, config, shared_assets_dir: Optional[Path], force: bool, verbose: bool, ) -> None: """Probe and cache video file duration and audio presence into videos.json. Runs once at import time so the render stage never needs to probe video files. Shared entries are written back to shared_assets/videos.json (canonical source). Local entries are written to the project's videos.json. Skips entries that already have both fields unless --force is set. """ from .preprocessor import get_video_duration from .renderer import _has_audio_stream if config and config.videos_path: videos_json_path = project_path / config.videos_path else: videos_json_path = project_path / "media" / "videos" / "videos.json" if not videos_json_path.exists(): return videos_dir = videos_json_path.parent local_data = _read_json(videos_json_path) # Load shared_assets/videos.json separately — shared probes write there shared_json_path = shared_assets_dir / "videos.json" if shared_assets_dir else None shared_data = ( _read_json(shared_json_path) if shared_json_path and shared_json_path.exists() else {} ) local_updated = False shared_updated = False for video_id, video_data in local_data.items(): if "source_file" not in video_data: continue is_shared = video_data.get("is_shared", False) # For shared entries, check the shared_assets/videos.json for cached values if is_shared and video_id in shared_data: canonical = shared_data[video_id] else: canonical = video_data if not force and "duration" in canonical and "has_audio" in canonical: if verbose: print( f" Video '{video_id}': cached ({canonical['duration']:.1f}s, audio={canonical['has_audio']})" ) continue base_dir = ( shared_assets_dir if (is_shared and shared_assets_dir) else videos_dir ) # Mirror renderer._resolve_video_path: try output_file first, then source_file video_path = None output_file = video_data.get("output_file") if output_file: for candidate_dir in [base_dir, base_dir.parent]: candidate = candidate_dir / output_file if candidate.exists(): video_path = candidate break mov_candidate = candidate.with_suffix(".mov") if mov_candidate.exists(): video_path = mov_candidate break if video_path is None: source_candidate = base_dir / video_data["source_file"] if source_candidate.exists(): video_path = source_candidate if video_path is None: if verbose: print(f" Video '{video_id}': file not found, skipping") continue print( f" Probing video '{video_id}' ({video_path.name})...", end=" ", flush=True ) try: duration = get_video_duration(video_path) has_audio = _has_audio_stream(video_path) result = {"duration": round(duration, 3), "has_audio": has_audio} print(f"{duration:.1f}s, audio={has_audio}") if is_shared and video_id in shared_data: # Write back to shared_assets/videos.json — canonical source for shared assets shared_data[video_id].update(result) shared_updated = True else: local_data[video_id].update(result) local_updated = True except Exception as e: print(f"failed ({e})") if local_updated: with open(videos_json_path, "w") as f: json.dump(local_data, f, indent=4) print(f" Saved metadata to {videos_json_path.name}") if shared_updated and shared_json_path: with open(shared_json_path, "w") as f: json.dump(shared_data, f, indent=4) print(f" Saved shared metadata to {shared_json_path.name}") def _sync_shared_videos_to_local( project_path: Path, config, shared_assets_dir: Path, verbose: bool ) -> None: """Append entries from shared_assets/videos.json into the project's local videos.json. Each new entry gets is_shared=true so the renderer looks in shared_assets_dir. Existing local entries are never overwritten (preserves cutout, layer, filters, etc.). """ shared_videos_json = shared_assets_dir / "videos.json" if not shared_videos_json.exists(): return shared_videos = _read_json(shared_videos_json) if not shared_videos: return if config and config.videos_path: local_json_path = project_path / config.videos_path else: local_json_path = project_path / "media" / "videos" / "videos.json" local_videos: dict = {} if local_json_path.exists(): local_videos = _read_json(local_json_path) _METADATA_FIELDS = ("duration", "has_audio") added = [] metadata_updated = [] for video_id, shared_entry in shared_videos.items(): if video_id in local_videos: # Propagate any metadata fields that were probed into shared_assets/videos.json changed = False for field in _METADATA_FIELDS: if ( field in shared_entry and local_videos[video_id].get(field) != shared_entry[field] ): local_videos[video_id][field] = shared_entry[field] changed = True if changed: metadata_updated.append(video_id) elif verbose: print(f" Shared '{video_id}': already in local videos.json, skipping") continue # New entry — copy from shared and mark it as shared local_entry = dict(shared_entry) local_entry["is_shared"] = True local_videos[video_id] = local_entry added.append(video_id) if added or metadata_updated: local_json_path.parent.mkdir(parents=True, exist_ok=True) with open(local_json_path, "w", encoding="utf-8") as f: json.dump(local_videos, f, indent=4) if added: print( f" Synced {len(added)} shared asset(s) to local videos.json: {', '.join(added)}" ) if metadata_updated: print( f" Updated metadata for {len(metadata_updated)} shared asset(s): {', '.join(metadata_updated)}" ) elif verbose: print(" No new shared assets to sync to local videos.json") def _find_shared_assets(project_path: Path) -> Optional[Path]: """Find the shared_assets directory. Looks in: 1. project_path/shared_assets 2. project_path/../shared_assets (sibling to project) """ # Check if shared_assets is inside project if (project_path / "shared_assets").exists(): return project_path / "shared_assets" # Check if shared_assets is sibling to project if (project_path.parent / "shared_assets").exists(): return project_path.parent / "shared_assets" return None def _import_shared_assets(shared_assets_dir: Path, verbose: bool) -> None: """Import video files from shared_assets directory into videos.json. Scans the root level and all subdirectories for video files and creates a unified videos.json in shared_assets/. Video IDs use the filename for root-level files (e.g., "Logo") or are prefixed with the subfolder name for subdirectory files (e.g., "pexels/filename"). """ video_extensions = {".mov", ".mp4", ".webm", ".avi", ".mkv", ".m4v"} # Find all video files in shared_assets (root level and subdirectories) video_files: list[tuple[Path, Path]] = [] # (relative_path, absolute_path) for item in shared_assets_dir.iterdir(): if item.name.startswith("."): continue if item.is_file(): # Video file directly in shared_assets root if ( item.suffix.lower() in video_extensions and not item.name.endswith("_processed.mov") and not item.name.endswith("_processed.webm") ): rel_path = item.relative_to(shared_assets_dir) video_files.append((rel_path, item)) elif item.is_dir(): # Scan subdirectories recursively for video_file in item.rglob("*"): if ( video_file.is_file() and video_file.suffix.lower() in video_extensions and not video_file.name.endswith("_processed.mov") and not video_file.name.endswith("_processed.webm") ): rel_path = video_file.relative_to(shared_assets_dir) video_files.append((rel_path, video_file)) if not video_files: if verbose: print(f" No video files found in {shared_assets_dir}") return # Load existing videos.json if it exists videos_json_path = shared_assets_dir / "videos.json" existing_videos: dict = {} if videos_json_path.exists(): existing_videos = _read_json(videos_json_path) # Add new videos (don't overwrite existing) added_count = 0 for rel_path, abs_path in sorted(video_files): # Use path relative to shared_assets without extension as video_id # e.g., "Logo" for root files, "pexels/6759604-hd" for subdirectory files video_id = str(rel_path.with_suffix("")) if video_id in existing_videos: if verbose: print(f" Skipping {video_id} (already exists)") continue existing_videos[video_id] = { "source_file": str(rel_path), } added_count += 1 if verbose: print(f" Added: {video_id}") if added_count > 0: # Write updated videos.json with open(videos_json_path, "w", encoding="utf-8") as f: json.dump(existing_videos, f, indent=2) print(f" Updated {videos_json_path} (+{added_count} shared assets)") else: print(f" No new shared assets to add") def _generate_slides_json(directory: Path, verbose: bool) -> None: """Generate slides.json from Keynote export folder.""" extensions = {".png", ".gif", ".pdf", ".jpg", ".jpeg"} files = [f for f in directory.iterdir() if f.suffix.lower() in extensions] if not files: print(f" Warning: No image files in {directory}") return # Extract numeric suffix from filenames like "Video1.001.png" pattern = re.compile(r"\.(\d+)\.[^.]+$") slides = {} for file in files: match = pattern.search(file.name) if match: num = int(match.group(1)) slide_id = f"S{num}" slides[slide_id] = { "image": file.name, "type": "fullscreen", } if not slides: print(f" Warning: No valid slide files in {directory}") return # Sort by slide number sorted_slides = dict(sorted(slides.items(), key=lambda x: int(x[0][1:]))) # Write slides.json only if content changed output_path = directory / "slides.json" new_content = json.dumps(sorted_slides, indent=2) existing_content = output_path.read_text(encoding="utf-8") if output_path.exists() else None if new_content != existing_content: with open(output_path, "w", encoding="utf-8") as f: f.write(new_content) print(f" Generated {output_path} ({len(sorted_slides)} slides)") if verbose: for slide_id in sorted_slides: print(f" [{slide_id}]") def _import_videos(videos_dir: Path, config, verbose: bool) -> None: """Import video files into videos.json. Scans the videos directory for video files and adds them to videos.json. Uses the filename (without extension) as the video_id. Does not overwrite existing entries - only adds new ones. If the video filename matches a pattern like 'talkinghead*' and a 'talkinghead' filter preset exists in default_filters, it will be applied automatically. """ video_extensions = {".mov", ".mp4", ".webm", ".avi", ".mkv", ".m4v"} # Find all video files (exclude processed outputs, proxies, and intermediate files) video_files = [ f for f in videos_dir.iterdir() if f.is_file() and f.suffix.lower() in video_extensions and "_processed" not in f.stem # Exclude any _processed files and "_fixed" not in f.stem # Exclude any _fixed files and not f.name.startswith("narration_combined") ] # Also exclude files in subdirectories (proxy/, intermediate/, etc.) video_files = [f for f in video_files if f.parent == videos_dir] # Ensure videos.json exists even if there are no video files yet videos_json_path = videos_dir / "videos.json" if not videos_json_path.exists(): videos_dir.mkdir(parents=True, exist_ok=True) with open(videos_json_path, "w", encoding="utf-8") as f: json.dump({}, f, indent=2) print( f" Created empty {videos_json_path.relative_to(videos_dir.parent.parent)}" ) if not video_files: if verbose: print(f" No new video files found in {videos_dir}") return # Load existing videos.json existing_videos: dict = {} if videos_json_path.exists(): existing_videos = _read_json(videos_json_path) # Get available filter presets from config default_filters = config.default_filters if config else {} # Add new videos (don't overwrite existing) added_count = 0 for video_file in sorted(video_files): # Use filename without extension as video_id video_id = video_file.stem if video_id in existing_videos: if verbose: print(f" Skipping {video_id} (already exists)") continue # Determine if this is a talking head segment # Match patterns like: talkinghead, talkingheadS01, talkinghead_s01, etc. is_narration_combined = "narration_combined" in video_file.stem.lower() # Build the video entry video_entry = { "source_file": video_file.name, } if is_narration_combined: video_entry["output_file"] = None video_entry["cutout"] = "talkinghead" video_entry["always_visible"] = True video_entry["skip"] = 0 video_entry["filter"] = [] print(f" Added talking head segment: {video_id}") else: # Regular video video_entry["output_file"] = video_file.name video_entry["cutout"] = "square" video_entry["filter"] = [] if verbose: print(f" Added: {video_id}") existing_videos[video_id] = video_entry added_count += 1 if added_count > 0: # Write updated videos.json with open(videos_json_path, "w", encoding="utf-8") as f: json.dump(existing_videos, f, indent=2) print(f" Updated {videos_json_path.name} (+{added_count} videos)") else: print(f" No new videos to add") def _import_narration_segments(narration_dir: Path, config, verbose: bool) -> None: """Import narration video files into narration.json. Folder structure: media/narration/raw_mov/ ← raw recordings from iPhone/QuickTime media/narration/processed/ ← chroma-keyed output (preprocess) media/narration/narration.json Scans processed/ for ready-to-stitch files and raw/ for any new raw recordings not yet represented in narration.json. Priority: processed/ files define the segment catalogue. Raw files discovered in raw/ add new entries pointing at raw/ with output_file preset to processed/_processed.mov. """ video_extensions = {".mov", ".mp4", ".webm", ".avi", ".mkv", ".m4v"} processed_dir = narration_dir / "processed" raw_dir = narration_dir / "raw_mov" processed_dir.mkdir(parents=True, exist_ok=True) raw_dir.mkdir(parents=True, exist_ok=True) # Load / create narration.json narration_json_path = narration_dir / "narration.json" existing_narration: dict = {} if narration_json_path.exists(): existing_narration = _read_json(narration_json_path) default_filters = config.default_filters if config else {} added_count = 0 def _scan(directory: Path) -> list[Path]: if not directory.exists(): return [] return sorted( f for f in directory.iterdir() if f.is_file() and f.suffix.lower() in video_extensions and not f.name.startswith(".") ) # 1. Scan processed/ — only add entries when NO raw_mov equivalent exists. # If raw_mov has the source, step 2 will create the entry pointing there # (with the filter chain), which is better for re-processing later. _raw_video_exts = {".mov", ".mp4", ".avi", ".mkv", ".m4v"} for video_file in _scan(processed_dir): segment_id = video_file.stem # Strip _processed suffix for cleaner segment IDs if present if segment_id.endswith("_processed"): segment_id = segment_id[:-10] if segment_id in existing_narration: if verbose: print(f" Skipping {segment_id} (already exists)") continue # If a raw_mov equivalent exists, skip — step 2 will handle it raw_mov_has_file = raw_dir.exists() and any( (raw_dir / f"{segment_id}{ext}").exists() for ext in _raw_video_exts ) if raw_mov_has_file: continue narration_entry = { "source_file": f"processed/{video_file.name}", } narration_entry["use_audio_channels"] = "auto" narration_entry["defer_loudnorm"] = True existing_narration[segment_id] = narration_entry added_count += 1 print(f" Added narration segment: {segment_id} (from processed/)") # 2. Scan raw/ — add entries for raw files not yet in narration.json for video_file in _scan(raw_dir): segment_id = video_file.stem if segment_id in existing_narration: if verbose: print(f" Skipping {segment_id} (already exists)") continue narration_entry = { "source_file": f"raw_mov/{video_file.name}", "output_file": f"processed/{video_file.stem}_processed.mov", } if "talkinghead" in default_filters: narration_entry["cutout"] = "talkinghead" narration_entry["filter"] = "talkinghead" narration_entry["use_audio_channels"] = "auto" narration_entry["defer_loudnorm"] = True existing_narration[segment_id] = narration_entry added_count += 1 print(f" Added narration segment: {segment_id} (from raw_mov)") if added_count > 0 or not narration_json_path.exists(): with open(narration_json_path, "w", encoding="utf-8") as f: json.dump(existing_narration, f, indent=2) if added_count > 0: print(f" Updated narration.json (+{added_count} segments)") else: if not existing_narration: print(f" narration.json created (empty — add files to processed/ or raw/)") else: print(f" No new narration segments to add") def _import_presenter_notes( project_path: Path, keynote_file: Path, verbose: bool ) -> None: """Extract presenter notes from Keynote and write to manuscript.txt. Uses the JXA script (extract_keynote_notes.js) to extract notes via osascript. Also exports slides as PNG images to media/slides/{project_name}/. Backs up existing manuscript.txt before overwriting. """ # osascript is macOS-only; skip gracefully on WSL/Linux/Windows if shutil.which("osascript") is None: print( f" Warning: osascript not available (not macOS) — skipping Keynote import for {keynote_file.name}.", file=sys.stderr, ) return print(f" Extracting presenter notes from {keynote_file.name}...") # Find the JXA script (in the same directory as this module) script_dir = Path(__file__).parent jxa_script = script_dir / "extract_keynote_notes.js" if not jxa_script.exists(): print(f" Error: JXA script not found at {jxa_script}", file=sys.stderr) return # Backup existing manuscript.txt if it exists manuscript_path = project_path / "manuscript.txt" if manuscript_path.exists(): timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_path = project_path / f"manuscript.txt.{timestamp}.bak" shutil.copy2(manuscript_path, backup_path) if verbose: print(f" Backed up manuscript.txt to {backup_path.name}") # Slides export directory: {project}/media/slides/{project_name}/ # Use lowercase so the path is consistent on case-sensitive filesystems (WSL/Linux). slides_dir = project_path / "media" / "slides" / project_path.name.lower() print(f" Exporting slides to {slides_dir}...") # Run JXA extractor via osascript (also exports slides) proc = subprocess.run( [ "osascript", "-l", "JavaScript", str(jxa_script), str(keynote_file.resolve()), str(slides_dir.resolve()), ], capture_output=True, text=True, ) if proc.returncode != 0: print(f" Error extracting presenter notes:", file=sys.stderr) print(f" {proc.stderr}", file=sys.stderr) return # Parse JSON output from JXA script try: notes_data = json.loads(proc.stdout) if proc.stdout.strip() else [] except json.JSONDecodeError as e: print(f" Error parsing notes JSON: {e}", file=sys.stderr) return # Convert to manuscript.txt format lines = [] for item in notes_data: idx = item.get("slide_index") notes = (item.get("notes") or "").rstrip() lines.append(f"[S{idx}]") if notes: lines.append(notes) lines.append("") # blank line between slides # Write manuscript.txt manuscript_path.write_text("\n".join(lines).rstrip() + "\n", encoding="utf-8") print(f" Wrote {manuscript_path} ({len(notes_data)} slides)") if verbose: non_empty = sum(1 for item in notes_data if item.get("notes")) print(f" {non_empty} slides have presenter notes") # ============================================================================= # Tasks File # ============================================================================= _TASKS_VIDEO_PREFIXES = { "video:": 6, "vft:": 4, "vfb:": 4, "vf2t:": 5, "vf2b:": 5, "vst:": 4, "vsb:": 4, "vftp:": 5, "vfbp:": 5, "vf2tp:": 6, "vf2bp:": 6, "vstp:": 5, "vsbp:": 5, "narration:": 10, } def _collect_missing_video_markers( markers: list[str], videos: dict ) -> list[tuple[str, str]]: """Return (marker_text, video_id) for video markers not defined in videos.json.""" missing = [] seen = set() for marker in markers: matched = next((p for p in _TASKS_VIDEO_PREFIXES if marker.startswith(p)), None) if matched is None: continue video_id = marker[_TASKS_VIDEO_PREFIXES[matched] :] if video_id not in videos and video_id not in seen: seen.add(video_id) missing.append((marker, video_id)) return missing def _write_tasks_file( project_path: Path, missing_videos: list[tuple[str, str]], alignment_issues: list[tuple[str, str]], ) -> None: """Write tasks.md to project_path with missing assets and alignment issues.""" tasks_path = project_path / "tasks.md" today = datetime.now().strftime("%Y-%m-%d") lines = [ f"# Tasks: {project_path.name}", f"_Generated: {today}_", "", ] if missing_videos: lines += [ f"## Missing Video Assets ({len(missing_videos)})", "Referenced in manuscript.txt but not defined in videos.json.", "", ] for marker, video_id in missing_videos: lines.append(f"- [ ] `{video_id}` — referenced as `[{marker}]`") lines.append("") if alignment_issues: lines += [ f"## Slide Alignment Issues ({len(alignment_issues)})", "Slide markers that could not be matched to the spoken narration (likely adlibbed).", "", ] for marker_id, context in alignment_issues: lines.append(f'- [ ] `{marker_id}` — _"{context}"_') lines.append("") if not missing_videos and not alignment_issues: lines += ["_No outstanding tasks._", ""] tasks_path.write_text("\n".join(lines), encoding="utf-8") print( f" Tasks written → tasks.md" + (f" ({len(missing_videos)} missing videos)" if missing_videos else "") + (f" ({len(alignment_issues)} alignment issues)" if alignment_issues else "") ) # ============================================================================= # Validate Command # ============================================================================= def cmd_validate(project_path: Path, verbose: bool) -> int: """Validate project configuration.""" from .parser import ( parse_manuscript, parse_project_config, parse_slides, parse_videos, ) from .validator import validate_project print(f"Validating: {project_path.name}") if not (project_path / "project.json").exists(): print(f"Error: project.json not found in {project_path}", file=sys.stderr) return 1 # Parse all files _, markers, malformed, _ = parse_manuscript(project_path) config = parse_project_config(project_path) slides = parse_slides(project_path, config) videos, videos_dir = parse_videos(project_path, config) if verbose: print(f" - Markers in manuscript: {len(markers)}") print(f" - Slides defined: {len(slides)}") print(f" - Videos defined: {len(videos)}") # Validate warnings = validate_project( project_path, markers, config, slides, videos, videos_dir, malformed ) for w in warnings: print(f" Warning: {w}") # Write tasks file (missing assets only — no alignment data at validate time) missing_videos = _collect_missing_video_markers(markers, videos) _write_tasks_file(project_path, missing_videos, alignment_issues=[]) print("Validation passed.") return 0 # ============================================================================= # Preprocess Command # ============================================================================= def _resolve_process_cache(project_path: Path, config) -> Optional[Path]: """Return per-project cache dir on external disk, or None if not configured.""" if not (config and config.process_cache): return None p = Path(config.process_cache) if not p.is_absolute(): p = (project_path / p).resolve() return p / project_path.name def _resolve_narration_combined( project_path: Path, videos_dir: Path, config ) -> Optional[Path]: """Find narration_combined.mov: local → GnommoCache → process_cache.""" local = videos_dir / "narration_combined.mov" if local.exists(): return local resolved, _ = resolve_with_cache(local, project_path) if resolved.exists(): return resolved pc_root = _resolve_process_cache(project_path, config) if pc_root: pc_path = pc_root / "media" / "videos" / "narration_combined.mov" if pc_path.exists(): return pc_path return None def cmd_preprocess( project_path: Path, verbose: bool, dry_run: bool, force: bool = False, workers: int = 1, res: str = "full", ) -> int: """Run preprocessing pipeline on narration segments and videos. Discovers source files directly from raw_mov/ (preferred) or raw_mp4/ (fallback when raw_mov/ is empty). Does NOT require narration.json to exist — it writes/updates narration.json after processing. """ from concurrent.futures import ThreadPoolExecutor, as_completed from .parser import parse_project_config, parse_videos from .preprocessor import preprocess_video, RES_CONFIGS from .models import VideoSource as _VideoSource mode_str = f" ({res.upper()})" if res != "full" else "" print(f"Preprocessing narration: {project_path.name}{mode_str}") config = parse_project_config(project_path) # Narration directory — source files always in project media/narration/ narration_dir = project_path / "media" / "narration" narration_dir.mkdir(parents=True, exist_ok=True) raw_dir = narration_dir / "raw_mov" compressed_dir = narration_dir / "raw_mp4" # process_cache: write processed outputs to external disk to save laptop space cache_root = _resolve_process_cache(project_path, config) if cache_root: # Mirror the project's media/ structure so GnommoCache (resolve_with_cache) # finds these files transparently during render/stitch. cache_narration_dir = cache_root / "media" / "narration" cache_narration_dir.mkdir(parents=True, exist_ok=True) (cache_narration_dir / "processed").mkdir(parents=True, exist_ok=True) print(f" Using process cache: {cache_root}") else: cache_narration_dir = None processed_dir = (cache_narration_dir or narration_dir) / "processed" processed_dir.mkdir(parents=True, exist_ok=True) # Resolve intermediate directory gnommo_scratch = None if config.gnommo_scratch: gnommo_scratch = Path(config.gnommo_scratch) if not gnommo_scratch.is_absolute(): gnommo_scratch = project_path / gnommo_scratch print(f" Using intermediate dir: {gnommo_scratch}") # --- Filter pipeline --- talkinghead_filter = (config.default_filters or {}).get("talkinghead", []) if not talkinghead_filter: print( " ERROR: No 'talkinghead' filter defined in project.json default_filters." ) print(" Add a 'talkinghead' entry under 'default_filters' in project.json.") return 1 # --- Source discovery --- _video_exts = {".mov", ".mp4", ".avi", ".mkv", ".m4v"} def _scan_dir(d: Path) -> list[Path]: if not d.exists(): return [] return sorted( f for f in d.iterdir() if f.is_file() and f.suffix.lower() in _video_exts and not f.name.startswith(".") ) raw_mov_files = _scan_dir(raw_dir) raw_mp4_files = _scan_dir(compressed_dir) if raw_mov_files: source_files = raw_mov_files using_compressed = False elif raw_mp4_files: source_files = raw_mp4_files using_compressed = True print( " WARNING: raw_mov/ is empty — using compressed files from raw_mp4/ instead. Quality may be reduced." ) else: print(f" No source files found in raw_mov/ or raw_mp4/.") print(f" Place .mov recordings in {raw_dir}") return 1 # --- Load existing narration.json to preserve per-segment settings --- narration_json_path = narration_dir / "narration.json" existing_narration: dict = {} if narration_json_path.exists(): existing_narration = _read_json(narration_json_path) # --- Build segments list --- segments_to_process: list[tuple[str, _VideoSource]] = [] skipped_count = 0 for source_file in source_files: segment_id = source_file.stem # Strip _compressed suffix (raw_mp4 naming convention) if using_compressed and segment_id.endswith("_compressed"): segment_id = segment_id[: -len("_compressed")] # For non-full res, write into the res subdir so stitch --res low finds the # files at narration/low/processed/ (narration.json still records the plain # "processed/..." path; stitch shifts the base dir itself). _res_cfg = RES_CONFIGS.get(res) if res != "full" else None if _res_cfg: _, _, _subdir = _res_cfg output_file = f"{_subdir}/processed/{segment_id}_processed.mov" else: output_file = f"processed/{segment_id}_processed.mov" # When process_cache is set, output goes to the cache dir; narration.json # still records the relative path so stitch (also using cache) can find it. output_base = cache_narration_dir or narration_dir output_path = output_base / output_file if output_path.exists() and not force: print(f" {segment_id}: output exists, skipping (use --force to reprocess)") skipped_count += 1 continue # Filter: from existing narration.json entry (if explicitly set), else talkinghead existing_entry = existing_narration.get(segment_id, {}) raw_filter = existing_entry.get("filter") if raw_filter: if isinstance(raw_filter, str): filter_list = (config.default_filters or {}).get( raw_filter, talkinghead_filter ) else: filter_list = raw_filter else: filter_list = talkinghead_filter video_source = _VideoSource( source_file=source_file, filter=filter_list, output_file=output_file, use_audio_channels=existing_entry.get("use_audio_channels", "auto"), defer_loudnorm=existing_entry.get("defer_loudnorm", True), ) segments_to_process.append((segment_id, video_source)) if not segments_to_process: if skipped_count: print( f"\n All {skipped_count} segment(s) already preprocessed. Use --force to reprocess." ) else: print("\n No segments to preprocess.") return 0 if dry_run: for segment_id, segment_source in segments_to_process: print(f"\n Would preprocess: {segment_id}") print(f" Source: {segment_source.source_file}") print(f" Output: {segment_source.output_file}") print(f" Filters: {len(segment_source.filter)} step(s)") return 0 # --- Process segments --- successfully_processed: list[tuple[str, _VideoSource]] = [] if workers > 1 and len(segments_to_process) > 1: num_workers = min(workers, len(segments_to_process)) print( f"\n Processing {len(segments_to_process)} segments in parallel ({num_workers} workers)" ) def process_segment_task(task): seg_id, seg_source = task preprocess_video( cache_narration_dir or narration_dir, seg_id, seg_source, verbose=False, force=force, custom_gnommo_scratch=gnommo_scratch, res=res, ) return task completed = 0 with ThreadPoolExecutor(max_workers=num_workers) as executor: futures = { executor.submit(process_segment_task, t): t for t in segments_to_process } for future in as_completed(futures): seg_id, seg_source = future.result() completed += 1 print(f" Completed: {seg_id} ({completed}/{len(segments_to_process)})") output_path = (cache_narration_dir or narration_dir) / seg_source.output_file if output_path.exists(): successfully_processed.append((seg_id, seg_source)) else: for segment_id, segment_source in segments_to_process: _out_full = (cache_narration_dir or narration_dir) / segment_source.output_file print(f"\n Processing: {segment_id}") print(f" Source: {segment_source.source_file}") print(f" Output: {_out_full}") print(f" Filters: {len(segment_source.filter)} step(s)") preprocess_video( cache_narration_dir or narration_dir, segment_id, segment_source, verbose, force, gnommo_scratch, res=res, ) output_path = (cache_narration_dir or narration_dir) / segment_source.output_file if output_path.exists(): successfully_processed.append((segment_id, segment_source)) # --- Update narration.json --- # Write processed segments; preserve any existing per-segment settings (skip/take/etc.) _PRESERVE_KEYS = ( "skip", "take", "begin", "end", "cutout", "use_audio_channels", "defer_loudnorm", "volume", "zoom", ) for segment_id, segment_source in successfully_processed: existing_entry = existing_narration.get(segment_id, {}) entry: dict = {} # Preserve settings the user may have set (trim points, cutout, etc.) for key in _PRESERVE_KEYS: if key in existing_entry: entry[key] = existing_entry[key] # Always record the plain path; stitch shifts the base dir for low/tiny. entry["source_file"] = f"processed/{segment_id}_processed.mov" entry.setdefault("use_audio_channels", "auto") entry.setdefault("defer_loudnorm", True) existing_narration[segment_id] = entry with open(narration_json_path, "w", encoding="utf-8") as f: json.dump(existing_narration, f, indent=2) if successfully_processed: print(f"\n Updated narration.json ({len(successfully_processed)} segment(s))") print( f"\n Run 'gnommo -p stitch' to stitch narration segments into one full length narration file." ) # Also preprocess videos from videos.json (e.g. chroma key, color grade) videos, videos_dir = parse_videos(project_path, config) videos_to_process = [ (vid_id, vid_src) for vid_id, vid_src in videos.items() if vid_src.filter and not vid_src.is_shared ] if videos_to_process: print(f"\n Processing {len(videos_to_process)} video(s) from videos.json:") for video_id, video_source in videos_to_process: if video_source.output_file: output_path = videos_dir / video_source.output_file if output_path.exists() and not force: print( f" {video_id}: output exists, skipping (use --force to reprocess)" ) continue if dry_run: print( f" Would preprocess: {video_id} ({len(video_source.filter)} filter(s))" ) continue print(f" Processing: {video_id}") preprocess_video( videos_dir, video_id, video_source, verbose, force, gnommo_scratch, res=res ) print("\nPreprocessing complete.") return 0 # ============================================================================= # Trim Command — auto-detect silence bounds for narration segments # ============================================================================= def cmd_trim( project_path: Path, verbose: bool, force: bool = False, threshold_db: float = -40.0, res: str = "full", ) -> int: """ Auto-detect silence bounds for all narration segments and write skip/take values into narration.json. For each segment: skip = max(0, first_sound_time - 0.5) take = last_sound_time + 3.0 - skip (capped at file duration) Segments that already have explicit skip or take values are left unchanged unless --force is passed. Use --threshold to adjust sensitivity, e.g. -25 to ignore clothing/room noise that sits above -40 dB. """ from .parser import parse_project_config, parse_narration from .preprocessor import detect_silence_bounds, get_video_duration print(f"Auto-trimming narration: {project_path.name}") config = parse_project_config(project_path) narration, narration_dir = parse_narration(project_path, config) if not narration: print(" No narration segments found in narration.json") print(" Run 'gnommo -p import' first.") return 1 # Build a lookup of raw source files by segment ID. Raw files give cleaner # silence detection — loudnorm can introduce early peaks in processed audio. _video_exts = {".mov", ".mp4", ".avi", ".mkv", ".m4v"} raw_dir = narration_dir / "raw_mov" compressed_dir = narration_dir / "raw_mp4" raw_lookup: dict[str, Path] = {} for search_dir in (raw_dir, compressed_dir): if search_dir.exists(): for f in search_dir.iterdir(): if f.is_file() and f.suffix.lower() in _video_exts and not f.name.startswith("."): stem = f.stem if stem.endswith("_compressed"): stem = stem[: -len("_compressed")] raw_lookup[stem] = f narration_json_path = narration_dir / "narration.json" raw_data: dict = _read_json(narration_json_path) updated = 0 for seg_id in sorted(narration.keys()): seg = narration[seg_id] existing = raw_data.get(seg_id, {}) has_explicit = "skip" in existing or "take" in existing if has_explicit and not force: print(f" {seg_id}: already trimmed, skipping (use --force to redo)") continue # Prefer raw file; fall back to processed if raw not available. source_path = raw_lookup.get(seg_id) if source_path is None: source_path = narration_dir / seg.source_file if not source_path.exists(): print(f" {seg_id}: source file not found, skipping") continue print(f" {seg_id}: analysing {source_path.parent.name}/{source_path.name}...", end="", flush=True) first_sound, last_sound = detect_silence_bounds( source_path, noise_threshold_db=threshold_db, verbose=verbose ) total_dur = get_video_duration(source_path) new_skip = max(0.0, round(first_sound - 0.5, 3)) new_take = round(min(total_dur - new_skip, last_sound + 3.0 - new_skip), 3) new_take = max(0.0, new_take) print( f" first={first_sound:.2f}s last={last_sound:.2f}s" f" → skip={new_skip:.3f}s take={new_take:.3f}s" ) raw_data[seg_id]["skip"] = new_skip raw_data[seg_id]["take"] = new_take updated += 1 if updated > 0: with open(narration_json_path, "w", encoding="utf-8") as f: json.dump(raw_data, f, indent=2) print(f"\n Updated {updated} segment(s) in narration.json") else: print(f"\n No segments updated") return 0 # ============================================================================= # Transcode Command — compress narration folder to H.265 # ============================================================================= def _get_video_codec(path: Path) -> str: """Return the codec name of the first video stream (e.g. 'hevc', 'prores', 'h264').""" result = subprocess.run( [ "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=codec_name", "-of", "default=noprint_wrappers=1:nokey=1", str(path), ], capture_output=True, text=True, ) return result.stdout.strip().lower() def _transcode_processed_files( project_path: Path, verbose: bool, dry_run: bool, replace: bool, force: bool, alpha_quality: float, ) -> int: """ Compress _processed.mov files (ProRes 4444 + alpha) to HEVC+alpha via Apple VideoToolbox. For each _processed.mov: 1. Transcode to a temp file using hevc_videotoolbox with alpha. 2. Move the ProRes original into a prores/ subdirectory (never deleted). 3. Rename the compressed file to the original _processed.mov name so stitch/render find it unchanged. The prores/ subdirectory is never scanned — only top-level files are candidates. If prores/ already exists the file has already been compressed — skip unless --force. """ from .parser import parse_project_config, parse_narration print(f"Transcoding processed files (HEVC+alpha): {project_path.name}") config = parse_project_config(project_path) # Resolve narration_dir and videos_dir — processed files live in both _narration, narration_dir = parse_narration(project_path, config) videos_json_path = project_path / config.videos_path videos_dir = videos_json_path.parent # Glob both directories for *_processed.mov; skip any _prores.mov archives search_dirs = [d for d in [narration_dir, videos_dir] if d.exists()] candidates: list[Path] = [] seen: set[Path] = set() for d in search_dirs: for p in d.glob("*_processed.mov"): if p not in seen and "_prores" not in p.stem: seen.add(p) candidates.append(p) if not candidates: print(" No _processed.mov files found.") return 0 # Smallest first candidates = [c for c in candidates if c.exists()] candidates.sort(key=lambda f: f.stat().st_size) total_original = 0 total_compressed = 0 transcoded = 0 skipped = 0 for src in candidates: # Archive goes into prores/ subdirectory alongside the source file prores_dir = src.parent / "prores" archive = prores_dir / src.name # Always skip files already encoded as HEVC — regardless of --replace or --force if _get_video_codec(src) == "hevc": print(f" {src.name}: already HEVC, skipping") skipped += 1 continue # Without --replace, skip if the archive already exists in prores/ if not replace and archive.exists() and not force: size_mb = src.stat().st_size / 1_048_576 print( f" {src.name}: already compressed ({size_mb:.1f} MB), skipping (use --force to redo)" ) skipped += 1 continue src_mb = src.stat().st_size / 1_048_576 print(f" {src.name} ({src_mb:.1f} MB) → HEVC+alpha", end="") if dry_run: print(" [dry-run]") continue print(" ...", end="", flush=True) tmp_out = src.with_name(src.stem + "_hevc_tmp.mov") cmd = [ "ffmpeg", "-i", str(src), "-c:v", "hevc_videotoolbox", "-allow_sw", "1", "-alpha_quality", str(alpha_quality), "-tag:v", "hvc1", "-c:a", "copy", "-y", str(tmp_out), ] if verbose: print() print(" " + " ".join(cmd)) result = subprocess.run( cmd, capture_output=not verbose, text=True, ) if result.returncode != 0: print(f"\n ERROR transcoding {src.name}") if tmp_out.exists(): tmp_out.unlink() if not verbose and result.stderr: last_lines = result.stderr.strip().splitlines()[-5:] for line in last_lines: print(f" {line}", file=sys.stderr) continue out_mb = tmp_out.stat().st_size / 1_048_576 ratio = (1.0 - tmp_out.stat().st_size / src.stat().st_size) * 100 if replace: # Delete ProRes original, move compressed into its place src.unlink() tmp_out.rename(src) print( f"\r {src.name} ({src_mb:.1f} MB) → HEVC+alpha" f" ({out_mb:.1f} MB, -{ratio:.0f}%)" ) else: # Move ProRes original into prores/ subdirectory, compressed takes its place prores_dir.mkdir(exist_ok=True) src.rename(archive) tmp_out.rename(src) print( f"\r {src.name} ({src_mb:.1f} MB) → HEVC+alpha" f" ({out_mb:.1f} MB, -{ratio:.0f}%)" f" [ProRes → prores/{archive.name}]" ) total_original += int(src_mb * 1_048_576) total_compressed += int(out_mb * 1_048_576) transcoded += 1 print() if dry_run: print(f" [dry-run] Would compress {len(candidates) - skipped} file(s)") return 0 if transcoded > 0: orig_mb = total_original / 1_048_576 comp_mb = total_compressed / 1_048_576 saved_mb = orig_mb - comp_mb ratio = (saved_mb / orig_mb * 100) if orig_mb else 0 print( f" Compressed {transcoded} file(s): {orig_mb:.1f} MB → {comp_mb:.1f} MB" f" (saved {saved_mb:.1f} MB, -{ratio:.0f}%)" ) if skipped: print(f" Skipped {skipped} already-compressed file(s)") return 0 def cmd_transcode( project_path: Path, verbose: bool, dry_run: bool = False, replace: bool = False, crf: int = 23, force: bool = False, processed: bool = False, alpha_quality: float = 0.75, ) -> int: """ Transcode project video files to save disk space. Default (1st pass, before preprocess): Compress raw narration recordings to H.265. Output: {stem}_compressed.mp4. Skips files with '_compressed.' or '_processed.' in the name. Use --replace to delete originals after success. With --processed (2nd pass, after preprocess): Compress _processed.mov files (ProRes 4444 + alpha) to HEVC+alpha. Archives the ProRes original as _prores.mov (never deleted). The compressed file takes the original _processed.mov name so the rest of the pipeline (stitch, render) finds it unchanged. Uses Apple VideoToolbox (hevc_videotoolbox) with --alpha-quality. """ if processed: return _transcode_processed_files( project_path, verbose, dry_run, replace, force, alpha_quality ) from .parser import parse_project_config, parse_narration print(f"Transcoding narration: {project_path.name}") config = parse_project_config(project_path) _narration, narration_dir = parse_narration(project_path, config) raw_dir = narration_dir / "raw_mov" compressed_dir = narration_dir / "raw_mp4" if not raw_dir.exists(): print(f" raw/ directory not found: {raw_dir}", file=sys.stderr) print(f" Place raw recordings in {raw_dir} and run 'import' first.") return 1 compressed_dir.mkdir(parents=True, exist_ok=True) # Collect eligible video files from raw/ only video_extensions = {".mp4", ".mov", ".avi", ".mkv", ".m4v", ".mts", ".webm"} candidates = [ f for f in raw_dir.iterdir() if f.is_file() and f.suffix.lower() in video_extensions and not f.name.startswith(".") ] if not candidates: print(f" No video files found in {raw_dir}.") return 0 # Process smallest files first candidates.sort(key=lambda f: f.stat().st_size) total_original = 0 total_compressed = 0 transcoded = 0 skipped = 0 for src in candidates: # Output: compressed/.mp4 (clean name, no _compressed suffix) output = compressed_dir / f"{src.stem}.mp4" if output.exists() and not force: size_mb = output.stat().st_size / 1_048_576 print( f" {src.name}: already transcoded ({size_mb:.1f} MB), skipping (use --force to redo)" ) skipped += 1 continue src_mb = src.stat().st_size / 1_048_576 print( f" raw/{src.name} ({src_mb:.1f} MB) → compressed/{output.name}", end="" ) if dry_run: print(" [dry-run]") continue print(" ...", end="", flush=True) cmd = [ "ffmpeg", "-i", str(src), "-vf", "scale=-2:1080", "-c:v", "libx265", "-crf", str(crf), "-preset", "medium", "-c:a", "aac", "-b:a", "128k", "-tag:v", "hvc1", "-y", str(output), ] if verbose: print() print(" " + " ".join(cmd)) result = subprocess.run( cmd, capture_output=not verbose, text=True, ) if result.returncode != 0: print(f"\n ERROR transcoding {src.name}") if not verbose and result.stderr: # Print last few lines of ffmpeg stderr for diagnosis last_lines = result.stderr.strip().splitlines()[-5:] for line in last_lines: print(f" {line}", file=sys.stderr) continue out_mb = output.stat().st_size / 1_048_576 ratio = (1.0 - output.stat().st_size / src.stat().st_size) * 100 print( f"\r raw/{src.name} ({src_mb:.1f} MB) → compressed/{output.name} ({out_mb:.1f} MB, -{ratio:.0f}%)" ) total_original += src.stat().st_size total_compressed += output.stat().st_size transcoded += 1 print() if dry_run: print(f" [dry-run] Would transcode {len(candidates) - skipped} file(s)") return 0 if transcoded > 0: orig_mb = total_original / 1_048_576 comp_mb = total_compressed / 1_048_576 saved_mb = orig_mb - comp_mb ratio = (saved_mb / orig_mb * 100) if orig_mb else 0 print( f" Transcoded {transcoded} file(s): {orig_mb:.1f} MB → {comp_mb:.1f} MB (saved {saved_mb:.1f} MB, -{ratio:.0f}%)" ) if replace: print(f" Originals deleted.") if skipped: print(f" Skipped {skipped} already-transcoded file(s)") return 0 # ============================================================================= # Stitch Command (fast iteration on narration segments) # ============================================================================= def cmd_stitch( project_path: Path, verbose: bool, force: bool = False, res: str = "full", ) -> int: """ Stitch narration segments from narration.json. Reads segments from media/narration/narration.json, applies begin/end trimming during concatenation, and writes output to media/videos/narration_combined.mov. Also creates/updates an entry in videos.json with volume property. """ from .parser import parse_project_config, parse_narration, parse_videos from .preprocessor import ( stitch_narration_segments, ensure_downscaled_files_exist, RES_CONFIGS, ) mode_str = f" ({res.upper()})" if res != "full" else "" print(f"Stitching narration: {project_path.name}{mode_str}") config = parse_project_config(project_path) narration, narration_dir = parse_narration(project_path, config) if not narration: print(" No narration segments found in media/narration/narration.json") print(" Run 'gnommo -p import' first to populate narration.json") return 1 # Get videos_dir for output if config and config.videos_path: videos_json_path = project_path / config.videos_path videos_dir = videos_json_path.parent else: videos_dir = project_path / "media" / "videos" # When process_cache is set, redirect processed segment reads and combined output. # Mirror media/ structure so GnommoCache (resolve_with_cache) finds files during render. cache_root = _resolve_process_cache(project_path, config) if cache_root: narration_dir = cache_root / "media" / "narration" narration_dir.mkdir(parents=True, exist_ok=True) videos_dir_out = cache_root / "media" / "videos" videos_dir_out.mkdir(parents=True, exist_ok=True) print(f" Using process cache: {cache_root}") else: videos_dir_out = videos_dir # Use downscaled dirs for non-full res if res != "full": cfg = RES_CONFIGS[res] narration_dir = ensure_downscaled_files_exist( narration_dir, res, force=False, verbose=verbose ) videos_dir_out = videos_dir_out / cfg[2] videos_dir_out.mkdir(parents=True, exist_ok=True) print(f" Using {res} dirs: {narration_dir}, {videos_dir_out}") # Get segment IDs in sorted order segment_ids = sorted(narration.keys()) # Show what we're stitching print(f"\n Segments ({len(segment_ids)}):") for segment_id in segment_ids: seg = narration[segment_id] skip_str = f"skip={seg.skip:.1f}s" if seg.skip else "" take_str = f"take={seg.take:.1f}s" if seg.take else "" trim_info = ", ".join(filter(None, [skip_str, take_str])) trim_str = f" ({trim_info})" if trim_info else "" print(f" - {segment_id}{trim_str}") stitch_output = videos_dir_out / "narration_combined.mov" if stitch_output.exists() and not force: print(f"\n Combined narration exists: {stitch_output.name}") print(" (use --force to regenerate)") else: # Extract loudnorm config from talkinghead filter so stitch uses # per-project settings instead of hardcoded defaults. _loudnorm_cfg = None if config and config.default_filters: for _f in (config.default_filters.get("talkinghead") or []): if isinstance(_f, dict) and _f.get("type") == "audio_normalize": _loudnorm_cfg = _f break stitch_narration_segments( narration_dir, segment_ids, narration, stitch_output, verbose=verbose, default_end_trim=config.default_end_trim if config else 0.0, loudnorm_config=_loudnorm_cfg, ) # Run import videos again to update duration metadata (skip when using cache # since narration_combined.mov lives on the external disk, not in videos_dir). if not cache_root: _import_videos(videos_dir_out, config, verbose) # Always update the MAIN videos.json (parent of subdir when using low/tiny res) # Downscaled dirs only affect file paths, not JSON metadata updates main_videos_dir = videos_dir_out.parent if (res != "full" and not cache_root) else videos_dir videos_json_path = main_videos_dir / "videos.json" if True: # Always update JSON regardless of proxy mode existing_videos: dict = {} if videos_json_path.exists(): existing_videos = _read_json(videos_json_path) # Get cutout from first narration segment first_seg = narration[segment_ids[0]] cutout = first_seg.cutout or "talkinghead" # Create/update narration_combined entry existing_videos["narration_combined"] = { "source_file": "narration_combined.mov", "cutout": cutout, "always_visible": True, "volume": 1.0, } with open(videos_json_path, "w", encoding="utf-8") as f: json.dump(existing_videos, f, indent=2) print(f"\n Updated videos.json with narration_combined entry (volume=1.0)") print(" Edit videos.json to adjust volume if needed.") print("\nConcatenation complete.") # Automatically transcribe to keep transcript in sync with narration print("\n" + "=" * 60) print("Auto-running transcribe to sync with new narration...") print("=" * 60 + "\n") return cmd_transcribe(project_path, verbose, res=res) # ============================================================================= # Render Command # ============================================================================= def _format_time(seconds: float) -> str: """Format seconds as MM:SS.ms""" if seconds < 0: return "??:??.??" mins = int(seconds // 60) secs = seconds % 60 return f"{mins:02d}:{secs:05.2f}" def _print_render_plan_details(plan, marker_timings, slides: dict) -> None: """ Print a detailed render plan showing each marker with its aligned time. Uses marker_timings from the transformer which contains alignment info. """ from .models import CAMERA_PRESETS print("\n RENDER PLAN:") print(" " + "-" * 76) # Build lookup for video events by video_id video_events_by_id = {} for event in plan.video_events: video_events_by_id[event.video_id] = event audio_events_by_time = {} for event in plan.audio_events: t = round(event.start_time, 1) if t not in audio_events_by_time: audio_events_by_time[t] = [] audio_events_by_time[t].append(event) camera_events_by_time = {} for event in plan.camera_events: t = round(event.time, 1) if t not in camera_events_by_time: camera_events_by_time[t] = [] camera_events_by_time[t].append(event) # Detect slide markers that share a timestamp with the adjacent slide marker. # Two slides at the same time means alignment is ambiguous — treat as an error. slide_timings = [ t for t in marker_timings if t.marker_id in slides and t.timestamp >= 0 ] collision_ids: set[str] = set() for a, b in zip(slide_timings, slide_timings[1:]): if abs(a.timestamp - b.timestamp) < 0.1: collision_ids.add(a.marker_id) collision_ids.add(b.marker_id) # Print each marker timing aligned_count = 0 unaligned_count = 0 collision_count = 0 for timing in marker_timings: marker_id = timing.marker_id context = timing.context if len(context) > 50: context = context[:47] + "..." if timing.timestamp >= 0: time_str = _format_time(timing.timestamp) # Show confidence if fuzzy match conf_str = "" if timing.confidence < 1.0: conf_str = f" ({timing.confidence:.0%})" # Determine marker type for display if marker_id in slides: if marker_id in collision_ids: collision_count += 1 print( f' {marker_id:6} {time_str}{conf_str} COLLISION - same time as adjacent slide - "{context}"' ) else: aligned_count += 1 print(f' {marker_id:6} {time_str}{conf_str} "{context}"') elif any( marker_id.startswith(p) for p in ( "video:", "vft:", "vfb:", "vf2t:", "vf2b:", "vst:", "vsb:", "vftp:", "vfbp:", "vf2tp:", "vf2bp:", "vstp:", "vsbp:", ) ): aligned_count += 1 pfx_len = next( len(p) for p in ( "video:", "vft:", "vfb:", "vf2t:", "vf2b:", "vst:", "vsb:", "vftp:", "vfbp:", "vf2tp:", "vf2bp:", "vstp:", "vsbp:", ) if marker_id.startswith(p) ) video_id = marker_id[pfx_len:] # Find corresponding event by video_id event = video_events_by_id.get(video_id) if event: cutout_name = event.cutout_name end_on = event.video_source.end_on or "next_slide" layer_tag = f" [{event.layer}]" else: cutout_name = "?" end_on = "next_slide" layer_tag = "" cache_ind = " 📁" if video_id in plan.cached_files else "" print( f" {marker_id:20} {time_str} in '{cutout_name}' [{end_on}]{layer_tag}{cache_ind}" ) elif marker_id.startswith("narration:"): aligned_count += 1 video_id = marker_id[10:] cache_ind = " 📁" if video_id in plan.cached_files else "" print(f" {marker_id:20} {time_str} (continuous){cache_ind}") elif marker_id in CAMERA_PRESETS: aligned_count += 1 print(f" {time_str} [{marker_id}]") elif marker_id.startswith("audio:"): aligned_count += 1 print(f" {time_str} [{marker_id}]") else: aligned_count += 1 print(f' {marker_id:6} {time_str} "{context}"') else: unaligned_count += 1 # Check if this is a slide that was interpolated into the plan if marker_id in slides: interp_event = next( (e for e in plan.slide_events if e.slide_id == marker_id), None ) if interp_event: interp_str = _format_time(interp_event.start_time) print(f' {marker_id:6} ~{interp_str} INTERPOLATED - "{context}"') else: print(f' {marker_id:6} ??:??.?? NOT ALIGNED - "{context}"') else: print(f' {marker_id:6} ??:??.?? NOT ALIGNED - "{context}"') print(" " + "-" * 76) # Summary total_markers = len(marker_timings) slide_markers = [t for t in marker_timings if t.marker_id in slides] good_slides = len( [ t for t in slide_markers if t.timestamp >= 0 and t.marker_id not in collision_ids ] ) total_slides = len(slide_markers) issues = [] if unaligned_count: issues.append(f"{unaligned_count} UNALIGNED") if collision_count: issues.append(f"{collision_count} COLLISION") status = "OK" if not issues else ", ".join(issues) print(f" Markers: {aligned_count}/{total_markers} aligned ({status})") print(f" Slides: {good_slides}/{total_slides}") print( f" Videos: {len(plan.video_events)} triggered, {len(plan.narration_videos)} always-visible" ) if plan.outro_events: print(f" Outro: {len(plan.outro_events)} video(s)") for event in plan.outro_events: print( f" - {event.video_id}: {_format_time(event.start_time)} - {_format_time(event.end_time)}" ) print(f" Duration: {_format_time(plan.total_duration)}") def _parse_slide_range(slides_arg: str) -> tuple[str, Optional[str]]: """Parse slide range argument like 'S1:S10' or 'S5:' into a tuple.""" if ":" not in slides_arg: raise ValueError( f"Invalid slide range '{slides_arg}'. Expected format: S1:S10 or S5:" ) parts = slides_arg.split(":", 1) start_slide = parts[0].strip() end_slide = parts[1].strip() if parts[1].strip() else None if not start_slide: raise ValueError( f"Invalid slide range '{slides_arg}'. Start slide is required." ) return start_slide, end_slide def _writeback_video_metadata(plan, project_path, config) -> None: """Write back cutout/layer derived from shorthand markers to videos.json. When a shorthand like [vfb:FARTSection1] is used and FARTSection1 has no 'cutout' set in videos.json, this persists the resolved cutout (and layer if the shorthand implies a non-default layer) back to the file. Once written, subsequent renders read the value directly and no further write-back occurs. """ import json videos_json_path = project_path / config.videos_path if not videos_json_path.exists(): return # Collect field updates per video_id writebacks: dict[str, dict] = {} for event in plan.video_events: video_id = event.video_id source = event.video_source if source.is_shared: continue # shared videos live in their own file updates = {} if source.cutout is None and event.cutout_name: updates["cutout"] = event.cutout_name if event.layer != source.layer: updates["layer"] = event.layer if updates: writebacks.setdefault(video_id, {}).update(updates) if not writebacks: return with open(videos_json_path, "r", encoding="utf-8") as f: raw = json.load(f) changed = False for video_id, updates in writebacks.items(): if video_id not in raw: continue for field, value in updates.items(): if raw[video_id].get(field) != value: raw[video_id][field] = value changed = True if changed: with open(videos_json_path, "w", encoding="utf-8") as f: json.dump(raw, f, indent=2, ensure_ascii=False) written = ", ".join( f"{vid}({', '.join(upd)})" for vid, upd in writebacks.items() ) print(f" Updated videos.json: {written}") def _chunked_render( project_path: Path, verbose: bool, dry_run: bool, res: str, force: bool, chunk_size: int, slide_ids: list[str], out_dir: Path, final_output: Path, ) -> int: """Render in slide-based chunks then concatenate — avoids filter graph OOM.""" import math # Split slide IDs into groups of chunk_size groups = [slide_ids[i : i + chunk_size] for i in range(0, len(slide_ids), chunk_size)] print( f"\n Auto-chunking: {len(slide_ids)} slides → {len(groups)} chunks of ≤{chunk_size}" ) chunks_dir = out_dir / "chunks" chunks_dir.mkdir(parents=True, exist_ok=True) chunk_paths: list[Path] = [] for i, group in enumerate(groups): start = group[0] end = groups[i + 1][0] if i + 1 < len(groups) else None slides_arg = f"{start}:{end}" if end else f"{start}:" chunk_path = chunks_dir / f"chunk_{i+1:03d}_{start}-{end or 'end'}.mp4" print(f"\n {'='*56}") print(f" Chunk {i+1}/{len(groups)}: {slides_arg} → {chunk_path.name}") print(f" {'='*56}") result = cmd_render( project_path, verbose, dry_run, slides_arg=slides_arg, res=res, force=force, _output_path_override=chunk_path, ) if result != 0: print(f"\n Chunk {i+1} failed — aborting.", file=sys.stderr) return result chunk_paths.append(chunk_path) if dry_run: print(f"\n [dry-run] Would concatenate {len(chunk_paths)} chunks → {final_output}") return 0 # Concatenate chunks print(f"\n Concatenating {len(chunk_paths)} chunks → {final_output.name}...") concat_list = chunks_dir / "concat.txt" with open(concat_list, "w") as f: for p in chunk_paths: f.write(f"file '{p.resolve()}'\n") concat_cmd = [ "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", str(concat_list), "-c", "copy", str(final_output), ] result = subprocess.run(concat_cmd, capture_output=True, text=True) if result.returncode != 0: print(f" Concatenation failed:\n{result.stderr}", file=sys.stderr) return 1 # Clean up chunk files for p in chunk_paths: p.unlink(missing_ok=True) concat_list.unlink(missing_ok=True) try: chunks_dir.rmdir() except OSError: pass print(f" Output: {final_output}") return 0 def cmd_render( project_path: Path, verbose: bool, dry_run: bool, slides_arg: str = None, res: str = "full", force: bool = False, chunk_slides: int = 0, _output_path_override: Path = None, ) -> int: """Render final video.""" from .parser import ( parse_audio, parse_manuscript, parse_project_config, parse_slides, parse_videos, save_citations, ) from .transcriber import load_transcript from .validator import validate_project from .transformer import build_render_plan from .renderer import render, generate_ffmpeg_command_string from .preprocessor import RES_CONFIGS, ensure_downscaled_files_exist # Parse slide range if provided slide_range = None if slides_arg: slide_range = _parse_slide_range(slides_arg) print(f"Rendering: {project_path.name} (slides {slides_arg})") else: print(f"Rendering: {project_path.name}") # Show resolution mode if res != "full": cfg = RES_CONFIGS[res] print(f" Resolution: {res.upper()} ({cfg[0]}x{cfg[1]})") # Show cache status cache_info = get_cache_info() if cache_info: print(f" Cache: {cache_info}") # Stage 1: Parse print("\n[1/4] Parsing...") manuscript_text, markers, malformed, citations = parse_manuscript(project_path) # Save citations for later use (e.g., description generation) if citations: citations_path = project_path / "citations.json" save_citations(citations, citations_path) config = parse_project_config(project_path) # Override resolution for preview modes if res != "full": cfg = RES_CONFIGS[res] config.resolution = (cfg[0], cfg[1]) slides = parse_slides(project_path, config) videos, videos_dir = parse_videos(project_path, config) # Non-full res: use downscaled video directory, create on-the-fly if needed if res != "full": # Skip downscaling sources that have a preprocessed output_file — the # renderer will use the full-res processed version instead, saving disk space. sources_with_output = {v.source_file for v in videos.values() if v.output_file} videos_dir = ensure_downscaled_files_exist( videos_dir, res, force=False, verbose=verbose, skip_sources=sources_with_output, ) if verbose: print(f" Using {res} dir: {videos_dir}") audio, audio_dir = parse_audio(project_path, config) # Load whisper transcription JSON # Check for narration_combined in videos.json (new workflow) or multi-segment in config (legacy) combined_path = videos_dir / "narration_combined.mov" resolved_combined = _resolve_narration_combined(project_path, videos_dir, config) if resolved_combined and resolved_combined != combined_path: # File lives on external disk — point the VideoSource at the absolute path so # the renderer doesn't re-resolve it via the local (missing) videos_dir. if "narration_combined" in videos: videos["narration_combined"].source_file = str(resolved_combined) if "narration_combined" in videos and resolved_combined and resolved_combined.exists(): # New workflow: narration_combined was created by 'gnommo concat' and is in videos.json # This entry has the correct volume setting from videos.json transcript_path = resolved_combined.with_suffix(".transcript.json") config.main_video = "narration_combined" if verbose: print( f" Using combined narration: {resolved_combined.name} (volume={videos['narration_combined'].volume})" ) elif isinstance(config.main_video, list) and len(config.main_video) > 1: # Legacy: Multi-segment narration with main_video array in project.json resolved_combined, _ = resolve_with_cache(combined_path, project_path) transcript_path = resolved_combined.with_suffix(".transcript.json") if not resolved_combined.exists(): print( f"Error: Combined narration not found: {combined_path}", file=sys.stderr ) print( "Run 'gnommo -p concat' first to concatenate segments.", file=sys.stderr, ) return 1 # Create a synthetic video entry for the combined narration # Inherit settings from the first segment first_segment_id = config.main_video[0] if first_segment_id in videos: first_segment = videos[first_segment_id] from .models import VideoSource combined_video = VideoSource( source_file="narration_combined.mov", filter=first_segment.filter, output_file=None, # Already processed cutout=first_segment.cutout, always_visible=True, skip=0.0, # Already trimmed during concatenation take=None, ) videos["_narration_combined"] = combined_video config.main_video = "_narration_combined" if verbose: print(f" Using combined narration: {combined_path.name}") else: # Check if narration.json exists with segments (new workflow) - if so, require narration_combined narration_json = project_path / "media" / "narration" / "narration.json" if narration_json.exists() and _read_json(narration_json): print( f"Error: narration_combined not found in videos.json", file=sys.stderr ) print( f"You have narration segments in narration.json but haven't stitched them.", file=sys.stderr, ) print( f"Run 'gnommo -p {project_path.name} stitch' first.", file=sys.stderr, ) return 1 # Single video - look for .transcript.json next to the narration video result = _find_narration_video(config, videos) if result: video_id, narration_source = result config.main_video = video_id # Ensure main_video is set to the found video video_path = videos_dir / narration_source.source_file transcript_path = video_path.with_suffix(".transcript.json") else: transcript_path = project_path / "transcript.json" # Try cache fallback for transcript transcript_path, _ = resolve_with_cache(transcript_path, project_path) if not transcript_path.exists(): print(f"Error: Transcription not found: {transcript_path}", file=sys.stderr) print(f"Run 'gnommo -p {project_path.name} transcribe' first.", file=sys.stderr) return 1 transcription = load_transcript(transcript_path, project_path) if verbose: print(f" - Markers in manuscript: {len(markers)}") print(f" - Slides defined: {len(slides)}") print(f" - Audio clips: {len(audio)}") print(f" - Transcription words: {len(transcription)}") # Stage 2: Validate print("\n[2/4] Validating...") warnings = validate_project( project_path, markers, config, slides, videos, videos_dir, malformed ) for w in warnings: print(f" Warning: {w}") print(" Passed.") # Stage 3: Transform (includes on-the-fly alignment) print("\n[3/4] Building render plan...") plan, marker_timings = build_render_plan( project_path, config, slides, videos, videos_dir, manuscript_text, transcription, audio, audio_dir, slide_range=slide_range, ) if plan.time_offset > 0: print(f" Time offset: {plan.time_offset:.1f}s (partial render)") # Persist shorthand-derived cutout/layer back to videos.json (idempotent) _writeback_video_metadata(plan, project_path, config) # Print detailed render plan with alignment info _print_render_plan_details(plan, marker_timings, slides) if plan.audio_events: print(f"\n Audio effects:") for event in plan.audio_events: loop_str = " (loop)" if event.audio_def.loop else "" pause_str = " [ignores pauses]" if event.audio_def.ignore_pauses else "" print( f" - {event.audio_id}: '{event.audio_def.file}' @ {_format_time(event.start_time)}{loop_str}{pause_str}" ) # Show always-visible videos if plan.narration_videos: print(f"\n Always-visible videos:") for video_id, video_source, cutout in plan.narration_videos: skip_str = ( f" (skip: {video_source.skip:.1f}s)" if video_source.skip > 0 else "" ) cache_ind = " 📁" if video_id in plan.cached_files else "" print(f" - {video_id} in '{video_source.cutout}'{skip_str}{cache_ind}") # Show narration pauses if plan.narration_pauses: print(f"\n Narration pauses:") for pause in plan.narration_pauses: print( f" - {pause.video_id} at {_format_time(pause.output_time)} " f"for {pause.duration:.1f}s (narration freezes at {_format_time(pause.narration_time)})" ) # Write tasks file with both missing assets and alignment issues missing_videos = _collect_missing_video_markers(markers, videos) slide_timings_for_collision = [ t for t in marker_timings if t.marker_id in slides and t.timestamp >= 0 ] collision_ids_render = set() for _a, _b in zip(slide_timings_for_collision, slide_timings_for_collision[1:]): if abs(_a.timestamp - _b.timestamp) < 0.1: collision_ids_render.add(_a.marker_id) collision_ids_render.add(_b.marker_id) alignment_issues = [ (t.marker_id, t.context) for t in marker_timings if t.marker_id in slides and (t.timestamp < 0 or t.marker_id in collision_ids_render) ] _write_tasks_file(project_path, missing_videos, alignment_issues) # Check for unaligned markers unaligned = [t for t in marker_timings if t.timestamp < 0] if unaligned: print(f"\n WARNING: {len(unaligned)} marker(s) could not be aligned!") for t in unaligned: print(f' [{t.marker_id}] - "{t.context}"') if not force: print(f"\n Run with -f/--force to render anyway.") return 1 else: print(f"\n Continuing anyway due to --force flag...") # Stage 4: Render # Determine output filename and directory if _output_path_override: output_path = _output_path_override out_dir = output_path.parent out_filename = output_path.name elif config.output_video: out_filename = config.output_video out_dir = project_path / "out" / res if res != "full" else project_path / "out" output_path = out_dir / out_filename elif slide_range: start, end = slide_range range_suffix = f"_{start}-{end}" if end else f"_{start}-end" out_filename = f"final{range_suffix}.mp4" out_dir = project_path / "out" / res if res != "full" else project_path / "out" output_path = out_dir / out_filename else: out_filename = f"{config.co}.mp4" out_dir = project_path / "out" / res if res != "full" else project_path / "out" output_path = out_dir / out_filename # Check if chunked rendering is needed (avoids filter graph OOM on long videos) from .cache import get_render_chunk_size _chunk_size = chunk_slides or get_render_chunk_size() or 0 _slide_ids = [e.slide_id for e in plan.slide_events] if _chunk_size > 0 and not slide_range and len(_slide_ids) > _chunk_size: return _chunked_render( project_path, verbose, dry_run, res, force, _chunk_size, _slide_ids, out_dir, output_path, ) plan.output_path = output_path if dry_run: print("\n[4/4] FFmpeg command (dry run):") print(generate_ffmpeg_command_string(plan, output_path)) return 0 print("\n[4/4] Rendering...") render(plan, output_path, verbose=verbose) print(f" Output: {output_path}") print("\nDone.") return 0 # ============================================================================= # Transcribe Command # ============================================================================= def _find_narration_video(config, videos: dict) -> Optional[tuple[str, "VideoSource"]]: """ Find the video to use for transcription/narration. Priority: 1. config.audio_source if set 2. First video with always_visible=True 3. First video in dict """ from .models import VideoSource # 1. Check audio_source config if config.audio_source and config.audio_source in videos: return config.audio_source, videos[config.audio_source] # 2. Find always_visible video (main talking head) for video_id, video_source in videos.items(): if video_source.always_visible: return video_id, video_source # 3. Fall back to first video if videos: video_id = next(iter(videos.keys())) return video_id, videos[video_id] return None def cmd_transcribe( project_path: Path, verbose: bool, res: str = "full", final: bool = False ) -> int: """Transcribe video audio using Whisper.""" from .transcriber import transcribe_video, save_transcript, words_to_srt from .parser import parse_project_config, parse_videos from .preprocessor import ensure_downscaled_files_exist config = parse_project_config(project_path) # Handle --final mode: transcribe the rendered output for YouTube captions if final: path = project_path / "out" / f"{config.output_video}.mp4" return _transcribe_final(path, verbose) mode_str = f" ({res.upper()})" if res != "full" else "" print(f"Transcribing: {project_path.name}{mode_str}") videos, videos_dir = parse_videos(project_path, config) if not videos: print("Error: No videos defined in videos.json", file=sys.stderr) return 1 # Non-full res: use downscaled video directory if res != "full": videos_dir = ensure_downscaled_files_exist( videos_dir, res, force=False, verbose=verbose ) # Check for multi-segment narration (concatenated file) if isinstance(config.main_video, list) and len(config.main_video) > 1: video_path = videos_dir / "narration_combined.mov" if not video_path.exists(): print(f"Error: Combined narration not found: {video_path}", file=sys.stderr) print( "Run 'gnommo -p pre' first to concatenate segments.", file=sys.stderr, ) return 1 print(f" Using combined narration: {video_path.name}") else: # Single video - find it using existing logic result = _find_narration_video(config, videos) if not result: print("Error: No suitable video found for transcription", file=sys.stderr) return 1 video_id, video_source = result video_path = videos_dir / video_source.source_file if not video_path.exists() and video_source.source_file == "narration_combined.mov": found = _resolve_narration_combined(project_path, videos_dir, config) if found: video_path = found if not video_path.exists(): video_path, _ = resolve_with_cache(video_path, project_path) if not video_path.exists(): print(f"Error: Video not found: {video_path}", file=sys.stderr) return 1 print(f" Video: {video_path.name}") words = transcribe_video(video_path, model="base") output_path = video_path.with_suffix(".transcript.json") save_transcript(words, output_path) print(f" - Transcribed {len(words)} words") print(f" - Duration: {words[-1].end:.1f}s" if words else " - No words found") print(f" - Saved: {output_path}") if verbose and words: preview = " ".join(w.word for w in words[:10]) print(f" - Preview: {preview}...") return 0 def _transcribe_final(final_video: Path, verbose: bool) -> int: """ Transcribe the final rendered video and generate SRT captions for YouTube. Looks and creates out filename.srt suitable for upload. """ from .transcriber import transcribe_video, save_transcript, words_to_srt print(f"Transcribing final output: {final_video}") if not final_video.exists(): print(f"Error: Final video not found: {final_video}", file=sys.stderr) print("Run 'gnommo render' first.", file=sys.stderr) return 1 print(f" Video: {final_video.name}") # Transcribe with word-level timestamps words = transcribe_video(final_video, model="base") if not words: print("Error: No words transcribed from video", file=sys.stderr) return 1 # Save JSON transcript transcript_path = final_video.with_suffix(".transcript.json") save_transcript(words, transcript_path) # Generate SRT captions srt_path = final_video.with_suffix(".srt") srt_content = words_to_srt(words) srt_path.write_text(srt_content, encoding="utf-8") print(f" - Transcribed {len(words)} words") print(f" - Duration: {words[-1].end:.1f}s") print(f" - Transcript: {transcript_path}") print(f" - Captions: {srt_path}") # Count caption segments caption_count = srt_content.count("\n\n") + 1 print(f" - Caption segments: {caption_count}") if verbose and words: preview = " ".join(w.word for w in words[:15]) print(f" - Preview: {preview}...") print("\nSRT file ready for YouTube upload.") return 0 # ============================================================================= # Align Command # ============================================================================= def cmd_align(project_path: Path, verbose: bool) -> int: """Preview manuscript marker alignment (no files written).""" from .transcriber import load_transcript from .transformer import align_markers_to_transcription from .parser import ( parse_project_config, parse_videos, parse_slides, parse_audio, parse_manuscript, save_citations, ) print(f"Alignment preview: {project_path.name}") print(" (This is a preview - alignment happens automatically during render)") # Load manuscript (cites are stripped at parse time) manuscript_text, _, _, citations = parse_manuscript(project_path) # Save citations for later use (e.g., description generation) if citations: citations_path = project_path / "citations.json" save_citations(citations, citations_path) # Load project config and resources config = parse_project_config(project_path) slides = parse_slides(project_path, config) videos, videos_dir = parse_videos(project_path, config) audio, _ = parse_audio(project_path, config) # Find transcription (from narration video) result = _find_narration_video(config, videos) if not result: print("Error: No suitable video found for transcription", file=sys.stderr) return 1 video_id, video_source = result video_path = videos_dir / video_source.source_file transcript_path = video_path.with_suffix(".transcript.json") # Try cache fallback for transcript transcript_path, _ = resolve_with_cache(transcript_path, project_path) if not transcript_path.exists(): print(f"Error: Transcription not found: {transcript_path}", file=sys.stderr) print(f"Run 'gnommo -p {project_path.name} transcribe' first.", file=sys.stderr) return 1 print(f" Loading: {transcript_path.name}") transcription = load_transcript(transcript_path, project_path) print(f" - {len(transcription)} words") # Align (cite markers already stripped at parse time) print("\n Aligning markers to transcription...") timings = align_markers_to_transcription( manuscript_text, transcription, slides=slides, videos=videos, audio=audio ) # Report alignment results unmatched = 0 fuzzy_matched = 0 exact_matched = 0 for t in timings: if t.timestamp >= 0: if t.confidence >= 1.0: exact_matched += 1 if verbose: print(f" [{t.marker_id}] @ {_format_time(t.timestamp)}") else: fuzzy_matched += 1 # Always show fuzzy matches so user can verify print( f" [{t.marker_id}] @ {_format_time(t.timestamp)} (fuzzy {t.confidence:.0%})" ) else: print(f' [{t.marker_id}] NOT FOUND - "{t.context}"') unmatched += 1 # Summary total = len(timings) print(f"\n Alignment summary:") print(f" - Exact matches: {exact_matched}/{total}") if fuzzy_matched > 0: print(f" - Fuzzy matches (60%+ words): {fuzzy_matched}/{total}") if unmatched > 0: print(f" - NOT FOUND: {unmatched}/{total}") print( f"\n Some markers could not be aligned. Check manuscript.txt matches the spoken audio." ) return 0 # ============================================================================= # All Command (Full Pipeline) # ============================================================================= def _files_modified_since(root: Path, since: float, pattern: str) -> bool: """Return True if any file matching pattern under root has mtime > since.""" try: for p in root.rglob(pattern): if p.is_file() and p.stat().st_mtime > since: return True except (OSError, PermissionError): pass return False def cmd_all( project_path: Path, verbose: bool, dry_run: bool, res: str = "full", force: bool = False, ) -> int: """Run full pipeline: down → import → preprocess → trim → stitch → render → push → handoff → up. Cascade rule: if any stage produces output, all subsequent stages are forced to re-run (cascade_force=True), regardless of whether --force was passed. This ensures downstream caches are always consistent with upstream changes. """ from .handoff import cmd_handoff from .push import cmd_push print(f"=== Full Pipeline: {project_path.name} ===\n") print(">>> Step 1/9: Download\n") result = cmd_sync(project_path, verbose, dry_run, download=True) if result != 0: return result # cascade_force starts at --force. Once any stage does real work it flips to # True so all downstream stages re-run unconditionally. cascade_force = force print("\n>>> Step 2/9: Import\n") t0 = time.time() result = cmd_import(project_path, cascade_force, verbose) if result != 0: return result if _files_modified_since(project_path, t0, "slides.json") or _files_modified_since( project_path, t0, "narration.json" ): cascade_force = True print("\n>>> Step 3/9: Preprocess\n") t0 = time.time() result = cmd_preprocess( project_path, verbose, dry_run, cascade_force, workers=1, res=res ) if result != 0: return result if _files_modified_since( project_path, t0, "*_processed.mov" ) or _files_modified_since(project_path, t0, "*_processed.webm"): cascade_force = True print("\n>>> Step 4/9: Trim\n") t0 = time.time() result = cmd_trim(project_path, verbose, force=cascade_force, threshold_db=-40.0) if result != 0: return result # Trim modifies narration.json skip/take values; any change invalidates stitch if _files_modified_since(project_path, t0, "narration.json"): cascade_force = True print("\n>>> Step 5/9: Stitch\n") t0 = time.time() result = cmd_stitch(project_path, verbose, cascade_force, res=res) if result != 0: return result if _files_modified_since(project_path, t0, "narration_combined.mov"): cascade_force = True print("\n>>> Step 6/9: Render\n") result = cmd_render(project_path, verbose, dry_run, res=res, force=cascade_force) if result != 0: return result print("\n>>> Step 7/9: Push\n") result = cmd_push(project_path, verbose, force=False, prod=True) if result != 0: return result print("\n>>> Step 8/9: Handoff\n") result = cmd_handoff(project_path, verbose, file_override=None, prod=True, res=res) if result != 0: return result print("\n>>> Step 9/9: Upload\n") return cmd_sync(project_path, verbose, dry_run, download=False) # ============================================================================= # Description Command # ============================================================================= def cmd_description(project_path: Path, verbose: bool) -> int: """Generate YouTube description file with chapters, citations, and attributions.""" from .parser import ( parse_audio, parse_manuscript, parse_project_config, parse_slides, parse_videos, load_citations, ) from .transcriber import load_transcript from .transformer import align_markers_to_transcription from .description import write_description_file print(f"Generating description: {project_path.name}") # Parse all project files manuscript_text, markers, _, _ = parse_manuscript(project_path) # Load citations from file (saved during parse/render/align stages) citations_path = project_path / "citations.json" citations = load_citations(citations_path) config = parse_project_config(project_path) slides = parse_slides(project_path, config) videos, videos_dir = parse_videos(project_path, config) audio, _ = parse_audio(project_path, config) # Load transcription for alignment (optional but recommended) transcription = None result = _find_narration_video(config, videos) if result: _, narration_source = result video_path = videos_dir / narration_source.source_file transcript_path = video_path.with_suffix(".transcript.json") # Try cache fallback for transcript transcript_path, _ = resolve_with_cache(transcript_path, project_path) if transcript_path.exists(): transcription = load_transcript(transcript_path, project_path) if verbose: print(f" Loaded transcription: {len(transcription)} words") else: print(f" Warning: No transcription found at {transcript_path}") print( f" Run 'gnommo -p {project_path.name} transcribe' for better timestamps." ) # Align markers to get timings print(" Aligning markers...") marker_timings = align_markers_to_transcription( manuscript_text, transcription or [], slides=slides, videos=videos, audio=audio, ) if verbose: aligned = sum(1 for t in marker_timings if t.timestamp >= 0) print(f" Aligned {aligned}/{len(marker_timings)} markers") # Generate description output_path = project_path / "out" / "description_youtube.txt" description = write_description_file( output_path=output_path, config=config, manuscript_text=manuscript_text, slides=slides, videos=videos, marker_timings=marker_timings, transcription=transcription, citations=citations, ) # Print summary lines = description.split("\n") print(f"\n Output: {output_path}") print(f" Length: {len(description)} characters, {len(lines)} lines") # Show sections found sections = [] if config.description: sections.append("description") if "CHAPTERS" in description: sections.append("chapters") if "REFERENCES" in description: sections.append("references") if "STOCK FOOTAGE" in description: sections.append("attributions") if config.footer: sections.append("footer") print(f" Sections: {', '.join(sections)}") if verbose: print("\n --- Preview ---") preview_lines = lines[:20] for line in preview_lines: print(f" {line}") if len(lines) > 20: print(f" ... ({len(lines) - 20} more lines)") print("\nDone.") return 0 # Files and directories excluded from all sync/archive/load operations. # Covers intermediate processing artifacts, chunk scratch dirs, venv, and # common OS/editor noise. _RSYNC_EXCLUDES = [ # Intermediate processing files "media/narration/intermediate/", "media/narration/intermediate/**", "media/videos/intermediate/", "media/videos/intermediate/**", "media/narration/processed/", "media/narration/processed/**", "media/videos/narration_combined.mov", # Low-res preview files (generated locally, not synced) "media/narration/low/", "media/narration/low/**", "media/videos/low/", "media/videos/low/**", # Chunk scratch directories "**/chunks/", "**/chunks/**", # Python "*.py", "__pycache__/", "venv/", # Version control / OS noise ".git/", ".DS_Store", "*.tmp", ] def cmd_archive(project_path: Path, verbose: bool, dry_run: bool) -> int: """Archive project files to external cache storage.""" from .cache import load_cache_config print(f"Archiving: {project_path.name}") # Check cache is configured cache_base = load_cache_config() if cache_base is None: print("Error: Cache not configured. Create ~/.gnommo.conf with:") print(" [cache]") print(" path = /Volumes/YourDisk/gnommo") return 1 if not cache_base.exists(): print(f"Error: Cache path not accessible: {cache_base}") print("Make sure the external drive is connected.") return 1 # Build destination path dest_path = cache_base / project_path.name print(f" Source: {project_path}") print(f" Destination: {dest_path}") # Create destination if needed if not dry_run: dest_path.mkdir(parents=True, exist_ok=True) rsync_cmd = [ "rsync", "-av", "--progress", *[f"--exclude={p}" for p in _RSYNC_EXCLUDES], f"{project_path}/", f"{dest_path}/", ] if dry_run: rsync_cmd.insert(1, "--dry-run") print("\n [DRY RUN] Would execute:") print(f" {' '.join(rsync_cmd)}") else: print("\n Syncing files...") if verbose: print(f" Command: {' '.join(rsync_cmd)}") result = subprocess.run(rsync_cmd) if result.returncode != 0: print(f"Error: rsync failed with code {result.returncode}") return 1 # Update project.json with synced_time if not dry_run: project_json_path = project_path / "project.json" if project_json_path.exists(): try: data = _read_json(project_json_path.read_text(encoding="utf-8")) data["synced_time"] = datetime.now().isoformat() project_json_path.write_text( json.dumps(data, indent=2, ensure_ascii=False) + "\n", encoding="utf-8", ) print( f"\n Updated project.json with synced_time: {data['synced_time']}" ) except (json.JSONDecodeError, IOError) as e: print(f"Warning: Could not update project.json: {e}") print("\nDone.") return 0 def cmd_load(project_path: Path, verbose: bool, dry_run: bool) -> int: """Load project files from external cache storage onto the local drive.""" from .cache import load_cache_config print(f"Loading: {project_path.name}") # Check cache is configured cache_base = load_cache_config() if cache_base is None: print("Error: Cache not configured. Create ~/.gnommo.conf with:") print(" [cache]") print(" path = /Volumes/YourDisk/gnommo") return 1 if not cache_base.exists(): print(f"Error: Cache path not accessible: {cache_base}") print("Make sure the external drive is connected.") return 1 # Build source path on the external drive src_path = cache_base / project_path.name if not src_path.exists(): print(f"Error: Project not found on external drive: {src_path}") return 1 print(f" Source: {src_path}") print(f" Destination: {project_path}") # Create destination if needed if not dry_run: project_path.mkdir(parents=True, exist_ok=True) rsync_cmd = [ "rsync", "-av", "--progress", *[f"--exclude={p}" for p in _RSYNC_EXCLUDES], f"{src_path}/", f"{project_path}/", ] if dry_run: rsync_cmd.insert(1, "--dry-run") print("\n [DRY RUN] Would execute:") print(f" {' '.join(rsync_cmd)}") else: print("\n Copying files...") if verbose: print(f" Command: {' '.join(rsync_cmd)}") result = subprocess.run(rsync_cmd) if result.returncode != 0: print(f"Error: rsync failed with code {result.returncode}") return 1 print("\nDone.") return 0 def cmd_sync(project_path: Path, verbose: bool, dry_run: bool, download: bool) -> int: """Sync project files to/from the remote server via rsync over SSH.""" from .cache import load_server_config server = load_server_config() if server is None: print("Error: Server not configured. Add to ~/.gnommo.conf:") print(" [server]") print(" host = 76.13.144.52") print(" user = root") print(" path = /gnommo/project") return 1 direction = "Downloading from" if download else "Uploading to" print(f"{direction} server: {project_path.name}") remote = f"{server['user']}@{server['host']}:{server['path']}/{project_path.name}/" local = f"{project_path}/" if download: src, dest = remote, local else: src, dest = local, remote print(f" Source: {src}") print(f" Destination: {dest}") # Ensure destination directory exists if not dry_run: if download: project_path.mkdir(parents=True, exist_ok=True) else: remote_dir = f"{server['path']}/{project_path.name}" ssh_cmd = [ "ssh", "-p", server["port"], f"{server['user']}@{server['host']}", f"mkdir -p {remote_dir}", ] if verbose: print(f" Creating remote dir: {remote_dir}") result = subprocess.run(ssh_cmd) if result.returncode != 0: print(f"Error: could not create remote directory {remote_dir}") return 1 rsync_cmd = [ "rsync", "-av", "--progress", "-e", f"ssh -p {server['port']}", *[f"--exclude={p}" for p in _RSYNC_EXCLUDES], # On upload: delete server-side files that no longer exist locally so # the remote stays an exact mirror of the local project. *(["--delete"] if not download else []), src, dest, ] if dry_run: rsync_cmd.insert(1, "--dry-run") print("\n [DRY RUN] Would execute:") print(f" {' '.join(rsync_cmd)}") else: print("\n Syncing files...") if verbose: print(f" Command: {' '.join(rsync_cmd)}") result = subprocess.run(rsync_cmd) if result.returncode != 0: print(f"Error: rsync failed with code {result.returncode}") return 1 print("\nDone.") return 0 # ============================================================================= # Extract Audio Command # ============================================================================= def _extract_audio_file( source_path: Path, output_dir: Path, name: str, channel: str, verbose: bool, ) -> int: """ Extract audio from a single video file to WAV. Args: source_path: Path to the source video file output_dir: Directory to save the WAV file name: Base name for the output file (without extension) channel: "left", "right", or "both" verbose: Print verbose output Returns: 0 on success, 1 on error """ # Build output filename if channel == "both": output_name = f"{name}.wav" else: output_name = f"{name}_{channel}.wav" output_path = output_dir / output_name print(f" Channel: {channel}") print(f" Source: {source_path}") print(f" Output: {output_path}") # Build ffmpeg command cmd = [ "ffmpeg", "-y", # Overwrite "-i", str(source_path), "-vn", # No video ] # Channel selection if channel == "left": cmd.extend(["-af", "pan=mono|c0=c0"]) elif channel == "right": cmd.extend(["-af", "pan=mono|c0=c1"]) # "both" keeps stereo, no filter needed # Output format: 48kHz 16-bit WAV (standard for audio editing) cmd.extend( [ "-ar", "48000", # 48kHz sample rate "-acodec", "pcm_s16le", # 16-bit PCM str(output_path), ] ) if verbose: print(f" Command: {' '.join(cmd)}") print(f" Extracting...", end=" ", flush=True) result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: print(f"Error!") print(f" {result.stderr}", file=sys.stderr) return 1 # Get duration info duration_cmd = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", str(output_path), ] duration_result = subprocess.run(duration_cmd, capture_output=True, text=True) duration_str = "" if duration_result.returncode == 0: try: duration = float(duration_result.stdout.strip()) duration_str = f" ({duration:.1f}s)" except ValueError: pass print(f"Done{duration_str}") print(f"\n Open in Audition to experiment with:") print(f" - Effect > Noise Reduction") print(f" - Effect > Compressor") print(f" - Effect > Filter Curve EQ") print(f" - Effect > Loudness Normalization") print( f"\n Once you find good settings, update narration.json with matching filter config." ) return 0 def cmd_extract_audio( project_path: Path, verbose: bool, segment: Optional[str] = None, channel: str = "both", combined: bool = False, ) -> int: """ Extract audio from narration segments to WAV files for editing in Audacity. This allows you to experiment with audio processing settings (EQ, compression, noise reduction) in external software before applying them in the pipeline. Args: project_path: Path to the project directory verbose: Enable verbose output segment: Specific segment ID to extract, or None for all segments channel: Which channel(s) to extract: "left", "right", or "both" combined: If True, extract from narration_combined.mov instead of segments """ from .parser import parse_project_config, parse_narration, parse_videos print(f"Extracting audio: {project_path.name}") config = parse_project_config(project_path) # Handle --combined mode: extract from narration_combined.mov if combined: videos, videos_dir = parse_videos(project_path, config) combined_path = _resolve_narration_combined(project_path, videos_dir, config) or (videos_dir / "narration_combined.mov") if not combined_path.exists(): print( f"Error: narration_combined.mov not found at {combined_path}", file=sys.stderr, ) print("Run 'gnommo -p stitch' first.", file=sys.stderr) return 1 # Output to project out/ directory audio_dir = project_path / "out" audio_dir.mkdir(parents=True, exist_ok=True) return _extract_audio_file( combined_path, audio_dir, "narration_combined", channel, verbose ) # Normal mode: extract from individual segments narration, narration_dir = parse_narration(project_path, config) if not narration: print(" No narration segments found in media/narration/narration.json") print(" Run 'gnommo -p import' first to populate narration.json") return 1 # Create output directory audio_dir = narration_dir / "audio" audio_dir.mkdir(parents=True, exist_ok=True) # Determine which segments to process if segment: if segment not in narration: print( f"Error: Segment '{segment}' not found in narration.json", file=sys.stderr, ) print( f"Available segments: {', '.join(sorted(narration.keys()))}", file=sys.stderr, ) return 1 segments_to_process = [(segment, narration[segment])] else: segments_to_process = sorted(narration.items()) print(f" Channel: {channel}") print(f" Output: {audio_dir}/") print(f" Segments: {len(segments_to_process)}") # Process each segment for segment_id, segment_source in segments_to_process: source_path = narration_dir / segment_source.source_file if not source_path.exists(): print(f" Warning: Source not found: {source_path.name}, skipping") continue # Build output filename if channel == "both": output_name = f"{segment_id}.wav" else: output_name = f"{segment_id}_{channel}.wav" output_path = audio_dir / output_name print(f"\n {segment_id}:") print(f" Source: {source_path.name}") print(f" Output: {output_name}") # Build ffmpeg command cmd = [ "ffmpeg", "-y", # Overwrite "-i", str(source_path), "-vn", # No video ] # Channel selection if channel == "left": cmd.extend(["-af", "pan=mono|c0=c0"]) elif channel == "right": cmd.extend(["-af", "pan=mono|c0=c1"]) # "both" keeps stereo, no filter needed # Output format: 48kHz 16-bit WAV (standard for audio editing) cmd.extend( [ "-ar", "48000", # 48kHz sample rate "-acodec", "pcm_s16le", # 16-bit PCM str(output_path), ] ) if verbose: print(f" Command: {' '.join(cmd)}") result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: print(f" Error: {result.stderr}", file=sys.stderr) return 1 # Get duration info duration_cmd = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", str(output_path), ] duration_result = subprocess.run(duration_cmd, capture_output=True, text=True) if duration_result.returncode == 0: try: duration = float(duration_result.stdout.strip()) print(f" Duration: {duration:.1f}s") except ValueError: pass print(f" Done") print(f"\n Audio files saved to: {audio_dir}") print(f"\n Open in Audacity to experiment with:") print(f" - Effect > Noise Reduction") print(f" - Effect > Compressor") print(f" - Effect > Filter Curve EQ") print(f" - Effect > Loudness Normalization") print( f"\n Once you find good settings, update narration.json with matching filter config." ) return 0 # ============================================================================= # Master Command (A/B audio comparison) # ============================================================================= def cmd_master( project_path: Path, verbose: bool, channel: str = "both", ) -> int: """ Extract raw and processed audio from narration_combined for A/B comparison. Outputs: out/narration_combined.wav - Raw audio (no processing) out/narration_combined_processed.wav - With audio filters applied This lets you compare the effect of your audio processing chain. """ from .parser import parse_project_config, parse_videos from .preprocessor import parse_audio_normalize_config print(f"Audio mastering: {project_path.name}") config = parse_project_config(project_path) videos, videos_dir = parse_videos(project_path, config) # Find narration_combined.mov combined_path = _resolve_narration_combined(project_path, videos_dir, config) or (videos_dir / "narration_combined.mov") if not combined_path.exists(): print( f"Error: narration_combined.mov not found at {combined_path}", file=sys.stderr, ) print("Run 'gnommo -p stitch' first.", file=sys.stderr) return 1 # Output directory out_dir = project_path / "out" out_dir.mkdir(parents=True, exist_ok=True) raw_output = out_dir / "narration_combined.wav" processed_output = out_dir / "narration_combined_processed.wav" # Find audio_normalize config from default_filters audio_config = None if config.default_filters: for preset_name, filters in config.default_filters.items(): for f in filters: if f.get("type") == "audio_normalize": audio_config = f print(f" Using audio config from: default_filters.{preset_name}") break if audio_config: break if not audio_config: print(" Warning: No audio_normalize filter found in default_filters") print(" Will only extract raw audio.") # Build channel filter channel_filter = "" if channel == "left": channel_filter = "pan=mono|c0=c0," elif channel == "right": channel_filter = "pan=mono|c0=c1," # Step 1: Extract raw audio print(f"\n Extracting raw audio...") raw_cmd = [ "ffmpeg", "-y", "-i", str(combined_path), "-vn", ] if channel_filter: raw_cmd.extend(["-af", channel_filter.rstrip(",")]) raw_cmd.extend( [ "-ar", "48000", "-acodec", "pcm_s16le", str(raw_output), ] ) if verbose: print(f" Command: {' '.join(raw_cmd)}") result = subprocess.run(raw_cmd, capture_output=True, text=True) if result.returncode != 0: print(f" Error extracting raw audio: {result.stderr}", file=sys.stderr) return 1 print(f" Saved: {raw_output.name}") # Step 2: Extract processed audio (if we have config) if audio_config: print(f"\n Applying audio filters...") cfg = parse_audio_normalize_config(audio_config) # Build filter chain (same order as apply_audio_normalize) audio_filters = [] # Channel mapping if channel_filter: audio_filters.append(channel_filter.rstrip(",")) # EQ bands for band in cfg.eq_bands: if band.type == "lowshelf": audio_filters.append( f"lowshelf=f={band.freq:.1f}:g={band.gain:.1f}:t=q:w={band.q:.2f}" ) elif band.type == "highshelf": audio_filters.append( f"highshelf=f={band.freq:.1f}:g={band.gain:.1f}:t=q:w={band.q:.2f}" ) else: audio_filters.append( f"equalizer=f={band.freq:.1f}:width_type=q:width={band.q:.2f}:g={band.gain:.1f}" ) # High-pass if cfg.highpass > 0: audio_filters.append(f"highpass=f={cfg.highpass:.1f}") # Low-pass if cfg.lowpass > 0: audio_filters.append(f"lowpass=f={cfg.lowpass:.1f}") # Room EQ if cfg.room_eq: audio_filters.append( f"equalizer=f={cfg.room_eq_freq:.1f}:width_type=q:width={cfg.room_eq_width:.2f}:g={cfg.room_eq_gain:.1f}" ) # Denoise if cfg.denoise: audio_filters.append(f"afftdn=nf={cfg.noise_floor:.1f}") # Gate if cfg.gate: audio_filters.append( f"agate=threshold={cfg.gate_threshold:.1f}dB" f":range={cfg.gate_range:.1f}dB" f":attack={cfg.gate_attack:.1f}" f":release={cfg.gate_release:.1f}" ) # Compressor if cfg.compress: audio_filters.append( f"acompressor=threshold={cfg.threshold:.1f}dB" f":ratio={cfg.ratio:.1f}" f":attack={cfg.attack:.1f}" f":release={cfg.release:.1f}" f":makeup={cfg.makeup:.1f}dB" ) # Loudness normalization if cfg.normalize: audio_filters.append( f"loudnorm=I={cfg.target_lufs:.1f}" f":LRA={cfg.target_lra:.1f}" f":TP={cfg.target_tp:.1f}" ) filter_chain = ",".join(audio_filters) if verbose: print(f" Filter chain: {filter_chain}") # Print filter summary print(f" Filters applied:") if cfg.eq_bands: print(f" - EQ: {len(cfg.eq_bands)} bands") if cfg.highpass > 0: print(f" - Highpass: {cfg.highpass}Hz") if cfg.denoise: print(f" - Denoise: floor={cfg.noise_floor}dB") if cfg.gate: print(f" - Gate: threshold={cfg.gate_threshold}dB") if cfg.compress: print(f" - Compressor: ratio={cfg.ratio}:1, attack={cfg.attack}ms") if cfg.normalize: print(f" - Loudnorm: target={cfg.target_lufs} LUFS") processed_cmd = [ "ffmpeg", "-y", "-i", str(combined_path), "-vn", "-af", filter_chain, "-ar", "48000", "-acodec", "pcm_s16le", str(processed_output), ] if verbose: print(f" Command: {' '.join(processed_cmd)}") result = subprocess.run(processed_cmd, capture_output=True, text=True) if result.returncode != 0: print(f" Error applying filters: {result.stderr}", file=sys.stderr) return 1 print(f" Saved: {processed_output.name}") # Get durations def get_duration(path): cmd = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", str(path), ] r = subprocess.run(cmd, capture_output=True, text=True) try: return float(r.stdout.strip()) except: return 0 duration = get_duration(raw_output) print(f"\n Output files ({duration:.1f}s):") print(f" {raw_output}") print(f" {processed_output}") print(f"\n Open both in Audition to A/B compare the processing.") return 0 if __name__ == "__main__": sys.exit(main())