• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ruby-concurrency / concurrent-ruby / #2835

05 Oct 2014 10:16PM UTC coverage: 45.201% (-49.6%) from 94.81%
#2835

push

jdantonio
Merge pull request #158 from obrok/promise-composition

Promise composition

2 of 15 new or added lines in 1 file covered. (13.33%)

1514 existing lines in 84 files now uncovered.

1375 of 3042 relevant lines covered (45.2%)

0.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.37
/lib/concurrent/dataflow.rb
1
require 'concurrent/future'
1✔
2
require 'concurrent/atomic/atomic_fixnum'
1✔
3
require 'concurrent/executor/per_thread_executor'
1✔
4

5
module Concurrent
1✔
6

7
  # @!visibility private
8
  class DependencyCounter # :nodoc:
1✔
9

10
    def initialize(count, &block)
1✔
UNCOV
11
      @counter = AtomicFixnum.new(count)
×
UNCOV
12
      @block = block
×
13
    end
14

15
    def update(time, value, reason)
1✔
UNCOV
16
      if @counter.decrement == 0
×
UNCOV
17
        @block.call
×
18
      end
19
    end
20
  end
21

22
  # Dataflow allows you to create a task that will be scheduled then all of its
23
  # data dependencies are available. Data dependencies are `Future` values. The
24
  # dataflow task itself is also a `Future` value, so you can build up a graph of
25
  # these tasks, each of which is run when all the data and other tasks it depends
26
  # on are available or completed.
27
  #
28
  # Our syntax is somewhat related to that of Akka's `flow` and Habanero Java's
29
  # `DataDrivenFuture`. However unlike Akka we don't schedule a task at all until
30
  # it is ready to run, and unlike Habanero Java we pass the data values into the
31
  # task instead of dereferencing them again in the task.
32
  #
33
  # The theory of dataflow goes back to the 80s. In the terminology of the literature,
34
  # our implementation is coarse-grained, in that each task can be many instructions,
35
  # and dynamic in that you can create more tasks within other tasks.
36
  #
37
  # @example Parallel Fibonacci calculator
38
  #   def fib(n)
39
  #     if n < 2
40
  #       Concurrent::dataflow { n }
41
  #     else
42
  #       n1 = fib(n - 1)
43
  #       n2 = fib(n - 2)
44
  #       Concurrent::dataflow(n1, n2) { |v1, v2| v1 + v2 }
45
  #     end
46
  #   end
47
  #   
48
  #   f = fib(14) #=> #<Concurrent::Future:0x000001019a26d8 ...
49
  #   
50
  #   # wait up to 1 second for the answer...
51
  #   f.value(1) #=> 377
52
  #
53
  # @param [Future] inputs zero or more `Future` operations that this dataflow depends upon
54
  #
55
  # @yield The operation to perform once all the dependencies are met
56
  # @yieldparam [Future] inputs each of the `Future` inputs to the dataflow
57
  # @yieldreturn [Object] the result of the block operation
58
  #
59
  # @return [Object] the result of all the operations
60
  #
61
  # @raise [ArgumentError] if no block is given
62
  # @raise [ArgumentError] if any of the inputs are not `IVar`s
63
  def dataflow(*inputs, &block)
1✔
UNCOV
64
    dataflow_with(Concurrent.configuration.global_operation_pool, *inputs, &block)
×
65
  end
66
  module_function :dataflow
1✔
67

68
  def dataflow_with(executor, *inputs, &block)
1✔
UNCOV
69
    call_dataflow(:value, executor, *inputs, &block)
×
70
  end
71
  module_function :dataflow_with
1✔
72
  
73
  def dataflow!(*inputs, &block)
1✔
UNCOV
74
    dataflow_with!(Concurrent.configuration.global_task_pool, *inputs, &block)
×
75
  end
76
  module_function :dataflow!
1✔
77

78
  def dataflow_with!(executor, *inputs, &block)
1✔
UNCOV
79
    call_dataflow(:value!, executor, *inputs, &block)
×
80
  end
81
  module_function :dataflow_with!
1✔
82

83
  private 
1✔
84

85
  def call_dataflow(method, executor, *inputs, &block)
1✔
UNCOV
86
    raise ArgumentError.new('an executor must be provided') if executor.nil?
×
UNCOV
87
    raise ArgumentError.new('no block given') unless block_given?
×
UNCOV
88
    raise ArgumentError.new('not all dependencies are IVars') unless inputs.all? { |input| input.is_a? IVar }
×
89

UNCOV
90
    result = Future.new(executor: executor) do
×
UNCOV
91
      values = inputs.map { |input| input.send(method) }
×
UNCOV
92
      block.call(*values)
×
93
    end
94

UNCOV
95
    if inputs.empty?
×
UNCOV
96
      result.execute
×
97
    else
UNCOV
98
      counter = DependencyCounter.new(inputs.size) { result.execute }
×
99

UNCOV
100
      inputs.each do |input|
×
UNCOV
101
        input.add_observer counter
×
102
      end
103
    end
104

UNCOV
105
    result
×
106
  end
107
  module_function :call_dataflow
1✔
108
end
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc