From 117f78ea68a3a30cb4f2d3a5ca43d2587b2a0271 Mon Sep 17 00:00:00 2001
From: Tincu Gabriel
Date: Tue, 5 May 2020 13:29:23 +0200
Subject: [PATCH] Add logic for inferring newer broker versions (#2038)

* Add logic for inferring newer broker versions

- New Fetch / ListOffsets request / response objects
- Add new test cases to the version-inference code, based on the objects above
- Add a unit test checking the inferred version against whatever resides in KAFKA_VERSION
- Add new kafka broker versions to the integration list
- Add more kafka broker versions to the travis task list
- Add support for the broker version 2.5 id

* Implement PR change requests: fewer versions for travis testing, remove
  unused older versions from the inference code, remove one minor version
  from the known server list

  Do not use the newly created ACL request / responses in the allowed-version
  lists: enabling flexible versions in kafka requires a serialization
  protocol header update first

  Revert admin client file change
---
 .travis.yml                          |   1 +
 build_integration.sh                 |   6 +-
 kafka/conn.py                        |  12 +-
 kafka/protocol/admin.py              |  20 ++
 kafka/protocol/fetch.py              | 182 +++++++++++++++++-
 kafka/protocol/offset.py             |  89 ++++++++-
 .../resources/kafka.properties       |   0
 .../resources/kafka_server_jaas.conf |   0
 .../resources/log4j.properties       |   0
 .../resources/zookeeper.properties   |   0
 test/test_consumer_integration.py    |  15 +-
 11 files changed, 315 insertions(+), 10 deletions(-)
 rename servers/{2.2.0 => 2.5.0}/resources/kafka.properties (100%)
 rename servers/{2.2.0 => 2.5.0}/resources/kafka_server_jaas.conf (100%)
 rename servers/{2.2.0 => 2.5.0}/resources/log4j.properties (100%)
 rename servers/{2.2.0 => 2.5.0}/resources/zookeeper.properties (100%)

diff --git a/.travis.yml b/.travis.yml
index bfbae5721..d660271fa 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -16,6 +16,7 @@ env:
   - KAFKA_VERSION=0.11.0.3
   - KAFKA_VERSION=1.1.1
   - KAFKA_VERSION=2.4.0
+  - KAFKA_VERSION=2.5.0
 
 addons:
   apt:
diff --git a/build_integration.sh b/build_integration.sh
index 98b9b2766..c020b0fe2 100755
--- a/build_integration.sh
+++ b/build_integration.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 2.1.1"}
+: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 2.1.1 2.2.1 2.3.0 2.4.0 2.5.0"}
 : ${SCALA_VERSION:=2.11}
 : ${DIST_BASE_URL:=https://archive.apache.org/dist/kafka/}
 : ${KAFKA_SRC_GIT:=https://github.com/apache/kafka.git}
@@ -33,12 +33,14 @@ pushd servers
         echo "-------------------------------------"
         echo "Checking kafka binaries for ${kafka}"
         echo
-        # kafka 0.8.0 is only available w/ scala 2.8.0
         if [ "$kafka" == "0.8.0" ]; then
            KAFKA_ARTIFACT="kafka_2.8.0-${kafka}.tar.gz"
+        else if [ "$kafka" \> "2.4.0" ]; then
+           KAFKA_ARTIFACT="kafka_2.12-${kafka}.tgz"
         else
            KAFKA_ARTIFACT="kafka_${SCALA_VERSION}-${kafka}.tgz"
         fi
+        fi
         if [ ! -f "../$kafka/kafka-bin/bin/kafka-run-class.sh" ]; then
             if [ -f "${KAFKA_ARTIFACT}" ]; then
                 echo "Using cached artifact: ${KAFKA_ARTIFACT}"
diff --git a/kafka/conn.py b/kafka/conn.py
index c383123ca..5c7287568 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -24,9 +24,12 @@ from kafka.future import Future
 from kafka.metrics.stats import Avg, Count, Max, Rate
 from kafka.oauth.abstract import AbstractTokenProvider
-from kafka.protocol.admin import SaslHandShakeRequest
+from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest_v2
 from kafka.protocol.commit import OffsetFetchRequest
+from kafka.protocol.offset import OffsetRequest
+from kafka.protocol.produce import ProduceRequest
 from kafka.protocol.metadata import MetadataRequest
+from kafka.protocol.fetch import FetchRequest
 from kafka.protocol.parser import KafkaProtocol
 from kafka.protocol.types import Int32, Int8
 from kafka.scram import ScramClient
@@ -1166,6 +1169,13 @@ def _infer_broker_version_from_api_versions(self, api_versions):
         # in reverse order. As soon as we find one that works, return it
         test_cases = [
             # format (<broker version>, <needed struct>)
+            ((2, 5, 0), DescribeAclsRequest_v2),
+            ((2, 4, 0), ProduceRequest[8]),
+            ((2, 3, 0), FetchRequest[11]),
+            ((2, 2, 0), OffsetRequest[5]),
+            ((2, 1, 0), FetchRequest[10]),
+            ((2, 0, 0), FetchRequest[8]),
+            ((1, 1, 0), FetchRequest[7]),
             ((1, 0, 0), MetadataRequest[5]),
             ((0, 11, 0), MetadataRequest[4]),
             ((0, 10, 2), OffsetFetchRequest[2]),
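The probing above works because every broker answers ApiVersionsRequest with the (min, max) version range it supports for each API key, and each broker release raises the maximum version of some API. The test cases are walked newest-first, and the first request type whose version the broker advertises wins. A minimal sketch of the idea (simplified names; the `api_versions` dict shape and the fallback value are illustrative, not kafka-python's exact internals):

```python
# Sketch of the version-probing idea behind _infer_broker_version_from_api_versions.
# `api_versions` maps api_key -> (min_version, max_version), as advertised by the
# broker's ApiVersionsResponse; `test_cases` is ordered newest-first, as above.
def infer_broker_version(api_versions, test_cases):
    for broker_version, struct in test_cases:
        if struct.API_KEY not in api_versions:
            continue
        min_version, max_version = api_versions[struct.API_KEY]
        if min_version <= struct.API_VERSION <= max_version:
            # The broker accepts this probe's API version, so it is at least
            # this release; the newest matching case wins.
            return broker_version
    return (0, 10, 0)  # illustrative fallback for brokers older than any probe
```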
-f "../$kafka/kafka-bin/bin/kafka-run-class.sh" ]; then if [ -f "${KAFKA_ARTIFACT}" ]; then echo "Using cached artifact: ${KAFKA_ARTIFACT}" diff --git a/kafka/conn.py b/kafka/conn.py index c383123ca..5c7287568 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -24,9 +24,12 @@ from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.oauth.abstract import AbstractTokenProvider -from kafka.protocol.admin import SaslHandShakeRequest +from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest_v2 from kafka.protocol.commit import OffsetFetchRequest +from kafka.protocol.offset import OffsetRequest +from kafka.protocol.produce import ProduceRequest from kafka.protocol.metadata import MetadataRequest +from kafka.protocol.fetch import FetchRequest from kafka.protocol.parser import KafkaProtocol from kafka.protocol.types import Int32, Int8 from kafka.scram import ScramClient @@ -1166,6 +1169,13 @@ def _infer_broker_version_from_api_versions(self, api_versions): # in reverse order. As soon as we find one that works, return it test_cases = [ # format (, ) + ((2, 5, 0), DescribeAclsRequest_v2), + ((2, 4, 0), ProduceRequest[8]), + ((2, 3, 0), FetchRequest[11]), + ((2, 2, 0), OffsetRequest[5]), + ((2, 1, 0), FetchRequest[10]), + ((2, 0, 0), FetchRequest[8]), + ((1, 1, 0), FetchRequest[7]), ((1, 0, 0), MetadataRequest[5]), ((0, 11, 0), MetadataRequest[4]), ((0, 10, 2), OffsetFetchRequest[2]), diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index b2694dc96..af88ea473 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -477,6 +477,13 @@ class DescribeAclsResponse_v1(Response): ('permission_type', Int8))))) ) + +class DescribeAclsResponse_v2(Response): + API_KEY = 29 + API_VERSION = 2 + SCHEMA = DescribeAclsResponse_v1.SCHEMA + + class DescribeAclsRequest_v0(Request): API_KEY = 29 API_VERSION = 0 @@ -490,6 +497,7 @@ class DescribeAclsRequest_v0(Request): ('permission_type', Int8) ) + class DescribeAclsRequest_v1(Request): API_KEY = 29 API_VERSION = 1 @@ -504,6 +512,17 @@ class DescribeAclsRequest_v1(Request): ('permission_type', Int8) ) + +class DescribeAclsRequest_v2(Request): + """ + Enable flexible version + """ + API_KEY = 29 + API_VERSION = 2 + RESPONSE_TYPE = DescribeAclsResponse_v2 + SCHEMA = DescribeAclsRequest_v1.SCHEMA + + DescribeAclsRequest = [DescribeAclsRequest_v0, DescribeAclsRequest_v1] DescribeAclsResponse = [DescribeAclsResponse_v0, DescribeAclsResponse_v1] @@ -862,3 +881,4 @@ class CreatePartitionsRequest_v1(Request): CreatePartitionsResponse = [ CreatePartitionsResponse_v0, CreatePartitionsResponse_v1, ] + diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py index dd3f648cf..f367848ce 100644 --- a/kafka/protocol/fetch.py +++ b/kafka/protocol/fetch.py @@ -94,6 +94,72 @@ class FetchResponse_v6(Response): SCHEMA = FetchResponse_v5.SCHEMA +class FetchResponse_v7(Response): + """ + Add error_code and session_id to response + """ + API_KEY = 1 + API_VERSION = 7 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('session_id', Int32), + ('topics', Array( + ('topics', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('highwater_offset', Int64), + ('last_stable_offset', Int64), + ('log_start_offset', Int64), + ('aborted_transactions', Array( + ('producer_id', Int64), + ('first_offset', Int64))), + ('message_set', Bytes))))) + ) + + +class FetchResponse_v8(Response): + API_KEY = 1 + API_VERSION = 8 + SCHEMA = FetchResponse_v7.SCHEMA + 
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py
index dd3f648cf..f367848ce 100644
--- a/kafka/protocol/fetch.py
+++ b/kafka/protocol/fetch.py
@@ -94,6 +94,72 @@ class FetchResponse_v6(Response):
     SCHEMA = FetchResponse_v5.SCHEMA
 
 
+class FetchResponse_v7(Response):
+    """
+    Add error_code and session_id to response
+    """
+    API_KEY = 1
+    API_VERSION = 7
+    SCHEMA = Schema(
+        ('throttle_time_ms', Int32),
+        ('error_code', Int16),
+        ('session_id', Int32),
+        ('topics', Array(
+            ('topics', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('error_code', Int16),
+                ('highwater_offset', Int64),
+                ('last_stable_offset', Int64),
+                ('log_start_offset', Int64),
+                ('aborted_transactions', Array(
+                    ('producer_id', Int64),
+                    ('first_offset', Int64))),
+                ('message_set', Bytes)))))
+    )
+
+
+class FetchResponse_v8(Response):
+    API_KEY = 1
+    API_VERSION = 8
+    SCHEMA = FetchResponse_v7.SCHEMA
+
+
+class FetchResponse_v9(Response):
+    API_KEY = 1
+    API_VERSION = 9
+    SCHEMA = FetchResponse_v7.SCHEMA
+
+
+class FetchResponse_v10(Response):
+    API_KEY = 1
+    API_VERSION = 10
+    SCHEMA = FetchResponse_v7.SCHEMA
+
+
+class FetchResponse_v11(Response):
+    API_KEY = 1
+    API_VERSION = 11
+    SCHEMA = Schema(
+        ('throttle_time_ms', Int32),
+        ('error_code', Int16),
+        ('session_id', Int32),
+        ('topics', Array(
+            ('topics', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('error_code', Int16),
+                ('highwater_offset', Int64),
+                ('last_stable_offset', Int64),
+                ('log_start_offset', Int64),
+                ('aborted_transactions', Array(
+                    ('producer_id', Int64),
+                    ('first_offset', Int64))),
+                ('preferred_read_replica', Int32),
+                ('message_set', Bytes)))))
+    )
+
+
 class FetchRequest_v0(Request):
     API_KEY = 1
     API_VERSION = 0
@@ -196,13 +262,125 @@ class FetchRequest_v6(Request):
     SCHEMA = FetchRequest_v5.SCHEMA
 
 
+class FetchRequest_v7(Request):
+    """
+    Add incremental fetch requests
+    """
+    API_KEY = 1
+    API_VERSION = 7
+    RESPONSE_TYPE = FetchResponse_v7
+    SCHEMA = Schema(
+        ('replica_id', Int32),
+        ('max_wait_time', Int32),
+        ('min_bytes', Int32),
+        ('max_bytes', Int32),
+        ('isolation_level', Int8),
+        ('session_id', Int32),
+        ('session_epoch', Int32),
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('fetch_offset', Int64),
+                ('log_start_offset', Int64),
+                ('max_bytes', Int32))))),
+        ('forgotten_topics_data', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(Int32))
+        )),
+    )
+
+
+class FetchRequest_v8(Request):
+    """
+    Version bump: on quota violation, brokers send out responses before throttling.
+    """
+    API_KEY = 1
+    API_VERSION = 8
+    RESPONSE_TYPE = FetchResponse_v8
+    SCHEMA = FetchRequest_v7.SCHEMA
+
+
+class FetchRequest_v9(Request):
+    """
+    Adds the current leader epoch (see KIP-320)
+    """
+    API_KEY = 1
+    API_VERSION = 9
+    RESPONSE_TYPE = FetchResponse_v9
+    SCHEMA = Schema(
+        ('replica_id', Int32),
+        ('max_wait_time', Int32),
+        ('min_bytes', Int32),
+        ('max_bytes', Int32),
+        ('isolation_level', Int8),
+        ('session_id', Int32),
+        ('session_epoch', Int32),
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('current_leader_epoch', Int32),
+                ('fetch_offset', Int64),
+                ('log_start_offset', Int64),
+                ('max_bytes', Int32))))),
+        ('forgotten_topics_data', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(Int32)),
+        )),
+    )
+
+
+class FetchRequest_v10(Request):
+    """
+    Version bump to indicate ZStandard capability (see KIP-110)
+    """
+    API_KEY = 1
+    API_VERSION = 10
+    RESPONSE_TYPE = FetchResponse_v10
+    SCHEMA = FetchRequest_v9.SCHEMA
+
+
+class FetchRequest_v11(Request):
+    """
+    Adds rack ID to support fetch from followers (KIP-392)
+    """
+    API_KEY = 1
+    API_VERSION = 11
+    RESPONSE_TYPE = FetchResponse_v11
+    SCHEMA = Schema(
+        ('replica_id', Int32),
+        ('max_wait_time', Int32),
+        ('min_bytes', Int32),
+        ('max_bytes', Int32),
+        ('isolation_level', Int8),
+        ('session_id', Int32),
+        ('session_epoch', Int32),
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('current_leader_epoch', Int32),
+                ('fetch_offset', Int64),
+                ('log_start_offset', Int64),
+                ('max_bytes', Int32))))),
+        ('forgotten_topics_data', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(Int32))
+        )),
+        ('rack_id', String('utf-8')),
+    )
+
+
 FetchRequest = [
     FetchRequest_v0, FetchRequest_v1, FetchRequest_v2,
     FetchRequest_v3, FetchRequest_v4, FetchRequest_v5,
-    FetchRequest_v6
+    FetchRequest_v6, FetchRequest_v7, FetchRequest_v8,
+    FetchRequest_v9, FetchRequest_v10, FetchRequest_v11,
 ]
 FetchResponse = [
     FetchResponse_v0, FetchResponse_v1, FetchResponse_v2,
     FetchResponse_v3, FetchResponse_v4, FetchResponse_v5,
-    FetchResponse_v6
+    FetchResponse_v6, FetchResponse_v7, FetchResponse_v8,
+    FetchResponse_v9, FetchResponse_v10, FetchResponse_v11,
 ]
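FetchRequest v7 and later carry the KIP-227 fetch-session fields. kafka-python keeps sending stateless "full" fetches, which in session terms means session_id=0 and session_epoch=-1 with an empty forgotten_topics_data list. A hedged example of building a v11 request by hand (all values are illustrative; the consumer's fetcher normally constructs this internally):

```python
from kafka.protocol.fetch import FetchRequest

# A full (non-incremental) fetch per KIP-227: session_id=0, session_epoch=-1.
request = FetchRequest[11](
    replica_id=-1,           # -1 identifies a regular consumer, not a follower
    max_wait_time=500,       # ms the broker may block waiting for min_bytes
    min_bytes=1,
    max_bytes=52428800,
    isolation_level=0,       # 0 = READ_UNCOMMITTED, 1 = READ_COMMITTED
    session_id=0,
    session_epoch=-1,
    # topics: (topic, [(partition, current_leader_epoch, fetch_offset,
    #                   log_start_offset, partition_max_bytes), ...])
    topics=[('example-topic', [(0, -1, 0, -1, 1048576)])],
    forgotten_topics_data=[],  # only used by incremental fetch sessions
    rack_id='',                # KIP-392 follower fetching; empty = no preference
)
```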
diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py
index 3c254de40..1ed382b0d 100644
--- a/kafka/protocol/offset.py
+++ b/kafka/protocol/offset.py
@@ -53,6 +53,43 @@ class OffsetResponse_v2(Response):
     )
 
 
+class OffsetResponse_v3(Response):
+    """
+    On quota violation, brokers send out responses before throttling
+    """
+    API_KEY = 2
+    API_VERSION = 3
+    SCHEMA = OffsetResponse_v2.SCHEMA
+
+
+class OffsetResponse_v4(Response):
+    """
+    Add leader_epoch to response
+    """
+    API_KEY = 2
+    API_VERSION = 4
+    SCHEMA = Schema(
+        ('throttle_time_ms', Int32),
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('error_code', Int16),
+                ('timestamp', Int64),
+                ('offset', Int64),
+                ('leader_epoch', Int32)))))
+    )
+
+
+class OffsetResponse_v5(Response):
+    """
+    Adds a new error code, OFFSET_NOT_AVAILABLE
+    """
+    API_KEY = 2
+    API_VERSION = 5
+    SCHEMA = OffsetResponse_v4.SCHEMA
+
+
 class OffsetRequest_v0(Request):
     API_KEY = 2
     API_VERSION = 0
@@ -105,5 +142,53 @@ class OffsetRequest_v2(Request):
     }
 
 
-OffsetRequest = [OffsetRequest_v0, OffsetRequest_v1, OffsetRequest_v2]
-OffsetResponse = [OffsetResponse_v0, OffsetResponse_v1, OffsetResponse_v2]
+class OffsetRequest_v3(Request):
+    API_KEY = 2
+    API_VERSION = 3
+    RESPONSE_TYPE = OffsetResponse_v3
+    SCHEMA = OffsetRequest_v2.SCHEMA
+    DEFAULTS = {
+        'replica_id': -1
+    }
+
+
+class OffsetRequest_v4(Request):
+    """
+    Add current_leader_epoch to request
+    """
+    API_KEY = 2
+    API_VERSION = 4
+    RESPONSE_TYPE = OffsetResponse_v4
+    SCHEMA = Schema(
+        ('replica_id', Int32),
+        ('isolation_level', Int8),
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('current_leader_epoch', Int32),  # <- added (int32, per KIP-320)
+                ('timestamp', Int64)))))
+    )
+    DEFAULTS = {
+        'replica_id': -1
+    }
+
+
+class OffsetRequest_v5(Request):
+    API_KEY = 2
+    API_VERSION = 5
+    RESPONSE_TYPE = OffsetResponse_v5
+    SCHEMA = OffsetRequest_v4.SCHEMA
+    DEFAULTS = {
+        'replica_id': -1
+    }
+
+
+OffsetRequest = [
+    OffsetRequest_v0, OffsetRequest_v1, OffsetRequest_v2,
+    OffsetRequest_v3, OffsetRequest_v4, OffsetRequest_v5,
+]
+OffsetResponse = [
+    OffsetResponse_v0,
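For ListOffsets, versions 4 and 5 share the same request wire format; v5 only introduces the OFFSET_NOT_AVAILABLE error code on the response side. A hedged example of a v5 request asking for the latest offset of one partition (values are illustrative):

```python
from kafka.protocol.offset import OffsetRequest

# Special timestamps: -1 = latest offset, -2 = earliest offset.
request = OffsetRequest[5](
    replica_id=-1,         # ordinary client; matches the DEFAULTS above
    isolation_level=0,     # 0 = READ_UNCOMMITTED, 1 = READ_COMMITTED
    # topics: (topic, [(partition, current_leader_epoch, timestamp), ...])
    topics=[('example-topic', [(0, -1, -1)])],
)
```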
    OffsetResponse_v1, OffsetResponse_v2,
+    OffsetResponse_v3, OffsetResponse_v4, OffsetResponse_v5,
+]
diff --git a/servers/2.2.0/resources/kafka.properties b/servers/2.5.0/resources/kafka.properties
similarity index 100%
rename from servers/2.2.0/resources/kafka.properties
rename to servers/2.5.0/resources/kafka.properties
diff --git a/servers/2.2.0/resources/kafka_server_jaas.conf b/servers/2.5.0/resources/kafka_server_jaas.conf
similarity index 100%
rename from servers/2.2.0/resources/kafka_server_jaas.conf
rename to servers/2.5.0/resources/kafka_server_jaas.conf
diff --git a/servers/2.2.0/resources/log4j.properties b/servers/2.5.0/resources/log4j.properties
similarity index 100%
rename from servers/2.2.0/resources/log4j.properties
rename to servers/2.5.0/resources/log4j.properties
diff --git a/servers/2.2.0/resources/zookeeper.properties b/servers/2.5.0/resources/zookeeper.properties
similarity index 100%
rename from servers/2.2.0/resources/zookeeper.properties
rename to servers/2.5.0/resources/zookeeper.properties
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 6e6bc9455..90b7ed203 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -6,14 +6,23 @@
 from kafka.vendor.six.moves import range
 
 import kafka.codec
-from kafka.errors import (
-    KafkaTimeoutError, UnsupportedCodecError, UnsupportedVersionError
-)
+from kafka.errors import UnsupportedCodecError, UnsupportedVersionError
 from kafka.structs import TopicPartition, OffsetAndTimestamp
 
 from test.testutil import Timer, assert_message_count, env_kafka_version, random_string
 
 
+@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
+def test_kafka_version_infer(kafka_consumer_factory):
+    consumer = kafka_consumer_factory()
+    actual_ver_major_minor = env_kafka_version()[:2]
+    client = consumer._client
+    conn = list(client._conns.values())[0]
+    inferred_ver_major_minor = conn.check_version()[:2]
+    assert actual_ver_major_minor == inferred_ver_major_minor, \
+        "Expected inferred broker version %s, got %s" % (actual_ver_major_minor, inferred_ver_major_minor)
+
+
 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
 def test_kafka_consumer(kafka_consumer_factory, send_messages):
     """Test KafkaConsumer"""
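The integration test compares only the (major, minor) prefix, since the inference cannot distinguish patch releases. Outside the test suite, the same inference is reachable through the public client; a hedged usage sketch (the bootstrap address is a placeholder):

```python
from kafka import KafkaClient

client = KafkaClient(bootstrap_servers='localhost:9092')
# check_version() probes the broker's supported API versions and returns the
# inferred version tuple, e.g. (2, 5, 0) against a 2.5.0 broker with this patch.
print(client.check_version())
client.close()
```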