• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ruby-concurrency / concurrent-ruby / #2700

12 Feb 2015 06:34PM UTC coverage: 92.31% (+0.6%) from 91.69%
#2700

push

lucasallan
Merge pull request #238 from ruby-concurrency/semaphore-pure-java

Implementation of Concurrent::JavaSemaphore in pure Java.

2881 of 3121 relevant lines covered (92.31%)

393.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.8
/lib/concurrent/executor/executor.rb
1
require 'concurrent/errors'
1✔
2
require 'concurrent/logging'
1✔
3
require 'concurrent/atomic/event'
1✔
4

5
module Concurrent
1✔
6

7
  # Mixin shared by the executor modules in this file. It exposes the
  # configured fallback policy, applies that policy to rejected tasks and
  # answers capability queries (`can_overflow?`, `serialized?`).
  module Executor
    # The policy defining how rejected tasks (tasks received once the
    # queue size reaches the configured `max_queue`, or after the
    # executor has shut down) are handled. Must be one of the values
    # specified in `FALLBACK_POLICIES`.
    attr_reader :fallback_policy

    # @!macro [attach] executor_module_method_can_overflow_question
    #
    #   Does the task queue have a maximum size?
    #
    #   @return [Boolean] True if the task queue has a maximum size else false.
    #
    # @note Always returns `false`
    def can_overflow?
      false
    end

    # Handler which executes the `fallback_policy` once the queue size
    # reaches `max_queue`.
    #
    # @param [Array] args the arguments to the task which is being handled.
    #
    # @yield the rejected task; only invoked under the `:caller_runs` policy
    #
    # @return [Boolean] `false` when the task was discarded, `true` when it
    #   was run on the calling thread (even if the task itself raised)
    #
    # @raise [RejectedExecutionError] when the policy is `:abort`
    # @raise [RuntimeError] when `@fallback_policy` is not a known policy
    #
    # @!visibility private
    def handle_fallback(*args)
      case @fallback_policy
      when :abort
        raise RejectedExecutionError
      when :discard
        false
      when :caller_runs
        begin
          yield(*args)
        rescue => ex
          # Let the task fail without propagating. This module includes
          # nothing, so a bare `DEBUG` constant is not resolvable from here
          # and `log` only exists when the including class provides it; look
          # both up through the receiver so a failing task can never turn
          # into a NameError/NoMethodError.
          # NOTE(review): assumes `DEBUG` reaches includers via the logging
          # mixin (not a constant on an enclosing namespace) -- confirm.
          host = self.class
          if respond_to?(:log, true) && host.const_defined?(:DEBUG)
            log host.const_get(:DEBUG), ex
          end
        end
        true
      else
        fail "Unknown fallback policy #{@fallback_policy}"
      end
    end

    # @!macro [attach] executor_module_method_serialized_question
    #
    #   Does this executor guarantee serialization of its operations?
    #
    #   @return [Boolean] True if the executor guarantees that all operations
    #     will be posted in the order they are received and no two operations may
    #     occur simultaneously. Else false.
    #
    # @note Always returns `false`
    def serialized?
      false
    end
  end
63

64
  # Indicates that the including `Executor` or `ExecutorService` guarantees
65
  # that all operations will occur in the order they are posted and that no
66
  # two operations may occur simultaneously. This module provides no
67
  # functionality and provides no guarantees. That is the responsibility
68
  # of the including class. This module exists solely to allow the including
69
  # object to be interrogated for its serialization status.
70
  #
71
  # @example
72
  #   class Foo
73
  #     include Concurrent::SerialExecutor
74
  #   end
75
  #
76
  #   foo = Foo.new
77
  #
78
  #   foo.is_a? Concurrent::Executor       #=> true
79
  #   foo.is_a? Concurrent::SerialExecutor #=> true
80
  #   foo.serialized?                      #=> true
81
  module SerialExecutor
    include Executor

    # @!macro executor_module_method_serialized_question
    #
    # @note Always returns `true`
    def serialized?
      true
    end
  end
91

92
  # Executor lifecycle implemented with a `Mutex` and two `Event` objects:
  # `stop_event` is set once shutdown has been requested and `stopped_event`
  # once shutdown has finished. Including classes are expected to call
  # `init_executor` before use and to override `execute`; the
  # `shutdown_execution` and `kill_execution` callbacks may also be overridden.
  module RubyExecutor
    include Executor
    include Logging

    # The set of possible fallback policies that may be set at thread pool creation.
    FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze

    # @!macro [attach] executor_method_post
    #
    #   Submit a task to the executor for asynchronous processing.
    #
    #   @param [Array] args zero or more arguments to be passed to the task
    #
    #   @yield the asynchronous task to perform
    #
    #   @return [Boolean] `true` if the task is queued, `false` if the executor
    #     is not running
    #
    #   @raise [ArgumentError] if no task is given
    def post(*args, &task)
      raise ArgumentError, 'no block given' unless block_given?
      mutex.synchronize do
        # If the executor is shut down, reject this task
        return handle_fallback(*args, &task) unless running?
        execute(*args, &task)
        true
      end
    end

    # @!macro [attach] executor_method_left_shift
    #
    #   Submit a task to the executor for asynchronous processing.
    #
    #   @param [Proc] task the asynchronous task to perform
    #
    #   @return [self] returns itself
    def <<(task)
      post(&task)
      self
    end

    # @!macro [attach] executor_method_running_question
    #
    #   Is the executor running?
    #
    #   @return [Boolean] `true` when running, `false` when shutting down or shutdown
    def running?
      !stop_event.set?
    end

    # @!macro [attach] executor_method_shuttingdown_question
    #
    #   Is the executor shutting down?
    #
    #   @return [Boolean] `true` when not running and not shutdown, else `false`
    def shuttingdown?
      !(running? || shutdown?)
    end

    # @!macro [attach] executor_method_shutdown_question
    #
    #   Is the executor shutdown?
    #
    #   @return [Boolean] `true` when shutdown, `false` when shutting down or running
    def shutdown?
      stopped_event.set?
    end

    # @!macro [attach] executor_method_shutdown
    #
    #   Begin an orderly shutdown. Tasks already in the queue will be executed,
    #   but no new tasks will be accepted. Has no additional effect if the
    #   thread pool is not running.
    def shutdown
      mutex.synchronize do
        # `break` leaves only the synchronize block; the method still returns true
        break unless running?
        stop_event.set
        shutdown_execution
      end
      true
    end

    # @!macro [attach] executor_method_kill
    #
    #   Begin an immediate shutdown. In-progress tasks will be allowed to
    #   complete but enqueued tasks will be dismissed and no new tasks
    #   will be accepted. Has no additional effect if the thread pool is
    #   not running.
    def kill
      mutex.synchronize do
        # `break` leaves only the synchronize block; the method still returns true
        break if shutdown?
        stop_event.set
        kill_execution
        stopped_event.set
      end
      true
    end

    # @!macro [attach] executor_method_wait_for_termination
    #
    #   Block until executor shutdown is complete or until `timeout` seconds have
    #   passed.
    #
    #   @note Does not initiate shutdown or termination. Either `shutdown` or `kill`
    #     must be called before this method (or on another thread).
    #
    #   @param [Integer] timeout the maximum number of seconds to wait for shutdown to complete
    #
    #   @return [Boolean] `true` if shutdown complete or false on `timeout`
    def wait_for_termination(timeout = nil)
      stopped_event.wait(timeout)
    end

    protected

    attr_reader :mutex, :stop_event, :stopped_event

    # @!macro [attach] executor_method_init_executor
    #
    #   Initialize the executor by creating and initializing all the
    #   internal synchronization objects.
    def init_executor
      @mutex = Mutex.new
      @stop_event = Event.new
      @stopped_event = Event.new
    end

    # @!macro [attach] executor_method_execute
    #
    #   Hand a task to the underlying implementation. Called by `post` while
    #   the mutex is held; this default always raises, so it must be
    #   overridden.
    #
    #   @raise [NotImplementedError] always, in this default implementation
    def execute(*args, &task)
      raise NotImplementedError
    end

    # @!macro [attach] executor_method_shutdown_execution
    #
    #   Callback method called when an orderly shutdown has completed.
    #   The default behavior is to signal all waiting threads.
    def shutdown_execution
      stopped_event.set
    end

    # @!macro [attach] executor_method_kill_execution
    #
    #   Callback method called when the executor has been killed.
    #   The default behavior is to do nothing.
    def kill_execution
      # do nothing
    end
  end
240

241
  if RUBY_PLATFORM == 'java'

    # Executor behavior that delegates to the Java executor object held in
    # `@executor`. Only defined when running on the Java platform.
    module JavaExecutor
      include Executor
      java_import 'java.lang.Runnable'

      # The set of possible fallback policies that may be set at thread pool creation.
      FALLBACK_POLICIES = {
        abort: java.util.concurrent.ThreadPoolExecutor::AbortPolicy,
        discard: java.util.concurrent.ThreadPoolExecutor::DiscardPolicy,
        caller_runs: java.util.concurrent.ThreadPoolExecutor::CallerRunsPolicy
      }.freeze

      # @!macro executor_method_post
      def post(*args, &task)
        raise ArgumentError, 'no block given' unless block_given?
        return handle_fallback(*args, &task) unless running?
        # Select the Runnable overload of `submit` explicitly
        executor_submit = @executor.java_method(:submit, [Runnable.java_class])
        executor_submit.call { yield(*args) }
        true
      rescue Java::JavaUtilConcurrent::RejectedExecutionException
        # Translate the Java rejection into the library's own error class
        raise RejectedExecutionError
      end

      # @!macro executor_method_left_shift
      def <<(task)
        post(&task)
        self
      end

      # @!macro executor_method_running_question
      def running?
        !(shuttingdown? || shutdown?)
      end

      # @!macro executor_method_shuttingdown_question
      def shuttingdown?
        # Not every wrapped executor exposes isTerminating; report false then
        if @executor.respond_to? :isTerminating
          @executor.isTerminating
        else
          false
        end
      end

      # @!macro executor_method_shutdown_question
      def shutdown?
        @executor.isShutdown || @executor.isTerminated
      end

      # @!macro executor_method_wait_for_termination
      def wait_for_termination(timeout = nil)
        if timeout.nil?
          # No timeout given: keep waiting in 60 second slices until terminated
          ok = @executor.awaitTermination(60, java.util.concurrent.TimeUnit::SECONDS) until ok
          true
        else
          # `timeout` is in seconds; the Java call is made in milliseconds
          @executor.awaitTermination(1000 * timeout, java.util.concurrent.TimeUnit::MILLISECONDS)
        end
      end

      # @!macro executor_method_shutdown
      #
      # @return [nil]
      def shutdown
        @executor.shutdown
        nil
      end

      # @!macro executor_method_kill
      #
      # @return [nil]
      def kill
        @executor.shutdownNow
        nil
      end

      protected

      # Registers an `at_exit` handler that kills this executor.
      def set_shutdown_hook
        # without this the process may fail to exit
        at_exit { kill }
      end
    end
  end
320
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc