#!/opt/homebrew/bin/python3.11
"""
One-pass ASR + Speaker Change Detection + Split → asr.json

Pipeline (driven by main()):
  1. faster-whisper transcription of the whole audio track ("pass1").
  2. Speaker-change detection inside each pass1 segment using sliding-window
     speaker embeddings.
  3. Split pass1 segments at detected changes and recover text for each piece.
  4. Optionally (unless --no-qdrant) upload one voice embedding per final
     segment to Qdrant.
  5. Write pass1 + final segments to asr.json.
"""
import argparse
import json
import os
import shutil
import subprocess
import sys
import tempfile
import time
from http.client import HTTPException
from pathlib import Path
from urllib.request import Request, urlopen

import numpy as np

# Put this script's directory and its asrx_self subdirectory on the import path
# so the local speaker_encoder module can be found.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "asrx_self"))

from speaker_encoder import load_speaker_encoder, extract_speaker_embedding, normalize_embeddings  # noqa: E402
import torchaudio  # noqa: E402
from faster_whisper import WhisperModel  # noqa: E402

SUB_WIN = 0.5          # speaker-embedding sub-window length, seconds
SUB_STRIDE = 0.25      # hop between consecutive sub-windows, seconds
MIN_DUR = 0.3          # split pieces shorter than this (seconds) are dropped
SIM_THRESHOLD = 0.45   # adjacent sub-windows with dot-product similarity below this count as "different"
CHANGE_CONFIRM = 2     # consecutive low-similarity comparisons required to register one change


def extract_audio(video_path, tmp_dir, sr=16000, timeout=300):
    """Decode the audio track of *video_path* to a mono 16-bit WAV and load it.

    ffmpeg resamples to *sr* Hz, downmixes to one channel and writes
    ``<tmp_dir>/audio.wav``. The file is left in place because main() passes
    that path to faster-whisper afterwards.

    Args:
        video_path: input media file.
        tmp_dir: directory that receives ``audio.wav``.
        sr: target sample rate passed to ffmpeg.
        timeout: seconds before ffmpeg is killed.

    Returns:
        (wav_data, sr_actual): the loaded torchaudio tensor, averaged down to
        one channel if it had more, and the sample rate reported by torchaudio.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited non-zero (stderr attached).
        subprocess.TimeoutExpired: ffmpeg ran longer than *timeout* seconds.
    """
    wav_path = os.path.join(tmp_dir, "audio.wav")
    # "-v error" rather than "quiet" so a failed decode leaves a message in the
    # captured stderr instead of an empty CalledProcessError.
    subprocess.run(
        ["ffmpeg", "-y", "-v", "error", "-i", video_path,
         "-ar", str(sr), "-ac", "1", "-sample_fmt", "s16", wav_path],
        check=True, capture_output=True, timeout=timeout,
    )
    wav_data, sr_actual = torchaudio.load(wav_path)
    if wav_data.shape[0] > 1:
        wav_data = wav_data.mean(dim=0, keepdim=True)
    return wav_data, sr_actual


def transcribe_pass1(model, wav_path, vad_params=None):
    """Transcribe *wav_path* with faster-whisper and return one dict per segment.

    Each dict holds ``index``, ``start``/``end`` (seconds, rounded to 3 d.p.),
    stripped ``text``, and ``words``: a list of ``{"word", "start", "end"}``.
    VAD filtering and word timestamps are always enabled. *vad_params*
    defaults to 500 ms minimum silence and 200 ms speech padding.
    """
    print(" [faster-whisper] Transcribing...")
    if vad_params is None:
        vad_params = {"min_silence_duration_ms": 500, "speech_pad_ms": 200}
    segments, _info = model.transcribe(wav_path, beam_size=5, vad_filter=True,
                                       word_timestamps=True, vad_parameters=vad_params)
    pass1 = []
    for i, seg in enumerate(segments):
        words = [
            {"word": w.word.strip(), "start": round(w.start, 3), "end": round(w.end, 3)}
            for w in (seg.words or [])
        ]
        pass1.append({
            "index": i,
            "start": round(seg.start, 3),
            "end": round(seg.end, 3),
            "text": seg.text.strip(),
            "words": words,
        })
    print(f" Pass1 segments: {len(pass1)}")
    return pass1


def _segment_change_points(wav_data, sr, seg, encoder, ws, sw):
    """Return speaker-change times (seconds) inside one pass-1 segment.

    Returns [] when the segment is shorter than 1 s or yields fewer than 3
    sub-windows. *ws* and *sw* are the window length and hop in samples.
    """
    if seg["end"] - seg["start"] < 1.0:
        return []
    st = int(seg["start"] * sr)
    et = int(seg["end"] * sr)

    sub_embs = []
    sub_times = []
    for wpos in range(st, et - ws + 1, sw):
        emb = extract_speaker_embedding(encoder, wav_data[:, wpos:wpos + ws].numpy(), sr)
        sub_embs.append(emb / (np.linalg.norm(emb) + 1e-10))
        sub_times.append(wpos / sr)
    if len(sub_embs) < 3:
        return []
    sub_embs = normalize_embeddings(np.array(sub_embs))

    cps = []
    # Require CHANGE_CONFIRM consecutive low-similarity comparisons before
    # registering a change, so a single noisy window does not split a segment.
    low_run = 0
    for i in range(1, len(sub_embs)):
        sim = float(np.dot(sub_embs[i - 1], sub_embs[i]))
        if sim < SIM_THRESHOLD:
            low_run += 1
            if low_run >= CHANGE_CONFIRM:
                # Timestamp = start of the later window in the first low-sim pair of the run.
                cps.append(round(sub_times[i - low_run + 1], 2))
                low_run = 0
        else:
            low_run = 0
    return cps


def detect_speaker_changes(wav_data, sr, pass1_segs, encoder, progress_step=100):
    """Find intra-segment speaker change points for every pass-1 segment.

    Each segment of at least 1 s is scanned with SUB_WIN-second windows every
    SUB_STRIDE seconds. Adjacent window embeddings are compared by dot product
    after normalisation. CHANGE_CONFIRM consecutive comparisons below
    SIM_THRESHOLD register one change.

    Returns:
        list[list[float]]: per entry of *pass1_segs* (same order), a possibly
        empty list of change times in seconds rounded to 2 d.p.
    """
    print(" [Speaker Detection] Scanning...")
    ws = int(SUB_WIN * sr)     # window length in samples
    sw = int(SUB_STRIDE * sr)  # hop in samples
    change_points = []  # List[List[float]] → change times per pass1 segment
    t0 = time.time()
    for si, seg in enumerate(pass1_segs):
        change_points.append(_segment_change_points(wav_data, sr, seg, encoder, ws, sw))
        if (si + 1) % progress_step == 0:
            pct = (si + 1) * 100 // len(pass1_segs)
            print(f" {si+1}/{len(pass1_segs)} ({pct}%) [{time.time()-t0:.0f}s]")
    total_changes = sum(len(cps) for cps in change_points)
    print(f" Speaker changes detected: {total_changes} in {len(pass1_segs)} segments ({time.time()-t0:.0f}s)")
    return change_points


def _words_in_span(seg, ps, pe, tol):
    """Join the pass-1 words of *seg* lying within [ps - tol, pe + tol] seconds."""
    return " ".join(
        w["word"] for w in seg.get("words", [])
        if w["start"] >= ps - tol and w["end"] <= pe + tol
    ).strip()


def _transcribe_span(asr_model, wav_data, sr, ps, pe, chunk_path):
    """Re-run faster-whisper on the audio between *ps* and *pe* seconds.

    The slice is written to *chunk_path* and that file is always removed
    afterwards. Returns "" when the slice is 0.3 s or shorter, or when
    transcription fails. This is best-effort; the caller has further fallbacks.
    """
    import soundfile as sf

    a_chunk = wav_data[:, int(ps * sr):int(pe * sr)].numpy()[0]
    if len(a_chunk) <= sr * 0.3:  # skip if < 0.3s
        return ""
    sf.write(chunk_path, a_chunk, sr)
    try:
        sub_segs, _ = asr_model.transcribe(chunk_path, beam_size=5, vad_filter=True,
                                           vad_parameters={"min_silence_duration_ms": 100})
        # strip() so a whitespace-only result still triggers the caller's fallbacks.
        return " ".join(s.text.strip() for s in sub_segs).strip()
    except Exception as exc:  # best-effort: report and let the caller fall back
        print(f" [warn] sub-chunk ASR failed ({ps:.2f}-{pe:.2f}s): {exc}")
        return ""
    finally:
        os.remove(chunk_path)


def build_segments(pass1_segs, change_points, wav_data, sr, asr_model, tmp_dir, fps=24.0):
    """Split pass-1 segments at their change points and build the final segment list.

    Segments without change points are copied through as one chunk. A segment
    with change points is cut at its start, each change point and its end.
    Pieces shorter than MIN_DUR are dropped. Each remaining piece takes its
    text from the first of these that is non-empty:
      1. pass-1 words within ±0.3 s of the piece;
      2. a fresh faster-whisper run on the piece's audio;
      3. pass-1 words within ±0.5 s;
      4. the first 60 characters of the segment text.
    Split pieces carry ``"speaker_change": True``.

    Frame numbers are ``int(time * fps)``. Side effect: every split pass-1
    dict gets ``seg["split"] = True``, and main() writes those dicts to asr.json.
    """
    print(" [Split] Building final segments...")
    final = []
    chunk_idx = 0
    for si, seg in enumerate(pass1_segs):
        cps = change_points[si]
        if not cps:
            final.append({
                "chunk_id": str(chunk_idx),
                "pass1_index": si,
                "start_time": seg["start"],
                "end_time": seg["end"],
                "start_frame": int(seg["start"] * fps),
                "end_frame": int(seg["end"] * fps),
                "text": seg["text"],
            })
            chunk_idx += 1
            continue

        seg["split"] = True
        boundaries = [seg["start"], *cps, seg["end"]]
        for ps, pe in zip(boundaries, boundaries[1:]):
            if pe - ps < MIN_DUR:
                continue
            # Try word_timestamp mapping first (wider tolerance)
            text = _words_in_span(seg, ps, pe, 0.3)
            # Fallback: call faster-whisper on the sub-audio chunk
            if not text:
                chunk_path = os.path.join(tmp_dir, f"sub_{chunk_idx}.wav")
                text = _transcribe_span(asr_model, wav_data, sr, ps, pe, chunk_path)
            if not text:
                text = _words_in_span(seg, ps, pe, 0.5)
            if not text:
                text = seg["text"][:60]
            final.append({
                "chunk_id": str(chunk_idx),
                "pass1_index": si,
                "start_time": round(ps, 3),
                "end_time": round(pe, 3),
                "start_frame": int(ps * fps),
                "end_frame": int(pe * fps),
                "text": text,
                "speaker_change": True,
            })
            chunk_idx += 1
    print(f" Final segments: {len(final)}")
    return final


def _qdrant_upsert(qdrant_url, collection, points, timeout):
    """PUT *points* into a Qdrant collection; return True on success, False (with a warning) on error."""
    req = Request(
        f"{qdrant_url}/collections/{collection}/points?wait=true",
        data=json.dumps({"points": points}).encode(),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    try:
        with urlopen(req, timeout=timeout) as resp:
            resp.read()
    except (OSError, HTTPException) as exc:  # URLError/HTTPError/timeouts are OSError subclasses
        print(f" [warn] Qdrant upsert of {len(points)} points failed: {exc}")
        return False
    return True


def voice_vectors_to_qdrant(wav_data, sr, final_segs, encoder, qdrant_url="http://localhost:6333",
                            collection="momentry_dev_voice", batch_size=100, timeout=30):
    """Embed each final segment's audio and upsert the vectors into Qdrant.

    Each embedding comes from extract_speaker_embedding and is L2-normalised.
    Point ids are 1-based positions in *final_segs*. The payload carries
    ``chunk_id`` and ``chunk_type="sentence"``. Points go up in batches of
    *batch_size*. A failed batch is reported and skipped rather than aborting
    the run.

    Returns:
        list of {"chunk_id", "embedding"} dicts, one per final segment.
    """
    print(" [Voice Vectors] Extracting 192D embeddings...")
    embeddings = []
    t0 = time.time()
    for si, seg in enumerate(final_segs):
        st = int(seg["start_time"] * sr)
        et = int(seg["end_time"] * sr)
        emb = extract_speaker_embedding(encoder, wav_data[:, st:et].numpy(), sr)
        emb = emb / (np.linalg.norm(emb) + 1e-10)
        embeddings.append({"chunk_id": seg["chunk_id"], "embedding": emb.tolist()})
        if (si + 1) % 500 == 0:
            print(f" {si+1}/{len(final_segs)} [{time.time()-t0:.0f}s]")

    print(" Writing to Qdrant...")
    points = [
        {"id": i + 1, "vector": e["embedding"],
         "payload": {"chunk_id": e["chunk_id"], "chunk_type": "sentence"}}
        for i, e in enumerate(embeddings)
    ]
    failed = 0
    for off in range(0, len(points), batch_size):
        batch = points[off:off + batch_size]
        if not _qdrant_upsert(qdrant_url, collection, batch, timeout):
            failed += len(batch)
    print(f" Voice vectors: {len(embeddings)} pts → Qdrant [{time.time()-t0:.0f}s]")
    if failed:
        print(f" [warn] {failed}/{len(points)} points were NOT written to Qdrant")
    return embeddings


def main():
    """CLI entry point: run the full pipeline on --video and write --output (asr.json)."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--video", default="/Users/accusys/momentry/var/sftpgo/data/demo/Charade (1963) Cary Grant & Audrey Hepburn | Comedy Mystery Romance Thriller | Full Movie.mp4")
    parser.add_argument("--output", help="Output path for asr.json", default="/Users/accusys/momentry/output_dev/aeed71342a899fe4b4c57b7d41bcb692.asr.json")
    parser.add_argument("--sample", type=int, help="Process only first N pass1 segments (for testing)")
    parser.add_argument("--no-qdrant", action="store_true", help="Skip Qdrant upload")
    args = parser.parse_args()
    t0 = time.time()

    # Load models
    print("=== Loading Models ===")
    asr_model = WhisperModel("small", device="cpu", compute_type="int8")
    print(" faster-whisper small loaded")
    encoder = load_speaker_encoder()
    print(" ECAPA-TDNN loaded")
    print()

    # Extract audio
    print("=== Audio Extraction ===")
    tmp_dir = tempfile.mkdtemp(prefix="transcribe_")
    try:
        wav_data, sr = extract_audio(args.video, tmp_dir)
        print(f" Audio: {wav_data.shape[1]/sr:.0f}s, {sr}Hz")
        wav_path = os.path.join(tmp_dir, "audio.wav")  # written by extract_audio
        print()

        # Step 1: faster-whisper pass1
        print("=== Step 1: Pass1 Transcription ===")
        pass1_segs = transcribe_pass1(asr_model, wav_path)
        if args.sample:
            pass1_segs = pass1_segs[:args.sample]
            print(f" SAMPLE MODE: limiting to {args.sample} segments")
        print()

        # Step 2: Speaker change detection
        print("=== Step 2: Speaker Change Detection ===")
        change_points = detect_speaker_changes(wav_data, sr, pass1_segs, encoder)
        print()

        # Step 3: Build final segments
        print("=== Step 3: Build Final Segments ===")
        final_segs = build_segments(pass1_segs, change_points, wav_data, sr, asr_model, tmp_dir)
        print()

        # Step 4: Voice vectors → Qdrant
        if not args.no_qdrant:
            print("=== Step 4: Voice Vectors → Qdrant ===")
            voice_vectors_to_qdrant(wav_data, sr, final_segs, encoder)
            print()

        # Step 5: Write asr.json
        print("=== Step 5: Write asr.json ===")
        out_path = Path(args.output)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        output = {
            # File name minus the ".asr.json" suffix, e.g. the hash in the default path.
            "file_uuid": out_path.name.removesuffix(".asr.json"),
            "pass1": pass1_segs,
            "segments": final_segs,
        }
        # Explicit UTF-8: ensure_ascii=False emits raw non-ASCII text.
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(output, f, indent=2, ensure_ascii=False)
        print(f" {args.output} ({out_path.stat().st_size/1024:.0f} KB)")
    finally:
        # Always drop the temp dir (it holds the full decoded WAV), even on failure.
        shutil.rmtree(tmp_dir, ignore_errors=True)

    elapsed = time.time() - t0
    print(f"\n=== Done ({elapsed:.0f}s) ===")
    print(f" Pass1 segments: {len(pass1_segs)}")
    print(f" Final segments: {len(final_segs)}")
    print(f" Output: {args.output}")


if __name__ == "__main__":
    main()