-
Notifications
You must be signed in to change notification settings - Fork 61
/
Copy pathcommon.rb
92 lines (79 loc) · 3.68 KB
/
common.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
module LogStash
module PluginMixins
module KafkaAvroSchemaRegistry
def self.included(base)
base.extend(self)
base.setup_schema_registry_config
end
def setup_schema_registry_config
# Option to set key to access Schema Registry.
config :schema_registry_key, :validate => :string
# Option to set secret to access Schema Registry.
config :schema_registry_secret, :validate => :password
# Option to set the endpoint of the Schema Registry.
# This option permit the usage of Avro Kafka deserializer which retrieve the schema of the Avro message from an
# instance of schema registry. If this option has value `value_deserializer_class` nor `topics_pattern` could be valued
config :schema_registry_url, :validate => :uri
# Option to set the proxy of the Schema Registry.
# This option permits to define a proxy to be used to reach the schema registry service instance.
config :schema_registry_proxy, :validate => :uri
end
def check_schema_registry_parameters
if @schema_registry_url
check_for_schema_registry_conflicts
@schema_registry_proxy_host, @schema_registry_proxy_port = split_proxy_into_host_and_port(schema_registry_proxy)
check_for_key_and_secret
check_for_schema_registry_connectivity_and_subjects
end
end
private
def check_for_schema_registry_conflicts
if @value_deserializer_class != LogStash::Inputs::Kafka::DEFAULT_DESERIALIZER_CLASS
raise LogStash::ConfigurationError, 'Option schema_registry_url prohibit the customization of value_deserializer_class'
end
if @topics_pattern && !@topics_pattern.empty?
raise LogStash::ConfigurationError, 'Option schema_registry_url prohibit the customization of topics_pattern'
end
end
private
def check_for_schema_registry_connectivity_and_subjects
client = Faraday.new(@schema_registry_url.to_s) do |conn|
if schema_registry_proxy && !schema_registry_proxy.empty?
conn.proxy = schema_registry_proxy.to_s
end
if schema_registry_key and !schema_registry_key.empty?
conn.basic_auth(schema_registry_key, schema_registry_secret.value)
end
end
begin
response = client.get('/subjects')
rescue Faraday::Error => e
raise LogStash::ConfigurationError.new("Schema registry service doesn't respond, error: #{e.message}")
end
registered_subjects = JSON.parse response.body
expected_subjects = @topics.map { |t| "#{t}-value"}
if (expected_subjects & registered_subjects).size != expected_subjects.size
undefined_topic_subjects = expected_subjects - registered_subjects
raise LogStash::ConfigurationError, "The schema registry does not contain definitions for required topic subjects: #{undefined_topic_subjects}"
end
end
def split_proxy_into_host_and_port(proxy_uri)
return nil unless proxy_uri && !proxy_uri.empty?
port = proxy_uri.port
host_spec = ""
host_spec << proxy_uri.scheme || "http"
host_spec << "://"
host_spec << "#{proxy_uri.userinfo}@" if proxy_uri.userinfo
host_spec << proxy_uri.host
[host_spec, port]
end
def check_for_key_and_secret
if schema_registry_key and !schema_registry_key.empty?
if !schema_registry_secret or schema_registry_secret.value.empty?
raise LogStash::ConfigurationError, "Setting `schema_registry_secret` is required when `schema_registry_key` is provided."
end
end
end
end
end
end