ManagementSystem/TrackingToolWeb/main.py

438 lines
14 KiB
Python

from fastapi import FastAPI, UploadFile, File, Form, Depends, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
import face_recognition
import numpy as np
import os
import datetime
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from database import SessionLocal, engine
from models import Base, Student, CheckInLog, StudentEncoding
from sqlalchemy.exc import IntegrityError
from sqlalchemy import text
from fastapi.middleware.cors import CORSMiddleware
from api import create_history, send_image
app = FastAPI()
# --- CORS ---
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
Base.metadata.create_all(bind=engine)
UPLOAD_DIR = "./uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/")
def root():
return FileResponse("static/index.html")
@app.post("/register")
async def register_face(
name: str = Form(...),
email: str = Form(...),
avatar: str = Form(None), # OPTIONAL
file: UploadFile = File(...)
):
db = SessionLocal()
# Check duplicate email
existing_email = db.execute(
text("SELECT id FROM students WHERE email = :email"),
{"email": email}
).fetchone()
# Save image
image_data = await file.read()
image_path = f"./uploads/{file.filename}"
with open(image_path, "wb") as f:
f.write(image_data)
# Encode face
image = face_recognition.load_image_file(image_path)
encodings = face_recognition.face_encodings(image)
if not encodings:
db.close()
return JSONResponse(
content={"message": "Không phát hiện khuôn mặt."},
status_code=400
)
encoding_bytes = encodings[0].tobytes()
try:
if existing_email:
# Email exists → just add new encoding
student_id = existing_email[0]
db.execute(
text("""
INSERT INTO student_encodings (student_id, encoding)
VALUES (:student_id, :encoding)
"""),
{"student_id": student_id, "encoding": encoding_bytes}
)
db.commit()
return {"message": "Đã thêm encoding mới."}
else:
# Insert new student (avatar nullable)
db.execute(
text("""
INSERT INTO students (name, email, avatar)
VALUES (:name, :email, :avatar)
"""),
{
"name": name,
"email": email,
"avatar": avatar,
}
)
db.commit()
student_id = db.execute(text("SELECT LAST_INSERT_ID()")).fetchone()[0]
# Insert encoding
db.execute(
text("""
INSERT INTO student_encodings (student_id, encoding)
VALUES (:student_id, :encoding)
"""),
{"student_id": student_id, "encoding": encoding_bytes}
)
db.commit()
return {"message": "Đăng ký thành công."}
except IntegrityError:
db.rollback()
raise HTTPException(status_code=400, detail="Email đã tồn tại.")
finally:
db.close()
@app.post("/register-simple")
async def register_student(
name: str = Form(...),
email: str = Form(...),
avatar: str = Form(None), # OPTIONAL
):
db = SessionLocal()
try:
# Kiểm tra xem student đã tồn tại chưa
existing = db.execute(
text("SELECT id FROM students WHERE email = :email"),
{"email": email}
).fetchone()
if existing:
# UPDATE
db.execute(
text("""
UPDATE students
SET name = :name,
avatar = :avatar
WHERE email = :email
"""),
{
"name": name,
"avatar": avatar,
"email": email
}
)
db.commit()
return JSONResponse({"message": "Cập nhật thành công."}, status_code=200)
else:
# INSERT
db.execute(
text("""
INSERT INTO students (name, email, avatar)
VALUES (:name, :email, :avatar)
"""),
{
"name": name,
"email": email,
"avatar": avatar
}
)
db.commit()
return JSONResponse({"message": "Đăng ký thành công."}, status_code=201)
except IntegrityError:
db.rollback()
return JSONResponse(
{"message": "Lỗi cơ sở dữ liệu."},
status_code=400
)
finally:
db.close()
@app.post("/checkin")
async def checkin(background_tasks: BackgroundTasks, file: UploadFile = File(...), camera_id: str = Form("cam1"), db: Session = Depends(get_db)):
import logging
logging.basicConfig(level=logging.INFO)
image_data = await file.read()
path = os.path.join(UPLOAD_DIR, "checkin.jpg")
with open(path, "wb") as f:
f.write(image_data)
unknown_img = face_recognition.load_image_file(path)
# Option: dùng CNN detector (chậm nhưng chính xác hơn) nếu bạn đã cài dlib với CUDA / muốn chính xác tối đa:
# unknown_locations = face_recognition.face_locations(unknown_img, model="cnn")
# unknown_encodings = face_recognition.face_encodings(unknown_img, unknown_locations)
unknown_encodings = face_recognition.face_encodings(unknown_img)
if not unknown_encodings:
return {"message": "No face detected.", "status": False}
unknown_encoding = unknown_encodings[0]
# TÙY CHỈNH: threshold nhỏ hơn → ít nhầm lẫn, nhưng dễ false negative.
# Thử: 0.4 (chặt), 0.45 (cân bằng), 0.55 (lỏng)
DIST_THRESHOLD = 0.42
# Lấy tất cả encodings (mỗi row là một encoding blob) kèm student info
rows = db.execute(
text("""
SELECT s.id AS student_id, s.name AS student_name, se.encoding AS encoding_blob
FROM student_encodings se
JOIN students s ON s.id = se.student_id
""")
).fetchall()
# Gom các encoding theo student_id
from collections import defaultdict
student_encodings = defaultdict(list)
student_names = {}
for r in rows:
sid = r.student_id
student_names[sid] = r.student_name
# chuyển BLOB -> numpy array đúng dtype & shape
try:
enc = np.frombuffer(r.encoding_blob, dtype=np.float64)
# Một bản encoding phải dài 128
if enc.size == 128:
student_encodings[sid].append(enc)
else:
logging.warning(f"encoding size invalid for student {sid}: {enc.size}")
except Exception as e:
logging.exception(f"Error decoding encoding for student {sid}: {e}")
# Nếu không có encoding nào trong DB
if not student_encodings:
return {"message": "No known encodings in DB.", "status": False}
# Tìm khoảng cách nhỏ nhất cho từng student
best_student = None
best_distance = float("inf")
second_best_distance = float("inf")
for sid, enc_list in student_encodings.items():
# calc distances between unknown and all encs of this student
try:
dists = face_recognition.face_distance(enc_list, unknown_encoding) # returns array
except Exception:
# fallback if enc_list is list of 1D arrays -> convert to 2D array
arr = np.vstack(enc_list)
dists = face_recognition.face_distance(arr, unknown_encoding)
min_dist = float(np.min(dists))
logging.info(f"Student {sid} ({student_names.get(sid)}) min_dist = {min_dist:.4f}")
# update best / second best global
if min_dist < best_distance:
second_best_distance = best_distance
best_distance = min_dist
best_student = sid
elif min_dist < second_best_distance:
second_best_distance = min_dist
# Debug log best/second distances
logging.info(f"Best student {best_student} dist={best_distance:.4f}, second_best={second_best_distance:.4f}")
# Ratio check: nếu best much better than second best => more confident
ratio_ok = True
if second_best_distance < float("inf"):
ratio = best_distance / (second_best_distance + 1e-8)
logging.info(f"Distance ratio (best/second) = {ratio:.4f}")
# Nếu ratio quá gần 1 (ví dụ > 0.85) => không đủ phân biệt
if ratio > 0.85:
ratio_ok = False
# Quyết định match nếu best_distance nhỏ hơn threshold và ratio ok
if best_distance <= DIST_THRESHOLD and ratio_ok and best_student is not None:
# kiểm tra recent check (nửa phút trước)
now = datetime.datetime.now()
recent_check = db.execute(
text("""
SELECT id FROM checkin_logs
WHERE student_id = :student_id
AND time > :time_threshold
"""),
{
"student_id": best_student,
"time_threshold": now - datetime.timedelta(minutes=0.5)
}
).fetchone()
if recent_check:
return {"message": f"{student_names.get(best_student)} already checked in recently.", "status": True}
# thêm dô đây
id_log = 0
ms_response = create_history({"name": student_names.get(best_student).split('\n')[0], "time_string": f"{datetime.datetime.now()}", "status": "check in"})
id_log = ms_response.get('data').get('id')
status = ms_response.get('data').get('status')
# reset pointer
file.file.seek(0)
background_tasks.add_task(
send_image,
id_log,
image_data, # truyền bytes, không phải UploadFile
student_names.get(best_student),
status
)
db.execute(
text("""
INSERT INTO checkin_logs (student_id, time, camera_id, status)
VALUES (:student_id, :time, :camera_id, :status)
"""),
{
"student_id": best_student,
"time": now,
"camera_id": camera_id,
"status": status
}
)
db.commit()
student = db.execute(
text("""
SELECT id, name, email
FROM students
WHERE id = :id
"""),
{"id": best_student}
).fetchone()
user_data = {
"id": student.id,
"name": student.name,
"email": student.email,
} if student else None
return {"message": f"Check-in successful for {student_names.get(best_student)} (dist={best_distance:.4f})", "status": True, "status_type":status, "data": user_data}
# Nếu không thỏa threshold/rule thì trả no match (và log lý do)
reasons = []
if best_distance > DIST_THRESHOLD:
reasons.append(f"best_distance({best_distance:.4f}) > threshold({DIST_THRESHOLD})")
if not ratio_ok:
reasons.append(f"ratio not confident ({best_distance:.4f}/{second_best_distance:.4f})")
logging.info("No confident match: " + "; ".join(reasons))
return {"message": "No match found.", "reasons": reasons, "status": False}
@app.get("/logs")
def get_logs(db: Session = Depends(get_db)):
logs = db.execute(
text("""
SELECT s.name, cl.time, cl.camera_id, cl.status
FROM checkin_logs cl
JOIN students s ON cl.student_id = s.id
ORDER BY cl.time DESC
LIMIT 20
""")
).fetchall()
result = []
for log in logs:
result.append({
"name": log.name,
"time": log.time.strftime("%Y-%m-%d %H:%M:%S"),
"camera_id": log.camera_id,
"status": log.status
})
return result
@app.get("/users")
def get_users(db: Session = Depends(get_db)):
# Lấy danh sách student
students = db.execute(
text("""
SELECT id, name, email, avatar
FROM students
ORDER BY name DESC
""")
).fetchall()
result = []
for stu in students:
student_id = stu.id
# Lấy tối đa 5 checkpoint mới nhất
checkpoints = db.execute(
text("""
SELECT id, time, camera_id
FROM checkin_logs
WHERE student_id = :sid
ORDER BY time DESC
LIMIT 5
"""),
{"sid": student_id}
).fetchall()
result.append({
"id": stu.id,
"name": stu.name,
"email": stu.email,
"avatar": stu.avatar,
"checkpoints": [
{
"id": c.id,
"time": c.time,
"camera_id": c.camera_id
}
for c in checkpoints
]
})
return result