• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ruby-concurrency / concurrent-ruby / #2835

05 Oct 2014 10:16PM UTC coverage: 45.201% (-49.6%) from 94.81%
#2835

push

jdantonio
Merge pull request #158 from obrok/promise-composition

Promise composition

2 of 15 new or added lines in 1 file covered. (13.33%)

1514 existing lines in 84 files now uncovered.

1375 of 3042 relevant lines covered (45.2%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.85
/lib/concurrent/executor/timer_set.rb
1
require 'thread'
1✔
2
require_relative 'executor'
1✔
3
require 'concurrent/options_parser'
1✔
4
require 'concurrent/atomic/event'
1✔
5
require 'concurrent/collection/priority_queue'
1✔
6
require 'concurrent/executor/single_thread_executor'
1✔
7

8
module Concurrent
1✔
9

10
  # Executes a collection of tasks at the specified times. A master thread
11
  # monitors the set and schedules each task for execution at the appropriate
12
  # time. Tasks are run on the global task pool or on the supplied executor.
13
  class TimerSet
1✔
14
    include RubyExecutor
1✔
15

16
    # Create a new set of timed tasks.
    #
    # Sets up the min-ordered priority queue of pending tasks, the executor
    # that tasks are handed to when due, a dedicated single-thread executor
    # that runs the timer loop, and the condition the timer loop waits on.
    #
    # @param [Hash] opts the options controlling which executor runs the tasks
    # @option opts [Boolean] :operation (false) when `true` will execute tasks on the global
    #   operation pool (for long-running operations), when `false` will execute tasks on the
    #   global task pool (for short-running tasks)
    # @option opts [object] :executor when provided will run all operations on
    #   this executor rather than the global thread pool (overrides :operation)
    def initialize(opts = {})
      @queue = PriorityQueue.new(order: :min)
      # Fall back to the global task pool when the options select no executor.
      @task_executor = OptionsParser.get_executor_from(opts) || Concurrent.configuration.global_task_pool
      @timer_executor = SingleThreadExecutor.new
      @condition = Condition.new
      init_executor
    end
31

32
    # Post a task to be executed at the specified time. The given time may be
    # either a `Time` object or the number of seconds to wait. If the intended
    # execution time is within 1/100th of a second of the current time the task
    # is posted to the task executor immediately; otherwise it is queued and
    # the timer loop is asked to process it.
    #
    # @param [Object] intended_time the time to schedule the task for execution
    # @param [Array] args arguments forwarded to the executor along with the task
    #
    # @yield the task to be performed
    #
    # @return [Boolean] true if the task was posted or queued, false when the
    #   timer set is not running
    #
    # @raise [ArgumentError] if the intended execution time is rejected by
    #   `TimerSet.calculate_schedule_time`
    # @raise [ArgumentError] if no block is given
    def post(intended_time, *args, &task)
      time = TimerSet.calculate_schedule_time(intended_time).to_f
      raise ArgumentError, 'no block given' unless block_given?

      mutex.synchronize do
        return false unless running?

        if (time - Time.now.to_f) <= 0.01
          @task_executor.post(*args, &task)
        else
          @queue.push(Task.new(time, args, task))
          @timer_executor.post(&method(:process_tasks))
          # Wake the timer loop if it is currently waiting on a later task, so
          # that it re-reads the head of the queue instead of sleeping until
          # its previous timeout expires.
          @condition.signal
        end

        true
      end
    end
63

64
    # For a timer, #kill is like an orderly shutdown, except we need to manually
    # (and destructively) clear the queue first.
    #
    # The queue is cleared while holding the mutex because #post pushes onto
    # the same queue under that mutex. The lock is released before #shutdown
    # is called so that #shutdown is free to take it itself.
    def kill
      mutex.synchronize { @queue.clear }
      shutdown
    end
70

71
    # Calculate the time at which to execute a task. If the given time is a
    # `Time` object it is validated and returned as-is. Otherwise the value
    # is understood as a number of seconds in the future (zero is accepted)
    # and is added to `now`.
    #
    # @param [Object] intended_time the time (as a `Time` object or a number
    #   of seconds) to schedule the task for execution
    # @param [Time] now (Time.now) the time from which to calculate an interval
    #
    # @return [Time] the intended execution time; call `#to_f` on it for
    #   fractional seconds since the Epoch
    #
    # @raise [ArgumentError] if a `Time` is given that is not after `now`
    # @raise [ArgumentError] if a negative number of seconds is given
    def self.calculate_schedule_time(intended_time, now = Time.now)
      if intended_time.is_a?(Time)
        raise ArgumentError, 'schedule time must be in the future' if intended_time <= now
        return intended_time
      end

      # Only negative intervals are rejected; an interval of zero is allowed.
      raise ArgumentError, 'seconds must be greater than zero' if intended_time.to_f < 0.0
      now + intended_time
    end
93

94
    private
1✔
95

96
    # A struct pairing a task (`op`) and its arguments (`args`) with the
    # intended execution time (`time`). Including `Comparable` and defining
    # the spaceship operator on `time` lets tasks be ordered by when they
    # are due.
    #
    # @!visibility private
    Task = Struct.new(:time, :args, :op) do
      include Comparable

      # Order tasks solely by their intended execution time.
      def <=>(other)
        time <=> other.time
      end
    end
109

110
    private_constant :Task
1✔
111

112
    # Shutdown hook: discards every pending task, kills the executor that
    # runs the timer loop, and then sets the stopped event.
    #
    # @!visibility private
    def shutdown_execution
      @queue.clear
      @timer_executor.kill
      stopped_event.set
    end
118

119
    # Run a loop and execute tasks in the scheduled order and at the approximate
    # scheduled time. If no tasks remain the loop exits. If the next task is
    # not yet due the loop waits on the condition for the remaining interval,
    # capped at 60 seconds, and then looks at the queue again.
    #
    # Every access to the queue happens while holding the mutex, because #post
    # pushes onto the same queue from other threads under that mutex.
    #
    # @!visibility private
    def process_tasks
      loop do
        task = mutex.synchronize { @queue.peek unless @queue.empty? }
        break unless task

        interval = task.time - Time.now.to_f

        if interval <= 0
          # Remove the task from the queue *before* handing it to the executor
          # and run whatever was actually removed. A task pushed after the peek
          # sorts ahead of the peeked one only if it is due even sooner, so the
          # popped task is always due as well; posting the peeked task and then
          # popping could run one task twice and drop another.
          due = mutex.synchronize { @queue.pop unless @queue.empty? }
          next unless due

          @task_executor.post(*due.args, &due.op)
        else
          mutex.synchronize do
            @condition.wait(mutex, [interval, 60].min)
          end
        end
      end
    end
142
  end
143
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc