• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dedicate-project / beast / 83ba6ee4-64d8-4300-bcbc-007a3bb86324

pending completion
83ba6ee4-64d8-4300-bcbc-007a3bb86324

Pull #9

circleci

fairlight1337
Large block of Doxygen comments added
Pull Request #9: Adding pipeline applications

1197 of 1197 new or added lines in 21 files covered. (100.0%)

2837 of 3042 relevant lines covered (93.26%)

13117.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.85
/src/pipeline_manager.cpp
1
#include <beast/pipeline_manager.hpp>
2

3
// Internal
4
#include <beast/pipes/evaluator_pipe.hpp>
5
#include <beast/pipes/evolution_pipe.hpp>
6
#include <beast/pipes/null_sink_pipe.hpp>
7
#include <beast/pipes/program_factory_pipe.hpp>
8

9
#include <beast/program_factory_base.hpp>
10
#include <beast/random_program_factory.hpp>
11

12
#include <beast/evaluators/maze_evaluator.hpp>
13

14
namespace beast {
15

16
PipelineManager::PipelineManager(const std::string& storage_path) : filesystem_(storage_path) {
31✔
17
  std::scoped_lock lock{pipelines_mutex_};
62✔
18
  for (const auto& model : filesystem_.loadModels()) {
32✔
19
    PipelineDescriptor descriptor;
2✔
20
    descriptor.id = getFreeId();
1✔
21
    descriptor.name = model["content"]["name"];
1✔
22
    descriptor.filename = model["filename"];
1✔
23
    descriptor.pipeline = constructPipelineFromJson(model["content"]["model"]);
1✔
24
    descriptor.metadata = model["content"]["metadata"];
1✔
25
    pipelines_.push_back(std::move(descriptor));
1✔
26
  }
27
}
31✔
28

29
uint32_t PipelineManager::createPipeline(const std::string& name) {
18✔
30
  std::scoped_lock lock{pipelines_mutex_};
36✔
31
  nlohmann::json model;
36✔
32
  model["pipes"] = nlohmann::json::array();
18✔
33
  model["connections"] = {};
18✔
34
  // TODO(fairlight1337): Add more data to the model.
35
  const std::string filename = filesystem_.saveModel(name, model);
18✔
36
  const uint32_t new_id = getFreeId();
18✔
37
  pipelines_.push_back({new_id, name, filename, beast::Pipeline()});
18✔
38

39
  return new_id;
36✔
40
}
41

42
void PipelineManager::savePipeline(uint32_t pipeline_id) {
×
43
  const auto& descriptor = getPipelineById(pipeline_id);
×
44
  const auto model = deconstructPipelineToJson(descriptor.pipeline);
×
45
  filesystem_.updateModel(descriptor.filename, descriptor.name, model, descriptor.metadata);
×
46
}
×
47

48
PipelineManager::PipelineDescriptor& PipelineManager::getPipelineById(uint32_t pipeline_id) {
28✔
49
  for (PipelineDescriptor& descriptor : pipelines_) {
28✔
50
    if (descriptor.id == pipeline_id) {
23✔
51
      return descriptor;
23✔
52
    }
53
  }
54
  throw std::invalid_argument("Pipeline with this ID not found: " + std::to_string(pipeline_id));
5✔
55
}
56

57
const std::list<PipelineManager::PipelineDescriptor>& PipelineManager::getPipelines() const {
3✔
58
  return pipelines_;
3✔
59
}
60

61
void PipelineManager::updatePipelineName(uint32_t pipeline_id, const std::string_view new_name) {
1✔
62
  std::scoped_lock lock{pipelines_mutex_};
2✔
63
  PipelineDescriptor& descriptor = getPipelineById(pipeline_id);
1✔
64
  descriptor.name = new_name;
1✔
65
}
1✔
66

67
void PipelineManager::deletePipeline(uint32_t pipeline_id) {
2✔
68
  std::scoped_lock lock{pipelines_mutex_};
4✔
69
  PipelineDescriptor descriptor = getPipelineById(pipeline_id);
4✔
70
  filesystem_.deleteModel(descriptor.filename);
2✔
71
  pipelines_.remove_if([pipeline_id](const auto& pipeline) { return pipeline.id == pipeline_id; });
4✔
72
}
2✔
73

74
nlohmann::json PipelineManager::getJsonForPipeline(uint32_t pipeline_id) {
3✔
75
  std::scoped_lock lock{pipelines_mutex_};
6✔
76
  PipelineDescriptor descriptor = getPipelineById(pipeline_id);
6✔
77
  return deconstructPipelineToJson(descriptor.pipeline);
6✔
78
}
79

80
void PipelineManager::checkForParameterPresenceInPipeJson(
5✔
81
    const nlohmann::detail::iteration_proxy_value<nlohmann::json::basic_json::const_iterator>& json,
82
    const std::vector<std::string>& parameters) {
83
  if (parameters.empty()) {
5✔
84
    return;
×
85
  }
86
  const std::string& pipe_name = json.key();
5✔
87
  if (!json.value().contains("parameters")) {
5✔
88
    std::string error = "Parameters not defined in model configuration for pipe '";
×
89
    error += pipe_name;
×
90
    error += "'";
×
91
    throw std::invalid_argument(error);
×
92
  }
93
  for (const std::string& parameter : parameters) {
32✔
94
    if (!json.value()["parameters"].contains(parameter)) {
27✔
95
      std::string error = "Required parameter '";
×
96
      error += parameter;
×
97
      error += "' not defined in model configuration for pipe '";
×
98
      error += pipe_name;
×
99
      error += "'";
×
100
      throw std::invalid_argument(error);
×
101
    }
102
  }
103
}
104

105
void PipelineManager::checkForKeyPresenceInJson(const nlohmann::json& json,
7✔
106
                                                const std::vector<std::string>& keys) {
107
  if (keys.empty()) {
7✔
108
    return;
×
109
  }
110
  for (const std::string& key : keys) {
26✔
111
    if (!json.contains(key)) {
19✔
112
      std::string error = "Required key '";
×
113
      error += key;
×
114
      error += "' not defined";
×
115
      throw std::invalid_argument(error);
×
116
    }
117
  }
118
}
119

120
std::vector<std::tuple<std::shared_ptr<Evaluator>, double, bool>>
121
PipelineManager::constructEvaluatorsFromJson(const nlohmann::json& json) {
3✔
122
  std::vector<std::tuple<std::shared_ptr<Evaluator>, double, bool>> evaluators;
3✔
123
  for (const auto& evaluator_json : json.items()) {
7✔
124
    checkForKeyPresenceInJson(evaluator_json.value(), {"type", "weight", "invert_logic"});
12✔
125
    const std::string type = evaluator_json.value()["type"].get<std::string>();
6✔
126

127
    std::shared_ptr<Evaluator> evaluator = nullptr;
4✔
128
    if (type == "AggregationEvaluator") {
3✔
129
      evaluator = std::make_shared<AggregationEvaluator>();
×
130
      if (evaluator_json.value().contains("parameters") &&
×
131
          evaluator_json.value()["parameters"].contains("evaluators")) {
×
132
        const auto evaluator_triplets =
133
            constructEvaluatorsFromJson(evaluator_json.value()["parameters"]["evaluators"]);
×
134
        for (const auto& evaluator_triplet : evaluator_triplets) {
×
135
          std::shared_ptr<Evaluator> sub_evaluator = nullptr;
×
136
          double weight = 0.0;
×
137
          bool invert_logic = false;
×
138
          std::tie(evaluator, weight, invert_logic);
×
139
          std::dynamic_pointer_cast<AggregationEvaluator>(evaluator)->addEvaluator(
×
140
              sub_evaluator, weight, invert_logic);
141
        }
142
      }
143
    } else if (type == "MazeEvaluator") {
3✔
144
      checkForKeyPresenceInJson(evaluator_json.value(), {"parameters"});
4✔
145
      checkForKeyPresenceInJson(evaluator_json.value()["parameters"],
10✔
146
                                {"rows", "cols", "difficulty", "max_steps"});
8✔
147
      const uint32_t rows = evaluator_json.value()["parameters"]["rows"].get<uint32_t>();
2✔
148
      const uint32_t cols = evaluator_json.value()["parameters"]["cols"].get<uint32_t>();
2✔
149
      const double difficulty = evaluator_json.value()["parameters"]["difficulty"].get<double>();
2✔
150
      const uint32_t max_steps = evaluator_json.value()["parameters"]["max_steps"].get<uint32_t>();
2✔
151
      evaluator = std::make_shared<MazeEvaluator>(rows, cols, difficulty, max_steps);
2✔
152
    } else {
153
      throw std::invalid_argument("Invalid evaluator type: " + type);
1✔
154
    }
155

156
    const double weight = evaluator_json.value()["weight"].get<double>();
2✔
157
    const bool invert_logic = evaluator_json.value()["invert_logic"].get<bool>();
2✔
158
    evaluators.emplace_back(std::make_tuple(evaluator, weight, invert_logic));
2✔
159
  }
160
  return evaluators;
2✔
161
}
162

163
Pipeline PipelineManager::constructPipelineFromJson(const nlohmann::json& json) {
8✔
164
  Pipeline pipeline;
8✔
165
  std::map<std::string, std::shared_ptr<Pipe>> created_pipes;
16✔
166
  if (json.contains("pipes") && !json["pipes"].is_null()) {
8✔
167
    for (const auto& pipe : json["pipes"].items()) {
18✔
168
      const std::string& pipe_name = pipe.key();
7✔
169
      if (!pipe.value().contains("type") || pipe.value()["type"].is_null()) {
7✔
170
        throw std::invalid_argument("Type must be defined for pipe '" + pipe_name + "'");
1✔
171
      }
172
      const std::string& pipe_type = pipe.value()["type"].get<std::string>();
12✔
173

174
      if (pipe_type == "ProgramFactoryPipe") {
6✔
175
        checkForParameterPresenceInPipeJson(pipe,
14✔
176
                                            {"factory",
177
                                             "max_candidates",
178
                                             "max_size",
179
                                             "memory_variables",
180
                                             "string_table_items",
181
                                             "string_table_item_length"});
12✔
182
        const uint32_t max_candidates =
183
            pipe.value()["parameters"]["max_candidates"].get<uint32_t>();
2✔
184
        const uint32_t max_size = pipe.value()["parameters"]["max_size"].get<uint32_t>();
2✔
185
        const uint32_t memory_variables =
186
            pipe.value()["parameters"]["memory_variables"].get<uint32_t>();
2✔
187
        const uint32_t string_table_items =
188
            pipe.value()["parameters"]["string_table_items"].get<uint32_t>();
2✔
189
        const uint32_t string_table_item_length =
190
            pipe.value()["parameters"]["string_table_item_length"].get<uint32_t>();
2✔
191

192
        const std::string factory_type = pipe.value()["parameters"]["factory"].get<std::string>();
4✔
193
        std::shared_ptr<ProgramFactoryBase> factory = nullptr;
4✔
194
        if (factory_type == "RandomProgramFactory") {
2✔
195
          factory = std::make_shared<RandomProgramFactory>();
1✔
196
        } else {
197
          throw std::invalid_argument("Invalid program factory type '" + factory_type + "'");
1✔
198
        }
199

200
        created_pipes[pipe_name] = std::make_shared<ProgramFactoryPipe>(max_candidates,
2✔
201
                                                                        max_size,
202
                                                                        memory_variables,
203
                                                                        string_table_items,
204
                                                                        string_table_item_length,
205
                                                                        factory);
1✔
206
        pipeline.addPipe(pipe_name, created_pipes[pipe_name]);
1✔
207
      } else if (pipe_type == "NullSinkPipe") {
4✔
208
        created_pipes[pipe_name] = std::make_shared<NullSinkPipe>();
1✔
209
        pipeline.addPipe(pipe_name, created_pipes[pipe_name]);
1✔
210
      } else if (pipe_type == "EvaluatorPipe") {
3✔
211
        checkForParameterPresenceInPipeJson(pipe,
18✔
212
                                            {"evaluators",
213
                                             "max_candidates",
214
                                             "memory_variables",
215
                                             "string_table_items",
216
                                             "string_table_item_length"});
15✔
217
        const uint32_t max_candidates =
218
            pipe.value()["parameters"]["max_candidates"].get<uint32_t>();
3✔
219
        const uint32_t memory_variables =
220
            pipe.value()["parameters"]["memory_variables"].get<uint32_t>();
3✔
221
        const uint32_t string_table_items =
222
            pipe.value()["parameters"]["string_table_items"].get<uint32_t>();
3✔
223
        const uint32_t string_table_item_length =
224
            pipe.value()["parameters"]["string_table_item_length"].get<uint32_t>();
3✔
225

226
        auto evaluator_pipe = std::make_shared<EvaluatorPipe>(
227
            max_candidates, memory_variables, string_table_items, string_table_item_length);
6✔
228
        created_pipes[pipe_name] = evaluator_pipe;
3✔
229

230
        const auto evaluator_triplets =
231
            constructEvaluatorsFromJson(pipe.value()["parameters"]["evaluators"]);
5✔
232
        for (const auto& evaluator_triplet : evaluator_triplets) {
4✔
233
          std::shared_ptr<Evaluator> evaluator = nullptr;
4✔
234
          double weight = 0.0;
2✔
235
          bool invert_logic = false;
2✔
236
          std::tie(evaluator, weight, invert_logic) = evaluator_triplet;
2✔
237
          evaluator_pipe->addEvaluator(evaluator, weight, invert_logic);
2✔
238
        }
239

240
        pipeline.addPipe(pipe_name, created_pipes[pipe_name]);
2✔
241
      }
242
    }
243
  }
244
  if (json.contains("connections") && !json["connections"].is_null()) {
5✔
245
    for (const auto& connection : json["connections"].items()) {
×
246
      if (!connection.value().contains("source_pipe")) {
×
247
        throw std::invalid_argument("source_pipe parameter required in connection");
×
248
      }
249
      if (!connection.value().contains("source_slot")) {
×
250
        throw std::invalid_argument("source_slot parameter required in connection");
×
251
      }
252
      if (!connection.value().contains("destination_pipe")) {
×
253
        throw std::invalid_argument("destination_pipe parameter required in connection");
×
254
      }
255
      if (!connection.value().contains("destination_slot")) {
×
256
        throw std::invalid_argument("destination_slot parameter required in connection");
×
257
      }
258
      if (!connection.value().contains("buffer_size")) {
×
259
        throw std::invalid_argument("buffer_size parameter required in connection");
×
260
      }
261
      const std::string source_pipe = connection.value()["source_pipe"].get<std::string>();
×
262
      const uint32_t source_slot = connection.value()["source_slot"].get<uint32_t>();
×
263
      const std::string destination_pipe =
264
          connection.value()["destination_pipe"].get<std::string>();
×
265
      const uint32_t destination_slot = connection.value()["destination_slot"].get<uint32_t>();
×
266
      const uint32_t buffer_size = connection.value()["buffer_size"].get<uint32_t>();
×
267

268
      if (created_pipes.find(source_pipe) == created_pipes.end()) {
×
269
        throw std::invalid_argument("Source pipe '" + source_pipe + "' not found");
×
270
      }
271
      if (created_pipes.find(destination_pipe) == created_pipes.end()) {
×
272
        throw std::invalid_argument("Destination pipe '" + destination_pipe + "' not found");
×
273
      }
274

275
      pipeline.connectPipes(created_pipes[source_pipe],
×
276
                            source_slot,
277
                            created_pipes[destination_pipe],
×
278
                            destination_slot,
279
                            buffer_size);
280
    }
281
  }
282
  return pipeline;
10✔
283
}
284

285
nlohmann::json PipelineManager::deconstructEvaluatorsToJson(
2✔
286
    const std::vector<AggregationEvaluator::EvaluatorDescription>& descriptions) {
287
  nlohmann::json evaluators = {};
2✔
288

289
  for (const auto& description : descriptions) {
4✔
290
    nlohmann::json evaluator = nlohmann::json::object();
4✔
291
    evaluator["weight"] = description.weight;
2✔
292
    evaluator["invert_logic"] = description.invert_logic;
2✔
293

294
    if (const auto aggr_eval =
2✔
295
            std::dynamic_pointer_cast<AggregationEvaluator>(description.evaluator)) {
4✔
296
      evaluator["type"] = "AggregationEvaluator";
×
297
      evaluator["parameters"]["evaluators"] =
×
298
          deconstructEvaluatorsToJson(aggr_eval->getEvaluators());
×
299
    } else if (const auto maze_eval =
2✔
300
                   std::dynamic_pointer_cast<MazeEvaluator>(description.evaluator)) {
4✔
301
      evaluator["type"] = "MazeEvaluator";
2✔
302
      evaluator["parameters"]["rows"] = maze_eval->getRows();
2✔
303
      evaluator["parameters"]["cols"] = maze_eval->getCols();
2✔
304
      evaluator["parameters"]["difficulty"] = maze_eval->getDifficulty();
2✔
305
      evaluator["parameters"]["max_steps"] = maze_eval->getMaxSteps();
2✔
306
    }
307

308
    evaluators.push_back(std::move(evaluator));
2✔
309
  }
310

311
  return evaluators;
2✔
312
}
313

314
nlohmann::json PipelineManager::deconstructPipelineToJson(const Pipeline& pipeline) {
6✔
315
  nlohmann::json value;
6✔
316

317
  for (const auto& pipe : pipeline.getPipes()) {
9✔
318
    nlohmann::json pipe_json;
3✔
319

320
    if (const auto evaluator_pipe = std::dynamic_pointer_cast<EvaluatorPipe>(pipe->pipe)) {
6✔
321
      pipe_json["type"] = "EvaluatorPipe";
2✔
322
      pipe_json["parameters"]["max_candidates"] = pipe->pipe->getMaxCandidates();
2✔
323
      pipe_json["parameters"]["memory_variables"] = evaluator_pipe->getMemorySize();
2✔
324
      pipe_json["parameters"]["string_table_item_length"] =
2✔
325
          evaluator_pipe->getStringTableItemLength();
4✔
326
      pipe_json["parameters"]["string_table_items"] = evaluator_pipe->getStringTableSize();
2✔
327
      pipe_json["parameters"]["evaluators"] =
2✔
328
          deconstructEvaluatorsToJson(evaluator_pipe->getEvaluators());
4✔
329
    } else if (std::dynamic_pointer_cast<EvolutionPipe>(pipe->pipe)) {
1✔
330
      pipe_json["type"] = "EvolutionPipe";
×
331
    } else if (std::dynamic_pointer_cast<NullSinkPipe>(pipe->pipe)) {
1✔
332
      pipe_json["type"] = "NullSinkPipe";
1✔
333
    } else if (auto spec_pipe = std::dynamic_pointer_cast<ProgramFactoryPipe>(pipe->pipe)) {
×
334
      pipe_json["type"] = "ProgramFactoryPipe";
×
335
      pipe_json["parameters"]["max_candidates"] = pipe->pipe->getMaxCandidates();
×
336
      pipe_json["parameters"]["max_size"] = spec_pipe->getMaxSize();
×
337
      pipe_json["parameters"]["memory_variables"] = spec_pipe->getMemorySize();
×
338
      pipe_json["parameters"]["string_table_item_length"] = spec_pipe->getStringTableItemLength();
×
339
      pipe_json["parameters"]["string_table_items"] = spec_pipe->getStringTableSize();
×
340

341
      const auto factory = spec_pipe->getFactory();
×
342
      if (std::dynamic_pointer_cast<RandomProgramFactory>(factory)) {
×
343
        pipe_json["parameters"]["factory"] = "RandomProgramFactory";
×
344
      } else {
345
        pipe_json["parameters"]["factory"] = "Unknown";
×
346
      }
347
    } else {
348
      pipe_json["type"] = "Unknown";
×
349
    }
350

351
    value["pipes"][pipe->name] = std::move(pipe_json);
3✔
352
  }
353

354
  nlohmann::json connections_json = {};
12✔
355
  for (const auto& connection : pipeline.getConnections()) {
6✔
356
    nlohmann::json connection_json;
×
357
    connection_json["buffer_size"] = connection.buffer.capacity();
×
358
    connection_json["destination_pipe"] = connection.destination_pipe->name;
×
359
    connection_json["destination_slot"] = connection.destination_slot_index;
×
360
    connection_json["source_pipe"] = connection.source_pipe->name;
×
361
    connection_json["source_slot"] = connection.source_slot_index;
×
362

363
    connections_json.push_back(std::move(connection_json));
×
364
  }
365
  value["connections"] = std::move(connections_json);
6✔
366

367
  return value;
12✔
368
}
369

370
uint32_t PipelineManager::getFreeId() const {
19✔
371
  uint32_t new_id = -1;
19✔
372
  std::list<PipelineDescriptor>::const_iterator iter;
19✔
373
  do {
2✔
374
    new_id++;
21✔
375
    iter = std::find_if(pipelines_.begin(), pipelines_.end(), [new_id](const auto& pipeline) {
4✔
376
      return pipeline.id == new_id;
4✔
377
    });
21✔
378
  } while (iter != pipelines_.end());
21✔
379
  return new_id;
19✔
380
}
381

382
} // namespace beast
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc