Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(monitor): automatically release cronjob lock when program exits #537

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion internal/database/dialer/postgres/client_partitioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,8 @@ func (c *client) deleteExpiredActivitiesPartitioned(ctx context.Context, network
continue
}

zap.L().Info("deleting expired activities", zap.String("table", activityTable), zap.String("indexTable", indexTable))

for {
done, err := c.batchDeleteExpiredActivities(ctx, network, timestamp, batchSize, &indexTable, lo.Ternary(activityTableExists, &activityTable, nil))
if err != nil {
Expand All @@ -572,7 +574,7 @@ func (c *client) deleteExpiredActivitiesPartitioned(ctx context.Context, network
}

func (c *client) batchDeleteExpiredActivities(ctx context.Context, network network.Network, timestamp time.Time, batchSize int, indexTable *string, activityTable *string) (bool, error) {
databaseTransaction := c.database.WithContext(ctx).Debug().Begin()
databaseTransaction := c.database.WithContext(ctx).Begin()
defer func() {
_ = databaseTransaction.Rollback().Error
}()
Expand Down
9 changes: 9 additions & 0 deletions internal/node/monitor/cronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,20 @@ func (c *CronJob) Renewal(ctx context.Context) {
}

// Start writes an info-level log entry carrying the name of the job's mutex
// under the "key" field, then starts the underlying crontab.
func (c *CronJob) Start() {
	logger := zap.L()
	logger.Info("start cron job", zap.String("key", c.mutex.Name()))

	c.crontab.Start()
}

// Stop writes an info-level log entry carrying the name of the job's mutex
// under the "key" field, stops the underlying crontab, and then releases the
// job's lock via ReleaseLock.
func (c *CronJob) Stop() {
	logger := zap.L()
	logger.Info("stop cron job", zap.String("key", c.mutex.Name()))

	// Stop scheduling first, then give up the lock.
	c.crontab.Stop()
	c.ReleaseLock()
}

// ReleaseLock calls Unlock on the job's mutex and discards both of its return
// values, so a failed unlock is never reported to the caller.
// NOTE(review): presumably best-effort because this instance may not be the
// current lock holder when it shuts down — confirm against the mutex library.
func (c *CronJob) ReleaseLock() {
_, _ = c.mutex.Unlock()
}

func NewCronJob(client rueidis.Client, name string, timeout time.Duration) (*CronJob, error) {
Expand Down
27 changes: 18 additions & 9 deletions internal/node/monitor/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ import (
"go.uber.org/zap"
)

// Names passed to NewCronJob when Monitor.Run creates its cron jobs.
const (
// MonitorWorkerStatusJob names the cron job that runs the parameter check
// and the worker status monitor.
MonitorWorkerStatusJob = "worker_status"
// DatabaseMaintenanceJob names the cron job that maintains the coverage
// period.
DatabaseMaintenanceJob = "database_maintenance"
)

type Monitor struct {
config *config.File
databaseClient database.Client
Expand All @@ -29,33 +34,31 @@ type Monitor struct {
func (m *Monitor) Run(ctx context.Context) error {
if m.databaseClient != nil && m.redisClient != nil {
// Start the monitor cron job.
monitorWorkerStatus, err := NewCronJob(m.redisClient, "worker_status", 10*time.Minute)
monitorWorkerStatus, err := NewCronJob(m.redisClient, MonitorWorkerStatusJob, 10*time.Minute)
if err != nil {
return fmt.Errorf("new cron job: %w", err)
}

if err := monitorWorkerStatus.AddFunc(ctx, "@every 5m", func() {
if err := parameter.CheckParamsTask(ctx, m.redisClient, m.networkParamsCaller); err != nil {
if err = monitorWorkerStatus.AddFunc(ctx, "@every 5m", func() {
if err = parameter.CheckParamsTask(ctx, m.redisClient, m.networkParamsCaller); err != nil {
return
}

if err := m.MonitorWorkerStatus(ctx); err != nil {
if err = m.MonitorWorkerStatus(ctx); err != nil {
return
}
}); err != nil {
return fmt.Errorf("add heartbeat cron job: %w", err)
}

monitorWorkerStatus.Start()

// Start the database maintenance cron job.
databaseMaintenance, err := NewCronJob(m.redisClient, "database_maintenance", 5*24*time.Hour)
databaseMaintenance, err := NewCronJob(m.redisClient, DatabaseMaintenanceJob, 5*24*time.Hour)
if err != nil {
return fmt.Errorf("new cron job: %w", err)
}

if err := databaseMaintenance.AddFunc(ctx, "0 0 0 * * *", func() {
if err := m.MaintainCoveragePeriod(ctx); err != nil {
if err = databaseMaintenance.AddFunc(ctx, "0 0 0 * * *", func() {
if err = m.MaintainCoveragePeriod(ctx); err != nil {
zap.L().Error("maintain coverage period", zap.Error(err))

return
Expand All @@ -64,6 +67,12 @@ func (m *Monitor) Run(ctx context.Context) error {
return fmt.Errorf("add database maintenance cron job: %w", err)
}

defer func() {
monitorWorkerStatus.Stop()
databaseMaintenance.Stop()
}()

monitorWorkerStatus.Start()
databaseMaintenance.Start()
}

Expand Down
Loading