Log_service/app/utils/runtimeCheckLogs.ts

383 lines
11 KiB
TypeScript

import Env from "@ioc:Adonis/Core/Env";
import Database from "@ioc:Adonis/Lucid/Database";
import KeyValue from "App/Models/KeyValue";
import LogDetectFile from "App/Models/LogDetectFile";
import LogReport from "App/Models/LogReport";
import chokidar from "chokidar";
import fs from "fs";
import readline from "readline";
import { DateTime } from "luxon";
import moment from "moment";
import { checkIndexSN } from "./checkIndexSN";
import { getListLineByItem } from "./getListLineByItem";
import { sendMessToZulip } from "./sendMessToZulip";
import { checkSpecialVersion } from "./helper";
/**
* ======================================================
* CONSTANTS
* ======================================================
*/
/** Upper bound on the number of log lines read from a file in one pass. */
const MAX_LINES_PER_BATCH = 5_000;
/** Upper bound on the number of rows written inside a single DB transaction. */
const MAX_INSERTS_PER_BATCH = 500;
/**
* ======================================================
* KEY VALUE CACHE (GLOBAL)
* ======================================================
*/
// Module-level cache of the KeyValue table, grouped by key.
let keyValueCache: Record<string, string[]> | null = null;
// Timestamp (ms since epoch) of the last successful refresh; 0 means "never".
let keyValueCacheAt = 0;
// How long (ms) a cached snapshot is served before the table is queried again.
const CACHE_TTL = 60_000;

/**
 * Returns all KeyValue rows grouped as `key -> list of values`.
 *
 * The grouped result is kept in module-level state and reused for
 * `CACHE_TTL` milliseconds; after that the table is queried again and the
 * cache is replaced.
 *
 * @returns The grouped map (the same object instance that is cached).
 */
async function getKeyValueCached() {
  if (keyValueCache && Date.now() - keyValueCacheAt < CACHE_TTL) {
    return keyValueCache;
  }

  const grouped: Record<string, string[]> = {};
  const records = await KeyValue.query().select("key", "value");
  for (const { key, value } of records) {
    // Create the bucket the first time a key is seen, then append to it.
    const bucket = grouped[key] || (grouped[key] = []);
    bucket.push(value);
  }

  keyValueCacheAt = Date.now();
  keyValueCache = grouped;
  return grouped;
}
/**
* ======================================================
* FILE LOCK + DEBOUNCE
* ======================================================
*/
// Paths that currently have a processFileChange run in flight.
const fileLocks = new Set<string>();
// Pending debounce timers, keyed by file path.
const debounceMap = new Map<string, NodeJS.Timeout>();
// Paths that were requested again while their run was still in flight.
const pendingReruns = new Set<string>();

/**
 * Runs `processFileChange` for a file, never more than one run at a time per
 * path.
 *
 * A request that arrives while the same path is already being processed is
 * not dropped: it is recorded and triggers exactly one extra pass once the
 * current pass finishes, so content appended during a long run is still
 * picked up. Errors thrown by `processFileChange` are logged and swallowed.
 *
 * @param filePath Path of the log file to process.
 */
async function safeProcessFileChange(filePath: string) {
  if (fileLocks.has(filePath)) {
    // A run is in flight; ask it to go around once more instead of losing
    // this change notification.
    pendingReruns.add(filePath);
    return;
  }

  fileLocks.add(filePath);
  try {
    do {
      pendingReruns.delete(filePath);
      try {
        await processFileChange(filePath);
      } catch (err) {
        console.error("processFileChange error:", err);
      }
    } while (pendingReruns.has(filePath));
  } finally {
    fileLocks.delete(filePath);
    pendingReruns.delete(filePath);
  }
}
/**
* ======================================================
* STREAM-BASED FILE READER (MEMORY EFFICIENT)
* ======================================================
*/
/**
 * Streams a file from the beginning and collects the lines that come after
 * `startLine`, returning at most `maxLines` of them.
 *
 * Line numbers are 1-based and counted from the start of the file, so the
 * first returned entry has `lineNo === startLine + 1`.
 *
 * @param filePath  File to read (decoded as UTF-8).
 * @param startLine Number of leading lines to skip.
 * @param maxLines  Maximum number of lines to return.
 * @returns The collected lines with their line numbers; empty when the file
 *          has no lines past `startLine`.
 * @throws Rejects with the stream/readline error if reading fails.
 */
async function readLinesFromPosition(
  filePath: string,
  startLine: number,
  maxLines: number = MAX_LINES_PER_BATCH
): Promise<Array<{ lineNo: number; content: string }>> {
  return new Promise((resolve, reject) => {
    const lines: Array<{ lineNo: number; content: string }> = [];
    let currentLine = 0;
    let limitReached = false;

    const stream = fs.createReadStream(filePath, { encoding: "utf8" });
    const rl = readline.createInterface({
      input: stream,
      crlfDelay: Infinity,
    });

    rl.on("line", (line) => {
      // rl.close() does not stop 'line' events for the chunk that is already
      // buffered, so without this guard the result would overshoot maxLines.
      if (limitReached) return;

      currentLine++;
      if (currentLine <= startLine) return;

      lines.push({ lineNo: currentLine, content: line });
      if (lines.length >= maxLines) {
        limitReached = true;
        rl.close();
        stream.destroy();
      }
    });
    rl.on("close", () => resolve(lines));
    rl.on("error", reject);
    stream.on("error", reject);
  });
}
/**
* ======================================================
* CORE LOGIC (FIXED FOR STACK OVERFLOW)
* ======================================================
*/
/**
 * Scans the unprocessed tail of a log file for configured keywords and
 * special versions, stores every new finding as a LogReport row, and sends
 * one Zulip notification per batch that produced findings.
 *
 * Scanning resumes after the highest line number already stored in LogReport
 * for this file. The file is read in batches of `MAX_LINES_PER_BATCH` lines
 * and findings are written in transactions of at most
 * `MAX_INSERTS_PER_BATCH` rows.
 *
 * Keyword lists come from the cached KeyValue table:
 * - `MODEL_SPECIAL` and `CATCH_FAULTY`: substrings that are reported.
 * - `EXCLUDE_ERR`: substrings that cause a line to be skipped entirely.
 *
 * @param filePath Path of the changed log file; the segment after the last
 *                 "/" is looked up in LogDetectFile. Unregistered files are
 *                 ignored.
 * @throws Re-throws any database error other than the disk-full conditions,
 *         which are alerted to Zulip and stop processing instead.
 */
async function processFileChange(filePath: string) {
  const fileName = filePath.split("/").pop()!;
  const fileDetect = await LogDetectFile
    .query()
    .where("file_name", fileName)
    .first();
  if (!fileDetect) return;

  const existedReports = await LogReport
    .query()
    .select("line", "detected_content")
    .where("id_file", fileDetect.id_ldf);

  // Build the de-duplication set and the resume position in a single pass.
  // A running max avoids spreading an unbounded array into Math.max(), which
  // can exceed the engine's argument limit and throw RangeError.
  const existedSet = new Set<string>();
  let lastLine = 0;
  for (const report of existedReports) {
    existedSet.add(`${report.line}-${report.detected_content}`);
    lastLine = Math.max(lastLine, report.line);
  }

  const keyMap = await getKeyValueCached();
  const MODEL_SPECIAL = keyMap.MODEL_SPECIAL || [];
  const CATCH_FAULTY = keyMap.CATCH_FAULTY || [];
  const EXCLUDE_ERR = keyMap.EXCLUDE_ERR || [];
  const keysToCheck = MODEL_SPECIAL.concat(CATCH_FAULTY);

  // Queues a finding unless the same (line, content) pair is already known.
  const queueIfNew = (batch: any[], lineNo: number, detected: string) => {
    const uniq = `${lineNo}-${detected}`;
    if (existedSet.has(uniq)) return;
    existedSet.add(uniq);
    batch.push({
      id_file: fileDetect.id_ldf,
      line: lineNo,
      detected_content: detected,
    });
  };

  let totalNewLines = 0;
  let totalInserts = 0;
  let hasMoreLines = true;
  let isFirstBatch = true;

  while (hasMoreLines) {
    const newLines = await readLinesFromPosition(filePath, lastLine, MAX_LINES_PER_BATCH);
    if (newLines.length === 0) {
      break; // No more lines
    }
    totalNewLines += newLines.length;

    // checkIndexSN runs once per invocation, on the first non-empty batch,
    // with the full file content and the resume position.
    if (isFirstBatch) {
      isFirstBatch = false;
      const contentFileForSN = fs.readFileSync(filePath, "utf8").split("\n");
      checkIndexSN(contentFileForSN, lastLine, fileName);
    }

    const inserts: any[] = [];
    for (const { lineNo, content: line } of newLines) {
      if (!line) continue;
      if (EXCLUDE_ERR.some(e => line.includes(e))) continue;

      for (const key of keysToCheck) {
        if (line.includes(key)) queueIfNew(inserts, lineNo, key);
      }

      const version = checkSpecialVersion(line);
      if (version) queueIfNew(inserts, lineNo, version);
    }

    if (inserts.length > 0) {
      totalInserts += inserts.length;

      // Write in sub-batches so that no single transaction grows too large.
      for (let i = 0; i < inserts.length; i += MAX_INSERTS_PER_BATCH) {
        const subBatch = inserts.slice(i, i + MAX_INSERTS_PER_BATCH);
        let trx;
        try {
          trx = await Database.transaction();
          await LogReport.createMany(subBatch, { client: trx });
          await trx.commit();
        } catch (error: any) {
          if (trx) {
            try {
              await trx.rollback();
            } catch (rollbackError) {
              // Keep the original failure as the one that is reported below.
              console.error("Transaction rollback error:", rollbackError);
            }
          }

          // Check for disk space errors
          if (error.code === 'EE_WRITE' || error.errno === 3 ||
            (error.sqlMessage && error.sqlMessage.includes('No space left'))) {
            console.error("💥 CRITICAL: Disk full! Cannot write to database.");
            try {
              await sendMessToZulip(
                "stream",
                Env.get("ZULIP_STREAM_ALERT"),
                Env.get("ZULIP_TOPIC_ALERT"),
                `🚨 **CRITICAL DISK FULL ERROR**\n\nFile: ${fileName}\nError: ${error.sqlMessage ?? error.message}\n\n**ACTION REQUIRED: Clean disk space immediately!**`
              );
            } catch (alertError) {
              console.error("Failed to send critical alert:", alertError);
            }
            return; // Stop processing
          }

          console.error("Database transaction error:", error);
          throw error;
        }
      }

      // Send notification for this batch
      await sendBatchNotification(fileName, inserts, filePath, MODEL_SPECIAL);
    }

    // The next batch resumes right after the last line read in this one.
    lastLine = newLines[newLines.length - 1].lineNo;

    // A short batch means the end of the file was reached.
    if (newLines.length < MAX_LINES_PER_BATCH) {
      hasMoreLines = false;
    }
  }

  if (totalInserts === 0) {
    console.log(`${fileName} (${totalNewLines} new lines) --- Good`);
  } else {
    console.log(`${fileName} - Processed ${totalNewLines} lines, found ${totalInserts} issues`);
  }
}
/**
* ======================================================
* NOTIFICATION HELPER
* ======================================================
*/
/**
 * Builds and sends the Zulip alert for one batch of newly stored findings.
 *
 * The message contains a summary table (one row per entry returned by
 * `getListLineByItem`) followed by the matching log lines, with the first
 * occurrence of the detected text in each line turned into a link to the
 * log viewer.
 *
 * Any failure (reading the file, grouping, sending) is logged and swallowed
 * so that a notification problem never aborts log processing.
 *
 * @param fileName      Log file name, used in headings and viewer links.
 * @param inserts       Findings inserted for this batch.
 * @param filePath      Path of the log file, read to quote the matching lines.
 * @param MODEL_SPECIAL Keywords that are highlighted with a medal icon.
 */
async function sendBatchNotification(
  fileName: string,
  inserts: any[],
  filePath: string,
  MODEL_SPECIAL: string[]
) {
  try {
    // Read file content for building notification
    const contentFile = fs.readFileSync(filePath, "utf8").split("\n");
    const listReport = await getListLineByItem(inserts);
    // One timestamp for the whole message keeps all rows consistent.
    const updatedAt = moment().format("HH:mm - DD/MM");

    let table =
      "| |Last updated at | Item/error | Line | Report |\n" +
      "|---|:---:|:---|:---|:-----------:|\n";
    let issueFound = "";

    listReport.forEach((log, index) => {
      const item = MODEL_SPECIAL.includes(log.detected_content)
        ? `:medal: **${log.detected_content}**`
        : `:small_orange_diamond: ${log.detected_content}`;

      log.line.forEach((ln: number) => {
        const lineContent = contentFile[ln - 1];
        if (lineContent) {
          const link = `[${log.detected_content}](https://logs.danielvu.com/logs/${fileName}#${ln})`;
          // A replacer function inserts the link verbatim; a plain string
          // replacement would interpret "$&", "$1", "$$" ... inside the
          // keyword or file name as substitution patterns.
          issueFound +=
            `\n\`${ln}\` ` +
            lineContent.replace(log.detected_content, () => link);
        }
      });

      // NOTE(review): log.line is iterated as an array above, so interpolating
      // it here yields a comma-joined list in both the cell and the URL
      // anchor — confirm the log viewer handles such anchors.
      table +=
        `|${index + 1}|**${updatedAt}**|${item}|${log.line}` +
        `|[View](https://logs.danielvu.com/logs/${fileName}#${log.line})|\n`;
    });

    const icon =
      checkSpecialVersion(table) !== ""
        ? `------------\n\n:no_entry: :no_entry:**${fileName}**:no_entry: :no_entry:`
        : `------------\n\n:warning: :warning: **${fileName}**`;

    await sendMessToZulip(
      "stream",
      Env.get("ZULIP_STREAM_ALERT"),
      Env.get("ZULIP_TOPIC_ALERT"),
      `${icon}\n\n${table}\n\n***Issue found:***\n${issueFound}`
    );
  } catch (error) {
    console.error("Zulip notification error:", error);
    // Don't throw - notification failure shouldn't break the process
  }
}
/**
* ======================================================
* MAIN ENTRY
* ======================================================
*/
/**
 * Starts a chokidar watcher on `folderPath` (top level only) and processes
 * `.log` files as they are added or changed.
 *
 * - "add": only files whose name starts with today's date (`YYYYMMDD`) are
 *   registered in LogDetectFile and then processed after a 2000 ms delay.
 * - "change": processing is debounced by 1500 ms per file.
 *
 * Both events share one pending timer per file, so an add followed quickly
 * by a change results in a single run, and the timer entry is removed once
 * it fires.
 *
 * @param folderPath Directory to watch.
 */
export async function runtimeCheckLogs(folderPath: string) {
  // (Re)schedules processing of a file, replacing any timer already pending
  // for it, and drops the map entry when the timer fires so the map does not
  // grow with every file ever seen.
  const scheduleProcessing = (filePath: string, delayMs: number) => {
    const pending = debounceMap.get(filePath);
    if (pending) clearTimeout(pending);
    debounceMap.set(
      filePath,
      setTimeout(() => {
        debounceMap.delete(filePath);
        // safeProcessFileChange logs its own errors; nothing to await here.
        void safeProcessFileChange(filePath);
      }, delayMs)
    );
  };

  const watcher = chokidar.watch(folderPath, {
    persistent: true,
    ignoreInitial: true,
    depth: 0,
    awaitWriteFinish: {
      stabilityThreshold: 2000,
      pollInterval: 100,
    },
  });

  watcher.on("add", async (filePath) => {
    if (!filePath.endsWith(".log")) return;
    const fileName = filePath.split("/").pop()!;
    const today = moment().format("YYYYMMDD");
    if (!fileName.startsWith(today)) return;
    try {
      await LogDetectFile.firstOrCreate(
        { file_name: fileName },
        { file_name: fileName }
      );
      console.log("New log file:", fileName);
      // Process the new file right away, after giving it time to stabilize.
      scheduleProcessing(filePath, 2000);
    } catch (error) {
      console.error("Error creating LogDetectFile:", error);
    }
  });

  watcher.on("change", (filePath) => {
    if (!filePath.endsWith(".log")) return;
    // Debounced to reduce load when a file is written in bursts.
    scheduleProcessing(filePath, 1500);
  });

  watcher.on("error", (err) => {
    console.error("Watcher error:", err);
  });

  console.log("Log watcher started:", folderPath);
}