school-checkin/main.py

122 lines
3.8 KiB
Python

from fastapi import FastAPI, UploadFile, File, Form, Depends, HTTPException
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
import face_recognition
import numpy as np
import os
import datetime
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from database import SessionLocal, engine
from models import Base, Student, CheckInLog
from sqlalchemy.exc import IntegrityError
from sqlalchemy import text
# ASGI application object; the route handlers below register themselves on it.
app = FastAPI()
# Create the tables declared on Base's metadata in the bound database if they
# do not exist yet. Runs at import time.
Base.metadata.create_all(bind=engine)
# Directory that receives uploaded face images.
UPLOAD_DIR = "./uploads"
# Ensure the upload directory exists before any request tries to write to it;
# exist_ok=True makes this a no-op when it is already there.
os.makedirs(UPLOAD_DIR, exist_ok=True)
def get_db():
    """Yield a database session and close it when the consumer is done.

    Written as a generator so it can be used with ``Depends(get_db)``: the
    code after ``yield`` runs once the request handler has finished, whether
    it returned normally or raised.

    Yields:
        A session created by ``SessionLocal()``.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Always release the session, even if the handler raised.
        session.close()
# Serve the contents of the local "static" directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/")
def root():
    """Serve the front-end entry page (``static/index.html``) at the site root."""
    index_page = FileResponse("static/index.html")
    return index_page
@app.post("/register")
async def register_face(
    name: str = Form(...),
    email: str = Form(...),
    file: UploadFile = File(...),
):
    """Register a student from a name, an email and a face photo.

    The uploaded image is written to ``UPLOAD_DIR``, the first face found in
    it is encoded, and a row holding the raw encoding bytes is inserted into
    the ``students`` table.

    Args:
        name: Student name (form field).
        email: Student email (form field); rejected if already registered.
        file: Uploaded image expected to contain the student's face.

    Returns:
        ``{"message": ...}`` on success, or a 400 ``JSONResponse`` with a
        ``message`` key when no face is detected in the image.

    Raises:
        HTTPException: 400 when the email is already registered, either
            found by the up-front lookup or reported by the database as an
            ``IntegrityError`` on insert.
    """
    # Local import: only this handler needs it.
    import uuid

    db = SessionLocal()
    # A single try/finally guarantees the session is closed on every exit
    # path, including unexpected errors while reading or decoding the image.
    try:
        # Check if email already exists
        existing = db.execute(
            text("SELECT id FROM students WHERE email = :email"),
            {"email": email},
        ).fetchone()
        if existing:
            raise HTTPException(status_code=400, detail="Email đã được đăng ký.")

        # Save image. The client-supplied filename is untrusted (it may
        # contain "../" or collide with another upload), so only its
        # extension is kept and the stored name is generated server-side.
        image_data = await file.read()
        extension = os.path.splitext(os.path.basename(file.filename or ""))[1]
        image_path = os.path.join(UPLOAD_DIR, f"{uuid.uuid4().hex}{extension}")
        with open(image_path, "wb") as f:
            f.write(image_data)

        # Encode face
        image = face_recognition.load_image_file(image_path)
        encodings = face_recognition.face_encodings(image)
        if not encodings:
            return JSONResponse(
                content={"message": "Không phát hiện khuôn mặt."},
                status_code=400,
            )
        # Only the first detected face is stored.
        encoding_bytes = encodings[0].tobytes()

        try:
            db.execute(
                text(
                    "INSERT INTO students (name, email, encoding) "
                    "VALUES (:name, :email, :encoding)"
                ),
                {"name": name, "email": email, "encoding": encoding_bytes},
            )
            db.commit()
        except IntegrityError as err:
            # Covers a concurrent registration that slipped past the lookup above.
            db.rollback()
            raise HTTPException(status_code=400, detail="Email đã tồn tại.") from err
        return {"message": "Đăng ký thành công."}
    finally:
        db.close()
@app.post("/checkin")
async def checkin(
    file: UploadFile = File(...),
    camera_id: str = Form("cam1"),
    db: Session = Depends(get_db),
):
    """Identify the face in an uploaded image and record a check-in.

    The image is written to ``UPLOAD_DIR/checkin.jpg`` (overwritten on every
    request), the first detected face is encoded, and it is compared against
    every stored student encoding. The *closest* student is accepted if its
    distance is within the match tolerance; a ``CheckInLog`` row is then
    added unless that student already has one from the last five minutes.

    Args:
        file: Uploaded image from the camera.
        camera_id: Identifier of the originating camera (form field).
        db: Database session supplied by ``get_db``.

    Returns:
        A ``{"message": ...}`` dict describing the outcome: no face detected,
        no match found, already checked in recently, or check-in successful.
    """
    # Same threshold the per-student compare_faces call used previously.
    match_tolerance = 0.5

    image_data = await file.read()
    path = os.path.join(UPLOAD_DIR, "checkin.jpg")
    with open(path, "wb") as f:
        f.write(image_data)

    unknown_img = face_recognition.load_image_file(path)
    unknown_encodings = face_recognition.face_encodings(unknown_img)
    if not unknown_encodings:
        return {"message": "No face detected."}
    # Only the first detected face is considered.
    unknown_encoding = unknown_encodings[0]

    # Skip rows without a stored encoding; np.frombuffer would raise on them.
    students = [s for s in db.query(Student).all() if s.encoding]
    if not students:
        return {"message": "No match found."}

    # Encodings are stored as raw float64 bytes (ndarray.tobytes()).
    known_encodings = [
        np.frombuffer(s.encoding, dtype=np.float64) for s in students
    ]
    # Pick the closest student rather than the first one under the threshold,
    # so the result does not depend on row order when several students are
    # within tolerance.
    distances = face_recognition.face_distance(known_encodings, unknown_encoding)
    best_index = int(np.argmin(distances))
    if distances[best_index] > match_tolerance:
        return {"message": "No match found."}
    student = students[best_index]

    now = datetime.datetime.now()
    # Suppress duplicate check-ins for the same student within five minutes.
    recent_check = db.query(CheckInLog).filter(
        CheckInLog.student_id == student.id,
        CheckInLog.time > now - datetime.timedelta(minutes=5),
    ).first()
    if recent_check:
        return {"message": f"{student.name} already checked in recently."}

    log = CheckInLog(student_id=student.id, time=now, camera_id=camera_id)
    db.add(log)
    db.commit()
    return {"message": f"Check-in successful for {student.name}"}
@app.get("/logs")
def get_logs(db: Session = Depends(get_db)):
    """Return every check-in log entry as a list of plain dicts.

    Args:
        db: Database session supplied by ``get_db``.

    Returns:
        One dict per ``CheckInLog`` row with the related student's ``name``,
        the check-in ``time`` formatted as ``YYYY-MM-DD HH:MM:SS``, and the
        ``camera_id``.
    """
    entries = db.query(CheckInLog).all()
    return [
        {
            "name": entry.student.name,
            "time": entry.time.strftime("%Y-%m-%d %H:%M:%S"),
            "camera_id": entry.camera_id,
        }
        for entry in entries
    ]