Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Minor test cleanup #1885

Merged
merged 1 commit into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,21 @@ def version():
"""Return the Kafka version set in the OS environment"""
return kafka_version()


@pytest.fixture(scope="module")
def zookeeper():
"""Return a Zookeeper fixture"""
zk_instance = ZookeeperFixture.instance()
yield zk_instance
zk_instance.close()


@pytest.fixture(scope="module")
def kafka_broker(kafka_broker_factory):
"""Return a Kafka broker fixture"""
return kafka_broker_factory()[0]


@pytest.fixture(scope="module")
def kafka_broker_factory(version, zookeeper):
"""Return a Kafka broker fixture factory"""
Expand All @@ -42,6 +45,7 @@ def factory(**broker_params):
for broker in _brokers:
broker.close()


@pytest.fixture
def simple_client(kafka_broker, request, topic):
"""Return a SimpleClient fixture"""
Expand All @@ -50,18 +54,21 @@ def simple_client(kafka_broker, request, topic):
yield client
client.close()


@pytest.fixture
def kafka_client(kafka_broker, request):
"""Return a KafkaClient fixture"""
(client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,))
yield client
client.close()


@pytest.fixture
def kafka_consumer(kafka_consumer_factory):
"""Return a KafkaConsumer fixture"""
return kafka_consumer_factory()


@pytest.fixture
def kafka_consumer_factory(kafka_broker, topic, request):
"""Return a KafkaConsumer factory fixture"""
Expand All @@ -79,11 +86,13 @@ def factory(**kafka_consumer_params):
if _consumer[0]:
_consumer[0].close()


@pytest.fixture
def kafka_producer(kafka_producer_factory):
"""Return a KafkaProducer fixture"""
yield kafka_producer_factory()


@pytest.fixture
def kafka_producer_factory(kafka_broker, request):
"""Return a KafkaProduce factory fixture"""
Expand All @@ -100,13 +109,15 @@ def factory(**kafka_producer_params):
if _producer[0]:
_producer[0].close()


@pytest.fixture
def topic(kafka_broker, request):
"""Return a topic fixture"""
topic_name = '%s_%s' % (request.node.name, random_string(10))
kafka_broker.create_topics([topic_name])
return topic_name


@pytest.fixture
def conn(mocker):
"""Return a connection mocker fixture"""
Expand Down
9 changes: 8 additions & 1 deletion test/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,33 @@

log = logging.getLogger(__name__)


def random_string(length):
return "".join(random.choice(string.ascii_letters) for i in range(length))


def version_str_to_tuple(version_str):
"""Transform a version string into a tuple.

Example: '0.8.1.1' --> (0, 8, 1, 1)
"""
return tuple(map(int, version_str.split('.')))


def version():
if 'KAFKA_VERSION' not in os.environ:
return ()
return version_str_to_tuple(os.environ['KAFKA_VERSION'])


def get_open_port():
sock = socket.socket()
sock.bind(("", 0))
port = sock.getsockname()[1]
sock.close()
return port


def gen_ssl_resources(directory):
os.system("""
cd {0}
Expand Down Expand Up @@ -74,6 +79,7 @@ def gen_ssl_resources(directory):
-file cert-signed -storepass foobar -noprompt
""".format(directory))


class Fixture(object):
kafka_version = os.environ.get('KAFKA_VERSION', '0.11.0.2')
scala_version = os.environ.get("SCALA_VERSION", '2.8.0')
Expand Down Expand Up @@ -158,6 +164,7 @@ def render_template(cls, source_file, target_file, binding):
def dump_logs(self):
self.child.dump_logs()


class ZookeeperFixture(Fixture):
@classmethod
def instance(cls):
Expand Down Expand Up @@ -496,7 +503,7 @@ def _create_topic(self, topic_name, num_partitions, replication_factor, timeout_
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = proc.communicate()
if proc.returncode != 0:
if not 'kafka.common.TopicExistsException' in stdout:
if 'kafka.common.TopicExistsException' not in stdout:
self.out("Failed to create topic %s" % (topic_name,))
self.out(stdout)
self.out(stderr)
Expand Down
3 changes: 1 addition & 2 deletions test/test_assignors.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.coordinator.protocol import (
ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)
from kafka.coordinator.protocol import ConsumerProtocolMemberAssignment


@pytest.fixture
Expand Down
2 changes: 1 addition & 1 deletion test/test_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from kafka.vendor.six.moves import range

from kafka.codec import (
has_snappy, has_gzip, has_lz4,
has_snappy, has_lz4,
gzip_encode, gzip_decode,
snappy_encode, snappy_decode,
lz4_encode, lz4_decode,
Expand Down
1 change: 0 additions & 1 deletion test/test_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from errno import EALREADY, EINPROGRESS, EISCONN, ECONNRESET
import socket
import time

import mock
import pytest
Expand Down
5 changes: 2 additions & 3 deletions test/test_consumer_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from kafka.conn import ConnectionStates
from kafka.consumer.group import KafkaConsumer
from kafka.coordinator.base import MemberState, Generation
from kafka.coordinator.base import MemberState
from kafka.structs import TopicPartition

from test.fixtures import random_string, version
Expand All @@ -34,8 +34,7 @@ def test_consumer_topics(kafka_broker, topic, version):
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
# Necessary to drive the IO
consumer.poll(500)
consumer_topics = consumer.topics()
assert topic in consumer_topics
assert topic in consumer.topics()
assert len(consumer.partitions_for_topic(topic)) > 0
consumer.close()

Expand Down
1 change: 0 additions & 1 deletion test/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import struct

import pytest
from kafka.vendor import six

from kafka.protocol.api import RequestHeader
from kafka.protocol.commit import GroupCoordinatorRequest
Expand Down
4 changes: 2 additions & 2 deletions test/testutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ def construct_lambda(s):
op_str = '='
v_str = s
elif s[1].isdigit():
op_str = s[0] # ! < > =
op_str = s[0] # ! < > =
v_str = s[1:]
elif s[2].isdigit():
op_str = s[0:2] # >= <=
op_str = s[0:2] # >= <=
v_str = s[2:]
else:
raise ValueError('Unrecognized kafka version / operator: %s' % (s,))
Expand Down