• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ruby-concurrency / concurrent-ruby / #2742

04 Mar 2015 02:59AM UTC coverage: 91.73% (-3.8%) from 95.548%
#2742

push

jdantonio
Merge pull request #258 from ruby-concurrency/clock_time

Closes #256

48 of 69 new or added lines in 7 files covered. (69.57%)

118 existing lines in 18 files now uncovered.

2895 of 3156 relevant lines covered (91.73%)

462.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.77
/lib/concurrent/executor/timer_set.rb
1
require 'thread'
1✔
2
require 'concurrent/options_parser'
1✔
3
require 'concurrent/atomic/event'
1✔
4
require 'concurrent/collection/priority_queue'
1✔
5
require 'concurrent/executor/executor'
1✔
6
require 'concurrent/executor/single_thread_executor'
1✔
7
require 'concurrent/utility/monotonic_time'
1✔
8

9
module Concurrent
1✔
10

11
  # Executes a collection of tasks, each after a given delay. A master task
12
  # monitors the set and schedules each task for execution at the appropriate
13
  # time. Tasks are run on the global task pool or on the supplied executor.
14
  #
15
  # @!macro monotonic_clock_warning
16
  class TimerSet
1✔
17
    include RubyExecutor
1✔
18

19
    # Create a new set of timed tasks.
20
    #
21
    # @param [Hash] opts the options controlling how the future will be processed
22
    # @option opts [Boolean] :operation (false) when `true` will execute the future on the global
23
    #   operation pool (for long-running operations), when `false` will execute the future on the
24
    #   global task pool (for short-running tasks)
25
    # @option opts [object] :executor when provided will run all operations on
26
    #   this executor rather than the global thread pool (overrides :operation)
27
    def initialize(opts = {})
1✔
28
      @queue          = PriorityQueue.new(order: :min)
98✔
29
      @task_executor  = OptionsParser::get_executor_from(opts) || Concurrent.configuration.global_task_pool
98✔
30
      @timer_executor = SingleThreadExecutor.new
98✔
31
      @condition      = Condition.new
98✔
32
      init_executor
98✔
33
    end
34

35
    # Post a task to be execute run after a given delay (in seconds). If the
36
    # delay is less than 1/100th of a second the task will be immediately post
37
    # to the executor.
38
    #
39
    # @param [Float] delay the number of seconds to wait for before executing the task
40
    #
41
    # @yield the task to be performed
42
    #
43
    # @return [Boolean] true if the message is post, false after shutdown
44
    #
45
    # @raise [ArgumentError] if the intended execution time is not in the future
46
    # @raise [ArgumentError] if no block is given
47
    #
48
    # @!macro deprecated_scheduling_by_clock_time
49
    def post(delay, *args, &task)
1✔
50
      raise ArgumentError.new('no block given') unless block_given?
1,240✔
51
      delay = TimerSet.calculate_delay!(delay) # raises exceptions
1,239✔
52

53
      mutex.synchronize do
1,237✔
54
        return false unless running?
1,237✔
55

56
        if (delay) <= 0.01
1,235✔
57
          @task_executor.post(*args, &task)
1,032✔
58
        else
59
          @queue.push(Task.new(Concurrent.monotonic_time + delay, args, task))
203✔
60
          @timer_executor.post(&method(:process_tasks))
203✔
61
        end
62
      end
63

64
      @condition.signal
1,235✔
65
      true
1,235✔
66
    end
67

68
    # @!visibility private
69
    def <<(task)
1✔
NEW
70
      post(0.0, &task)
×
NEW
71
      self
×
72
    end
73

74
    # For a timer, #kill is like an orderly shutdown, except we need to manually
75
    # (and destructively) clear the queue first
76
    def kill
1✔
77
      mutex.synchronize { @queue.clear }
62✔
78
      # possible race condition
79
      shutdown
31✔
80
    end
81

82
    # Schedule a task to be executed after a given delay (in seconds).
83
    #
84
    # @param [Float] delay the number of seconds to wait for before executing the task
85
    #
86
    # @return [Float] the number of seconds to delay
87
    #
88
    # @raise [ArgumentError] if the intended execution time is not in the future
89
    # @raise [ArgumentError] if no block is given
90
    #
91
    # @!macro deprecated_scheduling_by_clock_time
92
    #
93
    # @!visibility private
94
    def self.calculate_delay!(delay)
1✔
95
      if delay.is_a?(Time)
2,296✔
96
        warn '[DEPRECATED] Use an interval not a clock time; schedule is now based on a monotonic clock'
4✔
97
        now = Time.now
4✔
98
        raise ArgumentError.new('schedule time must be in the future') if delay <= now
4✔
99
        delay.to_f - now.to_f
2✔
100
      else
101
        raise ArgumentError.new('seconds must be greater than zero') if delay.to_f < 0.0
2,292✔
102
        delay.to_f
2,290✔
103
      end
104
    end
105

106
    private
1✔
107

108
    # A struct for encapsulating a task and its intended execution time.
109
    # It facilitates proper prioritization by overriding the comparison
110
    # (spaceship) operator as a comparison of the intended execution
111
    # times.
112
    #
113
    # @!visibility private
114
    Task = Struct.new(:time, :args, :op) do
1✔
115
      include Comparable
1✔
116

117
      def <=>(other)
1✔
118
        self.time <=> other.time
95✔
119
      end
120
    end
121

122
    private_constant :Task
1✔
123

124
    # @!visibility private
125
    def shutdown_execution
1✔
126
      @queue.clear
28✔
127
      @timer_executor.kill
28✔
128
      stopped_event.set
28✔
129
    end
130

131
    # Run a loop and execute tasks in the scheduled order and at the approximate
132
    # scheduled time. If no tasks remain the thread will exit gracefully so that
133
    # garbage collection can occur. If there are no ready tasks it will sleep
134
    # for up to 60 seconds waiting for the next scheduled task.
135
    #
136
    # @!visibility private
137
    def process_tasks
1✔
138
      loop do
144✔
139
        task = mutex.synchronize { @queue.peek }
672✔
140
        break unless task
336✔
141

142
        now = Concurrent.monotonic_time
225✔
143
        diff = task.time - now
225✔
144

145
        if diff <= 0
225✔
146
          # We need to remove the task from the queue before passing
147
          # it to the executor, to avoid race conditions where we pass
148
          # the peek'ed task to the executor and then pop a different
149
          # one that's been added in the meantime.
150
          #
151
          # Note that there's no race condition between the peek and
152
          # this pop - this pop could retrieve a different task from
153
          # the peek, but that task would be due to fire now anyway
154
          # (because @queue is a priority queue, and this thread is
155
          # the only reader, so whatever timer is at the head of the
156
          # queue now must have the same pop time, or a closer one, as
157
          # when we peeked).
158
          task = mutex.synchronize { @queue.pop }
228✔
159
          @task_executor.post(*task.args, &task.op)
114✔
160
        else
161
          mutex.synchronize do
111✔
162
            @condition.wait(mutex, [diff, 60].min)
111✔
163
          end
164
        end
165
      end
166
    end
167
  end
168
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc