# Source: gnommo/gnommo/parser.py (Python, 652 lines, 23 KiB)
"""Extract stage: parse all input files."""
import json
import re
from pathlib import Path
from typing import Any, Optional
from .cache import resolve_with_cache
from .errors import ParseError
from .models import (
Attribution,
AudioDefinition,
Citation,
CutoutDefinition,
ProjectConfig,
SlideDefinition,
VideoMetadata,
VideoSource,
)
def _read_json(path: Path) -> Any:
"""Read and parse a JSON file, treating an empty file as {}."""
text = path.read_text(encoding="utf-8").strip()
return json.loads(text) if text else {}
def parse_manuscript(
    project_path: Path,
) -> tuple[str, list[str], list[tuple[int, str]], list[Citation]]:
    """
    Read manuscript.txt and pull out its text, markers and citations.

    [cite:...], [marker:...] and [cue:...] tags are removed from the returned
    text. Citations are collected (via parse_citations) before the removal;
    marker/cue tags are dropped without being recorded.

    Returns:
        Tuple of (cleaned text, marker IDs found in the cleaned text,
        malformed markers as (1-based line number, matched text), citations).

    Raises:
        ParseError: if manuscript.txt does not exist under project_path.
    """
    manuscript_path = project_path / "manuscript.txt"
    if not manuscript_path.exists():
        raise ParseError("manuscript.txt not found", manuscript_path)
    raw_text = manuscript_path.read_text(encoding="utf-8")

    # Citations have to be harvested first: the stripping below removes them.
    citations = parse_citations(raw_text)

    # Remove cite / marker / cue tags, in that order.
    text = raw_text
    for tag_pattern in (
        r"\[cite:[^\]]+\]",
        r"\[marker:[^\]]+\]",
        r"\[cue:[^\]]+\]",
    ):
        text = re.sub(tag_pattern, "", text)

    # Bracketed identifiers such as [S1], [video:demo], [Zoom2]. "." is allowed
    # so that markers carrying a file extension are still captured.
    markers = re.findall(r"\[([A-Za-z0-9_:.]+)\]", text)

    # "[S<digits>" that is not immediately closed by "]".
    unclosed_re = re.compile(r"\[S\d+")
    # Slide markers with stray whitespace: "[ S1]", "[S1 ]", "[S 1]".
    spaced_re = re.compile(r"\[\s+S\d+\s*\]|\[S\d+\s+\]|\[S\s+\d+\]")

    malformed: list[tuple[int, str]] = []
    for line_num, line in enumerate(text.split("\n"), start=1):
        for hit in unclosed_re.finditer(line):
            # startswith() with an offset past the end is simply False, which
            # covers the "marker runs to end of line" case.
            if not line.startswith("]", hit.end()):
                malformed.append((line_num, hit.group()))
        malformed.extend((line_num, found) for found in spaced_re.findall(line))

    return text, markers, malformed, citations
def parse_citations(manuscript_text: str) -> list[Citation]:
    """
    Collect every [cite:...] marker in the manuscript text.

    The text after 'cite:' (whitespace-trimmed) is the literal reference. Each
    citation also carries a short context snippet taken from the text that
    follows the marker.

    Returns:
        List of Citation objects (reference, marker_id "cite:<reference>",
        context) in order of appearance.
    """
    found: list[Citation] = []
    # Marker content may be anything except a closing bracket.
    for cite in re.finditer(r"\[cite:([^\]]+)\]", manuscript_text):
        reference = cite.group(1).strip()

        # Look at up to 150 characters following the marker ...
        tail_start = cite.end()
        window = manuscript_text[tail_start : tail_start + 150]
        # ... and keep what precedes the next "[", blank line, or end of window.
        lead = re.match(r"([^\[]*?)(?:\[|\n\n|$)", window)
        context = lead.group(1).strip() if lead else ""

        # Cap the snippet at 50 characters (47 + ellipsis).
        if len(context) > 50:
            context = context[:47] + "..."

        found.append(
            Citation(
                reference=reference,
                marker_id=f"cite:{reference}",
                context=context,
            )
        )
    return found
def save_citations(citations: list[Citation], path: Path) -> None:
    """Write citations to ``path`` as a JSON array of reference/context objects."""
    records = []
    for citation in citations:
        # marker_id is not stored; only reference and context are written.
        records.append(
            {"reference": citation.reference, "context": citation.context}
        )
    serialized = json.dumps(records, indent=2)
    path.write_text(serialized, encoding="utf-8")
def load_citations(path: Path) -> list[Citation]:
    """
    Read citations back from a JSON file.

    Returns an empty list when ``path`` does not exist. Each entry must have a
    "reference" key; "context" defaults to "". The marker_id is rebuilt as
    "cite:<reference>".
    """
    if not path.exists():
        return []

    loaded: list[Citation] = []
    for entry in _read_json(path):
        reference = entry["reference"]
        loaded.append(
            Citation(
                reference=reference,
                marker_id=f"cite:{reference}",
                context=entry.get("context", ""),
            )
        )
    return loaded
def parse_project_config(project_path: Path) -> ProjectConfig:
    """
    Build a ProjectConfig from project.json in ``project_path``.

    Raises:
        ParseError: if project.json is missing, is not valid JSON, or its
            "resolution" is not a two-element list.
    """
    config_path = project_path / "project.json"
    if not config_path.exists():
        raise ParseError("project.json not found", config_path)

    try:
        data = _read_json(config_path)
    except json.JSONDecodeError as exc:
        raise ParseError(f"Invalid JSON: {exc}", config_path)

    # Cutouts: named zones, each dimension given in pixels or as a "NN%" string.
    cutouts: dict[str, CutoutDefinition] = {}
    for name, spec in data.get("cutouts", {}).items():
        x_px, x_frac = _parse_dimension(spec.get("x", 0))
        y_px, y_frac = _parse_dimension(spec.get("y", 0))
        h_px, h_frac = _parse_dimension(spec.get("height", 200))
        # A missing width falls back to the height value (square cutout).
        width_spec = spec.get("width", spec.get("height", 200))
        w_px, w_frac = _parse_dimension(width_spec)
        cutouts[name] = CutoutDefinition(
            x=x_px,
            y=y_px,
            height=h_px,
            width=w_px,
            x_percent=x_frac,
            y_percent=y_frac,
            height_percent=h_frac,
            width_percent=w_frac,
        )

    # Resolution must be a [width, height] pair.
    resolution = data.get("resolution", [1920, 1080])
    if not (isinstance(resolution, list) and len(resolution) == 2):
        raise ParseError("resolution must be [width, height]", config_path)

    # Named filter presets, passed through as-is.
    default_filters: dict[str, list[dict]] = data.get("default_filters", {})

    return ProjectConfig(
        resolution=tuple(resolution),
        fps=data.get("fps", 30),
        default_slide_type=data.get("defaultSlideType", "square"),
        cutouts=cutouts,
        default_filters=default_filters,
        background=data.get("background", ""),
        background_video=data.get("background_video", ""),  # Deprecated
        slides_path=data.get("slides", "slides.json"),
        videos_path=data.get("videos", "videos.json"),
        audio_path=data.get("audio", "audio.json"),
        audio_source=data.get("audio_source"),
        main_video=data.get("main_video"),
        gnommo_scratch=data.get("gnommo_scratch"),
        default_begin=float(data.get("default_begin", 0.0)),
        default_end_trim=float(data.get("default_end_trim", 0.0)),
        outro=data.get("outro", []),
        description=data.get("description", ""),
        footer=data.get("footer", ""),
        output_video=data.get("output_video", ""),
    )
def _parse_dimension(value: Any) -> tuple[int, float]:
"""
Parse a dimension value (can be int or string like '100%').
Returns:
Tuple of (pixels, percentage). If pixels is -1, use percentage.
"""
if isinstance(value, int):
return value, 0.0
if isinstance(value, str):
if value.endswith("%"):
pct = float(value[:-1]) / 100.0
return -1, pct
return int(value), 0.0
return 200, 0.0 # default
def parse_slides(
    project_path: Path, config: Optional[ProjectConfig] = None
) -> dict[str, SlideDefinition]:
    """
    Parse the slides JSON file into slide definitions.

    The file is ``config.slides_path`` (relative to project_path) when a config
    with a non-empty slides_path is given, otherwise ``slides.json``. The path
    is passed through resolve_with_cache before being read.

    A slide without an explicit "type" gets the config's default_slide_type
    when a config is supplied, and "square" otherwise.

    Returns:
        Dict mapping slide ID to SlideDefinition.

    Raises:
        ParseError: if the slides file is missing, is not valid JSON, or a
            slide entry lacks the required "image" field.
    """
    if config and config.slides_path:
        local_slides_path = project_path / config.slides_path
    else:
        local_slides_path = project_path / "slides.json"

    # Try cache fallback for reading JSON
    slides_path, _ = resolve_with_cache(local_slides_path, project_path)
    if not slides_path.exists():
        raise ParseError(f"slides file not found: {local_slides_path}", local_slides_path)

    try:
        data = _read_json(slides_path)
    except json.JSONDecodeError as e:
        raise ParseError(f"Invalid JSON: {e}", slides_path) from e

    # Honour the project-level default instead of hard-coding "square"; an
    # unset or empty default still falls back to "square".
    default_type = "square"
    if config:
        default_type = getattr(config, "default_slide_type", None) or default_type

    slides = {}
    for slide_id, slide_data in data.items():
        if "image" not in slide_data:
            raise ParseError(
                f"Slide '{slide_id}' missing required field 'image'", slides_path
            )
        slides[slide_id] = SlideDefinition(
            image=slide_data["image"],
            type=slide_data.get("type", default_type),
        )
    return slides
def parse_audio(
    project_path: Path, config: Optional[ProjectConfig] = None
) -> tuple[dict[str, AudioDefinition], Path]:
    """
    Parse the audio JSON file into audio definitions.

    The file is ``config.audio_path`` (relative to project_path) when a config
    with a non-empty audio_path is given, otherwise ``audio.json``. A missing
    file is not an error: audio is optional.

    Returns:
        Tuple of (audio dict, audio_dir) where audio_dir is the local directory
        of the audio JSON path (for resolving relative file paths).

    Raises:
        ParseError: if the file is not valid JSON or an entry lacks "file".
    """
    manifest_name = (
        config.audio_path if (config and config.audio_path) else "audio.json"
    )
    local_audio_path = project_path / manifest_name
    # The directory is taken from the local path, not the cache-resolved one.
    audio_dir = local_audio_path.parent

    audio_path, _ = resolve_with_cache(local_audio_path, project_path)
    if not audio_path.exists():
        return {}, audio_dir

    try:
        data = _read_json(audio_path)
    except json.JSONDecodeError as exc:
        raise ParseError(f"Invalid JSON: {exc}", audio_path)

    audio = {}
    for audio_id, audio_data in data.items():
        if "file" not in audio_data:
            raise ParseError(
                f"Audio '{audio_id}' missing required field 'file'", audio_path
            )

        # "overlap" is a timestamp string such as "10s"; absent or empty -> None.
        has_overlap = "overlap" in audio_data and audio_data["overlap"]
        overlap = parse_timestamp(audio_data["overlap"]) if has_overlap else None

        audio[audio_id] = AudioDefinition(
            file=audio_data["file"],
            volume=float(audio_data.get("volume", 1.0)),
            loop=bool(audio_data.get("loop", False)),
            overlap=overlap,
            ignore_pauses=bool(audio_data.get("ignore_pauses", False)),
        )
    return audio, audio_dir
def parse_timestamp(value: Any) -> float:
    """
    Parse a timestamp into seconds.

    Supported formats:
        - "3.5s" or "3.5" → 3.5 seconds
        - "2:54" → 2 minutes 54 seconds (174.0)
        - "1:23:45" → 1 hour 23 minutes 45 seconds
        - "2:54.5" → 2 minutes 54.5 seconds
        - a plain int/float (e.g. an unquoted JSON number) → that many seconds

    Any falsy value (None, "", 0) yields 0.0.

    Returns:
        Time in seconds as a float.

    Raises:
        ParseError: if the string has more than three colon-separated parts.
        ValueError: if a component is not a number.
    """
    if not value:
        return 0.0

    # JSON authors often write `"begin": 3.5` without quotes; accept it rather
    # than failing on .strip(). bool is excluded: it is an int subclass but
    # never a meaningful timestamp.
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return float(value)

    value = value.strip()
    # Remove trailing 's' if present (e.g., "3.5s")
    if value.endswith("s"):
        value = value[:-1]

    # Colon-separated format (MM:SS or HH:MM:SS)
    if ":" in value:
        parts = value.split(":")
        if len(parts) == 2:
            minutes, seconds = parts
            return float(minutes) * 60 + float(seconds)
        if len(parts) == 3:
            hours, minutes, seconds = parts
            return float(hours) * 3600 + float(minutes) * 60 + float(seconds)
        raise ParseError(f"Invalid timestamp format: {value}", None)

    # Plain number (seconds)
    return float(value)
def parse_videos(
    project_path: Path, config: Optional[ProjectConfig] = None
) -> tuple[dict[str, VideoSource], Path]:
    """
    Parse the videos JSON file into video source definitions.

    The file is ``config.videos_path`` (relative to project_path) when a config
    with a non-empty videos_path is given, otherwise ``videos.json``.

    "filter" is either an inline list of filter configs or a string naming a
    preset in config.default_filters.

    Trim points are given as skip/take (raw seconds) or as begin/end timestamp
    strings ("3.5s", "2:54", "1:23:45"). A non-empty begin replaces skip; a
    non-empty end sets take to end minus the effective skip.

    Returns:
        Tuple of (videos dict, videos_dir) where videos_dir is the local
        directory of the videos JSON path (for resolving relative file paths).

    Raises:
        ParseError: if the file is missing or not valid JSON, an entry lacks
            "source_file", or a named filter preset is unknown.
    """
    manifest_name = (
        config.videos_path if (config and config.videos_path) else "videos.json"
    )
    local_videos_path = project_path / manifest_name
    # The directory is taken from the local path, not the cache-resolved one.
    videos_dir = local_videos_path.parent

    videos_path, _ = resolve_with_cache(local_videos_path, project_path)
    if not videos_path.exists():
        raise ParseError(f"videos.json not found: {local_videos_path}", local_videos_path)

    try:
        data = _read_json(videos_path)
    except json.JSONDecodeError as exc:
        raise ParseError(f"Invalid JSON: {exc}", videos_path)

    # Presets are only available when a config was supplied.
    default_filters = config.default_filters if config else {}

    videos = {}
    for video_id, video_data in data.items():
        if "source_file" not in video_data:
            raise ParseError(
                f"Video '{video_id}' missing required field 'source_file'", videos_path
            )

        # Optional attribution block.
        attribution = None
        if "attribution" in video_data:
            credit = video_data["attribution"]
            attribution = Attribution(
                source=credit.get("source", "unknown"),
                creator=credit.get("creator", "Unknown"),
                url=credit.get("url"),
            )

        # Inline filter list, or the name of a preset to look up.
        filter_value = video_data.get("filter", [])
        if not isinstance(filter_value, str):
            filter_list = filter_value
        elif filter_value in default_filters:
            filter_list = default_filters[filter_value]
        else:
            raise ParseError(
                f"Video '{video_id}' references unknown filter preset '{filter_value}'. "
                f"Available presets: {list(default_filters.keys())}",
                videos_path,
            )

        # skip/take first, then let non-empty begin/end override them.
        skip = video_data.get("skip", 0.0)
        take = video_data.get("take")
        if "begin" in video_data and video_data["begin"]:
            skip = parse_timestamp(video_data["begin"])
        if "end" in video_data and video_data["end"]:
            # Duration runs from the effective skip point to the end timestamp.
            take = parse_timestamp(video_data["end"]) - skip

        videos[video_id] = VideoSource(
            source_file=video_data["source_file"],
            filter=filter_list,
            output_file=video_data.get("output_file"),
            take=take,
            skip=skip,
            zoom=video_data.get("zoom", 1.0),
            cutout=video_data.get("cutout"),
            always_visible=video_data.get("always_visible", False),
            is_shared=video_data.get("is_shared", False),
            pause_narration=float(video_data.get("pause_narration", 0)),
            attribution=attribution,
            use_audio_channels=video_data.get("use_audio_channels", "both"),
            defer_loudnorm=video_data.get("defer_loudnorm", False),
            volume=float(video_data.get("volume", 1.0)),
        )
    return videos, videos_dir
def parse_narration(
    project_path: Path, config: Optional[ProjectConfig] = None
) -> tuple[dict[str, VideoSource], Path]:
    """
    Parse narration.json into narration segment definitions.

    The file is always looked up at media/narration/narration.json under
    project_path. A missing file is not an error: narration is optional.

    "filter" is either an inline list of filter configs or a string naming a
    preset in config.default_filters.

    Trim points are given as skip/take (raw seconds) or as begin/end timestamp
    strings ("3.5s", "2:54", "1:23:45"). When a segment has no "skip", the
    config's default_begin is used (0.0 without a config). A non-empty begin
    replaces skip; a non-empty end sets take to end minus the effective skip.

    Returns:
        Tuple of (narration dict, narration_dir) where narration_dir is
        project_path/media/narration (for resolving relative file paths).

    Raises:
        ParseError: if the file is not valid JSON, a segment lacks
            "source_file", or a named filter preset is unknown.
    """
    narration_dir = project_path / "media" / "narration"
    local_narration_path = narration_dir / "narration.json"

    narration_path, _ = resolve_with_cache(local_narration_path, project_path)
    if not narration_path.exists():
        return {}, narration_dir

    try:
        data = _read_json(narration_path)
    except json.JSONDecodeError as exc:
        raise ParseError(f"Invalid JSON: {exc}", narration_path)

    # Presets are only available when a config was supplied.
    default_filters = config.default_filters if config else {}

    narration = {}
    for segment_id, segment_data in data.items():
        if "source_file" not in segment_data:
            raise ParseError(
                f"Narration segment '{segment_id}' missing required field 'source_file'",
                narration_path,
            )

        # Inline filter list, or the name of a preset to look up.
        filter_value = segment_data.get("filter", [])
        if not isinstance(filter_value, str):
            filter_list = filter_value
        elif filter_value in default_filters:
            filter_list = default_filters[filter_value]
        else:
            raise ParseError(
                f"Narration segment '{segment_id}' references unknown filter preset '{filter_value}'. "
                f"Available presets: {list(default_filters.keys())}",
                narration_path,
            )

        # Project-level default applies only when the segment has no "skip".
        default_begin = config.default_begin if config else 0.0
        skip = segment_data.get("skip", default_begin)
        take = segment_data.get("take")
        # Non-empty begin/end override whatever was chosen above.
        if "begin" in segment_data and segment_data["begin"]:
            skip = parse_timestamp(segment_data["begin"])
        if "end" in segment_data and segment_data["end"]:
            # Duration runs from the effective skip point to the end timestamp.
            take = parse_timestamp(segment_data["end"]) - skip

        narration[segment_id] = VideoSource(
            source_file=segment_data["source_file"],
            filter=filter_list,
            output_file=segment_data.get("output_file"),
            take=take,
            skip=skip,
            zoom=segment_data.get("zoom", 1.0),
            cutout=segment_data.get("cutout"),
            always_visible=segment_data.get("always_visible", False),
            use_audio_channels=segment_data.get("use_audio_channels", "both"),
            defer_loudnorm=segment_data.get("defer_loudnorm", False),
            volume=float(segment_data.get("volume", 1.0)),
        )
    return narration, narration_dir
def get_video_duration(video_path: Path) -> float:
    """
    Get the duration of a media file in seconds using ffprobe.

    Runs ffprobe asking only for the container-level ``duration`` entry,
    printed as a bare value.

    Raises:
        ParseError: if ffprobe exits with a non-zero status, or if its output
            is not a number (for example empty output or "N/A").
    """
    import subprocess

    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        str(video_path),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise ParseError(f"Failed to get duration: {result.stderr}", video_path)

    raw_duration = result.stdout.strip()
    try:
        return float(raw_duration)
    except ValueError as e:
        # ffprobe can succeed yet print no usable duration; report that as a
        # ParseError tied to the file instead of leaking a bare ValueError.
        raise ParseError(
            f"Failed to get duration: unexpected ffprobe output {raw_duration!r}",
            video_path,
        ) from e
def parse_video_metadata(metadata_path: Path) -> VideoMetadata:
    """
    Load a video metadata JSON file into a VideoMetadata.

    Expected format:
    {
        "source_file": "talking_head.mov",
        "preprocess": [
            {"type": "chroma_key", "color": [0, 255, 0], "similarity": 0.15}
        ],
        "output": {
            "file": "intermediate/talking_head_rgba.mov",
            "colorspace": "rgba",
            "alpha": "straight"
        }
    }

    "source_file" is required; "preprocess" defaults to an empty list and
    "output" to None.

    Raises:
        ParseError: if the file is missing, is not valid JSON, or lacks
            "source_file".
    """
    if not metadata_path.exists():
        raise ParseError(f"Video metadata not found: {metadata_path}", metadata_path)

    try:
        payload = _read_json(metadata_path)
    except json.JSONDecodeError as exc:
        raise ParseError(f"Invalid JSON: {exc}", metadata_path)

    if "source_file" not in payload:
        raise ParseError(
            "Video metadata missing required field 'source_file'", metadata_path
        )

    preprocess_steps = payload.get("preprocess", [])
    output_spec = payload.get("output")
    return VideoMetadata(
        source_file=payload["source_file"],
        preprocess=preprocess_steps,
        output=output_spec,
    )
def resolve_video_file(
    project_path: Path, file_ref: str
) -> tuple[Path, Optional[VideoMetadata]]:
    """
    Turn a video file reference into the actual path to use.

    ``file_ref`` (relative to project_path) is either:
        1. a direct path to a video file, or
        2. a path to an existing metadata JSON file (name ends in ".json").

    For a metadata file, paths inside it are resolved relative to the metadata
    file's directory: the declared output file is used if it exists on disk,
    otherwise the source file.

    Returns:
        Tuple of (video path to use, metadata if a JSON file was used else None).
    """
    ref_path = project_path / file_ref

    # Anything that is not an existing .json file is a direct video reference.
    if not (file_ref.endswith(".json") and ref_path.exists()):
        return ref_path, None

    metadata = parse_video_metadata(ref_path)
    base_dir = ref_path.parent

    # Prefer the processed output, but only when it is already on disk.
    if metadata.output and metadata.output.get("file"):
        processed = base_dir / metadata.output["file"]
        if processed.exists():
            return processed, metadata

    # Otherwise fall back to the original source file.
    return base_dir / metadata.source_file, metadata