From 9f09706e731bd33e037bf43cbd8ca548eb325472 Mon Sep 17 00:00:00 2001 From: Trevor Brown Date: Fri, 26 Apr 2024 15:40:31 -0400 Subject: [PATCH] Fix errors caught by Dialyzer --- lib/broadway_kafka/brod_client.ex | 4 ++-- lib/broadway_kafka/kafka_client.ex | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/broadway_kafka/brod_client.ex b/lib/broadway_kafka/brod_client.ex index f0d9be5..fae2212 100644 --- a/lib/broadway_kafka/brod_client.ex +++ b/lib/broadway_kafka/brod_client.ex @@ -80,7 +80,7 @@ defmodule BroadwayKafka.BrodClient do offset_reset_policy: offset_reset_policy, begin_offset: begin_offset, group_config: [{:offset_commit_policy, @offset_commit_policy} | group_config], - fetch_config: Map.new(fetch_config || []), + fetch_config: Map.new(fetch_config), client_config: client_config, shared_client: shared_client, shared_client_id: build_shared_client_id(opts) @@ -111,7 +111,7 @@ defmodule BroadwayKafka.BrodClient do def ack(group_coordinator, generation_id, topic, partition, offset, config) do :brod_group_coordinator.ack(group_coordinator, generation_id, topic, partition, offset) - if group_coordinator && config.offset_commit_on_ack do + if config.offset_commit_on_ack do :brod_group_coordinator.commit_offsets(group_coordinator, [{{topic, partition}, offset}]) end diff --git a/lib/broadway_kafka/kafka_client.ex b/lib/broadway_kafka/kafka_client.ex index dda9c3e..0976c56 100644 --- a/lib/broadway_kafka/kafka_client.ex +++ b/lib/broadway_kafka/kafka_client.ex @@ -15,6 +15,7 @@ defmodule BroadwayKafka.KafkaClient do } @typep offset_reset_policy :: :earliest | :latest + @typep brod_group_coordinator :: pid() @callback init(opts :: any) :: {:ok, config} | {:error, any} @callback setup( @@ -23,9 +24,9 @@ defmodule BroadwayKafka.KafkaClient do callback_module :: module, config ) :: - {:ok, group_coordinator :: pid} | {:error, any} + {:ok, group_coordinator :: brod_group_coordinator()} | {:error, any} @callback ack( - group_coordinator :: pid, + group_coordinator :: brod_group_coordinator, generation_id :: integer, topic :: binary, partition :: integer, @@ -51,7 +52,7 @@ defmodule BroadwayKafka.KafkaClient do ) :: offset :: integer | no_return() - @callback update_topics(:brod.group_coordinator(), [:brod.topic()]) :: :ok + @callback update_topics(brod_group_coordinator(), [:brod.topic()]) :: :ok @callback connected?(:brod.client()) :: boolean @callback disconnect(:brod.client()) :: :ok end