ManagementSystem/TrackingToolWeb/sync.py

267 lines
7.7 KiB
Python

from api import users
from database import SessionLocal
import os
import requests
import tempfile
import face_recognition
from sqlalchemy.exc import IntegrityError
from sqlalchemy import text
from database import SessionLocal
import json
URL_BASE_RESOURCE = "http://172.16.6.38:8000/image/storage/"
def register_face_handler(name: str, email: str, avatar: str | None, image_url: str):
print(f"[DEBUG] Bắt đầu register_face_handler với email: {email}, image_url: {image_url}")
db = SessionLocal()
try:
# 1. Tải ảnh từ URL
try:
res = requests.get(image_url)
res.raise_for_status()
except Exception as e:
print(f"[ERROR] Không tải được ảnh từ URL {image_url}: {e}")
return {"status": False, "message": "Không tải được ảnh từ URL."}
# 2. Lưu ảnh vào file tạm
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
tmp.write(res.content)
tmp_path = tmp.name
print(f"[DEBUG] Ảnh đã lưu tạm ở: {tmp_path}")
# 3. Encode khuôn mặt
image = face_recognition.load_image_file(tmp_path)
encodings = face_recognition.face_encodings(image)
if not encodings:
print("[ERROR] Không phát hiện khuôn mặt trong ảnh.")
return {"status": False, "message": "Không phát hiện khuôn mặt."}
encoding_bytes = encodings[0].tobytes()
print("[DEBUG] Đã encode khuôn mặt.")
# 4. Check email tồn tại
print(f"[DEBUG] Kiểm tra email tồn tại: {email}")
existing = db.execute(
text("SELECT id FROM students WHERE email = :email"),
{"email": email}
).fetchone()
if existing:
student_id = existing[0]
print(f"[DEBUG] Email đã tồn tại, student_id = {student_id}. Thêm encoding mới…")
db.execute(
text("""
INSERT INTO student_encodings (student_id, encoding)
VALUES (:student_id, :encoding)
"""),
{"student_id": student_id, "encoding": encoding_bytes}
)
db.commit()
return {"status": True, "message": "Đã thêm encoding mới."}
# 5. Email chưa tồn tại → tạo student mới
print(f"[DEBUG] Email chưa tồn tại, tạo student mới: {name}, {email}")
db.execute(
text("""
INSERT INTO students (name, email, avatar)
VALUES (:name, :email, :avatar)
"""),
{
"name": name,
"email": email,
"avatar": avatar,
}
)
db.commit()
student_id = db.execute(text("SELECT LAST_INSERT_ID()")).fetchone()[0]
print(f"[DEBUG] Student mới ID = {student_id}")
# 6. Lưu encoding
db.execute(
text("""
INSERT INTO student_encodings (student_id, encoding)
VALUES (:student_id, :encoding)
"""),
{"student_id": student_id, "encoding": encoding_bytes}
)
db.commit()
return {"status": True, "message": "Đăng ký thành công."}
except IntegrityError as e:
db.rollback()
print(f"[ERROR] IntegrityError (email có thể đã tồn tại): {e}")
return {"status": False, "message": "Email đã tồn tại."}
except Exception as e:
print(f"[ERROR] Lỗi không xác định: {e}")
return {"status": False, "message": "Lỗi server."}
finally:
db.close()
def extract_images(history_list):
images = []
for day_item in history_list:
values = day_item.get("values", [])
for v in values:
img = v.get("image")
if img:
images.append(img)
return images
def sync_data_user():
response = users({"month": 11, "year": 2025})
if not response.get("status"):
return
raw_data = response.get("data")
if isinstance(raw_data, str):
try:
data = json.loads(raw_data)
except:
print(raw_data)
return
else:
data = raw_data
db = SessionLocal()
for item in data:
histories = item.get("history", [])
user = item.get("user")
if len(histories) <= 0:
continue
# 👉 Lấy số lượng encoding hiện có trong DB
try:
count = db.execute(
text("""
SELECT COUNT(*)
FROM student_encodings se
JOIN students s ON s.id = se.student_id
WHERE s.email = :email
"""),
{"email": user.get("email")}
).fetchone()[0]
except Exception as e:
print("[ERROR] Khi lấy count:", e)
continue
# 👉 Nếu đủ 5 bản ghi → SKIP người này
limit = 10
if count >= limit:
print(f"==> Bỏ qua {user.get('email')} vì đã đủ {limit} encoding ({count}/{limit})")
continue
# 👉 Nếu chưa đủ thì mới xử lý ảnh
histories_list = extract_images(histories)
for image in histories_list:
# Kiểm tra lại lần nữa trước khi thêm (tránh dư khi có nhiềsu ảnh)
if count >= limit:
print(f"==> Đã đạt {limit} encoding, dừng cho {user.get('email')}")
break
avatar = URL_BASE_RESOURCE + user.get("avatar", "")
image_url = URL_BASE_RESOURCE + image
print(user.get("name"), image_url)
result = register_face_handler(
name=user.get("name"),
email=user.get("email"),
avatar=avatar,
image_url=image_url
)
print("Result:", result)
# Tăng biến đếm sau mỗi lần thêm
if result.get("status"):
count += 1
db.close()
return response
def test_valid_data():
response = users({"month": 10, "year": 2025})
if not response.get("status"):
print("API trả status=False")
return
raw_data = response.get("data")
if isinstance(raw_data, str):
try:
data = json.loads(raw_data)
except Exception as e:
print("[ERROR] Không parse được data:", e)
return
else:
data = raw_data
for item in data:
histories = item.get("history", [])
user = item.get("user")
if len(histories) <= 0 or not user:
continue
histories_list = extract_images(histories)
for image in histories_list:
# Tải ảnh từ server trước khi gửi
image_url = URL_BASE_RESOURCE + image
try:
res = requests.get(image_url)
res.raise_for_status()
except Exception as e:
print(f"[ERROR] Không tải được ảnh {image_url}: {e}")
continue
# Lưu tạm để upload
with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
tmp.write(res.content)
tmp_path = tmp.name
# Gửi lên API
with open(tmp_path, "rb") as f:
r = requests.post(
"http://localhost:8000/checkin",
files={"file": f},
data={"camera_id": "cam1"}
)
print(r.status_code, r.json(), user.get("name"))
# Xóa file tạm
os.remove(tmp_path)
return response
sync_data_user()
# test_valid_data()