From a844893630bfdc8ba8a4aa76a492962d0ae734d8 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 23 Aug 2024 18:05:17 +0530 Subject: [PATCH 1/8] added websocket retry logic, added input & config tests and updated docs --- .../docs/inputs/input-streaming.asciidoc | 37 +++++ x-pack/filebeat/input/streaming/config.go | 27 ++++ .../filebeat/input/streaming/config_test.go | 68 ++++++++ x-pack/filebeat/input/streaming/input_test.go | 146 ++++++++++++++---- x-pack/filebeat/input/streaming/websocket.go | 90 +++++++++-- 5 files changed, 321 insertions(+), 47 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc index 3ec257f852d..8121d800c20 100644 --- a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc @@ -266,6 +266,43 @@ This specifies fields in the `state` to be redacted prior to debug logging. Fiel This specifies whether fields should be replaced with a `*` or deleted entirely from messages sent to debug logs. If delete is `true`, fields will be deleted rather than replaced. +[[retry-streaming]] +[float] +==== `retry` + +The `retry` configuration allows the user to specify the number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted. It has a `wait_min` and `wait_max` configuration which specifies the minimum and maximum time to wait between retries. + +["source","yaml",subs="attributes"] +---- +filebeat.inputs: +- type: streaming + url: ws://localhost:443/_stream + program: | + bytes(state.response).decode_json().as(inner_body,{ + "events": { + "message": inner_body.encode_json(), + } + }) + retry: + max_attempts: 5 + wait_min: 1s + wait_max: 10s +---- +[float] +==== `retry.max_attempts` + +The maximum number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted. + +[float] +==== `retry.wait_min` + +The minimum time to wait between retries. This ensures that retries are spaced out enough to give the system time to recover or resolve transient issues, rather than bombarding the system with rapid retries. For example, `wait_min` might be set to 1 second, meaning that even if the calculated backoff is less than this, the client will wait at least 1 second before retrying. + +[float] +==== `retry.wait_max` + +The maximum time to wait between retries. prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying. + [float] === Metrics diff --git a/x-pack/filebeat/input/streaming/config.go b/x-pack/filebeat/input/streaming/config.go index be4ff28180e..750db63110b 100644 --- a/x-pack/filebeat/input/streaming/config.go +++ b/x-pack/filebeat/input/streaming/config.go @@ -6,9 +6,11 @@ package streaming import ( "context" + "errors" "fmt" "net/url" "regexp" + "time" "github.com/elastic/elastic-agent-libs/logp" ) @@ -32,6 +34,8 @@ type config struct { URL *urlConfig `config:"url" validate:"required"` // Redact is the debug log state redaction configuration. Redact *redact `config:"redact"` + // Retry is the configuration for retrying failed connections. + Retry *retry `config:"retry"` } type redact struct { @@ -43,6 +47,12 @@ type redact struct { Delete bool `config:"delete"` } +type retry struct { + MaxAttempts *int `config:"max_attempts"` + WaitMin *time.Duration `config:"wait_min"` + WaitMax *time.Duration `config:"wait_max"` +} + type authConfig struct { // Custom auth config to use for authentication. CustomAuth *customAuthConfig `config:"custom"` @@ -94,6 +104,23 @@ func (c config) Validate() error { if err != nil { return err } + + if c.Retry != nil { + switch { + case c.Retry.MaxAttempts != nil && *c.Retry.MaxAttempts <= 0: + return errors.New("max_attempts must be greater than zero") + case c.Retry.MaxAttempts != nil && c.Retry.WaitMin == nil || c.Retry.WaitMax == nil: + return errors.New("wait_min and wait_max must be set if max_attempts is set") + case (c.Retry.WaitMin != nil || c.Retry.WaitMax != nil) && c.Retry.MaxAttempts == nil: + return errors.New("max_attempts must be set if wait_min or wait_max is set") + case c.Retry.WaitMin != nil && c.Retry.WaitMax == nil: + return errors.New("wait_max must be set if wait_min is set") + case c.Retry.WaitMin == nil && c.Retry.WaitMax != nil: + return errors.New("wait_min must be set if wait_max is set") + case c.Retry.WaitMin != nil && c.Retry.WaitMax != nil && *c.Retry.WaitMin > *c.Retry.WaitMax: + return errors.New("wait_min must be less than or equal to wait_max") + } + } return nil } diff --git a/x-pack/filebeat/input/streaming/config_test.go b/x-pack/filebeat/input/streaming/config_test.go index c74fba4589e..d061ccec0c9 100644 --- a/x-pack/filebeat/input/streaming/config_test.go +++ b/x-pack/filebeat/input/streaming/config_test.go @@ -95,6 +95,74 @@ var configTests = []struct { }, }, }, + { + name: "invalid_retry_missing_wait_max", + config: map[string]interface{}{ + "retry": map[string]interface{}{ + "max_attempts": 3, + "wait_min": "1s", + }, + "url": "wss://localhost:443/v1/stream", + }, + wantErr: fmt.Errorf("wait_min and wait_max must be set if max_attempts is set accessing config"), + }, + { + name: "invalid_retry_missing_wait_min", + config: map[string]interface{}{ + "retry": map[string]interface{}{ + "max_attempts": 3, + "wait_max": "2s", + }, + "url": "wss://localhost:443/v1/stream", + }, + wantErr: fmt.Errorf("wait_min and wait_max must be set if max_attempts is set accessing config"), + }, + { + name: "invalid_retry_wait_min_greater_than_wait_max", + config: map[string]interface{}{ + "retry": map[string]interface{}{ + "max_attempts": 3, + "wait_min": "3s", + "wait_max": "2s", + }, + "url": "wss://localhost:443/v1/stream", + }, + wantErr: fmt.Errorf("wait_min must be less than or equal to wait_max accessing config"), + }, + { + name: "invalid_retry_missing_max_attempts", + config: map[string]interface{}{ + "retry": map[string]interface{}{ + "wait_min": "1s", + "wait_max": "2s", + }, + "url": "wss://localhost:443/v1/stream", + }, + wantErr: fmt.Errorf("max_attempts must be set if wait_min or wait_max is set accessing config"), + }, + { + name: "invalid_retry_max_attempts_eq_zero", + config: map[string]interface{}{ + "retry": map[string]interface{}{ + "max_attempts": 0, + "wait_min": "1s", + "wait_max": "2s", + }, + "url": "wss://localhost:443/v1/stream", + }, + wantErr: fmt.Errorf("max_attempts must be greater than zero accessing config"), + }, + { + name: "valid_retry", + config: map[string]interface{}{ + "retry": map[string]interface{}{ + "max_attempts": 3, + "wait_min": "2s", + "wait_max": "5s", + }, + "url": "wss://localhost:443/v1/stream", + }, + }, } func TestConfig(t *testing.T) { diff --git a/x-pack/filebeat/input/streaming/input_test.go b/x-pack/filebeat/input/streaming/input_test.go index 69a65c8207f..6d382bdf664 100644 --- a/x-pack/filebeat/input/streaming/input_test.go +++ b/x-pack/filebeat/input/streaming/input_test.go @@ -265,9 +265,9 @@ var inputTests = []struct { handler: defaultHandler, config: map[string]interface{}{ "program": ` - bytes(state.response).decode_json().as(inner_body,{ - "events": has(state.cursor) && inner_body.ts > state.cursor.last_updated ? [inner_body] : [], - })`, + bytes(state.response).decode_json().as(inner_body,{ + "events": has(state.cursor) && inner_body.ts > state.cursor.last_updated ? [inner_body] : [], + })`, "state": map[string]interface{}{ "cursor": map[string]int{ "last_updated": 1502908200, @@ -275,20 +275,20 @@ var inputTests = []struct { }, }, response: []string{` - { - "pps": { - "agent": "example.proofpoint.com", - "cid": "mmeng_uivm071" - }, - "ts": 1502908200 - }`, + { + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + "ts": 1502908200 + }`, `{ - "pps": { - "agent": "example.proofpoint-1.com", - "cid": "mmeng_vxciml" - }, - "ts": 1503081000 - }`, + "pps": { + "agent": "example.proofpoint-1.com", + "cid": "mmeng_vxciml" + }, + "ts": 1503081000 + }`, }, want: []map[string]interface{}{ { @@ -314,13 +314,13 @@ var inputTests = []struct { }, }, response: []string{` - { - "pps": { - "agent": "example.proofpoint.com", - "cid": "mmeng_uivm071" - }, - "ts": 1502908200 - }`, + { + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + "ts": 1502908200 + }`, }, want: []map[string]interface{}{ { @@ -346,13 +346,13 @@ var inputTests = []struct { }, }, response: []string{` - { - "pps": { - "agent": "example.proofpoint.com", - "cid": "mmeng_uivm071" - }, - "ts": 1502908200 - }`, + { + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + "ts": 1502908200 + }`, }, want: []map[string]interface{}{ { @@ -381,6 +381,40 @@ var inputTests = []struct { }, }, response: []string{` + { + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + "ts": 1502908200 + }`, + }, + want: []map[string]interface{}{ + { + "pps": map[string]interface{}{ + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071", + }, + "ts": float64(1502908200), + }, + }, + }, + { + name: "test_retry_success", + server: webSocketServerWithRetry(httptest.NewServer), + handler: defaultHandler, + config: map[string]interface{}{ + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": [inner_body], + })`, + "retry": map[string]interface{}{ + "max_attempts": 3, + "wait_min": "1s", + "wait_max": "2s", + }, + }, + response: []string{` { "pps": { "agent": "example.proofpoint.com", @@ -399,6 +433,23 @@ var inputTests = []struct { }, }, }, + { + name: "test_retry_failure", + server: webSocketServerWithRetry(httptest.NewServer), + handler: defaultHandler, + config: map[string]interface{}{ + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": [inner_body], + })`, + "retry": map[string]interface{}{ + "max_attempts": 2, + "wait_min": "1s", + "wait_max": "2s", + }, + }, + wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake"), + }, } var urlEvalTests = []struct { @@ -533,7 +584,7 @@ func TestInput(t *testing.T) { t.Fatalf("unexpected error running test: %v", err) } - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() v2Ctx := v2.Context{ @@ -687,6 +738,39 @@ func webSocketTestServerWithAuth(serve func(http.Handler) *httptest.Server) func } } +// webSocketServerWithRetry returns a function that creates a WebSocket server that rejects the first two connection attempts and accepts the third. +func webSocketServerWithRetry(serve func(http.Handler) *httptest.Server) func(*testing.T, WebSocketHandler, map[string]interface{}, []string) { + var attempt int + return func(t *testing.T, handler WebSocketHandler, config map[string]interface{}, response []string) { + server := serve(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + attempt++ + if attempt <= 2 { + w.WriteHeader(http.StatusForbidden) + fmt.Fprintf(w, "connection attempt %d rejected", attempt) + return + } + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Fatalf("error upgrading connection to WebSocket: %v", err) + return + } + + handler(t, conn, response) + })) + // only set the resource URL if it is not already set + if config["url"] == nil { + config["url"] = "ws" + server.URL[4:] + } + t.Cleanup(server.Close) + } +} + // defaultHandler is a default handler for WebSocket connections. func defaultHandler(t *testing.T, conn *websocket.Conn, response []string) { for _, r := range response { diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index 409d1ee369b..4550b0287e4 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -7,7 +7,11 @@ package streaming import ( "bytes" "context" + "fmt" "io" + "math" + "math/rand/v2" + "net/http" "time" "github.com/gorilla/websocket" @@ -82,25 +86,14 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { } // websocket client - headers := formHeader(s.cfg) - c, resp, err := websocket.DefaultDialer.DialContext(ctx, url, headers) - if resp != nil && resp.Body != nil { - var buf bytes.Buffer - if s.log.Core().Enabled(zapcore.DebugLevel) { - const limit = 1e4 - io.CopyN(&buf, resp.Body, limit) - } - if n, _ := io.Copy(io.Discard, resp.Body); n != 0 && buf.Len() != 0 { - buf.WriteString("... truncated") - } - s.log.Debugw("websocket connection response", "body", &buf) - resp.Body.Close() - } + c, resp, err := connectWebSocket(ctx, s.cfg, url, s.log) + handleConnctionResponse(resp, s.log) if err != nil { s.metrics.errorsTotal.Inc() s.log.Errorw("failed to establish websocket connection", "error", err) return err } + // ensures this is the last connection closed when the function returns defer c.Close() for { @@ -108,11 +101,21 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { if err != nil { s.metrics.errorsTotal.Inc() if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { - s.log.Errorw("websocket connection closed", "error", err) + s.log.Debugw("websocket connection closed, attempting to reconnect...", "error", err) + // close the old connection and reconnect + c.Close() + // since c is already a pointer, we can reassign it to the new connection and the defer will still handle it + c, resp, err = connectWebSocket(ctx, s.cfg, url, s.log) + handleConnctionResponse(resp, s.log) + if err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("failed to reconnect websocket connection", "error", err) + return err + } } else { s.log.Errorw("failed to read websocket data", "error", err) + return err } - return err } s.metrics.receivedBytesTotal.Add(uint64(len(message))) state["response"] = message @@ -126,6 +129,61 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { } } +// handleConnctionResponse logs the response body of the websocket connection. +func handleConnctionResponse(resp *http.Response, log *logp.Logger) { + if resp != nil && resp.Body != nil { + var buf bytes.Buffer + if log.Core().Enabled(zapcore.DebugLevel) { + const limit = 1e4 + io.CopyN(&buf, resp.Body, limit) + } + if n, _ := io.Copy(io.Discard, resp.Body); n != 0 && buf.Len() != 0 { + buf.WriteString("... truncated") + } + log.Debugw("websocket connection response", "body", &buf) + resp.Body.Close() + } +} + +// connectWebSocket attempts to connect to the websocket server with exponential backoff if retry config is available else it connects without retry. +func connectWebSocket(ctx context.Context, cfg config, url string, log *logp.Logger) (*websocket.Conn, *http.Response, error) { + var conn *websocket.Conn + var response *http.Response + var err error + headers := formHeader(cfg) + + if cfg.Retry != nil { + retryConfig := cfg.Retry + for attempt := 1; attempt <= *retryConfig.MaxAttempts; attempt++ { + conn, response, err = websocket.DefaultDialer.Dial(url, nil) + if err == nil { + return conn, response, nil + } + log.Debugw("attempt %d: webSocket connection failed. retrying...\n", attempt) + waitTime := calculateWaitTime(*retryConfig.WaitMin, *retryConfig.WaitMax, attempt) + time.Sleep(waitTime) + } + return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w", *retryConfig.MaxAttempts, err) + } + + return websocket.DefaultDialer.DialContext(ctx, url, headers) +} + +// calculateWaitTime calculates the wait time for the next attempt based on the exponential backoff algorithm. +func calculateWaitTime(waitMin, waitMax time.Duration, attempt int) time.Duration { + // Calculate exponential backoff + base := float64(waitMin) + backoff := base * math.Pow(2, float64(attempt-1)) + + // Calculate jitter proportional to the backoff + maxJitter := float64(waitMax-waitMin) * math.Pow(2, float64(attempt-1)) + jitter := rand.Float64() * maxJitter + + waitTime := time.Duration(backoff + jitter) + + return waitTime +} + // now is time.Now with a modifiable time source. func (s *websocketStream) now() time.Time { if s.time == nil { From 5ced095aaff506ec3f2cee462c44824b2f103f60 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 23 Aug 2024 18:26:24 +0530 Subject: [PATCH 2/8] updated changelog --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/streaming/websocket.go | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 86da7a16904..7b49f874af6 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -282,6 +282,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Update CEL mito extensions to v1.15.0. {pull}40294[40294] - Allow cross-region bucket configuration in s3 input. {issue}22161[22161] {pull}40309[40309] - Improve logging in Okta Entity Analytics provider. {issue}40106[40106] {pull}40347[40347] +- Added retry logic to websocket connections in the streaming input. {issue}40271[40271] {pull}40601[40601] *Auditbeat* diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index 4550b0287e4..4391ec87d39 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -171,11 +171,11 @@ func connectWebSocket(ctx context.Context, cfg config, url string, log *logp.Log // calculateWaitTime calculates the wait time for the next attempt based on the exponential backoff algorithm. func calculateWaitTime(waitMin, waitMax time.Duration, attempt int) time.Duration { - // Calculate exponential backoff + // calculate exponential backoff base := float64(waitMin) backoff := base * math.Pow(2, float64(attempt-1)) - // Calculate jitter proportional to the backoff + // calculate jitter proportional to the backoff maxJitter := float64(waitMax-waitMin) * math.Pow(2, float64(attempt-1)) jitter := rand.Float64() * maxJitter From ff12b50572c07cdee52a898fa2ce351b4e1a451e Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 23 Aug 2024 18:36:16 +0530 Subject: [PATCH 3/8] fixed function name spelling error --- x-pack/filebeat/input/streaming/websocket.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index 4391ec87d39..d207cb3a971 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -87,7 +87,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { // websocket client c, resp, err := connectWebSocket(ctx, s.cfg, url, s.log) - handleConnctionResponse(resp, s.log) + handleConnectionResponse(resp, s.log) if err != nil { s.metrics.errorsTotal.Inc() s.log.Errorw("failed to establish websocket connection", "error", err) @@ -106,7 +106,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { c.Close() // since c is already a pointer, we can reassign it to the new connection and the defer will still handle it c, resp, err = connectWebSocket(ctx, s.cfg, url, s.log) - handleConnctionResponse(resp, s.log) + handleConnectionResponse(resp, s.log) if err != nil { s.metrics.errorsTotal.Inc() s.log.Errorw("failed to reconnect websocket connection", "error", err) @@ -129,8 +129,8 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { } } -// handleConnctionResponse logs the response body of the websocket connection. -func handleConnctionResponse(resp *http.Response, log *logp.Logger) { +// handleConnectionResponse logs the response body of the websocket connection. +func handleConnectionResponse(resp *http.Response, log *logp.Logger) { if resp != nil && resp.Body != nil { var buf bytes.Buffer if log.Core().Enabled(zapcore.DebugLevel) { From 9e6f6647d3ac3ba098632b7246858af7bf3d8233 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Mon, 26 Aug 2024 14:55:36 +0530 Subject: [PATCH 4/8] added a retryable error check --- x-pack/filebeat/input/streaming/websocket.go | 98 +++++++++++++++----- 1 file changed, 75 insertions(+), 23 deletions(-) diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index d207cb3a971..0488c274bc0 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -7,11 +7,14 @@ package streaming import ( "bytes" "context" + "errors" "fmt" "io" "math" "math/rand/v2" + "net" "net/http" + "strings" "time" "github.com/gorilla/websocket" @@ -97,36 +100,84 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { defer c.Close() for { - _, message, err := c.ReadMessage() - if err != nil { - s.metrics.errorsTotal.Inc() - if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { - s.log.Debugw("websocket connection closed, attempting to reconnect...", "error", err) - // close the old connection and reconnect - c.Close() - // since c is already a pointer, we can reassign it to the new connection and the defer will still handle it - c, resp, err = connectWebSocket(ctx, s.cfg, url, s.log) - handleConnectionResponse(resp, s.log) - if err != nil { - s.metrics.errorsTotal.Inc() - s.log.Errorw("failed to reconnect websocket connection", "error", err) + select { + case <-ctx.Done(): + s.log.Debugw("context cancelled, closing websocket connection") + c.Close() + return ctx.Err() + default: + _, message, err := c.ReadMessage() + if err != nil { + s.metrics.errorsTotal.Inc() + if isRetryableError(err) { + s.log.Debugw("websocket connection encountered an error, attempting to reconnect...", "error", err) + // close the old connection and reconnect + c.Close() + // since c is already a pointer, we can reassign it to the new connection and the defer will still handle it + c, resp, err = connectWebSocket(ctx, s.cfg, url, s.log) + handleConnectionResponse(resp, s.log) + if err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("failed to reconnect websocket connection", "error", err) + return err + } + } else { + s.log.Errorw("failed to read websocket data", "error", err) return err } - } else { - s.log.Errorw("failed to read websocket data", "error", err) + } + s.metrics.receivedBytesTotal.Add(uint64(len(message))) + state["response"] = message + s.log.Debugw("received websocket message", logp.Namespace("websocket"), string(message)) + err = s.process(ctx, state, s.cursor, s.now().In(time.UTC)) + if err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("failed to process and publish data", "error", err) return err } } - s.metrics.receivedBytesTotal.Add(uint64(len(message))) - state["response"] = message - s.log.Debugw("received websocket message", logp.Namespace("websocket"), string(message)) - err = s.process(ctx, state, s.cursor, s.now().In(time.UTC)) - if err != nil { - s.metrics.errorsTotal.Inc() - s.log.Errorw("failed to process and publish data", "error", err) - return err + } +} + +// isRetryableError checks if the error is retryable based on the error type. +func isRetryableError(err error) bool { + // check for specific network errors + var netErr *net.OpError + if errors.As(err, &netErr) { + switch { + case netErr.Op == "dial" && netErr.Err.Error() == "i/o timeout", + netErr.Op == "read" && netErr.Err.Error() == "i/o timeout", + netErr.Op == "read" && netErr.Err.Error() == "connection reset by peer", + netErr.Op == "read" && netErr.Err.Error() == "connection refused", + netErr.Op == "read" && netErr.Err.Error() == "connection reset", + netErr.Op == "read" && netErr.Err.Error() == "connection closed": + return true + } + } + + // check for specific websocket close errors + var closeErr *websocket.CloseError + if errors.As(err, &closeErr) { + switch closeErr.Code { + case websocket.CloseGoingAway, + websocket.CloseNormalClosure, + websocket.CloseInternalServerErr, + websocket.CloseTryAgainLater, + websocket.CloseServiceRestart, + websocket.CloseTLSHandshake: + return true } } + + // check for common error patterns + if strings.Contains(err.Error(), "timeout") || + strings.Contains(err.Error(), "connection reset") || + strings.Contains(err.Error(), "temporary failure") || + strings.Contains(err.Error(), "server is busy") { + return true + } + + return false } // handleConnectionResponse logs the response body of the websocket connection. @@ -135,6 +186,7 @@ func handleConnectionResponse(resp *http.Response, log *logp.Logger) { var buf bytes.Buffer if log.Core().Enabled(zapcore.DebugLevel) { const limit = 1e4 + //nolint:errcheck // ignore error since if this fails it signals deeper issues with the system and the code should panic io.CopyN(&buf, resp.Body, limit) } if n, _ := io.Copy(io.Discard, resp.Body); n != 0 && buf.Len() != 0 { From a2277876a305156a2f80260dfe5eb953eef4ee63 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Tue, 27 Aug 2024 14:53:54 +0530 Subject: [PATCH 5/8] addressed PR comments --- .../docs/inputs/input-streaming.asciidoc | 2 +- x-pack/filebeat/input/streaming/config.go | 18 +++------ .../filebeat/input/streaming/config_test.go | 33 ---------------- x-pack/filebeat/input/streaming/websocket.go | 38 +++++++++++++------ 4 files changed, 33 insertions(+), 58 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc index 8121d800c20..4dc4e426dd3 100644 --- a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc @@ -301,7 +301,7 @@ The minimum time to wait between retries. This ensures that retries are spaced o [float] ==== `retry.wait_max` -The maximum time to wait between retries. prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying. +The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying. [float] === Metrics diff --git a/x-pack/filebeat/input/streaming/config.go b/x-pack/filebeat/input/streaming/config.go index 750db63110b..bcec48a9568 100644 --- a/x-pack/filebeat/input/streaming/config.go +++ b/x-pack/filebeat/input/streaming/config.go @@ -48,9 +48,9 @@ type redact struct { } type retry struct { - MaxAttempts *int `config:"max_attempts"` - WaitMin *time.Duration `config:"wait_min"` - WaitMax *time.Duration `config:"wait_max"` + MaxAttempts int `config:"max_attempts"` + WaitMin time.Duration `config:"wait_min"` + WaitMax time.Duration `config:"wait_max"` } type authConfig struct { @@ -107,17 +107,9 @@ func (c config) Validate() error { if c.Retry != nil { switch { - case c.Retry.MaxAttempts != nil && *c.Retry.MaxAttempts <= 0: + case c.Retry.MaxAttempts <= 0: return errors.New("max_attempts must be greater than zero") - case c.Retry.MaxAttempts != nil && c.Retry.WaitMin == nil || c.Retry.WaitMax == nil: - return errors.New("wait_min and wait_max must be set if max_attempts is set") - case (c.Retry.WaitMin != nil || c.Retry.WaitMax != nil) && c.Retry.MaxAttempts == nil: - return errors.New("max_attempts must be set if wait_min or wait_max is set") - case c.Retry.WaitMin != nil && c.Retry.WaitMax == nil: - return errors.New("wait_max must be set if wait_min is set") - case c.Retry.WaitMin == nil && c.Retry.WaitMax != nil: - return errors.New("wait_min must be set if wait_max is set") - case c.Retry.WaitMin != nil && c.Retry.WaitMax != nil && *c.Retry.WaitMin > *c.Retry.WaitMax: + case c.Retry.WaitMin > c.Retry.WaitMax: return errors.New("wait_min must be less than or equal to wait_max") } } diff --git a/x-pack/filebeat/input/streaming/config_test.go b/x-pack/filebeat/input/streaming/config_test.go index d061ccec0c9..840c35d400f 100644 --- a/x-pack/filebeat/input/streaming/config_test.go +++ b/x-pack/filebeat/input/streaming/config_test.go @@ -95,28 +95,6 @@ var configTests = []struct { }, }, }, - { - name: "invalid_retry_missing_wait_max", - config: map[string]interface{}{ - "retry": map[string]interface{}{ - "max_attempts": 3, - "wait_min": "1s", - }, - "url": "wss://localhost:443/v1/stream", - }, - wantErr: fmt.Errorf("wait_min and wait_max must be set if max_attempts is set accessing config"), - }, - { - name: "invalid_retry_missing_wait_min", - config: map[string]interface{}{ - "retry": map[string]interface{}{ - "max_attempts": 3, - "wait_max": "2s", - }, - "url": "wss://localhost:443/v1/stream", - }, - wantErr: fmt.Errorf("wait_min and wait_max must be set if max_attempts is set accessing config"), - }, { name: "invalid_retry_wait_min_greater_than_wait_max", config: map[string]interface{}{ @@ -129,17 +107,6 @@ var configTests = []struct { }, wantErr: fmt.Errorf("wait_min must be less than or equal to wait_max accessing config"), }, - { - name: "invalid_retry_missing_max_attempts", - config: map[string]interface{}{ - "retry": map[string]interface{}{ - "wait_min": "1s", - "wait_max": "2s", - }, - "url": "wss://localhost:443/v1/stream", - }, - wantErr: fmt.Errorf("max_attempts must be set if wait_min or wait_max is set accessing config"), - }, { name: "invalid_retry_max_attempts_eq_zero", config: map[string]interface{}{ diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index 0488c274bc0..282735efe28 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -96,14 +96,19 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { s.log.Errorw("failed to establish websocket connection", "error", err) return err } + // ensures this is the last connection closed when the function returns - defer c.Close() + defer func() { + if err := c.Close(); err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("encountered an error while closing the websocket connection", "error", err) + } + }() for { select { case <-ctx.Done(): s.log.Debugw("context cancelled, closing websocket connection") - c.Close() return ctx.Err() default: _, message, err := c.ReadMessage() @@ -112,8 +117,11 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { if isRetryableError(err) { s.log.Debugw("websocket connection encountered an error, attempting to reconnect...", "error", err) // close the old connection and reconnect - c.Close() - // since c is already a pointer, we can reassign it to the new connection and the defer will still handle it + if err := c.Close(); err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("encountered an error while closing the websocket connection", "error", err) + } + // since c is already a pointer, we can reassign it to the new connection and the defer func will still handle it c, resp, err = connectWebSocket(ctx, s.cfg, url, s.log) handleConnectionResponse(resp, s.log) if err != nil { @@ -184,16 +192,24 @@ func isRetryableError(err error) bool { func handleConnectionResponse(resp *http.Response, log *logp.Logger) { if resp != nil && resp.Body != nil { var buf bytes.Buffer + defer resp.Body.Close() + if log.Core().Enabled(zapcore.DebugLevel) { const limit = 1e4 - //nolint:errcheck // ignore error since if this fails it signals deeper issues with the system and the code should panic - io.CopyN(&buf, resp.Body, limit) + if _, err := io.CopyN(&buf, resp.Body, limit); err != nil && !errors.Is(err, io.EOF) { + log.Errorw("failed to read websocket response body", "error", err) + return + } } - if n, _ := io.Copy(io.Discard, resp.Body); n != 0 && buf.Len() != 0 { + + // discard the remaining part of the body and check for truncation. + if n, err := io.Copy(io.Discard, resp.Body); err != nil { + log.Errorw("failed to discard remaining response body", "error", err) + } else if n != 0 && buf.Len() != 0 { buf.WriteString("... truncated") } + log.Debugw("websocket connection response", "body", &buf) - resp.Body.Close() } } @@ -206,16 +222,16 @@ func connectWebSocket(ctx context.Context, cfg config, url string, log *logp.Log if cfg.Retry != nil { retryConfig := cfg.Retry - for attempt := 1; attempt <= *retryConfig.MaxAttempts; attempt++ { + for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ { conn, response, err = websocket.DefaultDialer.Dial(url, nil) if err == nil { return conn, response, nil } log.Debugw("attempt %d: webSocket connection failed. retrying...\n", attempt) - waitTime := calculateWaitTime(*retryConfig.WaitMin, *retryConfig.WaitMax, attempt) + waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt) time.Sleep(waitTime) } - return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w", *retryConfig.MaxAttempts, err) + return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w", retryConfig.MaxAttempts, err) } return websocket.DefaultDialer.DialContext(ctx, url, headers) From 9b3932cc1d7ff0b80a4af4d68df7a3c7f6a877f2 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Tue, 27 Aug 2024 14:59:27 +0530 Subject: [PATCH 6/8] passed metrics to handleConnectionResponse to track errors --- x-pack/filebeat/input/streaming/websocket.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index 282735efe28..e530281e3cb 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -90,7 +90,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { // websocket client c, resp, err := connectWebSocket(ctx, s.cfg, url, s.log) - handleConnectionResponse(resp, s.log) + handleConnectionResponse(resp, s.metrics, s.log) if err != nil { s.metrics.errorsTotal.Inc() s.log.Errorw("failed to establish websocket connection", "error", err) @@ -123,7 +123,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { } // since c is already a pointer, we can reassign it to the new connection and the defer func will still handle it c, resp, err = connectWebSocket(ctx, s.cfg, url, s.log) - handleConnectionResponse(resp, s.log) + handleConnectionResponse(resp, s.metrics, s.log) if err != nil { s.metrics.errorsTotal.Inc() s.log.Errorw("failed to reconnect websocket connection", "error", err) @@ -189,7 +189,7 @@ func isRetryableError(err error) bool { } // handleConnectionResponse logs the response body of the websocket connection. -func handleConnectionResponse(resp *http.Response, log *logp.Logger) { +func handleConnectionResponse(resp *http.Response, metrics *inputMetrics, log *logp.Logger) { if resp != nil && resp.Body != nil { var buf bytes.Buffer defer resp.Body.Close() @@ -197,6 +197,7 @@ func handleConnectionResponse(resp *http.Response, log *logp.Logger) { if log.Core().Enabled(zapcore.DebugLevel) { const limit = 1e4 if _, err := io.CopyN(&buf, resp.Body, limit); err != nil && !errors.Is(err, io.EOF) { + metrics.errorsTotal.Inc() log.Errorw("failed to read websocket response body", "error", err) return } @@ -204,6 +205,7 @@ func handleConnectionResponse(resp *http.Response, log *logp.Logger) { // discard the remaining part of the body and check for truncation. if n, err := io.Copy(io.Discard, resp.Body); err != nil { + metrics.errorsTotal.Inc() log.Errorw("failed to discard remaining response body", "error", err) } else if n != 0 && buf.Len() != 0 { buf.WriteString("... truncated") From d9e215058f4210ce9a9d750adee75a17c1a9447d Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Wed, 28 Aug 2024 12:51:40 +0530 Subject: [PATCH 7/8] addressed PR suggestions --- x-pack/filebeat/input/streaming/websocket.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index e530281e3cb..c35002b1511 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -198,15 +198,14 @@ func handleConnectionResponse(resp *http.Response, metrics *inputMetrics, log *l const limit = 1e4 if _, err := io.CopyN(&buf, resp.Body, limit); err != nil && !errors.Is(err, io.EOF) { metrics.errorsTotal.Inc() - log.Errorw("failed to read websocket response body", "error", err) - return + fmt.Fprintf(&buf, "failed to read websocket response body with error: (%s) \n", err) } } // discard the remaining part of the body and check for truncation. if n, err := io.Copy(io.Discard, resp.Body); err != nil { metrics.errorsTotal.Inc() - log.Errorw("failed to discard remaining response body", "error", err) + fmt.Fprintf(&buf, "failed to discard remaining response body with error: (%s) ", err) } else if n != 0 && buf.Len() != 0 { buf.WriteString("... truncated") } From 0cbfdb22aecb4b59f36ee2025dfaea44883a4c34 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Wed, 28 Aug 2024 12:57:07 +0530 Subject: [PATCH 8/8] updated retry dial signature --- x-pack/filebeat/input/streaming/websocket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index c35002b1511..ce0f086e558 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -224,7 +224,7 @@ func connectWebSocket(ctx context.Context, cfg config, url string, log *logp.Log if cfg.Retry != nil { retryConfig := cfg.Retry for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ { - conn, response, err = websocket.DefaultDialer.Dial(url, nil) + conn, response, err = websocket.DefaultDialer.DialContext(ctx, url, headers) if err == nil { return conn, response, nil }