diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c6620c57e..ea7206ceac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ librdkafka v2.3.0 is a feature release: * [KIP-580](https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients): Added Exponential Backoff mechanism for retriable requests with `retry.backoff.ms` as minimum backoff and `retry.backoff.max.ms` as the maximum backoff, with 20% jitter (#4422). + * [KIP-396](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484): completed the implementation with + the addition of ListOffsets (#4225). * Fixed ListConsumerGroupOffsets not fetching offsets for all the topics in a group with Apache Kafka version below 2.4.0. * Add missing destroy that leads to leaking partition structure memory when there are partition leader changes and a stale leader epoch is received (#4429). @@ -32,6 +34,8 @@ librdkafka v2.3.0 is a feature release: consume_cb (#4431). * Fix for idempotent producer fatal errors, triggered after a possibly persisted message state (#4438). * Fix `rd_kafka_query_watermark_offsets` continuing beyond timeout expiry (#4460). + * Fix `rd_kafka_query_watermark_offsets` not refreshing the partition leader + after a leader change and subsequent `NOT_LEADER_OR_FOLLOWER` error (#4225). ## Upgrade considerations @@ -54,9 +58,9 @@ librdkafka v2.3.0 is a feature release: * An assertion failed with insufficient buffer size when allocating rack information on 32bit architectures. Solved by aligning all allocations to the maximum allowed word size (#4449). - * The timeout for `rd_kafka_query_watermark_offsets` was not checked after + * The timeout for `rd_kafka_query_watermark_offsets` was not enforced after making the necessary ListOffsets requests, and thus, it never timed out in - case of broker/network issues. Fixed by checking timeout expiry (#4460). + case of broker/network issues. Fixed by setting an absolute timeout (#4460). ### Idempotent producer fixes @@ -93,6 +97,10 @@ librdkafka v2.3.0 is a feature release: consumer messages, while the method to service the queue internally also services the queue forwarded to from `rk_rep`, which is `rkcg_q`. Solved by moving the `max.poll.interval.ms` check into `rd_kafka_q_serve` (#4431). + * After a leader change a `rd_kafka_query_watermark_offsets` call would continue + trying to call ListOffsets on the old leader, if the topic wasn't included in + the subscription set, so it started querying the new leader only after + `topic.metadata.refresh.interval.ms` (#4225). diff --git a/INTRODUCTION.md b/INTRODUCTION.md index f90d8f7a0e..b0e2bd38b0 100644 --- a/INTRODUCTION.md +++ b/INTRODUCTION.md @@ -319,7 +319,7 @@ error code set. The application should typically not attempt to retry producing the message on failure, but instead configure librdkafka to perform these retries -using the `retries`, `retry.backoff.ms` and `retry.backoff.max.ms` +using the `retries`, `retry.backoff.ms` and `retry.backoff.max.ms` configuration properties. 
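As a brief illustration of the paragraph above (not part of this patch), a minimal sketch of letting librdkafka perform the retries itself by setting the `retries`, `retry.backoff.ms` and `retry.backoff.max.ms` properties through the public `rd_kafka_conf_set()` API; the helper name and the values are placeholders, not recommendations:

```c
/* Minimal sketch, assuming the documented property names; values are
 * illustrative only. */
#include <stdio.h>
#include <librdkafka/rdkafka.h>

static rd_kafka_conf_t *make_conf_with_retries(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        /* Upper bound on retries for retriable requests. */
        if (rd_kafka_conf_set(conf, "retries", "5", errstr,
                              sizeof(errstr)) != RD_KAFKA_CONF_OK ||
            /* Minimum backoff before the first retry (KIP-580). */
            rd_kafka_conf_set(conf, "retry.backoff.ms", "100", errstr,
                              sizeof(errstr)) != RD_KAFKA_CONF_OK ||
            /* Cap for the exponentially growing backoff (KIP-580). */
            rd_kafka_conf_set(conf, "retry.backoff.max.ms", "1000", errstr,
                              sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%% %s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return NULL;
        }
        return conf;
}
```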
@@ -1923,7 +1923,7 @@ The [Apache Kafka Implementation Proposals (KIPs)](https://cwiki.apache.org/conf | KIP-389 - Consumer group max size | 2.2.0 | Supported (error is propagated to application, but the consumer does not raise a fatal error) | | KIP-392 - Allow consumers to fetch from closest replica | 2.4.0 | Supported | | KIP-394 - Consumer: require member.id in JoinGroupRequest | 2.2.0 | Supported | -| KIP-396 - AdminAPI: commit/list offsets | 2.4.0 | Partially supported (remaining APIs available outside Admin client) | +| KIP-396 - AdminAPI: commit/list offsets | 2.4.0 | Supported | | KIP-412 - AdminAPI: adjust log levels | 2.4.0 | Not supported | | KIP-421 - Variables in client config files | 2.3.0 | Not applicable (librdkafka, et.al, does not provide a config file interface, and shouldn't) | | KIP-429 - Consumer: incremental rebalance protocol | 2.4.0 | Supported | @@ -1976,7 +1976,7 @@ release of librdkafka. | ------- | ------------------------------| ----------- | ----------------------- | | 0 | Produce | 9 | 7 | | 1 | Fetch | 15 | 11 | -| 2 | ListOffsets | 8 | 5 | +| 2 | ListOffsets | 8 | 7 | | 3 | Metadata | 12 | 12 | | 8 | OffsetCommit | 8 | 7 | | 9 | OffsetFetch | 8 | 7 | diff --git a/examples/.gitignore b/examples/.gitignore index f56e06bf2e..9b2c65a2f7 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -20,4 +20,5 @@ describe_cluster list_consumer_group_offsets alter_consumer_group_offsets incremental_alter_configs -user_scram \ No newline at end of file +user_scram +list_offsets \ No newline at end of file diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 9b1478ea2d..8c0079abee 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -59,6 +59,9 @@ target_link_libraries(describe_topics PUBLIC rdkafka) add_executable(describe_cluster describe_cluster.c ${win32_sources}) target_link_libraries(describe_cluster PUBLIC rdkafka) +add_executable(list_offsets list_offsets.c ${win32_sources}) +target_link_libraries(list_offsets PUBLIC rdkafka) + # The targets below has Unix include dirs and do not compile on Windows. if(NOT WIN32) add_executable(rdkafka_example rdkafka_example.c) @@ -69,4 +72,5 @@ if(NOT WIN32) add_executable(kafkatest_verifiable_client kafkatest_verifiable_client.cpp) target_link_libraries(kafkatest_verifiable_client PUBLIC rdkafka++) + endif(NOT WIN32) diff --git a/examples/Makefile b/examples/Makefile index f97e33eacd..f76702d02c 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -12,6 +12,7 @@ EXAMPLES ?= rdkafka_example rdkafka_performance rdkafka_example_cpp \ alter_consumer_group_offsets \ incremental_alter_configs \ user_scram \ + list_offsets \ misc all: $(EXAMPLES) @@ -148,6 +149,10 @@ user_scram: ../src/librdkafka.a user_scram.c $(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \ ../src/librdkafka.a $(LIBS) +list_offsets: ../src/librdkafka.a list_offsets.c + $(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \ + ../src/librdkafka.a $(LIBS) + misc: ../src/librdkafka.a misc.c $(CC) $(CPPFLAGS) $(CFLAGS) $@.c -o $@ $(LDFLAGS) \ ../src/librdkafka.a $(LIBS) diff --git a/examples/list_offsets.c b/examples/list_offsets.c new file mode 100644 index 0000000000..d01c975030 --- /dev/null +++ b/examples/list_offsets.c @@ -0,0 +1,316 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2023, Confluent Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. 
Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * Example utility that shows how to use ListOffsets (AdminAPI) + * to list the offset[EARLIEST,LATEST,...] for + * one or more topic partitions. + */ + +#include <stdio.h> +#include <signal.h> +#include <string.h> +#include <stdlib.h> + +#ifdef _WIN32 +#include "../win32/wingetopt.h" +#else +#include <getopt.h> +#endif + + +/* Typical include path would be <librdkafka/rdkafka.h>, but this program + * is builtin from within the librdkafka source tree and thus differs. */ +#include "rdkafka.h" + + +const char *argv0; + +static rd_kafka_queue_t *queue; /** Admin result queue. + * This is a global so we can + * yield in stop() */ +static volatile sig_atomic_t run = 1; + +/** + * @brief Signal termination of program + */ +static void stop(int sig) { + if (!run) { + fprintf(stderr, "%% Forced termination\n"); + exit(2); + } + run = 0; + rd_kafka_queue_yield(queue); +} + + +static void usage(const char *reason, ...) { + + fprintf(stderr, + "List offsets usage examples\n" + "\n" + "Usage: %s [--] <isolation_level> " + " <topic1> <partition1> <offset1> " + "[<topic2> <partition2> <offset2> ...]\n" + "\n" + "Options:\n" + " -b <brokers> Bootstrap server list to connect to.\n" + " -X <prop=val> Set librdkafka configuration property.\n" + " See CONFIGURATION.md for full list.\n" + " -d <dbg1,dbg2,..> Enable librdkafka debugging (%s).\n" + "\n", + argv0, rd_kafka_get_debug_contexts()); + + if (reason) { + va_list ap; + char reasonbuf[512]; + + va_start(ap, reason); + vsnprintf(reasonbuf, sizeof(reasonbuf), reason, ap); + va_end(ap); + + fprintf(stderr, "ERROR: %s\n", reasonbuf); + } + + exit(reason ? 1 : 0); +} + + +#define fatal(...) \ + do { \ + fprintf(stderr, "ERROR: "); \ + fprintf(stderr, __VA_ARGS__); \ + fprintf(stderr, "\n"); \ + exit(2); \ + } while (0) + + +/** + * @brief Set config property. Exit on failure. + */ +static void conf_set(rd_kafka_conf_t *conf, const char *name, const char *val) { + char errstr[512]; + + if (rd_kafka_conf_set(conf, name, val, errstr, sizeof(errstr)) != + RD_KAFKA_CONF_OK) + fatal("Failed to set %s=%s: %s", name, val, errstr); +} + +/** + * @brief Print list offsets result information.
+ */ +static int +print_list_offsets_result_info(const rd_kafka_ListOffsets_result_t *result) { + const rd_kafka_ListOffsetsResultInfo_t **result_infos; + size_t cnt; + size_t i; + result_infos = rd_kafka_ListOffsets_result_infos(result, &cnt); + printf("ListOffsets results:\n"); + for (i = 0; i < cnt; i++) { + const rd_kafka_topic_partition_t *topic_partition = + rd_kafka_ListOffsetsResultInfo_topic_partition( + result_infos[i]); + int64_t timestamp = + rd_kafka_ListOffsetsResultInfo_timestamp(result_infos[i]); + printf( + "Topic: %s Partition: %d Error: %s " + "Offset: %" PRId64 " Leader Epoch: %" PRId32 + " Timestamp: %" PRId64 "\n", + topic_partition->topic, topic_partition->partition, + rd_kafka_err2str(topic_partition->err), + topic_partition->offset, + rd_kafka_topic_partition_get_leader_epoch(topic_partition), + timestamp); + } + return 0; +} + +/** + * @brief Parse an integer or fail. + */ +int64_t parse_int(const char *what, const char *str) { + char *end; + unsigned long n = strtoull(str, &end, 0); + + if (end != str + strlen(str)) { + fprintf(stderr, "%% Invalid input for %s: %s: not an integer\n", + what, str); + exit(1); + } + + return (int64_t)n; +} + +/** + * @brief Call rd_kafka_ListOffsets() with a list of topic partitions. + */ +static void cmd_list_offsets(rd_kafka_conf_t *conf, int argc, char **argv) { + rd_kafka_t *rk; + char errstr[512]; + rd_kafka_AdminOptions_t *options; + rd_kafka_IsolationLevel_t isolation_level; + rd_kafka_event_t *event = NULL; + rd_kafka_error_t *error = NULL; + int i; + int retval = 0; + rd_kafka_topic_partition_list_t *rktpars; + + if ((argc - 1) % 3 != 0) { + usage("Wrong number of arguments: %d", argc); + } + + isolation_level = parse_int("isolation level", argv[0]); + argc--; + argv++; + rktpars = rd_kafka_topic_partition_list_new(argc / 3); + for (i = 0; i < argc; i += 3) { + rd_kafka_topic_partition_list_add( + rktpars, argv[i], parse_int("partition", argv[i + 1])) + ->offset = parse_int("offset", argv[i + 2]); + } + + /* + * Create consumer instance + * NOTE: rd_kafka_new() takes ownership of the conf object + * and the application must not reference it again after + * this call. 
+ */ + rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); + if (!rk) { + usage("Failed to create new consumer: %s", errstr); + } + + /* + * List offsets + */ + queue = rd_kafka_queue_new(rk); + + /* Signal handler for clean shutdown */ + signal(SIGINT, stop); + + options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_LISTOFFSETS); + + if (rd_kafka_AdminOptions_set_request_timeout( + options, 10 * 1000 /* 10s */, errstr, sizeof(errstr))) { + fprintf(stderr, "%% Failed to set timeout: %s\n", errstr); + goto exit; + } + + if ((error = rd_kafka_AdminOptions_set_isolation_level( + options, isolation_level))) { + fprintf(stderr, "%% Failed to set isolation level: %s\n", + rd_kafka_error_string(error)); + rd_kafka_error_destroy(error); + goto exit; + } + + rd_kafka_ListOffsets(rk, rktpars, options, queue); + rd_kafka_topic_partition_list_destroy(rktpars); + rd_kafka_AdminOptions_destroy(options); + + /* Wait for results */ + event = rd_kafka_queue_poll(queue, -1 /* indefinitely but limited by + * the request timeout set + * above (10s) */); + + if (!event) { + /* User hit Ctrl-C, + * see yield call in stop() signal handler */ + fprintf(stderr, "%% Cancelled by user\n"); + + } else if (rd_kafka_event_error(event)) { + rd_kafka_resp_err_t err = rd_kafka_event_error(event); + /* ListOffsets request failed */ + fprintf(stderr, "%% ListOffsets failed[%" PRId32 "]: %s\n", err, + rd_kafka_event_error_string(event)); + goto exit; + } else { + /* ListOffsets request succeeded, but individual + * partitions may have errors. */ + const rd_kafka_ListOffsets_result_t *result; + result = rd_kafka_event_ListOffsets_result(event); + retval = print_list_offsets_result_info(result); + } + + +exit: + if (event) + rd_kafka_event_destroy(event); + rd_kafka_queue_destroy(queue); + /* Destroy the client instance */ + rd_kafka_destroy(rk); + + exit(retval); +} + +int main(int argc, char **argv) { + rd_kafka_conf_t *conf; /**< Client configuration object */ + int opt; + argv0 = argv[0]; + + /* + * Create Kafka client configuration place-holder + */ + conf = rd_kafka_conf_new(); + + + /* + * Parse common options + */ + while ((opt = getopt(argc, argv, "b:X:d:")) != -1) { + switch (opt) { + case 'b': + conf_set(conf, "bootstrap.servers", optarg); + break; + + case 'X': { + char *name = optarg, *val; + + if (!(val = strchr(name, '='))) + fatal("-X expects a name=value argument"); + + *val = '\0'; + val++; + + conf_set(conf, name, val); + break; + } + + case 'd': + conf_set(conf, "debug", optarg); + break; + + default: + usage("Unknown option %c", (char)opt); + } + } + + cmd_list_offsets(conf, argc - optind, &argv[optind]); + + return 0; +} diff --git a/src/rdkafka.c b/src/rdkafka.c index 64b2bfec6c..99d9c17449 100644 --- a/src/rdkafka.c +++ b/src/rdkafka.c @@ -3480,6 +3480,7 @@ static void rd_kafka_query_wmark_offsets_resp_cb(rd_kafka_t *rk, struct _query_wmark_offsets_state *state; rd_kafka_topic_partition_list_t *offsets; rd_kafka_topic_partition_t *rktpar; + int actions = 0; if (err == RD_KAFKA_RESP_ERR__DESTROY) { /* 'state' has gone out of scope when query_watermark..() @@ -3491,7 +3492,15 @@ static void rd_kafka_query_wmark_offsets_resp_cb(rd_kafka_t *rk, offsets = rd_kafka_topic_partition_list_new(1); err = rd_kafka_handle_ListOffsets(rk, rkb, err, rkbuf, request, offsets, - NULL); + &actions); + + if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { + /* Remove its cache in case the topic isn't a known topic. 
*/ + rd_kafka_wrlock(rk); + rd_kafka_metadata_cache_delete_by_name(rk, state->topic); + rd_kafka_wrunlock(rk); + } + if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) { rd_kafka_topic_partition_list_destroy(offsets); return; /* Retrying */ @@ -3512,14 +3521,18 @@ static void rd_kafka_query_wmark_offsets_resp_cb(rd_kafka_t *rk, /* FALLTHRU */ } - /* Partition not seen in response. */ - if (!(rktpar = rd_kafka_topic_partition_list_find(offsets, state->topic, - state->partition))) + rktpar = rd_kafka_topic_partition_list_find(offsets, state->topic, + state->partition); + if (!rktpar && err > RD_KAFKA_RESP_ERR__END) { + /* Partition not seen in response, + * not a local error. */ err = RD_KAFKA_RESP_ERR__BAD_MSG; - else if (rktpar->err) - err = rktpar->err; - else - state->offsets[state->offidx] = rktpar->offset; + } else if (rktpar) { + if (rktpar->err) + err = rktpar->err; + else + state->offsets[state->offidx] = rktpar->offset; + } state->offidx++; @@ -3544,7 +3557,6 @@ rd_kafka_resp_err_t rd_kafka_query_watermark_offsets(rd_kafka_t *rk, struct rd_kafka_partition_leader *leader; rd_list_t leaders; rd_kafka_resp_err_t err; - int tmout; partitions = rd_kafka_topic_partition_list_new(1); rktpar = @@ -3576,29 +3588,24 @@ rd_kafka_resp_err_t rd_kafka_query_watermark_offsets(rd_kafka_t *rk, state.ts_end = ts_end; state.state_version = rd_kafka_brokers_get_state_version(rk); - rktpar->offset = RD_KAFKA_OFFSET_BEGINNING; rd_kafka_ListOffsetsRequest( leader->rkb, partitions, RD_KAFKA_REPLYQ(rkq, 0), - rd_kafka_query_wmark_offsets_resp_cb, &state); + rd_kafka_query_wmark_offsets_resp_cb, timeout_ms, &state); rktpar->offset = RD_KAFKA_OFFSET_END; rd_kafka_ListOffsetsRequest( leader->rkb, partitions, RD_KAFKA_REPLYQ(rkq, 0), - rd_kafka_query_wmark_offsets_resp_cb, &state); + rd_kafka_query_wmark_offsets_resp_cb, timeout_ms, &state); rd_kafka_topic_partition_list_destroy(partitions); rd_list_destroy(&leaders); /* Wait for reply (or timeout) */ while (state.err == RD_KAFKA_RESP_ERR__IN_PROGRESS) { - tmout = rd_timeout_remains(ts_end); - if (rd_timeout_expired(tmout)) { - state.err = RD_KAFKA_RESP_ERR__TIMED_OUT; - break; - } - rd_kafka_q_serve(rkq, tmout, 0, RD_KAFKA_Q_CB_CALLBACK, - rd_kafka_poll_cb, NULL); + rd_kafka_q_serve(rkq, RD_POLL_INFINITE, 0, + RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, + NULL); } rd_kafka_q_destroy_owner(rkq); @@ -3739,7 +3746,7 @@ rd_kafka_offsets_for_times(rd_kafka_t *rk, state.wait_reply++; rd_kafka_ListOffsetsRequest( leader->rkb, leader->partitions, RD_KAFKA_REPLYQ(rkq, 0), - rd_kafka_get_offsets_for_times_resp_cb, &state); + rd_kafka_get_offsets_for_times_resp_cb, timeout_ms, &state); } rd_list_destroy(&leaders); @@ -3963,6 +3970,7 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk, case RD_KAFKA_OP_CREATEACLS: case RD_KAFKA_OP_DESCRIBEACLS: case RD_KAFKA_OP_DELETEACLS: + case RD_KAFKA_OP_LISTOFFSETS: /* Calls op_destroy() from worker callback, * when the time comes. */ res = rd_kafka_op_call(rk, rkq, rko); diff --git a/src/rdkafka.h b/src/rdkafka.h index 0802d6507d..de620284f0 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -5454,6 +5454,8 @@ typedef int rd_kafka_event_type_t; #define RD_KAFKA_EVENT_DESCRIBETOPICS_RESULT 0x100000 /** DescribeCluster_result_t */ #define RD_KAFKA_EVENT_DESCRIBECLUSTER_RESULT 0x200000 +/** ListOffsets_result_t */ +#define RD_KAFKA_EVENT_LISTOFFSETS_RESULT 0x400000 /** * @returns the event type for the given event. 
@@ -5611,6 +5613,7 @@ int rd_kafka_event_error_is_fatal(rd_kafka_event_t *rkev); * - RD_KAFKA_EVENT_ALTERCONSUMERGROUPOFFSETS_RESULT * - RD_KAFKA_EVENT_DESCRIBETOPICS_RESULT * - RD_KAFKA_EVENT_DESCRIBECLUSTER_RESULT + * - RD_KAFKA_EVENT_LISTOFFSETS_RESULT */ RD_EXPORT void *rd_kafka_event_opaque(rd_kafka_event_t *rkev); @@ -5732,6 +5735,8 @@ typedef rd_kafka_event_t rd_kafka_DescribeCluster_result_t; typedef rd_kafka_event_t rd_kafka_DescribeUserScramCredentials_result_t; /*! AlterUserScramCredentials result type */ typedef rd_kafka_event_t rd_kafka_AlterUserScramCredentials_result_t; +/*! ListOffsets result type */ +typedef rd_kafka_event_t rd_kafka_ListOffsets_result_t; /** * @brief Get CreateTopics result. @@ -5958,6 +5963,22 @@ rd_kafka_event_ListConsumerGroupOffsets_result(rd_kafka_event_t *rkev); RD_EXPORT const rd_kafka_AlterConsumerGroupOffsets_result_t * rd_kafka_event_AlterConsumerGroupOffsets_result(rd_kafka_event_t *rkev); +/** + * @brief Get ListOffsets result. + * + * @returns the result of a ListOffsets request, or NULL if + * event is of different type. + * + * @remark The lifetime of the returned memory is the same + * as the lifetime of the \p rkev object. + * + * Event types: + * RD_KAFKA_EVENT_LISTOFFSETS_RESULT + */ +RD_EXPORT const rd_kafka_ListOffsets_result_t * +rd_kafka_event_ListOffsets_result(rd_kafka_event_t *rkev); + + /** * @brief Get DescribeUserScramCredentials result. * @@ -6899,6 +6920,7 @@ typedef enum rd_kafka_admin_op_t { RD_KAFKA_ADMIN_OP_ALTERUSERSCRAMCREDENTIALS, RD_KAFKA_ADMIN_OP_DESCRIBETOPICS, /**< DescribeTopics */ RD_KAFKA_ADMIN_OP_DESCRIBECLUSTER, /**< DescribeCluster */ + RD_KAFKA_ADMIN_OP_LISTOFFSETS, /**< ListOffsets */ RD_KAFKA_ADMIN_OP__CNT /**< Number of ops defined */ } rd_kafka_admin_op_t; @@ -6916,6 +6938,18 @@ typedef enum rd_kafka_admin_op_t { typedef struct rd_kafka_AdminOptions_s rd_kafka_AdminOptions_t; +/** + * @enum rd_kafka_IsolationLevel_t + * + * @brief IsolationLevel enum name for use with rd_kafka_AdminOptions_new() + * + * @sa rd_kafka_AdminOptions_new() + */ +typedef enum rd_kafka_IsolationLevel_t { + RD_KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED = 0, + RD_KAFKA_ISOLATION_LEVEL_READ_COMMITTED = 1 +} rd_kafka_IsolationLevel_t; + /** * @brief Create a new AdminOptions object. * @@ -7115,6 +7149,14 @@ rd_kafka_error_t *rd_kafka_AdminOptions_set_match_consumer_group_states( const rd_kafka_consumer_group_state_t *consumer_group_states, size_t consumer_group_states_cnt); +/** + * @brief Set Isolation Level to an allowed `rd_kafka_IsolationLevel_t` value. + */ +RD_EXPORT +rd_kafka_error_t * +rd_kafka_AdminOptions_set_isolation_level(rd_kafka_AdminOptions_t *options, + rd_kafka_IsolationLevel_t value); + /** * @brief Set application opaque value that can be extracted from the * result event using rd_kafka_event_opaque() @@ -7154,7 +7196,6 @@ typedef enum rd_kafka_AclOperation_t { /**@}*/ - /** * @name Admin API - Topics * @brief Topic related operations. @@ -9107,6 +9148,92 @@ rd_kafka_DeleteConsumerGroupOffsets_result_groups( /**@}*/ +/** + * @name Admin API - ListOffsets + * @brief Given a topic_partition list, provides the offset information. + * @{ + */ + +/** + * @enum rd_kafka_OffsetSpec_t + * @brief Allows to specify the desired offsets when using ListOffsets. + */ +typedef enum rd_kafka_OffsetSpec_t { + /* Used to retrieve the offset with the largest timestamp of a partition + * as message timestamps can be specified client side this may not match + * the log end offset returned by SPEC_LATEST. 
+ */ + RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP = -3, + /* Used to retrieve the offset with the earliest timestamp of a + partition. */ + RD_KAFKA_OFFSET_SPEC_EARLIEST = -2, + /* Used to retrieve the offset with the latest timestamp of a partition. + */ + RD_KAFKA_OFFSET_SPEC_LATEST = -1, +} rd_kafka_OffsetSpec_t; + +/** + * @brief Information returned from a ListOffsets call for a specific + * `rd_kafka_topic_partition_t`. + */ +typedef struct rd_kafka_ListOffsetsResultInfo_s + rd_kafka_ListOffsetsResultInfo_t; + +/** + * @brief Returns the topic partition of the passed \p result_info. + */ +RD_EXPORT +const rd_kafka_topic_partition_t * +rd_kafka_ListOffsetsResultInfo_topic_partition( + const rd_kafka_ListOffsetsResultInfo_t *result_info); + +/** + * @brief Returns the timestamp corresponding to the offset in \p result_info. + */ +RD_EXPORT +int64_t rd_kafka_ListOffsetsResultInfo_timestamp( + const rd_kafka_ListOffsetsResultInfo_t *result_info); + +/** + * @brief Returns the array of ListOffsetsResultInfo in \p result + * and populates the size of the array in \p cntp. + */ +RD_EXPORT +const rd_kafka_ListOffsetsResultInfo_t ** +rd_kafka_ListOffsets_result_infos(const rd_kafka_ListOffsets_result_t *result, + size_t *cntp); + +/** + * @brief List offsets for the specified \p topic_partitions. + * This operation enables to find the beginning offset, + * end offset as well as the offset matching a timestamp in partitions + * or the offset with max timestamp. + * + * @param rk Client instance. + * @param topic_partitions topic_partition_list_t with the partitions and + * offsets to list. Each topic partition offset can be + * a value of the `rd_kafka_OffsetSpec_t` enum or + * a non-negative value, representing a timestamp, + * to query for the first offset after the + * given timestamp. + * @param options Optional admin options, or NULL for defaults. + * @param rkqu Queue to emit result on. + * + * Supported admin options: + * - rd_kafka_AdminOptions_set_isolation_level() - default \c + * RD_KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED + * - rd_kafka_AdminOptions_set_request_timeout() - default socket.timeout.ms + * + * @remark The result event type emitted on the supplied queue is of type + * \c RD_KAFKA_EVENT_LISTOFFSETS_RESULT + */ +RD_EXPORT +void rd_kafka_ListOffsets(rd_kafka_t *rk, + rd_kafka_topic_partition_list_t *topic_partitions, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu); + +/**@}*/ /** * @name Admin API - User SCRAM credentials diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 93e4e7d6d3..4d27c9e13c 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -428,6 +428,8 @@ static RD_UNUSED RD_FORMAT(printf, 3, 4) void rd_kafka_admin_result_set_err( */ static RD_INLINE void rd_kafka_admin_result_enq(rd_kafka_op_t *rko_req, rd_kafka_op_t *rko_result) { + if (rko_req->rko_u.admin_result.result_cb) + rko_req->rko_u.admin_result.result_cb(rko_result); rd_kafka_replyq_enq(&rko_req->rko_u.admin_request.replyq, rko_result, rko_req->rko_u.admin_request.replyq.version); } @@ -660,6 +662,12 @@ rd_kafka_admin_request_op_new(rd_kafka_t *rk, return rko; } +static void +rd_kafka_admin_request_op_result_cb_set(rd_kafka_op_t *op, + void (*result_cb)(rd_kafka_op_t *)) { + op->rko_u.admin_result.result_cb = result_cb; +} + /** * @returns the remaining request timeout in milliseconds. @@ -1428,8 +1436,7 @@ static rd_kafka_op_res_t rd_kafka_admin_fanout_worker(rd_kafka_t *rk, NULL); /* Enqueue result on application queue, we're done. 
*/ - rd_kafka_replyq_enq(&rko_fanout->rko_u.admin_request.replyq, rko_result, - rko_fanout->rko_u.admin_request.replyq.version); + rd_kafka_admin_result_enq(rko_fanout, rko_result); /* FALLTHRU */ if (rko_fanout->rko_u.admin_request.fanout.outstanding == 0) @@ -1563,6 +1570,16 @@ rd_kafka_AdminOptions_set_broker(rd_kafka_AdminOptions_t *options, &ibroker_id, errstr, errstr_size); } +rd_kafka_error_t * +rd_kafka_AdminOptions_set_isolation_level(rd_kafka_AdminOptions_t *options, + rd_kafka_IsolationLevel_t value) { + char errstr[512]; + rd_kafka_resp_err_t err = rd_kafka_confval_set_type( + &options->isolation_level, RD_KAFKA_CONFVAL_INT, &value, errstr, + sizeof(errstr)); + return !err ? NULL : rd_kafka_error_new(err, "%s", errstr); +} + rd_kafka_error_t *rd_kafka_AdminOptions_set_require_stable_offsets( rd_kafka_AdminOptions_t *options, int true_or_false) { @@ -1650,7 +1667,8 @@ static void rd_kafka_AdminOptions_init(rd_kafka_t *rk, options->for_api == RD_KAFKA_ADMIN_OP_CREATETOPICS || options->for_api == RD_KAFKA_ADMIN_OP_DELETETOPICS || options->for_api == RD_KAFKA_ADMIN_OP_CREATEPARTITIONS || - options->for_api == RD_KAFKA_ADMIN_OP_DELETERECORDS) + options->for_api == RD_KAFKA_ADMIN_OP_DELETERECORDS || + options->for_api == RD_KAFKA_ADMIN_OP_LISTOFFSETS) rd_kafka_confval_init_int(&options->operation_timeout, "operation_timeout", -1, 3600 * 1000, rk->rk_conf.admin.request_timeout_ms); @@ -1697,6 +1715,14 @@ static void rd_kafka_AdminOptions_init(rd_kafka_t *rk, rd_kafka_confval_disable(&options->match_consumer_group_states, "match_consumer_group_states"); + if (options->for_api == RD_KAFKA_ADMIN_OP_ANY || + options->for_api == RD_KAFKA_ADMIN_OP_LISTOFFSETS) + rd_kafka_confval_init_int(&options->isolation_level, + "isolation_level", 0, 1, 0); + else + rd_kafka_confval_disable(&options->isolation_level, + "isolation_level"); + rd_kafka_confval_init_int(&options->broker, "broker", 0, INT32_MAX, -1); rd_kafka_confval_init_ptr(&options->opaque, "opaque"); } @@ -3933,12 +3959,309 @@ rd_kafka_DeleteRecordsResponse_parse(rd_kafka_op_t *rko_req, return reply->rkbuf_err; } +/** + * @brief Creates a ListOffsetsResultInfo with the topic and parition and + * returns the ListOffsetsResultInfo. + */ +rd_kafka_ListOffsetsResultInfo_t * +rd_kafka_ListOffsetsResultInfo_new(rd_kafka_topic_partition_t *rktpar, + rd_ts_t timestamp) { + rd_kafka_ListOffsetsResultInfo_t *result_info; + result_info = rd_calloc(1, sizeof(*result_info)); + result_info->timestamp = timestamp; + result_info->topic_partition = rd_kafka_topic_partition_copy(rktpar); + return result_info; +} + +/** + * @brief Copies the ListOffsetsResultInfo. + */ +static rd_kafka_ListOffsetsResultInfo_t *rd_kafka_ListOffsetsResultInfo_copy( + const rd_kafka_ListOffsetsResultInfo_t *result_info) { + return rd_kafka_ListOffsetsResultInfo_new(result_info->topic_partition, + result_info->timestamp); +} + +/** + * @brief Same as rd_kafka_ListOffsetsResultInfo_copy() but suitable for + * rd_list_copy(). The \p opaque is ignored. + */ +static void *rd_kafka_ListOffsetsResultInfo_copy_opaque(const void *element, + void *opaque) { + return rd_kafka_ListOffsetsResultInfo_copy(element); +} + +/** + * @brief Returns the topic partition of the passed \p result_info. + */ +const rd_kafka_topic_partition_t * +rd_kafka_ListOffsetsResultInfo_topic_partition( + const rd_kafka_ListOffsetsResultInfo_t *result_info) { + return result_info->topic_partition; +} + +/** + * @brief Returns the timestamp specified for the offset of the + * rd_kafka_ListOffsetsResultInfo_t. 
+ */ +int64_t rd_kafka_ListOffsetsResultInfo_timestamp( + const rd_kafka_ListOffsetsResultInfo_t *result_info) { + return result_info->timestamp; +} + +static void rd_kafka_ListOffsetsResultInfo_destroy( + rd_kafka_ListOffsetsResultInfo_t *element) { + rd_kafka_topic_partition_destroy(element->topic_partition); + rd_free(element); +} + +static void rd_kafka_ListOffsetsResultInfo_destroy_free(void *element) { + rd_kafka_ListOffsetsResultInfo_destroy(element); +} + +/** + * @brief Merges the response of the partial request made for ListOffsets via + * the \p rko_partial into the \p rko_fanout responsible for the + * ListOffsets request. + * @param rko_fanout The rd_kafka_op_t corresponding to the whole original + * ListOffsets request. + * @param rko_partial The rd_kafka_op_t corresponding to the leader specific + * ListOffset request sent after leaders querying. + */ +static void +rd_kafka_ListOffsets_response_merge(rd_kafka_op_t *rko_fanout, + const rd_kafka_op_t *rko_partial) { + size_t partition_cnt; + size_t total_partitions; + size_t i, j; + rd_assert(rko_partial->rko_evtype == RD_KAFKA_EVENT_LISTOFFSETS_RESULT); + + partition_cnt = rd_list_cnt(&rko_partial->rko_u.admin_result.results); + total_partitions = + rd_list_cnt(&rko_fanout->rko_u.admin_request.fanout.results); + + for (i = 0; i < partition_cnt; i++) { + rd_kafka_ListOffsetsResultInfo_t *partial_result_info = + rd_list_elem(&rko_partial->rko_u.admin_result.results, i); + for (j = 0; j < total_partitions; j++) { + rd_kafka_ListOffsetsResultInfo_t *result_info = + rd_list_elem( + &rko_fanout->rko_u.admin_request.fanout.results, + j); + if (rd_kafka_topic_partition_cmp( + result_info->topic_partition, + partial_result_info->topic_partition) == 0) { + result_info->timestamp = + partial_result_info->timestamp; + rd_kafka_topic_partition_destroy( + result_info->topic_partition); + result_info->topic_partition = + rd_kafka_topic_partition_copy( + partial_result_info->topic_partition); + break; + } + } + } +} + +/** + * @brief Returns the array of pointers of rd_kafka_ListOffsetsResultInfo_t + * given rd_kafka_ListOffsets_result_t and populates the size of the array. + */ +const rd_kafka_ListOffsetsResultInfo_t ** +rd_kafka_ListOffsets_result_infos(const rd_kafka_ListOffsets_result_t *result, + size_t *cntp) { + *cntp = rd_list_cnt(&result->rko_u.admin_result.results); + return (const rd_kafka_ListOffsetsResultInfo_t **) + result->rko_u.admin_result.results.rl_elems; +} + +/** + * @brief Admin compatible API to parse the ListOffsetResponse buffer + * provided in \p reply. + */ +static rd_kafka_resp_err_t +rd_kafka_ListOffsetsResponse_parse(rd_kafka_op_t *rko_req, + rd_kafka_op_t **rko_resultp, + rd_kafka_buf_t *reply, + char *errstr, + size_t errstr_size) { + rd_list_t *result_list = + rd_list_new(1, rd_kafka_ListOffsetsResultInfo_destroy_free); + rd_kafka_op_t *rko_result; + rd_kafka_parse_ListOffsets(reply, NULL, result_list); + if (reply->rkbuf_err) { + rd_snprintf(errstr, errstr_size, + "Error parsing ListOffsets response: %s", + rd_kafka_err2str(reply->rkbuf_err)); + return reply->rkbuf_err; + } + + rko_result = rd_kafka_admin_result_new(rko_req); + rd_list_init_copy(&rko_result->rko_u.admin_result.results, result_list); + rd_list_copy_to(&rko_result->rko_u.admin_result.results, result_list, + rd_kafka_ListOffsetsResultInfo_copy_opaque, NULL); + rd_list_destroy(result_list); + + *rko_resultp = rko_result; + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + +/** + * @brief Should the received error code cause a metadata refresh? 
+ */ +static rd_bool_t rd_kafka_admin_result_err_refresh(rd_kafka_resp_err_t err) { + switch (err) { + case RD_KAFKA_RESP_ERR_NOT_LEADER_OR_FOLLOWER: + case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE: + return rd_true; + default: + return rd_false; + } +} + +/** + * @brief ListOffsets result handler for internal side effects. + */ +static void rd_kafka_ListOffsets_handle_result(rd_kafka_op_t *rko_result) { + rd_kafka_topic_partition_list_t *rktpars; + rd_kafka_ListOffsetsResultInfo_t *result_info; + rd_kafka_t *rk; + rd_kafka_resp_err_t err, rktpar_err; + rd_kafka_topic_partition_t *rktpar; + size_t i; + + err = rko_result->rko_err; + if (rd_list_empty(&rko_result->rko_u.admin_result.args) || + rd_list_empty(&rko_result->rko_u.admin_result.results)) + return; + + rk = rko_result->rko_rk; + rktpars = rd_list_elem(&rko_result->rko_u.admin_result.args, 0); + rd_kafka_wrlock(rk); + i = 0; + RD_KAFKA_TPLIST_FOREACH(rktpar, rktpars) { + result_info = + rd_list_elem(&rko_result->rko_u.admin_result.results, i); + rktpar_err = err ? err : result_info->topic_partition->err; + + if (rd_kafka_admin_result_err_refresh(rktpar_err)) { + rd_kafka_metadata_cache_delete_by_name(rk, + rktpar->topic); + } + i++; + } + rd_kafka_wrunlock(rk); +} + +/** + * @brief Call when leaders have been queried to progress the ListOffsets + * admin op to its next phase, sending ListOffsets to partition + * leaders. + */ +static rd_kafka_op_res_t +rd_kafka_ListOffsets_leaders_queried_cb(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *reply) { + + rd_kafka_resp_err_t err = reply->rko_err; + const rd_list_t *leaders = + reply->rko_u.leaders.leaders; /* Possibly NULL (on err) */ + rd_kafka_topic_partition_list_t *partitions = + reply->rko_u.leaders.partitions; /* Possibly NULL (on err) */ + rd_kafka_op_t *rko_fanout = reply->rko_u.leaders.opaque; + rd_kafka_topic_partition_list_t *topic_partitions; + rd_kafka_topic_partition_t *rktpar; + size_t partition_cnt; + const struct rd_kafka_partition_leader *leader; + size_t i; + static const struct rd_kafka_admin_worker_cbs cbs = { + rd_kafka_ListOffsetsRequest_admin, + rd_kafka_ListOffsetsResponse_parse, + }; + + rd_assert((rko_fanout->rko_type & ~RD_KAFKA_OP_FLAGMASK) == + RD_KAFKA_OP_ADMIN_FANOUT); + + if (err) { + rd_kafka_admin_result_fail( + rko_fanout, err, "Failed to query partition leaders: %s", + err == RD_KAFKA_RESP_ERR__NOENT ? 
"No leaders found" + : rd_kafka_err2str(err)); + rd_kafka_admin_common_worker_destroy(rk, rko_fanout, + rd_true /*destroy*/); + return RD_KAFKA_OP_RES_HANDLED; + } + + /* Create fanout results */ + topic_partitions = + rd_list_elem(&rko_fanout->rko_u.admin_request.args, 0); + partition_cnt = topic_partitions->cnt; + rd_list_init(&rko_fanout->rko_u.admin_request.fanout.results, + partition_cnt, + rd_kafka_ListOffsetsResultInfo_destroy_free); + + for (i = 0; i < partition_cnt; i++) { + rd_kafka_topic_partition_t *topic_partition = + &topic_partitions->elems[i]; + rd_kafka_ListOffsetsResultInfo_t *result_element = + rd_kafka_ListOffsetsResultInfo_new(topic_partition, -1); + rd_kafka_topic_partition_set_from_fetch_pos( + result_element->topic_partition, + RD_KAFKA_FETCH_POS(RD_KAFKA_OFFSET_INVALID, -1)); + result_element->topic_partition->err = + RD_KAFKA_RESP_ERR_NO_ERROR; + rd_list_add(&rko_fanout->rko_u.admin_request.fanout.results, + result_element); + } + + /* Set errors to corresponding result partitions */ + RD_KAFKA_TPLIST_FOREACH(rktpar, partitions) { + rd_kafka_ListOffsetsResultInfo_t *result_element; + if (!rktpar->err) + continue; + result_element = NULL; + for (i = 0; i < partition_cnt; i++) { + result_element = rd_list_elem( + &rko_fanout->rko_u.admin_request.fanout.results, i); + if (rd_kafka_topic_partition_cmp( + result_element->topic_partition, rktpar) == 0) + break; + } + result_element->topic_partition->err = rktpar->err; + } + + /* For each leader send a request for its partitions */ + rko_fanout->rko_u.admin_request.fanout.outstanding = + rd_list_cnt(leaders); + + RD_LIST_FOREACH(leader, leaders, i) { + rd_kafka_op_t *rko = rd_kafka_admin_request_op_new( + rk, RD_KAFKA_OP_LISTOFFSETS, + RD_KAFKA_EVENT_LISTOFFSETS_RESULT, &cbs, + &rko_fanout->rko_u.admin_request.options, rk->rk_ops); + + rko->rko_u.admin_request.fanout_parent = rko_fanout; + rko->rko_u.admin_request.broker_id = leader->rkb->rkb_nodeid; + + rd_kafka_topic_partition_list_sort_by_topic(leader->partitions); + rd_list_init(&rko->rko_u.admin_request.args, 1, + rd_kafka_topic_partition_list_destroy_free); + rd_list_add( + &rko->rko_u.admin_request.args, + rd_kafka_topic_partition_list_copy(leader->partitions)); + + /* Enqueue op for admin_worker() to transition to next state */ + rd_kafka_q_enq(rk->rk_ops, rko); + } + + return RD_KAFKA_OP_RES_HANDLED; +} + /** * @brief Call when leaders have been queried to progress the DeleteRecords * admin op to its next phase, sending DeleteRecords to partition * leaders. - * - * @param rko Reply op (RD_KAFKA_OP_LEADERS). 
*/ static rd_kafka_op_res_t rd_kafka_DeleteRecords_leaders_queried_cb(rd_kafka_t *rk, @@ -4106,6 +4429,82 @@ void rd_kafka_DeleteRecords(rd_kafka_t *rk, } +void rd_kafka_ListOffsets(rd_kafka_t *rk, + rd_kafka_topic_partition_list_t *topic_partitions, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu) { + int i; + int16_t error_code = RD_KAFKA_RESP_ERR_NO_ERROR; + rd_kafka_op_t *rko_fanout; + rd_kafka_topic_partition_list_t *copied_topic_partitions; + rd_list_t *topic_partitions_sorted; + + static const struct rd_kafka_admin_fanout_worker_cbs fanout_cbs = { + rd_kafka_ListOffsets_response_merge, + rd_kafka_ListOffsetsResultInfo_copy_opaque, + rd_kafka_topic_partition_list_copy_opaque}; + + rko_fanout = rd_kafka_admin_fanout_op_new( + rk, RD_KAFKA_OP_LISTOFFSETS, RD_KAFKA_EVENT_LISTOFFSETS_RESULT, + &fanout_cbs, options, rkqu->rkqu_q); + + rd_kafka_admin_request_op_result_cb_set( + rko_fanout, rd_kafka_ListOffsets_handle_result); + + if (topic_partitions->cnt == 0) { + error_code = RD_KAFKA_RESP_ERR__INVALID_ARG; + goto err; + } + + topic_partitions_sorted = rd_list_new( + topic_partitions->cnt, rd_kafka_topic_partition_destroy_free); + for (i = 0; i < topic_partitions->cnt; i++) + rd_list_add( + topic_partitions_sorted, + rd_kafka_topic_partition_copy(&topic_partitions->elems[i])); + + rd_list_sort(topic_partitions_sorted, rd_kafka_topic_partition_cmp); + if (rd_list_find_duplicate(topic_partitions_sorted, + rd_kafka_topic_partition_cmp)) + error_code = RD_KAFKA_RESP_ERR__INVALID_ARG; + rd_list_destroy(topic_partitions_sorted); + + if (error_code) + goto err; + + for (i = 0; i < topic_partitions->cnt; i++) { + rd_kafka_topic_partition_t *partition = + &topic_partitions->elems[i]; + if (partition->offset < RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP) { + error_code = RD_KAFKA_RESP_ERR__INVALID_ARG; + break; + } + } + + if (error_code) + goto err; + + copied_topic_partitions = + rd_kafka_topic_partition_list_copy(topic_partitions); + rd_list_init(&rko_fanout->rko_u.admin_request.args, 1, + rd_kafka_topic_partition_list_destroy_free); + rd_list_add(&rko_fanout->rko_u.admin_request.args, + copied_topic_partitions); + + /* Async query for partition leaders */ + rd_kafka_topic_partition_list_query_leaders_async( + rk, copied_topic_partitions, + rd_kafka_admin_timeout_remains(rko_fanout), + RD_KAFKA_REPLYQ(rk->rk_ops, 0), + rd_kafka_ListOffsets_leaders_queried_cb, rko_fanout); + return; +err: + rd_kafka_admin_result_fail(rko_fanout, error_code, "%s", + rd_kafka_err2str(error_code)); + rd_kafka_admin_common_worker_destroy(rk, rko_fanout, + rd_true /*destroy*/); +} + /** * @brief Get the list of offsets from a DeleteRecords result. * @@ -8556,4 +8955,4 @@ void rd_kafka_DescribeCluster(rd_kafka_t *rk, rd_kafka_q_enq(rk->rk_ops, rko); } -/**@}*/ \ No newline at end of file +/**@}*/ diff --git a/src/rdkafka_admin.h b/src/rdkafka_admin.h index 3e7378af56..62b2e7244c 100644 --- a/src/rdkafka_admin.h +++ b/src/rdkafka_admin.h @@ -109,6 +109,13 @@ struct rd_kafka_AdminOptions_s { * Valid for: ListConsumerGroups. */ + rd_kafka_confval_t + isolation_level; /**< INT:Isolation Level needed for list Offset + * to query for. + * Default Set to + * RD_KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED + */ + rd_kafka_confval_t opaque; /**< PTR: Application opaque. * Valid for all. 
*/ }; @@ -307,6 +314,47 @@ struct rd_kafka_DeleteRecords_s { /**@}*/ +/** + * @name ListConsumerGroupOffsets + * @{ + */ + +/** + * @brief ListConsumerGroupOffsets result + */ +struct rd_kafka_ListConsumerGroupOffsets_result_s { + rd_list_t groups; /**< Type (rd_kafka_group_result_t *) */ +}; + +struct rd_kafka_ListConsumerGroupOffsets_s { + char *group_id; /**< Points to data */ + rd_kafka_topic_partition_list_t *partitions; + char data[1]; /**< The group id is allocated along with + * the struct here. */ +}; + +/**@}*/ + +/** + * @name AlterConsumerGroupOffsets + * @{ + */ + +/** + * @brief AlterConsumerGroupOffsets result + */ +struct rd_kafka_AlterConsumerGroupOffsets_result_s { + rd_list_t groups; /**< Type (rd_kafka_group_result_t *) */ +}; + +struct rd_kafka_AlterConsumerGroupOffsets_s { + char *group_id; /**< Points to data */ + rd_kafka_topic_partition_list_t *partitions; + char data[1]; /**< The group id is allocated along with + * the struct here. */ +}; + +/**@}*/ /** * @name DeleteConsumerGroupOffsets @@ -329,6 +377,24 @@ struct rd_kafka_DeleteConsumerGroupOffsets_s { /**@}*/ +/** + * @name ListOffsets + * @{ + */ + +/** + * @struct ListOffsets result about a single partition + */ +struct rd_kafka_ListOffsetsResultInfo_s { + rd_kafka_topic_partition_t *topic_partition; + int64_t timestamp; +}; + +rd_kafka_ListOffsetsResultInfo_t * +rd_kafka_ListOffsetsResultInfo_new(rd_kafka_topic_partition_t *rktpar, + rd_ts_t timestamp); +/**@}*/ + /** * @name CreateAcls * @{ @@ -366,50 +432,6 @@ struct rd_kafka_DeleteAcls_result_response_s { /**@}*/ - -/** - * @name AlterConsumerGroupOffsets - * @{ - */ - -/** - * @brief AlterConsumerGroupOffsets result - */ -struct rd_kafka_AlterConsumerGroupOffsets_result_s { - rd_list_t groups; /**< Type (rd_kafka_group_result_t *) */ -}; - -struct rd_kafka_AlterConsumerGroupOffsets_s { - char *group_id; /**< Points to data */ - rd_kafka_topic_partition_list_t *partitions; - char data[1]; /**< The group id is allocated along with - * the struct here. */ -}; - -/**@}*/ - - -/** - * @name ListConsumerGroupOffsets - * @{ - */ - -/** - * @brief ListConsumerGroupOffsets result - */ -struct rd_kafka_ListConsumerGroupOffsets_result_s { - rd_list_t groups; /**< Type (rd_kafka_group_result_t *) */ -}; - -struct rd_kafka_ListConsumerGroupOffsets_s { - char *group_id; /**< Points to data */ - rd_kafka_topic_partition_list_t *partitions; - char data[1]; /**< The group id is allocated along with - * the struct here. */ -}; - -/**@}*/ - /** * @name ListConsumerGroups * @{ diff --git a/src/rdkafka_buf.c b/src/rdkafka_buf.c index e31ae00a29..362f57a27d 100644 --- a/src/rdkafka_buf.c +++ b/src/rdkafka_buf.c @@ -2,6 +2,7 @@ * librdkafka - Apache Kafka C library * * Copyright (c) 2012-2022, Magnus Edenhill + * 2023, Confluent Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -120,6 +121,18 @@ rd_kafka_buf_t *rd_kafka_buf_new0(int segcnt, size_t size, int flags) { return rkbuf; } +/** + * @brief Upgrade request header to flexver by writing header tags. 
+ */ +void rd_kafka_buf_upgrade_flexver_request(rd_kafka_buf_t *rkbuf) { + if (likely(!(rkbuf->rkbuf_flags & RD_KAFKA_OP_F_FLEXVER))) { + rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_FLEXVER; + + /* Empty request header tags */ + rd_kafka_buf_write_i8(rkbuf, 0); + } +} + /** * @brief Create new request buffer with the request-header written (will @@ -165,12 +178,7 @@ rd_kafka_buf_t *rd_kafka_buf_new_request0(rd_kafka_broker_t *rkb, rd_kafka_buf_write_kstr(rkbuf, rkb->rkb_rk->rk_client_id); if (is_flexver) { - /* Must set flexver after writing the client id since - * it is still a standard non-compact string. */ - rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_FLEXVER; - - /* Empty request header tags */ - rd_kafka_buf_write_i8(rkbuf, 0); + rd_kafka_buf_upgrade_flexver_request(rkbuf); } return rkbuf; diff --git a/src/rdkafka_buf.h b/src/rdkafka_buf.h index b6568b0ca9..099f705018 100644 --- a/src/rdkafka_buf.h +++ b/src/rdkafka_buf.h @@ -948,6 +948,7 @@ rd_kafka_buf_t *rd_kafka_buf_new_request0(rd_kafka_broker_t *rkb, #define rd_kafka_buf_new_flexver_request(rkb, ApiKey, segcnt, size, \ is_flexver) \ rd_kafka_buf_new_request0(rkb, ApiKey, segcnt, size, is_flexver) +void rd_kafka_buf_upgrade_flexver_request(rd_kafka_buf_t *rkbuf); rd_kafka_buf_t * rd_kafka_buf_new_shadow(const void *ptr, size_t size, void (*free_cb)(void *)); diff --git a/src/rdkafka_event.c b/src/rdkafka_event.c index 8fd93280ab..6ea366a5a8 100644 --- a/src/rdkafka_event.c +++ b/src/rdkafka_event.c @@ -95,6 +95,8 @@ const char *rd_kafka_event_name(const rd_kafka_event_t *rkev) { return "DescribeUserScramCredentials"; case RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT: return "AlterUserScramCredentials"; + case RD_KAFKA_EVENT_LISTOFFSETS_RESULT: + return "ListOffsetsResult"; default: return "?unknown?"; } @@ -471,6 +473,15 @@ rd_kafka_event_AlterUserScramCredentials_result(rd_kafka_event_t *rkev) { return ( const rd_kafka_AlterUserScramCredentials_result_t *)rkev; } + +const rd_kafka_ListOffsets_result_t * +rd_kafka_event_ListOffsets_result(rd_kafka_event_t *rkev) { + if (!rkev || rkev->rko_evtype != RD_KAFKA_EVENT_LISTOFFSETS_RESULT) + return NULL; + else + return (const rd_kafka_ListOffsets_result_t *)rkev; +} + const rd_kafka_ListConsumerGroupOffsets_result_t * rd_kafka_event_ListConsumerGroupOffsets_result(rd_kafka_event_t *rkev) { if (!rkev || diff --git a/src/rdkafka_event.h b/src/rdkafka_event.h index 4b6f29a203..5d22456b38 100644 --- a/src/rdkafka_event.h +++ b/src/rdkafka_event.h @@ -2,6 +2,7 @@ * librdkafka - Apache Kafka C library * * Copyright (c) 2016-2022, Magnus Edenhill + * 2023, Confluent Inc. * All rights reserved. 
* * Redistribution and use in source and binary forms, with or without @@ -115,6 +116,7 @@ static RD_UNUSED RD_INLINE int rd_kafka_event_setup(rd_kafka_t *rk, case RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH: case RD_KAFKA_EVENT_DESCRIBEUSERSCRAMCREDENTIALS_RESULT: case RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT: + case RD_KAFKA_EVENT_LISTOFFSETS_RESULT: return 1; default: diff --git a/src/rdkafka_metadata.h b/src/rdkafka_metadata.h index ded83bb14c..213bf2b896 100644 --- a/src/rdkafka_metadata.h +++ b/src/rdkafka_metadata.h @@ -268,6 +268,7 @@ struct rd_kafka_metadata_cache { +int rd_kafka_metadata_cache_delete_by_name(rd_kafka_t *rk, const char *topic); void rd_kafka_metadata_cache_expiry_start(rd_kafka_t *rk); int rd_kafka_metadata_cache_evict_by_age(rd_kafka_t *rk, rd_ts_t ts); void rd_kafka_metadata_cache_topic_update( diff --git a/src/rdkafka_metadata_cache.c b/src/rdkafka_metadata_cache.c index 1530e699e6..b3bad4de8d 100644 --- a/src/rdkafka_metadata_cache.c +++ b/src/rdkafka_metadata_cache.c @@ -94,8 +94,7 @@ rd_kafka_metadata_cache_delete(rd_kafka_t *rk, * @locks rd_kafka_wrlock() * @returns 1 if entry was found and removed, else 0. */ -static int rd_kafka_metadata_cache_delete_by_name(rd_kafka_t *rk, - const char *topic) { +int rd_kafka_metadata_cache_delete_by_name(rd_kafka_t *rk, const char *topic) { struct rd_kafka_metadata_cache_entry *rkmce; rkmce = rd_kafka_metadata_cache_find(rk, topic, 1); diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index f3d9f1134a..047f890f5e 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -433,10 +433,10 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn, /* Inject error, if any */ all_err = rd_kafka_mock_next_request_error(mconn, resp); - rd_kafka_buf_read_i32(rkbuf, &TopicsCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &TopicsCnt, RD_KAFKAP_TOPICS_MAX); /* Response: #Topics */ - rd_kafka_buf_write_i32(resp, TopicsCnt); + rd_kafka_buf_write_arraycnt(resp, TopicsCnt); while (TopicsCnt-- > 0) { rd_kafkap_str_t Topic; @@ -444,14 +444,15 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn, rd_kafka_mock_topic_t *mtopic; rd_kafka_buf_read_str(rkbuf, &Topic); - rd_kafka_buf_read_i32(rkbuf, &PartitionCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &PartitionCnt, + RD_KAFKAP_PARTITIONS_MAX); mtopic = rd_kafka_mock_topic_find_by_kstr(mcluster, &Topic); /* Response: Topic */ rd_kafka_buf_write_kstr(resp, &Topic); /* Response: #Partitions */ - rd_kafka_buf_write_i32(resp, PartitionCnt); + rd_kafka_buf_write_arraycnt(resp, PartitionCnt); while (PartitionCnt-- > 0) { int32_t Partition, CurrentLeaderEpoch = -1; @@ -471,6 +472,9 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn, if (rkbuf->rkbuf_reqhdr.ApiVersion == 0) rd_kafka_buf_read_i32(rkbuf, &MaxNumOffsets); + /* Partition tags */ + rd_kafka_buf_skip_tags(rkbuf); + if (mtopic) mpart = rd_kafka_mock_partition_find(mtopic, Partition); @@ -524,6 +528,9 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn, resp, mpart ? 
mpart->leader_epoch : -1); } + /* Response: Partition tags */ + rd_kafka_buf_write_tags(resp); + rd_kafka_dbg(mcluster->rk, MOCK, "MOCK", "Topic %.*s [%" PRId32 "] returning " @@ -534,6 +541,11 @@ static int rd_kafka_mock_handle_ListOffsets(rd_kafka_mock_connection_t *mconn, rd_kafka_offset2str(Timestamp), rd_kafka_err2str(err)); } + + /* Topic tags */ + rd_kafka_buf_skip_tags(rkbuf); + /* Response: Topic tags */ + rd_kafka_buf_write_tags(resp); } @@ -2114,7 +2126,7 @@ const struct rd_kafka_mock_api_handler /* [request-type] = { MinVersion, MaxVersion, FlexVersion, callback } */ [RD_KAFKAP_Produce] = {0, 7, -1, rd_kafka_mock_handle_Produce}, [RD_KAFKAP_Fetch] = {0, 11, -1, rd_kafka_mock_handle_Fetch}, - [RD_KAFKAP_ListOffsets] = {0, 5, -1, rd_kafka_mock_handle_ListOffsets}, + [RD_KAFKAP_ListOffsets] = {0, 7, 6, rd_kafka_mock_handle_ListOffsets}, [RD_KAFKAP_OffsetFetch] = {0, 6, 6, rd_kafka_mock_handle_OffsetFetch}, [RD_KAFKAP_OffsetCommit] = {0, 8, 8, rd_kafka_mock_handle_OffsetCommit}, [RD_KAFKAP_ApiVersion] = {0, 2, 3, rd_kafka_mock_handle_ApiVersion}, diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index fe009981ff..34e9e3fd34 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -116,6 +116,7 @@ const char *rd_kafka_op2str(rd_kafka_op_type_t type) { "REPLY:ALTERUSERSCRAMCREDENTIALS", [RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS] = "REPLY:DESCRIBEUSERSCRAMCREDENTIALS", + [RD_KAFKA_OP_LISTOFFSETS] = "REPLY:LISTOFFSETS", }; if (type & RD_KAFKA_OP_REPLY) @@ -274,6 +275,7 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) { sizeof(rko->rko_u.admin_request), [RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS] = sizeof(rko->rko_u.admin_request), + [RD_KAFKA_OP_LISTOFFSETS] = sizeof(rko->rko_u.admin_request), }; size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK]; @@ -424,6 +426,7 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) { case RD_KAFKA_OP_LISTCONSUMERGROUPOFFSETS: case RD_KAFKA_OP_ALTERUSERSCRAMCREDENTIALS: case RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS: + case RD_KAFKA_OP_LISTOFFSETS: rd_kafka_replyq_destroy(&rko->rko_u.admin_request.replyq); rd_list_destroy(&rko->rko_u.admin_request.args); if (rko->rko_u.admin_request.options.match_consumer_group_states diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index 5a20a0f39b..3a1384362a 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -179,6 +179,7 @@ typedef enum { RD_KAFKA_OP_ALTERUSERSCRAMCREDENTIALS, /* < Admin: AlterUserScramCredentials u.admin_request >*/ + RD_KAFKA_OP_LISTOFFSETS, /**< Admin: ListOffsets u.admin_request >*/ RD_KAFKA_OP__END } rd_kafka_op_type_t; @@ -529,6 +530,9 @@ struct rd_kafka_op_s { char *errstr; /**< Error string, if rko_err * is set, else NULL. */ + /** Result cb for this op */ + void (*result_cb)(rd_kafka_op_t *); + rd_list_t results; /**< Type depends on request type: * * (rd_kafka_topic_result_t *): diff --git a/src/rdkafka_partition.c b/src/rdkafka_partition.c index 76baa3cfa3..b175ffbc79 100644 --- a/src/rdkafka_partition.c +++ b/src/rdkafka_partition.c @@ -163,9 +163,11 @@ static void rd_kafka_toppar_consumer_lag_req(rd_kafka_toppar_t *rktp) { /* Ask for oldest offset. The newest offset is automatically * propagated in FetchResponse.HighwaterMark. 
*/ - rd_kafka_ListOffsetsRequest( - rktp->rktp_broker, partitions, RD_KAFKA_REPLYQ(rktp->rktp_ops, 0), - rd_kafka_toppar_lag_handle_Offset, rd_kafka_toppar_keep(rktp)); + rd_kafka_ListOffsetsRequest(rktp->rktp_broker, partitions, + RD_KAFKA_REPLYQ(rktp->rktp_ops, 0), + rd_kafka_toppar_lag_handle_Offset, + -1, /* don't set an absolute timeout */ + rd_kafka_toppar_keep(rktp)); rd_kafka_toppar_unlock(rktp); @@ -1600,7 +1602,9 @@ void rd_kafka_toppar_offset_request(rd_kafka_toppar_t *rktp, rd_kafka_ListOffsetsRequest( rkb, offsets, RD_KAFKA_REPLYQ(rktp->rktp_ops, rktp->rktp_op_version), - rd_kafka_toppar_handle_Offset, rktp); + rd_kafka_toppar_handle_Offset, + -1, /* don't set an absolute timeout */ + rktp); rd_kafka_topic_partition_list_destroy(offsets); } diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index ca99349e46..b9e250a9e5 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -475,25 +475,95 @@ rd_kafka_FindCoordinatorRequest(rd_kafka_broker_t *rkb, return RD_KAFKA_RESP_ERR_NO_ERROR; } +/** + * @struct rd_kafka_ListOffsetRequest_parameters_s + * @brief parameters for the rd_kafka_make_ListOffsetsRequest function. + */ +typedef struct rd_kafka_ListOffsetRequest_parameters_s { + /** Partitions to request offsets for. */ + rd_kafka_topic_partition_list_t *rktpars; + /** Isolation level. */ + rd_kafka_IsolationLevel_t isolation_level; + /** Error string (optional). */ + char *errstr; + /** Error string size (optional). */ + size_t errstr_size; +} rd_kafka_ListOffsetRequest_parameters_t; + + +static rd_kafka_ListOffsetRequest_parameters_t +rd_kafka_ListOffsetRequest_parameters_make( + rd_kafka_topic_partition_list_t *rktpars, + rd_kafka_IsolationLevel_t isolation_level, + char *errstr, + size_t errstr_size) { + rd_kafka_ListOffsetRequest_parameters_t params = RD_ZERO_INIT; + params.rktpars = rktpars; + params.isolation_level = isolation_level; + params.errstr = errstr; + params.errstr_size = errstr_size; + return params; +} + +static rd_kafka_ListOffsetRequest_parameters_t * +rd_kafka_ListOffsetRequest_parameters_new( + rd_kafka_topic_partition_list_t *rktpars, + rd_kafka_IsolationLevel_t isolation_level, + char *errstr, + size_t errstr_size) { + rd_kafka_ListOffsetRequest_parameters_t *params = + rd_calloc(1, sizeof(*params)); + *params = rd_kafka_ListOffsetRequest_parameters_make( + rktpars, isolation_level, errstr, errstr_size); + return params; +} + +static void rd_kafka_ListOffsetRequest_parameters_destroy_free(void *opaque) { + rd_kafka_ListOffsetRequest_parameters_t *parameters = opaque; + RD_IF_FREE(parameters->rktpars, rd_kafka_topic_partition_list_destroy); + RD_IF_FREE(parameters->errstr, rd_free); + rd_free(parameters); +} +static rd_kafka_buf_t * +rd_kafka_ListOffsetRequest_buf_new(rd_kafka_broker_t *rkb, + rd_kafka_topic_partition_list_t *rktpars) { + return rd_kafka_buf_new_flexver_request( + rkb, RD_KAFKAP_ListOffsets, 1, + /* ReplicaId+IsolationLevel+TopicArrayCnt+Topic */ + 4 + 1 + 4 + 100 + + /* PartArrayCnt */ + 4 + + /* partition_cnt * Partition+Time+MaxNumOffs */ + (rktpars->cnt * (4 + 8 + 4)), + rd_false); +} /** * @brief Parses a ListOffsets reply. * * Returns the parsed offsets (and errors) in \p offsets which must have been - * initialized by caller. + * initialized by caller. If \p result_info is passed instead, + * it's populated with rd_kafka_ListOffsetsResultInfo_t instances. + * + * Either \p offsets or \p result_info must be passed. + * and the one that is passed is populated. 
* * @returns 0 on success, else an error (\p offsets may be completely or * partially updated, depending on the nature of the error, and per * partition error codes should be checked by the caller). */ -static rd_kafka_resp_err_t +rd_kafka_resp_err_t rd_kafka_parse_ListOffsets(rd_kafka_buf_t *rkbuf, - rd_kafka_topic_partition_list_t *offsets) { + rd_kafka_topic_partition_list_t *offsets, + rd_list_t *result_infos) { const int log_decode_errors = LOG_ERR; int32_t TopicArrayCnt; int16_t api_version; rd_kafka_resp_err_t all_err = RD_KAFKA_RESP_ERR_NO_ERROR; + rd_bool_t return_result_infos; + rd_assert((offsets != NULL) ^ (result_infos != NULL)); + return_result_infos = result_infos != NULL; api_version = rkbuf->rkbuf_reqhdr.ApiVersion; @@ -504,35 +574,37 @@ rd_kafka_parse_ListOffsets(rd_kafka_buf_t *rkbuf, * Broker may return offsets in a different constellation than * in the original request .*/ - rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX); while (TopicArrayCnt-- > 0) { - rd_kafkap_str_t ktopic; + rd_kafkap_str_t Topic; int32_t PartArrayCnt; char *topic_name; - rd_kafka_buf_read_str(rkbuf, &ktopic); - rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt); + rd_kafka_buf_read_str(rkbuf, &Topic); + rd_kafka_buf_read_arraycnt(rkbuf, &PartArrayCnt, + RD_KAFKAP_PARTITIONS_MAX); - RD_KAFKAP_STR_DUPA(&topic_name, &ktopic); + RD_KAFKAP_STR_DUPA(&topic_name, &Topic); while (PartArrayCnt-- > 0) { - int32_t kpartition; + int32_t Partition; int16_t ErrorCode; int32_t OffsetArrayCnt; int64_t Offset = -1; int32_t LeaderEpoch = -1; + int64_t Timestamp = -1; rd_kafka_topic_partition_t *rktpar; - rd_kafka_buf_read_i32(rkbuf, &kpartition); + rd_kafka_buf_read_i32(rkbuf, &Partition); rd_kafka_buf_read_i16(rkbuf, &ErrorCode); if (api_version >= 1) { - int64_t Timestamp; rd_kafka_buf_read_i64(rkbuf, &Timestamp); rd_kafka_buf_read_i64(rkbuf, &Offset); if (api_version >= 4) rd_kafka_buf_read_i32(rkbuf, &LeaderEpoch); + rd_kafka_buf_skip_tags(rkbuf); } else if (api_version == 0) { rd_kafka_buf_read_i32(rkbuf, &OffsetArrayCnt); /* We only request one offset so just grab @@ -543,16 +615,32 @@ rd_kafka_parse_ListOffsets(rd_kafka_buf_t *rkbuf, RD_NOTREACHED(); } - rktpar = rd_kafka_topic_partition_list_add( - offsets, topic_name, kpartition); - rktpar->err = ErrorCode; - rktpar->offset = Offset; - rd_kafka_topic_partition_set_leader_epoch(rktpar, - LeaderEpoch); + if (likely(!return_result_infos)) { + rktpar = rd_kafka_topic_partition_list_add( + offsets, topic_name, Partition); + rktpar->err = ErrorCode; + rktpar->offset = Offset; + rd_kafka_topic_partition_set_leader_epoch( + rktpar, LeaderEpoch); + } else { + rktpar = rd_kafka_topic_partition_new( + topic_name, Partition); + rktpar->err = ErrorCode; + rktpar->offset = Offset; + rd_kafka_topic_partition_set_leader_epoch( + rktpar, LeaderEpoch); + rd_kafka_ListOffsetsResultInfo_t *result_info = + rd_kafka_ListOffsetsResultInfo_new( + rktpar, Timestamp); + rd_list_add(result_infos, result_info); + rd_kafka_topic_partition_destroy(rktpar); + } if (ErrorCode && !all_err) all_err = ErrorCode; } + + rd_kafka_buf_skip_tags(rkbuf); } return all_err; @@ -561,91 +649,6 @@ rd_kafka_parse_ListOffsets(rd_kafka_buf_t *rkbuf, return rkbuf->rkbuf_err; } - - -/** - * @brief Parses and handles ListOffsets replies. - * - * Returns the parsed offsets (and errors) in \p offsets. - * \p offsets must be initialized by the caller. - * - * @returns 0 on success, else an error. 
\p offsets may be populated on error, - * depending on the nature of the error. - * On error \p actionsp (unless NULL) is updated with the recommended - * error actions. - */ -rd_kafka_resp_err_t -rd_kafka_handle_ListOffsets(rd_kafka_t *rk, - rd_kafka_broker_t *rkb, - rd_kafka_resp_err_t err, - rd_kafka_buf_t *rkbuf, - rd_kafka_buf_t *request, - rd_kafka_topic_partition_list_t *offsets, - int *actionsp) { - - int actions; - - if (!err) - err = rd_kafka_parse_ListOffsets(rkbuf, offsets); - if (!err) - return RD_KAFKA_RESP_ERR_NO_ERROR; - - actions = rd_kafka_err_action( - rkb, err, request, RD_KAFKA_ERR_ACTION_PERMANENT, - RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, - - RD_KAFKA_ERR_ACTION_REFRESH, - RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION, - - RD_KAFKA_ERR_ACTION_REFRESH, - RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, - - RD_KAFKA_ERR_ACTION_REFRESH, RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, - - RD_KAFKA_ERR_ACTION_REFRESH, RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE, - - RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY, - RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE, - - RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY, - RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH, - - RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY, - RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH, - - RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__TRANSPORT, - - RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT, - - - RD_KAFKA_ERR_ACTION_END); - - if (actionsp) - *actionsp = actions; - - if (rkb) - rd_rkb_dbg( - rkb, TOPIC, "OFFSET", "OffsetRequest failed: %s (%s)", - rd_kafka_err2str(err), rd_kafka_actions2str(actions)); - - if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { - char tmp[256]; - /* Re-query for leader */ - rd_snprintf(tmp, sizeof(tmp), "ListOffsetsRequest failed: %s", - rd_kafka_err2str(err)); - rd_kafka_metadata_refresh_known_topics(rk, NULL, - rd_true /*force*/, tmp); - } - - if ((actions & RD_KAFKA_ERR_ACTION_RETRY) && - rd_kafka_buf_retry(rkb, request)) - return RD_KAFKA_RESP_ERR__IN_PROGRESS; - - return err; -} - - - /** * @brief Async maker for ListOffsetsRequest. 
*/ @@ -653,8 +656,11 @@ static rd_kafka_resp_err_t rd_kafka_make_ListOffsetsRequest(rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf, void *make_opaque) { - const rd_kafka_topic_partition_list_t *partitions = - (const rd_kafka_topic_partition_list_t *)make_opaque; + rd_kafka_ListOffsetRequest_parameters_t *parameters = make_opaque; + const rd_kafka_topic_partition_list_t *partitions = parameters->rktpars; + int isolation_level = parameters->isolation_level; + char *errstr = parameters->errstr; + size_t errstr_size = parameters->errstr_size; int i; size_t of_TopicArrayCnt = 0, of_PartArrayCnt = 0; const char *last_topic = ""; @@ -662,20 +668,31 @@ rd_kafka_make_ListOffsetsRequest(rd_kafka_broker_t *rkb, int16_t ApiVersion; ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_ListOffsets, 0, 5, NULL); - if (ApiVersion == -1) + rkb, RD_KAFKAP_ListOffsets, 0, 7, NULL); + if (ApiVersion == -1) { + if (errstr) { + rd_snprintf( + errstr, errstr_size, + "ListOffsets (KIP-396) not supported " + "by broker, requires broker version >= 2.5.0"); + } return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + if (ApiVersion >= 6) { + rd_kafka_buf_upgrade_flexver_request(rkbuf); + } /* ReplicaId */ rd_kafka_buf_write_i32(rkbuf, -1); /* IsolationLevel */ if (ApiVersion >= 2) - rd_kafka_buf_write_i8(rkbuf, - rkb->rkb_rk->rk_conf.isolation_level); + rd_kafka_buf_write_i8(rkbuf, isolation_level); /* TopicArrayCnt */ - of_TopicArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* updated later */ + of_TopicArrayCnt = + rd_kafka_buf_write_arraycnt_pos(rkbuf); /* updated later */ for (i = 0; i < partitions->cnt; i++) { const rd_kafka_topic_partition_t *rktpar = @@ -683,9 +700,12 @@ rd_kafka_make_ListOffsetsRequest(rd_kafka_broker_t *rkb, if (strcmp(rktpar->topic, last_topic)) { /* Finish last topic, if any. */ - if (of_PartArrayCnt > 0) - rd_kafka_buf_update_i32(rkbuf, of_PartArrayCnt, - part_cnt); + if (of_PartArrayCnt > 0) { + rd_kafka_buf_finalize_arraycnt( + rkbuf, of_PartArrayCnt, part_cnt); + /* Topics tags */ + rd_kafka_buf_write_tags(rkbuf); + } /* Topic */ rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1); @@ -695,7 +715,8 @@ rd_kafka_make_ListOffsetsRequest(rd_kafka_broker_t *rkb, part_cnt = 0; /* PartitionArrayCnt: updated later */ - of_PartArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0); + of_PartArrayCnt = + rd_kafka_buf_write_arraycnt_pos(rkbuf); } /* Partition */ @@ -716,12 +737,18 @@ rd_kafka_make_ListOffsetsRequest(rd_kafka_broker_t *rkb, /* MaxNumberOfOffsets */ rd_kafka_buf_write_i32(rkbuf, 1); } + + /* Partitions tags */ + rd_kafka_buf_write_tags(rkbuf); } if (of_PartArrayCnt > 0) { - rd_kafka_buf_update_i32(rkbuf, of_PartArrayCnt, part_cnt); - rd_kafka_buf_update_i32(rkbuf, of_TopicArrayCnt, topic_cnt); + rd_kafka_buf_finalize_arraycnt(rkbuf, of_PartArrayCnt, + part_cnt); + /* Topics tags */ + rd_kafka_buf_write_tags(rkbuf); } + rd_kafka_buf_finalize_arraycnt(rkbuf, of_TopicArrayCnt, topic_cnt); rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); @@ -734,39 +761,166 @@ rd_kafka_make_ListOffsetsRequest(rd_kafka_broker_t *rkb, return RD_KAFKA_RESP_ERR_NO_ERROR; } - /** * @brief Send ListOffsetsRequest for partitions in \p partitions. + * Set absolute timeout \p timeout_ms if >= 0. 
*/ void rd_kafka_ListOffsetsRequest(rd_kafka_broker_t *rkb, rd_kafka_topic_partition_list_t *partitions, rd_kafka_replyq_t replyq, rd_kafka_resp_cb_t *resp_cb, + int timeout_ms, void *opaque) { rd_kafka_buf_t *rkbuf; - rd_kafka_topic_partition_list_t *make_parts; + rd_kafka_topic_partition_list_t *rktpars; + rd_kafka_ListOffsetRequest_parameters_t *params; - make_parts = rd_kafka_topic_partition_list_copy(partitions); - rd_kafka_topic_partition_list_sort_by_topic(make_parts); + rktpars = rd_kafka_topic_partition_list_copy(partitions); + rd_kafka_topic_partition_list_sort_by_topic(rktpars); - rkbuf = rd_kafka_buf_new_request( - rkb, RD_KAFKAP_ListOffsets, 1, - /* ReplicaId+IsolationLevel+TopicArrayCnt+Topic */ - 4 + 1 + 4 + 100 + - /* PartArrayCnt */ - 4 + - /* partition_cnt * Partition+Time+MaxNumOffs */ - (make_parts->cnt * (4 + 8 + 4))); + params = rd_kafka_ListOffsetRequest_parameters_new( + rktpars, + (rd_kafka_IsolationLevel_t)rkb->rkb_rk->rk_conf.isolation_level, + NULL, 0); + + rkbuf = rd_kafka_ListOffsetRequest_buf_new(rkb, partitions); + + if (timeout_ms >= 0) + rd_kafka_buf_set_abs_timeout(rkbuf, timeout_ms, 0); /* Postpone creating the request contents until time to send, * at which time the ApiVersion is known. */ - rd_kafka_buf_set_maker(rkbuf, rd_kafka_make_ListOffsetsRequest, - make_parts, - rd_kafka_topic_partition_list_destroy_free); + rd_kafka_buf_set_maker( + rkbuf, rd_kafka_make_ListOffsetsRequest, params, + rd_kafka_ListOffsetRequest_parameters_destroy_free); rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); } +/** + * @brief Send ListOffsetsRequest for offsets contained in the first + * element of \p offsets, that is a rd_kafka_topic_partition_list_t. + * AdminClient compatible request callback. + */ +rd_kafka_resp_err_t rd_kafka_ListOffsetsRequest_admin( + rd_kafka_broker_t *rkb, + const rd_list_t *offsets /* rd_kafka_topic_partition_list_t*/, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_ListOffsetRequest_parameters_t params; + rd_kafka_IsolationLevel_t isolation_level; + rd_kafka_topic_partition_list_t *topic_partitions; + rd_kafka_buf_t *rkbuf; + rd_kafka_resp_err_t err; + topic_partitions = rd_list_elem(offsets, 0); + + isolation_level = RD_KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED; + if (options && options->isolation_level.u.INT.v) + isolation_level = options->isolation_level.u.INT.v; + + params = rd_kafka_ListOffsetRequest_parameters_make( + topic_partitions, isolation_level, errstr, errstr_size); + + rkbuf = rd_kafka_ListOffsetRequest_buf_new(rkb, topic_partitions); + + err = rd_kafka_make_ListOffsetsRequest(rkb, rkbuf, ¶ms); + + if (err) { + rd_kafka_buf_destroy(rkbuf); + rd_kafka_replyq_destroy(&replyq); + return err; + } + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + +/** + * @brief Parses and handles ListOffsets replies. + * + * Returns the parsed offsets (and errors) in \p offsets. + * \p offsets must be initialized by the caller. + * + * @returns 0 on success, else an error. \p offsets may be populated on error, + * depending on the nature of the error. + * On error \p actionsp (unless NULL) is updated with the recommended + * error actions. 
+ */ +rd_kafka_resp_err_t +rd_kafka_handle_ListOffsets(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + rd_kafka_topic_partition_list_t *offsets, + int *actionsp) { + + int actions; + + if (!err) { + err = rd_kafka_parse_ListOffsets(rkbuf, offsets, NULL); + } + if (!err) + return RD_KAFKA_RESP_ERR_NO_ERROR; + + actions = rd_kafka_err_action( + rkb, err, request, RD_KAFKA_ERR_ACTION_PERMANENT, + RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, + + RD_KAFKA_ERR_ACTION_REFRESH, + RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION, + + RD_KAFKA_ERR_ACTION_REFRESH, + RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, + + RD_KAFKA_ERR_ACTION_REFRESH, RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, + + RD_KAFKA_ERR_ACTION_REFRESH, RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE, + + RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY, + RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE, + + RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY, + RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH, + + RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY, + RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH, + + RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__TRANSPORT, + + RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT, + + RD_KAFKA_ERR_ACTION_END); + + if (actionsp) + *actionsp = actions; + + if (rkb) + rd_rkb_dbg( + rkb, TOPIC, "OFFSET", "OffsetRequest failed: %s (%s)", + rd_kafka_err2str(err), rd_kafka_actions2str(actions)); + + if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { + char tmp[256]; + /* Re-query for leader */ + rd_snprintf(tmp, sizeof(tmp), "ListOffsetsRequest failed: %s", + rd_kafka_err2str(err)); + rd_kafka_metadata_refresh_known_topics(rk, NULL, + rd_true /*force*/, tmp); + } + + if ((actions & RD_KAFKA_ERR_ACTION_RETRY) && + rd_kafka_buf_retry(rkb, request)) + return RD_KAFKA_RESP_ERR__IN_PROGRESS; + + return err; +} + /** * @brief OffsetForLeaderEpochResponse handler. diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index a921c26684..ec94b0a5a0 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -101,6 +101,7 @@ rd_kafka_FindCoordinatorRequest(rd_kafka_broker_t *rkb, rd_kafka_resp_cb_t *resp_cb, void *opaque); + rd_kafka_resp_err_t rd_kafka_handle_ListOffsets(rd_kafka_t *rk, rd_kafka_broker_t *rkb, @@ -114,8 +115,24 @@ void rd_kafka_ListOffsetsRequest(rd_kafka_broker_t *rkb, rd_kafka_topic_partition_list_t *offsets, rd_kafka_replyq_t replyq, rd_kafka_resp_cb_t *resp_cb, + int timeout_ms, void *opaque); +rd_kafka_resp_err_t +rd_kafka_ListOffsetsRequest_admin(rd_kafka_broker_t *rkb, + const rd_list_t *offsets, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque); + +rd_kafka_resp_err_t +rd_kafka_parse_ListOffsets(rd_kafka_buf_t *rkbuf, + rd_kafka_topic_partition_list_t *offsets, + rd_list_t *result_infos); + rd_kafka_resp_err_t rd_kafka_handle_OffsetForLeaderEpoch(rd_kafka_t *rk, rd_kafka_broker_t *rkb, diff --git a/tests/0031-get_offsets.c b/tests/0031-get_offsets.c index 25f6df588e..573e36b10f 100644 --- a/tests/0031-get_offsets.c +++ b/tests/0031-get_offsets.c @@ -3,6 +3,7 @@ * librdkafka - Apache Kafka C library * * Copyright (c) 2012-2022, Magnus Edenhill + * 2023, Confluent Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -36,10 +37,122 @@ /** - * Verify that rd_kafka_(query|get)_watermark_offsets() works. 
+ * @brief Verify that rd_kafka_query_watermark_offsets times out in case we're + * unable to fetch offsets within the timeout (Issue #2588). + */ +void test_query_watermark_offsets_timeout(void) { + int64_t qry_low, qry_high; + rd_kafka_resp_err_t err; + const char *topic = test_mk_topic_name(__FUNCTION__, 1); + rd_kafka_mock_cluster_t *mcluster; + rd_kafka_t *rk; + rd_kafka_conf_t *conf; + const char *bootstraps; + const int timeout_ms = 1000; + + if (test_needs_auth()) { + TEST_SKIP("Mock cluster does not support SSL/SASL\n"); + return; + } + + SUB_TEST_QUICK(); + + mcluster = test_mock_cluster_new(1, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 1, 1); + rd_kafka_mock_broker_push_request_error_rtts( + mcluster, 1, RD_KAFKAP_ListOffsets, 1, RD_KAFKA_RESP_ERR_NO_ERROR, + (int)(timeout_ms * 1.2)); + + test_conf_init(&conf, NULL, 30); + test_conf_set(conf, "bootstrap.servers", bootstraps); + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + + + err = rd_kafka_query_watermark_offsets(rk, topic, 0, &qry_low, + &qry_high, timeout_ms); + + TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT, + "Querying watermark offsets should fail with %s when RTT > " + "timeout, instead got %s", + rd_kafka_err2name(RD_KAFKA_RESP_ERR__TIMED_OUT), + rd_kafka_err2name(err)); + + rd_kafka_destroy(rk); + test_mock_cluster_destroy(mcluster); + + SUB_TEST_PASS(); +} + +/** + * @brief Verify that rd_kafka_query_watermark_offsets queries the correct + * leader immediately after a leader change. + */ +void test_query_watermark_offsets_leader_change(void) { + int64_t qry_low, qry_high; + rd_kafka_resp_err_t err; + const char *topic = test_mk_topic_name(__FUNCTION__, 1); + rd_kafka_mock_cluster_t *mcluster; + rd_kafka_t *rk; + rd_kafka_conf_t *conf; + const char *bootstraps; + const int timeout_ms = 1000; + + if (test_needs_auth()) { + TEST_SKIP("Mock cluster does not support SSL/SASL\n"); + return; + } + + SUB_TEST_QUICK(); + + mcluster = test_mock_cluster_new(2, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 1, 2); + + /* Leader is broker 1 */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + + test_conf_init(&conf, NULL, 30); + test_conf_set(conf, "bootstrap.servers", bootstraps); + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + + err = rd_kafka_query_watermark_offsets(rk, topic, 0, &qry_low, + &qry_high, timeout_ms); + + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, + "Querying watermark offsets should succeed on the first " + "broker and cache the leader, got %s", + rd_kafka_err2name(err)); + + /* Leader is broker 2 */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); + + /* First call returns NOT_LEADER_FOR_PARTITION, second one should go to + * the second broker and return NO_ERROR instead of + * NOT_LEADER_FOR_PARTITION. */ + err = rd_kafka_query_watermark_offsets(rk, topic, 0, &qry_low, + &qry_high, timeout_ms); + + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION, + "Querying watermark offsets should fail with " + "NOT_LEADER_FOR_PARTITION, got %s", + rd_kafka_err2name(err)); + + err = rd_kafka_query_watermark_offsets(rk, topic, 0, &qry_low, + &qry_high, timeout_ms); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, + "Querying watermark offsets should succeed by " + "querying the second broker, got %s", + rd_kafka_err2name(err)); + + rd_kafka_destroy(rk); + test_mock_cluster_destroy(mcluster); + SUB_TEST_PASS(); +} + +/** + * Verify that rd_kafka_(query|get)_watermark_offsets() works.
+ */ int main_0031_get_offsets(int argc, char **argv) { const char *topic = test_mk_topic_name(__FUNCTION__, 1); const int msgcnt = test_quick ? 10 : 100; @@ -115,51 +228,14 @@ int main_0031_get_offsets(int argc, char **argv) { rd_kafka_topic_destroy(rkt); rd_kafka_destroy(rk); - return 0; } -/* - * Verify that rd_kafka_query_watermark_offsets times out in case we're unable - * to fetch offsets within the timeout (Issue #2588). - */ int main_0031_get_offsets_mock(int argc, char **argv) { - int64_t qry_low, qry_high; - rd_kafka_resp_err_t err; - const char *topic = test_mk_topic_name(__FUNCTION__, 1); - rd_kafka_mock_cluster_t *mcluster; - rd_kafka_t *rk; - rd_kafka_conf_t *conf; - const char *bootstraps; - const int timeout_ms = 1000; - - if (test_needs_auth()) { - TEST_SKIP("Mock cluster does not support SSL/SASL\n"); - return 0; - } - - mcluster = test_mock_cluster_new(1, &bootstraps); - rd_kafka_mock_topic_create(mcluster, topic, 1, 1); - rd_kafka_mock_broker_push_request_error_rtts( - mcluster, 1, RD_KAFKAP_ListOffsets, 1, RD_KAFKA_RESP_ERR_NO_ERROR, - (int)(timeout_ms * 1.2)); - - test_conf_init(&conf, NULL, 30); - test_conf_set(conf, "bootstrap.servers", bootstraps); - rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + test_query_watermark_offsets_timeout(); - err = rd_kafka_query_watermark_offsets(rk, topic, 0, &qry_low, - &qry_high, timeout_ms); - - TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT, - "Querying watermark offsets should fail with %s when RTT > " - "timeout, instead got %s", - rd_kafka_err2name(RD_KAFKA_RESP_ERR__TIMED_OUT), - rd_kafka_err2name(err)); - - rd_kafka_destroy(rk); - test_mock_cluster_destroy(mcluster); + test_query_watermark_offsets_leader_change(); return 0; } diff --git a/tests/0081-admin.c b/tests/0081-admin.c index c8c6fcc7ab..f788983986 100644 --- a/tests/0081-admin.c +++ b/tests/0081-admin.c @@ -4935,6 +4935,166 @@ static void do_test_UserScramCredentials(const char *what, SUB_TEST_PASS(); } +static void do_test_ListOffsets(const char *what, + rd_kafka_t *rk, + rd_kafka_queue_t *useq, + int req_timeout_ms) { + char errstr[512]; + const char *topic = test_mk_topic_name(__FUNCTION__, 1); + char *message = "Message"; + rd_kafka_AdminOptions_t *options; + rd_kafka_event_t *event; + rd_kafka_queue_t *q; + rd_kafka_t *p; + size_t i = 0; + rd_kafka_topic_partition_list_t *topic_partitions; + int64_t basetimestamp = 10000000; + int64_t timestamps[] = { + basetimestamp + 100, + basetimestamp + 400, + basetimestamp + 250, + }; + struct test_fixture_s { + int64_t query; + int64_t expected; + int min_broker_version; + } test_fixtures[] = { + {.query = RD_KAFKA_OFFSET_SPEC_EARLIEST, .expected = 0}, + {.query = RD_KAFKA_OFFSET_SPEC_LATEST, .expected = 3}, + {.query = RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP, + .expected = 1, + .min_broker_version = TEST_BRKVER(3, 0, 0, 0)}, + {.query = basetimestamp + 50, .expected = 0}, + {.query = basetimestamp + 300, .expected = 1}, + {.query = basetimestamp + 150, .expected = 1}, + }; + + SUB_TEST_QUICK( + "%s ListOffsets with %s, " + "request_timeout %d", + rd_kafka_name(rk), what, req_timeout_ms); + + q = useq ? useq : rd_kafka_queue_new(rk); + + test_CreateTopics_simple(rk, NULL, (char **)&topic, 1, 1, NULL); + + p = test_create_producer(); + for (i = 0; i < RD_ARRAY_SIZE(timestamps); i++) { + rd_kafka_producev( + /* Producer handle */ + p, + /* Topic name */ + RD_KAFKA_V_TOPIC(topic), + /* Make a copy of the payload. 
*/ + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + /* Message value and length */ + RD_KAFKA_V_VALUE(message, strlen(message)), + + RD_KAFKA_V_TIMESTAMP(timestamps[i]), + /* Per-Message opaque, provided in + * delivery report callback as + * msg_opaque. */ + RD_KAFKA_V_OPAQUE(NULL), + /* End sentinel */ + RD_KAFKA_V_END); + } + + rd_kafka_flush(p, 20 * 1000); + rd_kafka_destroy(p); + + /* Set timeout (optional) */ + options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_LISTOFFSETS); + + TEST_CALL_ERR__(rd_kafka_AdminOptions_set_request_timeout( + options, 30 * 1000 /* 30s */, errstr, sizeof(errstr))); + + TEST_CALL_ERROR__(rd_kafka_AdminOptions_set_isolation_level( + options, RD_KAFKA_ISOLATION_LEVEL_READ_COMMITTED)); + + topic_partitions = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(topic_partitions, topic, 0); + + for (i = 0; i < RD_ARRAY_SIZE(test_fixtures); i++) { + rd_bool_t retry = rd_true; + rd_kafka_topic_partition_list_t *topic_partitions_copy; + + struct test_fixture_s test_fixture = test_fixtures[i]; + if (test_fixture.min_broker_version && + test_broker_version < test_fixture.min_broker_version) { + TEST_SAY("Skipping offset %" PRId64 + ", as not supported\n", + test_fixture.query); + continue; + } + + TEST_SAY("Testing offset %" PRId64 "\n", test_fixture.query); + + topic_partitions_copy = + rd_kafka_topic_partition_list_copy(topic_partitions); + + /* Set OffsetSpec */ + topic_partitions_copy->elems[0].offset = test_fixture.query; + + while (retry) { + rd_kafka_resp_err_t err; + /* Call ListOffsets */ + rd_kafka_ListOffsets(rk, topic_partitions_copy, options, + q); + /* Wait for results */ + event = rd_kafka_queue_poll(q, -1 /*indefinitely*/); + if (!event) + TEST_FAIL("Event missing"); + + err = rd_kafka_event_error(event); + if (err == RD_KAFKA_RESP_ERR__NOENT) { + rd_kafka_event_destroy(event); + /* Still looking for the leader */ + rd_usleep(100000, 0); + continue; + } else if (err) { + TEST_FAIL("Failed with error: %s", + rd_kafka_err2name(err)); + } + + const rd_kafka_ListOffsets_result_t *result; + const rd_kafka_ListOffsetsResultInfo_t **result_infos; + size_t cnt; + size_t j; + result = rd_kafka_event_ListOffsets_result(event); + result_infos = + rd_kafka_ListOffsets_result_infos(result, &cnt); + for (j = 0; j < cnt; j++) { + const rd_kafka_topic_partition_t *topic_partition = + rd_kafka_ListOffsetsResultInfo_topic_partition( + result_infos[j]); + TEST_ASSERT( + topic_partition->err == 0, + "Expected error NO_ERROR, got %s", + rd_kafka_err2name(topic_partition->err)); + TEST_ASSERT(topic_partition->offset == + test_fixture.expected, + "Expected offset %" PRId64 + ", got %" PRId64, + test_fixture.expected, + topic_partition->offset); + } + rd_kafka_event_destroy(event); + retry = rd_false; + } + rd_kafka_topic_partition_list_destroy(topic_partitions_copy); + } + + rd_kafka_AdminOptions_destroy(options); + rd_kafka_topic_partition_list_destroy(topic_partitions); + + test_DeleteTopics_simple(rk, NULL, (char **)&topic, 1, NULL); + + if (!useq) + rd_kafka_queue_destroy(q); + + SUB_TEST_PASS(); +} + static void do_test_apis(rd_kafka_type_t cltype) { rd_kafka_t *rk; rd_kafka_conf_t *conf; @@ -4953,6 +5113,7 @@ static void do_test_apis(rd_kafka_type_t cltype) { test_conf_init(&conf, NULL, 180); test_conf_set(conf, "socket.timeout.ms", "10000"); + rk = test_create_handle(cltype, conf); mainq = rd_kafka_queue_get_main(rk); @@ -5054,6 +5215,10 @@ static void do_test_apis(rd_kafka_type_t cltype) { } if (test_broker_version >= TEST_BRKVER(2, 5, 0, 
0)) { + /* ListOffsets */ + do_test_ListOffsets("temp queue", rk, NULL, -1); + do_test_ListOffsets("main queue", rk, mainq, 1500); + /* Alter committed offsets */ do_test_AlterConsumerGroupOffsets("temp queue", rk, NULL, -1, rd_false, rd_true); diff --git a/tests/0138-admin_mock.c b/tests/0138-admin_mock.c index 0f9021de97..32c67c09d7 100644 --- a/tests/0138-admin_mock.c +++ b/tests/0138-admin_mock.c @@ -175,6 +175,99 @@ static void do_test_AlterConsumerGroupOffsets_errors(int req_timeout_ms) { #undef TEST_ERR_SIZE } +/** + * @brief A leader change should remove metadata cache for a topic + * queried in ListOffsets. + */ +static void do_test_ListOffsets_leader_change(void) { + size_t cnt; + rd_kafka_conf_t *conf; + rd_kafka_mock_cluster_t *mcluster; + const char *bootstraps; + const char *topic = "test"; + rd_kafka_t *rk; + rd_kafka_queue_t *q; + rd_kafka_topic_partition_list_t *to_list; + rd_kafka_event_t *rkev; + rd_kafka_resp_err_t err; + const rd_kafka_ListOffsets_result_t *result; + const rd_kafka_ListOffsetsResultInfo_t **result_infos; + + test_conf_init(&conf, NULL, 60); + + mcluster = test_mock_cluster_new(2, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 1, 2); + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + test_conf_set(conf, "bootstrap.servers", bootstraps); + + rk = test_create_handle(RD_KAFKA_CONSUMER, conf); + + q = rd_kafka_queue_get_main(rk); + + to_list = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(to_list, topic, 0)->offset = -1; + + TEST_SAY("First ListOffsets call to leader broker 1\n"); + rd_kafka_ListOffsets(rk, to_list, NULL, q); + + rkev = rd_kafka_queue_poll(q, -1); + + TEST_ASSERT(rd_kafka_event_type(rkev) == + RD_KAFKA_EVENT_LISTOFFSETS_RESULT, + "Expected LISTOFFSETS_RESULT event type, got %d", + rd_kafka_event_type(rkev)); + + TEST_CALL_ERR__(rd_kafka_event_error(rkev)); + + rd_kafka_event_destroy(rkev); + + + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); + + TEST_SAY( + "Second ListOffsets call to leader broker 1, returns " + "NOT_LEADER_OR_FOLLOWER" + " and invalidates cache\n"); + rd_kafka_ListOffsets(rk, to_list, NULL, q); + + rkev = rd_kafka_queue_poll(q, -1); + result = rd_kafka_event_ListOffsets_result(rkev); + result_infos = rd_kafka_ListOffsets_result_infos(result, &cnt); + + TEST_ASSERT(cnt == 1, "Result topic cnt should be 1, got %" PRIusz, + cnt); + err = rd_kafka_ListOffsetsResultInfo_topic_partition(result_infos[0]) + ->err; + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NOT_LEADER_OR_FOLLOWER, + "Expected event error NOT_LEADER_OR_FOLLOWER, got %s", + rd_kafka_err2name(err)); + + rd_kafka_event_destroy(rkev); + + TEST_SAY( + "Third ListOffsets call to leader broker 2, returns NO_ERROR\n"); + rd_kafka_ListOffsets(rk, to_list, NULL, q); + + rkev = rd_kafka_queue_poll(q, -1); + result = rd_kafka_event_ListOffsets_result(rkev); + result_infos = rd_kafka_ListOffsets_result_infos(result, &cnt); + + TEST_ASSERT(cnt == 1, "Result topic cnt should be 1, got %" PRIusz, + cnt); + err = rd_kafka_ListOffsetsResultInfo_topic_partition(result_infos[0]) + ->err; + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, + "Expected event error NO_ERROR, got %s", + rd_kafka_err2name(err)); + + rd_kafka_event_destroy(rkev); + + rd_kafka_topic_partition_list_destroy(to_list); + rd_kafka_queue_destroy(q); + rd_kafka_destroy(rk); + test_mock_cluster_destroy(mcluster); +} + int main_0138_admin_mock(int argc, char **argv) { if (test_needs_auth()) { @@ -185,5 +278,7 @@ int main_0138_admin_mock(int argc, char **argv) 
{ do_test_AlterConsumerGroupOffsets_errors(-1); do_test_AlterConsumerGroupOffsets_errors(1000); + do_test_ListOffsets_leader_change(); + return 0; }
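
For reference, the sketch below shows how an application might drive the ListOffsets Admin API exercised by the tests above. It is a minimal sketch, not part of the patch: it assumes an already configured rd_kafka_t handle `rk` and an existing topic, omits most error handling, and uses only calls that appear in this patch or in the stable public API (rd_kafka_ListOffsets, rd_kafka_ListOffsets_result_infos, rd_kafka_ListOffsetsResultInfo_topic_partition, the RD_KAFKA_OFFSET_SPEC_* values and the isolation-level admin option). The hypothetical helper name list_latest_offset is illustrative only.

#include <stdio.h>
#include <inttypes.h>
#include <librdkafka/rdkafka.h>

/* Query the latest offset of partition 0 of `topic` via the Admin API.
 * `rk` is assumed to be an already configured producer/consumer handle;
 * return-value checks for the AdminOptions setters are omitted for brevity. */
static void list_latest_offset(rd_kafka_t *rk, const char *topic) {
        rd_kafka_queue_t *queue = rd_kafka_queue_new(rk);
        rd_kafka_topic_partition_list_t *partitions =
            rd_kafka_topic_partition_list_new(1);
        rd_kafka_AdminOptions_t *options =
            rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_LISTOFFSETS);
        rd_kafka_event_t *event;
        const rd_kafka_ListOffsets_result_t *result;
        const rd_kafka_ListOffsetsResultInfo_t **infos;
        size_t cnt, i;

        /* The offset field carries the OffsetSpec: EARLIEST, LATEST,
         * MAX_TIMESTAMP, or a timestamp in milliseconds. */
        rd_kafka_topic_partition_list_add(partitions, topic, 0)->offset =
            RD_KAFKA_OFFSET_SPEC_LATEST;

        /* Optional: only consider committed transactional records. */
        rd_kafka_AdminOptions_set_isolation_level(
            options, RD_KAFKA_ISOLATION_LEVEL_READ_COMMITTED);

        rd_kafka_ListOffsets(rk, partitions, options, queue);

        /* Wait for the LISTOFFSETS_RESULT event and print each partition. */
        event = rd_kafka_queue_poll(queue, -1 /* block indefinitely */);
        if (rd_kafka_event_error(event)) {
                fprintf(stderr, "ListOffsets failed: %s\n",
                        rd_kafka_event_error_string(event));
        } else {
                result = rd_kafka_event_ListOffsets_result(event);
                infos  = rd_kafka_ListOffsets_result_infos(result, &cnt);
                for (i = 0; i < cnt; i++) {
                        const rd_kafka_topic_partition_t *rktpar =
                            rd_kafka_ListOffsetsResultInfo_topic_partition(
                                infos[i]);
                        printf("%s [%" PRId32 "] offset %" PRId64 " (%s)\n",
                               rktpar->topic, rktpar->partition,
                               rktpar->offset, rd_kafka_err2str(rktpar->err));
                }
        }

        rd_kafka_event_destroy(event);
        rd_kafka_AdminOptions_destroy(options);
        rd_kafka_topic_partition_list_destroy(partitions);
        rd_kafka_queue_destroy(queue);
}

As in the tests above, the offset spec travels in the per-partition offset field, so the same call can resolve earliest, latest, max-timestamp or timestamp-based offsets, and per-partition errors are reported on the returned topic partitions rather than on the event itself.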