Files
gnommo/gnommo/parser.py
T
gitprov df900dfd59 Add gnommo.sh wrapper and fix slide/scaling issues
gnommo.sh:
- Bash wrapper for easy CLI usage
- Commands: validate, transcribe, align, render, all
- `gnommo.sh -p video1 all` runs full pipeline

Slide scaling:
- Slides now scale to full frame (1920x1080)
- Transparent areas show through to layers below
- Positioned at 0,0 for full overlay

targetheight percentage:
- Supports percentage values like "100%"
- Calculates actual height from frame resolution
- "100%" on 1080p = 1080px height

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 15:13:33 +01:00

232 lines
7.2 KiB
Python

"""Extract stage: parse all input files."""
import csv
import json
import re
from pathlib import Path
from typing import Any
from .errors import ParseError
from .models import (
ProjectConfig,
SlideDefinition,
TalkingHeadConfig,
TimedWord,
VideoSource,
)
def parse_manuscript(project_path: Path) -> tuple[str, list[str], list[tuple[int, str]]]:
    """
    Parse manuscript.txt and extract text content and slide markers.

    Args:
        project_path: Directory that contains manuscript.txt.

    Returns:
        Tuple of (full text, list of marker IDs found, list of malformed
        markers as (line_num, text)). Line numbers are 1-based.

    Raises:
        ParseError: If manuscript.txt does not exist.
    """
    manuscript_path = project_path / "manuscript.txt"
    if not manuscript_path.exists():
        raise ParseError("manuscript.txt not found", manuscript_path)
    text = manuscript_path.read_text(encoding="utf-8")

    # Extract all valid slide markers like [S1], [S2], etc.
    markers = re.findall(r"\[([A-Za-z0-9_]+)\]", text)

    # "[S" followed by digits: the start of something that looks like a marker.
    candidate_re = re.compile(r"\[S\d+")
    # Same shape as the marker regex above, used to recognise valid markers.
    well_formed_re = re.compile(r"\[[A-Za-z0-9_]+\]")
    # Markers with stray whitespace: [ S1], [S1 ] or [S 1].
    spaced_re = re.compile(r"\[\s+S\d+\s*\]|\[S\d+\s+\]|\[S\s+\d+\]")

    malformed: list[tuple[int, str]] = []
    for line_num, line in enumerate(text.split("\n"), start=1):
        spaced_hits = list(spaced_re.finditer(line))
        spaced_starts = {hit.start() for hit in spaced_hits}

        for candidate in candidate_re.finditer(line):
            # A marker such as [S1] or [S1_intro] is valid and already in
            # `markers`; it must not also be reported as malformed.
            if well_formed_re.match(line, candidate.start()):
                continue
            # [S1 ] is reported once, in full, by the spaced pass below.
            if candidate.start() in spaced_starts:
                continue
            # Anything else is an unclosed marker such as "[S1 some text".
            malformed.append((line_num, candidate.group()))

        malformed.extend((line_num, hit.group()) for hit in spaced_hits)

    return text, markers, malformed
def parse_transcript(project_path: Path) -> list[TimedWord]:
    """
    Parse transcript.csv into a list of timed words.

    Expected format:
        t,word
        0.00,This
        0.42,is
        ...

    Args:
        project_path: Directory that contains transcript.csv.

    Returns:
        One TimedWord per data row, in file order.

    Raises:
        ParseError: If the file is missing, lacks the `t`/`word` columns,
            or contains a row whose time is not a number or whose fields
            are missing.
    """
    transcript_path = project_path / "transcript.csv"
    if not transcript_path.exists():
        raise ParseError("transcript.csv not found", transcript_path)

    timed_words = []
    # newline="" is what the csv module expects so that it can handle line
    # endings (including newlines inside quoted fields) itself.
    with open(transcript_path, "r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        fieldnames = reader.fieldnames
        if fieldnames is None or "t" not in fieldnames or "word" not in fieldnames:
            raise ParseError(
                "transcript.csv must have columns: t, word",
                transcript_path,
            )
        for row in reader:
            try:
                time = float(row["t"])
                word = row["word"].strip()
                timed_words.append(TimedWord(time=time, word=word))
            except (ValueError, KeyError, TypeError, AttributeError) as e:
                # A row with too few fields yields None values, which raise
                # TypeError/AttributeError rather than ValueError.
                # reader.line_num is the physical line just read, so it stays
                # correct when blank lines are skipped.
                raise ParseError(
                    f"Invalid row: {e}",
                    transcript_path,
                    reader.line_num,
                ) from e
    return timed_words
def parse_project_config(project_path: Path) -> ProjectConfig:
    """
    Parse project.json into ProjectConfig.

    Missing keys fall back to defaults: talking head at (100, 100) with a
    target height of 200, resolution [1920, 1080], 30 fps, slide type
    "square", empty background video, and "slides.json" as the slides file.

    Args:
        project_path: Directory that contains project.json.

    Returns:
        The populated ProjectConfig.

    Raises:
        ParseError: If the file is missing, is not valid JSON, is not a JSON
            object, or has an invalid `talkinghead`, `targetheight` or
            `resolution` value.
    """
    config_path = project_path / "project.json"
    if not config_path.exists():
        raise ParseError("project.json not found", config_path)
    try:
        data = json.loads(config_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as e:
        raise ParseError(f"Invalid JSON: {e}", config_path) from e
    # Valid JSON can still be a list or scalar; every lookup below needs a dict.
    if not isinstance(data, dict):
        raise ParseError("project.json must contain a JSON object", config_path)

    # Parse talking head config
    th_data = data.get("talkinghead", {})
    if not isinstance(th_data, dict):
        raise ParseError("talkinghead must be an object", config_path)
    try:
        th_height, th_height_pct = _parse_dimension(th_data.get("targetheight", 200))
    except (ValueError, OverflowError) as e:
        # Report a bad value such as "tall" with the file it came from.
        raise ParseError(f"Invalid targetheight: {e}", config_path) from e
    talking_head = TalkingHeadConfig(
        x=th_data.get("x", 100),
        y=th_data.get("y", 100),
        target_height=th_height,
        target_height_percent=th_height_pct,
    )

    # Parse resolution
    resolution = data.get("resolution", [1920, 1080])
    if not isinstance(resolution, list) or len(resolution) != 2:
        raise ParseError("resolution must be [width, height]", config_path)

    return ProjectConfig(
        resolution=tuple(resolution),
        fps=data.get("fps", 30),
        talking_head=talking_head,
        default_slide_type=data.get("defaultSlideType", "square"),
        background_video=data.get("background_video", ""),
        slides_path=data.get("slides", "slides.json"),
        audio_source=data.get("audio_source"),
    )
def _parse_dimension(value: Any) -> tuple[int, float]:
    """
    Parse a dimension value (a pixel count or a string like '100%').

    Args:
        value: An int or float pixel count, a numeric string such as "250",
            or a percentage string such as "100%". Surrounding whitespace
            in strings is ignored.

    Returns:
        Tuple of (pixels, percentage). If pixels is -1, use percentage,
        which is a fraction ("100%" yields 1.0). A value of any other type
        yields the default (200, 0.0).

    Raises:
        ValueError: If a string is neither an integer nor a valid
            percentage, or a float is NaN.
        OverflowError: If a float is infinite.
    """
    if isinstance(value, int):
        return value, 0.0
    if isinstance(value, float):
        # JSON numbers such as 300.0 decode to float; truncate to whole
        # pixels instead of silently falling through to the default.
        return int(value), 0.0
    if isinstance(value, str):
        text = value.strip()
        if text.endswith("%"):
            return -1, float(text[:-1]) / 100.0
        return int(text), 0.0
    return 200, 0.0  # default
def parse_slides(project_path: Path, config: ProjectConfig = None) -> dict[str, SlideDefinition]:
    """
    Parse the slides JSON file into slide definitions.

    Args:
        project_path: Project directory.
        config: Optional project config. When given, its `slides_path`
            selects the file (relative to `project_path`) and its
            `default_slide_type` is used for slides without a "type".
            Without it, "slides.json" and "square" are used.

    Returns:
        Mapping of slide ID to SlideDefinition.

    Raises:
        ParseError: If the file is missing, is not valid JSON, is not a JSON
            object, or a slide entry is not an object or lacks "image".
    """
    if config and config.slides_path:
        slides_path = project_path / config.slides_path
    else:
        slides_path = project_path / "slides.json"
    if not slides_path.exists():
        raise ParseError(f"slides file not found: {slides_path}", slides_path)
    try:
        data = json.loads(slides_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as e:
        raise ParseError(f"Invalid JSON: {e}", slides_path) from e
    if not isinstance(data, dict):
        raise ParseError("slides file must contain a JSON object", slides_path)

    # Honour the project-wide default instead of always assuming "square".
    default_type = (config.default_slide_type if config else None) or "square"

    slides = {}
    for slide_id, slide_data in data.items():
        # Without this check a string entry would pass the "image" test by
        # substring match, and a number would raise TypeError.
        if not isinstance(slide_data, dict):
            raise ParseError(
                f"Slide '{slide_id}' must be an object",
                slides_path,
            )
        if "image" not in slide_data:
            raise ParseError(
                f"Slide '{slide_id}' missing required field 'image'",
                slides_path,
            )
        slides[slide_id] = SlideDefinition(
            image=slide_data["image"],
            type=slide_data.get("type", default_type),
        )
    return slides
def parse_videos(project_path: Path) -> dict[str, VideoSource]:
    """
    Parse videos.json into video source definitions.

    Args:
        project_path: Directory that contains videos.json.

    Returns:
        Mapping of video ID to VideoSource. A missing "preprocess" entry
        becomes an empty list.

    Raises:
        ParseError: If the file is missing, is not valid JSON, is not a JSON
            object, or a video entry is not an object or lacks "file".
    """
    videos_path = project_path / "videos.json"
    if not videos_path.exists():
        raise ParseError("videos.json not found", videos_path)
    try:
        data = json.loads(videos_path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as e:
        raise ParseError(f"Invalid JSON: {e}", videos_path) from e
    if not isinstance(data, dict):
        raise ParseError("videos.json must contain a JSON object", videos_path)

    videos = {}
    for video_id, video_data in data.items():
        # Without this check a string entry would pass the "file" test by
        # substring match, and a number would raise TypeError.
        if not isinstance(video_data, dict):
            raise ParseError(
                f"Video '{video_id}' must be an object",
                videos_path,
            )
        if "file" not in video_data:
            raise ParseError(
                f"Video '{video_id}' missing required field 'file'",
                videos_path,
            )
        videos[video_id] = VideoSource(
            file=video_data["file"],
            preprocess=video_data.get("preprocess", []),
        )
    return videos
def get_video_duration(video_path: Path) -> float:
    """
    Get duration of a video file using ffprobe.

    Args:
        video_path: Path to the video file to inspect.

    Returns:
        The value ffprobe prints for `format=duration`, as a float.

    Raises:
        ParseError: If ffprobe cannot be started, exits with a non-zero
            status, or prints something that is not a number.
    """
    import subprocess

    cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        str(video_path),
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as e:
        # Raised when the ffprobe executable is missing or not runnable.
        raise ParseError(f"Failed to run ffprobe: {e}", video_path) from e
    if result.returncode != 0:
        raise ParseError(f"Failed to get duration: {result.stderr}", video_path)

    raw = result.stdout.strip()
    try:
        return float(raw)
    except ValueError as e:
        # A zero exit status does not guarantee numeric output; the output
        # may be empty or a placeholder.
        raise ParseError(
            f"Failed to get duration: unexpected ffprobe output {raw!r}",
            video_path,
        ) from e