122 lines
3.8 KiB
Python
122 lines
3.8 KiB
Python
from fastapi import FastAPI, UploadFile, File, Form, Depends, HTTPException
|
|
from fastapi.responses import JSONResponse
|
|
from sqlalchemy.orm import Session
|
|
import face_recognition
|
|
import numpy as np
|
|
import os
|
|
import datetime
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.responses import FileResponse
|
|
from database import SessionLocal, engine
|
|
from models import Base, Student, CheckInLog
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy import text
|
|
|
|
app = FastAPI()
|
|
Base.metadata.create_all(bind=engine)
|
|
|
|
UPLOAD_DIR = "./uploads"
|
|
os.makedirs(UPLOAD_DIR, exist_ok=True)
|
|
|
|
def get_db():
|
|
db = SessionLocal()
|
|
try:
|
|
yield db
|
|
finally:
|
|
db.close()
|
|
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
|
|
@app.get("/")
|
|
def root():
|
|
return FileResponse("static/index.html")
|
|
|
|
@app.post("/register")
|
|
async def register_face(name: str = Form(...),email: str = Form(...),file: UploadFile = File(...)):
|
|
db = SessionLocal()
|
|
|
|
# Check if email already exists
|
|
existing = db.execute(
|
|
text("SELECT id FROM students WHERE email = :email"),
|
|
{"email": email}
|
|
).fetchone()
|
|
if existing:
|
|
db.close()
|
|
raise HTTPException(status_code=400, detail="Email đã được đăng ký.")
|
|
|
|
# Save image
|
|
image_data = await file.read()
|
|
image_path = f"./uploads/{file.filename}"
|
|
with open(image_path, "wb") as f:
|
|
f.write(image_data)
|
|
|
|
# Encode face
|
|
image = face_recognition.load_image_file(image_path)
|
|
encodings = face_recognition.face_encodings(image)
|
|
|
|
if not encodings:
|
|
db.close()
|
|
return JSONResponse(content={"message": "Không phát hiện khuôn mặt."}, status_code=400)
|
|
|
|
encoding_bytes = encodings[0].tobytes()
|
|
|
|
try:
|
|
db.execute(
|
|
text("INSERT INTO students (name, email, encoding) VALUES (:name, :email, :encoding)"),
|
|
{"name": name, "email": email, "encoding": encoding_bytes}
|
|
)
|
|
db.commit()
|
|
return {"message": "Đăng ký thành công."}
|
|
except IntegrityError:
|
|
db.rollback()
|
|
raise HTTPException(status_code=400, detail="Email đã tồn tại.")
|
|
finally:
|
|
db.close()
|
|
|
|
@app.post("/checkin")
|
|
async def checkin(file: UploadFile = File(...), camera_id: str = Form("cam1"), db: Session = Depends(get_db)):
|
|
image_data = await file.read()
|
|
path = os.path.join(UPLOAD_DIR, "checkin.jpg")
|
|
with open(path, "wb") as f:
|
|
f.write(image_data)
|
|
|
|
unknown_img = face_recognition.load_image_file(path)
|
|
unknown_encodings = face_recognition.face_encodings(unknown_img)
|
|
|
|
if not unknown_encodings:
|
|
return {"message": "No face detected."}
|
|
|
|
unknown_encoding = unknown_encodings[0]
|
|
|
|
students = db.query(Student).all()
|
|
for student in students:
|
|
known_encoding = np.frombuffer(student.encoding)
|
|
result = face_recognition.compare_faces([known_encoding], unknown_encoding, tolerance=0.5)
|
|
if result[0]:
|
|
now = datetime.datetime.now()
|
|
recent_check = db.query(CheckInLog).filter(
|
|
CheckInLog.student_id == student.id,
|
|
CheckInLog.time > now - datetime.timedelta(minutes=5)
|
|
).first()
|
|
|
|
if recent_check:
|
|
return {"message": f"{student.name} already checked in recently."}
|
|
|
|
log = CheckInLog(student_id=student.id, time=now, camera_id=camera_id)
|
|
db.add(log)
|
|
db.commit()
|
|
return {"message": f"Check-in successful for {student.name}"}
|
|
|
|
return {"message": "No match found."}
|
|
|
|
@app.get("/logs")
|
|
def get_logs(db: Session = Depends(get_db)):
|
|
logs = db.query(CheckInLog).all()
|
|
result = []
|
|
for log in logs:
|
|
result.append({
|
|
"name": log.student.name,
|
|
"time": log.time.strftime("%Y-%m-%d %H:%M:%S"),
|
|
"camera_id": log.camera_id
|
|
})
|
|
return result
|