Skip to content

Commit

Permalink
Merge pull request #416 from haoming29/test-cache-eviction
Browse files Browse the repository at this point in the history
Ensure TTL caches evict expired item
  • Loading branch information
jhiemstrawisc authored Nov 29, 2023
2 parents d937eb1 + a143e71 commit 146e8e1
Show file tree
Hide file tree
Showing 10 changed files with 241 additions and 27 deletions.
17 changes: 15 additions & 2 deletions cmd/cache_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"net/url"
"os"
"strings"
"sync"
"time"

"github.com/lestrrat-go/jwx/v2/jwk"
Expand Down Expand Up @@ -145,14 +146,26 @@ func advertiseCache(prefix string, nsAds []director.NamespaceAd) error {
}

func serveCache( /*cmd*/ *cobra.Command /*args*/, []string) error {
defer config.CleanupTempResources()
// Use this context for any goroutines that needs to react to server shutdown
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
// Use this wait group to ensure the goroutines can finish before the server exits/shutdown
var wg sync.WaitGroup

// This anonymous function ensures we cancel any context and wait for those goroutines to
// finish their cleanup work before the server exits
defer func() {
shutdownCancel()
wg.Wait()
config.CleanupTempResources()
}()

err := config.DiscoverFederation()
if err != nil {
log.Warningln("Failed to do service auto-discovery:", err)
}

err = xrootd.SetUpMonitoring()
wg.Add(1)
err = xrootd.SetUpMonitoring(shutdownCtx, &wg)
if err != nil {
return err
}
Expand Down
21 changes: 17 additions & 4 deletions cmd/director_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"syscall"

"github.com/pelicanplatform/pelican/config"
Expand All @@ -33,6 +35,18 @@ import (
)

func serveDirector( /*cmd*/ *cobra.Command /*args*/, []string) error {
// Use this context for any goroutines that needs to react to server shutdown
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
// Use this wait group to ensure the goroutines can finish before the server exits/shutdown
var wg sync.WaitGroup

// This anonymous function ensures we cancel any context and wait for those goroutines to
// finish their cleanup work before the server exits
defer func() {
shutdownCancel()
wg.Wait()
}()

log.Info("Initializing Director GeoIP database...")
director.InitializeDB()

Expand All @@ -47,6 +61,9 @@ func serveDirector( /*cmd*/ *cobra.Command /*args*/, []string) error {
}
go director.PeriodicCacheReload()

wg.Add(1)
director.ConfigTTLCache(shutdownCtx, &wg)

engine, err := web_ui.GetEngine()
if err != nil {
return err
Expand All @@ -59,10 +76,6 @@ func serveDirector( /*cmd*/ *cobra.Command /*args*/, []string) error {
return err
}

// Configure Cache eviction policies. In the future, this function can also be
// promoted to run the actual eviction logic (cache.start())
director.ConfigCacheEviction()

// Configure the shortcut middleware to either redirect to a cache
// or to an origin
defaultResponse := param.Director_DefaultResponse.GetString()
Expand Down
19 changes: 16 additions & 3 deletions cmd/origin_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"fmt"
"net/url"
"os"
"sync"

"github.com/pelicanplatform/pelican/config"
"github.com/pelicanplatform/pelican/daemon"
Expand Down Expand Up @@ -101,9 +102,21 @@ func checkDefaults(origin bool, nsAds []director.NamespaceAd) error {
}

func serveOrigin( /*cmd*/ *cobra.Command /*args*/, []string) error {
defer config.CleanupTempResources()

err := xrootd.SetUpMonitoring()
// Use this context for any goroutines that needs to react to server shutdown
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
// Use this wait group to ensure the goroutines can finish before the server exits/shutdown
var wg sync.WaitGroup

// This anonymous function ensures we cancel any context and wait for those goroutines to
// finish their cleanup work before the server exits
defer func() {
shutdownCancel()
wg.Wait()
config.CleanupTempResources()
}()

wg.Add(1)
err := xrootd.SetUpMonitoring(shutdownCtx, &wg)
if err != nil {
return err
}
Expand Down
13 changes: 0 additions & 13 deletions director/cache_ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package director

import (
"context"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -196,15 +195,3 @@ func GetAdsForPath(reqPath string) (originNamespace NamespaceAd, originAds []Ser
}
return
}

func ConfigCacheEviction() {
serverAds.OnEviction(func(ctx context.Context, er ttlcache.EvictionReason, i *ttlcache.Item[ServerAd, []NamespaceAd]) {
if cancelFunc, exists := healthTestCancelFuncs[i.Key()]; exists {
// Call the cancel function for the evicted originAd to end its health test
cancelFunc()

// Remove the cancel function from the map as it's no longer needed
delete(healthTestCancelFuncs, i.Key())
}
})
}
64 changes: 63 additions & 1 deletion director/cache_ads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package director
import (
"context"
"net/url"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -211,7 +212,16 @@ func TestConfigCacheEviction(t *testing.T) {
serverAds.DeleteAll()
// Clear the map for the new test
healthTestCancelFuncs = make(map[ServerAd]context.CancelFunc)
ConfigCacheEviction()

// Start cache eviction
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
ConfigTTLCache(shutdownCtx, &wg)
defer func() {
shutdownCancel()
wg.Wait()
}()

serverAds.Set(mockPelicanOriginServerAd, []NamespaceAd{mockNamespaceAd}, ttlcache.DefaultTTL)
ctx, cancelFunc := context.WithDeadline(context.Background(), time.Now().Add(time.Second*5))
Expand Down Expand Up @@ -243,3 +253,55 @@ func TestConfigCacheEviction(t *testing.T) {
assert.True(t, healthTestCancelFuncs[mockPelicanOriginServerAd] == nil, "Evicted origin didn't clear cancelFunc in the map")
})
}

func TestServerAdsCacheEviction(t *testing.T) {
mockServerAd := ServerAd{Name: "foo", Type: OriginType, URL: url.URL{}}

t.Run("evict-after-expire-time", func(t *testing.T) {
// Start cache eviction
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
ConfigTTLCache(shutdownCtx, &wg)
defer func() {
shutdownCancel()
wg.Wait()
}()

deletedChan := make(chan int)
cancelChan := make(chan int)

go func() {
serverAdMutex.Lock()
defer serverAdMutex.Unlock()
serverAds.DeleteAll()

serverAds.Set(mockServerAd, []NamespaceAd{}, time.Second*2)
require.True(t, serverAds.Has(mockServerAd), "Failed to register server Ad")
}()

// Keep checking if the cache item is present until absent or cancelled
go func() {
for {
select {
case <-cancelChan:
return
default:
if !serverAds.Has(mockServerAd) {
deletedChan <- 1
return
}
}
}
}()

// Wait for 3s to check if the expired cache item is evicted
select {
case <-deletedChan:
require.True(t, true)
case <-time.After(3 * time.Second):
cancelChan <- 1
require.False(t, true, "Cache didn't evict expired item")
}
})
}
32 changes: 32 additions & 0 deletions director/director_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@
package director

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"sync"
"time"

"github.com/jellydator/ttlcache/v3"
"github.com/lestrrat-go/jwx/v2/jwa"
"github.com/lestrrat-go/jwx/v2/jwk"
"github.com/lestrrat-go/jwx/v2/jwt"
Expand Down Expand Up @@ -258,3 +261,32 @@ func CreateDirectorScrapeToken() (string, error) {
}
return string(signed), nil
}

// Configure TTL caches to enable cache eviction and other additional cache events handling logic
//
// The `ctx` is the context for listening to server shutdown event in order to cleanup internal cache eviction
// goroutine and `wg` is the wait group to notify when the clean up goroutine finishes
func ConfigTTLCache(ctx context.Context, wg *sync.WaitGroup) {
// Start automatic expired item deletion
go serverAds.Start()
go namespaceKeys.Start()

serverAds.OnEviction(func(ctx context.Context, er ttlcache.EvictionReason, i *ttlcache.Item[ServerAd, []NamespaceAd]) {
if cancelFunc, exists := healthTestCancelFuncs[i.Key()]; exists {
// Call the cancel function for the evicted originAd to end its health test
cancelFunc()

// Remove the cancel function from the map as it's no longer needed
delete(healthTestCancelFuncs, i.Key())
}
})

// Put stop logic in a separate goroutine so that parent function is not blocking
go func() {
defer wg.Done()
<-ctx.Done()
log.Info("Gracefully stoppping TTL cache eviction...")
serverAds.Stop()
namespaceKeys.Stop()
}()
}
58 changes: 58 additions & 0 deletions director/origin_api_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package director

import (
"context"
"crypto/elliptic"
"path/filepath"
"sync"
"testing"
"time"

Expand All @@ -13,6 +15,7 @@ import (
"github.com/pelicanplatform/pelican/config"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestVerifyAdvertiseToken(t *testing.T) {
Expand Down Expand Up @@ -160,3 +163,58 @@ func TestGetRegistryIssuerURL(t *testing.T) {
assert.Equal(t, "test-path/api/v1.0/registry/test-prefix/.well-known/issuer.jwks", url)

}

func TestNamespaceKeysCacheEviction(t *testing.T) {
t.Run("evict-after-expire-time", func(t *testing.T) {
// Start cache eviction
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
ConfigTTLCache(shutdownCtx, &wg)

defer func() {
shutdownCancel()
wg.Wait()
}()

mockNamespaceKey := "foo"
mockCtx := context.Background()
mockAr := jwk.NewCache(mockCtx)

deletedChan := make(chan int)
cancelChan := make(chan int)

go func() {
namespaceKeysMutex.Lock()
defer namespaceKeysMutex.Unlock()
namespaceKeys.DeleteAll()

namespaceKeys.Set(mockNamespaceKey, mockAr, time.Second*2)
require.True(t, namespaceKeys.Has(mockNamespaceKey), "Failed to register namespace key")
}()

// Keep checking if the cache item is absent or cancelled
go func() {
for {
select {
case <-cancelChan:
return
default:
if !namespaceKeys.Has(mockNamespaceKey) {
deletedChan <- 1
return
}
}
}
}()

// Wait for 3s to check if the expired cache item is evicted
select {
case <-deletedChan:
require.True(t, true)
case <-time.After(3 * time.Second):
cancelChan <- 1
require.False(t, true, "Cache didn't evict expired item")
}
})
}
Loading

0 comments on commit 146e8e1

Please # to comment.