#!/opt/homebrew/bin/python3.11
"""
CUT Processor - Scene Detection & Video Quality Check

Uses ffprobe for video analysis. Produces at least 1 scene whenever the
frame count of the video can be determined.
"""

import json
import argparse
import math
import os
import re
import subprocess
import sys

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

try:
    from redis_publisher import RedisPublisher
except ImportError:
    # Progress reporting is only used when a uuid is supplied, so the module
    # stays importable without it; process_cut() raises if it is needed.
    RedisPublisher = None

# Default sensitivity passed to ffmpeg's `scene` score comparison.
SCENE_THRESHOLD = 0.3

# Matches "pts_time=1.5" (ffprobe entries) and "pts_time:1.5" (showinfo logs).
_PTS_TIME_RE = re.compile(r"pts_time[=:][ \t]*(\S+)")


def _to_float(value, default: float = 0.0) -> float:
    """Return *value* as a finite float, or *default* if that is impossible."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return default
    return number if math.isfinite(number) else default


def _to_int(value, default: int = 0) -> int:
    """Return *value* as an int, or *default* if it cannot be converted."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


def _empty_info(duration: float = 0.0) -> dict:
    """Return the "nothing usable was found" video description."""
    return {
        "frame_count": 0,
        "fps": 0.0,
        "duration": duration,
        "width": 0,
        "height": 0,
        "codec": "",
    }


def parse_frame_rate(value) -> float:
    """Convert an ffprobe rate such as "30000/1001" or "25" to a float.

    The value is parsed arithmetically (never evaluated as code). Missing,
    malformed, non-finite or zero-denominator rates yield 0.0.
    """
    if value is None:
        return 0.0
    numerator, slash, denominator = str(value).strip().partition("/")
    try:
        num = float(numerator)
        den = float(denominator) if slash else 1.0
    except ValueError:
        return 0.0
    if den == 0:
        return 0.0
    rate = num / den
    return rate if math.isfinite(rate) else 0.0


def parse_pts_times(output: str) -> list:
    """Extract every finite `pts_time` value found in ffprobe/ffmpeg output.

    Values that are not numbers (for example "N/A" or an empty field) are
    skipped instead of aborting the whole parse.
    """
    times = []
    for token in _PTS_TIME_RE.findall(output):
        try:
            value = float(token)
        except ValueError:
            continue
        if math.isfinite(value):
            times.append(value)
    return times


def build_scenes(cut_times, fps: float, duration: float) -> list:
    """Turn scene-change timestamps into a list of contiguous scene dicts.

    Args:
        cut_times: Iterable of timestamps (seconds) where a new scene starts.
        fps: Frames per second used to convert times to frame indices.
        duration: Total video duration in seconds.

    Returns:
        Scene dicts with keys ``scene_number``, ``start_frame``,
        ``end_frame`` (inclusive), ``start_time`` and ``end_time``.
        Scene numbers are consecutive starting at 1. Returns an empty list
        when *fps* is not positive or the inputs are not finite.
    """
    if not (math.isfinite(fps) and math.isfinite(duration) and fps > 0):
        return []

    # Cuts may arrive duplicated or out of order (stderr and stdout are
    # scanned together), so normalise them before walking the timeline.
    cuts = sorted({t for t in cut_times if math.isfinite(t) and t > 0})

    scenes = []
    prev_time = 0.0
    for cut in cuts:
        start_frame = round(prev_time * fps)
        end_frame = round(cut * fps)
        if end_frame > start_frame:
            scenes.append({
                # Number by emitted scenes so skipped cuts leave no gaps
                # and the trailing scene never reuses a number.
                "scene_number": len(scenes) + 1,
                "start_frame": start_frame,
                "end_frame": end_frame - 1,
                "start_time": prev_time,
                "end_time": cut - (1.0 / fps),
            })
        prev_time = cut

    # Whatever remains after the last cut forms the final scene.
    last_frame = round(duration * fps)
    prev_frame = round(prev_time * fps)
    if last_frame > prev_frame:
        scenes.append({
            "scene_number": len(scenes) + 1,
            "start_frame": prev_frame,
            "end_frame": last_frame - 1,
            "start_time": prev_time,
            "end_time": duration,
        })
    return scenes


def get_video_info(video_path: str) -> dict:
    """Get video info via ffprobe.

    Returns a dict with ``frame_count``, ``fps``, ``duration``, ``width``,
    ``height`` and ``codec`` describing the first video stream. When the
    stream does not report a usable frame count, it is estimated from the
    duration and average frame rate; when the stream has no duration, the
    container duration is used. If ffprobe cannot be run or its output
    cannot be parsed, all fields are zero/empty.
    """
    try:
        result = subprocess.run(
            ["ffprobe", "-v", "quiet", "-print_format", "json",
             "-show_format", "-show_streams", video_path],
            capture_output=True, text=True, timeout=30,
        )
        info = json.loads(result.stdout)
    except (OSError, subprocess.SubprocessError, ValueError):
        # ffprobe missing, timed out, or produced no/invalid JSON.
        return _empty_info()
    if not isinstance(info, dict):
        return _empty_info()

    container = info.get("format")
    container_duration = (
        _to_float(container.get("duration")) if isinstance(container, dict) else 0.0
    )

    for stream in info.get("streams") or []:
        if not isinstance(stream, dict) or stream.get("codec_type") != "video":
            continue

        duration = _to_float(stream.get("duration"))
        if duration <= 0:
            # Some containers only report the duration at the format level.
            duration = container_duration

        described = {
            "width": _to_int(stream.get("width")),
            "height": _to_int(stream.get("height")),
            "codec": stream.get("codec_name", ""),
        }

        frame_count = _to_int(stream.get("nb_frames"))
        if frame_count > 0:
            return {
                "frame_count": frame_count,
                "fps": parse_frame_rate(stream.get("r_frame_rate")),
                "duration": duration,
                **described,
            }

        # No exact frame count: estimate it from duration and average rate.
        avg_fps = parse_frame_rate(stream.get("avg_frame_rate"))
        if duration > 0 and avg_fps > 0:
            return {
                "frame_count": int(duration * avg_fps),
                "fps": avg_fps,
                "duration": duration,
                **described,
            }
        return _empty_info(duration)

    return _empty_info()


def detect_scenes_ffmpeg(video_path: str, fps: float, duration: float,
                         threshold: float = SCENE_THRESHOLD) -> list:
    """Detect scene changes using ffmpeg scene filter.

    Args:
        video_path: Path of the video to analyse.
        fps: Frames per second of the video.
        duration: Duration of the video in seconds.
        threshold: Scene score above which a frame counts as a scene change.

    Returns:
        Scene dicts as produced by ``build_scenes``; an empty list when
        *fps* is not positive or ffprobe cannot be run.
    """
    if not fps > 0:
        # Frame indices cannot be computed, so skip the expensive probe.
        return []

    # NOTE(review): video_path is interpolated into the filtergraph as-is;
    # paths containing filtergraph metacharacters (e.g. ',' ':' or quotes)
    # likely need escaping - confirm against the FFmpeg filter syntax.
    try:
        result = subprocess.run(
            ["ffprobe", "-v", "quiet", "-show_entries", "frame=pts_time",
             "-of", "default=nk=0", "-f", "lavfi",
             f"movie={video_path},select='gt(scene\\,{threshold})',showinfo",
             "-show_frames"],
            capture_output=True, text=True, timeout=300,
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        return []

    times = parse_pts_times(result.stderr + "\n" + result.stdout)
    return build_scenes(times, fps, duration)


def process_cut(video_path: str, output_path: str, uuid: str = ""):
    """Process video for scene detection and quality verification.

    Writes a JSON document with ``frame_count``, ``fps`` and ``scenes`` to
    *output_path* and returns the same data as a dict. When *uuid* is
    non-empty, progress is reported through ``RedisPublisher``.

    Raises:
        ImportError: If *uuid* is given but ``redis_publisher`` could not
            be imported.
    """
    publisher = None
    if uuid:
        if RedisPublisher is None:
            raise ImportError(
                "redis_publisher is required for progress reporting (--uuid)"
            )
        publisher = RedisPublisher(uuid)

    if publisher:
        publisher.info("cut", "CUT_START")

    vinfo = get_video_info(video_path)
    if publisher:
        publisher.info("cut", f"fps={vinfo['fps']}, frames={vinfo['frame_count']}, codec={vinfo['codec']}")

    total_frames = vinfo["frame_count"]
    fps = vinfo["fps"]
    duration = vinfo["duration"]

    # Try ffmpeg scene detection
    scenes = detect_scenes_ffmpeg(video_path, fps, duration)

    # Fall back to the whole video as one scene when nothing was detected
    if not scenes and total_frames > 0:
        scenes = [{
            "scene_number": 1,
            "start_frame": 0,
            "end_frame": total_frames - 1,
            "start_time": 0.0,
            "end_time": duration,
        }]
        if publisher:
            publisher.info("cut", "No scene changes detected, using whole video as single scene")

    result = {
        "frame_count": total_frames,
        "fps": fps,
        "scenes": scenes,
    }

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)

    if publisher:
        publisher.complete("cut", f"{len(scenes)} scenes")

    return result


def main():
    """Parse command-line arguments and run scene detection."""
    parser = argparse.ArgumentParser(description="Scene Detection")
    parser.add_argument("video_path", help="Path to video file")
    parser.add_argument("output_path", help="Output JSON path")
    parser.add_argument("--uuid", "-u", help="UUID for Redis progress", default="")
    args = parser.parse_args()
    process_cut(args.video_path, args.output_path, args.uuid)


if __name__ == "__main__":
    main()