Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

convert test_admin_integration.py to pytest #1923

Merged
merged 1 commit into from
Oct 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,25 @@ def factory(**kafka_producer_params):
if _producer[0]:
_producer[0].close()

@pytest.fixture
def kafka_admin_client(kafka_admin_client_factory):
"""Return a KafkaAdminClient fixture"""
yield kafka_admin_client_factory()

@pytest.fixture
def kafka_admin_client_factory(kafka_broker):
"""Return a KafkaAdminClient factory fixture"""
_admin_client = [None]

def factory(**kafka_admin_client_params):
params = {} if kafka_admin_client_params is None else kafka_admin_client_params.copy()
_admin_client[0] = next(kafka_broker.get_admin_clients(cnt=1, **params))
return _admin_client[0]

yield factory

if _admin_client[0]:
_admin_client[0].close()

@pytest.fixture
def topic(kafka_broker, request):
Expand Down
10 changes: 9 additions & 1 deletion test/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from kafka.vendor.six.moves import urllib, range
from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401

from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient
from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient, KafkaAdminClient
from kafka.client_async import KafkaClient
from kafka.protocol.admin import CreateTopicsRequest
from kafka.protocol.metadata import MetadataRequest
Expand Down Expand Up @@ -500,6 +500,14 @@ def get_clients(self, cnt=1, client_id=None):
return tuple(KafkaClient(client_id='%s_%s' % (client_id, random_string(4)),
bootstrap_servers=self.bootstrap_server()) for x in range(cnt))

def get_admin_clients(self, cnt=1, **params):
params.setdefault('client_id', 'admin_client')
params['bootstrap_servers'] = self.bootstrap_server()
client_id = params['client_id']
for x in range(cnt):
params['client_id'] = '%s_%s' % (client_id, random_string(4))
yield KafkaAdminClient(**params)

def get_consumers(self, cnt, topics, **params):
params.setdefault('client_id', 'consumer')
params.setdefault('heartbeat_interval_ms', 500)
Expand Down
164 changes: 62 additions & 102 deletions test/test_admin_integration.py
Original file line number Diff line number Diff line change
@@ -1,122 +1,82 @@
import pytest
import os

from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import KafkaIntegrationTestCase, env_kafka_version, current_offset
from test.testutil import env_kafka_version

from kafka.errors import NoError
from kafka.admin import KafkaAdminClient, ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL
from kafka.admin import ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL

# This test suite passes for me locally, but fails on travis
# Needs investigation
DISABLED = True

# TODO: Convert to pytest / fixtures
# Note that ACL features require broker 0.11, but other admin apis may work on
# earlier broker versions
class TestAdminClientIntegration(KafkaIntegrationTestCase):
@classmethod
def setUpClass(cls): # noqa
if env_kafka_version() < (0, 11) or DISABLED:
return
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
def test_create_describe_delete_acls(kafka_admin_client):
"""Tests that we can add, list and remove ACLs
"""

cls.zk = ZookeeperFixture.instance()
cls.server = KafkaFixture.instance(0, cls.zk)

@classmethod
def tearDownClass(cls): # noqa
if env_kafka_version() < (0, 11) or DISABLED:
return

cls.server.close()
cls.zk.close()

def setUp(self):
if env_kafka_version() < (0, 11) or DISABLED:
self.skipTest('Admin ACL Integration test requires KAFKA_VERSION >= 0.11')
super(TestAdminClientIntegration, self).setUp()

def tearDown(self):
if env_kafka_version() < (0, 11) or DISABLED:
return
super(TestAdminClientIntegration, self).tearDown()

def test_create_describe_delete_acls(self):
"""Tests that we can add, list and remove ACLs
"""

# Setup
brokers = '%s:%d' % (self.server.host, self.server.port)
admin_client = KafkaAdminClient(
bootstrap_servers=brokers
# Check that we don't have any ACLs in the cluster
acls, error = kafka_admin_client.describe_acls(
ACLFilter(
principal=None,
host="*",
operation=ACLOperation.ANY,
permission_type=ACLPermissionType.ANY,
resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
)

# Check that we don't have any ACLs in the cluster
acls, error = admin_client.describe_acls(
)

assert error is NoError
assert len(acls) == 0

# Try to add an ACL
acl = ACL(
principal="User:test",
host="*",
operation=ACLOperation.READ,
permission_type=ACLPermissionType.ALLOW,
resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
)
result = kafka_admin_client.create_acls([acl])

assert len(result["failed"]) == 0
assert len(result["succeeded"]) == 1

# Check that we can list the ACL we created
acl_filter = ACLFilter(
principal=None,
host="*",
operation=ACLOperation.ANY,
permission_type=ACLPermissionType.ANY,
resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
)
acls, error = kafka_admin_client.describe_acls(acl_filter)

assert error is NoError
assert len(acls) == 1

# Remove the ACL
delete_results = kafka_admin_client.delete_acls(
[
ACLFilter(
principal=None,
principal="User:test",
host="*",
operation=ACLOperation.ANY,
permission_type=ACLPermissionType.ANY,
operation=ACLOperation.READ,
permission_type=ACLPermissionType.ALLOW,
resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
)
)
]
)

self.assertIs(error, NoError)
self.assertEqual(0, len(acls))
assert len(delete_results) == 1
assert len(delete_results[0][1]) == 1 # Check number of affected ACLs

# Try to add an ACL
acl = ACL(
principal="User:test",
host="*",
operation=ACLOperation.READ,
permission_type=ACLPermissionType.ALLOW,
resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
)
result = admin_client.create_acls([acl])

self.assertFalse(len(result["failed"]))
self.assertEqual(len(result["succeeded"]), 1)

# Check that we can list the ACL we created
acl_filter = ACLFilter(
principal=None,
# Make sure the ACL does not exist in the cluster anymore
acls, error = kafka_admin_client.describe_acls(
ACLFilter(
principal="*",
host="*",
operation=ACLOperation.ANY,
permission_type=ACLPermissionType.ANY,
resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
)
acls, error = admin_client.describe_acls(acl_filter)

self.assertIs(error, NoError)
self.assertEqual(1, len(acls))

# Remove the ACL
delete_results = admin_client.delete_acls(
[
ACLFilter(
principal="User:test",
host="*",
operation=ACLOperation.READ,
permission_type=ACLPermissionType.ALLOW,
resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
)
]
)
)

self.assertEqual(1, len(delete_results))
self.assertEqual(1, len(delete_results[0][1])) # Check number of affected ACLs


# Make sure the ACL does not exist in the cluster anymore
acls, error = admin_client.describe_acls(
ACLFilter(
principal="*",
host="*",
operation=ACLOperation.ANY,
permission_type=ACLPermissionType.ANY,
resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
)
)
self.assertIs(error, NoError)
self.assertEqual(0, len(acls))
assert error is NoError
assert len(acls) == 0