• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ruby-concurrency / concurrent-ruby / #2835

05 Oct 2014 10:16PM UTC coverage: 45.201% (-49.6%) from 94.81%
#2835

push

jdantonio
Merge pull request #158 from obrok/promise-composition

Promise composition

2 of 15 new or added lines in 1 file covered. (13.33%)

1514 existing lines in 84 files now uncovered.

1375 of 3042 relevant lines covered (45.2%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.74
/lib/concurrent/executor/serialized_execution.rb
1
require 'delegate'
1✔
2
require 'concurrent/executor/executor'
1✔
3
require 'concurrent/logging'
1✔
4
require 'concurrent/atomic/synchronization'
1✔
5

6
module Concurrent
1✔
7

8
  # Ensures that posted jobs are executed one at a time, in the order they were submitted.
9
  class SerializedExecution
1✔
10
    include Logging
1✔
11
    include Synchronization
1✔
12

13
    Job = Struct.new(:executor, :args, :block) do
1✔
14
      def call
1✔
UNCOV
15
        block.call *args
×
16
      end
17
    end
18

19
    def initialize
1✔
UNCOV
20
      synchronize do
×
UNCOV
21
        @being_executed = false
×
UNCOV
22
        @stash          = []
×
23
      end
24
    end
25

26
    # Submit a task to the executor for asynchronous processing.
27
    #
28
    # @param [Executor] executor to be used for this job
29
    #
30
    # @param [Array] args zero or more arguments to be passed to the task
31
    #
32
    # @yield the asynchronous task to perform
33
    #
34
    # @return [Boolean] always `true`; the task is either handed to its executor
34
    #   or stashed until the job currently being executed finishes
36
    #
37
    # @raise [ArgumentError] if no task is given
38
    def post(executor, *args, &task)
1✔
UNCOV
39
      posts [[executor, args, task]]
×
UNCOV
40
      true
×
41
    end
42

43
    # Like {#post}, but submits multiple tasks at once; it is guaranteed that they will not
44
    # be interleaved with other tasks.
45
    #
46
    # @param [Array<Array(Executor, Array<Object>, Proc)>] posts array of triplets where the
47
    #   first element is an {Executor}, the second is an array of args for the task, and the third is the task (Proc)
48
    def posts(posts)
1✔
49
      # if can_overflow?
50
      #   raise ArgumentError, 'SerializedExecution does not support thread-pools which can overflow'
51
      # end
52

UNCOV
53
      return nil if posts.empty?
×
54

UNCOV
55
      jobs = posts.map { |executor, args, task| Job.new executor, args, task }
×
56

UNCOV
57
      job_to_post = synchronize do
×
UNCOV
58
        if @being_executed
×
UNCOV
59
          @stash.push(*jobs)
×
UNCOV
60
          nil
×
61
        else
UNCOV
62
          @being_executed = true
×
UNCOV
63
          @stash.push(*jobs[1..-1])
×
UNCOV
64
          jobs.first
×
65
        end
66
      end
67

UNCOV
68
      call_job job_to_post if job_to_post
×
UNCOV
69
      true
×
70
    end
71

72
    private
1✔
73

74
    def call_job(job)
1✔
75
      did_it_run = begin
UNCOV
76
        job.executor.post { work(job) }
×
UNCOV
77
        true
×
78
      rescue RejectedExecutionError => ex
79
        false
×
80
      end
81

82
      # TODO: running the job here ourselves when the executor rejects it is not ideal
UNCOV
83
      unless did_it_run
×
84
        begin
85
          work job
×
86
        rescue => ex
87
          # swallow the failure; it is only logged at DEBUG level
88
          log DEBUG, ex
×
89
        end
90
      end
91
    end
92

93
    # Runs the given job, then ensures the next stashed job (if any) is executed.
94
    def work(job)
1✔
UNCOV
95
      job.call
×
96
    ensure
UNCOV
97
      synchronize do
×
UNCOV
98
        job = @stash.shift || (@being_executed = false)
×
99
      end
100

UNCOV
101
      call_job job if job
×
102
    end
103
  end
104

105
  # A wrapper/delegator for any `Executor` or `ExecutorService` that
106
  # guarantees serialized execution of tasks.
107
  #
108
  # @see [SimpleDelegator](http://www.ruby-doc.org/stdlib-2.1.2/libdoc/delegate/rdoc/SimpleDelegator.html)
109
  # @see Concurrent::SerializedExecution
110
  class SerializedExecutionDelegator < SimpleDelegator
1✔
111
    include SerialExecutor
1✔
112

113
    def initialize(executor)
1✔
UNCOV
114
      @executor   = executor
×
UNCOV
115
      @serializer = SerializedExecution.new
×
UNCOV
116
      super(executor)
×
117
    end
118

119
    # @!macro executor_method_post
120
    def post(*args, &task)
1✔
UNCOV
121
      raise ArgumentError.new('no block given') unless block_given?
×
UNCOV
122
      return false unless running?
×
UNCOV
123
      @serializer.post(@executor, *args, &task)
×
124
    end
125
  end
126
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc