#!/usr/bin/env python3 """ 分析 sftpgo demo 用戶視頻中的人臉 """ import cv2 import os import sys import json import time from datetime import datetime import psycopg2 # 導入人臉識別處理器 sys.path.append(os.path.dirname(os.path.abspath(__file__))) try: from face_recognition_processor import FaceRecognitionProcessor except ImportError as e: print(f"❌ 無法導入人臉識別處理器: {e}") sys.exit(1) class VideoFaceAnalyzer: def __init__(self): """初始化分析器""" self.processor = None self.db_conn = None self.output_dir = "/tmp/face_analysis_results" # 創建輸出目錄 os.makedirs(self.output_dir, exist_ok=True) def connect_database(self): """連接數據庫""" try: self.db_conn = psycopg2.connect( host="localhost", port=5432, database="momentry", user="accusys", password="accusys", ) print("✅ 數據庫連接成功") return True except Exception as e: print(f"❌ 數據庫連接失敗: {e}") return False def load_face_processor(self, use_mps=True): """加載人臉識別處理器""" try: print("加載人臉識別處理器...") self.processor = FaceRecognitionProcessor() self.processor.load_models(use_mps=use_mps) print("✅ 人臉識別處理器加載成功") return True except Exception as e: print(f"❌ 人臉識別處理器加載失敗: {e}") return False def extract_video_frames(self, video_path, interval_seconds=10, max_frames=100): """從視頻中提取幀""" print(f"從視頻提取幀: {video_path}") if not os.path.exists(video_path): print(f"❌ 視頻文件不存在: {video_path}") return [] cap = cv2.VideoCapture(video_path) if not cap.isOpened(): print(f"❌ 無法打開視頻文件: {video_path}") return [] # 獲取視頻信息 fps = cap.get(cv2.CAP_PROP_FPS) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) duration = total_frames / fps if fps > 0 else 0 print(f" 視頻信息: {duration:.1f}秒, {total_frames}幀, {fps:.1f}FPS") frames = [] frame_interval = int(fps * interval_seconds) if fps > 0 else 30 for frame_idx in range(0, total_frames, frame_interval): if len(frames) >= max_frames: break cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) ret, frame = cap.read() if ret: timestamp = frame_idx / fps if fps > 0 else 0 frames.append( {"frame_idx": frame_idx, "timestamp": timestamp, "image": frame} ) cap.release() print(f"✅ 提取了 {len(frames)} 個幀 (間隔: {interval_seconds}秒)") return frames def detect_faces_in_frames(self, frames, video_uuid, video_name): """在幀中檢測人臉""" if not frames or not self.processor: return [] print(f"在 {len(frames)} 個幀中檢測人臉...") all_detections = [] for i, frame_data in enumerate(frames): frame_idx = frame_data["frame_idx"] timestamp = frame_data["timestamp"] image = frame_data["image"] print(f" 處理幀 {i + 1}/{len(frames)} (時間: {timestamp:.1f}秒)") # 檢測人臉 detections = self.processor.detect_faces(image) if detections: print(f" ✅ 檢測到 {len(detections)} 個人臉") for detection in detections: detection_info = { "video_uuid": video_uuid, "video_name": video_name, "frame_idx": frame_idx, "timestamp": timestamp, "x": detection["x"], "y": detection["y"], "width": detection["width"], "height": detection["height"], "confidence": float(detection["confidence"]), "embedding": detection.get("embedding"), "attributes": detection.get("attributes"), "detected_at": datetime.now().isoformat(), } all_detections.append(detection_info) # 在圖像上繪製邊界框 x = detection["x"] y = detection["y"] width = detection["width"] height = detection["height"] x1, y1 = int(x), int(y) x2, y2 = int(x + width), int(y + height) cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2) cv2.putText( image, f"Face: {detection['confidence']:.2f}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2, ) # 保存帶有邊界框的幀 output_path = os.path.join( self.output_dir, f"{video_uuid}_frame_{frame_idx:06d}.jpg" ) cv2.imwrite(output_path, image) return all_detections def save_detections_to_db(self, detections): """將檢測結果保存到數據庫""" if not detections or not self.db_conn: return 0 print(f"將 {len(detections)} 個檢測結果保存到數據庫...") cursor = self.db_conn.cursor() saved_count = 0 for detection in detections: try: # 插入人臉檢測記錄 cursor.execute( """ INSERT INTO face_detections ( video_uuid, frame_number, timestamp_secs, x, y, width, height, confidence, embedding, attributes, created_at ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id """, ( detection["video_uuid"], detection["frame_idx"], detection["timestamp"], detection["x"], detection["y"], detection["width"], detection["height"], detection["confidence"], json.dumps(detection["embedding"]) if detection["embedding"] else None, json.dumps(detection["attributes"]) if detection["attributes"] else None, detection["detected_at"], ), ) saved_count += 1 except Exception as e: print(f"❌ 保存檢測結果失敗: {e}") continue self.db_conn.commit() cursor.close() print(f"✅ 成功保存 {saved_count} 個檢測結果到數據庫") return saved_count def analyze_video(self, video_path, video_uuid, video_name): """分析單個視頻""" print(f"\n{'=' * 60}") print(f"分析視頻: {video_name}") print(f"UUID: {video_uuid}") print(f"路徑: {video_path}") print(f"{'=' * 60}") start_time = time.time() # 提取幀 frames = self.extract_video_frames( video_path, interval_seconds=30, max_frames=50 ) if not frames: print("❌ 無法從視頻提取幀") return False # 檢測人臉 detections = self.detect_faces_in_frames(frames, video_uuid, video_name) if not detections: print("⚠️ 未在視頻中檢測到人臉") # 仍然保存結果(空結果) result = { "video_uuid": video_uuid, "video_name": video_name, "total_frames": len(frames), "faces_detected": 0, "detections": [], "analysis_time": time.time() - start_time, } else: # 保存到數據庫 saved_count = self.save_detections_to_db(detections) # 生成結果摘要 result = { "video_uuid": video_uuid, "video_name": video_name, "total_frames": len(frames), "faces_detected": len(detections), "saved_to_db": saved_count, "unique_faces": len( set((d["x"], d["y"], d["width"], d["height"]) for d in detections) ), "detections": detections[:10], # 只保存前10個檢測結果 "analysis_time": time.time() - start_time, } # 保存結果到 JSON 文件 result_file = os.path.join(self.output_dir, f"{video_uuid}_analysis.json") with open(result_file, "w", encoding="utf-8") as f: json.dump(result, f, indent=2, ensure_ascii=False) print("\n分析完成:") print(f" - 處理幀數: {len(frames)}") print(f" - 檢測到人臉: {len(detections)}") print(f" - 分析時間: {result['analysis_time']:.1f}秒") print(f" - 結果文件: {result_file}") return True def generate_report(self, video_results): """生成分析報告""" report_file = os.path.join(self.output_dir, "face_analysis_report.md") with open(report_file, "w", encoding="utf-8") as f: f.write("# 人臉分析報告\n\n") f.write(f"生成時間: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") f.write("## 視頻分析摘要\n\n") f.write("| 視頻名稱 | UUID | 處理幀數 | 檢測到人臉 | 分析時間 |\n") f.write("|----------|------|----------|------------|----------|\n") total_frames = 0 total_faces = 0 total_time = 0 for result in video_results: f.write(f"| {result['video_name']} | {result['video_uuid']} | ") f.write(f"{result['total_frames']} | {result['faces_detected']} | ") f.write(f"{result['analysis_time']:.1f}秒 |\n") total_frames += result["total_frames"] total_faces += result["faces_detected"] total_time += result["analysis_time"] f.write( f"| **總計** | - | **{total_frames}** | **{total_faces}** | **{total_time:.1f}秒** |\n\n" ) f.write("## 詳細結果\n\n") for result in video_results: f.write(f"### {result['video_name']}\n\n") f.write(f"- **UUID**: {result['video_uuid']}\n") f.write(f"- **處理幀數**: {result['total_frames']}\n") f.write(f"- **檢測到人臉**: {result['faces_detected']}\n") if "unique_faces" in result: f.write(f"- **獨特人臉**: {result['unique_faces']}\n") f.write(f"- **分析時間**: {result['analysis_time']:.1f}秒\n") f.write(f"- **結果文件**: `{result['video_uuid']}_analysis.json`\n\n") if result["faces_detected"] > 0: f.write("#### 檢測示例\n\n") f.write("| 時間戳 | 位置 | 置信度 | 屬性 |\n") f.write("|--------|------|--------|------|\n") for i, detection in enumerate( result.get("detections", [])[:5] ): # 只顯示前5個 timestamp = detection.get("timestamp", 0) x = detection.get("x", 0) y = detection.get("y", 0) width = detection.get("width", 0) height = detection.get("height", 0) confidence = detection.get("confidence", 0) attributes = detection.get("attributes", {}) f.write(f"| {timestamp:.1f}秒 | ({x},{y},{width},{height}) | ") f.write(f"{confidence:.3f} | ") if attributes: attrs = [] if attributes.get("age"): attrs.append(f"年齡: {attributes['age']}") if attributes.get("gender"): attrs.append(f"性別: {attributes['gender']}") f.write(", ".join(attrs)) else: f.write("-") f.write(" |\n") f.write("\n---\n\n") f.write("## 輸出文件\n\n") f.write("以下文件已生成:\n\n") for filename in os.listdir(self.output_dir): filepath = os.path.join(self.output_dir, filename) if os.path.isfile(filepath): size = os.path.getsize(filepath) f.write(f"- `{filename}` ({size:,} bytes)\n") print(f"\n📊 分析報告已生成: {report_file}") return report_file def cleanup(self): """清理資源""" if self.db_conn: self.db_conn.close() print("✅ 數據庫連接已關閉") def main(): """主函數""" print("=" * 60) print("sftpgo demo 用戶視頻人臉分析") print("=" * 60) # 視頻文件路徑 demo_dir = "/Users/accusys/momentry/var/sftpgo/data/demo" videos = [ { "path": os.path.join( demo_dir, "ExaSAN PCIe series - Director Ou Yu-Zhi Shares His Experience.mp4", ), "uuid": "9760d0820f0cf9a7", "name": "ExaSAN PCIe series - Director Ou Yu-Zhi Shares His Experience.mp4", }, { "path": os.path.join(demo_dir, "Old_Time_Movie_Show_-_Charade_1963.HD.mov"), "uuid": "384b0ff44aaaa1f1", "name": "Old_Time_Movie_Show_-_Charade_1963.HD.mov", }, ] # 初始化分析器 analyzer = VideoFaceAnalyzer() try: # 連接數據庫 if not analyzer.connect_database(): print("⚠️ 將在無數據庫連接模式下運行") # 加載人臉識別處理器 if not analyzer.load_face_processor(use_mps=True): print("❌ 無法加載人臉識別處理器") return False # 分析每個視頻 video_results = [] for video_info in videos: if os.path.exists(video_info["path"]): success = analyzer.analyze_video( video_info["path"], video_info["uuid"], video_info["name"] ) if success: # 讀取結果文件 result_file = os.path.join( analyzer.output_dir, f"{video_info['uuid']}_analysis.json" ) if os.path.exists(result_file): with open(result_file, "r", encoding="utf-8") as f: result = json.load(f) video_results.append(result) else: print(f"❌ 視頻文件不存在: {video_info['path']}") # 生成報告 if video_results: report_file = analyzer.generate_report(video_results) print(f"\n{'=' * 60}") print("分析完成!") print(f"{'=' * 60}") print(f"\n📁 輸出目錄: {analyzer.output_dir}") print(f"📊 分析報告: {report_file}") # 顯示摘要 total_frames = sum(r["total_frames"] for r in video_results) total_faces = sum(r["faces_detected"] for r in video_results) total_time = sum(r["analysis_time"] for r in video_results) print("\n📈 分析摘要:") print(f" - 總處理視頻: {len(video_results)}") print(f" - 總處理幀數: {total_frames}") print(f" - 總檢測人臉: {total_faces}") print(f" - 總分析時間: {total_time:.1f}秒") # 列出生成的文件 print("\n📄 生成的文件:") for filename in sorted(os.listdir(analyzer.output_dir)): filepath = os.path.join(analyzer.output_dir, filename) if os.path.isfile(filepath): size = os.path.getsize(filepath) print(f" - {filename} ({size:,} bytes)") return True except Exception as e: print(f"❌ 分析過程中發生錯誤: {e}") import traceback traceback.print_exc() return False finally: analyzer.cleanup() if __name__ == "__main__": success = main() sys.exit(0 if success else 1)