#!/opt/homebrew/bin/python3.11
"""
Face Processor - optimized version.

The frame sampling interval is adjustable to balance speed against accuracy.
"""

import sys
import json
import argparse
import os
import signal
import subprocess

# Make the sibling ``redis_publisher`` module importable no matter which
# working directory the script is launched from.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher


def signal_handler(signum, frame):
    """Announce the received signal on stdout and exit with status 1."""
    print(f"Face: Received signal {signum}, exiting...")
    sys.exit(1)


def has_audio_stream(video_path) -> bool:
    """Check if video file has audio stream using ffprobe.

    Args:
        video_path: Path to the media file handed to ``ffprobe``.

    Returns:
        True when ffprobe lists at least one audio stream, False when ffprobe
        exits with a non-zero status or lists none. When the ``ffprobe``
        executable cannot be found a warning is printed and True is returned.
    """
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-select_streams",
        "a",
        "-show_entries",
        "stream=codec_type",
        "-of",
        "csv=p=0",
        video_path,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError:
        return False
    except FileNotFoundError:
        print("WARNING: ffprobe not found, assuming audio exists")
        return True
    # One output line is emitted per audio stream; empty output means none.
    return bool(result.stdout.strip())


def _fail_with_empty_result(publisher, output_path: str, message: str):
    """Report *message*, write an empty result file and exit with status 1."""
    if publisher:
        publisher.error("face", message)
    result = {"frame_count": 0, "fps": 0.0, "frames": []}
    if publisher:
        publisher.complete("face", "0 frames")
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)
    sys.exit(1)


def process_face(
    video_path: str, output_path: str, uuid: str = "", sample_interval: int = 15
):
    """
    Process video for face detection.

    Every ``sample_interval``-th frame is converted to grayscale and scanned
    with OpenCV's frontal-face Haar cascade. Frames containing at least one
    face are collected and written to ``output_path`` as JSON.

    This function does not return: it ends the process with ``sys.exit(0)``
    on success and ``sys.exit(1)`` when OpenCV is missing, the cascade cannot
    be loaded, or the video cannot be opened (an empty result file is still
    written in those cases).

    Args:
        video_path: Path to video file
        output_path: Path to output JSON
        uuid: UUID for Redis progress; when empty nothing is published
        sample_interval: Process every N frames (default: 15), must be >= 1

    Raises:
        ValueError: If ``sample_interval`` is smaller than 1.
    """
    # A zero interval would divide by zero in the sampling check below and a
    # negative one would produce a negative progress total.
    if sample_interval < 1:
        raise ValueError(f"sample_interval must be >= 1, got {sample_interval}")

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    publisher = RedisPublisher(uuid) if uuid else None

    if publisher:
        publisher.info("face", "FACE_START")

    # Imported lazily so that a missing OpenCV install still yields a result
    # file and a published error instead of an import-time traceback.
    try:
        import cv2
    except ImportError:
        _fail_with_empty_result(publisher, output_path, "opencv-python not installed")

    if publisher:
        publisher.info("face", "FACE_LOADING_CASCADE")

    # Load Haar Cascade
    face_cascade = cv2.CascadeClassifier(
        cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
    )
    if face_cascade.empty():
        _fail_with_empty_result(publisher, output_path, "Could not load Haar Cascade")

    if publisher:
        publisher.info("face", "FACE_CASCADE_LOADED")

    # Open the video once; the same capture serves metadata and frame reads.
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        _fail_with_empty_result(
            publisher, output_path, f"Could not open video: {video_path}"
        )

    frames = []
    frame_count = 0
    processed = 0

    try:
        # Get video info
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        expected_samples = total_frames // sample_interval

        if publisher:
            publisher.info(
                "face",
                f"fps={fps}, frames={total_frames}, sample_interval={sample_interval}",
            )
            publisher.progress("face", 0, total_frames, "Starting")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1

            # Sample frames
            if frame_count % sample_interval != 0:
                continue

            processed += 1
            # frame_count is 1-based here, so the 0-based index is one less.
            timestamp = (frame_count - 1) / fps if fps > 0 else 0

            # Convert to grayscale
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            # Detect faces. A failure on one frame is reported and treated as
            # "no faces" so a single bad frame does not abort the whole run.
            try:
                faces = face_cascade.detectMultiScale(
                    gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30)
                )
            except Exception as e:
                if publisher:
                    publisher.error("face", f"Frame {frame_count}: {e}")
                faces = []

            face_list = [
                {
                    "face_id": None,
                    "x": int(x),
                    "y": int(y),
                    "width": int(w),
                    "height": int(h),
                    # Fixed placeholder value: this detectMultiScale call
                    # returns rectangles only, no per-detection score.
                    "confidence": 0.8,
                }
                for x, y, w, h in faces
            ]

            # Only add frames with faces
            if face_list:
                frames.append(
                    {
                        "frame": frame_count - 1,
                        "timestamp": round(timestamp, 3),
                        "faces": face_list,
                    }
                )

            if publisher:
                publisher.progress(
                    "face",
                    processed,
                    expected_samples,
                    f"Frame {frame_count}, {len(face_list)} faces",
                )
    finally:
        # Release the capture even if reading or detection raised.
        cap.release()

    result = {
        "frame_count": total_frames,
        "fps": fps,
        "frames": frames,
        "sample_interval": sample_interval,
        # Count individual faces, not the number of frames that contain them.
        "total_faces_detected": sum(len(entry["faces"]) for entry in frames),
    }

    if publisher:
        publisher.complete("face", f"{len(frames)} frames with faces")

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)

    sys.stderr.write(
        f"Face: Detection complete, {len(frames)} frames written to {output_path}\n"
    )
    sys.stderr.flush()

    sys.exit(0)


def _positive_int(value: str) -> int:
    """Parse *value* as an integer >= 1 for use as an argparse ``type``."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if number < 1:
        raise argparse.ArgumentTypeError(f"must be >= 1, got {number}")
    return number


def main():
    """Parse command-line arguments and run face detection."""
    parser = argparse.ArgumentParser(description="Face Detection (Optimized)")
    parser.add_argument("video_path", help="Path to video file")
    parser.add_argument("output_path", help="Output JSON path")
    parser.add_argument("--uuid", "-u", help="UUID for Redis progress", default="")
    parser.add_argument(
        "--sample-interval",
        "-s",
        type=_positive_int,
        default=15,
        help="Process every N frames (default: 15)",
    )

    args = parser.parse_args()
    process_face(args.video_path, args.output_path, args.uuid, args.sample_interval)


if __name__ == "__main__":
    main()