Add transcription and alignment pipeline

New commands:
- `transcribe`: Uses Whisper to generate word-level timestamps from video
- `align`: Matches manuscript markers to transcript, outputs transcript.csv

Workflow:
1. gnommo transcribe video.mov → video.transcript.json
2. gnommo align project/ → transcript.csv with markers at aligned times

Alignment uses fuzzy text matching: it takes the first phrase after each
marker in the manuscript and locates that phrase in the transcript. It then
applies a configurable offset (default -1s) so slides appear before speech.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-12 12:41:10 +01:00
parent 7f7425da46
commit 216131e072
4 changed files with 418 additions and 0 deletions
+1
View File
@@ -8,6 +8,7 @@ venv/
.venv/ .venv/
*.egg-info/ *.egg-info/
Video1/*
# OS # OS
.DS_Store .DS_Store
Thumbs.db Thumbs.db
+199
View File
@@ -0,0 +1,199 @@
"""Alignment stage: match manuscript markers to transcript timestamps."""
import csv
import re
from dataclasses import dataclass
from pathlib import Path
from .errors import GnommoError
from .transcriber import TranscribedWord
class AlignmentError(GnommoError):
    """Error type for failures in the alignment stage (a GnommoError subclass)."""
@dataclass
class MarkerAlignment:
    """Outcome of aligning a single manuscript marker against the transcript."""

    # Marker name as captured from the manuscript, without the square brackets.
    marker_id: str
    # Aligned time in seconds; align_markers stores -1.0 when no match was found.
    timestamp: float
    # The manuscript phrase that was searched for (align_markers keeps at most
    # its first 50 characters).
    matched_phrase: str
    # Match confidence in the range 0-1.
    confidence: float
def extract_marker_contexts(manuscript_text: str) -> list[tuple[str, str]]:
    """
    Pair every bracketed marker in the manuscript with the phrase following it.

    Args:
        manuscript_text: Manuscript text containing markers written as a name
            of letters, digits, and underscores in square brackets
            (for example ``[S1]``).

    Returns:
        List of (marker_id, following_text) tuples in manuscript order, where
        following_text is the text after the marker, stripped and shortened by
        ``_get_first_phrase``.
    """
    # Because the pattern has one capture group, re.split interleaves plain
    # text and marker ids:
    # [text_before, marker1, text_after1, marker2, text_after2, ...]
    pieces = re.split(r"\[([A-Za-z0-9_]+)\]", manuscript_text)
    marker_ids = pieces[1::2]
    trailing_texts = pieces[2::2]
    return [
        (marker_id, _get_first_phrase(trailing.strip()))
        for marker_id, trailing in zip(marker_ids, trailing_texts)
    ]
def _get_first_phrase(text: str, max_words: int = 10) -> str:
    """
    Return the opening phrase of *text*.

    Newlines are turned into spaces first. The text is then cut just before
    the first ``.``, ``!`` or ``?`` when that character occurs within the
    first 200 characters, and finally limited to ``max_words`` words joined by
    single spaces.
    """
    flattened = text.replace("\n", " ").strip()

    boundary = re.search(r"[.!?]", flattened)
    if boundary is not None:
        cut = boundary.start()
        # A sentence end further in than 200 characters is ignored; the word
        # limit below bounds the phrase length in that case.
        if cut < 200:
            flattened = flattened[:cut]

    return " ".join(flattened.split()[:max_words])
def normalize_text(text: str) -> str:
    """
    Normalize text for matching.

    Lowercases the input, drops every character that is neither a word
    character nor whitespace, and collapses whitespace runs to single spaces
    with no leading or trailing whitespace.
    """
    lowered = text.lower()
    without_punctuation = re.sub(r"[^\w\s]", "", lowered)
    # Splitting on whitespace and re-joining collapses runs and trims the ends.
    return " ".join(without_punctuation.split())
def find_phrase_in_transcript(
    phrase: str,
    transcript: list[TranscribedWord],
    start_from: int = 0,
) -> tuple[int, float]:
    """
    Find a phrase in the transcript and return the word index and timestamp.

    Both the phrase and the transcript words are passed through
    ``normalize_text`` before comparison. Matching happens in two passes:

    1. Prefixes of the phrase, from the full phrase down to three words, are
       compared against every transcript window of the same word count. A
       window matches when either normalized string contains the other.
    2. If no prefix matched and the phrase has at least two words, the first
       window of up to five words that contains both of the first two phrase
       words (as substrings) is accepted.

    A phrase with fewer than two words after normalization is never matched.

    Args:
        phrase: Manuscript phrase to look for.
        transcript: Word-level transcript; each entry provides ``word`` and
            ``start``.
        start_from: Index of the first transcript word to consider. Negative
            values are treated as 0.

    Returns:
        Tuple of (word_index, timestamp) for the first word of the matching
        window, or (-1, 0.0) if not found.
    """
    phrase_words = normalize_text(phrase).split()
    if not phrase_words:
        return -1, 0.0

    # Normalize each transcript word once instead of once per window and per
    # prefix length; the windows below are cheap slices of this list.
    normalized = [normalize_text(entry.word) for entry in transcript]
    total = len(normalized)
    # A negative start would make the window slices wrap around the list.
    first_index = max(start_from, 0)

    # Pass 1: longest phrase prefix first, down to three words.
    for length in range(len(phrase_words), 2, -1):
        target = " ".join(phrase_words[:length])
        for i in range(first_index, total - length + 1):
            window_text = " ".join(normalized[i:i + length])
            if target in window_text or window_text in target:
                return i, transcript[i].start

    # Pass 2: looser fallback on the first two phrase words only.
    if len(phrase_words) >= 2:
        first_word, second_word = phrase_words[0], phrase_words[1]
        for i in range(first_index, total - 2):
            # Slicing shortens the window automatically near the end.
            window_text = " ".join(normalized[i:i + 5])
            if first_word in window_text and second_word in window_text:
                return i, transcript[i].start

    return -1, 0.0
def align_markers(
    manuscript_text: str,
    transcript: list[TranscribedWord],
    offset_seconds: float = -1.0,
) -> list[MarkerAlignment]:
    """
    Align manuscript markers to transcript timestamps.

    Markers are processed in manuscript order, and each search starts at the
    transcript index where the previous marker was matched, so matched
    positions never move backwards.

    Args:
        manuscript_text: Full manuscript text with [S1], [S2] etc.
        transcript: Word-level transcript with timestamps.
        offset_seconds: Offset added to each found timestamp (default -1.0);
            the result is clamped so it never drops below 0.0.

    Returns:
        One MarkerAlignment per marker. Unmatched markers get timestamp -1.0
        and confidence 0.0; matched markers get confidence 1.0.
    """
    results: list[MarkerAlignment] = []
    search_start = 0

    for marker_id, phrase in extract_marker_contexts(manuscript_text):
        word_index, found_at = find_phrase_in_transcript(
            phrase, transcript, start_from=search_start
        )
        shown_phrase = phrase[:50]

        if not word_index >= 0:
            # No match: record the marker anyway so the caller can report it.
            results.append(MarkerAlignment(
                marker_id=marker_id,
                timestamp=-1.0,
                matched_phrase=shown_phrase,
                confidence=0.0,
            ))
            continue

        results.append(MarkerAlignment(
            marker_id=marker_id,
            # Shift by the offset (e.g. one second before the word), never below zero.
            timestamp=max(0.0, found_at + offset_seconds),
            matched_phrase=shown_phrase,
            confidence=1.0,
        ))
        search_start = word_index

    return results
def save_aligned_transcript(
    alignments: list[MarkerAlignment],
    transcript: list[TranscribedWord],
    output_path: Path,
) -> None:
    """
    Write the transcript words and aligned markers to a CSV file.

    The file has a ``t,word`` header followed by one row per transcript word
    and one ``[marker_id]`` row per alignment with a non-negative timestamp,
    ordered by time. Times are written with two decimals, for example:

        t,word
        0.00,Hello
        1.50,[S1]
        1.51,This
        ...

    Args:
        alignments: Marker alignments; entries with a negative timestamp
            (not found) are left out.
        transcript: Word-level transcript supplying ``start`` and ``word``.
        output_path: Destination CSV file.
    """
    word_rows = [(entry.start, entry.word) for entry in transcript]
    marker_rows = [
        (item.timestamp, f"[{item.marker_id}]")
        for item in alignments
        if item.timestamp >= 0
    ]

    # The sort is stable and words are listed first, so a word and a marker
    # with the same timestamp keep the word ahead of the marker.
    ordered = sorted(word_rows + marker_rows, key=lambda row: row[0])

    with open(output_path, "w", encoding="utf-8", newline="") as handle:
        out = csv.writer(handle)
        out.writerow(["t", "word"])
        out.writerows([f"{moment:.2f}", text] for moment, text in ordered)
+127
View File
@@ -18,6 +18,8 @@ from .parser import (
from .validator import validate_project from .validator import validate_project
from .transformer import build_render_plan from .transformer import build_render_plan
from .renderer import render, generate_ffmpeg_command_string from .renderer import render, generate_ffmpeg_command_string
from .transcriber import transcribe_video, save_transcript, load_transcript
from .aligner import align_markers, save_aligned_transcript
def main() -> int: def main() -> int:
@@ -87,6 +89,50 @@ def main() -> int:
help="Slide type for all slides (default: square)", help="Slide type for all slides (default: square)",
) )
# transcribe command
transcribe_parser = subparsers.add_parser(
"transcribe",
help="Transcribe video audio using Whisper",
)
transcribe_parser.add_argument(
"video",
type=Path,
help="Path to video file",
)
transcribe_parser.add_argument(
"-o", "--output",
type=Path,
help="Output JSON file (default: <video>.transcript.json)",
)
transcribe_parser.add_argument(
"--model",
default="base",
choices=["tiny", "base", "small", "medium", "large"],
help="Whisper model size (default: base)",
)
# align command
align_parser = subparsers.add_parser(
"align",
help="Align manuscript markers to transcript timestamps",
)
align_parser.add_argument(
"project",
type=Path,
help="Path to project directory",
)
align_parser.add_argument(
"--transcript",
type=Path,
help="Path to transcript JSON (default: media/talking_head.transcript.json)",
)
align_parser.add_argument(
"--offset",
type=float,
default=-1.0,
help="Seconds to offset marker times (default: -1.0)",
)
args = parser.parse_args() args = parser.parse_args()
try: try:
@@ -97,6 +143,11 @@ def main() -> int:
return cmd_render(args.project, output, args.verbose, args.dry_run) return cmd_render(args.project, output, args.verbose, args.dry_run)
elif args.command == "generate-slides": elif args.command == "generate-slides":
return cmd_generate_slides(args.directory, args.type) return cmd_generate_slides(args.directory, args.type)
elif args.command == "transcribe":
output = args.output or args.video.with_suffix(".transcript.json")
return cmd_transcribe(args.video, output, args.model)
elif args.command == "align":
return cmd_align(args.project, args.transcript, args.offset)
except GnommoError as e: except GnommoError as e:
print(f"Error: {e}", file=sys.stderr) print(f"Error: {e}", file=sys.stderr)
return 1 return 1
@@ -231,5 +282,81 @@ def cmd_generate_slides(directory: Path, slide_type: str) -> int:
return 0 return 0
def cmd_transcribe(video_path: Path, output_path: Path, model: str) -> int:
    """
    Run the ``transcribe`` command.

    Transcribes the video with ``transcribe_video``, prints a short summary
    (word count, end time of the last word, preview of the first ten words),
    and saves the words with ``save_transcript``.

    Args:
        video_path: Video file to transcribe.
        output_path: File the transcript is saved to.
        model: Whisper model name passed through to ``transcribe_video``.

    Returns:
        0 once the transcript has been saved.
    """
    print(f"Transcribing: {video_path}")
    print(f"Model: {model}")
    print()

    transcribed = transcribe_video(video_path, model=model)

    print(f" - Transcribed {len(transcribed)} words")
    if transcribed:
        # The end time of the last word stands in for the duration.
        print(f" - Duration: {transcribed[-1].end:.1f}s")
    else:
        print(" - No words found")

    save_transcript(transcribed, output_path)
    print(f" - Saved to: {output_path}")

    # Show first few words as preview
    if transcribed:
        leading_words = [entry.word for entry in transcribed[:10]]
        preview = " ".join(leading_words)
        print(f" - Preview: {preview}...")

    return 0
def cmd_align(project_path: Path, transcript_path: Path = None, offset: float = -1.0) -> int:
    """
    Run the ``align`` command for a project directory.

    Reads ``manuscript.txt`` from the project, loads the word-level
    transcript, aligns the manuscript markers to it, prints a per-marker
    report, and writes the result to ``transcript.csv`` in the project
    directory.

    Args:
        project_path: Project directory containing ``manuscript.txt``.
        transcript_path: Transcript JSON to use; when None,
            ``media/talking_head.transcript.json`` under the project is used.
        offset: Seconds added to each matched timestamp (passed to
            ``align_markers`` as ``offset_seconds``).

    Returns:
        0 on success, 1 when the manuscript or the transcript file is missing.
    """
    print(f"Aligning: {project_path}")
    print(f"Offset: {offset}s")
    print()

    # Load manuscript
    manuscript_file = project_path / "manuscript.txt"
    if not manuscript_file.exists():
        print("Error: manuscript.txt not found", file=sys.stderr)
        return 1
    manuscript = manuscript_file.read_text(encoding="utf-8")

    # Resolve the transcript location, falling back to the media folder.
    transcript_file = transcript_path
    if transcript_file is None:
        transcript_file = project_path / "media" / "talking_head.transcript.json"
    if not transcript_file.exists():
        print(f"Error: Transcript not found: {transcript_file}", file=sys.stderr)
        print("Run 'gnommo transcribe' first to generate the transcript.", file=sys.stderr)
        return 1

    print(f" - Loading transcript: {transcript_file}")
    words = load_transcript(transcript_file)
    print(f" - Loaded {len(words)} words")

    print(" - Aligning markers...")
    results = align_markers(manuscript, words, offset_seconds=offset)

    # Report each marker; a negative timestamp marks one that was not found.
    print()
    print("Alignment results:")
    missing = 0
    for item in results:
        if item.timestamp >= 0:
            print(f" [{item.marker_id}] @ {item.timestamp:.2f}s - \"{item.matched_phrase}...\"")
            continue
        print(f" [{item.marker_id}] NOT FOUND - \"{item.matched_phrase}...\"")
        missing += 1

    if missing > 0:
        print(f"\nWarning: {missing} markers could not be aligned")

    # The CSV is written even when some markers could not be aligned.
    csv_file = project_path / "transcript.csv"
    save_aligned_transcript(results, words, csv_file)
    print(f"\nSaved: {csv_file}")

    return 0
if __name__ == "__main__": if __name__ == "__main__":
sys.exit(main()) sys.exit(main())
+91
View File
@@ -0,0 +1,91 @@
"""Transcription stage: extract word-level timestamps from video audio."""
import json
import subprocess
from dataclasses import dataclass
from pathlib import Path
from .errors import GnommoError
@dataclass
class TranscribedWord:
    """One transcribed word together with its start and end time."""

    # The word text; transcribe_video strips surrounding whitespace.
    word: str
    # Time in seconds at which the word starts.
    start: float
    # Time in seconds at which the word ends.
    end: float
class TranscriptionError(GnommoError):
    """Error type for failures in the transcription stage (a GnommoError subclass)."""
def transcribe_video(video_path: Path, model: str = "base") -> list[TranscribedWord]:
    """
    Transcribe a video's audio with Whisper and return word-level timestamps.

    Args:
        video_path: Path to video file.
        model: Whisper model size (tiny, base, small, medium, large).

    Returns:
        List of TranscribedWord, one per word reported in the result's
        segments, in the order they appear.

    Raises:
        TranscriptionError: If the ``whisper`` package cannot be imported or
            ``video_path`` does not exist.
    """
    # Imported lazily so a missing package becomes a TranscriptionError here
    # instead of an ImportError when this module is loaded.
    try:
        import whisper
    except ImportError:
        raise TranscriptionError(
            "Whisper not installed. Run: pip install openai-whisper"
        )

    if not video_path.exists():
        raise TranscriptionError(f"Video file not found: {video_path}")

    print(f" Loading Whisper model '{model}'...")
    loaded_model = whisper.load_model(model)

    print(f" Transcribing {video_path.name}...")
    result = loaded_model.transcribe(
        str(video_path),
        word_timestamps=True,
        verbose=False,
    )

    # Flatten segments -> words; a missing "segments" or "words" key simply
    # contributes no words.
    return [
        TranscribedWord(
            word=word_info["word"].strip(),
            start=word_info["start"],
            end=word_info["end"],
        )
        for segment in result.get("segments", [])
        for word_info in segment.get("words", [])
    ]
def save_transcript(words: list[TranscribedWord], output_path: Path) -> None:
    """
    Write transcribed words to a JSON file.

    The file holds a JSON array with one object per word, carrying the keys
    ``word``, ``start`` and ``end``, indented by two spaces.

    Args:
        words: Words to save.
        output_path: Destination JSON file (UTF-8).
    """
    records = []
    for entry in words:
        records.append({"word": entry.word, "start": entry.start, "end": entry.end})

    serialized = json.dumps(records, indent=2)
    with open(output_path, "w", encoding="utf-8") as handle:
        handle.write(serialized)
def load_transcript(transcript_path: Path) -> list[TranscribedWord]:
    """
    Read transcribed words back from a JSON file.

    The file is expected to hold a JSON array of objects with the keys
    ``word``, ``start`` and ``end``.

    Args:
        transcript_path: JSON file to read (UTF-8).

    Returns:
        List of TranscribedWord in file order.

    Raises:
        TranscriptionError: If ``transcript_path`` does not exist.
    """
    if not transcript_path.exists():
        raise TranscriptionError(f"Transcript file not found: {transcript_path}")

    with open(transcript_path, "r", encoding="utf-8") as handle:
        records = json.load(handle)

    loaded = []
    for record in records:
        loaded.append(
            TranscribedWord(
                word=record["word"],
                start=record["start"],
                end=record["end"],
            )
        )
    return loaded