diff --git a/scripts/pipeline_checklist.py b/scripts/pipeline_checklist.py new file mode 100644 index 0000000..01b3a19 --- /dev/null +++ b/scripts/pipeline_checklist.py @@ -0,0 +1,167 @@ +#!/opt/homebrew/bin/python3.11 +""" +Pipeline Checklist — 獨立於 pipeline 之外的驗收檢查 +每個 stage 完成後自動檢查產出是否到位,沒過的標記出來。 +""" + +import json, os, subprocess, sys +from datetime import datetime +from pathlib import Path + +PROJECT = Path(__file__).resolve().parent.parent +OUTPUT_DIR = Path(os.environ.get("MOMENTRY_OUTPUT_DIR", "/Users/accusys/momentry/output_dev")) +DB_USER = os.environ.get("USER", "accusys") +DB_NAME = "momentry" +PSQL = "/Users/accusys/pgsql/18.3/bin/psql" + + +def run_sql(sql: str) -> str: + r = subprocess.run( + [PSQL, "-U", DB_USER, "-d", DB_NAME, "-t", "-A", "-c", sql], + capture_output=True, text=True, timeout=30, + ) + return r.stdout.strip() + + +def check_file(uuid: str, suffix: str) -> tuple[bool, str]: + f = OUTPUT_DIR / f"{uuid}.{suffix}" + if not f.exists(): + return False, f"missing: {f.name}" + if f.stat().st_size == 0: + return False, f"empty: {f.name} (0 bytes)" + return True, f"ok ({f.stat().st_size // 1024}KB)" + + +stages = [] # (name, checks) +uuid = "aeed71342a899fe4b4c57b7d41bcb692" +results = [] + +def check(name, checks): + stage_results = [] + for desc, ok, msg in checks: + stage_results.append((desc, ok, msg)) + passed = all(ok for _, ok, _ in stage_results) + results.append((name, passed, stage_results)) + return passed + + +print(f"\n{'='*60}") +print(f"Pipeline Checklist — {uuid}") +print(f"{'='*60}\n") + +# ── Stage 1: ASR ── +print("[1/8] ASR") +ok, msg = check_file(uuid, "asr.json") +segments = 0 +if ok: + try: + with open(OUTPUT_DIR / f"{uuid}.asr.json") as fh: + d = json.load(fh) + segments = len(d.get("segments", [])) + except Exception: + segments = 0 +check("ASR output", [ + ("asr.json exists", ok, msg), + ("has segments", segments > 0, f"{segments} segments"), +]) + +# ── Stage 2: ASRX ── +print("[2/8] ASRX") +ok, msg = check_file(uuid, "asrx.json") +segments = 0 +if ok: + try: + with open(OUTPUT_DIR / f"{uuid}.asrx.json") as fh: + d = json.load(fh) + segments = len(d.get("segments", [])) + except Exception: + segments = 0 +check("ASRX output", [ + ("asrx.json exists", ok, msg), + ("has segments", segments > 0, f"{segments} segments"), +]) + +# ── Stage 3: Rule 1 + Sentence Chunks ── +print("[3/8] Rule 1 - Sentence Chunks") +chunk_count = int(run_sql(f"SELECT count(*) FROM dev.chunks WHERE file_uuid='{uuid}' AND chunk_type='sentence'")) +check("sentence chunks", [ + ("chunks exist", chunk_count > 0, f"{chunk_count} sentence chunks"), +]) + +# ── Stage 4: Vectorization ── +print("[4/8] Vectorization") +vec_count = int(run_sql(f"SELECT count(*) FROM dev.chunk_vectors WHERE uuid='{uuid}'")) +col = os.environ.get('QDRANT_COLLECTION', 'momentry_dev_rule1_v2') +qdrant = subprocess.run( + ["curl", "-s", "-X", "POST", f"http://localhost:6333/collections/{col}/points/count", + "-H", "Content-Type: application/json", "-d", '{"exact": true}'], + capture_output=True, text=True, timeout=10, +) +qdrant_ok = '"count"' in qdrant.stdout +check("vector embeddings", [ + ("PG vectors", vec_count > 0, f"{vec_count} vectors"), + ("Qdrant accessible", qdrant_ok, "ok" if qdrant_ok else "no response"), +]) + +# ── Stage 5: Face Trace ── +print("[5/8] Face Trace") +trace_count = int(run_sql(f"SELECT count(DISTINCT trace_id) FROM dev.face_detections WHERE file_uuid='{uuid}' AND trace_id IS NOT NULL")) +face_count = int(run_sql(f"SELECT count(*) FROM dev.face_detections WHERE file_uuid='{uuid}' AND trace_id IS NOT NULL")) +check("face trace", [ + ("traces exist", trace_count > 0, f"{trace_count} traces, {face_count} detections"), +]) + +# ── Stage 6: TKG Graph ── +print("[6/8] TKG") +node_count = int(run_sql(f"SELECT count(*) FROM dev.tkg_nodes WHERE file_uuid='{uuid}'")) +edge_count = int(run_sql(f"SELECT count(*) FROM dev.tkg_edges WHERE file_uuid='{uuid}'")) +face_face = int(run_sql(f"SELECT count(*) FROM dev.tkg_edges WHERE file_uuid='{uuid}' AND edge_type='CO_OCCURS_WITH' AND source_node_id IN (SELECT id FROM dev.tkg_nodes WHERE node_type='face_trace')")) +check("TKG graph", [ + ("nodes", node_count > 0, f"{node_count} nodes"), + ("edges", edge_count > 0, f"{edge_count} edges"), + ("face-face edges", face_face > 0, f"{face_face} face-face edges"), +]) + +# ── Stage 7: Trace Chunks ── +print("[7/8] Trace Chunks") +trace_chunks = int(run_sql(f"SELECT count(*) FROM dev.chunks WHERE file_uuid='{uuid}' AND chunk_type='trace'")) +check("trace chunks", [ + ("trace chunks exist", trace_chunks > 0, f"{trace_chunks} trace chunks"), +]) + +# ── Stage 8: Phase 1 Release ── +print("[8/8] Phase 1 Release") +phase1_dir = PROJECT / "release" / "phase1" / "latest" +phase1_ok = phase1_dir.exists() and (phase1_dir / "RELEASE_INFO.txt").exists() +if phase1_ok: + total_size = sum(f.stat().st_size for f in phase1_dir.rglob("*") if f.is_file()) + msg = f"ok ({total_size // 1024}KB)" +else: + msg = "not found" +check("Phase 1 release", [ + ("release dir exists", phase1_ok, msg), +]) + + +# ── Summary ── +print(f"\n{'='*60}") +print("SUMMARY") +print(f"{'='*60}") +all_passed = True +for name, passed, _ in results: + status = "✅" if passed else "❌" + print(f" {status} {name}") + all_passed = all_passed and passed + +print(f"\n{'PASS' if all_passed else 'FAIL'}") +print(f"{'='*60}\n") + +# Output as JSON for machine parsing +report = { + "uuid": uuid, + "timestamp": datetime.utcnow().isoformat(), + "passed": all_passed, + "stages": {name: {"passed": passed, "checks": {d: o for d, o, _ in checks}} + for name, passed, checks in results}, +} +print(json.dumps(report, indent=2)) diff --git a/scripts/release_pack.py b/scripts/release_pack.py index a50c16a..5ab0d8f 100644 --- a/scripts/release_pack.py +++ b/scripts/release_pack.py @@ -26,15 +26,19 @@ DB_USER = os.environ.get("USER", "accusys") DB_NAME = "momentry" QDRANT_URL = os.environ.get("QDRANT_URL", "http://localhost:6333") QDRANT_COLLECTION = os.environ.get("QDRANT_COLLECTION", "momentry_dev_rule1_v2") +PG_BIN = "/Users/accusys/pgsql/18.3/bin" def ts(): return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") +def psql_cmd() -> list: + return [f"{PG_BIN}/psql", "-U", DB_USER, "-d", DB_NAME] + def run_sql(sql: str) -> str: r = subprocess.run( - ["psql", "-U", DB_USER, "-d", DB_NAME, "-t", "-A", "-c", sql], + psql_cmd() + ["-t", "-A", "-c", sql], capture_output=True, text=True, timeout=30, ) return r.stdout.strip() @@ -57,7 +61,7 @@ def pack_phase(file_uuid: str, phase: int) -> Path: schema_path = pkg_dir / "schema.sql" with open(schema_path, "w") as fh: subprocess.run( - ["pg_dump", "-U", DB_USER, "-d", DB_NAME, "--schema=dev", "--schema-only", + [f"{PG_BIN}/pg_dump", "-U", DB_USER, "-d", DB_NAME, "--schema=dev", "--schema-only", "-T", "dev.monitor_jobs", "-T", "dev.processor_results"], stdout=fh, text=True, timeout=60, ) @@ -141,6 +145,7 @@ def main(): parser = argparse.ArgumentParser() parser.add_argument("--phase", type=int, required=True, choices=[1, 2]) parser.add_argument("--file-uuid", required=True) + parser.add_argument("--uuid", help="UUID for Redis tracking (accepted by executor)") args = parser.parse_args() pack_phase(args.file_uuid, args.phase) diff --git a/scripts/store_traced_faces.py b/scripts/store_traced_faces.py index 8dd2ddc..dfdbb78 100644 --- a/scripts/store_traced_faces.py +++ b/scripts/store_traced_faces.py @@ -150,8 +150,12 @@ def store_traced_faces(file_uuid: str, traced_json_path: str, schema: str = SCHE def main(): parser = argparse.ArgumentParser(description="Store traced faces in DB") parser.add_argument("--file-uuid", required=True, help="Video file UUID") + parser.add_argument("--face-json", help="Path to face.json (default: auto-detect)") + parser.add_argument("--schema", default=SCHEMA, help="DB schema name") + + parser.add_argument("--uuid", help="UUID for Redis tracking (accepted by executor)") args = parser.parse_args() face_json = args.face_json or os.path.join( diff --git a/scripts/tkg_builder.py b/scripts/tkg_builder.py index 8941d7f..f933717 100644 --- a/scripts/tkg_builder.py +++ b/scripts/tkg_builder.py @@ -436,6 +436,7 @@ def main(): parser = argparse.ArgumentParser(description="Build Temporal Knowledge Graph") parser.add_argument("--file-uuid", required=True) parser.add_argument("--schema", default=SCHEMA) + parser.add_argument("--uuid", help="UUID for Redis tracking (accepted by executor)") args = parser.parse_args() conn = get_conn()