• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PowerDNS / pdns / 13012068652

28 Jan 2025 01:59PM UTC coverage: 64.71% (+0.01%) from 64.699%
13012068652

Pull #14724

github

web-flow
Merge b15562560 into db18c3a17
Pull Request #14724: dnsdist: Add meson support

38328 of 90334 branches covered (42.43%)

Branch coverage included in aggregate %.

361 of 513 new or added lines in 35 files covered. (70.37%)

42 existing lines in 13 files now uncovered.

128150 of 166934 relevant lines covered (76.77%)

4540890.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.53
/pdns/dnsdistdist/dnsdist-nghttp2.cc
1
/*
2
 * This file is part of PowerDNS or dnsdist.
3
 * Copyright -- PowerDNS.COM B.V. and its contributors
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of version 2 of the GNU General Public License as
7
 * published by the Free Software Foundation.
8
 *
9
 * In addition, for the avoidance of any doubt, permission is granted to
10
 * link this program with OpenSSL and to (re)distribute the binaries
11
 * produced as the result of such linking.
12
 *
13
 * This program is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
 * GNU General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU General Public License
19
 * along with this program; if not, write to the Free Software
20
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
 */
22

23
#include "config.h"
24

25
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
26
#include <nghttp2/nghttp2.h>
27
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
28

29
#include "dnsdist-nghttp2.hh"
30
#include "dnsdist-nghttp2-in.hh"
31
#include "dnsdist-tcp.hh"
32
#include "dnsdist-tcp-downstream.hh"
33
#include "dnsdist-downstream-connection.hh"
34

35
#include "dolog.hh"
36
#include "channel.hh"
37
#include "iputils.hh"
38
#include "libssl.hh"
39
#include "noinitvector.hh"
40
#include "tcpiohandler.hh"
41
#include "threadname.hh"
42
#include "sstuff.hh"
43

44
/* Atomic 64-bit counter, starting at zero.
   NOTE(review): the name suggests it counts requests to dump the DoH
   connection states; nothing in the visible part of this file reads or
   writes it, so confirm against its users. */
std::atomic<uint64_t> g_dohStatesDumpRequested{0};
45
/* Owning pointer to the collection of DoH client threads, null until something
   assigns it (the assignment is not in the visible part of this file). */
std::unique_ptr<DoHClientCollection> g_dohClientThreads{nullptr};
46

47
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
48
class DoHConnectionToBackend : public ConnectionToBackend
49
{
50
public:
51
  DoHConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload);
52

53
  void handleTimeout(const struct timeval& now, bool write) override;
54
  void queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query) override;
55

56
  std::string toString() const override
57
  {
×
58
    ostringstream o;
×
59
    o << "DoH connection to backend " << (d_ds ? d_ds->getName() : "empty") << " over FD " << (d_handler ? std::to_string(d_handler->getDescriptor()) : "no socket") << ", " << getConcurrentStreamsCount() << " streams";
×
60
    return o.str();
×
61
  }
×
62

63
  void setHealthCheck(bool h)
64
  {
14✔
65
    d_healthCheckQuery = h;
14✔
66
  }
14✔
67

68
  void stopIO() override;
69
  bool reachedMaxConcurrentQueries() const override;
70
  bool reachedMaxStreamID() const override;
71
  bool isIdle() const override;
72
  void release(bool removeFromCache) override
73
  {
×
74
    (void)removeFromCache;
×
75
  }
×
76

77
private:
78
  static ssize_t send_callback(nghttp2_session* session, const uint8_t* data, size_t length, int flags, void* user_data);
79
  static int on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data);
80
  static int on_data_chunk_recv_callback(nghttp2_session* session, uint8_t flags, int32_t stream_id, const uint8_t* data, size_t len, void* user_data);
81
  static int on_stream_close_callback(nghttp2_session* session, int32_t stream_id, uint32_t error_code, void* user_data);
82
  static int on_header_callback(nghttp2_session* session, const nghttp2_frame* frame, const uint8_t* name, size_t namelen, const uint8_t* value, size_t valuelen, uint8_t flags, void* user_data);
83
  static int on_error_callback(nghttp2_session* session, int lib_error_code, const char* msg, size_t len, void* user_data);
84
  static void handleReadableIOCallback(int fd, FDMultiplexer::funcparam_t& param);
85
  static void handleWritableIOCallback(int fd, FDMultiplexer::funcparam_t& param);
86

87
  class PendingRequest
88
  {
89
  public:
90
    std::shared_ptr<TCPQuerySender> d_sender{nullptr};
91
    TCPQuery d_query;
92
    PacketBuffer d_buffer;
93
    size_t d_queryPos{0};
94
    uint16_t d_responseCode{0};
95
    bool d_finished{false};
96
  };
97
  void updateIO(IOState newState, const FDMultiplexer::callbackfunc_t& callback, bool noTTD = false);
98
  void watchForRemoteHostClosingConnection();
99
  void handleResponse(PendingRequest&& request);
100
  void handleResponseError(PendingRequest&& request, const struct timeval& now);
101
  void handleIOError();
102
  uint32_t getConcurrentStreamsCount() const;
103

104
  size_t getUsageCount() const
105
  {
×
106
    auto ref = shared_from_this();
×
107
    return ref.use_count();
×
108
  }
×
109

110
  static const std::unordered_map<std::string, std::string> s_constants;
111

112
  std::unordered_map<int32_t, PendingRequest> d_currentStreams;
113
  std::string d_proxyProtocolPayload;
114
  PacketBuffer d_out;
115
  PacketBuffer d_in;
116
  std::unique_ptr<nghttp2_session, void (*)(nghttp2_session*)> d_session{nullptr, nghttp2_session_del};
117
  size_t d_outPos{0};
118
  size_t d_inPos{0};
119
  bool d_healthCheckQuery{false};
120
  bool d_firstWrite{true};
121
};
122

123
/* Connections manager specialised for DoH connections to backends. */
using DownstreamDoHConnectionsManager = DownstreamConnectionsManager<DoHConnectionToBackend>;
124
/* One manager per thread (thread_local): each thread keeps its own set of
   DoH connections to backends. */
thread_local DownstreamDoHConnectionsManager t_downstreamDoHConnectionsManager;
125

126
uint32_t DoHConnectionToBackend::getConcurrentStreamsCount() const
127
{
1,360✔
128
  return d_currentStreams.size();
1,360✔
129
}
1,360✔
130

131
/* Hands a complete HTTP response over to the sender of the corresponding query.

   Unless this connection carries a health-check query, the backend statistics
   are updated first: latency, then either the response code found in the DNS
   header or an error if the body is too short to hold a DNS header.
   Any std::exception raised along the way is logged and swallowed. */
void DoHConnectionToBackend::handleResponse(PendingRequest&& request)
{
  struct timeval now{};
  gettimeofday(&now, nullptr);

  try {
    if (!d_healthCheckQuery) {
      const double latency = request.d_query.d_idstate.queryRealTime.udiff();
      d_ds->updateTCPLatency(latency);

      const bool holdsDNSHeader = request.d_buffer.size() >= sizeof(dnsheader);
      if (!holdsDNSHeader) {
        d_ds->reportTimeoutOrError();
      }
      else {
        /* copy the header out of the buffer instead of casting the raw bytes */
        dnsheader header;
        memcpy(&header, request.d_buffer.data(), sizeof(header));
        d_ds->reportResponse(header.rcode);
      }
    }

    TCPResponse response(std::move(request.d_query));
    response.d_buffer = std::move(request.d_buffer);
    response.d_connection = shared_from_this();
    response.d_ds = d_ds;
    request.d_sender->handleResponse(now, std::move(response));
  }
  catch (const std::exception& exp) {
    vinfolog("Got exception while handling response for cross-protocol DoH: %s", exp.what());
  }
}
117✔
163

164
/* Tells the sender of a query that no usable response will come.

   The error is reported to the backend unless this connection carries a
   health-check query. The sender gets an empty response built from the
   query's internal state, with neither connection nor backend attached.
   Any std::exception raised along the way is logged and swallowed. */
void DoHConnectionToBackend::handleResponseError(PendingRequest&& request, const struct timeval& now)
{
  try {
    const bool accountForError = !d_healthCheckQuery;
    if (accountForError) {
      d_ds->reportTimeoutOrError();
    }

    TCPResponse emptyResponse(PacketBuffer(), std::move(request.d_query.d_idstate), nullptr, nullptr);
    request.d_sender->notifyIOError(now, std::move(emptyResponse));
  }
  catch (const std::exception& exp) {
    vinfolog("Got exception while handling response for cross-protocol DoH: %s", exp.what());
  }
}
24✔
178

179
void DoHConnectionToBackend::handleIOError()
180
{
13✔
181
  d_connectionDied = true;
13✔
182
  nghttp2_session_terminate_session(d_session.get(), NGHTTP2_PROTOCOL_ERROR);
13✔
183

184
  struct timeval now
13✔
185
  {
13✔
186
    .tv_sec = 0, .tv_usec = 0
13✔
187
  };
13✔
188

189
  gettimeofday(&now, nullptr);
13✔
190
  for (auto& request : d_currentStreams) {
17✔
191
    handleResponseError(std::move(request.second), now);
17✔
192
  }
17✔
193

194
  d_currentStreams.clear();
13✔
195
  stopIO();
13✔
196
}
13✔
197

198
void DoHConnectionToBackend::handleTimeout(const struct timeval& now, bool write)
199
{
3✔
200
  (void)now;
3✔
201
  if (write) {
3✔
202
    if (d_firstWrite) {
1!
203
      ++d_ds->tcpConnectTimeouts;
×
204
    }
×
205
    else {
1✔
206
      ++d_ds->tcpWriteTimeouts;
1✔
207
    }
1✔
208
  }
1✔
209
  else {
2✔
210
    ++d_ds->tcpReadTimeouts;
2✔
211
  }
2✔
212

213
  handleIOError();
3✔
214
}
3✔
215

216
bool DoHConnectionToBackend::reachedMaxStreamID() const
217
{
590✔
218
  const uint32_t maximumStreamID = (static_cast<uint32_t>(1) << 31) - 1;
590✔
219
  return d_highestStreamID == maximumStreamID;
590✔
220
}
590✔
221

222
bool DoHConnectionToBackend::reachedMaxConcurrentQueries() const
223
{
93✔
224
  // cerr<<"Got "<<getConcurrentStreamsCount()<<" concurrent streams, max is "<<nghttp2_session_get_remote_settings(d_session.get(), NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS)<<endl;
225
  if (nghttp2_session_get_remote_settings(d_session.get(), NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS) <= getConcurrentStreamsCount()) {
93!
226
    return true;
×
227
  }
×
228
  return false;
93✔
229
}
93✔
230

231
bool DoHConnectionToBackend::isIdle() const
232
{
1,208✔
233
  return getConcurrentStreamsCount() == 0;
1,208✔
234
}
1,208✔
235

236
void DoHConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query)
237
{
143✔
238
  auto payloadSize = std::to_string(query.d_buffer.size());
143✔
239

240
  bool addXForwarded = d_ds->d_config.d_addXForwardedHeaders;
143✔
241

242
  /* We use nghttp2_nv_flag.NGHTTP2_NV_FLAG_NO_COPY_NAME and nghttp2_nv_flag.NGHTTP2_NV_FLAG_NO_COPY_VALUE
243
     to avoid a copy and lowercasing but we need to make sure that the data will outlive the request (nghttp2_on_frame_send_callback called), and that it is already lowercased. */
244
  std::vector<nghttp2_nv> headers;
143✔
245
  // these need to live until after the request headers have been processed
246
  std::string remote;
143✔
247
  std::string remotePort;
143✔
248
  headers.reserve(8 + (addXForwarded ? 3 : 0));
143✔
249

250
  /* Pseudo-headers need to come first (rfc7540 8.1.2.1) */
251
  NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::METHOD_NAME, NGHTTP2Headers::HeaderConstantIndexes::METHOD_VALUE);
143✔
252
  NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::SCHEME_NAME, NGHTTP2Headers::HeaderConstantIndexes::SCHEME_VALUE);
143✔
253
  NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::AUTHORITY_NAME, d_ds->d_config.d_tlsSubjectName);
143✔
254
  NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::PATH_NAME, d_ds->d_config.d_dohPath);
143✔
255
  NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::ACCEPT_NAME, NGHTTP2Headers::HeaderConstantIndexes::ACCEPT_VALUE);
143✔
256
  NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::CONTENT_TYPE_NAME, NGHTTP2Headers::HeaderConstantIndexes::CONTENT_TYPE_VALUE);
143✔
257
  NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::USER_AGENT_NAME, NGHTTP2Headers::HeaderConstantIndexes::USER_AGENT_VALUE);
143✔
258
  NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::CONTENT_LENGTH_NAME, payloadSize);
143✔
259
  /* no need to add these headers for health-check queries */
260
  if (addXForwarded && query.d_idstate.origRemote.getPort() != 0) {
143✔
261
    remote = query.d_idstate.origRemote.toString();
2✔
262
    remotePort = std::to_string(query.d_idstate.origRemote.getPort());
2✔
263
    NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_FOR_NAME, remote);
2✔
264
    NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PORT_NAME, remotePort);
2✔
265
    if (query.d_idstate.cs != nullptr) {
2!
266
      if (query.d_idstate.cs->isUDP()) {
2✔
267
        NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_UDP);
1✔
268
      }
1✔
269
      else if (query.d_idstate.cs->isDoH()) {
1!
270
        if (query.d_idstate.cs->hasTLS()) {
×
271
          NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_HTTPS);
×
272
        }
×
273
        else {
×
274
          NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_HTTP);
×
275
        }
×
276
      }
×
277
      else if (query.d_idstate.cs->hasTLS()) {
1!
278
        NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_TLS);
×
279
      }
×
280
      else {
1✔
281
        NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_TCP);
1✔
282
      }
1✔
283
    }
2✔
284
  }
2✔
285

286
  PendingRequest pending;
143✔
287
  pending.d_query = std::move(query);
143✔
288
  pending.d_sender = std::move(sender);
143✔
289

290
  uint32_t streamId = nghttp2_session_get_next_stream_id(d_session.get());
143✔
291
  auto insertPair = d_currentStreams.insert({streamId, std::move(pending)});
143✔
292
  if (!insertPair.second) {
143!
293
    /* there is a stream ID collision, something is very wrong! */
294
    d_connectionDied = true;
×
295
    nghttp2_session_terminate_session(d_session.get(), NGHTTP2_NO_ERROR);
×
296
    throw std::runtime_error("Stream ID collision");
×
297
  }
×
298

299
  /* if data_prd is not NULL, it provides data which will be sent in subsequent DATA frames. In this case, a method that allows request message bodies (https://tools.ietf.org/html/rfc7231#section-4) must be specified with :method key (e.g. POST). This function does not take ownership of the data_prd. The function copies the members of the data_prd. If data_prd is NULL, HEADERS have END_STREAM set.
300
   */
301
  nghttp2_data_provider data_provider;
143✔
302

303
  data_provider.source.ptr = this;
143✔
304
  data_provider.read_callback = [](nghttp2_session* session, int32_t stream_id, uint8_t* buf, size_t length, uint32_t* data_flags, nghttp2_data_source* source, void* user_data) -> ssize_t {
143✔
305
    (void)session;
142✔
306
    (void)source;
142✔
307
    auto* conn = static_cast<DoHConnectionToBackend*>(user_data);
142✔
308
    auto& request = conn->d_currentStreams.at(stream_id);
142✔
309
    size_t toCopy = 0;
142✔
310
    if (request.d_queryPos < request.d_query.d_buffer.size()) {
142!
311
      size_t remaining = request.d_query.d_buffer.size() - request.d_queryPos;
142✔
312
      toCopy = length > remaining ? remaining : length;
142!
313
      memcpy(buf, &request.d_query.d_buffer.at(request.d_queryPos), toCopy);
142✔
314
      request.d_queryPos += toCopy;
142✔
315
    }
142✔
316

317
    if (request.d_queryPos >= request.d_query.d_buffer.size()) {
142!
318
      *data_flags |= NGHTTP2_DATA_FLAG_EOF;
142✔
319
    }
142✔
320
    return toCopy;
142✔
321
  };
142✔
322

323
  auto newStreamId = nghttp2_submit_request(d_session.get(), nullptr, headers.data(), headers.size(), &data_provider, this);
143✔
324
  if (newStreamId < 0) {
143!
325
    d_connectionDied = true;
×
326
    ++d_ds->tcpDiedSendingQuery;
×
327
    d_currentStreams.erase(streamId);
×
328
    throw std::runtime_error("Error submitting HTTP request:" + std::string(nghttp2_strerror(newStreamId)));
×
329
  }
×
330

331
  auto rv = nghttp2_session_send(d_session.get());
143✔
332
  if (rv != 0) {
143!
333
    d_connectionDied = true;
×
334
    ++d_ds->tcpDiedSendingQuery;
×
335
    d_currentStreams.erase(streamId);
×
336
    throw std::runtime_error("Error in nghttp2_session_send:" + std::to_string(rv));
×
337
  }
×
338

339
  d_highestStreamID = newStreamId;
143✔
340
}
143✔
341

342
class DoHClientThreadData
343
{
344
public:
345
  DoHClientThreadData(pdns::channel::Receiver<CrossProtocolQuery>&& receiver) :
346
    mplexer(std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent())),
347
    d_receiver(std::move(receiver))
348
  {
332✔
349
  }
332✔
350

351
  std::unique_ptr<FDMultiplexer> mplexer{nullptr};
352
  pdns::channel::Receiver<CrossProtocolQuery> d_receiver;
353
};
354

355
void DoHConnectionToBackend::handleReadableIOCallback(int fd, FDMultiplexer::funcparam_t& param)
356
{
221✔
357
  auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(param);
221✔
358
  if (fd != conn->getHandle()) {
221!
359
    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->getHandle()));
×
360
  }
×
361

362
  IOStateGuard ioGuard(conn->d_ioState);
221✔
363
  do {
280✔
364
    conn->d_inPos = 0;
280✔
365
    conn->d_in.resize(conn->d_in.size() + 512);
280✔
366
    // cerr<<"trying to read "<<conn->d_in.size()<<endl;
367
    try {
280✔
368
      IOState newState = conn->d_handler->tryRead(conn->d_in, conn->d_inPos, conn->d_in.size(), true);
280✔
369
      // cerr<<"got a "<<(int)newState<<" state and "<<conn->d_inPos<<" bytes"<<endl;
370
      conn->d_in.resize(conn->d_inPos);
280✔
371

372
      if (conn->d_inPos > 0) {
280✔
373
        /* we got something */
374
        auto readlen = nghttp2_session_mem_recv(conn->d_session.get(), conn->d_in.data(), conn->d_inPos);
184✔
375
        // cerr<<"nghttp2_session_mem_recv returned "<<readlen<<endl;
376
        /* as long as we don't require a pause by returning nghttp2_error.NGHTTP2_ERR_PAUSE from a CB,
377
           all data should be consumed before returning */
378
        if (readlen > 0 && static_cast<size_t>(readlen) < conn->d_inPos) {
184!
379
          throw std::runtime_error("Fatal error while passing received data to nghttp2: " + std::string(nghttp2_strerror((int)readlen)));
×
380
        }
×
381

382
        struct timeval now
184✔
383
        {
184✔
384
          .tv_sec = 0, .tv_usec = 0
184✔
385
        };
184✔
386

387
        gettimeofday(&now, nullptr);
184✔
388
        conn->d_lastDataReceivedTime = now;
184✔
389

390
        // cerr<<"after read send"<<endl;
391
        nghttp2_session_send(conn->d_session.get());
184✔
392
      }
184✔
393

394
      if (newState == IOState::Done) {
280✔
395
        if (conn->isIdle()) {
183✔
396
          conn->stopIO();
124✔
397
          conn->watchForRemoteHostClosingConnection();
124✔
398
          ioGuard.release();
124✔
399
          break;
124✔
400
        }
124✔
401
      }
183✔
402
      else {
97✔
403
        if (newState == IOState::NeedWrite) {
97!
404
          // cerr<<"need write"<<endl;
405
          conn->updateIO(IOState::NeedWrite, handleReadableIOCallback);
×
406
        }
×
407
        ioGuard.release();
97✔
408
        break;
97✔
409
      }
97✔
410
    }
280✔
411
    catch (const std::exception& e) {
280✔
412
      vinfolog("Exception while trying to read from HTTP backend connection: %s", e.what());
5✔
413
      ++conn->d_ds->tcpDiedReadingResponse;
5✔
414
      conn->handleIOError();
5✔
415
      break;
5✔
416
    }
5✔
417
  } while (conn->getConcurrentStreamsCount() > 0);
280!
418
}
221✔
419

420
void DoHConnectionToBackend::handleWritableIOCallback(int fd, FDMultiplexer::funcparam_t& param)
421
{
33✔
422
  auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(param);
33✔
423
  if (fd != conn->getHandle()) {
33!
424
    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->getHandle()));
×
425
  }
×
426
  IOStateGuard ioGuard(conn->d_ioState);
33✔
427

428
  // cerr<<"in "<<__PRETTY_FUNCTION__<<" trying to write "<<conn->d_out.size()-conn->d_outPos<<endl;
429
  try {
33✔
430
    IOState newState = conn->d_handler->tryWrite(conn->d_out, conn->d_outPos, conn->d_out.size());
33✔
431
    // cerr<<"got a "<<(int)newState<<" state, "<<conn->d_out.size()-conn->d_outPos<<" bytes remaining"<<endl;
432
    if (newState == IOState::NeedRead) {
33!
433
      conn->updateIO(IOState::NeedRead, handleWritableIOCallback);
×
434
    }
×
435
    else if (newState == IOState::Done) {
33✔
436
      // cerr<<"done, buffer size was "<<conn->d_out.size()<<", pos was "<<conn->d_outPos<<endl;
437
      conn->d_firstWrite = false;
29✔
438
      conn->d_out.clear();
29✔
439
      conn->d_outPos = 0;
29✔
440
      conn->stopIO();
29✔
441
      if (!conn->isIdle()) {
29!
442
        conn->updateIO(IOState::NeedRead, handleReadableIOCallback);
29✔
443
      }
29✔
444
      else {
×
445
        conn->watchForRemoteHostClosingConnection();
×
446
      }
×
447
    }
29✔
448
    ioGuard.release();
33✔
449
  }
33✔
450
  catch (const std::exception& e) {
33✔
451
    vinfolog("Exception while trying to write (ready) to HTTP backend connection: %s", e.what());
4!
452
    ++conn->d_ds->tcpDiedSendingQuery;
4✔
453
    conn->handleIOError();
4✔
454
  }
4✔
455
}
33✔
456

457
void DoHConnectionToBackend::stopIO()
458
{
580✔
459
  d_ioState->reset();
580✔
460

461
  if (isIdle()) {
580✔
462
    auto shared = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this());
271✔
463
    if (!willBeReusable(false)) {
271✔
464
      /* remove ourselves from the connection cache, this might mean that our
465
         reference count drops to zero after that, so we need to be careful */
466
      t_downstreamDoHConnectionsManager.removeDownstreamConnection(shared);
31✔
467
    }
31✔
468
    else {
240✔
469
      t_downstreamDoHConnectionsManager.moveToIdle(shared);
240✔
470
    }
240✔
471
  }
271✔
472
}
580✔
473

474
/* Registers interest in the socket becoming readable or writable.

   Parameters:
   - newState: only NeedRead and NeedWrite have an effect
   - callback: what to call once the socket is ready
   - noTTD: if set, no deadline is attached to the wait

   The deadline is otherwise the health-check one for health-check queries,
   the read one when waiting to read, the connect one for the first write on
   a fresh connection, and the write one in all other cases. */
void DoHConnectionToBackend::updateIO(IOState newState, const FDMultiplexer::callbackfunc_t& callback, bool noTTD)
{
  struct timeval now{};
  gettimeofday(&now, nullptr);

  boost::optional<struct timeval> deadline{boost::none};
  if (!noTTD) {
    if (d_healthCheckQuery) {
      deadline = getBackendHealthCheckTTD(now);
    }
    else if (newState == IOState::NeedRead) {
      deadline = getBackendReadTTD(now);
    }
    else if (isFresh() && d_firstWrite) {
      /* the non-blocking connect has just been issued, this is the first write */
      deadline = getBackendConnectTTD(now);
    }
    else {
      deadline = getBackendWriteTTD(now);
    }
  }

  auto self = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this());
  const bool waitingForSocket = newState == IOState::NeedRead || newState == IOState::NeedWrite;
  if (self && waitingForSocket) {
    /* the IO state keeps a reference to us, passed back to the callback */
    d_ioState->update(newState, callback, std::move(self), deadline);
  }
}
551✔
509

510
void DoHConnectionToBackend::watchForRemoteHostClosingConnection()
511
{
254✔
512
  if (willBeReusable(false) && !d_healthCheckQuery) {
254✔
513
    updateIO(IOState::NeedRead, handleReadableIOCallback, false);
208✔
514
  }
208✔
515
}
254✔
516

517
/* nghttp2 callback, invoked when the library has data to send to the backend.

   The data is appended to the output buffer of the connection, after the
   proxy protocol payload if there is one and it has not been sent yet. If the
   buffer was empty we try to write to the socket right away, otherwise a write
   is already pending and will pick up the new data.

   Parameters:
   - data, length: what to send
   - user_data: the DoHConnectionToBackend object
   - session, flags: unused

   Always returns length, since everything has been stored in the buffer.
   A std::exception raised while writing is logged, accounted in
   tcpDiedSendingQuery, and the connection is torn down. */
ssize_t DoHConnectionToBackend::send_callback(nghttp2_session* session, const uint8_t* data, size_t length, int flags, void* user_data)
{
  (void)session;
  (void)flags;
  auto* conn = static_cast<DoHConnectionToBackend*>(user_data);
  bool bufferWasEmpty = conn->d_out.empty();
  if (!conn->d_proxyProtocolPayloadSent && !conn->d_proxyProtocolPayload.empty()) {
    /* the proxy protocol payload goes first, and only once */
    conn->d_out.insert(conn->d_out.end(), conn->d_proxyProtocolPayload.begin(), conn->d_proxyProtocolPayload.end());
    conn->d_proxyProtocolPayloadSent = true;
  }

  conn->d_out.insert(conn->d_out.end(), data, data + length);

  if (bufferWasEmpty) {
    try {
      auto state = conn->d_handler->tryWrite(conn->d_out, conn->d_outPos, conn->d_out.size());
      if (state == IOState::Done) {
        /* everything has been written, start over with an empty buffer */
        conn->d_firstWrite = false;
        conn->d_out.clear();
        conn->d_outPos = 0;
        conn->stopIO();
        if (!conn->isIdle()) {
          /* queries are in flight, wait for the responses */
          conn->updateIO(IOState::NeedRead, handleReadableIOCallback);
        }
        else {
          conn->watchForRemoteHostClosingConnection();
        }
      }
      else {
        /* the rest will be written once the socket is ready */
        conn->updateIO(state, handleWritableIOCallback);
      }
    }
    catch (const std::exception& e) {
      vinfolog("Exception while trying to write (send) to HTTP backend connection: %s", e.what());
      /* update the counter before tearing the connection down, as the other IO
         handlers do: handleIOError() ends up in stopIO(), which might release
         references to the connection, and all we hold here is a raw pointer */
      ++conn->d_ds->tcpDiedSendingQuery;
      conn->handleIOError();
    }
  }

  return length;
}
426✔
560

561
/* nghttp2 callback, invoked for every frame received from the backend.

   - GOAWAY: the connection is marked as dead.
   - HEADERS or DATA with END_STREAM set: the stream is complete, the
     corresponding query is removed from the ones in flight and its sender gets
     the response if the status code is 200, an error otherwise.
   - anything else is ignored.

   Returns 0, or NGHTTP2_ERR_CALLBACK_FAILURE if the final frame belongs to a
   stream we do not know about. */
int DoHConnectionToBackend::on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data)
{
  (void)session;
  auto* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);

  const auto frameType = frame->hd.type;
  if (frameType == NGHTTP2_GOAWAY) {
    conn->d_connectionDied = true;
    return 0;
  }

  /* is this the last frame for this stream? */
  const bool headersOrData = frameType == NGHTTP2_HEADERS || frameType == NGHTTP2_DATA;
  if (!(headersOrData && frame->hd.flags & NGHTTP2_FLAG_END_STREAM)) {
    return 0;
  }

  auto stream = conn->d_currentStreams.find(frame->hd.stream_id);
  if (stream == conn->d_currentStreams.end()) {
    vinfolog("Stream %d NOT FOUND", frame->hd.stream_id);
    conn->d_connectionDied = true;
    ++conn->d_ds->tcpDiedReadingResponse;
    return NGHTTP2_ERR_CALLBACK_FAILURE;
  }

  stream->second.d_finished = true;
  ++conn->d_queries;

  /* take the request out of the map before handing it over */
  auto request = std::move(stream->second);
  conn->d_currentStreams.erase(stream);

  if (request.d_responseCode != 200U) {
    vinfolog("HTTP response has a non-200 status code: %d", request.d_responseCode);
    struct timeval now{};
    gettimeofday(&now, nullptr);

    conn->handleResponseError(std::move(request), now);
  }
  else {
    conn->handleResponse(std::move(request));
  }

  if (conn->isIdle()) {
    conn->stopIO();
    conn->watchForRemoteHostClosingConnection();
  }

  return 0;
}
334✔
634

635
/* nghttp2 callback, invoked for every chunk of response body received.

   The chunk is appended to the response buffer of the corresponding stream,
   as long as the total does not exceed what fits in a DNS message over TCP
   (65535 bytes). If the stream is already flagged as finished, the response
   is also delivered from here.

   Returns 0, or NGHTTP2_ERR_CALLBACK_FAILURE (after marking the connection
   as dead) if the stream is unknown or the response is too large. */
int DoHConnectionToBackend::on_data_chunk_recv_callback(nghttp2_session* session, uint8_t flags, int32_t stream_id, const uint8_t* data, size_t len, void* user_data)
{
  (void)session;
  (void)flags;
  auto* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);

  auto stream = conn->d_currentStreams.find(stream_id);
  if (stream == conn->d_currentStreams.end()) {
    vinfolog("Unable to match the stream ID %d to a known one!", stream_id);
    conn->d_connectionDied = true;
    ++conn->d_ds->tcpDiedReadingResponse;
    return NGHTTP2_ERR_CALLBACK_FAILURE;
  }

  auto& pending = stream->second;
  constexpr size_t maxResponseSize = std::numeric_limits<uint16_t>::max();
  if (len > maxResponseSize || (maxResponseSize - pending.d_buffer.size()) < len) {
    vinfolog("Data frame of size %d is too large for a DNS response (we already have %d)", len, pending.d_buffer.size());
    conn->d_connectionDied = true;
    ++conn->d_ds->tcpDiedReadingResponse;
    return NGHTTP2_ERR_CALLBACK_FAILURE;
  }

  pending.d_buffer.insert(pending.d_buffer.end(), data, data + len);
  if (!pending.d_finished) {
    /* more to come, or the end of the stream has not been signaled yet */
    return 0;
  }

  /* take the request out of the map before handing it over */
  auto request = std::move(pending);
  conn->d_currentStreams.erase(stream);

  if (request.d_responseCode != 200U) {
    vinfolog("HTTP response has a non-200 status code: %d", request.d_responseCode);
    struct timeval now{};
    gettimeofday(&now, nullptr);

    conn->handleResponseError(std::move(request), now);
  }
  else {
    conn->handleResponse(std::move(request));
  }

  if (conn->isIdle()) {
    conn->stopIO();
    conn->watchForRemoteHostClosingConnection();
  }

  return 0;
}
121✔
684

685
/* nghttp2 callback, invoked when a stream is closed.

   A stream closed without error needs no handling here. Otherwise the
   connection is marked as dead and, if the query is still in flight, it is
   either sent again over a connection obtained from this thread's manager
   (while the backend's retries limit has not been reached) or its sender is
   notified of the error.

   Always returns 0. */
int DoHConnectionToBackend::on_stream_close_callback(nghttp2_session* session, int32_t stream_id, uint32_t error_code, void* user_data)
{
  (void)session;
  auto* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);

  if (error_code == 0) {
    return 0;
  }

  conn->d_connectionDied = true;
  ++conn->d_ds->tcpDiedReadingResponse;

  auto stream = conn->d_currentStreams.find(stream_id);
  if (stream == conn->d_currentStreams.end()) {
    /* not a stream we are tracking, nothing more to do */
    return 0;
  }

  struct timeval now{};
  gettimeofday(&now, nullptr);

  /* take the request out of the map before doing anything with it */
  auto request = std::move(stream->second);
  conn->d_currentStreams.erase(stream);

  const bool retriesLeft = request.d_query.d_downstreamFailures < conn->d_ds->d_config.d_retries;
  if (!retriesLeft) {
    conn->handleResponseError(std::move(request), now);
  }
  else {
    ++request.d_query.d_downstreamFailures;
    auto downstream = t_downstreamDoHConnectionsManager.getConnectionToDownstream(conn->d_mplexer, conn->d_ds, now, std::string(conn->d_proxyProtocolPayload));
    downstream->queueQuery(request.d_sender, std::move(request.d_query));
  }

  if (conn->isIdle()) {
    conn->stopIO();
    conn->watchForRemoteHostClosingConnection();
  }

  return 0;
}
4✔
733

734
/* nghttp2 callback invoked for each received header field.
   Only the ":status" pseudo-header of a response HEADERS frame is of interest:
   its value is parsed as an integer and stored as the response code of the
   matching pending stream. Every other header is ignored.
   Returns 0 on success or when the header is ignored, and
   NGHTTP2_ERR_CALLBACK_FAILURE (after flagging the connection as dead) when the
   stream is unknown or the status value cannot be parsed. */
int DoHConnectionToBackend::on_header_callback(nghttp2_session* session, const nghttp2_frame* frame, const uint8_t* name, size_t namelen, const uint8_t* value, size_t valuelen, uint8_t flags, void* user_data)
{
  (void)session;
  (void)flags;

  if (frame->hd.type != NGHTTP2_HEADERS || frame->headers.cat != NGHTTP2_HCAT_RESPONSE) {
    return 0;
  }

  const std::string statusHeaderName(":status");
  if (namelen != statusHeaderName.size() || memcmp(statusHeaderName.data(), name, statusHeaderName.size()) != 0) {
    return 0;
  }

  auto* conn = static_cast<DoHConnectionToBackend*>(user_data);
  auto streamIt = conn->d_currentStreams.find(frame->hd.stream_id);
  if (streamIt == conn->d_currentStreams.end()) {
    vinfolog("Unable to match the stream ID %d to a known one!", frame->hd.stream_id);
    conn->d_connectionDied = true;
    return NGHTTP2_ERR_CALLBACK_FAILURE;
  }

  try {
    /* the header value is not NUL-terminated, hence the explicit length */
    const std::string statusValue(reinterpret_cast<const char*>(value), valuelen);
    pdns::checked_stoi_into(streamIt->second.d_responseCode, statusValue);
  }
  catch (...) {
    vinfolog("Error parsing the status header for stream ID %d", frame->hd.stream_id);
    conn->d_connectionDied = true;
    ++conn->d_ds->tcpDiedReadingResponse;
    return NGHTTP2_ERR_CALLBACK_FAILURE;
  }

  return 0;
}
337✔
765

766
/* nghttp2 error callback: logs the library's error message and code at verbose
   level, flags the connection as dead and increments the backend's
   tcpDiedReadingResponse counter. Always returns 0. */
int DoHConnectionToBackend::on_error_callback(nghttp2_session* session, int lib_error_code, const char* msg, size_t len, void* user_data)
{
  (void)session;

  /* msg is passed along with its length, so build the string from both */
  const std::string errorMessage(msg, len);
  vinfolog("Error in HTTP/2 connection: %s (%d)", errorMessage, lib_error_code);

  auto* conn = static_cast<DoHConnectionToBackend*>(user_data);
  conn->d_connectionDied = true;
  ++conn->d_ds->tcpDiedReadingResponse;

  return 0;
}
×
777

778
/* Sets up an outgoing DoH connection to the backend `ds`:
   - creates the IO state handler for the connection's descriptor,
   - creates a nghttp2 client session wired to this class' static callbacks,
     with `this` as the callbacks' user data,
   - submits the initial SETTINGS frame.
   The constructor does not throw on a nghttp2 failure: it flags the connection
   as dead (d_connectionDied), increments the backend's tcpDiedSendingQuery
   counter, logs at verbose level and returns early. */
DoHConnectionToBackend::DoHConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload) :
  ConnectionToBackend(ds, mplexer, now), d_proxyProtocolPayload(std::move(proxyProtocolPayload))
{
  // inherit most of the stuff from the ConnectionToBackend()
  d_ioState = make_unique<IOStateHandler>(*d_mplexer, d_handler->getDescriptor());

  nghttp2_session_callbacks* cbs = nullptr;
  if (nghttp2_session_callbacks_new(&cbs) != 0) {
    d_connectionDied = true;
    ++d_ds->tcpDiedSendingQuery;
    vinfolog("Unable to create a callback object for a new HTTP/2 session");
    return;
  }
  /* hand the raw pointer over to a RAII wrapper so that every exit path releases it */
  std::unique_ptr<nghttp2_session_callbacks, void (*)(nghttp2_session_callbacks*)> callbacks(cbs, nghttp2_session_callbacks_del);
  cbs = nullptr;

  nghttp2_session_callbacks_set_send_callback(callbacks.get(), send_callback);
  nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks.get(), on_frame_recv_callback);
  nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks.get(), on_data_chunk_recv_callback);
  nghttp2_session_callbacks_set_on_stream_close_callback(callbacks.get(), on_stream_close_callback);
  nghttp2_session_callbacks_set_on_header_callback(callbacks.get(), on_header_callback);
  nghttp2_session_callbacks_set_error_callback2(callbacks.get(), on_error_callback);

  nghttp2_session* sess = nullptr;
  if (nghttp2_session_client_new(&sess, callbacks.get(), this) != 0) {
    d_connectionDied = true;
    ++d_ds->tcpDiedSendingQuery;
    vinfolog("Could not allocate a new HTTP/2 session");
    return;
  }

  d_session = std::unique_ptr<nghttp2_session, void (*)(nghttp2_session*)>(sess, nghttp2_session_del);
  sess = nullptr;

  /* the callbacks object is no longer used once the session has been created */
  callbacks.reset();

  nghttp2_settings_entry iv[] = {
    /* rfc7540 section-8.2.2:
       "Advertising a SETTINGS_MAX_CONCURRENT_STREAMS value of zero disables
       server push by preventing the server from creating the necessary
       streams."
    */
    {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0},
    {NGHTTP2_SETTINGS_ENABLE_PUSH, 0},
    /* we might want to make the initial window size configurable, but 16M is a large enough default */
    {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 16 * 1024 * 1024}};
  /* client 24 bytes magic string will be sent by nghttp2 library */
  int rv = nghttp2_submit_settings(d_session.get(), NGHTTP2_FLAG_NONE, iv, sizeof(iv) / sizeof(*iv));
  if (rv != 0) {
    d_connectionDied = true;
    ++d_ds->tcpDiedSendingQuery;
    vinfolog("Could not submit SETTINGS: %s", nghttp2_strerror(rv));
  }
}
50✔
833

834
static void handleCrossProtocolQuery(int pipefd, FDMultiplexer::funcparam_t& param)
835
{
101✔
836
  (void)pipefd;
101✔
837
  auto threadData = boost::any_cast<DoHClientThreadData*>(param);
101✔
838

839
  std::unique_ptr<CrossProtocolQuery> cpq{nullptr};
101✔
840
  try {
101✔
841
    auto tmp = threadData->d_receiver.receive();
101✔
842
    if (!tmp) {
101!
843
      return;
×
844
    }
×
845
    cpq = std::move(*tmp);
101✔
846
  }
101✔
847
  catch (const std::exception& e) {
101✔
848
    throw std::runtime_error("Error while reading from the DoH cross-protocol channel:" + std::string(e.what()));
×
849
  }
×
850

851
  struct timeval now
101✔
852
  {
101✔
853
    .tv_sec = 0, .tv_usec = 0
101✔
854
  };
101✔
855
  gettimeofday(&now, nullptr);
101✔
856

857
  std::shared_ptr<TCPQuerySender> tqs = cpq->getTCPQuerySender();
101✔
858
  auto query = std::move(cpq->query);
101✔
859
  auto downstreamServer = std::move(cpq->downstream);
101✔
860
  cpq.reset();
101✔
861

862
  try {
101✔
863
    auto downstream = t_downstreamDoHConnectionsManager.getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::move(query.d_proxyProtocolPayload));
101✔
864
    downstream->queueQuery(tqs, std::move(query));
101✔
865
  }
101✔
866
  catch (...) {
101✔
867
    TCPResponse response(std::move(query));
×
868
    tqs->notifyIOError(now, std::move(response));
×
869
  }
×
870
}
101✔
871

872
static void dohClientThread(pdns::channel::Receiver<CrossProtocolQuery>&& receiver)
873
{
332✔
874
  setThreadName("dnsdist/dohClie");
332✔
875

876
  try {
332✔
877
    DoHClientThreadData data(std::move(receiver));
332✔
878
    data.mplexer->addReadFD(data.d_receiver.getDescriptor(), handleCrossProtocolQuery, &data);
332✔
879

880
    struct timeval now
332✔
881
    {
332✔
882
      .tv_sec = 0, .tv_usec = 0
332✔
883
    };
332✔
884

885
    gettimeofday(&now, nullptr);
332✔
886
    time_t lastTimeoutScan = now.tv_sec;
332✔
887

888
    for (;;) {
1,278✔
889
      data.mplexer->run(&now, 1000);
1,278✔
890

891
      if (now.tv_sec > lastTimeoutScan) {
1,278✔
892
        lastTimeoutScan = now.tv_sec;
690✔
893

894
        try {
690✔
895
          t_downstreamDoHConnectionsManager.cleanupClosedConnections(now);
690✔
896
          handleH2Timeouts(*data.mplexer, now);
690✔
897

898
          if (g_dohStatesDumpRequested > 0) {
690!
899
            /* just to keep things clean in the output, debug only */
900
            static std::mutex s_lock;
×
901
            std::lock_guard<decltype(s_lock)> lck(s_lock);
×
902
            if (g_dohStatesDumpRequested > 0) {
×
903
              /* no race here, we took the lock so it can only be increased in the meantime */
904
              --g_dohStatesDumpRequested;
×
905
              infolog("Dumping the DoH client states, as requested:");
×
906
              data.mplexer->runForAllWatchedFDs([](bool isRead, int fd, const FDMultiplexer::funcparam_t& param, struct timeval ttd) {
×
907
                struct timeval lnow;
×
908
                gettimeofday(&lnow, nullptr);
×
909
                if (ttd.tv_sec > 0) {
×
910
                  infolog("- Descriptor %d is in %s state, TTD in %d", fd, (isRead ? "read" : "write"), (ttd.tv_sec - lnow.tv_sec));
×
911
                }
×
912
                else {
×
913
                  infolog("- Descriptor %d is in %s state, no TTD set", fd, (isRead ? "read" : "write"));
×
914
                }
×
915

916
                if (param.type() == typeid(std::shared_ptr<DoHConnectionToBackend>)) {
×
917
                  auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(param);
×
918
                  infolog(" - %s", conn->toString());
×
919
                }
×
920
                else if (param.type() == typeid(DoHClientThreadData*)) {
×
921
                  infolog(" - Worker thread pipe");
×
922
                }
×
923
              });
×
924
              infolog("The DoH client cache has %d active and %d idle outgoing connections cached", t_downstreamDoHConnectionsManager.getActiveCount(), t_downstreamDoHConnectionsManager.getIdleCount());
×
925
            }
×
926
          }
×
927
        }
690✔
928
        catch (const std::exception& e) {
690✔
929
          warnlog("Error in outgoing DoH thread: %s", e.what());
×
930
        }
×
931
      }
690✔
932
    }
1,278✔
933
  }
332✔
934
  catch (const std::exception& e) {
332✔
935
    errlog("Fatal error in outgoing DoH thread: %s", e.what());
×
936
  }
×
937
}
332✔
938
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
939

940
struct DoHClientCollection::DoHWorkerThread
941
{
942
  DoHWorkerThread()
943
  {
332✔
944
  }
332✔
945

946
  DoHWorkerThread(pdns::channel::Sender<CrossProtocolQuery>&& sender) :
947
    d_sender(std::move(sender))
948
  {
332✔
949
  }
332✔
950

951
  DoHWorkerThread(DoHWorkerThread&& rhs) :
952
    d_sender(std::move(rhs.d_sender))
953
  {
×
954
  }
×
955

956
  DoHWorkerThread& operator=(DoHWorkerThread&& rhs)
957
  {
332✔
958
    d_sender = std::move(rhs.d_sender);
332✔
959
    return *this;
332✔
960
  }
332✔
961

962
  DoHWorkerThread(const DoHWorkerThread& rhs) = delete;
963
  DoHWorkerThread& operator=(const DoHWorkerThread&) = delete;
964

965
  pdns::channel::Sender<CrossProtocolQuery> d_sender;
966
};
967

968
/* Creates the collection with room for `numberOfThreads` workers: the workers
   storage is sized once here, and addThread() refuses to go past that size. */
DoHClientCollection::DoHClientCollection(size_t numberOfThreads) :
  d_clientThreads(numberOfThreads)
{
}
332✔
972

973
bool DoHClientCollection::passCrossProtocolQueryToThread(std::unique_ptr<CrossProtocolQuery>&& cpq)
974
{
101✔
975
  if (d_numberOfThreads == 0) {
101!
976
    throw std::runtime_error("No DoH worker thread yet");
×
977
  }
×
978

979
  uint64_t pos = d_pos++;
101✔
980
  if (!d_clientThreads.at(pos % d_numberOfThreads).d_sender.send(std::move(cpq))) {
101!
981
    ++dnsdist::metrics::g_stats.outgoingDoHQueryPipeFull;
×
982
    return false;
×
983
  }
×
984

985
  return true;
101✔
986
}
101✔
987

988
/* Starts one additional outgoing DoH worker thread and registers it in the
   collection.
   A non-blocking object queue is created for the worker, whose receiving end
   is moved into the new (detached) thread while the sending end is stored in
   the next free slot of d_clientThreads. Nothing is added if the collection is
   already full or if the thread cannot be created; errors are logged rather
   than propagated.
   When nghttp2 support is not compiled in, throws std::runtime_error. */
void DoHClientCollection::addThread()
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  try {
    const auto pipeBufferSize = dnsdist::configuration::getImmutableConfiguration().d_tcpInternalPipeBufferSize;
    auto [sender, receiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, pipeBufferSize);

    vinfolog("Adding DoH Client thread");
    /* held until the new worker has been registered, or we bail out */
    std::lock_guard<std::mutex> lock(d_mutex);

    const auto capacity = d_clientThreads.size();
    if (d_numberOfThreads >= capacity) {
      vinfolog("Adding a new DoH client thread would exceed the vector size (%d/%d), skipping. Consider increasing the maximum amount of DoH client threads with setMaxDoHClientThreads() in the configuration.", d_numberOfThreads, capacity);
      return;
    }

    DoHWorkerThread worker(std::move(sender));
    try {
      std::thread workerThread(dohClientThread, std::move(receiver));
      workerThread.detach();
    }
    catch (const std::runtime_error& e) {
      /* the thread creation failed */
      errlog("Error creating a DoH thread: %s", e.what());
      return;
    }

    d_clientThreads.at(d_numberOfThreads) = std::move(worker);
    ++d_numberOfThreads;
  }
  catch (const std::exception& e) {
    errlog("Error creating the DoH channel: %s", e.what());
  }
#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
  throw std::runtime_error("DoHClientCollection::addThread() called but nghttp2 support is not available");
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
332✔
1025

1026
/* Creates the global collection of outgoing DoH worker threads and starts the
   configured number of workers.
   Returns true when outgoing DoH support is compiled in, false otherwise. */
bool initDoHWorkers()
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  auto configuredWorkers = dnsdist::configuration::getImmutableConfiguration().d_outgoingDoHWorkers;
  if (!configuredWorkers) {
    /* Unless the value has been set to 0 explicitly, always start at least one outgoing DoH worker thread, in case a DoH backend
       is added at a later time. */
    configuredWorkers = 1;
  }

  /* a value is always present at this point */
  const auto workersCount = *configuredWorkers;
  if (workersCount > 0) {
    g_dohClientThreads = std::make_unique<DoHClientCollection>(workersCount);
    for (size_t idx = 0; idx < workersCount; idx++) {
      g_dohClientThreads->addThread();
    }
  }
  return true;
#else
  return false;
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
332✔
1047

1048
/* Queues `query` for the backend `ds` over an outgoing DoH connection.
   Health-check queries always get a brand new connection; regular queries use
   a connection obtained from the connections manager.
   Returns true once the query has been queued, or false when outgoing DoH
   support is not compiled in. */
bool sendH2Query([[maybe_unused]] const std::shared_ptr<DownstreamState>& ds, [[maybe_unused]] std::unique_ptr<FDMultiplexer>& mplexer, [[maybe_unused]] std::shared_ptr<TCPQuerySender>& sender, [[maybe_unused]] InternalQuery&& query, [[maybe_unused]] bool healthCheck)
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  struct timeval now{};
  gettimeofday(&now, nullptr);

  if (!healthCheck) {
    auto connection = t_downstreamDoHConnectionsManager.getConnectionToDownstream(mplexer, ds, now, std::move(query.d_proxyProtocolPayload));
    connection->queueQuery(sender, std::move(query));
    return true;
  }

  /* always do health-checks over a new connection */
  auto newConnection = std::make_shared<DoHConnectionToBackend>(ds, mplexer, now, std::move(query.d_proxyProtocolPayload));
  newConnection->setHealthCheck(healthCheck);
  newConnection->queueQuery(sender, std::move(query));
  return true;
#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
  return false;
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
40✔
1073

1074
/* Clears the calling thread's outgoing DoH connections manager and returns the
   value reported by its clear() method; returns 0 when outgoing DoH support is
   not compiled in. */
size_t clearH2Connections()
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  return t_downstreamDoHConnectionsManager.clear();
#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
  return 0;
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
24✔
1082

1083
size_t handleH2Timeouts([[maybe_unused]] FDMultiplexer& mplexer, [[maybe_unused]] const struct timeval& now)
1084
{
708✔
1085
  size_t got = 0;
708✔
1086
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
708✔
1087
  auto expiredReadConns = mplexer.getTimeouts(now, false);
708✔
1088
  for (const auto& cbData : expiredReadConns) {
708✔
1089
    if (cbData.second.type() == typeid(std::shared_ptr<DoHConnectionToBackend>)) {
3✔
1090
      auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(cbData.second);
2✔
1091
      vinfolog("Timeout (read) from remote DoH backend %s", conn->getBackendName());
2!
1092
      conn->handleTimeout(now, false);
2✔
1093
      ++got;
2✔
1094
    }
2✔
1095
  }
3✔
1096

1097
  auto expiredWriteConns = mplexer.getTimeouts(now, true);
708✔
1098
  for (const auto& cbData : expiredWriteConns) {
708✔
1099
    if (cbData.second.type() == typeid(std::shared_ptr<DoHConnectionToBackend>)) {
1!
1100
      auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(cbData.second);
1✔
1101
      vinfolog("Timeout (write) from remote DoH backend %s", conn->getBackendName());
1!
1102
      conn->handleTimeout(now, true);
1✔
1103
      ++got;
1✔
1104
    }
1✔
1105
  }
1✔
1106
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
708✔
1107
  return got;
708✔
1108
}
708✔
1109

1110
/* Forwards `max` to DownstreamDoHConnectionsManager::setCleanupInterval().
   Does nothing when outgoing DoH support is not compiled in. */
void setDoHDownstreamCleanupInterval([[maybe_unused]] uint16_t max)
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  DownstreamDoHConnectionsManager::setCleanupInterval(max);
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
332✔
1116

1117
/* Forwards `max` to DownstreamDoHConnectionsManager::setMaxIdleTime().
   Does nothing when outgoing DoH support is not compiled in. */
void setDoHDownstreamMaxIdleTime([[maybe_unused]] uint16_t max)
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  DownstreamDoHConnectionsManager::setMaxIdleTime(max);
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
332✔
1123

1124
/* Forwards `max` to
   DownstreamDoHConnectionsManager::setMaxIdleConnectionsPerDownstream().
   Does nothing when outgoing DoH support is not compiled in. */
void setDoHDownstreamMaxIdleConnectionsPerBackend([[maybe_unused]] size_t max)
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  DownstreamDoHConnectionsManager::setMaxIdleConnectionsPerDownstream(max);
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
332✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc