jdantonio / concurrent-ruby / #712

24 Apr 2014 12:31AM UTC coverage: 77.896% (-19.9%) from 97.805%
push

jdantonio
Attempting to fix a brittle test of Concurrent::timer.

1903 of 2443 relevant lines covered (77.9%)

541.94 hits per line

Source File

98.84
/lib/concurrent/timer_task.rb
1
require 'concurrent/dereferenceable'
1✔
2
require 'concurrent/observable'
1✔
3
require 'concurrent/atomic/atomic_boolean'
1✔
4
require 'concurrent/executor/executor'
1✔
5
require 'concurrent/executor/safe_task_executor'
1✔
6

7
# deprecated Updated to use `Executor` instead of `Runnable`
8
require 'concurrent/runnable'
1✔
9

10
module Concurrent
1✔
11

12
  # A very common concurrency pattern is to run a thread that performs a task at regular
13
  # intervals. The thread that performs the task sleeps for the given interval then
14
  # wakes up and performs the task. Lather, rinse, repeat... This pattern causes two
15
  # problems. First, it is difficult to test the business logic of the task because the
16
  # task itself is tightly coupled with the concurrency logic. Second, an exception
17
  # raised while performing the task can cause the entire thread to abend. In a
18
  # long-running application where the task thread is intended to run for days/weeks/years
19
  # a crashed task thread can pose a significant problem. `TimerTask` alleviates both problems.
20
  # 
21
  # When a `TimerTask` is launched it starts a thread for monitoring the execution interval.
22
  # The `TimerTask` thread does not perform the task, however. Instead, the TimerTask
23
  # launches the task on a separate thread. Should the task experience an unrecoverable
24
  # crash, only the task thread will crash. This makes the `TimerTask` very fault tolerant.
25
  # Additionally, the `TimerTask` thread can respond to the success or failure of the task,
26
  # performing logging or ancillary operations. `TimerTask` can also be configured with a
27
  # timeout value allowing it to kill a task that runs too long.
28
  # 
29
  # One other advantage of `TimerTask` is that it forces the business logic to be completely decoupled
30
  # from the concurrency logic. The business logic can be tested separately then passed to the
31
  # `TimerTask` for scheduling and running.
32
  # 
33
  # In some cases it may be necessary for a `TimerTask` to affect its own execution cycle.
34
  # To facilitate this a reference to the task object is passed into the block as a block
35
  # argument every time the task is executed.
36
  # 
37
  # The `TimerTask` class includes the `Dereferenceable` mixin module so the result of
38
  # the last execution is always available via the `#value` method. Dereferencing options
39
  # can be passed to the `TimerTask` during construction or at any later time using the
40
  # `#set_deref_options` method.
41
  # 
42
  # `TimerTask` supports notification through the Ruby standard library
43
  # {http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html Observable}
44
  # module. On execution the `TimerTask` will notify the observers
45
  # with three arguments: time of execution, the result of the block (or nil on failure),
46
  # and any raised exceptions (or nil on success). If the timeout interval is exceeded
47
  # the observer will receive a `Concurrent::TimeoutError` object as the third argument.
48
  #
49
  # @example Basic usage
50
  #   task = Concurrent::TimerTask.new{ puts 'Boom!' }
51
  #   task.run!
52
  #   
53
  #   task.execution_interval #=> 60.0 (default)
54
  #   task.timeout_interval   #=> 30.0 (default)
55
  #   
56
  #   # wait 60 seconds...
57
  #   #=> 'Boom!'
58
  #   
59
  #   task.stop #=> true
60
  #
61
  # @example Configuring `:execution_interval` and `:timeout_interval`
62
  #   task = Concurrent::TimerTask.new(execution_interval: 5, timeout_interval: 5) do
63
  #          puts 'Boom!'
64
  #        end
65
  #   
66
  #   task.execution_interval #=> 5
67
  #   task.timeout_interval   #=> 5
68
  #
69
  # @example Immediate execution with `:run_now`
70
  #   task = Concurrent::TimerTask.new(run_now: true){ puts 'Boom!' }
71
  #   task.run!
72
  #   
73
  #   #=> 'Boom!'
74
  #
75
  # @example Last `#value` and `Dereferenceable` mixin
76
  #   task = Concurrent::TimerTask.new(
77
  #     dup_on_deref: true,
78
  #     execution_interval: 5
79
  #   ){ Time.now }
80
  #   
81
  #   task.run!
82
  #   Time.now   #=> 2013-11-07 18:06:50 -0500
83
  #   sleep(10)
84
  #   task.value #=> 2013-11-07 18:06:55 -0500
85
  #
86
  # @example Controlling execution from within the block
87
  #   timer_task = Concurrent::TimerTask.new(execution_interval: 1) do |task|
88
  #     task.execution_interval.to_i.times{ print 'Boom! ' } # the interval is stored as a Float
89
  #     print "\n"
90
  #     task.execution_interval += 1
91
  #     if task.execution_interval > 5
92
  #       puts 'Stopping...'
93
  #       task.stop
94
  #     end
95
  #   end
96
  #   
97
  #   timer_task.run # blocking call - this task will stop itself
98
  #   #=> Boom!
99
  #   #=> Boom! Boom!
100
  #   #=> Boom! Boom! Boom!
101
  #   #=> Boom! Boom! Boom! Boom!
102
  #   #=> Boom! Boom! Boom! Boom! Boom!
103
  #   #=> Stopping...
104
  #
105
  # @example Observation
106
  #   class TaskObserver
107
  #     def update(time, result, ex)
108
  #       if result
109
  #         print "(#{time}) Execution successfully returned #{result}\n"
110
  #       elsif ex.is_a?(Concurrent::TimeoutError)
111
  #         print "(#{time}) Execution timed out\n"
112
  #       else
113
  #         print "(#{time}) Execution failed with error #{ex}\n"
114
  #       end
115
  #     end
116
  #   end
117
  #   
118
  #   task = Concurrent::TimerTask.new(execution_interval: 1, timeout_interval: 1){ 42 }
119
  #   task.add_observer(TaskObserver.new)
120
  #   task.run!
121
  #   
122
  #   #=> (2013-10-13 19:08:58 -0400) Execution successfully returned 42
123
  #   #=> (2013-10-13 19:08:59 -0400) Execution successfully returned 42
124
  #   #=> (2013-10-13 19:09:00 -0400) Execution successfully returned 42
125
  #   task.stop
126
  #   
127
  #   task = Concurrent::TimerTask.new(execution_interval: 1, timeout_interval: 1){ sleep }
128
  #   task.add_observer(TaskObserver.new)
129
  #   task.run!
130
  #   
131
  #   #=> (2013-10-13 19:07:25 -0400) Execution timed out
132
  #   #=> (2013-10-13 19:07:27 -0400) Execution timed out
133
  #   #=> (2013-10-13 19:07:29 -0400) Execution timed out
134
  #   task.stop
135
  #   
136
  #   task = Concurrent::TimerTask.new(execution_interval: 1){ raise StandardError }
137
  #   task.add_observer(TaskObserver.new)
138
  #   task.run!
139
  #   
140
  #   #=> (2013-10-13 19:09:37 -0400) Execution failed with error StandardError
141
  #   #=> (2013-10-13 19:09:38 -0400) Execution failed with error StandardError
142
  #   #=> (2013-10-13 19:09:39 -0400) Execution failed with error StandardError
143
  #   task.stop
144
  #
145
  # @see http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html
146
  # @see http://docs.oracle.com/javase/7/docs/api/java/util/TimerTask.html
147
  class TimerTask
1✔
148
    include Dereferenceable
1✔
149
    include Executor
1✔
150
    include Concurrent::Observable
1✔
151

152
    # Default `:execution_interval` in seconds.
153
    EXECUTION_INTERVAL = 60
1✔
154

155
    # Default `:timeout_interval` in seconds.
156
    TIMEOUT_INTERVAL = 30
1✔
157

158
    # Create a new TimerTask with the given task and configuration.
159
    #
160
    # @!macro [attach] timer_task_initialize
161
    #   @param [Hash] opts the options defining task execution.
162
    #   @option opts [Integer] :execution_interval number of seconds between
163
    #     task executions (default: EXECUTION_INTERVAL)
164
    #   @option opts [Integer] :timeout_interval number of seconds a task can
165
    #     run before it is considered to have failed (default: TIMEOUT_INTERVAL)
166
    #   @option opts [Boolean] :run_now Whether to run the task immediately
167
    #     upon instantiation or to wait until the first `execution_interval`
168
    #     has passed (default: false)
169
    #  
170
    #   @raise ArgumentError when no block is given.
171
    #  
172
    #   @yield to the block after :execution_interval seconds have passed since
173
    #     the last yield
174
    #   @yieldparam task a reference to the `TimerTask` instance so that the
175
    #     block can control its own lifecycle. Necessary since `self` will
176
    #     refer to the execution context of the block rather than the running
177
    #     `TimerTask`.
178
    #  
179
    #   @note Calls Concurrent::Dereferenceable#set_deref_options passing `opts`.
180
    #     All options supported by Concurrent::Dereferenceable can be set
181
    #     during object initialization.
182
    #
183
    #   @return [TimerTask] the new `TimerTask`
184
    #  
185
    #   @see Concurrent::Dereferenceable#set_deref_options
186
    def initialize(opts = {}, &task)
1✔
187
      raise ArgumentError.new('no block given') unless block_given?
70✔
188

189
      init_executor
69✔
190
      set_deref_options(opts)
69✔
191

192
      self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL
69✔
193
      self.timeout_interval = opts[:timeout] || opts[:timeout_interval] || TIMEOUT_INTERVAL
67✔
194
      @run_now = opts[:now] || opts[:run_now]
65✔
195
      @executor = Concurrent::SafeTaskExecutor.new(task)
65✔
196
      @running = Concurrent::AtomicBoolean.new(false)
65✔
197

198
      self.observers = CopyOnNotifyObserverSet.new
65✔
199
    end
200

201
    # Is the executor running?
202
    #
203
    # @return [Boolean] `true` when running, `false` when shutting down or shutdown
204
    def running?
1✔
205
      @running.true?
16✔
206
    end
207

208
    # Execute a previously created `TimerTask`.
209
    #
210
    # @return [TimerTask] a reference to `self`
211
    #
212
    # @example Instance and execute in separate steps
213
    #   task = Concurrent::TimerTask.new(execution_interval: 10){ print "Hello World\n" }
214
    #   task.running? #=> false
215
    #   task.execute
216
    #   task.running? #=> true
217
    #
218
    # @example Instance and execute in one line
219
    #   task = Concurrent::TimerTask.new(execution_interval: 10){ print "Hello World\n" }.execute
220
    #   task.running? #=> true
221
    #
222
    # @since 0.6.0
223
    def execute
1✔
224
      mutex.synchronize do
43✔
225
        if @running.false?
43✔
226
          @running.make_true
43✔
227
          schedule_next_task(@run_now ? 0 : @execution_interval)
43✔
228
        end
229
      end
230
      self
43✔
231
    end
232

233
    # Create and execute a new `TimerTask`.
234
    #
235
    # @!macro timer_task_initialize
236
    #
237
    # @example 
238
    #   task = Concurrent::TimerTask.execute(execution_interval: 10){ print "Hello World\n" }
239
    #   task.running? #=> true
240
    #
241
    # @since 0.6.0
242
    def self.execute(opts = {}, &task)
1✔
243
      TimerTask.new(opts, &task).execute
×
244
    end
245

246
    # @!attribute [rw] execution_interval
247
    # @return [Float] Number of seconds after the task completes before the
248
    #   task is performed again.
249
    def execution_interval
1✔
250
      mutex.synchronize{ @execution_interval }
120✔
251
    end
252

253
    # @!attribute [rw] execution_interval
254
    # @return [Float] Number of seconds after the task completes before the
255
    #   task is performed again.
256
    def execution_interval=(value)
1✔
257
      if (value = value.to_f) <= 0.0
71✔
258
        raise ArgumentError.new('must be greater than zero')
2✔
259
      else
260
        mutex.synchronize{ @execution_interval = value }
138✔
261
      end
262
    end
263

264
    # @!attribute [rw] timeout_interval
265
    # @return [Float] Number of seconds the task can run before it is
266
    #   considered to have failed.
267
    def timeout_interval
1✔
268
      mutex.synchronize{ @timeout_interval }
121✔
269
    end
270

271
    # @!attribute [rw] timeout_interval
272
    # @return [Float] Number of seconds the task can run before it is
273
    #   considered to have failed.
274
    def timeout_interval=(value)
1✔
275
      if (value = value.to_f) <= 0.0
69✔
276
        raise ArgumentError.new('must be greater than zero')
2✔
277
      else
278
        mutex.synchronize{ @timeout_interval = value }
134✔
279
      end
280
    end
281

282
    # @deprecated Updated to use `Executor` instead of `Runnable`
283
    def terminate(*args) deprecated(:terminate, :kill, *args); end
1✔
284

285
    # @deprecated Updated to use `Executor` instead of `Runnable`
286
    def stop(*args) deprecated(:stop, :shutdown, *args); end
13✔
287

288
    # @deprecated Updated to use `Executor` instead of `Runnable`
289
    def cancel(*args) deprecated(:cancel, :shutdown, *args); end
1✔
290

291
    # @deprecated Updated to use `Executor` instead of `Runnable`
292
    def run!(*args) deprecated(:run!, :execute); end
2✔
293

294
    # @deprecated Updated to use `Executor` instead of `Runnable`
295
    def self.run!(*args, &block)
1✔
296
      warn "[DEPRECATED] `run!` is deprecated, please use `execute` instead."
6✔
297
      Concurrent::Runnable::Context.new(TimerTask.new(*args, &block))
6✔
298
    end
299

300
    # @deprecated Updated to use `Executor` instead of `Runnable`
301
    def run
1✔
302
      raise Concurrent::Runnable::LifecycleError.new('already running') if @running.true?
17✔
303
      self.execute
16✔
304
      self.wait_for_termination
16✔
305
      true
9✔
306
    end
307

308
    private :post, :<<
1✔
309

310
    protected
1✔
311

312
    # @!visibility private
313
    def shutdown_execution
1✔
314
      @running.make_false
5✔
315
      super
5✔
316
    end
317

318
    # @!visibility private
319
    def kill_execution
1✔
320
      @running.make_false
17✔
321
      super
17✔
322
    end
323

324
    # @!visibility private
325
    def schedule_next_task(interval = execution_interval)
1✔
326
      Concurrent::timer(interval, Concurrent::Event.new, &method(:execute_task))
98✔
327
    end
328

329
    # @!visibility private
330
    def execute_task(completion)
1✔
331
      return unless @running.true?
57✔
332
      Concurrent::timer(timeout_interval, completion, &method(:timeout_task))
56✔
333
      success, value, reason = @executor.execute(self)
55✔
334
      if completion.try?
54✔
335
        self.value = value
54✔
336
        schedule_next_task
54✔
337
        time = Time.now
54✔
338
        observers.notify_observers do
54✔
339
          [time, self.value, reason]
23✔
340
        end
341
      end
342
    end
343

344
    # @!visibility private
345
    def timeout_task(completion)
1✔
346
      return unless @running.true?
1✔
347
      if completion.try?
1✔
348
        self.value = value
1✔
349
        schedule_next_task
1✔
350
        observers.notify_observers(Time.now, nil, Concurrent::TimeoutError.new)
1✔
351
      end
352
    end
353

354
    # @deprecated Updated to use `Executor` instead of `Runnable`
355
    # @!visibility private
356
    def deprecated(old, new, *args)
1✔
357
      warn "[DEPRECATED] `#{old}` is deprecated, please use `#{new}` instead."
13✔
358
      self.send(new, *args)
13✔
359
    end
360
  end
361
end
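
The class comment in the listing describes a pattern: a scheduling thread wakes at a fixed interval, runs a task that is decoupled from the scheduling logic, survives task failures and timeouts, and reports each run to observers as (time, result, error). The following is a minimal sketch of that pattern using only Ruby's standard library. `MiniTimerTask` and `RecordingObserver` are hypothetical names invented for this illustration. The sketch is not how `Concurrent::TimerTask` is implemented: the listing schedules through `Concurrent::timer` and `SafeTaskExecutor`, whereas this sketch assumes a plain `Thread` with `sleep` and enforces the timeout with `Timeout.timeout`.

```ruby
require 'timeout'

# Hypothetical, simplified stand-in for illustration only.
# It is NOT Concurrent::TimerTask and shares none of its internals.
class MiniTimerTask
  def initialize(interval:, timeout:, &task)
    raise ArgumentError, 'no block given' unless task
    unless interval.to_f > 0 && timeout.to_f > 0
      raise ArgumentError, 'must be greater than zero'
    end

    @interval  = interval.to_f
    @timeout   = timeout.to_f
    @task      = task
    @observers = []
    @running   = false
    @mutex     = Mutex.new
  end

  def add_observer(observer)
    @mutex.synchronize { @observers << observer }
  end

  def running?
    @mutex.synchronize { @running }
  end

  # Start the scheduling thread. Calling this on a running task is a no-op.
  def execute
    @mutex.synchronize do
      return self if @running
      @running = true
    end
    @thread = Thread.new do
      while running?
        sleep(@interval)
        run_once if running?
      end
    end
    self
  end

  # Stop scheduling. Safe to call from inside the task block.
  def shutdown
    @mutex.synchronize { @running = false }
    @thread.join if @thread && @thread != Thread.current
    true
  end

  private

  # Run the task once under a timeout. Any StandardError, including
  # Timeout::Error, is captured and reported instead of killing the
  # scheduling thread.
  def run_once
    result = nil
    error  = nil
    begin
      result = Timeout.timeout(@timeout) { @task.call(self) }
    rescue StandardError => e
      error = e
    end
    observers = @mutex.synchronize { @observers.dup }
    observers.each { |o| o.update(Time.now, result, error) }
  end
end

# Hypothetical observer that records every notification it receives.
class RecordingObserver
  attr_reader :events

  def initialize
    @events = []
  end

  def update(time, result, error)
    @events << [time, result, error]
  end
end
```

To try it, create a task with something like `MiniTimerTask.new(interval: 1, timeout: 1) { 42 }`, attach a `RecordingObserver` with `add_observer`, call `execute`, and inspect `events` after calling `shutdown`. Because the task block only receives the task object as an argument, the business logic can be tested on its own before it is handed to the scheduler.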