import { Queue } from 'bullmq' import env from '#start/env' import type { ErpProductItem } from '#services/erp_service' export const redisConnection = { host: env.get('REDIS_HOST', '127.0.0.1'), port: Number(env.get('REDIS_PORT', 6379)), password: env.get('REDIS_PASSWORD') || undefined, } export const QUEUE_NAMES = { pricing: 'pricing', sync: 'sync', product: 'product', import: 'import', } as const export const JOB_NAMES = { /** Job orchestrator: quét ERP rồi fan-out các job upsert. */ erpSync: 'erp', /** Job upsert 1 sản phẩm (BullMQ tự retry khi lỗi). */ upsertProduct: 'upsert', } as const const defaultJobOptions = { attempts: 3, backoff: { type: 'exponential' as const, delay: 5000 }, removeOnComplete: 1000, removeOnFail: 5000, } /* | Lazy singletons — chỉ mở kết nối Redis khi thực sự enqueue, tránh việc | import module (vd ace liệt kê command) lại tự mở kết nối và treo process. */ let _pricingQueue: Queue | undefined let _syncQueue: Queue | undefined let _productQueue: Queue | undefined let _importQueue: Queue | undefined export function pricingQueue(): Queue { if (!_pricingQueue) { _pricingQueue = new Queue(QUEUE_NAMES.pricing, { connection: redisConnection, defaultJobOptions }) } return _pricingQueue } export function syncQueue(): Queue { if (!_syncQueue) { _syncQueue = new Queue(QUEUE_NAMES.sync, { connection: redisConnection, defaultJobOptions }) } return _syncQueue } export function productQueue(): Queue { if (!_productQueue) { _productQueue = new Queue(QUEUE_NAMES.product, { connection: redisConnection, defaultJobOptions }) } return _productQueue } export function importQueue(): Queue { if (!_importQueue) { _importQueue = new Queue(QUEUE_NAMES.import, { connection: redisConnection, defaultJobOptions }) } return _importQueue } /** * Đóng tất cả kết nối Redis của các queue đã mở. * Dùng cho command chạy 1 lần để tiến trình thoát sạch. */ export async function closeQueues(): Promise { await Promise.all([ _pricingQueue?.close(), _syncQueue?.close(), _productQueue?.close(), _importQueue?.close(), ]) _pricingQueue = _syncQueue = _productQueue = _importQueue = undefined } /** Đẩy job gợi ý giá cho 1 product. */ export async function enqueuePricingSuggest(productId: number, username: string) { return pricingQueue().add('suggest', { productId, username }) } /** Đẩy nhiều job gợi ý giá (batch). */ export async function enqueuePricingBatch(productIds: number[], username: string) { return pricingQueue().addBulk( productIds.map((productId) => ({ name: 'suggest', data: { productId, username } })) ) } /** Đẩy job orchestrator đồng bộ ERP (job này sẽ tự fan-out các job upsert). */ export async function enqueueSync(username: string) { return syncQueue().add(JOB_NAMES.erpSync, { username }) } /** ID cố định cho scheduler sync hằng ngày (idempotent). */ export const SYNC_SCHEDULER_ID = 'daily-erp-sync' /** * Tạo/cập nhật job scheduler chạy sync ERP định kỳ (cron). * Idempotent: gọi lại nhiều lần chỉ cập nhật lịch, không nhân đôi. */ export async function upsertSyncScheduler( pattern: string, tz: string | undefined, username = 'cron' ) { return syncQueue().upsertJobScheduler( SYNC_SCHEDULER_ID, { pattern, ...(tz ? { tz } : {}) }, { name: JOB_NAMES.erpSync, data: { username } } ) } /** Gỡ scheduler sync định kỳ. */ export async function removeSyncScheduler() { return syncQueue().removeJobScheduler(SYNC_SCHEDULER_ID) } /** * Đẩy batch job upsert sản phẩm — mỗi sản phẩm 1 job. * BullMQ tự retry job lỗi theo `defaultJobOptions` (attempts + backoff), * nên các sản phẩm lỗi sẽ được sync lại tự động sau cùng. */ export async function enqueueProductUpserts(items: ErpProductItem[], username: string) { return productQueue().addBulk( items.map((item) => ({ name: JOB_NAMES.upsertProduct, data: { item, username }, })) ) }