diff --git a/kafka_exporter.go b/kafka_exporter.go index 3737708b..71d1b27a 100644 --- a/kafka_exporter.go +++ b/kafka_exporter.go @@ -284,17 +284,17 @@ func NewExporter(opts kafkaOpts, topicFilter string, topicExclude string, groupF }, nil } -//func (e *Exporter) fetchOffsetVersion() int16 { -// version := e.client.Config().Version -// if e.client.Config().Version.IsAtLeast(sarama.V2_0_0_0) { -// return 4 -// } else if version.IsAtLeast(sarama.V0_10_2_0) { -// return 2 -// } else if version.IsAtLeast(sarama.V0_8_2_2) { -// return 1 -// } -// return 0 -//} +func (e *Exporter) fetchOffsetVersion() int16 { + version := e.client.Config().Version + if e.client.Config().Version.IsAtLeast(sarama.V2_0_0_0) { + return 4 + } else if version.IsAtLeast(sarama.V0_10_2_0) { + return 2 + } else if version.IsAtLeast(sarama.V0_8_2_2) { + return 1 + } + return 0 +} // Describe describes all the metrics ever exported by the Kafka exporter. It // implements prometheus.Collector. @@ -574,7 +574,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { klog.Errorf("Cannot describe for the group %s with error code %d", group.GroupId, group.Err) continue } - offsetFetchRequest := sarama.OffsetFetchRequest{ConsumerGroup: group.GroupId, Version: 1} + offsetFetchRequest := sarama.OffsetFetchRequest{ConsumerGroup: group.GroupId, Version: e.fetchOffsetVersion()} if e.offsetShowAll { for topic, partitions := range offset { for partition := range partitions {