#!/opt/homebrew/bin/python3.11
"""
Qdrant _faces Collection Operations

Functions:
- qdrant_request(): Send a JSON request to Qdrant and decode the reply
- ensure_faces_collection(): Create _faces collection if not exists
- generate_point_id(): Generate consistent point ID
- push_face_embeddings_batch(): Batch push embeddings to Qdrant
- update_trace_ids(): Update trace_id after face tracking
- delete_file_faces(): Delete all face points of a file
- get_file_faces(): Fetch all face points of a file
- count_file_faces(): Count face points of a file

Collection Schema:
- Name: _faces (fixed, no schema prefix)
- Vector: 512D, Cosine distance
- Payload: {file_uuid, frame, trace_id, bbox, confidence,
            identity_id, identity_uuid, stranger_id}
"""

import os
import json
import hashlib
import urllib.request
import urllib.error
from typing import Optional

QDRANT_URL = os.environ.get("QDRANT_URL", "http://localhost:6333")
# NOTE(review): a credential is hard-coded as the fallback value. It is kept
# unchanged so existing deployments keep working, but it should be supplied
# through the QDRANT_API_KEY environment variable instead.
QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY", "Test3200Test3200Test3200")
FACES_COLLECTION = "_faces"
VECTOR_DIM = 512
# Clamp to >= 1: a batch size of 0 makes range() raise, and a negative one
# makes the batching loops iterate zero times (nothing would be pushed).
BATCH_SIZE = max(1, int(os.environ.get("QDRANT_BATCH_SIZE", "100")))


def _build_request(
    method: str, path: str, body: Optional[dict] = None
) -> urllib.request.Request:
    """Build an authenticated JSON request for the given Qdrant path."""
    data = None if body is None else json.dumps(body).encode()
    req = urllib.request.Request(f"{QDRANT_URL}{path}", data=data, method=method)
    req.add_header("Content-Type", "application/json")
    req.add_header("Api-Key", QDRANT_API_KEY)
    return req


def _file_filter(file_uuid: str) -> dict:
    """Return the Qdrant filter matching every point of one file."""
    return {"must": [{"key": "file_uuid", "match": {"value": file_uuid}}]}


def _scroll_file_points(file_uuid: str) -> list:
    """Page through the scroll endpoint and collect every point of a file."""
    points = []
    offset = None
    while True:
        body = {
            "limit": BATCH_SIZE,
            "with_payload": True,
            "with_vector": True,
            "filter": _file_filter(file_uuid),
        }
        # Compare against None: an offset of 0 is falsy but still a value.
        if offset is not None:
            body["offset"] = offset

        reply = qdrant_request(
            "POST", f"/collections/{FACES_COLLECTION}/points/scroll", body
        )
        result = reply.get("result") or {}
        page = result.get("points") or []
        if not page:
            break
        points.extend(page)

        offset = result.get("next_page_offset")
        if offset is None:
            break
    return points


def qdrant_request(method: str, path: str, body: Optional[dict] = None) -> dict:
    """Make HTTP request to Qdrant

    Args:
        method: HTTP method, e.g. "GET", "POST", "PUT"
        path: Path appended to QDRANT_URL (starts with "/")
        body: JSON-serializable request body; None sends no body

    Returns:
        Decoded JSON response

    Raises:
        RuntimeError: If Qdrant answers with an HTTP error status
    """
    req = _build_request(method, path, body)
    try:
        with urllib.request.urlopen(req) as resp:
            return json.loads(resp.read())
    except urllib.error.HTTPError as e:
        error_body = e.read().decode(errors="replace")
        raise RuntimeError(f"Qdrant HTTP {e.code}: {error_body}") from e


def ensure_faces_collection() -> bool:
    """Create _faces collection if not exists

    Returns:
        True when the collection exists or has just been created

    Raises:
        RuntimeError: If the existence check fails with a status other than
            404, or if creating the collection fails
    """
    path = f"/collections/{FACES_COLLECTION}"

    try:
        with urllib.request.urlopen(_build_request("GET", path)):
            return True  # Collection exists
    except urllib.error.HTTPError as e:
        # 404 means "not created yet"; anything else is a real failure.
        if e.code != 404:
            raise RuntimeError(
                f"Qdrant check failed: {e.read().decode(errors='replace')}"
            ) from e

    # Create collection
    body = {
        "vectors": {
            "size": VECTOR_DIM,
            "distance": "Cosine",
        }
    }
    try:
        with urllib.request.urlopen(_build_request("PUT", path, body)):
            pass
    except urllib.error.HTTPError as e:
        raise RuntimeError(
            f"Qdrant create collection failed: {e.read().decode(errors='replace')}"
        ) from e

    print(f"[QDRANT] Created collection: {FACES_COLLECTION}")
    return True


def generate_point_id(file_uuid: str, frame: int, trace_id: int = 0) -> int:
    """Generate consistent point ID from file_uuid + frame + trace_id

    The ID is the first 16 hex digits (64 bits) of the MD5 of
    "{file_uuid}_{frame}_{trace_id}", so the same inputs always give the
    same ID.

    NOTE(review): the bounding box is not part of the key, so two faces of
    the same file and frame that share a trace_id (e.g. the default 0) get
    the same ID. Confirm with callers whether that can occur.
    """
    key = f"{file_uuid}_{frame}_{trace_id}"
    return int(hashlib.md5(key.encode()).hexdigest()[:16], 16)


def push_face_embeddings_batch(
    file_uuid: str,
    faces: list,
    publisher=None
) -> int:
    """Batch push face embeddings to _faces collection

    Args:
        file_uuid: Video file UUID
        faces: List of {frame, trace_id, bbox, confidence, embedding}
        publisher: RedisPublisher for progress reporting (optional)

    Returns:
        Number of successfully pushed embeddings

    Raises:
        RuntimeError: If Qdrant push fails
    """
    if not faces:
        return 0

    ensure_faces_collection()

    total = len(faces)
    pushed = 0
    path = f"/collections/{FACES_COLLECTION}/points?wait=true"

    for i in range(0, total, BATCH_SIZE):
        batch = faces[i:i + BATCH_SIZE]
        points = []
        for face in batch:
            trace_id = face.get("trace_id", 0)
            points.append({
                "id": generate_point_id(file_uuid, face["frame"], trace_id),
                "vector": face["embedding"],
                "payload": {
                    "file_uuid": file_uuid,
                    "frame": face["frame"],
                    "trace_id": trace_id,
                    "bbox": face["bbox"],
                    "confidence": face.get("confidence", 0.5),
                    # Identity fields are stored empty at push time.
                    "identity_id": None,
                    "identity_uuid": None,
                    "stranger_id": None,
                },
            })

        try:
            # Context manager closes the HTTP response after each batch.
            with urllib.request.urlopen(
                _build_request("PUT", path, {"points": points})
            ):
                pushed += len(batch)
        except urllib.error.HTTPError as e:
            error_body = e.read().decode(errors="replace")
            raise RuntimeError(
                f"Qdrant push failed (batch {i//BATCH_SIZE}): HTTP {e.code} - {error_body}"
            ) from e

        if publisher:
            done = i + len(batch)
            pct = int(done * 100 / total)
            publisher.progress("face", done, total, f"Qdrant push {pct}%")

    print(f"[QDRANT] Pushed {pushed} embeddings to {FACES_COLLECTION}")
    return pushed


def update_trace_ids(file_uuid: str, trace_mapping: dict) -> int:
    """Update trace_id for all face points in a file

    Called by store_traced_faces.py after face tracking.

    Every stored point of the file is fetched; points whose frame and bbox
    are found in trace_mapping are re-upserted (same ID and vector) with the
    new trace_id in their payload. Other points are left untouched.

    Args:
        file_uuid: Video file UUID
        trace_mapping: {frame: {bbox_key: trace_id}}
                       bbox_key = f"{x}_{y}_{width}_{height}"

    Returns:
        Number of updated points

    Raises:
        RuntimeError: If a Qdrant request fails
    """
    updates = []
    for point in _scroll_file_points(file_uuid):
        payload = point.get("payload") or {}
        bbox = payload.get("bbox") or {}
        bbox_key = (
            f"{bbox.get('x')}_{bbox.get('y')}_"
            f"{bbox.get('width')}_{bbox.get('height')}"
        )

        trace_id = trace_mapping.get(payload.get("frame"), {}).get(bbox_key)
        if trace_id is None:
            continue

        payload["trace_id"] = trace_id
        updates.append({
            "id": point["id"],
            "vector": point.get("vector", []),
            "payload": payload,
        })

    if not updates:
        return 0

    for i in range(0, len(updates), BATCH_SIZE):
        body = {"points": updates[i:i + BATCH_SIZE]}
        qdrant_request(
            "PUT", f"/collections/{FACES_COLLECTION}/points?wait=true", body
        )

    print(f"[QDRANT] Updated {len(updates)} trace_ids in {FACES_COLLECTION}")
    return len(updates)


def delete_file_faces(file_uuid: str) -> int:
    """Delete all face points for a file

    Args:
        file_uuid: Video file UUID

    Returns:
        Number of deleted points (counted immediately before the delete
        request is sent)

    Raises:
        RuntimeError: If a Qdrant request fails
    """
    # The delete response only carries an operation id, not a point count,
    # so the matching points are counted first.
    deleted = count_file_faces(file_uuid)

    body = {"filter": _file_filter(file_uuid)}
    qdrant_request("POST", f"/collections/{FACES_COLLECTION}/points/delete", body)

    print(f"[QDRANT] Deleted faces for file_uuid={file_uuid}")
    return deleted


def get_file_faces(file_uuid: str) -> list:
    """Get all face points for a file

    Args:
        file_uuid: Video file UUID

    Returns:
        List of points with payload and vector

    Raises:
        RuntimeError: If a Qdrant request fails
    """
    return _scroll_file_points(file_uuid)


def count_file_faces(file_uuid: str) -> int:
    """Count face points for a file

    Args:
        file_uuid: Video file UUID

    Returns:
        Number of face points

    Raises:
        RuntimeError: If the Qdrant request fails
    """
    body = {"filter": _file_filter(file_uuid)}
    result = qdrant_request(
        "POST", f"/collections/{FACES_COLLECTION}/points/count", body
    )
    return (result.get("result") or {}).get("count", 0)