• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dedicate-project / beast / 5769674b-2abc-4901-862a-bb80b2dc41c5

pending completion
5769674b-2abc-4901-862a-bb80b2dc41c5

Pull #9

circleci

fairlight1337
Introduced metrics functions for pipelines
Pull Request #9: Adding pipeline applications

1281 of 1281 new or added lines in 23 files covered. (100.0%)

2873 of 3126 relevant lines covered (91.91%)

14977.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

41.18
/src/pipeline.cpp
1
#include <beast/pipeline.hpp>
2

3
// Standard
4
#include <algorithm>
5
#include <stdexcept>
6
#include <thread>
7

8
namespace beast {
9

10
Pipeline::Pipeline() { metrics_.measure_time_start = std::chrono::steady_clock::now(); }
40✔
11

12
void Pipeline::addPipe(const std::string& name, const std::shared_ptr<Pipe>& pipe) {
23✔
13
  if (pipeIsInPipeline(pipe)) {
23✔
14
    throw std::invalid_argument("Pipe already in this pipeline.");
1✔
15
  }
16

17
  if (getManagedPipeByName(name)) {
22✔
18
    throw std::invalid_argument("Pipe name already exists in this pipeline");
×
19
  }
20

21
  auto managed_pipe = std::make_shared<ManagedPipe>();
44✔
22
  managed_pipe->name = name;
22✔
23
  managed_pipe->pipe = pipe;
22✔
24
  managed_pipe->should_run = false;
22✔
25
  pipes_.push_back(std::move(managed_pipe));
22✔
26
}
22✔
27

28
void Pipeline::connectPipes(const std::shared_ptr<Pipe>& source_pipe, uint32_t source_slot_index,
10✔
29
                            const std::shared_ptr<Pipe>& destination_pipe,
30
                            uint32_t destination_slot_index, uint32_t buffer_size) {
31
  // Ensure that the pipes are both present already.
32
  if (!pipeIsInPipeline(source_pipe)) {
10✔
33
    throw std::invalid_argument("Source Pipe not in this Pipeline.");
2✔
34
  }
35

36
  if (!pipeIsInPipeline(destination_pipe)) {
8✔
37
    throw std::invalid_argument("Destination Pipe not in this Pipeline.");
1✔
38
  }
39

40
  auto managed_source_pipe = getManagedPipeForPipe(source_pipe);
14✔
41
  auto managed_destination_pipe = getManagedPipeForPipe(destination_pipe);
14✔
42

43
  // Ensure this connection doesn't exist yet.
44
  for (const std::shared_ptr<Connection>& connection : connections_) {
7✔
45
    if (connection->source_pipe == managed_source_pipe &&
5✔
46
        connection->source_slot_index == source_slot_index) {
2✔
47
      throw std::invalid_argument("Source port already occupied on Pipe.");
2✔
48
    }
49

50
    if (connection->destination_pipe == managed_destination_pipe &&
2✔
51
        connection->destination_slot_index == destination_slot_index) {
1✔
52
      throw std::invalid_argument("Destination port already occupied on Pipe.");
1✔
53
    }
54
  }
55

56
  auto connection = std::make_shared<Connection>();
8✔
57
  connection->source_pipe = managed_source_pipe;
4✔
58
  connection->source_slot_index = source_slot_index;
4✔
59
  connection->destination_pipe = managed_destination_pipe;
4✔
60
  connection->destination_slot_index = destination_slot_index;
4✔
61
  connection->buffer_size = buffer_size;
4✔
62
  connections_.push_back(std::move(connection));
4✔
63
}
4✔
64

65
const std::list<std::shared_ptr<Pipeline::ManagedPipe>>& Pipeline::getPipes() const {
11✔
66
  return pipes_;
11✔
67
}
68

69
const std::list<std::shared_ptr<Pipeline::Connection>>& Pipeline::getConnections() const {
7✔
70
  return connections_;
7✔
71
}
72

73
void Pipeline::start() {
5✔
74
  if (is_running_) {
5✔
75
    throw std::invalid_argument("Pipeline is already running, cannot start it.");
1✔
76
  }
77

78
  for (std::shared_ptr<ManagedPipe>& managed_pipe : pipes_) {
4✔
79
    if (!managed_pipe->is_running) {
×
80
      managed_pipe->should_run = true;
×
81
      std::thread thread(&Pipeline::pipelineWorker, this, std::ref(managed_pipe));
×
82
      std::swap(managed_pipe->thread, thread);
×
83
      managed_pipe->is_running = true;
×
84
    }
85
  }
86

87
  is_running_ = true;
4✔
88
}
4✔
89

90
void Pipeline::stop() {
3✔
91
  if (!is_running_) {
3✔
92
    throw std::invalid_argument("Pipeline is not running, cannot stop it.");
1✔
93
  }
94

95
  for (const std::shared_ptr<ManagedPipe>& managed_pipe : pipes_) {
2✔
96
    if (managed_pipe->is_running) {
×
97
      managed_pipe->should_run = false;
×
98
      managed_pipe->thread.join();
×
99
      managed_pipe->is_running = false;
×
100
    }
101
  }
102

103
  is_running_ = false;
2✔
104
}
2✔
105

106
bool Pipeline::isRunning() const { return is_running_; }
20✔
107

108
Pipeline::PipelineMetrics Pipeline::getMetrics() {
×
109
  std::scoped_lock lock(metrics_mutex_);
×
110
  // Cache and reset metrics object.
111
  PipelineMetrics metrics = metrics_;
×
112
  metrics_ = PipelineMetrics{};
×
113
  metrics_.measure_time_start = std::chrono::steady_clock::now();
×
114

115
  return metrics;
×
116
}
117

118
bool Pipeline::pipeIsInPipeline(const std::shared_ptr<Pipe>& pipe) const {
41✔
119
  return std::find_if(pipes_.begin(),
41✔
120
                      pipes_.end(),
121
                      [&pipe](const std::shared_ptr<ManagedPipe>& managed_pipe) {
38✔
122
                        return managed_pipe->pipe == pipe;
38✔
123
                      }) != pipes_.end();
41✔
124
}
125

126
void Pipeline::findConnections(
×
127
    const std::shared_ptr<ManagedPipe>& managed_pipe,
128
    std::vector<std::shared_ptr<Connection>>& source_connections,
129
    std::vector<std::shared_ptr<Connection>>& destination_connections) const {
130
  for (const std::shared_ptr<Connection>& connection : connections_) {
×
131
    if (connection->destination_pipe == managed_pipe) {
×
132
      source_connections.push_back(connection);
×
133
    }
134
    if (connection->source_pipe == managed_pipe) {
×
135
      destination_connections.push_back(connection);
×
136
    }
137
  }
138
}
×
139

140
std::unordered_map<uint32_t, uint32_t> Pipeline::processOutputSlots(
×
141
    const std::shared_ptr<ManagedPipe>& managed_pipe,
142
    const std::vector<std::shared_ptr<Connection>>& destination_connections) {
143
  std::unordered_map<uint32_t, uint32_t> metrics;
×
144
  for (uint32_t slot_index = 0; slot_index < managed_pipe->pipe->getOutputSlotCount();
×
145
       ++slot_index) {
146
    metrics[slot_index] = 0;
×
147
    if (!managed_pipe->pipe->hasOutput(slot_index)) {
×
148
      continue;
×
149
    }
150
    auto destination_slot_connection_iter =
151
        std::find_if(destination_connections.begin(),
152
                     destination_connections.end(),
153
                     [slot_index](const std::shared_ptr<Connection>& connection) {
×
154
                       return connection->destination_slot_index == slot_index;
×
155
                     });
×
156

157
    if (destination_slot_connection_iter == destination_connections.end()) {
×
158
      continue;
×
159
    }
160

161
    const std::shared_ptr<Connection>& destination_slot_connection =
162
        *destination_slot_connection_iter;
×
163
    std::scoped_lock lock(destination_slot_connection->buffer_mutex);
×
164
    while (
×
165
        managed_pipe->pipe->hasOutput(slot_index) &&
×
166
        (destination_slot_connection->buffer.size() < destination_slot_connection->buffer_size)) {
×
167
      auto data = managed_pipe->pipe->drawOutput(slot_index);
×
168
      destination_slot_connection->buffer.push_back(std::move(data));
×
169
      metrics[slot_index]++;
×
170
    }
171
  }
172
  return metrics;
×
173
}
174

175
std::unordered_map<uint32_t, uint32_t>
176
Pipeline::processInputSlots(const std::shared_ptr<ManagedPipe>& managed_pipe,
×
177
                            const std::vector<std::shared_ptr<Connection>>& source_connections) {
178
  std::unordered_map<uint32_t, uint32_t> metrics;
×
179
  for (uint32_t slot_index = 0; slot_index < managed_pipe->pipe->getInputSlotCount();
×
180
       ++slot_index) {
181
    metrics[slot_index] = 0;
×
182
    auto source_slot_connection_iter =
183
        std::find_if(source_connections.begin(),
184
                     source_connections.end(),
185
                     [slot_index](const std::shared_ptr<Connection>& connection) {
×
186
                       return connection->source_slot_index == slot_index;
×
187
                     });
×
188

189
    if (source_slot_connection_iter == source_connections.end()) {
×
190
      continue;
×
191
    }
192

193
    const std::shared_ptr<Connection>& source_slot_connection = *source_slot_connection_iter;
×
194
    if (source_slot_connection->buffer.empty()) {
×
195
      continue;
×
196
    }
197

198
    std::scoped_lock lock(source_slot_connection->buffer_mutex);
×
199
    while (managed_pipe->pipe->inputHasSpace(slot_index) &&
×
200
           !source_slot_connection->buffer.empty()) {
×
201
      auto data = source_slot_connection->buffer.back();
×
202
      source_slot_connection->buffer.pop_back();
×
203
      managed_pipe->pipe->addInput(slot_index, data.data);
×
204
      metrics[slot_index]++;
×
205
    }
206
  }
207
  return metrics;
×
208
}
209

210
void Pipeline::pipelineWorker(const std::shared_ptr<ManagedPipe>& managed_pipe) {
×
211
  std::vector<std::shared_ptr<Connection>> source_connections;
×
212
  std::vector<std::shared_ptr<Connection>> destination_connections;
×
213
  findConnections(managed_pipe, source_connections, destination_connections);
×
214

215
  while (managed_pipe->should_run) {
×
216
    const auto input_metrics = processInputSlots(managed_pipe, source_connections);
×
217

218
    bool executed = false;
×
219
    if (!managed_pipe->pipe->outputsAreSaturated() && managed_pipe->pipe->inputsAreSaturated()) {
×
220
      managed_pipe->pipe->execute();
×
221
      executed = true;
×
222
    }
223

224
    const auto output_metrics = processOutputSlots(managed_pipe, destination_connections);
×
225

226
    // Limit cycle time.
227
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
×
228

229
    // Report metrics for this worker.
230
    reportMetrics(managed_pipe, executed, input_metrics, output_metrics);
×
231
  }
232
}
×
233

234
std::shared_ptr<Pipeline::ManagedPipe>
235
Pipeline::getManagedPipeForPipe(const std::shared_ptr<Pipe>& pipe) const {
14✔
236
  for (const auto& managed_pipe : pipes_) {
25✔
237
    if (managed_pipe->pipe == pipe) {
25✔
238
      return managed_pipe;
14✔
239
    }
240
  }
241
  return nullptr;
×
242
}
243

244
std::shared_ptr<Pipeline::ManagedPipe> Pipeline::getManagedPipeByName(std::string_view name) const {
22✔
245
  for (const auto& managed_pipe : pipes_) {
31✔
246
    if (managed_pipe->name == name) {
9✔
247
      return managed_pipe;
×
248
    }
249
  }
250
  return nullptr;
22✔
251
}
252

253
void Pipeline::reportMetrics(const std::shared_ptr<ManagedPipe>& managed_pipe, bool executed,
×
254
                             const std::unordered_map<uint32_t, uint32_t>& input_metrics,
255
                             const std::unordered_map<uint32_t, uint32_t>& output_metrics) {
256
  const std::string& pipe_name = managed_pipe->name;
×
257

258
  std::scoped_lock lock(metrics_mutex_);
×
259
  PipeMetrics& pipe_metrics = metrics_.pipes[pipe_name];
×
260

261
  if (executed) {
×
262
    pipe_metrics.execution_count++;
×
263
  }
264

265
  for (const auto& input_pair : input_metrics) {
×
266
    pipe_metrics.inputs_received[input_pair.first] += input_pair.second;
×
267
  }
268
  for (const auto& output_pair : output_metrics) {
×
269
    pipe_metrics.outputs_sent[output_pair.first] += output_pair.second;
×
270
  }
271
}
×
272

273
} // namespace beast
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc