Skip to content

Commit

Permalink
Fix flush lock keeps locked problem
Browse files Browse the repository at this point in the history
  • Loading branch information
mhmtszr committed Jul 21, 2023
1 parent b6a3184 commit f5b2043
Showing 1 changed file with 2 additions and 0 deletions.
2 changes: 2 additions & 0 deletions kafka/producer/producer_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ func (b *Batch) PrepareEndRebalancing() {
func (b *Batch) AddMessages(ctx *models.ListenerContext, messages []kafka.Message, eventTime time.Time) {
b.flushLock.Lock()
if b.isDcpRebalancing {
b.errorLogger.Printf("could not add new message to batch while rebalancing")
b.flushLock.Unlock()
return
}
b.messages = append(b.messages, messages...)
Expand Down

0 comments on commit f5b2043

Please # to comment.