Skip to content

Commit

Permalink
Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
mahajanadhitya committed May 25, 2023
1 parent 34d87e2 commit cbd499c
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 11 deletions.
14 changes: 13 additions & 1 deletion examples/list_offsets.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ int main(int argc, char **argv) {
* offsets */
rd_kafka_AdminOptions_t *options; /* (Optional) Options for */
rd_kafka_event_t *event; /* Result event */
rd_kafka_resp_err_t api_error = 0;
int exitcode = 0;
int i;

Expand Down Expand Up @@ -170,9 +171,20 @@ int main(int argc, char **argv) {
/* Set OffsetSpec to EARLIEST */
topic_partition->offset = RD_KAFKA_OFFSET_SPEC_EARLIEST;

topic_partition = rd_kafka_topic_partition_list_add(topic_partitions,topicname,0);
topic_partition->offset = RD_KAFKA_OFFSET_SPEC_LATEST;

/* Call ListOffsets */
rd_kafka_ListOffsets(rk, topic_partitions, options, queue);
api_error = rd_kafka_ListOffsets(rk, topic_partitions, options, queue);
rd_kafka_AdminOptions_destroy(options);
if(api_error){
printf("Api Entry Point Error : %s\n",rd_kafka_err2str(api_error));
rd_kafka_queue_destroy(queue);

/* Destroy the producer instance */
rd_kafka_destroy(rk);
return api_error;
}
/* Wait for results */
event = rd_kafka_queue_poll(queue, -1 /*indefinitely*/);

Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -8104,7 +8104,7 @@ void rd_kafka_DeleteGroups(rd_kafka_t *rk,
* @brief ListOffsets for the given partitions in the partion_list_t
*/
RD_EXPORT
void rd_kafka_ListOffsets(rd_kafka_t *rk,
rd_kafka_resp_err_t rd_kafka_ListOffsets(rd_kafka_t *rk,
rd_kafka_topic_partition_list_t *topic_partitions,
const rd_kafka_AdminOptions_t *options,
rd_kafka_queue_t *rkqu);
Expand Down
12 changes: 3 additions & 9 deletions src/rdkafka_admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -3979,14 +3979,8 @@ void rd_kafka_DeleteRecords(rd_kafka_t *rk,
rd_kafka_DeleteRecords_leaders_queried_cb, rko_fanout);
}

int rd_kafka_topic_partition_cmp(rd_kafka_topic_partition_t *tp1,rd_kafka_topic_partition_t *tp2){
if(strcasecmp(tp1->topic,tp2->topic)){
return strcasecmp(tp1->topic,tp2->topic);
}
return tp1->partition - tp2->partition;
}
/*listOffsets*/
void rd_kafka_ListOffsets(rd_kafka_t *rk,
rd_kafka_resp_err_t rd_kafka_ListOffsets(rd_kafka_t *rk,
rd_kafka_topic_partition_list_t *topic_partitions,
const rd_kafka_AdminOptions_t *options,
rd_kafka_queue_t *rkqu){
Expand Down Expand Up @@ -4023,7 +4017,7 @@ void rd_kafka_ListOffsets(rd_kafka_t *rk,
"No Partitions to list Offsets for");
rd_kafka_admin_common_worker_destroy(rk, rko_fanout,
rd_true /*destroy*/);
return;
return RD_KAFKA_RESP_ERR__INVALID_ARG;
}
copied_topic_partitions = rd_kafka_topic_partition_list_copy(topic_partitions);
rd_list_init(&rko_fanout->rko_u.admin_request.args, 1, rd_kafka_topic_partition_list_destroy_free);
Expand All @@ -4034,7 +4028,7 @@ void rd_kafka_ListOffsets(rd_kafka_t *rk,
rk, copied_topic_partitions, rd_kafka_admin_timeout_remains(rko_fanout),
RD_KAFKA_REPLYQ(rk->rk_ops, 0),
rd_kafka_ListOffsets_leaders_queried_cb, rko_fanout);

return RD_KAFKA_RESP_ERR_NO_ERROR;

}

Expand Down

0 comments on commit cbd499c

Please # to comment.