#!/opt/homebrew/bin/python3.11
"""
Identity Agent - Multi-Evidence Identity Inference

Core Logic:
1. Time Overlap Matching (Speaker vs Person frames)
2. Embedding Similarity Calculation
3. Multi-Evidence Fusion
4. LLM Inference for Ambiguous Cases
5. Identity Assignment

Usage:
    python3 scripts/identity_agent.py --video-uuid VIDEO_UUID --analyze
    python3 scripts/identity_agent.py --video-uuid VIDEO_UUID --suggest
"""

import sys
import json
import argparse
import os
import numpy as np
from typing import Dict, List, Optional, Tuple
from sklearn.metrics.pairwise import cosine_similarity

# redis_publisher lives next to this script; make it importable when the
# script is launched from another working directory.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher

# Frame rate used when the probe file is missing or carries an unusable value.
DEFAULT_FPS = 23.976


class IdentityAgent:
    """
    Identity Agent for Multi-Evidence Identity Inference

    Attributes:
        video_uuid (str): Video UUID
        output_dir (str): Output directory
        fps (float): Video frame rate
        auto_merge_threshold (float): Auto merge threshold (default: 0.8)
        llm_threshold (float): LLM inference threshold (default: 0.5)
        face_similarity_threshold (float): Face similarity threshold (default: 0.3)
        use_llm (bool): Use LLM for ambiguous cases
        model (str): LLM model name
    """

    # A speaker is attached to a person only when more than this fraction of
    # the person's frames fall inside the speaker's segments.
    SPEAKER_OVERLAP_THRESHOLD = 0.3

    def __init__(
        self,
        video_uuid: str,
        output_dir: Optional[str] = None,
        auto_merge_threshold: float = 0.8,
        llm_threshold: float = 0.5,
        face_similarity_threshold: float = 0.3,
        use_llm: bool = True,
        model: str = "gemma4",
    ):
        self.video_uuid = video_uuid
        # Explicit argument wins, then the environment, then the dev default.
        self.output_dir = output_dir or os.getenv(
            "MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev"
        )
        self.auto_merge_threshold = auto_merge_threshold
        self.llm_threshold = llm_threshold
        self.face_similarity_threshold = face_similarity_threshold
        self.use_llm = use_llm
        self.model = model

        self.fps = DEFAULT_FPS  # Replaced by the probe value in load_data()
        self.face_data = None
        self.asrx_data = None
        self.persons = []
        self.speakers = []
        self.identities = []

        # No publisher is created for an empty/None video UUID.
        self.publisher = RedisPublisher(video_uuid) if video_uuid else None

    @staticmethod
    def _valid_fps(value) -> float:
        """Return ``value`` as a float if it is a finite positive number, else DEFAULT_FPS."""
        # bool is a subclass of int; True must not be read as 1 fps.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return DEFAULT_FPS
        if not np.isfinite(value) or value <= 0:
            return DEFAULT_FPS
        return float(value)

    def load_data(self) -> bool:
        """
        Load face clustered, ASRX and probe data from the video's output directory

        The face clustered file is mandatory; the ASRX and probe files are
        read only when they exist.

        Returns:
            True when the face clustered file was found and loaded,
            False when it is missing.
        """
        video_dir = os.path.join(self.output_dir, self.video_uuid)
        face_clustered_path = os.path.join(
            video_dir, f"{self.video_uuid}.face_clustered.json"
        )
        asrx_path = os.path.join(video_dir, f"{self.video_uuid}.asrx.json")
        probe_path = os.path.join(video_dir, f"{self.video_uuid}.probe.json")

        if not os.path.exists(face_clustered_path):
            print(f"Error: Face clustered data not found: {face_clustered_path}")
            return False

        self.face_data = self._load_json(face_clustered_path)
        self.asrx_data = self._load_json(asrx_path) if os.path.exists(asrx_path) else None

        if os.path.exists(probe_path):
            probe_data = self._load_json(probe_path)
            # A null, zero, negative or non-numeric fps would break every
            # frame -> time conversion below, so fall back to the default.
            self.fps = self._valid_fps(probe_data.get("fps"))

        self.persons = self._extract_persons()
        self.speakers = self._extract_speakers()

        print(f"Loaded {len(self.persons)} persons, {len(self.speakers)} speakers")
        return True

    def _load_json(self, path: str) -> Dict:
        """Load JSON file"""
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    def _extract_persons(self) -> List[Dict]:
        """
        Extract persons from face clustered data

        Returns:
            One dict per entry of ``face_data["clusters"]`` with the keys
            person_id, frames, frame_count, avg_embedding and timestamps
            (frame numbers divided by ``self.fps``).
        """
        persons = []
        if not self.face_data:
            return persons

        for cluster in self.face_data.get("clusters", []):
            person_id = cluster.get("person_id", f"Person_{len(persons) + 1}")
            # "frames": null would otherwise crash on len(None).
            frames = cluster.get("frames") or []
            persons.append({
                "person_id": person_id,
                "frames": frames,
                "frame_count": len(frames),
                "avg_embedding": cluster.get("avg_embedding"),
                "timestamps": [f / self.fps for f in frames],
            })

        return persons

    def _extract_speakers(self) -> List[Dict]:
        """
        Extract speakers from ASRX data

        Segments are grouped by their "speaker" field (segments without one
        are assigned to "SPEAKER_01").

        Returns:
            One dict per speaker with the keys speaker_id, segments
            (list of {"start", "end"}) and total_duration.
        """
        speakers = []
        if not self.asrx_data:
            return speakers

        speaker_segments_map: Dict[str, List[Dict]] = {}
        for segment in self.asrx_data.get("segments", []):
            speaker_id = segment.get("speaker", "SPEAKER_01")
            speaker_segments_map.setdefault(speaker_id, []).append({
                "start": segment.get("start", 0.0),
                "end": segment.get("end", 0.0),
            })

        for speaker_id, segments in speaker_segments_map.items():
            speakers.append({
                "speaker_id": speaker_id,
                "segments": segments,
                "total_duration": sum(s["end"] - s["start"] for s in segments),
            })

        return speakers

    def calculate_speaker_person_overlap(
        self, person: Dict, speaker: Dict
    ) -> Tuple[int, float]:
        """
        Calculate overlap between Person and Speaker

        A frame overlaps when its time (frame / fps) lies inside at least one
        of the speaker's segments, both ends inclusive.

        Returns:
            Tuple of (overlap_frames, overlap_ratio), where overlap_ratio is
            overlap_frames divided by the person's frame_count (0 when the
            person has no frames).
        """
        segments = speaker["segments"]
        overlap_frames = 0
        for frame in person["frames"]:
            frame_time = frame / self.fps
            # Each frame is counted once even if several segments contain it.
            if any(seg["start"] <= frame_time <= seg["end"] for seg in segments):
                overlap_frames += 1

        frame_count = person["frame_count"]
        overlap_ratio = overlap_frames / frame_count if frame_count > 0 else 0
        return overlap_frames, overlap_ratio

    def calculate_person_similarity(
        self, person1: Dict, person2: Dict
    ) -> Optional[float]:
        """
        Calculate cosine similarity between two Person embeddings

        Returns:
            Cosine similarity as a plain Python float, or None if either
            person has no (or an empty) avg_embedding.
        """
        emb1 = person1.get("avg_embedding")
        emb2 = person2.get("avg_embedding")
        # Explicit None/length checks: a truthiness test raises on ndarrays.
        if emb1 is None or emb2 is None or len(emb1) == 0 or len(emb2) == 0:
            return None

        vec1 = np.array(emb1).reshape(1, -1)
        vec2 = np.array(emb2).reshape(1, -1)

        # Convert the NumPy scalar so the value is JSON-serialisable.
        return float(cosine_similarity(vec1, vec2)[0][0])

    def fuse_evidence(
        self,
        face_similarity: Optional[float],
        speaker_overlap: float,
        time_overlap: float,
        frame_ratio: float,
    ) -> float:
        """
        Fuse multiple evidence sources into a single confidence score

        The score is a fixed weighted sum: 0.4 face, 0.3 speaker, 0.2 time,
        0.1 frame. A missing face similarity is replaced by a neutral 0.5.

        Args:
            face_similarity: Cosine similarity between face embeddings, or None
            speaker_overlap: Speaker-Person overlap ratio (0-1)
            time_overlap: Temporal overlap ratio (0-1)
            frame_ratio: Person's frame count ratio in video (0-1)

        Returns:
            Fused confidence score
        """
        weights = {
            "face": 0.4,
            "speaker": 0.3,
            "time": 0.2,
            "frame": 0.1,
        }

        face_score = face_similarity if face_similarity is not None else 0.5

        confidence = (
            weights["face"] * face_score
            + weights["speaker"] * speaker_overlap
            + weights["time"] * time_overlap
            + weights["frame"] * frame_ratio
        )

        return confidence

    def analyze(self) -> Dict:
        """
        Analyze video identity

        Loads the data, then creates one identity per person, attaching every
        speaker whose overlap ratio exceeds SPEAKER_OVERLAP_THRESHOLD.

        Returns:
            Identity analysis result; ``{"success": False, "error": ...}``
            when the data could not be loaded.
        """
        if not self.load_data():
            return {"success": False, "error": "Failed to load data"}

        if self.publisher:
            self.publisher.info("identity", "IDENTITY_ANALYZE_START")

        # Loop-invariant: largest cluster size, used to normalise frame_ratio.
        # default=0 covers an empty person list; the > 0 guard below covers
        # the case where every cluster has zero frames.
        max_frame_count = max((p["frame_count"] for p in self.persons), default=0)

        identities = []
        for i, person in enumerate(self.persons):
            identity_id = f"identity_{i + 1}"

            speaker_overlaps = []
            max_overlap = 0.0
            max_speaker_id = None

            for speaker in self.speakers:
                overlap_frames, overlap_ratio = self.calculate_speaker_person_overlap(
                    person, speaker
                )

                if overlap_ratio > self.SPEAKER_OVERLAP_THRESHOLD:
                    speaker_overlaps.append({
                        "speaker_id": speaker["speaker_id"],
                        "overlap_frames": overlap_frames,
                        "overlap_ratio": overlap_ratio,
                    })
                    # The best speaker is chosen only among those above the
                    # threshold.
                    if overlap_ratio > max_overlap:
                        max_overlap = overlap_ratio
                        max_speaker_id = speaker["speaker_id"]

            frame_ratio = (
                person["frame_count"] / max_frame_count if max_frame_count > 0 else 0.0
            )

            # No face comparison is done at this stage, so face_similarity is
            # None and the speaker overlap doubles as the time overlap.
            confidence = self.fuse_evidence(
                face_similarity=None,
                speaker_overlap=max_overlap,
                time_overlap=max_overlap,
                frame_ratio=frame_ratio,
            )

            identity = {
                "identity_id": identity_id,
                "person_ids": [person["person_id"]],
                "speaker_ids": [s["speaker_id"] for s in speaker_overlaps],
                "confidence": confidence,
                "evidence": {
                    "face_similarity": None,
                    "speaker_overlap": max_overlap,
                    "time_overlap": max_overlap,
                    "frame_ratio": frame_ratio,
                },
                "reasoning": f"Person {person['person_id']} has {max_overlap:.0%} overlap with {max_speaker_id or 'no speaker'}",
            }
            identities.append(identity)

        if self.publisher:
            self.publisher.info("identity", f"IDENTITY_ANALYZE_COMPLETE:{len(identities)}")

        return {
            "success": True,
            "video_uuid": self.video_uuid,
            "identities": identities,
            "processing_status": {
                "status": "completed",
                "persons_analyzed": len(self.persons),
                "identities_created": len(identities),
                "merges_suggested": 0,
            },
        }

    def suggest_merges(self, analyze_result: Optional[Dict] = None) -> Dict:
        """
        Suggest Identity merges

        Args:
            analyze_result: Result of a previous analyze() call to reuse.
                When None (the default) analyze() is run here.

        Returns:
            Merge suggestions, or the failed analysis result unchanged.
        """
        if analyze_result is None:
            analyze_result = self.analyze()
        if not analyze_result.get("success"):
            return analyze_result

        merge_suggestions = []
        for identity in analyze_result["identities"]:
            # Only identities with at least one person and one speaker qualify.
            if not identity["person_ids"] or not identity["speaker_ids"]:
                continue

            confidence = identity["confidence"]
            if confidence > self.auto_merge_threshold:
                action = "auto_apply"
            elif confidence > self.llm_threshold:
                action = "review_needed"
            else:
                continue

            reasons = [
                f"Shared speaker overlap: {identity['evidence']['speaker_overlap']:.0%}",
                f"Confidence: {confidence:.2f}",
            ]

            merge_suggestions.append({
                "target_person_id": identity["person_ids"][0],
                "source_person_ids": identity["person_ids"][1:],
                "confidence": confidence,
                "reasons": reasons,
                "action": action,
            })

        return {
            "success": True,
            "video_uuid": self.video_uuid,
            "merge_suggestions": merge_suggestions,
            "naming_suggestions": [],
        }

    def call_llm(self, prompt: str) -> Dict:
        """
        Call LLM for inference

        Posts the prompt to the local Ollama generate endpoint and parses the
        model output as a JSON object.

        Args:
            prompt: LLM prompt

        Returns:
            The parsed JSON object, or a "keep_separate" fallback dict with
            confidence 0.5 when the call fails or the output is not a JSON
            object.
        """
        import requests

        ollama_url = "http://localhost:11434/api/generate"
        body = {
            "model": self.model,
            "prompt": prompt,
            "stream": False,
        }

        try:
            response = requests.post(ollama_url, json=body, timeout=30)
            # Surface HTTP errors (e.g. unknown model) instead of parsing
            # the error body as if it were a generation result.
            response.raise_for_status()
            result = response.json()
            llm_output = result.get("response", "")

            try:
                parsed = json.loads(llm_output)
            except json.JSONDecodeError:
                parsed = None

            # Valid JSON that is not an object (e.g. 42 or a list) is not a
            # usable decision either.
            if isinstance(parsed, dict):
                return parsed
            return {
                "decision": "keep_separate",
                "confidence": 0.5,
                "reasoning": llm_output,
            }
        except Exception as e:
            # Deliberate best-effort boundary: an unavailable LLM must never
            # abort the pipeline, so every failure maps to the safe default.
            print(f"LLM call failed: {e}")
            return {
                "decision": "keep_separate",
                "confidence": 0.5,
                "reasoning": f"LLM call failed: {e}",
            }

    def llm_identity_inference(self, evidence: Dict) -> Dict:
        """
        Use LLM to infer identity for ambiguous cases

        Confidence above auto_merge_threshold short-circuits to "merge",
        below llm_threshold to "keep_separate"; only the band in between is
        sent to the LLM (or flagged "review_needed" when use_llm is False).

        Args:
            evidence: Multi-evidence data

        Returns:
            Decision dict with decision, confidence and reasoning
        """
        confidence = evidence.get("confidence", 0.5)

        if confidence > self.auto_merge_threshold:
            return {
                "decision": "merge",
                "confidence": confidence,
                "reasoning": f"High confidence ({confidence:.2f}) - auto merge",
            }

        if confidence < self.llm_threshold:
            return {
                "decision": "keep_separate",
                "confidence": confidence,
                "reasoning": f"Low confidence ({confidence:.2f}) - keep separate",
            }

        if not self.use_llm:
            return {
                "decision": "review_needed",
                "confidence": confidence,
                "reasoning": "Medium confidence - manual review required",
            }

        prompt = f"""
You are an identity analyst for a video analysis system.

Given the following evidence:
- Face similarity: {evidence.get('face_similarity', 'N/A')}
- Speaker overlap: {evidence.get('speaker_overlap', 0):.2f}
- Time overlap: {evidence.get('time_overlap', 0):.2f}
- Frame ratio: {evidence.get('frame_ratio', 0):.2f}
- Person: {evidence.get('person_id', 'Unknown')} ({evidence.get('frame_count', 0)} frames)
- Shared speaker: {evidence.get('shared_speaker', 'None')}

Should this person be merged with other persons sharing the same speaker?

Provide:
1. Decision: "merge" or "keep_separate"
2. Confidence: 0.0-1.0
3. Reasoning: 1-2 sentences explaining your decision

Output in JSON format only:
{{
  "decision": "merge" or "keep_separate",
  "confidence": 0.85,
  "reasoning": "..."
}}
"""

        return self.call_llm(prompt)


def main():
    """Parse the command line and run the requested analysis, printing JSON results."""
    parser = argparse.ArgumentParser(description="Identity Agent - Multi-Evidence Identity Inference")
    parser.add_argument("--video-uuid", "-u", help="Video UUID", required=True)
    parser.add_argument("--output-dir", "-o", help="Output directory", default=None)
    parser.add_argument(
        "--analyze", "-a", help="Analyze video identity", action="store_true"
    )
    parser.add_argument(
        "--suggest", "-s", help="Suggest Identity merges", action="store_true"
    )
    parser.add_argument(
        "--auto-merge-threshold",
        "-t",
        help="Auto merge threshold",
        type=float,
        default=0.8,
    )
    parser.add_argument(
        "--llm-threshold",
        "-l",
        help="LLM inference threshold",
        type=float,
        default=0.5,
    )
    parser.add_argument(
        "--use-llm", help="Use LLM for ambiguous cases", action="store_true"
    )
    parser.add_argument("--model", "-m", help="LLM model", default="gemma4")

    args = parser.parse_args()

    agent = IdentityAgent(
        video_uuid=args.video_uuid,
        output_dir=args.output_dir,
        auto_merge_threshold=args.auto_merge_threshold,
        llm_threshold=args.llm_threshold,
        use_llm=args.use_llm,
        model=args.model,
    )

    analysis = None
    if args.analyze:
        analysis = agent.analyze()
        print(json.dumps(analysis, indent=2))

    if args.suggest:
        # Reuse the analysis from --analyze (if any) so the data is not
        # reloaded and the Redis start/complete events are not sent twice.
        result = agent.suggest_merges(analysis)
        print(json.dumps(result, indent=2))

    if not args.analyze and not args.suggest:
        print("Please specify --analyze or --suggest")


if __name__ == "__main__":
    main()