• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dedicate-project / beast / 8c849f2c-086d-406b-a03d-7ef8df12cdc1

pending completion
8c849f2c-086d-406b-a03d-7ef8df12cdc1

Pull #9

circleci

fairlight1337
Fixed more code smells
Pull Request #9: Adding pipeline applications

1247 of 1247 new or added lines in 23 files covered. (100.0%)

2868 of 3092 relevant lines covered (92.76%)

16138.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.21
/src/pipeline.cpp
1
#include <beast/pipeline.hpp>
2

3
// Standard
4
#include <algorithm>
5
#include <stdexcept>
6
#include <thread>
7

8
namespace beast {
9

10
void Pipeline::addPipe(const std::string& name, const std::shared_ptr<Pipe>& pipe) {
23✔
11
  if (pipeIsInPipeline(pipe)) {
23✔
12
    throw std::invalid_argument("Pipe already in this pipeline.");
1✔
13
  }
14

15
  if (getManagedPipeByName(name)) {
22✔
16
    throw std::invalid_argument("Pipe name already exists in this pipeline");
×
17
  }
18

19
  auto managed_pipe = std::make_shared<ManagedPipe>();
44✔
20
  managed_pipe->name = name;
22✔
21
  managed_pipe->pipe = pipe;
22✔
22
  managed_pipe->should_run = false;
22✔
23
  pipes_.push_back(std::move(managed_pipe));
22✔
24
}
22✔
25

26
void Pipeline::connectPipes(const std::shared_ptr<Pipe>& source_pipe, uint32_t source_slot_index,
10✔
27
                            const std::shared_ptr<Pipe>& destination_pipe,
28
                            uint32_t destination_slot_index, uint32_t buffer_size) {
29
  // Ensure that the pipes are both present already.
30
  if (!pipeIsInPipeline(source_pipe)) {
10✔
31
    throw std::invalid_argument("Source Pipe not in this Pipeline.");
2✔
32
  }
33

34
  if (!pipeIsInPipeline(destination_pipe)) {
8✔
35
    throw std::invalid_argument("Destination Pipe not in this Pipeline.");
1✔
36
  }
37

38
  auto managed_source_pipe = getManagedPipeForPipe(source_pipe);
14✔
39
  auto managed_destination_pipe = getManagedPipeForPipe(destination_pipe);
14✔
40

41
  // Ensure this connection doesn't exist yet.
42
  for (const std::shared_ptr<Connection>& connection : connections_) {
7✔
43
    if (connection->source_pipe == managed_source_pipe &&
5✔
44
        connection->source_slot_index == source_slot_index) {
2✔
45
      throw std::invalid_argument("Source port already occupied on Pipe.");
2✔
46
    }
47

48
    if (connection->destination_pipe == managed_destination_pipe &&
2✔
49
        connection->destination_slot_index == destination_slot_index) {
1✔
50
      throw std::invalid_argument("Destination port already occupied on Pipe.");
1✔
51
    }
52
  }
53

54
  auto connection = std::make_shared<Connection>();
8✔
55
  connection->source_pipe = managed_source_pipe;
4✔
56
  connection->source_slot_index = source_slot_index;
4✔
57
  connection->destination_pipe = managed_destination_pipe;
4✔
58
  connection->destination_slot_index = destination_slot_index;
4✔
59
  connection->buffer_size = buffer_size;
4✔
60
  connections_.push_back(std::move(connection));
4✔
61
}
4✔
62

63
const std::list<std::shared_ptr<Pipeline::ManagedPipe>>& Pipeline::getPipes() const {
11✔
64
  return pipes_;
11✔
65
}
66

67
const std::list<std::shared_ptr<Pipeline::Connection>>& Pipeline::getConnections() const {
7✔
68
  return connections_;
7✔
69
}
70

71
void Pipeline::start() {
5✔
72
  if (is_running_) {
5✔
73
    throw std::invalid_argument("Pipeline is already running, cannot start it.");
1✔
74
  }
75

76
  for (std::shared_ptr<ManagedPipe>& managed_pipe : pipes_) {
4✔
77
    if (!managed_pipe->is_running) {
×
78
      managed_pipe->should_run = true;
×
79
      std::thread thread(&Pipeline::pipelineWorker, this, std::ref(managed_pipe));
×
80
      std::swap(managed_pipe->thread, thread);
×
81
      managed_pipe->is_running = true;
×
82
    }
83
  }
84

85
  is_running_ = true;
4✔
86
}
4✔
87

88
void Pipeline::stop() {
3✔
89
  if (!is_running_) {
3✔
90
    throw std::invalid_argument("Pipeline is not running, cannot stop it.");
1✔
91
  }
92

93
  for (const std::shared_ptr<ManagedPipe>& managed_pipe : pipes_) {
2✔
94
    if (managed_pipe->is_running) {
×
95
      managed_pipe->should_run = false;
×
96
      managed_pipe->thread.join();
×
97
      managed_pipe->is_running = false;
×
98
    }
99
  }
100

101
  is_running_ = false;
2✔
102
}
2✔
103

104
bool Pipeline::isRunning() const { return is_running_; }
20✔
105

106
bool Pipeline::pipeIsInPipeline(const std::shared_ptr<Pipe>& pipe) const {
41✔
107
  return std::find_if(pipes_.begin(),
41✔
108
                      pipes_.end(),
109
                      [&pipe](const std::shared_ptr<ManagedPipe>& managed_pipe) {
38✔
110
                        return managed_pipe->pipe == pipe;
38✔
111
                      }) != pipes_.end();
41✔
112
}
113

114
void Pipeline::findConnections(
×
115
    const std::shared_ptr<ManagedPipe>& managed_pipe,
116
    std::vector<std::shared_ptr<Connection>>& source_connections,
117
    std::vector<std::shared_ptr<Connection>>& destination_connections) const {
118
  for (const std::shared_ptr<Connection>& connection : connections_) {
×
119
    if (connection->destination_pipe == managed_pipe) {
×
120
      source_connections.push_back(connection);
×
121
    }
122
    if (connection->source_pipe == managed_pipe) {
×
123
      destination_connections.push_back(connection);
×
124
    }
125
  }
126
}
×
127

128
void Pipeline::processOutputSlots(
×
129
    const std::shared_ptr<ManagedPipe>& managed_pipe,
130
    const std::vector<std::shared_ptr<Connection>>& destination_connections) {
131
  for (uint32_t slot_index = 0; slot_index < managed_pipe->pipe->getOutputSlotCount();
×
132
       ++slot_index) {
133
    if (!managed_pipe->pipe->hasOutput(slot_index)) {
×
134
      continue;
×
135
    }
136
    auto destination_slot_connection_iter =
137
        std::find_if(destination_connections.begin(),
138
                     destination_connections.end(),
139
                     [slot_index](const std::shared_ptr<Connection>& connection) {
×
140
                       return connection->destination_slot_index == slot_index;
×
141
                     });
×
142

143
    if (destination_slot_connection_iter == destination_connections.end()) {
×
144
      continue;
×
145
    }
146

147
    const std::shared_ptr<Connection>& destination_slot_connection =
148
        *destination_slot_connection_iter;
×
149
    std::scoped_lock lock(destination_slot_connection->buffer_mutex);
×
150
    while (
×
151
        managed_pipe->pipe->hasOutput(slot_index) &&
×
152
        (destination_slot_connection->buffer.size() < destination_slot_connection->buffer_size)) {
×
153
      auto data = managed_pipe->pipe->drawOutput(slot_index);
×
154
      destination_slot_connection->buffer.push_back(std::move(data));
×
155
    }
156
  }
157
}
×
158

159
void Pipeline::processInputSlots(
×
160
    const std::shared_ptr<ManagedPipe>& managed_pipe,
161
    const std::vector<std::shared_ptr<Connection>>& source_connections) {
162
  for (uint32_t slot_index = 0; slot_index < managed_pipe->pipe->getInputSlotCount();
×
163
       ++slot_index) {
164
    auto source_slot_connection_iter =
165
        std::find_if(source_connections.begin(),
166
                     source_connections.end(),
167
                     [slot_index](const std::shared_ptr<Connection>& connection) {
×
168
                       return connection->source_slot_index == slot_index;
×
169
                     });
×
170

171
    if (source_slot_connection_iter == source_connections.end()) {
×
172
      continue;
×
173
    }
174

175
    const std::shared_ptr<Connection>& source_slot_connection = *source_slot_connection_iter;
×
176
    if (source_slot_connection->buffer.empty()) {
×
177
      continue;
×
178
    }
179

180
    std::scoped_lock lock(source_slot_connection->buffer_mutex);
×
181
    while (managed_pipe->pipe->inputHasSpace(slot_index) &&
×
182
           !source_slot_connection->buffer.empty()) {
×
183
      auto data = source_slot_connection->buffer.back();
×
184
      source_slot_connection->buffer.pop_back();
×
185
      managed_pipe->pipe->addInput(slot_index, data.data);
×
186
    }
187
  }
188
}
×
189

190
void Pipeline::pipelineWorker(const std::shared_ptr<ManagedPipe>& managed_pipe) {
×
191
  std::vector<std::shared_ptr<Connection>> source_connections;
×
192
  std::vector<std::shared_ptr<Connection>> destination_connections;
×
193
  findConnections(managed_pipe, source_connections, destination_connections);
×
194

195
  while (managed_pipe->should_run) {
×
196
    processOutputSlots(managed_pipe, destination_connections);
×
197
    processInputSlots(managed_pipe, source_connections);
×
198

199
    if (!managed_pipe->pipe->outputsAreSaturated() && managed_pipe->pipe->inputsAreSaturated()) {
×
200
      managed_pipe->pipe->execute();
×
201
    }
202

203
    // Limit cycle time.
204
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
×
205
  }
206
}
×
207

208
std::shared_ptr<Pipeline::ManagedPipe>
209
Pipeline::getManagedPipeForPipe(const std::shared_ptr<Pipe>& pipe) const {
14✔
210
  for (const auto& managed_pipe : pipes_) {
25✔
211
    if (managed_pipe->pipe == pipe) {
25✔
212
      return managed_pipe;
14✔
213
    }
214
  }
215
  return nullptr;
×
216
}
217

218
std::shared_ptr<Pipeline::ManagedPipe> Pipeline::getManagedPipeByName(std::string_view name) const {
22✔
219
  for (const auto& managed_pipe : pipes_) {
31✔
220
    if (managed_pipe->name == name) {
9✔
221
      return managed_pipe;
×
222
    }
223
  }
224
  return nullptr;
22✔
225
}
226

227
} // namespace beast
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc