• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ruby-concurrency / concurrent-ruby / #2727

25 Jun 2014 01:41PM UTC coverage: 91.488% (-4.8%) from 96.27%
#2727

push

jdantonio
Merge pull request #121 from ruby-concurrency/refactor/executors

Refactor executors

72 of 78 new or added lines in 6 files covered. (92.31%)

2343 of 2561 relevant lines covered (91.49%)

475.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.77
/lib/concurrent/executor/executor.rb
1
require 'concurrent/errors'
1✔
2
require 'concurrent/logging'
1✔
3
require 'concurrent/atomic/event'
1✔
4

5
module Concurrent
1✔
6

7
  module Executor
1✔
8
    
9
    # @!macro [attach] executor_module_method_can_overflow_question
10
    #
11
    #   Does the task queue have a maximum size?
12
    #
13
    #   @return [Boolean] True if the task queue has a maximum size else false.
14
    #
15
    # @note Always returns `false`
16
    def can_overflow?
      # Base implementation: unconditionally answers false.
      false
    end
19

20
    # @!macro [attach] executor_module_method_serialized_question
21
    #
22
    #   Does this executor guarantee serialization of its operations?
23
    #
24
    #   @return [Boolean] True if the executor guarantees that all operations
25
    #     will be posted in the order they are received and no two operations may
26
    #     occur simultaneously. Else false.
27
    #
28
    # @note Always returns `false`
29
    def serialized?
      # Base implementation: unconditionally answers false.
      # SerialExecutor redefines this method to answer true.
      false
    end
32
  end
33

34
  # Indicates that the including `Executor` or `ExecutorService` guarantees
35
  # that all operations will occur in the order they are posted and that no
36
  # two operations may occur simultaneously. This module provides no
37
  # functionality and provides no guarantees. That is the responsibility
38
  # of the including class. This module exists solely to allow the including
39
  # object to be interrogated for its serialization status.
40
  #
41
  # @example
42
  #   class Foo
43
  #     include Concurrent::SerialExecutor
44
  #   end
45
  #
46
  #   foo = Foo.new
47
  #
48
  #   foo.is_a? Concurrent::Executor       #=> true
49
  #   foo.is_a? Concurrent::SerialExecutor #=> true
50
  #   foo.serialized?                      #=> true
51
  module SerialExecutor
1✔
52
    include Executor
1✔
53

54
    # @!macro executor_module_method_serialized_question
55
    #
56
    # @note Always returns `true`
57
    def serialized?
      # Replaces the `false` answer inherited from Executor.
      true
    end
60
  end
61

62
  module RubyExecutor
1✔
63
    include Executor
1✔
64
    include Logging
1✔
65

66
    # @!macro [attach] executor_method_post
67
    #
68
    #   Submit a task to the executor for asynchronous processing.
69
    #
70
    #   @param [Array] args zero or more arguments to be passed to the task
71
    #
72
    #   @yield the asynchronous task to perform
73
    #
74
    #   @return [Boolean] `true` if the task is queued, `false` if the executor
75
    #     is not running
76
    #
77
    #   @raise [ArgumentError] if no task is given
78
    def post(*args, &task)
      raise ArgumentError, 'no block given' unless block_given?

      # The running check and the hand-off to #execute share the lock that
      # #shutdown and #kill also take before setting the stop event.
      mutex.synchronize do
        # `return` leaves the method; Mutex#synchronize still releases the lock.
        return false unless running?

        execute(*args, &task)
        true
      end
    end
86

87
    # @!macro [attach] executor_method_left_shift
88
    #
89
    #   Submit a task to the executor for asynchronous processing.
90
    #
91
    #   @param [Proc] task the asynchronous task to perform
92
    #
93
    #   @return [self] returns itself
94
    def <<(task)
      # Hands the proc to #post as its block; the result of #post is discarded
      # and the receiver is returned so calls can be chained.
      post(&task)
      self
    end
98

99
    # @!macro [attach] executor_method_running_question
100
    #
101
    #   Is the executor running?
102
    #
103
    #   @return [Boolean] `true` when running, `false` when shutting down or shutdown
104
    def running?
      # Running means the stop event has not been set yet.
      !stop_event.set?
    end
107

108
    # @!macro [attach] executor_method_shuttingdown_question
109
    #
110
    #   Is the executor shutting down?
111
    #
112
    #   @return [Boolean] `true` when not running and not shutdown, else `false`
113
    def shuttingdown?
      # True only in the window between "stop requested" and "stopped":
      # neither running nor fully shut down.
      !running? && !shutdown?
    end
116

117
    # @!macro [attach] executor_method_shutdown_question
118
    #
119
    #   Is the executor shutdown?
120
    #
121
    #   @return [Boolean] `true` when shutdown, `false` when shutting down or running
122
    def shutdown?
      # Shutdown is complete once the stopped event has been set.
      stopped_event.set?
    end
125

126
    # @!macro [attach] executor_method_shutdown
127
    #
128
    #   Begin an orderly shutdown. Tasks already in the queue will be executed,
129
    #   but no new tasks will be accepted. Has no additional effect if the
130
    #   thread pool is not running.
131
    def shutdown
      mutex.synchronize do
        # Nothing to do when a stop has already been requested.
        if running?
          stop_event.set
          shutdown_execution
        end
      end
      # Always true, whether or not this call initiated the shutdown.
      true
    end
139

140
    # @!macro [attach] executor_method_kill
141
    #
142
    #   Begin an immediate shutdown. In-progress tasks will be allowed to
143
    #   complete but enqueued tasks will be dismissed and no new tasks
144
    #   will be accepted. Has no additional effect if the thread pool is
145
    #   not running.
146
    def kill
      mutex.synchronize do
        # Skipped only when shutdown has already completed; a shutdown that is
        # still in progress is cut short here.
        unless shutdown?
          stop_event.set
          kill_execution
          stopped_event.set
        end
      end
      # Always true, whether or not this call did any work.
      true
    end
155

156
    # @!macro [attach] executor_method_wait_for_termination
157
    #
158
    #   Block until executor shutdown is complete or until `timeout` seconds have
159
    #   passed.
160
    #
161
    #   @note Does not initiate shutdown or termination. Either `shutdown` or `kill`
162
    #     must be called before this method (or on another thread).
163
    #
164
    #   @param [Integer] timeout the maximum number of seconds to wait for shutdown to complete
165
    #
166
    #   @return [Boolean] `true` if shutdown complete or false on `timeout`
167
    def wait_for_termination(timeout = nil)
      # Delegates to the stopped event; `timeout` is passed through unchanged
      # (nil when the caller gave none).
      stopped_event.wait(timeout)
    end
170

171
    protected
1✔
172

173
    attr_reader :mutex, :stop_event, :stopped_event
1✔
174

175
    # @!macro [attach] executor_method_init_executor
176
    #
177
    #   Initialize the executor by creating and initializing all the
178
    #   internal synchronization objects.
179
    def init_executor
      # Backing state for the protected readers `mutex`, `stop_event` and
      # `stopped_event`.
      @mutex = Mutex.new
      @stop_event = Event.new     # set when a shutdown or kill is requested
      @stopped_event = Event.new  # set when shutdown has completed
    end
184

185
    # @!macro [attach] executor_method_execute
186
    def execute(*args, &task)
      # Placeholder that always raises; #post calls this while holding the
      # mutex, so a working executor has to supply its own #execute.
      raise NotImplementedError
    end
189

190
    # @!macro [attach] executor_method_shutdown_execution
191
    # 
192
    #   Callback method called when an orderly shutdown has completed.
193
    #   The default behavior is to signal all waiting threads.
194
    def shutdown_execution
      # Default behavior: mark shutdown as complete right away by setting the
      # stopped event.
      stopped_event.set
    end
197

198
    # @!macro [attach] executor_method_kill_execution
199
    #
200
    #   Callback method called when the executor has been killed.
201
    #   The default behavior is to do nothing.
202
    def kill_execution
      # Intentionally empty: #kill sets the stop and stopped events itself,
      # so the default hook has nothing to do.
    end
205
  end
206

207
  if RUBY_PLATFORM == 'java'
1✔
208

209
    module JavaExecutor
×
210
      include Executor
×
211

212
      # @!macro executor_method_post
213
      def post(*args)
        raise ArgumentError, 'no block given' unless block_given?
        return false unless running?

        # Wrap the caller's block so the arguments travel with the submitted job.
        @executor.submit { yield(*args) }
        true
      rescue Java::JavaUtilConcurrent::RejectedExecutionException
        # Translate the Java rejection into the library's own error class.
        raise RejectedExecutionError
      end
224

225
      # @!macro executor_method_left_shift
226
      def <<(task)
        # Hands the proc to #post as its block; the result of #post is
        # discarded and the receiver is returned so calls can be chained.
        post(&task)
        self
      end
230

231
      # @!macro executor_method_running_question
232
      def running?
        # Running is defined by exclusion: neither shutting down nor shut down.
        !shuttingdown? && !shutdown?
      end
235

236
      # @!macro executor_method_shuttingdown_question
237
      def shuttingdown?
        # Answer false when the wrapped executor has no `isTerminating` method.
        return false unless @executor.respond_to?(:isTerminating)

        @executor.isTerminating
      end
244

245
      # @!macro executor_method_shutdown_question
246
      def shutdown?
        # True as soon as the wrapped executor reports shutdown or termination.
        @executor.isShutdown || @executor.isTerminated
      end
249

250
      # @!macro executor_method_wait_for_termination
251
      def wait_for_termination(timeout = nil)
        # `timeout` now defaults to nil, matching RubyExecutor. Previously the
        # argument was mandatory and a nil value failed on `1000 * timeout`.
        unless timeout.nil?
          # `timeout` is in seconds; the wrapped executor is given milliseconds.
          return @executor.awaitTermination(1000 * timeout, java.util.concurrent.TimeUnit::MILLISECONDS)
        end

        # No timeout: keep waiting in 60-second slices until the wrapped
        # executor reports termination, then answer true.
        loop do
          break true if @executor.awaitTermination(60, java.util.concurrent.TimeUnit::SECONDS)
        end
      end
254

255
      # @!macro executor_method_shutdown
256
      def shutdown
        @executor.shutdown
        # Returns nil here, whereas RubyExecutor#shutdown returns true.
        nil
      end
260

261
      # @!macro executor_method_kill
262
      def kill
        @executor.shutdownNow
        # Returns nil here, whereas RubyExecutor#kill returns true.
        nil
      end
266

267
      protected
×
268

269
      def set_shutdown_hook
        # without this the process may fail to exit
        at_exit { kill }
      end
273
    end
274
  end
275
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc