ruby-concurrency / concurrent-ruby / #2711

16 Jun 2014 12:13PM UTC coverage: 45.896% (-50.5%) from 96.422%
#2711

push

jdantonio
Merge pull request #112 from ruby-concurrency/remove-old-actor

Remove old Actor

1 of 2 new or added lines in 2 files covered. (50.0%)

1362 existing lines in 62 files now uncovered.

1219 of 2656 relevant lines covered (45.9%)

0.98 hits per line

Source File
34.62
/lib/concurrent/timer_task.rb
require 'concurrent/dereferenceable'
require 'concurrent/observable'
require 'concurrent/atomic/atomic_boolean'
require 'concurrent/executor/executor'
require 'concurrent/executor/safe_task_executor'

module Concurrent

  # A very common concurrency pattern is to run a thread that performs a task at regular
  # intervals. The thread that performs the task sleeps for the given interval then
  # wakes up and performs the task. Lather, rinse, repeat... This pattern causes two
  # problems. First, it is difficult to test the business logic of the task because the
  # task itself is tightly coupled with the concurrency logic. Second, an exception
  # raised while performing the task can cause the entire thread to abend. In a
  # long-running application where the task thread is intended to run for days/weeks/years
  # a crashed task thread can pose a significant problem. `TimerTask` alleviates both problems.
  #
  # When a `TimerTask` is launched it starts a thread for monitoring the execution interval.
  # The `TimerTask` thread does not perform the task, however. Instead, the `TimerTask`
  # launches the task on a separate thread. Should the task experience an unrecoverable
  # crash only the task thread will crash. This makes the `TimerTask` very fault tolerant.
  # Additionally, the `TimerTask` thread can respond to the success or failure of the task,
  # performing logging or ancillary operations. `TimerTask` can also be configured with a
  # timeout value allowing it to kill a task that runs too long.
  #
  # One other advantage of `TimerTask` is that it forces the business logic to be completely
  # decoupled from the concurrency logic. The business logic can be tested separately then
  # passed to the `TimerTask` for scheduling and running.
  #
  # In some cases it may be necessary for a `TimerTask` to affect its own execution cycle.
  # To facilitate this a reference to the task object is passed into the block as a block
  # argument every time the task is executed.
  #
  # The `TimerTask` class includes the `Dereferenceable` mixin module so the result of
  # the last execution is always available via the `#value` method. Dereferencing options
  # can be passed to the `TimerTask` during construction or at any later time using the
  # `#set_deref_options` method.
  #
  # `TimerTask` supports notification through the Ruby standard library
  # {http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html Observable}
  # module. On execution the `TimerTask` will notify the observers
  # with three arguments: time of execution, the result of the block (or nil on failure),
  # and any raised exception (or nil on success). If the timeout interval is exceeded
  # the observer will receive a `Concurrent::TimeoutError` object as the third argument.
  #
  # @example Basic usage
  #   task = Concurrent::TimerTask.new{ puts 'Boom!' }
  #   task.run!
  #
  #   task.execution_interval #=> 60.0 (default)
  #   task.timeout_interval   #=> 30.0 (default)
  #
  #   # wait 60 seconds...
  #   #=> 'Boom!'
  #
  #   task.stop #=> true
  #
  # @example Configuring `:execution_interval` and `:timeout_interval`
  #   task = Concurrent::TimerTask.new(execution_interval: 5, timeout_interval: 5) do
  #          puts 'Boom!'
  #        end
  #
  #   task.execution_interval #=> 5.0
  #   task.timeout_interval   #=> 5.0
  #
  # @example Immediate execution with `:run_now`
  #   task = Concurrent::TimerTask.new(run_now: true){ puts 'Boom!' }
  #   task.run!
  #
  #   #=> 'Boom!'
  #
  # @example Last `#value` and `Dereferenceable` mixin
  #   task = Concurrent::TimerTask.new(
  #     dup_on_deref: true,
  #     execution_interval: 5
  #   ){ Time.now }
  #
  #   task.run!
  #   Time.now   #=> 2013-11-07 18:06:50 -0500
  #   sleep(10)
  #   task.value #=> 2013-11-07 18:06:55 -0500
  #
  # @example Controlling execution from within the block
  #   timer_task = Concurrent::TimerTask.new(execution_interval: 1) do |task|
  #     # the interval is stored as a Float, so convert it before calling #times
  #     task.execution_interval.to_i.times{ print 'Boom! ' }
  #     print "\n"
  #     task.execution_interval += 1
  #     if task.execution_interval > 5
  #       puts 'Stopping...'
  #       task.stop
  #     end
  #   end
  #
  #   timer_task.run # blocking call - this task will stop itself
  #   #=> Boom!
  #   #=> Boom! Boom!
  #   #=> Boom! Boom! Boom!
  #   #=> Boom! Boom! Boom! Boom!
  #   #=> Boom! Boom! Boom! Boom! Boom!
  #   #=> Stopping...
  #
  # @example Observation
  #   class TaskObserver
  #     def update(time, result, ex)
  #       if result
  #         print "(#{time}) Execution successfully returned #{result}\n"
  #       elsif ex.is_a?(Concurrent::TimeoutError)
  #         print "(#{time}) Execution timed out\n"
  #       else
  #         print "(#{time}) Execution failed with error #{ex}\n"
  #       end
  #     end
  #   end
  #
  #   task = Concurrent::TimerTask.new(execution_interval: 1, timeout_interval: 1){ 42 }
  #   task.add_observer(TaskObserver.new)
  #   task.run!
  #
  #   #=> (2013-10-13 19:08:58 -0400) Execution successfully returned 42
  #   #=> (2013-10-13 19:08:59 -0400) Execution successfully returned 42
  #   #=> (2013-10-13 19:09:00 -0400) Execution successfully returned 42
  #   task.stop
  #
  #   task = Concurrent::TimerTask.new(execution_interval: 1, timeout_interval: 1){ sleep }
  #   task.add_observer(TaskObserver.new)
  #   task.run!
  #
  #   #=> (2013-10-13 19:07:25 -0400) Execution timed out
  #   #=> (2013-10-13 19:07:27 -0400) Execution timed out
  #   #=> (2013-10-13 19:07:29 -0400) Execution timed out
  #   task.stop
  #
  #   task = Concurrent::TimerTask.new(execution_interval: 1){ raise StandardError }
  #   task.add_observer(TaskObserver.new)
  #   task.run!
  #
  #   #=> (2013-10-13 19:09:37 -0400) Execution failed with error StandardError
  #   #=> (2013-10-13 19:09:38 -0400) Execution failed with error StandardError
  #   #=> (2013-10-13 19:09:39 -0400) Execution failed with error StandardError
  #   task.stop
  #
  # @see http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html
  # @see http://docs.oracle.com/javase/7/docs/api/java/util/TimerTask.html
  class TimerTask
    include Dereferenceable
    include RubyExecutor
    include Concurrent::Observable

    # Default `:execution_interval` in seconds.
    EXECUTION_INTERVAL = 60

    # Default `:timeout_interval` in seconds.
    TIMEOUT_INTERVAL = 30

    # Create a new TimerTask with the given task and configuration.
    #
    # @!macro [attach] timer_task_initialize
    #   @param [Hash] opts the options defining task execution.
    #   @option opts [Integer] :execution_interval number of seconds between
    #     task executions (default: EXECUTION_INTERVAL)
    #   @option opts [Integer] :timeout_interval number of seconds a task can
    #     run before it is considered to have failed (default: TIMEOUT_INTERVAL)
    #   @option opts [Boolean] :run_now Whether to run the task immediately
    #     when it is started or to wait until the first #execution_interval
    #     has passed (default: false)
    #
    #   @raise ArgumentError when no block is given.
    #
    #   @yield to the block after :execution_interval seconds have passed since
    #     the last yield
    #   @yieldparam task a reference to the `TimerTask` instance so that the
    #     block can control its own lifecycle. Necessary since `self` will
    #     refer to the execution context of the block rather than the running
    #     `TimerTask`.
    #
    #   @note Calls Concurrent::Dereferenceable#set_deref_options passing `opts`.
    #     All options supported by Concurrent::Dereferenceable can be set
    #     during object initialization.
    #
    #   @return [TimerTask] the new `TimerTask`
    #
    #   @see Concurrent::Dereferenceable#set_deref_options
    def initialize(opts = {}, &task)
      raise ArgumentError.new('no block given') unless block_given?

      init_executor
      set_deref_options(opts)

      self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL
      self.timeout_interval = opts[:timeout] || opts[:timeout_interval] || TIMEOUT_INTERVAL
      @run_now = opts[:now] || opts[:run_now]
      @executor = Concurrent::SafeTaskExecutor.new(task)
      @running = Concurrent::AtomicBoolean.new(false)

      self.observers = CopyOnNotifyObserverSet.new
    end

    # Is the executor running?
    #
    # @return [Boolean] `true` when running, `false` when shutting down or shutdown
    def running?
      @running.true?
    end

    # Execute a previously created `TimerTask`.
    #
    # @return [TimerTask] a reference to `self`
    #
    # @example Instantiate and execute in separate steps
    #   task = Concurrent::TimerTask.new(execution_interval: 10){ print "Hello World\n" }
    #   task.running? #=> false
    #   task.execute
    #   task.running? #=> true
    #
    # @example Instantiate and execute in one line
    #   task = Concurrent::TimerTask.new(execution_interval: 10){ print "Hello World\n" }.execute
    #   task.running? #=> true
    #
    # @since 0.6.0
    def execute
      mutex.synchronize do
        if @running.false?
          @running.make_true
          schedule_next_task(@run_now ? 0 : @execution_interval)
        end
      end
      self
    end

    # Create and execute a new `TimerTask`.
    #
    # @!macro timer_task_initialize
    #
    # @example
    #   task = Concurrent::TimerTask.execute(execution_interval: 10){ print "Hello World\n" }
    #   task.running? #=> true
    #
    # @since 0.6.0
    def self.execute(opts = {}, &task)
      TimerTask.new(opts, &task).execute
    end

    # @!attribute [rw] execution_interval
    # @return [Float] Number of seconds after the task completes before the
    #   task is performed again.
    def execution_interval
      mutex.lock
      @execution_interval
    ensure
      mutex.unlock
    end

    # @!attribute [rw] execution_interval
    # @return [Float] Number of seconds after the task completes before the
    #   task is performed again.
    def execution_interval=(value)
      # The value is coerced with #to_f, so it is always stored as a Float.
      if (value = value.to_f) <= 0.0
        raise ArgumentError.new('must be greater than zero')
      else
        begin
          mutex.lock
          @execution_interval = value
        ensure
          mutex.unlock
        end
      end
    end

    # @!attribute [rw] timeout_interval
    # @return [Float] Number of seconds the task can run before it is
    #   considered to have failed.
    def timeout_interval
      mutex.lock
      @timeout_interval
    ensure
      mutex.unlock
    end

    # @!attribute [rw] timeout_interval
    # @return [Float] Number of seconds the task can run before it is
    #   considered to have failed.
    def timeout_interval=(value)
      # The value is coerced with #to_f, so it is always stored as a Float.
      if (value = value.to_f) <= 0.0
        raise ArgumentError.new('must be greater than zero')
      else
        begin
          mutex.lock
          @timeout_interval = value
        ensure
          mutex.unlock
        end
      end
    end

    private :post, :<<

    protected

    # @!visibility private
    def shutdown_execution
      @running.make_false
      super
    end

    # @!visibility private
    def kill_execution
      @running.make_false
      super
    end

    # @!visibility private
    def schedule_next_task(interval = execution_interval)
      Concurrent::timer(interval, Concurrent::Event.new, &method(:execute_task))
    end

    # @!visibility private
    def execute_task(completion)
      return unless @running.true?
      # Arm the timeout; it shares `completion` with this run of the task.
      Concurrent::timer(timeout_interval, completion, &method(:timeout_task))
      success, value, reason = @executor.execute(self)
      # Only the first caller to claim `completion` (this method or
      # `timeout_task`) records the outcome and schedules the next run.
      if completion.try?
        self.value = value
        schedule_next_task
        time = Time.now
        observers.notify_observers do
          [time, self.value, reason]
        end
      end
    end

    # @!visibility private
    def timeout_task(completion)
      return unless @running.true?
      # If the task has not claimed `completion` yet, report a timeout.
      if completion.try?
        self.value = value
        schedule_next_task
        observers.notify_observers(Time.now, nil, Concurrent::TimeoutError.new)
      end
    end
  end
end
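
The interplay between `execute_task` and `timeout_task` above is easier to follow in isolation: both receive the same `completion` event, and only the caller whose `completion.try?` succeeds records a result and notifies observers. What follows is a minimal stdlib-only sketch of that first-one-wins idea, not the library's implementation. `OneShot` and `run_with_timeout` are hypothetical names introduced for illustration, and unlike the `TimerTask` code above the sketch kills the losing thread to keep the example tidy.

```ruby
# Hypothetical one-shot latch: `try?` returns true for the first caller only.
class OneShot
  def initialize
    @mutex = Mutex.new
    @claimed = false
  end

  def try?
    @mutex.synchronize do
      return false if @claimed
      @claimed = true
    end
  end
end

# Hypothetical helper: runs the block on a worker thread while a timer
# thread counts down. Whichever side claims the latch first reports the
# outcome as [:ok, value], [:error, exception] or [:timeout, nil].
def run_with_timeout(timeout, &task)
  latch   = OneShot.new
  outcome = Queue.new

  worker = Thread.new do
    begin
      value = task.call
      outcome << [:ok, value] if latch.try?
    rescue StandardError => ex
      outcome << [:error, ex] if latch.try?
    end
  end

  timer = Thread.new do
    sleep(timeout)
    outcome << [:timeout, nil] if latch.try?
  end

  result = outcome.pop          # blocks until one side has reported
  [worker, timer].each(&:kill)  # sketch-only cleanup of the losing side
  result
end

p run_with_timeout(1.0) { 42 }                # task finishes first
p run_with_timeout(0.05) { sleep(1); :late }  # timer fires first
```

Because the latch can be claimed only once, a task that finishes just after its timeout cannot overwrite the timeout outcome, which matches the role `completion` plays in the two methods above.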