Files
momentry_core/scripts/identity_matcher.py
Accusys 6851cb4734 feat: add identity_matcher.py for multi-angle face matching
Implements:
- match_faces_round_1: TMDb seeds → traces (TH=0.55)
- match_faces_round_2: Confirmed traces → pending (TH=0.55)
- match_faces_round_3_plus: Propagation (TH=0.50)
- cluster_strangers: Greedy merge unmatched traces (TH=0.40)
- multi_angle_match: max(cosine(seed, rep)) across 3 representatives
- cosine_similarity: Vector similarity calculation

Usage:
  python identity_matcher.py --file-uuid <uuid> --round 1
  python identity_matcher.py --file-uuid <uuid> --round 2 --confirmed-traces 1,2,3
  python identity_matcher.py --file-uuid <uuid> --round 1 --stranger

Output: JSON with suggestions {trace_id: {identity_id, identity_uuid, name, score, suggested_by}}
2026-06-25 00:57:22 +08:00

410 lines
13 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Identity Matcher - Multi-angle face matching against seeds
Flow:
1. Query Qdrant _faces for all traces in a file
2. Query Qdrant _seeds for all seeds (TMDb + manual + propagation)
3. Multi-angle matching: max(cosine(seed, rep)) across 3 representatives
4. Return suggestions with confidence scores
Thresholds:
- Round 1: 0.55 (TMDb seeds)
- Round 2: 0.55 (Propagation from confirmed traces)
- Round 3+: 0.50 (Propagation continues)
- Stranger clustering: 0.40
Usage:
python identity_matcher.py --file-uuid <uuid> --round 1
python identity_matcher.py --file-uuid <uuid> --round 2 --confirmed-traces 1,2,3
Output:
JSON with suggestions: {trace_id: {identity_id, identity_uuid, name, score, suggested_by}}
"""
import os
import sys
import json
import argparse
import numpy as np
from typing import Optional
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "utils"))
from qdrant_faces import (
get_trace_representatives,
get_seeds,
search_seeds,
get_trace_centroid,
)
# Minimum multi-angle cosine score for a TMDb seed to be suggested for a
# trace (used by match_faces_round_1).
TH_ROUND_1 = 0.55
# Minimum score when propagating from confirmed traces to pending traces
# (used by match_faces_round_2).
TH_ROUND_2 = 0.55
# Lower minimum score applied by every later propagation round
# (used by match_faces_round_3_plus).
TH_ROUND_3 = 0.50
# Minimum centroid-to-centroid cosine score for an unmatched trace to be
# merged into an existing stranger cluster (used by cluster_strangers).
TH_STRANGER = 0.40
def cosine_similarity(a: list, b: list) -> float:
    """Return the cosine of the angle between vectors ``a`` and ``b``.

    Degenerate input never raises: empty vectors, vectors of different
    lengths, and vectors whose Euclidean norm is zero all score 0.0.
    """
    size = len(a)
    if size == 0 or size != len(b):
        return 0.0

    inner = sum([left * right for left, right in zip(a, b)])
    norm_a = sum([component * component for component in a]) ** 0.5
    norm_b = sum([component * component for component in b]) ** 0.5

    # A zero-length vector has no direction, so the similarity is undefined;
    # report "no similarity" rather than dividing by zero.
    if 0.0 in (norm_a, norm_b):
        return 0.0
    return inner / (norm_a * norm_b)
def multi_angle_match(seed_embedding: list, trace_reps: list) -> float:
    """Score a seed against a trace by its best-matching representative.

    Args:
        seed_embedding: Embedding vector of the seed face.
        trace_reps: Representatives of one trace; each item is indexed with
            ``["embedding"]`` to obtain its vector.

    Returns:
        The largest cosine similarity between the seed and any
        representative, floored at 0.0 (also returned when there are no
        representatives).
    """
    if not trace_reps:
        return 0.0

    # Seeding the candidates with 0.0 keeps negative similarities from ever
    # being reported as the best score.
    candidates = [0.0]
    candidates.extend(
        cosine_similarity(seed_embedding, rep["embedding"]) for rep in trace_reps
    )
    return max(candidates)
def match_faces_round_1(file_uuid: str) -> dict:
    """Round 1: suggest an identity for each trace from TMDb seeds (TH=0.55).

    Every trace of the file is scored against every TMDb seed with
    ``multi_angle_match``; the highest-scoring seed that reaches
    ``TH_ROUND_1`` becomes the suggestion for that trace.

    Args:
        file_uuid: Video file UUID

    Returns:
        {trace_id: {identity_id, identity_uuid, name, score, suggested_by: 'tmdb'}}
        Empty when no TMDb seeds are available.
    """
    reps_by_trace = get_trace_representatives(file_uuid)
    tmdb_seeds = get_seeds(source="tmdb")
    if not tmdb_seeds:
        print("[MATCH] No TMDb seeds available")
        return {}

    min_score = TH_ROUND_1
    proposals = {}
    for tid, rep_list in reps_by_trace.items():
        top_score = 0.0
        winner = None
        for candidate in tmdb_seeds:
            vector = candidate.get("vector", [])
            payload = candidate.get("payload", {})
            similarity = multi_angle_match(vector, rep_list)
            # Only a strictly better score replaces the current winner, so
            # the first seed wins a tie.
            if similarity >= min_score and similarity > top_score:
                top_score = similarity
                winner = {
                    field: payload.get(field)
                    for field in ("identity_id", "identity_uuid", "name")
                }
                winner["score"] = similarity
                winner["suggested_by"] = "tmdb"
        if winner is not None:
            proposals[tid] = winner

    print(f"[MATCH] Round 1: {len(proposals)}/{len(reps_by_trace)} traces suggested (TH={min_score})")
    return proposals
def match_faces_round_2(
    file_uuid: str,
    confirmed_traces: list,
    identity_map: dict,
) -> dict:
    """Round 2: propagate identities from confirmed traces to pending ones (TH=0.55).

    The centroid of each confirmed trace acts as a seed for its identity.
    Every trace that is not confirmed is scored against all of those seeds
    and receives the best identity whose score reaches ``TH_ROUND_2``.

    Args:
        file_uuid: Video file UUID
        confirmed_traces: List of confirmed trace_ids
        identity_map: {trace_id: {identity_id, identity_uuid, name}}

    Returns:
        {trace_id: {identity_id, identity_uuid, name, score, suggested_by: 'propagation'}}
    """
    reps_by_trace = get_trace_representatives(file_uuid)
    pending = set(reps_by_trace.keys()) - set(confirmed_traces)
    if not pending:
        print("[MATCH] Round 2: No pending traces")
        return {}

    # Collect one centroid per usable confirmed trace, grouped by identity.
    pool = {}
    for confirmed_id in confirmed_traces:
        if confirmed_id not in reps_by_trace:
            continue
        info = identity_map.get(confirmed_id)
        if not info:
            continue
        center = get_trace_centroid(file_uuid, confirmed_id)
        # A missing or all-zero centroid carries no information to match on.
        if not center or all(component == 0.0 for component in center):
            continue
        ident = info.get("identity_id")
        entry = pool.setdefault(
            ident,
            {
                "identity_id": ident,
                "identity_uuid": info.get("identity_uuid"),
                "name": info.get("name"),
                "embeddings": [],
            },
        )
        entry["embeddings"].append(center)

    if not pool:
        print("[MATCH] Round 2: No confirmed traces with embeddings")
        return {}

    # Flatten to (identity entry, seed embedding) pairs once; the pairs are
    # the same for every pending trace.
    seed_pairs = [
        (entry, embedding)
        for entry in pool.values()
        for embedding in entry["embeddings"]
    ]

    min_score = TH_ROUND_2
    proposals = {}
    for tid in pending:
        rep_list = reps_by_trace.get(tid, [])
        if not rep_list:
            continue
        top_score = 0.0
        winner = None
        for entry, embedding in seed_pairs:
            similarity = multi_angle_match(embedding, rep_list)
            if similarity >= min_score and similarity > top_score:
                top_score = similarity
                winner = {
                    "identity_id": entry["identity_id"],
                    "identity_uuid": entry["identity_uuid"],
                    "name": entry["name"],
                    "score": similarity,
                    "suggested_by": "propagation",
                }
        if winner is not None:
            proposals[tid] = winner

    print(f"[MATCH] Round 2: {len(proposals)}/{len(pending)} traces suggested (TH={min_score})")
    return proposals
def match_faces_round_3_plus(
    file_uuid: str,
    all_confirmed: dict,
    prev_suggestions: dict,
    round_num: int,
) -> dict:
    """Round 3+: keep propagating confirmed identities at a lower threshold (TH=0.50).

    Traces that are already confirmed or that received a suggestion in the
    previous round are left alone; every remaining trace is scored against
    the centroids of all confirmed traces.

    Args:
        file_uuid: Video file UUID
        all_confirmed: {trace_id: {identity_id, identity_uuid, name}} - all confirmed so far
        prev_suggestions: {trace_id: {...}} - suggestions from previous round
        round_num: Round number (3, 4, 5...); only used in log messages

    Returns:
        {trace_id: {identity_id, identity_uuid, name, score, suggested_by: 'propagation'}}
    """
    reps_by_trace = get_trace_representatives(file_uuid)
    confirmed_ids = set(all_confirmed.keys())
    already_suggested = set(prev_suggestions.keys())
    pending = set(reps_by_trace.keys()) - confirmed_ids - already_suggested
    if not pending:
        print(f"[MATCH] Round {round_num}: No pending traces")
        return {}

    # Build the seed pool: centroids of confirmed traces grouped by identity.
    pool = {}
    for confirmed_id, info in all_confirmed.items():
        if confirmed_id not in reps_by_trace:
            continue
        center = get_trace_centroid(file_uuid, confirmed_id)
        # Skip traces whose centroid is missing or entirely zero.
        if not center or all(component == 0.0 for component in center):
            continue
        ident = info.get("identity_id")
        if ident in pool:
            pool[ident]["embeddings"].append(center)
        else:
            pool[ident] = {
                "identity_id": ident,
                "identity_uuid": info.get("identity_uuid"),
                "name": info.get("name"),
                "embeddings": [center],
            }

    if not pool:
        print(f"[MATCH] Round {round_num}: No seeds available")
        return {}

    min_score = TH_ROUND_3
    proposals = {}
    for tid in pending:
        rep_list = reps_by_trace.get(tid, [])
        if not rep_list:
            continue

        # Track the winning identity entry and its score; the suggestion
        # record is assembled once the scan is complete.
        top_score = 0.0
        top_entry = None
        for entry in pool.values():
            for embedding in entry["embeddings"]:
                similarity = multi_angle_match(embedding, rep_list)
                if similarity >= min_score and similarity > top_score:
                    top_score = similarity
                    top_entry = entry

        if top_entry is None:
            continue
        proposals[tid] = {
            "identity_id": top_entry["identity_id"],
            "identity_uuid": top_entry["identity_uuid"],
            "name": top_entry["name"],
            "score": top_score,
            "suggested_by": "propagation",
        }

    print(f"[MATCH] Round {round_num}: {len(proposals)}/{len(pending)} traces suggested (TH={min_score})")
    return proposals
def cluster_strangers(file_uuid: str, matched_traces: list) -> dict:
    """Stranger clustering: greedily merge unmatched traces (TH=0.40).

    Each unmatched trace is compared, via its centroid, with the centroid of
    every cluster built so far. It joins the most similar cluster whose score
    reaches ``TH_STRANGER``; otherwise it starts a new cluster.

    Cluster centroids are cached and recomputed only when a cluster gains a
    member, instead of once per (trace, cluster) comparison.

    Args:
        file_uuid: Video file UUID
        matched_traces: List of trace_ids that have identity suggestions

    Returns:
        {stranger_cluster_id: [trace_ids]} with cluster ids starting at 1.
    """
    traces = get_trace_representatives(file_uuid)
    unmatched_traces = set(traces.keys()) - set(matched_traces)
    if not unmatched_traces:
        print("[STRANGER] All traces matched")
        return {}

    threshold = TH_STRANGER
    clusters = []
    # cluster_centroids[i] is always the current centroid of clusters[i].
    cluster_centroids = []

    # NOTE: the greedy result depends on the iteration order of this set.
    for trace_id in unmatched_traces:
        if not traces.get(trace_id, []):
            continue
        centroid = get_trace_centroid(file_uuid, trace_id)
        # A missing or all-zero centroid cannot be compared meaningfully.
        if not centroid or all(v == 0.0 for v in centroid):
            continue

        best_cluster_idx = None
        best_score = 0.0
        for idx, cluster_centroid in enumerate(cluster_centroids):
            score = cosine_similarity(centroid, cluster_centroid)
            if score >= threshold and score > best_score:
                best_score = score
                best_cluster_idx = idx

        if best_cluster_idx is None:
            clusters.append([trace_id])
            cluster_centroids.append(compute_cluster_centroid(clusters[-1], traces))
        else:
            members = clusters[best_cluster_idx]
            members.append(trace_id)
            # Membership changed, so refresh only this cluster's centroid.
            cluster_centroids[best_cluster_idx] = compute_cluster_centroid(members, traces)

    stranger_clusters = {
        cluster_id: trace_ids
        for cluster_id, trace_ids in enumerate(clusters, start=1)
    }
    print(f"[STRANGER] {len(stranger_clusters)} clusters from {len(unmatched_traces)} unmatched traces (TH={threshold})")
    return stranger_clusters
def compute_cluster_centroid(cluster: list, traces: dict, default_dim: int = 512) -> list:
    """Compute the centroid embedding for a cluster of traces.

    The centroid is the element-wise mean of every representative embedding
    belonging to the traces in ``cluster``. The dimension is taken from the
    first non-empty embedding found rather than being fixed, so embeddings
    of any size work. Embeddings that are empty, or whose length differs
    from that dimension, are ignored; ``cosine_similarity`` could not
    compare them with the result anyway.

    Args:
        cluster: trace_ids that make up the cluster.
        traces: {trace_id: [representative, ...]}; each representative is
            indexed with ``["embedding"]``.
        default_dim: Length of the all-zero vector returned when the cluster
            has no usable embeddings (512 preserves the previous behavior).

    Returns:
        The centroid as a list of floats, or ``[0.0] * default_dim`` when no
        usable embedding exists.
    """
    embeddings = [
        rep["embedding"]
        for trace_id in cluster
        for rep in (traces.get(trace_id) or [])
        if len(rep["embedding"]) > 0
    ]
    if not embeddings:
        return [0.0] * default_dim

    dim = len(embeddings[0])
    usable = [emb for emb in embeddings if len(emb) == dim]

    totals = [0.0] * dim
    for emb in usable:
        for i, value in enumerate(emb):
            totals[i] += value

    count = len(usable)
    return [total / count for total in totals]
def _align_trace_id(trace_id, known_ids):
    """Return trace_id in whichever spelling (as given, int, or str) occurs in known_ids."""
    if trace_id in known_ids:
        return trace_id
    alternatives = [str(trace_id)]
    try:
        alternatives.insert(0, int(trace_id))
    except (TypeError, ValueError):
        pass
    for alternative in alternatives:
        if alternative in known_ids:
            return alternative
    # Unknown to this file in every spelling: leave it untouched.
    return trace_id


def _load_identity_map(path, known_ids):
    """Load a {trace_id: identity_info} JSON file, re-keyed to match known_ids."""
    with open(path, encoding="utf-8") as f:
        raw = json.load(f)
    # JSON object keys are always strings, so "12" must be mapped back to the
    # ID type the trace store uses before it can be looked up.
    return {_align_trace_id(key, known_ids): value for key, value in raw.items()}


def main():
    """CLI entry point: run one matching round and emit the result as JSON.

    Round 1 matches against TMDb seeds, round 2 propagates from the traces
    given by ``--confirmed-traces``/``--identity-map``, and any other round
    number runs the round-3+ propagation using ``--identity-map`` as the set
    of confirmed traces. With ``--stranger`` the traces that are neither
    suggested nor confirmed are additionally clustered.

    The JSON result is written to ``--output`` when given, otherwise printed
    to stdout.
    """
    parser = argparse.ArgumentParser(description="Identity Matcher")
    parser.add_argument("--file-uuid", required=True, help="Video file UUID")
    parser.add_argument("--round", type=int, default=1, help="Round number (1, 2, 3+)")
    parser.add_argument("--confirmed-traces", help="Comma-separated confirmed trace_ids (for Round 2+)")
    parser.add_argument("--identity-map", help="JSON file with {trace_id: {identity_id, uuid, name}} (for Round 2+)")
    parser.add_argument("--output", help="Output JSON file path")
    parser.add_argument("--stranger", action="store_true", help="Also run stranger clustering")
    args = parser.parse_args()

    # Fetched once up front: used to align externally supplied trace ids with
    # the stored ones and to report the total trace count.
    known_traces = get_trace_representatives(args.file_uuid)

    # Traces that already carry a confirmed identity in this run.
    confirmed_ids = []

    if args.round == 1:
        suggestions = match_faces_round_1(args.file_uuid)
    elif args.round == 2:
        confirmed = []
        identity_map = {}
        if args.confirmed_traces:
            # Blank tokens (e.g. a trailing comma) are ignored.
            confirmed = [
                _align_trace_id(int(token), known_traces)
                for token in args.confirmed_traces.split(",")
                if token.strip()
            ]
        if args.identity_map:
            identity_map = _load_identity_map(args.identity_map, known_traces)
        suggestions = match_faces_round_2(args.file_uuid, confirmed, identity_map)
        confirmed_ids = confirmed
    else:
        all_confirmed = {}
        # NOTE(review): no CLI option supplies the previous round's
        # suggestions, so none are excluded from this round.
        prev_suggestions = {}
        if args.identity_map:
            all_confirmed = _load_identity_map(args.identity_map, known_traces)
        suggestions = match_faces_round_3_plus(
            args.file_uuid, all_confirmed, prev_suggestions, args.round
        )
        confirmed_ids = list(all_confirmed)

    result = {
        "file_uuid": args.file_uuid,
        "round": args.round,
        "suggestions": suggestions,
        "total_traces": len(known_traces),
        "matched": len(suggestions),
    }

    if args.stranger:
        # Confirmed traces already have an identity, so they must not be
        # clustered as strangers alongside the truly unmatched traces.
        matched_traces = list(suggestions.keys()) + list(confirmed_ids)
        stranger_clusters = cluster_strangers(args.file_uuid, matched_traces)
        result["stranger_clusters"] = stranger_clusters

    output_json = json.dumps(result, indent=2, ensure_ascii=False)
    if args.output:
        # ensure_ascii=False can emit non-ASCII names; write UTF-8 explicitly
        # instead of relying on the platform default encoding.
        with open(args.output, "w", encoding="utf-8") as f:
            f.write(output_json)
        print(f"[MATCH] Output saved to {args.output}")
    else:
        print(output_json)


if __name__ == "__main__":
    main()