feat: service inventory, ERP reports, sqlite-vec integration, visualize tool
- Add SERVICE_INVENTORY_V1.0.0.md (25 source-verified tools, 3.7GB)
- Add ERP_SELECTION_REPORT.md (Odoo CE vs ERPNext comparison)
- Add SFTPGO_ODOO_REPLACEMENT.md (SFTPGo migration plan)
- Add SERVICE_GO_GITEA_BUILD.md (Go compiler + Gitea build report)
- Add release visualize command (face trace heatmap + identity filter)
- Add sqlite-vec integration (160MB SQLite with vec0 vector tables)
- Add export_identities.py, export_sqlite.py, render_face_heatmap.py
- Add Go, Gitea, Rust/Cargo, Swift, yt-dlp, SQLite, sqlite-vec to service CLI
- Fix package to include identities and identity_bindings in data.sql
- Update release list to show all deployed video stats
- Add V1.0.0 YAML frontmatter to all docs (DOCS_STANDARD compliant)
This commit is contained in:
161
scripts/embed_faces.py
Normal file
161
scripts/embed_faces.py
Normal file
@@ -0,0 +1,161 @@
|
||||
#!/opt/homebrew/bin/python3.11
|
||||
"""
|
||||
Process Swift face detection output + add CoreML FaceNet embeddings.
|
||||
Replaces face_processor.py Step 2 when Swift already ran.
|
||||
"""
|
||||
import sys, os, json, argparse, time
|
||||
import cv2
|
||||
import numpy as np
|
||||
import coremltools as ct
|
||||
from pathlib import Path
|
||||
|
||||
# Absolute directory containing this script; anchor for locating the model.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# FaceNet CoreML package, resolved as ../models/facenet512.mlpackage relative to this script.
FACENET_PATH = os.path.join(SCRIPT_DIR, "..", "models", "facenet512.mlpackage")
|
||||
|
||||
def classify_pose(roll, yaw):
    """Map head roll/yaw angles to a coarse pose label.

    Returns:
        "frontal" when both |yaw| and |roll| are below 15,
        "profile_right" / "profile_left" when |yaw| exceeds 30 (the side is
        chosen by the sign of yaw), and "three_quarter" for everything else.
    """
    yaw_mag, roll_mag = abs(yaw), abs(roll)

    # A strong yaw decides the label regardless of roll.
    if yaw_mag > 30:
        if yaw > 0:
            return "profile_right"
        return "profile_left"

    # Frontal requires both angles to be small.
    if yaw_mag < 15 and roll_mag < 15:
        return "frontal"

    return "three_quarter"
|
||||
|
||||
def _crop_face(frame, x, y, w, h):
    """Return the bbox region of *frame* clamped to the frame bounds, or None if empty."""
    frame_height, frame_width = frame.shape[:2]
    # Slice indices must be integers; detector coordinates may arrive as floats.
    x1, y1 = max(0, int(x)), max(0, int(y))
    x2, y2 = min(frame_width, int(x + w)), min(frame_height, int(y + h))
    if x2 <= x1 or y2 <= y1:
        return None
    crop = frame[y1:y2, x1:x2]
    return crop if crop.size else None


def _embed_face(coreml_model, face_img):
    """Run the FaceNet model on a BGR face crop and return the embedding as a flat list."""
    resized = cv2.resize(face_img, (160, 160))
    rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB).astype(np.float32)
    normalized = rgb / 127.5 - 1.0  # scale 0..255 to -1..1
    # HWC -> CHW, then add a leading batch axis.
    input_data = np.expand_dims(np.transpose(normalized, (2, 0, 1)), axis=0)
    result = coreml_model.predict({"input": input_data})
    # The first output of the model is taken as the embedding.
    return next(iter(result.values())).flatten().tolist()


def _face_record(face, x, y, w, h, emb):
    """Build the output dict for one detected face."""
    pose_info = face.get("pose") or {}
    roll = pose_info.get("roll", 0)
    yaw = pose_info.get("yaw", 0)
    return {
        "x": x, "y": y, "width": w, "height": h,
        "confidence": face.get("confidence", 0.5),
        "embedding": emb,
        "pose_angle": {
            "angle": classify_pose(roll, yaw),
            "roll": roll,
            "yaw": yaw,
            "pitch": pose_info.get("pitch", 0),
        },
        "lips": face.get("lips"),
        "landmarks": face.get("landmarks"),
        "attributes": None,
    }


def main():
    """Attach FaceNet embeddings and pose labels to Swift face detections.

    Command-line arguments:
        --swift-json  JSON written by the Swift detector; its "frames" list is read.
        --video       Video file the detections refer to.
        --output      Destination path of the resulting face JSON.
        --fps         Fallback frame rate for timestamps when the video reports
                      no usable FPS (default 24.0).

    The video is read sequentially; only frames listed in the Swift JSON are
    processed, and only frames that yield at least one usable face are written.
    If the FaceNet model is missing, records are still written with
    "embedding" set to None.

    Raises:
        RuntimeError: if the video cannot be opened.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--swift-json", required=True, help="Swift detection output")
    parser.add_argument("--video", required=True, help="Video file path")
    parser.add_argument("--output", required=True, help="Output face.json path")
    parser.add_argument("--fps", type=float, default=24.0)
    args = parser.parse_args()

    print(f"[EMBED] Loading Swift output: {args.swift_json}")
    with open(args.swift_json, encoding="utf-8") as f:
        swift = json.load(f)

    swift_frames = swift.get("frames", [])
    print(f"[EMBED] Swift frames: {len(swift_frames)}")

    # Load CoreML FaceNet
    facenet = os.path.normpath(FACENET_PATH)
    coreml_model = None
    if os.path.exists(facenet):
        coreml_model = ct.models.MLModel(facenet)
        print("[EMBED] FaceNet loaded")
    else:
        print(f"[EMBED] WARNING: FaceNet not found at {facenet}")

    # Open video
    video = cv2.VideoCapture(args.video)
    if not video.isOpened():
        raise RuntimeError(f"Cannot open {args.video}")

    output_frames = []
    embed_count = 0
    embed_failures = 0
    max_failure_warnings = 5  # avoid flooding the log when every face fails
    processed = 0

    try:
        v_fps = video.get(cv2.CAP_PROP_FPS)
        v_width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
        v_height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
        if not v_fps or v_fps <= 0:
            # Some containers report no frame rate; fall back to --fps.
            print(f"[EMBED] WARNING: video reports no FPS, using --fps {args.fps}")
            v_fps = args.fps
        print(f"[EMBED] Video: {v_width}x{v_height}, {v_fps:.1f}fps")

        # Sequential read optimization: index Swift frames by frame number.
        frame_data_map = {}
        for sf in swift_frames:
            fn = int(sf.get("frame", sf.get("frame_number", 0)))
            frame_data_map[fn] = sf
        total_needed = len(frame_data_map)
        last_needed = max(frame_data_map, default=-1)

        t0 = time.time()
        current_frame = 0

        # Stop decoding once the last frame with detections has been handled.
        while current_frame <= last_needed:
            ret, frame = video.read()
            if not ret:
                break

            sf = frame_data_map.get(current_frame)
            if sf is None:
                current_frame += 1
                continue

            timestamp = sf.get("timestamp")
            if timestamp is None:
                timestamp = current_frame / v_fps

            processed_faces = []
            for face in sf.get("faces") or []:
                bb = face.get("bbox") or {}
                x, y = bb.get("x", 0), bb.get("y", 0)
                w, h = bb.get("width", 0), bb.get("height", 0)

                # Ignore detections too small to embed meaningfully.
                if w <= 10 or h <= 10:
                    continue

                face_img = _crop_face(frame, x, y, w, h)
                if face_img is None:
                    continue

                emb = None
                if coreml_model is not None:
                    try:
                        emb = _embed_face(coreml_model, face_img)
                        embed_count += 1
                    except Exception as e:
                        # Best effort: keep the face without an embedding,
                        # but make the failure visible.
                        embed_failures += 1
                        if embed_failures <= max_failure_warnings:
                            print(f"[EMBED] WARNING: embedding failed on frame {current_frame}: {e}")

                processed_faces.append(_face_record(face, x, y, w, h, emb))

            if processed_faces:
                output_frames.append({
                    "frame": current_frame,
                    "timestamp": timestamp,
                    "faces": processed_faces,
                })

            current_frame += 1
            processed += 1

            if processed % 500 == 0:
                print(f"[EMBED] {processed}/{total_needed} frames, {embed_count} embeddings, {time.time()-t0:.0f}s")
    finally:
        video.release()

    output = {
        "frame_count": len(output_frames),
        "fps": v_fps,
        "frames": output_frames,
    }

    # dirname is "" for a bare filename, and os.makedirs("") raises.
    out_dir = os.path.dirname(args.output)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with open(args.output, "w", encoding="utf-8") as f:
        json.dump(output, f, indent=2, ensure_ascii=False)

    elapsed = time.time() - t0
    if embed_failures:
        print(f"[EMBED] WARNING: {embed_failures} embeddings failed")
    print(f"[EMBED] Done: {len(output_frames)} frames, {embed_count} embeddings, {elapsed:.0f}s → {args.output}")
|
||||
|
||||
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user