• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PowerDNS / pdns / 19741624072

27 Nov 2025 03:45PM UTC coverage: 73.086% (+0.02%) from 73.065%
19741624072

Pull #16570

github

web-flow
Merge 08a2cdb1d into f94a3f63f
Pull Request #16570: rec: rewrite all unwrap calls in web.rs

38523 of 63408 branches covered (60.75%)

Branch coverage included in aggregate %.

128044 of 164496 relevant lines covered (77.84%)

6531485.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.4
/pdns/dnsdistdist/dnsdist-nghttp2.cc
1
/*
2
 * This file is part of PowerDNS or dnsdist.
3
 * Copyright -- PowerDNS.COM B.V. and its contributors
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of version 2 of the GNU General Public License as
7
 * published by the Free Software Foundation.
8
 *
9
 * In addition, for the avoidance of any doubt, permission is granted to
10
 * link this program with OpenSSL and to (re)distribute the binaries
11
 * produced as the result of such linking.
12
 *
13
 * This program is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
 * GNU General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU General Public License
19
 * along with this program; if not, write to the Free Software
20
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
 */
22

23
#include "config.h"
24

25
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
26
#include <nghttp2/nghttp2.h>
27
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
28

29
#include "dnsdist-nghttp2.hh"
30
#include "dnsdist-nghttp2-in.hh"
31
#include "dnsdist-tcp.hh"
32
#include "dnsdist-tcp-downstream.hh"
33
#include "dnsdist-downstream-connection.hh"
34

35
#include "dolog.hh"
36
#include "channel.hh"
37
#include "iputils.hh"
38
#include "libssl.hh"
39
#include "noinitvector.hh"
40
#include "tcpiohandler.hh"
41
#include "threadname.hh"
42
#include "sstuff.hh"
43

44
std::atomic<uint64_t> g_dohStatesDumpRequested{0};
45
std::unique_ptr<DoHClientCollection> g_dohClientThreads{nullptr};
46

47
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
48
class DoHConnectionToBackend : public ConnectionToBackend
49
{
50
public:
51
  using StreamID = int32_t;
52

53
  DoHConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload);
54

55
  void handleTimeout(const struct timeval& now, bool write) override;
56
  void queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query) override;
57

58
  std::string toString() const override
59
  {
×
60
    ostringstream o;
×
61
    o << "DoH connection to backend " << (d_ds ? d_ds->getName() : "empty") << " over FD " << (d_handler ? std::to_string(d_handler->getDescriptor()) : "no socket") << ", " << getConcurrentStreamsCount() << " streams";
×
62
    return o.str();
×
63
  }
×
64

65
  void setHealthCheck(bool h)
66
  {
14✔
67
    d_healthCheckQuery = h;
14✔
68
  }
14✔
69

70
  void stopIO() override;
71
  bool reachedMaxConcurrentQueries() const override;
72
  bool reachedMaxStreamID() const override;
73
  bool isIdle() const override;
74
  void release(bool removeFromCache) override
75
  {
×
76
    (void)removeFromCache;
×
77
  }
×
78

79
private:
80
  static ssize_t send_callback(nghttp2_session* session, const uint8_t* data, size_t length, int flags, void* user_data);
81
  static int on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data);
82
  static int on_data_chunk_recv_callback(nghttp2_session* session, uint8_t flags, StreamID stream_id, const uint8_t* data, size_t len, void* user_data);
83
  static int on_stream_close_callback(nghttp2_session* session, StreamID stream_id, uint32_t error_code, void* user_data);
84
  static int on_header_callback(nghttp2_session* session, const nghttp2_frame* frame, const uint8_t* name, size_t namelen, const uint8_t* value, size_t valuelen, uint8_t flags, void* user_data);
85
  static int on_error_callback(nghttp2_session* session, int lib_error_code, const char* msg, size_t len, void* user_data);
86
  static void handleReadableIOCallback(int fd, FDMultiplexer::funcparam_t& param);
87
  static void handleWritableIOCallback(int fd, FDMultiplexer::funcparam_t& param);
88

89
  class PendingRequest
90
  {
91
  public:
92
    std::shared_ptr<TCPQuerySender> d_sender{nullptr};
93
    TCPQuery d_query;
94
    PacketBuffer d_buffer;
95
    size_t d_queryPos{0};
96
    uint16_t d_responseCode{0};
97
    bool d_finished{false};
98
  };
99
  void updateIO(IOState newState, const FDMultiplexer::callbackfunc_t& callback, bool noTTD = false);
100
  void watchForRemoteHostClosingConnection();
101
  void handleResponse(PendingRequest&& request);
102
  void handleResponseError(PendingRequest&& request, const struct timeval& now);
103
  void handleIOError();
104
  uint32_t getConcurrentStreamsCount() const;
105

106
  size_t getUsageCount() const
107
  {
×
108
    auto ref = shared_from_this();
×
109
    return ref.use_count();
×
110
  }
×
111

112
  static const std::unordered_map<std::string, std::string> s_constants;
113

114
  std::unordered_map<StreamID, PendingRequest> d_currentStreams;
115
  std::string d_proxyProtocolPayload;
116
  PacketBuffer d_out;
117
  PacketBuffer d_in;
118
  std::unique_ptr<nghttp2_session, void (*)(nghttp2_session*)> d_session{nullptr, nghttp2_session_del};
119
  size_t d_outPos{0};
120
  size_t d_inPos{0};
121
  bool d_healthCheckQuery{false};
122
  bool d_firstWrite{true};
123
  bool d_inIOCallback{false};
124
};
125

126
using DownstreamDoHConnectionsManager = DownstreamConnectionsManager<DoHConnectionToBackend>;
127
thread_local DownstreamDoHConnectionsManager t_downstreamDoHConnectionsManager;
128

129
uint32_t DoHConnectionToBackend::getConcurrentStreamsCount() const
130
{
1,761✔
131
  return d_currentStreams.size();
1,761✔
132
}
1,761✔
133

134
void DoHConnectionToBackend::handleResponse(PendingRequest&& request)
135
{
137✔
136
  struct timeval now{
137✔
137
    .tv_sec = 0, .tv_usec = 0};
137✔
138

139
  gettimeofday(&now, nullptr);
137✔
140
  try {
137✔
141
    if (!d_healthCheckQuery) {
137✔
142
      const double udiff = request.d_query.d_idstate.queryRealTime.udiff();
123✔
143
      d_ds->updateTCPLatency(udiff);
123✔
144
      if (request.d_buffer.size() >= sizeof(dnsheader)) {
123✔
145
        dnsheader dh;
117✔
146
        memcpy(&dh, request.d_buffer.data(), sizeof(dh));
117✔
147
        d_ds->reportResponse(dh.rcode);
117✔
148
      }
117✔
149
      else {
6✔
150
        d_ds->reportTimeoutOrError();
6✔
151
      }
6✔
152
    }
123✔
153

154
    TCPResponse response(std::move(request.d_query));
137✔
155
    response.d_buffer = std::move(request.d_buffer);
137✔
156
    response.d_connection = shared_from_this();
137✔
157
    response.d_ds = d_ds;
137✔
158
    request.d_sender->handleResponse(now, std::move(response));
137✔
159
  }
137✔
160
  catch (const std::exception& e) {
137✔
161
    vinfolog("Got exception while handling response for cross-protocol DoH: %s", e.what());
4✔
162
  }
4✔
163
}
137✔
164

165
void DoHConnectionToBackend::handleResponseError(PendingRequest&& request, const struct timeval& now)
166
{
36✔
167
  try {
36✔
168
    if (!d_healthCheckQuery) {
36!
169
      d_ds->reportTimeoutOrError();
36✔
170
    }
36✔
171

172
    TCPResponse response(PacketBuffer(), std::move(request.d_query.d_idstate), nullptr, nullptr);
36✔
173
    request.d_sender->notifyIOError(now, std::move(response));
36✔
174
  }
36✔
175
  catch (const std::exception& e) {
36✔
176
    vinfolog("Got exception while handling response for cross-protocol DoH: %s", e.what());
×
177
  }
×
178
}
36✔
179

180
void DoHConnectionToBackend::handleIOError()
181
{
29✔
182
  d_connectionDied = true;
29✔
183
  nghttp2_session_terminate_session(d_session.get(), NGHTTP2_PROTOCOL_ERROR);
29✔
184

185
  struct timeval now{
29✔
186
    .tv_sec = 0, .tv_usec = 0};
29✔
187

188
  const auto& chains = dnsdist::configuration::getCurrentRuntimeConfiguration().d_ruleChains;
29✔
189
  const auto& timeoutRespRules = dnsdist::rules::getResponseRuleChain(chains, dnsdist::rules::ResponseRuleChain::TimeoutResponseRules);
29✔
190

191
  gettimeofday(&now, nullptr);
29✔
192
  for (auto& request : d_currentStreams) {
37✔
193
    if (!d_healthCheckQuery && handleTimeoutResponseRules(timeoutRespRules, request.second.d_query.d_idstate, d_ds, request.second.d_sender)) {
32!
194
      d_ds->reportTimeoutOrError();
6✔
195
    }
6✔
196
    else {
26✔
197
      handleResponseError(std::move(request.second), now);
26✔
198
    }
26✔
199
  }
32✔
200

201
  d_currentStreams.clear();
29✔
202
  stopIO();
29✔
203
}
29✔
204

205
void DoHConnectionToBackend::handleTimeout(const struct timeval& now, bool write)
206
{
17✔
207
  (void)now;
17✔
208
  if (write) {
17✔
209
    if (d_firstWrite) {
2!
210
      ++d_ds->tcpConnectTimeouts;
×
211
    }
×
212
    else {
2✔
213
      ++d_ds->tcpWriteTimeouts;
2✔
214
    }
2✔
215
  }
2✔
216
  else {
15✔
217
    ++d_ds->tcpReadTimeouts;
15✔
218
  }
15✔
219

220
  handleIOError();
17✔
221
}
17✔
222

223
bool DoHConnectionToBackend::reachedMaxStreamID() const
224
{
693✔
225
  const uint32_t maximumStreamID = (static_cast<uint32_t>(1) << 31) - 1;
693✔
226
  return d_highestStreamID == maximumStreamID;
693✔
227
}
693✔
228

229
bool DoHConnectionToBackend::reachedMaxConcurrentQueries() const
230
{
104✔
231
  // cerr<<"Got "<<getConcurrentStreamsCount()<<" concurrent streams, max is "<<nghttp2_session_get_remote_settings(d_session.get(), NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS)<<endl;
232
  if (nghttp2_session_get_remote_settings(d_session.get(), NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS) <= getConcurrentStreamsCount()) {
104!
233
    return true;
×
234
  }
×
235
  return false;
104✔
236
}
104✔
237

238
bool DoHConnectionToBackend::isIdle() const
239
{
1,573✔
240
  return getConcurrentStreamsCount() == 0;
1,573✔
241
}
1,573✔
242

243
void DoHConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query)
244
{
183✔
245
  auto payloadSize = std::to_string(query.d_buffer.size());
183✔
246

247
  bool addXForwarded = d_ds->d_config.d_addXForwardedHeaders;
183✔
248

249
  /* We use nghttp2_nv_flag.NGHTTP2_NV_FLAG_NO_COPY_NAME and nghttp2_nv_flag.NGHTTP2_NV_FLAG_NO_COPY_VALUE
250
     to avoid a copy and lowercasing but we need to make sure that the data will outlive the request (nghttp2_on_frame_send_callback called), and that it is already lowercased. */
251
  std::vector<nghttp2_nv> headers;
183✔
252
  // these need to live until after the request headers have been processed
253
  std::string remote;
183✔
254
  std::string remotePort;
183✔
255
  headers.reserve(8 + (addXForwarded ? 3 : 0));
183✔
256

257
  /* Pseudo-headers need to come first (rfc7540 8.1.2.1) */
258
  NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::METHOD_NAME, NGHTTP2Headers::HeaderConstantIndexes::METHOD_VALUE);
183✔
259
  NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::SCHEME_NAME, NGHTTP2Headers::HeaderConstantIndexes::SCHEME_VALUE);
183✔
260
  NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::AUTHORITY_NAME, d_ds->d_config.d_tlsSubjectName);
183✔
261
  NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::PATH_NAME, d_ds->d_config.d_dohPath);
183✔
262
  NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::ACCEPT_NAME, NGHTTP2Headers::HeaderConstantIndexes::ACCEPT_VALUE);
183✔
263
  NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::CONTENT_TYPE_NAME, NGHTTP2Headers::HeaderConstantIndexes::CONTENT_TYPE_VALUE);
183✔
264
  NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::USER_AGENT_NAME, NGHTTP2Headers::HeaderConstantIndexes::USER_AGENT_VALUE);
183✔
265
  NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::CONTENT_LENGTH_NAME, payloadSize);
183✔
266
  /* no need to add these headers for health-check queries */
267
  if (addXForwarded && query.d_idstate.origRemote.getPort() != 0) {
183✔
268
    remote = query.d_idstate.origRemote.toString();
2✔
269
    remotePort = std::to_string(query.d_idstate.origRemote.getPort());
2✔
270
    NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_FOR_NAME, remote);
2✔
271
    NGHTTP2Headers::addDynamicHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PORT_NAME, remotePort);
2✔
272
    if (query.d_idstate.cs != nullptr) {
2!
273
      if (query.d_idstate.cs->isUDP()) {
2✔
274
        NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_UDP);
1✔
275
      }
1✔
276
      else if (query.d_idstate.cs->isDoH()) {
1!
277
        if (query.d_idstate.cs->hasTLS()) {
×
278
          NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_HTTPS);
×
279
        }
×
280
        else {
×
281
          NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_HTTP);
×
282
        }
×
283
      }
×
284
      else if (query.d_idstate.cs->hasTLS()) {
1!
285
        NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_TLS);
×
286
      }
×
287
      else {
1✔
288
        NGHTTP2Headers::addStaticHeader(headers, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_NAME, NGHTTP2Headers::HeaderConstantIndexes::X_FORWARDED_PROTO_VALUE_DNS_OVER_TCP);
1✔
289
      }
1✔
290
    }
2✔
291
  }
2✔
292

293
  PendingRequest pending;
183✔
294
  pending.d_query = std::move(query);
183✔
295
  pending.d_sender = std::move(sender);
183✔
296

297
  uint32_t tentativeStreamId = nghttp2_session_get_next_stream_id(d_session.get());
183✔
298
  if (tentativeStreamId == static_cast<uint32_t>(1 << 31)) {
183!
299
    /* running out of stream IDs */
300
    d_connectionDied = true;
×
301
    nghttp2_session_terminate_session(d_session.get(), NGHTTP2_NO_ERROR);
×
302
    throw std::runtime_error("No more stream IDs");
×
303
  }
×
304

305
  auto streamId = static_cast<StreamID>(tentativeStreamId);
183✔
306
  auto insertPair = d_currentStreams.insert({streamId, std::move(pending)});
183✔
307
  if (!insertPair.second) {
183!
308
    /* there is a stream ID collision, something is very wrong! */
309
    d_connectionDied = true;
×
310
    nghttp2_session_terminate_session(d_session.get(), NGHTTP2_NO_ERROR);
×
311
    throw std::runtime_error("Stream ID collision");
×
312
  }
×
313

314
  /* if data_prd is not NULL, it provides data which will be sent in subsequent DATA frames. In this case, a method that allows request message bodies (https://tools.ietf.org/html/rfc7231#section-4) must be specified with :method key (e.g. POST). This function does not take ownership of the data_prd. The function copies the members of the data_prd. If data_prd is NULL, HEADERS have END_STREAM set.
315
   */
316
  nghttp2_data_provider data_provider;
183✔
317

318
  data_provider.source.ptr = this;
183✔
319
  data_provider.read_callback = [](nghttp2_session* session, StreamID stream_id, uint8_t* buf, size_t length, uint32_t* data_flags, nghttp2_data_source* source, void* user_data) -> ssize_t {
183✔
320
    (void)session;
181✔
321
    (void)source;
181✔
322
    auto* conn = static_cast<DoHConnectionToBackend*>(user_data);
181✔
323
    auto& request = conn->d_currentStreams.at(stream_id);
181✔
324
    size_t toCopy = 0;
181✔
325
    if (request.d_queryPos < request.d_query.d_buffer.size()) {
181!
326
      size_t remaining = request.d_query.d_buffer.size() - request.d_queryPos;
181✔
327
      toCopy = length > remaining ? remaining : length;
181!
328
      memcpy(buf, &request.d_query.d_buffer.at(request.d_queryPos), toCopy);
181✔
329
      request.d_queryPos += toCopy;
181✔
330
    }
181✔
331

332
    if (request.d_queryPos >= request.d_query.d_buffer.size()) {
181!
333
      *data_flags |= NGHTTP2_DATA_FLAG_EOF;
181✔
334
    }
181✔
335
    return toCopy;
181✔
336
  };
181✔
337

338
  auto newStreamId = nghttp2_submit_request(d_session.get(), nullptr, headers.data(), headers.size(), &data_provider, this);
183✔
339
  if (newStreamId < 0) {
183!
340
    d_connectionDied = true;
×
341
    ++d_ds->tcpDiedSendingQuery;
×
342
    d_currentStreams.erase(streamId);
×
343
    throw std::runtime_error("Error submitting HTTP request:" + std::string(nghttp2_strerror(newStreamId)));
×
344
  }
×
345

346
  if (!d_inIOCallback) {
183!
347
    auto rtv = nghttp2_session_send(d_session.get());
183✔
348
    if (rtv != 0) {
183!
349
      d_connectionDied = true;
×
350
      ++d_ds->tcpDiedSendingQuery;
×
351
      d_currentStreams.erase(streamId);
×
352
      throw std::runtime_error("Error in nghttp2_session_send: " + std::to_string(rtv));
×
353
    }
×
354
  }
183✔
355

356
  d_highestStreamID = newStreamId;
183✔
357
}
183✔
358

359
class DoHClientThreadData
360
{
361
public:
362
  DoHClientThreadData(pdns::channel::Receiver<CrossProtocolQuery>&& receiver) :
363
    mplexer(std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent())),
400✔
364
    d_receiver(std::move(receiver))
400✔
365
  {
400✔
366
  }
400✔
367

368
  std::unique_ptr<FDMultiplexer> mplexer{nullptr};
369
  pdns::channel::Receiver<CrossProtocolQuery> d_receiver;
370
};
371

372
void DoHConnectionToBackend::handleReadableIOCallback(int fd, FDMultiplexer::funcparam_t& param)
373
{
286✔
374
  auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(param);
286✔
375
  if (fd != conn->getHandle()) {
286!
376
    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->getHandle()));
×
377
  }
×
378

379
  if (conn->d_inIOCallback) {
286!
380
    return;
×
381
  }
×
382
  dnsdist::tcp::HandlingIOGuard handlingIOGuard(conn->d_inIOCallback);
286✔
383
  IOStateGuard ioGuard(conn->d_ioState);
286✔
384
  do {
370✔
385
    conn->d_inPos = 0;
370✔
386
    conn->d_in.resize(conn->d_in.size() + 512);
370✔
387
    // cerr<<"trying to read "<<conn->d_in.size()<<endl;
388
    try {
370✔
389
      IOState newState = conn->d_handler->tryRead(conn->d_in, conn->d_inPos, conn->d_in.size(), true);
370✔
390
      // cerr<<"got a "<<(int)newState<<" state and "<<conn->d_inPos<<" bytes"<<endl;
391
      conn->d_in.resize(conn->d_inPos);
370✔
392

393
      if (conn->d_inPos > 0) {
370✔
394
        /* we got something */
395
        auto readlen = nghttp2_session_mem_recv(conn->d_session.get(), conn->d_in.data(), conn->d_inPos);
233✔
396
        // cerr<<"nghttp2_session_mem_recv returned "<<readlen<<endl;
397
        /* as long as we don't require a pause by returning nghttp2_error.NGHTTP2_ERR_PAUSE from a CB,
398
           all data should be consumed before returning */
399
        if (readlen > 0 && static_cast<size_t>(readlen) < conn->d_inPos) {
233!
400
          throw std::runtime_error("Fatal error while passing received data to nghttp2: " + std::string(nghttp2_strerror((int)readlen)));
×
401
        }
×
402

403
        struct timeval now{
233✔
404
          .tv_sec = 0, .tv_usec = 0};
233✔
405

406
        gettimeofday(&now, nullptr);
233✔
407
        conn->d_lastDataReceivedTime = now;
233✔
408

409
        // cerr<<"after read send"<<endl;
410
        nghttp2_session_send(conn->d_session.get());
233✔
411
      }
233✔
412

413
      if (newState == IOState::Done) {
370✔
414
        if (conn->isIdle()) {
231✔
415
          conn->stopIO();
147✔
416
          conn->watchForRemoteHostClosingConnection();
147✔
417
          ioGuard.release();
147✔
418
          break;
147✔
419
        }
147✔
420
      }
231✔
421
      else {
139✔
422
        if (newState == IOState::NeedWrite) {
139!
423
          // cerr<<"need write"<<endl;
424
          conn->updateIO(IOState::NeedWrite, handleReadableIOCallback);
×
425
        }
×
426
        ioGuard.release();
139✔
427
        break;
139✔
428
      }
139✔
429
    }
370✔
430
    catch (const std::exception& e) {
370✔
431
      vinfolog("Exception while trying to read from HTTP backend connection: %s", e.what());
6✔
432
      ++conn->d_ds->tcpDiedReadingResponse;
6✔
433
      conn->handleIOError();
6✔
434
      break;
6✔
435
    }
6✔
436
  } while (conn->getConcurrentStreamsCount() > 0);
370!
437
}
286✔
438

439
void DoHConnectionToBackend::handleWritableIOCallback(int fd, FDMultiplexer::funcparam_t& param)
440
{
47✔
441
  auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(param);
47✔
442
  if (fd != conn->getHandle()) {
47!
443
    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->getHandle()));
×
444
  }
×
445
  IOStateGuard ioGuard(conn->d_ioState);
47✔
446

447
  // cerr<<"in "<<__PRETTY_FUNCTION__<<" trying to write "<<conn->d_out.size()-conn->d_outPos<<endl;
448
  try {
47✔
449
    IOState newState = conn->d_handler->tryWrite(conn->d_out, conn->d_outPos, conn->d_out.size());
47✔
450
    // cerr<<"got a "<<(int)newState<<" state, "<<conn->d_out.size()-conn->d_outPos<<" bytes remaining"<<endl;
451
    if (newState == IOState::NeedRead) {
47!
452
      conn->updateIO(IOState::NeedRead, handleWritableIOCallback);
×
453
    }
×
454
    else if (newState == IOState::Done) {
47✔
455
      // cerr<<"done, buffer size was "<<conn->d_out.size()<<", pos was "<<conn->d_outPos<<endl;
456
      conn->d_firstWrite = false;
43✔
457
      conn->d_out.clear();
43✔
458
      conn->d_outPos = 0;
43✔
459
      conn->stopIO();
43✔
460
      if (!conn->isIdle()) {
43!
461
        conn->updateIO(IOState::NeedRead, handleReadableIOCallback);
43✔
462
      }
43✔
463
      else {
×
464
        conn->watchForRemoteHostClosingConnection();
×
465
      }
×
466
    }
43✔
467
    ioGuard.release();
47✔
468
  }
47✔
469
  catch (const std::exception& e) {
47✔
470
    vinfolog("Exception while trying to write (ready) to HTTP backend connection: %s", e.what());
4!
471
    ++conn->d_ds->tcpDiedSendingQuery;
4✔
472
    conn->handleIOError();
4✔
473
  }
4✔
474
}
47✔
475

476
void DoHConnectionToBackend::stopIO()
477
{
757✔
478
  d_ioState->reset();
757✔
479

480
  if (isIdle()) {
757✔
481
    auto shared = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this());
343✔
482
    if (!willBeReusable(false)) {
343✔
483
      /* remove ourselves from the connection cache, this might mean that our
484
         reference count drops to zero after that, so we need to be careful */
485
      t_downstreamDoHConnectionsManager.removeDownstreamConnection(shared);
61✔
486
    }
61✔
487
    else {
282✔
488
      t_downstreamDoHConnectionsManager.moveToIdle(shared);
282✔
489
    }
282✔
490
  }
343✔
491
}
757✔
492

493
void DoHConnectionToBackend::updateIO(IOState newState, const FDMultiplexer::callbackfunc_t& callback, bool noTTD)
494
{
709✔
495
  struct timeval now{
709✔
496
    .tv_sec = 0, .tv_usec = 0};
709✔
497

498
  gettimeofday(&now, nullptr);
709✔
499
  boost::optional<struct timeval> ttd{boost::none};
709✔
500
  if (!noTTD) {
709!
501
    if (d_healthCheckQuery) {
709✔
502
      ttd = getBackendHealthCheckTTD(now);
42✔
503
    }
42✔
504
    else if (newState == IOState::NeedRead) {
667✔
505
      ttd = getBackendReadTTD(now);
663✔
506
    }
663✔
507
    else if (isFresh() && d_firstWrite) {
4!
508
      /* first write just after the non-blocking connect */
509
      ttd = getBackendConnectTTD(now);
×
510
    }
×
511
    else {
4✔
512
      ttd = getBackendWriteTTD(now);
4✔
513
    }
4✔
514
  }
709✔
515

516
  auto shared = std::dynamic_pointer_cast<DoHConnectionToBackend>(shared_from_this());
709✔
517
  if (shared) {
709!
518
    if (newState == IOState::NeedRead) {
709✔
519
      d_ioState->update(newState, callback, std::move(shared), ttd);
705✔
520
    }
705✔
521
    else if (newState == IOState::NeedWrite) {
4!
522
      d_ioState->update(newState, callback, std::move(shared), ttd);
4✔
523
    }
4✔
524
  }
709✔
525
}
709✔
526

527
void DoHConnectionToBackend::watchForRemoteHostClosingConnection()
528
{
306✔
529
  if (willBeReusable(false) && !d_healthCheckQuery) {
306✔
530
    updateIO(IOState::NeedRead, handleReadableIOCallback, false);
246✔
531
  }
246✔
532
}
306✔
533

534
ssize_t DoHConnectionToBackend::send_callback(nghttp2_session* session, const uint8_t* data, size_t length, int flags, void* user_data)
535
{
587✔
536
  (void)session;
587✔
537
  (void)flags;
587✔
538
  DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);
587✔
539
  bool bufferWasEmpty = conn->d_out.empty();
587✔
540
  if (!conn->d_proxyProtocolPayloadSent && !conn->d_proxyProtocolPayload.empty()) {
587✔
541
    conn->d_out.insert(conn->d_out.end(), conn->d_proxyProtocolPayload.begin(), conn->d_proxyProtocolPayload.end());
6✔
542
    conn->d_proxyProtocolPayloadSent = true;
6✔
543
  }
6✔
544

545
  conn->d_out.insert(conn->d_out.end(), data, data + length);
587✔
546

547
  if (bufferWasEmpty) {
587✔
548
    try {
442✔
549
      // cerr<<"in "<<__PRETTY_FUNCTION__<<" trying to write "<<conn->d_out.size()-conn->d_outPos<<endl;
550
      auto state = conn->d_handler->tryWrite(conn->d_out, conn->d_outPos, conn->d_out.size());
442✔
551
      // cerr<<"got a "<<(int)state<<" state, "<<conn->d_out.size()-conn->d_outPos<<" bytes remaining"<<endl;
552
      if (state == IOState::Done) {
442✔
553
        conn->d_firstWrite = false;
391✔
554
        conn->d_out.clear();
391✔
555
        conn->d_outPos = 0;
391✔
556
        conn->stopIO();
391✔
557
        if (!conn->isIdle()) {
391✔
558
          conn->updateIO(IOState::NeedRead, handleReadableIOCallback);
371✔
559
        }
371✔
560
        else {
20✔
561
          conn->watchForRemoteHostClosingConnection();
20✔
562
        }
20✔
563
      }
391✔
564
      else {
51✔
565
        conn->updateIO(state, handleWritableIOCallback);
51✔
566
      }
51✔
567
    }
442✔
568
    catch (const std::exception& e) {
442✔
569
      vinfolog("Exception while trying to write (send) to HTTP backend connection: %s", e.what());
2!
570
      conn->handleIOError();
2✔
571
      ++conn->d_ds->tcpDiedSendingQuery;
2✔
572
    }
2✔
573
  }
442✔
574

575
  return length;
587✔
576
}
587✔
577

578
int DoHConnectionToBackend::on_frame_recv_callback(nghttp2_session* session, const nghttp2_frame* frame, void* user_data)
579
{
432✔
580
  (void)session;
432✔
581
  DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);
432✔
582
  // cerr<<"Frame type is "<<std::to_string(frame->hd.type)<<endl;
583
#if 0
584
  switch (frame->hd.type) {
585
  case NGHTTP2_HEADERS:
586
    cerr<<"got headers"<<endl;
587
    if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE) {
588
      cerr<<"All headers received"<<endl;
589
    }
590
    break;
591
  case NGHTTP2_WINDOW_UPDATE:
592
    cerr<<"got window update"<<endl;
593
    break;
594
  case NGHTTP2_SETTINGS:
595
    cerr<<"got settings"<<endl;
596
    cerr<<frame->settings.niv<<endl;
597
    for (size_t idx = 0; idx < frame->settings.niv; idx++) {
598
      cerr<<"- "<<frame->settings.iv[idx].settings_id<<" "<<frame->settings.iv[idx].value<<endl;
599
    }
600
    break;
601
  case NGHTTP2_DATA:
602
    cerr<<"got data"<<endl;
603
    break;
604
  }
605
#endif
606

607
  if (frame->hd.type == NGHTTP2_GOAWAY) {
432✔
608
    conn->d_connectionDied = true;
12✔
609
  }
12✔
610

611
  /* is this the last frame for this stream? */
612
  else if ((frame->hd.type == NGHTTP2_HEADERS || frame->hd.type == NGHTTP2_DATA) && frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
420✔
613
    auto stream = conn->d_currentStreams.find(frame->hd.stream_id);
143✔
614
    if (stream != conn->d_currentStreams.end()) {
143!
615
      // cerr<<"Stream "<<frame->hd.stream_id<<" is now finished"<<endl;
616
      stream->second.d_finished = true;
143✔
617
      ++conn->d_queries;
143✔
618

619
      auto request = std::move(stream->second);
143✔
620
      conn->d_currentStreams.erase(stream->first);
143✔
621
      if (request.d_responseCode == 200U) {
143✔
622
        conn->handleResponse(std::move(request));
137✔
623
      }
137✔
624
      else {
6✔
625
        vinfolog("HTTP response has a non-200 status code: %d", request.d_responseCode);
6✔
626
        struct timeval now{
6✔
627
          .tv_sec = 0, .tv_usec = 0};
6✔
628

629
        gettimeofday(&now, nullptr);
6✔
630

631
        conn->handleResponseError(std::move(request), now);
6✔
632
      }
6✔
633

634
      if (conn->isIdle()) {
143✔
635
        conn->stopIO();
135✔
636
        conn->watchForRemoteHostClosingConnection();
135✔
637
      }
135✔
638
    }
143✔
639
    else {
×
640
      vinfolog("Stream %d NOT FOUND", frame->hd.stream_id);
×
641
      conn->d_connectionDied = true;
×
642
      ++conn->d_ds->tcpDiedReadingResponse;
×
643
      return NGHTTP2_ERR_CALLBACK_FAILURE;
×
644
    }
×
645
  }
143✔
646

647
  return 0;
432✔
648
}
432✔
649

650
int DoHConnectionToBackend::on_data_chunk_recv_callback(nghttp2_session* session, uint8_t flags, StreamID stream_id, const uint8_t* data, size_t len, void* user_data)
651
{
141✔
652
  (void)session;
141✔
653
  (void)flags;
141✔
654
  DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);
141✔
655
  // cerr<<"Got data of size "<<len<<" for stream "<<stream_id<<endl;
656
  auto stream = conn->d_currentStreams.find(stream_id);
141✔
657
  if (stream == conn->d_currentStreams.end()) {
141!
658
    vinfolog("Unable to match the stream ID %d to a known one!", stream_id);
×
659
    conn->d_connectionDied = true;
×
660
    ++conn->d_ds->tcpDiedReadingResponse;
×
661
    return NGHTTP2_ERR_CALLBACK_FAILURE;
×
662
  }
×
663
  if (len > std::numeric_limits<uint16_t>::max() || (std::numeric_limits<uint16_t>::max() - stream->second.d_buffer.size()) < len) {
141!
664
    vinfolog("Data frame of size %d is too large for a DNS response (we already have %d)", len, stream->second.d_buffer.size());
×
665
    conn->d_connectionDied = true;
×
666
    ++conn->d_ds->tcpDiedReadingResponse;
×
667
    return NGHTTP2_ERR_CALLBACK_FAILURE;
×
668
  }
×
669

670
  stream->second.d_buffer.insert(stream->second.d_buffer.end(), data, data + len);
141✔
671
  if (stream->second.d_finished) {
141!
672
    // cerr<<"we now have the full response!"<<endl;
673
    // cerr<<std::string(reinterpret_cast<const char*>(data), len)<<endl;
674

675
    auto request = std::move(stream->second);
×
676
    conn->d_currentStreams.erase(stream->first);
×
677
    if (request.d_responseCode == 200U) {
×
678
      conn->handleResponse(std::move(request));
×
679
    }
×
680
    else {
×
681
      vinfolog("HTTP response has a non-200 status code: %d", request.d_responseCode);
×
682
      struct timeval now{
×
683
        .tv_sec = 0, .tv_usec = 0};
×
684

685
      gettimeofday(&now, nullptr);
×
686

687
      conn->handleResponseError(std::move(request), now);
×
688
    }
×
689
    if (conn->isIdle()) {
×
690
      conn->stopIO();
×
691
      conn->watchForRemoteHostClosingConnection();
×
692
    }
×
693
  }
×
694

695
  return 0;
141✔
696
}
141✔
697

698
int DoHConnectionToBackend::on_stream_close_callback(nghttp2_session* session, StreamID stream_id, uint32_t error_code, void* user_data)
699
{
151✔
700
  (void)session;
151✔
701
  DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);
151✔
702

703
  if (error_code == 0) {
151✔
704
    return 0;
143✔
705
  }
143✔
706

707
  // cerr << "Stream " << stream_id << " closed with error_code=" << error_code << endl;
708
  conn->d_connectionDied = true;
8✔
709
  ++conn->d_ds->tcpDiedReadingResponse;
8✔
710

711
  auto stream = conn->d_currentStreams.find(stream_id);
8✔
712
  if (stream == conn->d_currentStreams.end()) {
8!
713
    /* we don't care, then */
714
    return 0;
×
715
  }
×
716

717
  struct timeval now{
8✔
718
    .tv_sec = 0, .tv_usec = 0};
8✔
719

720
  gettimeofday(&now, nullptr);
8✔
721
  auto request = std::move(stream->second);
8✔
722
  conn->d_currentStreams.erase(stream->first);
8✔
723

724
  // cerr<<"Query has "<<request.d_query.d_downstreamFailures<<" failures, backend limit is "<<conn->d_ds->d_retries<<endl;
725
  if (request.d_query.d_downstreamFailures < conn->d_ds->d_config.d_retries) {
8✔
726
    // cerr<<"in "<<__PRETTY_FUNCTION__<<", looking for a connection to send a query of size "<<request.d_query.d_buffer.size()<<endl;
727
    ++request.d_query.d_downstreamFailures;
4✔
728
    auto downstream = t_downstreamDoHConnectionsManager.getConnectionToDownstream(conn->d_mplexer, conn->d_ds, now, std::string(conn->d_proxyProtocolPayload));
4✔
729
    downstream->queueQuery(request.d_sender, std::move(request.d_query));
4✔
730
  }
4✔
731
  else {
4✔
732
    conn->handleResponseError(std::move(request), now);
4✔
733
  }
4✔
734

735
  // cerr<<"we now have "<<conn->getConcurrentStreamsCount()<<" concurrent connections"<<endl;
736
  if (conn->isIdle()) {
8✔
737
    // cerr<<"stopping IO"<<endl;
738
    conn->stopIO();
4✔
739
    conn->watchForRemoteHostClosingConnection();
4✔
740
  }
4✔
741

742
  return 0;
8✔
743
}
8✔
744

745
int DoHConnectionToBackend::on_header_callback(nghttp2_session* session, const nghttp2_frame* frame, const uint8_t* name, size_t namelen, const uint8_t* value, size_t valuelen, uint8_t flags, void* user_data)
746
{
371✔
747
  (void)session;
371✔
748
  (void)flags;
371✔
749
  DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);
371✔
750

751
  const std::string status(":status");
371✔
752
  if (frame->hd.type == NGHTTP2_HEADERS && frame->headers.cat == NGHTTP2_HCAT_RESPONSE) {
371!
753
    // cerr<<"got header for "<<frame->hd.stream_id<<":"<<endl;
754
    // cerr<<"- "<<std::string(reinterpret_cast<const char*>(name), namelen)<<endl;
755
    // cerr<<"- "<<std::string(reinterpret_cast<const char*>(value), valuelen)<<endl;
756
    if (namelen == status.size() && memcmp(status.data(), name, status.size()) == 0) {
371!
757
      auto stream = conn->d_currentStreams.find(frame->hd.stream_id);
145✔
758
      if (stream == conn->d_currentStreams.end()) {
145!
759
        vinfolog("Unable to match the stream ID %d to a known one!", frame->hd.stream_id);
×
760
        conn->d_connectionDied = true;
×
761
        return NGHTTP2_ERR_CALLBACK_FAILURE;
×
762
      }
×
763
      try {
145✔
764
        pdns::checked_stoi_into(stream->second.d_responseCode, std::string(reinterpret_cast<const char*>(value), valuelen));
145✔
765
      }
145✔
766
      catch (...) {
145✔
767
        vinfolog("Error parsing the status header for stream ID %d", frame->hd.stream_id);
×
768
        conn->d_connectionDied = true;
×
769
        ++conn->d_ds->tcpDiedReadingResponse;
×
770
        return NGHTTP2_ERR_CALLBACK_FAILURE;
×
771
      }
×
772
    }
145✔
773
  }
371✔
774
  return 0;
371✔
775
}
371✔
776

777
int DoHConnectionToBackend::on_error_callback(nghttp2_session* session, int lib_error_code, const char* msg, size_t len, void* user_data)
778
{
×
779
  (void)session;
×
780
  vinfolog("Error in HTTP/2 connection: %s (%d)", std::string(msg, len), lib_error_code);
×
781

782
  DoHConnectionToBackend* conn = reinterpret_cast<DoHConnectionToBackend*>(user_data);
×
783
  conn->d_connectionDied = true;
×
784
  ++conn->d_ds->tcpDiedReadingResponse;
×
785

786
  return 0;
×
787
}
×
788

789
DoHConnectionToBackend::DoHConnectionToBackend(const std::shared_ptr<DownstreamState>& ds, std::unique_ptr<FDMultiplexer>& mplexer, const struct timeval& now, std::string&& proxyProtocolPayload) :
790
  ConnectionToBackend(ds, mplexer, now), d_proxyProtocolPayload(std::move(proxyProtocolPayload))
85✔
791
{
85✔
792
  // inherit most of the stuff from the ConnectionToBackend()
793
  d_ioState = make_unique<IOStateHandler>(*d_mplexer, d_handler->getDescriptor());
85✔
794

795
  nghttp2_session_callbacks* cbs = nullptr;
85✔
796
  if (nghttp2_session_callbacks_new(&cbs) != 0) {
85!
797
    d_connectionDied = true;
×
798
    ++d_ds->tcpDiedSendingQuery;
×
799
    vinfolog("Unable to create a callback object for a new HTTP/2 session");
×
800
    return;
×
801
  }
×
802
  std::unique_ptr<nghttp2_session_callbacks, void (*)(nghttp2_session_callbacks*)> callbacks(cbs, nghttp2_session_callbacks_del);
85✔
803
  cbs = nullptr;
85✔
804

805
  nghttp2_session_callbacks_set_send_callback(callbacks.get(), send_callback);
85✔
806
  nghttp2_session_callbacks_set_on_frame_recv_callback(callbacks.get(), on_frame_recv_callback);
85✔
807
  nghttp2_session_callbacks_set_on_data_chunk_recv_callback(callbacks.get(), on_data_chunk_recv_callback);
85✔
808
  nghttp2_session_callbacks_set_on_stream_close_callback(callbacks.get(), on_stream_close_callback);
85✔
809
  nghttp2_session_callbacks_set_on_header_callback(callbacks.get(), on_header_callback);
85✔
810
  nghttp2_session_callbacks_set_error_callback2(callbacks.get(), on_error_callback);
85✔
811

812
  nghttp2_session* sess = nullptr;
85✔
813
  if (nghttp2_session_client_new(&sess, callbacks.get(), this) != 0) {
85!
814
    d_connectionDied = true;
×
815
    ++d_ds->tcpDiedSendingQuery;
×
816
    vinfolog("Could not allocate a new HTTP/2 session");
×
817
    return;
×
818
  }
×
819

820
  d_session = std::unique_ptr<nghttp2_session, void (*)(nghttp2_session*)>(sess, nghttp2_session_del);
85✔
821
  sess = nullptr;
85✔
822

823
  callbacks.reset();
85✔
824

825
  nghttp2_settings_entry iv[] = {
85✔
826
    /* rfc7540 section-8.2.2:
827
       "Advertising a SETTINGS_MAX_CONCURRENT_STREAMS value of zero disables
828
       server push by preventing the server from creating the necessary
829
       streams."
830
    */
831
    {NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0},
85✔
832
    {NGHTTP2_SETTINGS_ENABLE_PUSH, 0},
85✔
833
    /* we might want to make the initial window size configurable, but 16M is a large enough default */
834
    {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 16 * 1024 * 1024}};
85✔
835
  /* client 24 bytes magic string will be sent by nghttp2 library */
836
  int rv = nghttp2_submit_settings(d_session.get(), NGHTTP2_FLAG_NONE, iv, sizeof(iv) / sizeof(*iv));
85✔
837
  if (rv != 0) {
85!
838
    d_connectionDied = true;
×
839
    ++d_ds->tcpDiedSendingQuery;
×
840
    vinfolog("Could not submit SETTINGS: %s", nghttp2_strerror(rv));
×
841
    return;
×
842
  }
×
843
}
85✔
844

845
static void handleCrossProtocolQuery(int pipefd, FDMultiplexer::funcparam_t& param)
846
{
113✔
847
  (void)pipefd;
113✔
848
  auto threadData = boost::any_cast<DoHClientThreadData*>(param);
113✔
849

850
  std::unique_ptr<CrossProtocolQuery> cpq{nullptr};
113✔
851
  try {
113✔
852
    auto tmp = threadData->d_receiver.receive();
113✔
853
    if (!tmp) {
113!
854
      return;
×
855
    }
×
856
    cpq = std::move(*tmp);
113✔
857
  }
113✔
858
  catch (const std::exception& e) {
113✔
859
    throw std::runtime_error("Error while reading from the DoH cross-protocol channel:" + std::string(e.what()));
×
860
  }
×
861

862
  struct timeval now{
113✔
863
    .tv_sec = 0, .tv_usec = 0};
113✔
864
  gettimeofday(&now, nullptr);
113✔
865

866
  std::shared_ptr<TCPQuerySender> tqs = cpq->getTCPQuerySender();
113✔
867
  auto query = std::move(cpq->query);
113✔
868
  auto downstreamServer = std::move(cpq->downstream);
113✔
869
  cpq.reset();
113✔
870

871
  try {
113✔
872
    auto downstream = t_downstreamDoHConnectionsManager.getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::move(query.d_proxyProtocolPayload));
113✔
873
    downstream->queueQuery(tqs, std::move(query));
113✔
874
  }
113✔
875
  catch (...) {
113✔
876
    TCPResponse response(std::move(query));
×
877
    tqs->notifyIOError(now, std::move(response));
×
878
  }
×
879
}
113✔
880

881
static void dohClientThread(pdns::channel::Receiver<CrossProtocolQuery>&& receiver)
882
{
400✔
883
  setThreadName("dnsdist/dohClie");
400✔
884

885
  try {
400✔
886
    DoHClientThreadData data(std::move(receiver));
400✔
887
    data.mplexer->addReadFD(data.d_receiver.getDescriptor(), handleCrossProtocolQuery, &data);
400✔
888

889
    struct timeval now{
400✔
890
      .tv_sec = 0, .tv_usec = 0};
400✔
891

892
    gettimeofday(&now, nullptr);
400✔
893
    time_t lastTimeoutScan = now.tv_sec;
400✔
894
    time_t lastConfigRefresh = now.tv_sec;
400✔
895

896
    for (;;) {
2,287✔
897
      data.mplexer->run(&now, 1000);
2,287✔
898

899
      if (now.tv_sec > lastConfigRefresh) {
2,287✔
900
        lastConfigRefresh = now.tv_sec;
1,251✔
901
        dnsdist::configuration::refreshLocalRuntimeConfiguration();
1,251✔
902
      }
1,251✔
903

904
      if (now.tv_sec > lastTimeoutScan) {
2,287✔
905
        lastTimeoutScan = now.tv_sec;
1,251✔
906

907
        try {
1,251✔
908
          t_downstreamDoHConnectionsManager.cleanupClosedConnections(now);
1,251✔
909
          handleH2Timeouts(*data.mplexer, now);
1,251✔
910

911
          if (g_dohStatesDumpRequested > 0) {
1,251!
912
            /* just to keep things clean in the output, debug only */
913
            static std::mutex s_lock;
×
914
            auto lock = std::scoped_lock(s_lock);
×
915
            if (g_dohStatesDumpRequested > 0) {
×
916
              /* no race here, we took the lock so it can only be increased in the meantime */
917
              --g_dohStatesDumpRequested;
×
918
              infolog("Dumping the DoH client states, as requested:");
×
919
              data.mplexer->runForAllWatchedFDs([](bool isRead, int fd, const FDMultiplexer::funcparam_t& param, struct timeval ttd) {
×
920
                struct timeval lnow;
×
921
                gettimeofday(&lnow, nullptr);
×
922
                if (ttd.tv_sec > 0) {
×
923
                  infolog("- Descriptor %d is in %s state, TTD in %d", fd, (isRead ? "read" : "write"), (ttd.tv_sec - lnow.tv_sec));
×
924
                }
×
925
                else {
×
926
                  infolog("- Descriptor %d is in %s state, no TTD set", fd, (isRead ? "read" : "write"));
×
927
                }
×
928

929
                if (param.type() == typeid(std::shared_ptr<DoHConnectionToBackend>)) {
×
930
                  auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(param);
×
931
                  infolog(" - %s", conn->toString());
×
932
                }
×
933
                else if (param.type() == typeid(DoHClientThreadData*)) {
×
934
                  infolog(" - Worker thread pipe");
×
935
                }
×
936
              });
×
937
              infolog("The DoH client cache has %d active and %d idle outgoing connections cached", t_downstreamDoHConnectionsManager.getActiveCount(), t_downstreamDoHConnectionsManager.getIdleCount());
×
938
            }
×
939
          }
×
940
        }
1,251✔
941
        catch (const std::exception& e) {
1,251✔
942
          warnlog("Error in outgoing DoH thread: %s", e.what());
×
943
        }
×
944
      }
1,251✔
945
    }
2,287✔
946
  }
400✔
947
  catch (const std::exception& e) {
400✔
948
    errlog("Fatal error in outgoing DoH thread: %s", e.what());
×
949
  }
×
950
}
400✔
951
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
952

953
struct DoHClientCollection::DoHWorkerThread
954
{
955
  DoHWorkerThread()
956
  {
400✔
957
  }
400✔
958

959
  DoHWorkerThread(pdns::channel::Sender<CrossProtocolQuery>&& sender) :
960
    d_sender(std::move(sender))
400✔
961
  {
400✔
962
  }
400✔
963

964
  DoHWorkerThread(DoHWorkerThread&& rhs) :
965
    d_sender(std::move(rhs.d_sender))
966
  {
×
967
  }
×
968

969
  DoHWorkerThread& operator=(DoHWorkerThread&& rhs)
970
  {
400✔
971
    d_sender = std::move(rhs.d_sender);
400✔
972
    return *this;
400✔
973
  }
400✔
974

975
  DoHWorkerThread(const DoHWorkerThread& rhs) = delete;
976
  DoHWorkerThread& operator=(const DoHWorkerThread&) = delete;
977

978
  pdns::channel::Sender<CrossProtocolQuery> d_sender;
979
};
980

981
DoHClientCollection::DoHClientCollection(size_t numberOfThreads) :
982
  d_clientThreads(numberOfThreads)
400✔
983
{
400✔
984
}
400✔
985

986
bool DoHClientCollection::passCrossProtocolQueryToThread(std::unique_ptr<CrossProtocolQuery>&& cpq)
987
{
113✔
988
  if (d_numberOfThreads == 0) {
113!
989
    throw std::runtime_error("No DoH worker thread yet");
×
990
  }
×
991

992
  uint64_t pos = d_pos++;
113✔
993
  if (!d_clientThreads.at(pos % d_numberOfThreads).d_sender.send(std::move(cpq))) {
113!
994
    ++dnsdist::metrics::g_stats.outgoingDoHQueryPipeFull;
×
995
    return false;
×
996
  }
×
997

998
  return true;
113✔
999
}
113✔
1000

1001
void DoHClientCollection::addThread()
1002
{
400✔
1003
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
400✔
1004
  try {
400✔
1005
    const auto internalPipeBufferSize = dnsdist::configuration::getImmutableConfiguration().d_tcpInternalPipeBufferSize;
400✔
1006
    auto [sender, receiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, internalPipeBufferSize);
400✔
1007

1008
    vinfolog("Adding DoH Client thread");
400✔
1009
    auto lock = std::scoped_lock(d_mutex);
400✔
1010

1011
    if (d_numberOfThreads >= d_clientThreads.size()) {
400!
1012
      vinfolog("Adding a new DoH client thread would exceed the vector size (%d/%d), skipping. Consider increasing the maximum amount of DoH client threads with setMaxDoHClientThreads() in the configuration.", d_numberOfThreads, d_clientThreads.size());
×
1013
      return;
×
1014
    }
×
1015

1016
    DoHWorkerThread worker(std::move(sender));
400✔
1017
    try {
400✔
1018
      std::thread t1(dohClientThread, std::move(receiver));
400✔
1019
      t1.detach();
400✔
1020
    }
400✔
1021
    catch (const std::runtime_error& e) {
400✔
1022
      /* the thread creation failed */
1023
      errlog("Error creating a DoH thread: %s", e.what());
×
1024
      return;
×
1025
    }
×
1026

1027
    d_clientThreads.at(d_numberOfThreads) = std::move(worker);
400✔
1028
    ++d_numberOfThreads;
400✔
1029
  }
400✔
1030
  catch (const std::exception& e) {
400✔
1031
    errlog("Error creating the DoH channel: %s", e.what());
×
1032
    return;
×
1033
  }
×
1034
#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
1035
  throw std::runtime_error("DoHClientCollection::addThread() called but nghttp2 support is not available");
1036
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
1037
}
400✔
1038

1039
bool initDoHWorkers()
1040
{
400✔
1041
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
400✔
1042
  auto outgoingDoHWorkerThreads = dnsdist::configuration::getImmutableConfiguration().d_outgoingDoHWorkers;
400✔
1043
  if (!outgoingDoHWorkerThreads) {
400✔
1044
    /* Unless the value has been set to 0 explicitly, always start at least one outgoing DoH worker thread, in case a DoH backend
1045
       is added at a later time. */
1046
    outgoingDoHWorkerThreads = 1;
387✔
1047
  }
387✔
1048

1049
  if (outgoingDoHWorkerThreads && *outgoingDoHWorkerThreads > 0) {
400!
1050
    g_dohClientThreads = std::make_unique<DoHClientCollection>(*outgoingDoHWorkerThreads);
400✔
1051
    for (size_t idx = 0; idx < *outgoingDoHWorkerThreads; idx++) {
800✔
1052
      g_dohClientThreads->addThread();
400✔
1053
    }
400✔
1054
  }
400✔
1055
  return true;
400✔
1056
#else
1057
  return false;
1058
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
1059
}
400✔
1060

1061
bool sendH2Query([[maybe_unused]] const std::shared_ptr<DownstreamState>& downstream, [[maybe_unused]] std::unique_ptr<FDMultiplexer>& mplexer, [[maybe_unused]] std::shared_ptr<TCPQuerySender>& sender, [[maybe_unused]] InternalQuery&& query, [[maybe_unused]] bool healthCheck)
1062
{
72✔
1063
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
72✔
1064
  struct timeval now{
72✔
1065
    .tv_sec = 0, .tv_usec = 0};
72✔
1066
  gettimeofday(&now, nullptr);
72✔
1067

1068
  if (healthCheck) {
72✔
1069
    /* always do health-checks over a new connection */
1070
    auto newConnection = std::make_shared<DoHConnectionToBackend>(downstream, mplexer, now, std::move(query.d_proxyProtocolPayload));
20✔
1071
    newConnection->setHealthCheck(healthCheck);
20✔
1072
    newConnection->queueQuery(sender, std::move(query));
20✔
1073
  }
20✔
1074
  else {
52✔
1075
    auto connection = t_downstreamDoHConnectionsManager.getConnectionToDownstream(mplexer, downstream, now, std::move(query.d_proxyProtocolPayload));
52✔
1076
    connection->queueQuery(sender, std::move(query));
52✔
1077
  }
52✔
1078

1079
  return true;
72✔
1080
#else /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
1081
  return false;
1082
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
1083
}
72✔
1084

1085
size_t clearH2Connections()
1086
{
48✔
1087
  size_t cleared = 0;
48✔
1088
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
48✔
1089
  cleared = t_downstreamDoHConnectionsManager.clear();
48✔
1090
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
48✔
1091
  return cleared;
48✔
1092
}
48✔
1093

1094
size_t handleH2Timeouts([[maybe_unused]] FDMultiplexer& mplexer, [[maybe_unused]] const struct timeval& now)
1095
{
1,323✔
1096
  size_t got = 0;
1,323✔
1097
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
1,323✔
1098
  auto expiredReadConns = mplexer.getTimeouts(now, false);
1,323✔
1099
  for (const auto& cbData : expiredReadConns) {
1,323✔
1100
    if (cbData.second.type() == typeid(std::shared_ptr<DoHConnectionToBackend>)) {
22✔
1101
      auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(cbData.second);
15✔
1102
      vinfolog("Timeout (read) from remote DoH backend %s", conn->getBackendName());
15✔
1103
      conn->handleTimeout(now, false);
15✔
1104
      ++got;
15✔
1105
    }
15✔
1106
  }
22✔
1107

1108
  auto expiredWriteConns = mplexer.getTimeouts(now, true);
1,323✔
1109
  for (const auto& cbData : expiredWriteConns) {
1,323✔
1110
    if (cbData.second.type() == typeid(std::shared_ptr<DoHConnectionToBackend>)) {
2!
1111
      auto conn = boost::any_cast<std::shared_ptr<DoHConnectionToBackend>>(cbData.second);
2✔
1112
      vinfolog("Timeout (write) from remote DoH backend %s", conn->getBackendName());
2!
1113
      conn->handleTimeout(now, true);
2✔
1114
      ++got;
2✔
1115
    }
2✔
1116
  }
2✔
1117
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
1,323✔
1118
  return got;
1,323✔
1119
}
1,323✔
1120

1121
void setDoHDownstreamCleanupInterval([[maybe_unused]] uint16_t max)
1122
{
400✔
1123
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
400✔
1124
  DownstreamDoHConnectionsManager::setCleanupInterval(max);
400✔
1125
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
400✔
1126
}
400✔
1127

1128
void setDoHDownstreamMaxIdleTime([[maybe_unused]] uint16_t max)
1129
{
400✔
1130
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
400✔
1131
  DownstreamDoHConnectionsManager::setMaxIdleTime(max);
400✔
1132
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
400✔
1133
}
400✔
1134

1135
void setDoHDownstreamMaxIdleConnectionsPerBackend([[maybe_unused]] size_t max)
1136
{
400✔
1137
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
400✔
1138
  DownstreamDoHConnectionsManager::setMaxIdleConnectionsPerDownstream(max);
400✔
1139
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
400✔
1140
}
400✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc