#!/opt/homebrew/bin/python3.11
"""
Generate pre-computed visual statistics for chunks.

Reads frame yolo_objects, counts them per chunk, and updates chunks.visual_stats.
"""

import json
from collections import Counter

import psycopg2
import psycopg2.extras
from psycopg2 import sql

DB_CONFIG = {
    "host": "localhost",
    "user": "accusys",
    "dbname": "momentry",
}

# Schemas whose chunks tables are processed by main(), in this order.
SCHEMAS = ("public", "dev")

# Number of chunk updates between intermediate commits in main().
COMMIT_EVERY = 100


def get_chunks_to_process(conn, schema="public"):
    """Fetch the chunks in one schema whose visual_stats still need computing.

    A chunk is selected when its ``visual_stats`` column is NULL or equal to
    the empty JSON object ``{}``.

    Args:
        conn: Open psycopg2 connection.
        schema: Schema that contains the ``chunks`` table to scan.

    Returns:
        List of dict-like rows (RealDictCursor) with keys ``id``, ``uuid``,
        ``start_time`` and ``end_time``.
    """
    # Identifier() quotes the schema name safely instead of f-string splicing.
    # '{{}}' is the escaped form of a literal '{}' for sql.SQL.format().
    query = sql.SQL(
        """
        SELECT id, uuid, start_time, end_time
        FROM {}.chunks
        WHERE (visual_stats IS NULL OR visual_stats = '{{}}'::jsonb)
        """
    ).format(sql.Identifier(schema))
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(query)
        return cur.fetchall()


def _count_yolo_classes(payloads):
    """Count detections per ``class_name`` across yolo_objects payloads.

    Each payload is expected to be a list of dicts such as
    ``[{"class_name": "person", ...}, ...]``, either already decoded or as a
    JSON string. Undecodable strings, non-list payloads, list items that are
    not dicts, and items without a truthy ``class_name`` are skipped.

    Returns:
        Plain dict mapping class name to its number of occurrences.
    """
    counts = Counter()
    for payload in payloads:
        if isinstance(payload, str):
            try:
                payload = json.loads(payload)
            except ValueError:
                # JSONDecodeError subclasses ValueError; one malformed frame
                # should not abort the whole chunk.
                continue
        if not isinstance(payload, list):
            continue
        for obj in payload:
            if not isinstance(obj, dict):
                continue
            class_name = obj.get("class_name")
            if class_name:
                counts[class_name] += 1
    return dict(counts)


def get_yolo_stats_for_range(conn, uuid, start_time, end_time, schema="public"):
    """Aggregate YOLO object counts for one video over a time range.

    Resolves ``uuid`` to an id via ``{schema}.videos``. It then counts
    ``class_name`` occurrences in the non-NULL ``yolo_objects`` of every
    ``{schema}.frames`` row with that ``file_id`` whose ``timestamp`` lies in
    ``[start_time, end_time]``. Both ends of the range are inclusive.

    Args:
        conn: Open psycopg2 connection.
        uuid: Video uuid stored on the chunk.
        start_time: Inclusive lower bound on frame timestamp.
        end_time: Inclusive upper bound on frame timestamp.
        schema: Schema containing the ``videos`` and ``frames`` tables.

    Returns:
        Dict mapping class name to detection count. Returns ``{}`` when no
        video matches ``uuid`` or no detections fall in the range.
    """
    schema_ident = sql.Identifier(schema)
    with conn.cursor() as cur:
        cur.execute(
            sql.SQL("SELECT id FROM {}.videos WHERE uuid = %s").format(schema_ident),
            (uuid,),
        )
        row = cur.fetchone()
        if row is None:
            return {}
        file_id = row[0]

        # NOTE(review): both bounds are inclusive. If two chunks share a
        # boundary timestamp, a frame exactly at it is counted in both.
        # Confirm this is intended.
        cur.execute(
            sql.SQL(
                """
                SELECT yolo_objects
                FROM {}.frames
                WHERE file_id = %s
                  AND timestamp >= %s
                  AND timestamp <= %s
                  AND yolo_objects IS NOT NULL
                """
            ).format(schema_ident),
            (file_id, start_time, end_time),
        )
        # Consume the cursor while it is still open (inside the `with`).
        return _count_yolo_classes(payload for (payload,) in cur)


def update_chunk_visual_stats(conn, chunk_id, stats, schema="public"):
    """Write ``stats`` as JSONB into ``{schema}.chunks.visual_stats`` for one chunk.

    Does not commit; the caller owns the transaction.
    """
    query = sql.SQL(
        "UPDATE {}.chunks SET visual_stats = %s::jsonb WHERE id = %s"
    ).format(sql.Identifier(schema))
    with conn.cursor() as cur:
        cur.execute(query, (json.dumps(stats), chunk_id))


def main():
    """Compute and store visual_stats for pending chunks in each schema of SCHEMAS."""
    print("🚀 Starting visual stats generation...")
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        for schema in SCHEMAS:
            print(f"📊 Processing schema: {schema}")
            chunks = get_chunks_to_process(conn, schema)
            total = len(chunks)
            print(f" Found {total} chunks to process.")

            processed_count = 0
            for chunk in chunks:
                stats = get_yolo_stats_for_range(
                    conn,
                    chunk["uuid"],
                    chunk["start_time"],
                    chunk["end_time"],
                    schema,
                )
                # Empty results are written too, as {}. get_chunks_to_process
                # still selects rows whose visual_stats is {}, so chunks with
                # no detections will be scanned again on the next run.
                update_chunk_visual_stats(conn, chunk["id"], stats, schema)
                processed_count += 1

                if processed_count % COMMIT_EVERY == 0:
                    # Periodic commits keep finished work if a later chunk fails.
                    conn.commit()
                    print(f" ✅ Processed {processed_count}/{total} chunks...")

            conn.commit()
            print(f"🎉 Done with {schema}! Processed {processed_count} chunks.")
    finally:
        # Closing without a commit discards any pending (uncommitted) changes.
        conn.close()


if __name__ == "__main__":
    main()