From 83db23a2a2cf442ea797a66c1fad3d282166e92b Mon Sep 17 00:00:00 2001
From: FFish
Date: Thu, 11 May 2023 09:58:50 +0000
Subject: [PATCH 1/9] feat: update golang consumer group

1. support CheckpointTracker, so users can get and save checkpoints
2. check the error code when saving a checkpoint fails
3. fix: a checkpoint could be saved even when process failed
4. refine codes...
---
 consumer/README.md             |  60 +++++--
 consumer/checkpoint_tracker.go | 121 +++++++++-----
 consumer/config.go             |   5 +
 consumer/consumer_client.go    |   3 +
 consumer/heart_beat.go         |  95 ++++++-----
 consumer/shard_worker.go       | 294 +++++++++++++++------------------
 consumer/tasks.go              |  50 ++++--
 consumer/util.go               |   3 +-
 consumer/worker.go             |  31 +++-
 9 files changed, 378 insertions(+), 284 deletions(-)

diff --git a/consumer/README.md b/consumer/README.md
index 5a99beba..f06addd0 100644
--- a/consumer/README.md
+++ b/consumer/README.md
@@ -1,7 +1,5 @@
 # Aliyun LOG Go Consumer Library
-
-
 Aliyun LOG Go Consumer Library is an easy-to-use, highly configurable library written in pure Go, built for scenarios where multiple consumers cooperatively consume the same logstore under high-concurrency, big-data workloads.

 ## Features
@@ -19,9 +17,7 @@
 - You can create multiple consumers that consume the same logstore without caring about load balancing between them; the consumer library handles it automatically and guarantees data is not consumed twice. With limited CPU and other resources it consumes the logstore as fast as it can, and it automatically saves consumption checkpoints to the server for you.
 - When the network is unstable, the consumer library resumes consumption once the network recovers, guaranteeing neither data loss nor duplicate consumption.
-- More advanced usages are provided so you can tune a running consumer library in several ways; for concrete examples see [advanced usage of the aliyun log go consumer library](https://yq.aliyun.com/articles/693820)
-
-
+- More advanced usages are provided so you can tune a running consumer library in several ways

 ## Install

 ```
 git clone git@github.com:aliyun/aliyun-log-go-sdk.git
 ```

-
-
-## How it works and quick start
-
-Tutorial: [Aliyun LOG Go Consumer Library quick start and internals](https://yq.aliyun.com/articles/693820)
-
-
+## How it works and quick start
+Tutorial: [Aliyun LOG Go Consumer Library quick start and internals](https://developer.aliyun.com/article/693820)

 ## Usage

 1.**Configure LogHubConfig**

 LogHubConfig is the configuration struct exposed to users for setting the consumption policy. You can set each field according to your needs; the parameters are described below:
+|Parameter|Meaning|Details|
+| --- | --- | --- |
+|Endpoint|SLS endpoint|required, e.g. cn-hangzhou.sls.aliyuncs.com|
+|AccessKeyId|aliyun AccessKeyId|required|
+|AccessKeySecret|aliyun AccessKeySecret|required|
+|Project|SLS project|required|
+|Logstore|SLS logstore|required|
+|ConsumerGroupName|consumer group name|required|
+|Consumer|consumer name|required, you choose it yourself; make sure it is not duplicated|
+|CursorPosition|where consumption starts|required, one of: 1. BEGIN_CURSOR: the oldest position of the logstore 2. END_CURSOR: the newest position of the logstore 3. SPECIAL_TIME_CURSOR: a unix timestamp you set yourself|
+|HeartbeatIntervalInSecond|heartbeat interval|optional, default 20s; the SDK uses heartbeats to confirm liveness with the server|
+|DataFetchIntervalInMs|interval between data fetches|optional, default 200ms|
+|MaxFetchLogGroupCount|number of log groups per fetch|optional, default 1000|
+|CursorStartTime|timestamp of the start position|optional, must be set when CursorPosition is SPECIAL_TIME_CURSOR|
+|InOrder|whether to consume in order after a shard split|optional, default false; when true, the shards produced by a split are consumed only after the old read-only shard has been fully consumed|
+|AllowLogLevel|allowed log level|optional, default info; levels from low to high are debug, info, warn, error; only logs above AllowLogLevel are emitted|
+|LogFileName|file name for the library's own log|optional, default stdout|
+|IsJsonType|whether the log is JSON|optional, default logfmt; true switches to JSON format|
+|LogMaxSize|maximum log file size|optional, default 10|
+|LogMaxBackups|maximum number of old log files to keep|optional, default 10|
+|LogCompass|whether to compress the log|optional, no compression by default; gzip is used when enabled|
+|HTTPClient|a custom http client|optional; the SDK sends its http requests through this client, so you can hook in your own logic|
+|SecurityToken|aliyun SecurityToken|optional, see https://help.aliyun.com/document_detail/47277.html|
+|AutoCommitDisabled|whether to disable the SDK's automatic checkpoint commit|optional, not disabled by default|
+|AutoCommitIntervalInMS|interval of automatic checkpoint commits|optional, in milliseconds, default 60s|
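+
+A minimal configuration sketch (the endpoint, keys and names below are placeholders, not real values):
+
+```
+option := consumerLibrary.LogHubConfig{
+	Endpoint:          "cn-hangzhou.sls.aliyuncs.com", // placeholder
+	AccessKeyID:       "your-access-key-id",
+	AccessKeySecret:   "your-access-key-secret",
+	Project:           "your-project",
+	Logstore:          "your-logstore",
+	ConsumerGroupName: "your-consumer-group",
+	ConsumerName:      "consumer-1", // keep it unique within the group
+	CursorPosition:    consumerLibrary.BEGIN_CURSOR,
+}
+```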

 2.**Override the consumption logic**

 ```
-func process(shardId int, logGroupList *sls.LogGroupList) string {
+func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) string {
 	for _, logGroup := range logGroupList.LogGroups {
 		err := client.PutLogs(option.Project, "copy-logstore", logGroup)
 		if err != nil {
 			fmt.Println(err)
 		}
 	}
-	fmt.Println("shardId %v processing works sucess", shardId)
-	return "" // If you do not need to reset the checkpoint, return an empty string; to reset it, return the cursor to reset to.
+	fmt.Println("shardId %v processing works success", shardId)
+	// Tell the CheckPointTracker that process succeeded and save the checkpoint:
+	// false marks process as succeeded but does not write to the server immediately; the SDK commits in batches after an interval (when AutoCommitDisabled is false)
+	// true marks process as succeeded and writes to the server immediately
+	// false is recommended for most scenarios
+	checkpointTracker.SaveCheckPoint(false) // process succeeded; the checkpoint is recorded but only written to the server after an interval
+	// If you do not need to reset the checkpoint, return an empty string; to reset it, return the cursor to reset to.
+	// To reset, you can return checkpointTracker.GetCurrentCheckPoint(); the current checkpoint is where this not-yet-processed batch of data begins
+	return ""
 }
 ```

 In actual consumption you only need to override the process function according to your needs. The code above is just a simple demo that prints the logs the consumer fetched. Note that the function's parameters and return value must not be changed, otherwise consumption fails.
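+
+When processing fails and the same batch should be consumed again, one option (a sketch; `handle` stands in for your own logic) is to skip saving and return the current checkpoint:
+
+```
+func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) string {
+	if err := handle(logGroupList); err != nil {
+		// do not save; roll back to the start of this batch so it is fetched again
+		return checkpointTracker.GetCurrentCheckPoint()
+	}
+	checkpointTracker.SaveCheckPoint(false)
+	return ""
+}
+```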

 3.**Create a consumer and start consuming**

 ```
 // option is an instance of LogHubConfig
-consumerWorker := consumerLibrary.InitConsumerWorker(option, process)
+consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)
 // call Start to begin consuming
 consumerWorker.Start()
 ```

+> Note: `InitConsumerWorker(option, process)` is now deprecated. It runs `checkpointTracker.SaveCheckPoint(false)` once after your process function returns, but it cannot force a write to the server or fetch the previously saved checkpoint.

 Call InitConsumerWorkerWithCheckpointTracker with the config instance and the process function as arguments to create a consumer instance, then call Start to begin consuming.

 4.**Shut the consumer down**

 ```
 ch := make(chan os.Signal) // receive os signals on a channel
 signal.Notify(ch, os.Kill, os.Interrupt)
 consumerWorker.Start()
 if _, ok := <-ch; ok { // once an os stop signal arrives, e.g. the signal triggered by ctrl+c, call the consumer's shutdown method to exit.
 	consumerWorker.StopAndWait()
 }
 ```

@@ -87,7 +112,6 @@
 The example above listens for os signals through a Go channel; when a user-triggered os exit signal is observed it calls StopAndWait() to exit. You can design your own exit logic as needed; just make sure StopAndWait() is called.

-
 ## Simple examples

 To help you get started with the consumer library faster, we provide two simple code samples; see [consumer library example](https://github.com/aliyun/aliyun-log-go-sdk/tree/master/example/consumer)
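+
+Putting the steps together, a compact end-to-end sketch (placeholder values; the tracker parameter uses the CheckPointTracker naming from step 2):
+
+```
+package main
+
+import (
+	"fmt"
+	"os"
+	"os/signal"
+	"syscall"
+
+	sls "github.com/aliyun/aliyun-log-go-sdk"
+	consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
+)
+
+func main() {
+	option := consumerLibrary.LogHubConfig{
+		Endpoint:          "cn-hangzhou.sls.aliyuncs.com", // placeholder
+		AccessKeyID:       os.Getenv("SLS_ACCESS_KEY_ID"),
+		AccessKeySecret:   os.Getenv("SLS_ACCESS_KEY_SECRET"),
+		Project:           "your-project",
+		Logstore:          "your-logstore",
+		ConsumerGroupName: "your-consumer-group",
+		ConsumerName:      "consumer-1",
+		CursorPosition:    consumerLibrary.END_CURSOR,
+	}
+	worker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option,
+		func(shardId int, lgList *sls.LogGroupList, tracker consumerLibrary.CheckPointTracker) string {
+			fmt.Println(shardId, lgList) // your processing goes here
+			tracker.SaveCheckPoint(false)
+			return ""
+		})
+	worker.Start()
+	ch := make(chan os.Signal, 1)
+	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT)
+	<-ch
+	worker.StopAndWait()
+}
+```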
diff --git a/consumer/checkpoint_tracker.go b/consumer/checkpoint_tracker.go
index a6e99b44..e1c15ba3 100644
--- a/consumer/checkpoint_tracker.go
+++ b/consumer/checkpoint_tracker.go
@@ -1,61 +1,104 @@
 package consumerLibrary

 import (
+	"strings"
+
+	sls "github.com/aliyun/aliyun-log-go-sdk"
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
-	"time"
 )

-type ConsumerCheckPointTracker struct {
-	client                            *ConsumerClient
-	defaultFlushCheckPointIntervalSec int64
-	tempCheckPoint                    string
-	lastPersistentCheckPoint          string
-	trackerShardId                    int
-	lastCheckTime                     int64
-	logger                            log.Logger
-}
-
-func initConsumerCheckpointTracker(shardId int, consumerClient *ConsumerClient, logger log.Logger) *ConsumerCheckPointTracker {
-	checkpointTracker := &ConsumerCheckPointTracker{
-		defaultFlushCheckPointIntervalSec: 60,
-		client:                            consumerClient,
-		trackerShardId:                    shardId,
-		logger:                            logger,
-	}
-	return checkpointTracker
+type CheckPointTracer interface {
+	// GetCheckPoint returns the latest saved checkpoint
+	GetCheckPoint() string
+	// GetCurrentCheckPoint returns the cursor of the currently fetched data
+	GetCurrentCheckPoint() string
+	// SaveCheckPoint saves the checkpoint
+	SaveCheckPoint(force bool) error
 }

-func (checkPointTracker *ConsumerCheckPointTracker) setMemoryCheckPoint(cursor string) {
-	checkPointTracker.tempCheckPoint = cursor
+type DefaultCheckPointTracker struct {
+	client            *ConsumerClient
+	heartBeat         *ConsumerHeartBeat
+	nextCheckPoint    string // cursor of the data already pulled
+	currentCheckPoint string // cursor of the data handed to process, which may not be saved to the server yet
+	pendingCheckPoint string // cursor pending to be saved
+	savedCheckPoint   string // cursor already saved
+	shardId           int
+	logger            log.Logger
 }

-func (checkPointTracker *ConsumerCheckPointTracker) setPersistentCheckPoint(cursor string) {
-	checkPointTracker.lastPersistentCheckPoint = cursor
+func initConsumerCheckpointTracker(shardId int, consumerClient *ConsumerClient, consumerHeatBeat *ConsumerHeartBeat, logger log.Logger) *DefaultCheckPointTracker {
+	checkpointTracker := &DefaultCheckPointTracker{
+		client:    consumerClient,
+		heartBeat: consumerHeatBeat,
+		shardId:   shardId,
+		logger:    logger,
+	}
+	return checkpointTracker
 }

-func (checkPointTracker *ConsumerCheckPointTracker) flushCheckPoint() error {
-	if checkPointTracker.tempCheckPoint != "" && checkPointTracker.tempCheckPoint != checkPointTracker.lastPersistentCheckPoint {
-		if err := checkPointTracker.client.updateCheckPoint(checkPointTracker.trackerShardId, checkPointTracker.tempCheckPoint, true); err != nil {
-			return err
-		}
+func (tracker *DefaultCheckPointTracker) initCheckPoint(cursor string) {
+	tracker.savedCheckPoint = cursor
+}

-		checkPointTracker.lastPersistentCheckPoint = checkPointTracker.tempCheckPoint
+func (tracker *DefaultCheckPointTracker) SaveCheckPoint(force bool) error {
+	tracker.pendingCheckPoint = tracker.nextCheckPoint
+	if force {
+		return tracker.flushCheckPoint()
 	}
+	return nil
 }

-func (checkPointTracker *ConsumerCheckPointTracker) flushCheck() {
-	currentTime := time.Now().Unix()
-	if currentTime > checkPointTracker.lastCheckTime+checkPointTracker.defaultFlushCheckPointIntervalSec {
-		if err := checkPointTracker.flushCheckPoint(); err != nil {
-			level.Warn(checkPointTracker.logger).Log("msg", "update checkpoint get error", "error", err)
-		} else {
-			checkPointTracker.lastCheckTime = currentTime
+func (tracker *DefaultCheckPointTracker) GetCheckPoint() string {
+	return tracker.savedCheckPoint
+}
+
+func (tracker *DefaultCheckPointTracker) GetCurrentCheckPoint() string {
+	return tracker.currentCheckPoint
+}
+
+func (tracker *DefaultCheckPointTracker) setCurrentCheckPoint(cursor string) {
+	tracker.currentCheckPoint = cursor
+}
+
+func (tracker *DefaultCheckPointTracker) setNextCheckPoint(cursor string) {
+	tracker.nextCheckPoint = cursor
+}
+
+func (tracker *DefaultCheckPointTracker) flushCheckPoint() error {
+	if tracker.pendingCheckPoint == "" || tracker.pendingCheckPoint == tracker.savedCheckPoint {
+		return nil
+	}
+	for i := 0; ; i++ {
+		err := tracker.client.updateCheckPoint(tracker.shardId, tracker.pendingCheckPoint, true)
+		if err == nil {
+			break
+		}
+		slsErr, ok := err.(*sls.Error)
+		if ok {
+			if strings.EqualFold(slsErr.Code, "ConsumerNotExsit") || strings.EqualFold(slsErr.Code, "ConsumerNotMatch") {
+				tracker.heartBeat.removeHeartShard(tracker.shardId)
+				level.Warn(tracker.logger).Log("msg", "consumer has been removed or shard has been reassigned", "shard", tracker.shardId, "err", slsErr)
+				break
+			} else if strings.EqualFold(slsErr.Code, "ShardNotExsit") {
+				tracker.heartBeat.removeHeartShard(tracker.shardId)
+				level.Warn(tracker.logger).Log("msg", "shard does not exist", "shard", tracker.shardId)
+				break
+			}
+		}
+		if i >= 2 {
+			level.Error(tracker.logger).Log(
+				"msg", "failed to save checkpoint",
+				"consumer", tracker.client.option.ConsumerName,
+				"shard", tracker.shardId,
+				"checkpoint", tracker.pendingCheckPoint,
+			)
+			return err
 		}
 	}
-}

-func (checkPointTracker *ConsumerCheckPointTracker) getCheckPoint() string {
-	return checkPointTracker.tempCheckPoint
+	tracker.savedCheckPoint = tracker.pendingCheckPoint
+	return nil
 }
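The retry loop above gives up early on "not exist / not match" error codes; the same check, condensed into a standalone helper for illustration (the helper name is ours, not part of the SDK):

```
// isShardGone reports whether a checkpoint error means the shard was
// reassigned or removed, in which case retrying is pointless
func isShardGone(err error) bool {
	slsErr, ok := err.(*sls.Error)
	if !ok {
		return false
	}
	return strings.EqualFold(slsErr.Code, "ConsumerNotExsit") ||
		strings.EqualFold(slsErr.Code, "ConsumerNotMatch") ||
		strings.EqualFold(slsErr.Code, "ShardNotExsit")
}
```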
diff --git a/consumer/config.go b/consumer/config.go
index 1a37588c..154f74e5 100644
--- a/consumer/config.go
+++ b/consumer/config.go
@@ -34,6 +34,9 @@ type LogHubConfig struct {
 	// deleted.)
 	//:param LogCompass: Compress determines if the rotated log files should be compressed using gzip.
 	//:param HTTPClient: custom http client for sending data to sls
+	//:param AutoCommitDisabled: whether to disable the automatic checkpoint commit; default is false, meaning checkpoints are committed automatically.
+	// Note that if you disable auto commit, you must use InitConsumerWorkerWithCheckpointTracker instead of InitConsumerWorker.
+	//:param AutoCommitIntervalInMS: interval of the automatic checkpoint commit in milliseconds, default is 60000 (60s)

 	Endpoint          string
 	AccessKeyID       string
@@ -56,6 +59,8 @@ type LogHubConfig struct {
 	LogCompass    bool
 	HTTPClient    *http.Client
 	SecurityToken string
+	AutoCommitDisabled     bool
+	AutoCommitIntervalInMS int64
 }

 const (
diff --git a/consumer/consumer_client.go b/consumer/consumer_client.go
index 3f474c15..0f098670 100644
--- a/consumer/consumer_client.go
+++ b/consumer/consumer_client.go
@@ -27,6 +27,9 @@ func initConsumerClient(option LogHubConfig, logger log.Logger) *ConsumerClient
 	if option.MaxFetchLogGroupCount == 0 {
 		option.MaxFetchLogGroupCount = 1000
 	}
+	if option.AutoCommitIntervalInMS == 0 {
+		option.AutoCommitIntervalInMS = 60 * 1000
+	}
 	client := &sls.Client{
 		Endpoint:        option.Endpoint,
 		AccessKeyID:     option.AccessKeyID,
diff --git a/consumer/heart_beat.go b/consumer/heart_beat.go
index cf2d10ca..acc774fe 100644
--- a/consumer/heart_beat.go
+++ b/consumer/heart_beat.go
@@ -10,19 +10,18 @@ import (
 	"go.uber.org/atomic"
 )

-var shardLock sync.RWMutex
-
-type ConsumerHeatBeat struct {
+type ConsumerHeartBeat struct {
 	client                   *ConsumerClient
 	shutDownFlag             *atomic.Bool
 	heldShards               []int
 	heartShards              []int
 	logger                   log.Logger
 	lastHeartBeatSuccessTime int64
+	shardLock                sync.RWMutex
 }

-func initConsumerHeatBeat(consumerClient *ConsumerClient, logger log.Logger) *ConsumerHeatBeat {
-	consumerHeatBeat := &ConsumerHeatBeat{
+func initConsumerHeatBeat(consumerClient *ConsumerClient, logger log.Logger) *ConsumerHeartBeat {
+	consumerHeartBeat := &ConsumerHeartBeat{
 		client:       consumerClient,
 		shutDownFlag: atomic.NewBool(false),
 		heldShards:   []int{},
@@ -30,77 +29,77 @@ func initConsumerHeatBeat(consumerClient *ConsumerClient, logger log.Logger) *Co
 		logger:                   logger,
 		lastHeartBeatSuccessTime: time.Now().Unix(),
 	}
-	return consumerHeatBeat
+	return consumerHeartBeat
 }

-func (consumerHeatBeat *ConsumerHeatBeat) getHeldShards() []int {
-	shardLock.RLock()
-	defer shardLock.RUnlock()
-	return consumerHeatBeat.heldShards
+func (heartbeat *ConsumerHeartBeat) getHeldShards() []int {
+	heartbeat.shardLock.RLock()
+	defer heartbeat.shardLock.RUnlock()
+	return heartbeat.heldShards
 }

-func (consumerHeatBeat *ConsumerHeatBeat) setHeldShards(heldShards []int) {
-	shardLock.Lock()
-	defer shardLock.Unlock()
-	consumerHeatBeat.heldShards = heldShards
+func (heartbeat *ConsumerHeartBeat) setHeldShards(heldShards []int) {
+	heartbeat.shardLock.Lock()
+	defer heartbeat.shardLock.Unlock()
+	heartbeat.heldShards = heldShards
 }

-func (consumerHeatBeat *ConsumerHeatBeat) setHeartShards(heartShards []int) {
-	shardLock.Lock()
-	defer shardLock.Unlock()
-	consumerHeatBeat.heartShards = heartShards
+func (heartbeat *ConsumerHeartBeat) setHeartShards(heartShards []int) {
+	heartbeat.shardLock.Lock()
+	defer heartbeat.shardLock.Unlock()
+	heartbeat.heartShards = heartShards
 }

-func (consumerHeatBeat *ConsumerHeatBeat) getHeartShards() []int {
-	shardLock.RLock()
-	defer shardLock.RUnlock()
-	return consumerHeatBeat.heartShards
+func (heartbeat *ConsumerHeartBeat) getHeartShards() []int {
+	heartbeat.shardLock.RLock()
+	defer heartbeat.shardLock.RUnlock()
+	return heartbeat.heartShards
 }

-func (consumerHeatBeat *ConsumerHeatBeat) shutDownHeart() {
-	level.Info(consumerHeatBeat.logger).Log("msg", "try to stop heart beat")
-	consumerHeatBeat.shutDownFlag.Store(true)
+func (heartbeat *ConsumerHeartBeat) shutDownHeart() {
+	level.Info(heartbeat.logger).Log("msg", "try to stop heart beat")
+	heartbeat.shutDownFlag.Store(true)
 }

-func (consumerHeatBeat *ConsumerHeatBeat) heartBeatRun() {
+func (heartbeat *ConsumerHeartBeat) heartBeatRun() {
 	var lastHeartBeatTime int64

-	for !consumerHeatBeat.shutDownFlag.Load() {
+	for !heartbeat.shutDownFlag.Load() {
 		lastHeartBeatTime = time.Now().Unix()
-		uploadShards := append(consumerHeatBeat.heartShards, consumerHeatBeat.heldShards...)
-		consumerHeatBeat.setHeartShards(Set(uploadShards))
-		responseShards, err := consumerHeatBeat.client.heartBeat(consumerHeatBeat.getHeartShards())
+		uploadShards := append(heartbeat.heartShards, heartbeat.heldShards...)
+		heartbeat.setHeartShards(Set(uploadShards))
+		responseShards, err := heartbeat.client.heartBeat(heartbeat.getHeartShards())
 		if err != nil {
-			level.Warn(consumerHeatBeat.logger).Log("msg", "send heartbeat error", "error", err)
-			if time.Now().Unix()-consumerHeatBeat.lastHeartBeatSuccessTime > int64(consumerHeatBeat.client.consumerGroup.Timeout+consumerHeatBeat.client.option.HeartbeatIntervalInSecond) {
-				consumerHeatBeat.setHeldShards([]int{})
-				level.Info(consumerHeatBeat.logger).Log("msg", "Heart beat timeout, automatic reset consumer held shards")
+			level.Warn(heartbeat.logger).Log("msg", "send heartbeat error", "error", err)
+			if time.Now().Unix()-heartbeat.lastHeartBeatSuccessTime > int64(heartbeat.client.consumerGroup.Timeout+heartbeat.client.option.HeartbeatIntervalInSecond) {
+				heartbeat.setHeldShards([]int{})
+				level.Info(heartbeat.logger).Log("msg", "Heart beat timeout, automatically resetting the consumer's held shards")
 			}
 		} else {
-			consumerHeatBeat.lastHeartBeatSuccessTime = time.Now().Unix()
-			level.Info(consumerHeatBeat.logger).Log("heart beat result", fmt.Sprintf("%v", consumerHeatBeat.heartShards), "get", fmt.Sprintf("%v", responseShards))
-			consumerHeatBeat.setHeldShards(responseShards)
-			if !IntSliceReflectEqual(consumerHeatBeat.getHeartShards(), consumerHeatBeat.getHeldShards()) {
-				currentSet := Set(consumerHeatBeat.getHeartShards())
-				responseSet := Set(consumerHeatBeat.getHeldShards())
+			heartbeat.lastHeartBeatSuccessTime = time.Now().Unix()
+			level.Info(heartbeat.logger).Log("heart beat result", fmt.Sprintf("%v", heartbeat.heartShards), "get", fmt.Sprintf("%v", responseShards))
+			heartbeat.setHeldShards(responseShards)
+			if !IntSliceReflectEqual(heartbeat.getHeartShards(), heartbeat.getHeldShards()) {
+				currentSet := Set(heartbeat.getHeartShards())
+				responseSet := Set(heartbeat.getHeldShards())
 				add := Subtract(currentSet, responseSet)
 				remove := Subtract(responseSet, currentSet)
-				level.Info(consumerHeatBeat.logger).Log("shard reorganize, adding:", fmt.Sprintf("%v", add), "removing:", fmt.Sprintf("%v", remove))
+				level.Info(heartbeat.logger).Log("shard reorganize, adding:", fmt.Sprintf("%v", add), "removing:", fmt.Sprintf("%v", remove))
 			}
 		}
-		TimeToSleepInSecond(int64(consumerHeatBeat.client.option.HeartbeatIntervalInSecond), lastHeartBeatTime, consumerHeatBeat.shutDownFlag.Load())
+		TimeToSleepInSecond(int64(heartbeat.client.option.HeartbeatIntervalInSecond), lastHeartBeatTime, heartbeat.shutDownFlag.Load())
 	}
-	level.Info(consumerHeatBeat.logger).Log("msg", "heart beat exit")
+	level.Info(heartbeat.logger).Log("msg", "heart beat exit")
 }

-func (consumerHeatBeat *ConsumerHeatBeat) removeHeartShard(shardId int) bool {
-	shardLock.Lock()
-	defer shardLock.Unlock()
+func (heartbeat *ConsumerHeartBeat) removeHeartShard(shardId int) bool {
+	heartbeat.shardLock.Lock()
+	defer heartbeat.shardLock.Unlock()
 	isDeleteShard := false
-	for i, heartShard := range consumerHeatBeat.heartShards {
+	for i, heartShard := range heartbeat.heartShards {
 		if shardId == heartShard {
-			consumerHeatBeat.heartShards = append(consumerHeatBeat.heartShards[:i], consumerHeatBeat.heartShards[i+1:]...)
+			heartbeat.heartShards = append(heartbeat.heartShards[:i], heartbeat.heartShards[i+1:]...)
 			isDeleteShard = true
 			break
 		}
 	}
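The heartbeat loop above drops all held shards when heartbeats keep failing; the liveness rule it applies can be restated as a standalone sketch (the helper name is ours):

```
// expired when the last successful heartbeat is older than
// the consumer group timeout plus one heartbeat interval
func heartbeatExpired(lastSuccessUnix int64, groupTimeoutSec, heartbeatIntervalSec int) bool {
	return time.Now().Unix()-lastSuccessUnix > int64(groupTimeoutSec+heartbeatIntervalSec)
}
```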
diff --git a/consumer/shard_worker.go b/consumer/shard_worker.go
index 278b19d7..17c162dc 100644
--- a/consumer/shard_worker.go
+++ b/consumer/shard_worker.go
@@ -1,174 +1,175 @@
 package consumerLibrary

 import (
-	"github.com/aliyun/aliyun-log-go-sdk"
-	"github.com/go-kit/kit/log"
-	"github.com/go-kit/kit/log/level"
 	"sync"
 	"time"
-)

-var shutDownLock sync.RWMutex
-var consumeStatusLock sync.RWMutex
-var consumerTaskLock sync.RWMutex
+	sls "github.com/aliyun/aliyun-log-go-sdk"
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+)

 type ShardConsumerWorker struct {
-	client                        *ConsumerClient
-	consumerCheckPointTracker     *ConsumerCheckPointTracker
-	consumerShutDownFlag          bool
-	lastFetchLogGroupList         *sls.LogGroupList
-	nextFetchCursor               string
-	lastFetchGroupCount           int
-	lastFetchtime                 int64
-	consumerStatus                string
-	process                       func(shard int, logGroup *sls.LogGroupList) string
-	shardId                       int
-	tempCheckPoint                string
-	isCurrentDone                 bool
-	logger                        log.Logger
-	lastFetchTimeForForceFlushCpt int64
-	isFlushCheckpointDone         bool
-	rollBackCheckpoint            string
+	client                    *ConsumerClient
+	consumerCheckPointTracker *DefaultCheckPointTracker
+	consumerShutDownFlag      bool
+	lastFetchLogGroupList     *sls.LogGroupList
+	nextFetchCursor           string
+	lastFetchGroupCount       int
+	lastFetchTime             time.Time
+	consumerStatus            string
+	process                   func(shard int, logGroup *sls.LogGroupList, checkpointTracker CheckPointTracer) string
+	shardId                   int
+	isCurrentDone             bool
+	logger                    log.Logger
+	// time when the checkpoint was last saved
+	lastCheckpointSaveTime time.Time
+	rollBackCheckpoint     string
+
+	statusLock   sync.RWMutex
+	taskLock     sync.RWMutex
+	shutDownLock sync.RWMutex
 }

 func (consumer *ShardConsumerWorker) setConsumerStatus(status string) {
-	consumeStatusLock.Lock()
-	defer consumeStatusLock.Unlock()
+	consumer.statusLock.Lock()
+	defer consumer.statusLock.Unlock()
 	consumer.consumerStatus = status
 }

 func (consumer *ShardConsumerWorker) getConsumerStatus() string {
-	consumeStatusLock.RLock()
-	defer consumeStatusLock.RUnlock()
+	consumer.statusLock.RLock()
+	defer consumer.statusLock.RUnlock()
 	return consumer.consumerStatus
 }

-func initShardConsumerWorker(shardId int, consumerClient *ConsumerClient, do func(shard int, logGroup *sls.LogGroupList) string, logger log.Logger) *ShardConsumerWorker {
+func initShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consumerHeartBeat *ConsumerHeartBeat, do func(shard int, logGroup *sls.LogGroupList, checkpointTracker CheckPointTracer) string, logger log.Logger) *ShardConsumerWorker {
 	shardConsumeWorker := &ShardConsumerWorker{
-		consumerShutDownFlag:          false,
-		process:                       do,
-		consumerCheckPointTracker:     initConsumerCheckpointTracker(shardId, consumerClient, logger),
-		client:                        consumerClient,
-		consumerStatus:                INITIALIZING,
-		shardId:                       shardId,
-		lastFetchtime:                 0,
-		isCurrentDone:                 true,
-		isFlushCheckpointDone:         true,
-		logger:                        logger,
-		lastFetchTimeForForceFlushCpt: 0,
-		rollBackCheckpoint:            "",
+		consumerShutDownFlag:      false,
+		process:                   do,
+		consumerCheckPointTracker: initConsumerCheckpointTracker(shardId, consumerClient, consumerHeartBeat, logger),
+		client:                    consumerClient,
+		consumerStatus:            INITIALIZING,
+		shardId:                   shardId,
+		lastFetchTime:             time.Now(),
+		isCurrentDone:             true,
+		logger:                    logger,
+		rollBackCheckpoint:        "",
 	}
 	return shardConsumeWorker
 }

 func (consumer *ShardConsumerWorker) consume() {
 	if consumer.consumerShutDownFlag {
-		consumer.setIsFlushCheckpointDoneToFalse()
 		go func() {
-			// If the data is not consumed, save the tempCheckPoint to the server
-			if consumer.getConsumerStatus() == PULL_PROCESSING_DONE {
-				consumer.consumerCheckPointTracker.tempCheckPoint = consumer.tempCheckPoint
-			} else if consumer.getConsumerStatus() == CONSUME_PROCESSING {
-				level.Info(consumer.logger).Log("msg", "Consumption is in progress, waiting for consumption to be completed")
-				consumer.setIsFlushCheckpointDoneToTrue()
-				return
+			defer consumer.setConsumerStatus(SHUTDOWN_COMPLETE)
+			// if it is processing, wait for the status to switch; otherwise save the checkpoint straight away
+			if consumer.getConsumerStatus() == CONSUME_PROCESSING {
+				level.Info(consumer.logger).Log("msg", "Consumption is in progress, sleep to wait for consumption to be completed")
+				shutdownWaitTimes := 10
+				// OPTIMIZE
+				// for now just sleep; this does not wait until the very end
+				for i := 0; i < shutdownWaitTimes; i++ {
+					time.Sleep(time.Millisecond * 10)
+					if consumer.getConsumerStatus() != CONSUME_PROCESSING {
+						break
+					}
+					if i == shutdownWaitTimes-1 {
+						level.Warn(consumer.logger).Log("msg", "waited many times, but the last process may not be over yet", "retryTimes", shutdownWaitTimes)
+					}
+				}
 			}
-			err := consumer.consumerCheckPointTracker.flushCheckPoint()
-			if err != nil {
-				level.Warn(consumer.logger).Log("msg", "Flush checkpoint error,prepare for retry", "error message:", err)
-			} else {
-				consumer.setConsumerStatus(SHUTDOWN_COMPLETE)
-				level.Info(consumer.logger).Log("msg", "shardworker are shut down complete", "shardWorkerId", consumer.shardId)
+
+			var err error
+			retryTimes := 3
+			for i := 0; i < retryTimes; i++ {
+				err = consumer.consumerCheckPointTracker.SaveCheckPoint(true)
+				if err == nil {
+					break
+				}
 			}
-			consumer.setIsFlushCheckpointDoneToTrue()
-		}()
-	} else if consumer.getConsumerStatus() == INITIALIZING {
-		consumer.setConsumerIsCurrentDoneToFalse()
-		go func() {
-			cursor, err := consumer.consumerInitializeTask()
-			if err != nil {
-				consumer.setConsumerStatus(INITIALIZING)
+			if err == nil {
+				level.Info(consumer.logger).Log("msg", "shard worker is shut down complete", "shardWorkerId", consumer.shardId)
 			} else {
-				consumer.nextFetchCursor = cursor
-				consumer.setConsumerStatus(INITIALIZING_DONE)
+				level.Warn(consumer.logger).Log("msg", "failed to save checkpoint after retries", "retryTimes", retryTimes, "err", err)
 			}
-			consumer.setConsumerIsCurrentDoneToTrue()
 		}()
-	} else if consumer.getConsumerStatus() == INITIALIZING_DONE || consumer.getConsumerStatus() == CONSUME_PROCESSING_DONE {
-		consumer.setConsumerIsCurrentDoneToFalse()
-		consumer.setConsumerStatus(PULL_PROCESSING)
-		go func() {
-			var isGenerateFetchTask = true
-			// throttling control, similar as Java's SDK
-			if consumer.lastFetchGroupCount < 100 {
-				// The time used here is in milliseconds.
-				isGenerateFetchTask = (time.Now().UnixNano()/1e6 - consumer.lastFetchtime) > 500
-			} else if consumer.lastFetchGroupCount < 500 {
-				isGenerateFetchTask = (time.Now().UnixNano()/1e6 - consumer.lastFetchtime) > 200
-			} else if consumer.lastFetchGroupCount < 1000 {
-				isGenerateFetchTask = (time.Now().UnixNano()/1e6 - consumer.lastFetchtime) > 50
-			}
-			if isGenerateFetchTask {
-				consumer.lastFetchtime = time.Now().UnixNano() / 1e6
-				// Set the logback cursor. If the logs are not consumed, save the logback cursor to the server.
-				consumer.tempCheckPoint = consumer.nextFetchCursor
+	} else {
+		consumer.setTaskDoneFlag(false)
+		if consumer.getConsumerStatus() == INITIALIZING {
+			go func() {
+				defer consumer.setTaskDoneFlag(true)

-				logGroupList, nextCursor, err := consumer.consumerFetchTask()
+				cursor, err := consumer.consumerInitializeTask()
 				if err != nil {
-					consumer.setConsumerStatus(INITIALIZING_DONE)
+					consumer.setConsumerStatus(INITIALIZING)
 				} else {
-					consumer.lastFetchLogGroupList = logGroupList
-					consumer.nextFetchCursor = nextCursor
-					consumer.consumerCheckPointTracker.setMemoryCheckPoint(consumer.nextFetchCursor)
-					consumer.lastFetchGroupCount = GetLogGroupCount(consumer.lastFetchLogGroupList)
-					level.Debug(consumer.logger).Log("shardId", consumer.shardId, "fetch log count", GetLogCount(consumer.lastFetchLogGroupList))
-					if consumer.lastFetchGroupCount == 0 {
-						consumer.lastFetchLogGroupList = nil
-					} else {
-						consumer.lastFetchTimeForForceFlushCpt = time.Now().Unix()
-					}
-					if time.Now().Unix()-consumer.lastFetchTimeForForceFlushCpt > 30 {
-						err := consumer.consumerCheckPointTracker.flushCheckPoint()
-						if err != nil {
-							level.Warn(consumer.logger).Log("msg", "Failed to save the final checkpoint", "error:", err)
-						} else {
-							consumer.lastFetchTimeForForceFlushCpt = 0
-						}
+					consumer.nextFetchCursor = cursor
+					consumer.setConsumerStatus(INITIALIZING_DONE)
+				}
+			}()
+		} else if consumer.getConsumerStatus() == INITIALIZING_DONE || consumer.getConsumerStatus() == CONSUME_PROCESSING_DONE {
+			consumer.setConsumerStatus(PULL_PROCESSING)
+			go func() {
+				defer consumer.setTaskDoneFlag(true)
+				if !consumer.shouldFetch() {
+					level.Debug(consumer.logger).Log("msg", "Pull Log Current Limitation and Re-Pull Log")
+					consumer.setConsumerStatus(INITIALIZING_DONE)
+				}

-					}
+				if err := consumer.nextFetchTask(); err != nil {
+					consumer.setConsumerStatus(INITIALIZING_DONE)
+				} else {
 					consumer.setConsumerStatus(PULL_PROCESSING_DONE)
 				}
-			} else {
-				level.Debug(consumer.logger).Log("msg", "Pull Log Current Limitation and Re-Pull Log")
-				consumer.setConsumerStatus(INITIALIZING_DONE)
-			}
-			consumer.setConsumerIsCurrentDoneToTrue()
-		}()
-	} else if consumer.getConsumerStatus() == PULL_PROCESSING_DONE {
-		consumer.setConsumerIsCurrentDoneToFalse()
-		consumer.setConsumerStatus(CONSUME_PROCESSING)
-		go func() {
-			rollBackCheckpoint := consumer.consumerProcessTask()
-			if rollBackCheckpoint != "" {
-				consumer.nextFetchCursor = rollBackCheckpoint
-				level.Info(consumer.logger).Log("msg", "Checkpoints set for users have been reset", "shardWorkerId", consumer.shardId, "rollBackCheckpoint", rollBackCheckpoint)
-			}
-			consumer.lastFetchLogGroupList = nil
-			consumer.setConsumerStatus(CONSUME_PROCESSING_DONE)
-			consumer.setConsumerIsCurrentDoneToTrue()
-		}()
+			}()
+		} else if consumer.getConsumerStatus() == PULL_PROCESSING_DONE {
+			consumer.setConsumerStatus(CONSUME_PROCESSING)
+			go func() {
+				defer consumer.setTaskDoneFlag(true)
+				defer consumer.setConsumerStatus(CONSUME_PROCESSING_DONE)
+				rollBackCheckpoint := consumer.consumerProcessTask()
+				if rollBackCheckpoint != "" {
+					consumer.nextFetchCursor = rollBackCheckpoint
+					level.Info(consumer.logger).Log(
+						"msg", "Checkpoints set for users have been reset",
+						"shardId", consumer.shardId,
+						"rollBackCheckpoint", rollBackCheckpoint,
+					)
+				}
+			}()
+		}
+	}
+}
+
+func (consumer *ShardConsumerWorker) shouldFetch() bool {
+	if consumer.lastFetchGroupCount >= 1000 {
+		return true
+	}
+	duration := time.Since(consumer.lastFetchTime)
+	if consumer.lastFetchGroupCount < 100 {
+		return duration > 500*time.Millisecond
+	} else if consumer.lastFetchGroupCount < 500 {
+		return duration > 200*time.Millisecond
+	} else { // 500 - 1000
+		return duration > 50*time.Millisecond
 	}
+}

+func (consumer *ShardConsumerWorker) saveCheckPointIfNeeded() {
+	if !consumer.client.option.AutoCommitDisabled {
+		return
+	}
+	if time.Since(consumer.lastCheckpointSaveTime) > time.Millisecond*time.Duration(consumer.client.option.AutoCommitIntervalInMS) {
+		consumer.consumerCheckPointTracker.SaveCheckPoint(true)
+	}
 }

 func (consumer *ShardConsumerWorker) consumerShutDown() {
 	consumer.consumerShutDownFlag = true
 	if !consumer.isShutDownComplete() {
-		if consumer.getIsFlushCheckpointDoneStatus() {
-			consumer.consume()
-		} else {
-			return
-		}
+		consumer.consume()
 	}
 }

@@ -176,39 +177,14 @@ func (consumer *ShardConsumerWorker) isShutDownComplete() bool {
 	return consumer.getConsumerStatus() == SHUTDOWN_COMPLETE
 }

-func (consumer *ShardConsumerWorker) setConsumerIsCurrentDoneToFalse() {
-	consumerTaskLock.Lock()
-	defer consumerTaskLock.Unlock()
-	consumer.isCurrentDone = false
-
-}
-
-func (consumer *ShardConsumerWorker) setConsumerIsCurrentDoneToTrue() {
-	consumerTaskLock.Lock()
-	defer consumerTaskLock.Unlock()
-	consumer.isCurrentDone = true
+func (consumer *ShardConsumerWorker) setTaskDoneFlag(done bool) {
+	consumer.taskLock.Lock()
+	defer consumer.taskLock.Unlock()
+	consumer.isCurrentDone = done
 }

-func (consumer *ShardConsumerWorker) getConsumerIsCurrentDoneStatus() bool {
-	consumerTaskLock.RLock()
-	defer consumerTaskLock.RUnlock()
+func (consumer *ShardConsumerWorker) isTaskDone() bool {
+	consumer.taskLock.RLock()
+	defer consumer.taskLock.RUnlock()
 	return consumer.isCurrentDone
 }
-
-func (consumer *ShardConsumerWorker) setIsFlushCheckpointDoneToFalse() {
-	shutDownLock.Lock()
-	defer shutDownLock.Unlock()
-	consumer.isFlushCheckpointDone = false
-}
-
-func (consumer *ShardConsumerWorker) setIsFlushCheckpointDoneToTrue() {
-	shutDownLock.Lock()
-	defer shutDownLock.Unlock()
-	consumer.isFlushCheckpointDone = true
-}
-
-func (consumer *ShardConsumerWorker) getIsFlushCheckpointDoneStatus() bool {
-	shutDownLock.RLock()
-	defer shutDownLock.RUnlock()
-	return consumer.isFlushCheckpointDone
-}
diff --git a/consumer/tasks.go b/consumer/tasks.go
index eb6429d5..cb393e92 100644
--- a/consumer/tasks.go
+++ b/consumer/tasks.go
@@ -6,17 +6,17 @@ import (
 	"runtime"
 	"time"

-	sls "github.com/aliyun/aliyun-log-go-sdk"
 	"github.com/go-kit/kit/log/level"
 )

 func (consumer *ShardConsumerWorker) consumerInitializeTask() (string, error) {
+	// read the checkpoint first
 	checkpoint, err := consumer.client.getCheckPoint(consumer.shardId)
 	if err != nil {
-		return checkpoint, err
+		return "", err
 	}
 	if checkpoint != "" && err == nil {
-		consumer.consumerCheckPointTracker.setPersistentCheckPoint(checkpoint)
+		consumer.consumerCheckPointTracker.initCheckPoint(checkpoint)
 		return checkpoint, nil
 	}

@@ -38,17 +38,38 @@ func (consumer *ShardConsumerWorker) consumerInitializeTask() (string, error) {
 		cursor, err := consumer.client.getCursor(consumer.shardId, fmt.Sprintf("%v", consumer.client.option.CursorStartTime))
 		if err != nil {
 			level.Warn(consumer.logger).Log("msg", "get specialCursor error", "shard", consumer.shardId, "error", err)
-
 		}
 		return cursor, err
 	}
-	level.Info(consumer.logger).Log("msg", "CursorPosition setting error, please reset with BEGIN_CURSOR or END_CURSOR or SPECIAL_TIMER_CURSOR")
+	level.Warn(consumer.logger).Log("msg", "CursorPosition setting error, please reset with BEGIN_CURSOR or END_CURSOR or SPECIAL_TIMER_CURSOR")
 	return "", errors.New("CursorPositionError")
 }

-func (consumer *ShardConsumerWorker) consumerFetchTask() (*sls.LogGroupList, string, error) {
-	logGroup, next_cursor, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor)
-	return logGroup, next_cursor, err
+func (consumer *ShardConsumerWorker) nextFetchTask() error {
+	// update the last fetch time, used to control the fetch frequency
+	consumer.lastFetchTime = time.Now()
+
+	logGroup, nextCursor, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor)
+	if err != nil {
+		return err
+	}
+	// record the cursors so the user can decide whether to save them based on how `process` went
+	consumer.consumerCheckPointTracker.setCurrentCheckPoint(consumer.nextFetchCursor)
+	consumer.lastFetchLogGroupList = logGroup
+	consumer.nextFetchCursor = nextCursor
+	consumer.lastFetchGroupCount = GetLogGroupCount(consumer.lastFetchLogGroupList)
+	consumer.consumerCheckPointTracker.setNextCheckPoint(consumer.nextFetchCursor)
+	level.Debug(consumer.logger).Log(
+		"shardId", consumer.shardId,
+		"fetch log count", GetLogCount(consumer.lastFetchLogGroupList),
+	)
+	if consumer.lastFetchGroupCount == 0 {
+		consumer.lastFetchLogGroupList = nil
+		// no new data may be pulled, so no process call will trigger checkpoint saving
+		consumer.saveCheckPointIfNeeded()
+	}
+
+	return nil
 }

@@ -61,15 +82,15 @@ func (consumer *ShardConsumerWorker) consumerProcessTask() string {
 	defer func() {
 		if r := recover(); r != nil {
 			stackBuf := make([]byte, 1<<16)
 			runtime.Stack(stackBuf, false)
 			level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r, "stack", string(stackBuf))
 			for {
 				if consumer.consumerRetryProcessTask() {
 					break
-				} else {
-					time.Sleep(time.Second * 2)
 				}
+				time.Sleep(time.Second * 2)
 			}
 		}
 	}()
 	if consumer.lastFetchLogGroupList != nil {
-		consumer.rollBackCheckpoint = consumer.process(consumer.shardId, consumer.lastFetchLogGroupList)
-		consumer.consumerCheckPointTracker.flushCheck()
+		consumer.rollBackCheckpoint = consumer.process(consumer.shardId, consumer.lastFetchLogGroupList, consumer.consumerCheckPointTracker)
+		consumer.saveCheckPointIfNeeded()
+		consumer.lastFetchLogGroupList = nil
 	}
 	return consumer.rollBackCheckpoint
 }

@@ -83,8 +104,7 @@ func (consumer *ShardConsumerWorker) consumerRetryProcessTask() bool {
 			level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r, "stack", string(stackBuf))
 		}
 	}()
-	consumer.rollBackCheckpoint = consumer.process(consumer.shardId, consumer.lastFetchLogGroupList)
-	consumer.consumerCheckPointTracker.flushCheck()
+	consumer.rollBackCheckpoint = consumer.process(consumer.shardId, consumer.lastFetchLogGroupList, consumer.consumerCheckPointTracker)
+	consumer.saveCheckPointIfNeeded()
 	return true
-
 }
diff --git a/consumer/util.go b/consumer/util.go
index 17669cb9..cff93a8c 100644
--- a/consumer/util.go
+++ b/consumer/util.go
@@ -1,9 +1,10 @@
 package consumerLibrary

 import (
-	"github.com/aliyun/aliyun-log-go-sdk"
 	"reflect"
 	"time"
+
+	sls "github.com/aliyun/aliyun-log-go-sdk"
 )

 // List removal of duplicate elements
diff --git a/consumer/worker.go b/consumer/worker.go
index 5cfa59f3..6df215d1 100644
--- a/consumer/worker.go
+++ b/consumer/worker.go
@@ -13,16 +13,39 @@ import (
 )

 type ConsumerWorker struct {
-	consumerHeatBeat   *ConsumerHeatBeat
+	consumerHeatBeat   *ConsumerHeartBeat
 	client             *ConsumerClient
 	workerShutDownFlag *atomic.Bool
 	shardConsumer      sync.Map // map[int]*ShardConsumerWorker
-	do                 func(shard int, logGroup *sls.LogGroupList) string
+	do                 func(shard int, logGroup *sls.LogGroupList, checkpointTracker CheckPointTracer) string
 	waitGroup          sync.WaitGroup
 	Logger             log.Logger
 }

+// Deprecated: the old behavior is to save the checkpoint to memory automatically after process returns, and commit it at a fixed interval.
+// We highly recommend using InitConsumerWorkerWithCheckpointTracker instead.
 func InitConsumerWorker(option LogHubConfig, do func(int, *sls.LogGroupList) string) *ConsumerWorker {
+	if option.AutoCommitDisabled {
+		panic("auto commit is disabled, the sdk will not save any checkpoint; " +
+			"please use InitConsumerWorkerWithCheckpointTracker or set AutoCommitDisabled to false")
+	}
+	return InitConsumerWorkerWithCheckpointTracker(
+		option,
+		func(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracer) string {
+			cursor := do(shardId, logGroupList)
+			// keep the original logic:
+			// if the returned cursor is not empty, we don't save
+			if cursor == "" {
+				checkpointTracker.SaveCheckPoint(false)
+			}
+			return cursor
+		},
+	)
+}
+
+// InitConsumerWorkerWithCheckpointTracker
+// please note that you need to save the checkpoint only after process has succeeded
+func InitConsumerWorkerWithCheckpointTracker(option LogHubConfig, do func(int, *sls.LogGroupList, CheckPointTracer) string) *ConsumerWorker {
 	logger := logConfig(option)
 	consumerClient := initConsumerClient(option, logger)
 	consumerHeatBeat := initConsumerHeatBeat(consumerClient, logger)
@@ -69,7 +92,7 @@ func (consumerWorker *ConsumerWorker) run() {
 			break
 		}
 		shardConsumer := consumerWorker.getShardConsumer(shard)
-		if shardConsumer.getConsumerIsCurrentDoneStatus() {
+		if shardConsumer.isTaskDone() {
 			shardConsumer.consume()
 		} else {
 			continue
 		}
@@ -111,7 +134,7 @@ func (consumerWorker *ConsumerWorker) getShardConsumer(shardId int) *ShardConsum
 	if ok {
 		return consumer.(*ShardConsumerWorker)
 	}
-	consumerIns := initShardConsumerWorker(shardId, consumerWorker.client, consumerWorker.do, consumerWorker.Logger)
+	consumerIns := initShardConsumerWorker(shardId, consumerWorker.client, consumerWorker.consumerHeatBeat, consumerWorker.do, consumerWorker.Logger)
 	consumerWorker.shardConsumer.Store(shardId, consumerIns)
 	return consumerIns
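Migrating a callback from the deprecated entry point to the tracker-based one looks roughly like this (a sketch; `handle` is a stand-in for your own logic, and the tracker parameter uses the CheckPointTracker naming from the README):

```
// before, deprecated: the SDK saves the checkpoint for you after process returns
oldWorker := consumerLibrary.InitConsumerWorker(option, func(shard int, lgList *sls.LogGroupList) string {
	handle(lgList)
	return ""
})

// after: saving is explicit; false batches the commit, true writes through immediately
newWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option,
	func(shard int, lgList *sls.LogGroupList, tracker consumerLibrary.CheckPointTracker) string {
		handle(lgList)
		tracker.SaveCheckPoint(false)
		return ""
	})
```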
From 40c3949bb3372a7e2fcaf89db4929cf7a642592b Mon Sep 17 00:00:00 2001
From: FFish
Date: Thu, 11 May 2023 12:13:33 +0000
Subject: [PATCH 2/9] fix: little bug and update docs/example
---
 consumer/README.md                        |  5 +--
 consumer/shard_worker.go                  |  8 ++---
 example/consumer/copy_data/copy_data.go   |  8 +++--
 example/consumer/demo/simple_demo.go      |  9 ++---
 example/consumer/reset_checkpoint_demo.go | 40 +++++++++++++----------
 5 files changed, 39 insertions(+), 31 deletions(-)

diff --git a/consumer/README.md b/consumer/README.md
index f06addd0..3c80636b 100644
--- a/consumer/README.md
+++ b/consumer/README.md
@@ -28,6 +28,7 @@ git clone git@github.com:aliyun/aliyun-log-go-sdk.git
 ```

 ## How it works and quick start
+
 Tutorial: [Aliyun LOG Go Consumer Library quick start and internals](https://developer.aliyun.com/article/693820)

 ## Usage
@@ -102,8 +103,8 @@ consumerWorker.Start()
 4.**Shut the consumer down**

 ```
-ch := make(chan os.Signal) // receive os signals on a channel
-signal.Notify(ch, os.Kill, os.Interrupt)
+ch := make(chan os.Signal, 1) // receive os signals on a channel
+signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT)
 consumerWorker.Start()
 if _, ok := <-ch; ok { // once an os stop signal arrives, e.g. the signal triggered by ctrl+c, call the consumer's shutdown method to exit.
 	consumerWorker.StopAndWait()
diff --git a/consumer/shard_worker.go b/consumer/shard_worker.go
index 17c162dc..4fd89369 100644
--- a/consumer/shard_worker.go
+++ b/consumer/shard_worker.go
@@ -28,7 +28,6 @@ type ShardConsumerWorker struct {

 	statusLock sync.RWMutex
 	taskLock   sync.RWMutex
-	shutDownLock sync.RWMutex
 }

 func (consumer *ShardConsumerWorker) setConsumerStatus(status string) {
@@ -83,7 +82,7 @@ func (consumer *ShardConsumerWorker) consume() {
 			var err error
 			retryTimes := 3
 			for i := 0; i < retryTimes; i++ {
-				err = consumer.consumerCheckPointTracker.SaveCheckPoint(true)
+				err = consumer.consumerCheckPointTracker.flushCheckPoint()
 				if err == nil {
 					break
 				}
@@ -158,11 +157,12 @@ func (consumer *ShardConsumerWorker) shouldFetch() bool {
 }

 func (consumer *ShardConsumerWorker) saveCheckPointIfNeeded() {
-	if !consumer.client.option.AutoCommitDisabled {
+	if consumer.client.option.AutoCommitDisabled {
 		return
 	}
 	if time.Since(consumer.lastCheckpointSaveTime) > time.Millisecond*time.Duration(consumer.client.option.AutoCommitIntervalInMS) {
-		consumer.consumerCheckPointTracker.SaveCheckPoint(true)
+		consumer.consumerCheckPointTracker.flushCheckPoint()
+		consumer.lastCheckpointSaveTime = time.Now()
 	}
 }

diff --git a/example/consumer/copy_data/copy_data.go b/example/consumer/copy_data/copy_data.go
index 3b163007..87120fe1 100644
--- a/example/consumer/copy_data/copy_data.go
+++ b/example/consumer/copy_data/copy_data.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"os"
 	"os/signal"
+	"syscall"

 	sls "github.com/aliyun/aliyun-log-go-sdk"
 	consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
@@ -46,9 +47,9 @@ func main() {
 	if err != nil {
 		fmt.Println(err)
 	}
-	consumerWorker := consumerLibrary.InitConsumerWorker(option, process)
+	consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)
 	ch := make(chan os.Signal)
-	signal.Notify(ch, os.Kill, os.Interrupt)
+	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT)
 	consumerWorker.Start()
 	if _, ok := <-ch; ok {
 		level.Info(consumerWorker.Logger).Log("msg", "get stop signal, start to stop consumer worker", "consumer worker name", option.ConsumerName)
@@ -56,7 +57,7 @@ func main() {
 	}
 }

-func process(shardId int, logGroupList *sls.LogGroupList) string {
+func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracer) string {
 	for _, logGroup := range logGroupList.LogGroups {
 		err := client.PutLogs(option.Project, "copy-logstore", logGroup)
 		if err != nil {
@@ -64,5 +65,6 @@ func process(shardId int, logGroupList *sls.LogGroupList) string {
 	}
 	fmt.Printf("shardId %v processing works success\n", shardId)
+	checkpointTracker.SaveCheckPoint(false)
 	return ""
 }
diff --git a/example/consumer/demo/simple_demo.go b/example/consumer/demo/simple_demo.go
index 04b318b1..3bbef415 100644
--- a/example/consumer/demo/simple_demo.go
+++ b/example/consumer/demo/simple_demo.go
@@ -28,9 +28,9 @@ func main() {
 		CursorPosition: consumerLibrary.END_CURSOR,
 	}

-	consumerWorker := consumerLibrary.InitConsumerWorker(option, process)
-	ch := make(chan os.Signal)
-	signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1, syscall.SIGUSR2)
+	consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)
+	ch := make(chan os.Signal, 1)
+	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
 	consumerWorker.Start()
 	if _, ok := <-ch; ok {
 		level.Info(consumerWorker.Logger).Log("msg", "get stop signal, start to stop consumer worker", "consumer worker name", option.ConsumerName)
@@ -40,7 +40,8 @@ func main() {

 // Fill in your consumption logic here. Be careful not to change the function's parameters or return value,
 // otherwise errors will occur.
-func process(shardId int, logGroupList *sls.LogGroupList) string {
+func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracer) string {
 	fmt.Println(shardId, logGroupList)
+	checkpointTracker.SaveCheckPoint(false)
 	return ""
 }
diff --git a/example/consumer/reset_checkpoint_demo.go b/example/consumer/reset_checkpoint_demo.go
index 56aac993..590f9b1b 100644
--- a/example/consumer/reset_checkpoint_demo.go
+++ b/example/consumer/reset_checkpoint_demo.go
@@ -2,13 +2,14 @@ package main

 import (
 	"fmt"
-	"github.com/aliyun/aliyun-log-go-sdk"
-	"github.com/aliyun/aliyun-log-go-sdk/consumer"
-	"github.com/go-kit/kit/log/level"
 	"os"
 	"os/signal"
 	"syscall"
 	"time"
+
+	sls "github.com/aliyun/aliyun-log-go-sdk"
+	consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
+	"github.com/go-kit/kit/log/level"
 )

 // README :
 // This demo resets the consumer group's checkpoints before consuming: for each of the group's shards
 // the consumption position is reset to the cursor at the current time, so consumption starts from now and stock data is no longer consumed.
 // Note: to use this demo, the consumer group must already exist (created beforehand).
-
 func main() {
 	option := consumerLibrary.LogHubConfig{
 		Endpoint:          "",
 		AccessKeyID:       "",
 		AccessKeySecret:   "",
 		Project:           "",
 		Logstore:          "",
 		ConsumerGroupName: "",
 		ConsumerName:      "",
 		CursorPosition:    consumerLibrary.BEGIN_CURSOR,
 	}
 	err := UpdateConsumerGroupCheckPoint(option)
-	if err != nil{
+	if err != nil {
 		fmt.Println(err)
 		return
 	}
-	consumerWorker := consumerLibrary.InitConsumerWorker(option, process)
-	ch := make(chan os.Signal)
+	consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)
+	ch := make(chan os.Signal, 1)
 	signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1, syscall.SIGUSR2)
 	consumerWorker.Start()
 	if _, ok := <-ch; ok {
@@ -50,34 +50,38 @@ func main() {

 // Fill in your consumption logic here. Be careful not to change the function's parameters or return value,
 // otherwise errors will occur.
-func process(shardId int, logGroupList *sls.LogGroupList) string {
-	// fill in your own consumption logic here
+func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracer) string {
+	// put your own processing logic and checkpoint-saving logic here
 	fmt.Println(logGroupList)
 	return ""
 }

-func updateCheckpoint(config consumerLibrary.LogHubConfig,client sls.Client,shardId int) error {
+func updateCheckpoint(config consumerLibrary.LogHubConfig, client *sls.Client, shardId int) error {
 	from := fmt.Sprintf("%d", time.Now().Unix())
-	cursor, err := client.GetCursor(config.Project,config.Logstore, shardId, from)
+	cursor, err := client.GetCursor(config.Project, config.Logstore, shardId, from)
 	if err != nil {
 		fmt.Println(err)
 	}
-	return client.UpdateCheckpoint(config.Project,config.Logstore,config.ConsumerGroupName,"",shardId, cursor,true)
+	return client.UpdateCheckpoint(config.Project, config.Logstore, config.ConsumerGroupName, "", shardId, cursor, true)
 }

 func UpdateConsumerGroupCheckPoint(config consumerLibrary.LogHubConfig) error {
-	client := sls.Client{Endpoint:config.Endpoint, AccessKeyID:config.AccessKeyID, AccessKeySecret:config.AccessKeySecret}
-	shards, err := client.ListShards(config.Project,config.Logstore)
+	client := &sls.Client{
+		Endpoint:        config.Endpoint,
+		AccessKeyID:     config.AccessKeyID,
+		AccessKeySecret: config.AccessKeySecret,
+	}
+	shards, err := client.ListShards(config.Project, config.Logstore)
 	if err != nil {
 		return err
-	}else {
-		for _,v := range shards {
+	} else {
+		for _, v := range shards {
 			err = updateCheckpoint(config, client, v.ShardID)
-			if err != nil{
+			if err != nil {
 				return err
 			}
 		}
 	}
 	return nil
-}
\ No newline at end of file
+}

From 06b1554867ca6179640df7b799669d4e4d6f262a Mon Sep 17 00:00:00 2001
From: FFish
Date: Fri, 19 May 2023 08:33:21 +0000
Subject: [PATCH 3/9] feat: refine consume() func
---
 consumer/config.go       |  21 +++---
 consumer/shard_worker.go | 149 +++++++++++++++++++--------------------
 consumer/worker.go       |   8 +--
 3 files changed, 84 insertions(+), 94 deletions(-)

diff --git a/consumer/config.go b/consumer/config.go
index 154f74e5..66284961 100644
--- a/consumer/config.go
+++ b/consumer/config.go
@@ -64,14 +64,15 @@ type LogHubConfig struct {
 }

 const (
-	BEGIN_CURSOR            = "BEGIN_CURSOR"
-	END_CURSOR              = "END_CURSOR"
-	SPECIAL_TIMER_CURSOR    = "SPECIAL_TIMER_CURSOR"
-	INITIALIZING            = "INITIALIZING"
-	INITIALIZING_DONE       = "INITIALIZING_DONE"
-	PULL_PROCESSING         = "PULL_PROCESSING"
-	PULL_PROCESSING_DONE    = "PULL_PROCESSING_DONE"
-	CONSUME_PROCESSING      = "CONSUME_PROCESSING"
-	CONSUME_PROCESSING_DONE = "CONSUME_PROCESSING_DONE"
-	SHUTDOWN_COMPLETE       = "SHUTDOWN_COMPLETE"
+	BEGIN_CURSOR         = "BEGIN_CURSOR"
+	END_CURSOR           = "END_CURSOR"
+	SPECIAL_TIMER_CURSOR = "SPECIAL_TIMER_CURSOR"
+)
+
+const (
+	INITIALIZING      = "INITIALIZING"
+	PULLING           = "PULLING"
+	PROCESSING        = "PROCESSING"
+	SHUTTING_DOWN     = "SHUTTING_DOWN"
+	SHUTDOWN_COMPLETE = "SHUTDOWN_COMPLETE"
 )
diff --git a/consumer/shard_worker.go b/consumer/shard_worker.go
index 4fd89369..5e4252c4 100644
--- a/consumer/shard_worker.go
+++ b/consumer/shard_worker.go
@@ -12,7 +12,7 @@ import (
 type ShardConsumerWorker struct {
 	client                    *ConsumerClient
 	consumerCheckPointTracker *DefaultCheckPointTracker
-	consumerShutDownFlag      bool
+	shutdownFlag              bool
 	lastFetchLogGroupList     *sls.LogGroupList
 	nextFetchCursor           string
 	lastFetchGroupCount       int
 	lastFetchTime             time.Time
 	consumerStatus            string
 	process                   func(shard int, logGroup *sls.LogGroupList, checkpointTracker CheckPointTracer) string
 	shardId                   int
+	// TODO: refine to channel
 	isCurrentDone bool
 	logger        log.Logger
 	// time when the checkpoint was last saved
 	lastCheckpointSaveTime time.Time
 	rollBackCheckpoint     string

-	statusLock sync.RWMutex
-	taskLock   sync.RWMutex
+	taskLock   sync.RWMutex
+	statusLock sync.RWMutex
 }

 func (consumer *ShardConsumerWorker) setConsumerStatus(status string) {
@@ -45,7 +45,7 @@ func (consumer *ShardConsumerWorker) getConsumerStatus() string {
 func initShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consumerHeartBeat *ConsumerHeartBeat, do func(shard int, logGroup *sls.LogGroupList, checkpointTracker CheckPointTracer) string, logger log.Logger) *ShardConsumerWorker {
 	shardConsumeWorker := &ShardConsumerWorker{
-		consumerShutDownFlag:      false,
+		shutdownFlag:              false,
 		process:                   do,
 		consumerCheckPointTracker: initConsumerCheckpointTracker(shardId, consumerClient, consumerHeartBeat, logger),
 		client:                    consumerClient,
 		consumerStatus:            INITIALIZING,
 		shardId:                   shardId,
 		lastFetchTime:             time.Now(),
 		isCurrentDone:             true,
 		logger:                    logger,
 		rollBackCheckpoint:        "",
 	}
 	return shardConsumeWorker
 }

 func (consumer *ShardConsumerWorker) consume() {
-	if consumer.consumerShutDownFlag {
-		go func() {
-			defer consumer.setConsumerStatus(SHUTDOWN_COMPLETE)
-			// if it is processing, wait for the status to switch; otherwise save the checkpoint straight away
-			if consumer.getConsumerStatus() == CONSUME_PROCESSING {
-				level.Info(consumer.logger).Log("msg", "Consumption is in progress, sleep to wait for consumption to be completed")
-				shutdownWaitTimes := 10
-				// OPTIMIZE
-				// for now just sleep; this does not wait until the very end
-				for i := 0; i < shutdownWaitTimes; i++ {
-					time.Sleep(time.Millisecond * 10)
-					if consumer.getConsumerStatus() != CONSUME_PROCESSING {
-						break
-					}
-					if i == shutdownWaitTimes-1 {
-						level.Warn(consumer.logger).Log("msg", "waited many times, but the last process may not be over yet", "retryTimes", shutdownWaitTimes)
-					}
-				}
-			}
-
-			var err error
-			retryTimes := 3
-			for i := 0; i < retryTimes; i++ {
-				err = consumer.consumerCheckPointTracker.flushCheckPoint()
-				if err == nil {
-					break
-				}
-			}
-			if err == nil {
-				level.Info(consumer.logger).Log("msg", "shard worker is shut down complete", "shardWorkerId", consumer.shardId)
-			} else {
-				level.Warn(consumer.logger).Log("msg", "failed to save checkpoint after retries", "retryTimes", retryTimes, "err", err)
-			}
-		}()
-	} else {
-		consumer.setTaskDoneFlag(false)
-		if consumer.getConsumerStatus() == INITIALIZING {
-			go func() {
-				defer consumer.setTaskDoneFlag(true)
-
-				cursor, err := consumer.consumerInitializeTask()
-				if err != nil {
-					consumer.setConsumerStatus(INITIALIZING)
-				} else {
-					consumer.nextFetchCursor = cursor
-					consumer.setConsumerStatus(INITIALIZING_DONE)
-				}
-			}()
-		} else if consumer.getConsumerStatus() == INITIALIZING_DONE || consumer.getConsumerStatus() == CONSUME_PROCESSING_DONE {
-			consumer.setConsumerStatus(PULL_PROCESSING)
-			go func() {
-				defer consumer.setTaskDoneFlag(true)
-				if !consumer.shouldFetch() {
-					level.Debug(consumer.logger).Log("msg", "Pull Log Current Limitation and Re-Pull Log")
-					consumer.setConsumerStatus(INITIALIZING_DONE)
-				}
-
-				if err := consumer.nextFetchTask(); err != nil {
-					consumer.setConsumerStatus(INITIALIZING_DONE)
-				} else {
-					consumer.setConsumerStatus(PULL_PROCESSING_DONE)
-				}
-			}()
-		} else if consumer.getConsumerStatus() == PULL_PROCESSING_DONE {
-			consumer.setConsumerStatus(CONSUME_PROCESSING)
-			go func() {
-				defer consumer.setTaskDoneFlag(true)
-				defer consumer.setConsumerStatus(CONSUME_PROCESSING_DONE)
-				rollBackCheckpoint := consumer.consumerProcessTask()
-				if rollBackCheckpoint != "" {
-					consumer.nextFetchCursor = rollBackCheckpoint
-					level.Info(consumer.logger).Log(
-						"msg", "Checkpoints set for users have been reset",
-						"shardId", consumer.shardId,
-						"rollBackCheckpoint", rollBackCheckpoint,
-					)
-				}
-			}()
-		}
-	}
+	if !consumer.isTaskDone() {
+		return
+	}
+
+	// start a new task
+	// initial task / fetch data task / processing task / shutdown task
+	consumer.setTaskDoneFlag(false)
+	switch consumer.getConsumerStatus() {
+	case INITIALIZING:
+		go func() {
+			cursor, err := consumer.consumerInitializeTask()
+			if err == nil {
+				consumer.nextFetchCursor = cursor
+			}
+			consumer.updateStatus(err == nil)
+		}()
+	case PULLING:
+		go func() {
+			if !consumer.shouldFetch() {
+				level.Debug(consumer.logger).Log("msg", "Pull Log Current Limitation and Re-Pull Log")
+				consumer.updateStatus(false)
+			}
+			err := consumer.nextFetchTask()
+			consumer.updateStatus(err == nil && consumer.lastFetchGroupCount > 0)
+		}()
+	case PROCESSING:
+		go func() {
+			rollBackCheckpoint := consumer.consumerProcessTask()
+			if rollBackCheckpoint != "" {
+				consumer.nextFetchCursor = rollBackCheckpoint
+				level.Info(consumer.logger).Log(
+					"msg", "Checkpoints set for users have been reset",
+					"shardId", consumer.shardId,
+					"rollBackCheckpoint", rollBackCheckpoint,
+				)
+			}
+			consumer.updateStatus(true)
+		}()
+	case SHUTTING_DOWN:
+		go func() {
+			err := consumer.consumerCheckPointTracker.flushCheckPoint()
+			if err == nil {
+				level.Info(consumer.logger).Log("msg", "shard worker status shutdown_complete", "shardWorkerId", consumer.shardId)
+			} else {
+				level.Warn(consumer.logger).Log("msg", "failed to flush checkpoint when shutdown", "err", err)
+			}
+
+			consumer.updateStatus(err == nil)
+		}()
+	default:
+		consumer.setTaskDoneFlag(true)
+	}
+}
+
+func (consumer *ShardConsumerWorker) updateStatus(success bool) {
+	status := consumer.getConsumerStatus()
+	if (status == SHUTTING_DOWN) {
+		if success {
+			consumer.setConsumerStatus(SHUTDOWN_COMPLETE)
+		}
+	} else if consumer.shutdownFlag {
+		consumer.setConsumerStatus(SHUTTING_DOWN)
+	} else if success {
+		switch status {
+		case INITIALIZING, PULLING:
+			consumer.setConsumerStatus(PROCESSING)
+		case PROCESSING:
+			consumer.setConsumerStatus(PULLING)
+		}
+	}
+
+	consumer.setTaskDoneFlag(true)
 }

 func (consumer *ShardConsumerWorker) shouldFetch() bool {
@@ -160,7 +160,7 @@ func (consumer *ShardConsumerWorker) saveCheckPointIfNeeded() {
 }

 func (consumer *ShardConsumerWorker) consumerShutDown() {
-	consumer.consumerShutDownFlag = true
+	consumer.shutdownFlag = true
 	if !consumer.isShutDownComplete() {
 		consumer.consume()
 	}
diff --git a/consumer/worker.go b/consumer/worker.go
index 6df215d1..9b22a0e5 100644
--- a/consumer/worker.go
+++ b/consumer/worker.go
@@ -92,11 +92,7 @@ func (consumerWorker *ConsumerWorker) run() {
 			break
 		}
 		shardConsumer := consumerWorker.getShardConsumer(shard)
-		if shardConsumer.isTaskDone() {
-			shardConsumer.consume()
-		} else {
-			continue
-		}
+		shardConsumer.consume()
 	}
 	consumerWorker.cleanShardConsumer(heldShards)
 	TimeToSleepInMillsecond(consumerWorker.client.option.DataFetchIntervalInMs, lastFetchTime, consumerWorker.workerShutDownFlag.Load())
@@ -116,7 +112,7 @@ func (consumerWorker *ConsumerWorker) shutDownAndWait() {
 		consumer := value.(*ShardConsumerWorker)
 		if !consumer.isShutDownComplete() {
 			consumer.consumerShutDown()
-		} else if consumer.isShutDownComplete() {
+		} else {
 			consumerWorker.shardConsumer.Delete(key)
 		}
 		return true
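The switch above, together with updateStatus, forms a small state machine; our reading of the transitions (a sketch, not an official diagram):

```
// INITIALIZING --success--> PROCESSING --success--> PULLING
//                               ^                      |
//                               +---- fetched > 0 -----+
//
// a failed task keeps the current status, so the step is retried on
// the next consume() call; once shutdownFlag is set, the next finished
// task moves the worker to SHUTTING_DOWN, and a successful checkpoint
// flush ends in SHUTDOWN_COMPLETE
```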
From a0c4903ab31eb54419296e035a2df30edae02ff4 Mon Sep 17 00:00:00 2001
From: FFish
Date: Sun, 21 May 2023 13:04:00 +0000
Subject: [PATCH 4/9] fix: missing a return and rename wrong CheckpointTracker
 naming
---
 consumer/checkpoint_tracker.go            | 2 +-
 consumer/shard_worker.go                  | 7 ++++---
 consumer/worker.go                        | 6 +++---
 example/consumer/copy_data/copy_data.go   | 2 +-
 example/consumer/demo/simple_demo.go      | 2 +-
 example/consumer/reset_checkpoint_demo.go | 6 +++---
 6 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/consumer/checkpoint_tracker.go b/consumer/checkpoint_tracker.go
index e1c15ba3..b50f0a65 100644
--- a/consumer/checkpoint_tracker.go
+++ b/consumer/checkpoint_tracker.go
@@ -8,7 +8,7 @@ import (
 	"github.com/go-kit/kit/log/level"
 )

-type CheckPointTracer interface {
+type CheckPointTracker interface {
 	// GetCheckPoint returns the latest saved checkpoint
 	GetCheckPoint() string
diff --git a/consumer/shard_worker.go b/consumer/shard_worker.go
index 5e4252c4..cc1e6439 100644
--- a/consumer/shard_worker.go
+++ b/consumer/shard_worker.go
@@ -18,7 +18,7 @@ type ShardConsumerWorker struct {
 	lastFetchGroupCount       int
 	lastFetchTime             time.Time
 	consumerStatus            string
-	process                   func(shard int, logGroup *sls.LogGroupList, checkpointTracker CheckPointTracer) string
+	process                   func(shard int, logGroup *sls.LogGroupList, checkpointTracker CheckPointTracker) string
 	shardId                   int
 	// TODO: refine to channel
 	isCurrentDone bool
@@ -43,7 +43,7 @@ func (consumer *ShardConsumerWorker) getConsumerStatus() string {
 	return consumer.consumerStatus
 }

-func initShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consumerHeartBeat *ConsumerHeartBeat, do func(shard int, logGroup *sls.LogGroupList, checkpointTracker CheckPointTracer) string, logger log.Logger) *ShardConsumerWorker {
+func initShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consumerHeartBeat *ConsumerHeartBeat, do func(shard int, logGroup *sls.LogGroupList, checkpointTracker CheckPointTracker) string, logger log.Logger) *ShardConsumerWorker {
 	shardConsumeWorker := &ShardConsumerWorker{
 		shutdownFlag: false,
 		process:      do,
@@ -81,6 +81,7 @@ func (consumer *ShardConsumerWorker) consume() {
 			if !consumer.shouldFetch() {
 				level.Debug(consumer.logger).Log("msg", "Pull Log Current Limitation and Re-Pull Log")
 				consumer.updateStatus(false)
+				return
 			}
 			err := consumer.nextFetchTask()
 			consumer.updateStatus(err == nil && consumer.lastFetchGroupCount > 0)
@@ -116,7 +117,7 @@ func (consumer *ShardConsumerWorker) consume() {

 func (consumer *ShardConsumerWorker) updateStatus(success bool) {
 	status := consumer.getConsumerStatus()
-	if (status == SHUTTING_DOWN) {
+	if status == SHUTTING_DOWN {
 		if success {
 			consumer.setConsumerStatus(SHUTDOWN_COMPLETE)
 		}
diff --git a/consumer/worker.go b/consumer/worker.go
index 9b22a0e5..03922e84 100644
--- a/consumer/worker.go
+++ b/consumer/worker.go
@@ -17,7 +17,7 @@ type ConsumerWorker struct {
 	client             *ConsumerClient
 	workerShutDownFlag *atomic.Bool
 	shardConsumer      sync.Map // map[int]*ShardConsumerWorker
-	do                 func(shard int, logGroup *sls.LogGroupList, checkpointTracker CheckPointTracer) string
+	do                 func(shard int, logGroup *sls.LogGroupList, checkpointTracker CheckPointTracker) string
 	waitGroup          sync.WaitGroup
 	Logger             log.Logger
 }
@@ -31,7 +31,7 @@ func InitConsumerWorker(option LogHubConfig, do func(int, *sls.LogGroupList) str
 	return InitConsumerWorkerWithCheckpointTracker(
 		option,
-		func(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracer) string {
+		func(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) string {
 			cursor := do(shardId, logGroupList)
@@ -45,7 +45,7 @@ func InitConsumerWorker(option LogHubConfig, do func(int, *sls.LogGroupList) str

 // InitConsumerWorkerWithCheckpointTracker
 // please note that you need to save the checkpoint only after process has succeeded
-func InitConsumerWorkerWithCheckpointTracker(option LogHubConfig, do func(int, *sls.LogGroupList, CheckPointTracer) string) *ConsumerWorker {
+func InitConsumerWorkerWithCheckpointTracker(option LogHubConfig, do func(int, *sls.LogGroupList, CheckPointTracker) string) *ConsumerWorker {
 	logger := logConfig(option)
 	consumerClient := initConsumerClient(option, logger)
 	consumerHeatBeat := initConsumerHeatBeat(consumerClient, logger)
diff --git a/example/consumer/copy_data/copy_data.go b/example/consumer/copy_data/copy_data.go
index 87120fe1..a2fd9f9f 100644
--- a/example/consumer/copy_data/copy_data.go
+++ b/example/consumer/copy_data/copy_data.go
@@ -57,7 +57,7 @@ func main() {
 	}
 }

-func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracer) string {
+func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) string {
 	for _, logGroup := range logGroupList.LogGroups {
 		err := client.PutLogs(option.Project, "copy-logstore", logGroup)
 		if err != nil {
diff --git a/example/consumer/demo/simple_demo.go b/example/consumer/demo/simple_demo.go
index 3bbef415..095e0506 100644
--- a/example/consumer/demo/simple_demo.go
+++ b/example/consumer/demo/simple_demo.go
@@ -40,7 +40,7 @@ func main() {

 // Fill in your consumption logic here. Be careful not to change the function's parameters or return value,
 // otherwise errors will occur.
-func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracer) string {
+func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) string {
 	fmt.Println(shardId, logGroupList)
 	checkpointTracker.SaveCheckPoint(false)
 	return ""
 }
-func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracer) string { +func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) string { // 这里填入自己的消费处理逻辑 和 cpt保存逻辑 fmt.Println(logGroupList) return "" @@ -68,8 +68,8 @@ func updateCheckpoint(config consumerLibrary.LogHubConfig, client *sls.Client, s func UpdateConsumerGroupCheckPoint(config consumerLibrary.LogHubConfig) error { client := &sls.Client{ - Endpoint: config.Endpoint, - AccessKeyID: config.AccessKeyID, + Endpoint: config.Endpoint, + AccessKeyID: config.AccessKeyID, AccessKeySecret: config.AccessKeySecret, } shards, err := client.ListShards(config.Project, config.Logstore) From aad7ba8be5c154888a15d6527c3eeb3c9c8909e2 Mon Sep 17 00:00:00 2001 From: FFish Date: Tue, 23 May 2023 11:50:19 +0000 Subject: [PATCH 5/9] feat: add update checkpoint sleep --- consumer/checkpoint_tracker.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/consumer/checkpoint_tracker.go b/consumer/checkpoint_tracker.go index b50f0a65..eca0f5ab 100644 --- a/consumer/checkpoint_tracker.go +++ b/consumer/checkpoint_tracker.go @@ -2,6 +2,7 @@ package consumerLibrary import ( "strings" + "time" sls "github.com/aliyun/aliyun-log-go-sdk" "github.com/go-kit/kit/log" @@ -97,6 +98,7 @@ func (tracker *DefaultCheckPointTracker) flushCheckPoint() error { ) return err } + time.Sleep(100 * time.Millisecond) } tracker.savedCheckPoint = tracker.pendingCheckPoint From e72ae84f80728ef4a767979c74ca11831fc69208 Mon Sep 17 00:00:00 2001 From: FFish Date: Thu, 25 May 2023 10:28:13 +0000 Subject: [PATCH 6/9] feat: add Processor interface, support processor to do some clean job after consumer shutdown --- consumer/processor.go | 19 +++++++++++++++++++ consumer/shard_worker.go | 15 +++++++++++---- consumer/tasks.go | 4 ++-- consumer/worker.go | 20 +++++++++++++------- 4 files changed, 45 insertions(+), 13 deletions(-) create mode 100644 consumer/processor.go diff --git a/consumer/processor.go b/consumer/processor.go new file mode 100644 index 00000000..4cc430bc --- /dev/null +++ b/consumer/processor.go @@ -0,0 +1,19 @@ +package consumerLibrary + +import sls "github.com/aliyun/aliyun-log-go-sdk" + +type Processor interface { + Process(int, *sls.LogGroupList, CheckPointTracker) string + Shutdown(CheckPointTracker) error +} + +type ProcessFunc func(int, *sls.LogGroupList, CheckPointTracker) string + +func (processor ProcessFunc) Process(shard int, lgList *sls.LogGroupList, checkpointTracker CheckPointTracker) string { + return processor(shard, lgList, checkpointTracker) +} + +func (processor ProcessFunc) Shutdown(checkpointTracker CheckPointTracker) error { + // Do nothing + return nil +} diff --git a/consumer/shard_worker.go b/consumer/shard_worker.go index cc1e6439..8cc657bc 100644 --- a/consumer/shard_worker.go +++ b/consumer/shard_worker.go @@ -18,7 +18,7 @@ type ShardConsumerWorker struct { lastFetchGroupCount int lastFetchTime time.Time consumerStatus string - process func(shard int, logGroup *sls.LogGroupList, checkpointTracker CheckPointTracker) string + processor Processor shardId int // TODO: refine to channel isCurrentDone bool @@ -43,10 +43,10 @@ func (consumer *ShardConsumerWorker) getConsumerStatus() string { return consumer.consumerStatus } -func initShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consumerHeartBeat *ConsumerHeartBeat, do func(shard int, logGroup *sls.LogGroupList, checkpointTracker CheckPointTracker) string, logger log.Logger) 
*ShardConsumerWorker { +func initShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consumerHeartBeat *ConsumerHeartBeat, processor Processor, logger log.Logger) *ShardConsumerWorker { shardConsumeWorker := &ShardConsumerWorker{ shutdownFlag: false, - process: do, + processor: processor, consumerCheckPointTracker: initConsumerCheckpointTracker(shardId, consumerClient, consumerHeartBeat, logger), client: consumerClient, consumerStatus: INITIALIZING, @@ -101,7 +101,14 @@ func (consumer *ShardConsumerWorker) consume() { }() case SHUTTING_DOWN: go func() { - err := consumer.consumerCheckPointTracker.flushCheckPoint() + err := consumer.processor.Shutdown(consumer.consumerCheckPointTracker) + if err != nil { + level.Error(consumer.logger).Log("msg", "failed to call processor shutdown", "err", err) + consumer.updateStatus(false) + return + } + + err = consumer.consumerCheckPointTracker.flushCheckPoint() if err == nil { level.Info(consumer.logger).Log("msg", "shard worker status shutdown_complete", "shardWorkerId", consumer.shardId) } else { diff --git a/consumer/tasks.go b/consumer/tasks.go index cb393e92..7b109a71 100644 --- a/consumer/tasks.go +++ b/consumer/tasks.go @@ -88,7 +88,7 @@ func (consumer *ShardConsumerWorker) consumerProcessTask() string { } }() if consumer.lastFetchLogGroupList != nil { - consumer.rollBackCheckpoint = consumer.process(consumer.shardId, consumer.lastFetchLogGroupList, consumer.consumerCheckPointTracker) + consumer.rollBackCheckpoint = consumer.processor.Process(consumer.shardId, consumer.lastFetchLogGroupList, consumer.consumerCheckPointTracker) consumer.saveCheckPointIfNeeded() consumer.lastFetchLogGroupList = nil } @@ -104,7 +104,7 @@ func (consumer *ShardConsumerWorker) consumerRetryProcessTask() bool { level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r, "stack", string(stackBuf)) } }() - consumer.rollBackCheckpoint = consumer.process(consumer.shardId, consumer.lastFetchLogGroupList, consumer.consumerCheckPointTracker) + consumer.rollBackCheckpoint = consumer.processor.Process(consumer.shardId, consumer.lastFetchLogGroupList, consumer.consumerCheckPointTracker) consumer.saveCheckPointIfNeeded() return true } diff --git a/consumer/worker.go b/consumer/worker.go index 03922e84..cfae3f4a 100644 --- a/consumer/worker.go +++ b/consumer/worker.go @@ -17,7 +17,7 @@ type ConsumerWorker struct { client *ConsumerClient workerShutDownFlag *atomic.Bool shardConsumer sync.Map // map[int]*ShardConsumerWorker - do func(shard int, logGroup *sls.LogGroupList, checkpointTracker CheckPointTracker) string + processor Processor waitGroup sync.WaitGroup Logger log.Logger } @@ -29,9 +29,9 @@ func InitConsumerWorker(option LogHubConfig, do func(int, *sls.LogGroupList) str panic("auto commit already disabled, sdk will not save any checkpoint, " + "please use InitConsumerWorkerWithCheckpointTracker or set AutoCommitDisabled to false") } - return InitConsumerWorkerWithCheckpointTracker( + return InitConsumerWorkerWithProcessor( option, - func(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) string { + ProcessFunc(func(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) string { cursor := do(shardId, logGroupList) // keep the original logic // if cursor is not empty, we don't save, @@ -39,13 +39,19 @@ func InitConsumerWorker(option LogHubConfig, do func(int, *sls.LogGroupList) str checkpointTracker.SaveCheckPoint(false) } return cursor - }, + }), ) } // 
InitConsumerWorkerWithCheckpointTracker
 // please note that you need to save after the process is successful,
 func InitConsumerWorkerWithCheckpointTracker(option LogHubConfig, do func(int, *sls.LogGroupList, CheckPointTracker) string) *ConsumerWorker {
+	return InitConsumerWorkerWithProcessor(option, ProcessFunc(do))
+}
+
+// InitConsumerWorkerWithProcessor
+// you need to save the checkpoint yourself, and you can do cleanup work after the consumer shuts down
+func InitConsumerWorkerWithProcessor(option LogHubConfig, processor Processor) *ConsumerWorker {
 	logger := logConfig(option)
 	consumerClient := initConsumerClient(option, logger)
 	consumerHeatBeat := initConsumerHeatBeat(consumerClient, logger)
@@ -54,8 +60,8 @@ func InitConsumerWorkerWithCheckpointTracker(option LogHubConfig, do func(int, *
 		client:             consumerClient,
 		workerShutDownFlag: atomic.NewBool(false),
 		//shardConsumer:      make(map[int]*ShardConsumerWorker),
-		do:                 do,
-		Logger:             logger,
+		processor: processor,
+		Logger:    logger,
 	}
 	if err := consumerClient.createConsumerGroup(); err != nil {
 		level.Error(consumerWorker.Logger).Log(
@@ -130,7 +136,7 @@ func (consumerWorker *ConsumerWorker) getShardConsumer(shardId int) *ShardConsum
 	if ok {
 		return consumer.(*ShardConsumerWorker)
 	}
-	consumerIns := initShardConsumerWorker(shardId, consumerWorker.client, consumerWorker.consumerHeatBeat, consumerWorker.do, consumerWorker.Logger)
+	consumerIns := initShardConsumerWorker(shardId, consumerWorker.client, consumerWorker.consumerHeatBeat, consumerWorker.processor, consumerWorker.Logger)
 	consumerWorker.shardConsumer.Store(shardId, consumerIns)
 	return consumerIns

From e64cd34f543e57c172550363db6de32a02df98ec Mon Sep 17 00:00:00 2001
From: FFish
Date: Fri, 26 May 2023 06:34:16 +0000
Subject: [PATCH 7/9] feat: add an error return to process, update the README,
 and fix status transitions

---
 consumer/README.md                        | 26 ++++++++++-----
 consumer/checkpoint_tracker.go            |  8 ++---
 consumer/processor.go                     |  6 ++--
 consumer/shard_worker.go                  | 10 +++---
 consumer/tasks.go                         | 39 +++++++----------------
 consumer/worker.go                        |  6 ++--
 example/consumer/copy_data/copy_data.go   |  6 ++--
 example/consumer/demo/simple_demo.go      |  4 +--
 example/consumer/reset_checkpoint_demo.go |  4 +--
 9 files changed, 52 insertions(+), 57 deletions(-)

diff --git a/consumer/README.md b/consumer/README.md
index 3c80636b..4a2c9e76 100644
--- a/consumer/README.md
+++ b/consumer/README.md
@@ -66,12 +66,10 @@ LogHubConfig是提供给用户的配置类,用于配置消费策略,您可
 2.**覆写消费逻辑**

 ```
-func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) string {
-	for _, logGroup := range logGroupList.LogGroups {
-		err := client.PutLogs(option.Project, "copy-logstore", logGroup)
-		if err != nil {
-			fmt.Println(err)
-		}
+func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) (string, error) {
+	err := dosomething()
+	if err != nil {
+		return "", err
 	}
 	fmt.Println("shardId %v processing works success", shardId)
 	// 标记给CheckPointTracker process已成功,保存存档点,
@@ -80,18 +78,30 @@ func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker Chec
 	// 推荐大多数场景下使用false即可
 	checkpointTracker.SaveCheckPoint(false); // 代表process成功保存存档点,但并不直接写入服务器,等待一定的interval后写入
 	// 不需要重置检查点情况下,请返回空字符串,如需要重置检查点,请返回需要重置的检查点游标。
-	// 如果需要重置检查点的情况下,可以返回checkpointTracker.GetCurrentCheckPoint, current checkpoint即尚未process的这批数据开始的检查点
+	// 如果需要重置检查点的情况下,比如可以返回checkpointTracker.GetCurrentCheckPoint, current checkpoint即尚未process的这批数据开始的检查点
+	// 如果已经返回error的话,无需重置到current checkpoint,代码会继续process这批数据,一般来说返回空即可
-	return ""
+	return "", nil
 }
 ```
在实际消费当中,您只需要根据自己的需要重新覆写消费函数process即可,上图只是一个简单的demo,将consumer获取到的日志进行了打印处理,注意,该函数参数和返回值不可改变,否则会导致消费失败。
+另外,如果你在process时有特别的需求,比如process暂存、实际异步操作,这里可以实现自己的Processor接口,除了Process函数,可以实现Shutdown函数对异步操作等进行优雅退出。
+但是,请注意,checkpoint tracker是线程不安全的,它仅可负责本次process的checkpoint保存,请不要把这个实例保存起来异步进行save!
+```
+type Processor interface {
+	Process(int, *sls.LogGroupList, CheckPointTracker) (string, error)
+	Shutdown(CheckPointTracker) error
+}
+
+```

 3.**创建消费者并开始消费**

 ```
 // option是LogHubConfig的实例
 consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)
+// 如果实现了自己的processor,可以使用下面的语句
+// consumerWorker := consumerLibrary.InitConsumerWorkerWithProcessor(option, myProcessor)
 // 调用Start方法开始消费
 consumerWorker.Start()
 ```
diff --git a/consumer/checkpoint_tracker.go b/consumer/checkpoint_tracker.go
index eca0f5ab..c7a14ec8 100644
--- a/consumer/checkpoint_tracker.go
+++ b/consumer/checkpoint_tracker.go
@@ -21,7 +21,7 @@ type CheckPointTracker interface {
 type DefaultCheckPointTracker struct {
 	client            *ConsumerClient
 	heartBeat         *ConsumerHeartBeat
-	nextCheckPoint    string // cursor for already pulled data
+	nextCursor        string // cursor for already pulled data
 	currentCheckPoint string // cursor for data processed, but may not be saved to server
 	pendingCheckPoint string // pending cursor to saved
 	savedCheckPoint   string // already saved
@@ -44,7 +44,7 @@ func (tracker *DefaultCheckPointTracker) initCheckPoint(cursor string) {
 }

 func (tracker *DefaultCheckPointTracker) SaveCheckPoint(force bool) error {
-	tracker.pendingCheckPoint = tracker.nextCheckPoint
+	tracker.pendingCheckPoint = tracker.nextCursor
 	if force {
 		return tracker.flushCheckPoint()
 	}
@@ -64,8 +64,8 @@ func (tracker *DefaultCheckPointTracker) setCurrentCheckPoint(cursor string) {
 	tracker.currentCheckPoint = cursor
 }

-func (tracker *DefaultCheckPointTracker) setNextCheckPoint(cursor string) {
-	tracker.nextCheckPoint = cursor
+func (tracker *DefaultCheckPointTracker) setNextCursor(cursor string) {
+	tracker.nextCursor = cursor
 }

 func (tracker *DefaultCheckPointTracker) flushCheckPoint() error {
diff --git a/consumer/processor.go b/consumer/processor.go
index 4cc430bc..35b024d9 100644
--- a/consumer/processor.go
+++ b/consumer/processor.go
@@ -3,13 +3,13 @@ package consumerLibrary
 import sls "github.com/aliyun/aliyun-log-go-sdk"

 type Processor interface {
-	Process(int, *sls.LogGroupList, CheckPointTracker) string
+	Process(int, *sls.LogGroupList, CheckPointTracker) (string, error)
 	Shutdown(CheckPointTracker) error
 }

-type ProcessFunc func(int, *sls.LogGroupList, CheckPointTracker) string
+type ProcessFunc func(int, *sls.LogGroupList, CheckPointTracker) (string, error)

-func (processor ProcessFunc) Process(shard int, lgList *sls.LogGroupList, checkpointTracker CheckPointTracker) string {
+func (processor ProcessFunc) Process(shard int, lgList *sls.LogGroupList, checkpointTracker CheckPointTracker) (string, error) {
 	return processor(shard, lgList, checkpointTracker)
 }

diff --git a/consumer/shard_worker.go b/consumer/shard_worker.go
index 8cc657bc..511c5a8a 100644
--- a/consumer/shard_worker.go
+++ b/consumer/shard_worker.go
@@ -25,7 +25,6 @@ type ShardConsumerWorker struct {
 	logger log.Logger
 	// unix time
 	lastCheckpointSaveTime time.Time
-	rollBackCheckpoint     string

 	taskLock   sync.RWMutex
 	statusLock sync.RWMutex
@@ -54,7 +53,6 @@ func initShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consum
 		lastFetchTime:             time.Now(),
 		isCurrentDone:             true,
 		logger:                    logger,
-		rollBackCheckpoint:        "",
 	}
 	return shardConsumeWorker
 }
@@ -86,7 +86,7 @@ func (consumer *ShardConsumerWorker) consume() {
 		}()
 	case PROCESSING:
 		go func() {
-			rollBackCheckpoint := consumer.consumerProcessTask()
+			rollBackCheckpoint, err := consumer.consumerProcessTask()
 			if rollBackCheckpoint != "" {
 				consumer.nextFetchCursor = rollBackCheckpoint
 				level.Info(consumer.logger).Log(
@@ -97,7 +95,7 @@ func (consumer *ShardConsumerWorker) consume() {
 					"rollBackCheckpoint", rollBackCheckpoint,
 				)
 			}
-			consumer.updateStatus(true)
+			consumer.updateStatus(err == nil)
 		}()
 	case SHUTTING_DOWN:
 		go func() {
@@ -132,9 +130,9 @@ func (consumer *ShardConsumerWorker) updateStatus(success bool) {
 		consumer.setConsumerStatus(SHUTTING_DOWN)
 	} else if success {
 		switch status {
-		case INITIALIZING, PULLING:
+		case PULLING:
 			consumer.setConsumerStatus(PROCESSING)
-		case PROCESSING:
+		case INITIALIZING, PROCESSING:
 			consumer.setConsumerStatus(PULLING)
 		}
 	}
diff --git a/consumer/tasks.go b/consumer/tasks.go
index 7b109a71..101f39eb 100644
--- a/consumer/tasks.go
+++ b/consumer/tasks.go
@@ -48,7 +48,7 @@ func (consumer *ShardConsumerWorker) consumerInitializeTask() (string, error) {
 func (consumer *ShardConsumerWorker) nextFetchTask() error {
 	// update last fetch time, for control fetch frequency
 	consumer.lastFetchTime = time.Now()
-	
+
 	logGroup, nextCursor, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor)
 	if err != nil {
 		return err
@@ -58,10 +58,10 @@
 	consumer.lastFetchLogGroupList = logGroup
 	consumer.nextFetchCursor = nextCursor
 	consumer.lastFetchGroupCount = GetLogGroupCount(consumer.lastFetchLogGroupList)
-	consumer.consumerCheckPointTracker.setNextCheckPoint(consumer.nextFetchCursor)
+	consumer.consumerCheckPointTracker.setNextCursor(consumer.nextFetchCursor)
 	level.Debug(consumer.logger).Log(
 		"shardId", consumer.shardId,
-		"fetch log count", GetLogCount(consumer.lastFetchLogGroupList),
+		"fetch log count", consumer.lastFetchGroupCount,
 	)
 	if consumer.lastFetchGroupCount == 0 {
 		consumer.lastFetchLogGroupList = nil
@@ -72,39 +72,24 @@
 	return nil
 }

-func (consumer *ShardConsumerWorker) consumerProcessTask() string {
+func (consumer *ShardConsumerWorker) consumerProcessTask() (rollBackCheckpoint string, err error) {
 	// If the user's consumption function reports a panic error, it will be captured and retry until sucessed.
 	defer func() {
 		if r := recover(); r != nil {
 			stackBuf := make([]byte, 1<<16)
-			runtime.Stack(stackBuf, false)
-			level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r, "stack", string(stackBuf))
-			for {
-				if consumer.consumerRetryProcessTask() {
-					break
-				}
-				time.Sleep(time.Second * 2)
-			}
+			n := runtime.Stack(stackBuf, false)
+			level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r, "stack", stackBuf[:n])
+			err = fmt.Errorf("get a panic when process: %v", r)
 		}
 	}()
 	if consumer.lastFetchLogGroupList != nil {
-		consumer.rollBackCheckpoint = consumer.processor.Process(consumer.shardId, consumer.lastFetchLogGroupList, consumer.consumerCheckPointTracker)
+		rollBackCheckpoint, err = consumer.processor.Process(consumer.shardId, consumer.lastFetchLogGroupList, consumer.consumerCheckPointTracker)
+		if err != nil {
+			return
+		}
 		consumer.saveCheckPointIfNeeded()
 		consumer.lastFetchLogGroupList = nil
 	}
-	return consumer.rollBackCheckpoint
-}

-func (consumer *ShardConsumerWorker) consumerRetryProcessTask() bool {
-	level.Info(consumer.logger).Log("msg", "Start retrying the process function")
-	defer func() {
-		if r := recover(); r != nil {
-			stackBuf := make([]byte, 1<<16)
-			runtime.Stack(stackBuf, false)
-			level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r, "stack", string(stackBuf))
-		}
-	}()
-	consumer.rollBackCheckpoint = consumer.processor.Process(consumer.shardId, consumer.lastFetchLogGroupList, consumer.consumerCheckPointTracker)
-	consumer.saveCheckPointIfNeeded()
-	return true
+	return
 }
diff --git a/consumer/worker.go b/consumer/worker.go
index cfae3f4a..294ccbe6 100644
--- a/consumer/worker.go
+++ b/consumer/worker.go
@@ -31,21 +31,21 @@
 	}
 	return InitConsumerWorkerWithProcessor(
 		option,
-		ProcessFunc(func(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) string {
+		ProcessFunc(func(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) (string, error) {
 			cursor := do(shardId, logGroupList)
 			// keep the original logic
 			// if cursor is not empty, we don't save,
 			if cursor == "" {
 				checkpointTracker.SaveCheckPoint(false)
 			}
-			return cursor
+			return cursor, nil
 		}),
 	)
 }

 // InitConsumerWorkerWithCheckpointTracker
 // please note that you need to save after the process is successful,
-func InitConsumerWorkerWithCheckpointTracker(option LogHubConfig, do func(int, *sls.LogGroupList, CheckPointTracker) string) *ConsumerWorker {
+func InitConsumerWorkerWithCheckpointTracker(option LogHubConfig, do func(int, *sls.LogGroupList, CheckPointTracker) (string, error)) *ConsumerWorker {
 	return InitConsumerWorkerWithProcessor(option, ProcessFunc(do))
 }

diff --git a/example/consumer/copy_data/copy_data.go b/example/consumer/copy_data/copy_data.go
index a2fd9f9f..f692a3fd 100644
--- a/example/consumer/copy_data/copy_data.go
+++ b/example/consumer/copy_data/copy_data.go
@@ -57,14 +57,16 @@ func main() {
 	}
 }

-func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) string {
+func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) {
 	for _, logGroup := range logGroupList.LogGroups {
 		err := client.PutLogs(option.Project, "copy-logstore", logGroup)
 		if err != nil {
 			fmt.Println(err)
+			// may lead to partial success: earlier log groups may already be written and will be re-sent when the batch is re-processed
+			return "", err
 		}
 	}
 	fmt.Printf("shardId %v processing works sucess\n", shardId)
 	checkpointTracker.SaveCheckPoint(false)
-	return ""
+	return "", nil
 }
diff --git a/example/consumer/demo/simple_demo.go b/example/consumer/demo/simple_demo.go
index 095e0506..fc383241 100644
--- a/example/consumer/demo/simple_demo.go
+++ b/example/consumer/demo/simple_demo.go
@@ -40,8 +40,8 @@ func main() {

 // Fill in your consumption logic here, and be careful not to change the parameters of the function and the return value,
 // otherwise you will report errors.
-func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) string {
+func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) {
 	fmt.Println(shardId, logGroupList)
 	checkpointTracker.SaveCheckPoint(false)
-	return ""
+	return "", nil
 }
diff --git a/example/consumer/reset_checkpoint_demo.go b/example/consumer/reset_checkpoint_demo.go
index 4748d0eb..323a4c71 100644
--- a/example/consumer/reset_checkpoint_demo.go
+++ b/example/consumer/reset_checkpoint_demo.go
@@ -50,10 +50,10 @@ func main() {

 // Fill in your consumption logic here, and be careful not to change the parameters of the function and the return value,
 // otherwise you will report errors.
-func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) string {
+func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) {
 	// 这里填入自己的消费处理逻辑 和 cpt保存逻辑
 	fmt.Println(logGroupList)
-	return ""
+	return "", nil
 }

 func updateCheckpoint(config consumerLibrary.LogHubConfig, client *sls.Client, shardId int) error {
From 82d58fcf53565f98d957274475dbcb6db83cf812 Mon Sep 17 00:00:00 2001
From: FFish
Date: Fri, 26 May 2023 07:19:01 +0000
Subject: [PATCH 8/9] log: add process error log

---
 consumer/shard_worker.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/consumer/shard_worker.go b/consumer/shard_worker.go
index 511c5a8a..39a6fa6b 100644
--- a/consumer/shard_worker.go
+++ b/consumer/shard_worker.go
@@ -87,6 +87,9 @@ func (consumer *ShardConsumerWorker) consume() {
 	case PROCESSING:
 		go func() {
 			rollBackCheckpoint, err := consumer.consumerProcessTask()
+			if err != nil {
+				level.Warn(consumer.logger).Log("msg", "process failed", "err", err)
+			}
 			if rollBackCheckpoint != "" {
 				consumer.nextFetchCursor = rollBackCheckpoint
 				level.Info(consumer.logger).Log(
From 2876f78ff6b964481eb48d4d912b74de67c94e5f Mon Sep 17 00:00:00 2001
From: FFish
Date: Mon, 29 May 2023 02:36:41 +0000
Subject: [PATCH 9/9] feat: rename current checkpoint to current cursor and
 save checkpoint early

---
 consumer/README.md             |  2 +-
 consumer/checkpoint_tracker.go | 12 ++++++------
 consumer/tasks.go              |  2 +-
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/consumer/README.md b/consumer/README.md
index 4a2c9e76..403de594 100644
--- a/consumer/README.md
+++ b/consumer/README.md
@@ -78,7 +78,7 @@ func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker Chec
 	// 推荐大多数场景下使用false即可
 	checkpointTracker.SaveCheckPoint(false); // 代表process成功保存存档点,但并不直接写入服务器,等待一定的interval后写入
 	// 不需要重置检查点情况下,请返回空字符串,如需要重置检查点,请返回需要重置的检查点游标。
-	// 如果需要重置检查点的情况下,比如可以返回checkpointTracker.GetCurrentCheckPoint, current checkpoint即尚未process的这批数据开始的检查点
+	// 如果需要重置检查点的情况下,比如可以返回checkpointTracker.GetCurrentCursor, current checkpoint即尚未process的这批数据开始的检查点
 	// 如果已经返回error的话,无需重置到current checkpoint,代码会继续process这批数据,一般来说返回空即可
 	return "", nil
 }
 ```
diff --git a/consumer/checkpoint_tracker.go b/consumer/checkpoint_tracker.go
index c7a14ec8..77f3df3d 100644
--- a/consumer/checkpoint_tracker.go
+++ b/consumer/checkpoint_tracker.go
@@ -13,7 +13,7 @@ type CheckPointTracker interface {
 	// GetCheckPoint get lastest saved check point
 	GetCheckPoint() string
 	// GetCurrentCursor get current fetched data cursor
-	GetCurrentCheckPoint() string
+	GetCurrentCursor() string
 	// SaveCheckPoint, save checkpoint
 	SaveCheckPoint(force bool) error
 }
@@ -21,8 +21,8 @@ type CheckPointTracker interface {
 type DefaultCheckPointTracker struct {
 	client            *ConsumerClient
 	heartBeat         *ConsumerHeartBeat
-	nextCursor        string // cursor for already pulled data
-	currentCheckPoint string // cursor for data processed, but may not be saved to server
+	nextCursor    string // cursor for already pulled data
+	currentCursor string // cursor for data processed, but may not be saved to server
 	pendingCheckPoint string // pending cursor to saved
 	savedCheckPoint   string // already saved
 	shardId           int
@@ -56,12 +56,12 @@ func (tracker *DefaultCheckPointTracker) GetCheckPoint() string {
 	return tracker.savedCheckPoint
 }

-func (tracker *DefaultCheckPointTracker) GetCurrentCheckPoint() string {
-	return tracker.currentCheckPoint
+func (tracker *DefaultCheckPointTracker) GetCurrentCursor() string {
+	return tracker.currentCursor
 }

 func (tracker *DefaultCheckPointTracker) setCurrentCheckPoint(cursor string) {
-	tracker.currentCheckPoint = cursor
+	tracker.currentCursor = cursor
 }

 func (tracker *DefaultCheckPointTracker) setNextCursor(cursor string) {
diff --git a/consumer/tasks.go b/consumer/tasks.go
index 101f39eb..c2f5512f 100644
--- a/consumer/tasks.go
+++ b/consumer/tasks.go
@@ -84,10 +84,10 @@ func (consumer *ShardConsumerWorker) consumerProcessTask() (rollBackCheckpoint s
 	}()
 	if consumer.lastFetchLogGroupList != nil {
 		rollBackCheckpoint, err = consumer.processor.Process(consumer.shardId, consumer.lastFetchLogGroupList, consumer.consumerCheckPointTracker)
+		consumer.saveCheckPointIfNeeded()
 		if err != nil {
 			return
 		}
-		consumer.saveCheckPointIfNeeded()
 		consumer.lastFetchLogGroupList = nil
 	}
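
The patches above introduce the `Processor` interface, but the README only shows its definition. The following is a minimal end-to-end sketch of a custom processor that buffers log groups and flushes them during graceful shutdown. It only relies on the API shown in these patches (`Processor`, `CheckPointTracker`, `SaveCheckPoint`, `InitConsumerWorkerWithProcessor`, `Start`, `StopAndWait`); `batchingProcessor`, `forwardToBackend`, the flush threshold of 128, and the signal handling in `main` are illustrative assumptions, not SDK APIs, and the import path simply mirrors this repository's layout.

```
package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"

	sls "github.com/aliyun/aliyun-log-go-sdk"
	consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
)

// batchingProcessor buffers fetched log groups and only saves the checkpoint
// after a buffer has been handed off successfully. The same Processor
// instance is shared by every shard worker (see initShardConsumerWorker in
// PATCH 6/9), so the buffer is guarded by a mutex.
type batchingProcessor struct {
	mu     sync.Mutex
	buffer []*sls.LogGroup
}

func (p *batchingProcessor) Process(shardId int, lgList *sls.LogGroupList, tracker consumerLibrary.CheckPointTracker) (string, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.buffer = append(p.buffer, lgList.LogGroups...)
	if len(p.buffer) < 128 {
		// Keep buffering and do not save the checkpoint yet: after a crash
		// the unflushed range is consumed again instead of being lost.
		return "", nil
	}
	if err := forwardToBackend(p.buffer); err != nil {
		// A non-nil error makes the SDK log a warning and re-process this
		// batch, as introduced in PATCH 7/9 and 8/9.
		return "", err
	}
	p.buffer = p.buffer[:0]
	// Mark success; with force=false the SDK writes the checkpoint to the
	// server in batches on the auto-commit interval.
	return "", tracker.SaveCheckPoint(false)
}

// Shutdown runs when a shard worker enters SHUTTING_DOWN: flush what is left
// and force-write the checkpoint before the worker completes its shutdown.
func (p *batchingProcessor) Shutdown(tracker consumerLibrary.CheckPointTracker) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if err := forwardToBackend(p.buffer); err != nil {
		return err
	}
	p.buffer = nil
	return tracker.SaveCheckPoint(true)
}

// forwardToBackend is a hypothetical sink; replace it with real storage.
func forwardToBackend(groups []*sls.LogGroup) error {
	fmt.Println("forwarding", len(groups), "log groups")
	return nil
}

func main() {
	var option consumerLibrary.LogHubConfig // fill in Endpoint, AccessKeyID, etc. as in the README table
	worker := consumerLibrary.InitConsumerWorkerWithProcessor(option, &batchingProcessor{})
	worker.Start()

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt)
	<-ch
	worker.StopAndWait() // triggers Shutdown above on every shard worker
}
```

Buffering trades latency for throughput here; because the checkpoint only advances after a successful hand-off, a crash replays the buffered range, which gives at-least-once delivery rather than data loss. None of this is prescriptive: the patches only require that Process return a rollback cursor and an error, and that Shutdown finish any pending work before the final checkpoint flush.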