- Remove session-ses_2f27.md (161 KB raw session log)
- Remove 49 ROOT_* duplicate files across REFERENCE/
- Remove 14 duplicate files between REFERENCE/ root and history/
- Remove asr_legacy.rs (dead code, replaced by asr.rs)
- Remove src/core/worker/ (duplicate JobWorker)
- Remove src/core/layers/ (empty directory)
- Remove 4 .bak files in src/
- Remove 7 dead private methods in worker/processor.rs
- Remove backup directory from git tracking
519 lines
17 KiB
Python
519 lines
17 KiB
Python
#!/opt/homebrew/bin/python3.11
"""
Identity Agent - Multi-Evidence Identity Inference

Core Logic:
    1. Time Overlap Matching (Speaker vs Person frames)
    2. Embedding Similarity Calculation
    3. Multi-Evidence Fusion
    4. LLM Inference for Ambiguous Cases
    5. Identity Assignment

Usage:
    python3 scripts/identity_agent.py --video-uuid <uuid> --analyze
    python3 scripts/identity_agent.py --video-uuid <uuid> --suggest
"""

import argparse
import json
import os
import sys
from typing import Dict, List, Optional, Tuple

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

# Put this script's own directory first on the import path so sibling modules
# (redis_publisher) resolve regardless of the current working directory.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from redis_publisher import RedisPublisher  # noqa: E402  (requires the sys.path tweak above)


class IdentityAgent:
    """
    Identity Agent for Multi-Evidence Identity Inference

    Reads the face-clustering output (persons), the ASRX output (speakers)
    and the probe output (fps) for one video from
    ``<output_dir>/<video_uuid>/`` and scores how strongly each person
    co-occurs in time with each speaker.

    Attributes:
        video_uuid (str): Video UUID
        output_dir (str): Output directory
        fps (float): Video frame rate (DEFAULT_FPS until a probe file supplies one)
        auto_merge_threshold (float): Auto merge threshold (default: 0.8)
        llm_threshold (float): LLM inference threshold (default: 0.5)
        face_similarity_threshold (float): Face similarity threshold (default: 0.3)
        use_llm (bool): Use LLM for ambiguous cases
        model (str): LLM model name
        ollama_url (str): Ollama generate endpoint used by ``call_llm``
    """

    # Frame rate assumed when no usable probe "fps" value is available.
    DEFAULT_FPS = 23.976
    # A speaker is attached to an identity only when its overlap ratio with the
    # person is strictly above this value.
    SPEAKER_OVERLAP_THRESHOLD = 0.3
    # Ollama endpoint used by call_llm unless overridden in the constructor.
    DEFAULT_OLLAMA_URL = "http://localhost:11434/api/generate"

    def __init__(
        self,
        video_uuid: str,
        output_dir: Optional[str] = None,
        auto_merge_threshold: float = 0.8,
        llm_threshold: float = 0.5,
        face_similarity_threshold: float = 0.3,
        use_llm: bool = True,
        model: str = "gemma4",
        ollama_url: str = DEFAULT_OLLAMA_URL,
    ):
        """
        Args:
            video_uuid: Video UUID; also names the per-video directory and files.
            output_dir: Root output directory. Falls back to $MOMENTRY_OUTPUT_DIR,
                then to a built-in development path.
            auto_merge_threshold: Confidence above which merges are auto-applied.
            llm_threshold: Confidence below which persons are kept separate.
            face_similarity_threshold: Face similarity threshold (stored only).
            use_llm: Ask the LLM for cases between the two thresholds.
            model: Ollama model name.
            ollama_url: Ollama ``/api/generate`` endpoint.
        """
        self.video_uuid = video_uuid
        self.output_dir = output_dir or os.getenv(
            "MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev"
        )
        self.auto_merge_threshold = auto_merge_threshold
        self.llm_threshold = llm_threshold
        self.face_similarity_threshold = face_similarity_threshold
        self.use_llm = use_llm
        self.model = model
        self.ollama_url = ollama_url

        self.fps = self.DEFAULT_FPS
        self.face_data = None
        self.asrx_data = None
        self.persons: List[Dict] = []
        self.speakers: List[Dict] = []
        self.identities: List[Dict] = []

        # With an empty video_uuid no publisher is created and progress events are skipped.
        self.publisher = RedisPublisher(video_uuid) if video_uuid else None

    def load_data(self) -> bool:
        """
        Load face clustered, ASRX and probe data from the video's directory.

        Reads ``<video_uuid>.face_clustered.json`` (required),
        ``<video_uuid>.asrx.json`` (optional) and ``<video_uuid>.probe.json``
        (optional, supplies ``fps``), then rebuilds ``self.persons`` and
        ``self.speakers``.

        Returns:
            True on success, False if the face clustered file is missing.
        """
        video_dir = os.path.join(self.output_dir, self.video_uuid)

        face_clustered_path = os.path.join(
            video_dir, f"{self.video_uuid}.face_clustered.json"
        )
        asrx_path = os.path.join(video_dir, f"{self.video_uuid}.asrx.json")
        probe_path = os.path.join(video_dir, f"{self.video_uuid}.probe.json")

        if not os.path.exists(face_clustered_path):
            print(f"Error: Face clustered data not found: {face_clustered_path}")
            return False

        self.face_data = self._load_json(face_clustered_path)
        self.asrx_data = self._load_json(asrx_path) if os.path.exists(asrx_path) else None

        if os.path.exists(probe_path):
            probe_data = self._load_json(probe_path)
            # Validate fps: it divides every frame index, so 0 / NaN / text would
            # crash or corrupt all timestamps downstream.
            self.fps = self._parse_fps(probe_data.get("fps"), self.DEFAULT_FPS)

        # fps must be settled before persons are extracted (timestamps use it).
        self.persons = self._extract_persons()
        self.speakers = self._extract_speakers()

        print(f"Loaded {len(self.persons)} persons, {len(self.speakers)} speakers")
        return True

    @staticmethod
    def _parse_fps(value, default: float) -> float:
        """Return ``value`` as a positive, finite float, or ``default`` if it is not one."""
        if isinstance(value, bool):
            return default
        try:
            fps = float(value)
        except (TypeError, ValueError):
            return default
        # The chained comparison is False for 0, negatives, NaN and inf.
        return fps if 0 < fps < float("inf") else default

    def _load_json(self, path: str) -> Dict:
        """Load a UTF-8 JSON file."""
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    def _extract_persons(self) -> List[Dict]:
        """
        Extract persons from the face clustered data.

        Each entry of ``face_data["clusters"]`` becomes a person dict with
        ``person_id``, ``frames``, ``frame_count``, ``avg_embedding`` and
        ``timestamps`` (frame index / fps, in seconds).
        """
        persons: List[Dict] = []

        if not self.face_data or "clusters" not in self.face_data:
            return persons

        for cluster in self.face_data["clusters"]:
            frames = cluster.get("frames", [])
            persons.append({
                "person_id": cluster.get("person_id", f"Person_{len(persons) + 1}"),
                "frames": frames,
                "frame_count": len(frames),
                "avg_embedding": cluster.get("avg_embedding", None),
                "timestamps": [f / self.fps for f in frames],
            })

        return persons

    def _extract_speakers(self) -> List[Dict]:
        """
        Group ASRX segments by speaker.

        Returns one dict per speaker (in first-seen order) with ``speaker_id``,
        its ``segments`` (start/end seconds) and their ``total_duration``.
        """
        if not self.asrx_data or "segments" not in self.asrx_data:
            return []

        segments_by_speaker: Dict[str, List[Dict]] = {}
        for segment in self.asrx_data["segments"]:
            speaker_id = segment.get("speaker", "SPEAKER_01")
            segments_by_speaker.setdefault(speaker_id, []).append({
                "start": segment.get("start", 0.0),
                "end": segment.get("end", 0.0),
            })

        return [
            {
                "speaker_id": speaker_id,
                "segments": segments,
                "total_duration": sum(s["end"] - s["start"] for s in segments),
            }
            for speaker_id, segments in segments_by_speaker.items()
        ]

    def calculate_speaker_person_overlap(
        self, person: Dict, speaker: Dict
    ) -> Tuple[int, float]:
        """
        Calculate overlap between Person and Speaker.

        A person frame overlaps when its time (frame / fps) falls inside any of
        the speaker's segments (inclusive bounds).

        Returns:
            Tuple of (overlap_frames, overlap_ratio); the ratio is
            overlap_frames / frame_count, or 0 for a person with no frames.
        """
        fps = self.fps
        segments = speaker["segments"]

        overlap_frames = sum(
            1
            for frame in person["frames"]
            if any(seg["start"] <= frame / fps <= seg["end"] for seg in segments)
        )

        frame_count = person["frame_count"]
        overlap_ratio = overlap_frames / frame_count if frame_count > 0 else 0
        return overlap_frames, overlap_ratio

    def calculate_person_similarity(
        self, person1: Dict, person2: Dict
    ) -> Optional[float]:
        """
        Calculate cosine similarity between two Person embeddings.

        Returns:
            Cosine similarity in [-1, 1], or None if either embedding is
            missing or empty.
        """
        emb1 = person1.get("avg_embedding")
        emb2 = person2.get("avg_embedding")
        # Explicit None/len checks: truthiness of a numpy array raises ValueError.
        if emb1 is None or emb2 is None or len(emb1) == 0 or len(emb2) == 0:
            return None

        vec1 = np.asarray(emb1, dtype=float).reshape(1, -1)
        vec2 = np.asarray(emb2, dtype=float).reshape(1, -1)
        return float(cosine_similarity(vec1, vec2)[0][0])

    def fuse_evidence(
        self,
        face_similarity: Optional[float],
        speaker_overlap: float,
        time_overlap: float,
        frame_ratio: float,
    ) -> float:
        """
        Fuse multiple evidence sources into a single confidence score.

        Weighted sum with weights face 0.4, speaker 0.3, time 0.2, frame 0.1
        (summing to 1.0). A missing face similarity counts as a neutral 0.5.

        Args:
            face_similarity: Cosine similarity between face embeddings, or None
            speaker_overlap: Speaker-Person overlap ratio (0-1)
            time_overlap: Temporal overlap ratio (0-1)
            frame_ratio: Person's frame count ratio in video (0-1)

        Returns:
            Fused confidence score (0-1 when all inputs are in 0-1)
        """
        weights = {
            "face": 0.4,
            "speaker": 0.3,
            "time": 0.2,
            "frame": 0.1,
        }

        face_score = face_similarity if face_similarity is not None else 0.5

        return (
            weights["face"] * face_score
            + weights["speaker"] * speaker_overlap
            + weights["time"] * time_overlap
            + weights["frame"] * frame_ratio
        )

    def analyze(self) -> Dict:
        """
        Analyze video identity.

        Loads the data, then builds one identity per person with its
        overlapping speakers and a fused confidence score.

        Returns:
            Identity analysis result; ``{"success": False, "error": ...}`` if the
            data could not be loaded.
        """
        if not self.load_data():
            return {"success": False, "error": "Failed to load data"}

        if self.publisher:
            self.publisher.info("identity", "IDENTITY_ANALYZE_START")

        # Computed once for all persons. It is 0 when every person has no frames;
        # frame_ratio is then reported as 0.0 instead of dividing by zero.
        max_frame_count = max((p["frame_count"] for p in self.persons), default=0)

        identities = [
            self._build_identity(index, person, max_frame_count)
            for index, person in enumerate(self.persons, start=1)
        ]

        if self.publisher:
            self.publisher.info("identity", f"IDENTITY_ANALYZE_COMPLETE:{len(identities)}")

        return {
            "success": True,
            "video_uuid": self.video_uuid,
            "identities": identities,
            "processing_status": {
                "status": "completed",
                "persons_analyzed": len(self.persons),
                "identities_created": len(identities),
                "merges_suggested": 0,
            },
        }

    def _build_identity(self, index: int, person: Dict, max_frame_count: int) -> Dict:
        """Build the identity record (speakers, evidence, confidence) for one person."""
        speaker_overlaps = []
        max_overlap = 0.0
        max_speaker_id = None

        for speaker in self.speakers:
            overlap_frames, overlap_ratio = self.calculate_speaker_person_overlap(
                person, speaker
            )

            if overlap_ratio > self.SPEAKER_OVERLAP_THRESHOLD:
                speaker_overlaps.append({
                    "speaker_id": speaker["speaker_id"],
                    "overlap_frames": overlap_frames,
                    "overlap_ratio": overlap_ratio,
                })

            if overlap_ratio > max_overlap:
                max_overlap = overlap_ratio
                max_speaker_id = speaker["speaker_id"]

        frame_ratio = person["frame_count"] / max_frame_count if max_frame_count > 0 else 0.0

        # NOTE(review): the best speaker overlap is also used as the time-overlap
        # evidence, so the same signal carries both the speaker and time weights.
        confidence = self.fuse_evidence(
            face_similarity=None,
            speaker_overlap=max_overlap,
            time_overlap=max_overlap,
            frame_ratio=frame_ratio,
        )

        return {
            "identity_id": f"identity_{index}",
            "person_ids": [person["person_id"]],
            "speaker_ids": [s["speaker_id"] for s in speaker_overlaps],
            "confidence": confidence,
            "evidence": {
                "face_similarity": None,
                "speaker_overlap": max_overlap,
                "time_overlap": max_overlap,
                "frame_ratio": frame_ratio,
            },
            "reasoning": f"Person {person['person_id']} has {max_overlap:.0%} overlap with {max_speaker_id or 'no speaker'}",
        }

    def suggest_merges(self) -> Dict:
        """
        Suggest Identity merges.

        Runs ``analyze`` and emits a suggestion for every identity that has at
        least one speaker and confidence above ``llm_threshold``:
        ``auto_apply`` above ``auto_merge_threshold``, otherwise ``review_needed``.

        Returns:
            Merge suggestions, or the failed analyze result.
        """
        analyze_result = self.analyze()
        if not analyze_result.get("success"):
            return analyze_result

        merge_suggestions = []
        for identity in analyze_result["identities"]:
            if not identity["person_ids"] or not identity["speaker_ids"]:
                continue

            confidence = identity["confidence"]
            if confidence > self.auto_merge_threshold:
                action = "auto_apply"
            elif confidence > self.llm_threshold:
                action = "review_needed"
            else:
                continue

            merge_suggestions.append({
                "target_person_id": identity["person_ids"][0],
                "source_person_ids": identity["person_ids"][1:],
                "confidence": confidence,
                "reasons": [
                    f"Shared speaker overlap: {identity['evidence']['speaker_overlap']:.0%}",
                    f"Confidence: {confidence:.2f}",
                ],
                "action": action,
            })

        return {
            "success": True,
            "video_uuid": self.video_uuid,
            "merge_suggestions": merge_suggestions,
            "naming_suggestions": [],
        }

    @staticmethod
    def _extract_json_object(text) -> Optional[Dict]:
        """
        Parse a JSON object out of raw LLM output.

        Accepts bare JSON, or JSON embedded in surrounding text such as a
        Markdown ```json fence (the span from the first "{" to the last "}").
        Returns None when no JSON *object* can be parsed.
        """
        if not isinstance(text, str):
            return None

        candidates = [text]
        start, end = text.find("{"), text.rfind("}")
        if 0 <= start < end:
            candidates.append(text[start:end + 1])

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except ValueError:  # json.JSONDecodeError subclasses ValueError
                continue
            if isinstance(parsed, dict):
                return parsed
        return None

    def call_llm(self, prompt: str) -> Dict:
        """
        Call the Ollama LLM for inference.

        Args:
            prompt: LLM prompt

        Returns:
            The JSON object from the model's response. If the request fails,
            or the output holds no JSON object, returns a ``keep_separate``
            decision with confidence 0.5 and the error or raw output as
            reasoning.
        """
        import requests

        body = {
            "model": self.model,
            "prompt": prompt,
            "stream": False,
        }

        try:
            response = requests.post(self.ollama_url, json=body, timeout=30)
            # Report HTTP errors (e.g. unknown model) instead of parsing an error body.
            response.raise_for_status()
            result = response.json()
        except (requests.RequestException, ValueError) as e:
            print(f"LLM call failed: {e}")
            return {
                "decision": "keep_separate",
                "confidence": 0.5,
                "reasoning": f"LLM call failed: {e}",
            }

        llm_output = result.get("response", "") if isinstance(result, dict) else ""
        if not isinstance(llm_output, str):
            llm_output = ""

        parsed = self._extract_json_object(llm_output)
        if parsed is not None:
            return parsed

        return {
            "decision": "keep_separate",
            "confidence": 0.5,
            "reasoning": llm_output,
        }

    def llm_identity_inference(self, evidence: Dict) -> Dict:
        """
        Use LLM to infer identity for ambiguous cases.

        Confidence above ``auto_merge_threshold`` merges and confidence below
        ``llm_threshold`` keeps separate, both without calling the LLM. In
        between, the LLM is asked, or manual review is requested when
        ``use_llm`` is False.

        Args:
            evidence: Multi-evidence data (confidence, overlaps, person info)

        Returns:
            LLM inference result with ``decision``, ``confidence`` and ``reasoning``
        """
        confidence = evidence.get("confidence")
        if confidence is None:
            confidence = 0.5

        if confidence > self.auto_merge_threshold:
            return {
                "decision": "merge",
                "confidence": confidence,
                "reasoning": f"High confidence ({confidence:.2f}) - auto merge",
            }

        if confidence < self.llm_threshold:
            return {
                "decision": "keep_separate",
                "confidence": confidence,
                "reasoning": f"Low confidence ({confidence:.2f}) - keep separate",
            }

        if not self.use_llm:
            return {
                "decision": "review_needed",
                "confidence": confidence,
                "reasoning": "Medium confidence - manual review required",
            }

        # "or 0" keeps the :.2f formatting from crashing on explicit None values.
        speaker_overlap = evidence.get("speaker_overlap") or 0
        time_overlap = evidence.get("time_overlap") or 0
        frame_ratio = evidence.get("frame_ratio") or 0

        prompt = "\n".join([
            "",
            "You are an identity analyst for a video analysis system.",
            "",
            "Given the following evidence:",
            f"- Face similarity: {evidence.get('face_similarity', 'N/A')}",
            f"- Speaker overlap: {speaker_overlap:.2f}",
            f"- Time overlap: {time_overlap:.2f}",
            f"- Frame ratio: {frame_ratio:.2f}",
            f"- Person: {evidence.get('person_id', 'Unknown')} ({evidence.get('frame_count', 0)} frames)",
            f"- Shared speaker: {evidence.get('shared_speaker', 'None')}",
            "",
            "Should this person be merged with other persons sharing the same speaker?",
            "",
            "Provide:",
            '1. Decision: "merge" or "keep_separate"',
            "2. Confidence: 0.0-1.0",
            "3. Reasoning: 1-2 sentences explaining your decision",
            "",
            "Output in JSON format only:",
            "{",
            '"decision": "merge" or "keep_separate",',
            '"confidence": 0.85,',
            '"reasoning": "..."',
            "}",
            "",
        ])

        return self.call_llm(prompt)


def main(argv: Optional[List[str]] = None) -> int:
    """
    Command-line entry point.

    Parses arguments, runs the requested action(s) and prints each result as
    indented JSON on stdout.

    Args:
        argv: Argument list to parse; None means ``sys.argv[1:]``.

    Returns:
        Exit status: 0 if every requested action succeeded, 1 otherwise.
        Argument errors (including requesting no action) exit with status 2
        via ``parser.error``.
    """
    parser = argparse.ArgumentParser(description="Identity Agent - Multi-Evidence Identity Inference")
    parser.add_argument("--video-uuid", "-u", help="Video UUID", required=True)
    parser.add_argument("--output-dir", "-o", help="Output directory", default=None)
    parser.add_argument(
        "--analyze", "-a", help="Analyze video identity", action="store_true"
    )
    parser.add_argument(
        "--suggest", "-s", help="Suggest Identity merges", action="store_true"
    )
    parser.add_argument(
        "--auto-merge-threshold",
        "-t",
        help="Auto merge threshold",
        type=float,
        default=0.8,
    )
    parser.add_argument(
        "--llm-threshold",
        "-l",
        help="LLM inference threshold",
        type=float,
        default=0.5,
    )
    parser.add_argument(
        "--use-llm", help="Use LLM for ambiguous cases", action="store_true"
    )
    parser.add_argument("--model", "-m", help="LLM model", default="gemma4")

    args = parser.parse_args(argv)

    # Validate before constructing the agent (which creates a RedisPublisher).
    # Missing actions are reported as a usage error (exit status 2).
    if not args.analyze and not args.suggest:
        parser.error("Please specify --analyze or --suggest")

    agent = IdentityAgent(
        video_uuid=args.video_uuid,
        output_dir=args.output_dir,
        auto_merge_threshold=args.auto_merge_threshold,
        llm_threshold=args.llm_threshold,
        use_llm=args.use_llm,
        model=args.model,
    )

    exit_code = 0
    actions = [(args.analyze, agent.analyze), (args.suggest, agent.suggest_merges)]
    for requested, run in actions:
        if not requested:
            continue
        result = run()
        print(json.dumps(result, indent=2))
        if not result.get("success"):
            exit_code = 1

    return exit_code


if __name__ == "__main__":
|
|
main() |