diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 5dfe380e23..1824cd99e8 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -448,16 +448,20 @@ func (p *partitionProducer) reconnectToBroker() { errMsg := err.Error() if strings.Contains(errMsg, errMsgTopicNotFound) { // when topic is deleted, we should give up reconnection. - p.log.Warn("Topic not found, stop reconnecting") + p.log.Warn("Topic not found, stop reconnecting, close the producer") + // in JAVA client, when topic not found, closeAsync() will be called, see + // nolint: lll + // https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1792-L1799 + p.doClose(newError(TopicNotFound, err.Error())) break } if strings.Contains(errMsg, errMsgTopicTerminated) { - p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting") - p.failPendingMessages(newError(TopicTerminated, err.Error())) - // can not set to producerClosing , or it will fail when we call internalClose() - // there is a Terminated state in JAVA client, maybe we should add a producerTerminated state ? - // p.setProducerState(producerClosing) + p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting, close the producer") + // in JAVA client, producer will be set to `Terminated` state and close, see + // nolint: lll + // https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1822-L1828 + p.doClose(newError(TopicTerminated, err.Error())) break } @@ -469,10 +473,10 @@ func (p *partitionProducer) reconnectToBroker() { if strings.Contains(errMsg, errMsgProducerFenced) { p.log.Warn("Producer was fenced, failing pending messages, stop reconnecting") - p.failPendingMessages(newError(ProducerFenced, err.Error())) - // can not set to producerClosing , or it will fail when we call internalClose() - // there is a ProducerFenced state in JAVA client, maybe we should add a producerFenced state ? - // p.setProducerState(producerClosing) + // in JAVA client, producer will be set to `Fenced` state and close, see + // nolint: lll + // https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1830-L1836 + p.doClose(newError(ProducerFenced, err.Error())) break } @@ -1333,6 +1337,10 @@ func (p *partitionProducer) internalClose(req *closeProducer) { return } + p.doClose(errProducerClosed) +} + +func (p *partitionProducer) doClose(reason error) { defer close(p.dataChan) defer close(p.cmdChan) p.log.Info("Closing producer") @@ -1348,7 +1356,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) { } else { p.log.Info("Closed producer") } - p.failPendingMessages(errProducerClosed) + p.failPendingMessages(reason) if p.batchBuilder != nil { if err = p.batchBuilder.Close(); err != nil {