• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PowerDNS / pdns / 19741624072

27 Nov 2025 03:45PM UTC coverage: 73.086% (+0.02%) from 73.065%
19741624072

Pull #16570

github

web-flow
Merge 08a2cdb1d into f94a3f63f
Pull Request #16570: rec: rewrite all unwrap calls in web.rs

38523 of 63408 branches covered (60.75%)

Branch coverage included in aggregate %.

128044 of 164496 relevant lines covered (77.84%)

6531485.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.97
/pdns/dnsdistdist/dnsdist-tcp-downstream.cc
1

2
#include "dnsdist-session-cache.hh"
3
#include "dnsdist-tcp-downstream.hh"
4
#include "dnsdist-tcp-upstream.hh"
5
#include "dnsdist-downstream-connection.hh"
6

7
#include "dnsparser.hh"
8

9
// Cache of downstream TCP connections. Being thread_local, every thread owns its own
// instance; TCPConnectionToBackend::release() removes a connection from it when asked to
// and the connection cannot be reused.
thread_local DownstreamTCPConnectionsManager t_downstreamTCPConnectionsManager;
10

11
/* Tear-down bookkeeping for a backend connection that had both a backend and a socket handler:
   decrement the backend's current connections counter, account for TLS resumption, hand any
   TLS sessions still held by the handler to the session cache (so that a later connection can
   resume them), and report the number of queries and the connection lifetime (in ms). */
ConnectionToBackend::~ConnectionToBackend()
{
  if (!d_ds || !d_handler) {
    return;
  }

  --d_ds->tcpCurrentConnections;

  struct timeval now{};
  gettimeofday(&now, nullptr);

  if (d_handler->isTLS()) {
    if (d_handler->hasTLSSessionBeenResumed()) {
      ++d_ds->tlsResumptions;
    }
    try {
      auto remainingSessions = d_handler->getTLSSessions();
      if (!remainingSessions.empty()) {
        g_sessionCache.putSessions(d_ds->getID(), now.tv_sec, std::move(remainingSessions));
      }
    }
    catch (const std::exception& exp) {
      vinfolog("Unable to get a TLS session: %s", exp.what());
    }
  }

  const auto lifetime = now - d_connectionStartTime;
  const auto lifetimeMs = lifetime.tv_sec * 1000 + lifetime.tv_usec / 1000;
  d_ds->updateTCPMetrics(d_queries, lifetimeMs);
}
310✔
37

38
bool ConnectionToBackend::reconnect()
39
{
2,598✔
40
  std::unique_ptr<TLSSession> tlsSession{nullptr};
2,598✔
41
  if (d_handler) {
2,598✔
42
    DEBUGLOG("closing socket "<<d_handler->getDescriptor());
1,503✔
43
    if (d_handler->isTLS()) {
1,503✔
44
      if (d_handler->hasTLSSessionBeenResumed()) {
115✔
45
        ++d_ds->tlsResumptions;
52✔
46
      }
52✔
47
      try {
115✔
48
        auto sessions = d_handler->getTLSSessions();
115✔
49
        if (!sessions.empty()) {
115✔
50
          tlsSession = std::move(sessions.back());
52✔
51
          sessions.pop_back();
52✔
52
          if (!sessions.empty()) {
52!
53
            g_sessionCache.putSessions(d_ds->getID(), time(nullptr), std::move(sessions));
×
54
          }
×
55
        }
52✔
56
      }
115✔
57
      catch (const std::exception& e) {
115✔
58
        vinfolog("Unable to get a TLS session to resume: %s", e.what());
×
59
      }
×
60
    }
115✔
61
    d_handler->close();
1,503✔
62
    d_ioState.reset();
1,503✔
63
    d_handler.reset();
1,503✔
64
    --d_ds->tcpCurrentConnections;
1,503✔
65
  }
1,503✔
66

67
  d_fresh = true;
2,598✔
68
  d_highestStreamID = 0;
2,598✔
69
  d_proxyProtocolPayloadSent = false;
2,598✔
70

71
  do {
2,655✔
72
    DEBUGLOG("TCP connecting to downstream "<<d_ds->getNameWithAddr()<<" ("<<d_downstreamFailures<<")");
2,655✔
73
    DEBUGLOG("Opening TCP connection to backend "<<d_ds->getNameWithAddr());
2,655✔
74
    ++d_ds->tcpNewConnections;
2,655✔
75
    try {
2,655✔
76
      auto socket = Socket(d_ds->d_config.remote.sin4.sin_family, SOCK_STREAM, 0);
2,655✔
77
      DEBUGLOG("result of socket() is "<<socket.getHandle());
2,655✔
78

79
      /* disable NAGLE, which does not play nicely with delayed ACKs.
80
         In theory we could be wasting up to 500 milliseconds waiting for
81
         the other end to acknowledge our initial packet before we could
82
         send the rest. */
83
      setTCPNoDelay(socket.getHandle());
2,655✔
84
      setDscp(socket.getHandle(), d_ds->d_config.remote.sin4.sin_family, d_ds->d_config.dscp);
2,655✔
85

86
#ifdef SO_BINDTODEVICE
2,655✔
87
      if (!d_ds->d_config.sourceItfName.empty()) {
2,655!
88
        int res = setsockopt(socket.getHandle(), SOL_SOCKET, SO_BINDTODEVICE, d_ds->d_config.sourceItfName.c_str(), d_ds->d_config.sourceItfName.length());
×
89
        if (res != 0) {
×
90
          vinfolog("Error setting up the interface on backend TCP socket '%s': %s", d_ds->getNameWithAddr(), stringerror());
×
91
        }
×
92
      }
×
93
#endif
2,655✔
94

95
      if (!IsAnyAddress(d_ds->d_config.sourceAddr)) {
2,655!
96
        SSetsockopt(socket.getHandle(), SOL_SOCKET, SO_REUSEADDR, 1);
×
97
#ifdef IP_BIND_ADDRESS_NO_PORT
×
98
        if (d_ds->d_config.ipBindAddrNoPort) {
×
99
          SSetsockopt(socket.getHandle(), SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1);
×
100
        }
×
101
#endif
×
102
        socket.bind(d_ds->d_config.sourceAddr, false);
×
103
      }
×
104
      socket.setNonBlocking();
2,655✔
105

106
      gettimeofday(&d_connectionStartTime, nullptr);
2,655✔
107
      auto handler = std::make_unique<TCPIOHandler>(d_ds->d_config.d_tlsSubjectName, d_ds->d_config.d_tlsSubjectIsAddr, socket.releaseHandle(), timeval{0,0}, d_ds->d_tlsCtx);
2,655✔
108
      if (!tlsSession && d_ds->d_tlsCtx) {
2,655✔
109
        tlsSession = g_sessionCache.getSession(d_ds->getID(), d_connectionStartTime.tv_sec);
346✔
110
      }
346✔
111
      if (tlsSession) {
2,655✔
112
        handler->setTLSSession(tlsSession);
100✔
113
      }
100✔
114
      handler->tryConnect(d_ds->d_config.tcpFastOpen && isFastOpenEnabled(), d_ds->d_config.remote);
2,655!
115
      d_queries = 0;
2,655✔
116

117
      d_handler = std::move(handler);
2,655✔
118
      d_ds->incCurrentConnectionsCount();
2,655✔
119
      return true;
2,655✔
120
    }
2,655✔
121
    catch (const std::runtime_error& e) {
2,655✔
122
      vinfolog("Connection to downstream server %s failed: %s", d_ds->getNameWithAddr(), e.what());
72✔
123
      d_downstreamFailures++;
72✔
124
      if (d_downstreamFailures >= d_ds->d_config.d_retries) {
72✔
125
        throw;
15✔
126
      }
15✔
127
    }
72✔
128
  }
2,655✔
129
  while (d_downstreamFailures < d_ds->d_config.d_retries);
2,598!
130

131
  return false;
×
132
}
2,598✔
133

134
/* Responses still pending on this connection will never arrive: remove them from the
   backend's outstanding counter. */
TCPConnectionToBackend::~TCPConnectionToBackend()
{
  if (!d_ds || d_pendingResponses.empty()) {
    return;
  }
  d_ds->outstanding -= d_pendingResponses.size();
}
241✔
140

141
void TCPConnectionToBackend::release(bool removeFromCache)
142
{
96✔
143
  d_ds->outstanding -= d_pendingResponses.size();
96✔
144

145
  d_pendingResponses.clear();
96✔
146
  d_pendingQueries.clear();
96✔
147

148
  if (d_ioState) {
96✔
149
    d_ioState.reset();
59✔
150
  }
59✔
151

152
  if (removeFromCache && !willBeReusable(true)) {
96!
153
    auto shared = std::dynamic_pointer_cast<TCPConnectionToBackend>(shared_from_this());
78✔
154
    /* remove ourselves from the connection cache, this might mean that our
155
       reference count drops to zero after that, so we need to be careful */
156
    t_downstreamTCPConnectionsManager.removeDownstreamConnection(shared);
78✔
157
  }
78✔
158
}
96✔
159

160
/* Return the SERIAL field (converted to host byte order) of a SOA record, given its raw
   wire-format RDATA.
   Throws std::runtime_error if the content is smaller than the minimal size of a SOA record. */
static uint32_t getSerialFromRawSOAContent(const std::vector<uint8_t>& raw)
{
  /* minimal size for a SOA record, as defined by rfc1035:
     MNAME (root): 1
     RNAME (root): 1
     SERIAL: 4
     REFRESH: 4
     RETRY: 4
     EXPIRE: 4
     MINIMUM: 4
     = 22 bytes
  */
  constexpr size_t minimumSOASize = 22;
  /* SERIAL is the first of the five trailing 32-bit fields (SERIAL, REFRESH, RETRY, EXPIRE, MINIMUM) */
  constexpr size_t serialOffsetFromEnd = 5 * sizeof(uint32_t);

  if (raw.size() < minimumSOASize) {
    throw std::runtime_error("Invalid content of size " + std::to_string(raw.size()) + " for a SOA record");
  }
  /* As rfc1035 (section 3.3) states that "all domain names in the RDATA section of these RRs may be compressed",
     and we don't want to parse these names, start at the end */
  uint32_t serial = 0;
  memcpy(&serial, &raw.at(raw.size() - serialOffsetFromEnd), sizeof(serial));
  return ntohl(serial);
}
66✔
181

182
/* Best-effort extraction of the SOA serial carried by an IXFR query, stored into
   query.d_ixfrQuerySerial.
   The buffer starts with the proxy protocol payload (when it has been added) and the 2-byte
   length prefix, both of which are skipped. Only the first record of parser.d_answers is
   examined: it has to be an IN SOA record in the authority section.
   Returns true if the serial has been stored, false otherwise. Errors raised while parsing
   the query or the SOA content are not propagated, they are reported as false. */
static bool getSerialFromIXFRQuery(TCPQuery& query)
{
  try {
    const size_t proxyPayloadSize = query.d_proxyProtocolPayloadAdded ? query.d_idstate.d_proxyProtocolPayloadSize : 0;
    if (query.d_buffer.size() <= (proxyPayloadSize + sizeof(uint16_t))) {
      return false;
    }

    const size_t payloadSize = query.d_buffer.size() - sizeof(uint16_t) - proxyPayloadSize;

    MOADNSParser parser(true, reinterpret_cast<const char*>(query.d_buffer.data() + sizeof(uint16_t) + proxyPayloadSize), payloadSize);

    /* every path below returns, so only the first record is ever looked at */
    for (const auto& record : parser.d_answers) {
      if (record.d_place != DNSResourceRecord::AUTHORITY || record.d_class != QClass::IN || record.d_type != QType::SOA) {
        return false;
      }

      auto unknownContent = getRR<UnknownRecordContent>(record);
      if (!unknownContent) {
        return false;
      }
      const auto& raw = unknownContent->getRawContent();
      query.d_ixfrQuerySerial = getSerialFromRawSOAContent(raw);
      return true;
    }
  }
  catch (const MOADNSException& e) {
    DEBUGLOG("Exception when parsing IXFR TCP Query to DNS: " << e.what());
    /* ponder what to do here, shall we close the connection? */
  }
  catch (const std::runtime_error& e) {
    /* getSerialFromRawSOAContent() throws a plain std::runtime_error when the SOA content is
       too short: treat it like any other unparsable query instead of letting it escape */
    DEBUGLOG("Invalid SOA record in IXFR TCP Query: " << e.what());
  }

  return false;
}
8✔
215

216
/* Overwrite the DNS ID of the message stored in payload with newId (in network byte order).
   The DNS header starts after the 2-byte length prefix (when sizePrepended is set) and the
   proxy protocol payload of proxyProtocolPayloadSize bytes.
   Throws std::runtime_error if the buffer is too short to hold a DNS header at that offset. */
static void editPayloadID(PacketBuffer& payload, uint16_t newId, size_t proxyProtocolPayloadSize, bool sizePrepended)
{
  /* we cannot do a direct cast as the alignment might be off (the size of the payload might have been prepended, which is bad enough,
     but we might also have a proxy protocol payload) */
  const size_t startOfHeaderOffset = (sizePrepended ? sizeof(uint16_t) : 0) + proxyProtocolPayloadSize;
  if (payload.size() < startOfHeaderOffset + sizeof(dnsheader)) {
    throw std::runtime_error("Invalid buffer for outgoing TCP query (size " + std::to_string(payload.size()) + ")");
  }
  /* the ID is the first field of the DNS header */
  const uint16_t id = htons(newId);
  memcpy(&payload.at(startOfHeaderOffset), &id, sizeof(id));
}
67,933✔
227

228
/* Tells prepareQueryForSending() what to do with the proxy protocol payload of a query:
   needProxy  - the payload has to be prepended to the query;
   proxySent  - the payload must not be sent on this connection, remove it if present. */
enum class ConnectionState : uint8_t
{
  needProxy,
  proxySent,
};
232

233
/* Get a query ready to be written to a backend connection:
   - needProxy: prepend the proxy protocol payload, unless it is empty or has already been added;
   - proxySent: remove a previously prepended proxy protocol payload, if any.
   For IN IXFR queries, also try to record the serial carried by the query
   (see getSerialFromIXFRQuery()). Finally, rewrite the DNS ID of the query, which is preceded
   by its 2-byte length (and the proxy protocol payload, if present), to queryID.
   Throws std::runtime_error if the buffer is too short for these edits. */
static void prepareQueryForSending(TCPQuery& query, uint16_t queryID, ConnectionState connectionState)
{
  if (connectionState == ConnectionState::needProxy) {
    if (!query.d_proxyProtocolPayload.empty() && !query.d_proxyProtocolPayloadAdded) {
      query.d_buffer.insert(query.d_buffer.begin(), query.d_proxyProtocolPayload.begin(), query.d_proxyProtocolPayload.end());
      query.d_proxyProtocolPayloadAdded = true;
      query.d_idstate.d_proxyProtocolPayloadSize = query.d_proxyProtocolPayload.size();
    }
  }
  else if (connectionState == ConnectionState::proxySent) {
    if (query.d_proxyProtocolPayloadAdded) {
      if (query.d_buffer.size() < query.d_idstate.d_proxyProtocolPayloadSize) {
        /* report the size we actually tried to remove, which is the one checked above */
        throw std::runtime_error("Trying to remove a proxy protocol payload of size " + std::to_string(query.d_idstate.d_proxyProtocolPayloadSize) + " from a buffer of size " + std::to_string(query.d_buffer.size()));
      }
      // NOLINTNEXTLINE(*-narrowing-conversions): the size of the payload is limited to 2^16-1
      query.d_buffer.erase(query.d_buffer.begin(), query.d_buffer.begin() + static_cast<ssize_t>(query.d_idstate.d_proxyProtocolPayloadSize));
      query.d_proxyProtocolPayloadAdded = false;
      query.d_idstate.d_proxyProtocolPayloadSize = 0;
    }
  }

  if (query.d_idstate.qclass == QClass::IN && query.d_idstate.qtype == QType::IXFR) {
    /* best-effort: the boolean result is not needed here */
    getSerialFromIXFRQuery(query);
  }

  editPayloadID(query.d_buffer, queryID, query.d_proxyProtocolPayloadAdded ? query.d_idstate.d_proxyProtocolPayloadSize : 0, true);
}
34,529✔
259

260
/* Make the first pending query the current one, prepare it with the current stream ID,
   and switch to the sending state from the beginning of its buffer.
   Always returns IOState::NeedWrite. */
IOState TCPConnectionToBackend::queueNextQuery(std::shared_ptr<TCPConnectionToBackend>& conn)
{
  auto& queue = conn->d_pendingQueries;
  conn->d_currentQuery = std::move(queue.front());

  const uint16_t streamID = conn->d_highestStreamID;
  const auto proxyState = conn->needProxyProtocolPayload() ? ConnectionState::needProxy : ConnectionState::proxySent;
  prepareQueryForSending(conn->d_currentQuery.d_query, streamID, proxyState);

  queue.pop_front();
  conn->d_state = State::sendingQueryToBackend;
  conn->d_currentPos = 0;

  return IOState::NeedWrite;
}
1,473✔
273

274
/* Write (the remainder of) the current query to the backend.
   If the write did not complete, the IO is flagged as blocked and the handler's state is returned.
   Once fully written, the query is moved to the pending responses under the current stream ID.
   The backend's outstanding counter is only increased when that ID was not already present.
   The stream ID is then incremented and IOState::Done is returned. */
IOState TCPConnectionToBackend::sendQuery(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
{
  static_cast<void>(now);
  DEBUGLOG("sending query to backend "<<conn->getDS()->getNameWithAddr()<<" over FD "<<conn->d_handler->getDescriptor());

  auto& queryBuffer = conn->d_currentQuery.d_query.d_buffer;
  const IOState writeState = conn->d_handler->tryWrite(queryBuffer, conn->d_currentPos, queryBuffer.size());
  if (writeState != IOState::Done) {
    conn->d_lastIOBlocked = true;
    return writeState;
  }

  DEBUGLOG("query sent to backend");
  /* the whole request has been written */
  if (conn->d_currentQuery.d_query.d_proxyProtocolPayloadAdded) {
    conn->d_proxyProtocolPayloadSent = true;
  }
  ++conn->d_queries;
  conn->d_currentPos = 0;

  DEBUGLOG("adding a pending response for ID "<<conn->d_highestStreamID<<" and QNAME "<<conn->d_currentQuery.d_query.d_idstate.qname);
  /* a response already pending for this ID would mean we messed up; we only expect one response per ID */
  const bool isNewEntry = conn->d_pendingResponses.insert({conn->d_highestStreamID, std::move(conn->d_currentQuery)}).second;
  if (isNewEntry) {
    ++conn->d_ds->outstanding;
  }
  ++conn->d_highestStreamID;
  conn->d_currentQuery.d_sender.reset();
  conn->d_currentQuery.d_query.d_buffer.clear();

  return IOState::Done;
}
34,614✔
307

308
void TCPConnectionToBackend::handleReconnectionAttempt(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now, IOStateGuard& ioGuard, IOState& iostate, bool& reconnected, bool& connectionDied)
309
{
1,511✔
310
  DEBUGLOG("connection died, number of failures is "<<conn->d_downstreamFailures<<", retries is "<<conn->d_ds->d_config.d_retries);
1,511✔
311

312
  if (conn->d_downstreamFailures < conn->d_ds->d_config.d_retries) {
1,511✔
313

314
    conn->d_ioState.reset();
1,503✔
315
    ioGuard.release();
1,503✔
316

317
    try {
1,503✔
318
      if (conn->reconnect()) {
1,503✔
319
        conn->d_ioState = make_unique<IOStateHandler>(*conn->d_mplexer, conn->d_handler->getDescriptor());
1,497✔
320

321
        /* we need to resend the queries that were in flight, if any */
322
        if (conn->d_state == State::sendingQueryToBackend) {
1,497✔
323
          /* we need to edit this query so it has the correct ID */
324
          auto query = std::move(conn->d_currentQuery);
31✔
325
          uint16_t streamId = conn->d_highestStreamID;
31✔
326
          prepareQueryForSending(query.d_query, streamId, ConnectionState::needProxy);
31✔
327
          conn->d_currentQuery = std::move(query);
31✔
328
        }
31✔
329

330
        /* if we notify the sender it might terminate us so we need to move these first */
331
        auto pendingResponses = std::move(conn->d_pendingResponses);
1,497✔
332
        conn->d_pendingResponses.clear();
1,497✔
333
        for (auto& pending : pendingResponses) {
1,497✔
334
          --conn->d_ds->outstanding;
1,469✔
335

336
          if (pending.second.d_query.isXFR() && pending.second.d_query.d_xfrStarted) {
1,469!
337
            /* this one can't be restarted, sorry */
338
            DEBUGLOG("A XFR for which a response has already been sent cannot be restarted");
5✔
339
            try {
5✔
340
              TCPResponse response(std::move(pending.second.d_query));
5✔
341
              pending.second.d_sender->notifyIOError(now, std::move(response));
5✔
342
            }
5✔
343
            catch (const std::exception& exp) {
5✔
344
              vinfolog("Got an exception while notifying: %s", exp.what());
×
345
            }
×
346
            catch (...) {
5✔
347
              vinfolog("Got exception while notifying");
×
348
            }
×
349
          }
5✔
350
          else {
1,464✔
351
            conn->d_pendingQueries.push_back(std::move(pending.second));
1,464✔
352
          }
1,464✔
353
        }
1,469✔
354
        conn->d_currentPos = 0;
1,497✔
355

356
        if (conn->d_state == State::sendingQueryToBackend) {
1,497✔
357
          iostate = IOState::NeedWrite;
31✔
358
          // resume sending query
359
        }
31✔
360
        else {
1,466✔
361
          if (conn->d_pendingQueries.empty()) {
1,466✔
362
            throw std::runtime_error("TCP connection to a backend in state " + std::to_string((int)conn->d_state) + " with no pending queries");
5✔
363
          }
5✔
364

365
          iostate = queueNextQuery(conn);
1,461✔
366
        }
1,461✔
367

368
        reconnected = true;
1,492✔
369
        connectionDied = false;
1,492✔
370
      }
1,492✔
371
    }
1,503✔
372
    catch (const std::exception& exp) {
1,503✔
373
      // reconnect might throw on failure, let's ignore that, we just need to know
374
      // it failed
375
    }
11✔
376
  }
1,503✔
377

378
  if (!reconnected) {
1,511✔
379
    /* reconnect failed, we give up */
380
    DEBUGLOG("reconnect failed, we give up");
19✔
381
    ++conn->d_ds->tcpGaveUp;
19✔
382
    conn->notifyAllQueriesFailed(now, FailureReason::gaveUp);
19✔
383
  }
19✔
384
}
1,511✔
385

386

387
void TCPConnectionToBackend::handleIO(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
388
{
36,595✔
389
  if (conn->d_handler == nullptr) {
36,595!
390
    throw std::runtime_error("No downstream socket in " + std::string(__PRETTY_FUNCTION__) + "!");
×
391
  }
×
392

393
  if (conn->d_handlingIO) {
36,595✔
394
    return;
486✔
395
  }
486✔
396
  dnsdist::tcp::HandlingIOGuard reentryGuard(conn->d_handlingIO);
36,109✔
397

398
  bool connectionDied = false;
36,109✔
399
  IOState iostate = IOState::Done;
36,109✔
400
  IOStateGuard ioGuard(conn->d_ioState);
36,109✔
401
  bool reconnected = false;
36,109✔
402

403
  do {
38,582✔
404
    reconnected = false;
38,582✔
405
    conn->d_lastIOBlocked = false;
38,582✔
406

407
    try {
38,582✔
408
      if (conn->d_state == State::sendingQueryToBackend) {
38,582✔
409
        iostate = sendQuery(conn, now);
34,605✔
410

411
        while (iostate == IOState::Done && !conn->d_pendingQueries.empty()) {
34,614✔
412
          queueNextQuery(conn);
9✔
413
          iostate = sendQuery(conn, now);
9✔
414
        }
9✔
415

416
        if (iostate == IOState::Done && conn->d_pendingQueries.empty()) {
34,605!
417
          conn->d_state = State::waitingForResponseFromBackend;
34,476✔
418
          conn->d_currentPos = 0;
34,476✔
419
          conn->d_responseBuffer.resize(sizeof(uint16_t));
34,476✔
420
          iostate = IOState::NeedRead;
34,476✔
421
        }
34,476✔
422
      }
34,605✔
423

424
      if (conn->d_state == State::waitingForResponseFromBackend ||
38,582✔
425
          conn->d_state == State::readingResponseSizeFromBackend) {
38,582✔
426
        DEBUGLOG("reading response size from backend");
37,975✔
427
        // then we need to allocate a new buffer (new because we might need to re-send the query if the
428
        // backend dies on us)
429
        // We also might need to read and send to the client more than one response in case of XFR (yeah!)
430
        conn->d_responseBuffer.resize(sizeof(uint16_t));
37,975✔
431
        iostate = conn->d_handler->tryRead(conn->d_responseBuffer, conn->d_currentPos, sizeof(uint16_t));
37,975✔
432
        if (iostate == IOState::Done) {
37,975✔
433
          DEBUGLOG("got response size from backend");
33,410✔
434
          conn->d_state = State::readingResponseFromBackend;
33,410✔
435
          conn->d_responseSize = conn->d_responseBuffer.at(0) * 256 + conn->d_responseBuffer.at(1);
33,410✔
436
          conn->d_responseBuffer.reserve(conn->d_responseSize + /* we will need to prepend the size later */ 2);
33,410✔
437
          conn->d_responseBuffer.resize(conn->d_responseSize);
33,410✔
438
          conn->d_currentPos = 0;
33,410✔
439
          conn->d_lastDataReceivedTime = now;
33,410✔
440
        }
33,410✔
441
        else {
4,565✔
442
          conn->d_lastIOBlocked = true;
4,565✔
443
          if (conn->d_state == State::waitingForResponseFromBackend && conn->d_currentPos > 0) {
4,565✔
444
            conn->d_state = State::readingResponseSizeFromBackend;
3✔
445
          }
3✔
446
        }
4,565✔
447
      }
37,975✔
448

449
      if (conn->d_state == State::readingResponseFromBackend) {
38,582✔
450
        DEBUGLOG("reading response from backend");
33,888✔
451
        iostate = conn->d_handler->tryRead(conn->d_responseBuffer, conn->d_currentPos, conn->d_responseSize);
33,888✔
452
        if (iostate == IOState::Done) {
33,888✔
453
          DEBUGLOG("got response from backend");
33,410✔
454
          try {
33,410✔
455
            conn->d_lastDataReceivedTime = now;
33,410✔
456
            iostate = conn->handleResponse(conn, now);
33,410✔
457
          }
33,410✔
458
          catch (const std::exception& e) {
33,410✔
459
            vinfolog("Got an exception while handling TCP response from %s (client is %s): %s", conn->d_ds ? conn->d_ds->getNameWithAddr() : "unknown", conn->d_currentQuery.d_query.d_idstate.origRemote.toStringWithPort(), e.what());
3!
460
            ioGuard.release();
3✔
461
            conn->release(true);
3✔
462
            return;
3✔
463
          }
3✔
464
        }
33,410✔
465
        else {
478✔
466
          conn->d_lastIOBlocked = true;
478✔
467
        }
478✔
468
      }
33,888✔
469

470
      if (conn->d_state != State::idle &&
38,579✔
471
          conn->d_state != State::sendingQueryToBackend &&
38,579✔
472
          conn->d_state != State::waitingForResponseFromBackend &&
38,579✔
473
          conn->d_state != State::readingResponseSizeFromBackend &&
38,579✔
474
          conn->d_state != State::readingResponseFromBackend) {
38,579!
475
        vinfolog("Unexpected state %d in TCPConnectionToBackend::handleIO", static_cast<int>(conn->d_state));
×
476
      }
×
477
    }
38,579✔
478
    catch (const std::exception& e) {
38,582✔
479
      /* most likely an EOF because the other end closed the connection,
480
         but it might also be a real IO error or something else.
481
         Let's just drop the connection
482
      */
483
      vinfolog("Got an exception while handling (%s backend) TCP query from %s: %s", (conn->d_state == State::sendingQueryToBackend ? "writing to" : "reading from"), conn->d_currentQuery.d_query.d_idstate.origRemote.toStringWithPort(), e.what());
1,511!
484

485
      if (conn->d_state == State::sendingQueryToBackend) {
1,511✔
486
        ++conn->d_ds->tcpDiedSendingQuery;
38✔
487
      }
38✔
488
      else if (conn->d_state != State::idle) {
1,473!
489
        ++conn->d_ds->tcpDiedReadingResponse;
1,473✔
490
      }
1,473✔
491

492
      /* don't increase this counter when reusing connections */
493
      if (conn->d_fresh) {
1,511✔
494
        ++conn->d_downstreamFailures;
72✔
495
      }
72✔
496

497
      /* remove this FD from the IO multiplexer */
498
      iostate = IOState::Done;
1,511✔
499
      connectionDied = true;
1,511✔
500
    }
1,511✔
501

502
    if (connectionDied) {
38,579✔
503
      handleReconnectionAttempt(conn, now, ioGuard, iostate, reconnected, connectionDied);
1,511✔
504
    }
1,511✔
505

506
    if (conn->d_ioState) {
38,579✔
507
      if (iostate == IOState::Done) {
38,557✔
508
        conn->d_ioState->update(iostate, handleIOCallback, conn);
32,423✔
509
      }
32,423✔
510
      else {
6,134✔
511
        boost::optional<struct timeval> ttd{boost::none};
6,134✔
512
        if (iostate == IOState::NeedRead) {
6,134✔
513
          ttd = conn->getBackendReadTTD(now);
4,141✔
514
        }
4,141✔
515
        else if (conn->isFresh() && conn->d_queries == 0) {
1,993!
516
          /* first write just after the non-blocking connect */
517
          ttd = conn->getBackendConnectTTD(now);
1,504✔
518
        }
1,504✔
519
        else {
489✔
520
          ttd = conn->getBackendWriteTTD(now);
489✔
521
        }
489✔
522

523
        conn->d_ioState->update(iostate, handleIOCallback, conn, ttd);
6,134✔
524
      }
6,134✔
525
    }
38,557✔
526
  }
38,579✔
527
  while (reconnected || (iostate != IOState::Done && !conn->d_connectionDied && !conn->d_lastIOBlocked));
38,579!
528

529
  ioGuard.release();
36,106✔
530
}
36,106✔
531

532
void TCPConnectionToBackend::handleIOCallback(int fd, FDMultiplexer::funcparam_t& param)
533
{
3,570✔
534
  auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(param);
3,570✔
535
  if (fd != conn->getHandle()) {
3,570!
536
    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->getHandle()));
×
537
  }
×
538

539
  struct timeval now;
3,570✔
540
  gettimeofday(&now, nullptr);
3,570✔
541
  handleIO(conn, now);
3,570✔
542
}
3,570✔
543

544
void TCPConnectionToBackend::queueQuery(std::shared_ptr<TCPQuerySender>& sender, TCPQuery&& query)
545
{
33,040✔
546
  if (!d_ioState) {
33,040✔
547
    d_ioState = make_unique<IOStateHandler>(*d_mplexer, d_handler->getDescriptor());
1,006✔
548
  }
1,006✔
549

550
  // if we are not already sending a query or in the middle of reading a response (so idle),
551
  // start sending the query
552
  if (d_state == State::idle || d_state == State::waitingForResponseFromBackend) {
33,040✔
553
    DEBUGLOG("Sending new query to backend right away, with ID "<<d_highestStreamID);
33,025✔
554
    d_state = State::sendingQueryToBackend;
33,025✔
555
    d_currentPos = 0;
33,025✔
556

557
    uint16_t id = d_highestStreamID;
33,025✔
558

559
    d_currentQuery = PendingRequest({sender, std::move(query)});
33,025✔
560
    prepareQueryForSending(d_currentQuery.d_query, id, needProxyProtocolPayload() ? ConnectionState::needProxy : ConnectionState::proxySent);
33,025✔
561

562
    struct timeval now;
33,025✔
563
    gettimeofday(&now, 0);
33,025✔
564

565
    auto shared = std::dynamic_pointer_cast<TCPConnectionToBackend>(shared_from_this());
33,025✔
566
    handleIO(shared, now);
33,025✔
567
  }
33,025✔
568
  else {
15✔
569
    DEBUGLOG("Adding new query to the queue because we are in state "<<(int)d_state);
15✔
570
    // store query in the list of queries to send
571
    d_pendingQueries.push_back(PendingRequest({sender, std::move(query)}));
15✔
572
  }
15✔
573
}
33,040✔
574

575
void TCPConnectionToBackend::handleTimeout(const struct timeval& now, bool write)
576
{
25✔
577
  /* in some cases we could retry, here, reconnecting and sending our pending responses again */
578
  if (write) {
25✔
579
    if (isFresh() && d_queries == 0) {
6!
580
      ++d_ds->tcpConnectTimeouts;
3✔
581
      vinfolog("Timeout while connecting to TCP backend %s", d_ds->getNameWithAddr());
3!
582
    }
3✔
583
    else {
3✔
584
      ++d_ds->tcpWriteTimeouts;
3✔
585
      vinfolog("Timeout while writing to TCP backend %s", d_ds->getNameWithAddr());
3!
586
    }
3✔
587
  }
6✔
588
  else {
19✔
589
    ++d_ds->tcpReadTimeouts;
19✔
590
    vinfolog("Timeout while reading from TCP backend %s", d_ds->getNameWithAddr());
19✔
591
  }
19✔
592

593
  try {
25✔
594
    notifyAllQueriesFailed(now, FailureReason::timeout);
25✔
595
  }
25✔
596
  catch (const std::exception& e) {
25✔
597
    vinfolog("Got an exception while notifying a timeout: %s", e.what());
×
598
  }
×
599
  catch (...) {
25✔
600
    vinfolog("Got exception while notifying a timeout");
×
601
  }
×
602

603
  release(true);
25✔
604
}
25✔
605

606
void TCPConnectionToBackend::notifyAllQueriesFailed(const struct timeval& now, FailureReason reason)
607
{
50✔
608
  d_connectionDied = true;
50✔
609
  d_ds->reportTimeoutOrError();
50✔
610

611
  /* we might be terminated while notifying a query sender */
612
  d_ds->outstanding -= d_pendingResponses.size();
50✔
613
  auto pendingQueries = std::move(d_pendingQueries);
50✔
614
  d_pendingQueries.clear();
50✔
615
  auto pendingResponses = std::move(d_pendingResponses);
50✔
616
  d_pendingResponses.clear();
50✔
617

618
  auto increaseCounters = [reason](const ClientState* cs) {
56✔
619
    if (reason == FailureReason::timeout) {
54✔
620
      if (cs) {
34!
621
        ++cs->tcpDownstreamTimeouts;
34✔
622
      }
34✔
623
    }
34✔
624
    else if (reason == FailureReason::gaveUp) {
20✔
625
      if (cs) {
14!
626
        ++cs->tcpGaveUp;
14✔
627
      }
14✔
628
    }
14✔
629
  };
54✔
630

631
  const auto& chains = dnsdist::configuration::getCurrentRuntimeConfiguration().d_ruleChains;
50✔
632
  const auto& timeoutRespRules = dnsdist::rules::getResponseRuleChain(chains, dnsdist::rules::ResponseRuleChain::TimeoutResponseRules);
50✔
633

634
  try {
50✔
635
    if (d_state == State::sendingQueryToBackend) {
50✔
636
      increaseCounters(d_currentQuery.d_query.d_idstate.cs);
13✔
637
      auto sender = std::move(d_currentQuery.d_sender);
13✔
638
      if (sender->active()) {
13!
639
        if (!handleTimeoutResponseRules(timeoutRespRules, d_currentQuery.d_query.d_idstate, d_ds, sender)) {
13!
640
          TCPResponse response(std::move(d_currentQuery.d_query));
13✔
641
          sender->notifyIOError(now, std::move(response));
13✔
642
        }
13✔
643
      }
13✔
644
    }
13✔
645

646
    for (auto& query : pendingQueries) {
50✔
647
      increaseCounters(query.d_query.d_idstate.cs);
6✔
648
      auto sender = std::move(query.d_sender);
6✔
649
      if (sender->active()) {
6!
650
        if (!handleTimeoutResponseRules(timeoutRespRules, query.d_query.d_idstate, d_ds, sender)) {
×
651
          TCPResponse response(std::move(query.d_query));
×
652
          sender->notifyIOError(now, std::move(response));
×
653
        }
×
654
      }
×
655
    }
6✔
656

657
    for (auto& response : pendingResponses) {
50✔
658
      increaseCounters(response.second.d_query.d_idstate.cs);
35✔
659
      auto sender = std::move(response.second.d_sender);
35✔
660
      if (sender->active()) {
35✔
661
        if (!handleTimeoutResponseRules(timeoutRespRules, response.second.d_query.d_idstate, d_ds, sender)) {
29✔
662
          TCPResponse tresp(std::move(response.second.d_query));
19✔
663
          sender->notifyIOError(now, std::move(tresp));
19✔
664
        }
19✔
665
      }
29✔
666
    }
35✔
667
  }
50✔
668
  catch (const std::exception& e) {
50✔
669
    vinfolog("Got an exception while notifying: %s", e.what());
×
670
  }
×
671
  catch (...) {
50✔
672
    vinfolog("Got exception while notifying");
×
673
  }
×
674

675
  release(true);
50✔
676
}
50✔
677

678
/* Process one complete response that has been read from the backend into
   d_responseBuffer.
   - The DNS ID of the response selects the matching entry in d_pendingResponses.
     If the ID cannot be read, or no pending entry matches it, every query on this
     connection is failed with FailureReason::unexpectedQueryID. When the ID cannot
     be read, the exception from getQueryIdFromResponse() is also rethrown.
   - editPayloadID() is called with the query's origID, presumably restoring the
     ID the client originally used, before the response goes to the sender.
   - XFR responses for a still-active sender are streamed one message at a time.
     The pending entry is kept until isXFRFinished() reports the end of the transfer.
   Returns the next I/O the caller should wait for: NeedRead when more responses
   are expected, NeedWrite when a query is still being sent, the result of
   queueNextQuery() when queries are queued, or Done when there is nothing left to
   do (or the connection has been failed). */
IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
679
{
33,410✔
680
  // the backend answered, so reset the downstream failure counter
  d_downstreamFailures = 0;
33,410✔
681

682
  uint16_t queryId = 0;
33,410✔
683
  try {
33,410✔
684
    queryId = getQueryIdFromResponse();
33,410✔
685
  }
33,410✔
686
  catch (const std::exception& e) {
33,410✔
687
    // the response is too short to carry an ID: fail the whole connection and let the caller see the exception
    DEBUGLOG("Unable to get query ID");
3✔
688
    notifyAllQueriesFailed(now, FailureReason::unexpectedQueryID);
3✔
689
    throw;
3✔
690
  }
3✔
691

692
  // match the response to an in-flight query using its DNS ID
  auto it = d_pendingResponses.find(queryId);
33,407✔
693
  if (it == d_pendingResponses.end()) {
33,407✔
694
    DEBUGLOG("could not find any corresponding query for ID "<<queryId<<". This is likely a duplicated ID over the same TCP connection, giving up!");
3✔
695
    notifyAllQueriesFailed(now, FailureReason::unexpectedQueryID);
3✔
696
    return IOState::Done;
3✔
697
  }
3✔
698

699
  editPayloadID(d_responseBuffer, ntohs(it->second.d_query.d_idstate.origID), 0, false);
33,404✔
700

701
  // copy (not move) the sender: for XFRs the pending entry must keep its sender for the messages to come
  auto sender = it->second.d_sender;
33,404✔
702

703
  // XFR to an active sender: stream this message. An XFR whose sender is no longer
  // active falls through to the regular path below, which drops the pending entry.
  if (sender->active() && it->second.d_query.isXFR()) {
33,404✔
704
    DEBUGLOG("XFR!");
455✔
705
    bool done = false;
455✔
706
    TCPResponse response;
455✔
707
    response.d_buffer = std::move(d_responseBuffer);
455✔
708
    response.d_connection = conn;
455✔
709
    response.d_ds = conn->d_ds;
455✔
710
    const auto& queryIDS = it->second.d_query.d_idstate;
455✔
711
    /* we don't move the whole IDS because we will need it for the responses to come */
712
    response.d_idstate = queryIDS.partialCloneForXFR();
455✔
713
    DEBUGLOG("passing XFRresponse to client connection for "<<response.d_idstate.qname);
455✔
714

715
    it->second.d_query.d_xfrStarted = true;
455✔
716
    // updates the SOA-tracking state stored in the pending query
    done = isXFRFinished(response, it->second.d_query);
455✔
717

718
    if (done) {
455✔
719
      // `it` is invalidated here; only `response` and `sender` are used from now on
      d_pendingResponses.erase(it);
23✔
720
      --conn->d_ds->outstanding;
23✔
721
      /* marking as idle for now, so we can accept new queries if our queues are empty */
722
      if (d_pendingQueries.empty() && d_pendingResponses.empty()) {
23!
723
        d_state = State::idle;
23✔
724
        t_downstreamTCPConnectionsManager.moveToIdle(conn);
23✔
725
      }
23✔
726
    }
23✔
727

728
    sender->handleXFRResponse(now, std::move(response));
455✔
729
    /* NOTE(review): unlike the regular path below, this marks the connection idle
       and returns Done unconditionally once the transfer is finished, even if
       d_pendingQueries / d_pendingResponses are not empty or handleXFRResponse()
       assigned a new query to us. Confirm that a connection carrying an XFR never
       carries other queries, otherwise those would be left behind. */
    if (done) {
455✔
730
      d_state = State::idle;
23✔
731
      t_downstreamTCPConnectionsManager.moveToIdle(conn);
23✔
732
      return IOState::Done;
23✔
733
    }
23✔
734

735
    // transfer not finished: keep the pending entry and read the next message's length prefix
    d_state = State::waitingForResponseFromBackend;
432✔
736
    d_currentPos = 0;
432✔
737
    d_responseBuffer.resize(sizeof(uint16_t));
432✔
738
    // get ready to read the next packet, if any
739
    return IOState::NeedRead;
432✔
740
  }
455✔
741

742
  // regular response: this query is complete, update the backend's latency and response statistics
  --conn->d_ds->outstanding;
32,949✔
743
  auto ids = std::move(it->second.d_query.d_idstate);
32,949✔
744
  const double udiff = ids.queryRealTime.udiff();
32,949✔
745
  conn->d_ds->updateTCPLatency(udiff);
32,949✔
746
  // getQueryIdFromResponse() already required a full header, so the else branch should not normally be taken
  if (d_responseBuffer.size() >= sizeof(dnsheader)) {
32,949!
747
    dnsheader dh;
32,949✔
748
    memcpy(&dh, d_responseBuffer.data(), sizeof(dh));
32,949✔
749
    conn->d_ds->reportResponse(dh.rcode);
32,949✔
750
  }
32,949✔
751
  else {
×
752
    conn->d_ds->reportTimeoutOrError();
×
753
  }
×
754

755
  d_pendingResponses.erase(it);
32,949✔
756
  /* marking as idle for now, so we can accept new queries if our queues are empty */
757
  if (d_pendingQueries.empty() && d_pendingResponses.empty()) {
32,949✔
758
    d_state = State::idle;
32,796✔
759
    t_downstreamTCPConnectionsManager.moveToIdle(conn);
32,796✔
760
  }
32,796✔
761
  else if (!d_pendingResponses.empty()) {
153✔
762
    d_currentPos = 0;
150✔
763
    d_state = State::waitingForResponseFromBackend;
150✔
764
  }
150✔
765

766
  // be very careful that handleResponse() might trigger new queries being assigned to us,
767
  // which may reset our d_currentPos, d_state and/or d_responseBuffer, so we cannot assume
768
  // anything without checking first
769
  // local copy of the shared pointer: passed to queueNextQuery() below, and presumably
  // also keeps this connection alive while the sender's callback runs
  auto shared = conn;
32,949✔
770
  if (sender->active()) {
32,949✔
771
    DEBUGLOG("passing response to client connection for "<<ids.qname);
32,946✔
772
    // make sure that we still exist after calling handleResponse()
773
    TCPResponse response(std::move(d_responseBuffer), std::move(ids), conn, conn->d_ds);
32,946✔
774
    sender->handleResponse(now, std::move(response));
32,946✔
775
  }
32,946✔
776

777
  // decide what to wait for next, re-reading our state since the callback above may have changed it
  if (!d_pendingQueries.empty()) {
32,949✔
778
    DEBUGLOG("still have some queries to send");
3✔
779
    return queueNextQuery(shared);
3✔
780
  }
3✔
781
  if (d_state == State::sendingQueryToBackend) {
32,946✔
782
    DEBUGLOG("still have a query to send");
486✔
783
    return IOState::NeedWrite;
486✔
784
  }
486✔
785
  if (!d_pendingResponses.empty()) {
32,460✔
786
    DEBUGLOG("still have some responses to read");
60✔
787
    return IOState::NeedRead;
60✔
788
  }
60✔
789

790
  DEBUGLOG("nothing to do, waiting for a new query");
32,400✔
791
  d_state = State::idle;
32,400✔
792
  t_downstreamTCPConnectionsManager.moveToIdle(conn);
32,400✔
793
  return IOState::Done;
32,400✔
794
}
32,460✔
795

796
uint16_t TCPConnectionToBackend::getQueryIdFromResponse() const
797
{
33,410✔
798
  if (d_responseBuffer.size() < sizeof(dnsheader)) {
33,410✔
799
    throw std::runtime_error("Unable to get query ID in a too small (" + std::to_string(d_responseBuffer.size()) + ") response from " + d_ds->getNameWithAddr());
3✔
800
  }
3✔
801

802
  uint16_t id;
33,407✔
803
  memcpy(&id, &d_responseBuffer.at(0), sizeof(id));
33,407✔
804
  return ntohs(id);
33,407✔
805
}
33,410✔
806

807
void TCPConnectionToBackend::setProxyProtocolValuesSent(std::unique_ptr<std::vector<ProxyProtocolValue>>&& proxyProtocolValuesSent)
808
{
31✔
809
  /* if we already have some values, we have already verified they match */
810
  if (!d_proxyProtocolValuesSent) {
31✔
811
    d_proxyProtocolValuesSent = std::move(proxyProtocolValuesSent);
16✔
812
  }
16✔
813
}
31✔
814

815
bool TCPConnectionToBackend::matchesTLVs(const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs) const
816
{
41✔
817
  if (tlvs == nullptr) {
41✔
818
    if (d_proxyProtocolValuesSent == nullptr) {
21!
819
      return true;
21✔
820
    }
21✔
821
    else {
×
822
      return false;
×
823
    }
×
824
  }
21✔
825

826
  if (d_proxyProtocolValuesSent == nullptr) {
20!
827
    return false;
×
828
  }
×
829

830
  return *tlvs == *d_proxyProtocolValuesSent;
20✔
831
}
20✔
832

833
bool TCPConnectionToBackend::isXFRFinished(const TCPResponse& response, TCPQuery& query)
834
{
455✔
835
  bool done = false;
455✔
836

837
  try {
455✔
838
    MOADNSParser parser(true, reinterpret_cast<const char*>(response.d_buffer.data()), response.d_buffer.size());
455✔
839

840
    if (parser.d_header.rcode != 0U) {
455!
841
      done = true;
×
842
    }
×
843
    else {
455✔
844
      for (const auto& record : parser.d_answers) {
515✔
845
        if (record.d_class != QClass::IN || record.d_type != QType::SOA) {
515!
846
          continue;
449✔
847
        }
449✔
848

849
        auto unknownContent = getRR<UnknownRecordContent>(record);
66✔
850
        if (!unknownContent) {
66!
851
          continue;
×
852
        }
×
853
        const auto& raw = unknownContent->getRawContent();
66✔
854
        auto serial = getSerialFromRawSOAContent(raw);
66✔
855
        if (query.d_xfrPrimarySerial == 0) {
66✔
856
          // store the first SOA in our client's connection metadata
857
          query.d_xfrPrimarySerial = serial;
27✔
858
          if (query.d_idstate.qtype == QType::IXFR && (query.d_xfrPrimarySerial == query.d_ixfrQuerySerial || rfc1982LessThan(query.d_xfrPrimarySerial, query.d_ixfrQuerySerial))) {
27!
859
            /* This is the first message with a primary SOA:
860
               RFC 1995 Section 2:
861
                 If an IXFR query with the same or newer version number
862
                 than that of the server is received, it is replied to
863
                 with a single SOA record of the server's current version.
864
            */
865
            done = true;
×
866
            break;
×
867
          }
×
868
        }
27✔
869

870
        ++query.d_xfrSerialCount;
66✔
871
        if (serial == query.d_xfrPrimarySerial) {
66✔
872
          ++query.d_xfrPrimarySerialCount;
54✔
873
          // figure out if it's end when receiving primary's SOA again
874
          if (query.d_xfrSerialCount == 2) {
54✔
875
            // if there are only two SOA records marks a finished AXFR
876
            done = true;
19✔
877
            break;
19✔
878
          }
19✔
879
          if (query.d_xfrPrimarySerialCount == 3) {
35✔
880
            // receiving primary's SOA 3 times marks a finished IXFR
881
            done = true;
4✔
882
            break;
4✔
883
          }
4✔
884
        }
35✔
885
      }
66✔
886
    }
455✔
887
  }
455✔
888
  catch (const MOADNSException& e) {
455✔
889
    DEBUGLOG("Exception when parsing TCPResponse to DNS: " << e.what());
×
890
    /* ponder what to do here, shall we close the connection? */
891
  }
×
892
  return done;
455✔
893
}
455✔
894

895
void setTCPDownstreamMaxIdleConnectionsPerBackend(uint64_t max)
896
{
400✔
897
  DownstreamTCPConnectionsManager::setMaxIdleConnectionsPerDownstream(max);
400✔
898
}
400✔
899

900
void setTCPDownstreamCleanupInterval(uint64_t interval)
901
{
400✔
902
  DownstreamTCPConnectionsManager::setCleanupInterval(interval);
400✔
903
}
400✔
904

905
void setTCPDownstreamMaxIdleTime(uint64_t max)
906
{
400✔
907
  DownstreamTCPConnectionsManager::setMaxIdleTime(max);
400✔
908
}
400✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc