diff --git a/CHANGELOG.md b/CHANGELOG.md
index bfa9f155..069f8022 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 10.7.0
+  - Switched use from Faraday to Manticore as HTTP client library to access Schema Registry service
+    to fix issue [#63](https://github.com/logstash-plugins/logstash-integration-kafka/issue/63)
+
 ## 10.6.0
   - Added functionality to Kafka input to use Avro deserializer in retrieving data from Kafka. The schema is retrieved
     from an instance of Confluent's Schema Registry service [#51](https://github.com/logstash-plugins/logstash-integration-kafka/pull/51)
diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb
index 5738806e..bebb8809 100644
--- a/lib/logstash/inputs/kafka.rb
+++ b/lib/logstash/inputs/kafka.rb
@@ -4,7 +4,7 @@
 require 'java'
 require 'logstash-integration-kafka_jars.rb'
 require 'logstash/plugin_mixins/kafka_support'
-require "faraday"
+require 'manticore'
 require "json"
 require "logstash/json"
 require_relative '../plugin_mixins/common'
diff --git a/lib/logstash/plugin_mixins/common.rb b/lib/logstash/plugin_mixins/common.rb
index e6b18a51..a2bff722 100644
--- a/lib/logstash/plugin_mixins/common.rb
+++ b/lib/logstash/plugin_mixins/common.rb
@@ -45,20 +45,21 @@ def check_for_schema_registry_conflicts
 
       private
       def check_for_schema_registry_connectivity_and_subjects
-        client = Faraday.new(@schema_registry_url.to_s) do |conn|
-          if schema_registry_proxy && !schema_registry_proxy.empty?
-            conn.proxy = schema_registry_proxy.to_s
-          end
-          if schema_registry_key and !schema_registry_key.empty?
-            conn.basic_auth(schema_registry_key, schema_registry_secret.value)
-          end
+        options = {}
+        if schema_registry_proxy && !schema_registry_proxy.empty?
+          options[:proxy] = schema_registry_proxy.to_s
         end
+        if schema_registry_key and !schema_registry_key.empty?
+          options[:auth] = {:user => schema_registry_key, :password => schema_registry_secret.value}
+        end
+        client = Manticore::Client.new(options)
+
         begin
-          response = client.get('/subjects')
-        rescue Faraday::Error => e
+          response = client.get(@schema_registry_url.to_s + '/subjects').body
+        rescue Manticore::ManticoreException => e
           raise LogStash::ConfigurationError.new("Schema registry service doesn't respond, error: #{e.message}")
         end
-        registered_subjects = JSON.parse response.body
+        registered_subjects = JSON.parse response
         expected_subjects = @topics.map { |t| "#{t}-value"}
         if (expected_subjects & registered_subjects).size != expected_subjects.size
           undefined_topic_subjects = expected_subjects - registered_subjects
diff --git a/logstash-integration-kafka.gemspec b/logstash-integration-kafka.gemspec
index ebba03df..03c25e05 100644
--- a/logstash-integration-kafka.gemspec
+++ b/logstash-integration-kafka.gemspec
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name = 'logstash-integration-kafka'
-  s.version = '10.6.0'
+  s.version = '10.7.0'
   s.licenses = ['Apache-2.0']
   s.summary = "Integration with Kafka - input and output plugins"
   s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+
@@ -46,6 +46,7 @@ Gem::Specification.new do |s|
   s.add_runtime_dependency 'logstash-codec-json'
   s.add_runtime_dependency 'logstash-codec-plain'
   s.add_runtime_dependency 'stud', '>= 0.0.22', '< 0.1.0'
+  s.add_runtime_dependency "manticore", '>= 0.5.4', '< 1.0.0'
 
   s.add_development_dependency 'logstash-devutils'
   s.add_development_dependency 'rspec-wait'
diff --git a/spec/integration/inputs/kafka_spec.rb b/spec/integration/inputs/kafka_spec.rb
index fd653eb2..41924762 100644
--- a/spec/integration/inputs/kafka_spec.rb
+++ b/spec/integration/inputs/kafka_spec.rb
@@ -3,7 +3,7 @@
 require "logstash/inputs/kafka"
 require "rspec/wait"
 require "stud/try"
-require "faraday"
+require "manticore"
 require "json"
 
 # Please run kafka_test_setup.sh prior to executing this integration test.
@@ -164,11 +164,11 @@ def consume_messages(config, queue: Queue.new, timeout:, event_count:)
 
     before(:each) do
       response = save_avro_schema_to_schema_registry(File.join(Dir.pwd, "spec", "unit", "inputs", "avro_schema_fixture_payment.asvc"), SUBJECT_NAME)
-      expect( response.status ).to be(200)
+      expect( response.code ).to be(200)
     end
 
     after(:each) do
-      schema_registry_client = Faraday.new('http://localhost:8081')
+      schema_registry_client = Manticore::Client.new
       delete_remote_schema(schema_registry_client, SUBJECT_NAME)
     end
 
@@ -187,26 +187,26 @@ def consume_messages(config, queue: Queue.new, timeout:, event_count:)
 def save_avro_schema_to_schema_registry(schema_file, subject_name)
   raw_schema = File.readlines(schema_file).map(&:chomp).join
   raw_schema_quoted = raw_schema.gsub('"', '\"')
-  response = Faraday.post("http://localhost:8081/subjects/#{subject_name}/versions",
-        '{"schema": "' + raw_schema_quoted + '"}',
-        "Content-Type" => "application/vnd.schemaregistry.v1+json")
+  response = Manticore.post("http://localhost:8081/subjects/#{subject_name}/versions",
+        body: '{"schema": "' + raw_schema_quoted + '"}',
+        headers: {"Content-Type" => "application/vnd.schemaregistry.v1+json"})
   response
 end
 
 def delete_remote_schema(schema_registry_client, subject_name)
-  expect(schema_registry_client.delete("/subjects/#{subject_name}").status ).to be(200)
-  expect(schema_registry_client.delete("/subjects/#{subject_name}?permanent=true").status ).to be(200)
+  expect(schema_registry_client.delete("http://localhost:8081/subjects/#{subject_name}").code ).to be(200)
+  expect(schema_registry_client.delete("http://localhost:8081/subjects/#{subject_name}?permanent=true").code ).to be(200)
 end
 
 # AdminClientConfig = org.alpache.kafka.clients.admin.AdminClientConfig
 describe "Schema registry API", :integration => true do
 
-  let(:schema_registry) { Faraday.new('http://localhost:8081') }
+  let(:schema_registry) { Manticore::Client.new }
 
   context 'listing subject on clean instance' do
     it "should return an empty set" do
-      subjects = JSON.parse schema_registry.get('/subjects').body
+      subjects = JSON.parse schema_registry.get('http://localhost:8081/subjects').body
       expect( subjects ).to be_empty
     end
   end
@@ -222,18 +222,18 @@ def delete_remote_schema(schema_registry_client, subject_name)
       response = save_avro_schema_to_schema_registry(File.join(Dir.pwd, "spec", "unit", "inputs", "avro_schema_fixture_payment.asvc"), "schema_test_1")
       expect( response.status ).to be(200)
 
-      expect( schema_registry.delete('/subjects/schema_test_1?permanent=false').status ).to be(200)
+      expect( schema_registry.delete('http://localhost:8081/subjects/schema_test_1?permanent=false').status ).to be(200)
       sleep(1)
-      subjects = JSON.parse schema_registry.get('/subjects').body
+      subjects = JSON.parse schema_registry.get('http://localhost:8081/subjects').body
       expect( subjects ).to be_empty
     end
   end
 
   context 'use the schema to serialize' do
     after(:each) do
-      expect( schema_registry.delete('/subjects/topic_avro-value').status ).to be(200)
+      expect( schema_registry.delete('http://localhost:8081/subjects/topic_avro-value').status ).to be(200)
       sleep 1
-      expect( schema_registry.delete('/subjects/topic_avro-value?permanent=true').status ).to be(200)
+      expect( schema_registry.delete('http://localhost:8081/subjects/topic_avro-value?permanent=true').status ).to be(200)
 
       Stud.try(3.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
         wait(10).for do
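
For reviewers who want to exercise the new client outside the plugin, below is a minimal standalone Ruby sketch of the Manticore-based subjects check that this patch introduces in common.rb. The registry URL, credentials, and topic list are illustrative assumptions, not values taken from the patch.

# Standalone sketch of the Manticore-based Schema Registry check (assumed values,
# not plugin configuration); it mirrors the logic added to
# check_for_schema_registry_connectivity_and_subjects above.
require 'json'
require 'manticore'

schema_registry_url = 'http://localhost:8081'   # assumption: local test registry
topics              = ['topic_avro']            # assumption: topics whose subjects we validate

options = {}
# options[:proxy] = 'http://proxy.local:3128'                       # optional proxy, as in the mixin
# options[:auth]  = { :user => 'some_key', :password => 'secret' }  # optional basic auth, as in the mixin
client = Manticore::Client.new(options)

begin
  # Manticore returns a response object; .body reads the payload as a string
  body = client.get(schema_registry_url + '/subjects').body
rescue Manticore::ManticoreException => e
  raise "Schema registry service doesn't respond, error: #{e.message}"
end

registered_subjects = JSON.parse(body)
expected_subjects   = topics.map { |t| "#{t}-value" }
missing = expected_subjects - registered_subjects
puts missing.empty? ? 'all expected subjects are registered' : "missing subjects: #{missing.inspect}"

As in the patch, failures surface as Manticore::ManticoreException instead of Faraday::Error, and each request is given the full registry URL because the Manticore client is not bound to a base URL here.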