• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ruby-concurrency / concurrent-ruby / #2743

04 Mar 2015 02:59AM UTC coverage: 45.152% (-50.4%) from 95.548%
#2743

push

jdantonio
Merge pull request #258 from ruby-concurrency/clock_time

Closes #256

23 of 69 new or added lines in 7 files covered. (33.33%)

1563 existing lines in 88 files now uncovered.

1425 of 3156 relevant lines covered (45.15%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.37
/lib/concurrent/dataflow.rb
1
require 'concurrent/future'
1✔
2
require 'concurrent/atomic/atomic_fixnum'
1✔
3
require 'concurrent/executor/per_thread_executor'
1✔
4

5
module Concurrent

  # Observer that fires a callback after a fixed number of notifications.
  #
  # Every call to {#update} decrements an `AtomicFixnum` seeded with `count`;
  # the block captured at construction is called when a decrement yields zero.
  #
  # @!visibility private
  class DependencyCounter # :nodoc:

    # @param [Integer] count how many `update` calls must arrive before the block runs
    # @yield called with no arguments by the `update` call that brings the counter to zero
    def initialize(count, &block)
      @counter = AtomicFixnum.new(count)
      @block = block
    end

    # Observer callback. The arguments are accepted to satisfy the observer
    # signature but are not used; only the number of calls matters.
    #
    # @param [Object] time ignored
    # @param [Object] value ignored
    # @param [Object] reason ignored
    def update(time, value, reason)
      @block.call if @counter.decrement == 0
    end
  end

  # {include:file:doc/dataflow.md}
  #
  # Runs on `Concurrent.configuration.global_operation_pool`; use
  # {#dataflow_with} to supply a different executor.
  #
  # @param [Future] inputs zero or more `Future` operations that this dataflow depends upon
  #
  # @yield The operation to perform once all the dependencies are met
  # @yieldparam [Future] inputs each of the `Future` inputs to the dataflow
  # @yieldreturn [Object] the result of the block operation
  #
  # @return [Future] the `Future` that runs the block once every input has notified
  #
  # @raise [ArgumentError] if no block is given
  # @raise [ArgumentError] if any of the inputs are not `IVar`s
  def dataflow(*inputs, &block)
    dataflow_with(Concurrent.configuration.global_operation_pool, *inputs, &block)
  end
  module_function :dataflow

  # Same as {#dataflow}, but the block is run on the given executor.
  # Each input is read with `value` before being passed to the block.
  #
  # @param [Object] executor the executor handed to the resulting `Future`
  # @param [Future] inputs zero or more dependencies
  #
  # @return [Future] the `Future` that runs the block once every input has notified
  #
  # @raise [ArgumentError] if the executor is nil
  # @raise [ArgumentError] if no block is given
  # @raise [ArgumentError] if any of the inputs are not `IVar`s
  def dataflow_with(executor, *inputs, &block)
    call_dataflow(:value, executor, *inputs, &block)
  end
  module_function :dataflow_with

  # Variant of {#dataflow} that reads each input with `value!` instead of
  # `value` (presumably re-raising an input's failure -- confirm against
  # `IVar#value!`).
  #
  # Uses the same default executor as {#dataflow}; the two entry points differ
  # only in how the inputs are read, so they should not differ in where they run.
  #
  # @param [Future] inputs zero or more dependencies
  #
  # @return [Future] the `Future` that runs the block once every input has notified
  #
  # @raise [ArgumentError] if no block is given
  # @raise [ArgumentError] if any of the inputs are not `IVar`s
  def dataflow!(*inputs, &block)
    dataflow_with!(Concurrent.configuration.global_operation_pool, *inputs, &block)
  end
  module_function :dataflow!

  # Same as {#dataflow!}, but the block is run on the given executor.
  #
  # @param [Object] executor the executor handed to the resulting `Future`
  # @param [Future] inputs zero or more dependencies
  #
  # @return [Future] the `Future` that runs the block once every input has notified
  #
  # @raise [ArgumentError] if the executor is nil
  # @raise [ArgumentError] if no block is given
  # @raise [ArgumentError] if any of the inputs are not `IVar`s
  def dataflow_with!(executor, *inputs, &block)
    call_dataflow(:value!, executor, *inputs, &block)
  end
  module_function :dataflow_with!

  # NOTE(review): `private` only affects the instance-method copy. The
  # `module_function` call below still publishes a public
  # `Concurrent.call_dataflow`; that is kept as-is so existing callers keep working.
  private

  # Shared implementation behind the four public dataflow entry points.
  #
  # Builds a `Future` whose task reads every input with `method` and passes the
  # values to the block. With no inputs the future is executed immediately;
  # otherwise one {DependencyCounter} is registered as an observer on every
  # input and executes the future after it has been notified `inputs.size` times.
  #
  # @param [Symbol] method name of the reader sent to each input (`:value` or `:value!`)
  # @param [Object] executor the executor handed to the `Future`
  # @param [IVar] inputs zero or more dependencies
  #
  # @return [Future] the future wrapping the block
  #
  # @raise [ArgumentError] if the executor is nil
  # @raise [ArgumentError] if no block is given
  # @raise [ArgumentError] if any of the inputs are not `IVar`s
  def call_dataflow(method, executor, *inputs, &block)
    raise ArgumentError, 'an executor must be provided' if executor.nil?
    raise ArgumentError, 'no block given' unless block_given?
    raise ArgumentError, 'not all dependencies are IVars' unless inputs.all? { |input| input.is_a? IVar }

    result = Future.new(executor: executor) do
      values = inputs.map { |input| input.send(method) }
      block.call(*values)
    end

    if inputs.empty?
      result.execute
    else
      # A single counter is shared by all inputs so the future starts exactly
      # once, on the notification that brings the count to zero.
      counter = DependencyCounter.new(inputs.size) { result.execute }
      inputs.each { |input| input.add_observer(counter) }
    end

    result
  end
  module_function :call_dataflow
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc