Files
momentry_core/scripts/qa/pipeline.py

154 lines
5.1 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
M5 QA Self-Check Agent
Usage: python3 pipeline.py --uuid aeed71342a899fe4b4c57b7d41bcb692
"""
import sys, os, argparse
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "judges"))
from query_generator import generate
from executor import execute
from scorer import aggregate, generate_report
# Import judges
from judges import paligemma, gdino, maskformer, yolo, facenet, gemma4
# Relative weight of each judge, keyed by the judge's "agent" name.
# The six weights add up to 1.0.
# NOTE(review): not referenced anywhere in the visible part of this file;
# presumably consumed by the scorer — confirm before changing.
JUDGE_WEIGHTS = {
    "PaliGemma": 0.25,
    "Gemma4": 0.35,
    "MaskFormer": 0.15,
    "YOLO": 0.15,
    "GroundingDINO": 0.05,
    "FaceNet": 0.05,
}
def _run_judge(qid, name, score_fn, error_label="Judge error"):
    """Run a single judge with progress output; return ``(result, succeeded)``.

    ``score_fn`` is a zero-argument callable that performs the actual scoring.
    If it raises, the error is printed and a placeholder result with score 50
    is returned instead, so one failing judge never aborts the whole query.
    """
    print(f" [{qid}] {name}...", end="", flush=True)
    try:
        outcome = score_fn()
    except Exception as e:  # deliberately broad: any judge failure becomes a placeholder
        message = str(e)[:60]
        print(f" ERROR: {message}")
        placeholder = {
            "agent": name,
            "score": 50,
            "reasoning": f"{error_label}: {message}",
            "details": {},
        }
        return placeholder, False
    print(" done")
    return outcome, True


def _judge_reasoning(judge_result, succeeded):
    """Return the judge's ``reasoning`` text, or "" if it failed or is malformed."""
    if succeeded and isinstance(judge_result, dict):
        return judge_result.get("reasoning", "")
    return ""


def _first_frame_detections(judge_result, succeeded):
    """Return the ``found`` list of the first entry in ``details["frames"]``.

    Falls back to ``[]`` when the judge failed or the result does not have the
    expected nesting (including an empty ``frames`` list), instead of raising.
    """
    if not (succeeded and isinstance(judge_result, dict)):
        return []
    details = judge_result.get("details") or {}
    per_frame = details.get("frames", [{}]) if isinstance(details, dict) else []
    if not per_frame or not isinstance(per_frame[0], dict):
        return []
    return per_frame[0].get("found", [])


def run_judges(query, result, file_uuid):
    """Run all judges on the extracted frames and prompt.

    Args:
        query: Mapping with at least ``"prompt"`` and ``"id"``.
        result: Execution result; its ``"frames"`` entry is handed to the judges.
        file_uuid: Accepted for interface compatibility; not used here.

    Returns:
        A list of judge result dicts in the order PaliGemma, YOLO, MaskFormer,
        GroundingDINO, FaceNet, Gemma4. A judge that raises contributes a
        placeholder with score 50. Returns ``[]`` when there are no frames.
    """
    frames = result.get("frames", [])
    prompt = query["prompt"]
    qid = query["id"]
    if not frames:
        print(f" [{qid}] No frames to judge")
        return []

    # PaliGemma runs first: its text is part of the context given to Gemma4.
    pg_result, pg_ok = _run_judge(qid, "PaliGemma", lambda: paligemma.score(frames, prompt))
    yo_result, yo_ok = _run_judge(qid, "YOLO", lambda: yolo.score(frames, prompt))
    mf_result, mf_ok = _run_judge(qid, "MaskFormer", lambda: maskformer.score(frames, prompt))
    results = [pg_result, yo_result, mf_result]

    # Grounding DINO — SKIP (too slow per-video search)
    results.append({
        "agent": "GroundingDINO",
        "score": 50,
        "reasoning": "Skipped (too slow for full-video search)",
        "details": {},
    })

    fn_result, _ = _run_judge(qid, "FaceNet", lambda: facenet.score(frames, prompt))
    results.append(fn_result)

    # Gemma4 — uses context from other judges. The context is assembled outside
    # the guarded call so a malformed upstream result degrades to empty context
    # rather than being misreported as an LLM failure. A failed judge contributes
    # empty context; its error placeholder text is never forwarded.
    ctx = {
        "paligemma": _judge_reasoning(pg_result, pg_ok),
        "maskformer": _judge_reasoning(mf_result, mf_ok),
        "yolo": _first_frame_detections(yo_result, yo_ok),
    }
    gm_result, _ = _run_judge(
        qid,
        "Gemma4",
        lambda: gemma4.score(frames, prompt, context=ctx),
        error_label="LLM error",
    )
    results.append(gm_result)
    return results
def main():
    """Command-line entry point for the QA self-check pipeline.

    Reads the required ``--uuid`` argument, then runs four phases in order:
    query generation, query execution, judging, and report generation.
    Progress is printed to stdout throughout.
    """
    cli = argparse.ArgumentParser(description="QA Self-Check Agent")
    cli.add_argument("--uuid", required=True, help="File UUID")
    file_uuid = cli.parse_args().uuid

    print("=== QA Self-Check Agent ===")
    print(f"UUID: {file_uuid}")
    print()

    # Phase 1: build the test queries for this file.
    print("=== Phase 1: Generating queries ===")
    queries = generate(file_uuid)
    print(f" Generated {len(queries)} queries:")
    for query in queries:
        print(f" {query['id']} [{query['type']:>7}] {query['prompt'][:60]}")
    print()

    # Phase 2: execute every query, in order (API search + video download +
    # frame extraction, per the original note).
    print("=== Phase 2: Executing queries ===")
    results = [execute(query, file_uuid) for query in queries]
    print()

    # Phase 3: judge only the executions that succeeded and produced frames.
    print("=== Phase 3: Running judges ===")
    for outcome in results:
        if outcome.get("status") == "ok" and outcome.get("frames"):
            outcome["judge_results"] = run_judges(outcome["query"], outcome, file_uuid)
        else:
            print(f" [{outcome['query']['id']}] Skipped (no video/frames)")
            outcome["judge_results"] = []

    # Phase 4: write the report.
    print()
    print("=== Phase 4: Generating report ===")
    # Drop frame data before reporting; it is treated as non-serializable
    # (the original note mentions PIL Images inside judge details).
    for outcome in results:
        outcome.pop("frames", None)
        for verdict in outcome.get("judge_results", []):
            details = verdict.get("details", {})
            if "frames" in details:
                details.pop("frames")
    generate_report(results, file_uuid)

    print()
    print("=== Done ===")
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()