Files
gnommo/gnommo/parser.py
T
2026-05-12 00:52:14 +02:00

827 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Extract stage: parse all input files."""
import json
import re
from pathlib import Path
from typing import Any, Optional
from .cache import resolve_with_cache
from .errors import ParseError
from .models import (
Attribution,
AudioDefinition,
Citation,
CutoutDefinition,
ProjectConfig,
SlideDefinition,
VideoMetadata,
VideoSource,
)
def _read_json(path: Path) -> Any:
"""Read and parse a JSON file, treating an empty file as {}."""
text = path.read_text(encoding="utf-8").strip()
return json.loads(text) if text else {}
def _resolve_case_insensitive(path: Path) -> Path:
"""Return the real on-disk path, resolving each component case-insensitively.
On case-insensitive filesystems (macOS) paths just work. On case-sensitive
ones (Linux/WSL) a mismatch between project.json and the actual directory
name causes a FileNotFoundError. This walks each component and picks the
first directory entry whose name matches case-insensitively, returning the
corrected path. If the path already exists, it is returned unchanged.
"""
if path.exists():
return path
resolved = path.anchor and Path(path.anchor) or Path(".")
for part in path.parts[len(Path(path.anchor).parts) :]:
if (resolved / part).exists():
resolved = resolved / part
else:
try:
match = next(
(p for p in resolved.iterdir() if p.name.lower() == part.lower()),
None,
)
except (OSError, NotADirectoryError):
match = None
resolved = match if match else (resolved / part)
return resolved
def parse_manuscript(
    project_path: Path,
) -> tuple[str, list[str], list[tuple[int, str]], list[Citation]]:
    """
    Read manuscript.txt and pull out its text, markers and citations.

    Citations are collected from the raw text first; afterwards every
    [cite:...], [marker:...] and [cue:...] tag is removed so the returned text
    is free of them. Marker and cue tags are discarded outright.

    Returns:
        Tuple of (cleaned text, marker IDs found, malformed markers as
        (line number, text) pairs, citations).

    Raises:
        ParseError: If manuscript.txt does not exist in project_path.
    """
    manuscript_path = project_path / "manuscript.txt"
    if not manuscript_path.exists():
        raise ParseError("manuscript.txt not found", manuscript_path)

    raw_text = manuscript_path.read_text(encoding="utf-8")
    # Citations must be harvested before their tags are stripped.
    citations = parse_citations(raw_text)

    text = raw_text
    for strip_pattern in (
        r"\[cite:[^\]]+\]",
        r"\[marker:[^\]]+\]",
        r"\[cue:[^\]]+\]",
    ):
        text = re.sub(strip_pattern, "", text)

    # Bracketed IDs such as [S1], [video:demo], [Zoom2]. The dot is accepted so
    # that IDs carrying a file extension are still reported to the validator.
    markers = re.findall(r"\[([A-Za-z0-9_:.]+)\]", text)

    unclosed_re = re.compile(r"\[S\d+")
    spaced_re = re.compile(r"\[\s+S\d+\s*\]|\[S\d+\s+\]|\[S\s+\d+\]")
    malformed: list[tuple[int, str]] = []
    for line_num, line in enumerate(text.split("\n"), start=1):
        # "[S12" not immediately followed by "]" (including at end of line).
        for hit in unclosed_re.finditer(line):
            following = line[hit.end():hit.end() + 1]
            if following != "]":
                malformed.append((line_num, hit.group()))
        # Stray whitespace inside the brackets: "[ S1]", "[S1 ]", "[S 1]".
        malformed.extend((line_num, bad) for bad in spaced_re.findall(line))
    return text, markers, malformed, citations
def parse_citations(manuscript_text: str) -> list[Citation]:
    """
    Collect every [cite:...] marker found in the manuscript text.

    Whatever follows 'cite:' is taken verbatim (whitespace-trimmed) as the
    reference that should appear in the video description.

    Returns:
        List of Citation objects carrying the reference, a "cite:<reference>"
        marker ID and a short snippet of the text following the marker.
    """
    cite_re = re.compile(r"\[cite:([^\]]+)\]")
    context_re = re.compile(r"([^\[]*?)(?:\[|\n\n|$)")

    found = []
    for hit in cite_re.finditer(manuscript_text):
        reference = hit.group(1).strip()
        # Look at the 150 characters after the marker and keep what precedes
        # the next "[" or blank line.
        window = manuscript_text[hit.end():hit.end() + 150]
        ctx_hit = context_re.match(window)
        context = ctx_hit.group(1).strip() if ctx_hit else ""
        # Keep the snippet short: at most 50 characters including the ellipsis.
        if len(context) > 50:
            context = context[:47] + "..."
        found.append(
            Citation(
                reference=reference,
                marker_id=f"cite:{reference}",
                context=context,
            )
        )
    return found
def save_citations(citations: list[Citation], path: Path) -> None:
    """Write the reference and context of each citation to *path* as JSON."""
    records = []
    for citation in citations:
        records.append(
            {"reference": citation.reference, "context": citation.context}
        )
    serialized = json.dumps(records, indent=2)
    path.write_text(serialized, encoding="utf-8")
def load_citations(path: Path) -> list[Citation]:
    """Read citations back from a JSON file; a missing file yields []."""
    if not path.exists():
        return []

    loaded = []
    for item in _read_json(path):
        reference = item["reference"]
        loaded.append(
            Citation(
                reference=reference,
                marker_id=f"cite:{reference}",
                # Context is optional in the stored record.
                context=item.get("context", ""),
            )
        )
    return loaded
def parse_project_config(project_path: Path) -> ProjectConfig:
    """Parse project.json into ProjectConfig.

    Args:
        project_path: Project directory expected to contain project.json.

    Returns:
        ProjectConfig built from the file; absent keys fall back to the
        defaults spelled out in the constructor call below.

    Raises:
        ParseError: If project.json is missing, is not valid JSON, does not
            hold a JSON object, or "resolution" is not a two-element list.
    """
    config_path = project_path / "project.json"
    if not config_path.exists():
        raise ParseError("project.json not found", config_path)
    try:
        data = _read_json(config_path)
    except json.JSONDecodeError as e:
        # Chain the cause so the original decode position stays in the traceback.
        raise ParseError(f"Invalid JSON: {e}", config_path) from e
    if not isinstance(data, dict):
        # A list/scalar root would otherwise fail later with AttributeError.
        raise ParseError("project.json must contain a JSON object", config_path)

    # Built-in cutouts — used by vft/vfb/vst/vsb marker shorthand.
    # Projects can override these by defining cutouts with the same names.
    cutouts: dict[str, CutoutDefinition] = {
        # 100 % × 100 % at origin — for fullscreen video (vf* markers)
        "fullscreen": CutoutDefinition(
            x=-1,
            y=-1,
            height=-1,
            width=-1,
            x_percent=0.0,
            y_percent=0.0,
            height_percent=1.0,
            width_percent=1.0,
        ),
        # 50 % height, square aspect, centred — for square video (vs* markers)
        "square": CutoutDefinition(
            x=-1,
            y=-1,
            height=-1,
            width=-1,
            x_percent=0.25,
            y_percent=0.25,
            height_percent=0.5,
            width_percent=0.0,
        ),
    }

    # Parse cutouts (named zones for video placement) — project definitions
    # override the built-ins above. An explicit null is treated as "none".
    cutouts_data = data.get("cutouts") or {}
    for cutout_name, cutout_data in cutouts_data.items():
        x, x_pct = _parse_dimension(cutout_data.get("x", 0))
        y, y_pct = _parse_dimension(cutout_data.get("y", 0))
        height, height_pct = _parse_dimension(cutout_data.get("height", 200))
        # Width defaults to same as height (square) if not specified
        width, width_pct = _parse_dimension(
            cutout_data.get("width", cutout_data.get("height", 200))
        )
        cutouts[cutout_name] = CutoutDefinition(
            x=x,
            y=y,
            height=height,
            width=width,
            x_percent=x_pct,
            y_percent=y_pct,
            height_percent=height_pct,
            width_percent=width_pct,
        )

    # Parse resolution
    resolution = data.get("resolution", [1920, 1080])
    if not isinstance(resolution, list) or len(resolution) != 2:
        raise ParseError("resolution must be [width, height]", config_path)

    # Parse default_filters (named filter presets)
    default_filters: dict[str, list[dict]] = data.get("default_filters", {})

    return ProjectConfig(
        resolution=tuple(resolution),
        fps=data.get("fps", 30),
        default_slide_type=data.get("defaultSlideType", "square"),
        cutouts=cutouts,
        default_filters=default_filters,
        background=data.get("background", ""),
        background_video=data.get("background_video", ""),  # Deprecated
        slides_path=data.get("slides", "slides.json"),
        videos_path=data.get("videos", "videos.json"),
        audio_path=data.get("audio", "audio.json"),
        audio_source=data.get("audio_source"),
        main_video=data.get("main_video"),
        gnommo_scratch=data.get("gnommo_scratch"),
        default_begin=float(data.get("default_begin", 0.0)),
        default_end_trim=float(data.get("default_end_trim", 0.0)),
        outro=data.get("outro", []),
        description=data.get("description", ""),
        footer=data.get("footer", ""),
        output_video=data.get("output_video", ""),
    )
def _parse_dimension(value: Any) -> tuple[int, float]:
"""
Parse a dimension value (can be int or string like '100%').
Returns:
Tuple of (pixels, percentage). If pixels is -1, use percentage.
"""
if isinstance(value, int):
return value, 0.0
if isinstance(value, str):
if value.endswith("%"):
pct = float(value[:-1]) / 100.0
return -1, pct
return int(value), 0.0
return 200, 0.0 # default
def parse_slides(
    project_path: Path, config: Optional[ProjectConfig] = None
) -> dict[str, SlideDefinition]:
    """Parse slides.json into slide definitions.

    Args:
        project_path: Project directory.
        config: Optional project config; when it carries a slides_path that
            path (lowercased) is used instead of "slides.json".

    Returns:
        Mapping of slide ID to SlideDefinition.

    Raises:
        ParseError: If the slides file cannot be found, is not valid JSON, or
            a slide entry lacks the required "image" field.
    """
    if config and config.slides_path:
        # Lowercase the path so that a capital-cased project name embedded by
        # the import stage (e.g. "media/slides/video2/slides.json") resolves
        # correctly on case-sensitive filesystems (WSL/Linux).
        local_slides_path = project_path / config.slides_path.lower()
    else:
        local_slides_path = project_path / "slides.json"

    # Try cache fallback for reading JSON
    slides_path, _ = resolve_with_cache(local_slides_path, project_path)
    if not slides_path.exists():
        raise ParseError(
            f"slides file not found: {local_slides_path}", local_slides_path
        )
    try:
        data = _read_json(slides_path)
    except json.JSONDecodeError as e:
        # Chain the cause so the decode error stays visible in the traceback.
        raise ParseError(f"Invalid JSON: {e}", slides_path) from e

    slides = {}
    for slide_id, slide_data in data.items():
        if "image" not in slide_data:
            raise ParseError(
                f"Slide '{slide_id}' missing required field 'image'", slides_path
            )
        # NOTE(review): the fallback type is the literal "square";
        # config.default_slide_type is not consulted here — confirm it is
        # applied elsewhere.
        slides[slide_id] = SlideDefinition(
            image=slide_data["image"],
            type=slide_data.get("type", "square"),
        )
    return slides
def parse_audio(
    project_path: Path, config: Optional[ProjectConfig] = None
) -> tuple[dict[str, AudioDefinition], Path]:
    """
    Parse audio.json into audio definitions.

    Args:
        project_path: Project directory.
        config: Optional project config; its audio_path, when set, replaces
            the default "audio.json".

    Returns:
        Tuple of (audio dict, audio_dir) where audio_dir is the directory
        containing audio.json (for resolving relative file paths). The dict
        is empty when the file does not exist, since audio is optional.

    Raises:
        ParseError: If the file is not valid JSON or an entry lacks "file".
    """
    if config and config.audio_path:
        local_audio_path = project_path / config.audio_path
    else:
        local_audio_path = project_path / "audio.json"

    # Keep local directory for file lookups (cache fallback handles resolution)
    audio_dir = local_audio_path.parent

    # Try cache fallback for reading JSON
    audio_path, _ = resolve_with_cache(local_audio_path, project_path)

    # Audio is optional - return empty dict if not found
    if not audio_path.exists():
        return {}, audio_dir
    try:
        data = _read_json(audio_path)
    except json.JSONDecodeError as e:
        raise ParseError(f"Invalid JSON: {e}", audio_path) from e

    audio = {}
    for audio_id, audio_data in data.items():
        if "file" not in audio_data:
            raise ParseError(
                f"Audio '{audio_id}' missing required field 'file'", audio_path
            )
        # Overlap is a timestamp such as "10s". str() lets a bare JSON number
        # (e.g. 10) through as well instead of failing inside parse_timestamp.
        raw_overlap = audio_data.get("overlap")
        overlap = parse_timestamp(str(raw_overlap)) if raw_overlap else None

        raw_duration = audio_data.get("duration")
        audio[audio_id] = AudioDefinition(
            file=audio_data["file"],
            volume=float(audio_data.get("volume", 1.0)),
            loop=bool(audio_data.get("loop", False)),
            overlap=overlap,
            ignore_pauses=bool(audio_data.get("ignore_pauses", False)),
            duration=float(raw_duration) if raw_duration is not None else None,
            is_shared=bool(audio_data.get("is_shared", False)),
        )
    return audio, audio_dir
def parse_timestamp(value: Any) -> float:
    """
    Parse a timestamp into seconds.

    Supported formats:
    - "3.5s" or "3.5" → 3.5 seconds
    - "2:54" → 2 minutes 54 seconds (174.0)
    - "1:23:45" → 1 hour 23 minutes 45 seconds
    - "2:54.5" → 2 minutes 54.5 seconds
    - a bare number (int/float, as produced by unquoted JSON values) → seconds

    Args:
        value: Timestamp string or number. Falsy values (None, "", 0) and
            whitespace-only strings yield 0.0.

    Returns:
        Time in seconds as a float.

    Raises:
        ParseError: If a colon-separated value has more than three fields.
        ValueError: If a field is not a valid number.
    """
    if not value:
        return 0.0
    if isinstance(value, (int, float)):
        # Unquoted JSON numbers are already seconds.
        return float(value)

    text = value.strip()
    if not text:
        return 0.0
    # Remove trailing 's' if present (e.g., "3.5s")
    if text.endswith("s"):
        text = text[:-1]

    # Check for colon-separated format (MM:SS or HH:MM:SS)
    if ":" in text:
        parts = text.split(":")
        if len(parts) == 2:
            minutes, seconds = parts
            return float(minutes) * 60 + float(seconds)
        if len(parts) == 3:
            hours, minutes, seconds = parts
            return float(hours) * 3600 + float(minutes) * 60 + float(seconds)
        raise ParseError(f"Invalid timestamp format: {text}", None)

    # Plain number (seconds)
    return float(text)
def parse_videos(
    project_path: Path, config: Optional[ProjectConfig] = None
) -> tuple[dict[str, VideoSource], Path]:
    """
    Parse videos.json into video source definitions.

    Filter can be specified as:
    - A list of filter configs (inline definition)
    - A string referencing a named preset in config.default_filters

    Trim points can be specified as:
    - skip/take: raw values in seconds (traditional)
    - begin/end: timestamps like "3.5s", "2:54", "1:23:45" or a bare number
      of seconds (user-friendly). These are converted to skip/take internally.

    Returns:
        Tuple of (videos dict, videos_dir) where videos_dir is the directory
        containing videos.json (for resolving relative file paths).

    Raises:
        ParseError: If the file is missing or not valid JSON, an entry lacks
            "source_file", or an entry names an unknown filter preset.
    """
    if config and config.videos_path:
        local_videos_path = project_path / config.videos_path
    else:
        local_videos_path = project_path / "videos.json"

    # Keep local directory for file lookups (cache fallback handles resolution)
    videos_dir = local_videos_path.parent

    # Try cache fallback for reading JSON
    videos_path, _ = resolve_with_cache(local_videos_path, project_path)
    if not videos_path.exists():
        raise ParseError(
            f"videos.json not found: {local_videos_path}", local_videos_path
        )
    try:
        data = _read_json(videos_path)
    except json.JSONDecodeError as e:
        raise ParseError(f"Invalid JSON: {e}", videos_path) from e

    # Get default_filters from config for resolving references
    default_filters = config.default_filters if config else {}

    videos = {}
    for video_id, video_data in data.items():
        if "source_file" not in video_data:
            raise ParseError(
                f"Video '{video_id}' missing required field 'source_file'", videos_path
            )

        # Parse attribution if present
        attribution = None
        if "attribution" in video_data:
            attr_data = video_data["attribution"]
            attribution = Attribution(
                source=attr_data.get("source", "unknown"),
                creator=attr_data.get("creator", "Unknown"),
                url=attr_data.get("url"),
            )

        # Resolve filter - can be a list or a string reference to default_filters
        filter_value = video_data.get("filter", [])
        if isinstance(filter_value, str):
            # It's a reference to a named filter preset
            if filter_value not in default_filters:
                raise ParseError(
                    f"Video '{video_id}' references unknown filter preset '{filter_value}'. "
                    f"Available presets: {list(default_filters.keys())}",
                    videos_path,
                )
            filter_list = default_filters[filter_value]
        else:
            # It's an inline filter definition
            filter_list = filter_value

        # Handle skip/take - can use begin/end as user-friendly alternatives
        skip = float(video_data.get("skip") or 0.0)
        raw_take = video_data.get("take")
        take = float(raw_take) if raw_take not in (None, "") else None

        # Convert begin/end to skip/take if provided. str() lets bare JSON
        # numbers through instead of failing inside parse_timestamp.
        raw_begin = video_data.get("begin")
        if raw_begin:
            skip = parse_timestamp(str(raw_begin))
        raw_end = video_data.get("end")
        if raw_end:
            # take = end - begin (duration from begin to end)
            take = parse_timestamp(str(raw_end)) - skip

        raw_duration = video_data.get("duration")
        raw_has_audio = video_data.get("has_audio")
        videos[video_id] = VideoSource(
            source_file=video_data["source_file"],
            filter=filter_list,
            output_file=video_data.get("output_file"),
            take=take,
            skip=skip,
            zoom=video_data.get("zoom", 1.0),
            cutout=video_data.get("cutout"),
            always_visible=video_data.get("always_visible", False),
            is_shared=video_data.get("is_shared", False),
            pause_narration=float(video_data.get("pause_narration", 0)),
            attribution=attribution,
            use_audio_channels=video_data.get("use_audio_channels", "both"),
            defer_loudnorm=video_data.get("defer_loudnorm", False),
            volume=float(video_data.get("volume", 1.0)),
            layer=video_data.get("layer", "above"),
            duration=float(raw_duration) if raw_duration is not None else None,
            has_audio=bool(raw_has_audio) if raw_has_audio is not None else None,
            end_on=video_data.get("end_on"),
        )
    return videos, videos_dir
def parse_narration(
    project_path: Path, config: Optional[ProjectConfig] = None
) -> tuple[dict[str, VideoSource], Path]:
    """
    Parse narration.json into narration segment definitions.

    Narration segments are stored in media/narration/ and are processed
    separately from videos. Each segment can have filters, begin/end trim
    points, and other properties similar to videos.

    Filter can be specified as:
    - A list of filter configs (inline definition)
    - A string referencing a named preset in config.default_filters

    Trim points can be specified as:
    - skip/take: raw values in seconds (traditional)
    - begin/end: timestamps like "3.5s", "2:54", "1:23:45" or a bare number
      of seconds (user-friendly). These are converted to skip/take internally.
    When a segment sets neither skip nor begin, config.default_begin is used.

    Returns:
        Tuple of (narration dict, narration_dir) where narration_dir is the
        directory containing narration.json (for resolving relative file
        paths). The dict is empty when the file does not exist.

    Raises:
        ParseError: If the file is not valid JSON, a segment lacks
            "source_file", or a segment names an unknown filter preset.
    """
    # Narration is always in media/narration/
    # Keep local directory for file lookups (cache fallback handles resolution)
    narration_dir = project_path / "media" / "narration"
    local_narration_path = narration_dir / "narration.json"

    # Try cache fallback for reading JSON
    narration_path, _ = resolve_with_cache(local_narration_path, project_path)

    # Narration is optional - return empty dict if not found
    if not narration_path.exists():
        return {}, narration_dir
    try:
        data = _read_json(narration_path)
    except json.JSONDecodeError as e:
        raise ParseError(f"Invalid JSON: {e}", narration_path) from e

    # Project-level values are the same for every segment; read them once.
    default_filters = config.default_filters if config else {}
    default_begin = config.default_begin if config else 0.0

    narration = {}
    for segment_id, segment_data in data.items():
        if "source_file" not in segment_data:
            raise ParseError(
                f"Narration segment '{segment_id}' missing required field 'source_file'",
                narration_path,
            )

        # Resolve filter - can be a list or a string reference to default_filters
        filter_value = segment_data.get("filter", [])
        if isinstance(filter_value, str):
            # It's a reference to a named filter preset
            if filter_value not in default_filters:
                raise ParseError(
                    f"Narration segment '{segment_id}' references unknown filter preset '{filter_value}'. "
                    f"Available presets: {list(default_filters.keys())}",
                    narration_path,
                )
            filter_list = default_filters[filter_value]
        else:
            # It's an inline filter definition
            filter_list = filter_value

        # Handle skip/take - can use begin/end as user-friendly alternatives.
        # Values are coerced to float so the "end - skip" arithmetic below
        # cannot fail on a quoted number; an absent/null/empty skip falls back
        # to the project-level default, while an explicit 0 is kept.
        raw_skip = segment_data.get("skip")
        skip = float(raw_skip) if raw_skip not in (None, "") else default_begin
        raw_take = segment_data.get("take")
        take = float(raw_take) if raw_take not in (None, "") else None

        # Explicit begin/end always override defaults. str() lets bare JSON
        # numbers through instead of failing inside parse_timestamp.
        raw_begin = segment_data.get("begin")
        if raw_begin:
            skip = parse_timestamp(str(raw_begin))
        raw_end = segment_data.get("end")
        if raw_end:
            # take = end - begin (duration from begin to end)
            take = parse_timestamp(str(raw_end)) - skip

        narration[segment_id] = VideoSource(
            source_file=segment_data["source_file"],
            filter=filter_list,
            output_file=segment_data.get("output_file"),
            take=take,
            skip=skip,
            zoom=segment_data.get("zoom", 1.0),
            cutout=segment_data.get("cutout"),
            always_visible=segment_data.get("always_visible", False),
            use_audio_channels=segment_data.get("use_audio_channels", "both"),
            defer_loudnorm=segment_data.get("defer_loudnorm", False),
            volume=float(segment_data.get("volume", 1.0)),
        )
    return narration, narration_dir
def get_video_duration(video_path: Path) -> float:
    """Get duration of a video file using ffprobe.

    Args:
        video_path: File to probe.

    Returns:
        The value ffprobe prints for format=duration, as a float.

    Raises:
        ParseError: If ffprobe exits non-zero or prints something that is not
            a number (for example an empty string or "N/A").
        FileNotFoundError: Presumably raised by subprocess when the ffprobe
            executable is not installed — not handled here.
    """
    import subprocess

    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        str(video_path),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise ParseError(f"Failed to get duration: {result.stderr}", video_path)

    raw_duration = result.stdout.strip()
    try:
        return float(raw_duration)
    except ValueError as e:
        # A zero exit status does not guarantee a numeric duration on stdout.
        raise ParseError(
            f"Failed to get duration: unexpected ffprobe output {raw_duration!r}",
            video_path,
        ) from e
def parse_video_metadata(metadata_path: Path) -> VideoMetadata:
    """
    Parse a video metadata JSON file.

    Expected format:
    {
        "source_file": "talking_head.mov",
        "preprocess": [
            {"type": "chroma_key", "color": [0, 255, 0], "similarity": 0.15}
        ],
        "output": {
            "file": "intermediate/talking_head_rgba.mov",
            "colorspace": "rgba",
            "alpha": "straight"
        }
    }

    Returns:
        VideoMetadata with source_file, preprocess (default []) and output
        (default None).

    Raises:
        ParseError: If the file is missing, is not valid JSON, or lacks the
            required "source_file" field.
    """
    if not metadata_path.exists():
        raise ParseError(f"Video metadata not found: {metadata_path}", metadata_path)
    try:
        data = _read_json(metadata_path)
    except json.JSONDecodeError as e:
        # Chain the cause so the decode error stays visible in the traceback.
        raise ParseError(f"Invalid JSON: {e}", metadata_path) from e
    if "source_file" not in data:
        raise ParseError(
            "Video metadata missing required field 'source_file'", metadata_path
        )
    return VideoMetadata(
        source_file=data["source_file"],
        preprocess=data.get("preprocess", []),
        output=data.get("output"),
    )
def resolve_video_file(
    project_path: Path, file_ref: str
) -> tuple[Path, Optional[VideoMetadata]]:
    """
    Turn a video reference into the file that should actually be used.

    The reference is either a direct video path or the path of an existing
    metadata JSON file, both relative to project_path.

    Returns:
        Tuple of (video path to use, metadata or None for direct references).
    """
    ref_path = project_path / file_ref

    # Anything that is not an existing *.json file is a direct reference.
    is_metadata = file_ref.endswith(".json") and ref_path.exists()
    if not is_metadata:
        return ref_path, None

    metadata = parse_video_metadata(ref_path)
    # Paths inside the metadata are relative to the metadata file itself.
    base_dir = ref_path.parent

    output_file = metadata.output.get("file") if metadata.output else None
    if output_file:
        rendered = base_dir / output_file
        # Prefer the declared output, but only once it exists on disk.
        if rendered.exists():
            return rendered, metadata

    return base_dir / metadata.source_file, metadata
def resolve_missing_videos(
    missing_ids: list[str],
    project_path: Path,
    config: Optional[ProjectConfig] = None,
) -> dict[str, VideoSource]:
    """
    For video IDs not found in the project's videos.json, look them up in
    shared_assets/videos.json. When a match is found the entry is written back
    into the project's videos.json with ``is_shared: true`` so subsequent runs
    find it without another lookup.

    Entries are interpreted the same way parse_videos does (named filter
    presets, begin/end trim points, numeric skip/take) so the first run and
    later runs produce the same VideoSource.

    If the project's videos.json exists but cannot be read as a JSON object it
    is left untouched: the resolved videos are still returned, but nothing is
    written back, so a damaged file is never overwritten.

    Args:
        missing_ids: Video IDs to look up.
        project_path: Project directory; shared_assets is searched inside it
            and then next to it.
        config: Optional project config supplying videos_path and
            default_filters.

    Returns:
        A dict of newly resolved VideoSource objects (only the ones found).
        IDs that aren't in the shared library either are silently ignored.

    Raises:
        ParseError: If a shared entry names a filter preset that is not in
            config.default_filters.
    """
    if not missing_ids:
        return {}

    # Locate shared_assets
    shared_dir: Optional[Path] = None
    if (project_path / "shared_assets").exists():
        shared_dir = project_path / "shared_assets"
    elif (project_path.parent / "shared_assets").exists():
        shared_dir = project_path.parent / "shared_assets"
    if shared_dir is None:
        return {}

    shared_videos_path = shared_dir / "videos.json"
    if not shared_videos_path.exists():
        return {}
    try:
        shared_data = _read_json(shared_videos_path)
    except (json.JSONDecodeError, OSError):
        return {}

    found = {vid_id for vid_id in missing_ids if vid_id in shared_data}
    if not found:
        return {}

    # Load the project's videos.json so we can append to it
    if config and config.videos_path:
        local_videos_path = project_path / config.videos_path
    else:
        local_videos_path = project_path / "videos.json"

    local_data: dict = {}
    can_persist = True
    if local_videos_path.exists():
        try:
            loaded = _read_json(local_videos_path)
        except (json.JSONDecodeError, OSError) as e:
            # Never replace a file we could not read with a fresh one.
            can_persist = False
            print(f" Warning: could not read videos.json, leaving it untouched: {e}")
        else:
            if isinstance(loaded, dict):
                local_data = loaded
            else:
                can_persist = False
                print(" Warning: videos.json is not a JSON object, leaving it untouched")

    default_filters = config.default_filters if config else {}

    resolved: dict[str, VideoSource] = {}
    for video_id in sorted(found):
        entry = dict(shared_data[video_id])
        entry["is_shared"] = True

        if can_persist:
            # Persist into the project's videos.json
            local_data[video_id] = entry
            print(f" → Copied shared video '{video_id}' into videos.json (is_shared=true)")

        # Build the in-memory VideoSource
        attribution = None
        if "attribution" in entry:
            attr = entry["attribution"]
            attribution = Attribution(
                source=attr.get("source", "unknown"),
                creator=attr.get("creator", "Unknown"),
                url=attr.get("url"),
            )

        # Resolve filter - can be a list or a string reference to default_filters
        filter_value = entry.get("filter", [])
        if isinstance(filter_value, str):
            if filter_value not in default_filters:
                raise ParseError(
                    f"Video '{video_id}' references unknown filter preset '{filter_value}'. "
                    f"Available presets: {list(default_filters.keys())}",
                    shared_videos_path,
                )
            filter_list = default_filters[filter_value]
        else:
            filter_list = filter_value

        # skip/take in seconds, with begin/end timestamps taking precedence.
        skip = float(entry.get("skip") or 0.0)
        raw_take = entry.get("take")
        take = float(raw_take) if raw_take not in (None, "") else None
        raw_begin = entry.get("begin")
        if raw_begin:
            skip = parse_timestamp(str(raw_begin))
        raw_end = entry.get("end")
        if raw_end:
            # take = end - begin (duration from begin to end)
            take = parse_timestamp(str(raw_end)) - skip

        raw_duration = entry.get("duration")
        raw_has_audio = entry.get("has_audio")
        resolved[video_id] = VideoSource(
            source_file=entry["source_file"],
            filter=filter_list,
            output_file=entry.get("output_file"),
            take=take,
            skip=skip,
            zoom=float(entry.get("zoom", 1.0)),
            cutout=entry.get("cutout"),
            always_visible=bool(entry.get("always_visible", False)),
            is_shared=True,
            pause_narration=float(entry.get("pause_narration", 0)),
            attribution=attribution,
            use_audio_channels=entry.get("use_audio_channels", "both"),
            defer_loudnorm=bool(entry.get("defer_loudnorm", False)),
            volume=float(entry.get("volume", 1.0)),
            layer=entry.get("layer", "above"),
            duration=float(raw_duration) if raw_duration is not None else None,
            has_audio=bool(raw_has_audio) if raw_has_audio is not None else None,
            end_on=entry.get("end_on"),
        )

    if can_persist:
        try:
            with open(local_videos_path, "w", encoding="utf-8") as fh:
                json.dump(local_data, fh, indent=4)
                fh.write("\n")
        except OSError as e:
            print(f" Warning: could not update videos.json: {e}")
    return resolved