Skip to content

Commit

Permalink
[fr33m0nk]: Refactoring of all tests
Browse files Browse the repository at this point in the history
  • Loading branch information
fr33m0nk committed Feb 23, 2024
1 parent cf4c974 commit 570c75b
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
[fr33m0nk.alpakka-kafka.consumer :as consumer]
[fr33m0nk.alpakka-kafka.producer :as producer]
[fr33m0nk.utils :as utils]
[fr33m0nk.test-utils :as ftu])
[fr33m0nk.test-utils :as tu])
(:import (java.util.concurrent CompletableFuture)
(org.apache.kafka.common.serialization StringSerializer StringDeserializer)))

Expand All @@ -26,17 +26,21 @@

(deftest kafka-stream-with-producer-test
(testing "kafka stream producing multiple messages per consumed message test"
(ftu/with-kafka-test-container in-topic out-topic bootstrap-servers
(tu/with-kafka-test-container
in-topic out-topic bootstrap-servers
(let [actor-system (actor/->actor-system "test-actor-system")
committer-settings (committer/committer-settings actor-system {:batch-size 2})
consumer-settings (consumer/consumer-settings actor-system
{:group-id "alpakka-consumer"
:bootstrap-servers bootstrap-servers
:key-deserializer (StringDeserializer.)
:value-deserializer (StringDeserializer.)})
producer-settings (producer/producer-settings actor-system {:bootstrap-servers bootstrap-servers
:key-serializer (StringSerializer.)
:value-serializer (StringSerializer.)})
consumer-settings (consumer/consumer-settings-from-actor-system-config actor-system
{:consumer-config-key "akka.kafka.consumer"
:group-id "alpakka-consumer-group-1"
:key-deserializer (StringDeserializer.)
:value-deserializer (StringDeserializer.)
:bootstrap-servers bootstrap-servers})
producer-settings (producer/producer-settings-from-actor-system-config actor-system
{:producer-config-key "akka.kafka.producer"
:key-serializer (StringSerializer.)
:value-serializer (StringSerializer.)
:bootstrap-servers bootstrap-servers})
processing-fn (fn [producer-topic message]
(->> (repeat 5 message)
(mapv #(producer/->producer-record producer-topic (str/upper-case %)))))
Expand All @@ -47,10 +51,10 @@
(into [] (comp (map #(repeat 5 %)) cat)))]
(try
(Thread/sleep 1000)
(run! (fn [{:keys [key value]}] (ftu/send-record bootstrap-servers in-topic key value)) in-messages)
(run! (fn [{:keys [key value]}] (tu/send-record bootstrap-servers in-topic key value)) in-messages)
(Thread/sleep 1000)

(is (= expected-out (ftu/read-records bootstrap-servers out-topic 1000)))
(is (= expected-out (tu/read-records bootstrap-servers out-topic 1000)))

(finally
@(consumer/drain-and-shutdown consumer-control
Expand Down
18 changes: 10 additions & 8 deletions test/fr33m0nk/kafka_stream_sink_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
[fr33m0nk.alpakka-kafka.committer :as committer]
[fr33m0nk.alpakka-kafka.consumer :as consumer]
[fr33m0nk.utils :as utils]
[fr33m0nk.test-utils :as ftu])
[fr33m0nk.test-utils :as tu])
(:import (java.util.concurrent CompletableFuture)
(org.apache.kafka.common.serialization StringDeserializer)))

Expand All @@ -26,14 +26,16 @@

(deftest kafka-stream-sink-only-test
(testing "kafka stream sink test"
(ftu/with-kafka-test-container in-topic out-topic bootstrap-servers
(tu/with-kafka-test-container
in-topic out-topic bootstrap-servers
(let [actor-system (actor/->actor-system "test-actor-system")
committer-settings (committer/committer-settings actor-system {:batch-size 2})
consumer-settings (consumer/consumer-settings actor-system
{:group-id "alpakka-consumer"
:bootstrap-servers bootstrap-servers
:key-deserializer (StringDeserializer.)
:value-deserializer (StringDeserializer.)})
consumer-settings (consumer/consumer-settings-from-actor-system-config actor-system
{:consumer-config-key "akka.kafka.consumer"
:group-id "alpakka-consumer-group-1"
:key-deserializer (StringDeserializer.)
:value-deserializer (StringDeserializer.)
:bootstrap-servers bootstrap-servers})
actual-messages (atom [])
processing-fn (fn [message]
(let [key (consumer/key message)
Expand All @@ -44,7 +46,7 @@
expected [{:key "key-1" :value "msg-1"} {:key "key-2" :value "msg-2"} {:key "key-3" :value "msg-3"}]]
(try
(Thread/sleep 1000)
(run! (fn [{:keys [key value]}] (ftu/send-record bootstrap-servers in-topic key value)) expected)
(run! (fn [{:keys [key value]}] (tu/send-record bootstrap-servers in-topic key value)) expected)
(Thread/sleep 1000)
(is (= expected @actual-messages))
(finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
[fr33m0nk.alpakka-kafka.consumer :as consumer]
[fr33m0nk.alpakka-kafka.producer :as producer]
[fr33m0nk.utils :as utils]
[fr33m0nk.test-utils :as ftu])
[fr33m0nk.test-utils :as tu])
(:import (java.util.concurrent CompletableFuture)
(org.apache.kafka.common.serialization StringSerializer StringDeserializer)))

Expand All @@ -28,17 +28,21 @@

(deftest kafka-stream-producing-multiple-messages-with-at-least-once-semantics-test
(testing "kafka stream producing multiple messages with at least once semantics test"
(ftu/with-kafka-test-container in-topic out-topic bootstrap-servers
(tu/with-kafka-test-container
in-topic out-topic bootstrap-servers
(let [actor-system (actor/->actor-system "test-actor-system")
committer-settings (committer/committer-settings actor-system {:batch-size 2})
consumer-settings (consumer/consumer-settings actor-system
{:group-id "alpakka-consumer"
:bootstrap-servers bootstrap-servers
:key-deserializer (StringDeserializer.)
:value-deserializer (StringDeserializer.)})
producer-settings (producer/producer-settings actor-system {:bootstrap-servers bootstrap-servers
:key-serializer (StringSerializer.)
:value-serializer (StringSerializer.)})
consumer-settings (consumer/consumer-settings-from-actor-system-config actor-system
{:consumer-config-key "akka.kafka.consumer"
:group-id "alpakka-consumer-group-1"
:key-deserializer (StringDeserializer.)
:value-deserializer (StringDeserializer.)
:bootstrap-servers bootstrap-servers})
producer-settings (producer/producer-settings-from-actor-system-config actor-system
{:producer-config-key "akka.kafka.producer"
:key-serializer (StringSerializer.)
:value-serializer (StringSerializer.)
:bootstrap-servers bootstrap-servers})
processing-fn (fn [producer-topic message]
(->> (repeat 3 message)
(mapv #(producer/->producer-record producer-topic (str/upper-case %)))))
Expand All @@ -49,10 +53,10 @@
(into [] (comp (map #(repeat 3 %)) cat)))]
(try
(Thread/sleep 1000)
(run! (fn [{:keys [key value]}] (ftu/send-record bootstrap-servers in-topic key value)) in-messages)
(run! (fn [{:keys [key value]}] (tu/send-record bootstrap-servers in-topic key value)) in-messages)
(Thread/sleep 1000)

(is (= expected-out (ftu/read-records bootstrap-servers out-topic 1000)))
(is (= expected-out (tu/read-records bootstrap-servers out-topic 1000)))

(finally
@(consumer/drain-and-shutdown consumer-control
Expand Down
35 changes: 20 additions & 15 deletions test/fr33m0nk/kafka_stream_with_error_handling.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
[fr33m0nk.alpakka-kafka.producer :as producer]
[fr33m0nk.akka.restart-source :as restart]
[fr33m0nk.utils :as utils]
[fr33m0nk.test-utils :as ftu])
[fr33m0nk.test-utils :as tu])
(:import (java.util.concurrent CompletableFuture)
(org.apache.kafka.common.serialization StringSerializer StringDeserializer)))

(defn kafka-stream-with-error-handling
[actor-system restart-settings consumer-settings committer-settings producer-settings consumer-topics producer-topic restart-count processing-fn]
(let [restart-counter (atom 1)
consumer-control-reference (atom consumer/create-noop-control)]
consumer-control-reference (atom consumer/create-noop-control)
recovered-from-crashes (promise)]
(-> (restart/->restart-source-on-failures-with-backoff
restart-settings
(fn []
Expand All @@ -28,6 +29,7 @@
;; throwing exception to simulate processing failure and stream-restart and incrementing restart-counter
(when (<= @restart-counter restart-count)
(throw (ex-info (str "Simulating processing failure - " @restart-counter) {:error-count (swap! restart-counter inc)})))
(deliver recovered-from-crashes true)
(let [_key (consumer/key message)
value (consumer/value message)
committable-offset (consumer/committable-offset message)
Expand All @@ -37,40 +39,43 @@
(s/map producer/producer-message-passthrough)
(s/via (committer/flow committer-settings)))))
(s/run-with s/ignoring-sink actor-system))
consumer-control-reference))
[consumer-control-reference recovered-from-crashes]))

(deftest kafka-stream-producing-multiple-messages-with-at-least-once-semantics-test
(testing "kafka stream with error handling test"
(ftu/with-kafka-test-container
(tu/with-kafka-test-container
in-topic out-topic bootstrap-servers
(let [in-messages [{:key "key-1" :value "msg-1"} {:key "key-2" :value "msg-2"} {:key "key-3" :value "msg-3"}]
_ (run! (fn [{:keys [key value]}] (ftu/send-record bootstrap-servers in-topic key value)) in-messages)
_ (run! (fn [{:keys [key value]}] (tu/send-record bootstrap-servers in-topic key value)) in-messages)
actor-system (actor/->actor-system "test-actor-system")
committer-settings (committer/committer-settings actor-system {:batch-size 2})
consumer-settings (consumer/consumer-settings-from-actor-system-config actor-system
{:consumer-config-key "akka.kafka.consumer"
:group-id "alpakka-consumer-group-1"
:key-deserializer (StringDeserializer.)
:value-deserializer (StringDeserializer.)
:bootstrap-servers bootstrap-servers})
:group-id "alpakka-consumer-group-1"
:key-deserializer (StringDeserializer.)
:value-deserializer (StringDeserializer.)
:bootstrap-servers bootstrap-servers})
producer-settings (producer/producer-settings-from-actor-system-config actor-system
{:producer-config-key "akka.kafka.producer"
:key-serializer (StringSerializer.)
:value-serializer (StringSerializer.)
:bootstrap-servers bootstrap-servers})
restart-settings (restart/restart-settings 100 200 0.1 {})
restart-settings (restart/restart-settings 1000 1000 0.1 {})
processing-fn (fn [producer-topic message]
(->> (repeat 3 message)
(mapv #(producer/->producer-record producer-topic (str/upper-case %)))))
consumer-control-reference (kafka-stream-with-error-handling actor-system restart-settings consumer-settings
committer-settings producer-settings
[in-topic] out-topic 1 processing-fn)
[consumer-control-reference recovered-from-crashes]
(kafka-stream-with-error-handling actor-system restart-settings consumer-settings
committer-settings producer-settings
[in-topic] out-topic 1 processing-fn)

expected-out (->> [{:key nil :value "MSG-1"} {:key nil :value "MSG-2"} {:key nil :value "MSG-3"}]
(into [] (comp (map #(repeat 3 %)) cat)))]
(try
(Thread/sleep 300000)
(is (= expected-out (ftu/read-records bootstrap-servers out-topic 240000)))
(loop []
(when-not (realized? recovered-from-crashes)
(recur)))
(is (= expected-out (tu/read-records bootstrap-servers out-topic 240000)))

(finally
@(consumer/drain-and-shutdown @consumer-control-reference
Expand Down
28 changes: 16 additions & 12 deletions test/fr33m0nk/kafka_stream_with_producer_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
[fr33m0nk.alpakka-kafka.consumer :as consumer]
[fr33m0nk.alpakka-kafka.producer :as producer]
[fr33m0nk.utils :as utils]
[fr33m0nk.test-utils :as ftu])
[fr33m0nk.test-utils :as tu])
(:import (java.util.concurrent CompletableFuture)
(org.apache.kafka.common.serialization StringSerializer StringDeserializer)))

Expand All @@ -26,27 +26,31 @@

(deftest kafka-stream-with-producer-test
(testing "kafka stream with producer test"
(ftu/with-kafka-test-container in-topic out-topic bootstrap-servers
(tu/with-kafka-test-container
in-topic out-topic bootstrap-servers
(let [actor-system (actor/->actor-system "test-actor-system")
committer-settings (committer/committer-settings actor-system {:batch-size 2})
consumer-settings (consumer/consumer-settings actor-system
{:group-id "alpakka-consumer"
:bootstrap-servers bootstrap-servers
:key-deserializer (StringDeserializer.)
:value-deserializer (StringDeserializer.)})
producer-settings (producer/producer-settings actor-system {:bootstrap-servers bootstrap-servers
:key-serializer (StringSerializer.)
:value-serializer (StringSerializer.)})
consumer-settings (consumer/consumer-settings-from-actor-system-config actor-system
{:consumer-config-key "akka.kafka.consumer"
:group-id "alpakka-consumer-group-1"
:key-deserializer (StringDeserializer.)
:value-deserializer (StringDeserializer.)
:bootstrap-servers bootstrap-servers})
producer-settings (producer/producer-settings-from-actor-system-config actor-system
{:producer-config-key "akka.kafka.producer"
:key-serializer (StringSerializer.)
:value-serializer (StringSerializer.)
:bootstrap-servers bootstrap-servers})
processing-fn (fn [message] (str/upper-case message))
consumer-control (kafka-stream-with-producer actor-system consumer-settings committer-settings producer-settings [in-topic] out-topic processing-fn)
in-messages [{:key "key-1" :value "msg-1"} {:key "key-2" :value "msg-2"} {:key "key-3" :value "msg-3"}]
expected-out [{:key nil :value "MSG-1"} {:key nil :value "MSG-2"} {:key nil :value "MSG-3"}]]
(try
(Thread/sleep 1000)
(run! (fn [{:keys [key value]}] (ftu/send-record bootstrap-servers in-topic key value)) in-messages)
(run! (fn [{:keys [key value]}] (tu/send-record bootstrap-servers in-topic key value)) in-messages)
(Thread/sleep 1000)

(is (= expected-out (ftu/read-records bootstrap-servers out-topic 1000)))
(is (= expected-out (tu/read-records bootstrap-servers out-topic 1000)))

(finally
@(consumer/drain-and-shutdown consumer-control
Expand Down
Loading

0 comments on commit 570c75b

Please # to comment.