Compare commits

...

33 Commits

Author SHA1 Message Date
gitprov b9b5a8e77d Adding pexels downloader and fixes 2026-06-07 11:19:19 +02:00
gitprov 980bb84dac Fixing black formatting 2026-05-13 21:53:22 +02:00
gitprov 20aba06be1 Commit fix to time reader 2026-05-13 21:30:40 +02:00
gitprov 12b052eb1d Avoiding destructive down command when running all 2026-05-13 08:14:59 +02:00
gitprov cf40a19b4e Fixes to gnommo 2026-05-13 08:13:20 +02:00
gitprov 5d7c77db91 Adding fix to the slide 2026-05-12 21:11:33 +02:00
gitprov 87424a6531 Adding chunking to main render loop 2026-05-12 20:45:36 +02:00
gitprov 60e2f20b0f Adding performance tuning 2026-05-12 20:22:05 +02:00
gitprov 4a24d3987f Fixing the chunker 2026-05-12 20:16:28 +02:00
gitprov 7c53daec8a Adding fix to transpose 2026-05-12 19:57:28 +02:00
gitprov 41d96501b6 Fixes to performance 2026-05-12 19:49:15 +02:00
gitprov ff47ffea8f Fixing the issue 2026-05-12 08:16:30 +02:00
gitprov b4c48d81b0 Fxing the cache path 2026-05-12 08:07:12 +02:00
gitprov 409d7790c0 Fixing some filter paralleism 2026-05-12 08:04:45 +02:00
gitprov 994a2e0bb6 Fixing loudness issue 2026-05-12 00:52:14 +02:00
gitprov feb4df0506 Adding some files 2026-05-11 21:45:30 +02:00
gitprov b9376cd650 dding updates to gnommo 2026-05-11 08:23:21 +02:00
gitprov 0c2d097cdf Adding fix to aligner 2026-05-10 13:46:50 +02:00
gitprov 2dff8f45b9 Adding fixes to the publish pipeline 2026-05-09 15:36:15 +02:00
gitprov 00e01237ed Adding rsync --delete flag on up 2026-05-09 14:59:01 +02:00
gitprov 3a9e5d17e9 Updating the sync logic 2026-05-09 14:42:42 +02:00
gitprov dac6dfc48b Adding some more fixes for path 2026-05-09 13:09:41 +02:00
gitprov a351022a8f Adding some fixe 2026-05-09 13:06:37 +02:00
gitprov efd1eba5df fixing path issue on wsl 2026-05-09 12:55:33 +02:00
gitprov ad07de2e9a Git adding case insenstiive 2026-05-09 12:51:59 +02:00
gitprov e6a6968109 Tweaks ton esure that 2026-05-09 12:38:05 +02:00
gitprov d722272edc Adding ignoring processed as well 2026-05-09 12:31:17 +02:00
gitprov f8d359543a Add two way sync improvement 2026-05-09 12:18:26 +02:00
gitprov 12bf494f2d Fail gracefully on machines without osascript support 2026-05-09 12:11:36 +02:00
gitprov 831c0c4e60 Adding some bugfixes to the 'all' command 2026-05-09 12:06:15 +02:00
gitprov f0387f24bb Adding support for audio again 2026-05-08 08:08:08 +02:00
gitprov 26d027a44e Adding cache so we can sync via server 2026-05-04 20:31:37 +02:00
gitprov 2516e3eeef Add gnommo load command to copy projects from removable media
Adds the inverse of the archive command: `gnommo load -p <project>`
inspects the configured external drive and rsyncs the project folder
onto the local drive. Supports --dry-run. Also expands .gitignore to
cover additional media file types and project directories.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-04 20:05:12 +02:00
17 changed files with 2746 additions and 598 deletions
+21 -2
View File
@@ -7,16 +7,35 @@ __pycache__/
venv/
.venv/
*.egg-info/
Video1/*
*.pdf
*.png
*.key
*.bak
shared_assets/*
Video*/*
Illustrations
# OS
.DS_Store
Thumbs.db
*/intermediate/*
# Output
**/out/
*.mp4
*.mov
*.mp3
*.aifc
*.wav
# Temp
*.tmp
.cache/
# Secrets
.env
.env.*
# Sync state (local only, per-environment)
.gnommo_sync.json
.gnommo_sync.prod.json
+3 -1
View File
@@ -7,13 +7,14 @@
"platform_targets": ["youtube"],
"status": "scripted",
"youtube_url": null,
"resolution": [1960, 1080],
"resolution": [1920, 1080],
"fps": 30,
"duration_seconds": null,
"default_filters": {
"audioonly": [
{
"type": "audio_normalize",
"enable":false,
"compress": false,
"normalize": true,
"target_lufs": -14,
@@ -24,6 +25,7 @@
"talkinghead": [
{
"type": "audio_normalize",
"enable":false,
"normalize": true,
"target_lufs": -14,
"target_lra": 11,
Executable
+9
View File
@@ -0,0 +1,9 @@
#!/bin/sh
# Run `gnommo.sh -p <project> all --force --prod` for each project in turn.
for project in video1 video2 video3 video4; do
    ./gnommo.sh -p "$project" all --force --prod
done
# Disabled invocations (note: these do not pass --prod):
#./gnommo.sh -p video5 all --force
#./gnommo.sh -p video6 all --force
Executable
+5
View File
@@ -0,0 +1,5 @@
#!/bin/bash
# Invoke the `claude` CLI with --resume and a fixed session ID
# (presumably re-opens that earlier session — confirm against the CLI docs).
claude --resume df8f915f-0f99-4e0f-b345-3562a49fcb06
+3 -2
View File
@@ -4,8 +4,7 @@
"description": "In this video, I demonstrate the Gnommo video editing pipeline - a code-first approach to creating presenter-mode videos from Keynote presentations.",
"footer": "Subscribe for more tutorials!\nTwitter: @example",
"resolution": [1920, 1080],
"fps": 30,
"gnommo_scratch": null,
"fps": 30,
"defaultSlideType": "fullscreen",
"keynote_file": "media/example.key",
"transcript": "media/videos/talking_head.transcript.json",
@@ -14,10 +13,12 @@
"videos": "media/videos/videos.json",
"slides": "media/slides/Example/slides.json",
"audio": "media/audio/audio.json",
"output": "final.mp4",
"default_filters": {
"talkinghead": [
{
"type": "audio_normalize",
"enable":false,
"eq_bands": [
{"freq": 47, "gain": -15, "type": "lowshelf"},
{"freq": 107, "gain": -1.3, "q": 1.2},
+101 -2
View File
@@ -11,10 +11,66 @@ Files are looked up first locally, then in the cache at:
"""
import configparser
import os
from pathlib import Path
from typing import Optional, Tuple
_cache_config: Optional[dict] = None
_perf_config: Optional[dict] = None
def get_ffmpeg_thread_count() -> int:
    """Compute the FFmpeg thread count from ``cpu_limit`` in ~/.gnommo.conf.

    ``cpu_limit`` lives in the ``[performance]`` section and is read as a
    fraction of the logical CPU count (e.g. 0.8 = 80%). The parsed value is
    cached in the module-level ``_perf_config`` dict, so the config file is
    only read on the first call.

    Example ~/.gnommo.conf:
        [performance]
        cpu_limit = 0.8

    Returns:
        ``max(1, int(cpu_count * cpu_limit))``, or 1 when ``cpu_limit`` is
        missing or is not a valid float.
    """
    global _perf_config

    if _perf_config is None:
        conf_file = Path.home() / ".gnommo.conf"
        # Mark the cache as initialised before parsing so later calls never
        # re-read the file.
        _perf_config = {}
        if conf_file.exists():
            parser = configparser.ConfigParser()
            parser.read(conf_file)
            if parser.has_option("performance", "cpu_limit"):
                raw_limit = parser.get("performance", "cpu_limit")
                try:
                    _perf_config["cpu_limit"] = float(raw_limit)
                except ValueError:
                    # Unparseable value: treat it as "not configured".
                    pass

    fraction = _perf_config.get("cpu_limit")
    if fraction is None:
        return 1

    logical_cpus = os.cpu_count() or 1
    return max(1, int(logical_cpus * fraction))
def get_render_chunk_size() -> Optional[int]:
    """Return slides-per-chunk for auto-chunked rendering, or None if not configured.

    Per the original design note, cmd_render is meant to split the filter
    graph into chunks of this many slides to avoid OOM from allocating filter
    buffers for the entire video at once.

    The value comes from ``render_chunk_slides`` in the ``[performance]``
    section of ~/.gnommo.conf and is cached in ``_perf_config`` after the
    first lookup.

    Example ~/.gnommo.conf:
        [performance]
        render_chunk_slides = 15

    Returns:
        The configured chunk size (at least 1), or None when the option is
        absent or is not an integer.
    """
    global _perf_config
    if _perf_config is None:
        get_ffmpeg_thread_count()  # initialises _perf_config

    # get_ffmpeg_thread_count() only caches ``cpu_limit``; without this lookup
    # ``render_chunk_slides`` would never be found and the setting would be
    # silently ignored.
    if "render_chunk_slides" not in _perf_config:
        raw_value = None
        config_path = Path.home() / ".gnommo.conf"
        if config_path.exists():
            cfg = configparser.ConfigParser()
            cfg.read(config_path)
            raw_value = cfg.get(
                "performance", "render_chunk_slides", fallback=None
            )
        # Cache the miss as well so the file is parsed only once.
        _perf_config["render_chunk_slides"] = raw_value

    val = _perf_config["render_chunk_slides"]
    if val is None:
        return None
    try:
        return max(1, int(val))
    except (ValueError, TypeError):
        return None
def load_cache_config() -> Optional[Path]:
@@ -73,18 +129,61 @@ def resolve_with_cache(
if cache_base is None:
return local_path, False # No cache configured
# Build cache path: {cache_base}/{project_name}/{relative_path}
# Try 1: path inside the project → cache_base / project_name / relative
try:
relative = local_path.relative_to(project_path)
cache_path = cache_base / project_path.name / relative
if cache_path.exists():
return cache_path, True
except ValueError:
pass # local_path is not relative to project_path
pass # local_path is not under project_path
# Try 2: path relative to gnommo root (sibling dirs like shared_assets)
# e.g. shared_assets/pexels/file.mp4 → cache_base / shared_assets / pexels / file.mp4
try:
relative = local_path.relative_to(project_path.parent)
cache_path = cache_base / relative
if cache_path.exists():
return cache_path, True
except ValueError:
pass # local_path is not under project_path.parent either
return local_path, False
def load_server_config() -> Optional[dict]:
    """Read the rsync server settings from ~/.gnommo.conf.

    Expected config:
        [server]
        host = server.example.com
        user = root
        path = /gnommo/project

    ``user``, ``path`` and ``port`` are optional and default to ``"root"``,
    ``"/gnommo/project"`` and ``"22"`` respectively.

    Returns:
        Dict with keys host, user, path and port (all strings), or None when
        the file is missing, has no ``[server]`` section, or ``host`` is
        empty/absent.
    """
    conf_file = Path.home() / ".gnommo.conf"
    if not conf_file.exists():
        return None

    parser = configparser.ConfigParser()
    parser.read(conf_file)
    if not parser.has_section("server"):
        return None

    section = parser["server"]
    settings = {
        "host": section.get("host", fallback=None),
        "user": section.get("user", fallback="root"),
        "path": section.get("path", fallback="/gnommo/project"),
        "port": section.get("port", fallback="22"),
    }
    # A server entry without a host is unusable.
    if not settings["host"]:
        return None
    return settings
def is_cache_configured() -> bool:
    """Report whether load_cache_config() yields a cache location (for status messages)."""
    cache_location = load_cache_config()
    return cache_location is not None
+1069 -162
View File
File diff suppressed because it is too large Load Diff
+60 -37
View File
@@ -1,4 +1,4 @@
"""Hand off a finished video to the gnommoweb server.
"""Hand off a finished video to MinIO storage via gnommoeditor (prod) or gnommoweb (local).
Works for any gnommo project type: parent videos and shorts alike.
@@ -10,14 +10,17 @@ Usage:
Reads project.json for the 'output_video' field (path relative to the
project directory). Override with --file.
On success:
- Uploads the video to MinIO via POST /api/projects/:handle/handoff
- For shorts: server auto-advances status to 'processed'
- Bumps video_version on every upload
On success (production):
- Uploads the video to MinIO via POST /api/assets/upload on gnommoeditor
- Updates .gnommo_sync.prod.json with asset URL
On success (local):
- Uploads via POST /api/projects/:handle/handoff on gnommoweb
- Updates .gnommo_sync.json with new video_version
Configuration (from .env or environment):
GNOMMOWEB_URL Base URL (e.g. http://localhost:3001)
GNOMMOEDITOR_URL Base URL for production (e.g. https://editor.glitch.university)
GNOMMOWEB_URL Base URL for local dev (e.g. http://localhost:3001)
GNOMMOWEB_API_KEY Bearer token (CONTENT_API_KEY from gnommoweb)
"""
@@ -83,13 +86,9 @@ def cmd_handoff(
_load_env_file()
if prod:
api_url = os.environ.get("GNOMMOWEB_PROD_URL", "").rstrip("/")
api_key = os.environ.get("GNOMMOWEB_PROD_API_KEY", "")
api_url = os.environ.get("GNOMMOEDITOR_URL", "").rstrip("/")
if not api_url:
print("Error: GNOMMOWEB_PROD_URL is not set.", file=sys.stderr)
return 1
if not api_key:
print("Error: GNOMMOWEB_PROD_API_KEY is not set.", file=sys.stderr)
print("Error: GNOMMOEDITOR_URL is not set.", file=sys.stderr)
return 1
else:
api_url = os.environ.get("GNOMMOWEB_URL", "").rstrip("/")
@@ -102,7 +101,7 @@ def cmd_handoff(
return 1
if verbose:
target = "production" if prod else "local"
target = "production (gnommoeditor)" if prod else "local"
print(f"{target}: {api_url}")
project_file = project_path / "project.json"
@@ -147,13 +146,23 @@ def cmd_handoff(
# ── Upload ─────────────────────────────────────────────────────────────────
try:
with open(video_path, "rb") as vf:
r = requests.post(
f"{api_url}/api/projects/{project_id}/handoff",
files={"video": (video_path.name, vf, _mime_type(video_path))},
headers={"Authorization": f"Bearer {api_key}"},
timeout=None, # large files may take a while
)
if prod:
# gnommoeditor: POST /api/assets/upload — field name is 'file', no auth
with open(video_path, "rb") as vf:
r = requests.post(
f"{api_url}/api/assets/upload",
files={"file": (video_path.name, vf, _mime_type(video_path))},
timeout=None,
)
else:
# gnommoweb: POST /api/projects/:id/handoff
with open(video_path, "rb") as vf:
r = requests.post(
f"{api_url}/api/projects/{project_id}/handoff",
files={"video": (video_path.name, vf, _mime_type(video_path))},
headers={"Authorization": f"Bearer {api_key}"},
timeout=None,
)
except requests.exceptions.ConnectionError:
print(f"✗ Could not connect to {api_url}")
return 1
@@ -167,28 +176,42 @@ def cmd_handoff(
return 1
result = r.json()
video_version = result.get("video_version", "?")
video_url = result.get("video_url", "")
# ── Write sync state ───────────────────────────────────────────────────────
now_iso = datetime.now(tz=timezone.utc).isoformat(timespec="seconds")
existing_sync = _read_sync(project_path, prod)
_write_sync(
project_path,
{
**existing_sync,
"last_handoff_at": now_iso,
"video_version": video_version,
"server_updated_at": result.get("asset", {}).get(
"updated_at", existing_sync.get("server_updated_at")
),
},
prod,
)
print(f"{project_id} → v{video_version} [processed]")
if video_url:
print(f" {video_url}")
if prod:
# gnommoeditor response: { asset: { id, url, minio_object_key, ... } }
asset = result.get("asset", {})
asset_url = asset.get("url", "")
_write_sync(
project_path,
{**existing_sync, "last_handoff_at": now_iso, "asset_url": asset_url},
prod,
)
print(f"{project_id} → uploaded [asset #{asset.get('id')}]")
if asset_url:
print(f" {asset_url}")
else:
# gnommoweb response: { video_version, video_url, asset: { updated_at } }
video_version = result.get("video_version", "?")
video_url = result.get("video_url", "")
_write_sync(
project_path,
{
**existing_sync,
"last_handoff_at": now_iso,
"video_version": video_version,
"server_updated_at": result.get("asset", {}).get(
"updated_at", existing_sync.get("server_updated_at")
),
},
prod,
)
print(f"{project_id} → v{video_version} [processed]")
if video_url:
print(f" {video_url}")
return 0
+23 -5
View File
@@ -49,6 +49,7 @@ class ProjectConfig:
slides_path: str = "slides.json" # path to slides.json relative to project
videos_path: str = "videos.json" # path to videos.json relative to project
audio_path: str = "audio.json" # path to audio.json relative to project
transcript_path: Optional[str] = None # path to transcript.json relative to project (always saved locally)
audio_source: Optional[str] = None # defaults to talking head
main_video: Optional[
Union[str, list]
@@ -56,6 +57,9 @@ class ProjectConfig:
gnommo_scratch: Optional[
str
] = None # directory for intermediate files (e.g., external SSD)
process_cache: Optional[
str
] = None # external directory for processed/combined outputs (saves laptop disk space)
default_begin: float = 0.0 # Trim this many seconds from the start of each segment (if no explicit begin/skip)
default_end_trim: float = 0.0 # Trim this many seconds from the end of each segment (if no explicit end/take)
# Outro sequence - plays after narration ends (not marker-triggered)
@@ -132,6 +136,15 @@ class GnommoKeyConfig:
# Can help with edge color contamination
alpha_bias: tuple[int, int, int] = None
# Luminance protection: pixels with luma above this stay fully opaque (0-255, -1 = off)
# Use ~220 to protect white objects (headphones, teeth) from being partially keyed.
protect_luma: int = -1
# Shadow boost: extra key strength for dark pixels (0.0-5.0, 0 = off)
# Ramps up key signal proportionally to how dark a pixel is, helping key dark greens
# without affecting bright foreground areas. Values 1.0-2.0 are typical.
shadow_boost: float = 0.0
# Edge refinement
edge_erode: int = 0 # Pixels to erode from alpha edge (0-5)
edge_soften: float = 0.0 # Blur the alpha edge (0-5 pixels)
@@ -192,7 +205,7 @@ class AudioNormalizeConfig:
Applies noise reduction, compression, and loudness normalization
to improve audio quality and consistency.
"""
enabled: bool = True # Master switch to enable/disable all audio processing
# Parametric EQ bands (applied before other processing)
eq_bands: list[EQBand] = field(default_factory=list)
@@ -297,10 +310,14 @@ class VideoSource:
False # If True, skip loudnorm during preprocessing (apply after concatenation)
)
volume: float = 1.0 # Volume multiplier (1.0=full, >1.0=boost, <1.0=reduce)
layer: str = "above" # "above" = renders on top of slides; "below" = behind slides
duration: Optional[float] = None # Pre-probed file duration in seconds (set by import)
layer: str = "above" # "above" = on top of slides; "mid" = above narrator/below slides; "below" = behind narrator
duration: Optional[
float
] = None # Pre-probed file duration in seconds (set by import)
has_audio: Optional[bool] = None # Pre-detected audio presence (set by import)
end_on: Optional[str] = None # When video event ends: "next_slide" | "end" | "take" (None = marker-type default)
end_on: Optional[
str
] = None # When video event ends: "next_slide" | "end" | "take" (None = marker-type default)
@dataclass
@@ -333,7 +350,7 @@ class SlideEvent:
class AudioDefinition:
"""Definition of an audio clip from audio.json."""
file: str # Audio filename (relative to audio.json location)
file: str # Audio filename (relative to audio.json location, or to shared_assets/media/audio/ if is_shared)
volume: float = 1.0 # Volume multiplier (0.0-1.0)
loop: bool = False # If True, loop for entire duration from trigger point
overlap: Optional[float] = None # Crossfade overlap in seconds when looping
@@ -341,6 +358,7 @@ class AudioDefinition:
False # If True, audio continues playing during narration pauses
)
duration: Optional[float] = None # Pre-probed duration in seconds (set by import)
is_shared: bool = False # If True, file is relative to shared_assets/media/audio/
@dataclass
+156 -8
View File
@@ -25,6 +25,35 @@ def _read_json(path: Path) -> Any:
return json.loads(text) if text else {}
def _resolve_case_insensitive(path: Path) -> Path:
"""Return the real on-disk path, resolving each component case-insensitively.
On case-insensitive filesystems (macOS) paths just work. On case-sensitive
ones (Linux/WSL) a mismatch between project.json and the actual directory
name causes a FileNotFoundError. This walks each component and picks the
first directory entry whose name matches case-insensitively, returning the
corrected path. If the path already exists, it is returned unchanged.
"""
if path.exists():
return path
resolved = path.anchor and Path(path.anchor) or Path(".")
for part in path.parts[len(Path(path.anchor).parts) :]:
if (resolved / part).exists():
resolved = resolved / part
else:
try:
match = next(
(p for p in resolved.iterdir() if p.name.lower() == part.lower()),
None,
)
except (OSError, NotADirectoryError):
match = None
resolved = match if match else (resolved / part)
return resolved
def parse_manuscript(
project_path: Path,
) -> tuple[str, list[str], list[tuple[int, str]], list[Citation]]:
@@ -55,9 +84,9 @@ def parse_manuscript(
text = re.sub(r"\[marker:[^\]]+\]", "", text)
text = re.sub(r"\[cue:[^\]]+\]", "", text)
# Extract all valid markers like [S1], [video:demo], [Zoom2], etc.
# Include . in pattern to catch markers with file extensions (so validator can warn about them)
markers = re.findall(r"\[([A-Za-z0-9_:.]+)\]", text)
# Extract all valid markers like [S1], [video:demo], [vf2m:pexels/clip-name], etc.
# Include / and - to capture pexels/library video IDs; . to catch file extensions in markers.
markers = re.findall(r"\[([A-Za-z0-9_:./\-]+)\]", text)
# Find malformed markers (missing brackets, extra spaces, etc.)
malformed: list[tuple[int, str]] = []
@@ -229,9 +258,10 @@ def parse_project_config(project_path: Path) -> ProjectConfig:
slides_path=data.get("slides", "slides.json"),
videos_path=data.get("videos", "videos.json"),
audio_path=data.get("audio", "audio.json"),
transcript_path=data.get("transcript"),
audio_source=data.get("audio_source"),
main_video=data.get("main_video"),
gnommo_scratch=data.get("gnommo_scratch"),
process_cache=data.get("process_cache"),
default_begin=float(data.get("default_begin", 0.0)),
default_end_trim=float(data.get("default_end_trim", 0.0)),
outro=data.get("outro", []),
@@ -263,7 +293,10 @@ def parse_slides(
) -> dict[str, SlideDefinition]:
"""Parse slides.json into slide definitions."""
if config and config.slides_path:
local_slides_path = project_path / config.slides_path
# Lowercase the path so that a capital-cased project name embedded by
# the import stage (e.g. "media/slides/video2/slides.json") resolves
# correctly on case-sensitive filesystems (WSL/Linux).
local_slides_path = project_path / config.slides_path.lower()
else:
local_slides_path = project_path / "slides.json"
@@ -342,6 +375,7 @@ def parse_audio(
overlap=overlap,
ignore_pauses=bool(audio_data.get("ignore_pauses", False)),
duration=float(raw_duration) if raw_duration is not None else None,
is_shared=bool(audio_data.get("is_shared", False)),
)
return audio, audio_dir
@@ -356,6 +390,7 @@ def parse_timestamp(value: str) -> float:
- "2:54" → 2 minutes 54 seconds (174.0)
- "1:23:45" → 1 hour 23 minutes 45 seconds
- "2:54.5" → 2 minutes 54.5 seconds
- "2m:3.5s" → 2 minutes 3.5 seconds
Returns:
Time in seconds as a float.
@@ -366,6 +401,10 @@ def parse_timestamp(value: str) -> float:
value = value.strip()
# Remove trailing 's' if present (e.g., "3.5s")
if "h" in value:
value = value.replace("h", ":")
if "m" in value:
value = value.replace("m", ":")
if value.endswith("s"):
value = value[:-1]
@@ -462,8 +501,12 @@ def parse_videos(
filter_list = filter_value
# Handle skip/take - can use begin/end as user-friendly alternatives
skip = video_data.get("skip", 0.0)
take = video_data.get("take")
skip = float(video_data.get("skip") or 0.0)
take = (
float(video_data["take"])
if video_data.get("take") not in (None, "")
else None
)
# Convert begin/end to skip/take if provided
if "begin" in video_data and video_data["begin"]:
@@ -571,9 +614,11 @@ def parse_narration(
skip = segment_data.get("skip", default_begin)
take = segment_data.get("take")
# Explicit begin/end always override defaults
# Explicit begin/start/end always override defaults
if "begin" in segment_data and segment_data["begin"]:
skip = parse_timestamp(segment_data["begin"])
elif "start" in segment_data and segment_data["start"]:
skip = parse_timestamp(segment_data["start"])
if "end" in segment_data and segment_data["end"]:
end_time = parse_timestamp(segment_data["end"])
# take = end - begin (duration from begin to end)
@@ -688,3 +733,106 @@ def resolve_video_file(
# Direct video file reference
return ref_path, None
def resolve_missing_videos(
    missing_ids: list[str],
    project_path: Path,
    config: Optional[ProjectConfig] = None,
) -> dict[str, VideoSource]:
    """Resolve video IDs missing from the project via shared_assets/videos.json.

    For video IDs not found in the project's videos.json, look them up in
    shared_assets/videos.json (searched in ``project_path`` first, then in its
    parent). When a match is found the entry is written back into the
    project's videos.json with ``is_shared: true`` so subsequent runs find it
    without another lookup.

    Args:
        missing_ids: Video IDs that the project's own videos.json lacks.
        project_path: Project directory.
        config: Optional project config; its ``videos_path`` selects the
            videos.json to update (default: ``project_path / "videos.json"``).

    Returns:
        Dict of newly resolved VideoSource objects (only the ones found).
        IDs that aren't in the shared library either are silently ignored.
        A failure to write videos.json only prints a warning; the resolved
        sources are still returned.
    """
    if not missing_ids:
        return {}

    # Locate shared_assets: inside the project first, then next to it.
    shared_dir: Optional[Path] = None
    for base in (project_path, project_path.parent):
        if (base / "shared_assets").exists():
            shared_dir = base / "shared_assets"
            break
    if shared_dir is None:
        return {}

    shared_videos_path = shared_dir / "videos.json"
    if not shared_videos_path.exists():
        return {}

    try:
        shared_data = _read_json(shared_videos_path)
    except (json.JSONDecodeError, OSError):
        return {}

    found = {vid_id for vid_id in missing_ids if vid_id in shared_data}
    if not found:
        return {}

    # Load the project's videos.json so we can append to it
    if config and config.videos_path:
        local_videos_path = project_path / config.videos_path
    else:
        local_videos_path = project_path / "videos.json"

    try:
        local_data = _read_json(local_videos_path) if local_videos_path.exists() else {}
    except (json.JSONDecodeError, OSError):
        local_data = {}

    resolved: dict[str, VideoSource] = {}
    for video_id in sorted(found):
        entry = dict(shared_data[video_id])
        entry["is_shared"] = True

        # Persist into the project's videos.json
        local_data[video_id] = entry
        print(f" → Copied shared video '{video_id}' into videos.json (is_shared=true)")

        # Build the in-memory VideoSource
        attribution = None
        if "attribution" in entry:
            attr = entry["attribution"]
            attribution = Attribution(
                source=attr.get("source", "unknown"),
                creator=attr.get("creator", "Unknown"),
                url=attr.get("url"),
            )

        raw_duration = entry.get("duration")
        raw_has_audio = entry.get("has_audio")
        # Coerce skip/take the same way parse_videos does: a null or empty
        # "skip" means 0.0 (float(None) would raise), and "take" becomes a
        # float unless it is null/empty.
        raw_take = entry.get("take")
        resolved[video_id] = VideoSource(
            source_file=entry["source_file"],
            filter=entry.get("filter", []),
            output_file=entry.get("output_file"),
            take=float(raw_take) if raw_take not in (None, "") else None,
            skip=float(entry.get("skip") or 0.0),
            zoom=float(entry.get("zoom", 1.0)),
            cutout=entry.get("cutout"),
            always_visible=bool(entry.get("always_visible", False)),
            is_shared=True,
            pause_narration=float(entry.get("pause_narration", 0)),
            attribution=attribution,
            use_audio_channels=entry.get("use_audio_channels", "both"),
            defer_loudnorm=bool(entry.get("defer_loudnorm", False)),
            volume=float(entry.get("volume", 1.0)),
            layer=entry.get("layer", "above"),
            duration=float(raw_duration) if raw_duration is not None else None,
            has_audio=bool(raw_has_audio) if raw_has_audio is not None else None,
            end_on=entry.get("end_on"),
        )

    try:
        with open(local_videos_path, "w", encoding="utf-8") as fh:
            json.dump(local_data, fh, indent=4)
            fh.write("\n")
    except OSError as e:
        print(f" Warning: could not update videos.json: {e}")

    return resolved
+312
View File
@@ -0,0 +1,312 @@
"""Pexels video downloader for gnommo shared_assets.
Configure API key in ~/.gnommo.conf:
[pexels]
api_key = YOUR_KEY_HERE
Get a free key at https://www.pexels.com/api/
"""
import configparser
import json
import re
import sys
import urllib.error
import urllib.request
from pathlib import Path
from typing import Optional
def get_pexels_api_key() -> Optional[str]:
    """Return ``api_key`` from the ``[pexels]`` section of ~/.gnommo.conf.

    Returns None when the file, the section, or the option is missing.
    """
    conf_file = Path.home() / ".gnommo.conf"
    if conf_file.exists():
        parser = configparser.ConfigParser()
        parser.read(conf_file)
        return parser.get("pexels", "api_key", fallback=None)
    return None
def extract_pexels_id(source_file: str) -> Optional[str]:
    """Return the leading run of digits in the file name, i.e. the Pexels video ID.

    Works for names such as 'pexels/11868263-hd_1920_1080_24fps.mp4' and
    'pexels/12136677_1080_1920_30fps.mp4'. Returns None when the file name
    does not start with a digit.
    """
    stem = Path(source_file).stem
    leading_digits = re.match(r"\d+", stem)
    if leading_digits is None:
        return None
    return leading_digits.group(0)
def _fetch_video_info(pexels_id: str, api_key: str) -> Optional[dict]:
    """GET the Pexels video record for ``pexels_id`` and return the decoded JSON.

    Any failure (HTTP error status, network problem, undecodable body) is
    printed and reported as None rather than raised.
    """
    request = urllib.request.Request(
        f"https://api.pexels.com/videos/videos/{pexels_id}",
        headers={"Authorization": api_key, "User-Agent": "Mozilla/5.0 gnommo/1.0"},
    )
    try:
        with urllib.request.urlopen(request, timeout=15) as response:
            payload = response.read()
            return json.loads(payload)
    except urllib.error.HTTPError as http_err:
        print(
            f" [{pexels_id}] Pexels API error {http_err.code} — video may have been deleted",
            flush=True,
        )
    except Exception as err:
        print(f" [{pexels_id}] Pexels API error: {err}", flush=True)
    return None
def description_from_url(video_url: str) -> str:
    """Turn the slug of a Pexels video URL into a title-cased description.

    Example:
        'https://www.pexels.com/video/abstract-television-noise-11868263/'
        gives 'Abstract Television Noise'.

    Returns an empty string when the URL does not end in
    ``/video/<slug>-<digits>`` (with an optional trailing slash).
    """
    slug_match = re.search(r"/video/([a-z0-9][a-z0-9-]+?)-\d+/?$", video_url)
    if slug_match is None:
        return ""
    words = slug_match.group(1).split("-")
    return " ".join(words).title()
def _pick_best_video_file(video_files: list, source_file: str) -> Optional[dict]:
"""Select the video_files entry that best matches the hints in source_file."""
stem = Path(source_file).stem.split("/")[-1]
width_hint = height_hint = fps_hint = quality_hint = None
m = re.search(r"[_-](\d{3,4})[_-](\d{3,4})[_-](\d+)fps", stem)
if m:
width_hint = int(m.group(1))
height_hint = int(m.group(2))
fps_hint = int(m.group(3))
for q in ("uhd", "hd", "sd"):
if q in stem.lower():
quality_hint = q
break
mp4s = [f for f in video_files if f.get("file_type") == "video/mp4"]
if not mp4s:
mp4s = video_files # fall back to any format
def score(vf: dict) -> int:
s = 0
if quality_hint and vf.get("quality", "").lower() == quality_hint:
s += 10
if width_hint and vf.get("width") == width_hint:
s += 5
if height_hint and vf.get("height") == height_hint:
s += 5
if fps_hint and round(float(vf.get("fps") or 0)) == fps_hint:
s += 3
return s
return max(mp4s, key=score)
def download_video(
    source_file: str,
    shared_assets_dir: Path,
    api_key: str,
) -> Optional[dict]:
    """Download one Pexels video to shared_assets_dir/<source_file>.

    The video is streamed to a sibling ``<name>.part`` file in 512 KB chunks
    and renamed into place only after the transfer completes. Memory use is
    therefore independent of the video size, and an interrupted download
    never leaves a truncated file under the final name.

    Args:
        source_file: Path relative to ``shared_assets_dir``; its file name
            must start with the numeric Pexels ID.
        shared_assets_dir: Root directory for shared assets.
        api_key: Pexels API key.

    Returns:
        A metadata dict {description, duration, has_audio=False} on success,
        or None on failure (no ID in the name, API lookup failed, no video
        files offered, or the download itself failed).
    """
    pexels_id = extract_pexels_id(source_file)
    if not pexels_id:
        print(f" Cannot extract Pexels ID from: {source_file}", file=sys.stderr)
        return None

    target_path = shared_assets_dir / source_file
    target_path.parent.mkdir(parents=True, exist_ok=True)

    print(f" [{pexels_id}] Fetching video info...", flush=True)
    info = _fetch_video_info(pexels_id, api_key)
    if not info:
        return None

    description = description_from_url(info.get("url", ""))
    duration = float(info.get("duration") or 0) or None

    video_files = info.get("video_files", [])
    if not video_files:
        print(f" [{pexels_id}] No video files in API response", flush=True)
        return None

    best = _pick_best_video_file(video_files, source_file)
    if not best:
        return None

    download_url = best["link"]
    w, h, fps = best.get("width", "?"), best.get("height", "?"), best.get("fps", "?")
    q = best.get("quality", "?")
    label = f'"{description}"' if description else ""
    print(f" [{pexels_id}] {label}{q} {w}x{h} @ {fps}fps", flush=True)
    print(f"{target_path}", flush=True)

    # Temporary name next to the target so the final rename stays on one
    # filesystem.
    partial_path = target_path.with_name(target_path.name + ".part")
    try:
        req = urllib.request.Request(
            download_url, headers={"User-Agent": "Mozilla/5.0 gnommo/1.0"}
        )
        with urllib.request.urlopen(req, timeout=300) as resp, open(
            partial_path, "wb"
        ) as out:
            total = int(resp.headers.get("Content-Length") or 0)
            downloaded = 0
            chunk_size = 1024 * 512  # 512 KB
            while True:
                chunk = resp.read(chunk_size)
                if not chunk:
                    break
                out.write(chunk)
                downloaded += len(chunk)
                if total:
                    pct = downloaded * 100 // total
                    mb_done = downloaded / 1024 / 1024
                    mb_total = total / 1024 / 1024
                    print(f" {pct:3d}% {mb_done:.1f}/{mb_total:.1f} MB\r", end="", flush=True)
            print(f" Done — {downloaded / 1024 / 1024:.1f} MB ", flush=True)
        partial_path.replace(target_path)
    except Exception as e:
        # Best-effort cleanup of the partial file; the download error is what
        # gets reported.
        try:
            partial_path.unlink(missing_ok=True)
        except OSError:
            pass
        print(f"\n Download failed: {e}", flush=True)
        return None

    return {
        "description": description,
        "duration": duration,
        "has_audio": False,  # conservative; renderer probes when needed
    }
def update_videos_json(
    json_path: Path,
    video_id: str,
    metadata: dict,
) -> None:
    """Merge ``metadata`` into the ``video_id`` entry of an existing videos.json.

    Only truthy metadata values that differ from the stored ones are applied.
    The file is rewritten only if at least one value changed. Nothing happens
    when the file does not exist or has no entry for ``video_id``.
    """
    if not json_path.exists():
        return

    with open(json_path, "r", encoding="utf-8") as src:
        catalogue = json.load(src)

    if video_id not in catalogue:
        return

    entry = catalogue[video_id]
    pending = {
        field: new_value
        for field, new_value in metadata.items()
        if new_value and entry.get(field) != new_value
    }
    if not pending:
        return

    entry.update(pending)
    with open(json_path, "w", encoding="utf-8") as dst:
        json.dump(catalogue, dst, indent=2, ensure_ascii=False)
def fetch_metadata(pexels_id: str, api_key: str) -> Optional[dict]:
    """Look up a Pexels video and return its description and duration (no download).

    Returns None when the API lookup yields nothing. ``duration`` is None
    when the API reports no (or a zero) duration.
    """
    info = _fetch_video_info(pexels_id, api_key)
    if not info:
        return None

    description = description_from_url(info.get("url", ""))
    seconds = float(info.get("duration") or 0)
    return {"description": description, "duration": seconds or None}
def enrich_missing_descriptions(
    shared_assets_dir: Path,
    api_key: str,
) -> int:
    """Fill in missing descriptions for pexels videos that are already on disk.

    Scans shared_assets/videos.json for ``pexels/*`` entries where:
    - description is absent or empty
    - source_file resolves to an existing file (locally or via the cache)

    For each of those the description is fetched from the Pexels API and
    written back through update_videos_json().

    Returns:
        Number of entries for which a description was obtained.
    """
    from .cache import resolve_with_cache

    videos_json = shared_assets_dir / "videos.json"
    if not videos_json.exists():
        return 0

    with open(videos_json, "r", encoding="utf-8") as fh:
        catalogue = json.load(fh)

    # Phase 1: pexels entries that have no description yet.
    undescribed = []
    for vid_id, entry in catalogue.items():
        if vid_id.startswith("pexels/") and not entry.get("description"):
            undescribed.append((vid_id, entry))

    # Phase 2: keep only those whose file exists and whose ID can be parsed.
    project_root = shared_assets_dir.parent
    to_enrich = []
    for vid_id, entry in undescribed:
        rel_file = entry.get("source_file", "")
        if not rel_file:
            continue
        located, _ = resolve_with_cache(shared_assets_dir / rel_file, project_root)
        if not located.exists():
            continue
        pexels_id = extract_pexels_id(rel_file)
        if pexels_id:
            to_enrich.append((vid_id, pexels_id))

    if not to_enrich:
        return 0

    print(f" Enriching descriptions for {len(to_enrich)} existing pexels video(s)...", flush=True)

    updated = 0
    for vid_id, pexels_id in to_enrich:
        meta = fetch_metadata(pexels_id, api_key)
        if not (meta and meta.get("description")):
            print(f" [{pexels_id}] not found or no description — skipped", flush=True)
            continue
        print(f" [{pexels_id}] \"{meta['description']}\"", flush=True)
        update_videos_json(videos_json, vid_id, meta)
        updated += 1
    return updated
def find_missing_pexels_videos(
    manuscript_markers: list[str],
    videos: dict,
    shared_assets_dir: Path,
) -> list[tuple[str, str]]:
    """Return [(video_id, source_file)] for pexels videos referenced but not on disk.

    A marker counts as a video reference when it starts with one of the known
    video prefixes; the text after the prefix is the video ID. Only IDs that
    begin with ``pexels/`` and have an entry in ``videos`` are checked. Each
    ID is examined once, in order of first appearance.
    """
    from .cache import resolve_with_cache

    video_prefixes = (
        "video:", "narration:",
        "vft:", "vfb:", "vfm:",
        "vf2t:", "vf2b:", "vf2m:",
        "vst:", "vsb:", "vsm:",
        "vftp:", "vfbp:", "vfmp:",
        "vf2tp:", "vf2bp:", "vf2mp:",
        "vstp:", "vsbp:", "vsmp:",
    )

    already_checked: set[str] = set()
    missing: list[tuple[str, str]] = []

    for marker in manuscript_markers:
        if not marker.startswith(video_prefixes):
            continue
        # Every prefix ends at its only colon, so the ID is whatever follows
        # the marker's first colon.
        video_id = marker.partition(":")[2]
        if not video_id.startswith("pexels/") or video_id in already_checked:
            continue
        already_checked.add(video_id)

        entry = videos.get(video_id)
        if entry is None:
            continue
        # Entries may be objects exposing .source_file or plain path strings.
        rel_file = getattr(entry, "source_file", entry)

        # resolve_with_cache needs a project_path — use shared_assets parent
        located, _ = resolve_with_cache(
            shared_assets_dir / rel_file, shared_assets_dir.parent
        )
        if not located.exists():
            missing.append((video_id, rel_file))

    return missing
+134 -29
View File
@@ -18,15 +18,23 @@ from .models import (
)
from typing import Union, Optional
def _tc() -> str:
    """Return the FFmpeg thread count as a string, ready to use as a CLI argument.

    The value comes from ``cache.get_ffmpeg_thread_count()``; per the original
    note it reflects ``cpu_limit`` in the ``[performance]`` section of
    ``~/.gnommo.conf`` (not verifiable from this module alone).
    """
    from .cache import get_ffmpeg_thread_count

    thread_count = get_ffmpeg_thread_count()
    return str(thread_count)
# Number of parallel workers for chunk processing
DEFAULT_CHUNK_WORKERS = 4
DEFAULT_CHUNK_WORKERS = 1
# Chunk duration in seconds for parallel filter processing (avoids huge intermediate files)
CHUNK_DURATION = 60
# Resolution presets for preview/proxy workflow
# Each entry: (width, height, subdir_name)
RES_CONFIGS: dict[str, tuple[int, int, str] | None] = {
RES_CONFIGS: dict[str, Optional[tuple]] = {
"full": None, # no downscale, no subdir
"low": (490, 270, "low"),
"tiny": (320, 180, "proxy"), # "proxy" subdir kept for backward compat
@@ -120,8 +128,12 @@ def create_downscaled_video(
"ultrafast",
"-crf",
"28",
"-vsync",
"cfr",
"-c:a",
"copy",
"aac", # re-encode audio so both streams share the same PTS origin,
"-ar", # avoiding the lip-sync drift caused by libx264 encoder delay
"48000", # when audio is copied with its original timestamps
str(out_path),
]
result = subprocess.run(cmd, capture_output=True, text=True)
@@ -302,7 +314,6 @@ def run_ffmpeg_with_progress(cmd, duration, description="Processing"):
while True:
# If process ended and no more output, break
if p.poll() is not None:
# drain any remaining output quickly
while True:
line = p.stdout.readline()
@@ -358,7 +369,9 @@ def run_ffmpeg_with_progress(cmd, duration, description="Processing"):
else:
code = p.returncode
# On macOS/Linux, -9 means SIGKILL (OOM kill by OS), -6 = SIGABRT
signal_hint = " (OOM kill)" if code == -9 else (" (abort)" if code == -6 else "")
signal_hint = (
" (OOM kill)" if code == -9 else (" (abort)" if code == -6 else "")
)
sys.stdout.write(f"\n FFmpeg exited with code {code}{signal_hint}\n")
sys.stdout.flush()
@@ -371,12 +384,19 @@ def _has_audio_stream(video_path: Path) -> bool:
"""Return True if the file has a real (non-ghost) audio stream."""
result = subprocess.run(
[
"ffprobe", "-v", "error",
"-analyzeduration", "0",
"-probesize", "1000000",
"-select_streams", "a:0",
"-show_entries", "stream=index,nb_frames",
"-of", "csv=p=0",
"ffprobe",
"-v",
"error",
"-analyzeduration",
"0",
"-probesize",
"1000000",
"-select_streams",
"a:0",
"-show_entries",
"stream=index,nb_frames",
"-of",
"csv=p=0",
str(video_path),
],
capture_output=True,
@@ -542,6 +562,7 @@ def preprocess_video(
verbose: bool = False,
force: bool = False,
custom_gnommo_scratch: Optional[Path] = None,
res: str = "full",
) -> Path:
"""
Apply preprocessing filters to a video source.
@@ -554,6 +575,7 @@ def preprocess_video(
video_id: ID of the video being processed
video_source: VideoSource with source_file, filter, and output_file
custom_gnommo_scratch: Optional external directory for intermediate files (e.g., SSD)
res: Resolution preset — when not "full", source is downscaled before filtering
Returns:
Path to the final preprocessed output file.
@@ -578,6 +600,18 @@ def preprocess_video(
filter_type=None,
)
# For non-full res, downscale the raw source first so all subsequent
# filters (chroma key, color grade, etc.) operate on the small file.
if res != "full":
cfg = RES_CONFIGS.get(res)
if cfg:
width, height, _ = cfg
print(f" Downscaling source to {width}x{height} ({res})...")
raw_low_dir = gnommo_scratch / f"raw_{res}"
current_input = create_downscaled_video(
current_input, raw_low_dir, width, height, force
)
# Resolve channel setting (auto-detect if needed) and sanity check
channel = video_source.use_audio_channels
if channel == "auto":
@@ -622,6 +656,8 @@ def preprocess_video(
batch_num = 0
for batch in filter_batches:
first_filter_type = batch[0].get("type")
if first_filter_type in VIDEO_FILTER_TYPES:
# Combined video filter batch - use chunked processing for large files
@@ -744,12 +780,18 @@ def apply_combined_video_filters(
# Build FFmpeg command
cmd = ["ffmpeg", "-y"]
# Global options before -i (after -i they become output options and don't limit filter threads)
cmd.extend(["-threads", _tc(), "-filter_threads", _tc()])
if take is not None:
cmd.extend(["-t", str(take)])
cmd.extend(
[
"-probesize",
"50000000",
"-analyzeduration",
"50000000",
"-i",
str(input_path),
"-vf",
@@ -859,7 +901,9 @@ def build_mask_filter(config: dict) -> str:
alpha_expr = "+".join(conditions)
alpha_expr = f"if({alpha_expr},0,alpha(X,Y))"
return f"geq=lum='lum(X,Y)':cb='cb(X,Y)':cr='cr(X,Y)':a='{alpha_expr}'"
# Use r/g/b passthrough so this works in rgba space (as output by gnommokey/color_grade)
# without triggering an rgba→yuv conversion that would spawn 11 more swscaler threads.
return f"geq=r='r(X,Y)':g='g(X,Y)':b='b(X,Y)':a='{alpha_expr}'"
def build_color_grade_filter(config: dict) -> str:
@@ -1023,6 +1067,14 @@ def build_gnommokey_filter(config: dict) -> str:
scale_factor = gain * 2.5
key_expr = f"({key_expr})*{scale_factor:.3f}"
# Shadow boost: amplify key signal for dark pixels so dark greens key out fully.
# shadow_factor = 1 - luma/255 (high for dark pixels, 0 for bright pixels)
# extra multiplier = 1 + shadow_boost * shadow_factor
if cfg.shadow_boost > 0:
luma_expr = f"(0.299*r(X,Y)+0.587*g(X,Y)+0.114*b(X,Y))"
shadow_factor = f"(1-{luma_expr}/255)"
key_expr = f"({key_expr})*(1+{cfg.shadow_boost:.3f}*{shadow_factor})"
# Apply clip_black and clip_white to compress the matte
# clip_black: key values below this become 0 (those pixels stay opaque)
# clip_white: key values above this become 255 (fully transparent)
@@ -1040,6 +1092,13 @@ def build_gnommokey_filter(config: dict) -> str:
# Invert: high key value (green) = low alpha (transparent)
alpha_expr = f"255-{key_expr}"
# Luminance protection: lock bright pixels to fully opaque so white objects
# (headphones, teeth) are never accidentally keyed or jitter.
# protect_luma=-1 disables this. Use ~220 for typical white protection.
if cfg.protect_luma >= 0:
luma_expr = f"(0.299*r(X,Y)+0.587*g(X,Y)+0.114*b(X,Y))"
alpha_expr = f"if(gt({luma_expr},{cfg.protect_luma}),255,{alpha_expr})"
# Build the geq filter for alpha (in RGBA mode)
parts.append(f"geq=r='r(X,Y)':g='g(X,Y)':b='b(X,Y)':a='{alpha_expr}'")
@@ -1112,8 +1171,9 @@ def build_gnommokey_filter(config: dict) -> str:
parts.append(f"alphaextract,avgblur=sizeX={radius}:sizeY={radius}[blur]")
# This gets complex - for now, skip alpha blur and just use erosion
# Ensure output is in a good format
parts.append("format=yuva444p10le")
# Stay in rgba so downstream filters (color_grade, mask) don't trigger
# a redundant yuva444p10le→rgba round-trip and its 11-thread swscaler call.
# The caller (_process_chunk_to_prores4444) appends format=yuva444p10le at the end.
return ",".join(parts)
@@ -1152,6 +1212,8 @@ def parse_gnommokey_config(config: dict) -> GnommoKeyConfig:
despill_bias=despill_bias,
despill_strength=float(config.get("despill_strength", 0.5)),
alpha_bias=alpha_bias,
protect_luma=int(config.get("protect_luma", -1)),
shadow_boost=float(config.get("shadow_boost", 0.0)),
edge_erode=int(config.get("edge_erode", 0)),
edge_soften=float(config.get("edge_soften", 0.0)),
)
@@ -1325,10 +1387,20 @@ def _process_chunk_to_prores4444(
# Build FFmpeg command
cmd: list[str] = ["ffmpeg", "-y"]
# Global thread limits MUST be before the first -i.
# After -i they become output-stream options and FFmpeg ignores them for the
# filter graph — each geq stage then spawns one thread per CPU core (11 on M-series),
# causing the N-way RGBA frame buffer explosion that OOM-kills the process.
cmd.extend(["-threads", _tc(), "-filter_threads", _tc()])
# Seek to start time (before input for fast seeking)
if start_time > 0:
cmd.extend(["-ss", str(start_time)])
# Limit initial file analysis to 50 MB. Without this, FFmpeg scans the entire
# source file when moov is at the end (common for camera recordings), which reads
# gigabytes of data and triggers OOM when multiple chunk workers run in parallel.
cmd.extend(["-probesize", "50000000", "-analyzeduration", "50000000"])
cmd.extend(["-i", str(input_path)])
# Limit duration
@@ -1336,7 +1408,6 @@ def _process_chunk_to_prores4444(
if actual_take is not None:
cmd.extend(["-t", str(actual_take)])
# Video encode: ProRes 4444 with alpha
cmd.extend(
[
"-vf",
@@ -1349,8 +1420,6 @@ def _process_chunk_to_prores4444(
"yuva444p10le", # must carry alpha
"-vendor",
"apl0", # optional; helps some NLEs tag as Apple ProRes
"-movflags",
"+faststart", # optional; makes MOV streamable
]
)
@@ -1378,6 +1447,31 @@ def _process_chunk_to_prores4444(
stderr=result.stderr,
)
# Validate the output file is a readable MOV (moov atom present).
# FFmpeg can return 0 but write a corrupt/incomplete file (e.g. moov atom
# missing) when faststart rewrite fails or disk is under pressure.
probe = subprocess.run(
[
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"csv=p=0",
str(output_path),
],
capture_output=True,
text=True,
)
if probe.returncode != 0 or not probe.stdout.strip():
raise PreprocessError(
f"Chunk output file is unreadable or missing moov atom: {output_path.name}",
filter_type="chunk",
command=" ".join(cmd),
stderr=probe.stderr,
)
def _process_chunk_to_webm(
input_path: Path,
@@ -1627,10 +1721,9 @@ def apply_chroma_key(
# Build FFmpeg command
# ProRes 4444 profile for alpha channel support
cmd = [
"ffmpeg",
"-y", # Overwrite output
]
cmd = ["ffmpeg", "-y"]
# Global options before -i
cmd.extend(["-threads", _tc(), "-filter_threads", _tc()])
# Add duration limit if specified (before input for efficiency)
if take is not None:
@@ -1729,14 +1822,13 @@ def apply_mask(
# Using: if(condition, 0, alpha(X,Y))
alpha_expr = f"if({alpha_expr},0,alpha(X,Y))"
# Build the geq filter - preserve luma, chroma, modify alpha
video_filter = f"geq=lum='lum(X,Y)':cb='cb(X,Y)':cr='cr(X,Y)':a='{alpha_expr}'"
# Build the geq filter - preserve RGB channels, modify alpha
video_filter = f"geq=r='r(X,Y)':g='g(X,Y)':b='b(X,Y)':a='{alpha_expr}'"
# Build FFmpeg command
cmd = [
"ffmpeg",
"-y", # Overwrite output
]
cmd = ["ffmpeg", "-y"]
# Global options before -i
cmd.extend(["-threads", _tc(), "-filter_threads", _tc()])
if take is not None:
cmd.extend(["-t", str(take)])
@@ -1886,7 +1978,12 @@ def apply_audio_normalize(
channel_map -> eq_bands -> highpass -> lowpass -> room_eq -> dereverb -> denoise -> gate -> compress -> normalize
"""
cfg = parse_audio_normalize_config(config)
if not cfg.enabled:
# No audio processing, just copy
import shutil
shutil.copy2(input_path, output_path)
return
# Build audio filter chain (order matters!)
audio_filters: list[str] = []
@@ -2036,6 +2133,7 @@ def parse_audio_normalize_config(config: dict[str, Any]) -> AudioNormalizeConfig
)
return AudioNormalizeConfig(
enabled=bool(config.get("enabled", True)),
# Parametric EQ
eq_bands=eq_bands,
# Room treatment
@@ -2144,6 +2242,7 @@ def stitch_narration_segments(
output_path: Path,
verbose: bool = False,
default_end_trim: float = 0.0,
loudnorm_config: Optional[dict] = None,
) -> Path:
"""
Stitch multiple narration video segments into a single file.
@@ -2334,7 +2433,13 @@ def stitch_narration_segments(
output_path.parent / f"{output_path.stem}_normalized{output_path.suffix}"
)
# Use EBU R128 loudnorm targeting YouTube's recommended levels
# Build loudnorm filter string from project config (or fall back to defaults)
_cfg = loudnorm_config or {}
_lufs = float(_cfg.get("target_lufs", -14))
_lra = float(_cfg.get("target_lra", 11))
_tp = float(_cfg.get("target_tp", -1.5))
loudnorm_filter = f"loudnorm=I={_lufs:.1f}:LRA={_lra:.1f}:TP={_tp:.1f}"
loudnorm_cmd = [
"ffmpeg",
"-y",
@@ -2343,7 +2448,7 @@ def stitch_narration_segments(
"-c:v",
"copy",
"-af",
"loudnorm=I=-14:LRA=11:TP=-1.5",
loudnorm_filter,
"-c:a",
"aac",
"-b:a",
+211 -60
View File
@@ -1,36 +1,19 @@
"""Push project metadata to gnommoweb server.
"""Push project metadata to gnommoeditor (prod) or gnommoweb (local).
Usage:
gnommo push -p video1 # push parent video project
gnommo push -p short_pixelated_universe # push a short project
gnommo push -p myproject --force # force push, overwrite server
Reads project.json and POSTs to POST /api/projects/push.
If project.json contains a "parent_project" field, the project is pushed
as a short and registered under that parent. Otherwise it is pushed as a
parent video project.
Parent project.json "shorts" field is a list of slugs (just an index):
"shorts": ["short_pixelated_universe", "short_planck_length"]
Short project.json has its own full config plus a parent_project field:
{
"id": "short_pixelated_universe",
"parent_project": "Video1",
"resolution": [1080, 1920],
"fps": 30,
"duration_seconds": 60,
...
}
Conflict detection:
- If server.updated_at > our recorded server_updated_at → server has newer changes
→ warn and abort unless --force
Reads project.json and companion JSON files, then POSTs to:
Production: POST /api/ingest (gnommoeditor, uses INGEST_API_KEY)
Local: POST /api/projects/push (gnommoweb, uses GNOMMOWEB_API_KEY)
Configuration (from .env or environment):
GNOMMOWEB_URL Base URL (e.g. http://localhost:3001)
GNOMMOWEB_API_KEY Bearer token (CONTENT_API_KEY from gnommoweb)
GNOMMOEDITOR_URL Base URL for production (e.g. https://editor.glitch.university)
INGEST_API_KEY Bearer token for gnommoeditor ingest endpoint
GNOMMOWEB_URL Base URL for local dev (e.g. http://localhost:3001)
GNOMMOWEB_API_KEY Bearer token for local gnommoweb
"""
import json
@@ -85,43 +68,135 @@ def _write_sync(project_path: Path, data: dict, prod: bool = False):
json.dump(data, f, indent=2)
def _parse_ts(ts_str) -> datetime | None:
if not ts_str:
def _load_json_file(path: Path, label: str, verbose: bool) -> dict | list | None:
"""Load a JSON file, returning None if it doesn't exist."""
if not path.exists():
if verbose:
print(f" {label}: not found at {path}")
return None
try:
return datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
except ValueError:
with open(path) as f:
return json.load(f)
except json.JSONDecodeError as e:
print(f" Warning: could not parse {label} ({path}): {e}", file=sys.stderr)
return None
def _load_text_file(path: Path, label: str) -> str | None:
"""Load a text file, returning None if it doesn't exist."""
if not path.exists():
return None
try:
return path.read_text(encoding="utf-8")
except UnicodeDecodeError:
return path.read_text(encoding="latin-1")
def _parse_seconds(value) -> float | None:
"""Convert a time value like '30s', '1:30', or 30 into a plain float of seconds."""
if value is None:
return None
if isinstance(value, (int, float)):
return float(value)
value = str(value).strip()
if value.endswith("s"):
value = value[:-1]
if ":" in value:
parts = value.split(":")
if len(parts) == 2:
return float(parts[0]) * 60 + float(parts[1])
elif len(parts) == 3:
return float(parts[0]) * 3600 + float(parts[1]) * 60 + float(parts[2])
return float(value)
def _sanitize_time_fields(data: dict | None, fields: list[str]) -> dict | None:
"""Return a copy of dict with the given fields converted to plain floats."""
if not data:
return data
result = dict(data)
for field in fields:
if field in result and result[field] is not None:
try:
result[field] = _parse_seconds(result[field])
except (ValueError, TypeError):
pass # leave invalid values for the server to reject with a clear error
return result
def _build_ingest_payload(project: dict, project_path: Path, verbose: bool) -> dict:
    """Assemble the payload for gnommoeditor ``POST /api/ingest``.

    Companion files are located relative to *project_path*. Their names come
    from ``project`` (keys ``slides``, ``manuscript``, ``narration``,
    ``audio_tracks``, ``videos``) with conventional defaults; ``citations.json``
    is always looked up under its fixed name. Anything that cannot be loaded is
    sent as ``None``.

    Time-valued fields inside the videos, narration and audio mappings are
    normalised through ``_sanitize_time_fields`` so that values such as
    ``"30s"`` or ``"1:30"`` reach the server as plain floats.

    Returns:
        A dict with keys ``project``, ``slides``, ``manuscript``, ``narration``,
        ``audio``, ``videos`` and ``citations``.
    """

    def _companion_json(key: str, default_name: str, label: str):
        # Resolve the file name from project.json, then load it as JSON.
        return _load_json_file(
            project_path / project.get(key, default_name), label, verbose
        )

    def _with_float_times(entries, time_fields):
        # Missing/empty sections pass through untouched.
        if not entries:
            return entries
        return {
            key: _sanitize_time_fields(entry, time_fields)
            for key, entry in entries.items()
        }

    # ── slides ───────────────────────────────────────────────────────────────
    slides = _companion_json("slides", "slides.json", "slides")
    if slides and verbose:
        print(f" slides: {len(slides)} entries")

    # ── manuscript ───────────────────────────────────────────────────────────
    manuscript_path = project_path / project.get("manuscript", "manuscript.txt")
    manuscript = _load_text_file(manuscript_path, "manuscript")
    if manuscript:
        print(f" manuscript: {len(manuscript)} chars")
    elif verbose:
        print(f" manuscript: not found at {manuscript_path}")

    # ── narration / audio / videos / citations ───────────────────────────────
    narration = _companion_json("narration", "narration.json", "narration")
    audio = _companion_json("audio_tracks", "audio.json", "audio")
    videos = _companion_json("videos", "videos.json", "videos")
    citations = _load_json_file(project_path / "citations.json", "citations", verbose)

    # Sanitize time fields — convert "30s", "1:30" etc. to plain floats
    videos = _with_float_times(
        videos, ["duration", "pause_narration", "skip", "take"]
    )
    narration = _with_float_times(narration, ["skip", "take"])
    audio = _with_float_times(audio, ["overlap", "duration"])

    return {
        "project": project,
        "slides": slides,
        "manuscript": manuscript,
        "narration": narration,
        "audio": audio,
        "videos": videos,
        "citations": citations,
    }
def cmd_push(
project_path: Path, verbose: bool = False, force: bool = False, prod: bool = False
) -> int:
_load_env_file()
if prod:
api_url = os.environ.get("GNOMMOWEB_PROD_URL", "").rstrip("/")
api_key = os.environ.get("GNOMMOWEB_PROD_API_KEY", "")
if not api_url:
print("Error: GNOMMOWEB_PROD_URL is not set.", file=sys.stderr)
return 1
if not api_key:
print("Error: GNOMMOWEB_PROD_API_KEY is not set.", file=sys.stderr)
return 1
else:
api_url = os.environ.get("GNOMMOWEB_URL", "").rstrip("/")
api_key = os.environ.get("GNOMMOWEB_API_KEY", "")
if not api_url:
print("Error: GNOMMOWEB_URL is not set.", file=sys.stderr)
return 1
if not api_key:
print("Error: GNOMMOWEB_API_KEY is not set.", file=sys.stderr)
return 1
if verbose:
target = "production" if prod else "local"
print(f"{target}: {api_url}")
project_file = project_path / "project.json"
if not project_file.exists():
print(f"Error: {project_file} not found", file=sys.stderr)
@@ -136,9 +211,90 @@ def cmd_push(
print("Error: project.json must have 'id' and 'name' fields.", file=sys.stderr)
return 1
if prod:
return _push_prod(project, project_path, verbose)
else:
return _push_local(project, project_path, verbose, force)
# ── Production: gnommoeditor POST /api/ingest ─────────────────────────────────
def _push_prod(project: dict, project_path: Path, verbose: bool) -> int:
    """Push a project to production gnommoeditor via ``POST /api/ingest``.

    Configuration is read from the environment: ``GNOMMOEDITOR_URL`` (base URL)
    and ``INGEST_API_KEY`` (bearer token). The payload is built by
    ``_build_ingest_payload``; any existing production sync state is attached
    under ``"sync"``. On success the production sync state is rewritten with a
    fresh ``last_pushed_at`` timestamp (UTC, second precision).

    Args:
        project: Parsed ``project.json``; must contain ``"id"``.
        project_path: Project directory (companion files and sync state).
        verbose: Forwarded to the payload builder for extra logging.

    Returns:
        ``0`` on success, ``1`` on missing configuration, any transport
        failure (connection error, timeout, other ``requests`` error) or a
        non-2xx response.
    """
    api_url = os.environ.get("GNOMMOEDITOR_URL", "").rstrip("/")
    api_key = os.environ.get("INGEST_API_KEY", "")
    if not api_url:
        print("Error: GNOMMOEDITOR_URL is not set.", file=sys.stderr)
        return 1
    if not api_key:
        print("Error: INGEST_API_KEY is not set.", file=sys.stderr)
        return 1

    project_id = project["id"]
    payload = _build_ingest_payload(project, project_path, verbose)

    # Attach sync state so the server can record it
    sync = _read_sync(project_path, prod=True)
    if sync:
        payload["sync"] = sync

    endpoint = f"{api_url}/api/ingest"
    print(endpoint)

    try:
        r = requests.post(
            endpoint,
            json=payload,
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=30,
        )
    except requests.exceptions.ConnectionError:
        # Listed first so connect-phase failures keep their original message
        # (ConnectTimeout is both a ConnectionError and a Timeout).
        print(f"✗ Could not connect to {api_url}")
        return 1
    except requests.exceptions.Timeout:
        # Read timeouts are NOT ConnectionErrors; without this branch the
        # timeout=30 above surfaces as an unhandled traceback.
        print(f"✗ Request to {api_url} timed out")
        return 1
    except requests.exceptions.RequestException as e:
        # Anything else requests can raise (invalid URL, too many redirects, ...).
        print(f"✗ Request to {api_url} failed: {e}")
        return 1

    if not r.ok:
        try:
            body = r.json()
        except ValueError:
            # Error body was not JSON; show a bounded excerpt of the raw text.
            body = r.text[:500]
        print(f"✗ Server returned {r.status_code}: {body}")
        return 1

    # The server already accepted the push (2xx). A malformed or non-object
    # body must not turn that into a crash — fall back to an empty result.
    try:
        result = r.json()
    except ValueError:
        result = {}
    if not isinstance(result, dict):
        result = {}

    video_id = result.get("video_id")
    slides_upserted = result.get("slides_upserted", 0)

    # Update sync state
    now_iso = datetime.now(tz=timezone.utc).isoformat(timespec="seconds")
    existing_sync = _read_sync(project_path, prod=True)
    _write_sync(
        project_path,
        {**existing_sync, "last_pushed_at": now_iso},
        prod=True,
    )

    print(f"{project_id} → video #{video_id} ({slides_upserted} slides)")
    return 0
# ── Local dev: gnommoweb POST /api/projects/push ──────────────────────────────
def _push_local(project: dict, project_path: Path, verbose: bool, force: bool) -> int:
api_url = os.environ.get("GNOMMOWEB_URL", "").rstrip("/")
api_key = os.environ.get("GNOMMOWEB_API_KEY", "")
if not api_url:
print("Error: GNOMMOWEB_URL is not set.", file=sys.stderr)
return 1
if not api_key:
print("Error: GNOMMOWEB_API_KEY is not set.", file=sys.stderr)
return 1
if verbose:
print(f" → local: {api_url}")
project_id = project["id"]
parent_project = project.get("parent_project")
# ── Build payload ─────────────────────────────────────────────────────────
if parent_project:
payload = _build_short_payload(project, project_path, verbose)
else:
@@ -148,7 +304,6 @@ def cmd_push(
kind = "short" if parent_project else "parent video"
print(f"Pushing {project_id} ({kind}) to {api_url}")
# ── POST ──────────────────────────────────────────────────────────────────
try:
r = requests.post(
f"{api_url}/api/projects/push",
@@ -171,9 +326,8 @@ def cmd_push(
result = r.json()
server_updated_at = result.get("server_updated_at")
# ── Write sync state ──────────────────────────────────────────────────────
now_iso = datetime.now(tz=timezone.utc).isoformat(timespec="seconds")
existing_sync = _read_sync(project_path, prod)
existing_sync = _read_sync(project_path, prod=False)
_write_sync(
project_path,
{
@@ -181,10 +335,9 @@ def cmd_push(
"last_pushed_at": now_iso,
"server_updated_at": server_updated_at,
},
prod,
prod=False,
)
# ── Print summary ─────────────────────────────────────────────────────────
asset = result.get("asset", {})
if result.get("type") == "short":
print(f"{project_id} → gn_asset #{asset.get('id')} [{asset.get('status')}]")
@@ -202,7 +355,6 @@ def cmd_push(
def _build_parent_payload(project: dict, project_path: Path, verbose: bool) -> dict:
# Read the manuscript file if one is specified
script_content = None
manuscript_str = project.get("manuscript")
if manuscript_str:
@@ -238,7 +390,6 @@ def _build_parent_payload(project: dict, project_path: Path, verbose: bool) -> d
def _build_short_payload(project: dict, project_path: Path, verbose: bool) -> dict:
# Read the script file if one is specified
script_content = None
script_path_str = project.get("script")
if script_path_str:
+178 -100
View File
@@ -237,8 +237,27 @@ def _resolve_video_path(
source_path = base_dir / video_source.source_file
if project_path:
resolved, _ = resolve_with_cache(source_path, project_path)
return resolved
return source_path
else:
resolved = source_path
if not resolved.exists():
# File not found anywhere — substitute PlaceholderVideo so FFmpeg doesn't crash
placeholder = None
if shared_assets_dir:
p = shared_assets_dir / "PlaceholderVideo.mp4"
if project_path:
p, _ = resolve_with_cache(p, project_path)
if p.exists():
placeholder = p
if placeholder:
import sys
print(
f" Warning: {video_source.source_file} not found — using PlaceholderVideo",
file=sys.stderr,
)
return placeholder
return resolved
def _has_audio_stream(video_path: Path) -> bool:
@@ -303,6 +322,14 @@ def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
"""Build the complete FFmpeg command as a list of arguments."""
cmd = ["ffmpeg", "-y"] # -y to overwrite output
# Global thread limits before any -i. Without this, each format=rgba conversion
# in the filter graph (one per video layer) spawns one swscaler thread per CPU core,
# causing OOM on Apple Silicon where av_cpu_count() returns 10-11.
from .cache import get_ffmpeg_thread_count
_tc = str(get_ffmpeg_thread_count())
cmd.extend(["-threads", _tc, "-filter_threads", _tc])
# Resolve paths to absolute
project_path = plan.project_path.resolve()
output_path = output_path.resolve()
@@ -354,6 +381,7 @@ def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
f"Background handle '{bg_handle}' not found in shared_assets/videos.json"
)
bg_path = shared_assets_dir / bg_videos[bg_handle]["source_file"]
bg_path, _ = resolve_with_cache(bg_path, plan.project_path)
if not bg_path.exists():
raise RenderError(
f"Background file not found: {bg_path} (from handle '{bg_handle}')"
@@ -395,13 +423,30 @@ def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
video_path = _resolve_video_path(
videos_dir, event.video_source, shared_assets_dir, project_path
)
skip = event.video_source.skip
skip = event.video_source.skip or 0.0
# How long this clip needs to play in the output
clip_duration = event.end_time - event.start_time
if event.video_source.take is not None:
clip_duration = min(clip_duration, event.video_source.take)
# Loop the clip if the file is shorter than the display window.
# Don't loop pause-narration videos — they intentionally play once and stop.
needs_loop = False
if event.video_source.duration is not None and not event.video_source.pause_narration:
remaining = event.video_source.duration - skip
needs_loop = remaining < clip_duration - 0.1 # 0.1 s tolerance
if needs_loop:
cmd.extend(["-stream_loop", "-1"])
if skip > 0:
cmd.extend(["-ss", f"{skip:.3f}"])
cmd.extend(["-analyzeduration", "0", "-probesize", "1000"])
# Use pre-probed duration to tell FFmpeg exactly how much to read,
# preventing scans of ghost audio tracks on empty MP4 audio streams.
if event.video_source.duration is not None:
# Use pre-probed duration (or loop-limited duration) to tell FFmpeg exactly
# how much to read, preventing scans of ghost audio tracks on empty streams.
if needs_loop:
cmd.extend(["-t", f"{clip_duration:.3f}"])
elif event.video_source.duration is not None:
remaining = event.video_source.duration - skip
if remaining > 0:
cmd.extend(["-t", f"{remaining:.3f}"])
@@ -410,7 +455,9 @@ def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
input_idx += 1
has_audio = event.video_source.has_audio
if has_audio is None:
print(f" Warning: no cached metadata for '{event.video_source.source_file}' — run 'gnommo import' to avoid slow probing")
print(
f" Warning: no cached metadata for '{event.video_source.source_file}' — run 'gnommo import' to avoid slow probing"
)
has_audio = _has_audio_stream(video_path)
if has_audio:
video_events_with_audio.add(i)
@@ -423,7 +470,7 @@ def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
video_path = _resolve_video_path(
videos_dir, event.video_source, shared_assets_dir, project_path
)
skip = event.video_source.skip
skip = event.video_source.skip or 0.0
if skip > 0:
cmd.extend(["-ss", f"{skip:.3f}"])
cmd.extend(["-analyzeduration", "0", "-probesize", "1000"])
@@ -436,7 +483,9 @@ def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
input_idx += 1
has_audio = event.video_source.has_audio
if has_audio is None:
print(f" Warning: no cached metadata for '{event.video_source.source_file}' — run 'gnommo import' to avoid slow probing")
print(
f" Warning: no cached metadata for '{event.video_source.source_file}' — run 'gnommo import' to avoid slow probing"
)
has_audio = _has_audio_stream(video_path)
if has_audio:
outro_events_with_audio.add(i)
@@ -451,7 +500,12 @@ def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
for event in plan.audio_events:
if event.audio_id not in audio_inputs:
audio_path = audio_dir / event.audio_def.file
if event.audio_def.is_shared and plan.shared_assets_dir:
audio_path = (
plan.shared_assets_dir / "media" / "audio" / event.audio_def.file
)
else:
audio_path = audio_dir / event.audio_def.file
audio_path, _ = resolve_with_cache(audio_path, project_path)
# Use pre-probed duration from audio.json if available (set by import).
# For MP3 without Xing/VBRI headers this is critical — FFmpeg otherwise
@@ -468,7 +522,8 @@ def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
# Cache duration for crossfade loop filter
if event.audio_def.loop and event.audio_def.overlap:
audio_durations[event.audio_id] = (
file_duration if file_duration is not None
file_duration
if file_duration is not None
else _get_audio_duration(audio_path)
)
@@ -797,13 +852,14 @@ def build_filter_complex(
"""
Build the filter_complex string for FFmpeg.
Layer structure:
Layer structure (bottom to top):
- Layer 1: Background (solid color, image, or video)
- Layer 2: Always visible videos (like talking head) in cutouts
- Layer 3: Slides (with time-based enable)
- Layer 4: Triggered videos in cutouts (with time-based enable)
- Layer 5: Camera transform
- Layer 6: Outro videos (fullscreen, after narration ends)
- Layer 2: "below" triggered videos (vfb/vf2b/vsb) — behind slides, use with slide on top to mask
- Layer 3: Slides (transparent in talking-head cutout area)
- Layer 4: Always visible videos (talking head) — above slides, visible through cutout
- Layer 5: "above" triggered videos (vft/vf2t/vst) — topmost, covers everything including talking head
- Layer 6: Camera transform
- Layer 7: Outro videos (fullscreen, after narration ends)
- Audio: Main audio mixed with triggered sound effects and outro audio
"""
outro_inputs = outro_inputs or {}
@@ -830,70 +886,7 @@ def build_filter_complex(
current_label = "bg"
# Overlay always_visible videos (like talking head)
# If there are narration pauses, we need to segment the video
for i, (video_id, video_source, cutout) in enumerate(plan.narration_videos):
input_idx = always_visible_inputs[i]
cut_x, cut_y, cut_width, cut_height = _calculate_cutout_position(
cutout, width, height
)
# Apply zoom factor to cutout dimensions
zoom = video_source.zoom
zoomed_width = int(cut_width * zoom)
zoomed_height = int(cut_height * zoom)
if not plan.narration_pauses:
# Simple case: no pauses, continuous overlay
# fps+setpts normalise the source to a constant frame rate and reset
# the timeline to 0 so the video stays locked to the audio track.
video_label = f"av{i}"
filters.append(
f"[{input_idx}:v]fps={plan.config.fps},setpts=PTS-STARTPTS,"
f"format=yuva444p10le,"
f"scale={zoomed_width}:{zoomed_height}:force_original_aspect_ratio=increase,"
f"crop={cut_width}:{cut_height}:(iw-{cut_width})/2:(ih-{cut_height})/2,"
f"format=rgba[{video_label}]"
)
next_label = f"avbase{i}"
filters.append(
f"[{current_label}][{video_label}]overlay=x={cut_x}:y={cut_y}[{next_label}]"
)
current_label = next_label
else:
# Complex case: narration pauses - segment the video
# Each segment is trimmed from source and positioned in output timeline
segments = _build_narration_segments(
plan.narration_pauses, plan.total_duration
)
for seg_idx, (src_start, src_end, out_start, out_end) in enumerate(
segments
):
seg_label = f"av{i}_seg{seg_idx}"
# Trim to source range, then shift PTS to output position
# setpts=PTS-STARTPTS puts segment at 0, then +offset/TB shifts to output time
pts_offset = out_start
filters.append(
f"[{input_idx}:v]trim={src_start:.3f}:{src_end:.3f},"
f"setpts=PTS-STARTPTS+{pts_offset:.3f}/TB,"
f"format=yuva444p10le,"
f"scale={zoomed_width}:{zoomed_height}:force_original_aspect_ratio=increase,"
f"crop={cut_width}:{cut_height}:(iw-{cut_width})/2:(ih-{cut_height})/2,"
f"format=rgba[{seg_label}]"
)
# Overlay with enable for this segment's output time range
next_label = f"avbase{i}_seg{seg_idx}"
enable_expr = f"between(t\\,{out_start:.3f}\\,{out_end:.3f})"
filters.append(
f"[{current_label}][{seg_label}]overlay=x={cut_x}:y={cut_y}:"
f"enable={enable_expr}[{next_label}]"
)
current_label = next_label
# Add "below-slides" triggered video overlays (vfb/vsb or layer="below")
# Layer 2: "below" triggered video overlays (vfb/vsb) — behind slides and talking head
for i, event in enumerate(plan.video_events):
if event.layer != "below":
continue
@@ -925,23 +918,112 @@ def build_filter_complex(
enable_expr = f"between(t\\,{event.start_time:.3f}\\,{effective_end:.3f})"
filters.append(
f"[{current_label}][{video_label}]overlay="
f"x={cut_x}:y={cut_y}:enable={enable_expr}"
f"x={cut_x}:y={cut_y}:enable={enable_expr}:eof_action=pass"
f"[{next_label}]"
)
current_label = next_label
# Add slide overlays with time-based enable
# Layer 3: Talking head — above below-videos, but under slides so fullscreen slides cover it
for i, (video_id, video_source, cutout) in enumerate(plan.narration_videos):
input_idx = always_visible_inputs[i]
cut_x, cut_y, cut_width, cut_height = _calculate_cutout_position(
cutout, width, height
)
zoom = video_source.zoom
zoomed_width = int(cut_width * zoom)
zoomed_height = int(cut_height * zoom)
if not plan.narration_pauses:
video_label = f"av{i}"
filters.append(
f"[{input_idx}:v]fps={plan.config.fps},setpts=PTS-STARTPTS,"
f"format=yuva444p10le,"
f"scale={zoomed_width}:{zoomed_height}:force_original_aspect_ratio=increase,"
f"crop={cut_width}:{cut_height}:(iw-{cut_width})/2:(ih-{cut_height})/2,"
f"format=rgba[{video_label}]"
)
next_label = f"avbase{i}"
filters.append(
f"[{current_label}][{video_label}]overlay=x={cut_x}:y={cut_y}[{next_label}]"
)
current_label = next_label
else:
segments = _build_narration_segments(
plan.narration_pauses, plan.total_duration
)
for seg_idx, (src_start, src_end, out_start, out_end) in enumerate(
segments
):
seg_label = f"av{i}_seg{seg_idx}"
pts_offset = out_start
filters.append(
f"[{input_idx}:v]trim={src_start:.3f}:{src_end:.3f},"
f"setpts=PTS-STARTPTS+{pts_offset:.3f}/TB,"
f"format=yuva444p10le,"
f"scale={zoomed_width}:{zoomed_height}:force_original_aspect_ratio=increase,"
f"crop={cut_width}:{cut_height}:(iw-{cut_width})/2:(ih-{cut_height})/2,"
f"format=rgba[{seg_label}]"
)
next_label = f"avbase{i}_seg{seg_idx}"
enable_expr = f"between(t\\,{out_start:.3f}\\,{out_end:.3f})"
filters.append(
f"[{current_label}][{seg_label}]overlay=x={cut_x}:y={cut_y}:"
f"enable={enable_expr}[{next_label}]"
)
current_label = next_label
# Layer 4: "mid" triggered videos (vfm/vsm) — above talking head, below slides
# Use case: content that should show through a slide's transparent "screen hole"
for i, event in enumerate(plan.video_events):
if event.layer != "mid":
continue
video_idx = video_inputs[i]
cut_x, cut_y, cut_width, cut_height = _calculate_cutout_position(
event.cutout, width, height
)
duration = event.end_time - event.start_time
if event.video_source.take is not None:
duration = min(duration, event.video_source.take)
effective_end = event.start_time + duration
zoom = event.video_source.zoom
zoomed_width = int(cut_width * zoom)
zoomed_height = int(cut_height * zoom)
video_label = f"tvm{i}"
start_pts = event.start_time
filters.append(
f"[{video_idx}:v]format=yuva444p10le,"
f"setpts=PTS-STARTPTS+{start_pts:.3f}/TB,"
f"scale={zoomed_width}:{zoomed_height}:force_original_aspect_ratio=increase,"
f"crop={cut_width}:{cut_height}:(iw-{cut_width})/2:(ih-{cut_height})/2,"
f"format=rgba[{video_label}]"
)
next_label = f"tvmbase{i}"
enable_expr = f"between(t\\,{event.start_time:.3f}\\,{effective_end:.3f})"
filters.append(
f"[{current_label}][{video_label}]overlay="
f"x={cut_x}:y={cut_y}:enable={enable_expr}:eof_action=pass"
f"[{next_label}]"
)
current_label = next_label
# Layer 5: Slides — on top of talking head so fullscreen slides cover the narrator
for i, event in enumerate(plan.slide_events):
slide_idx = slide_inputs[event.slide_id]
# Scale slide to full frame size (transparent areas show through)
slide_label = f"s{i}"
filters.append(
f"[{slide_idx}:v]scale={width}:{height}:"
f"force_original_aspect_ratio=decrease,pad={width}:{height}:(ow-iw)/2:(oh-ih)/2:color=0x00000000[{slide_label}]"
)
# Overlay at 0,0 (full frame) with time-based enable
next_label = f"sbase{i}"
enable_expr = f"between(t\\,{event.start_time:.3f}\\,{event.end_time:.3f})"
filters.append(
@@ -949,10 +1031,10 @@ def build_filter_complex(
f"x=0:y=0:enable={enable_expr}"
f"[{next_label}]"
)
current_label = next_label
# Add "above-slides" triggered video overlays (vft/vst or layer="above")
# Layer 6: "above" triggered videos (vft/vf2t/vst) — topmost, covers slides and talking head
# Use case: fullscreen video that intentionally masks the narrator
for i, event in enumerate(plan.video_events):
if event.layer != "above":
continue
@@ -961,22 +1043,15 @@ def build_filter_complex(
event.cutout, width, height
)
# Calculate effective end time (respecting 'take' parameter)
duration = event.end_time - event.start_time
if event.video_source.take is not None:
duration = min(duration, event.video_source.take)
effective_end = event.start_time + duration
# Apply zoom factor to cutout dimensions
zoom = event.video_source.zoom
zoomed_width = int(cut_width * zoom)
zoomed_height = int(cut_height * zoom)
# Scale to cover the zoomed area (like CSS object-fit: cover)
# Then crop to cutout dimensions (centered)
# Use setpts to sync video start with overlay enable time
# IMPORTANT: convert to rgba FIRST (before scale/crop) so the alpha channel
# is preserved throughout. scale in yuva444p10le can silently strip alpha.
video_label = f"tv{i}"
start_pts = event.start_time
filters.append(
@@ -987,16 +1062,13 @@ def build_filter_complex(
f"[{video_label}]"
)
# Overlay with time-based enable; format=auto lets FFmpeg pick the right
# compositing format so the RGBA alpha channel is respected.
next_label = f"tvbase{i}"
enable_expr = f"between(t\\,{event.start_time:.3f}\\,{effective_end:.3f})"
filters.append(
f"[{current_label}][{video_label}]overlay="
f"x={cut_x}:y={cut_y}:enable={enable_expr}:format=auto"
f"x={cut_x}:y={cut_y}:enable={enable_expr}:format=auto:eof_action=pass"
f"[{next_label}]"
)
current_label = next_label
# Scene composition complete - now apply camera transform
@@ -1262,10 +1334,13 @@ def build_filter_complex(
delay_ms = int(event.start_time * 1000)
label = f"tvaud{i}"
vol = event.video_source.volume
vol_filter = f",volume={vol:.2f}" if vol != 1.0 else ""
filters.append(
f"[{video_idx}:a]atrim=0:{duration:.3f},"
f"asetpts=PTS-STARTPTS,"
f"adelay={delay_ms}|{delay_ms}[{label}]"
f"adelay={delay_ms}|{delay_ms}"
f"{vol_filter}[{label}]"
)
audio_labels_to_mix.append(f"[{label}]")
@@ -1281,10 +1356,13 @@ def build_filter_complex(
delay_ms = int(event.start_time * 1000)
label = f"outroaud{i}"
vol = event.video_source.volume
vol_filter = f",volume={vol:.2f}" if vol != 1.0 else ""
filters.append(
f"[{video_idx}:a]atrim=0:{duration:.3f},"
f"asetpts=PTS-STARTPTS,"
f"adelay={delay_ms}|{delay_ms}[{label}]"
f"adelay={delay_ms}|{delay_ms}"
f"{vol_filter}[{label}]"
)
audio_labels_to_mix.append(f"[{label}]")
+379 -179
View File
@@ -22,12 +22,38 @@ from .models import (
VideoEvent,
VideoSource,
)
from .parser import get_video_duration
from .parser import get_video_duration, resolve_missing_videos
from .transcriber import TranscribedWord
# Audio trigger offset: play sound this many seconds before the marker
AUDIO_OFFSET_SECONDS = 1.0
# Shorthand marker prefix → (cutout_name, layer).
# These are the ETL source-of-truth: when a manuscript contains [vft:X],
# that projects cutout="fullscreen" and layer="above" into videos.json for X.
# The pause-variant entries (vftp: etc.) map to the same (cutout, layer) pair as
# their non-pause counterparts. "pause_narration" is a per-event property: it is
# NOT a third tuple element here and is not stored in videos.json.
_SHORTHAND_PREFIXES: dict[str, tuple[str, str]] = {
    "vft:": ("fullscreen", "above"),
    "vfb:": ("fullscreen", "below"),
    "vfm:": ("fullscreen", "mid"),
    "vf2t:": ("fullscreen2", "above"),
    "vf2b:": ("fullscreen2", "below"),
    "vf2m:": ("fullscreen2", "mid"),
    "vst:": ("square", "above"),
    "vsb:": ("square", "below"),
    "vsm:": ("square", "mid"),
    "vftp:": ("fullscreen", "above"),
    "vfbp:": ("fullscreen", "below"),
    "vfmp:": ("fullscreen", "mid"),
    "vf2tp:": ("fullscreen2", "above"),
    "vf2bp:": ("fullscreen2", "below"),
    "vf2mp:": ("fullscreen2", "mid"),
    "vstp:": ("square", "above"),
    "vsbp:": ("square", "below"),
    "vsmp:": ("square", "mid"),
}
@dataclass
class MarkerTiming:
@@ -134,7 +160,16 @@ def _is_known_marker(
return True
# Video/narration triggers (all supported prefixes)
_VIDEO_PREFIXES = ("video:", "narration:", "vft:", "vfb:", "vst:", "vsb:", "vftp:", "vfbp:", "vstp:", "vsbp:")
_VIDEO_PREFIXES = (
"video:",
"narration:",
"vft:", "vfb:", "vfm:",
"vf2t:", "vf2b:", "vf2m:",
"vst:", "vsb:", "vsm:",
"vftp:", "vfbp:", "vfmp:",
"vf2tp:", "vf2bp:", "vf2mp:",
"vstp:", "vsbp:", "vsmp:",
)
if any(marker_id.startswith(p) for p in _VIDEO_PREFIXES):
return True
@@ -142,11 +177,15 @@ def _is_known_marker(
if marker_id in CAMERA_PRESETS:
return True
# Audio markers (A followed by id)
# Audio markers (A followed by id, e.g., Awoosh) or audio: prefix (e.g., audio:woosh)
if marker_id.startswith("A") and len(marker_id) > 1:
audio_id = marker_id[1:]
if audio_id in audio or audio_id.isdigit():
return True
if marker_id.startswith("audio:") and audio is not None:
audio_id = marker_id[6:]
if audio_id in audio:
return True
return False
@@ -167,78 +206,81 @@ def _extract_marker_contexts(
slides: dict = None,
videos: dict = None,
audio: dict = None,
) -> list[tuple[str, str]]:
) -> list[tuple[str, str, bool, str]]:
"""
Extract known markers and the text immediately following them from manuscript.
Unknown markers are filtered out and stripped from following text.
Note: [cite:...] markers are already stripped at parse time.
Returns list of (marker_id, following_text) tuples for known markers only.
Returns list of (marker_id, anchor_text, is_borrowed, anchor_type) tuples.
anchor_type is "before" (default — place before the matched phrase) or
"after" (place at the end of the matched phrase — used for markers that
trail a narration block and have no following text of their own).
"""
slides = slides or {}
videos = videos or {}
audio = audio or {}
# Split by markers, keeping the markers — broad pattern handles any content
# including paths with / and - (e.g. [vfb:pexels/7670835-uhd_3840_2160_30fps])
parts = re.split(r"\[([^\]]+)\]", manuscript_text)
# parts: [text_before, marker1, text_after1, marker2, text_after2, ...]
raw_contexts = []
for i in range(1, len(parts), 2):
marker_id = parts[i]
# Skip unknown markers entirely
if not _is_known_marker(marker_id, slides, videos, audio):
continue
# Collect all following text, looking past unknown markers until the
# next known marker. This handles [S1][segment:1] text... where the
# text lives two parts ahead rather than immediately after S1.
text_pieces = []
j = i + 1
while j < len(parts):
chunk = parts[j].strip()
if chunk:
text_pieces.append(chunk)
j += 1 # advance to the marker after this text chunk
j += 1
if j >= len(parts):
break
if _is_known_marker(parts[j], slides, videos, audio):
break # stop at the next known marker
j += 1 # skip the unknown marker; its following text is next
break
j += 1
following_text = " ".join(text_pieces)
following_text = " ".join(following_text.split()) # collapse whitespace
following_text = " ".join(following_text.split())
following_text = _strip_unknown_markers(following_text, slides, videos, audio)
following_text = " ".join(following_text.split())
raw_contexts.append((marker_id, following_text))
# For markers with no following text (consecutive markers), look ahead
# Return (marker_id, following_text, is_borrowed) - is_borrowed=True means text came from look-ahead
contexts = []
for i, (marker_id, following_text) in enumerate(raw_contexts):
if following_text:
# Take first ~10 words for matching
words = following_text.split()[:10]
contexts.append((marker_id, " ".join(words), False))
contexts.append((marker_id, " ".join(words), False, "before"))
else:
# Look ahead for next marker with text, but never borrow from another
# slide marker — slides must align independently to avoid two consecutive
# slides matching the same transcription position simultaneously.
borrowed = False
for j in range(i + 1, len(raw_contexts)):
next_marker_id, next_text = raw_contexts[j]
if next_text:
if next_marker_id in (slides or {}):
break # Slide owns this text; give up borrowing
break
words = next_text.split()[:10]
contexts.append((marker_id, " ".join(words), True)) # Borrowed
contexts.append((marker_id, " ".join(words), True, "before"))
borrowed = True
break
if not borrowed:
contexts.append((marker_id, "", False))
# No following text and blocked by a slide boundary — look
# backward for the tail of the preceding narration block and
# anchor to the END of those words instead of extrapolating.
preceding_text = ""
for k in range(i - 1, -1, -1):
if raw_contexts[k][1]:
preceding_text = raw_contexts[k][1]
break
if preceding_text:
words = preceding_text.split()
tail = " ".join(words[-6:])
contexts.append((marker_id, tail, False, "after"))
else:
contexts.append((marker_id, "", False, "before"))
return contexts
@@ -248,13 +290,18 @@ def _fuzzy_match_ratio(
transcription: list[TranscribedWord],
start_idx: int,
window_size: int = 10,
pre_filler: int = 30,
inter_filler: int = 3,
) -> tuple[float, int, int]:
"""
Calculate how many words from phrase match the transcription at start_idx.
Words are matched sequentially: each phrase word must appear at or after
the position of the previous match. This prevents false matches where
phrase words appear out of order or far into the window.
Words are matched sequentially. Two separate filler tolerances:
- pre_filler: max words before the FIRST phrase word (absorbs ad-libs)
- inter_filler: max words between consecutive phrase words (keeps the
match tight so common words don't stretch the window far
into later text, which would push last_idx past subsequent
markers' positions)
Returns (ratio, first_match_offset, last_match_end_offset) where offsets
are relative to start_idx. last_match_end_offset points past the last
@@ -263,40 +310,44 @@ def _fuzzy_match_ratio(
if not phrase_words:
return 0.0, 0, 0
words_to_check = min(len(phrase_words), window_size)
transcript_end = min(
start_idx + words_to_check + 5, len(transcription)
) # +5 for flexibility (speaker may add filler words)
if start_idx >= len(transcription):
return 0.0, 0, 0
words_to_check = min(len(phrase_words), window_size)
# Window only needs to cover pre_filler + phrase words + inter_filler slack
transcript_end = min(
start_idx + pre_filler + words_to_check + inter_filler, len(transcription)
)
transcript_words = [
_normalize_token(transcription[j].word)
for j in range(start_idx, transcript_end)
]
# Match phrase words sequentially against transcript window
matches = 0
words_checked = 0
t_pos = 0 # Current search position in transcript window
t_pos = 0
first_match_offset = 0
last_match_end_offset = 0
for phrase_word in phrase_words[:words_to_check]:
normalized = _normalize_token(phrase_word)
if len(normalized) < 2:
continue # skip very short words (a, I, etc.) - don't count them
continue
words_checked += 1
# Search forward from current position (preserves word order)
for j in range(t_pos, len(transcript_words)):
# First phrase word may be preceded by a long ad-lib; subsequent words
# should appear within a few positions of each other.
if matches == 0:
search_end = min(t_pos + pre_filler + 1, len(transcript_words))
else:
search_end = min(t_pos + inter_filler + 1, len(transcript_words))
for j in range(t_pos, search_end):
t_word = transcript_words[j]
matched = False
# Exact match
if normalized == t_word:
matched = True
# Allow substring match for words 4+ chars (handles plurals, tenses)
elif len(normalized) >= 4 and len(t_word) >= 4:
if normalized in t_word or t_word in normalized:
matched = True
@@ -306,7 +357,7 @@ def _fuzzy_match_ratio(
first_match_offset = j
matches += 1
last_match_end_offset = j + 1
t_pos = j + 1 # Next word must appear after this one
t_pos = j + 1
break
ratio = matches / words_checked if words_checked > 0 else 0.0
@@ -326,11 +377,6 @@ def _find_phrase_timestamp(
(-1, -1.0, 0.0, -1) if not found. word_index points to the first
matched word. match_end_idx points past the last matched word.
"""
# Normalize each word individually — same method as transcript tokens.
# This keeps contractions as single tokens ("haven't" stays "haven't") so
# phrase and transcript word counts stay in sync. Using _normalize_text on
# the whole phrase would expand "haven't" → "have not" (2 words), creating
# a phantom "not" that fails to match the transcript and corrupts the window.
phrase_words = [tok for tok in (_normalize_token(w) for w in phrase.split()) if tok]
if not phrase_words:
@@ -341,7 +387,6 @@ def _find_phrase_timestamp(
best_first_offset = 0
best_end_offset = 0
# Slide through transcription looking for best match
for i in range(start_from, len(transcription)):
ratio, first_offset, end_offset = _fuzzy_match_ratio(
phrase_words, transcription, i
@@ -352,13 +397,14 @@ def _find_phrase_timestamp(
best_first_offset = first_offset
best_end_offset = end_offset
# If we found a very good match, stop early
if ratio >= 0.95:
# Sequential alignment: stop at the first position that clears the
# threshold. Continuing to scan the full transcript risks jumping
# to a higher-ratio match much later and skipping over subsequent
# markers' positions entirely.
if best_ratio >= fuzzy_threshold:
break
if best_ratio >= fuzzy_threshold and best_idx >= 0:
# Use the actual first matched word position for the timestamp,
# not the window start position
actual_idx = best_idx + best_first_offset
match_end_idx = best_idx + best_end_offset
return actual_idx, transcription[actual_idx].start, best_ratio, match_end_idx
@@ -375,21 +421,25 @@ def align_markers_to_transcription(
fuzzy_threshold: float = 0.6,
) -> list[MarkerTiming]:
"""
Align manuscript markers to transcription timestamps using fuzzy matching.
Align manuscript markers to transcription timestamps using fuzzy phrase matching.
This is the core alignment function that matches markers in manuscript.txt
to their corresponding timecodes in the whisper transcription.
For each known marker, extracts the text immediately following it in the
manuscript and searches for that phrase in the Whisper transcript. Markers are
matched in manuscript order, each starting its search after the previous match.
Unknown markers are filtered out - they aren't pronounced and shouldn't
be in the render plan. Note: [cite:...] markers are stripped at parse time.
The filler-word window before the first cue word is intentionally large (up to
30 words) so that ad-libs spoken ahead of the manuscript text do not prevent a
match; gaps between consecutive cue words are kept much tighter.
Unknown markers are filtered out — they aren't pronounced and shouldn't be in
the render plan. Note: [cite:...] markers are stripped at parse time.
Args:
manuscript_text: Full manuscript with [S1], [video:xxx], etc.
transcription: Word-level timestamps from whisper
slides: Slide definitions (to identify valid slide markers)
videos: Video definitions (to identify valid video markers)
audio: Audio definitions (to identify valid audio markers)
fuzzy_threshold: Minimum match ratio (default 0.6 = 60% of words)
transcription: Word-level timestamps from Whisper
slides: Slide definitions (to identify valid slide markers)
videos: Video definitions (to identify valid video markers)
audio: Audio definitions (to identify valid audio markers)
fuzzy_threshold: Minimum match ratio (default 0.6 = 60% of words must match)
Returns:
List of MarkerTiming with timestamps and confidence (known markers only)
@@ -398,13 +448,10 @@ def align_markers_to_transcription(
timings: list[MarkerTiming] = []
last_idx = 0
last_end_time = 0.0 # Track end time of last matched phrase
last_end_time = 0.0
for marker_id, following_text, is_borrowed in contexts:
# If no text (empty context), place 1 second after the previous marker/phrase
# This handles markers like [video:xxx] that appear after text
if not following_text.strip():
# Use 1 second after the previous end time
for marker_id, anchor_text, is_borrowed, anchor_type in contexts:
if not anchor_text.strip():
marker_time = last_end_time + 1.0
timings.append(
MarkerTiming(
@@ -414,48 +461,169 @@ def align_markers_to_transcription(
confidence=1.0,
)
)
# Update last_end_time so subsequent markers without text continue to offset
last_end_time = marker_time
continue
idx, timestamp, confidence, match_end_idx = _find_phrase_timestamp(
following_text,
anchor_text,
transcription,
start_from=last_idx,
fuzzy_threshold=fuzzy_threshold,
)
if idx >= 0:
# Apply offset: marker should appear slightly before the words
adjusted_time = max(0.0, timestamp - 0.5)
timings.append(
MarkerTiming(
marker_id=marker_id,
timestamp=adjusted_time,
context=following_text[:50],
confidence=confidence,
if anchor_type == "after":
# Marker trails a narration block — place it at the END of the
# matched phrase (when those words finish being spoken).
end_idx = min(match_end_idx - 1, len(transcription) - 1)
marker_time = transcription[end_idx].end if transcription else 0.0
timings.append(
MarkerTiming(
marker_id=marker_id,
timestamp=marker_time,
context=f"(end of: {anchor_text[:40]})",
confidence=confidence,
)
)
)
# Only advance last_idx if this marker owns its text (not borrowed)
# If borrowed, the next marker needs to match the same text
if not is_borrowed:
last_idx = match_end_idx
# Calculate end time of this phrase for markers with no text
if last_idx > 0 and last_idx <= len(transcription):
last_end_time = transcription[last_idx - 1].end
else:
last_end_time = transcription[-1].end if transcription else 0.0
last_end_time = marker_time
else:
adjusted_time = max(0.0, timestamp - 0.5)
timings.append(
MarkerTiming(
marker_id=marker_id,
timestamp=adjusted_time,
context=anchor_text[:50],
confidence=confidence,
)
)
if not is_borrowed:
last_idx = match_end_idx
if last_idx > 0 and last_idx <= len(transcription):
last_end_time = transcription[last_idx - 1].end
else:
last_end_time = transcription[-1].end if transcription else 0.0
else:
timings.append(
MarkerTiming(
marker_id=marker_id,
timestamp=-1.0,
context=following_text[:50],
context=anchor_text[:50],
confidence=0.0,
)
)
return timings
# Repair pass: retry INTERPOLATED markers that the forward scan missed.
# Root cause of cascade failures: one bad match advances last_idx past
# the true positions of several subsequent markers. Fix: search in a
# bounded window [prev_marker_time - 1s, next_marker_time + 2s] so we
# avoid false early matches while still recovering from cascade failures.
if any(t.timestamp < 0 for t in timings):
for i, timing in enumerate(timings):
if timing.timestamp >= 0:
continue
marker_id, anchor_text, is_borrowed, anchor_type = contexts[i]
if not anchor_text.strip():
continue
# Lower bound: previous matched marker's timestamp → word index.
# Repairs processed in order, so already-repaired markers count too.
prev_time = 0.0
for j in range(i - 1, -1, -1):
if timings[j].timestamp >= 0:
prev_time = max(0.0, timings[j].timestamp - 1.0)
break
win_start = next(
(j for j, w in enumerate(transcription) if w.start >= prev_time),
0,
)
# Upper bound: next matched marker in the timings list (+2s padding)
next_time = float("inf")
for j in range(i + 1, len(timings)):
if timings[j].timestamp >= 0:
next_time = timings[j].timestamp + 2.0
break
win_end = (
next(
(j for j, w in enumerate(transcription) if w.start > next_time),
len(transcription),
)
if next_time < float("inf")
else len(transcription)
)
if win_end <= win_start:
continue
# Search in the bounded window with a relaxed threshold
sub = transcription[win_start:win_end]
idx, timestamp, confidence, match_end_idx = _find_phrase_timestamp(
anchor_text,
sub,
start_from=0,
fuzzy_threshold=max(0.4, fuzzy_threshold - 0.1),
)
if idx >= 0:
if anchor_type == "after" and match_end_idx > 0:
end_word = sub[min(match_end_idx - 1, len(sub) - 1)]
marker_time = end_word.end
else:
marker_time = max(0.0, timestamp - 0.5)
timings[i] = MarkerTiming(
marker_id=marker_id,
timestamp=marker_time,
context=f"(repaired: {anchor_text[:40]})",
confidence=confidence,
)
# Deduplicate slide markers. The manuscript pattern [SN]\n\n[SN] text... is
# common: the first blank occurrence is a visual-transition cue and the second
# carries the narration text used for alignment. We keep the first entry in
# order (preserving manuscript position) but upgrade its timestamp to the
# best-matched value found for that ID, then drop subsequent duplicates.
slides_set = set(slides or {})
seen: dict[str, int] = {} # marker_id → index in deduped list
deduped: list[MarkerTiming] = []
for timing in timings:
if timing.marker_id not in slides_set:
deduped.append(timing)
continue
if timing.marker_id not in seen:
seen[timing.marker_id] = len(deduped)
deduped.append(timing)
else:
prev_idx = seen[timing.marker_id]
prev = deduped[prev_idx]
# Upgrade if: previous was a placeholder/interpolated and the new one is better.
# Also upgrade if previous used the backward-looking "after" anchor —
# that heuristic gives end-of-preceding-section timing, but a direct
# "before" match on the second occurrence (start of the new section minus 0.5s)
# is more accurate for when the slide should appear.
should_upgrade = (
prev.context == "(after previous)"
and timing.context != "(after previous)"
) or (
prev.timestamp < 0
and timing.timestamp >= 0
) or (
prev.context.startswith("(end of:")
and timing.timestamp >= 0
and timing.context != "(after previous)"
and not timing.context.startswith("(end of:")
)
if should_upgrade:
deduped[prev_idx] = MarkerTiming(
marker_id=prev.marker_id,
timestamp=timing.timestamp,
context=timing.context,
confidence=timing.confidence,
)
return deduped
def build_render_plan(
@@ -566,7 +734,31 @@ def build_render_plan(
time_range=(time_offset, render_end_time) if slide_range else None,
)
video_events = _extract_video_events(
# Before extracting video events, resolve any referenced videos that are missing
# from the project's videos.json by looking them up in shared_assets/videos.json.
_VIDEO_MARKER_PREFIXES = (
"video:",
"narration:",
"vft:", "vfb:", "vfm:",
"vf2t:", "vf2b:", "vf2m:",
"vst:", "vsb:", "vsm:",
"vftp:", "vfbp:", "vfmp:",
"vf2tp:", "vf2bp:", "vf2mp:",
"vstp:", "vsbp:", "vsmp:",
)
missing_video_ids = [
timing.marker_id[len(prefix) :]
for timing in marker_timings
if timing.timestamp >= 0
for prefix in _VIDEO_MARKER_PREFIXES
if timing.marker_id.startswith(prefix)
and timing.marker_id[len(prefix) :] not in videos
]
if missing_video_ids:
found = resolve_missing_videos(missing_video_ids, project_path, config)
videos.update(found)
video_events, video_warnings = _extract_video_events(
marker_timings,
videos,
config.cutouts,
@@ -574,6 +766,13 @@ def build_render_plan(
effective_duration,
time_range=(time_offset, render_end_time) if slide_range else None,
)
if video_warnings:
import sys
print("\nWarnings:", file=sys.stderr)
for w in video_warnings:
print(f"{w}", file=sys.stderr)
print("", file=sys.stderr)
# Track cached files for triggered videos
for event in video_events:
@@ -640,7 +839,10 @@ def build_render_plan(
slide_event.end_time += pause_duration
for vid_event in video_events:
if vid_event.start_time > narration_time:
if vid_event is event:
# Don't shift the pause event by its own pause
continue
if vid_event.start_time >= narration_time:
vid_event.start_time += pause_duration
if vid_event.end_time > narration_time:
vid_event.end_time += pause_duration
@@ -661,9 +863,27 @@ def build_render_plan(
# Save narration end time (before outro)
narration_end_time = total_duration
# Include outro only when rendering to the end of the video.
# A slide_range with an explicit end slide (e.g. S1:S10) is a middle chunk —
# skip the outro so it doesn't appear on every chunk, only the last one.
is_last_chunk = not slide_range or slide_range[1] is None
# Resolve any outro videos missing from videos.json via shared_assets.
if config.outro and is_last_chunk:
missing_outro_ids = [vid_id for vid_id in config.outro if vid_id not in videos]
if missing_outro_ids:
found = resolve_missing_videos(missing_outro_ids, project_path, config)
videos.update(found)
still_missing = [vid_id for vid_id in config.outro if vid_id not in videos]
for vid_id in still_missing:
print(
f" WARNING: outro video '{vid_id}' not found in videos.json or shared_assets — skipped",
flush=True,
)
# Build outro events (plays after narration ends)
outro_events = _extract_outro_events(
config.outro,
config.outro if is_last_chunk else [],
videos,
config.cutouts,
total_duration,
@@ -677,8 +897,8 @@ def build_render_plan(
if outro_events:
total_duration = outro_events[-1].end_time
# Derive slides directory
slides_json_path = project_path / config.slides_path
# Derive slides directory — lowercase path for case-sensitive filesystems (WSL/Linux).
slides_json_path = project_path / config.slides_path.lower()
slides_dir = slides_json_path.parent
plan = RenderPlan(
@@ -805,8 +1025,8 @@ def _extract_slide_events(
events: list[SlideEvent] = []
for i, (marker_time, marker_id) in enumerate(resolved):
# Each slide starts at its own marker time
start_time = marker_time
# First slide always starts at 0 — it's the opening state of the presentation.
start_time = 0.0 if i == 0 else marker_time
# End time is when the NEXT slide's marker appears, or end of video
if i + 1 < len(resolved):
@@ -839,13 +1059,16 @@ def _extract_video_events(
slides: dict[str, SlideDefinition],
total_duration: float,
time_range: Optional[tuple[float, float]] = None,
) -> list[VideoEvent]:
) -> tuple[list[VideoEvent], list[str]]:
"""
Extract video events from aligned marker timings.
- [video:xxx] events end at the next SLIDE marker
- [narration:xxx] events run until end
Returns (events, warnings). Invalid markers are skipped and reported in warnings.
"""
warnings: list[str] = []
range_start, range_end = time_range if time_range else (0.0, float("inf"))
# Collect slide times for video: end time calculation
@@ -857,22 +1080,14 @@ def _extract_video_events(
]
)
# Mapping from shorthand marker prefix → (implied_cutout_name, implied_layer)
# These are the defaults; videos.json values act as a base but the marker wins.
_SHORTHAND: dict[str, tuple[str, str]] = {
"vft:": ("fullscreen", "above"),
"vfb:": ("fullscreen", "below"),
"vst:": ("square", "above"),
"vsb:": ("square", "below"),
"vftp:": ("fullscreen", "above", "pause_narration"),
"vfbp:": ("fullscreen", "below", "pause_narration"),
"vstp:": ("square", "above", "pause_narration"),
"vsbp:": ("square", "below", "pause_narration"),
}
# Pause-variant prefixes — the only thing the render pass still needs from
# shorthand markers at event-build time (pause_narration is per-event, not stored in videos.json).
_PAUSE_PREFIXES = {"vftp:", "vfbp:", "vfmp:", "vf2tp:", "vf2bp:", "vf2mp:", "vstp:", "vsbp:", "vsmp:"}
# Collect video markers: (time, video_id, event_type, cutout_name_override, layer_override)
# event_type is "video" (ends at next slide) or "narration" (runs to end)
video_markers: list[tuple[float, str, str, str | None, str | None]] = []
# Collect video markers: (time, video_id, event_type, pause_narration)
# video_markers: (timestamp, video_id, marker_type, pause_narration)
# cutout and layer are read from videos.json (projected there by _project_markers_to_videos)
video_markers: list[tuple[float, str, str, bool]] = []
for timing in marker_timings:
if timing.timestamp < 0:
@@ -880,97 +1095,79 @@ def _extract_video_events(
mid = timing.marker_id
# --- shorthand markers: vft/vfb/vst/vsb ---
shorthand_match = next((p for p in _SHORTHAND if mid.startswith(p)), None)
# --- shorthand markers (vft:/vfb:/vst:/vsb: and pause variants) ---
shorthand_match = next(
(p for p in _SHORTHAND_PREFIXES if mid.startswith(p)), None
)
if shorthand_match:
video_id = mid[len(shorthand_match) :]
if video_id not in videos:
raise ValueError(
f"Marker [{mid}] references unknown video '{video_id}'. "
warnings.append(
f"[{mid}] references unknown video '{video_id}' — skipped. "
f"Add it to videos.json or remove the marker."
)
implied_cutout, implied_layer = _SHORTHAND[shorthand_match]
if implied_cutout not in cutouts:
raise ValueError(
f"Marker [{mid}] uses shorthand '{shorthand_match}' which requires "
f"cutout '{implied_cutout}' but it is not defined in project config. "
f"Available cutouts: {list(cutouts.keys())}"
continue
# Validate that videos.json has the correct cutout (written by ETL)
video_source = videos[video_id]
if not video_source.cutout or video_source.cutout not in cutouts:
warnings.append(
f"[{mid}] video '{video_id}' has no valid cutout in videos.json — "
f"run render once to project values, or set cutout manually."
)
video_markers.append(
(timing.timestamp, video_id, "video", implied_cutout, implied_layer)
)
continue
pause_narration = shorthand_match in _PAUSE_PREFIXES
video_markers.append((timing.timestamp, video_id, "video", pause_narration))
continue
# --- legacy [video:xxx] ---
if mid.startswith("video:"):
video_id = mid[6:]
if video_id not in videos:
raise ValueError(
f"Marker [video:{video_id}] references unknown video '{video_id}'. "
f"Add it to videos.json or remove the marker."
warnings.append(
f"[video:{video_id}] references unknown video '{video_id}' — skipped."
)
continue
video_source = videos[video_id]
if not video_source.cutout:
raise ValueError(
f"Marker [video:{video_id}] — video '{video_id}' has no 'cutout' set in videos.json."
if not video_source.cutout or video_source.cutout not in cutouts:
warnings.append(
f"[video:{video_id}] has no valid cutout in videos.json — skipped."
)
if video_source.cutout not in cutouts:
raise ValueError(
f"Marker [video:{video_id}] — cutout '{video_source.cutout}' is not defined in project config. "
f"Available: {list(cutouts.keys())}"
)
video_markers.append(
(timing.timestamp, video_id, "video", None, None)
)
continue
video_markers.append((timing.timestamp, video_id, "video", False))
continue
# --- [narration:xxx] ---
if mid.startswith("narration:"):
video_id = mid[10:]
if video_id not in videos:
raise ValueError(
f"Marker [narration:{video_id}] references unknown video '{video_id}'. "
f"Add it to videos.json or remove the marker."
warnings.append(
f"[narration:{video_id}] references unknown video '{video_id}' — skipped."
)
continue
video_source = videos[video_id]
if not video_source.cutout:
raise ValueError(
f"Marker [narration:{video_id}] — video '{video_id}' has no 'cutout' set in videos.json."
if not video_source.cutout or video_source.cutout not in cutouts:
warnings.append(
f"[narration:{video_id}] has no valid cutout in videos.json — skipped."
)
if video_source.cutout not in cutouts:
raise ValueError(
f"Marker [narration:{video_id}] — cutout '{video_source.cutout}' is not defined in project config. "
f"Available: {list(cutouts.keys())}"
)
video_markers.append(
(timing.timestamp, video_id, "narration", None, None)
)
continue
video_markers.append((timing.timestamp, video_id, "narration", False))
events: list[VideoEvent] = []
for (
start_time,
video_id,
marker_type,
cutout_override,
layer_override,
) in video_markers:
for start_time, video_id, marker_type, pause_narration in video_markers:
video_source = videos[video_id]
# Resolve cutout: marker override > videos.json cutout
# (validation already ensured cutout exists — this is a safety assertion)
cutout_name = cutout_override or video_source.cutout
# Read cutout and layer directly from videos.json (projected by ETL)
cutout_name = video_source.cutout
cutout = cutouts[cutout_name]
# Resolve layer: marker override > videos.json layer
layer = layer_override if layer_override is not None else video_source.layer
layer = video_source.layer
end_on = video_source.end_on
if end_on == "take" and video_source.take is not None:
end_time = start_time + video_source.take
elif end_on == "end":
end_time = total_duration
elif end_on == "next_slide" or (end_on is None and marker_type == "video"):
# End at next slide marker
elif end_on in ("next_slide", "slide") or (end_on is None and marker_type == "video"):
# End at next slide marker ("slide" is a recognised alias for "next_slide")
end_time = total_duration
for slide_time in slide_times:
if slide_time > start_time:
@@ -997,7 +1194,7 @@ def _extract_video_events(
)
)
return events
return events, warnings
def _extract_audio_events(
@@ -1014,19 +1211,22 @@ def _extract_audio_events(
continue
marker_id = timing.marker_id
audio_id = None
if marker_id.startswith("A") and len(marker_id) > 1:
audio_id = marker_id[1:]
if audio_id in audio:
if timing.timestamp < range_start or timing.timestamp >= range_end:
continue
start_time = max(0, timing.timestamp - AUDIO_OFFSET_SECONDS)
events.append(
AudioEvent(
audio_id=audio_id,
start_time=start_time,
audio_def=audio[audio_id],
)
elif marker_id.startswith("audio:"):
audio_id = marker_id[6:]
if audio_id is not None and audio_id in audio:
if timing.timestamp < range_start or timing.timestamp >= range_end:
continue
start_time = max(0, timing.timestamp - AUDIO_OFFSET_SECONDS)
events.append(
AudioEvent(
audio_id=audio_id,
start_time=start_time,
audio_def=audio[audio_id],
)
)
return events
+72 -11
View File
@@ -4,7 +4,7 @@ from pathlib import Path
from .cache import resolve_with_cache
from .errors import ValidationError, ValidationIssue
from .parser import _read_json
from .parser import _read_json, resolve_missing_videos
from .models import (
ProjectConfig,
SlideDefinition,
@@ -38,6 +38,24 @@ def validate_project(
issues: list[ValidationIssue] = []
warnings: list[ValidationIssue] = []
# Collect video IDs actually referenced in the manuscript (for file-existence checks)
_VIDEO_PREFIXES = {
"video:": 6,
"vft:": 4, "vfb:": 4, "vfm:": 4,
"vf2t:": 5, "vf2b:": 5, "vf2m:": 5,
"vst:": 4, "vsb:": 4, "vsm:": 4,
"vftp:": 5, "vfbp:": 5, "vfmp:": 5,
"vf2tp:": 6, "vf2bp:": 6, "vf2mp:": 6,
"vstp:": 5, "vsbp:": 5, "vsmp:": 5,
}
referenced_video_ids: set[str] = set()
for marker in manuscript_markers:
prefix = next((p for p in _VIDEO_PREFIXES if marker.startswith(p)), None)
if prefix is not None:
referenced_video_ids.add(marker[_VIDEO_PREFIXES[prefix]:])
elif marker.startswith("narration:"):
referenced_video_ids.add(marker[10:])
# Check for malformed markers first (these are likely typos)
if malformed_markers:
for line_num, marker_text in malformed_markers:
@@ -57,15 +75,11 @@ def validate_project(
# Skip audio markers (start with 'A' followed by audio id, e.g., Awoosh)
if marker.startswith("A") and len(marker) > 1 and marker[1:].isalnum():
continue
# Skip audio: prefix markers (e.g., audio:woosh)
if marker.startswith("audio:"):
continue
# Validate video trigger markers — both legacy [video:xxx] and
# shorthand [vft:xxx] / [vfb:xxx] / [vst:xxx] / [vsb:xxx].
_VIDEO_PREFIXES = {
"video:": 6,
"vft:": 4,
"vfb:": 4,
"vst:": 4,
"vsb:": 4,
}
matched_prefix = next(
(p for p in _VIDEO_PREFIXES if marker.startswith(p)), None
)
@@ -83,6 +97,16 @@ def validate_project(
project_path / "manuscript.txt",
)
)
else:
vs = videos[video_id]
if not vs.cutout or vs.cutout not in config.cutouts:
warnings.append(
ValidationIssue(
f"[{marker}] video '{video_id}' has no valid cutout in videos.json — "
f"run 'gnommo import' to project values, or set cutout manually.",
project_path / "manuscript.txt",
)
)
continue
# Validate narration trigger markers (narration:xxx) - continuous videos
@@ -95,6 +119,16 @@ def validate_project(
project_path / "manuscript.txt",
)
)
else:
vs = videos[video_id]
if not vs.cutout or vs.cutout not in config.cutouts:
warnings.append(
ValidationIssue(
f"[{marker}] video '{video_id}' has no valid cutout in videos.json — "
f"run 'gnommo import' to project values, or set cutout manually.",
project_path / "manuscript.txt",
)
)
continue
# Segment markers are structural annotations, not slide references
@@ -120,8 +154,10 @@ def validate_project(
)
# Check all slide images exist
# Slides are in the same directory as the slides.json file
slides_json_path = project_path / config.slides_path
# Slides are in the same directory as the slides.json file.
# Lowercase the configured path so capital-cased project names (e.g.
# "media/slides/Video2/slides.json") resolve on case-sensitive filesystems.
slides_json_path = project_path / config.slides_path.lower()
slides_dir = slides_json_path.parent
for slide_id, slide_def in slides.items():
@@ -155,6 +191,10 @@ def validate_project(
shared_assets_dir = project_path.parent / "shared_assets"
for video_id, video_source in videos.items():
# Only check files for videos actually used in this manuscript
if video_id not in referenced_video_ids:
continue
# Determine base directory based on is_shared flag
if video_source.is_shared:
if shared_assets_dir:
@@ -173,9 +213,15 @@ def validate_project(
video_path = base_dir / video_source.source_file
video_path, _ = resolve_with_cache(video_path, project_path)
if not video_path.exists():
sf = video_source.source_file
hint = (
" — run 'gnommo pexels' to download"
if sf.startswith("pexels/")
else " — falling back to PlaceholderVideo"
)
warnings.append(
ValidationIssue(
f"Video file not found: {video_source.source_file} — falling back to PlaceholderVideo",
f"Video file not found: {sf}{hint}",
videos_json_path,
)
)
@@ -216,6 +262,7 @@ def validate_project(
)
else:
bg_path = shared_assets_dir / bg_videos[bg_handle]["source_file"]
bg_path, _ = resolve_with_cache(bg_path, project_path)
if not bg_path.exists():
issues.append(
ValidationIssue(
@@ -259,6 +306,20 @@ def validate_project(
)
)
# Check outro videos exist in videos.json or shared_assets
if config.outro:
missing_outro = [vid_id for vid_id in config.outro if vid_id not in videos]
if missing_outro:
found = resolve_missing_videos(missing_outro, project_path, config)
still_missing = [vid_id for vid_id in missing_outro if vid_id not in found]
for vid_id in still_missing:
warnings.append(
ValidationIssue(
f"Outro video '{vid_id}' not found in videos.json or shared_assets — will be skipped at render",
project_path / "project.json",
)
)
# If any issues, raise ValidationError
if issues:
raise ValidationError(issues)
Executable
+10
View File
@@ -0,0 +1,10 @@
#!/bin/sh
# Invoke ./gnommo.sh with the "all" argument for each project, in order.
# There is no `set -e`, so a failing project does not prevent the
# following ones from being attempted.
for project in video1 video2 video3 video4 video5 video6; do
    ./gnommo.sh -p "$project" all
done