517 lines
18 KiB
Python
Executable File
517 lines
18 KiB
Python
Executable File
#!/opt/homebrew/bin/python3.11
|
|
"""
Face Tracker - Track faces across frames using embedding similarity and bbox proximity

Purpose:
    1. Assign unique trace_id to each face across frames
    2. Track face movement across adjacent frames
    3. Output trace statistics (duration, path, confidence)

Algorithm:
    1. For first frame: assign new trace_id to each face
    2. For subsequent frames:
        - Calculate bbox overlap with previous frame faces
        - Calculate embedding cosine similarity
        - Match faces if both conditions met
        - Assign same trace_id if matched, new trace_id if not

Matching Conditions:
    - bbox overlap > 0.3 (IoU)
    - embedding similarity > 0.7
    - OR single condition > threshold (fallback)

Output:
    - face.json with trace_id added to each face
    - trace statistics report
"""
|
|
|
|
import json
|
|
import argparse
|
|
import numpy as np
|
|
from typing import Dict, List, Set
|
|
from collections import defaultdict
|
|
|
|
|
|
def calculate_bbox_iou(bbox1: Dict, bbox2: Dict) -> float:
    """
    Compute the Intersection over Union (IoU) of two axis-aligned boxes.

    Args:
        bbox1: Box given as {"x", "y", "width", "height"} (top-left corner
            plus size).
        bbox2: Second box, same structure.

    Returns:
        Intersection area divided by union area. 0.0 when the boxes do not
        overlap (touching edges count as no overlap) or when the union area
        is not positive.
    """
    ax, ay, aw, ah = (bbox1[key] for key in ("x", "y", "width", "height"))
    bx, by, bw, bh = (bbox2[key] for key in ("x", "y", "width", "height"))

    # Corners of the overlap rectangle (only meaningful if the boxes overlap).
    left = max(ax, bx)
    right = min(ax + aw, bx + bw)
    top = max(ay, by)
    bottom = min(ay + ah, by + bh)

    if right <= left or bottom <= top:
        return 0.0

    intersection = (right - left) * (bottom - top)
    union = aw * ah + bw * bh - intersection

    if union > 0:
        return intersection / union
    return 0.0
|
|
|
|
|
|
def calculate_bbox_distance(bbox1: Dict, bbox2: Dict) -> float:
    """
    Measure how far apart the centers of two boxes are.

    Args:
        bbox1: Box given as {"x", "y", "width", "height"}.
        bbox2: Second box, same structure.

    Returns:
        Euclidean distance between the two box centers.
    """
    center_a = (
        bbox1["x"] + bbox1["width"] / 2,
        bbox1["y"] + bbox1["height"] / 2,
    )
    center_b = (
        bbox2["x"] + bbox2["width"] / 2,
        bbox2["y"] + bbox2["height"] / 2,
    )

    dx = center_a[0] - center_b[0]
    dy = center_a[1] - center_b[1]
    return np.sqrt(dx ** 2 + dy ** 2)
|
|
|
|
|
|
def calculate_embedding_similarity(emb1: List[float], emb2: List[float]) -> float:
    """
    Compute the cosine similarity of two embedding vectors.

    Args:
        emb1: First embedding, or None.
        emb2: Second embedding, or None.

    Returns:
        Cosine similarity (-1.0 - 1.0). 0.0 if either embedding is None or
        has zero norm.
    """
    if not (emb1 is not None and emb2 is not None):
        return 0.0

    lhs = np.array(emb1)
    rhs = np.array(emb2)
    lhs_norm, rhs_norm = np.linalg.norm(lhs), np.linalg.norm(rhs)

    # A zero vector has no direction, so similarity is defined as 0.0 here.
    if not (lhs_norm != 0 and rhs_norm != 0):
        return 0.0

    return np.dot(lhs, rhs) / (lhs_norm * rhs_norm)
|
|
|
|
|
|
def _face_bbox(face: Dict) -> Dict:
    """Copy the x/y/width/height fields of a face record into a bbox dict."""
    return {
        "x": face["x"],
        "y": face["y"],
        "width": face["width"],
        "height": face["height"],
    }


def _near_frame_edge(bbox: Dict, frame_width: int, frame_height: int, edge_margin: int) -> bool:
    """Return True if any side of bbox lies within edge_margin of the frame border."""
    return (
        bbox["x"] < edge_margin
        or bbox["x"] + bbox["width"] > frame_width - edge_margin
        or bbox["y"] < edge_margin
        or bbox["y"] + bbox["height"] > frame_height - edge_margin
    )


def _match_score(
    iou: float,
    similarity: float,
    distance: float,
    iou_threshold: float,
    similarity_threshold: float,
    distance_threshold: float,
    frame_gap: int,
) -> float:
    """Score a candidate pair; 0.0 means no matching rule accepted it."""
    # Rules are tried from strongest evidence to weakest; the first hit wins.
    if iou > iou_threshold and similarity > similarity_threshold:
        return iou + similarity
    if iou > 0.5 and similarity > 0.65:
        return iou * 1.5 + similarity * 0.5
    if iou > 0.35 and distance < distance_threshold:
        return iou * 2 - distance / 500
    if similarity > 0.85:
        return similarity * 2
    if similarity > 0.75 and distance < distance_threshold:
        return similarity - distance / 1000
    # Extra fallback that is only available when frames were skipped
    # (tracking lost and recovered): strong similarity plus a little overlap.
    if frame_gap > 1 and similarity > 0.8 and iou > 0.2:
        return similarity + iou
    return 0.0


def match_faces(
    current_faces: List[Dict],
    previous_faces: List[Dict],
    iou_threshold: float = 0.3,
    similarity_threshold: float = 0.7,
    distance_threshold: float = 100.0,
    use_embedding: bool = True,
    frame_gap: int = 1,
    cut_boundaries: Set[int] = None,
    prev_frame: int = None,
    curr_frame: int = None,
    frame_width: int = 1920,
    frame_height: int = 1080,
    edge_margin: int = 50,
) -> Dict[int, int]:
    """
    Match current frame faces to previous frame faces

    Matching is greedy in current-face order: each current face claims the
    best-scoring previous face that has not been claimed yet.

    Args:
        current_faces: Faces in current frame (each with x, y, width, height
            and optionally "embedding")
        previous_faces: Faces in previous frame, same structure
        iou_threshold: Minimum IoU for the primary matching rule
        similarity_threshold: Minimum embedding similarity for the primary
            matching rule
        distance_threshold: Maximum bbox center distance for the
            distance-based fallback rules
        use_embedding: Whether to use embedding similarity
        frame_gap: Number of frames between current and previous (1=adjacent)
        cut_boundaries: Set of frame numbers where scene cuts occur
        prev_frame: Previous frame number (for cut detection)
        curr_frame: Current frame number (for cut detection)
        frame_width: Frame width in pixels, used by the edge-exit heuristic
            (default 1920, the previously hard-coded value)
        frame_height: Frame height in pixels, used by the edge-exit heuristic
            (default 1080, the previously hard-coded value)
        edge_margin: Distance in pixels from the frame border within which a
            bbox counts as "at the edge" (default 50)

    Returns:
        Dict mapping current_face_index -> previous_face_index (or -1 if new)
    """
    all_new = {i: -1 for i in range(len(current_faces))}

    if not previous_faces:
        return all_new

    # If a scene cut exists between prev and current frame, force all new traces
    if cut_boundaries and prev_frame is not None and curr_frame is not None:
        if any(prev_frame < cut <= curr_frame for cut in cut_boundaries):
            return all_new

    matches = {}
    used_prev = set()

    for curr_idx, curr_face in enumerate(current_faces):
        best_prev_idx = -1
        best_score = 0.0

        curr_bbox = _face_bbox(curr_face)
        curr_emb = curr_face.get("embedding")
        # These depend only on the current face, so compute them once.
        curr_area = curr_bbox["width"] * curr_bbox["height"]
        curr_at_edge = _near_frame_edge(curr_bbox, frame_width, frame_height, edge_margin)

        for prev_idx, prev_face in enumerate(previous_faces):
            if prev_idx in used_prev:
                continue

            prev_bbox = _face_bbox(prev_face)
            prev_emb = prev_face.get("embedding")

            iou = calculate_bbox_iou(curr_bbox, prev_bbox)
            distance = calculate_bbox_distance(curr_bbox, prev_bbox)

            # Similarity stays 0.0 when embeddings are disabled or missing.
            has_embeddings = bool(use_embedding and curr_emb and prev_emb)
            similarity = (
                calculate_embedding_similarity(curr_emb, prev_emb)
                if has_embeddings
                else 0.0
            )

            # Reject only if BOTH embedding AND IoU disagree
            # (different person + different position)
            if has_embeddings and similarity < 0.5 and iou < 0.3:
                continue

            # Bbox size consistency check: reject if the area changes by more
            # than 5x (e.g., far shot -> close-up) without strong similarity
            prev_area = prev_bbox["width"] * prev_bbox["height"]
            area_ratio = max(curr_area, prev_area) / max(1, min(curr_area, prev_area))
            if area_ratio > 5.0 and similarity < 0.8:
                continue

            # Edge exit: if previous face was near frame edge and current face
            # is not, the old face likely exited and a new face appeared
            if (
                similarity < 0.8
                and not curr_at_edge
                and _near_frame_edge(prev_bbox, frame_width, frame_height, edge_margin)
            ):
                continue

            score = _match_score(
                iou,
                similarity,
                distance,
                iou_threshold,
                similarity_threshold,
                distance_threshold,
                frame_gap,
            )

            if score > best_score:
                best_score = score
                best_prev_idx = prev_idx

        # best_prev_idx is only set for a strictly positive score.
        matches[curr_idx] = best_prev_idx
        if best_prev_idx >= 0:
            used_prev.add(best_prev_idx)

    return matches
|
|
|
|
|
|
def _summarize_trace(trace_id: int, path: List[Dict], fps: float) -> Dict:
    """Build the per-trace report (duration, confidence, pose statistics) from its path."""
    duration_frames = path[-1]["frame"] - path[0]["frame"] + 1
    avg_confidence = sum(step["confidence"] for step in path) / len(path)
    pose_angles = [step["pose_angle"] for step in path]

    # Pose trace: full pose information for every appearance
    pose_trace = []
    for step in path:
        pose_info = step.get("pose_full", {})
        pose_trace.append({
            "frame": step["frame"],
            "angle": pose_info.get("angle", "unknown"),
            "confidence": pose_info.get("confidence", 0.0),
            "pitch": pose_info.get("pitch", "neutral"),
            "features": pose_info.get("features", {}),
        })

    # Pose statistics
    pose_counts = defaultdict(int)
    pose_confidence_by_angle = defaultdict(list)
    for pose in pose_trace:
        pose_counts[pose["angle"]] += 1
        pose_confidence_by_angle[pose["angle"]].append(pose["confidence"])

    pose_statistics = {
        "distribution": dict(pose_counts),
        "avg_confidence_by_angle": {
            angle: round(sum(conf_list) / len(conf_list), 3)
            for angle, conf_list in pose_confidence_by_angle.items()
        },
        "dominant_angle": max(pose_counts, key=pose_counts.get) if pose_counts else "unknown",
        "pose_count": len(pose_counts),
    }

    # Pose transitions: one event each time the angle changes between
    # consecutive appearances
    pose_transitions = []
    for before, after in zip(pose_trace, pose_trace[1:]):
        if after["angle"] != before["angle"]:
            pose_transitions.append({
                "frame": after["frame"],
                "from_angle": before["angle"],
                "to_angle": after["angle"],
                "transition_index": len(pose_transitions) + 1,
            })

    return {
        "trace_id": trace_id,
        "start_frame": path[0]["frame"],
        "end_frame": path[-1]["frame"],
        "duration_frames": duration_frames,
        # fps may be missing or zero; report 0.0 rather than crash.
        "duration_seconds": duration_frames / fps if fps else 0.0,
        "total_appearances": len(path),
        "avg_confidence": avg_confidence,
        "pose_angles": pose_angles,
        "pose_trace": pose_trace,
        "pose_statistics": pose_statistics,
        "pose_transitions": pose_transitions,
        "path": path,
    }


def track_faces(
    face_data: Dict,
    iou_threshold: float = 0.3,
    similarity_threshold: float = 0.7,
    distance_threshold: float = 100.0,
    use_embedding: bool = True,
    cut_boundaries: Set[int] = None,
) -> Dict:
    """
    Track faces across all frames

    Frames are processed in ascending numeric order of their keys. Each face
    is matched against the faces of the immediately preceding frame; a frame
    without faces ends all running traces. face_data is modified in place.

    Args:
        face_data: face.json data ("frames" maps frame number -> {"faces": [...]})
        iou_threshold: IoU threshold for matching
        similarity_threshold: Embedding similarity threshold
        distance_threshold: Distance threshold for matching
        use_embedding: Whether to use embedding
        cut_boundaries: Frame numbers where scene cuts occur; traces are never
            continued across a cut

    Returns:
        Updated face_data with trace_id added to each face, a "traces" report
        and metadata["trace_stats"]. If metadata["fps"] is missing or zero,
        duration_seconds is reported as 0.0.
    """
    frames = face_data.get("frames", {})

    if not frames:
        print("No frames found in face.json")
        return face_data

    # Create metadata if the input lacks it, so the stats below can be
    # written instead of raising KeyError after all the tracking work.
    metadata = face_data.setdefault("metadata", {})
    fps = metadata.get("fps")
    if not fps:
        print("Warning: metadata.fps is missing or zero; duration_seconds will be reported as 0.0")

    sorted_frames = sorted(frames.items(), key=lambda item: int(item[0]))

    next_trace_id = 0
    traces = defaultdict(list)

    prev_faces = []
    prev_trace_ids = []
    prev_frame_num = None
    prev_face_frame = None  # last frame number that had actual faces

    print(f"\nTracking faces across {len(sorted_frames)} frames...")
    print(f"Parameters: iou={iou_threshold}, similarity={similarity_threshold}, distance={distance_threshold}")
    print()

    for frame_num_str, frame_data in sorted_frames:
        frame_num = int(frame_num_str)
        frame_gap = frame_num - prev_frame_num if prev_frame_num is not None else 1
        prev_frame_num = frame_num

        faces = frame_data.get("faces", [])

        if not faces:
            # Nothing to carry over: the next frame with faces starts fresh traces.
            prev_faces = []
            prev_trace_ids = []
            continue

        matches = match_faces(
            faces,
            prev_faces,
            iou_threshold,
            similarity_threshold,
            distance_threshold,
            use_embedding,
            frame_gap,
            cut_boundaries,
            prev_face_frame,
            frame_num,
        )

        trace_ids = []
        for curr_idx, prev_idx in matches.items():
            if prev_idx >= 0:
                trace_id = prev_trace_ids[prev_idx]
            else:
                trace_id = next_trace_id
                next_trace_id += 1

            face = faces[curr_idx]
            face["trace_id"] = trace_id
            trace_ids.append(trace_id)
            traces[trace_id].append({
                "frame": frame_num,
                "face_index": curr_idx,
                "bbox": {
                    "x": face["x"],
                    "y": face["y"],
                    "width": face["width"],
                    "height": face["height"],
                },
                "confidence": face.get("confidence", 0.0),
                "pose_angle": face.get("pose_angle", {}).get("angle", "unknown"),
                "pose_full": face.get("pose_angle", {}),  # full pose information
            })

        prev_faces = faces
        prev_trace_ids = trace_ids
        prev_face_frame = frame_num

        if frame_num % 100 == 0:
            print(f" Frame {frame_num}: {len(faces)} faces, {len(set(trace_ids))} active traces")

    face_data["traces"] = {
        str(trace_id): _summarize_trace(trace_id, path, fps)
        for trace_id, path in traces.items()
    }

    metadata["trace_stats"] = {
        "total_traces": next_trace_id,
        "active_traces": len(traces),
        "long_traces": sum(1 for path in traces.values() if len(path) >= 2),
    }

    return face_data
|
|
|
|
|
|
def analyze_traces(face_data: Dict) -> None:
    """
    Analyze and print trace statistics

    Prints the total and long-trace counts, the ten longest traces (with
    pose distribution and up to three pose transitions each), the overall
    pose distribution and a histogram of trace durations.

    Args:
        face_data: face data carrying the "traces" report and
            metadata["trace_stats"]; missing keys are treated as empty.
    """
    traces = face_data.get("traces", {})
    metadata = face_data.get("metadata", {})

    print("\n" + "=" * 60)
    print("Face Trace Analysis")
    print("=" * 60)

    # "traces" holds every trace, including single-appearance ones, so the
    # long-trace count must filter rather than report len(traces).
    long_trace_count = sum(
        1 for trace in traces.values() if trace.get("total_appearances", 0) >= 2
    )

    print(f"\nTotal traces: {metadata.get('trace_stats', {}).get('total_traces', 0)}")
    print(f"Long traces (>= 2 frames): {long_trace_count}")

    if not traces:
        return

    sorted_traces = sorted(traces.values(), key=lambda t: t["duration_frames"], reverse=True)

    print("\n=== Top 10 Longest Traces ===")
    for trace in sorted_traces[:10]:
        print(f"\nTrace {trace['trace_id']}:")
        print(f" Frames: {trace['start_frame']} - {trace['end_frame']} ({trace['duration_frames']} frames)")
        print(f" Duration: {trace['duration_seconds']:.2f} seconds")
        print(f" Appearances: {trace['total_appearances']}")
        print(f" Avg Confidence: {trace['avg_confidence']:.3f}")

        # Pose Statistics
        trace_pose_stats = trace.get("pose_statistics", {})
        print(f" Pose Distribution: {trace_pose_stats.get('distribution', {})}")
        print(f" Dominant Angle: {trace_pose_stats.get('dominant_angle', 'unknown')}")

        # Pose Transitions
        transitions = trace.get("pose_transitions", [])
        if transitions:
            print(f" Pose Transitions: {len(transitions)} events")
            for t in transitions[:3]:  # show only the first 3
                print(f" - Frame {t['frame']}: {t['from_angle']} → {t['to_angle']}")

    # Pose counts aggregated over every appearance of every trace
    pose_totals = defaultdict(int)
    for trace in traces.values():
        for pose in trace["pose_angles"]:
            pose_totals[pose] += 1

    print("\n=== Pose Distribution in Traces ===")
    for pose, count in sorted(pose_totals.items(), key=lambda x: x[1], reverse=True):
        print(f" {pose}: {count}")

    duration_distribution = defaultdict(int)
    for trace in traces.values():
        d = trace["duration_frames"]
        if d <= 30:
            duration_distribution["short (<= 30 frames)"] += 1
        elif d <= 90:
            duration_distribution["medium (31-90 frames)"] += 1
        else:
            duration_distribution["long (> 90 frames)"] += 1

    print("\n=== Trace Duration Distribution ===")
    for duration, count in sorted(duration_distribution.items()):
        print(f" {duration}: {count}")
|
|
|
|
|
|
def _default_output_path(face_json_path: str) -> str:
    """Derive '<input without trailing .json>_traced.json' from the input path."""
    suffix = ".json"
    # Strip only a trailing ".json". str.replace would also rewrite ".json"
    # inside directory names and, when the input has no ".json" at all,
    # return the input path itself so the input would be overwritten.
    if face_json_path.endswith(suffix):
        face_json_path = face_json_path[: -len(suffix)]
    return face_json_path + "_traced.json"


def main():
    """
    Command-line entry point: load face.json, track faces, print the trace
    analysis and (unless --analyze-only is given) write the traced JSON.
    """
    parser = argparse.ArgumentParser(description="Track faces across frames")
    parser.add_argument("--face-json", required=True, help="Path to face.json")
    parser.add_argument("--output", help="Output path (default: face_traced.json)")
    parser.add_argument("--iou-threshold", type=float, default=0.3, help="IoU threshold")
    parser.add_argument("--similarity-threshold", type=float, default=0.7, help="Embedding similarity threshold")
    parser.add_argument("--distance-threshold", type=float, default=100.0, help="Distance threshold")
    parser.add_argument("--no-embedding", action="store_true", help="Disable embedding matching")
    parser.add_argument("--cuts-json", help="Path to cut.json for scene-cut-aware tracking")
    parser.add_argument("--analyze-only", action="store_true", help="Only analyze, don't output")
    args = parser.parse_args()

    # Load cut boundaries if provided
    cut_boundaries = None
    if args.cuts_json:
        with open(args.cuts_json, encoding="utf-8") as f:
            cuts = json.load(f)
        # A scene starting at frame 0 is the start of the video, not a cut.
        cut_boundaries = {s["start_frame"] for s in cuts.get("scenes", []) if s["start_frame"] > 0}
        print(f" Cut boundaries loaded: {len(cut_boundaries)} cuts")

    print("=" * 60)
    print("Face Tracker")
    print("=" * 60)

    with open(args.face_json, encoding="utf-8") as f:
        face_data = json.load(f)

    print(f"\nInput: {args.face_json}")
    print(f"Frames: {len(face_data.get('frames', {}))}")

    face_data = track_faces(
        face_data,
        iou_threshold=args.iou_threshold,
        similarity_threshold=args.similarity_threshold,
        distance_threshold=args.distance_threshold,
        use_embedding=not args.no_embedding,
        cut_boundaries=cut_boundaries,
    )

    analyze_traces(face_data)

    if not args.analyze_only:
        output_path = args.output or _default_output_path(args.face_json)
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(face_data, f, indent=2)
        print(f"\n✅ Output saved to: {output_path}")


if __name__ == "__main__":
    main()