Skip to content

Commit

Permalink
Add purge configuration and scheduled purger functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
fujiwara committed Nov 7, 2024
1 parent 713242e commit a1db244
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 24 deletions.
10 changes: 9 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Config struct {
ECS ECSCfg `yaml:"ecs"`
Link Link `yaml:"link"`
Auth *Auth `yaml:"auth"`
Purge *Purge `yaml:"purge"`

compatV1 bool
localMode bool
Expand Down Expand Up @@ -234,7 +235,8 @@ func NewConfig(ctx context.Context, p *ConfigParams) (*Config, error) {
ECS: ECSCfg{
Region: os.Getenv("AWS_REGION"),
},
Auth: nil,
Auth: nil,
Purge: nil,

localMode: p.LocalMode,
compatV1: p.CompatV1,
Expand Down Expand Up @@ -318,6 +320,12 @@ func NewConfig(ctx context.Context, p *ConfigParams) (*Config, error) {
if err := cfg.fillECSDefaults(ctx); err != nil {
slog.Warn(f("failed to fill ECS defaults: %s", err))
}

if cfg.Purge != nil {
if err := cfg.Purge.Validate(); err != nil {
return nil, fmt.Errorf("invalid purge config: %w", err)
}
}
return cfg, nil
}

Expand Down
29 changes: 28 additions & 1 deletion mirage.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ func (m *Mirage) Run(ctx context.Context) error {
}(v.ListenPort)
}

wg.Add(2)
wg.Add(3)
go m.syncECSToMirage(ctx, &wg)
go m.RunAccessCountCollector(ctx, &wg)
go m.RunScheduledPurger(ctx, &wg)
wg.Wait()
slog.Info("shutdown mirage-ecs")
select {
Expand Down Expand Up @@ -147,6 +148,32 @@ func (m *Mirage) RunAccessCountCollector(ctx context.Context, wg *sync.WaitGroup
}
}

func (m *Mirage) RunScheduledPurger(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
p := m.Config.Purge
if p == nil {
slog.Debug("Purge is not configured")
return
}
slog.Info(f("starting up RunScheduledPurger() schedule: %s", p.cron.String()))
for {
now := time.Now().Add(time.Minute)
next := p.cron.Next(now)
slog.Info(f("next purge invocation at: %s", next))
duration := time.Until(next)
select {
case <-ctx.Done():
slog.Info("RunScheduledPurger() is done")
return
case <-time.After(duration):
slog.Info("scheduled purge invoked")
if err := m.WebApi.purge(ctx, p.purgeParams); err != nil {
slog.Warn(err.Error())
}
}
}
}

const (
CloudWatchMetricNameSpace = "mirage-ecs"
CloudWatchMetricName = "RequestCount"
Expand Down
34 changes: 34 additions & 0 deletions purge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package mirageecs

import (
"fmt"

"github.com/winebarrel/cronplan"

Check failure on line 6 in purge.go

View workflow job for this annotation

GitHub Actions / Build (1.21)

no required module provides package github.com/winebarrel/cronplan; to add it:

Check failure on line 6 in purge.go

View workflow job for this annotation

GitHub Actions / Build (1.22)

no required module provides package github.com/winebarrel/cronplan; to add it:

Check failure on line 6 in purge.go

View workflow job for this annotation

GitHub Actions / Build (1.22)

no required module provides package github.com/winebarrel/cronplan; to add it:
)

type Purge struct {
Schedule string `json:"schedule"`
Request *APIPurgeRequest `json:"request"`

purgeParams *PurgeParams
cron *cronplan.Expression
}

func (p *Purge) Validate() error {
cron, err := cronplan.Parse(p.Schedule)
if err != nil {
return fmt.Errorf("invalid schedule expression %s: %w", p.Schedule, err)
}
p.cron = cron

if p.Request == nil {
return fmt.Errorf("purge request is required")
}
purgeParams, err := p.Request.Validate()
if err != nil {
return fmt.Errorf("invalid purge request: %w", err)
}
p.purgeParams = purgeParams

return nil
}
8 changes: 4 additions & 4 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ func (r *APILaunchRequest) MergeForm(form url.Values) {
}

type APIPurgeRequest struct {
Duration json.Number `json:"duration" form:"duration"`
Excludes []string `json:"excludes" form:"excludes"`
ExcludeTags []string `json:"exclude_tags" form:"exclude_tags"`
ExcludeRegexp string `json:"exclude_regexp" form:"exclude_regexp"`
Duration json.Number `json:"duration" form:"duration" yaml:"duration"`
Excludes []string `json:"excludes" form:"excludes" yaml:"excludes"`
ExcludeTags []string `json:"exclude_tags" form:"exclude_tags" yaml:"exclude_tags"`
ExcludeRegexp string `json:"exclude_regexp" form:"exclude_regexp" yaml:"exclude_regexp"`
}

type PurgeParams struct {
Expand Down
36 changes: 18 additions & 18 deletions webapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,22 @@ func (api *WebApi) ApiAccess(c echo.Context) error {
}

func (api *WebApi) ApiPurge(c echo.Context) error {
code, err := api.purge(c)
r := APIPurgeRequest{}
if err := c.Bind(&r); err != nil {
return c.JSON(http.StatusBadRequest, APICommonResponse{Result: err.Error()})
}

params, err := r.Validate()
if err != nil {
return c.JSON(code, APICommonResponse{Result: err.Error()})
slog.Error(f("purge failed: %s", err))
return c.JSON(http.StatusBadRequest, APICommonResponse{Result: err.Error()})
}
return c.JSON(code, APICommonResponse{Result: "accepted"})

ctx := c.Request().Context()
if err := api.purge(ctx, params); err != nil {
return c.JSON(http.StatusInternalServerError, APICommonResponse{Result: err.Error()})
}
return c.JSON(http.StatusOK, APICommonResponse{Result: "accepted"})
}

func (api *WebApi) logs(c echo.Context) (int, []string, error) {
Expand Down Expand Up @@ -371,22 +382,11 @@ func validateSubdomain(s string) error {
return nil
}

func (api *WebApi) purge(c echo.Context) (int, error) {
r := APIPurgeRequest{}
if err := c.Bind(&r); err != nil {
return http.StatusBadRequest, err
}

p, err := r.Validate()
if err != nil {
slog.Error(f("purge failed: %s", err))
return http.StatusBadRequest, err
}

infos, err := api.runner.List(c.Request().Context(), statusRunning)
func (api *WebApi) purge(ctx context.Context, p *PurgeParams) error {
infos, err := api.runner.List(ctx, statusRunning)
if err != nil {
slog.Error(f("list ecs failed: %s", err))
return http.StatusInternalServerError, err
return fmt.Errorf("list tasks failed: %w", err)
}
slog.Info("purge subdomains",
"duration", p.Duration,
Expand All @@ -408,7 +408,7 @@ func (api *WebApi) purge(c echo.Context) (int, error) {
}

slog.Info("no subdomains to purge")
return http.StatusOK, nil
return nil
}

func (api *WebApi) purgeSubdomains(ctx context.Context, subdomains []string, duration time.Duration) {
Expand Down

0 comments on commit a1db244

Please # to comment.