• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

fluent / fluent-plugin-opensearch / 14873730023

07 May 2025 02:22AM UTC coverage: 91.339% (-0.006%) from 91.345%
14873730023

Pull #156

github

web-flow
Merge 1576e49ae into 4b6ff1a28
Pull Request #156: Fix memory usage

1160 of 1270 relevant lines covered (91.34%)

155.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.41
/lib/fluent/plugin/opensearch_index_template.rb
1
# SPDX-License-Identifier: Apache-2.0
2
#
3
# The fluent-plugin-opensearch Contributors require contributions made to
4
# this file be licensed under the Apache-2.0 license or a
5
# compatible open source license.
6
#
7
# Modifications Copyright fluent-plugin-opensearch Contributors. See
8
# GitHub history for details.
9
#
10
# Licensed to Uken Inc. under one or more contributor
11
# license agreements. See the NOTICE file distributed with
12
# this work for additional information regarding copyright
13
# ownership. Uken Inc. licenses this file to you under
14
# the Apache License, Version 2.0 (the "License"); you may
15
# not use this file except in compliance with the License.
16
# You may obtain a copy of the License at
17
#
18
#   http://www.apache.org/licenses/LICENSE-2.0
19
#
20
# Unless required by applicable law or agreed to in writing,
21
# software distributed under the License is distributed on an
22
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
23
# KIND, either express or implied.  See the License for the
24
# specific language governing permissions and limitations
25
# under the License.
26

27
require 'fluent/error'
1✔
28
require_relative './opensearch_error'
1✔
29

30
module Fluent::OpenSearchIndexTemplate
1✔
31
  def get_template(template_file)
1✔
32
    if !File.exist?(template_file)
28✔
33
      raise "If you specify a template_name you must specify a valid template file (checked '#{template_file}')!"
3✔
34
    end
35
    file_contents = IO.read(template_file).gsub(/\n/,'')
25✔
36
    JSON.parse(file_contents)
25✔
37
  end
38

39
  def get_custom_template(template_file, customize_template)
1✔
40
    if !File.exist?(template_file)
8✔
41
      raise "If you specify a template_name you must specify a valid template file (checked '#{template_file}')!"
×
42
    end
43
    file_contents = IO.read(template_file).gsub(/\n/,'')
8✔
44
    customize_template.each do |key, value|
8✔
45
      file_contents = file_contents.gsub(key,value.downcase)
16✔
46
    end
47
    JSON.parse(file_contents)
8✔
48
  end
49

50
  def template_exists?(name, host = nil)
1✔
51
    if @use_legacy_template
86✔
52
      client(host).indices.get_template(:name => name)
23✔
53
    else
54
      client(host).indices.get_index_template(:name => name)
63✔
55
    end
56
    return true
4✔
57
  rescue OpenSearch::Transport::Transport::Errors::NotFound
58
    return false
25✔
59
  end
60

61
  def host_unreachable_exceptions
1✔
62
    client.transport.transport.host_unreachable_exceptions
74✔
63
  end
64

65
  def retry_operate(max_retries, fail_on_retry_exceed = true, catch_transport_exceptions = true)
1✔
66
    return unless block_given?
470✔
67
    retries = 0
470✔
68
    transport_errors = OpenSearch::Transport::Transport::Errors.constants.map{ |c| OpenSearch::Transport::Transport::Errors.const_get c } if catch_transport_exceptions
20,168✔
69
    begin
70
      yield
491✔
71
    rescue *host_unreachable_exceptions, *transport_errors, Timeout::Error => e
74✔
72
      @_es = nil
69✔
73
      @_es_info = nil
69✔
74
      if retries < max_retries
69✔
75
        retries += 1
21✔
76
        wait_seconds = 2**retries
21✔
77
        sleep wait_seconds
21✔
78
        log.warn "Could not communicate to OpenSearch, resetting connection and trying again. #{e.message}"
21✔
79
        log.warn "Remaining retry: #{max_retries - retries}. Retry to communicate after #{wait_seconds} second(s)."
21✔
80
        retry
21✔
81
      end
82
      message = "Could not communicate to OpenSearch after #{retries} retries. #{e.message}"
48✔
83
      log.warn message
48✔
84
      raise Fluent::Plugin::OpenSearchError::RetryableOperationExhaustedFailure,
85
            message if fail_on_retry_exceed
48✔
86
    end
87
  end
88

89
  def template_put(name, template, host = nil)
1✔
90
    if @use_legacy_template
33✔
91
      client(host).indices.put_template(:name => name, :body => template)
16✔
92
    else
93
      client(host).indices.put_index_template(:name => name, :body => template)
17✔
94
    end
95
  end
96

97
  def indexcreation(index_name, host = nil)
1✔
98
    client(host).indices.create(:index => index_name)
×
99
  rescue OpenSearch::Transport::Transport::Error => e
100
    if e.message =~ /"already exists"/ || e.message =~ /resource_already_exists_exception/
×
101
      log.debug("Index #{index_name} already exists")
×
102
    else
103
      log.error("Error while index creation - #{index_name}", error: e)
×
104
    end
105
  end
106

107
  def template_install(name, template_file, overwrite, host = nil, target_index = nil, index_separator = '-')
1✔
108
    if overwrite
93✔
109
      template_put(name,
8✔
110
                   get_template(template_file), host)
111

112
      log.debug("Template '#{name}' overwritten with #{template_file}.")
8✔
113
      return
8✔
114
    end
115
    if !template_exists?(name, host)
85✔
116
      template_put(name,
20✔
117
                   get_template(template_file), host)
118
      log.info("Template configured, but no template installed. Installed '#{name}' from #{template_file}.")
17✔
119
    else
120
      log.debug("Template '#{name}' configured and already installed.")
4✔
121
    end
122
  end
123

124
  def template_custom_install(template_name, template_file, overwrite, customize_template, host, target_index, index_separator)
1✔
125
    custom_template = get_custom_template(template_file, customize_template)
8✔
126

127
    if overwrite
8✔
128
      template_put(template_name, custom_template, host)
2✔
129
      log.info("Template '#{template_name}' overwritten with #{template_file}.")
2✔
130
    else
131
      if !template_exists?(template_name, host)
6✔
132
        template_put(template_name, custom_template, host)
6✔
133
        log.info("Template configured, but no template installed. Installed '#{template_name}' from #{template_file}.")
6✔
134
      else
135
        log.debug("Template '#{template_name}' configured and already installed.")
×
136
      end
137
    end
138
  end
139

140
  def templates_hash_install(templates, overwrite)
1✔
141
    templates.each do |key, value|
8✔
142
      template_install(key, value, overwrite)
20✔
143
    end
144
  end
145

146
  def rollover_alias_payload(rollover_alias)
1✔
147
    {
148
      'aliases' => {
×
149
        rollover_alias => {
150
          'is_write_index' =>  true
151
        }
152
      }
153
    }
154
  end
155
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc