diff --git a/.gitignore b/.gitignore
index 11fa983..817c628 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,6 +8,7 @@ venv/
 .venv/
 *.egg-info/
 
+Video1/*
 # OS
 .DS_Store
 Thumbs.db
diff --git a/gnommo/aligner.py b/gnommo/aligner.py
new file mode 100644
index 0000000..abea290
--- /dev/null
+++ b/gnommo/aligner.py
@@ -0,0 +1,202 @@
+"""Alignment stage: match manuscript markers to transcript timestamps."""
+
+import csv
+import re
+from dataclasses import dataclass
+from pathlib import Path
+
+from .errors import GnommoError
+from .transcriber import TranscribedWord
+
+
+class AlignmentError(GnommoError):
+    """Error during alignment."""
+
+
+@dataclass
+class MarkerAlignment:
+    """A marker with its aligned timestamp."""
+
+    marker_id: str
+    timestamp: float  # seconds; -1.0 when no match was found
+    matched_phrase: str
+    confidence: float  # 0-1, how confident the match is
+
+
+def extract_marker_contexts(manuscript_text: str) -> list[tuple[str, str]]:
+    """
+    Extract markers and the text immediately following them.
+
+    Returns:
+        List of (marker_id, following_text) tuples in manuscript order;
+        following_text is reduced to its first phrase and may be empty
+    """
+    # With one capture group, re.split returns
+    # [text_before, marker1, text_after1, marker2, text_after2, ...],
+    # so markers sit at odd indices, each followed by its text.
+    parts = re.split(r"\[([A-Za-z0-9_]+)\]", manuscript_text)
+    return [
+        (marker_id, _get_first_phrase(following.strip()))
+        for marker_id, following in zip(parts[1::2], parts[2::2])
+    ]
+
+
+def _get_first_phrase(text: str, max_words: int = 10) -> str:
+    """Extract first phrase (up to first sentence end or max_words)."""
+    # Collapse line breaks so a phrase can span manuscript lines
+    text = text.replace("\n", " ").strip()
+
+    # Cut at the first sentence boundary, unless it is 200+ characters in
+    match = re.search(r"[.!?]", text)
+    if match and match.start() < 200:
+        text = text[: match.start()]
+
+    # Limit to max_words
+    return " ".join(text.split()[:max_words])
+
+
+def normalize_text(text: str) -> str:
+    """Normalize text for matching (lowercase, remove punctuation)."""
+    text = text.lower()
+    text = re.sub(r"[^\w\s]", "", text)
+    text = re.sub(r"\s+", " ", text)
+    return text.strip()
+
+
+def find_phrase_in_transcript(
+    phrase: str,
+    transcript: list[TranscribedWord],
+    start_from: int = 0,
+) -> tuple[int, float]:
+    """
+    Find a phrase in the transcript and return the word index and timestamp.
+
+    Both sides are normalized and compared as whole words. Progressively
+    shorter prefixes of the phrase (down to three words) are tried first;
+    if none matches, a looser fallback accepts the first position whose
+    five-word window contains the first two words of the phrase.
+
+    Args:
+        phrase: Manuscript text to look for
+        transcript: Word-level transcript with timestamps
+        start_from: Transcript index at which the search begins
+
+    Returns:
+        Tuple of (word_index, timestamp) or (-1, 0.0) if not found
+    """
+    phrase_words = normalize_text(phrase).split()
+    if not phrase_words:
+        return -1, 0.0
+
+    # Normalize each transcript word once instead of once per window
+    normalized = [normalize_text(entry.word) for entry in transcript]
+    total = len(normalized)
+    start = max(start_from, 0)  # a negative index would wrap around
+
+    # Try to find increasingly shorter prefixes (at least three words)
+    for length in range(len(phrase_words), 2, -1):
+        target = phrase_words[:length]
+        for i in range(start, total - length + 1):
+            # Compare word lists: substring matching on joined text would
+            # let "the cat sat" match inside "bathe cat sat"
+            if normalized[i : i + length] == target:
+                return i, transcript[i].start
+
+    # Fallback: both of the first two phrase words occur, as whole words,
+    # within the five transcript words starting at i
+    if len(phrase_words) >= 2:
+        first, second = phrase_words[:2]
+        for i in range(start, total - 2):
+            window = normalized[i : i + 5]
+            if first in window and second in window:
+                return i, transcript[i].start
+
+    return -1, 0.0
+
+
+def align_markers(
+    manuscript_text: str,
+    transcript: list[TranscribedWord],
+    offset_seconds: float = -1.0,
+) -> list[MarkerAlignment]:
+    """
+    Align manuscript markers to transcript timestamps.
+
+    Markers are processed in manuscript order and each search starts at
+    the previous match, so the search never moves backwards.
+
+    Args:
+        manuscript_text: Full manuscript text with [S1], [S2] etc.
+        transcript: Word-level transcript with timestamps
+        offset_seconds: Offset to apply to found timestamps (default -1.0)
+
+    Returns:
+        List of MarkerAlignment with timestamps, one per marker; markers
+        that could not be matched have timestamp -1.0 and confidence 0.0
+    """
+    contexts = extract_marker_contexts(manuscript_text)
+    alignments: list[MarkerAlignment] = []
+
+    last_index = 0
+
+    for marker_id, following_text in contexts:
+        idx, timestamp = find_phrase_in_transcript(
+            following_text, transcript, start_from=last_index
+        )
+
+        if idx >= 0:
+            # Apply offset (e.g., -1 second before the word)
+            adjusted_time = max(0.0, timestamp + offset_seconds)
+            alignments.append(MarkerAlignment(
+                marker_id=marker_id,
+                timestamp=adjusted_time,
+                matched_phrase=following_text[:50],
+                confidence=1.0,
+            ))
+            last_index = idx
+        else:
+            # Could not find match - report but continue
+            alignments.append(MarkerAlignment(
+                marker_id=marker_id,
+                timestamp=-1.0,  # Indicates not found
+                matched_phrase=following_text[:50],
+                confidence=0.0,
+            ))
+
+    return alignments
+
+
+def save_aligned_transcript(
+    alignments: list[MarkerAlignment],
+    transcript: list[TranscribedWord],
+    output_path: Path,
+) -> None:
+    """
+    Save aligned transcript as CSV compatible with gnommo's transcript.csv format.
+
+    Format:
+        t,word
+        0.00,Hello
+        1.50,[S1]
+        1.51,This
+        ...
+    """
+    # Build list of (timestamp, word) including markers
+    entries: list[tuple[float, str]] = [(w.start, w.word) for w in transcript]
+
+    # Add markers at their aligned positions (unmatched ones are skipped)
+    entries.extend(
+        (a.timestamp, f"[{a.marker_id}]")
+        for a in alignments
+        if a.timestamp >= 0
+    )
+
+    # Sort by timestamp; the sort is stable, so a marker that shares a
+    # timestamp with a word stays after that word
+    entries.sort(key=lambda entry: entry[0])
+
+    # Write CSV
+    with open(output_path, "w", encoding="utf-8", newline="") as f:
+        writer = csv.writer(f)
+        writer.writerow(["t", "word"])
+        writer.writerows((f"{t:.2f}", word) for t, word in entries)
diff --git a/gnommo/cli.py b/gnommo/cli.py
index 130e0db..b54c6b1 100644
--- a/gnommo/cli.py
+++ b/gnommo/cli.py
@@ -18,6 +18,8 @@ from .parser import (
 from .validator import validate_project
 from .transformer import build_render_plan
 from .renderer import render, generate_ffmpeg_command_string
+from .transcriber import transcribe_video, save_transcript, load_transcript
+from .aligner import align_markers, save_aligned_transcript
 
 
 def main() -> int:
@@ -87,6 +89,50 @@ def main() -> int:
         help="Slide type for all slides (default: square)",
     )
 
+    # transcribe command
+    transcribe_parser = subparsers.add_parser(
+        "transcribe",
+        help="Transcribe video audio using Whisper",
+    )
+    transcribe_parser.add_argument(
+        "video",
+        type=Path,
+        help="Path to video file",
+    )
+    transcribe_parser.add_argument(
+        "-o", "--output",
+        type=Path,
+        help="Output JSON file (default: