Skip to content

Commit

Permalink
feat: add Processor interface, support processor to do some clean job…
Browse files Browse the repository at this point in the history
… after consumer shutdown
  • Loading branch information
wxybear committed May 25, 2023
1 parent aad7ba8 commit e72ae84
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 13 deletions.
19 changes: 19 additions & 0 deletions consumer/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package consumerLibrary

import sls "github.com/aliyun/aliyun-log-go-sdk"

type Processor interface {
Process(int, *sls.LogGroupList, CheckPointTracker) string
Shutdown(CheckPointTracker) error
}

type ProcessFunc func(int, *sls.LogGroupList, CheckPointTracker) string

func (processor ProcessFunc) Process(shard int, lgList *sls.LogGroupList, checkpointTracker CheckPointTracker) string {
return processor(shard, lgList, checkpointTracker)
}

func (processor ProcessFunc) Shutdown(checkpointTracker CheckPointTracker) error {
// Do nothing
return nil
}
15 changes: 11 additions & 4 deletions consumer/shard_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type ShardConsumerWorker struct {
lastFetchGroupCount int
lastFetchTime time.Time
consumerStatus string
process func(shard int, logGroup *sls.LogGroupList, checkpointTracker CheckPointTracker) string
processor Processor
shardId int
// TODO: refine to channel
isCurrentDone bool
Expand All @@ -43,10 +43,10 @@ func (consumer *ShardConsumerWorker) getConsumerStatus() string {
return consumer.consumerStatus
}

func initShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consumerHeartBeat *ConsumerHeartBeat, do func(shard int, logGroup *sls.LogGroupList, checkpointTracker CheckPointTracker) string, logger log.Logger) *ShardConsumerWorker {
func initShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consumerHeartBeat *ConsumerHeartBeat, processor Processor, logger log.Logger) *ShardConsumerWorker {
shardConsumeWorker := &ShardConsumerWorker{
shutdownFlag: false,
process: do,
processor: processor,
consumerCheckPointTracker: initConsumerCheckpointTracker(shardId, consumerClient, consumerHeartBeat, logger),
client: consumerClient,
consumerStatus: INITIALIZING,
Expand Down Expand Up @@ -101,7 +101,14 @@ func (consumer *ShardConsumerWorker) consume() {
}()
case SHUTTING_DOWN:
go func() {
err := consumer.consumerCheckPointTracker.flushCheckPoint()
err := consumer.processor.Shutdown(consumer.consumerCheckPointTracker)
if err != nil {
level.Error(consumer.logger).Log("msg", "failed to call processor shutdown", "err", err)
consumer.updateStatus(false)
return
}

err = consumer.consumerCheckPointTracker.flushCheckPoint()
if err == nil {
level.Info(consumer.logger).Log("msg", "shard worker status shutdown_complete", "shardWorkerId", consumer.shardId)
} else {
Expand Down
4 changes: 2 additions & 2 deletions consumer/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (consumer *ShardConsumerWorker) consumerProcessTask() string {
}
}()
if consumer.lastFetchLogGroupList != nil {
consumer.rollBackCheckpoint = consumer.process(consumer.shardId, consumer.lastFetchLogGroupList, consumer.consumerCheckPointTracker)
consumer.rollBackCheckpoint = consumer.processor.Process(consumer.shardId, consumer.lastFetchLogGroupList, consumer.consumerCheckPointTracker)
consumer.saveCheckPointIfNeeded()
consumer.lastFetchLogGroupList = nil
}
Expand All @@ -104,7 +104,7 @@ func (consumer *ShardConsumerWorker) consumerRetryProcessTask() bool {
level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r, "stack", string(stackBuf))
}
}()
consumer.rollBackCheckpoint = consumer.process(consumer.shardId, consumer.lastFetchLogGroupList, consumer.consumerCheckPointTracker)
consumer.rollBackCheckpoint = consumer.processor.Process(consumer.shardId, consumer.lastFetchLogGroupList, consumer.consumerCheckPointTracker)
consumer.saveCheckPointIfNeeded()
return true
}
20 changes: 13 additions & 7 deletions consumer/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type ConsumerWorker struct {
client *ConsumerClient
workerShutDownFlag *atomic.Bool
shardConsumer sync.Map // map[int]*ShardConsumerWorker
do func(shard int, logGroup *sls.LogGroupList, checkpointTracker CheckPointTracker) string
processor Processor
waitGroup sync.WaitGroup
Logger log.Logger
}
Expand All @@ -29,23 +29,29 @@ func InitConsumerWorker(option LogHubConfig, do func(int, *sls.LogGroupList) str
panic("auto commit already disabled, sdk will not save any checkpoint, " +
"please use InitConsumerWorkerWithCheckpointTracker or set AutoCommitDisabled to false")
}
return InitConsumerWorkerWithCheckpointTracker(
return InitConsumerWorkerWithProcessor(
option,
func(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) string {
ProcessFunc(func(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) string {
cursor := do(shardId, logGroupList)
// keep the original logic
// if cursor is not empty, we don't save,
if cursor == "" {
checkpointTracker.SaveCheckPoint(false)
}
return cursor
},
}),
)
}

// InitConsumerWorkerWithCheckpointTracker
// please note that you need to save after the process is successful,
func InitConsumerWorkerWithCheckpointTracker(option LogHubConfig, do func(int, *sls.LogGroupList, CheckPointTracker) string) *ConsumerWorker {
return InitConsumerWorkerWithProcessor(option, ProcessFunc(do))
}

// InitConsumerWorkerWithProcessor
// you need save checkpoint by yourself and can do something after consumer shutdown
func InitConsumerWorkerWithProcessor(option LogHubConfig, processor Processor) *ConsumerWorker {
logger := logConfig(option)
consumerClient := initConsumerClient(option, logger)
consumerHeatBeat := initConsumerHeatBeat(consumerClient, logger)
Expand All @@ -54,8 +60,8 @@ func InitConsumerWorkerWithCheckpointTracker(option LogHubConfig, do func(int, *
client: consumerClient,
workerShutDownFlag: atomic.NewBool(false),
//shardConsumer: make(map[int]*ShardConsumerWorker),
do: do,
Logger: logger,
processor: processor,
Logger: logger,
}
if err := consumerClient.createConsumerGroup(); err != nil {
level.Error(consumerWorker.Logger).Log(
Expand Down Expand Up @@ -130,7 +136,7 @@ func (consumerWorker *ConsumerWorker) getShardConsumer(shardId int) *ShardConsum
if ok {
return consumer.(*ShardConsumerWorker)
}
consumerIns := initShardConsumerWorker(shardId, consumerWorker.client, consumerWorker.consumerHeatBeat, consumerWorker.do, consumerWorker.Logger)
consumerIns := initShardConsumerWorker(shardId, consumerWorker.client, consumerWorker.consumerHeatBeat, consumerWorker.processor, consumerWorker.Logger)
consumerWorker.shardConsumer.Store(shardId, consumerIns)
return consumerIns

Expand Down

0 comments on commit e72ae84

Please # to comment.