import net from 'node:net'
import fs from 'node:fs'
import { Server as SocketIOServer } from 'socket.io'
import http from 'node:http'
import LineConnection from '../app/services/line_connection.js'
import { ApplicationService } from '@adonisjs/core/types'
import env from '#start/env'
import { CustomServer, CustomSocket } from '../app/ultils/types.js'
import Line from '#models/line'
import Station from '#models/station'
import APCController from '#services/apc_connection'
import { appendLog, sleep } from '../app/ultils/helper.js'
import SwitchController from '#services/switch_connection'
import redis from '@adonisjs/redis/services/main'

/** Optional parameters forwarded to a line action by `handleLineOperation`. */
interface HandleOptions {
  command?: string
  actionApc?: string
  scenario?: any
  /** Idle time in milliseconds before the line connection is closed. */
  timeout?: number
  baud?: number
}

/** An operation executed against a line connection. */
type LineAction = (line: LineConnection, options?: HandleOptions) => Promise<unknown> | void

/** A frontend user taken from the Socket.IO handshake `auth` payload. */
interface ConnectedUser {
  userId: string | number
  userName: string
}

/** One port-level switch command: the log prefix and the controller call. */
interface SwitchPortOperation {
  label: string
  run: (port: string) => unknown
}

/**
 * AdonisJS provider that starts the Socket.IO server once the process is
 * ready and exposes the server instance through `SocketIoProvider.io`.
 */
export default class SocketIoProvider {
  private static _io: CustomServer

  constructor(protected app: ApplicationService) {}

  /**
   * Register bindings to the container
   */
  register() {}

  /**
   * The container bindings have booted
   */
  async boot() {}

  /**
   * The application has been booted
   */
  async start() {}

  /**
   * The process has been started
   */
  async ready() {
    // Only start the socket server when the entry script path contains
    // "server.js"; argv[1] can be missing, hence the optional chaining.
    if (process.argv[1]?.includes('server.js')) {
      const webSocket = new WebSocketIo(this.app)
      SocketIoProvider._io = await webSocket.boot()
    }
  }

  /**
   * Preparing to shutdown the app
   */
  async shutdown() {}

  public static get io() {
    return this._io
  }
}

/**
 * Socket.IO gateway between the frontend and the line / APC / switch
 * connections.
 *
 * NOTE(review): the generic type arguments on the maps below were
 * reconstructed from how the maps are used in this file — confirm against
 * the project's types.
 */
export class WebSocketIo {
  /** Idle-disconnect timers, keyed by line id or by `switch:<stationId>`. */
  intervalMap: { [key: string]: NodeJS.Timeout } = {}
  /** key = lineId */
  lineMap: Map<number, LineConnection> = new Map()
  /** key = userId from the handshake */
  userConnecting: Map<string | number, ConnectedUser> = new Map()
  /** key = APC ip */
  apcsControl: Map<string, APCController> = new Map()
  /** key = switch ip */
  switchControl: Map<string, SwitchController> = new Map()
  /** Ids of lines whose (re)connection is currently in flight. */
  lineConnecting: number[] = []
  /** Keep-alive timers for APC sessions, keyed by APC ip. */
  intervalKeepConnect: { [key: string]: NodeJS.Timeout } = {}

  constructor(protected app: ApplicationService) {}

  /**
   * Restores the persisted line state, creates the HTTP + Socket.IO server,
   * registers every socket event handler, starts listening and schedules the
   * periodic state snapshot.
   *
   * @returns the Socket.IO server instance
   */
  async boot() {
    const SOCKET_IO_PORT = env.get('SOCKET_PORT') || 8989
    const FRONTEND_URL = env.get('FRONTEND_URL') || 'http://localhost:5173'

    await this.restoreState()

    const socketServer = http.createServer()
    const io = new SocketIOServer(socketServer, {
      pingInterval: 25000, // server sends a ping every 25s
      pingTimeout: 20000, // disconnect when no pong arrives within 20s
      cors: {
        origin: [FRONTEND_URL],
        methods: ['GET', 'POST'],
        credentials: true,
      },
    })

    io.on('connection', (socket: CustomSocket) => {
      const { userId, userName } = socket.handshake.auth
      console.log('Socket connected:', socket.id)
      socket.connectionTime = new Date()
      this.userConnecting.set(userId, { userId, userName })

      // Broadcast the user list shortly after the connection is established.
      setTimeout(() => {
        const listUser = Array.from(this.userConnecting.values())
        if (!listUser.find((el) => el.userId === userId)) {
          listUser.push({ userId, userName })
        }
        io.emit('user_connecting', listUser)
      }, 500)

      // Send the current line configs to the newly connected client only.
      setTimeout(() => {
        io.to(socket.id).emit(
          'init',
          Array.from(this.lineMap.values()).map((el) => el?.config || {})
        )
      }, 500)

      socket.on('disconnect', () => {
        console.log(`FE disconnected: ${socket.id}`)
        this.userConnecting.delete(userId)

        // Release every CLI session that was held by the departing user.
        for (const line of this.lineMap.values()) {
          if (!line?.config) continue
          if (line.config.userOpenCLI === userName) this.releaseCli(io, line)
        }

        setTimeout(() => {
          io.emit('user_connecting', Array.from(this.userConnecting.values()))
        }, 200)
      })

      this.registerLineHandlers(io, socket, userName)
      this.registerCliHandlers(io, socket, userName)
      this.registerLogHandlers(io, socket)
      this.registerApcHandlers(io, socket)
      this.registerSwitchHandlers(io, socket)

      socket.on(
        'request_take_over',
        this.guarded('request_take_over', (data) => {
          io.emit('confirm_take_over', data)
        })
      )

      socket.on(
        'update_ticket',
        this.guarded('update_ticket', (data) => {
          io.emit('update_ticket', data)
        })
      )
    })

    socketServer.listen(SOCKET_IO_PORT, () => {
      console.log(`Socket server is running on port ${SOCKET_IO_PORT}`)
    })

    // Persist the line state every 60 seconds. A failed snapshot is logged
    // instead of surfacing as an unhandled promise rejection.
    setInterval(() => {
      this.saveState().catch((error) => console.log('[saveState]', error))
    }, 60000)

    return io
  }

  /**
   * Wraps a socket event handler so that a thrown error or rejected promise
   * is logged rather than escaping as an unhandled rejection.
   *
   * @param event event name, used only in the log message
   * @param handler the actual handler
   */
  private guarded(event: string, handler: (data: any) => Promise<void> | void) {
    return async (data: any): Promise<void> => {
      try {
        await handler(data)
      } catch (error) {
        console.log(`[socket] "${event}" handler failed:`, error)
      }
    }
  }

  /** Clears CLI ownership on a line and broadcasts `user_close_cli`. */
  private releaseCli(io: SocketIOServer, line: LineConnection) {
    line.config.openCLI = false
    line.config.userEmailOpenCLI = ''
    line.config.userOpenCLI = ''
    io.emit('user_close_cli', {
      stationId: line.config.stationId,
      lineId: line.config.id,
      userEmailOpenCLI: '',
    })
  }

  /**
   * Connects a line that is not present in `lineMap`.
   *
   * The line id is recorded in `lineConnecting` before the first await so two
   * concurrent requests cannot both start a connection, and it is always
   * removed again in `finally` so a failed attempt does not block later
   * operations on the same line.
   *
   * @returns the connection stored in `lineMap`, or `undefined` when a
   *          connection is already in flight or the line/station is unknown
   */
  private async connectLineOnDemand(
    io: SocketIOServer,
    lineId: number,
    stationId: number
  ): Promise<LineConnection | undefined> {
    if (this.lineConnecting.includes(lineId)) return undefined

    this.lineConnecting.push(lineId)
    try {
      const linesData = await Line.findBy('id', lineId)
      const stationData = await Station.findBy('id', stationId)
      if (!linesData || !stationData) return undefined

      await this.connectLine(io, [linesData], stationData)
      return this.lineMap.get(lineId)
    } finally {
      this.lineConnecting = this.lineConnecting.filter((el) => el !== lineId)
    }
  }

  /** Handlers for connecting lines, writing commands, scenarios and baud. */
  private registerLineHandlers(io: SocketIOServer, socket: CustomSocket, userName: string) {
    // Frontend requests to connect lines.
    socket.on(
      'connect_lines',
      this.guarded('connect_lines', async (data) => {
        const { stationData, linesData } = data
        await this.connectLine(io, linesData, stationData)
      })
    )

    socket.on(
      'write_command_line_from_web',
      this.guarded('write_command_line_from_web', async (data) => {
        const { lineIds, stationId, command } = data
        await this.handleLineOperation(
          io,
          stationId,
          lineIds,
          async (line) =>
            command === 'spam_break' ? line.breakSpam() : line.writeCommand(command, userName),
          { command, timeout: 120000 }
        )
      })
    )

    socket.on(
      'run_scenario',
      this.guarded('run_scenario', async (data) => {
        const lineId = data.id
        const scenario = data.scenario
        await this.handleLineOperation(
          io,
          data.stationId,
          [lineId],
          async (line) => line.runScript(scenario, userName),
          {
            scenario,
            timeout: scenario?.timeout ? Number(scenario.timeout) + 120000 : 300000,
          }
        )
      })
    )

    socket.on(
      'set_baud',
      this.guarded('set_baud', async (data) => {
        console.log('Set baud', data)
        const lineId = data.lineId
        const baud = data.baud
        const line = await Line.find(lineId)
        if (!line) {
          console.log(`Line [${lineId}] not found!!!`)
          return
        }
        Object.assign(line, { baud })
        // Awaited so a failed save is reported instead of being dropped.
        await line.save()
        await this.setBaudByLineNumber(data.stationId, line.lineNumber, baud)
      })
    )
  }

  /** Handlers for opening and closing a CLI session on a line. */
  private registerCliHandlers(io: SocketIOServer, socket: CustomSocket, userName: string) {
    socket.on(
      'open_cli',
      this.guarded('open_cli', async (data) => {
        const { lineId, userEmail, userName: name, stationId } = data
        const line = this.lineMap.get(lineId)

        if (!line) {
          const lineReconnect = await this.connectLineOnDemand(io, lineId, stationId)
          if (lineReconnect) lineReconnect.userOpenCLI({ userEmail, userName: name })
          return
        }

        // Entries restored by restoreState() are plain `{ config }` objects
        // without methods, so fall back to updating the config directly.
        if (line.userOpenCLI) {
          line.userOpenCLI({ userEmail, userName: name })
          return
        }

        line.config.openCLI = true
        line.config.userEmailOpenCLI = userEmail
        line.config.userOpenCLI = userName
        io.emit('user_open_cli', {
          stationId: line.config.stationId,
          lineId: line.config.id,
          userEmailOpenCLI: userEmail,
          userOpenCLI: userName,
        })
      })
    )

    socket.on(
      'close_cli',
      this.guarded('close_cli', async (data) => {
        const { lineId, stationId } = data
        const line = this.lineMap.get(lineId)

        if (!line) {
          const lineReconnect = await this.connectLineOnDemand(io, lineId, stationId)
          if (lineReconnect) lineReconnect.userCloseCLI()
          return
        }

        // Same fallback as open_cli for method-less restored entries.
        if (line.userCloseCLI) line.userCloseCLI()
        else this.releaseCli(io, line)
      })
    )
  }

  /** Handlers for listing system logs and streaming a log file's content. */
  private registerLogHandlers(io: SocketIOServer, socket: CustomSocket) {
    socket.on(
      'get_list_logs',
      this.guarded('get_list_logs', () => {
        const logDir = 'storage/system_logs'
        // A missing directory yields an empty list instead of throwing.
        const getListSystemLogs = fs.existsSync(logDir)
          ? fs.readdirSync(logDir).map((f) => 'storage/system_logs/' + f)
          : []
        io.to(socket.id).emit('list_logs', getListSystemLogs)
      })
    )

    socket.on(
      'get_content_log',
      this.guarded('get_content_log', (data) => {
        const { line, socketId } = data
        // NOTE(review): `line.systemLogUrl` comes from the client and is read
        // from disk as-is; consider restricting it to the log directory.
        const filePath = line.systemLogUrl

        if (!fs.existsSync(filePath)) {
          io.to(socketId).emit('response_content_log', Buffer.from('File not found', 'utf-8'))
          return
        }

        const fileSizeInBytes = fs.statSync(filePath).size
        if (fileSizeInBytes / 1024 / 1024 > 0.5) {
          // Files larger than 0.5 MB are sent in 64KB chunks.
          const fileId = Date.now() // file identifier
          const chunkSize = 64 * 1024
          const fileBuffer = fs.readFileSync(filePath)
          const totalChunks = Math.ceil(fileBuffer.length / chunkSize)
          for (let i = 0; i < totalChunks; i++) {
            const chunk = fileBuffer.subarray(i * chunkSize, (i + 1) * chunkSize)
            io.to(socketId).emit('response_content_log', {
              type: 'chunk',
              chunk: {
                fileId,
                chunkIndex: i,
                totalChunks,
                chunk,
              },
            })
          }
        } else {
          console.log(filePath)
          const content = fs.readFileSync(filePath)
          socket.emit('response_content_log', content)
        }
      })
    )
  }

  /** Handlers for connecting to an APC and controlling its outlets. */
  private registerApcHandlers(io: SocketIOServer, socket: CustomSocket) {
    socket.on(
      'control_apc',
      this.guarded('control_apc', async (data) => {
        const { outletNumbers, action, station, apcName } = data

        if (action === 'reconnect') {
          if (!station) return
          const apcIp = (station as any)[`${apcName}_ip`] as string
          const apc = this.apcsControl.get(apcIp)
          if (apc) {
            await apc.reconnect()
            this.keepConnectAPC(apcIp, io)
          } else await this.connectApc(io, apcName, station)
          return
        }

        if (!Array.isArray(outletNumbers)) return

        for (const outletNumber of outletNumbers) {
          if (!outletNumber || outletNumber < 0) return
          if (!station) return

          // Find the line attached to this outlet and extend its idle timer.
          const lines = await Line.query()
            .where('station_id', station.id)
            .andWhere('apc_name', apcName)
            .andWhere('outlet', outletNumber)
          if (lines.length > 0) {
            const line = this.lineMap.get(lines[0].id)
            if (line) this.setTimeoutConnect(lines[0].id, line, 300000)
          }

          const apcIp = (station as any)[`${apcName}_ip`] as string
          const apc = this.apcsControl.get(apcIp)
          if (!apc) continue

          if (apc.status !== 'CONNECTED') {
            await apc.reconnect()
            this.keepConnectAPC(apcIp, io)
          }

          switch (action) {
            case 'on':
              await apc.turnOnOutlet(outletNumber)
              break
            case 'off':
              await apc.turnOffOutlet(outletNumber)
              break
            case 'restart':
              await apc.restartOutlet(outletNumber)
              break
            default:
              break
          }

          setTimeout(() => {
            apc.navigateToOutlets()
          }, 10000)
        }
      })
    )

    socket.on(
      'connect_apc',
      this.guarded('connect_apc', async (data) => {
        const { apcIp, station, apcName } = data
        const apc = this.apcsControl.get(apcIp)

        if (!apc) {
          await this.connectApc(io, apcName, station)
          return
        }

        if (apc.status === 'CONNECTED') {
          socket.emit('apc_output', {
            stationId: station.id,
            apcNumber: apcName === 'apc_1' ? 1 : 2,
            data: apc.output,
            status: apc.status,
          })
          this.keepConnectAPC(apcIp, io)
        } else {
          await apc.reconnect()
          this.apcsControl.set(apcIp, apc)
          this.keepConnectAPC(apcIp, io)
        }
      })
    )
  }

  /** Handlers for connecting to a switch and controlling its ports. */
  private registerSwitchHandlers(io: SocketIOServer, socket: CustomSocket) {
    socket.on(
      'connect_switch',
      this.guarded('connect_switch', async (data) => {
        const { ip, station } = data
        const element = this.switchControl.get(ip)

        if (!element) {
          await this.connectSwitch(io, station)
          return
        }

        if (element.status === 'CONNECTED') {
          socket.emit('switch_output', {
            stationId: station.id,
            portGroups: element.portGroups,
            status: element.status,
          })
        } else {
          await element.reconnect()
        }
      })
    )

    socket.on(
      'control_switch',
      this.guarded('control_switch', async (data) => {
        const { ports, command, ip, station } = data

        if (!this.switchControl.get(ip)) {
          await this.connectSwitch(io, station)
        }
        const element = this.switchControl.get(ip)
        if (!element) return

        this.setTimeoutConnect(`switch:${station.id}`, element)

        if (command === 'reconnect') {
          await element.reconnect()
          return
        }

        const operations = new Map<string, SwitchPortOperation>([
          ['on', { label: 'Turn on port', run: (port) => element.turnPortOn(port) }],
          ['off', { label: 'Turn off port', run: (port) => element.turnPortOff(port) }],
          ['restart', { label: 'Restarting on port', run: (port) => element.restartPort(port) }],
          ['on-poe', { label: 'Turn on port poe', run: (port) => element.enablePoE(port) }],
          ['off-poe', { label: 'Turn off port poe', run: (port) => element.disablePoE(port) }],
          [
            'restart-poe',
            { label: 'Restarting on port poe', run: (port) => element.restartPoE(port) },
          ],
        ])

        const operation = operations.get(command)
        if (!operation || !Array.isArray(ports)) return

        // Ports are processed one at a time so the 500ms pause really
        // separates consecutive commands (an async forEach callback would
        // start them all at once and leave failures unhandled).
        for (const port of ports as string[]) {
          console.log(`${operation.label} ${port}`)
          await operation.run(port)
          await sleep(500)
        }
      })
    )
  }

  /**
   * Creates a `LineConnection` for each given line, stores it in `lineMap`,
   * connects it and arms its idle-disconnect timer. Errors are logged.
   *
   * @param socket emitter handed to the `LineConnection` (the io server here)
   * @param lines line models to connect
   * @param station station the lines belong to
   * @param output initial value for the connection's `output` config field
   * @param inventory initial value for the connection's `inventory` config field
   */
  private async connectLine(
    socket: any,
    lines: Line[],
    station: Station,
    output = '',
    inventory: string = ''
  ) {
    try {
      for (const line of lines) {
        const lineConn = new LineConnection(
          {
            id: line.id,
            port: line.port,
            ip: station.ip,
            lineNumber: line.lineNumber,
            stationId: station.id,
            apcName: line.apcName,
            outlet: line.outlet,
            output: output,
            status: '',
            openCLI: false,
            userEmailOpenCLI: '',
            userOpenCLI: '',
            data: [],
            commands: [],
            inventory: inventory,
          },
          socket
        )

        // Step 1: clear the line on the station before connecting.
        if (line.lineClear && line.lineClear > 0)
          await this.clearLineBeforeConnect(station.id, line.lineClear)

        this.lineMap.set(line.id, lineConn)
        await lineConn.connect()
        lineConn.writeCommand('\r\n\r\n')
        this.setTimeoutConnect(line.id, lineConn)
      }
    } catch (error) {
      console.log(error)
    }
  }

  /**
   * (Re)arms the one-shot idle timer for a connection. When it fires, the
   * connection's `disconnect()` is called (if it has one) and the timer entry
   * is removed.
   *
   * @param lineId timer key: a line id, or a `switch:<stationId>` string for
   *               switch controllers so the two id spaces cannot collide
   * @param lineConn connection to disconnect on expiry
   * @param timeout idle time in milliseconds
   */
  private setTimeoutConnect = (
    lineId: number | string,
    lineConn: LineConnection | SwitchController,
    timeout = 120000
  ) => {
    const key = `${lineId}`

    if (this.intervalMap[key]) {
      clearTimeout(this.intervalMap[key])
      delete this.intervalMap[key]
    }

    this.intervalMap[key] = setTimeout(() => {
      delete this.intervalMap[key]
      if (lineConn.disconnect) lineConn.disconnect()
    }, timeout)
  }

  /**
   * Shared driver for every line action (write command, run script, ...).
   *
   * For each line id: if the line is connected, its idle timer is re-armed
   * and the action runs; otherwise the line is reconnected first (keeping its
   * previous output and inventory). When the line or station cannot be
   * loaded, `line_disconnected` and `line_error` are emitted. Errors for one
   * line are reported through `line_error` and do not stop the others.
   */
  async handleLineOperation(
    io: CustomServer,
    stationId: number,
    lineIds: number[],
    action: LineAction,
    options: HandleOptions = {}
  ): Promise<void> {
    for (const lineId of lineIds) {
      try {
        const line = this.lineMap.get(lineId)

        if (line && line.config.status === 'connected') {
          this.lineConnecting = this.lineConnecting.filter((el) => el !== lineId)
          this.setTimeoutConnect(lineId, line, options.timeout)
          await sleep(500)
          await action(line, options)
          continue
        }

        // A connection attempt for this line is already in flight.
        if (this.lineConnecting.includes(lineId)) continue

        const linesData = await Line.findBy('id', lineId)
        const stationData = await Station.findBy('id', stationId)

        if (!linesData || !stationData) {
          io.emit('line_disconnected', {
            stationId,
            lineId,
            status: 'disconnected',
          })
          io.emit('line_error', { lineId, error: 'Line not connected\r\n', stationId })
          continue
        }

        this.lineConnecting.push(lineId)
        await this.connectLine(
          io,
          [linesData],
          stationData,
          line?.config?.output || '',
          line?.config?.inventory || ''
        )
        this.lineConnecting = this.lineConnecting.filter((el) => el !== lineId)

        const lineReconnect = this.lineMap.get(lineId)
        if (lineReconnect) {
          this.setTimeoutConnect(lineId, lineReconnect, options.timeout)
          await sleep(500)
          await action(lineReconnect, options)
        }
      } catch (err: any) {
        this.lineConnecting = this.lineConnecting.filter((el) => el !== lineId)
        io.emit('line_error', { lineId, error: `\n[ERROR] ${err.message}\n`, stationId })
      }
    }
  }

  /**
   * Creates an `APCController` from the station's `<apcName>_*` fields,
   * connects and logs in, stores it in `apcsControl` and starts its
   * keep-alive timer. Errors (including missing configuration) are logged.
   */
  private async connectApc(socket: any, apcName: string, station: Station) {
    try {
      const ip = (station as any)[`${apcName}_ip`] as string
      const port = (station as any)[`${apcName}_port`] as number
      const username = (station as any)[`${apcName}_username`] as string
      const password = (station as any)[`${apcName}_password`] as string

      if (!ip || !port || !username || !password)
        throw new Error(`Missing APC configuration for ${apcName}`)

      // Create the APC controller instance.
      const apc = new APCController({
        host: ip,
        port,
        username,
        password,
        number: apcName === 'apc_1' ? 1 : 2,
        onData: (data: string, status: string) => {
          socket.emit('apc_output', {
            stationId: station.id,
            apcNumber: apcName === 'apc_1' ? 1 : 2,
            data,
            status,
          })
        },
      })

      // Connect and log in.
      await apc.connect()
      await apc.login()
      this.apcsControl.set(ip, apc)
      this.keepConnectAPC(ip, socket)
    } catch (error) {
      console.log(error)
    }
  }

  /**
   * Creates a `SwitchController` from the station's `switch_control_*`
   * fields, connects, logs in, fetches the ports, stores the controller in
   * `switchControl` and arms its idle timer. When ip, port or password is
   * missing, a `DISCONNECTED` `switch_output` event is emitted instead.
   */
  private async connectSwitch(socket: any, station: Station) {
    try {
      const ip = station.switch_control_ip as string
      const port = station.switch_control_port as number
      const username = station.switch_control_username as string
      const password = station.switch_control_password as string

      if (!ip || !port || !password) {
        socket.emit('switch_output', {
          stationId: station.id,
          portGroups: [],
          status: 'DISCONNECTED',
          message: `Missing Switch configuration`,
        })
        return
      }

      // Create the switch controller instance.
      const infoSwitch = new SwitchController({
        host: ip,
        port: port,
        username: username,
        password: password,
        onData: (ports?: any, status?: string) => {
          socket.emit('switch_output', {
            stationId: station.id,
            portGroups: ports,
            status,
          })
        },
      })

      // Connect and log in.
      await infoSwitch.connect()
      await infoSwitch.login()
      await infoSwitch.getPorts()
      this.switchControl.set(ip, infoSwitch)
      this.setTimeoutConnect(`switch:${station.id}`, infoSwitch)
    } catch (error) {
      console.log(error)
    }
  }

  /**
   * Opens a raw TCP connection to the station and sends `clear line <n>`.
   * The returned promise always resolves (never rejects): on a response
   * containing "Clear completed" or "OK", on error, on close, or after the
   * 5 second socket timeout.
   */
  private async clearLineBeforeConnect(stationId: number, clearLine: number) {
    const station = await Station.find(stationId)
    if (!station) {
      console.log('[ERROR connect station] Not found!')
      return
    }

    // Connect to the station over Telnet / raw socket.
    const client = new net.Socket()

    return new Promise<void>((resolve) => {
      client.setTimeout(5000)

      client.connect(station.port, station.ip, async () => {
        console.log(
          `Connected to station ${station.name} (${station.ip}) to clear line ${clearLine}`
        )
        // Send the clear line command.
        client.write(`clear line ${clearLine}\r\n`)
        await sleep(500)
        // The data handler may already have destroyed the socket by now.
        if (!client.destroyed) client.write(`\r\n\r\n`)
      })

      client.on('data', (data) => {
        const output = data.toString()
        if (output.includes('Clear completed') || output.includes('OK')) {
          console.log(`Line ${clearLine} cleared successfully.`)
          client.destroy()
          resolve()
        }
      })

      client.on('error', (err) => {
        console.error(`Error clearing line ${clearLine}:`, err)
        resolve()
      })

      client.on('close', () => {
        console.log(`Station connection closed (line ${clearLine})`)
        resolve()
      })

      client.on('timeout', () => {
        console.log(`Station connection timeout (line ${clearLine})`)
        client.destroy()
        resolve()
      })
    })
  }

  /**
   * Stores the config of every line under the Redis key `socket_state`, with
   * each status forced to `disconnected`. Only the `config` object of each
   * connection is saved.
   */
  async saveState() {
    const entries: [number, { config: unknown }][] = []
    this.lineMap.forEach((line, id) => {
      if (line && line.config) {
        entries.push([id, { config: { ...line.config, status: 'disconnected' } }])
      }
    })
    await redis.set('socket_state', JSON.stringify({ lineMap: entries }))
  }

  /**
   * Rebuilds `lineMap` from the Redis key `socket_state`. The restored
   * entries are plain `{ config }` objects, not live `LineConnection`
   * instances. A missing or unreadable snapshot leaves `lineMap` untouched.
   */
  async restoreState() {
    const raw = await redis.get('socket_state')
    if (!raw) return

    try {
      const parsed = JSON.parse(raw)
      if (Array.isArray(parsed?.lineMap)) {
        this.lineMap = new Map(parsed.lineMap)
      }
    } catch (error) {
      // A corrupt snapshot must not prevent the server from booting.
      console.log('[restoreState] Invalid socket_state snapshot:', error)
    }
  }

  /**
   * (Re)starts the keep-alive timer for an APC: every 40 seconds, if the APC
   * is still `CONNECTED`, `_send('ENTER')` is called on it.
   *
   * @param ip APC ip, used as the timer key
   * @param io unused; kept for call-site compatibility
   */
  private keepConnectAPC = (ip: string, io: any) => {
    if (this.intervalKeepConnect[`${ip}`]) {
      clearInterval(this.intervalKeepConnect[`${ip}`])
      delete this.intervalKeepConnect[`${ip}`]
    }

    const interval = setInterval(() => {
      const apcConnect = this.apcsControl.get(ip)
      if (apcConnect && apcConnect.status === 'CONNECTED') {
        apcConnect._send('ENTER')
      }
    }, 40000)

    this.intervalKeepConnect[`${ip}`] = interval
  }

  /**
   * Opens a raw TCP connection to the station and sends the command sequence
   * `conf t` / `line <n>` / `speed <baud>` / `end`, pausing 500ms between
   * writes. Received data is passed to `appendLog`. The returned promise
   * always resolves (never rejects).
   */
  private async setBaudByLineNumber(stationId: number, lineNumber: number, baud: number) {
    const station = await Station.find(stationId)
    if (!station) {
      console.log('[ERROR connect station] Not found!')
      return
    }

    // Connect to the station over Telnet / raw socket.
    const client = new net.Socket()

    return new Promise<void>((resolve) => {
      client.setTimeout(5000)

      client.connect(station.port, station.ip, async () => {
        console.log(`Connected to station ${station.name} (${station.ip})`)
        // Send the baud-rate configuration commands.
        client.write(`conf t\r\n`)
        await sleep(500)
        client.write(`line ${lineNumber}\r\n`)
        await sleep(500)
        client.write(`speed ${baud.toString()}\r\n`)
        await sleep(500)
        client.write(`end`)
        await sleep(500)
        client.write(`\r\n`)
        await sleep(500)
        client.destroy()
        resolve()
      })

      client.on('data', (data) => {
        appendLog(data.toString(), 0, 0, lineNumber)
      })

      client.on('error', (err) => {
        console.error(`Error setting baud on line ${lineNumber}:`, err)
        resolve()
      })

      client.on('close', () => {
        console.log(`Station connection closed (line ${lineNumber})`)
        resolve()
      })

      client.on('timeout', () => {
        console.log(`Station connection timeout (line ${lineNumber})`)
        client.destroy()
        resolve()
      })
    })
  }
}