• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

fluent / fluent-plugin-opensearch / 14873730023

07 May 2025 02:22AM UTC coverage: 91.339% (-0.006%) from 91.345%
14873730023

Pull #156

github

web-flow
Merge 1576e49ae into 4b6ff1a28
Pull Request #156: Fix memory usage

1160 of 1270 relevant lines covered (91.34%)

155.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.6
/lib/fluent/plugin/out_opensearch_data_stream.rb
1

2
require_relative 'out_opensearch'
1✔
3

4
module Fluent::Plugin
  # Fluentd output plugin, registered as "opensearch_data_stream", that writes
  # records into an OpenSearch data stream using bulk CREATE_OP operations.
  # Helpers such as client, retry_operate, parse_time, placeholder? and
  # extract_placeholders are defined outside this file (OpenSearchOutput or
  # Fluentd's output base classes).
  class OpenSearchOutputDataStream < OpenSearchOutput

    Fluent::Plugin.register_output('opensearch_data_stream', self)

    helpers :event_emitter

    config_param :data_stream_name, :string
    config_param :data_stream_template_name, :string, :default => nil
    # OpenSearch 1.0 or later always supports the new style of index template.
    config_set_default :use_legacy_template, false

    # Characters a data stream / template name must not start with.
    INVALID_START_CHARACTERS = ["-", "_", "+", "."].freeze
    # Historical misspelling, kept as an alias so existing references keep working.
    INVALID_START_CHRACTERS = INVALID_START_CHARACTERS
    # Characters a data stream / template name must not contain anywhere.
    INVALID_CHARACTERS = ["\\", "/", "*", "?", "\"", "<", ">", "|", " ", ",", "#", ":"].freeze

    # Configures the plugin.
    #
    # Derives the template name from the data stream name when it is not set.
    # For a static data stream name, validates both names and installs the
    # index template up front. For a name containing placeholders, defers
    # template installation to #write, where the names are expanded per chunk.
    #
    # @raise [Fluent::ConfigError] on an invalid name, or if installing the
    #   template fails.
    def configure(conf)
      super

      @data_stream_template_name ||= "#{@data_stream_name}_template"

      # ref. https://opensearch.org/docs/latest/opensearch/data-streams/
      if placeholder?(:data_stream_name_placeholder, @data_stream_name)
        @use_placeholder = true
        @data_stream_names = []
      else
        validate_data_stream_parameters
      end

      return if @use_placeholder

      @data_stream_names = [@data_stream_name]
      begin
        # NOTE(review): for the default template, create_index_template also
        # wraps its request in retry_operate, so retries may nest here. Kept
        # as-is to preserve existing behavior.
        retry_operate(@max_retry_putting_template,
                      @fail_on_putting_template_retry_exceed,
                      @catch_transport_exception_on_retry) do
          create_index_template(@data_stream_name, @data_stream_template_name)
        end
      rescue => e
        raise Fluent::ConfigError, "Failed to create data stream: <#{@data_stream_name}> #{e.message}"
      end
    end

    # Checks data_stream_name and data_stream_template_name against the naming
    # rules enforced by the predicates below. Raises on the first violation,
    # with a message naming the offending parameter and the rule it broke.
    #
    # @raise [Fluent::ConfigError] if either name is invalid.
    def validate_data_stream_parameters
      {
        "data_stream_name" => @data_stream_name,
        "data_stream_template_name" => @data_stream_template_name,
      }.each do |parameter, value|
        next if valid_data_stream_parameters?(value)

        unless start_with_valid_characters?(value)
          # "." and ".." also start with an invalid character; report them specifically.
          if not_dots?(value)
            raise Fluent::ConfigError, "'#{parameter}' must not start with #{INVALID_START_CHARACTERS.join(",")}: <#{value}>"
          else
            raise Fluent::ConfigError, "'#{parameter}' must not be . or ..: <#{value}>"
          end
        end
        unless valid_characters?(value)
          raise Fluent::ConfigError, "'#{parameter}' must not contain invalid characters #{INVALID_CHARACTERS.join(",")}: <#{value}>"
        end
        unless lowercase_only?(value)
          raise Fluent::ConfigError, "'#{parameter}' must be lowercase only: <#{value}>"
        end
        if value.bytesize > 255
          raise Fluent::ConfigError, "'#{parameter}' must not be longer than 255 bytes: <#{value}>"
        end
      end
    end

    # Installs an index template for +datastream_name+ unless this is a dry
    # run, or the data stream or the template already exists.
    #
    # Uses @template_file via template_installation_actual when configured.
    # Otherwise it puts a minimal template whose index pattern is
    # "<datastream_name>*" and which enables data_stream.
    #
    # @param datastream_name [String] data stream the template should match
    # @param template_name [String] name under which the template is stored
    # @param host [String, nil] host override (used with placeholders)
    def create_index_template(datastream_name, template_name, host = nil)
      return if dry_run?
      return if data_stream_exist?(datastream_name, host) || template_exists?(template_name, host)

      if @template_file
        # Create index template from file
        template_installation_actual(template_name, @customize_template, @application_name, datastream_name, host)
      else
        # Create default index template
        body = {
          "index_patterns" => ["#{datastream_name}*"],
          "data_stream" => {},
        }
        params = { name: template_name, body: body }
        retry_operate(@max_retry_putting_template,
                      @fail_on_putting_template_retry_exceed,
                      @catch_transport_exception_on_retry) do
          client(host).indices.put_index_template(params)
        end
      end
    end

    # Returns true if GET /_data_stream/<name> succeeds. Returns false, and
    # logs at info level, when the client raises NotFound.
    def data_stream_exist?(datastream_name, host = nil)
      params = { name: datastream_name }
      # TODO: use a dedicated data-stream client method instead of a raw
      # perform_request if one becomes available (originally "X-Pack equivalent").
      # NOTE(review): +params+ is passed in the body position of
      # perform_request; verify this is intended for a GET.
      response = client(host).perform_request('GET', "/_data_stream/#{datastream_name}", {}, params)
      !response.is_a?(OpenSearch::Transport::Transport::Errors::NotFound)
    rescue OpenSearch::Transport::Transport::Errors::NotFound => e
      log.info "Specified data stream does not exist. Will be created: <#{e}>"
      false
    end

    # Returns true if the template +name+ can be fetched, false on NotFound.
    # Uses the legacy or the index-template API according to @use_legacy_template.
    def template_exists?(name, host = nil)
      indices = client(host).indices
      if @use_legacy_template
        indices.get_template(name: name)
      else
        indices.get_index_template(name: name)
      end
      true
    rescue OpenSearch::Transport::Transport::Errors::NotFound
      false
    end

    # True when +data_stream_parameter+ satisfies every naming rule:
    # lowercase only, no invalid characters, no invalid first character,
    # not "." or "..", and at most 255 bytes.
    def valid_data_stream_parameters?(data_stream_parameter)
      lowercase_only?(data_stream_parameter) &&
        valid_characters?(data_stream_parameter) &&
        start_with_valid_characters?(data_stream_parameter) &&
        not_dots?(data_stream_parameter) &&
        data_stream_parameter.bytesize <= 255
    end

    # True when downcasing leaves the string unchanged.
    def lowercase_only?(data_stream_parameter)
      data_stream_parameter.downcase == data_stream_parameter
    end

    # True when the string contains none of INVALID_CHARACTERS.
    def valid_characters?(data_stream_parameter)
      INVALID_CHARACTERS.none? { |char| data_stream_parameter.include?(char) }
    end

    # True when the string does not begin with any of INVALID_START_CHARACTERS.
    def start_with_valid_characters?(data_stream_parameter)
      !data_stream_parameter.start_with?(*INVALID_START_CHARACTERS)
    end

    # True unless the string is exactly "." or "..".
    def not_dots?(data_stream_parameter)
      data_stream_parameter != "." && data_stream_parameter != ".."
    end

    def client_library_version
      OpenSearch::VERSION
    end

    def multi_workers_ready?
      true
    end

    # Serializes every Hash record in +chunk+ as a bulk "create" action and
    # sends the batch to the data stream in a single bulk request.
    #
    # In placeholder mode, the host, data stream name and template name are
    # expanded for this chunk (names downcased), and the template is installed
    # if needed.
    #
    # Each record gets an "@timestamp" taken from TIMESTAMP_FIELD, then
    # @time_key, then the event time. Records that fail processing are routed
    # to the error stream.
    #
    # @raise [Fluent::ConfigError] if template installation fails in placeholder mode
    # @raise [RecoverableRequestFailure] if the bulk request raises
    def write(chunk)
      data_stream_name = @data_stream_name
      data_stream_template_name = @data_stream_template_name
      host = nil
      if @use_placeholder
        host = extract_placeholders(@hosts || @host, chunk)
        data_stream_name = extract_placeholders(@data_stream_name, chunk).downcase
        data_stream_template_name = extract_placeholders(@data_stream_template_name, chunk).downcase
        # NOTE(review): @data_stream_names is not consulted here, so the
        # existence checks in create_index_template run for every chunk.
        begin
          create_index_template(data_stream_name, data_stream_template_name, host)
        rescue => e
          raise Fluent::ConfigError, "Failed to create data stream: <#{data_stream_name}> #{e.message}"
        end
      end

      bulk_message = ""
      headers = { CREATE_OP => {} }
      tag = chunk.metadata.tag
      chunk.msgpack_each do |time, record|
        next unless record.is_a?(Hash)

        begin
          dt = if record.key?(TIMESTAMP_FIELD)
                 parse_time(record[TIMESTAMP_FIELD], time, tag)
               elsif record.key?(@time_key)
                 parse_time(record[@time_key], time, tag)
               else
                 Time.at(time).to_datetime
               end
          # Direct assignment avoids allocating a one-key Hash per record.
          record["@timestamp"] = dt.iso8601(@time_precision)
          record[@tag_key] = tag if @include_tag_key
          @remove_keys.each { |key| record.delete(key) } if @remove_keys
          bulk_message = append_record_to_messages(CREATE_OP, {}, headers, record, bulk_message)
        rescue => e
          emit_error_label_event do
            router.emit_error_event(tag, time, record, e)
          end
        end
      end

      return if bulk_message.empty?

      params = { index: data_stream_name, body: bulk_message }
      begin
        response = client(host).bulk(params)
        if response['errors']
          log.error "Could not bulk insert to Data Stream: #{data_stream_name} #{response}"
        end
      rescue => e
        raise RecoverableRequestFailure, "could not push logs to OpenSearch cluster (#{data_stream_name}): #{e.message}"
      end
    end

    # Appends one action line (header) and one source line (record) to +msgs+,
    # each followed by BODY_DELIMITER. Mutates and returns +msgs+.
    # NOTE(review): +op+ is unused; the header key is always CREATE_OP.
    def append_record_to_messages(op, meta, header, record, msgs)
      header[CREATE_OP] = meta
      msgs << @dump_proc.call(header) << BODY_DELIMITER
      msgs << @dump_proc.call(record) << BODY_DELIMITER
      msgs
    end

    def retry_stream_retryable?
      @buffer.storable?
    end
  end
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc