Fixing some filter parallelism

This commit is contained in:
2026-05-12 08:04:45 +02:00
parent 994a2e0bb6
commit 409d7790c0
6 changed files with 92 additions and 18 deletions
+51 -14
View File
@@ -1256,6 +1256,16 @@ def cmd_validate(project_path: Path, verbose: bool) -> int:
# ============================================================================= # =============================================================================
def _resolve_process_cache(project_path: Path, config) -> Optional[Path]:
    """Locate this project's directory inside the configured process cache.

    Args:
        project_path: Project root. Its final path component names the
            per-project subdirectory, and relative cache settings are
            anchored to it.
        config: Parsed project configuration exposing a ``process_cache``
            attribute, or a falsy value when no configuration is available.

    Returns:
        ``<cache root>/<project directory name>``, or ``None`` when there is
        no configuration or ``process_cache`` is unset/empty.
    """
    configured = config.process_cache if config else None
    if not configured:
        return None

    cache_base = Path(configured)
    if cache_base.is_absolute():
        return cache_base / project_path.name

    # A relative setting is interpreted relative to the project directory
    # and normalised before the per-project subdirectory is appended.
    return (project_path / cache_base).resolve() / project_path.name
def cmd_preprocess( def cmd_preprocess(
project_path: Path, project_path: Path,
verbose: bool, verbose: bool,
@@ -1280,13 +1290,24 @@ def cmd_preprocess(
config = parse_project_config(project_path) config = parse_project_config(project_path)
# Narration directory — always media/narration/ # Narration directory — source files always in project media/narration/
narration_dir = project_path / "media" / "narration" narration_dir = project_path / "media" / "narration"
narration_dir.mkdir(parents=True, exist_ok=True) narration_dir.mkdir(parents=True, exist_ok=True)
raw_dir = narration_dir / "raw_mov" raw_dir = narration_dir / "raw_mov"
compressed_dir = narration_dir / "raw_mp4" compressed_dir = narration_dir / "raw_mp4"
processed_dir = narration_dir / "processed"
# process_cache: write processed outputs to external disk to save laptop space
cache_root = _resolve_process_cache(project_path, config)
if cache_root:
cache_narration_dir = cache_root / "narration"
cache_narration_dir.mkdir(parents=True, exist_ok=True)
(cache_narration_dir / "processed").mkdir(parents=True, exist_ok=True)
print(f" Using process cache: {cache_root}")
else:
cache_narration_dir = None
processed_dir = (cache_narration_dir or narration_dir) / "processed"
processed_dir.mkdir(parents=True, exist_ok=True) processed_dir.mkdir(parents=True, exist_ok=True)
# Resolve intermediate directory # Resolve intermediate directory
@@ -1362,7 +1383,10 @@ def cmd_preprocess(
output_file = f"{_subdir}/processed/{segment_id}_processed.mov" output_file = f"{_subdir}/processed/{segment_id}_processed.mov"
else: else:
output_file = f"processed/{segment_id}_processed.mov" output_file = f"processed/{segment_id}_processed.mov"
output_path = narration_dir / output_file # When process_cache is set, output goes to the cache dir; narration.json
# still records the relative path so stitch (also using cache) can find it.
output_base = cache_narration_dir or narration_dir
output_path = output_base / output_file
if output_path.exists() and not force: if output_path.exists() and not force:
print(f" {segment_id}: output exists, skipping (use --force to reprocess)") print(f" {segment_id}: output exists, skipping (use --force to reprocess)")
@@ -1420,7 +1444,7 @@ def cmd_preprocess(
def process_segment_task(task): def process_segment_task(task):
seg_id, seg_source = task seg_id, seg_source = task
preprocess_video( preprocess_video(
narration_dir, cache_narration_dir or narration_dir,
seg_id, seg_id,
seg_source, seg_source,
verbose=False, verbose=False,
@@ -1439,7 +1463,7 @@ def cmd_preprocess(
seg_id, seg_source = future.result() seg_id, seg_source = future.result()
completed += 1 completed += 1
print(f" Completed: {seg_id} ({completed}/{len(segments_to_process)})") print(f" Completed: {seg_id} ({completed}/{len(segments_to_process)})")
output_path = narration_dir / seg_source.output_file output_path = (cache_narration_dir or narration_dir) / seg_source.output_file
if output_path.exists(): if output_path.exists():
successfully_processed.append((seg_id, seg_source)) successfully_processed.append((seg_id, seg_source))
else: else:
@@ -1449,7 +1473,7 @@ def cmd_preprocess(
print(f" Output: {segment_source.output_file}") print(f" Output: {segment_source.output_file}")
print(f" Filters: {len(segment_source.filter)} step(s)") print(f" Filters: {len(segment_source.filter)} step(s)")
preprocess_video( preprocess_video(
narration_dir, cache_narration_dir or narration_dir,
segment_id, segment_id,
segment_source, segment_source,
verbose, verbose,
@@ -1457,7 +1481,7 @@ def cmd_preprocess(
gnommo_scratch, gnommo_scratch,
res=res, res=res,
) )
output_path = narration_dir / segment_source.output_file output_path = (cache_narration_dir or narration_dir) / segment_source.output_file
if output_path.exists(): if output_path.exists():
successfully_processed.append((segment_id, segment_source)) successfully_processed.append((segment_id, segment_source))
@@ -2039,15 +2063,26 @@ def cmd_stitch(
else: else:
videos_dir = project_path / "media" / "videos" videos_dir = project_path / "media" / "videos"
# When process_cache is set, redirect processed segment reads and combined output
cache_root = _resolve_process_cache(project_path, config)
if cache_root:
narration_dir = cache_root / "narration"
narration_dir.mkdir(parents=True, exist_ok=True)
videos_dir_out = cache_root / "videos"
videos_dir_out.mkdir(parents=True, exist_ok=True)
print(f" Using process cache: {cache_root}")
else:
videos_dir_out = videos_dir
# Use downscaled dirs for non-full res # Use downscaled dirs for non-full res
if res != "full": if res != "full":
cfg = RES_CONFIGS[res] cfg = RES_CONFIGS[res]
narration_dir = ensure_downscaled_files_exist( narration_dir = ensure_downscaled_files_exist(
narration_dir, res, force=False, verbose=verbose narration_dir, res, force=False, verbose=verbose
) )
videos_dir = videos_dir / cfg[2] videos_dir_out = videos_dir_out / cfg[2]
videos_dir.mkdir(parents=True, exist_ok=True) videos_dir_out.mkdir(parents=True, exist_ok=True)
print(f" Using {res} dirs: {narration_dir}, {videos_dir}") print(f" Using {res} dirs: {narration_dir}, {videos_dir_out}")
# Get segment IDs in sorted order # Get segment IDs in sorted order
segment_ids = sorted(narration.keys()) segment_ids = sorted(narration.keys())
@@ -2062,7 +2097,7 @@ def cmd_stitch(
trim_str = f" ({trim_info})" if trim_info else "" trim_str = f" ({trim_info})" if trim_info else ""
print(f" - {segment_id}{trim_str}") print(f" - {segment_id}{trim_str}")
stitch_output = videos_dir / "narration_combined.mov" stitch_output = videos_dir_out / "narration_combined.mov"
if stitch_output.exists() and not force: if stitch_output.exists() and not force:
print(f"\n Combined narration exists: {stitch_output.name}") print(f"\n Combined narration exists: {stitch_output.name}")
@@ -2085,12 +2120,14 @@ def cmd_stitch(
default_end_trim=config.default_end_trim if config else 0.0, default_end_trim=config.default_end_trim if config else 0.0,
loudnorm_config=_loudnorm_cfg, loudnorm_config=_loudnorm_cfg,
) )
# Run import videos again, because at this point narration_combined might have been created. # Run import videos again to update duration metadata (skip when using cache
_import_videos(videos_dir, config, verbose) # since narration_combined.mov lives on the external disk, not in videos_dir).
if not cache_root:
_import_videos(videos_dir_out, config, verbose)
# Always update the MAIN videos.json (parent of subdir when using low/tiny res) # Always update the MAIN videos.json (parent of subdir when using low/tiny res)
# Downscaled dirs only affect file paths, not JSON metadata updates # Downscaled dirs only affect file paths, not JSON metadata updates
main_videos_dir = videos_dir.parent if res != "full" else videos_dir main_videos_dir = videos_dir_out.parent if (res != "full" and not cache_root) else videos_dir
videos_json_path = main_videos_dir / "videos.json" videos_json_path = main_videos_dir / "videos.json"
if True: # Always update JSON regardless of proxy mode if True: # Always update JSON regardless of proxy mode
existing_videos: dict = {} existing_videos: dict = {}
+6
View File
@@ -56,6 +56,9 @@ class ProjectConfig:
gnommo_scratch: Optional[ gnommo_scratch: Optional[
str str
] = None # directory for intermediate files (e.g., external SSD) ] = None # directory for intermediate files (e.g., external SSD)
process_cache: Optional[
str
] = None # external directory for processed/combined outputs (saves laptop disk space)
default_begin: float = 0.0 # Trim this many seconds from the start of each segment (if no explicit begin/skip) default_begin: float = 0.0 # Trim this many seconds from the start of each segment (if no explicit begin/skip)
default_end_trim: float = 0.0 # Trim this many seconds from the end of each segment (if no explicit end/take) default_end_trim: float = 0.0 # Trim this many seconds from the end of each segment (if no explicit end/take)
# Outro sequence - plays after narration ends (not marker-triggered) # Outro sequence - plays after narration ends (not marker-triggered)
@@ -525,6 +528,9 @@ class RenderPlan:
output_path: Optional[ output_path: Optional[
Path Path
] = None # Final output file path (set after plan is built) ] = None # Final output file path (set after plan is built)
process_cache_dir: Optional[
Path
] = None # Per-project subdir in process_cache (e.g. /Volumes/GnommoDisk/video3)
# Slide layout configurations (hardcoded for POC) # Slide layout configurations (hardcoded for POC)
+1
View File
@@ -261,6 +261,7 @@ def parse_project_config(project_path: Path) -> ProjectConfig:
audio_source=data.get("audio_source"), audio_source=data.get("audio_source"),
main_video=data.get("main_video"), main_video=data.get("main_video"),
gnommo_scratch=data.get("gnommo_scratch"), gnommo_scratch=data.get("gnommo_scratch"),
process_cache=data.get("process_cache"),
default_begin=float(data.get("default_begin", 0.0)), default_begin=float(data.get("default_begin", 0.0)),
default_end_trim=float(data.get("default_end_trim", 0.0)), default_end_trim=float(data.get("default_end_trim", 0.0)),
outro=data.get("outro", []), outro=data.get("outro", []),
+10
View File
@@ -778,6 +778,8 @@ def apply_combined_video_filters(
[ [
"-i", "-i",
str(input_path), str(input_path),
"-filter_threads",
"1",
"-vf", "-vf",
video_filter, video_filter,
"-c:v", "-c:v",
@@ -1363,8 +1365,12 @@ def _process_chunk_to_prores4444(
cmd.extend(["-t", str(actual_take)]) cmd.extend(["-t", str(actual_take)])
# Video encode: ProRes 4444 with alpha # Video encode: ProRes 4444 with alpha
# -filter_threads 1: geq is serial anyway; limiting threads eliminates the N-way
# RGBA frame buffer explosion that causes OOM when chunk workers run in parallel.
cmd.extend( cmd.extend(
[ [
"-filter_threads",
"1",
"-vf", "-vf",
video_filter, video_filter,
"-c:v", "-c:v",
@@ -1689,6 +1695,8 @@ def apply_chroma_key(
[ [
"-i", "-i",
str(input_path), str(input_path),
"-filter_threads",
"1",
"-vf", "-vf",
video_filter, video_filter,
"-c:v", "-c:v",
@@ -1794,6 +1802,8 @@ def apply_mask(
[ [
"-i", "-i",
str(input_path), str(input_path),
"-filter_threads",
"1",
"-vf", "-vf",
video_filter, video_filter,
"-c:v", "-c:v",
+15 -4
View File
@@ -197,6 +197,7 @@ def _resolve_video_path(
video_source: VideoSource, video_source: VideoSource,
shared_assets_dir: Path = None, shared_assets_dir: Path = None,
project_path: Path = None, project_path: Path = None,
process_cache_dir: Path = None,
) -> Path: ) -> Path:
"""Resolve the actual video file path (output_file if exists, else source_file). """Resolve the actual video file path (output_file if exists, else source_file).
@@ -205,6 +206,7 @@ def _resolve_video_path(
If video_source.is_shared is True, looks in shared_assets_dir instead of videos_dir. If video_source.is_shared is True, looks in shared_assets_dir instead of videos_dir.
Uses gnommocache fallback if configured and project_path is provided. Uses gnommocache fallback if configured and project_path is provided.
When process_cache_dir is set, also checks {cache}/videos/ for the file.
""" """
from .cache import resolve_with_cache from .cache import resolve_with_cache
@@ -233,6 +235,12 @@ def _resolve_video_path(
elif webm_path.exists(): elif webm_path.exists():
return webm_path return webm_path
# Check process_cache_dir before falling back to source_file in project
if process_cache_dir and not video_source.is_shared:
cache_path = process_cache_dir / "videos" / video_source.source_file
if cache_path.exists():
return cache_path
# Fall back to source_file with cache fallback # Fall back to source_file with cache fallback
source_path = base_dir / video_source.source_file source_path = base_dir / video_source.source_file
if project_path: if project_path:
@@ -310,6 +318,9 @@ def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
shared_assets_dir = ( shared_assets_dir = (
plan.shared_assets_dir.resolve() if plan.shared_assets_dir else None plan.shared_assets_dir.resolve() if plan.shared_assets_dir else None
) )
process_cache_dir = (
plan.process_cache_dir.resolve() if plan.process_cache_dir else None
)
# Track input indices # Track input indices
input_idx = 0 input_idx = 0
@@ -319,7 +330,7 @@ def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
always_visible_inputs: list[int] = [] always_visible_inputs: list[int] = []
for video_id, video_source, cutout in plan.narration_videos: for video_id, video_source, cutout in plan.narration_videos:
video_path = _resolve_video_path( video_path = _resolve_video_path(
videos_dir, video_source, shared_assets_dir, project_path videos_dir, video_source, shared_assets_dir, project_path, process_cache_dir
) )
# Combine video skip setting with partial render offset # Combine video skip setting with partial render offset
total_seek = video_source.skip + plan.input_seek_time total_seek = video_source.skip + plan.input_seek_time
@@ -393,7 +404,7 @@ def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
for i, event in enumerate(plan.video_events): for i, event in enumerate(plan.video_events):
video_path = _resolve_video_path( video_path = _resolve_video_path(
videos_dir, event.video_source, shared_assets_dir, project_path videos_dir, event.video_source, shared_assets_dir, project_path, process_cache_dir
) )
skip = event.video_source.skip or 0.0 skip = event.video_source.skip or 0.0
if skip > 0: if skip > 0:
@@ -423,7 +434,7 @@ def build_ffmpeg_command(plan: RenderPlan, output_path: Path) -> list[str]:
for i, event in enumerate(plan.outro_events): for i, event in enumerate(plan.outro_events):
video_path = _resolve_video_path( video_path = _resolve_video_path(
videos_dir, event.video_source, shared_assets_dir, project_path videos_dir, event.video_source, shared_assets_dir, project_path, process_cache_dir
) )
skip = event.video_source.skip or 0.0 skip = event.video_source.skip or 0.0
if skip > 0: if skip > 0:
@@ -1103,7 +1114,7 @@ def build_filter_complex(
use_channels = first_video_source.use_audio_channels use_channels = first_video_source.use_audio_channels
if use_channels == "auto": if use_channels == "auto":
narration_path = _resolve_video_path( narration_path = _resolve_video_path(
videos_dir, first_video_source, shared_assets_dir, project_path videos_dir, first_video_source, shared_assets_dir, project_path, process_cache_dir
) )
use_channels = _resolve_auto_channel(narration_path) use_channels = _resolve_auto_channel(narration_path)
channel_filter = _build_audio_channel_filter(use_channels) channel_filter = _build_audio_channel_filter(use_channels)
+9
View File
@@ -586,6 +586,14 @@ def build_render_plan(
elif (project_path.parent / "shared_assets").exists(): elif (project_path.parent / "shared_assets").exists():
shared_assets_dir = project_path.parent / "shared_assets" shared_assets_dir = project_path.parent / "shared_assets"
# Resolve process_cache per-project directory
process_cache_dir = None
if config.process_cache:
_pc = Path(config.process_cache)
if not _pc.is_absolute():
_pc = (project_path / _pc).resolve()
process_cache_dir = _pc / project_path.name
narration_video = videos[narration_video_id] narration_video = videos[narration_video_id]
cutout = config.cutouts[narration_video.cutout] cutout = config.cutouts[narration_video.cutout]
@@ -800,6 +808,7 @@ def build_render_plan(
outro_events=outro_events, outro_events=outro_events,
narration_end_time=narration_end_time, narration_end_time=narration_end_time,
cached_files=cached_files, cached_files=cached_files,
process_cache_dir=process_cache_dir,
) )
return plan, marker_timings return plan, marker_timings