diff --git a/test/fr33m0nk/kafka_stream_producing_multiple_messages_per_consumed_message.clj b/test/fr33m0nk/kafka_stream_producing_multiple_messages_per_consumed_message.clj index 8fd94db..f1b624a 100644 --- a/test/fr33m0nk/kafka_stream_producing_multiple_messages_per_consumed_message.clj +++ b/test/fr33m0nk/kafka_stream_producing_multiple_messages_per_consumed_message.clj @@ -7,7 +7,7 @@ [fr33m0nk.alpakka-kafka.consumer :as consumer] [fr33m0nk.alpakka-kafka.producer :as producer] [fr33m0nk.utils :as utils] - [fr33m0nk.test-utils :as ftu]) + [fr33m0nk.test-utils :as tu]) (:import (java.util.concurrent CompletableFuture) (org.apache.kafka.common.serialization StringSerializer StringDeserializer))) @@ -26,17 +26,21 @@ (deftest kafka-stream-with-producer-test (testing "kafka stream producing multiple messages per consumed message test" - (ftu/with-kafka-test-container in-topic out-topic bootstrap-servers + (tu/with-kafka-test-container + in-topic out-topic bootstrap-servers (let [actor-system (actor/->actor-system "test-actor-system") committer-settings (committer/committer-settings actor-system {:batch-size 2}) - consumer-settings (consumer/consumer-settings actor-system - {:group-id "alpakka-consumer" - :bootstrap-servers bootstrap-servers - :key-deserializer (StringDeserializer.) - :value-deserializer (StringDeserializer.)}) - producer-settings (producer/producer-settings actor-system {:bootstrap-servers bootstrap-servers - :key-serializer (StringSerializer.) - :value-serializer (StringSerializer.)}) + consumer-settings (consumer/consumer-settings-from-actor-system-config actor-system + {:consumer-config-key "akka.kafka.consumer" + :group-id "alpakka-consumer-group-1" + :key-deserializer (StringDeserializer.) + :value-deserializer (StringDeserializer.) + :bootstrap-servers bootstrap-servers}) + producer-settings (producer/producer-settings-from-actor-system-config actor-system + {:producer-config-key "akka.kafka.producer" + :key-serializer (StringSerializer.) + :value-serializer (StringSerializer.) + :bootstrap-servers bootstrap-servers}) processing-fn (fn [producer-topic message] (->> (repeat 5 message) (mapv #(producer/->producer-record producer-topic (str/upper-case %))))) @@ -47,10 +51,10 @@ (into [] (comp (map #(repeat 5 %)) cat)))] (try (Thread/sleep 1000) - (run! (fn [{:keys [key value]}] (ftu/send-record bootstrap-servers in-topic key value)) in-messages) + (run! (fn [{:keys [key value]}] (tu/send-record bootstrap-servers in-topic key value)) in-messages) (Thread/sleep 1000) - (is (= expected-out (ftu/read-records bootstrap-servers out-topic 1000))) + (is (= expected-out (tu/read-records bootstrap-servers out-topic 1000))) (finally @(consumer/drain-and-shutdown consumer-control diff --git a/test/fr33m0nk/kafka_stream_sink_test.clj b/test/fr33m0nk/kafka_stream_sink_test.clj index efda3c6..9e20d81 100644 --- a/test/fr33m0nk/kafka_stream_sink_test.clj +++ b/test/fr33m0nk/kafka_stream_sink_test.clj @@ -5,7 +5,7 @@ [fr33m0nk.alpakka-kafka.committer :as committer] [fr33m0nk.alpakka-kafka.consumer :as consumer] [fr33m0nk.utils :as utils] - [fr33m0nk.test-utils :as ftu]) + [fr33m0nk.test-utils :as tu]) (:import (java.util.concurrent CompletableFuture) (org.apache.kafka.common.serialization StringDeserializer))) @@ -26,14 +26,16 @@ (deftest kafka-stream-sink-only-test (testing "kafka stream sink test" - (ftu/with-kafka-test-container in-topic out-topic bootstrap-servers + (tu/with-kafka-test-container + in-topic out-topic bootstrap-servers (let [actor-system (actor/->actor-system "test-actor-system") committer-settings (committer/committer-settings actor-system {:batch-size 2}) - consumer-settings (consumer/consumer-settings actor-system - {:group-id "alpakka-consumer" - :bootstrap-servers bootstrap-servers - :key-deserializer (StringDeserializer.) - :value-deserializer (StringDeserializer.)}) + consumer-settings (consumer/consumer-settings-from-actor-system-config actor-system + {:consumer-config-key "akka.kafka.consumer" + :group-id "alpakka-consumer-group-1" + :key-deserializer (StringDeserializer.) + :value-deserializer (StringDeserializer.) + :bootstrap-servers bootstrap-servers}) actual-messages (atom []) processing-fn (fn [message] (let [key (consumer/key message) @@ -44,7 +46,7 @@ expected [{:key "key-1" :value "msg-1"} {:key "key-2" :value "msg-2"} {:key "key-3" :value "msg-3"}]] (try (Thread/sleep 1000) - (run! (fn [{:keys [key value]}] (ftu/send-record bootstrap-servers in-topic key value)) expected) + (run! (fn [{:keys [key value]}] (tu/send-record bootstrap-servers in-topic key value)) expected) (Thread/sleep 1000) (is (= expected @actual-messages)) (finally diff --git a/test/fr33m0nk/kafka_stream_with_at_least_once_delivery_semantics.clj b/test/fr33m0nk/kafka_stream_with_at_least_once_delivery_semantics.clj index d5d57dd..2a1be45 100644 --- a/test/fr33m0nk/kafka_stream_with_at_least_once_delivery_semantics.clj +++ b/test/fr33m0nk/kafka_stream_with_at_least_once_delivery_semantics.clj @@ -7,7 +7,7 @@ [fr33m0nk.alpakka-kafka.consumer :as consumer] [fr33m0nk.alpakka-kafka.producer :as producer] [fr33m0nk.utils :as utils] - [fr33m0nk.test-utils :as ftu]) + [fr33m0nk.test-utils :as tu]) (:import (java.util.concurrent CompletableFuture) (org.apache.kafka.common.serialization StringSerializer StringDeserializer))) @@ -28,17 +28,21 @@ (deftest kafka-stream-producing-multiple-messages-with-at-least-once-semantics-test (testing "kafka stream producing multiple messages with at least once semantics test" - (ftu/with-kafka-test-container in-topic out-topic bootstrap-servers + (tu/with-kafka-test-container + in-topic out-topic bootstrap-servers (let [actor-system (actor/->actor-system "test-actor-system") committer-settings (committer/committer-settings actor-system {:batch-size 2}) - consumer-settings (consumer/consumer-settings actor-system - {:group-id "alpakka-consumer" - :bootstrap-servers bootstrap-servers - :key-deserializer (StringDeserializer.) - :value-deserializer (StringDeserializer.)}) - producer-settings (producer/producer-settings actor-system {:bootstrap-servers bootstrap-servers - :key-serializer (StringSerializer.) - :value-serializer (StringSerializer.)}) + consumer-settings (consumer/consumer-settings-from-actor-system-config actor-system + {:consumer-config-key "akka.kafka.consumer" + :group-id "alpakka-consumer-group-1" + :key-deserializer (StringDeserializer.) + :value-deserializer (StringDeserializer.) + :bootstrap-servers bootstrap-servers}) + producer-settings (producer/producer-settings-from-actor-system-config actor-system + {:producer-config-key "akka.kafka.producer" + :key-serializer (StringSerializer.) + :value-serializer (StringSerializer.) + :bootstrap-servers bootstrap-servers}) processing-fn (fn [producer-topic message] (->> (repeat 3 message) (mapv #(producer/->producer-record producer-topic (str/upper-case %))))) @@ -49,10 +53,10 @@ (into [] (comp (map #(repeat 3 %)) cat)))] (try (Thread/sleep 1000) - (run! (fn [{:keys [key value]}] (ftu/send-record bootstrap-servers in-topic key value)) in-messages) + (run! (fn [{:keys [key value]}] (tu/send-record bootstrap-servers in-topic key value)) in-messages) (Thread/sleep 1000) - (is (= expected-out (ftu/read-records bootstrap-servers out-topic 1000))) + (is (= expected-out (tu/read-records bootstrap-servers out-topic 1000))) (finally @(consumer/drain-and-shutdown consumer-control diff --git a/test/fr33m0nk/kafka_stream_with_error_handling.clj b/test/fr33m0nk/kafka_stream_with_error_handling.clj index 7a0ae66..e321778 100644 --- a/test/fr33m0nk/kafka_stream_with_error_handling.clj +++ b/test/fr33m0nk/kafka_stream_with_error_handling.clj @@ -8,14 +8,15 @@ [fr33m0nk.alpakka-kafka.producer :as producer] [fr33m0nk.akka.restart-source :as restart] [fr33m0nk.utils :as utils] - [fr33m0nk.test-utils :as ftu]) + [fr33m0nk.test-utils :as tu]) (:import (java.util.concurrent CompletableFuture) (org.apache.kafka.common.serialization StringSerializer StringDeserializer))) (defn kafka-stream-with-error-handling [actor-system restart-settings consumer-settings committer-settings producer-settings consumer-topics producer-topic restart-count processing-fn] (let [restart-counter (atom 1) - consumer-control-reference (atom consumer/create-noop-control)] + consumer-control-reference (atom consumer/create-noop-control) + recovered-from-crashes (promise)] (-> (restart/->restart-source-on-failures-with-backoff restart-settings (fn [] @@ -28,6 +29,7 @@ ;; throwing exception to simulate processing failure and stream-restart and incrementing restart-counter (when (<= @restart-counter restart-count) (throw (ex-info (str "Simulating processing failure - " @restart-counter) {:error-count (swap! restart-counter inc)}))) + (deliver recovered-from-crashes true) (let [_key (consumer/key message) value (consumer/value message) committable-offset (consumer/committable-offset message) @@ -37,40 +39,43 @@ (s/map producer/producer-message-passthrough) (s/via (committer/flow committer-settings))))) (s/run-with s/ignoring-sink actor-system)) - consumer-control-reference)) + [consumer-control-reference recovered-from-crashes])) (deftest kafka-stream-producing-multiple-messages-with-at-least-once-semantics-test (testing "kafka stream with error handling test" - (ftu/with-kafka-test-container + (tu/with-kafka-test-container in-topic out-topic bootstrap-servers (let [in-messages [{:key "key-1" :value "msg-1"} {:key "key-2" :value "msg-2"} {:key "key-3" :value "msg-3"}] - _ (run! (fn [{:keys [key value]}] (ftu/send-record bootstrap-servers in-topic key value)) in-messages) + _ (run! (fn [{:keys [key value]}] (tu/send-record bootstrap-servers in-topic key value)) in-messages) actor-system (actor/->actor-system "test-actor-system") committer-settings (committer/committer-settings actor-system {:batch-size 2}) consumer-settings (consumer/consumer-settings-from-actor-system-config actor-system {:consumer-config-key "akka.kafka.consumer" - :group-id "alpakka-consumer-group-1" - :key-deserializer (StringDeserializer.) - :value-deserializer (StringDeserializer.) - :bootstrap-servers bootstrap-servers}) + :group-id "alpakka-consumer-group-1" + :key-deserializer (StringDeserializer.) + :value-deserializer (StringDeserializer.) + :bootstrap-servers bootstrap-servers}) producer-settings (producer/producer-settings-from-actor-system-config actor-system {:producer-config-key "akka.kafka.producer" :key-serializer (StringSerializer.) :value-serializer (StringSerializer.) :bootstrap-servers bootstrap-servers}) - restart-settings (restart/restart-settings 100 200 0.1 {}) + restart-settings (restart/restart-settings 1000 1000 0.1 {}) processing-fn (fn [producer-topic message] (->> (repeat 3 message) (mapv #(producer/->producer-record producer-topic (str/upper-case %))))) - consumer-control-reference (kafka-stream-with-error-handling actor-system restart-settings consumer-settings - committer-settings producer-settings - [in-topic] out-topic 1 processing-fn) + [consumer-control-reference recovered-from-crashes] + (kafka-stream-with-error-handling actor-system restart-settings consumer-settings + committer-settings producer-settings + [in-topic] out-topic 1 processing-fn) expected-out (->> [{:key nil :value "MSG-1"} {:key nil :value "MSG-2"} {:key nil :value "MSG-3"}] (into [] (comp (map #(repeat 3 %)) cat)))] (try - (Thread/sleep 300000) - (is (= expected-out (ftu/read-records bootstrap-servers out-topic 240000))) + (loop [] + (when-not (realized? recovered-from-crashes) + (recur))) + (is (= expected-out (tu/read-records bootstrap-servers out-topic 240000))) (finally @(consumer/drain-and-shutdown @consumer-control-reference diff --git a/test/fr33m0nk/kafka_stream_with_producer_test.clj b/test/fr33m0nk/kafka_stream_with_producer_test.clj index 334f933..f9b2217 100644 --- a/test/fr33m0nk/kafka_stream_with_producer_test.clj +++ b/test/fr33m0nk/kafka_stream_with_producer_test.clj @@ -7,7 +7,7 @@ [fr33m0nk.alpakka-kafka.consumer :as consumer] [fr33m0nk.alpakka-kafka.producer :as producer] [fr33m0nk.utils :as utils] - [fr33m0nk.test-utils :as ftu]) + [fr33m0nk.test-utils :as tu]) (:import (java.util.concurrent CompletableFuture) (org.apache.kafka.common.serialization StringSerializer StringDeserializer))) @@ -26,27 +26,31 @@ (deftest kafka-stream-with-producer-test (testing "kafka stream with producer test" - (ftu/with-kafka-test-container in-topic out-topic bootstrap-servers + (tu/with-kafka-test-container + in-topic out-topic bootstrap-servers (let [actor-system (actor/->actor-system "test-actor-system") committer-settings (committer/committer-settings actor-system {:batch-size 2}) - consumer-settings (consumer/consumer-settings actor-system - {:group-id "alpakka-consumer" - :bootstrap-servers bootstrap-servers - :key-deserializer (StringDeserializer.) - :value-deserializer (StringDeserializer.)}) - producer-settings (producer/producer-settings actor-system {:bootstrap-servers bootstrap-servers - :key-serializer (StringSerializer.) - :value-serializer (StringSerializer.)}) + consumer-settings (consumer/consumer-settings-from-actor-system-config actor-system + {:consumer-config-key "akka.kafka.consumer" + :group-id "alpakka-consumer-group-1" + :key-deserializer (StringDeserializer.) + :value-deserializer (StringDeserializer.) + :bootstrap-servers bootstrap-servers}) + producer-settings (producer/producer-settings-from-actor-system-config actor-system + {:producer-config-key "akka.kafka.producer" + :key-serializer (StringSerializer.) + :value-serializer (StringSerializer.) + :bootstrap-servers bootstrap-servers}) processing-fn (fn [message] (str/upper-case message)) consumer-control (kafka-stream-with-producer actor-system consumer-settings committer-settings producer-settings [in-topic] out-topic processing-fn) in-messages [{:key "key-1" :value "msg-1"} {:key "key-2" :value "msg-2"} {:key "key-3" :value "msg-3"}] expected-out [{:key nil :value "MSG-1"} {:key nil :value "MSG-2"} {:key nil :value "MSG-3"}]] (try (Thread/sleep 1000) - (run! (fn [{:keys [key value]}] (ftu/send-record bootstrap-servers in-topic key value)) in-messages) + (run! (fn [{:keys [key value]}] (tu/send-record bootstrap-servers in-topic key value)) in-messages) (Thread/sleep 1000) - (is (= expected-out (ftu/read-records bootstrap-servers out-topic 1000))) + (is (= expected-out (tu/read-records bootstrap-servers out-topic 1000))) (finally @(consumer/drain-and-shutdown consumer-control diff --git a/test/fr33m0nk/kafka_stream_with_transactional_source_and_sink.clj b/test/fr33m0nk/kafka_stream_with_transactional_source_and_sink.clj index fef1539..d3abf66 100644 --- a/test/fr33m0nk/kafka_stream_with_transactional_source_and_sink.clj +++ b/test/fr33m0nk/kafka_stream_with_transactional_source_and_sink.clj @@ -7,12 +7,12 @@ [fr33m0nk.alpakka-kafka.producer :as producer] [fr33m0nk.akka.restart-source :as restart] [fr33m0nk.utils :as utils] - [fr33m0nk.test-utils :as ftu] + [fr33m0nk.test-utils :as tu] [fr33m0nk.alpakka-kafka.transactional :as transactional]) (:import (java.util.concurrent CompletableFuture) (org.apache.kafka.common.serialization StringSerializer StringDeserializer))) -(defn kafka-stream-with-transactional-source-and-sink +(defn transactional-kafka-stream-with-error-handling [actor-system restart-settings consumer-settings producer-settings consumer-topics producer-topic restart-count processing-fn] (let [consumer-control-reference (atom consumer/create-noop-control)] (-> (restart/->restart-source-on-failures-with-backoff @@ -34,20 +34,22 @@ (s/run-with s/ignoring-sink actor-system)) consumer-control-reference)) -(deftest kafka-stream-with-transactional-source-and-sink-test +(deftest transactional-kafka-stream-with-error-handling-test (testing "kafka stream with transactional source and sink test" - (ftu/with-kafka-test-container + (tu/with-kafka-test-container in-topic out-topic bootstrap-servers (let [actor-system (actor/->actor-system "test-actor-system") - consumer-settings (consumer/consumer-settings actor-system - {:group-id "alpakka-consumer" - :bootstrap-servers bootstrap-servers - :key-deserializer (StringDeserializer.) - :value-deserializer (StringDeserializer.) - :enable-auto-commit "false"}) - producer-settings (producer/producer-settings actor-system {:bootstrap-servers bootstrap-servers - :key-serializer (StringSerializer.) - :value-serializer (StringSerializer.)}) + consumer-settings (consumer/consumer-settings-from-actor-system-config actor-system + {:consumer-config-key "akka.kafka.consumer" + :group-id "alpakka-consumer-group-1" + :key-deserializer (StringDeserializer.) + :value-deserializer (StringDeserializer.) + :bootstrap-servers bootstrap-servers}) + producer-settings (producer/producer-settings-from-actor-system-config actor-system + {:producer-config-key "akka.kafka.producer" + :key-serializer (StringSerializer.) + :value-serializer (StringSerializer.) + :bootstrap-servers bootstrap-servers}) restart-settings (restart/restart-settings 1000 5000 0.2 {}) processing-fn (fn [producer-topic message] (->> (repeat 3 message) @@ -60,9 +62,9 @@ (into [] (comp (map #(repeat 3 %)) cat)))] (try (Thread/sleep 1000) - (run! (fn [{:keys [key value]}] (ftu/send-record bootstrap-servers in-topic key value)) in-messages) + (run! (fn [{:keys [key value]}] (tu/send-record bootstrap-servers in-topic key value)) in-messages) - (is (= expected-out (ftu/read-records bootstrap-servers out-topic 1000))) + (is (= expected-out (tu/read-records bootstrap-servers out-topic 1000))) (finally @(consumer/drain-and-shutdown @consumer-control-reference