#!/opt/homebrew/bin/python3.11
"""
Face Processor Benchmark Runner

Benchmarks the performance and output quality of the different face
processor implementations.

Versions under test:
    A. face_processor.py (InsightFace CPU)
    B. face_processor_mps.py (MediaPipe MPS)
    C. face_processor_optimized.py (OpenCV)
    D. face_processor_contract_v1.py (Contract)

Metrics:
    - processing time
    - peak memory (MB)
    - number of detected faces
    - output file size (KB)
    - whether embeddings are present
    - whether landmarks are present
"""

import os
import sys
import json
import time
import subprocess
from pathlib import Path
from datetime import datetime

SCRIPTS_DIR = Path(__file__).parent
OUTPUT_DIR = SCRIPTS_DIR.parent / "output" / "benchmark" / "face_processor"


def _safe_int(value, default=0):
    """Return ``int(value)``, or *default* when *value* is not an integer."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


def get_video_info(video_path):
    """Return basic metadata for a video, as reported by ``ffprobe``.

    Args:
        video_path: Path of the video file to inspect.

    Returns:
        A dict with the keys ``duration`` (seconds), ``size_mb``, ``width``,
        ``height``, ``fps`` (the raw ``r_frame_rate`` string such as
        ``"30/1"``) and ``total_frames``.  An empty dict is returned when
        ffprobe cannot be run or its output cannot be interpreted.
    """
    cmd = [
        "ffprobe",
        "-v", "quiet",
        "-print_format", "json",
        "-show_format",
        "-show_streams",
        str(video_path),
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        data = json.loads(result.stdout)
        # First video stream, or an empty dict so the lookups below fall
        # back to their defaults when the file has no video stream.
        video_stream = next(
            (s for s in data["streams"] if s.get("codec_type") == "video"),
            None,
        ) or {}
        container = data["format"]
        return {
            "duration": float(container.get("duration", 0)),
            "size_mb": int(container.get("size", 0)) / 1024 / 1024,
            "width": video_stream.get("width", 0),
            "height": video_stream.get("height", 0),
            "fps": video_stream.get("r_frame_rate", "0/1"),
            # A non-numeric frame count (e.g. "N/A") must not discard the
            # rest of the metadata, so it degrades to 0 instead of raising.
            "total_frames": _safe_int(video_stream.get("nb_frames", 0)),
        }
    except (
        OSError,
        subprocess.SubprocessError,
        ValueError,
        KeyError,
        TypeError,
        AttributeError,
    ) as e:
        print(f"获取视频信息失败: {e}")
        return {}


def get_memory_peak(pid):
    """Return the current resident set size of process *pid* in MB.

    This is a single sample taken with ``ps``; the caller is responsible for
    polling and keeping the maximum.  Returns 0 when the process no longer
    exists, ``ps`` is unavailable, or its output cannot be parsed.
    """
    try:
        result = subprocess.run(
            ["ps", "-p", str(pid), "-o", "rss="],
            capture_output=True,
            text=True,
        )
    except OSError:
        return 0
    if result.returncode != 0:
        return 0
    try:
        # ps reports rss in kilobytes.
        return int(result.stdout.strip()) / 1024
    except ValueError:
        return 0


def _summarize_frames(frames_data):
    """Summarize a ``frames`` payload given as a dict or a list of frames.

    Returns:
        ``(total_frames, total_faces, has_embedding, has_landmarks)``.
        Any other payload type yields ``(0, 0, False, False)``.
    """
    if isinstance(frames_data, dict):
        frames = list(frames_data.values())
    elif isinstance(frames_data, list):
        frames = frames_data
    else:
        return 0, 0, False, False

    total_faces = 0
    has_embedding = False
    has_landmarks = False
    for frame in frames:
        if not isinstance(frame, dict):
            continue
        faces = frame.get("faces") or []
        total_faces += len(faces)
        for face in faces:
            if not isinstance(face, dict):
                continue
            has_embedding = has_embedding or "embedding" in face
            has_landmarks = has_landmarks or "landmarks" in face
    return len(frames), total_faces, has_embedding, has_landmarks


def run_processor(script_name, video_path, output_path, uuid="", sample_interval=30):
    """Run one face processor script and collect benchmark metrics.

    Args:
        script_name: File name of the processor script inside ``SCRIPTS_DIR``.
        video_path: Input video passed to the processor.
        output_path: Path where the processor writes its JSON result.
        uuid: Optional identifier forwarded as ``--uuid`` (not used for
            ``face_processor_mps.py``).
        sample_interval: Value forwarded as ``--sample-interval`` to the
            processors that accept it.

    Returns:
        A dict with ``elapsed_time``, ``peak_memory_mb``, ``total_frames``,
        ``total_faces``, ``file_size_kb``, ``has_embedding``,
        ``has_landmarks``, ``stdout`` and ``stderr``; or ``None`` when the
        script is missing, exits with a non-zero status, or produces no
        readable output file.
    """
    script_path = SCRIPTS_DIR / script_name

    if not script_path.exists():
        print(f"❌ 脚本不存在: {script_path}")
        return None

    # The processors do not share a single command-line format.
    if script_name == "face_processor_mps.py":
        cmd = [
            sys.executable,
            str(script_path),
            "--video", video_path,
            "--output", output_path,
            "--sample-interval", str(sample_interval),
        ]
    else:
        cmd = [sys.executable, str(script_path), video_path, output_path]
        if uuid:
            cmd.extend(["--uuid", uuid])
        if script_name in ["face_processor.py", "face_processor_optimized.py"]:
            cmd.extend(["--sample-interval", str(sample_interval)])

    print(f"\n执行: {script_name}")
    print(f"命令: {' '.join(cmd)}")

    start_time = time.perf_counter()
    process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )

    # Drain the pipes while sampling memory.  Polling with poll()/sleep()
    # and reading only at the end would deadlock as soon as the child fills
    # an OS pipe buffer; communicate() with a timeout keeps reading and can
    # be retried after TimeoutExpired without losing output.
    peak_memory = 0
    while True:
        peak_memory = max(peak_memory, get_memory_peak(process.pid))
        try:
            stdout, stderr = process.communicate(timeout=0.5)
            break
        except subprocess.TimeoutExpired:
            continue

    elapsed_time = time.perf_counter() - start_time

    if process.returncode != 0:
        print(f"❌ 处理失败: {stderr}")
        return None

    if not os.path.exists(output_path):
        return None

    try:
        with open(output_path, encoding="utf-8") as f:
            result = json.load(f)
    except (OSError, ValueError) as e:
        print(f"❌ 处理失败: {e}")
        return None

    frames_data = result.get("frames", {}) if isinstance(result, dict) else {}
    total_frames, total_faces, has_embedding, has_landmarks = _summarize_frames(
        frames_data
    )

    return {
        "elapsed_time": elapsed_time,
        "peak_memory_mb": peak_memory,
        "total_frames": total_frames,
        "total_faces": total_faces,
        "file_size_kb": os.path.getsize(output_path) / 1024,
        "has_embedding": has_embedding,
        "has_landmarks": has_landmarks,
        "stdout": stdout,
        "stderr": stderr,
    }


def analyze_output(output_path):
    """Inspect the first face of the first frame of a processor output file.

    Args:
        output_path: Path of the JSON file written by a processor.

    Returns:
        ``None`` when the file does not exist; a dict ``{"error": ...}`` when
        there are no frames, the frames have an unexpected shape, or the
        first frame has no faces; otherwise a dict describing which fields
        (bbox, confidence, embedding, landmarks, age, gender) the first face
        carries, plus the embedding dimension and landmark count.
    """
    if not os.path.exists(output_path):
        return None

    with open(output_path, encoding="utf-8") as f:
        data = json.load(f)

    frames = data.get("frames", {})
    if not frames:
        return {"error": "no frames"}

    # Frames are stored either as a dict keyed by frame id or as a list.
    if isinstance(frames, dict):
        first_frame = next(iter(frames.values()))
    elif isinstance(frames, list):
        first_frame = frames[0]
    else:
        return {"error": "unknown frames format"}

    if not isinstance(first_frame, dict):
        return {"error": "unknown frames format"}

    faces = first_frame.get("faces", [])
    if not faces:
        return {"error": "no faces in first frame"}

    first_face = faces[0]
    if not isinstance(first_face, dict):
        return {"error": "unknown frames format"}

    return {
        "has_bbox": "bbox" in first_face,
        "has_confidence": "confidence" in first_face,
        "has_embedding": "embedding" in first_face,
        # "or []" keeps a null embedding/landmarks value from raising.
        "embedding_dim": len(first_face.get("embedding") or []),
        "has_landmarks": "landmarks" in first_face,
        "landmarks_count": len(first_face.get("landmarks") or []),
        "has_age": "age" in first_face,
        "has_gender": "gender" in first_face,
    }


def main():
    """Benchmark every processor on the test video and write a JSON report.

    Exits with status 1 when the test video does not exist.  Each processor
    writes its result into ``OUTPUT_DIR``; the combined report is saved as
    ``FACE_BENCHMARK_REPORT.json`` there and a summary table is printed.
    """
    print("=" * 80)
    print("Face Processor Benchmark 测试")
    print("=" * 80)

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    video_uuid = "ac625815183a21e1"
    video_path = "/Users/accusys/momentry/var/sftpgo/data/demo/Gamma Carry Saves the World..mp4"

    if not os.path.exists(video_path):
        print(f"❌ 测试视频不存在: {video_path}")
        sys.exit(1)

    video_info = get_video_info(video_path)
    print("\n测试视频:")
    print(f" UUID: {video_uuid}")
    print(f" 文件: {video_info.get('size_mb', 0):.1f} MB")
    print(f" 时长: {video_info.get('duration', 0):.1f} 秒")
    print(f" 分辨率: {video_info.get('width', 0)}x{video_info.get('height', 0)}")
    print(f" FPS: {video_info.get('fps', 'unknown')}")
    print(f" 总帧数: {video_info.get('total_frames', 0)}")

    processors = [
        ("A", "face_processor.py", "InsightFace CPU"),
        ("B", "face_processor_mps.py", "MediaPipe MPS"),
        ("C", "face_processor_optimized.py", "OpenCV Optimized"),
        ("D", "face_processor_contract_v1.py", "Contract v1"),
    ]

    results = []

    for scheme_id, script_name, description in processors:
        print(f"\n{'=' * 80}")
        print(f"方案 {scheme_id}: {description}")
        print(f"{'=' * 80}")

        output_path = OUTPUT_DIR / f"scheme_{scheme_id}_{script_name.replace('.py', '.json')}"

        # Remove any stale output so a failed run cannot be mistaken for a
        # successful one.
        output_path.unlink(missing_ok=True)

        result = run_processor(
            script_name,
            video_path,
            str(output_path),
            uuid=f"face_bench_{scheme_id}",
            sample_interval=30,
        )

        if result:
            quality = analyze_output(str(output_path))

            duration = video_info.get("duration", 0)
            # Seconds of video processed per second of wall-clock time.
            speed = duration / result["elapsed_time"] if result["elapsed_time"] > 0 else 0

            results.append({
                "scheme": scheme_id,
                "script": script_name,
                "description": description,
                "elapsed_time": result["elapsed_time"],
                "peak_memory_mb": result["peak_memory_mb"],
                "total_frames": result["total_frames"],
                "total_faces": result["total_faces"],
                "file_size_kb": result["file_size_kb"],
                "speed_ratio": speed,
                "quality": quality,
                "has_embedding": result["has_embedding"],
                "has_landmarks": result["has_landmarks"],
            })

            print("\n✅ 处理完成:")
            print(f" 时间: {result['elapsed_time']:.2f}秒")
            print(f" 速度: {speed:.2f}x 实时倍速")
            print(f" 内存峰值: {result['peak_memory_mb']:.1f} MB")
            print(f" 处理帧数: {result['total_frames']}")
            print(f" 检测人脸: {result['total_faces']}")
            print(f" 输出大小: {result['file_size_kb']:.1f} KB")
            print(f" Embedding: {'有' if result['has_embedding'] else '无'}")
            print(f" Landmarks: {'有' if result['has_landmarks'] else '无'}")

            if quality:
                print(f" 质量: {json.dumps(quality, indent=4)}")
        else:
            print(f"❌ 方案 {scheme_id} 处理失败")
            results.append({
                "scheme": scheme_id,
                "script": script_name,
                "description": description,
                "error": "processing failed",
            })

    report = {
        "test_date": datetime.now().isoformat(),
        "video_info": video_info,
        "video_uuid": video_uuid,
        "results": results,
    }

    report_path = OUTPUT_DIR / "FACE_BENCHMARK_REPORT.json"
    # ensure_ascii=False emits raw non-ASCII characters, so the file encoding
    # must be pinned instead of depending on the locale default.
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    print(f"\n{'=' * 80}")
    print("测试报告已保存:")
    print(f" {report_path}")
    print(f"{'=' * 80}")

    print("\n【对比总结】")
    print("\n| 方案 | 脚本 | 时间(秒) | 速度 | 内存(MB) | 人脸数 | Embedding |")
    print("|------|------|---------|------|---------|--------|-----------|")
    for r in results:
        if "error" not in r:
            print(f"| {r['scheme']} | {r['script']} | {r['elapsed_time']:.2f} | {r['speed_ratio']:.2f}x | {r['peak_memory_mb']:.1f} | {r['total_faces']} | {'✅' if r['has_embedding'] else '❌'} |")
        else:
            print(f"| {r['scheme']} | {r['script']} | - | - | - | - | ❌ |")


if __name__ == "__main__":
    main()