• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dedicate-project / beast / 5769674b-2abc-4901-862a-bb80b2dc41c5

pending completion
5769674b-2abc-4901-862a-bb80b2dc41c5

Pull #9

circleci

fairlight1337
Introduced metrics functions for pipelines
Pull Request #9: Adding pipeline applications

1281 of 1281 new or added lines in 23 files covered. (100.0%)

2873 of 3126 relevant lines covered (91.91%)

14977.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.02
/src/pipeline_manager.cpp
1
#include <beast/pipeline_manager.hpp>
2

3
// Internal
4
#include <beast/pipes/evaluator_pipe.hpp>
5
#include <beast/pipes/evolution_pipe.hpp>
6
#include <beast/pipes/null_sink_pipe.hpp>
7
#include <beast/pipes/program_factory_pipe.hpp>
8

9
#include <beast/program_factory_base.hpp>
10
#include <beast/random_program_factory.hpp>
11

12
#include <beast/evaluators/maze_evaluator.hpp>
13

14
namespace beast {
15

16
/**
 * @brief Creates the manager and registers every model found in the model store.
 *
 * The filesystem helper is initialised with @p storage_path. Each entry returned by
 * loadModels() is turned into one descriptor: it receives a fresh id, the stored name,
 * filename and metadata, and a pipeline rebuilt from the stored model JSON.
 *
 * @param storage_path Location handed to the filesystem helper.
 */
PipelineManager::PipelineManager(const std::string& storage_path) : filesystem_(storage_path) {
  std::scoped_lock guard{pipelines_mutex_};
  for (const auto& stored_model : filesystem_.loadModels()) {
    // Name, model and metadata all live below the "content" key.
    const auto& content = stored_model["content"];

    PipelineDescriptor loaded;
    loaded.id = getFreeId();
    loaded.name = content["name"];
    loaded.filename = stored_model["filename"];
    loaded.pipeline = constructPipelineFromJson(content["model"]);
    loaded.metadata = content["metadata"];

    pipelines_.push_back(std::move(loaded));
  }
}
31✔
28

29
uint32_t PipelineManager::createPipeline(const std::string& name) {
18✔
30
  std::scoped_lock lock{pipelines_mutex_};
36✔
31
  nlohmann::json model;
36✔
32
  model["pipes"] = nlohmann::json::array();
18✔
33
  model["connections"] = {};
18✔
34
  const std::string filename = filesystem_.saveModel(name, model);
36✔
35
  const uint32_t new_id = getFreeId();
18✔
36

37
  PipelineDescriptor descriptor;
18✔
38
  descriptor.id = new_id;
18✔
39
  descriptor.name = name;
18✔
40
  descriptor.filename = filename;
18✔
41
  descriptor.pipeline = std::make_shared<Pipeline>();
18✔
42

43
  pipelines_.push_back(descriptor);
18✔
44

45
  return new_id;
36✔
46
}
47

48
void PipelineManager::savePipeline(uint32_t pipeline_id) {
×
49
  const auto& descriptor = getPipelineById(pipeline_id);
×
50
  const auto model = deconstructPipelineToJson(descriptor.pipeline);
×
51
  filesystem_.updateModel(descriptor.filename, descriptor.name, model, descriptor.metadata);
×
52
}
×
53

54
/**
 * @brief Looks up the descriptor registered under @p pipeline_id.
 *
 * This function does not acquire pipelines_mutex_ itself.
 *
 * @param pipeline_id Identifier to search for.
 * @return Reference to the matching descriptor stored in pipelines_.
 * @throws std::invalid_argument if no descriptor carries that id.
 */
PipelineManager::PipelineDescriptor& PipelineManager::getPipelineById(uint32_t pipeline_id) {
  const auto match = std::find_if(
      pipelines_.begin(), pipelines_.end(), [pipeline_id](const PipelineDescriptor& candidate) {
        return candidate.id == pipeline_id;
      });
  if (match == pipelines_.end()) {
    throw std::invalid_argument("Pipeline with this ID not found: " + std::to_string(pipeline_id));
  }
  return *match;
}
62

63
const std::list<PipelineManager::PipelineDescriptor>& PipelineManager::getPipelines() const {
3✔
64
  return pipelines_;
3✔
65
}
66

67
void PipelineManager::updatePipelineName(uint32_t pipeline_id, const std::string_view new_name) {
1✔
68
  std::scoped_lock lock{pipelines_mutex_};
2✔
69
  PipelineDescriptor& descriptor = getPipelineById(pipeline_id);
1✔
70
  descriptor.name = new_name;
1✔
71
}
1✔
72

73
void PipelineManager::deletePipeline(uint32_t pipeline_id) {
2✔
74
  std::scoped_lock lock{pipelines_mutex_};
4✔
75
  PipelineDescriptor descriptor = getPipelineById(pipeline_id);
4✔
76
  filesystem_.deleteModel(descriptor.filename);
2✔
77
  pipelines_.remove_if([pipeline_id](const auto& pipeline) { return pipeline.id == pipeline_id; });
4✔
78
}
2✔
79

80
/**
 * @brief Returns the metrics reported by one registered pipeline.
 *
 * The descriptor lookup runs under pipelines_mutex_, like the other functions that go
 * through getPipelineById(). The pipeline pointer is copied while the lock is held and
 * getMetrics() is called after it has been released, so the registry is not blocked
 * while metrics are collected.
 *
 * @param pipeline_id Identifier of the pipeline to query.
 * @return Result of Pipeline::getMetrics() for that pipeline.
 * @throws std::invalid_argument if no pipeline with that id is registered.
 */
Pipeline::PipelineMetrics PipelineManager::getPipelineMetrics(uint32_t pipeline_id) {
  const auto pipeline = [this, pipeline_id] {
    std::scoped_lock lock{pipelines_mutex_};
    return getPipelineById(pipeline_id).pipeline;
  }();
  return pipeline->getMetrics();
}
83

84
/**
 * @brief Serialises one registered pipeline to JSON.
 *
 * @param pipeline_id Identifier of the pipeline to serialise.
 * @return Result of deconstructPipelineToJson() for that pipeline.
 * @throws std::invalid_argument if no pipeline with that id is registered.
 */
nlohmann::json PipelineManager::getJsonForPipeline(uint32_t pipeline_id) {
  std::scoped_lock lock{pipelines_mutex_};
  // Reference instead of a copy: only the pipeline pointer is read.
  const PipelineDescriptor& descriptor = getPipelineById(pipeline_id);
  return deconstructPipelineToJson(descriptor.pipeline);
}
89

90
void PipelineManager::checkForParameterPresenceInPipeJson(
5✔
91
    const nlohmann::detail::iteration_proxy_value<nlohmann::json::basic_json::const_iterator>& json,
92
    const std::vector<std::string>& parameters) {
93
  if (parameters.empty()) {
5✔
94
    return;
×
95
  }
96
  const std::string& pipe_name = json.key();
5✔
97
  if (!json.value().contains("parameters")) {
5✔
98
    std::string error = "Parameters not defined in model configuration for pipe '";
×
99
    error += pipe_name;
×
100
    error += "'";
×
101
    throw std::invalid_argument(error);
×
102
  }
103
  for (const std::string& parameter : parameters) {
32✔
104
    if (!json.value()["parameters"].contains(parameter)) {
27✔
105
      std::string error = "Required parameter '";
×
106
      error += parameter;
×
107
      error += "' not defined in model configuration for pipe '";
×
108
      error += pipe_name;
×
109
      error += "'";
×
110
      throw std::invalid_argument(error);
×
111
    }
112
  }
113
}
114

115
/**
 * @brief Verifies that a JSON value contains every listed key.
 *
 * @param json JSON value to inspect.
 * @param keys Keys that must be present; an empty list always passes.
 * @throws std::invalid_argument naming the first key that is missing.
 */
void PipelineManager::checkForKeyPresenceInJson(const nlohmann::json& json,
                                                const std::vector<std::string>& keys) {
  for (const std::string& required : keys) {
    if (json.contains(required)) {
      continue;
    }
    throw std::invalid_argument("Required key '" + required + "' not defined");
  }
}
129

130
std::vector<std::tuple<std::shared_ptr<Evaluator>, double, bool>>
131
PipelineManager::constructEvaluatorsFromJson(const nlohmann::json& json) {
3✔
132
  std::vector<std::tuple<std::shared_ptr<Evaluator>, double, bool>> evaluators;
3✔
133
  for (const auto& evaluator_json : json.items()) {
7✔
134
    checkForKeyPresenceInJson(evaluator_json.value(), {"type", "weight", "invert_logic"});
12✔
135
    const std::string type = evaluator_json.value()["type"].get<std::string>();
6✔
136

137
    std::shared_ptr<Evaluator> evaluator = nullptr;
6✔
138
    double weight = 0;
3✔
139
    bool invert_logic = false;
3✔
140

141
    if (type == "AggregationEvaluator") {
3✔
142
      evaluator = constructAggregationEvaluatorFromJson(evaluator_json.value());
×
143
      weight = evaluator_json.value()["weight"].get<double>();
×
144
      invert_logic = evaluator_json.value()["invert_logic"].get<bool>();
×
145
    } else if (type == "MazeEvaluator") {
3✔
146
      evaluator = constructMazeEvaluatorFromJson(evaluator_json.value());
2✔
147
      weight = evaluator_json.value()["weight"].get<double>();
2✔
148
      invert_logic = evaluator_json.value()["invert_logic"].get<bool>();
2✔
149
    } else {
150
      throw std::invalid_argument("Invalid evaluator type: " + type);
1✔
151
    }
152

153
    evaluators.emplace_back(evaluator, weight, invert_logic);
2✔
154
  }
155
  return evaluators;
2✔
156
}
157

158
std::shared_ptr<Evaluator>
159
PipelineManager::constructAggregationEvaluatorFromJson(const nlohmann::json& json) {
×
160
  auto evaluator = std::make_shared<AggregationEvaluator>();
×
161
  if (json.contains("parameters") && json["parameters"].contains("evaluators")) {
×
162
    const auto evaluator_triplets = constructEvaluatorsFromJson(json["parameters"]["evaluators"]);
×
163
    for (const auto& [sub_evaluator, weight, invert_logic] : evaluator_triplets) {
×
164
      std::dynamic_pointer_cast<AggregationEvaluator>(evaluator)->addEvaluator(
×
165
          sub_evaluator, weight, invert_logic);
×
166
    }
167
  }
168
  return evaluator;
×
169
}
170

171
std::shared_ptr<Evaluator>
172
PipelineManager::constructMazeEvaluatorFromJson(const nlohmann::json& json) {
2✔
173
  checkForKeyPresenceInJson(json, {"parameters"});
4✔
174
  checkForKeyPresenceInJson(json["parameters"], {"rows", "cols", "difficulty", "max_steps"});
10✔
175
  const uint32_t rows = json["parameters"]["rows"].get<uint32_t>();
2✔
176
  const uint32_t cols = json["parameters"]["cols"].get<uint32_t>();
2✔
177
  const double difficulty = json["parameters"]["difficulty"].get<double>();
2✔
178
  const uint32_t max_steps = json["parameters"]["max_steps"].get<uint32_t>();
2✔
179
  return std::make_shared<MazeEvaluator>(rows, cols, difficulty, max_steps);
2✔
180
}
181

182
std::shared_ptr<Pipeline> PipelineManager::constructPipelineFromJson(const nlohmann::json& json) {
8✔
183
  std::shared_ptr<Pipeline> pipeline = std::make_shared<Pipeline>();
8✔
184
  std::map<std::string, std::shared_ptr<Pipe>, std::less<>> created_pipes;
16✔
185
  if (json.contains("pipes") && !json["pipes"].is_null()) {
8✔
186
    for (const auto& pipe : json["pipes"].items()) {
18✔
187
      const std::string& pipe_name = pipe.key();
7✔
188
      if (!pipe.value().contains("type") || pipe.value()["type"].is_null()) {
7✔
189
        throw std::invalid_argument("Type must be defined for pipe '" + pipe_name + "'");
1✔
190
      }
191
      const std::string& pipe_type = pipe.value()["type"].get<std::string>();
12✔
192

193
      if (pipe_type == "ProgramFactoryPipe") {
6✔
194
        checkForParameterPresenceInPipeJson(pipe,
14✔
195
                                            {"factory",
196
                                             "max_candidates",
197
                                             "max_size",
198
                                             "memory_variables",
199
                                             "string_table_items",
200
                                             "string_table_item_length"});
12✔
201
        const uint32_t max_candidates =
202
            pipe.value()["parameters"]["max_candidates"].get<uint32_t>();
2✔
203
        const uint32_t max_size = pipe.value()["parameters"]["max_size"].get<uint32_t>();
2✔
204
        const uint32_t memory_variables =
205
            pipe.value()["parameters"]["memory_variables"].get<uint32_t>();
2✔
206
        const uint32_t string_table_items =
207
            pipe.value()["parameters"]["string_table_items"].get<uint32_t>();
2✔
208
        const uint32_t string_table_item_length =
209
            pipe.value()["parameters"]["string_table_item_length"].get<uint32_t>();
2✔
210

211
        const std::string factory_type = pipe.value()["parameters"]["factory"].get<std::string>();
4✔
212
        std::shared_ptr<ProgramFactoryBase> factory = nullptr;
4✔
213
        if (factory_type == "RandomProgramFactory") {
2✔
214
          factory = std::make_shared<RandomProgramFactory>();
1✔
215
        } else {
216
          throw std::invalid_argument("Invalid program factory type '" + factory_type + "'");
1✔
217
        }
218

219
        created_pipes[pipe_name] = std::make_shared<ProgramFactoryPipe>(max_candidates,
2✔
220
                                                                        max_size,
221
                                                                        memory_variables,
222
                                                                        string_table_items,
223
                                                                        string_table_item_length,
224
                                                                        factory);
1✔
225
        pipeline->addPipe(pipe_name, created_pipes[pipe_name]);
1✔
226
      } else if (pipe_type == "NullSinkPipe") {
4✔
227
        created_pipes[pipe_name] = std::make_shared<NullSinkPipe>();
1✔
228
        pipeline->addPipe(pipe_name, created_pipes[pipe_name]);
1✔
229
      } else if (pipe_type == "EvaluatorPipe") {
3✔
230
        checkForParameterPresenceInPipeJson(pipe,
18✔
231
                                            {"evaluators",
232
                                             "max_candidates",
233
                                             "memory_variables",
234
                                             "string_table_items",
235
                                             "string_table_item_length"});
15✔
236
        const uint32_t max_candidates =
237
            pipe.value()["parameters"]["max_candidates"].get<uint32_t>();
3✔
238
        const uint32_t memory_variables =
239
            pipe.value()["parameters"]["memory_variables"].get<uint32_t>();
3✔
240
        const uint32_t string_table_items =
241
            pipe.value()["parameters"]["string_table_items"].get<uint32_t>();
3✔
242
        const uint32_t string_table_item_length =
243
            pipe.value()["parameters"]["string_table_item_length"].get<uint32_t>();
3✔
244

245
        auto evaluator_pipe = std::make_shared<EvaluatorPipe>(
246
            max_candidates, memory_variables, string_table_items, string_table_item_length);
6✔
247
        created_pipes[pipe_name] = evaluator_pipe;
3✔
248

249
        const auto evaluator_triplets =
250
            constructEvaluatorsFromJson(pipe.value()["parameters"]["evaluators"]);
5✔
251
        for (const auto& evaluator_triplet : evaluator_triplets) {
4✔
252
          std::shared_ptr<Evaluator> evaluator = nullptr;
4✔
253
          double weight = 0.0;
2✔
254
          bool invert_logic = false;
2✔
255
          std::tie(evaluator, weight, invert_logic) = evaluator_triplet;
2✔
256
          evaluator_pipe->addEvaluator(evaluator, weight, invert_logic);
2✔
257
        }
258

259
        pipeline->addPipe(pipe_name, created_pipes[pipe_name]);
2✔
260
      }
261
    }
262
  }
263
  if (json.contains("connections") && !json["connections"].is_null()) {
5✔
264
    for (const auto& connection : json["connections"].items()) {
×
265
      if (!connection.value().contains("source_pipe")) {
×
266
        throw std::invalid_argument("source_pipe parameter required in connection");
×
267
      }
268
      if (!connection.value().contains("source_slot")) {
×
269
        throw std::invalid_argument("source_slot parameter required in connection");
×
270
      }
271
      if (!connection.value().contains("destination_pipe")) {
×
272
        throw std::invalid_argument("destination_pipe parameter required in connection");
×
273
      }
274
      if (!connection.value().contains("destination_slot")) {
×
275
        throw std::invalid_argument("destination_slot parameter required in connection");
×
276
      }
277
      if (!connection.value().contains("buffer_size")) {
×
278
        throw std::invalid_argument("buffer_size parameter required in connection");
×
279
      }
280
      const std::string source_pipe = connection.value()["source_pipe"].get<std::string>();
×
281
      const uint32_t source_slot = connection.value()["source_slot"].get<uint32_t>();
×
282
      const std::string destination_pipe =
283
          connection.value()["destination_pipe"].get<std::string>();
×
284
      const uint32_t destination_slot = connection.value()["destination_slot"].get<uint32_t>();
×
285
      const uint32_t buffer_size = connection.value()["buffer_size"].get<uint32_t>();
×
286

287
      if (created_pipes.find(source_pipe) == created_pipes.end()) {
×
288
        throw std::invalid_argument("Source pipe '" + source_pipe + "' not found");
×
289
      }
290
      if (created_pipes.find(destination_pipe) == created_pipes.end()) {
×
291
        throw std::invalid_argument("Destination pipe '" + destination_pipe + "' not found");
×
292
      }
293

294
      pipeline->connectPipes(created_pipes[source_pipe],
×
295
                             source_slot,
296
                             created_pipes[destination_pipe],
×
297
                             destination_slot,
298
                             buffer_size);
299
    }
300
  }
301
  return pipeline;
10✔
302
}
303

304
/**
 * @brief Serialises a list of evaluator descriptions to JSON.
 *
 * Each description yields one object with "weight" and "invert_logic". AggregationEvaluator
 * and MazeEvaluator instances additionally get "type" and "parameters"; aggregations are
 * serialised recursively. An evaluator of any other dynamic type gets no "type" entry.
 *
 * @param descriptions Evaluator descriptions to serialise.
 * @return JSON holding one appended object per description; it stays null when
 *         @p descriptions is empty.
 */
nlohmann::json PipelineManager::deconstructEvaluatorsToJson(
    const std::vector<AggregationEvaluator::EvaluatorDescription>& descriptions) {
  nlohmann::json serialized = {};

  for (const auto& entry : descriptions) {
    nlohmann::json item = nlohmann::json::object();
    item["weight"] = entry.weight;
    item["invert_logic"] = entry.invert_logic;

    if (const auto aggregation = std::dynamic_pointer_cast<AggregationEvaluator>(entry.evaluator);
        aggregation != nullptr) {
      item["type"] = "AggregationEvaluator";
      item["parameters"]["evaluators"] = deconstructEvaluatorsToJson(aggregation->getEvaluators());
    } else if (const auto maze = std::dynamic_pointer_cast<MazeEvaluator>(entry.evaluator);
               maze != nullptr) {
      item["type"] = "MazeEvaluator";
      auto& parameters = item["parameters"];
      parameters["rows"] = maze->getRows();
      parameters["cols"] = maze->getCols();
      parameters["difficulty"] = maze->getDifficulty();
      parameters["max_steps"] = maze->getMaxSteps();
    }

    serialized.push_back(std::move(item));
  }

  return serialized;
}
332

333
/**
 * @brief Serialises a pipeline (pipes and connections) to its JSON model.
 *
 * Every pipe is written below "pipes" under its name, with a "type" derived from its
 * dynamic type and, for EvaluatorPipe and ProgramFactoryPipe, a "parameters" object.
 * Pipes of an unrecognised type are written with the type "Unknown". Connections are
 * appended to "connections", which stays null when the pipeline has none.
 *
 * @param pipeline Pipeline to serialise.
 * @return JSON model of the pipeline.
 */
nlohmann::json
PipelineManager::deconstructPipelineToJson(const std::shared_ptr<Pipeline>& pipeline) {
  nlohmann::json model;

  for (const auto& managed : pipeline->getPipes()) {
    nlohmann::json pipe_json;

    // The order of the type checks is kept exactly as it was.
    if (const auto evaluator_pipe = std::dynamic_pointer_cast<EvaluatorPipe>(managed->pipe)) {
      pipe_json["type"] = "EvaluatorPipe";
      auto& parameters = pipe_json["parameters"];
      parameters["max_candidates"] = managed->pipe->getMaxCandidates();
      parameters["memory_variables"] = evaluator_pipe->getMemorySize();
      parameters["string_table_item_length"] = evaluator_pipe->getStringTableItemLength();
      parameters["string_table_items"] = evaluator_pipe->getStringTableSize();
      parameters["evaluators"] = deconstructEvaluatorsToJson(evaluator_pipe->getEvaluators());
    } else if (std::dynamic_pointer_cast<EvolutionPipe>(managed->pipe)) {
      pipe_json["type"] = "EvolutionPipe";
    } else if (std::dynamic_pointer_cast<NullSinkPipe>(managed->pipe)) {
      pipe_json["type"] = "NullSinkPipe";
    } else if (auto factory_pipe = std::dynamic_pointer_cast<ProgramFactoryPipe>(managed->pipe)) {
      pipe_json["type"] = "ProgramFactoryPipe";
      auto& parameters = pipe_json["parameters"];
      parameters["max_candidates"] = managed->pipe->getMaxCandidates();
      parameters["max_size"] = factory_pipe->getMaxSize();
      parameters["memory_variables"] = factory_pipe->getMemorySize();
      parameters["string_table_item_length"] = factory_pipe->getStringTableItemLength();
      parameters["string_table_items"] = factory_pipe->getStringTableSize();

      const auto factory = factory_pipe->getFactory();
      const bool is_random_factory =
          std::dynamic_pointer_cast<RandomProgramFactory>(factory) != nullptr;
      if (is_random_factory) {
        parameters["factory"] = "RandomProgramFactory";
      } else {
        parameters["factory"] = "Unknown";
      }
    } else {
      pipe_json["type"] = "Unknown";
    }

    model["pipes"][managed->name] = std::move(pipe_json);
  }

  nlohmann::json connections = {};
  for (const auto& link : pipeline->getConnections()) {
    nlohmann::json link_json;
    link_json["buffer_size"] = link->buffer_size;
    link_json["destination_pipe"] = link->destination_pipe->name;
    link_json["destination_slot"] = link->destination_slot_index;
    link_json["source_pipe"] = link->source_pipe->name;
    link_json["source_slot"] = link->source_slot_index;

    connections.push_back(std::move(link_json));
  }
  model["connections"] = std::move(connections);

  return model;
}
389

390
uint32_t PipelineManager::getFreeId() const {
19✔
391
  uint32_t new_id = -1;
19✔
392
  std::list<PipelineDescriptor>::const_iterator iter;
19✔
393
  do {
2✔
394
    new_id++;
21✔
395
    iter = std::find_if(pipelines_.begin(), pipelines_.end(), [new_id](const auto& pipeline) {
4✔
396
      return pipeline.id == new_id;
4✔
397
    });
21✔
398
  } while (iter != pipelines_.end());
21✔
399
  return new_id;
19✔
400
}
401

402
} // namespace beast
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc