• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ruby-concurrency / concurrent-ruby / #2743

04 Mar 2015 02:59AM UTC coverage: 45.152% (-50.4%) from 95.548%
#2743

push

jdantonio
Merge pull request #258 from ruby-concurrency/clock_time

Closes #256

23 of 69 new or added lines in 7 files covered. (33.33%)

1563 existing lines in 88 files now uncovered.

1425 of 3156 relevant lines covered (45.15%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.91
/lib/concurrent/executor/ruby_thread_pool_executor.rb
1
require 'thread'
1✔
2

3
require 'concurrent/atomic/event'
1✔
4
require 'concurrent/executor/executor'
1✔
5
require 'concurrent/executor/ruby_thread_pool_worker'
1✔
6
require 'concurrent/utility/monotonic_time'
1✔
7

8
module Concurrent
1✔
9

10
  # @!macro thread_pool_executor
11
  class RubyThreadPoolExecutor
1✔
12
    include RubyExecutor
1✔
13

14
    # Default maximum number of threads that will be created in the pool.
15
    DEFAULT_MAX_POOL_SIZE      = 2**15 # 32768
1✔
16

17
    # Default minimum number of threads that will be retained in the pool.
18
    DEFAULT_MIN_POOL_SIZE      = 0
1✔
19

20
    # Default maximum number of tasks that may be added to the task queue.
21
    DEFAULT_MAX_QUEUE_SIZE     = 0
1✔
22

23
    # Default maximum number of seconds a thread in the pool may remain idle
24
    # before being reclaimed.
25
    DEFAULT_THREAD_IDLETIMEOUT = 60
1✔
26

27
    # The maximum number of threads that may be created in the pool.
28
    attr_reader :max_length
1✔
29

30
    # The minimum number of threads that may be retained in the pool.
31
    attr_reader :min_length
1✔
32

33
    # The largest number of threads that have been created in the pool since construction.
34
    attr_reader :largest_length
1✔
35

36
    # The number of tasks that have been scheduled for execution on the pool since construction.
37
    attr_reader :scheduled_task_count
1✔
38

39
    # The number of tasks that have been completed by the pool since construction.
40
    attr_reader :completed_task_count
1✔
41

42
    # The number of seconds that a thread may be idle before being reclaimed.
43
    attr_reader :idletime
1✔
44

45
    # The maximum number of tasks that may be waiting in the work queue at any one time.
46
    # When the queue size reaches `max_queue` subsequent tasks will be rejected in
47
    # accordance with the configured `fallback_policy`.
48
    attr_reader :max_queue
1✔
49

50
    # Create a new thread pool.
51
    #
52
    # @param [Hash] opts the options which configure the thread pool
53
    #
54
    # @option opts [Integer] :max_threads (DEFAULT_MAX_POOL_SIZE) the maximum
55
    #   number of threads to be created
56
    # @option opts [Integer] :min_threads (DEFAULT_MIN_POOL_SIZE) the minimum
57
    #   number of threads to be retained
58
    # @option opts [Integer] :idletime (DEFAULT_THREAD_IDLETIMEOUT) the maximum
59
    #   number of seconds a thread may be idle before being reclaimed
60
    # @option opts [Integer] :max_queue (DEFAULT_MAX_QUEUE_SIZE) the maximum
61
    #   number of tasks allowed in the work queue at any one time; a value of
62
    #   zero means the queue may grow without bound
63
    # @option opts [Symbol] :fallback_policy (:abort) the policy for handling new
64
    #   tasks that are received when the queue size has reached
65
    #   `max_queue` or the executor has shut down
66
    #
67
    # @raise [ArgumentError] if `:max_threads` is less than one
68
    # @raise [ArgumentError] if `:min_threads` is less than zero
69
    # @raise [ArgumentError] if `:fallback_policy` is not one of the values specified
70
    #   in `FALLBACK_POLICIES`
71
    #
72
    # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
73
    # Reads the pool configuration from +opts+, validates it, and sets up an
    # empty worker list, an empty task queue and zeroed counters.
    #
    # Numeric options are coerced with +to_i+. The deprecated
    # +:overflow_policy+ key is still honoured when +:fallback_policy+ is
    # absent, and its presence triggers a deprecation warning.
    #
    # @raise [ArgumentError] if `:max_queue` is less than zero
    def initialize(opts = {})
      @min_length      = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
      @max_length      = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
      @idletime        = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
      @max_queue       = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
      @fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))
      warn '[DEPRECATED] :overflow_policy is deprecated terminology, please use :fallback_policy instead' if opts.key?(:overflow_policy)

      raise ArgumentError, 'max_threads must be greater than zero' if @max_length <= 0
      raise ArgumentError, 'min_threads cannot be less than zero' if @min_length < 0
      raise ArgumentError, "#{@fallback_policy} is not a valid fallback policy" unless FALLBACK_POLICIES.include?(@fallback_policy)
      raise ArgumentError, 'min_threads cannot be more than max_threads' if @min_length > @max_length
      # A negative bound can never be satisfied by the queue-size checks, so
      # every task would be rejected once the pool is full; fail fast instead.
      raise ArgumentError, 'max_queue cannot be less than zero' if @max_queue < 0

      init_executor

      @pool                 = []
      @queue                = Queue.new
      @scheduled_task_count = 0
      @completed_task_count = 0
      @largest_length       = 0

      @gc_interval  = opts.fetch(:gc_interval, 1).to_i # undocumented
      # Back-date the last collection by at least one interval so the first
      # call to prune_pool is not skipped.
      @last_gc_time = Concurrent.monotonic_time - [1.0, (@gc_interval * 2.0)].max
    end
97

98
    # @!macro executor_module_method_can_overflow_question
99
    # @return [Boolean] true when `max_queue` is non-zero, i.e. the work
    #   queue is bounded
    def can_overflow?
      @max_queue != 0
    end
102

103
    # The number of threads currently in the pool.
104
    #
105
    # @return [Integer] the length
106
    # Read under the executor mutex; reports 0 when the executor is not
    # running, whatever is still held in the pool array.
    def length
      mutex.synchronize { running? ? @pool.length : 0 }
    end
109

110
    alias_method :current_length, :length
1✔
111

112
    # The number of tasks in the queue awaiting execution.
113
    #
114
    # @return [Integer] the queue_length
115
    # Read under the executor mutex; reports 0 when the executor is not
    # running, whatever is still sitting in the queue.
    def queue_length
      mutex.synchronize { running? ? @queue.length : 0 }
    end
118

119
    # Number of tasks that may be enqueued before reaching `max_queue` and rejecting
120
    # new tasks. A value of -1 indicates that the queue may grow without bound.
121
    #
122
    # @return [Integer] the remaining_capacity
123
    # Read under the executor mutex. A `max_queue` of zero means "unbounded"
    # and is reported as -1; otherwise the free slots are the configured
    # bound minus the tasks currently queued.
    def remaining_capacity
      mutex.synchronize { @max_queue == 0 ? -1 : @max_queue - @queue.length }
    end
126

127
    # Returns an array with the status of each thread in the pool
128
    #
129
    # This method is deprecated and will be removed soon.
130
    # Emits a deprecation warning on every call, then collects `status` from
    # each worker while holding the executor mutex.
    #
    # @return [Array] one entry per worker currently in the pool
    def status
      warn '[DEPRECATED] `status` is deprecated and will be removed soon.'
      mutex.synchronize { @pool.map { |worker| worker.status } }
    end
134

135
    # Run on task completion.
136
    #
137
    # @!visibility private
138
    # Increments the completed-task counter under the executor mutex.
    #
    # @return [nil]
    def on_end_task
      # The counter is bumped unconditionally; the former trailing
      # `break unless running?` was the last statement of the block and so
      # had no effect.
      mutex.synchronize { @completed_task_count += 1 }
      nil
    end
144

145
    # Run when a thread worker exits.
146
    #
147
    # @!visibility private
148
    # Removes +worker+ from the pool under the executor mutex. When that
    # leaves the pool empty and the executor is no longer running, both the
    # stop event and the stopped event are set.
    #
    # @param worker the worker whose thread has finished
    def on_worker_exit(worker)
      mutex.synchronize do
        @pool.delete(worker)
        if @pool.empty? && !running?
          stop_event.set
          stopped_event.set
        end
      end
    end
157

158
    protected
1✔
159

160
    # @!visibility private
161
    # Queues +task+ with its +args+ when the pool reports capacity, otherwise
    # hands it to the fallback handler. Always finishes by giving the pool a
    # chance to prune itself.
    #
    # @param args arguments to be passed to the task when it runs
    # @param task the block to execute
    def execute(*args, &task)
      if ensure_capacity?
        @scheduled_task_count += 1
        @queue << [args, task]
      else
        # ensure_capacity? only answers false when the bounded queue is full.
        # Re-checking the queue length here would race with workers popping
        # tasks and could silently drop this one, so always apply the
        # fallback policy.
        handle_fallback(*args, &task)
      end
      prune_pool
    end
170

171
    # @!visibility private
172
    # With no workers in the pool the stopped event is set straight away.
    # Otherwise one :stop marker per pooled worker is appended to the queue,
    # behind any tasks already waiting there.
    def shutdown_execution
      if @pool.empty?
        stopped_event.set
      else
        @pool.length.times { @queue << :stop }
      end
    end
179

180
    # @!visibility private
181
    # Discards every task still waiting in the queue, then reclaims all
    # worker threads via drain_pool.
    def kill_execution
      @queue.clear
      drain_pool
    end
185

186
    # Check the thread pool configuration and determine if the pool
187
    # has enough capacity to handle the request. Will grow the size
188
    # of the pool if necessary.
189
    #
190
    # @return [Boolean] true if the pool has enough capacity else false
191
    #
192
    # @!visibility private
193
    # Decides how many workers to add before a task is accepted, adds them,
    # and reports whether the task can be accepted at all.
    def ensure_capacity?
      pool_size = @pool.size

      additional =
        if pool_size < @min_length
          # top the pool up to its configured minimum
          @min_length - pool_size
        elsif @queue.empty? && @queue.num_waiting >= 1
          # a worker is already blocked on the empty queue and can take the task
          0
        elsif pool_size < @max_length
          # room to grow; this also covers an empty pool, because the
          # constructor guarantees max_length is at least one
          1
        elsif @max_queue == 0 || @queue.size < @max_queue
          # pool is at its maximum but the queue can still absorb the task
          0
        end

      # nil: the pool is at its maximum and the bounded queue is full
      return false if additional.nil?

      if additional > 0
        additional.times { @pool << create_worker_thread }
        @largest_length = [@largest_length, @pool.length].max
      end

      true
    end
221

222
    # Scan all threads in the pool and reclaim any that are dead or
223
    # have been idle too long. Will check the last time the pool was
224
    # pruned and only run if the configured garbage collection
225
    # interval has passed.
226
    #
227
    # @!visibility private
228
    # Does nothing unless at least `@gc_interval` seconds have passed since
    # the previous collection. Otherwise it drops dead workers from the pool
    # and enqueues one :stop marker for every remaining worker whose last
    # activity is older than `@idletime` seconds. An idletime of zero
    # disables idle reclamation.
    def prune_pool
      now = Concurrent.monotonic_time
      return unless now - @gc_interval >= @last_gc_time

      @pool.delete_if { |worker| worker.dead? }

      unless @idletime == 0
        # One clock reading serves the whole scan.
        idle_cutoff = now - @idletime
        idle_count  = @pool.count { |worker| worker.last_activity < idle_cutoff }
        idle_count.times { @queue << :stop }
      end

      @last_gc_time = Concurrent.monotonic_time
    end
238

239
    # Reclaim all threads in the pool.
240
    #
241
    # @!visibility private
242
    # Calls `kill` on every worker in the pool, then empties the pool array.
    def drain_pool
      @pool.each { |worker| worker.kill }
      @pool.clear
    end
246

247
    # Create a single worker thread to be added to the pool.
248
    #
249
    # @return [RubyThreadPoolWorker] the worker that the new thread runs.
250
    #
251
    # @!visibility private
252
    # Builds a worker bound to the shared queue and this executor, then
    # starts a thread that runs it. When `run` returns, the thread reports
    # back through on_worker_exit. The value returned is the worker object,
    # not the Thread.
    def create_worker_thread
      worker = RubyThreadPoolWorker.new(@queue, self)
      Thread.new(worker, self) do |running_worker, parent|
        Thread.current.abort_on_exception = false
        running_worker.run
        parent.on_worker_exit(running_worker)
      end
      worker
    end
261
  end
262
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc