• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ruby-concurrency / concurrent-ruby / #2782

08 Dec 2014 03:40PM UTC coverage: 91.391% (-0.4%) from 91.753%
#2782

push

jdantonio
Merge pull request #201 from rkday/fallback_handling

Posting to a shutdown thread pool - JRuby consistency and better naming

17 of 25 new or added lines in 5 files covered. (68.0%)

212 existing lines in 36 files now uncovered.

2813 of 3078 relevant lines covered (91.39%)

371.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.08
/lib/concurrent/executor/ruby_thread_pool_executor.rb
1
require 'thread'
1✔
2

3
require_relative 'executor'
1✔
4
require 'concurrent/atomic/event'
1✔
5
require 'concurrent/executor/ruby_thread_pool_worker'
1✔
6

7
module Concurrent
1✔
8

9
  # @!macro thread_pool_executor
10
  class RubyThreadPoolExecutor
1✔
11
    include RubyExecutor
1✔
12

13
    # Default maximum number of threads that will be created in the pool.
    DEFAULT_MAX_POOL_SIZE      = 2**15 # 32768

    # Default minimum number of threads that will be retained in the pool.
    DEFAULT_MIN_POOL_SIZE      = 0

    # Default maximum number of tasks that may be added to the task queue.
    # A value of zero means the queue may grow without bound.
    DEFAULT_MAX_QUEUE_SIZE     = 0

    # Default maximum number of seconds a thread in the pool may remain idle
    # before being reclaimed.
    DEFAULT_THREAD_IDLETIMEOUT = 60
1✔
25

26
    # The maximum number of threads that may be created in the pool.
    attr_reader :max_length

    # The minimum number of threads that may be retained in the pool.
    attr_reader :min_length

    # The largest number of threads that have been created in the pool since construction.
    attr_reader :largest_length

    # The number of tasks that have been scheduled for execution on the pool since construction.
    attr_reader :scheduled_task_count

    # The number of tasks that have been completed by the pool since construction.
    attr_reader :completed_task_count

    # The number of seconds that a thread may be idle before being reclaimed.
    attr_reader :idletime

    # The maximum number of tasks that may be waiting in the work queue at any one time.
    # When the queue size reaches `max_queue` subsequent tasks will be rejected in
    # accordance with the configured `fallback_policy`.
    attr_reader :max_queue
1✔
48

49
    # Create a new thread pool.
    #
    # @param [Hash] opts the options which configure the thread pool
    #
    # @option opts [Integer] :max_threads (DEFAULT_MAX_POOL_SIZE) the maximum
    #   number of threads to be created
    # @option opts [Integer] :min_threads (DEFAULT_MIN_POOL_SIZE) the minimum
    #   number of threads to be retained
    # @option opts [Integer] :idletime (DEFAULT_THREAD_IDLETIMEOUT) the maximum
    #   number of seconds a thread may be idle before being reclaimed
    # @option opts [Integer] :max_queue (DEFAULT_MAX_QUEUE_SIZE) the maximum
    #   number of tasks allowed in the work queue at any one time; a value of
    #   zero means the queue may grow without bound
    # @option opts [Symbol] :fallback_policy (:abort) the policy for handling new
    #   tasks that are received when the queue size has reached
    #   `max_queue` or the executor has shut down; when this key is absent the
    #   `:overflow_policy` key is consulted before defaulting to `:abort`
    #
    # @raise [ArgumentError] if `:max_threads` is less than one
    # @raise [ArgumentError] if `:min_threads` is less than zero
    # @raise [ArgumentError] if `:fallback_policy` is not one of the values specified
    #   in `FALLBACK_POLICIES`
    # @raise [ArgumentError] if `:min_threads` is greater than `:max_threads`
    #
    # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
    def initialize(opts = {})
      @min_length      = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
      @max_length      = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
      @idletime        = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
      @max_queue       = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
      @fallback_policy = opts.fetch(:fallback_policy, opts.fetch(:overflow_policy, :abort))

      raise ArgumentError, 'max_threads must be greater than zero' if @max_length <= 0
      raise ArgumentError, 'min_threads cannot be less than zero' if @min_length < 0
      unless FALLBACK_POLICIES.include?(@fallback_policy)
        raise ArgumentError, "#{@fallback_policy} is not a valid fallback policy"
      end
      raise ArgumentError, 'min_threads cannot be more than max_threads' if @min_length > @max_length

      init_executor

      @pool                 = []
      @queue                = Queue.new
      @scheduled_task_count = 0
      @completed_task_count = 0
      @largest_length       = 0

      @gc_interval  = opts.fetch(:gc_interval, 1).to_i # undocumented
      # Back-date the last collection by at least one second (or two intervals)
      # so the first `prune_pool` interval check passes immediately.
      @last_gc_time = Time.now.to_f - [1.0, (@gc_interval * 2.0)].max
    end
470✔
95

96
    # @!macro executor_module_method_can_overflow_question
    #
    # @return [Boolean] true when `max_queue` is non-zero, i.e. the work
    #   queue is bounded
    def can_overflow?
      @max_queue != 0
    end
×
100

101
    # The number of threads currently in the pool.
    #
    # Read while holding the executor mutex.
    #
    # @return [Integer] the length; zero when the executor is not running
    def length
      mutex.synchronize { running? ? @pool.length : 0 }
    end
54✔
107

108
    alias_method :current_length, :length
109

1✔
110
    # The number of tasks in the queue awaiting execution.
    #
    # Read while holding the executor mutex.
    #
    # @return [Integer] the queue_length; zero when the executor is not running
    def queue_length
      mutex.synchronize { running? ? @queue.length : 0 }
    end
10✔
116

117
    # Number of tasks that may be enqueued before reaching `max_queue` and rejecting
    # new tasks. A value of -1 indicates that the queue may grow without bound.
    #
    # @return [Integer] the remaining_capacity
    def remaining_capacity
      mutex.synchronize do
        # A zero max_queue means "unbounded", reported as -1.
        @max_queue == 0 ? -1 : @max_queue - @queue.length
      end
    end
10✔
124

125
    # Returns an array with the status of each thread in the pool
    #
    # Emits a deprecation warning on every call.
    #
    # @deprecated This method is deprecated and will be removed soon.
    #
    # @return [Array] the result of calling `status` on each worker in the pool
    def status
      warn '[DEPRECATED] `status` is deprecated and will be removed soon.'
      mutex.synchronize { @pool.map(&:status) }
    end
4✔
132

133
    # Run on task completion.
    #
    # Increments `completed_task_count` while holding the executor mutex.
    #
    # @return [nil]
    #
    # @!visibility private
    def on_end_task
      mutex.synchronize { @completed_task_count += 1 }
      nil
    end
142

143
    # Run when a thread worker exits.
    #
    # Removes the worker from the pool while holding the executor mutex. When
    # that leaves the pool empty and the executor is no longer running, both
    # `stop_event` and `stopped_event` are set.
    #
    # @param worker the worker to remove from the pool
    #
    # @!visibility private
    def on_worker_exit(worker)
      mutex.synchronize do
        @pool.delete(worker)
        if @pool.empty? && !running?
          stop_event.set
          stopped_event.set
        end
      end
    end
155

156
    protected
157

1✔
158
    # Prune the pool, then either enqueue the task or hand it to the fallback
    # handler.
    #
    # When `ensure_capacity?` reports capacity the task is counted as scheduled
    # and pushed onto the queue as an `[args, task]` pair. Otherwise
    # `handle_fallback` is invoked, but only if the queue is bounded and full.
    #
    # @param args the arguments to pass to the task when it runs
    # @param task the block to be executed
    #
    # @!visibility private
    def execute(*args, &task)
      prune_pool
      if ensure_capacity?
        @scheduled_task_count += 1
        @queue << [args, task]
      elsif @max_queue != 0 && @queue.length >= @max_queue
        handle_fallback(*args, &task)
      end
    end
168

169
    # Begin shutting the pool down.
    #
    # With no workers in the pool `stopped_event` is set immediately;
    # otherwise one `:stop` marker per pooled worker is pushed onto the queue.
    #
    # @!visibility private
    def shutdown_execution
      if @pool.empty?
        stopped_event.set
      else
        @pool.length.times { @queue << :stop }
      end
    end
177

178
    # Discard every queued item, then reclaim all threads via `drain_pool`.
    #
    # @!visibility private
    def kill_execution
      @queue.clear
      drain_pool
    end
107✔
183

184
    # Check the thread pool configuration and determine if the pool
    # has enough capacity to handle the request. Will grow the size
    # of the pool if necessary.
    #
    # The rules are evaluated in order and the first match wins:
    #
    # 1. pool below `min_length`: add workers up to `min_length`
    # 2. queue empty with at least one thread waiting on it: add nothing
    # 3. pool empty with a zero `min_length`, or pool below `max_length`: add one worker
    # 4. queue unbounded or below `max_queue`: add nothing
    # 5. otherwise: no capacity
    #
    # `largest_length` is updated whenever workers are added.
    #
    # @return [Boolean] true if the pool has enough capacity else false
    #
    # @!visibility private
    def ensure_capacity?
      additional = 0
      capacity   = true

      if @pool.size < @min_length
        additional = @min_length - @pool.size
      elsif @queue.empty? && @queue.num_waiting >= 1
        additional = 0
      elsif (@pool.size == 0 && @min_length == 0) || @pool.size < @max_length || @max_length == 0
        additional = 1
      elsif @max_queue == 0 || @queue.size < @max_queue
        additional = 0
      else
        capacity = false
      end

      if additional > 0
        additional.times { @pool << create_worker_thread }
        @largest_length = [@largest_length, @pool.length].max
      end

      capacity
    end
6,038✔
219

220
    # Scan all threads in the pool and reclaim any that are dead or
    # have been idle too long. Will check the last time the pool was
    # pruned and only run if the configured garbage collection
    # interval has passed.
    #
    # Dead workers are removed from the pool directly. For each remaining
    # worker whose `last_activity` is older than `idletime` seconds a `:stop`
    # marker is pushed onto the queue; an `idletime` of zero disables this.
    #
    # @!visibility private
    def prune_pool
      return unless Time.now.to_f - @gc_interval >= @last_gc_time

      @pool.delete_if(&:dead?)

      # send :stop for each thread over idletime
      unless @idletime == 0
        idle_cutoff = Time.now.to_f - @idletime
        @pool.each { |worker| @queue << :stop if idle_cutoff > worker.last_activity }
      end

      @last_gc_time = Time.now.to_f
    end
236

237
    # Reclaim all threads in the pool.
    #
    # Calls `kill` on every worker and then empties the pool.
    #
    # @!visibility private
    def drain_pool
      @pool.each(&:kill)
      @pool.clear
    end
107✔
244

245
    # Create a single worker thread to be added to the pool.
    #
    # Builds a worker bound to the shared queue and this executor, then starts
    # a new thread that runs the worker and reports back through
    # `on_worker_exit` once `run` returns.
    #
    # @return [RubyThreadPoolWorker] the worker running on the new thread
    #
    # @!visibility private
    def create_worker_thread
      wrkr = RubyThreadPoolWorker.new(@queue, self)
      Thread.new(wrkr, self) do |worker, parent|
        # An exception escaping the worker must not abort the whole process.
        Thread.current.abort_on_exception = false
        worker.run
        parent.on_worker_exit(worker)
      end
      wrkr
    end
11,786✔
259
  end
260
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc