# tasks/listed_tasks.py
"""Background processing of pending ``listed`` rows.

Each pending row is handled by a worker thread that asks the GUI (through
``global_signals``) to open a login dialog for the row's account, waits for
the matching ``dialog_finished`` signal, and then marks the row as listed.
"""
import queue
import threading
from contextlib import closing

from database.db import get_connection
from services.core.log_service import log_service
from gui.global_signals import global_signals

# Set to ask the workers to stop picking up new items.
stop_event = threading.Event()
# Limits the number of simultaneously open dialogs; created from the
# MAX_CONCURRENT_LISTING setting in process_all_listed().
dialog_semaphore = None

# Worker count used when the setting is missing or not a valid integer.
_DEFAULT_MAX_WORKERS = 2


def stop_listed_task():
    """Signal every worker to stop processing further items."""
    stop_event.set()
    log_service.info("[Task] Stop signal sent")


def get_settings():
    """Return all rows of the ``settings`` table as a ``{key: value}`` dict."""
    # closing() guarantees the connection is released even if the query fails.
    with closing(get_connection()) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT key, value FROM settings")
        rows = cursor.fetchall()
    return {row["key"]: row["value"] for row in rows}


def get_pending_items(limit=1000):
    """Return pending ``listed`` rows of active accounts, oldest first.

    Args:
        limit: Maximum number of rows to fetch.

    Returns:
        Rows of ``(id, account_id, product_id, listed_at, status)``.
    """
    with closing(get_connection()) as conn:
        cursor = conn.cursor()
        # The limit is bound as a parameter instead of being formatted into
        # the SQL text, like the UPDATE statement in process_item().
        cursor.execute(
            """
            SELECT l.id, l.account_id, l.product_id, l.listed_at, l.status
            FROM listed l
            JOIN accounts a ON l.account_id = a.id
            WHERE l.status='pending' AND a.is_active=1
            ORDER BY l.listed_at ASC
            LIMIT ?
            """,
            (int(limit),),
        )
        return cursor.fetchall()


def process_item(row):
    """Process one pending row: open the dialog, wait for it, update the DB.

    Args:
        row: A row as returned by get_pending_items(); only the first two
            columns (listed id, account id) are used.
    """
    listed_id, account_id = row[0], row[1]
    log_service.info(f"[Task] Processing listed_id={listed_id}")

    if stop_event.is_set():
        log_service.info("[Task] Stop signal received, skip item")
        return

    # Set by the callback once the dialog for this account reports completion.
    finished_event = threading.Event()

    def on_dialog_finished(finished_account_id):
        # NOTE(review): matching is by account id only, so two rows of the
        # same account processed concurrently are both released by a single
        # signal -- confirm whether that can happen in practice.
        if finished_account_id == account_id:
            finished_event.set()
            global_signals.dialog_finished.disconnect(on_dialog_finished)

    # The permit is held by this worker and released by the ``with`` block,
    # so an exception while connecting/emitting cannot leak it.
    with dialog_semaphore:
        global_signals.dialog_finished.connect(on_dialog_finished)
        # Ask the GUI side to open the dialog for this account.
        global_signals.open_login_dialog.emit(account_id)
        # NOTE(review): this waits until the dialog reports completion, even
        # if a stop was requested in the meantime.
        finished_event.wait()

    with closing(get_connection()) as conn:
        cursor = conn.cursor()
        cursor.execute(
            "UPDATE listed SET status='listed' WHERE id=?", (listed_id,)
        )
        conn.commit()

    log_service.info(f"[Task] Finished listed_id={listed_id}")


def worker_thread(q: queue.Queue):
    """Consume rows from *q* until it stays empty for one second.

    After a stop request the remaining rows are still dequeued (but not
    processed) so that every ``put`` is matched by a ``task_done`` and a
    caller blocked in ``q.join()`` is released.
    """
    while True:
        try:
            row = q.get(timeout=1)
        except queue.Empty:
            break

        try:
            if not stop_event.is_set():
                process_item(row)
        except Exception as e:
            log_service.error(f"[Task] Exception in worker: {e}")
        finally:
            q.task_done()


def _resolve_max_workers(settings):
    """Return a worker count >= 1 from the MAX_CONCURRENT_LISTING setting."""
    raw = settings.get("MAX_CONCURRENT_LISTING", _DEFAULT_MAX_WORKERS)
    try:
        value = int(raw)
    except (TypeError, ValueError):
        log_service.error(
            f"[Task] Invalid MAX_CONCURRENT_LISTING={raw!r}, "
            f"using {_DEFAULT_MAX_WORKERS}"
        )
        return _DEFAULT_MAX_WORKERS
    # Zero workers would leave q.join() waiting forever and a negative value
    # is rejected by threading.Semaphore.
    return max(1, value)


def process_all_listed():
    """Entry point: queue all pending rows and process them with workers.

    The number of worker threads (and of concurrently open dialogs) comes
    from the MAX_CONCURRENT_LISTING setting, defaulting to 2. Emits
    ``global_signals.listed_finished`` when done.
    """
    log_service.info("[Task] Start processing all listed items")

    # A previous run (or an earlier stop request) leaves the event set;
    # without clearing it the workers would exit immediately and q.join()
    # below would never return.
    stop_event.clear()

    settings = get_settings()
    max_workers = _resolve_max_workers(settings)

    global dialog_semaphore
    dialog_semaphore = threading.Semaphore(max_workers)

    items = get_pending_items(limit=1000)
    if not items:
        log_service.info("[Task] No pending items")
        global_signals.listed_finished.emit()
        return

    q = queue.Queue()
    for row in items:
        q.put(row)

    threads = []
    for _ in range(max_workers):
        t = threading.Thread(target=worker_thread, args=(q,))
        t.start()
        threads.append(t)

    # Wait until every queued row has been handled or skipped.
    q.join()
    stop_event.set()
    for t in threads:
        t.join()

    log_service.info("[Task] All listed items processed")
    global_signals.listed_finished.emit()