From 4ca372de33c51c11ac5386353234928e78808cb2 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Wed, 5 Feb 2025 15:33:43 +0000 Subject: [PATCH 1/6] Put the replicator into an error and stopped state when a reconnect loop times out --- db/active_replicator_common.go | 5 ++ rest/replicatortest/replicator_test.go | 69 ++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/db/active_replicator_common.go b/db/active_replicator_common.go index ab21a27fda..f82bf29711 100644 --- a/db/active_replicator_common.go +++ b/db/active_replicator_common.go @@ -204,6 +204,11 @@ func (a *activeReplicatorCommon) reconnectLoop() { if err != nil { a.replicationStats.NumReconnectsAborted.Add(1) base.WarnfCtx(ctx, "couldn't reconnect replicator: %v", err) + a.lock.Lock() + defer a.lock.Unlock() + a.setState(ReplicationStateError) + a._publishStatus() + a._stop() } } diff --git a/rest/replicatortest/replicator_test.go b/rest/replicatortest/replicator_test.go index 05f3ffdfa7..b4d1a47dca 100644 --- a/rest/replicatortest/replicator_test.go +++ b/rest/replicatortest/replicator_test.go @@ -2471,6 +2471,75 @@ func TestReconnectReplicator(t *testing.T) { } +func TestReplicatorReconnectTimeout(t *testing.T) { + base.RequireNumTestBuckets(t, 2) + base.SetUpTestLogging(t, base.LevelInfo, base.KeyReplicate, base.KeyHTTP, base.KeyHTTPResp) + passiveRT := rest.NewRestTester(t, nil) + defer passiveRT.Close() + srv := httptest.NewServer(passiveRT.TestPublicHandler()) + defer srv.Close() + + // Build remoteDBURL with basic auth creds + remoteDBURL, err := url.Parse(srv.URL + "/db") + require.NoError(t, err) + + // Add basic auth creds to target db URL + remoteDBURL.User = url.UserPassword("alice", "pass") + + // Active + activeRT := rest.NewRestTester(t, nil) + defer activeRT.Close() + id, err := base.GenerateRandomID() + require.NoError(t, err) + stats, err := base.SyncGatewayStats.NewDBStats(t.Name(), false, false, false, nil, nil) + require.NoError(t, err) + dbstats, err := stats.DBReplicatorStats(t.Name()) + require.NoError(t, err) + arConfig := db.ActiveReplicatorConfig{ + ID: id, + Direction: db.ActiveReplicatorTypePushAndPull, + RemoteDBURL: remoteDBURL, + ActiveDB: &db.Database{ + DatabaseContext: activeRT.GetDatabase(), + }, + Continuous: true, + // aggressive reconnect intervals for testing purposes + TotalReconnectTimeout: time.Millisecond * 10, + ReplicationStatsMap: dbstats, + CollectionsEnabled: !activeRT.GetDatabase().OnlyDefaultCollection(), + } + + // Create the first active replicator to pull from seq:0 + ar, err := db.NewActiveReplicator(activeRT.Context(), &arConfig) + require.NoError(t, err) + require.Equal(t, int64(0), ar.Push.GetStats().NumConnectAttempts.Value()) + + expectedErrMsg := "unexpected status code 401 from target database" + require.ErrorContains(t, ar.Start(activeRT.Context()), expectedErrMsg) + require.EventuallyWithT(t, func(c *assert.CollectT) { + state, _ := ar.State(activeRT.Context()) + assert.Equal(c, db.ReplicationStateError, state) + }, time.Second*10, time.Millisecond*100) + + status, err := activeRT.GetDatabase().SGReplicateMgr.GetReplicationStatus(activeRT.Context(), id, db.DefaultReplicationStatusOptions()) + require.NoError(t, err) + require.Equal(t, db.ReplicationStateError, status.Status) + require.Equal(t, expectedErrMsg, status.ErrorMessage) + require.Equal(t, int64(1), ar.Push.GetStats().NumReconnectsAborted.Value()) + firstNumConnectAttempts := ar.Push.GetStats().NumConnectAttempts.Value() + require.GreaterOrEqual(t, firstNumConnectAttempts, int64(1)) + + // restart replicator to make sure we'll retry a reconnection, so the state can go back to reconnecting + require.ErrorContains(t, ar.Start(activeRT.Context()), expectedErrMsg) + require.EventuallyWithT(t, func(c *assert.CollectT) { + state, errMsg := ar.State(activeRT.Context()) + assert.Equal(c, db.ReplicationStateError, state) + assert.Equal(c, expectedErrMsg, errMsg) + }, time.Second*10, time.Millisecond*100) + require.Equal(t, int64(2), ar.Push.GetStats().NumReconnectsAborted.Value()) + require.GreaterOrEqual(t, ar.Push.GetStats().NumConnectAttempts.Value(), firstNumConnectAttempts+1) +} + // TestTotalSyncTimeStat: // - starts a replicator to simulate a long lived websocket connection on a sync gateway // - wait for this replication connection to be picked up on stats (NumReplicationsActive) From 50ca63b557ab023d1b7f5f9c0c9c4f0fa06ba05e Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Wed, 5 Feb 2025 17:28:56 +0000 Subject: [PATCH 2/6] Conditionally set reconnectLoop error state based on context error --- db/active_replicator_common.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/db/active_replicator_common.go b/db/active_replicator_common.go index f82bf29711..6bc24e1284 100644 --- a/db/active_replicator_common.go +++ b/db/active_replicator_common.go @@ -196,14 +196,26 @@ func (a *activeReplicatorCommon) reconnectLoop() { return err != nil, err, nil } - err, _ := base.RetryLoop(ctx, "replicator reconnect", retryFunc, sleeperFunc) + retryErr, _ := base.RetryLoop(ctx, "replicator reconnect", retryFunc, sleeperFunc) // release timer associated with context deadline if deadlineCancel != nil { deadlineCancel() } - if err != nil { + if retryErr != nil { + switch ctxErr := ctx.Err(); ctxErr { + case context.Canceled: + // replicator was stopped - appropriate state has already been set + base.InfofCtx(ctx, base.KeyReplicate, "exiting reconnect loop: %v", ctxErr) + return + case context.DeadlineExceeded: + // timeout on reconnecting state + base.WarnfCtx(ctx, "aborting reconnect loop after timeout: %v", ctxErr) + default: + // unexpected error from retry loop + base.WarnfCtx(ctx, "aborting reconnect loop after error: %v", retryErr) + } + a.replicationStats.NumReconnectsAborted.Add(1) - base.WarnfCtx(ctx, "couldn't reconnect replicator: %v", err) a.lock.Lock() defer a.lock.Unlock() a.setState(ReplicationStateError) From 9c3ed550c9d94967f4b659d64fc1debd0e2c3315 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Wed, 5 Feb 2025 18:39:10 +0000 Subject: [PATCH 3/6] Rewrite error handling to have early returns --- db/active_replicator_common.go | 43 +++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/db/active_replicator_common.go b/db/active_replicator_common.go index 6bc24e1284..95f31e0ef1 100644 --- a/db/active_replicator_common.go +++ b/db/active_replicator_common.go @@ -201,27 +201,32 @@ func (a *activeReplicatorCommon) reconnectLoop() { if deadlineCancel != nil { deadlineCancel() } - if retryErr != nil { - switch ctxErr := ctx.Err(); ctxErr { - case context.Canceled: - // replicator was stopped - appropriate state has already been set - base.InfofCtx(ctx, base.KeyReplicate, "exiting reconnect loop: %v", ctxErr) - return - case context.DeadlineExceeded: - // timeout on reconnecting state - base.WarnfCtx(ctx, "aborting reconnect loop after timeout: %v", ctxErr) - default: - // unexpected error from retry loop - base.WarnfCtx(ctx, "aborting reconnect loop after error: %v", retryErr) - } - a.replicationStats.NumReconnectsAborted.Add(1) - a.lock.Lock() - defer a.lock.Unlock() - a.setState(ReplicationStateError) - a._publishStatus() - a._stop() + // Exit early if no error or if context was cancelled + if retryErr == nil { + return } + if ctx.Err() == context.Canceled { + // replicator was stopped - appropriate state has already been set + base.InfofCtx(ctx, base.KeyReplicate, "exiting reconnect loop: %v", ctx.Err()) + return + } + + // Handle remaining error cases + if ctx.Err() == context.DeadlineExceeded { + // timeout on reconnecting state + base.WarnfCtx(ctx, "aborting reconnect loop after timeout: %v", ctx.Err()) + } else { + // unexpected error from retry loop + base.WarnfCtx(ctx, "aborting reconnect loop after error: %v", retryErr) + } + + a.replicationStats.NumReconnectsAborted.Add(1) + a.lock.Lock() + defer a.lock.Unlock() + a.setState(ReplicationStateError) + a._publishStatus() + a._stop() } // reconnect will disconnect and stop the replicator, but not set the state - such that it will be reassigned and started again. From c6e27578d5bf69cab19e60fb7f8d87747f51ca60 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Wed, 5 Feb 2025 18:39:19 +0000 Subject: [PATCH 4/6] Defer unlock inside retry func --- db/active_replicator_common.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/db/active_replicator_common.go b/db/active_replicator_common.go index 95f31e0ef1..3f6b047c6a 100644 --- a/db/active_replicator_common.go +++ b/db/active_replicator_common.go @@ -172,6 +172,7 @@ func (a *activeReplicatorCommon) reconnectLoop() { } a.lock.Lock() + defer a.lock.Unlock() // preserve lastError from the previous connect attempt a.setState(ReplicationStateReconnecting) @@ -188,8 +189,6 @@ func (a *activeReplicatorCommon) reconnectLoop() { a.setLastError(err) a._publishStatus() - a.lock.Unlock() - if err != nil { base.InfofCtx(a.ctx, base.KeyReplicate, "error starting replicator on reconnect: %v", err) } From 7a41b0f807870fce8d897172e499046eb0cb7486 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Wed, 5 Feb 2025 18:42:55 +0000 Subject: [PATCH 5/6] add comment for setState instead of setError --- db/active_replicator_common.go | 1 + 1 file changed, 1 insertion(+) diff --git a/db/active_replicator_common.go b/db/active_replicator_common.go index 3f6b047c6a..8e64bc4a3d 100644 --- a/db/active_replicator_common.go +++ b/db/active_replicator_common.go @@ -223,6 +223,7 @@ func (a *activeReplicatorCommon) reconnectLoop() { a.replicationStats.NumReconnectsAborted.Add(1) a.lock.Lock() defer a.lock.Unlock() + // use setState to preserve last error from retry loop set by setLastError a.setState(ReplicationStateError) a._publishStatus() a._stop() From adcb911f44634e20b3c8f6f7de38f0359b6233e2 Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Thu, 6 Feb 2025 18:14:04 +0000 Subject: [PATCH 6/6] Simplify reconnectLoop error handling for logging - rely on retry loop error message which is now wrapping context err --- base/util.go | 7 ++++--- db/active_replicator_common.go | 20 ++++++-------------- 2 files changed, 10 insertions(+), 17 deletions(-) diff --git a/base/util.go b/base/util.go index c28068ea03..4cb2da63aa 100644 --- a/base/util.go +++ b/base/util.go @@ -467,12 +467,13 @@ func RetryLoop(ctx context.Context, description string, worker RetryWorker, slee select { case <-ctx.Done(): verb := "closed" - if errors.Is(ctx.Err(), context.Canceled) { + ctxErr := ctx.Err() + if errors.Is(ctxErr, context.Canceled) { verb = "canceled" - } else if errors.Is(ctx.Err(), context.DeadlineExceeded) { + } else if errors.Is(ctxErr, context.DeadlineExceeded) { verb = "timed out" } - return fmt.Errorf("Retry loop for %v %s based on context", description, verb), nil + return fmt.Errorf("Retry loop for %v %s based on context: %w", description, verb, ctxErr), nil case <-time.After(time.Millisecond * time.Duration(sleepMs)): } diff --git a/db/active_replicator_common.go b/db/active_replicator_common.go index 8e64bc4a3d..4282b3cfc2 100644 --- a/db/active_replicator_common.go +++ b/db/active_replicator_common.go @@ -200,26 +200,18 @@ func (a *activeReplicatorCommon) reconnectLoop() { if deadlineCancel != nil { deadlineCancel() } - - // Exit early if no error or if context was cancelled + // Exit early if no error if retryErr == nil { return } - if ctx.Err() == context.Canceled { - // replicator was stopped - appropriate state has already been set - base.InfofCtx(ctx, base.KeyReplicate, "exiting reconnect loop: %v", ctx.Err()) - return - } - // Handle remaining error cases - if ctx.Err() == context.DeadlineExceeded { - // timeout on reconnecting state - base.WarnfCtx(ctx, "aborting reconnect loop after timeout: %v", ctx.Err()) - } else { - // unexpected error from retry loop - base.WarnfCtx(ctx, "aborting reconnect loop after error: %v", retryErr) + // replicator was stopped - appropriate state has already been set + if errors.Is(ctx.Err(), context.Canceled) { + base.DebugfCtx(ctx, base.KeyReplicate, "exiting reconnect loop: %v", retryErr) + return } + base.WarnfCtx(ctx, "aborting reconnect loop: %v", retryErr) a.replicationStats.NumReconnectsAborted.Add(1) a.lock.Lock() defer a.lock.Unlock()