const amqplib = require("amqplib"); const connectStr = "amqp://rabbit:rabbit123@localhost"; (async () => { const conn = await amqplib.connect(connectStr); const channel = await conn.createChannel(); const queue = "process1"; process.once("SIGINT", async () => { await channel.close(); await connection.close(); }); await channel.assertQueue(queue); await channel.prefetch(1); await channel.consume( queue, async (message) => { try { await processMessage(channel, message); } catch (error) { console.error("Error processing message:", error.message); // If processing fails, you can choose to requeue or acknowledge the message // Here, we're acknowledging the message even if an error occurs to remove it from the queue channel.ack(message); } }, {noAck: false} ); console.log("[*] Waiting for messages. To exit press CTRL+C"); })(); async function processMessage(channel, message) { try { const response = JSON.parse(message.content.toString()) console.log("[x] Received", response); await require('./puppeteer')({}) console.log("[x] Done"); // After successful processing, acknowledge the message to delete it from the queue channel.ack(message); } catch (error) { throw new Error("Error during processing: " + error.message); } }