Skip to content

Commit

Permalink
normalise hosts to URI::Generics, which reliably preserve defaults
Browse files Browse the repository at this point in the history
An upstream bug in the Elasticsearch Ruby Client's handling of `String` host
arguments that begin with a schema (e.g., `https://localhost`) causes it to
default to port 80 or 443, depending on the schema, instead of Elasticsearch's
port 9200.

Since the Elasticsearch Ruby Client will accept a `URI` in this case, and will
correctly handle falling through to appropriate defaults, we normalise to
`URI::Generic`, which does not have a default port.

We absorb the `ssl => true` case into this normalisation, as its previous
implementation prevented the use of non-default ports in the array provided
to `hosts`.

We also add support for IPv6 addresses, requiring a square-bracketed notation
when used in conjunction with a specified port.
(see: RFC-3986)

Supersedes: logstash-plugins#104
Resolves:   logstash-plugins#110
Resolves:   logstash-plugins#111
  • Loading branch information
yaauie committed Mar 27, 2019
1 parent a8ae485 commit b18bfb6
Show file tree
Hide file tree
Showing 4 changed files with 287 additions and 12 deletions.
10 changes: 9 additions & 1 deletion docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ fields => {

* Value type is <<array,array>>
* Default value is `["localhost:9200"]`
* Format for each entry is one of:
- a valid RFC3986 URI with scheme, hostname, and optional port
- an ipv4 address, optionally followed by a colon and port number
- a hostname, optionally followed by a colon and port number
- a square-bracketed ipv6 address, optionally followed by a colon and port number
- a bare ipv6 address

List of elasticsearch hosts to use for querying.

Expand Down Expand Up @@ -281,7 +287,9 @@ Comma-delimited list of `<field>:<direction>` pairs that define the sort order
* Value type is <<boolean,boolean>>
* Default value is `false`

SSL
Force SSL/TLS secured communication to Elasticsearch cluster.
Leaving this unspecified will use whatever scheme is specified in the URLs listed in <<plugins-{type}s-{plugin}-hosts>>, where mixed schemes are supported.
If SSL is set to `true`, the plugin will refuse to start if any of the hosts specifies an `http://` scheme.

[id="plugins-{type}s-{plugin}-tag_on_failure"]
===== `tag_on_failure`
Expand Down
87 changes: 85 additions & 2 deletions lib/logstash/filters/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
require "logstash/json"
java_import "java.util.concurrent.ConcurrentHashMap"

require 'resolv'


class LogStash::Filters::Elasticsearch < LogStash::Filters::Base
config_name "elasticsearch"
Expand Down Expand Up @@ -71,6 +73,8 @@ def register
@query_dsl = file.read
end

@normalised_hosts = normalise_hosts(@hosts, @ssl)

test_connection!
end # def register

Expand Down Expand Up @@ -140,8 +144,7 @@ def filter(event)
private
def client_options
{
:ssl => @ssl,
:hosts => @hosts,
:hosts => @normalised_hosts,
:ca_file => @ca_file,
:logger => @logger
}
Expand Down Expand Up @@ -191,4 +194,84 @@ def extract_total_from_hits(hits)
def test_connection!
get_client.client.ping
end

private

PATTERN_START_WITH_URI_SCHEME =
%r{\A[[:alpha:]][[:alnum:]\.\+\-]*://}i

PATTERN_CAPTURING_HOSTNAME_AND_OPTIONAL_PORT =
%r{\A([^:\[\]]+|\[[^\]]+\])(?::([0-9]+))?\Z}

##
# Map the provided array-of-strings to an array of `URI::Generic`
# instances, which the Elasticsearch client can use to establish
# connections.
#
# @param hosts [Array<String>]: (@see `#normalise_host`)
# @param force_ssl [Boolean]: (@see `#normalise_host`)
#
# @return [Array<URI::Generic>]
def normalise_hosts(hosts, force_ssl)
hosts.map { |input| normalise_host(input, force_ssl) }
end

##
# Convert the provided string to a `URI::Generic` instance, which the
# Elasticsearch client can use to establish connections.
#
# @param input [String]: a url, in one of the following formats:
# - a qualified URL with schema, hostname, and
# optional port
# - a bare hostname or ip, optionally followed by a
# colon and port number
# - a square-bracketed ipv6 literal, optionally
# followed by a colon and port number
# - a bare ipv6-address
# @param force_ssl [Boolean]: true to force SSL; will cause failure if one
# or more hosts explicitly supplies non-SSL
# scheme (e.g., `http`).
#
# @return [URI::Generic]
def normalise_host(input, force_ssl)
if force_ssl && input.start_with?('http://')
logger.error("Plugin configured to force SSL with `ssl => true`, " +
"but a host explicitly declared non-https URL `#{input}`")

raise LogStash::ConfigurationError, "Aborting due to conflicting configuration"
end

begin
if PATTERN_START_WITH_URI_SCHEME.match(input)
# Avoid `URI::parse`, which routes to specific implementations
# that inject defaults that do not make sense in this context.
URI::Generic.new(*URI.split(input))
else
if PATTERN_CAPTURING_HOSTNAME_AND_OPTIONAL_PORT.match(input)
host, port = Regexp.last_match.captures
elsif input =~ Resolv::IPv6::Regex
# per RFC3986: to be used as hostname in URIs, ipv6 literals
# MUST be wrapped in square-brackets.
host, port = "[#{input}]", nil
else
fail('unsupported format')
end
URI::Generic.new(
force_ssl ? 'https' : 'http',
nil, # userinfo,
host,
port,
nil, # registry
nil, # path
nil, # opaque
nil, # query
nil # fragment
)
end
rescue => e
logger.error("Plugin configured with invalid host value `#{input}`",
:exception => e.message, :class => e.class.name)
raise LogStash::ConfigurationError, "Aborting due to invalid configuration"
end
end
end #class LogStash::Filters::Elasticsearch
6 changes: 2 additions & 4 deletions lib/logstash/filters/elasticsearch/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,15 @@ class ElasticsearchClient
attr_reader :client

def initialize(user, password, options={})
ssl = options.fetch(:ssl, false)
hosts = options[:hosts]
@logger = options[:logger]
hosts = options.fetch(:hosts)
@logger = options.fetch(:logger)

transport_options = {}
if user && password
token = ::Base64.strict_encode64("#{user}:#{password.value}")
transport_options[:headers] = { Authorization: "Basic #{token}" }
end

hosts.map! {|h| { host: h, scheme: 'https' } } if ssl
# set ca_file even if ssl isn't on, since the host can be an https url
ssl_options = { ssl: true, ca_file: options[:ca_file] } if options[:ca_file]
ssl_options ||= {}
Expand Down
196 changes: 191 additions & 5 deletions spec/filters/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,199 @@

context "registration" do

let(:plugin) { LogStash::Plugin.lookup("filter", "elasticsearch").new({}) }
before do
allow(plugin).to receive(:test_connection!)
let(:plugin_class) { LogStash::Plugin.lookup("filter", "elasticsearch") }
let(:plugin) { plugin_class.new(config) }
let(:config) { Hash.new }

context 'with defaults' do
before do
allow(plugin).to receive(:test_connection!)
end

it "should not raise an exception" do
expect {plugin.register}.to_not raise_error
end
end

it "should not raise an exception" do
expect {plugin.register}.to_not raise_error
context 'hosts' do
let(:config) do
super().merge(
'hosts' => hosts
)
end
let(:hosts) do
fail NotImplementedError, 'spec or spec group must define `hosts`.'
end

let(:client_stub) { double(:client).as_null_object }
let(:logger_stub) { double(:logger).as_null_object }

before(:each) do
allow(plugin).to receive(:logger).and_return(logger_stub)
end

context 'with schema://hostname' do
let(:hosts) { ['http://foo.local', 'http://bar.local'] }

it 'creates client with URIs that do not include a port' do
expect(::Elasticsearch::Client).to receive(:new) do |options|
expect(options).to include :hosts
expect(options[:hosts]).to be_an Array
expect(options[:hosts]).to include(having_attributes(host: 'foo.local', scheme: 'http', port: nil))
expect(options[:hosts]).to include(having_attributes(host: 'bar.local', scheme: 'http', port: nil))
end.and_return(client_stub)

plugin.register
end
end

context 'with `ssl => true`' do
let(:config) { super().merge('ssl' => 'true') }
context 'and one or more explicitly-http hosts' do
let(:hosts) { ['https://foo.local', 'http://bar.local'] }

it 'raises an exception' do
expect { plugin.register }.to raise_error(LogStash::ConfigurationError)
end

it 'emits a helpful log message' do
plugin.register rescue nil
expect(plugin.logger).to have_received(:error).with(match(/force SSL/))
end
end

context 'and all explicitly-https hosts' do
let(:hosts) { ['https://foo.local', 'https://bar.local'] }

it 'sets the schemas on all to https' do
expect(::Elasticsearch::Client).to receive(:new) do |options|
expect(options).to include :hosts
expect(options[:hosts]).to be_an Array
options[:hosts].each do |host|
expect(host).to be_an URI
expect(host.scheme).to eq 'https'
end
end.and_return(client_stub)

plugin.register
end
end

context 'and one or more schemaless hosts' do
let(:hosts) { ['https://foo.local', 'bar.local'] }

it 'sets the schemas on all to https' do
expect(::Elasticsearch::Client).to receive(:new) do |options|
expect(options).to include :hosts
expect(options[:hosts]).to be_an Array
options[:hosts].each do |host|
expect(host).to be_an URI
expect(host.scheme).to eq 'https'
end
end.and_return(client_stub)

plugin.register
end
end

context 'with one or more ipv6 hostnames' do
let(:hosts) { ['[::1]', '[::2]:9201', 'https://[::3]:9202', '::4'] }
it 'defaults to the http protocol' do
expect(::Elasticsearch::Client).to receive(:new) do |options|
expect(options).to include :hosts
expect(options[:hosts]).to be_an Array
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: '[::1]', port: nil))
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: '[::2]', port: 9201))
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: '[::3]', port: 9202))
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: '[::4]', port: nil))
end.and_return(client_stub)

plugin.register
end
end
end

{
'with `ssl => false' => {'ssl' => 'false'},
'without `ssl` directive' => {}
}.each do |context_string, config_override|
context(context_string) do
let(:config) { super().merge(config_override) }

context 'with a mix of http and https hosts' do
let(:hosts) { ['https://foo.local', 'http://bar.local'] }
it 'does not modify the protocol' do
expect(::Elasticsearch::Client).to receive(:new) do |options|
expect(options).to include :hosts
expect(options[:hosts]).to be_an Array
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: 'foo.local', port: nil))
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: 'bar.local', port: nil))
end.and_return(client_stub)

plugin.register
end
end

context 'with https-only hosts' do
let(:hosts) { ['https://foo.local', 'https://bar.local'] }
it 'does not modify the protocol' do
expect(::Elasticsearch::Client).to receive(:new) do |options|
expect(options).to include :hosts
expect(options[:hosts]).to be_an Array
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: 'foo.local', port: nil))
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: 'bar.local', port: nil))
end.and_return(client_stub)

plugin.register
end
end

context 'with http-only hosts' do
let(:hosts) { ['http://foo.local', 'http://bar.local'] }
it 'does not modify the protocol' do
expect(::Elasticsearch::Client).to receive(:new) do |options|
expect(options).to include :hosts
expect(options[:hosts]).to be_an Array
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: 'foo.local', port: nil))
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: 'bar.local', port: nil))
end.and_return(client_stub)

plugin.register
end
end

context 'with one or more schemaless hosts' do
let(:hosts) { ['foo.local', 'bar.local' ] }
it 'defaults to the http protocol' do
expect(::Elasticsearch::Client).to receive(:new) do |options|
expect(options).to include :hosts
expect(options[:hosts]).to be_an Array
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: 'foo.local', port: nil))
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: 'bar.local', port: nil))
end.and_return(client_stub)

plugin.register
end
end

context 'with one or more square-bracketed ipv6 literals' do
let(:hosts) { ['[::1]', '[::2]:9201', 'http://[::3]','https://[::4]:9202', '::5'] }
it 'defaults to the http protocol' do
expect(::Elasticsearch::Client).to receive(:new) do |options|
expect(options).to include :hosts
expect(options[:hosts]).to be_an Array
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: '[::1]', port: nil))
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: '[::2]', port: 9201))
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: '[::3]', port: nil))
expect(options[:hosts]).to include(having_attributes(scheme: 'https', host: '[::4]', port: 9202))
expect(options[:hosts]).to include(having_attributes(scheme: 'http', host: '[::5]', port: nil))
end.and_return(client_stub)

plugin.register
end
end
end
end
end
end

Expand Down

0 comments on commit b18bfb6

Please sign in to comment.