Skip to content

Commit

Permalink
Merge pull request PelicanPlatform#441 from haoming29/fix-flaky-direc…
Browse files Browse the repository at this point in the history
…tor-test

Fix racing conditions in various director-related code and tests
  • Loading branch information
bbockelm authored Dec 1, 2023
2 parents 3a6840b + 1c77168 commit 173e07f
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 31 deletions.
36 changes: 25 additions & 11 deletions director/cache_ads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,6 @@ func TestConfigCacheEviction(t *testing.T) {
}

t.Run("evicted-origin-can-cancel-health-test", func(t *testing.T) {
serverAds.DeleteAll()
// Clear the map for the new test
healthTestCancelFuncs = make(map[ServerAd]context.CancelFunc)

// Start cache eviction
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
Expand All @@ -223,11 +219,21 @@ func TestConfigCacheEviction(t *testing.T) {
wg.Wait()
}()

serverAds.Set(mockPelicanOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL)
ctx, cancelFunc := context.WithDeadline(context.Background(), time.Now().Add(time.Second*5))
healthTestCancelFuncs[mockPelicanOriginServerAd] = cancelFunc

require.True(t, serverAds.Has(mockPelicanOriginServerAd), "serverAds failed to register the originAd")
func() {
serverAdMutex.Lock()
defer serverAdMutex.Unlock()
serverAds.DeleteAll()
serverAds.Set(mockPelicanOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL)
healthTestCancelFuncsMutex.Lock()
defer healthTestCancelFuncsMutex.Unlock()
// Clear the map for the new test
healthTestCancelFuncs = make(map[ServerAd]context.CancelFunc)
healthTestCancelFuncs[mockPelicanOriginServerAd] = cancelFunc

require.True(t, serverAds.Has(mockPelicanOriginServerAd), "serverAds failed to register the originAd")
}()

cancelChan := make(chan int)
go func() {
Expand All @@ -237,9 +243,13 @@ func TestConfigCacheEviction(t *testing.T) {
}
}()

serverAds.Delete(mockPelicanOriginServerAd) // This should call onEviction handler and close the context
func() {
serverAdMutex.Lock()
defer serverAdMutex.Unlock()
serverAds.Delete(mockPelicanOriginServerAd) // This should call onEviction handler and close the context

require.False(t, serverAds.Has(mockPelicanOriginServerAd), "serverAds didn't delete originAd")
require.False(t, serverAds.Has(mockPelicanOriginServerAd), "serverAds didn't delete originAd")
}()

// OnEviction is handled on a different goroutine than the cache management
// So we want to wait for a bit so that OnEviction can have time to be
Expand All @@ -250,7 +260,11 @@ func TestConfigCacheEviction(t *testing.T) {
case <-time.After(3 * time.Second):
require.False(t, true)
}
assert.True(t, healthTestCancelFuncs[mockPelicanOriginServerAd] == nil, "Evicted origin didn't clear cancelFunc in the map")
func() {
healthTestCancelFuncsMutex.RLock()
defer healthTestCancelFuncsMutex.RUnlock()
assert.True(t, healthTestCancelFuncs[mockPelicanOriginServerAd] == nil, "Evicted origin didn't clear cancelFunc in the map")
}()
})
}

Expand All @@ -271,7 +285,7 @@ func TestServerAdsCacheEviction(t *testing.T) {
deletedChan := make(chan int)
cancelChan := make(chan int)

go func() {
func() {
serverAdMutex.Lock()
defer serverAdMutex.Unlock()
serverAds.DeleteAll()
Expand Down
2 changes: 2 additions & 0 deletions director/director_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ func ConfigTTLCache(ctx context.Context, wg *sync.WaitGroup) {
go namespaceKeys.Start()

serverAds.OnEviction(func(ctx context.Context, er ttlcache.EvictionReason, i *ttlcache.Item[ServerAd, []NamespaceAd]) {
healthTestCancelFuncsMutex.Lock()
defer healthTestCancelFuncsMutex.Unlock()
if cancelFunc, exists := healthTestCancelFuncs[i.Key()]; exists {
// Call the cancel function for the evicted originAd to end its health test
cancelFunc()
Expand Down
23 changes: 12 additions & 11 deletions director/director_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,24 +58,20 @@ func TestListNamespaces(t *testing.T) {
}

setup := func() {
serverAds.DeleteAll()
}

teardown := func() {
serverAdMutex.Lock()
defer serverAdMutex.Unlock()
serverAds.DeleteAll()
}

t.Run("empty-entry", func(t *testing.T) {
setup()
defer teardown()
ns := ListNamespacesFromOrigins()

// Initially there should be 0 namespaces registered
assert.Equal(t, 0, len(ns), "List is not empty for empty namespace cache.")
})
t.Run("one-origin-namespace-entry", func(t *testing.T) {
setup()
defer teardown()
serverAds.Set(mockOriginServerAd, mockNamespaceAds(1, "origin1"), ttlcache.DefaultTTL)
ns := ListNamespacesFromOrigins()

Expand All @@ -85,7 +81,6 @@ func TestListNamespaces(t *testing.T) {
})
t.Run("multiple-origin-namespace-entries-from-same-origin", func(t *testing.T) {
setup()
defer teardown()
serverAds.Set(mockOriginServerAd, mockNamespaceAds(10, "origin1"), ttlcache.DefaultTTL)
ns := ListNamespacesFromOrigins()

Expand All @@ -94,7 +89,6 @@ func TestListNamespaces(t *testing.T) {
})
t.Run("multiple-origin-namespace-entries-from-different-origins", func(t *testing.T) {
setup()
defer teardown()

serverAds.Set(mockOriginServerAd, mockNamespaceAds(10, "origin1"), ttlcache.DefaultTTL)

Expand All @@ -112,7 +106,6 @@ func TestListNamespaces(t *testing.T) {
})
t.Run("one-cache-namespace-entry", func(t *testing.T) {
setup()
defer teardown()
serverAds.Set(mockCacheServerAd, mockNamespaceAds(1, "cache1"), ttlcache.DefaultTTL)
ns := ListNamespacesFromOrigins()

Expand All @@ -124,13 +117,21 @@ func TestListNamespaces(t *testing.T) {
func TestListServerAds(t *testing.T) {

t.Run("emtpy-cache", func(t *testing.T) {
serverAds.DeleteAll()
func() {
serverAdMutex.Lock()
defer serverAdMutex.Unlock()
serverAds.DeleteAll()
}()
ads := ListServerAds([]ServerType{OriginType, CacheType})
assert.Equal(t, 0, len(ads))
})

t.Run("get-by-server-type", func(t *testing.T) {
serverAds.DeleteAll()
func() {
serverAdMutex.Lock()
defer serverAdMutex.Unlock()
serverAds.DeleteAll()
}()
serverAds.Set(mockOriginServerAd, []NamespaceAd{}, ttlcache.DefaultTTL)
serverAds.Set(mockCacheServerAd, []NamespaceAd{}, ttlcache.DefaultTTL)
adsAll := ListServerAds([]ServerType{OriginType, CacheType})
Expand Down
12 changes: 7 additions & 5 deletions director/redirect.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"path"
"regexp"
"strings"
"sync"

"github.com/pelicanplatform/pelican/param"

Expand All @@ -42,9 +43,10 @@ type PromDiscoveryItem struct {
}

var (
minClientVersion, _ = version.NewVersion("7.0.0")
minOriginVersion, _ = version.NewVersion("7.0.0")
healthTestCancelFuncs = make(map[ServerAd]context.CancelFunc)
minClientVersion, _ = version.NewVersion("7.0.0")
minOriginVersion, _ = version.NewVersion("7.0.0")
healthTestCancelFuncs = make(map[ServerAd]context.CancelFunc)
healthTestCancelFuncsMutex = sync.RWMutex{}
)

func getRedirectURL(reqPath string, ad ServerAd, requiresAuth bool) (redirectURL url.URL) {
Expand Down Expand Up @@ -439,8 +441,8 @@ func registerServeAd(ctx *gin.Context, sType ServerType) {

// Start director periodic test of origin's health status if origin AD
// has WebURL field AND it's not already been registered
serverAdMutex.RLock()
defer serverAdMutex.RUnlock()
healthTestCancelFuncsMutex.Lock()
defer healthTestCancelFuncsMutex.Unlock()
if ad.WebURL != "" && !hasOriginAdInCache {
ctx, cancel := context.WithCancel(context.Background())
healthTestCancelFuncs[sAd] = cancel
Expand Down
14 changes: 10 additions & 4 deletions director/redirect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ func TestDirectorRegistration(t *testing.T) {
namespaceKeys.Set("/foo/bar", &ar, ttlcache.DefaultTTL)
}

teardown := func() {
serverAdMutex.Lock()
defer serverAdMutex.Unlock()
serverAds.DeleteAll()
}

t.Run("valid-token", func(t *testing.T) {
c, r, w := setupContext()
pKey, token, issuerURL := generateToken(c)
Expand Down Expand Up @@ -184,7 +190,7 @@ func TestDirectorRegistration(t *testing.T) {
namaspaceADs := ListNamespacesFromOrigins()
// If the origin was successfully registed at director, we should be able to find it in director's originAds
assert.True(t, NamespaceAdContainsPath(namaspaceADs, "/foo/bar"), "Coudln't find namespace in the director cache.")
serverAds.DeleteAll()
teardown()
})

// Now repeat the above test, but with an invalid token
Expand Down Expand Up @@ -217,7 +223,7 @@ func TestDirectorRegistration(t *testing.T) {

namaspaceADs := ListNamespacesFromOrigins()
assert.False(t, NamespaceAdContainsPath(namaspaceADs, "/foo/bar"), "Found namespace in the director cache even if the token validation failed.")
serverAds.DeleteAll()
teardown()
})

t.Run("valid-token-with-web-url", func(t *testing.T) {
Expand All @@ -243,7 +249,7 @@ func TestDirectorRegistration(t *testing.T) {
assert.Equal(t, 200, w.Result().StatusCode, "Expected status code of 200")
assert.Equal(t, 1, len(serverAds.Keys()), "Origin fail to register at serverAds")
assert.Equal(t, "https://localhost:8844", serverAds.Keys()[0].WebURL.String(), "WebURL in serverAds does not match data in origin registration request")
serverAds.DeleteAll()
teardown()
})

// We want to ensure backwards compatibility for WebURL
Expand All @@ -270,7 +276,7 @@ func TestDirectorRegistration(t *testing.T) {
assert.Equal(t, 200, w.Result().StatusCode, "Expected status code of 200")
assert.Equal(t, 1, len(serverAds.Keys()), "Origin fail to register at serverAds")
assert.Equal(t, "", serverAds.Keys()[0].WebURL.String(), "WebURL in serverAds isn't empty with no WebURL provided in registration")
serverAds.DeleteAll()
teardown()
})
}

Expand Down

0 comments on commit 173e07f

Please # to comment.