ATC_SIMPLE/BACKEND/providers/socket_io_provider.ts

444 lines
14 KiB
TypeScript

import net from 'node:net'
import fs from 'node:fs'
import { Server as SocketIOServer } from 'socket.io'
import http from 'node:http'
import LineConnection from '../app/services/line_connection.js'
import { ApplicationService } from '@adonisjs/core/types'
import env from '#start/env'
import { CustomServer, CustomSocket } from '../app/ultils/types.js'
import Line from '#models/line'
import Station from '#models/station'
import APCController from '#services/apc_connection'
import { sleep } from '../app/ultils/helper.js'
interface HandleOptions {
command?: string
actionApc?: string
scenario?: any
timeout?: number
}
type LineAction = (line: LineConnection, options?: HandleOptions) => Promise<void | unknown> | void
export default class SocketIoProvider {
private static _io: CustomServer
constructor(protected app: ApplicationService) {}
/**
* Register bindings to the container
*/
register() {}
/**
* The container bindings have booted
*/
async boot() {}
/**
* The application has been booted
*/
async start() {}
/**
* The process has been started
*/
async ready() {
if (process.argv[1].includes('server.js')) {
const webSocket = new WebSocketIo(this.app)
SocketIoProvider._io = await webSocket.boot()
}
}
/**
* Preparing to shutdown the app
*/
async shutdown() {}
public static get io() {
return this._io
}
}
export class WebSocketIo {
intervalMap: { [key: string]: NodeJS.Timeout } = {}
stationMap: Map<number, Station> = new Map()
lineMap: Map<number, LineConnection> = new Map() // key = lineId
lineConnecting: number[] = [] // key = lineId
userConnecting: Map<number, { userId: number; userName: string }> = new Map()
apcsControl: Map<string, APCController> = new Map()
constructor(protected app: ApplicationService) {}
async boot() {
const SOCKET_IO_PORT = env.get('SOCKET_PORT') || 8989
const FRONTEND_URL = env.get('FRONTEND_URL') || 'http://localhost:5173'
const socketServer = http.createServer()
const io = new SocketIOServer(socketServer, {
pingInterval: 25000, // 25s server gửi ping
pingTimeout: 20000, // chờ 20s không có pong thì disconnect
cors: {
origin: [FRONTEND_URL],
methods: ['GET', 'POST'],
credentials: true,
},
})
io.on('connection', (socket: CustomSocket) => {
const { userId, userName } = socket.handshake.auth
console.log('Socket connected:', socket.id)
socket.connectionTime = new Date()
this.userConnecting.set(userId, { userId, userName })
setTimeout(() => {
const listUser = Array.from(this.userConnecting.values())
if (!listUser.find((el) => el.userId === userId)) {
listUser.push({ userId, userName })
}
io.emit('user_connecting', listUser)
}, 500)
setTimeout(() => {
io.to(socket.id).emit(
'init',
Array.from(this.lineMap.values()).map((el) => el.config)
)
}, 500)
socket.on('disconnect', () => {
console.log(`FE disconnected: ${socket.id}`)
this.userConnecting.delete(userId)
setTimeout(() => {
io.emit('user_connecting', Array.from(this.userConnecting.values()))
}, 200)
})
// FE gửi yêu cầu connect lines
socket.on('connect_lines', async (data) => {
const { stationData, linesData } = data
await this.connectLine(io, linesData, stationData)
})
socket.on('write_command_line_from_web', async (data) => {
const { lineIds, stationId, command } = data
await this.handleLineOperation(
io,
stationId,
lineIds,
async (line) => line.writeCommand(command),
{ command, timeout: 120000 }
)
})
socket.on('run_scenario', async (data) => {
const lineId = data.id
const scenario = data.scenario
await this.handleLineOperation(
io,
data.stationId,
[lineId],
async (line) => line.runScript(scenario),
{
scenario,
timeout: scenario?.timeout ? Number(scenario.timeout) + 120000 : 300000,
}
)
})
socket.on('open_cli', async (data) => {
const { lineId, userEmail, userName: name, stationId } = data
const line = this.lineMap.get(lineId)
if (line) {
line.userOpenCLI({ userEmail, userName: name })
} else {
if (this.lineConnecting.includes(lineId)) return
const linesData = await Line.findBy('id', lineId)
const stationData = await Station.findBy('id', stationId)
if (linesData && stationData) {
this.lineConnecting.push(lineId)
await this.connectLine(io, [linesData], stationData)
const lineReconnect = this.lineMap.get(lineId)
if (lineReconnect) {
lineReconnect.userOpenCLI({ userEmail, userName: name })
}
}
}
})
socket.on('close_cli', async (data) => {
const { lineId, stationId } = data
const line = this.lineMap.get(lineId)
if (line) {
line.userCloseCLI()
} else {
if (this.lineConnecting.includes(lineId)) return
const linesData = await Line.findBy('id', lineId)
const stationData = await Station.findBy('id', stationId)
if (linesData && stationData) {
this.lineConnecting.push(lineId)
await this.connectLine(io, [linesData], stationData)
const lineReconnect = this.lineMap.get(lineId)
if (lineReconnect) {
lineReconnect.userCloseCLI()
}
}
}
})
socket.on('request_take_over', async (data) => {
io.emit('confirm_take_over', data)
})
socket.on('get_list_logs', async () => {
let getListSystemLogs = fs
.readdirSync('storage/system_logs')
.map((f) => 'storage/system_logs/' + f)
io.to(socket.id).emit('list_logs', getListSystemLogs)
})
socket.on('get_content_log', async (data) => {
try {
const { line, socketId } = data
const filePath = line.systemLogUrl
if (fs.existsSync(filePath)) {
// Get file stats
const stats = fs.statSync(filePath)
const fileSizeInBytes = stats.size
if (fileSizeInBytes / 1024 / 1024 > 0.5) {
// File is larger than 0.5 MB
const fileId = Date.now() // Mã định danh file
const chunkSize = 64 * 1024 // 64KB
const fileBuffer = fs.readFileSync(filePath)
const totalChunks = Math.ceil(fileBuffer.length / chunkSize)
for (let i = 0; i < totalChunks; i++) {
const chunk = fileBuffer.slice(i * chunkSize, (i + 1) * chunkSize)
io.to(socketId).emit('response_content_log', {
type: 'chunk',
chunk: {
fileId,
chunkIndex: i,
totalChunks,
chunk,
},
})
}
} else {
console.log(filePath)
const content = fs.readFileSync(filePath)
socket.emit('response_content_log', content)
}
} else {
io.to(socketId).emit('response_content_log', Buffer.from('File not found', 'utf-8'))
}
} catch (error) {
console.log(error)
}
})
socket.on('control_apc', async (data) => {
const { lineIds, stationId, action } = data
await this.handleLineOperation(
io,
stationId,
lineIds,
async (line) => line.apcControl(action),
{ actionApc: action, timeout: 120000 }
)
})
socket.on('connect_apc', async (data) => {
const { apcIp, station, apcName } = data
const apc = this.apcsControl.get(apcIp)
if (apc && apc.status === 'CONNECTED') {
return
} else if (apc && apc.status !== 'CONNECTED') {
await apc.reconnect()
} else await this.connectApc(io, apcName, station)
})
})
socketServer.listen(SOCKET_IO_PORT, () => {
console.log(`Socket server is running on port ${SOCKET_IO_PORT}`)
})
return io
}
private async connectLine(socket: any, lines: Line[], station: Station) {
try {
this.stationMap.set(station.id, station)
for (const line of lines) {
const lineConn = new LineConnection(
{
id: line.id,
port: line.port,
ip: station.ip,
lineNumber: line.lineNumber,
stationId: station.id,
apcName: line.apcName,
outlet: line.outlet,
output: '',
status: '',
openCLI: false,
userEmailOpenCLI: '',
userOpenCLI: '',
data: [],
},
socket
)
// 👉 Bước 1: clear line trước khi connect
if (line.lineClear && line.lineClear > 0)
await this.clearLineBeforeConnect(station.id, line.lineClear)
this.lineMap.set(line.id, lineConn)
await lineConn.connect()
lineConn.writeCommand('\r\n\r\n')
this.setTimeoutConnect(line.id, lineConn)
}
} catch (error) {
console.log(error)
}
}
private setTimeoutConnect = (lineId: number, lineConn: LineConnection, timeout = 120000) => {
if (this.intervalMap[`${lineId}`]) {
clearInterval(this.intervalMap[`${lineId}`])
delete this.intervalMap[`${lineId}`]
}
const interval = setInterval(() => {
lineConn.disconnect()
// this.lineMap.delete(lineId)
if (this.intervalMap[`${lineId}`]) {
clearInterval(this.intervalMap[`${lineId}`])
delete this.intervalMap[`${lineId}`]
}
}, timeout)
this.intervalMap[`${lineId}`] = interval
}
/**
* Hàm xử lý chung cho mọi action (writeCommand, runScript, v.v.)
*/
async handleLineOperation(
io: CustomServer,
stationId: number,
lineIds: number[],
action: LineAction,
options: HandleOptions = {}
): Promise<void> {
for (const lineId of lineIds) {
try {
const line = this.lineMap.get(lineId)
if (line && line.config.status === 'connected') {
this.lineConnecting = this.lineConnecting.filter((el) => el !== lineId)
this.setTimeoutConnect(lineId, line, options.timeout)
await action(line, options)
} else {
if (this.lineConnecting.includes(lineId)) continue
const linesData = await Line.findBy('id', lineId)
const stationData = await Station.findBy('id', stationId)
if (linesData && stationData) {
this.lineConnecting.push(lineId)
await this.connectLine(io, [linesData], stationData)
this.lineConnecting = this.lineConnecting.filter((el) => el !== lineId)
const lineReconnect = this.lineMap.get(lineId)
if (lineReconnect) {
this.setTimeoutConnect(lineId, lineReconnect, options.timeout)
await action(lineReconnect, options)
}
} else {
io.emit('line_disconnected', {
stationId,
lineId,
status: 'disconnected',
})
io.emit('line_error', { lineId, error: 'Line not connected\r\n', stationId })
}
}
} catch (err: any) {
io.emit('line_error', { lineId, error: `[ERROR] ${err.message}\r\n`, stationId })
}
}
}
private async connectApc(socket: any, apcName: string, station: Station) {
try {
const ip = (station as any)[`${apcName}_ip`] as string
const port = (station as any)[`${apcName}_port`] as number
const username = (station as any)[`${apcName}_username`] as string
const password = (station as any)[`${apcName}_password`] as string
if (!ip || !port || !username || !password)
throw new Error(`Missing APC configuration for ${apcName}`)
// Tạo APC Controller instance
const apc = new APCController({
host: ip,
port,
username,
password,
number: apcName === 'apc_1' ? 1 : 2,
onData: (data: string, status: string) => {
socket.emit('apc_output', {
stationId: station.id,
apcNumber: apcName === 'apc_1' ? 1 : 2,
data,
status,
})
},
})
// Connect và login
await apc.connect()
await apc.login()
this.apcsControl.set(ip, apc)
} catch (error) {
console.log(error)
}
}
private async clearLineBeforeConnect(stationId: number, clearLine: number) {
const station = await Station.find(stationId)
if (!station) throw new Error(`Station ${stationId} not found`)
// Kết nối tới station qua Telnet / Socket
const client = new net.Socket()
return new Promise<void>((resolve, reject) => {
client.connect(station.port, station.ip, async () => {
console.log(
`Connected to station ${station.name} (${station.ip}) to clear line ${clearLine}`
)
// Gửi lệnh clear line
client.write(`clear line ${clearLine}\r\n`)
await sleep(500)
client.write(`\r\n\r\n`)
})
client.on('data', (data) => {
const output = data.toString()
if (output.includes('Clear completed') || output.includes('OK')) {
console.log(`Line ${clearLine} cleared successfully.`)
client.destroy()
resolve()
}
})
client.on('error', (err) => {
console.error(`Error clearing line ${clearLine}:`, err)
reject(err)
})
client.on('close', () => {
console.log(`Station connection closed (line ${clearLine})`)
})
})
}
}