- Remove session-ses_2f27.md (161KB raw session log) - Remove 49 ROOT_* duplicate files across REFERENCE/ - Remove 14 duplicate files between REFERENCE/ root and history/ - Remove asr_legacy.rs (dead code, replaced by asr.rs) - Remove src/core/worker/ (duplicate JobWorker) - Remove src/core/layers/ (empty directory) - Remove 4 .bak files in src/ - Remove 7 dead private methods in worker/processor.rs - Remove backup directory from git tracking
485 lines
17 KiB
Python
485 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
分析 sftpgo demo 用戶視頻中的人臉
|
|
"""
|
|
|
|
import cv2
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
from datetime import datetime
|
|
import psycopg2
|
|
|
|
# 導入人臉識別處理器
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
|
try:
|
|
from face_recognition_processor import FaceRecognitionProcessor
|
|
except ImportError as e:
|
|
print(f"❌ 無法導入人臉識別處理器: {e}")
|
|
sys.exit(1)
|
|
|
|
|
|
class VideoFaceAnalyzer:
|
|
def __init__(self):
|
|
"""初始化分析器"""
|
|
self.processor = None
|
|
self.db_conn = None
|
|
self.output_dir = "/tmp/face_analysis_results"
|
|
|
|
# 創建輸出目錄
|
|
os.makedirs(self.output_dir, exist_ok=True)
|
|
|
|
def connect_database(self):
|
|
"""連接數據庫"""
|
|
try:
|
|
self.db_conn = psycopg2.connect(
|
|
host="localhost",
|
|
port=5432,
|
|
database="momentry",
|
|
user="accusys",
|
|
password="accusys",
|
|
)
|
|
print("✅ 數據庫連接成功")
|
|
return True
|
|
except Exception as e:
|
|
print(f"❌ 數據庫連接失敗: {e}")
|
|
return False
|
|
|
|
def load_face_processor(self, use_mps=True):
|
|
"""加載人臉識別處理器"""
|
|
try:
|
|
print("加載人臉識別處理器...")
|
|
self.processor = FaceRecognitionProcessor()
|
|
self.processor.load_models(use_mps=use_mps)
|
|
print("✅ 人臉識別處理器加載成功")
|
|
return True
|
|
except Exception as e:
|
|
print(f"❌ 人臉識別處理器加載失敗: {e}")
|
|
return False
|
|
|
|
def extract_video_frames(self, video_path, interval_seconds=10, max_frames=100):
|
|
"""從視頻中提取幀"""
|
|
print(f"從視頻提取幀: {video_path}")
|
|
|
|
if not os.path.exists(video_path):
|
|
print(f"❌ 視頻文件不存在: {video_path}")
|
|
return []
|
|
|
|
cap = cv2.VideoCapture(video_path)
|
|
if not cap.isOpened():
|
|
print(f"❌ 無法打開視頻文件: {video_path}")
|
|
return []
|
|
|
|
# 獲取視頻信息
|
|
fps = cap.get(cv2.CAP_PROP_FPS)
|
|
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
|
|
duration = total_frames / fps if fps > 0 else 0
|
|
|
|
print(f" 視頻信息: {duration:.1f}秒, {total_frames}幀, {fps:.1f}FPS")
|
|
|
|
frames = []
|
|
frame_interval = int(fps * interval_seconds) if fps > 0 else 30
|
|
|
|
for frame_idx in range(0, total_frames, frame_interval):
|
|
if len(frames) >= max_frames:
|
|
break
|
|
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
|
|
ret, frame = cap.read()
|
|
|
|
if ret:
|
|
timestamp = frame_idx / fps if fps > 0 else 0
|
|
frames.append(
|
|
{"frame_idx": frame_idx, "timestamp": timestamp, "image": frame}
|
|
)
|
|
|
|
cap.release()
|
|
print(f"✅ 提取了 {len(frames)} 個幀 (間隔: {interval_seconds}秒)")
|
|
return frames
|
|
|
|
def detect_faces_in_frames(self, frames, video_uuid, video_name):
|
|
"""在幀中檢測人臉"""
|
|
if not frames or not self.processor:
|
|
return []
|
|
|
|
print(f"在 {len(frames)} 個幀中檢測人臉...")
|
|
|
|
all_detections = []
|
|
|
|
for i, frame_data in enumerate(frames):
|
|
frame_idx = frame_data["frame_idx"]
|
|
timestamp = frame_data["timestamp"]
|
|
image = frame_data["image"]
|
|
|
|
print(f" 處理幀 {i + 1}/{len(frames)} (時間: {timestamp:.1f}秒)")
|
|
|
|
# 檢測人臉
|
|
detections = self.processor.detect_faces(image)
|
|
|
|
if detections:
|
|
print(f" ✅ 檢測到 {len(detections)} 個人臉")
|
|
|
|
for detection in detections:
|
|
detection_info = {
|
|
"video_uuid": video_uuid,
|
|
"video_name": video_name,
|
|
"frame_idx": frame_idx,
|
|
"timestamp": timestamp,
|
|
"x": detection["x"],
|
|
"y": detection["y"],
|
|
"width": detection["width"],
|
|
"height": detection["height"],
|
|
"confidence": float(detection["confidence"]),
|
|
"embedding": detection.get("embedding"),
|
|
"attributes": detection.get("attributes"),
|
|
"detected_at": datetime.now().isoformat(),
|
|
}
|
|
all_detections.append(detection_info)
|
|
|
|
# 在圖像上繪製邊界框
|
|
x = detection["x"]
|
|
y = detection["y"]
|
|
width = detection["width"]
|
|
height = detection["height"]
|
|
x1, y1 = int(x), int(y)
|
|
x2, y2 = int(x + width), int(y + height)
|
|
|
|
cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
|
|
cv2.putText(
|
|
image,
|
|
f"Face: {detection['confidence']:.2f}",
|
|
(x1, y1 - 10),
|
|
cv2.FONT_HERSHEY_SIMPLEX,
|
|
0.5,
|
|
(0, 255, 0),
|
|
2,
|
|
)
|
|
|
|
# 保存帶有邊界框的幀
|
|
output_path = os.path.join(
|
|
self.output_dir, f"{video_uuid}_frame_{frame_idx:06d}.jpg"
|
|
)
|
|
cv2.imwrite(output_path, image)
|
|
|
|
return all_detections
|
|
|
|
def save_detections_to_db(self, detections):
|
|
"""將檢測結果保存到數據庫"""
|
|
if not detections or not self.db_conn:
|
|
return 0
|
|
|
|
print(f"將 {len(detections)} 個檢測結果保存到數據庫...")
|
|
|
|
cursor = self.db_conn.cursor()
|
|
saved_count = 0
|
|
|
|
for detection in detections:
|
|
try:
|
|
# 插入人臉檢測記錄
|
|
cursor.execute(
|
|
"""
|
|
INSERT INTO face_detections (
|
|
video_uuid, frame_number, timestamp_secs,
|
|
x, y, width, height, confidence,
|
|
embedding, attributes, created_at
|
|
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
|
RETURNING id
|
|
""",
|
|
(
|
|
detection["video_uuid"],
|
|
detection["frame_idx"],
|
|
detection["timestamp"],
|
|
detection["x"],
|
|
detection["y"],
|
|
detection["width"],
|
|
detection["height"],
|
|
detection["confidence"],
|
|
json.dumps(detection["embedding"])
|
|
if detection["embedding"]
|
|
else None,
|
|
json.dumps(detection["attributes"])
|
|
if detection["attributes"]
|
|
else None,
|
|
detection["detected_at"],
|
|
),
|
|
)
|
|
|
|
saved_count += 1
|
|
|
|
except Exception as e:
|
|
print(f"❌ 保存檢測結果失敗: {e}")
|
|
continue
|
|
|
|
self.db_conn.commit()
|
|
cursor.close()
|
|
|
|
print(f"✅ 成功保存 {saved_count} 個檢測結果到數據庫")
|
|
return saved_count
|
|
|
|
def analyze_video(self, video_path, video_uuid, video_name):
|
|
"""分析單個視頻"""
|
|
print(f"\n{'=' * 60}")
|
|
print(f"分析視頻: {video_name}")
|
|
print(f"UUID: {video_uuid}")
|
|
print(f"路徑: {video_path}")
|
|
print(f"{'=' * 60}")
|
|
|
|
start_time = time.time()
|
|
|
|
# 提取幀
|
|
frames = self.extract_video_frames(
|
|
video_path, interval_seconds=30, max_frames=50
|
|
)
|
|
|
|
if not frames:
|
|
print("❌ 無法從視頻提取幀")
|
|
return False
|
|
|
|
# 檢測人臉
|
|
detections = self.detect_faces_in_frames(frames, video_uuid, video_name)
|
|
|
|
if not detections:
|
|
print("⚠️ 未在視頻中檢測到人臉")
|
|
# 仍然保存結果(空結果)
|
|
result = {
|
|
"video_uuid": video_uuid,
|
|
"video_name": video_name,
|
|
"total_frames": len(frames),
|
|
"faces_detected": 0,
|
|
"detections": [],
|
|
"analysis_time": time.time() - start_time,
|
|
}
|
|
else:
|
|
# 保存到數據庫
|
|
saved_count = self.save_detections_to_db(detections)
|
|
|
|
# 生成結果摘要
|
|
result = {
|
|
"video_uuid": video_uuid,
|
|
"video_name": video_name,
|
|
"total_frames": len(frames),
|
|
"faces_detected": len(detections),
|
|
"saved_to_db": saved_count,
|
|
"unique_faces": len(
|
|
set((d["x"], d["y"], d["width"], d["height"]) for d in detections)
|
|
),
|
|
"detections": detections[:10], # 只保存前10個檢測結果
|
|
"analysis_time": time.time() - start_time,
|
|
}
|
|
|
|
# 保存結果到 JSON 文件
|
|
result_file = os.path.join(self.output_dir, f"{video_uuid}_analysis.json")
|
|
with open(result_file, "w", encoding="utf-8") as f:
|
|
json.dump(result, f, indent=2, ensure_ascii=False)
|
|
|
|
print("\n分析完成:")
|
|
print(f" - 處理幀數: {len(frames)}")
|
|
print(f" - 檢測到人臉: {len(detections)}")
|
|
print(f" - 分析時間: {result['analysis_time']:.1f}秒")
|
|
print(f" - 結果文件: {result_file}")
|
|
|
|
return True
|
|
|
|
def generate_report(self, video_results):
|
|
"""生成分析報告"""
|
|
report_file = os.path.join(self.output_dir, "face_analysis_report.md")
|
|
|
|
with open(report_file, "w", encoding="utf-8") as f:
|
|
f.write("# 人臉分析報告\n\n")
|
|
f.write(f"生成時間: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
|
|
|
|
f.write("## 視頻分析摘要\n\n")
|
|
f.write("| 視頻名稱 | UUID | 處理幀數 | 檢測到人臉 | 分析時間 |\n")
|
|
f.write("|----------|------|----------|------------|----------|\n")
|
|
|
|
total_frames = 0
|
|
total_faces = 0
|
|
total_time = 0
|
|
|
|
for result in video_results:
|
|
f.write(f"| {result['video_name']} | {result['video_uuid']} | ")
|
|
f.write(f"{result['total_frames']} | {result['faces_detected']} | ")
|
|
f.write(f"{result['analysis_time']:.1f}秒 |\n")
|
|
|
|
total_frames += result["total_frames"]
|
|
total_faces += result["faces_detected"]
|
|
total_time += result["analysis_time"]
|
|
|
|
f.write(
|
|
f"| **總計** | - | **{total_frames}** | **{total_faces}** | **{total_time:.1f}秒** |\n\n"
|
|
)
|
|
|
|
f.write("## 詳細結果\n\n")
|
|
|
|
for result in video_results:
|
|
f.write(f"### {result['video_name']}\n\n")
|
|
f.write(f"- **UUID**: {result['video_uuid']}\n")
|
|
f.write(f"- **處理幀數**: {result['total_frames']}\n")
|
|
f.write(f"- **檢測到人臉**: {result['faces_detected']}\n")
|
|
|
|
if "unique_faces" in result:
|
|
f.write(f"- **獨特人臉**: {result['unique_faces']}\n")
|
|
|
|
f.write(f"- **分析時間**: {result['analysis_time']:.1f}秒\n")
|
|
f.write(f"- **結果文件**: `{result['video_uuid']}_analysis.json`\n\n")
|
|
|
|
if result["faces_detected"] > 0:
|
|
f.write("#### 檢測示例\n\n")
|
|
f.write("| 時間戳 | 位置 | 置信度 | 屬性 |\n")
|
|
f.write("|--------|------|--------|------|\n")
|
|
|
|
for i, detection in enumerate(
|
|
result.get("detections", [])[:5]
|
|
): # 只顯示前5個
|
|
timestamp = detection.get("timestamp", 0)
|
|
x = detection.get("x", 0)
|
|
y = detection.get("y", 0)
|
|
width = detection.get("width", 0)
|
|
height = detection.get("height", 0)
|
|
confidence = detection.get("confidence", 0)
|
|
attributes = detection.get("attributes", {})
|
|
|
|
f.write(f"| {timestamp:.1f}秒 | ({x},{y},{width},{height}) | ")
|
|
f.write(f"{confidence:.3f} | ")
|
|
|
|
if attributes:
|
|
attrs = []
|
|
if attributes.get("age"):
|
|
attrs.append(f"年齡: {attributes['age']}")
|
|
if attributes.get("gender"):
|
|
attrs.append(f"性別: {attributes['gender']}")
|
|
f.write(", ".join(attrs))
|
|
else:
|
|
f.write("-")
|
|
|
|
f.write(" |\n")
|
|
|
|
f.write("\n---\n\n")
|
|
|
|
f.write("## 輸出文件\n\n")
|
|
f.write("以下文件已生成:\n\n")
|
|
|
|
for filename in os.listdir(self.output_dir):
|
|
filepath = os.path.join(self.output_dir, filename)
|
|
if os.path.isfile(filepath):
|
|
size = os.path.getsize(filepath)
|
|
f.write(f"- `{filename}` ({size:,} bytes)\n")
|
|
|
|
print(f"\n📊 分析報告已生成: {report_file}")
|
|
return report_file
|
|
|
|
def cleanup(self):
|
|
"""清理資源"""
|
|
if self.db_conn:
|
|
self.db_conn.close()
|
|
print("✅ 數據庫連接已關閉")
|
|
|
|
|
|
def main():
|
|
"""主函數"""
|
|
print("=" * 60)
|
|
print("sftpgo demo 用戶視頻人臉分析")
|
|
print("=" * 60)
|
|
|
|
# 視頻文件路徑
|
|
demo_dir = "/Users/accusys/momentry/var/sftpgo/data/demo"
|
|
|
|
videos = [
|
|
{
|
|
"path": os.path.join(
|
|
demo_dir,
|
|
"ExaSAN PCIe series - Director Ou Yu-Zhi Shares His Experience.mp4",
|
|
),
|
|
"uuid": "9760d0820f0cf9a7",
|
|
"name": "ExaSAN PCIe series - Director Ou Yu-Zhi Shares His Experience.mp4",
|
|
},
|
|
{
|
|
"path": os.path.join(demo_dir, "Old_Time_Movie_Show_-_Charade_1963.HD.mov"),
|
|
"uuid": "384b0ff44aaaa1f1",
|
|
"name": "Old_Time_Movie_Show_-_Charade_1963.HD.mov",
|
|
},
|
|
]
|
|
|
|
# 初始化分析器
|
|
analyzer = VideoFaceAnalyzer()
|
|
|
|
try:
|
|
# 連接數據庫
|
|
if not analyzer.connect_database():
|
|
print("⚠️ 將在無數據庫連接模式下運行")
|
|
|
|
# 加載人臉識別處理器
|
|
if not analyzer.load_face_processor(use_mps=True):
|
|
print("❌ 無法加載人臉識別處理器")
|
|
return False
|
|
|
|
# 分析每個視頻
|
|
video_results = []
|
|
|
|
for video_info in videos:
|
|
if os.path.exists(video_info["path"]):
|
|
success = analyzer.analyze_video(
|
|
video_info["path"], video_info["uuid"], video_info["name"]
|
|
)
|
|
|
|
if success:
|
|
# 讀取結果文件
|
|
result_file = os.path.join(
|
|
analyzer.output_dir, f"{video_info['uuid']}_analysis.json"
|
|
)
|
|
|
|
if os.path.exists(result_file):
|
|
with open(result_file, "r", encoding="utf-8") as f:
|
|
result = json.load(f)
|
|
video_results.append(result)
|
|
else:
|
|
print(f"❌ 視頻文件不存在: {video_info['path']}")
|
|
|
|
# 生成報告
|
|
if video_results:
|
|
report_file = analyzer.generate_report(video_results)
|
|
|
|
print(f"\n{'=' * 60}")
|
|
print("分析完成!")
|
|
print(f"{'=' * 60}")
|
|
|
|
print(f"\n📁 輸出目錄: {analyzer.output_dir}")
|
|
print(f"📊 分析報告: {report_file}")
|
|
|
|
# 顯示摘要
|
|
total_frames = sum(r["total_frames"] for r in video_results)
|
|
total_faces = sum(r["faces_detected"] for r in video_results)
|
|
total_time = sum(r["analysis_time"] for r in video_results)
|
|
|
|
print("\n📈 分析摘要:")
|
|
print(f" - 總處理視頻: {len(video_results)}")
|
|
print(f" - 總處理幀數: {total_frames}")
|
|
print(f" - 總檢測人臉: {total_faces}")
|
|
print(f" - 總分析時間: {total_time:.1f}秒")
|
|
|
|
# 列出生成的文件
|
|
print("\n📄 生成的文件:")
|
|
for filename in sorted(os.listdir(analyzer.output_dir)):
|
|
filepath = os.path.join(analyzer.output_dir, filename)
|
|
if os.path.isfile(filepath):
|
|
size = os.path.getsize(filepath)
|
|
print(f" - {filename} ({size:,} bytes)")
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"❌ 分析過程中發生錯誤: {e}")
|
|
import traceback
|
|
|
|
traceback.print_exc()
|
|
return False
|
|
|
|
finally:
|
|
analyzer.cleanup()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
success = main()
|
|
sys.exit(0 if success else 1)
|