#!/opt/bin/python3.11
"""
Chunk-based statistics for ASR, Face, and Speaker combinations.

Generates a comprehensive report of each chunk's content.
"""

import json
import os
from collections import Counter

UUID = "384b0ff44aaaa1f1"
BASE_DIR = f"output/{UUID}"
CHUNK_DURATION = 60  # seconds per chunk

# Timestamps (seconds) of the scenes called out under "Special Interest Chunks".
STAMP_SCENE_TIME = 5730
MAGNIFIER_SCENE_TIME = 5727
VASE_SCENE_TIMES = (300, 660, 3720)


def load_json(filepath):
    """Read *filepath* and return the parsed JSON document."""
    with open(filepath, "r", encoding="utf-8") as f:
        return json.load(f)


def compute_chunk_stats(asr_data, face_data, chunk_duration=CHUNK_DURATION):
    """Bucket ASR segments and face frames into fixed-length chunks.

    Chunks are half-open intervals ``[start, end)`` of ``chunk_duration``
    seconds. There is always at least one chunk, even for empty input.

    Args:
        asr_data: Mapping with an optional ``"segments"`` list; each segment
            is read for ``"start"``, ``"end"`` and ``"text"``.
        face_data: Mapping with an optional ``"frames"`` list; each frame is
            read for ``"timestamp"`` and ``"faces"``, and each face for
            ``"person_id"``.
        chunk_duration: Chunk length in seconds. Should be an ``int`` so that
            ``print_summary`` can format chunk bounds with ``:d``.

    Returns:
        ``(chunks, video_duration)`` where ``chunks`` is a list of
        JSON-serializable dicts and ``video_duration`` is the largest segment
        ``"end"`` (``0`` when there are no segments).
    """
    segments = asr_data.get("segments", [])
    video_duration = max((seg.get("end", 0) for seg in segments), default=0)

    num_chunks = int(video_duration // chunk_duration) + 1
    chunks = [
        {
            "chunk_id": i,
            "start": i * chunk_duration,
            "end": (i + 1) * chunk_duration,
            "asr_count": 0,
            "asr_text_len": 0,
            "face_count": 0,
            "has_speech": False,
            "has_faces": False,
        }
        for i in range(num_chunks)
    ]
    # Per-chunk appearance counts; kept outside the chunk dicts because a
    # Counter is not what we want to serialize.
    person_counts = [Counter() for _ in range(num_chunks)]

    # A segment is counted in every chunk it overlaps.
    for seg in segments:
        start = seg.get("start", 0)
        end = seg.get("end", 0)
        text_len = len(seg.get("text", ""))

        first = max(int(start // chunk_duration), 0)
        last = int(end // chunk_duration)
        # Chunks are half-open: a segment that ends exactly on a boundary
        # does not reach into the next chunk. Zero-length segments are kept.
        if end > start and end % chunk_duration == 0:
            last -= 1

        for ci in range(first, min(last + 1, num_chunks)):
            chunk = chunks[ci]
            chunk["asr_count"] += 1
            chunk["asr_text_len"] += text_len
            chunk["has_speech"] = True

    # A frame belongs to the single chunk containing its timestamp.
    for frame in face_data.get("frames", []):
        idx = int(frame.get("timestamp", 0) // chunk_duration)
        # Reject negative indices too; they would wrap around to the end.
        if not 0 <= idx < num_chunks:
            continue

        faces = frame.get("faces", [])
        chunk = chunks[idx]
        chunk["face_count"] += len(faces)
        # Sticky flag: a later frame without faces must not clear it.
        if faces:
            chunk["has_faces"] = True
        for face in faces:
            pid = face.get("person_id")
            if pid:
                person_counts[idx][pid] += 1

    for chunk, counts in zip(chunks, person_counts):
        chunk["unique_person_count"] = len(counts)
        # Ten most frequently seen persons; ties keep first-seen order.
        chunk["top_persons"] = [pid for pid, _ in counts.most_common(10)]

    return chunks, video_duration


def build_chunk_stats():
    """Load the ASR and face JSON files for ``UUID`` and compute chunk stats.

    Reads ``{UUID}.asr.json`` and ``{UUID}.face_clustered.json`` from
    ``BASE_DIR`` and prints progress information.

    Returns:
        ``(chunks, video_duration)`` as produced by ``compute_chunk_stats``.
    """
    print(f"📊 Building chunk statistics for {UUID}...")
    print(f" Chunk duration: {CHUNK_DURATION}s")

    asr_data = load_json(os.path.join(BASE_DIR, f"{UUID}.asr.json"))
    face_data = load_json(os.path.join(BASE_DIR, f"{UUID}.face_clustered.json"))

    chunks, video_duration = compute_chunk_stats(asr_data, face_data)
    print(f" Video duration: {video_duration:.0f}s ({video_duration / 60:.1f} min)")

    return chunks, video_duration


def print_summary(chunks):
    """Print a human-readable report for *chunks*.

    Args:
        chunks: List of chunk dicts as returned by ``build_chunk_stats``.
            An empty list prints a short message instead of the report.
    """
    print("\n" + "=" * 80)
    print("📈 CHUNK STATISTICS SUMMARY")
    print("=" * 80)

    if not chunks:
        # Avoid dividing by zero in the percentage lines below.
        print("\nNo chunks to summarize.")
        return

    total = len(chunks)

    def pct(count):
        return count / total * 100

    # Overall stats
    total_asr = sum(c["asr_count"] for c in chunks)
    # Sum of face detections over all frames (the label below says "frames").
    total_faces = sum(c["face_count"] for c in chunks)
    total_speech_chunks = sum(1 for c in chunks if c["has_speech"])
    total_face_chunks = sum(1 for c in chunks if c["has_faces"])
    chunks_with_both = sum(1 for c in chunks if c["has_speech"] and c["has_faces"])
    chunks_with_neither = sum(
        1 for c in chunks if not c["has_speech"] and not c["has_faces"]
    )

    print("\n📊 Overview:")
    print(f" Total chunks: {total}")
    print(
        f" Chunks with speech: {total_speech_chunks} ({pct(total_speech_chunks):.0f}%)"
    )
    print(f" Chunks with faces: {total_face_chunks} ({pct(total_face_chunks):.0f}%)")
    print(f" Both speech+faces: {chunks_with_both} ({pct(chunks_with_both):.0f}%)")
    print(f" Neither: {chunks_with_neither} ({pct(chunks_with_neither):.0f}%)")
    print(f" Total ASR segments: {total_asr}")
    print(f" Total face frames: {total_faces}")

    # Combination breakdown
    print("\n🎯 ASR/Face Combination Breakdown:")
    combos = {}
    for c in chunks:
        key = (c["has_speech"], c["has_faces"])
        info = combos.setdefault(key, {"count": 0, "chunk_ids": []})
        info["count"] += 1
        info["chunk_ids"].append(c["chunk_id"])

    for (has_speech, has_faces), info in sorted(combos.items()):
        speech_str = "🎤 Speech" if has_speech else " No Speech"
        face_str = "👤 Faces" if has_faces else " No Faces"
        ids = info["chunk_ids"]
        # Shown as min-max; the IDs in between need not all be in this group.
        chunk_range = f"{min(ids)}-{max(ids)}" if len(ids) > 1 else f"{ids[0]}"
        print(
            f" {speech_str} + {face_str}: {info['count']} chunks (IDs: {chunk_range})"
        )

    # Top chunks by activity (stable sort: ties keep chunk order)
    print("\n🔥 Top 10 Most Active Chunks (by ASR+Faces):")
    most_active = sorted(
        chunks, key=lambda c: c["asr_count"] + c["face_count"], reverse=True
    )
    for c in most_active[:10]:
        persons = ", ".join(str(p) for p in c["top_persons"][:3])
        print(
            f" Chunk {c['chunk_id']:3d} ({c['start']:5d}-{c['end']:5d}s): "
            f"ASR={c['asr_count']:3d}, Faces={c['face_count']:4d}, "
            f"Persons={c['unique_person_count']:2d} ({persons})"
        )

    # Scene lookups use half-open [start, end) so a timestamp that falls
    # exactly on a chunk boundary is reported once, not twice.
    print("\n🔍 Special Interest Chunks:")
    for c in chunks:
        if c["start"] <= STAMP_SCENE_TIME < c["end"]:
            persons = ", ".join(str(p) for p in c["top_persons"][:5])
            print(f" 🎯 Stamp scene chunk: {c['chunk_id']} ({c['start']}-{c['end']}s)")
            print(
                f" ASR={c['asr_count']}, Faces={c['face_count']}, "
                f"Persons={c['unique_person_count']} ({persons})"
            )

        if c["start"] <= MAGNIFIER_SCENE_TIME < c["end"]:
            print(
                f" 🔍 Magnifier scene chunk: {c['chunk_id']} ({c['start']}-{c['end']}s)"
            )

    for vt in VASE_SCENE_TIMES:
        for c in chunks:
            if c["start"] <= vt < c["end"]:
                persons = ", ".join(str(p) for p in c["top_persons"][:3])
                print(
                    f" 🏺 Vase scene chunk: {c['chunk_id']} ({c['start']}-{c['end']}s)"
                )
                print(
                    f" ASR={c['asr_count']}, Faces={c['face_count']}, "
                    f"Persons={c['unique_person_count']} ({persons})"
                )


def main():
    """Build the statistics, print the report, and save the JSON output."""
    chunks, duration = build_chunk_stats()
    print_summary(chunks)

    # Save to file
    output_path = os.path.join(BASE_DIR, "chunk_statistics.json")
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(
            {
                "uuid": UUID,
                "duration": duration,
                "chunk_duration": CHUNK_DURATION,
                "chunks": chunks,
            },
            f,
            indent=2,
        )
    print(f"\n💾 Saved detailed stats to: {output_path}")


if __name__ == "__main__":
    main()