Refactor CLI and add preprocessing pipeline

- New CLI structure: -p project, -a action (required flags)
- Add -i import, -f force, -v verbose, --dry-run, --no-cache options
- Add preprocessor.py with chroma key filter (ProRes 4444 output)
- Support background images from shared_assets folder
- Support video metadata JSON files (talkinghead.json)
- Add validation for preprocessed output before render
- Update gnommo.sh with import command and new CLI interface
- Fix Python 3.9 compatibility (Optional[] instead of | None)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-12 15:45:19 +01:00
parent df900dfd59
commit 93fa820275
9 changed files with 763 additions and 287 deletions
+35 -38
View File
@@ -4,7 +4,9 @@
#
# Usage:
# gnommo.sh -p <project> Render project
# gnommo.sh -p <project> import Generate slides.json from image files
# gnommo.sh -p <project> validate Validate only
# gnommo.sh -p <project> preprocess Apply video preprocessing filters
# gnommo.sh -p <project> transcribe Transcribe video
# gnommo.sh -p <project> align Align markers to transcript
# gnommo.sh -p <project> all Full pipeline: transcribe → align → render
@@ -26,13 +28,16 @@ fi
PROJECT=""
COMMAND="render"
VERBOSE=""
FORCE=""
usage() {
echo "Usage: gnommo.sh -p <project> [command] [options]"
echo ""
echo "Commands:"
echo " render Render video (default)"
echo " import Generate slides.json from image files"
echo " validate Validate project only"
echo " preprocess Apply video preprocessing filters (chroma key, etc.)"
echo " transcribe Transcribe video audio"
echo " align Align manuscript to transcript"
echo " all Full pipeline: transcribe → align → render"
@@ -40,10 +45,13 @@ usage() {
echo "Options:"
echo " -p <dir> Project directory (required)"
echo " -v Verbose output"
echo " -f Force overwrite existing files"
echo " -h Show this help"
echo ""
echo "Examples:"
echo " gnommo.sh -p video1 # Render video1 project"
echo " gnommo.sh -p video1 import # Generate slides.json"
echo " gnommo.sh -p video1 import -f # Force overwrite slides.json"
echo " gnommo.sh -p video1 validate # Validate only"
echo " gnommo.sh -p video1 all # Full pipeline"
exit 0
@@ -56,13 +64,17 @@ while [[ $# -gt 0 ]]; do
shift 2
;;
-v|--verbose)
VERBOSE="--verbose"
VERBOSE="-v"
shift
;;
-f|--force)
FORCE="-f"
shift
;;
-h|--help)
usage
;;
validate|render|transcribe|align|all)
import|validate|render|preprocess|transcribe|align|all)
COMMAND="$1"
shift
;;
@@ -90,64 +102,49 @@ if [[ ! -f "$PROJECT/project.json" ]]; then
exit 1
fi
# Run commands
# Run commands using new CLI interface
run_gnommo() {
"$VENV_PYTHON" -m gnommo "$@"
"$VENV_PYTHON" -m gnommo -p "$PROJECT" -a "$1" $VERBOSE
}
run_gnommo_import() {
"$VENV_PYTHON" -m gnommo -p "$PROJECT" -a validate -i $FORCE $VERBOSE
}
case $COMMAND in
import)
echo "=== Importing assets for $PROJECT ==="
run_gnommo_import
;;
validate)
echo "=== Validating $PROJECT ==="
run_gnommo validate "$PROJECT"
run_gnommo validate
;;
transcribe)
echo "=== Transcribing $PROJECT ==="
VIDEO=$(find "$PROJECT/media" -name "*.mov" -o -name "*.mp4" | head -1)
if [[ -z "$VIDEO" ]]; then
echo "Error: No video file found in $PROJECT/media/"
exit 1
fi
run_gnommo transcribe "$VIDEO"
run_gnommo transcribe
;;
align)
echo "=== Aligning $PROJECT ==="
run_gnommo align "$PROJECT"
run_gnommo align
;;
render)
echo "=== Rendering $PROJECT ==="
run_gnommo render "$PROJECT" $VERBOSE
run_gnommo render
;;
preprocess)
echo "=== Preprocessing $PROJECT ==="
run_gnommo preprocess
;;
all)
echo "=== Full Pipeline: $PROJECT ==="
echo ""
# Step 1: Transcribe
echo ">>> Step 1/3: Transcribe"
VIDEO=$(find "$PROJECT/media" -name "*.mov" -o -name "*.mp4" | grep -v transcript | head -1)
if [[ -z "$VIDEO" ]]; then
echo "Error: No video file found in $PROJECT/media/"
exit 1
fi
TRANSCRIPT="${VIDEO%.*}.transcript.json"
if [[ -f "$TRANSCRIPT" ]]; then
echo " Transcript exists, skipping: $TRANSCRIPT"
else
run_gnommo transcribe "$VIDEO"
fi
echo ""
# Step 2: Align
echo ">>> Step 2/3: Align"
run_gnommo align "$PROJECT"
echo ""
# Step 3: Render
echo ">>> Step 3/3: Render"
run_gnommo render "$PROJECT" $VERBOSE
run_gnommo all
;;
*)
+336 -226
View File
@@ -8,18 +8,11 @@ from pathlib import Path
from . import __version__
from .errors import GnommoError, ParseError, ValidationError, RenderError
from .parser import (
parse_manuscript,
parse_project_config,
parse_slides,
parse_transcript,
parse_videos,
)
from .validator import validate_project
from .transformer import build_render_plan
from .renderer import render, generate_ffmpeg_command_string
from .transcriber import transcribe_video, save_transcript, load_transcript
from .aligner import align_markers, save_aligned_transcript
class NotImplementedException(GnommoError):
    """Raised when a requested CLI feature has not been implemented yet."""
def main() -> int:
@@ -34,120 +27,79 @@ def main() -> int:
version=f"%(prog)s {__version__}",
)
subparsers = parser.add_subparsers(dest="command", required=True)
# validate command
validate_parser = subparsers.add_parser(
"validate",
help="Validate project without rendering",
# Required arguments
parser.add_argument(
"-p", "--project",
type=str,
required=True,
help="Project name (directory in current folder)",
)
validate_parser.add_argument(
"project",
type=Path,
help="Path to project directory",
parser.add_argument(
"-a", "--action",
type=str,
choices=["validate", "preprocess", "render", "all", "transcribe", "align"],
required=True,
help="Action to perform",
)
# render command
render_parser = subparsers.add_parser(
"render",
help="Render video from project",
# Optional arguments
parser.add_argument(
"-i", "--import",
dest="import_assets",
action="store_true",
help="Import assets and generate metadata JSON files",
)
render_parser.add_argument(
"project",
type=Path,
help="Path to project directory",
)
render_parser.add_argument(
"-o", "--output",
type=Path,
help="Output file path (default: project/out/final.mp4)",
)
render_parser.add_argument(
parser.add_argument(
"-v", "--verbose",
action="store_true",
help="Print FFmpeg command",
help="Verbose output",
)
render_parser.add_argument(
parser.add_argument(
"-f", "--force",
action="store_true",
help="Force destructive changes (overwrite existing files)",
)
parser.add_argument(
"--no-cache",
action="store_true",
help="Force cache break (not implemented)",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Print FFmpeg command without executing",
)
# generate-slides command
gen_slides_parser = subparsers.add_parser(
"generate-slides",
help="Generate slides.json from Keynote export folder",
)
gen_slides_parser.add_argument(
"directory",
type=Path,
help="Path to slides directory (e.g., media/slides/Video1)",
)
gen_slides_parser.add_argument(
"--type",
default="square",
help="Slide type for all slides (default: square)",
)
# transcribe command
transcribe_parser = subparsers.add_parser(
"transcribe",
help="Transcribe video audio using Whisper",
)
transcribe_parser.add_argument(
"video",
type=Path,
help="Path to video file",
)
transcribe_parser.add_argument(
"-o", "--output",
type=Path,
help="Output JSON file (default: <video>.transcript.json)",
)
transcribe_parser.add_argument(
"--model",
default="base",
choices=["tiny", "base", "small", "medium", "large"],
help="Whisper model size (default: base)",
)
# align command
align_parser = subparsers.add_parser(
"align",
help="Align manuscript markers to transcript timestamps",
)
align_parser.add_argument(
"project",
type=Path,
help="Path to project directory",
)
align_parser.add_argument(
"--transcript",
type=Path,
help="Path to transcript JSON (default: media/talking_head.transcript.json)",
)
align_parser.add_argument(
"--offset",
type=float,
default=-1.0,
help="Seconds to offset marker times (default: -1.0)",
help="Show what would be done without executing",
)
args = parser.parse_args()
# Resolve project path
project_path = Path(args.project)
if not project_path.is_absolute():
project_path = Path.cwd() / project_path
try:
if args.command == "validate":
return cmd_validate(args.project)
elif args.command == "render":
output = args.output or (args.project / "out" / "final.mp4")
return cmd_render(args.project, output, args.verbose, args.dry_run)
elif args.command == "generate-slides":
return cmd_generate_slides(args.directory, args.type)
elif args.command == "transcribe":
output = args.output or args.video.with_suffix(".transcript.json")
return cmd_transcribe(args.video, output, args.model)
elif args.command == "align":
return cmd_align(args.project, args.transcript, args.offset)
# Check for --no-cache
if args.no_cache:
raise NotImplementedException("--no-cache is not yet implemented")
# Handle import mode
if args.import_assets:
return cmd_import(project_path, args.force, args.verbose)
# Handle actions
if args.action == "validate":
return cmd_validate(project_path, args.verbose)
elif args.action == "preprocess":
return cmd_preprocess(project_path, args.verbose, args.dry_run)
elif args.action == "render":
return cmd_render(project_path, args.verbose, args.dry_run)
elif args.action == "transcribe":
return cmd_transcribe(project_path, args.verbose)
elif args.action == "align":
return cmd_align(project_path, args.verbose)
elif args.action == "all":
return cmd_all(project_path, args.verbose, args.dry_run)
except GnommoError as e:
print(f"Error: {e}", file=sys.stderr)
return 1
@@ -158,9 +110,109 @@ def main() -> int:
return 0
def cmd_validate(project_path: Path) -> int:
"""Run validation only."""
print(f"Validating project: {project_path}")
# =============================================================================
# Import Command
# =============================================================================
def cmd_import(project_path: Path, force: bool, verbose: bool) -> int:
    """Import assets and generate metadata JSON files.

    Scans every sub-directory of ``<project>/media/slides`` and writes a
    ``slides.json`` into each of them.

    Args:
        project_path: Project root directory.
        force: Overwrite ``slides.json`` files that already exist.
        verbose: Also list every generated slide id.

    Returns:
        0 on success; 1 if the project directory does not exist or if an
        existing ``slides.json`` would be overwritten without ``force``.
    """
    print(f"Importing assets for: {project_path.name}")

    if not project_path.exists():
        print(f"Error: Project directory not found: {project_path}", file=sys.stderr)
        return 1

    slides_base = project_path / "media" / "slides"
    # Sorted so the processing (and reporting) order is reproducible.
    slides_dirs = (
        sorted(d for d in slides_base.iterdir() if d.is_dir())
        if slides_base.is_dir()
        else []
    )

    # Refuse up front, before writing anything, if any target already exists.
    # Creating a brand-new slides.json is not destructive and needs no -f.
    if not force:
        conflicts = [
            d / "slides.json" for d in slides_dirs if (d / "slides.json").exists()
        ]
        for slides_json in conflicts:
            print(f"Warning: {slides_json} already exists. Use -f to overwrite.")
        if conflicts:
            return 1

    for slides_dir in slides_dirs:
        _generate_slides_json(slides_dir, verbose)

    print("Import complete.")
    return 0


def _generate_slides_json(directory: Path, verbose: bool) -> None:
    """Generate slides.json from a Keynote export folder.

    Image files named like ``Video1.001.png`` become entries ``S1``, ``S2``,
    ... (numeric suffix without leading zeros), each with type
    ``"fullscreen"``. Nothing is written if no usable file is found.

    Args:
        directory: Folder containing the exported slide images; the
            ``slides.json`` is written into this same folder.
        verbose: Also list every generated slide id.
    """
    extensions = {".png", ".gif", ".pdf", ".jpg", ".jpeg"}
    # Sorted by name so that duplicate slide numbers resolve deterministically.
    files = sorted(
        (f for f in directory.iterdir() if f.is_file() and f.suffix.lower() in extensions),
        key=lambda f: f.name,
    )

    if not files:
        print(f" Warning: No image files in {directory}")
        return

    # Extract numeric suffix from filenames like "Video1.001.png"
    pattern = re.compile(r"\.(\d+)\.[^.]+$")

    slides = {}
    for file in files:
        match = pattern.search(file.name)
        if match is None:
            print(f" Warning: Could not parse slide number from: {file.name}")
            continue
        slide_id = f"S{int(match.group(1))}"
        if slide_id in slides:
            # Two files map to the same slide number; keep the first one.
            print(f" Warning: Duplicate slide {slide_id}, ignoring: {file.name}")
            continue
        slides[slide_id] = {
            "image": file.name,
            "type": "fullscreen",
        }

    if not slides:
        print(f" Warning: No valid slide files in {directory}")
        return

    # Sort by slide number (the integer after the leading "S").
    sorted_slides = dict(sorted(slides.items(), key=lambda item: int(item[0][1:])))

    output_path = directory / "slides.json"
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(sorted_slides, f, indent=2)

    print(f" Generated {output_path} ({len(sorted_slides)} slides)")
    if verbose:
        for slide_id in sorted_slides:
            print(f" [{slide_id}]")
# =============================================================================
# Validate Command
# =============================================================================
def cmd_validate(project_path: Path, verbose: bool) -> int:
"""Validate project configuration."""
from .parser import (
parse_manuscript,
parse_project_config,
parse_slides,
parse_videos,
)
from .validator import validate_project
print(f"Validating: {project_path.name}")
if not (project_path / "project.json").exists():
print(f"Error: project.json not found in {project_path}", file=sys.stderr)
return 1
# Parse all files
_, markers, malformed = parse_manuscript(project_path)
@@ -168,6 +220,11 @@ def cmd_validate(project_path: Path) -> int:
slides = parse_slides(project_path, config)
videos = parse_videos(project_path)
if verbose:
print(f" - Markers in manuscript: {len(markers)}")
print(f" - Slides defined: {len(slides)}")
print(f" - Videos defined: {len(videos)}")
# Validate
validate_project(project_path, markers, config, slides, videos, malformed)
@@ -175,140 +232,155 @@ def cmd_validate(project_path: Path) -> int:
return 0
def cmd_render(project_path: Path, output_path: Path, verbose: bool, dry_run: bool) -> int:
"""Run full render pipeline."""
print(f"Rendering project: {project_path}")
print(f"Output: {output_path}")
print()
# =============================================================================
# Preprocess Command
# =============================================================================
# Stage 1: Extract
print("Stage 1/4: Parsing input files...")
def cmd_preprocess(project_path: Path, verbose: bool, dry_run: bool) -> int:
    """Run the preprocessing filters declared for each video source.

    Args:
        project_path: Project root directory.
        verbose: Forwarded to the preprocessor for detailed output.
        dry_run: Only list the steps that would run; execute nothing.

    Returns:
        Always 0; failures surface as exceptions from the parser or the
        preprocessor.
    """
    from .parser import parse_project_config, parse_videos
    from .preprocessor import preprocess_video

    print(f"Preprocessing: {project_path.name}")

    # The parsed config is not used below; the call is kept as in the
    # original (presumably so a broken project.json is reported here —
    # TODO confirm).
    config = parse_project_config(project_path)
    videos = parse_videos(project_path)

    for video_id, source in videos.items():
        print(f"\n Processing: {video_id}")

        steps = source.preprocess
        if not steps:
            print(" No preprocessing steps defined, skipping.")
            continue

        if not dry_run:
            preprocess_video(project_path, video_id, source, verbose)
            continue

        print(f" Would preprocess: {source.file}")
        for step in steps:
            print(f" - {step}")

    print("\nPreprocessing complete.")
    return 0
# =============================================================================
# Render Command
# =============================================================================
def cmd_render(project_path: Path, verbose: bool, dry_run: bool) -> int:
"""Render final video."""
from .parser import (
parse_manuscript,
parse_project_config,
parse_slides,
parse_transcript,
parse_videos,
)
from .validator import validate_project
from .transformer import build_render_plan
from .renderer import render, generate_ffmpeg_command_string
print(f"Rendering: {project_path.name}")
# Stage 1: Parse
print("\n[1/4] Parsing...")
_, markers, malformed = parse_manuscript(project_path)
config = parse_project_config(project_path)
slides = parse_slides(project_path, config)
videos = parse_videos(project_path)
transcript = parse_transcript(project_path)
print(f" - Found {len(markers)} slide markers in manuscript")
print(f" - Found {len(slides)} slide definitions")
print(f" - Found {len(transcript)} transcript entries")
print()
if verbose:
print(f" - Markers: {len(markers)}")
print(f" - Slides: {len(slides)}")
print(f" - Transcript entries: {len(transcript)}")
# Stage 2: Validate
print("Stage 2/4: Validating...")
print("\n[2/4] Validating...")
validate_project(project_path, markers, config, slides, videos, malformed)
print(" - Validation passed")
print()
print(" Passed.")
# Stage 3: Transform
print("Stage 3/4: Building render plan...")
print("\n[3/4] Building render plan...")
plan = build_render_plan(project_path, config, slides, videos, transcript)
print(f" - Video duration: {plan.total_duration:.2f}s")
print(f" - Duration: {plan.total_duration:.1f}s")
print(f" - Slide events: {len(plan.slide_events)}")
if verbose:
for event in plan.slide_events:
print(f" - [{event.slide_id}] {event.start_time:.2f}s - {event.end_time:.2f}s")
print()
print(f" [{event.slide_id}] {event.start_time:.1f}s - {event.end_time:.1f}s")
# Stage 4: Render
output_path = project_path / "out" / "final.mp4"
if dry_run:
print("Stage 4/4: Generating FFmpeg command (dry run)...")
print()
print("\n[4/4] FFmpeg command (dry run):")
print(generate_ffmpeg_command_string(plan, output_path))
return 0
print("Stage 4/4: Rendering video...")
print("\n[4/4] Rendering...")
render(plan, output_path, verbose=verbose)
print(f" - Output written to: {output_path}")
print()
print("Done.")
print(f" Output: {output_path}")
print("\nDone.")
return 0
def cmd_generate_slides(directory: Path, slide_type: str) -> int:
"""Generate slides.json from Keynote export folder."""
directory = directory.resolve()
# =============================================================================
# Transcribe Command
# =============================================================================
if not directory.exists():
print(f"Error: Directory not found: {directory}", file=sys.stderr)
return 1
if not directory.is_dir():
print(f"Error: Not a directory: {directory}", file=sys.stderr)
return 1
# Find all image files (png, gif, pdf)
extensions = {".png", ".gif", ".pdf", ".jpg", ".jpeg"}
files = [f for f in directory.iterdir() if f.suffix.lower() in extensions]
if not files:
print(f"Error: No image files found in {directory}", file=sys.stderr)
return 1
# Extract numeric suffix from filenames like "Video1.001.png"
# Pattern: anything followed by .NNN. followed by extension
pattern = re.compile(r"\.(\d+)\.[^.]+$")
slides = {}
for file in files:
match = pattern.search(file.name)
if match:
num = int(match.group(1)) # "001" -> 1
slide_id = f"S{num}"
slides[slide_id] = {
"image": file.name,
"type": slide_type,
}
else:
print(f" Warning: Could not parse slide number from: {file.name}")
if not slides:
print("Error: No valid slide files found", file=sys.stderr)
return 1
# Sort by slide number
sorted_slides = dict(sorted(slides.items(), key=lambda x: int(x[0][1:])))
# Write slides.json in the same directory
output_path = directory / "slides.json"
with open(output_path, "w", encoding="utf-8") as f:
json.dump(sorted_slides, f, indent=2)
print(f"Generated {output_path}")
print(f" - Found {len(sorted_slides)} slides")
for slide_id, slide_def in sorted_slides.items():
print(f" [{slide_id}] {slide_def['image']}")
return 0
def cmd_transcribe(video_path: Path, output_path: Path, model: str) -> int:
def cmd_transcribe(project_path: Path, verbose: bool) -> int:
"""Transcribe video audio using Whisper."""
print(f"Transcribing: {video_path}")
print(f"Model: {model}")
print()
from .transcriber import transcribe_video, save_transcript
from .parser import parse_videos
words = transcribe_video(video_path, model=model)
print(f"Transcribing: {project_path.name}")
videos = parse_videos(project_path)
if not videos:
print("Error: No videos defined in videos.json", file=sys.stderr)
return 1
# Use first video
video_id = next(iter(videos.keys()))
video_source = videos[video_id]
video_path = project_path / video_source.file
if not video_path.exists():
print(f"Error: Video not found: {video_path}", file=sys.stderr)
return 1
print(f" Video: {video_path.name}")
words = transcribe_video(video_path, model="base")
output_path = video_path.with_suffix(".transcript.json")
save_transcript(words, output_path)
print(f" - Transcribed {len(words)} words")
print(f" - Duration: {words[-1].end:.1f}s" if words else " - No words found")
print(f" - Saved: {output_path}")
save_transcript(words, output_path)
print(f" - Saved to: {output_path}")
# Show first few words as preview
if words:
if verbose and words:
preview = " ".join(w.word for w in words[:10])
print(f" - Preview: {preview}...")
return 0
def cmd_align(project_path: Path, transcript_path: Path = None, offset: float = -1.0) -> int:
# =============================================================================
# Align Command
# =============================================================================
def cmd_align(project_path: Path, verbose: bool) -> int:
"""Align manuscript markers to transcript timestamps."""
print(f"Aligning: {project_path}")
print(f"Offset: {offset}s")
print()
from .transcriber import load_transcript
from .aligner import align_markers, save_aligned_transcript
from .parser import parse_videos
print(f"Aligning: {project_path.name}")
# Load manuscript
manuscript_path = project_path / "manuscript.txt"
@@ -318,39 +390,40 @@ def cmd_align(project_path: Path, transcript_path: Path = None, offset: float =
manuscript_text = manuscript_path.read_text(encoding="utf-8")
# Load transcript
if transcript_path is None:
# Try to find transcript in media folder
transcript_path = project_path / "media" / "talking_head.transcript.json"
# Find transcript
videos = parse_videos(project_path)
video_id = next(iter(videos.keys()))
video_source = videos[video_id]
video_path = project_path / video_source.file
transcript_path = video_path.with_suffix(".transcript.json")
if not transcript_path.exists():
print(f"Error: Transcript not found: {transcript_path}", file=sys.stderr)
print("Run 'gnommo transcribe' first to generate the transcript.", file=sys.stderr)
print("Run with -a transcribe first.", file=sys.stderr)
return 1
print(f" - Loading transcript: {transcript_path}")
print(f" Loading: {transcript_path.name}")
transcript = load_transcript(transcript_path)
print(f" - Loaded {len(transcript)} words")
print(f" - {len(transcript)} words")
# Align markers
print(" - Aligning markers...")
alignments = align_markers(manuscript_text, transcript, offset_seconds=offset)
# Align
print(" Aligning markers...")
alignments = align_markers(manuscript_text, transcript, offset_seconds=-1.0)
# Report results
print()
print("Alignment results:")
# Report
unmatched = 0
for a in alignments:
if a.timestamp >= 0:
print(f" [{a.marker_id}] @ {a.timestamp:.2f}s - \"{a.matched_phrase}...\"")
if verbose:
print(f" [{a.marker_id}] @ {a.timestamp:.1f}s")
else:
print(f" [{a.marker_id}] NOT FOUND - \"{a.matched_phrase}...\"")
print(f" [{a.marker_id}] NOT FOUND")
unmatched += 1
if unmatched > 0:
print(f"\nWarning: {unmatched} markers could not be aligned")
print(f"\n Warning: {unmatched} markers not aligned")
# Save aligned transcript.csv
# Save
output_path = project_path / "transcript.csv"
save_aligned_transcript(alignments, transcript, output_path)
print(f"\n Saved: {output_path}")
@@ -358,5 +431,42 @@ def cmd_align(project_path: Path, transcript_path: Path = None, offset: float =
return 0
# =============================================================================
# All Command (Full Pipeline)
# =============================================================================
def cmd_all(project_path: Path, verbose: bool, dry_run: bool) -> int:
    """Run full pipeline: transcribe → align → render.

    Transcription is skipped when the first video defined for the project
    already has a ``.transcript.json`` next to it. The pipeline stops at the
    first step that returns a non-zero status.

    Returns:
        The status of the first failing step, otherwise the render status.
    """
    from .parser import parse_videos

    print(f"=== Full Pipeline: {project_path.name} ===\n")

    # Step 1: transcribe, unless a transcript for the first video exists.
    videos = parse_videos(project_path)
    if videos:
        first_source = videos[next(iter(videos.keys()))]
        transcript_path = (project_path / first_source.file).with_suffix(".transcript.json")

        if transcript_path.exists():
            print(f">>> Step 1/3: Transcribe (cached: {transcript_path.name})\n")
        else:
            print(">>> Step 1/3: Transcribe\n")
            status = cmd_transcribe(project_path, verbose)
            if status != 0:
                return status

    # Step 2: align manuscript markers to the transcript.
    print("\n>>> Step 2/3: Align\n")
    status = cmd_align(project_path, verbose)
    if status != 0:
        return status

    # Step 3: render; its status is the pipeline's status.
    print("\n>>> Step 3/3: Render\n")
    return cmd_render(project_path, verbose, dry_run)
if __name__ == "__main__":
sys.exit(main())
+17
View File
@@ -57,3 +57,20 @@ class RenderError(GnommoError):
if stderr:
full_message += f"\nFFmpeg output:\n{stderr}"
super().__init__(full_message)
class PreprocessError(GnommoError):
    """Error during preprocessing stage.

    The final message is the given message, prefixed with ``[filter_type]``
    when a filter type is known, followed by the command line and the FFmpeg
    output when those are supplied.
    """

    def __init__(self, message: str, filter_type: Optional[str] = None, command: Optional[str] = None, stderr: Optional[str] = None):
        self.filter_type = filter_type
        self.command = command
        self.stderr = stderr

        prefix = f"[{filter_type}] " if filter_type else ""
        sections = [f"{prefix}{message}"]
        if command:
            sections.append(f"Command: {command}")
        if stderr:
            sections.append(f"FFmpeg output:\n{stderr}")

        super().__init__("\n".join(sections))
+35 -2
View File
@@ -12,6 +12,7 @@ class TalkingHeadConfig:
y: int
target_height: int # in pixels, or -1 for percentage-based
target_height_percent: float = 0.0 # percentage (0.0-1.0) if target_height is -1
file: Optional[str] = None # Path to video or metadata JSON file
@dataclass
@@ -21,7 +22,8 @@ class ProjectConfig:
fps: int
talking_head: TalkingHeadConfig
default_slide_type: str
background_video: str
background: str = "" # Background image or video path (in shared_assets/)
background_video: str = "" # Deprecated: use background instead
slides_path: str = "slides.json" # path to slides.json relative to project
audio_source: Optional[str] = None # defaults to talking head
@@ -33,11 +35,41 @@ class SlideDefinition:
type: str # "fullscreen" | "square"
@dataclass
class ChromaKeyConfig:
    """Settings for the chroma key (green screen) filter."""

    # RGB color to key out.
    color: tuple[int, int, int] = (0, 255, 0)
    # Color similarity threshold (0.0-1.0).
    similarity: float = 0.15
    # Edge blend/feathering (0.0-1.0).
    blend: float = 0.1
    # Spill suppression amount (0.0-1.0).
    spill: float = 0.0
@dataclass
class FilterConfig:
    """Common base for a preprocessing filter's configuration."""

    # Filter kind; type-specific settings live in subclasses or in a dict.
    type: str
@dataclass
class VideoSource:
"""Video source definition from videos.json."""
file: str
preprocess: list[str] = field(default_factory=list)
preprocess: list[dict] = field(default_factory=list) # List of filter config dicts
output_file: Optional[str] = None # Path to preprocessed output (if any)
@dataclass
class VideoMetadata:
    """
    Per-video metadata, typically loaded from a .json file.

    Lets preprocessing steps be declared next to the video rather than in
    videos.json, so each video can carry its own preprocessing configuration.
    """

    # Original source video file.
    source_file: str
    # Preprocessing filters, one config dict per filter.
    preprocess: list[dict] = field(default_factory=list)
    # Output config {"file": "...", "colorspace": "...", "alpha": "..."}.
    output: Optional[dict] = None
@dataclass
@@ -78,6 +110,7 @@ class RenderPlan:
total_duration: float
slides: dict[str, SlideDefinition]
slides_dir: Path = None # directory containing slide images
talking_head_path: Path = None # Resolved path to actual video file
# Slide layout configurations (hardcoded for POC)
+73 -2
View File
@@ -4,7 +4,7 @@ import csv
import json
import re
from pathlib import Path
from typing import Any
from typing import Any, Optional
from .errors import ParseError
from .models import (
@@ -12,6 +12,7 @@ from .models import (
SlideDefinition,
TalkingHeadConfig,
TimedWord,
VideoMetadata,
VideoSource,
)
@@ -119,6 +120,7 @@ def parse_project_config(project_path: Path) -> ProjectConfig:
y=th_data.get("y", 100),
target_height=th_height,
target_height_percent=th_height_pct,
file=th_data.get("file"),
)
# Parse resolution
@@ -131,7 +133,8 @@ def parse_project_config(project_path: Path) -> ProjectConfig:
fps=data.get("fps", 30),
talking_head=talking_head,
default_slide_type=data.get("defaultSlideType", "square"),
background_video=data.get("background_video", ""),
background=data.get("background", ""),
background_video=data.get("background_video", ""), # Deprecated
slides_path=data.get("slides", "slides.json"),
audio_source=data.get("audio_source"),
)
@@ -206,6 +209,7 @@ def parse_videos(project_path: Path) -> dict[str, VideoSource]:
videos[video_id] = VideoSource(
file=video_data["file"],
preprocess=video_data.get("preprocess", []),
output_file=video_data.get("output_file"),
)
return videos
@@ -229,3 +233,70 @@ def get_video_duration(video_path: Path) -> float:
raise ParseError(f"Failed to get duration: {result.stderr}", video_path)
return float(result.stdout.strip())
def parse_video_metadata(metadata_path: Path) -> VideoMetadata:
    """
    Parse a video metadata JSON file.

    Expected format:
    {
        "source_file": "talking_head.mov",
        "preprocess": [
            {"type": "chroma_key", "color": [0, 255, 0], "similarity": 0.15}
        ],
        "output": {
            "file": "intermediate/talking_head_rgba.mov",
            "colorspace": "rgba",
            "alpha": "straight"
        }
    }

    Args:
        metadata_path: Path to the metadata JSON file.

    Returns:
        The parsed metadata; ``preprocess`` defaults to an empty list and
        ``output`` to ``None`` when absent.

    Raises:
        ParseError: If the file does not exist, is not valid JSON, is not a
            JSON object, or lacks the required ``source_file`` field.
    """
    if not metadata_path.exists():
        raise ParseError(f"Video metadata not found: {metadata_path}", metadata_path)

    try:
        data = json.loads(metadata_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as e:
        # Chain the cause so the decoder's position info is not lost.
        raise ParseError(f"Invalid JSON: {e}", metadata_path) from e

    # A top-level string or number would otherwise fail below with a raw
    # TypeError instead of a ParseError.
    if not isinstance(data, dict):
        raise ParseError("Video metadata must be a JSON object", metadata_path)

    if "source_file" not in data:
        raise ParseError("Video metadata missing required field 'source_file'", metadata_path)

    return VideoMetadata(
        source_file=data["source_file"],
        preprocess=data.get("preprocess", []),
        output=data.get("output"),
    )
def resolve_video_file(project_path: Path, file_ref: str) -> tuple[Path, Optional[VideoMetadata]]:
    """
    Turn a video file reference into the path of the video to use.

    The reference is either:
    1. A direct path to a video file
    2. A path to a metadata JSON file

    For a metadata file, paths inside it are taken relative to the metadata
    file's own directory. The declared output file is preferred when it
    exists on disk; otherwise the source file is used.

    Returns:
        Tuple of (actual video path to use, metadata if JSON file was used)
    """
    ref_path = project_path / file_ref

    # Anything that is not an existing .json file is a direct video reference.
    is_metadata_ref = file_ref.endswith(".json") and ref_path.exists()
    if not is_metadata_ref:
        return ref_path, None

    metadata = parse_video_metadata(ref_path)
    base_dir = ref_path.parent

    output_cfg = metadata.output
    if output_cfg and output_cfg.get("file"):
        preprocessed = base_dir / output_cfg["file"]
        if preprocessed.exists():
            return preprocessed, metadata

    # No usable preprocessed output: fall back to the source file.
    return base_dir / metadata.source_file, metadata
+195
View File
@@ -0,0 +1,195 @@
"""Preprocessing stage: apply filters to source videos."""
import subprocess
from pathlib import Path
from typing import Any
from .errors import PreprocessError
from .models import VideoSource, ChromaKeyConfig
def preprocess_video(
    project_path: Path,
    video_id: str,
    video_source: VideoSource,
    verbose: bool = False,
) -> Path:
    """
    Run a video source through its configured preprocessing filters.

    Filters run one after another; each step writes its own file under
    ``<project>/intermediate`` and the next step reads that file. When the
    source declares ``output_file``, the last intermediate is copied there.

    Returns:
        Path to the final preprocessed output file, or the original source
        path when no filters are configured.

    Raises:
        PreprocessError: If the source video is missing, a filter has no
            'type' field, or the filter type is unknown.
    """
    source_path = project_path / video_source.file
    steps = video_source.preprocess

    # Nothing to do: hand back the untouched source.
    if not steps:
        return source_path

    work_dir = project_path / "intermediate"
    work_dir.mkdir(parents=True, exist_ok=True)

    if not source_path.exists():
        raise PreprocessError(
            f"Source video not found: {source_path}",
            filter_type=None,
        )

    stage_input = source_path
    for index, step in enumerate(steps):
        kind = step.get("type")
        if kind is None:
            raise PreprocessError(
                f"Filter {index} missing 'type' field",
                filter_type=None,
            )

        stage_output = work_dir / f"{video_id}_step{index}_{kind}.mov"

        if verbose:
            print(f" Step {index + 1}: {kind}")
            print(f" Input: {stage_input}")
            print(f" Output: {stage_output}")

        # chroma_key is the only filter implemented so far.
        if kind != "chroma_key":
            raise PreprocessError(
                f"Unknown filter type: {kind}",
                filter_type=kind,
            )
        apply_chroma_key(stage_input, stage_output, step, verbose)

        # This step's result feeds the next one.
        stage_input = stage_output

    if not video_source.output_file:
        return stage_input

    import shutil

    final_output = project_path / video_source.output_file
    final_output.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(stage_input, final_output)

    if verbose:
        print(f" Final output: {final_output}")

    return final_output
def apply_chroma_key(
    input_path: Path,
    output_path: Path,
    config: dict[str, Any],
    verbose: bool = False,
) -> None:
    """
    Apply chroma key (green/blue screen) filter using FFmpeg.

    Config options:
        color: [R, G, B] - Color to key out (default: [0, 255, 0] green)
        similarity: float - Color similarity threshold 0.0-1.0 (default: 0.15)
        blend: float - Edge blend/feathering 0.0-1.0 (default: 0.1)
        spill: float - Spill suppression 0.0-1.0 (default: 0.0)

    Output is ProRes 4444 (10-bit, with alpha channel) and PCM audio. An
    existing ``output_path`` is overwritten.

    Args:
        input_path: Video file to read.
        output_path: Destination file for the keyed video.
        config: Filter configuration dictionary (see options above).
        verbose: Print the filter string and the FFmpeg command line.

    Raises:
        PreprocessError: If the ``ffmpeg`` executable cannot be started or
            exits with a non-zero status.
    """
    import shlex

    # Parse config with defaults
    chroma_config = parse_chroma_key_config(config)

    # Convert RGB to the 0xRRGGBB form used in the FFmpeg filter string
    r, g, b = chroma_config.color
    hex_color = f"0x{r:02x}{g:02x}{b:02x}"

    # chromakey=color:similarity:blend
    filter_parts = [
        f"chromakey={hex_color}:{chroma_config.similarity:.3f}:{chroma_config.blend:.3f}"
    ]

    # Add despill if specified
    if chroma_config.spill > 0:
        # despill only knows "green" and "blue"; pick the one matching the
        # dominant channel of the key color instead of always assuming green.
        despill_type = "blue" if b > g else "green"
        filter_parts.append(
            f"despill=type={despill_type}:mix={chroma_config.spill:.3f}"
        )
    video_filter = ",".join(filter_parts)

    # ProRes 4444 profile for alpha channel support
    cmd = [
        "ffmpeg",
        "-y",  # Overwrite output
        "-i", str(input_path),
        "-vf", video_filter,
        "-c:v", "prores_ks",
        "-profile:v", "4",  # ProRes 4444
        "-pix_fmt", "yuva444p10le",  # 10-bit with alpha
        "-c:a", "pcm_s16le",  # Uncompressed PCM audio
        str(output_path),
    ]

    # Shell-quoted so paths containing spaces stay unambiguous in logs/errors
    printable_cmd = shlex.join(cmd)
    if verbose:
        print(f" Filter: {video_filter}")
        print(f" Command: {printable_cmd}")

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
        )
    except FileNotFoundError as exc:
        # ffmpeg is not installed / not on PATH: report it like any other
        # filter failure instead of leaking a raw OS error.
        raise PreprocessError(
            "Chroma key filter failed: ffmpeg executable not found",
            filter_type="chroma_key",
            command=printable_cmd,
            stderr=str(exc),
        ) from exc

    if result.returncode != 0:
        raise PreprocessError(
            "Chroma key filter failed",
            filter_type="chroma_key",
            command=printable_cmd,
            stderr=result.stderr,
        )
def parse_chroma_key_config(config: dict[str, Any]) -> ChromaKeyConfig:
    """
    Parse a chroma key config dictionary into ChromaKeyConfig.

    ``color`` may be a list or tuple of three numeric components; each
    component is converted to an int and clamped to 0-255 so it can be
    formatted as a two-digit hex value later. Anything else (wrong type,
    wrong length, non-numeric component) falls back to pure green.

    ``similarity``, ``blend`` and ``spill`` are converted with ``float()``
    and default to 0.15, 0.1 and 0.0 respectively.
    """
    default_color = (0, 255, 0)
    raw_color = config.get("color", default_color)

    color = default_color
    if isinstance(raw_color, (list, tuple)) and len(raw_color) == 3:
        try:
            color = tuple(max(0, min(255, int(component))) for component in raw_color)
        except (TypeError, ValueError, OverflowError):
            # Non-numeric component: keep the best-effort fallback to green.
            color = default_color

    return ChromaKeyConfig(
        color=color,
        similarity=float(config.get("similarity", 0.15)),
        blend=float(config.get("blend", 0.1)),
        spill=float(config.get("spill", 0.0)),
    )
def get_preprocessed_path(project_path: Path, video_source: VideoSource) -> Path:
    """
    Resolve the location of the preprocessed video for a source.

    The configured ``output_file`` is used when it is set; otherwise the
    original ``file`` is used. Either one is joined onto ``project_path``.
    """
    relative_name = video_source.output_file or video_source.file
    return project_path / relative_name
def needs_preprocessing(project_path: Path, video_source: VideoSource) -> bool:
    """
    Tell whether the preprocessing step still has to run for a source.

    Returns False when the source has no filters. With filters and a
    configured ``output_file``, returns True only while that file does not
    exist under ``project_path``. With filters but no ``output_file``,
    always returns True.
    """
    has_filters = bool(video_source.preprocess)
    if not has_filters:
        return False

    target = video_source.output_file
    if not target:
        return True

    return not (project_path / target).exists()
+29 -8
View File
@@ -50,14 +50,23 @@ def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
output_path = output_path.resolve()
# Input: talking head video
talking_head_path = project_path / plan.talking_head.file
# Use resolved path if available, otherwise construct from file
talking_head_path = plan.talking_head_path or (project_path / plan.talking_head.file)
cmd.extend(["-i", str(talking_head_path)])
# Input: background video (if specified)
has_background = bool(plan.config.background_video)
# Input: background image/video (if specified)
bg_file = plan.config.background or plan.config.background_video
has_background = bool(bg_file)
bg_is_image = False
if has_background:
bg_path = project_path / plan.config.background_video
# Try project folder first, then parent (for shared_assets)
bg_path = project_path / bg_file
if not bg_path.exists():
bg_path = project_path.parent / bg_file
cmd.extend(["-i", str(bg_path)])
# Check if background is an image
image_extensions = {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".webp"}
bg_is_image = bg_path.suffix.lower() in image_extensions
# Input: slide images (from slides_dir, same directory as slides.json)
slides_dir = plan.slides_dir.resolve() if plan.slides_dir else project_path / "media" / "slides"
@@ -70,7 +79,7 @@ def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
slide_inputs.append(event.slide_id)
# Build filter_complex
filter_complex = build_filter_complex(plan, has_background, slide_inputs)
filter_complex = build_filter_complex(plan, has_background, slide_inputs, bg_is_image)
cmd.extend(["-filter_complex", filter_complex])
# Map output video and audio
@@ -96,12 +105,13 @@ def build_filter_complex(
plan: RenderPlan,
has_background: bool,
slide_inputs: list[str],
bg_is_image: bool = False,
) -> str:
"""
Build the filter_complex string for FFmpeg.
Layer structure:
- Layer 1: Background (solid color or video)
- Layer 1: Background (solid color, image, or video)
- Layer 2: Talking head
- Layer 3: Slides (with time-based enable)
"""
@@ -118,8 +128,19 @@ def build_filter_complex(
# Create base layer (background)
if has_background:
filters.append(f"[{bg_idx}:v]scale={width}:{height}:force_original_aspect_ratio=increase,"
f"crop={width}:{height}[bg]")
if bg_is_image:
# For images: loop to create video stream, then scale
filters.append(
f"[{bg_idx}:v]loop=loop=-1:size=1:start=0,"
f"scale={width}:{height}:force_original_aspect_ratio=increase,"
f"crop={width}:{height},fps={plan.config.fps}[bg]"
)
else:
# For videos: just scale
filters.append(
f"[{bg_idx}:v]scale={width}:{height}:force_original_aspect_ratio=increase,"
f"crop={width}:{height}[bg]"
)
base_label = "bg"
else:
# Create solid color background
+19 -3
View File
@@ -10,7 +10,7 @@ from .models import (
TimedWord,
VideoSource,
)
from .parser import get_video_duration
from .parser import get_video_duration, resolve_video_file
def build_render_plan(
@@ -26,12 +26,27 @@ def build_render_plan(
This transforms transcript markers into timed slide events and
assembles all information needed for the render stage.
"""
# For POC: use the first video as the talking head
# Determine talking head source:
# 1. If config.talking_head.file is set, use that (may be JSON metadata)
# 2. Otherwise, use first video from videos.json
if config.talking_head.file:
video_path, metadata = resolve_video_file(project_path, config.talking_head.file)
# Create a VideoSource from the resolved metadata
if metadata:
talking_head = VideoSource(
file=str(video_path.relative_to(project_path)) if video_path.is_relative_to(project_path) else str(video_path),
preprocess=metadata.preprocess,
output_file=metadata.output.get("file") if metadata.output else None,
)
else:
talking_head = VideoSource(file=config.talking_head.file)
else:
# Fall back to first video in videos.json
talking_head_id = next(iter(videos.keys()))
talking_head = videos[talking_head_id]
video_path = project_path / talking_head.file
# Get video duration for end time calculations
video_path = project_path / talking_head.file
total_duration = get_video_duration(video_path)
# Build slide events from transcript markers
@@ -49,6 +64,7 @@ def build_render_plan(
total_duration=total_duration,
slides=slides,
slides_dir=slides_dir,
talking_head_path=video_path,
)
+20 -4
View File
@@ -74,12 +74,28 @@ def validate_project(
project_path / "videos.json"
))
# Check background video exists (if specified)
if config.background_video:
bg_path = project_path / config.background_video
# Check preprocessed output exists if preprocessing is defined
if video_source.preprocess and video_source.output_file:
output_path = project_path / video_source.output_file
if not output_path.exists():
issues.append(ValidationIssue(
f"Preprocessed output not found: {video_source.output_file}. "
f"Run with -a preprocess first.",
project_path / "videos.json"
))
# Check background exists (image or video)
# Try 'background' first, fall back to deprecated 'background_video'
bg_file = config.background or config.background_video
if bg_file:
# Check in project folder first, then parent (for shared_assets)
bg_path = project_path / bg_file
if not bg_path.exists():
# Try parent directory (shared_assets at repo root)
bg_path = project_path.parent / bg_file
if not bg_path.exists():
issues.append(ValidationIssue(
f"Background video not found: {config.background_video}",
f"Background not found: {bg_file}",
project_path / "project.json"
))