diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index a6298211c..8c0ae948e 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -893,10 +893,6 @@ def offsets_for_times(self, timestamps): up the offsets by timestamp. KafkaTimeoutError: If fetch failed in request_timeout_ms """ - if self.config['api_version'] <= (0, 10, 0): - raise UnsupportedVersionError( - "offsets_for_times API not supported for cluster version {}" - .format(self.config['api_version'])) for tp, ts in timestamps.items(): timestamps[tp] = int(ts) if ts < 0: @@ -928,10 +924,6 @@ def beginning_offsets(self, partitions): up the offsets by timestamp. KafkaTimeoutError: If fetch failed in request_timeout_ms. """ - if self.config['api_version'] <= (0, 10, 0): - raise UnsupportedVersionError( - "offsets_for_times API not supported for cluster version {}" - .format(self.config['api_version'])) offsets = self._fetcher.beginning_offsets( partitions, self.config['request_timeout_ms']) return offsets @@ -959,10 +951,6 @@ def end_offsets(self, partitions): up the offsets by timestamp. KafkaTimeoutError: If fetch failed in request_timeout_ms """ - if self.config['api_version'] <= (0, 10, 0): - raise UnsupportedVersionError( - "offsets_for_times API not supported for cluster version {}" - .format(self.config['api_version'])) offsets = self._fetcher.end_offsets( partitions, self.config['request_timeout_ms']) return offsets diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 4b5e78a35..be00c3d85 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -727,20 +727,6 @@ def test_kafka_consumer_offsets_search_many_partitions(self): tp1: p1msg.offset + 1 }) - @kafka_versions('<0.10.1') - def test_kafka_consumer_offsets_for_time_old(self): - consumer = self.kafka_consumer() - tp = TopicPartition(self.topic, 0) - - with self.assertRaises(UnsupportedVersionError): - consumer.offsets_for_times({tp: int(time.time())}) - - with self.assertRaises(UnsupportedVersionError): - consumer.beginning_offsets([tp]) - - with self.assertRaises(UnsupportedVersionError): - consumer.end_offsets([tp]) - @kafka_versions('>=0.10.1') def test_kafka_consumer_offsets_for_times_errors(self): consumer = self.kafka_consumer()