Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

fix(cli): update wrong prompt type when error #1670

Merged
merged 4 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/src/lib/conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const conn = (options: ConnectOptions) => {
basicLog.reconnectTimesLimit()
})
} else {
basicLog.reconnecting()
basicLog.reconnecting(retryTimes, maximumReconnectTimes)
}
})

Expand Down
49 changes: 39 additions & 10 deletions cli/src/lib/pub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,23 @@ const send = (
format: FormatType | undefined
opts: IClientPublishOptions
},
maximumReconnectTimes: number,
) => {
let retryTimes = 0
let isNewConnection = true
const client = mqtt.connect(connOpts)
basicLog.connecting(config, connOpts.hostname!, connOpts.port, pubOpts.topic, pubOpts.message.toString())

client.on('connect', () => {
retryTimes = 0
basicLog.connected()
const { topic, message, protobufPath, protobufMessageName, format } = pubOpts
basicLog.publishing()
const publishMessage = processPublishMessage(message, protobufPath, protobufMessageName, format)
client.publish(topic, publishMessage, pubOpts.opts, (err) => {
if (err) {
logWrapper.warn(err.toString())
basicLog.error(err)
process.exit(1)
} else {
basicLog.published()
}
Expand All @@ -72,13 +78,35 @@ const send = (
}
})
})

client.on('error', (err) => {
basicLog.error(err)
client.end()
})

client.on('reconnect', () => {
retryTimes += 1
if (retryTimes > maximumReconnectTimes) {
client.end(true, {}, () => {
basicLog.reconnectTimesLimit()
process.exit(1)
})
} else {
basicLog.reconnecting(retryTimes, maximumReconnectTimes)
isNewConnection = false
}
})

client.on('close', () => {
basicLog.close()
})

client.on('disconnect', (packet: IDisconnectPacket) => {
basicLog.disconnect(packet)
})
}

const multisend = (
const multiSend = (
config: boolean | string | undefined,
connOpts: IClientOptions,
pubOpts: {
Expand Down Expand Up @@ -114,7 +142,8 @@ const multisend = (
pump(process.stdin, split2(), sender, (err) => {
client.end()
if (err) {
throw err
basicLog.error(err)
process.exit(1)
}
})
})
Expand All @@ -127,12 +156,12 @@ const multisend = (
client.on('reconnect', () => {
retryTimes += 1
if (retryTimes > maximumReconnectTimes) {
client.end(false, {}, () => {
client.end(true, {}, () => {
basicLog.reconnectTimesLimit()
process.exit(1)
})
} else {
basicLog.reconnecting()
basicLog.reconnecting(retryTimes, maximumReconnectTimes)
isNewConnection = false
sender.uncork()
}
Expand Down Expand Up @@ -179,12 +208,12 @@ const pub = (options: PublishOptions) => {

const handleStdin = () => {
if (options.multiline) {
multisend(loadOptions, connOpts, pubOpts, options.maximumReconnectTimes)
multiSend(loadOptions, connOpts, pubOpts, options.maximumReconnectTimes)
} else {
process.stdin.pipe(
concat((data) => {
pubOpts.message = data
send(loadOptions, connOpts, pubOpts)
send(loadOptions, connOpts, pubOpts, options.maximumReconnectTimes)
}),
)
}
Expand All @@ -193,11 +222,11 @@ const pub = (options: PublishOptions) => {
if (options.fileRead) {
const bufferData = handleFileRead(processPath(options.fileRead!))
pubOpts.message = bufferData
send(loadOptions, connOpts, pubOpts)
send(loadOptions, connOpts, pubOpts, options.maximumReconnectTimes)
} else if (options.stdin) {
handleStdin()
} else {
send(loadOptions, connOpts, pubOpts)
send(loadOptions, connOpts, pubOpts, options.maximumReconnectTimes)
}
}

Expand Down Expand Up @@ -356,7 +385,7 @@ const multiPub = async (commandType: CommandType, options: BenchPublishOptions |
client.publish(publishTopic, publishMessage, pubOpts.opts, (err) => {
inFlightMessageCount -= 1
if (err) {
logWrapper.warn(err.toString())
basicLog.error(err)
} else {
total += 1
rate += 1
Expand Down
12 changes: 5 additions & 7 deletions cli/src/lib/sub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,14 @@ const sub = (options: SubscribeOptions) => {
if (err) {
!outputModeClean && basicLog.error(err)
process.exit(1)
} else {
!outputModeClean && basicLog.subscribed(t)
}

result.forEach((sub) => {
if (sub.qos > 2) {
!outputModeClean && basicLog.subscriptionNegated(sub)
process.exit(1)
}
})
!outputModeClean && basicLog.subscribed(t)
})
})
})
Expand Down Expand Up @@ -165,7 +163,7 @@ const sub = (options: SubscribeOptions) => {
!outputModeClean && basicLog.reconnectTimesLimit()
})
} else {
!outputModeClean && basicLog.reconnecting()
!outputModeClean && basicLog.reconnecting(retryTimes, maximumReconnectTimes)
}
})

Expand Down Expand Up @@ -241,9 +239,6 @@ const benchSub = async (options: BenchSubscribeOptions) => {
if (err) {
logWrapper.fail(`[${i}/${count}] - Client ID: ${opts.clientId}, ${err}`)
process.exit(1)
} else {
interactiveSub.success('[%d/%d] - Subscribed to %s', connectedCount, count, topicName)
subscribedCount += 1
}

result.forEach((sub) => {
Expand All @@ -255,6 +250,9 @@ const benchSub = async (options: BenchSubscribeOptions) => {
}
})

interactiveSub.success('[%d/%d] - Subscribed to %s', connectedCount, count, topicName)
subscribedCount += 1

if (connectedCount === count && subscribedCount === count * topic.length && !isLogged) {
const connEnd = Date.now()
signale.success(`Created ${count} connections in ${(connEnd - connStart) / 1000}s`)
Expand Down
3 changes: 2 additions & 1 deletion cli/src/utils/logWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ const basicLog = {
enterToPublish: () => logWrapper.success('Connected, press Enter to publish, press Ctrl+C to exit'),
error: (err: Error) => logWrapper.fail(err.toString()),
close: () => logWrapper.fail('Connection closed'),
reconnecting: () => logWrapper.await('Reconnecting...'),
reconnecting: (retryTimes: number, maxReTryTimes: number) =>
logWrapper.await(`Reconnecting...[${retryTimes}/${maxReTryTimes}]`),
reconnectTimesLimit: () => logWrapper.fail('Exceed the maximum reconnect times limit, stop retry'),
disconnect: (packet: IDisconnectPacket, clientId?: string) => {
const { reasonCode } = packet
Expand Down
8 changes: 5 additions & 3 deletions src/views/connections/ConnectionsDetail.vue
Original file line number Diff line number Diff line change
Expand Up @@ -1077,12 +1077,14 @@ export default class ConnectionsDetail extends Vue {
})
this.$emit('reload')
} else {
this.reTryConnectTimes += 1
if (this.reTryConnectTimes > this.maxReconnectTimes) {
this.$log.warn('Max reconnect limit reached, stopping retries')
this.forceCloseTheConnection()
} else {
this.$log.info(`Retrying connection for ${this.record.name}, attempt: ${this.reTryConnectTimes}`)
this.reTryConnectTimes += 1
this.$log.info(
`Retrying connection for ${this.record.name}, attempt: [${this.reTryConnectTimes}/${this.maxReconnectTimes}]`,
)
this.connectLoading = true
this.$notify({
title: this.$tc('connections.reconnect'),
Expand Down Expand Up @@ -1603,7 +1605,7 @@ export default class ConnectionsDetail extends Vue {
*/
private handleErrorOnPublish(error: Error) {
const errorMsg = error.toString()
this.$message.error(errorMsg)
this.notifyMsgWithCopilot(errorMsg)
this.stopTimedSend()
this.$log.error(
`Failed to publish message for ${this.record.name}. Error: ${errorMsg}. Stack trace: ${error.stack}`,
Expand Down
Loading