fluent / fluent-plugin-opensearch / build 4594666041 (pending completion)

Pull Request #97 (GitHub): Load Faraday v2 instead of v1
Merge 6ac5c91df into 7c8ff50d9

3 of 3 new or added lines in 2 files covered. (100.0%)

1131 of 1229 relevant lines covered (92.03%)

157.41 hits per line

Source File
File coverage: 92.58%
/lib/fluent/plugin/out_opensearch.rb
# SPDX-License-Identifier: Apache-2.0
#
# The fluent-plugin-opensearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright fluent-plugin-opensearch Contributors. See
# GitHub history for details.
#
# Licensed to Uken Inc. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Uken Inc. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

require 'date'
require 'excon'
require 'opensearch'
require 'set'
require 'json'
require 'uri'
require 'base64'
begin
  require 'strptime'
rescue LoadError
end
require 'resolv'

require 'fluent/plugin/output'
require 'fluent/event'
require 'fluent/error'
require 'fluent/time'
require 'fluent/unique_id'
require 'fluent/log-ext'
require 'zlib'
require_relative 'opensearch_constants'
require_relative 'opensearch_error'
require_relative 'opensearch_error_handler'
require_relative 'opensearch_index_template'
require_relative 'opensearch_tls'
require_relative 'opensearch_fallback_selector'
begin
  require_relative 'oj_serializer'
rescue LoadError
end
require 'aws-sdk-core'
require 'faraday_middleware/aws_sigv4'
require 'faraday/excon'

module Fluent::Plugin
  class OpenSearchOutput < Output
    class RecoverableRequestFailure < StandardError; end
    class UnrecoverableRequestFailure < Fluent::UnrecoverableError; end
    class RetryStreamEmitFailure < StandardError; end

    # MissingIdFieldError is raised for records that do not
    # include the field for the unique record identifier
    class MissingIdFieldError < StandardError; end

    # RetryStreamError provides a stream to be
    # put back in the pipeline for cases where a bulk request
    # failed (e.g. some records succeed while others fail)
    class RetryStreamError < StandardError
      attr_reader :retry_stream
      def initialize(retry_stream)
        @retry_stream = retry_stream
      end
    end

    RequestInfo = Struct.new(:host, :index, :target_index, :alias)

    attr_reader :template_names
    attr_reader :ssl_version_options
    attr_reader :compressable_connection

    helpers :event_emitter, :compat_parameters, :record_accessor, :timer

    Fluent::Plugin.register_output('opensearch', self)

    DEFAULT_BUFFER_TYPE = "memory"
    DEFAULT_OPENSEARCH_VERSION = 1
    DEFAULT_TYPE_NAME = "_doc".freeze
    DEFAULT_RELOAD_AFTER = -1
    DEFAULT_TARGET_BULK_BYTES = -1
    DEFAULT_POLICY_ID = "logstash-policy"

    config_param :host, :string, :default => 'localhost'
    config_param :port, :integer, :default => 9200
    config_param :user, :string, :default => nil
    config_param :password, :string, :default => nil, :secret => true
    config_param :path, :string, :default => nil
    config_param :scheme, :enum, :list => [:https, :http], :default => :http
    config_param :hosts, :string, :default => nil
    config_param :target_index_key, :string, :default => nil
    config_param :time_key_format, :string, :default => nil
    config_param :time_precision, :integer, :default => 9
    config_param :include_timestamp, :bool, :default => false
    config_param :logstash_format, :bool, :default => false
    config_param :logstash_prefix, :string, :default => "logstash"
    config_param :logstash_prefix_separator, :string, :default => '-'
    config_param :logstash_dateformat, :string, :default => "%Y.%m.%d"
    config_param :utc_index, :bool, :default => true
    config_param :suppress_type_name, :bool, :default => false
    config_param :index_name, :string, :default => "fluentd"
    config_param :id_key, :string, :default => nil
    config_param :write_operation, :string, :default => "index"
    config_param :parent_key, :string, :default => nil
    config_param :routing_key, :string, :default => nil
    config_param :request_timeout, :time, :default => 5
    config_param :reload_connections, :bool, :default => true
    config_param :reload_on_failure, :bool, :default => false
    config_param :retry_tag, :string, :default => nil
    config_param :resurrect_after, :time, :default => 60
    config_param :time_key, :string, :default => nil
    config_param :time_key_exclude_timestamp, :bool, :default => false
    config_param :ssl_verify, :bool, :default => true
    config_param :client_key, :string, :default => nil
    config_param :client_cert, :string, :default => nil
    config_param :client_key_pass, :string, :default => nil, :secret => true
    config_param :ca_file, :string, :default => nil
    config_param :remove_keys, :string, :default => nil
    config_param :remove_keys_on_update, :string, :default => ""
    config_param :remove_keys_on_update_key, :string, :default => nil
    config_param :flatten_hashes, :bool, :default => false
    config_param :flatten_hashes_separator, :string, :default => "_"
    config_param :template_name, :string, :default => nil
    config_param :template_file, :string, :default => nil
    config_param :template_overwrite, :bool, :default => false
    config_param :customize_template, :hash, :default => nil
    config_param :index_date_pattern, :string, :default => "now/d"
    config_param :index_separator, :string, :default => "-"
    config_param :application_name, :string, :default => "default"
    config_param :templates, :hash, :default => nil
    config_param :max_retry_putting_template, :integer, :default => 10
    config_param :fail_on_putting_template_retry_exceed, :bool, :default => true
    config_param :fail_on_detecting_os_version_retry_exceed, :bool, :default => true
    config_param :max_retry_get_os_version, :integer, :default => 15
    config_param :include_tag_key, :bool, :default => false
    config_param :tag_key, :string, :default => 'tag'
    config_param :time_parse_error_tag, :string, :default => 'opensearch_plugin.output.time.error'
    config_param :reconnect_on_error, :bool, :default => false
    config_param :pipeline, :string, :default => nil
    config_param :with_transporter_log, :bool, :default => false
    config_param :emit_error_for_missing_id, :bool, :default => false
    config_param :sniffer_class_name, :string, :default => nil
    config_param :selector_class_name, :string, :default => nil
    config_param :reload_after, :integer, :default => DEFAULT_RELOAD_AFTER
    config_param :include_index_in_url, :bool, :default => false
    config_param :http_backend, :enum, list: [:excon, :typhoeus], :default => :excon
    config_param :http_backend_excon_nonblock, :bool, :default => true
    config_param :validate_client_version, :bool, :default => false
    config_param :prefer_oj_serializer, :bool, :default => false
    config_param :unrecoverable_error_types, :array, :default => ["out_of_memory_error", "rejected_execution_exception"]
    config_param :unrecoverable_record_types, :array, :default => ["json_parse_exception"]
    config_param :emit_error_label_event, :bool, :default => true
    config_param :verify_os_version_at_startup, :bool, :default => true
    config_param :default_opensearch_version, :integer, :default => DEFAULT_OPENSEARCH_VERSION
    config_param :log_os_400_reason, :bool, :default => false
    config_param :custom_headers, :hash, :default => {}
    config_param :suppress_doc_wrap, :bool, :default => false
    config_param :ignore_exceptions, :array, :default => [], value_type: :string, :desc => "Ignorable exception list"
    config_param :exception_backup, :bool, :default => true, :desc => "Chunk backup flag when an ignored exception occurred"
    config_param :bulk_message_request_threshold, :size, :default => DEFAULT_TARGET_BULK_BYTES
    config_param :compression_level, :enum, list: [:no_compression, :best_speed, :best_compression, :default_compression], :default => :no_compression
    config_param :truncate_caches_interval, :time, :default => nil
    config_param :use_legacy_template, :bool, :default => true
    config_param :catch_transport_exception_on_retry, :bool, :default => true
    config_param :target_index_affinity, :bool, :default => false

    config_section :metadata, param_name: :metainfo, multi: false do
      config_param :include_chunk_id, :bool, :default => false
      config_param :chunk_id_key, :string, :default => "chunk_id".freeze
    end

    config_section :endpoint, multi: false do
      config_param :region, :string
      config_param :url do |c|
        c.chomp("/")
      end
      config_param :access_key_id, :string, :default => ""
      config_param :secret_access_key, :string, :default => "", secret: true
      config_param :assume_role_arn, :string, :default => nil
      config_param :ecs_container_credentials_relative_uri, :string, :default => nil # Set with the AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variable value
      config_param :assume_role_session_name, :string, :default => "fluentd"
      config_param :assume_role_web_identity_token_file, :string, :default => nil
      config_param :sts_credentials_region, :string, :default => nil
      config_param :refresh_credentials_interval, :time, :default => "5h"
      config_param :aws_service_name, :enum, list: [:es, :aoss], :default => :es
    end

    config_section :buffer do
      config_set_default :@type, DEFAULT_BUFFER_TYPE
      config_set_default :chunk_keys, ['tag']
      config_set_default :timekey_use_utc, true
    end
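
    # A minimal configuration sketch showing how the parameters above combine
    # for an AWS-signed endpoint. The values (match pattern, URL, region) are
    # hypothetical placeholders, not defaults taken from this file:
    #
    #   <match my.logs.**>
    #     @type opensearch
    #     logstash_format true
    #     <endpoint>
    #       url https://search-example.us-east-1.es.amazonaws.com
    #       region us-east-1
    #     </endpoint>
    #     <buffer tag>
    #       @type memory
    #     </buffer>
    #   </match>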

    include Fluent::OpenSearchIndexTemplate
    include Fluent::Plugin::OpenSearchConstants
    include Fluent::Plugin::OpenSearchTLS

    def initialize
      super
    end

    ######################################################################################################
    # This AWS credentials code is heavily based on fluent-plugin-aws-elasticsearch-service:
    # https://github.com/atomita/fluent-plugin-aws-elasticsearch-service/blob/master/lib/fluent/plugin/out_aws-elasticsearch-service.rb#L73-L134
    ######################################################################################################
    def aws_credentials(conf)
      credentials = nil
      unless conf[:access_key_id].empty? || conf[:secret_access_key].empty?
        credentials = Aws::Credentials.new(conf[:access_key_id], conf[:secret_access_key])
      else
        if conf[:assume_role_arn].nil?
          aws_container_credentials_relative_uri = conf[:ecs_container_credentials_relative_uri] || ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"]
          if aws_container_credentials_relative_uri.nil?
            credentials = Aws::SharedCredentials.new({retries: 2}).credentials rescue nil
            credentials ||= Aws::InstanceProfileCredentials.new.credentials rescue nil
            credentials ||= Aws::ECSCredentials.new.credentials
          else
            credentials = Aws::ECSCredentials.new({
                            credential_path: aws_container_credentials_relative_uri
                          }).credentials
          end
        else
          if conf[:assume_role_web_identity_token_file].nil?
            credentials = Aws::AssumeRoleCredentials.new({
                            role_arn: conf[:assume_role_arn],
                            role_session_name: conf[:assume_role_session_name],
                            region: sts_creds_region(conf)
                          }).credentials
          else
            credentials = Aws::AssumeRoleWebIdentityCredentials.new({
                            role_arn: conf[:assume_role_arn],
                            web_identity_token_file: conf[:assume_role_web_identity_token_file],
                            region: sts_creds_region(conf)
                          }).credentials
          end
        end
      end
      raise "No valid AWS credentials found." unless credentials.set?

      credentials
    end

    def sts_creds_region(conf)
      conf[:sts_credentials_region] || conf[:region]
    end
    ###############################
    # End of AWS credentials code.
    ###############################
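
    # Resolution order implemented by #aws_credentials above, for reference:
    #   1. static keys, when both access_key_id and secret_access_key are set;
    #   2. assume-role (or web-identity assume-role when a token file is given);
    #   3. otherwise the ECS credential path when configured, else the shared
    #      credentials file, then the instance profile, then plain ECS
    #      credentials as a last resort.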

    def configure(conf)
      compat_parameters_convert(conf, :buffer)

      super

      if @endpoint
        # Override the default value of reload_connections to false here because
        # AWS Elasticsearch Service doesn't return the addresses of its nodes and
        # the Elasticsearch client fails to reload connections properly, which
        # ends up repeating the "temporarily failed to flush the buffer" error
        # forever. See this discussion for details:
        # https://discuss.elastic.co/t/elasitcsearch-ruby-raises-cannot-get-new-connection-from-pool-error/36252
        @reload_connections = false
      end

      if placeholder_substitution_needed_for_template?
        # nop.
      elsif not @buffer_config.chunk_keys.include? "tag" and
        not @buffer_config.chunk_keys.include? "_index"
        raise Fluent::ConfigError, "'tag' or '_index' in chunk_keys is required."
      end
      @time_parser = create_time_parser
      @backend_options = backend_options
      @ssl_version_options = set_tls_minmax_version_config(@ssl_version, @ssl_max_version, @ssl_min_version)

      if @remove_keys
        @remove_keys = @remove_keys.split(/\s*,\s*/)
      end

      if @target_index_key && @target_index_key.is_a?(String)
        @target_index_key = @target_index_key.split '.'
      end

      if @remove_keys_on_update && @remove_keys_on_update.is_a?(String)
        @remove_keys_on_update = @remove_keys_on_update.split ','
      end

      raise Fluent::ConfigError, "'max_retry_putting_template' must be greater than or equal to zero." if @max_retry_putting_template < 0
      raise Fluent::ConfigError, "'max_retry_get_os_version' must be greater than or equal to zero." if @max_retry_get_os_version < 0

      # Dump a log message when host placeholders and template features are used at the same time.
      valid_host_placeholder = placeholder?(:host_placeholder, @host)
      if valid_host_placeholder && (@template_name && @template_file || @templates)
        if @verify_os_version_at_startup
          raise Fluent::ConfigError, "Host placeholders, template installation, and verifying the OpenSearch version at startup cannot be used at the same time. Please set verify_os_version_at_startup to `false` when host placeholders and template installation are enabled."
        end
        log.info "Using host placeholders together with template installation may slow down your OpenSearch cluster a bit (beta)."
      end

      @template_names = []
      if !dry_run?
        if @template_name && @template_file
          if @logstash_format || placeholder_substitution_needed_for_template?
            class << self
              alias_method :template_installation, :template_installation_actual
            end
          else
            template_installation_actual(@template_name, @customize_template, @application_name, @index_name)
          end
        end
        if @templates
          retry_operate(@max_retry_putting_template,
                        @fail_on_putting_template_retry_exceed,
                        @catch_transport_exception_on_retry) do
            templates_hash_install(@templates, @template_overwrite)
          end
        end
      end

      @truncate_mutex = Mutex.new
      if @truncate_caches_interval
        timer_execute(:out_opensearch_truncate_caches, @truncate_caches_interval) do
          log.info('Clean up the indices and template names cache')

          @truncate_mutex.synchronize {
            @template_names.clear
          }
        end
      end
      # If AWS credentials are set, refresh the credentials forcibly before they expire.
      @credential_mutex = Mutex.new
      if @endpoint
        @_aws_credentials = aws_credentials(@endpoint)

        if @endpoint.refresh_credentials_interval
          timer_execute(:out_opensearch_expire_credentials, @endpoint.refresh_credentials_interval) do
            log.debug('Recreate the AWS credentials')

            @credential_mutex.synchronize do
              @_os = nil
              @_aws_credentials = aws_credentials(@endpoint)
            end
          end
        end
      end

      @serializer_class = nil
      begin
        require 'oj'
        @dump_proc = Oj.method(:dump)
        if @prefer_oj_serializer
          @serializer_class = Fluent::Plugin::Serializer::Oj
          OpenSearch::API.settings[:serializer] = Fluent::Plugin::Serializer::Oj
        end
      rescue LoadError
        @dump_proc = Yajl.method(:dump)
      end

      raise Fluent::ConfigError, "`password` must be present if `user` is present" if @user && @password.nil?

      if @user && m = @user.match(/%{(?<user>.*)}/)
        @user = URI.encode_www_form_component(m["user"])
      end
      if @password && m = @password.match(/%{(?<password>.*)}/)
        @password = URI.encode_www_form_component(m["password"])
      end

      @transport_logger = nil
      if @with_transporter_log
        @transport_logger = log
        log_level = conf['@log_level'] || conf['log_level']
        log.warn "Consider specifying the log level with @log_level." unless log_level
      end
      # Specify @sniffer_class before calling #client.
      # #detect_os_major_version uses #client.
      @sniffer_class = nil
      begin
        @sniffer_class = Object.const_get(@sniffer_class_name) if @sniffer_class_name
      rescue Exception => ex
        raise Fluent::ConfigError, "Could not load sniffer class #{@sniffer_class_name}: #{ex}"
      end

      @selector_class = nil
      begin
        @selector_class = Object.const_get(@selector_class_name) if @selector_class_name
      rescue Exception => ex
        raise Fluent::ConfigError, "Could not load selector class #{@selector_class_name}: #{ex}"
      end

      @last_seen_major_version = if major_version = handle_last_seen_os_major_version
                                   major_version
                                 else
                                   @default_opensearch_version
                                 end

      if @validate_client_version && !dry_run?
        if @last_seen_major_version != client_library_version.to_i
          raise Fluent::ConfigError, <<-EOC
            Detected OpenSearch #{@last_seen_major_version} but you use OpenSearch client #{client_library_version}.
            Please consider using the #{@last_seen_major_version}.x series OpenSearch client.
          EOC
        end
      end

      if @last_seen_major_version >= 1
        case @ssl_version
        when :SSLv23, :TLSv1, :TLSv1_1
          if @scheme == :https
            log.warn "Detected OpenSearch 1.x or above with an insecure TLS version enabled:
                      You might have to specify `ssl_version TLSv1_2` in the configuration."
          end
        end
      end

      if @ssl_version && @scheme == :https
        if !@http_backend_excon_nonblock
          log.warn "The TLS handshake may get stuck with a blocking connection.
                    Consider setting `http_backend_excon_nonblock` to true"
        end
      end

      # Handle nested key specifiers that are missing the "$." prefix.
      @id_key = convert_compat_id_key(@id_key) if @id_key
      @parent_key = convert_compat_id_key(@parent_key) if @parent_key
      @routing_key = convert_compat_id_key(@routing_key) if @routing_key

      @routing_key_name = configure_routing_key_name
      @meta_config_map = create_meta_config_map
      @current_config = nil
      @compressable_connection = false

      @ignore_exception_classes = @ignore_exceptions.map do |exception|
        unless Object.const_defined?(exception)
          log.warn "Cannot find class #{exception}. Will ignore it."

          nil
        else
          Object.const_get(exception)
        end
      end.compact

      if @bulk_message_request_threshold < 0
        class << self
          alias_method :split_request?, :split_request_size_uncheck?
        end
      else
        class << self
          alias_method :split_request?, :split_request_size_check?
        end
      end
    end

    def dry_run?
      if Fluent::Engine.respond_to?(:dry_run_mode)
        Fluent::Engine.dry_run_mode
      elsif Fluent::Engine.respond_to?(:supervisor_mode)
        Fluent::Engine.supervisor_mode
      end
    end

    def placeholder?(name, param)
      placeholder_validities = []
      placeholder_validators(name, param).each do |v|
        begin
          v.validate!
          placeholder_validities << true
        rescue Fluent::ConfigError => e
          log.debug("'#{name} #{param}' was tested against the built-in placeholder(s), but no valid placeholder was found. error: #{e}")
          placeholder_validities << false
        end
      end
      placeholder_validities.include?(true)
    end

    def emit_error_label_event?
      !!@emit_error_label_event
    end

    def compression
      !(@compression_level == :no_compression)
    end

    def compression_strategy
      case @compression_level
      when :default_compression
        Zlib::DEFAULT_COMPRESSION
      when :best_compression
        Zlib::BEST_COMPRESSION
      when :best_speed
        Zlib::BEST_SPEED
      else
        Zlib::NO_COMPRESSION
      end
    end

    def backend_options
      case @http_backend
      when :excon
        { client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass, nonblock: @http_backend_excon_nonblock }
      when :typhoeus
        require 'faraday/typhoeus'
        { sslkey: @client_key, sslcert: @client_cert, keypasswd: @client_key_pass }
      end
    rescue LoadError => ex
      log.error_backtrace(ex.backtrace)
      raise Fluent::ConfigError, "You must install the #{@http_backend} gem. Exception: #{ex}"
    end

    def handle_last_seen_os_major_version
      if @verify_os_version_at_startup && !dry_run?
        retry_operate(@max_retry_get_os_version,
                      @fail_on_detecting_os_version_retry_exceed,
                      @catch_transport_exception_on_retry) do
          detect_os_major_version
        end
      else
        nil
      end
    end

    def detect_os_major_version
      @_os_info ||= client.info
      begin
        unless version = @_os_info.dig("version", "number")
          version = @default_opensearch_version
        end
      rescue NoMethodError => e
        log.warn "Cannot dig version information from #{@_os_info}. Assuming OpenSearch #{@default_opensearch_version}", error: e
        version = @default_opensearch_version
      end
      version.to_i
    end

    def client_library_version
      OpenSearch::VERSION
    end

    def configure_routing_key_name
      'routing'.freeze
    end

    def convert_compat_id_key(key)
      if key.include?('.') && !key.start_with?('$[')
        key = "$.#{key}" unless key.start_with?('$.')
      end
      key
    end
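
    # Illustrative behaviour of #convert_compat_id_key (hypothetical inputs):
    #
    #   convert_compat_id_key("log.id")        #=> "$.log.id"
    #   convert_compat_id_key("$.log.id")      #=> "$.log.id" (already prefixed)
    #   convert_compat_id_key("$['log']['id']") #=> unchanged (bracket style untouched)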

    def create_meta_config_map
      result = []
      result << [record_accessor_create(@id_key), '_id'] if @id_key
      result << [record_accessor_create(@parent_key), '_parent'] if @parent_key
      result << [record_accessor_create(@routing_key), @routing_key_name] if @routing_key
      result
    end

    # Once Fluentd v0.14 is released we might be able to use
    # Fluent::Parser::TimeParser, but it doesn't quite do what we want: it gives
    # [sec, nsec], whereas we want something we can call `strftime` on...
    def create_time_parser
      if @time_key_format
        begin
          # Strptime doesn't support all formats, but for those it does support,
          # it's blazingly fast.
          strptime = Strptime.new(@time_key_format)
          Proc.new { |value|
            value = convert_numeric_time_into_string(value, @time_key_format) if value.is_a?(Numeric)
            strptime.exec(value).to_datetime
          }
        rescue
          # Can happen if Strptime doesn't recognize the format; or
          # if strptime couldn't be required (because it's not installed -- it's
          # ruby 2 only)
          Proc.new { |value|
            value = convert_numeric_time_into_string(value, @time_key_format) if value.is_a?(Numeric)
            DateTime.strptime(value, @time_key_format)
          }
        end
      else
        Proc.new { |value|
          value = convert_numeric_time_into_string(value) if value.is_a?(Numeric)
          DateTime.parse(value)
        }
      end
    end

    def convert_numeric_time_into_string(numeric_time, time_key_format = "%Y-%m-%d %H:%M:%S.%N%z")
      numeric_time_parser = Fluent::NumericTimeParser.new(:float)
      Time.at(numeric_time_parser.parse(numeric_time).to_r).strftime(time_key_format)
    end
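
    # A sketch of how the parser procs above behave (hypothetical values,
    # assuming time_key_format "%Y-%m-%dT%H:%M:%S%z"):
    #
    #   parser = create_time_parser
    #   parser.call("2023-04-01T12:00:00+0000")  #=> a DateTime
    #   parser.call(1680350400.5)                # numeric input is first converted
    #                                            # via convert_numeric_time_into_string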

    def parse_time(value, event_time, tag)
      @time_parser.call(value)
    rescue => e
      if emit_error_label_event?
        router.emit_error_event(@time_parse_error_tag, Fluent::Engine.now, {'tag' => tag, 'time' => event_time, 'format' => @time_key_format, 'value' => value}, e)
      end
      return Time.at(event_time).to_datetime
    end

    def client(host = nil, compress_connection = false)
      # check here to see if we already have a client connection for the given host
      connection_options = get_connection_options(host)

      @_os = nil unless is_existing_connection(connection_options[:hosts])
      @_os = nil unless @compressable_connection == compress_connection

      @_os ||= begin
        @compressable_connection = compress_connection
        @current_config = connection_options[:hosts].clone
        adapter_conf = if @endpoint
                         lambda do |f|
                           f.request(
                             :aws_sigv4,
                             service: @endpoint.aws_service_name.to_s,
                             region: @endpoint.region,
                             credentials: @_aws_credentials,
                           )

                           f.adapter @http_backend, @backend_options
                         end
                       else
                         lambda {|f| f.adapter @http_backend, @backend_options }
                       end

        local_reload_connections = @reload_connections
        if local_reload_connections && @reload_after > DEFAULT_RELOAD_AFTER
          local_reload_connections = @reload_after
        end

        gzip_headers = if compress_connection
                         {'Content-Encoding' => 'gzip'}
                       else
                         {}
                       end
        headers = {}.merge(@custom_headers)
                    .merge(gzip_headers)
        ssl_options = { verify: @ssl_verify, ca_file: @ca_file }.merge(@ssl_version_options)

        transport = OpenSearch::Transport::Transport::HTTP::Faraday.new(connection_options.merge(
                                                                            options: {
                                                                              reload_connections: local_reload_connections,
                                                                              reload_on_failure: @reload_on_failure,
                                                                              resurrect_after: @resurrect_after,
                                                                              logger: @transport_logger,
                                                                              transport_options: {
                                                                                headers: headers,
                                                                                request: { timeout: @request_timeout },
                                                                                ssl: ssl_options,
                                                                              },
                                                                              http: {
                                                                                user: @user,
                                                                                password: @password,
                                                                                scheme: @scheme
                                                                              },
                                                                              sniffer_class: @sniffer_class,
                                                                              serializer_class: @serializer_class,
                                                                              selector_class: @selector_class,
                                                                              compression: compress_connection,
                                                                            }), &adapter_conf)
        OpenSearch::Client.new transport: transport
      end
    end

    def get_escaped_userinfo(host_str)
      if m = host_str.match(/(?<scheme>.*)%{(?<user>.*)}:%{(?<password>.*)}(?<path>@.*)/)
        m["scheme"] +
          URI.encode_www_form_component(m["user"]) +
          ':' +
          URI.encode_www_form_component(m["password"]) +
          m["path"]
      else
        host_str
      end
    end
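
    # Illustrative behaviour of #get_escaped_userinfo (hypothetical host string):
    #
    #   get_escaped_userinfo("https://%{us@r}:%{p@ss}@logs.example.com:9200/")
    #   #=> "https://us%40r:p%40ss@logs.example.com:9200/"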

    def get_connection_options(con_host=nil)

      hosts = if @endpoint # For AWS OpenSearch Service
        uri = URI(@endpoint.url)
        host = %w(user password path).inject(host: uri.host, port: uri.port, scheme: uri.scheme) do |hash, key|
          hash[key.to_sym] = uri.public_send(key) unless uri.public_send(key).nil? || uri.public_send(key) == ''
          hash
        end
        [host]
      elsif con_host || @hosts
        (con_host || @hosts).split(',').map do |host_str|
          # Support legacy hosts format host:port,host:port,host:port...
          if host_str.match(%r{^[^:]+(\:\d+)?$})
            {
              host:   host_str.split(':')[0],
              port:   (host_str.split(':')[1] || @port).to_i,
              scheme: @scheme.to_s
            }
          else
            # New hosts format expects URLs such as http://logs.foo.com,https://john:pass@logs2.foo.com/elastic
            uri = URI(get_escaped_userinfo(host_str))
            %w(user password path).inject(host: uri.host, port: uri.port, scheme: uri.scheme) do |hash, key|
              hash[key.to_sym] = uri.public_send(key) unless uri.public_send(key).nil? || uri.public_send(key) == ''
              hash
            end
          end
        end.compact
      else
        if Resolv::IPv6::Regex.match(@host)
          [{host: "[#{@host}]", scheme: @scheme.to_s, port: @port}]
        else
          [{host: @host, port: @port, scheme: @scheme.to_s}]
        end
      end.each do |host|
        host.merge!(user: @user, password: @password) if !host[:user] && @user
        host.merge!(path: @path) if !host[:path] && @path
      end

      {
        hosts: hosts
      }
    end
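
    # The two hosts formats accepted above, with hypothetical values:
    #
    #   hosts host1.example.com:9200,host2.example.com:9201        # legacy host:port pairs
    #   hosts https://john:pass@logs.example.com:9200/opensearch   # URL style, userinfo allowed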

    def connection_options_description(con_host=nil)
      get_connection_options(con_host)[:hosts].map do |host_info|
        attributes = host_info.dup
        attributes[:password] = 'obfuscated' if attributes.has_key?(:password)
        attributes.inspect
      end.join(', ')
    end

    # append_record_to_messages adds a record to the bulk message
    # payload to be submitted to OpenSearch. Records that do
    # not include the '_id' field are skipped when 'write_operation'
    # is configured for 'create' or 'update'
    #
    # returns 'true' if the record was appended to the bulk message
    #         and 'false' otherwise
    def append_record_to_messages(op, meta, header, record, msgs)
      case op
      when UPDATE_OP, UPSERT_OP
        if meta.has_key?(ID_FIELD)
          header[UPDATE_OP] = meta
          msgs << @dump_proc.call(header) << BODY_DELIMITER
          msgs << @dump_proc.call(update_body(record, op)) << BODY_DELIMITER
          return true
        end
      when CREATE_OP
        if meta.has_key?(ID_FIELD)
          header[CREATE_OP] = meta
          msgs << @dump_proc.call(header) << BODY_DELIMITER
          msgs << @dump_proc.call(record) << BODY_DELIMITER
          return true
        end
      when INDEX_OP
        header[INDEX_OP] = meta
        msgs << @dump_proc.call(header) << BODY_DELIMITER
        msgs << @dump_proc.call(record) << BODY_DELIMITER
        return true
      end
      return false
    end
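
    # For INDEX_OP with a hypothetical meta of {"_index" => "fluentd"} and a
    # hypothetical record {"message" => "hello"}, the two lines appended to the
    # bulk payload look like this (BODY_DELIMITER is a newline):
    #
    #   {"index":{"_index":"fluentd"}}
    #   {"message":"hello"}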

    def update_body(record, op)
      update = remove_keys(record)
      if @suppress_doc_wrap
        return update
      end
      body = {"doc".freeze => update}
      if op == UPSERT_OP
        if update == record
          body["doc_as_upsert".freeze] = true
        else
          body[UPSERT_OP] = record
        end
      end
      body
    end
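
    # With UPSERT_OP and no remove_keys_on_update configured, update == record,
    # so a hypothetical record {"user" => 1} produces:
    #
    #   {"doc" => {"user" => 1}, "doc_as_upsert" => true}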

    def remove_keys(record)
      keys = record[@remove_keys_on_update_key] || @remove_keys_on_update || []
      record.delete(@remove_keys_on_update_key)
      return record unless keys.any?
      record = record.dup
      keys.each { |key| record.delete(key) }
      record
    end

    def flatten_record(record, prefix=[])
      ret = {}
      if record.is_a? Hash
        record.each { |key, value|
          ret.merge! flatten_record(value, prefix + [key.to_s])
        }
      elsif record.is_a? Array
        # Don't mess with arrays, leave them unprocessed
        ret.merge!({prefix.join(@flatten_hashes_separator) => record})
      else
        return {prefix.join(@flatten_hashes_separator) => record}
      end
      ret
    end
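
    # Illustrative behaviour of #flatten_record with the default "_" separator
    # (hypothetical record):
    #
    #   flatten_record({"user" => {"name" => "abc", "roles" => ["a", "b"]}})
    #   #=> {"user_name" => "abc", "user_roles" => ["a", "b"]}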

    def expand_placeholders(chunk)
      logstash_prefix = extract_placeholders(@logstash_prefix, chunk)
      logstash_dateformat = extract_placeholders(@logstash_dateformat, chunk)
      index_name = extract_placeholders(@index_name, chunk)
      if @template_name
        template_name = extract_placeholders(@template_name, chunk)
      else
        template_name = nil
      end
      if @customize_template
        customize_template = @customize_template.each_with_object({}) { |(key, value), hash| hash[key] = extract_placeholders(value, chunk) }
      else
        customize_template = nil
      end
      if @application_name
        application_name = extract_placeholders(@application_name, chunk)
      else
        application_name = nil
      end
      if @pipeline
        pipeline = extract_placeholders(@pipeline, chunk)
      else
        pipeline = nil
      end
      return logstash_prefix, logstash_dateformat, index_name, template_name, customize_template, application_name, pipeline
    end

    def multi_workers_ready?
      true
    end

    def inject_chunk_id_to_record_if_needed(record, chunk_id)
      if @metainfo&.include_chunk_id
        record[@metainfo.chunk_id_key] = chunk_id
        record
      else
        record
      end
    end

    def write(chunk)
      bulk_message_count = Hash.new { |h,k| h[k] = 0 }
      bulk_message = Hash.new { |h,k| h[k] = '' }
      header = {}
      meta = {}

      tag = chunk.metadata.tag
      chunk_id = dump_unique_id_hex(chunk.unique_id)
      extracted_values = expand_placeholders(chunk)
      host = if @hosts
               extract_placeholders(@hosts, chunk)
             else
               extract_placeholders(@host, chunk)
             end

      affinity_target_indices = get_affinity_target_indices(chunk)
      chunk.msgpack_each do |time, record|
        next unless record.is_a? Hash

        record = inject_chunk_id_to_record_if_needed(record, chunk_id)

        begin
          meta, header, record = process_message(tag, meta, header, time, record, affinity_target_indices, extracted_values)
          info = if @include_index_in_url
                   RequestInfo.new(host, meta.delete("_index".freeze), meta["_index".freeze], meta.delete("_alias".freeze))
                 else
                   RequestInfo.new(host, nil, meta["_index".freeze], meta.delete("_alias".freeze))
                 end

          if split_request?(bulk_message, info)
            bulk_message.each do |info, msgs|
              send_bulk(msgs, tag, chunk, bulk_message_count[info], extracted_values, info) unless msgs.empty?
              msgs.clear
              # Clear bulk_message_count for this info.
              bulk_message_count[info] = 0
              next
            end
          end

          if append_record_to_messages(@write_operation, meta, header, record, bulk_message[info])
            bulk_message_count[info] += 1
          else
            if @emit_error_for_missing_id
              raise MissingIdFieldError, "Missing '_id' field. Write operation is #{@write_operation}"
            else
              log.on_debug { log.debug("Dropping record because it's missing an '_id' field and write_operation is #{@write_operation}: #{record}") }
            end
          end
        rescue => e
          if emit_error_label_event?
            router.emit_error_event(tag, time, record, e)
          end
        end
      end

      bulk_message.each do |info, msgs|
        send_bulk(msgs, tag, chunk, bulk_message_count[info], extracted_values, info) unless msgs.empty?
        msgs.clear
      end
    end

    def target_index_affinity_enabled?()
      @target_index_affinity && @logstash_format && @id_key && (@write_operation == UPDATE_OP || @write_operation == UPSERT_OP)
    end

    def get_affinity_target_indices(chunk)
      indices = Hash.new
      if target_index_affinity_enabled?()
        id_key_accessor = record_accessor_create(@id_key)
        ids = Set.new
        chunk.msgpack_each do |time, record|
          next unless record.is_a? Hash
          begin
            ids << id_key_accessor.call(record)
          end
        end
        log.debug("Find affinity target_indices by querying OpenSearch (write_operation #{@write_operation}) for ids: #{ids.to_a}")
        options = {
          :index => "#{logstash_prefix}#{@logstash_prefix_separator}*",
        }
        query = {
          'query' => { 'ids' => { 'values' => ids.to_a } },
          '_source' => false,
          'sort' => [
            {"_index" => {"order" => "desc"}}
          ]
        }
        result = client.search(options.merge(:body => Yajl.dump(query)))
        # There should be just one hit per _id, but if there are still multiple,
        # only the oldest index is stored in the map.
        result['hits']['hits'].each do |hit|
          indices[hit["_id"]] = hit["_index"]
          log.debug("target_index for id: #{hit["_id"]} from es: #{hit["_index"]}")
        end
      end
      indices
    end

    def split_request?(bulk_message, info)
      # For safety.
    end

    def split_request_size_check?(bulk_message, info)
      bulk_message[info].size > @bulk_message_request_threshold
    end

    def split_request_size_uncheck?(bulk_message, info)
      false
    end

    def process_message(tag, meta, header, time, record, affinity_target_indices, extracted_values)
      logstash_prefix, logstash_dateformat, index_name, _template_name, _customize_template, application_name, pipeline = extracted_values

      if @flatten_hashes
        record = flatten_record(record)
      end

      dt = nil
      if @logstash_format || @include_timestamp
        if record.has_key?(TIMESTAMP_FIELD)
          rts = record[TIMESTAMP_FIELD]
          dt = parse_time(rts, time, tag)
        elsif record.has_key?(@time_key)
          rts = record[@time_key]
          dt = parse_time(rts, time, tag)
          record[TIMESTAMP_FIELD] = dt.iso8601(@time_precision) unless @time_key_exclude_timestamp
        else
          dt = Time.at(time).to_datetime
          record[TIMESTAMP_FIELD] = dt.iso8601(@time_precision)
        end
      end

      target_index_parent, target_index_child_key = @target_index_key ? get_parent_of(record, @target_index_key) : nil
      if target_index_parent && target_index_parent[target_index_child_key]
        target_index_alias = target_index = target_index_parent.delete(target_index_child_key)
      elsif @logstash_format
        dt = dt.new_offset(0) if @utc_index
        target_index = "#{logstash_prefix}#{@logstash_prefix_separator}#{dt.strftime(logstash_dateformat)}"
        target_index_alias = "#{logstash_prefix}#{@logstash_prefix_separator}#{application_name}#{@logstash_prefix_separator}#{dt.strftime(logstash_dateformat)}"
      else
        target_index_alias = target_index = index_name
      end

      # Change target_index to lower-case since OpenSearch doesn't
      # allow upper-case characters in index names.
      target_index = target_index.downcase
      target_index_alias = target_index_alias.downcase
      if @include_tag_key
        record[@tag_key] = tag
      end

      # If the affinity target indices map has a value for this particular id, use it as the target_index
      if !affinity_target_indices.empty?
        id_accessor = record_accessor_create(@id_key)
        id_value = id_accessor.call(record)
        if affinity_target_indices.key?(id_value)
          target_index = affinity_target_indices[id_value]
        end
      end

      if @suppress_type_name || @last_seen_major_version >= 2
        target_type = nil
      else
        # OpenSearch only supports "_doc".
        target_type = DEFAULT_TYPE_NAME
      end

      meta.clear
      meta["_index".freeze] = target_index
      meta["_type".freeze] = target_type unless target_type.nil?
      meta["_alias".freeze] = target_index_alias

      if @pipeline
        meta["pipeline".freeze] = pipeline
      end

      @meta_config_map.each do |record_accessor, meta_key|
        if raw_value = record_accessor.call(record)
          meta[meta_key] = raw_value
        end
      end

      if @remove_keys
        @remove_keys.each { |key| record.delete(key) }
      end

      return [meta, header, record]
    end

    # returns [parent, child_key] of the child described by the path array in the record's tree
    # returns [nil, child_key] if the path doesn't exist in the record
    def get_parent_of(record, path)
      parent_object = path[0..-2].reduce(record) { |a, e| a.is_a?(Hash) ? a[e] : nil }
      [parent_object, path[-1]]
    end
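
    # Illustrative behaviour of #get_parent_of (hypothetical records):
    #
    #   get_parent_of({"kube" => {"ns" => "prod"}}, ["kube", "ns"])
    #   #=> [{"ns" => "prod"}, "ns"]
    #   get_parent_of({}, ["kube", "ns"])
    #   #=> [nil, "ns"]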

    # gzip compress data
    def gzip(string)
      wio = StringIO.new("w")
      w_gz = Zlib::GzipWriter.new(wio, strategy = compression_strategy)
      w_gz.write(string)
      w_gz.close
      wio.string
    end
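
    # Note: the compressed payload produced here is paired with the
    # 'Content-Encoding' => 'gzip' request header set in #client when
    # compress_connection is true; without that header the server would not
    # decode the gzip body.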

    def placeholder_substitution_needed_for_template?
      need_substitution = placeholder?(:host, @host.to_s) ||
        placeholder?(:index_name, @index_name.to_s) ||
        placeholder?(:template_name, @template_name.to_s) ||
        @customize_template&.values&.any? { |value| placeholder?(:customize_template, value.to_s) } ||
        placeholder?(:logstash_prefix, @logstash_prefix.to_s) ||
        placeholder?(:logstash_dateformat, @logstash_dateformat.to_s) ||
        placeholder?(:application_name, @application_name.to_s)
      log.debug("Need substitution: #{need_substitution}")
      need_substitution
    end

    def template_installation(template_name, customize_template, application_name, target_index, host)
      # for safety.
    end

    def template_installation_actual(template_name, customize_template, application_name, target_index, host=nil)
      if template_name && @template_file
        if !@logstash_format && @template_names.include?(template_name)
          log.debug("Template #{template_name} already exists (cached)")
        else
          retry_operate(@max_retry_putting_template,
                        @fail_on_putting_template_retry_exceed,
                        @catch_transport_exception_on_retry) do
            if customize_template
              template_custom_install(template_name, @template_file, @template_overwrite, customize_template, host, target_index, @index_separator)
            else
              template_install(template_name, @template_file, @template_overwrite, host, target_index, @index_separator)
            end
          end
          @template_names << template_name
        end
      end
    end

    # send_bulk sends a specific bulk request along with the original tag,
    # chunk, and bulk_message_count
    def send_bulk(data, tag, chunk, bulk_message_count, extracted_values, info)
      _logstash_prefix, _logstash_dateformat, index_name, template_name, customize_template, application_name, _pipeline = extracted_values
      template_installation(template_name, customize_template, application_name, index_name, info.host)

      begin
        log.on_trace { log.trace "bulk request: #{data}" }

        prepared_data = if compression
                          gzip(data)
                        else
                          data
                        end

        response = client(info.host, compression).bulk body: prepared_data, index: info.index
        log.on_trace { log.trace "bulk response: #{response}" }

        if response['errors']
          error = Fluent::Plugin::OpenSearchErrorHandler.new(self)
          error.handle_error(response, tag, chunk, bulk_message_count, extracted_values)
        end
      rescue RetryStreamError => e
        log.trace "router.emit_stream for retry stream doing..."
        emit_tag = @retry_tag ? @retry_tag : tag
        # check capacity of buffer space
        if retry_stream_retryable?
          router.emit_stream(emit_tag, e.retry_stream)
        else
          raise RetryStreamEmitFailure, "buffer is full."
        end
        log.trace "router.emit_stream for retry stream done."
      rescue => e
        ignore = @ignore_exception_classes.any? { |clazz| e.class <= clazz }

        log.warn "Exception ignored in tag #{tag}: #{e.class.name} #{e.message}" if ignore

        @_os = nil if @reconnect_on_error
        @_os_info = nil if @reconnect_on_error

        raise UnrecoverableRequestFailure if ignore && @exception_backup

        # FIXME: identify unrecoverable errors and raise UnrecoverableRequestFailure instead
        raise RecoverableRequestFailure, "could not push logs to OpenSearch cluster (#{connection_options_description(info.host)}): #{e.message}" unless ignore
      end
    end

    def retry_stream_retryable?
      @buffer.storable?
    end

    def is_existing_connection(host)
      # check if the hosts provided match the current connection
      return false if @_os.nil?
      return false if @current_config.nil?
      return false if host.length != @current_config.length

      for i in 0...host.length
        if !host[i][:host].eql?(@current_config[i][:host]) || host[i][:port] != @current_config[i][:port]
          return false
        end
      end

      return true
    end
  end
end