From aaae3951eb3088acce99e03b9f0e872e0fde6b0e Mon Sep 17 00:00:00 2001 From: jack Date: Tue, 2 Jul 2024 11:12:18 +0800 Subject: [PATCH 1/2] Improve lag calculation and error handling in lagCmd --- cmd/kaf/topic.go | 47 ++++++++++++++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/cmd/kaf/topic.go b/cmd/kaf/topic.go index ad1ff18..85dca10 100644 --- a/cmd/kaf/topic.go +++ b/cmd/kaf/topic.go @@ -1,13 +1,12 @@ package main import ( + "encoding/json" "fmt" + "os" "sort" - "text/tabwriter" - "strings" - - "encoding/json" + "text/tabwriter" "github.com/IBM/sarama" "github.com/spf13/cobra" @@ -343,6 +342,8 @@ var lagCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { topic := args[0] admin := getClusterAdmin() + defer admin.Close() + topicDetails, err := admin.DescribeTopics([]string{args[0]}) if err != nil { errorExit("Unable to describe topics: %v\n", err) @@ -354,35 +355,48 @@ var lagCmd = &cobra.Command{ highWatermarks := getHighWatermarks(args[0], partitions) var groups []string - rst, err := admin.ListConsumerGroups() + consumerGroups, err := admin.ListConsumerGroups() if err != nil { - errorExit("Unable to list consumer info: %v\n", err) + errorExit("Unable to list consumer groups: %v\n", err) } - for group, _ := range rst { + for group := range consumerGroups { groups = append(groups, group) } groupsInfo, err := admin.DescribeConsumerGroups(groups) if err != nil { - errorExit("Unable to list consumer info: %v\n", err) + errorExit("Unable to describe consumer groups: %v\n", err) } var lagInfo = make(map[string]int64) for _, v := range groupsInfo { + var sum int64 for _, member := range v.Members { assignment, err := member.GetMemberAssignment() if err != nil || assignment == nil { continue } - if _, exist := assignment.Topics[topic]; exist { - var sum int64 - resp, _ := admin.ListConsumerGroupOffsets(v.GroupId, assignment.Topics) - for _, v1 := range resp.Blocks { - for pid, v2 := range v1 { - sum += highWatermarks[pid] - v2.Offset + if topicPartitions, exist := assignment.Topics[topic]; exist { + resp, err := admin.ListConsumerGroupOffsets(v.GroupId, map[string][]int32{topic: topicPartitions}) + if err != nil { + fmt.Fprintf(os.Stderr, "Error fetching offsets for group %s: %v\n", v.GroupId, err) + continue + } + if blocks, ok := resp.Blocks[topic]; ok { + for pid, block := range blocks { + if hwm, ok := highWatermarks[pid]; ok { + if block.Offset > hwm { + fmt.Fprintf(os.Stderr, "Warning: Consumer offset (%d) is greater than high watermark (%d) for partition %d in group %s\n", block.Offset, hwm, pid, v.GroupId) + // You might choose to set the lag to 0 in this case + // sum += 0 + } else { + sum += hwm - block.Offset + } + } } } - lagInfo[v.GroupId] = sum } - + } + if sum > 0 { + lagInfo[v.GroupId] = sum } } w := tabwriter.NewWriter(outWriter, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags) @@ -393,6 +407,5 @@ var lagCmd = &cobra.Command{ fmt.Fprintf(w, "%v\t%v\n", group, lag) } w.Flush() - }, } From 0c17de1ff2c265bd969f922ee95cdadc1efb8b40 Mon Sep 17 00:00:00 2001 From: jack Date: Tue, 9 Jul 2024 16:45:56 +0800 Subject: [PATCH 2/2] feat: Show zero lag consumer groups and include group state in lagCmd output --- cmd/kaf/topic.go | 56 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 40 insertions(+), 16 deletions(-) diff --git a/cmd/kaf/topic.go b/cmd/kaf/topic.go index 85dca10..503b554 100644 --- a/cmd/kaf/topic.go +++ b/cmd/kaf/topic.go @@ -335,6 +335,7 @@ var deleteTopicCmd = &cobra.Command{ } }, } + var lagCmd = &cobra.Command{ Use: "lag", Short: "Display the total lags for each consumer group", @@ -344,49 +345,68 @@ var lagCmd = &cobra.Command{ admin := getClusterAdmin() defer admin.Close() - topicDetails, err := admin.DescribeTopics([]string{args[0]}) - if err != nil { + // Describe the topic + topicDetails, err := admin.DescribeTopics([]string{topic}) + if err != nil || len(topicDetails) == 0 { errorExit("Unable to describe topics: %v\n", err) } + + // Get the list of partitions for the topic partitions := make([]int32, 0, len(topicDetails[0].Partitions)) for _, partition := range topicDetails[0].Partitions { partitions = append(partitions, partition.ID) } - highWatermarks := getHighWatermarks(args[0], partitions) + highWatermarks := getHighWatermarks(topic, partitions) - var groups []string + // List all consumer groups consumerGroups, err := admin.ListConsumerGroups() if err != nil { errorExit("Unable to list consumer groups: %v\n", err) } + + var groups []string for group := range consumerGroups { groups = append(groups, group) } + + // Describe all consumer groups groupsInfo, err := admin.DescribeConsumerGroups(groups) if err != nil { errorExit("Unable to describe consumer groups: %v\n", err) } - var lagInfo = make(map[string]int64) - for _, v := range groupsInfo { + + // Calculate lag for each group + lagInfo := make(map[string]int64) + groupStates := make(map[string]string) // To store the state of each group + for _, group := range groupsInfo { var sum int64 - for _, member := range v.Members { + show := false + for _, member := range group.Members { assignment, err := member.GetMemberAssignment() if err != nil || assignment == nil { continue } + + metadata, err := member.GetMemberMetadata() + if err != nil || metadata == nil { + continue + } + if topicPartitions, exist := assignment.Topics[topic]; exist { - resp, err := admin.ListConsumerGroupOffsets(v.GroupId, map[string][]int32{topic: topicPartitions}) + show = true + resp, err := admin.ListConsumerGroupOffsets(group.GroupId, map[string][]int32{topic: topicPartitions}) if err != nil { - fmt.Fprintf(os.Stderr, "Error fetching offsets for group %s: %v\n", v.GroupId, err) + fmt.Fprintf(os.Stderr, "Error fetching offsets for group %s: %v\n", group.GroupId, err) continue } + if blocks, ok := resp.Blocks[topic]; ok { for pid, block := range blocks { if hwm, ok := highWatermarks[pid]; ok { if block.Offset > hwm { - fmt.Fprintf(os.Stderr, "Warning: Consumer offset (%d) is greater than high watermark (%d) for partition %d in group %s\n", block.Offset, hwm, pid, v.GroupId) - // You might choose to set the lag to 0 in this case - // sum += 0 + fmt.Fprintf(os.Stderr, "Warning: Consumer offset (%d) is greater than high watermark (%d) for partition %d in group %s\n", block.Offset, hwm, pid, group.GroupId) + } else if block.Offset < 0 { + // Skip partitions with negative offsets } else { sum += hwm - block.Offset } @@ -395,16 +415,20 @@ var lagCmd = &cobra.Command{ } } } - if sum > 0 { - lagInfo[v.GroupId] = sum + + if show && sum >= 0 { + lagInfo[group.GroupId] = sum + groupStates[group.GroupId] = group.State // Store the state of the group } } + + // Print the lag information along with group state w := tabwriter.NewWriter(outWriter, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags) if !noHeaderFlag { - fmt.Fprintf(w, "GROUP ID\tLAG\n") + fmt.Fprintf(w, "GROUP ID\tSTATE\tLAG\n") } for group, lag := range lagInfo { - fmt.Fprintf(w, "%v\t%v\n", group, lag) + fmt.Fprintf(w, "%v\t%v\t%v\n", group, groupStates[group], lag) } w.Flush() },