• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jdantonio / concurrent-ruby / #714

23 May 2014 08:08PM UTC coverage: 40.697% (-56.5%) from 97.237%
#714

push

jdantonio
Merge pull request #96 from jdantonio/refactor/errors

Moved all custom errors into a single file and into the Concurrent module

17 of 18 new or added lines in 10 files covered. (94.44%)

1432 existing lines in 56 files now uncovered.

1028 of 2526 relevant lines covered (40.7%)

0.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.85
/lib/concurrent/executor/timer_set.rb
1
require 'thread'
1✔
2
require_relative 'executor'
1✔
3
require 'concurrent/options_parser'
1✔
4
require 'concurrent/atomic/event'
1✔
5
require 'concurrent/collection/priority_queue'
1✔
6
require 'concurrent/executor/single_thread_executor'
1✔
7

8
module Concurrent
1✔
9

10
  # Executes a collection of tasks at the specified times. A master thread
11
  # monitors the set and schedules each task for execution at the appropriate
12
  # time. Tasks are run on the global task pool or on the supplied executor.
13
  class TimerSet
1✔
14
    include Executor
1✔
15

16
    # Create a new set of timed tasks.
17
    #
18
    # @param [Hash] opts the options controlling how the tasks will be executed
19
    # @option opts [Boolean] :operation (false) when `true` will execute the tasks on the global
20
    #   operation pool (for long-running operations), when `false` will execute the tasks on the
21
    #   global task pool (for short-running tasks)
22
    # @option opts [object] :executor when provided will run all operations on
23
    #   this executor rather than the global thread pool (overrides :operation)
24
    def initialize(opts = {})
      # Pending tasks, kept in a queue created with `order: :min`; queued
      # entries compare by their scheduled time.
      @queue          = PriorityQueue.new(order: :min)
      # Executor that runs the user-supplied task blocks, resolved from opts.
      @task_executor  = OptionsParser.get_executor_from(opts)
      # Dedicated single-thread executor on which the scheduling loop is run.
      @timer_executor = SingleThreadExecutor.new
      # Used by the scheduling loop to wait (with a timeout) for the next task.
      @condition      = Condition.new
      # NOTE(review): init_executor is not defined in this file; presumably it
      # is supplied by the included Executor module -- confirm.
      init_executor
    end
31

32
    # Post a task to be executed at the specified time. The given time may be either
33
    # a `Time` object or the number of seconds to wait. If the intended execution
34
    # time is within 1/100th of a second of the current time the task will be
35
    # posted immediately to the executor.
36
    #
37
    # @param [Object] intended_time the time to schedule the task for execution
38
    #
39
    # @yield the task to be performed
40
    #
41
    # @return [Boolean] true if the task is posted, false after shutdown
42
    #
43
    # @raise [ArgumentError] if the intended execution time is not in the future
44
    # @raise [ArgumentError] if no block is given
45
    def post(intended_time, *args, &task)
      # Resolve the schedule time first (this is where an invalid time is
      # rejected), then insist on a block; no state is touched before both pass.
      time = TimerSet.calculate_schedule_time(intended_time).to_f
      raise ArgumentError.new('no block given') unless block_given?

      mutex.synchronize do
        # Refuse new work once the set is no longer running.
        return false unless running?

        if (time - Time.now.to_f) <= 0.01
          # Due within 1/100th of a second: bypass the queue entirely.
          @task_executor.post(*args, &task)
        else
          @queue.push(Task.new(time, args, task))
          @timer_executor.post(&method(:process_tasks))
          # A process_tasks loop may already be waiting on the condition for a
          # task with a later deadline. Wake it so it re-reads the head of the
          # queue; without this a sooner task would not be noticed until that
          # wait timed out.
          @condition.signal
        end

        true
      end
    end
63

64
    alias_method :kill, :shutdown
1✔
65

66
    # Calculate an Epoch time with milliseconds at which to execute a
67
    # task. If the given time is a `Time` object it will be converted
68
    # accordingly. If the time is a non-negative numeric value
69
    # it will be understood as a number of seconds in the future and
70
    # will be added to the current time to calculate Epoch.
71
    #
72
    # @param [Object] intended_time the time (as a `Time` object or a number of seconds)
73
    #   to schedule the task for execution
74
    # @param [Time] now (Time.now) the time from which to calculate an interval
75
    #
76
    # @return [Time] the time at which the task should be executed
77
    #
78
    # @raise [ArgumentError] if the intended execution time is not in the future
79
    def self.calculate_schedule_time(intended_time, now = Time.now)
      # Anything that is not a Time is treated as an offset in seconds from `now`.
      unless intended_time.is_a?(Time)
        raise ArgumentError, 'seconds must be greater than zero' if intended_time.to_f < 0.0
        return now + intended_time
      end

      # An absolute time must lie strictly after `now`.
      raise ArgumentError, 'schedule time must be in the future' if intended_time <= now
      intended_time
    end
88

89
    private
1✔
90

91
    # A struct for encapsulating a task and its intended execution time.
92
    # It facilitates proper prioritization by overriding the comparison
93
    # (spaceship) operator as a comparison of the intended execution
94
    # times.
95
    #
96
    # @!visibility private
97
    Task = Struct.new(:time, :args, :op) do
      # Comparable derives <, >, ==, etc. from the <=> defined below.
      include Comparable

      # Order tasks solely by their scheduled time; args and op are ignored.
      def <=>(other)
        time <=> other.time
      end
    end
104

105
    private_constant :Task
1✔
106

107
    # @!visibility private
108
    def shutdown_execution
      # Discard every task still sitting in the queue.
      @queue.clear
      # Kill the executor that the process_tasks loop is posted to.
      @timer_executor.kill
      # NOTE(review): stopped_event is not defined in this file; presumably it
      # comes from the included Executor module -- confirm.
      stopped_event.set
    end
113

114
    # Run a loop and execute tasks in the scheduled order and at the approximate
115
    # scheduled time. If no tasks remain the thread will exit gracefully so that
116
    # garbage collection can occur. If there are no ready tasks it will sleep
117
    # for up to 60 seconds waiting for the next scheduled task.
118
    #
119
    # @!visibility private
120
    def process_tasks
      loop do
        ready = nil

        # All queue access happens under the same mutex that #post holds while
        # pushing, so the head cannot change between inspecting and removing it.
        mutex.synchronize do
          # Nothing left to schedule: leave the method (returns nil).
          return if @queue.empty?

          head = @queue.peek
          remaining = head.time - Time.now.to_f

          if remaining <= 0
            # Remove the task *before* handing it to the executor. Posting the
            # peeked task and popping afterwards could pop a different task
            # that was pushed in the meantime, losing it and re-running this one.
            ready = @queue.pop
          else
            # Not due yet: wait for the remaining time, capped at 60 seconds.
            @condition.wait(mutex, [remaining, 60].min)
          end
        end

        # Hand the task over outside the critical section.
        @task_executor.post(*ready.args, &ready.op) if ready
      end
    end
137
  end
138
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc