Skip to content

Commit

Permalink
close producer when TopicNotFound/TopicTerminated/ProducerFenced
Browse files Browse the repository at this point in the history
  • Loading branch information
gunli committed Nov 20, 2023
1 parent 1b1dd23 commit 2fa1411
Showing 1 changed file with 19 additions and 11 deletions.
30 changes: 19 additions & 11 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,16 +448,20 @@ func (p *partitionProducer) reconnectToBroker() {
errMsg := err.Error()
if strings.Contains(errMsg, errMsgTopicNotFound) {
// when topic is deleted, we should give up reconnection.
p.log.Warn("Topic not found, stop reconnecting")
p.log.Warn("Topic not found, stop reconnecting, close the producer")
// in JAVA client, when topic not found, closeAsync() will be called, see
// nolint: lll
// https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1792-L1799
p.doClose(newError(TopicNotFound, err.Error()))
break
}

if strings.Contains(errMsg, errMsgTopicTerminated) {
p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting")
p.failPendingMessages(newError(TopicTerminated, err.Error()))
// can not set to producerClosing , or it will fail when we call internalClose()
// there is a Terminated state in JAVA client, maybe we should add a producerTerminated state ?
// p.setProducerState(producerClosing)
p.log.Warn("Topic was terminated, failing pending messages, stop reconnecting, close the producer")
// in JAVA client, producer will be set to `Terminated` state and close, see
// nolint: lll
// https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1822-L1828
p.doClose(newError(TopicTerminated, err.Error()))
break
}

Expand All @@ -469,10 +473,10 @@ func (p *partitionProducer) reconnectToBroker() {

if strings.Contains(errMsg, errMsgProducerFenced) {
p.log.Warn("Producer was fenced, failing pending messages, stop reconnecting")
p.failPendingMessages(newError(ProducerFenced, err.Error()))
// can not set to producerClosing , or it will fail when we call internalClose()
// there is a ProducerFenced state in JAVA client, maybe we should add a producerFenced state ?
// p.setProducerState(producerClosing)
// in JAVA client, producer will be set to `Fenced` state and close, see
// nolint: lll
// https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1830-L1836
p.doClose(newError(ProducerFenced, err.Error()))
break
}

Expand Down Expand Up @@ -1333,6 +1337,10 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
return
}

p.doClose(errProducerClosed)
}

func (p *partitionProducer) doClose(reason error) {
defer close(p.dataChan)
defer close(p.cmdChan)
p.log.Info("Closing producer")
Expand All @@ -1348,7 +1356,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
} else {
p.log.Info("Closed producer")
}
p.failPendingMessages(errProducerClosed)
p.failPendingMessages(reason)

if p.batchBuilder != nil {
if err = p.batchBuilder.Close(); err != nil {
Expand Down

0 comments on commit 2fa1411

Please # to comment.