• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PowerDNS / pdns / 16615247828

30 Jul 2025 06:33AM UTC coverage: 65.845% (-0.03%) from 65.87%
16615247828

Pull #15942

github

web-flow
Merge 3e4243857 into 4a7b6a621
Pull Request #15942: Optimize reload-zones logic to reduce thread scheduling times

42051 of 92438 branches covered (45.49%)

Branch coverage included in aggregate %.

4 of 4 new or added lines in 1 file covered. (100.0%)

48 existing lines in 12 files now uncovered.

127942 of 165732 relevant lines covered (77.2%)

5529729.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.14
/pdns/dnsdistdist/dnsdist-tcp.cc
1
/*
2
 * This file is part of PowerDNS or dnsdist.
3
 * Copyright -- PowerDNS.COM B.V. and its contributors
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of version 2 of the GNU General Public License as
7
 * published by the Free Software Foundation.
8
 *
9
 * In addition, for the avoidance of any doubt, permission is granted to
10
 * link this program with OpenSSL and to (re)distribute the binaries
11
 * produced as the result of such linking.
12
 *
13
 * This program is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
 * GNU General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU General Public License
19
 * along with this program; if not, write to the Free Software
20
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
 */
22

23
#include <thread>
24
#include <netinet/tcp.h>
25
#include <queue>
26
#include <boost/format.hpp>
27

28
#include "dnsdist.hh"
29
#include "dnsdist-concurrent-connections.hh"
30
#include "dnsdist-dnsparser.hh"
31
#include "dnsdist-ecs.hh"
32
#include "dnsdist-edns.hh"
33
#include "dnsdist-nghttp2-in.hh"
34
#include "dnsdist-proxy-protocol.hh"
35
#include "dnsdist-rings.hh"
36
#include "dnsdist-tcp.hh"
37
#include "dnsdist-tcp-downstream.hh"
38
#include "dnsdist-downstream-connection.hh"
39
#include "dnsdist-tcp-upstream.hh"
40
#include "dnsparser.hh"
41
#include "dolog.hh"
42
#include "gettime.hh"
43
#include "lock.hh"
44
#include "sstuff.hh"
45
#include "tcpiohandler.hh"
46
#include "tcpiohandler-mplexer.hh"
47
#include "threadname.hh"
48

49
/* TCP: the grand design.
50
   We forward 'messages' between clients and downstream servers. Messages are at most 65535 bytes long, since their size is carried in a two-byte length prefix.
51
   An answer might theoretically consist of multiple messages (for example, in the case of AXFR), initially
52
   we will not go there.
53

54
   In a sense there is a strong symmetry between UDP and TCP, once a connection to a downstream has been setup.
55
   This symmetry is broken because of head-of-line blocking within TCP though, necessitating additional connections
56
   to guarantee performance.
57

58
   So the idea is to have a 'pool' of available downstream connections, and forward messages to/from them and never queue.
59
   So whenever an answer comes in, we know where it needs to go.
60

61
   Let's start naively.
62
*/
63

64
// NOTE(review): judging by its name, this counts requests for a dump of the TCP states;
// the code reading and writing it is not visible here. Starts at zero.
std::atomic<uint64_t> g_tcpStatesDumpRequested{0U};
65

66
/* Destructor: accounts for the closed connection, reports this connection's metrics
   (queries, lifetime in milliseconds, read I/Os per query) to its frontend when there is one,
   and finally closes the client handler explicitly. */
IncomingTCPConnectionState::~IncomingTCPConnectionState()
{
  try {
    dnsdist::IncomingConcurrentTCPConnectionsManager::accountClosedTCPConnection(d_ci.remote);
  }
  catch (...) {
    /* accounting might raise an exception in theory, and it must never escape a destructor */
  }

  if (d_ci.cs != nullptr) {
    timeval current{};
    gettimeofday(&current, nullptr);

    const auto lifetime = current - d_connectionStartTime;
    const auto lifetimeMsec = lifetime.tv_sec * 1000 + lifetime.tv_usec / 1000;
    // avoid a division by zero when no query was received at all
    const auto readIOsPerQuery = (d_queriesCount > 0) ? (d_readIOsTotal / d_queriesCount) : d_readIOsTotal;
    d_ci.cs->updateTCPMetrics(d_queriesCount, lifetimeMsec, readIOsPerQuery);
  }

  // this would happen when the member is destroyed anyway, but closing it here guarantees it
  // happens before the ConnectionInfo (and its descriptor) is destroyed, instead of relying on
  // the declaration order of the members
  d_handler.close();
}
88

89
/* Client-side protocol of this connection: DoH for a DoH frontend, DoT when the handler
   is a TLS one, plain DoTCP otherwise. */
dnsdist::Protocol IncomingTCPConnectionState::getProtocol() const
{
  if (d_ci.cs->dohFrontend) {
    return dnsdist::Protocol::DoH;
  }
  return d_handler.isTLS() ? dnsdist::Protocol::DoT : dnsdist::Protocol::DoTCP;
}
99

100
size_t IncomingTCPConnectionState::clearAllDownstreamConnections()
101
{
190✔
102
  return t_downstreamTCPConnectionsManager.clear();
190✔
103
}
190✔
104

105
static std::pair<std::shared_ptr<TCPConnectionToBackend>, bool> getOwnedDownstreamConnection(std::map<std::shared_ptr<DownstreamState>, std::deque<std::shared_ptr<TCPConnectionToBackend>>>& ownedConnectionsToBackend, const std::shared_ptr<DownstreamState>& backend, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs)
106
{
22,630✔
107
  bool tlvsMismatch = false;
22,630✔
108
  auto connIt = ownedConnectionsToBackend.find(backend);
22,630✔
109
  if (connIt == ownedConnectionsToBackend.end()) {
22,630✔
110
    DEBUGLOG("no owned connection found for " << backend->getName());
22,593✔
111
    return {nullptr, tlvsMismatch};
22,593✔
112
  }
22,593✔
113

114
  for (auto& conn : connIt->second) {
37✔
115
    if (conn->canBeReused(true)) {
37!
116
      if (conn->matchesTLVs(tlvs)) {
37✔
117
        DEBUGLOG("Got one owned connection accepting more for " << backend->getName());
32✔
118
        conn->setReused();
32✔
119
        ++backend->tcpReusedConnections;
32✔
120
        return {conn, tlvsMismatch};
32✔
121
      }
32✔
122
      DEBUGLOG("Found one connection to " << backend->getName() << " but with different TLV values");
5✔
123
      tlvsMismatch = true;
5✔
124
    }
5✔
125
    DEBUGLOG("not accepting more for " << backend->getName());
5✔
126
  }
5✔
127

128
  return {nullptr, tlvsMismatch};
5✔
129
}
37✔
130

131
bool IncomingTCPConnectionState::isNearTCPLimits() const
132
{
222,379✔
133
  if (d_ci.d_restricted) {
222,379✔
134
    return true;
5✔
135
  }
5✔
136

137
  const auto tcpConnectionsOverloadThreshold = dnsdist::configuration::getImmutableConfiguration().d_tcpConnectionsOverloadThreshold;
222,374✔
138
  if (tcpConnectionsOverloadThreshold == 0) {
222,374✔
139
    return false;
556✔
140
  }
556✔
141

142
  const auto& clientState = d_ci.cs;
221,818✔
143
  if (clientState->d_tcpConcurrentConnectionsLimit > 0) {
221,818✔
144
    auto concurrentConnections = clientState->tcpCurrentConnections.load();
90✔
145
    auto current = (100 * concurrentConnections) / clientState->d_tcpConcurrentConnectionsLimit;
90✔
146
    if (current >= tcpConnectionsOverloadThreshold) {
90✔
147
      return true;
18✔
148
    }
18✔
149
  }
90✔
150

151
  return dnsdist::IncomingConcurrentTCPConnectionsManager::isClientOverThreshold(d_ci.remote);
221,800✔
152
}
221,818✔
153

154
std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getDownstreamConnection(std::shared_ptr<DownstreamState>& backend, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs, const struct timeval& now)
155
{
22,630✔
156
  auto [downstream, tlvsMismatch] = getOwnedDownstreamConnection(d_ownedConnectionsToBackend, backend, tlvs);
22,630✔
157

158
  if (!downstream) {
22,630✔
159
    if (backend->d_config.useProxyProtocol && tlvsMismatch) {
22,598✔
160
      clearOwnedDownstreamConnections(backend);
5✔
161
    }
5✔
162

163
    /* we don't have a connection to this backend owned yet, let's get one (it might not be a fresh one, though) */
164
    downstream = t_downstreamTCPConnectionsManager.getConnectionToDownstream(d_threadData.mplexer, backend, now, std::string());
22,598✔
165
    // if we had an existing connection but the TLVs are different, they are likely unique per query so do not bother keeping the connection
166
    // around
167
    if (backend->d_config.useProxyProtocol && !tlvsMismatch) {
22,598✔
168
      registerOwnedDownstreamConnection(downstream);
21✔
169
    }
21✔
170
  }
22,598✔
171

172
  return downstream;
22,630✔
173
}
22,630✔
174

175
static void tcpClientThread(pdns::channel::Receiver<ConnectionInfo>&& queryReceiver, pdns::channel::Receiver<CrossProtocolQuery>&& crossProtocolQueryReceiver, pdns::channel::Receiver<TCPCrossProtocolResponse>&& crossProtocolResponseReceiver, pdns::channel::Sender<TCPCrossProtocolResponse>&& crossProtocolResponseSender, std::vector<ClientState*> tcpAcceptStates);
176

177
/* Reserves `maxThreads` worker slots, then attempts to start one worker per slot. */
TCPClientCollection::TCPClientCollection(size_t maxThreads, std::vector<ClientState*> tcpAcceptStates) :
  d_tcpclientthreads(maxThreads), d_maxthreads(maxThreads)
{
  for (size_t remaining = maxThreads; remaining > 0; --remaining) {
    addTCPClientThread(tcpAcceptStates);
  }
}
184

185
/* Starts one more TCP worker. It creates the worker's query, cross-protocol query and
   cross-protocol response channels, then spawns a detached thread running tcpClientThread()
   with the receiving ends. The worker (built from the query and cross-protocol query senders)
   is stored in the next free slot.
   Nothing is created when every slot is already taken. Failures reported as std::exception are
   logged and the worker is skipped; they are not propagated to the caller. */
void TCPClientCollection::addTCPClientThread(std::vector<ClientState*>& tcpAcceptStates)
{
  try {
    vinfolog("Adding TCP Client thread");

    // look for a free slot before creating anything: otherwise the channels below would be
    // created only to be destroyed right away when the worker cannot be stored
    if (d_numthreads >= d_tcpclientthreads.size()) {
      vinfolog("Adding a new TCP client thread would exceed the vector size (%d/%d), skipping. Consider increasing the maximum amount of TCP client threads with setMaxTCPClientThreads() in the configuration.", d_numthreads.load(), d_tcpclientthreads.size());
      return;
    }

    const auto internalPipeBufferSize = dnsdist::configuration::getImmutableConfiguration().d_tcpInternalPipeBufferSize;

    auto [queryChannelSender, queryChannelReceiver] = pdns::channel::createObjectQueue<ConnectionInfo>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, internalPipeBufferSize);

    auto [crossProtocolQueryChannelSender, crossProtocolQueryChannelReceiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, internalPipeBufferSize);

    auto [crossProtocolResponseChannelSender, crossProtocolResponseChannelReceiver] = pdns::channel::createObjectQueue<TCPCrossProtocolResponse>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, internalPipeBufferSize);

    TCPWorkerThread worker(std::move(queryChannelSender), std::move(crossProtocolQueryChannelSender));

    try {
      std::thread clientThread(tcpClientThread, std::move(queryChannelReceiver), std::move(crossProtocolQueryChannelReceiver), std::move(crossProtocolResponseChannelReceiver), std::move(crossProtocolResponseChannelSender), tcpAcceptStates);
      clientThread.detach();
    }
    // std::thread reports a creation failure with std::system_error, which is a std::runtime_error
    catch (const std::runtime_error& e) {
      errlog("Error creating a TCP thread: %s", e.what());
      return;
    }

    d_tcpclientthreads.at(d_numthreads) = std::move(worker);
    ++d_numthreads;
  }
  catch (const std::exception& e) {
    errlog("Error creating TCP worker: %s", e.what());
  }
}
221

222
std::unique_ptr<TCPClientCollection> g_tcpclientthreads;
223

224
/* Sends the queued responses of `state`, oldest first, for as long as the connection is active.
   As soon as one send does not complete, its I/O state is returned. Otherwise the connection is
   left idle and IOState::Done is returned. */
static IOState sendQueuedResponses(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
{
  auto& pending = state->d_queuedResponses;

  while (state->active() && !pending.empty()) {
    DEBUGLOG("queue size is " << pending.size() << ", sending the next one");
    TCPResponse next = std::move(pending.front());
    pending.pop_front();

    state->d_state = IncomingTCPConnectionState::State::idle;
    const IOState outcome = state->sendResponse(now, std::move(next));
    if (outcome != IOState::Done) {
      // partial write (or similar): the caller has to wait before resuming
      return outcome;
    }
  }

  state->d_state = IncomingTCPConnectionState::State::idle;
  return IOState::Done;
}
242

243
/* Bookkeeping once `currentResponse` has been sent to the client. AXFR/IXFR responses are
   skipped entirely. For everything else:
   - the in-flight query counter is decremented;
   - the response is reported, with its latency, backend and `sentBytes` when it came from a
     backend, or without latency/backend (using the buffer size) when it was self-generated or
     has no backend;
   - the buffer is cleared and the reference to the backend connection is dropped. */
void IncomingTCPConnectionState::handleResponseSent(TCPResponse& currentResponse, size_t sentBytes)
{
  const auto& ids = currentResponse.d_idstate;
  if (ids.qtype == QType::AXFR || ids.qtype == QType::IXFR) {
    return;
  }

  --d_currentQueriesCount;

  const auto& backend = currentResponse.d_connection ? currentResponse.d_connection->getDS() : currentResponse.d_ds;
  if (ids.selfGenerated || !backend) {
    ::handleResponseSent(ids, 0., d_ci.remote, ComboAddress(), static_cast<unsigned int>(currentResponse.d_buffer.size()), currentResponse.d_cleartextDH, ids.protocol, false);
  }
  else {
    const double udiff = ids.queryRealTime.udiff();
    vinfolog("Got answer from %s, relayed to %s (%s, %d bytes), took %f us", backend->d_config.remote.toStringWithPort(), ids.origRemote.toStringWithPort(), getProtocol().toString(), sentBytes, udiff);

    auto backendProtocol = backend->getProtocol();
    // a UDP backend that was actually reached over TCP is reported as such
    if (backendProtocol == dnsdist::Protocol::DoUDP && !ids.forwardedOverUDP) {
      backendProtocol = dnsdist::Protocol::DoTCP;
    }
    ::handleResponseSent(ids, udiff, d_ci.remote, backend->d_config.remote, static_cast<unsigned int>(sentBytes), currentResponse.d_cleartextDH, backendProtocol, true);
  }

  currentResponse.d_buffer.clear();
  currentResponse.d_connection.reset();
}
271

272
/* Inserts the two-byte, most-significant-byte-first DNS over TCP length field in front of the
   query. The field goes right after the first `proxyProtocolPayloadSize` bytes of `buffer` (a
   proxy protocol payload, judging by the parameter name; 0 when there is none).
   Throws std::runtime_error when the buffer holds no query after that payload, or when the query
   is too long for its length to fit in the 16-bit field. */
static void prependSizeToTCPQuery(PacketBuffer& buffer, size_t proxyProtocolPayloadSize)
{
  if (buffer.size() <= proxyProtocolPayloadSize) {
    throw std::runtime_error("The buffer size is smaller or equal to the payload size");
  }

  const size_t querySize = buffer.size() - proxyProtocolPayloadSize;
  // the length field is only 16 bits wide: a longer query would be sent with a truncated,
  // wrong length, desynchronizing the TCP stream
  constexpr size_t maxQuerySize = 65535U;
  if (querySize > maxQuerySize) {
    throw std::runtime_error("The query is too large to be sent over TCP");
  }

  const auto queryLen = static_cast<uint16_t>(querySize);
  const std::array<uint8_t, 2> sizeBytes{static_cast<uint8_t>(queryLen / 256), static_cast<uint8_t>(queryLen % 256)};
  /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes
     that could occur if we had to deal with the size during the processing,
     especially alignment issues */
  buffer.insert(buffer.begin() + static_cast<PacketBuffer::iterator::difference_type>(proxyProtocolPayloadSize), sizeBytes.begin(), sizeBytes.end());
}
285

286
bool IncomingTCPConnectionState::canAcceptNewQueries(const struct timeval& now)
287
{
46,188✔
288
  if (d_hadErrors) {
46,188✔
289
    DEBUGLOG("not accepting new queries because we encountered some error during the processing already");
2✔
290
    return false;
2✔
291
  }
2✔
292

293
  if (isNearTCPLimits()) {
46,186✔
294
    d_ci.d_restricted = true;
4✔
295
    DEBUGLOG("not accepting new queries because we already near our TCP limits");
4✔
296
    return false;
4✔
297
  }
4✔
298

299
  // for DoH, this is already handled by the underlying library
300
  if (!d_ci.cs->dohFrontend && d_currentQueriesCount >= d_ci.cs->d_maxInFlightQueriesPerConn) {
46,182✔
301
    DEBUGLOG("not accepting new queries because we already have " << d_currentQueriesCount << " out of " << d_ci.cs->d_maxInFlightQueriesPerConn);
2,836✔
302
    return false;
2,836✔
303
  }
2,836✔
304

305
  const auto& currentConfig = dnsdist::configuration::getCurrentRuntimeConfiguration();
43,346✔
306
  if (currentConfig.d_maxTCPQueriesPerConn != 0 && d_queriesCount > currentConfig.d_maxTCPQueriesPerConn) {
43,346✔
307
    vinfolog("not accepting new queries from %s because it reached the maximum number of queries per conn (%d / %d)", d_ci.remote.toStringWithPort(), d_queriesCount, currentConfig.d_maxTCPQueriesPerConn);
6!
308
    return false;
6✔
309
  }
6✔
310

311
  if (maxConnectionDurationReached(currentConfig.d_maxTCPConnectionDuration, now)) {
43,340!
312
    vinfolog("not accepting new queries from %s because it reached the maximum TCP connection duration", d_ci.remote.toStringWithPort());
×
313
    return false;
×
314
  }
×
315

316
  return true;
43,340✔
317
}
43,340✔
318

319
void IncomingTCPConnectionState::resetForNewQuery()
320
{
43,340✔
321
  d_buffer.clear();
43,340✔
322
  d_currentPos = 0;
43,340✔
323
  d_querySize = 0;
43,340✔
324
  d_state = State::waitingForQuery;
43,340✔
325
  d_readIOsTotal += d_readIOsCurrentQuery;
43,340✔
326
  d_readIOsCurrentQuery = 0;
43,340✔
327
}
43,340✔
328

329
/* Deadline for the next read from the client, computed from `now`.
   Returns boost::none when no deadline applies: the connection is not near the TCP limits, and
   neither a maximum connection duration nor a receive timeout is configured.
   - Near the TCP limits, the read timeout becomes 500 ms. The connection duration is capped at
     5 s, or at the configured maximum if that is lower.
   - Otherwise the deadline is the receive timeout, brought forward to the end of the maximum
     connection duration when that comes first.
   A connection that already exceeded its maximum duration (or whose start time lies in the
   future) gets `now` as its deadline. */
boost::optional<timeval> IncomingTCPConnectionState::getClientReadTTD(timeval now) const
{
  const auto& runtimeConfiguration = dnsdist::configuration::getCurrentRuntimeConfiguration();
  // evaluated once: the result depends on shared counters that other threads may update
  // between two calls, and every decision below has to agree on it
  const bool nearLimits = isNearTCPLimits();

  if (!nearLimits && runtimeConfiguration.d_maxTCPConnectionDuration == 0 && runtimeConfiguration.d_tcpRecvTimeout == 0) {
    return boost::none;
  }

  size_t maxTCPConnectionDuration = runtimeConfiguration.d_maxTCPConnectionDuration;
  uint16_t tcpRecvTimeout = runtimeConfiguration.d_tcpRecvTimeout;
  uint32_t tcpRecvTimeoutUsec = 0U;
  if (nearLimits) {
    constexpr size_t maxTCPConnectionDurationNearLimits = 5U;
    constexpr uint32_t tcpRecvTimeoutUsecNearLimits = 500U * 1000U;
    maxTCPConnectionDuration = runtimeConfiguration.d_maxTCPConnectionDuration != 0 ? std::min(runtimeConfiguration.d_maxTCPConnectionDuration, maxTCPConnectionDurationNearLimits) : maxTCPConnectionDurationNearLimits;
    tcpRecvTimeout = 0;
    tcpRecvTimeoutUsec = tcpRecvTimeoutUsecNearLimits;
  }

  if (maxTCPConnectionDuration > 0) {
    const auto elapsed = now.tv_sec - d_connectionStartTime.tv_sec;
    if (elapsed < 0 || (static_cast<size_t>(elapsed) >= maxTCPConnectionDuration)) {
      return now;
    }
    const auto remaining = maxTCPConnectionDuration - elapsed;
    // the end of the allowed duration comes before the receive timeout would expire
    if (!nearLimits && (runtimeConfiguration.d_tcpRecvTimeout == 0 || remaining <= static_cast<size_t>(runtimeConfiguration.d_tcpRecvTimeout))) {
      now.tv_sec += static_cast<time_t>(remaining);
      return now;
    }
  }

  now.tv_sec += static_cast<time_t>(tcpRecvTimeout);
  now.tv_usec += tcpRecvTimeoutUsec;
  normalizeTV(now);
  return now;
}
364

365
/* Deadline for the next write to the client, computed from `now`.
   Returns boost::none when neither a maximum connection duration nor a send timeout is
   configured. Otherwise the deadline is the send timeout, brought forward to the end of the
   maximum connection duration when that comes first. It is `now` itself when that duration is
   already exceeded (or the start time lies in the future). */
boost::optional<timeval> IncomingTCPConnectionState::getClientWriteTTD(const timeval& now) const
{
  const auto& config = dnsdist::configuration::getCurrentRuntimeConfiguration();
  const auto maxDuration = config.d_maxTCPConnectionDuration;
  const auto sendTimeout = config.d_tcpSendTimeout;
  if (maxDuration == 0 && sendTimeout == 0) {
    return boost::none;
  }

  timeval deadline(now);

  if (maxDuration > 0) {
    const auto elapsed = deadline.tv_sec - d_connectionStartTime.tv_sec;
    if (elapsed < 0 || static_cast<size_t>(elapsed) >= maxDuration) {
      return deadline;
    }
    const auto remaining = maxDuration - elapsed;
    if (sendTimeout == 0 || remaining <= static_cast<size_t>(sendTimeout)) {
      deadline.tv_sec += static_cast<time_t>(remaining);
      return deadline;
    }
  }

  deadline.tv_sec += static_cast<time_t>(sendTimeout);
  return deadline;
}
389

390
bool IncomingTCPConnectionState::maxConnectionDurationReached(unsigned int maxConnectionDuration, const timeval& now) const
391
{
92,920✔
392
  if (maxConnectionDuration > 0) {
92,920✔
393
    time_t curtime = now.tv_sec;
788✔
394
    unsigned int elapsed = 0;
788✔
395
    if (curtime > d_connectionStartTime.tv_sec) { // To prevent issues when time goes backward
788✔
396
      elapsed = curtime - d_connectionStartTime.tv_sec;
397✔
397
    }
397✔
398
    if (elapsed >= maxConnectionDuration) {
788✔
399
      return true;
1✔
400
    }
1✔
401
  }
788✔
402

403
  return false;
92,919✔
404
}
92,920✔
405

406
void IncomingTCPConnectionState::registerOwnedDownstreamConnection(std::shared_ptr<TCPConnectionToBackend>& conn)
407
{
21✔
408
  const auto& downstream = conn->getDS();
21✔
409
  auto& queue = d_ownedConnectionsToBackend[downstream];
21✔
410
  // how many proxy-protocol enabled connections do we want to keep around?
411
  // - they are only usable for this incoming connection because of the proxy protocol header containing the source and destination addresses and ports
412
  // - if we have TLV values, and they are unique per query, keeping these is useless
413
  // - if there is no, or identical, TLV values, a single outgoing connection is enough if maxInFlight == 1, or if incoming maxInFlight == outgoing maxInFlight
414
  // so it makes sense to keep a few of them around if incoming maxInFlight is greater than outgoing maxInFlight
415

416
  auto incomingMaxInFlightQueriesPerConn = d_ci.cs->d_maxInFlightQueriesPerConn;
21✔
417
  incomingMaxInFlightQueriesPerConn = std::max(incomingMaxInFlightQueriesPerConn, static_cast<size_t>(1U));
21✔
418
  auto outgoingMaxInFlightQueriesPerConn = downstream->d_config.d_maxInFlightQueriesPerConn;
21✔
419
  outgoingMaxInFlightQueriesPerConn = std::max(outgoingMaxInFlightQueriesPerConn, static_cast<size_t>(1U));
21✔
420
  size_t maxCachedOutgoingConnections = std::min(static_cast<size_t>(incomingMaxInFlightQueriesPerConn / outgoingMaxInFlightQueriesPerConn), static_cast<size_t>(5U));
21✔
421

422
  queue.push_front(conn);
21✔
423
  if (queue.size() > maxCachedOutgoingConnections) {
21!
424
    queue.pop_back();
×
425
  }
×
426
}
21✔
427

428
void IncomingTCPConnectionState::clearOwnedDownstreamConnections(const std::shared_ptr<DownstreamState>& downstream)
429
{
5✔
430
  d_ownedConnectionsToBackend.erase(downstream);
5✔
431
}
5✔
432

433
/* called when the buffer has been set and the rules have been processed, and only from handleIO (sometimes indirectly via handleQuery) */
434
IOState IncomingTCPConnectionState::sendResponse(const struct timeval& now, TCPResponse&& response)
435
{
43,509✔
436
  (void)now;
43,509✔
437
  d_state = State::sendingResponse;
43,509✔
438

439
  const auto responseSize = static_cast<uint16_t>(response.d_buffer.size());
43,509✔
440
  const std::array<uint8_t, 2> sizeBytes{static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256)};
43,509✔
441
  /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes
442
     that could occur if we had to deal with the size during the processing,
443
     especially alignment issues */
444
  response.d_buffer.insert(response.d_buffer.begin(), sizeBytes.begin(), sizeBytes.end());
43,509✔
445
  d_currentPos = 0;
43,509✔
446
  d_currentResponse = std::move(response);
43,509✔
447

448
  try {
43,509✔
449
    auto iostate = d_handler.tryWrite(d_currentResponse.d_buffer, d_currentPos, d_currentResponse.d_buffer.size());
43,509✔
450
    if (iostate == IOState::Done) {
43,509✔
451
      DEBUGLOG("response sent from " << __PRETTY_FUNCTION__);
43,490✔
452
      handleResponseSent(d_currentResponse, d_currentResponse.d_buffer.size());
43,490✔
453
      return iostate;
43,490✔
454
    }
43,490✔
455
    d_lastIOBlocked = true;
19✔
456
    DEBUGLOG("partial write");
19✔
457
    return iostate;
19✔
458
  }
43,509✔
459
  catch (const std::exception& e) {
43,509✔
460
    vinfolog("Closing TCP client connection with %s: %s", d_ci.remote.toStringWithPort(), e.what());
4!
461
    DEBUGLOG("Closing TCP client connection: " << e.what());
4✔
462
    ++d_ci.cs->tcpDiedSendingResponse;
4✔
463

464
    terminateClientConnection();
4✔
465

466
    return IOState::Done;
4✔
467
  }
4✔
468
}
43,509✔
469

470
void IncomingTCPConnectionState::terminateClientConnection()
471
{
2,473✔
472
  DEBUGLOG("terminating client connection");
2,473✔
473
  d_queuedResponses.clear();
2,473✔
474
  /* we have already released idle connections that could be reused,
475
     we don't care about the ones still waiting for responses */
476
  for (auto& backend : d_ownedConnectionsToBackend) {
2,473✔
477
    for (auto& conn : backend.second) {
15✔
478
      conn->release(true);
15✔
479
    }
15✔
480
  }
15✔
481
  d_ownedConnectionsToBackend.clear();
2,473✔
482

483
  /* meaning we will no longer be 'active' when the backend
484
     response or timeout comes in */
485
  d_ioState.reset();
2,473✔
486

487
  /* if we do have remaining async descriptors associated with this TLS
488
     connection, we need to defer the destruction of the TLS object until
489
     the engine has reported back, otherwise we have a use-after-free.. */
490
  auto afds = d_handler.getAsyncFDs();
2,473✔
491
  if (afds.empty()) {
2,473!
492
    d_handler.close();
2,473✔
493
  }
2,473✔
494
  else {
×
495
    /* we might already be waiting, but we might also not because sometimes we have already been
496
       notified via the descriptor, not received Async again, but the async job still exists.. */
497
    auto state = shared_from_this();
×
498
    for (const auto desc : afds) {
×
499
      try {
×
500
        state->d_threadData.mplexer->addReadFD(desc, handleAsyncReady, state);
×
501
      }
×
502
      catch (...) {
×
503
      }
×
504
    }
×
505
  }
×
506
}
2,473✔
507

508
void IncomingTCPConnectionState::queueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response, bool fromBackend)
509
{
43,651✔
510
  // queue response
511
  state->d_queuedResponses.emplace_back(std::move(response));
43,651✔
512
  DEBUGLOG("queueing response, state is " << (int)state->d_state << ", queue size is now " << state->d_queuedResponses.size());
43,651✔
513

514
  // when the response comes from a backend, there is a real possibility that we are currently
515
  // idle, and thus not trying to send the response right away would make our ref count go to 0.
516
  // Even if we are waiting for a query, we will not wake up before the new query arrives or a
517
  // timeout occurs
518
  if (state->d_state == State::idle || state->d_state == State::waitingForQuery) {
43,651✔
519
    auto iostate = sendQueuedResponses(state, now);
43,453✔
520

521
    if (iostate == IOState::Done && state->active()) {
43,453✔
522
      if (state->canAcceptNewQueries(now)) {
43,434✔
523
        state->resetForNewQuery();
43,123✔
524
        state->d_state = State::waitingForQuery;
43,123✔
525
        iostate = IOState::NeedRead;
43,123✔
526
      }
43,123✔
527
      else {
311✔
528
        state->d_state = State::idle;
311✔
529
      }
311✔
530
    }
43,434✔
531

532
    // for the same reason we need to update the state right away, nobody will do that for us
533
    if (state->active()) {
43,453✔
534
      state->updateIO(iostate, now);
43,448✔
535
      // if we have not finished reading every available byte, we _need_ to do an actual read
536
      // attempt before waiting for the socket to become readable again, because if there is
537
      // buffered data available the socket might never become readable again.
538
      // This is true as soon as we deal with TLS because TLS records are processed one by
539
      // one and might not match what we see at the application layer, so data might already
540
      // be available in the TLS library's buffers. This is especially true when OpenSSL's
541
      // read-ahead mode is enabled because then it buffers even more than one SSL record
542
      // for performance reasons.
543
      if (fromBackend && !state->d_lastIOBlocked) {
43,448✔
544
        state->handleIO();
22,843✔
545
      }
22,843✔
546
    }
43,448✔
547
  }
43,453✔
548
}
43,651✔
549

550
void IncomingTCPConnectionState::handleAsyncReady([[maybe_unused]] int desc, FDMultiplexer::funcparam_t& param)
551
{
×
552
  auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
×
553

554
  /* If we are here, the async jobs for this SSL* are finished
555
     so we should be able to remove all FDs */
556
  auto afds = state->d_handler.getAsyncFDs();
×
557
  for (const auto afd : afds) {
×
558
    try {
×
559
      state->d_threadData.mplexer->removeReadFD(afd);
×
560
    }
×
561
    catch (...) {
×
562
    }
×
563
  }
×
564

565
  if (state->active()) {
×
566
    /* and now we restart our own I/O state machine */
567
    state->handleIO();
×
568
  }
×
569
  else {
×
570
    /* we were only waiting for the engine to come back,
571
       to prevent a use-after-free */
572
    state->d_handler.close();
×
573
  }
×
574
}
×
575

576
void IncomingTCPConnectionState::updateIOForAsync(std::shared_ptr<IncomingTCPConnectionState>& conn)
577
{
×
578
  auto fds = conn->d_handler.getAsyncFDs();
×
579
  for (const auto desc : fds) {
×
580
    conn->d_threadData.mplexer->addReadFD(desc, handleAsyncReady, conn);
×
581
  }
×
582
  conn->d_ioState->update(IOState::Done, handleIOCallback, conn);
×
583
}
×
584

585
/* Updates what this connection is waiting for. Async is delegated to updateIOForAsync(). Any
   other state goes to the I/O state handler with handleIOCallback and a deadline: the write
   deadline when waiting to write, the read deadline otherwise. */
void IncomingTCPConnectionState::updateIO(IOState newState, const struct timeval& now)
{
  auto self = shared_from_this();
  if (newState == IOState::Async) {
    updateIOForAsync(self);
    return;
  }

  const bool waitingToWrite = newState == IOState::NeedWrite;
  const auto deadline = waitingToWrite ? getClientWriteTTD(now) : getClientReadTTD(now);
  d_ioState->update(newState, handleIOCallback, self, deadline);
}
595

596
/* called from the backend code when a new response has been received */
597
void IncomingTCPConnectionState::handleResponse(const struct timeval& now, TCPResponse&& response)
598
{
22,948✔
599
  if (std::this_thread::get_id() != d_creatorThreadID) {
22,948✔
600
    handleCrossProtocolResponse(now, std::move(response));
133✔
601
    return;
133✔
602
  }
133✔
603

604
  std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this();
22,815✔
605

606
  if (!response.isAsync() && response.d_connection && response.d_connection->getDS() && response.d_connection->getDS()->d_config.useProxyProtocol) {
22,815!
607
    // if we have added a TCP Proxy Protocol payload to a connection, don't release it to the general pool as no one else will be able to use it anyway
608
    if (!response.d_connection->willBeReusable(true)) {
51!
609
      // if it can't be reused even by us, well
610
      const auto connIt = state->d_ownedConnectionsToBackend.find(response.d_connection->getDS());
×
611
      if (connIt != state->d_ownedConnectionsToBackend.end()) {
×
612
        auto& list = connIt->second;
×
613

614
        for (auto it = list.begin(); it != list.end(); ++it) {
×
615
          if (*it == response.d_connection) {
×
616
            try {
×
617
              response.d_connection->release(true);
×
618
            }
×
619
            catch (const std::exception& e) {
×
620
              vinfolog("Error releasing connection: %s", e.what());
×
621
            }
×
622
            list.erase(it);
×
623
            break;
×
624
          }
×
625
        }
×
626
      }
×
627
    }
×
628
  }
51✔
629

630
  if (response.d_buffer.size() < sizeof(dnsheader)) {
22,815✔
631
    state->terminateClientConnection();
2✔
632
    return;
2✔
633
  }
2✔
634

635
  if (!response.isAsync()) {
22,813✔
636
    try {
22,729✔
637
      auto& ids = response.d_idstate;
22,729✔
638
      std::shared_ptr<DownstreamState> backend = response.d_ds ? response.d_ds : (response.d_connection ? response.d_connection->getDS() : nullptr);
22,729!
639
      if (backend == nullptr || !responseContentMatches(response.d_buffer, ids.qname, ids.qtype, ids.qclass, backend, dnsdist::configuration::getCurrentRuntimeConfiguration().d_allowEmptyResponse)) {
22,729!
640
        state->terminateClientConnection();
3✔
641
        return;
3✔
642
      }
3✔
643

644
      if (backend != nullptr) {
22,726!
645
        ++backend->responses;
22,726✔
646
      }
22,726✔
647

648
      DNSResponse dnsResponse(ids, response.d_buffer, backend);
22,726✔
649
      dnsResponse.d_incomingTCPState = state;
22,726✔
650

651
      memcpy(&response.d_cleartextDH, dnsResponse.getHeader().get(), sizeof(response.d_cleartextDH));
22,726✔
652

653
      if (!processResponse(response.d_buffer, dnsResponse, false)) {
22,726✔
654
        state->terminateClientConnection();
6✔
655
        return;
6✔
656
      }
6✔
657

658
      if (dnsResponse.isAsynchronous()) {
22,720✔
659
        /* we are done for now */
660
        return;
80✔
661
      }
80✔
662
    }
22,720✔
663
    catch (const std::exception& e) {
22,729✔
664
      vinfolog("Unexpected exception while handling response from backend: %s", e.what());
4!
665
      state->terminateClientConnection();
4✔
666
      return;
4✔
667
    }
4✔
668
  }
22,729✔
669

670
  ++dnsdist::metrics::g_stats.responses;
22,720✔
671
  ++state->d_ci.cs->responses;
22,720✔
672

673
  queueResponse(state, now, std::move(response), true);
22,720✔
674
}
22,720✔
675

676
struct TCPCrossProtocolResponse
677
{
678
  TCPCrossProtocolResponse(TCPResponse&& response, std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now) :
679
    d_response(std::move(response)), d_state(state), d_now(now)
283✔
680
  {
283✔
681
  }
283✔
682
  TCPCrossProtocolResponse(const TCPCrossProtocolResponse&) = delete;
683
  TCPCrossProtocolResponse& operator=(const TCPCrossProtocolResponse&) = delete;
684
  TCPCrossProtocolResponse(TCPCrossProtocolResponse&&) = delete;
685
  TCPCrossProtocolResponse& operator=(TCPCrossProtocolResponse&&) = delete;
686
  ~TCPCrossProtocolResponse() = default;
283✔
687

688
  TCPResponse d_response;
689
  std::shared_ptr<IncomingTCPConnectionState> d_state;
690
  struct timeval d_now;
691
};
692

693
class TCPCrossProtocolQuery : public CrossProtocolQuery
694
{
695
public:
696
  TCPCrossProtocolQuery(PacketBuffer&& buffer, InternalQueryState&& ids, std::shared_ptr<DownstreamState> backend, std::shared_ptr<IncomingTCPConnectionState> sender) :
697
    CrossProtocolQuery(InternalQuery(std::move(buffer), std::move(ids)), backend), d_sender(std::move(sender))
231✔
698
  {
231✔
699
  }
231✔
700
  TCPCrossProtocolQuery(const TCPCrossProtocolQuery&) = delete;
701
  TCPCrossProtocolQuery& operator=(const TCPCrossProtocolQuery&) = delete;
702
  TCPCrossProtocolQuery(TCPCrossProtocolQuery&&) = delete;
703
  TCPCrossProtocolQuery& operator=(TCPCrossProtocolQuery&&) = delete;
704
  ~TCPCrossProtocolQuery() override = default;
231✔
705

706
  std::shared_ptr<TCPQuerySender> getTCPQuerySender() override
707
  {
208✔
708
    return d_sender;
208✔
709
  }
208✔
710

711
  DNSQuestion getDQ() override
712
  {
178✔
713
    auto& ids = query.d_idstate;
178✔
714
    DNSQuestion dnsQuestion(ids, query.d_buffer);
178✔
715
    dnsQuestion.d_incomingTCPState = d_sender;
178✔
716
    return dnsQuestion;
178✔
717
  }
178✔
718

719
  DNSResponse getDR() override
720
  {
72✔
721
    auto& ids = query.d_idstate;
72✔
722
    DNSResponse dnsResponse(ids, query.d_buffer, downstream);
72✔
723
    dnsResponse.d_incomingTCPState = d_sender;
72✔
724
    return dnsResponse;
72✔
725
  }
72✔
726

727
private:
728
  std::shared_ptr<IncomingTCPConnectionState> d_sender;
729
};
730

731
/* Wraps a query received on this connection into a TCPCrossProtocolQuery that
   holds a strong reference back to this connection. */
std::unique_ptr<CrossProtocolQuery> IncomingTCPConnectionState::getCrossProtocolQuery(PacketBuffer&& query, InternalQueryState&& state, const std::shared_ptr<DownstreamState>& backend)
{
  auto self = shared_from_this();
  return std::make_unique<TCPCrossProtocolQuery>(std::move(query), std::move(state), backend, std::move(self));
}
4✔
735

736
/* Builds a TCP cross-protocol query out of a DNSQuestion, moving its buffer and
   internal state into the new object (no backend is set yet).
   Throws std::runtime_error if the question is not tied to an incoming TCP connection. */
std::unique_ptr<CrossProtocolQuery> getTCPCrossProtocolQueryFromDQ(DNSQuestion& dnsQuestion)
{
  auto incomingState = dnsQuestion.getIncomingTCPState();
  if (!incomingState) {
    throw std::runtime_error("Trying to create a TCP cross protocol query without a valid TCP state");
  }

  /* record the ID currently in the header as the original query ID */
  const auto headerID = dnsQuestion.getHeader()->id;
  dnsQuestion.ids.origID = headerID;

  return std::make_unique<TCPCrossProtocolQuery>(std::move(dnsQuestion.getMutableData()), std::move(dnsQuestion.ids), nullptr, std::move(incomingState));
}
197✔
746

747
void IncomingTCPConnectionState::handleCrossProtocolResponse(const struct timeval& now, TCPResponse&& response)
748
{
283✔
749
  std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this();
283✔
750
  try {
283✔
751
    auto ptr = std::make_unique<TCPCrossProtocolResponse>(std::move(response), state, now);
283✔
752
    if (!state->d_threadData.crossProtocolResponseSender.send(std::move(ptr))) {
283!
753
      ++dnsdist::metrics::g_stats.tcpCrossProtocolResponsePipeFull;
×
754
      vinfolog("Unable to pass a cross-protocol response to the TCP worker thread because the pipe is full");
×
755
    }
×
756
  }
283✔
757
  catch (const std::exception& e) {
283✔
758
    vinfolog("Unable to pass a cross-protocol response to the TCP worker thread because we couldn't write to the pipe: %s", stringerror());
×
759
  }
×
760
}
283✔
761

762
IncomingTCPConnectionState::QueryProcessingResult IncomingTCPConnectionState::handleQuery(PacketBuffer&& queryIn, const struct timeval& now, std::optional<int32_t> streamID)
763
{
43,365✔
764
  auto query = std::move(queryIn);
43,365✔
765
  if (query.size() < sizeof(dnsheader)) {
43,365✔
766
    ++dnsdist::metrics::g_stats.nonCompliantQueries;
4✔
767
    ++d_ci.cs->nonCompliantQueries;
4✔
768
    return QueryProcessingResult::TooSmall;
4✔
769
  }
4✔
770

771
  ++d_queriesCount;
43,361✔
772
  ++d_ci.cs->queries;
43,361✔
773
  ++dnsdist::metrics::g_stats.queries;
43,361✔
774

775
  if (d_handler.isTLS()) {
43,361✔
776
    auto tlsVersion = d_handler.getTLSVersion();
41,272✔
777
    switch (tlsVersion) {
41,272✔
778
    case LibsslTLSVersion::TLS10:
×
779
      ++d_ci.cs->tls10queries;
×
780
      break;
×
781
    case LibsslTLSVersion::TLS11:
×
782
      ++d_ci.cs->tls11queries;
×
783
      break;
×
784
    case LibsslTLSVersion::TLS12:
14✔
785
      ++d_ci.cs->tls12queries;
14✔
786
      break;
14✔
787
    case LibsslTLSVersion::TLS13:
41,258✔
788
      ++d_ci.cs->tls13queries;
41,258✔
789
      break;
41,258✔
790
    default:
×
791
      ++d_ci.cs->tlsUnknownqueries;
×
792
    }
41,272✔
793
  }
41,272✔
794

795
  auto state = shared_from_this();
43,361✔
796
  InternalQueryState ids;
43,361✔
797
  ids.origDest = d_proxiedDestination;
43,361✔
798
  ids.origRemote = d_proxiedRemote;
43,361✔
799
  ids.cs = d_ci.cs;
43,361✔
800
  ids.queryRealTime.start();
43,361✔
801
  if (streamID) {
43,361✔
802
    ids.d_streamID = *streamID;
159✔
803
  }
159✔
804

805
  auto dnsCryptResponse = checkDNSCryptQuery(*d_ci.cs, query, ids.dnsCryptQuery, ids.queryRealTime.d_start.tv_sec, true);
43,361✔
806
  if (dnsCryptResponse) {
43,361!
807
    TCPResponse response;
×
808
    d_state = State::idle;
×
809
    ++d_currentQueriesCount;
×
810
    queueResponse(state, now, std::move(response), false);
×
811
    return QueryProcessingResult::SelfAnswered;
×
812
  }
×
813

814
  {
43,361✔
815
    /* this pointer will be invalidated the second the buffer is resized, don't hold onto it! */
816
    const dnsheader_aligned dnsHeader(query.data());
43,361✔
817
    if (!checkQueryHeaders(*dnsHeader, *d_ci.cs)) {
43,361✔
818
      return QueryProcessingResult::InvalidHeaders;
5✔
819
    }
5✔
820

821
    if (dnsHeader->qdcount == 0) {
43,356✔
822
      TCPResponse response;
3✔
823
      auto queryID = dnsHeader->id;
3✔
824
      dnsdist::PacketMangling::editDNSHeaderFromPacket(query, [](dnsheader& header) {
3✔
825
        header.rcode = RCode::NotImp;
3✔
826
        header.qr = true;
3✔
827
        return true;
3✔
828
      });
3✔
829
      response.d_idstate = std::move(ids);
3✔
830
      response.d_idstate.origID = queryID;
3✔
831
      response.d_idstate.selfGenerated = true;
3✔
832
      response.d_buffer = std::move(query);
3✔
833
      d_state = State::idle;
3✔
834
      ++d_currentQueriesCount;
3✔
835
      queueResponse(state, now, std::move(response), false);
3✔
836
      return QueryProcessingResult::SelfAnswered;
3✔
837
    }
3✔
838
  }
43,356✔
839

840
  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast
841
  ids.qname = DNSName(reinterpret_cast<const char*>(query.data()), static_cast<int>(query.size()), sizeof(dnsheader), false, &ids.qtype, &ids.qclass);
43,353✔
842
  ids.protocol = getProtocol();
43,353✔
843
  if (ids.dnsCryptQuery) {
43,353✔
844
    ids.protocol = dnsdist::Protocol::DNSCryptTCP;
16✔
845
  }
16✔
846

847
  DNSQuestion dnsQuestion(ids, query);
43,353✔
848
  dnsdist::PacketMangling::editDNSHeaderFromPacket(dnsQuestion.getMutableData(), [&ids](dnsheader& header) {
43,353✔
849
    const uint16_t* flags = getFlagsFromDNSHeader(&header);
43,351✔
850
    ids.origFlags = *flags;
43,351✔
851
    return true;
43,351✔
852
  });
43,351✔
853
  dnsQuestion.d_incomingTCPState = state;
43,353✔
854
  dnsQuestion.sni = d_handler.getServerNameIndication();
43,353✔
855

856
  if (d_proxyProtocolValues) {
43,353✔
857
    /* we need to copy them, because the next queries received on that connection will
858
       need to get the _unaltered_ values */
859
    dnsQuestion.proxyProtocolValues = make_unique<std::vector<ProxyProtocolValue>>(*d_proxyProtocolValues);
34✔
860
  }
34✔
861

862
  if (dnsQuestion.ids.qtype == QType::AXFR || dnsQuestion.ids.qtype == QType::IXFR) {
43,353✔
863
    dnsQuestion.ids.skipCache = true;
27✔
864
  }
27✔
865

866
  if (forwardViaUDPFirst()) {
43,353✔
867
    // if there was no EDNS, we add it with a large buffer size
868
    // so we can use UDP to talk to the backend.
869
    const dnsheader_aligned dnsHeader(query.data());
153✔
870
    if (dnsHeader->arcount == 0U) {
153✔
871
      if (addEDNS(query, 4096, false, 4096, 0)) {
145!
872
        dnsQuestion.ids.ednsAdded = true;
145✔
873
      }
145✔
874
    }
145✔
875
  }
153✔
876

877
  if (streamID) {
43,353✔
878
    auto unit = getDOHUnit(*streamID);
153✔
879
    if (unit) {
153!
880
      dnsQuestion.ids.du = std::move(unit);
153✔
881
    }
153✔
882
  }
153✔
883

884
  std::shared_ptr<DownstreamState> backend;
43,353✔
885
  auto result = processQuery(dnsQuestion, backend);
43,353✔
886

887
  if (result == ProcessQueryResult::Asynchronous) {
43,353✔
888
    /* we are done for now */
889
    ++d_currentQueriesCount;
108✔
890
    return QueryProcessingResult::Asynchronous;
108✔
891
  }
108✔
892

893
  if (streamID) {
43,245✔
894
    restoreDOHUnit(std::move(dnsQuestion.ids.du));
117✔
895
  }
117✔
896

897
  if (result == ProcessQueryResult::Drop) {
43,245✔
898
    return QueryProcessingResult::Dropped;
39✔
899
  }
39✔
900

901
  // the buffer might have been invalidated by now
902
  uint16_t queryID{0};
43,206✔
903
  {
43,206✔
904
    const auto dnsHeader = dnsQuestion.getHeader();
43,206✔
905
    queryID = dnsHeader->id;
43,206✔
906
  }
43,206✔
907

908
  if (result == ProcessQueryResult::SendAnswer) {
43,206✔
909
    TCPResponse response;
20,479✔
910
    {
20,479✔
911
      const auto dnsHeader = dnsQuestion.getHeader();
20,479✔
912
      memcpy(&response.d_cleartextDH, dnsHeader.get(), sizeof(response.d_cleartextDH));
20,479✔
913
    }
20,479✔
914
    response.d_idstate = std::move(ids);
20,479✔
915
    response.d_idstate.origID = queryID;
20,479✔
916
    response.d_idstate.selfGenerated = true;
20,479✔
917
    response.d_idstate.cs = d_ci.cs;
20,479✔
918
    response.d_buffer = std::move(query);
20,479✔
919

920
    d_state = State::idle;
20,479✔
921
    ++d_currentQueriesCount;
20,479✔
922
    queueResponse(state, now, std::move(response), false);
20,479✔
923
    return QueryProcessingResult::SelfAnswered;
20,479✔
924
  }
20,479✔
925

926
  if (result != ProcessQueryResult::PassToBackend || backend == nullptr) {
22,727!
927
    return QueryProcessingResult::NoBackend;
×
928
  }
×
929

930
  dnsQuestion.ids.origID = queryID;
22,727✔
931

932
  ++d_currentQueriesCount;
22,727✔
933

934
  std::string proxyProtocolPayload;
22,727✔
935
  if (backend->isDoH()) {
22,727✔
936
    vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", ids.qname.toLogString(), QType(ids.qtype).toString(), d_proxiedRemote.toStringWithPort(), getProtocol().toString(), query.size(), backend->getNameWithAddr());
30✔
937

938
    /* we need to do this _before_ creating the cross protocol query because
939
       after that the buffer will have been moved */
940
    if (backend->d_config.useProxyProtocol) {
30✔
941
      proxyProtocolPayload = getProxyProtocolPayload(dnsQuestion);
1✔
942
    }
1✔
943

944
    auto cpq = std::make_unique<TCPCrossProtocolQuery>(std::move(query), std::move(ids), backend, state);
30✔
945
    cpq->query.d_proxyProtocolPayload = std::move(proxyProtocolPayload);
30✔
946

947
    backend->passCrossProtocolQuery(std::move(cpq));
30✔
948
    return QueryProcessingResult::Forwarded;
30✔
949
  }
30✔
950
  if (!backend->isTCPOnly() && forwardViaUDPFirst()) {
22,697✔
951
    if (streamID) {
64!
952
      auto unit = getDOHUnit(*streamID);
64✔
953
      if (unit) {
64!
954
        dnsQuestion.ids.du = std::move(unit);
64✔
955
      }
64✔
956
    }
64✔
957
    if (assignOutgoingUDPQueryToBackend(backend, queryID, dnsQuestion, query)) {
64✔
958
      return QueryProcessingResult::Forwarded;
63✔
959
    }
63✔
960
    restoreDOHUnit(std::move(dnsQuestion.ids.du));
1✔
961
    // fallback to the normal flow
962
  }
1✔
963

964
  prependSizeToTCPQuery(query, 0);
22,634✔
965

966
  auto downstreamConnection = getDownstreamConnection(backend, dnsQuestion.proxyProtocolValues, now);
22,634✔
967

968
  if (backend->d_config.useProxyProtocol) {
22,634✔
969
    /* if we ever sent a TLV over a connection, we can never go back */
970
    if (!d_proxyProtocolPayloadHasTLV) {
58✔
971
      d_proxyProtocolPayloadHasTLV = dnsQuestion.proxyProtocolValues && !dnsQuestion.proxyProtocolValues->empty();
34!
972
    }
34✔
973

974
    proxyProtocolPayload = getProxyProtocolPayload(dnsQuestion);
58✔
975
  }
58✔
976

977
  if (dnsQuestion.proxyProtocolValues) {
22,634✔
978
    downstreamConnection->setProxyProtocolValuesSent(std::move(dnsQuestion.proxyProtocolValues));
31✔
979
  }
31✔
980

981
  TCPQuery tcpquery(std::move(query), std::move(ids));
22,634✔
982
  tcpquery.d_proxyProtocolPayload = std::move(proxyProtocolPayload);
22,634✔
983

984
  vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", tcpquery.d_idstate.qname.toLogString(), QType(tcpquery.d_idstate.qtype).toString(), d_proxiedRemote.toStringWithPort(), getProtocol().toString(), tcpquery.d_buffer.size(), backend->getNameWithAddr());
22,634✔
985
  std::shared_ptr<TCPQuerySender> incoming = state;
22,634✔
986
  downstreamConnection->queueQuery(incoming, std::move(tcpquery));
22,634✔
987
  return QueryProcessingResult::Forwarded;
22,634✔
988
}
22,697✔
989

990
void IncomingTCPConnectionState::handleIOCallback(int desc, FDMultiplexer::funcparam_t& param)
991
{
2,995✔
992
  auto conn = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
2,995✔
993
  if (desc != conn->d_handler.getDescriptor()) {
2,995!
994
    // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay): __PRETTY_FUNCTION__ is fine
995
    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(desc) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->d_handler.getDescriptor()));
×
996
  }
×
997

998
  conn->handleIO();
2,995✔
999
}
2,995✔
1000

1001
void IncomingTCPConnectionState::handleHandshakeDone(const struct timeval& now)
1002
{
2,721✔
1003
  if (d_handler.isTLS()) {
2,721✔
1004
    if (!d_handler.hasTLSSessionBeenResumed()) {
662✔
1005
      ++d_ci.cs->tlsNewSessions;
426✔
1006
      dnsdist::IncomingConcurrentTCPConnectionsManager::accountTLSNewSession(d_ci.remote);
426✔
1007
    }
426✔
1008
    else {
236✔
1009
      ++d_ci.cs->tlsResumptions;
236✔
1010
      dnsdist::IncomingConcurrentTCPConnectionsManager::accountTLSResumedSession(d_ci.remote);
236✔
1011
    }
236✔
1012
    if (d_handler.getResumedFromInactiveTicketKey()) {
662✔
1013
      ++d_ci.cs->tlsInactiveTicketKey;
8✔
1014
    }
8✔
1015
    if (d_handler.getUnknownTicketKey()) {
662✔
1016
      ++d_ci.cs->tlsUnknownTicketKey;
6✔
1017
    }
6✔
1018
  }
662✔
1019

1020
  d_handshakeDoneTime = now;
2,721✔
1021
}
2,721✔
1022

1023
/* Reads the proxy protocol header into d_buffer, growing the expected size
   (d_proxyProtocolNeed) as isProxyHeaderComplete() reports missing bytes.
   Returns:
   - Error:   the header cannot be consumed, or handleProxyProtocol() rejects it.
   - Done:    the header was parsed; buffer, position and need are reset, and any
              TLV values are stored in d_proxyProtocolValues.
   - Reading: the read blocked (d_lastIOBlocked is set) or the connection is no
              longer active. */
IncomingTCPConnectionState::ProxyProtocolResult IncomingTCPConnectionState::handleProxyProtocolPayload()
{
  for (;;) {
    DEBUGLOG("reading proxy protocol header");
    const auto readState = d_handler.tryRead(d_buffer, d_currentPos, d_proxyProtocolNeed, false, isProxyPayloadOutsideTLS());
    if (readState != IOState::Done) {
      d_lastIOBlocked = true;
    }
    else {
      d_buffer.resize(d_currentPos);
      const ssize_t remaining = isProxyHeaderComplete(d_buffer);

      if (remaining == 0) {
        vinfolog("Unable to consume proxy protocol header in packet from TCP client %s", d_ci.remote.toStringWithPort());
        ++dnsdist::metrics::g_stats.proxyProtocolInvalid;
        return ProxyProtocolResult::Error;
      }

      if (remaining > 0) {
        /* the whole proxy header is here */
        std::vector<ProxyProtocolValue> values;
        if (!handleProxyProtocol(d_ci.remote, true, dnsdist::configuration::getCurrentRuntimeConfiguration().d_ACL, d_buffer, d_proxiedRemote, d_proxiedDestination, values)) {
          vinfolog("Error handling the Proxy Protocol received from TCP client %s", d_ci.remote.toStringWithPort());
          return ProxyProtocolResult::Error;
        }

        if (!values.empty()) {
          d_proxyProtocolValues = make_unique<std::vector<ProxyProtocolValue>>(std::move(values));
        }

        d_currentPos = 0;
        d_proxyProtocolNeed = 0;
        d_buffer.clear();
        return ProxyProtocolResult::Done;
      }

      /* more bytes are needed; keep reading since data might already be buffered */
      d_proxyProtocolNeed += -remaining;
      d_buffer.resize(d_currentPos + d_proxyProtocolNeed);
    }

    if (!active() || d_lastIOBlocked) {
      break;
    }
  }

  return ProxyProtocolResult::Reading;
}
19✔
1066

1067
/* Drives the handshake one step and returns the handler's IO state.
   - Not done: d_lastIOBlocked is set.
   - Done: the state becomes readingProxyProtocolHeader when a proxy protocol
     header is expected inside TLS from this client, readingQuerySize otherwise. */
IOState IncomingTCPConnectionState::handleHandshake(const struct timeval& now)
{
  DEBUGLOG("doing handshake");
  const auto handshakeState = d_handler.tryHandshake();
  if (handshakeState != IOState::Done) {
    d_lastIOBlocked = true;
    return handshakeState;
  }

  DEBUGLOG("handshake done");
  handleHandshakeDone(now);

  const bool proxyHeaderInsideTLS = d_ci.cs != nullptr && d_ci.cs->d_enableProxyProtocol && !isProxyPayloadOutsideTLS() && expectProxyProtocolFrom(d_ci.remote);
  if (proxyHeaderInsideTLS) {
    d_state = State::readingProxyProtocolHeader;
    d_buffer.resize(s_proxyProtocolMinimumHeaderSize);
    d_proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize;
  }
  else {
    d_state = State::readingQuerySize;
  }

  return handshakeState;
}
2,886✔
1090

1091
/* Called once d_querySize bytes of query have been read: hands the query to
   handleQuery() and closes the client connection on any hard failure
   (TooSmall, InvalidHeaders, Dropped, NoBackend).
   Returns NeedRead/NeedWrite when the connection is still active, not idle, and
   its IO state waits for read/write; Done otherwise. */
IOState IncomingTCPConnectionState::handleIncomingQueryReceived(const struct timeval& now)
{
  DEBUGLOG("query received");
  d_buffer.resize(d_querySize);

  d_state = State::idle;
  auto processingResult = handleQuery(std::move(d_buffer), now, std::nullopt);
  switch (processingResult) {
  case QueryProcessingResult::TooSmall:
  case QueryProcessingResult::InvalidHeaders:
  case QueryProcessingResult::Dropped:
  case QueryProcessingResult::NoBackend:
    terminateClientConnection();
    break;
  default:
    break;
  }

  /* the state might have been updated in the meantime, we don't want to override it
     in that case */
  if (active() && d_state != State::idle) {
    if (d_ioState->isWaitingForRead()) {
      return IOState::NeedRead;
    }
    if (d_ioState->isWaitingForWrite()) {
      return IOState::NeedWrite;
    }
  }
  return IOState::Done;
}
43,198✔
1125

1126
void IncomingTCPConnectionState::handleExceptionDuringIO(const std::exception& exp)
1127
{
2,374✔
1128
  if (d_state == State::idle || d_state == State::waitingForQuery) {
2,374✔
1129
    /* no need to increase any counters in that case, the client is simply done with us */
1130
  }
1,978✔
1131
  else if (d_state == State::doingHandshake || d_state == State::readingProxyProtocolHeader || d_state == State::waitingForQuery || d_state == State::readingQuerySize || d_state == State::readingQuery) {
396!
1132
    ++d_ci.cs->tcpDiedReadingQuery;
396✔
1133
  }
396✔
1134
  else if (d_state == State::sendingResponse) {
×
1135
    /* unlikely to happen here, the exception should be handled in sendResponse() */
1136
    ++d_ci.cs->tcpDiedSendingResponse;
×
1137
  }
×
1138

1139
  if (d_ioState->isWaitingForWrite() || d_queriesCount == 0) {
2,374!
1140
    DEBUGLOG("Got an exception while handling TCP query: " << exp.what());
396✔
1141
    vinfolog("Got an exception while handling (%s) TCP query from %s: %s", (d_ioState->isWaitingForRead() ? "reading" : "writing"), d_ci.remote.toStringWithPort(), exp.what());
396!
1142
  }
396✔
1143
  else {
1,978✔
1144
    vinfolog("Closing TCP client connection with %s: %s", d_ci.remote.toStringWithPort(), exp.what());
1,978✔
1145
    DEBUGLOG("Closing TCP client connection: " << exp.what());
1,978✔
1146
  }
1,978✔
1147
  /* remove this FD from the IO multiplexer */
1148
  terminateClientConnection();
2,374✔
1149
}
2,374✔
1150

1151
bool IncomingTCPConnectionState::readIncomingQuery(const timeval& now, IOState& iostate)
1152
{
48,203✔
1153
  if (!d_lastIOBlocked && (d_state == State::waitingForQuery || d_state == State::readingQuerySize)) {
48,203!
1154
    DEBUGLOG("reading query size");
48,065✔
1155
    d_buffer.resize(sizeof(uint16_t));
48,065✔
1156
    d_readIOsCurrentQuery++;
48,065✔
1157
    iostate = d_handler.tryRead(d_buffer, d_currentPos, sizeof(uint16_t));
48,065✔
1158
    if (d_currentPos > 0) {
48,065✔
1159
      /* if we got at least one byte, we can't go around sending responses */
1160
      d_state = State::readingQuerySize;
43,214✔
1161
    }
43,214✔
1162

1163
    if (iostate == IOState::Done) {
48,065✔
1164
      DEBUGLOG("query size received");
43,210✔
1165
      d_state = State::readingQuery;
43,210✔
1166
      d_querySizeReadTime = now;
43,210✔
1167
      if (d_queriesCount == 0) {
43,210✔
1168
        d_firstQuerySizeReadTime = now;
2,118✔
1169
      }
2,118✔
1170
      d_querySize = d_buffer.at(0) * 256 + d_buffer.at(1);
43,210✔
1171
      if (d_querySize < sizeof(dnsheader)) {
43,210✔
1172
        /* go away */
1173
        terminateClientConnection();
2✔
1174
        return true;
2✔
1175
      }
2✔
1176

1177
      d_buffer.resize(d_querySize);
43,208✔
1178
      d_currentPos = 0;
43,208✔
1179
    }
43,208✔
1180
    else {
4,855✔
1181
      d_lastIOBlocked = true;
4,855✔
1182
    }
4,855✔
1183
  }
48,065✔
1184

1185
  if (!d_lastIOBlocked && d_state == State::readingQuery) {
48,201!
1186
    DEBUGLOG("reading query");
43,346✔
1187
    d_readIOsCurrentQuery++;
43,346✔
1188
    iostate = d_handler.tryRead(d_buffer, d_currentPos, d_querySize);
43,346✔
1189
    if (iostate == IOState::Done) {
43,346✔
1190
      iostate = handleIncomingQueryReceived(now);
43,202✔
1191
    }
43,202✔
1192
    else {
144✔
1193
      d_lastIOBlocked = true;
144✔
1194
    }
144✔
1195
  }
43,346✔
1196

1197
  return false;
48,201✔
1198
}
48,203✔
1199

1200
/* Scope guard that resets the referenced "currently handling IO" flag to false
   when it goes out of scope, whatever the exit path. */
class HandlingIOGuard
{
public:
  /* explicit: a bool& must never silently turn into a guard object */
  explicit HandlingIOGuard(bool& handlingIO) :
    d_handlingIO(handlingIO)
  {
  }
  HandlingIOGuard(const HandlingIOGuard&) = delete;
  HandlingIOGuard(HandlingIOGuard&&) = delete;
  HandlingIOGuard& operator=(const HandlingIOGuard& rhs) = delete;
  HandlingIOGuard& operator=(HandlingIOGuard&&) = delete;
  ~HandlingIOGuard() noexcept
  {
    d_handlingIO = false;
  }

private:
  bool& d_handlingIO;
};
1219

1220
void IncomingTCPConnectionState::handleIO()
1221
{
28,260✔
1222
  // let's make sure we are not already in handleIO() below in the stack:
1223
  // this might happen when we have a response available on the backend socket
1224
  // right after forwarding the query, and then a query waiting for us on the
1225
  // client socket right after forwarding the response, and then a response available
1226
  // on the backend socket right after forwarding the query.. you get the idea.
1227
  if (d_handlingIO) {
28,260✔
1228
    return;
20,277✔
1229
  }
20,277✔
1230
  d_handlingIO = true;
7,983✔
1231
  HandlingIOGuard reentryGuard(d_handlingIO);
7,983✔
1232

1233
  // why do we loop? Because the TLS layer does buffering, and thus can have data ready to read
1234
  // even though the underlying socket is not ready, so we need to actually ask for the data first
1235
  IOState iostate = IOState::Done;
7,983✔
1236
  timeval now{};
7,983✔
1237
  gettimeofday(&now, nullptr);
7,983✔
1238

1239
  do {
48,903✔
1240
    iostate = IOState::Done;
48,903✔
1241
    IOStateGuard ioGuard(d_ioState);
48,903✔
1242

1243
    if (maxConnectionDurationReached(dnsdist::configuration::getCurrentRuntimeConfiguration().d_maxTCPConnectionDuration, now)) {
48,903✔
1244
      vinfolog("Terminating TCP connection from %s because it reached the maximum TCP connection duration", d_ci.remote.toStringWithPort());
1!
1245
      // will be handled by the ioGuard
1246
      // handleNewIOState(state, IOState::Done, fd, handleIOCallback);
1247
      return;
1✔
1248
    }
1✔
1249

1250
    const auto& immutable = dnsdist::configuration::getImmutableConfiguration();
48,902✔
1251
    if (immutable.d_maxTCPReadIOsPerQuery > 0 && d_readIOsCurrentQuery >= immutable.d_maxTCPReadIOsPerQuery) {
48,902✔
1252
      vinfolog("Terminating TCP connection from %s for reaching the maximum number of read IO events per query (%d)", d_ci.remote.toStringWithPort(), immutable.d_maxTCPReadIOsPerQuery);
1!
1253
      dnsdist::IncomingConcurrentTCPConnectionsManager::banClientFor(d_ci.remote, time(nullptr), immutable.d_tcpBanDurationForExceedingMaxReadIOsPerQuery);
1✔
1254
      return;
1✔
1255
    }
1✔
1256

1257
    d_lastIOBlocked = false;
48,901✔
1258

1259
    try {
48,901✔
1260
      if (d_state == State::starting) {
48,901✔
1261
        if (d_ci.cs != nullptr && d_ci.cs->d_enableProxyProtocol && isProxyPayloadOutsideTLS() && expectProxyProtocolFrom(d_ci.remote)) {
2,517!
1262
          d_state = State::readingProxyProtocolHeader;
1✔
1263
          d_buffer.resize(s_proxyProtocolMinimumHeaderSize);
1✔
1264
          d_proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize;
1✔
1265
        }
1✔
1266
        else {
2,516✔
1267
          d_state = State::doingHandshake;
2,516✔
1268
        }
2,516✔
1269
      }
2,517✔
1270

1271
      if (d_state == State::doingHandshake) {
48,901✔
1272
        iostate = handleHandshake(now);
2,885✔
1273
      }
2,885✔
1274

1275
      if (!d_lastIOBlocked && d_state == State::readingProxyProtocolHeader) {
48,901✔
1276
        auto status = handleProxyProtocolPayload();
17✔
1277
        if (status == ProxyProtocolResult::Done) {
17✔
1278
          d_buffer.resize(sizeof(uint16_t));
9✔
1279

1280
          if (isProxyPayloadOutsideTLS()) {
9✔
1281
            d_state = State::doingHandshake;
1✔
1282
            iostate = handleHandshake(now);
1✔
1283
          }
1✔
1284
          else {
8✔
1285
            d_state = State::readingQuerySize;
8✔
1286
          }
8✔
1287
        }
9✔
1288
        else if (status == ProxyProtocolResult::Error) {
8✔
1289
          iostate = IOState::Done;
3✔
1290
        }
3✔
1291
        else {
5✔
1292
          iostate = IOState::NeedRead;
5✔
1293
        }
5✔
1294
      }
17✔
1295

1296
      if (!d_lastIOBlocked && (d_state == State::waitingForQuery || d_state == State::readingQuerySize || d_state == State::readingQuery)) {
48,901✔
1297
        if (readIncomingQuery(now, iostate)) {
48,203✔
1298
          return;
2✔
1299
        }
2✔
1300
      }
48,203✔
1301

1302
      if (!d_lastIOBlocked && d_state == State::sendingResponse) {
48,899✔
1303
        DEBUGLOG("sending response");
13✔
1304
        iostate = d_handler.tryWrite(d_currentResponse.d_buffer, d_currentPos, d_currentResponse.d_buffer.size());
13✔
1305
        if (iostate == IOState::Done) {
13!
1306
          DEBUGLOG("response sent from " << __PRETTY_FUNCTION__);
13✔
1307
          handleResponseSent(d_currentResponse, d_currentResponse.d_buffer.size());
13✔
1308
          d_state = State::idle;
13✔
1309
        }
13✔
1310
        else {
×
1311
          d_lastIOBlocked = true;
×
1312
        }
×
1313
      }
13✔
1314

1315
      if (active() && !d_lastIOBlocked && iostate == IOState::Done && (d_state == State::idle || d_state == State::waitingForQuery)) {
48,899!
1316
        // try sending queued responses
1317
        DEBUGLOG("send responses, if any");
2,756✔
1318
        auto state = shared_from_this();
2,756✔
1319
        iostate = sendQueuedResponses(state, now);
2,756✔
1320

1321
        if (!d_lastIOBlocked && active() && iostate == IOState::Done) {
2,756!
1322
          // if the query has been passed to a backend, or dropped, and the responses have been sent,
1323
          // we can start reading again
1324
          if (canAcceptNewQueries(now)) {
2,754✔
1325
            resetForNewQuery();
217✔
1326
            iostate = IOState::NeedRead;
217✔
1327
          }
217✔
1328
          else {
2,537✔
1329
            d_state = State::idle;
2,537✔
1330
            iostate = IOState::Done;
2,537✔
1331
          }
2,537✔
1332
        }
2,754✔
1333
      }
2,756✔
1334

1335
      if (d_state != State::idle && d_state != State::doingHandshake && d_state != State::readingProxyProtocolHeader && d_state != State::waitingForQuery && d_state != State::readingQuerySize && d_state != State::readingQuery && d_state != State::sendingResponse) {
48,899!
1336
        vinfolog("Unexpected state %d in handleIOCallback", static_cast<int>(d_state));
×
1337
      }
×
1338
    }
48,899✔
1339
    catch (const std::exception& exp) {
48,901✔
1340
      /* most likely an EOF because the other end closed the connection,
1341
         but it might also be a real IO error or something else.
1342
         Let's just drop the connection
1343
      */
1344
      handleExceptionDuringIO(exp);
2,374✔
1345
    }
2,374✔
1346

1347
    if (!active()) {
48,899✔
1348
      DEBUGLOG("state is no longer active");
2,429✔
1349
      return;
2,429✔
1350
    }
2,429✔
1351

1352
    auto sharedPtrToConn = shared_from_this();
46,470✔
1353
    if (iostate == IOState::Done) {
46,470✔
1354
      d_ioState->update(iostate, handleIOCallback, sharedPtrToConn);
2,540✔
1355
    }
2,540✔
1356
    else {
43,930✔
1357
      updateIO(iostate, now);
43,930✔
1358
    }
43,930✔
1359
    ioGuard.release();
46,470✔
1360
  } while ((iostate == IOState::NeedRead || iostate == IOState::NeedWrite) && !d_lastIOBlocked);
46,470✔
1361
}
7,983✔
1362

1363
void IncomingTCPConnectionState::notifyIOError(const struct timeval& now, TCPResponse&& response)
1364
{
61✔
1365
  if (std::this_thread::get_id() != d_creatorThreadID) {
61✔
1366
    /* empty buffer will signal an IO error */
1367
    response.d_buffer.clear();
18✔
1368
    handleCrossProtocolResponse(now, std::move(response));
18✔
1369
    return;
18✔
1370
  }
18✔
1371

1372
  auto sharedPtrToConn = shared_from_this();
43✔
1373
  --sharedPtrToConn->d_currentQueriesCount;
43✔
1374
  sharedPtrToConn->d_hadErrors = true;
43✔
1375

1376
  if (sharedPtrToConn->d_state == State::sendingResponse) {
43✔
1377
    /* if we have responses to send, let's do that first */
1378
  }
2✔
1379
  else if (!sharedPtrToConn->d_queuedResponses.empty()) {
41!
1380
    /* stop reading and send what we have */
1381
    try {
×
1382
      auto iostate = sendQueuedResponses(sharedPtrToConn, now);
×
1383

1384
      if (sharedPtrToConn->active() && iostate != IOState::Done) {
×
1385
        // we need to update the state right away, nobody will do that for us
1386
        updateIO(iostate, now);
×
1387
      }
×
1388
    }
×
1389
    catch (const std::exception& e) {
×
1390
      vinfolog("Exception in notifyIOError: %s", e.what());
×
1391
    }
×
1392
  }
×
1393
  else {
41✔
1394
    // the backend code already tried to reconnect if it was possible
1395
    sharedPtrToConn->terminateClientConnection();
41✔
1396
  }
41✔
1397
}
43✔
1398

1399
/* Applies the XFR response rule chain of the current runtime configuration to one AXFR/IXFR response.
   Returns false when applyRulesToResponse() reports failure; the caller then terminates the client
   connection. Otherwise returns true. If the response was not turned asynchronous by the rules and
   carries an extended DNS error, that error is added to the response data. */
static bool processXFRResponse(DNSResponse& dnsResponse)
{
  // two-step binding on purpose: keeps the references valid whatever the getter returns
  const auto& ruleChains = dnsdist::configuration::getCurrentRuntimeConfiguration().d_ruleChains;
  const auto& xfrRules = dnsdist::rules::getResponseRuleChain(ruleChains, dnsdist::rules::ResponseRuleChain::XFRResponseRules);

  const bool rulesPassed = applyRulesToResponse(xfrRules, dnsResponse);
  if (!rulesPassed) {
    return false;
  }

  // an asynchronous response is left untouched for now
  if (!dnsResponse.isAsynchronous() && dnsResponse.ids.d_extendedError) {
    const auto& ede = *dnsResponse.ids.d_extendedError;
    dnsdist::edns::addExtendedDNSError(dnsResponse.getMutableData(), dnsResponse.getMaximumSize(), ede.infoCode, ede.extraText);
  }

  return true;
}
449✔
1418

1419
void IncomingTCPConnectionState::handleXFRResponse(const struct timeval& now, TCPResponse&& response)
1420
{
449✔
1421
  if (std::this_thread::get_id() != d_creatorThreadID) {
449!
1422
    handleCrossProtocolResponse(now, std::move(response));
×
1423
    return;
×
1424
  }
×
1425

1426
  std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this();
449✔
1427
  auto& ids = response.d_idstate;
449✔
1428
  std::shared_ptr<DownstreamState> backend = response.d_ds ? response.d_ds : (response.d_connection ? response.d_connection->getDS() : nullptr);
449!
1429
  DNSResponse dnsResponse(ids, response.d_buffer, backend);
449✔
1430
  dnsResponse.d_incomingTCPState = state;
449✔
1431
  memcpy(&response.d_cleartextDH, dnsResponse.getHeader().get(), sizeof(response.d_cleartextDH));
449✔
1432

1433
  if (!processXFRResponse(dnsResponse)) {
449!
1434
    state->terminateClientConnection();
×
1435
    return;
×
1436
  }
×
1437

1438
  queueResponse(state, now, std::move(response), true);
449✔
1439
}
449✔
1440

1441
void IncomingTCPConnectionState::handleTimeout(std::shared_ptr<IncomingTCPConnectionState>& state, bool write)
1442
{
17✔
1443
  vinfolog("Timeout while %s TCP client %s", (write ? "writing to" : "reading from"), state->d_ci.remote.toStringWithPort());
17!
1444
  DEBUGLOG("client timeout");
17✔
1445
  DEBUGLOG("Processed " << state->d_queriesCount << " queries, current count is " << state->d_currentQueriesCount << ", " << state->d_ownedConnectionsToBackend.size() << " owned connections, " << state->d_queuedResponses.size() << " response queued");
17✔
1446

1447
  if (write || state->d_currentQueriesCount == 0) {
17✔
1448
    ++state->d_ci.cs->tcpClientTimeouts;
11✔
1449
    state->d_ioState.reset();
11✔
1450
  }
11✔
1451
  else {
6✔
1452
    DEBUGLOG("Going idle");
6✔
1453
    /* we still have some queries in flight, let's just stop reading for now */
1454
    state->d_state = State::idle;
6✔
1455
    state->d_ioState->update(IOState::Done, handleIOCallback, state);
6✔
1456
  }
6✔
1457
}
17✔
1458

1459
static void handleIncomingTCPQuery(int pipefd, FDMultiplexer::funcparam_t& param)
1460
{
2,622✔
1461
  (void)pipefd;
2,622✔
1462
  auto* threadData = boost::any_cast<TCPClientThreadData*>(param);
2,622✔
1463

1464
  std::unique_ptr<ConnectionInfo> citmp{nullptr};
2,622✔
1465
  try {
2,622✔
1466
    auto tmp = threadData->queryReceiver.receive();
2,622✔
1467
    if (!tmp) {
2,622!
1468
      return;
×
1469
    }
×
1470
    citmp = std::move(*tmp);
2,622✔
1471
  }
2,622✔
1472
  catch (const std::exception& e) {
2,622✔
1473
    throw std::runtime_error("Error while reading from the TCP query channel: " + std::string(e.what()));
×
1474
  }
×
1475

1476
  g_tcpclientthreads->decrementQueuedCount();
2,622✔
1477

1478
  timeval now{};
2,622✔
1479
  gettimeofday(&now, nullptr);
2,622✔
1480

1481
  if (citmp->cs->dohFrontend) {
2,622✔
1482
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
199✔
1483
    auto state = std::make_shared<IncomingHTTP2Connection>(std::move(*citmp), *threadData, now);
199✔
1484
    state->handleIO();
199✔
1485
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
199✔
1486
  }
199✔
1487
  else {
2,423✔
1488
    auto state = std::make_shared<IncomingTCPConnectionState>(std::move(*citmp), *threadData, now);
2,423✔
1489
    state->handleIO();
2,423✔
1490
  }
2,423✔
1491
}
2,622✔
1492

1493
static void handleCrossProtocolQuery(int pipefd, FDMultiplexer::funcparam_t& param)
1494
{
226✔
1495
  (void)pipefd;
226✔
1496
  auto* threadData = boost::any_cast<TCPClientThreadData*>(param);
226✔
1497

1498
  std::unique_ptr<CrossProtocolQuery> cpq{nullptr};
226✔
1499
  try {
226✔
1500
    auto tmp = threadData->crossProtocolQueryReceiver.receive();
226✔
1501
    if (!tmp) {
226!
1502
      return;
×
1503
    }
×
1504
    cpq = std::move(*tmp);
226✔
1505
  }
226✔
1506
  catch (const std::exception& e) {
226✔
1507
    throw std::runtime_error("Error while reading from the TCP cross-protocol channel: " + std::string(e.what()));
×
1508
  }
×
1509

1510
  timeval now{};
226✔
1511
  gettimeofday(&now, nullptr);
226✔
1512

1513
  std::shared_ptr<TCPQuerySender> tqs = cpq->getTCPQuerySender();
226✔
1514
  auto query = std::move(cpq->query);
226✔
1515
  auto downstreamServer = std::move(cpq->downstream);
226✔
1516

1517
  try {
226✔
1518
    auto downstream = t_downstreamTCPConnectionsManager.getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::string());
226✔
1519

1520
    prependSizeToTCPQuery(query.d_buffer, query.d_idstate.d_proxyProtocolPayloadSize);
226✔
1521

1522
    vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", query.d_idstate.qname.toLogString(), QType(query.d_idstate.qtype).toString(), query.d_idstate.origRemote.toStringWithPort(), query.d_idstate.protocol.toString(), query.d_buffer.size(), downstreamServer->getNameWithAddr());
226✔
1523

1524
    downstream->queueQuery(tqs, std::move(query));
226✔
1525
  }
226✔
1526
  catch (...) {
226✔
1527
    tqs->notifyIOError(now, std::move(query));
×
1528
  }
×
1529
}
226✔
1530

1531
static void handleCrossProtocolResponse(int pipefd, FDMultiplexer::funcparam_t& param)
1532
{
283✔
1533
  (void)pipefd;
283✔
1534
  auto* threadData = boost::any_cast<TCPClientThreadData*>(param);
283✔
1535

1536
  std::unique_ptr<TCPCrossProtocolResponse> cpr{nullptr};
283✔
1537
  try {
283✔
1538
    auto tmp = threadData->crossProtocolResponseReceiver.receive();
283✔
1539
    if (!tmp) {
283!
1540
      return;
×
1541
    }
×
1542
    cpr = std::move(*tmp);
283✔
1543
  }
283✔
1544
  catch (const std::exception& e) {
283✔
1545
    throw std::runtime_error("Error while reading from the TCP cross-protocol response: " + std::string(e.what()));
×
1546
  }
×
1547

1548
  auto& response = *cpr;
283✔
1549

1550
  try {
283✔
1551
    if (response.d_response.d_buffer.empty()) {
283✔
1552
      response.d_state->notifyIOError(response.d_now, std::move(response.d_response));
24✔
1553
    }
24✔
1554
    else if (response.d_response.d_idstate.qtype == QType::AXFR || response.d_response.d_idstate.qtype == QType::IXFR) {
259!
1555
      response.d_state->handleXFRResponse(response.d_now, std::move(response.d_response));
×
1556
    }
×
1557
    else {
259✔
1558
      response.d_state->handleResponse(response.d_now, std::move(response.d_response));
259✔
1559
    }
259✔
1560
  }
283✔
1561
  catch (...) {
283✔
1562
    /* no point bubbling up from there */
1563
  }
×
1564
}
283✔
1565

1566
struct TCPAcceptorParam
1567
{
1568
  // NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members)
1569
  ClientState& clientState;
1570
  ComboAddress local;
1571
  int socket{-1};
1572
};
1573

1574
static void acceptNewConnection(const TCPAcceptorParam& param, TCPClientThreadData* threadData);
1575

1576
static void scanForTimeouts(const TCPClientThreadData& data, const timeval& now)
1577
{
11,339✔
1578
  auto expiredReadConns = data.mplexer->getTimeouts(now, false);
11,339✔
1579
  for (const auto& cbData : expiredReadConns) {
11,339✔
1580
    if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
10!
UNCOV
1581
      auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
×
UNCOV
1582
      if (cbData.first == state->d_handler.getDescriptor()) {
×
UNCOV
1583
        vinfolog("Timeout (read) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
×
UNCOV
1584
        state->handleTimeout(state, false);
×
UNCOV
1585
      }
×
UNCOV
1586
    }
×
1587
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
10✔
1588
    else if (cbData.second.type() == typeid(std::shared_ptr<IncomingHTTP2Connection>)) {
10!
1589
      auto state = boost::any_cast<std::shared_ptr<IncomingHTTP2Connection>>(cbData.second);
1590
      if (cbData.first == state->d_handler.getDescriptor()) {
×
1591
        vinfolog("Timeout (read) from remote H2 client %s", state->d_ci.remote.toStringWithPort());
×
1592
        std::shared_ptr<IncomingTCPConnectionState> parentState = state;
1593
        state->handleTimeout(parentState, false);
1594
      }
1595
    }
1596
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
10✔
1597
    else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
10!
1598
      auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
10✔
1599
      vinfolog("Timeout (read) from remote backend %s", conn->getBackendName());
10!
1600
      conn->handleTimeout(now, false);
10✔
1601
    }
10✔
1602
  }
10✔
1603

1604
  auto expiredWriteConns = data.mplexer->getTimeouts(now, true);
11,339✔
1605
  for (const auto& cbData : expiredWriteConns) {
11,339!
1606
    if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
×
1607
      auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
×
1608
      if (cbData.first == state->d_handler.getDescriptor()) {
×
1609
        vinfolog("Timeout (write) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
×
1610
        state->handleTimeout(state, true);
×
1611
      }
×
1612
    }
×
1613
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
1614
    else if (cbData.second.type() == typeid(std::shared_ptr<IncomingHTTP2Connection>)) {
×
1615
      auto state = boost::any_cast<std::shared_ptr<IncomingHTTP2Connection>>(cbData.second);
1616
      if (cbData.first == state->d_handler.getDescriptor()) {
×
1617
        vinfolog("Timeout (write) from remote H2 client %s", state->d_ci.remote.toStringWithPort());
×
1618
        std::shared_ptr<IncomingTCPConnectionState> parentState = state;
1619
        state->handleTimeout(parentState, true);
1620
      }
1621
    }
1622
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
1623
    else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
×
1624
      auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
×
1625
      vinfolog("Timeout (write) from remote backend %s", conn->getBackendName());
×
1626
      conn->handleTimeout(now, true);
×
1627
    }
×
1628
  }
×
1629
}
11,339✔
1630

1631
static void dumpTCPStates(const TCPClientThreadData& data)
1632
{
×
1633
  /* just to keep things clean in the output, debug only */
1634
  static std::mutex s_lock;
×
1635
  auto lock = std::scoped_lock(s_lock);
×
1636
  if (g_tcpStatesDumpRequested > 0) {
×
1637
    /* no race here, we took the lock so it can only be increased in the meantime */
1638
    --g_tcpStatesDumpRequested;
×
1639
    infolog("Dumping the TCP states, as requested:");
×
1640
    data.mplexer->runForAllWatchedFDs([](bool isRead, int desc, const FDMultiplexer::funcparam_t& param, struct timeval ttd) {
×
1641
      timeval lnow{};
×
1642
      gettimeofday(&lnow, nullptr);
×
1643
      if (ttd.tv_sec > 0) {
×
1644
        infolog("- Descriptor %d is in %s state, TTD in %d", desc, (isRead ? "read" : "write"), (ttd.tv_sec - lnow.tv_sec));
×
1645
      }
×
1646
      else {
×
1647
        infolog("- Descriptor %d is in %s state, no TTD set", desc, (isRead ? "read" : "write"));
×
1648
      }
×
1649

1650
      if (param.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
×
1651
        auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
×
1652
        infolog(" - %s", state->toString());
×
1653
      }
×
1654
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
1655
      else if (param.type() == typeid(std::shared_ptr<IncomingHTTP2Connection>)) {
×
1656
        auto state = boost::any_cast<std::shared_ptr<IncomingHTTP2Connection>>(param);
1657
        infolog(" - %s", state->toString());
1658
      }
1659
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
1660
      else if (param.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
×
1661
        auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(param);
×
1662
        infolog(" - %s", conn->toString());
×
1663
      }
×
1664
      else if (param.type() == typeid(TCPClientThreadData*)) {
×
1665
        infolog(" - Worker thread pipe");
×
1666
      }
×
1667
    });
×
1668
    infolog("The TCP/DoT client cache has %d active and %d idle outgoing connections cached", t_downstreamTCPConnectionsManager.getActiveCount(), t_downstreamTCPConnectionsManager.getIdleCount());
×
1669
  }
×
1670
}
×
1671

1672
// NOLINTNEXTLINE(performance-unnecessary-value-param): you are wrong, clang-tidy, go home
1673
static void tcpClientThread(pdns::channel::Receiver<ConnectionInfo>&& queryReceiver, pdns::channel::Receiver<CrossProtocolQuery>&& crossProtocolQueryReceiver, pdns::channel::Receiver<TCPCrossProtocolResponse>&& crossProtocolResponseReceiver, pdns::channel::Sender<TCPCrossProtocolResponse>&& crossProtocolResponseSender, std::vector<ClientState*> tcpAcceptStates)
1674
{
3,550✔
1675
  /* we get launched with a pipe on which we receive file descriptors from clients that we own
1676
     from that point on */
1677

1678
  setThreadName("dnsdist/tcpClie");
3,550✔
1679

1680
  try {
3,550✔
1681
    TCPClientThreadData data;
3,550✔
1682
    data.crossProtocolResponseSender = std::move(crossProtocolResponseSender);
3,550✔
1683
    data.queryReceiver = std::move(queryReceiver);
3,550✔
1684
    data.crossProtocolQueryReceiver = std::move(crossProtocolQueryReceiver);
3,550✔
1685
    data.crossProtocolResponseReceiver = std::move(crossProtocolResponseReceiver);
3,550✔
1686

1687
    data.mplexer->addReadFD(data.queryReceiver.getDescriptor(), handleIncomingTCPQuery, &data);
3,550✔
1688
    data.mplexer->addReadFD(data.crossProtocolQueryReceiver.getDescriptor(), handleCrossProtocolQuery, &data);
3,550✔
1689
    data.mplexer->addReadFD(data.crossProtocolResponseReceiver.getDescriptor(), handleCrossProtocolResponse, &data);
3,550✔
1690

1691
    /* only used in single acceptor mode for now */
1692
    std::vector<TCPAcceptorParam> acceptParams;
3,550✔
1693
    acceptParams.reserve(tcpAcceptStates.size());
3,550✔
1694

1695
    for (auto& state : tcpAcceptStates) {
3,550!
1696
      acceptParams.emplace_back(TCPAcceptorParam{*state, state->local, state->tcpFD});
×
1697
      for (const auto& [addr, socket] : state->d_additionalAddresses) {
×
1698
        acceptParams.emplace_back(TCPAcceptorParam{*state, addr, socket});
×
1699
      }
×
1700
    }
×
1701

1702
    auto acceptCallback = [&data](int socket, FDMultiplexer::funcparam_t& funcparam) {
3,550✔
1703
      (void)socket;
×
1704
      const auto* acceptorParam = boost::any_cast<const TCPAcceptorParam*>(funcparam);
×
1705
      acceptNewConnection(*acceptorParam, &data);
×
1706
    };
×
1707

1708
    for (const auto& param : acceptParams) {
3,550!
1709
      setNonBlocking(param.socket);
×
1710
      data.mplexer->addReadFD(param.socket, acceptCallback, &param);
×
1711
    }
×
1712

1713
    timeval now{};
3,550✔
1714
    gettimeofday(&now, nullptr);
3,550✔
1715
    time_t lastTimeoutScan = now.tv_sec;
3,550✔
1716

1717
    for (;;) {
37,759✔
1718
      data.mplexer->run(&now);
37,759✔
1719

1720
      try {
37,759✔
1721
        t_downstreamTCPConnectionsManager.cleanupClosedConnections(now);
37,759✔
1722
        dnsdist::IncomingConcurrentTCPConnectionsManager::cleanup(time(nullptr));
37,759✔
1723

1724
        if (now.tv_sec > lastTimeoutScan) {
37,759✔
1725
          lastTimeoutScan = now.tv_sec;
11,347✔
1726
          scanForTimeouts(data, now);
11,347✔
1727

1728
          if (g_tcpStatesDumpRequested > 0) {
11,347!
1729
            dumpTCPStates(data);
×
1730
          }
×
1731
        }
11,347✔
1732
      }
37,759✔
1733
      catch (const std::exception& e) {
37,759✔
1734
        warnlog("Error in TCP worker thread: %s", e.what());
×
1735
      }
×
1736
    }
37,759✔
1737
  }
3,550✔
1738
  catch (const std::exception& e) {
3,550✔
1739
    errlog("Fatal error in TCP worker thread: %s", e.what());
×
1740
  }
×
1741
}
3,550✔
1742

1743
static void acceptNewConnection(const TCPAcceptorParam& param, TCPClientThreadData* threadData)
1744
{
3,094✔
1745
  auto& clientState = param.clientState;
3,094✔
1746
  const bool checkACL = clientState.dohFrontend == nullptr || (!clientState.dohFrontend->d_trustForwardedForHeader && clientState.dohFrontend->d_earlyACLDrop);
3,094!
1747
  const int socket = param.socket;
3,094✔
1748
  bool tcpClientCountIncremented = false;
3,094✔
1749
  ComboAddress remote;
3,094✔
1750
  remote.sin4.sin_family = param.local.sin4.sin_family;
3,094✔
1751

1752
  tcpClientCountIncremented = false;
3,094✔
1753
  try {
3,094✔
1754
    socklen_t remlen = remote.getSocklen();
3,094✔
1755
    ConnectionInfo connInfo(&clientState);
3,094✔
1756
#ifdef HAVE_ACCEPT4
3,094✔
1757
    // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
1758
    connInfo.fd = accept4(socket, reinterpret_cast<struct sockaddr*>(&remote), &remlen, SOCK_NONBLOCK);
3,094✔
1759
#else
1760
    // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
1761
    connInfo.fd = accept(socket, reinterpret_cast<struct sockaddr*>(&remote), &remlen);
1762
#endif
1763
    // will be decremented when the ConnectionInfo object is destroyed, no matter the reason
1764
    auto concurrentConnections = ++clientState.tcpCurrentConnections;
3,094✔
1765

1766
    if (connInfo.fd < 0) {
3,094!
1767
      throw std::runtime_error((boost::format("accepting new connection on socket: %s") % stringerror()).str());
×
1768
    }
×
1769

1770
    if (checkACL && !dnsdist::configuration::getCurrentRuntimeConfiguration().d_ACL.match(remote)) {
3,094✔
1771
      ++dnsdist::metrics::g_stats.aclDrops;
9✔
1772
      vinfolog("Dropped TCP connection from %s because of ACL", remote.toStringWithPort());
9✔
1773
      return;
9✔
1774
    }
9✔
1775

1776
    if (clientState.d_tcpConcurrentConnectionsLimit > 0 && concurrentConnections > clientState.d_tcpConcurrentConnectionsLimit) {
3,085✔
1777
      vinfolog("Dropped TCP connection from %s because of concurrent connections limit", remote.toStringWithPort());
3!
1778
      return;
3✔
1779
    }
3✔
1780

1781
    if (concurrentConnections > clientState.tcpMaxConcurrentConnections.load()) {
3,082✔
1782
      clientState.tcpMaxConcurrentConnections.store(concurrentConnections);
578✔
1783
    }
578✔
1784

1785
#ifndef HAVE_ACCEPT4
1786
    if (!setNonBlocking(connInfo.fd)) {
1787
      return;
1788
    }
1789
#endif
1790

1791
    setTCPNoDelay(connInfo.fd); // disable NAGLE
3,082✔
1792

1793
    const auto maxTCPQueuedConnections = dnsdist::configuration::getImmutableConfiguration().d_maxTCPQueuedConnections;
3,082✔
1794
    if (maxTCPQueuedConnections > 0 && g_tcpclientthreads->getQueuedCount() >= maxTCPQueuedConnections) {
3,082!
1795
      vinfolog("Dropping TCP connection from %s because we have too many queued already", remote.toStringWithPort());
×
1796
      return;
×
1797
    }
×
1798

1799
    auto connectionResult = dnsdist::IncomingConcurrentTCPConnectionsManager::accountNewTCPConnection(remote, connInfo.cs->hasTLS());
3,082✔
1800
    if (connectionResult == dnsdist::IncomingConcurrentTCPConnectionsManager::NewConnectionResult::Denied) {
3,082✔
1801
      return;
6✔
1802
    }
6✔
1803
    tcpClientCountIncremented = true;
3,076✔
1804
    if (connectionResult == dnsdist::IncomingConcurrentTCPConnectionsManager::NewConnectionResult::Restricted) {
3,076!
1805
      connInfo.d_restricted = true;
×
1806
    }
×
1807

1808
    vinfolog("Got TCP connection from %s", remote.toStringWithPort());
3,076✔
1809

1810
    connInfo.remote = remote;
3,076✔
1811

1812
    if (threadData == nullptr) {
3,076✔
1813
      if (!g_tcpclientthreads->passConnectionToThread(std::make_unique<ConnectionInfo>(std::move(connInfo)))) {
2,622!
1814
        if (tcpClientCountIncremented) {
×
1815
          dnsdist::IncomingConcurrentTCPConnectionsManager::accountClosedTCPConnection(remote);
×
1816
        }
×
1817
      }
×
1818
    }
2,622✔
1819
    else {
454✔
1820
      timeval now{};
454✔
1821
      gettimeofday(&now, nullptr);
454✔
1822

1823
      if (connInfo.cs->dohFrontend) {
454!
1824
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
1825
        auto state = std::make_shared<IncomingHTTP2Connection>(std::move(connInfo), *threadData, now);
1826
        state->handleIO();
1827
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
1828
      }
×
1829
      else {
454✔
1830
        auto state = std::make_shared<IncomingTCPConnectionState>(std::move(connInfo), *threadData, now);
454✔
1831
        state->handleIO();
454✔
1832
      }
454✔
1833
    }
454✔
1834
  }
3,076✔
1835
  catch (const std::exception& e) {
3,094✔
1836
    errlog("While reading a TCP question: %s", e.what());
×
1837
    if (tcpClientCountIncremented) {
×
1838
      dnsdist::IncomingConcurrentTCPConnectionsManager::accountClosedTCPConnection(remote);
×
1839
    }
×
1840
  }
×
1841
  catch (...) {
3,094✔
1842
  }
×
1843
}
3,094✔
1844

1845
/* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and
1846
   they will hand off to worker threads & spawn more of them if required
1847
*/
1848
#ifndef USE_SINGLE_ACCEPTOR_THREAD
1849
void tcpAcceptorThread(const std::vector<ClientState*>& states)
1850
{
454✔
1851
  setThreadName("dnsdist/tcpAcce");
454✔
1852

1853
  std::vector<TCPAcceptorParam> params;
454✔
1854
  params.reserve(states.size());
454✔
1855

1856
  for (const auto& state : states) {
454✔
1857
    params.emplace_back(TCPAcceptorParam{*state, state->local, state->tcpFD});
454✔
1858
    for (const auto& [addr, socket] : state->d_additionalAddresses) {
454!
1859
      params.emplace_back(TCPAcceptorParam{*state, addr, socket});
×
1860
    }
×
1861
  }
454✔
1862

1863
  if (params.size() == 1) {
454!
1864
    while (true) {
3,548✔
1865
      acceptNewConnection(params.at(0), nullptr);
3,094✔
1866
    }
3,094✔
1867
  }
454✔
1868
  else {
×
1869
    auto acceptCallback = [](int socket, FDMultiplexer::funcparam_t& funcparam) {
×
1870
      (void)socket;
×
1871
      const auto* acceptorParam = boost::any_cast<const TCPAcceptorParam*>(funcparam);
×
1872
      acceptNewConnection(*acceptorParam, nullptr);
×
1873
    };
×
1874

1875
    auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent(params.size()));
×
1876
    for (const auto& param : params) {
×
1877
      mplexer->addReadFD(param.socket, acceptCallback, &param);
×
1878
    }
×
1879

1880
    timeval now{};
×
1881
    while (true) {
×
1882
      mplexer->run(&now, -1);
×
1883
    }
×
1884
  }
×
1885
}
454✔
1886
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc