Files
momentry_core/scripts/analyze_video_faces.py
Warren e75c4d6f07 cleanup: remove dead code and duplicate docs
- Remove session-ses_2f27.md (161KB raw session log)
- Remove 49 ROOT_* duplicate files across REFERENCE/
- Remove 14 duplicate files between REFERENCE/ root and history/
- Remove asr_legacy.rs (dead code, replaced by asr.rs)
- Remove src/core/worker/ (duplicate JobWorker)
- Remove src/core/layers/ (empty directory)
- Remove 4 .bak files in src/
- Remove 7 dead private methods in worker/processor.rs
- Remove backup directory from git tracking
2026-05-04 01:31:21 +08:00

485 lines
17 KiB
Python

#!/usr/bin/env python3
"""
分析 sftpgo demo 用戶視頻中的人臉
"""
import cv2
import os
import sys
import json
import time
from datetime import datetime
import psycopg2
# 導入人臉識別處理器
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
try:
from face_recognition_processor import FaceRecognitionProcessor
except ImportError as e:
print(f"❌ 無法導入人臉識別處理器: {e}")
sys.exit(1)
class VideoFaceAnalyzer:
def __init__(self):
"""初始化分析器"""
self.processor = None
self.db_conn = None
self.output_dir = "/tmp/face_analysis_results"
# 創建輸出目錄
os.makedirs(self.output_dir, exist_ok=True)
def connect_database(self):
"""連接數據庫"""
try:
self.db_conn = psycopg2.connect(
host="localhost",
port=5432,
database="momentry",
user="accusys",
password="accusys",
)
print("✅ 數據庫連接成功")
return True
except Exception as e:
print(f"❌ 數據庫連接失敗: {e}")
return False
def load_face_processor(self, use_mps=True):
"""加載人臉識別處理器"""
try:
print("加載人臉識別處理器...")
self.processor = FaceRecognitionProcessor()
self.processor.load_models(use_mps=use_mps)
print("✅ 人臉識別處理器加載成功")
return True
except Exception as e:
print(f"❌ 人臉識別處理器加載失敗: {e}")
return False
def extract_video_frames(self, video_path, interval_seconds=10, max_frames=100):
"""從視頻中提取幀"""
print(f"從視頻提取幀: {video_path}")
if not os.path.exists(video_path):
print(f"❌ 視頻文件不存在: {video_path}")
return []
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"❌ 無法打開視頻文件: {video_path}")
return []
# 獲取視頻信息
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = total_frames / fps if fps > 0 else 0
print(f" 視頻信息: {duration:.1f}秒, {total_frames}幀, {fps:.1f}FPS")
frames = []
frame_interval = int(fps * interval_seconds) if fps > 0 else 30
for frame_idx in range(0, total_frames, frame_interval):
if len(frames) >= max_frames:
break
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
ret, frame = cap.read()
if ret:
timestamp = frame_idx / fps if fps > 0 else 0
frames.append(
{"frame_idx": frame_idx, "timestamp": timestamp, "image": frame}
)
cap.release()
print(f"✅ 提取了 {len(frames)} 個幀 (間隔: {interval_seconds}秒)")
return frames
def detect_faces_in_frames(self, frames, video_uuid, video_name):
"""在幀中檢測人臉"""
if not frames or not self.processor:
return []
print(f"{len(frames)} 個幀中檢測人臉...")
all_detections = []
for i, frame_data in enumerate(frames):
frame_idx = frame_data["frame_idx"]
timestamp = frame_data["timestamp"]
image = frame_data["image"]
print(f" 處理幀 {i + 1}/{len(frames)} (時間: {timestamp:.1f}秒)")
# 檢測人臉
detections = self.processor.detect_faces(image)
if detections:
print(f" ✅ 檢測到 {len(detections)} 個人臉")
for detection in detections:
detection_info = {
"video_uuid": video_uuid,
"video_name": video_name,
"frame_idx": frame_idx,
"timestamp": timestamp,
"x": detection["x"],
"y": detection["y"],
"width": detection["width"],
"height": detection["height"],
"confidence": float(detection["confidence"]),
"embedding": detection.get("embedding"),
"attributes": detection.get("attributes"),
"detected_at": datetime.now().isoformat(),
}
all_detections.append(detection_info)
# 在圖像上繪製邊界框
x = detection["x"]
y = detection["y"]
width = detection["width"]
height = detection["height"]
x1, y1 = int(x), int(y)
x2, y2 = int(x + width), int(y + height)
cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.putText(
image,
f"Face: {detection['confidence']:.2f}",
(x1, y1 - 10),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(0, 255, 0),
2,
)
# 保存帶有邊界框的幀
output_path = os.path.join(
self.output_dir, f"{video_uuid}_frame_{frame_idx:06d}.jpg"
)
cv2.imwrite(output_path, image)
return all_detections
def save_detections_to_db(self, detections):
"""將檢測結果保存到數據庫"""
if not detections or not self.db_conn:
return 0
print(f"{len(detections)} 個檢測結果保存到數據庫...")
cursor = self.db_conn.cursor()
saved_count = 0
for detection in detections:
try:
# 插入人臉檢測記錄
cursor.execute(
"""
INSERT INTO face_detections (
video_uuid, frame_number, timestamp_secs,
x, y, width, height, confidence,
embedding, attributes, created_at
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
""",
(
detection["video_uuid"],
detection["frame_idx"],
detection["timestamp"],
detection["x"],
detection["y"],
detection["width"],
detection["height"],
detection["confidence"],
json.dumps(detection["embedding"])
if detection["embedding"]
else None,
json.dumps(detection["attributes"])
if detection["attributes"]
else None,
detection["detected_at"],
),
)
saved_count += 1
except Exception as e:
print(f"❌ 保存檢測結果失敗: {e}")
continue
self.db_conn.commit()
cursor.close()
print(f"✅ 成功保存 {saved_count} 個檢測結果到數據庫")
return saved_count
def analyze_video(self, video_path, video_uuid, video_name):
"""分析單個視頻"""
print(f"\n{'=' * 60}")
print(f"分析視頻: {video_name}")
print(f"UUID: {video_uuid}")
print(f"路徑: {video_path}")
print(f"{'=' * 60}")
start_time = time.time()
# 提取幀
frames = self.extract_video_frames(
video_path, interval_seconds=30, max_frames=50
)
if not frames:
print("❌ 無法從視頻提取幀")
return False
# 檢測人臉
detections = self.detect_faces_in_frames(frames, video_uuid, video_name)
if not detections:
print("⚠️ 未在視頻中檢測到人臉")
# 仍然保存結果(空結果)
result = {
"video_uuid": video_uuid,
"video_name": video_name,
"total_frames": len(frames),
"faces_detected": 0,
"detections": [],
"analysis_time": time.time() - start_time,
}
else:
# 保存到數據庫
saved_count = self.save_detections_to_db(detections)
# 生成結果摘要
result = {
"video_uuid": video_uuid,
"video_name": video_name,
"total_frames": len(frames),
"faces_detected": len(detections),
"saved_to_db": saved_count,
"unique_faces": len(
set((d["x"], d["y"], d["width"], d["height"]) for d in detections)
),
"detections": detections[:10], # 只保存前10個檢測結果
"analysis_time": time.time() - start_time,
}
# 保存結果到 JSON 文件
result_file = os.path.join(self.output_dir, f"{video_uuid}_analysis.json")
with open(result_file, "w", encoding="utf-8") as f:
json.dump(result, f, indent=2, ensure_ascii=False)
print("\n分析完成:")
print(f" - 處理幀數: {len(frames)}")
print(f" - 檢測到人臉: {len(detections)}")
print(f" - 分析時間: {result['analysis_time']:.1f}")
print(f" - 結果文件: {result_file}")
return True
def generate_report(self, video_results):
"""生成分析報告"""
report_file = os.path.join(self.output_dir, "face_analysis_report.md")
with open(report_file, "w", encoding="utf-8") as f:
f.write("# 人臉分析報告\n\n")
f.write(f"生成時間: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write("## 視頻分析摘要\n\n")
f.write("| 視頻名稱 | UUID | 處理幀數 | 檢測到人臉 | 分析時間 |\n")
f.write("|----------|------|----------|------------|----------|\n")
total_frames = 0
total_faces = 0
total_time = 0
for result in video_results:
f.write(f"| {result['video_name']} | {result['video_uuid']} | ")
f.write(f"{result['total_frames']} | {result['faces_detected']} | ")
f.write(f"{result['analysis_time']:.1f}秒 |\n")
total_frames += result["total_frames"]
total_faces += result["faces_detected"]
total_time += result["analysis_time"]
f.write(
f"| **總計** | - | **{total_frames}** | **{total_faces}** | **{total_time:.1f}秒** |\n\n"
)
f.write("## 詳細結果\n\n")
for result in video_results:
f.write(f"### {result['video_name']}\n\n")
f.write(f"- **UUID**: {result['video_uuid']}\n")
f.write(f"- **處理幀數**: {result['total_frames']}\n")
f.write(f"- **檢測到人臉**: {result['faces_detected']}\n")
if "unique_faces" in result:
f.write(f"- **獨特人臉**: {result['unique_faces']}\n")
f.write(f"- **分析時間**: {result['analysis_time']:.1f}\n")
f.write(f"- **結果文件**: `{result['video_uuid']}_analysis.json`\n\n")
if result["faces_detected"] > 0:
f.write("#### 檢測示例\n\n")
f.write("| 時間戳 | 位置 | 置信度 | 屬性 |\n")
f.write("|--------|------|--------|------|\n")
for i, detection in enumerate(
result.get("detections", [])[:5]
): # 只顯示前5個
timestamp = detection.get("timestamp", 0)
x = detection.get("x", 0)
y = detection.get("y", 0)
width = detection.get("width", 0)
height = detection.get("height", 0)
confidence = detection.get("confidence", 0)
attributes = detection.get("attributes", {})
f.write(f"| {timestamp:.1f}秒 | ({x},{y},{width},{height}) | ")
f.write(f"{confidence:.3f} | ")
if attributes:
attrs = []
if attributes.get("age"):
attrs.append(f"年齡: {attributes['age']}")
if attributes.get("gender"):
attrs.append(f"性別: {attributes['gender']}")
f.write(", ".join(attrs))
else:
f.write("-")
f.write(" |\n")
f.write("\n---\n\n")
f.write("## 輸出文件\n\n")
f.write("以下文件已生成:\n\n")
for filename in os.listdir(self.output_dir):
filepath = os.path.join(self.output_dir, filename)
if os.path.isfile(filepath):
size = os.path.getsize(filepath)
f.write(f"- `{filename}` ({size:,} bytes)\n")
print(f"\n📊 分析報告已生成: {report_file}")
return report_file
def cleanup(self):
"""清理資源"""
if self.db_conn:
self.db_conn.close()
print("✅ 數據庫連接已關閉")
def main():
"""主函數"""
print("=" * 60)
print("sftpgo demo 用戶視頻人臉分析")
print("=" * 60)
# 視頻文件路徑
demo_dir = "/Users/accusys/momentry/var/sftpgo/data/demo"
videos = [
{
"path": os.path.join(
demo_dir,
"ExaSAN PCIe series - Director Ou Yu-Zhi Shares His Experience.mp4",
),
"uuid": "9760d0820f0cf9a7",
"name": "ExaSAN PCIe series - Director Ou Yu-Zhi Shares His Experience.mp4",
},
{
"path": os.path.join(demo_dir, "Old_Time_Movie_Show_-_Charade_1963.HD.mov"),
"uuid": "384b0ff44aaaa1f1",
"name": "Old_Time_Movie_Show_-_Charade_1963.HD.mov",
},
]
# 初始化分析器
analyzer = VideoFaceAnalyzer()
try:
# 連接數據庫
if not analyzer.connect_database():
print("⚠️ 將在無數據庫連接模式下運行")
# 加載人臉識別處理器
if not analyzer.load_face_processor(use_mps=True):
print("❌ 無法加載人臉識別處理器")
return False
# 分析每個視頻
video_results = []
for video_info in videos:
if os.path.exists(video_info["path"]):
success = analyzer.analyze_video(
video_info["path"], video_info["uuid"], video_info["name"]
)
if success:
# 讀取結果文件
result_file = os.path.join(
analyzer.output_dir, f"{video_info['uuid']}_analysis.json"
)
if os.path.exists(result_file):
with open(result_file, "r", encoding="utf-8") as f:
result = json.load(f)
video_results.append(result)
else:
print(f"❌ 視頻文件不存在: {video_info['path']}")
# 生成報告
if video_results:
report_file = analyzer.generate_report(video_results)
print(f"\n{'=' * 60}")
print("分析完成!")
print(f"{'=' * 60}")
print(f"\n📁 輸出目錄: {analyzer.output_dir}")
print(f"📊 分析報告: {report_file}")
# 顯示摘要
total_frames = sum(r["total_frames"] for r in video_results)
total_faces = sum(r["faces_detected"] for r in video_results)
total_time = sum(r["analysis_time"] for r in video_results)
print("\n📈 分析摘要:")
print(f" - 總處理視頻: {len(video_results)}")
print(f" - 總處理幀數: {total_frames}")
print(f" - 總檢測人臉: {total_faces}")
print(f" - 總分析時間: {total_time:.1f}")
# 列出生成的文件
print("\n📄 生成的文件:")
for filename in sorted(os.listdir(analyzer.output_dir)):
filepath = os.path.join(analyzer.output_dir, filename)
if os.path.isfile(filepath):
size = os.path.getsize(filepath)
print(f" - {filename} ({size:,} bytes)")
return True
except Exception as e:
print(f"❌ 分析過程中發生錯誤: {e}")
import traceback
traceback.print_exc()
return False
finally:
analyzer.cleanup()
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)