laravel-rabbitmq/consumer/consumer.js

48 lines
1.3 KiB
JavaScript

const amqplib = require("amqplib");
const connectStr = "amqp://rabbit:rabbit123@rabbitmq";
(async () => {
const conn = await amqplib.connect(connectStr);
const channel = await conn.createChannel();
const queue = "process1";
process.once("SIGINT", async () => {
await channel.close();
await connection.close();
});
await channel.assertQueue(queue);
await channel.prefetch(1);
await channel.consume(
queue,
async (message) => {
try {
await processMessage(channel, message);
} catch (error) {
console.error("Error processing message:", error.message);
// If processing fails, you can choose to requeue or acknowledge the message
// Here, we're acknowledging the message even if an error occurs to remove it from the queue
channel.ack(message);
}
},
{noAck: false}
);
console.log("[*] Waiting for messages. To exit press CTRL+C");
})();
async function processMessage(channel, message) {
try {
const response = JSON.parse(message.content.toString())
console.log("[x] Received", response);
await require('./puppeteer')({})
console.log("[x] Done");
// After successful processing, acknowledge the message to delete it from the queue
channel.ack(message);
} catch (error) {
throw new Error("Error during processing: " + error.message);
}
}