/lib/fluent/plugin/out_opensearch.rb
# SPDX-License-Identifier: Apache-2.0
#
# The fluent-plugin-opensearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright fluent-plugin-opensearch Contributors. See
# GitHub history for details.
#
# Licensed to Uken Inc. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Uken Inc. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

require 'date'
require 'excon'
require 'opensearch'
require 'set'
require 'json'
require 'uri'
require 'base64'
begin
  require 'strptime'
rescue LoadError
end
require 'resolv'

require 'fluent/plugin/output'
require 'fluent/event'
require 'fluent/error'
require 'fluent/time'
require 'fluent/unique_id'
require 'fluent/log-ext'
require 'zlib'
require_relative 'opensearch_constants'
require_relative 'opensearch_error'
require_relative 'opensearch_error_handler'
require_relative 'opensearch_index_template'
require_relative 'opensearch_tls'
require_relative 'opensearch_fallback_selector'
begin
  require_relative 'oj_serializer'
rescue LoadError
end
require 'aws-sdk-core'
require 'faraday_middleware/aws_sigv4'

module Fluent::Plugin
  class OpenSearchOutput < Output
    class RecoverableRequestFailure < StandardError; end
    class UnrecoverableRequestFailure < Fluent::UnrecoverableError; end
    class RetryStreamEmitFailure < StandardError; end

    # MissingIdFieldError is raised for records that do not
    # include the field for the unique record identifier
    class MissingIdFieldError < StandardError; end

    # RetryStreamError provides a stream to be
    # put back in the pipeline for cases where a bulk request
    # failed (e.g. some records succeeded while others failed)
    class RetryStreamError < StandardError
      attr_reader :retry_stream
      def initialize(retry_stream)
        @retry_stream = retry_stream
      end
    end

    RequestInfo = Struct.new(:host, :index, :target_index, :alias)

    attr_reader :template_names
    attr_reader :ssl_version_options
    attr_reader :compressable_connection

    helpers :event_emitter, :compat_parameters, :record_accessor, :timer

    Fluent::Plugin.register_output('opensearch', self)

    DEFAULT_BUFFER_TYPE = "memory"
    DEFAULT_OPENSEARCH_VERSION = 1
    DEFAULT_TYPE_NAME = "_doc".freeze
    DEFAULT_RELOAD_AFTER = -1
    DEFAULT_TARGET_BULK_BYTES = -1
    DEFAULT_POLICY_ID = "logstash-policy"

    config_param :host, :string,  :default => 'localhost'
    config_param :port, :integer, :default => 9200
    config_param :user, :string, :default => nil
    config_param :password, :string, :default => nil, :secret => true
    config_param :path, :string, :default => nil
    config_param :scheme, :enum, :list => [:https, :http], :default => :http
    config_param :hosts, :string, :default => nil
    config_param :target_index_key, :string, :default => nil
    config_param :time_key_format, :string, :default => nil
    config_param :time_precision, :integer, :default => 9
    config_param :include_timestamp, :bool, :default => false
    config_param :logstash_format, :bool, :default => false
    config_param :logstash_prefix, :string, :default => "logstash"
    config_param :logstash_prefix_separator, :string, :default => '-'
    config_param :logstash_dateformat, :string, :default => "%Y.%m.%d"
    config_param :utc_index, :bool, :default => true
    config_param :suppress_type_name, :bool, :default => false
    config_param :index_name, :string, :default => "fluentd"
    config_param :id_key, :string, :default => nil
    config_param :write_operation, :string, :default => "index"
    config_param :parent_key, :string, :default => nil
    config_param :routing_key, :string, :default => nil
    config_param :request_timeout, :time, :default => 5
    config_param :reload_connections, :bool, :default => true
    config_param :reload_on_failure, :bool, :default => false
    config_param :retry_tag, :string, :default => nil
    config_param :resurrect_after, :time, :default => 60
    config_param :time_key, :string, :default => nil
    config_param :time_key_exclude_timestamp, :bool, :default => false
    config_param :ssl_verify, :bool, :default => true
    config_param :client_key, :string, :default => nil
    config_param :client_cert, :string, :default => nil
    config_param :client_key_pass, :string, :default => nil, :secret => true
    config_param :ca_file, :string, :default => nil
    config_param :remove_keys, :string, :default => nil
    config_param :remove_keys_on_update, :string, :default => ""
    config_param :remove_keys_on_update_key, :string, :default => nil
    config_param :flatten_hashes, :bool, :default => false
    config_param :flatten_hashes_separator, :string, :default => "_"
    config_param :template_name, :string, :default => nil
    config_param :template_file, :string, :default => nil
    config_param :template_overwrite, :bool, :default => false
    config_param :customize_template, :hash, :default => nil
    config_param :index_date_pattern, :string, :default => "now/d"
    config_param :index_separator, :string, :default => "-"
    config_param :application_name, :string, :default => "default"
    config_param :templates, :hash, :default => nil
    config_param :max_retry_putting_template, :integer, :default => 10
    config_param :fail_on_putting_template_retry_exceed, :bool, :default => true
    config_param :fail_on_detecting_os_version_retry_exceed, :bool, :default => true
    config_param :max_retry_get_os_version, :integer, :default => 15
    config_param :include_tag_key, :bool, :default => false
    config_param :tag_key, :string, :default => 'tag'
    config_param :time_parse_error_tag, :string, :default => 'opensearch_plugin.output.time.error'
    config_param :reconnect_on_error, :bool, :default => false
    config_param :pipeline, :string, :default => nil
    config_param :with_transporter_log, :bool, :default => false
    config_param :emit_error_for_missing_id, :bool, :default => false
    config_param :sniffer_class_name, :string, :default => nil
    config_param :selector_class_name, :string, :default => nil
    config_param :reload_after, :integer, :default => DEFAULT_RELOAD_AFTER
    config_param :include_index_in_url, :bool, :default => false
    config_param :http_backend, :enum, list: [:excon, :typhoeus], :default => :excon
    config_param :http_backend_excon_nonblock, :bool, :default => true
    config_param :validate_client_version, :bool, :default => false
    config_param :prefer_oj_serializer, :bool, :default => false
    config_param :unrecoverable_error_types, :array, :default => ["out_of_memory_error", "rejected_execution_exception"]
    config_param :unrecoverable_record_types, :array, :default => ["json_parse_exception"]
    config_param :emit_error_label_event, :bool, :default => true
    config_param :verify_os_version_at_startup, :bool, :default => true
    config_param :default_opensearch_version, :integer, :default => DEFAULT_OPENSEARCH_VERSION
    config_param :log_os_400_reason, :bool, :default => false
    config_param :custom_headers, :hash, :default => {}
    config_param :suppress_doc_wrap, :bool, :default => false
    config_param :ignore_exceptions, :array, :default => [], value_type: :string, :desc => "Ignorable exception list"
    config_param :exception_backup, :bool, :default => true, :desc => "Chunk backup flag when an ignored exception occurred"
    config_param :bulk_message_request_threshold, :size, :default => DEFAULT_TARGET_BULK_BYTES
    config_param :compression_level, :enum, list: [:no_compression, :best_speed, :best_compression, :default_compression], :default => :no_compression
    config_param :truncate_caches_interval, :time, :default => nil
    config_param :use_legacy_template, :bool, :default => true
    config_param :catch_transport_exception_on_retry, :bool, :default => true
    config_param :target_index_affinity, :bool, :default => false

    config_section :metadata, param_name: :metainfo, multi: false do
      config_param :include_chunk_id, :bool, :default => false
      config_param :chunk_id_key, :string, :default => "chunk_id".freeze
    end

    config_section :endpoint, multi: false do
      config_param :region, :string
      config_param :url do |c|
        c.chomp("/")
      end
      config_param :access_key_id, :string, :default => ""
      config_param :secret_access_key, :string, :default => "", secret: true
      config_param :assume_role_arn, :string, :default => nil
      config_param :ecs_container_credentials_relative_uri, :string, :default => nil # Set with the AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variable value
      config_param :assume_role_session_name, :string, :default => "fluentd"
      config_param :assume_role_web_identity_token_file, :string, :default => nil
      config_param :sts_credentials_region, :string, :default => nil
      config_param :refresh_credentials_interval, :time, :default => "5h"
      config_param :aws_service_name, :enum, list: [:es, :aoss], :default => :es
    end

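    # A minimal <endpoint> configuration sketch (illustrative, assumed
    # values; url and region are the required parameters, credentials are
    # resolved by aws_credentials below):
    #
    #   <endpoint>
    #     url https://search-mydomain.us-east-1.es.amazonaws.com
    #     region us-east-1
    #   </endpoint>
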
    config_section :buffer do
      config_set_default :@type, DEFAULT_BUFFER_TYPE
      config_set_default :chunk_keys, ['tag']
      config_set_default :timekey_use_utc, true
    end

    include Fluent::OpenSearchIndexTemplate
    include Fluent::Plugin::OpenSearchConstants
    include Fluent::Plugin::OpenSearchTLS

    def initialize
      super
    end

    ######################################################################################################
    # This AWS credentials creation code is heavily based on fluent-plugin-aws-elasticsearch-service:
    # https://github.com/atomita/fluent-plugin-aws-elasticsearch-service/blob/master/lib/fluent/plugin/out_aws-elasticsearch-service.rb#L73-L134
    ######################################################################################################
    def aws_credentials(conf)
      credentials = nil
      unless conf[:access_key_id].empty? || conf[:secret_access_key].empty?
        credentials = Aws::Credentials.new(conf[:access_key_id], conf[:secret_access_key])
      else
        if conf[:assume_role_arn].nil?
          aws_container_credentials_relative_uri = conf[:ecs_container_credentials_relative_uri] || ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"]
          if aws_container_credentials_relative_uri.nil?
            credentials = Aws::SharedCredentials.new({retries: 2}).credentials rescue nil
            credentials ||= Aws::InstanceProfileCredentials.new.credentials rescue nil
            credentials ||= Aws::ECSCredentials.new.credentials
          else
            credentials = Aws::ECSCredentials.new({
                            credential_path: aws_container_credentials_relative_uri
                          }).credentials
          end
        else
          if conf[:assume_role_web_identity_token_file].nil?
            credentials = Aws::AssumeRoleCredentials.new({
                            role_arn: conf[:assume_role_arn],
                            role_session_name: conf[:assume_role_session_name],
                            region: sts_creds_region(conf)
                          }).credentials
          else
            credentials = Aws::AssumeRoleWebIdentityCredentials.new({
                            role_arn: conf[:assume_role_arn],
                            web_identity_token_file: conf[:assume_role_web_identity_token_file],
                            region: sts_creds_region(conf)
                          }).credentials
          end
        end
      end
      raise "No valid AWS credentials found." unless credentials.set?

      credentials
    end

    def sts_creds_region(conf)
      conf[:sts_credentials_region] || conf[:region]
    end
    ###############################
    # End of the AWS credential part.
    ###############################

    def configure(conf)
      compat_parameters_convert(conf, :buffer)

      super

      if @endpoint
        # Here we override the default value of reload_connections to false, because
        # AWS Elasticsearch Service doesn't return the addresses of its nodes and the
        # Elasticsearch client fails to reload connections properly. This ends up with
        # the "temporarily failed to flush the buffer" error repeating forever. See
        # this discussion for details:
        # https://discuss.elastic.co/t/elasitcsearch-ruby-raises-cannot-get-new-connection-from-pool-error/36252
        @reload_connections = false
      end

      if placeholder_substitution_needed_for_template?
        # nop.
      elsif not @buffer_config.chunk_keys.include? "tag" and
        not @buffer_config.chunk_keys.include? "_index"
        raise Fluent::ConfigError, "'tag' or '_index' in chunk_keys is required."
      end
      @time_parser = create_time_parser
      @backend_options = backend_options
      @ssl_version_options = set_tls_minmax_version_config(@ssl_version, @ssl_max_version, @ssl_min_version)

      if @remove_keys
        @remove_keys = @remove_keys.split(/\s*,\s*/)
      end

      if @target_index_key && @target_index_key.is_a?(String)
        @target_index_key = @target_index_key.split '.'
      end

      if @remove_keys_on_update && @remove_keys_on_update.is_a?(String)
        @remove_keys_on_update = @remove_keys_on_update.split ','
      end

      raise Fluent::ConfigError, "'max_retry_putting_template' must be greater than or equal to zero." if @max_retry_putting_template < 0
      raise Fluent::ConfigError, "'max_retry_get_os_version' must be greater than or equal to zero." if @max_retry_get_os_version < 0

      # Dump a log message when using host placeholders and template features at the same time.
      valid_host_placeholder = placeholder?(:host_placeholder, @host)
      if valid_host_placeholder && (@template_name && @template_file || @templates)
        if @verify_os_version_at_startup
          raise Fluent::ConfigError, "Host placeholders, template installation, and verifying the OpenSearch version at startup are mutually exclusive features. Please set verify_os_version_at_startup to `false` when host placeholders and template installation are enabled."
        end
        log.info "Using host placeholders together with template installation may slow down your OpenSearch cluster a bit (beta)."
      end

      @template_names = []
      if !dry_run?
        if @template_name && @template_file
          if @logstash_format || placeholder_substitution_needed_for_template?
            class << self
              alias_method :template_installation, :template_installation_actual
            end
          else
            template_installation_actual(@template_name, @customize_template, @application_name, @index_name)
          end
        end
        if @templates
          retry_operate(@max_retry_putting_template,
                        @fail_on_putting_template_retry_exceed,
                        @catch_transport_exception_on_retry) do
            templates_hash_install(@templates, @template_overwrite)
          end
        end
      end

      @truncate_mutex = Mutex.new
      if @truncate_caches_interval
        timer_execute(:out_opensearch_truncate_caches, @truncate_caches_interval) do
          log.info('Clean up the indices and template names cache')

          @truncate_mutex.synchronize {
            @template_names.clear
          }
        end
      end
      # If AWS credentials are set, expire the credential information forcibly before it expires.
      @credential_mutex = Mutex.new
      if @endpoint
        @_aws_credentials = aws_credentials(@endpoint)

        if @endpoint.refresh_credentials_interval
          timer_execute(:out_opensearch_expire_credentials, @endpoint.refresh_credentials_interval) do
            log.debug('Recreate the AWS credentials')

            @credential_mutex.synchronize do
              @_os = nil
              @_aws_credentials = aws_credentials(@endpoint)
            end
          end
        end
      end

      @serializer_class = nil
      begin
        require 'oj'
        @dump_proc = Oj.method(:dump)
        if @prefer_oj_serializer
          @serializer_class = Fluent::Plugin::Serializer::Oj
          OpenSearch::API.settings[:serializer] = Fluent::Plugin::Serializer::Oj
        end
      rescue LoadError
        @dump_proc = Yajl.method(:dump)
      end

      raise Fluent::ConfigError, "`password` must be present if `user` is present" if @user && @password.nil?

      if @user && m = @user.match(/%{(?<user>.*)}/)
        @user = URI.encode_www_form_component(m["user"])
      end
      if @password && m = @password.match(/%{(?<password>.*)}/)
        @password = URI.encode_www_form_component(m["password"])
      end

      @transport_logger = nil
      if @with_transporter_log
        @transport_logger = log
        log_level = conf['@log_level'] || conf['log_level']
        log.warn "Consider specifying the log level with @log_level." unless log_level
      end
      # Specify @sniffer_class before calling #client.
      # #detect_os_major_version uses #client.
      @sniffer_class = nil
      begin
        @sniffer_class = Object.const_get(@sniffer_class_name) if @sniffer_class_name
      rescue Exception => ex
        raise Fluent::ConfigError, "Could not load sniffer class #{@sniffer_class_name}: #{ex}"
      end

      @selector_class = nil
      begin
        @selector_class = Object.const_get(@selector_class_name) if @selector_class_name
      rescue Exception => ex
        raise Fluent::ConfigError, "Could not load selector class #{@selector_class_name}: #{ex}"
      end

      @last_seen_major_version = if major_version = handle_last_seen_os_major_version
                                   major_version
                                 else
                                   @default_opensearch_version
                                 end

      if @validate_client_version && !dry_run?
        if @last_seen_major_version != client_library_version.to_i
          raise Fluent::ConfigError, <<-EOC
            Detected OpenSearch #{@last_seen_major_version} but you use OpenSearch client #{client_library_version}.
            Please consider using the #{@last_seen_major_version}.x series OpenSearch client.
          EOC
        end
      end

      if @last_seen_major_version >= 1
        case @ssl_version
        when :SSLv23, :TLSv1, :TLSv1_1
          if @scheme == :https
            log.warn "Detected OpenSearch 1.x or above with an insecure TLS version enabled:
                      You might have to specify `ssl_version TLSv1_2` in the configuration."
          end
        end
      end

      if @ssl_version && @scheme == :https
        if !@http_backend_excon_nonblock
          log.warn "The TLS handshake may get stuck with a blocking connection.
                    Consider setting `http_backend_excon_nonblock` to true."
        end
      end

      # Allow for a missing "$." prefix in nested key specifiers.
      @id_key = convert_compat_id_key(@id_key) if @id_key
      @parent_key = convert_compat_id_key(@parent_key) if @parent_key
      @routing_key = convert_compat_id_key(@routing_key) if @routing_key

      @routing_key_name = configure_routing_key_name
      @meta_config_map = create_meta_config_map
      @current_config = nil
      @compressable_connection = false

      @ignore_exception_classes = @ignore_exceptions.map do |exception|
        unless Object.const_defined?(exception)
          log.warn "Cannot find class #{exception}. Will ignore it."

          nil
        else
          Object.const_get(exception)
        end
      end.compact

      if @bulk_message_request_threshold < 0
        class << self
          alias_method :split_request?, :split_request_size_uncheck?
        end
      else
        class << self
          alias_method :split_request?, :split_request_size_check?
        end
      end
    end

    def dry_run?
      if Fluent::Engine.respond_to?(:dry_run_mode)
        Fluent::Engine.dry_run_mode
      elsif Fluent::Engine.respond_to?(:supervisor_mode)
        Fluent::Engine.supervisor_mode
      end
    end

    def placeholder?(name, param)
      placeholder_validities = []
      placeholder_validators(name, param).each do |v|
        begin
          v.validate!
          placeholder_validities << true
        rescue Fluent::ConfigError => e
          log.debug("'#{name} #{param}' was tested against the built-in placeholder(s) but no valid placeholder was found. error: #{e}")
          placeholder_validities << false
        end
      end
      placeholder_validities.include?(true)
    end

    def emit_error_label_event?
      !!@emit_error_label_event
    end

    def compression
      !(@compression_level == :no_compression)
    end

    def compression_strategy
      case @compression_level
      when :default_compression
        Zlib::DEFAULT_COMPRESSION
      when :best_compression
        Zlib::BEST_COMPRESSION
      when :best_speed
        Zlib::BEST_SPEED
      else
        Zlib::NO_COMPRESSION
      end
    end

    def backend_options
      case @http_backend
      when :excon
        { client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass, nonblock: @http_backend_excon_nonblock }
      when :typhoeus
        require 'typhoeus'
        { sslkey: @client_key, sslcert: @client_cert, keypasswd: @client_key_pass }
      end
    rescue LoadError => ex
      log.error_backtrace(ex.backtrace)
      raise Fluent::ConfigError, "You must install the #{@http_backend} gem. Exception: #{ex}"
    end

    def handle_last_seen_os_major_version
      if @verify_os_version_at_startup && !dry_run?
        retry_operate(@max_retry_get_os_version,
                      @fail_on_detecting_os_version_retry_exceed,
                      @catch_transport_exception_on_retry) do
          detect_os_major_version
        end
      else
        nil
      end
    end

    def detect_os_major_version
      @_os_info ||= client.info
      begin
        unless version = @_os_info.dig("version", "number")
          version = @default_opensearch_version
        end
      rescue NoMethodError => e
        log.warn "Could not dig the version information out of #{@_os_info}. Assuming OpenSearch #{@default_opensearch_version}", error: e
        version = @default_opensearch_version
      end
      version.to_i
    end

    def client_library_version
      OpenSearch::VERSION
    end

    def configure_routing_key_name
      'routing'.freeze
    end

    def convert_compat_id_key(key)
      if key.include?('.') && !key.start_with?('$[')
        key = "$.#{key}" unless key.start_with?('$.')
      end
      key
    end

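    # Illustrative examples (assumed values) for convert_compat_id_key above:
    #   convert_compat_id_key("user.name")   # => "$.user.name"
    #   convert_compat_id_key("$.user.name") # => "$.user.name" (unchanged)
    #   convert_compat_id_key("plain_key")   # => "plain_key"   (no dot, unchanged)
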
    def create_meta_config_map
      result = []
      result << [record_accessor_create(@id_key), '_id'] if @id_key
      result << [record_accessor_create(@parent_key), '_parent'] if @parent_key
      result << [record_accessor_create(@routing_key), @routing_key_name] if @routing_key
      result
    end

    # Once fluentd v0.14 is released we might be able to use
    # Fluent::Parser::TimeParser, but it doesn't quite do what we want - it gives
    # [sec,nsec] whereas we want something we can call `strftime` on...
    def create_time_parser
      if @time_key_format
        begin
          # Strptime doesn't support all formats, but for those it does
          # support, it's blazingly fast.
          strptime = Strptime.new(@time_key_format)
          Proc.new { |value|
            value = convert_numeric_time_into_string(value, @time_key_format) if value.is_a?(Numeric)
            strptime.exec(value).to_datetime
          }
        rescue
          # Can happen if Strptime doesn't recognize the format, or
          # if strptime couldn't be required (because it's not installed -- it's
          # Ruby 2 only).
          Proc.new { |value|
            value = convert_numeric_time_into_string(value, @time_key_format) if value.is_a?(Numeric)
            DateTime.strptime(value, @time_key_format)
          }
        end
      else
        Proc.new { |value|
          value = convert_numeric_time_into_string(value) if value.is_a?(Numeric)
          DateTime.parse(value)
        }
      end
    end

    def convert_numeric_time_into_string(numeric_time, time_key_format = "%Y-%m-%d %H:%M:%S.%N%z")
      numeric_time_parser = Fluent::NumericTimeParser.new(:float)
      Time.at(numeric_time_parser.parse(numeric_time).to_r).strftime(time_key_format)
    end

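    # Illustrative example for convert_numeric_time_into_string (assumed
    # epoch value; the rendered zone depends on the local TZ):
    #   convert_numeric_time_into_string(1660000000.5)
    #   # => "2022-08-08 23:06:40.500000000+0000" when TZ is UTC
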
    def parse_time(value, event_time, tag)
      @time_parser.call(value)
    rescue => e
      if emit_error_label_event?
        router.emit_error_event(@time_parse_error_tag, Fluent::Engine.now, {'tag' => tag, 'time' => event_time, 'format' => @time_key_format, 'value' => value}, e)
      end
      return Time.at(event_time).to_datetime
    end

    def client(host = nil, compress_connection = false)
      # Check here to see if we already have a client connection for the given host.
      connection_options = get_connection_options(host)

      @_os = nil unless is_existing_connection(connection_options[:hosts])
      @_os = nil unless @compressable_connection == compress_connection

      @_os ||= begin
        @compressable_connection = compress_connection
        @current_config = connection_options[:hosts].clone
        adapter_conf = if @endpoint
                         lambda do |f|
                           f.request(
                             :aws_sigv4,
                             service: @endpoint.aws_service_name.to_s,
                             region: @endpoint.region,
                             credentials: @_aws_credentials,
                           )

                           f.adapter @http_backend, @backend_options
                         end
                       else
                         lambda {|f| f.adapter @http_backend, @backend_options }
                       end

        local_reload_connections = @reload_connections
        if local_reload_connections && @reload_after > DEFAULT_RELOAD_AFTER
          local_reload_connections = @reload_after
        end

        gzip_headers = if compress_connection
                         {'Content-Encoding' => 'gzip'}
                       else
                         {}
                       end
        headers = {}.merge(@custom_headers)
                    .merge(gzip_headers)
        ssl_options = { verify: @ssl_verify, ca_file: @ca_file }.merge(@ssl_version_options)

        transport = OpenSearch::Transport::Transport::HTTP::Faraday.new(connection_options.merge(
                                                                            options: {
                                                                              reload_connections: local_reload_connections,
                                                                              reload_on_failure: @reload_on_failure,
                                                                              resurrect_after: @resurrect_after,
                                                                              logger: @transport_logger,
                                                                              transport_options: {
                                                                                headers: headers,
                                                                                request: { timeout: @request_timeout },
                                                                                ssl: ssl_options,
                                                                              },
                                                                              http: {
                                                                                user: @user,
                                                                                password: @password,
                                                                                scheme: @scheme
                                                                              },
                                                                              sniffer_class: @sniffer_class,
                                                                              serializer_class: @serializer_class,
                                                                              selector_class: @selector_class,
                                                                              compression: compress_connection,
                                                                            }), &adapter_conf)
        OpenSearch::Client.new transport: transport
      end
    end

    def get_escaped_userinfo(host_str)
      if m = host_str.match(/(?<scheme>.*)%{(?<user>.*)}:%{(?<password>.*)}(?<path>@.*)/)
        m["scheme"] +
          URI.encode_www_form_component(m["user"]) +
          ':' +
          URI.encode_www_form_component(m["password"]) +
          m["path"]
      else
        host_str
      end
    end

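    # Illustrative example (assumed values): reserved characters inside the
    # %{...} userinfo are form-encoded so URI() can parse the host string:
    #   get_escaped_userinfo("https://%{j+hn}:%{passw@rd}@logs.example.com")
    #   # => "https://j%2Bhn:passw%40rd@logs.example.com"
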
    def get_connection_options(con_host=nil)

      hosts = if @endpoint # For AWS OpenSearch Service
        uri = URI(@endpoint.url)
        host = %w(user password path).inject(host: uri.host, port: uri.port, scheme: uri.scheme) do |hash, key|
          hash[key.to_sym] = uri.public_send(key) unless uri.public_send(key).nil? || uri.public_send(key) == ''
          hash
        end
        [host]
      elsif con_host || @hosts
        (con_host || @hosts).split(',').map do |host_str|
          # Support legacy hosts format host:port,host:port,host:port...
          if host_str.match(%r{^[^:]+(\:\d+)?$})
            {
              host:   host_str.split(':')[0],
              port:   (host_str.split(':')[1] || @port).to_i,
              scheme: @scheme.to_s
            }
          else
            # New hosts format expects URLs such as http://logs.foo.com,https://john:pass@logs2.foo.com/elastic
            uri = URI(get_escaped_userinfo(host_str))
            %w(user password path).inject(host: uri.host, port: uri.port, scheme: uri.scheme) do |hash, key|
              hash[key.to_sym] = uri.public_send(key) unless uri.public_send(key).nil? || uri.public_send(key) == ''
              hash
            end
          end
        end.compact
      else
        if Resolv::IPv6::Regex.match(@host)
          [{host: "[#{@host}]", scheme: @scheme.to_s, port: @port}]
        else
          [{host: @host, port: @port, scheme: @scheme.to_s}]
        end
      end.each do |host|
        host.merge!(user: @user, password: @password) if !host[:user] && @user
        host.merge!(path: @path) if !host[:path] && @path
      end

      {
        hosts: hosts
      }
    end

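    # Illustrative example (assumed values, default settings with no user or
    # path configured): the legacy "host:port" list normalizes to the hosts
    # structure below; full URLs such as https://john:pass@logs2.foo.com/elastic
    # go through the URI branch instead:
    #   get_connection_options("os1:9200,os2:9201")
    #   # => {hosts: [{host: "os1", port: 9200, scheme: "http"},
    #   #             {host: "os2", port: 9201, scheme: "http"}]}
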
    def connection_options_description(con_host=nil)
      get_connection_options(con_host)[:hosts].map do |host_info|
        attributes = host_info.dup
        attributes[:password] = 'obfuscated' if attributes.has_key?(:password)
        attributes.inspect
      end.join(', ')
    end

    # append_record_to_messages adds a record to the bulk message
    # payload to be submitted to OpenSearch.  Records that do
    # not include an '_id' field are skipped when 'write_operation'
    # is configured for 'create' or 'update'.
    #
    # Returns 'true' if the record was appended to the bulk message
    # and 'false' otherwise.
    def append_record_to_messages(op, meta, header, record, msgs)
      case op
      when UPDATE_OP, UPSERT_OP
        if meta.has_key?(ID_FIELD)
          header[UPDATE_OP] = meta
          msgs << @dump_proc.call(header) << BODY_DELIMITER
          msgs << @dump_proc.call(update_body(record, op)) << BODY_DELIMITER
          return true
        end
      when CREATE_OP
        if meta.has_key?(ID_FIELD)
          header[CREATE_OP] = meta
          msgs << @dump_proc.call(header) << BODY_DELIMITER
          msgs << @dump_proc.call(record) << BODY_DELIMITER
          return true
        end
      when INDEX_OP
        header[INDEX_OP] = meta
        msgs << @dump_proc.call(header) << BODY_DELIMITER
        msgs << @dump_proc.call(record) << BODY_DELIMITER
        return true
      end
      return false
    end

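    # Illustrative sketch (assumed meta and record values): for the default
    # "index" operation, two newline-delimited JSON lines are appended to the
    # bulk body per record, e.g.
    #   {"index":{"_index":"fluentd","_type":"_doc"}}
    #   {"message":"hello"}
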
    def update_body(record, op)
      update = remove_keys(record)
      if @suppress_doc_wrap
        return update
      end
      body = {"doc".freeze => update}
      if op == UPSERT_OP
        if update == record
          body["doc_as_upsert".freeze] = true
        else
          body[UPSERT_OP] = record
        end
      end
      body
    end

    def remove_keys(record)
      keys = record[@remove_keys_on_update_key] || @remove_keys_on_update || []
      record.delete(@remove_keys_on_update_key)
      return record unless keys.any?
      record = record.dup
      keys.each { |key| record.delete(key) }
      record
    end

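    # Illustrative example (assumed record, no remove_keys_on_update configured):
    #   update_body({"msg" => "hi"}, UPSERT_OP)
    #   # => {"doc" => {"msg" => "hi"}, "doc_as_upsert" => true}
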
    def flatten_record(record, prefix=[])
      ret = {}
      if record.is_a? Hash
        record.each { |key, value|
          ret.merge! flatten_record(value, prefix + [key.to_s])
        }
      elsif record.is_a? Array
        # Don't mess with arrays, leave them unprocessed.
        ret.merge!({prefix.join(@flatten_hashes_separator) => record})
      else
        return {prefix.join(@flatten_hashes_separator) => record}
      end
      ret
    end

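    # Illustrative example (with the default flatten_hashes_separator "_"):
    #   flatten_record({"user" => {"name" => "alice", "ids" => [1, 2]}})
    #   # => {"user_name" => "alice", "user_ids" => [1, 2]}
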
    def expand_placeholders(chunk)
      logstash_prefix = extract_placeholders(@logstash_prefix, chunk)
      logstash_dateformat = extract_placeholders(@logstash_dateformat, chunk)
      index_name = extract_placeholders(@index_name, chunk)
      if @template_name
        template_name = extract_placeholders(@template_name, chunk)
      else
        template_name = nil
      end
      if @customize_template
        customize_template = @customize_template.each_with_object({}) { |(key, value), hash| hash[key] = extract_placeholders(value, chunk) }
      else
        customize_template = nil
      end
      if @application_name
        application_name = extract_placeholders(@application_name, chunk)
      else
        application_name = nil
      end
      if @pipeline
        pipeline = extract_placeholders(@pipeline, chunk)
      else
        pipeline = nil
      end
      return logstash_prefix, logstash_dateformat, index_name, template_name, customize_template, application_name, pipeline
    end

    def multi_workers_ready?
      true
    end

    def inject_chunk_id_to_record_if_needed(record, chunk_id)
      if @metainfo&.include_chunk_id
        record[@metainfo.chunk_id_key] = chunk_id
        record
      else
        record
      end
    end

    def write(chunk)
      bulk_message_count = Hash.new { |h,k| h[k] = 0 }
      bulk_message = Hash.new { |h,k| h[k] = '' }
      header = {}
      meta = {}

      tag = chunk.metadata.tag
      chunk_id = dump_unique_id_hex(chunk.unique_id)
      extracted_values = expand_placeholders(chunk)
      host = if @hosts
               extract_placeholders(@hosts, chunk)
             else
               extract_placeholders(@host, chunk)
             end

      affinity_target_indices = get_affinity_target_indices(chunk)
      chunk.msgpack_each do |time, record|
        next unless record.is_a? Hash

        record = inject_chunk_id_to_record_if_needed(record, chunk_id)

        begin
          meta, header, record = process_message(tag, meta, header, time, record, affinity_target_indices, extracted_values)
          info = if @include_index_in_url
                   RequestInfo.new(host, meta.delete("_index".freeze), meta["_index".freeze], meta.delete("_alias".freeze))
                 else
                   RequestInfo.new(host, nil, meta["_index".freeze], meta.delete("_alias".freeze))
                 end

          if split_request?(bulk_message, info)
            bulk_message.each do |info, msgs|
              send_bulk(msgs, tag, chunk, bulk_message_count[info], extracted_values, info) unless msgs.empty?
              msgs.clear
              # Clear bulk_message_count for this info.
              bulk_message_count[info] = 0
              next
            end
          end

          if append_record_to_messages(@write_operation, meta, header, record, bulk_message[info])
            bulk_message_count[info] += 1
          else
            if @emit_error_for_missing_id
              raise MissingIdFieldError, "Missing '_id' field. Write operation is #{@write_operation}"
            else
              log.on_debug { log.debug("Dropping record because it's missing an '_id' field and write_operation is #{@write_operation}: #{record}") }
            end
          end
        rescue => e
          if emit_error_label_event?
            router.emit_error_event(tag, time, record, e)
          end
        end
      end

      bulk_message.each do |info, msgs|
        send_bulk(msgs, tag, chunk, bulk_message_count[info], extracted_values, info) unless msgs.empty?
        msgs.clear
      end
    end

    def target_index_affinity_enabled?()
      @target_index_affinity && @logstash_format && @id_key && (@write_operation == UPDATE_OP || @write_operation == UPSERT_OP)
    end

    def get_affinity_target_indices(chunk)
      indices = Hash.new
      if target_index_affinity_enabled?()
        id_key_accessor = record_accessor_create(@id_key)
        ids = Set.new
        chunk.msgpack_each do |time, record|
          next unless record.is_a? Hash
          begin
            ids << id_key_accessor.call(record)
          end
        end
        log.debug("Finding affinity target_indices by querying OpenSearch (write_operation #{@write_operation}) for ids: #{ids.to_a}")
        options = {
          :index => "#{logstash_prefix}#{@logstash_prefix_separator}*",
        }
        query = {
          'query' => { 'ids' => { 'values' => ids.to_a } },
          '_source' => false,
          'sort' => [
            {"_index" => {"order" => "desc"}}
          ]
        }
        result = client.search(options.merge(:body => Yajl.dump(query)))
        # There should be just one hit per _id, but in case there are still
        # multiple, just the oldest index is stored in the map.
        result['hits']['hits'].each do |hit|
          indices[hit["_id"]] = hit["_index"]
          log.debug("target_index for id: #{hit["_id"]} from es: #{hit["_index"]}")
        end
      end
      indices
    end

    def split_request?(bulk_message, info)
      # For safety.
    end

    def split_request_size_check?(bulk_message, info)
      bulk_message[info].size > @bulk_message_request_threshold
    end

    def split_request_size_uncheck?(bulk_message, info)
      false
    end

    def process_message(tag, meta, header, time, record, affinity_target_indices, extracted_values)
      logstash_prefix, logstash_dateformat, index_name, _template_name, _customize_template, application_name, pipeline = extracted_values

      if @flatten_hashes
        record = flatten_record(record)
      end

      dt = nil
      if @logstash_format || @include_timestamp
        if record.has_key?(TIMESTAMP_FIELD)
          rts = record[TIMESTAMP_FIELD]
          dt = parse_time(rts, time, tag)
        elsif record.has_key?(@time_key)
          rts = record[@time_key]
          dt = parse_time(rts, time, tag)
          record[TIMESTAMP_FIELD] = dt.iso8601(@time_precision) unless @time_key_exclude_timestamp
        else
          dt = Time.at(time).to_datetime
          record[TIMESTAMP_FIELD] = dt.iso8601(@time_precision)
        end
      end

      target_index_parent, target_index_child_key = @target_index_key ? get_parent_of(record, @target_index_key) : nil
      if target_index_parent && target_index_parent[target_index_child_key]
        target_index_alias = target_index = target_index_parent.delete(target_index_child_key)
      elsif @logstash_format
        dt = dt.new_offset(0) if @utc_index
        target_index = "#{logstash_prefix}#{@logstash_prefix_separator}#{dt.strftime(logstash_dateformat)}"
        target_index_alias = "#{logstash_prefix}#{@logstash_prefix_separator}#{application_name}#{@logstash_prefix_separator}#{dt.strftime(logstash_dateformat)}"
      else
        target_index_alias = target_index = index_name
      end

      # Change target_index to lower-case since OpenSearch doesn't
      # allow upper-case characters in index names.
      target_index = target_index.downcase
      target_index_alias = target_index_alias.downcase
      if @include_tag_key
        record[@tag_key] = tag
      end

      # If the affinity target indices map has a value for this particular id,
      # use it as the target_index.
      if !affinity_target_indices.empty?
        id_accessor = record_accessor_create(@id_key)
        id_value = id_accessor.call(record)
        if affinity_target_indices.key?(id_value)
          target_index = affinity_target_indices[id_value]
        end
      end

      if @suppress_type_name || @last_seen_major_version >= 2
        target_type = nil
      else
        # OpenSearch only supports "_doc".
        target_type = DEFAULT_TYPE_NAME
      end

      meta.clear
      meta["_index".freeze] = target_index
      meta["_type".freeze] = target_type unless target_type.nil?
      meta["_alias".freeze] = target_index_alias

      if @pipeline
        meta["pipeline".freeze] = pipeline
      end

      @meta_config_map.each do |record_accessor, meta_key|
        if raw_value = record_accessor.call(record)
          meta[meta_key] = raw_value
        end
      end

      if @remove_keys
        @remove_keys.each { |key| record.delete(key) }
      end

      return [meta, header, record]
    end

    # Returns [parent, child_key] of the child described by the path array in the record's tree.
    # Returns [nil, child_key] if the path doesn't exist in the record.
    def get_parent_of(record, path)
      parent_object = path[0..-2].reduce(record) { |a, e| a.is_a?(Hash) ? a[e] : nil }
      [parent_object, path[-1]]
    end

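    # Illustrative examples (assumed values):
    #   get_parent_of({"a" => {"b" => 1}}, ["a", "b"]) # => [{"b" => 1}, "b"]
    #   get_parent_of({}, ["a", "b"])                  # => [nil, "b"]
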
    # gzip-compress data
    def gzip(string)
      wio = StringIO.new("w")
      w_gz = Zlib::GzipWriter.new(wio, compression_strategy)
      w_gz.write(string)
      w_gz.close
      wio.string
    end

    def placeholder_substitution_needed_for_template?
      need_substitution = placeholder?(:host, @host.to_s) ||
        placeholder?(:index_name, @index_name.to_s) ||
        placeholder?(:template_name, @template_name.to_s) ||
        @customize_template&.values&.any? { |value| placeholder?(:customize_template, value.to_s) } ||
        placeholder?(:logstash_prefix, @logstash_prefix.to_s) ||
        placeholder?(:logstash_dateformat, @logstash_dateformat.to_s) ||
        placeholder?(:application_name, @application_name.to_s)
      log.debug("Need substitution: #{need_substitution}")
      need_substitution
    end

    def template_installation(template_name, customize_template, application_name, target_index, host)
      # For safety.
    end

    def template_installation_actual(template_name, customize_template, application_name, target_index, host=nil)
      if template_name && @template_file
        if !@logstash_format && @template_names.include?(template_name)
          log.debug("Template #{template_name} already exists (cached)")
        else
          retry_operate(@max_retry_putting_template,
                        @fail_on_putting_template_retry_exceed,
                        @catch_transport_exception_on_retry) do
            if customize_template
              template_custom_install(template_name, @template_file, @template_overwrite, customize_template, host, target_index, @index_separator)
            else
              template_install(template_name, @template_file, @template_overwrite, host, target_index, @index_separator)
            end
          end
          @template_names << template_name
        end
      end
    end

    # send_bulk sends a given bulk request along with the original tag,
    # chunk, and bulk_message_count.
    def send_bulk(data, tag, chunk, bulk_message_count, extracted_values, info)
      _logstash_prefix, _logstash_dateformat, index_name, template_name, customize_template, application_name, _pipeline = extracted_values
      template_installation(template_name, customize_template, application_name, index_name, info.host)

      begin

        log.on_trace { log.trace "bulk request: #{data}" }

        prepared_data = if compression
                          gzip(data)
                        else
                          data
                        end

        response = client(info.host, compression).bulk body: prepared_data, index: info.index
        log.on_trace { log.trace "bulk response: #{response}" }

        if response['errors']
          error = Fluent::Plugin::OpenSearchErrorHandler.new(self)
          error.handle_error(response, tag, chunk, bulk_message_count, extracted_values)
        end
      rescue RetryStreamError => e
        log.trace "router.emit_stream for retry stream doing..."
        emit_tag = @retry_tag ? @retry_tag : tag
        # Check the capacity of the buffer space.
        if retry_stream_retryable?
          router.emit_stream(emit_tag, e.retry_stream)
        else
          raise RetryStreamEmitFailure, "buffer is full."
        end
        log.trace "router.emit_stream for retry stream done."
      rescue => e
        ignore = @ignore_exception_classes.any? { |clazz| e.class <= clazz }

        log.warn "Exception ignored in tag #{tag}: #{e.class.name} #{e.message}" if ignore

        @_os = nil if @reconnect_on_error
        @_os_info = nil if @reconnect_on_error

        raise UnrecoverableRequestFailure if ignore && @exception_backup

        # FIXME: identify unrecoverable errors and raise UnrecoverableRequestFailure instead
        raise RecoverableRequestFailure, "could not push logs to OpenSearch cluster (#{connection_options_description(info.host)}): #{e.message}" unless ignore
      end
    end

    def retry_stream_retryable?
      @buffer.storable?
    end

    def is_existing_connection(host)
      # Check if the provided hosts match the current connection.
      return false if @_os.nil?
      return false if @current_config.nil?
      return false if host.length != @current_config.length

      for i in 0...host.length
        if !host[i][:host].eql?(@current_config[i][:host]) || host[i][:port] != @current_config[i][:port]
          return false
        end
      end

      return true
    end
  end
end