• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dedicate-project / beast / 83ba6ee4-64d8-4300-bcbc-007a3bb86324

pending completion
83ba6ee4-64d8-4300-bcbc-007a3bb86324

Pull #9

circleci

fairlight1337
Large block of Doxygen comments added
Pull Request #9: Adding pipeline applications

1197 of 1197 new or added lines in 21 files covered. (100.0%)

2837 of 3042 relevant lines covered (93.26%)

13117.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.11
/src/pipeline.cpp
1
#include <beast/pipeline.hpp>
2

3
// Standard
4
#include <algorithm>
5
#include <stdexcept>
6
#include <thread>
7

8
namespace beast {
9

10
void Pipeline::addPipe(const std::string& name, const std::shared_ptr<Pipe>& pipe) {
23✔
11
  if (pipeIsInPipeline(pipe)) {
23✔
12
    throw std::invalid_argument("Pipe already in this pipeline.");
1✔
13
  }
14

15
  if (getManagedPipeByName(name)) {
22✔
16
    throw std::invalid_argument("Pipe name already exists in this pipeline");
×
17
  }
18

19
  std::shared_ptr<ManagedPipe> managed_pipe = std::make_shared<ManagedPipe>();
44✔
20
  managed_pipe->name = name;
22✔
21
  managed_pipe->pipe = pipe;
22✔
22
  managed_pipe->should_run = false;
22✔
23
  pipes_.push_back(std::move(managed_pipe));
22✔
24
}
22✔
25

26
void Pipeline::connectPipes(const std::shared_ptr<Pipe>& source_pipe, uint32_t source_slot_index,
10✔
27
                            const std::shared_ptr<Pipe>& destination_pipe,
28
                            uint32_t destination_slot_index, uint32_t buffer_size) {
29
  // Ensure that the pipes are both present already.
30
  if (!pipeIsInPipeline(source_pipe)) {
10✔
31
    throw std::invalid_argument("Source Pipe not in this Pipeline.");
2✔
32
  }
33

34
  if (!pipeIsInPipeline(destination_pipe)) {
8✔
35
    throw std::invalid_argument("Destination Pipe not in this Pipeline.");
1✔
36
  }
37

38
  auto managed_source_pipe = getManagedPipeForPipe(source_pipe);
14✔
39
  auto managed_destination_pipe = getManagedPipeForPipe(destination_pipe);
14✔
40

41
  // Ensure this connection doesn't exist yet.
42
  for (const Connection& connection : connections_) {
7✔
43
    if (connection.source_pipe == managed_source_pipe &&
5✔
44
        connection.source_slot_index == source_slot_index) {
2✔
45
      throw std::invalid_argument("Source port already occupied on Pipe.");
2✔
46
    }
47

48
    if (connection.destination_pipe == managed_destination_pipe &&
2✔
49
        connection.destination_slot_index == destination_slot_index) {
1✔
50
      throw std::invalid_argument("Destination port already occupied on Pipe.");
1✔
51
    }
52
  }
53

54
  Connection connection{
55
      managed_source_pipe, source_slot_index, managed_destination_pipe, destination_slot_index, {}};
8✔
56
  connection.buffer.reserve(buffer_size);
4✔
57
  connections_.push_back(std::move(connection));
4✔
58
}
4✔
59

60
const std::list<std::shared_ptr<Pipeline::ManagedPipe>>& Pipeline::getPipes() const {
11✔
61
  return pipes_;
11✔
62
}
63

64
const std::list<Pipeline::Connection>& Pipeline::getConnections() const { return connections_; }
7✔
65

66
void Pipeline::start() {
5✔
67
  if (is_running_) {
5✔
68
    throw std::invalid_argument("Pipeline is already running, cannot start it.");
1✔
69
  }
70

71
  for (std::shared_ptr<ManagedPipe>& managed_pipe : pipes_) {
4✔
72
    if (!managed_pipe->is_running) {
×
73
      managed_pipe->should_run = true;
×
74
      std::thread thread(&Pipeline::pipelineWorker, this, std::ref(managed_pipe));
×
75
      std::swap(managed_pipe->thread, thread);
×
76
      managed_pipe->is_running = true;
×
77
    }
78
  }
79

80
  is_running_ = true;
4✔
81
}
4✔
82

83
void Pipeline::stop() {
3✔
84
  if (!is_running_) {
3✔
85
    throw std::invalid_argument("Pipeline is not running, cannot stop it.");
1✔
86
  }
87

88
  for (std::shared_ptr<ManagedPipe>& managed_pipe : pipes_) {
2✔
89
    if (managed_pipe->is_running) {
×
90
      managed_pipe->should_run = false;
×
91
      managed_pipe->thread.join();
×
92
      managed_pipe->is_running = false;
×
93
    }
94
  }
95

96
  is_running_ = false;
2✔
97
}
2✔
98

99
bool Pipeline::isRunning() const { return is_running_; }
20✔
100

101
bool Pipeline::pipeIsInPipeline(const std::shared_ptr<Pipe>& pipe) const {
41✔
102
  return std::find_if(pipes_.begin(),
41✔
103
                      pipes_.end(),
104
                      [&pipe](const std::shared_ptr<ManagedPipe>& managed_pipe) {
38✔
105
                        return managed_pipe->pipe == pipe;
38✔
106
                      }) != pipes_.end();
41✔
107
}
108

109
void Pipeline::findConnections(std::shared_ptr<ManagedPipe>& managed_pipe,
×
110
                               std::vector<Connection*>& source_connections,
111
                               std::vector<Connection*>& destination_connections) {
112
  for (Connection& connection : connections_) {
×
113
    if (connection.destination_pipe == managed_pipe) {
×
114
      source_connections.push_back(&connection);
×
115
    }
116
    if (connection.source_pipe == managed_pipe) {
×
117
      destination_connections.push_back(&connection);
×
118
    }
119
  }
120
}
×
121

122
void Pipeline::processOutputSlots(std::shared_ptr<ManagedPipe>& managed_pipe,
×
123
                                  const std::vector<Connection*>& destination_connections) {
124
  for (uint32_t slot_index = 0; slot_index < managed_pipe->pipe->getOutputSlotCount();
×
125
       ++slot_index) {
126
    if (!managed_pipe->pipe->hasOutput(slot_index)) {
×
127
      continue;
×
128
    }
129
    auto destination_slot_connection_iter =
130
        std::find_if(destination_connections.begin(),
131
                     destination_connections.end(),
132
                     [slot_index](const Connection* connection) {
×
133
                       return connection->destination_slot_index == slot_index;
×
134
                     });
×
135

136
    if (destination_slot_connection_iter == destination_connections.end()) {
×
137
      continue;
×
138
    }
139

140
    Connection* destination_slot_connection = *destination_slot_connection_iter;
×
141
    while (managed_pipe->pipe->hasOutput(slot_index) &&
×
142
           (destination_slot_connection->buffer.size() <
×
143
            destination_slot_connection->buffer.capacity())) {
×
144
      auto data = managed_pipe->pipe->drawOutput(slot_index);
×
145
      destination_slot_connection->buffer.push_back(std::move(data));
×
146
    }
147
  }
148
}
×
149

150
void Pipeline::processInputSlots(std::shared_ptr<ManagedPipe>& managed_pipe,
×
151
                                 const std::vector<Connection*>& source_connections) {
152
  for (uint32_t slot_index = 0; slot_index < managed_pipe->pipe->getInputSlotCount();
×
153
       ++slot_index) {
154
    auto source_slot_connection_iter =
155
        std::find_if(source_connections.begin(),
156
                     source_connections.end(),
157
                     [slot_index](const Connection* connection) {
×
158
                       return connection->source_slot_index == slot_index;
×
159
                     });
×
160

161
    if (source_slot_connection_iter == source_connections.end()) {
×
162
      continue;
×
163
    }
164

165
    Connection* source_slot_connection = *source_slot_connection_iter;
×
166
    if (source_slot_connection->buffer.empty()) {
×
167
      continue;
×
168
    }
169

170
    while (managed_pipe->pipe->inputHasSpace(slot_index) &&
×
171
           !source_slot_connection->buffer.empty()) {
×
172
      auto data = source_slot_connection->buffer.back();
×
173
      source_slot_connection->buffer.pop_back();
×
174
      managed_pipe->pipe->addInput(slot_index, data.data);
×
175
    }
176
  }
177
}
×
178

179
void Pipeline::pipelineWorker(std::shared_ptr<ManagedPipe>& managed_pipe) {
×
180
  std::vector<Connection*> source_connections;
×
181
  std::vector<Connection*> destination_connections;
×
182
  findConnections(managed_pipe, source_connections, destination_connections);
×
183

184
  while (managed_pipe->should_run) {
×
185
    processOutputSlots(managed_pipe, destination_connections);
×
186
    processInputSlots(managed_pipe, source_connections);
×
187

188
    const bool outputs_have_enough_space = !managed_pipe->pipe->outputsAreSaturated();
×
189
    const bool inputs_have_enough_data = managed_pipe->pipe->inputsAreSaturated();
×
190

191
    if (inputs_have_enough_data && outputs_have_enough_space) {
×
192
      managed_pipe->pipe->execute();
×
193
    }
194

195
    // Limit cycle time.
196
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
×
197
  }
198
}
×
199

200
std::shared_ptr<Pipeline::ManagedPipe>
201
Pipeline::getManagedPipeForPipe(const std::shared_ptr<Pipe>& pipe) {
14✔
202
  for (const auto& managed_pipe : pipes_) {
25✔
203
    if (managed_pipe->pipe == pipe) {
25✔
204
      return managed_pipe;
14✔
205
    }
206
  }
207
  return nullptr;
×
208
}
209

210
std::shared_ptr<Pipeline::ManagedPipe> Pipeline::getManagedPipeByName(const std::string& name) {
22✔
211
  for (auto& managed_pipe : pipes_) {
31✔
212
    if (managed_pipe->name == name) {
9✔
213
      return managed_pipe;
×
214
    }
215
  }
216
  return nullptr;
22✔
217
}
218
} // namespace beast
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc