• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jdantonio / concurrent-ruby / #748

24 Apr 2014 12:31AM UTC coverage: 78.31% (-19.5%) from 97.805%
#748

push

jdantonio
Attempting to fix a brittle test of Concurrent::timer.

1928 of 2462 relevant lines covered (78.31%)

409.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.43
/lib/concurrent/agent.rb
1
require 'thread'
1✔
2

3
require 'concurrent/dereferenceable'
1✔
4
require 'concurrent/observable'
1✔
5
require 'concurrent/options_parser'
1✔
6
require 'concurrent/utility/timeout'
1✔
7

8
module Concurrent
1✔
9

10
  # An agent is a single atomic value that represents an identity. The current value
  # of the agent can be requested at any time (`#deref`). Each agent has a work queue and operates on
  # the global thread pool. Consumers can `#post` code blocks to the agent. The code block (function)
  # will receive the current value of the agent as its sole parameter. The return value of the block
  # will become the new value of the agent. Agents support two error handling modes: fail and continue.
  # A good example of an agent is a shared incrementing counter, such as the score in a video game.
  #
  # @example Basic usage
  #   score = Concurrent::Agent.new(10)
  #   score.value #=> 10
  #
  #   score << proc{|current| current + 100 }
  #   sleep(0.1)
  #   score.value #=> 110
  #
  #   score << proc{|current| current * 2 }
  #   sleep(0.1)
  #   score.value #=> 220
  #
  #   score << proc{|current| current - 50 }
  #   sleep(0.1)
  #   score.value #=> 170
  #
  # @!attribute [r] timeout
  #   @return [Fixnum] the maximum number of seconds before an update is cancelled
  class Agent
    include Dereferenceable
    include Concurrent::Observable

    # The default timeout value (in seconds); used when no timeout option
    # is given at initialization
    TIMEOUT = 5

    attr_reader :timeout

    # Initialize a new Agent with the given initial value and provided options.
    #
    # @param [Object] initial the initial value
    # @param [Hash] opts the options used to define the behavior at update and deref
    #
    # @option opts [Fixnum] :timeout (TIMEOUT) maximum number of seconds before an update is cancelled
    #
    # @option opts [Boolean] :operation (false) when `true` will execute the future on the global
    #   operation pool (for long-running operations), when `false` will execute the future on the
    #   global task pool (for short-running tasks)
    # @option opts [Object] :executor when provided will run all operations on
    #   this executor rather than the global thread pool (overrides :operation)
    #
    # @option opts [Boolean] :dup_on_deref (false) call `#dup` before returning the data
    # @option opts [Boolean] :freeze_on_deref (false) call `#freeze` before returning the data
    # @option opts [Proc] :copy_on_deref (nil) call the given `Proc` passing the internal value and
    #   returning the value returned from the proc
    def initialize(initial, opts = {})
      @value = initial
      @rescuers = []
      # Until `#validate` installs a block, every update result is accepted.
      @validator = Proc.new { |_result| true }
      @timeout = opts.fetch(:timeout, TIMEOUT).freeze
      self.observers = CopyOnWriteObserverSet.new
      @executor = OptionsParser::get_executor_from(opts)
      init_mutex
      set_deref_options(opts)
    end

    # Specifies a block operation to be performed when an update operation raises
    # an exception. Rescue blocks will be checked in order they were added. The first
    # block for which the raised exception "is-a" subclass of the given `clazz` will
    # be called. If no `clazz` is given the block will match any `StandardError`.
    # This behavior is intended to be identical to Ruby's `begin/rescue/end` behavior.
    # Any number of rescue handlers can be added. If no rescue handlers are added then
    # caught exceptions will be suppressed.
    #
    # @param [Class] clazz the class of exception to catch
    # @yield the block to be called when a matching exception is caught
    # @yieldparam [StandardError] ex the caught exception
    #
    # @return [Agent] self, so that handlers can be chained
    #
    # @example
    #   score = Concurrent::Agent.new(0).
    #             rescue(NoMethodError){|ex| puts "Bam!" }.
    #             rescue(ArgumentError){|ex| puts "Pow!" }.
    #             rescue{|ex| puts "Boom!" }
    #
    #   score << proc{|current| raise ArgumentError }
    #   sleep(0.1)
    #   #=> puts "Pow!"
    def rescue(clazz = StandardError, &block)
      unless block.nil?
        # The handler list is read under this mutex in #try_rescue.
        mutex.synchronize { @rescuers << Rescuer.new(clazz, block) }
      end
      self
    end
    alias_method :catch, :rescue
    alias_method :on_error, :rescue

    # A block operation to be performed after every update to validate if the new
    # value is valid. If the new value is not valid then the current value is not
    # updated. If no validator is provided then all updates are considered valid.
    # Calling this method without a block leaves the current validator in place.
    #
    # @yield the block to be called after every update operation to determine if
    #   the result is valid
    # @yieldparam [Object] value the result of the last update operation
    # @yieldreturn [Boolean] true if the value is valid else false
    #
    # @return [Agent] self, so that calls can be chained
    def validate(&block)
      # #work invokes the validator while holding the mutex, so the validator is
      # swapped under the same lock (mirroring how #rescue guards its handler list).
      mutex.synchronize { @validator = block } unless block.nil?
      self
    end
    alias_method :validates, :validate
    alias_method :validate_with, :validate
    alias_method :validates_with, :validate

    # Update the current value with the result of the given block operation.
    # The block is handed to the agent's executor; nothing is posted (and `nil`
    # is returned) when no block is given.
    #
    # @yield the operation to be performed with the current value in order to calculate
    #   the new value
    # @yieldparam [Object] value the current value
    # @yieldreturn [Object] the new value
    def post(&block)
      @executor.post { work(&block) } unless block.nil?
    end

    # Update the current value with the result of the given block operation.
    # Behaves like `#post` but takes the operation as a regular argument and
    # returns the agent itself so that calls can be chained.
    #
    # @param [Proc] block the operation to be performed with the current value
    #   in order to calculate the new value
    #
    # @return [Agent] self
    def <<(block)
      post(&block)
      self
    end

    private

    # Pairs an exception class with the handler block registered through `#rescue`.
    #
    # @!visibility private
    Rescuer = Struct.new(:clazz, :block) # :nodoc:

    # Calls the first registered handler whose class matches the given exception.
    # Does nothing when no handler matches.
    #
    # @!visibility private
    def try_rescue(ex) # :nodoc:
      rescuer = mutex.synchronize do
        @rescuers.find { |r| ex.is_a?(r.clazz) }
      end
      rescuer.block.call(ex) if rescuer
    rescue Exception
      # Suppress: an error raised while looking up or running a handler is
      # deliberately swallowed so it never escapes into the executor.
    end

    # Runs one posted operation: computes the candidate value under the mutex
    # (bounded by the agent's timeout), stores it when the validator accepts it,
    # and then notifies observers. Any exception raised along the way is routed
    # to the registered rescue handlers.
    #
    # @!visibility private
    def work(&handler) # :nodoc:
      should_notify = false

      mutex.synchronize do
        result = Concurrent::timeout(@timeout) do
          handler.call(@value)
        end
        if @validator.call(result)
          @value = result
          should_notify = true
        end
      end

      # Observers are notified after the mutex has been released.
      time = Time.now
      observers.notify_observers { [time, self.value] } if should_notify
    rescue Exception => ex
      try_rescue(ex)
    end
  end
179
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc