From bdaf8b39c2e50c01f65ea8eb7996ca3ee5df5f12 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 23 Jan 2019 14:01:53 -0800 Subject: [PATCH] fix(ProviderQueryManager): fix test + add logging Add debug logging for the provider query manager and make tests more reliable --- providerquerymanager/providerquerymanager.go | 22 ++++++++- .../providerquerymanager_test.go | 48 +++++++++++++------ 2 files changed, 54 insertions(+), 16 deletions(-) diff --git a/providerquerymanager/providerquerymanager.go b/providerquerymanager/providerquerymanager.go index d2ba9e72..21cfcd0d 100644 --- a/providerquerymanager/providerquerymanager.go +++ b/providerquerymanager/providerquerymanager.go @@ -2,6 +2,7 @@ package providerquerymanager import ( "context" + "fmt" "sync" "time" @@ -31,6 +32,7 @@ type ProviderQueryNetwork interface { } type providerQueryMessage interface { + debugMessage() string handle(pqm *ProviderQueryManager) } @@ -192,6 +194,7 @@ func (pqm *ProviderQueryManager) findProviderWorker() { return } + log.Debugf("Beginning Find Provider Request for cid: %s", k.String()) pqm.timeoutMutex.RLock() findProviderCtx, cancel := context.WithTimeout(pqm.ctx, pqm.findProviderTimeout) pqm.timeoutMutex.RUnlock() @@ -273,8 +276,6 @@ func (pqm *ProviderQueryManager) cleanupInProcessRequests() { } func (pqm *ProviderQueryManager) run() { - defer close(pqm.incomingFindProviderRequests) - defer close(pqm.providerRequestsProcessing) defer pqm.cleanupInProcessRequests() go pqm.providerRequestBufferWorker() @@ -285,6 +286,7 @@ func (pqm *ProviderQueryManager) run() { for { select { case nextMessage := <-pqm.providerQueryMessages: + log.Debug(nextMessage.debugMessage()) nextMessage.handle(pqm) case <-pqm.ctx.Done(): return @@ -292,6 +294,10 @@ func (pqm *ProviderQueryManager) run() { } } +func (rpm *receivedProviderMessage) debugMessage() string { + return fmt.Sprintf("Received provider (%s) for cid (%s)", rpm.p.String(), rpm.k.String()) +} + func (rpm *receivedProviderMessage) handle(pqm *ProviderQueryManager) { requestStatus, ok := pqm.inProgressRequestStatuses[rpm.k] if !ok { @@ -308,6 +314,10 @@ func (rpm *receivedProviderMessage) handle(pqm *ProviderQueryManager) { } } +func (fpqm *finishedProviderQueryMessage) debugMessage() string { + return fmt.Sprintf("Finished Provider Query on cid: %s", fpqm.k.String()) +} + func (fpqm *finishedProviderQueryMessage) handle(pqm *ProviderQueryManager) { requestStatus, ok := pqm.inProgressRequestStatuses[fpqm.k] if !ok { @@ -320,6 +330,10 @@ func (fpqm *finishedProviderQueryMessage) handle(pqm *ProviderQueryManager) { delete(pqm.inProgressRequestStatuses, fpqm.k) } +func (npqm *newProvideQueryMessage) debugMessage() string { + return fmt.Sprintf("New Provider Query on cid: %s from session: %d", npqm.k.String(), npqm.ses) +} + func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) { requestStatus, ok := pqm.inProgressRequestStatuses[npqm.k] if !ok { @@ -343,6 +357,10 @@ func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) { } } +func (crm *cancelRequestMessage) debugMessage() string { + return fmt.Sprintf("Cancel provider query on cid: %s from session: %d", crm.k.String(), crm.ses) +} + func (crm *cancelRequestMessage) handle(pqm *ProviderQueryManager) { requestStatus, ok := pqm.inProgressRequestStatuses[crm.k] if !ok { diff --git a/providerquerymanager/providerquerymanager_test.go b/providerquerymanager/providerquerymanager_test.go index f2e6f036..f5b6db1e 100644 --- a/providerquerymanager/providerquerymanager_test.go +++ b/providerquerymanager/providerquerymanager_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "reflect" + "sync" "testing" "time" @@ -14,11 +15,12 @@ import ( ) type fakeProviderNetwork struct { - peersFound []peer.ID - connectError error - delay time.Duration - connectDelay time.Duration - queriesMade int + peersFound []peer.ID + connectError error + delay time.Duration + connectDelay time.Duration + queriesMadeMutex sync.RWMutex + queriesMade int } func (fpn *fakeProviderNetwork) ConnectTo(context.Context, peer.ID) error { @@ -27,13 +29,20 @@ func (fpn *fakeProviderNetwork) ConnectTo(context.Context, peer.ID) error { } func (fpn *fakeProviderNetwork) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.ID { + fpn.queriesMadeMutex.Lock() fpn.queriesMade++ + fpn.queriesMadeMutex.Unlock() incomingPeers := make(chan peer.ID) go func() { defer close(incomingPeers) for _, p := range fpn.peersFound { time.Sleep(fpn.delay) select { + case <-ctx.Done(): + return + default: + } + select { case incomingPeers <- p: case <-ctx.Done(): return @@ -75,9 +84,12 @@ func TestNormalSimultaneousFetch(t *testing.T) { t.Fatal("Did not collect all peers for request that was completed") } + fpn.queriesMadeMutex.Lock() + defer fpn.queriesMadeMutex.Unlock() if fpn.queriesMade != 2 { t.Fatal("Did not dedup provider requests running simultaneously") } + } func TestDedupingProviderRequests(t *testing.T) { @@ -93,7 +105,7 @@ func TestDedupingProviderRequests(t *testing.T) { sessionID1 := testutil.GenerateSessionID() sessionID2 := testutil.GenerateSessionID() - sessionCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond) + sessionCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) defer cancel() firstRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key, sessionID1) secondRequestChan := providerQueryManager.FindProvidersAsync(sessionCtx, key, sessionID2) @@ -115,7 +127,8 @@ func TestDedupingProviderRequests(t *testing.T) { if !reflect.DeepEqual(firstPeersReceived, secondPeersReceived) { t.Fatal("Did not receive the same response to both find provider requests") } - + fpn.queriesMadeMutex.Lock() + defer fpn.queriesMadeMutex.Unlock() if fpn.queriesMade != 1 { t.Fatal("Did not dedup provider requests running simultaneously") } @@ -139,7 +152,7 @@ func TestCancelOneRequestDoesNotTerminateAnother(t *testing.T) { firstSessionCtx, firstCancel := context.WithTimeout(ctx, 3*time.Millisecond) defer firstCancel() firstRequestChan := providerQueryManager.FindProvidersAsync(firstSessionCtx, key, sessionID1) - secondSessionCtx, secondCancel := context.WithTimeout(ctx, 20*time.Millisecond) + secondSessionCtx, secondCancel := context.WithTimeout(ctx, 100*time.Millisecond) defer secondCancel() secondRequestChan := providerQueryManager.FindProvidersAsync(secondSessionCtx, key, sessionID2) @@ -160,7 +173,8 @@ func TestCancelOneRequestDoesNotTerminateAnother(t *testing.T) { if len(firstPeersReceived) >= len(peers) { t.Fatal("Collected all peers on cancelled peer, should have been cancelled immediately") } - + fpn.queriesMadeMutex.Lock() + defer fpn.queriesMadeMutex.Unlock() if fpn.queriesMade != 1 { t.Fatal("Did not dedup provider requests running simultaneously") } @@ -248,26 +262,33 @@ func TestRateLimitingRequests(t *testing.T) { delay: 1 * time.Millisecond, } ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() providerQueryManager := New(ctx, fpn) providerQueryManager.Startup() keys := testutil.GenerateCids(maxInProcessRequests + 1) sessionID := testutil.GenerateSessionID() - sessionCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond) + sessionCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) defer cancel() var requestChannels []<-chan peer.ID for i := 0; i < maxInProcessRequests+1; i++ { requestChannels = append(requestChannels, providerQueryManager.FindProvidersAsync(sessionCtx, keys[i], sessionID)) } - time.Sleep(2 * time.Millisecond) + time.Sleep(9 * time.Millisecond) + fpn.queriesMadeMutex.Lock() if fpn.queriesMade != maxInProcessRequests { + t.Logf("Queries made: %d\n", fpn.queriesMade) t.Fatal("Did not limit parallel requests to rate limit") } + fpn.queriesMadeMutex.Unlock() for i := 0; i < maxInProcessRequests+1; i++ { for range requestChannels[i] { } } + fpn.queriesMadeMutex.Lock() + defer fpn.queriesMadeMutex.Unlock() if fpn.queriesMade != maxInProcessRequests+1 { t.Fatal("Did not make all seperate requests") } @@ -282,7 +303,7 @@ func TestFindProviderTimeout(t *testing.T) { ctx := context.Background() providerQueryManager := New(ctx, fpn) providerQueryManager.Startup() - providerQueryManager.SetFindProviderTimeout(3 * time.Millisecond) + providerQueryManager.SetFindProviderTimeout(2 * time.Millisecond) keys := testutil.GenerateCids(1) sessionID1 := testutil.GenerateSessionID() @@ -293,8 +314,7 @@ func TestFindProviderTimeout(t *testing.T) { for p := range firstRequestChan { firstPeersReceived = append(firstPeersReceived, p) } - if len(firstPeersReceived) <= 0 || - len(firstPeersReceived) >= len(peers) { + if len(firstPeersReceived) >= len(peers) { t.Fatal("Find provider request should have timed out, did not") } }