Skip to content

Commit

Permalink
Added unit tests for fetcher's _reset_offset and related functions.
Browse files Browse the repository at this point in the history
  • Loading branch information
tvoinarovskyi committed Aug 6, 2017
1 parent 414e286 commit 61668aa
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 7 deletions.
21 changes: 16 additions & 5 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ def _reset_offset(self, partition):
log.debug("Resetting offset for partition %s to %s offset.",
partition, strategy)
offsets = self._retrieve_offsets({partition: timestamp})
assert partition in offsets
if partition not in offsets:
raise NoOffsetForPartitionError(partition)
offset = offsets[partition][0]

# we might lose the assignment while fetching the offset,
Expand Down Expand Up @@ -660,10 +661,14 @@ def on_success(value):
offsets.update(r)
list_offsets_future.success(offsets)

def on_fail(err):
if not list_offsets_future.is_done:
list_offsets_future.failure(err)

for node_id, timestamps in six.iteritems(timestamps_by_node):
_f = self._send_offset_request(node_id, timestamps)
_f.add_callback(on_success)
_f.add_errback(lambda e: list_offsets_future.failure(e))
_f.add_errback(on_fail)
return list_offsets_future

def _send_offset_request(self, node_id, timestamps):
Expand Down Expand Up @@ -710,10 +715,13 @@ def _handle_offset_response(self, future, response):
if response.API_VERSION == 0:
offsets = partition_info[2]
assert len(offsets) <= 1, 'Expected OffsetResponse with one offset'
if offsets:
if not offsets:
offset = UNKNOWN_OFFSET
else:
offset = offsets[0]
log.debug("Handling v0 ListOffsetResponse response for %s. "
"Fetched offset %s", partition, offset)
log.debug("Handling v0 ListOffsetResponse response for %s. "
"Fetched offset %s", partition, offset)
if offset != UNKNOWN_OFFSET:
timestamp_offset_map[partition] = (offset, None)
else:
timestamp, offset = partition_info[2:]
Expand All @@ -732,16 +740,19 @@ def _handle_offset_response(self, future, response):
" to obsolete leadership information, retrying.",
partition)
future.failure(error_type(partition))
return
elif error_type is Errors.UnknownTopicOrPartitionError:
log.warn("Received unknown topic or partition error in ListOffset "
"request for partition %s. The topic/partition " +
"may not exist or the user may not have Describe access "
"to it.", partition)
future.failure(error_type(partition))
return
else:
log.warning("Attempt to fetch offsets for partition %s failed due to:"
" %s", partition, error_type)
future.failure(error_type(partition))
return
if not future.is_done:
future.success(timestamp_offset_map)

Expand Down
2 changes: 1 addition & 1 deletion test/test_consumer_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ def test_kafka_consumer_offsets_for_time_old(self):
with self.assertRaises(UnsupportedVersionError):
consumer.end_offsets([tp])

@kafka_versions('<0.10.1')
@kafka_versions('>=0.10.1')
def test_kafka_consumer_offsets_for_times_errors(self):
consumer = self.kafka_consumer()
tp = TopicPartition(self.topic, 0)
Expand Down
183 changes: 182 additions & 1 deletion test/test_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,21 @@

import pytest

import itertools
from collections import OrderedDict

from kafka.client_async import KafkaClient
from kafka.consumer.fetcher import Fetcher
from kafka.consumer.fetcher import Fetcher, NoOffsetForPartitionError
from kafka.consumer.subscription_state import SubscriptionState
from kafka.metrics import Metrics
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.offset import OffsetResponse
from kafka.structs import TopicPartition
from kafka.future import Future
from kafka.errors import (
StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
UnknownTopicOrPartitionError
)


@pytest.fixture
Expand Down Expand Up @@ -101,3 +110,175 @@ def test_update_fetch_positions(fetcher, mocker):
fetcher.update_fetch_positions([partition])
assert fetcher._reset_offset.call_count == 0
fetcher._subscriptions.seek.assert_called_with(partition, 123)


def test__reset_offset(fetcher, mocker):
tp = TopicPartition("topic", 0)
fetcher._subscriptions.subscribe(topics="topic")
fetcher._subscriptions.assign_from_subscribed([tp])
fetcher._subscriptions.need_offset_reset(tp)
mocked = mocker.patch.object(fetcher, '_retrieve_offsets')

mocked.return_value = {}
with pytest.raises(NoOffsetForPartitionError):
fetcher._reset_offset(tp)

mocked.return_value = {tp: (1001, None)}
fetcher._reset_offset(tp)
assert not fetcher._subscriptions.assignment[tp].awaiting_reset
assert fetcher._subscriptions.assignment[tp].position == 1001


def test__send_offset_requests(fetcher, mocker):
tp = TopicPartition("topic_send_offset", 1)
mocked_send = mocker.patch.object(fetcher, "_send_offset_request")
send_futures = []

def send_side_effect(*args, **kw):
f = Future()
send_futures.append(f)
return f
mocked_send.side_effect = send_side_effect

mocked_leader = mocker.patch.object(
fetcher._client.cluster, "leader_for_partition")
# First we report unavailable leader 2 times different ways and later
# always as available
mocked_leader.side_effect = itertools.chain(
[None, -1], itertools.cycle([0]))

# Leader == None
fut = fetcher._send_offset_requests({tp: 0})
assert fut.failed()
assert isinstance(fut.exception, StaleMetadata)
assert not mocked_send.called

# Leader == -1
fut = fetcher._send_offset_requests({tp: 0})
assert fut.failed()
assert isinstance(fut.exception, LeaderNotAvailableError)
assert not mocked_send.called

# Leader == 0, send failed
fut = fetcher._send_offset_requests({tp: 0})
assert not fut.is_done
assert mocked_send.called
# Check that we bound the futures correctly to chain failure
send_futures.pop().failure(NotLeaderForPartitionError(tp))
assert fut.failed()
assert isinstance(fut.exception, NotLeaderForPartitionError)

# Leader == 0, send success
fut = fetcher._send_offset_requests({tp: 0})
assert not fut.is_done
assert mocked_send.called
# Check that we bound the futures correctly to chain success
send_futures.pop().success({tp: (10, 10000)})
assert fut.succeeded()
assert fut.value == {tp: (10, 10000)}


def test__send_offset_requests_multiple_nodes(fetcher, mocker):
tp1 = TopicPartition("topic_send_offset", 1)
tp2 = TopicPartition("topic_send_offset", 2)
tp3 = TopicPartition("topic_send_offset", 3)
tp4 = TopicPartition("topic_send_offset", 4)
mocked_send = mocker.patch.object(fetcher, "_send_offset_request")
send_futures = []

def send_side_effect(node_id, timestamps):
f = Future()
send_futures.append((node_id, timestamps, f))
return f
mocked_send.side_effect = send_side_effect

mocked_leader = mocker.patch.object(
fetcher._client.cluster, "leader_for_partition")
mocked_leader.side_effect = itertools.cycle([0, 1])

# -- All node succeeded case
tss = OrderedDict([(tp1, 0), (tp2, 0), (tp3, 0), (tp4, 0)])
fut = fetcher._send_offset_requests(tss)
assert not fut.is_done
assert mocked_send.call_count == 2

req_by_node = {}
second_future = None
for node, timestamps, f in send_futures:
req_by_node[node] = timestamps
if node == 0:
# Say tp3 does not have any messages so it's missing
f.success({tp1: (11, 1001)})
else:
second_future = f
assert req_by_node == {
0: {tp1: 0, tp3: 0},
1: {tp2: 0, tp4: 0}
}

# We only resolved 1 future so far, so result future is not yet ready
assert not fut.is_done
second_future.success({tp2: (12, 1002), tp4: (14, 1004)})
assert fut.succeeded()
assert fut.value == {tp1: (11, 1001), tp2: (12, 1002), tp4: (14, 1004)}

# -- First succeeded second not
del send_futures[:]
fut = fetcher._send_offset_requests(tss)
assert len(send_futures) == 2
send_futures[0][2].success({tp1: (11, 1001)})
send_futures[1][2].failure(UnknownTopicOrPartitionError(tp1))
assert fut.failed()
assert isinstance(fut.exception, UnknownTopicOrPartitionError)

# -- First fails second succeeded
del send_futures[:]
fut = fetcher._send_offset_requests(tss)
assert len(send_futures) == 2
send_futures[0][2].failure(UnknownTopicOrPartitionError(tp1))
send_futures[1][2].success({tp1: (11, 1001)})
assert fut.failed()
assert isinstance(fut.exception, UnknownTopicOrPartitionError)


def test__handle_offset_response(fetcher, mocker):
# Broker returns UnsupportedForMessageFormatError, will omit partition
fut = Future()
res = OffsetResponse[1]([
("topic", [(0, 43, -1, -1)]),
("topic", [(1, 0, 1000, 9999)])
])
fetcher._handle_offset_response(fut, res)
assert fut.succeeded()
assert fut.value == {TopicPartition("topic", 1): (9999, 1000)}

# Broker returns NotLeaderForPartitionError
fut = Future()
res = OffsetResponse[1]([
("topic", [(0, 6, -1, -1)]),
])
fetcher._handle_offset_response(fut, res)
assert fut.failed()
assert isinstance(fut.exception, NotLeaderForPartitionError)

# Broker returns UnknownTopicOrPartitionError
fut = Future()
res = OffsetResponse[1]([
("topic", [(0, 3, -1, -1)]),
])
fetcher._handle_offset_response(fut, res)
assert fut.failed()
assert isinstance(fut.exception, UnknownTopicOrPartitionError)

# Broker returns many errors and 1 result
# Will fail on 1st error and return
fut = Future()
res = OffsetResponse[1]([
("topic", [(0, 43, -1, -1)]),
("topic", [(1, 6, -1, -1)]),
("topic", [(2, 3, -1, -1)]),
("topic", [(3, 0, 1000, 9999)])
])
fetcher._handle_offset_response(fut, res)
assert fut.failed()
assert isinstance(fut.exception, NotLeaderForPartitionError)

0 comments on commit 61668aa

Please # to comment.