diff --git a/global/nats/manager/manager_test.go b/global/nats/manager/manager_test.go index e8fa82ac..35720d5d 100644 --- a/global/nats/manager/manager_test.go +++ b/global/nats/manager/manager_test.go @@ -5,27 +5,41 @@ import ( "errors" "reflect" "strconv" - "sync/atomic" "testing" "time" + "github.com/ZiRunHua/LeapLedger/util/rand" "github.com/google/uuid" + "golang.org/x/sync/errgroup" ) func init() { UpdateTestBackOff() } func TestSubscribeAndPublish(t *testing.T) { - var task = Task(t.Name() + uuid.NewString()) - taskList := []Task{task + "_1", task + "_2", task + "_3"} - var count atomic.Int32 - for _, name := range taskList { + var taskPrefix = Task(t.Name() + uuid.NewString()) + taskList := []struct { + Task Task + Msg []byte + }{ + { + taskPrefix + "_1", []byte("msg1"), + }, + { + taskPrefix + "_2", []byte("123"), + }, + { + taskPrefix + "_3", []byte("msg3"), + }, + } + msgChan := make(chan []byte, len(taskList)) + for _, task := range taskList { taskManage.Subscribe( - name, func(payload []byte) error { - if !reflect.DeepEqual(payload, []byte("1")) { + task.Task, func(payload []byte) error { + if !reflect.DeepEqual(payload, task.Msg) { t.Fail() } - count.Add(1) + msgChan <- payload return nil }, ) @@ -33,15 +47,20 @@ func TestSubscribeAndPublish(t *testing.T) { t.Run( "publish", func(t *testing.T) { t.Parallel() - time.Sleep(time.Second * 3) - for _, name := range taskList { - taskManage.Publish(name, []byte("1")) + for _, task := range taskList { + taskManage.Publish(task.Task, task.Msg) } - time.Sleep(time.Second * 10) - if count.Load() != int32(len(taskList)) { - t.Fatal("count not is ", count.Load()) + count := 0 + for true { + count++ + if count == len(taskList) { + if len(msgChan) == 0 { + break + } else { + t.Fatal() + } + } } - t.Log("count is:", count.Load()) }, ) } @@ -54,145 +73,255 @@ func TestEventSubscribeAndPublish(t *testing.T) { for i := 1; i <= 3; i++ { taskMap[taskPrefix+Task("_"+strconv.Itoa(i))] = false } - + msgChan := make(chan Task) for task := range taskMap { taskManage.Subscribe( task, func(payload []byte) error { - taskMap[task] = true + msgChan <- task return nil }, ) eventManage.Subscribe(event, task, func(eventData []byte) ([]byte, error) { return eventData, nil }) } t.Run( - "publish", func(t *testing.T) { + "event publish", func(t *testing.T) { t.Parallel() - time.Sleep(time.Second * 3) eventManage.Publish(event, []byte("test")) - time.Sleep(time.Second * 30) - for task, b := range taskMap { - if !b { - t.Fatal(task, "fail") + count := 0 + for true { + taskMap[<-msgChan] = true + count++ + if count == len(taskMap) { + if len(msgChan) == 0 { + for task, result := range taskMap { + if !result { + t.Fatal(task, " not trigger") + } + } + break + } else { + t.Fatal() + } } } - t.Log("task trigger info", taskMap) }, ) } func TestDql(t *testing.T) { - taskM := taskManage - var task = Task(t.Name()) - var retryCount = 1 - taskM.Subscribe( + var task, event = Task(t.Name() + rand.String(12)), Event(t.Name() + rand.String(12)) + msgChan, msg := make(chan []byte), []byte(rand.String(12)) + taskManage.Subscribe( task, func(payload []byte) error { - retryCount++ + msgChan <- payload return errors.New("test dql") }, ) - t.Run( - "publish", func(t *testing.T) { - t.Parallel() - time.Sleep(time.Second * 3) - taskM.Publish(task, []byte("test")) - var backOffTime time.Duration - for _, duration := range backOff { - backOffTime += duration + taskManage.Publish(task, msg) + count := 0 + for true { + <-msgChan + count++ + if count == len(backOff)+1 { + if len(msgChan) == 0 { + break + } else { + t.Fatal("try too many times :", count+len(msgChan)) } - time.Sleep(time.Second*3 + backOffTime) - batch, err := dlqManage.consumer.Fetch(10) - if err != nil { - t.Error(err) - } - var processCount int - for msg := range batch.Messages() { - processCount++ - err = msg.Ack() - if err != nil { - t.Error(err) - } - } - t.Log("retry count", retryCount, "process count", processCount) - }, - ) -} - -func TestDqlRepublish(t *testing.T) { - var task = Task(t.Name() + uuid.NewString()) - var num = 3 - var retryCount atomic.Uint32 + } + } taskManage.Subscribe( task, func(payload []byte) error { - retryCount.Add(1) - return errors.New("test dql") + msgChan <- payload + return nil }, ) + time.Sleep(backOff[len(backOff)-1]) + _, err := dlqManage.RepublishBatch(1, context.TODO()) + if err != nil { + t.Fatal(err) + } + republishPayload := <-msgChan + if !reflect.DeepEqual(republishPayload, msg) { + t.Fatal("reConsume payload not compare", string(republishPayload), msg) + } t.Run( - "republish die msg", func(t *testing.T) { - time.Sleep(time.Second) - for i := 0; i < num; i++ { - taskManage.Publish(task, []byte("test_"+strconv.FormatInt(int64(i), 10))) - } - var backOffTime time.Duration - for _, duration := range backOff { - backOffTime += duration + "event and task dql", func(t *testing.T) { + type TestEvent struct { + Event Event + EventData []byte + TriggerTasks []struct { + task Task + fetchTaskData func(eventData []byte) ([]byte, error) + retryCount int + } + ExecConsumers []struct { + name string + retryCount int + } } - t.Log("sleep", backOffTime) - time.Sleep(time.Second*3 + backOffTime) - var count atomic.Int32 - count.Add(int32(num)) - taskManage.Subscribe( - task, func(payload []byte) error { - count.Add(-1) - return nil + var testEvent = TestEvent{ + Event: event + Event(rand.String(12)), + EventData: []byte("event_data_" + rand.String(12)), + TriggerTasks: []struct { + task Task + fetchTaskData func(eventData []byte) ([]byte, error) + retryCount int + }{ + { + task: task + "_0", + fetchTaskData: func(eventData []byte) ([]byte, error) { return eventData, nil }, + retryCount: 0, + }, + { + task: task + "_1", + fetchTaskData: func(eventData []byte) ([]byte, error) { return eventData, nil }, + retryCount: (len(backOff)) / 2, + }, + { + task: task + "_2", + fetchTaskData: func(eventData []byte) ([]byte, error) { return eventData, nil }, + retryCount: len(backOff) + 1, + }, + }, + ExecConsumers: []struct { + name string + retryCount int + }{ + { + name: string(task) + "_4", + retryCount: len(backOff) / 2, + }, + { + name: string(task) + "_5", + retryCount: len(backOff) + 1, + }, + { + name: string(task) + "_3", + retryCount: 0, + }, }, - ) - time.Sleep(time.Second * 3) - t.Log("retry count", retryCount.Load()) - _, err := dlqManage.RepublishBatch(num*10, context.TODO()) - - if err != nil { - t.Fatal(err) } - time.Sleep(time.Second * 30) - if 0 != count.Load() { - t.Fatal("die msg Remaining:", count.Load()) + + msgChan = make(chan []byte, 10) + var needToReConsume, noNeedToReConsume int + for _, triggerTask := range testEvent.TriggerTasks { + retryCount := 0 + taskManage.Subscribe( + triggerTask.task, func(payload []byte) error { + if retryCount == triggerTask.retryCount { + t.Log(triggerTask.task, "finish") + msgChan <- payload + return nil + } else if retryCount > triggerTask.retryCount { + t.Fatal( + "too many retries or re-consuming errors,task:", triggerTask.task, retryCount, ">", + triggerTask.retryCount, + ) + return nil + } + retryCount++ + return errors.New("test dql") + }, + ) + eventManage.Subscribe( + testEvent.Event, triggerTask.task, func(eventData []byte) ([]byte, error) { return eventData, nil }, + ) + if triggerTask.retryCount >= len(backOff)+1 { + needToReConsume++ + } else { + noNeedToReConsume++ + } } - }, - ) -} -func BenchmarkDql(b *testing.B) { - taskM := taskManage - var task Task = Task(uuid.NewString()) - var count = b.N - taskM.Subscribe( - task, func(payload []byte) error { - return errors.New("test dql") - }, - ) - time.Sleep(time.Second * 5) - for i := 0; i < b.N; i++ { - taskM.Publish(task, []byte("test_"+strconv.FormatInt(int64(i), 10))) - } - time.Sleep(time.Second * 20) - b.Run( - "republish", func(b *testing.B) { - taskM.Subscribe( - task, func(payload []byte) error { - count-- - return nil + for _, execConsumer := range testEvent.ExecConsumers { + retryCount := 0 + eventManage.SubscribeToNewConsumer( + testEvent.Event, + execConsumer.name, func(payload []byte) error { + if retryCount == execConsumer.retryCount { + t.Log(execConsumer.name, "finish") + msgChan <- payload + return nil + } else if retryCount > execConsumer.retryCount { + t.Fatal( + "too many retries or re-consuming errors,consumer:", execConsumer.name, retryCount, + ">", + execConsumer.retryCount, + ) + return nil + } + retryCount++ + return errors.New("test dql") + }, + ) + if execConsumer.retryCount >= len(backOff)+1 { + needToReConsume++ + } else { + noNeedToReConsume++ + } + } + eventManage.Publish(testEvent.Event, testEvent.EventData) + var successMsg int + for true { + msg = <-msgChan + successMsg++ + if !reflect.DeepEqual(msg, testEvent.EventData) { + t.Fatal("payload not equal", msg, testEvent.EventData) + } + if successMsg >= noNeedToReConsume { + if noNeedToReConsume == successMsg && len(msgChan) == 0 { + break + } + t.Fatal("success msg to much:", noNeedToReConsume, successMsg+len(msgChan)) + } + } + t.Run( + "test reConsume", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30) + defer cancel() + g, _ := errgroup.WithContext(ctx) + g.Go( + func() error { + time.Sleep(backOff[len(backOff)-1]) + for true { + count, err := dlqManage.RepublishBatch(10, context.TODO()) + if err != nil { + t.Fatal(err) + } + if count == 0 { + break + } + } + return nil + }, + ) + g.Go( + func() error { + successMsg = 0 + for true { + msg = <-msgChan + successMsg++ + if !reflect.DeepEqual(msg, testEvent.EventData) { + t.Fatal("payload not equal", msg, testEvent.EventData) + } + if successMsg >= needToReConsume { + if needToReConsume == successMsg && len(msgChan) == 0 { + return nil + } + t.Fatal("success msg to much:", needToReConsume, successMsg, len(msgChan)) + } + } + t.Fatal("success msg to much:", needToReConsume, successMsg, len(msgChan)) + return nil + }, + ) + err := g.Wait() + if err != nil { + t.Fatal(err) + } }, ) - - _, err := dlqManage.RepublishBatch(b.N, context.Background()) - if err != nil { - b.Error(err) - } }, ) - time.Sleep(time.Second * 20) - if count != 0 { - b.Fatal("msg lose Publish:", b.N, " republish:", count) - } } diff --git a/global/nats/publicFunc_test.go b/global/nats/publicFunc_test.go index 9d2eddd7..0f1c10f4 100644 --- a/global/nats/publicFunc_test.go +++ b/global/nats/publicFunc_test.go @@ -31,219 +31,262 @@ func newTaskData() taskData { } func TestTaskPublishAndSubscribe(t *testing.T) { task := Task(t.Name() + uuid.NewString()) - success := false + msgChan := make(chan struct{}) SubscribeTask( task, func(ctx context.Context) error { - success = true + msgChan <- struct{}{} return nil }, ) t.Run( "Publish task", func(t *testing.T) { t.Parallel() - time.Sleep(time.Second * 3) if !PublishTask(task) { t.Error("Publish fail") } - time.Sleep(time.Second * 3) - if !success { - t.Fail() - } + <-msgChan + close(msgChan) }, ) withPayloadTask, data := Task(t.Name()+uuid.NewString()), newTaskData() - var withPayloadTaskSuccess bool + msgWithDataChan := make(chan struct{ Data taskData }) SubscribeTaskWithPayloadAndProcessInTransaction( withPayloadTask, func(pushData taskData, ctx context.Context) error { - withPayloadTaskSuccess = reflect.DeepEqual(data, pushData) + msgWithDataChan <- struct{ Data taskData }{Data: pushData} return nil }, ) t.Run( "Publish task With payload", func(t *testing.T) { t.Parallel() - time.Sleep(time.Second * 3) if !PublishTaskWithPayload(withPayloadTask, data) { t.Error("Publish fail") } - time.Sleep(time.Second * 3) - if !withPayloadTaskSuccess { - t.Fail() + msg := <-msgWithDataChan + if !reflect.DeepEqual(msg.Data, data) { + t.Fatal("push data not equal:", msg.Data, data) } + close(msgWithDataChan) }, ) } func TestEventPublishAndSubscribe(t *testing.T) { + var taskCount = 10 taskMap, event := make(map[Task]int), Event(uuid.NewString()) - for i := 0; i < 10; i++ { + taskChan := make(chan struct{ Task Task }, taskCount) + for i := 0; i < taskCount; i++ { task := Task("task_" + uuid.NewString()) - taskMap[task] = 0 SubscribeTask( task, func(ctx context.Context) error { - taskMap[task]++ + taskChan <- struct{ Task Task }{task} return nil }, ) - } - time.Sleep(time.Second) - for task := range taskMap { + taskMap[task]++ BindTaskToEvent(event, task) } t.Run( "publish", func(t *testing.T) { - time.Sleep(time.Second * 3) + t.Parallel() PublishEvent(event) - time.Sleep(time.Second * 10) - for task, value := range taskMap { - if value != 1 { - t.Log(task, "fail trigger count", value) + for true { + msg, open := <-taskChan + if !open { + t.Fatal(errors.New("chan close")) + } + taskMap[msg.Task]-- + if taskMap[msg.Task] == 0 { + delete(taskMap, msg.Task) + } else if taskMap[msg.Task] < 0 { + t.Fatal(msg, errors.New("msg repeat")) + } + if len(taskMap) != 0 { + continue } + if len(taskChan) != 0 { + t.Fatal(taskChan, errors.New("msg repeat")) + } + close(taskChan) + return } - t.Log("task trigger info", taskMap) + }, ) } func TestSubscribeEvent(t *testing.T) { name := t.Name() + uuid.NewString() - event := Event(name) - var count atomic.Int32 - count.Add(10) var retryCount atomic.Int32 - SubscribeEvent( - event, name, func(v int, ctx context.Context) error { - if retryCount.Add(1) < 10 { - return errors.New("test retry") - } - count.Add(int32(v)) - return nil - }, - ) - task := Task(name + "_task_1") - BindTaskToEvent(event, task) - SubscribeTaskWithPayload( - task, func(v int, ctx context.Context) error { - if retryCount.Add(1) < 10 { - return errors.New("test retry") - } - count.Add(int32(v)) - return nil - }, - ) - task = Task(name + "_task_2") - BindTaskToEvent(event, task) - SubscribeTaskWithPayload( - task, func(v int, ctx context.Context) error { - if retryCount.Add(1) < 10 { - return errors.New("test retry") - } - count.Add(int32(v)) - return nil + eventToTask := map[Event]map[Task]struct{}{ + Event(name + "_event_1"): { + Task(name + "_task_1"): struct{}{}, + Task(name + "_task_2"): struct{}{}, + Task(name + "_task_3"): struct{}{}, }, - ) - t.Run( - "publish", func(t *testing.T) { - t.Parallel() - time.Sleep(3 * time.Second) - PublishEventWithPayload(event, 1) - time.Sleep(30 * time.Second) - if count.Load() != 13 { - t.Fatal(count.Load()) - } - }, - ) + } + eventSubmitCount := make(map[Event]int) + msgChan := make(chan struct{ Event Event }) + // subscribe + for event, tasks := range eventToTask { + eventSubmitCount[event] = 1 + SubscribeEvent( + event, string(event)+"_new_customer_group", func(v int, ctx context.Context) error { + if retryCount.Add(1) < 10 { + return errors.New("test retry") + } + msgChan <- struct{ Event Event }{event} + return nil + }, + ) + for task := range tasks { + eventSubmitCount[event]++ + BindTaskToEvent(event, task) + SubscribeTaskWithPayload( + task, func(v int, ctx context.Context) error { + if retryCount.Add(1) < 10 { + return errors.New("test retry") + } + msgChan <- struct{ Event Event }{event} + return nil + }, + ) + } + } + for event := range eventToTask { + PublishEventWithPayload(event, 1) + } + for true { + msg, open := <-msgChan + if !open { + t.Fatal(errors.New("chan close")) + } + eventSubmitCount[msg.Event]-- + if eventSubmitCount[msg.Event] == 0 { + delete(eventSubmitCount, msg.Event) + } else if eventSubmitCount[msg.Event] < 0 { + t.Fatal(msg, errors.New("msg repeat")) + } + if len(eventSubmitCount) == 0 { + close(msgChan) + t.Log("finish") + return + } + } } func TestOutboxTask(t *testing.T) { - taskMap := make(map[Task]*atomic.Int32) - var retryCount int32 = 2 - for i := 0; i < 3; i++ { + var retryCount, taskNumber = 2, 3 + msgChan, taskRetryCount := make(chan struct{}, taskNumber), make(map[Task]*atomic.Int32) + for i := 0; i < taskNumber; i++ { task := Task(t.Name() + "task_" + uuid.NewString()) - taskMap[task] = new(atomic.Int32) + taskRetryCount[task] = new(atomic.Int32) + taskRetryCount[task].Add(int32(retryCount)) SubscribeTaskWithPayload( task, func(data int32, ctx context.Context) error { - if taskMap[task].Add(-1) > -retryCount { + if taskRetryCount[task].Add(-1) >= 0 { return errors.New("test retry") } - taskMap[task].Add(data) + msgChan <- struct{}{} return nil }, ) } - t.Run( - "publish", func(t *testing.T) { - t.Parallel() - time.Sleep(time.Second * 3) - for task := range taskMap { - err := db.Transaction( - context.TODO(), func(ctx *cus.TxContext) error { - return PublishTaskToOutboxWithPayload(ctx, task, retryCount+1) - }, - ) - if err != nil { - t.Fatal(err) - } - } - time.Sleep(time.Second * 30) - for task, i := range taskMap { - if i.Load() != 1 { - t.Fatal(task, i) - } + for task := range taskRetryCount { + err := db.Transaction( + context.TODO(), func(ctx *cus.TxContext) error { + return PublishTaskToOutboxWithPayload(ctx, task, 1) + }, + ) + if err != nil { + t.Fatal(err) + } + } + for true { + _, open := <-msgChan + if open { + taskNumber-- + if taskNumber == 0 { + close(msgChan) + return } - }, - ) + } else { + t.Fatal(errors.New("chan close")) + } + } } func TestOutboxEvent(t *testing.T) { - eventMap := make(map[Event]*atomic.Int32) - eventToTask := make(map[Event][]Task) - for i := 0; i < 10; i++ { + const eventNumber, taskNumber = 10, 3 + eventToTask, eventChan := make(map[Event]map[Task]struct{}), make( + chan struct { + Event Event + Task Task + }, eventNumber*taskNumber, + ) + for i := 0; i < eventNumber; i++ { event := Event("event_" + uuid.NewString()) - eventMap[event] = &atomic.Int32{} - for j := 0; j < 3; j++ { + eventToTask[event] = make(map[Task]struct{}) + for j := 0; j < taskNumber; j++ { task := Task("task_" + uuid.NewString()) - eventToTask[event] = append(eventToTask[event], task) + eventToTask[event][task] = struct{}{} SubscribeTaskWithPayload( task, func(t int, ctx context.Context) error { - eventMap[event].Add(3) + eventChan <- struct { + Event Event + Task Task + }{Event: event, Task: task} return nil }, ) BindTaskToEvent(event, task) } } - t.Run( - "public", func(t *testing.T) { - t.Parallel() - time.Sleep(time.Second * 3) - for event := range eventMap { - err := db.Transaction( - context.TODO(), func(ctx *cus.TxContext) error { - return PublishEventToOutboxWithPayload(ctx, event, 3) - }, - ) - if err != nil { - t.Fatal(err) - } + for event := range eventToTask { + err := db.Transaction( + context.TODO(), func(ctx *cus.TxContext) error { + return PublishEventToOutboxWithPayload(ctx, event, 3) + }, + ) + if err != nil { + t.Fatal(err) + } + } + for true { + msg, open := <-eventChan + if open { + if _, exist := eventToTask[msg.Event][msg.Task]; !exist { + close(eventChan) + t.Fatal(msg, errors.New("msg repeat")) } - time.Sleep(time.Second * 20) - for event, num := range eventMap { - if num.Load() != 9 { - t.Fatal(event, num.Load()) + delete(eventToTask[msg.Event], msg.Task) + if len(eventToTask[msg.Event]) == 0 { + delete(eventToTask, msg.Event) + } + if len(eventToTask) == 0 { + if len(eventChan) > 0 { + close(eventChan) + t.Fatal(eventChan, errors.New("msg repeat")) } + t.Log("finish") + close(eventChan) + return } - }, - ) + } else { + t.Fatal(errors.New("chan close")) + } + } } func TestCustomerProcessingTimeout(t *testing.T) { var count atomic.Int32 task := manager.Task(t.Name()) + msgChan := make(chan struct{}) taskManage.Subscribe( task, func(payload []byte) error { count.Add(1) time.Sleep(time.Second * 20) + msgChan <- struct{}{} return nil }, ) @@ -253,15 +296,16 @@ func TestCustomerProcessingTimeout(t *testing.T) { t.Parallel() err := db.Transaction( context.TODO(), func(ctx *cus.TxContext) error { - return PublishTaskToOutbox(ctx, Task(task)) + return PublishTaskToOutbox(ctx, task) }, ) if err != nil { t.Fatal(err) } - time.Sleep(time.Second * 31) + <-msgChan + close(msgChan) if count.Load() != 1 { - t.Fatal("count not is 1,count:", count.Load()) + t.Fatal("msg repeat, repeat count:", count.Load()-1) } }, )