From 28652f5b76208b5d0b38c56cea86c613ecc644bb Mon Sep 17 00:00:00 2001 From: Accusys Date: Sat, 9 May 2026 13:58:55 +0800 Subject: [PATCH] feat: phased release packaging (Phase 1 + Phase 2) - scripts/release_pack.py: packages output_json + schema + chunks + vectors - Phase 1: triggered after ASR+ASRX+Rule 1+vectorization (sentence chunk delivery) - Phase 2: triggered after full pipeline + 5W1H Agent (full delivery) - Both phases include all available {uuid}.*.json files - Non-overlapping directories: release/phase1/ and release/phase2/ --- docs/RELEASE_PHASES.md | 75 +++++++++++++++++++++++++ scripts/release_pack.py | 115 +++++++++++++++++++++++++++++++++++++++ src/worker/job_worker.rs | 34 +++++++++++- 3 files changed, 223 insertions(+), 1 deletion(-) create mode 100644 docs/RELEASE_PHASES.md create mode 100644 scripts/release_pack.py diff --git a/docs/RELEASE_PHASES.md b/docs/RELEASE_PHASES.md new file mode 100644 index 0000000..f8e2cfb --- /dev/null +++ b/docs/RELEASE_PHASES.md @@ -0,0 +1,75 @@ +# Release 分階段交付 + +## 階段劃分 + +### Phase 1:Sentence Chunk Embedding 交付 + +**觸發時機**: ASR + ASRX 完成 + Rule 1 Ingestion 完成 + +**交付內容**: +- `{uuid}.asr.json` +- `{uuid}.asrx.json` +- chunks(chunk_type = 'sentence') +- chunk_vectors(sentence embedding) +- DB schema + chunks table data + +**用途**: 終端使用者可進行語意搜尋 + +### Phase 2:5W1H Summary Chunk Embedding 交付 + +**觸發時機**: 全部 processor 完成 + Rule 3 Ingestion + 5W1H Agent + +**交付內容**: +- Phase 1 全部內容 +- `{uuid}.cut.json` +- `{uuid}.yolo.json` +- `{uuid}.face.json` +- `{uuid}.pose.json` +- `{uuid}.ocr.json` +- chunks(chunk_type = 'cut', 'visual', 'trace', 'story') +- chunk_vectors(summary embedding) +- identities / identity_bindings / face_detections + +**用途**: 完整搜尋 + 摘要 + 人物識別 + +--- + +## Worker Pipeline 整合 + +``` +ASR 完成 → ASRX 完成 + ↓ + Rule 1 Ingestion (sentence chunks) + ↓ + Phase 1 Release Packaging ← 自動 + ↓ + 其餘 Processors 繼續 + ↓ + Rule 3 Ingestion (cut chunks + 5W1H summary) + ↓ + Phase 2 Release Packaging ← 自動 +``` + +## 產出目錄結構 + +``` +release/ +├── phase1/ +│ ├── v1.0.0_20260509_120000/ +│ │ ├── output_json/ ← asr.json, asrx.json +│ │ ├── schema.sql ← chunks table DDL +│ │ ├── chunks.csv ← sentence chunks data +│ │ ├── vectors.csv ← sentence embeddings +│ │ └── RELEASE_INFO.txt +│ └── latest → v1.0.0_20260509_120000 +│ +└── phase2/ + ├── v1.0.0_20260509_140000/ + │ ├── output_json/ ← all processor outputs + │ ├── schema.sql ← full schema + │ ├── chunks.csv ← all chunks + │ ├── vectors.csv ← all embeddings + │ ├── identities.csv ← person identities + │ └── RELEASE_INFO.txt + └── latest → v1.0.0_20260509_140000 +``` diff --git a/scripts/release_pack.py b/scripts/release_pack.py new file mode 100644 index 0000000..8847c73 --- /dev/null +++ b/scripts/release_pack.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python3 +""" +Release packaging — two non-overlapping phases. + +Phase 1: ASR + ASRX + Rule 1 sentence chunks complete +Phase 2: Full pipeline + Rule 3 + 5W1H complete + +Output: release/phase{N}/v{VERSION}_{TIMESTAMP}/ +""" + +import json +import os +import shutil +import subprocess +import sys +import time +from datetime import datetime, timezone +from pathlib import Path + +PROJECT = Path(__file__).resolve().parent.parent +OUTPUT_DIR = Path(os.environ.get("MOMENTRY_OUTPUT_DIR", PROJECT / "output_dev")) +RELEASE_DIR = PROJECT / "release" +VERSION = "v1.0.0" + +DB_USER = os.environ.get("USER", "accusys") +DB_NAME = "momentry" + + +def ts(): + return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") + + +def run_sql(sql: str) -> str: + r = subprocess.run( + ["psql", "-U", DB_USER, "-d", DB_NAME, "-t", "-A", "-c", sql], + capture_output=True, text=True, timeout=30, + ) + return r.stdout.strip() + + +def pack_phase(file_uuid: str, phase: int) -> Path: + """Package deliverables for phase 1 or 2.""" + phase_dir = RELEASE_DIR / f"phase{phase}" + stamp = ts() + pkg_dir = phase_dir / f"{VERSION}_{stamp}" + out_dir = pkg_dir / "output_json" + out_dir.mkdir(parents=True, exist_ok=True) + + # 收集 processor output .json 檔 + for f in OUTPUT_DIR.glob(f"{file_uuid}.*.json"): + if f.is_file(): + shutil.copy2(f, out_dir / f.name) + + # 收集 schema + schema_path = pkg_dir / "schema.sql" + with open(schema_path, "w") as fh: + subprocess.run( + ["pg_dump", "-U", DB_USER, "-d", DB_NAME, "--schema=dev", "--schema-only", + "-T", "dev.monitor_jobs", "-T", "dev.processor_results"], + stdout=fh, text=True, timeout=60, + ) + + # 收集 chunks + chunks_csv = pkg_dir / "chunks.csv" + run_sql(f"\\COPY (SELECT * FROM dev.chunks WHERE file_uuid='{file_uuid}') TO '{chunks_csv}' CSV HEADER") + + # 收集 vectors + vecs_csv = pkg_dir / "vectors.csv" + run_sql(f"\\COPY (SELECT * FROM dev.chunk_vectors WHERE uuid='{file_uuid}') TO '{vecs_csv}' CSV HEADER") + + if phase >= 2: + faces_csv = pkg_dir / "face_detections.csv" + run_sql(f"\\COPY (SELECT * FROM dev.face_detections WHERE file_uuid='{file_uuid}') TO '{faces_csv}' CSV HEADER") + idents_csv = pkg_dir / "identities.csv" + run_sql(f"\\COPY (SELECT * FROM dev.identities) TO '{idents_csv}' CSV HEADER") + + # RELEASE_INFO + git_commit = subprocess.run( + ["git", "-C", str(PROJECT), "rev-parse", "HEAD"], + capture_output=True, text=True, timeout=10, + ).stdout.strip() + + info = pkg_dir / "RELEASE_INFO.txt" + with open(info, "w") as fh: + fh.write(f"Phase: {phase}\n") + fh.write(f"Version: {VERSION}\n") + fh.write(f"Timestamp: {stamp}\n") + fh.write(f"File UUID: {file_uuid}\n") + fh.write(f"Git Commit: {git_commit}\n") + fh.write(f"Packaged at: {datetime.now(timezone.utc).isoformat()}\n") + + # latest symlink + latest = phase_dir / "latest" + if latest.is_symlink(): + latest.unlink() + if not latest.exists(): + latest.symlink_to(pkg_dir.name, target_is_directory=True) + + size = sum(f.stat().st_size for f in pkg_dir.rglob("*") if f.is_file()) + print(f"[RELEASE] Phase {phase} packaged: {pkg_dir} ({size / 1024:.0f} KB)") + return pkg_dir + + +def main(): + import argparse + parser = argparse.ArgumentParser() + parser.add_argument("--phase", type=int, required=True, choices=[1, 2]) + parser.add_argument("--file-uuid", required=True) + args = parser.parse_args() + + pack_phase(args.file_uuid, args.phase) + + +if __name__ == "__main__": + main() diff --git a/src/worker/job_worker.rs b/src/worker/job_worker.rs index dceb674..4accd3e 100644 --- a/src/worker/job_worker.rs +++ b/src/worker/job_worker.rs @@ -681,6 +681,21 @@ impl JobWorker { error!("❌ Auto-vectorize failed for {}: {}", uuid_clone, e); } } + // Phase 1 release: sentence chunk embedding 交付 + info!("📦 Phase 1 release packaging..."); + let executor = match crate::core::processor::PythonExecutor::new() { + Ok(ex) => ex, + Err(e) => { error!("Failed PythonExecutor for release pack: {}", e); return; } + }; + match executor.run( + "release_pack.py", + &["--phase", "1", "--file-uuid", &uuid_clone], + None, "RELEASE_P1", + Some(std::time::Duration::from_secs(120)), + ).await { + Ok(()) => info!("✅ Phase 1 release packaged for {}", uuid_clone), + Err(e) => error!("❌ Phase 1 release pack failed: {}", e), + } } Err(e) => error!("❌ Rule 1 Ingestion failed: {}", e), } @@ -830,7 +845,24 @@ impl JobWorker { tokio::spawn(async move { tokio::time::sleep(tokio::time::Duration::from_secs(30)).await; match run_5w1h_agent(&db_clone, &uuid_clone).await { - Ok(()) => info!("✅ 5W1H Agent completed for {}", uuid_clone), + Ok(()) => { + info!("✅ 5W1H Agent completed for {}", uuid_clone); + // Phase 2 release: full pipeline 交付 + info!("📦 Phase 2 release packaging..."); + let executor = match crate::core::processor::PythonExecutor::new() { + Ok(ex) => ex, + Err(e) => { error!("Failed PythonExecutor for release pack: {}", e); return; } + }; + match executor.run( + "release_pack.py", + &["--phase", "2", "--file-uuid", &uuid_clone], + None, "RELEASE_P2", + Some(std::time::Duration::from_secs(120)), + ).await { + Ok(()) => info!("✅ Phase 2 release packaged for {}", uuid_clone), + Err(e) => error!("❌ Phase 2 release pack failed: {}", e), + } + } Err(e) => error!("❌ 5W1H Agent failed for {}: {}", uuid_clone, e), } });