Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Added support for offsets_for_times, beginning_offsets and end_offsets APIs. #1161

Merged
merged 6 commits into from
Aug 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from kafka.protocol.admin import SaslHandShakeRequest
from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.types import Int32
from kafka.version import __version__

Expand Down Expand Up @@ -886,7 +887,7 @@ def _handle_api_version_response(self, response):

def _infer_broker_version_from_api_versions(self, api_versions):
# The logic here is to check the list of supported request versions
# in descending order. As soon as we find one that works, return it
# in reverse order. As soon as we find one that works, return it
test_cases = [
# format (<broker verion>, <needed struct>)
((0, 11, 0), MetadataRequest[4]),
Expand Down
241 changes: 180 additions & 61 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.message import PartialMessage
from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy
from kafka.protocol.offset import (
OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET
)
from kafka.serializer import Deserializer
from kafka.structs import TopicPartition
from kafka.structs import TopicPartition, OffsetAndTimestamp

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -48,6 +50,7 @@ class Fetcher(six.Iterator):
'iterator_refetch_records': 1, # undocumented -- interface may change
'metric_group_prefix': 'consumer',
'api_version': (0, 8, 0),
'retry_backoff_ms': 100
}

def __init__(self, client, subscriptions, metrics, **configs):
Expand Down Expand Up @@ -180,6 +183,31 @@ def update_fetch_positions(self, partitions):
" offset %s", tp, committed)
self._subscriptions.seek(tp, committed)

def get_offsets_by_times(self, timestamps, timeout_ms):
offsets = self._retrieve_offsets(timestamps, timeout_ms)
for tp in timestamps:
if tp not in offsets:
offsets[tp] = None
else:
offset, timestamp = offsets[tp]
offsets[tp] = OffsetAndTimestamp(offset, timestamp)
return offsets

def beginning_offsets(self, partitions, timeout_ms):
return self.beginning_or_end_offset(
partitions, OffsetResetStrategy.EARLIEST, timeout_ms)

def end_offsets(self, partitions, timeout_ms):
return self.beginning_or_end_offset(
partitions, OffsetResetStrategy.LATEST, timeout_ms)

def beginning_or_end_offset(self, partitions, timestamp, timeout_ms):
timestamps = dict([(tp, timestamp) for tp in partitions])
offsets = self._retrieve_offsets(timestamps, timeout_ms)
for tp in timestamps:
offsets[tp] = offsets[tp][0]
return offsets

def _reset_offset(self, partition):
"""Reset offsets for the given partition using the offset reset strategy.

Expand All @@ -199,40 +227,64 @@ def _reset_offset(self, partition):

log.debug("Resetting offset for partition %s to %s offset.",
partition, strategy)
offset = self._offset(partition, timestamp)
offsets = self._retrieve_offsets({partition: timestamp})
if partition not in offsets:
raise NoOffsetForPartitionError(partition)
offset = offsets[partition][0]

# we might lose the assignment while fetching the offset,
# so check it is still active
if self._subscriptions.is_assigned(partition):
self._subscriptions.seek(partition, offset)

def _offset(self, partition, timestamp):
"""Fetch a single offset before the given timestamp for the partition.
def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
"""Fetch offset for each partition passed in ``timestamps`` map.

Blocks until offset is obtained, or a non-retriable exception is raised
Blocks until offsets are obtained, a non-retriable exception is raised
or ``timeout_ms`` passed.

Arguments:
partition The partition that needs fetching offset.
timestamp (int): timestamp for fetching offset. -1 for the latest
available, -2 for the earliest available. Otherwise timestamp
is treated as epoch seconds.
timestamps: {TopicPartition: int} dict with timestamps to fetch
offsets by. -1 for the latest available, -2 for the earliest
available. Otherwise timestamp is treated as epoch miliseconds.

Returns:
int: message offset
{TopicPartition: (int, int)}: Mapping of partition to
retrieved offset and timestamp. If offset does not exist for
the provided timestamp, that partition will be missing from
this mapping.
"""
while True:
future = self._send_offset_request(partition, timestamp)
self._client.poll(future=future)
if not timestamps:
return {}

start_time = time.time()
remaining_ms = timeout_ms
while remaining_ms > 0:
future = self._send_offset_requests(timestamps)
self._client.poll(future=future, timeout_ms=remaining_ms)

if future.succeeded():
return future.value

if not future.retriable():
raise future.exception # pylint: disable-msg=raising-bad-type

elapsed_ms = (time.time() - start_time) * 1000
remaining_ms = timeout_ms - elapsed_ms
if remaining_ms < 0:
break

if future.exception.invalid_metadata:
refresh_future = self._client.cluster.request_update()
self._client.poll(future=refresh_future, sleep=True)
self._client.poll(
future=refresh_future, sleep=True, timeout_ms=remaining_ms)
else:
time.sleep(self.config['retry_backoff_ms'] / 1000.0)

elapsed_ms = (time.time() - start_time) * 1000
remaining_ms = timeout_ms - elapsed_ms

raise Errors.KafkaTimeoutError(
"Failed to get offsets by timestamps in %s ms" % timeout_ms)

def _raise_if_offset_out_of_range(self):
"""Check FetchResponses for offset out of range.
Expand Down Expand Up @@ -576,73 +628,140 @@ def _deserialize(self, f, topic, bytes_):
return f.deserialize(topic, bytes_)
return f(bytes_)

def _send_offset_request(self, partition, timestamp):
"""Fetch a single offset before the given timestamp for the partition.
def _send_offset_requests(self, timestamps):
"""Fetch offsets for each partition in timestamps dict. This may send
request to multiple nodes, based on who is Leader for partition.

Arguments:
partition (TopicPartition): partition that needs fetching offset
timestamp (int): timestamp for fetching offset
timestamps (dict): {TopicPartition: int} mapping of fetching
timestamps.

Returns:
Future: resolves to the corresponding offset
Future: resolves to a mapping of retrieved offsets
"""
node_id = self._client.cluster.leader_for_partition(partition)
if node_id is None:
log.debug("Partition %s is unknown for fetching offset,"
" wait for metadata refresh", partition)
return Future().failure(Errors.StaleMetadata(partition))
elif node_id == -1:
log.debug("Leader for partition %s unavailable for fetching offset,"
" wait for metadata refresh", partition)
return Future().failure(Errors.LeaderNotAvailableError(partition))

request = OffsetRequest[0](
-1, [(partition.topic, [(partition.partition, timestamp, 1)])]
)
timestamps_by_node = collections.defaultdict(dict)
for partition, timestamp in six.iteritems(timestamps):
node_id = self._client.cluster.leader_for_partition(partition)
if node_id is None:
self._client.add_topic(partition.topic)
log.debug("Partition %s is unknown for fetching offset,"
" wait for metadata refresh", partition)
return Future().failure(Errors.StaleMetadata(partition))
elif node_id == -1:
log.debug("Leader for partition %s unavailable for fetching "
"offset, wait for metadata refresh", partition)
return Future().failure(
Errors.LeaderNotAvailableError(partition))
else:
timestamps_by_node[node_id][partition] = timestamp

# Aggregate results until we have all
list_offsets_future = Future()
responses = []
node_count = len(timestamps_by_node)

def on_success(value):
responses.append(value)
if len(responses) == node_count:
offsets = {}
for r in responses:
offsets.update(r)
list_offsets_future.success(offsets)

def on_fail(err):
if not list_offsets_future.is_done:
list_offsets_future.failure(err)

for node_id, timestamps in six.iteritems(timestamps_by_node):
_f = self._send_offset_request(node_id, timestamps)
_f.add_callback(on_success)
_f.add_errback(on_fail)
return list_offsets_future

def _send_offset_request(self, node_id, timestamps):
by_topic = collections.defaultdict(list)
for tp, timestamp in six.iteritems(timestamps):
if self.config['api_version'] >= (0, 10, 1):
data = (tp.partition, timestamp)
else:
data = (tp.partition, timestamp, 1)
by_topic[tp.topic].append(data)

if self.config['api_version'] >= (0, 10, 1):
request = OffsetRequest[1](-1, list(six.iteritems(by_topic)))
else:
request = OffsetRequest[0](-1, list(six.iteritems(by_topic)))

# Client returns a future that only fails on network issues
# so create a separate future and attach a callback to update it
# based on response error codes
future = Future()

_f = self._client.send(node_id, request)
_f.add_callback(self._handle_offset_response, partition, future)
_f.add_callback(self._handle_offset_response, future)
_f.add_errback(lambda e: future.failure(e))
return future

def _handle_offset_response(self, partition, future, response):
def _handle_offset_response(self, future, response):
"""Callback for the response of the list offset call above.

Arguments:
partition (TopicPartition): The partition that was fetched
future (Future): the future to update based on response
response (OffsetResponse): response from the server

Raises:
AssertionError: if response does not match partition
"""
topic, partition_info = response.topics[0]
assert len(response.topics) == 1 and len(partition_info) == 1, (
'OffsetResponse should only be for a single topic-partition')

part, error_code, offsets = partition_info[0]
assert topic == partition.topic and part == partition.partition, (
'OffsetResponse partition does not match OffsetRequest partition')

error_type = Errors.for_code(error_code)
if error_type is Errors.NoError:
assert len(offsets) == 1, 'Expected OffsetResponse with one offset'
offset = offsets[0]
log.debug("Fetched offset %d for partition %s", offset, partition)
future.success(offset)
elif error_type in (Errors.NotLeaderForPartitionError,
Errors.UnknownTopicOrPartitionError):
log.debug("Attempt to fetch offsets for partition %s failed due"
" to obsolete leadership information, retrying.",
partition)
future.failure(error_type(partition))
else:
log.warning("Attempt to fetch offsets for partition %s failed due to:"
" %s", partition, error_type)
future.failure(error_type(partition))
timestamp_offset_map = {}
for topic, part_data in response.topics:
for partition_info in part_data:
partition, error_code = partition_info[:2]
partition = TopicPartition(topic, partition)
error_type = Errors.for_code(error_code)
if error_type is Errors.NoError:
if response.API_VERSION == 0:
offsets = partition_info[2]
assert len(offsets) <= 1, 'Expected OffsetResponse with one offset'
if not offsets:
offset = UNKNOWN_OFFSET
else:
offset = offsets[0]
log.debug("Handling v0 ListOffsetResponse response for %s. "
"Fetched offset %s", partition, offset)
if offset != UNKNOWN_OFFSET:
timestamp_offset_map[partition] = (offset, None)
else:
timestamp, offset = partition_info[2:]
log.debug("Handling ListOffsetResponse response for %s. "
"Fetched offset %s, timestamp %s",
partition, offset, timestamp)
if offset != UNKNOWN_OFFSET:
timestamp_offset_map[partition] = (offset, timestamp)
elif error_type is Errors.UnsupportedForMessageFormatError:
# The message format on the broker side is before 0.10.0,
# we simply put None in the response.
log.debug("Cannot search by timestamp for partition %s because the"
" message format version is before 0.10.0", partition)
elif error_type is Errors.NotLeaderForPartitionError:
log.debug("Attempt to fetch offsets for partition %s failed due"
" to obsolete leadership information, retrying.",
partition)
future.failure(error_type(partition))
return
elif error_type is Errors.UnknownTopicOrPartitionError:
log.warn("Received unknown topic or partition error in ListOffset "
"request for partition %s. The topic/partition " +
"may not exist or the user may not have Describe access "
"to it.", partition)
future.failure(error_type(partition))
return
else:
log.warning("Attempt to fetch offsets for partition %s failed due to:"
" %s", partition, error_type)
future.failure(error_type(partition))
return
if not future.is_done:
future.success(timestamp_offset_map)

def _fetchable_partitions(self):
fetchable = self._subscriptions.fetchable_partitions()
Expand Down
Loading