// ATC_SIMPLE/BACKEND/app/services/line_connection.ts
//
// Source-viewer metadata for this listing: 819 lines, 24 KiB, TypeScript.
//
// Viewer warning: this file contains ambiguous Unicode characters, i.e. characters that might be
// confused with other characters. If this is intentional, the warning can safely be ignored.

import fs from 'node:fs'
import { textfsmResults } from './../ultils/templates/index.js'
import net from 'node:net'
import {
appendLog,
cleanData,
getLogWithTimeScenario,
isValidJson,
mapToLineFormat,
sleep,
} from '../ultils/helper.js'
import Scenario from '#models/scenario'
import Station from '#models/station'
import APCController from './apc_connection.js'
import path from 'node:path'
import axios from 'axios'
import redis from '@adonisjs/redis/services/main'
import Line from '#models/line'
/**
 * Inventory record with product id, version id, serial number and license fields.
 *
 * NOTE(review): this alias is not referenced anywhere in the visible part of this file;
 * `LineConfig.inventory` is declared as `string` instead. Presumably this is the shape of
 * the parsed `show inventory` record — confirm and wire it up, or remove it.
 */
type Inventory = {
pid: string
vid: string
sn: string
licenseLevel: string
licenseType: string
nextLicenseLevel: string
}
/**
 * Settings plus mutable runtime state for one line.
 *
 * `LineConnection` keeps the object it is given by reference and updates several fields in
 * place (`output`, `status`, `inventory`, `latestScenario`, `data`, the `openCLI` fields).
 */
interface LineConfig {
// Sent as `lineId` in socket.io payloads and passed to `Line.find` when saving history.
id: number
// TCP port the socket connects to (together with `ip`).
port: number
// Used in console/log messages and in the log file name.
lineNumber: number
// Host the socket connects to.
ip: string
// Station identity; passed to `appendLog` and used in the log file name.
stationId: number
stationName: string
stationIp: string
// Prefix of the station properties read by `apcControl` (`<apcName>_ip`, `_port`,
// `_username`, `_password`); `apcControl` falls back to 'apc_1' when unset.
apcName?: string
// Outlet passed to the APC controller's on/off/restart calls.
outlet: number
// Rolling terminal output; the data handler keeps only the last 15000 characters.
output: string
// Set to 'connected' / 'disconnected' by this class.
status: string
// NOTE(review): not read anywhere in the visible code (`setBaud` takes its own argument).
baud: number
// CLI ownership flags, set by `userOpenCLI` and cleared by `userCloseCLI`.
openCLI: boolean
userEmailOpenCLI: string
userOpenCLI: string
// NOTE(review): declared as string, but the visible code assigns the first parsed textfsm
// record (`JSON.parse(item.textfsm)[0]`) here — confirm the intended type.
inventory: string
// Last scenario started by `runScript`; `detectAI` is filled in once the scenario finishes.
latestScenario?: {
name: string
time: number
detectAI?: {
status: string[]
issue: string[]
}
}
// Result of `textfsmResults` for the most recent scenario / inventory parse.
data: {
command: string
output: string
textfsm: string
}[]
// Sent along with every `line_output` event; never modified in the visible code.
commands: string[]
// history: string
}
/**
 * One history entry (PID, SN, VID, timestamp, scenario) handed to `addHistory`.
 *
 * `runScript` builds it from the first parsed `show inventory` record together with the
 * scenario title and the line's id / number / station id. `addHistory` overwrites
 * `timestamp` with its own `Date.now()` before storing the entry.
 */
interface HistoryItem {
pid: string
vid: string
sn: string
// Title of the scenario that produced the inventory record.
scenario: string
// Line id (`config.id`).
id: number
// Line number (`config.lineNumber`).
number: number
stationId: number
timestamp?: number
}
/** Identity of the user taking over a line's CLI; consumed by `userOpenCLI`. */
interface User {
userEmail: string
userName: string
}
/**
 * Declared shape of `LineConnection.dataDPELP` when it holds a result rather than a string.
 *
 * NOTE(review): `runScript` stores the return value of `mapToLineFormat` in `dataDPELP`, so
 * this presumably mirrors that helper's output — confirm against the helper.
 */
interface DataDPELP {
line: number
pid: any
vid: any
sn: any
mode: string
mac: string
license: any
issues: string[]
}
export default class LineConnection {
/** Socket for this line; `reconnect()` replaces it with a new instance. */
public client: net.Socket
/** Line settings and runtime state; kept by reference and mutated in place. */
public config: LineConfig
/** Emitter used for every socket.io broadcast made by this class. */
public readonly socketIO: any
/** Data received while a script runs; `waitForExpect` searches it and clears it on a match. */
private outputBuffer: string
/** True from the start of `runScript` until the scenario ends or times out. */
private isRunningScript: boolean
/** True for one second after the TCP connect; incoming data is passed through `cleanData` meanwhile. */
private connecting: boolean
/** Set by the data handler when data arrives during a script; `runScript` waits 5 s and re-checks while it is set. */
private waitingScenario: boolean
/** Recent output (previous 3000 characters plus the newest chunk) captured during scripts while no inventory is known; parsed by `getInventory`. */
private outputInventory: string
/** Transcript of the current scenario including the start/send/end markers. */
private outputScenario: string
// private bufferCommand: string
/** 'No data' initially; reset to '' when a scenario titled 'DPELP' starts and set to its mapped result when it ends. */
public dataDPELP: DataDPELP | string
/** Counter used by the socket 'close' handler to limit automatic reconnects. */
private retryConnect: number
/** Callback supplied by the owner; invoked first thing in `disconnect()`. */
public handleClearLine: () => void
/**
 * Builds a line wrapper around a new, not-yet-connected socket.
 *
 * @param config Line settings/state; stored by reference.
 * @param socketIO Emitter used for broadcasts.
 * @param handleClearLine Callback that `disconnect()` invokes before destroying the socket.
 */
constructor(config: LineConfig, socketIO: any, handleClearLine: () => void) {
  // Collaborators handed in by the owner.
  this.config = config
  this.socketIO = socketIO
  this.handleClearLine = handleClearLine

  // Transport state.
  this.client = new net.Socket()
  this.connecting = false
  this.retryConnect = 0

  // Scenario state.
  this.isRunningScript = false
  this.waitingScenario = false
  this.dataDPELP = 'No data'

  // Text buffers all start empty.
  this.outputBuffer = ''
  this.outputInventory = ''
  this.outputScenario = ''
}
/**
 * Opens the TCP connection for this line and attaches the socket's data, error, close and
 * timeout handlers.
 *
 * The returned promise resolves one second after a successful connect, on a socket error,
 * and on a timeout; it is never rejected. Failures are reported through `config.output`,
 * the log file and socket.io events instead.
 *
 * NOTE(review): handlers are attached with `on(...)` on every call, so calling `connect()`
 * again on the same `this.client` instance stacks a second set of handlers.
 *
 * @param timeoutMs Value passed to `socket.setTimeout` (default 5000 ms).
 */
connect(timeoutMs = 5000) {
return new Promise<void>((resolve, reject) => {
const { ip, port, lineNumber, id, stationId } = this.config
// Latch set by whichever of connect / error / close / timeout fires first for this call;
// the error, close and timeout handlers return immediately once it is set.
let resolvedOrRejected = false
// Set timeout
this.client.setTimeout(timeoutMs)
console.log(`🔌 Connecting to line ${lineNumber} (${ip}:${port})...`)
this.client.connect(port, ip, () => {
if (resolvedOrRejected) return
resolvedOrRejected = true
console.log(`[${Date.now()}] ✅ Connected to line ${lineNumber} (${ip}:${port})`)
// For the next second incoming data is cleaned before it is used (see the data handler);
// only after that is the line reported as connected and the promise resolved.
this.connecting = true
setTimeout(() => {
this.config.status = 'connected'
// this.retryConnect = 0
this.connecting = false
this.socketIO.emit('line_connected', {
stationId,
lineId: id,
lineNumber,
status: 'connected',
})
resolve()
}, 1000)
})
this.client.on('data', (data) => {
let message = this.connecting ? cleanData(data.toString()) : data.toString()
let rawData = ''
// While a scenario runs, feed the expect buffer and the scenario transcript, and flag
// that output is still arriving so runScript delays its final stage.
if (this.isRunningScript) {
this.waitingScenario = true
this.outputBuffer += message
this.outputScenario += message
if (!this.config.inventory)
this.outputInventory = this.outputInventory.slice(-3000) + message
}
// Pager prompt in this chunk: send a space to continue.
if (message.includes('--More--')) this.writeCommand(' ')
// let output = cleanData(message)
// console.log(`📨 [${this.config.port}] ${message}`)
// Handle netOutput with backspace support
// DEL / BS remove the last character already stored in config.output; every other
// character is collected into rawData and appended (cleaned) after the loop.
for (const char of message) {
if (char === '\x7F' || char === '\x08') {
this.config.output = this.config.output.slice(0, -1)
// message = message.slice(0, -1)
} else {
rawData += char
}
}
this.config.output += cleanData(rawData)
// Keep only the last 15000 characters of terminal output.
this.config.output = this.config.output.slice(-15000)
this.socketIO.emit('line_output', {
stationId,
lineId: id,
data: message,
commands: this.config.commands,
})
// While no inventory is known, schedule a parse attempt 5 s after every chunk.
if (!this.config.inventory) {
setTimeout(() => {
this.getInventory()
}, 5000)
}
appendLog(
cleanData(message),
this.config.stationId,
this.config.stationName,
this.config.stationIp,
this.config.lineNumber
)
})
this.client.on('error', (err) => {
if (resolvedOrRejected) return
resolvedOrRejected = true
console.error(`❌ Error line ${lineNumber}:`, err.message)
this.config.output += '\r\n' + err.message + '\r\n'
this.socketIO.emit('line_error', {
stationId,
lineId: id,
error: '\r\n' + err.message + '\r\n',
})
// Resolves (does not reject) so callers awaiting connect() continue after a failure.
resolve()
})
this.client.on('close', async () => {
// NOTE(review): because of this latch check the body below is skipped whenever connect,
// error or timeout already fired for this call, so the status update and retry only run
// when 'close' is the first event. Confirm this is intended.
if (resolvedOrRejected) return
resolvedOrRejected = true
console.log(`[${Date.now()}] 🔌 Line ${lineNumber} disconnected`)
this.config.status = 'disconnected'
// this.config.inventory = undefined
this.socketIO.emit('line_disconnected', {
stationId,
lineId: id,
lineNumber,
status: 'disconnected',
})
// Retries while the counter is 0..5, waiting 5 s before each attempt; otherwise the
// counter is reset. This path does not resolve the promise itself.
if (this.retryConnect <= 5) {
await this.sleep(5000)
console.log(`Retry connect line [${this.config.lineNumber}] times`, this.retryConnect)
this.retryConnect += 1
await this.reconnect()
} else {
this.retryConnect = 0
}
})
this.client.on('timeout', () => {
// After a successful connect the latch is already set, so later 'timeout' events are ignored.
if (resolvedOrRejected) return
resolvedOrRejected = true
const message = '\r\nConnection timeout!!\r\n'
this.config.output += message
this.socketIO.emit('line_output', {
stationId,
lineId: id,
data: message,
})
appendLog(
cleanData(message),
this.config.stationId,
this.config.stationName,
this.config.stationIp,
this.config.lineNumber
)
console.log(`⏳ Connection timeout line ${lineNumber}`)
this.client.destroy()
resolve()
// reject(new Error('Connection timeout'))
})
})
}
/**
 * Returns a promise that resolves after `ms` milliseconds.
 *
 * @param ms Delay in milliseconds.
 */
private sleep(ms: number) {
  return new Promise((wake) => {
    setTimeout(wake, ms)
  })
}
/**
 * Reconnect currently in flight because `writeCommand` found the socket destroyed.
 * Shared so that callers arriving during the reconnect wait for it instead of starting
 * another one.
 */
private pendingReconnect: Promise<void> | null = null

/**
 * Writes `cmd` to the line's socket.
 *
 * If the socket has been destroyed, the command is NOT sent. The line is torn down, and
 * after a 2 second pause a connection is opened on a brand-new socket. A new socket is
 * required because `connect()` attaches its data/error/close/timeout listeners on every
 * call; reconnecting on the old instance would stack a second set of listeners and every
 * chunk would be processed twice. Concurrent calls share one reconnect.
 *
 * @param cmd Text or bytes to send.
 * @param userName Currently unused; kept so existing callers keep compiling.
 */
async writeCommand(cmd: string | Buffer<ArrayBuffer>, userName = '') {
  if (this.client.destroyed) {
    console.log(`⚠️ Cannot send, line ${this.config.lineNumber} is closed`)
    if (!this.pendingReconnect) {
      this.pendingReconnect = (async () => {
        try {
          this.disconnect()
          await sleep(2000)
          // Swap the socket only right before connecting, so `this.client` is never an
          // idle, listener-less socket that another caller could write to.
          this.client = new net.Socket()
          await this.connect()
        } finally {
          this.pendingReconnect = null
        }
      })()
    }
    await this.pendingReconnect
    return
  }
  this.client.write(cmd)
}
/**
 * Tears the line down: runs the owner's clear callback, destroys the socket, marks the
 * config as disconnected and broadcasts `line_disconnected` with the whole config.
 * Any failure along the way is logged, never thrown.
 */
async disconnect() {
  try {
    this.handleClearLine()
    this.client.destroy()
    this.config.status = 'disconnected'

    const payload = { ...this.config, status: 'disconnected' }
    this.socketIO.emit('line_disconnected', payload)

    console.log(`🔻 Closed connection to line ${this.config.lineNumber}`)
  } catch (closeError) {
    console.error('Error closing line:', closeError)
  }
}
/**
 * Runs a scenario on this line.
 *
 * `script.body` is a JSON array of steps. For each step the runner optionally waits until
 * `expect` appears in the received output, then sends `send` (followed by CRLF) `repeat`
 * times, pausing `delay` ms (default 500) before each send. When all steps are done and no
 * new output has arrived for 5 s, the transcript is parsed with `textfsmResults`, analysed
 * by `detectLogWithAI`, stored on `config` and broadcast as `data_textfsm`.
 *
 * Behaviour on failure:
 * - not connected, or another script already running: returns `undefined` immediately;
 * - malformed JSON in `script.body`: throws before any running state is set, so the line
 *   is not left marked as running;
 * - overall timeout (`script.timeout`, default 300000 ms): the run is aborted, no further
 *   steps or commands are executed, and the promise resolves to `false`;
 * - a step whose `expect` never appears within its own timeout stalls the run until the
 *   overall timeout fires.
 *
 * @param script Scenario to run (`title`, `body`, `timeout` are read).
 * @param userName Name written into the start/end log markers.
 * @returns `true` when the scenario completed, `false` on timeout, `undefined` when it was not started.
 */
async runScript(script: Scenario, userName: string) {
  // Broadcasts which scenario is running on this line; an empty title means "none".
  const announce = (title: unknown) => {
    this.socketIO.emit('running_scenario', {
      stationId: this.config.stationId,
      lineId: this.config.id,
      title,
    })
  }
  // Adds a marker to the scenario transcript and to the persistent log.
  const record = (text: string) => {
    this.outputScenario += text
    appendLog(
      text,
      this.config.stationId,
      this.config.stationName,
      this.config.stationIp,
      this.config.lineNumber
    )
  }

  if (!this.client || this.client.destroyed) {
    console.log('Not connected')
    this.isRunningScript = false
    announce('')
    this.outputBuffer = ''
    return
  }
  if (this.isRunningScript) {
    console.log('Script already running')
    return
  }

  // Parse before touching any state: a malformed body must not leave the line flagged as
  // running. A body that is not an array is treated as "no steps".
  const parsedBody = typeof script?.body === 'string' ? JSON.parse(script?.body) : []
  const steps: any[] = Array.isArray(parsedBody) ? parsedBody : []

  console.log(
    `Run scenario "${script?.title}" to line ${this.config.lineNumber} of ${this.config.stationName}`
  )
  if (script?.title === 'DPELP') this.dataDPELP = ''
  this.isRunningScript = true
  announce(script?.title)

  const now = Date.now()
  const endMarker = `\n---end-scenarios---${now}---${userName}---\n`
  record(
    `\n\n---start-scenarios---${now}---${userName}---${script?.title}---\n---scenario---${script?.title}---${now}---\n`
  )
  this.config.latestScenario = {
    name: script?.title,
    time: now,
  }

  let stepIndex = 0
  return new Promise((resolve) => {
    // Set once the run has ended (completed or timed out); stops any step still scheduled.
    let finished = false

    const timeoutTimer = setTimeout(() => {
      finished = true
      this.isRunningScript = false
      announce('')
      this.outputBuffer = ''
      this.outputScenario = ''
      this.config.output += 'Timeout run scenario'
      this.socketIO.emit('line_output', {
        stationId: this.config.stationId,
        lineId: this.config.id,
        data: 'Timeout run scenario',
      })
      record(endMarker)
      // Settle the promise so callers awaiting the scenario are released.
      resolve(false)
    }, script?.timeout || 300000)

    const runStep = async (index: number) => {
      if (finished) return

      if (index >= steps.length) {
        // Output arrived since the last check: wait for 5 quiet seconds before finishing.
        if (this.waitingScenario) {
          this.waitingScenario = false
          setTimeout(() => {
            runStep(index)
          }, 5000)
          return
        }
        clearTimeout(timeoutTimer)
        finished = true
        this.isRunningScript = false
        announce('')
        this.outputBuffer = ''
        record(endMarker)
        try {
          // Parsing happens inside the try so that a parser failure still reaches resolve().
          const logScenarios = getLogWithTimeScenario(this.outputScenario, now) || ''
          const data = textfsmResults(logScenarios, '')
          data.forEach((item) => {
            if (item?.textfsm && isValidJson(item?.textfsm)) {
              if (
                ['show inventory', 'sh inventory', 'show inv', 'sh inv'].includes(item.command)
              ) {
                const dataInventory = JSON.parse(item.textfsm)[0]
                this.config.inventory = dataInventory
                // Not awaited on purpose, but a storage failure must not become an
                // unhandled rejection.
                this.addHistory(this.config.stationId, this.config.id, {
                  id: this.config.id,
                  number: this.config.lineNumber,
                  stationId: this.config.stationId,
                  pid: dataInventory?.pid,
                  sn: dataInventory?.sn,
                  vid: dataInventory?.vid,
                  scenario: script?.title,
                  timestamp: Date.now(),
                }).catch((historyError) => console.log(historyError))
              }
              item.textfsm = JSON.parse(item.textfsm)
            }
          })
          const detectLog = await this.detectLogWithAI(logScenarios)
          const result = mapToLineFormat({
            lineNumber: this.config.lineNumber,
            inventory: this.config.inventory,
            latestScenario: {
              detectAI: detectLog,
            },
            data,
          })
          if (script?.title === 'DPELP') this.dataDPELP = result
          if (this.config.latestScenario)
            this.config.latestScenario = { ...this.config.latestScenario, detectAI: detectLog }
          this.config.data = data
          this.socketIO.emit('data_textfsm', {
            stationId: this.config.stationId,
            lineId: this.config.id,
            data,
            inventory: this.config.inventory || null,
            latestScenario: this.config.latestScenario || null,
          })
        } catch (error) {
          console.log(error)
        }
        this.outputScenario = ''
        resolve(true)
        return
      }

      const step = steps[index]
      let repeatCount = Number(step?.repeat) || 1

      const sendCommand = async () => {
        if (finished) return
        if (repeatCount <= 0) {
          // Done → next step
          stepIndex++
          return runStep(stepIndex)
        }
        if (step?.send) {
          record(`\n---send-command---"${step?.send ?? ''}"---${now}---\n`)
          this.writeCommand(step?.send + '\r\n')
        }
        repeatCount--
        setTimeout(() => sendCommand(), Number(step?.delay) || 500)
      }

      // Empty expect → send right away (after the step delay).
      if (!step?.expect || step?.expect.trim() === '') {
        setTimeout(() => sendCommand(), Number(step?.delay) || 500)
        return
      }

      // No match within the step timeout: nothing more is scheduled, and the overall
      // timeout above ends the run.
      const matched = await this.waitForExpect(step.expect, Number(step?.timeout) || 60000)
      if (matched) setTimeout(() => sendCommand(), Number(step?.delay) || 500)
    }

    runStep(stepIndex)
  })
}
/**
 * Drops the current socket and connects again on a new one, with a one second pause in
 * between.
 *
 * @returns `true` when the sequence completed without throwing, `false` otherwise.
 */
public async reconnect(): Promise<boolean> {
  let completed = true
  try {
    this.disconnect()
    this.client = new net.Socket()
    await this.sleep(1000)
    await this.connect()
  } catch {
    completed = false
  }
  return completed
}
/**
 * Marks the line's CLI as opened by `user`, broadcasts `user_open_cli` and writes a
 * separator carrying the user's name to the log.
 *
 * @param user The user taking over the CLI.
 */
userOpenCLI(user: User) {
  const { userEmail, userName } = user
  const cfg = this.config

  cfg.openCLI = true
  cfg.userEmailOpenCLI = userEmail
  cfg.userOpenCLI = userName

  this.socketIO.emit('user_open_cli', {
    stationId: cfg.stationId,
    lineId: cfg.id,
    userEmailOpenCLI: userEmail,
    userOpenCLI: userName,
  })

  const separator = `\n-------${userName}-------\n`
  appendLog(separator, cfg.stationId, cfg.stationName, cfg.stationIp, cfg.lineNumber)
}
/** Clears the CLI ownership fields on the config and broadcasts `user_close_cli`. */
userCloseCLI() {
  const cfg = this.config

  cfg.openCLI = false
  cfg.userEmailOpenCLI = ''
  cfg.userOpenCLI = ''

  const payload = {
    stationId: cfg.stationId,
    lineId: cfg.id,
    userEmailOpenCLI: '',
  }
  this.socketIO.emit('user_close_cli', payload)
}
/**
 * Polls the script output buffer every 200 ms until it contains `expect`.
 *
 * On a match the buffer is cleared, so the next step only sees output received afterwards.
 *
 * @param expect Text to look for.
 * @param timeout Maximum time to wait in milliseconds (default 60000).
 * @returns `true` on a match, `false` once the timeout has elapsed.
 */
waitForExpect = async (expect: string, timeout = 60000) => {
  const startedAt = Date.now()
  for (;;) {
    const withinBudget = Date.now() - startedAt < timeout
    if (!withinBudget) return false

    if (this.outputBuffer.includes(expect)) {
      this.outputBuffer = ''
      return true
    }
    await sleep(200)
  }
}
/**
 * Switches this line's power outlet through the station's APC unit.
 *
 * The APC connection settings are read from the station record using the
 * `<apcName>_ip`, `_port`, `_username` and `_password` properties (`apcName` defaults to
 * 'apc_1'). Progress is broadcast as `apc_status` events ('running', then 'done' or
 * 'error'); errors are reported that way and never thrown.
 *
 * The APC session is always closed once it has been created, including when login or the
 * outlet command fails.
 *
 * @param action Outlet action to perform.
 */
async apcControl(action: 'on' | 'off' | 'restart') {
  let apc: InstanceType<typeof APCController> | undefined
  try {
    const station = await Station.find(this.config.stationId)
    if (!station) throw new Error('Station not found')
    const apcName = this.config.apcName || 'apc_1'
    const ip = (station as any)[`${apcName}_ip`] as string
    const port = (station as any)[`${apcName}_port`] as number
    const username = (station as any)[`${apcName}_username`] as string
    const password = (station as any)[`${apcName}_password`] as string
    if (!ip || !port || !username || !password)
      throw new Error(`Missing APC configuration for ${apcName}`)

    // Create the APC controller; its output is mirrored into the line output, the
    // socket.io stream and the log file.
    apc = new APCController({
      host: ip,
      port,
      username,
      password,
      number: this.config.lineNumber,
      onData: (data: string, status: string) => {
        this.config.output += data
        this.socketIO.emit('apc_output', {
          stationId: this.config.stationId,
          lineId: this.config.id,
          apcNumber: apcName === 'apc_1' ? 1 : 2,
          data,
          status,
        })
        appendLog(
          cleanData(data),
          this.config.stationId,
          this.config.stationName,
          this.config.stationIp,
          this.config.lineNumber
        )
      },
    })

    // Connect and log in.
    await apc.connect()
    await apc.login()

    // Perform the requested action.
    this.socketIO.emit('apc_status', {
      stationId: this.config.stationId,
      lineId: this.config.id,
      action,
      status: 'running',
    })
    switch (action) {
      case 'on':
        await apc.turnOnOutlet(this.config.outlet)
        break
      case 'off':
        await apc.turnOffOutlet(this.config.outlet)
        break
      case 'restart':
        await apc.restartOutlet(this.config.outlet)
        break
    }

    // Finished.
    this.socketIO.emit('apc_status', {
      stationId: this.config.stationId,
      lineId: this.config.id,
      action,
      status: 'done',
    })
  } catch (error) {
    const msg = (error as Error).message
    console.error('APC Control error:', msg)
    this.socketIO.emit('apc_status', {
      stationId: this.config.stationId,
      lineId: this.config.id,
      action,
      status: 'error',
      message: msg,
    })
  } finally {
    // Close the session on every path; a failure while closing is only logged so this
    // method keeps its "never throws" behaviour.
    if (apc) {
      try {
        apc.disconnect()
      } catch (closeError) {
        console.error('APC disconnect error:', (closeError as Error).message)
      }
    }
  }
}
/**
 * Tries to extract the device inventory from the output captured in `outputInventory`.
 *
 * Every entry with valid JSON in `textfsm` is decoded in place; the first record of a
 * `show inventory` style command becomes `config.inventory`. Once an inventory is known,
 * the parsed data is stored on the config, broadcast as `data_textfsm`, and the capture
 * buffer is cleared. Errors while decoding are logged and otherwise ignored.
 */
getInventory = () => {
  const parsed = textfsmResults(this.outputInventory, 'show inventory')
  const inventoryCommands = ['show inventory', 'sh inventory', 'show inv', 'sh inv']
  try {
    for (const entry of parsed) {
      const decodable = entry?.textfsm && isValidJson(entry?.textfsm)
      if (!decodable) continue

      if (inventoryCommands.includes(entry.command)) {
        this.config.inventory = JSON.parse(entry.textfsm)[0]
      }
      entry.textfsm = JSON.parse(entry.textfsm)
    }

    // Nothing to publish until an inventory has been found.
    if (!this.config.inventory) return

    this.config.data = parsed
    this.socketIO.emit('data_textfsm', {
      stationId: this.config.stationId,
      lineId: this.config.id,
      data: parsed,
      inventory: this.config.inventory || null,
      latestScenario: this.config.latestScenario || null,
    })
    this.outputInventory = ''
  } catch (parseError) {
    console.log(parseError)
  }
}
// Sends a burst of break signals to the line. The original note describes the purpose as
// "send many ESC characters to enter ROMMON"; the bytes actually written are 0xff 0xf3
// (Telnet IAC + BREAK), ten times, on a 1 ms interval.
breakSpam() {
  console.log('SPAM Break to line:', this.config.lineNumber)
  const breakSequence = [0xff, 0xf3]
  let sent = 0
  const burstTimer = setInterval(() => {
    const burstComplete = sent >= 10
    if (burstComplete) {
      clearInterval(burstTimer)
      return
    }
    this.client.write(Buffer.from(breakSequence)) // Ctrl + Break
    sent += 1
  }, 1)
}
/**
 * Sends the command sequence that sets the console line speed and saves the configuration.
 *
 * Each of the first five commands is followed by a 500 ms pause; `write memory` and the
 * trailing blank line are sent back to back. Commands are sent without waiting for or
 * checking the device's replies.
 *
 * @param baud Speed value placed in the `speed` command.
 */
async setBaud(baud: number) {
  // Sends one line (not awaited, as writes are fire-and-forget here) and then pauses.
  const sendThenPause = async (line: string) => {
    this.writeCommand(line)
    await sleep(500)
  }

  await sendThenPause('enable\r\n')
  await sendThenPause('configure terminal\r\n')
  await sendThenPause('line console 0\r\n')
  await sendThenPause(`speed ${baud.toString()}\r\n`)
  await sendThenPause('end\r\n')

  this.writeCommand('write memory\r\n')
  this.writeCommand('\r\n')
}
/**
 * Reads this line's session log for the given date from `storage/system_logs`.
 *
 * The file name is built from the date, station name, station id, station IP and line
 * number, with every space in the resulting path replaced by an underscore.
 *
 * @param date Date prefix of the log file name.
 * @returns The file content, or '' when the directory or the file does not exist.
 */
async getLog(date: string) {
  const { stationName, stationId, stationIp, lineNumber } = this.config
  const logDir = path.join('storage', 'system_logs')
  const fileName = `${date}-AUTO-Session.${stationName}-${stationId}-${stationIp}-${lineNumber}.log`
  const logFile = path.join(logDir, fileName).replaceAll(' ', '_')

  const available = fs.existsSync(logDir) && fs.existsSync(logFile)
  if (!available) {
    return ''
  }
  return await fs.promises.readFile(logFile, 'utf8')
}
/**
 * Sends a scenario log to the remote AI endpoint and returns its analysis.
 *
 * The request goes to `<ERP_URL>/api/transferPostData` (ERP_URL falls back to the staging
 * host) with `ERP_TOKEN` as bearer token. The prompt asks for a `status` / `issue` summary
 * returned as JSON in English.
 *
 * @param log Scenario transcript to analyse; embedded verbatim into the prompt.
 * @returns `data` from the response when its `Status` is 'OK'; otherwise '' (also on any error).
 */
async detectLogWithAI(log: string) {
  try {
    const payload = {
      model: 'gpt-4o-mini',
      max_tokens: 1000,
      messages: [
        {
          role: 'user',
          content: `Bạn là chuyên gia phân tích log thiết bị mạng.
Hãy phân tích đoạn log sau và xuất kết quả theo đúng format:
${log}
Yêu cầu đầu ra đúng cấu trúc:
status:
(Tóm tắt trạng thái tổng thể của hệ thống trong 24 ý)
issue:
(Tóm tắt cực ngắn gọn các lỗi/dấu hiệu bất thường, mỗi vấn đề 1 dòng, không bắt các vấn đề không quan trọng như về port changed state to up/down hay Invalid input, Incomplete command)
Quy tắc:
Không giải thích dài dòng.
Chỉ tập trung vào lỗi, cảnh báo, thay đổi trạng thái up/down bất thường.
Nếu log không có lỗi → ghi rõ "No issues detected.".
Ngắn gọn, dễ đọc, đúng format
Return only json format with English`,
        },
      ],
    }
    const remoteUrl = process.env.ERP_URL || 'https://stage.nswteam.net'
    const remoteResp = await axios.post(
      remoteUrl + '/api/transferPostData',
      {
        urlAPI: '/api/open-ai-sfp/model-image-info',
        data: payload,
      },
      {
        headers: {
          Authorization: 'Bearer ' + process.env.ERP_TOKEN,
        },
      }
    )
    return remoteResp.data?.Status === 'OK' ? remoteResp.data?.data : ''
  } catch (error: unknown) {
    // Log only the message: an axios error object carries the request config, including
    // the Authorization header, and must not be dumped into the logs.
    const reason = error instanceof Error ? error.message : String(error)
    console.log('[ERROR] Detect log from AI', reason)
  }
  return ''
}
/**
 * Records a device in the line's history when it differs from the most recent entry.
 *
 * The entry is stored (with `timestamp` set to the current time) in the Redis sorted set
 * `station:<stationId>:line:<lineId>:history`, scored by that time, and also prepended to
 * the `history` JSON column of the `Line` row when that row exists. Redis entries older
 * than 96 hours are removed on every insert.
 *
 * NOTE(review): the value prepended to `line.history` is the already-serialized entry, so
 * the column holds an array of JSON strings rather than objects — confirm that readers
 * expect this.
 *
 * @param stationId Station part of the Redis key.
 * @param lineId Line part of the Redis key; also used to load the `Line` row.
 * @param item Entry to record; ignored unless both `pid` and `sn` are set.
 * @returns `true` when an entry was added; `false` when the item was ignored or matches
 *   the latest entry's pid and sn.
 */
async addHistory(stationId: number, lineId: number, item: HistoryItem) {
  // Every exit returns a boolean; an unusable item is reported as "nothing added".
  if (!item.pid || !item.sn) return false
  const key = `station:${stationId}:line:${lineId}:history`
  const now = Date.now()
  const newItem = JSON.stringify({
    ...item,
    timestamp: now,
  })

  // Fetch the newest entry (highest score).
  const lastItems = await redis.zrevrange(key, 0, 0)
  if (lastItems.length > 0) {
    const last = JSON.parse(lastItems[0])
    if (last.pid === item.pid && last.sn === item.sn) {
      return false // unchanged
    }
  }

  const line = await Line.find(lineId)
  if (line) {
    const listHistory = line.history ? JSON.parse(line.history) : []
    listHistory.unshift(newItem)
    line.history = JSON.stringify(listHistory)
    await line.save()
  }

  // Add to the sorted set.
  await redis.zadd(key, now, newItem)

  // Drop entries older than 96 hours.
  const expireTime = now - 96 * 60 * 60 * 1000
  await redis.zremrangebyscore(key, 0, expireTime)
  return true
}
/**
 * Loads every history entry stored in Redis for a line, in the sorted set's ascending
 * score order.
 *
 * @param stationId Station part of the Redis key.
 * @param lineId Line part of the Redis key.
 * @returns The decoded entries.
 */
async getHistory(stationId: number, lineId: number) {
  const rawEntries = await redis.zrange(`station:${stationId}:line:${lineId}:history`, 0, -1)
  const history: any[] = []
  for (const raw of rawEntries) {
    history.push(JSON.parse(raw))
  }
  return history
}
}