[fix] consumer map race #130

Merged: 1 commit, Jul 14, 2021
80 changes: 46 additions & 34 deletions consumer/worker.go

@@ -5,7 +5,7 @@ import (
     "sync"
     "time"
 
-    "github.com/aliyun/aliyun-log-go-sdk"
+    sls "github.com/aliyun/aliyun-log-go-sdk"
     "github.com/go-kit/kit/log"
     "github.com/go-kit/kit/log/level"
     "go.uber.org/atomic"

@@ -16,7 +16,7 @@ type ConsumerWorker struct {
     consumerHeatBeat   *ConsumerHeatBeat
     client             *ConsumerClient
     workerShutDownFlag *atomic.Bool
-    shardConsumer      map[int]*ShardConsumerWorker
+    shardConsumer      sync.Map // map[int]*ShardConsumerWorker
     do                 func(shard int, logGroup *sls.LogGroupList) string
     waitGroup          sync.WaitGroup
     Logger             log.Logger

@@ -30,9 +30,9 @@ func InitConsumerWorker(option LogHubConfig, do func(int, *sls.LogGroupList) str
         consumerHeatBeat:   consumerHeatBeat,
         client:             consumerClient,
         workerShutDownFlag: atomic.NewBool(false),
-        shardConsumer:      make(map[int]*ShardConsumerWorker),
-        do:                 do,
-        Logger:             logger,
+        //shardConsumer: make(map[int]*ShardConsumerWorker),
+        do:     do,
+        Logger: logger,
     }
     consumerClient.createConsumerGroup()
     return consumerWorker

@@ -82,50 +82,62 @@ func (consumerWorker *ConsumerWorker) run() {
 func (consumerWorker *ConsumerWorker) shutDownAndWait() {
     for {
         time.Sleep(500 * time.Millisecond)
-        for shard, consumer := range consumerWorker.shardConsumer {
-            if !consumer.isShutDownComplete() {
-                consumer.consumerShutDown()
-            } else if consumer.isShutDownComplete() {
-                delete(consumerWorker.shardConsumer, shard)
-            }
-        }
-        if len(consumerWorker.shardConsumer) == 0 {
+        count := 0
+        consumerWorker.shardConsumer.Range(
+            func(key, value interface{}) bool {
+                count++
+                consumer := value.(*ShardConsumerWorker)
+                if !consumer.isShutDownComplete() {
+                    consumer.consumerShutDown()
+                } else if consumer.isShutDownComplete() {
+                    consumerWorker.shardConsumer.Delete(key)
+                }
+                return true
+            },
+        )
+        if count == 0 {
             break
         }
     }
 
 }

 func (consumerWorker *ConsumerWorker) getShardConsumer(shardId int) *ShardConsumerWorker {
-    consumer := consumerWorker.shardConsumer[shardId]
-    if consumer != nil {
-        return consumer
+    consumer, ok := consumerWorker.shardConsumer.Load(shardId)
+    if ok {
+        return consumer.(*ShardConsumerWorker)
     }
-    consumer = initShardConsumerWorker(shardId, consumerWorker.client, consumerWorker.do, consumerWorker.Logger)
-    consumerWorker.shardConsumer[shardId] = consumer
-    return consumer
+    consumerIns := initShardConsumerWorker(shardId, consumerWorker.client, consumerWorker.do, consumerWorker.Logger)
+    consumerWorker.shardConsumer.Store(shardId, consumerIns)
+    return consumerIns
 
 }

 func (consumerWorker *ConsumerWorker) cleanShardConsumer(owned_shards []int) {
-    for shard, consumer := range consumerWorker.shardConsumer {
-
-        if !Contain(shard, owned_shards) {
-            level.Info(consumerWorker.Logger).Log("msg", "try to call shut down for unassigned consumer shard", "shardId", shard)
-            consumer.consumerShutDown()
-            level.Info(consumerWorker.Logger).Log("msg", "Complete call shut down for unassigned consumer shard", "shardId", shard)
-        }
-
-        if consumer.isShutDownComplete() {
-            isDeleteShard := consumerWorker.consumerHeatBeat.removeHeartShard(shard)
-            if isDeleteShard {
-                level.Info(consumerWorker.Logger).Log("msg", "Remove an assigned consumer shard", "shardId", shard)
-                delete(consumerWorker.shardConsumer, shard)
-            } else {
-                level.Info(consumerWorker.Logger).Log("msg", "Remove an assigned consumer shard failed", "shardId", shard)
-            }
-        }
-    }
+    consumerWorker.shardConsumer.Range(
+        func(key, value interface{}) bool {
+            shard := key.(int)
+            consumer := value.(*ShardConsumerWorker)
+
+            if !Contain(shard, owned_shards) {
+                level.Info(consumerWorker.Logger).Log("msg", "try to call shut down for unassigned consumer shard", "shardId", shard)
+                consumer.consumerShutDown()
+                level.Info(consumerWorker.Logger).Log("msg", "Complete call shut down for unassigned consumer shard", "shardId", shard)
+            }
+
+            if consumer.isShutDownComplete() {
+                isDeleteShard := consumerWorker.consumerHeatBeat.removeHeartShard(shard)
+                if isDeleteShard {
+                    level.Info(consumerWorker.Logger).Log("msg", "Remove an assigned consumer shard", "shardId", shard)
+                    consumerWorker.shardConsumer.Delete(shard)
+                } else {
+                    level.Info(consumerWorker.Logger).Log("msg", "Remove an assigned consumer shard failed", "shardId", shard)
+                }
+            }
+            return true
+        },
+    )
 }

 // This function is used to initialize the global log configuration
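The race this PR fixes lives in `shardConsumer`: the plain `map[int]*ShardConsumerWorker` was read and written from more than one goroutine (the run loop plus the shutdown and cleanup paths), and Go maps are not safe for concurrent use, so the race detector flags the pattern and concurrent writes can crash the process. The diff above swaps the field to a `sync.Map` and routes every access through `Load`, `Store`, `Range`, and `Delete`. Below is a minimal standalone sketch of that pattern, not the SDK's code; `registry` and `worker` are hypothetical stand-ins.

```go
// Minimal sketch, assuming hypothetical "registry"/"worker" types, of the
// Load/Store/Range/Delete pattern the diff above adopts for the shard map.
package main

import (
	"fmt"
	"sync"
)

type worker struct{ shard int }

type registry struct {
	shards sync.Map // map[int]*worker, safe for concurrent use
}

// get returns the worker for a shard, creating it on first use,
// mirroring the Load-then-Store shape of getShardConsumer.
func (r *registry) get(shard int) *worker {
	if v, ok := r.shards.Load(shard); ok {
		return v.(*worker)
	}
	w := &worker{shard: shard}
	r.shards.Store(shard, w)
	return w
}

func main() {
	r := &registry{}
	var wg sync.WaitGroup
	// With a plain map[int]*worker this concurrent read/write/delete pattern
	// is a data race (`go run -race` reports it); with sync.Map it is defined.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for shard := 0; shard < 16; shard++ {
				r.get(shard)
				if shard%4 == 0 {
					r.shards.Delete(shard)
				}
			}
		}()
	}
	wg.Wait()

	remaining := 0
	r.shards.Range(func(_, _ interface{}) bool { remaining++; return true })
	fmt.Println("workers remaining:", remaining)
}
```

One nuance the sketch keeps from the diff: `getShardConsumer` still does a `Load` followed by a separate `Store`, so two goroutines could in principle both create a worker for the same shard; `sync.Map.LoadOrStore` would make that creation step atomic if that window ever matters.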
15 changes: 9 additions & 6 deletions producer/io_thread_pool.go

@@ -36,6 +36,9 @@ func (threadPool *IoThreadPool) addTask(batch *ProducerBatch) {
 func (threadPool *IoThreadPool) popTask() *ProducerBatch {
     defer threadPool.lock.Unlock()
     threadPool.lock.Lock()
+    if threadPool.queue.Len() <= 0 {
+        return nil
+    }
     ele := threadPool.queue.Front()
     threadPool.queue.Remove(ele)
     return ele.Value.(*ProducerBatch)

@@ -50,12 +53,12 @@ func (threadPool *IoThreadPool) hasTask() bool {
 func (threadPool *IoThreadPool) start(ioWorkerWaitGroup *sync.WaitGroup, ioThreadPoolwait *sync.WaitGroup) {
     defer ioThreadPoolwait.Done()
     for {
-        if threadPool.hasTask() {
-            select {
-            case threadPool.ioworker.maxIoWorker <- 1:
-                ioWorkerWaitGroup.Add(1)
-                go threadPool.ioworker.sendToServer(threadPool.popTask(), ioWorkerWaitGroup)
-            }
+        if task := threadPool.popTask(); task != nil {
+            threadPool.ioworker.startSendTask(ioWorkerWaitGroup)
+            go func(producerBatch *ProducerBatch) {
+                defer threadPool.ioworker.closeSendTask(ioWorkerWaitGroup)
+                threadPool.ioworker.sendToServer(producerBatch)
+            }(task)
         } else {
             if !threadPool.threadPoolShutDownFlag.Load() {
                 time.Sleep(100 * time.Millisecond)
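On the producer side, `popTask` previously assumed the queue was non-empty: `queue.Front()` returns nil on an empty list, and `ele.Value` would then panic. The new guard returns nil instead, and the `start` loop now pops first, reserves a worker slot and bumps the WaitGroup via `startSendTask` before spawning the goroutine, and releases both through a deferred `closeSendTask` inside it. A hedged sketch of that dispatch shape follows, with hypothetical names rather than the producer's API.

```go
// Illustrative sketch (hypothetical "pool" type, not the producer's API):
// pop under the lock and bail out on an empty queue, reserve a slot and bump
// the WaitGroup before launching the goroutine, release both in a defer.
package main

import (
	"container/list"
	"fmt"
	"sync"
)

type pool struct {
	mu    sync.Mutex
	queue *list.List
	slots chan struct{} // bounded concurrency, like maxIoWorker
}

// pop returns nil when the queue is empty instead of dereferencing a nil element.
func (p *pool) pop() interface{} {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.queue.Len() == 0 {
		return nil
	}
	front := p.queue.Front()
	p.queue.Remove(front)
	return front.Value
}

func (p *pool) run(wg *sync.WaitGroup, send func(interface{})) {
	for {
		task := p.pop()
		if task == nil {
			return // queue drained; the real loop sleeps or checks its shutdown flag here
		}
		p.slots <- struct{}{} // reserve a slot before spawning
		wg.Add(1)             // Add must happen before the goroutine starts
		go func(t interface{}) {
			defer func() { <-p.slots; wg.Done() }() // always release slot and counter
			send(t)
		}(task)
	}
}

func main() {
	p := &pool{queue: list.New(), slots: make(chan struct{}, 2)}
	for i := 0; i < 5; i++ {
		p.queue.PushBack(i)
	}
	var wg sync.WaitGroup
	p.run(&wg, func(t interface{}) { fmt.Println("sent", t) })
	wg.Wait()
}
```

Doing the `Add(1)` on the dispatching goroutine, before `go`, is what keeps a concurrent `Wait()` from returning while a send is still about to start.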
17 changes: 9 additions & 8 deletions producer/io_worker.go

@@ -41,14 +41,9 @@ func initIoWorker(client sls.ClientInterface, retryQueue *RetryQueue, logger log
     }
 }
 
-func (ioWorker *IoWorker) sendToServer(producerBatch *ProducerBatch, ioWorkerWaitGroup *sync.WaitGroup) {
-    if producerBatch == nil || ioWorkerWaitGroup == nil {
-        return
-    }
+func (ioWorker *IoWorker) sendToServer(producerBatch *ProducerBatch) {
     level.Debug(ioWorker.logger).Log("msg", "ioworker send data to server")
-    defer ioWorker.closeSendTask(ioWorkerWaitGroup)
     var err error
-    atomic.AddInt64(&ioWorker.taskCount, 1)
     if producerBatch.shardHash != nil {
         err = ioWorker.client.PostLogStoreLogs(producerBatch.getProject(), producerBatch.getLogstore(), producerBatch.logGroup, producerBatch.getShardHash())
     } else {

@@ -116,9 +111,15 @@ func (ioWorker *IoWorker) addErrorMessageToBatchAttempt(producerBatch *ProducerB
 }
 
 func (ioWorker *IoWorker) closeSendTask(ioWorkerWaitGroup *sync.WaitGroup) {
-    ioWorkerWaitGroup.Done()
-    atomic.AddInt64(&ioWorker.taskCount, -1)
     <-ioWorker.maxIoWorker
+    atomic.AddInt64(&ioWorker.taskCount, -1)
+    ioWorkerWaitGroup.Done()
+}
+
+func (ioWorker *IoWorker) startSendTask(ioWorkerWaitGroup *sync.WaitGroup) {
+    atomic.AddInt64(&ioWorker.taskCount, 1)
+    ioWorker.maxIoWorker <- 1
+    ioWorkerWaitGroup.Add(1)
 }
 
 func (ioWorker *IoWorker) excuteFailedCallback(producerBatch *ProducerBatch) {
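`startSendTask` and `closeSendTask` now form a symmetric pair around each send: the in-flight counter, the `maxIoWorker` slot, and the WaitGroup are all claimed on the dispatching goroutine before the send goroutine starts, and `closeSendTask` undoes them with `Done()` last, so once `Wait()` returns the slot and counter have already been settled. A small sketch of that begin/end pairing, assuming hypothetical `sender`, `begin`, and `end` names:

```go
// Hedged sketch of the begin/end pairing that startSendTask/closeSendTask
// establish; the names here are hypothetical, not the SDK's API.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type sender struct {
	inFlight int64         // like ioWorker.taskCount
	slots    chan struct{} // like maxIoWorker
}

func (s *sender) begin(wg *sync.WaitGroup) {
	atomic.AddInt64(&s.inFlight, 1)
	s.slots <- struct{}{}
	wg.Add(1) // before the send goroutine is spawned
}

func (s *sender) end(wg *sync.WaitGroup) {
	<-s.slots
	atomic.AddInt64(&s.inFlight, -1)
	wg.Done() // last, so Wait() returning implies slot and counter are released
}

func main() {
	s := &sender{slots: make(chan struct{}, 2)}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		s.begin(&wg)
		go func(n int) {
			defer s.end(&wg)
			fmt.Println("send batch", n)
		}(i)
	}
	wg.Wait()
	fmt.Println("in flight after wait:", atomic.LoadInt64(&s.inFlight))
}
```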