• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

fluent / fluent-plugin-opensearch / 4080784654

pending completion
4080784654

push

github

Takuro Ashie
Replace obsolete File.exists? with File.exist?

1129 of 1227 relevant lines covered (92.01%)

157.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.18
/lib/fluent/plugin/out_opensearch_data_stream.rb
1

2
require_relative 'out_opensearch'
1✔
3

4
module Fluent::Plugin
1✔
5
  class OpenSearchOutputDataStream < OpenSearchOutput
1✔
6

7
    # Register this plugin as the "opensearch_data_stream" output type.
    Fluent::Plugin.register_output('opensearch_data_stream', self)

    # event_emitter helper gives access to router.emit_error_event,
    # used in #write to re-route records that fail processing.
    helpers :event_emitter

    # Target data stream name; may contain Fluentd placeholders,
    # which are resolved per-chunk in #write.
    config_param :data_stream_name, :string
    # Index template name; defaults to "<data_stream_name>_template" (see #configure).
    config_param :data_stream_template_name, :string, :default => nil
    # OpenSearch 1.0 or later always support new style of index template.
    config_set_default :use_legacy_template, false

    # Characters forbidden at the start of / anywhere in a data stream name.
    # ref. https://opensearch.org/docs/latest/opensearch/data-streams/
    # NOTE(review): "CHRACTERS" is a long-standing typo; kept because the
    # constant name is part of the class's public surface.
    INVALID_START_CHRACTERS = ["-", "_", "+", "."]
    INVALID_CHARACTERS = ["\\", "/", "*", "?", "\"", "<", ">", "|", " ", ",", "#", ":"]
1✔
18

19
    # Fluentd configuration hook.
    #
    # Derives a default template name from the data stream name, validates
    # both names (unless the stream name contains a placeholder, in which
    # case validation and creation are deferred to #write), and eagerly
    # creates the index template / data stream for the static-name case.
    #
    # @param conf [Fluent::Config::Element] the plugin configuration
    # @raise [Fluent::ConfigError] on invalid names or failed creation
    def configure(conf)
      super

      # Default the template name to "<data_stream_name>_template".
      @data_stream_template_name = "#{@data_stream_name}_template" if @data_stream_template_name.nil?

      # ref. https://opensearch.org/docs/latest/opensearch/data-streams/
      unless placeholder?(:data_stream_name_placeholder, @data_stream_name)
        validate_data_stream_parameters
      else
        # Placeholder present: names are resolved per-chunk in #write;
        # @data_stream_names memoizes which streams were already created.
        @use_placeholder = true
        @data_stream_names = []
      end

      unless @use_placeholder
        begin
          @data_stream_names = [@data_stream_name]
          # Creation is retried with the same policy as template installation.
          retry_operate(@max_retry_putting_template,
                        @fail_on_putting_template_retry_exceed,
                        @catch_transport_exception_on_retry) do
            create_index_template(@data_stream_name, @data_stream_template_name)
          end
        rescue => e
          raise Fluent::ConfigError, "Failed to create data stream: <#{@data_stream_name}> #{e.message}"
        end
      end
    end
45

46
    # Validates @data_stream_name and @data_stream_template_name against the
    # OpenSearch data stream naming rules, raising a ConfigError naming the
    # first violated rule for each offending parameter.
    #
    # Check order matters for the error message emitted: invalid start
    # character (with a dedicated message for "." / ".."), then invalid
    # characters, then lowercase, then the 255-byte length limit.
    #
    # @raise [Fluent::ConfigError] when either name violates a rule
    def validate_data_stream_parameters
      {"data_stream_name" => @data_stream_name,
       "data_stream_template_name" => @data_stream_template_name}.each do |parameter, value|
        unless valid_data_stream_parameters?(value)
          unless start_with_valid_characters?(value)
            if not_dots?(value)
              raise Fluent::ConfigError, "'#{parameter}' must not start with #{INVALID_START_CHRACTERS.join(",")}: <#{value}>"
            else
              raise Fluent::ConfigError, "'#{parameter}' must not be . or ..: <#{value}>"
            end
          end
          unless valid_characters?(value)
            raise Fluent::ConfigError, "'#{parameter}' must not contain invalid characters #{INVALID_CHARACTERS.join(",")}: <#{value}>"
          end
          unless lowercase_only?(value)
            raise Fluent::ConfigError, "'#{parameter}' must be lowercase only: <#{value}>"
          end
          if value.bytes.size > 255
            raise Fluent::ConfigError, "'#{parameter}' must not be longer than 255 bytes: <#{value}>"
          end
        end
      end
    end
69

70
    # Ensures the index template backing +datastream_name+ exists.
    #
    # With @template_file set, installation is delegated to the base class's
    # template machinery; otherwise a minimal composable index template
    # ("<name>*" pattern with a data_stream section) is put, unless either
    # the data stream or the template already exists.
    #
    # @param datastream_name [String] resolved data stream name
    # @param template_name [String] resolved index template name
    # @param host [String, nil] optional per-chunk host override
    def create_index_template(datastream_name, template_name, host = nil)
      # Create index template from file
      if @template_file
        template_installation_actual(template_name, @customize_template, @application_name, datastream_name, host)
      else # Create default index template
        # Use `||` rather than keyword `or`: identical truth table here, but
        # `or`'s very low precedence is a well-known trap in guard clauses.
        return if data_stream_exist?(datastream_name, host) || template_exists?(template_name, host)
        body = {
          "index_patterns" => ["#{datastream_name}*"],
          "data_stream" => {},
        }

        params = {
          name: template_name,
          body: body
        }
        retry_operate(@max_retry_putting_template,
                      @fail_on_putting_template_retry_exceed,
                      @catch_transport_exception_on_retry) do
          client(host).indices.put_index_template(params)
        end
      end
    end
92

93
    # Checks whether a data stream named +datastream_name+ already exists
    # on the cluster reachable through +host+.
    #
    # @param datastream_name [String] data stream to look up
    # @param host [String, nil] optional per-chunk host override
    # @return [Boolean] false (after logging) when the stream is missing
    def data_stream_exist?(datastream_name, host = nil)
      params = {
        name: datastream_name
      }
      begin
        # TODO: Use X-Pack equivalent performing DataStream operation method on the following line
        response = client(host).perform_request('GET', "/_data_stream/#{datastream_name}", {}, params)
        # NOTE(review): perform_request raises NotFound (handled below), so
        # this is_a? check looks defensive/unreachable — confirm against the
        # transport client's behavior before simplifying.
        return (not response.is_a?(OpenSearch::Transport::Transport::Errors::NotFound))
      rescue OpenSearch::Transport::Transport::Errors::NotFound => e
        log.info "Specified data stream does not exist. Will be created: <#{e}>"
        return false
      end
    end
106

107
    # Checks whether an index template named +name+ exists on the cluster.
    # Queries the legacy template API when @use_legacy_template is set,
    # the composable (index template) API otherwise; a NotFound error from
    # the client is mapped to false.
    def template_exists?(name, host = nil)
      indices_api = client(host).indices
      if @use_legacy_template
        indices_api.get_template(:name => name)
      else
        indices_api.get_index_template(:name => name)
      end
      true
    rescue OpenSearch::Transport::Transport::Errors::NotFound
      false
    end
117

118
    # Composite validity check for a data stream / template name: lowercase
    # only, free of INVALID_CHARACTERS, not starting with an invalid
    # character, not "." or "..", and at most 255 bytes long.
    #
    # Uses `&&` instead of keyword `and`: same semantics in this expression,
    # but `and`'s low precedence makes it error-prone in value contexts.
    def valid_data_stream_parameters?(data_stream_parameter)
      lowercase_only?(data_stream_parameter) &&
        valid_characters?(data_stream_parameter) &&
        start_with_valid_characters?(data_stream_parameter) &&
        not_dots?(data_stream_parameter) &&
        data_stream_parameter.bytes.size <= 255
    end
125

126
    # True when the parameter contains no uppercase characters, i.e. it is
    # identical to its own downcased form.
    def lowercase_only?(data_stream_parameter)
      data_stream_parameter == data_stream_parameter.downcase
    end
129

130
    # True when the parameter contains none of INVALID_CHARACTERS.
    # `none?` replaces the original `not (....each.any? { ... })`: the
    # `.each` was redundant (Enumerator#any? == Array#any?) and the double
    # negation obscured intent; behavior is unchanged.
    def valid_characters?(data_stream_parameter)
      INVALID_CHARACTERS.none? { |invalid| data_stream_parameter.include?(invalid) }
    end
133

134
    # True when the parameter does not begin with any INVALID_START_CHRACTERS.
    # `none?` replaces the original `not (....each.any? { ... })`: the
    # `.each` was redundant and the double negation obscured intent;
    # behavior is unchanged (empty string starts with none, so it passes).
    def start_with_valid_characters?(data_stream_parameter)
      INVALID_START_CHRACTERS.none? { |invalid| data_stream_parameter.start_with?(invalid) }
    end
137

138
    # True unless the parameter is exactly "." or "..", which are reserved
    # and may not be used as data stream names.
    def not_dots?(data_stream_parameter)
      ![".", ".."].include?(data_stream_parameter)
    end
141

142
    # Version string of the underlying opensearch-ruby client library.
    def client_library_version
      OpenSearch::VERSION
    end
145

146
    # Declares this output safe to run under Fluentd's multi-worker mode.
    def multi_workers_ready?
      true
    end
149

150
    # Buffered-output entry point: serializes one chunk into a bulk request
    # and sends it to the resolved data stream.
    #
    # With placeholders enabled, host / stream / template names are resolved
    # from the chunk metadata here, and the data stream is created on first
    # sight (memoized in @data_stream_names).
    #
    # @param chunk [Fluent::Plugin::Buffer::Chunk] buffered events
    # @raise [RecoverableRequestFailure] when the bulk request fails, so
    #   Fluentd retries the chunk
    def write(chunk)
      data_stream_name = @data_stream_name
      data_stream_template_name = @data_stream_template_name
      host = nil
      if @use_placeholder
        # Multi-host config (@hosts) takes precedence over single @host.
        host = if @hosts
                 extract_placeholders(@hosts, chunk)
               else
                 extract_placeholders(@host, chunk)
               end
        # Data stream names must be lowercase (see validate_data_stream_parameters).
        data_stream_name = extract_placeholders(@data_stream_name, chunk).downcase
        data_stream_template_name = extract_placeholders(@data_stream_template_name, chunk).downcase
        unless @data_stream_names.include?(data_stream_name)
          begin
            create_index_template(data_stream_name, data_stream_template_name, host)
            @data_stream_names << data_stream_name
          rescue => e
            # NOTE(review): raising ConfigError at write time makes the chunk
            # unrecoverable rather than retryable — confirm this is intended.
            raise Fluent::ConfigError, "Failed to create data stream: <#{data_stream_name}> #{e.message}"
          end
        end
      end

      bulk_message = ""
      headers = {
        CREATE_OP => {}
      }
      tag = chunk.metadata.tag
      chunk.msgpack_each do |time, record|
        # Non-hash payloads cannot be indexed; skip them silently.
        next unless record.is_a? Hash
        begin
          # Derive @timestamp: prefer an existing TIMESTAMP_FIELD, then the
          # configured @time_key, finally the event time itself.
          if record.has_key?(TIMESTAMP_FIELD)
            rts = record[TIMESTAMP_FIELD]
            dt = parse_time(rts, time, tag)
          elsif record.has_key?(@time_key)
            rts = record[@time_key]
            dt = parse_time(rts, time, tag)
          else
            dt = Time.at(time).to_datetime
          end
          record.merge!({"@timestamp" => dt.iso8601(@time_precision)})
          bulk_message = append_record_to_messages(CREATE_OP, {}, headers, record, bulk_message)
        rescue => e
          # Route records that fail time parsing / serialization to the
          # error stream instead of failing the whole chunk.
          emit_error_label_event do
            router.emit_error_event(tag, time, record, e)
          end
        end
      end

      params = {
        index: data_stream_name,
        body: bulk_message
      }
      begin
        response = client(host).bulk(params)
        # Partial failure: the request succeeded but individual items errored.
        if response['errors']
          log.error "Could not bulk insert to Data Stream: #{data_stream_name} #{response}"
        end
      rescue => e
        raise RecoverableRequestFailure, "could not push logs to OpenSearch cluster (#{data_stream_name}): #{e.message}"
      end
    end
211

212
    # Serializes one (header, record) pair into the newline-delimited bulk
    # payload. Mutates +header+ in place (stores +meta+ under CREATE_OP) and
    # appends to +msgs+ in place; returns the grown buffer.
    def append_record_to_messages(op, meta, header, record, msgs)
      header[CREATE_OP] = meta
      [header, record].each do |piece|
        msgs << @dump_proc.call(piece) << BODY_DELIMITER
      end
      msgs
    end
218

219
    # A failed stream may be retried only while the buffer can still store
    # the chunk to be retried.
    def retry_stream_retryable?
      @buffer.storable?
    end
222
  end
223
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc