ruby-concurrency / concurrent-ruby / #2743

04 Mar 2015 02:59AM UTC coverage: 45.152% (-50.4%) from 95.548%
#2743

push

jdantonio
Merge pull request #258 from ruby-concurrency/clock_time

Closes #256

23 of 69 new or added lines in 7 files covered. (33.33%)

1563 existing lines in 88 files now uncovered.

1425 of 3156 relevant lines covered (45.15%)

0.65 hits per line

Source File

34.62
/lib/concurrent/timer_task.rb
require 'concurrent/dereferenceable'
require 'concurrent/observable'
require 'concurrent/atomic/atomic_boolean'
require 'concurrent/executor/executor'
require 'concurrent/executor/safe_task_executor'

module Concurrent

  # A very common concurrency pattern is to run a thread that performs a task
  # at regular intervals. The thread that performs the task sleeps for the given
  # interval then wakes up and performs the task. Lather, rinse, repeat... This
  # pattern causes two problems. First, it is difficult to test the business
  # logic of the task because the task itself is tightly coupled with the
  # concurrency logic. Second, an exception raised while performing the task can
  # cause the entire thread to abend. In a long-running application where the
  # task thread is intended to run for days/weeks/years a crashed task thread
  # can pose a significant problem. `TimerTask` alleviates both problems.
  #
  # When a `TimerTask` is launched it starts a thread for monitoring the
  # execution interval. The `TimerTask` thread does not perform the task,
  # however. Instead, the `TimerTask` launches the task on a separate thread.
  # Should the task experience an unrecoverable crash only the task thread will
  # crash. This makes the `TimerTask` very fault tolerant. Additionally, the
  # `TimerTask` thread can respond to the success or failure of the task,
  # performing logging or ancillary operations. `TimerTask` can also be
  # configured with a timeout value allowing it to kill a task that runs too
  # long.
  #
  # One other advantage of `TimerTask` is that it forces the business logic to
  # be completely decoupled from the concurrency logic. The business logic can
  # be tested separately then passed to the `TimerTask` for scheduling and
  # running.
  #
  # In some cases it may be necessary for a `TimerTask` to affect its own
  # execution cycle. To facilitate this, a reference to the `TimerTask` instance
  # is passed as an argument to the provided block every time the task is
  # executed.
  #
  # The `TimerTask` class includes the `Dereferenceable` mixin module so the
  # result of the last execution is always available via the `#value` method.
  # Dereferencing options can be passed to the `TimerTask` during construction
  # or at any later time using the `#set_deref_options` method.
  #
  # `TimerTask` supports notification through the Ruby standard library
  # {http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html
  # Observable} module. On execution the `TimerTask` will notify the observers
  # with three arguments: time of execution, the result of the block (or nil on
  # failure), and any raised exceptions (or nil on success). If the timeout
  # interval is exceeded the observer will receive a `Concurrent::TimeoutError`
  # object as the third argument.
  #
  # @example Basic usage
  #   task = Concurrent::TimerTask.new{ puts 'Boom!' }
  #   task.execute
  #
  #   task.execution_interval #=> 60 (default)
  #   task.timeout_interval   #=> 30 (default)
  #
  #   # wait 60 seconds...
  #   #=> 'Boom!'
  #
  #   task.shutdown #=> true
  #
  # @example Configuring `:execution_interval` and `:timeout_interval`
  #   task = Concurrent::TimerTask.new(execution_interval: 5, timeout_interval: 5) do
  #          puts 'Boom!'
  #        end
  #
  #   task.execution_interval #=> 5
  #   task.timeout_interval   #=> 5
  #
  # @example Immediate execution with `:run_now`
  #   task = Concurrent::TimerTask.new(run_now: true){ puts 'Boom!' }
  #   task.execute
  #
  #   #=> 'Boom!'
  #
  # @example Last `#value` and `Dereferenceable` mixin
  #   task = Concurrent::TimerTask.new(
  #     dup_on_deref: true,
  #     execution_interval: 5
  #   ){ Time.now }
  #
  #   task.execute
  #   Time.now   #=> 2013-11-07 18:06:50 -0500
  #   sleep(10)
  #   task.value #=> 2013-11-07 18:06:55 -0500
  #
  # @example Controlling execution from within the block
  #   timer_task = Concurrent::TimerTask.new(execution_interval: 1) do |task|
  #     # the interval is stored as a Float, so convert before calling #times
  #     task.execution_interval.to_i.times{ print 'Boom! ' }
  #     print "\n"
  #     task.execution_interval += 1
  #     if task.execution_interval > 5
  #       puts 'Stopping...'
  #       task.shutdown
  #     end
  #   end
  #
  #   timer_task.execute # this task will stop itself
  #   #=> Boom!
  #   #=> Boom! Boom!
  #   #=> Boom! Boom! Boom!
  #   #=> Boom! Boom! Boom! Boom!
  #   #=> Boom! Boom! Boom! Boom! Boom!
  #   #=> Stopping...
  #
  # @example Observation
  #   class TaskObserver
  #     def update(time, result, ex)
  #       if result
  #         print "(#{time}) Execution successfully returned #{result}\n"
  #       elsif ex.is_a?(Concurrent::TimeoutError)
  #         print "(#{time}) Execution timed out\n"
  #       else
  #         print "(#{time}) Execution failed with error #{ex}\n"
  #       end
  #     end
  #   end
  #
  #   task = Concurrent::TimerTask.new(execution_interval: 1, timeout_interval: 1){ 42 }
  #   task.add_observer(TaskObserver.new)
  #   task.execute
  #
  #   #=> (2013-10-13 19:08:58 -0400) Execution successfully returned 42
  #   #=> (2013-10-13 19:08:59 -0400) Execution successfully returned 42
  #   #=> (2013-10-13 19:09:00 -0400) Execution successfully returned 42
  #   task.shutdown
  #
  #   task = Concurrent::TimerTask.new(execution_interval: 1, timeout_interval: 1){ sleep }
  #   task.add_observer(TaskObserver.new)
  #   task.execute
  #
  #   #=> (2013-10-13 19:07:25 -0400) Execution timed out
  #   #=> (2013-10-13 19:07:27 -0400) Execution timed out
  #   #=> (2013-10-13 19:07:29 -0400) Execution timed out
  #   task.shutdown
  #
  #   task = Concurrent::TimerTask.new(execution_interval: 1){ raise StandardError }
  #   task.add_observer(TaskObserver.new)
  #   task.execute
  #
  #   #=> (2013-10-13 19:09:37 -0400) Execution failed with error StandardError
  #   #=> (2013-10-13 19:09:38 -0400) Execution failed with error StandardError
  #   #=> (2013-10-13 19:09:39 -0400) Execution failed with error StandardError
  #   task.shutdown
  #
  # @see http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html
  # @see http://docs.oracle.com/javase/7/docs/api/java/util/TimerTask.html
  class TimerTask
    include Dereferenceable
    include RubyExecutor
    include Concurrent::Observable

    # Default `:execution_interval` in seconds.
    EXECUTION_INTERVAL = 60

    # Default `:timeout_interval` in seconds.
    TIMEOUT_INTERVAL = 30

    # Create a new TimerTask with the given task and configuration.
    #
    # @!macro [attach] timer_task_initialize
    #   @param [Hash] opts the options defining task execution.
    #   @option opts [Integer] :execution_interval number of seconds between
    #     task executions (default: EXECUTION_INTERVAL)
    #   @option opts [Integer] :timeout_interval number of seconds a task can
    #     run before it is considered to have failed (default: TIMEOUT_INTERVAL)
    #   @option opts [Boolean] :run_now Whether to run the task immediately
    #     upon instantiation or to wait until the first `:execution_interval`
    #     has passed (default: false)
    #
    #   @raise ArgumentError when no block is given.
    #
    #   @yield to the block after :execution_interval seconds have passed since
    #     the last yield
    #   @yieldparam task a reference to the `TimerTask` instance so that the
    #     block can control its own lifecycle. Necessary since `self` will
    #     refer to the execution context of the block rather than the running
    #     `TimerTask`.
    #
    #   @note Calls Concurrent::Dereferenceable#set_deref_options passing `opts`.
    #     All options supported by Concurrent::Dereferenceable can be set
    #     during object initialization.
    #
    #   @return [TimerTask] the new `TimerTask`
    #
    #   @see Concurrent::Dereferenceable#set_deref_options
    def initialize(opts = {}, &task)
      raise ArgumentError.new('no block given') unless block_given?

      init_executor
      set_deref_options(opts)

      self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL
      self.timeout_interval = opts[:timeout] || opts[:timeout_interval] || TIMEOUT_INTERVAL
      @run_now = opts[:now] || opts[:run_now]
      @executor = Concurrent::SafeTaskExecutor.new(task)
      @running = Concurrent::AtomicBoolean.new(false)

      self.observers = CopyOnNotifyObserverSet.new
    end

    # Is the executor running?
    #
    # @return [Boolean] `true` when running, `false` when shutting down or shutdown
    def running?
      @running.true?
    end

    # Execute a previously created `TimerTask`.
    #
    # @return [TimerTask] a reference to `self`
    #
    # @example Instance and execute in separate steps
    #   task = Concurrent::TimerTask.new(execution_interval: 10){ print "Hello World\n" }
    #   task.running? #=> false
    #   task.execute
    #   task.running? #=> true
    #
    # @example Instance and execute in one line
    #   task = Concurrent::TimerTask.new(execution_interval: 10){ print "Hello World\n" }.execute
    #   task.running? #=> true
    #
    # @since 0.6.0
    def execute
      mutex.synchronize do
        if @running.false?
          @running.make_true
          schedule_next_task(@run_now ? 0 : @execution_interval)
        end
      end
      self
    end

    # Create and execute a new `TimerTask`.
    #
    # @!macro timer_task_initialize
    #
    # @example
    #   task = Concurrent::TimerTask.execute(execution_interval: 10){ print "Hello World\n" }
    #   task.running? #=> true
    #
    # @since 0.6.0
    def self.execute(opts = {}, &task)
      TimerTask.new(opts, &task).execute
    end

    # @!attribute [rw] execution_interval
    # @return [Float] Number of seconds after the task completes before the
    #   task is performed again.
    def execution_interval
      mutex.lock
      @execution_interval
    ensure
      mutex.unlock
    end

    # @!attribute [rw] execution_interval
    # @return [Float] Number of seconds after the task completes before the
    #   task is performed again.
    def execution_interval=(value)
      if (value = value.to_f) <= 0.0
        raise ArgumentError.new('must be greater than zero')
      else
        begin
          mutex.lock
          @execution_interval = value
        ensure
          mutex.unlock
        end
      end
    end

    # @!attribute [rw] timeout_interval
    # @return [Float] Number of seconds the task can run before it is
    #   considered to have failed.
    def timeout_interval
      mutex.lock
      @timeout_interval
    ensure
      mutex.unlock
    end

    # @!attribute [rw] timeout_interval
    # @return [Float] Number of seconds the task can run before it is
    #   considered to have failed.
    def timeout_interval=(value)
      if (value = value.to_f) <= 0.0
        raise ArgumentError.new('must be greater than zero')
      else
        begin
          mutex.lock
          @timeout_interval = value
        ensure
          mutex.unlock
        end
      end
    end

    private :post, :<<

    protected

    # @!visibility private
    def shutdown_execution
      @running.make_false
      super
    end

    # @!visibility private
    def kill_execution
      @running.make_false
      super
    end

    # @!visibility private
    def schedule_next_task(interval = execution_interval)
      Concurrent::timer(interval, Concurrent::Event.new, &method(:execute_task))
    end

    # @!visibility private
    def execute_task(completion)
      return unless @running.true?
      Concurrent::timer(timeout_interval, completion, &method(:timeout_task))
      _success, value, reason = @executor.execute(self)
      if completion.try?
        self.value = value
        schedule_next_task
        time = Time.now
        observers.notify_observers do
          [time, self.value, reason]
        end
      end
    end

    # @!visibility private
    def timeout_task(completion)
      return unless @running.true?
      if completion.try?
        self.value = value
        schedule_next_task
        observers.notify_observers(Time.now, nil, Concurrent::TimeoutError.new)
      end
    end
  end
end
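
The class documentation above describes a scheduler thread that sleeps for the execution interval, runs the task on a separate thread, enforces a timeout, and notifies observers with `(time, result, error)`. The following is a minimal sketch of that pattern using only the Ruby standard library. `MiniTimerTask` and all of its method bodies are hypothetical and written for illustration; it is not the concurrent-ruby implementation and omits `Dereferenceable`, the executor mixin, and restart handling.

```ruby
require 'timeout' # only needed for the Timeout::Error class

# MiniTimerTask is a hypothetical, stdlib-only illustration of the pattern.
# It is NOT the concurrent-ruby TimerTask.
class MiniTimerTask
  def initialize(execution_interval:, timeout_interval:, &task)
    raise ArgumentError, 'no block given' unless task
    unless execution_interval.to_f > 0 && timeout_interval.to_f > 0
      raise ArgumentError, 'must be greater than zero'
    end

    @execution_interval = execution_interval.to_f
    @timeout_interval = timeout_interval.to_f
    @task = task
    @observers = []
    @running = false
    @lock = Mutex.new
  end

  def add_observer(observer)
    @lock.synchronize { @observers << observer }
    self
  end

  def running?
    @lock.synchronize { @running }
  end

  # Starts the scheduler thread; calling it again while running is a no-op.
  # Restarting after #shutdown is not handled in this sketch.
  def execute
    @lock.synchronize do
      unless @running
        @running = true
        Thread.new { run_loop }
      end
    end
    self
  end

  def shutdown
    @lock.synchronize { @running = false }
    true
  end

  private

  # Scheduler thread: sleep, run the task on a separate thread, notify.
  def run_loop
    while running?
      sleep(@execution_interval)
      break unless running?

      result, error = run_once
      notify(Time.now, result, error)
    end
  end

  # Runs the task on its own thread so a crash or hang cannot take down the
  # scheduler. Returns [result, nil] on success and [nil, error] otherwise.
  def run_once
    worker = Thread.new do
      begin
        [@task.call(self), nil]
      rescue StandardError => e
        [nil, e]
      end
    end
    return worker.value if worker.join(@timeout_interval)

    worker.kill
    [nil, Timeout::Error.new('task timed out')]
  end

  # Copies the observer list under the lock, then notifies outside it.
  def notify(time, result, error)
    observers = @lock.synchronize { @observers.dup }
    observers.each { |observer| observer.update(time, result, error) }
  end
end

# Usage: collect one notification through a Queue, then stop the task.
events = Queue.new
observer = Object.new
observer.define_singleton_method(:update) { |_time, result, error| events << [result, error] }

task = MiniTimerTask.new(execution_interval: 0.05, timeout_interval: 1) { 42 }
task.add_observer(observer).execute
result, error = events.pop
task.shutdown
puts "result=#{result.inspect} error=#{error.inspect}"
```

Because the task body only ever runs inside `run_once`, the block passed to the constructor can be unit-tested on its own, which is the decoupling the documentation comment argues for.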