From a573d62b509fb3b5d55e3c5bb54bb3f4c1fc194a Mon Sep 17 00:00:00 2001 From: Mashhur <99575341+mashhurs@users.noreply.github.com> Date: Fri, 18 Jul 2025 09:53:57 -0700 Subject: [PATCH] ES|QL support (#194) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * ESQL and DSL executors are introduced. param can accept ES|QL query shape now. is introduced for initial step but needs team's feedback. DSL logics moved into DSL executors. * Apply suggestions from code review Separate DSL and ESQL interface in the client. Co-authored-by: Rye Biesemeyer * Rebase against upstream main after target support added. Separate unit test for DSL. Address comments: do not save ES version in client, add apply target method in executors, set to target if target is defined, docs update. Co-authored-by: Rye Biesemeyer * Introduce query_type option which accepts dsl or esql to define a query shape. Remove multi-depth nested named_params and keep only top-level query_params which aligns with placeholder structure in the ES|QL. * Separate event referenced and static valued fields at initialization of the ESQL executor. * query_params now supports both Array and Hash types. * Add tech preview section under ESQL. * Place the query results based on the target specified. If not specified, first result will be set to event's top level. * Apply suggestions from code review Doc corrections. Co-authored-by: João Duarte * ES|QL result mapping to event doc correction. * Integration tests to run with credentials enabled and SSL configs. --------- Co-authored-by: Rye Biesemeyer Co-authored-by: João Duarte (cherry picked from commit 5e3c4648facde569499f85034ed94071e2932d35) --- CHANGELOG.md | 3 + docs/index.asciidoc | 138 +++++- lib/logstash/filters/elasticsearch.rb | 235 ++++----- lib/logstash/filters/elasticsearch/client.rb | 8 + .../filters/elasticsearch/dsl_executor.rb | 140 ++++++ .../filters/elasticsearch/esql_executor.rb | 178 +++++++ logstash-filter-elasticsearch.gemspec | 2 +- spec/filters/elasticsearch_dsl_spec.rb | 372 ++++++++++++++ spec/filters/elasticsearch_esql_spec.rb | 211 ++++++++ spec/filters/elasticsearch_spec.rb | 455 +++++------------- .../integration/elasticsearch_esql_spec.rb | 167 +++++++ 11 files changed, 1447 insertions(+), 462 deletions(-) create mode 100644 lib/logstash/filters/elasticsearch/dsl_executor.rb create mode 100644 lib/logstash/filters/elasticsearch/esql_executor.rb create mode 100644 spec/filters/elasticsearch_dsl_spec.rb create mode 100644 spec/filters/elasticsearch_esql_spec.rb create mode 100644 spec/filters/integration/elasticsearch_esql_spec.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index cd9c939..234c44f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 3.19.0 + - ES|QL support [#199](https://github.com/logstash-plugins/logstash-filter-elasticsearch/pull/199) + ## 3.18.0 - Add `target` configuration option to store the result into it [#197](https://github.com/logstash-plugins/logstash-filter-elasticsearch/pull/197) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index db8f055..1e11fd4 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -54,7 +54,7 @@ if [type] == "end" { The example below reproduces the above example but utilises the query_template. This query_template represents a full Elasticsearch query DSL and supports the -standard Logstash field substitution syntax. The example below issues +standard {ls} field substitution syntax. 
The example below issues
the same query as the first example but uses the template shown.
 
 [source,ruby]
@@ -118,6 +118,110 @@ Authentication to a secure Elasticsearch cluster is possible using _one_ of the
 Authorization to a secure Elasticsearch cluster requires `read` permission at index
 level and `monitoring` permissions at cluster level. The `monitoring` permission at
 cluster level is necessary to perform periodic connectivity checks.
 
+[id="plugins-{type}s-{plugin}-esql"]
+==== {esql} support
+
+.Technical Preview
+****
+The {esql} feature that allows using ES|QL queries with this plugin is in Technical Preview.
+Configuration options and implementation details are subject to change in minor releases without being preceded by deprecation warnings.
+****
+
+{es} Query Language ({esql}) provides a SQL-like interface for querying your {es} data.
+
+To use {esql}, this plugin needs to be installed on {ls} 8.17.4 or newer, and must be connected to {es} 8.11 or newer.
+
+To configure an {esql} query in the plugin, set your {esql} query in the `query` parameter.
+
+IMPORTANT: We recommend understanding {ref}/esql-limitations.html[{esql} current limitations] before using it in production environments.
+
+The following is a basic {esql} query that sets the food name on the transaction event based on the upstream event's food ID:
+[source, ruby]
+    filter {
+      elasticsearch {
+        hosts => [ 'https://..']
+        api_key => '....'
+        query => '
+          FROM food-index
+          | WHERE id == ?food_id
+        '
+        query_params => {
+          "food_id" => "[food][id]"
+        }
+      }
+    }
+
+Set `config.support_escapes: true` in `logstash.yml` if you need to escape special characters in the query.
+
+In the result event, the plugin sets the total result size in the `[@metadata][total_values]` field.
+
+[id="plugins-{type}s-{plugin}-esql-event-mapping"]
+===== Mapping {esql} result to {ls} event
+{esql} returns query results in a structured tabular format, where data is organized into _columns_ (fields) and _values_ (entries).
+The plugin maps each value entry to an event, populating the corresponding fields.
+For example, a query might produce a table like:
+
+[cols="2,1,1,1,2",options="header"]
+|===
+|`timestamp` |`user_id` | `action` | `status.code` | `status.desc`
+
+|2025-04-10T12:00:00 |123 |login |200 | Success
+|2025-04-10T12:05:00 |456 |purchase |403 | Forbidden (unauthorized user)
+|===
+
+In this case, the plugin creates the two JSON-like objects below and places them into the `target` field of the event if `target` is defined.
+If `target` is not defined, the plugin places _only_ the first result at the root of the event.
+[source, json]
+[
+    {
+      "timestamp": "2025-04-10T12:00:00",
+      "user_id": 123,
+      "action": "login",
+      "status": {
+        "code": 200,
+        "desc": "Success"
+      }
+    },
+    {
+      "timestamp": "2025-04-10T12:05:00",
+      "user_id": 456,
+      "action": "purchase",
+      "status": {
+        "code": 403,
+        "desc": "Forbidden (unauthorized user)"
+      }
+    }
+]
+
+NOTE: If your index has a mapping with sub-objects where `status.code` and `status.desc` are actually dotted fields, they appear in {ls} events as a nested structure.
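+
+For example, a configuration along these lines (hosts, credentials, and the `query_results` field name are illustrative) places every result row under a `target` field:
+[source, ruby]
+    filter {
+      elasticsearch {
+        hosts => [ 'https://..']
+        api_key => '....'
+        query => 'FROM my-index | LIMIT 2'
+        target => "query_results"
+      }
+    }
+
+Each row then becomes one element of the `query_results` array, shaped like the JSON objects above.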
+
+[id="plugins-{type}s-{plugin}-esql-multifields"]
+===== Conflict on multi-fields
+
+An {esql} query fetches all parent fields and sub-fields if your {es} index has https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/multi-fields[multi-fields] or https://www.elastic.co/docs/reference/elasticsearch/mapping-reference/subobjects[subobjects].
+Since {ls} events cannot contain a parent field's concrete value and sub-field values together, the plugin ignores the sub-fields with a warning and includes the parent.
+We recommend using the `RENAME` (or `DROP`, to avoid the warning) keyword in your {esql} query to explicitly rename the fields you want included in the event.
+
+This is a common occurrence if your template or mapping follows the pattern of always indexing strings as "text" (`field`) + "keyword" (`field.keyword`) multi-fields.
+In this case it's recommended to do `KEEP field` if the string is identical and there is only one sub-field, as the engine will optimize and retrieve the keyword; otherwise you can do `KEEP field.keyword | RENAME field.keyword as field`.
+
+To illustrate the situation with an example, assume your mapping has a `time` field with `time.min` and `time.max` sub-fields as follows:
+[source, ruby]
+    "properties": {
+      "time": { "type": "long" },
+      "time.min": { "type": "long" },
+      "time.max": { "type": "long" }
+    }
+
+The {esql} result will contain all three fields but the plugin cannot map them into the {ls} event.
+To avoid this, you can use the `RENAME` keyword to rename the `time` parent field so that all three fields have unique names.
+[source, ruby]
+    ...
+    query => 'FROM my-index | RENAME time AS time.current'
+    ...
+
+For a comprehensive {esql} syntax reference and best practices, see the https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-syntax.html[{esql} documentation].
+
 [id="plugins-{type}s-{plugin}-options"]
 ==== Elasticsearch Filter Configuration Options
 
@@ -140,6 +244,8 @@ This plugin supports the following configuration options plus the <>
 | <> |<>|No
 | <> |<>|No
 | <> |<>|No
+| <> |<>, one of `["dsl", "esql"]`|No
+| <> |<> or <>|No
 | <> |<>|No
 | <> |<>|No
 | <> |<>|No
@@ -337,11 +443,30 @@ environment variables e.g. `proxy => '${LS_PROXY:}'`.
 * Value type is <>
 * There is no default value for this setting.
 
-Elasticsearch query string. More information is available in the
-{ref}/query-dsl-query-string-query.html#query-string-syntax[Elasticsearch query
-string documentation].
-Use either `query` or `query_template`.
+The query to be executed.
+The accepted query shape is a DSL query string or an {esql} query.
+For the DSL query string, use either `query` or `query_template`.
+Read the {ref}/query-dsl-query-string-query.html[{es} query
+string documentation] or the {ref}/esql.html[{es} ES|QL documentation] for more information.
+
+[id="plugins-{type}s-{plugin}-query_type"]
+===== `query_type`
+
+* Value can be `dsl` or `esql`
+* Default value is `dsl`
+
+Defines the <> shape.
+When `dsl`, the query shape must be a valid {es} JSON-style string.
+When `esql`, the query shape must be a valid {esql} string, and the `index`, `query_template` and `sort` parameters are not allowed.
+
+[id="plugins-{type}s-{plugin}-query_params"]
+===== `query_params`
+
+* The value type is <> or <>. When an array is provided, its elements must be single `key`/`value` pairs.
+* There is no default value for this setting.
+
+Named parameters in {esql} to send to {es} together with <>.
+Visit the {ref}/esql-rest.html#esql-rest-params[passing parameters to query page] for more information.
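+
+For example, the array form expresses the same parameters as single-entry pairs; values wrapped in `[...]` are resolved from the event as field references, while anything else is passed to {es} as a static value (the parameter names below are illustrative):
+[source, ruby]
+    query_params => [
+      { "food_id" => "[food][id]" },
+      { "min_rating" => 3 }
+    ]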
 
 [id="plugins-{type}s-{plugin}-query_template"]
 ===== `query_template`
 
@@ -538,8 +663,9 @@ Tags the event on failure to look up previous log event information. This can be
 
 Define the target field for placing the result data. If this setting is omitted, the
 target will be the root (top level) of the event.
+Setting a `target` is highly recommended when using `query_type => 'esql'` so that all query results are set into the event.
 
-The destination fields specified in <>, <>, and <> are relative to this target.
+When `query_type => 'dsl'`, the destination fields specified in <>, <>, and <> are relative to this target.
 
 For example, if you want the data to be put in the `operation` field:
 [source,ruby]
diff --git a/lib/logstash/filters/elasticsearch.rb b/lib/logstash/filters/elasticsearch.rb
index 4f8aca4..e1797cb 100644
--- a/lib/logstash/filters/elasticsearch.rb
+++ b/lib/logstash/filters/elasticsearch.rb
@@ -13,6 +13,9 @@ class LogStash::Filters::Elasticsearch < LogStash::Filters::Base
 
+  require 'logstash/filters/elasticsearch/dsl_executor'
+  require 'logstash/filters/elasticsearch/esql_executor'
+
   include LogStash::PluginMixins::ECSCompatibilitySupport
   include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
 
@@ -25,8 +28,13 @@ class LogStash::Filters::Elasticsearch < LogStash::Filters::Base
   # Field substitution (e.g. `index-name-%{date_field}`) is available
   config :index, :validate => :string, :default => ""
 
-  # Elasticsearch query string. Read the Elasticsearch query string documentation.
-  # for more info at: https://www.elastic.co/guide/en/elasticsearch/reference/master/query-dsl-query-string-query.html#query-string-syntax
+  # The type of Elasticsearch query, provided by @query.
+  config :query_type, :validate => %w[esql dsl], :default => "dsl"
+
+  # Elasticsearch query string. This can be in the DSL or ES|QL query shape, defined by @query_type.
+  # Read the Elasticsearch query string documentation.
+  # DSL: https://www.elastic.co/guide/en/elasticsearch/reference/master/query-dsl-query-string-query.html#query-string-syntax
+  # ES|QL: https://www.elastic.co/guide/en/elasticsearch/reference/current/esql.html
   config :query, :validate => :string
 
   # File path to elasticsearch query in DSL format. Read the Elasticsearch query documentation
@@ -138,7 +146,7 @@ class LogStash::Filters::Elasticsearch < LogStash::Filters::Base
   # Tags the event on failure to look up geo information. This can be used in later analysis.
   config :tag_on_failure, :validate => :array, :default => ["_elasticsearch_lookup_failure"]
 
-  # If set, the the result set will be nested under the target field
+  # If set, the result set will be nested under the target field
   config :target, :validate => :field_reference
 
   # How many times to retry on failure?
@@ -147,6 +155,16 @@ class LogStash::Filters::Elasticsearch < LogStash::Filters::Base
   # What status codes to retry on?
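  # e.g. `retry_on_status => [429, 500, 502, 503, 504]` would also retry throttled (429) requests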
config :retry_on_status, :validate => :number, :list => true, :default => [500, 502, 503, 504] + # named placeholders in ES|QL query + # example, + # if the query is "FROM my-index | WHERE some_type = ?type AND depth > ?min_depth" + # named placeholders can be applied as the following in query_params: + # query_params => [ + # {"type" => "%{[type]}"} + # {"min_depth" => "%{[depth]}"} + # ] + config :query_params, :validate => :array, :default => [] + # config :ca_trusted_fingerprint, :validate => :sha_256_hex include LogStash::PluginMixins::CATrustedFingerprintSupport @@ -155,6 +173,9 @@ class LogStash::Filters::Elasticsearch < LogStash::Filters::Base include MonitorMixin attr_reader :shared_client + LS_ESQL_SUPPORT_VERSION = "8.17.4" # the version started using elasticsearch-ruby v8 + ES_ESQL_SUPPORT_VERSION = "8.11.0" + ## # @override to handle proxy => '' as if none was set # @param value [Array] @@ -172,17 +193,22 @@ def self.validate_value(value, validator) return super(value, :uri) end + attr_reader :query_dsl + def register - #Load query if it exists - if @query_template - if File.zero?(@query_template) - raise "template is empty" - end - file = File.open(@query_template, 'r') - @query_dsl = file.read + case @query_type + when "esql" + invalid_params_with_esql = original_params.keys & %w(index query_template sort fields docinfo_fields aggregation_fields enable_sort result_size) + raise LogStash::ConfigurationError, "Configured #{invalid_params_with_esql} params cannot be used with ES|QL query" if invalid_params_with_esql.any? + + validate_ls_version_for_esql_support! + validate_esql_query_and_params! + @esql_executor ||= LogStash::Filters::Elasticsearch::EsqlExecutor.new(self, @logger) + else # dsl + validate_dsl_query_settings! + @esql_executor ||= LogStash::Filters::Elasticsearch::DslExecutor.new(self, @logger) end - validate_query_settings fill_hosts_from_cloud_id setup_ssl_params! validate_authentication @@ -191,6 +217,7 @@ def register @hosts = Array(@hosts).map { |host| host.to_s } # potential SafeURI#to_s test_connection! + validate_es_for_esql_support! if @query_type == "esql" setup_serverless if get_client.es_transport_client_type == "elasticsearch_transport" require_relative "elasticsearch/patches/_elasticsearch_transport_http_manticore" @@ -198,71 +225,15 @@ def register end # def register def filter(event) - matched = false - begin - params = { :index => event.sprintf(@index) } - - if @query_dsl - query = LogStash::Json.load(event.sprintf(@query_dsl)) - params[:body] = query - else - query = event.sprintf(@query) - params[:q] = query - params[:size] = result_size - params[:sort] = @sort if @enable_sort - end - - @logger.debug("Querying elasticsearch for lookup", :params => params) - - results = get_client.search(params) - raise "Elasticsearch query error: #{results["_shards"]["failures"]}" if results["_shards"].include? "failures" - - event.set("[@metadata][total_hits]", extract_total_from_hits(results['hits'])) - - resultsHits = results["hits"]["hits"] - if !resultsHits.nil? && !resultsHits.empty? - matched = true - @fields.each do |old_key, new_key| - old_key_path = extract_path(old_key) - extracted_hit_values = resultsHits.map do |doc| - extract_value(doc["_source"], old_key_path) - end - value_to_set = extracted_hit_values.count > 1 ? 
extracted_hit_values : extracted_hit_values.first - set_to_event_target(event, new_key, value_to_set) - end - @docinfo_fields.each do |old_key, new_key| - old_key_path = extract_path(old_key) - extracted_docs_info = resultsHits.map do |doc| - extract_value(doc, old_key_path) - end - value_to_set = extracted_docs_info.count > 1 ? extracted_docs_info : extracted_docs_info.first - set_to_event_target(event, new_key, value_to_set) - end - end - - resultsAggs = results["aggregations"] - if !resultsAggs.nil? && !resultsAggs.empty? - matched = true - @aggregation_fields.each do |agg_name, ls_field| - set_to_event_target(event, ls_field, resultsAggs[agg_name]) - end - end - - rescue => e - if @logger.trace? - @logger.warn("Failed to query elasticsearch for previous event", :index => @index, :query => query, :event => event.to_hash, :error => e.message, :backtrace => e.backtrace) - elsif @logger.debug? - @logger.warn("Failed to query elasticsearch for previous event", :index => @index, :error => e.message, :backtrace => e.backtrace) - else - @logger.warn("Failed to query elasticsearch for previous event", :index => @index, :error => e.message) - end - @tag_on_failure.each{|tag| event.tag(tag)} - else - filter_matched(event) if matched - end + @esql_executor.process(get_client, event) end # def filter - # public only to be reuse in testing + def decorate(event) + # this Elasticsearch class has access to `filter_matched` + filter_matched(event) + end + + # public only to be reused in testing def prepare_user_agent os_name = java.lang.System.getProperty('os.name') os_version = java.lang.System.getProperty('os.version') @@ -277,18 +248,6 @@ def prepare_user_agent private - # if @target is defined, creates a nested structure to inject result into target field - # if not defined, directly sets to the top-level event field - # @param event [LogStash::Event] - # @param new_key [String] name of the field to set - # @param value_to_set [Array] values to set - # @return [void] - def set_to_event_target(event, new_key, value_to_set) - key_to_set = target ? "[#{target}][#{new_key}]" : new_key - - event.set(key_to_set, value_to_set) - end - def client_options @client_options ||= { :user => @user, @@ -385,53 +344,10 @@ def get_client end end - # get an array of path elements from a path reference - def extract_path(path_reference) - return [path_reference] unless path_reference.start_with?('[') && path_reference.end_with?(']') - - path_reference[1...-1].split('][') - end - - # given a Hash and an array of path fragments, returns the value at the path - # @param source [Hash{String=>Object}] - # @param path [Array{String}] - # @return [Object] - def extract_value(source, path) - path.reduce(source) do |memo, old_key_fragment| - break unless memo.include?(old_key_fragment) - memo[old_key_fragment] - end - end - - # Given a "hits" object from an Elasticsearch response, return the total number of hits in - # the result set. - # @param hits [Hash{String=>Object}] - # @return [Integer] - def extract_total_from_hits(hits) - total = hits['total'] - - # Elasticsearch 7.x produces an object containing `value` and `relation` in order - # to enable unambiguous reporting when the total is only a lower bound; if we get - # an object back, return its `value`. 
-    return total['value'] if total.kind_of?(Hash)
-
-    total
-  end
-
   def hosts_default?(hosts)
     hosts.is_a?(Array) && hosts.size == 1 && !original_params.key?('hosts')
   end
 
-  def validate_query_settings
-    unless @query || @query_template
-      raise LogStash::ConfigurationError, "Both `query` and `query_template` are empty. Require either `query` or `query_template`."
-    end
-
-    if @query && @query_template
-      raise LogStash::ConfigurationError, "Both `query` and `query_template` are set. Use either `query` or `query_template`."
-    end
-  end
-
   def validate_authentication
     authn_options = 0
     authn_options += 1 if @cloud_auth
@@ -560,4 +476,65 @@ def effectively_ssl?
     hosts.all? { |host| host && host.to_s.start_with?("https") }
   end
 
+  def validate_dsl_query_settings!
+    # Load query if it exists
+    if @query_template
+      if File.zero?(@query_template)
+        raise "template is empty"
+      end
+      file = File.open(@query_template, 'r')
+      @query_dsl = file.read
+    end
+
+    validate_query_settings
+  end
+
+  def validate_query_settings
+    unless @query || @query_template
+      raise LogStash::ConfigurationError, "Both `query` and `query_template` are empty. Require either `query` or `query_template`."
+    end
+
+    if @query && @query_template
+      raise LogStash::ConfigurationError, "Both `query` and `query_template` are set. Use either `query` or `query_template`."
+    end
+
+    if original_params.keys.include?("query_params")
+      raise LogStash::ConfigurationError, "`query_params` is not allowed when `query_type => 'dsl'`."
+    end
+  end
+
+  def validate_ls_version_for_esql_support!
+    if Gem::Version.create(LOGSTASH_VERSION) < Gem::Version.create(LS_ESQL_SUPPORT_VERSION)
+      fail("The current version of Logstash does not include an Elasticsearch client that supports ES|QL. Please upgrade Logstash to at least #{LS_ESQL_SUPPORT_VERSION}")
+    end
+  end
+
+  def validate_esql_query_and_params!
+    # If an Array, validate that query_params contains only single-entry hashes, then convert it to a Hash
+    if @query_params.kind_of?(Array)
+      illegal_entries = @query_params.reject {|e| e.kind_of?(Hash) && e.size == 1 }
+      raise LogStash::ConfigurationError, "`query_params` must contain only single-entry hashes. Illegal entries: #{illegal_entries}" if illegal_entries.any?
+
+      @query_params = @query_params.reduce({}, :merge)
+    end
+
+    illegal_keys = @query_params.keys.reject {|k| k[/^[a-z_][a-z0-9_]*$/] }
+    if illegal_keys.any?
+      message = "Illegal #{illegal_keys} placeholder names in `query_params`. A valid parameter name starts with a letter and contains letters, digits and underscores only."
+      raise LogStash::ConfigurationError, message
+    end
+
+    placeholders = @query.scan(/(?<=[?])[a-z_][a-z0-9_]*/i)
+    placeholders.each do |placeholder|
+      raise LogStash::ConfigurationError, "Placeholder `#{placeholder}` is not defined in `query_params`" unless @query_params.include?(placeholder)
+    end
+  end
+
+  def validate_es_for_esql_support!
+    # make sure connected ES supports ES|QL (8.11+)
+    @es_version ||= get_client.es_version
+    es_supports_esql = Gem::Version.create(@es_version) >= Gem::Version.create(ES_ESQL_SUPPORT_VERSION)
+    fail("Connected Elasticsearch #{@es_version} version does not support ES|QL.
ES|QL feature requires at least Elasticsearch #{ES_ESQL_SUPPORT_VERSION} version.") unless es_supports_esql + end + end #class LogStash::Filters::Elasticsearch diff --git a/lib/logstash/filters/elasticsearch/client.rb b/lib/logstash/filters/elasticsearch/client.rb index 120d8e5..3174550 100644 --- a/lib/logstash/filters/elasticsearch/client.rb +++ b/lib/logstash/filters/elasticsearch/client.rb @@ -58,11 +58,19 @@ def initialize(logger, hosts, options = {}) def search(params={}) @client.search(params) end + + def esql_query(params={}) + @client.esql.query(params) + end def info @client.info end + def es_version + info&.dig('version', 'number') + end + def build_flavor @build_flavor ||= info&.dig('version', 'build_flavor') end diff --git a/lib/logstash/filters/elasticsearch/dsl_executor.rb b/lib/logstash/filters/elasticsearch/dsl_executor.rb new file mode 100644 index 0000000..bf92050 --- /dev/null +++ b/lib/logstash/filters/elasticsearch/dsl_executor.rb @@ -0,0 +1,140 @@ +# encoding: utf-8 + +module LogStash + module Filters + class Elasticsearch + class DslExecutor + def initialize(plugin, logger) + @index = plugin.params["index"] + @query = plugin.params["query"] + @query_dsl = plugin.query_dsl + @fields = plugin.params["fields"] + @result_size = plugin.params["result_size"] + @docinfo_fields = plugin.params["docinfo_fields"] + @tag_on_failure = plugin.params["tag_on_failure"] + @enable_sort = plugin.params["enable_sort"] + @sort = plugin.params["sort"] + @aggregation_fields = plugin.params["aggregation_fields"] + @logger = logger + @event_decorator = plugin.method(:decorate) + @target_field = plugin.params["target"] + if @target_field + def self.apply_target(path); "[#{@target_field}][#{path}]"; end + else + def self.apply_target(path); path; end + end + end + + def process(client, event) + matched = false + begin + params = { :index => event.sprintf(@index) } + + if @query_dsl + query = LogStash::Json.load(event.sprintf(@query_dsl)) + params[:body] = query + else + query = event.sprintf(@query) + params[:q] = query + params[:size] = @result_size + params[:sort] = @sort if @enable_sort + end + + @logger.debug("Querying elasticsearch for lookup", :params => params) + + results = client.search(params) + raise "Elasticsearch query error: #{results["_shards"]["failures"]}" if results["_shards"].include? "failures" + + event.set("[@metadata][total_hits]", extract_total_from_hits(results['hits'])) + + result_hits = results["hits"]["hits"] + if !result_hits.nil? && !result_hits.empty? + matched = true + @fields.each do |old_key, new_key| + old_key_path = extract_path(old_key) + extracted_hit_values = result_hits.map do |doc| + extract_value(doc["_source"], old_key_path) + end + value_to_set = extracted_hit_values.count > 1 ? extracted_hit_values : extracted_hit_values.first + set_to_event_target(event, new_key, value_to_set) + end + @docinfo_fields.each do |old_key, new_key| + old_key_path = extract_path(old_key) + extracted_docs_info = result_hits.map do |doc| + extract_value(doc, old_key_path) + end + value_to_set = extracted_docs_info.count > 1 ? extracted_docs_info : extracted_docs_info.first + set_to_event_target(event, new_key, value_to_set) + end + end + + result_aggregations = results["aggregations"] + if !result_aggregations.nil? && !result_aggregations.empty? + matched = true + @aggregation_fields.each do |agg_name, ls_field| + set_to_event_target(event, ls_field, result_aggregations[agg_name]) + end + end + + rescue => e + if @logger.trace? 
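+            # most verbose level: include the query and the full event to help reproduce the failure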
+ @logger.warn("Failed to query elasticsearch for previous event", :index => @index, :query => @query, :event => event.to_hash, :error => e.message, :backtrace => e.backtrace) + elsif @logger.debug? + @logger.warn("Failed to query elasticsearch for previous event", :index => @index, :error => e.message, :backtrace => e.backtrace) + else + @logger.warn("Failed to query elasticsearch for previous event", :index => @index, :error => e.message) + end + @tag_on_failure.each { |tag| event.tag(tag) } + else + @event_decorator.call(event) if matched + end + end + + private + + # Given a "hits" object from an Elasticsearch response, return the total number of hits in + # the result set. + # @param hits [Hash{String=>Object}] + # @return [Integer] + def extract_total_from_hits(hits) + total = hits['total'] + + # Elasticsearch 7.x produces an object containing `value` and `relation` in order + # to enable unambiguous reporting when the total is only a lower bound; if we get + # an object back, return its `value`. + return total['value'] if total.kind_of?(Hash) + total + end + + # get an array of path elements from a path reference + def extract_path(path_reference) + return [path_reference] unless path_reference.start_with?('[') && path_reference.end_with?(']') + + path_reference[1...-1].split('][') + end + + # given a Hash and an array of path fragments, returns the value at the path + # @param source [Hash{String=>Object}] + # @param path [Array{String}] + # @return [Object] + def extract_value(source, path) + path.reduce(source) do |memo, old_key_fragment| + break unless memo.include?(old_key_fragment) + memo[old_key_fragment] + end + end + + # if @target is defined, creates a nested structure to inject result into target field + # if not defined, directly sets to the top-level event field + # @param event [LogStash::Event] + # @param new_key [String] name of the field to set + # @param value_to_set [Array] values to set + # @return [void] + def set_to_event_target(event, new_key, value_to_set) + key_to_set = self.apply_target(new_key) + event.set(key_to_set, value_to_set) + end + end + end + end +end \ No newline at end of file diff --git a/lib/logstash/filters/elasticsearch/esql_executor.rb b/lib/logstash/filters/elasticsearch/esql_executor.rb new file mode 100644 index 0000000..82ca47b --- /dev/null +++ b/lib/logstash/filters/elasticsearch/esql_executor.rb @@ -0,0 +1,178 @@ +# encoding: utf-8 + +module LogStash + module Filters + class Elasticsearch + class EsqlExecutor + + ESQL_PARSERS_BY_TYPE = Hash.new(lambda { |x| x }).merge( + 'date' => ->(value) { value && LogStash::Timestamp.new(value) }, + ) + + def initialize(plugin, logger) + @logger = logger + + @event_decorator = plugin.method(:decorate) + @query = plugin.params["query"] + + query_params = plugin.query_params || {} + reference_valued_params, static_valued_params = query_params.partition { |_, v| v.kind_of?(String) && v.match?(/^\[.*\]$/) } + @referenced_params = reference_valued_params&.to_h + # keep static params as an array of hashes to attach to the ES|QL api param easily + @static_params = static_valued_params.map { |k, v| { k => v } } + @tag_on_failure = plugin.params["tag_on_failure"] + @logger.debug("ES|QL query executor initialized with ", query: @query, query_params: query_params) + + # if the target is specified, all result entries will be copied to the target field + # otherwise, the first value of the result will be copied to the event + @target_field = plugin.params["target"] + @logger.warn("Only first query result will 
be copied to the event. Please set `target` in the plugin config to include all results") if @target_field.nil?
+        end
+
+        def process(client, event)
+          resolved_params = @referenced_params&.any? ? resolve_parameters(event) : []
+          resolved_params.concat(@static_params) if @static_params&.any?
+          response = execute_query(client, resolved_params)
+          inform_warning(response)
+          process_response(event, response)
+          @event_decorator.call(event)
+        rescue => e
+          @logger.error("Failed to process ES|QL filter", exception: e)
+          @tag_on_failure.each { |tag| event.tag(tag) }
+        end
+
+        private
+
+        def resolve_parameters(event)
+          @referenced_params.map do |key, value|
+            begin
+              resolved_value = event.get(value)
+              @logger.debug("Resolved value for #{key}: #{resolved_value}, its class: #{resolved_value.class}")
+              { key => resolved_value }
+            rescue => e
+              # catches an invalid field reference
+              raise "Failed to resolve parameter `#{key}` with `#{value}`. Error: #{e.message}"
+            end
+          end
+        end
+
+        def execute_query(client, params)
+          # debug logs may help to check what query shape the plugin is sending to ES
+          @logger.debug("Executing ES|QL query", query: @query, params: params)
+          client.esql_query({ body: { query: @query, params: params }, format: 'json', drop_null_columns: true })
+        end
+
+        def process_response(event, response)
+          columns = response['columns']&.freeze || []
+          values = response['values']&.freeze || []
+          if values.nil? || values.size == 0
+            @logger.debug("Empty ES|QL query result", columns: columns, values: values)
+            return
+          end
+
+          # this shouldn't happen, but handle it just in case to avoid crashing the plugin
+          if columns.nil? || columns.size == 0
+            @logger.error("No columns exist but received values", columns: columns, values: values)
+            return
+          end
+
+          event.set("[@metadata][total_values]", values.size)
+          @logger.debug("ES|QL query result values size", size: values.size)
+
+          column_specs = columns.map { |column| ColumnSpec.new(column) }
+          sub_element_mark_map = mark_sub_elements(column_specs)
+          multi_fields = sub_element_mark_map.filter_map { |key, val| key.name if val == true }
+
+          @logger.debug("Multi-fields found in the ES|QL result; they will not be available in the event. Please use the `RENAME` command if you want to include them.", { :detected_multi_fields => multi_fields }) if multi_fields.any?
+
+          if @target_field
+            values_to_set = values.map do |row|
+              mapped_data = column_specs.each_with_index.with_object({}) do |(column, index), mapped_data|
+                # the `nil?` check complements `drop_null_columns`: a column is only dropped when it is `nil` in every row,
+                # so `nil` values still appear for individual rows; we filter them out here to achieve a full
+                # `drop_null_columns` on each individual row (the ideal `LIMIT 1` result)
+                # we also exclude sub-elements of the base field
+                if row[index] && sub_element_mark_map[column] == false
+                  value_to_set = ESQL_PARSERS_BY_TYPE[column.type].call(row[index])
+                  mapped_data[column.name] = value_to_set
+                end
+              end
+              generate_nested_structure(mapped_data) unless mapped_data.empty?
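+              # each row ends up as a nested hash, e.g. columns ["status.code", "status.desc"] with
+              # values [200, "Success"] become { "status" => { "code" => 200, "desc" => "Success" } }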
+ end + event.set("[#{@target_field}]", values_to_set) + else + column_specs.zip(values.first).each do |(column, value) | + if value && sub_element_mark_map[column] == false + value_to_set = ESQL_PARSERS_BY_TYPE[column.type].call(value) + event.set(column.field_reference, value_to_set) + end + end + end + end + + def inform_warning(response) + return unless (warning = response&.headers&.dig('warning')) + @logger.warn("ES|QL executor received warning", { message: warning }) + end + + # Transforms dotted keys to nested JSON shape + # @param dot_keyed_hash [Hash] whose keys are dotted (example 'a.b.c.d': 'val') + # @return [Hash] whose keys are nested with value mapped ({'a':{'b':{'c':{'d':'val'}}}}) + def generate_nested_structure(dot_keyed_hash) + dot_keyed_hash.each_with_object({}) do |(key, value), result| + key_parts = key.to_s.split('.') + *path, leaf = key_parts + leaf_scope = path.inject(result) { |scope, part| scope[part] ||= {} } + leaf_scope[leaf] = value + end + end + + # Determines whether each column in a collection is a nested sub-element (e.g "user.age") + # of another column in the same collection (e.g "user"). + # + # @param columns [Array] An array of objects with a `name` attribute representing field paths. + # @return [Hash] A hash mapping each column to `true` if it is a sub-element of another field, `false` otherwise. + # Time complexity: (O(NlogN+N*K)) where K is the number of conflict depth + # without (`prefix_set`) memoization, it would be O(N^2) + def mark_sub_elements(columns) + # Sort columns by name length (ascending) + sorted_columns = columns.sort_by { |c| c.name.length } + prefix_set = Set.new # memoization set + + sorted_columns.each_with_object({}) do |column, memo| + # Split the column name into parts (e.g., "user.profile.age" → ["user", "profile", "age"]) + parts = column.name.split('.') + + # Generate all possible parent prefixes (e.g., "user", "user.profile") + # and check if any parent prefix exists in the set + parent_prefixes = (0...parts.size - 1).map { |i| parts[0..i].join('.') } + memo[column] = parent_prefixes.any? { |prefix| prefix_set.include?(prefix) } + prefix_set.add(column.name) + end + end + end + + # Class representing a column specification in the ESQL response['columns'] + # The class's main purpose is to provide a structure for the event key + # columns is an array with `name` and `type` pair (example: `{"name"=>"@timestamp", "type"=>"date"}`) + # @attr_reader :name [String] The name of the column + # @attr_reader :type [String] The type of the column + class ColumnSpec + attr_reader :name, :type + + def initialize(spec) + @name = isolate(spec.fetch('name')) + @type = isolate(spec.fetch('type')) + end + + def field_reference + @_field_reference ||= '[' + name.gsub('.', '][') + ']' + end + + private + def isolate(value) + value.frozen? ? value : value.clone.freeze + end + end + end + end +end diff --git a/logstash-filter-elasticsearch.gemspec b/logstash-filter-elasticsearch.gemspec index 190901e..bd3e502 100644 --- a/logstash-filter-elasticsearch.gemspec +++ b/logstash-filter-elasticsearch.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-filter-elasticsearch' - s.version = '3.18.0' + s.version = '3.19.0' s.licenses = ['Apache License (2.0)'] s.summary = "Copies fields from previous log events in Elasticsearch to current events " s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. 
This gem is not a stand-alone program" diff --git a/spec/filters/elasticsearch_dsl_spec.rb b/spec/filters/elasticsearch_dsl_spec.rb new file mode 100644 index 0000000..2885caa --- /dev/null +++ b/spec/filters/elasticsearch_dsl_spec.rb @@ -0,0 +1,372 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require "logstash/filters/elasticsearch" + +describe LogStash::Filters::Elasticsearch::DslExecutor do + let(:client) { instance_double(LogStash::Filters::ElasticsearchClient) } + let(:logger) { double("logger") } + let(:plugin) { LogStash::Filters::Elasticsearch.new(plugin_config) } + let(:plugin_config) do + { + "index" => "test_index", + "query" => "test_query", + "fields" => { "field1" => "field1_mapped" }, + "result_size" => 10, + "docinfo_fields" => { "_id" => "doc_id" }, + "tag_on_failure" => ["_failure"], + "enable_sort" => true, + "sort" => "@timestamp:desc", + "aggregation_fields" => { "agg1" => "agg1_mapped" } + } + end + let(:dsl_executor) { described_class.new(plugin, logger) } + let(:event) { LogStash::Event.new({}) } + + describe "#initialize" do + it "initializes instance variables correctly" do + expect(dsl_executor.instance_variable_get(:@index)).to eq("test_index") + expect(dsl_executor.instance_variable_get(:@query)).to eq("test_query") + expect(dsl_executor.instance_variable_get(:@query_dsl)).to eq(nil) + expect(dsl_executor.instance_variable_get(:@fields)).to eq({ "field1" => "field1_mapped" }) + expect(dsl_executor.instance_variable_get(:@result_size)).to eq(10) + expect(dsl_executor.instance_variable_get(:@docinfo_fields)).to eq({ "_id" => "doc_id" }) + expect(dsl_executor.instance_variable_get(:@tag_on_failure)).to eq(["_failure"]) + expect(dsl_executor.instance_variable_get(:@enable_sort)).to eq(true) + expect(dsl_executor.instance_variable_get(:@sort)).to eq("@timestamp:desc") + expect(dsl_executor.instance_variable_get(:@aggregation_fields)).to eq({ "agg1" => "agg1_mapped" }) + expect(dsl_executor.instance_variable_get(:@logger)).to eq(logger) + expect(dsl_executor.instance_variable_get(:@event_decorator)).not_to be_nil + end + end + + describe "data fetch" do + let(:plugin_config) do + { + "hosts" => ["localhost:9200"], + "query" => "response: 404", + "fields" => { "response" => "code" }, + "docinfo_fields" => { "_index" => "es_index" }, + "aggregation_fields" => { "bytes_avg" => "bytes_avg_ls_field" } + } + end + + let(:response) do + LogStash::Json.load(File.read(File.join(File.dirname(__FILE__), "fixtures", "request_x_1.json"))) + end + + let(:client) { double(:client) } + + before(:each) do + allow(LogStash::Filters::ElasticsearchClient).to receive(:new).and_return(client) + if defined?(Elastic::Transport) + allow(client).to receive(:es_transport_client_type).and_return('elastic_transport') + else + allow(client).to receive(:es_transport_client_type).and_return('elasticsearch_transport') + end + allow(client).to receive(:search).and_return(response) + allow(plugin).to receive(:test_connection!) 
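+      # stub connectivity and serverless checks so register never calls a real cluster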
+ allow(plugin).to receive(:setup_serverless) + plugin.register + end + + after(:each) do + Thread.current[:filter_elasticsearch_client] = nil + end + + it "should enhance the current event with new data" do + plugin.filter(event) + expect(event.get("code")).to eq(404) + expect(event.get("es_index")).to eq("logstash-2014.08.26") + expect(event.get("bytes_avg_ls_field")["value"]).to eq(294) + end + + it "should receive all necessary params to perform the search" do + expect(client).to receive(:search).with({:q=>"response: 404", :size=>1, :index=>"", :sort=>"@timestamp:desc"}) + plugin.filter(event) + end + + context "when asking to hit specific index" do + + let(:plugin_config) do + { + "index" => "foo*", + "hosts" => ["localhost:9200"], + "query" => "response: 404", + "fields" => { "response" => "code" } + } + end + + it "should receive all necessary params to perform the search" do + expect(client).to receive(:search).with({:q=>"response: 404", :size=>1, :index=>"foo*", :sort=>"@timestamp:desc"}) + plugin.filter(event) + end + end + + context "when asking for more than one result" do + + let(:plugin_config) do + { + "hosts" => ["localhost:9200"], + "query" => "response: 404", + "fields" => { "response" => "code" }, + "result_size" => 10 + } + end + + let(:response) do + LogStash::Json.load(File.read(File.join(File.dirname(__FILE__), "fixtures", "request_x_10.json"))) + end + + it "should enhance the current event with new data" do + plugin.filter(event) + expect(event.get("code")).to eq([404]*10) + end + end + + context 'when Elasticsearch 7.x gives us a totals object instead of an integer' do + let(:plugin_config) do + { + "hosts" => ["localhost:9200"], + "query" => "response: 404", + "fields" => { "response" => "code" }, + "result_size" => 10 + } + end + + let(:response) do + LogStash::Json.load(File.read(File.join(File.dirname(__FILE__), "fixtures", "elasticsearch_7.x_hits_total_as_object.json"))) + end + + it "should enhance the current event with new data" do + plugin.filter(event) + expect(event.get("[@metadata][total_hits]")).to eq(13476) + end + end + + context "if something wrong happen during connection" do + + before(:each) do + allow(LogStash::Filters::ElasticsearchClient).to receive(:new).and_return(client) + allow(client).to receive(:search).and_raise("connection exception") + plugin.register + end + + it "tag the event as something happened, but still deliver it" do + expect(plugin.logger).to receive(:warn) + plugin.filter(event) + expect(event.to_hash["tags"]).to include("_elasticsearch_lookup_failure") + end + end + + # Tagging test for positive results + context "Tagging should occur if query returns results" do + let(:plugin_config) do + { + "index" => "foo*", + "hosts" => ["localhost:9200"], + "query" => "response: 404", + "add_tag" => ["tagged"] + } + end + + let(:response) do + LogStash::Json.load(File.read(File.join(File.dirname(__FILE__), "fixtures", "request_x_10.json"))) + end + + it "should tag the current event if results returned" do + plugin.filter(event) + expect(event.to_hash["tags"]).to include("tagged") + end + end + + context "an aggregation search with size 0 that matches" do + let(:plugin_config) do + { + "index" => "foo*", + "hosts" => ["localhost:9200"], + "query" => "response: 404", + "add_tag" => ["tagged"], + "result_size" => 0, + "aggregation_fields" => { "bytes_avg" => "bytes_avg_ls_field" } + } + end + + let(:response) do + LogStash::Json.load(File.read(File.join(File.dirname(__FILE__), "fixtures", "request_size0_agg.json"))) + end + + it 
"should tag the current event" do + plugin.filter(event) + expect(event.get("tags")).to include("tagged") + expect(event.get("bytes_avg_ls_field")["value"]).to eq(294) + end + end + + # Tagging test for negative results + context "Tagging should not occur if query has no results" do + let(:plugin_config) do + { + "index" => "foo*", + "hosts" => ["localhost:9200"], + "query" => "response: 404", + "add_tag" => ["tagged"] + } + end + + let(:response) do + LogStash::Json.load(File.read(File.join(File.dirname(__FILE__), "fixtures", "request_error.json"))) + end + + it "should not tag the current event" do + plugin.filter(event) + expect(event.to_hash["tags"]).to_not include("tagged") + end + end + context "testing a simple query template" do + let(:plugin_config) do + { + "hosts" => ["localhost:9200"], + "query_template" => File.join(File.dirname(__FILE__), "fixtures", "query_template.json"), + "fields" => { "response" => "code" }, + "result_size" => 1 + } + end + + let(:response) do + LogStash::Json.load(File.read(File.join(File.dirname(__FILE__), "fixtures", "request_x_1.json"))) + end + + it "should enhance the current event with new data" do + plugin.filter(event) + expect(event.get("code")).to eq(404) + end + + end + + context "testing a simple index substitution" do + let(:event) { + LogStash::Event.new( + { + "subst_field" => "subst_value" + } + ) + } + let(:plugin_config) do + { + "index" => "foo_%{subst_field}*", + "hosts" => ["localhost:9200"], + "query" => "response: 404", + "fields" => { "response" => "code" } + } + end + + it "should receive substituted index name" do + expect(client).to receive(:search).with({:q => "response: 404", :size => 1, :index => "foo_subst_value*", :sort => "@timestamp:desc"}) + plugin.filter(event) + end + end + + context "if query result errored but no exception is thrown" do + let(:response) do + LogStash::Json.load(File.read(File.join(File.dirname(__FILE__), "fixtures", "request_error.json"))) + end + + before(:each) do + allow(LogStash::Filters::ElasticsearchClient).to receive(:new).and_return(client) + allow(client).to receive(:search).and_return(response) + plugin.register + end + + it "tag the event as something happened, but still deliver it" do + expect(plugin.logger).to receive(:warn) + plugin.filter(event) + expect(event.to_hash["tags"]).to include("_elasticsearch_lookup_failure") + end + end + + context 'with client-level retries' do + let(:plugin_config) do + super().merge( + "retry_on_failure" => 3, + "retry_on_status" => [500] + ) + end + end + + context "with custom headers" do + let(:plugin_config) do + { + "query" => "*", + "custom_headers" => { "Custom-Header-1" => "Custom Value 1", "Custom-Header-2" => "Custom Value 2" } + } + end + + let(:plugin) { LogStash::Filters::Elasticsearch.new(plugin_config) } + let(:client_double) { double("client") } + let(:transport_double) { double("transport", options: { transport_options: { headers: plugin_config["custom_headers"] } }) } + + before do + allow(plugin).to receive(:get_client).and_return(client_double) + if defined?(Elastic::Transport) + allow(client_double).to receive(:es_transport_client_type).and_return('elastic_transport') + else + allow(client_double).to receive(:es_transport_client_type).and_return('elasticsearch_transport') + end + allow(client_double).to receive(:client).and_return(transport_double) + end + + it "sets custom headers" do + plugin.register + client = plugin.send(:get_client).client + expect(client.options[:transport_options][:headers]).to 
match(hash_including(plugin_config["custom_headers"])) + end + end + + context "if query is on nested field" do + let(:plugin_config) do + { + "hosts" => ["localhost:9200"], + "query" => "response: 404", + "fields" => [ ["[geoip][ip]", "ip_address"] ] + } + end + + it "should enhance the current event with new data" do + plugin.filter(event) + expect(event.get("ip_address")).to eq("66.249.73.185") + end + + end + end + + describe "#set_to_event_target" do + it 'is ready to set to `target`' do + expect(dsl_executor.apply_target("path")).to eq("path") + end + + context "when `@target` is nil, default behavior" do + it "sets the value directly to the top-level event field" do + dsl_executor.send(:set_to_event_target, event, "new_field", %w[value1 value2]) + expect(event.get("new_field")).to eq(%w[value1 value2]) + end + end + + context "when @target is defined" do + let(:plugin_config) { + super().merge({ "target" => "nested" }) + } + + it "creates a nested structure under the target field" do + dsl_executor.send(:set_to_event_target, event, "new_field", %w[value1 value2]) + expect(event.get("nested")).to eq({ "new_field" => %w[value1 value2] }) + end + + it "overwrites existing target field with new data" do + event.set("nested", { "existing_field" => "existing_value", "new_field" => "value0" }) + dsl_executor.send(:set_to_event_target, event, "new_field", ["value1"]) + expect(event.get("nested")).to eq({ "existing_field" => "existing_value", "new_field" => ["value1"] }) + end + end + end + +end diff --git a/spec/filters/elasticsearch_esql_spec.rb b/spec/filters/elasticsearch_esql_spec.rb new file mode 100644 index 0000000..ff102b4 --- /dev/null +++ b/spec/filters/elasticsearch_esql_spec.rb @@ -0,0 +1,211 @@ +# encoding: utf-8 +require "logstash/devutils/rspec/spec_helper" +require "logstash/filters/elasticsearch" + +describe LogStash::Filters::Elasticsearch::EsqlExecutor do + let(:client) { instance_double(LogStash::Filters::ElasticsearchClient) } + let(:logger) { double("logger") } + let(:plugin) { LogStash::Filters::Elasticsearch.new(plugin_config) } + let(:plugin_config) do + { + "query_type" => "esql", + "query" => "FROM test-index | STATS count() BY field | LIMIT 10" + } + end + let(:esql_executor) { described_class.new(plugin, logger) } + + context "when initializes" do + it "sets up the ESQL executor with correct parameters" do + allow(logger).to receive(:debug) + allow(logger).to receive(:warn) + expect(esql_executor.instance_variable_get(:@query)).to eq(plugin_config["query"]) + expect(esql_executor.instance_variable_get(:@referenced_params)).to eq({}) + expect(esql_executor.instance_variable_get(:@static_params)).to eq([]) + expect(esql_executor.instance_variable_get(:@tag_on_failure)).to eq(["_elasticsearch_lookup_failure"]) + end + end + + context "when processes" do + let(:plugin_config) { + super() + .merge( + { + "query" => "FROM my-index | WHERE field = ?foo | LIMIT 5", + "query_params" => { "foo" => "[bar]" } + }) + } + let(:event) { LogStash::Event.new({}) } + let(:response) { + { + 'values' => [["foo", "bar", nil]], + 'columns' => [{ 'name' => 'id', 'type' => 'keyword' }, { 'name' => 'val', 'type' => 'keyword' }, { 'name' => 'odd', 'type' => 'keyword' }] + } + } + + before do + allow(logger).to receive(:debug) + allow(logger).to receive(:warn) + end + + it "resolves parameters" do + expect(event).to receive(:get).with("[bar]").and_return("resolved_value") + resolved_params = esql_executor.send(:resolve_parameters, event) + expect(resolved_params).to include("foo" => 
"resolved_value") + end + + it "executes the query with resolved parameters" do + allow(logger).to receive(:debug) + expect(event).to receive(:get).with("[bar]").and_return("resolved_value") + expect(client).to receive(:esql_query).with( + { body: { query: plugin_config["query"], params: [{ "foo" => "resolved_value" }] }, format: 'json', drop_null_columns: true, }) + resolved_params = esql_executor.send(:resolve_parameters, event) + esql_executor.send(:execute_query, client, resolved_params) + end + + it "informs warning if received warning" do + allow(response).to receive(:headers).and_return({ "warning" => "some warning" }) + expect(logger).to receive(:warn).with("ES|QL executor received warning", { :message => "some warning" }) + esql_executor.send(:inform_warning, response) + end + + it "processes the response and adds metadata" do + expect(event).to receive(:set).with("[@metadata][total_values]", 1) + # [id], [val] aren't resolved via sprintf, use as it is + expect(event).to receive(:set).with("[id]", "foo") + expect(event).to receive(:set).with("[val]", "bar") + esql_executor.send(:process_response, event, response) + end + + it "executes chain of processes" do + allow(plugin).to receive(:decorate) + allow(logger).to receive(:debug) + allow(response).to receive(:headers).and_return({}) + expect(client).to receive(:esql_query).with( + { + body: { query: plugin_config["query"], params: [{"foo"=>"resolve_me"}] }, + format: 'json', + drop_null_columns: true, + }).and_return(response) + + event = LogStash::Event.new({ "hello" => "world", "bar" => "resolve_me" }) + expect { esql_executor.process(client, event) }.to_not raise_error + expect(event.get("[@metadata][total_values]")).to eq(1) + expect(event.get("hello")).to eq("world") + expect(event.get("val")).to eq("bar") + expect(event.get("odd")).to be_nil # filters out non-exist fields + end + + it "tags on plugin failures" do + expect(event).to receive(:get).with("[bar]").and_raise("Event#get Invalid FieldReference error") + + expect(logger).to receive(:error).with("Failed to process ES|QL filter", exception: instance_of(RuntimeError)) + expect(event).to receive(:tag).with("_elasticsearch_lookup_failure") + esql_executor.process(client, event) + end + + it "tags on query execution failures" do + allow(logger).to receive(:debug) + allow(client).to receive(:esql_query).and_raise("Query execution error") + + expect(logger).to receive(:error).with("Failed to process ES|QL filter", exception: instance_of(RuntimeError)) + expect(event).to receive(:tag).with("_elasticsearch_lookup_failure") + esql_executor.process(client, event) + end + + describe "#target" do + let(:event) { LogStash::Event.new({ "hello" => "world", "bar" => "resolve_me" }) } + let(:response) { + super().merge({ 'values' => [["foo", "bar", nil], %w[hello again world], %w[another value here]] }) + } + before(:each) do + expect(client).to receive(:esql_query).with(any_args).and_return(response) + allow(plugin).to receive(:decorate) + allow(logger).to receive(:debug) + allow(response).to receive(:headers).and_return({}) + end + + context "when specified" do + let(:plugin_config) { + super().merge({ "target" => "my-target" }) + } + + it "sets all query results into event" do + expected_result = [ + {"id"=>"foo", "val"=>"bar"}, + {"id"=>"hello", "val"=>"again", "odd"=>"world"}, + {"id"=>"another", "val"=>"value", "odd"=>"here"} + ] + expect { esql_executor.process(client, event) }.to_not raise_error + expect(event.get("[@metadata][total_values]")).to eq(3) + 
expect(event.get("my-target").size).to eq(3) + expect(event.get("my-target")).to eq(expected_result) + end + end + + context "when not specified" do + shared_examples "first result into the event" do + it "sets" do + expect { esql_executor.process(client, event) }.to_not raise_error + expect(event.get("[@metadata][total_values]")).to eq(3) + expect(event.get("id")).to eq("foo") + expect(event.get("val")).to eq("bar") + expect(event.get("odd")).to eq(nil) + end + end + context "when limit is included in the query" do + let(:plugin_config) { + super().merge({ "query" => "FROM my-index | LIMIT 555" }) + } + it_behaves_like "first result into the event" + end + + context "when limit is not included in the query" do + let(:plugin_config) { + super().merge({ "query" => "FROM my-index" }) + } + it_behaves_like "first result into the event" + end + end + end + end + + describe "#query placeholders" do + before(:each) do + allow(logger).to receive(:debug) + allow(logger).to receive(:warn) + plugin.send(:validate_esql_query_and_params!) + end + + context "when `query_params` is an Array contains {key => val} entries" do + let(:plugin_config) { + super() + .merge( + { + "query" => "FROM my-index | LIMIT 1", + "query_params" => [{ "a" => "b" }, { "c" => "[b]" }, { "e" => 1 }, { "f" => "[g]" }], + }) + } + + it "separates references and static params at initialization" do + expect(esql_executor.instance_variable_get(:@referenced_params)).to eq({"c" => "[b]", "f" => "[g]"}) + expect(esql_executor.instance_variable_get(:@static_params)).to eq([{"a" => "b"}, {"e" => 1}]) + end + end + + context "when `query_params` is a Hash" do + let(:plugin_config) { + super() + .merge( + { + "query" => "FROM my-index | LIMIT 1", + "query_params" => { "a" => "b", "c" => "[b]", "e" => 1, "f" => "[g]" }, + }) + } + + it "separates references and static params at initialization" do + expect(esql_executor.instance_variable_get(:@referenced_params)).to eq({"c" => "[b]", "f" => "[g]"}) + expect(esql_executor.instance_variable_get(:@static_params)).to eq([{"a" => "b"}, {"e" => 1}]) + end + end + end +end if LOGSTASH_VERSION >= LogStash::Filters::Elasticsearch::LS_ESQL_SUPPORT_VERSION \ No newline at end of file diff --git a/spec/filters/elasticsearch_spec.rb b/spec/filters/elasticsearch_spec.rb index 3e1f6f2..4c1deb4 100644 --- a/spec/filters/elasticsearch_spec.rb +++ b/spec/filters/elasticsearch_spec.rb @@ -61,7 +61,7 @@ allow(filter_client).to receive(:serverless?).and_return(true) allow(filter_client).to receive(:client).and_return(es_client) - if elastic_ruby_v8_client_available? + if defined?(Elastic::Transport) allow(es_client).to receive(:info) .with(a_hash_including( :headers => LogStash::Filters::ElasticsearchClient::DEFAULT_EAV_HEADER)) @@ -93,306 +93,6 @@ end end - describe "data fetch" do - let(:config) do - { - "hosts" => ["localhost:9200"], - "query" => "response: 404", - "fields" => { "response" => "code" }, - "docinfo_fields" => { "_index" => "es_index" }, - "aggregation_fields" => { "bytes_avg" => "bytes_avg_ls_field" } - } - end - - let(:response) do - LogStash::Json.load(File.read(File.join(File.dirname(__FILE__), "fixtures", "request_x_1.json"))) - end - - let(:client) { double(:client) } - - before(:each) do - allow(LogStash::Filters::ElasticsearchClient).to receive(:new).and_return(client) - if elastic_ruby_v8_client_available? 
-        allow(client).to receive(:es_transport_client_type).and_return('elastic_transport')
-      else
-        allow(client).to receive(:es_transport_client_type).and_return('elasticsearch_transport')
-      end
-      allow(client).to receive(:search).and_return(response)
-      allow(plugin).to receive(:test_connection!)
-      allow(plugin).to receive(:setup_serverless)
-      plugin.register
-    end
-
-    after(:each) do
-      Thread.current[:filter_elasticsearch_client] = nil
-    end
-
-    it "should enhance the current event with new data" do
-      plugin.filter(event)
-      expect(event.get("code")).to eq(404)
-      expect(event.get("es_index")).to eq("logstash-2014.08.26")
-      expect(event.get("bytes_avg_ls_field")["value"]).to eq(294)
-    end
-
-    it "should receive all necessary params to perform the search" do
-      expect(client).to receive(:search).with({:q=>"response: 404", :size=>1, :index=>"", :sort=>"@timestamp:desc"})
-      plugin.filter(event)
-    end
-
-    context "when asking to hit specific index" do
-
-      let(:config) do
-        {
-          "index" => "foo*",
-          "hosts" => ["localhost:9200"],
-          "query" => "response: 404",
-          "fields" => { "response" => "code" }
-        }
-      end
-
-      it "should receive all necessary params to perform the search" do
-        expect(client).to receive(:search).with({:q=>"response: 404", :size=>1, :index=>"foo*", :sort=>"@timestamp:desc"})
-        plugin.filter(event)
-      end
-    end
-
-    context "when asking for more than one result" do
-
-      let(:config) do
-        {
-          "hosts" => ["localhost:9200"],
-          "query" => "response: 404",
-          "fields" => { "response" => "code" },
-          "result_size" => 10
-        }
-      end
-
-      let(:response) do
-        LogStash::Json.load(File.read(File.join(File.dirname(__FILE__), "fixtures", "request_x_10.json")))
-      end
-
-      it "should enhance the current event with new data" do
-        plugin.filter(event)
-        expect(event.get("code")).to eq([404]*10)
-      end
-    end
-
-    context 'when Elasticsearch 7.x gives us a totals object instead of an integer' do
-      let(:config) do
-        {
-          "hosts" => ["localhost:9200"],
-          "query" => "response: 404",
-          "fields" => { "response" => "code" },
-          "result_size" => 10
-        }
-      end
-
-      let(:response) do
-        LogStash::Json.load(File.read(File.join(File.dirname(__FILE__), "fixtures", "elasticsearch_7.x_hits_total_as_object.json")))
-      end
-
-      it "should enhance the current event with new data" do
-        plugin.filter(event)
-        expect(event.get("[@metadata][total_hits]")).to eq(13476)
-      end
-    end
-
-    context "if something wrong happen during connection" do
-
-      before(:each) do
-        allow(LogStash::Filters::ElasticsearchClient).to receive(:new).and_return(client)
-        allow(client).to receive(:search).and_raise("connection exception")
-        plugin.register
-      end
-
-      it "tag the event as something happened, but still deliver it" do
-        expect(plugin.logger).to receive(:warn)
-        plugin.filter(event)
-        expect(event.to_hash["tags"]).to include("_elasticsearch_lookup_failure")
-      end
-    end
-
-    # Tagging test for positive results
-    context "Tagging should occur if query returns results" do
-      let(:config) do
-        {
-          "index" => "foo*",
-          "hosts" => ["localhost:9200"],
-          "query" => "response: 404",
-          "add_tag" => ["tagged"]
-        }
-      end
-
-      let(:response) do
-        LogStash::Json.load(File.read(File.join(File.dirname(__FILE__), "fixtures", "request_x_10.json")))
-      end
-
-      it "should tag the current event if results returned" do
-        plugin.filter(event)
-        expect(event.to_hash["tags"]).to include("tagged")
-      end
-    end
-
-    context "an aggregation search with size 0 that matches" do
-      let(:config) do
-        {
-          "index" => "foo*",
-          "hosts" => ["localhost:9200"],
-          "query" => "response: 404",
-          "add_tag" => ["tagged"],
-          "result_size" => 0,
-          "aggregation_fields" => { "bytes_avg" => "bytes_avg_ls_field" }
-        }
-      end
-
-      let(:response) do
-        LogStash::Json.load(File.read(File.join(File.dirname(__FILE__), "fixtures", "request_size0_agg.json")))
-      end
-
-      it "should tag the current event" do
-        plugin.filter(event)
-        expect(event.get("tags")).to include("tagged")
-        expect(event.get("bytes_avg_ls_field")["value"]).to eq(294)
-      end
-    end
-
-    # Tagging test for negative results
-    context "Tagging should not occur if query has no results" do
-      let(:config) do
-        {
-          "index" => "foo*",
-          "hosts" => ["localhost:9200"],
-          "query" => "response: 404",
-          "add_tag" => ["tagged"]
-        }
-      end
-
-      let(:response) do
-        LogStash::Json.load(File.read(File.join(File.dirname(__FILE__), "fixtures", "request_error.json")))
-      end
-
-      it "should not tag the current event" do
-        plugin.filter(event)
-        expect(event.to_hash["tags"]).to_not include("tagged")
-      end
-    end
-    context "testing a simple query template" do
-      let(:config) do
-        {
-          "hosts" => ["localhost:9200"],
-          "query_template" => File.join(File.dirname(__FILE__), "fixtures", "query_template.json"),
-          "fields" => { "response" => "code" },
-          "result_size" => 1
-        }
-      end
-
-      let(:response) do
-        LogStash::Json.load(File.read(File.join(File.dirname(__FILE__), "fixtures", "request_x_1.json")))
-      end
-
-      it "should enhance the current event with new data" do
-        plugin.filter(event)
-        expect(event.get("code")).to eq(404)
-      end
-
-    end
-
-    context "testing a simple index substitution" do
-      let(:event) {
-        LogStash::Event.new(
-          {
-            "subst_field" => "subst_value"
-          }
-        )
-      }
-      let(:config) do
-        {
-          "index" => "foo_%{subst_field}*",
-          "hosts" => ["localhost:9200"],
-          "query" => "response: 404",
-          "fields" => { "response" => "code" }
-        }
-      end
-
-      it "should receive substituted index name" do
-        expect(client).to receive(:search).with({:q => "response: 404", :size => 1, :index => "foo_subst_value*", :sort => "@timestamp:desc"})
-        plugin.filter(event)
-      end
-    end
-
-    context "if query result errored but no exception is thrown" do
-      let(:response) do
-        LogStash::Json.load(File.read(File.join(File.dirname(__FILE__), "fixtures", "request_error.json")))
-      end
-
-      before(:each) do
-        allow(LogStash::Filters::ElasticsearchClient).to receive(:new).and_return(client)
-        allow(client).to receive(:search).and_return(response)
-        plugin.register
-      end
-
-      it "tag the event as something happened, but still deliver it" do
-        expect(plugin.logger).to receive(:warn)
-        plugin.filter(event)
-        expect(event.to_hash["tags"]).to include("_elasticsearch_lookup_failure")
-      end
-    end
-
-    context 'with client-level retries' do
-      let(:config) do
-        super().merge(
-          "retry_on_failure" => 3,
-          "retry_on_status" => [500]
-        )
-      end
-    end
-
-    context "with custom headers" do
-      let(:config) do
-        {
-          "query" => "*",
-          "custom_headers" => { "Custom-Header-1" => "Custom Value 1", "Custom-Header-2" => "Custom Value 2" }
-        }
-      end
-
-      let(:plugin) { LogStash::Filters::Elasticsearch.new(config) }
-      let(:client_double) { double("client") }
-      let(:transport_double) { double("transport", options: { transport_options: { headers: config["custom_headers"] } }) }
-
-      before do
-        allow(plugin).to receive(:get_client).and_return(client_double)
-        if elastic_ruby_v8_client_available?
-          allow(client_double).to receive(:es_transport_client_type).and_return('elastic_transport')
-        else
-          allow(client_double).to receive(:es_transport_client_type).and_return('elasticsearch_transport')
-        end
-        allow(client_double).to receive(:client).and_return(transport_double)
-      end
-
-      it "sets custom headers" do
-        plugin.register
-        client = plugin.send(:get_client).client
-        expect(client.options[:transport_options][:headers]).to match(hash_including(config["custom_headers"]))
-      end
-    end
-
-    context "if query is on nested field" do
-      let(:config) do
-        {
-          "hosts" => ["localhost:9200"],
-          "query" => "response: 404",
-          "fields" => [ ["[geoip][ip]", "ip_address"] ]
-        }
-      end
-
-      it "should enhance the current event with new data" do
-        plugin.filter(event)
-        expect(event.get("ip_address")).to eq("66.249.73.185")
-      end
-
-    end
-  end
-
 class StoppableServer
 
   attr_reader :port
@@ -525,7 +225,7 @@ def wait_receive_request
       # this spec is a safeguard to trigger an assessment of thread-safety should
       # we choose a different transport adapter in the future.
       transport_class = extract_transport(client).options.fetch(:transport_class)
-      if elastic_ruby_v8_client_available?
+      if defined?(Elastic::Transport)
        allow(client).to receive(:es_transport_client_type).and_return("elastic_transport")
        expect(transport_class).to equal ::Elastic::Transport::Transport::HTTP::Manticore
      else
@@ -845,7 +545,7 @@ def wait_receive_request
     before(:each) do
       allow(LogStash::Filters::ElasticsearchClient).to receive(:new).and_return(client)
-      if elastic_ruby_v8_client_available?
+      if defined?(Elastic::Transport)
         allow(client).to receive(:es_transport_client_type).and_return('elastic_transport')
       else
         allow(client).to receive(:es_transport_client_type).and_return('elasticsearch_transport')
       end
@@ -864,31 +564,141 @@ def wait_receive_request
     end
   end
 
-  describe "#set_to_event_target" do
+  describe "ES|QL" do
 
-    context "when `@target` is nil, default behavior" do
-      let(:config) {{ }}
+    describe "compatibility" do
+      let(:config) {{ "hosts" => ["localhost:9200"], "query_type" => "esql", "query" => "FROM my-index" }}
 
-      it "sets the value directly to the top-level event field" do
-        plugin.send(:set_to_event_target, event, "new_field", %w[value1 value2])
-        expect(event.get("new_field")).to eq(%w[value1 value2])
+      context "when LS doesn't support ES|QL" do
+        let(:ls_version) { LogStash::Filters::Elasticsearch::LS_ESQL_SUPPORT_VERSION }
+        before(:each) do
+          stub_const("LOGSTASH_VERSION", "8.17.0")
+        end
+
+        it "raises a runtime error" do
+          expect { plugin.send(:validate_ls_version_for_esql_support!) }
+            .to raise_error(RuntimeError, /Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least #{ls_version}/)
+        end
       end
-    end
 
-    context "when @target is defined" do
-      let(:config) {{ "target" => "nested" }}
+      context "when ES doesn't support ES|QL" do
+        let(:es_version) { LogStash::Filters::Elasticsearch::ES_ESQL_SUPPORT_VERSION }
+        let(:client) { double(:client) }
+
+        it "raises a runtime error" do
+          allow(plugin).to receive(:get_client).twice.and_return(client)
+          allow(client).to receive(:es_version).and_return("8.8.0")
 
-      it "creates a nested structure under the target field" do
-        plugin.send(:set_to_event_target, event, "new_field", %w[value1 value2])
-        expect(event.get("nested")).to eq({ "new_field" => %w[value1 value2] })
+          expect { plugin.send(:validate_es_for_esql_support!) }
+            .to raise_error(RuntimeError, /Connected Elasticsearch 8.8.0 version does not supports ES|QL. ES|QL feature requires at least Elasticsearch #{es_version} version./)
+        end
       end
+    end
 
-      it "overwrites existing target field with new data" do
-        event.set("nested", { "existing_field" => "existing_value", "new_field" => "value0" })
-        plugin.send(:set_to_event_target, event, "new_field", ["value1"])
-        expect(event.get("nested")).to eq({ "existing_field" => "existing_value", "new_field" => ["value1"] })
+    context "when non-ES|QL params are applied" do
+      let(:config) do
+        {
+          "hosts" => ["localhost:9200"],
+          "query_type" => "esql",
+          "query" => "FROM my-index",
+          "index" => "some-index",
+          "docinfo_fields" => { "_index" => "es_index" },
+          "sort" => "@timestamp:desc",
+          "enable_sort" => true,
+          "aggregation_fields" => { "bytes_avg" => "bytes_avg_ls_field" }
+        }
+      end
+      it "raises a config error" do
+        invalid_params_with_esql = %w(index docinfo_fields sort enable_sort aggregation_fields)
+        error_text = /Configured #{invalid_params_with_esql} params cannot be used with ES|QL query/i
+        expect { plugin.register }.to raise_error LogStash::ConfigurationError, error_text
       end
     end
+
+    describe "#query placeholder" do
+      let(:config) do
+        {
+          "hosts" => ["localhost:9200"],
+          "query_type" => "esql"
+        }
+      end
+
+      context "when query placeholder doesn't exist in the query" do
+        let(:config) {
+          super()
+            .merge(
+              {
+                "query" => "FROM my-index",
+                "query_params" => { "a" => "b" },
+              })
+        }
+
+        it "doesn't complain because placeholders are unused" do
+          expect { plugin.send(:validate_esql_query_and_params!) }.not_to raise_error
+        end
+      end
+
+      context "when illegal placeholders appear" do
+        let(:config) {
+          super()
+            .merge(
+              {
+                "query" => "FROM my-index | WHERE type = ?type",
+                "query_params" => { "1abcd_efg1" => "1", "$abcd_efg1" => 2, "type" => 3 },
+              })
+        }
+        it "raises a config error" do
+          message = 'Illegal ["1abcd_efg1", "$abcd_efg1"] placeholder names in `query_params`. A valid parameter name starts with a letter and contains letters, digits and underscores only;'
+          expect { plugin.register }.to raise_error LogStash::ConfigurationError, message
+        end
+      end
+
+      context "when query placeholders and `query_params` do not match" do
+        let(:config) {
+          super()
+            .merge(
+              {
+                "query" => "FROM my-index | WHERE type = ?type",
+                "query_params" => {"b" => "c"},
+              })
+        }
+        it "raises a config error" do
+          expect { plugin.register }.to raise_error LogStash::ConfigurationError, /Placeholder type not found in query/
+        end
+      end
+
+      context "when `query_params` is an Array containing {key => val} entries" do
+        let(:config) {
+          super()
+            .merge(
+              {
+                "query" => "FROM my-index",
+                "query_params" => [{ "a" => "b" }, { "c" => "[b]" }, { "e" => 1 }, { "f" => "[g]" }],
+              })
        }
+
+        it "doesn't complain because placeholders are unused" do
+          expect { plugin.send(:validate_esql_query_and_params!) }.not_to raise_error
+          expect(plugin.query_params).to eq({ "a" => "b", "c" => "[b]", "e" => 1, "f" => "[g]" })
+        end
+      end
+
+      context "when `query_params` is a Hash" do
+        let(:config) {
+          super()
+            .merge(
+              {
+                "query" => "FROM my-index",
+                "query_params" => { "a" => "b", "c" => "[b]", "e" => 1, "f" => "[g]" },
+              })
+        }
+
+        it "doesn't complain because placeholders are unused" do
+          expect { plugin.send(:validate_esql_query_and_params!) }.not_to raise_error
+          expect(plugin.query_params).to eq({ "a" => "b", "c" => "[b]", "e" => 1, "f" => "[g]" })
+        end
+      end
+    end if LOGSTASH_VERSION >= '8.17.4'
   end
 
   def extract_transport(client)
@@ -897,13 +707,6 @@
     client.transport.respond_to?(:transport) ? client.transport.transport : client.transport
   end
 
-  def elastic_ruby_v8_client_available?
-    Elasticsearch::Transport
-    false
-  rescue NameError # NameError: uninitialized constant Elasticsearch::Transport if Elastic Ruby client is not available
-    true
-  end
-
   class MockResponse
     attr_reader :code, :headers
diff --git a/spec/filters/integration/elasticsearch_esql_spec.rb b/spec/filters/integration/elasticsearch_esql_spec.rb
new file mode 100644
index 0000000..32da05f
--- /dev/null
+++ b/spec/filters/integration/elasticsearch_esql_spec.rb
@@ -0,0 +1,167 @@
+# encoding: utf-8
+require "logstash/devutils/rspec/spec_helper"
+require "logstash/filters/elasticsearch"
+require "elasticsearch"
+require_relative "../../../spec/es_helper"
+
+describe LogStash::Filters::Elasticsearch, integration: true do
+
+  ELASTIC_SECURITY_ENABLED = ENV['ELASTIC_SECURITY_ENABLED'].eql? 'true'
+  SECURE_INTEGRATION = ENV['SECURE_INTEGRATION'].eql? 'true'
+  ES_HOSTS = ["http#{SECURE_INTEGRATION ? 's' : nil}://#{ESHelper.get_host_port}"]
+  CA_PATH = File.expand_path('../fixtures/test_certs/ca.crt', File.dirname(__FILE__))
+
+  let(:plugin) { described_class.new(config) }
+  let(:es_index) { "es-filter-plugin-esql-integration-#{rand(1000)}" }
+  let(:test_documents) do
+    [
+      { "message" => "test message 1", "type" => "a", "count" => 1 },
+      { "message" => "test message 2", "type" => "a", "count" => 2 },
+      { "message" => "test message 3", "type" => "b", "count" => 3 },
+      { "message" => "test message 4", "type" => "b", "count" => 4 },
+      { "message" => "test message 5", "type" => "c", "count" => 5 },
+      { "message" => "odd test message", "type" => "t" }
+    ]
+  end
+
+  let(:base_config) do
+    {
+      "query_type" => "esql",
+      "hosts" => ES_HOSTS,
+      "ssl_enabled" => SECURE_INTEGRATION
+    }
+  end
+
+  let(:credentials) do
+    if SECURE_INTEGRATION
+      { 'user' => 'tests', 'password' => 'Tests123' }
+    else
+      { 'user' => 'elastic', 'password' => ENV['ELASTIC_PASSWORD'] }
+    end
+  end
+
+  let(:config) do
+    config = ELASTIC_SECURITY_ENABLED ? base_config.merge(credentials) : base_config
+    config = { 'ssl_certificate_authorities' => CA_PATH }.merge(config) if SECURE_INTEGRATION
+    config
+  end
+
+  let(:event) { LogStash::Event.new({}) }
+
+  def es_client
+    @es_client ||= begin
+      user = SECURE_INTEGRATION ? 'tests' : 'elastic'
+      password = SECURE_INTEGRATION ? 'Tests123' : ENV['ELASTIC_PASSWORD']
+
+      es_client_config = { hosts: ES_HOSTS }
+      es_client_config = es_client_config.merge({ user: user, password: password }) if ELASTIC_SECURITY_ENABLED || SECURE_INTEGRATION
+      es_client_config = es_client_config.merge({ transport_options: { ssl: { ca_path: CA_PATH, verify: false }}}) if SECURE_INTEGRATION
+
+      Elasticsearch::Client.new(es_client_config)
+    end
+  end
+
+  before(:all) do
+    is_ls_with_esql_supported_client = Gem::Version.create(LOGSTASH_VERSION) >= Gem::Version.create(LogStash::Filters::Elasticsearch::LS_ESQL_SUPPORT_VERSION)
+    # Skip tests if the LS version doesn't bundle an ES client with ES|QL support
+    skip "LS version does not have ES client which supports ES|QL" unless is_ls_with_esql_supported_client
+
+    es_version_info = es_client.info["version"]
+    es_gem_version = Gem::Version.create(es_version_info["number"])
+    skip "ES version does not support ES|QL" if es_gem_version.nil? || es_gem_version < Gem::Version.create(LogStash::Filters::Elasticsearch::ES_ESQL_SUPPORT_VERSION)
+  end
+
+  before(:each) do
+    # Create the index with test documents
+    es_client.indices.create(index: es_index, body: {}) unless es_client.indices.exists?(index: es_index)
+
+    test_documents.each do |doc|
+      es_client.index(index: es_index, body: doc, refresh: true)
+    end
+  end
+
+  after(:each) do
+    es_client.indices.delete(index: es_index) if es_client.indices.exists?(index: es_index)
+  end
+
+  describe "run ES|QL queries" do
+
+    before do
+      stub_const("LOGSTASH_VERSION", LogStash::Filters::Elasticsearch::LS_ESQL_SUPPORT_VERSION)
+    end
+
+    before(:each) do
+      plugin.register
+    end
+
+    shared_examples "ESQL query execution" do |expected_count, fields|
+      it "processes the event" do
+        plugin.filter(event)
+        expect(event.get("[@metadata][total_values]")).to eq(expected_count)
+        fields&.each do |field|
+          expect(event.get(field)).not_to be(nil)
+        end
+      end
+    end
+
+    describe "with simple FROM query with LIMIT" do
+      let(:config) do
+        super().merge("query" => "FROM #{es_index} | LIMIT 99")
+      end
+
+      include_examples "ESQL query execution", 6
+    end
+
+    describe "with simple FROM and WHERE query combinations" do
+      let(:config) do
+        super().merge("query" => "FROM #{es_index} | WHERE type==\"b\" | LIMIT 99")
+      end
+
+      include_examples "ESQL query execution", 2
+    end
+
+    describe "with query params" do
+      let(:config) do
+        super().merge("query" => "FROM #{es_index} | WHERE type==?type", "query_params" => { "type" => "b" })
+      end
+
+      include_examples "ESQL query execution", 2
+    end
+
+    describe "when an invalid query is used" do
+      let(:config) do
+        super().merge("query" => "FROM undefined index | LIMIT 1")
+      end
+
+      it "tags on failure" do
+        plugin.filter(event)
+        expect(event.to_hash["tags"]).to include("_elasticsearch_lookup_failure")
+      end
+    end
+
+    describe "when field enrichment is requested" do
+      let(:config) do
+        super().merge("query" => "FROM #{es_index} | WHERE type==\"b\" | LIMIT 99")
+      end
+
+      include_examples "ESQL query execution", 2, %w[message count]
+    end
+
+    describe "when a non-existent field value appears" do
+      let(:config) do
+        super().merge("query" => "FROM #{es_index}", "target" => "target_field")
+      end
+
+      it "processes the event" do
+        plugin.filter(event)
+        expect(event.get("[@metadata][total_values]")).to eq(6)
+        expect(event.get("target_field").size).to eq(6)
+        values = event.get("target_field")
+        counts = values.count { |entry| entry.key?("count") }
+        messages = values.count { |entry| entry.key?("message") }
+        expect(counts).to eq(5)
+        expect(messages).to eq(6)
+      end
+    end
+  end
+end
\ No newline at end of file
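
The unit specs in this patch pin down how the ES|QL executor treats `query_params` at initialization: entries whose values are Logstash field references are resolved against each event, while literal values are forwarded unchanged as static named parameters. The snippet below is a minimal sketch of that partitioning for illustration only; it is not the plugin's actual implementation, and the square-bracket pattern test for field references is an assumption:

    # Illustrative sketch (not the plugin's implementation) of the partitioning
    # the specs assert: values that look like Logstash field references
    # (e.g. "[b]") are resolved per event at query time, while everything else
    # is sent to ES|QL as a static named parameter.
    query_params = { "a" => "b", "c" => "[b]", "e" => 1, "f" => "[g]" }

    referenced, static = query_params.partition do |_name, value|
      # assumed heuristic: a String wrapped in square brackets is a field reference
      value.is_a?(String) && value.match?(/\A\[.*\]\z/)
    end

    referenced_params = referenced.to_h                                # => {"c"=>"[b]", "f"=>"[g]"}
    static_params     = static.map { |name, value| { name => value } } # => [{"a"=>"b"}, {"e"=>1}]

Note that the Array and Hash forms of `query_params` normalize to the same structure, which is why the specs expect identical partitions for both shapes.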