• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jdantonio / concurrent-ruby / #720

06 Apr 2014 10:19PM UTC coverage: 92.699% (-4.6%) from 97.309%
#720

push

chrisseaton
Update README.md

1587 of 1712 relevant lines covered (92.7%)

567.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.21
/lib/concurrent/ruby_thread_pool_executor.rb
1
require 'thread'
1✔
2

3
require 'concurrent/event'
1✔
4
require 'concurrent/ruby_thread_pool_worker'
1✔
5

6
module Concurrent
1✔
7

8
  RejectedExecutionError = Class.new(StandardError) unless defined? RejectedExecutionError
1✔
9

10
  # @!macro thread_pool_executor
11
  class RubyThreadPoolExecutor
1✔
12

13
    # The maximum number of threads that will be created in the pool
14
    # (unless overridden during construction).
15
    DEFAULT_MAX_POOL_SIZE = 2**15 # 32768
1✔
16

17
    # The minimum number of threads that will be created in the pool
18
    # (unless overridden during construction).
19
    DEFAULT_MIN_POOL_SIZE = 0
1✔
20

21
    # The maximum number of tasks that may wait in the queue (unless overridden
    # during construction). Zero means the queue length is not limited.
    DEFAULT_MAX_QUEUE_SIZE = 0
1✔
22

23
    # The maximum number of seconds a thread in the pool may remain idle before
24
    # being reclaimed (unless overridden during construction).
25
    DEFAULT_THREAD_IDLETIMEOUT = 60
1✔
26

27
    # The overflow policies accepted by the +:overflow_policy+ option. Frozen so
    # the set of valid policies cannot be altered at runtime.
    OVERFLOW_POLICIES = [:abort, :discard, :caller_runs].freeze
1✔
28

29
    # The maximum number of threads that may be created in the pool.
30
    attr_reader :max_length
1✔
31
    # The minimum pool size, taken from the +:min_threads+ option.
    attr_reader :min_length
1✔
32

33
    # The largest pool size recorded so far; updated each time the pool grows.
    attr_reader :largest_length
1✔
34

35
    # The number of tasks that have been accepted into the queue since construction.
    attr_reader :scheduled_task_count
1✔
36
    # The number of times +on_end_task+ has been called. Presumably once per
    # finished task; the caller is the worker class, which is not shown here.
    attr_reader :completed_task_count
1✔
37

38
    # The number of seconds a worker may remain idle before it becomes eligible
    # for pruning. Zero disables idle pruning.
    attr_reader :idletime
1✔
39

40
    # The maximum number of tasks allowed to wait in the queue. Zero means no limit.
    attr_reader :max_queue
1✔
41

42
    # The policy applied to a posted task when the queue is full; one of
    # +OVERFLOW_POLICIES+.
    attr_reader :overflow_policy
1✔
43

44
    # Create a new thread pool.
45
    #
46
    # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html
47
    def initialize(opts = {})
      # Sizing and policy options; every numeric option is coerced with +to_i+
      # and falls back to the matching class-level default.
      @min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
      @max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
      @idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
      @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
      @overflow_policy = opts.fetch(:overflow_policy, :abort)

      # Reject invalid configurations before any pool state is created.
      if @max_length <= 0
        raise ArgumentError, 'max_threads must be greater than zero'
      end
      if @min_length < 0
        raise ArgumentError, 'min_threads cannot be less than zero'
      end
      unless OVERFLOW_POLICIES.include?(@overflow_policy)
        raise ArgumentError, "#{overflow_policy} is not a valid overflow policy"
      end

      # Lifecycle state, the worker list, and the primitives that guard them.
      @state = :running
      @pool = []
      @terminator = Event.new
      @queue = Queue.new
      @mutex = Mutex.new

      # Statistics exposed through the attribute readers.
      @scheduled_task_count = 0
      @completed_task_count = 0
      @largest_length = 0

      # Minimum number of seconds between pruning passes (undocumented option).
      # The last-pruned timestamp is back-dated so the first +post+ prunes.
      @gc_interval = opts.fetch(:gc_interval, 1).to_i
      @last_gc_time = Time.now.to_f - [1.0, (@gc_interval * 2.0)].max
    end
70

71
    # The number of workers currently held in the pool.
    #
    # @return [Integer] the pool size, or zero once the pool state is +:shutdown+
    def length
      @mutex.synchronize { @state == :shutdown ? 0 : @pool.length }
    end
76
    alias_method :current_length, :length
1✔
77

78
    # The number of entries currently waiting in the task queue.
    #
    # @return [Integer] the queue size (read without taking the pool mutex)
    def queue_length
      @queue.size
    end
81

82
    # The number of additional tasks the queue can accept.
    #
    # @return [Integer] the free queue slots, or -1 when the queue is not limited
    def remaining_capacity
      @mutex.synchronize do
        if @max_queue == 0
          -1
        else
          @max_queue - @queue.length
        end
      end
    end
2✔
85

2✔
86
    # Is the thread pool running?
87
    #
88
    # @return [Boolean] +true+ when running, +false+ when shutting down or shutdown
89
    def running?
      # The state is read under the pool mutex so a concurrent shutdown is
      # observed consistently.
      @mutex.synchronize do
        @state == :running
      end
    end
92

93
    # Is the thread pool shutdown?
1✔
94
    #
10✔
95
    # @return [Boolean] +true+ when shutdown, +false+ when shutting down or running
96
    def shutdown?
      # Only the terminal +:shutdown+ state counts. A pool that is still
      # +:shuttingdown+ has workers alive and, as documented, must report false.
      @mutex.synchronize { @state == :shutdown }
    end
99

100
    # Block until thread pool shutdown is complete or until +timeout+ seconds have
1✔
101
    # passed.
24✔
102
    #
103
    # @note Does not initiate shutdown or termination. Either +shutdown+ or +kill+
104
    #   must be called before this method (or on another thread).
105
    #
106
    # @param [Integer] timeout the maximum number of seconds to wait for shutdown to complete
107
    #
1✔
108
    # @return [Boolean] +true+ if shutdown complete or false on +timeout+
×
109
    def wait_for_termination(timeout)
      # The timeout is truncated to whole seconds before waiting on the
      # termination event; the event's result is returned unchanged.
      seconds = timeout.to_i
      @terminator.wait(seconds)
    end
112

113
    # Submit a task to the thread pool for asynchronous processing.
114
    #
115
    # @param [Array] args zero or more arguments to be passed to the task
116
    #
117
    # @yield the asynchronous task to perform
118
    #
119
    # @return [Boolean] +true+ if the task is queued, +false+ if the thread pool
120
    #   is not running
1✔
121
    #
39✔
122
    # @raise [ArgumentError] if no task is given
123
    def post(*args, &task)
      raise ArgumentError.new('no block given') unless block_given?

      # Decide the task's fate while holding the lock, but run the overflow
      # handler only after the lock is released. Under the :caller_runs policy
      # the handler executes the task on the calling thread; doing that inside
      # the mutex would block every other pool method that needs the lock, and
      # any pool call made from within the task would hit a recursive-lock error.
      outcome = @mutex.synchronize do
        next false unless @state == :running
        next :overflow if @max_queue != 0 && @queue.length >= @max_queue

        @scheduled_task_count += 1
        @queue << [args, task]
        # Prune dead/idle workers at most once every @gc_interval seconds.
        if Time.now.to_f - @gc_interval >= @last_gc_time
          prune_pool
          @last_gc_time = Time.now.to_f
        end
        grow_pool
        true
      end

      outcome == :overflow ? handle_overflow(*args, &task) : outcome
    end
3,184✔
138

3,163✔
139
    # Submit a task to the thread pool for asynchronous processing.
2,386✔
140
    #
2,386✔
141
    # @param [Proc] task the asynchronous task to perform
2,386✔
142
    #
147✔
143
    # @return [self] returns itself
147✔
144
    def <<(task)
      # Hand the proc to +post+ as its block; the result of +post+ is ignored
      # so that calls can be chained.
      post(&task)
      self
    end
148

149
    # Begin an orderly shutdown. Tasks already in the queue will be executed,
150
    # but no new tasks will be accepted. Has no additional effect if the
151
    # thread pool is not running.
152
    def shutdown
      @mutex.synchronize do
        next unless @state == :running

        # NOTE(review): pending tasks are dropped here, although the method
        # documentation says queued tasks will be executed; confirm which is
        # intended before changing either.
        @queue.clear

        # With no workers left there is nothing to wait for: finish immediately.
        if @pool.empty?
          @state = :shutdown
          next @terminator.set
        end

        # Otherwise enqueue one stop marker per worker; the pool moves to
        # :shutdown when the last worker reports its exit.
        @state = :shuttingdown
        @pool.length.times { @queue << :stop }
      end
    end
65✔
165

65✔
166
    # Begin an immediate shutdown. In-progress tasks will be allowed to
65✔
167
    # complete but enqueued tasks will be dismissed and no new tasks
65✔
168
    # will be accepted. Has no additional effect if the thread pool is
11✔
169
    # not running.
11✔
170
    def kill
      @mutex.synchronize do
        # Already fully shut down: nothing left to do.
        unless @state == :shutdown
          @queue.clear
          @state = :shutdown
          drain_pool
          @terminator.set
        end
      end
    end
179

180
    # @!visibility private
181
    def on_end_task # :nodoc:
      # Bump the completed-task counter under the pool mutex; always returns nil.
      @mutex.synchronize do
        @completed_task_count += 1
        nil
      end
    end
131✔
187

131✔
188
    # @!visibility private
189
    def on_worker_exit(worker) # :nodoc:
      @mutex.synchronize do
        @pool.delete(worker)
        # Only the last worker leaving a pool that is no longer running
        # completes the shutdown and signals the termination event.
        next if @state == :running || !@pool.empty?

        @state = :shutdown
        @terminator.set
      end
    end
198

199
    protected
200

1✔
201
    # @!visibility private
530✔
202
    def handle_overflow(*args) # :nodoc:
      # Apply the configured overflow policy to a task that cannot be queued.
      policy = @overflow_policy
      raise RejectedExecutionError if policy == :abort
      return false if policy == :discard
      return unless policy == :caller_runs

      # :caller_runs -- execute the task on the calling thread; a failing task
      # is deliberately ignored.
      begin
        yield(*args)
      rescue
        # let it fail
      end
      true
    end
200✔
217

218
    # @!visibility private
558✔
219
    def prune_pool # :nodoc:
      # Drop workers that are dead or, when an idle timeout is configured, have
      # been inactive for longer than @idletime seconds.
      @pool.delete_if do |worker|
        next true if worker.dead?
        next false if @idletime == 0

        Time.now.to_f - @idletime > worker.last_activity
      end
    end
225

11✔
226
    # @!visibility private
227
    def grow_pool # :nodoc:
      # Work out how many workers to add: enough to reach the minimum size, or a
      # single one when work is queued and the pool is below its maximum.
      additional =
        if @min_length > @pool.length
          @min_length - @pool.length
        elsif @pool.length < @max_length && !@queue.empty?
          # NOTE: does not take into account idle threads
          1
        else
          0
        end

      # Never exceed the maximum, even if the minimum asks for more.
      additional.times do
        break if @pool.length >= @max_length
        @pool << create_worker_thread
      end

      @largest_length = [@largest_length, @pool.length].max
    end
2,287✔
242

243
    # @!visibility private
361✔
244
    def drain_pool # :nodoc:
      # Kill every worker, then empty the pool.
      @pool.each(&:kill)
      @pool.clear
    end
2,386✔
248

675✔
249
    # @!visibility private
675✔
250
    def create_worker_thread # :nodoc:
      # Build a worker bound to the shared queue and start it on its own
      # thread. The worker object, not the thread, is returned.
      worker = RubyThreadPoolWorker.new(@queue, self)
      Thread.new(worker, self) do |runner, pool|
        Thread.current.abort_on_exception = false
        runner.run
        # Tell the pool once the worker's run loop has returned.
        pool.on_worker_exit(runner)
      end
      worker
    end
259
  end
260
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc