From b683488779226e953a57f96c44d29b6179df80a9 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 2 Dec 2020 14:41:29 +0100 Subject: [PATCH] Switched use of HTTP client form Faraday to Manticore in plugin code. Due to to some problems in configuring Faraday client (issue #63), where once the connection is customized it also need to reassing the adapter. This change something at framework level, and while on spec tests it worked smoothly in production revealed some problem. Fixes #63 --- CHANGELOG.md | 4 ++++ lib/logstash/inputs/kafka.rb | 2 +- lib/logstash/plugin_mixins/common.rb | 21 ++++++++++---------- logstash-integration-kafka.gemspec | 3 ++- spec/integration/inputs/kafka_spec.rb | 28 +++++++++++++-------------- 5 files changed, 32 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bfa9f155..069f8022 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 10.7.0 + - Switched use from Faraday to Manticore as HTTP client library to access Schema Registry service + to fix issue [#63](https://github.com/logstash-plugins/logstash-integration-kafka/issue/63) + ## 10.6.0 - Added functionality to Kafka input to use Avro deserializer in retrieving data from Kafka. The schema is retrieved from an instance of Confluent's Schema Registry service [#51](https://github.com/logstash-plugins/logstash-integration-kafka/pull/51) diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index 5738806e..bebb8809 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -4,7 +4,7 @@ require 'java' require 'logstash-integration-kafka_jars.rb' require 'logstash/plugin_mixins/kafka_support' -require "faraday" +require 'manticore' require "json" require "logstash/json" require_relative '../plugin_mixins/common' diff --git a/lib/logstash/plugin_mixins/common.rb b/lib/logstash/plugin_mixins/common.rb index e6b18a51..a2bff722 100644 --- a/lib/logstash/plugin_mixins/common.rb +++ b/lib/logstash/plugin_mixins/common.rb @@ -45,20 +45,21 @@ def check_for_schema_registry_conflicts private def check_for_schema_registry_connectivity_and_subjects - client = Faraday.new(@schema_registry_url.to_s) do |conn| - if schema_registry_proxy && !schema_registry_proxy.empty? - conn.proxy = schema_registry_proxy.to_s - end - if schema_registry_key and !schema_registry_key.empty? - conn.basic_auth(schema_registry_key, schema_registry_secret.value) - end + options = {} + if schema_registry_proxy && !schema_registry_proxy.empty? + options[:proxy] = schema_registry_proxy.to_s end + if schema_registry_key and !schema_registry_key.empty? + options[:auth] = {:user => schema_registry_key, :password => schema_registry_secret.value} + end + client = Manticore::Client.new(options) + begin - response = client.get('/subjects') - rescue Faraday::Error => e + response = client.get(@schema_registry_url.to_s + '/subjects').body + rescue Manticore::ManticoreException => e raise LogStash::ConfigurationError.new("Schema registry service doesn't respond, error: #{e.message}") end - registered_subjects = JSON.parse response.body + registered_subjects = JSON.parse response expected_subjects = @topics.map { |t| "#{t}-value"} if (expected_subjects & registered_subjects).size != expected_subjects.size undefined_topic_subjects = expected_subjects - registered_subjects diff --git a/logstash-integration-kafka.gemspec b/logstash-integration-kafka.gemspec index ebba03df..03c25e05 100644 --- a/logstash-integration-kafka.gemspec +++ b/logstash-integration-kafka.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-integration-kafka' - s.version = '10.6.0' + s.version = '10.7.0' s.licenses = ['Apache-2.0'] s.summary = "Integration with Kafka - input and output plugins" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+ @@ -46,6 +46,7 @@ Gem::Specification.new do |s| s.add_runtime_dependency 'logstash-codec-json' s.add_runtime_dependency 'logstash-codec-plain' s.add_runtime_dependency 'stud', '>= 0.0.22', '< 0.1.0' + s.add_runtime_dependency "manticore", '>= 0.5.4', '< 1.0.0' s.add_development_dependency 'logstash-devutils' s.add_development_dependency 'rspec-wait' diff --git a/spec/integration/inputs/kafka_spec.rb b/spec/integration/inputs/kafka_spec.rb index fd653eb2..41924762 100644 --- a/spec/integration/inputs/kafka_spec.rb +++ b/spec/integration/inputs/kafka_spec.rb @@ -3,7 +3,7 @@ require "logstash/inputs/kafka" require "rspec/wait" require "stud/try" -require "faraday" +require "manticore" require "json" # Please run kafka_test_setup.sh prior to executing this integration test. @@ -164,11 +164,11 @@ def consume_messages(config, queue: Queue.new, timeout:, event_count:) before(:each) do response = save_avro_schema_to_schema_registry(File.join(Dir.pwd, "spec", "unit", "inputs", "avro_schema_fixture_payment.asvc"), SUBJECT_NAME) - expect( response.status ).to be(200) + expect( response.code ).to be(200) end after(:each) do - schema_registry_client = Faraday.new('http://localhost:8081') + schema_registry_client = Manticore::Client.new delete_remote_schema(schema_registry_client, SUBJECT_NAME) end @@ -187,26 +187,26 @@ def consume_messages(config, queue: Queue.new, timeout:, event_count:) def save_avro_schema_to_schema_registry(schema_file, subject_name) raw_schema = File.readlines(schema_file).map(&:chomp).join raw_schema_quoted = raw_schema.gsub('"', '\"') - response = Faraday.post("http://localhost:8081/subjects/#{subject_name}/versions", - '{"schema": "' + raw_schema_quoted + '"}', - "Content-Type" => "application/vnd.schemaregistry.v1+json") + response = Manticore.post("http://localhost:8081/subjects/#{subject_name}/versions", + body: '{"schema": "' + raw_schema_quoted + '"}', + headers: {"Content-Type" => "application/vnd.schemaregistry.v1+json"}) response end def delete_remote_schema(schema_registry_client, subject_name) - expect(schema_registry_client.delete("/subjects/#{subject_name}").status ).to be(200) - expect(schema_registry_client.delete("/subjects/#{subject_name}?permanent=true").status ).to be(200) + expect(schema_registry_client.delete("http://localhost:8081/subjects/#{subject_name}").code ).to be(200) + expect(schema_registry_client.delete("http://localhost:8081/subjects/#{subject_name}?permanent=true").code ).to be(200) end # AdminClientConfig = org.alpache.kafka.clients.admin.AdminClientConfig describe "Schema registry API", :integration => true do - let(:schema_registry) { Faraday.new('http://localhost:8081') } + let(:schema_registry) { Manticore::Client.new } context 'listing subject on clean instance' do it "should return an empty set" do - subjects = JSON.parse schema_registry.get('/subjects').body + subjects = JSON.parse schema_registry.get('http://localhost:8081/subjects').body expect( subjects ).to be_empty end end @@ -222,18 +222,18 @@ def delete_remote_schema(schema_registry_client, subject_name) response = save_avro_schema_to_schema_registry(File.join(Dir.pwd, "spec", "unit", "inputs", "avro_schema_fixture_payment.asvc"), "schema_test_1") expect( response.status ).to be(200) - expect( schema_registry.delete('/subjects/schema_test_1?permanent=false').status ).to be(200) + expect( schema_registry.delete('http://localhost:8081/subjects/schema_test_1?permanent=false').status ).to be(200) sleep(1) - subjects = JSON.parse schema_registry.get('/subjects').body + subjects = JSON.parse schema_registry.get('http://localhost:8081/subjects').body expect( subjects ).to be_empty end end context 'use the schema to serialize' do after(:each) do - expect( schema_registry.delete('/subjects/topic_avro-value').status ).to be(200) + expect( schema_registry.delete('http://localhost:8081/subjects/topic_avro-value').status ).to be(200) sleep 1 - expect( schema_registry.delete('/subjects/topic_avro-value?permanent=true').status ).to be(200) + expect( schema_registry.delete('http://localhost:8081/subjects/topic_avro-value?permanent=true').status ).to be(200) Stud.try(3.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do wait(10).for do