• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ruby-concurrency / concurrent-ruby / #2777

05 Oct 2014 10:16PM UTC coverage: 45.201% (-49.6%) from 94.81%
#2777

push

jdantonio
Merge pull request #158 from obrok/promise-composition

Promise composition

2 of 15 new or added lines in 1 file covered. (13.33%)

1514 existing lines in 84 files now uncovered.

1375 of 3042 relevant lines covered (45.2%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.14
/lib/concurrent/executor/ruby_thread_pool_executor.rb
1
require 'thread'
1✔
2

3
require_relative 'executor'
1✔
4
require 'concurrent/atomic/event'
1✔
5
require 'concurrent/executor/ruby_thread_pool_worker'
1✔
6

7
module Concurrent
1✔
8

9
  # @!macro thread_pool_executor
10
  class RubyThreadPoolExecutor
    # NOTE(review): `mutex`, `running?`, `stop_event`, `stopped_event`,
    # `init_executor` and `log` are not defined in this class; presumably they
    # are provided by RubyExecutor -- confirm against that module.
    include RubyExecutor

    # Default maximum number of threads that will be created in the pool.
    DEFAULT_MAX_POOL_SIZE = 2**15 # 32768

    # Default minimum number of threads that will be retained in the pool.
    DEFAULT_MIN_POOL_SIZE = 0

    # Default maximum number of tasks that may be added to the task queue.
    DEFAULT_MAX_QUEUE_SIZE = 0

    # Default maximum number of seconds a thread in the pool may remain idle
    # before being reclaimed.
    DEFAULT_THREAD_IDLETIMEOUT = 60

    # The set of possible overflow policies that may be set at thread pool creation.
    OVERFLOW_POLICIES = [:abort, :discard, :caller_runs]

    # The maximum number of threads that may be created in the pool.
    attr_reader :max_length

    # The minimum number of threads that may be retained in the pool.
    attr_reader :min_length

    # The largest number of threads that have been created in the pool since construction.
    attr_reader :largest_length

    # The number of tasks that have been scheduled for execution on the pool since construction.
    attr_reader :scheduled_task_count

    # The number of tasks that have been completed by the pool since construction.
    attr_reader :completed_task_count

    # The number of seconds that a thread may be idle before being reclaimed.
    attr_reader :idletime

    # The maximum number of tasks that may be waiting in the work queue at any one time.
    # When the queue size reaches `max_queue` subsequent tasks will be rejected in
    # accordance with the configured `overflow_policy`.
    attr_reader :max_queue

    # The policy defining how rejected tasks (tasks received once the queue size reaches
    # the configured `max_queue`) are handled. Must be one of the values specified in
    # `OVERFLOW_POLICIES`.
    attr_reader :overflow_policy

    # Create a new thread pool.
    #
    # @param [Hash] opts the options which configure the thread pool
    #
    # @option opts [Integer] :max_threads (DEFAULT_MAX_POOL_SIZE) the maximum
    #   number of threads to be created
    # @option opts [Integer] :min_threads (DEFAULT_MIN_POOL_SIZE) the minimum
    #   number of threads to be retained
    # @option opts [Integer] :idletime (DEFAULT_THREAD_IDLETIMEOUT) the maximum
    #   number of seconds a thread may be idle before being reclaimed
    # @option opts [Integer] :max_queue (DEFAULT_MAX_QUEUE_SIZE) the maximum
    #   number of tasks allowed in the work queue at any one time; a value of
    #   zero means the queue may grow without bound
    # @option opts [Symbol] :overflow_policy (:abort) the policy for handling new
    #   tasks that are received when the queue size has reached `max_queue`
    #
    # @raise [ArgumentError] if `:max_threads` is less than one
    # @raise [ArgumentError] if `:min_threads` is less than zero
    # @raise [ArgumentError] if `:overflow_policy` is not one of the values specified
    #   in `OVERFLOW_POLICIES`
    # @raise [ArgumentError] if `:min_threads` is greater than `:max_threads`
    #
    # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
    def initialize(opts = {})
      @min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
      @max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
      @idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
      @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
      @overflow_policy = opts.fetch(:overflow_policy, :abort)

      raise ArgumentError, 'max_threads must be greater than zero' if @max_length <= 0
      raise ArgumentError, 'min_threads cannot be less than zero' if @min_length < 0
      unless OVERFLOW_POLICIES.include?(@overflow_policy)
        raise ArgumentError, "#{@overflow_policy} is not a valid overflow policy"
      end
      raise ArgumentError, 'min_threads cannot be more than max_threads' if @min_length > @max_length

      init_executor

      @pool = []
      @queue = Queue.new
      @scheduled_task_count = 0
      @completed_task_count = 0
      @largest_length = 0

      @gc_interval = opts.fetch(:gc_interval, 1).to_i # undocumented
      # Back-date the last prune so the first call to `prune_pool` is not
      # skipped by the interval check.
      @last_gc_time = Time.now.to_f - [1.0, (@gc_interval * 2.0)].max
    end

    # @!macro executor_module_method_can_overflow_question
    #
    # @return [Boolean] true when `max_queue` is non-zero (the queue is bounded)
    def can_overflow?
      @max_queue != 0
    end

    # The number of threads currently in the pool.
    #
    # @return [Integer] the length; zero when the executor is not running
    def length
      mutex.synchronize { running? ? @pool.length : 0 }
    end
    alias_method :current_length, :length

    # The number of tasks in the queue awaiting execution.
    #
    # @return [Integer] the queue_length; zero when the executor is not running
    def queue_length
      mutex.synchronize { running? ? @queue.length : 0 }
    end

    # Number of tasks that may be enqueued before reaching `max_queue` and rejecting
    # new tasks. A value of -1 indicates that the queue may grow without bound.
    #
    # @return [Integer] the remaining_capacity
    def remaining_capacity
      mutex.synchronize { @max_queue == 0 ? -1 : @max_queue - @queue.length }
    end

    # Returns an array with the status of each thread in the pool
    #
    # This method is deprecated and will be removed soon.
    #
    # @return [Array] the result of calling `status` on every worker in the pool
    def status
      warn '[DEPRECATED] `status` is deprecated and will be removed soon.'
      mutex.synchronize { @pool.map(&:status) }
    end

    # Run on task completion. Increments `completed_task_count` while holding
    # the executor mutex.
    #
    # @return [nil]
    #
    # @!visibility private
    def on_end_task
      mutex.synchronize { @completed_task_count += 1 }
      nil
    end

    # Run when a thread worker exits. Removes the worker from the pool and,
    # once the pool is empty and the executor is no longer running, sets the
    # stop and stopped events.
    #
    # @param [Object] worker the worker that has exited
    #
    # @!visibility private
    def on_worker_exit(worker)
      mutex.synchronize do
        @pool.delete(worker)
        if @pool.empty? && !running?
          stop_event.set
          stopped_event.set
        end
      end
    end

    protected

    # Prune the pool, then either enqueue the task (when there is capacity)
    # or hand it to the overflow handler (when the bounded queue is full).
    #
    # @param [Array] args the arguments to pass to the task
    # @yield the task to be executed
    #
    # @!visibility private
    def execute(*args, &task)
      prune_pool
      if ensure_capacity?
        @scheduled_task_count += 1
        @queue << [args, task]
      elsif @max_queue != 0 && @queue.length >= @max_queue
        handle_overflow(*args, &task)
      end
    end

    # Signal the stopped event immediately when there are no workers;
    # otherwise push one `:stop` marker onto the queue per pooled worker.
    #
    # @!visibility private
    def shutdown_execution
      if @pool.empty?
        stopped_event.set
      else
        @pool.length.times { @queue << :stop }
      end
    end

    # Discard all queued tasks and reclaim every worker in the pool.
    #
    # @!visibility private
    def kill_execution
      @queue.clear
      drain_pool
    end

    # Check the thread pool configuration and determine if the pool
    # has enough capacity to handle the request. Will grow the size
    # of the pool if necessary.
    #
    # @return [Boolean] true if the pool has enough capacity else false
    #
    # @!visibility private
    def ensure_capacity?
      additional = 0
      capacity = true

      if @pool.size < @min_length
        # top the pool up to its configured minimum
        additional = @min_length - @pool.size
      elsif @queue.empty? && @queue.num_waiting >= 1
        # at least one thread is already waiting on the empty queue
        additional = 0
      elsif @pool.size < @max_length
        # the constructor guarantees max_length >= 1, so this also covers
        # the empty-pool case
        additional = 1
      elsif @max_queue == 0 || @queue.size < @max_queue
        # pool is at its maximum but the queue can still accept the task
        additional = 0
      else
        capacity = false
      end

      additional.times { @pool << create_worker_thread }
      @largest_length = [@largest_length, @pool.length].max if additional > 0

      capacity
    end

    # Handler which executes the `overflow_policy` once the queue size
    # reaches `max_queue`.
    #
    # @param [Array] args the arguments to the task which is being handled.
    # @yield the rejected task; only invoked under the `:caller_runs` policy
    #
    # @return [Boolean] false for `:discard`, true for `:caller_runs`
    # @raise [RejectedExecutionError] when the policy is `:abort`
    #
    # @!visibility private
    def handle_overflow(*args)
      case @overflow_policy
      when :abort
        raise RejectedExecutionError
      when :discard
        false
      when :caller_runs
        begin
          yield(*args)
        rescue => ex
          # let it fail
          log DEBUG, ex
        end
        true
      end
    end

    # Scan all threads in the pool and reclaim any that are dead or
    # have been idle too long. Will check the last time the pool was
    # pruned and only run if the configured garbage collection
    # interval has passed.
    #
    # @!visibility private
    def prune_pool
      now = Time.now.to_f
      return unless now - @gc_interval >= @last_gc_time

      @pool.delete_if do |worker|
        # an idletime of zero disables idle reclamation
        worker.dead? || (@idletime != 0 && now - @idletime > worker.last_activity)
      end
      @last_gc_time = Time.now.to_f
    end

    # Reclaim all threads in the pool.
    #
    # @!visibility private
    def drain_pool
      @pool.each(&:kill)
      @pool.clear
    end

    # Create a single worker, start it on a new thread, and return the worker
    # so it can be added to the pool.
    #
    # @return [RubyThreadPoolWorker] the new worker (not the Thread running it)
    #
    # @!visibility private
    def create_worker_thread
      wrkr = RubyThreadPoolWorker.new(@queue, self)
      Thread.new(wrkr, self) do |worker, parent|
        Thread.current.abort_on_exception = false
        worker.run
        parent.on_worker_exit(worker)
      end
      wrkr
    end
  end
288
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc