message-robot/queue/queue.js

79 lines
2.2 KiB
JavaScript
Executable File

const {exec} = require("shelljs");
const amqplib = require("amqplib");
const io = require("socket.io-client");
const CONNECT_SOCKET_STR = "ws://socketio:3000";
const CONNECT_RABBIT_STR = "amqp://rabbit:rabbit123@rabbitmq";
const QUEUE = "browser";
const QUEUE_NOTIFY = "browser-notify";
async function processMessage(channel, message, processCallback) {
const timestamp = Date.now();
const response = JSON.parse(message.content.toString());
const socketioStart = io.connect(CONNECT_SOCKET_STR, {
transports: ["websocket"],
forceNew: true,
});
await new Promise((resolve) => {
socketioStart.on("connect", () => {
console.log("Socket.IO start was connected");
resolve();
});
});
console.log("[x] RECEIVED", response);
socketioStart.emit("message", {
to: QUEUE_NOTIFY,
payload: {
browser_name: process.env.BROWSER_NAME,
id: timestamp,
time_process: Date.now(),
finished: false,
},
}).close();
// TODO run script
if (typeof processCallback === 'function') {
await processCallback()
} else {
throw new Error('processCallback is not function')
}
// !TODO
const socketioEnd = io.connect(CONNECT_SOCKET_STR, {
transports: ["websocket"],
forceNew: true,
});
await new Promise((resolve) => {
socketioEnd.on("connect", () => {
console.log("Socket.IO end was connected");
resolve();
});
});
socketioEnd.emit("message", {
to: QUEUE_NOTIFY,
payload: {
browser_name: process.env.BROWSER_NAME,
id: timestamp,
time_process: Date.now(),
finished: true,
},
}).close();
// wait see result finished 10s
await new Promise(resolve => setTimeout(resolve, 10000));
channel.ack(message);
}
module.exports = async function(processCallback) {
const conn = await amqplib.connect(CONNECT_RABBIT_STR);
const channelProcess = await conn.createChannel();
console.log(`[${process.env.BROWSER_NAME}] Running...`);
await channelProcess.assertQueue(QUEUE);
await channelProcess.prefetch(1);
await channelProcess.consume(QUEUE,
async (message) => {
await processMessage(channelProcess, message, processCallback);
},
{noAck: false}
);
}