Skip to content

Commit

Permalink
Switched use of HTTP client form Faraday to Manticore in plugin code.
Browse files Browse the repository at this point in the history
Due to to some problems in configuring Faraday client (issue logstash-plugins#63), where once the connection is customized it also need to reassing the adapter.
This change something at framework level, and while on spec tests it worked smoothly in production revealed some problem.

Fixes logstash-plugins#63
  • Loading branch information
andsel committed Dec 2, 2020
1 parent c0b6bef commit b683488
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 26 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 10.7.0
- Switched use from Faraday to Manticore as HTTP client library to access Schema Registry service
to fix issue [#63](https://github.com/logstash-plugins/logstash-integration-kafka/issue/63)

## 10.6.0
- Added functionality to Kafka input to use Avro deserializer in retrieving data from Kafka. The schema is retrieved
from an instance of Confluent's Schema Registry service [#51](https://github.com/logstash-plugins/logstash-integration-kafka/pull/51)
Expand Down
2 changes: 1 addition & 1 deletion lib/logstash/inputs/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
require 'java'
require 'logstash-integration-kafka_jars.rb'
require 'logstash/plugin_mixins/kafka_support'
require "faraday"
require 'manticore'
require "json"
require "logstash/json"
require_relative '../plugin_mixins/common'
Expand Down
21 changes: 11 additions & 10 deletions lib/logstash/plugin_mixins/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,21 @@ def check_for_schema_registry_conflicts

private
def check_for_schema_registry_connectivity_and_subjects
client = Faraday.new(@schema_registry_url.to_s) do |conn|
if schema_registry_proxy && !schema_registry_proxy.empty?
conn.proxy = schema_registry_proxy.to_s
end
if schema_registry_key and !schema_registry_key.empty?
conn.basic_auth(schema_registry_key, schema_registry_secret.value)
end
options = {}
if schema_registry_proxy && !schema_registry_proxy.empty?
options[:proxy] = schema_registry_proxy.to_s
end
if schema_registry_key and !schema_registry_key.empty?
options[:auth] = {:user => schema_registry_key, :password => schema_registry_secret.value}
end
client = Manticore::Client.new(options)

begin
response = client.get('/subjects')
rescue Faraday::Error => e
response = client.get(@schema_registry_url.to_s + '/subjects').body
rescue Manticore::ManticoreException => e
raise LogStash::ConfigurationError.new("Schema registry service doesn't respond, error: #{e.message}")
end
registered_subjects = JSON.parse response.body
registered_subjects = JSON.parse response
expected_subjects = @topics.map { |t| "#{t}-value"}
if (expected_subjects & registered_subjects).size != expected_subjects.size
undefined_topic_subjects = expected_subjects - registered_subjects
Expand Down
3 changes: 2 additions & 1 deletion logstash-integration-kafka.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-integration-kafka'
s.version = '10.6.0'
s.version = '10.7.0'
s.licenses = ['Apache-2.0']
s.summary = "Integration with Kafka - input and output plugins"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+
Expand Down Expand Up @@ -46,6 +46,7 @@ Gem::Specification.new do |s|
s.add_runtime_dependency 'logstash-codec-json'
s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency 'stud', '>= 0.0.22', '< 0.1.0'
s.add_runtime_dependency "manticore", '>= 0.5.4', '< 1.0.0'

s.add_development_dependency 'logstash-devutils'
s.add_development_dependency 'rspec-wait'
Expand Down
28 changes: 14 additions & 14 deletions spec/integration/inputs/kafka_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
require "logstash/inputs/kafka"
require "rspec/wait"
require "stud/try"
require "faraday"
require "manticore"
require "json"

# Please run kafka_test_setup.sh prior to executing this integration test.
Expand Down Expand Up @@ -164,11 +164,11 @@ def consume_messages(config, queue: Queue.new, timeout:, event_count:)

before(:each) do
response = save_avro_schema_to_schema_registry(File.join(Dir.pwd, "spec", "unit", "inputs", "avro_schema_fixture_payment.asvc"), SUBJECT_NAME)
expect( response.status ).to be(200)
expect( response.code ).to be(200)
end

after(:each) do
schema_registry_client = Faraday.new('http://localhost:8081')
schema_registry_client = Manticore::Client.new
delete_remote_schema(schema_registry_client, SUBJECT_NAME)
end

Expand All @@ -187,26 +187,26 @@ def consume_messages(config, queue: Queue.new, timeout:, event_count:)
def save_avro_schema_to_schema_registry(schema_file, subject_name)
raw_schema = File.readlines(schema_file).map(&:chomp).join
raw_schema_quoted = raw_schema.gsub('"', '\"')
response = Faraday.post("http://localhost:8081/subjects/#{subject_name}/versions",
'{"schema": "' + raw_schema_quoted + '"}',
"Content-Type" => "application/vnd.schemaregistry.v1+json")
response = Manticore.post("http://localhost:8081/subjects/#{subject_name}/versions",
body: '{"schema": "' + raw_schema_quoted + '"}',
headers: {"Content-Type" => "application/vnd.schemaregistry.v1+json"})
response
end

def delete_remote_schema(schema_registry_client, subject_name)
expect(schema_registry_client.delete("/subjects/#{subject_name}").status ).to be(200)
expect(schema_registry_client.delete("/subjects/#{subject_name}?permanent=true").status ).to be(200)
expect(schema_registry_client.delete("http://localhost:8081/subjects/#{subject_name}").code ).to be(200)
expect(schema_registry_client.delete("http://localhost:8081/subjects/#{subject_name}?permanent=true").code ).to be(200)
end

# AdminClientConfig = org.alpache.kafka.clients.admin.AdminClientConfig

describe "Schema registry API", :integration => true do

let(:schema_registry) { Faraday.new('http://localhost:8081') }
let(:schema_registry) { Manticore::Client.new }

context 'listing subject on clean instance' do
it "should return an empty set" do
subjects = JSON.parse schema_registry.get('/subjects').body
subjects = JSON.parse schema_registry.get('http://localhost:8081/subjects').body
expect( subjects ).to be_empty
end
end
Expand All @@ -222,18 +222,18 @@ def delete_remote_schema(schema_registry_client, subject_name)
response = save_avro_schema_to_schema_registry(File.join(Dir.pwd, "spec", "unit", "inputs", "avro_schema_fixture_payment.asvc"), "schema_test_1")
expect( response.status ).to be(200)

expect( schema_registry.delete('/subjects/schema_test_1?permanent=false').status ).to be(200)
expect( schema_registry.delete('http://localhost:8081/subjects/schema_test_1?permanent=false').status ).to be(200)
sleep(1)
subjects = JSON.parse schema_registry.get('/subjects').body
subjects = JSON.parse schema_registry.get('http://localhost:8081/subjects').body
expect( subjects ).to be_empty
end
end

context 'use the schema to serialize' do
after(:each) do
expect( schema_registry.delete('/subjects/topic_avro-value').status ).to be(200)
expect( schema_registry.delete('http://localhost:8081/subjects/topic_avro-value').status ).to be(200)
sleep 1
expect( schema_registry.delete('/subjects/topic_avro-value?permanent=true').status ).to be(200)
expect( schema_registry.delete('http://localhost:8081/subjects/topic_avro-value?permanent=true').status ).to be(200)

Stud.try(3.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
wait(10).for do
Expand Down

0 comments on commit b683488

Please # to comment.