Files
momentry_core/scripts/face_processor.py
Accusys 766a1d9a6d feat: Swift Face Pose integration + TKG 方案 B
Major Changes:
- swift_face_pose: output pose angles (yaw/pitch/roll) in face.json
- face_processor.py: call swift_face_pose (dual output: face.json + pose.json)
- Face struct: add pose_angle field
- TKG 方案 B: gaze/lip_track nodes from face.json (no face_detections dependency)
- Chunk cleanup: delete old data before rebuild (avoid duplicate key)
- Hand nodes: classify by hand_type + gesture (15 combinations)
- HAND_OBJECT edges: bbox spatial matching (174 matches)

Test Results:
- Blake Jones: 8 faces, pose_angle ✓, 66 nodes, 174 edges
- FilmRiot: 394 faces, pose_angle ✓, 35 nodes, 39 edges
- Left hands: 132, Right hands: 2

Architecture:
- All TKG nodes built from JSON files (face.json, hand.json, yolo.json)
- Swift processors: sample_interval=3 (Face/Pose/Hand sync)
- Cleanup functions: delete_tkg_nodes_by_uuid, delete_tkg_edges_by_uuid
2026-06-23 05:47:24 +08:00

350 lines
13 KiB
Python

#!/opt/homebrew/bin/python3.11
"""
Face Processor V2 - Apple Vision detection + CoreML FaceNet embedding
Flow:
1. swift_face_pose (Vision/ANE) → bbox + pose per frame
2. cv2 opens video, crops faces from bbox
3. CoreML FaceNet → 512D embedding per face
4. Output face.json in standard format
Replaces face_processor.py (no more InsightFace CPU detection).
Detection cost: near-zero CPU (Vision ANE)
Embedding cost: near-zero CPU (CoreML ANE)
"""
import re
import sys
import os
import json
import argparse
import subprocess
import time
from typing import Optional, Dict
import cv2
import numpy as np
from pathlib import Path
# CoreML
import coremltools as ct
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from redis_publisher import RedisPublisher
# Directory containing this script; the paths below are resolved relative to it.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# swift_face_pose executable inside the Swift package's debug build directory.
SWIFT_BIN = os.path.join(SCRIPT_DIR, "swift_processors", ".build", "debug", "swift_face_pose")
# FaceNet CoreML model package, located in ../models relative to this script.
FACENET_PATH = os.path.join(SCRIPT_DIR, "..", "models", "facenet512.mlpackage")
# Pose angle classification from roll/yaw
def classify_pose(roll: float, yaw: float) -> str:
    """Map a head roll/yaw pair to a coarse pose label.

    Args:
        roll: Head roll angle.
        yaw: Head yaw angle; its sign selects the profile side.

    Returns:
        "profile_right" / "profile_left" when |yaw| exceeds 30,
        "frontal" when both |yaw| and |roll| are below 15,
        "three_quarter" otherwise.

    NOTE(review): the thresholds are compared against the raw values, so
    the inputs are presumably in degrees -- confirm against the detector.
    """
    yaw_magnitude = abs(yaw)

    # A strong yaw decides the label on its own; roll is not consulted.
    if yaw_magnitude > 30:
        if yaw > 0:
            return "profile_right"
        return "profile_left"

    # Frontal needs both angles to be small.
    if yaw_magnitude < 15 and abs(roll) < 15:
        return "frontal"

    return "three_quarter"
class FaceProcessorVision:
    """Two-step face pipeline: external detection, then CoreML embedding.

    Step 1 (`process_with_swift`) runs the `swift_face_pose` executable to
    obtain per-frame face boxes and pose values. Step 2 (`embed_and_save`)
    crops each detected face from the video, runs the CoreML FaceNet model
    on it and writes the combined result to `output_path` as JSON.
    """

    # Frame rate used when the video container reports none.
    _DEFAULT_FPS = 30.0
    # Progress lines in the swift_face_pose log look like "42% complete".
    _PROGRESS_RE = re.compile(r'(\d+)% complete')

    def __init__(self, video_path: str, output_path: str, uuid: str = "",
                 sample_interval: int = 3, publisher: Optional[RedisPublisher] = None):
        """Store the configuration and try to load the FaceNet model.

        Args:
            video_path: Video file to process.
            output_path: Destination of the final JSON result.
            uuid: Optional identifier forwarded to swift_face_pose via --uuid.
            sample_interval: Forwarded to swift_face_pose via --sample-interval.
            publisher: Optional progress publisher; None disables publishing.

        A missing or unloadable model is not fatal: embeddings are then
        written as null.
        """
        self.video_path = video_path
        self.output_path = output_path
        self.uuid = uuid
        self.sample_interval = sample_interval
        self.publisher = publisher

        # Load CoreML FaceNet
        self.coreml_model = None
        facenet = os.path.normpath(FACENET_PATH)
        if os.path.exists(facenet):
            try:
                self.coreml_model = ct.models.MLModel(facenet)
                print(f"[FACE_V2] CoreML FaceNet loaded: {facenet}")
            except Exception as e:
                print(f"[FACE_V2] CoreML load failed: {e}")
        else:
            # Previously silent; every embedding would just come out as None.
            print(f"[FACE_V2] CoreML FaceNet not found: {facenet} (embeddings will be null)")

        self.video = None
        self.fps = self._DEFAULT_FPS
        self.total_frames = 0
        self.width = 0
        self.height = 0

    def open_video(self):
        """Open `video_path` with OpenCV and cache its basic properties.

        Raises:
            RuntimeError: If the video cannot be opened.
        """
        self.video = cv2.VideoCapture(self.video_path)
        if not self.video.isOpened():
            self.video.release()
            self.video = None
            raise RuntimeError(f"Cannot open: {self.video_path}")

        fps = self.video.get(cv2.CAP_PROP_FPS)
        # Some containers report 0 (or NaN); dividing by that later would
        # raise or poison timestamps, so fall back to the default rate.
        if not (fps > 0):
            print(f"[FACE_V2] Video reports invalid fps ({fps}); using {self._DEFAULT_FPS}")
            fps = self._DEFAULT_FPS
        self.fps = fps
        self.total_frames = int(self.video.get(cv2.CAP_PROP_FRAME_COUNT))
        self.width = int(self.video.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        print(f"[FACE_V2] Video: {self.width}x{self.height}, {self.fps:.1f}fps, {self.total_frames}f")

    def extract_face_embedding(self, face_img: np.ndarray) -> Optional[list]:
        """Run CoreML FaceNet on a cropped face image.

        Args:
            face_img: Cropped face as an HxWx3 array.

        Returns:
            The flattened embedding as a list of floats, or None when the
            model is not loaded or inference fails.
        """
        if self.coreml_model is None:
            return None
        try:
            # Resize to 160x160
            resized = cv2.resize(face_img, (160, 160))
            # Normalize to [-1, 1] and convert HWC -> CHW
            # NOTE(review): crops come from cv2 and are passed through without
            # a channel-order conversion -- confirm the model expects that.
            normalized = (resized.astype(np.float32) / 127.5) - 1.0
            normalized = np.transpose(normalized, (2, 0, 1))
            # Add batch dim: (1, 3, 160, 160)
            input_array = np.expand_dims(normalized, axis=0)
            result = self.coreml_model.predict({"input": input_array})
            # The embedding output is exposed under an auto-named "var_xxx" key.
            emb_key = next((k for k in result if k.startswith("var_")), None)
            if emb_key is None:
                print(f"[FACE_V2] Embedding error: no 'var_*' output among {list(result)}")
                return None
            return result[emb_key].flatten().tolist()
        except Exception as e:
            print(f"[FACE_V2] Embedding error: {e}")
            return None

    def _publish_swift_progress(self, log_path: str, last_pct: int) -> int:
        """Scan the swift log and publish any percentage above `last_pct`.

        Best effort: failures to read the log or to publish are reported on
        stderr and never interrupt detection. Returns the updated high-water
        mark.
        """
        if not self.publisher:
            return last_pct
        try:
            with open(log_path, errors="replace") as lf:
                for line in lf:
                    m = self._PROGRESS_RE.search(line)
                    if not m:
                        continue
                    pct = int(m.group(1))
                    if pct > last_pct:
                        last_pct = pct
                        self.publisher.progress("face", pct, 100, f"swift detect {pct}%")
        except Exception as e:
            print(f"[FACE_V2] Progress update skipped: {e}", file=sys.stderr)
        return last_pct

    @staticmethod
    def _read_log_tail(log_path: str, max_lines: int = 20) -> str:
        """Return the last `max_lines` lines of the log, or "" if unreadable."""
        try:
            with open(log_path, errors="replace") as lf:
                return "".join(lf.readlines()[-max_lines:]).strip()
        except OSError:
            return ""

    def process_with_swift(self) -> Dict:
        """Step 1: Run swift_face_pose to get bbox + pose.

        Builds the executable first when it is missing, then runs it with
        its combined stdout/stderr redirected to a log file next to the
        detection output. While it runs, progress lines in that log are
        forwarded to the publisher.

        Returns:
            The parsed detection JSON written by swift_face_pose.

        Raises:
            subprocess.CalledProcessError: If `swift build` fails.
            RuntimeError: If swift_face_pose exits with a non-zero code.
        """
        print(f"[FACE_V2] Step 1: Vision detection (face + pose)...")

        # Build swift_face_pose if needed
        if not os.path.exists(SWIFT_BIN):
            build_dir = os.path.join(SCRIPT_DIR, "swift_processors")
            print(f"[FACE_V2] Building swift_face_pose in {build_dir}...")
            subprocess.run(
                ["swift", "build", "-c", "debug", "--product", "swift_face_pose"],
                cwd=build_dir, check=True
            )

        # Keep this derivation unchanged: main() derives the same path to
        # remove the temporary detection file afterwards.
        swift_face_out = self.output_path.replace(".json", "_detect.json")

        # Pose output: same directory, "face" replaced by "pose" in the filename.
        output_dir = os.path.dirname(self.output_path)
        output_basename = os.path.basename(self.output_path)
        pose_basename = output_basename.replace("face", "pose")
        if pose_basename == output_basename:
            # No "face" in the name: without a distinct name the pose file
            # would share the final output path and be overwritten in step 2.
            stem, ext = os.path.splitext(output_basename)
            pose_basename = f"{stem}_pose{ext}"
        swift_pose_out = os.path.join(output_dir, pose_basename)

        cmd = [
            SWIFT_BIN,
            self.video_path,
            swift_face_out,
            swift_pose_out,
            "--sample-interval", str(self.sample_interval),
        ]
        if self.uuid:
            cmd.extend(["--uuid", self.uuid])

        print(f"[FACE_V2] Running: {' '.join(cmd)}")
        t0 = time.time()
        log_path = swift_face_out + ".log"
        last_pct = -1
        with open(log_path, "w") as log_f:
            proc = subprocess.Popen(cmd, stdout=log_f, stderr=subprocess.STDOUT, text=True)
            while True:
                # wait() returns as soon as the child exits, so a short run
                # no longer pays a fixed 10 s sleep.
                try:
                    proc.wait(timeout=10)
                except subprocess.TimeoutExpired:
                    pass
                last_pct = self._publish_swift_progress(log_path, last_pct)
                if proc.returncode is not None:
                    break

        if proc.returncode != 0:
            # stderr was merged into the log file, so proc.stderr is None;
            # the diagnostics have to come from the log instead.
            tail = self._read_log_tail(log_path)
            if tail:
                print(tail, file=sys.stderr)
            raise RuntimeError(
                f"swift_face_pose exited with code {proc.returncode} (log: {log_path})"
            )

        elapsed = time.time() - t0
        print(f"[FACE_V2] Detection done in {elapsed:.1f}s")

        with open(swift_face_out) as f:
            face_data = json.load(f)

        # Also check if pose.json was generated (for reference)
        if os.path.exists(swift_pose_out):
            print(f"[FACE_V2] Pose file generated: {swift_pose_out}")
        return face_data

    def _embed_frame_faces(self, frame_num, detections) -> list:
        """Crop and embed every usable detection of one frame.

        The frame is decoded at most once, and only if at least one
        detection survives the size and bounds checks.

        Returns:
            One output dict per kept face (possibly an empty list).
        """
        faces = []
        frame = None
        for face in detections:
            bb = face["bbox"]
            x, y, w, h = bb["x"], bb["y"], bb["width"], bb["height"]
            if w <= 10 or h <= 10:
                continue  # skip tiny faces

            # Clamp the box to the image bounds.
            x1, y1 = max(0, x), max(0, y)
            x2, y2 = min(self.width, x + w), min(self.height, y + h)
            if x2 <= x1 or y2 <= y1:
                continue

            if frame is None:
                # Seek to frame and read
                self.video.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
                ret, frame = self.video.read()
                if not ret:
                    break  # frame unreadable: none of its faces can be cropped

            # int() keeps slicing valid should the detector emit float pixels.
            face_img = frame[int(y1):int(y2), int(x1):int(x2)]
            if face_img.size == 0:
                continue

            emb = self.extract_face_embedding(face_img)

            # Tolerate an explicit null "pose" as well as a missing one.
            pose_info = face.get("pose") or {}
            pose_angle = classify_pose(
                pose_info.get("roll", 0),
                pose_info.get("yaw", 0)
            )
            faces.append({
                "x": x, "y": y, "width": w, "height": h,
                "confidence": face.get("confidence", 0.5),
                "embedding": emb,
                "pose_angle": {
                    "angle": pose_angle,
                    "roll": pose_info.get("roll", 0),
                    "yaw": pose_info.get("yaw", 0),
                    "pitch": pose_info.get("pitch", 0),
                },
                "lips": face.get("lips"),
                "landmarks": face.get("landmarks"),
                "attributes": None,
            })
        return faces

    def embed_and_save(self, detection_data: Dict):
        """Step 2: Crop faces + CoreML embedding + save the result JSON.

        Args:
            detection_data: Detection result as returned by
                `process_with_swift`; its "frames" list is iterated.

        Writes `{"frame_count", "fps", "frames"}` to `output_path`, where
        "frames" holds only frames with at least one kept face, ordered by
        frame number.

        Raises:
            RuntimeError: If the video cannot be opened.
        """
        print(f"[FACE_V2] Step 2: CoreML embedding...")
        frames = detection_data.get("frames", [])
        self.open_video()

        t0 = time.time()
        embed_count = 0
        last_pct = -1
        # Keyed by frame number so a repeated frame replaces the earlier entry.
        frames_by_number = {}

        try:
            for frame_info in frames:
                frame_num = frame_info["frame"]
                faces = self._embed_frame_faces(frame_num, frame_info.get("faces", []))
                if not faces:
                    continue

                embed_count += sum(1 for f in faces if f["embedding"] is not None)

                timestamp = frame_info.get("timestamp")
                if timestamp is None:
                    timestamp = frame_num / self.fps
                frames_by_number[frame_num] = {
                    "frame": frame_num,
                    "timestamp": timestamp,
                    "faces": faces,
                }

                # Report only right after a frame was stored, so the message
                # is not repeated while the count stays on a multiple of 100.
                stored = len(frames_by_number)
                if stored % 100 == 0:
                    elapsed = time.time() - t0
                    print(f"[FACE_V2] {stored} frames, {embed_count} embeddings, {elapsed:.0f}s")
                    if self.publisher:
                        pct = int(stored * 100 / max(len(frames), 1))
                        if pct > last_pct:
                            last_pct = pct
                            self.publisher.progress("face", stored, len(frames),
                                                    f"{embed_count} faces", embed_count, "faces")
        finally:
            # Release the capture even when a frame fails mid-run.
            self.video.release()

        # Frame list ordered by frame number (Rust FaceResult format).
        frames_list = [frames_by_number[num] for num in sorted(frames_by_number)]
        output = {
            "frame_count": len(frames_list),
            "fps": self.fps,
            "frames": frames_list,
        }
        with open(self.output_path, "w", encoding="utf-8") as f:
            json.dump(output, f, indent=2, ensure_ascii=False)

        elapsed = time.time() - t0
        print(f"[FACE_V2] Done: {len(frames_list)} frames, {embed_count} embeddings, {elapsed:.0f}s")
def main():
    """Command-line entry point: detect faces, embed them, write the JSON.

    Parses the arguments, optionally reports start/error/completion through
    a RedisPublisher (only when --uuid is given), runs the two processing
    steps and finally removes the temporary detection file.

    Exceptions from either step are published (when possible) and re-raised.
    """
    parser = argparse.ArgumentParser(description="Apple Vision Face Processor V2")
    parser.add_argument("video_path", help="Video file path")
    parser.add_argument("output_path", help="Output JSON path")
    parser.add_argument("--uuid", "-u", default="")
    parser.add_argument("--sample-interval", type=int, default=3)
    parser.add_argument("--force", action="store_true")
    args = parser.parse_args()

    publisher = RedisPublisher(args.uuid) if args.uuid else None
    if publisher:
        publisher.info("face", "FACE_START")

    # --force discards a previous result before processing starts.
    if args.force and os.path.exists(args.output_path):
        os.remove(args.output_path)

    processor = FaceProcessorVision(
        args.video_path, args.output_path,
        args.uuid, args.sample_interval, publisher
    )

    # Step 1: Vision detection (bbox + pose via ANE)
    try:
        detection = processor.process_with_swift()
    except Exception as e:
        if publisher:
            publisher.error("face", f"Detection failed: {e}")
        raise

    # Step 2: CoreML embedding + save
    try:
        processor.embed_and_save(detection)
    except Exception as e:
        if publisher:
            publisher.error("face", f"Embedding failed: {e}")
        raise

    if publisher:
        publisher.complete("face", f"{len(detection.get('frames',[]))} frames")

    # Clean up temp detection file. When output_path contains no ".json" the
    # derived name equals output_path itself; deleting it would throw away
    # the result that was just written, so that case is skipped.
    swift_out = args.output_path.replace(".json", "_detect.json")
    if swift_out != args.output_path and os.path.exists(swift_out):
        os.remove(swift_out)


if __name__ == "__main__":
    main()