• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ruby-concurrency / concurrent-ruby / #2798

24 Nov 2014 04:45PM UTC coverage: 91.282% (-0.4%) from 91.64%
#2798

push

jdantonio
Merge pull request #191 from jrochkind/thread_pool_overview_doc

expanded and updated thread pool overview doc

2806 of 3074 relevant lines covered (91.28%)

371.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.34
/lib/concurrent/executor/serialized_execution.rb
1
require 'delegate'
1✔
2
require 'concurrent/executor/executor'
1✔
3
require 'concurrent/logging'
1✔
4
require 'concurrent/atomic/synchronization'
1✔
5

6
module Concurrent
1✔
7

8
  # Ensures passed jobs in a serialized order never running at the same time.
9
  class SerializedExecution
1✔
10
    include Logging
1✔
11
    include Synchronization
1✔
12

13
    Job = Struct.new(:executor, :args, :block) do
1✔
14
      def call
1✔
15
        block.call *args
2,593✔
16
      end
17
    end
18

19
    def initialize
1✔
20
      synchronize do
399✔
21
        @being_executed = false
399✔
22
        @stash          = []
399✔
23
      end
24
    end
25

26
    # Submit a task to the executor for asynchronous processing.
27
    #
28
    # @param [Executor] executor to be used for this job
29
    #
30
    # @param [Array] args zero or more arguments to be passed to the task
31
    #
32
    # @yield the asynchronous task to perform
33
    #
34
    # @return [Boolean] `true` if the task is queued, `false` if the executor
35
    #   is not running
36
    #
37
    # @raise [ArgumentError] if no task is given
38
    def post(executor, *args, &task)
1✔
39
      posts [[executor, args, task]]
2,604✔
40
      true
2,604✔
41
    end
42

43
    # As {#post} but allows to submit multiple tasks at once, it's guaranteed that they will not
44
    # be interleaved by other tasks.
45
    #
46
    # @param [Array<Array(Executor, Array<Object>, Proc)>] posts array of triplets where
47
    #   first is a {Executor}, second is array of args for task, third is a task (Proc)
48
    def posts(posts)
1✔
49
      # if can_overflow?
50
      #   raise ArgumentError, 'SerializedExecution does not support thread-pools which can overflow'
51
      # end
52

53
      return nil if posts.empty?
2,604✔
54

55
      jobs = posts.map { |executor, args, task| Job.new executor, args, task }
5,208✔
56

57
      job_to_post = synchronize do
2,604✔
58
        if @being_executed
2,604✔
59
          @stash.push(*jobs)
1,081✔
60
          nil
1,081✔
61
        else
62
          @being_executed = true
1,523✔
63
          @stash.push(*jobs[1..-1])
1,523✔
64
          jobs.first
1,523✔
65
        end
66
      end
67

68
      call_job job_to_post if job_to_post
2,604✔
69
      true
2,604✔
70
    end
71

72
    private
1✔
73

74
    def call_job(job)
1✔
75
      did_it_run = begin
76
        job.executor.post { work(job) }
5,195✔
77
        true
2,602✔
78
      rescue RejectedExecutionError => ex
79
        false
×
80
      end
81

82
      # TODO not the best idea to run it myself
83
      unless did_it_run
2,602✔
84
        begin
85
          work job
×
86
        rescue => ex
87
          # let it fail
88
          log DEBUG, ex
×
89
        end
90
      end
91
    end
92

93
    # ensures next job is executed if any is stashed
94
    def work(job)
1✔
95
      job.call
2,593✔
96
    ensure
97
      synchronize do
2,593✔
98
        job = @stash.shift || (@being_executed = false)
2,593✔
99
      end
100

101
      call_job job if job
2,593✔
102
    end
103
  end
104

105
  # A wrapper/delegator for any `Executor` or `ExecutorService` that
106
  # guarantees serialized execution of tasks.
107
  #
108
  # @see [SimpleDelegator](http://www.ruby-doc.org/stdlib-2.1.2/libdoc/delegate/rdoc/SimpleDelegator.html)
109
  # @see Concurrent::SerializedExecution
110
  class SerializedExecutionDelegator < SimpleDelegator
1✔
111
    include SerialExecutor
1✔
112

113
    def initialize(executor)
1✔
114
      @executor   = executor
25✔
115
      @serializer = SerializedExecution.new
25✔
116
      super(executor)
25✔
117
    end
118

119
    # @!macro executor_method_post
120
    def post(*args, &task)
1✔
121
      raise ArgumentError.new('no block given') unless block_given?
32✔
122
      return false unless running?
31✔
123
      @serializer.post(@executor, *args, &task)
22✔
124
    end
125
  end
126
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc