feat: Phase 2.6 edges migration to Qdrant (TKG-only architecture)

Phase 2.6.1: co_occurrence_edges migration
- build_co_occurrence_edges_from_qdrant()
- Qdrant embeddings → frame grouping → YOLO objects
- Result: 6679 edges (vs 6701 PostgreSQL)

Phase 2.6.2: face_face_edges migration
- build_face_face_edges_from_qdrant()
- Qdrant embeddings → frame grouping → face pairs
- mutual_gaze detection preserved
- Result: 6 edges (exact match)

Phase 2.6.3: speaker_face_edges migration
- build_speaker_face_edges_from_qdrant()
- Qdrant embeddings → trace_id frame ranges
- SPEAKS_AS edge creation

Architecture:
- All edges use Qdrant payload (no face_detections queries)
- PostgreSQL fallback for empty Qdrant
- Estimated 3.6x performance improvement

Testing:
- Playground (3003): ✓ All Phase 2.6 logs verified
- Edge counts: ✓ Close match with PostgreSQL
- Fallback: ✓ Working

Docs:
- docs_v1.0/DESIGN/TKG_PHASE2_6_EDGES_MIGRATION.md
- docs_v1.0/M4_workspace/2026-06-21_phase2_6_test.md
This commit is contained in:
Accusys
2026-06-21 04:47:49 +08:00
parent 0afc70fc5b
commit 2cfcfdd1af
2926 changed files with 8311058 additions and 1394 deletions

View File

@@ -0,0 +1,790 @@
#!/opt/homebrew/bin/python3.11
"""
Momentry Core Visual Demo Dashboard
職責:提供處理器模組的視覺化預覽,支持時間軸檢查與多模組疊加顯示。
"""
import os
import json
import cv2
import numpy as np
import streamlit as st
import pandas as pd
import altair as alt
from PIL import Image, ImageDraw, ImageFont
import time
# ==========================================
# 設定與輔助函數
# ==========================================
OUTPUT_DIR = os.getenv("MOMENTRY_OUTPUT_DIR", "./output")
VIDEO_BASE_DIR = os.path.join(OUTPUT_DIR, "quick_preview") # 指向預覽目錄
# 色彩定義 (OpenCV BGR 格式)
COLORS = {
"YOLO": (0, 255, 0), # 綠
"FACE": (255, 0, 0), # 藍
"POSE": (0, 0, 255), # 紅
"OCR": (0, 255, 255), # 黃
"SCENE": (255, 255, 255), # 白 (文字)
}
# 骨架連接對 (MediaPipe Pose)
POSE_CONNECTIONS = [
(11, 12),
(11, 13),
(13, 15),
(12, 14),
(14, 16), # 上半身
(11, 23),
(12, 23),
(23, 24),
(23, 25),
(25, 27), # 下半身左
(24, 26),
(26, 28), # 下半身右
]
def load_json_safe(uuid, module):
path = os.path.join(OUTPUT_DIR, "quick_preview", f"preview.{module}.json")
if not os.path.exists(path):
return None
with open(path, "r") as f:
return json.load(f)
def get_video_path(uuid):
# 直接返回預覽影片
return os.path.join(OUTPUT_DIR, "quick_preview", "preview.mp4")
# ==========================================
# 渲染邏輯 (Renderers)
# ==========================================
def draw_yolo_overlay(frame, yolo_data, timestamp):
"""繪製 YOLO 檢測框"""
if not yolo_data:
return frame
h, w = frame.shape[:2]
# 尋找最接近的幀
best_frame = None
min_diff = float("inf")
frames_data = yolo_data.get("frames", {})
if isinstance(frames_data, dict):
frames_list = list(frames_data.values())
else:
frames_list = frames_data
for f in frames_list:
ts = f.get("time_seconds") or f.get("timestamp", 0)
diff = abs(ts - timestamp)
if diff < min_diff:
min_diff = diff
best_frame = f
if best_frame and min_diff < 0.1:
for obj in best_frame.get("detections", []):
# YOLO output has x1, y1, x2, y2 directly
x1 = int(obj.get("x1", 0))
y1 = int(obj.get("y1", 0))
x2 = int(obj.get("x2", 0))
y2 = int(obj.get("y2", 0))
label = f"{obj.get('class_name', '?')} {obj.get('confidence', 0):.2f}"
# Draw Rectangle
cv2.rectangle(frame, (x1, y1), (x2, y2), COLORS["YOLO"], 2)
# Draw Label Background
(tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
cv2.rectangle(frame, (x1, y1 - 15), (x1 + tw, y1), COLORS["YOLO"], -1)
# Draw Text
cv2.putText(
frame, label, (x1, y1 - 3), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1
)
return frame
def draw_pose_overlay(frame, pose_data, timestamp):
"""繪製 Pose 骨架"""
if not pose_data:
return frame
h, w = frame.shape[:2]
best_frame = None
min_diff = float("inf")
for f in pose_data.get("frames", []):
diff = abs(f.get("timestamp", 0) - timestamp)
if diff < min_diff:
min_diff = diff
best_frame = f
if best_frame and min_diff < 0.5:
for person in best_frame.get("persons", []):
kps = person.get("keypoints", [])
if not kps:
continue
# 繪製節點與連線
for conn in POSE_CONNECTIONS:
p1 = kps[conn[0]] if conn[0] < len(kps) else None
p2 = kps[conn[1]] if conn[1] < len(kps) else None
if (
p1
and p2
and p1.get("confidence", 0) > 0.5
and p2.get("confidence", 0) > 0.5
):
pt1 = (int(p1["x"] * w), int(p1["y"] * h))
pt2 = (int(p2["x"] * w), int(p2["y"] * h))
cv2.line(frame, pt1, pt2, COLORS["POSE"], 2)
return frame
def draw_ocr_overlay(frame, ocr_data, timestamp):
"""繪製 OCR 文字區域"""
if not ocr_data:
return frame
h, w = frame.shape[:2]
frames_data = ocr_data.get("frames", [])
if isinstance(frames_data, dict):
frames_list = list(frames_data.values())
else:
frames_list = frames_data
best_frame = None
min_diff = float("inf")
for f in frames_list:
diff = abs(f.get("timestamp", 0) - timestamp)
if diff < min_diff:
min_diff = diff
best_frame = f
if best_frame and min_diff < 0.5:
for text in best_frame.get("texts", []):
# Check if bbox is a list of 4 points OR x,y,w,h
box = text.get("bbox", [])
if isinstance(box, list) and len(box) == 4:
# Format: [[x1,y1], [x2,y2], [x3,y3], [x4,y4]]
pts = np.array([[int(p[0]), int(p[1])] for p in box], np.int32)
pts = pts.reshape((-1, 1, 2))
cv2.polylines(frame, [pts], True, COLORS["OCR"], 2)
cv2.putText(
frame,
text.get("text", ""),
(pts[0][0][0], pts[0][0][1] - 5),
cv2.FONT_HERSHEY_SIMPLEX,
0.4,
COLORS["OCR"],
1,
)
else:
# Format: x, y, width, height (EasyOCR style)
x = text.get("x", 0)
y = text.get("y", 0)
width = text.get("width", 0)
height = text.get("height", 0)
# Normalize to pixels if < 1
if x <= 1:
x *= w
if y <= 1:
y *= h
if width <= 1:
width *= w
if height <= 1:
height *= h
x, y, width, height = int(x), int(y), int(width), int(height)
cv2.rectangle(frame, (x, y), (x + width, y + height), COLORS["OCR"], 2)
cv2.putText(
frame,
text.get("text", ""),
(x, y - 5),
cv2.FONT_HERSHEY_SIMPLEX,
0.4,
COLORS["OCR"],
1,
)
return frame
def draw_scene_label(frame, scene_data, timestamp):
"""繪製場景標籤"""
if not scene_data:
return frame
for scene in scene_data.get("scenes", []):
if scene.get("start_time", 0) <= timestamp <= scene.get("end_time", 0):
label = f"📍 {scene.get('scene_type_zh') or scene.get('scene_type')}"
cv2.putText(
frame, label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 4
) # 陰影
cv2.putText(
frame,
label,
(10, 30),
cv2.FONT_HERSHEY_SIMPLEX,
0.8,
COLORS["SCENE"],
2,
)
break
return frame
def draw_face_overlay(frame, face_data, timestamp):
"""繪製 Face 檢測框"""
if not face_data:
return frame
h, w = frame.shape[:2]
frames_data = face_data.get("frames", [])
if isinstance(frames_data, dict):
frames_list = list(frames_data.values())
else:
frames_list = frames_data
best_frame = None
min_diff = float("inf")
for f in frames_list:
diff = abs(f.get("timestamp", 0) - timestamp)
if diff < min_diff:
min_diff = diff
best_frame = f
if best_frame and min_diff < 1.5: # 放寬容忍度到 1.5 秒,以匹配稀疏的關鍵幀
for face in best_frame.get("faces", []):
# Format: x, y, width, height (pixels)
x = face.get("x", 0)
y = face.get("y", 0)
width = face.get("width", 0)
height = face.get("height", 0)
cv2.rectangle(frame, (x, y), (x + width, y + height), COLORS["FACE"], 2)
# 優先顯示聚類後的 Person ID (使用 PIL 支援中文)
person_id = face.get("person_id")
if person_id:
label = f"ID: {person_id}"
color_rgb = (255, 255, 0) # Yellow
else:
label = f"Face {face.get('confidence', 0):.2f}"
color_rgb = tuple(COLORS["FACE"][::-1]) # RGB
# 1. 轉換為 PIL 格式以繪製中文
from PIL import Image, ImageDraw, ImageFont
img_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(img_pil)
# 2. 載入中文字型 (直接使用 STHeiti因為 PingFang.ttc 是集合檔有時無法讀取)
try:
font = ImageFont.truetype(
"/System/Library/Fonts/STHeiti Medium.ttc", 24
)
except:
# 備案:如果 STHeiti 也失敗,嘗試 Arial Unicode 或預設
try:
font = ImageFont.truetype("/Library/Fonts/Arial Unicode.ttf", 24)
except:
font = ImageFont.load_default()
# 3. 計算文字大小
bbox = draw.textbbox((0, 0), label, font=font)
tw = bbox[2] - bbox[0]
th = bbox[3] - bbox[1]
# 4. 繪製位置 (臉部框上方)
px = x
py = max(th + 5, y) # 確保文字不會超出畫面頂部
# 5. 繪製黑色背景
draw.rectangle([px, py - th - 4, px + tw + 4, py], fill=(0, 0, 0))
# 6. 繪製文字
draw.text((px + 2, py - th - 2), label, font=font, fill=color_rgb)
# 7. 轉回 OpenCV 格式 (BGR)
frame = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
return frame
def draw_speaker_overlay(frame, asrx_data, timestamp):
"""繪製 Speaker 標籤 (右上角)"""
if not asrx_data:
return frame
# 尋找當前時間段的說話人
segments = asrx_data.get("segments", [])
current_speaker = None
for seg in segments:
start = seg.get("start", 0)
end = seg.get("end", 0)
if start <= timestamp <= end:
current_speaker = seg.get("speaker_id")
break
if current_speaker:
# 檢查是否有綁定身份 (這裡暫時直接顯示 ID未來可擴展查詢 DB)
label = f"🎤 {current_speaker}"
# 繪製標籤
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 1.0
thickness = 2
color = (255, 165, 0) # 橙色
(tw, th), _ = cv2.getTextSize(label, font, font_scale, thickness)
margin = 10
x, y = frame.shape[1] - tw - margin, th + margin
# 背景
cv2.rectangle(frame, (x - 5, y - th - 5), (x + tw + 5, y + 5), color, -1)
# 文字
cv2.putText(frame, label, (x, y), font, font_scale, (0, 0, 0), thickness)
return frame
def draw_asr_subtitle(frame, asr_data, timestamp):
"""繪製字幕 (Support Chinese)"""
if not asr_data:
return frame
h, w = frame.shape[:2]
# 尋找當前句子
text = ""
for seg in asr_data.get("segments", []):
if seg.get("start", 0) <= timestamp <= seg.get("end", 0):
text = seg.get("text", "")
break
if text:
# Convert BGR (OpenCV) to RGB (PIL)
img_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(img_pil)
# Measure text size to draw background
try:
font = ImageFont.truetype("/System/Library/Fonts/STHeiti Medium.ttc", 24)
except:
try:
font = ImageFont.truetype("/System/Library/Fonts/PingFang.ttc", 24)
except:
font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), text, font=font)
text_w = bbox[2] - bbox[0]
text_h = bbox[3] - bbox[1]
# Background position
bg_x = (w - text_w) // 2
bg_y = h - text_h - 20
# Draw Background
draw.rectangle(
[bg_x - 10, bg_y - 10, bg_x + text_w + 10, bg_y + text_h + 10],
fill=(0, 0, 0),
)
# Draw Text
draw.text((bg_x, bg_y), text, font=font, fill=(255, 255, 255))
# Convert back to BGR
frame = cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)
return frame
h, w = frame.shape[:2]
# 尋找當前句子
text = ""
for seg in asr_data.get("segments", []):
if seg.get("start", 0) <= timestamp <= seg.get("end", 0):
text = seg.get("text", "")
break
if text:
# 黑底白字
text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0]
text_x = (w - text_size[0]) // 2
text_y = h - 30
cv2.rectangle(
frame,
(text_x - 5, text_y - 25),
(text_x + text_size[0] + 5, text_y + 5),
(0, 0, 0),
-1,
)
cv2.putText(
frame,
text,
(text_x, text_y),
cv2.FONT_HERSHEY_SIMPLEX,
0.6,
(255, 255, 255),
2,
)
return frame
# ==========================================
# 主應用邏輯
# ==========================================
def main():
st.set_page_config(layout="wide", page_title="Momentry Visual Demo")
st.title("🎬 Momentry Processor Visual Demo")
uuid = "quick_preview"
video_path = get_video_path(uuid)
if not video_path or not os.path.exists(video_path):
st.error(f"Video file not found at {video_path}")
return
# 1. 原始音視頻播放器 (讓用戶聽到聲音)
st.subheader("🔊 原始聲音播放器 (可聽 Speaker 聲音)")
st.video(video_path, start_time=0)
st.markdown("---")
# 2. 使用說明 (How to Use)
with st.expander("📖 如何使用本工具?(點擊展開說明)"):
st.markdown(
"""
1. **時間軸控制**: 拖動下方的滑動條 (Slider) 來移動影片時間點。
2. **開啟/關閉功能**: 在右側的 **Layers** 面板中,勾選您想看到的效果。
- **✅ YOLO**: 綠色框標記物體 (如人、桌子)。
- **✅ ASR**: 底部顯示白色字幕。
- **✅ Scene**: 左上角顯示場景名稱。
3. **查看統計**: 底部圖表顯示各模組在哪些時間段有數據。
"""
)
# 3. 載入 JSON 數據
col1, col2 = st.columns([3, 1])
with col1:
st.header("Frame Inspector (幀檢查器)")
with col2:
st.subheader("顯示層控制 (Layers)")
show_yolo = st.checkbox("YOLO (Object)", value=True)
show_face = st.checkbox("Face (Person)", value=True)
show_pose = st.checkbox("Pose (Skeleton)", value=False)
show_ocr = st.checkbox("OCR (Text)", value=False)
show_scene = st.checkbox("Scene (Label)", value=True)
show_asr = st.checkbox("ASR (Subtitle)", value=True)
# 3. 數據載入
yolo_data = load_json_safe(uuid, "yolo") if show_yolo else None
# 強制嘗試載入聚類數據
face_data = load_json_safe(uuid, "face_clustered")
if face_data:
st.success("✅ 已載入聚類數據 (Face Clustered)")
else:
face_data = load_json_safe(uuid, "face")
st.warning("⚠️ 未找到聚類數據,使用原始數據")
pose_data = load_json_safe(uuid, "pose") if show_pose else None
ocr_data = load_json_safe(uuid, "ocr") if show_ocr else None
scene_data = load_json_safe(uuid, "scene") if show_scene else None
asr_data = load_json_safe(uuid, "asr") if show_asr else None
# 載入 ASRX (Speaker) 數據
asrx_data = load_json_safe(uuid, "asrx")
# 4. 視頻與幀控制與播放邏輯
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = total_frames / fps if fps else 0
# 初始化 Session State
if "playing" not in st.session_state:
st.session_state.playing = False
if "current_time" not in st.session_state:
st.session_state.current_time = 0.0
# 播放控制區
col_play, col_reset, col_info = st.columns([1, 1, 4])
with col_play:
if st.button("▶ 播放"):
st.session_state.playing = True
with col_reset:
if st.button("⏹ 重置"):
st.session_state.playing = False
st.session_state.current_time = 0.0
with col_info:
st.write(f"時間: {st.session_state.current_time:.2f} / {duration:.1f} s")
# 自動播放邏輯
placeholder = st.empty()
progress_bar = st.progress(0.0)
while st.session_state.playing:
if st.session_state.current_time >= duration:
st.session_state.playing = False
st.session_state.current_time = 0.0
break
current_time = st.session_state.current_time
frame_idx = int(current_time * fps)
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
ret, frame = cap.read()
if ret:
# 渲染
if show_asr:
frame = draw_asr_subtitle(frame, asr_data, current_time)
frame = draw_speaker_overlay(frame, asrx_data, current_time)
if show_scene:
frame = draw_scene_label(frame, scene_data, current_time)
if show_yolo:
frame = draw_yolo_overlay(frame, yolo_data, current_time)
if show_face:
frame = draw_face_overlay(frame, face_data, current_time)
if show_pose:
frame = draw_pose_overlay(frame, pose_data, current_time)
if show_ocr:
frame = draw_ocr_overlay(frame, ocr_data, current_time)
# 顯示
with placeholder.container():
st.image(frame, channels="BGR", use_container_width=True)
progress_bar.progress(
current_time / duration, text=f"播放中: {current_time:.1f}s"
)
# 更新時間 (每幀間隔)
time.sleep(1.0 / fps if fps > 0 else 0.04)
st.session_state.current_time += 1.0 / fps if fps > 0 else 0.04
else:
st.session_state.playing = False
break
# 手動拖動條 (僅在暫停時顯示/可用)
if not st.session_state.playing:
st.session_state.current_time = st.slider(
"⏯ 手動調整時間",
0.0,
duration,
st.session_state.current_time,
step=0.1,
key="manual_slider",
)
progress_bar.progress(
st.session_state.current_time / duration,
text=f"已暫停: {st.session_state.current_time:.1f}s",
)
# 最後一幀顯示 (如果是暫停狀態)
if not st.session_state.playing:
current_time = st.session_state.current_time
frame_idx = int(current_time * fps)
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
ret, frame = cap.read()
if ret:
if show_asr:
frame = draw_asr_subtitle(frame, asr_data, current_time)
frame = draw_speaker_overlay(frame, asrx_data, current_time)
if show_scene:
frame = draw_scene_label(frame, scene_data, current_time)
if show_yolo:
frame = draw_yolo_overlay(frame, yolo_data, current_time)
if show_face:
frame = draw_face_overlay(frame, face_data, current_time)
if show_pose:
frame = draw_pose_overlay(frame, pose_data, current_time)
if show_ocr:
frame = draw_ocr_overlay(frame, ocr_data, current_time)
with placeholder.container():
st.image(frame, channels="BGR", use_container_width=True)
# 5. 人工互動聚類介面 (Identity Manager)
st.header("👥 身份管理與合併 (Identity Manager)")
# 找出所有 Person 截圖
thumbnail_dir = os.path.join(OUTPUT_DIR, "quick_preview")
person_thumbnails = [
f
for f in os.listdir(thumbnail_dir)
if f.startswith("Person_") and f.endswith(".jpg")
]
if person_thumbnails:
# 顯示所有面孔
cols = st.columns(min(len(person_thumbnails), 4))
selected_ids = []
for i, fname in enumerate(sorted(person_thumbnails)):
person_id = fname.replace(".jpg", "")
img_path = os.path.join(thumbnail_dir, fname)
with cols[i % 4]:
st.image(img_path, caption=person_id, use_container_width=True)
if st.checkbox(f"選擇 {person_id}", key=f"chk_{person_id}"):
selected_ids.append(person_id)
# 合併操作區
if selected_ids:
st.markdown("---")
st.write(f"已選擇: **{', '.join(selected_ids)}**")
with st.form(key="merge_form"):
new_name = st.text_input(
"合併後的身份名稱 (e.g., 主角, 張三)", value="Speaker_A"
)
submitted = st.form_submit_button("✅ 確認合併與綁定")
if submitted:
# 1. 更新 JSON
face_json_path = os.path.join(
OUTPUT_DIR, "quick_preview", "preview.face_clustered.json"
)
if os.path.exists(face_json_path):
with open(face_json_path, "r") as f:
face_data = json.load(f)
count = 0
for frame in face_data.get("frames", []):
for face in frame.get("faces", []):
if face.get("person_id") in selected_ids:
face["person_id"] = new_name
count += 1
with open(face_json_path, "w", encoding="utf-8") as f:
json.dump(face_data, f, indent=2, ensure_ascii=False)
st.success(f"✅ 已更新 {count} 個臉部標籤為 '{new_name}'")
# 2. 更新資料庫 (綁定 Talent)
import psycopg2
try:
conn = psycopg2.connect(
"postgresql://accusys@localhost:5432/momentry"
)
cur = conn.cursor()
# 創建或更新 Talent
cur.execute(
"SELECT id FROM talents WHERE real_name = %s", (new_name,)
)
row = cur.fetchone()
if row:
talent_id = row[0]
else:
cur.execute(
"INSERT INTO talents (real_name) VALUES (%s) RETURNING id",
(new_name,),
)
talent_id = cur.fetchone()[0]
# 綁定 Faces
# (注意:這裡簡化為將對應的 Person ID 在 DB 中視為 Talent實際應更新 JSON ID)
# 這裡我們主要更新 Speaker 綁定邏輯,確保這個 Talent 有綁定到的 Speaker
# 找出這些 Person ID 曾經綁定的 Speaker
# 為了簡單,我們直接提示用戶去綁定 Speaker或者我們掃描 ASRX 對應關係
conn.commit()
cur.close()
conn.close()
st.success(
f"✅ 資料庫已建立 Talent '{new_name}' (ID: {talent_id})"
)
# 重新載入頁面以反映變更
st.rerun()
except Exception as e:
st.error(f"資料庫錯誤: {e}")
else:
st.info("未發現聚類截圖。請先執行 `face_clustering_processor.py`。")
# 6. 時間軸視覺化 (Timeline)
st.header("📅 Processor Timeline (處理器活動軸)")
plot_timeline(uuid, duration)
cap.release()
def plot_timeline(uuid, duration):
"""使用 Altair 繪製各模組的活動時間軸"""
data = []
# 解析 ASR 活動
asr = load_json_safe(uuid, "asr")
if asr:
for seg in asr.get("segments", []):
data.append(
{
"Module": "ASR Speech",
"Start": seg["start"],
"End": seg["end"],
"Task": "Speech",
}
)
# 解析 YOLO 活動 (隨機取樣)
yolo = load_json_safe(uuid, "yolo")
if yolo:
# frames 可能是 dict (keyed by frame_index) 或 list
frames_data = yolo.get("frames", {})
if isinstance(frames_data, dict):
frames_list = list(frames_data.values())
else:
frames_list = frames_data
# 取樣以避免圖表過慢 (取前 50 幀)
sample_count = 0
for f in frames_list:
if sample_count > 50:
break
detections = f.get("detections", []) or f.get("objects", [])
if detections:
ts = f.get("time_seconds") or f.get("timestamp", 0)
data.append(
{
"Module": "YOLO Detect",
"Start": ts,
"End": ts + 0.5,
"Task": "Obj",
}
)
sample_count += 1
if not data:
st.info("No timeline data available.")
return
df = pd.DataFrame(data)
chart = (
alt.Chart(df)
.mark_bar()
.encode(
x=alt.X("Start:Q", title="Time (sec)"),
x2="End:Q",
y=alt.Y("Module:N", title=""),
color=alt.Color("Module:N", scale=alt.Scale(scheme="category10")),
)
.properties(height=200)
)
st.altair_chart(chart, use_container_width=True)
if __name__ == "__main__":
main()