• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dedicate-project / beast / 49939c40-5d31-48c2-8363-71719028ff1b

pending completion
49939c40-5d31-48c2-8363-71719028ff1b

Pull #9

circleci

fairlight1337
Made the right-click menu on the frontend a bit nicer
Pull Request #9: Adding pipeline applications

1399 of 1399 new or added lines in 23 files covered. (100.0%)

2922 of 3244 relevant lines covered (90.07%)

15773.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.6
/src/pipeline_manager.cpp
1
#include <beast/pipeline_manager.hpp>
2

3
// Standard
4
#include <chrono>
5

6
// Internal
7
#include <beast/pipes/evaluator_pipe.hpp>
8
#include <beast/pipes/evolution_pipe.hpp>
9
#include <beast/pipes/null_sink_pipe.hpp>
10
#include <beast/pipes/program_factory_pipe.hpp>
11

12
#include <beast/program_factory_base.hpp>
13
#include <beast/random_program_factory.hpp>
14

15
#include <beast/evaluators/maze_evaluator.hpp>
16

17
namespace beast {
18

19
PipelineManager::PipelineManager(const std::string& storage_path, uint32_t metrics_interval_time,
31✔
20
                                 uint32_t metrics_window_size)
31✔
21
    : filesystem_(storage_path), metrics_interval_time_{metrics_interval_time},
22
      metrics_window_size_{metrics_window_size} {
31✔
23
  std::scoped_lock lock{pipelines_mutex_};
31✔
24
  for (const auto& model : filesystem_.loadModels()) {
32✔
25
    PipelineDescriptor descriptor;
2✔
26
    descriptor.id = getFreeId();
1✔
27
    descriptor.name = model["content"]["name"];
1✔
28
    descriptor.filename = model["filename"];
1✔
29
    descriptor.pipeline = constructPipelineFromJson(model["content"]["model"]);
1✔
30
    descriptor.metadata = model["content"]["metadata"];
1✔
31
    pipelines_.push_back(std::move(descriptor));
1✔
32
  }
33

34
  if (metrics_interval_time == 0) {
31✔
35
    throw std::invalid_argument("metrics_interval_time must be > 0");
×
36
  }
37

38
  if (metrics_window_size == 0) {
31✔
39
    throw std::invalid_argument("metrics_window_size must be > 0");
×
40
  }
41

42
  metrics_time_constant_ =
31✔
43
      static_cast<uint32_t>(std::ceil(metrics_interval_time / metrics_window_size));
31✔
44

45
  should_run_metrics_collector_ = true;
31✔
46
  metrics_collector_thread_ = std::thread(&PipelineManager::metricsCollectorWorker, this);
31✔
47
}
31✔
48

49
// Stops the metrics collector and waits for its thread to finish.
//
// The worker re-checks the run flag only once per cycle (after its sleep), so this can block for
// up to one collection interval.
PipelineManager::~PipelineManager() {
  should_run_metrics_collector_ = false;
  // join() throws on a thread that is not joinable; an exception must never leave a destructor.
  if (metrics_collector_thread_.joinable()) {
    metrics_collector_thread_.join();
  }
}
31✔
53

54
uint32_t PipelineManager::createPipeline(const std::string& name) {
18✔
55
  std::scoped_lock lock{pipelines_mutex_};
36✔
56
  nlohmann::json model;
36✔
57
  model["pipes"] = nlohmann::json::array();
18✔
58
  model["connections"] = {};
18✔
59
  const std::string filename = filesystem_.saveModel(name, model);
36✔
60
  const uint32_t new_id = getFreeId();
18✔
61

62
  PipelineDescriptor descriptor;
18✔
63
  descriptor.id = new_id;
18✔
64
  descriptor.name = name;
18✔
65
  descriptor.filename = filename;
18✔
66
  descriptor.pipeline = std::make_shared<Pipeline>();
18✔
67

68
  pipelines_.push_back(descriptor);
18✔
69

70
  return new_id;
36✔
71
}
72

73
void PipelineManager::savePipeline(uint32_t pipeline_id) {
×
74
  const auto& descriptor = getPipelineById(pipeline_id);
×
75
  const auto model = deconstructPipelineToJson(descriptor.pipeline);
×
76
  filesystem_.updateModel(descriptor.filename, descriptor.name, model, descriptor.metadata);
×
77
}
×
78

79
// Looks up the descriptor of the pipeline with the given ID.
//
// Returns a reference into pipelines_. This function does not lock; the callers in this file take
// pipelines_mutex_ themselves.
// Throws std::invalid_argument if no pipeline has this ID.
PipelineManager::PipelineDescriptor& PipelineManager::getPipelineById(uint32_t pipeline_id) {
  const auto match = std::find_if(
      pipelines_.begin(), pipelines_.end(), [pipeline_id](const PipelineDescriptor& candidate) {
        return candidate.id == pipeline_id;
      });
  if (match == pipelines_.end()) {
    throw std::invalid_argument("Pipeline with this ID not found: " + std::to_string(pipeline_id));
  }
  return *match;
}
87

88
const std::list<PipelineManager::PipelineDescriptor>& PipelineManager::getPipelines() const {
3✔
89
  return pipelines_;
3✔
90
}
91

92
void PipelineManager::updatePipelineName(uint32_t pipeline_id, const std::string_view new_name) {
1✔
93
  std::scoped_lock lock{pipelines_mutex_};
2✔
94
  PipelineDescriptor& descriptor = getPipelineById(pipeline_id);
1✔
95
  descriptor.name = new_name;
1✔
96
}
1✔
97

98
void PipelineManager::deletePipeline(uint32_t pipeline_id) {
2✔
99
  std::scoped_lock lock{pipelines_mutex_};
4✔
100
  PipelineDescriptor descriptor = getPipelineById(pipeline_id);
4✔
101
  filesystem_.deleteModel(descriptor.filename);
2✔
102
  pipelines_.remove_if([pipeline_id](const auto& pipeline) { return pipeline.id == pipeline_id; });
4✔
103
}
2✔
104

105
// Returns a copy of the most recently published metrics of the given pipeline, or
// default-constructed metrics when nothing has been published for that ID.
Pipeline::PipelineMetrics PipelineManager::getPipelineMetrics(uint32_t pipeline_id) {
  std::scoped_lock lock(metrics_mutex_);
  const auto entry = metrics_.find(pipeline_id);
  if (entry == metrics_.end()) {
    return Pipeline::PipelineMetrics{};
  }
  return entry->second;
}
113

114
// Serializes the pipeline with the given ID into its JSON model representation.
//
// Throws std::invalid_argument (from getPipelineById) if no pipeline has this ID.
nlohmann::json PipelineManager::getJsonForPipeline(uint32_t pipeline_id) {
  std::scoped_lock lock{pipelines_mutex_};
  return deconstructPipelineToJson(getPipelineById(pipeline_id).pipeline);
}
119

120
void PipelineManager::checkForParameterPresenceInPipeJson(
6✔
121
    const nlohmann::detail::iteration_proxy_value<nlohmann::json::basic_json::const_iterator>& json,
122
    const std::vector<std::string>& parameters) {
123
  if (parameters.empty()) {
6✔
124
    return;
×
125
  }
126
  const std::string& pipe_name = json.key();
6✔
127
  if (!json.value().contains("parameters")) {
6✔
128
    std::string error = "Parameters not defined in model configuration for pipe '";
×
129
    error += pipe_name;
×
130
    error += "'";
×
131
    throw std::invalid_argument(error);
×
132
  }
133
  for (const std::string& parameter : parameters) {
34✔
134
    if (!json.value()["parameters"].contains(parameter)) {
28✔
135
      std::string error = "Required parameter '";
×
136
      error += parameter;
×
137
      error += "' not defined in model configuration for pipe '";
×
138
      error += pipe_name;
×
139
      error += "'";
×
140
      throw std::invalid_argument(error);
×
141
    }
142
  }
143
}
144

145
// Verifies that `json` contains every key listed in `keys`.
//
// Throws std::invalid_argument naming the first key that is missing. An empty `keys` list always
// passes.
void PipelineManager::checkForKeyPresenceInJson(const nlohmann::json& json,
                                                const std::vector<std::string>& keys) {
  for (const std::string& required : keys) {
    if (json.contains(required)) {
      continue;
    }
    throw std::invalid_argument("Required key '" + required + "' not defined");
  }
}
159

160
std::vector<std::tuple<std::shared_ptr<Evaluator>, double, bool>>
161
PipelineManager::constructEvaluatorsFromJson(const nlohmann::json& json) {
3✔
162
  std::vector<std::tuple<std::shared_ptr<Evaluator>, double, bool>> evaluators;
3✔
163
  for (const auto& evaluator_json : json.items()) {
7✔
164
    checkForKeyPresenceInJson(evaluator_json.value(), {"type", "weight", "invert_logic"});
12✔
165
    const std::string type = evaluator_json.value()["type"].get<std::string>();
6✔
166

167
    std::shared_ptr<Evaluator> evaluator = nullptr;
6✔
168
    double weight = 0;
3✔
169
    bool invert_logic = false;
3✔
170

171
    if (type == "AggregationEvaluator") {
3✔
172
      evaluator = constructAggregationEvaluatorFromJson(evaluator_json.value());
×
173
      weight = evaluator_json.value()["weight"].get<double>();
×
174
      invert_logic = evaluator_json.value()["invert_logic"].get<bool>();
×
175
    } else if (type == "MazeEvaluator") {
3✔
176
      evaluator = constructMazeEvaluatorFromJson(evaluator_json.value());
2✔
177
      weight = evaluator_json.value()["weight"].get<double>();
2✔
178
      invert_logic = evaluator_json.value()["invert_logic"].get<bool>();
2✔
179
    } else {
180
      throw std::invalid_argument("Invalid evaluator type: " + type);
1✔
181
    }
182

183
    evaluators.emplace_back(evaluator, weight, invert_logic);
2✔
184
  }
185
  return evaluators;
2✔
186
}
187

188
std::shared_ptr<Evaluator>
189
PipelineManager::constructAggregationEvaluatorFromJson(const nlohmann::json& json) {
×
190
  auto evaluator = std::make_shared<AggregationEvaluator>();
×
191
  if (json.contains("parameters") && json["parameters"].contains("evaluators")) {
×
192
    const auto evaluator_triplets = constructEvaluatorsFromJson(json["parameters"]["evaluators"]);
×
193
    for (const auto& [sub_evaluator, weight, invert_logic] : evaluator_triplets) {
×
194
      std::dynamic_pointer_cast<AggregationEvaluator>(evaluator)->addEvaluator(
×
195
          sub_evaluator, weight, invert_logic);
×
196
    }
197
  }
198
  return evaluator;
×
199
}
200

201
std::shared_ptr<Evaluator>
202
PipelineManager::constructMazeEvaluatorFromJson(const nlohmann::json& json) {
2✔
203
  checkForKeyPresenceInJson(json, {"parameters"});
4✔
204
  checkForKeyPresenceInJson(json["parameters"], {"rows", "cols", "difficulty", "max_steps"});
10✔
205
  const uint32_t rows = json["parameters"]["rows"].get<uint32_t>();
2✔
206
  const uint32_t cols = json["parameters"]["cols"].get<uint32_t>();
2✔
207
  const double difficulty = json["parameters"]["difficulty"].get<double>();
2✔
208
  const uint32_t max_steps = json["parameters"]["max_steps"].get<uint32_t>();
2✔
209
  return std::make_shared<MazeEvaluator>(rows, cols, difficulty, max_steps);
2✔
210
}
211

212
std::shared_ptr<Pipeline> PipelineManager::constructPipelineFromJson(const nlohmann::json& json) {
8✔
213
  std::shared_ptr<Pipeline> pipeline = std::make_shared<Pipeline>();
8✔
214
  std::map<std::string, std::shared_ptr<Pipe>, std::less<>> created_pipes;
16✔
215
  if (json.contains("pipes") && !json["pipes"].is_null()) {
8✔
216
    for (const auto& pipe : json["pipes"].items()) {
18✔
217
      const std::string& pipe_name = pipe.key();
7✔
218
      if (!pipe.value().contains("type") || pipe.value()["type"].is_null()) {
7✔
219
        throw std::invalid_argument("Type must be defined for pipe '" + pipe_name + "'");
1✔
220
      }
221
      const std::string& pipe_type = pipe.value()["type"].get<std::string>();
12✔
222

223
      if (pipe_type == "ProgramFactoryPipe") {
6✔
224
        checkForParameterPresenceInPipeJson(pipe,
14✔
225
                                            {"factory",
226
                                             "max_candidates",
227
                                             "max_size",
228
                                             "memory_variables",
229
                                             "string_table_items",
230
                                             "string_table_item_length"});
12✔
231
        const uint32_t max_candidates =
232
            pipe.value()["parameters"]["max_candidates"].get<uint32_t>();
2✔
233
        const uint32_t max_size = pipe.value()["parameters"]["max_size"].get<uint32_t>();
2✔
234
        const uint32_t memory_variables =
235
            pipe.value()["parameters"]["memory_variables"].get<uint32_t>();
2✔
236
        const uint32_t string_table_items =
237
            pipe.value()["parameters"]["string_table_items"].get<uint32_t>();
2✔
238
        const uint32_t string_table_item_length =
239
            pipe.value()["parameters"]["string_table_item_length"].get<uint32_t>();
2✔
240

241
        const std::string factory_type = pipe.value()["parameters"]["factory"].get<std::string>();
4✔
242
        std::shared_ptr<ProgramFactoryBase> factory = nullptr;
4✔
243
        if (factory_type == "RandomProgramFactory") {
2✔
244
          factory = std::make_shared<RandomProgramFactory>();
1✔
245
        } else {
246
          throw std::invalid_argument("Invalid program factory type '" + factory_type + "'");
1✔
247
        }
248

249
        created_pipes[pipe_name] = std::make_shared<ProgramFactoryPipe>(max_candidates,
2✔
250
                                                                        max_size,
251
                                                                        memory_variables,
252
                                                                        string_table_items,
253
                                                                        string_table_item_length,
254
                                                                        factory);
1✔
255
        pipeline->addPipe(pipe_name, created_pipes[pipe_name]);
1✔
256
      } else if (pipe_type == "NullSinkPipe") {
4✔
257
        checkForParameterPresenceInPipeJson(pipe, {"max_candidates"});
2✔
258
        const uint32_t max_candidates =
259
            pipe.value()["parameters"]["max_candidates"].get<uint32_t>();
1✔
260
        created_pipes[pipe_name] = std::make_shared<NullSinkPipe>(max_candidates);
1✔
261
        pipeline->addPipe(pipe_name, created_pipes[pipe_name]);
1✔
262
      } else if (pipe_type == "EvaluatorPipe") {
3✔
263
        checkForParameterPresenceInPipeJson(pipe,
18✔
264
                                            {"evaluators",
265
                                             "max_candidates",
266
                                             "memory_variables",
267
                                             "string_table_items",
268
                                             "string_table_item_length"});
15✔
269
        const uint32_t max_candidates =
270
            pipe.value()["parameters"]["max_candidates"].get<uint32_t>();
3✔
271
        const uint32_t memory_variables =
272
            pipe.value()["parameters"]["memory_variables"].get<uint32_t>();
3✔
273
        const uint32_t string_table_items =
274
            pipe.value()["parameters"]["string_table_items"].get<uint32_t>();
3✔
275
        const uint32_t string_table_item_length =
276
            pipe.value()["parameters"]["string_table_item_length"].get<uint32_t>();
3✔
277

278
        auto evaluator_pipe = std::make_shared<EvaluatorPipe>(
279
            max_candidates, memory_variables, string_table_items, string_table_item_length);
6✔
280
        created_pipes[pipe_name] = evaluator_pipe;
3✔
281

282
        const auto evaluator_triplets =
283
            constructEvaluatorsFromJson(pipe.value()["parameters"]["evaluators"]);
5✔
284
        for (const auto& evaluator_triplet : evaluator_triplets) {
4✔
285
          std::shared_ptr<Evaluator> evaluator = nullptr;
4✔
286
          double weight = 0.0;
2✔
287
          bool invert_logic = false;
2✔
288
          std::tie(evaluator, weight, invert_logic) = evaluator_triplet;
2✔
289
          evaluator_pipe->addEvaluator(evaluator, weight, invert_logic);
2✔
290
        }
291

292
        pipeline->addPipe(pipe_name, created_pipes[pipe_name]);
2✔
293
      }
294
    }
295
  }
296
  if (json.contains("connections") && !json["connections"].is_null()) {
5✔
297
    for (const auto& connection : json["connections"].items()) {
×
298
      if (!connection.value().contains("source_pipe")) {
×
299
        throw std::invalid_argument("source_pipe parameter required in connection");
×
300
      }
301
      if (!connection.value().contains("source_slot")) {
×
302
        throw std::invalid_argument("source_slot parameter required in connection");
×
303
      }
304
      if (!connection.value().contains("destination_pipe")) {
×
305
        throw std::invalid_argument("destination_pipe parameter required in connection");
×
306
      }
307
      if (!connection.value().contains("destination_slot")) {
×
308
        throw std::invalid_argument("destination_slot parameter required in connection");
×
309
      }
310
      if (!connection.value().contains("buffer_size")) {
×
311
        throw std::invalid_argument("buffer_size parameter required in connection");
×
312
      }
313
      const std::string source_pipe = connection.value()["source_pipe"].get<std::string>();
×
314
      const uint32_t source_slot = connection.value()["source_slot"].get<uint32_t>();
×
315
      const std::string destination_pipe =
316
          connection.value()["destination_pipe"].get<std::string>();
×
317
      const uint32_t destination_slot = connection.value()["destination_slot"].get<uint32_t>();
×
318
      const uint32_t buffer_size = connection.value()["buffer_size"].get<uint32_t>();
×
319

320
      if (created_pipes.find(source_pipe) == created_pipes.end()) {
×
321
        throw std::invalid_argument("Source pipe '" + source_pipe + "' not found");
×
322
      }
323
      if (created_pipes.find(destination_pipe) == created_pipes.end()) {
×
324
        throw std::invalid_argument("Destination pipe '" + destination_pipe + "' not found");
×
325
      }
326

327
      pipeline->connectPipes(created_pipes[source_pipe],
×
328
                             source_slot,
329
                             created_pipes[destination_pipe],
×
330
                             destination_slot,
331
                             buffer_size);
332
    }
333
  }
334
  return pipeline;
10✔
335
}
336

337
// Serializes evaluator descriptions into the JSON layout read by constructEvaluatorsFromJson().
//
// Every entry carries "weight" and "invert_logic". AggregationEvaluator entries additionally get
// "type" and their children (recursively) under "parameters"."evaluators"; MazeEvaluator entries
// get "type" and their rows/cols/difficulty/max_steps under "parameters". Evaluators of any other
// class are written without "type" and "parameters".
nlohmann::json PipelineManager::deconstructEvaluatorsToJson(
    const std::vector<AggregationEvaluator::EvaluatorDescription>& descriptions) {
  nlohmann::json serialized = {};

  for (const auto& description : descriptions) {
    nlohmann::json entry = nlohmann::json::object();

    if (const auto aggregation =
            std::dynamic_pointer_cast<AggregationEvaluator>(description.evaluator)) {
      entry["type"] = "AggregationEvaluator";
      entry["parameters"]["evaluators"] = deconstructEvaluatorsToJson(aggregation->getEvaluators());
    } else if (const auto maze =
                   std::dynamic_pointer_cast<MazeEvaluator>(description.evaluator)) {
      entry["type"] = "MazeEvaluator";
      nlohmann::json& params = entry["parameters"];
      params["rows"] = maze->getRows();
      params["cols"] = maze->getCols();
      params["difficulty"] = maze->getDifficulty();
      params["max_steps"] = maze->getMaxSteps();
    }

    entry["weight"] = description.weight;
    entry["invert_logic"] = description.invert_logic;

    serialized.push_back(std::move(entry));
  }

  return serialized;
}
365

366
// Serializes a pipeline into its JSON model: one entry per pipe under "pipes" (keyed by pipe
// name) and the list of connections under "connections".
//
// The concrete pipe class decides what is written: EvaluatorPipe, NullSinkPipe and
// ProgramFactoryPipe get a "type" plus "parameters"; EvolutionPipe gets only its "type"; anything
// else is written with type "Unknown". A program factory that is not a RandomProgramFactory is
// written as "Unknown" as well.
nlohmann::json
PipelineManager::deconstructPipelineToJson(const std::shared_ptr<Pipeline>& pipeline) {
  nlohmann::json model;

  for (const auto& pipe : pipeline->getPipes()) {
    nlohmann::json entry;

    // The order of the casts is kept as is; it decides which branch wins should a pipe class
    // derive from another one.
    if (const auto evaluator_pipe = std::dynamic_pointer_cast<EvaluatorPipe>(pipe->pipe)) {
      entry["type"] = "EvaluatorPipe";
      nlohmann::json& params = entry["parameters"];
      params["max_candidates"] = pipe->pipe->getMaxCandidates();
      params["memory_variables"] = evaluator_pipe->getMemorySize();
      params["string_table_item_length"] = evaluator_pipe->getStringTableItemLength();
      params["string_table_items"] = evaluator_pipe->getStringTableSize();
      params["evaluators"] = deconstructEvaluatorsToJson(evaluator_pipe->getEvaluators());
    } else if (std::dynamic_pointer_cast<EvolutionPipe>(pipe->pipe)) {
      entry["type"] = "EvolutionPipe";
    } else if (std::dynamic_pointer_cast<NullSinkPipe>(pipe->pipe)) {
      entry["type"] = "NullSinkPipe";
      entry["parameters"]["max_candidates"] = pipe->pipe->getMaxCandidates();
    } else if (auto factory_pipe = std::dynamic_pointer_cast<ProgramFactoryPipe>(pipe->pipe)) {
      entry["type"] = "ProgramFactoryPipe";
      nlohmann::json& params = entry["parameters"];
      params["max_candidates"] = pipe->pipe->getMaxCandidates();
      params["max_size"] = factory_pipe->getMaxSize();
      params["memory_variables"] = factory_pipe->getMemorySize();
      params["string_table_item_length"] = factory_pipe->getStringTableItemLength();
      params["string_table_items"] = factory_pipe->getStringTableSize();

      const auto factory = factory_pipe->getFactory();
      const bool is_random_factory =
          std::dynamic_pointer_cast<RandomProgramFactory>(factory) != nullptr;
      if (is_random_factory) {
        params["factory"] = "RandomProgramFactory";
      } else {
        params["factory"] = "Unknown";
      }
    } else {
      entry["type"] = "Unknown";
    }

    model["pipes"][pipe->name] = std::move(entry);
  }

  nlohmann::json links = {};
  for (const auto& connection : pipeline->getConnections()) {
    nlohmann::json link;
    link["source_pipe"] = connection->source_pipe->name;
    link["source_slot"] = connection->source_slot_index;
    link["destination_pipe"] = connection->destination_pipe->name;
    link["destination_slot"] = connection->destination_slot_index;
    link["buffer_size"] = connection->buffer_size;

    links.push_back(std::move(link));
  }
  model["connections"] = std::move(links);

  return model;
}
423

424
uint32_t PipelineManager::getFreeId() const {
19✔
425
  uint32_t new_id = -1;
19✔
426
  std::list<PipelineDescriptor>::const_iterator iter;
19✔
427
  do {
2✔
428
    new_id++;
21✔
429
    iter = std::find_if(pipelines_.begin(), pipelines_.end(), [new_id](const auto& pipeline) {
4✔
430
      return pipeline.id == new_id;
4✔
431
    });
21✔
432
  } while (iter != pipelines_.end());
21✔
433
  return new_id;
19✔
434
}
435

436
void PipelineManager::metricsCollectorWorker() {
31✔
437
  std::unordered_map<uint32_t, std::deque<Pipeline::PipelineMetrics>> metrics_cache;
62✔
438
  while (should_run_metrics_collector_) {
61✔
439
    // Remove the oldest element from the cache for each pipeline if maximum window size is reached.
440
    for (auto& metrics_pair : metrics_cache) {
30✔
441
      if (metrics_pair.second.size() >= metrics_window_size_) {
×
442
        metrics_pair.second.pop_back();
×
443
      }
444
    }
445

446
    // Collect the current metrics and push them into the cache.
447
    {
448
      std::scoped_lock lock(pipelines_mutex_);
60✔
449
      for (auto& descriptor : pipelines_) {
35✔
450
        metrics_cache[descriptor.id].push_front(descriptor.pipeline->getMetrics());
5✔
451
      }
452
    }
453

454
    // Calculate the current metrics.
455
    std::unordered_map<uint32_t, Pipeline::PipelineMetrics> metrics;
60✔
456
    for (auto& metrics_pair : metrics_cache) {
35✔
457
      const uint32_t pipeline_id = metrics_pair.first;
5✔
458
      const std::deque<Pipeline::PipelineMetrics>& metrics_history = metrics_pair.second;
5✔
459

460
      if (metrics_history.empty()) {
5✔
461
        continue;
×
462
      }
463

464
      const std::chrono::duration<double> elapsed_time =
465
          std::chrono::system_clock::now() - metrics_history.back().measure_time_start;
5✔
466
      const double duration_seconds = elapsed_time.count();
5✔
467

468
      Pipeline::PipelineMetrics pipeline_metrics;
10✔
469
      pipeline_metrics.measure_time_start = std::chrono::system_clock::now();
5✔
470

471
      std::unordered_map<std::string, Pipeline::PipeMetrics>& pipe_counters =
5✔
472
          pipeline_metrics.pipes;
473

474
      // Sum up all execution counts, inputs received, and outputs sent from all history items
475
      for (const auto& pipeline_metrics : metrics_history) {
10✔
476
        for (const auto& pipe_pair : pipeline_metrics.pipes) {
5✔
477
          const std::string& pipe_name = pipe_pair.first;
×
478
          const Pipeline::PipeMetrics& current_pipe_metrics = pipe_pair.second;
×
479
          Pipeline::PipeMetrics& current_counter = pipe_counters[pipe_name];
×
480

481
          current_counter.execution_count += current_pipe_metrics.execution_count;
×
482

483
          for (const auto& input_pair : current_pipe_metrics.inputs_received) {
×
484
            current_counter.inputs_received[input_pair.first] += input_pair.second;
×
485
          }
486
          for (const auto& output_pair : current_pipe_metrics.outputs_sent) {
×
487
            current_counter.outputs_sent[output_pair.first] += output_pair.second;
×
488
          }
489
        }
490
      }
491

492
      // Divide by the count of history items and convert to per-second rates
493
      for (const auto& pipe_pair : pipe_counters) {
5✔
494
        const std::string& pipe_name = pipe_pair.first;
×
495
        const Pipeline::PipeMetrics& counters = pipe_pair.second;
×
496
        Pipeline::PipeMetrics& averaged_metrics = pipeline_metrics.pipes[pipe_name];
×
497

498
        averaged_metrics.execution_count = counters.execution_count / duration_seconds;
×
499

500
        for (const auto& input_pair : counters.inputs_received) {
×
501
          averaged_metrics.inputs_received[input_pair.first] = input_pair.second / duration_seconds;
×
502
        }
503
        for (const auto& output_pair : counters.outputs_sent) {
×
504
          averaged_metrics.outputs_sent[output_pair.first] = output_pair.second / duration_seconds;
×
505
        }
506
      }
507

508
      // Store the resulting metrics for the pipeline.
509
      metrics[pipeline_id] = pipeline_metrics;
5✔
510
    }
511

512
    // Store the resulting metrics.
513
    {
514
      std::scoped_lock lock(metrics_mutex_);
60✔
515
      metrics_ = metrics;
30✔
516
    }
517

518
    // Limit cycle time.
519
    std::chrono::milliseconds interval_time(metrics_interval_time_);
30✔
520
    std::this_thread::sleep_for(interval_time);
30✔
521
  }
522
}
31✔
523

524
} // namespace beast
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc