370 lines
10 KiB
Python
370 lines
10 KiB
Python
from fastapi import FastAPI, UploadFile, File, Form, Depends, HTTPException
|
|
from fastapi.responses import JSONResponse
|
|
from sqlalchemy.orm import Session
|
|
import face_recognition
|
|
import numpy as np
|
|
import os
|
|
import datetime
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.responses import FileResponse
|
|
from database import SessionLocal, engine
|
|
from models import Base, Student, CheckInLog, StudentEncoding
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy import text
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from api import create_history, send_image
|
|
|
|
app = FastAPI()
|
|
|
|
# --- CORS ---
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
Base.metadata.create_all(bind=engine)
|
|
|
|
UPLOAD_DIR = "./uploads"
|
|
os.makedirs(UPLOAD_DIR, exist_ok=True)
|
|
|
|
def get_db():
|
|
db = SessionLocal()
|
|
try:
|
|
yield db
|
|
finally:
|
|
db.close()
|
|
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
|
|
@app.get("/")
|
|
def root():
|
|
return FileResponse("static/index.html")
|
|
|
|
@app.post("/register")
|
|
async def register_face(
|
|
name: str = Form(...),
|
|
email: str = Form(...),
|
|
avatar: str = Form(None), # OPTIONAL
|
|
file: UploadFile = File(...)
|
|
):
|
|
db = SessionLocal()
|
|
|
|
# Check duplicate email
|
|
existing_email = db.execute(
|
|
text("SELECT id FROM students WHERE email = :email"),
|
|
{"email": email}
|
|
).fetchone()
|
|
|
|
# Save image
|
|
image_data = await file.read()
|
|
image_path = f"./uploads/{file.filename}"
|
|
with open(image_path, "wb") as f:
|
|
f.write(image_data)
|
|
|
|
# Encode face
|
|
image = face_recognition.load_image_file(image_path)
|
|
encodings = face_recognition.face_encodings(image)
|
|
|
|
if not encodings:
|
|
db.close()
|
|
return JSONResponse(
|
|
content={"message": "Không phát hiện khuôn mặt."},
|
|
status_code=400
|
|
)
|
|
|
|
encoding_bytes = encodings[0].tobytes()
|
|
|
|
try:
|
|
if existing_email:
|
|
# Email exists → just add new encoding
|
|
student_id = existing_email[0]
|
|
db.execute(
|
|
text("""
|
|
INSERT INTO student_encodings (student_id, encoding)
|
|
VALUES (:student_id, :encoding)
|
|
"""),
|
|
{"student_id": student_id, "encoding": encoding_bytes}
|
|
)
|
|
db.commit()
|
|
|
|
return {"message": "Đã thêm encoding mới."}
|
|
|
|
else:
|
|
# Insert new student (avatar nullable)
|
|
db.execute(
|
|
text("""
|
|
INSERT INTO students (name, email, avatar)
|
|
VALUES (:name, :email, :avatar)
|
|
"""),
|
|
{
|
|
"name": name,
|
|
"email": email,
|
|
"avatar": avatar,
|
|
}
|
|
)
|
|
db.commit()
|
|
|
|
student_id = db.execute(text("SELECT LAST_INSERT_ID()")).fetchone()[0]
|
|
|
|
# Insert encoding
|
|
db.execute(
|
|
text("""
|
|
INSERT INTO student_encodings (student_id, encoding)
|
|
VALUES (:student_id, :encoding)
|
|
"""),
|
|
{"student_id": student_id, "encoding": encoding_bytes}
|
|
)
|
|
db.commit()
|
|
|
|
return {"message": "Đăng ký thành công."}
|
|
|
|
except IntegrityError:
|
|
db.rollback()
|
|
raise HTTPException(status_code=400, detail="Email đã tồn tại.")
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
|
|
@app.post("/register-simple")
|
|
async def register_student(
|
|
name: str = Form(...),
|
|
email: str = Form(...),
|
|
avatar: str = Form(None), # OPTIONAL
|
|
):
|
|
db = SessionLocal()
|
|
|
|
try:
|
|
# Kiểm tra xem student đã tồn tại chưa
|
|
existing = db.execute(
|
|
text("SELECT id FROM students WHERE email = :email"),
|
|
{"email": email}
|
|
).fetchone()
|
|
|
|
if existing:
|
|
# UPDATE
|
|
db.execute(
|
|
text("""
|
|
UPDATE students
|
|
SET name = :name,
|
|
avatar = :avatar
|
|
WHERE email = :email
|
|
"""),
|
|
{
|
|
"name": name,
|
|
"avatar": avatar,
|
|
"email": email
|
|
}
|
|
)
|
|
db.commit()
|
|
return JSONResponse({"message": "Cập nhật thành công."}, status_code=200)
|
|
|
|
else:
|
|
# INSERT
|
|
db.execute(
|
|
text("""
|
|
INSERT INTO students (name, email, avatar)
|
|
VALUES (:name, :email, :avatar)
|
|
"""),
|
|
{
|
|
"name": name,
|
|
"email": email,
|
|
"avatar": avatar
|
|
}
|
|
)
|
|
db.commit()
|
|
return JSONResponse({"message": "Đăng ký thành công."}, status_code=201)
|
|
|
|
except IntegrityError:
|
|
db.rollback()
|
|
return JSONResponse(
|
|
{"message": "Lỗi cơ sở dữ liệu."},
|
|
status_code=400
|
|
)
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
|
|
@app.post("/checkin")
|
|
async def checkin(file: UploadFile = File(...), camera_id: str = Form("cam1"), db: Session = Depends(get_db)):
|
|
image_data = await file.read()
|
|
path = os.path.join(UPLOAD_DIR, "checkin.jpg")
|
|
with open(path, "wb") as f:
|
|
f.write(image_data)
|
|
|
|
unknown_img = face_recognition.load_image_file(path)
|
|
unknown_encodings = face_recognition.face_encodings(unknown_img)
|
|
|
|
if not unknown_encodings:
|
|
return {"message": "No face detected."}
|
|
|
|
unknown_encoding = unknown_encodings[0]
|
|
|
|
# Get all encodings with student info
|
|
encodings = db.execute(
|
|
text("""
|
|
SELECT s.id, s.name, s.email, s.avatar, se.encoding
|
|
FROM student_encodings se
|
|
JOIN students s ON s.id = se.student_id
|
|
""")
|
|
).fetchall()
|
|
|
|
for encoding in encodings:
|
|
known_encoding = np.frombuffer(encoding.encoding)
|
|
result = face_recognition.compare_faces([known_encoding], unknown_encoding, tolerance=0.5)
|
|
if result[0]:
|
|
now = datetime.datetime.now()
|
|
|
|
# Check recent checkin
|
|
recent_check = db.execute(
|
|
text("""
|
|
SELECT id FROM checkin_logs
|
|
WHERE student_id = :student_id
|
|
AND time > :time_threshold
|
|
"""),
|
|
{
|
|
"student_id": encoding.id,
|
|
"time_threshold": now - datetime.timedelta(minutes=5)
|
|
}
|
|
).fetchone()
|
|
|
|
if recent_check:
|
|
return {
|
|
"message": f"{encoding.name} already checked in recently.",
|
|
"checking": False,
|
|
"data": {
|
|
"id": encoding.id,
|
|
"name": encoding.name,
|
|
"email": encoding.email,
|
|
"avatar": encoding.avatar,
|
|
"camera_id": camera_id,
|
|
"time": now.isoformat()
|
|
}
|
|
}
|
|
|
|
|
|
|
|
# thêm dô đây------------
|
|
# id_log = 0
|
|
# ms_response = create_history({"name": encoding.name.split('\n')[0], "time_string": f"{datetime.datetime.now()}", "status": "check in"})
|
|
# id_log = ms_response.get('data').get('id')
|
|
# status = ms_response.get('data').get('status')
|
|
|
|
# # reset pointer
|
|
# file.file.seek(0)
|
|
|
|
# send_image_res = send_image(
|
|
# id=id_log,
|
|
# file=file,
|
|
# student_name=encoding.name,
|
|
# status=status
|
|
# )
|
|
|
|
# print(id_log, send_image_res)
|
|
|
|
# Insert new checkin
|
|
db.execute(
|
|
text("""
|
|
INSERT INTO checkin_logs (student_id, time, camera_id, status)
|
|
VALUES (:student_id, :time, :camera_id, :status)
|
|
"""),
|
|
{
|
|
"student_id": encoding.id,
|
|
"time": now,
|
|
"camera_id": camera_id,
|
|
"status": status
|
|
}
|
|
)
|
|
db.commit()
|
|
|
|
return {
|
|
"message": f"Check-in successful for {encoding.name}",
|
|
"checking": True,
|
|
"status": status,
|
|
"data": {
|
|
"id": encoding.id,
|
|
"name": encoding.name,
|
|
"email": encoding.email,
|
|
"avatar": encoding.avatar,
|
|
"camera_id": camera_id,
|
|
"time": now.isoformat()
|
|
}
|
|
}
|
|
|
|
return {"message": "No match found."}
|
|
|
|
|
|
@app.get("/logs")
|
|
def get_logs(db: Session = Depends(get_db)):
|
|
logs = db.execute(
|
|
text("""
|
|
SELECT s.name, cl.time, cl.camera_id, cl.status
|
|
FROM checkin_logs cl
|
|
JOIN students s ON cl.student_id = s.id
|
|
ORDER BY cl.time DESC
|
|
LIMIT 20
|
|
""")
|
|
).fetchall()
|
|
|
|
result = []
|
|
for log in logs:
|
|
result.append({
|
|
"name": log.name,
|
|
"time": log.time.strftime("%Y-%m-%d %H:%M:%S"),
|
|
"camera_id": log.camera_id,
|
|
"status": log.status
|
|
})
|
|
return result
|
|
|
|
@app.get("/users")
|
|
def get_users(db: Session = Depends(get_db)):
|
|
# Lấy danh sách student
|
|
students = db.execute(
|
|
text("""
|
|
SELECT id, name, email, avatar
|
|
FROM students
|
|
ORDER BY name DESC
|
|
""")
|
|
).fetchall()
|
|
|
|
result = []
|
|
|
|
for stu in students:
|
|
student_id = stu.id
|
|
|
|
# Lấy tối đa 5 checkpoint mới nhất
|
|
checkpoints = db.execute(
|
|
text("""
|
|
SELECT id, time, camera_id
|
|
FROM checkin_logs
|
|
WHERE student_id = :sid
|
|
ORDER BY time DESC
|
|
LIMIT 5
|
|
"""),
|
|
{"sid": student_id}
|
|
).fetchall()
|
|
|
|
result.append({
|
|
"id": stu.id,
|
|
"name": stu.name,
|
|
"email": stu.email,
|
|
"avatar": stu.avatar,
|
|
"checkpoints": [
|
|
{
|
|
"id": c.id,
|
|
"time": c.time,
|
|
"camera_id": c.camera_id
|
|
}
|
|
for c in checkpoints
|
|
]
|
|
})
|
|
|
|
return result
|
|
|
|
|