"""FastAPI service for face registration and face-based check-in.

Endpoints:
    GET  /                 serve static/index.html
    POST /register         register a student (or add a face encoding)
    POST /register-simple  create or update a student without a face image
    POST /checkin          match an uploaded face and record a check-in
    GET  /logs             the 20 most recent check-ins
    GET  /users            all students with their 5 most recent check-ins
"""
import datetime
import io
import os
import uuid
from typing import Optional

import face_recognition
import numpy as np
from fastapi import FastAPI, UploadFile, File, Form, Depends, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from sqlalchemy import text
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from api import create_history, send_image
from database import SessionLocal, engine
from models import Base, Student, CheckInLog, StudentEncoding

app = FastAPI()

# --- CORS ---
# NOTE(review): a wildcard origin combined with allow_credentials=True lets
# any site issue credentialed requests. Left unchanged because clients may
# rely on it; restrict allow_origins before exposing this service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

Base.metadata.create_all(bind=engine)

UPLOAD_DIR = "./uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)


def get_db():
    """Yield a database session and close it when the request is finished."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


app.mount("/static", StaticFiles(directory="static"), name="static")


def _safe_upload_name(filename: Optional[str]) -> str:
    """Return a bare file name that is safe to join onto UPLOAD_DIR.

    The name comes from the client, so directory components (with either
    separator style) are stripped. Empty or dot-only names are replaced by a
    random one so the upload never resolves outside the upload directory.
    """
    candidate = os.path.basename((filename or "").replace("\\", "/"))
    if candidate in ("", ".", ".."):
        candidate = f"upload-{uuid.uuid4().hex}"
    return candidate


def _student_payload(row, camera_id: str, now: datetime.datetime) -> dict:
    """Build the `data` object returned by /checkin for a matched student row."""
    return {
        "id": row.id,
        "name": row.name,
        "email": row.email,
        "avatar": row.avatar,
        "camera_id": camera_id,
        "time": now.isoformat(),
    }


@app.get("/")
def root():
    """Serve the single-page front end."""
    return FileResponse("static/index.html")


@app.post("/register")
async def register_face(
    name: str = Form(...),
    email: str = Form(...),
    avatar: str = Form(None),  # OPTIONAL
    file: UploadFile = File(...)
):
    """Register a student with a face image.

    The uploaded image is stored under UPLOAD_DIR and the first detected face
    is encoded. If the email already exists, the encoding is attached to that
    student; otherwise a new student row is created together with its first
    encoding.

    Returns:
        A JSON message on success, or a 400 JSON response when no face is
        detected in the image.

    Raises:
        HTTPException: 400 when the insert violates a database constraint.
    """
    # Save image. The client-supplied name is sanitised before it touches
    # the file system.
    image_data = await file.read()
    image_path = os.path.join(UPLOAD_DIR, _safe_upload_name(file.filename))
    with open(image_path, "wb") as f:
        f.write(image_data)

    # Encode face
    image = face_recognition.load_image_file(image_path)
    encodings = face_recognition.face_encodings(image)
    if not encodings:
        return JSONResponse(
            content={"message": "Không phát hiện khuôn mặt."},
            status_code=400
        )
    encoding_bytes = encodings[0].tobytes()

    # The session is opened only once it is needed, and everything that uses
    # it sits inside try/finally so it is always closed.
    db = SessionLocal()
    try:
        # Check duplicate email
        existing_email = db.execute(
            text("SELECT id FROM students WHERE email = :email"),
            {"email": email}
        ).fetchone()

        if existing_email:
            # Email exists -> just add a new encoding
            student_id = existing_email[0]
            message = "Đã thêm encoding mới."
        else:
            # Insert new student (avatar nullable)
            inserted = db.execute(
                text("""
                    INSERT INTO students (name, email, avatar)
                    VALUES (:name, :email, :avatar)
                """),
                {
                    "name": name,
                    "email": email,
                    "avatar": avatar,
                }
            )
            # Take the generated key from the INSERT's own cursor; querying
            # LAST_INSERT_ID() after a commit may run on another pooled
            # connection and return the wrong id.
            student_id = inserted.lastrowid
            message = "Đăng ký thành công."

        # Insert encoding
        db.execute(
            text("""
                INSERT INTO student_encodings (student_id, encoding)
                VALUES (:student_id, :encoding)
            """),
            {"student_id": student_id, "encoding": encoding_bytes}
        )
        # Single commit: a new student and its first encoding are stored
        # atomically.
        db.commit()
        return {"message": message}
    except IntegrityError:
        db.rollback()
        raise HTTPException(status_code=400, detail="Email đã tồn tại.")
    finally:
        db.close()


@app.post("/register-simple")
async def register_student(
    name: str = Form(...),
    email: str = Form(...),
    avatar: str = Form(None),  # OPTIONAL
):
    """Create a student, or update name/avatar when the email already exists.

    Returns:
        200 JSON response after an update, 201 after an insert, or 400 when
        the statement violates a database constraint.
    """
    db = SessionLocal()
    try:
        # Check whether the student already exists
        existing = db.execute(
            text("SELECT id FROM students WHERE email = :email"),
            {"email": email}
        ).fetchone()

        if existing:
            # UPDATE
            db.execute(
                text("""
                    UPDATE students
                    SET name = :name, avatar = :avatar
                    WHERE email = :email
                """),
                {
                    "name": name,
                    "avatar": avatar,
                    "email": email
                }
            )
            db.commit()
            return JSONResponse({"message": "Cập nhật thành công."}, status_code=200)

        # INSERT
        db.execute(
            text("""
                INSERT INTO students (name, email, avatar)
                VALUES (:name, :email, :avatar)
            """),
            {
                "name": name,
                "email": email,
                "avatar": avatar
            }
        )
        db.commit()
        return JSONResponse({"message": "Đăng ký thành công."}, status_code=201)
    except IntegrityError:
        db.rollback()
        return JSONResponse(
            {"message": "Lỗi cơ sở dữ liệu."},
            status_code=400
        )
    finally:
        db.close()


@app.post("/checkin")
async def checkin(
    file: UploadFile = File(...),
    camera_id: str = Form("cam1"),
    db: Session = Depends(get_db),
):
    """Match the uploaded face against stored encodings and record a check-in.

    The first stored encoding that matches (tolerance 0.5) wins. A student
    who already has a check-in within the last 5 minutes is reported but not
    logged again. Otherwise the history service is notified, the image is
    forwarded to it, and a row is inserted into checkin_logs.

    Raises:
        HTTPException: 502 when the history service response has no `data`.
    """
    image_data = await file.read()

    # The latest frame is still written to disk as before.
    path = os.path.join(UPLOAD_DIR, "checkin.jpg")
    with open(path, "wb") as f:
        f.write(image_data)

    # Decode from the in-memory bytes rather than re-reading the shared
    # checkin.jpg, so concurrent requests cannot pick up each other's frame.
    unknown_img = face_recognition.load_image_file(io.BytesIO(image_data))
    unknown_encodings = face_recognition.face_encodings(unknown_img)
    if not unknown_encodings:
        return {"message": "No face detected."}
    unknown_encoding = unknown_encodings[0]

    # Get all encodings with student info
    rows = db.execute(
        text("""
            SELECT s.id, s.name, s.email, s.avatar, se.encoding
            FROM student_encodings se
            JOIN students s ON s.id = se.student_id
        """)
    ).fetchall()

    for row in rows:
        # Encodings are stored via ndarray.tobytes(); float64 is what
        # np.frombuffer assumed implicitly before.
        known_encoding = np.frombuffer(row.encoding, dtype=np.float64)
        matched = face_recognition.compare_faces(
            [known_encoding], unknown_encoding, tolerance=0.5
        )
        if not matched[0]:
            continue

        now = datetime.datetime.now()

        # Check recent checkin
        recent_check = db.execute(
            text("""
                SELECT id FROM checkin_logs
                WHERE student_id = :student_id AND time > :time_threshold
            """),
            {
                "student_id": row.id,
                "time_threshold": now - datetime.timedelta(minutes=5)
            }
        ).fetchone()

        if recent_check:
            return {
                "message": f"{row.name} already checked in recently.",
                "checking": False,
                "data": _student_payload(row, camera_id, now),
            }

        # Notify the external history service and forward the captured image.
        ms_response = create_history({
            "name": row.name.split('\n')[0],
            "time_string": f"{now}",
            "status": "check in"
        })
        history = ms_response.get('data') if ms_response else None
        if not history:
            # Previously this surfaced as an AttributeError (HTTP 500).
            raise HTTPException(
                status_code=502,
                detail="History service returned no data."
            )
        id_log = history.get('id')
        status = history.get('status')

        # Rewind the upload so send_image reads it from the start.
        file.file.seek(0)
        send_image_res = send_image(
            id=id_log,
            file=file,
            student_name=row.name,
            status=status
        )
        print(id_log, send_image_res)

        # Insert new checkin
        db.execute(
            text("""
                INSERT INTO checkin_logs (student_id, time, camera_id, status)
                VALUES (:student_id, :time, :camera_id, :status)
            """),
            {
                "student_id": row.id,
                "time": now,
                "camera_id": camera_id,
                "status": status
            }
        )
        db.commit()

        return {
            "message": f"Check-in successful for {row.name}",
            "checking": True,
            "status": status,
            "data": _student_payload(row, camera_id, now),
        }

    return {"message": "No match found."}


@app.get("/logs")
def get_logs(db: Session = Depends(get_db)):
    """Return the 20 most recent check-ins, newest first."""
    logs = db.execute(
        text("""
            SELECT s.name, cl.time, cl.camera_id, cl.status
            FROM checkin_logs cl
            JOIN students s ON cl.student_id = s.id
            ORDER BY cl.time DESC
            LIMIT 20
        """)
    ).fetchall()

    return [
        {
            "name": log.name,
            "time": log.time.strftime("%Y-%m-%d %H:%M:%S"),
            "camera_id": log.camera_id,
            "status": log.status,
        }
        for log in logs
    ]


@app.get("/users")
def get_users(db: Session = Depends(get_db)):
    """Return every student with up to 5 of their most recent check-ins."""
    # Fetch the student list
    students = db.execute(
        text("""
            SELECT id, name, email, avatar
            FROM students
            ORDER BY name DESC
        """)
    ).fetchall()

    result = []
    for stu in students:
        # Fetch at most the 5 newest checkpoints for this student
        checkpoints = db.execute(
            text("""
                SELECT id, time, camera_id
                FROM checkin_logs
                WHERE student_id = :sid
                ORDER BY time DESC
                LIMIT 5
            """),
            {"sid": stu.id}
        ).fetchall()

        result.append({
            "id": stu.id,
            "name": stu.name,
            "email": stu.email,
            "avatar": stu.avatar,
            "checkpoints": [
                {"id": c.id, "time": c.time, "camera_id": c.camera_id}
                for c in checkpoints
            ],
        })

    return result