- Remove session-ses_2f27.md (161KB raw session log) - Remove 49 ROOT_* duplicate files across REFERENCE/ - Remove 14 duplicate files between REFERENCE/ root and history/ - Remove asr_legacy.rs (dead code, replaced by asr.rs) - Remove src/core/worker/ (duplicate JobWorker) - Remove src/core/layers/ (empty directory) - Remove 4 .bak files in src/ - Remove 7 dead private methods in worker/processor.rs - Remove backup directory from git tracking
379 lines
12 KiB
Python
Executable File
379 lines
12 KiB
Python
Executable File
#!/opt/homebrew/bin/python3.11
|
|
"""
|
|
CLIP Logo Identity Integration Script
|
|
|
|
Purpose:
|
|
1. Download logo image
|
|
2. Extract CLIP ViT-L/14 embedding (768-dim)
|
|
3. Store embedding to reference_data JSONB
|
|
4. Register Logo Identity to PostgreSQL database
|
|
|
|
Test Object: Accusys Storage Logo
|
|
https://www.accusys.com.tw/wp-content/uploads/2023/03/Accusys-Orange-2017.png
|
|
|
|
Usage:
|
|
python3 scripts/clip_logo_integration.py --logo-url "URL" --name "Logo Name"
|
|
python3 scripts/clip_logo_integration.py --test-accusys
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import argparse
|
|
import requests
|
|
import psycopg2
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
import numpy as np
|
|
|
|
DATABASE_URL = os.getenv("DATABASE_URL", "postgres://accusys@localhost:5432/momentry?options=-c%20search_path=dev")
|
|
|
|
TEMP_DIR = Path("data/logo_images")
|
|
TEMP_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def download_image(image_url: str, save_path: Path) -> bool:
|
|
"""Download image from URL"""
|
|
try:
|
|
resp = requests.get(image_url, timeout=30)
|
|
resp.raise_for_status()
|
|
save_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(save_path, "wb") as f:
|
|
f.write(resp.content)
|
|
print(f"✅ Downloaded: {save_path.name} ({len(resp.content)} bytes)")
|
|
return True
|
|
except Exception as e:
|
|
print(f"❌ Download failed: {e}")
|
|
return False
|
|
|
|
|
|
def load_clip_model():
|
|
"""Load CLIP ViT-L/14 model"""
|
|
try:
|
|
import torch
|
|
from transformers import CLIPModel, CLIPProcessor
|
|
|
|
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
|
|
print(f"🔧 Loading CLIP ViT-L/14 on {device}...")
|
|
|
|
model = CLIPModel.from_pretrained("openai/clip-vit-large-patch14").to(device)
|
|
processor = CLIPProcessor.from_pretrained("openai/clip-vit-large-patch14")
|
|
|
|
print(f"✅ CLIP model loaded on {device}")
|
|
return model, processor, device
|
|
except Exception as e:
|
|
print(f"❌ Failed to load CLIP: {e}")
|
|
return None, None, None
|
|
|
|
|
|
def extract_clip_embedding(model, processor, device, image_path: Path) -> list[float] | None:
|
|
"""Extract CLIP ViT-L/14 embedding (768-dim)"""
|
|
try:
|
|
from PIL import Image
|
|
import torch
|
|
|
|
image = Image.open(image_path).convert("RGB")
|
|
|
|
inputs = processor(images=image, return_tensors="pt").to(device)
|
|
|
|
with torch.no_grad():
|
|
embedding = model.get_image_features(**inputs)
|
|
|
|
embedding = embedding.cpu().numpy().flatten().tolist()
|
|
|
|
print(f"✅ Extracted embedding: {len(embedding)}-dim")
|
|
return embedding
|
|
except Exception as e:
|
|
print(f"❌ Extraction failed: {e}")
|
|
return None
|
|
|
|
|
|
def test_mps_performance(model, processor, device, image_path: Path, iterations: int = 100):
|
|
"""Test MPS vs CPU performance"""
|
|
try:
|
|
from PIL import Image
|
|
import torch
|
|
import time
|
|
from transformers import CLIPModel
|
|
|
|
image = Image.open(image_path).convert("RGB")
|
|
|
|
print(f"\n🔧 Performance test: {iterations} iterations...")
|
|
|
|
# MPS performance
|
|
inputs_mps = processor(images=image, return_tensors="pt").to(device)
|
|
|
|
start_time = time.time()
|
|
for i in range(iterations):
|
|
with torch.no_grad():
|
|
embedding = model.get_image_features(**inputs_mps)
|
|
mps_time = time.time() - start_time
|
|
|
|
print(f" MPS: {mps_time:.3f}s ({iterations} iterations)")
|
|
print(f" MPS: {mps_time/iterations:.4f}s per image")
|
|
|
|
# CPU performance
|
|
cpu_device = torch.device("cpu")
|
|
model_cpu = CLIPModel.from_pretrained("openai/clip-vit-large-patch14").to(cpu_device)
|
|
inputs_cpu = processor(images=image, return_tensors="pt").to(cpu_device)
|
|
|
|
start_time = time.time()
|
|
for i in range(iterations):
|
|
with torch.no_grad():
|
|
embedding = model_cpu.get_image_features(**inputs_cpu)
|
|
cpu_time = time.time() - start_time
|
|
|
|
print(f" CPU: {cpu_time:.3f}s ({iterations} iterations)")
|
|
print(f" CPU: {cpu_time/iterations:.4f}s per image")
|
|
|
|
speedup = cpu_time / mps_time if mps_time > 0 else 1.0
|
|
print(f" Speedup: {speedup:.2f}x")
|
|
|
|
return {
|
|
"mps_time": mps_time / iterations,
|
|
"cpu_time": cpu_time / iterations,
|
|
"speedup": speedup,
|
|
}
|
|
except Exception as e:
|
|
print(f"❌ Performance test failed: {e}")
|
|
return None
|
|
|
|
|
|
def register_logo_identity_to_db(
|
|
name: str,
|
|
logo_url: str,
|
|
embedding: list[float],
|
|
schema: str = "dev",
|
|
) -> str | None:
|
|
"""Register Logo Identity to PostgreSQL"""
|
|
|
|
conn = psycopg2.connect(DATABASE_URL)
|
|
cur = conn.cursor()
|
|
|
|
try:
|
|
reference_data = {
|
|
"identity_embeddings": [
|
|
{
|
|
"embedding": embedding,
|
|
"source": "logo_image",
|
|
"image_url": logo_url,
|
|
"context": "brand_logo",
|
|
"created_at": datetime.now().isoformat(),
|
|
}
|
|
],
|
|
"image_urls": [logo_url],
|
|
}
|
|
|
|
sql = f"""
|
|
UPDATE {schema}.identities
|
|
SET
|
|
identity_embedding = %s,
|
|
reference_data = %s,
|
|
status = 'confirmed',
|
|
updated_at = NOW()
|
|
WHERE name = %s
|
|
RETURNING uuid;
|
|
"""
|
|
|
|
embedding_str = "[" + ",".join(str(x) for x in embedding) + "]"
|
|
|
|
cur.execute(
|
|
sql,
|
|
(
|
|
embedding_str,
|
|
json.dumps(reference_data),
|
|
name,
|
|
),
|
|
)
|
|
|
|
result = cur.fetchone()
|
|
|
|
if result:
|
|
uuid = result[0]
|
|
conn.commit()
|
|
print(f"✅ Logo Identity updated: {name} (UUID: {uuid})")
|
|
return uuid
|
|
else:
|
|
print(f"⚠️ Identity '{name}' not found, creating new...")
|
|
|
|
sql = f"""
|
|
INSERT INTO {schema}.identities (
|
|
name, identity_type, source, status,
|
|
identity_embedding, reference_data,
|
|
created_at, updated_at
|
|
) VALUES (
|
|
%s, %s, %s, %s,
|
|
%s, %s,
|
|
NOW(), NOW()
|
|
)
|
|
RETURNING uuid;
|
|
"""
|
|
|
|
cur.execute(
|
|
sql,
|
|
(
|
|
name,
|
|
"logo",
|
|
"manual",
|
|
"confirmed",
|
|
embedding_str,
|
|
json.dumps(reference_data),
|
|
),
|
|
)
|
|
|
|
uuid = cur.fetchone()[0]
|
|
conn.commit()
|
|
print(f"✅ Logo Identity created: {name} (UUID: {uuid})")
|
|
return uuid
|
|
|
|
except Exception as e:
|
|
print(f"❌ Database error: {e}")
|
|
conn.rollback()
|
|
return None
|
|
finally:
|
|
cur.close()
|
|
conn.close()
|
|
|
|
|
|
def test_similarity_search(
|
|
identity_uuid: str,
|
|
test_embeddings: list[list[float]],
|
|
threshold: float = 0.85,
|
|
schema: str = "dev",
|
|
) -> list[dict]:
|
|
"""Test similarity search against Identity"""
|
|
|
|
conn = psycopg2.connect(DATABASE_URL)
|
|
cur = conn.cursor()
|
|
|
|
try:
|
|
cur.execute(f"""
|
|
SELECT identity_embedding
|
|
FROM {schema}.identities
|
|
WHERE uuid = %s;
|
|
""", (identity_uuid,))
|
|
|
|
result = cur.fetchone()
|
|
|
|
if not result or not result[0]:
|
|
print("⚠️ Identity embedding not found")
|
|
return []
|
|
|
|
stored_embedding_raw = result[0]
|
|
|
|
if isinstance(stored_embedding_raw, str):
|
|
stored_embedding_raw = json.loads(stored_embedding_raw)
|
|
|
|
stored_embedding = np.array(stored_embedding_raw, dtype=np.float64)
|
|
|
|
matches = []
|
|
for i, test_emb in enumerate(test_embeddings):
|
|
test_emb_array = np.array(test_emb)
|
|
|
|
similarity = np.dot(stored_embedding, test_emb_array) / (
|
|
np.linalg.norm(stored_embedding) * np.linalg.norm(test_emb_array)
|
|
)
|
|
|
|
is_match = similarity >= threshold
|
|
|
|
matches.append({
|
|
"test_index": i,
|
|
"similarity": float(similarity),
|
|
"is_match": is_match,
|
|
})
|
|
|
|
print(f" Test {i+1}: similarity={similarity:.4f}, match={is_match}")
|
|
|
|
return matches
|
|
except Exception as e:
|
|
print(f"❌ Similarity search failed: {e}")
|
|
return []
|
|
finally:
|
|
cur.close()
|
|
conn.close()
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="CLIP Logo Identity Integration")
|
|
parser.add_argument("--logo-url", help="Logo image URL")
|
|
parser.add_argument("--name", help="Logo name")
|
|
parser.add_argument("--schema", default="dev", help="Database schema")
|
|
parser.add_argument("--test-accusys", action="store_true", help="Test Accusys Logo")
|
|
parser.add_argument("--performance", action="store_true", help="Run performance test")
|
|
args = parser.parse_args()
|
|
|
|
if args.test_accusys:
|
|
logo_url = "https://www.accusys.com.tw/wp-content/uploads/2023/03/Accusys-Orange-2017.png"
|
|
name = "Accusys Storage Logo"
|
|
elif args.logo_url and args.name:
|
|
logo_url = args.logo_url
|
|
name = args.name
|
|
else:
|
|
print("❌ Please provide --logo-url and --name, or use --test-accusys")
|
|
sys.exit(1)
|
|
|
|
print("=" * 60)
|
|
print("CLIP Logo Identity Integration")
|
|
print("=" * 60)
|
|
print(f"Logo: {name}")
|
|
print(f"URL: {logo_url}")
|
|
print(f"Schema: {args.schema}")
|
|
print("=" * 60)
|
|
|
|
logo_path = TEMP_DIR / f"{name.replace(' ', '_')}.png"
|
|
|
|
if not logo_path.exists():
|
|
print("\n🔧 Downloading logo...")
|
|
if not download_image(logo_url, logo_path):
|
|
sys.exit(1)
|
|
|
|
model, processor, device = load_clip_model()
|
|
if not model:
|
|
sys.exit(1)
|
|
|
|
if args.performance:
|
|
perf_result = test_mps_performance(model, processor, device, logo_path, iterations=10)
|
|
if perf_result:
|
|
print("\n📊 Performance Summary:")
|
|
print(f" MPS: {perf_result['mps_time']:.4f}s/img")
|
|
print(f" CPU: {perf_result['cpu_time']:.4f}s/img")
|
|
print(f" Speedup: {perf_result['speedup']:.2f}x")
|
|
|
|
print("\n🔧 Extracting CLIP embedding...")
|
|
embedding = extract_clip_embedding(model, processor, device, logo_path)
|
|
|
|
if not embedding:
|
|
sys.exit(1)
|
|
|
|
print("\n🔧 Registering to database...")
|
|
uuid = register_logo_identity_to_db(
|
|
name=name,
|
|
logo_url=logo_url,
|
|
embedding=embedding,
|
|
schema=args.schema,
|
|
)
|
|
|
|
if uuid:
|
|
print("\n🎉 Integration completed!")
|
|
print(f" Identity: {name}")
|
|
print(f" UUID: {uuid}")
|
|
print(f" Embedding: {len(embedding)}-dim")
|
|
print(f" URL: {logo_url}")
|
|
|
|
print("\n🔧 Testing similarity search...")
|
|
test_embeddings = [
|
|
embedding,
|
|
[0.1] * 768,
|
|
]
|
|
|
|
matches = test_similarity_search(uuid, test_embeddings, threshold=0.85, schema=args.schema)
|
|
|
|
if matches:
|
|
print("\n✅ Similarity search test passed")
|
|
else:
|
|
print("\n❌ Integration failed")
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |