• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jdantonio / concurrent-ruby / #749

06 Apr 2014 10:19PM UTC coverage: 92.79% (-4.5%) from 97.309%
#749

push

chrisseaton
Update README.md

1583 of 1706 relevant lines covered (92.79%)

655.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.4
/lib/concurrent/actor.rb
1
require 'thread'
1✔
2
require 'observer'
1✔
3

4
require 'concurrent/event'
1✔
5
require 'concurrent/obligation'
1✔
6
require 'concurrent/postable'
1✔
7
require 'concurrent/runnable'
1✔
8

9
module Concurrent
1✔
10

11
  # Actor-based concurrency is all the rage in some circles. Originally described in
12
  # 1973, the actor model is a paradigm for creating asynchronous, concurrent objects
13
  # that is becoming increasingly popular. Much has changed since actors were first
14
  # written about four decades ago, which has led to a serious fragmentation within
15
  # the actor community. There is *no* universally accepted, strict definition of
16
  # "actor" and actor implementations differ widely between languages and libraries.
17
  # 
18
  # A good definition of "actor" is:
19
  # 
20
  #   An independent, concurrent, single-purpose, computational entity that communicates exclusively via message passing.
21
  # 
22
  # The +Concurrent::Actor+ class in this library is based solely on the
23
  # {http://www.scala-lang.org/api/current/index.html#scala.actors.Actor Actor} trait
24
  # defined in the Scala standard library. It does not implement all the features of
25
  # Scala's +Actor+ but its behavior for what *has* been implemented is nearly identical.
26
  # The excluded features mostly deal with Scala's message semantics, strong typing,
27
  # and other characteristics of Scala that don't really apply to Ruby.
28
  # 
29
  # Unlike many of the abstractions in this library, +Actor+ takes an *object-oriented*
30
  # approach to asynchronous concurrency, rather than a *functional programming*
31
  # approach.
32
  #   
33
  # Because +Actor+ mixes in the +Concurrent::Runnable+ module subclasses have access to
34
  # the +#on_error+ method and can override it to implement custom error handling. The
35
  # +Actor+ base class does not use +#on_error+ so as to avoid conflit with subclasses
36
  # which override it. Generally speaking, +#on_error+ should not be used. The +Actor+
37
  # base class provides concictent, reliable, and robust error handling already, and
38
  # error handling specifics are tied to the message posting method. Incorrect behavior
39
  # in an +#on_error+ override can lead to inconsistent +Actor+ behavior that may lead
40
  # to confusion and difficult debugging.
41
  #   
42
  # The +Actor+ superclass mixes in the Ruby standard library
43
  # {http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html Observable}
44
  # module to provide consistent callbacks upon message processing completion. The normal
45
  # +Observable+ methods, including +#add_observer+ behave normally. Once an observer
46
  # is added to an +Actor+ it will be notified of all messages processed *after*
47
  # addition. Notification will *not* occur for any messages that have already been
48
  # processed.
49
  #   
50
  # Observers will be notified regardless of whether the message processing is successful
51
  # or not. The +#update+ method of the observer will receive four arguments. The
52
  # appropriate method signature is:
53
  #   
54
  #   def update(time, message, result, reason)
55
  #   
56
  # These four arguments represent:
57
  #   
58
  # * The time that message processing was completed
59
  # * An array containing all elements of the original message, in order
60
  # * The result of the call to +#act+ (will be +nil+ if an exception was raised)
61
  # * Any exception raised by +#act+ (or +nil+ if message processing was successful)
62
  #
63
  # @example Actor Ping Pong
64
  #   class Ping < Concurrent::Actor
65
  #   
66
  #     def initialize(count, pong)
67
  #       super()
68
  #       @pong = pong
69
  #       @remaining = count
70
  #     end
71
  #     
72
  #     def act(msg)
73
  #   
74
  #       if msg == :pong
75
  #         print "Ping: pong\n" if @remaining % 1000 == 0
76
  #         @pong.post(:ping)
77
  #   
78
  #         if @remaining > 0
79
  #           @pong << :ping
80
  #           @remaining -= 1
81
  #         else
82
  #           print "Ping :stop\n"
83
  #           @pong << :stop
84
  #           self.stop
85
  #         end
86
  #       end
87
  #     end
88
  #   end
89
  #   
90
  #   class Pong < Concurrent::Actor
91
  #   
92
  #     attr_writer :ping
93
  #   
94
  #     def initialize
95
  #       super()
96
  #       @count = 0
97
  #     end
98
  #   
99
  #     def act(msg)
100
  #   
101
  #       if msg == :ping
102
  #         print "Pong: ping\n" if @count % 1000 == 0
103
  #         @ping << :pong
104
  #         @count += 1
105
  #   
106
  #       elsif msg == :stop
107
  #         print "Pong :stop\n"
108
  #         self.stop
109
  #       end
110
  #     end
111
  #   end
112
  #   
113
  #   pong = Pong.new
114
  #   ping = Ping.new(10000, pong)
115
  #   pong.ping = ping
116
  #   
117
  #   t1 = ping.run!
118
  #   t2 = pong.run!
119
  #   
120
  #   ping << :pong
121
  #
122
  # @deprecated +Actor+ is being replaced with a completely new framework prior to v1.0.0
123
  #
124
  # @see http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html
125
  class Actor
1✔
126
    include Observable
1✔
127
    include Postable
1✔
128
    include Runnable
1✔
129

130
    private
1✔
131

132
    # @!visibility private
133
    class Poolbox # :nodoc:
1✔
134
      include Postable
1✔
135

136
      def initialize(queue)
1✔
137
        @queue = queue
×
138
      end
139
    end
140

141
    public
1✔
142

143
    # Create a pool of actors that share a common mailbox.
144
    #   
145
    # Every +Actor+ instance operates on its own thread. When one thread isn't enough capacity
146
    # to manage all the messages being sent to an +Actor+ a *pool* can be used instead. A pool
147
    # is a collection of +Actor+ instances, all of the same type, that shate a message queue.
148
    # Messages from other threads are all sent to a single queue against which all +Actor+s
149
    # load balance.
150
    #
151
    # @param [Integer] count the number of actors in the pool
152
    # @param [Array] args zero or more arguments to pass to each actor in the pool
153
    #
154
    # @return [Array] two-element array with the shared mailbox as the first element
155
    #   and an array of actors as the second element
156
    #
157
    # @raise ArgumentError if +count+ is zero or less
158
    #
159
    # @example
160
    #   class EchoActor < Concurrent::Actor
161
    #     def act(*message)
162
    #       puts "#{message} handled by #{self}"
163
    #     end
164
    #   end
165
    #     
166
    #   mailbox, pool = EchoActor.pool(5)
167
    #   pool.each{|echo| echo.run! }
168
    #     
169
    #   10.times{|i| mailbox.post(i) }
170
    #   #=> [0] handled by #<EchoActor:0x007fc8014fb8b8>
171
    #   #=> [1] handled by #<EchoActor:0x007fc8014fb890>
172
    #   #=> [2] handled by #<EchoActor:0x007fc8014fb868>
173
    #   #=> [3] handled by #<EchoActor:0x007fc8014fb890>
174
    #   #=> [4] handled by #<EchoActor:0x007fc8014fb840>
175
    #   #=> [5] handled by #<EchoActor:0x007fc8014fb8b8>
176
    #   #=> [6] handled by #<EchoActor:0x007fc8014fb8b8>
177
    #   #=> [7] handled by #<EchoActor:0x007fc8014fb818>
178
    #   #=> [8] handled by #<EchoActor:0x007fc8014fb890>
179
    #
180
    # @deprecated +Actor+ is being replaced with a completely new framework prior to v1.0.0
181
    def self.pool(count, *args, &block)
1✔
182
      raise ArgumentError.new('count must be greater than zero') unless count > 0
×
183
      mailbox = Queue.new
×
184
      actors = count.times.collect do
×
185
        if block_given?
×
186
          actor = self.new(*args, &block.dup)
×
187
        else
188
          actor = self.new(*args)
×
189
        end
190
        actor.instance_variable_set(:@queue, mailbox)
×
191
        actor
×
192
      end
193
      return Poolbox.new(mailbox), actors
×
194
    end
195

196
    protected
1✔
197

198
    # Actors are defined by subclassing the +Concurrent::Actor+ class and overriding the
199
    # #act method. The #act method can have any signature/arity but +def act(*args)+
200
    # is the most flexible and least error-prone signature. The #act method is called in
201
    # response to a message being post to the +Actor+ instance (see *Behavior* below).
202
    #
203
    # @param [Array] message one or more arguments representing the message sent to the
204
    #   actor via one of the Concurrent::Postable methods
205
    #
206
    # @return [Object] the result obtained when the message is successfully processed
207
    #
208
    # @raise NotImplementedError unless overridden in the +Actor+ subclass
209
    #
210
    # @deprecated +Actor+ is being replaced with a completely new framework prior to v1.0.0
211
    # 
212
    # @!visibility public
213
    def act(*message)
1✔
214
      raise NotImplementedError.new("#{self.class} does not implement #act")
×
215
    end
216

217
    # @!visibility private
218
    #
219
    # @deprecated +Actor+ is being replaced with a completely new framework prior to v1.0.0
220
    def on_run # :nodoc:
1✔
221
      queue.clear
×
222
    end
223

224
    # @!visibility private
225
    #
226
    # @deprecated +Actor+ is being replaced with a completely new framework prior to v1.0.0
227
    def on_stop # :nodoc:
1✔
228
      queue.clear
×
229
      queue.push(:stop)
×
230
    end
231

232
    # @!visibility private
233
    #
234
    # @deprecated +Actor+ is being replaced with a completely new framework prior to v1.0.0
235
    def on_task # :nodoc:
1✔
236
      package = queue.pop
×
237
      return if package == :stop
×
238
      result = ex = nil
×
239
      notifier = package.notifier
×
240
      begin
241
        if notifier.nil? || (notifier.is_a?(Event) && ! notifier.set?)
×
242
          result = act(*package.message)
×
243
        end
244
      rescue => ex
245
        on_error(Time.now, package.message, ex)
×
246
      ensure
247
        if notifier.is_a?(Event) && ! notifier.set?
×
248
          package.handler.push(result || ex)
×
249
          package.notifier.set
×
250
        elsif package.handler.is_a?(IVar)
×
251
          package.handler.complete(! result.nil?, result, ex)
×
252
        elsif package.handler.respond_to?(:post) && ex.nil?
×
253
          package.handler.post(result)
×
254
        end
255

256
        changed
×
257
        notify_observers(Time.now, package.message, result, ex)
×
258
      end
259
    end
260

261
    # @!visibility private
262
    #
263
    # @deprecated +Actor+ is being replaced with a completely new framework prior to v1.0.0
264
    def on_error(time, msg, ex) # :nodoc:
1✔
265
    end
266
  end
267
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc