diff --git a/base/util.go b/base/util.go
index c28068ea03..4cb2da63aa 100644
--- a/base/util.go
+++ b/base/util.go
@@ -467,12 +467,13 @@ func RetryLoop(ctx context.Context, description string, worker RetryWorker, slee
 		select {
 		case <-ctx.Done():
 			verb := "closed"
-			if errors.Is(ctx.Err(), context.Canceled) {
+			ctxErr := ctx.Err()
+			if errors.Is(ctxErr, context.Canceled) {
 				verb = "canceled"
-			} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
+			} else if errors.Is(ctxErr, context.DeadlineExceeded) {
 				verb = "timed out"
 			}
-			return fmt.Errorf("Retry loop for %v %s based on context", description, verb), nil
+			return fmt.Errorf("Retry loop for %v %s based on context: %w", description, verb, ctxErr), nil
 		case <-time.After(time.Millisecond * time.Duration(sleepMs)):
 		}
 
diff --git a/db/active_replicator_common.go b/db/active_replicator_common.go
index ab21a27fda..4282b3cfc2 100644
--- a/db/active_replicator_common.go
+++ b/db/active_replicator_common.go
@@ -172,6 +172,7 @@ func (a *activeReplicatorCommon) reconnectLoop() {
 		}
 
 		a.lock.Lock()
+		defer a.lock.Unlock()
 
 		// preserve lastError from the previous connect attempt
 		a.setState(ReplicationStateReconnecting)
@@ -188,23 +189,36 @@ func (a *activeReplicatorCommon) reconnectLoop() {
 		a.setLastError(err)
 		a._publishStatus()
 
-		a.lock.Unlock()
-
 		if err != nil {
 			base.InfofCtx(a.ctx, base.KeyReplicate, "error starting replicator on reconnect: %v", err)
 		}
 		return err != nil, err, nil
 	}
 
-	err, _ := base.RetryLoop(ctx, "replicator reconnect", retryFunc, sleeperFunc)
+	retryErr, _ := base.RetryLoop(ctx, "replicator reconnect", retryFunc, sleeperFunc)
 	// release timer associated with context deadline
 	if deadlineCancel != nil {
 		deadlineCancel()
 	}
 
-	if err != nil {
-		a.replicationStats.NumReconnectsAborted.Add(1)
-		base.WarnfCtx(ctx, "couldn't reconnect replicator: %v", err)
+	// Exit early if no error
+	if retryErr == nil {
+		return
 	}
+
+	// replicator was stopped - appropriate state has already been set
+	if errors.Is(ctx.Err(), context.Canceled) {
+		base.DebugfCtx(ctx, base.KeyReplicate, "exiting reconnect loop: %v", retryErr)
+		return
+	}
+
+	base.WarnfCtx(ctx, "aborting reconnect loop: %v", retryErr)
+	a.replicationStats.NumReconnectsAborted.Add(1)
+	a.lock.Lock()
+	defer a.lock.Unlock()
+	// use setState to preserve last error from retry loop set by setLastError
+	a.setState(ReplicationStateError)
+	a._publishStatus()
+	a._stop()
 }
 
 // reconnect will disconnect and stop the replicator, but not set the state - such that it will be reassigned and started again.
diff --git a/rest/replicatortest/replicator_test.go b/rest/replicatortest/replicator_test.go
index 05f3ffdfa7..b4d1a47dca 100644
--- a/rest/replicatortest/replicator_test.go
+++ b/rest/replicatortest/replicator_test.go
@@ -2471,6 +2471,75 @@ func TestReconnectReplicator(t *testing.T) {
 }
 
+func TestReplicatorReconnectTimeout(t *testing.T) {
+	base.RequireNumTestBuckets(t, 2)
+	base.SetUpTestLogging(t, base.LevelInfo, base.KeyReplicate, base.KeyHTTP, base.KeyHTTPResp)
+	passiveRT := rest.NewRestTester(t, nil)
+	defer passiveRT.Close()
+	srv := httptest.NewServer(passiveRT.TestPublicHandler())
+	defer srv.Close()
+
+	// Build remoteDBURL with basic auth creds
+	remoteDBURL, err := url.Parse(srv.URL + "/db")
+	require.NoError(t, err)
+
+	// Add basic auth creds to target db URL
+	remoteDBURL.User = url.UserPassword("alice", "pass")
+
+	// Active
+	activeRT := rest.NewRestTester(t, nil)
+	defer activeRT.Close()
+	id, err := base.GenerateRandomID()
+	require.NoError(t, err)
+	stats, err := base.SyncGatewayStats.NewDBStats(t.Name(), false, false, false, nil, nil)
+	require.NoError(t, err)
+	dbstats, err := stats.DBReplicatorStats(t.Name())
+	require.NoError(t, err)
+	arConfig := db.ActiveReplicatorConfig{
+		ID:          id,
+		Direction:   db.ActiveReplicatorTypePushAndPull,
+		RemoteDBURL: remoteDBURL,
+		ActiveDB: &db.Database{
+			DatabaseContext: activeRT.GetDatabase(),
+		},
+		Continuous: true,
+		// aggressive reconnect intervals for testing purposes
+		TotalReconnectTimeout: time.Millisecond * 10,
+		ReplicationStatsMap:   dbstats,
+		CollectionsEnabled:    !activeRT.GetDatabase().OnlyDefaultCollection(),
+	}
+
+	// Create the first active replicator to pull from seq:0
+	ar, err := db.NewActiveReplicator(activeRT.Context(), &arConfig)
+	require.NoError(t, err)
+	require.Equal(t, int64(0), ar.Push.GetStats().NumConnectAttempts.Value())
+
+	expectedErrMsg := "unexpected status code 401 from target database"
+	require.ErrorContains(t, ar.Start(activeRT.Context()), expectedErrMsg)
+	require.EventuallyWithT(t, func(c *assert.CollectT) {
+		state, _ := ar.State(activeRT.Context())
+		assert.Equal(c, db.ReplicationStateError, state)
+	}, time.Second*10, time.Millisecond*100)
+
+	status, err := activeRT.GetDatabase().SGReplicateMgr.GetReplicationStatus(activeRT.Context(), id, db.DefaultReplicationStatusOptions())
+	require.NoError(t, err)
+	require.Equal(t, db.ReplicationStateError, status.Status)
+	require.Equal(t, expectedErrMsg, status.ErrorMessage)
+	require.Equal(t, int64(1), ar.Push.GetStats().NumReconnectsAborted.Value())
+	firstNumConnectAttempts := ar.Push.GetStats().NumConnectAttempts.Value()
+	require.GreaterOrEqual(t, firstNumConnectAttempts, int64(1))
+
+	// restart replicator to make sure we'll retry a reconnection, so the state can go back to reconnecting
+	require.ErrorContains(t, ar.Start(activeRT.Context()), expectedErrMsg)
+	require.EventuallyWithT(t, func(c *assert.CollectT) {
+		state, errMsg := ar.State(activeRT.Context())
+		assert.Equal(c, db.ReplicationStateError, state)
+		assert.Equal(c, expectedErrMsg, errMsg)
+	}, time.Second*10, time.Millisecond*100)
+	require.Equal(t, int64(2), ar.Push.GetStats().NumReconnectsAborted.Value())
+	require.GreaterOrEqual(t, ar.Push.GetStats().NumConnectAttempts.Value(), firstNumConnectAttempts+1)
+}
+
 // TestTotalSyncTimeStat:
 // - starts a replicator to simulate a long lived websocket connection on a sync gateway
 // - wait for this replication connection to be picked up on stats (NumReplicationsActive)