Files
momentry_core/scripts/video_comparison_statistics.py
Warren e75c4d6f07 cleanup: remove dead code and duplicate docs
- Remove session-ses_2f27.md (161KB raw session log)
- Remove 49 ROOT_* duplicate files across REFERENCE/
- Remove 14 duplicate files between REFERENCE/ root and history/
- Remove asr_legacy.rs (dead code, replaced by asr.rs)
- Remove src/core/worker/ (duplicate JobWorker)
- Remove src/core/layers/ (empty directory)
- Remove 4 .bak files in src/
- Remove 7 dead private methods in worker/processor.rs
- Remove backup directory from git tracking
2026-05-04 01:31:21 +08:00

218 lines
7.2 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Video Processing Comparison Statistics
Compare ASRX broken vs fixed implementation
"""
import json
from pathlib import Path
from datetime import datetime
def load_json(path):
    """Load and parse a JSON file, reporting failures in-band.

    Args:
        path: Filesystem path (``str`` or ``Path``) of the JSON file.

    Returns:
        The decoded JSON value on success. On any failure (missing file,
        permission problem, malformed JSON, ...) a dict of the form
        ``{"error": "<message>"}`` is returned instead of raising, so
        callers can test ``"error" in data``.
    """
    try:
        # The context manager closes the handle even when parsing fails;
        # a bare ``open()`` passed straight to json.load() leaks it.
        # JSON interchange files are UTF-8, so do not rely on the locale.
        with open(path, encoding="utf-8") as handle:
            return json.load(handle)
    except Exception as exc:  # deliberate best-effort boundary
        return {"error": str(exc)}
def count_segments(data, module_name):
    """Return how many entries a module's result dict holds.

    The list that gets counted depends on ``module_name``:
    ``"segments"`` for asr/asrx, ``"cuts"`` for cut, and ``"frames"`` for
    yolo/ocr/face/pose. A missing list counts as 0, and so does any
    unrecognised module name.
    """
    list_key_by_modules = (
        (("asr", "asrx"), "segments"),
        (("cut",), "cuts"),
        (("yolo", "ocr", "face", "pose"), "frames"),
    )
    for module_group, list_key in list_key_by_modules:
        if module_name in module_group:
            return len(data.get(list_key, []))
    # Unknown module: nothing to count (``data`` is not inspected at all).
    return 0
def get_video_info(uuid, base_dir="/Users/accusys/momentry/var/sftpgo/data/demo"):
    """Probe a video's container and stream metadata with ``ffprobe``.

    Args:
        uuid: Video identifier; the file is looked up at
            ``<base_dir>/<uuid>/<uuid>.mp4``.
        base_dir: Root directory holding one sub-directory per video.
            Defaults to the location that used to be hard-coded here.

    Returns:
        A dict with ``duration`` (float), ``size`` (int), ``width``,
        ``height`` and ``codec`` as reported by ffprobe, with 0 /
        ``"unknown"`` for fields ffprobe did not report. An empty dict is
        returned when the file does not exist, ffprobe cannot be run or
        exits with an error, or its output cannot be parsed.
    """
    # Kept as a local import: the module's top-level imports do not
    # include subprocess.
    import subprocess

    mp4_path = Path(f"{base_dir}/{uuid}/{uuid}.mp4")
    if not mp4_path.exists():
        return {}

    command = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration,size:stream=width,height,codec_name",
        "-of",
        "json",
        str(mp4_path),
    ]
    try:
        result = subprocess.run(command, capture_output=True, text=True)
    except (OSError, ValueError):
        # ffprobe is missing / not executable, or its output could not be
        # decoded: metadata is optional, so degrade to "no metadata".
        return {}
    if result.returncode != 0:
        # A failed probe still prints an (empty) JSON object; do not turn
        # that into a misleading all-zero metadata record.
        return {}

    try:
        info = json.loads(result.stdout)
        format_info = info.get("format", {})
        # ``or [{}]`` also covers an explicitly empty stream list, so the
        # container-level duration/size are still reported in that case.
        # NOTE(review): the first stream is used as-is; it is not
        # guaranteed to be the video stream - confirm for your inputs.
        stream_info = (info.get("streams") or [{}])[0]
        return {
            "duration": float(format_info.get("duration", 0)),
            "size": int(format_info.get("size", 0)),
            "width": stream_info.get("width", 0),
            "height": stream_info.get("height", 0),
            "codec": stream_info.get("codec_name", "unknown"),
        }
    except (ValueError, TypeError, AttributeError, KeyError, IndexError):
        # Malformed JSON or unexpected value types in the probe output.
        return {}
def generate_comparison_report(output_dir="./output", uuids=None):
    """Build the comparison statistics report for a set of videos.

    For every video the following files are looked up in ``output_dir``:
    ``<uuid>.<module>.json`` for each of asr/cut/yolo/ocr/face/pose,
    ``<uuid>.asrx.json.bak`` (recorded as ``asrx_broken``) and
    ``<uuid>.asrx.json`` (recorded as ``asrx_fixed``). Files that do not
    exist are simply left out of that video's ``modules`` mapping.

    Args:
        output_dir: Directory containing the per-module JSON results.
        uuids: Iterable of video identifiers to include. Defaults to the
            two videos that were previously hard-coded.

    Returns:
        A dict with ``generated_at`` (ISO timestamp), ``videos`` (one
        entry per uuid with ``uuid``, ``metadata`` and ``modules``) and
        ``summary`` (per-video and total ASRX segment counts for the
        broken and fixed variants, plus an ``improvement`` note).
    """
    if uuids is None:
        uuids = ["9760d0820f0cf9a7", "384b0ff44aaaa1f1"]
    else:
        uuids = list(uuids)

    output_path = Path(output_dir)
    report = {"generated_at": datetime.now().isoformat(), "videos": {}}
    modules = ("asr", "cut", "yolo", "ocr", "face", "pose")

    def _load_result(path):
        # Normalise unexpected top-level JSON (e.g. a list) into the same
        # error shape load_json uses, so the .get() lookups stay safe.
        data = load_json(path)
        if not isinstance(data, dict):
            return {"error": f"unexpected JSON structure in {path}"}
        return data

    for uuid in uuids:
        video_report = {"uuid": uuid, "metadata": get_video_info(uuid), "modules": {}}

        for module in modules:
            file_path = output_path / f"{uuid}.{module}.json"
            if not file_path.exists():
                continue
            data = _load_result(file_path)
            video_report["modules"][module] = {
                "file": str(file_path),
                "segments": count_segments(data, module),
                "status": "complete" if "error" not in data else "error",
            }

        # ASRX comparison (broken vs fixed)
        asrx_broken_path = output_path / f"{uuid}.asrx.json.bak"
        asrx_fixed_path = output_path / f"{uuid}.asrx.json"

        if asrx_broken_path.exists():
            broken_data = _load_result(asrx_broken_path)
            video_report["modules"]["asrx_broken"] = {
                "file": str(asrx_broken_path),
                "segments": count_segments(broken_data, "asrx"),
                "status": "broken",
                "note": "Original implementation - 0 segments",
            }

        if asrx_fixed_path.exists():
            fixed_data = _load_result(asrx_fixed_path)
            # A missing or null "speaker_stats" is treated as "no speakers".
            stats = fixed_data.get("speaker_stats") or {}
            video_report["modules"]["asrx_fixed"] = {
                "file": str(asrx_fixed_path),
                "segments": count_segments(fixed_data, "asrx"),
                "speakers": len(stats),
                "speaker_stats": stats,
                "status": "fixed",
                "note": "Custom SpeechBrain implementation",
            }

        report["videos"][uuid] = video_report

    def _summarise(variant):
        # A video whose ASRX file was absent has no entry for ``variant``;
        # count it as 0 instead of raising KeyError.
        counts = {
            uuid: report["videos"][uuid]["modules"].get(variant, {}).get("segments", 0)
            for uuid in uuids
        }
        total = sum(counts.values())
        counts["total"] = total
        return counts

    # Summary
    report["summary"] = {
        "asrx_broken": _summarise("asrx_broken"),
        "asrx_fixed": _summarise("asrx_fixed"),
        "improvement": "Custom SpeechBrain implementation successfully detects speakers",
    }
    return report
def print_report(report):
    """Write a human-readable rendering of a comparison report to stdout.

    ``report`` must carry ``generated_at``, ``videos`` and ``summary``
    entries: each video needs ``metadata`` and ``modules``, each module
    entry needs ``segments`` and ``status``, and the summary needs
    ``asrx_broken``, ``asrx_fixed`` (both with a ``total`` key) and
    ``improvement``.
    """
    heavy_rule = "=" * 80
    light_rule = "-" * 80

    def show_summary_section(heading, summary_key):
        # Per-video counts first, then the aggregate stored under "total".
        print(heading)
        for name, segment_count in report["summary"][summary_key].items():
            if name == "total":
                continue
            print(f" {name}: {segment_count} segments")
        print(f" Total: {report['summary'][summary_key]['total']} segments")

    print(heavy_rule)
    print("VIDEO PROCESSING COMPARISON STATISTICS")
    print(heavy_rule)
    print(f"Generated: {report['generated_at']}")
    print()

    for video_id, video in report["videos"].items():
        print(f"\n{heavy_rule}")
        print(f"Video: {video_id}")
        print(heavy_rule)

        metadata = video["metadata"]
        if metadata:
            print(f"Duration: {metadata.get('duration', 0):.2f}s")
            print(f"Resolution: {metadata.get('width', 0)}x{metadata.get('height', 0)}")
            print(f"Size: {metadata.get('size', 0) / 1024 / 1024:.2f} MB")

        print("\nModule Results:")
        print(light_rule)
        for name, entry in video["modules"].items():
            line = f"{name:20} {entry['segments']:10} segments [{entry['status']:10}]"
            if name.startswith("asrx"):
                # Only the ASRX variants carry an explanatory note.
                line = f"{line} {entry.get('note', '')}"
            print(line)

        # Speaker stats for ASRX fixed
        if "asrx_fixed" in video["modules"]:
            speaker_stats = video["modules"]["asrx_fixed"].get("speaker_stats", {})
            if speaker_stats:
                print("\nSpeaker Statistics (ASRX Fixed):")
                for speaker, spec in speaker_stats.items():
                    print(
                        f" {speaker}: {spec['count']} segments, {spec['duration']:.2f}s"
                    )

    # Summary
    print(f"\n{heavy_rule}")
    print("SUMMARY")
    print(heavy_rule)
    show_summary_section("\nASRX Broken (pyannote):", "asrx_broken")
    show_summary_section("\nASRX Fixed (SpeechBrain):", "asrx_fixed")
    print(f"\n{report['summary']['improvement']}")
    print(f"\n{heavy_rule}")
if __name__ == "__main__":
    # Script entry point: build the report, show it, then persist it.
    report = generate_comparison_report()
    print_report(report)

    # Save report
    output_file = Path("./output/video_comparison_report.json")
    # Create the target directory first so saving does not fail with
    # FileNotFoundError when ./output has not been created yet.
    output_file.parent.mkdir(parents=True, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2)
    print(f"\nReport saved to: {output_file}")