Skip to content

Commit

Permalink
fixed case when no deliver configuration in subscription (#527)
Browse files Browse the repository at this point in the history
* fixed case when no deliver configuration in subscription

* added test coverage for added lines

* added additional test coverage

* added additional for dispatcher

* commented original changes

* returned fix back
  • Loading branch information
astelmashenko authored Apr 10, 2024
1 parent 44752ad commit 54230bf
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 14 deletions.
8 changes: 7 additions & 1 deletion pkg/channel/jetstream/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,13 @@ func TestDispatcher_ReconcileConsumers(t *testing.T) {
UID: subscriber2UID,
})

err = d.ReconcileConsumers(ctx, *configNew, true)
err = d.ReconcileConsumers(ctx, *configNew, false)
if err != nil {
t.Fatal(err)
}

newConfig := createChannelConfig(nc)
err = d.ReconcileConsumers(ctx, *newConfig, true)
if err != nil {
t.Fatal(err)
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/channel/jetstream/dispatcher/message_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,7 @@ type NatsMessageDispatcherImpl struct {
}

func NewNatsMessageDispatcher(logger *zap.Logger) *NatsMessageDispatcherImpl {
sender, err := kncloudevents.NewHTTPMessageSenderWithTarget("")
if err != nil {
logger.Fatal("Unable to create cloudevents binding sender", zap.Error(err))
}
sender, _ := kncloudevents.NewHTTPMessageSenderWithTarget("")
return NewMessageDispatcherFromSender(logger, sender)
}

Expand Down Expand Up @@ -111,7 +108,7 @@ func (d *NatsMessageDispatcherImpl) DispatchMessageWithNatsRetries(ctx context.C
retryNumber = int(meta.NumDelivered)
}

if retryNumber <= retriesConfig.RetryMax {
if (retriesConfig != nil) && (retryNumber <= retriesConfig.RetryMax) {
lastTry = false
} else {
lastTry = true
Expand Down Expand Up @@ -212,8 +209,11 @@ func (d *NatsMessageDispatcherImpl) processDispatchResult(ctx context.Context, m
zap.Any("dispatch_resp_code", dispatchExecutionInfo.ResponseCode))

code := dispatchExecutionInfo.ResponseCode
if code/100 == 5 || code == nethttp.StatusTooManyRequests || code == nethttp.StatusRequestTimeout {
// tell JSM to redeliver the message later
if retryConfig == nil || retryNumber >= retryConfig.RetryMax {
// no more retries, just ACK
result = protocol.ResultACK
} else if code/100 == 5 || code == nethttp.StatusTooManyRequests || code == nethttp.StatusRequestTimeout {
// NACK, tell JSM to redeliver the message later
result = protocol.NewReceipt(false, "%w", err)
} else {
result = err
Expand Down
113 changes: 107 additions & 6 deletions pkg/channel/jetstream/dispatcher/message_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ const (
ackWait = 30 * time.Second
)

var (
retryCount int32 = 3
backoffDelay = "PT1S"
backoffPolicy = v1.BackoffPolicyLinear
)

func TestDispatchMessage(t *testing.T) {
testCases := map[string]struct {
sendToDestination bool
Expand All @@ -90,6 +96,7 @@ func TestDispatchMessage(t *testing.T) {
expectedReplyRequest *requestValidation
expectedDeadLetterRequest *requestValidation
lastReceiver string
delivery *v1.DeliverySpec
}{
"destination - only": {
sendToDestination: true,
Expand Down Expand Up @@ -792,6 +799,89 @@ func TestDispatchMessage(t *testing.T) {

lastReceiver: "reply",
},
"error response and retries": {
delivery: &v1.DeliverySpec{
Retry: &retryCount,
BackoffDelay: &backoffDelay,
BackoffPolicy: &backoffPolicy,
},
sendToDestination: true,
header: map[string][]string{
// do-not-forward should not get forwarded.
"do-not-forward": {"header"},
"x-request-id": {"id123"},
"knative-1": {"knative-1-value"},
"knative-2": {"knative-2-value"},
},
body: "destination",
eventExtensions: map[string]string{
"abc": `"ce-abc-value"`,
},
expectedDestRequest: &requestValidation{
Headers: map[string][]string{
"x-request-id": {"id123"},
"knative-1": {"knative-1-value"},
"knative-2": {"knative-2-value"},
"prefer": {"reply"},
"traceparent": {"ignored-value-header"},
"ce-abc": {`"ce-abc-value"`},
"ce-id": {"ignored-value-header"},
"ce-time": {"ignored-value-header"},
"ce-source": {testCeSource},
"ce-type": {testCeType},
"ce-specversion": {cloudevents.VersionV1},
},
Body: `"destination"`,
},
fakeResponse: &http.Response{
StatusCode: http.StatusGatewayTimeout,
Body: io.NopCloser(bytes.NewBufferString("destination-response")),
},
expectedErr: true,
lastReceiver: "destination",
},
"error response term message": {
delivery: &v1.DeliverySpec{
Retry: &retryCount,
BackoffDelay: &backoffDelay,
BackoffPolicy: &backoffPolicy,
},
sendToDestination: true,
sendToReply: true,
header: map[string][]string{
// do-not-forward should not get forwarded.
"do-not-forward": {"header"},
"x-request-id": {"id123"},
"knative-1": {"knative-1-value"},
"knative-2": {"knative-2-value"},
},
body: "destination",
eventExtensions: map[string]string{
"abc": `"ce-abc-value"`,
},
expectedDestRequest: &requestValidation{
Headers: map[string][]string{
"x-request-id": {"id123"},
"knative-1": {"knative-1-value"},
"knative-2": {"knative-2-value"},
"prefer": {"reply"},
"traceparent": {"ignored-value-header"},
"ce-abc": {`"ce-abc-value"`},
"ce-id": {"ignored-value-header"},
"ce-time": {"ignored-value-header"},
"ce-source": {testCeSource},
"ce-type": {testCeType},
"ce-specversion": {cloudevents.VersionV1},
},
Body: `"destination"`,
},
fakeResponse: &http.Response{
StatusCode: http.StatusBadRequest,
Body: io.NopCloser(bytes.NewBufferString("destination-response")),
},
expectedErr: true,
lastReceiver: "destination",
},
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
Expand Down Expand Up @@ -863,12 +953,23 @@ func TestDispatchMessage(t *testing.T) {
headers = utils.PassThroughHeaders(tc.header)
}
msg := nats.Msg{}
noRetries := kncloudevents.NoRetries()
backoffLinear := v1.BackoffPolicyLinear
noRetries.BackoffPolicy = &backoffLinear
backoffDelay := "5s"
noRetries.BackoffDelay = &backoffDelay
info, err := md.DispatchMessageWithNatsRetries(ctx, message, headers, destination, reply, deadLetterSink, &noRetries, ackWait, &msg)

var retryConfig kncloudevents.RetryConfig
if tc.delivery == nil {
retryConfig = kncloudevents.NoRetries()
backoffLinear := v1.BackoffPolicyLinear
retryConfig.BackoffPolicy = &backoffLinear
backoffDelay := "5s"
retryConfig.BackoffDelay = &backoffDelay
} else {
retryConfig = kncloudevents.RetryConfig{
RetryMax: int(*tc.delivery.Retry),
BackoffPolicy: tc.delivery.BackoffPolicy,
BackoffDelay: tc.delivery.BackoffDelay,
}
}

info, err := md.DispatchMessageWithNatsRetries(ctx, message, headers, destination, reply, deadLetterSink, &retryConfig, ackWait, &msg)

if tc.lastReceiver != "" {
switch tc.lastReceiver {
Expand Down

0 comments on commit 54230bf

Please # to comment.