• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

fluent / fluent-plugin-opensearch / 14873730023

07 May 2025 02:22AM UTC coverage: 91.339% (-0.006%) from 91.345%
14873730023

Pull #156

github

web-flow
Merge 1576e49ae into 4b6ff1a28
Pull Request #156: Fix memory usage

1160 of 1270 relevant lines covered (91.34%)

155.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.0
/lib/fluent/plugin/opensearch_error_handler.rb
1
# SPDX-License-Identifier: Apache-2.0
2
#
3
# The fluent-plugin-opensearch Contributors require contributions made to
4
# this file be licensed under the Apache-2.0 license or a
5
# compatible open source license.
6
#
7
# Modifications Copyright fluent-plugin-opensearch Contributors. See
8
# GitHub history for details.
9
#
10
# Licensed to Uken Inc. under one or more contributor
11
# license agreements. See the NOTICE file distributed with
12
# this work for additional information regarding copyright
13
# ownership. Uken Inc. licenses this file to you under
14
# the Apache License, Version 2.0 (the "License"); you may
15
# not use this file except in compliance with the License.
16
# You may obtain a copy of the License at
17
#
18
#   http://www.apache.org/licenses/LICENSE-2.0
19
#
20
# Unless required by applicable law or agreed to in writing,
21
# software distributed under the License is distributed on an
22
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
23
# KIND, either express or implied.  See the License for the
24
# specific language governing permissions and limitations
25
# under the License.
26

27
require 'fluent/event'
1✔
28
require 'fluent/error'
1✔
29
require_relative 'opensearch_constants'
1✔
30

31
class Fluent::Plugin::OpenSearchErrorHandler
1✔
32
  include Fluent::Plugin::OpenSearchConstants
1✔
33

34
  attr_accessor :bulk_message_count
1✔
35
  class OpenSearchVersionMismatch < Fluent::UnrecoverableError; end
1✔
36
  class OpenSearchSubmitMismatch < Fluent::UnrecoverableError; end
1✔
37
  class OpenSearchRequestAbortError < Fluent::UnrecoverableError; end
1✔
38
  class OpenSearchError < StandardError; end
1✔
39

40
  def initialize(plugin)
1✔
41
    @plugin = plugin
19✔
42
  end
43

44
  def unrecoverable_error_types
1✔
45
    @plugin.unrecoverable_error_types
18✔
46
  end
47

48
  def unrecoverable_error?(type)
1✔
49
    unrecoverable_error_types.include?(type)
18✔
50
  end
51

52
  def unrecoverable_record_types
1✔
53
    @plugin.unrecoverable_record_types
29✔
54
  end
55

56
  def unrecoverable_record_error?(type)
1✔
57
    unrecoverable_record_types.include?(type)
29✔
58
  end
59

60
  def log_os_400_reason(&block)
1✔
61
    if @plugin.log_os_400_reason
9✔
62
      block.call
2✔
63
    else
64
      @plugin.log.on_debug(&block)
7✔
65
    end
66
  end
67

68
  def emit_error_label_event?
1✔
69
    !!@plugin.emit_error_label_event
14✔
70
  end
71

72
  def handle_error(response, tag, chunk, bulk_message_count, extracted_values)
1✔
73
    items = response['items']
18✔
74
    if items.nil? || !items.is_a?(Array)
18✔
75
      raise OpenSearchVersionMismatch, "The response format was unrecognized: #{response}"
×
76
    end
77
    if bulk_message_count != items.length
18✔
78
      raise OpenSearchSubmitMismatch, "The number of records submitted #{bulk_message_count} do not match the number returned #{items.length}. Unable to process bulk response."
×
79
    end
80
    retry_stream = Fluent::MultiEventStream.new
18✔
81
    stats = Hash.new(0)
18✔
82
    meta = {}
18✔
83
    header = {}
18✔
84
    affinity_target_indices = @plugin.get_affinity_target_indices(chunk)
18✔
85
    chunk.msgpack_each do |time, rawrecord|
18✔
86
      bulk_message = ''
52✔
87
      next unless rawrecord.is_a? Hash
52✔
88
      begin
89
        # we need a deep copy for process_message to alter
90
        processrecord = Marshal.load(Marshal.dump(rawrecord))
52✔
91
        meta, header, record = @plugin.process_message(tag, meta, header, time, processrecord, affinity_target_indices, extracted_values)
52✔
92
        next unless @plugin.append_record_to_messages(@plugin.write_operation, meta, header, record, bulk_message)
52✔
93
      rescue => e
94
        @plugin.log.debug("Exception in error handler during deep copy: #{e}")
7✔
95
        stats[:bad_chunk_record] += 1
7✔
96
        next
7✔
97
      end
98
      item = items.shift
41✔
99
      if item.is_a?(Hash) && item.has_key?(@plugin.write_operation)
41✔
100
        write_operation = @plugin.write_operation
4✔
101
      elsif INDEX_OP == @plugin.write_operation && item.is_a?(Hash) && item.has_key?(CREATE_OP)
37✔
102
        write_operation = CREATE_OP
29✔
103
      elsif UPSERT_OP == @plugin.write_operation && item.is_a?(Hash) && item.has_key?(UPDATE_OP)
8✔
104
        write_operation = UPDATE_OP
7✔
105
      elsif item.nil?
1✔
106
        stats[:errors_nil_resp] += 1
×
107
        next
×
108
      else
109
        # When we don't have an expected ops field, something changed in the API
110
        # expected return values.
111
        stats[:errors_bad_resp] += 1
1✔
112
        next
1✔
113
      end
114
      if item[write_operation].has_key?('status')
40✔
115
        status = item[write_operation]['status']
38✔
116
      else
117
        # When we don't have a status field, something changed in the API
118
        # expected return values.
119
        stats[:errors_bad_resp] += 1
2✔
120
        next
2✔
121
      end
122
      case
123
      when [200, 201].include?(status)
38✔
124
        stats[:successes] += 1
4✔
125
      when CREATE_OP == write_operation && 409 == status
126
        stats[:duplicates] += 1
3✔
127
      when 400 == status
128
        stats[:bad_argument] += 1
9✔
129
        reason = ""
9✔
130
        log_os_400_reason do
9✔
131
          if item[write_operation].has_key?('error') && item[write_operation]['error'].has_key?('type')
4✔
132
            reason = " [error type]: #{item[write_operation]['error']['type']}"
4✔
133
          end
134
          if item[write_operation].has_key?('error') && item[write_operation]['error'].has_key?('reason')
4✔
135
            reason += " [reason]: \'#{item[write_operation]['error']['reason']}\'"
4✔
136
          end
137
        end
138
        if emit_error_label_event?
9✔
139
          @plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("400 - Rejected by OpenSearch#{reason}"))
7✔
140
        end
141
      else
142
        if item[write_operation]['error'].is_a?(String)
22✔
143
          reason = item[write_operation]['error']
1✔
144
          stats[:errors_block_resp] += 1
1✔
145
          if emit_error_label_event?
1✔
146
            @plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("#{status} - #{reason}"))
1✔
147
          end
148
          next
1✔
149
        elsif item[write_operation].has_key?('error') && item[write_operation]['error'].has_key?('type')
21✔
150
          type = item[write_operation]['error']['type']
18✔
151
          stats[type] += 1
18✔
152
          if unrecoverable_error?(type)
18✔
153
            raise OpenSearchRequestAbortError, "Rejected OpenSearch due to #{type}"
3✔
154
          end
155
          if unrecoverable_record_error?(type)
15✔
156
            if emit_error_label_event?
1✔
157
              @plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("#{status} - #{type}: #{reason}"))
1✔
158
            end
159
            next
1✔
160
          else
161
            retry_stream.add(time, rawrecord) unless unrecoverable_record_error?(type)
14✔
162
          end
163
        else
164
          # When we don't have a type field, something changed in the API
165
          # expected return values.
166
          stats[:errors_bad_resp] += 1
3✔
167
          if emit_error_label_event?
3✔
168
            @plugin.router.emit_error_event(tag, time, rawrecord, OpenSearchError.new("#{status} - No error type provided in the response"))
3✔
169
          end
170
          next
3✔
171
        end
172
        stats[type] += 1
14✔
173
      end
174
    end
175
    @plugin.log.on_debug do
15✔
176
      msg = ["Indexed (op = #{@plugin.write_operation})"]
4✔
177
      stats.each_pair { |key, value| msg << "#{value} #{key}" }
10✔
178
      @plugin.log.debug msg.join(', ')
4✔
179
    end
180
    raise Fluent::Plugin::OpenSearchOutput::RetryStreamError.new(retry_stream) unless retry_stream.empty?
15✔
181
  end
182
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc