From 140909cc3d02ea9f6178fc96876a0343e1ec9053 Mon Sep 17 00:00:00 2001 From: polebug Date: Wed, 11 Sep 2024 11:44:21 +0800 Subject: [PATCH] feat(monitor): Automatically release cronjob lock when program exits --- .../dialer/postgres/client_partitioned.go | 4 ++- internal/node/monitor/cronjob.go | 9 +++++++ internal/node/monitor/server.go | 27 ++++++++++++------- 3 files changed, 30 insertions(+), 10 deletions(-) diff --git a/internal/database/dialer/postgres/client_partitioned.go b/internal/database/dialer/postgres/client_partitioned.go index 22e9b749..880add11 100644 --- a/internal/database/dialer/postgres/client_partitioned.go +++ b/internal/database/dialer/postgres/client_partitioned.go @@ -556,6 +556,8 @@ func (c *client) deleteExpiredActivitiesPartitioned(ctx context.Context, network continue } + zap.L().Info("deleting expired activities", zap.String("table", activityTable), zap.String("indexTable", indexTable)) + for { done, err := c.batchDeleteExpiredActivities(ctx, network, timestamp, batchSize, &indexTable, lo.Ternary(activityTableExists, &activityTable, nil)) if err != nil { @@ -572,7 +574,7 @@ func (c *client) deleteExpiredActivitiesPartitioned(ctx context.Context, network } func (c *client) batchDeleteExpiredActivities(ctx context.Context, network network.Network, timestamp time.Time, batchSize int, indexTable *string, activityTable *string) (bool, error) { - databaseTransaction := c.database.WithContext(ctx).Debug().Begin() + databaseTransaction := c.database.WithContext(ctx).Begin() defer func() { _ = databaseTransaction.Rollback().Error }() diff --git a/internal/node/monitor/cronjob.go b/internal/node/monitor/cronjob.go index e5af38a3..9e7b8b35 100644 --- a/internal/node/monitor/cronjob.go +++ b/internal/node/monitor/cronjob.go @@ -74,11 +74,20 @@ func (c *CronJob) Renewal(ctx context.Context) { } func (c *CronJob) Start() { + zap.L().Info("start cron job", zap.String("key", c.mutex.Name())) + c.crontab.Start() } func (c *CronJob) Stop() { + zap.L().Info("stop cron job", zap.String("key", c.mutex.Name())) + c.crontab.Stop() + c.ReleaseLock() +} + +func (c *CronJob) ReleaseLock() { + _, _ = c.mutex.Unlock() } func NewCronJob(client rueidis.Client, name string, timeout time.Duration) (*CronJob, error) { diff --git a/internal/node/monitor/server.go b/internal/node/monitor/server.go index f965814e..5bbfefc4 100644 --- a/internal/node/monitor/server.go +++ b/internal/node/monitor/server.go @@ -17,6 +17,11 @@ import ( "go.uber.org/zap" ) +const ( + MonitorWorkerStatusJob = "worker_status" + DatabaseMaintenanceJob = "database_maintenance" +) + type Monitor struct { config *config.File databaseClient database.Client @@ -29,33 +34,31 @@ type Monitor struct { func (m *Monitor) Run(ctx context.Context) error { if m.databaseClient != nil && m.redisClient != nil { // Start the monitor cron job. - monitorWorkerStatus, err := NewCronJob(m.redisClient, "worker_status", 10*time.Minute) + monitorWorkerStatus, err := NewCronJob(m.redisClient, MonitorWorkerStatusJob, 10*time.Minute) if err != nil { return fmt.Errorf("new cron job: %w", err) } - if err := monitorWorkerStatus.AddFunc(ctx, "@every 5m", func() { - if err := parameter.CheckParamsTask(ctx, m.redisClient, m.networkParamsCaller); err != nil { + if err = monitorWorkerStatus.AddFunc(ctx, "@every 5m", func() { + if err = parameter.CheckParamsTask(ctx, m.redisClient, m.networkParamsCaller); err != nil { return } - if err := m.MonitorWorkerStatus(ctx); err != nil { + if err = m.MonitorWorkerStatus(ctx); err != nil { return } }); err != nil { return fmt.Errorf("add heartbeat cron job: %w", err) } - monitorWorkerStatus.Start() - // Start the database maintenance cron job. - databaseMaintenance, err := NewCronJob(m.redisClient, "database_maintenance", 5*24*time.Hour) + databaseMaintenance, err := NewCronJob(m.redisClient, DatabaseMaintenanceJob, 5*24*time.Hour) if err != nil { return fmt.Errorf("new cron job: %w", err) } - if err := databaseMaintenance.AddFunc(ctx, "0 0 0 * * *", func() { - if err := m.MaintainCoveragePeriod(ctx); err != nil { + if err = databaseMaintenance.AddFunc(ctx, "0 0 0 * * *", func() { + if err = m.MaintainCoveragePeriod(ctx); err != nil { zap.L().Error("maintain coverage period", zap.Error(err)) return @@ -64,6 +67,12 @@ func (m *Monitor) Run(ctx context.Context) error { return fmt.Errorf("add database maintenance cron job: %w", err) } + defer func() { + monitorWorkerStatus.Stop() + databaseMaintenance.Stop() + }() + + monitorWorkerStatus.Start() databaseMaintenance.Start() }