Skip to content

Commit

Permalink
runservice: add run workspace cleaner
Browse files Browse the repository at this point in the history
Removes old workspace files (defaults to 7 days)
  • Loading branch information
sgotti committed Sep 17, 2019
1 parent ab06a22 commit 7d375e4
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 4 deletions.
6 changes: 4 additions & 2 deletions internal/services/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ type Runservice struct {
Etcd Etcd `yaml:"etcd"`
ObjectStorage ObjectStorage `yaml:"objectStorage"`

RunCacheExpireInterval time.Duration `yaml:"runCacheExpireInterval"`
RunCacheExpireInterval time.Duration `yaml:"runCacheExpireInterval"`
RunWorkspaceExpireInterval time.Duration `yaml:"runWorkspaceExpireInterval"`
}

type Executor struct {
Expand Down Expand Up @@ -222,7 +223,8 @@ var defaultConfig = Config{
},
},
Runservice: Runservice{
RunCacheExpireInterval: 7 * 24 * time.Hour,
RunCacheExpireInterval: 7 * 24 * time.Hour,
RunWorkspaceExpireInterval: 7 * 24 * time.Hour,
},
Executor: Executor{
ActiveTasksLimit: 2,
Expand Down
1 change: 1 addition & 0 deletions internal/services/runservice/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var (

EtcdCompactChangeGroupsLockKey = path.Join(EtcdSchedulerBaseDir, "compactchangegroupslock")
EtcdCacheCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "cachecleaner")
EtcdWorkspaceCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "workspacecleaner")
EtcdTaskUpdaterLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "taskupdater")

EtcdMaintenanceKey = "maintenance"
Expand Down
1 change: 1 addition & 0 deletions internal/services/runservice/runservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ func (s *Runservice) run(ctx context.Context) error {
util.GoWait(&wg, func() { s.finishedRunsArchiverLoop(ctx) })
util.GoWait(&wg, func() { s.compactChangeGroupsLoop(ctx) })
util.GoWait(&wg, func() { s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval) })
util.GoWait(&wg, func() { s.workspaceCleanerLoop(ctx, s.c.RunWorkspaceExpireInterval) })
util.GoWait(&wg, func() { s.executorTaskUpdateHandler(ctx, ch) })
util.GoWait(&wg, func() { s.etcdPingerLoop(ctx) })
}
Expand Down
55 changes: 54 additions & 1 deletion internal/services/runservice/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ import (
)

const (
cacheCleanerInterval = 1 * 24 * time.Hour
cacheCleanerInterval = 1 * 24 * time.Hour
workspaceCleanerInterval = 1 * 24 * time.Hour

defaultExecutorNotAliveInterval = 60 * time.Second
)
Expand Down Expand Up @@ -1432,3 +1433,55 @@ func (s *Runservice) cacheCleaner(ctx context.Context, cacheExpireInterval time.

return nil
}

func (s *Runservice) workspaceCleanerLoop(ctx context.Context, workspaceExpireInterval time.Duration) {
for {
if err := s.workspaceCleaner(ctx, workspaceExpireInterval); err != nil {
log.Errorf("err: %+v", err)
}

sleepCh := time.NewTimer(workspaceCleanerInterval).C
select {
case <-ctx.Done():
return
case <-sleepCh:
}
}
}

func (s *Runservice) workspaceCleaner(ctx context.Context, workspaceExpireInterval time.Duration) error {
log.Debugf("workspaceCleaner")

session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
if err != nil {
return err
}
defer session.Close()

m := concurrency.NewMutex(session, common.EtcdWorkspaceCleanerLockKey)

// TODO(sgotti) find a way to use a trylock so we'll just return if already
// locked. Currently multiple workspacecleaners will enqueue and start when another
// finishes (unuseful and consume resources)
if err := m.Lock(ctx); err != nil {
return err
}
defer func() { _ = m.Unlock(ctx) }()

doneCh := make(chan struct{})
defer close(doneCh)
for object := range s.ost.List(store.OSTArchivesBaseDir()+"/", "", true, doneCh) {
if object.Err != nil {
return object.Err
}
if object.LastModified.Add(workspaceExpireInterval).Before(time.Now()) {
if err := s.ost.DeleteObject(object.Path); err != nil {
if err != ostypes.ErrNotExist {
log.Warnf("failed to delete workspace object %q: %v", object.Path, err)
}
}
}
}

return nil
}
18 changes: 17 additions & 1 deletion internal/services/runservice/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,12 @@ func OSTRunTaskLogsRunPath(rtID, runID string) string {
return path.Join(OSTRunTaskLogsRunsDir(rtID), runID)
}

func OSTArchivesBaseDir() string {
return "workspacearchives"
}

func OSTRunTaskArchivesBaseDir(rtID string) string {
return path.Join("workspacearchives", rtID)
return path.Join(OSTArchivesBaseDir(), rtID)
}

func OSTRunTaskArchivesDataDir(rtID string) string {
Expand All @@ -155,6 +159,18 @@ func OSTRunTaskArchivesRunPath(rtID, runID string) string {
return path.Join(OSTRunTaskArchivesRunsDir(rtID), runID)
}

func OSTRunTaskIDFromPath(archivePath string) (string, error) {
pl := util.PathList(archivePath)
if len(pl) < 2 {
return "", errors.Errorf("wrong archive path %q", archivePath)
}
fmt.Printf("pl: %q\n", pl)
if pl[0] != "workspacearchives" {
return "", errors.Errorf("wrong archive path %q", archivePath)
}
return pl[1], nil
}

func OSTCacheDir() string {
return "caches"
}
Expand Down
81 changes: 81 additions & 0 deletions internal/services/runservice/store/store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.

package store

import (
"fmt"
"testing"
)

func TestOSTRunTaskIDFromArchivePath(t *testing.T) {
tests := []struct {
name string
archivePath string
out string
err error
}{
{
name: "test no runs 1",
archivePath: "aaaa",
err: fmt.Errorf("wrong archive path %q", "aaaa"),
},
{
name: "test no runs 1",
archivePath: "workspacearchives",
err: fmt.Errorf("wrong archive path %q", "workspacearchives"),
},
{
name: "test no runs 1",
archivePath: "/workspacearchives/",
err: fmt.Errorf("wrong archive path %q", "/workspacearchives/"),
},
{
name: "test no runs 1",
archivePath: "workspacearchives/2502c5c7-0fd9-432b-918e-3ccf31a664f8/data/3.tar",
out: "2502c5c7-0fd9-432b-918e-3ccf31a664f8",
},
{
name: "test no runs 1",
archivePath: "workspacearchives/2502c5c7-0fd9-432b-918e-3ccf31a664f8/data/3.tar",
out: "2502c5c7-0fd9-432b-918e-3ccf31a664f8",
},
{
name: "test no runs 1",
archivePath: "workspacearchives/2502c5c7-0fd9-432b-918e-3ccf31a664f8/data/3.tar",
out: "2502c5c7-0fd9-432b-918e-3ccf31a664f8",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
id, err := OSTRunTaskIDFromPath(tt.archivePath)
if err != nil {
if tt.err == nil {
t.Fatalf("got error: %v, expected no error", err)
}
if err.Error() != tt.err.Error() {
t.Fatalf("got error: %v, want error: %v", err, tt.err)
}
} else {
if tt.err != nil {
t.Fatalf("got nil error, want error: %v", tt.err)
}
if id != tt.out {
t.Fatalf("got id: %q, want id: %q", id, tt.out)
}
}
})
}
}

0 comments on commit 7d375e4

Please # to comment.