• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jdantonio / concurrent-ruby / #719

06 Apr 2014 10:19PM UTC coverage: 92.644% (-4.7%) from 97.309%
#719

push

chrisseaton
Update README.md

1587 of 1713 relevant lines covered (92.64%)

735.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.21
/lib/concurrent/ruby_thread_pool_executor.rb
1
require 'thread'
1✔
2

3
require 'concurrent/event'
1✔
4
require 'concurrent/ruby_thread_pool_worker'
1✔
5

6
module Concurrent
1✔
7

8
  RejectedExecutionError = Class.new(StandardError) unless defined? RejectedExecutionError
1✔
9

10
  # @!macro thread_pool_executor
11
  class RubyThreadPoolExecutor
1✔
12

13
    # The maximum number of threads that will be created in the pool
14
    # (unless overridden during construction).
15
    DEFAULT_MAX_POOL_SIZE = 2**15 # 32768
1✔
16

17
    # The minimum number of threads that will be created in the pool
18
    # (unless overridden during construction).
19
    DEFAULT_MIN_POOL_SIZE = 0
1✔
20

21
    DEFAULT_MAX_QUEUE_SIZE = 0
1✔
22

23
    # The maximum number of seconds a thread in the pool may remain idle before
24
    # being reclaimed (unless overridden during construction).
25
    DEFAULT_THREAD_IDLETIMEOUT = 60
1✔
26

27
    OVERFLOW_POLICIES = [:abort, :discard, :caller_runs]
1✔
28

29
    # The maximum number of threads that may be created in the pool.
30
    attr_reader :max_length
1✔
31
    attr_reader :min_length
1✔
32

33
    attr_reader :largest_length
1✔
34

35
    attr_reader :scheduled_task_count
1✔
36
    attr_reader :completed_task_count
1✔
37

38
    attr_reader :idletime
1✔
39

40
    attr_reader :max_queue
1✔
41

42
    attr_reader :overflow_policy
1✔
43

44
    # Create a new thread pool.
45
    #
46
    # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
47
    def initialize(opts = {})
1✔
48
      @min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
230✔
49
      @max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
230✔
50
      @idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
230✔
51
      @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
230✔
52
      @overflow_policy = opts.fetch(:overflow_policy, :abort)
230✔
53

54
      raise ArgumentError.new('max_threads must be greater than zero') if @max_length <= 0
230✔
55
      raise ArgumentError.new('min_threads cannot be less than zero') if @min_length < 0
229✔
56
      raise ArgumentError.new("#{overflow_policy} is not a valid overflow policy") unless OVERFLOW_POLICIES.include?(@overflow_policy)
228✔
57

58
      @state = :running
227✔
59
      @pool = []
227✔
60
      @terminator = Event.new
227✔
61
      @queue = Queue.new
227✔
62
      @mutex = Mutex.new
227✔
63
      @scheduled_task_count = 0
227✔
64
      @completed_task_count = 0
227✔
65
      @largest_length = 0
227✔
66

67
      @gc_interval = opts.fetch(:gc_interval, 1).to_i # undocumented
227✔
68
      @last_gc_time = Time.now.to_f - [1.0, (@gc_interval * 2.0)].max
227✔
69
    end
70

71
    def length
1✔
72
      @mutex.synchronize do
40✔
73
        @state != :shutdown ? @pool.length : 0
40✔
74
      end
75
    end
76
    alias_method :current_length, :length
1✔
77

78
    def queue_length
1✔
79
      @queue.length
5✔
80
    end
81

82
    def remaining_capacity
1✔
83
      @mutex.synchronize { @max_queue == 0 ? -1 : @max_queue - @queue.length }
10✔
84
    end
85

86
    # Is the thread pool running?
87
    #
88
    # @return [Boolean] +true+ when running, +false+ when shutting down or shutdown
89
    def running?
1✔
90
      @mutex.synchronize { @state == :running }
24✔
91
    end
92

93
    # Is the thread pool shutdown?
94
    #
95
    # @return [Boolean] +true+ when shutdown, +false+ when shutting down or running
96
    def shutdown?
1✔
97
      @mutex.synchronize { @state != :running }
2✔
98
    end
4✔
99

100
    # Block until thread pool shutdown is complete or until +timeout+ seconds have
101
    # passed.
102
    #
103
    # @note Does not initiate shutdown or termination. Either +shutdown+ or +kill+
104
    #   must be called before this method (or on another thread).
1✔
105
    #
×
106
    # @param [Integer] timeout the maximum number of seconds to wait for shutdown to complete
107
    #
108
    # @return [Boolean] +true+ if shutdown complete or false on +timeout+
109
    def wait_for_termination(timeout)
110
      return @terminator.wait(timeout.to_i)
111
    end
112

113
    # Submit a task to the thread pool for asynchronous processing.
114
    #
115
    # @param [Array] args zero or more arguments to be passed to the task
116
    #
117
    # @yield the asynchronous task to perform
1✔
118
    #
39✔
119
    # @return [Boolean] +true+ if the task is queued, +false+ if the thread pool
120
    #   is not running
121
    #
122
    # @raise [ArgumentError] if no task is given
123
    def post(*args, &task)
124
      raise ArgumentError.new('no block given') unless block_given?
125
      @mutex.synchronize do
126
        break false unless @state == :running
127
        return handle_overflow(*args, &task) if @max_queue != 0 && @queue.length >= @max_queue
128
        @scheduled_task_count += 1
129
        @queue << [args, task]
130
        if Time.now.to_f - @gc_interval >= @last_gc_time
131
          prune_pool
1✔
132
          @last_gc_time = Time.now.to_f
3,190✔
133
        end
3,184✔
134
        grow_pool
3,184✔
135
        true
3,163✔
136
      end
2,386✔
137
    end
2,386✔
138

2,386✔
139
    # Submit a task to the thread pool for asynchronous processing.
147✔
140
    #
147✔
141
    # @param [Proc] task the asynchronous task to perform
142
    #
2,386✔
143
    # @return [self] returns itself
2,386✔
144
    def <<(task)
145
      self.post(&task)
146
      return self
147
    end
148

149
    # Begin an orderly shutdown. Tasks already in the queue will be executed,
150
    # but no new tasks will be accepted. Has no additional effect if the
151
    # thread pool is not running.
152
    def shutdown
1✔
153
      @mutex.synchronize do
804✔
154
        break unless @state == :running
704✔
155
        @queue.clear
156
        if @pool.empty?
157
          @state = :shutdown
158
          @terminator.set
159
        else
160
          @state = :shuttingdown
1✔
161
          @pool.length.times{ @queue << :stop }
65✔
162
        end
65✔
163
      end
65✔
164
    end
65✔
165

10✔
166
    # Begin an immediate shutdown. In-progress tasks will be allowed to
10✔
167
    # complete but enqueued tasks will be dismissed and no new tasks
168
    # will be accepted. Has no additional effect if the thread pool is
55✔
169
    # not running.
430✔
170
    def kill
171
      @mutex.synchronize do
172
        break if @state == :shutdown
173
        @queue.clear
174
        @state = :shutdown
175
        drain_pool
176
        @terminator.set
177
      end
178
    end
1✔
179

468✔
180
    # @!visibility private
468✔
181
    def on_end_task # :nodoc:
131✔
182
      @mutex.synchronize do
131✔
183
        @completed_task_count += 1 #if success
131✔
184
        break unless @state == :running
131✔
185
      end
186
    end
187

188
    # @!visibility private
189
    def on_worker_exit(worker) # :nodoc:
1✔
190
      @mutex.synchronize do
1,828✔
191
        @pool.delete(worker)
1,827✔
192
        if @pool.empty? && @state != :running
1,827✔
193
          @state = :shutdown
194
          @terminator.set
195
        end
196
      end
197
    end
1✔
198

530✔
199
    protected
530✔
200

530✔
201
    # @!visibility private
85✔
202
    def handle_overflow(*args) # :nodoc:
85✔
203
      case @overflow_policy
204
      when :abort
205
        raise RejectedExecutionError
206
      when :discard
207
        false
1✔
208
      when :caller_runs
209
        begin
210
          yield(*args)
1✔
211
        rescue
769✔
212
          # let it fail
213
        end
200✔
214
        true
215
      end
558✔
216
    end
217

218
    # @!visibility private
11✔
219
    def prune_pool # :nodoc:
220
      @pool.delete_if do |worker|
221
        worker.dead? ||
222
          (@idletime == 0 ? false : Time.now.to_f - @idletime > worker.last_activity)
11✔
223
      end
224
    end
225

226
    # @!visibility private
227
    def grow_pool # :nodoc:
1✔
228
      if @min_length > @pool.length
147✔
229
        additional = @min_length - @pool.length
17✔
230
      elsif @pool.length < @max_length && ! @queue.empty?
14✔
231
        # NOTE: does not take into account idle threads
232
        additional = 1
233
      else
234
        additional = 0
235
      end
1✔
236
      additional.times do
2,386✔
237
        break if @pool.length >= @max_length
99✔
238
        @pool << create_worker_thread
2,287✔
239
      end
240
      @largest_length = [@largest_length, @pool.length].max
1,523✔
241
    end
242

764✔
243
    # @!visibility private
244
    def drain_pool # :nodoc:
2,386✔
245
      @pool.each {|worker| worker.kill }
2,953✔
246
      @pool.clear
2,953✔
247
    end
248

2,386✔
249
    # @!visibility private
250
    def create_worker_thread # :nodoc:
251
      wrkr = RubyThreadPoolWorker.new(@queue, self)
252
      Thread.new(wrkr, self) do |worker, parent|
1✔
253
        Thread.current.abort_on_exception = false
433✔
254
        worker.run
131✔
255
        parent.on_worker_exit(worker)
256
      end
257
      return wrkr
258
    end
1✔
259
  end
2,953✔
260
end
2,953✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc