• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PowerDNS / pdns / 10789862120

10 Sep 2024 09:38AM UTC coverage: 55.763% (-0.03%) from 55.792%
10789862120

push

github

web-flow
Merge pull request #14641 from rgacogne/ddist19-backport-14573

dnsdist-1.9.x: Backport 14573 - Stop reporting timeouts in `topSlow()`, add `topTimeouts()`

13812 of 45324 branches covered (30.47%)

Branch coverage included in aggregate %.

3 of 9 new or added lines in 1 file covered. (33.33%)

13 existing lines in 4 files now uncovered.

47994 of 65513 relevant lines covered (73.26%)

3755251.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.88
/pdns/dnsdist-tcp.cc
1
/*
2
 * This file is part of PowerDNS or dnsdist.
3
 * Copyright -- PowerDNS.COM B.V. and its contributors
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of version 2 of the GNU General Public License as
7
 * published by the Free Software Foundation.
8
 *
9
 * In addition, for the avoidance of any doubt, permission is granted to
10
 * link this program with OpenSSL and to (re)distribute the binaries
11
 * produced as the result of such linking.
12
 *
13
 * This program is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
 * GNU General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU General Public License
19
 * along with this program; if not, write to the Free Software
20
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
 */
22

23
#include <thread>
24
#include <netinet/tcp.h>
25
#include <queue>
26

27
#include "dnsdist.hh"
28
#include "dnsdist-concurrent-connections.hh"
29
#include "dnsdist-dnsparser.hh"
30
#include "dnsdist-ecs.hh"
31
#include "dnsdist-nghttp2-in.hh"
32
#include "dnsdist-proxy-protocol.hh"
33
#include "dnsdist-rings.hh"
34
#include "dnsdist-tcp.hh"
35
#include "dnsdist-tcp-downstream.hh"
36
#include "dnsdist-downstream-connection.hh"
37
#include "dnsdist-tcp-upstream.hh"
38
#include "dnsdist-xpf.hh"
39
#include "dnsparser.hh"
40
#include "dolog.hh"
41
#include "gettime.hh"
42
#include "lock.hh"
43
#include "sstuff.hh"
44
#include "tcpiohandler.hh"
45
#include "tcpiohandler-mplexer.hh"
46
#include "threadname.hh"
47

48
/* TCP: the grand design.
49
   We forward 'messages' between clients and downstream servers. Messages are 65k bytes large, tops.
50
   An answer might theoretically consist of multiple messages (for example, in the case of AXFR), initially
51
   we will not go there.
52

53
   In a sense there is a strong symmetry between UDP and TCP, once a connection to a downstream has been setup.
54
   This symmetry is broken because of head-of-line blocking within TCP though, necessitating additional connections
55
   to guarantee performance.
56

57
   So the idea is to have a 'pool' of available downstream connections, and forward messages to/from them and never queue.
58
   So whenever an answer comes in, we know where it needs to go.
59

60
   Let's start naively.
61
*/
62

63
// Maximum number of queries allowed on a single incoming TCP connection (0 means no limit)
size_t g_maxTCPQueriesPerConn{0};
// Maximum duration of an incoming TCP connection (0 presumably means no limit — enforced by maxConnectionDurationReached)
size_t g_maxTCPConnectionDuration{0};

#ifdef __linux__
// On Linux this gives us 128k pending queries (default is 8192 queries),
// which should be enough to deal with huge spikes
size_t g_tcpInternalPipeBufferSize{1048576U};
uint64_t g_maxTCPQueuedConnections{10000};
#else
size_t g_tcpInternalPipeBufferSize{0};
uint64_t g_maxTCPQueuedConnections{1000};
#endif

// Receive/send timeouts applied to client-facing TCP sockets
int g_tcpRecvTimeout{2};
int g_tcpSendTimeout{2};
// Incremented to request a dump of the TCP connections' states from the workers
std::atomic<uint64_t> g_tcpStatesDumpRequested{0};

// Per-client-address count of concurrent incoming TCP connections, guarded by a lock
LockGuarded<std::map<ComboAddress, size_t, ComboAddress::addressOnlyLessThan>> dnsdist::IncomingConcurrentTCPConnectionsManager::s_tcpClientsConcurrentConnectionsCount;
// Maximum number of concurrent TCP connections per client address (0 presumably means no limit — TODO confirm in the manager)
size_t dnsdist::IncomingConcurrentTCPConnectionsManager::s_maxTCPConnectionsPerClient = 0;
82

83
IncomingTCPConnectionState::~IncomingTCPConnectionState()
{
  // one less concurrent connection for this client address
  dnsdist::IncomingConcurrentTCPConnectionsManager::accountClosedTCPConnection(d_ci.remote);

  if (d_ci.cs != nullptr) {
    timeval now{};
    gettimeofday(&now, nullptr);

    // update the frontend's TCP metrics with the number of queries handled
    // and the duration of this connection, in milliseconds
    auto diff = now - d_connectionStartTime;
    d_ci.cs->updateTCPMetrics(d_queriesCount, diff.tv_sec * 1000 + diff.tv_usec / 1000);
  }

  // would have been done when the object is destroyed anyway,
  // but that way we make sure it's done before the ConnectionInfo is destroyed,
  // closing the descriptor, instead of relying on the declaration order of the objects in the class
  d_handler.close();
}
100

101
/* Protocol spoken by the client on this connection: DoH when the frontend
   is a DoH one, DoT when the handler negotiated TLS, plain DNS over TCP
   otherwise. */
dnsdist::Protocol IncomingTCPConnectionState::getProtocol() const
{
  if (d_ci.cs->dohFrontend) {
    return dnsdist::Protocol::DoH;
  }
  return d_handler.isTLS() ? dnsdist::Protocol::DoT : dnsdist::Protocol::DoTCP;
}
111

112
// Drop every cached downstream TCP connection held by this worker thread's
// connection manager, returning the number of connections cleared
size_t IncomingTCPConnectionState::clearAllDownstreamConnections()
{
  return t_downstreamTCPConnectionsManager.clear();
}
116

117
std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getDownstreamConnection(std::shared_ptr<DownstreamState>& backend, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs, const struct timeval& now)
118
{
2,014✔
119
  auto downstream = getOwnedDownstreamConnection(backend, tlvs);
2,014✔
120

121
  if (!downstream) {
2,014✔
122
    /* we don't have a connection to this backend owned yet, let's get one (it might not be a fresh one, though) */
123
    downstream = t_downstreamTCPConnectionsManager.getConnectionToDownstream(d_threadData.mplexer, backend, now, std::string());
1,982✔
124
    if (backend->d_config.useProxyProtocol) {
1,982✔
125
      registerOwnedDownstreamConnection(downstream);
15✔
126
    }
15✔
127
  }
1,982✔
128

129
  return downstream;
2,014✔
130
}
2,014✔
131

132
static void tcpClientThread(pdns::channel::Receiver<ConnectionInfo>&& queryReceiver, pdns::channel::Receiver<CrossProtocolQuery>&& crossProtocolQueryReceiver, pdns::channel::Receiver<TCPCrossProtocolResponse>&& crossProtocolResponseReceiver, pdns::channel::Sender<TCPCrossProtocolResponse>&& crossProtocolResponseSender, std::vector<ClientState*> tcpAcceptStates);
133

134
/* Create the collection and immediately spawn one TCP worker thread
   per available slot. */
TCPClientCollection::TCPClientCollection(size_t maxThreads, std::vector<ClientState*> tcpAcceptStates) :
  d_tcpclientthreads(maxThreads), d_maxthreads(maxThreads)
{
  for (size_t workerIndex = 0; workerIndex < maxThreads; ++workerIndex) {
    addTCPClientThread(tcpAcceptStates);
  }
}
141

142
/* Spawn a new detached TCP worker thread, wired to the rest of the process
   through three channels: new incoming connections, cross-protocol queries,
   and cross-protocol responses. On any failure the error is logged and the
   worker is simply not added. */
void TCPClientCollection::addTCPClientThread(std::vector<ClientState*>& tcpAcceptStates)
{
  try {
    // channel used to pass accepted connections to the worker
    auto [queryChannelSender, queryChannelReceiver] = pdns::channel::createObjectQueue<ConnectionInfo>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, g_tcpInternalPipeBufferSize);

    // channel used to pass queries originating from another protocol/thread
    auto [crossProtocolQueryChannelSender, crossProtocolQueryChannelReceiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, g_tcpInternalPipeBufferSize);

    // channel used to pass back responses for cross-protocol queries
    auto [crossProtocolResponseChannelSender, crossProtocolResponseChannelReceiver] = pdns::channel::createObjectQueue<TCPCrossProtocolResponse>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, g_tcpInternalPipeBufferSize);

    vinfolog("Adding TCP Client thread");

    if (d_numthreads >= d_tcpclientthreads.size()) {
      vinfolog("Adding a new TCP client thread would exceed the vector size (%d/%d), skipping. Consider increasing the maximum amount of TCP client threads with setMaxTCPClientThreads() in the configuration.", d_numthreads.load(), d_tcpclientthreads.size());
      return;
    }

    // the worker descriptor keeps the sending ends; the thread gets the receiving ends
    TCPWorkerThread worker(std::move(queryChannelSender), std::move(crossProtocolQueryChannelSender));

    try {
      std::thread clientThread(tcpClientThread, std::move(queryChannelReceiver), std::move(crossProtocolQueryChannelReceiver), std::move(crossProtocolResponseChannelReceiver), std::move(crossProtocolResponseChannelSender), tcpAcceptStates);
      clientThread.detach();
    }
    catch (const std::runtime_error& e) {
      errlog("Error creating a TCP thread: %s", e.what());
      return;
    }

    // only register the worker once the thread has been successfully started
    d_tcpclientthreads.at(d_numthreads) = std::move(worker);
    ++d_numthreads;
  }
  catch (const std::exception& e) {
    errlog("Error creating TCP worker: %s", e.what());
  }
}
176

177
// The collection of worker threads handling incoming TCP connections
std::unique_ptr<TCPClientCollection> g_tcpclientthreads;
178

179
/* Flush the queue of pending responses for this connection, one at a time.
   Returns IOState::Done once the queue has been drained (or the connection
   is no longer active), or the I/O state of the first send that could not
   complete immediately. */
static IOState sendQueuedResponses(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
{
  while (state->active() && !state->d_queuedResponses.empty()) {
    DEBUGLOG("queue size is " << state->d_queuedResponses.size() << ", sending the next one");
    TCPResponse nextResponse = std::move(state->d_queuedResponses.front());
    state->d_queuedResponses.pop_front();
    state->d_state = IncomingTCPConnectionState::State::idle;
    auto iostate = state->sendResponse(now, std::move(nextResponse));
    if (iostate != IOState::Done) {
      /* blocked (or failed): report what the caller should wait for */
      return iostate;
    }
  }

  state->d_state = IncomingTCPConnectionState::State::idle;
  return IOState::Done;
}
197

198
/* Called once a response has been fully written to the client: updates the
   in-flight counter and the response metrics/logs. AXFR/IXFR answers can be
   made of several responses to a single query, so they are not accounted
   as completing a query here. */
void IncomingTCPConnectionState::handleResponseSent(TCPResponse& currentResponse, size_t sentBytes)
{
  if (currentResponse.d_idstate.qtype == QType::AXFR || currentResponse.d_idstate.qtype == QType::IXFR) {
    return;
  }

  --d_currentQueriesCount;

  const auto& backend = currentResponse.d_connection ? currentResponse.d_connection->getDS() : currentResponse.d_ds;
  if (!currentResponse.d_idstate.selfGenerated && backend) {
    /* the answer was relayed from a backend */
    const auto& ids = currentResponse.d_idstate;
    double udiff = ids.queryRealTime.udiff();
    vinfolog("Got answer from %s, relayed to %s (%s, %d bytes), took %f us", backend->d_config.remote.toStringWithPort(), ids.origRemote.toStringWithPort(), getProtocol().toString(), sentBytes, udiff);

    auto backendProtocol = backend->getProtocol();
    /* a nominally-UDP backend answered over TCP: report the actual transport */
    if (backendProtocol == dnsdist::Protocol::DoUDP && !currentResponse.d_idstate.forwardedOverUDP) {
      backendProtocol = dnsdist::Protocol::DoTCP;
    }
    ::handleResponseSent(ids, udiff, d_ci.remote, backend->d_config.remote, static_cast<unsigned int>(currentResponse.d_buffer.size()), currentResponse.d_cleartextDH, backendProtocol, true);
  }
  else {
    /* self-generated answer (no backend involved) */
    const auto& ids = currentResponse.d_idstate;
    ::handleResponseSent(ids, 0., d_ci.remote, ComboAddress(), static_cast<unsigned int>(currentResponse.d_buffer.size()), currentResponse.d_cleartextDH, ids.protocol, false);
  }

  /* release the buffer and the backend connection as soon as possible */
  currentResponse.d_buffer.clear();
  currentResponse.d_connection.reset();
}
226

227
/* Prepend the 16-bit, network-byte-order length field required by DNS over
   TCP in front of the query payload, leaving any proxy protocol payload of
   proxyProtocolPayloadSize bytes at the very beginning of the buffer
   untouched.

   Throws std::runtime_error if the buffer contains no query payload after
   the proxy protocol one. */
static void prependSizeToTCPQuery(PacketBuffer& buffer, size_t proxyProtocolPayloadSize)
{
  if (buffer.size() <= proxyProtocolPayloadSize) {
    /* the message used to be inverted ("payload size is smaller or equal to
       the buffer size"); the actual error is the opposite */
    throw std::runtime_error("The proxy protocol payload size is larger than or equal to the buffer size");
  }

  /* the proxy protocol payload, if any, is not covered by the DNS length
     field; when proxyProtocolPayloadSize is 0 this is just buffer.size() */
  const uint16_t queryLen = static_cast<uint16_t>(buffer.size() - proxyProtocolPayloadSize);
  const std::array<uint8_t, 2> sizeBytes{static_cast<uint8_t>(queryLen / 256), static_cast<uint8_t>(queryLen % 256)};
  /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes
     that could occur if we had to deal with the size during the processing,
     especially alignment issues */
  buffer.insert(buffer.begin() + static_cast<PacketBuffer::iterator::difference_type>(proxyProtocolPayloadSize), sizeBytes.begin(), sizeBytes.end());
}
240

241
/* Whether this connection may read another query, based on previous errors,
   the number of queries currently in flight, the total number of queries
   seen on the connection, and the connection's duration. */
bool IncomingTCPConnectionState::canAcceptNewQueries(const struct timeval& now)
{
  if (d_hadErrors) {
    DEBUGLOG("not accepting new queries because we encountered some error during the processing already");
    return false;
  }

  // for DoH, this is already handled by the underlying library
  if (!d_ci.cs->dohFrontend && d_currentQueriesCount >= d_ci.cs->d_maxInFlightQueriesPerConn) {
    DEBUGLOG("not accepting new queries because we already have " << d_currentQueriesCount << " out of " << d_ci.cs->d_maxInFlightQueriesPerConn);
    return false;
  }

  // a limit of 0 means no limit on the number of queries per connection
  if (g_maxTCPQueriesPerConn != 0 && d_queriesCount > g_maxTCPQueriesPerConn) {
    vinfolog("not accepting new queries from %s because it reached the maximum number of queries per conn (%d / %d)", d_ci.remote.toStringWithPort(), d_queriesCount, g_maxTCPQueriesPerConn);
    return false;
  }

  if (maxConnectionDurationReached(g_maxTCPConnectionDuration, now)) {
    vinfolog("not accepting new queries from %s because it reached the maximum TCP connection duration", d_ci.remote.toStringWithPort());
    return false;
  }

  return true;
}
266

267
// Reset the per-query read state so the next query can be received on this
// connection
void IncomingTCPConnectionState::resetForNewQuery()
{
  d_buffer.clear();
  d_currentPos = 0;
  d_querySize = 0;
  d_state = State::waitingForQuery;
}
274

275
std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getOwnedDownstreamConnection(const std::shared_ptr<DownstreamState>& backend, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs)
276
{
2,014✔
277
  auto connIt = d_ownedConnectionsToBackend.find(backend);
2,014✔
278
  if (connIt == d_ownedConnectionsToBackend.end()) {
2,014✔
279
    DEBUGLOG("no owned connection found for " << backend->getName());
1,982✔
280
    return nullptr;
1,982✔
281
  }
1,982✔
282

283
  for (auto& conn : connIt->second) {
32!
284
    if (conn->canBeReused(true) && conn->matchesTLVs(tlvs)) {
32!
285
      DEBUGLOG("Got one owned connection accepting more for " << backend->getName());
32✔
286
      conn->setReused();
32✔
287
      return conn;
32✔
288
    }
32✔
289
    DEBUGLOG("not accepting more for " << backend->getName());
×
290
  }
×
291

292
  return nullptr;
×
293
}
32✔
294

295
// Take ownership of a backend connection so it stays tied to this incoming
// connection instead of being returned to the shared pool (callers use this
// for proxy-protocol backends, whose connections carry client-specific data)
void IncomingTCPConnectionState::registerOwnedDownstreamConnection(std::shared_ptr<TCPConnectionToBackend>& conn)
{
  d_ownedConnectionsToBackend[conn->getDS()].push_front(conn);
}
299

300
/* called when the buffer has been set and the rules have been processed, and only from handleIO (sometimes indirectly via handleQuery) */
IOState IncomingTCPConnectionState::sendResponse(const struct timeval& now, TCPResponse&& response)
{
  d_state = State::sendingResponse;

  /* DNS over TCP requires a 16-bit length prefix, in network byte order */
  const auto responseSize = static_cast<uint16_t>(response.d_buffer.size());
  const std::array<uint8_t, 2> sizeBytes{static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256)};
  /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes
     that could occur if we had to deal with the size during the processing,
     especially alignment issues */
  response.d_buffer.insert(response.d_buffer.begin(), sizeBytes.begin(), sizeBytes.end());
  d_currentPos = 0;
  d_currentResponse = std::move(response);

  try {
    auto iostate = d_handler.tryWrite(d_currentResponse.d_buffer, d_currentPos, d_currentResponse.d_buffer.size());
    if (iostate == IOState::Done) {
      /* the whole response has been written: account for it right away */
      DEBUGLOG("response sent from " << __PRETTY_FUNCTION__);
      handleResponseSent(d_currentResponse, d_currentResponse.d_buffer.size());
      return iostate;
    }
    /* partial write: the caller will need to wait for the socket to become
       writable again before resuming */
    d_lastIOBlocked = true;
    DEBUGLOG("partial write");
    return iostate;
  }
  catch (const std::exception& e) {
    vinfolog("Closing TCP client connection with %s: %s", d_ci.remote.toStringWithPort(), e.what());
    DEBUGLOG("Closing TCP client connection: " << e.what());
    ++d_ci.cs->tcpDiedSendingResponse;

    terminateClientConnection();

    return IOState::Done;
  }
}
335

336
/* Tear down the client connection: drop queued responses, release the
   backend connections we own, reset the I/O state machine, and close the
   handler — possibly deferring the close until the TLS engine's pending
   asynchronous operations have completed. */
void IncomingTCPConnectionState::terminateClientConnection()
{
  DEBUGLOG("terminating client connection");
  d_queuedResponses.clear();
  /* we have already released idle connections that could be reused,
     we don't care about the ones still waiting for responses */
  for (auto& backend : d_ownedConnectionsToBackend) {
    for (auto& conn : backend.second) {
      conn->release();
    }
  }
  d_ownedConnectionsToBackend.clear();

  /* meaning we will no longer be 'active' when the backend
     response or timeout comes in */
  d_ioState.reset();

  /* if we do have remaining async descriptors associated with this TLS
     connection, we need to defer the destruction of the TLS object until
     the engine has reported back, otherwise we have a use-after-free.. */
  auto afds = d_handler.getAsyncFDs();
  if (afds.empty()) {
    d_handler.close();
  }
  else {
    /* we might already be waiting, but we might also not because sometimes we have already been
       notified via the descriptor, not received Async again, but the async job still exists.. */
    auto state = shared_from_this();
    for (const auto desc : afds) {
      try {
        state->d_threadData.mplexer->addReadFD(desc, handleAsyncReady, state);
      }
      catch (...) {
        /* the descriptor might already be watched */
      }
    }
  }
}
373

374
/* Append a response to the queue of responses waiting to be sent to the
   client, and, if the connection is currently idle or waiting for a query,
   try to flush the queue right away and update the I/O state machine. */
void IncomingTCPConnectionState::queueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response, bool fromBackend)
{
  // queue response
  state->d_queuedResponses.emplace_back(std::move(response));
  DEBUGLOG("queueing response, state is " << (int)state->d_state << ", queue size is now " << state->d_queuedResponses.size());

  // when the response comes from a backend, there is a real possibility that we are currently
  // idle, and thus not trying to send the response right away would make our ref count go to 0.
  // Even if we are waiting for a query, we will not wake up before the new query arrives or a
  // timeout occurs
  if (state->d_state == State::idle || state->d_state == State::waitingForQuery) {
    auto iostate = sendQueuedResponses(state, now);

    if (iostate == IOState::Done && state->active()) {
      // everything sent: either go back to reading queries, or sit idle
      if (state->canAcceptNewQueries(now)) {
        state->resetForNewQuery();
        state->d_state = State::waitingForQuery;
        iostate = IOState::NeedRead;
      }
      else {
        state->d_state = State::idle;
      }
    }

    // for the same reason we need to update the state right away, nobody will do that for us
    if (state->active()) {
      updateIO(state, iostate, now);
      // if we have not finished reading every available byte, we _need_ to do an actual read
      // attempt before waiting for the socket to become readable again, because if there is
      // buffered data available the socket might never become readable again.
      // This is true as soon as we deal with TLS because TLS records are processed one by
      // one and might not match what we see at the application layer, so data might already
      // be available in the TLS library's buffers. This is especially true when OpenSSL's
      // read-ahead mode is enabled because then it buffers even more than one SSL record
      // for performance reasons.
      if (fromBackend && !state->d_lastIOBlocked) {
        state->handleIO();
      }
    }
  }
}
415

416
/* Multiplexer callback invoked when a descriptor used by the TLS engine for
   asynchronous operations becomes ready. */
void IncomingTCPConnectionState::handleAsyncReady([[maybe_unused]] int desc, FDMultiplexer::funcparam_t& param)
{
  auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);

  /* If we are here, the async jobs for this SSL* are finished
     so we should be able to remove all FDs */
  auto afds = state->d_handler.getAsyncFDs();
  for (const auto afd : afds) {
    try {
      state->d_threadData.mplexer->removeReadFD(afd);
    }
    catch (...) {
      /* the descriptor might no longer be watched */
    }
  }

  if (state->active()) {
    /* and now we restart our own I/O state machine */
    state->handleIO();
  }
  else {
    /* we were only waiting for the engine to come back,
       to prevent a use-after-free */
    state->d_handler.close();
  }
}
441

442
void IncomingTCPConnectionState::updateIO(std::shared_ptr<IncomingTCPConnectionState>& state, IOState newState, const struct timeval& now)
443
{
45,267✔
444
  if (newState == IOState::Async) {
45,267!
445
    auto fds = state->d_handler.getAsyncFDs();
×
446
    for (const auto desc : fds) {
×
447
      state->d_threadData.mplexer->addReadFD(desc, handleAsyncReady, state);
×
448
    }
×
449
    state->d_ioState->update(IOState::Done, handleIOCallback, state);
×
450
  }
×
451
  else {
45,267✔
452
    state->d_ioState->update(newState, handleIOCallback, state, newState == IOState::NeedWrite ? state->getClientWriteTTD(now) : state->getClientReadTTD(now));
45,267✔
453
  }
45,267✔
454
}
45,267✔
455

456
/* called from the backend code when a new response has been received */
void IncomingTCPConnectionState::handleResponse(const struct timeval& now, TCPResponse&& response)
{
  /* the response might have been produced by a different thread: hand it
     over to the thread owning this connection via a channel */
  if (std::this_thread::get_id() != d_creatorThreadID) {
    handleCrossProtocolResponse(now, std::move(response));
    return;
  }

  std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this();

  if (!response.isAsync() && response.d_connection && response.d_connection->getDS() && response.d_connection->getDS()->d_config.useProxyProtocol) {
    // if we have added a TCP Proxy Protocol payload to a connection, don't release it to the general pool as no one else will be able to use it anyway
    if (!response.d_connection->willBeReusable(true)) {
      // if it can't be reused even by us, well
      const auto connIt = state->d_ownedConnectionsToBackend.find(response.d_connection->getDS());
      if (connIt != state->d_ownedConnectionsToBackend.end()) {
        auto& list = connIt->second;

        /* find this exact connection in our owned list, release it and drop it */
        for (auto it = list.begin(); it != list.end(); ++it) {
          if (*it == response.d_connection) {
            try {
              response.d_connection->release();
            }
            catch (const std::exception& e) {
              vinfolog("Error releasing connection: %s", e.what());
            }
            list.erase(it);
            break;
          }
        }
      }
    }
  }

  /* a response needs to be at least as large as a DNS header */
  if (response.d_buffer.size() < sizeof(dnsheader)) {
    state->terminateClientConnection();
    return;
  }

  if (!response.isAsync()) {
    try {
      auto& ids = response.d_idstate;
      std::shared_ptr<DownstreamState> backend = response.d_ds ? response.d_ds : (response.d_connection ? response.d_connection->getDS() : nullptr);
      /* the response content must match the query (qname, qtype, qclass) */
      if (backend == nullptr || !responseContentMatches(response.d_buffer, ids.qname, ids.qtype, ids.qclass, backend)) {
        state->terminateClientConnection();
        return;
      }

      if (backend != nullptr) {
        ++backend->responses;
      }

      DNSResponse dnsResponse(ids, response.d_buffer, backend);
      dnsResponse.d_incomingTCPState = state;

      /* keep a cleartext copy of the DNS header before the response rules
         get a chance to alter the buffer */
      memcpy(&response.d_cleartextDH, dnsResponse.getHeader().get(), sizeof(response.d_cleartextDH));

      if (!processResponse(response.d_buffer, *state->d_threadData.localRespRuleActions, *state->d_threadData.localCacheInsertedRespRuleActions, dnsResponse, false)) {
        state->terminateClientConnection();
        return;
      }

      if (dnsResponse.isAsynchronous()) {
        /* we are done for now */
        return;
      }
    }
    catch (const std::exception& e) {
      vinfolog("Unexpected exception while handling response from backend: %s", e.what());
      state->terminateClientConnection();
      return;
    }
  }

  ++dnsdist::metrics::g_stats.responses;
  ++state->d_ci.cs->responses;

  queueResponse(state, now, std::move(response), true);
}
535

536
/* Holds a response produced in a different thread so it can be passed over
   a channel to the TCP worker thread owning the client connection. */
struct TCPCrossProtocolResponse
{
  TCPCrossProtocolResponse(TCPResponse&& response, std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now) :
    d_response(std::move(response)), d_state(state), d_now(now)
  {
  }
  /* neither copyable nor movable: instances travel through the channel
     wrapped in a unique_ptr */
  TCPCrossProtocolResponse(const TCPCrossProtocolResponse&) = delete;
  TCPCrossProtocolResponse& operator=(const TCPCrossProtocolResponse&) = delete;
  TCPCrossProtocolResponse(TCPCrossProtocolResponse&&) = delete;
  TCPCrossProtocolResponse& operator=(TCPCrossProtocolResponse&&) = delete;
  ~TCPCrossProtocolResponse() = default;

  TCPResponse d_response;                              // the response payload and its associated state
  std::shared_ptr<IncomingTCPConnectionState> d_state; // the client connection this response belongs to
  struct timeval d_now;                                // time at which the response was handed over
};
552

553
/* A cross-protocol query whose sender is an incoming TCP connection: allows
   the query to be processed elsewhere while keeping a way back to the
   originating TCP client. */
class TCPCrossProtocolQuery : public CrossProtocolQuery
{
public:
  TCPCrossProtocolQuery(PacketBuffer&& buffer, InternalQueryState&& ids, std::shared_ptr<DownstreamState> backend, std::shared_ptr<IncomingTCPConnectionState> sender) :
    CrossProtocolQuery(InternalQuery(std::move(buffer), std::move(ids)), backend), d_sender(std::move(sender))
  {
  }
  TCPCrossProtocolQuery(const TCPCrossProtocolQuery&) = delete;
  TCPCrossProtocolQuery& operator=(const TCPCrossProtocolQuery&) = delete;
  TCPCrossProtocolQuery(TCPCrossProtocolQuery&&) = delete;
  TCPCrossProtocolQuery& operator=(TCPCrossProtocolQuery&&) = delete;
  ~TCPCrossProtocolQuery() override = default;

  /* the sender is the incoming TCP connection the query arrived on */
  std::shared_ptr<TCPQuerySender> getTCPQuerySender() override
  {
    return d_sender;
  }

  /* build a DNSQuestion view over the stored query data */
  DNSQuestion getDQ() override
  {
    auto& ids = query.d_idstate;
    DNSQuestion dnsQuestion(ids, query.d_buffer);
    dnsQuestion.d_incomingTCPState = d_sender;
    return dnsQuestion;
  }

  /* build a DNSResponse view over the stored data */
  DNSResponse getDR() override
  {
    auto& ids = query.d_idstate;
    DNSResponse dnsResponse(ids, query.d_buffer, downstream);
    dnsResponse.d_incomingTCPState = d_sender;
    return dnsResponse;
  }

private:
  std::shared_ptr<IncomingTCPConnectionState> d_sender;
};
590

591
/* Wrap a query received on this TCP connection into a cross-protocol query
   object, keeping a shared reference to this connection as the sender. */
std::unique_ptr<CrossProtocolQuery> IncomingTCPConnectionState::getCrossProtocolQuery(PacketBuffer&& query, InternalQueryState&& state, const std::shared_ptr<DownstreamState>& backend)
{
  return std::make_unique<TCPCrossProtocolQuery>(std::move(query), std::move(state), backend, shared_from_this());
}
595

596
/* Build a cross-protocol query from a DNSQuestion whose incoming state is a
   TCP connection. Throws std::runtime_error when the DNSQuestion has no TCP
   state attached. */
std::unique_ptr<CrossProtocolQuery> getTCPCrossProtocolQueryFromDQ(DNSQuestion& dnsQuestion)
{
  auto state = dnsQuestion.getIncomingTCPState();
  if (!state) {
    throw std::runtime_error("Trying to create a TCP cross protocol query without a valid TCP state");
  }

  /* save the original query ID before moving the data out of the question */
  dnsQuestion.ids.origID = dnsQuestion.getHeader()->id;
  return std::make_unique<TCPCrossProtocolQuery>(std::move(dnsQuestion.getMutableData()), std::move(dnsQuestion.ids), nullptr, std::move(state));
}
606

607
void IncomingTCPConnectionState::handleCrossProtocolResponse(const struct timeval& now, TCPResponse&& response)
608
{
255✔
609
  std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this();
255✔
610
  try {
255✔
611
    auto ptr = std::make_unique<TCPCrossProtocolResponse>(std::move(response), state, now);
255✔
612
    if (!state->d_threadData.crossProtocolResponseSender.send(std::move(ptr))) {
255!
613
      ++dnsdist::metrics::g_stats.tcpCrossProtocolResponsePipeFull;
×
614
      vinfolog("Unable to pass a cross-protocol response to the TCP worker thread because the pipe is full");
×
615
    }
×
616
  }
255✔
617
  catch (const std::exception& e) {
255✔
618
    vinfolog("Unable to pass a cross-protocol response to the TCP worker thread because we couldn't write to the pipe: %s", stringerror());
×
619
  }
×
620
}
255✔
621

622
/* Process one DNS query received on this incoming TCP/DoT/DoH connection:
   validate it, run it through the rules/policy engine (processQuery), then
   either self-answer, drop, or forward it to a backend over UDP, TCP or DoH.
   `streamID` is set for queries multiplexed over an HTTP/2 stream.
   NOTE: `query` may be resized/moved along the way, so raw pointers into it
   (dnsheader_aligned) must never be held across a mutation. */
IncomingTCPConnectionState::QueryProcessingResult IncomingTCPConnectionState::handleQuery(PacketBuffer&& queryIn, const struct timeval& now, std::optional<int32_t> streamID)
{
  auto query = std::move(queryIn);
  /* anything smaller than a DNS header cannot be a valid query */
  if (query.size() < sizeof(dnsheader)) {
    ++dnsdist::metrics::g_stats.nonCompliantQueries;
    ++d_ci.cs->nonCompliantQueries;
    return QueryProcessingResult::TooSmall;
  }

  ++d_queriesCount;
  ++d_ci.cs->queries;
  ++dnsdist::metrics::g_stats.queries;

  /* per-TLS-version query counters */
  if (d_handler.isTLS()) {
    auto tlsVersion = d_handler.getTLSVersion();
    switch (tlsVersion) {
    case LibsslTLSVersion::TLS10:
      ++d_ci.cs->tls10queries;
      break;
    case LibsslTLSVersion::TLS11:
      ++d_ci.cs->tls11queries;
      break;
    case LibsslTLSVersion::TLS12:
      ++d_ci.cs->tls12queries;
      break;
    case LibsslTLSVersion::TLS13:
      ++d_ci.cs->tls13queries;
      break;
    default:
      ++d_ci.cs->tlsUnknownqueries;
    }
  }

  auto state = shared_from_this();
  InternalQueryState ids;
  /* use the proxied (PROXY-protocol-provided) addresses when available */
  ids.origDest = d_proxiedDestination;
  ids.origRemote = d_proxiedRemote;
  ids.cs = d_ci.cs;
  ids.queryRealTime.start();
  if (streamID) {
    ids.d_streamID = *streamID;
  }

  /* DNSCrypt handshake/cert queries are answered directly here */
  auto dnsCryptResponse = checkDNSCryptQuery(*d_ci.cs, query, ids.dnsCryptQuery, ids.queryRealTime.d_start.tv_sec, true);
  if (dnsCryptResponse) {
    TCPResponse response;
    d_state = State::idle;
    ++d_currentQueriesCount;
    queueResponse(state, now, std::move(response), false);
    return QueryProcessingResult::SelfAnswered;
  }

  {
    /* this pointer will be invalidated the second the buffer is resized, don't hold onto it! */
    const dnsheader_aligned dnsHeader(query.data());
    if (!checkQueryHeaders(*dnsHeader, *d_ci.cs)) {
      return QueryProcessingResult::InvalidHeaders;
    }

    /* a query without a question section gets a NotImp self-answer */
    if (dnsHeader->qdcount == 0) {
      TCPResponse response;
      auto queryID = dnsHeader->id;
      dnsdist::PacketMangling::editDNSHeaderFromPacket(query, [](dnsheader& header) {
        header.rcode = RCode::NotImp;
        header.qr = true;
        return true;
      });
      response.d_idstate = std::move(ids);
      response.d_idstate.origID = queryID;
      response.d_idstate.selfGenerated = true;
      response.d_buffer = std::move(query);
      d_state = State::idle;
      ++d_currentQueriesCount;
      queueResponse(state, now, std::move(response), false);
      return QueryProcessingResult::SelfAnswered;
    }
  }

  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
  ids.qname = DNSName(reinterpret_cast<const char*>(query.data()), static_cast<int>(query.size()), sizeof(dnsheader), false, &ids.qtype, &ids.qclass);
  ids.protocol = getProtocol();
  if (ids.dnsCryptQuery) {
    ids.protocol = dnsdist::Protocol::DNSCryptTCP;
  }

  DNSQuestion dnsQuestion(ids, query);
  /* remember the original header flags so they can be restored on responses */
  dnsdist::PacketMangling::editDNSHeaderFromPacket(dnsQuestion.getMutableData(), [&ids](dnsheader& header) {
    const uint16_t* flags = getFlagsFromDNSHeader(&header);
    ids.origFlags = *flags;
    return true;
  });
  dnsQuestion.d_incomingTCPState = state;
  dnsQuestion.sni = d_handler.getServerNameIndication();

  if (d_proxyProtocolValues) {
    /* we need to copy them, because the next queries received on that connection will
       need to get the _unaltered_ values */
    dnsQuestion.proxyProtocolValues = make_unique<std::vector<ProxyProtocolValue>>(*d_proxyProtocolValues);
  }

  /* zone transfers are never cacheable */
  if (dnsQuestion.ids.qtype == QType::AXFR || dnsQuestion.ids.qtype == QType::IXFR) {
    dnsQuestion.ids.skipCache = true;
  }

  if (forwardViaUDPFirst()) {
    // if there was no EDNS, we add it with a large buffer size
    // so we can use UDP to talk to the backend.
    const dnsheader_aligned dnsHeader(query.data());
    if (dnsHeader->arcount == 0U) {
      if (addEDNS(query, 4096, false, 4096, 0)) {
        dnsQuestion.ids.ednsAdded = true;
      }
    }
  }

  /* attach the DoH unit so rules can see/modify HTTP-level data */
  if (streamID) {
    auto unit = getDOHUnit(*streamID);
    if (unit) {
      dnsQuestion.ids.du = std::move(unit);
    }
  }

  std::shared_ptr<DownstreamState> backend;
  auto result = processQuery(dnsQuestion, d_threadData.holders, backend);

  if (result == ProcessQueryResult::Asynchronous) {
    /* we are done for now */
    ++d_currentQueriesCount;
    return QueryProcessingResult::Asynchronous;
  }

  /* give the DoH unit back to its owner before we return or forward */
  if (streamID) {
    restoreDOHUnit(std::move(dnsQuestion.ids.du));
  }

  if (result == ProcessQueryResult::Drop) {
    return QueryProcessingResult::Dropped;
  }

  // the buffer might have been invalidated by now
  uint16_t queryID{0};
  {
    const auto dnsHeader = dnsQuestion.getHeader();
    queryID = dnsHeader->id;
  }

  if (result == ProcessQueryResult::SendAnswer) {
    /* a rule (or the cache) produced the answer: queue it back to the client */
    TCPResponse response;
    {
      const auto dnsHeader = dnsQuestion.getHeader();
      memcpy(&response.d_cleartextDH, dnsHeader.get(), sizeof(response.d_cleartextDH));
    }
    response.d_idstate = std::move(ids);
    response.d_idstate.origID = queryID;
    response.d_idstate.selfGenerated = true;
    response.d_idstate.cs = d_ci.cs;
    response.d_buffer = std::move(query);

    d_state = State::idle;
    ++d_currentQueriesCount;
    queueResponse(state, now, std::move(response), false);
    return QueryProcessingResult::SelfAnswered;
  }

  if (result != ProcessQueryResult::PassToBackend || backend == nullptr) {
    return QueryProcessingResult::NoBackend;
  }

  dnsQuestion.ids.origID = queryID;

  ++d_currentQueriesCount;

  std::string proxyProtocolPayload;
  /* DoH backend: hand the query over to the DoH client threads */
  if (backend->isDoH()) {
    vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", ids.qname.toLogString(), QType(ids.qtype).toString(), d_proxiedRemote.toStringWithPort(), getProtocol().toString(), query.size(), backend->getNameWithAddr());

    /* we need to do this _before_ creating the cross protocol query because
       after that the buffer will have been moved */
    if (backend->d_config.useProxyProtocol) {
      proxyProtocolPayload = getProxyProtocolPayload(dnsQuestion);
    }

    auto cpq = std::make_unique<TCPCrossProtocolQuery>(std::move(query), std::move(ids), backend, state);
    cpq->query.d_proxyProtocolPayload = std::move(proxyProtocolPayload);

    backend->passCrossProtocolQuery(std::move(cpq));
    return QueryProcessingResult::Forwarded;
  }
  /* TCP-received query that may be forwarded over UDP first, falling back
     to TCP below if the backend assignment fails */
  if (!backend->isTCPOnly() && forwardViaUDPFirst()) {
    if (streamID) {
      auto unit = getDOHUnit(*streamID);
      if (unit) {
        dnsQuestion.ids.du = std::move(unit);
      }
    }
    if (assignOutgoingUDPQueryToBackend(backend, queryID, dnsQuestion, query)) {
      return QueryProcessingResult::Forwarded;
    }
    restoreDOHUnit(std::move(dnsQuestion.ids.du));
    // fallback to the normal flow
  }

  prependSizeToTCPQuery(query, 0);

  auto downstreamConnection = getDownstreamConnection(backend, dnsQuestion.proxyProtocolValues, now);

  if (backend->d_config.useProxyProtocol) {
    /* if we ever sent a TLV over a connection, we can never go back */
    if (!d_proxyProtocolPayloadHasTLV) {
      d_proxyProtocolPayloadHasTLV = dnsQuestion.proxyProtocolValues && !dnsQuestion.proxyProtocolValues->empty();
    }

    proxyProtocolPayload = getProxyProtocolPayload(dnsQuestion);
  }

  if (dnsQuestion.proxyProtocolValues) {
    downstreamConnection->setProxyProtocolValuesSent(std::move(dnsQuestion.proxyProtocolValues));
  }

  TCPQuery tcpquery(std::move(query), std::move(ids));
  tcpquery.d_proxyProtocolPayload = std::move(proxyProtocolPayload);

  vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", tcpquery.d_idstate.qname.toLogString(), QType(tcpquery.d_idstate.qtype).toString(), d_proxiedRemote.toStringWithPort(), getProtocol().toString(), tcpquery.d_buffer.size(), backend->getNameWithAddr());
  std::shared_ptr<TCPQuerySender> incoming = state;
  downstreamConnection->queueQuery(incoming, std::move(tcpquery));
  return QueryProcessingResult::Forwarded;
}
849

850
void IncomingTCPConnectionState::handleIOCallback(int desc, FDMultiplexer::funcparam_t& param)
851
{
1,941✔
852
  auto conn = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
1,941✔
853
  if (desc != conn->d_handler.getDescriptor()) {
1,941!
854
    // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay): __PRETTY_FUNCTION__ is fine
855
    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(desc) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->d_handler.getDescriptor()));
×
856
  }
×
857

858
  conn->handleIO();
1,941✔
859
}
1,941✔
860

861
void IncomingTCPConnectionState::handleHandshakeDone(const struct timeval& now)
862
{
2,060✔
863
  if (d_handler.isTLS()) {
2,060✔
864
    if (!d_handler.hasTLSSessionBeenResumed()) {
366✔
865
      ++d_ci.cs->tlsNewSessions;
341✔
866
    }
341✔
867
    else {
25✔
868
      ++d_ci.cs->tlsResumptions;
25✔
869
    }
25✔
870
    if (d_handler.getResumedFromInactiveTicketKey()) {
366✔
871
      ++d_ci.cs->tlsInactiveTicketKey;
8✔
872
    }
8✔
873
    if (d_handler.getUnknownTicketKey()) {
366✔
874
      ++d_ci.cs->tlsUnknownTicketKey;
6✔
875
    }
6✔
876
  }
366✔
877

878
  d_handshakeDoneTime = now;
2,060✔
879
}
2,060✔
880

881
/* Incrementally read and parse a PROXY protocol header from the client.
   Grows the expected read size (d_proxyProtocolNeed) as isProxyHeaderComplete
   reports missing bytes, and loops while the connection is active and the
   handler still has buffered data to deliver.
   Returns Done on success (proxied addresses and TLVs recorded),
   Error on a malformed header or parse failure, Reading if more data
   is needed from the socket. */
IncomingTCPConnectionState::ProxyProtocolResult IncomingTCPConnectionState::handleProxyProtocolPayload()
{
  do {
    DEBUGLOG("reading proxy protocol header");
    auto iostate = d_handler.tryRead(d_buffer, d_currentPos, d_proxyProtocolNeed, false, isProxyPayloadOutsideTLS());
    if (iostate == IOState::Done) {
      d_buffer.resize(d_currentPos);
      /* 0 => invalid header, <0 => -remaining bytes still needed, >0 => complete */
      ssize_t remaining = isProxyHeaderComplete(d_buffer);
      if (remaining == 0) {
        vinfolog("Unable to consume proxy protocol header in packet from TCP client %s", d_ci.remote.toStringWithPort());
        ++dnsdist::metrics::g_stats.proxyProtocolInvalid;
        return ProxyProtocolResult::Error;
      }
      if (remaining < 0) {
        d_proxyProtocolNeed += -remaining;
        d_buffer.resize(d_currentPos + d_proxyProtocolNeed);
        /* we need to keep reading, since we might have buffered data */
      }
      else {
        /* proxy header received */
        std::vector<ProxyProtocolValue> proxyProtocolValues;
        if (!handleProxyProtocol(d_ci.remote, true, *d_threadData.holders.acl, d_buffer, d_proxiedRemote, d_proxiedDestination, proxyProtocolValues)) {
          vinfolog("Error handling the Proxy Protocol received from TCP client %s", d_ci.remote.toStringWithPort());
          return ProxyProtocolResult::Error;
        }

        if (!proxyProtocolValues.empty()) {
          d_proxyProtocolValues = make_unique<std::vector<ProxyProtocolValue>>(std::move(proxyProtocolValues));
        }

        /* reset the read state for the DNS payload that follows */
        d_currentPos = 0;
        d_proxyProtocolNeed = 0;
        d_buffer.clear();
        return ProxyProtocolResult::Done;
      }
    }
    else {
      d_lastIOBlocked = true;
    }
  } while (active() && !d_lastIOBlocked);

  return ProxyProtocolResult::Reading;
}
924

925
/* Drive the (TLS) handshake one step forward. On completion, advance the
   connection state either to reading a PROXY protocol header (when one is
   expected *inside* the TLS stream) or to reading the query size.
   Returns the I/O state the caller should wait on. */
IOState IncomingTCPConnectionState::handleHandshake(const struct timeval& now)
{
  DEBUGLOG("doing handshake");
  auto iostate = d_handler.tryHandshake();
  if (iostate == IOState::Done) {
    DEBUGLOG("handshake done");
    handleHandshakeDone(now);

    /* a PROXY protocol header sent outside the TLS stream has already been
       consumed before the handshake, so only look for one here otherwise */
    if (d_ci.cs != nullptr && d_ci.cs->d_enableProxyProtocol && !isProxyPayloadOutsideTLS() && expectProxyProtocolFrom(d_ci.remote)) {
      d_state = State::readingProxyProtocolHeader;
      d_buffer.resize(s_proxyProtocolMinimumHeaderSize);
      d_proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize;
    }
    else {
      d_state = State::readingQuerySize;
    }
  }
  else {
    d_lastIOBlocked = true;
  }

  return iostate;
}
948

949
/* Called when a full query has been read into d_buffer: process it via
   handleQuery(), terminate the client connection on fatal processing
   outcomes, and translate the connection's post-processing I/O state into
   the IOState the event loop should wait on. */
IOState IncomingTCPConnectionState::handleIncomingQueryReceived(const struct timeval& now)
{
  DEBUGLOG("query received");
  d_buffer.resize(d_querySize);

  d_state = State::idle;
  auto processingResult = handleQuery(std::move(d_buffer), now, std::nullopt);
  switch (processingResult) {
  case QueryProcessingResult::TooSmall:
    /* fall-through */
  case QueryProcessingResult::InvalidHeaders:
    /* fall-through */
  case QueryProcessingResult::Dropped:
    /* fall-through */
  case QueryProcessingResult::NoBackend:
    /* these are fatal for the client connection */
    terminateClientConnection();
    break;
  default:
    break;
  }

  /* the state might have been updated in the meantime, we don't want to override it
     in that case */
  if (active() && d_state != State::idle) {
    if (d_ioState->isWaitingForRead()) {
      return IOState::NeedRead;
    }
    if (d_ioState->isWaitingForWrite()) {
      return IOState::NeedWrite;
    }
    return IOState::Done;
  }
  return IOState::Done;
}
983

984
void IncomingTCPConnectionState::handleExceptionDuringIO(const std::exception& exp)
985
{
1,765✔
986
  if (d_state == State::idle || d_state == State::waitingForQuery) {
1,765✔
987
    /* no need to increase any counters in that case, the client is simply done with us */
988
  }
1,434✔
989
  else if (d_state == State::doingHandshake || d_state == State::readingProxyProtocolHeader || d_state == State::waitingForQuery || d_state == State::readingQuerySize || d_state == State::readingQuery) {
331!
990
    ++d_ci.cs->tcpDiedReadingQuery;
331✔
991
  }
331✔
992
  else if (d_state == State::sendingResponse) {
×
993
    /* unlikely to happen here, the exception should be handled in sendResponse() */
994
    ++d_ci.cs->tcpDiedSendingResponse;
×
995
  }
×
996

997
  if (d_ioState->isWaitingForWrite() || d_queriesCount == 0) {
1,765✔
998
    DEBUGLOG("Got an exception while handling TCP query: " << exp.what());
331✔
999
    vinfolog("Got an exception while handling (%s) TCP query from %s: %s", (d_ioState->isWaitingForRead() ? "reading" : "writing"), d_ci.remote.toStringWithPort(), exp.what());
331!
1000
  }
331✔
1001
  else {
1,434✔
1002
    vinfolog("Closing TCP client connection with %s: %s", d_ci.remote.toStringWithPort(), exp.what());
1,434✔
1003
    DEBUGLOG("Closing TCP client connection: " << exp.what());
1,434✔
1004
  }
1,434✔
1005
  /* remove this FD from the IO multiplexer */
1006
  terminateClientConnection();
1,765✔
1007
}
1,765✔
1008

1009
/* Two-phase query read: first the 2-byte length prefix, then the query body
   of exactly that size. `iostate` is updated with the I/O state the caller
   should wait on. Returns true only when the connection has been terminated
   (undersized query) and the caller must stop processing it. */
bool IncomingTCPConnectionState::readIncomingQuery(const timeval& now, IOState& iostate)
{
  if (!d_lastIOBlocked && (d_state == State::waitingForQuery || d_state == State::readingQuerySize)) {
    DEBUGLOG("reading query size");
    d_buffer.resize(sizeof(uint16_t));
    iostate = d_handler.tryRead(d_buffer, d_currentPos, sizeof(uint16_t));
    if (d_currentPos > 0) {
      /* if we got at least one byte, we can't go around sending responses */
      d_state = State::readingQuerySize;
    }

    if (iostate == IOState::Done) {
      DEBUGLOG("query size received");
      d_state = State::readingQuery;
      d_querySizeReadTime = now;
      if (d_queriesCount == 0) {
        d_firstQuerySizeReadTime = now;
      }
      /* the length prefix is big-endian (RFC 1035 4.2.2) */
      d_querySize = d_buffer.at(0) * 256 + d_buffer.at(1);
      if (d_querySize < sizeof(dnsheader)) {
        /* go away */
        terminateClientConnection();
        return true;
      }

      d_buffer.resize(d_querySize);
      d_currentPos = 0;
    }
    else {
      d_lastIOBlocked = true;
    }
  }

  if (!d_lastIOBlocked && d_state == State::readingQuery) {
    DEBUGLOG("reading query");
    iostate = d_handler.tryRead(d_buffer, d_currentPos, d_querySize);
    if (iostate == IOState::Done) {
      iostate = handleIncomingQueryReceived(now);
    }
    else {
      d_lastIOBlocked = true;
    }
  }

  return false;
}
1055

1056
/* Main event-loop step for an incoming TCP/DoT connection. Advances the
   connection state machine (starting -> handshake -> optional PROXY header
   -> reading query size/body -> sending responses) as far as the socket and
   TLS buffers allow, then re-arms the I/O multiplexer accordingly. */
void IncomingTCPConnectionState::handleIO()
{
  // why do we loop? Because the TLS layer does buffering, and thus can have data ready to read
  // even though the underlying socket is not ready, so we need to actually ask for the data first
  IOState iostate = IOState::Done;
  timeval now{};
  gettimeofday(&now, nullptr);

  do {
    iostate = IOState::Done;
    /* releases/terminates the connection unless explicitly released below */
    IOStateGuard ioGuard(d_ioState);

    if (maxConnectionDurationReached(g_maxTCPConnectionDuration, now)) {
      vinfolog("Terminating TCP connection from %s because it reached the maximum TCP connection duration", d_ci.remote.toStringWithPort());
      // will be handled by the ioGuard
      // handleNewIOState(state, IOState::Done, fd, handleIOCallback);
      return;
    }

    d_lastIOBlocked = false;

    try {
      if (d_state == State::starting) {
        /* a PROXY protocol header sent in clear text precedes the TLS handshake */
        if (d_ci.cs != nullptr && d_ci.cs->d_enableProxyProtocol && isProxyPayloadOutsideTLS() && expectProxyProtocolFrom(d_ci.remote)) {
          d_state = State::readingProxyProtocolHeader;
          d_buffer.resize(s_proxyProtocolMinimumHeaderSize);
          d_proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize;
        }
        else {
          d_state = State::doingHandshake;
        }
      }

      if (d_state == State::doingHandshake) {
        iostate = handleHandshake(now);
      }

      if (!d_lastIOBlocked && d_state == State::readingProxyProtocolHeader) {
        auto status = handleProxyProtocolPayload();
        if (status == ProxyProtocolResult::Done) {
          d_buffer.resize(sizeof(uint16_t));

          /* if the PROXY header came in clear text, the handshake still has to happen */
          if (isProxyPayloadOutsideTLS()) {
            d_state = State::doingHandshake;
            iostate = handleHandshake(now);
          }
          else {
            d_state = State::readingQuerySize;
          }
        }
        else if (status == ProxyProtocolResult::Error) {
          iostate = IOState::Done;
        }
        else {
          iostate = IOState::NeedRead;
        }
      }

      if (!d_lastIOBlocked && (d_state == State::waitingForQuery || d_state == State::readingQuerySize || d_state == State::readingQuery)) {
        /* a true return means the connection was terminated */
        if (readIncomingQuery(now, iostate)) {
          return;
        }
      }

      if (!d_lastIOBlocked && d_state == State::sendingResponse) {
        DEBUGLOG("sending response");
        iostate = d_handler.tryWrite(d_currentResponse.d_buffer, d_currentPos, d_currentResponse.d_buffer.size());
        if (iostate == IOState::Done) {
          DEBUGLOG("response sent from " << __PRETTY_FUNCTION__);
          handleResponseSent(d_currentResponse, d_currentResponse.d_buffer.size());
          d_state = State::idle;
        }
        else {
          d_lastIOBlocked = true;
        }
      }

      if (active() && !d_lastIOBlocked && iostate == IOState::Done && (d_state == State::idle || d_state == State::waitingForQuery)) {
        // try sending queued responses
        DEBUGLOG("send responses, if any");
        auto state = shared_from_this();
        iostate = sendQueuedResponses(state, now);

        if (!d_lastIOBlocked && active() && iostate == IOState::Done) {
          // if the query has been passed to a backend, or dropped, and the responses have been sent,
          // we can start reading again
          if (canAcceptNewQueries(now)) {
            resetForNewQuery();
            iostate = IOState::NeedRead;
          }
          else {
            d_state = State::idle;
            iostate = IOState::Done;
          }
        }
      }

      if (d_state != State::idle && d_state != State::doingHandshake && d_state != State::readingProxyProtocolHeader && d_state != State::waitingForQuery && d_state != State::readingQuerySize && d_state != State::readingQuery && d_state != State::sendingResponse) {
        vinfolog("Unexpected state %d in handleIOCallback", static_cast<int>(d_state));
      }
    }
    catch (const std::exception& exp) {
      /* most likely an EOF because the other end closed the connection,
         but it might also be a real IO error or something else.
         Let's just drop the connection
      */
      handleExceptionDuringIO(exp);
    }

    if (!active()) {
      DEBUGLOG("state is no longer active");
      return;
    }

    auto state = shared_from_this();
    if (iostate == IOState::Done) {
      d_ioState->update(iostate, handleIOCallback, state);
    }
    else {
      updateIO(state, iostate, now);
    }
    /* the multiplexer has been re-armed, don't let the guard tear us down */
    ioGuard.release();
  } while ((iostate == IOState::NeedRead || iostate == IOState::NeedWrite) && !d_lastIOBlocked);
}
1180

1181
/* Notify this connection that a backend I/O error occurred for one of its
   in-flight queries. When invoked from a foreign thread the error is relayed
   through the cross-protocol response channel (with an empty buffer as the
   error marker); otherwise it is handled inline: flush any queued responses
   first, or terminate the client connection if nothing remains to send. */
void IncomingTCPConnectionState::notifyIOError(const struct timeval& now, TCPResponse&& response)
{
  if (std::this_thread::get_id() != d_creatorThreadID) {
    /* empty buffer will signal an IO error */
    response.d_buffer.clear();
    handleCrossProtocolResponse(now, std::move(response));
    return;
  }

  std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this();
  --state->d_currentQueriesCount;
  state->d_hadErrors = true;

  if (state->d_state == State::sendingResponse) {
    /* if we have responses to send, let's do that first */
  }
  else if (!state->d_queuedResponses.empty()) {
    /* stop reading and send what we have */
    try {
      auto iostate = sendQueuedResponses(state, now);

      if (state->active() && iostate != IOState::Done) {
        // we need to update the state right away, nobody will do that for us
        updateIO(state, iostate, now);
      }
    }
    catch (const std::exception& e) {
      vinfolog("Exception in notifyIOError: %s", e.what());
    }
  }
  else {
    // the backend code already tried to reconnect if it was possible
    state->terminateClientConnection();
  }
}
1216

1217
void IncomingTCPConnectionState::handleXFRResponse(const struct timeval& now, TCPResponse&& response)
1218
{
444✔
1219
  if (std::this_thread::get_id() != d_creatorThreadID) {
444!
1220
    handleCrossProtocolResponse(now, std::move(response));
×
1221
    return;
×
1222
  }
×
1223

1224
  std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this();
444✔
1225
  queueResponse(state, now, std::move(response), true);
444✔
1226
}
444✔
1227

1228
void IncomingTCPConnectionState::handleTimeout(std::shared_ptr<IncomingTCPConnectionState>& state, bool write)
1229
{
17✔
1230
  vinfolog("Timeout while %s TCP client %s", (write ? "writing to" : "reading from"), state->d_ci.remote.toStringWithPort());
17!
1231
  DEBUGLOG("client timeout");
17✔
1232
  DEBUGLOG("Processed " << state->d_queriesCount << " queries, current count is " << state->d_currentQueriesCount << ", " << state->d_ownedConnectionsToBackend.size() << " owned connections, " << state->d_queuedResponses.size() << " response queued");
17✔
1233

1234
  if (write || state->d_currentQueriesCount == 0) {
17✔
1235
    ++state->d_ci.cs->tcpClientTimeouts;
11✔
1236
    state->d_ioState.reset();
11✔
1237
  }
11✔
1238
  else {
6✔
1239
    DEBUGLOG("Going idle");
6✔
1240
    /* we still have some queries in flight, let's just stop reading for now */
1241
    state->d_state = State::idle;
6✔
1242
    state->d_ioState->update(IOState::Done, handleIOCallback, state);
6✔
1243
  }
6✔
1244
}
17✔
1245

1246
/* Pipe callback run in a TCP worker thread: pick up a new client connection
   handed over by an acceptor thread and start driving its I/O, creating
   either an HTTP/2 (DoH) or plain TCP/DoT connection state for it. */
static void handleIncomingTCPQuery(int pipefd, FDMultiplexer::funcparam_t& param)
{
  auto* threadData = boost::any_cast<TCPClientThreadData*>(param);

  std::unique_ptr<ConnectionInfo> citmp{nullptr};
  try {
    auto tmp = threadData->queryReceiver.receive();
    if (!tmp) {
      /* spurious wake-up: nothing to read */
      return;
    }
    citmp = std::move(*tmp);
  }
  catch (const std::exception& e) {
    throw std::runtime_error("Error while reading from the TCP query channel: " + std::string(e.what()));
  }

  g_tcpclientthreads->decrementQueuedCount();

  timeval now{};
  gettimeofday(&now, nullptr);

  if (citmp->cs->dohFrontend) {
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
    auto state = std::make_shared<IncomingHTTP2Connection>(std::move(*citmp), *threadData, now);
    state->handleIO();
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
  }
  else {
    auto state = std::make_shared<IncomingTCPConnectionState>(std::move(*citmp), *threadData, now);
    state->handleIO();
  }
}
1278

1279
/* Pipe callback run in a TCP worker thread: pick up a query that originated
   on another protocol (e.g. UDP) and forward it to its backend over TCP.
   On any failure the original sender is notified via notifyIOError. */
static void handleCrossProtocolQuery(int pipefd, FDMultiplexer::funcparam_t& param)
{
  auto* threadData = boost::any_cast<TCPClientThreadData*>(param);

  std::unique_ptr<CrossProtocolQuery> cpq{nullptr};
  try {
    auto tmp = threadData->crossProtocolQueryReceiver.receive();
    if (!tmp) {
      /* spurious wake-up: nothing to read */
      return;
    }
    cpq = std::move(*tmp);
  }
  catch (const std::exception& e) {
    throw std::runtime_error("Error while reading from the TCP cross-protocol channel: " + std::string(e.what()));
  }

  timeval now{};
  gettimeofday(&now, nullptr);

  std::shared_ptr<TCPQuerySender> tqs = cpq->getTCPQuerySender();
  auto query = std::move(cpq->query);
  auto downstreamServer = std::move(cpq->downstream);

  try {
    auto downstream = t_downstreamTCPConnectionsManager.getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::string());

    prependSizeToTCPQuery(query.d_buffer, query.d_idstate.d_proxyProtocolPayloadSize);

    vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", query.d_idstate.qname.toLogString(), QType(query.d_idstate.qtype).toString(), query.d_idstate.origRemote.toStringWithPort(), query.d_idstate.protocol.toString(), query.d_buffer.size(), downstreamServer->getNameWithAddr());

    downstream->queueQuery(tqs, std::move(query));
  }
  catch (...) {
    /* let the original sender know that the query was never sent */
    tqs->notifyIOError(now, std::move(query));
  }
}
1315

1316
/* Pipe callback run in a TCP worker thread: pick up a response produced by
   another thread and dispatch it to the right handler on its owning
   connection — an empty buffer signals an I/O error, XFR answers go through
   the multi-message XFR path, everything else is a regular response. */
static void handleCrossProtocolResponse(int pipefd, FDMultiplexer::funcparam_t& param)
{
  auto* threadData = boost::any_cast<TCPClientThreadData*>(param);

  std::unique_ptr<TCPCrossProtocolResponse> cpr{nullptr};
  try {
    auto tmp = threadData->crossProtocolResponseReceiver.receive();
    if (!tmp) {
      /* spurious wake-up: nothing to read */
      return;
    }
    cpr = std::move(*tmp);
  }
  catch (const std::exception& e) {
    throw std::runtime_error("Error while reading from the TCP cross-protocol response: " + std::string(e.what()));
  }

  auto& response = *cpr;

  try {
    if (response.d_response.d_buffer.empty()) {
      /* by convention an empty buffer means an I/O error occurred */
      response.d_state->notifyIOError(response.d_now, std::move(response.d_response));
    }
    else if (response.d_response.d_idstate.qtype == QType::AXFR || response.d_response.d_idstate.qtype == QType::IXFR) {
      response.d_state->handleXFRResponse(response.d_now, std::move(response.d_response));
    }
    else {
      response.d_state->handleResponse(response.d_now, std::move(response.d_response));
    }
  }
  catch (...) {
    /* no point bubbling up from there */
  }
}
1349

1350
/* Everything an acceptor needs to accept connections on one listening socket:
   the owning frontend (ClientState), the local address the socket is bound to,
   the ACL to enforce, and the listening descriptor itself. Instances are kept
   alive by the acceptor thread and handed to the multiplexer callbacks by
   pointer, so references here must outlive the event loop. */
struct TCPAcceptorParam
{
  // NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members)
  ClientState& clientState;
  ComboAddress local;
  // NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members)
  LocalStateHolder<NetmaskGroup>& acl;
  int socket{-1};
};

1360
static void acceptNewConnection(const TCPAcceptorParam& param, TCPClientThreadData* threadData);
1361

1362
static void scanForTimeouts(const TCPClientThreadData& data, const timeval& now)
1363
{
6,172✔
1364
  auto expiredReadConns = data.mplexer->getTimeouts(now, false);
6,172✔
1365
  for (const auto& cbData : expiredReadConns) {
6,172!
UNCOV
1366
    if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
×
UNCOV
1367
      auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
×
UNCOV
1368
      if (cbData.first == state->d_handler.getDescriptor()) {
×
UNCOV
1369
        vinfolog("Timeout (read) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
×
UNCOV
1370
        state->handleTimeout(state, false);
×
UNCOV
1371
      }
×
UNCOV
1372
    }
×
1373
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
1374
    else if (cbData.second.type() == typeid(std::shared_ptr<IncomingHTTP2Connection>)) {
×
1375
      auto state = boost::any_cast<std::shared_ptr<IncomingHTTP2Connection>>(cbData.second);
1376
      if (cbData.first == state->d_handler.getDescriptor()) {
×
1377
        vinfolog("Timeout (read) from remote H2 client %s", state->d_ci.remote.toStringWithPort());
×
1378
        std::shared_ptr<IncomingTCPConnectionState> parentState = state;
1379
        state->handleTimeout(parentState, false);
1380
      }
1381
    }
1382
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
1383
    else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
×
1384
      auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
×
1385
      vinfolog("Timeout (read) from remote backend %s", conn->getBackendName());
×
1386
      conn->handleTimeout(now, false);
×
1387
    }
×
UNCOV
1388
  }
×
1389

1390
  auto expiredWriteConns = data.mplexer->getTimeouts(now, true);
6,172✔
1391
  for (const auto& cbData : expiredWriteConns) {
6,172!
1392
    if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
×
1393
      auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
×
1394
      if (cbData.first == state->d_handler.getDescriptor()) {
×
1395
        vinfolog("Timeout (write) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
×
1396
        state->handleTimeout(state, true);
×
1397
      }
×
1398
    }
×
1399
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
1400
    else if (cbData.second.type() == typeid(std::shared_ptr<IncomingHTTP2Connection>)) {
×
1401
      auto state = boost::any_cast<std::shared_ptr<IncomingHTTP2Connection>>(cbData.second);
1402
      if (cbData.first == state->d_handler.getDescriptor()) {
×
1403
        vinfolog("Timeout (write) from remote H2 client %s", state->d_ci.remote.toStringWithPort());
×
1404
        std::shared_ptr<IncomingTCPConnectionState> parentState = state;
1405
        state->handleTimeout(parentState, true);
1406
      }
1407
    }
1408
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
1409
    else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
×
1410
      auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
×
1411
      vinfolog("Timeout (write) from remote backend %s", conn->getBackendName());
×
1412
      conn->handleTimeout(now, true);
×
1413
    }
×
1414
  }
×
1415
}
6,172✔
1416

1417
/* Debug-only: dump a human-readable description of every descriptor this
   worker's multiplexer is watching, when an operator has requested it by
   bumping g_tcpStatesDumpRequested. Each worker thread decrements the counter
   once, under a shared lock so the dumps from different workers do not
   interleave in the log. */
static void dumpTCPStates(const TCPClientThreadData& data)
{
  /* just to keep things clean in the output, debug only */
  static std::mutex s_lock;
  std::lock_guard<decltype(s_lock)> lck(s_lock);
  if (g_tcpStatesDumpRequested > 0) {
    /* no race here, we took the lock so it can only be increased in the meantime */
    --g_tcpStatesDumpRequested;
    infolog("Dumping the TCP states, as requested:");
    data.mplexer->runForAllWatchedFDs([](bool isRead, int desc, const FDMultiplexer::funcparam_t& param, struct timeval ttd) {
      timeval lnow{};
      gettimeofday(&lnow, nullptr);
      if (ttd.tv_sec > 0) {
        /* a positive TTD means a timeout is armed; show how far away it is */
        infolog("- Descriptor %d is in %s state, TTD in %d", desc, (isRead ? "read" : "write"), (ttd.tv_sec - lnow.tv_sec));
      }
      else {
        infolog("- Descriptor %d is in %s state, no TTD set", desc, (isRead ? "read" : "write"));
      }

      /* the funcparam tells us what kind of connection owns this descriptor */
      if (param.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
        auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
        infolog(" - %s", state->toString());
      }
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
      else if (param.type() == typeid(std::shared_ptr<IncomingHTTP2Connection>)) {
        auto state = boost::any_cast<std::shared_ptr<IncomingHTTP2Connection>>(param);
        infolog(" - %s", state->toString());
      }
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
      else if (param.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
        auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(param);
        infolog(" - %s", conn->toString());
      }
      else if (param.type() == typeid(TCPClientThreadData*)) {
        infolog(" - Worker thread pipe");
      }
    });
    infolog("The TCP/DoT client cache has %d active and %d idle outgoing connections cached", t_downstreamTCPConnectionsManager.getActiveCount(), t_downstreamTCPConnectionsManager.getIdleCount());
  }
}
1457

1458
// NOLINTNEXTLINE(performance-unnecessary-value-param): you are wrong, clang-tidy, go home
/* Entry point of one TCP worker thread. It owns a multiplexer on which it
   watches: the channel over which acceptor threads hand it new client
   connections, the two cross-protocol channels (queries coming in from other
   protocols, responses coming back), and — in single-acceptor mode —
   the listening sockets themselves. It then loops forever, running the event
   loop and periodically (at most once per second) scanning for timed-out
   connections and honoring state-dump requests. */
static void tcpClientThread(pdns::channel::Receiver<ConnectionInfo>&& queryReceiver, pdns::channel::Receiver<CrossProtocolQuery>&& crossProtocolQueryReceiver, pdns::channel::Receiver<TCPCrossProtocolResponse>&& crossProtocolResponseReceiver, pdns::channel::Sender<TCPCrossProtocolResponse>&& crossProtocolResponseSender, std::vector<ClientState*> tcpAcceptStates)
{
  /* we get launched with a pipe on which we receive file descriptors from clients that we own
     from that point on */

  setThreadName("dnsdist/tcpClie");

  try {
    TCPClientThreadData data;
    /* take ownership of the channel endpoints before registering their
       descriptors with the multiplexer */
    data.crossProtocolResponseSender = std::move(crossProtocolResponseSender);
    data.queryReceiver = std::move(queryReceiver);
    data.crossProtocolQueryReceiver = std::move(crossProtocolQueryReceiver);
    data.crossProtocolResponseReceiver = std::move(crossProtocolResponseReceiver);

    data.mplexer->addReadFD(data.queryReceiver.getDescriptor(), handleIncomingTCPQuery, &data);
    data.mplexer->addReadFD(data.crossProtocolQueryReceiver.getDescriptor(), handleCrossProtocolQuery, &data);
    data.mplexer->addReadFD(data.crossProtocolResponseReceiver.getDescriptor(), handleCrossProtocolResponse, &data);

    /* only used in single acceptor mode for now */
    auto acl = g_ACL.getLocal();
    std::vector<TCPAcceptorParam> acceptParams;
    /* reserve so the TCPAcceptorParam addresses stay stable: the multiplexer
       keeps raw pointers into this vector */
    acceptParams.reserve(tcpAcceptStates.size());

    for (auto& state : tcpAcceptStates) {
      acceptParams.emplace_back(TCPAcceptorParam{*state, state->local, acl, state->tcpFD});
      /* a frontend may listen on extra addresses besides its primary one */
      for (const auto& [addr, socket] : state->d_additionalAddresses) {
        acceptParams.emplace_back(TCPAcceptorParam{*state, addr, acl, socket});
      }
    }

    auto acceptCallback = [&data](int socket, FDMultiplexer::funcparam_t& funcparam) {
      const auto* acceptorParam = boost::any_cast<const TCPAcceptorParam*>(funcparam);
      acceptNewConnection(*acceptorParam, &data);
    };

    for (const auto& param : acceptParams) {
      setNonBlocking(param.socket);
      data.mplexer->addReadFD(param.socket, acceptCallback, &param);
    }

    timeval now{};
    gettimeofday(&now, nullptr);
    time_t lastTimeoutScan = now.tv_sec;

    for (;;) {
      /* run() updates 'now' for us */
      data.mplexer->run(&now);

      try {
        t_downstreamTCPConnectionsManager.cleanupClosedConnections(now);

        /* throttle the (relatively expensive) timeout scan to once per second */
        if (now.tv_sec > lastTimeoutScan) {
          lastTimeoutScan = now.tv_sec;
          scanForTimeouts(data, now);

          if (g_tcpStatesDumpRequested > 0) {
            dumpTCPStates(data);
          }
        }
      }
      catch (const std::exception& e) {
        /* keep the worker alive: log and go back to the event loop */
        warnlog("Error in TCP worker thread: %s", e.what());
      }
    }
  }
  catch (const std::exception& e) {
    errlog("Fatal error in TCP worker thread: %s", e.what());
  }
}
1527

1528
static void acceptNewConnection(const TCPAcceptorParam& param, TCPClientThreadData* threadData)
1529
{
2,335✔
1530
  auto& clientState = param.clientState;
2,335✔
1531
  auto& acl = param.acl;
2,335✔
1532
  const bool checkACL = clientState.dohFrontend == nullptr || (!clientState.dohFrontend->d_trustForwardedForHeader && clientState.dohFrontend->d_earlyACLDrop);
2,335!
1533
  const int socket = param.socket;
2,335✔
1534
  bool tcpClientCountIncremented = false;
2,335✔
1535
  ComboAddress remote;
2,335✔
1536
  remote.sin4.sin_family = param.local.sin4.sin_family;
2,335✔
1537

1538
  tcpClientCountIncremented = false;
2,335✔
1539
  try {
2,335✔
1540
    socklen_t remlen = remote.getSocklen();
2,335✔
1541
    ConnectionInfo connInfo(&clientState);
2,335✔
1542
#ifdef HAVE_ACCEPT4
2,335✔
1543
    // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
1544
    connInfo.fd = accept4(socket, reinterpret_cast<struct sockaddr*>(&remote), &remlen, SOCK_NONBLOCK);
2,335✔
1545
#else
1546
    // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
1547
    connInfo.fd = accept(socket, reinterpret_cast<struct sockaddr*>(&remote), &remlen);
1548
#endif
1549
    // will be decremented when the ConnectionInfo object is destroyed, no matter the reason
1550
    auto concurrentConnections = ++clientState.tcpCurrentConnections;
2,335✔
1551

1552
    if (connInfo.fd < 0) {
2,335!
1553
      throw std::runtime_error((boost::format("accepting new connection on socket: %s") % stringerror()).str());
×
1554
    }
×
1555

1556
    if (checkACL && !acl->match(remote)) {
2,335✔
1557
      ++dnsdist::metrics::g_stats.aclDrops;
9✔
1558
      vinfolog("Dropped TCP connection from %s because of ACL", remote.toStringWithPort());
9✔
1559
      return;
9✔
1560
    }
9✔
1561

1562
    if (clientState.d_tcpConcurrentConnectionsLimit > 0 && concurrentConnections > clientState.d_tcpConcurrentConnectionsLimit) {
2,326✔
1563
      vinfolog("Dropped TCP connection from %s because of concurrent connections limit", remote.toStringWithPort());
3✔
1564
      return;
3✔
1565
    }
3✔
1566

1567
    if (concurrentConnections > clientState.tcpMaxConcurrentConnections.load()) {
2,323✔
1568
      clientState.tcpMaxConcurrentConnections.store(concurrentConnections);
467✔
1569
    }
467✔
1570

1571
#ifndef HAVE_ACCEPT4
1572
    if (!setNonBlocking(connInfo.fd)) {
1573
      return;
1574
    }
1575
#endif
1576

1577
    setTCPNoDelay(connInfo.fd); // disable NAGLE
2,323✔
1578

1579
    if (g_maxTCPQueuedConnections > 0 && g_tcpclientthreads->getQueuedCount() >= g_maxTCPQueuedConnections) {
2,323!
1580
      vinfolog("Dropping TCP connection from %s because we have too many queued already", remote.toStringWithPort());
×
1581
      return;
×
1582
    }
×
1583

1584
    if (!dnsdist::IncomingConcurrentTCPConnectionsManager::accountNewTCPConnection(remote)) {
2,323✔
1585
      vinfolog("Dropping TCP connection from %s because we have too many from this client already", remote.toStringWithPort());
2✔
1586
      return;
2✔
1587
    }
2✔
1588
    tcpClientCountIncremented = true;
2,321✔
1589

1590
    vinfolog("Got TCP connection from %s", remote.toStringWithPort());
2,321✔
1591

1592
    connInfo.remote = remote;
2,321✔
1593

1594
    if (threadData == nullptr) {
2,321✔
1595
      if (!g_tcpclientthreads->passConnectionToThread(std::make_unique<ConnectionInfo>(std::move(connInfo)))) {
1,963!
1596
        if (tcpClientCountIncremented) {
×
1597
          dnsdist::IncomingConcurrentTCPConnectionsManager::accountClosedTCPConnection(remote);
×
1598
        }
×
1599
      }
×
1600
    }
1,963✔
1601
    else {
358✔
1602
      timeval now{};
358✔
1603
      gettimeofday(&now, nullptr);
358✔
1604

1605
      if (connInfo.cs->dohFrontend) {
358!
1606
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
1607
        auto state = std::make_shared<IncomingHTTP2Connection>(std::move(connInfo), *threadData, now);
1608
        state->handleIO();
1609
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
1610
      }
×
1611
      else {
358✔
1612
        auto state = std::make_shared<IncomingTCPConnectionState>(std::move(connInfo), *threadData, now);
358✔
1613
        state->handleIO();
358✔
1614
      }
358✔
1615
    }
358✔
1616
  }
2,321✔
1617
  catch (const std::exception& e) {
2,335✔
1618
    errlog("While reading a TCP question: %s", e.what());
×
1619
    if (tcpClientCountIncremented) {
×
1620
      dnsdist::IncomingConcurrentTCPConnectionsManager::accountClosedTCPConnection(remote);
×
1621
    }
×
1622
  }
×
1623
  catch (...) {
2,335✔
1624
  }
×
1625
}
2,335✔
1626

1627
/* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and
   they will hand off to worker threads & spawn more of them if required
*/
#ifndef USE_SINGLE_ACCEPTOR_THREAD
/* Entry point of a dedicated TCP acceptor thread. With a single listening
   socket it blocks in accept() directly; with several it multiplexes over
   them. Accepted connections are passed to the worker threads (note the
   nullptr threadData argument to acceptNewConnection). */
void tcpAcceptorThread(const std::vector<ClientState*>& states)
{
  setThreadName("dnsdist/tcpAcce");

  auto acl = g_ACL.getLocal();
  std::vector<TCPAcceptorParam> params;
  /* reserve so the param addresses stay stable: the multiplexer keeps raw
     pointers into this vector */
  params.reserve(states.size());

  for (const auto& state : states) {
    params.emplace_back(TCPAcceptorParam{*state, state->local, acl, state->tcpFD});
    /* a frontend may listen on extra addresses besides its primary one */
    for (const auto& [addr, socket] : state->d_additionalAddresses) {
      params.emplace_back(TCPAcceptorParam{*state, addr, acl, socket});
    }
  }

  if (params.size() == 1) {
    /* fast path: a single socket, block in accept() forever */
    while (true) {
      acceptNewConnection(params.at(0), nullptr);
    }
  }
  else {
    auto acceptCallback = [](int socket, FDMultiplexer::funcparam_t& funcparam) {
      const auto* acceptorParam = boost::any_cast<const TCPAcceptorParam*>(funcparam);
      acceptNewConnection(*acceptorParam, nullptr);
    };

    auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent(params.size()));
    for (const auto& param : params) {
      mplexer->addReadFD(param.socket, acceptCallback, &param);
    }

    timeval now{};
    while (true) {
      /* -1: no timeout, wait until at least one socket is readable */
      mplexer->run(&now, -1);
    }
  }
}
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc