diff --git a/consumer/README.md b/consumer/README.md index 5a99beba..403de594 100644 --- a/consumer/README.md +++ b/consumer/README.md @@ -1,7 +1,5 @@ # Aliyun LOG Go Consumer Library - - Aliyun LOG Go Consumer Library 是一个易于使用且高度可配置的golang 类库,专门为大数据高并发场景下的多个消费者协同消费同一个logstore而编写的纯go语言的类库。 ## 功能特点 @@ -19,9 +17,7 @@ Aliyun LOG Go Consumer Library 是一个易于使用且高度可配置的golang - 用户可以创建多个消费者对同一Logstore中的数据进行消费,而且不用关心消费者之间的负载均衡,consumer library 会进行自动处理,并且保证数据不会被重复消费。在cpu等资源有限情况下可以尽最大能力去消费logstore中的数据,并且会自动为用户保存消费断点到服务端。 - 当网络不稳定出现网络震荡时,consumer library可以在网络恢复时继续消费并且保证数据不会丢失及重复消费。 -- 提供了更多高阶用法,使用户可以通过多种方法去调控运行中的consumer library,具体事例请参考[aliyun log go consumer library 高阶用法](https://yq.aliyun.com/articles/693820) - - +- 提供了更多高阶用法,使用户可以通过多种方法去调控运行中的consumer library ## 安装 @@ -31,53 +27,93 @@ Aliyun LOG Go Consumer Library 是一个易于使用且高度可配置的golang git clone git@github.com:aliyun/aliyun-log-go-sdk.git ``` +## 原理剖析及快速入门 - -##原理剖析及快速入门 - -参考教程: [ALiyun LOG Go Consumer Library 快速入门及原理剖析](https://yq.aliyun.com/articles/693820) - - +参考教程: [ALiyun LOG Go Consumer Library 快速入门及原理剖析](https://developer.aliyun.com/article/693820) ## 使用步骤 1.**配置LogHubConfig** LogHubConfig是提供给用户的配置类,用于配置消费策略,您可以根据不同的需求设定不同的值,各参数含义如其中所示 +|参数|含义|详情| +| --- | --- | --- | +|Endpoint|sls的endpoint|必填,如cn-hangzhou.sls.aliyuncs.com| +|AccessKeyId|aliyun的AccessKeyId|必填| +|AccessKeySecret|aliyun的AccessKeySecret|必填| +|Project|sls的project信息|必填| +|Logstore|sls的logstore|必填| +|ConsumerGroupName|消费组名称|必填| +|Consumer|消费者名称|必填,sls的consumer需要自行指定,请注意不要重复| +|CursorPosition|消费的点位|必填,支持 1.BEGIN_CURSOR: logstore的开始点位 2. END_CURSOR: logstore的最新数据点位 3.SPECIAL_TIME_CURSOR: 自行设置的unix时间戳| +||sls的logstore|必填| +|HeartbeatIntervalInSecond|心跳的时间间隔|非必填,默认时间为20s, sdk会根据心跳时间与服务器确认alive| +|DataFetchIntervalInMs|数据默认拉取的间隔|非必填,默认为200ms| +|MaxFetchLogGroupCount|数据一次拉取的log group数量|非必填,默认为1000| +|CursorStartTime|数据点位的时间戳|非必填,CursorPosition为SPECIAL_TIME_CURSOR时需填写| +|InOrder|shard分裂后是否in order消费|非必填,默认为false,当为true时,分裂shard会在老的read only shard消费完后再继续消费| +|AllowLogLevel|允许的日志级别|非必填,默认为info,日志级别由低到高为debug, info, warn, error,仅高于此AllowLogLevel的才会被log出来| +|LogFileName|程序运行日志文件名称|非必填,默认为stdout| +|IsJsonType|是否为json类型|非必填,默认为logfmt格式,true时为json格式| +|LogMaxSize|日志文件最大size|非必填,默认为10| +|LogMaxBackups|最大保存的old日志文件|非必填,默认为10| +|LogCompass|日志是否压缩|非必填,默认不压缩,如果压缩为gzip压缩| +|HTTPClient|指定http client|非必填,可指定http client实现一些逻辑,sdk发送http请求会使用这个client| +|SecurityToken|aliyun SecurityToken|非必填,参考https://help.aliyun.com/document_detail/47277.html| +|AutoCommitDisabled|是否禁用sdk自动提交checkpoint|非必填,默认不会禁用| +|AutoCommitIntervalInMS|自动提交checkpoint的时间间隔|非必填,单位为MS,默认时间为60s| 2.**覆写消费逻辑** ``` -func process(shardId int, logGroupList *sls.LogGroupList) string { - for _, logGroup := range logGroupList.LogGroups { - err := client.PutLogs(option.Project, "copy-logstore", logGroup) - if err != nil { - fmt.Println(err) - } +func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) (string, error) { + err := dosomething() + if err != nil { + return "", nil } - fmt.Println("shardId %v processing works sucess", shardId) - return "" // 不需要重置检查点情况下,请返回空字符串,如需要重置检查点,请返回需要重置的检查点游标。 + fmt.Println("shardId %v processing works success", shardId) + // 标记给CheckPointTracker process已成功,保存存档点, + // false 标记process已成功,但并不直接写入服务器,等待一定的interval后sdk批量写入 (AutoCommitDisable为false情况SDK会批量写入) + // true 标记已成功, 且直接写入服务器 + // 推荐大多数场景下使用false即可 + checkpointTracker.SaveCheckPoint(false); // 代表process成功保存存档点,但并不直接写入服务器,等待一定的interval后写入 + // 不需要重置检查点情况下,请返回空字符串,如需要重置检查点,请返回需要重置的检查点游标。 + // 
如果需要重置检查点的情况下,比如可以返回checkpointTracker.GetCurrentCursor, current checkpoint即尚未process的这批数据开始的检查点 + // 如果已经返回error的话,无需重置到current checkpoint,代码会继续process这批数据,一般来说返回空即可 + return "", nil } ``` -在实际消费当中,您只需要根据自己的需要重新覆写消费函数process 即可,上图只是一个简单的demo,将consumer获取到的日志进行了打印处理,注意,该函数参数和返回值不可改变,否则会导致消费失败。 +在实际消费当中,您只需要根据自己的需要重新覆写消费函数process即可,上图只是一个简单的demo,将consumer获取到的日志进行了打印处理,注意,该函数参数和返回值不可改变,否则会导致消费失败。 +另外的,如果你在process时有特别的需求,比如process暂存,实际异步操作,这里可以实现自己的Processor接口,除了Process函数,可以实现Shutdown函数对异步操作等进行优雅退出。 +但是,请注意,checkpoint tracker是线程不安全的,它仅可负责本次process的checkpoint保存,请不要保存起来这个实例异步进行save! +``` +type Processor interface { + Process(int, *sls.LogGroupList, CheckPointTracker) string + Shutdown(CheckPointTracker) error +} + +``` 3.**创建消费者并开始消费** ``` // option是LogHubConfig的实例 -consumerWorker := consumerLibrary.InitConsumerWorker(option, process) +consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process) +// 如果实现了自己的processor,可以使用下面的语句 +// consumerWroer := consumerLibrary.InitConsumerWorkerWithProcessor(option, myProcessor) // 调用Start方法开始消费 consumerWorker.Start() ``` +> 注意目前已废弃`InitConsumerWorker(option, process)`,其代表在process函数后,sdk会执行一次`checkpointTracker.SaveCheckPoint(false)`,但是无法手动强制写入服务器/获取上一个的checkpoint等功能 调用InitConsumerWorkwer方法,将配置实例对象和消费函数传递到参数中生成消费者实例对象,调用Start方法进行消费。 4.**关闭消费者** ``` -ch:=make(chan os.Signal) //将os信号值作为信道 -signal.Notify(ch, os.Kill, os.Interrupt) +ch := make(chan os.Signal, 1) //将os信号值作为信道 +signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT) consumerWorker.Start() if _, ok := <-ch; ok { // 当获取到os停止信号以后,例如ctrl+c触发的os信号,会调用消费者退出方法进行退出。 consumerWorker.StopAndWait() @@ -87,7 +123,6 @@ if _, ok := <-ch; ok { // 当获取到os停止信号以后,例如ctrl+c触发 上图中的例子通过go的信道做了os信号的监听,当监听到用户触发了os退出信号以后,调用StopAndWait()方法进行退出,用户可以根据自己的需要设计自己的退出逻辑,只需要调用StopAndWait()即可。 - ## 简单样例 为了方便用户可以更快速的上手consumer library 我们提供了两个简单的通过代码操作consumer library的简单样例,请参考[consumer library example](https://github.com/aliyun/aliyun-log-go-sdk/tree/master/example/consumer) diff --git a/consumer/checkpoint_tracker.go b/consumer/checkpoint_tracker.go index a6e99b44..77f3df3d 100644 --- a/consumer/checkpoint_tracker.go +++ b/consumer/checkpoint_tracker.go @@ -1,61 +1,106 @@ package consumerLibrary import ( + "strings" + "time" + + sls "github.com/aliyun/aliyun-log-go-sdk" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "time" ) -type ConsumerCheckPointTracker struct { - client *ConsumerClient - defaultFlushCheckPointIntervalSec int64 - tempCheckPoint string - lastPersistentCheckPoint string - trackerShardId int - lastCheckTime int64 - logger log.Logger -} - -func initConsumerCheckpointTracker(shardId int, consumerClient *ConsumerClient, logger log.Logger) *ConsumerCheckPointTracker { - checkpointTracker := &ConsumerCheckPointTracker{ - defaultFlushCheckPointIntervalSec: 60, - client: consumerClient, - trackerShardId: shardId, - logger: logger, - } - return checkpointTracker +type CheckPointTracker interface { + // GetCheckPoint get lastest saved check point + GetCheckPoint() string + // GetCurrentCursor get current fetched data cursor + GetCurrentCursor() string + // SaveCheckPoint, save checkpoint + SaveCheckPoint(force bool) error } -func (checkPointTracker *ConsumerCheckPointTracker) setMemoryCheckPoint(cursor string) { - checkPointTracker.tempCheckPoint = cursor +type DefaultCheckPointTracker struct { + client *ConsumerClient + heartBeat *ConsumerHeartBeat + nextCursor string // cursor for already pulled data + currentCursor string // cursor for data processed, but may not be saved to server + 
pendingCheckPoint string // pending cursor to saved + savedCheckPoint string // already saved + shardId int + logger log.Logger } -func (checkPointTracker *ConsumerCheckPointTracker) setPersistentCheckPoint(cursor string) { - checkPointTracker.lastPersistentCheckPoint = cursor +func initConsumerCheckpointTracker(shardId int, consumerClient *ConsumerClient, consumerHeatBeat *ConsumerHeartBeat, logger log.Logger) *DefaultCheckPointTracker { + checkpointTracker := &DefaultCheckPointTracker{ + client: consumerClient, + heartBeat: consumerHeatBeat, + shardId: shardId, + logger: logger, + } + return checkpointTracker } -func (checkPointTracker *ConsumerCheckPointTracker) flushCheckPoint() error { - if checkPointTracker.tempCheckPoint != "" && checkPointTracker.tempCheckPoint != checkPointTracker.lastPersistentCheckPoint { - if err := checkPointTracker.client.updateCheckPoint(checkPointTracker.trackerShardId, checkPointTracker.tempCheckPoint, true); err != nil { - return err - } +func (tracker *DefaultCheckPointTracker) initCheckPoint(cursor string) { + tracker.savedCheckPoint = cursor +} - checkPointTracker.lastPersistentCheckPoint = checkPointTracker.tempCheckPoint +func (tracker *DefaultCheckPointTracker) SaveCheckPoint(force bool) error { + tracker.pendingCheckPoint = tracker.nextCursor + if force { + return tracker.flushCheckPoint() } + return nil } -func (checkPointTracker *ConsumerCheckPointTracker) flushCheck() { - currentTime := time.Now().Unix() - if currentTime > checkPointTracker.lastCheckTime+checkPointTracker.defaultFlushCheckPointIntervalSec { - if err := checkPointTracker.flushCheckPoint(); err != nil { - level.Warn(checkPointTracker.logger).Log("msg", "update checkpoint get error", "error", err) - } else { - checkPointTracker.lastCheckTime = currentTime +func (tracker *DefaultCheckPointTracker) GetCheckPoint() string { + return tracker.savedCheckPoint +} + +func (tracker *DefaultCheckPointTracker) GetCurrentCursor() string { + return tracker.currentCursor +} + +func (tracker *DefaultCheckPointTracker) setCurrentCheckPoint(cursor string) { + tracker.currentCursor = cursor +} + +func (tracker *DefaultCheckPointTracker) setNextCursor(cursor string) { + tracker.nextCursor = cursor +} + +func (tracker *DefaultCheckPointTracker) flushCheckPoint() error { + if tracker.pendingCheckPoint == "" || tracker.pendingCheckPoint == tracker.savedCheckPoint { + return nil + } + for i := 0; ; i++ { + err := tracker.client.updateCheckPoint(tracker.shardId, tracker.pendingCheckPoint, true) + if err == nil { + break + } + slsErr, ok := err.(*sls.Error) + if ok { + if strings.EqualFold(slsErr.Code, "ConsumerNotExsit") || strings.EqualFold(slsErr.Code, "ConsumerNotMatch") { + tracker.heartBeat.removeHeartShard(tracker.shardId) + level.Warn(tracker.logger).Log("msg", "consumer has been removed or shard has been reassigned", "shard", tracker.shardId, "err", slsErr) + break + } else if strings.EqualFold(slsErr.Code, "ShardNotExsit") { + tracker.heartBeat.removeHeartShard(tracker.shardId) + level.Warn(tracker.logger).Log("msg", "shard does not exist", "shard", tracker.shardId) + break + } + } + if i >= 2 { + level.Error(tracker.logger).Log( + "msg", "failed to save checkpoint", + "consumer", tracker.client.option.ConsumerName, + "shard", tracker.shardId, + "checkpoint", tracker.pendingCheckPoint, + ) + return err } + time.Sleep(100 * time.Millisecond) } -} -func (checkPointTracker *ConsumerCheckPointTracker) getCheckPoint() string { - return checkPointTracker.tempCheckPoint + tracker.savedCheckPoint = 
tracker.pendingCheckPoint + return nil } diff --git a/consumer/config.go b/consumer/config.go index 1a37588c..66284961 100644 --- a/consumer/config.go +++ b/consumer/config.go @@ -34,6 +34,9 @@ type LogHubConfig struct { // deleted.) //:param LogCompass: Compress determines if the rotated log files should be compressed using gzip. //:param HTTPClient: custom http client for sending data to sls + //:param AutoCommitDisabled: whether to disable commit checkpoint automatically, default is false, means auto commit checkpoint + // Note that if you set autocommit to false, you must use InitConsumerWorkerWithCheckpointTracker instead of InitConsumerWorker + //:param AutoCommitIntervalInSec: default auto commit interval, default is 30 Endpoint string AccessKeyID string @@ -56,17 +59,20 @@ type LogHubConfig struct { LogCompass bool HTTPClient *http.Client SecurityToken string + AutoCommitDisabled bool + AutoCommitIntervalInMS int64 } const ( - BEGIN_CURSOR = "BEGIN_CURSOR" - END_CURSOR = "END_CURSOR" - SPECIAL_TIMER_CURSOR = "SPECIAL_TIMER_CURSOR" - INITIALIZING = "INITIALIZING" - INITIALIZING_DONE = "INITIALIZING_DONE" - PULL_PROCESSING = "PULL_PROCESSING" - PULL_PROCESSING_DONE = "PULL_PROCESSING_DONE" - CONSUME_PROCESSING = "CONSUME_PROCESSING" - CONSUME_PROCESSING_DONE = "CONSUME_PROCESSING_DONE" - SHUTDOWN_COMPLETE = "SHUTDOWN_COMPLETE" + BEGIN_CURSOR = "BEGIN_CURSOR" + END_CURSOR = "END_CURSOR" + SPECIAL_TIMER_CURSOR = "SPECIAL_TIMER_CURSOR" +) + +const ( + INITIALIZING = "INITIALIZING" + PULLING = "PULLING" + PROCESSING = "PROCESSING" + SHUTTING_DOWN = "SHUTTING_DOWN" + SHUTDOWN_COMPLETE = "SHUTDOWN_COMPLETE" ) diff --git a/consumer/consumer_client.go b/consumer/consumer_client.go index 3f474c15..0f098670 100644 --- a/consumer/consumer_client.go +++ b/consumer/consumer_client.go @@ -27,6 +27,9 @@ func initConsumerClient(option LogHubConfig, logger log.Logger) *ConsumerClient if option.MaxFetchLogGroupCount == 0 { option.MaxFetchLogGroupCount = 1000 } + if option.AutoCommitIntervalInMS == 0 { + option.AutoCommitIntervalInMS = 60 * 1000 + } client := &sls.Client{ Endpoint: option.Endpoint, AccessKeyID: option.AccessKeyID, diff --git a/consumer/heart_beat.go b/consumer/heart_beat.go index cf2d10ca..acc774fe 100644 --- a/consumer/heart_beat.go +++ b/consumer/heart_beat.go @@ -10,19 +10,18 @@ import ( "go.uber.org/atomic" ) -var shardLock sync.RWMutex - -type ConsumerHeatBeat struct { +type ConsumerHeartBeat struct { client *ConsumerClient shutDownFlag *atomic.Bool heldShards []int heartShards []int logger log.Logger lastHeartBeatSuccessTime int64 + shardLock sync.RWMutex } -func initConsumerHeatBeat(consumerClient *ConsumerClient, logger log.Logger) *ConsumerHeatBeat { - consumerHeatBeat := &ConsumerHeatBeat{ +func initConsumerHeatBeat(consumerClient *ConsumerClient, logger log.Logger) *ConsumerHeartBeat { + consumerHeartBeat := &ConsumerHeartBeat{ client: consumerClient, shutDownFlag: atomic.NewBool(false), heldShards: []int{}, @@ -30,77 +29,77 @@ func initConsumerHeatBeat(consumerClient *ConsumerClient, logger log.Logger) *Co logger: logger, lastHeartBeatSuccessTime: time.Now().Unix(), } - return consumerHeatBeat + return consumerHeartBeat } -func (consumerHeatBeat *ConsumerHeatBeat) getHeldShards() []int { - shardLock.RLock() - defer shardLock.RUnlock() - return consumerHeatBeat.heldShards +func (heartbeat *ConsumerHeartBeat) getHeldShards() []int { + heartbeat.shardLock.RLock() + defer heartbeat.shardLock.RUnlock() + return heartbeat.heldShards } -func (consumerHeatBeat *ConsumerHeatBeat) 
setHeldShards(heldShards []int) { - shardLock.Lock() - defer shardLock.Unlock() - consumerHeatBeat.heldShards = heldShards +func (heartbeat *ConsumerHeartBeat) setHeldShards(heldShards []int) { + heartbeat.shardLock.Lock() + defer heartbeat.shardLock.Unlock() + heartbeat.heldShards = heldShards } -func (consumerHeatBeat *ConsumerHeatBeat) setHeartShards(heartShards []int) { - shardLock.Lock() - defer shardLock.Unlock() - consumerHeatBeat.heartShards = heartShards +func (heartbeat *ConsumerHeartBeat) setHeartShards(heartShards []int) { + heartbeat.shardLock.Lock() + defer heartbeat.shardLock.Unlock() + heartbeat.heartShards = heartShards } -func (consumerHeatBeat *ConsumerHeatBeat) getHeartShards() []int { - shardLock.RLock() - defer shardLock.RUnlock() - return consumerHeatBeat.heartShards +func (heartbeat *ConsumerHeartBeat) getHeartShards() []int { + heartbeat.shardLock.RLock() + defer heartbeat.shardLock.RUnlock() + return heartbeat.heartShards } -func (consumerHeatBeat *ConsumerHeatBeat) shutDownHeart() { - level.Info(consumerHeatBeat.logger).Log("msg", "try to stop heart beat") - consumerHeatBeat.shutDownFlag.Store(true) +func (heartbeat *ConsumerHeartBeat) shutDownHeart() { + level.Info(heartbeat.logger).Log("msg", "try to stop heart beat") + heartbeat.shutDownFlag.Store(true) } -func (consumerHeatBeat *ConsumerHeatBeat) heartBeatRun() { +func (heartbeat *ConsumerHeartBeat) heartBeatRun() { var lastHeartBeatTime int64 - for !consumerHeatBeat.shutDownFlag.Load() { + for !heartbeat.shutDownFlag.Load() { lastHeartBeatTime = time.Now().Unix() - uploadShards := append(consumerHeatBeat.heartShards, consumerHeatBeat.heldShards...) - consumerHeatBeat.setHeartShards(Set(uploadShards)) - responseShards, err := consumerHeatBeat.client.heartBeat(consumerHeatBeat.getHeartShards()) + uploadShards := append(heartbeat.heartShards, heartbeat.heldShards...) 
+ heartbeat.setHeartShards(Set(uploadShards)) + responseShards, err := heartbeat.client.heartBeat(heartbeat.getHeartShards()) if err != nil { - level.Warn(consumerHeatBeat.logger).Log("msg", "send heartbeat error", "error", err) - if time.Now().Unix()-consumerHeatBeat.lastHeartBeatSuccessTime > int64(consumerHeatBeat.client.consumerGroup.Timeout+consumerHeatBeat.client.option.HeartbeatIntervalInSecond) { - consumerHeatBeat.setHeldShards([]int{}) - level.Info(consumerHeatBeat.logger).Log("msg", "Heart beat timeout, automatic reset consumer held shards") + level.Warn(heartbeat.logger).Log("msg", "send heartbeat error", "error", err) + if time.Now().Unix()-heartbeat.lastHeartBeatSuccessTime > int64(heartbeat.client.consumerGroup.Timeout+heartbeat.client.option.HeartbeatIntervalInSecond) { + heartbeat.setHeldShards([]int{}) + level.Info(heartbeat.logger).Log("msg", "Heart beat timeout, automatic reset consumer held shards") } } else { - consumerHeatBeat.lastHeartBeatSuccessTime = time.Now().Unix() - level.Info(consumerHeatBeat.logger).Log("heart beat result", fmt.Sprintf("%v", consumerHeatBeat.heartShards), "get", fmt.Sprintf("%v", responseShards)) - consumerHeatBeat.setHeldShards(responseShards) - if !IntSliceReflectEqual(consumerHeatBeat.getHeartShards(), consumerHeatBeat.getHeldShards()) { - currentSet := Set(consumerHeatBeat.getHeartShards()) - responseSet := Set(consumerHeatBeat.getHeldShards()) + heartbeat.lastHeartBeatSuccessTime = time.Now().Unix() + level.Info(heartbeat.logger).Log("heart beat result", fmt.Sprintf("%v", heartbeat.heartShards), "get", fmt.Sprintf("%v", responseShards)) + heartbeat.setHeldShards(responseShards) + if !IntSliceReflectEqual(heartbeat.getHeartShards(), heartbeat.getHeldShards()) { + currentSet := Set(heartbeat.getHeartShards()) + responseSet := Set(heartbeat.getHeldShards()) add := Subtract(currentSet, responseSet) remove := Subtract(responseSet, currentSet) - level.Info(consumerHeatBeat.logger).Log("shard reorganize, adding:", fmt.Sprintf("%v", add), "removing:", fmt.Sprintf("%v", remove)) + level.Info(heartbeat.logger).Log("shard reorganize, adding:", fmt.Sprintf("%v", add), "removing:", fmt.Sprintf("%v", remove)) } } - TimeToSleepInSecond(int64(consumerHeatBeat.client.option.HeartbeatIntervalInSecond), lastHeartBeatTime, consumerHeatBeat.shutDownFlag.Load()) + TimeToSleepInSecond(int64(heartbeat.client.option.HeartbeatIntervalInSecond), lastHeartBeatTime, heartbeat.shutDownFlag.Load()) } - level.Info(consumerHeatBeat.logger).Log("msg", "heart beat exit") + level.Info(heartbeat.logger).Log("msg", "heart beat exit") } -func (consumerHeatBeat *ConsumerHeatBeat) removeHeartShard(shardId int) bool { - shardLock.Lock() - defer shardLock.Unlock() +func (heartbeat *ConsumerHeartBeat) removeHeartShard(shardId int) bool { + heartbeat.shardLock.Lock() + defer heartbeat.shardLock.Unlock() isDeleteShard := false - for i, heartShard := range consumerHeatBeat.heartShards { + for i, heartShard := range heartbeat.heartShards { if shardId == heartShard { - consumerHeatBeat.heartShards = append(consumerHeatBeat.heartShards[:i], consumerHeatBeat.heartShards[i+1:]...) + heartbeat.heartShards = append(heartbeat.heartShards[:i], heartbeat.heartShards[i+1:]...) 
isDeleteShard = true break } diff --git a/consumer/processor.go b/consumer/processor.go new file mode 100644 index 00000000..35b024d9 --- /dev/null +++ b/consumer/processor.go @@ -0,0 +1,19 @@ +package consumerLibrary + +import sls "github.com/aliyun/aliyun-log-go-sdk" + +type Processor interface { + Process(int, *sls.LogGroupList, CheckPointTracker) (string, error) + Shutdown(CheckPointTracker) error +} + +type ProcessFunc func(int, *sls.LogGroupList, CheckPointTracker) (string, error) + +func (processor ProcessFunc) Process(shard int, lgList *sls.LogGroupList, checkpointTracker CheckPointTracker) (string, error) { + return processor(shard, lgList, checkpointTracker) +} + +func (processor ProcessFunc) Shutdown(checkpointTracker CheckPointTracker) error { + // Do nothing + return nil +} diff --git a/consumer/shard_worker.go b/consumer/shard_worker.go index 278b19d7..39a6fa6b 100644 --- a/consumer/shard_worker.go +++ b/consumer/shard_worker.go @@ -1,214 +1,192 @@ package consumerLibrary import ( - "github.com/aliyun/aliyun-log-go-sdk" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" "sync" "time" -) -var shutDownLock sync.RWMutex -var consumeStatusLock sync.RWMutex -var consumerTaskLock sync.RWMutex + sls "github.com/aliyun/aliyun-log-go-sdk" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" +) type ShardConsumerWorker struct { - client *ConsumerClient - consumerCheckPointTracker *ConsumerCheckPointTracker - consumerShutDownFlag bool - lastFetchLogGroupList *sls.LogGroupList - nextFetchCursor string - lastFetchGroupCount int - lastFetchtime int64 - consumerStatus string - process func(shard int, logGroup *sls.LogGroupList) string - shardId int - tempCheckPoint string - isCurrentDone bool - logger log.Logger - lastFetchTimeForForceFlushCpt int64 - isFlushCheckpointDone bool - rollBackCheckpoint string + client *ConsumerClient + consumerCheckPointTracker *DefaultCheckPointTracker + shutdownFlag bool + lastFetchLogGroupList *sls.LogGroupList + nextFetchCursor string + lastFetchGroupCount int + lastFetchTime time.Time + consumerStatus string + processor Processor + shardId int + // TODO: refine to channel + isCurrentDone bool + logger log.Logger + // unix time + lastCheckpointSaveTime time.Time + + taskLock sync.RWMutex + statusLock sync.RWMutex } func (consumer *ShardConsumerWorker) setConsumerStatus(status string) { - consumeStatusLock.Lock() - defer consumeStatusLock.Unlock() + consumer.statusLock.Lock() + defer consumer.statusLock.Unlock() consumer.consumerStatus = status } func (consumer *ShardConsumerWorker) getConsumerStatus() string { - consumeStatusLock.RLock() - defer consumeStatusLock.RUnlock() + consumer.statusLock.RLock() + defer consumer.statusLock.RUnlock() return consumer.consumerStatus } -func initShardConsumerWorker(shardId int, consumerClient *ConsumerClient, do func(shard int, logGroup *sls.LogGroupList) string, logger log.Logger) *ShardConsumerWorker { +func initShardConsumerWorker(shardId int, consumerClient *ConsumerClient, consumerHeartBeat *ConsumerHeartBeat, processor Processor, logger log.Logger) *ShardConsumerWorker { shardConsumeWorker := &ShardConsumerWorker{ - consumerShutDownFlag: false, - process: do, - consumerCheckPointTracker: initConsumerCheckpointTracker(shardId, consumerClient, logger), - client: consumerClient, - consumerStatus: INITIALIZING, - shardId: shardId, - lastFetchtime: 0, - isCurrentDone: true, - isFlushCheckpointDone: true, - logger: logger, - lastFetchTimeForForceFlushCpt: 0, - rollBackCheckpoint: "", + 
shutdownFlag: false, + processor: processor, + consumerCheckPointTracker: initConsumerCheckpointTracker(shardId, consumerClient, consumerHeartBeat, logger), + client: consumerClient, + consumerStatus: INITIALIZING, + shardId: shardId, + lastFetchTime: time.Now(), + isCurrentDone: true, + logger: logger, } return shardConsumeWorker } func (consumer *ShardConsumerWorker) consume() { - if consumer.consumerShutDownFlag { - consumer.setIsFlushCheckpointDoneToFalse() - go func() { - // If the data is not consumed, save the tempCheckPoint to the server - if consumer.getConsumerStatus() == PULL_PROCESSING_DONE { - consumer.consumerCheckPointTracker.tempCheckPoint = consumer.tempCheckPoint - } else if consumer.getConsumerStatus() == CONSUME_PROCESSING { - level.Info(consumer.logger).Log("msg", "Consumption is in progress, waiting for consumption to be completed") - consumer.setIsFlushCheckpointDoneToTrue() - return - } - err := consumer.consumerCheckPointTracker.flushCheckPoint() - if err != nil { - level.Warn(consumer.logger).Log("msg", "Flush checkpoint error,prepare for retry", "error message:", err) - } else { - consumer.setConsumerStatus(SHUTDOWN_COMPLETE) - level.Info(consumer.logger).Log("msg", "shardworker are shut down complete", "shardWorkerId", consumer.shardId) - } - consumer.setIsFlushCheckpointDoneToTrue() - }() - } else if consumer.getConsumerStatus() == INITIALIZING { - consumer.setConsumerIsCurrentDoneToFalse() + if !consumer.isTaskDone() { + return + } + + // start a new task + // initial task / fetch data task / processing task / shutdown task + consumer.setTaskDoneFlag(false) + switch consumer.getConsumerStatus() { + case INITIALIZING: go func() { cursor, err := consumer.consumerInitializeTask() - if err != nil { - consumer.setConsumerStatus(INITIALIZING) - } else { + if err == nil { consumer.nextFetchCursor = cursor - consumer.setConsumerStatus(INITIALIZING_DONE) } - consumer.setConsumerIsCurrentDoneToTrue() + consumer.updateStatus(err == nil) }() - } else if consumer.getConsumerStatus() == INITIALIZING_DONE || consumer.getConsumerStatus() == CONSUME_PROCESSING_DONE { - consumer.setConsumerIsCurrentDoneToFalse() - consumer.setConsumerStatus(PULL_PROCESSING) + case PULLING: go func() { - var isGenerateFetchTask = true - // throttling control, similar as Java's SDK - if consumer.lastFetchGroupCount < 100 { - // The time used here is in milliseconds. - isGenerateFetchTask = (time.Now().UnixNano()/1e6 - consumer.lastFetchtime) > 500 - } else if consumer.lastFetchGroupCount < 500 { - isGenerateFetchTask = (time.Now().UnixNano()/1e6 - consumer.lastFetchtime) > 200 - } else if consumer.lastFetchGroupCount < 1000 { - isGenerateFetchTask = (time.Now().UnixNano()/1e6 - consumer.lastFetchtime) > 50 - } - if isGenerateFetchTask { - consumer.lastFetchtime = time.Now().UnixNano() / 1e6 - // Set the logback cursor. If the logs are not consumed, save the logback cursor to the server. 
- consumer.tempCheckPoint = consumer.nextFetchCursor - - logGroupList, nextCursor, err := consumer.consumerFetchTask() - if err != nil { - consumer.setConsumerStatus(INITIALIZING_DONE) - } else { - consumer.lastFetchLogGroupList = logGroupList - consumer.nextFetchCursor = nextCursor - consumer.consumerCheckPointTracker.setMemoryCheckPoint(consumer.nextFetchCursor) - consumer.lastFetchGroupCount = GetLogGroupCount(consumer.lastFetchLogGroupList) - level.Debug(consumer.logger).Log("shardId", consumer.shardId, "fetch log count", GetLogCount(consumer.lastFetchLogGroupList)) - if consumer.lastFetchGroupCount == 0 { - consumer.lastFetchLogGroupList = nil - } else { - consumer.lastFetchTimeForForceFlushCpt = time.Now().Unix() - } - if time.Now().Unix()-consumer.lastFetchTimeForForceFlushCpt > 30 { - err := consumer.consumerCheckPointTracker.flushCheckPoint() - if err != nil { - level.Warn(consumer.logger).Log("msg", "Failed to save the final checkpoint", "error:", err) - } else { - consumer.lastFetchTimeForForceFlushCpt = 0 - } - - } - consumer.setConsumerStatus(PULL_PROCESSING_DONE) - } - } else { + if !consumer.shouldFetch() { level.Debug(consumer.logger).Log("msg", "Pull Log Current Limitation and Re-Pull Log") - consumer.setConsumerStatus(INITIALIZING_DONE) + consumer.updateStatus(false) + return } - consumer.setConsumerIsCurrentDoneToTrue() + err := consumer.nextFetchTask() + consumer.updateStatus(err == nil && consumer.lastFetchGroupCount > 0) }() - } else if consumer.getConsumerStatus() == PULL_PROCESSING_DONE { - consumer.setConsumerIsCurrentDoneToFalse() - consumer.setConsumerStatus(CONSUME_PROCESSING) + case PROCESSING: go func() { - rollBackCheckpoint := consumer.consumerProcessTask() + rollBackCheckpoint, err := consumer.consumerProcessTask() + if err != nil { + level.Warn(consumer.logger).Log("messge", "process failed", "err", err) + } if rollBackCheckpoint != "" { consumer.nextFetchCursor = rollBackCheckpoint - level.Info(consumer.logger).Log("msg", "Checkpoints set for users have been reset", "shardWorkerId", consumer.shardId, "rollBackCheckpoint", rollBackCheckpoint) + level.Info(consumer.logger).Log( + "msg", "Checkpoints set for users have been reset", + "shardId", consumer.shardId, + "rollBackCheckpoint", rollBackCheckpoint, + ) } - consumer.lastFetchLogGroupList = nil - consumer.setConsumerStatus(CONSUME_PROCESSING_DONE) - consumer.setConsumerIsCurrentDoneToTrue() + consumer.updateStatus(err == nil) }() - } + case SHUTTING_DOWN: + go func() { + err := consumer.processor.Shutdown(consumer.consumerCheckPointTracker) + if err != nil { + level.Error(consumer.logger).Log("msg", "failed to call processor shutdown", "err", err) + consumer.updateStatus(false) + return + } + err = consumer.consumerCheckPointTracker.flushCheckPoint() + if err == nil { + level.Info(consumer.logger).Log("msg", "shard worker status shutdown_complete", "shardWorkerId", consumer.shardId) + } else { + level.Warn(consumer.logger).Log("msg", "failed to flush checkpoint when shutdown", "err", err) + } + + consumer.updateStatus(err == nil) + }() + default: + consumer.setTaskDoneFlag(true) + } } -func (consumer *ShardConsumerWorker) consumerShutDown() { - consumer.consumerShutDownFlag = true - if !consumer.isShutDownComplete() { - if consumer.getIsFlushCheckpointDoneStatus() { - consumer.consume() - } else { - return +func (consumer *ShardConsumerWorker) updateStatus(success bool) { + status := consumer.getConsumerStatus() + if status == SHUTTING_DOWN { + if success { + 
consumer.setConsumerStatus(SHUTDOWN_COMPLETE) + } + } else if consumer.shutdownFlag { + consumer.setConsumerStatus(SHUTTING_DOWN) + } else if success { + switch status { + case PULLING: + consumer.setConsumerStatus(PROCESSING) + case INITIALIZING,PROCESSING: + consumer.setConsumerStatus(PULLING) } } -} -func (consumer *ShardConsumerWorker) isShutDownComplete() bool { - return consumer.getConsumerStatus() == SHUTDOWN_COMPLETE + consumer.setTaskDoneFlag(true) } -func (consumer *ShardConsumerWorker) setConsumerIsCurrentDoneToFalse() { - consumerTaskLock.Lock() - defer consumerTaskLock.Unlock() - consumer.isCurrentDone = false - +func (consumer *ShardConsumerWorker) shouldFetch() bool { + if consumer.lastFetchGroupCount >= 1000 { + return true + } + duration := time.Since(consumer.lastFetchTime) + if consumer.lastFetchGroupCount < 100 { + // The time used here is in milliseconds. + return duration > 500*time.Millisecond + } else if consumer.lastFetchGroupCount < 500 { + return duration > 200*time.Millisecond + } else { // 500 - 1000 + return duration > 50*time.Millisecond + } } -func (consumer *ShardConsumerWorker) setConsumerIsCurrentDoneToTrue() { - consumerTaskLock.Lock() - defer consumerTaskLock.Unlock() - consumer.isCurrentDone = true +func (consumer *ShardConsumerWorker) saveCheckPointIfNeeded() { + if consumer.client.option.AutoCommitDisabled { + return + } + if time.Since(consumer.lastCheckpointSaveTime) > time.Millisecond*time.Duration(consumer.client.option.AutoCommitIntervalInMS) { + consumer.consumerCheckPointTracker.flushCheckPoint() + consumer.lastCheckpointSaveTime = time.Now() + } } -func (consumer *ShardConsumerWorker) getConsumerIsCurrentDoneStatus() bool { - consumerTaskLock.RLock() - defer consumerTaskLock.RUnlock() - return consumer.isCurrentDone +func (consumer *ShardConsumerWorker) consumerShutDown() { + consumer.shutdownFlag = true + if !consumer.isShutDownComplete() { + consumer.consume() + } } -func (consumer *ShardConsumerWorker) setIsFlushCheckpointDoneToFalse() { - shutDownLock.Lock() - defer shutDownLock.Unlock() - consumer.isFlushCheckpointDone = false +func (consumer *ShardConsumerWorker) isShutDownComplete() bool { + return consumer.getConsumerStatus() == SHUTDOWN_COMPLETE } -func (consumer *ShardConsumerWorker) setIsFlushCheckpointDoneToTrue() { - shutDownLock.Lock() - defer shutDownLock.Unlock() - consumer.isFlushCheckpointDone = true +func (consumer *ShardConsumerWorker) setTaskDoneFlag(done bool) { + consumer.taskLock.Lock() + defer consumer.taskLock.Unlock() + consumer.isCurrentDone = done } -func (consumer *ShardConsumerWorker) getIsFlushCheckpointDoneStatus() bool { - shutDownLock.RLock() - defer shutDownLock.RUnlock() - return consumer.isFlushCheckpointDone +func (consumer *ShardConsumerWorker) isTaskDone() bool { + consumer.taskLock.RLock() + defer consumer.taskLock.RUnlock() + return consumer.isCurrentDone } diff --git a/consumer/tasks.go b/consumer/tasks.go index eb6429d5..c2f5512f 100644 --- a/consumer/tasks.go +++ b/consumer/tasks.go @@ -6,17 +6,17 @@ import ( "runtime" "time" - sls "github.com/aliyun/aliyun-log-go-sdk" "github.com/go-kit/kit/log/level" ) func (consumer *ShardConsumerWorker) consumerInitializeTask() (string, error) { + // read checkpoint firstly checkpoint, err := consumer.client.getCheckPoint(consumer.shardId) if err != nil { - return checkpoint, err + return "", err } if checkpoint != "" && err == nil { - consumer.consumerCheckPointTracker.setPersistentCheckPoint(checkpoint) + 
consumer.consumerCheckPointTracker.initCheckPoint(checkpoint) return checkpoint, nil } @@ -38,53 +38,58 @@ func (consumer *ShardConsumerWorker) consumerInitializeTask() (string, error) { cursor, err := consumer.client.getCursor(consumer.shardId, fmt.Sprintf("%v", consumer.client.option.CursorStartTime)) if err != nil { level.Warn(consumer.logger).Log("msg", "get specialCursor error", "shard", consumer.shardId, "error", err) - } return cursor, err } - level.Info(consumer.logger).Log("msg", "CursorPosition setting error, please reset with BEGIN_CURSOR or END_CURSOR or SPECIAL_TIMER_CURSOR") + level.Warn(consumer.logger).Log("msg", "CursorPosition setting error, please reset with BEGIN_CURSOR or END_CURSOR or SPECIAL_TIMER_CURSOR") return "", errors.New("CursorPositionError") } -func (consumer *ShardConsumerWorker) consumerFetchTask() (*sls.LogGroupList, string, error) { - logGroup, next_cursor, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor) - return logGroup, next_cursor, err +func (consumer *ShardConsumerWorker) nextFetchTask() error { + // update last fetch time, for control fetch frequency + consumer.lastFetchTime = time.Now() + + logGroup, nextCursor, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor) + if err != nil { + return err + } + // set cursors user to decide whether to save according to the execution of `process` + consumer.consumerCheckPointTracker.setCurrentCheckPoint(consumer.nextFetchCursor) + consumer.lastFetchLogGroupList = logGroup + consumer.nextFetchCursor = nextCursor + consumer.lastFetchGroupCount = GetLogGroupCount(consumer.lastFetchLogGroupList) + consumer.consumerCheckPointTracker.setNextCursor(consumer.nextFetchCursor) + level.Debug(consumer.logger).Log( + "shardId", consumer.shardId, + "fetch log count", consumer.lastFetchGroupCount, + ) + if consumer.lastFetchGroupCount == 0 { + consumer.lastFetchLogGroupList = nil + // may no new data can be pulled, no process func can trigger checkpoint saving + consumer.saveCheckPointIfNeeded() + } + + return nil } -func (consumer *ShardConsumerWorker) consumerProcessTask() string { +func (consumer *ShardConsumerWorker) consumerProcessTask() (rollBackCheckpoint string, err error) { // If the user's consumption function reports a panic error, it will be captured and retry until sucessed. 
defer func() { if r := recover(); r != nil { stackBuf := make([]byte, 1<<16) - runtime.Stack(stackBuf, false) - level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r, "stack", string(stackBuf)) - for { - if consumer.consumerRetryProcessTask() { - break - } else { - time.Sleep(time.Second * 2) - } - } + n := runtime.Stack(stackBuf, false) + level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r, "stack", stackBuf[:n]) + err = fmt.Errorf("get a panic when process: %v", r) } }() if consumer.lastFetchLogGroupList != nil { - consumer.rollBackCheckpoint = consumer.process(consumer.shardId, consumer.lastFetchLogGroupList) - consumer.consumerCheckPointTracker.flushCheck() - } - return consumer.rollBackCheckpoint -} - -func (consumer *ShardConsumerWorker) consumerRetryProcessTask() bool { - level.Info(consumer.logger).Log("msg", "Start retrying the process function") - defer func() { - if r := recover(); r != nil { - stackBuf := make([]byte, 1<<16) - runtime.Stack(stackBuf, false) - level.Error(consumer.logger).Log("msg", "get panic in your process function", "error", r, "stack", string(stackBuf)) + rollBackCheckpoint, err = consumer.processor.Process(consumer.shardId, consumer.lastFetchLogGroupList, consumer.consumerCheckPointTracker) + consumer.saveCheckPointIfNeeded() + if err != nil { + return } - }() - consumer.rollBackCheckpoint = consumer.process(consumer.shardId, consumer.lastFetchLogGroupList) - consumer.consumerCheckPointTracker.flushCheck() - return true + consumer.lastFetchLogGroupList = nil + } + return } diff --git a/consumer/util.go b/consumer/util.go index 17669cb9..cff93a8c 100644 --- a/consumer/util.go +++ b/consumer/util.go @@ -1,9 +1,10 @@ package consumerLibrary import ( - "github.com/aliyun/aliyun-log-go-sdk" "reflect" "time" + + sls "github.com/aliyun/aliyun-log-go-sdk" ) // List removal of duplicate elements diff --git a/consumer/worker.go b/consumer/worker.go index 5cfa59f3..294ccbe6 100644 --- a/consumer/worker.go +++ b/consumer/worker.go @@ -13,16 +13,45 @@ import ( ) type ConsumerWorker struct { - consumerHeatBeat *ConsumerHeatBeat + consumerHeatBeat *ConsumerHeartBeat client *ConsumerClient workerShutDownFlag *atomic.Bool shardConsumer sync.Map // map[int]*ShardConsumerWorker - do func(shard int, logGroup *sls.LogGroupList) string + processor Processor waitGroup sync.WaitGroup Logger log.Logger } +// depreciated: this old logic is to automatically save to memory, and then commit at a fixed time +// we highly recommend you to use InitConsumerWorkerWithCheckpointTracker func InitConsumerWorker(option LogHubConfig, do func(int, *sls.LogGroupList) string) *ConsumerWorker { + if option.AutoCommitDisabled { + panic("auto commit already disabled, sdk will not save any checkpoint, " + + "please use InitConsumerWorkerWithCheckpointTracker or set AutoCommitDisabled to false") + } + return InitConsumerWorkerWithProcessor( + option, + ProcessFunc(func(shardId int, logGroupList *sls.LogGroupList, checkpointTracker CheckPointTracker) (string, error) { + cursor := do(shardId, logGroupList) + // keep the original logic + // if cursor is not empty, we don't save, + if cursor == "" { + checkpointTracker.SaveCheckPoint(false) + } + return cursor, nil + }), + ) +} + +// InitConsumerWorkerWithCheckpointTracker +// please note that you need to save after the process is successful, +func InitConsumerWorkerWithCheckpointTracker(option LogHubConfig, do func(int, *sls.LogGroupList, CheckPointTracker) (string, error)) 
*ConsumerWorker { + return InitConsumerWorkerWithProcessor(option, ProcessFunc(do)) +} + +// InitConsumerWorkerWithProcessor +// you need save checkpoint by yourself and can do something after consumer shutdown +func InitConsumerWorkerWithProcessor(option LogHubConfig, processor Processor) *ConsumerWorker { logger := logConfig(option) consumerClient := initConsumerClient(option, logger) consumerHeatBeat := initConsumerHeatBeat(consumerClient, logger) @@ -31,8 +60,8 @@ func InitConsumerWorker(option LogHubConfig, do func(int, *sls.LogGroupList) str client: consumerClient, workerShutDownFlag: atomic.NewBool(false), //shardConsumer: make(map[int]*ShardConsumerWorker), - do: do, - Logger: logger, + processor: processor, + Logger: logger, } if err := consumerClient.createConsumerGroup(); err != nil { level.Error(consumerWorker.Logger).Log( @@ -69,11 +98,7 @@ func (consumerWorker *ConsumerWorker) run() { break } shardConsumer := consumerWorker.getShardConsumer(shard) - if shardConsumer.getConsumerIsCurrentDoneStatus() { - shardConsumer.consume() - } else { - continue - } + shardConsumer.consume() } consumerWorker.cleanShardConsumer(heldShards) TimeToSleepInMillsecond(consumerWorker.client.option.DataFetchIntervalInMs, lastFetchTime, consumerWorker.workerShutDownFlag.Load()) @@ -93,7 +118,7 @@ func (consumerWorker *ConsumerWorker) shutDownAndWait() { consumer := value.(*ShardConsumerWorker) if !consumer.isShutDownComplete() { consumer.consumerShutDown() - } else if consumer.isShutDownComplete() { + } else { consumerWorker.shardConsumer.Delete(key) } return true @@ -111,7 +136,7 @@ func (consumerWorker *ConsumerWorker) getShardConsumer(shardId int) *ShardConsum if ok { return consumer.(*ShardConsumerWorker) } - consumerIns := initShardConsumerWorker(shardId, consumerWorker.client, consumerWorker.do, consumerWorker.Logger) + consumerIns := initShardConsumerWorker(shardId, consumerWorker.client, consumerWorker.consumerHeatBeat, consumerWorker.processor, consumerWorker.Logger) consumerWorker.shardConsumer.Store(shardId, consumerIns) return consumerIns diff --git a/example/consumer/copy_data/copy_data.go b/example/consumer/copy_data/copy_data.go index 3b163007..f692a3fd 100644 --- a/example/consumer/copy_data/copy_data.go +++ b/example/consumer/copy_data/copy_data.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "os/signal" + "syscall" sls "github.com/aliyun/aliyun-log-go-sdk" consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer" @@ -46,9 +47,9 @@ func main() { if err != nil { fmt.Println(err) } - consumerWorker := consumerLibrary.InitConsumerWorker(option, process) + consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process) ch := make(chan os.Signal) - signal.Notify(ch, os.Kill, os.Interrupt) + signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT) consumerWorker.Start() if _, ok := <-ch; ok { level.Info(consumerWorker.Logger).Log("msg", "get stop signal, start to stop consumer worker", "consumer worker name", option.ConsumerName) @@ -56,13 +57,16 @@ func main() { } } -func process(shardId int, logGroupList *sls.LogGroupList) string { +func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) { for _, logGroup := range logGroupList.LogGroups { err := client.PutLogs(option.Project, "copy-logstore", logGroup) if err != nil { fmt.Println(err) + // may lead to partial success + return "", err } } fmt.Printf("shardId %v processing works sucess\n", shardId) - return "" + checkpointTracker.SaveCheckPoint(false) 
+ return "", nil } diff --git a/example/consumer/demo/simple_demo.go b/example/consumer/demo/simple_demo.go index 04b318b1..fc383241 100644 --- a/example/consumer/demo/simple_demo.go +++ b/example/consumer/demo/simple_demo.go @@ -28,9 +28,9 @@ func main() { CursorPosition: consumerLibrary.END_CURSOR, } - consumerWorker := consumerLibrary.InitConsumerWorker(option, process) - ch := make(chan os.Signal) - signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1, syscall.SIGUSR2) + consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process) + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) consumerWorker.Start() if _, ok := <-ch; ok { level.Info(consumerWorker.Logger).Log("msg", "get stop signal, start to stop consumer worker", "consumer worker name", option.ConsumerName) @@ -40,7 +40,8 @@ func main() { // Fill in your consumption logic here, and be careful not to change the parameters of the function and the return value, // otherwise you will report errors. -func process(shardId int, logGroupList *sls.LogGroupList) string { +func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) { fmt.Println(shardId, logGroupList) - return "" + checkpointTracker.SaveCheckPoint(false) + return "", nil } diff --git a/example/consumer/reset_checkpoint_demo.go b/example/consumer/reset_checkpoint_demo.go index 56aac993..323a4c71 100644 --- a/example/consumer/reset_checkpoint_demo.go +++ b/example/consumer/reset_checkpoint_demo.go @@ -2,13 +2,14 @@ package main import ( "fmt" - "github.com/aliyun/aliyun-log-go-sdk" - "github.com/aliyun/aliyun-log-go-sdk/consumer" - "github.com/go-kit/kit/log/level" "os" "os/signal" "syscall" "time" + + sls "github.com/aliyun/aliyun-log-go-sdk" + consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer" + "github.com/go-kit/kit/log/level" ) // README : @@ -19,7 +20,6 @@ import ( // shardMap 里面,就重置消费位点为当前时间的cursor, 从当前时间进行消费,不在消费存量数据。 // Note: 使用该demo时,消费组必须是之前创建过并存在的。 - func main() { option := consumerLibrary.LogHubConfig{ Endpoint: "", @@ -34,12 +34,12 @@ func main() { CursorPosition: consumerLibrary.BEGIN_CURSOR, } err := UpdateConsumerGroupCheckPoint(option) - if err != nil{ + if err != nil { fmt.Println(err) return } - consumerWorker := consumerLibrary.InitConsumerWorker(option, process) - ch := make(chan os.Signal) + consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process) + ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1, syscall.SIGUSR2) consumerWorker.Start() if _, ok := <-ch; ok { @@ -50,34 +50,38 @@ func main() { // Fill in your consumption logic here, and be careful not to change the parameters of the function and the return value, // otherwise you will report errors. 
-func process(shardId int, logGroupList *sls.LogGroupList) string { - // 这里填入自己的消费逻辑 +func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) { + // 这里填入自己的消费处理逻辑 和 cpt保存逻辑 fmt.Println(logGroupList) - return "" + return "", nil } -func updateCheckpoint(config consumerLibrary.LogHubConfig,client sls.Client,shardId int) error { +func updateCheckpoint(config consumerLibrary.LogHubConfig, client *sls.Client, shardId int) error { from := fmt.Sprintf("%d", time.Now().Unix()) - cursor, err := client.GetCursor(config.Project,config.Logstore, shardId, from) + cursor, err := client.GetCursor(config.Project, config.Logstore, shardId, from) if err != nil { fmt.Println(err) } - return client.UpdateCheckpoint(config.Project,config.Logstore,config.ConsumerGroupName,"",shardId, cursor,true) + return client.UpdateCheckpoint(config.Project, config.Logstore, config.ConsumerGroupName, "", shardId, cursor, true) } func UpdateConsumerGroupCheckPoint(config consumerLibrary.LogHubConfig) error { - client := sls.Client{Endpoint:config.Endpoint, AccessKeyID:config.AccessKeyID, AccessKeySecret:config.AccessKeySecret} - shards, err := client.ListShards(config.Project,config.Logstore) + client := &sls.Client{ + Endpoint: config.Endpoint, + AccessKeyID: config.AccessKeyID, + AccessKeySecret: config.AccessKeySecret, + } + shards, err := client.ListShards(config.Project, config.Logstore) if err != nil { return err - }else { - for _,v := range shards { + } else { + for _, v := range shards { err = updateCheckpoint(config, client, v.ShardID) - if err != nil{ + if err != nil { return err } } } return nil -} \ No newline at end of file +}
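
As a quick reference for the workflow described in the README portion of this diff (configure `LogHubConfig`, implement `process`, start the worker, stop on an OS signal), the following is a minimal end-to-end sketch of the new `InitConsumerWorkerWithCheckpointTracker` API, modeled on the updated simple_demo.go above. The endpoint, project, logstore and credential values are placeholders to be filled in, not values taken from this change.

```
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	sls "github.com/aliyun/aliyun-log-go-sdk"
	consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
	"github.com/go-kit/kit/log/level"
)

// process is called per shard with the fetched log groups.
// SaveCheckPoint(false) marks the batch as processed; the SDK commits it
// on its auto-commit interval (unless AutoCommitDisabled is set).
func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) {
	fmt.Println(shardId, logGroupList)
	checkpointTracker.SaveCheckPoint(false)
	// Return "" unless the cursor needs to be rolled back.
	return "", nil
}

func main() {
	option := consumerLibrary.LogHubConfig{
		Endpoint:          "", // fill in your SLS endpoint
		AccessKeyID:       "", // fill in
		AccessKeySecret:   "", // fill in
		Project:           "", // fill in
		Logstore:          "", // fill in
		ConsumerGroupName: "", // fill in
		ConsumerName:      "", // fill in, must be unique within the consumer group
		CursorPosition:    consumerLibrary.END_CURSOR,
	}

	consumerWorker := consumerLibrary.InitConsumerWorkerWithCheckpointTracker(option, process)
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
	consumerWorker.Start()
	if _, ok := <-ch; ok {
		level.Info(consumerWorker.Logger).Log("msg", "get stop signal, stopping consumer worker", "consumer", option.ConsumerName)
		consumerWorker.StopAndWait()
	}
}
```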