#!/opt/homebrew/bin/python3.11
"""
Face Detection Count Comparison

Compare, frame by frame, how many faces each detector version
(InsightFace, MediaPipe, OpenCV) reports, and summarize the differences.
"""

import json
from pathlib import Path

# Directory holding the benchmark JSON files read and written by main().
DEFAULT_BENCHMARK_DIR = Path('/Users/accusys/momentry_core_0.1/output/benchmark/face_processor')


def _frame_entry(frame_data, timestamp):
    """Build the normalized per-frame record stored by load_results()."""
    faces = frame_data.get('faces', [])
    return {
        'count': len(faces),
        'faces': faces,
        'timestamp': timestamp,
    }


def load_results(filepath):
    """Load one detector's results and normalize them by frame number.

    Two layouts of the top-level ``frames`` value are accepted:

    * dict (InsightFace / MediaPipe): ``{"750": {...}, "900": {...}}``;
      keys are converted with ``int()``, and the timestamp is read from
      ``timestamp`` or, failing that, ``time_seconds``.
    * list (OpenCV): ``[{...}, {...}]``; the frame number is read from
      each item's ``frame`` key (default 0).

    Entries that are not dicts are skipped.

    Args:
        filepath: Path of the JSON file to read.

    Returns:
        A dict mapping frame number to
        ``{'count': int, 'faces': list, 'timestamp': value}``.
    """
    # JSON is UTF-8 by specification; do not depend on the locale default.
    with open(filepath, encoding='utf-8') as f:
        data = json.load(f)

    results = {}
    frames = data.get('frames', {})

    if isinstance(frames, dict):
        for frame_num, frame_data in frames.items():
            if not isinstance(frame_data, dict):
                continue
            timestamp = frame_data.get('timestamp', frame_data.get('time_seconds', 0))
            results[int(frame_num)] = _frame_entry(frame_data, timestamp)
    elif isinstance(frames, list):
        for frame_data in frames:
            if not isinstance(frame_data, dict):
                continue
            frame_num = frame_data.get('frame', 0)
            results[frame_num] = _frame_entry(frame_data, frame_data.get('timestamp', 0))

    return results


def _count_at(results, frame_num):
    """Return the face count recorded for *frame_num*, or 0 if absent."""
    return results.get(frame_num, {}).get('count', 0)


def _first_timestamp(frame_num, *result_sets):
    """Return the first timestamp recorded for *frame_num*, or 0 if none."""
    for results in result_sets:
        entry = results.get(frame_num)
        if entry is not None and 'timestamp' in entry:
            return entry['timestamp']
    return 0


def compare_frames(results_a, results_b, results_c):
    """List the frames on which the three detectors disagree on face count.

    A frame missing from a result set counts as 0 faces for that detector.

    Args:
        results_a: InsightFace results, as returned by load_results().
        results_b: MediaPipe results.
        results_c: OpenCV results.

    Returns:
        A list, sorted by frame number, of dicts with the keys ``frame``,
        ``timestamp``, ``insightface``, ``mediapipe``, ``opencv``, ``max``,
        ``min`` and ``diff``. Only frames whose counts differ are included.
    """
    all_frames = set(results_a) | set(results_b) | set(results_c)

    comparison = []
    for frame_num in sorted(all_frames):
        a_count = _count_at(results_a, frame_num)
        b_count = _count_at(results_b, frame_num)
        c_count = _count_at(results_c, frame_num)

        # Only frames with differing counts are reported.
        if a_count == b_count == c_count:
            continue

        highest = max(a_count, b_count, c_count)
        lowest = min(a_count, b_count, c_count)
        comparison.append({
            'frame': frame_num,
            'timestamp': _first_timestamp(frame_num, results_a, results_b, results_c),
            'insightface': a_count,
            'mediapipe': b_count,
            'opencv': c_count,
            'max': highest,
            'min': lowest,
            'diff': highest - lowest,
        })

    return comparison


def _summarize(results):
    """Compute the detection statistics of a single detector."""
    counts = [r['count'] for r in results.values()]
    total_faces = sum(counts)
    frames_with_faces = sum(1 for c in counts if c > 0)
    return {
        'total_faces': total_faces,
        'total_frames': len(counts),
        # Average over the frames that contain at least one face. Guard on
        # that same divisor: a detector may have frames but no faces at all.
        'avg_per_frame': total_faces / frames_with_faces if frames_with_faces > 0 else 0,
        'frames_with_faces': frames_with_faces,
        'frames_no_faces': sum(1 for c in counts if c == 0),
        'max_faces': max(counts) if counts else 0,
    }


def analyze_detection_distribution(results_a, results_b, results_c):
    """Compute per-detector detection statistics.

    Args:
        results_a: InsightFace results, as returned by load_results().
        results_b: MediaPipe results.
        results_c: OpenCV results.

    Returns:
        A dict keyed by ``insightface``, ``mediapipe`` and ``opencv``; each
        value holds ``total_faces``, ``total_frames``, ``avg_per_frame``
        (faces divided by the number of frames with at least one face, 0
        when there is no such frame), ``frames_with_faces``,
        ``frames_no_faces`` and ``max_faces``.
    """
    return {
        'insightface': _summarize(results_a),
        'mediapipe': _summarize(results_b),
        'opencv': _summarize(results_c),
    }


def find_missed_frames(results_a, results_b, results_c):
    """Find frames where a detector found nothing while another found faces.

    Each (frame, detector) pair is reported at most once:

    * MediaPipe: it found 0 faces while InsightFace or OpenCV found some.
    * OpenCV: it found 0 faces while InsightFace found some.

    Args:
        results_a: InsightFace results, as returned by load_results().
        results_b: MediaPipe results.
        results_c: OpenCV results.

    Returns:
        A list of dicts ordered by frame number. Every dict has ``frame``
        and ``missed_by``; MediaPipe entries add ``insightface_count``,
        ``opencv_count`` and ``others_count``, OpenCV entries add
        ``insightface_count`` and ``mediapipe_count``.
    """
    all_frames = set(results_a) | set(results_b) | set(results_c)

    missed = []
    for frame_num in sorted(all_frames):
        a = _count_at(results_a, frame_num)
        b = _count_at(results_b, frame_num)
        c = _count_at(results_c, frame_num)

        # A single entry per frame, so MediaPipe misses are not counted
        # twice when InsightFace is the detector that found the faces.
        if b == 0 and (a > 0 or c > 0):
            missed.append({
                'frame': frame_num,
                'missed_by': 'MediaPipe',
                'insightface_count': a,
                'opencv_count': c,
                'others_count': max(a, c),
            })

        if a > 0 and c == 0:
            missed.append({
                'frame': frame_num,
                'missed_by': 'OpenCV',
                'insightface_count': a,
                'mediapipe_count': b,
            })

    return missed


def main(benchmark_dir=DEFAULT_BENCHMARK_DIR):
    """Print the comparison report and save it as JSON.

    Reads the scheme A (InsightFace), B (MediaPipe) and C (OpenCV) result
    files from *benchmark_dir*, prints Markdown-style tables to stdout and
    writes ``FACE_COUNT_COMPARISON.json`` into the same directory.

    Args:
        benchmark_dir: Directory containing the benchmark JSON files.
    """
    benchmark_dir = Path(benchmark_dir)

    print("=" * 80)
    print("Face Detection Count Comparison")
    print("=" * 80)
    print()

    # Load the three versions being compared.
    # NOTE: scheme D (scheme_D_face_processor_contract_v1.json) is not part
    # of this comparison.
    results_a = load_results(benchmark_dir / 'scheme_A_fixed.json')
    results_b = load_results(benchmark_dir / 'scheme_B_mediapipe_fixed.json')
    results_c = load_results(benchmark_dir / 'scheme_C_face_processor_optimized.json')

    print("【检测结果统计】")
    print()

    stats = analyze_detection_distribution(results_a, results_b, results_c)

    print("| 版本 | 总人脸数 | 检测帧数 | 有人脸帧 | 无人脸帧 | 平均每帧 | 最多人脸 |")
    print("|------|---------|---------|---------|---------|---------|---------|")
    for name, s in stats.items():
        print(f"| {name} | {s['total_faces']} | {s['total_frames']} | {s['frames_with_faces']} | {s['frames_no_faces']} | {s['avg_per_frame']:.2f} | {s['max_faces']} |")

    print()
    print("【检测数量差异对比】")
    print()

    comparison = compare_frames(results_a, results_b, results_c)

    print(f"共有 {len(comparison)} 帧检测数量不同")
    print()
    print("| 帧号 | 时间(秒) | InsightFace | MediaPipe | OpenCV | 最大差异 |")
    print("|------|---------|------------|----------|--------|---------|")

    # Only the first 30 differing frames are listed.
    for item in comparison[:30]:
        print(f"| {item['frame']} | {item['timestamp']:.2f} | {item['insightface']} | {item['mediapipe']} | {item['opencv']} | {item['diff']} |")

    if len(comparison) > 30:
        print("| ... | ... | ... | ... | ... | ... |")
        print(f"| 共 {len(comparison)} 帧有差异 |")

    print()
    print("【漏检分析】")
    print()

    missed = find_missed_frames(results_a, results_b, results_c)

    mediapipe_missed = [m for m in missed if m.get('missed_by') == 'MediaPipe']
    opencv_missed = [m for m in missed if m.get('missed_by') == 'OpenCV']

    print(f"MediaPipe漏检帧数: {len(mediapipe_missed)}")
    print(f"OpenCV漏检帧数: {len(opencv_missed)}")
    print()

    if mediapipe_missed:
        print("MediaPipe漏检详情(前10帧):")
        print("| 帧号 | InsightFace检测 | OpenCV检测 |")
        print("|------|----------------|-----------|")
        for m in mediapipe_missed[:10]:
            print(f"| {m['frame']} | {m.get('insightface_count', m.get('others_count', '?'))} | {m.get('opencv_count', '?')} |")
        print()

    print("【检测率分析】")
    print()

    # InsightFace's total face count is the baseline for detection rates.
    baseline = stats['insightface']['total_faces']

    print(f"以InsightFace为基准({baseline}张人脸):")
    print()
    print("| 版本 | 检测数 | 检测率 | 漏检数 |")
    print("|------|--------|--------|--------|")

    for name, s in stats.items():
        rate = s['total_faces'] / baseline * 100 if baseline > 0 else 0
        missed_count = baseline - s['total_faces']
        print(f"| {name} | {s['total_faces']} | {rate:.1f}% | {missed_count} |")

    print()
    print("=" * 80)
    print("对比完成")
    print("=" * 80)

    # Save the detailed comparison.
    output = {
        'stats': stats,
        'comparison': comparison,
        'missed_frames': missed,
        'summary': {
            'baseline_faces': baseline,
            'mediapipe_detection_rate': stats['mediapipe']['total_faces'] / baseline * 100 if baseline > 0 else 0,
            'opencv_detection_rate': stats['opencv']['total_faces'] / baseline * 100 if baseline > 0 else 0,
        },
    }

    output_path = benchmark_dir / 'FACE_COUNT_COMPARISON.json'
    # ensure_ascii=False may emit non-ASCII text, so write UTF-8 explicitly.
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(output, f, indent=2, ensure_ascii=False)

    print(f"\n详细对比已保存: {output_path}")


if __name__ == '__main__':
    main()