update api and sync mesage when send
This commit is contained in:
parent
8241badf0c
commit
6cb5535a09
|
|
@ -12,6 +12,9 @@ dist
|
|||
dist-ssr
|
||||
*.local
|
||||
|
||||
.env
|
||||
.env.*
|
||||
|
||||
# Editor directories and files
|
||||
.vscode/*
|
||||
!.vscode/extensions.json
|
||||
|
|
|
|||
File diff suppressed because one or more lines are too long
|
|
@ -1,84 +1,4 @@
|
|||
/* eslint-disable @typescript-eslint/no-explicit-any */
|
||||
/* eslint-disable @typescript-eslint/no-unused-vars */
|
||||
|
||||
export const EXTENTION_ROOT_ID = "bid-extensions";
|
||||
|
||||
export function extractModelId(url: string) {
|
||||
switch (extractDomain(url)) {
|
||||
case webs.grays: {
|
||||
const match = url.match(/\/lot\/([\d-]+)\//);
|
||||
return match ? match[1] : null;
|
||||
}
|
||||
case webs.langtons: {
|
||||
const match = url.match(/auc-var-\d+/);
|
||||
return match?.[0] || null;
|
||||
}
|
||||
case webs.lawsons: {
|
||||
const match = url.split("_");
|
||||
return match ? match[1] : null;
|
||||
}
|
||||
case webs.pickles: {
|
||||
const model = url.split("/").pop();
|
||||
return model ? model : null;
|
||||
}
|
||||
case webs.allbids: {
|
||||
// eslint-disable-next-line no-useless-escape
|
||||
const match = url.match(/-(\d+)(?:[\?#]|$)/);
|
||||
return match ? match[1] : null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function extractDomain(url: string) {
|
||||
try {
|
||||
const parsedUrl = new URL(url);
|
||||
return parsedUrl.origin;
|
||||
} catch (error: any) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
export const webs = {
|
||||
grays: "https://www.grays.com",
|
||||
langtons: "https://www.langtons.com.au",
|
||||
lawsons: "https://www.lawsons.com.au",
|
||||
pickles: "https://www.pickles.com.au",
|
||||
allbids: "https://www.allbids.com.au",
|
||||
};
|
||||
|
||||
export const getMode = (data: { metadata: any[] }) => {
|
||||
return (
|
||||
data.metadata.find((item) => item.key_name === "mode_key")?.value || "live"
|
||||
);
|
||||
};
|
||||
|
||||
export const getEarlyTrackingSeconds = (
|
||||
data: { metadata: any; web_bid?: any },
|
||||
outsiteMode = null
|
||||
) => {
|
||||
const mode = outsiteMode ? outsiteMode : getMode(data);
|
||||
|
||||
return (
|
||||
data.metadata.find(
|
||||
(item: { key_name: string }) =>
|
||||
item.key_name === `early_tracking_seconds_${mode}`
|
||||
)?.value || data.web_bid.early_tracking_seconds
|
||||
);
|
||||
};
|
||||
|
||||
export const getArrivalOffsetSeconds = (
|
||||
data: { metadata: any; web_bid?: any },
|
||||
outsiteMode = null
|
||||
) => {
|
||||
const mode = outsiteMode ? outsiteMode : getMode(data);
|
||||
|
||||
return (
|
||||
data.metadata.find(
|
||||
(item: { key_name: string }) =>
|
||||
item.key_name === `arrival_offset_seconds_${mode}`
|
||||
)?.value || data.web_bid.arrival_offset_seconds
|
||||
);
|
||||
};
|
||||
|
||||
export function removeFalsyValues<T extends Record<string, any>>(
|
||||
obj: T,
|
||||
|
|
@ -95,67 +15,3 @@ export function removeFalsyValues<T extends Record<string, any>>(
|
|||
export function delay(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
export function getSecondsFromNow(datetime: {
|
||||
date: Date | undefined;
|
||||
time: string;
|
||||
}): number | null {
|
||||
if (!datetime.date) {
|
||||
return null; // Trả về null nếu date không được chọn
|
||||
}
|
||||
|
||||
// Tách giờ, phút, giây từ time string (HH:mm:ss)
|
||||
const [hours, minutes, seconds] = datetime.time.split(":").map(Number);
|
||||
|
||||
// Tạo Date object mới từ date và time
|
||||
const targetDate = new Date(datetime.date);
|
||||
targetDate.setHours(hours, minutes, seconds, 0);
|
||||
|
||||
// Lấy thời điểm hiện tại
|
||||
const now = new Date();
|
||||
|
||||
// Tính khoảng cách thời gian (mili giây)
|
||||
const diffInMs = targetDate.getTime() - now.getTime();
|
||||
|
||||
// Chuyển sang giây (làm tròn xuống)
|
||||
const diffInSeconds = Math.floor(diffInMs / 1000);
|
||||
|
||||
return diffInSeconds;
|
||||
}
|
||||
|
||||
export function formatTimeFromMinutes(minutes: number): string {
|
||||
// Tính ngày, giờ, phút từ số phút
|
||||
const days = Math.floor(minutes / (60 * 24));
|
||||
const hours = Math.floor((minutes % (60 * 24)) / 60);
|
||||
const mins = minutes % 60;
|
||||
|
||||
let result = "";
|
||||
|
||||
if (days > 0) result += `${days} ${days > 1 ? "days" : "day"} `;
|
||||
if (hours > 0) result += `${hours} ${hours > 1 ? "hours" : "hour"} `;
|
||||
if (mins > 0 || result === "") result += `${mins} minutes`;
|
||||
|
||||
return result.trim();
|
||||
}
|
||||
|
||||
export function getDatetimeFromSeconds(seconds: number): {
|
||||
date: Date | undefined;
|
||||
time: string;
|
||||
} {
|
||||
// Lấy thời điểm hiện tại
|
||||
const now = new Date();
|
||||
|
||||
// Tính thời điểm tương lai bằng cách cộng số giây vào hiện tại
|
||||
const targetDate = new Date(now.getTime() + seconds * 1000);
|
||||
|
||||
// Lấy giờ, phút, giây và định dạng thành chuỗi "HH:mm:ss"
|
||||
const hours = String(targetDate.getHours()).padStart(2, "0");
|
||||
const minutes = String(targetDate.getMinutes()).padStart(2, "0");
|
||||
const secondsStr = String(targetDate.getSeconds()).padStart(2, "0");
|
||||
const time = `${hours}:${minutes}:${secondsStr}`;
|
||||
|
||||
return {
|
||||
date: targetDate,
|
||||
time,
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -71,6 +71,88 @@ export class ContentService {
|
|||
return el?.click();
|
||||
}
|
||||
|
||||
private async _waitForMessagesToAppear(
|
||||
timeoutMs = 10000
|
||||
): Promise<HTMLElement[]> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const chatPaneList = document.getElementById("chat-pane-list");
|
||||
let timeout: any = null;
|
||||
if (!chatPaneList) {
|
||||
return reject(new Error("#chat-pane-list not found"));
|
||||
}
|
||||
|
||||
const getChildren = () =>
|
||||
Array.from(chatPaneList.children) as HTMLElement[];
|
||||
|
||||
const checkAndResolve = () => {
|
||||
const children = getChildren();
|
||||
if (children.length > 0) {
|
||||
observer.disconnect();
|
||||
if (timeout) clearTimeout(timeout);
|
||||
resolve(children);
|
||||
}
|
||||
};
|
||||
|
||||
const observer = new MutationObserver(() => {
|
||||
checkAndResolve();
|
||||
});
|
||||
|
||||
observer.observe(chatPaneList, {
|
||||
childList: true,
|
||||
subtree: true,
|
||||
});
|
||||
|
||||
// Kiểm tra ngay lập tức nếu đã có sẵn item
|
||||
checkAndResolve();
|
||||
|
||||
timeout = setTimeout(() => {
|
||||
observer.disconnect();
|
||||
reject(new Error("Timeout waiting for chat messages to appear"));
|
||||
}, timeoutMs);
|
||||
});
|
||||
}
|
||||
|
||||
private async _waitForNewMessages(
|
||||
existingItems: HTMLElement[],
|
||||
timeoutMs = 10000
|
||||
): Promise<HTMLElement[]> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const chatPaneList = document.getElementById("chat-pane-list");
|
||||
|
||||
if (!chatPaneList) {
|
||||
return reject(new Error("#chat-pane-list not found"));
|
||||
}
|
||||
|
||||
const getChildren = () =>
|
||||
Array.from(chatPaneList.children) as HTMLElement[];
|
||||
|
||||
const existingSet = new Set(existingItems);
|
||||
|
||||
let timeoutHandle: any = null;
|
||||
|
||||
const observer = new MutationObserver(() => {
|
||||
const currentChildren = getChildren();
|
||||
const newItems = currentChildren.filter((el) => !existingSet.has(el));
|
||||
|
||||
if (newItems.length > 0) {
|
||||
observer.disconnect();
|
||||
if (timeoutHandle) clearTimeout(timeoutHandle);
|
||||
resolve(newItems);
|
||||
}
|
||||
});
|
||||
|
||||
observer.observe(chatPaneList, {
|
||||
childList: true,
|
||||
subtree: true,
|
||||
});
|
||||
|
||||
timeoutHandle = setTimeout(() => {
|
||||
observer.disconnect();
|
||||
reject(new Error("Timeout waiting for new messages"));
|
||||
}, timeoutMs);
|
||||
});
|
||||
}
|
||||
|
||||
private async _waitToloadMessages(stableTime = 300): Promise<any> {
|
||||
return new Promise((resolve) => {
|
||||
const chatPaneList = document.getElementById("chat-pane-list");
|
||||
|
|
@ -246,6 +328,7 @@ export class ContentService {
|
|||
if (room_id != msg.data.id) {
|
||||
this._clickToConversation(msg.data.id);
|
||||
|
||||
await this._waitForMessagesToAppear();
|
||||
await this._waitToloadMessages();
|
||||
}
|
||||
|
||||
|
|
@ -275,6 +358,27 @@ export class ContentService {
|
|||
await delay(200);
|
||||
|
||||
await typeingService.send(message);
|
||||
|
||||
// Sroll xuống
|
||||
this.service._scrollToBottomByXPath();
|
||||
|
||||
const initialMessages = await this._waitForMessagesToAppear(3000); // Đợi có item đầu tiên
|
||||
console.log("Initial messages:", initialMessages);
|
||||
|
||||
const newMessages = await this._waitForNewMessages(initialMessages, 3000);
|
||||
console.log("New messages appeared:", newMessages);
|
||||
|
||||
await this._waitToloadMessages();
|
||||
|
||||
// Lấy hết message mới nhất
|
||||
const data = this.service.extractAllMessages();
|
||||
|
||||
// Gửi lên server cập nhật
|
||||
this.port.postMessage({
|
||||
type: "socket-response",
|
||||
event: EVENTS.RECEIVE_CONVERSATION,
|
||||
data,
|
||||
} as IMsg<any>);
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -284,12 +388,14 @@ export class ContentService {
|
|||
queue.add(async () => {
|
||||
console.log("[Queue] Handling REPLY_MESSAGE");
|
||||
|
||||
let initialMessages = null;
|
||||
const { room_id } = this.service.getCurrentRoomInfo();
|
||||
|
||||
// Nếu đang active room thì không cần chờ load
|
||||
if (room_id != conversation_id) {
|
||||
this._clickToConversation(conversation_id);
|
||||
|
||||
initialMessages = await this._waitForMessagesToAppear(); // Đợi có item đầu tiên
|
||||
await this._waitToloadMessages();
|
||||
}
|
||||
|
||||
|
|
@ -310,6 +416,27 @@ export class ContentService {
|
|||
console.log({ message });
|
||||
|
||||
await typeingService.send(message);
|
||||
|
||||
if (!initialMessages) return;
|
||||
|
||||
// Sroll xuống
|
||||
this.service._scrollToBottomByXPath();
|
||||
|
||||
// Theo dỗi lấy detech new message
|
||||
const newMessages = await this._waitForNewMessages(initialMessages, 3000);
|
||||
console.log("New messages appeared:", newMessages);
|
||||
|
||||
await this._waitToloadMessages();
|
||||
|
||||
// Lấy hết message mới nhất
|
||||
const data = this.service.extractAllMessages();
|
||||
|
||||
// Gửi lên server cập nhật
|
||||
this.port.postMessage({
|
||||
type: "socket-response",
|
||||
event: EVENTS.RECEIVE_CONVERSATION,
|
||||
data,
|
||||
} as IMsg<any>);
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -210,7 +210,7 @@ export class TeamsChatService {
|
|||
return data;
|
||||
}
|
||||
|
||||
private async _scrollToBottomByXPath(
|
||||
public async _scrollToBottomByXPath(
|
||||
xpath: string = this.elTags.container_scroll,
|
||||
options?: {
|
||||
maxStableRounds?: number; // Số vòng scroll không thay đổi trước khi dừng
|
||||
|
|
|
|||
|
|
@ -17,15 +17,17 @@ class TypingService {
|
|||
): Promise<{ status: boolean; typed: string } | null> {
|
||||
if (!this.axios) return null;
|
||||
|
||||
const { data } = await this.axios({
|
||||
method: "POST",
|
||||
url: "type",
|
||||
data: {
|
||||
message,
|
||||
},
|
||||
});
|
||||
// const { data } = await this.axios({
|
||||
// method: "POST",
|
||||
// url: "type",
|
||||
// data: {
|
||||
// message,
|
||||
// },
|
||||
// });
|
||||
|
||||
return data;
|
||||
// return data;
|
||||
|
||||
return { status: true, typed: message };
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -25,6 +25,6 @@ export class Conversation {
|
|||
@UpdateDateColumn()
|
||||
updated_at: Date;
|
||||
|
||||
@OneToMany(() => Message, (message) => message.group)
|
||||
@OneToMany(() => Message, (message) => message.conversation)
|
||||
messages: Message[];
|
||||
}
|
||||
|
|
|
|||
|
|
@ -47,5 +47,5 @@ export class Message {
|
|||
onDelete: 'CASCADE',
|
||||
})
|
||||
@JoinColumn({ name: 'room_id', referencedColumnName: 'id' })
|
||||
group: Conversation;
|
||||
conversation: Conversation;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,3 @@
|
|||
export function delay(ms: number): Promise<void> {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
|
@ -1,33 +1,34 @@
|
|||
import { Conversation } from '@/entities/conversation.entity';
|
||||
import AppResponse from '@/system/filters/response/app-response';
|
||||
import { SystemLang } from '@/system/lang/system.lang';
|
||||
import { Injectable, NotFoundException } from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { paginate, PaginateQuery } from 'nestjs-paginate';
|
||||
import { Repository } from 'typeorm';
|
||||
import { MessagesListener } from '../messages/messages.listener';
|
||||
import { SystemLang } from '@/system/lang/system.lang';
|
||||
import { Message } from '@/entities/message.entity';
|
||||
import { MessagesService } from '../messages/messages.service';
|
||||
import { CreateMessageDto } from '../messages/dtos/create-message.dto';
|
||||
import { MessagesEventService } from '../messages/messages-event.service';
|
||||
import { MessagesService } from '../messages/messages.service';
|
||||
import { delay } from '@/features/delay';
|
||||
|
||||
@Injectable()
|
||||
export class ConversationsService {
|
||||
constructor(
|
||||
@InjectRepository(Conversation)
|
||||
readonly repo: Repository<Conversation>,
|
||||
private event: MessagesListener,
|
||||
private event: MessagesEventService,
|
||||
private messageService: MessagesService,
|
||||
) {}
|
||||
|
||||
async index(query: PaginateQuery) {
|
||||
await this.event.sendEvent(MessagesListener.EVENTS.GET_CONVERSATIONS, {});
|
||||
await this.event.sendEvent(
|
||||
MessagesEventService.EVENTS.GET_CONVERSATIONS,
|
||||
{},
|
||||
);
|
||||
|
||||
try {
|
||||
const data = await this.event.waitForEvent<Conversation[]>(
|
||||
MessagesListener.EVENTS.RECEIVE_CONVERSATIONS,
|
||||
await this.event.waitForEvent<Conversation[]>(
|
||||
MessagesEventService.LOCAL_EVENTS.RECEIVE_CONVERSATIONS,
|
||||
);
|
||||
|
||||
await this.repo.save(data);
|
||||
} catch (error) {
|
||||
console.log(error);
|
||||
}
|
||||
|
|
@ -47,13 +48,13 @@ export class ConversationsService {
|
|||
}
|
||||
|
||||
async getConversation(id: Conversation['id']) {
|
||||
await this.event.sendEvent(MessagesListener.EVENTS.GET_CONVERSATION, {
|
||||
await this.event.sendEvent(MessagesEventService.EVENTS.GET_CONVERSATION, {
|
||||
id,
|
||||
});
|
||||
|
||||
try {
|
||||
const data = await this.event.waitForEvent<CreateMessageDto[]>(
|
||||
MessagesListener.EVENTS.RECEIVE_CONVERSATION,
|
||||
MessagesEventService.LOCAL_EVENTS.RECEIVE_CONVERSATION,
|
||||
);
|
||||
|
||||
await this.messageService.bulkCreate(data);
|
||||
|
|
@ -68,6 +69,9 @@ export class ConversationsService {
|
|||
const messages = await this.messageService.repo.find({
|
||||
where: { room_id: result.id },
|
||||
take: 20,
|
||||
order: {
|
||||
time_raw: 'DESC',
|
||||
},
|
||||
});
|
||||
|
||||
result.messages = messages;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,57 @@
|
|||
// user.listener.ts
|
||||
import { Message } from '@/entities/message.entity';
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { EventEmitter2 } from '@nestjs/event-emitter';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { Repository } from 'typeorm';
|
||||
|
||||
@Injectable()
|
||||
export class MessagesEventService {
|
||||
public static EVENTS = {
|
||||
GET_CONVERSATIONS: 'messages.get-conversations',
|
||||
GET_CONVERSATION: 'messages.get-conversation',
|
||||
RECEIVE_CONVERSATIONS: 'messages.receive-conversations',
|
||||
RECEIVE_CONVERSATION: 'messages.receive-conversation',
|
||||
SEND_MESSAGE: 'messages.send-messsage',
|
||||
REPLY_MESSAGE: 'messages.reply-messsage',
|
||||
};
|
||||
|
||||
public static LOCAL_EVENTS = {
|
||||
GET_CONVERSATIONS: 'local_messages.get-conversations',
|
||||
GET_CONVERSATION: 'local_messages.get-conversation',
|
||||
RECEIVE_CONVERSATIONS: 'local_messages.receive-conversations',
|
||||
RECEIVE_CONVERSATION: 'local_messages.receive-conversation',
|
||||
SEND_MESSAGE: 'local_messages.send-messsage',
|
||||
REPLY_MESSAGE: 'local_messages.reply-messsage',
|
||||
};
|
||||
|
||||
constructor(
|
||||
private event: EventEmitter2,
|
||||
@InjectRepository(Message)
|
||||
readonly repo: Repository<Message>,
|
||||
) {}
|
||||
|
||||
public async waitForEvent<T>(
|
||||
eventName: string,
|
||||
timeoutMs = 10000,
|
||||
): Promise<T> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const timer = setTimeout(() => {
|
||||
this.event.off(eventName, listener); // cleanup
|
||||
resolve(null); // hoặc reject(new Error('Timeout')) nếu muốn
|
||||
}, timeoutMs);
|
||||
|
||||
const listener = (data: any) => {
|
||||
clearTimeout(timer);
|
||||
this.event.off(eventName, listener); // cleanup
|
||||
resolve(data);
|
||||
};
|
||||
|
||||
this.event.on(eventName, listener);
|
||||
});
|
||||
}
|
||||
|
||||
public async sendEvent<T>(eventName: string, data: T) {
|
||||
return this.event.emit(eventName, data);
|
||||
}
|
||||
}
|
||||
|
|
@ -4,14 +4,15 @@ import { CreateMessageDto } from './dtos/create-message.dto';
|
|||
import { CreateBulkMessageDto } from './dtos/create-bulk-message.dto';
|
||||
import { SendMessageDto } from './dtos/send-message.dto';
|
||||
import { ReplyMessageDto } from './dtos/reply-message.dto';
|
||||
import { Paginate, PaginateQuery } from 'nestjs-paginate';
|
||||
|
||||
@Controller('messages')
|
||||
export class MessagesController {
|
||||
constructor(private readonly service: MessagesService) {}
|
||||
|
||||
@Post()
|
||||
create(@Body() dto: CreateMessageDto) {
|
||||
return this.service.create(dto);
|
||||
@Get('')
|
||||
index(@Paginate() query: PaginateQuery) {
|
||||
return this.service.index(query);
|
||||
}
|
||||
|
||||
@Post('send-message')
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ import {
|
|||
WebSocketServer,
|
||||
} from '@nestjs/websockets';
|
||||
import { Server, Socket } from 'socket.io';
|
||||
import { MessagesListener } from './messages.listener';
|
||||
import { MessagesEventService } from './messages-event.service';
|
||||
|
||||
@WebSocketGateway({
|
||||
cors: {
|
||||
|
|
@ -22,10 +22,10 @@ export class MessagesGateway implements OnGatewayConnection {
|
|||
|
||||
async onModuleInit() {
|
||||
const eventsToForward = [
|
||||
MessagesListener.EVENTS.GET_CONVERSATIONS,
|
||||
MessagesListener.EVENTS.GET_CONVERSATION,
|
||||
MessagesListener.EVENTS.SEND_MESSAGE,
|
||||
MessagesListener.EVENTS.REPLY_MESSAGE,
|
||||
MessagesEventService.EVENTS.GET_CONVERSATIONS,
|
||||
MessagesEventService.EVENTS.GET_CONVERSATION,
|
||||
MessagesEventService.EVENTS.SEND_MESSAGE,
|
||||
MessagesEventService.EVENTS.REPLY_MESSAGE,
|
||||
];
|
||||
|
||||
for (const event of eventsToForward) {
|
||||
|
|
@ -38,13 +38,13 @@ export class MessagesGateway implements OnGatewayConnection {
|
|||
console.log(`📢 Client connected: ${client.id}`);
|
||||
}
|
||||
|
||||
@SubscribeMessage(MessagesListener.EVENTS.RECEIVE_CONVERSATIONS)
|
||||
@SubscribeMessage(MessagesEventService.EVENTS.RECEIVE_CONVERSATIONS)
|
||||
async handleReiveConversations(@MessageBody() data: any) {
|
||||
this.event.emit(MessagesListener.EVENTS.RECEIVE_CONVERSATIONS, data);
|
||||
this.event.emit(MessagesEventService.EVENTS.RECEIVE_CONVERSATIONS, data);
|
||||
}
|
||||
|
||||
@SubscribeMessage(MessagesListener.EVENTS.RECEIVE_CONVERSATION)
|
||||
@SubscribeMessage(MessagesEventService.EVENTS.RECEIVE_CONVERSATION)
|
||||
async handleReiveConversation(@MessageBody() data: any) {
|
||||
this.event.emit(MessagesListener.EVENTS.RECEIVE_CONVERSATION, data);
|
||||
this.event.emit(MessagesEventService.EVENTS.RECEIVE_CONVERSATION, data);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,41 +1,43 @@
|
|||
// user.listener.ts
|
||||
// messages.listener.ts
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { EventEmitter2 } from '@nestjs/event-emitter';
|
||||
import { OnEvent } from '@nestjs/event-emitter';
|
||||
import { MessagesService } from './messages.service';
|
||||
import { CreateMessageDto } from './dtos/create-message.dto';
|
||||
import { MessagesEventService } from './messages-event.service';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { Conversation } from '@/entities/conversation.entity';
|
||||
import { Repository } from 'typeorm';
|
||||
|
||||
@Injectable()
|
||||
export class MessagesListener {
|
||||
public static EVENTS = {
|
||||
GET_CONVERSATIONS: 'messages.get-conversations',
|
||||
GET_CONVERSATION: 'messages.get-conversation',
|
||||
RECEIVE_CONVERSATIONS: 'messages.receive-conversations',
|
||||
RECEIVE_CONVERSATION: 'messages.receive-conversation',
|
||||
SEND_MESSAGE: 'messages.send-messsage',
|
||||
REPLY_MESSAGE: 'messages.reply-messsage',
|
||||
};
|
||||
constructor(
|
||||
private service: MessagesService,
|
||||
private eventService: MessagesEventService,
|
||||
@InjectRepository(Conversation)
|
||||
readonly repoConversation: Repository<Conversation>,
|
||||
) {}
|
||||
|
||||
constructor(private event: EventEmitter2) {}
|
||||
// Tự động được gọi khi emit 'messages.receive-conversation'
|
||||
@OnEvent(MessagesEventService.EVENTS.RECEIVE_CONVERSATION)
|
||||
async handleReceiveConversation(payload: CreateMessageDto[]) {
|
||||
const result = await this.service.bulkCreate(payload);
|
||||
|
||||
public async waitForEvent<T>(
|
||||
eventName: string,
|
||||
timeoutMs = 10000,
|
||||
): Promise<T> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const timer = setTimeout(() => {
|
||||
this.event.off(eventName, listener); // cleanup
|
||||
resolve(null); // hoặc reject(new Error('Timeout')) nếu muốn
|
||||
}, timeoutMs);
|
||||
|
||||
const listener = (data: any) => {
|
||||
clearTimeout(timer);
|
||||
this.event.off(eventName, listener); // cleanup
|
||||
resolve(data);
|
||||
};
|
||||
|
||||
this.event.on(eventName, listener);
|
||||
});
|
||||
// Send local event
|
||||
this.eventService.sendEvent(
|
||||
MessagesEventService.LOCAL_EVENTS.RECEIVE_CONVERSATION,
|
||||
result,
|
||||
);
|
||||
}
|
||||
|
||||
public async sendEvent<T>(eventName: string, data: T) {
|
||||
return this.event.emit(eventName, data);
|
||||
// Tự động được gọi khi emit 'messages.receive-conversations'
|
||||
@OnEvent(MessagesEventService.EVENTS.RECEIVE_CONVERSATIONS)
|
||||
async handleReceiveConversations(data: Conversation[]) {
|
||||
const result = await this.repoConversation.save(data);
|
||||
|
||||
// Send local event
|
||||
this.eventService.sendEvent(
|
||||
MessagesEventService.LOCAL_EVENTS.RECEIVE_CONVERSATIONS,
|
||||
result,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,17 +1,22 @@
|
|||
import { Module } from '@nestjs/common';
|
||||
import { MessagesService } from './messages.service';
|
||||
import { TypeOrmModule } from '@nestjs/typeorm';
|
||||
import { Conversation } from '@/entities/conversation.entity';
|
||||
import { Message } from '@/entities/message.entity';
|
||||
import { Module } from '@nestjs/common';
|
||||
import { TypeOrmModule } from '@nestjs/typeorm';
|
||||
import { MessagesEventService } from './messages-event.service';
|
||||
import { MessagesController } from './messages.controller';
|
||||
import { MessagesGateway } from './messages.gateway';
|
||||
import { MessagesService } from './messages.service';
|
||||
import { MessagesListener } from './messages.listener';
|
||||
import { EventEmitterModule } from '@nestjs/event-emitter';
|
||||
import { Conversation } from '@/entities/conversation.entity';
|
||||
|
||||
@Module({
|
||||
imports: [TypeOrmModule.forFeature([Message, Conversation])],
|
||||
providers: [MessagesService, MessagesGateway, MessagesListener],
|
||||
providers: [
|
||||
MessagesService,
|
||||
MessagesGateway,
|
||||
MessagesEventService,
|
||||
MessagesListener,
|
||||
],
|
||||
controllers: [MessagesController],
|
||||
exports: [MessagesGateway, MessagesService, MessagesListener],
|
||||
exports: [MessagesGateway, MessagesService, MessagesEventService],
|
||||
})
|
||||
export class MessagesModule {}
|
||||
|
|
|
|||
|
|
@ -1,20 +1,15 @@
|
|||
import { Conversation } from '@/entities/conversation.entity';
|
||||
import { Message } from '@/entities/message.entity';
|
||||
import AppResponse from '@/system/filters/response/app-response';
|
||||
import { SystemLang } from '@/system/lang/system.lang';
|
||||
import {
|
||||
BadRequestException,
|
||||
HttpStatus,
|
||||
Injectable,
|
||||
NotFoundException,
|
||||
} from '@nestjs/common';
|
||||
import { HttpStatus, Injectable, NotFoundException } from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { In, Repository } from 'typeorm';
|
||||
import { paginate, PaginateQuery } from 'nestjs-paginate';
|
||||
import { Repository } from 'typeorm';
|
||||
import { CreateMessageDto } from './dtos/create-message.dto';
|
||||
import { MessagesListener } from './messages.listener';
|
||||
import { Conversation } from '@/entities/conversation.entity';
|
||||
import { SendMessageDto } from './dtos/send-message.dto';
|
||||
import { ConversationsService } from '../conversations/conversations.service';
|
||||
import { ReplyMessageDto } from './dtos/reply-message.dto';
|
||||
import { SendMessageDto } from './dtos/send-message.dto';
|
||||
import { MessagesEventService } from './messages-event.service';
|
||||
@Injectable()
|
||||
export class MessagesService {
|
||||
constructor(
|
||||
|
|
@ -22,9 +17,29 @@ export class MessagesService {
|
|||
readonly repo: Repository<Message>,
|
||||
@InjectRepository(Conversation)
|
||||
readonly conversationRepo: Repository<Conversation>,
|
||||
private event: MessagesListener,
|
||||
private event: MessagesEventService,
|
||||
) {}
|
||||
|
||||
async index(query: PaginateQuery) {
|
||||
const result = await paginate(query, this.repo, {
|
||||
sortableColumns: ['created_at'],
|
||||
searchableColumns: ['message'],
|
||||
defaultLimit: 10,
|
||||
filterableColumns: {
|
||||
id: true,
|
||||
time: true,
|
||||
message: true,
|
||||
updated_at: true,
|
||||
created_at: true,
|
||||
'conversation.id': true,
|
||||
},
|
||||
maxLimit: 100,
|
||||
defaultSortBy: [['time_raw', 'DESC']],
|
||||
});
|
||||
|
||||
return AppResponse.toPagination<Message>(result, true, Message);
|
||||
}
|
||||
|
||||
async create(dto: CreateMessageDto): Promise<Message> {
|
||||
const time = new Date(dto.time);
|
||||
|
||||
|
|
@ -111,7 +126,7 @@ export class MessagesService {
|
|||
}),
|
||||
);
|
||||
|
||||
this.event.sendEvent(MessagesListener.EVENTS.SEND_MESSAGE, data);
|
||||
this.event.sendEvent(MessagesEventService.EVENTS.SEND_MESSAGE, data);
|
||||
|
||||
return AppResponse.toResponse({
|
||||
conversation,
|
||||
|
|
@ -136,7 +151,7 @@ export class MessagesService {
|
|||
}),
|
||||
);
|
||||
|
||||
this.event.sendEvent(MessagesListener.EVENTS.REPLY_MESSAGE, data);
|
||||
this.event.sendEvent(MessagesEventService.EVENTS.REPLY_MESSAGE, data);
|
||||
|
||||
return AppResponse.toResponse({
|
||||
conversation,
|
||||
|
|
|
|||
Loading…
Reference in New Issue