From 9fbb4f9b488103ff332705c3539a6b31b08dfb00 Mon Sep 17 00:00:00 2001 From: Accusys Date: Thu, 25 Jun 2026 00:23:20 +0800 Subject: [PATCH] feat: add Qdrant _faces collection embedding push - Add qdrant_faces.py utility module for _faces collection operations - Modify face_processor.py to push embeddings to Qdrant (CoreML extraction re-enabled) - Modify store_traced_faces.py to update trace_id in Qdrant after face tracking - Collection schema: 512D vectors, Cosine distance, fixed name '_faces' - Payload: file_uuid, frame, trace_id, bbox, confidence, identity_id/uuid, stranger_id - Batch size: 100 (default), configurable via QDRANT_BATCH_SIZE env var - Error handling: face_processor.py exits with error if Qdrant push fails --- scripts/face_processor.py | 28 +++- scripts/store_traced_faces.py | 22 +++ scripts/utils/qdrant_faces.py | 308 ++++++++++++++++++++++++++++++++++ 3 files changed, 355 insertions(+), 3 deletions(-) create mode 100644 scripts/utils/qdrant_faces.py diff --git a/scripts/face_processor.py b/scripts/face_processor.py index ee47d1f..542f588 100644 --- a/scripts/face_processor.py +++ b/scripts/face_processor.py @@ -30,7 +30,9 @@ from pathlib import Path import coremltools as ct sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "utils")) from redis_publisher import RedisPublisher +from qdrant_faces import push_face_embeddings_batch SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) SWIFT_BIN = os.path.join(SCRIPT_DIR, "swift_processors", ".build", "debug", "swift_face_pose") @@ -199,6 +201,7 @@ class FaceProcessorVision: embed_count = 0 total_face_count = 0 last_pct = -1 + all_embeddings = [] # Collect embeddings for Qdrant push for frame_info in frames: frame_num = frame_info["frame"] @@ -225,11 +228,18 @@ class FaceProcessorVision: if face_img.size == 0: continue - # CoreML embedding - TODO: push to Qdrant _faces collection instead - # emb = 
self.extract_face_embedding(face_img) - emb = None + # CoreML embedding - push to Qdrant _faces collection + emb = self.extract_face_embedding(face_img) if emb is not None: embed_count += 1 + # Collect for batch Qdrant push + all_embeddings.append({ + "frame": frame_num, + "trace_id": 0, # Initial, updated by face_tracker + "bbox": {"x": x, "y": y, "width": w, "height": h}, + "confidence": face.get("confidence", 0.5), + "embedding": emb, + }) # Pose classification pose_info = face.get("pose", {}) @@ -292,6 +302,18 @@ class FaceProcessorVision: with open(self.output_path, "w") as f: json.dump(output, f, indent=2, ensure_ascii=False) + # Push embeddings to Qdrant _faces collection + if all_embeddings: + try: + pushed = push_face_embeddings_batch(self.uuid, all_embeddings, self.publisher) + if pushed != len(all_embeddings): + raise RuntimeError( + f"Qdrant push incomplete: {pushed}/{len(all_embeddings)} embeddings pushed" + ) + except Exception as e: + print(f"[FACE_V2] ERROR: Qdrant push failed: {e}", file=sys.stderr) + raise RuntimeError(f"Qdrant push failed: {e}") + elapsed = time.time() - t0 print(f"[FACE_V2] Done: {len(frames_list)} frames, {embed_count} embeddings, {elapsed:.0f}s") diff --git a/scripts/store_traced_faces.py b/scripts/store_traced_faces.py index 4650ef3..cdef506 100644 --- a/scripts/store_traced_faces.py +++ b/scripts/store_traced_faces.py @@ -27,6 +27,7 @@ from datetime import datetime sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "utils")) +from qdrant_faces import update_trace_ids # Config DB_URL = os.environ.get("DATABASE_URL", "postgresql://accusys@localhost:5432/momentry") @@ -204,6 +205,25 @@ def store_traced_faces(file_uuid: str, traced_json_path: str, schema: str = SCHE conn.commit() + # Build trace_mapping for Qdrant update + trace_mapping = {} # {frame: {bbox_key: trace_id}} + for frame_num_str, frame_data in sorted(frames.items(), key=lambda 
x: int(x[0])): + frame_num = int(frame_num_str) + trace_mapping[frame_num] = {} + for face in frame_data.get("faces", []): + trace_id = face.get("trace_id") + if trace_id is None: + continue + bbox_key = f"{face['x']}_{face['y']}_{face['width']}_{face['height']}" + trace_mapping[frame_num][bbox_key] = trace_id + + # Update Qdrant _faces collection with trace_id + try: + qdrant_updated = update_trace_ids(file_uuid, trace_mapping) + except Exception as e: + print(f"[TRACE] Warning: Qdrant trace_id update failed: {e}") + qdrant_updated = 0 + # Log trace summary cur.execute( f"SELECT COUNT(DISTINCT trace_id) FROM {schema}.face_detections WHERE file_uuid = %s AND trace_id IS NOT NULL", @@ -217,6 +237,8 @@ def store_traced_faces(file_uuid: str, traced_json_path: str, schema: str = SCHE print( f"[TRACE] Stored {total_stored} face detections, {db_trace_count} unique traces in DB" ) + if qdrant_updated > 0: + print(f"[TRACE] Updated {qdrant_updated} Qdrant points with trace_id") return total_stored, db_trace_count diff --git a/scripts/utils/qdrant_faces.py b/scripts/utils/qdrant_faces.py new file mode 100644 index 0000000..ac6ed1a --- /dev/null +++ b/scripts/utils/qdrant_faces.py @@ -0,0 +1,308 @@ +#!/opt/homebrew/bin/python3.11 +""" +Qdrant _faces Collection Operations + +Functions: +- ensure_faces_collection(): Create _faces collection if not exists +- generate_point_id(): Generate consistent point ID +- push_face_embeddings_batch(): Batch push embeddings to Qdrant +- update_trace_ids(): Update trace_id after face tracking + +Collection Schema: +- Name: _faces (fixed, no schema prefix) +- Vector: 512D, Cosine distance +- Payload: {file_uuid, frame, trace_id, bbox, confidence, identity_id, identity_uuid, stranger_id} +""" + +import os +import json +import hashlib +import urllib.request +import urllib.error +from typing import Optional + +QDRANT_URL = os.environ.get("QDRANT_URL", "http://localhost:6333") +QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY", 
# Qdrant connection settings and `_faces` collection parameters.
# NOTE(review): the fallback API key below is a credential committed to
# source. It is kept so existing deployments keep working, but QDRANT_API_KEY
# should be provided through the environment and this default removed.
QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY", "Test3200Test3200Test3200")
FACES_COLLECTION = "_faces"
VECTOR_DIM = 512
BATCH_SIZE = int(os.environ.get("QDRANT_BATCH_SIZE", "100"))
# Seconds to wait on a single Qdrant HTTP call before giving up, so a stalled
# server cannot hang the pipeline forever.
QDRANT_TIMEOUT = float(os.environ.get("QDRANT_TIMEOUT", "60"))


def _send(method: str, path: str, body: Optional[dict] = None) -> dict:
    """Issue one HTTP call to Qdrant and return the decoded JSON reply.

    urllib exceptions propagate unchanged so callers can inspect the HTTP
    status code; the response is always closed.
    """
    data = None if body is None else json.dumps(body).encode()
    req = urllib.request.Request(f"{QDRANT_URL}{path}", data=data, method=method)
    req.add_header("Content-Type", "application/json")
    req.add_header("Api-Key", QDRANT_API_KEY)
    with urllib.request.urlopen(req, timeout=QDRANT_TIMEOUT) as resp:
        raw = resp.read()
    return json.loads(raw) if raw else {}


def _http_error_text(err: urllib.error.HTTPError) -> str:
    """Return the body of an HTTP error response as text."""
    return err.read().decode(errors="replace")


def _batch_step() -> int:
    """Return the batch size, clamped to at least 1 so range() steps stay valid."""
    return max(1, BATCH_SIZE)


def _file_filter(file_uuid: str) -> dict:
    """Build the Qdrant filter matching every point that belongs to one file."""
    return {"must": [{"key": "file_uuid", "match": {"value": file_uuid}}]}


def _bbox_key(bbox: dict) -> str:
    """Format a bbox dict as the "x_y_width_height" key used for matching faces."""
    return f"{bbox.get('x')}_{bbox.get('y')}_{bbox.get('width')}_{bbox.get('height')}"


def _as_vector(embedding) -> list:
    """Convert an embedding to a plain list that json.dumps can serialise."""
    # Array-like objects exposing .tolist() (e.g. numpy arrays) are converted
    # through it so their elements become native Python floats.
    tolist = getattr(embedding, "tolist", None)
    return tolist() if callable(tolist) else list(embedding)


def _scroll_file_points(file_uuid: str, with_vector: bool = True):
    """Yield every point of a file, following Qdrant scroll pagination."""
    offset = None
    while True:
        body = {
            "limit": _batch_step(),
            "with_payload": True,
            "with_vector": with_vector,
            "filter": _file_filter(file_uuid),
        }
        if offset is not None:
            body["offset"] = offset

        reply = qdrant_request(
            "POST", f"/collections/{FACES_COLLECTION}/points/scroll", body
        )
        result = reply.get("result") or {}
        page = result.get("points") or []
        if not page:
            return
        yield from page

        # Only a missing offset ends the scroll; a falsy-but-valid offset
        # (such as point id 0) must not stop pagination early.
        offset = result.get("next_page_offset")
        if offset is None:
            return


def qdrant_request(method: str, path: str, body: dict = None) -> dict:
    """Make an HTTP request to Qdrant and return the parsed JSON response.

    Args:
        method: HTTP verb, e.g. "GET", "POST", "PUT".
        path: Path appended to QDRANT_URL (including any query string).
        body: Optional JSON-serialisable request body.

    Returns:
        Decoded JSON response ({} when the response body is empty).

    Raises:
        RuntimeError: On an HTTP error status, a connection failure or a timeout.
    """
    try:
        return _send(method, path, body)
    except urllib.error.HTTPError as e:
        raise RuntimeError(f"Qdrant HTTP {e.code}: {_http_error_text(e)}") from e
    except OSError as e:
        # URLError and socket timeouts are OSError subclasses.
        raise RuntimeError(f"Qdrant request failed ({method} {path}): {e}") from e


def ensure_faces_collection() -> bool:
    """Create the _faces collection if it does not exist yet.

    Returns:
        True once the collection exists (already present or just created).

    Raises:
        RuntimeError: If the existence check or the creation request fails.
    """
    path = f"/collections/{FACES_COLLECTION}"
    try:
        _send("GET", path)
        return True  # Collection exists
    except urllib.error.HTTPError as e:
        # 404 means "not created yet"; any other status is a real failure.
        if e.code != 404:
            raise RuntimeError(f"Qdrant check failed: {_http_error_text(e)}") from e
    except OSError as e:
        raise RuntimeError(f"Qdrant check failed: {e}") from e

    body = {"vectors": {"size": VECTOR_DIM, "distance": "Cosine"}}
    try:
        _send("PUT", path, body)
    except urllib.error.HTTPError as e:
        raise RuntimeError(
            f"Qdrant create collection failed: {_http_error_text(e)}"
        ) from e
    except OSError as e:
        raise RuntimeError(f"Qdrant create collection failed: {e}") from e

    print(f"[QDRANT] Created collection: {FACES_COLLECTION}")
    return True


def generate_point_id(
    file_uuid: str,
    frame: int,
    trace_id: int = 0,
    bbox: Optional[dict] = None,
) -> int:
    """Generate a deterministic 64-bit point ID for a face.

    Args:
        file_uuid: Video file UUID.
        frame: Frame number the face was detected in.
        trace_id: Trace ID at the time the point is created.
        bbox: Optional {x, y, width, height} of the face. When given it is
            folded into the ID so several faces in the same frame get
            distinct IDs; when omitted the ID is the same as before this
            parameter existed.

    Returns:
        Unsigned integer built from the first 16 hex digits of an MD5 digest.
    """
    key = f"{file_uuid}_{frame}_{trace_id}"
    if bbox is not None:
        key = f"{key}_{_bbox_key(bbox)}"
    return int(hashlib.md5(key.encode()).hexdigest()[:16], 16)


def push_face_embeddings_batch(
    file_uuid: str,
    faces: list,
    publisher=None
) -> int:
    """Batch push face embeddings to the _faces collection.

    Args:
        file_uuid: Video file UUID
        faces: List of {frame, trace_id, bbox, confidence, embedding}
        publisher: RedisPublisher for progress reporting (optional)

    Returns:
        Number of successfully pushed embeddings

    Raises:
        RuntimeError: If Qdrant push fails
    """
    if not faces:
        return 0

    ensure_faces_collection()

    total = len(faces)
    step = _batch_step()
    pushed = 0

    for start in range(0, total, step):
        batch = faces[start:start + step]

        points = []
        for face in batch:
            trace_id = face.get("trace_id", 0)
            # The bbox is part of the ID: every face is pushed with
            # trace_id 0, so without it all faces of one frame would share an
            # ID and overwrite each other on upsert.
            # NOTE(review): points written with the older frame-only ID
            # scheme are not replaced by this push; purge them with
            # delete_file_faces() before re-processing such a file.
            point_id = generate_point_id(
                file_uuid, face["frame"], trace_id, face["bbox"]
            )
            points.append({
                "id": point_id,
                "vector": _as_vector(face["embedding"]),
                "payload": {
                    "file_uuid": file_uuid,
                    "frame": face["frame"],
                    "trace_id": trace_id,
                    "bbox": face["bbox"],
                    "confidence": face.get("confidence", 0.5),
                    "identity_id": None,
                    "identity_uuid": None,
                    "stranger_id": None,
                },
            })

        batch_no = start // step
        try:
            _send(
                "PUT",
                f"/collections/{FACES_COLLECTION}/points?wait=true",
                {"points": points},
            )
        except urllib.error.HTTPError as e:
            raise RuntimeError(
                f"Qdrant push failed (batch {batch_no}): "
                f"HTTP {e.code} - {_http_error_text(e)}"
            ) from e
        except OSError as e:
            raise RuntimeError(f"Qdrant push failed (batch {batch_no}): {e}") from e

        pushed += len(batch)

        if publisher:
            done = start + len(batch)
            pct = int(done * 100 / total)
            publisher.progress("face", done, total, f"Qdrant push {pct}%")

    print(f"[QDRANT] Pushed {pushed} embeddings to {FACES_COLLECTION}")
    return pushed


def update_trace_ids(file_uuid: str, trace_mapping: dict) -> int:
    """Update trace_id for all face points in a file.

    Called by store_traced_faces.py after face tracking.

    Args:
        file_uuid: Video file UUID
        trace_mapping: {frame: {bbox_key: trace_id}}
                       bbox_key = f"{x}_{y}_{width}_{height}"

    Returns:
        Number of updated points

    Raises:
        RuntimeError: If a Qdrant request fails
    """
    updates = []
    # The scroll is fully consumed before anything is written back.
    for point in _scroll_file_points(file_uuid):
        payload = point.get("payload") or {}
        bbox = payload.get("bbox") or {}

        trace_id = trace_mapping.get(payload.get("frame"), {}).get(_bbox_key(bbox))
        if trace_id is None:
            continue  # Face was not assigned to any trace

        payload["trace_id"] = trace_id
        # Re-upsert under the existing ID so the point is updated in place.
        updates.append({
            "id": point["id"],
            "vector": point.get("vector", []),
            "payload": payload,
        })

    if not updates:
        return 0

    step = _batch_step()
    for start in range(0, len(updates), step):
        qdrant_request(
            "PUT",
            f"/collections/{FACES_COLLECTION}/points?wait=true",
            {"points": updates[start:start + step]},
        )

    print(f"[QDRANT] Updated {len(updates)} trace_ids in {FACES_COLLECTION}")
    return len(updates)


def delete_file_faces(file_uuid: str) -> int:
    """Delete all face points for a file.

    Args:
        file_uuid: Video file UUID

    Returns:
        Number of deleted points (counted immediately before deletion)

    Raises:
        RuntimeError: If a Qdrant request fails
    """
    # The delete response only carries an operation id, not a point count, so
    # the matching points are counted first.
    deleted = count_file_faces(file_uuid)
    qdrant_request(
        "POST",
        f"/collections/{FACES_COLLECTION}/points/delete?wait=true",
        {"filter": _file_filter(file_uuid)},
    )
    print(f"[QDRANT] Deleted faces for file_uuid={file_uuid}")
    return deleted


def get_file_faces(file_uuid: str) -> list:
    """Get all face points for a file.

    Args:
        file_uuid: Video file UUID

    Returns:
        List of points with payload and vector

    Raises:
        RuntimeError: If a Qdrant request fails
    """
    return list(_scroll_file_points(file_uuid))


def count_file_faces(file_uuid: str) -> int:
    """Count face points for a file.

    Args:
        file_uuid: Video file UUID

    Returns:
        Number of face points

    Raises:
        RuntimeError: If the Qdrant request fails
    """
    reply = qdrant_request(
        "POST",
        f"/collections/{FACES_COLLECTION}/points/count",
        {"filter": _file_filter(file_uuid)},
    )
    return (reply.get("result") or {}).get("count", 0)