• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dedicate-project / beast / 5908912e-2396-46c2-b03d-62e16fdb292e

pending completion
5908912e-2396-46c2-b03d-62e16fdb292e

Pull #9

circleci

fairlight1337
Added [[nodiscard]] attributes and fixed code smells
Pull Request #9: Adding pipeline applications

1195 of 1195 new or added lines in 21 files covered. (100.0%)

2835 of 3040 relevant lines covered (93.26%)

16628.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.71
/src/pipeline_manager.cpp
1
#include <beast/pipeline_manager.hpp>
2

3
// Internal
4
#include <beast/pipes/evaluator_pipe.hpp>
5
#include <beast/pipes/evolution_pipe.hpp>
6
#include <beast/pipes/null_sink_pipe.hpp>
7
#include <beast/pipes/program_factory_pipe.hpp>
8

9
#include <beast/program_factory_base.hpp>
10
#include <beast/random_program_factory.hpp>
11

12
#include <beast/evaluators/maze_evaluator.hpp>
13

14
namespace beast {
15

16
PipelineManager::PipelineManager(const std::string& storage_path) : filesystem_(storage_path) {
31✔
17
  std::scoped_lock lock{pipelines_mutex_};
62✔
18
  for (const auto& model : filesystem_.loadModels()) {
32✔
19
    PipelineDescriptor descriptor;
2✔
20
    descriptor.id = getFreeId();
1✔
21
    descriptor.name = model["content"]["name"];
1✔
22
    descriptor.filename = model["filename"];
1✔
23
    descriptor.pipeline = constructPipelineFromJson(model["content"]["model"]);
1✔
24
    descriptor.metadata = model["content"]["metadata"];
1✔
25
    pipelines_.push_back(std::move(descriptor));
1✔
26
  }
27
}
31✔
28

29
uint32_t PipelineManager::createPipeline(const std::string& name) {
18✔
30
  std::scoped_lock lock{pipelines_mutex_};
36✔
31
  nlohmann::json model;
36✔
32
  model["pipes"] = nlohmann::json::array();
18✔
33
  model["connections"] = {};
18✔
34
  // TODO(fairlight1337): Add more data to the model.
35
  const std::string filename = filesystem_.saveModel(name, model);
18✔
36
  const uint32_t new_id = getFreeId();
18✔
37
  pipelines_.push_back({new_id, name, filename, beast::Pipeline()});
18✔
38

39
  return new_id;
36✔
40
}
41

42
void PipelineManager::savePipeline(uint32_t pipeline_id) {
×
43
  const auto& descriptor = getPipelineById(pipeline_id);
×
44
  const auto model = deconstructPipelineToJson(descriptor.pipeline);
×
45
  filesystem_.updateModel(descriptor.filename, descriptor.name, model, descriptor.metadata);
×
46
}
×
47

48
PipelineManager::PipelineDescriptor& PipelineManager::getPipelineById(uint32_t pipeline_id) {
28✔
49
  for (PipelineDescriptor& descriptor : pipelines_) {
28✔
50
    if (descriptor.id == pipeline_id) {
23✔
51
      return descriptor;
23✔
52
    }
53
  }
54
  throw std::invalid_argument("Pipeline with this ID not found: " + std::to_string(pipeline_id));
5✔
55
}
56

57
const std::list<PipelineManager::PipelineDescriptor>& PipelineManager::getPipelines() const {
3✔
58
  return pipelines_;
3✔
59
}
60

61
void PipelineManager::updatePipelineName(uint32_t pipeline_id, const std::string_view new_name) {
1✔
62
  std::scoped_lock lock{pipelines_mutex_};
2✔
63
  PipelineDescriptor& descriptor = getPipelineById(pipeline_id);
1✔
64
  descriptor.name = new_name;
1✔
65
}
1✔
66

67
void PipelineManager::deletePipeline(uint32_t pipeline_id) {
2✔
68
  std::scoped_lock lock{pipelines_mutex_};
4✔
69
  PipelineDescriptor descriptor = getPipelineById(pipeline_id);
4✔
70
  filesystem_.deleteModel(descriptor.filename);
2✔
71
  pipelines_.remove_if([pipeline_id](const auto& pipeline) { return pipeline.id == pipeline_id; });
4✔
72
}
2✔
73

74
nlohmann::json PipelineManager::getJsonForPipeline(uint32_t pipeline_id) {
3✔
75
  std::scoped_lock lock{pipelines_mutex_};
6✔
76
  PipelineDescriptor descriptor = getPipelineById(pipeline_id);
6✔
77
  return deconstructPipelineToJson(descriptor.pipeline);
6✔
78
}
79

80
void PipelineManager::checkForParameterPresenceInPipeJson(
5✔
81
    const nlohmann::detail::iteration_proxy_value<nlohmann::json::basic_json::const_iterator>& json,
82
    const std::vector<std::string>& parameters) {
83
  if (parameters.empty()) {
5✔
84
    return;
×
85
  }
86
  const std::string& pipe_name = json.key();
5✔
87
  if (!json.value().contains("parameters")) {
5✔
88
    std::string error = "Parameters not defined in model configuration for pipe '";
×
89
    error += pipe_name;
×
90
    error += "'";
×
91
    throw std::invalid_argument(error);
×
92
  }
93
  for (const std::string& parameter : parameters) {
32✔
94
    if (!json.value()["parameters"].contains(parameter)) {
27✔
95
      std::string error = "Required parameter '";
×
96
      error += parameter;
×
97
      error += "' not defined in model configuration for pipe '";
×
98
      error += pipe_name;
×
99
      error += "'";
×
100
      throw std::invalid_argument(error);
×
101
    }
102
  }
103
}
104

105
void PipelineManager::checkForKeyPresenceInJson(const nlohmann::json& json,
7✔
106
                                                const std::vector<std::string>& keys) {
107
  if (keys.empty()) {
7✔
108
    return;
×
109
  }
110
  for (const std::string& key : keys) {
26✔
111
    if (!json.contains(key)) {
19✔
112
      std::string error = "Required key '";
×
113
      error += key;
×
114
      error += "' not defined";
×
115
      throw std::invalid_argument(error);
×
116
    }
117
  }
118
}
119

120
std::vector<std::tuple<std::shared_ptr<Evaluator>, double, bool>>
121
PipelineManager::constructEvaluatorsFromJson(const nlohmann::json& json) {
3✔
122
  std::vector<std::tuple<std::shared_ptr<Evaluator>, double, bool>> evaluators;
3✔
123
  for (const auto& evaluator_json : json.items()) {
7✔
124
    checkForKeyPresenceInJson(evaluator_json.value(), {"type", "weight", "invert_logic"});
12✔
125
    const std::string type = evaluator_json.value()["type"].get<std::string>();
6✔
126

127
    std::shared_ptr<Evaluator> evaluator = nullptr;
6✔
128
    if (type == "AggregationEvaluator") {
3✔
129
      evaluator = std::make_shared<AggregationEvaluator>();
×
130
      if (evaluator_json.value().contains("parameters") &&
×
131
          evaluator_json.value()["parameters"].contains("evaluators")) {
×
132
        const auto evaluator_triplets =
133
            constructEvaluatorsFromJson(evaluator_json.value()["parameters"]["evaluators"]);
×
134
        for (const auto& [sub_evaluator, weight, invert_logic] : evaluator_triplets) {
×
135
          std::dynamic_pointer_cast<AggregationEvaluator>(evaluator)->addEvaluator(
×
136
              sub_evaluator, weight, invert_logic);
×
137
        }
138
      }
139
    } else if (type == "MazeEvaluator") {
3✔
140
      checkForKeyPresenceInJson(evaluator_json.value(), {"parameters"});
4✔
141
      checkForKeyPresenceInJson(evaluator_json.value()["parameters"],
10✔
142
                                {"rows", "cols", "difficulty", "max_steps"});
8✔
143
      const uint32_t rows = evaluator_json.value()["parameters"]["rows"].get<uint32_t>();
2✔
144
      const uint32_t cols = evaluator_json.value()["parameters"]["cols"].get<uint32_t>();
2✔
145
      const double difficulty = evaluator_json.value()["parameters"]["difficulty"].get<double>();
2✔
146
      const uint32_t max_steps = evaluator_json.value()["parameters"]["max_steps"].get<uint32_t>();
2✔
147
      evaluator = std::make_shared<MazeEvaluator>(rows, cols, difficulty, max_steps);
2✔
148
    } else {
149
      throw std::invalid_argument("Invalid evaluator type: " + type);
1✔
150
    }
151

152
    const double weight = evaluator_json.value()["weight"].get<double>();
2✔
153
    const bool invert_logic = evaluator_json.value()["invert_logic"].get<bool>();
2✔
154
    evaluators.emplace_back(evaluator, weight, invert_logic);
2✔
155
  }
156
  return evaluators;
2✔
157
}
158

159
Pipeline PipelineManager::constructPipelineFromJson(const nlohmann::json& json) {
8✔
160
  Pipeline pipeline;
8✔
161
  std::map<std::string, std::shared_ptr<Pipe>> created_pipes;
16✔
162
  if (json.contains("pipes") && !json["pipes"].is_null()) {
8✔
163
    for (const auto& pipe : json["pipes"].items()) {
18✔
164
      const std::string& pipe_name = pipe.key();
7✔
165
      if (!pipe.value().contains("type") || pipe.value()["type"].is_null()) {
7✔
166
        throw std::invalid_argument("Type must be defined for pipe '" + pipe_name + "'");
1✔
167
      }
168
      const std::string& pipe_type = pipe.value()["type"].get<std::string>();
12✔
169

170
      if (pipe_type == "ProgramFactoryPipe") {
6✔
171
        checkForParameterPresenceInPipeJson(pipe,
14✔
172
                                            {"factory",
173
                                             "max_candidates",
174
                                             "max_size",
175
                                             "memory_variables",
176
                                             "string_table_items",
177
                                             "string_table_item_length"});
12✔
178
        const uint32_t max_candidates =
179
            pipe.value()["parameters"]["max_candidates"].get<uint32_t>();
2✔
180
        const uint32_t max_size = pipe.value()["parameters"]["max_size"].get<uint32_t>();
2✔
181
        const uint32_t memory_variables =
182
            pipe.value()["parameters"]["memory_variables"].get<uint32_t>();
2✔
183
        const uint32_t string_table_items =
184
            pipe.value()["parameters"]["string_table_items"].get<uint32_t>();
2✔
185
        const uint32_t string_table_item_length =
186
            pipe.value()["parameters"]["string_table_item_length"].get<uint32_t>();
2✔
187

188
        const std::string factory_type = pipe.value()["parameters"]["factory"].get<std::string>();
4✔
189
        std::shared_ptr<ProgramFactoryBase> factory = nullptr;
4✔
190
        if (factory_type == "RandomProgramFactory") {
2✔
191
          factory = std::make_shared<RandomProgramFactory>();
1✔
192
        } else {
193
          throw std::invalid_argument("Invalid program factory type '" + factory_type + "'");
1✔
194
        }
195

196
        created_pipes[pipe_name] = std::make_shared<ProgramFactoryPipe>(max_candidates,
2✔
197
                                                                        max_size,
198
                                                                        memory_variables,
199
                                                                        string_table_items,
200
                                                                        string_table_item_length,
201
                                                                        factory);
1✔
202
        pipeline.addPipe(pipe_name, created_pipes[pipe_name]);
1✔
203
      } else if (pipe_type == "NullSinkPipe") {
4✔
204
        created_pipes[pipe_name] = std::make_shared<NullSinkPipe>();
1✔
205
        pipeline.addPipe(pipe_name, created_pipes[pipe_name]);
1✔
206
      } else if (pipe_type == "EvaluatorPipe") {
3✔
207
        checkForParameterPresenceInPipeJson(pipe,
18✔
208
                                            {"evaluators",
209
                                             "max_candidates",
210
                                             "memory_variables",
211
                                             "string_table_items",
212
                                             "string_table_item_length"});
15✔
213
        const uint32_t max_candidates =
214
            pipe.value()["parameters"]["max_candidates"].get<uint32_t>();
3✔
215
        const uint32_t memory_variables =
216
            pipe.value()["parameters"]["memory_variables"].get<uint32_t>();
3✔
217
        const uint32_t string_table_items =
218
            pipe.value()["parameters"]["string_table_items"].get<uint32_t>();
3✔
219
        const uint32_t string_table_item_length =
220
            pipe.value()["parameters"]["string_table_item_length"].get<uint32_t>();
3✔
221

222
        auto evaluator_pipe = std::make_shared<EvaluatorPipe>(
223
            max_candidates, memory_variables, string_table_items, string_table_item_length);
6✔
224
        created_pipes[pipe_name] = evaluator_pipe;
3✔
225

226
        const auto evaluator_triplets =
227
            constructEvaluatorsFromJson(pipe.value()["parameters"]["evaluators"]);
5✔
228
        for (const auto& evaluator_triplet : evaluator_triplets) {
4✔
229
          std::shared_ptr<Evaluator> evaluator = nullptr;
4✔
230
          double weight = 0.0;
2✔
231
          bool invert_logic = false;
2✔
232
          std::tie(evaluator, weight, invert_logic) = evaluator_triplet;
2✔
233
          evaluator_pipe->addEvaluator(evaluator, weight, invert_logic);
2✔
234
        }
235

236
        pipeline.addPipe(pipe_name, created_pipes[pipe_name]);
2✔
237
      }
238
    }
239
  }
240
  if (json.contains("connections") && !json["connections"].is_null()) {
5✔
241
    for (const auto& connection : json["connections"].items()) {
×
242
      if (!connection.value().contains("source_pipe")) {
×
243
        throw std::invalid_argument("source_pipe parameter required in connection");
×
244
      }
245
      if (!connection.value().contains("source_slot")) {
×
246
        throw std::invalid_argument("source_slot parameter required in connection");
×
247
      }
248
      if (!connection.value().contains("destination_pipe")) {
×
249
        throw std::invalid_argument("destination_pipe parameter required in connection");
×
250
      }
251
      if (!connection.value().contains("destination_slot")) {
×
252
        throw std::invalid_argument("destination_slot parameter required in connection");
×
253
      }
254
      if (!connection.value().contains("buffer_size")) {
×
255
        throw std::invalid_argument("buffer_size parameter required in connection");
×
256
      }
257
      const std::string source_pipe = connection.value()["source_pipe"].get<std::string>();
×
258
      const uint32_t source_slot = connection.value()["source_slot"].get<uint32_t>();
×
259
      const std::string destination_pipe =
260
          connection.value()["destination_pipe"].get<std::string>();
×
261
      const uint32_t destination_slot = connection.value()["destination_slot"].get<uint32_t>();
×
262
      const uint32_t buffer_size = connection.value()["buffer_size"].get<uint32_t>();
×
263

264
      if (created_pipes.find(source_pipe) == created_pipes.end()) {
×
265
        throw std::invalid_argument("Source pipe '" + source_pipe + "' not found");
×
266
      }
267
      if (created_pipes.find(destination_pipe) == created_pipes.end()) {
×
268
        throw std::invalid_argument("Destination pipe '" + destination_pipe + "' not found");
×
269
      }
270

271
      pipeline.connectPipes(created_pipes[source_pipe],
×
272
                            source_slot,
273
                            created_pipes[destination_pipe],
×
274
                            destination_slot,
275
                            buffer_size);
276
    }
277
  }
278
  return pipeline;
10✔
279
}
280

281
nlohmann::json PipelineManager::deconstructEvaluatorsToJson(
2✔
282
    const std::vector<AggregationEvaluator::EvaluatorDescription>& descriptions) {
283
  nlohmann::json evaluators = {};
2✔
284

285
  for (const auto& description : descriptions) {
4✔
286
    nlohmann::json evaluator = nlohmann::json::object();
4✔
287
    evaluator["weight"] = description.weight;
2✔
288
    evaluator["invert_logic"] = description.invert_logic;
2✔
289

290
    if (const auto aggr_eval =
2✔
291
            std::dynamic_pointer_cast<AggregationEvaluator>(description.evaluator)) {
4✔
292
      evaluator["type"] = "AggregationEvaluator";
×
293
      evaluator["parameters"]["evaluators"] =
×
294
          deconstructEvaluatorsToJson(aggr_eval->getEvaluators());
×
295
    } else if (const auto maze_eval =
2✔
296
                   std::dynamic_pointer_cast<MazeEvaluator>(description.evaluator)) {
4✔
297
      evaluator["type"] = "MazeEvaluator";
2✔
298
      evaluator["parameters"]["rows"] = maze_eval->getRows();
2✔
299
      evaluator["parameters"]["cols"] = maze_eval->getCols();
2✔
300
      evaluator["parameters"]["difficulty"] = maze_eval->getDifficulty();
2✔
301
      evaluator["parameters"]["max_steps"] = maze_eval->getMaxSteps();
2✔
302
    }
303

304
    evaluators.push_back(std::move(evaluator));
2✔
305
  }
306

307
  return evaluators;
2✔
308
}
309

310
nlohmann::json PipelineManager::deconstructPipelineToJson(const Pipeline& pipeline) {
6✔
311
  nlohmann::json value;
6✔
312

313
  for (const auto& pipe : pipeline.getPipes()) {
9✔
314
    nlohmann::json pipe_json;
3✔
315

316
    if (const auto evaluator_pipe = std::dynamic_pointer_cast<EvaluatorPipe>(pipe->pipe)) {
6✔
317
      pipe_json["type"] = "EvaluatorPipe";
2✔
318
      pipe_json["parameters"]["max_candidates"] = pipe->pipe->getMaxCandidates();
2✔
319
      pipe_json["parameters"]["memory_variables"] = evaluator_pipe->getMemorySize();
2✔
320
      pipe_json["parameters"]["string_table_item_length"] =
2✔
321
          evaluator_pipe->getStringTableItemLength();
4✔
322
      pipe_json["parameters"]["string_table_items"] = evaluator_pipe->getStringTableSize();
2✔
323
      pipe_json["parameters"]["evaluators"] =
2✔
324
          deconstructEvaluatorsToJson(evaluator_pipe->getEvaluators());
4✔
325
    } else if (std::dynamic_pointer_cast<EvolutionPipe>(pipe->pipe)) {
1✔
326
      pipe_json["type"] = "EvolutionPipe";
×
327
    } else if (std::dynamic_pointer_cast<NullSinkPipe>(pipe->pipe)) {
1✔
328
      pipe_json["type"] = "NullSinkPipe";
1✔
329
    } else if (auto spec_pipe = std::dynamic_pointer_cast<ProgramFactoryPipe>(pipe->pipe)) {
×
330
      pipe_json["type"] = "ProgramFactoryPipe";
×
331
      pipe_json["parameters"]["max_candidates"] = pipe->pipe->getMaxCandidates();
×
332
      pipe_json["parameters"]["max_size"] = spec_pipe->getMaxSize();
×
333
      pipe_json["parameters"]["memory_variables"] = spec_pipe->getMemorySize();
×
334
      pipe_json["parameters"]["string_table_item_length"] = spec_pipe->getStringTableItemLength();
×
335
      pipe_json["parameters"]["string_table_items"] = spec_pipe->getStringTableSize();
×
336

337
      const auto factory = spec_pipe->getFactory();
×
338
      if (std::dynamic_pointer_cast<RandomProgramFactory>(factory)) {
×
339
        pipe_json["parameters"]["factory"] = "RandomProgramFactory";
×
340
      } else {
341
        pipe_json["parameters"]["factory"] = "Unknown";
×
342
      }
343
    } else {
344
      pipe_json["type"] = "Unknown";
×
345
    }
346

347
    value["pipes"][pipe->name] = std::move(pipe_json);
3✔
348
  }
349

350
  nlohmann::json connections_json = {};
12✔
351
  for (const auto& connection : pipeline.getConnections()) {
6✔
352
    nlohmann::json connection_json;
×
353
    connection_json["buffer_size"] = connection.buffer.capacity();
×
354
    connection_json["destination_pipe"] = connection.destination_pipe->name;
×
355
    connection_json["destination_slot"] = connection.destination_slot_index;
×
356
    connection_json["source_pipe"] = connection.source_pipe->name;
×
357
    connection_json["source_slot"] = connection.source_slot_index;
×
358

359
    connections_json.push_back(std::move(connection_json));
×
360
  }
361
  value["connections"] = std::move(connections_json);
6✔
362

363
  return value;
12✔
364
}
365

366
uint32_t PipelineManager::getFreeId() const {
19✔
367
  uint32_t new_id = -1;
19✔
368
  std::list<PipelineDescriptor>::const_iterator iter;
19✔
369
  do {
2✔
370
    new_id++;
21✔
371
    iter = std::find_if(pipelines_.begin(), pipelines_.end(), [new_id](const auto& pipeline) {
4✔
372
      return pipeline.id == new_id;
4✔
373
    });
21✔
374
  } while (iter != pipelines_.end());
21✔
375
  return new_id;
19✔
376
}
377

378
} // namespace beast
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc