ManagementSystem/TrackingToolWebAU/main.py

488 lines
16 KiB
Python

from fastapi import FastAPI, UploadFile, File, Form, Depends, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
import face_recognition
import numpy as np
import os
import datetime
import threading
import logging
import cv2
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from database import SessionLocal, engine
from models import Base, Student, CheckInLog, StudentEncoding
from sqlalchemy.exc import IntegrityError
from sqlalchemy import text
from fastapi.middleware.cors import CORSMiddleware
from api import create_history, send_image
# Configure the root logger so INFO-level records (and above) are emitted.
logging.basicConfig(level=logging.INFO)

# --- Encoding cache (Option 1: in-memory cache) ---
# Stacked face encodings, one row per stored encoding; None until the first load.
_enc_matrix: np.ndarray | None = None  # shape (N, 128)
# Student id owning each row of _enc_matrix (parallel array); None until the first load.
_enc_student_ids: np.ndarray | None = None  # shape (N,) int64
# Maps student id -> student name for every student that has a cached encoding.
_enc_student_names: dict = {}
# Held by _load_encoding_cache while it checks or rebuilds the cache.
_cache_lock = threading.Lock()
# True when the cache must be rebuilt from the database on the next load.
_cache_dirty = True
def invalidate_encoding_cache():
    """Flag the in-memory encoding cache as stale.

    Only the module-level dirty flag is set here; the cached arrays are left
    untouched. ``_load_encoding_cache`` re-reads the encodings from the
    database the next time it is called with the flag set.
    """
    global _cache_dirty

    _cache_dirty = True
def _load_encoding_cache(db):
    """Return the cached face encodings, rebuilding them from the database when stale.

    The cache is rebuilt when it has never been loaded or when the dirty flag
    is set. The whole check-and-rebuild runs while holding ``_cache_lock``.

    Args:
        db: Database session used to read ``student_encodings`` joined with
            ``students`` when a rebuild is required.

    Returns:
        A ``(matrix, student_ids, names)`` tuple. ``matrix`` is a float64 array
        of shape (N, 128); ``student_ids`` is an int64 array of shape (N,)
        giving the owner of each row; ``names`` maps student id to name.
    """
    global _enc_matrix, _enc_student_ids, _enc_student_names, _cache_dirty

    with _cache_lock:
        cache_usable = _enc_matrix is not None and not _cache_dirty
        if cache_usable:
            return _enc_matrix, _enc_student_ids, _enc_student_names

        query = text("""
            SELECT s.id AS student_id, s.name AS student_name, se.encoding AS encoding_blob
            FROM student_encodings se
            JOIN students s ON s.id = se.student_id
        """)
        fetched = db.execute(query).fetchall()

        vectors = []
        owner_ids = []
        name_by_id = {}
        for row in fetched:
            try:
                # Blobs are interpreted as raw float64 values.
                vector = np.frombuffer(row.encoding_blob, dtype=np.float64)
                if vector.size != 128:
                    # Skip blobs that are not a 128-value encoding.
                    logging.warning(f"encoding size invalid for student {row.student_id}: {vector.size}")
                    continue
                vectors.append(vector)
                owner_ids.append(row.student_id)
                name_by_id[row.student_id] = row.student_name
            except Exception as e:
                # One undecodable row must not prevent the rest from loading.
                logging.exception(f"Error decoding encoding for student {row.student_id}: {e}")

        if not vectors:
            # Keep well-formed empty arrays so callers can still check .shape[0].
            _enc_matrix = np.empty((0, 128), dtype=np.float64)
            _enc_student_ids = np.array([], dtype=np.int64)
        else:
            _enc_matrix = np.vstack(vectors)
            _enc_student_ids = np.array(owner_ids, dtype=np.int64)
        _enc_student_names = name_by_id
        _cache_dirty = False

        logging.info(f"Encoding cache loaded: {_enc_matrix.shape[0]} encodings, {len(name_by_id)} students")
        return _enc_matrix, _enc_student_ids, _enc_student_names
# --- Image preprocessing (Option 3: downscale before face detection) ---
def _preprocess_image(image_data: bytes, max_width: int = 640) -> np.ndarray:
    """Decode an encoded image and return it as an RGB array at most ``max_width`` wide.

    Args:
        image_data: Raw bytes of an encoded image, as read from an upload.
        max_width: Images wider than this many pixels are downscaled to this
            width, keeping the aspect ratio. Narrower images are not resized.

    Returns:
        The decoded image with channels converted from BGR to RGB.

    Raises:
        HTTPException: With status 400 when ``image_data`` is empty or cannot
            be decoded as an image.
    """
    if not image_data:
        raise HTTPException(status_code=400, detail="Empty image data.")

    buffer = np.frombuffer(image_data, np.uint8)
    img = cv2.imdecode(buffer, cv2.IMREAD_COLOR)
    # cv2.imdecode reports an undecodable buffer by returning None instead of
    # raising, so check before touching img.shape.
    if img is None:
        raise HTTPException(status_code=400, detail="Invalid or unsupported image data.")

    h, w = img.shape[:2]
    if w > max_width:
        scale = max_width / w
        # Clamp to 1 pixel so a very wide, thin image never yields a zero-sized target.
        new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
        img = cv2.resize(img, new_size, interpolation=cv2.INTER_AREA)
    return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
# The ASGI application object; routes below are registered on it.
app = FastAPI()

# --- CORS ---
# Allows any origin, method and header, with credentials enabled.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# discouraged by the FastAPI CORS documentation — confirm whether credentials
# are really needed, and list explicit origins if they are.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Create the tables declared on Base's metadata using the configured engine.
Base.metadata.create_all(bind=engine)

# Directory for uploaded images; created at import time if it does not exist.
UPLOAD_DIR = "./uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)
def get_db():
    """Yield a database session and close it once the consumer is finished.

    Written as a generator so it can be used with ``Depends``: the session is
    created, handed out, and closed in the ``finally`` clause even when the
    consumer raises.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
# Serve files from the local "static" directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/")
def root():
    """Serve the front-end entry page, ``static/index.html``."""
    index_page = FileResponse("static/index.html")
    return index_page
@app.post("/register")
async def register_face(
    name: str = Form(...),
    email: str = Form(...),
    avatar: str = Form(None),  # optional
    file: UploadFile = File(...),
):
    """Register a face encoding for a student, creating the student when needed.

    The uploaded image is saved under ``UPLOAD_DIR`` and the first face found
    in it is encoded. If a student with ``email`` already exists, the encoding
    is added to that student; otherwise the student and the encoding are
    inserted in one transaction. The encoding cache is invalidated after every
    successful write.

    Args:
        name: Student name, used only when a new student is created.
        email: Student email, used to look up an existing student.
        avatar: Optional avatar value stored on a newly created student.
        file: Uploaded image expected to contain the student's face.

    Returns:
        A dict with a ``message`` on success, or a ``JSONResponse`` with
        status 400 when no face is detected in the image.

    Raises:
        HTTPException: With status 400 when inserting the new student violates
            a database integrity constraint (e.g. the email was registered
            concurrently).
    """
    image_data = await file.read()

    # The filename is client-controlled: keep only its final path component so
    # names such as "../../main.py" cannot escape the upload directory.
    safe_name = os.path.basename((file.filename or "").replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        safe_name = f"upload_{datetime.datetime.now():%Y%m%d_%H%M%S_%f}"
    image_path = os.path.join(UPLOAD_DIR, safe_name)
    with open(image_path, "wb") as f:
        f.write(image_data)

    # Encode from the in-memory bytes so the image is not re-read from disk.
    # This happens before a DB session is opened, so a failure here cannot
    # leak a session.
    image = _preprocess_image(image_data)
    encodings = face_recognition.face_encodings(image)
    if not encodings:
        return JSONResponse(
            content={"message": "Không phát hiện khuôn mặt."},
            status_code=400,
        )
    encoding_bytes = encodings[0].tobytes()

    insert_encoding = text("""
        INSERT INTO student_encodings (student_id, encoding)
        VALUES (:student_id, :encoding)
    """)

    db = SessionLocal()
    try:
        existing_email = db.execute(
            text("SELECT id FROM students WHERE email = :email"),
            {"email": email},
        ).fetchone()

        if existing_email:
            # Email already registered: only attach another encoding.
            db.execute(
                insert_encoding,
                {"student_id": existing_email[0], "encoding": encoding_bytes},
            )
            db.commit()
            invalidate_encoding_cache()
            return {"message": "Đã thêm encoding mới."}

        # Insert student and encoding in the same transaction so a student is
        # never left without an encoding.
        try:
            db.execute(
                text("""
                    INSERT INTO students (name, email, avatar)
                    VALUES (:name, :email, :avatar)
                """),
                {"name": name, "email": email, "avatar": avatar},
            )
            # NOTE(review): LAST_INSERT_ID() looks MySQL-specific — confirm the target database.
            student_id = db.execute(text("SELECT LAST_INSERT_ID()")).fetchone()[0]
            db.execute(
                insert_encoding,
                {"student_id": student_id, "encoding": encoding_bytes},
            )
            db.commit()
        except IntegrityError:
            db.rollback()
            raise HTTPException(status_code=400, detail="Email đã tồn tại.")

        invalidate_encoding_cache()
        return {"message": "Đăng ký thành công."}
    finally:
        # Always release the session, whatever path was taken above.
        db.close()
@app.post("/register-simple")
async def register_student(
    name: str = Form(...),
    email: str = Form(...),
    avatar: str = Form(None),  # optional
):
    """Create a student, or update name and avatar when the email already exists.

    No face encoding is involved in this endpoint.

    Args:
        name: Student name to store.
        email: Email identifying the student record.
        avatar: Optional avatar value to store.

    Returns:
        A ``JSONResponse``: status 200 after an update, 201 after an insert,
        or 400 when the database reports an integrity error.
    """
    db = SessionLocal()
    try:
        # Check whether a student with this email already exists.
        already_registered = db.execute(
            text("SELECT id FROM students WHERE email = :email"),
            {"email": email},
        ).fetchone()

        if already_registered:
            statement = text("""
                UPDATE students
                SET name = :name,
                avatar = :avatar
                WHERE email = :email
            """)
            reply, http_status = "Cập nhật thành công.", 200
        else:
            statement = text("""
                INSERT INTO students (name, email, avatar)
                VALUES (:name, :email, :avatar)
            """)
            reply, http_status = "Đăng ký thành công.", 201

        db.execute(statement, {"name": name, "email": email, "avatar": avatar})
        db.commit()
        return JSONResponse({"message": reply}, status_code=http_status)
    except IntegrityError:
        db.rollback()
        return JSONResponse(
            {"message": "Lỗi cơ sở dữ liệu."},
            status_code=400,
        )
    finally:
        db.close()
def _rank_students_by_distance(all_dists, enc_sids, enc_names):
    """Find the closest and second-closest students for one probe encoding.

    Each student's score is the minimum distance over all of that student's
    cached encodings.

    Args:
        all_dists: Distance from the probe to every cached encoding row.
        enc_sids: Student id owning each row, parallel to ``all_dists``.
        enc_names: Mapping of student id to name, used only for logging.

    Returns:
        ``(best_student, best_distance, second_best_distance)``. ``best_student``
        is None when ``enc_sids`` is empty; distances default to infinity.
    """
    best_student = None
    best_distance = float("inf")
    second_best_distance = float("inf")
    for sid in np.unique(enc_sids):
        min_dist = float(np.min(all_dists[enc_sids == sid]))
        logging.info(f"Student {sid} ({enc_names.get(sid)}) min_dist = {min_dist:.4f}")
        if min_dist < best_distance:
            second_best_distance = best_distance
            best_distance = min_dist
            best_student = int(sid)
        elif min_dist < second_best_distance:
            second_best_distance = min_dist
    return best_student, best_distance, second_best_distance


@app.post("/checkin")
async def checkin(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    camera_id: str = Form("cam1"),
    db: Session = Depends(get_db),
):
    """Match an uploaded face against known students and record a check-in/out.

    The first face detected in the image is compared with every cached
    encoding. A match is accepted only when the best student's distance is
    within ``DIST_THRESHOLD`` and is clearly better than the runner-up
    (best/second ratio not above 0.85). An accepted match writes a row to
    ``checkin_logs`` whose status alternates between "check in" and
    "check out", unless that student already has a log from the last 30 seconds.

    Args:
        background_tasks: Task queue for the MS sync; currently unused because
            the sync call is commented out.
        file: Uploaded image to recognise.
        camera_id: Identifier of the submitting camera, stored with the log.
        db: Database session for this request.

    Returns:
        A dict with ``message`` and boolean ``status``. A recorded match also
        carries ``status_type`` and ``data`` (student id/name/email, or None);
        a rejected match carries ``reasons``.
    """
    image_data = await file.read()

    # Option 3: downscale the image before detection — no disk I/O involved.
    unknown_img = _preprocess_image(image_data)
    unknown_encodings = face_recognition.face_encodings(unknown_img)
    if not unknown_encodings:
        return {"message": "No face detected.", "status": False}
    unknown_encoding = unknown_encodings[0]

    # TUNING: a smaller threshold means fewer false matches but more false negatives.
    # Suggested values: 0.4 (strict), 0.45 (balanced), 0.55 (loose).
    DIST_THRESHOLD = 0.42

    # Option 1: use the in-memory cache instead of querying the DB per request.
    enc_matrix, enc_sids, enc_names = _load_encoding_cache(db)
    if enc_matrix.shape[0] == 0:
        return {"message": "No known encodings in DB.", "status": False}

    # Option 2: vectorized — compute every distance in a single call.
    all_dists = face_recognition.face_distance(enc_matrix, unknown_encoding)

    # Minimum distance per student, then best and runner-up across students.
    best_student, best_distance, second_best_distance = _rank_students_by_distance(
        all_dists, enc_sids, enc_names
    )
    logging.info(f"Best student {best_student} dist={best_distance:.4f}, second_best={second_best_distance:.4f}")

    # Ratio check: the best must be clearly better than the second best.
    ratio_ok = True
    if second_best_distance < float("inf"):
        ratio = best_distance / (second_best_distance + 1e-8)
        logging.info(f"Distance ratio (best/second) = {ratio:.4f}")
        # A ratio close to 1 (here > 0.85) means the two candidates are not distinguishable.
        if ratio > 0.85:
            ratio_ok = False

    # Accept the match only when the distance is within threshold and the ratio is confident.
    if best_distance <= DIST_THRESHOLD and ratio_ok and best_student is not None:
        # Skip when the student already has a log within the last half minute.
        now = datetime.datetime.now()
        recent_check = db.execute(
            text("""
                SELECT id FROM checkin_logs
                WHERE student_id = :student_id
                AND time > :time_threshold
            """),
            {
                "student_id": best_student,
                "time_threshold": now - datetime.timedelta(minutes=0.5),
            },
        ).fetchone()
        if recent_check:
            return {"message": f"{enc_names.get(best_student)} already checked in recently.", "status": True}

        # Alternate the status based on the student's most recent log.
        last_log = db.execute(
            text("""
                SELECT status FROM checkin_logs
                WHERE student_id = :student_id
                ORDER BY time DESC LIMIT 1
            """),
            {"student_id": best_student},
        ).fetchone()
        status = "check out" if last_log and last_log.status == "check in" else "check in"

        insert_result = db.execute(
            text("""
                INSERT INTO checkin_logs (student_id, time, camera_id, status)
                VALUES (:student_id, :time, :camera_id, :status)
            """),
            {
                "student_id": best_student,
                "time": now,
                "camera_id": camera_id,
                "status": status,
            },
        )
        # Kept for the MS sync below, which needs the id of the row just inserted.
        log_id = insert_result.lastrowid
        db.commit()

        def _sync_to_ms(name: str, time_string: str, img_data: bytes, local_status: str, checkin_log_id: int):
            """Push a check-in to the MS server and reconcile the local status with its reply."""
            try:
                # Send the check-in to the MS server so it creates a history entry.
                ms_response = create_history({"name": name.split('\n')[0], "time_string": time_string, "status": local_status})
                id_log = ms_response.get('data', {}).get('id', 0)
                ms_status = ms_response.get('data', {}).get('status', local_status)

                # If the MS server reports a different status, update the local row to match.
                if ms_status != local_status:
                    fix_db = SessionLocal()
                    try:
                        fix_db.execute(
                            text("UPDATE checkin_logs SET status = :status WHERE id = :id"),
                            {"status": ms_status, "id": checkin_log_id},
                        )
                        fix_db.commit()
                        logging.info(f"Corrected log #{checkin_log_id} status: {local_status} -> {ms_status}")
                    finally:
                        fix_db.close()

                # Upload the check-in image to the MS server, attached to the log just created.
                send_image(id_log, img_data, name, ms_status)
            except Exception as e:
                # Best effort: a sync failure is only logged.
                logging.error(f"MS sync error: {e}")

        # Run the MS sync in the background so it does not block the response to the client.
        # TODO: uncomment for the real deployment
        # background_tasks.add_task(
        #     _sync_to_ms,
        #     enc_names.get(best_student),
        #     f"{datetime.datetime.now()}",
        #     image_data,
        #     status,
        #     log_id,
        # )

        student = db.execute(
            text("""
                SELECT id, name, email
                FROM students
                WHERE id = :id
            """),
            {"id": best_student},
        ).fetchone()
        user_data = {
            "id": student.id,
            "name": student.name,
            "email": student.email,
        } if student else None
        return {"message": f"{status} successful for {enc_names.get(best_student)} (dist={best_distance:.4f})", "status": True, "status_type": status, "data": user_data}

    # Threshold or ratio rule not satisfied: report no match and log why.
    reasons = []
    if best_distance > DIST_THRESHOLD:
        reasons.append(f"best_distance({best_distance:.4f}) > threshold({DIST_THRESHOLD})")
    if not ratio_ok:
        reasons.append(f"ratio not confident ({best_distance:.4f}/{second_best_distance:.4f})")
    logging.info("No confident match: " + "; ".join(reasons))
    return {"message": "No match found.", "reasons": reasons, "status": False}
@app.get("/logs")
def get_logs(db: Session = Depends(get_db)):
    """Return the 20 most recent check-in log entries, newest first.

    Args:
        db: Database session for this request.

    Returns:
        A list of dicts with ``name``, ``time`` (formatted as
        ``YYYY-MM-DD HH:MM:SS``), ``camera_id`` and ``status``.
    """
    recent_logs_query = text("""
        SELECT s.name, cl.time, cl.camera_id, cl.status
        FROM checkin_logs cl
        JOIN students s ON cl.student_id = s.id
        ORDER BY cl.time DESC
        LIMIT 20
    """)
    rows = db.execute(recent_logs_query).fetchall()
    return [
        {
            "name": row.name,
            "time": row.time.strftime("%Y-%m-%d %H:%M:%S"),
            "camera_id": row.camera_id,
            "status": row.status,
        }
        for row in rows
    ]
@app.get("/users")
def get_users(db: Session = Depends(get_db)):
    """Return every student together with up to five of their latest check-in logs.

    Students are ordered by name, descending. One extra query is issued per
    student to fetch that student's most recent logs.

    Args:
        db: Database session for this request.

    Returns:
        A list of dicts with ``id``, ``name``, ``email``, ``avatar`` and
        ``checkpoints`` (each holding ``id``, ``time`` and ``camera_id``).
    """
    # Fetch the student list.
    student_rows = db.execute(
        text("""
            SELECT id, name, email, avatar
            FROM students
            ORDER BY name DESC
        """)
    ).fetchall()

    # At most the five newest logs for one student.
    latest_checkpoints_query = text("""
        SELECT id, time, camera_id
        FROM checkin_logs
        WHERE student_id = :sid
        ORDER BY time DESC
        LIMIT 5
    """)

    def _checkpoints_for(student_id):
        log_rows = db.execute(latest_checkpoints_query, {"sid": student_id}).fetchall()
        return [
            {"id": entry.id, "time": entry.time, "camera_id": entry.camera_id}
            for entry in log_rows
        ]

    return [
        {
            "id": stu.id,
            "name": stu.name,
            "email": stu.email,
            "avatar": stu.avatar,
            "checkpoints": _checkpoints_for(stu.id),
        }
        for stu in student_rows
    ]