#!/opt/homebrew/bin/python3.11
"""
Face Tracker - Track faces across frames using embedding similarity and bbox proximity

Purpose:
    1. Assign unique trace_id to each face across frames
    2. Track face movement across adjacent frames
    3. Output trace statistics (duration, path, confidence)

Algorithm:
    1. For first frame: assign new trace_id to each face
    2. For subsequent frames:
       - Calculate bbox overlap with previous frame faces
       - Calculate embedding cosine similarity
       - Match faces if both conditions met
       - Assign same trace_id if matched, new trace_id if not

Matching Conditions:
    - bbox overlap > 0.3 (IoU)
    - embedding similarity > 0.7
    - OR single condition > threshold (fallback)

Output:
    - face.json with trace_id added to each face
    - trace statistics report
"""

import argparse
import json
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Set

import numpy as np


def calculate_bbox_iou(bbox1: Dict, bbox2: Dict) -> float:
    """
    Calculate Intersection over Union (IoU) between two bboxes

    Args:
        bbox1: {"x": int, "y": int, "width": int, "height": int}
        bbox2: same structure

    Returns:
        IoU score (0.0 - 1.0); 0.0 when the boxes do not overlap or the
        union area is not positive
    """
    left = max(bbox1["x"], bbox2["x"])
    right = min(bbox1["x"] + bbox1["width"], bbox2["x"] + bbox2["width"])
    top = max(bbox1["y"], bbox2["y"])
    bottom = min(bbox1["y"] + bbox1["height"], bbox2["y"] + bbox2["height"])

    # Touching edges (zero-width/zero-height intersection) count as no overlap.
    if right <= left or bottom <= top:
        return 0.0

    inter_area = (right - left) * (bottom - top)
    union_area = (
        bbox1["width"] * bbox1["height"]
        + bbox2["width"] * bbox2["height"]
        - inter_area
    )
    return inter_area / union_area if union_area > 0 else 0.0


def calculate_bbox_distance(bbox1: Dict, bbox2: Dict) -> float:
    """
    Calculate center distance between two bboxes

    Returns:
        Euclidean distance between centers (plain Python float)
    """
    dx = (bbox1["x"] + bbox1["width"] / 2) - (bbox2["x"] + bbox2["width"] / 2)
    dy = (bbox1["y"] + bbox1["height"] / 2) - (bbox2["y"] + bbox2["height"] / 2)
    # float() so callers get the annotated type rather than np.float64.
    return float(np.sqrt(dx ** 2 + dy ** 2))


def calculate_embedding_similarity(emb1: List[float], emb2: List[float]) -> float:
    """
    Calculate cosine similarity between two embeddings

    Returns:
        Cosine similarity (-1.0 - 1.0); 0.0 when either embedding is None
        or has zero norm
    """
    if emb1 is None or emb2 is None:
        return 0.0

    v1 = np.array(emb1)
    v2 = np.array(emb2)
    norm1 = np.linalg.norm(v1)
    norm2 = np.linalg.norm(v2)
    if norm1 == 0 or norm2 == 0:
        return 0.0

    # float() so callers get the annotated type rather than np.float64.
    return float(np.dot(v1, v2) / (norm1 * norm2))


def _bbox_of(face: Dict) -> Dict:
    """Extract the {"x", "y", "width", "height"} bbox dict from a face record."""
    return {
        "x": face["x"],
        "y": face["y"],
        "width": face["width"],
        "height": face["height"],
    }


def _is_near_edge(bbox: Dict, frame_width: int, frame_height: int, margin: int) -> bool:
    """Return True when any side of bbox lies within margin pixels of the frame border."""
    return (
        bbox["x"] < margin
        or bbox["x"] + bbox["width"] > frame_width - margin
        or bbox["y"] < margin
        or bbox["y"] + bbox["height"] > frame_height - margin
    )


def match_faces(
    current_faces: List[Dict],
    previous_faces: List[Dict],
    iou_threshold: float = 0.3,
    similarity_threshold: float = 0.7,
    distance_threshold: float = 100.0,
    use_embedding: bool = True,
    frame_gap: int = 1,
    cut_boundaries: Optional[Set[int]] = None,
    prev_frame: Optional[int] = None,
    curr_frame: Optional[int] = None,
    frame_width: int = 1920,
    frame_height: int = 1080,
    edge_margin: int = 50,
) -> Dict[int, int]:
    """
    Match current frame faces to previous frame faces

    Matching is greedy: current faces are processed in order and each one
    claims the best-scoring previous face that has not been claimed yet.

    Args:
        current_faces: Faces in current frame
        previous_faces: Faces in previous frame
        iou_threshold: Minimum IoU for matching
        similarity_threshold: Minimum embedding similarity for matching
        distance_threshold: Maximum bbox center distance for matching
        use_embedding: Whether to use embedding similarity
        frame_gap: Number of frames between current and previous (1=adjacent)
        cut_boundaries: Set of frame numbers where scene cuts occur
        prev_frame: Previous frame number (for cut detection)
        curr_frame: Current frame number (for cut detection)
        frame_width: Frame width in pixels, used by the edge-exit check
        frame_height: Frame height in pixels, used by the edge-exit check
        edge_margin: Distance in pixels from the border that counts as "at edge"

    Returns:
        Dict mapping current_face_index -> previous_face_index (or -1 if new)
    """
    all_new = {i: -1 for i in range(len(current_faces))}

    if not previous_faces:
        return all_new

    # If a scene cut exists between prev and current frame, force all new traces
    if cut_boundaries and prev_frame is not None and curr_frame is not None:
        if any(prev_frame < cut <= curr_frame for cut in cut_boundaries):
            return all_new

    matches = {}
    used_prev = set()

    for curr_idx, curr_face in enumerate(current_faces):
        best_prev_idx = -1
        best_score = 0.0

        # Values that depend only on the current face are computed once here.
        curr_bbox = _bbox_of(curr_face)
        curr_emb = curr_face.get("embedding")
        curr_area = curr_bbox["width"] * curr_bbox["height"]
        curr_at_edge = _is_near_edge(curr_bbox, frame_width, frame_height, edge_margin)

        for prev_idx, prev_face in enumerate(previous_faces):
            if prev_idx in used_prev:
                continue

            prev_bbox = _bbox_of(prev_face)
            prev_emb = prev_face.get("embedding")

            iou = calculate_bbox_iou(curr_bbox, prev_bbox)
            distance = calculate_bbox_distance(curr_bbox, prev_bbox)

            has_embeddings = bool(use_embedding and curr_emb and prev_emb)
            similarity = (
                calculate_embedding_similarity(curr_emb, prev_emb)
                if has_embeddings
                else 0.0
            )

            # Reject only if BOTH embedding AND IoU disagree
            # (different person + different position)
            if has_embeddings and similarity < 0.5 and iou < 0.3:
                continue

            # Bbox size consistency check: reject if bbox size changes by more
            # than 5x (e.g., far shot -> close-up) unless the embedding agrees
            prev_area = prev_bbox["width"] * prev_bbox["height"]
            area_ratio = max(curr_area, prev_area) / max(1, min(curr_area, prev_area))
            if area_ratio > 5.0 and similarity < 0.8:
                continue

            # Edge exit: if previous face was near frame edge and current face
            # is not, the old face likely exited and a new face appeared
            prev_at_edge = _is_near_edge(prev_bbox, frame_width, frame_height, edge_margin)
            if prev_at_edge and not curr_at_edge and similarity < 0.8:
                continue

            score = 0.0
            if iou > iou_threshold and similarity > similarity_threshold:
                score = iou + similarity
            elif iou > 0.5 and similarity > 0.65:
                score = iou * 1.5 + similarity * 0.5
            elif iou > 0.35 and distance < distance_threshold:
                score = iou * 2 - distance / 500
            elif similarity > 0.85:
                score = similarity * 2
            elif similarity > 0.75 and distance < distance_threshold:
                score = similarity - distance / 1000
            # For frame gaps (tracking lost and recovered), require higher confidence
            elif frame_gap > 1 and similarity > 0.8 and iou > 0.2:
                score = similarity + iou

            if score > best_score:
                best_score = score
                best_prev_idx = prev_idx

        if best_prev_idx >= 0 and best_score > 0:
            matches[curr_idx] = best_prev_idx
            used_prev.add(best_prev_idx)
        else:
            matches[curr_idx] = -1

    return matches


def _summarize_trace(trace_id: int, path: List[Dict], fps: Optional[float]) -> Dict:
    """Build the per-trace summary record (duration, confidence, pose stats) from its path."""
    duration_frames = path[-1]["frame"] - path[0]["frame"] + 1
    avg_confidence = sum(p["confidence"] for p in path) / len(path)
    pose_angles = [p["pose_angle"] for p in path]

    # Pose trace: full pose info per appearance
    pose_trace = []
    for p in path:
        pose_info = p.get("pose_full", {})
        pose_trace.append({
            "frame": p["frame"],
            "angle": pose_info.get("angle", "unknown"),
            "confidence": pose_info.get("confidence", 0.0),
            "pitch": pose_info.get("pitch", "neutral"),
            "features": pose_info.get("features", {}),
        })

    # Pose statistics
    pose_counts = defaultdict(int)
    pose_confidence_by_angle = defaultdict(list)
    for pose in pose_trace:
        pose_counts[pose["angle"]] += 1
        pose_confidence_by_angle[pose["angle"]].append(pose["confidence"])

    pose_statistics = {
        "distribution": dict(pose_counts),
        "avg_confidence_by_angle": {
            angle: round(sum(conf_list) / len(conf_list), 3)
            for angle, conf_list in pose_confidence_by_angle.items()
        },
        "dominant_angle": max(pose_counts, key=pose_counts.get) if pose_counts else "unknown",
        "pose_count": len(pose_counts),
    }

    # Pose transitions: one event each time the angle changes between
    # consecutive appearances
    pose_transitions = []
    for before, after in zip(pose_trace, pose_trace[1:]):
        if after["angle"] != before["angle"]:
            pose_transitions.append({
                "frame": after["frame"],
                "from_angle": before["angle"],
                "to_angle": after["angle"],
                "transition_index": len(pose_transitions) + 1,
            })

    return {
        "trace_id": trace_id,
        "start_frame": path[0]["frame"],
        "end_frame": path[-1]["frame"],
        "duration_frames": duration_frames,
        # 0.0 when fps is unknown, so report formatting still works.
        "duration_seconds": duration_frames / fps if fps else 0.0,
        "total_appearances": len(path),
        "avg_confidence": avg_confidence,
        "pose_angles": pose_angles,
        "pose_trace": pose_trace,
        "pose_statistics": pose_statistics,
        "pose_transitions": pose_transitions,
        "path": path,
    }


def track_faces(
    face_data: Dict,
    iou_threshold: float = 0.3,
    similarity_threshold: float = 0.7,
    distance_threshold: float = 100.0,
    use_embedding: bool = True,
    cut_boundaries: Optional[Set[int]] = None,
    frame_width: int = 1920,
    frame_height: int = 1080,
    edge_margin: int = 50,
) -> Dict:
    """
    Track faces across all frames

    Mutates face_data in place: adds "trace_id" to every face, a "traces"
    mapping, and metadata["trace_stats"]. A frame with no faces resets the
    tracking state, so faces after it always start new traces.

    Args:
        face_data: face.json data
        iou_threshold: IoU threshold for matching
        similarity_threshold: Embedding similarity threshold
        distance_threshold: Distance threshold for matching
        use_embedding: Whether to use embedding
        cut_boundaries: Set of frame numbers where scene cuts occur
        frame_width: Frame width in pixels (edge-exit check)
        frame_height: Frame height in pixels (edge-exit check)
        edge_margin: Border margin in pixels (edge-exit check)

    Returns:
        Updated face_data with trace_id added to each face
    """
    frames = face_data.get("frames", {})
    if not frames:
        print("No frames found in face.json")
        return face_data

    sorted_frames = sorted(frames.items(), key=lambda item: int(item[0]))

    next_trace_id = 0
    traces = defaultdict(list)
    prev_faces = []
    prev_trace_ids = []
    prev_frame_num = None
    prev_face_frame = None  # last frame number that had actual faces

    print(f"\nTracking faces across {len(sorted_frames)} frames...")
    print(f"Parameters: iou={iou_threshold}, similarity={similarity_threshold}, distance={distance_threshold}")
    print()

    for frame_num_str, frame_data in sorted_frames:
        frame_num = int(frame_num_str)
        frame_gap = frame_num - prev_frame_num if prev_frame_num is not None else 1
        prev_frame_num = frame_num

        faces = frame_data.get("faces", [])
        if not faces:
            prev_faces = []
            prev_trace_ids = []
            continue

        matches = match_faces(
            faces,
            prev_faces,
            iou_threshold=iou_threshold,
            similarity_threshold=similarity_threshold,
            distance_threshold=distance_threshold,
            use_embedding=use_embedding,
            frame_gap=frame_gap,
            cut_boundaries=cut_boundaries,
            prev_frame=prev_face_frame,
            curr_frame=frame_num,
            frame_width=frame_width,
            frame_height=frame_height,
            edge_margin=edge_margin,
        )

        # matches is keyed in face-index order, so trace_ids[i] belongs to faces[i].
        trace_ids = []
        for curr_idx, prev_idx in matches.items():
            if prev_idx >= 0:
                trace_id = prev_trace_ids[prev_idx]
            else:
                trace_id = next_trace_id
                next_trace_id += 1

            face = faces[curr_idx]
            face["trace_id"] = trace_id
            trace_ids.append(trace_id)

            # Tolerate a missing or null pose_angle entry.
            pose = face.get("pose_angle") or {}
            traces[trace_id].append({
                "frame": frame_num,
                "face_index": curr_idx,
                "bbox": _bbox_of(face),
                "confidence": face.get("confidence", 0.0),
                "pose_angle": pose.get("angle", "unknown"),
                "pose_full": pose,  # full pose info
            })

        prev_faces = faces
        prev_trace_ids = trace_ids
        prev_face_frame = frame_num

        if frame_num % 100 == 0:
            print(f" Frame {frame_num}: {len(faces)} faces, {len(set(trace_ids))} active traces")

    # Tracking is already done at this point; a missing/invalid fps should not
    # discard the result, so fall back to 0.0 seconds and say so.
    metadata = face_data.setdefault("metadata", {})
    fps = metadata.get("fps")
    if not isinstance(fps, (int, float)) or fps <= 0:
        print(f"Warning: metadata fps missing or invalid ({fps!r}); duration_seconds set to 0.0")
        fps = None

    face_data["traces"] = {
        str(trace_id): _summarize_trace(trace_id, path, fps)
        for trace_id, path in traces.items()
    }

    metadata["trace_stats"] = {
        "total_traces": next_trace_id,
        "active_traces": len(traces),
        "long_traces": len([t for t in traces.values() if len(t) >= 2]),
    }

    return face_data


def analyze_traces(face_data: Dict) -> None:
    """
    Analyze and print trace statistics

    Prints the trace totals, the ten longest traces with their pose
    statistics, the overall pose distribution, and a duration histogram.
    """
    traces = face_data.get("traces", {})
    metadata = face_data.get("metadata", {})

    # Count only traces seen at least twice; the traces mapping itself also
    # contains single-appearance traces.
    long_trace_count = sum(
        1 for trace in traces.values() if trace.get("total_appearances", 0) >= 2
    )

    print("\n" + "=" * 60)
    print("Face Trace Analysis")
    print("=" * 60)
    print(f"\nTotal traces: {metadata.get('trace_stats', {}).get('total_traces', 0)}")
    print(f"Long traces (>= 2 frames): {long_trace_count}")

    if not traces:
        return

    sorted_traces = sorted(traces.values(), key=lambda t: t["duration_frames"], reverse=True)

    print("\n=== Top 10 Longest Traces ===")
    for trace in sorted_traces[:10]:
        print(f"\nTrace {trace['trace_id']}:")
        print(f" Frames: {trace['start_frame']} - {trace['end_frame']} ({trace['duration_frames']} frames)")
        print(f" Duration: {trace['duration_seconds']:.2f} seconds")
        print(f" Appearances: {trace['total_appearances']}")
        print(f" Avg Confidence: {trace['avg_confidence']:.3f}")

        # Pose statistics
        pose_stats = trace.get("pose_statistics", {})
        print(f" Pose Distribution: {pose_stats.get('distribution', {})}")
        print(f" Dominant Angle: {pose_stats.get('dominant_angle', 'unknown')}")

        # Pose transitions
        transitions = trace.get("pose_transitions", [])
        if transitions:
            print(f" Pose Transitions: {len(transitions)} events")
            for t in transitions[:3]:  # show only the first 3
                print(f" - Frame {t['frame']}: {t['from_angle']} → {t['to_angle']}")

    overall_pose_counts = defaultdict(int)
    for trace in traces.values():
        for pose in trace["pose_angles"]:
            overall_pose_counts[pose] += 1

    print("\n=== Pose Distribution in Traces ===")
    for pose, count in sorted(overall_pose_counts.items(), key=lambda item: item[1], reverse=True):
        print(f" {pose}: {count}")

    duration_distribution = defaultdict(int)
    for trace in traces.values():
        d = trace["duration_frames"]
        if d <= 30:
            duration_distribution["short (<= 30 frames)"] += 1
        elif d <= 90:
            duration_distribution["medium (31-90 frames)"] += 1
        else:
            duration_distribution["long (> 90 frames)"] += 1

    print("\n=== Trace Duration Distribution ===")
    for duration, count in sorted(duration_distribution.items()):
        print(f" {duration}: {count}")


def _default_output_path(face_json: str) -> str:
    """Derive '<input stem>_traced.json' next to the input; never equals the input path."""
    src = Path(face_json)
    if src.suffix == ".json":
        return str(src.with_name(f"{src.stem}_traced.json"))
    # No .json suffix: append instead of replacing, so the input is not overwritten.
    return str(src.with_name(f"{src.name}_traced.json"))


def main():
    """Command-line entry point: load face.json, track faces, report, and save."""
    parser = argparse.ArgumentParser(description="Track faces across frames")
    parser.add_argument("--face-json", required=True, help="Path to face.json")
    parser.add_argument("--output", help="Output path (default: <input>_traced.json)")
    parser.add_argument("--iou-threshold", type=float, default=0.3, help="IoU threshold")
    parser.add_argument("--similarity-threshold", type=float, default=0.7, help="Embedding similarity threshold")
    parser.add_argument("--distance-threshold", type=float, default=100.0, help="Distance threshold")
    parser.add_argument("--no-embedding", action="store_true", help="Disable embedding matching")
    parser.add_argument("--cuts-json", help="Path to cut.json for scene-cut-aware tracking")
    parser.add_argument("--analyze-only", action="store_true", help="Only analyze, don't output")
    parser.add_argument("--frame-width", type=int, default=1920, help="Frame width in pixels (edge-exit check)")
    parser.add_argument("--frame-height", type=int, default=1080, help="Frame height in pixels (edge-exit check)")
    parser.add_argument("--edge-margin", type=int, default=50, help="Border margin in pixels (edge-exit check)")

    args = parser.parse_args()

    # Load cut boundaries if provided
    cut_boundaries = None
    if args.cuts_json:
        with open(args.cuts_json, encoding="utf-8") as f:
            cuts = json.load(f)
        cut_boundaries = {s["start_frame"] for s in cuts.get("scenes", []) if s["start_frame"] > 0}
        print(f" Cut boundaries loaded: {len(cut_boundaries)} cuts")

    print("=" * 60)
    print("Face Tracker")
    print("=" * 60)

    with open(args.face_json, encoding="utf-8") as f:
        face_data = json.load(f)

    print(f"\nInput: {args.face_json}")
    print(f"Frames: {len(face_data.get('frames', {}))}")

    face_data = track_faces(
        face_data,
        iou_threshold=args.iou_threshold,
        similarity_threshold=args.similarity_threshold,
        distance_threshold=args.distance_threshold,
        use_embedding=not args.no_embedding,
        cut_boundaries=cut_boundaries,
        frame_width=args.frame_width,
        frame_height=args.frame_height,
        edge_margin=args.edge_margin,
    )

    analyze_traces(face_data)

    if not args.analyze_only:
        output_path = args.output or _default_output_path(args.face_json)
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(face_data, f, indent=2)
        print(f"\n✅ Output saved to: {output_path}")


if __name__ == "__main__":
    main()