#!/opt/homebrew/bin/python3.11
"""
Face + ASRX integration processor.

Combines face-detection results with ASRX speaker-diarization results so that
each spoken segment can be associated with a face seen around the same time,
i.e. to identify who is speaking.
"""

import sys
import json
import argparse
from pathlib import Path
from datetime import datetime


def load_json(path):
    """Load and return the parsed contents of a JSON file.

    Args:
        path: Path to the JSON file.

    Returns:
        The decoded JSON value.
    """
    # Read as UTF-8 explicitly so non-ASCII transcript text does not depend
    # on the platform's locale encoding.
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def _write_json(output_path, payload):
    """Write *payload* to *output_path* as indented UTF-8 JSON."""
    # ensure_ascii=False emits raw non-ASCII characters, so the file must be
    # opened with an encoding that can represent them.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2, ensure_ascii=False)


def match_face_with_speaker(face_data, asrx_data, time_threshold=1.0):
    """
    Match faces with speakers based on timestamp proximity.

    For every ASRX segment the face whose frame timestamp is closest to the
    segment's midpoint is selected, provided the distance does not exceed
    ``time_threshold``. On ties the face that appears first in ``face_data``
    wins.

    Args:
        face_data: Face detection results (dict with a "frames" list; each
            frame may have a "timestamp" and a "faces" list).
        asrx_data: ASRX (speaker diarization) results (dict with a
            "segments" list).
        time_threshold: Maximum time difference to consider a match (seconds).

    Returns:
        A list with one dict per ASRX segment containing "start", "end",
        "text", "speaker_id", "face_detected", "face" and "time_diff"
        ("face" and "time_diff" are None when nothing matched).
    """
    # Flatten the per-frame face lists into one list of timestamped faces.
    face_frames = []
    for frame_info in face_data.get("frames", []):
        timestamp = frame_info.get("timestamp", 0)
        for face in frame_info.get("faces", []):
            face_frames.append(
                {
                    "timestamp": timestamp,
                    "x": face.get("x"),
                    "y": face.get("y"),
                    "width": face.get("width"),
                    "height": face.get("height"),
                    "confidence": face.get("confidence", 0),
                }
            )

    integrated_segments = []
    for segment in asrx_data.get("segments", []):
        start_time = segment.get("start", 0)
        end_time = segment.get("end", 0)
        mid_time = (start_time + end_time) / 2

        # Linear scan for the closest face within the threshold; the strict
        # "<" keeps the earliest face when several are equally close.
        matched_face = None
        min_time_diff = float("inf")
        for face in face_frames:
            time_diff = abs(face["timestamp"] - mid_time)
            if time_diff <= time_threshold and time_diff < min_time_diff:
                min_time_diff = time_diff
                matched_face = face

        has_face = matched_face is not None
        integrated_segments.append(
            {
                "start": start_time,
                "end": end_time,
                "text": segment.get("text", ""),
                "speaker_id": segment.get("speaker_id"),
                "face_detected": has_face,
                "face": matched_face,
                "time_diff": min_time_diff if has_face else None,
            }
        )

    return integrated_segments


def generate_statistics(integrated_segments, face_data):
    """Generate statistics about the integrated data.

    Args:
        integrated_segments: Output of ``match_face_with_speaker``.
        face_data: Face detection results (dict with a "frames" list).

    Returns:
        A dict with the segment totals, the face match rate, a per-speaker
        breakdown ("speakers"), the number of detected faces
        ("total_faces_detected") and the number of frames ("total_frames").
    """
    total_segments = len(integrated_segments)
    segments_with_face = sum(1 for s in integrated_segments if s["face_detected"])
    segments_without_face = total_segments - segments_with_face

    # Per-speaker statistics, in order of first appearance.
    speakers = {}
    for seg in integrated_segments:
        speaker = seg.get("speaker_id")
        # Skip only missing/blank ids; a plain truthiness test would also
        # drop a legitimate speaker whose id is the integer 0.
        if speaker is None or speaker == "":
            continue
        if speaker not in speakers:
            speakers[speaker] = {
                "speaker_id": speaker,
                "segment_count": 0,
                "total_duration": 0,
                "with_face": 0,
            }
        entry = speakers[speaker]
        entry["segment_count"] += 1
        entry["total_duration"] += seg["end"] - seg["start"]
        if seg["face_detected"]:
            entry["with_face"] += 1

    frames = face_data.get("frames", [])
    return {
        "total_segments": total_segments,
        "segments_with_face": segments_with_face,
        "segments_without_face": segments_without_face,
        "face_match_rate": segments_with_face / total_segments
        if total_segments > 0
        else 0,
        "speakers": list(speakers.values()),
        # Count individual faces, not frames: a frame may hold zero or
        # several faces.
        "total_faces_detected": sum(len(frame.get("faces", [])) for frame in frames),
        "total_frames": len(frames),
    }


def _print_summary(stats):
    """Print a human-readable summary of *stats* to stdout."""
    print("\n=== Face-ASRX Integration Summary ===")
    print(f"Total segments: {stats['total_segments']}")
    print(f"Segments with face: {stats['segments_with_face']}")
    print(f"Segments without face: {stats['segments_without_face']}")
    print(f"Face match rate: {stats['face_match_rate'] * 100:.1f}%")
    print(f"Total speakers: {len(stats['speakers'])}")

    for speaker in stats["speakers"]:
        print(f"\n Speaker {speaker['speaker_id']}:")
        print(f" Segments: {speaker['segment_count']}")
        print(f" Duration: {speaker['total_duration']:.1f}s")
        print(
            f" With face: {speaker['with_face']} ({speaker['with_face'] / speaker['segment_count'] * 100:.0f}%)"
        )


def integrate_face_asrx(face_path, asrx_path, output_path, time_threshold=1.0):
    """
    Integrate face detection and ASRX results and save them as JSON.

    When the ASRX file has no segments, an output file with an empty
    "integrated_segments" list is still written (with a "note" in its stats)
    and no summary is printed.

    Args:
        face_path: Path to face detection JSON
        asrx_path: Path to ASRX JSON
        output_path: Path to save integrated results
        time_threshold: Time threshold for matching (seconds)
    """
    print(f"[Face-ASRX] Loading face data: {face_path}")
    face_data = load_json(face_path)

    print(f"[Face-ASRX] Loading ASRX data: {asrx_path}")
    asrx_data = load_json(asrx_path)

    # Fields shared by both the normal and the "no segments" output, so the
    # two variants always have the same top-level schema.
    output = {
        "integration_time": datetime.now().isoformat(),
        # str() keeps the payload serializable when Path objects are passed.
        "face_source": str(face_path),
        "asrx_source": str(asrx_path),
        "time_threshold": time_threshold,
        "face_data": face_data,
        "asrx_data": asrx_data,
    }

    if not asrx_data.get("segments"):
        print("[Face-ASRX] Warning: ASRX has no segments, creating empty output")
        stats = generate_statistics([], face_data)
        stats["note"] = "ASRX has no segments"
        output["integrated_segments"] = []
        output["stats"] = stats
        _write_json(output_path, output)
        return

    print(f"[Face-ASRX] Matching faces with speakers (threshold: {time_threshold}s)")
    integrated_segments = match_face_with_speaker(face_data, asrx_data, time_threshold)

    print("[Face-ASRX] Generating statistics")
    stats = generate_statistics(integrated_segments, face_data)

    output["integrated_segments"] = integrated_segments
    output["stats"] = stats

    print(f"[Face-ASRX] Saving results to: {output_path}")
    _write_json(output_path, output)

    _print_summary(stats)
    print("\n[Face-ASRX] Integration complete!")


def main(argv=None):
    """Command-line entry point.

    Args:
        argv: Optional list of arguments to parse instead of ``sys.argv[1:]``.

    Exits with status 1 when either input file does not exist.
    """
    parser = argparse.ArgumentParser(
        description="Integrate Face Detection with ASRX Speaker Diarization"
    )
    parser.add_argument("face_json", help="Path to face detection JSON")
    parser.add_argument("asrx_json", help="Path to ASRX JSON")
    parser.add_argument("output_path", help="Path to save integrated results")
    parser.add_argument(
        "--threshold",
        "-t",
        type=float,
        default=1.0,
        help="Time threshold for matching face with speaker (seconds, default: 1.0)",
    )

    args = parser.parse_args(argv)

    # Fail early, with the message on stderr, if an input is missing or is
    # not a regular file.
    for label, candidate in (("Face", args.face_json), ("ASRX", args.asrx_json)):
        if not Path(candidate).is_file():
            print(f"Error: {label} JSON not found: {candidate}", file=sys.stderr)
            sys.exit(1)

    integrate_face_asrx(
        args.face_json, args.asrx_json, args.output_path, args.threshold
    )


if __name__ == "__main__":
    main()