// Listing_SuggestPrice/backend/commands/queue_work.ts
//
// 92 lines
// 3.4 KiB
// TypeScript

import { BaseCommand } from '@adonisjs/core/ace'
import type { CommandOptions } from '@adonisjs/core/types/ace'
import app from '@adonisjs/core/services/app'
import { Worker } from 'bullmq'
import { QUEUE_NAMES, JOB_NAMES, redisConnection, upsertSyncScheduler } from '#services/queue_service'
import env from '#start/env'
import PricingService from '#services/pricing_service'
import SyncService from '#services/sync_service'
/**
 * Background job worker (pricing batches, ERP sync, per-product upserts).
 *
 * Run with `node ace queue:work`, as a process separate from the HTTP server.
 * The command registers the ERP sync scheduler, starts one BullMQ worker per
 * queue, and then waits until the application begins terminating, at which
 * point it closes the workers and the eBay scraper service.
 */
export default class QueueWork extends BaseCommand {
  static commandName = 'queue:work'
  static description = 'Khởi động worker xử lý queue (pricing, sync, import)'
  static options: CommandOptions = { startApp: true, staysAlive: true }

  /**
   * Starts all queue workers and blocks until shutdown.
   *
   * @returns A promise that resolves once the terminating hook has attempted
   *   to close every worker and the eBay scraper service (whether or not each
   *   individual close succeeded).
   */
  async run() {
    const concurrency = 5
    this.logger.info('Queue worker đang chạy...')

    // Register the ERP sync cron (noted as idempotent by the original author)
    // so the schedule is always active while a worker is running.
    const cronPattern = env.get('SYNC_CRON', '0 2 * * *')
    const cronTz = env.get('SYNC_TZ', 'Australia/Sydney')
    await upsertSyncScheduler(cronPattern, cronTz, 'cron')
    this.logger.info(`Cron sync ERP: "${cronPattern}" (tz: ${cronTz})`)

    // NOTE(review): in every processor below, a job whose name matches no
    // branch resolves `undefined`, which BullMQ records as a completed job.
    const pricingWorker = new Worker(
      QUEUE_NAMES.pricing,
      async (job) => {
        if (job.name === 'suggest') {
          const { productId, username } = job.data
          return PricingService.suggestForProduct(productId, username)
        }
        // Batch: 1 job = 1 batch of products = 1 combined GPT request
        // (see enqueuePricingBatch).
        if (job.name === 'suggestBatch') {
          const { productIds, username } = job.data
          return PricingService.suggestForProducts(productIds, username)
        }
      },
      { connection: redisConnection, concurrency }
    )

    // Orchestrator: scans the ERP, then fans out upsert jobs.
    // Concurrency is kept at 1 so only one sync runs at a time.
    const syncWorker = new Worker(
      QUEUE_NAMES.sync,
      async (job) => {
        if (job.name === JOB_NAMES.erpSync) {
          return SyncService.syncFromErp(job.data?.username)
        }
      },
      { connection: redisConnection, concurrency: 1 }
    )

    // Per-product upsert worker — runs in parallel to get through large volumes quickly.
    // Failed jobs are retried by BullMQ (attempts + backoff), so they are synced again later.
    const productWorker = new Worker(
      QUEUE_NAMES.product,
      async (job) => {
        if (job.name === JOB_NAMES.upsertProduct) {
          return SyncService.upsertProduct(job.data?.item)
        }
      },
      { connection: redisConnection, concurrency: 10 }
    )

    for (const [name, w] of [
      ['pricing', pricingWorker],
      ['sync', syncWorker],
      ['product', productWorker],
    ] as const) {
      w.on('completed', (job) => this.logger.info(`[${name}] job ${job.id} done`))
      w.on('failed', (job, err) => this.logger.error(`[${name}] job ${job?.id} failed: ${err.message}`))
      // BullMQ workers emit 'error' (e.g. on Redis connection problems). Without a
      // listener the event surfaces as an unhandled exception and the worker may
      // stop processing jobs, so always attach one.
      w.on('error', (err) => this.logger.error(`[${name}] worker error: ${err.message}`))
    }

    // Keep the process alive until a shutdown signal is received.
    await new Promise<void>((resolve) => {
      app.terminating(async () => {
        // Each resource is closed independently: one failing close (or a failing
        // dynamic import of the scraper service) must not prevent the others from
        // closing, nor leave `run()` pending forever.
        const targets = ['pricing worker', 'sync worker', 'product worker', 'ebay scraper'] as const
        try {
          const outcomes = await Promise.allSettled([
            pricingWorker.close(),
            syncWorker.close(),
            productWorker.close(),
            (async () => {
              const { default: EbayScraperService } = await import('#services/ebay_scraper_service')
              await EbayScraperService.close()
            })(),
          ])
          outcomes.forEach((outcome, index) => {
            if (outcome.status === 'rejected') {
              const reason: unknown = outcome.reason
              const detail = reason instanceof Error ? reason.message : String(reason)
              this.logger.error(`Failed to close ${targets[index]}: ${detail}`)
            }
          })
        } finally {
          resolve()
        }
      })
    })
  }
}