• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PowerDNS / pdns / 12595591960

03 Jan 2025 09:27AM UTC coverage: 62.774% (+2.5%) from 60.245%
12595591960

Pull #15008

github

web-flow
Merge c2a2749d3 into 788f396a7
Pull Request #15008: Do not follow CNAME records for ANY or CNAME queries

30393 of 78644 branches covered (38.65%)

Branch coverage included in aggregate %.

105822 of 138350 relevant lines covered (76.49%)

4613078.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.75
/pdns/dnsdistdist/doq.cc
1
/*
2
 * This file is part of PowerDNS or dnsdist.
3
 * Copyright -- PowerDNS.COM B.V. and its contributors
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of version 2 of the GNU General Public License as
7
 * published by the Free Software Foundation.
8
 *
9
 * In addition, for the avoidance of any doubt, permission is granted to
10
 * link this program with OpenSSL and to (re)distribute the binaries
11
 * produced as the result of such linking.
12
 *
13
 * This program is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
 * GNU General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU General Public License
19
 * along with this program; if not, write to the Free Software
20
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
 */
22

23
#include "doq.hh"
24

25
#ifdef HAVE_DNS_OVER_QUIC
26
#include <quiche.h>
27

28
#include "dolog.hh"
29
#include "iputils.hh"
30
#include "misc.hh"
31
#include "sstuff.hh"
32
#include "threadname.hh"
33

34
#include "dnsdist-dnsparser.hh"
35
#include "dnsdist-ecs.hh"
36
#include "dnsdist-proxy-protocol.hh"
37
#include "dnsdist-tcp.hh"
38
#include "dnsdist-random.hh"
39

40
#include "doq-common.hh"
41

42
using namespace dnsdist::doq;
43

44
#if 0
45
#define DEBUGLOG_ENABLED
46
#define DEBUGLOG(x) std::cerr << x << std::endl;
47
#else
48
#define DEBUGLOG(x)
49
#endif
50

51
class Connection
52
{
53
public:
54
  Connection(const ComboAddress& peer, const ComboAddress& localAddr, QuicheConfig config, QuicheConnection conn) :
55
    d_peer(peer), d_localAddr(localAddr), d_conn(std::move(conn)), d_config(std::move(config))
56
  {
62✔
57
  }
62✔
58
  Connection(const Connection&) = delete;
59
  Connection(Connection&&) = default;
62✔
60
  Connection& operator=(const Connection&) = delete;
61
  Connection& operator=(Connection&&) = default;
62
  ~Connection() = default;
115✔
63

64
  ComboAddress d_peer;
65
  ComboAddress d_localAddr;
66
  QuicheConnection d_conn;
67
  QuicheConfig d_config;
68

69
  std::unordered_map<uint64_t, PacketBuffer> d_streamBuffers;
70
  std::unordered_map<uint64_t, PacketBuffer> d_streamOutBuffers;
71
};
72

73
static void sendBackDOQUnit(DOQUnitUniquePtr&& unit, const char* description);
74

75
struct DOQServerConfig
76
{
77
  DOQServerConfig(QuicheConfig&& config_, uint32_t internalPipeBufferSize) :
78
    config(std::move(config_))
79
  {
12✔
80
    {
12✔
81
      auto [sender, receiver] = pdns::channel::createObjectQueue<DOQUnit>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, internalPipeBufferSize);
12✔
82
      d_responseSender = std::move(sender);
12✔
83
      d_responseReceiver = std::move(receiver);
12✔
84
    }
12✔
85
  }
12✔
86
  DOQServerConfig(const DOQServerConfig&) = delete;
87
  DOQServerConfig(DOQServerConfig&&) = default;
88
  DOQServerConfig& operator=(const DOQServerConfig&) = delete;
89
  DOQServerConfig& operator=(DOQServerConfig&&) = default;
90
  ~DOQServerConfig() = default;
×
91

92
  using ConnectionsMap = std::map<PacketBuffer, Connection>;
93

94
  ConnectionsMap d_connections;
95
  QuicheConfig config;
96
  ClientState* clientState{nullptr};
97
  std::shared_ptr<DOQFrontend> df{nullptr};
98
  pdns::channel::Sender<DOQUnit> d_responseSender;
99
  pdns::channel::Receiver<DOQUnit> d_responseReceiver;
100
};
101

102
/* these might seem useless, but they are needed because
103
   they need to be declared _after_ the definition of DOQServerConfig
104
   so that we can use a unique_ptr in DOQFrontend */
105
DOQFrontend::DOQFrontend() = default;
24✔
106
DOQFrontend::~DOQFrontend() = default;
×
107

108
class DOQTCPCrossQuerySender final : public TCPQuerySender
109
{
110
public:
111
  DOQTCPCrossQuerySender() = default;
649✔
112

113
  [[nodiscard]] bool active() const override
114
  {
68✔
115
    return true;
68✔
116
  }
68✔
117

118
  void handleResponse([[maybe_unused]] const struct timeval& now, TCPResponse&& response) override
119
  {
60✔
120
    if (!response.d_idstate.doqu) {
60!
121
      return;
×
122
    }
×
123

124
    auto unit = std::move(response.d_idstate.doqu);
60✔
125
    if (unit->dsc == nullptr) {
60!
126
      return;
×
127
    }
×
128

129
    unit->response = std::move(response.d_buffer);
60✔
130
    unit->ids = std::move(response.d_idstate);
60✔
131
    DNSResponse dnsResponse(unit->ids, unit->response, unit->downstream);
60✔
132

133
    dnsheader cleartextDH{};
60✔
134
    memcpy(&cleartextDH, dnsResponse.getHeader().get(), sizeof(cleartextDH));
60✔
135

136
    if (!response.isAsync()) {
60✔
137
      dnsResponse.ids.doqu = std::move(unit);
34✔
138

139
      if (!processResponse(dnsResponse.ids.doqu->response, dnsResponse, false)) {
34!
140
        if (dnsResponse.ids.doqu) {
×
141

142
          sendBackDOQUnit(std::move(dnsResponse.ids.doqu), "Response dropped by rules");
×
143
        }
×
144
        return;
×
145
      }
×
146

147
      if (dnsResponse.isAsynchronous()) {
34✔
148
        return;
24✔
149
      }
24✔
150

151
      unit = std::move(dnsResponse.ids.doqu);
10✔
152
    }
10✔
153

154
    if (!unit->ids.selfGenerated) {
36✔
155
      double udiff = unit->ids.queryRealTime.udiff();
30✔
156
      vinfolog("Got answer from %s, relayed to %s (quic, %d bytes), took %f us", unit->downstream->d_config.remote.toStringWithPort(), unit->ids.origRemote.toStringWithPort(), unit->response.size(), udiff);
30✔
157

158
      auto backendProtocol = unit->downstream->getProtocol();
30✔
159
      if (backendProtocol == dnsdist::Protocol::DoUDP && unit->tcp) {
30✔
160
        backendProtocol = dnsdist::Protocol::DoTCP;
10✔
161
      }
10✔
162
      handleResponseSent(unit->ids, udiff, unit->ids.origRemote, unit->downstream->d_config.remote, unit->response.size(), cleartextDH, backendProtocol, true);
30✔
163
    }
30✔
164

165
    ++dnsdist::metrics::g_stats.responses;
36✔
166
    if (unit->ids.cs != nullptr) {
36!
167
      ++unit->ids.cs->responses;
36✔
168
    }
36✔
169

170
    sendBackDOQUnit(std::move(unit), "Cross-protocol response");
36✔
171
  }
36✔
172

173
  void handleXFRResponse(const struct timeval& now, TCPResponse&& response) override
174
  {
×
175
    return handleResponse(now, std::move(response));
×
176
  }
×
177

178
  void notifyIOError([[maybe_unused]] const struct timeval& now, TCPResponse&& response) override
179
  {
6✔
180
    if (!response.d_idstate.doqu) {
6!
181
      return;
×
182
    }
×
183

184
    auto unit = std::move(response.d_idstate.doqu);
6✔
185
    if (unit->dsc == nullptr) {
6!
186
      return;
×
187
    }
×
188

189
    /* this will signal an error */
190
    unit->response.clear();
6✔
191
    unit->ids = std::move(response.d_idstate);
6✔
192
    sendBackDOQUnit(std::move(unit), "Cross-protocol error");
6✔
193
  }
6✔
194
};
195

196
class DOQCrossProtocolQuery : public CrossProtocolQuery
197
{
198
public:
199
  DOQCrossProtocolQuery(DOQUnitUniquePtr&& unit, bool isResponse)
200
  {
68✔
201
    if (isResponse) {
68✔
202
      /* happens when a response becomes async */
203
      query = InternalQuery(std::move(unit->response), std::move(unit->ids));
26✔
204
    }
26✔
205
    else {
42✔
206
      /* we need to duplicate the query here because we might need
207
         the existing query later if we get a truncated answer */
208
      query = InternalQuery(PacketBuffer(unit->query), std::move(unit->ids));
42✔
209
    }
42✔
210

211
    /* it might have been moved when we moved unit->ids */
212
    if (unit) {
68✔
213
      query.d_idstate.doqu = std::move(unit);
58✔
214
    }
58✔
215

216
    /* we _could_ remove it from the query buffer and put in query's d_proxyProtocolPayload,
217
       clearing query.d_proxyProtocolPayloadAdded and unit->proxyProtocolPayloadSize.
218
       Leave it for now because we know that the onky case where the payload has been
219
       added is when we tried over UDP, got a TC=1 answer and retried over TCP/DoT,
220
       and we know the TCP/DoT code can handle it. */
221
    query.d_proxyProtocolPayloadAdded = query.d_idstate.doqu->proxyProtocolPayloadSize > 0;
68✔
222
    downstream = query.d_idstate.doqu->downstream;
68✔
223
  }
68✔
224

225
  void handleInternalError()
226
  {
×
227
    sendBackDOQUnit(std::move(query.d_idstate.doqu), "DOQ internal error");
×
228
  }
×
229

230
  std::shared_ptr<TCPQuerySender> getTCPQuerySender() override
231
  {
66✔
232
    query.d_idstate.doqu->downstream = downstream;
66✔
233
    return s_sender;
66✔
234
  }
66✔
235

236
  DNSQuestion getDQ() override
237
  {
52✔
238
    auto& ids = query.d_idstate;
52✔
239
    DNSQuestion dnsQuestion(ids, query.d_buffer);
52✔
240
    return dnsQuestion;
52✔
241
  }
52✔
242

243
  DNSResponse getDR() override
244
  {
22✔
245
    auto& ids = query.d_idstate;
22✔
246
    DNSResponse dnsResponse(ids, query.d_buffer, downstream);
22✔
247
    return dnsResponse;
22✔
248
  }
22✔
249

250
  DOQUnitUniquePtr&& releaseDU()
251
  {
×
252
    return std::move(query.d_idstate.doqu);
×
253
  }
×
254

255
private:
256
  static std::shared_ptr<DOQTCPCrossQuerySender> s_sender;
257
};
258

259
std::shared_ptr<DOQTCPCrossQuerySender> DOQCrossProtocolQuery::s_sender = std::make_shared<DOQTCPCrossQuerySender>();
260

261
static bool tryWriteResponse(Connection& conn, const uint64_t streamID, PacketBuffer& response)
262
{
51✔
263
  size_t pos = 0;
51✔
264
  while (pos < response.size()) {
102✔
265
#ifdef HAVE_QUICHE_STREAM_ERROR_CODES
51✔
266
    uint64_t quicheErrorCode{0};
51✔
267
    auto res = quiche_conn_stream_send(conn.d_conn.get(), streamID, &response.at(pos), response.size() - pos, true, &quicheErrorCode);
51✔
268
#else
269
    auto res = quiche_conn_stream_send(conn.d_conn.get(), streamID, &response.at(pos), response.size() - pos, true);
270
#endif
271
    if (res == QUICHE_ERR_DONE) {
51!
272
      response.erase(response.begin(), response.begin() + static_cast<ssize_t>(pos));
×
273
      return false;
×
274
    }
×
275
    if (res < 0) {
51!
276
      quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_INTERNAL_ERROR));
×
277
      return true;
×
278
    }
×
279
    pos += res;
51✔
280
  }
51✔
281

282
  return true;
51✔
283
}
51✔
284

285
static void handleResponse(DOQFrontend& frontend, Connection& conn, const uint64_t streamID, PacketBuffer& response)
286
{
60✔
287
  if (response.empty()) {
60✔
288
    ++frontend.d_errorResponses;
9✔
289
    quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_UNSPECIFIED_ERROR));
9✔
290
    return;
9✔
291
  }
9✔
292
  ++frontend.d_validResponses;
51✔
293
  auto responseSize = static_cast<uint16_t>(response.size());
51✔
294
  const std::array<uint8_t, 2> sizeBytes = {static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256)};
51✔
295
  response.insert(response.begin(), sizeBytes.begin(), sizeBytes.end());
51✔
296
  if (!tryWriteResponse(conn, streamID, response)) {
51!
297
    conn.d_streamOutBuffers[streamID] = std::move(response);
×
298
  }
×
299
}
51✔
300

301
void DOQFrontend::setup()
302
{
12✔
303
  auto config = QuicheConfig(quiche_config_new(QUICHE_PROTOCOL_VERSION), quiche_config_free);
12✔
304
  d_quicheParams.d_alpn = std::string(DOQ_ALPN.begin(), DOQ_ALPN.end());
12✔
305
  configureQuiche(config, d_quicheParams, false);
12✔
306
  d_server_config = std::make_unique<DOQServerConfig>(std::move(config), d_internalPipeBufferSize);
12✔
307
}
12✔
308

309
void DOQFrontend::reloadCertificates()
310
{
1✔
311
  auto config = QuicheConfig(quiche_config_new(QUICHE_PROTOCOL_VERSION), quiche_config_free);
1✔
312
  d_quicheParams.d_alpn = std::string(DOQ_ALPN.begin(), DOQ_ALPN.end());
1✔
313
  configureQuiche(config, d_quicheParams, false);
1✔
314
  std::atomic_store_explicit(&d_server_config->config, std::move(config), std::memory_order_release);
1✔
315
}
1✔
316

317
static std::optional<std::reference_wrapper<Connection>> getConnection(DOQServerConfig::ConnectionsMap& connMap, const PacketBuffer& connID)
318
{
524✔
319
  auto iter = connMap.find(connID);
524✔
320
  if (iter == connMap.end()) {
524✔
321
    return std::nullopt;
124✔
322
  }
124✔
323
  return iter->second;
400✔
324
}
524✔
325

326
static void sendBackDOQUnit(DOQUnitUniquePtr&& unit, const char* description)
327
{
42✔
328
  if (unit->dsc == nullptr) {
42!
329
    return;
×
330
  }
×
331
  try {
42✔
332
    if (!unit->dsc->d_responseSender.send(std::move(unit))) {
42!
333
      ++dnsdist::metrics::g_stats.doqResponsePipeFull;
×
334
      vinfolog("Unable to pass a %s to the DoQ worker thread because the pipe is full", description);
×
335
    }
×
336
  }
42✔
337
  catch (const std::exception& e) {
42✔
338
    vinfolog("Unable to pass a %s to the DoQ worker thread because we couldn't write to the pipe: %s", description, e.what());
×
339
  }
×
340
}
42✔
341

342
static std::optional<std::reference_wrapper<Connection>> createConnection(DOQServerConfig& config, const PacketBuffer& serverSideID, const PacketBuffer& originalDestinationID, const ComboAddress& peer, const ComboAddress& localAddr)
343
{
62✔
344
  auto quicheConfig = std::atomic_load_explicit(&config.config, std::memory_order_acquire);
62✔
345
  auto quicheConn = QuicheConnection(quiche_accept(serverSideID.data(), serverSideID.size(),
62✔
346
                                                   originalDestinationID.data(), originalDestinationID.size(),
62✔
347
                                                   // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
348
                                                   reinterpret_cast<const struct sockaddr*>(&localAddr),
62✔
349
                                                   localAddr.getSocklen(),
62✔
350
                                                   // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
351
                                                   reinterpret_cast<const struct sockaddr*>(&peer),
62✔
352
                                                   peer.getSocklen(),
62✔
353
                                                   quicheConfig.get()),
62✔
354
                                     quiche_conn_free);
62✔
355

356
  if (config.df && !config.df->d_quicheParams.d_keyLogFile.empty()) {
62!
357
    quiche_conn_set_keylog_path(quicheConn.get(), config.df->d_quicheParams.d_keyLogFile.c_str());
×
358
  }
×
359

360
  auto conn = Connection(peer, localAddr, std::move(quicheConfig), std::move(quicheConn));
62✔
361
  auto pair = config.d_connections.emplace(serverSideID, std::move(conn));
62✔
362
  return pair.first->second;
62✔
363
}
62✔
364

365
std::unique_ptr<CrossProtocolQuery> getDOQCrossProtocolQueryFromDQ(DNSQuestion& dnsQuestion, bool isResponse)
366
{
58✔
367
  if (!dnsQuestion.ids.doqu) {
58!
368
    throw std::runtime_error("Trying to create a DoQ cross protocol query without a valid DoQ unit");
×
369
  }
×
370

371
  auto unit = std::move(dnsQuestion.ids.doqu);
58✔
372
  if (&dnsQuestion.ids != &unit->ids) {
58✔
373
    unit->ids = std::move(dnsQuestion.ids);
2✔
374
  }
2✔
375

376
  unit->ids.origID = dnsQuestion.getHeader()->id;
58✔
377

378
  if (!isResponse) {
58✔
379
    if (unit->query.data() != dnsQuestion.getMutableData().data()) {
32!
380
      unit->query = std::move(dnsQuestion.getMutableData());
×
381
    }
×
382
  }
32✔
383
  else {
26✔
384
    if (unit->response.data() != dnsQuestion.getMutableData().data()) {
26✔
385
      unit->response = std::move(dnsQuestion.getMutableData());
2✔
386
    }
2✔
387
  }
26✔
388

389
  return std::make_unique<DOQCrossProtocolQuery>(std::move(unit), isResponse);
58✔
390
}
58✔
391

392
static void processDOQQuery(DOQUnitUniquePtr&& doqUnit)
393
{
60✔
394
  const auto handleImmediateResponse = [](DOQUnitUniquePtr&& unit, [[maybe_unused]] const char* reason) {
60✔
395
    DEBUGLOG("handleImmediateResponse() reason=" << reason);
18✔
396
    auto conn = getConnection(unit->dsc->df->d_server_config->d_connections, unit->serverConnID);
18✔
397
    handleResponse(*unit->dsc->df, *conn, unit->streamID, unit->response);
18✔
398
    unit->ids.doqu.reset();
18✔
399
  };
18✔
400

401
  auto& ids = doqUnit->ids;
60✔
402
  ids.doqu = std::move(doqUnit);
60✔
403
  auto& unit = ids.doqu;
60✔
404
  uint16_t queryId = 0;
60✔
405
  ComboAddress remote;
60✔
406

407
  try {
60✔
408

409
    remote = unit->ids.origRemote;
60✔
410
    DOQServerConfig* dsc = unit->dsc;
60✔
411
    ClientState& clientState = *dsc->clientState;
60✔
412

413
    if (!dnsdist::configuration::getCurrentRuntimeConfiguration().d_ACL.match(remote)) {
60✔
414
      vinfolog("Query from %s (DoQ) dropped because of ACL", remote.toStringWithPort());
1!
415
      ++dnsdist::metrics::g_stats.aclDrops;
1✔
416
      unit->response.clear();
1✔
417

418
      handleImmediateResponse(std::move(unit), "DoQ query dropped because of ACL");
1✔
419
      return;
1✔
420
    }
1✔
421

422
    if (unit->query.size() < sizeof(dnsheader)) {
59!
423
      ++dnsdist::metrics::g_stats.nonCompliantQueries;
×
424
      ++clientState.nonCompliantQueries;
×
425
      unit->response.clear();
×
426

427
      handleImmediateResponse(std::move(unit), "DoQ non-compliant query");
×
428
      return;
×
429
    }
×
430

431
    ++clientState.queries;
59✔
432
    ++dnsdist::metrics::g_stats.queries;
59✔
433
    unit->ids.queryRealTime.start();
59✔
434

435
    {
59✔
436
      /* don't keep that pointer around, it will be invalidated if the buffer is ever resized */
437
      dnsheader_aligned dnsHeader(unit->query.data());
59✔
438

439
      if (!checkQueryHeaders(*dnsHeader, clientState)) {
59!
440
        dnsdist::PacketMangling::editDNSHeaderFromPacket(unit->query, [](dnsheader& header) {
×
441
          header.rcode = RCode::ServFail;
×
442
          header.qr = true;
×
443
          return true;
×
444
        });
×
445
        unit->response = std::move(unit->query);
×
446

447
        handleImmediateResponse(std::move(unit), "DoQ invalid headers");
×
448
        return;
×
449
      }
×
450

451
      if (dnsHeader->qdcount == 0) {
59!
452
        dnsdist::PacketMangling::editDNSHeaderFromPacket(unit->query, [](dnsheader& header) {
×
453
          header.rcode = RCode::NotImp;
×
454
          header.qr = true;
×
455
          return true;
×
456
        });
×
457
        unit->response = std::move(unit->query);
×
458

459
        handleImmediateResponse(std::move(unit), "DoQ empty query");
×
460
        return;
×
461
      }
×
462

463
      queryId = ntohs(dnsHeader->id);
59✔
464
    }
59✔
465

466
    auto downstream = unit->downstream;
×
467
    // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
468
    unit->ids.qname = DNSName(reinterpret_cast<const char*>(unit->query.data()), static_cast<int>(unit->query.size()), sizeof(dnsheader), false, &unit->ids.qtype, &unit->ids.qclass);
59✔
469
    DNSQuestion dnsQuestion(unit->ids, unit->query);
59✔
470
    dnsdist::PacketMangling::editDNSHeaderFromPacket(dnsQuestion.getMutableData(), [&ids](dnsheader& header) {
59✔
471
      const uint16_t* flags = getFlagsFromDNSHeader(&header);
59✔
472
      ids.origFlags = *flags;
59✔
473
      return true;
59✔
474
    });
59✔
475
    unit->ids.cs = &clientState;
59✔
476

477
    auto result = processQuery(dnsQuestion, downstream);
59✔
478
    if (result == ProcessQueryResult::Drop) {
59✔
479
      handleImmediateResponse(std::move(unit), "DoQ dropped query");
2✔
480
      return;
2✔
481
    }
2✔
482
    if (result == ProcessQueryResult::Asynchronous) {
57✔
483
      return;
32✔
484
    }
32✔
485
    if (result == ProcessQueryResult::SendAnswer) {
25✔
486
      if (unit->response.empty()) {
15!
487
        unit->response = std::move(unit->query);
15✔
488
      }
15✔
489
      if (unit->response.size() >= sizeof(dnsheader)) {
15!
490
        const dnsheader_aligned dnsHeader(unit->response.data());
15✔
491

492
        handleResponseSent(unit->ids.qname, QType(unit->ids.qtype), 0., unit->ids.origDest, ComboAddress(), unit->response.size(), *dnsHeader, dnsdist::Protocol::DoQ, dnsdist::Protocol::DoQ, false);
15✔
493
      }
15✔
494
      handleImmediateResponse(std::move(unit), "DoQ self-answered response");
15✔
495
      return;
15✔
496
    }
15✔
497

498
    ++dnsdist::metrics::g_stats.responses;
10✔
499
    if (unit->ids.cs != nullptr) {
10!
500
      ++unit->ids.cs->responses;
10✔
501
    }
10✔
502

503
    if (result != ProcessQueryResult::PassToBackend) {
10!
504
      handleImmediateResponse(std::move(unit), "DoQ no backend available");
×
505
      return;
×
506
    }
×
507

508
    if (downstream == nullptr) {
10!
509
      handleImmediateResponse(std::move(unit), "DoQ no backend available");
×
510
      return;
×
511
    }
×
512

513
    unit->downstream = downstream;
10✔
514

515
    std::string proxyProtocolPayload;
10✔
516
    /* we need to do this _before_ creating the cross protocol query because
517
       after that the buffer will have been moved */
518
    if (downstream->d_config.useProxyProtocol) {
10!
519
      proxyProtocolPayload = getProxyProtocolPayload(dnsQuestion);
×
520
    }
×
521

522
    unit->ids.origID = htons(queryId);
10✔
523
    unit->tcp = true;
10✔
524

525
    /* this moves unit->ids, careful! */
526
    auto cpq = std::make_unique<DOQCrossProtocolQuery>(std::move(unit), false);
10✔
527
    cpq->query.d_proxyProtocolPayload = std::move(proxyProtocolPayload);
10✔
528

529
    if (downstream->passCrossProtocolQuery(std::move(cpq))) {
10!
530
      return;
10✔
531
    }
10✔
532
    // NOLINTNEXTLINE(bugprone-use-after-move): it was only moved if the call succeeded
533
    unit = cpq->releaseDU();
×
534
    handleImmediateResponse(std::move(unit), "DoQ internal error");
×
535
    return;
×
536
  }
10✔
537
  catch (const std::exception& e) {
60✔
538
    vinfolog("Got an error in DOQ question thread while parsing a query from %s, id %d: %s", remote.toStringWithPort(), queryId, e.what());
×
539
    handleImmediateResponse(std::move(unit), "DoQ internal error");
×
540
    return;
×
541
  }
×
542
}
60✔
543

544
static void doq_dispatch_query(DOQServerConfig& dsc, PacketBuffer&& query, const ComboAddress& local, const ComboAddress& remote, const PacketBuffer& serverConnID, const uint64_t streamID)
545
{
60✔
546
  try {
60✔
547
    auto unit = std::make_unique<DOQUnit>(std::move(query));
60✔
548
    unit->dsc = &dsc;
60✔
549
    unit->ids.origDest = local;
60✔
550
    unit->ids.origRemote = remote;
60✔
551
    unit->ids.protocol = dnsdist::Protocol::DoQ;
60✔
552
    unit->serverConnID = serverConnID;
60✔
553
    unit->streamID = streamID;
60✔
554

555
    processDOQQuery(std::move(unit));
60✔
556
  }
60✔
557
  catch (const std::exception& exp) {
60✔
558
    vinfolog("Had error handling DoQ DNS packet from %s: %s", remote.toStringWithPort(), exp.what());
×
559
  }
×
560
}
60✔
561

562
static void flushResponses(pdns::channel::Receiver<DOQUnit>& receiver)
563
{
42✔
564
  for (;;) {
84✔
565
    try {
84✔
566
      auto tmp = receiver.receive();
84✔
567
      if (!tmp) {
84✔
568
        return;
42✔
569
      }
42✔
570

571
      auto unit = std::move(*tmp);
42✔
572
      auto conn = getConnection(unit->dsc->df->d_server_config->d_connections, unit->serverConnID);
42✔
573
      if (conn) {
42!
574
        handleResponse(*unit->dsc->df, *conn, unit->streamID, unit->response);
42✔
575
      }
42✔
576
    }
42✔
577
    catch (const std::exception& e) {
84✔
578
      errlog("Error while processing response received over DoQ: %s", e.what());
×
579
    }
×
580
    catch (...) {
84✔
581
      errlog("Unspecified error while processing response received over DoQ");
×
582
    }
×
583
  }
84✔
584
}
42✔
585

586
static void flushStalledResponses(Connection& conn)
587
{
494✔
588
  for (auto streamIt = conn.d_streamOutBuffers.begin(); streamIt != conn.d_streamOutBuffers.end();) {
494!
589
    const auto& streamID = streamIt->first;
×
590
    auto& response = streamIt->second;
×
591
    if (quiche_conn_stream_writable(conn.d_conn.get(), streamID, response.size()) == 1) {
×
592
      if (tryWriteResponse(conn, streamID, response)) {
×
593
        streamIt = conn.d_streamOutBuffers.erase(streamIt);
×
594
        continue;
×
595
      }
×
596
    }
×
597
    ++streamIt;
×
598
  }
×
599
}
494✔
600

601
static void handleReadableStream(DOQFrontend& frontend, ClientState& clientState, Connection& conn, uint64_t streamID, const ComboAddress& client, const PacketBuffer& serverConnID)
602
{
61✔
603
  auto& streamBuffer = conn.d_streamBuffers[streamID];
61✔
604
  while (true) {
61✔
605
    bool fin = false;
61✔
606
    auto existingLength = streamBuffer.size();
61✔
607
    streamBuffer.resize(existingLength + 512);
61✔
608
#ifdef HAVE_QUICHE_STREAM_ERROR_CODES
61✔
609
    uint64_t quicheErrorCode{0};
61✔
610
    auto received = quiche_conn_stream_recv(conn.d_conn.get(), streamID,
61✔
611
                                            &streamBuffer.at(existingLength), 512,
61✔
612
                                            &fin,
61✔
613
                                            &quicheErrorCode);
61✔
614
#else
615
    auto received = quiche_conn_stream_recv(conn.d_conn.get(), streamID,
616
                                            &streamBuffer.at(existingLength), 512,
617
                                            &fin);
618
#endif
619
    if (received == 0 || received == QUICHE_ERR_DONE) {
61!
620
      streamBuffer.resize(existingLength);
×
621
      return;
×
622
    }
×
623
    if (received < 0) {
61!
624
      ++dnsdist::metrics::g_stats.nonCompliantQueries;
×
625
      ++clientState.nonCompliantQueries;
×
626
      quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_PROTOCOL_ERROR));
×
627
      return;
×
628
    }
×
629

630
    streamBuffer.resize(existingLength + received);
61✔
631
    if (fin) {
61!
632
      break;
61✔
633
    }
61✔
634
  }
61✔
635

636
  if (streamBuffer.size() < (sizeof(uint16_t) + sizeof(dnsheader))) {
61!
637
    ++dnsdist::metrics::g_stats.nonCompliantQueries;
×
638
    ++clientState.nonCompliantQueries;
×
639
    quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_PROTOCOL_ERROR));
×
640
    return;
×
641
  }
×
642

643
  uint16_t payloadLength = streamBuffer.at(0) * 256 + streamBuffer.at(1);
61✔
644
  streamBuffer.erase(streamBuffer.begin(), streamBuffer.begin() + 2);
61✔
645
  if (payloadLength != streamBuffer.size()) {
61✔
646
    ++dnsdist::metrics::g_stats.nonCompliantQueries;
1✔
647
    ++clientState.nonCompliantQueries;
1✔
648
    quiche_conn_stream_shutdown(conn.d_conn.get(), streamID, QUICHE_SHUTDOWN_WRITE, static_cast<uint64_t>(DOQ_Error_Codes::DOQ_PROTOCOL_ERROR));
1✔
649
    return;
1✔
650
  }
1✔
651
  DEBUGLOG("Dispatching query");
60✔
652
  doq_dispatch_query(*(frontend.d_server_config), std::move(streamBuffer), conn.d_localAddr, client, serverConnID, streamID);
60✔
653
  conn.d_streamBuffers.erase(streamID);
60✔
654
}
60✔
655

656
static void handleSocketReadable(DOQFrontend& frontend, ClientState& clientState, Socket& sock, PacketBuffer& buffer)
657
{
459✔
658
  // destination connection ID, will have to be sent as original destination connection ID
659
  PacketBuffer serverConnID;
459✔
660
  // source connection ID, will have to be sent as destination connection ID
661
  PacketBuffer clientConnID;
459✔
662
  PacketBuffer tokenBuf;
459✔
663
  while (true) {
923✔
664
    ComboAddress client;
923✔
665
    ComboAddress localAddr;
923✔
666
    client.sin4.sin_family = clientState.local.sin4.sin_family;
923✔
667
    localAddr.sin4.sin_family = clientState.local.sin4.sin_family;
923✔
668
    buffer.resize(4096);
923✔
669
    if (!dnsdist::doq::recvAsync(sock, buffer, client, localAddr)) {
923✔
670
      return;
459✔
671
    }
459✔
672
    if (localAddr.sin4.sin_family == 0) {
464✔
673
      localAddr = clientState.local;
457✔
674
    }
457✔
675
    else {
7✔
676
      /* we don't get the port, only the address */
677
      localAddr.sin4.sin_port = clientState.local.sin4.sin_port;
7✔
678
    }
7✔
679

680
    DEBUGLOG("Received DoQ datagram of size " << buffer.size() << " from " << client.toStringWithPort());
464✔
681

682
    uint32_t version{0};
464✔
683
    uint8_t type{0};
464✔
684
    std::array<uint8_t, QUICHE_MAX_CONN_ID_LEN> scid{};
464✔
685
    size_t scid_len = scid.size();
464✔
686
    std::array<uint8_t, QUICHE_MAX_CONN_ID_LEN> dcid{};
464✔
687
    size_t dcid_len = dcid.size();
464✔
688
    std::array<uint8_t, MAX_TOKEN_LEN> token{};
464✔
689
    size_t token_len = token.size();
464✔
690

691
    auto res = quiche_header_info(buffer.data(), buffer.size(), LOCAL_CONN_ID_LEN,
464✔
692
                                  &version, &type,
464✔
693
                                  scid.data(), &scid_len,
464✔
694
                                  dcid.data(), &dcid_len,
464✔
695
                                  token.data(), &token_len);
464✔
696
    if (res != 0) {
464!
697
      DEBUGLOG("Error in quiche_header_info: " << res);
×
698
      continue;
×
699
    }
×
700

701
    serverConnID.assign(dcid.begin(), dcid.begin() + dcid_len);
464✔
702
    clientConnID.assign(scid.begin(), scid.begin() + scid_len);
464✔
703
    auto conn = getConnection(frontend.d_server_config->d_connections, serverConnID);
464✔
704

705
    if (!conn) {
464✔
706
      DEBUGLOG("Connection not found");
124✔
707
      if (type != static_cast<uint8_t>(DOQ_Packet_Types::QUIC_PACKET_TYPE_INITIAL)) {
124!
708
        DEBUGLOG("Packet is not initial");
×
709
        continue;
×
710
      }
×
711

712
      if (!quiche_version_is_supported(version)) {
124!
713
        DEBUGLOG("Unsupported version");
×
714
        ++frontend.d_doqUnsupportedVersionErrors;
×
715
        handleVersionNegociation(sock, clientConnID, serverConnID, client, localAddr, buffer);
×
716
        continue;
×
717
      }
×
718

719
      if (token_len == 0) {
124✔
720
        /* stateless retry */
721
        DEBUGLOG("No token received");
62✔
722
        handleStatelessRetry(sock, clientConnID, serverConnID, client, localAddr, version, buffer);
62✔
723
        continue;
62✔
724
      }
62✔
725

726
      tokenBuf.assign(token.begin(), token.begin() + token_len);
62✔
727
      auto originalDestinationID = validateToken(tokenBuf, client);
62✔
728
      if (!originalDestinationID) {
62!
729
        ++frontend.d_doqInvalidTokensReceived;
×
730
        DEBUGLOG("Discarding invalid token");
×
731
        continue;
×
732
      }
×
733

734
      DEBUGLOG("Creating a new connection");
62✔
735
      conn = createConnection(*frontend.d_server_config, serverConnID, *originalDestinationID, client, localAddr);
62✔
736
      if (!conn) {
62!
737
        continue;
×
738
      }
×
739
    }
62✔
740
    DEBUGLOG("Connection found");
402✔
741
    quiche_recv_info recv_info = {
402✔
742
      // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
743
      reinterpret_cast<struct sockaddr*>(&client),
402✔
744
      client.getSocklen(),
402✔
745
      // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
746
      reinterpret_cast<struct sockaddr*>(&localAddr),
402✔
747
      localAddr.getSocklen(),
402✔
748
    };
402✔
749

750
    auto done = quiche_conn_recv(conn->get().d_conn.get(), buffer.data(), buffer.size(), &recv_info);
402✔
751
    if (done < 0) {
402!
752
      continue;
×
753
    }
×
754

755
    if (quiche_conn_is_established(conn->get().d_conn.get()) || quiche_conn_is_in_early_data(conn->get().d_conn.get())) {
402!
756
      auto readable = std::unique_ptr<quiche_stream_iter, decltype(&quiche_stream_iter_free)>(quiche_conn_readable(conn->get().d_conn.get()), quiche_stream_iter_free);
218✔
757

758
      uint64_t streamID = 0;
218✔
759
      while (quiche_stream_iter_next(readable.get(), &streamID)) {
279✔
760
        handleReadableStream(frontend, clientState, *conn, streamID, client, serverConnID);
61✔
761
      }
61✔
762

763
      flushEgress(sock, conn->get().d_conn, client, localAddr, buffer);
218✔
764
    }
218✔
765
    else {
184✔
766
      DEBUGLOG("Connection not established");
184✔
767
    }
184✔
768
  }
402✔
769
}
459✔
770

771
// this is the entrypoint from dnsdist.cc
772
void doqThread(ClientState* clientState)
773
{
12✔
774
  try {
12✔
775
    std::shared_ptr<DOQFrontend>& frontend = clientState->doqFrontend;
12✔
776

777
    frontend->d_server_config->clientState = clientState;
12✔
778
    frontend->d_server_config->df = clientState->doqFrontend;
12✔
779

780
    setThreadName("dnsdist/doq");
12✔
781

782
    Socket sock(clientState->udpFD);
12✔
783
    sock.setNonBlocking();
12✔
784

785
    auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
12✔
786

787
    auto responseReceiverFD = frontend->d_server_config->d_responseReceiver.getDescriptor();
12✔
788
    mplexer->addReadFD(sock.getHandle(), [](int, FDMultiplexer::funcparam_t&) {});
12✔
789
    mplexer->addReadFD(responseReceiverFD, [](int, FDMultiplexer::funcparam_t&) {});
12✔
790
    std::vector<int> readyFDs;
12✔
791
    PacketBuffer buffer(4096);
12✔
792
    while (true) {
604✔
793
      readyFDs.clear();
604✔
794
      mplexer->getAvailableFDs(readyFDs, 500);
604✔
795

796
      try {
604✔
797
        if (std::find(readyFDs.begin(), readyFDs.end(), sock.getHandle()) != readyFDs.end()) {
604✔
798
          handleSocketReadable(*frontend, *clientState, sock, buffer);
459✔
799
        }
459✔
800

801
        if (std::find(readyFDs.begin(), readyFDs.end(), responseReceiverFD) != readyFDs.end()) {
604✔
802
          flushResponses(frontend->d_server_config->d_responseReceiver);
42✔
803
        }
42✔
804

805
        for (auto conn = frontend->d_server_config->d_connections.begin(); conn != frontend->d_server_config->d_connections.end();) {
1,151✔
806
          quiche_conn_on_timeout(conn->second.d_conn.get());
547✔
807

808
          flushEgress(sock, conn->second.d_conn, conn->second.d_peer, conn->second.d_localAddr, buffer);
547✔
809

810
          if (quiche_conn_is_closed(conn->second.d_conn.get())) {
547✔
811
#ifdef DEBUGLOG_ENABLED
812
            quiche_stats stats;
813
            quiche_path_stats path_stats;
814

815
            quiche_conn_stats(conn->second.d_conn.get(), &stats);
816
            quiche_conn_path_stats(conn->second.d_conn.get(), 0, &path_stats);
817

818
            DEBUGLOG("Connection (DoQ) closed, recv=" << stats.recv << " sent=" << stats.sent << " lost=" << stats.lost << " rtt=" << path_stats.rtt << "ns cwnd=" << path_stats.cwnd);
819
#endif
820
            conn = frontend->d_server_config->d_connections.erase(conn);
53✔
821
          }
53✔
822
          else {
494✔
823
            flushStalledResponses(conn->second);
494✔
824
            ++conn;
494✔
825
          }
494✔
826
        }
547✔
827
      }
604✔
828
      catch (const std::exception& exp) {
604✔
829
        vinfolog("Caught exception in the main DoQ thread: %s", exp.what());
×
830
      }
×
831
      catch (...) {
604✔
832
        vinfolog("Unknown exception in the main DoQ thread");
×
833
      }
×
834
    }
604✔
835
  }
12✔
836
  catch (const std::exception& e) {
12✔
837
    DEBUGLOG("Caught fatal error in the main DoQ thread: " << e.what());
×
838
  }
×
839
}
12✔
840

841
#endif /* HAVE_DNS_OVER_QUIC */
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc