jdantonio / concurrent-ruby / #714

23 May 2014 08:08PM UTC coverage: 40.697% (-56.5%) from 97.237%
push

jdantonio
Merge pull request #96 from jdantonio/refactor/errors

Moved all custom errors into a single file and into the Concurrent module

17 of 18 new or added lines in 10 files covered. (94.44%)

1432 existing lines in 56 files now uncovered.

1028 of 2526 relevant lines covered (40.7%)

0.53 hits per line

Source File

37.23
/lib/concurrent/timer_task.rb
1
require 'concurrent/dereferenceable'
1✔
2
require 'concurrent/observable'
1✔
3
require 'concurrent/atomic/atomic_boolean'
1✔
4
require 'concurrent/executor/executor'
1✔
5
require 'concurrent/executor/safe_task_executor'
1✔
6

7
# deprecated Updated to use `Executor` instead of `Runnable`
8
require 'concurrent/runnable'
1✔
9

10
module Concurrent
1✔
11

12
  # A very common concurrency pattern is to run a thread that performs a task at regular
13
  # intervals. The thread that performs the task sleeps for the given interval then
14
  # wakes up and performs the task. Lather, rinse, repeat... This pattern causes two
15
  # problems. First, it is difficult to test the business logic of the task because the
16
  # task itself is tightly coupled with the concurrency logic. Second, an exception
17
  # raised while performing the task can cause the entire thread to abend. In a
18
  # long-running application where the task thread is intended to run for days/weeks/years
19
  # a crashed task thread can pose a significant problem. `TimerTask` alleviates both problems.
20
  # 
21
  # When a `TimerTask` is launched it starts a thread for monitoring the execution interval.
22
  # The `TimerTask` thread does not perform the task, however. Instead, the `TimerTask`
23
  # launches the task on a separate thread. Should the task experience an unrecoverable
24
  # crash, only the task thread will crash. This makes the `TimerTask` very fault tolerant.
25
  # Additionally, the `TimerTask` thread can respond to the success or failure of the task,
26
  # performing logging or ancillary operations. `TimerTask` can also be configured with a
27
  # timeout value allowing it to kill a task that runs too long.
28
  # 
29
  # One other advantage of `TimerTask` is that it forces the business logic to be completely decoupled
30
  # from the concurrency logic. The business logic can be tested separately then passed to the
31
  # `TimerTask` for scheduling and running.
32
  # 
33
  # In some cases it may be necessary for a `TimerTask` to affect its own execution cycle.
34
  # To facilitate this a reference to the task object is passed into the block as a block
35
  # argument every time the task is executed.
36
  # 
37
  # The `TimerTask` class includes the `Dereferenceable` mixin module so the result of
38
  # the last execution is always available via the `#value` method. Dereferencing options
39
  # can be passed to the `TimerTask` during construction or at any later time using the
40
  # `#set_deref_options` method.
41
  # 
42
  # `TimerTask` supports notification through the Ruby standard library
43
  # {http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html Observable}
44
  # module. On execution the `TimerTask` will notify the observers
45
  # with three arguments: time of execution, the result of the block (or nil on failure),
46
  # and any raised exceptions (or nil on success). If the timeout interval is exceeded
47
  # the observer will receive a `Concurrent::TimeoutError` object as the third argument.
48
  #
49
  # @example Basic usage
50
  #   task = Concurrent::TimerTask.new{ puts 'Boom!' }
51
  #   task.run!
52
  #   
53
  #   task.execution_interval #=> 60 (default)
54
  #   task.timeout_interval   #=> 30 (default)
55
  #   
56
  #   # wait 60 seconds...
57
  #   #=> 'Boom!'
58
  #   
59
  #   task.stop #=> true
60
  #
61
  # @example Configuring `:execution_interval` and `:timeout_interval`
62
  #   task = Concurrent::TimerTask.new(execution_interval: 5, timeout_interval: 5) do
63
  #          puts 'Boom!'
64
  #        end
65
  #   
66
  #   task.execution_interval #=> 5
67
  #   task.timeout_interval   #=> 5
68
  #
69
  # @example Immediate execution with `:run_now`
70
  #   task = Concurrent::TimerTask.new(run_now: true){ puts 'Boom!' }
71
  #   task.run!
72
  #   
73
  #   #=> 'Boom!'
74
  #
75
  # @example Last `#value` and `Dereferenceable` mixin
76
  #   task = Concurrent::TimerTask.new(
77
  #     dup_on_deref: true,
78
  #     execution_interval: 5
79
  #   ){ Time.now }
80
  #   
81
  #   task.run!
82
  #   Time.now   #=> 2013-11-07 18:06:50 -0500
83
  #   sleep(10)
84
  #   task.value #=> 2013-11-07 18:06:55 -0500
85
  #
86
  # @example Controlling execution from within the block
87
  #   timer_task = Concurrent::TimerTask.new(execution_interval: 1) do |task|
88
  #     task.execution_interval.times{ print 'Boom! ' }
89
  #     print "\n"
90
  #     task.execution_interval += 1
91
  #     if task.execution_interval > 5
92
  #       puts 'Stopping...'
93
  #       task.stop
94
  #     end
95
  #   end
96
  #   
97
  #   timer_task.run # blocking call - this task will stop itself
98
  #   #=> Boom!
99
  #   #=> Boom! Boom!
100
  #   #=> Boom! Boom! Boom!
101
  #   #=> Boom! Boom! Boom! Boom!
102
  #   #=> Boom! Boom! Boom! Boom! Boom!
103
  #   #=> Stopping...
104
  #
105
  # @example Observation
106
  #   class TaskObserver
107
  #     def update(time, result, ex)
108
  #       if result
109
  #         print "(#{time}) Execution successfully returned #{result}\n"
110
  #       elsif ex.is_a?(Concurrent::TimeoutError)
111
  #         print "(#{time}) Execution timed out\n"
112
  #       else
113
  #         print "(#{time}) Execution failed with error #{ex}\n"
114
  #       end
115
  #     end
116
  #   end
117
  #   
118
  #   task = Concurrent::TimerTask.new(execution_interval: 1, timeout_interval: 1){ 42 }
119
  #   task.add_observer(TaskObserver.new)
120
  #   task.run!
121
  #   
122
  #   #=> (2013-10-13 19:08:58 -0400) Execution successfully returned 42
123
  #   #=> (2013-10-13 19:08:59 -0400) Execution successfully returned 42
124
  #   #=> (2013-10-13 19:09:00 -0400) Execution successfully returned 42
125
  #   task.stop
126
  #   
127
  #   task = Concurrent::TimerTask.new(execution_interval: 1, timeout_interval: 1){ sleep }
128
  #   task.add_observer(TaskObserver.new)
129
  #   task.run!
130
  #   
131
  #   #=> (2013-10-13 19:07:25 -0400) Execution timed out
132
  #   #=> (2013-10-13 19:07:27 -0400) Execution timed out
133
  #   #=> (2013-10-13 19:07:29 -0400) Execution timed out
134
  #   task.stop
135
  #   
136
  #   task = Concurrent::TimerTask.new(execution_interval: 1){ raise StandardError }
137
  #   task.add_observer(TaskObserver.new)
138
  #   task.run!
139
  #   
140
  #   #=> (2013-10-13 19:09:37 -0400) Execution failed with error StandardError
141
  #   #=> (2013-10-13 19:09:38 -0400) Execution failed with error StandardError
142
  #   #=> (2013-10-13 19:09:39 -0400) Execution failed with error StandardError
143
  #   task.stop
144
  #
145
  # @see http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html
146
  # @see http://docs.oracle.com/javase/7/docs/api/java/util/TimerTask.html
147
  class TimerTask
1✔
148
    include Dereferenceable
1✔
149
    include Executor
1✔
150
    include Concurrent::Observable
1✔
151

152
    # Default `:execution_interval` in seconds.
153
    EXECUTION_INTERVAL = 60
1✔
154

155
    # Default `:timeout_interval` in seconds.
156
    TIMEOUT_INTERVAL = 30
1✔
157

158
    # Create a new TimerTask with the given task and configuration.
159
    #
160
    # @!macro [attach] timer_task_initialize
161
    #   @param [Hash] opts the options defining task execution.
162
    #   @option opts [Integer] :execution_interval number of seconds between
163
    #     task executions (default: EXECUTION_INTERVAL)
164
    #   @option opts [Integer] :timeout_interval number of seconds a task can
165
    #     run before it is considered to have failed (default: TIMEOUT_INTERVAL)
166
    #   @option opts [Boolean] :run_now Whether to run the task immediately
167
    #     upon instantiation or to wait until the first `:execution_interval`
168
    #     has passed (default: false)
169
    #  
170
    #   @raise ArgumentError when no block is given.
171
    #  
172
    #   @yield to the block after :execution_interval seconds have passed since
173
    #     the last yield
174
    #   @yieldparam task a reference to the `TimerTask` instance so that the
175
    #     block can control its own lifecycle. Necessary since `self` will
176
    #     refer to the execution context of the block rather than the running
177
    #     `TimerTask`.
178
    #  
179
    #   @note Calls Concurrent::Dereferenceable#set_deref_options passing `opts`.
180
    #     All options supported by Concurrent::Dereferenceable can be set
181
    #     during object initialization.
182
    #
183
    #   @return [TimerTask] the new `TimerTask`
184
    #  
185
    #   @see Concurrent::Dereferenceable#set_deref_options
186
    def initialize(opts = {}, &task)
1✔
UNCOV
187
      raise ArgumentError.new('no block given') unless block_given?
×
188

UNCOV
189
      init_executor
×
UNCOV
190
      set_deref_options(opts)
×
191

UNCOV
192
      self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL
×
UNCOV
193
      self.timeout_interval = opts[:timeout] || opts[:timeout_interval] || TIMEOUT_INTERVAL
×
UNCOV
194
      @run_now = opts[:now] || opts[:run_now]
×
UNCOV
195
      @executor = Concurrent::SafeTaskExecutor.new(task)
×
UNCOV
196
      @running = Concurrent::AtomicBoolean.new(false)
×
197

UNCOV
198
      self.observers = CopyOnNotifyObserverSet.new
×
199
    end
200

201
    # Is the executor running?
202
    #
203
    # @return [Boolean] `true` when running, `false` when shutting down or shutdown
204
    def running?
1✔
205
      @running.true?
×
206
    end
207

208
    # Execute a previously created `TimerTask`.
209
    #
210
    # @return [TimerTask] a reference to `self`
211
    #
212
    # @example Instantiate and execute in separate steps
213
    #   task = Concurrent::TimerTask.new(execution_interval: 10){ print "Hello World\n" }
214
    #   task.running? #=> false
215
    #   task.execute
216
    #   task.running? #=> true
217
    #
218
    # @example Instantiate and execute in one line
219
    #   task = Concurrent::TimerTask.new(execution_interval: 10){ print "Hello World\n" }.execute
220
    #   task.running? #=> true
221
    #
222
    # @since 0.6.0
223
    def execute
1✔
UNCOV
224
      mutex.synchronize do
×
UNCOV
225
        if @running.false?
×
UNCOV
226
          @running.make_true
×
UNCOV
227
          schedule_next_task(@run_now ? 0 : @execution_interval)
×
228
        end
229
      end
UNCOV
230
      self
×
231
    end
232

233
    # Create and execute a new `TimerTask`.
234
    #
235
    # @!macro timer_task_initialize
236
    #
237
    # @example 
238
    #   task = Concurrent::TimerTask.execute(execution_interval: 10){ print "Hello World\n" }
239
    #   task.running? #=> true
240
    #
241
    # @since 0.6.0
242
    def self.execute(opts = {}, &task)
1✔
UNCOV
243
      TimerTask.new(opts, &task).execute
×
244
    end
245

246
    # @!attribute [rw] execution_interval
247
    # @return [Float] Number of seconds after the task completes before the
248
    #   task is performed again.
249
    def execution_interval
1✔
UNCOV
250
      mutex.lock
×
UNCOV
251
      @execution_interval
×
252
    ensure
UNCOV
253
      mutex.unlock
×
254
    end
255

256
    # @!attribute [rw] execution_interval
257
    # @return [Float] Number of seconds after the task completes before the
258
    #   task is performed again.
259
    def execution_interval=(value)
1✔
UNCOV
260
      if (value = value.to_f) <= 0.0
×
UNCOV
261
        raise ArgumentError.new('must be greater than zero')
×
262
      else
263
        begin
UNCOV
264
          mutex.lock
×
UNCOV
265
          @execution_interval = value
×
266
        ensure
UNCOV
267
          mutex.unlock
×
268
        end
269
      end
270
    end
271

272
    # @!attribute [rw] timeout_interval
273
    # @return [Float] Number of seconds the task can run before it is
274
    #   considered to have failed.
275
    def timeout_interval
1✔
UNCOV
276
      mutex.lock
×
UNCOV
277
      @timeout_interval
×
278
    ensure
UNCOV
279
      mutex.unlock
×
280
    end
281

282
    # @!attribute [rw] timeout_interval
283
    # @return [Float] Number of seconds the task can run before it is
284
    #   considered to have failed.
285
    def timeout_interval=(value)
1✔
UNCOV
286
      if (value = value.to_f) <= 0.0
×
UNCOV
287
        raise ArgumentError.new('must be greater than zero')
×
288
      else
289
        begin
UNCOV
290
          mutex.lock
×
UNCOV
291
          @timeout_interval = value
×
292
        ensure
UNCOV
293
          mutex.unlock
×
294
        end
295
      end
296
    end
297

298
    # @deprecated Updated to use `Executor` instead of `Runnable`
299
    def terminate(*args) deprecated(:terminate, :kill, *args); end
1✔
300

301
    # @deprecated Updated to use `Executor` instead of `Runnable`
302
    def stop(*args) deprecated(:stop, :shutdown, *args); end
1✔
303

304
    # @deprecated Updated to use `Executor` instead of `Runnable`
305
    def cancel(*args) deprecated(:cancel, :shutdown, *args); end
1✔
306

307
    # @deprecated Updated to use `Executor` instead of `Runnable`
308
    def run!(*args) deprecated(:run!, :execute); end
1✔
309

310
    # @deprecated Updated to use `Executor` instead of `Runnable`
311
    def self.run!(*args, &block)
1✔
312
      warn "[DEPRECATED] `run!` is deprecated, please use `execute` instead."
×
313
      Concurrent::Runnable::Context.new(TimerTask.new(*args, &block))
×
314
    end
315

316
    # @deprecated Updated to use `Executor` instead of `Runnable`
317
    def run
1✔
318
      raise Concurrent::Runnable::LifecycleError.new('already running') if @running.true?
×
319
      self.execute
×
320
      self.wait_for_termination
×
321
      true
×
322
    end
323

324
    private :post, :<<
1✔
325

326
    protected
1✔
327

328
    # @!visibility private
329
    def shutdown_execution
1✔
330
      @running.make_false
×
331
      super
×
332
    end
333

334
    # @!visibility private
335
    def kill_execution
1✔
UNCOV
336
      @running.make_false
×
UNCOV
337
      super
×
338
    end
339

340
    # @!visibility private
341
    def schedule_next_task(interval = execution_interval)
1✔
UNCOV
342
      Concurrent::timer(interval, Concurrent::Event.new, &method(:execute_task))
×
343
    end
344

345
    # @!visibility private
346
    def execute_task(completion)
1✔
UNCOV
347
      return unless @running.true?
×
UNCOV
348
      Concurrent::timer(timeout_interval, completion, &method(:timeout_task))
×
UNCOV
349
      success, value, reason = @executor.execute(self)
×
UNCOV
350
      if completion.try?
×
UNCOV
351
        self.value = value
×
UNCOV
352
        schedule_next_task
×
UNCOV
353
        time = Time.now
×
UNCOV
354
        observers.notify_observers do
×
UNCOV
355
          [time, self.value, reason]
×
356
        end
357
      end
358
    end
359

360
    # @!visibility private
361
    def timeout_task(completion)
1✔
UNCOV
362
      return unless @running.true?
×
UNCOV
363
      if completion.try?
×
UNCOV
364
        self.value = value
×
UNCOV
365
        schedule_next_task
×
UNCOV
366
        observers.notify_observers(Time.now, nil, Concurrent::TimeoutError.new)
×
367
      end
368
    end
369

370
    # @deprecated Updated to use `Executor` instead of `Runnable`
371
    # @!visibility private
372
    def deprecated(old, new, *args)
1✔
373
      warn "[DEPRECATED] `#{old}` is deprecated, please use `#{new}` instead."
×
374
      self.send(new, *args)
×
375
    end
376
  end
377
end
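
The class comment in the listing describes a monitoring thread that waits for the execution interval, runs the task on a separate thread, abandons runs that exceed the timeout, and reports `(time, result, error)` to observers. The following is a minimal, stdlib-only sketch of that pattern, not the concurrent-ruby implementation (which, per the listing, uses `Concurrent::timer` and `SafeTaskExecutor`). The names `SketchTimerTask`, `RunTimeout`, and `PrintingObserver` are hypothetical and exist only for illustration. It also assumes that a plain boolean flag is adequate for signalling shutdown, which is a simplification.

```ruby
# Hypothetical stand-in for the pattern described in the TimerTask docs.
# Illustrative only; this is not how concurrent-ruby implements it.
class SketchTimerTask
  # Error handed to observers when a run exceeds the timeout (assumed name).
  class RunTimeout < StandardError; end

  def initialize(execution_interval:, timeout_interval:, &task)
    raise ArgumentError, 'no block given' unless task
    unless execution_interval.to_f > 0 && timeout_interval.to_f > 0
      raise ArgumentError, 'must be greater than zero'
    end

    @execution_interval = execution_interval.to_f
    @timeout_interval = timeout_interval.to_f
    @task = task
    @observers = []
    @running = false
  end

  def add_observer(observer)
    @observers << observer
  end

  # Start the monitoring thread. It only sleeps and dispatches;
  # the task itself runs on a separate worker thread.
  def execute
    @running = true
    @monitor = Thread.new do
      while @running
        sleep(@execution_interval)
        run_once if @running
      end
    end
    self
  end

  # Ask the monitor to stop and wait for it to finish its current cycle.
  def shutdown
    @running = false
    @monitor.join if @monitor
    true
  end

  private

  def run_once
    result = error = nil
    worker = Thread.new do
      begin
        result = @task.call(self)
      rescue StandardError => e
        error = e # a failing task must not take down the monitor
      end
    end

    # Thread#join returns nil when the limit elapses first.
    finished = worker.join(@timeout_interval)
    worker.kill unless finished
    outcome = finished ? [result, error] : [nil, RunTimeout.new('task timed out')]

    time = Time.now
    @observers.each { |observer| observer.update(time, *outcome) }
  end
end

# Observer with the same three-argument `update` shape the docs describe.
class PrintingObserver
  def update(time, result, error)
    if error
      puts "(#{time}) failed: #{error.class}"
    else
      puts "(#{time}) returned #{result}"
    end
  end
end

task = SketchTimerTask.new(execution_interval: 0.1, timeout_interval: 0.1) { 42 }
task.add_observer(PrintingObserver.new)
task.execute
sleep(0.35)
task.shutdown
```

Because the business logic is just a block, it can be unit-tested on its own and handed to the scheduler afterwards, which is the decoupling the class comment argues for.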