diff --git a/docs/index.asciidoc b/docs/index.asciidoc index e62176c..c209d02 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -221,6 +221,12 @@ fields => { * Value type is <> * Default value is `["localhost:9200"]` + * Format for each entry is one of: + - a valid RFC3986 URI with scheme, hostname, and optional port + - an ipv4 address, optionally followed by a colon and port number + - a hostname, optionally followed by a colon and port number + - a square-bracketed ipv6 address, optionally followed by a colon and port number + - a bare ipv6 address List of elasticsearch hosts to use for querying. @@ -281,7 +287,9 @@ Comma-delimited list of `:` pairs that define the sort order * Value type is <> * Default value is `false` -SSL +Force SSL/TLS secured communication to Elasticsearch cluster. +Leaving this unspecified will use whatever scheme is specified in the URLs listed in <>, where mixed schemes are supported. +If SSL is set to `true`, the plugin will refuse to start if any of the hosts specifies an `http://` scheme. [id="plugins-{type}s-{plugin}-tag_on_failure"] ===== `tag_on_failure` diff --git a/lib/logstash/filters/elasticsearch.rb b/lib/logstash/filters/elasticsearch.rb index 3032153..3444bc3 100644 --- a/lib/logstash/filters/elasticsearch.rb +++ b/lib/logstash/filters/elasticsearch.rb @@ -5,6 +5,8 @@ require "logstash/json" java_import "java.util.concurrent.ConcurrentHashMap" +require 'resolv' + class LogStash::Filters::Elasticsearch < LogStash::Filters::Base config_name "elasticsearch" @@ -71,6 +73,8 @@ def register @query_dsl = file.read end + @normalised_hosts = normalise_hosts(@hosts, @ssl) + test_connection! end # def register @@ -140,8 +144,7 @@ def filter(event) private def client_options { - :ssl => @ssl, - :hosts => @hosts, + :hosts => @normalised_hosts, :ca_file => @ca_file, :logger => @logger } @@ -191,4 +194,84 @@ def extract_total_from_hits(hits) def test_connection! get_client.client.ping end + + private + + PATTERN_START_WITH_URI_SCHEME = + %r{\A[[:alpha:]][[:alnum:]\.\+\-]*://}i + + PATTERN_CAPTURING_HOSTNAME_AND_OPTIONAL_PORT = + %r{\A([^:\[\]]+|\[[^\]]+\])(?::([0-9]+))?\Z} + + ## + # Map the provided array-of-strings to an array of `URI::Generic` + # instances, which the Elasticsearch client can use to establish + # connections. + # + # @param hosts [Array]: (@see `#normalise_host`) + # @param force_ssl [Boolean]: (@see `#normalise_host`) + # + # @return [Array] + def normalise_hosts(hosts, force_ssl) + hosts.map { |input| normalise_host(input, force_ssl) } + end + + ## + # Convert the provided string to a `URI::Generic` instance, which the + # Elasticsearch client can use to establish connections. + # + # @param input [String]: a url, in one of the following formats: + # - a qualified URL with schema, hostname, and + # optional port + # - a bare hostname or ip, optionally followed by a + # colon and port number + # - a square-bracketed ipv6 literal, optionally + # followed by a colon and port number + # - a bare ipv6-address + # @param force_ssl [Boolean]: true to force SSL; will cause failure if one + # or more hosts explicitly supplies non-SSL + # scheme (e.g., `http`). + # + # @return [URI::Generic] + def normalise_host(input, force_ssl) + if force_ssl && input.start_with?('http://') + logger.error("Plugin configured to force SSL with `ssl => true`, " + + "but a host explicitly declared non-https URL `#{input}`") + + raise LogStash::ConfigurationError, "Aborting due to conflicting configuration" + end + + begin + if PATTERN_START_WITH_URI_SCHEME.match(input) + # Avoid `URI::parse`, which routes to specific implementations + # that inject defaults that do not make sense in this context. + URI::Generic.new(*URI.split(input)) + else + if PATTERN_CAPTURING_HOSTNAME_AND_OPTIONAL_PORT.match(input) + host, port = Regexp.last_match.captures + elsif input =~ Resolv::IPv6::Regex + # per RFC3986: to be used as hostname in URIs, ipv6 literals + # MUST be wrapped in square-brackets. + host, port = "[#{input}]", nil + else + fail('unsupported format') + end + URI::Generic.new( + force_ssl ? 'https' : 'http', + nil, # userinfo, + host, + port, + nil, # registry + nil, # path + nil, # opaque + nil, # query + nil # fragment + ) + end + rescue => e + logger.error("Plugin configured with invalid host value `#{input}`", + :exception => e.message, :class => e.class.name) + raise LogStash::ConfigurationError, "Aborting due to invalid configuration" + end + end end #class LogStash::Filters::Elasticsearch diff --git a/lib/logstash/filters/elasticsearch/client.rb b/lib/logstash/filters/elasticsearch/client.rb index 986e797..e81fa66 100644 --- a/lib/logstash/filters/elasticsearch/client.rb +++ b/lib/logstash/filters/elasticsearch/client.rb @@ -11,9 +11,8 @@ class ElasticsearchClient attr_reader :client def initialize(user, password, options={}) - ssl = options.fetch(:ssl, false) - hosts = options[:hosts] - @logger = options[:logger] + hosts = options.fetch(:hosts) + @logger = options.fetch(:logger) transport_options = {} if user && password @@ -21,7 +20,6 @@ def initialize(user, password, options={}) transport_options[:headers] = { Authorization: "Basic #{token}" } end - hosts.map! {|h| { host: h, scheme: 'https' } } if ssl # set ca_file even if ssl isn't on, since the host can be an https url ssl_options = { ssl: true, ca_file: options[:ca_file] } if options[:ca_file] ssl_options ||= {} diff --git a/spec/filters/elasticsearch_spec.rb b/spec/filters/elasticsearch_spec.rb index ac19e3e..b93bd85 100644 --- a/spec/filters/elasticsearch_spec.rb +++ b/spec/filters/elasticsearch_spec.rb @@ -8,13 +8,199 @@ context "registration" do - let(:plugin) { LogStash::Plugin.lookup("filter", "elasticsearch").new({}) } - before do - allow(plugin).to receive(:test_connection!) + let(:plugin_class) { LogStash::Plugin.lookup("filter", "elasticsearch") } + let(:plugin) { plugin_class.new(config) } + let(:config) { Hash.new } + + context 'with defaults' do + before do + allow(plugin).to receive(:test_connection!) + end + + it "should not raise an exception" do + expect {plugin.register}.to_not raise_error + end end - it "should not raise an exception" do - expect {plugin.register}.to_not raise_error + context 'hosts' do + let(:config) do + super().merge( + 'hosts' => hosts + ) + end + let(:hosts) do + fail NotImplementedError, 'spec or spec group must define `hosts`.' + end + + let(:client_stub) { double(:client).as_null_object } + let(:logger_stub) { double(:logger).as_null_object } + + before(:each) do + allow(plugin).to receive(:logger).and_return(logger_stub) + end + + context 'with schema://hostname' do + let(:hosts) { ['http://foo.local', 'http://bar.local'] } + + it 'creates client with URIs that do not include a port' do + expect(::Elasticsearch::Client).to receive(:new) do |options| + expect(options).to include :hosts + expect(options[:hosts]).to be_an Array + expect(options[:hosts]).to include(having_attributes(host: 'foo.local', scheme: 'http', port: nil)) + expect(options[:hosts]).to include(having_attributes(host: 'bar.local', scheme: 'http', port: nil)) + end.and_return(client_stub) + + plugin.register + end + end + + context 'with `ssl => true`' do + let(:config) { super().merge('ssl' => 'true') } + context 'and one or more explicitly-http hosts' do + let(:hosts) { ['https://foo.local', 'http://bar.local'] } + + it 'raises an exception' do + expect { plugin.register }.to raise_error(LogStash::ConfigurationError) + end + + it 'emits a helpful log message' do + plugin.register rescue nil + expect(plugin.logger).to have_received(:error).with(match(/force SSL/)) + end + end + + context 'and all explicitly-https hosts' do + let(:hosts) { ['https://foo.local', 'https://bar.local'] } + + it 'sets the schemas on all to https' do + expect(::Elasticsearch::Client).to receive(:new) do |options| + expect(options).to include :hosts + expect(options[:hosts]).to be_an Array + options[:hosts].each do |host| + expect(host).to be_an URI + expect(host.scheme).to eq 'https' + end + end.and_return(client_stub) + + plugin.register + end + end + + context 'and one or more schemaless hosts' do + let(:hosts) { ['https://foo.local', 'bar.local'] } + + it 'sets the schemas on all to https' do + expect(::Elasticsearch::Client).to receive(:new) do |options| + expect(options).to include :hosts + expect(options[:hosts]).to be_an Array + options[:hosts].each do |host| + expect(host).to be_an URI + expect(host.scheme).to eq 'https' + end + end.and_return(client_stub) + + plugin.register + end + end + + context 'with one or more ipv6 hostnames' do + let(:hosts) { ['[::1]', '[::2]:9201', 'https://[::3]:9202', '::4'] } + it 'defaults to the http protocol' do + expect(::Elasticsearch::Client).to receive(:new) do |options| + expect(options).to include :hosts + expect(options[:hosts]).to be_an Array + expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: '[::1]', port: nil)) + expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: '[::2]', port: 9201)) + expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: '[::3]', port: 9202)) + expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: '[::4]', port: nil)) + end.and_return(client_stub) + + plugin.register + end + end + end + + { + 'with `ssl => false' => {'ssl' => 'false'}, + 'without `ssl` directive' => {} + }.each do |context_string, config_override| + context(context_string) do + let(:config) { super().merge(config_override) } + + context 'with a mix of http and https hosts' do + let(:hosts) { ['https://foo.local', 'http://bar.local'] } + it 'does not modify the protocol' do + expect(::Elasticsearch::Client).to receive(:new) do |options| + expect(options).to include :hosts + expect(options[:hosts]).to be_an Array + expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: 'foo.local', port: nil)) + expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: 'bar.local', port: nil)) + end.and_return(client_stub) + + plugin.register + end + end + + context 'with https-only hosts' do + let(:hosts) { ['https://foo.local', 'https://bar.local'] } + it 'does not modify the protocol' do + expect(::Elasticsearch::Client).to receive(:new) do |options| + expect(options).to include :hosts + expect(options[:hosts]).to be_an Array + expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: 'foo.local', port: nil)) + expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: 'bar.local', port: nil)) + end.and_return(client_stub) + + plugin.register + end + end + + context 'with http-only hosts' do + let(:hosts) { ['http://foo.local', 'http://bar.local'] } + it 'does not modify the protocol' do + expect(::Elasticsearch::Client).to receive(:new) do |options| + expect(options).to include :hosts + expect(options[:hosts]).to be_an Array + expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: 'foo.local', port: nil)) + expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: 'bar.local', port: nil)) + end.and_return(client_stub) + + plugin.register + end + end + + context 'with one or more schemaless hosts' do + let(:hosts) { ['foo.local', 'bar.local' ] } + it 'defaults to the http protocol' do + expect(::Elasticsearch::Client).to receive(:new) do |options| + expect(options).to include :hosts + expect(options[:hosts]).to be_an Array + expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: 'foo.local', port: nil)) + expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: 'bar.local', port: nil)) + end.and_return(client_stub) + + plugin.register + end + end + + context 'with one or more square-bracketed ipv6 literals' do + let(:hosts) { ['[::1]', '[::2]:9201', 'http://[::3]','https://[::4]:9202', '::5'] } + it 'defaults to the http protocol' do + expect(::Elasticsearch::Client).to receive(:new) do |options| + expect(options).to include :hosts + expect(options[:hosts]).to be_an Array + expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: '[::1]', port: nil)) + expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: '[::2]', port: 9201)) + expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: '[::3]', port: nil)) + expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: '[::4]', port: 9202)) + expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: '[::5]', port: nil)) + end.and_return(client_stub) + + plugin.register + end + end + end + end end end