Files
momentry_core/scripts/update_person_demographics.py
Warren e75c4d6f07 cleanup: remove dead code and duplicate docs
- Remove session-ses_2f27.md (161KB raw session log)
- Remove 49 ROOT_* duplicate files across REFERENCE/
- Remove 14 duplicate files between REFERENCE/ root and history/
- Remove asr_legacy.rs (dead code, replaced by asr.rs)
- Remove src/core/worker/ (duplicate JobWorker)
- Remove src/core/layers/ (empty directory)
- Remove 4 .bak files in src/
- Remove 7 dead private methods in worker/processor.rs
- Remove backup directory from git tracking
2026-05-04 01:31:21 +08:00

123 lines
3.3 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Update person demographics (Age, Gender) using InsightFace.
This script scans the representative face of each person and updates the DB.
"""
import cv2
import psycopg2
from insightface.app import FaceAnalysis
# Configuration
# Keyword arguments handed unchanged to psycopg2.connect() in main().
# NOTE(review): no password is supplied here - presumably authentication
# relies on local trust/peer auth or a ~/.pgpass entry; confirm.
DB_CONFIG = {"host": "localhost", "user": "accusys", "dbname": "momentry"}
# Video file every frame is extracted from; hard-coded to a single video.
# NOTE(review): the query in get_person_frames() is not filtered by video -
# presumably the dev schema only holds appearances for this file; confirm.
VIDEO_PATH = "output/384b0ff44aaaa1f1/384b0ff44aaaa1f1.mp4"
def get_face_app():
    """Build and prepare the InsightFace analysis pipeline.

    The "buffalo_l" model pack is loaded with only the CPU execution
    provider enabled and prepared with a 640x640 detection size.

    Returns:
        The prepared ``FaceAnalysis`` instance.
    """
    detection_size = (640, 640)
    face_app = FaceAnalysis(
        name="buffalo_l",
        providers=["CPUExecutionProvider"],
    )
    face_app.prepare(ctx_id=0, det_size=detection_size)
    return face_app
def get_person_frames(conn):
    """Return the earliest appearance time of every person.

    Only the first appearance of each person is fetched, so that a single
    frame per person has to be decoded afterwards.

    Args:
        conn: Open psycopg2 connection (any DB-API connection whose cursors
            support the ``with`` statement works).

    Returns:
        A list of ``(person_id, start_time)`` rows, one per person found in
        ``dev.person_appearances``.
    """
    # The context manager closes the cursor even when the query fails, so no
    # cursor is left open on the connection shared with the update step.
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT person_id, MIN(start_time) as start_time
            FROM dev.person_appearances
            GROUP BY person_id
            """
        )
        return cur.fetchall()
def get_frame_at_time(video_path, time_sec):
    """Extract the single video frame located at a timestamp.

    Args:
        video_path: Path of the video file to open.
        time_sec: Position of the wanted frame, in seconds. Any value
            accepted by ``float()`` works (for example a ``Decimal``).

    Returns:
        The frame as decoded by OpenCV (BGR channel order, which is what
        InsightFace expects), or ``None`` when the video cannot be opened or
        no frame can be read at that position.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return None
        # Seeking by milliseconds is sometimes inaccurate, but it is good
        # enough to pick a "representative" frame. The explicit float()
        # keeps the seek working if the DB driver hands back a Decimal.
        cap.set(cv2.CAP_PROP_POS_MSEC, float(time_sec) * 1000.0)
        ret, frame = cap.read()
    finally:
        # Always give the capture handle back, including on errors and on
        # the "could not open" path.
        cap.release()
    return frame if ret else None
def analyze_face(app, frame):
    """Estimate age and gender of the first face detected in a frame.

    Args:
        app: Prepared InsightFace ``FaceAnalysis`` object; only its
            ``get(frame)`` method is used.
        frame: Image to analyse, as returned by ``get_frame_at_time``.

    Returns:
        An ``(age, gender)`` tuple. ``age`` is an ``int``; ``gender`` is
        ``"female"`` for model code 0 and ``"male"`` for model code 1.
        Either element is ``None`` when the model did not provide a usable
        value, and both are ``None`` when no face was detected.
    """
    faces = app.get(frame)
    if not faces:
        return None, None

    # Just take the first face found.
    face = faces[0]

    # InsightFace's Face object answers None for attributes that were never
    # set, so hasattr() is always true for it. Inspect the value itself
    # instead, otherwise int(None) raises TypeError.
    raw_age = getattr(face, "age", None)
    age = int(raw_age) if raw_age is not None else None

    raw_gender = getattr(face, "gender", None)
    if raw_gender is None:
        gender = None
    elif raw_gender == 0:
        gender = "female"
    elif raw_gender == 1:
        gender = "male"
    else:
        gender = None

    return age, gender
def update_person_db(conn, person_id, age, gender):
    """Store the detected age and gender of one person and commit.

    Args:
        conn: Open psycopg2 connection (any DB-API connection whose cursors
            support the ``with`` statement works).
        person_id: Value matched against ``dev.person_identities.person_id``.
        age: Age to store.
        gender: Gender label to store.

    Raises:
        Exception: Whatever the driver raised while updating or committing;
            the transaction is rolled back before the error is re-raised.
    """
    try:
        # The context manager closes the cursor whether or not the
        # statement succeeds.
        with conn.cursor() as cur:
            cur.execute(
                """
                UPDATE dev.person_identities
                SET age = %s, gender = %s
                WHERE person_id = %s
                """,
                (age, gender, person_id),
            )
        conn.commit()
    except Exception:
        # Without a rollback the shared connection stays in an aborted
        # transaction and every later statement on it would fail as well.
        conn.rollback()
        raise
def main():
    """Detect age and gender for every known person and store them in the DB.

    For each person returned by ``get_person_frames`` the frame at the
    person's first appearance is read from ``VIDEO_PATH``, analysed with
    InsightFace, and the result is written with ``update_person_db``.
    Persons whose frame cannot be read, or for whom age or gender is
    missing, are reported and skipped.
    """
    print("=== Person Demographics Updater ===")

    # 1. Init DB
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        # 2. Init Model
        print("Loading InsightFace model...")
        app = get_face_app()

        # 3. Get List
        persons = get_person_frames(conn)
        total = len(persons)
        print(f"Found {total} persons to process.")

        for index, (person_id, start_time) in enumerate(persons, start=1):
            print(
                f"Processing {index}/{total}: {person_id} (Time: {start_time:.1f}s)"
            )

            # 4. Get Frame
            frame = get_frame_at_time(VIDEO_PATH, start_time)
            if frame is None:
                print(" -> Failed to retrieve frame.")
                continue

            # 5. Analyze
            age, gender = analyze_face(app, frame)
            # Compare against None rather than truthiness: an estimated age
            # of 0 is a valid result and must not be discarded.
            if age is None or gender is None:
                print(" -> Face not found or attributes missing in this frame.")
                continue

            print(f" -> Detected: Age {age}, Gender {gender}")

            # 6. Update
            update_person_db(conn, person_id, age, gender)

        print("=== Done ===")
    finally:
        # Close the connection even when model loading, frame decoding or a
        # DB statement raises.
        conn.close()
# Run the updater only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()