Skip to content

Commit

Permalink
fix: provide IsMessageSizeTooLarge() function
Browse files Browse the repository at this point in the history
This provides a method for producers to workaround the regression
introduced by IBM#2628. Fixes IBM#2655.

Signed-off-by: Adam Eijdenberg <adam.eijdenberg@defence.gov.au>
  • Loading branch information
ae-govau committed Apr 1, 2024
1 parent 4ad3504 commit b2061ea
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 1 deletion.
2 changes: 1 addition & 1 deletion async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (p *asyncProducer) dispatcher() {

size := msg.ByteSize(version)
if size > p.conf.Producer.MaxMessageBytes {
p.returnError(msg, ConfigurationError(fmt.Sprintf("Attempt to produce message larger than configured Producer.MaxMessageBytes: %d > %d", size, p.conf.Producer.MaxMessageBytes)))
p.returnError(msg, newMessageSizeTooLargeConfigurationError(size, p.conf.Producer.MaxMessageBytes))
continue
}

Expand Down
31 changes: 31 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,22 @@ func (err ConfigurationError) Error() string {
return "kafka: invalid configuration (" + string(err) + ")"
}

const (
// messageSizeTooLargePrefix used by newMessageSizeTooLargeConfigurationError and IsMessageSizeTooLarge
messageSizeTooLargePrefix = "Attempt to produce message larger than configured Producer.MaxMessageBytes"
)

// newMessageSizeTooLargeConfigurationError creates an error returned when a message payload is
// attempted to be published that exceeds the configured maximum message size. We return a specific
// string that is checked for by IsMessageSizeTooLarge() as several consumers of this library
// need to differentiate this error from other ConfigurationErrors, and the actual type returned
// has changed multiple times during the evolution of this library. While the string check is a hack,
// it's better that it's done once within this library, than multiple users reimplementing in their
// own clients.
func newMessageSizeTooLargeConfigurationError(msgSize, configuredSize int) ConfigurationError {
return ConfigurationError(fmt.Sprintf("%s: %d > %d", messageSizeTooLargePrefix, msgSize, configuredSize))
}

// KError is the type of error that can be returned directly by the Kafka broker.
// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
type KError int16
Expand Down Expand Up @@ -455,3 +471,18 @@ func (err KError) Error() string {

return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)
}

// IsMessageSizeTooLarge returns true if the error relates the message size
// being either too large as reported by the broker, or too large because it
// exceeds the configured maximum size.
func IsMessageSizeTooLarge(err error) bool {
if err == nil {
return false
}

if errors.Is(err, ErrMessageSizeTooLarge) {
return true
}

return strings.Contains(err.Error(), messageSizeTooLargePrefix)
}
8 changes: 8 additions & 0 deletions errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"net"
"testing"

"github.com/stretchr/testify/assert"
)

func TestSentinelWithSingleWrappedError(t *testing.T) {
Expand Down Expand Up @@ -63,3 +65,9 @@ func TestSentinelWithMultipleWrappedErrors(t *testing.T) {
t.Errorf("unwrapped value unexpected result")
}
}

func TestIsMessageSizeTooLarge(t *testing.T) {
assert.True(t, IsMessageSizeTooLarge(ErrMessageSizeTooLarge), "broker side error must be regarded as a too large error")
assert.True(t, IsMessageSizeTooLarge(newMessageSizeTooLargeConfigurationError(2, 1)), "config side error must be regarded as a too large error")
assert.False(t, IsMessageSizeTooLarge(nil), "nil is not an error")
}

0 comments on commit b2061ea

Please # to comment.