• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jgomezselles / hermes / 6989477809

25 Nov 2023 01:50PM UTC coverage: 96.907% (+0.4%) from 96.514%
6989477809

push

github

jgomezselles
Bump up checkout actions

940 of 970 relevant lines covered (96.91%)

167.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.67
/src/http2_client/client_impl.cpp
1
#include "client_impl.hpp"
2

3
#include <nghttp2/asio_http2_client.h>
4
#include <syslog.h>
5

6
#include <atomic>
7
#include <boost/asio.hpp>
8
#include <boost/bind/bind.hpp>
9
#include <boost/system/error_code.hpp>
10
#include <chrono>
11
#include <iostream>
12
#include <map>
13
#include <mutex>
14
#include <optional>
15
#include <shared_mutex>
16
#include <utility>
17

18
#include "connection.hpp"
19
#include "opentelemetry/trace/semantic_conventions.h"
20
#include "script.hpp"
21
#include "script_queue.hpp"
22
#include "stats.hpp"
23
#include "tracer.hpp"
24

25
namespace ng = nghttp2::asio_http2;
26
using namespace std::chrono;
27
namespace ot_trace = opentelemetry::trace;
28

29
namespace http2_client
30
{
31
/// Builds a client for `h`:`p` and immediately attempts to establish the connection.
///
/// @param st             Statistics sink; ownership is shared with the caller.
/// @param io_ctx         io_context stored by reference (used later for the timeout timers).
/// @param q              Script queue; ownership is transferred to this client.
/// @param h              Host to connect to.
/// @param p              Port to connect to.
/// @param secure_session Forwarded to the connection, together with host and port.
///
/// A failed connection attempt is only reported on stderr; construction still completes.
client_impl::client_impl(std::shared_ptr<stats::stats_if> st, boost::asio::io_context& io_ctx,
                         std::unique_ptr<traffic::script_queue_if> q, const std::string& h,
                         const std::string& p, const bool secure_session)
    : stats(std::move(st)),
      io_ctx(io_ctx),
      queue(std::move(q)),
      host(h),
      port(p),
      secure_session(secure_session),
      conn(std::make_unique<connection>(h, p, secure_session))
{
    const bool connected = conn->wait_to_be_connected();
    if (connected)
    {
        return;
    }

    std::cerr << "Fatal error. Could not connect to: " << host << ":" << port << std::endl;
}
36✔
47

48
void client_impl::handle_timeout(const std::shared_ptr<race_control>& control,
4✔
49
                                 const std::string& msg_name) const
50
{
51
    std::scoped_lock guard(control->mtx);
4✔
52
    if (control->answered)
4✔
53
    {
54
        return;
×
55
    }
56
    control->timed_out = true;
4✔
57
    stats->add_timeout(msg_name);
4✔
58
    queue->cancel_script();
4✔
59
}
4✔
60
// TODO: Add timeout handling in spans
61
void client_impl::handle_timeout_cancelled(const std::shared_ptr<race_control>& control,
12✔
62
                                           const std::string& msg_name) const
63
{
64
    if (control->mtx.try_lock())
12✔
65
    {
66
        if (!control->answered)
12✔
67
        {
68
            control->timed_out = true;
×
69
            stats->add_error(msg_name, 469);
×
70
            queue->cancel_script();
×
71
        }
72
        control->mtx.unlock();
12✔
73
    }
74
}
12✔
75

76
/// Completion handler for the per-request timer.
///
/// An error value of 0 means the timer expired and is routed to handle_timeout(); any other
/// value is routed to handle_timeout_cancelled().
///
/// @param e        Result of the asynchronous wait.
/// @param control  Shared answered/timed-out state for the request.
/// @param msg_name Name of the request the timer belongs to.
void client_impl::on_timeout(const boost::system::error_code& e,
                             std::shared_ptr<race_control> control,
                             const std::string& msg_name) const
{
    const bool timer_expired = (e.value() == 0);
    if (!timer_expired)
    {
        handle_timeout_cancelled(control, msg_name);
        return;
    }

    handle_timeout(control, msg_name);
}
16✔
89

90
void client_impl::open_new_connection()
8✔
91
{
92
    if (!mtx.try_lock())
8✔
93
    {
94
        return;
×
95
    }
96
    conn.reset();
8✔
97

98
    if (auto new_conn = std::make_unique<connection>(host, port, secure_session);
8✔
99
        new_conn->wait_to_be_connected())
8✔
100
    {
101
        conn = std::move(new_conn);
4✔
102
    }
103
    else
104
    {
105
        new_conn.reset();
4✔
106
    }
8✔
107
    mtx.unlock();
8✔
108
}
109

110
void client_impl::send()
24✔
111
{
112
    auto script_opt = queue->get_next_script();
24✔
113
    if (!script_opt.has_value())
24✔
114
    {
115
        return;
×
116
    }
117
    const auto& script = *script_opt;
24✔
118
    request req = get_next_request(host, port, script);
24✔
119

120
    if (!is_connected())
24✔
121
    {
122
        stats->add_client_error(req.name, 466);
8✔
123
        queue->cancel_script();
8✔
124
        open_new_connection();
8✔
125
        return;
8✔
126
    }
127

128
    if (!mtx.try_lock_shared())
16✔
129
    {
130
        stats->add_client_error(req.name, 467);
×
131
        queue->cancel_script();
×
132
        return;
×
133
    }
134

135
    const auto& session = conn->get_session();
16✔
136
    session.io_service().post(
16✔
137
        [this, script, &session, req]
16✔
138
        {
139
            boost::system::error_code ec;
16✔
140
            auto init_time = std::make_shared<time_point<steady_clock>>(steady_clock::now());
16✔
141

142
            auto span = o11y::create_child_span(req.name, script.get_span());
16✔
143
            span->SetAttribute(ot_trace::SemanticConventions::kUrlFull, req.url);
16✔
144
            span->SetAttribute(ot_trace::SemanticConventions::kHttpRequestMethod, req.method);
16✔
145

146
            auto nghttp_req = session.submit(ec, req.method, req.url, req.body, req.headers);
16✔
147
            if (!nghttp_req)
16✔
148
            {
149
                std::cerr << "Error submitting. Closing connection:" << ec.message() << std::endl;
×
150
                conn->close();
×
151
                stats->add_client_error(req.name, 468);
×
152
                queue->cancel_script();
×
153
                return;
×
154
            }
155

156
            stats->increase_sent(req.name);
16✔
157
            span->AddEvent("Request sent");
16✔
158

159
            auto ctrl = std::make_shared<race_control>();
16✔
160
            auto timer = std::make_shared<boost::asio::steady_timer>(io_ctx);
16✔
161
            timer->expires_after(milliseconds(script.get_timeout_ms()));
16✔
162
            timer->async_wait(boost::bind(&client_impl::on_timeout, this,
32✔
163
                                          boost::asio::placeholders::error, ctrl, req.name));
16✔
164

165
            nghttp_req->on_response(
16✔
166
                [this, timer, init_time, script, ctrl, req, span](const ng::client::response& res)
32✔
167
                {
168
                    auto elapsed_time =
169
                        duration_cast<microseconds>(steady_clock::now() - (*init_time)).count();
12✔
170

171
                    std::lock_guard guard(ctrl->mtx);
12✔
172
                    if (ctrl->timed_out)
12✔
173
                    {
174
                        return;
×
175
                    }
176
                    ctrl->answered = true;
12✔
177
                    timer->cancel();
12✔
178

179
                    span->AddEvent("Response received");
12✔
180
                    auto answer = std::make_shared<std::string>();
12✔
181
                    res.on_data(
12✔
182
                        [this, &res, script, answer, elapsed_time, req, span](const uint8_t* data,
24✔
183
                                                                              std::size_t len)
184
                        {
185
                            if (len > 0)
24✔
186
                            {
187
                                std::string json(reinterpret_cast<const char*>(data), len);
12✔
188
                                *answer += json;
12✔
189
                            }
12✔
190
                            else
191
                            {
192
                                span->AddEvent("Body received");
12✔
193
                                traffic::answer_type ans = {res.status_code(), *answer,
12✔
194
                                                            res.header()};
12✔
195
                                span->SetAttribute(
24✔
196
                                    ot_trace::SemanticConventions::kHttpResponseStatusCode,
197
                                    res.status_code());
12✔
198

199
                                bool valid_answer = script.validate_answer(ans);
12✔
200
                                if (valid_answer)
12✔
201
                                {
202
                                    stats->add_measurement(req.name, elapsed_time,
8✔
203
                                                           res.status_code());
204
                                    span->SetStatus(ot_trace::StatusCode::kOk);
8✔
205
                                    span->End();
8✔
206
                                    queue->enqueue_script(script, ans);
8✔
207
                                }
208
                                else
209
                                {
210
                                    stats->add_error(req.name, res.status_code());
4✔
211
                                    span->SetStatus(opentelemetry::trace::StatusCode::kError);
4✔
212
                                    span->End();
4✔
213
                                    queue->cancel_script();
4✔
214
                                }
215
                            }
12✔
216
                        });
24✔
217
                });
12✔
218

219
            nghttp_req->on_close(
16✔
220
                []([[maybe_unused]] uint32_t error_code)
×
221
                {
222
                    // on_close is registered here for the sake of completion and
223
                    // because it helps debugging sometimes, but no implementation needed.
224
                });
16✔
225
        });
16✔
226
    mtx.unlock_shared();
16✔
227
}
32✔
228

229
}  // namespace http2_client
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc