const {exec} = require("shelljs"); const amqplib = require("amqplib"); const io = require("socket.io-client"); const CONNECT_SOCKET_STR = "ws://socketio:3000"; const CONNECT_RABBIT_STR = "amqp://rabbit:rabbit123@rabbitmq"; const QUEUE = "browser"; const QUEUE_NOTIFY = "browser-notify"; async function processMessage(channel, message, processCallback) { const timestamp = Date.now(); const response = JSON.parse(message.content.toString()); const socketioStart = io.connect(CONNECT_SOCKET_STR, { transports: ["websocket"], forceNew: true, }); await new Promise((resolve) => { socketioStart.on("connect", () => { console.log("Socket.IO start was connected"); resolve(); }); }); console.log("[x] RECEIVED", response); socketioStart.emit("message", { to: QUEUE_NOTIFY, payload: { browser_name: process.env.BROWSER_NAME, id: timestamp, time_process: Date.now(), finished: false, }, }).close(); // TODO run script if (typeof processCallback === 'function') { await processCallback() } else { throw new Error('processCallback is not function') } // !TODO const socketioEnd = io.connect(CONNECT_SOCKET_STR, { transports: ["websocket"], forceNew: true, }); await new Promise((resolve) => { socketioEnd.on("connect", () => { console.log("Socket.IO end was connected"); resolve(); }); }); socketioEnd.emit("message", { to: QUEUE_NOTIFY, payload: { browser_name: process.env.BROWSER_NAME, id: timestamp, time_process: Date.now(), finished: true, }, }).close(); // wait see result finished 10s await new Promise(resolve => setTimeout(resolve, 10000)); channel.ack(message); } module.exports = async function(processCallback) { const conn = await amqplib.connect(CONNECT_RABBIT_STR); const channelProcess = await conn.createChannel(); console.log(`[${process.env.BROWSER_NAME}] Running...`); await channelProcess.assertQueue(QUEUE); await channelProcess.prefetch(1); await channelProcess.consume(QUEUE, async (message) => { await processMessage(channelProcess, message, processCallback); }, {noAck: false} ); }