ruby-concurrency / concurrent-ruby / #2835

05 Oct 2014 10:16PM UTC coverage: 45.201% (-49.6%) from 94.81%
push

jdantonio
Merge pull request #158 from obrok/promise-composition

Promise composition

2 of 15 new or added lines in 1 file covered. (13.33%)

1514 existing lines in 84 files now uncovered.

1375 of 3042 relevant lines covered (45.2%)

0.66 hits per line

Source File

36.36
/lib/concurrent/async.rb
require 'thread'
require 'concurrent/configuration'
require 'concurrent/delay'
require 'concurrent/errors'
require 'concurrent/ivar'
require 'concurrent/executor/immediate_executor'
require 'concurrent/executor/serialized_execution'

module Concurrent

  # A mixin module that provides simple asynchronous behavior to any standard
  # class or object.
  #
  #   Scenario:
  #     As a stateful, plain old Ruby class/object
  #     I want safe, asynchronous behavior
  #     So my long-running methods don't block the main thread
  #
  # Stateful, mutable objects must be managed carefully when used asynchronously.
  # But Ruby is an object-oriented language so designing with objects and classes
  # plays to Ruby's strengths and is often more natural to many Ruby programmers.
  # The `Async` module is a way to mix simple yet powerful asynchronous capabilities
  # into any plain old Ruby object or class. These capabilities provide a reasonable
  # level of thread safety guarantees when used correctly.
  #
  # When this module is mixed into a class or object it provides two new methods:
  # `async` and `await`. These methods are thread safe with respect to the enclosing
  # object. The former method allows methods to be called asynchronously by posting
  # to the global thread pool. The latter allows a method to be called synchronously
  # on the current thread but does so safely with respect to any pending asynchronous
  # method calls. Both methods return an `Obligation` which can be inspected for
  # the result of the method call. Calling a method with `async` will return a
  # `:pending` `Obligation` whereas `await` will return an `Obligation` that is
  # already complete (`:fulfilled` or `:rejected`).
  #
  # Very loosely based on the `async` and `await` keywords in C#.
  #
  # @example Defining an asynchronous class
  #   class Echo
  #     include Concurrent::Async
  #
  #     def initialize
  #       init_mutex # initialize the internal synchronization objects
  #     end
  #
  #     def echo(msg)
  #       sleep(rand)
  #       print "#{msg}\n"
  #       nil
  #     end
  #   end
  #
  #   horn = Echo.new
  #   horn.echo('zero') # synchronous, not thread-safe
  #
  #   horn.async.echo('one') # asynchronous, non-blocking, thread-safe
  #   horn.await.echo('two') # synchronous, blocking, thread-safe
  #
  # @example Monkey-patching an existing object
  #   numbers = 1_000_000.times.collect{ rand }
  #   numbers.extend(Concurrent::Async)
  #   numbers.init_mutex # initialize the internal synchronization objects
  #
  #   future = numbers.async.max
  #   future.state #=> :pending
  #
  #   sleep(2)
  #
  #   future.state #=> :fulfilled
  #   future.value #=> 0.999999138918843
  #
  # @note This module depends on several internal synchronization objects that
  #       must be initialized prior to calling any of the async/await/executor methods.
  #       The best practice is to call `init_mutex` from within the constructor
  #       of the including class. A less ideal but acceptable practice is for the
  #       thread creating the asynchronous object to explicitly call the `init_mutex`
  #       method prior to calling any of the async/await/executor methods. If
  #       `init_mutex` is *not* called explicitly the async/await/executor methods
  #       will raise a `Concurrent::InitializationError`. This is the only way
  #       thread-safe initialization can be guaranteed.
  #
  # @note Thread safety guarantees can only be made when asynchronous method calls
  #       are not mixed with synchronous method calls. Use only synchronous calls
  #       when the object is used exclusively on a single thread. Use only
  #       `async` and `await` when the object is shared between threads. Once you
  #       call a method using `async`, you should no longer call any methods
  #       directly on the object. Use `async` and `await` exclusively from then on.
  #       With careful programming it is possible to switch back and forth but it's
  #       also very easy to create race conditions and break your application.
  #       Basically, it's "async all the way down."
  #
  # @since 0.6.0
  #
  # @see Concurrent::Obligation
  module Async

    # Check for the presence of a method on an object and determine if a given
    # set of arguments matches the required arity.
    #
    # @param [Object] obj the object to check against
    # @param [Symbol] method the method to check the object for
    # @param [Array] args zero or more arguments for the arity check
    #
    # @raise [NameError] the object does not respond to `method`
    # @raise [ArgumentError] the given `args` do not match the arity of `method`
    #
    # @note This check is imperfect because of the way Ruby reports the arity of
    #   methods with a variable number of arguments. It is possible to determine
    #   if too few arguments are given but impossible to determine if too many
    #   arguments are given. This check may also fail to recognize dynamic behavior
    #   of the object, such as methods simulated with `method_missing`.
    #
    # @see http://www.ruby-doc.org/core-2.1.1/Method.html#method-i-arity Method#arity
    # @see http://ruby-doc.org/core-2.1.0/Object.html#method-i-respond_to-3F Object#respond_to?
    # @see http://www.ruby-doc.org/core-2.1.0/BasicObject.html#method-i-method_missing BasicObject#method_missing
    def validate_argc(obj, method, *args)
      argc = args.length
      arity = obj.method(method).arity

      if arity >= 0 && argc != arity
        raise ArgumentError.new("wrong number of arguments (#{argc} for #{arity})")
      elsif arity < 0 && (arity = (arity + 1).abs) > argc
        raise ArgumentError.new("wrong number of arguments (#{argc} for #{arity}..*)")
      end
    end
    module_function :validate_argc

    # Delegates asynchronous, thread-safe method calls to the wrapped object.
    #
    # @!visibility private
    class AsyncDelegator # :nodoc:

      # Create a new delegator object wrapping the given delegate,
      # protecting it with the given serializer, and executing it on the
      # given executor. Block if necessary.
      #
      # @param [Object] delegate the object to wrap and delegate method calls to
      # @param [Concurrent::Delay] executor a `Delay` wrapping the executor on which to execute delegated method calls
      # @param [Concurrent::SerializedExecution] serializer the serializer to use when delegating method calls
      # @param [Boolean] blocking will block awaiting result when `true`
      def initialize(delegate, executor, serializer, blocking = false)
        @delegate = delegate
        @executor = executor
        @serializer = serializer
        @blocking = blocking
      end

      # Delegates method calls to the wrapped object. For performance,
      # dynamically defines the given method on the delegator so that
      # future calls to `method` bypass `method_missing`.
      #
      # @param [Symbol] method the method being called
      # @param [Array] args zero or more arguments to the method
      #
      # @return [IVar] the result of the method call
      #
      # @raise [NameError] the object does not respond to `method`
      # @raise [ArgumentError] the given `args` do not match the arity of `method`
      def method_missing(method, *args, &block)
        super unless @delegate.respond_to?(method)
        Async::validate_argc(@delegate, method, *args)

        self.define_singleton_method(method) do |*args|
          Async::validate_argc(@delegate, method, *args)
          ivar = Concurrent::IVar.new
          value, reason = nil, nil
          @serializer.post(@executor.value) do
            begin
              value = @delegate.send(method, *args, &block)
            rescue => reason
              # caught
            ensure
              ivar.complete(reason.nil?, value, reason)
            end
          end
          ivar.value if @blocking
          ivar
        end

        self.send(method, *args)
      end
    end

    # Causes the chained method call to be performed asynchronously on the
    # global thread pool. The method called by this method will return a
    # future object in the `:pending` state and the method call will have
    # been scheduled on the global thread pool. The final disposition of the
    # method call can be obtained by inspecting the returned future.
    #
    # Before scheduling the method on the global thread pool a best-effort
    # attempt will be made to validate that the method exists on the object
    # and that the given arguments match the arity of the requested function.
    # Due to the dynamic nature of Ruby and limitations of its reflection
    # library, some edge cases will be missed. For more information see
    # the documentation for the `validate_argc` method.
    #
    # @note The method call is guaranteed to be thread safe with respect to
    #   all other method calls against the same object that are called with
    #   either `async` or `await`. The mutable nature of Ruby references
    #   (and object orientation in general) prevents any other thread safety
    #   guarantees. Do NOT mix non-protected method calls with protected
    #   method calls. Use *only* protected method calls when sharing the object
    #   between threads.
    #
    # @return [Concurrent::IVar] the pending result of the asynchronous operation
    #
    # @raise [Concurrent::InitializationError] `#init_mutex` has not been called
    # @raise [NameError] the object does not respond to `method`
    # @raise [ArgumentError] the given `args` do not match the arity of `method`
    #
    # @see Concurrent::IVar
    def async
      raise InitializationError.new('#init_mutex was never called') unless @__async_initialized__
      @__async_delegator__.value
    end
    alias_method :future, :async

    # Causes the chained method call to be performed synchronously on the
    # current thread. The method called by this method will return an
    # `IVar` object in either the `:fulfilled` or `:rejected` state and the
    # method call will have completed. The final disposition of the
    # method call can be obtained by inspecting the returned `IVar`.
    #
    # Before executing the method a best-effort
    # attempt will be made to validate that the method exists on the object
    # and that the given arguments match the arity of the requested function.
    # Due to the dynamic nature of Ruby and limitations of its reflection
    # library, some edge cases will be missed. For more information see
    # the documentation for the `validate_argc` method.
    #
    # @note The method call is guaranteed to be thread safe with respect to
    #   all other method calls against the same object that are called with
    #   either `async` or `await`. The mutable nature of Ruby references
    #   (and object orientation in general) prevents any other thread safety
    #   guarantees. Do NOT mix non-protected method calls with protected
    #   method calls. Use *only* protected method calls when sharing the object
    #   between threads.
    #
    # @return [Concurrent::IVar] the completed result of the synchronous operation
    #
    # @raise [Concurrent::InitializationError] `#init_mutex` has not been called
    # @raise [NameError] the object does not respond to `method`
    # @raise [ArgumentError] the given `args` do not match the arity of `method`
    #
    # @see Concurrent::IVar
    def await
      raise InitializationError.new('#init_mutex was never called') unless @__async_initialized__
      @__await_delegator__.value
    end
    alias_method :delay, :await

    # Set a new executor
    #
    # @raise [Concurrent::InitializationError] `#init_mutex` has not been called
    # @raise [ArgumentError] executor has already been set
    def executor=(executor)
      raise InitializationError.new('#init_mutex was never called') unless @__async_initialized__
      @__async_executor__.reconfigure { executor } or
        raise ArgumentError.new('executor has already been set')
    end

    # Initialize the internal serializer and other synchronization objects. This method
    # *must* be called from the constructor of the including class or explicitly
    # by the caller prior to calling any other methods. If `init_mutex` is *not*
    # called explicitly the async/await/executor methods will raise a
    # `Concurrent::InitializationError`. This is the only way thread-safe
    # initialization can be guaranteed.
    #
    # @note This method *must* be called from the constructor of the including
    #       class or explicitly by the caller prior to calling any other methods.
    #       This is the only way thread-safe initialization can be guaranteed.
    #
    # @raise [Concurrent::InitializationError] when called more than once
    def init_mutex
      raise InitializationError.new('#init_mutex was already called') if @__async_initialized__
      @__async_initialized__ = true
      serializer = Concurrent::SerializedExecution.new
      @__async_executor__ = Delay.new{ Concurrent.configuration.global_operation_pool }
      @__await_delegator__ = Delay.new{ AsyncDelegator.new(
        self, Delay.new{ Concurrent::ImmediateExecutor.new }, serializer, true) }
      @__async_delegator__ = Delay.new{ AsyncDelegator.new(
        self, @__async_executor__, serializer, false) }
    end
  end
end
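
The note on `validate_argc` says the arity check can detect too few arguments but not too many when a method takes optional or splat parameters. The sketch below illustrates that limitation with a standalone copy of the same logic, so it runs without loading the library. The helper name `check_argc` and the `Sample` class are hypothetical and exist only for this illustration. Unlike the original, the helper returns `true` on success to make the outcome easier to inspect.

```ruby
# Standalone re-statement of the arity check for experimentation.
# `check_argc` is a hypothetical name; it is not part of the library.
def check_argc(obj, method_name, *args)
  argc = args.length
  arity = obj.method(method_name).arity # raises NameError if the method is undefined

  if arity >= 0 && argc != arity
    # Fixed arity: the argument count must match exactly.
    raise ArgumentError, "wrong number of arguments (#{argc} for #{arity})"
  elsif arity < 0 && (required = -arity - 1) > argc
    # Negative arity encodes -(required + 1); only a lower bound is known.
    raise ArgumentError, "wrong number of arguments (#{argc} for #{required}..*)"
  end
  true
end

# Hypothetical class used only to exercise the check.
class Sample
  def fixed(a, b); end        # arity  2
  def optional(a, b = 1); end # arity -2: one required argument
  def splat(*rest); end       # arity -1: no required arguments
end

sample = Sample.new
check_argc(sample, :fixed, 1, 2)       # passes: exact match
check_argc(sample, :splat)             # passes: nothing is required
check_argc(sample, :optional, 1, 2, 3) # passes the check, although calling
                                       # sample.optional(1, 2, 3) would raise
```

The last call shows why the check is described as best-effort: a negative arity reports only the number of required arguments, so an upper bound cannot be enforced from `Method#arity` alone.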