• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dedicate-project / beast / 8c849f2c-086d-406b-a03d-7ef8df12cdc1

pending completion
8c849f2c-086d-406b-a03d-7ef8df12cdc1

Pull #9

circleci

fairlight1337
Fixed more code smells
Pull Request #9: Adding pipeline applications

1247 of 1247 new or added lines in 23 files covered. (100.0%)

2868 of 3092 relevant lines covered (92.76%)

16138.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.95
/src/pipeline_manager.cpp
1
#include <beast/pipeline_manager.hpp>
2

3
// Internal
4
#include <beast/pipes/evaluator_pipe.hpp>
5
#include <beast/pipes/evolution_pipe.hpp>
6
#include <beast/pipes/null_sink_pipe.hpp>
7
#include <beast/pipes/program_factory_pipe.hpp>
8

9
#include <beast/program_factory_base.hpp>
10
#include <beast/random_program_factory.hpp>
11

12
#include <beast/evaluators/maze_evaluator.hpp>
13

14
namespace beast {
15

16
PipelineManager::PipelineManager(const std::string& storage_path) : filesystem_(storage_path) {
31✔
17
  std::scoped_lock lock{pipelines_mutex_};
62✔
18
  for (const auto& model : filesystem_.loadModels()) {
32✔
19
    PipelineDescriptor descriptor;
2✔
20
    descriptor.id = getFreeId();
1✔
21
    descriptor.name = model["content"]["name"];
1✔
22
    descriptor.filename = model["filename"];
1✔
23
    descriptor.pipeline = constructPipelineFromJson(model["content"]["model"]);
1✔
24
    descriptor.metadata = model["content"]["metadata"];
1✔
25
    pipelines_.push_back(std::move(descriptor));
1✔
26
  }
27
}
31✔
28

29
uint32_t PipelineManager::createPipeline(const std::string& name) {
18✔
30
  std::scoped_lock lock{pipelines_mutex_};
36✔
31
  nlohmann::json model;
36✔
32
  model["pipes"] = nlohmann::json::array();
18✔
33
  model["connections"] = {};
18✔
34
  const std::string filename = filesystem_.saveModel(name, model);
18✔
35
  const uint32_t new_id = getFreeId();
18✔
36
  pipelines_.push_back({new_id, name, filename, beast::Pipeline()});
18✔
37

38
  return new_id;
36✔
39
}
40

41
void PipelineManager::savePipeline(uint32_t pipeline_id) {
×
42
  const auto& descriptor = getPipelineById(pipeline_id);
×
43
  const auto model = deconstructPipelineToJson(descriptor.pipeline);
×
44
  filesystem_.updateModel(descriptor.filename, descriptor.name, model, descriptor.metadata);
×
45
}
×
46

47
PipelineManager::PipelineDescriptor& PipelineManager::getPipelineById(uint32_t pipeline_id) {
28✔
48
  for (PipelineDescriptor& descriptor : pipelines_) {
28✔
49
    if (descriptor.id == pipeline_id) {
23✔
50
      return descriptor;
23✔
51
    }
52
  }
53
  throw std::invalid_argument("Pipeline with this ID not found: " + std::to_string(pipeline_id));
5✔
54
}
55

56
const std::list<PipelineManager::PipelineDescriptor>& PipelineManager::getPipelines() const {
3✔
57
  return pipelines_;
3✔
58
}
59

60
void PipelineManager::updatePipelineName(uint32_t pipeline_id, const std::string_view new_name) {
1✔
61
  std::scoped_lock lock{pipelines_mutex_};
2✔
62
  PipelineDescriptor& descriptor = getPipelineById(pipeline_id);
1✔
63
  descriptor.name = new_name;
1✔
64
}
1✔
65

66
void PipelineManager::deletePipeline(uint32_t pipeline_id) {
2✔
67
  std::scoped_lock lock{pipelines_mutex_};
4✔
68
  PipelineDescriptor descriptor = getPipelineById(pipeline_id);
4✔
69
  filesystem_.deleteModel(descriptor.filename);
2✔
70
  pipelines_.remove_if([pipeline_id](const auto& pipeline) { return pipeline.id == pipeline_id; });
4✔
71
}
2✔
72

73
nlohmann::json PipelineManager::getJsonForPipeline(uint32_t pipeline_id) {
3✔
74
  std::scoped_lock lock{pipelines_mutex_};
6✔
75
  PipelineDescriptor descriptor = getPipelineById(pipeline_id);
6✔
76
  return deconstructPipelineToJson(descriptor.pipeline);
6✔
77
}
78

79
void PipelineManager::checkForParameterPresenceInPipeJson(
5✔
80
    const nlohmann::detail::iteration_proxy_value<nlohmann::json::basic_json::const_iterator>& json,
81
    const std::vector<std::string>& parameters) {
82
  if (parameters.empty()) {
5✔
83
    return;
×
84
  }
85
  const std::string& pipe_name = json.key();
5✔
86
  if (!json.value().contains("parameters")) {
5✔
87
    std::string error = "Parameters not defined in model configuration for pipe '";
×
88
    error += pipe_name;
×
89
    error += "'";
×
90
    throw std::invalid_argument(error);
×
91
  }
92
  for (const std::string& parameter : parameters) {
32✔
93
    if (!json.value()["parameters"].contains(parameter)) {
27✔
94
      std::string error = "Required parameter '";
×
95
      error += parameter;
×
96
      error += "' not defined in model configuration for pipe '";
×
97
      error += pipe_name;
×
98
      error += "'";
×
99
      throw std::invalid_argument(error);
×
100
    }
101
  }
102
}
103

104
void PipelineManager::checkForKeyPresenceInJson(const nlohmann::json& json,
7✔
105
                                                const std::vector<std::string>& keys) {
106
  if (keys.empty()) {
7✔
107
    return;
×
108
  }
109
  for (const std::string& key : keys) {
26✔
110
    if (!json.contains(key)) {
19✔
111
      std::string error = "Required key '";
×
112
      error += key;
×
113
      error += "' not defined";
×
114
      throw std::invalid_argument(error);
×
115
    }
116
  }
117
}
118

119
std::vector<std::tuple<std::shared_ptr<Evaluator>, double, bool>>
120
PipelineManager::constructEvaluatorsFromJson(const nlohmann::json& json) {
3✔
121
  std::vector<std::tuple<std::shared_ptr<Evaluator>, double, bool>> evaluators;
3✔
122
  for (const auto& evaluator_json : json.items()) {
7✔
123
    checkForKeyPresenceInJson(evaluator_json.value(), {"type", "weight", "invert_logic"});
12✔
124
    const std::string type = evaluator_json.value()["type"].get<std::string>();
6✔
125

126
    std::shared_ptr<Evaluator> evaluator = nullptr;
6✔
127
    double weight = 0;
3✔
128
    bool invert_logic = false;
3✔
129

130
    if (type == "AggregationEvaluator") {
3✔
131
      evaluator = constructAggregationEvaluatorFromJson(evaluator_json.value());
×
132
      weight = evaluator_json.value()["weight"].get<double>();
×
133
      invert_logic = evaluator_json.value()["invert_logic"].get<bool>();
×
134
    } else if (type == "MazeEvaluator") {
3✔
135
      evaluator = constructMazeEvaluatorFromJson(evaluator_json.value());
2✔
136
      weight = evaluator_json.value()["weight"].get<double>();
2✔
137
      invert_logic = evaluator_json.value()["invert_logic"].get<bool>();
2✔
138
    } else {
139
      throw std::invalid_argument("Invalid evaluator type: " + type);
1✔
140
    }
141

142
    evaluators.emplace_back(evaluator, weight, invert_logic);
2✔
143
  }
144
  return evaluators;
2✔
145
}
146

147
std::shared_ptr<Evaluator>
148
PipelineManager::constructAggregationEvaluatorFromJson(const nlohmann::json& json) {
×
149
  auto evaluator = std::make_shared<AggregationEvaluator>();
×
150
  if (json.contains("parameters") && json["parameters"].contains("evaluators")) {
×
151
    const auto evaluator_triplets = constructEvaluatorsFromJson(json["parameters"]["evaluators"]);
×
152
    for (const auto& [sub_evaluator, weight, invert_logic] : evaluator_triplets) {
×
153
      std::dynamic_pointer_cast<AggregationEvaluator>(evaluator)->addEvaluator(
×
154
          sub_evaluator, weight, invert_logic);
×
155
    }
156
  }
157
  return evaluator;
×
158
}
159

160
std::shared_ptr<Evaluator>
161
PipelineManager::constructMazeEvaluatorFromJson(const nlohmann::json& json) {
2✔
162
  checkForKeyPresenceInJson(json, {"parameters"});
4✔
163
  checkForKeyPresenceInJson(json["parameters"], {"rows", "cols", "difficulty", "max_steps"});
10✔
164
  const uint32_t rows = json["parameters"]["rows"].get<uint32_t>();
2✔
165
  const uint32_t cols = json["parameters"]["cols"].get<uint32_t>();
2✔
166
  const double difficulty = json["parameters"]["difficulty"].get<double>();
2✔
167
  const uint32_t max_steps = json["parameters"]["max_steps"].get<uint32_t>();
2✔
168
  return std::make_shared<MazeEvaluator>(rows, cols, difficulty, max_steps);
2✔
169
}
170

171
Pipeline PipelineManager::constructPipelineFromJson(const nlohmann::json& json) {
8✔
172
  Pipeline pipeline;
8✔
173
  std::map<std::string, std::shared_ptr<Pipe>, std::less<>> created_pipes;
16✔
174
  if (json.contains("pipes") && !json["pipes"].is_null()) {
8✔
175
    for (const auto& pipe : json["pipes"].items()) {
18✔
176
      const std::string& pipe_name = pipe.key();
7✔
177
      if (!pipe.value().contains("type") || pipe.value()["type"].is_null()) {
7✔
178
        throw std::invalid_argument("Type must be defined for pipe '" + pipe_name + "'");
1✔
179
      }
180
      const std::string& pipe_type = pipe.value()["type"].get<std::string>();
12✔
181

182
      if (pipe_type == "ProgramFactoryPipe") {
6✔
183
        checkForParameterPresenceInPipeJson(pipe,
14✔
184
                                            {"factory",
185
                                             "max_candidates",
186
                                             "max_size",
187
                                             "memory_variables",
188
                                             "string_table_items",
189
                                             "string_table_item_length"});
12✔
190
        const uint32_t max_candidates =
191
            pipe.value()["parameters"]["max_candidates"].get<uint32_t>();
2✔
192
        const uint32_t max_size = pipe.value()["parameters"]["max_size"].get<uint32_t>();
2✔
193
        const uint32_t memory_variables =
194
            pipe.value()["parameters"]["memory_variables"].get<uint32_t>();
2✔
195
        const uint32_t string_table_items =
196
            pipe.value()["parameters"]["string_table_items"].get<uint32_t>();
2✔
197
        const uint32_t string_table_item_length =
198
            pipe.value()["parameters"]["string_table_item_length"].get<uint32_t>();
2✔
199

200
        const std::string factory_type = pipe.value()["parameters"]["factory"].get<std::string>();
4✔
201
        std::shared_ptr<ProgramFactoryBase> factory = nullptr;
4✔
202
        if (factory_type == "RandomProgramFactory") {
2✔
203
          factory = std::make_shared<RandomProgramFactory>();
1✔
204
        } else {
205
          throw std::invalid_argument("Invalid program factory type '" + factory_type + "'");
1✔
206
        }
207

208
        created_pipes[pipe_name] = std::make_shared<ProgramFactoryPipe>(max_candidates,
2✔
209
                                                                        max_size,
210
                                                                        memory_variables,
211
                                                                        string_table_items,
212
                                                                        string_table_item_length,
213
                                                                        factory);
1✔
214
        pipeline.addPipe(pipe_name, created_pipes[pipe_name]);
1✔
215
      } else if (pipe_type == "NullSinkPipe") {
4✔
216
        created_pipes[pipe_name] = std::make_shared<NullSinkPipe>();
1✔
217
        pipeline.addPipe(pipe_name, created_pipes[pipe_name]);
1✔
218
      } else if (pipe_type == "EvaluatorPipe") {
3✔
219
        checkForParameterPresenceInPipeJson(pipe,
18✔
220
                                            {"evaluators",
221
                                             "max_candidates",
222
                                             "memory_variables",
223
                                             "string_table_items",
224
                                             "string_table_item_length"});
15✔
225
        const uint32_t max_candidates =
226
            pipe.value()["parameters"]["max_candidates"].get<uint32_t>();
3✔
227
        const uint32_t memory_variables =
228
            pipe.value()["parameters"]["memory_variables"].get<uint32_t>();
3✔
229
        const uint32_t string_table_items =
230
            pipe.value()["parameters"]["string_table_items"].get<uint32_t>();
3✔
231
        const uint32_t string_table_item_length =
232
            pipe.value()["parameters"]["string_table_item_length"].get<uint32_t>();
3✔
233

234
        auto evaluator_pipe = std::make_shared<EvaluatorPipe>(
235
            max_candidates, memory_variables, string_table_items, string_table_item_length);
6✔
236
        created_pipes[pipe_name] = evaluator_pipe;
3✔
237

238
        const auto evaluator_triplets =
239
            constructEvaluatorsFromJson(pipe.value()["parameters"]["evaluators"]);
5✔
240
        for (const auto& evaluator_triplet : evaluator_triplets) {
4✔
241
          std::shared_ptr<Evaluator> evaluator = nullptr;
4✔
242
          double weight = 0.0;
2✔
243
          bool invert_logic = false;
2✔
244
          std::tie(evaluator, weight, invert_logic) = evaluator_triplet;
2✔
245
          evaluator_pipe->addEvaluator(evaluator, weight, invert_logic);
2✔
246
        }
247

248
        pipeline.addPipe(pipe_name, created_pipes[pipe_name]);
2✔
249
      }
250
    }
251
  }
252
  if (json.contains("connections") && !json["connections"].is_null()) {
5✔
253
    for (const auto& connection : json["connections"].items()) {
×
254
      if (!connection.value().contains("source_pipe")) {
×
255
        throw std::invalid_argument("source_pipe parameter required in connection");
×
256
      }
257
      if (!connection.value().contains("source_slot")) {
×
258
        throw std::invalid_argument("source_slot parameter required in connection");
×
259
      }
260
      if (!connection.value().contains("destination_pipe")) {
×
261
        throw std::invalid_argument("destination_pipe parameter required in connection");
×
262
      }
263
      if (!connection.value().contains("destination_slot")) {
×
264
        throw std::invalid_argument("destination_slot parameter required in connection");
×
265
      }
266
      if (!connection.value().contains("buffer_size")) {
×
267
        throw std::invalid_argument("buffer_size parameter required in connection");
×
268
      }
269
      const std::string source_pipe = connection.value()["source_pipe"].get<std::string>();
×
270
      const uint32_t source_slot = connection.value()["source_slot"].get<uint32_t>();
×
271
      const std::string destination_pipe =
272
          connection.value()["destination_pipe"].get<std::string>();
×
273
      const uint32_t destination_slot = connection.value()["destination_slot"].get<uint32_t>();
×
274
      const uint32_t buffer_size = connection.value()["buffer_size"].get<uint32_t>();
×
275

276
      if (created_pipes.find(source_pipe) == created_pipes.end()) {
×
277
        throw std::invalid_argument("Source pipe '" + source_pipe + "' not found");
×
278
      }
279
      if (created_pipes.find(destination_pipe) == created_pipes.end()) {
×
280
        throw std::invalid_argument("Destination pipe '" + destination_pipe + "' not found");
×
281
      }
282

283
      pipeline.connectPipes(created_pipes[source_pipe],
×
284
                            source_slot,
285
                            created_pipes[destination_pipe],
×
286
                            destination_slot,
287
                            buffer_size);
288
    }
289
  }
290
  return pipeline;
10✔
291
}
292

293
nlohmann::json PipelineManager::deconstructEvaluatorsToJson(
2✔
294
    const std::vector<AggregationEvaluator::EvaluatorDescription>& descriptions) {
295
  nlohmann::json evaluators = {};
2✔
296

297
  for (const auto& description : descriptions) {
4✔
298
    nlohmann::json evaluator = nlohmann::json::object();
4✔
299
    evaluator["weight"] = description.weight;
2✔
300
    evaluator["invert_logic"] = description.invert_logic;
2✔
301

302
    if (const auto aggr_eval =
2✔
303
            std::dynamic_pointer_cast<AggregationEvaluator>(description.evaluator)) {
4✔
304
      evaluator["type"] = "AggregationEvaluator";
×
305
      evaluator["parameters"]["evaluators"] =
×
306
          deconstructEvaluatorsToJson(aggr_eval->getEvaluators());
×
307
    } else if (const auto maze_eval =
2✔
308
                   std::dynamic_pointer_cast<MazeEvaluator>(description.evaluator)) {
4✔
309
      evaluator["type"] = "MazeEvaluator";
2✔
310
      evaluator["parameters"]["rows"] = maze_eval->getRows();
2✔
311
      evaluator["parameters"]["cols"] = maze_eval->getCols();
2✔
312
      evaluator["parameters"]["difficulty"] = maze_eval->getDifficulty();
2✔
313
      evaluator["parameters"]["max_steps"] = maze_eval->getMaxSteps();
2✔
314
    }
315

316
    evaluators.push_back(std::move(evaluator));
2✔
317
  }
318

319
  return evaluators;
2✔
320
}
321

322
nlohmann::json PipelineManager::deconstructPipelineToJson(const Pipeline& pipeline) {
6✔
323
  nlohmann::json value;
6✔
324

325
  for (const auto& pipe : pipeline.getPipes()) {
9✔
326
    nlohmann::json pipe_json;
3✔
327

328
    if (const auto evaluator_pipe = std::dynamic_pointer_cast<EvaluatorPipe>(pipe->pipe)) {
6✔
329
      pipe_json["type"] = "EvaluatorPipe";
2✔
330
      pipe_json["parameters"]["max_candidates"] = pipe->pipe->getMaxCandidates();
2✔
331
      pipe_json["parameters"]["memory_variables"] = evaluator_pipe->getMemorySize();
2✔
332
      pipe_json["parameters"]["string_table_item_length"] =
2✔
333
          evaluator_pipe->getStringTableItemLength();
4✔
334
      pipe_json["parameters"]["string_table_items"] = evaluator_pipe->getStringTableSize();
2✔
335
      pipe_json["parameters"]["evaluators"] =
2✔
336
          deconstructEvaluatorsToJson(evaluator_pipe->getEvaluators());
4✔
337
    } else if (std::dynamic_pointer_cast<EvolutionPipe>(pipe->pipe)) {
1✔
338
      pipe_json["type"] = "EvolutionPipe";
×
339
    } else if (std::dynamic_pointer_cast<NullSinkPipe>(pipe->pipe)) {
1✔
340
      pipe_json["type"] = "NullSinkPipe";
1✔
341
    } else if (auto spec_pipe = std::dynamic_pointer_cast<ProgramFactoryPipe>(pipe->pipe)) {
×
342
      pipe_json["type"] = "ProgramFactoryPipe";
×
343
      pipe_json["parameters"]["max_candidates"] = pipe->pipe->getMaxCandidates();
×
344
      pipe_json["parameters"]["max_size"] = spec_pipe->getMaxSize();
×
345
      pipe_json["parameters"]["memory_variables"] = spec_pipe->getMemorySize();
×
346
      pipe_json["parameters"]["string_table_item_length"] = spec_pipe->getStringTableItemLength();
×
347
      pipe_json["parameters"]["string_table_items"] = spec_pipe->getStringTableSize();
×
348

349
      const auto factory = spec_pipe->getFactory();
×
350
      if (std::dynamic_pointer_cast<RandomProgramFactory>(factory)) {
×
351
        pipe_json["parameters"]["factory"] = "RandomProgramFactory";
×
352
      } else {
353
        pipe_json["parameters"]["factory"] = "Unknown";
×
354
      }
355
    } else {
356
      pipe_json["type"] = "Unknown";
×
357
    }
358

359
    value["pipes"][pipe->name] = std::move(pipe_json);
3✔
360
  }
361

362
  nlohmann::json connections_json = {};
12✔
363
  for (const auto& connection : pipeline.getConnections()) {
6✔
364
    nlohmann::json connection_json;
×
365
    connection_json["buffer_size"] = connection->buffer_size;
×
366
    connection_json["destination_pipe"] = connection->destination_pipe->name;
×
367
    connection_json["destination_slot"] = connection->destination_slot_index;
×
368
    connection_json["source_pipe"] = connection->source_pipe->name;
×
369
    connection_json["source_slot"] = connection->source_slot_index;
×
370

371
    connections_json.push_back(std::move(connection_json));
×
372
  }
373
  value["connections"] = std::move(connections_json);
6✔
374

375
  return value;
12✔
376
}
377

378
uint32_t PipelineManager::getFreeId() const {
19✔
379
  uint32_t new_id = -1;
19✔
380
  std::list<PipelineDescriptor>::const_iterator iter;
19✔
381
  do {
2✔
382
    new_id++;
21✔
383
    iter = std::find_if(pipelines_.begin(), pipelines_.end(), [new_id](const auto& pipeline) {
4✔
384
      return pipeline.id == new_id;
4✔
385
    });
21✔
386
  } while (iter != pipelines_.end());
21✔
387
  return new_id;
19✔
388
}
389

390
} // namespace beast
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc