diff --git a/src/components/MsgPublish.vue b/src/components/MsgPublish.vue index 1adeb77b5..ddf58991f 100644 --- a/src/components/MsgPublish.vue +++ b/src/components/MsgPublish.vue @@ -227,8 +227,6 @@ import validFormatJson from '@/utils/validFormatJson' import useServices from '@/database/useServices' import time from '@/utils/time' -type UserPairObect = { key: string; value: string; checked: boolean } - @Component({ components: { Editor, @@ -318,7 +316,7 @@ export default class MsgPublish extends Vue { } public defaultPropObj = { key: '', value: '', checked: true } - public listData: UserPairObect[] = [_.cloneDeep(this.defaultPropObj)] + public listData: UserPropsPairObject[] = [_.cloneDeep(this.defaultPropObj)] private hasMqtt5Prop: boolean = false @@ -465,10 +463,27 @@ export default class MsgPublish extends Vue { } private async loadHistoryData(isNewPayload?: boolean, isLoadData?: boolean) { - const { historyMessageHeaderService, historyMessagePayloadService } = useServices() + const { historyMessageHeaderService, historyMessagePayloadService, messageService } = useServices() const headersHistory = (await historyMessageHeaderService.getAll()) ?? [] const payloadsHistory = (await historyMessagePayloadService.getAll()) ?? [] + if (this.mqtt5PropsEnable) { + const propHistory = await messageService.getMessageProp(this.$route.params.id) + if (propHistory) { + this.MQTT5Props = propHistory + if (propHistory.userProperties) { + this.listData = Object.entries(propHistory.userProperties).map(([key, value]) => { + return { + key, + value, + checked: true, + } as UserPropsPairObject + }) + } + } + } + const historyMsg = payloadsHistory[payloadsHistory.length - 1] + if (historyMsg && isLoadData) { this.payloadType = historyMsg.payloadType } diff --git a/src/database/database.config.ts b/src/database/database.config.ts index 22c611d36..5f371aae6 100644 --- a/src/database/database.config.ts +++ b/src/database/database.config.ts @@ -24,6 +24,7 @@ import { autoScroll1635155945767 } from './migration/1635155945767-autoScroll' import { huLang1635392304194 } from './migration/1635393164071-huLang' import { messageProps1637636965786 } from './migration/1637636965786-messageProps' import { enhanceMessageType1638081576988 } from './migration/1638081576988-enhanceMessageType' +import { messageHistoryProps1638375518392 } from './migration/1638375518392-messageHistoryProps' const STORE_PATH = getAppDataPath('MQTTX') try { @@ -55,6 +56,7 @@ const ORMConfig = { huLang1635392304194, messageProps1637636965786, enhanceMessageType1638081576988, + messageHistoryProps1638375518392, ], migrationsTableName: 'temp_migration_table', entities: [ diff --git a/src/database/migration/1638375518392-messageHistoryProps.ts b/src/database/migration/1638375518392-messageHistoryProps.ts new file mode 100644 index 000000000..95ac71a7f --- /dev/null +++ b/src/database/migration/1638375518392-messageHistoryProps.ts @@ -0,0 +1,291 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class messageHistoryProps1638375518392 implements MigrationInterface { + name = 'messageHistoryProps1638375518392' + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(` + DROP INDEX "IDX_6fcc95296b7ab85477d47b698f" + `) + await queryRunner.query(` + CREATE TABLE "temporary_ConnectionEntity" ( + "id" varchar PRIMARY KEY NOT NULL, + "client_id" varchar NOT NULL, + "name" varchar NOT NULL, + "clean" boolean NOT NULL DEFAULT (1), + "protocol" varchar CHECK(protocol IN ('ws', 'wss', 'mqtt', 'mqtts')) NOT NULL DEFAULT ('mqtt'), + "host" varchar NOT NULL, + "port" integer NOT NULL, + "keepalive" integer NOT NULL DEFAULT (60), + "connectTimeout" integer NOT NULL, + "reconnect" boolean NOT NULL, + "username" varchar, + "password" varchar, + "path" varchar, + "certType" varchar DEFAULT (''), + "ssl" boolean NOT NULL, + "mqttVersion" varchar NOT NULL, + "unreadMessageCount" integer NOT NULL, + "clientIdWithTime" boolean DEFAULT (0), + "parent_id" varchar, + "orderId" integer, + "rejectUnauthorized" boolean DEFAULT (1), + "ca" varchar NOT NULL, + "cert" varchar NOT NULL, + "key" varchar NOT NULL, + "isCollection" boolean NOT NULL DEFAULT (0), + "createAt" datetime NOT NULL DEFAULT (CURRENT_TIMESTAMP), + "updateAt" datetime NOT NULL DEFAULT (CURRENT_TIMESTAMP), + "willId" varchar, + "sessionExpiryInterval" integer, + "receiveMaximum" integer, + "maximumPacketSize" integer, + "topicAliasMaximum" integer, + "requestResponseInformation" boolean, + "requestProblemInformation" boolean, + "userProperties" varchar, + "authenticationMethod" varchar, + "authenticationData" varchar, + "pushPropsPayloadFormatIndicator" boolean, + "pushPropsMessageExpiryInterval" integer, + "pushPropsTopicAlias" integer, + "pushPropsResponseTopic" varchar, + "pushPropsCorrelationData" varchar, + "pushPropsUserProperties" varchar, + "pushPropsSubscriptionIdentifier" integer, + "pushPropsContentType" varchar, + CONSTRAINT "REL_71db93dbf719b8b12e835e343f" UNIQUE ("willId"), + CONSTRAINT "FK_71db93dbf719b8b12e835e343fe" FOREIGN KEY ("willId") REFERENCES "WillEntity" ("id") ON DELETE CASCADE ON UPDATE NO ACTION, + CONSTRAINT "FK_9beef409e9fbe4bd50dd024bac4" FOREIGN KEY ("parent_id") REFERENCES "CollectionEntity" ("id") ON DELETE CASCADE ON UPDATE NO ACTION + ) + `) + await queryRunner.query(` + INSERT INTO "temporary_ConnectionEntity"( + "id", + "client_id", + "name", + "clean", + "protocol", + "host", + "port", + "keepalive", + "connectTimeout", + "reconnect", + "username", + "password", + "path", + "certType", + "ssl", + "mqttVersion", + "unreadMessageCount", + "clientIdWithTime", + "parent_id", + "orderId", + "rejectUnauthorized", + "ca", + "cert", + "key", + "isCollection", + "createAt", + "updateAt", + "willId", + "sessionExpiryInterval", + "receiveMaximum", + "maximumPacketSize", + "topicAliasMaximum", + "requestResponseInformation", + "requestProblemInformation", + "userProperties", + "authenticationMethod", + "authenticationData" + ) + SELECT "id", + "client_id", + "name", + "clean", + "protocol", + "host", + "port", + "keepalive", + "connectTimeout", + "reconnect", + "username", + "password", + "path", + "certType", + "ssl", + "mqttVersion", + "unreadMessageCount", + "clientIdWithTime", + "parent_id", + "orderId", + "rejectUnauthorized", + "ca", + "cert", + "key", + "isCollection", + "createAt", + "updateAt", + "willId", + "sessionExpiryInterval", + "receiveMaximum", + "maximumPacketSize", + "topicAliasMaximum", + "requestResponseInformation", + "requestProblemInformation", + "userProperties", + "authenticationMethod", + "authenticationData" + FROM "ConnectionEntity" + `) + await queryRunner.query(` + DROP TABLE "ConnectionEntity" + `) + await queryRunner.query(` + ALTER TABLE "temporary_ConnectionEntity" + RENAME TO "ConnectionEntity" + `) + await queryRunner.query(` + CREATE UNIQUE INDEX "IDX_6fcc95296b7ab85477d47b698f" ON "ConnectionEntity" ("client_id") + `) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(` + DROP INDEX "IDX_6fcc95296b7ab85477d47b698f" + `) + await queryRunner.query(` + ALTER TABLE "ConnectionEntity" + RENAME TO "temporary_ConnectionEntity" + `) + await queryRunner.query(` + CREATE TABLE "ConnectionEntity" ( + "id" varchar PRIMARY KEY NOT NULL, + "client_id" varchar NOT NULL, + "name" varchar NOT NULL, + "clean" boolean NOT NULL DEFAULT (1), + "protocol" varchar CHECK(protocol IN ('ws', 'wss', 'mqtt', 'mqtts')) NOT NULL DEFAULT ('mqtt'), + "host" varchar NOT NULL, + "port" integer NOT NULL, + "keepalive" integer NOT NULL DEFAULT (60), + "connectTimeout" integer NOT NULL, + "reconnect" boolean NOT NULL, + "username" varchar, + "password" varchar, + "path" varchar, + "certType" varchar DEFAULT (''), + "ssl" boolean NOT NULL, + "mqttVersion" varchar NOT NULL, + "unreadMessageCount" integer NOT NULL, + "clientIdWithTime" boolean DEFAULT (0), + "parent_id" varchar, + "orderId" integer, + "rejectUnauthorized" boolean DEFAULT (1), + "ca" varchar NOT NULL, + "cert" varchar NOT NULL, + "key" varchar NOT NULL, + "isCollection" boolean NOT NULL DEFAULT (0), + "createAt" datetime NOT NULL DEFAULT (CURRENT_TIMESTAMP), + "updateAt" datetime NOT NULL DEFAULT (CURRENT_TIMESTAMP), + "willId" varchar, + "sessionExpiryInterval" integer, + "receiveMaximum" integer, + "maximumPacketSize" integer, + "topicAliasMaximum" integer, + "requestResponseInformation" boolean, + "requestProblemInformation" boolean, + "userProperties" varchar, + "authenticationMethod" varchar, + "authenticationData" varchar, + CONSTRAINT "REL_71db93dbf719b8b12e835e343f" UNIQUE ("willId"), + CONSTRAINT "FK_71db93dbf719b8b12e835e343fe" FOREIGN KEY ("willId") REFERENCES "WillEntity" ("id") ON DELETE CASCADE ON UPDATE NO ACTION, + CONSTRAINT "FK_9beef409e9fbe4bd50dd024bac4" FOREIGN KEY ("parent_id") REFERENCES "CollectionEntity" ("id") ON DELETE CASCADE ON UPDATE NO ACTION + ) + `) + await queryRunner.query(` + INSERT INTO "ConnectionEntity"( + "id", + "client_id", + "name", + "clean", + "protocol", + "host", + "port", + "keepalive", + "connectTimeout", + "reconnect", + "username", + "password", + "path", + "certType", + "ssl", + "mqttVersion", + "unreadMessageCount", + "clientIdWithTime", + "parent_id", + "orderId", + "rejectUnauthorized", + "ca", + "cert", + "key", + "isCollection", + "createAt", + "updateAt", + "willId", + "sessionExpiryInterval", + "receiveMaximum", + "maximumPacketSize", + "topicAliasMaximum", + "requestResponseInformation", + "requestProblemInformation", + "userProperties", + "authenticationMethod", + "authenticationData" + ) + SELECT "id", + "client_id", + "name", + "clean", + "protocol", + "host", + "port", + "keepalive", + "connectTimeout", + "reconnect", + "username", + "password", + "path", + "certType", + "ssl", + "mqttVersion", + "unreadMessageCount", + "clientIdWithTime", + "parent_id", + "orderId", + "rejectUnauthorized", + "ca", + "cert", + "key", + "isCollection", + "createAt", + "updateAt", + "willId", + "sessionExpiryInterval", + "receiveMaximum", + "maximumPacketSize", + "topicAliasMaximum", + "requestResponseInformation", + "requestProblemInformation", + "userProperties", + "authenticationMethod", + "authenticationData" + FROM "temporary_ConnectionEntity" + `) + await queryRunner.query(` + DROP TABLE "temporary_ConnectionEntity" + `) + await queryRunner.query(` + CREATE UNIQUE INDEX "IDX_6fcc95296b7ab85477d47b698f" ON "ConnectionEntity" ("client_id") + `) + } +} diff --git a/src/database/models/ConnectionEntity.ts b/src/database/models/ConnectionEntity.ts index 2fa5daf13..eadb4c302 100644 --- a/src/database/models/ConnectionEntity.ts +++ b/src/database/models/ConnectionEntity.ts @@ -65,6 +65,31 @@ export default class ConnectionEntity { @Column({ type: 'boolean', default: false, nullable: true }) clientIdWithTime?: boolean + // message push props + @Column({ type: 'boolean', nullable: true }) + pushPropsPayloadFormatIndicator?: boolean + + @Column({ type: 'integer', nullable: true }) + pushPropsMessageExpiryInterval?: number + + @Column({ type: 'integer', nullable: true }) + pushPropsTopicAlias?: number + + @Column({ type: 'varchar', nullable: true }) + pushPropsResponseTopic?: string + + @Column({ type: 'varchar', nullable: true }) + pushPropsCorrelationData?: string + + @Column({ type: 'varchar', nullable: true }) + pushPropsUserProperties?: string + + @Column({ type: 'integer', nullable: true }) + pushPropsSubscriptionIdentifier?: number + + @Column({ type: 'varchar', nullable: true }) + pushPropsContentType?: string + // ManyToOne entities @ManyToOne(() => CollectionEntity, (collection) => collection.connections, { onDelete: 'CASCADE', nullable: true }) @JoinColumn({ name: 'parent_id', referencedColumnName: 'id' }) diff --git a/src/database/services/MessageService.ts b/src/database/services/MessageService.ts index 4c9e1652d..468d04ceb 100644 --- a/src/database/services/MessageService.ts +++ b/src/database/services/MessageService.ts @@ -1,6 +1,7 @@ import { Service } from 'typedi' import { InjectRepository } from 'typeorm-typedi-extensions' import MessageEntity from '../models/MessageEntity' +import ConnectionEntity from '../models/ConnectionEntity' import { Repository } from 'typeorm' @Service() @@ -8,6 +9,8 @@ export default class MessageService { constructor( @InjectRepository(MessageEntity) private messageRepository: Repository, + @InjectRepository(ConnectionEntity) + private connectionRepository: Repository, ) {} public static modelToEntity(model: MessageModel, connectionId: string | undefined): MessageEntity { @@ -35,6 +38,42 @@ export default class MessageService { } as MessageModel } + public async addMessageProp(properties: MessageModel['properties'], connectionId: string) { + if (!properties) return + const query = await this.connectionRepository.findOne(connectionId) + if (!query) { + return + } + this.connectionRepository.update(connectionId, { + ...query, + pushPropsPayloadFormatIndicator: properties?.payloadFormatIndicator, + pushPropsMessageExpiryInterval: properties?.messageExpiryInterval, + pushPropsTopicAlias: properties?.topicAlias, + pushPropsResponseTopic: properties?.responseTopic, + pushPropsCorrelationData: properties?.correlationData?.toString(), + pushPropsUserProperties: JSON.stringify(properties?.userProperties), + pushPropsSubscriptionIdentifier: properties?.subscriptionIdentifier, + pushPropsContentType: properties?.contentType, + }) + } + + public async getMessageProp(connectionId: string): Promise { + const query = await this.connectionRepository.findOne(connectionId) + if (!query) { + return + } + return { + payloadFormatIndicator: query.pushPropsPayloadFormatIndicator, + messageExpiryInterval: query.pushPropsMessageExpiryInterval, + topicAlias: query.pushPropsTopicAlias, + responseTopic: query.pushPropsResponseTopic, + correlationData: query.pushPropsCorrelationData ? Buffer.from(query.pushPropsCorrelationData) : undefined, + userProperties: query.pushPropsUserProperties ? JSON.parse(query.pushPropsUserProperties) : undefined, + subscriptionIdentifier: query.pushPropsSubscriptionIdentifier, + contentType: query.pushPropsContentType, + } as MessageModel['properties'] + } + public async pushToConnection(message: MessageModel, connectionId: string): Promise { const query: MessageEntity | undefined = await this.messageRepository.findOne(message.id) if (!query) { diff --git a/src/types/global.d.ts b/src/types/global.d.ts index b7a3de3f9..78f100350 100644 --- a/src/types/global.d.ts +++ b/src/types/global.d.ts @@ -38,6 +38,8 @@ declare global { type MessageType = 'all' | 'received' | 'publish' + type UserPropsPairObject = { key: string; value: string; checked: boolean } + // Vue type VueForm = Vue & { validate: (validate: (valid: boolean) => void) => void diff --git a/src/views/connections/ConnectionForm.vue b/src/views/connections/ConnectionForm.vue index 7f1f107a1..b1985a73d 100644 --- a/src/views/connections/ConnectionForm.vue +++ b/src/views/connections/ConnectionForm.vue @@ -543,8 +543,6 @@ import _ from 'lodash' import time from '@/utils/time' import useServices from '@/database/useServices' -type UserPairObect = { key: string; value: string; checked: boolean } - @Component({ components: { Editor, @@ -575,8 +573,20 @@ export default class ConnectionForm extends Vue { private record: ConnectionModel = _.cloneDeep(this.defaultRecord) public defaultPropObj = { key: '', value: '', checked: true } + private initProps() { + if (this.record.properties?.userProperties) { + const props = this.record.properties.userProperties + this.listData = Object.entries(props).map(([key, value]) => { + return { + key, + value, + checked: true, + } as UserPropsPairObject + }) + } + } - public listData: UserPairObect[] = [_.cloneDeep(this.defaultPropObj)] + public listData: UserPropsPairObject[] = [_.cloneDeep(this.defaultPropObj)] private checkItem(index: number) { this.listData[index].checked = !this.listData[index].checked @@ -632,6 +642,7 @@ export default class ConnectionForm extends Vue { this.record = res this.oldName = res.name this.record.protocol = getMQTTProtocol(res) + this.initProps() } } diff --git a/src/views/connections/ConnectionsDetail.vue b/src/views/connections/ConnectionsDetail.vue index 4545fc594..3e4f1f92b 100644 --- a/src/views/connections/ConnectionsDetail.vue +++ b/src/views/connections/ConnectionsDetail.vue @@ -1174,7 +1174,12 @@ export default class ConnectionsDetail extends Vue { const { id, topic, qos, payload, retain, properties } = message let props: PushPropertiesModel | undefined = undefined if (properties && Object.entries(properties).filter(([_, v]) => v !== null && v !== undefined).length > 0) { - props = Object.fromEntries(Object.entries(properties).filter(([_, v]) => v !== null && v !== undefined)) + const propRecords = Object.entries(properties).filter(([_, v]) => v !== null && v !== undefined) + props = Object.fromEntries(propRecords) + if (propRecords.length > 0) { + const { messageService } = useServices() + this.record.id && messageService.addMessageProp(props, this.record.id) + } } if (!topic) {