Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feature: enhance dry-run with cache status #1988

Merged
merged 22 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cli/internal/cache/async_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (c *asyncCache) Fetch(target string, key string, files []string) (bool, []s
return c.realCache.Fetch(target, key, files)
}

func (c *asyncCache) Exists(key string) (CacheState, error) {
return c.realCache.Exists(key)
}

func (c *asyncCache) Clean(target string) {
c.realCache.Clean(target)
}
Expand Down
24 changes: 24 additions & 0 deletions cli/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@ type Cache interface {
// Fetch returns true if there is a cache it. It is expected to move files
// into their correct position as a side effect
Fetch(target string, hash string, files []string) (bool, []string, int, error)
Exists(hash string) (CacheState, error)
// Put caches files for a given hash
Put(target string, hash string, duration int, files []string) error
Clean(target string)
CleanAll()
Shutdown()
}

type CacheState struct {
Local bool `json:"local"`
Remote bool `json:"remote"`
}

const cacheEventHit = "HIT"
const cacheEventMiss = "MISS"

Expand Down Expand Up @@ -271,6 +277,24 @@ func (mplex *cacheMultiplexer) Fetch(target string, key string, files []string)
return false, files, 0, nil
}

func (mplex *cacheMultiplexer) Exists(target string) (CacheState, error) {
syncCacheState := CacheState{}
for _, cache := range mplex.caches {
cacheState, err := cache.Exists(target)
if err != nil {
return syncCacheState, err
}
if cacheState.Local == true {
syncCacheState.Local = syncCacheState.Local || cacheState.Local
}
if cacheState.Remote == true {
syncCacheState.Remote = syncCacheState.Remote || cacheState.Remote
}
}

return syncCacheState, nil
}

func (mplex *cacheMultiplexer) Clean(target string) {
for _, cache := range mplex.caches {
cache.Clean(target)
Expand Down
10 changes: 10 additions & 0 deletions cli/internal/cache/cache_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ func (f *fsCache) Fetch(target, hash string, _unusedOutputGlobs []string) (bool,
return true, nil, meta.Duration, nil
}

func (f *fsCache) Exists(hash string) (CacheState, error) {
cachedFolder := filepath.Join(f.cacheDirectory, hash)

if !fs.PathExists(cachedFolder) {
return CacheState{Local: false}, nil
}

return CacheState{Local: true}, nil
}

func (f *fsCache) logFetch(hit bool, hash string, duration int) {
var event string
if hit {
Expand Down
25 changes: 25 additions & 0 deletions cli/internal/cache/cache_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
type client interface {
PutArtifact(hash string, body []byte, duration int, tag string) error
FetchArtifact(hash string) (*http.Response, error)
ArtifactExists(hash string) (*http.Response, error)
GetTeamID() string
}

Expand Down Expand Up @@ -150,6 +151,16 @@ func (cache *httpCache) Fetch(target, key string, _unusedOutputGlobs []string) (
return hit, files, duration, err
}

func (cache *httpCache) Exists(key string) (CacheState, error) {
cache.requestLimiter.acquire()
defer cache.requestLimiter.release()
hit, err := cache.exists(key)
if err != nil {
return CacheState{}, fmt.Errorf("failed to verify files from HTTP cache: %w", err)
}
return CacheState{Remote: hit}, err
}

func (cache *httpCache) logFetch(hit bool, hash string, duration int) {
var event string
if hit {
Expand All @@ -166,6 +177,20 @@ func (cache *httpCache) logFetch(hit bool, hash string, duration int) {
cache.recorder.LogEvent(payload)
}

func (cache *httpCache) exists(hash string) (bool, error) {
resp, err := cache.client.ArtifactExists(hash)
if err != nil {
return false, nil
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
return false, nil
} else if resp.StatusCode != http.StatusOK {
return false, fmt.Errorf("%s", strconv.Itoa(resp.StatusCode))
}
return true, nil
}

func (cache *httpCache) retrieve(hash string) (bool, []string, int, error) {
resp, err := cache.client.FetchArtifact(hash)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cli/internal/cache/cache_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ func (sr *errorResp) FetchArtifact(hash string) (*http.Response, error) {
return nil, sr.err
}

func (sr *errorResp) ArtifactExists(hash string) (*http.Response, error) {
return nil, sr.err
}

func (sr *errorResp) GetTeamID() string {
return ""
}
Expand Down
4 changes: 4 additions & 0 deletions cli/internal/cache/cache_noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ func (c *noopCache) Put(target string, key string, duration int, files []string)
func (c *noopCache) Fetch(target string, key string, files []string) (bool, []string, int, error) {
return false, nil, 0, nil
}
func (c *noopCache) Exists(key string) (CacheState, error) {
return CacheState{}, nil
}

func (c *noopCache) Clean(target string) {}
func (c *noopCache) CleanAll() {}
func (c *noopCache) Shutdown() {}
47 changes: 47 additions & 0 deletions cli/internal/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ func (tc *testCache) Fetch(target string, hash string, files []string) (bool, []
return false, nil, 0, nil
}

func (tc *testCache) Exists(hash string) (CacheState, error) {
if tc.disabledErr != nil {
return CacheState{}, nil
}
_, ok := tc.entries[hash]
if ok {
return CacheState{Local: true}, nil
}
return CacheState{}, nil
}

func (tc *testCache) Put(target string, hash string, duration int, files []string) error {
if tc.disabledErr != nil {
return tc.disabledErr
Expand Down Expand Up @@ -108,13 +119,49 @@ func TestPutCachingDisabled(t *testing.T) {
}
}

func TestExists(t *testing.T) {
caches := []Cache{
newEnabledCache(),
}

mplex := &cacheMultiplexer{
caches: caches,
}

cacheState, err := mplex.Exists("some-hash")
if err != nil {
t.Errorf("got error verifying files: %v", err)
}
if cacheState.Local {
t.Error("did not expect file to exist")
}

err = mplex.Put("unused-target", "some-hash", 5, []string{"a-file"})
if err != nil {
// don't leak the cache removal
t.Errorf("Put got error %v, want <nil>", err)
}

cacheState, err = mplex.Exists("some-hash")
if err != nil {
t.Errorf("got error verifying files: %v", err)
}
if !cacheState.Local {
t.Error("failed to find previously stored files")
}
}

type fakeClient struct{}

// FetchArtifact implements client
func (*fakeClient) FetchArtifact(hash string) (*http.Response, error) {
panic("unimplemented")
}

func (*fakeClient) ArtifactExists(hash string) (*http.Response, error) {
panic("unimplemented")
}

// GetTeamID implements client
func (*fakeClient) GetTeamID() string {
return "fake-team-id"
Expand Down
17 changes: 16 additions & 1 deletion cli/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,21 @@ func (c *ApiClient) PutArtifact(hash string, artifactBody []byte, duration int,
// FetchArtifact attempts to retrieve the build artifact with the given hash from the
// Remote Caching server
func (c *ApiClient) FetchArtifact(hash string) (*http.Response, error) {
return c.getArtifact(hash, http.MethodGet)
}

func (c *ApiClient) ArtifactExists(hash string) (*http.Response, error) {
return c.getArtifact(hash, http.MethodHead)
}

// FetchArtifact attempts to retrieve the build artifact with the given hash from the
// Remote Caching server
func (c *ApiClient) getArtifact(hash string, httpMethod string) (*http.Response, error) {

if httpMethod != http.MethodHead && httpMethod != http.MethodGet {
return nil, fmt.Errorf("invalid httpMethod %v, expected GET or HEAD", httpMethod)
}

if err := c.okToRequest(); err != nil {
return nil, err
}
Expand All @@ -325,7 +340,7 @@ func (c *ApiClient) FetchArtifact(hash string) (*http.Response, error) {
allowAuth = strings.Contains(strings.ToLower(headers), strings.ToLower("Authorization"))
}

req, err := retryablehttp.NewRequest(http.MethodGet, requestURL, nil)
req, err := retryablehttp.NewRequest(httpMethod, requestURL, nil)
if allowAuth {
req.Header.Set("Authorization", "Bearer "+c.token)
}
Expand Down
62 changes: 49 additions & 13 deletions cli/internal/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"text/tabwriter"
Expand Down Expand Up @@ -347,6 +348,8 @@ func (r *run) runOperation(ctx gocontext.Context, g *completeGraph, rs *runSpec,
fmt.Fprintln(w, util.Sprintf(" ${GREY}Task\t=\t%s\t${RESET}", task.Task))
fmt.Fprintln(w, util.Sprintf(" ${GREY}Package\t=\t%s\t${RESET}", task.Package))
fmt.Fprintln(w, util.Sprintf(" ${GREY}Hash\t=\t%s\t${RESET}", task.Hash))
fmt.Fprintln(w, util.Sprintf(" ${GREY}Cached (Local)\t=\t%s\t${RESET}", strconv.FormatBool(task.CacheState.Local)))
fmt.Fprintln(w, util.Sprintf(" ${GREY}Cached (Remote)\t=\t%s\t${RESET}", strconv.FormatBool(task.CacheState.Remote)))
fmt.Fprintln(w, util.Sprintf(" ${GREY}Directory\t=\t%s\t${RESET}", task.Dir))
fmt.Fprintln(w, util.Sprintf(" ${GREY}Command\t=\t%s\t${RESET}", task.Command))
fmt.Fprintln(w, util.Sprintf(" ${GREY}Outputs\t=\t%s\t${RESET}", strings.Join(task.Outputs, ", ")))
Expand Down Expand Up @@ -609,7 +612,7 @@ func (r *run) logWarning(prefix string, err error) {
r.base.UI.Error(fmt.Sprintf("%s%s%s", ui.WARNING_PREFIX, prefix, color.YellowString(" %v", err)))
}

func (r *run) executeTasks(ctx gocontext.Context, g *completeGraph, rs *runSpec, engine *core.Scheduler, packageManager *packagemanager.PackageManager, hashes *taskhash.Tracker, startAt time.Time) error {
func (r *run) initAnalyticsClient(ctx gocontext.Context) analytics.Client {
apiClient := r.base.APIClient
var analyticsSink analytics.Sink
if apiClient.IsLinked() {
Expand All @@ -619,17 +622,28 @@ func (r *run) executeTasks(ctx gocontext.Context, g *completeGraph, rs *runSpec,
analyticsSink = analytics.NullSink
}
analyticsClient := analytics.NewClient(ctx, analyticsSink, r.base.Logger.Named("analytics"))
defer analyticsClient.CloseWithTimeout(50 * time.Millisecond)
return analyticsClient
}

func (r *run) initCache(ctx gocontext.Context, rs *runSpec, analyticsClient analytics.Client) (cache.Cache, error) {
apiClient := r.base.APIClient
// Theoretically this is overkill, but bias towards not spamming the console
once := &sync.Once{}
turboCache, err := cache.New(rs.Opts.cacheOpts, r.base.RepoRoot, apiClient, analyticsClient, func(_cache cache.Cache, err error) {
return cache.New(rs.Opts.cacheOpts, r.base.RepoRoot, apiClient, analyticsClient, func(_cache cache.Cache, err error) {
// Currently the HTTP Cache is the only one that can be disabled.
// With a cache system refactor, we might consider giving names to the caches so
// we can accurately report them here.
once.Do(func() {
r.logWarning("Remote Caching is unavailable", err)
})
})
}

func (r *run) executeTasks(ctx gocontext.Context, g *completeGraph, rs *runSpec, engine *core.Scheduler, packageManager *packagemanager.PackageManager, hashes *taskhash.Tracker, startAt time.Time) error {
analyticsClient := r.initAnalyticsClient(ctx)
defer analyticsClient.CloseWithTimeout(50 * time.Millisecond)

turboCache, err := r.initCache(ctx, rs, analyticsClient)
if err != nil {
if errors.Is(err, cache.ErrNoCachesEnabled) {
r.logWarning("No caches are enabled. You can try \"turbo login\", \"turbo link\", or ensuring you are not passing --remote-only to enable caching", nil)
Expand Down Expand Up @@ -692,20 +706,35 @@ func (r *run) executeTasks(ctx gocontext.Context, g *completeGraph, rs *runSpec,
}

type hashedTask struct {
TaskID string `json:"taskId"`
Task string `json:"task"`
Package string `json:"package"`
Hash string `json:"hash"`
Command string `json:"command"`
Outputs []string `json:"outputs"`
LogFile string `json:"logFile"`
Dir string `json:"directory"`
Dependencies []string `json:"dependencies"`
Dependents []string `json:"dependents"`
TaskID string `json:"taskId"`
Task string `json:"task"`
Package string `json:"package"`
Hash string `json:"hash"`
CacheState cache.CacheState `json:"cacheState"`
Command string `json:"command"`
Outputs []string `json:"outputs"`
LogFile string `json:"logFile"`
Dir string `json:"directory"`
Dependencies []string `json:"dependencies"`
Dependents []string `json:"dependents"`
}

func (r *run) executeDryRun(ctx gocontext.Context, engine *core.Scheduler, g *completeGraph, taskHashes *taskhash.Tracker, rs *runSpec) ([]hashedTask, error) {
analyticsClient := r.initAnalyticsClient(ctx)
defer analyticsClient.CloseWithTimeout(50 * time.Millisecond)
turboCache, err := r.initCache(ctx, rs, analyticsClient)
defer turboCache.Shutdown()

if err != nil {
if errors.Is(err, cache.ErrNoCachesEnabled) {
r.logWarning("No caches are enabled. You can try \"turbo login\", \"turbo link\", or ensuring you are not passing --remote-only to enable caching", nil)
} else {
return nil, errors.Wrap(err, "failed to set up caching")
}
}

taskIDs := []hashedTask{}

errs := engine.Execute(g.getPackageTaskVisitor(ctx, func(ctx gocontext.Context, packageTask *nodes.PackageTask) error {
passThroughArgs := rs.ArgsForTask(packageTask.Task)
deps := engine.TaskGraph.DownEdges(packageTask.TaskID)
Expand Down Expand Up @@ -745,18 +774,25 @@ func (r *run) executeDryRun(ctx gocontext.Context, engine *core.Scheduler, g *co
}
sort.Strings(stringDescendents)

cacheState, err := turboCache.Exists(hash)
if err != nil {
return err
}

taskIDs = append(taskIDs, hashedTask{
TaskID: packageTask.TaskID,
Task: packageTask.Task,
Package: packageTask.PackageName,
Hash: hash,
CacheState: cacheState,
Command: command,
Dir: packageTask.Pkg.Dir.ToString(),
Outputs: packageTask.TaskDefinition.Outputs,
LogFile: packageTask.RepoRelativeLogFile(),
Dependencies: stringAncestors,
Dependents: stringDescendents,
})

return nil
}), core.ExecOpts{
Concurrency: 1,
Expand Down