338 lines
12 KiB
Python
338 lines
12 KiB
Python
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
Face Processor V2 - Apple Vision detection + CoreML FaceNet embedding
|
|
|
|
Flow:
|
|
1. swift_face (Vision/ANE) → bbox + pose per frame
|
|
2. cv2 opens video, crops faces from bbox
|
|
3. CoreML FaceNet → 512D embedding per face
|
|
4. Output face.json in standard format
|
|
|
|
Replaces face_processor.py (no more InsightFace CPU detection).
|
|
Detection cost: near-zero CPU (Vision ANE)
|
|
Embedding cost: near-zero CPU (CoreML ANE)
|
|
"""
|
|
|
|
import re
|
|
import sys
|
|
import os
|
|
import json
|
|
import argparse
|
|
import subprocess
|
|
import time
|
|
from typing import Optional, Dict
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from pathlib import Path
|
|
|
|
# CoreML
|
|
import coremltools as ct
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
from redis_publisher import RedisPublisher
|
|
|
|
# Directory that contains this script; the paths below are resolved against it.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))

# swift_face executable as produced by a debug `swift build` of swift_processors.
SWIFT_BIN = os.path.join(
    SCRIPT_DIR, "swift_processors", ".build", "debug", "swift_face"
)

# FaceNet CoreML package, located in the `models` directory next to SCRIPT_DIR.
# The path still contains the ".." component; callers normalise it before use.
FACENET_PATH = os.path.join(
    SCRIPT_DIR, "..", "models", "facenet512.mlpackage"
)
|
|
|
|
# Pose angle classification from roll/yaw
|
|
def classify_pose(roll: float, yaw: float) -> str:
    """Map a face's roll/yaw pair to a coarse pose label.

    Returns "profile_right" / "profile_left" when the yaw magnitude exceeds
    30 (the sign of ``yaw`` picks the side), "frontal" when both the yaw and
    the roll magnitude are below 15, and "three_quarter" for everything else.
    """
    yaw_magnitude = abs(yaw)
    roll_magnitude = abs(roll)

    # Strong yaw decides on its own, whatever the roll is.
    if yaw_magnitude > 30:
        if yaw > 0:
            return "profile_right"
        return "profile_left"

    if yaw_magnitude < 15 and roll_magnitude < 15:
        return "frontal"

    return "three_quarter"
|
|
|
|
|
|
class FaceProcessorVision:
    """Two-step face pipeline: swift_face detection, then CoreML embedding.

    ``process_with_swift`` runs the external ``swift_face`` binary and returns
    the JSON it wrote. ``embed_and_save`` crops every reported face out of the
    video, embeds it with the CoreML model (when one was loaded) and writes
    the result to ``output_path``.
    """

    # Pattern of the progress lines looked for in the swift_face log.
    _PROGRESS_RE = re.compile(r'(\d+)% complete')
    # Frame rate used when the container does not report a usable one.
    _DEFAULT_FPS = 30.0
    # Seconds to wait between two scans of the swift_face log.
    _SWIFT_POLL_SECONDS = 10

    def __init__(self, video_path: str, output_path: str, uuid: str = "",
                 sample_interval: int = 3,
                 publisher: "Optional[RedisPublisher]" = None):
        """Store the job parameters and try to load the CoreML model.

        Args:
            video_path: Video file to analyse.
            output_path: Path of the JSON file written by ``embed_and_save``.
            uuid: Optional job id, forwarded to swift_face as ``--uuid``.
            sample_interval: Forwarded to swift_face as ``--sample-interval``.
            publisher: Optional progress publisher; ``None`` disables
                progress publishing.
        """
        self.video_path = video_path
        self.output_path = output_path
        self.uuid = uuid
        self.sample_interval = sample_interval
        self.publisher = publisher

        # Load CoreML FaceNet. A missing or broken model is not fatal: faces
        # are then still written, with "embedding" set to None.
        self.coreml_model = None
        facenet = os.path.normpath(FACENET_PATH)
        if os.path.exists(facenet):
            try:
                self.coreml_model = ct.models.MLModel(facenet)
                print(f"[FACE_V2] CoreML FaceNet loaded: {facenet}")
            except Exception as e:
                print(f"[FACE_V2] CoreML load failed: {e}")
        else:
            print(f"[FACE_V2] CoreML FaceNet not found: {facenet} (embeddings disabled)")

        self.video = None
        self.fps = self._DEFAULT_FPS
        self.total_frames = 0
        self.width = 0
        self.height = 0

    def open_video(self):
        """Open ``video_path`` with OpenCV and cache its basic properties.

        Raises:
            RuntimeError: If the video cannot be opened.
        """
        capture = cv2.VideoCapture(self.video_path)
        if not capture.isOpened():
            capture.release()
            raise RuntimeError(f"Cannot open: {self.video_path}")
        self.video = capture

        fps = capture.get(cv2.CAP_PROP_FPS)
        # Timestamps are computed as frame / fps, so a reported rate of 0
        # (or NaN) must not be stored as-is.
        self.fps = fps if fps > 0 else self._DEFAULT_FPS
        self.total_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
        self.width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
        print(f"[FACE_V2] Video: {self.width}x{self.height}, {self.fps:.1f}fps, {self.total_frames}f")

    def extract_face_embedding(self, face_img: np.ndarray) -> Optional[list]:
        """Run the CoreML model on one cropped face.

        Args:
            face_img: Cropped face as an H x W x C image array.

        Returns:
            The model output flattened to a list, or ``None`` when no model
            is loaded or the prediction fails.
        """
        if self.coreml_model is None:
            return None
        try:
            # NOTE(review): frames decoded by cv2 are in BGR channel order;
            # confirm the model was exported for that order.
            resized = cv2.resize(face_img, (160, 160))
            # Scale pixel values from [0, 255] to [-1, 1].
            normalized = (resized.astype(np.float32) / 127.5) - 1.0
            # HWC -> CHW, then add the batch dimension: (1, C, 160, 160).
            input_array = np.expand_dims(np.transpose(normalized, (2, 0, 1)), axis=0)
            result = self.coreml_model.predict({"input": input_array})

            # The output is looked up by its "var_" name prefix.
            emb_key = next((k for k in result if k.startswith("var_")), None)
            if emb_key is None:
                print(f"[FACE_V2] Embedding error: no 'var_*' output among {list(result)}")
                return None
            return result[emb_key].flatten().tolist()
        except Exception as e:
            # Best-effort: one failed face must not abort the whole video.
            print(f"[FACE_V2] Embedding error: {e}")
            return None

    def _publish_swift_progress(self, log_path: str, last_pct: int) -> int:
        """Scan the swift_face log and publish every new, higher percentage.

        Returns the highest percentage seen so far (``last_pct`` if the log
        holds nothing newer). Never raises: progress is best-effort.
        """
        try:
            with open(log_path, errors="replace") as lf:
                for line in lf:
                    m = self._PROGRESS_RE.search(line)
                    if not m:
                        continue
                    pct = int(m.group(1))
                    if pct > last_pct:
                        last_pct = pct
                        if self.publisher:
                            self.publisher.progress("face", pct, 100, f"swift detect {pct}%")
        except Exception as e:
            print(f"[FACE_V2] Progress update skipped: {e}")
        return last_pct

    @staticmethod
    def _read_log_tail(log_path: str, max_lines: int = 40) -> str:
        """Return the last ``max_lines`` lines of a log file ("" if unreadable)."""
        try:
            with open(log_path, errors="replace") as lf:
                lines = lf.readlines()
        except OSError:
            return ""
        return "".join(lines[-max_lines:]).strip()

    def process_with_swift(self) -> Dict:
        """Step 1: Run swift_face and return its parsed detection JSON.

        Builds the binary first when it does not exist. The combined
        stdout/stderr of swift_face is written to ``<detect file>.log`` and
        scanned for "N% complete" lines, which are forwarded to the publisher.

        Raises:
            subprocess.CalledProcessError: If building swift_face fails.
            RuntimeError: If swift_face exits with a non-zero code.
        """
        print("[FACE_V2] Step 1: Vision detection...")

        # Build swift_face if needed
        if not os.path.exists(SWIFT_BIN):
            build_dir = os.path.join(SCRIPT_DIR, "swift_processors")
            print(f"[FACE_V2] Building swift_face in {build_dir}...")
            subprocess.run(
                ["swift", "build", "-c", "debug", "--product", "swift_face"],
                cwd=build_dir, check=True
            )

        swift_out = self.output_path.replace(".json", "_detect.json")
        cmd = [
            SWIFT_BIN,
            self.video_path,
            swift_out,
            "--sample-interval", str(self.sample_interval),
        ]
        if self.uuid:
            cmd.extend(["--uuid", self.uuid])

        print(f"[FACE_V2] Running: {' '.join(cmd)}")
        t0 = time.time()
        log_path = swift_out + ".log"
        last_pct = -1
        with open(log_path, "w") as log_f:
            proc = subprocess.Popen(cmd, stdout=log_f, stderr=subprocess.STDOUT, text=True)
            while True:
                # wait() returns as soon as the process exits, so a finished
                # run is not held back by a full polling interval.
                try:
                    proc.wait(timeout=self._SWIFT_POLL_SECONDS)
                except subprocess.TimeoutExpired:
                    still_running = True
                else:
                    still_running = False
                # Also scanned once after exit so the last lines are reported.
                last_pct = self._publish_swift_progress(log_path, last_pct)
                if not still_running:
                    break

        if proc.returncode != 0:
            # stderr was merged into the log file, so proc.stderr is None;
            # the diagnostics have to be read back from the log.
            tail = self._read_log_tail(log_path)
            if tail:
                print(tail, file=sys.stderr)
            raise RuntimeError(f"swift_face exited with code {proc.returncode}")

        elapsed = time.time() - t0
        print(f"[FACE_V2] Detection done in {elapsed:.1f}s")

        with open(swift_out) as f:
            return json.load(f)

    def _embed_frame_faces(self, frame_num, detections) -> list:
        """Crop and embed the detected faces of one frame.

        The frame is decoded at most once, and only if at least one bounding
        box is large enough. Returns the list of face records for the frame
        (empty when nothing usable was found or the frame cannot be read).
        """
        faces = []
        frame = None
        for face in detections:
            bb = face["bbox"]
            x, y, w, h = bb["x"], bb["y"], bb["width"], bb["height"]

            if w <= 10 or h <= 10:
                continue  # skip tiny faces

            if frame is None:
                # Seek to frame and read
                self.video.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
                ret, frame = self.video.read()
                if not ret:
                    # Unreadable frame: none of its faces can be cropped.
                    break

            # Clip the box to the image. Slice bounds must be ints even if
            # the JSON delivered the box as floats.
            x1, y1 = max(0, int(x)), max(0, int(y))
            x2, y2 = min(self.width, int(x + w)), min(self.height, int(y + h))
            if x2 <= x1 or y2 <= y1:
                continue
            face_img = frame[y1:y2, x1:x2]
            if face_img.size == 0:
                continue

            emb = self.extract_face_embedding(face_img)

            pose_info = face.get("pose") or {}
            roll = pose_info.get("roll", 0)
            yaw = pose_info.get("yaw", 0)

            faces.append({
                "x": x, "y": y, "width": w, "height": h,
                "confidence": face.get("confidence", 0.5),
                "embedding": emb,
                "pose_angle": {
                    "angle": classify_pose(roll, yaw),
                    "roll": roll,
                    "yaw": yaw,
                    "pitch": pose_info.get("pitch", 0),
                },
                "lips": face.get("lips"),
                "landmarks": face.get("landmarks"),
                "attributes": None,
            })
        return faces

    def embed_and_save(self, detection_data: Dict):
        """Step 2: Crop faces + CoreML embedding + save the output JSON.

        Args:
            detection_data: Parsed swift_face output. Its "frames" list is
                read; each entry needs "frame" and may carry "faces" and
                "timestamp".

        Writes ``{"frame_count", "fps", "frames"}`` to ``output_path``, where
        "frames" lists, ordered by frame number, the frames that kept at
        least one face.

        Raises:
            RuntimeError: If the video cannot be opened.
        """
        print("[FACE_V2] Step 2: CoreML embedding...")

        frames = detection_data.get("frames", [])
        self.open_video()

        t0 = time.time()
        embed_count = 0
        last_pct = -1
        kept = {}  # str(frame number) -> {"timestamp": ..., "faces": [...]}

        try:
            for frame_info in frames:
                frame_num = frame_info["frame"]
                faces = self._embed_frame_faces(frame_num, frame_info.get("faces") or [])
                if not faces:
                    continue

                embed_count += sum(1 for f in faces if f["embedding"] is not None)
                kept[str(frame_num)] = {
                    "timestamp": frame_info.get("timestamp", frame_num / self.fps),
                    "faces": faces,
                }

                # Report only right after a frame was stored, so the same
                # count is never logged twice.
                stored = len(kept)
                if stored % 100 == 0:
                    elapsed = time.time() - t0
                    print(f"[FACE_V2] {stored} frames, {embed_count} embeddings, {elapsed:.0f}s")
                    if self.publisher:
                        pct = int(stored * 100 / max(len(frames), 1))
                        if pct > last_pct:
                            last_pct = pct
                            self.publisher.progress("face", stored, len(frames),
                                                    f"{embed_count} faces", embed_count, "faces")
        finally:
            # Release the capture even when a frame raised.
            self.video.release()

        # List form, ordered by frame number, as consumed downstream
        # (the original comment names the Rust FaceResult format).
        frames_list = [
            {
                "frame": int(key),
                "timestamp": entry["timestamp"],
                "faces": entry["faces"],
            }
            for key, entry in sorted(kept.items(), key=lambda item: int(item[0]))
        ]

        output = {
            "frame_count": len(frames_list),
            "fps": self.fps,
            "frames": frames_list,
        }

        # ensure_ascii=False may emit non-ASCII text, so pin the encoding.
        with open(self.output_path, "w", encoding="utf-8") as f:
            json.dump(output, f, indent=2, ensure_ascii=False)

        elapsed = time.time() - t0
        print(f"[FACE_V2] Done: {len(frames_list)} frames, {embed_count} embeddings, {elapsed:.0f}s")
|
|
|
|
|
|
def main():
    """Command-line entry point: detect faces, embed them, write the JSON.

    Positional arguments are the video path and the output JSON path.
    ``--uuid`` enables progress publishing, ``--sample-interval`` is passed
    on to the detector, and ``--force`` deletes an existing output file
    before processing starts.

    A failure in either step is reported through the publisher (when one is
    configured) and then re-raised.
    """
    parser = argparse.ArgumentParser(description="Apple Vision Face Processor V2")
    parser.add_argument("video_path", help="Video file path")
    parser.add_argument("output_path", help="Output JSON path")
    parser.add_argument("--uuid", "-u", default="")
    parser.add_argument("--sample-interval", type=int, default=3)
    parser.add_argument("--force", action="store_true")
    args = parser.parse_args()

    publisher = RedisPublisher(args.uuid) if args.uuid else None
    if publisher:
        publisher.info("face", "FACE_START")

    if args.force and os.path.exists(args.output_path):
        os.remove(args.output_path)

    processor = FaceProcessorVision(
        args.video_path, args.output_path,
        args.uuid, args.sample_interval, publisher
    )

    # Step 1: Vision detection (bbox + pose via ANE)
    try:
        detection = processor.process_with_swift()
    except Exception as e:
        if publisher:
            publisher.error("face", f"Detection failed: {e}")
        raise

    # Step 2: CoreML embedding + save
    try:
        processor.embed_and_save(detection)
    except Exception as e:
        if publisher:
            publisher.error("face", f"Embedding failed: {e}")
        raise

    if publisher:
        publisher.complete("face", f"{len(detection.get('frames',[]))} frames")

    # Clean up temp detection file. When output_path contains no ".json",
    # replace() returns the output path itself; removing it would delete the
    # result that was just written, so that case is skipped.
    swift_out = args.output_path.replace(".json", "_detect.json")
    if swift_out != args.output_path and os.path.exists(swift_out):
        os.remove(swift_out)


if __name__ == "__main__":
    main()
|