diff --git a/pkg/channel/jetstream/dispatcher/dispatcher_test.go b/pkg/channel/jetstream/dispatcher/dispatcher_test.go index d5cb61618..45b8bc4d3 100644 --- a/pkg/channel/jetstream/dispatcher/dispatcher_test.go +++ b/pkg/channel/jetstream/dispatcher/dispatcher_test.go @@ -112,7 +112,13 @@ func TestDispatcher_ReconcileConsumers(t *testing.T) { UID: subscriber2UID, }) - err = d.ReconcileConsumers(ctx, *configNew, true) + err = d.ReconcileConsumers(ctx, *configNew, false) + if err != nil { + t.Fatal(err) + } + + newConfig := createChannelConfig(nc) + err = d.ReconcileConsumers(ctx, *newConfig, true) if err != nil { t.Fatal(err) } diff --git a/pkg/channel/jetstream/dispatcher/message_dispatcher.go b/pkg/channel/jetstream/dispatcher/message_dispatcher.go index a6244c542..f46dfdf20 100644 --- a/pkg/channel/jetstream/dispatcher/message_dispatcher.go +++ b/pkg/channel/jetstream/dispatcher/message_dispatcher.go @@ -70,10 +70,7 @@ type NatsMessageDispatcherImpl struct { } func NewNatsMessageDispatcher(logger *zap.Logger) *NatsMessageDispatcherImpl { - sender, err := kncloudevents.NewHTTPMessageSenderWithTarget("") - if err != nil { - logger.Fatal("Unable to create cloudevents binding sender", zap.Error(err)) - } + sender, _ := kncloudevents.NewHTTPMessageSenderWithTarget("") return NewMessageDispatcherFromSender(logger, sender) } @@ -111,7 +108,7 @@ func (d *NatsMessageDispatcherImpl) DispatchMessageWithNatsRetries(ctx context.C retryNumber = int(meta.NumDelivered) } - if retryNumber <= retriesConfig.RetryMax { + if (retriesConfig != nil) && (retryNumber <= retriesConfig.RetryMax) { lastTry = false } else { lastTry = true @@ -212,8 +209,11 @@ func (d *NatsMessageDispatcherImpl) processDispatchResult(ctx context.Context, m zap.Any("dispatch_resp_code", dispatchExecutionInfo.ResponseCode)) code := dispatchExecutionInfo.ResponseCode - if code/100 == 5 || code == nethttp.StatusTooManyRequests || code == nethttp.StatusRequestTimeout { - // tell JSM to redeliver the message later + if retryConfig == nil || retryNumber >= retryConfig.RetryMax { + // no more retries, just ACK + result = protocol.ResultACK + } else if code/100 == 5 || code == nethttp.StatusTooManyRequests || code == nethttp.StatusRequestTimeout { + // NACK, tell JSM to redeliver the message later result = protocol.NewReceipt(false, "%w", err) } else { result = err diff --git a/pkg/channel/jetstream/dispatcher/message_dispatcher_test.go b/pkg/channel/jetstream/dispatcher/message_dispatcher_test.go index 8ec3bd22e..c5bacd289 100644 --- a/pkg/channel/jetstream/dispatcher/message_dispatcher_test.go +++ b/pkg/channel/jetstream/dispatcher/message_dispatcher_test.go @@ -74,6 +74,12 @@ const ( ackWait = 30 * time.Second ) +var ( + retryCount int32 = 3 + backoffDelay = "PT1S" + backoffPolicy = v1.BackoffPolicyLinear +) + func TestDispatchMessage(t *testing.T) { testCases := map[string]struct { sendToDestination bool @@ -90,6 +96,7 @@ func TestDispatchMessage(t *testing.T) { expectedReplyRequest *requestValidation expectedDeadLetterRequest *requestValidation lastReceiver string + delivery *v1.DeliverySpec }{ "destination - only": { sendToDestination: true, @@ -792,6 +799,89 @@ func TestDispatchMessage(t *testing.T) { lastReceiver: "reply", }, + "error response and retries": { + delivery: &v1.DeliverySpec{ + Retry: &retryCount, + BackoffDelay: &backoffDelay, + BackoffPolicy: &backoffPolicy, + }, + sendToDestination: true, + header: map[string][]string{ + // do-not-forward should not get forwarded. + "do-not-forward": {"header"}, + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "knative-2": {"knative-2-value"}, + }, + body: "destination", + eventExtensions: map[string]string{ + "abc": `"ce-abc-value"`, + }, + expectedDestRequest: &requestValidation{ + Headers: map[string][]string{ + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "knative-2": {"knative-2-value"}, + "prefer": {"reply"}, + "traceparent": {"ignored-value-header"}, + "ce-abc": {`"ce-abc-value"`}, + "ce-id": {"ignored-value-header"}, + "ce-time": {"ignored-value-header"}, + "ce-source": {testCeSource}, + "ce-type": {testCeType}, + "ce-specversion": {cloudevents.VersionV1}, + }, + Body: `"destination"`, + }, + fakeResponse: &http.Response{ + StatusCode: http.StatusGatewayTimeout, + Body: io.NopCloser(bytes.NewBufferString("destination-response")), + }, + expectedErr: true, + lastReceiver: "destination", + }, + "error response term message": { + delivery: &v1.DeliverySpec{ + Retry: &retryCount, + BackoffDelay: &backoffDelay, + BackoffPolicy: &backoffPolicy, + }, + sendToDestination: true, + sendToReply: true, + header: map[string][]string{ + // do-not-forward should not get forwarded. + "do-not-forward": {"header"}, + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "knative-2": {"knative-2-value"}, + }, + body: "destination", + eventExtensions: map[string]string{ + "abc": `"ce-abc-value"`, + }, + expectedDestRequest: &requestValidation{ + Headers: map[string][]string{ + "x-request-id": {"id123"}, + "knative-1": {"knative-1-value"}, + "knative-2": {"knative-2-value"}, + "prefer": {"reply"}, + "traceparent": {"ignored-value-header"}, + "ce-abc": {`"ce-abc-value"`}, + "ce-id": {"ignored-value-header"}, + "ce-time": {"ignored-value-header"}, + "ce-source": {testCeSource}, + "ce-type": {testCeType}, + "ce-specversion": {cloudevents.VersionV1}, + }, + Body: `"destination"`, + }, + fakeResponse: &http.Response{ + StatusCode: http.StatusBadRequest, + Body: io.NopCloser(bytes.NewBufferString("destination-response")), + }, + expectedErr: true, + lastReceiver: "destination", + }, } for n, tc := range testCases { t.Run(n, func(t *testing.T) { @@ -863,12 +953,23 @@ func TestDispatchMessage(t *testing.T) { headers = utils.PassThroughHeaders(tc.header) } msg := nats.Msg{} - noRetries := kncloudevents.NoRetries() - backoffLinear := v1.BackoffPolicyLinear - noRetries.BackoffPolicy = &backoffLinear - backoffDelay := "5s" - noRetries.BackoffDelay = &backoffDelay - info, err := md.DispatchMessageWithNatsRetries(ctx, message, headers, destination, reply, deadLetterSink, &noRetries, ackWait, &msg) + + var retryConfig kncloudevents.RetryConfig + if tc.delivery == nil { + retryConfig = kncloudevents.NoRetries() + backoffLinear := v1.BackoffPolicyLinear + retryConfig.BackoffPolicy = &backoffLinear + backoffDelay := "5s" + retryConfig.BackoffDelay = &backoffDelay + } else { + retryConfig = kncloudevents.RetryConfig{ + RetryMax: int(*tc.delivery.Retry), + BackoffPolicy: tc.delivery.BackoffPolicy, + BackoffDelay: tc.delivery.BackoffDelay, + } + } + + info, err := md.DispatchMessageWithNatsRetries(ctx, message, headers, destination, reply, deadLetterSink, &retryConfig, ackWait, &msg) if tc.lastReceiver != "" { switch tc.lastReceiver {