diff --git a/docs/install.rst b/docs/install.rst index cc0e82d68..fe740f660 100644 --- a/docs/install.rst +++ b/docs/install.rst @@ -70,3 +70,16 @@ Install the `python-snappy` module .. code:: bash pip install python-snappy + + +Optional crc32c install +*********************** +Highly recommended if you are using Kafka 0.11+ brokers. For those, `kafka-python` +uses a new message protocol version that requires calculation of `crc32c`, +which differs from the `zlib.crc32` hash implementation. By default `kafka-python` +calculates it in pure Python, which is quite slow. To speed it up we optionally +support the https://pypi.python.org/pypi/crc32c package if it's installed. + +.. code:: bash + + pip install crc32c diff --git a/kafka/record/util.py b/kafka/record/util.py index 55d7adbd0..74b9a69b0 100644 --- a/kafka/record/util.py +++ b/kafka/record/util.py @@ -1,6 +1,10 @@ import binascii from kafka.record._crc32c import crc as crc32c_py +try: + from crc32c import crc32 as crc32c_c +except ImportError: + crc32c_c = None def encode_varint(value, write): @@ -113,11 +117,15 @@ def decode_varint(buffer, pos=0): raise ValueError("Out of int64 range") -def calc_crc32c(memview): +_crc32c = crc32c_py +if crc32c_c is not None: + _crc32c = crc32c_c + + +def calc_crc32c(memview, _crc32c=_crc32c): """ Calculate CRC-32C (Castagnoli) checksum over a memoryview of data """ - crc = crc32c_py(memview) - return crc + return _crc32c(memview) def calc_crc32(memview): diff --git a/test/record/test_util.py b/test/record/test_util.py index bfe0fcc2e..0b2782e7a 100644 --- a/test/record/test_util.py +++ b/test/record/test_util.py @@ -68,9 +68,10 @@ def test_size_of_varint(encoded, decoded): assert util.size_of_varint(decoded) == len(encoded) -def test_crc32c(): +@pytest.mark.parametrize("crc32_func", [util.crc32c_c, util.crc32c_py]) +def test_crc32c(crc32_func): def make_crc(data): - crc = util.calc_crc32c(data) + crc = crc32_func(data) return struct.pack(">I", crc) assert make_crc(b"") == 
b"\x00\x00\x00\x00" assert make_crc(b"a") == b"\xc1\xd0\x43\x30" diff --git a/tox.ini b/tox.ini index 0f1aaf438..35dc84207 100644 --- a/tox.ini +++ b/tox.ini @@ -18,6 +18,7 @@ deps = python-snappy lz4 xxhash + crc32c py26: unittest2 commands = py.test {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc}