import { BaseCommand } from '@adonisjs/core/ace'
import type { CommandOptions } from '@adonisjs/core/types/ace'
import app from '@adonisjs/core/services/app'
import { Worker } from 'bullmq'
import { QUEUE_NAMES, JOB_NAMES, redisConnection, upsertSyncScheduler } from '#services/queue_service'
import env from '#start/env'
import PricingService from '#services/pricing_service'
import SyncService from '#services/sync_service'

/**
 * Worker that processes background jobs (pricing batch, ERP sync).
 * Run: `node ace queue:work` (a process separate from the HTTP server).
 */
export default class QueueWork extends BaseCommand {
  static commandName = 'queue:work'
  static description = 'Khởi động worker xử lý queue (pricing, sync, import)'
  static options: CommandOptions = { startApp: true, staysAlive: true }

  /**
   * Registers the ERP sync scheduler, starts one BullMQ worker per queue
   * (pricing, sync, product), wires up logging for their lifecycle events and
   * then blocks until the application starts terminating, at which point all
   * workers and the eBay scraper service are closed.
   *
   * @returns A promise that resolves once the shutdown hook has finished,
   *          whether or not every resource closed successfully.
   */
  async run() {
    const concurrency = 5
    this.logger.info('Queue worker đang chạy...')

    // Register the daily ERP sync cron (idempotent, per the original note) —
    // always active while the worker is running.
    const cronPattern = env.get('SYNC_CRON', '0 2 * * *')
    const cronTz = env.get('SYNC_TZ', 'Australia/Sydney')
    await upsertSyncScheduler(cronPattern, cronTz, 'cron')
    this.logger.info(`Cron sync ERP: "${cronPattern}" (tz: ${cronTz})`)

    // A processor that matches no job name returns undefined, which BullMQ
    // records as a successful completion. Log it so a misnamed job is visible.
    const warnUnknownJob = (queue: string, job: { id?: string; name: string }) => {
      this.logger.warning(`[${queue}] job ${job.id} has unhandled name "${job.name}", skipped`)
    }

    const pricingWorker = new Worker(
      QUEUE_NAMES.pricing,
      async (job) => {
        if (job.name === 'suggest') {
          const { productId, username } = job.data
          return PricingService.suggestForProduct(productId, username)
        }
        warnUnknownJob('pricing', job)
      },
      { connection: redisConnection, concurrency }
    )

    // Orchestrator: scans the ERP and then fans out upsert jobs.
    // Concurrency is 1 so that only one sync runs at a time.
    const syncWorker = new Worker(
      QUEUE_NAMES.sync,
      async (job) => {
        if (job.name === JOB_NAMES.erpSync) {
          return SyncService.syncFromErp(job.data?.username)
        }
        warnUnknownJob('sync', job)
      },
      { connection: redisConnection, concurrency: 1 }
    )

    // Per-product upsert worker — runs in parallel to get through large
    // volumes quickly. Failed jobs are expected to be retried by BullMQ
    // (attempts + backoff); NOTE(review): those job options are configured
    // outside this file — confirm in the queue service.
    const productWorker = new Worker(
      QUEUE_NAMES.product,
      async (job) => {
        if (job.name === JOB_NAMES.upsertProduct) {
          return SyncService.upsertProduct(job.data?.item)
        }
        warnUnknownJob('product', job)
      },
      { connection: redisConnection, concurrency: 10 }
    )

    for (const [name, w] of [
      ['pricing', pricingWorker],
      ['sync', syncWorker],
      ['product', productWorker],
    ] as const) {
      w.on('completed', (job) => this.logger.info(`[${name}] job ${job.id} done`))
      w.on('failed', (job, err) => this.logger.error(`[${name}] job ${job?.id} failed: ${err.message}`))
      // Without an 'error' listener, a worker-level error (e.g. a dropped
      // Redis connection) is emitted as an unhandled 'error' event.
      w.on('error', (err) => this.logger.error(`[${name}] worker error: ${err.message}`))
    }

    // Keep the process alive until a shutdown signal is received.
    await new Promise<void>((resolve) => {
      app.terminating(async () => {
        // Loaded lazily, and inside its own promise, so that a failing import
        // cannot prevent the workers from being closed.
        const closeScraper = async () => {
          const { default: EbayScraperService } = await import('#services/ebay_scraper_service')
          await EbayScraperService.close()
        }

        try {
          // allSettled: one resource failing to close must not abort the
          // others or leave run() pending forever.
          const results = await Promise.allSettled([
            pricingWorker.close(),
            syncWorker.close(),
            productWorker.close(),
            closeScraper(),
          ])
          for (const result of results) {
            if (result.status === 'rejected') {
              const reason = result.reason instanceof Error ? result.reason.message : String(result.reason)
              this.logger.error(`Queue worker shutdown error: ${reason}`)
            }
          }
        } finally {
          resolve()
        }
      })
    })
  }
}