Skip to content

Commit

Permalink
Implement methods to convert a Struct object to a pythonic object (#1951
Browse files Browse the repository at this point in the history
)

Implement methods to convert a Struct object to a pythonic object
  • Loading branch information
TylerLubeck authored Feb 6, 2020
1 parent 3d98741 commit 209515b
Show file tree
Hide file tree
Showing 3 changed files with 273 additions and 1 deletion.
32 changes: 31 additions & 1 deletion kafka/protocol/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import abc

from kafka.protocol.struct import Struct
from kafka.protocol.types import Int16, Int32, String, Schema
from kafka.protocol.types import Int16, Int32, String, Schema, Array


class RequestHeader(Struct):
Expand Down Expand Up @@ -47,6 +47,9 @@ def expect_response(self):
"""Override this method if an api request does not always generate a response"""
return True

def to_object(self):
return _to_object(self.SCHEMA, self)


class Response(Struct):
__metaclass__ = abc.ABCMeta
Expand All @@ -65,3 +68,30 @@ def API_VERSION(self):
def SCHEMA(self):
"""An instance of Schema() representing the response structure"""
pass

def to_object(self):
return _to_object(self.SCHEMA, self)


def _to_object(schema, data):
obj = {}
for idx, (name, _type) in enumerate(zip(schema.names, schema.fields)):
if isinstance(data, Struct):
val = data.get_item(name)
else:
val = data[idx]

if isinstance(_type, Schema):
obj[name] = _to_object(_type, val)
elif isinstance(_type, Array):
if isinstance(_type.array_of, (Array, Schema)):
obj[name] = [
_to_object(_type.array_of, x)
for x in val
]
else:
obj[name] = val
else:
obj[name] = val

return obj
6 changes: 6 additions & 0 deletions kafka/protocol/struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __init__(self, *args, **kwargs):
# causes instances to "leak" to garbage
self.encode = WeakMethod(self._encode_self)


@classmethod
def encode(cls, item): # pylint: disable=E0202
bits = []
Expand All @@ -48,6 +49,11 @@ def decode(cls, data):
data = BytesIO(data)
return cls(*[field.decode(data) for field in cls.SCHEMA.fields])

def get_item(self, name):
if name not in self.SCHEMA.names:
raise KeyError("%s is not in the schema" % name)
return self.__dict__[name]

def __repr__(self):
key_vals = []
for name, field in zip(self.SCHEMA.names, self.SCHEMA.fields):
Expand Down
236 changes: 236 additions & 0 deletions test/test_object_conversion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
from kafka.protocol.admin import Request
from kafka.protocol.admin import Response
from kafka.protocol.types import Schema
from kafka.protocol.types import Array
from kafka.protocol.types import Int16
from kafka.protocol.types import String

import pytest

@pytest.mark.parametrize('superclass', (Request, Response))
class TestObjectConversion:
def test_get_item(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema(
('myobject', Int16))

tc = TestClass(myobject=0)
assert tc.get_item('myobject') == 0
with pytest.raises(KeyError):
tc.get_item('does-not-exist')

def test_with_empty_schema(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema()

tc = TestClass()
tc.encode()
assert tc.to_object() == {}

def test_with_basic_schema(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema(
('myobject', Int16))

tc = TestClass(myobject=0)
tc.encode()
assert tc.to_object() == {'myobject': 0}

def test_with_basic_array_schema(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema(
('myarray', Array(Int16)))

tc = TestClass(myarray=[1,2,3])
tc.encode()
assert tc.to_object()['myarray'] == [1, 2, 3]

def test_with_complex_array_schema(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema(
('myarray', Array(
('subobject', Int16),
('othersubobject', String('utf-8')))))

tc = TestClass(
myarray=[[10, 'hello']]
)
tc.encode()
obj = tc.to_object()
assert len(obj['myarray']) == 1
assert obj['myarray'][0]['subobject'] == 10
assert obj['myarray'][0]['othersubobject'] == 'hello'

def test_with_array_and_other(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema(
('myarray', Array(
('subobject', Int16),
('othersubobject', String('utf-8')))),
('notarray', Int16))

tc = TestClass(
myarray=[[10, 'hello']],
notarray=42
)

obj = tc.to_object()
assert len(obj['myarray']) == 1
assert obj['myarray'][0]['subobject'] == 10
assert obj['myarray'][0]['othersubobject'] == 'hello'
assert obj['notarray'] == 42

def test_with_nested_array(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema(
('myarray', Array(
('subarray', Array(Int16)),
('otherobject', Int16))))

tc = TestClass(
myarray=[
[[1, 2], 2],
[[2, 3], 4],
]
)
print(tc.encode())


obj = tc.to_object()
assert len(obj['myarray']) == 2
assert obj['myarray'][0]['subarray'] == [1, 2]
assert obj['myarray'][0]['otherobject'] == 2
assert obj['myarray'][1]['subarray'] == [2, 3]
assert obj['myarray'][1]['otherobject'] == 4

def test_with_complex_nested_array(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema(
('myarray', Array(
('subarray', Array(
('innertest', String('utf-8')),
('otherinnertest', String('utf-8')))),
('othersubarray', Array(Int16)))),
('notarray', String('utf-8')))

tc = TestClass(
myarray=[
[[['hello', 'hello'], ['hello again', 'hello again']], [0]],
[[['hello', 'hello again']], [1]],
],
notarray='notarray'
)
tc.encode()

obj = tc.to_object()

assert obj['notarray'] == 'notarray'
myarray = obj['myarray']
assert len(myarray) == 2

assert myarray[0]['othersubarray'] == [0]
assert len(myarray[0]['subarray']) == 2
assert myarray[0]['subarray'][0]['innertest'] == 'hello'
assert myarray[0]['subarray'][0]['otherinnertest'] == 'hello'
assert myarray[0]['subarray'][1]['innertest'] == 'hello again'
assert myarray[0]['subarray'][1]['otherinnertest'] == 'hello again'

assert myarray[1]['othersubarray'] == [1]
assert len(myarray[1]['subarray']) == 1
assert myarray[1]['subarray'][0]['innertest'] == 'hello'
assert myarray[1]['subarray'][0]['otherinnertest'] == 'hello again'

def test_with_metadata_response():
from kafka.protocol.metadata import MetadataResponse_v5
tc = MetadataResponse_v5(
throttle_time_ms=0,
brokers=[
[0, 'testhost0', 9092, 'testrack0'],
[1, 'testhost1', 9092, 'testrack1'],
],
cluster_id='abcd',
controller_id=0,
topics=[
[0, 'testtopic1', False, [
[0, 0, 0, [0, 1], [0, 1], []],
[0, 1, 1, [1, 0], [1, 0], []],
],
], [0, 'other-test-topic', True, [
[0, 0, 0, [0, 1], [0, 1], []],
]
]]
)
tc.encode() # Make sure this object encodes successfully


obj = tc.to_object()

assert obj['throttle_time_ms'] == 0

assert len(obj['brokers']) == 2
assert obj['brokers'][0]['node_id'] == 0
assert obj['brokers'][0]['host'] == 'testhost0'
assert obj['brokers'][0]['port'] == 9092
assert obj['brokers'][0]['rack'] == 'testrack0'
assert obj['brokers'][1]['node_id'] == 1
assert obj['brokers'][1]['host'] == 'testhost1'
assert obj['brokers'][1]['port'] == 9092
assert obj['brokers'][1]['rack'] == 'testrack1'

assert obj['cluster_id'] == 'abcd'
assert obj['controller_id'] == 0

assert len(obj['topics']) == 2
assert obj['topics'][0]['error_code'] == 0
assert obj['topics'][0]['topic'] == 'testtopic1'
assert obj['topics'][0]['is_internal'] == False
assert len(obj['topics'][0]['partitions']) == 2
assert obj['topics'][0]['partitions'][0]['error_code'] == 0
assert obj['topics'][0]['partitions'][0]['partition'] == 0
assert obj['topics'][0]['partitions'][0]['leader'] == 0
assert obj['topics'][0]['partitions'][0]['replicas'] == [0, 1]
assert obj['topics'][0]['partitions'][0]['isr'] == [0, 1]
assert obj['topics'][0]['partitions'][0]['offline_replicas'] == []
assert obj['topics'][0]['partitions'][1]['error_code'] == 0
assert obj['topics'][0]['partitions'][1]['partition'] == 1
assert obj['topics'][0]['partitions'][1]['leader'] == 1
assert obj['topics'][0]['partitions'][1]['replicas'] == [1, 0]
assert obj['topics'][0]['partitions'][1]['isr'] == [1, 0]
assert obj['topics'][0]['partitions'][1]['offline_replicas'] == []

assert obj['topics'][1]['error_code'] == 0
assert obj['topics'][1]['topic'] == 'other-test-topic'
assert obj['topics'][1]['is_internal'] == True
assert len(obj['topics'][1]['partitions']) == 1
assert obj['topics'][1]['partitions'][0]['error_code'] == 0
assert obj['topics'][1]['partitions'][0]['partition'] == 0
assert obj['topics'][1]['partitions'][0]['leader'] == 0
assert obj['topics'][1]['partitions'][0]['replicas'] == [0, 1]
assert obj['topics'][1]['partitions'][0]['isr'] == [0, 1]
assert obj['topics'][1]['partitions'][0]['offline_replicas'] == []

tc.encode()

0 comments on commit 209515b

Please # to comment.