From 2094d27b1d0c9a2d13ab0dc8d1c0b58ed7b5c3d7 Mon Sep 17 00:00:00 2001 From: YuShifan <894402575bt@gmail.com> Date: Fri, 13 Dec 2024 15:16:36 +0800 Subject: [PATCH] fix(desktop: optimize message rendering with dynamic buffer --- src/utils/constant.ts | 3 + src/views/connections/ConnectionsDetail.vue | 87 ++++++++++++--------- 2 files changed, 53 insertions(+), 37 deletions(-) diff --git a/src/utils/constant.ts b/src/utils/constant.ts index 556c6c164..7a15b9e5f 100644 --- a/src/utils/constant.ts +++ b/src/utils/constant.ts @@ -12,3 +12,6 @@ export const MAX_MESSAGES_COUNT = 40 // Maximum scroll offset in pixels to trigger loading more messages export const SCROLL_OFFSET_MAX_NUM = 100 + +// Messages buffer time in milliseconds +export const MESSAGES_BUFFER_TIME = 1000 diff --git a/src/views/connections/ConnectionsDetail.vue b/src/views/connections/ConnectionsDetail.vue index 2c82d2d7d..4a8d0dd90 100644 --- a/src/views/connections/ConnectionsDetail.vue +++ b/src/views/connections/ConnectionsDetail.vue @@ -330,7 +330,7 @@ import { ipcRenderer } from 'electron' import { MqttClient, IConnackPacket, IPublishPacket, IClientPublishOptions, IDisconnectPacket, Packet } from 'mqtt' import _ from 'lodash' import { Subject, fromEvent } from 'rxjs' -import { bufferTime, map, filter, takeUntil, shareReplay } from 'rxjs/operators' +import { bufferTime, map, filter, takeUntil, shareReplay, distinctUntilChanged } from 'rxjs/operators' import cbor from 'cbor' import { pack, unpack } from 'msgpackr' @@ -339,7 +339,6 @@ import matchMultipleSearch from '@/utils/matchMultipleSearch' import { matchTopicMethod } from '@/utils/topicMatch' import { createClient, ignoreQoS0Message } from '@/utils/mqttUtils' import validFormatJson from '@/utils/validFormatJson' -import delay from '@/utils/delay' import MessageList from '@/components/MessageList.vue' import MsgPublish from '@/components/MsgPublish.vue' @@ -369,7 +368,13 @@ import { serializeAvroToBuffer, deserializeBufferToAvro } from '@/utils/avro' import { globalEventBus } from '@/utils/globalEventBus' import SyncTopicTreeDialog from '@/widgets/SyncTopicTreeDialog.vue' import ConnectionSelect from '@/components/ConnectionSelect.vue' -import { MAX_MESSAGES_COUNT, SCROLL_BOTTOM_THRESHOLD, SCROLL_HEIGHT_COMPENSATION } from '@/utils/constant' +import { + MAX_MESSAGES_COUNT, + SCROLL_BOTTOM_THRESHOLD, + SCROLL_HEIGHT_COMPENSATION, + MESSAGE_RATE_THRESHOLD, + MESSAGES_BUFFER_TIME, +} from '@/utils/constant' type CommandType = | 'searchContent' @@ -1509,8 +1514,10 @@ export default class ConnectionsDetail extends Vue { // received message private onMessageArrived(client: MqttClient, id: string) { const unsubscribe$ = new Subject() + const messageBuffer$ = new Subject() + let isBufferEnabled = false - // Add close event handler if not already present + // Add close event handler if (client.listenerCount('close') <= 1) { fromEvent(client, 'close').subscribe(() => { unsubscribe$.next() @@ -1528,39 +1535,6 @@ export default class ConnectionsDetail extends Vue { shareReplay(1), ) - // 消息速率检测流 - // const rateCheck$ = messageSubject$.pipe( - // bufferTime(1000), - // map((messages) => messages.length), - // map((rate) => { - // console.log('Message Rate', rate) - // if (rate > 10) { - // this.$log.info(`Message rate: ${rate}/s, enabling buffer`) - // return true - // } - // return false - // }), - // ) - - // // 消息速率检测流 - // rateCheck$.subscribe((enableBuffer) => { - // console.log('Buffer Enabled', enableBuffer) - // }) - - // Print message log - messageSubject$.subscribe((message: MessageModel) => { - this.printMessageLog(id, message) - this.renderMessage(id, message) - }) - - // Render messages - // 需要接收到的数据量非常大的时候,才动态开启 - // messageSubject$.pipe(bufferTime(500)).subscribe((messages: MessageModel[]) => { - // if (messages.length) { - // this.renderMessage(id, messages) - // } - // }) - // Save messages with QoS filtering messageSubject$ .pipe( @@ -1572,6 +1546,45 @@ export default class ConnectionsDetail extends Vue { this.saveMessage(id, messages) } }) + + // Rate monitoring stream + const rateCheck$ = messageSubject$.pipe( + bufferTime(MESSAGES_BUFFER_TIME), + map((messages) => messages.length), + map((rate) => rate > MESSAGE_RATE_THRESHOLD), + distinctUntilChanged(), + ) + + // Toggle buffer mode based on message rate + rateCheck$.subscribe((shouldEnableBuffer) => { + isBufferEnabled = shouldEnableBuffer + if (isBufferEnabled) { + this.$log.info(`Message buffer mode enabled (> ${MESSAGE_RATE_THRESHOLD}/s)`) + } + }) + + // Handle message rendering with dynamic buffering + messageSubject$.subscribe((message) => { + if (!message) return + this.printMessageLog(id, message) + if (isBufferEnabled) { + messageBuffer$.next(message) + } else { + this.renderMessage(id, message) + } + }) + + // Buffer rendering stream + messageBuffer$ + .pipe( + bufferTime(MESSAGES_BUFFER_TIME), + filter((messages) => messages.length > 0), + ) + .subscribe((messages) => { + if (messages.length) { + this.renderMessage(id, messages) + } + }) } // Set timed message success