Initial commit: GnommoEditor video pipeline
A code-first, declarative video editing system that compiles text documents into rendered video via FFmpeg. Uses a compiler-style ETL pipeline: Extract (parse inputs) → Validate → Transform (build timeline) → Render (FFmpeg). Features: - Text-based project definition (manuscript, transcript, JSON configs) - Slide markers [S1], [S2] in transcript map to timed overlays - Strict validation with fail-fast error reporting - FFmpeg filter_complex generation with time-based enables - CLI with validate/render/dry-run modes Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
+21
@@ -0,0 +1,21 @@
|
||||
# Python
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
*.so
|
||||
.Python
|
||||
venv/
|
||||
.venv/
|
||||
*.egg-info/
|
||||
|
||||
# OS
|
||||
.DS_Store
|
||||
Thumbs.db
|
||||
|
||||
# Output
|
||||
**/out/
|
||||
*.mp4
|
||||
|
||||
# Temp
|
||||
*.tmp
|
||||
.cache/
|
||||
@@ -0,0 +1,5 @@
|
||||
Welcome to GnommoEditor, a code-first video editing system. [S1]
|
||||
|
||||
In this example, we demonstrate how slides appear at specific timestamps based on markers in the transcript. [S2]
|
||||
|
||||
And that's the end of our demo.
|
||||
@@ -0,0 +1,11 @@
|
||||
{
|
||||
"resolution": [1920, 1080],
|
||||
"fps": 30,
|
||||
"talkinghead": {
|
||||
"x": 50,
|
||||
"y": 600,
|
||||
"targetheight": 400
|
||||
},
|
||||
"defaultSlideType": "square",
|
||||
"background_video": ""
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
{
|
||||
"S1": {
|
||||
"image": "S1.png",
|
||||
"type": "square"
|
||||
},
|
||||
"S2": {
|
||||
"image": "S2.png",
|
||||
"type": "square"
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
t,word
|
||||
0.00,Hello
|
||||
0.30,world
|
||||
0.60,[S1]
|
||||
1.50,Second
|
||||
1.80,slide
|
||||
2.00,[S2]
|
||||
2.50,End
|
||||
|
@@ -0,0 +1,6 @@
|
||||
{
|
||||
"talking_head": {
|
||||
"file": "media/talking_head.mp4",
|
||||
"preprocess": []
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,3 @@
|
||||
"""GnommoEditor - A code-first, declarative video editing pipeline."""
|
||||
|
||||
__version__ = "0.1.0"
|
||||
@@ -0,0 +1,6 @@
|
||||
"""Allow running gnommo as a module: python -m gnommo"""
|
||||
|
||||
from .cli import main
|
||||
|
||||
if __name__ == "__main__":
    # Raise SystemExit directly rather than calling the bare ``exit()`` helper:
    # ``exit`` is injected by the ``site`` module for interactive use and is
    # missing when Python runs with ``-S`` or from some frozen builds.
    raise SystemExit(main())
|
||||
+158
@@ -0,0 +1,158 @@
|
||||
"""CLI entry point for GnommoEditor."""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from . import __version__
|
||||
from .errors import GnommoError, ParseError, ValidationError, RenderError
|
||||
from .parser import (
|
||||
parse_manuscript,
|
||||
parse_project_config,
|
||||
parse_slides,
|
||||
parse_transcript,
|
||||
parse_videos,
|
||||
)
|
||||
from .validator import validate_project
|
||||
from .transformer import build_render_plan
|
||||
from .renderer import render, generate_ffmpeg_command_string
|
||||
|
||||
|
||||
def _build_parser() -> argparse.ArgumentParser:
    """Create the argument parser with the ``validate`` and ``render`` subcommands."""
    parser = argparse.ArgumentParser(
        prog="gnommo",
        description="GnommoEditor - A code-first video editing pipeline",
    )
    parser.add_argument(
        "--version",
        action="version",
        version=f"%(prog)s {__version__}",
    )

    subparsers = parser.add_subparsers(dest="command", required=True)

    # validate command
    validate_parser = subparsers.add_parser(
        "validate",
        help="Validate project without rendering",
    )
    validate_parser.add_argument(
        "project",
        type=Path,
        help="Path to project directory",
    )

    # render command
    render_parser = subparsers.add_parser(
        "render",
        help="Render video from project",
    )
    render_parser.add_argument(
        "project",
        type=Path,
        help="Path to project directory",
    )
    render_parser.add_argument(
        "-o", "--output",
        type=Path,
        help="Output file path (default: project/out/final.mp4)",
    )
    render_parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Print FFmpeg command",
    )
    render_parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Print FFmpeg command without executing",
    )
    return parser


def main(argv: "list[str] | None" = None) -> int:
    """Main entry point.

    Args:
        argv: Command-line arguments without the program name. ``None``
            (the default) makes argparse read ``sys.argv[1:]``, which is the
            behavior of the original zero-argument call.

    Returns:
        Process exit code: the subcommand's return value, 1 when a
        GnommoError is raised, 130 on KeyboardInterrupt.
    """
    args = _build_parser().parse_args(argv)

    try:
        if args.command == "validate":
            return cmd_validate(args.project)
        if args.command == "render":
            # Fall back to <project>/out/final.mp4 when -o/--output is absent.
            output = args.output or (args.project / "out" / "final.mp4")
            return cmd_render(args.project, output, args.verbose, args.dry_run)
    except GnommoError as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1
    except KeyboardInterrupt:
        print("\nAborted.", file=sys.stderr)
        return 130

    return 0
|
||||
|
||||
|
||||
def cmd_validate(project_path: Path) -> int:
    """Parse the project's input files and validate them without rendering.

    Returns 0 when validation passes. Failures are not handled here; whatever
    the parse and validate functions raise propagates to the caller.
    """
    print(f"Validating project: {project_path}")

    # Only the marker list is needed from the manuscript; its text is unused.
    manuscript_markers = parse_manuscript(project_path)[1]
    project_config = parse_project_config(project_path)
    slide_defs = parse_slides(project_path)
    video_sources = parse_videos(project_path)

    validate_project(
        project_path,
        manuscript_markers,
        project_config,
        slide_defs,
        video_sources,
    )

    print("Validation passed.")
    return 0
|
||||
|
||||
|
||||
def cmd_render(project_path: Path, output_path: Path, verbose: bool, dry_run: bool) -> int:
    """Run the four pipeline stages: parse, validate, build plan, render.

    With ``dry_run`` set, stage 4 prints the FFmpeg command instead of
    executing it. Returns 0 on success; errors raised by the stages propagate.
    """
    print(f"Rendering project: {project_path}")
    print(f"Output: {output_path}")
    print()

    # Stage 1: Extract
    print("Stage 1/4: Parsing input files...")
    manuscript_markers = parse_manuscript(project_path)[1]
    project_config = parse_project_config(project_path)
    slide_defs = parse_slides(project_path)
    video_sources = parse_videos(project_path)
    timed_words = parse_transcript(project_path)

    for summary in (
        f" - Found {len(manuscript_markers)} slide markers in manuscript",
        f" - Found {len(slide_defs)} slide definitions",
        f" - Found {len(timed_words)} transcript entries",
    ):
        print(summary)
    print()

    # Stage 2: Validate
    print("Stage 2/4: Validating...")
    validate_project(
        project_path, manuscript_markers, project_config, slide_defs, video_sources
    )
    print(" - Validation passed")
    print()

    # Stage 3: Transform
    print("Stage 3/4: Building render plan...")
    render_plan = build_render_plan(
        project_path, project_config, slide_defs, video_sources, timed_words
    )
    print(f" - Video duration: {render_plan.total_duration:.2f}s")
    print(f" - Slide events: {len(render_plan.slide_events)}")
    for slide_event in render_plan.slide_events:
        print(
            f" - [{slide_event.slide_id}] "
            f"{slide_event.start_time:.2f}s - {slide_event.end_time:.2f}s"
        )
    print()

    # Stage 4: Render
    if not dry_run:
        print("Stage 4/4: Rendering video...")
        render(render_plan, output_path, verbose=verbose)
        print(f" - Output written to: {output_path}")
        print()
        print("Done.")
        return 0

    # Dry run: show the command that would have been executed.
    print("Stage 4/4: Generating FFmpeg command (dry run)...")
    print()
    print(generate_ffmpeg_command_string(render_plan, output_path))
    return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Exit the interpreter with main()'s return value as the status code.
    raise SystemExit(main())
|
||||
@@ -0,0 +1,59 @@
|
||||
"""Structured error types for GnommoEditor pipeline."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class GnommoError(Exception):
    """Root of the GnommoEditor exception hierarchy.

    The other error classes in this module derive from it, so a single
    ``except GnommoError`` handles all of them.
    """
|
||||
|
||||
|
||||
@dataclass
class ValidationIssue:
    """One reported problem plus the optional file/line it refers to."""

    message: str
    file: Optional[Path] = None  # file the issue refers to, when known
    line: Optional[int] = None  # line number within that file, when known

    def __str__(self) -> str:
        """Format as ``[location] message``; location is ``project`` when unknown."""
        location_bits = [str(self.file)] if self.file else []
        if self.line is not None:
            location_bits.append(f"line {self.line}")

        if location_bits:
            where = ":".join(location_bits)
        else:
            where = "project"
        return f"[{where}] {self.message}"
|
||||
|
||||
|
||||
class ParseError(GnommoError):
    """Raised when an input file cannot be parsed.

    The location details are kept on ``self.issue`` (a ValidationIssue), and
    the exception message is that issue's string form.
    """

    def __init__(self, message: str, file: Optional[Path] = None, line: Optional[int] = None):
        issue = ValidationIssue(message, file, line)
        self.issue = issue
        super().__init__(str(issue))
|
||||
|
||||
|
||||
class ValidationError(GnommoError):
    """Raised by the validation stage; carries every issue that was found.

    The issues are kept on ``self.issues``; the message is a count header
    followed by one bullet line per issue.
    """

    def __init__(self, issues: list[ValidationIssue]):
        self.issues = issues
        bullet_lines = "\n".join(f" - {issue}" for issue in issues)
        super().__init__(
            f"Validation failed with {len(issues)} error(s):\n{bullet_lines}"
        )
|
||||
|
||||
|
||||
class RenderError(GnommoError):
    """Raised when the render stage fails.

    ``command`` and ``stderr`` are kept as attributes and, when non-empty,
    appended to the exception message on their own lines.
    """

    def __init__(self, message: str, command: Optional[str] = None, stderr: Optional[str] = None):
        self.command = command
        self.stderr = stderr

        sections = [message]
        if command:
            sections.append(f"Command: {command}")
        if stderr:
            sections.append(f"FFmpeg output:\n{stderr}")
        super().__init__("\n".join(sections))
|
||||
@@ -0,0 +1,94 @@
|
||||
"""Data models for GnommoEditor pipeline."""
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@dataclass
class TalkingHeadConfig:
    """Where the talking head video is placed and how tall it is scaled."""

    # Overlay position of the talking head.
    x: int
    y: int
    # Height in pixels, or -1 for a percentage-based value.
    target_height: int
|
||||
|
||||
|
||||
@dataclass
class ProjectConfig:
    """Project-wide settings loaded from project.json."""

    resolution: tuple[int, int]  # unpacked as (width, height) by consumers
    fps: int
    talking_head: TalkingHeadConfig
    default_slide_type: str
    background_video: str  # empty string means no background video
    audio_source: Optional[str] = None  # defaults to talking head
|
||||
|
||||
|
||||
@dataclass
class SlideDefinition:
    """One slide entry from slides.json."""

    image: str  # image file name of the slide
    type: str  # layout name: "fullscreen" | "square"
|
||||
|
||||
|
||||
@dataclass
class VideoSource:
    """One video entry from videos.json."""

    file: str  # path of the video file
    # Preprocess step names; each instance gets its own empty list by default.
    preprocess: list[str] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
class TimedWord:
    """One transcript.csv entry: a timestamp and the word or marker at it."""

    time: float
    word: str

    @property
    def is_marker(self) -> bool:
        """Return True when the word is wrapped in square brackets, e.g. ``[S1]``."""
        token = self.word
        # One-character slices behave like startswith/endswith, including on "".
        return token[:1] == "[" and token[-1:] == "]"

    @property
    def marker_id(self) -> Optional[str]:
        """Return the text between the brackets (``'S1'`` for ``'[S1]'``), else None."""
        return self.word[1:-1] if self.is_marker else None
|
||||
|
||||
|
||||
@dataclass
class SlideEvent:
    """A slide together with the time window in which it is shown."""

    slide_id: str
    start_time: float  # used as the lower bound of the overlay enable window
    end_time: float  # used as the upper bound of the overlay enable window
    slide_def: SlideDefinition
|
||||
|
||||
|
||||
@dataclass
class RenderPlan:
    """Everything the render stage needs to produce the final video."""

    project_path: Path  # project directory; media paths are joined onto it
    config: ProjectConfig
    talking_head: VideoSource
    slide_events: list[SlideEvent]
    total_duration: float  # passed to FFmpeg's -t to limit output length
    slides: dict[str, SlideDefinition]
|
||||
|
||||
|
||||
# Slide layout configurations (hardcoded for POC)
|
||||
SLIDE_LAYOUTS = {
    # Covers a 1920x1080 frame from the top-left corner.
    "fullscreen": dict(x=0, y=0, width=1920, height=1080),
    # 800x800 box; x = (1920 - 800) / 2 centers it horizontally and
    # y = 140 positions it in the upper area.
    "square": dict(x=560, y=140, width=800, height=800),
}
|
||||
@@ -0,0 +1,197 @@
|
||||
"""Extract stage: parse all input files."""
|
||||
|
||||
import csv
|
||||
import json
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from .errors import ParseError
|
||||
from .models import (
|
||||
ProjectConfig,
|
||||
SlideDefinition,
|
||||
TalkingHeadConfig,
|
||||
TimedWord,
|
||||
VideoSource,
|
||||
)
|
||||
|
||||
|
||||
def parse_manuscript(project_path: Path) -> tuple[str, list[str]]:
    """
    Read manuscript.txt and collect the slide markers it contains.

    Returns:
        Tuple of (full text, marker IDs in order of appearance). IDs are
        returned without brackets; repeated markers are kept.

    Raises:
        ParseError: if manuscript.txt does not exist in the project directory.
    """
    source = project_path / "manuscript.txt"
    if not source.exists():
        raise ParseError("manuscript.txt not found", source)

    content = source.read_text(encoding="utf-8")

    # A marker is a bracketed run of letters, digits or underscores: [S1], [S2], ...
    marker_pattern = re.compile(r"\[([A-Za-z0-9_]+)\]")
    return content, marker_pattern.findall(content)
|
||||
|
||||
|
||||
def parse_transcript(project_path: Path) -> list[TimedWord]:
    """
    Parse transcript.csv into a list of timed words.

    Expected format:
        t,word
        0.00,This
        0.42,is
        ...

    Returns:
        One TimedWord per data row, in file order, with the word stripped of
        surrounding whitespace.

    Raises:
        ParseError: if the file is missing, lacks the ``t``/``word`` columns,
            or a row has a missing value or a non-numeric timestamp.
    """
    transcript_path = project_path / "transcript.csv"

    if not transcript_path.exists():
        raise ParseError("transcript.csv not found", transcript_path)

    timed_words: list[TimedWord] = []

    # newline="" is what the csv module expects for correct newline handling;
    # utf-8-sig reads plain UTF-8 and also drops a leading BOM, which would
    # otherwise become part of the first header name.
    with open(transcript_path, "r", encoding="utf-8-sig", newline="") as f:
        reader = csv.DictReader(f)

        columns = reader.fieldnames
        if columns is None or "t" not in columns or "word" not in columns:
            raise ParseError(
                "transcript.csv must have columns: t, word",
                transcript_path
            )

        for row in reader:
            raw_time = row.get("t")
            raw_word = row.get("word")

            # DictReader fills missing trailing fields with None; report that
            # as a parse error instead of failing later on float()/strip().
            if raw_time is None or raw_word is None:
                raise ParseError(
                    "Invalid row: expected values for both 't' and 'word'",
                    transcript_path,
                    reader.line_num
                )

            try:
                time = float(raw_time)
            except ValueError as e:
                raise ParseError(
                    f"Invalid row: {e}",
                    transcript_path,
                    reader.line_num
                ) from e

            timed_words.append(TimedWord(time=time, word=raw_word.strip()))

    return timed_words
|
||||
|
||||
|
||||
def parse_project_config(project_path: Path) -> ProjectConfig:
    """Parse project.json into ProjectConfig.

    Missing keys fall back to defaults: resolution [1920, 1080], fps 30,
    talking head x/y 100 and height 200, slide type "square", no background
    video, no audio source.

    Raises:
        ParseError: if the file is missing, is not valid JSON, is not a JSON
            object, or holds a malformed ``talkinghead`` or ``resolution``.
    """
    config_path = project_path / "project.json"

    if not config_path.exists():
        raise ParseError("project.json not found", config_path)

    try:
        data = json.loads(config_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as e:
        raise ParseError(f"Invalid JSON: {e}", config_path) from e

    # Valid JSON can still be a list or scalar; .get() below needs a dict.
    if not isinstance(data, dict):
        raise ParseError("project.json must contain a JSON object", config_path)

    # Parse talking head config
    th_data = data.get("talkinghead", {})
    if not isinstance(th_data, dict):
        raise ParseError("talkinghead must be a JSON object", config_path)

    try:
        target_height = _parse_dimension(th_data.get("targetheight", 200))
    except ValueError as e:
        # e.g. a non-numeric string such as "tall"
        raise ParseError(f"Invalid talkinghead.targetheight: {e}", config_path) from e

    talking_head = TalkingHeadConfig(
        x=th_data.get("x", 100),
        y=th_data.get("y", 100),
        target_height=target_height,
    )

    # Parse resolution
    resolution = data.get("resolution", [1920, 1080])
    if not isinstance(resolution, list) or len(resolution) != 2:
        raise ParseError("resolution must be [width, height]", config_path)
    # bool is a subclass of int, so exclude it explicitly.
    if not all(isinstance(v, int) and not isinstance(v, bool) for v in resolution):
        raise ParseError("resolution must be [width, height] with integer values", config_path)

    return ProjectConfig(
        resolution=tuple(resolution),
        fps=data.get("fps", 30),
        talking_head=talking_head,
        default_slide_type=data.get("defaultSlideType", "square"),
        background_video=data.get("background_video", ""),
        audio_source=data.get("audio_source"),
    )
|
||||
|
||||
|
||||
def _parse_dimension(value: Any) -> int:
|
||||
"""Parse a dimension value (can be int or string like '100%')."""
|
||||
if isinstance(value, int):
|
||||
return value
|
||||
if isinstance(value, str):
|
||||
if value.endswith("%"):
|
||||
return -1 # Percentage marker, will be resolved during rendering
|
||||
return int(value)
|
||||
return 200 # default
|
||||
|
||||
|
||||
def parse_slides(project_path: Path) -> dict[str, SlideDefinition]:
    """Parse slides.json into slide definitions keyed by slide ID.

    Each entry needs an ``image`` field; ``type`` defaults to "square".

    Raises:
        ParseError: if the file is missing, is not valid JSON, is not a JSON
            object of objects, or an entry lacks ``image``.
    """
    slides_path = project_path / "slides.json"

    if not slides_path.exists():
        raise ParseError("slides.json not found", slides_path)

    try:
        data = json.loads(slides_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as e:
        raise ParseError(f"Invalid JSON: {e}", slides_path) from e

    # Valid JSON can still be a list or scalar; .items() below needs a dict.
    if not isinstance(data, dict):
        raise ParseError("slides.json must contain a JSON object", slides_path)

    slides: dict[str, SlideDefinition] = {}
    for slide_id, slide_data in data.items():
        # On a string, `"image" in ...` would be a substring test, and on a
        # number it would raise TypeError, so require an object first.
        if not isinstance(slide_data, dict):
            raise ParseError(
                f"Slide '{slide_id}' must be a JSON object",
                slides_path
            )
        if "image" not in slide_data:
            raise ParseError(
                f"Slide '{slide_id}' missing required field 'image'",
                slides_path
            )
        slides[slide_id] = SlideDefinition(
            image=slide_data["image"],
            type=slide_data.get("type", "square"),
        )

    return slides
|
||||
|
||||
|
||||
def parse_videos(project_path: Path) -> dict[str, VideoSource]:
    """Parse videos.json into video source definitions keyed by video ID.

    Each entry needs a ``file`` field; ``preprocess`` defaults to an empty list.

    Raises:
        ParseError: if the file is missing, is not valid JSON, is not a JSON
            object of objects, or an entry lacks ``file``.
    """
    videos_path = project_path / "videos.json"

    if not videos_path.exists():
        raise ParseError("videos.json not found", videos_path)

    try:
        data = json.loads(videos_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as e:
        raise ParseError(f"Invalid JSON: {e}", videos_path) from e

    # Valid JSON can still be a list or scalar; .items() below needs a dict.
    if not isinstance(data, dict):
        raise ParseError("videos.json must contain a JSON object", videos_path)

    videos: dict[str, VideoSource] = {}
    for video_id, video_data in data.items():
        # Require an object so the membership test below is a key lookup.
        if not isinstance(video_data, dict):
            raise ParseError(
                f"Video '{video_id}' must be a JSON object",
                videos_path
            )
        if "file" not in video_data:
            raise ParseError(
                f"Video '{video_id}' missing required field 'file'",
                videos_path
            )
        videos[video_id] = VideoSource(
            file=video_data["file"],
            preprocess=video_data.get("preprocess", []),
        )

    return videos
|
||||
|
||||
|
||||
def get_video_duration(video_path: Path) -> float:
    """Get duration of a video file using ffprobe.

    Returns:
        The value ffprobe prints for ``format=duration``, as a float.

    Raises:
        ParseError: if ffprobe cannot be started, exits with a non-zero
            status, or prints something that is not a number.
    """
    import subprocess

    cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        str(video_path)
    ]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except FileNotFoundError as e:
        # Raised by subprocess when the ffprobe executable is not on PATH.
        raise ParseError(
            "Failed to get duration: ffprobe executable not found",
            video_path
        ) from e

    if result.returncode != 0:
        raise ParseError(f"Failed to get duration: {result.stderr}", video_path)

    raw_duration = result.stdout.strip()
    try:
        return float(raw_duration)
    except ValueError as e:
        # Exit status 0 does not guarantee numeric output (it may be empty).
        raise ParseError(
            f"Failed to get duration: unexpected ffprobe output {raw_duration!r}",
            video_path
        ) from e
|
||||
@@ -0,0 +1,197 @@
|
||||
"""Load stage: generate and execute FFmpeg commands."""
|
||||
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from .errors import RenderError
|
||||
from .models import RenderPlan, SlideEvent, SLIDE_LAYOUTS
|
||||
|
||||
|
||||
def render(plan: RenderPlan, output_path: Path, verbose: bool = False) -> None:
    """
    Render the final video using FFmpeg.

    Generates a filter_complex command that:
    1. Scales background video (if present) or creates solid color
    2. Overlays talking head at configured position
    3. Overlays slides at their configured positions with time-based enable

    Args:
        plan: The render plan to execute.
        output_path: Destination file; its parent directory is created if needed.
        verbose: When True, print the FFmpeg command before running it.

    Raises:
        RenderError: if FFmpeg cannot be started or exits with a non-zero status.
    """
    import shlex

    # Ensure output directory exists
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Build and execute FFmpeg command
    cmd = build_ffmpeg_command(plan, output_path)

    # shlex.join quotes each argument, so the printed command stays accurate
    # when paths contain spaces or the filter graph contains quotes.
    printable_cmd = shlex.join(cmd)

    if verbose:
        print("FFmpeg command:")
        print(printable_cmd)
        print()

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
        )
    except FileNotFoundError as e:
        # Raised by subprocess when the ffmpeg executable is not on PATH.
        raise RenderError(
            "FFmpeg executable not found",
            command=printable_cmd,
        ) from e

    if result.returncode != 0:
        raise RenderError(
            "FFmpeg rendering failed",
            command=printable_cmd,
            stderr=result.stderr,
        )
|
||||
|
||||
|
||||
def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
    """Assemble the FFmpeg argument list for rendering ``plan`` to ``output_path``.

    Inputs are added in a fixed order that the filter graph relies on:
    talking head, then the background video (if configured), then one input
    per distinct slide in order of first appearance.
    """
    project_root = plan.project_path.resolve()
    destination = output_path.resolve()

    # "-y" overwrites an existing output; input 0 is the talking head video.
    args = ["ffmpeg", "-y", "-i", str(project_root / plan.talking_head.file)]

    # Optional background video becomes input 1.
    background = plan.config.background_video
    has_background = bool(background)
    if has_background:
        args += ["-i", str(project_root / background)]

    # One input per distinct slide, even when a slide is shown several times.
    slide_dir = project_root / "media" / "slides"
    slide_inputs: list[str] = []
    for event in plan.slide_events:
        if event.slide_id in slide_inputs:
            continue
        args += ["-i", str(slide_dir / event.slide_def.image)]
        slide_inputs.append(event.slide_id)

    args += ["-filter_complex", build_filter_complex(plan, has_background, slide_inputs)]

    # Video comes from the filter graph, audio from the talking head input.
    args += ["-map", "[vout]", "-map", "0:a"]

    # Encoding settings; "-t" limits the output to the plan's duration.
    args += [
        "-t", str(plan.total_duration),
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "aac",
        "-b:a", "192k",
        "-r", str(plan.config.fps),
        str(destination),
    ]
    return args
|
||||
|
||||
|
||||
def build_filter_complex(
    plan: RenderPlan,
    has_background: bool,
    slide_inputs: list[str],
) -> str:
    """
    Compose the FFmpeg filter_complex string, filters joined with ';'.

    Layers, bottom to top:
    - background: the scaled/cropped background video, or a black color source
    - talking head: scaled to its target height and overlaid on the background
    - slides: one overlay per slide event, enabled only inside its time window

    ``slide_inputs`` lists slide IDs in the order their images were added as
    inputs. The final video stream is always labelled ``[vout]``.
    """
    frame_w, frame_h = plan.config.resolution
    head_cfg = plan.config.talking_head

    # Input numbering: 0 = talking head, 1 = background (if present),
    # slides follow after that.
    first_slide_input = 2 if has_background else 1

    if has_background:
        # Scale up to cover the frame, then crop the excess.
        background = (
            f"[1:v]scale={frame_w}:{frame_h}:force_original_aspect_ratio=increase,"
            f"crop={frame_w}:{frame_h}[bg]"
        )
    else:
        background = f"color=c=black:s={frame_w}x{frame_h}:r={plan.config.fps}[bg]"

    # A non-positive target height falls back to the full frame height.
    head_h = head_cfg.target_height if head_cfg.target_height > 0 else frame_h

    chain: list[str] = [
        background,
        f"[0:v]scale=-1:{head_h}[head]",
        f"[bg][head]overlay=x={head_cfg.x}:y={head_cfg.y}[base]",
    ]

    events = plan.slide_events
    last_index = len(events) - 1
    previous = "base"

    for n, event in enumerate(events):
        input_no = first_slide_input + slide_inputs.index(event.slide_id)
        # Unknown slide types fall back to the square layout.
        layout = SLIDE_LAYOUTS.get(event.slide_def.type, SLIDE_LAYOUTS["square"])

        # Fit the slide inside the layout box, keeping its aspect ratio.
        scaled = f"s{n}"
        chain.append(
            f"[{input_no}:v]scale={layout['width']}:{layout['height']}:"
            f"force_original_aspect_ratio=decrease[{scaled}]"
        )

        # The last overlay writes the final output label.
        output = "vout" if n == last_index else f"v{n}"
        window = f"between(t,{event.start_time:.3f},{event.end_time:.3f})"
        chain.append(
            f"[{previous}][{scaled}]overlay="
            f"x={layout['x']}:y={layout['y']}:"
            f"enable='{window}'[{output}]"
        )
        previous = output

    # With no slides nothing has produced [vout] yet, so copy the base to it.
    if not events:
        chain.append(f"[{previous}]copy[vout]")

    return ";".join(chain)
|
||||
|
||||
|
||||
def generate_ffmpeg_command_string(plan: RenderPlan, output_path: Path) -> str:
    """Generate a human-readable FFmpeg command string (for debugging).

    Each option is placed on its own backslash-continued line together with
    its value; the filter graph is split so every filter gets its own line.
    The result is meant for display and is not shell-quoted.
    """
    cmd = build_ffmpeg_command(plan, output_path)

    # cmd[0] is the program name. It is emitted once as the first line of the
    # result, so it must be left out of the per-argument lines; otherwise the
    # output contains "ffmpeg" twice.
    args = cmd[1:]

    lines: list[str] = []
    i = 0
    while i < len(args):
        arg = args[i]
        # An option takes the next token as its value unless that token is
        # itself an option.
        has_value = i + 1 < len(args) and not args[i + 1].startswith("-")

        if arg == "-filter_complex" and i + 1 < len(args):
            # One filter per line inside a double-quoted block.
            graph = args[i + 1].replace(";", ";\n ")
            lines.append(f" -filter_complex \"\n {graph}\n \"")
            i += 2
        elif arg.startswith("-") and has_value:
            lines.append(f" {arg} {args[i + 1]}")
            i += 2
        else:
            # Flag without a value, or a positional such as the output path.
            lines.append(f" {arg}")
            i += 1

    return "ffmpeg \\\n" + " \\\n".join(lines)
|
||||
@@ -0,0 +1,88 @@
|
||||
"""Transform stage: resolve timings and build render plan."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from .models import (
|
||||
ProjectConfig,
|
||||
RenderPlan,
|
||||
SlideDefinition,
|
||||
SlideEvent,
|
||||
TimedWord,
|
||||
VideoSource,
|
||||
)
|
||||
from .parser import get_video_duration
|
||||
|
||||
|
||||
def build_render_plan(
    project_path: Path,
    config: ProjectConfig,
    slides: dict[str, SlideDefinition],
    videos: dict[str, VideoSource],
    transcript: list[TimedWord],
) -> RenderPlan:
    """
    Build a complete render plan from parsed and validated data.

    This transforms transcript markers into timed slide events and
    assembles all information needed for the render stage.

    Raises:
        ValueError: if ``videos`` is empty, since no talking head can be chosen.
    """
    # Fail with a clear message instead of leaking StopIteration from next().
    if not videos:
        raise ValueError("videos must contain at least one video source")

    # For POC: use the first video as the talking head
    talking_head = next(iter(videos.values()))

    # Get video duration for end time calculations
    video_path = project_path / talking_head.file
    total_duration = get_video_duration(video_path)

    # Build slide events from transcript markers
    slide_events = _extract_slide_events(transcript, slides, total_duration)

    return RenderPlan(
        project_path=project_path,
        config=config,
        talking_head=talking_head,
        slide_events=slide_events,
        total_duration=total_duration,
        slides=slides,
    )
|
||||
|
||||
|
||||
def _extract_slide_events(
    transcript: list[TimedWord],
    slides: dict[str, SlideDefinition],
    total_duration: float,
) -> list[SlideEvent]:
    """
    Turn transcript markers into slide events, in transcript order.

    A marker such as [S1] whose ID is defined in ``slides`` yields one event:
    - start_time: the marker's timestamp
    - end_time: the next such marker's timestamp, or ``total_duration``
      for the last one

    Markers whose ID is empty or not present in ``slides`` are ignored.
    """
    known_markers: list[tuple[float, str]] = [
        (entry.time, entry.marker_id)
        for entry in transcript
        if entry.is_marker and entry.marker_id and entry.marker_id in slides
    ]

    # Each slide ends where the following one starts; the last one runs to
    # the end of the video.
    end_times = [next_start for next_start, _ in known_markers[1:]]
    end_times.append(total_duration)

    return [
        SlideEvent(
            slide_id=slide_id,
            start_time=start,
            end_time=end,
            slide_def=slides[slide_id],
        )
        for (start, slide_id), end in zip(known_markers, end_times)
    ]
|
||||
@@ -0,0 +1,104 @@
|
||||
"""Validation stage: fail-fast checks on parsed data."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from .errors import ValidationError, ValidationIssue
|
||||
from .models import ProjectConfig, SlideDefinition, VideoSource, SLIDE_LAYOUTS
|
||||
|
||||
|
||||
def validate_project(
    project_path: Path,
    manuscript_markers: list[str],
    config: ProjectConfig,
    slides: dict[str, SlideDefinition],
    videos: dict[str, VideoSource],
) -> None:
    """
    Check the parsed project data and report every problem at once.

    All checks run before anything is raised, so the resulting
    ValidationError lists every issue found, in this order:
    - manuscript markers with no entry in slides.json
    - per slide: missing image under media/slides, then unknown slide type
    - video files missing on disk
    - background video missing on disk (only if one is configured)
    - no video sources defined at all
    - resolution outside 100x100 .. 7680x4320
    - fps outside 1 .. 120

    Raises:
        ValidationError: if at least one issue was found.
    """
    issues: list[ValidationIssue] = []

    def report(message: str, source_file: str) -> None:
        # Record an issue against one of the project's input files.
        issues.append(ValidationIssue(message, project_path / source_file))

    # Manuscript markers must be defined in slides.json.
    for marker in manuscript_markers:
        if marker in slides:
            continue
        report(
            f"Slide marker [{marker}] referenced in manuscript but not defined in slides.json",
            "manuscript.txt",
        )

    # Slide images live under <project>/media/slides.
    slide_dir = project_path / "media" / "slides"
    for slide_id, slide_def in slides.items():
        if not (slide_dir / slide_def.image).exists():
            report(f"Slide image not found: {slide_def.image}", "slides.json")

        # The slide type must name one of the known layouts.
        if slide_def.type not in SLIDE_LAYOUTS:
            report(
                f"Unknown slide type '{slide_def.type}' for slide {slide_id}. "
                f"Valid types: {list(SLIDE_LAYOUTS.keys())}",
                "slides.json",
            )

    # Video files are resolved relative to the project directory.
    for video_source in videos.values():
        if not (project_path / video_source.file).exists():
            report(f"Video file not found: {video_source.file}", "videos.json")

    # The background video is optional; check it only when configured.
    if config.background_video and not (project_path / config.background_video).exists():
        report(f"Background video not found: {config.background_video}", "project.json")

    # At least one video source is required.
    if not videos:
        report("No video sources defined in videos.json", "videos.json")

    # Resolution bounds: at least 100x100, at most 8K (7680x4320).
    width, height = config.resolution
    if width < 100 or height < 100:
        report(f"Resolution too small: {width}x{height}", "project.json")
    if width > 7680 or height > 4320:
        report(f"Resolution too large: {width}x{height} (max 8K)", "project.json")

    # Frame rate bounds.
    if config.fps < 1 or config.fps > 120:
        report(f"Invalid FPS: {config.fps} (must be 1-120)", "project.json")

    if issues:
        raise ValidationError(issues)
|
||||
Reference in New Issue
Block a user