Files
momentry_core/scripts/generate_chunk_summaries.py
Warren e75c4d6f07 cleanup: remove dead code and duplicate docs
- Remove session-ses_2f27.md (161KB raw session log)
- Remove 49 ROOT_* duplicate files across REFERENCE/
- Remove 14 duplicate files between REFERENCE/ root and history/
- Remove asr_legacy.rs (dead code, replaced by asr.rs)
- Remove src/core/worker/ (duplicate JobWorker)
- Remove src/core/layers/ (empty directory)
- Remove 4 .bak files in src/
- Remove 7 dead private methods in worker/processor.rs
- Remove backup directory from git tracking
2026-05-04 01:31:21 +08:00

454 lines
14 KiB
Python
Executable File

#!/opt/homebrew/bin/python3.11
"""
Generate individual chunk summaries combining:
- chunk.text_content (specific content)
- parent.structured_summary (5W1H context)
Each chunk gets a tailored summary that contextualizes its specific content
within the broader parent chunk narrative.
"""
import json
import requests
import psycopg2
import psycopg2.extras
import time
import os
# Keyword arguments passed straight to psycopg2.connect(); no password is
# configured here.
DB_CONFIG = dict(host="localhost", user="accusys", dbname="momentry")

# Schema name interpolated into every SQL statement in this script.
# Overridable through the DATABASE_SCHEMA environment variable.
SCHEMA = os.getenv("DATABASE_SCHEMA", "dev")

# Chat-completions endpoint that call_llm() posts prompts to.
LLAMA_URL = "http://127.0.0.1:8081/v1/chat/completions"

# main() sleeps DELAY_BETWEEN_BATCHES seconds after every BATCH_SIZE chunks.
BATCH_SIZE = 50
DELAY_BETWEEN_BATCHES = 1
def get_chunks_with_parents(uuid=None, limit=None):
    """Fetch chunks that still need a summary, joined with their parent chunk.

    Selects rows from ``{SCHEMA}.chunks`` whose ``summary_text`` is NULL and
    whose ``text_content`` and ``parent_chunk_id`` are both set. Each row is
    LEFT JOINed to ``{SCHEMA}.parent_chunks`` to pick up the parent's
    ``structured_summary`` (read from its ``metadata`` JSON) and its
    ``summary_text`` (returned as ``parent_summary``).

    Args:
        uuid: Optional value matched against ``chunks.uuid``. A falsy value
            disables the filter.
        limit: Optional maximum number of rows. A falsy value (``None`` or
            ``0``) means "no limit", as before.

    Returns:
        A list of ``RealDictCursor`` rows ordered by ``chunk_id``.

    Raises:
        psycopg2.Error: If connecting or querying fails.
    """
    conditions = [
        "c.summary_text IS NULL",
        "c.text_content IS NOT NULL",
        "c.parent_chunk_id IS NOT NULL",
    ]
    params = []

    # User-supplied values are bound as query parameters instead of being
    # spliced into the SQL text, so quotes in --uuid cannot alter the query.
    if uuid:
        conditions.append("c.uuid = %s")
        params.append(uuid)
    where_clause = " AND ".join(conditions)

    # SCHEMA is an identifier and cannot be bound as a parameter; it comes
    # from this script's own configuration, not from command-line input.
    query = f"""
        SELECT c.chunk_id, c.uuid, c.text_content, c.chunk_type,
               c.parent_chunk_id,
               c.speaker_ids,
               c.face_ids,
               c.visual_stats,
               pc.metadata->'structured_summary' as structured_summary,
               pc.summary_text as parent_summary,
               c.start_time,
               c.end_time
        FROM {SCHEMA}.chunks c
        LEFT JOIN {SCHEMA}.parent_chunks pc
            ON c.parent_chunk_id = pc.id::varchar
        WHERE {where_clause}
        ORDER BY c.chunk_id
    """
    if limit:
        query += " LIMIT %s"
        params.append(int(limit))

    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, params or None)
            return cur.fetchall()
    finally:
        # Always release the connection, even when the query raises.
        conn.close()
def get_person_identities(uuid, start_time, end_time):
    """Return the identified persons present within a chunk's time range.

    Queries ``{SCHEMA}.person_identities`` for rows of the given video that
    have a ``speaker_id`` and whose appearance interval
    (``first_appearance_time`` .. ``last_appearance_time``) overlaps
    ``[start_time, end_time]``.

    Args:
        uuid: Value matched against ``person_identities.video_uuid``.
        start_time: Start of the chunk's time range.
        end_time: End of the chunk's time range.

    Returns:
        A list of ``RealDictCursor`` rows with the keys ``person_id``,
        ``name`` and ``speaker_id``.

    Raises:
        psycopg2.Error: If connecting or querying fails.
    """
    query = f"""
        SELECT person_id, name, speaker_id
        FROM {SCHEMA}.person_identities
        WHERE video_uuid = %s
          AND speaker_id IS NOT NULL
          AND last_appearance_time >= %s
          AND first_appearance_time <= %s
    """
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(query, (uuid, start_time, end_time))
            return cur.fetchall()
    finally:
        # Always release the connection, even when the query raises.
        conn.close()
def call_llm(prompt, max_tokens=500):
    """Send *prompt* to the llama-server chat endpoint and return its answer.

    The answer is taken from the first choice's ``message.content``. When
    that is empty or null, the function falls back to
    ``message.reasoning_content``: it returns the first line after the last
    "Final..." marker if one is found, otherwise the whole reasoning text.

    Args:
        prompt: User message to send.
        max_tokens: Generation limit forwarded in the request payload.

    Returns:
        The answer text, or ``""`` when the request fails, the server does
        not answer with HTTP 200, or the response carries no usable text.
    """
    payload = {
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": max_tokens,
        "temperature": 0.3,
        "min_p": 0.1,
    }
    final_markers = ("Final:", "**Final**:", "Final answer:", "**Final answer**:")

    try:
        resp = requests.post(LLAMA_URL, json=payload, timeout=60)
        if resp.status_code != 200:
            # Report the failure instead of silently returning nothing.
            print(f" ⚠️ LLM error: HTTP {resp.status_code}")
            return ""

        # `or` guards cover both a missing key and an explicit JSON null /
        # empty list, which previously raised and skipped the fallback below.
        choices = resp.json().get("choices") or [{}]
        message = choices[0].get("message") or {}

        content = (message.get("content") or "").strip()
        if content:
            return content

        reasoning = message.get("reasoning_content") or ""
        if not reasoning:
            return ""

        for marker in final_markers:
            if marker not in reasoning:
                continue
            # Keep only the first line after the last occurrence of the marker.
            answer = reasoning.split(marker)[-1].strip().split("\n")[0].strip()
            if answer and not answer.startswith("Thinking"):
                return answer

        # Last resort: the raw reasoning (includes the thinking process).
        return reasoning.strip()
    except Exception as e:  # best-effort boundary: one bad call must not abort the run
        print(f" ⚠️ LLM error: {e}")
    return ""
def generate_chunk_summary(chunk):
    """Build the 5W1H prompt for one chunk and return the LLM's raw answer.

    The prompt combines the chunk's own text with the parent chunk's
    structured summary, the chunk's speaker/face ids, the persons found by
    ``get_person_identities`` for its time range, and up to 5 objects,
    3 places and 3 actions from ``visual_stats``.

    Args:
        chunk: Mapping with the keys selected by ``get_chunks_with_parents``
            (``text_content``, ``structured_summary``, ``parent_summary``,
            ``speaker_ids``, ``face_ids``, ``visual_stats``, ``uuid``,
            ``start_time``, ``end_time``, ``chunk_id``).

    Returns:
        The text returned by ``call_llm``, or ``""`` when the chunk has no
        ``text_content``.
    """
    text_content = chunk.get("text_content", "")
    if not text_content:
        return ""

    parent_5w1h = chunk.get("structured_summary") or {}
    if not isinstance(parent_5w1h, dict):
        # Anything but a mapping cannot be queried for the 5W1H fields.
        parent_5w1h = {}
    parent_summary = chunk.get("parent_summary", "")
    parent_excerpt = parent_summary[:150] if parent_summary else "N/A"

    visual_stats = chunk.get("visual_stats")
    if not isinstance(visual_stats, dict):
        visual_stats = {}

    chunk_id = chunk.get("chunk_id")
    uuid = chunk.get("uuid", "")
    # No 0 default here: 0.0 is a legitimate start time (first chunk of a
    # video) and must not be mistaken for "missing".
    start_time = chunk.get("start_time")
    end_time = chunk.get("end_time")

    speaker_list = _join_or_none(chunk.get("speaker_ids"))
    face_list = _join_or_none([f"face_{x}" for x in chunk.get("face_ids") or []])
    visual_list = _join_or_none(visual_stats.get("objects"), limit=5)
    places_list = _join_or_none(visual_stats.get("places"), limit=3)
    actions_list = _join_or_none(visual_stats.get("actions"), limit=3)

    identified_persons = []
    if uuid and start_time is not None and end_time is not None:
        try:
            identified_persons = get_person_identities(uuid, start_time, end_time)
        except Exception as e:  # best effort: the prompt still works without names
            print(f" ⚠️ Person lookup error: {e}")
    person_list = _join_or_none(
        [
            f"{p['name'] or p['person_id']}({p['speaker_id']})"
            for p in identified_persons
        ]
    )

    prompt = f"""You are analyzing a video chunk. Provide accurate, detailed 5W1H analysis.
CHUNK INFO:
- Chunk ID: {chunk_id}
- Time range: {_format_seconds(start_time)} - {_format_seconds(end_time)}
BROADER SCENE CONTEXT (parent chunk, high confidence):
- Scene Who: {parent_5w1h.get('who', 'N/A')}
- Scene What: {parent_5w1h.get('what', 'N/A')}
- Scene When: {parent_5w1h.get('when', 'N/A')}
- Scene Where: {parent_5w1h.get('where', 'N/A')}
- Scene Why: {parent_5w1h.get('why', 'N/A')}
- Scene How: {parent_5w1h.get('how', 'N/A')}
- Tone: {parent_5w1h.get('tone', [])}
- Characters: {parent_5w1h.get('characters', [])}
- Key Events: {parent_5w1h.get('key_events', [])}
Parent summary: {parent_excerpt}...
CHUNK IDENTITY (from ASRX + Face + Person Recognition):
- Speakers (ASRX): {speaker_list}
- Faces (Face): {face_list}
- Identified Persons (verified): {person_list}
VISUAL CONTEXT (YOLO + Places365):
- Objects: {visual_list}
- Places: {places_list}
- Actions: {actions_list}
THIS CHUNK'S CONTENT:
"{text_content}"
Based on ALL the above information, provide accurate analysis:
1. **Who** (use verified names if available, e.g., "John (SPEAKER_1)"):
- List characters with confidence level
2. **What** (key action in this specific moment)
3. **When** (temporal position: beginning/middle/end of scene)
4. **Where** (location from video or None)
5. **Why** (purpose of this specific action)
6. **How** (manner: tone, emotion, expression)
7. **Emotion/Tone** (specific emotions detected)
8. **Key Actions** (verbs describing what's happening)
Output format:
Who: [names with source]
What: [action]
When: [position]
Where: [location or None]
Why: [purpose]
How: [manner]
Emotion: [emotion]
Actions: [verb1, verb2]
---
Summary: [2-3 sentence detailed summary connecting to scene]"""
    return call_llm(prompt)


def _join_or_none(values, limit=None):
    """Join *values* with ", " (at most *limit* items); "None" when empty."""
    if not values:
        return "None"
    if isinstance(values, str):
        # A bare string is one value, not a sequence of characters.
        values = [values]
    items = [str(v) for v in values]
    if limit is not None:
        items = items[:limit]
    return ", ".join(items)


def _format_seconds(value):
    """Render a timestamp as e.g. "12.50s"; "N/A" when it is missing."""
    if value is None:
        return "N/A"
    return f"{value:.2f}s"
def parse_5w1h_summary(result_text):
    """Split an LLM reply into its 5W1H fields and the free-text summary.

    Expected layout (the format requested by the prompt)::

        Who: ...
        What: ...
        ...
        Actions: ...
        ---
        Summary: ...

    The field section is everything before the first ``---``; the summary is
    the text between the first and second ``---``. When no ``---`` is
    present, the text is split at the first ``Summary:`` label instead, so a
    reply that merely forgot the separator is still parsed.

    Args:
        result_text: Raw LLM reply. Non-string or blank input is tolerated.

    Returns:
        A dict with the keys ``who``, ``what``, ``when``, ``where``, ``why``,
        ``how``, ``emotion``, ``actions`` and ``summary``. Fields that were
        not found are empty strings.
    """
    field_keys = ("who", "what", "when", "where", "why", "how", "emotion", "actions")
    data = {key: "" for key in field_keys}
    data["summary"] = ""

    if not isinstance(result_text, str) or not result_text.strip():
        return data

    sections = result_text.split("---")
    if len(sections) >= 2:
        fields_text, summary_text = sections[0], sections[1]
    else:
        fields_text, _, summary_text = result_text.partition("Summary:")

    summary_text = summary_text.strip()
    if summary_text.startswith("Summary:"):
        # Drop only the leading label; the same word inside the summary stays.
        summary_text = summary_text[len("Summary:"):].strip()
    data["summary"] = summary_text

    # "Who:" -> "who", "What:" -> "what", ...
    labels = {f"{key.capitalize()}:": key for key in field_keys}
    for raw_line in fields_text.splitlines():
        line = raw_line.strip()
        for label, key in labels.items():
            if line.startswith(label):
                # Slice off the prefix rather than str.replace(), which would
                # also delete the label text wherever it recurs in the value.
                data[key] = line[len(label):].strip()
                break
    return data
def update_chunk_summary(
    chunk_id,
    summary_text,
    chunk_5w1h=None,
    identity_info=None,
    visual_stats=None,
    uuid=None,
):
    """Store a chunk's summary and optional 5W1H/identity/visual metadata.

    When at least one of ``chunk_5w1h``, ``identity_info`` or ``visual_stats``
    is truthy, they are merged into the row's ``metadata`` JSONB under the
    keys ``chunk_5w1h``, ``chunk_identity`` and ``chunk_visual``, and
    ``metadata_version`` is incremented. Otherwise only ``summary_text`` is
    written and ``content_version`` is incremented.

    Args:
        chunk_id: Value matched against ``chunks.chunk_id``.
        summary_text: Summary to store.
        chunk_5w1h: Optional parsed 5W1H fields.
        identity_info: Optional speaker/face identity information.
        visual_stats: Optional visual statistics; a dict is stored as is,
            anything else is parsed as JSON from its ``str()`` form.
        uuid: Accepted for interface compatibility; not used.

    Raises:
        psycopg2.Error: If connecting, updating or committing fails.
    """
    metadata_obj = {}
    if chunk_5w1h:
        metadata_obj["chunk_5w1h"] = chunk_5w1h
    if identity_info:
        metadata_obj["chunk_identity"] = identity_info
    if visual_stats:
        if isinstance(visual_stats, dict):
            metadata_obj["chunk_visual"] = visual_stats
        else:
            try:
                metadata_obj["chunk_visual"] = json.loads(str(visual_stats))
            except (TypeError, ValueError):
                # Unparseable stats are stored as an empty object.
                metadata_obj["chunk_visual"] = {}

    # NOTE(review): the metadata branch bumps metadata_version only, although
    # summary_text changes in both branches -- confirm this is intended.
    if metadata_obj:
        query = f"""
            UPDATE {SCHEMA}.chunks
            SET summary_text = %s,
                metadata = COALESCE(metadata, '{{}}'::jsonb) || %s::jsonb,
                metadata_version = metadata_version + 1,
                updated_at = CURRENT_TIMESTAMP
            WHERE chunk_id = %s
        """
        params = (summary_text, json.dumps(metadata_obj), chunk_id)
    else:
        query = f"""
            UPDATE {SCHEMA}.chunks
            SET summary_text = %s,
                content_version = content_version + 1,
                updated_at = CURRENT_TIMESTAMP
            WHERE chunk_id = %s
        """
        params = (summary_text, chunk_id)

    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cur:
            cur.execute(query, params)
        conn.commit()
    finally:
        # Closing without a commit discards the transaction, so a failed
        # update leaves nothing half-written and leaks no connection.
        conn.close()
def main():
    """Command-line entry point: summarize every chunk that lacks a summary.

    Options:
        --uuid     restrict processing to one video UUID
        --limit    maximum number of chunks to fetch
        --dry-run  generate and print summaries without writing to the DB

    Chunks without ``text_content`` or without a parent structured summary
    are skipped. After every ``BATCH_SIZE`` chunks the loop pauses for
    ``DELAY_BETWEEN_BATCHES`` seconds.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Generate chunk summaries")
    parser.add_argument("--uuid", help="Process specific video UUID")
    parser.add_argument("--limit", type=int, help="Limit number of chunks")
    parser.add_argument("--dry-run", action="store_true", help="Print without saving")
    args = parser.parse_args()

    print(f"Fetching chunks (schema={SCHEMA})...")
    chunks = get_chunks_with_parents(uuid=args.uuid, limit=args.limit)
    print(f"Found {len(chunks)} chunks to process")
    if not chunks:
        print("No chunks need summary generation")
        return

    success = 0
    failed = 0
    for i, chunk in enumerate(chunks, 1):
        chunk_id = chunk["chunk_id"]
        print(f"\n[{i}/{len(chunks)}] {chunk_id}")

        if not chunk.get("text_content"):
            print(" ⚠️ No text_content, skipping")
            continue
        if not chunk.get("structured_summary"):
            print(" ⚠️ No parent 5W1H, skipping")
            continue

        print(f" Text: {chunk['text_content'][:50]}...")
        result = generate_chunk_summary(chunk)

        if result:
            parsed = parse_5w1h_summary(result)
            # The "summary" key always exists, so a .get() default would never
            # apply. Fall back to the raw reply when parsing found no summary;
            # otherwise an empty string is stored and the chunk is never
            # picked up again (selection requires summary_text IS NULL).
            summary_text = parsed.get("summary") or result
            chunk_5w1h = {k: v for k, v in parsed.items() if k != "summary" and v}

            speaker_ids = chunk.get("speaker_ids", [])
            face_ids = chunk.get("face_ids", [])
            visual_stats = chunk.get("visual_stats", {})
            identity_info = {
                "speakers": speaker_ids,
                "faces": [f"face_{x}" for x in face_ids] if face_ids else [],
            }

            print(f" ✓ Summary: {summary_text[:80]}...")
            if chunk_5w1h:
                print(
                    f" ✓ Chunk 5W1H: Who={chunk_5w1h.get('who', 'N/A')[:30]}, What={chunk_5w1h.get('what', 'N/A')[:30]}"
                )
            if identity_info["speakers"] or identity_info["faces"]:
                print(
                    f" ✓ Identity: speakers={identity_info['speakers']}, faces={identity_info['faces']}"
                )
            if visual_stats:
                print(
                    f" ✓ Visual: {list(visual_stats.keys()) if isinstance(visual_stats, dict) else 'present'}"
                )

            if not args.dry_run:
                update_chunk_summary(
                    chunk_id,
                    summary_text,
                    chunk_5w1h,
                    identity_info,
                    visual_stats,
                    args.uuid,
                )
            success += 1
        else:
            print(" ✗ Failed to generate summary")
            failed += 1

        if i % BATCH_SIZE == 0:
            print(f"\n Batch complete ({success} success, {failed} failed)")
            time.sleep(DELAY_BETWEEN_BATCHES)

    print(f"\n{'=' * 50}")
    print(f"Done! Success: {success}, Failed: {failed}")
    if args.dry_run:
        print("(Dry run - no updates saved)")


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()