• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PowerDNS / pdns / 12595591960

03 Jan 2025 09:27AM UTC coverage: 62.774% (+2.5%) from 60.245%
12595591960

Pull #15008

github

web-flow
Merge c2a2749d3 into 788f396a7
Pull Request #15008: Do not follow CNAME records for ANY or CNAME queries

30393 of 78644 branches covered (38.65%)

Branch coverage included in aggregate %.

105822 of 138350 relevant lines covered (76.49%)

4613078.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.26
/pdns/dnsdistdist/dnsdist-nghttp2.cc
1
/*
2
 * This file is part of PowerDNS or dnsdist.
3
 * Copyright -- PowerDNS.COM B.V. and its contributors
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of version 2 of the GNU General Public License as
7
 * published by the Free Software Foundation.
8
 *
9
 * In addition, for the avoidance of any doubt, permission is granted to
10
 * link this program with OpenSSL and to (re)distribute the binaries
11
 * produced as the result of such linking.
12
 *
13
 * This program is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
 * GNU General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU General Public License
19
 * along with this program; if not, write to the Free Software
20
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
 */
22

23
#include "config.h"
24

25
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
26
#include <nghttp2/nghttp2.h>
27
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
28

29
#include "dnsdist-nghttp2.hh"
30
#include "dnsdist-nghttp2-in.hh"
31
#include "dnsdist-tcp.hh"
32
#include "dnsdist-tcp-downstream.hh"
33
#include "dnsdist-downstream-connection.hh"
34

35
#include "dolog.hh"
36
#include "channel.hh"
37
#include "iputils.hh"
38
#include "libssl.hh"
39
#include "noinitvector.hh"
40
#include "tcpiohandler.hh"
41
#include "threadname.hh"
42
#include "sstuff.hh"
43

44
std::atomic<uint64_t> g_dohStatesDumpRequested{0};
45
std::unique_ptr<DoHClientCollection> g_dohClientThreads{nullptr};
46

47
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
48
class DoHConnectionToBackend : public ConnectionToBackend
49
{
50
public:
51
  DoHConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload);
52

53
  void handleTimeout(const struct timeval& now, bool write) override;
54
  void queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query) override;
55

56
  std::string toString() const override
57
  {
×
58
    ostringstream o;
×
59
    o << "DoH connection to backend " << (d_ds ? d_ds->getName() : "empty") << " over FD " << (d_handler ? std::to_string(d_handler->getDescriptor()) : "no socket") << ", " << getConcurrentStreamsCount() << " streams";
×
60
    return o.str();
×
61
  }
×
62

63
  void setHealthCheck(bool h)
64
  {
12✔
65
    d_healthCheckQuery = h;
12✔
66
  }
12✔
67

68
  void stopIO() override;
69
  bool reachedMaxConcurrentQueries() const override;
70
  bool reachedMaxStreamID() const override;
71
  bool isIdle() const override;
72
  void release(bool removeFromCache) override
73
  {
×
74
    (void)removeFromCache;
×
75
  }
×
76

77
private:
78
  static ssize_t send_callback(nghttp2_session* session, const uint8_t* data, size_t length, int flags, void* user_data);
79
  static int on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data);
80
  static int on_data_chunk_recv_callback(nghttp2_session* session, uint8_t flags, int32_t stream_id, const uint8_t* data, size_t len, void* user_data);
81
  static int on_stream_close_callback(nghttp2_session* session, int32_t stream_id, uint32_t error_code, void* user_data);
82
  static int on_header_callback(nghttp2_session* session, const nghttp2_frame* frame, const uint8_t* name, size_t namelen, const uint8_t* value, size_t valuelen, uint8_t flags, void* user_data);
83
  static int on_error_callback(nghttp2_session* session, int lib_error_code, const char* msg, size_t len, void* user_data);
84
  static void handleReadableIOCallback(int fd, FDMultiplexer::funcparam_t& param);
85
  static void handleWritableIOCallback(int fd, FDMultiplexer::funcparam_t& param);
86

87
  class PendingRequest
88
  {
89
  public:
90
    std::shared_ptr<TCPQuerySender> d_sender{nullptr};
91
    TCPQuery d_query;
92
    PacketBuffer d_buffer;
93
    size_t d_queryPos{0};
94
    uint16_t d_responseCode{0};
95
    bool d_finished{false};
96
  };
97
  void updateIO(IOState newState, const FDMultiplexer::callbackfunc_t& callback, bool noTTD = false);
98
  void watchForRemoteHostClosingConnection();
99
  void handleResponse(PendingRequest&& request);
100
  void handleResponseError(PendingRequest&& request, const struct timeval& now);
101
  void handleIOError();
102
  uint32_t getConcurrentStreamsCount() const;
103

104
  size_t getUsageCount() const
105
  {
×
106
    auto ref = shared_from_this();
×
107
    return ref.use_count();
×
108
  }
×
109

110
  static const std::unordered_map<std::string, std::string> s_constants;
111

112
  std::unordered_map<int32_t, PendingRequest> d_currentStreams;
113
  std::string d_proxyProtocolPayload;
114
  PacketBuffer d_out;
115
  PacketBuffer d_in;
116
  std::unique_ptr<nghttp2_session, void (*)(nghttp2_session*)> d_session{nullptr, nghttp2_session_del};
117
  size_t d_outPos{0};
118
  size_t d_inPos{0};
119
  bool d_healthCheckQuery{false};
120
  bool d_firstWrite{true};
121
};
122

123
using DownstreamDoHConnectionsManager = DownstreamConnectionsManager<DoHConnectionToBackend>;
124
thread_local DownstreamDoHConnectionsManager t_downstreamDoHConnectionsManager;
125

126
uint32_t DoHConnectionToBackend::getConcurrentStreamsCount() const
127
{
1,210✔
128
  return d_currentStreams.size();
1,210✔
129
}
1,210✔
130

131
/* Pass a received response on to the sender of the corresponding query.
   Unless this connection carries a health-check query, the backend
   statistics are updated first: latency, then either the response code
   or an error if the payload is too short to hold a DNS header.
   Exceptions are logged, never propagated. */
void DoHConnectionToBackend::handleResponse(PendingRequest&& request)
{
  struct timeval now{};
  gettimeofday(&now, nullptr);

  try {
    if (!d_healthCheckQuery) {
      const double latency = request.d_query.d_idstate.queryRealTime.udiff();
      d_ds->updateTCPLatency(latency);

      const auto& payload = request.d_buffer;
      if (payload.size() < sizeof(dnsheader)) {
        d_ds->reportTimeoutOrError();
      }
      else {
        /* copy the header out of the buffer before reading the rcode from it */
        dnsheader header;
        memcpy(&header, payload.data(), sizeof(header));
        d_ds->reportResponse(header.rcode);
      }
    }

    TCPResponse tcpResponse(std::move(request.d_query));
    tcpResponse.d_buffer = std::move(request.d_buffer);
    tcpResponse.d_connection = shared_from_this();
    tcpResponse.d_ds = d_ds;
    request.d_sender->handleResponse(now, std::move(tcpResponse));
  }
  catch (const std::exception& exp) {
    vinfolog("Got exception while handling response for cross-protocol DoH: %s", exp.what());
  }
}
102✔
163

164
/* Tell the sender of a query that it failed with an I/O error.
   The backend is charged with a timeout/error unless this connection
   carries a health-check query. The notification holds an empty payload
   and the state of the original query. Exceptions are logged, never
   propagated. */
void DoHConnectionToBackend::handleResponseError(PendingRequest&& request, const struct timeval& now)
{
  try {
    const bool countAgainstBackend = !d_healthCheckQuery;
    if (countAgainstBackend) {
      d_ds->reportTimeoutOrError();
    }

    TCPResponse emptyResponse(PacketBuffer(), std::move(request.d_query.d_idstate), nullptr, nullptr);
    request.d_sender->notifyIOError(now, std::move(emptyResponse));
  }
  catch (const std::exception& exp) {
    vinfolog("Got exception while handling response for cross-protocol DoH: %s", exp.what());
  }
}
24✔
178

179
void DoHConnectionToBackend::handleIOError()
180
{
13✔
181
  d_connectionDied = true;
13✔
182
  nghttp2_session_terminate_session(d_session.get(), NGHTTP2_PROTOCOL_ERROR);
13✔
183

184
  struct timeval now
13✔
185
  {
13✔
186
    .tv_sec = 0, .tv_usec = 0
13✔
187
  };
13✔
188

189
  gettimeofday(&now, nullptr);
13✔
190
  for (auto& request : d_currentStreams) {
17✔
191
    handleResponseError(std::move(request.second), now);
17✔
192
  }
17✔
193

194
  d_currentStreams.clear();
13✔
195
  stopIO();
13✔
196
}
13✔
197

198
void DoHConnectionToBackend::handleTimeout(const struct timeval& now, bool write)
199
{
3✔
200
  if (write) {
3✔
201
    if (d_firstWrite) {
1!
202
      ++d_ds->tcpConnectTimeouts;
×
203
    }
×
204
    else {
1✔
205
      ++d_ds->tcpWriteTimeouts;
1✔
206
    }
1✔
207
  }
1✔
208
  else {
2✔
209
    ++d_ds->tcpReadTimeouts;
2✔
210
  }
2✔
211

212
  handleIOError();
3✔
213
}
3✔
214

215
bool DoHConnectionToBackend::reachedMaxStreamID() const
216
{
518✔
217
  const uint32_t maximumStreamID = (static_cast<uint32_t>(1) << 31) - 1;
518✔
218
  return d_highestStreamID == maximumStreamID;
518✔
219
}
518✔
220

221
bool DoHConnectionToBackend::reachedMaxConcurrentQueries() const
222
{
81✔
223
  // cerr<<"Got "<<getConcurrentStreamsCount()<<" concurrent streams, max is "<<nghttp2_session_get_remote_settings(d_session.get(), NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS)<<endl;
224
  if (nghttp2_session_get_remote_settings(d_session.get(), NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS) <= getConcurrentStreamsCount()) {
81!
225
    return true;
×
226
  }
×
227
  return false;
81✔
228
}
81✔
229

230
bool DoHConnectionToBackend::isIdle() const
231
{
1,076✔
232
  return getConcurrentStreamsCount() == 0;
1,076✔
233
}
1,076✔
234

235
void DoHConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query)
236
{
128✔
237
  auto payloadSize = std::to_string(query.d_buffer.size());
128✔
238

239
  bool addXForwarded = d_ds->d_config.d_addXForwardedHeaders;
128✔
240

241
  /* We use nghttp2_nv_flag.NGHTTP2_NV_FLAG_NO_COPY_NAME and nghttp2_nv_flag.NGHTTP2_NV_FLAG_NO_COPY_VALUE
242
     to avoid a copy and lowercasing but we need to make sure that the data will outlive the request (nghttp2_on_frame_send_callback called), and that it is already lowercased. */
243
  std::vector<nghttp2_nv> headers;
128✔
244
  // these need to live until after the request headers have been processed
245
  std::string remote;
128✔
246
  std::string remotePort;
128✔
247
  headers.reserve(8 + (addXForwarded ? 3 : 0));
128✔
248

249
  /* Pseudo-headers need to come first (rfc7540 8.1.2.1) */
250
  NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::METHOD_NAME, NGHTTP2Headers::HeaderConstantIndexes::METHOD_VALUE);
128✔
251
  NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::SCHEME_NAME, NGHTTP2Headers::HeaderConstantIndexes::SCHEME_VALUE);
128✔
252
  NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::AUTHORITY_NAME, d_ds->d_config.d_tlsSubjectName);
128✔
253
  NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::PATH_NAME, d_ds->d_config.d_dohPath);
128✔
254
  NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::ACCEPT_NAME, NGHTTP2Headers::HeaderConstantIndexes::ACCEPT_VALUE);
128✔
255
  NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::CONTENT_TYPE_NAME, NGHTTP2Headers::HeaderConstantIndexes::CONTENT_TYPE_VALUE);
128✔
256
  NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::USER_AGENT_NAME, NGHTTP2Headers::HeaderConstantIndexes::USER_AGENT_VALUE);
128✔
257
  NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::CONTENT_LENGTH_NAME, payloadSize);
128✔
258
  /* no need to add these headers for health-check queries */
259
  if (addXForwarded && query.d_idstate.origRemote.getPort() != 0) {
128✔
260
    remote = query.d_idstate.origRemote.toString();
2✔
261
    remotePort = std::to_string(query.d_idstate.origRemote.getPort());
2✔
262
    NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_FOR_NAME, remote);
2✔
263
    NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PORT_NAME, remotePort);
2✔
264
    if (query.d_idstate.cs != nullptr) {
2!
265
      if (query.d_idstate.cs->isUDP()) {
2✔
266
        NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_UDP);
1✔
267
      }
1✔
268
      else if (query.d_idstate.cs->isDoH()) {
1!
269
        if (query.d_idstate.cs->hasTLS()) {
×
270
          NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_HTTPS);
×
271
        }
×
272
        else {
×
273
          NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_HTTP);
×
274
        }
×
275
      }
×
276
      else if (query.d_idstate.cs->hasTLS()) {
1!
277
        NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_TLS);
×
278
      }
×
279
      else {
1✔
280
        NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_TCP);
1✔
281
      }
1✔
282
    }
2✔
283
  }
2✔
284

285
  PendingRequest pending;
128✔
286
  pending.d_query = std::move(query);
128✔
287
  pending.d_sender = std::move(sender);
128✔
288

289
  uint32_t streamId = nghttp2_session_get_next_stream_id(d_session.get());
128✔
290
  auto insertPair = d_currentStreams.insert({streamId, std::move(pending)});
128✔
291
  if (!insertPair.second) {
128!
292
    /* there is a stream ID collision, something is very wrong! */
293
    d_connectionDied = true;
×
294
    nghttp2_session_terminate_session(d_session.get(), NGHTTP2_NO_ERROR);
×
295
    throw std::runtime_error("Stream ID collision");
×
296
  }
×
297

298
  /* if data_prd is not NULL, it provides data which will be sent in subsequent DATA frames. In this case, a method that allows request message bodies (https://tools.ietf.org/html/rfc7231#section-4) must be specified with :method key (e.g. POST). This function does not take ownership of the data_prd. The function copies the members of the data_prd. If data_prd is NULL, HEADERS have END_STREAM set.
299
   */
300
  nghttp2_data_provider data_provider;
128✔
301

302
  data_provider.source.ptr = this;
128✔
303
  data_provider.read_callback = [](nghttp2_session* session, int32_t stream_id, uint8_t* buf, size_t length, uint32_t* data_flags, nghttp2_data_source* source, void* user_data) -> ssize_t {
128✔
304
    auto* conn = static_cast<DoHConnectionToBackend*>(user_data);
127✔
305
    auto& request = conn->d_currentStreams.at(stream_id);
127✔
306
    size_t toCopy = 0;
127✔
307
    if (request.d_queryPos < request.d_query.d_buffer.size()) {
127!
308
      size_t remaining = request.d_query.d_buffer.size() - request.d_queryPos;
127✔
309
      toCopy = length > remaining ? remaining : length;
127!
310
      memcpy(buf, &request.d_query.d_buffer.at(request.d_queryPos), toCopy);
127✔
311
      request.d_queryPos += toCopy;
127✔
312
    }
127✔
313

314
    if (request.d_queryPos >= request.d_query.d_buffer.size()) {
127!
315
      *data_flags |= NGHTTP2_DATA_FLAG_EOF;
127✔
316
    }
127✔
317
    return toCopy;
127✔
318
  };
127✔
319

320
  auto newStreamId = nghttp2_submit_request(d_session.get(), nullptr, headers.data(), headers.size(), &data_provider, this);
128✔
321
  if (newStreamId < 0) {
128!
322
    d_connectionDied = true;
×
323
    ++d_ds->tcpDiedSendingQuery;
×
324
    d_currentStreams.erase(streamId);
×
325
    throw std::runtime_error("Error submitting HTTP request:" + std::string(nghttp2_strerror(newStreamId)));
×
326
  }
×
327

328
  auto rv = nghttp2_session_send(d_session.get());
128✔
329
  if (rv != 0) {
128!
330
    d_connectionDied = true;
×
331
    ++d_ds->tcpDiedSendingQuery;
×
332
    d_currentStreams.erase(streamId);
×
333
    throw std::runtime_error("Error in nghttp2_session_send:" + std::to_string(rv));
×
334
  }
×
335

336
  d_highestStreamID = newStreamId;
128✔
337
}
128✔
338

339
class DoHClientThreadData
340
{
341
public:
342
  DoHClientThreadData(pdns::channel::Receiver<CrossProtocolQuery>&& receiver) :
343
    mplexer(std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent())),
344
    d_receiver(std::move(receiver))
345
  {
320✔
346
  }
320✔
347

348
  std::unique_ptr<FDMultiplexer> mplexer{nullptr};
349
  pdns::channel::Receiver<CrossProtocolQuery> d_receiver;
350
};
351

352
void DoHConnectionToBackend::handleReadableIOCallback(int fd, FDMultiplexer::funcparam_t& param)
353
{
201✔
354
  auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(param);
201✔
355
  if (fd != conn->getHandle()) {
201!
356
    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->getHandle()));
×
357
  }
×
358

359
  IOStateGuard ioGuard(conn->d_ioState);
201✔
360
  do {
254✔
361
    conn->d_inPos = 0;
254✔
362
    conn->d_in.resize(conn->d_in.size() + 512);
254✔
363
    // cerr<<"trying to read "<<conn->d_in.size()<<endl;
364
    try {
254✔
365
      IOState newState = conn->d_handler->tryRead(conn->d_in, conn->d_inPos, conn->d_in.size(), true);
254✔
366
      // cerr<<"got a "<<(int)newState<<" state and "<<conn->d_inPos<<" bytes"<<endl;
367
      conn->d_in.resize(conn->d_inPos);
254✔
368

369
      if (conn->d_inPos > 0) {
254✔
370
        /* we got something */
371
        auto readlen = nghttp2_session_mem_recv(conn->d_session.get(), conn->d_in.data(), conn->d_inPos);
163✔
372
        // cerr<<"nghttp2_session_mem_recv returned "<<readlen<<endl;
373
        /* as long as we don't require a pause by returning nghttp2_error.NGHTTP2_ERR_PAUSE from a CB,
374
           all data should be consumed before returning */
375
        if (readlen > 0 && static_cast<size_t>(readlen) < conn->d_inPos) {
163!
376
          throw std::runtime_error("Fatal error while passing received data to nghttp2: " + std::string(nghttp2_strerror((int)readlen)));
×
377
        }
×
378

379
        struct timeval now
163✔
380
        {
163✔
381
          .tv_sec = 0, .tv_usec = 0
163✔
382
        };
163✔
383

384
        gettimeofday(&now, nullptr);
163✔
385
        conn->d_lastDataReceivedTime = now;
163✔
386

387
        // cerr<<"after read send"<<endl;
388
        nghttp2_session_send(conn->d_session.get());
163✔
389
      }
163✔
390

391
      if (newState == IOState::Done) {
254✔
392
        if (conn->isIdle()) {
162✔
393
          conn->stopIO();
109✔
394
          conn->watchForRemoteHostClosingConnection();
109✔
395
          ioGuard.release();
109✔
396
          break;
109✔
397
        }
109✔
398
      }
162✔
399
      else {
92✔
400
        if (newState == IOState::NeedWrite) {
92!
401
          // cerr<<"need write"<<endl;
402
          conn->updateIO(IOState::NeedWrite, handleReadableIOCallback);
×
403
        }
×
404
        ioGuard.release();
92✔
405
        break;
92✔
406
      }
92✔
407
    }
254✔
408
    catch (const std::exception& e) {
254✔
409
      vinfolog("Exception while trying to read from HTTP backend connection: %s", e.what());
5✔
410
      ++conn->d_ds->tcpDiedReadingResponse;
5✔
411
      conn->handleIOError();
5✔
412
      break;
5✔
413
    }
5✔
414
  } while (conn->getConcurrentStreamsCount() > 0);
254!
415
}
201✔
416

417
void DoHConnectionToBackend::handleWritableIOCallback(int fd, FDMultiplexer::funcparam_t& param)
418
{
31✔
419
  auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(param);
31✔
420
  if (fd != conn->getHandle()) {
31!
421
    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->getHandle()));
×
422
  }
×
423
  IOStateGuard ioGuard(conn->d_ioState);
31✔
424

425
  // cerr<<"in "<<__PRETTY_FUNCTION__<<" trying to write "<<conn->d_out.size()-conn->d_outPos<<endl;
426
  try {
31✔
427
    IOState newState = conn->d_handler->tryWrite(conn->d_out, conn->d_outPos, conn->d_out.size());
31✔
428
    // cerr<<"got a "<<(int)newState<<" state, "<<conn->d_out.size()-conn->d_outPos<<" bytes remaining"<<endl;
429
    if (newState == IOState::NeedRead) {
31!
430
      conn->updateIO(IOState::NeedRead, handleWritableIOCallback);
×
431
    }
×
432
    else if (newState == IOState::Done) {
31✔
433
      // cerr<<"done, buffer size was "<<conn->d_out.size()<<", pos was "<<conn->d_outPos<<endl;
434
      conn->d_firstWrite = false;
27✔
435
      conn->d_out.clear();
27✔
436
      conn->d_outPos = 0;
27✔
437
      conn->stopIO();
27✔
438
      if (!conn->isIdle()) {
27!
439
        conn->updateIO(IOState::NeedRead, handleReadableIOCallback);
27✔
440
      }
27✔
441
      else {
×
442
        conn->watchForRemoteHostClosingConnection();
×
443
      }
×
444
    }
27✔
445
    ioGuard.release();
31✔
446
  }
31✔
447
  catch (const std::exception& e) {
31✔
448
    vinfolog("Exception while trying to write (ready) to HTTP backend connection: %s", e.what());
4!
449
    ++conn->d_ds->tcpDiedSendingQuery;
4✔
450
    conn->handleIOError();
4✔
451
  }
4✔
452
}
31✔
453

454
void DoHConnectionToBackend::stopIO()
455
{
517✔
456
  d_ioState->reset();
517✔
457

458
  if (isIdle()) {
517✔
459
    auto shared = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this());
241✔
460
    if (!willBeReusable(false)) {
241✔
461
      /* remove ourselves from the connection cache, this might mean that our
462
         reference count drops to zero after that, so we need to be careful */
463
      t_downstreamDoHConnectionsManager.removeDownstreamConnection(shared);
31✔
464
    }
31✔
465
    else {
210✔
466
      t_downstreamDoHConnectionsManager.moveToIdle(shared);
210✔
467
    }
210✔
468
  }
241✔
469
}
517✔
470

471
/* Register interest in the socket being readable or writable, calling
   callback when that happens. Unless noTTD is set a deadline is attached:
   the health-check one for health-check connections, otherwise the read,
   connect (first write on a fresh connection) or write one.
   States other than NeedRead and NeedWrite are ignored. */
void DoHConnectionToBackend::updateIO(IOState newState, const FDMultiplexer::callbackfunc_t& callback, bool noTTD)
{
  struct timeval now{};
  gettimeofday(&now, nullptr);

  boost::optional<struct timeval> ttd{boost::none};
  if (!noTTD) {
    if (d_healthCheckQuery) {
      ttd = getBackendHealthCheckTTD(now);
    }
    else if (newState == IOState::NeedRead) {
      ttd = getBackendReadTTD(now);
    }
    else if (isFresh() && d_firstWrite) {
      /* first write just after the non-blocking connect */
      ttd = getBackendConnectTTD(now);
    }
    else {
      ttd = getBackendWriteTTD(now);
    }
  }

  auto self = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this());
  const bool wantsIO = newState == IOState::NeedRead || newState == IOState::NeedWrite;
  if (self && wantsIO) {
    /* the IO state keeps a reference to us, passed back to the callback */
    d_ioState->update(newState, callback, std::move(self), ttd);
  }
}
490✔
506

507
void DoHConnectionToBackend::watchForRemoteHostClosingConnection()
508
{
224✔
509
  if (willBeReusable(false) && !d_healthCheckQuery) {
224✔
510
    updateIO(IOState::NeedRead, handleReadableIOCallback, false);
182✔
511
  }
182✔
512
}
224✔
513

514
/* nghttp2 callback invoked when it has data to send to the backend.
   The data is appended to the outgoing buffer, preceded by the proxy
   protocol payload if there is one and it has not been sent yet. If the
   buffer was empty a write is attempted right away, otherwise a write is
   already pending and will pick the new data up.
   Always reports the whole data as accepted, even when the write failed:
   the failure is handled via handleIOError(). */
ssize_t DoHConnectionToBackend::send_callback(nghttp2_session* session, const uint8_t* data, size_t length, int flags, void* user_data)
{
  auto* conn = static_cast<DoHConnectionToBackend*>(user_data);
  const bool bufferWasEmpty = conn->d_out.empty();

  if (!conn->d_proxyProtocolPayloadSent && !conn->d_proxyProtocolPayload.empty()) {
    conn->d_out.insert(conn->d_out.end(), conn->d_proxyProtocolPayload.begin(), conn->d_proxyProtocolPayload.end());
    conn->d_proxyProtocolPayloadSent = true;
  }

  conn->d_out.insert(conn->d_out.end(), data, data + length);

  if (!bufferWasEmpty) {
    return static_cast<ssize_t>(length);
  }

  try {
    // cerr<<"in "<<__PRETTY_FUNCTION__<<" trying to write "<<conn->d_out.size()-conn->d_outPos<<endl;
    const auto writeState = conn->d_handler->tryWrite(conn->d_out, conn->d_outPos, conn->d_out.size());
    // cerr<<"got a "<<(int)writeState<<" state, "<<conn->d_out.size()-conn->d_outPos<<" bytes remaining"<<endl;
    if (writeState != IOState::Done) {
      /* not everything could be written, finish later */
      conn->updateIO(writeState, handleWritableIOCallback);
    }
    else {
      conn->d_firstWrite = false;
      conn->d_out.clear();
      conn->d_outPos = 0;
      conn->stopIO();
      if (conn->isIdle()) {
        conn->watchForRemoteHostClosingConnection();
      }
      else {
        conn->updateIO(IOState::NeedRead, handleReadableIOCallback);
      }
    }
  }
  catch (const std::exception& exp) {
    vinfolog("Exception while trying to write (send) to HTTP backend connection: %s", exp.what());
    conn->handleIOError();
    ++conn->d_ds->tcpDiedSendingQuery;
  }

  return static_cast<ssize_t>(length);
}
387✔
555

556
/* nghttp2 callback invoked once a frame has been received.
   A GOAWAY frame marks the connection as dead. A HEADERS or DATA frame
   with the END_STREAM flag completes the corresponding request: it is
   removed from the pending ones and handed over as a response (status
   200) or as an error (anything else).
   Returns NGHTTP2_ERR_CALLBACK_FAILURE if the stream is unknown, 0 otherwise. */
int DoHConnectionToBackend::on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data)
{
  auto* conn = static_cast<DoHConnectionToBackend*>(user_data);
  // cerr<<"Frame type is "<<std::to_string(frame->hd.type)<<endl;
#if 0
  switch (frame->hd.type) {
  case NGHTTP2_HEADERS:
    cerr<<"got headers"<<endl;
    if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) {
      cerr<<"All headers received"<<endl;
    }
    break;
  case NGHTTP2_WINDOW_UPDATE:
    cerr<<"got window update"<<endl;
    break;
  case NGHTTP2_SETTINGS:
    cerr<<"got settings"<<endl;
    cerr<<frame->settings.niv<<endl;
    for (size_t idx = 0; idx < frame->settings.niv; idx++) {
      cerr<<"- "<<frame->settings.iv[idx].settings_id<<" "<<frame->settings.iv[idx].value<<endl;
    }
    break;
  case NGHTTP2_DATA:
    cerr<<"got data"<<endl;
    break;
  }
#endif

  if (frame->hd.type == NGHTTP2_GOAWAY) {
    conn->d_connectionDied = true;
    return 0;
  }

  /* is this the last frame for this stream? */
  const bool headersOrData = frame->hd.type == NGHTTP2_HEADERS || frame->hd.type == NGHTTP2_DATA;
  const bool endOfStream = (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) != 0;
  if (!headersOrData || !endOfStream) {
    return 0;
  }

  auto stream = conn->d_currentStreams.find(frame->hd.stream_id);
  if (stream == conn->d_currentStreams.end()) {
    vinfolog("Stream %d NOT FOUND", frame->hd.stream_id);
    conn->d_connectionDied = true;
    ++conn->d_ds->tcpDiedReadingResponse;
    return NGHTTP2_ERR_CALLBACK_FAILURE;
  }

  // cerr<<"Stream "<<frame->hd.stream_id<<" is now finished"<<endl;
  stream->second.d_finished = true;
  ++conn->d_queries;

  /* take the request out of the map before handing it over */
  auto request = std::move(stream->second);
  conn->d_currentStreams.erase(stream);

  if (request.d_responseCode != 200U) {
    vinfolog("HTTP response has a non-200 status code: %d", request.d_responseCode);
    struct timeval now{};
    gettimeofday(&now, nullptr);

    conn->handleResponseError(std::move(request), now);
  }
  else {
    conn->handleResponse(std::move(request));
  }

  if (conn->isIdle()) {
    conn->stopIO();
    conn->watchForRemoteHostClosingConnection();
  }

  return 0;
}
298✔
628

629
/* nghttp2 callback invoked for every chunk of response body received.
   The chunk is appended to the buffer of the corresponding request, as
   long as the total does not exceed what fits in 16 bits. If the request
   was already flagged as finished, it is then handed over as a response
   (status 200) or as an error (anything else).
   Returns NGHTTP2_ERR_CALLBACK_FAILURE if the stream is unknown or the
   data is too large, 0 otherwise. */
int DoHConnectionToBackend::on_data_chunk_recv_callback(nghttp2_session* session, uint8_t flags, int32_t stream_id, const uint8_t* data, size_t len, void* user_data)
{
  auto* conn = static_cast<DoHConnectionToBackend*>(user_data);
  // cerr<<"Got data of size "<<len<<" for stream "<<stream_id<<endl;
  auto stream = conn->d_currentStreams.find(stream_id);
  if (stream == conn->d_currentStreams.end()) {
    vinfolog("Unable to match the stream ID %d to a known one!", stream_id);
    conn->d_connectionDied = true;
    ++conn->d_ds->tcpDiedReadingResponse;
    return NGHTTP2_ERR_CALLBACK_FAILURE;
  }

  auto& pending = stream->second;
  if (len > std::numeric_limits<uint16_t>::max() || (std::numeric_limits<uint16_t>::max() - pending.d_buffer.size()) < len) {
    vinfolog("Data frame of size %d is too large for a DNS response (we already have %d)", len, pending.d_buffer.size());
    conn->d_connectionDied = true;
    ++conn->d_ds->tcpDiedReadingResponse;
    return NGHTTP2_ERR_CALLBACK_FAILURE;
  }

  pending.d_buffer.insert(pending.d_buffer.end(), data, data + len);
  if (!pending.d_finished) {
    /* more to come for this stream */
    return 0;
  }

  // cerr<<"we now have the full response!"<<endl;
  // cerr<<std::string(reinterpret_cast<const char*>(data), len)<<endl;

  /* take the request out of the map before handing it over */
  auto request = std::move(pending);
  conn->d_currentStreams.erase(stream);

  if (request.d_responseCode != 200U) {
    vinfolog("HTTP response has a non-200 status code: %d", request.d_responseCode);
    struct timeval now{};
    gettimeofday(&now, nullptr);

    conn->handleResponseError(std::move(request), now);
  }
  else {
    conn->handleResponse(std::move(request));
  }

  if (conn->isIdle()) {
    conn->stopIO();
    conn->watchForRemoteHostClosingConnection();
  }

  return 0;
}
106✔
676

677
/* nghttp2 callback invoked when a stream is closed.
   Nothing to do if it was closed without error. Otherwise the connection
   is marked as dead and, if the request is still pending, it is either
   retried over a connection obtained from this thread's manager (while
   the number of failures is below the retries configured for the backend)
   or reported as failed. Always returns 0. */
int DoHConnectionToBackend::on_stream_close_callback(nghttp2_session* session, int32_t stream_id, uint32_t error_code, void* user_data)
{
  if (error_code == 0) {
    return 0;
  }

  auto* conn = static_cast<DoHConnectionToBackend*>(user_data);

  // cerr << "Stream " << stream_id << " closed with error_code=" << error_code << endl;
  conn->d_connectionDied = true;
  ++conn->d_ds->tcpDiedReadingResponse;

  auto stream = conn->d_currentStreams.find(stream_id);
  if (stream == conn->d_currentStreams.end()) {
    /* we don't care, then */
    return 0;
  }

  struct timeval now{};
  gettimeofday(&now, nullptr);

  /* take the request out of the map before doing anything with it */
  auto request = std::move(stream->second);
  conn->d_currentStreams.erase(stream);

  // cerr<<"Query has "<<request.d_query.d_downstreamFailures<<" failures, backend limit is "<<conn->d_ds->d_retries<<endl;
  const bool canRetry = request.d_query.d_downstreamFailures < conn->d_ds->d_config.d_retries;
  if (!canRetry) {
    conn->handleResponseError(std::move(request), now);
  }
  else {
    // cerr<<"in "<<__PRETTY_FUNCTION__<<", looking for a connection to send a query of size "<<request.d_query.d_buffer.size()<<endl;
    ++request.d_query.d_downstreamFailures;
    auto downstream = t_downstreamDoHConnectionsManager.getConnectionToDownstream(conn->d_mplexer, conn->d_ds, now, std::string(conn->d_proxyProtocolPayload));
    downstream->queueQuery(request.d_sender, std::move(request.d_query));
  }

  // cerr<<"we now have "<<conn->getConcurrentStreamsCount()<<" concurrent connections"<<endl;
  if (conn->isIdle()) {
    // cerr<<"stopping IO"<<endl;
    conn->stopIO();
    conn->watchForRemoteHostClosingConnection();
  }

  return 0;
}
4✔
724

725
/* nghttp2 callback invoked for every header received.
   Only the ":status" pseudo-header of response HEADERS frames is processed: its value is
   parsed into the response code of the matching stream. Returns
   NGHTTP2_ERR_CALLBACK_FAILURE (after flagging the connection as dead) when the stream is
   unknown or the status cannot be parsed, 0 otherwise. */
int DoHConnectionToBackend::on_header_callback(nghttp2_session* session, const nghttp2_frame* frame, const uint8_t* name, size_t namelen, const uint8_t* value, size_t valuelen, uint8_t flags, void* user_data)
{
  if (frame->hd.type != NGHTTP2_HEADERS || frame->headers.cat != NGHTTP2_HCAT_RESPONSE) {
    /* not a response header */
    return 0;
  }

  const std::string statusName(":status");
  if (namelen != statusName.size() || memcmp(statusName.data(), name, statusName.size()) != 0) {
    /* we are only interested in the status */
    return 0;
  }

  auto* conn = static_cast<DoHConnectionToBackend*>(user_data);
  auto streamIt = conn->d_currentStreams.find(frame->hd.stream_id);
  if (streamIt == conn->d_currentStreams.end()) {
    vinfolog("Unable to match the stream ID %d to a known one!", frame->hd.stream_id);
    conn->d_connectionDied = true;
    return NGHTTP2_ERR_CALLBACK_FAILURE;
  }

  try {
    pdns::checked_stoi_into(streamIt->second.d_responseCode, std::string(reinterpret_cast<const char*>(value), valuelen));
  }
  catch (...) {
    vinfolog("Error parsing the status header for stream ID %d", frame->hd.stream_id);
    conn->d_connectionDied = true;
    ++conn->d_ds->tcpDiedReadingResponse;
    return NGHTTP2_ERR_CALLBACK_FAILURE;
  }

  return 0;
}
292✔
754

755
/* nghttp2 callback invoked when the library reports an error on the session:
   logs the message, flags the connection as dead and updates the backend's counter.
   Always returns 0. */
int DoHConnectionToBackend::on_error_callback(nghttp2_session* session, int lib_error_code, const char* msg, size_t len, void* user_data)
{
  vinfolog("Error in HTTP/2 connection: %s", std::string(msg, len));

  auto* connection = static_cast<DoHConnectionToBackend*>(user_data);
  connection->d_connectionDied = true;
  ++connection->d_ds->tcpDiedReadingResponse;

  return 0;
}
×
765

766
/* Builds an outgoing DoH connection: creates the IO state handler, registers the nghttp2
   callbacks, creates the nghttp2 client session and submits the initial SETTINGS.
   On any failure the connection is flagged as dead (d_connectionDied), the backend's
   tcpDiedSendingQuery counter is incremented and the constructor returns early. */
DoHConnectionToBackend::DoHConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload) :
  ConnectionToBackend(ds, mplexer, now), d_proxyProtocolPayload(std::move(proxyProtocolPayload))
{
  // inherit most of the stuff from the ConnectionToBackend()
  d_ioState = make_unique<IOStateHandler>(*d_mplexer, d_handler->getDescriptor());

  nghttp2_session_callbacks* cbs = nullptr;
  if (nghttp2_session_callbacks_new(&cbs) != 0) {
    d_connectionDied = true;
    ++d_ds->tcpDiedSendingQuery;
    vinfolog("Unable to create a callback object for a new HTTP/2 session");
    return;
  }
  /* from now on the callbacks object is released automatically */
  std::unique_ptr<nghttp2_session_callbacks, void (*)(nghttp2_session_callbacks*)> callbacks(cbs, nghttp2_session_callbacks_del);
  cbs = nullptr;

  nghttp2_session_callbacks_set_send_callback(callbacks.get(), send_callback);
  nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks.get(), on_frame_recv_callback);
  nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks.get(), on_data_chunk_recv_callback);
  nghttp2_session_callbacks_set_on_stream_close_callback(callbacks.get(), on_stream_close_callback);
  nghttp2_session_callbacks_set_on_header_callback(callbacks.get(), on_header_callback);
  nghttp2_session_callbacks_set_error_callback2(callbacks.get(), on_error_callback);

  /* 'this' is passed as the user data handed back to every callback */
  nghttp2_session* sess = nullptr;
  if (nghttp2_session_client_new(&sess, callbacks.get(), this) != 0) {
    d_connectionDied = true;
    ++d_ds->tcpDiedSendingQuery;
    vinfolog("Could not allocate a new HTTP/2 session");
    return;
  }

  d_session = std::unique_ptr<nghttp2_session, void (*)(nghttp2_session*)>(sess, nghttp2_session_del);
  sess = nullptr;

  callbacks.reset();

  nghttp2_settings_entry iv[] = {
    /* rfc7540 section-8.2.2:
       "Advertising a SETTINGS_MAX_CONCURRENT_STREAMS value of zero disables
       server push by preventing the server from creating the necessary
       streams."
    */
    {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0},
    {NGHTTP2_SETTINGS_ENABLE_PUSH, 0},
    /* we might want to make the initial window size configurable, but 16M is a large enough default */
    {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 16 * 1024 * 1024}};
  /* client 24 bytes magic string will be sent by nghttp2 library */
  int rv = nghttp2_submit_settings(d_session.get(), NGHTTP2_FLAG_NONE, iv, sizeof(iv) / sizeof(*iv));
  if (rv != 0) {
    d_connectionDied = true;
    ++d_ds->tcpDiedSendingQuery;
    vinfolog("Could not submit SETTINGS: %s", nghttp2_strerror(rv));
    return;
  }
}
47✔
821

822
static void handleCrossProtocolQuery(int pipefd, FDMultiplexer::funcparam_t& param)
823
{
88✔
824
  auto threadData = boost::any_cast<DoHClientThreadData*>(param);
88✔
825

826
  std::unique_ptr<CrossProtocolQuery> cpq{nullptr};
88✔
827
  try {
88✔
828
    auto tmp = threadData->d_receiver.receive();
88✔
829
    if (!tmp) {
88!
830
      return;
×
831
    }
×
832
    cpq = std::move(*tmp);
88✔
833
  }
88✔
834
  catch (const std::exception& e) {
88✔
835
    throw std::runtime_error("Error while reading from the DoH cross-protocol channel:" + std::string(e.what()));
×
836
  }
×
837

838
  struct timeval now
88✔
839
  {
88✔
840
    .tv_sec = 0, .tv_usec = 0
88✔
841
  };
88✔
842
  gettimeofday(&now, nullptr);
88✔
843

844
  std::shared_ptr<TCPQuerySender> tqs = cpq->getTCPQuerySender();
88✔
845
  auto query = std::move(cpq->query);
88✔
846
  auto downstreamServer = std::move(cpq->downstream);
88✔
847
  cpq.reset();
88✔
848

849
  try {
88✔
850
    auto downstream = t_downstreamDoHConnectionsManager.getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::move(query.d_proxyProtocolPayload));
88✔
851
    downstream->queueQuery(tqs, std::move(query));
88✔
852
  }
88✔
853
  catch (...) {
88✔
854
    TCPResponse response(std::move(query));
×
855
    tqs->notifyIOError(now, std::move(response));
×
856
  }
×
857
}
88✔
858

859
static void dohClientThread(pdns::channel::Receiver<CrossProtocolQuery>&& receiver)
860
{
320✔
861
  setThreadName("dnsdist/dohClie");
320✔
862

863
  try {
320✔
864
    DoHClientThreadData data(std::move(receiver));
320✔
865
    data.mplexer->addReadFD(data.d_receiver.getDescriptor(), handleCrossProtocolQuery, &data);
320✔
866

867
    struct timeval now
320✔
868
    {
320✔
869
      .tv_sec = 0, .tv_usec = 0
320✔
870
    };
320✔
871

872
    gettimeofday(&now, nullptr);
320✔
873
    time_t lastTimeoutScan = now.tv_sec;
320✔
874

875
    for (;;) {
1,228✔
876
      data.mplexer->run(&now, 1000);
1,228✔
877

878
      if (now.tv_sec > lastTimeoutScan) {
1,228✔
879
        lastTimeoutScan = now.tv_sec;
677✔
880

881
        try {
677✔
882
          t_downstreamDoHConnectionsManager.cleanupClosedConnections(now);
677✔
883
          handleH2Timeouts(*data.mplexer, now);
677✔
884

885
          if (g_dohStatesDumpRequested > 0) {
677!
886
            /* just to keep things clean in the output, debug only */
887
            static std::mutex s_lock;
×
888
            std::lock_guard<decltype(s_lock)> lck(s_lock);
×
889
            if (g_dohStatesDumpRequested > 0) {
×
890
              /* no race here, we took the lock so it can only be increased in the meantime */
891
              --g_dohStatesDumpRequested;
×
892
              infolog("Dumping the DoH client states, as requested:");
×
893
              data.mplexer->runForAllWatchedFDs([](bool isRead, int fd, const FDMultiplexer::funcparam_t& param, struct timeval ttd) {
×
894
                struct timeval lnow;
×
895
                gettimeofday(&lnow, nullptr);
×
896
                if (ttd.tv_sec > 0) {
×
897
                  infolog("- Descriptor %d is in %s state, TTD in %d", fd, (isRead ? "read" : "write"), (ttd.tv_sec - lnow.tv_sec));
×
898
                }
×
899
                else {
×
900
                  infolog("- Descriptor %d is in %s state, no TTD set", fd, (isRead ? "read" : "write"));
×
901
                }
×
902

903
                if (param.type() == typeid(std::shared_ptr<DoHConnectionToBackend>)) {
×
904
                  auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(param);
×
905
                  infolog(" - %s", conn->toString());
×
906
                }
×
907
                else if (param.type() == typeid(DoHClientThreadData*)) {
×
908
                  infolog(" - Worker thread pipe");
×
909
                }
×
910
              });
×
911
              infolog("The DoH client cache has %d active and %d idle outgoing connections cached", t_downstreamDoHConnectionsManager.getActiveCount(), t_downstreamDoHConnectionsManager.getIdleCount());
×
912
            }
×
913
          }
×
914
        }
677✔
915
        catch (const std::exception& e) {
677✔
916
          warnlog("Error in outgoing DoH thread: %s", e.what());
×
917
        }
×
918
      }
677✔
919
    }
1,228✔
920
  }
320✔
921
  catch (const std::exception& e) {
320✔
922
    errlog("Fatal error in outgoing DoH thread: %s", e.what());
×
923
  }
×
924
}
320✔
925
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
926

927
struct DoHClientCollection::DoHWorkerThread
928
{
929
  DoHWorkerThread()
930
  {
320✔
931
  }
320✔
932

933
  DoHWorkerThread(pdns::channel::Sender<CrossProtocolQuery>&& sender) :
934
    d_sender(std::move(sender))
935
  {
320✔
936
  }
320✔
937

938
  DoHWorkerThread(DoHWorkerThread&& rhs) :
939
    d_sender(std::move(rhs.d_sender))
940
  {
×
941
  }
×
942

943
  DoHWorkerThread& operator=(DoHWorkerThread&& rhs)
944
  {
320✔
945
    d_sender = std::move(rhs.d_sender);
320✔
946
    return *this;
320✔
947
  }
320✔
948

949
  DoHWorkerThread(const DoHWorkerThread& rhs) = delete;
950
  DoHWorkerThread& operator=(const DoHWorkerThread&) = delete;
951

952
  pdns::channel::Sender<CrossProtocolQuery> d_sender;
953
};
954

955
/* Sizes the vector of worker thread slots to 'numberOfThreads' entries;
   no thread is started here, see addThread(). */
DoHClientCollection::DoHClientCollection(size_t numberOfThreads) :
  d_clientThreads(numberOfThreads)
{
}
320✔
959

960
bool DoHClientCollection::passCrossProtocolQueryToThread(std::unique_ptr<CrossProtocolQuery>&& cpq)
961
{
88✔
962
  if (d_numberOfThreads == 0) {
88!
963
    throw std::runtime_error("No DoH worker thread yet");
×
964
  }
×
965

966
  uint64_t pos = d_pos++;
88✔
967
  if (!d_clientThreads.at(pos % d_numberOfThreads).d_sender.send(std::move(cpq))) {
88!
968
    ++dnsdist::metrics::g_stats.outgoingDoHQueryPipeFull;
×
969
    return false;
×
970
  }
×
971

972
  return true;
88✔
973
}
88✔
974

975
/* Starts a new, detached, DoH worker thread and stores the sending end of its channel in
   the next free slot. Errors are logged, not propagated: nothing is added if all the slots
   are already in use, or if the channel or the thread cannot be created.
   Throws std::runtime_error when built without nghttp2 support. */
void DoHClientCollection::addThread()
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  try {
    const auto pipeBufferSize = dnsdist::configuration::getImmutableConfiguration().d_tcpInternalPipeBufferSize;
    auto [sender, receiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, pipeBufferSize);

    vinfolog("Adding DoH Client thread");
    std::lock_guard<std::mutex> lock(d_mutex);

    if (d_numberOfThreads >= d_clientThreads.size()) {
      vinfolog("Adding a new DoH client thread would exceed the vector size (%d/%d), skipping. Consider increasing the maximum amount of DoH client threads with setMaxDoHClientThreads() in the configuration.", d_numberOfThreads, d_clientThreads.size());
      return;
    }

    DoHWorkerThread newWorker(std::move(sender));
    try {
      std::thread workerThread(dohClientThread, std::move(receiver));
      workerThread.detach();
    }
    catch (const std::runtime_error& e) {
      /* the thread creation failed */
      errlog("Error creating a DoH thread: %s", e.what());
      return;
    }

    /* only register the worker once its thread has been successfully started */
    d_clientThreads.at(d_numberOfThreads) = std::move(newWorker);
    ++d_numberOfThreads;
  }
  catch (const std::exception& e) {
    errlog("Error creating the DoH channel: %s", e.what());
    return;
  }
#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
  throw std::runtime_error("DoHClientCollection::addThread() called but nghttp2 support is not available");
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
320✔
1012

1013
/* Creates the collection of outgoing DoH worker threads and starts them, based on the
   configured number of workers.
   Returns true when built with DoH and nghttp2 support, false otherwise. */
bool initDoHWorkers()
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  auto workersCount = dnsdist::configuration::getImmutableConfiguration().d_outgoingDoHWorkers;
  if (!workersCount) {
    /* Unless the value has been set to 0 explicitly, always start at least one outgoing DoH worker thread, in case a DoH backend
       is added at a later time. */
    workersCount = 1;
  }

  if (!workersCount || !(*workersCount > 0)) {
    /* explicitly disabled, nothing to start */
    return true;
  }

  g_dohClientThreads = std::make_unique<DoHClientCollection>(*workersCount);
  for (size_t started = 0; started < *workersCount; started++) {
    g_dohClientThreads->addThread();
  }
  return true;
#else
  return false;
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
320✔
1034

1035
/* Queues a query to a DoH backend: over a brand new connection for health-checks, over a
   connection obtained from the connections manager otherwise.
   Returns true when built with DoH and nghttp2 support, false otherwise. */
bool sendH2Query(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, std::shared_ptr<TCPQuerySender>& sender, InternalQuery&& query, bool healthCheck)
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  struct timeval now{};
  gettimeofday(&now, nullptr);

  if (!healthCheck) {
    auto existing = t_downstreamDoHConnectionsManager.getConnectionToDownstream(mplexer, ds, now, std::move(query.d_proxyProtocolPayload));
    existing->queueQuery(sender, std::move(query));
  }
  else {
    /* always do health-checks over a new connection */
    auto dedicated = std::make_shared<DoHConnectionToBackend>(ds, mplexer, now, std::move(query.d_proxyProtocolPayload));
    dedicated->setHealthCheck(healthCheck);
    dedicated->queueQuery(sender, std::move(query));
  }

  return true;
#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
  return false;
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
38✔
1060

1061
/* Clears the outgoing DoH connections held by the connections manager and returns the
   value reported by its clear() method; returns 0 when built without DoH or nghttp2 support. */
size_t clearH2Connections()
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  size_t clearedCount = 0;
  clearedCount = t_downstreamDoHConnectionsManager.clear();
  return clearedCount;
#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
  return 0;
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
24✔
1069

1070
/* Notifies the outgoing DoH connections whose read, then write, timeout has expired
   according to the multiplexer. Entries that are not outgoing DoH connections are skipped.
   Returns the number of connections notified, which is always 0 when built without DoH or
   nghttp2 support. */
size_t handleH2Timeouts(FDMultiplexer& mplexer, const struct timeval& now)
{
  size_t handled = 0;
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  const auto& connectionType = typeid(std::shared_ptr<DoHConnectionToBackend>);

  /* read timeouts first */
  auto readTimeouts = mplexer.getTimeouts(now, false);
  for (const auto& entry : readTimeouts) {
    if (entry.second.type() != connectionType) {
      continue;
    }
    auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(entry.second);
    vinfolog("Timeout (read) from remote DoH backend %s", conn->getBackendName());
    conn->handleTimeout(now, false);
    ++handled;
  }

  /* then the write ones */
  auto writeTimeouts = mplexer.getTimeouts(now, true);
  for (const auto& entry : writeTimeouts) {
    if (entry.second.type() != connectionType) {
      continue;
    }
    auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(entry.second);
    vinfolog("Timeout (write) from remote DoH backend %s", conn->getBackendName());
    conn->handleTimeout(now, true);
    ++handled;
  }
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
  return handled;
}
695✔
1096

1097
/* Forwards the cleanup interval to the outgoing DoH connections manager;
   does nothing when built without DoH or nghttp2 support. */
void setDoHDownstreamCleanupInterval(uint16_t max)
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  DownstreamDoHConnectionsManager::setCleanupInterval(max);
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
320✔
1103

1104
/* Forwards the maximum idle time to the outgoing DoH connections manager;
   does nothing when built without DoH or nghttp2 support. */
void setDoHDownstreamMaxIdleTime(uint16_t max)
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  DownstreamDoHConnectionsManager::setMaxIdleTime(max);
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
320✔
1110

1111
/* Forwards the maximum number of idle connections per backend to the outgoing DoH
   connections manager; does nothing when built without DoH or nghttp2 support. */
void setDoHDownstreamMaxIdleConnectionsPerBackend(size_t max)
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  DownstreamDoHConnectionsManager::setMaxIdleConnectionsPerDownstream(max);
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
}
320✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc