• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

fluent / fluent-plugin-opensearch / 14873730023

07 May 2025 02:22AM UTC coverage: 91.339% (-0.006%) from 91.345%
14873730023

Pull #156

github

web-flow
Merge 1576e49ae into 4b6ff1a28
Pull Request #156: Fix memory usage

1160 of 1270 relevant lines covered (91.34%)

155.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.26
/lib/fluent/plugin/in_opensearch.rb
1
# SPDX-License-Identifier: Apache-2.0
2
#
3
# The fluent-plugin-opensearch Contributors require contributions made to
4
# this file be licensed under the Apache-2.0 license or a
5
# compatible open source license.
6
#
7
# Modifications Copyright fluent-plugin-opensearch Contributors. See
8
# GitHub history for details.
9
#
10
# Licensed to Uken Inc. under one or more contributor
11
# license agreements. See the NOTICE file distributed with
12
# this work for additional information regarding copyright
13
# ownership. Uken Inc. licenses this file to you under
14
# the Apache License, Version 2.0 (the "License"); you may
15
# not use this file except in compliance with the License.
16
# You may obtain a copy of the License at
17
#
18
#   http://www.apache.org/licenses/LICENSE-2.0
19
#
20
# Unless required by applicable law or agreed to in writing,
21
# software distributed under the License is distributed on an
22
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
23
# KIND, either express or implied.  See the License for the
24
# specific language governing permissions and limitations
25
# under the License.
26

27
require 'opensearch'
1✔
28

29
require 'faraday/excon'
1✔
30
require 'fluent/log-ext'
1✔
31
require 'fluent/plugin/input'
1✔
32
require 'fluent/plugin_helper'
1✔
33
require_relative 'opensearch_constants'
1✔
34

35
module Fluent::Plugin
  # Fluentd input plugin that, on a timer, runs a scroll search against
  # OpenSearch (optionally split into sliced scrolls across threads) and emits
  # each hit's `_source` as an event tagged with `tag`.
  class OpenSearchInput < Input
    class UnrecoverableRequestFailure < Fluent::UnrecoverableError; end

    DEFAULT_RELOAD_AFTER = -1
    DEFAULT_STORAGE_TYPE = 'local'
    METADATA = "@metadata".freeze

    helpers :timer, :thread, :retry_state

    Fluent::Plugin.register_input('opensearch', self)

    config_param :tag, :string
    config_param :host, :string,  :default => 'localhost'
    config_param :port, :integer, :default => 9200
    config_param :user, :string, :default => nil
    config_param :password, :string, :default => nil, :secret => true
    config_param :path, :string, :default => nil
    config_param :scheme, :enum, :list => [:https, :http], :default => :http
    config_param :hosts, :string, :default => nil
    config_param :index_name, :string, :default => "fluentd"
    config_param :parse_timestamp, :bool, :default => false
    config_param :timestamp_key_format, :string, :default => nil
    config_param :timestamp_parse_error_tag, :string, :default => 'opensearch_plugin.input.time.error'
    config_param :query, :hash, :default => {"sort" => [ "_doc" ]}
    config_param :scroll, :string, :default => "1m"
    config_param :size, :integer, :default => 1000
    config_param :num_slices, :integer, :default => 1
    # NOTE(review): `interval` is declared with Fluentd's :size type, whose unit
    # suffixes are byte multipliers (e.g. "1m" => 1048576), not time units.
    # Plain numbers behave as seconds. Confirm whether :time was intended.
    config_param :interval, :size, :default => 5
    config_param :repeat, :bool, :default => true
    config_param :http_backend, :enum, list: [:excon, :typhoeus], :default => :excon
    config_param :request_timeout, :time, :default => 5
    config_param :reload_connections, :bool, :default => true
    config_param :reload_on_failure, :bool, :default => false
    config_param :resurrect_after, :time, :default => 60
    config_param :reload_after, :integer, :default => DEFAULT_RELOAD_AFTER
    config_param :ssl_verify, :bool, :default => true
    config_param :client_key, :string, :default => nil
    config_param :client_cert, :string, :default => nil
    config_param :client_key_pass, :string, :default => nil, :secret => true
    config_param :ca_file, :string, :default => nil
    config_param :ssl_version, :enum, list: [:SSLv23, :TLSv1, :TLSv1_1, :TLSv1_2], :default => :TLSv1_2
    config_param :with_transporter_log, :bool, :default => false
    config_param :emit_error_label_event, :bool, :default => true
    config_param :sniffer_class_name, :string, :default => nil
    config_param :custom_headers, :hash, :default => {}
    config_param :docinfo_fields, :array, :default => ['_index', '_type', '_id']
    config_param :docinfo_target, :string, :default => METADATA
    config_param :docinfo, :bool, :default => false
    config_param :check_connection, :bool, :default => true
    config_param :retry_forever, :bool, default: true, desc: 'If true, plugin will ignore retry_timeout and retry_max_times options and retry forever.'
    config_param :retry_timeout, :time, default: 72 * 60 * 60, desc: 'The maximum seconds to retry'
    # 72hours == 17 times with exponential backoff (not to change default behavior)
    config_param :retry_max_times, :integer, default: 5, desc: 'The maximum number of times to retry'
    # exponential backoff sequence will be initialized at the time of this threshold
    config_param :retry_type, :enum, list: [:exponential_backoff, :periodic], default: :exponential_backoff
    ### Periodic -> fixed :retry_wait
    ### Exponential backoff: k is number of retry times
    # c: constant factor, @retry_wait
    # b: base factor, @retry_exponential_backoff_base
    # k: times
    # total retry time: c + c * b^1 + (...) + c*b^k = c * (b^(k+1) - 1) / (b - 1)
    config_param :retry_wait, :time, default: 5, desc: 'Seconds to wait before next retry , or constant factor of exponential backoff.'
    config_param :retry_exponential_backoff_base, :float, default: 2, desc: 'The base number of exponential backoff for retries.'
    config_param :retry_max_interval, :time, default: nil, desc: 'The maximum interval seconds for exponential backoff between retries while failing.'
    config_param :retry_randomize, :bool, default: false, desc: 'If true, output plugin will retry after randomized interval not to do burst retries.'

    include Fluent::Plugin::OpenSearchConstants

    def initialize
      super
    end

    # Validates configuration and precomputes per-instance state: the timestamp
    # parser, HTTP backend options, decoded credentials, transport logger,
    # sniffer class and the base search options/query.
    #
    # @param conf [Fluent::Config::Element]
    # @raise [Fluent::ConfigError] if `user` is set without `password`, or if
    #   `sniffer_class_name` cannot be resolved.
    def configure(conf)
      super

      @timestamp_parser = create_time_parser
      @backend_options = backend_options
      @retry = nil

      raise Fluent::ConfigError, "`password` must be present if `user` is present" if @user && @password.nil?

      # A value written as %{...} has its inner text URL-encoded.
      if @user && (m = @user.match(/%{(?<user>.*)}/))
        @user = URI.encode_www_form_component(m["user"])
      end
      if @password && (m = @password.match(/%{(?<password>.*)}/))
        @password = URI.encode_www_form_component(m["password"])
      end

      @transport_logger = nil
      if @with_transporter_log
        @transport_logger = log
        log_level = conf['@log_level'] || conf['log_level']
        log.warn "Consider to specify log_level with @log_level." unless log_level
      end
      @current_config = nil
      # Specify @sniffer_class before calling #client.
      @sniffer_class = nil
      begin
        @sniffer_class = Object.const_get(@sniffer_class_name) if @sniffer_class_name
      rescue StandardError, ScriptError => ex
        # NameError for unknown/invalid names, LoadError/other errors from autoload.
        raise Fluent::ConfigError, "Could not load sniffer class #{@sniffer_class_name}: #{ex}"
      end

      @options = {
        :index => @index_name,
        :scroll => @scroll,
        :size => @size
      }
      @base_query = @query
    end

    # Builds the TLS client-certificate options in the shape the selected
    # Faraday adapter expects.
    #
    # @return [Hash]
    # @raise [Fluent::ConfigError] if the typhoeus gem cannot be loaded.
    def backend_options
      case @http_backend
      when :excon
        { client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass }
      when :typhoeus
        require 'typhoeus'
        { sslkey: @client_key, sslcert: @client_cert, keypasswd: @client_key_pass }
      end
    rescue LoadError => ex
      log.error_backtrace(ex.backtrace)
      raise Fluent::ConfigError, "You must install #{@http_backend} gem. Exception: #{ex}"
    end

    # Creates a fresh retry-state object from the retry_* parameters.
    #
    # @param randomize [Boolean] whether retry intervals are randomized.
    def retry_state(randomize)
      retry_state_create(
        :input_retries, @retry_type, @retry_wait, @retry_timeout,
        forever: @retry_forever, max_steps: @retry_max_times,
        max_interval: @retry_max_interval, backoff_base: @retry_exponential_backoff_base,
        randomize: randomize
      )
    end

    # URL-encodes the user and password of a host URL written as
    # `scheme%{user}:%{password}@rest`; other strings are returned unchanged.
    #
    # @param host_str [String]
    # @return [String]
    def get_escaped_userinfo(host_str)
      if m = host_str.match(/(?<scheme>.*)%{(?<user>.*)}:%{(?<password>.*)}(?<path>@.*)/)
        m["scheme"] +
          URI.encode_www_form_component(m["user"]) +
          ':' +
          URI.encode_www_form_component(m["password"]) +
          m["path"]
      else
        host_str
      end
    end

    # Builds the host list for the transport.
    #
    # `con_host` (or the `hosts` parameter) is a comma-separated list. Each
    # entry is either legacy `host[:port]` or a URL. Without either, the single
    # `host`/`port`/`scheme` parameters are used. Global `user`/`password`/`path`
    # fill in any entry that lacks them. When `check_connection` is on, hosts
    # that fail a ping are dropped.
    #
    # @param con_host [String, nil]
    # @return [Hash] `{ hosts: [Hash, ...] }`
    def get_connection_options(con_host=nil)
      host_list = con_host || @hosts

      hosts = if host_list
        host_list.split(',').map do |host_str|
          # Support legacy hosts format host:port,host:port,host:port...
          if host_str.match?(%r{^[^:]+(\:\d+)?$})
            name, port = host_str.split(':')
            {
              host:   name,
              port:   (port || @port).to_i,
              scheme: @scheme.to_s
            }
          else
            # New hosts format expects URLs such as http://logs.foo.com,https://john:pass@logs2.foo.com/elastic
            uri = URI(get_escaped_userinfo(host_str))
            %w(user password path).each_with_object({ host: uri.host, port: uri.port, scheme: uri.scheme }) do |key, hash|
              value = uri.public_send(key)
              hash[key.to_sym] = value unless value.nil? || value == ''
            end
          end
        end
      else
        [{host: @host, port: @port, scheme: @scheme.to_s}]
      end

      hosts.each do |host|
        host.merge!(user: @user, password: @password) if !host[:user] && @user
        host.merge!(path: @path) if !host[:path] && @path
      end

      live_hosts = @check_connection ? hosts.select { |host| reachable_host?(host) } : hosts
      {
        hosts: live_hosts
      }
    end

    # Pings a single host with a throwaway client.
    #
    # @param host [Hash] entry produced by #get_connection_options.
    # @return the result of `client.ping`, or false if the ping raised (the
    #   failure is logged as a warning).
    def reachable_host?(host)
      client = OpenSearch::Client.new(
        host: ["#{host[:scheme]}://#{host[:host]}:#{host[:port]}"],
        user: host[:user],
        password: host[:password],
        reload_connections: @reload_connections,
        request_timeout: @request_timeout,
        resurrect_after: @resurrect_after,
        reload_on_failure: @reload_on_failure,
        transport_options: { ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version } }
      )
      client.ping
    rescue => e
      log.warn "Failed to connect to #{host[:scheme]}://#{host[:host]}:#{host[:port]}: #{e.message}"
      false
    end

    # Runs the given block only when the `emit_error_label_event` parameter is
    # true, so error events can be suppressed by configuration.
    def emit_error_label_event(&block)
      # Read the instance variable: inside this method the bare name
      # `emit_error_label_event` resolves to this same method, which recursed
      # without end.
      block.call if @emit_error_label_event && block
    end

    # Starts the polling timer that calls #run every `interval`.
    def start
      super

      timer_execute(:in_opensearch_timer, @interval, repeat: @repeat, &method(:run))
    end

    # We might be able to use
    # Fluent::Parser::TimeParser, but it doesn't quite do what we want - if gives
    # [sec,nsec] where as we want something we can call `strftime` on...
    #
    # @return [Proc] converting a timestamp value (String or Numeric) into a Time.
    def create_time_parser
      if @timestamp_key_format
        begin
          # Strptime doesn't support all formats, but for those it does it's
          # blazingly fast.
          strptime = Strptime.new(@timestamp_key_format)
          Proc.new { |value|
            value = convert_numeric_time_into_string(value, @timestamp_key_format) if value.is_a?(Numeric)
            strptime.exec(value).to_time
          }
        rescue
          # Can happen if Strptime doesn't recognize the format; or
          # if strptime couldn't be required (because it's not installed -- it's
          # ruby 2 only)
          Proc.new { |value|
            value = convert_numeric_time_into_string(value, @timestamp_key_format) if value.is_a?(Numeric)
            DateTime.strptime(value, @timestamp_key_format).to_time
          }
        end
      else
        Proc.new { |value|
          value = convert_numeric_time_into_string(value) if value.is_a?(Numeric)
          DateTime.parse(value).to_time
        }
      end
    end

    # Formats a numeric epoch time as a string in `timestamp_key_format`.
    def convert_numeric_time_into_string(numeric_time, timestamp_key_format = "%Y-%m-%dT%H:%M:%S.%N%z")
      numeric_time_parser = Fluent::NumericTimeParser.new(:float)
      Time.at(numeric_time_parser.parse(numeric_time).to_r).strftime(timestamp_key_format)
    end

    # Parses a document timestamp. On failure it emits an error event (if
    # enabled) tagged `timestamp_parse_error_tag` and falls back to `event_time`.
    def parse_time(value, event_time, tag)
      @timestamp_parser.call(value)
    rescue => e
      emit_error_label_event do
        router.emit_error_event(@timestamp_parse_error_tag, Fluent::Engine.now, {'tag' => tag, 'time' => event_time, 'format' => @timestamp_key_format, 'value' => value}, e)
      end
      return Time.at(event_time).to_time
    end

    # Returns the memoized OpenSearch client. A new one is built when the
    # resolved host list differs from the one the current client was built for.
    #
    # NOTE(review): this calls #get_connection_options on every invocation, and
    # with `check_connection` enabled that pings every host each time (search,
    # scroll and clear_scroll all go through here).
    def client(host = nil)
      # check here to see if we already have a client connection for the given host
      connection_options = get_connection_options(host)

      @_os = nil unless is_existing_connection(connection_options[:hosts])

      @_os ||= begin
        @current_config = connection_options[:hosts].clone
        adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options }
        local_reload_connections = @reload_connections
        if local_reload_connections && @reload_after > DEFAULT_RELOAD_AFTER
          local_reload_connections = @reload_after
        end

        headers = { 'Content-Type' => "application/json" }.merge(@custom_headers)

        transport = OpenSearch::Transport::Transport::HTTP::Faraday.new(
          connection_options.merge(
            options: {
              reload_connections: local_reload_connections,
              reload_on_failure: @reload_on_failure,
              resurrect_after: @resurrect_after,
              logger: @transport_logger,
              transport_options: {
                headers: headers,
                request: { timeout: @request_timeout },
                ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
              },
              http: {
                user: @user,
                password: @password
              },
              sniffer_class: @sniffer_class,
            }), &adapter_conf)
        OpenSearch::Client.new transport: transport
      end
    end

    # Whether the current client was built for exactly this host list: same
    # length, and the same host and port at each position.
    #
    # @param host [Array<Hash>]
    # @return [Boolean]
    def is_existing_connection(host)
      return false if @_os.nil?
      return false if @current_config.nil?
      return false if host.length != @current_config.length

      # Compare both fields explicitly. The previous `!a.eql? b || x != y`
      # parsed as `!a.eql?(b || (x != y))`, so port changes were never detected.
      host.zip(@current_config).all? do |new_host, current|
        new_host[:host] == current[:host] && new_host[:port] == current[:port]
      end
    end

    # Advances or resets the retry state.
    #
    # With an error: steps the retry state, raises once the retry limit is
    # reached, otherwise logs a warning and sleeps until the next retry time.
    # Without an error: clears any pending retry state.
    def update_retry_state(error=nil)
      if error
        unless @retry
          @retry = retry_state(@retry_randomize)
        end
        @retry.step
        #Raise error if the retry limit has been reached
        raise "Hit limit for retries. retry_times: #{@retry.steps}, error: #{error.message}" if @retry.limit?
        #Retry if the limit hasn't been reached
        log.warn("failed to connect or search.", retry_times: @retry.steps, next_retry_time: @retry.next_time.round, error: error.message)
        # Kernel#sleep raises ArgumentError on a negative interval, which
        # happens if the scheduled retry time has already passed.
        sleep([@retry.next_time - Time.now, 0].max)
      else
        unless @retry.nil?
          log.debug("retry succeeded.")
          @retry = nil
        end
      end
    end

    # Timer callback. Runs one scroll in the current thread, or `num_slices`
    # sliced scrolls in separate threads. Connection/transport errors raised
    # here go through the retry state and the run is retried.
    #
    # NOTE(review): with num_slices > 1, errors raised inside the slice threads
    # are not caught by this method's rescue, so the retry logic does not apply
    # to them.
    def run
      return run_slice if @num_slices <= 1

      log.warn("Large slice number is specified:(#{@num_slices}). Consider reducing num_slices") if @num_slices > 8

      @num_slices.times.map do |slice_id|
        thread_create(:"in_opensearch_thread_#{slice_id}") do
          run_slice(slice_id)
        end
      end
    rescue Faraday::ConnectionFailed, OpenSearch::Transport::Transport::Error => error
      update_retry_state(error)
      retry
    end

    # Executes the base query (restricted to `slice_id` when given), follows
    # the scroll until no hits remain, emits all collected events as one
    # stream, then clears the scroll and resets the retry state.
    def run_slice(slice_id=nil)
      slice_query = @base_query
      slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @num_slices}) unless slice_id.nil?
      result = client.search(@options.merge(:body => Yajl.dump(slice_query) ))
      es = Fluent::MultiEventStream.new

      result["hits"]["hits"].each {|hit| process_events(hit, es)}
      has_hits = result['hits']['hits'].any?
      scroll_id = result['_scroll_id']

      while has_hits && scroll_id
        result = process_next_scroll_request(es, scroll_id)
        has_hits = result['has_hits']
        scroll_id = result['_scroll_id']
      end

      router.emit_stream(@tag, es)
      clear_scroll(scroll_id)
      update_retry_state
    end

    # Best-effort release of the scroll context; errors are logged and ignored.
    def clear_scroll(scroll_id)
      client.clear_scroll(scroll_id: scroll_id) if scroll_id
    rescue => e
      # ignore & log any clear_scroll errors
      log.warn("Ignoring clear_scroll exception", message: e.message, exception: e.class)
    end

    # Fetches the next page of a scroll.
    def process_scroll_request(scroll_id)
      client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll)
    end

    # Fetches the next scroll page and appends its hits to `es`.
    #
    # @return [Hash] `{'has_hits' => Boolean, '_scroll_id' => String or nil}`
    def process_next_scroll_request(es, scroll_id)
      result = process_scroll_request(scroll_id)
      result['hits']['hits'].each { |hit| process_events(hit, es) }
      {'has_hits' => result['hits']['hits'].any?, '_scroll_id' => result['_scroll_id']}
    end

    # Converts one search hit into an event and adds it to `es`.
    #
    # The event time is now, or the parsed TIMESTAMP_FIELD when
    # `parse_timestamp` is enabled. With `docinfo`, the listed hit metadata
    # fields are copied into the `docinfo_target` hash of the event.
    #
    # @raise [Fluent::UnrecoverableError] if `docinfo_target` already exists in
    #   `_source` and is not a Hash.
    def process_events(hit, es)
      event = hit["_source"]
      time = Fluent::Engine.now
      if @parse_timestamp
        if event.key?(TIMESTAMP_FIELD)
          rts = event[TIMESTAMP_FIELD]
          time = parse_time(rts, time, @tag)
        end
      end
      if @docinfo
        docinfo_target = event[@docinfo_target] || {}

        unless docinfo_target.is_a?(Hash)
          # The bare `UnrecoverableError` constant is not resolvable from this
          # nesting, and passing a Hash as raise's third (backtrace) argument
          # raises TypeError, so qualify the class and inline the detail.
          raise Fluent::UnrecoverableError, "incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got: #{docinfo_target.class}"
        end

        @docinfo_fields.each do |field|
          docinfo_target[field] = hit[field]
        end

        event[@docinfo_target] = docinfo_target
      end
      es.add(time, event)
    end
  end
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc