• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jgomezselles / hermes / 19001526536

01 Nov 2025 07:22PM UTC coverage: 95.648% (+0.06%) from 95.587%
19001526536

push

github

jgomezselles
bump up docker image in example

967 of 1011 relevant lines covered (95.65%)

159.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.67
/src/http2_client/client_impl.cpp
1
#include "client_impl.hpp"
2

3
#include <nghttp2/asio_http2_client.h>
4
#include <syslog.h>
5

6
#include <atomic>
7
#include <boost/asio.hpp>
8
#include <boost/bind/bind.hpp>
9
#include <boost/system/error_code.hpp>
10
#include <chrono>
11
#include <iostream>
12
#include <map>
13
#include <mutex>
14
#include <optional>
15
#include <shared_mutex>
16
#include <utility>
17

18
#include "connection.hpp"
19
#include "opentelemetry/context/propagation/global_propagator.h"
20
#include "opentelemetry/context/propagation/text_map_propagator.h"
21
#include "opentelemetry/semconv/incubating/http_attributes.h"
22
#include "opentelemetry/semconv/url_attributes.h"
23
#include "script.hpp"
24
#include "script_queue.hpp"
25
#include "stats.hpp"
26
#include "tracer.hpp"
27

28
namespace ng = nghttp2::asio_http2;
29
using namespace std::chrono;
30
namespace ot_trace = opentelemetry::trace;
31
namespace ot_conv = opentelemetry::semconv;
32

33
namespace http2_client
34
{
35
// Builds a client bound to host `h` and port `p`.
//
// Takes ownership of the statistics sink `st` and the script queue `q`, keeps a
// reference to `io_ctx`, creates the first connection and calls
// wait_to_be_connected() on it. A failed first connection is only reported on
// stderr: construction still completes.
client_impl::client_impl(std::shared_ptr<stats::stats_if> st, boost::asio::io_context& io_ctx,
                         std::unique_ptr<traffic::script_queue_if> q, const std::string& h,
                         const std::string& p, const bool secure_session)
    : stats(std::move(st)),
      io_ctx(io_ctx),
      queue(std::move(q)),
      host(h),
      port(p),
      secure_session(secure_session),
      conn(std::make_unique<connection>(h, p, secure_session))
{
    const bool connected = conn->wait_to_be_connected();
    if (connected)
    {
        return;
    }

    std::cerr << "Fatal error. Could not connect to: " << host << ":" << port << std::endl;
}
36✔
51

52
void client_impl::handle_timeout(const std::shared_ptr<race_control>& control,
4✔
53
                                 const std::string& msg_name) const
54
{
55
    std::scoped_lock guard(control->mtx);
4✔
56
    if (control->answered)
4✔
57
    {
58
        return;
×
59
    }
60
    control->timed_out = true;
4✔
61
    stats->add_timeout(msg_name);
4✔
62
    queue->cancel_script();
4✔
63
}
4✔
64
// TODO: Add timeout handling in spans
65
void client_impl::handle_timeout_cancelled(const std::shared_ptr<race_control>& control,
12✔
66
                                           const std::string& msg_name) const
67
{
68
    if (control->mtx.try_lock())
12✔
69
    {
70
        if (!control->answered)
12✔
71
        {
72
            control->timed_out = true;
×
73
            stats->add_error(msg_name, 469);
×
74
            queue->cancel_script();
×
75
        }
76
        control->mtx.unlock();
12✔
77
    }
78
}
12✔
79

80
// Completion handler for the per-request timer.
//
// An error code whose value is 0 means the timer wait completed normally and
// is forwarded to handle_timeout(); any other value is forwarded to
// handle_timeout_cancelled().
void client_impl::on_timeout(const boost::system::error_code& e,
                             std::shared_ptr<race_control> control,
                             const std::string& msg_name) const
{
    const bool wait_failed = (e.value() != 0);
    if (wait_failed)
    {
        handle_timeout_cancelled(control, msg_name);
        return;
    }

    handle_timeout(control, msg_name);
}
16✔
93

94
void client_impl::open_new_connection()
8✔
95
{
96
    if (!mtx.try_lock())
8✔
97
    {
98
        return;
×
99
    }
100
    conn.reset();
8✔
101

102
    if (auto new_conn = std::make_unique<connection>(host, port, secure_session);
8✔
103
        new_conn->wait_to_be_connected())
8✔
104
    {
105
        conn = std::move(new_conn);
4✔
106
    }
107
    else
108
    {
109
        new_conn.reset();
4✔
110
    }
8✔
111
    mtx.unlock();
8✔
112
}
113

114
void client_impl::send()
24✔
115
{
116
    auto script = queue->get_next_script();
24✔
117
    if (!script)
24✔
118
    {
119
        return;
×
120
    }
121
    request req = get_next_request(host, port, *script);
24✔
122

123
    if (!is_connected())
24✔
124
    {
125
        stats->add_client_error(req.name, 466);
8✔
126
        queue->cancel_script();
8✔
127
        open_new_connection();
8✔
128
        return;
8✔
129
    }
130

131
    if (!mtx.try_lock_shared())
16✔
132
    {
133
        stats->add_client_error(req.name, 467);
×
134
        queue->cancel_script();
×
135
        return;
×
136
    }
137

138
    const auto& session = conn->get_session();
16✔
139
    session.io_service().post(
16✔
140
        [this, script = std::move(script), &session, req]() mutable
16✔
141
        {
142
            boost::system::error_code ec;
16✔
143
            auto init_time = std::make_shared<time_point<steady_clock>>(steady_clock::now());
16✔
144

145
            auto span = o11y::create_child_span(req.name, script->get_span());
16✔
146
            span->SetAttribute(ot_conv::url::kUrlFull, req.url);
16✔
147
            span->SetAttribute(ot_conv::http::kHttpRequestMethod, req.method);
16✔
148
            o11y::inject_trace_context(span, req.headers);
16✔
149

150
            auto nghttp_req = session.submit(ec, req.method, req.url, req.body, req.headers);
16✔
151
            if (!nghttp_req)
16✔
152
            {
153
                std::cerr << "Error submitting. Closing connection:" << ec.message() << std::endl;
×
154
                conn->close();
×
155
                stats->add_client_error(req.name, 468);
×
156
                queue->cancel_script();
×
157
                return;
×
158
            }
159

160
            stats->increase_sent(req.name);
16✔
161
            span->AddEvent("Request sent");
16✔
162

163
            auto ctrl = std::make_shared<race_control>();
16✔
164
            auto timer = std::make_shared<boost::asio::steady_timer>(io_ctx);
16✔
165
            timer->expires_after(milliseconds(script->get_timeout_ms()));
16✔
166
            timer->async_wait(boost::bind(&client_impl::on_timeout, this,
32✔
167
                                          boost::asio::placeholders::error, ctrl, req.name));
16✔
168

169
            nghttp_req->on_response(
16✔
170
                [this, timer, init_time, script = std::move(script), ctrl, req,
32✔
171
                 span](const ng::client::response& res) mutable
172
                {
173
                    auto elapsed_time =
174
                        duration_cast<microseconds>(steady_clock::now() - (*init_time)).count();
12✔
175

176
                    std::lock_guard guard(ctrl->mtx);
12✔
177
                    if (ctrl->timed_out)
12✔
178
                    {
179
                        return;
×
180
                    }
181
                    ctrl->answered = true;
12✔
182
                    timer->cancel();
12✔
183

184
                    span->AddEvent("Response received");
12✔
185
                    auto answer = std::make_shared<std::string>();
12✔
186
                    res.on_data(
12✔
187
                        [this, &res, script = std::move(script), answer, elapsed_time, req, span](
24✔
188
                            const uint8_t* data, std::size_t len) mutable
189
                        {
190
                            if (len > 0)
24✔
191
                            {
192
                                std::string json(reinterpret_cast<const char*>(data), len);
12✔
193
                                *answer += json;
12✔
194
                            }
12✔
195
                            else
196
                            {
197
                                span->AddEvent("Body received");
12✔
198
                                traffic::answer_type ans = {res.status_code(), *answer,
12✔
199
                                                            res.header()};
12✔
200
                                span->SetAttribute(ot_conv::http::kHttpResponseStatusCode,
24✔
201
                                                   res.status_code());
12✔
202

203
                                bool valid_answer = script->validate_answer(ans);
12✔
204
                                if (valid_answer)
12✔
205
                                {
206
                                    stats->add_measurement(req.name, elapsed_time,
8✔
207
                                                           res.status_code());
208
                                    span->SetStatus(ot_trace::StatusCode::kOk);
8✔
209
                                    span->End();
8✔
210
                                    queue->enqueue_script(std::move(script), ans);
8✔
211
                                }
212
                                else
213
                                {
214
                                    stats->add_error(req.name, res.status_code());
4✔
215
                                    span->SetStatus(opentelemetry::trace::StatusCode::kError);
4✔
216
                                    span->End();
4✔
217
                                    queue->cancel_script();
4✔
218
                                }
219
                            }
12✔
220
                        });
24✔
221
                });
12✔
222

223
            nghttp_req->on_close(
16✔
224
                []([[maybe_unused]] uint32_t error_code)
×
225
                {
226
                    // on_close is registered here for the sake of completion and
227
                    // because it helps debugging sometimes, but no implementation needed.
228
                });
16✔
229
        });
16✔
230
    mtx.unlock_shared();
16✔
231
}
32✔
232

233
}  // namespace http2_client
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc