From e72ae84f80728ef4a767979c74ca11831fc69208 Mon Sep 17 00:00:00 2001 From: FFish Date: Thu, 25 May 2023 10:28:13 +0000 Subject: [PATCH] feat: add Processor interface, support processor to do some clean job after consumer shutdown --- consumer/processor.go | 19 +++++++++++++++++++ consumer/shard_worker.go | 15 +++++++++++---- consumer/tasks.go | 4 ++-- consumer/worker.go | 20 +++++++++++++------- 4 files changed, 45 insertions(+), 13 deletions(-) create mode 100644 consumer/processor.go diff --git a/consumer/processor.go b/consumer/processor.go new file mode 100644 index 00000000..4cc430bc --- /dev/null +++ b/consumer/processor.go @@ -0,0 +1,19 @@ +package consumerLibrary + +import sls "github.com/aliyun/aliyun-log-go-sdk" + +type Processor interface { + Process(int, *sls.LogGroupList, CheckPointTracker) string + Shutdown(CheckPointTracker) error +} + +type ProcessFunc func(int, *sls.LogGroupList, CheckPointTracker) string + +func (processor ProcessFunc) Process(shard int, lgList *sls.LogGroupList, checkpointTracker CheckPointTracker) string { + return processor(shard, lgList, checkpointTracker) +} + +func (processor ProcessFunc) Shutdown(checkpointTracker CheckPointTracker) error { + // Do nothing + return nil +} diff --git a/consumer/shard_worker.go b/consumer/shard_worker.go index cc1e6439..8cc657bc 100644 --- a/consumer/shard_worker.go +++ b/consumer/shard_worker.go @@ -18,7 +18,7 @@ type ShardConsumerWorker struct { lastFetchGroupCount int lastFetchTime time.Time consumerStatus string - process func(shard int, logGroup *sls.LogGroupList, checkpointTracker CheckPointTracker) string + processor Processor shardId int // TODO: refine to channel isCurrentDone bool @@ -43,10 +43,10 @@ func (consumer *ShardConsumerWorker) getConsumerStatus() string { return consumer.consumerStatus } -func initShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consumerHeartBeat *ConsumerHeartBeat, do func(shard int, logGroup *sls.LogGroupList, checkpointTracker CheckPointTracker) string, logger log.Logger) *ShardConsumerWorker { +func initShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consumerHeartBeat *ConsumerHeartBeat, processor Processor, logger log.Logger) *ShardConsumerWorker { shardConsumeWorker := &ShardConsumerWorker{ shutdownFlag: false, - process: do, + processor: processor, consumerCheckPointTracker: initConsumerCheckpointTracker(shardId, consumerClient, consumerHeartBeat, logger), client: consumerClient, consumerStatus: INITIALIZING, @@ -101,7 +101,14 @@ func (consumer *ShardConsumerWorker) consume() { }() case SHUTTING_DOWN: go func() { - err := consumer.consumerCheckPointTracker.flushCheckPoint() + err := consumer.processor.Shutdown(consumer.consumerCheckPointTracker) + if err != nil { + level.Error(consumer.logger).Log("msg", "failed to call processor shutdown", "err", err) + consumer.updateStatus(false) + return + } + + err = consumer.consumerCheckPointTracker.flushCheckPoint() if err == nil { level.Info(consumer.logger).Log("msg", "shard worker status shutdown_complete", "shardWorkerId", consumer.shardId) } else { diff --git a/consumer/tasks.go b/consumer/tasks.go index cb393e92..7b109a71 100644 --- a/consumer/tasks.go +++ b/consumer/tasks.go @@ -88,7 +88,7 @@ func (consumer *ShardConsumerWorker) consumerProcessTask() string { } }() if consumer.lastFetchLogGroupList != nil { - consumer.rollBackCheckpoint = consumer.process(consumer.shardId, consumer.lastFetchLogGroupList, consumer.consumerCheckPointTracker) + consumer.rollBackCheckpoint = consumer.processor.Process(consumer.shardId, consumer.lastFetchLogGroupList, consumer.consumerCheckPointTracker) consumer.saveCheckPointIfNeeded() consumer.lastFetchLogGroupList = nil } @@ -104,7 +104,7 @@ func (consumer *ShardConsumerWorker) consumerRetryProcessTask() bool { level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r, "stack", string(stackBuf)) } }() - consumer.rollBackCheckpoint = consumer.process(consumer.shardId, consumer.lastFetchLogGroupList, consumer.consumerCheckPointTracker) + consumer.rollBackCheckpoint = consumer.processor.Process(consumer.shardId, consumer.lastFetchLogGroupList, consumer.consumerCheckPointTracker) consumer.saveCheckPointIfNeeded() return true } diff --git a/consumer/worker.go b/consumer/worker.go index 03922e84..cfae3f4a 100644 --- a/consumer/worker.go +++ b/consumer/worker.go @@ -17,7 +17,7 @@ type ConsumerWorker struct { client *ConsumerClient workerShutDownFlag *atomic.Bool shardConsumer sync.Map // map[int]*ShardConsumerWorker - do func(shard int, logGroup *sls.LogGroupList, checkpointTracker CheckPointTracker) string + processor Processor waitGroup sync.WaitGroup Logger log.Logger } @@ -29,9 +29,9 @@ func InitConsumerWorker(option LogHubConfig, do func(int, *sls.LogGroupList) str panic("auto commit already disabled, sdk will not save any checkpoint, " + "please use InitConsumerWorkerWithCheckpointTracker or set AutoCommitDisabled to false") } - return InitConsumerWorkerWithCheckpointTracker( + return InitConsumerWorkerWithProcessor( option, - func(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) string { + ProcessFunc(func(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) string { cursor := do(shardId, logGroupList) // keep the original logic // if cursor is not empty, we don't save, @@ -39,13 +39,19 @@ func InitConsumerWorker(option LogHubConfig, do func(int, *sls.LogGroupList) str checkpointTracker.SaveCheckPoint(false) } return cursor - }, + }), ) } // InitConsumerWorkerWithCheckpointTracker // please note that you need to save after the process is successful, func InitConsumerWorkerWithCheckpointTracker(option LogHubConfig, do func(int, *sls.LogGroupList, CheckPointTracker) string) *ConsumerWorker { + return InitConsumerWorkerWithProcessor(option, ProcessFunc(do)) +} + +// InitConsumerWorkerWithProcessor +// you need save checkpoint by yourself and can do something after consumer shutdown +func InitConsumerWorkerWithProcessor(option LogHubConfig, processor Processor) *ConsumerWorker { logger := logConfig(option) consumerClient := initConsumerClient(option, logger) consumerHeatBeat := initConsumerHeatBeat(consumerClient, logger) @@ -54,8 +60,8 @@ func InitConsumerWorkerWithCheckpointTracker(option LogHubConfig, do func(int, * client: consumerClient, workerShutDownFlag: atomic.NewBool(false), //shardConsumer: make(map[int]*ShardConsumerWorker), - do: do, - Logger: logger, + processor: processor, + Logger: logger, } if err := consumerClient.createConsumerGroup(); err != nil { level.Error(consumerWorker.Logger).Log( @@ -130,7 +136,7 @@ func (consumerWorker *ConsumerWorker) getShardConsumer(shardId int) *ShardConsum if ok { return consumer.(*ShardConsumerWorker) } - consumerIns := initShardConsumerWorker(shardId, consumerWorker.client, consumerWorker.consumerHeatBeat, consumerWorker.do, consumerWorker.Logger) + consumerIns := initShardConsumerWorker(shardId, consumerWorker.client, consumerWorker.consumerHeatBeat, consumerWorker.processor, consumerWorker.Logger) consumerWorker.shardConsumer.Store(shardId, consumerIns) return consumerIns