• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ruby-concurrency / concurrent-ruby / #2764

08 Dec 2014 03:40PM UTC coverage: 91.388% (-0.4%) from 91.753%
#2764

push

jdantonio
Merge pull request #201 from rkday/fallback_handling

Posting to a shutdown thread pool - JRuby consistency and better naming

18 of 26 new or added lines in 5 files covered. (69.23%)

212 existing lines in 36 files now uncovered.

2812 of 3077 relevant lines covered (91.39%)

369.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.4
/lib/concurrent/agent.rb
1
require 'thread'
1✔
2

3
require 'concurrent/dereferenceable'
1✔
4
require 'concurrent/observable'
1✔
5
require 'concurrent/options_parser'
1✔
6
require 'concurrent/utility/timeout'
1✔
7
require 'concurrent/logging'
1✔
8

9
module Concurrent
1✔
10

11
  # {include:file:doc/agent.md}
12
  #
13
  # @!attribute [r] timeout
14
  #   @return [Fixnum] the maximum number of seconds before an update is cancelled
15
  class Agent
1✔
16
    include Dereferenceable
1✔
17
    include Concurrent::Observable
1✔
18
    include Logging
1✔
19

20
    attr_reader :timeout, :task_executor, :operation_executor
1✔
21

22
    # Initialize a new Agent with the given initial value and provided options.
23
    #
24
    # @param [Object] initial the initial value
25
    # @param [Hash] opts the options used to define the behavior at update and deref
26
    #
27
    # @option opts [Boolean] :operation (false) when `true` will execute the future on the global
28
    #   operation pool (for long-running operations), when `false` will execute the future on the
29
    #   global task pool (for short-running tasks)
30
    # @option opts [object] :executor when provided will run all operations on
31
    #   this executor rather than the global thread pool (overrides :operation)
32
    #
33
    # @option opts [String] :dup_on_deref (false) call `#dup` before returning the data
34
    # @option opts [String] :freeze_on_deref (false) call `#freeze` before returning the data
35
    # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing the internal value and
36
    #   returning the value returned from the proc
37
    def initialize(initial, opts = {})
1✔
38
      @value                = initial
89✔
39
      @rescuers             = []
89✔
40
      @validator            = Proc.new { |result| true }
118✔
41
      self.observers        = CopyOnWriteObserverSet.new
89✔
42
      @serialized_execution = SerializedExecution.new
89✔
43
      @task_executor        = OptionsParser.get_task_executor_from(opts)
89✔
44
      @operation_executor   = OptionsParser.get_operation_executor_from(opts)
89✔
45
      init_mutex
89✔
46
      set_deref_options(opts)
89✔
47
    end
48

49
    # Specifies a block operation to be performed when an update operation raises
50
    # an exception. Rescue blocks will be checked in order they were added. The first
51
    # block for which the raised exception "is-a" subclass of the given `clazz` will
52
    # be called. If no `clazz` is given the block will match any caught exception.
53
    # This behavior is intended to be identical to Ruby's `begin/rescue/end` behavior.
54
    # Any number of rescue handlers can be added. If no rescue handlers are added then
55
    # caught exceptions will be suppressed.
56
    #
57
    # @param [Exception] clazz the class of exception to catch
58
    # @yield the block to be called when a matching exception is caught
59
    # @yieldparam [StandardError] ex the caught exception
60
    #
61
    # @example
62
    #   score = Concurrent::Agent.new(0).
63
    #             rescue(NoMethodError){|ex| puts "Bam!" }.
64
    #             rescue(ArgumentError){|ex| puts "Pow!" }.
65
    #             rescue{|ex| puts "Boom!" }
66
    #   
67
    #   score << proc{|current| raise ArgumentError }
68
    #   sleep(0.1)
69
    #   #=> puts "Pow!"
70
    def rescue(clazz = StandardError, &block)
1✔
71
      unless block.nil?
30✔
72
        mutex.synchronize do
27✔
73
          @rescuers << Rescuer.new(clazz, block)
27✔
74
        end
75
      end
76
      self
30✔
77
    end
78
    alias_method :catch, :rescue
1✔
79
    alias_method :on_error, :rescue
1✔
80

81
    # A block operation to be performed after every update to validate if the new
82
    # value is valid. If the new value is not valid then the current value is not
83
    # updated. If no validator is provided then all updates are considered valid.
84
    #
85
    # @yield the block to be called after every update operation to determine if
86
    #   the result is valid
87
    # @yieldparam [Object] value the result of the last update operation
88
    # @yieldreturn [Boolean] true if the value is valid else false
89
    def validate(&block)
1✔
90

91
      unless block.nil?
12✔
92
        begin
93
          mutex.lock
10✔
94
          @validator = block
10✔
95
        ensure
96
          mutex.unlock
10✔
97
        end
98
      end
99
      self
12✔
100
    end
101
    alias_method :validates, :validate
1✔
102
    alias_method :validate_with, :validate
1✔
103
    alias_method :validates_with, :validate
1✔
104

105
    # Update the current value with the result of the given block operation,
106
    # block should not do blocking calls, use #post_off for blocking calls
107
    #
108
    # @yield the operation to be performed with the current value in order to calculate
109
    #   the new value
110
    # @yieldparam [Object] value the current value
111
    # @yieldreturn [Object] the new value
112
    # @return [true, nil] nil when no block is given
113
    def post(&block)
1✔
114
      post_on(@task_executor, &block)
57✔
115
    end
116

117
    # Update the current value with the result of the given block operation,
118
    # block can do blocking calls
119
    #
120
    # @param [Fixnum, nil] timeout maximum number of seconds before an update is cancelled
121
    #
122
    # @yield the operation to be performed with the current value in order to calculate
123
    #   the new value
124
    # @yieldparam [Object] value the current value
125
    # @yieldreturn [Object] the new value
126
    # @return [true, nil] nil when no block is given
127
    def post_off(timeout = nil, &block)
1✔
128
      block = if timeout
2✔
129
                lambda { |value| Concurrent::timeout(timeout) { block.call(value) } }
12,556✔
130
              else
131
                block
1✔
132
              end
133
      post_on(@operation_executor, &block)
2✔
134
    end
135

136
    # Update the current value with the result of the given block operation,
137
    # block should not do blocking calls, use #post_off for blocking calls
138
    #
139
    # @yield the operation to be performed with the current value in order to calculate
140
    #   the new value
141
    # @yieldparam [Object] value the current value
142
    # @yieldreturn [Object] the new value
143
    def <<(block)
1✔
UNCOV
144
      post(&block)
×
UNCOV
145
      self
×
146
    end
147

148
    # Waits/blocks until all the updates sent before this call are done.
149
    #
150
    # @param [Numeric] timeout the maximum time in second to wait.
151
    # @return [Boolean] false on timeout, true otherwise
152
    def await(timeout = nil)
1✔
153
      done = Event.new
5✔
154
      post { |val| done.set; val }
10✔
155
      done.wait timeout
5✔
156
    end
157

158
    private
1✔
159

160
    def post_on(executor, &block)
1✔
161
      return nil if block.nil?
59✔
162
      @serialized_execution.post(executor) { work(&block) }
110✔
163
      true
58✔
164
    end
165

166
    # @!visibility private
167
    Rescuer = Struct.new(:clazz, :block) # :nodoc:
1✔
168

169
    # @!visibility private
170
    def try_rescue(ex) # :nodoc:
1✔
171
      rescuer = mutex.synchronize do
51✔
172
        @rescuers.find { |r| ex.is_a?(r.clazz) }
70✔
173
      end
174
      rescuer.block.call(ex) if rescuer
51✔
175
    rescue Exception => ex
176
      # suppress
177
      log DEBUG, ex
1✔
178
    end
179

180
    # @!visibility private
181
    def work(&handler) # :nodoc:
1✔
182
      validator, value = mutex.synchronize { [@validator, @value] }
104✔
183

184
      begin
185
        result = handler.call(value)
52✔
186
        valid  = validator.call(result)
38✔
187
      rescue Exception => ex
188
        exception = ex
14✔
189
      end
190

191
      begin
192
        mutex.lock
51✔
193
        should_notify = if !exception && valid
51✔
194
                          @value = result
35✔
195
                          true
35✔
196
                        end
197
      ensure
198
        mutex.unlock
51✔
199
      end
200

201
      if should_notify
51✔
202
        time = Time.now
35✔
203
        observers.notify_observers { [time, self.value] }
52✔
204
      end
205

206
      try_rescue(exception)
51✔
207
    end
208
  end
209
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc