from fastapi import FastAPI, UploadFile, File, Form, Depends, HTTPException from fastapi.responses import JSONResponse from sqlalchemy.orm import Session import face_recognition import numpy as np import os import datetime from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse from database import SessionLocal, engine from models import Base, Student, CheckInLog, StudentEncoding from sqlalchemy.exc import IntegrityError from sqlalchemy import text app = FastAPI() Base.metadata.create_all(bind=engine) UPLOAD_DIR = "./uploads" os.makedirs(UPLOAD_DIR, exist_ok=True) def get_db(): db = SessionLocal() try: yield db finally: db.close() app.mount("/static", StaticFiles(directory="static"), name="static") @app.get("/") def root(): return FileResponse("static/index.html") @app.post("/register") async def register_face(name: str = Form(...),email: str = Form(...),file: UploadFile = File(...)): db = SessionLocal() # Check if email already exists existing = db.execute( text("SELECT id FROM students WHERE email = :email"), {"email": email} ).fetchone() # Save image image_data = await file.read() image_path = f"./uploads/{file.filename}" with open(image_path, "wb") as f: f.write(image_data) # Encode face image = face_recognition.load_image_file(image_path) encodings = face_recognition.face_encodings(image) if not encodings: db.close() return JSONResponse(content={"message": "Không phát hiện khuôn mặt."}, status_code=400) encoding_bytes = encodings[0].tobytes() try: if existing: # Email exists, just add new encoding student_id = existing[0] db.execute( text("INSERT INTO student_encodings (student_id, encoding) VALUES (:student_id, :encoding)"), {"student_id": student_id, "encoding": encoding_bytes} ) db.commit() return {"message": "Đã thêm encoding mới cho sinh viên."} else: # Email doesn't exist, create new student db.execute( text("INSERT INTO students (name, email) VALUES (:name, :email)"), {"name": name, "email": email} ) db.commit() # Get the last inserted id result = db.execute(text("SELECT LAST_INSERT_ID()")).fetchone() student_id = result[0] # Insert encoding db.execute( text("INSERT INTO student_encodings (student_id, encoding) VALUES (:student_id, :encoding)"), {"student_id": student_id, "encoding": encoding_bytes} ) db.commit() return {"message": "Đăng ký thành công."} except IntegrityError: db.rollback() raise HTTPException(status_code=400, detail="Email đã tồn tại.") finally: db.close() @app.post("/checkin") async def checkin(file: UploadFile = File(...), camera_id: str = Form("cam1"), db: Session = Depends(get_db)): image_data = await file.read() path = os.path.join(UPLOAD_DIR, "checkin.jpg") with open(path, "wb") as f: f.write(image_data) unknown_img = face_recognition.load_image_file(path) unknown_encodings = face_recognition.face_encodings(unknown_img) if not unknown_encodings: return {"message": "No face detected."} unknown_encoding = unknown_encodings[0] # Get all encodings with student info encodings = db.execute( text(""" SELECT s.id, s.name, se.encoding FROM student_encodings se JOIN students s ON s.id = se.student_id """) ).fetchall() for encoding in encodings: known_encoding = np.frombuffer(encoding.encoding) result = face_recognition.compare_faces([known_encoding], unknown_encoding, tolerance=0.5) if result[0]: now = datetime.datetime.now() recent_check = db.execute( text(""" SELECT id FROM checkin_logs WHERE student_id = :student_id AND time > :time_threshold """), { "student_id": encoding.id, "time_threshold": now - datetime.timedelta(minutes=5) } ).fetchone() if recent_check: return {"message": f"{encoding.name} already checked in recently."} db.execute( text(""" INSERT INTO checkin_logs (student_id, time, camera_id) VALUES (:student_id, :time, :camera_id) """), { "student_id": encoding.id, "time": now, "camera_id": camera_id } ) db.commit() return {"message": f"Check-in successful for {encoding.name}"} return {"message": "No match found."} @app.get("/logs") def get_logs(db: Session = Depends(get_db)): logs = db.execute( text(""" SELECT s.name, cl.time, cl.camera_id FROM checkin_logs cl JOIN students s ON cl.student_id = s.id ORDER BY cl.time DESC """) ).fetchall() result = [] for log in logs: result.append({ "name": log.name, "time": log.time.strftime("%Y-%m-%d %H:%M:%S"), "camera_id": log.camera_id }) return result