• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dedicate-project / beast / a47e88bc-4720-4817-b77a-d8632196f326

pending completion
a47e88bc-4720-4817-b77a-d8632196f326

Pull #9

circleci

fairlight1337
Added proper display of per-pipe mouse-over stats to the frontend
Pull Request #9: Adding pipeline applications

1377 of 1377 new or added lines in 23 files covered. (100.0%)

2922 of 3222 relevant lines covered (90.69%)

17290.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.57
/src/pipeline_server.cpp
1
#include <beast/pipeline_server.hpp>
2

3
// Standard
4
#include <chrono>
5
#include <iomanip>
6
#include <sstream>
7

8
// Internal
9
#include <beast/time_functions.hpp>
10
#include <beast/version.hpp>
11

12
namespace beast {
13

14
namespace {
15
std::string
16
timePointToIso8601(const std::chrono::time_point<std::chrono::system_clock>& timepoint) {
×
17
  // Convert to time_t.
18
  auto time_t_value = std::chrono::system_clock::to_time_t(timepoint);
×
19

20
  // Convert to tm structure.
21
  std::tm tm_value{};
×
22
  gmtime_r(&time_t_value, &tm_value); // Use gmtime_r for thread-safety.
×
23

24
  // Extract milliseconds.
25
  auto time_since_epoch = timepoint.time_since_epoch();
×
26
  auto milliseconds =
27
      std::chrono::duration_cast<std::chrono::milliseconds>(time_since_epoch) % 1000;
×
28

29
  // Format the time as an ISO 8601 string.
30
  std::ostringstream oss;
×
31
  oss << std::put_time(&tm_value, "%Y-%m-%dT%H:%M:%S");
×
32
  oss << '.' << std::setfill('0') << std::setw(3) << milliseconds.count() << "Z";
×
33
  return oss.str();
×
34
}
35
} // namespace
36

37
PipelineServer::PipelineServer(const std::string& storage_folder)
17✔
38
    : pipeline_manager_{storage_folder, 10, 50} {}
17✔
39

40
crow::json::wvalue PipelineServer::serveStatus() {
1✔
41
  crow::json::wvalue value;
1✔
42
  value["version"] = getVersionString();
1✔
43
  return value;
1✔
44
}
45

46
crow::json::wvalue PipelineServer::serveNewPipeline(const crow::request& req) {
15✔
47
  crow::json::wvalue value;
15✔
48
  if (const auto req_body = crow::json::load(req.body); req_body && req_body.has("name")) {
30✔
49
    const auto name = static_cast<std::string>(req_body["name"]);
28✔
50
    try {
51
      const auto pipeline_id = pipeline_manager_.createPipeline(name);
14✔
52
      value["status"] = "success";
14✔
53
      value["id"] = pipeline_id;
14✔
54
    } catch (const std::invalid_argument& exception) {
×
55
      value["status"] = "failed";
×
56
      value["error"] = exception.what();
×
57
    }
58
  } else {
59
    value["status"] = "failed";
1✔
60
    value["error"] = "Missing 'name' in request body";
1✔
61
  }
62
  return value;
15✔
63
}
64

65
crow::json::wvalue PipelineServer::servePipelineById(uint32_t pipeline_id) {
6✔
66
  crow::json::wvalue value;
6✔
67
  value["id"] = pipeline_id;
6✔
68
  try {
69
    const auto& descriptor = pipeline_manager_.getPipelineById(pipeline_id);
6✔
70
    value["state"] =
6✔
71
        descriptor.pipeline->isRunning() ? std::string("running") : std::string("stopped");
9✔
72
    value["name"] = descriptor.name;
3✔
73
    value["metadata"] = crow::json::load(descriptor.metadata.dump());
3✔
74
    value["model"] = crow::json::load(pipeline_manager_.getJsonForPipeline(pipeline_id).dump());
3✔
75

76
    value["status"] = "success";
3✔
77
  } catch (const std::invalid_argument& exception) {
3✔
78
    value["status"] = "failed";
3✔
79
    value["error"] = exception.what();
3✔
80
  }
81
  return value;
6✔
82
}
83

84
crow::json::wvalue PipelineServer::servePipelineAction(const crow::request& req,
14✔
85
                                                       uint32_t pipeline_id,
86
                                                       const std::string_view path) {
87
  crow::json::wvalue value;
14✔
88
  value["id"] = pipeline_id;
14✔
89
  try {
90
    auto& pipeline = pipeline_manager_.getPipelineById(pipeline_id);
14✔
91
    const bool running = pipeline.pipeline->isRunning();
13✔
92
    if (path == "start") {
13✔
93
      if (running) {
4✔
94
        value["status"] = "failed";
1✔
95
        value["error"] = "already_running";
1✔
96
      } else {
97
        try {
98
          pipeline.pipeline->start();
3✔
99
          value["status"] = "success";
3✔
100
        } catch (const std::invalid_argument& exception) {
×
101
          value["status"] = "failed";
×
102
          value["error"] = exception.what();
×
103
        }
104
      }
105
    } else if (path == "stop") {
9✔
106
      if (running) {
2✔
107
        try {
108
          pipeline.pipeline->stop();
1✔
109
          value["status"] = "success";
1✔
110
        } catch (const std::invalid_argument& exception) {
×
111
          value["status"] = "failed";
×
112
          value["error"] = exception.what();
×
113
        }
114
      } else {
115
        value["status"] = "failed";
1✔
116
        value["error"] = "not_running";
1✔
117
      }
118
    } else if (path == "update") {
7✔
119
      if (req.get_header_value("Content-Type") == "application/json") {
4✔
120
        try {
121
          const auto req_body = crow::json::load(req.body);
6✔
122
          const auto action = static_cast<std::string>(req_body["action"]);
5✔
123

124
          if (action == "change_name") {
2✔
125
            const auto new_name = static_cast<std::string>(req_body["name"]);
1✔
126
            pipeline_manager_.updatePipelineName(pipeline.id, new_name);
1✔
127
            value["status"] = "success";
1✔
128
          } else if (action == "move_pipe") {
1✔
129
            auto& descriptor = pipeline_manager_.getPipelineById(pipeline.id);
×
130
            const auto pipe_name = static_cast<std::string>(req_body["name"]);
×
131
            descriptor.metadata["pipes"][pipe_name]["position"]["x"] =
×
132
                static_cast<int32_t>(req_body["x"]);
×
133
            descriptor.metadata["pipes"][pipe_name]["position"]["y"] =
×
134
                static_cast<int32_t>(req_body["y"]);
×
135
            pipeline_manager_.savePipeline(pipeline.id);
×
136
          } else {
137
            value["status"] = "failed";
1✔
138
            value["action"] = action;
1✔
139
            value["error"] = "invalid_action";
1✔
140
          }
141
        } catch (const nlohmann::detail::parse_error& exception) {
×
142
          value["status"] = "failed";
×
143
          value["error"] = exception.what();
×
144
        } catch (const std::invalid_argument& exception) {
×
145
          value["status"] = "failed";
×
146
          value["error"] = exception.what();
×
147
        } catch (const std::runtime_error& exception) {
1✔
148
          value["status"] = "failed";
1✔
149
          value["error"] = exception.what();
1✔
150
        }
151
      } else {
152
        value["status"] = "failed";
1✔
153
        value["error"] = "invalid_request";
1✔
154
      }
155
    } else if (path == "metrics") {
3✔
156
      const auto& metrics = pipeline_manager_.getPipelineMetrics(pipeline.id);
×
157
      value["time"] = timePointToIso8601(metrics.measure_time_start);
×
158
      value["state"] =
×
159
          pipeline.pipeline->isRunning() ? std::string("running") : std::string("stopped");
×
160
      value["pipes"] = crow::json::wvalue::list();
×
161
      uint32_t idx = 0;
×
162
      for (const auto& pipe_pair : metrics.pipes) {
×
163
        crow::json::wvalue pipe_item;
×
164
        pipe_item["name"] = pipe_pair.first;
×
165
        pipe_item["execution_count"] = pipe_pair.second.execution_count;
×
166
        pipe_item["inputs"] = crow::json::wvalue::list();
×
167
        for (const auto& input_pair : pipe_pair.second.inputs_received) {
×
168
          pipe_item["inputs"][input_pair.first] = input_pair.second;
×
169
        }
170
        pipe_item["outputs"] = crow::json::wvalue::list();
×
171
        for (const auto& output_pair : pipe_pair.second.outputs_sent) {
×
172
          pipe_item["outputs"][output_pair.first] = output_pair.second;
×
173
        }
174
        value["pipes"][idx] = std::move(pipe_item);
×
175
        idx++;
×
176
      }
177
      value["status"] = "success";
×
178
    } else if (path == "delete") {
3✔
179
      if (pipeline.pipeline->isRunning()) {
2✔
180
        pipeline.pipeline->stop();
1✔
181
      }
182
      pipeline_manager_.deletePipeline(pipeline.id);
2✔
183
      value["status"] = "success";
2✔
184
    } else {
185
      value["status"] = "failed";
1✔
186
      value["error"] = "invalid_command";
1✔
187
      value["command"] = std::string(path);
1✔
188
    }
189
  } catch (const std::invalid_argument& exception) {
1✔
190
    value["status"] = "failed";
1✔
191
    value["error"] = exception.what();
1✔
192
  }
193
  return value;
14✔
194
}
195

196
crow::json::wvalue PipelineServer::serveAllPipelines() const {
1✔
197
  crow::json::wvalue value = crow::json::wvalue::list();
1✔
198
  uint32_t idx = 0;
1✔
199
  for (const auto& pipeline : pipeline_manager_.getPipelines()) {
3✔
200
    crow::json::wvalue pipeline_item;
2✔
201
    pipeline_item["id"] = pipeline.id;
2✔
202
    pipeline_item["name"] = pipeline.name;
2✔
203
    pipeline_item["state"] =
4✔
204
        pipeline.pipeline->isRunning() ? std::string("running") : std::string("stopped");
6✔
205
    value[idx] = std::move(pipeline_item);
2✔
206
    idx++;
2✔
207
  }
208
  return value;
1✔
209
}
210

211
} // namespace beast
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc