349 lines
14 KiB
Python
349 lines
14 KiB
Python
import os
|
|
import cv2 as cv
|
|
import openpyxl
|
|
import pyttsx3
|
|
import pyautogui
|
|
from pyzbar import pyzbar
|
|
from datetime import datetime
|
|
import requests
|
|
import face_recognition
|
|
import numpy as np
|
|
import pickle
|
|
import time
|
|
# Khởi tạo danh sách rỗng để lưu trữ thông tin người dùng
|
|
user_data = []
|
|
history = []
|
|
screen_width = 1250
|
|
screen_height = 1100
|
|
WINDOW_QR_CODE = "QR Code"
|
|
WINDOW_TRACKING = "Tracking"
|
|
WINDOW_HISTORY = "History"
|
|
# URL_API = "http://localhost:8000/api/v1"
|
|
URL_API = "https://ms.prology.net/api/v1"
|
|
data = [0]
|
|
# Hàm thông báo bằng giọng nói
|
|
def speak(text):
|
|
engine = pyttsx3.init()
|
|
engine.say(text)
|
|
engine.runAndWait()
|
|
|
|
def send_image(id, file_name):
|
|
# Ensure id is a string if it's an integer
|
|
id = str(id)
|
|
|
|
today_date = datetime.now().strftime("%Y_%m_%d")
|
|
folder_path = f"./images/{today_date}"
|
|
|
|
# Ensure the directory exists
|
|
if not os.path.exists(folder_path):
|
|
os.makedirs(folder_path)
|
|
|
|
# Ensure the full file path is correct
|
|
file_path = os.path.join(folder_path, file_name)
|
|
|
|
# Check if the file exists
|
|
if not os.path.isfile(file_path):
|
|
print(f"Error: The file {file_path} does not exist.")
|
|
return {"status": False, "message": f"The file {file_path} does not exist."}
|
|
|
|
with open(file_path, 'rb') as image_file:
|
|
files = {'image': image_file}
|
|
# Correct payload for the data parameter
|
|
data = {'id': id, 'file_name': file_name}
|
|
print(files)
|
|
try:
|
|
response = requests.post(URL_API + "/admin/tracking/send-image", data=data, files=files)
|
|
response.raise_for_status()
|
|
res = response.json()
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"HTTP Request failed: {e}")
|
|
return {"status": False, "message": str(e)}
|
|
|
|
# Check if the request was successful
|
|
if res.get('status') == True:
|
|
# Process the returned data
|
|
print(res)
|
|
return res
|
|
else:
|
|
return res
|
|
def create_history(frame, data):
|
|
# Gửi yêu cầu POST với dữ liệu đã chỉ định
|
|
response = requests.post(URL_API+"/admin/tracking/scan-create", data=data)
|
|
res = response.json()
|
|
# Kiểm tra xem gửi yêu cầu có thành công hay không
|
|
if res.get('status') == True:
|
|
# Xử lý dữ liệu trả về
|
|
print(res)
|
|
return res
|
|
else:
|
|
display_text(frame, res.get('data'), (25, 25), 0.7, (6, 6, 255), 2)
|
|
speak(res.get('data'))
|
|
return res
|
|
# Hàm để ghi thông tin vào tệp Excel
|
|
def write_to_excel(name, time, check_type):
|
|
try:
|
|
# Mở workbook hiện có
|
|
workbook = openpyxl.load_workbook("./data/"+time.strftime("%Y_%m")+"_check_in_out.xlsx")
|
|
sheet = workbook.active
|
|
except FileNotFoundError:
|
|
# Tạo workbook mới nếu tệp không tồn tại
|
|
workbook = openpyxl.Workbook()
|
|
sheet = workbook.active
|
|
sheet.append(["Name", "Role", "Time", "Check Type"])
|
|
|
|
# Ghi thông tin vào các ô trong tệp Excel
|
|
sheet.append([name.split('\n')[0].strip(), name.split('\n')[1].strip(), time, check_type])
|
|
|
|
# Lưu tệp Excel
|
|
workbook.save("./data/"+time.strftime("%Y_%m")+"_check_in_out.xlsx")
|
|
|
|
|
|
def check_response(res, frame, name, timestamp, text):
|
|
if res.get('status'):
|
|
display_text(frame, f"{text.split('\n')[0].strip()} {res.get('check_status')}", (25, 25), 0.7, (0, 255, 10), 2)
|
|
display_text(frame, f"{text.split('\n')[1]}", (25, 50), 0.7, (0, 255, 10), 2)
|
|
display_text(frame, f"{datetime.now()}", (25, 75), 0.7, (0, 255, 10), 2)
|
|
display_text(frame, f"Sucessful", (25, 100), 0.7, (0, 255, 10), 2)
|
|
write_to_excel(name, timestamp, res.get('check_status'))
|
|
speak(res.get('check_status') + " success")
|
|
return True
|
|
else:
|
|
display_text(frame, f"Call API fail", (25, 50), 0.7, (6, 6, 255), 2)
|
|
speak("Call API fail")
|
|
cv.waitKey(2000)
|
|
return False
|
|
|
|
# Hàm để thêm thông tin mới vào danh sách
|
|
def check_in(name, frame, text):
|
|
timestamp = datetime.now()
|
|
user_data.append({"name": name, "check_in_time": timestamp})
|
|
res = create_history(frame, {"name": name.split('\n')[0], "time_string": f"{datetime.now()}", "status": "check in"})
|
|
result = check_response(res, frame, name, timestamp, text)
|
|
return res
|
|
# cv.waitKey(5000)
|
|
|
|
|
|
# Hàm để xóa thông tin khi check out
|
|
def check_out(name, frame, text):
|
|
for user in user_data:
|
|
if user["name"] == name:
|
|
timestamp = datetime.now()
|
|
user["check_out_time"] = timestamp
|
|
print(f"{name} đã check out lúc {timestamp}")
|
|
user_data.remove(user)
|
|
res = create_history(frame, {"name": name.split('\n')[0], "time_string": f"{datetime.now()}", "status": "check out"})
|
|
result = check_response(res, frame, name, timestamp, text)
|
|
if result:
|
|
return res
|
|
# Hàm để hiển thị văn bản lên hình ảnh
|
|
def display_text(frame, text, position, font_scale, color, thickness):
|
|
cv.putText(frame, text, position, cv.FONT_HERSHEY_COMPLEX, font_scale, color, thickness)
|
|
|
|
def screenshot_window(save_path):
|
|
today_date = datetime.now().strftime("%Y_%m_%d")
|
|
folder_path = f"./images/{today_date}"
|
|
# Kiểm tra xem thư mục đã tồn tại chưa
|
|
if not os.path.exists(folder_path):
|
|
# Nếu thư mục chưa tồn tại, tạo mới
|
|
os.makedirs(folder_path)
|
|
print(f"Folder '{today_date}' created successfully!")
|
|
|
|
screenshot = pyautogui.screenshot(region=(10, 10, screen_width, screen_height))
|
|
screenshot.save(folder_path+"/"+save_path)
|
|
print("Screenshot saved successfully!")
|
|
|
|
# Hàm để xử lý quá trình quét mã QR code
|
|
def process_qr_code(frame):
|
|
decoded_objects = pyzbar.decode(frame)
|
|
for obj in decoded_objects:
|
|
(x, y, w, h) = obj.rect
|
|
cv.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 255), 2)
|
|
text = obj.data.decode('utf-8')
|
|
file_name = ""
|
|
status = ""
|
|
id_log = 0
|
|
|
|
if text.endswith("\n\n"):
|
|
if text not in [user["name"] for user in user_data]:
|
|
print(f"{text} đã check in lúc {datetime.now()}")
|
|
status += "check in"
|
|
file_name+=text.split('\n')[0]+"_"+f"{status}_at_{datetime.now().strftime("%Y_%m_%d_%H_%M_%S")}.png"
|
|
# screenshot_window(file_name)
|
|
res = check_in(text, frame, text)
|
|
id_log = res.get('data').get('id')
|
|
else:
|
|
print(f"{text} đã check out lúc {datetime.now()}")
|
|
status += "check out"
|
|
file_name+=text.split('\n')[0]+"_"+f"{status}_at_{datetime.now().strftime("%Y_%m_%d_%H_%M_%S")}.png"
|
|
# screenshot_window(file_name)
|
|
res = check_out(text, frame, text)
|
|
id_log = res.get('data').get('id')
|
|
cv.namedWindow(WINDOW_QR_CODE, cv.WINDOW_NORMAL)
|
|
cv.resizeWindow(WINDOW_QR_CODE, screen_width, screen_height)
|
|
cv.imshow(WINDOW_QR_CODE, frame)
|
|
cv.moveWindow(WINDOW_QR_CODE, 10, 10)
|
|
cv.waitKey(3000) # Chờ 5 giây
|
|
screenshot_window(file_name)
|
|
cv.destroyWindow(WINDOW_QR_CODE)
|
|
send_image(id_log, file_name)
|
|
else:
|
|
display_text(frame, f"QR invalid", (25, 25), 0.7, (6, 6, 255), 2)
|
|
display_text(frame, f"Failed", (25, 50), 0.7, (6, 6, 255), 2)
|
|
speak("Failed")
|
|
cv.namedWindow(WINDOW_QR_CODE, cv.WINDOW_NORMAL)
|
|
cv.resizeWindow(WINDOW_QR_CODE, screen_width, screen_height)
|
|
cv.imshow(WINDOW_QR_CODE, frame)
|
|
cv.moveWindow(WINDOW_QR_CODE, 10, 10)
|
|
cv.waitKey(2000)
|
|
cv.destroyWindow(WINDOW_QR_CODE)
|
|
return frame
|
|
|
|
# Hàm để xử lý quá trình quét mã QR code
|
|
def process_face_detect(text, frame):
|
|
if text.endswith("\n\n"):
|
|
file_name = ""
|
|
status = ""
|
|
id_log = 0
|
|
if text not in [user["name"] for user in user_data]:
|
|
print(f"{text} đã check in lúc {datetime.now()}")
|
|
status += "check in"
|
|
file_name+=text.split('\n')[0]+"_"+f"{status}_at_{datetime.now().strftime("%Y_%m_%d_%H_%M_%S")}.png"
|
|
# screenshot_window(file_name)
|
|
res = check_in(text, frame, text)
|
|
id_log = res.get('data').get('id')
|
|
else:
|
|
print(f"{text} đã check out lúc {datetime.now()}")
|
|
status += "check out"
|
|
file_name+=text.split('\n')[0]+"_"+f"{status}_at_{datetime.now().strftime("%Y_%m_%d_%H_%M_%S")}.png"
|
|
# screenshot_window(file_name)
|
|
res = check_out(text, frame, text)
|
|
id_log = res.get('data').get('id')
|
|
screenshot_window(file_name)
|
|
cv.namedWindow(WINDOW_QR_CODE, cv.WINDOW_NORMAL)
|
|
cv.resizeWindow(WINDOW_QR_CODE, screen_width, screen_height)
|
|
cv.imshow(WINDOW_QR_CODE, frame)
|
|
cv.moveWindow(WINDOW_QR_CODE, 10, 10)
|
|
# Chạy vòng lặp không chặn trong 2 giây
|
|
cv.waitKey(2000) # Chờ 10ms
|
|
cv.destroyWindow(WINDOW_QR_CODE)
|
|
send_image(id_log, file_name)
|
|
else:
|
|
display_text(frame, f"QR invalid", (25, 25), 0.7, (6, 6, 255), 2)
|
|
display_text(frame, f"Failed", (25, 50), 0.7, (6, 6, 255), 2)
|
|
speak("Failed")
|
|
cv.namedWindow(WINDOW_QR_CODE, cv.WINDOW_NORMAL)
|
|
cv.resizeWindow(WINDOW_QR_CODE, screen_width, screen_height)
|
|
cv.imshow(WINDOW_QR_CODE, frame)
|
|
cv.moveWindow(WINDOW_QR_CODE, 10, 10)
|
|
cv.waitKey(2000)
|
|
cv.destroyWindow(WINDOW_QR_CODE)
|
|
return frame
|
|
|
|
datasetPath = "../DetectFace/dataset"
|
|
listFilesPath = '../DetectFace/listFiles.pkl'
|
|
images = []
|
|
classNames = []
|
|
lisFileTrain = []
|
|
|
|
if os.path.exists(listFilesPath):
|
|
with open(listFilesPath, 'rb') as f:
|
|
lisFileTrain = pickle.load(f)
|
|
else:
|
|
lisFileTrain = os.listdir(datasetPath)
|
|
with open(listFilesPath, 'wb') as f:
|
|
pickle.dump(lisFileTrain, f)
|
|
for file in lisFileTrain:
|
|
classNames.append(os.path.splitext(file)[0].split('_')[0])
|
|
|
|
def encodeImgs(save_path="../DetectFace/encodings.pkl"):
|
|
if os.path.exists(save_path):
|
|
print(f"Loading encodings from {save_path}...")
|
|
with open(save_path, "rb") as f:
|
|
return pickle.load(f)
|
|
|
|
encodeListKnow = encodeImgs()
|
|
print("Load data success")
|
|
# Khởi tạo camera
|
|
|
|
def main():
|
|
recognized_faces = {}
|
|
name_history = {}
|
|
cap = cv.VideoCapture(0)
|
|
face_cascade = cv.CascadeClassifier(cv.data.haarcascades + 'haarcascade_frontalface_default.xml')
|
|
cv.namedWindow(WINDOW_TRACKING, cv.WINDOW_NORMAL)
|
|
cv.resizeWindow(WINDOW_TRACKING, screen_width, screen_height)
|
|
while True:
|
|
ret, frame = cap.read()
|
|
if not ret:
|
|
break
|
|
|
|
frameS = cv.resize(frame, (0,0), None, fx=0.5, fy=0.5)
|
|
frameS = cv.cvtColor(frameS, cv.COLOR_BGR2RGB)
|
|
|
|
faceCurFrame = face_recognition.face_locations(frameS, model='hog')
|
|
encodeCurFrame = face_recognition.face_encodings(frameS)
|
|
frame = process_qr_code(frame)
|
|
current_time = time.time()
|
|
for encodeFace, faceLoc in zip(encodeCurFrame, faceCurFrame):
|
|
matches = face_recognition.compare_faces(encodeListKnow, encodeFace)
|
|
faceDis = face_recognition.face_distance(encodeListKnow, encodeFace)
|
|
# print(faceDis)
|
|
matchIndex = np.argmin(faceDis)
|
|
|
|
if faceDis[matchIndex] < 0.35:
|
|
name = classNames[matchIndex].upper()
|
|
# If the face is recognized, track the timestamp
|
|
if name not in recognized_faces:
|
|
recognized_faces[name] = current_time # Store first detection time
|
|
else:
|
|
elapsed_time = current_time - recognized_faces[name]
|
|
if (name not in name_history) or (current_time - name_history[name] >= 60):
|
|
if elapsed_time >= 2.5: # If face is seen for 2s, execute script
|
|
process_face_detect(f"{name}\n{"Staff"}\n\n", frame)
|
|
name_history[name] = time.time()
|
|
del recognized_faces[name]
|
|
else:
|
|
display_text(frame, f"Checking: "+str(round((elapsed_time/2.5)*100,2))+"%", (700, 55), 1, (0, 255, 255), 2)
|
|
else:
|
|
display_text(frame, f"Checked. Try after {round(60-(current_time - name_history[name]),0)}s", (600, 55), 1, (0, 255, 255), 2)
|
|
else:
|
|
name = "Unknow"
|
|
recognized_faces = {}
|
|
display_text(frame, f"Face not found - use QRcode", (20, 55), 0.7, (6, 6, 255), 2)
|
|
y1, x2, y2, x1 = faceLoc
|
|
y1, x2, y2, x1 = y1*2, x2*2, y2*2, x1*2
|
|
cv.rectangle(frame, (x1,y1), (x2,y2), (0,255,0), 2)
|
|
cv.putText(frame, name + f"({(1 - round(faceDis[matchIndex], 2))*100}%)", (x1, y1), cv.FONT_HERSHEY_COMPLEX, 0.8, (0,255,0), 2)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Convert the frame to grayscale
|
|
# gray_frame = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
|
|
|
|
# # Detect faces in the frame
|
|
# faces = face_cascade.detectMultiScale(gray_frame, scaleFactor=1.1, minNeighbors=25, minSize=(30, 30))
|
|
|
|
# # Draw rectangles around the faces
|
|
# if len(faces) == 1:
|
|
# for (x, y, w, h) in faces:
|
|
# cv.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)
|
|
# display_text(frame, f"Face detected", (430, 25), 0.7, (0, 255, 0), 2)
|
|
# frame = process_qr_code(frame)
|
|
# else:
|
|
# display_text(frame, f"Face not found", (430, 25), 0.7, (6, 6, 255), 2)
|
|
cv.imshow(WINDOW_TRACKING, frame)
|
|
cv.moveWindow(WINDOW_TRACKING, 10, 10)
|
|
if cv.waitKey(1) == ord('q'):
|
|
break
|
|
|
|
cap.release()
|
|
cv.destroyAllWindows()
|
|
|
|
if __name__ == "__main__":
|
|
main() |