from fastapi import FastAPI, UploadFile, File, Form, Depends, HTTPException, BackgroundTasks from fastapi.responses import JSONResponse from sqlalchemy.orm import Session import face_recognition import numpy as np import os import datetime import threading import logging import cv2 from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse from database import SessionLocal, engine from models import Base, Student, CheckInLog, StudentEncoding from sqlalchemy.exc import IntegrityError from sqlalchemy import text from fastapi.middleware.cors import CORSMiddleware from api import create_history, send_image logging.basicConfig(level=logging.INFO) # --- Encoding cache (Phương án 1: RAM cache) --- _enc_matrix: np.ndarray | None = None # shape (N, 128) _enc_student_ids: np.ndarray | None = None # shape (N,) int64 _enc_student_names: dict = {} _cache_lock = threading.Lock() _cache_dirty = True def invalidate_encoding_cache(): global _cache_dirty _cache_dirty = True def _load_encoding_cache(db): global _enc_matrix, _enc_student_ids, _enc_student_names, _cache_dirty with _cache_lock: if not _cache_dirty and _enc_matrix is not None: return _enc_matrix, _enc_student_ids, _enc_student_names rows = db.execute( text(""" SELECT s.id AS student_id, s.name AS student_name, se.encoding AS encoding_blob FROM student_encodings se JOIN students s ON s.id = se.student_id """) ).fetchall() encodings, student_ids, names = [], [], {} for r in rows: try: enc = np.frombuffer(r.encoding_blob, dtype=np.float64) if enc.size == 128: encodings.append(enc) student_ids.append(r.student_id) names[r.student_id] = r.student_name else: logging.warning(f"encoding size invalid for student {r.student_id}: {enc.size}") except Exception as e: logging.exception(f"Error decoding encoding for student {r.student_id}: {e}") if encodings: _enc_matrix = np.vstack(encodings) _enc_student_ids = np.array(student_ids, dtype=np.int64) else: _enc_matrix = np.empty((0, 128), dtype=np.float64) _enc_student_ids = np.array([], dtype=np.int64) _enc_student_names = names _cache_dirty = False logging.info(f"Encoding cache loaded: {_enc_matrix.shape[0]} encodings, {len(names)} students") return _enc_matrix, _enc_student_ids, _enc_student_names # --- Image preprocessing (Phương án 3: resize trước khi detect) --- def _preprocess_image(image_data: bytes, max_width: int = 640) -> np.ndarray: nparr = np.frombuffer(image_data, np.uint8) img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) h, w = img.shape[:2] if w > max_width: scale = max_width / w img = cv2.resize(img, (int(w * scale), int(h * scale)), interpolation=cv2.INTER_AREA) return cv2.cvtColor(img, cv2.COLOR_BGR2RGB) app = FastAPI() # --- CORS --- app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) Base.metadata.create_all(bind=engine) UPLOAD_DIR = "./uploads" os.makedirs(UPLOAD_DIR, exist_ok=True) def get_db(): db = SessionLocal() try: yield db finally: db.close() app.mount("/static", StaticFiles(directory="static"), name="static") @app.get("/") def root(): return FileResponse("static/index.html") @app.post("/register") async def register_face( name: str = Form(...), email: str = Form(...), avatar: str = Form(None), # OPTIONAL file: UploadFile = File(...) ): db = SessionLocal() # Check duplicate email existing_email = db.execute( text("SELECT id FROM students WHERE email = :email"), {"email": email} ).fetchone() # Save image image_data = await file.read() image_path = f"./uploads/{file.filename}" with open(image_path, "wb") as f: f.write(image_data) # Encode face — dùng _preprocess_image để tránh load lại từ disk image = _preprocess_image(image_data) encodings = face_recognition.face_encodings(image) if not encodings: db.close() return JSONResponse( content={"message": "Không phát hiện khuôn mặt."}, status_code=400 ) encoding_bytes = encodings[0].tobytes() try: if existing_email: # Email exists → just add new encoding student_id = existing_email[0] db.execute( text(""" INSERT INTO student_encodings (student_id, encoding) VALUES (:student_id, :encoding) """), {"student_id": student_id, "encoding": encoding_bytes} ) db.commit() invalidate_encoding_cache() return {"message": "Đã thêm encoding mới."} else: # Insert new student (avatar nullable) db.execute( text(""" INSERT INTO students (name, email, avatar) VALUES (:name, :email, :avatar) """), { "name": name, "email": email, "avatar": avatar, } ) db.commit() student_id = db.execute(text("SELECT LAST_INSERT_ID()")).fetchone()[0] # Insert encoding db.execute( text(""" INSERT INTO student_encodings (student_id, encoding) VALUES (:student_id, :encoding) """), {"student_id": student_id, "encoding": encoding_bytes} ) db.commit() invalidate_encoding_cache() return {"message": "Đăng ký thành công."} except IntegrityError: db.rollback() raise HTTPException(status_code=400, detail="Email đã tồn tại.") finally: db.close() @app.post("/register-simple") async def register_student( name: str = Form(...), email: str = Form(...), avatar: str = Form(None), # OPTIONAL ): db = SessionLocal() try: # Kiểm tra xem student đã tồn tại chưa existing = db.execute( text("SELECT id FROM students WHERE email = :email"), {"email": email} ).fetchone() if existing: # UPDATE db.execute( text(""" UPDATE students SET name = :name, avatar = :avatar WHERE email = :email """), { "name": name, "avatar": avatar, "email": email } ) db.commit() return JSONResponse({"message": "Cập nhật thành công."}, status_code=200) else: # INSERT db.execute( text(""" INSERT INTO students (name, email, avatar) VALUES (:name, :email, :avatar) """), { "name": name, "email": email, "avatar": avatar } ) db.commit() return JSONResponse({"message": "Đăng ký thành công."}, status_code=201) except IntegrityError: db.rollback() return JSONResponse( {"message": "Lỗi cơ sở dữ liệu."}, status_code=400 ) finally: db.close() @app.post("/checkin") async def checkin(background_tasks: BackgroundTasks, file: UploadFile = File(...), camera_id: str = Form("cam1"), db: Session = Depends(get_db)): image_data = await file.read() # Phương án 3: resize ảnh trước khi detect — bỏ disk I/O unknown_img = _preprocess_image(image_data) unknown_encodings = face_recognition.face_encodings(unknown_img) if not unknown_encodings: return {"message": "No face detected.", "status": False} unknown_encoding = unknown_encodings[0] # TÙY CHỈNH: threshold nhỏ hơn → ít nhầm lẫn, nhưng dễ false negative. # Thử: 0.4 (chặt), 0.45 (cân bằng), 0.55 (lỏng) DIST_THRESHOLD = 0.42 # Phương án 1: dùng cache RAM thay vì query DB mỗi request enc_matrix, enc_sids, enc_names = _load_encoding_cache(db) if enc_matrix.shape[0] == 0: return {"message": "No known encodings in DB.", "status": False} # Phương án 2: vectorized — tính tất cả distances 1 lần qua BLAS all_dists = face_recognition.face_distance(enc_matrix, unknown_encoding) # Tìm min distance theo từng student best_student = None best_distance = float("inf") second_best_distance = float("inf") for sid in np.unique(enc_sids): mask = enc_sids == sid min_dist = float(np.min(all_dists[mask])) logging.info(f"Student {sid} ({enc_names.get(sid)}) min_dist = {min_dist:.4f}") if min_dist < best_distance: second_best_distance = best_distance best_distance = min_dist best_student = int(sid) elif min_dist < second_best_distance: second_best_distance = min_dist # Debug log best/second distances logging.info(f"Best student {best_student} dist={best_distance:.4f}, second_best={second_best_distance:.4f}") # Ratio check: nếu best much better than second best => more confident ratio_ok = True if second_best_distance < float("inf"): ratio = best_distance / (second_best_distance + 1e-8) logging.info(f"Distance ratio (best/second) = {ratio:.4f}") # Nếu ratio quá gần 1 (ví dụ > 0.85) => không đủ phân biệt if ratio > 0.85: ratio_ok = False # Quyết định match nếu best_distance nhỏ hơn threshold và ratio ok if best_distance <= DIST_THRESHOLD and ratio_ok and best_student is not None: # kiểm tra recent check (nửa phút trước) now = datetime.datetime.now() recent_check = db.execute( text(""" SELECT id FROM checkin_logs WHERE student_id = :student_id AND time > :time_threshold """), { "student_id": best_student, "time_threshold": now - datetime.timedelta(minutes=0.5) } ).fetchone() if recent_check: return {"message": f"{enc_names.get(best_student)} already checked in recently.", "status": True} last_log = db.execute( text(""" SELECT status FROM checkin_logs WHERE student_id = :student_id ORDER BY time DESC LIMIT 1 """), {"student_id": best_student} ).fetchone() status = "check out" if last_log and last_log.status == "check in" else "check in" db.execute( text(""" INSERT INTO checkin_logs (student_id, time, camera_id, status) VALUES (:student_id, :time, :camera_id, :status) """), { "student_id": best_student, "time": now, "camera_id": camera_id, "status": status } ) db.commit() log_id = db.execute(text("SELECT LAST_INSERT_ID()")).scalar() def _sync_to_ms(name: str, time_string: str, img_data: bytes, local_status: str, checkin_log_id: int): try: ms_response = create_history({"name": name.split('\n')[0], "time_string": time_string, "status": local_status}) id_log = ms_response.get('data', {}).get('id', 0) ms_status = ms_response.get('data', {}).get('status', local_status) if ms_status != local_status: fix_db = SessionLocal() try: fix_db.execute( text("UPDATE checkin_logs SET status = :status WHERE id = :id"), {"status": ms_status, "id": checkin_log_id} ) fix_db.commit() logging.info(f"Corrected log #{checkin_log_id} status: {local_status} → {ms_status}") finally: fix_db.close() send_image(id_log, img_data, name, ms_status) except Exception as e: logging.error(f"MS sync error: {e}") background_tasks.add_task( _sync_to_ms, enc_names.get(best_student), f"{datetime.datetime.now()}", image_data, status, log_id, ) student = db.execute( text(""" SELECT id, name, email FROM students WHERE id = :id """), {"id": best_student} ).fetchone() user_data = { "id": student.id, "name": student.name, "email": student.email, } if student else None return {"message": f"{status} successful for {enc_names.get(best_student)} (dist={best_distance:.4f})", "status": True, "status_type": status, "data": user_data} # Nếu không thỏa threshold/rule thì trả no match (và log lý do) reasons = [] if best_distance > DIST_THRESHOLD: reasons.append(f"best_distance({best_distance:.4f}) > threshold({DIST_THRESHOLD})") if not ratio_ok: reasons.append(f"ratio not confident ({best_distance:.4f}/{second_best_distance:.4f})") logging.info("No confident match: " + "; ".join(reasons)) return {"message": "No match found.", "reasons": reasons, "status": False} @app.get("/logs") def get_logs(db: Session = Depends(get_db)): logs = db.execute( text(""" SELECT s.name, cl.time, cl.camera_id, cl.status FROM checkin_logs cl JOIN students s ON cl.student_id = s.id ORDER BY cl.time DESC LIMIT 20 """) ).fetchall() result = [] for log in logs: result.append({ "name": log.name, "time": log.time.strftime("%Y-%m-%d %H:%M:%S"), "camera_id": log.camera_id, "status": log.status }) return result @app.get("/users") def get_users(db: Session = Depends(get_db)): # Lấy danh sách student students = db.execute( text(""" SELECT id, name, email, avatar FROM students ORDER BY name DESC """) ).fetchall() result = [] for stu in students: student_id = stu.id # Lấy tối đa 5 checkpoint mới nhất checkpoints = db.execute( text(""" SELECT id, time, camera_id FROM checkin_logs WHERE student_id = :sid ORDER BY time DESC LIMIT 5 """), {"sid": student_id} ).fetchall() result.append({ "id": stu.id, "name": stu.name, "email": stu.email, "avatar": stu.avatar, "checkpoints": [ { "id": c.id, "time": c.time, "camera_id": c.camera_id } for c in checkpoints ] }) return result