• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PowerDNS / pdns / 18369417242

09 Oct 2025 07:52AM UTC coverage: 64.136% (+0.04%) from 64.101%
18369417242

push

github

web-flow
Merge pull request #16223 from miodvallat/doctweaks

auth: minor doc tweak

42816 of 101504 branches covered (42.18%)

Branch coverage included in aggregate %.

129913 of 167814 relevant lines covered (77.41%)

5820209.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.73
/pdns/dnsdistdist/dnsdist-tcp.cc
1
/*
2
 * This file is part of PowerDNS or dnsdist.
3
 * Copyright -- PowerDNS.COM B.V. and its contributors
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of version 2 of the GNU General Public License as
7
 * published by the Free Software Foundation.
8
 *
9
 * In addition, for the avoidance of any doubt, permission is granted to
10
 * link this program with OpenSSL and to (re)distribute the binaries
11
 * produced as the result of such linking.
12
 *
13
 * This program is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
 * GNU General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU General Public License
19
 * along with this program; if not, write to the Free Software
20
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
 */
22

23
#include <thread>
24
#include <netinet/tcp.h>
25
#include <queue>
26
#include <boost/format.hpp>
27

28
#include "dnsdist.hh"
29
#include "dnsdist-concurrent-connections.hh"
30
#include "dnsdist-dnsparser.hh"
31
#include "dnsdist-ecs.hh"
32
#include "dnsdist-edns.hh"
33
#include "dnsdist-nghttp2-in.hh"
34
#include "dnsdist-proxy-protocol.hh"
35
#include "dnsdist-rings.hh"
36
#include "dnsdist-tcp.hh"
37
#include "dnsdist-tcp-downstream.hh"
38
#include "dnsdist-downstream-connection.hh"
39
#include "dnsdist-tcp-upstream.hh"
40
#include "dnsparser.hh"
41
#include "dolog.hh"
42
#include "gettime.hh"
43
#include "lock.hh"
44
#include "sstuff.hh"
45
#include "tcpiohandler.hh"
46
#include "tcpiohandler-mplexer.hh"
47
#include "threadname.hh"
48

49
/* TCP: the grand design.
50
   We forward 'messages' between clients and downstream servers. Messages are at most 65535 bytes long.
51
   An answer might theoretically consist of multiple messages (for example, in the case of AXFR), initially
52
   we will not go there.
53

54
   In a sense there is a strong symmetry between UDP and TCP, once a connection to a downstream has been set up.
55
   This symmetry is broken because of head-of-line blocking within TCP though, necessitating additional connections
56
   to guarantee performance.
57

58
   So the idea is to have a 'pool' of available downstream connections, and forward messages to/from them and never queue.
59
   So whenever an answer comes in, we know where it needs to go.
60

61
   Let's start naively.
62
*/
63

64
/* Counter used to request a dump of the TCP states; starts at zero. Its readers and writers
   are not visible in this part of the file. */
std::atomic<uint64_t> g_tcpStatesDumpRequested(0);
65

66
/* Final bookkeeping for an incoming TCP connection:
   - report the connection as closed to the concurrent-connections manager,
   - update the frontend's TCP metrics (queries count, duration in milliseconds, average read IOs per query),
   - close the I/O handler explicitly. */
IncomingTCPConnectionState::~IncomingTCPConnectionState()
{
  try {
    dnsdist::IncomingConcurrentTCPConnectionsManager::accountClosedTCPConnection(d_ci.remote);
  }
  catch (...) {
    /* an exception must never escape a destructor, so any failure here is deliberately ignored */
  }

  if (d_ci.cs != nullptr) {
    timeval now{};
    gettimeofday(&now, nullptr);

    const auto elapsed = now - d_connectionStartTime;
    const auto durationMs = elapsed.tv_sec * 1000 + elapsed.tv_usec / 1000;
    /* avoid a division by zero when no query was ever received */
    const auto readIOsPerQuery = (d_queriesCount > 0) ? (d_readIOsTotal / d_queriesCount) : d_readIOsTotal;
    d_ci.cs->updateTCPMetrics(d_queriesCount, durationMs, readIOsPerQuery);
  }

  // this would happen when the object is destroyed anyway, but doing it explicitly guarantees
  // it happens before the ConnectionInfo is destroyed (closing the descriptor), instead of
  // relying on the declaration order of the members
  d_handler.close();
}
88

89
/* Returns the protocol the client uses on this connection:
   DoH when the frontend is a DoH one, DoT when the handler is TLS, plain DNS over TCP otherwise. */
dnsdist::Protocol IncomingTCPConnectionState::getProtocol() const
{
  if (!d_ci.cs->dohFrontend) {
    return d_handler.isTLS() ? dnsdist::Protocol::DoT : dnsdist::Protocol::DoTCP;
  }
  return dnsdist::Protocol::DoH;
}
99

100
size_t IncomingTCPConnectionState::clearAllDownstreamConnections()
101
{
190✔
102
  return t_downstreamTCPConnectionsManager.clear();
190✔
103
}
190✔
104

105
static std::pair<std::shared_ptr<TCPConnectionToBackend>, bool> getOwnedDownstreamConnection(std::map<std::shared_ptr<DownstreamState>, std::deque<std::shared_ptr<TCPConnectionToBackend>>>& ownedConnectionsToBackend, const std::shared_ptr<DownstreamState>& backend, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs)
106
{
22,635✔
107
  bool tlvsMismatch = false;
22,635✔
108
  auto connIt = ownedConnectionsToBackend.find(backend);
22,635✔
109
  if (connIt == ownedConnectionsToBackend.end()) {
22,635✔
110
    DEBUGLOG("no owned connection found for " << backend->getName());
22,598✔
111
    return {nullptr, tlvsMismatch};
22,598✔
112
  }
22,598✔
113

114
  for (auto& conn : connIt->second) {
37✔
115
    if (conn->canBeReused(true)) {
37!
116
      if (conn->matchesTLVs(tlvs)) {
37✔
117
        DEBUGLOG("Got one owned connection accepting more for " << backend->getName());
32✔
118
        conn->setReused();
32✔
119
        ++backend->tcpReusedConnections;
32✔
120
        return {conn, tlvsMismatch};
32✔
121
      }
32✔
122
      DEBUGLOG("Found one connection to " << backend->getName() << " but with different TLV values");
5✔
123
      tlvsMismatch = true;
5✔
124
    }
5✔
125
    DEBUGLOG("not accepting more for " << backend->getName());
5✔
126
  }
5✔
127

128
  return {nullptr, tlvsMismatch};
5✔
129
}
37✔
130

131
bool IncomingTCPConnectionState::isNearTCPLimits() const
132
{
222,147✔
133
  if (d_ci.d_restricted) {
222,147✔
134
    return true;
5✔
135
  }
5✔
136

137
  const auto tcpConnectionsOverloadThreshold = dnsdist::configuration::getImmutableConfiguration().d_tcpConnectionsOverloadThreshold;
222,142✔
138
  if (tcpConnectionsOverloadThreshold == 0) {
222,142✔
139
    return false;
555✔
140
  }
555✔
141

142
  const auto& clientState = d_ci.cs;
221,587✔
143
  if (clientState->d_tcpConcurrentConnectionsLimit > 0) {
221,587✔
144
    auto concurrentConnections = clientState->tcpCurrentConnections.load();
90✔
145
    auto current = (100 * concurrentConnections) / clientState->d_tcpConcurrentConnectionsLimit;
90✔
146
    if (current >= tcpConnectionsOverloadThreshold) {
90✔
147
      return true;
18✔
148
    }
18✔
149
  }
90✔
150

151
  return dnsdist::IncomingConcurrentTCPConnectionsManager::isClientOverThreshold(d_ci.remote);
221,569✔
152
}
221,587✔
153

154
std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getDownstreamConnection(std::shared_ptr<DownstreamState>& backend, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs, const struct timeval& now)
155
{
22,635✔
156
  auto [downstream, tlvsMismatch] = getOwnedDownstreamConnection(d_ownedConnectionsToBackend, backend, tlvs);
22,635✔
157

158
  if (!downstream) {
22,635✔
159
    if (backend->d_config.useProxyProtocol && tlvsMismatch) {
22,603✔
160
      clearOwnedDownstreamConnections(backend);
5✔
161
    }
5✔
162

163
    /* we don't have a connection to this backend owned yet, let's get one (it might not be a fresh one, though) */
164
    downstream = t_downstreamTCPConnectionsManager.getConnectionToDownstream(d_threadData.mplexer, backend, now, std::string());
22,603✔
165
    // if we had an existing connection but the TLVs are different, they are likely unique per query so do not bother keeping the connection
166
    // around
167
    if (backend->d_config.useProxyProtocol && !tlvsMismatch) {
22,603✔
168
      registerOwnedDownstreamConnection(downstream);
21✔
169
    }
21✔
170
  }
22,603✔
171

172
  return downstream;
22,635✔
173
}
22,635✔
174

175
static void tcpClientThread(pdns::channel::Receiver<ConnectionInfo>&& queryReceiver, pdns::channel::Receiver<CrossProtocolQuery>&& crossProtocolQueryReceiver, pdns::channel::Receiver<TCPCrossProtocolResponse>&& crossProtocolResponseReceiver, pdns::channel::Sender<TCPCrossProtocolResponse>&& crossProtocolResponseSender, std::vector<ClientState*> tcpAcceptStates);
176

177
/* Reserves room for `maxThreads` TCP workers and attempts to start that many client threads,
   each one serving `tcpAcceptStates`. */
TCPClientCollection::TCPClientCollection(size_t maxThreads, std::vector<ClientState*> tcpAcceptStates) :
  d_tcpclientthreads(maxThreads), d_maxthreads(maxThreads)
{
  for (size_t remaining = maxThreads; remaining > 0; --remaining) {
    addTCPClientThread(tcpAcceptStates);
  }
}

185
/* Starts one more TCP client thread and stores its worker handle in the next free slot.
   The thread receives:
   - the receiving ends of the query and cross-protocol query channels,
   - both ends of the cross-protocol response channel,
   - a copy of `tcpAcceptStates`.
   Does nothing (beyond logging) when every slot is already in use. Errors are logged, never propagated. */
void TCPClientCollection::addTCPClientThread(std::vector<ClientState*>& tcpAcceptStates)
{
  try {
    vinfolog("Adding TCP Client thread");

    /* check for a free slot before creating the channels: creating them allocates resources that
       would otherwise be released right away when the vector of workers is already full */
    if (d_numthreads >= d_tcpclientthreads.size()) {
      vinfolog("Adding a new TCP client thread would exceed the vector size (%d/%d), skipping. Consider increasing the maximum amount of TCP client threads with setMaxTCPClientThreads() in the configuration.", d_numthreads.load(), d_tcpclientthreads.size());
      return;
    }

    const auto internalPipeBufferSize = dnsdist::configuration::getImmutableConfiguration().d_tcpInternalPipeBufferSize;

    auto [queryChannelSender, queryChannelReceiver] = pdns::channel::createObjectQueue<ConnectionInfo>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, internalPipeBufferSize);

    auto [crossProtocolQueryChannelSender, crossProtocolQueryChannelReceiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, internalPipeBufferSize);

    auto [crossProtocolResponseChannelSender, crossProtocolResponseChannelReceiver] = pdns::channel::createObjectQueue<TCPCrossProtocolResponse>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, internalPipeBufferSize);

    TCPWorkerThread worker(std::move(queryChannelSender), std::move(crossProtocolQueryChannelSender));

    try {
      std::thread clientThread(tcpClientThread, std::move(queryChannelReceiver), std::move(crossProtocolQueryChannelReceiver), std::move(crossProtocolResponseChannelReceiver), std::move(crossProtocolResponseChannelSender), tcpAcceptStates);
      clientThread.detach();
    }
    catch (const std::runtime_error& e) {
      /* std::system_error, thrown when the thread cannot be started, derives from std::runtime_error */
      errlog("Error creating a TCP thread: %s", e.what());
      return;
    }

    /* only record the worker once its thread is actually running */
    d_tcpclientthreads.at(d_numthreads) = std::move(worker);
    ++d_numthreads;
  }
  catch (const std::exception& e) {
    errlog("Error creating TCP worker: %s", e.what());
  }
}
221

222
std::unique_ptr<TCPClientCollection> g_tcpclientthreads;
223

224
/* Sends the responses queued on `state`, oldest first, for as long as the connection stays active.
   If a write does not complete, its I/O state is returned immediately. Otherwise the connection
   is left in the idle state and IOState::Done is returned. */
static IOState sendQueuedResponses(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
{
  auto& pending = state->d_queuedResponses;

  while (state->active() && !pending.empty()) {
    DEBUGLOG("queue size is " << pending.size() << ", sending the next one");
    TCPResponse next = std::move(pending.front());
    pending.pop_front();

    state->d_state = IncomingTCPConnectionState::State::idle;
    const IOState outcome = state->sendResponse(now, std::move(next));
    if (outcome != IOState::Done) {
      return outcome;
    }
  }

  state->d_state = IncomingTCPConnectionState::State::idle;
  return IOState::Done;
}
242

243
/* Accounting performed once `currentResponse` has been written to the client.
   AXFR/IXFR responses return early, without any accounting or cleanup. For the others:
   - the in-flight query counter is decremented,
   - the response is reported via ::handleResponseSent(): with latency and backend details when it
     came from a backend, otherwise with a zero latency and an empty backend address,
   - the response buffer and its backend connection are released. */
void IncomingTCPConnectionState::handleResponseSent(TCPResponse& currentResponse, size_t sentBytes)
{
  const auto& ids = currentResponse.d_idstate;
  if (ids.qtype == QType::AXFR || ids.qtype == QType::IXFR) {
    return;
  }

  --d_currentQueriesCount;

  const auto& backend = currentResponse.d_connection ? currentResponse.d_connection->getDS() : currentResponse.d_ds;
  const bool answeredByBackend = !ids.selfGenerated && backend;

  if (!answeredByBackend) {
    ::handleResponseSent(ids, 0., d_ci.remote, ComboAddress(), static_cast<unsigned int>(currentResponse.d_buffer.size()), currentResponse.d_cleartextDH, ids.protocol, false);
  }
  else {
    const double udiff = ids.queryRealTime.udiff();
    vinfolog("Got answer from %s, relayed to %s (%s, %d bytes), took %f us", backend->d_config.remote.toStringWithPort(), ids.origRemote.toStringWithPort(), getProtocol().toString(), sentBytes, udiff);

    auto backendProtocol = backend->getProtocol();
    // a UDP backend that was actually reached over TCP for this query is accounted as TCP
    if (backendProtocol == dnsdist::Protocol::DoUDP && !ids.forwardedOverUDP) {
      backendProtocol = dnsdist::Protocol::DoTCP;
    }
    ::handleResponseSent(ids, udiff, d_ci.remote, backend->d_config.remote, static_cast<unsigned int>(sentBytes), currentResponse.d_cleartextDH, backendProtocol, true);
  }

  currentResponse.d_buffer.clear();
  currentResponse.d_connection.reset();
}
271

272
/* Inserts the two-byte, big-endian DNS-over-TCP length prefix into `buffer`, right after its first
   `proxyProtocolPayloadSize` bytes (the proxy protocol payload, 0 when there is none). The prefix
   therefore only covers the DNS query itself.
   Throws std::runtime_error in two cases:
   - the buffer holds no query bytes after the proxy protocol payload,
   - the query does not fit in the 16-bit length prefix. Previously it was silently truncated,
     which produced a corrupted TCP stream. */
static void prependSizeToTCPQuery(PacketBuffer& buffer, size_t proxyProtocolPayloadSize)
{
  if (buffer.size() <= proxyProtocolPayloadSize) {
    throw std::runtime_error("The buffer size is smaller than or equal to the proxy protocol payload size");
  }

  constexpr size_t maxQuerySize = 65535U;
  const size_t querySize = buffer.size() - proxyProtocolPayloadSize;
  if (querySize > maxQuerySize) {
    throw std::runtime_error("The query is too large to be sent over TCP");
  }

  const auto queryLen = static_cast<uint16_t>(querySize);
  const std::array<uint8_t, 2> sizeBytes{static_cast<uint8_t>(queryLen / 256), static_cast<uint8_t>(queryLen % 256)};
  /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes
     that could occur if we had to deal with the size during the processing,
     especially alignment issues */
  buffer.insert(buffer.begin() + static_cast<PacketBuffer::iterator::difference_type>(proxyProtocolPayloadSize), sizeBytes.begin(), sizeBytes.end());
}
285

286
bool IncomingTCPConnectionState::canAcceptNewQueries(const struct timeval& now)
287
{
46,144✔
288
  if (d_hadErrors) {
46,144✔
289
    DEBUGLOG("not accepting new queries because we encountered some error during the processing already");
2✔
290
    return false;
2✔
291
  }
2✔
292

293
  if (isNearTCPLimits()) {
46,142✔
294
    d_ci.d_restricted = true;
4✔
295
    DEBUGLOG("not accepting new queries because we already near our TCP limits");
4✔
296
    return false;
4✔
297
  }
4✔
298

299
  // for DoH, this is already handled by the underlying library
300
  if (!d_ci.cs->dohFrontend && d_currentQueriesCount >= d_ci.cs->d_maxInFlightQueriesPerConn) {
46,138✔
301
    DEBUGLOG("not accepting new queries because we already have " << d_currentQueriesCount << " out of " << d_ci.cs->d_maxInFlightQueriesPerConn);
2,790✔
302
    return false;
2,790✔
303
  }
2,790✔
304

305
  const auto& currentConfig = dnsdist::configuration::getCurrentRuntimeConfiguration();
43,348✔
306
  if (currentConfig.d_maxTCPQueriesPerConn != 0 && d_queriesCount > currentConfig.d_maxTCPQueriesPerConn) {
43,348✔
307
    vinfolog("not accepting new queries from %s because it reached the maximum number of queries per conn (%d / %d)", d_ci.remote.toStringWithPort(), d_queriesCount, currentConfig.d_maxTCPQueriesPerConn);
6!
308
    return false;
6✔
309
  }
6✔
310

311
  if (maxConnectionDurationReached(currentConfig.d_maxTCPConnectionDuration, now)) {
43,342!
312
    vinfolog("not accepting new queries from %s because it reached the maximum TCP connection duration", d_ci.remote.toStringWithPort());
×
313
    return false;
×
314
  }
×
315

316
  return true;
43,342✔
317
}
43,342✔
318

319
void IncomingTCPConnectionState::resetForNewQuery()
320
{
43,342✔
321
  d_buffer.clear();
43,342✔
322
  d_currentPos = 0;
43,342✔
323
  d_querySize = 0;
43,342✔
324
  d_state = State::waitingForQuery;
43,342✔
325
  d_readIOsTotal += d_readIOsCurrentQuery;
43,342✔
326
  d_readIOsCurrentQuery = 0;
43,342✔
327
}
43,342✔
328

329
/* Computes the deadline for the next read from the client, or boost::none when no limit applies.
   - Normally the deadline is `now` plus the receive timeout, capped by the time left before the
     maximum connection duration is reached.
   - Near the TCP limits, the maximum duration is capped at 5 seconds and the receive timeout
     becomes 500 ms.
   - If the maximum duration has already been reached, or the clock went backwards, `now` is returned. */
boost::optional<timeval> IncomingTCPConnectionState::getClientReadTTD(timeval now) const
{
  const auto& runtimeConfiguration = dnsdist::configuration::getCurrentRuntimeConfiguration();
  /* Evaluate this only once. The answer depends on live connection counters and may change
     between calls. Getting "true" first and "false" later (with both timeouts set to 0) used to
     yield a deadline of `now`, i.e. an immediate timeout. */
  const bool nearLimits = isNearTCPLimits();
  if (!nearLimits && runtimeConfiguration.d_maxTCPConnectionDuration == 0 && runtimeConfiguration.d_tcpRecvTimeout == 0) {
    return boost::none;
  }

  size_t maxTCPConnectionDuration = runtimeConfiguration.d_maxTCPConnectionDuration;
  uint16_t tcpRecvTimeout = runtimeConfiguration.d_tcpRecvTimeout;
  uint32_t tcpRecvTimeoutUsec = 0U;
  if (nearLimits) {
    constexpr size_t maxTCPConnectionDurationNearLimits = 5U;
    constexpr uint32_t tcpRecvTimeoutUsecNearLimits = 500U * 1000U;
    maxTCPConnectionDuration = runtimeConfiguration.d_maxTCPConnectionDuration != 0 ? std::min(runtimeConfiguration.d_maxTCPConnectionDuration, maxTCPConnectionDurationNearLimits) : maxTCPConnectionDurationNearLimits;
    tcpRecvTimeout = 0;
    tcpRecvTimeoutUsec = tcpRecvTimeoutUsecNearLimits;
  }

  if (maxTCPConnectionDuration > 0) {
    auto elapsed = now.tv_sec - d_connectionStartTime.tv_sec;
    if (elapsed < 0 || (static_cast<size_t>(elapsed) >= maxTCPConnectionDuration)) {
      return now;
    }
    auto remaining = maxTCPConnectionDuration - elapsed;
    /* the remaining connection time is the tighter bound (or the only one) */
    if (!nearLimits && (runtimeConfiguration.d_tcpRecvTimeout == 0 || remaining <= static_cast<size_t>(runtimeConfiguration.d_tcpRecvTimeout))) {
      now.tv_sec += static_cast<time_t>(remaining);
      return now;
    }
  }

  now.tv_sec += static_cast<time_t>(tcpRecvTimeout);
  now.tv_usec += tcpRecvTimeoutUsec;
  normalizeTV(now);
  return now;
}
364

365
/* Computes the deadline for the next write to the client, or boost::none when neither a send
   timeout nor a maximum connection duration is configured.
   - The deadline is `now` plus the send timeout, unless the time left before the maximum
     connection duration is shorter; that remaining time is used instead.
   - If the maximum duration has already been reached, or the clock went backwards, `now` is returned. */
boost::optional<timeval> IncomingTCPConnectionState::getClientWriteTTD(const timeval& now) const
{
  const auto& config = dnsdist::configuration::getCurrentRuntimeConfiguration();
  if (config.d_maxTCPConnectionDuration == 0 && config.d_tcpSendTimeout == 0) {
    return boost::none;
  }

  timeval deadline(now);

  if (config.d_maxTCPConnectionDuration > 0) {
    const auto elapsed = deadline.tv_sec - d_connectionStartTime.tv_sec;
    if (elapsed < 0 || static_cast<size_t>(elapsed) >= config.d_maxTCPConnectionDuration) {
      return deadline;
    }

    const auto remaining = config.d_maxTCPConnectionDuration - elapsed;
    if (config.d_tcpSendTimeout == 0 || remaining <= static_cast<size_t>(config.d_tcpSendTimeout)) {
      deadline.tv_sec += static_cast<time_t>(remaining);
      return deadline;
    }
  }

  deadline.tv_sec += static_cast<time_t>(config.d_tcpSendTimeout);
  return deadline;
}
389

390
bool IncomingTCPConnectionState::maxConnectionDurationReached(unsigned int maxConnectionDuration, const timeval& now) const
391
{
92,904✔
392
  if (maxConnectionDuration > 0) {
92,904✔
393
    time_t curtime = now.tv_sec;
786✔
394
    unsigned int elapsed = 0;
786✔
395
    if (curtime > d_connectionStartTime.tv_sec) { // To prevent issues when time goes backward
786✔
396
      elapsed = curtime - d_connectionStartTime.tv_sec;
183✔
397
    }
183✔
398
    if (elapsed >= maxConnectionDuration) {
786✔
399
      return true;
1✔
400
    }
1✔
401
  }
786✔
402

403
  return false;
92,903✔
404
}
92,904✔
405

406
void IncomingTCPConnectionState::registerOwnedDownstreamConnection(std::shared_ptr<TCPConnectionToBackend>& conn)
407
{
21✔
408
  const auto& downstream = conn->getDS();
21✔
409
  auto& queue = d_ownedConnectionsToBackend[downstream];
21✔
410
  // how many proxy-protocol enabled connections do we want to keep around?
411
  // - they are only usable for this incoming connection because of the proxy protocol header containing the source and destination addresses and ports
412
  // - if we have TLV values, and they are unique per query, keeping these is useless
413
  // - if there is no, or identical, TLV values, a single outgoing connection is enough if maxInFlight == 1, or if incoming maxInFlight == outgoing maxInFlight
414
  // so it makes sense to keep a few of them around if incoming maxInFlight is greater than outgoing maxInFlight
415

416
  auto incomingMaxInFlightQueriesPerConn = d_ci.cs->d_maxInFlightQueriesPerConn;
21✔
417
  incomingMaxInFlightQueriesPerConn = std::max(incomingMaxInFlightQueriesPerConn, static_cast<size_t>(1U));
21✔
418
  auto outgoingMaxInFlightQueriesPerConn = downstream->d_config.d_maxInFlightQueriesPerConn;
21✔
419
  outgoingMaxInFlightQueriesPerConn = std::max(outgoingMaxInFlightQueriesPerConn, static_cast<size_t>(1U));
21✔
420
  size_t maxCachedOutgoingConnections = std::min(static_cast<size_t>(incomingMaxInFlightQueriesPerConn / outgoingMaxInFlightQueriesPerConn), static_cast<size_t>(5U));
21✔
421

422
  queue.push_front(conn);
21✔
423
  if (queue.size() > maxCachedOutgoingConnections) {
21!
424
    queue.pop_back();
×
425
  }
×
426
}
21✔
427

428
void IncomingTCPConnectionState::clearOwnedDownstreamConnections(const std::shared_ptr<DownstreamState>& downstream)
429
{
5✔
430
  d_ownedConnectionsToBackend.erase(downstream);
5✔
431
}
5✔
432

433
/* called when the buffer has been set and the rules have been processed, and only from handleIO (sometimes indirectly via handleQuery) */
434
IOState IncomingTCPConnectionState::sendResponse(const struct timeval& now, TCPResponse&& response)
435
{
43,517✔
436
  (void)now;
43,517✔
437
  d_state = State::sendingResponse;
43,517✔
438

439
  const auto responseSize = static_cast<uint16_t>(response.d_buffer.size());
43,517✔
440
  const std::array<uint8_t, 2> sizeBytes{static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256)};
43,517✔
441
  /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes
442
     that could occur if we had to deal with the size during the processing,
443
     especially alignment issues */
444
  response.d_buffer.insert(response.d_buffer.begin(), sizeBytes.begin(), sizeBytes.end());
43,517✔
445
  d_currentPos = 0;
43,517✔
446
  d_currentResponse = std::move(response);
43,517✔
447

448
  try {
43,517✔
449
    auto iostate = d_handler.tryWrite(d_currentResponse.d_buffer, d_currentPos, d_currentResponse.d_buffer.size());
43,517✔
450
    if (iostate == IOState::Done) {
43,517✔
451
      DEBUGLOG("response sent from " << __PRETTY_FUNCTION__);
43,498✔
452
      handleResponseSent(d_currentResponse, d_currentResponse.d_buffer.size());
43,498✔
453
      return iostate;
43,498✔
454
    }
43,498✔
455
    d_lastIOBlocked = true;
19✔
456
    DEBUGLOG("partial write");
19✔
457
    return iostate;
19✔
458
  }
43,517✔
459
  catch (const std::exception& e) {
43,517✔
460
    vinfolog("Closing TCP client connection with %s: %s", d_ci.remote.toStringWithPort(), e.what());
4!
461
    DEBUGLOG("Closing TCP client connection: " << e.what());
4✔
462
    ++d_ci.cs->tcpDiedSendingResponse;
4✔
463

464
    terminateClientConnection();
4✔
465

466
    return IOState::Done;
4✔
467
  }
4✔
468
}
43,517✔
469

470
void IncomingTCPConnectionState::terminateClientConnection()
471
{
2,484✔
472
  DEBUGLOG("terminating client connection");
2,484✔
473
  d_queuedResponses.clear();
2,484✔
474
  /* we have already released idle connections that could be reused,
475
     we don't care about the ones still waiting for responses */
476
  for (auto& backend : d_ownedConnectionsToBackend) {
2,484✔
477
    for (auto& conn : backend.second) {
15✔
478
      conn->release(true);
15✔
479
    }
15✔
480
  }
15✔
481
  d_ownedConnectionsToBackend.clear();
2,484✔
482

483
  /* meaning we will no longer be 'active' when the backend
484
     response or timeout comes in */
485
  d_ioState.reset();
2,484✔
486

487
  /* if we do have remaining async descriptors associated with this TLS
488
     connection, we need to defer the destruction of the TLS object until
489
     the engine has reported back, otherwise we have a use-after-free.. */
490
  auto afds = d_handler.getAsyncFDs();
2,484✔
491
  if (afds.empty()) {
2,486✔
492
    d_handler.close();
2,486✔
493
  }
2,486✔
494
  else {
2,147,483,647✔
495
    /* we might already be waiting, but we might also not because sometimes we have already been
496
       notified via the descriptor, not received Async again, but the async job still exists.. */
497
    auto state = shared_from_this();
2,147,483,647✔
498
    for (const auto desc : afds) {
2,147,483,647!
499
      try {
×
500
        state->d_threadData.mplexer->addReadFD(desc, handleAsyncReady, state);
×
501
      }
×
502
      catch (...) {
×
503
      }
×
504
    }
×
505
  }
2,147,483,647✔
506
}
2,484✔
507

508
void IncomingTCPConnectionState::queueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response, bool fromBackend)
509
{
43,661✔
510
  // queue response
511
  state->d_queuedResponses.emplace_back(std::move(response));
43,661✔
512
  DEBUGLOG("queueing response, state is " << (int)state->d_state << ", queue size is now " << state->d_queuedResponses.size());
43,661✔
513

514
  // when the response comes from a backend, there is a real possibility that we are currently
515
  // idle, and thus not trying to send the response right away would make our ref count go to 0.
516
  // Even if we are waiting for a query, we will not wake up before the new query arrives or a
517
  // timeout occurs
518
  if (state->d_state == State::idle || state->d_state == State::waitingForQuery) {
43,661✔
519
    auto iostate = sendQueuedResponses(state, now);
43,399✔
520

521
    if (iostate == IOState::Done && state->active()) {
43,399✔
522
      if (state->canAcceptNewQueries(now)) {
43,383✔
523
        state->resetForNewQuery();
43,125✔
524
        state->d_state = State::waitingForQuery;
43,125✔
525
        iostate = IOState::NeedRead;
43,125✔
526
      }
43,125✔
527
      else {
258✔
528
        state->d_state = State::idle;
258✔
529
      }
258✔
530
    }
43,383✔
531

532
    // for the same reason we need to update the state right away, nobody will do that for us
533
    if (state->active()) {
43,399✔
534
      state->updateIO(iostate, now);
43,395✔
535
      // if we have not finished reading every available byte, we _need_ to do an actual read
536
      // attempt before waiting for the socket to become readable again, because if there is
537
      // buffered data available the socket might never become readable again.
538
      // This is true as soon as we deal with TLS because TLS records are processed one by
539
      // one and might not match what we see at the application layer, so data might already
540
      // be available in the TLS library's buffers. This is especially true when OpenSSL's
541
      // read-ahead mode is enabled because then it buffers even more than one SSL record
542
      // for performance reasons.
543
      if (fromBackend && !state->d_lastIOBlocked) {
43,395✔
544
        state->handleIO();
22,798✔
545
      }
22,798✔
546
    }
43,395✔
547
  }
43,399✔
548
}
43,661✔
549

550
void IncomingTCPConnectionState::handleAsyncReady([[maybe_unused]] int desc, FDMultiplexer::funcparam_t& param)
551
{
×
552
  auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
×
553

554
  /* If we are here, the async jobs for this SSL* are finished
555
     so we should be able to remove all FDs */
556
  auto afds = state->d_handler.getAsyncFDs();
×
557
  for (const auto afd : afds) {
×
558
    try {
×
559
      state->d_threadData.mplexer->removeReadFD(afd);
×
560
    }
×
561
    catch (...) {
×
562
    }
×
563
  }
×
564

565
  if (state->active()) {
×
566
    /* and now we restart our own I/O state machine */
567
    state->handleIO();
×
568
  }
×
569
  else {
×
570
    /* we were only waiting for the engine to come back,
571
       to prevent a use-after-free */
572
    state->d_handler.close();
×
573
  }
×
574
}
×
575

576
void IncomingTCPConnectionState::updateIOForAsync(std::shared_ptr<IncomingTCPConnectionState>& conn)
577
{
×
578
  auto fds = conn->d_handler.getAsyncFDs();
×
579
  for (const auto desc : fds) {
×
580
    conn->d_threadData.mplexer->addReadFD(desc, handleAsyncReady, conn);
×
581
  }
×
582
  conn->d_ioState->update(IOState::Done, handleIOCallback, conn);
×
583
}
×
584

585
/* Switches the connection's I/O state to `newState`. Async states are delegated to
   updateIOForAsync(). Otherwise a write deadline is used for NeedWrite and a read deadline for
   everything else. */
void IncomingTCPConnectionState::updateIO(IOState newState, const struct timeval& now)
{
  auto self = shared_from_this();
  if (newState == IOState::Async) {
    updateIOForAsync(self);
    return;
  }

  const boost::optional<timeval> deadline = (newState == IOState::NeedWrite) ? getClientWriteTTD(now) : getClientReadTTD(now);
  d_ioState->update(newState, handleIOCallback, self, deadline);
}
595

596
/* called from the backend code when a new response has been received */
597
void IncomingTCPConnectionState::handleResponse(const struct timeval& now, TCPResponse&& response)
598
{
22,954✔
599
  if (std::this_thread::get_id() != d_creatorThreadID) {
22,954✔
600
    handleCrossProtocolResponse(now, std::move(response));
133✔
601
    return;
133✔
602
  }
133✔
603

604
  std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this();
22,821✔
605

606
  if (!response.isAsync() && response.d_connection && response.d_connection->getDS() && response.d_connection->getDS()->d_config.useProxyProtocol) {
22,821!
607
    // if we have added a TCP Proxy Protocol payload to a connection, don't release it to the general pool as no one else will be able to use it anyway
608
    if (!response.d_connection->willBeReusable(true)) {
51!
609
      // if it can't be reused even by us, well
610
      const auto connIt = state->d_ownedConnectionsToBackend.find(response.d_connection->getDS());
×
611
      if (connIt != state->d_ownedConnectionsToBackend.end()) {
×
612
        auto& list = connIt->second;
×
613

614
        for (auto it = list.begin(); it != list.end(); ++it) {
×
615
          if (*it == response.d_connection) {
×
616
            try {
×
617
              response.d_connection->release(true);
×
618
            }
×
619
            catch (const std::exception& e) {
×
620
              vinfolog("Error releasing connection: %s", e.what());
×
621
            }
×
622
            list.erase(it);
×
623
            break;
×
624
          }
×
625
        }
×
626
      }
×
627
    }
×
628
  }
51✔
629

630
  if (response.d_buffer.size() < sizeof(dnsheader)) {
22,821✔
631
    state->terminateClientConnection();
2✔
632
    return;
2✔
633
  }
2✔
634

635
  if (!response.isAsync()) {
22,819✔
636
    try {
22,735✔
637
      auto& ids = response.d_idstate;
22,735✔
638
      std::shared_ptr<DownstreamState> backend = response.d_ds ? response.d_ds : (response.d_connection ? response.d_connection->getDS() : nullptr);
22,735!
639
      if (backend == nullptr || !responseContentMatches(response.d_buffer, ids.qname, ids.qtype, ids.qclass, backend, dnsdist::configuration::getCurrentRuntimeConfiguration().d_allowEmptyResponse)) {
22,735!
640
        state->terminateClientConnection();
3✔
641
        return;
3✔
642
      }
3✔
643

644
      if (backend != nullptr) {
22,732!
645
        ++backend->responses;
22,732✔
646
      }
22,732✔
647

648
      DNSResponse dnsResponse(ids, response.d_buffer, backend);
22,732✔
649
      dnsResponse.d_incomingTCPState = state;
22,732✔
650

651
      memcpy(&response.d_cleartextDH, dnsResponse.getHeader().get(), sizeof(response.d_cleartextDH));
22,732✔
652

653
      if (!processResponse(response.d_buffer, dnsResponse, false)) {
22,732✔
654
        state->terminateClientConnection();
6✔
655
        return;
6✔
656
      }
6✔
657

658
      if (dnsResponse.isAsynchronous()) {
22,726✔
659
        /* we are done for now */
660
        return;
80✔
661
      }
80✔
662
    }
22,726✔
663
    catch (const std::exception& e) {
22,735✔
664
      vinfolog("Unexpected exception while handling response from backend: %s", e.what());
4!
665
      state->terminateClientConnection();
4✔
666
      return;
4✔
667
    }
4✔
668
  }
22,735✔
669

670
  ++dnsdist::metrics::g_stats.responses;
22,726✔
671
  ++state->d_ci.cs->responses;
22,726✔
672

673
  queueResponse(state, now, std::move(response), true);
22,726✔
674
}
22,726✔
675

676
struct TCPCrossProtocolResponse
677
{
678
  TCPCrossProtocolResponse(TCPResponse&& response, std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now) :
679
    d_response(std::move(response)), d_state(state), d_now(now)
284✔
680
  {
284✔
681
  }
284✔
682
  TCPCrossProtocolResponse(const TCPCrossProtocolResponse&) = delete;
683
  TCPCrossProtocolResponse& operator=(const TCPCrossProtocolResponse&) = delete;
684
  TCPCrossProtocolResponse(TCPCrossProtocolResponse&&) = delete;
685
  TCPCrossProtocolResponse& operator=(TCPCrossProtocolResponse&&) = delete;
686
  ~TCPCrossProtocolResponse() = default;
284✔
687

688
  TCPResponse d_response;
689
  std::shared_ptr<IncomingTCPConnectionState> d_state;
690
  struct timeval d_now;
691
};
692

693
class TCPCrossProtocolQuery : public CrossProtocolQuery
694
{
695
public:
696
  TCPCrossProtocolQuery(PacketBuffer&& buffer, InternalQueryState&& ids, std::shared_ptr<DownstreamState> backend, std::shared_ptr<IncomingTCPConnectionState> sender) :
697
    CrossProtocolQuery(InternalQuery(std::move(buffer), std::move(ids)), backend), d_sender(std::move(sender))
231✔
698
  {
231✔
699
  }
231✔
700
  TCPCrossProtocolQuery(const TCPCrossProtocolQuery&) = delete;
701
  TCPCrossProtocolQuery& operator=(const TCPCrossProtocolQuery&) = delete;
702
  TCPCrossProtocolQuery(TCPCrossProtocolQuery&&) = delete;
703
  TCPCrossProtocolQuery& operator=(TCPCrossProtocolQuery&&) = delete;
704
  ~TCPCrossProtocolQuery() override = default;
231✔
705

706
  std::shared_ptr<TCPQuerySender> getTCPQuerySender() override
707
  {
208✔
708
    return d_sender;
208✔
709
  }
208✔
710

711
  DNSQuestion getDQ() override
712
  {
178✔
713
    auto& ids = query.d_idstate;
178✔
714
    DNSQuestion dnsQuestion(ids, query.d_buffer);
178✔
715
    dnsQuestion.d_incomingTCPState = d_sender;
178✔
716
    return dnsQuestion;
178✔
717
  }
178✔
718

719
  DNSResponse getDR() override
720
  {
72✔
721
    auto& ids = query.d_idstate;
72✔
722
    DNSResponse dnsResponse(ids, query.d_buffer, downstream);
72✔
723
    dnsResponse.d_incomingTCPState = d_sender;
72✔
724
    return dnsResponse;
72✔
725
  }
72✔
726

727
private:
728
  std::shared_ptr<IncomingTCPConnectionState> d_sender;
729
};
730

731
/* Wrap a query and its state into a TCPCrossProtocolQuery whose sender is this connection. */
std::unique_ptr<CrossProtocolQuery> IncomingTCPConnectionState::getCrossProtocolQuery(PacketBuffer&& query, InternalQueryState&& state, const std::shared_ptr<DownstreamState>& backend)
{
  auto self = shared_from_this();
  return std::make_unique<TCPCrossProtocolQuery>(std::move(query), std::move(state), backend, std::move(self));
}
735

736
/*
 * Turn a DNSQuestion coming from an incoming TCP connection into a TCPCrossProtocolQuery
 * without a backend. The question's buffer and internal state are moved out.
 * Throws std::runtime_error when the question has no incoming TCP state.
 */
std::unique_ptr<CrossProtocolQuery> getTCPCrossProtocolQueryFromDQ(DNSQuestion& dnsQuestion)
{
  auto incomingState = dnsQuestion.getIncomingTCPState();
  if (!incomingState) {
    throw std::runtime_error("Trying to create a TCP cross protocol query without a valid TCP state");
  }

  /* record the ID currently present in the query header as the original ID */
  dnsQuestion.ids.origID = dnsQuestion.getHeader()->id;
  return std::make_unique<TCPCrossProtocolQuery>(std::move(dnsQuestion.getMutableData()), std::move(dnsQuestion.ids), nullptr, std::move(incomingState));
}
746

747
void IncomingTCPConnectionState::handleCrossProtocolResponse(const struct timeval& now, TCPResponse&& response)
748
{
284✔
749
  std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this();
284✔
750
  try {
284✔
751
    auto ptr = std::make_unique<TCPCrossProtocolResponse>(std::move(response), state, now);
284✔
752
    if (!state->d_threadData.crossProtocolResponseSender.send(std::move(ptr))) {
284!
753
      ++dnsdist::metrics::g_stats.tcpCrossProtocolResponsePipeFull;
×
754
      vinfolog("Unable to pass a cross-protocol response to the TCP worker thread because the pipe is full");
×
755
    }
×
756
  }
284✔
757
  catch (const std::exception& e) {
284✔
758
    vinfolog("Unable to pass a cross-protocol response to the TCP worker thread because we couldn't write to the pipe: %s", stringerror());
×
759
  }
×
760
}
284✔
761

762
IncomingTCPConnectionState::QueryProcessingResult IncomingTCPConnectionState::handleQuery(PacketBuffer&& queryIn, const struct timeval& now, std::optional<int32_t> streamID)
763
{
43,375✔
764
  auto query = std::move(queryIn);
43,375✔
765
  if (query.size() < sizeof(dnsheader)) {
43,375✔
766
    ++dnsdist::metrics::g_stats.nonCompliantQueries;
4✔
767
    ++d_ci.cs->nonCompliantQueries;
4✔
768
    return QueryProcessingResult::TooSmall;
4✔
769
  }
4✔
770

771
  ++d_queriesCount;
43,371✔
772
  ++d_ci.cs->queries;
43,371✔
773
  ++dnsdist::metrics::g_stats.queries;
43,371✔
774

775
  if (d_handler.isTLS()) {
43,371✔
776
    auto tlsVersion = d_handler.getTLSVersion();
41,276✔
777
    switch (tlsVersion) {
41,276✔
778
    case LibsslTLSVersion::TLS10:
×
779
      ++d_ci.cs->tls10queries;
×
780
      break;
×
781
    case LibsslTLSVersion::TLS11:
×
782
      ++d_ci.cs->tls11queries;
×
783
      break;
×
784
    case LibsslTLSVersion::TLS12:
14✔
785
      ++d_ci.cs->tls12queries;
14✔
786
      break;
14✔
787
    case LibsslTLSVersion::TLS13:
41,262✔
788
      ++d_ci.cs->tls13queries;
41,262✔
789
      break;
41,262✔
790
    default:
×
791
      ++d_ci.cs->tlsUnknownqueries;
×
792
    }
41,276✔
793
  }
41,276✔
794

795
  auto state = shared_from_this();
43,371✔
796
  InternalQueryState ids;
43,371✔
797
  ids.origDest = d_proxiedDestination;
43,371✔
798
  ids.origRemote = d_proxiedRemote;
43,371✔
799
  ids.cs = d_ci.cs;
43,371✔
800
  ids.queryRealTime.start();
43,371✔
801
  if (streamID) {
43,371✔
802
    ids.d_streamID = *streamID;
161✔
803
  }
161✔
804

805
  auto dnsCryptResponse = checkDNSCryptQuery(*d_ci.cs, query, ids.dnsCryptQuery, ids.queryRealTime.d_start.tv_sec, true);
43,371✔
806
  if (dnsCryptResponse) {
43,371!
807
    TCPResponse response;
×
808
    d_state = State::idle;
×
809
    ++d_currentQueriesCount;
×
810
    queueResponse(state, now, std::move(response), false);
×
811
    return QueryProcessingResult::SelfAnswered;
×
812
  }
×
813

814
  {
43,371✔
815
    /* this pointer will be invalidated the second the buffer is resized, don't hold onto it! */
816
    const dnsheader_aligned dnsHeader(query.data());
43,371✔
817
    if (!checkQueryHeaders(*dnsHeader, *d_ci.cs)) {
43,371✔
818
      return QueryProcessingResult::InvalidHeaders;
5✔
819
    }
5✔
820

821
    if (dnsHeader->qdcount == 0) {
43,366✔
822
      TCPResponse response;
3✔
823
      auto queryID = dnsHeader->id;
3✔
824
      dnsdist::PacketMangling::editDNSHeaderFromPacket(query, [](dnsheader& header) {
3✔
825
        header.rcode = RCode::NotImp;
3✔
826
        header.qr = true;
3✔
827
        return true;
3✔
828
      });
3✔
829
      response.d_idstate = std::move(ids);
3✔
830
      response.d_idstate.origID = queryID;
3✔
831
      response.d_idstate.selfGenerated = true;
3✔
832
      response.d_buffer = std::move(query);
3✔
833
      d_state = State::idle;
3✔
834
      ++d_currentQueriesCount;
3✔
835
      queueResponse(state, now, std::move(response), false);
3✔
836
      return QueryProcessingResult::SelfAnswered;
3✔
837
    }
3✔
838
  }
43,366✔
839

840
  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast
841
  ids.qname = DNSName(reinterpret_cast<const char*>(query.data()), static_cast<int>(query.size()), sizeof(dnsheader), false, &ids.qtype, &ids.qclass);
43,363✔
842
  ids.protocol = getProtocol();
43,363✔
843
  if (ids.dnsCryptQuery) {
43,363✔
844
    ids.protocol = dnsdist::Protocol::DNSCryptTCP;
16✔
845
  }
16✔
846

847
  DNSQuestion dnsQuestion(ids, query);
43,363✔
848
  dnsdist::PacketMangling::editDNSHeaderFromPacket(dnsQuestion.getMutableData(), [&ids](dnsheader& header) {
43,363✔
849
    const uint16_t* flags = getFlagsFromDNSHeader(&header);
43,361✔
850
    ids.origFlags = *flags;
43,361✔
851
    return true;
43,361✔
852
  });
43,361✔
853
  dnsQuestion.d_incomingTCPState = state;
43,363✔
854
  dnsQuestion.sni = d_handler.getServerNameIndication();
43,363✔
855

856
  if (d_proxyProtocolValues) {
43,363✔
857
    /* we need to copy them, because the next queries received on that connection will
858
       need to get the _unaltered_ values */
859
    dnsQuestion.proxyProtocolValues = make_unique<std::vector<ProxyProtocolValue>>(*d_proxyProtocolValues);
34✔
860
  }
34✔
861

862
  if (dnsQuestion.ids.qtype == QType::AXFR || dnsQuestion.ids.qtype == QType::IXFR) {
43,363✔
863
    dnsQuestion.ids.skipCache = true;
27✔
864
  }
27✔
865

866
  if (forwardViaUDPFirst()) {
43,363✔
867
    // if there was no EDNS, we add it with a large buffer size
868
    // so we can use UDP to talk to the backend.
869
    const dnsheader_aligned dnsHeader(query.data());
155✔
870
    if (dnsHeader->arcount == 0U) {
155✔
871
      if (addEDNS(query, 4096, false, 4096, 0)) {
147!
872
        dnsQuestion.ids.ednsAdded = true;
147✔
873
      }
147✔
874
    }
147✔
875
  }
155✔
876

877
  if (streamID) {
43,363✔
878
    auto unit = getDOHUnit(*streamID);
155✔
879
    if (unit) {
155!
880
      dnsQuestion.ids.du = std::move(unit);
155✔
881
    }
155✔
882
  }
155✔
883

884
  std::shared_ptr<DownstreamState> backend;
43,363✔
885
  auto result = processQuery(dnsQuestion, backend);
43,363✔
886

887
  if (result == ProcessQueryResult::Asynchronous) {
43,363✔
888
    /* we are done for now */
889
    ++d_currentQueriesCount;
108✔
890
    return QueryProcessingResult::Asynchronous;
108✔
891
  }
108✔
892

893
  if (streamID) {
43,255✔
894
    restoreDOHUnit(std::move(dnsQuestion.ids.du));
119✔
895
  }
119✔
896

897
  if (result == ProcessQueryResult::Drop) {
43,255✔
898
    return QueryProcessingResult::Dropped;
39✔
899
  }
39✔
900

901
  // the buffer might have been invalidated by now
902
  uint16_t queryID{0};
43,216✔
903
  {
43,216✔
904
    const auto dnsHeader = dnsQuestion.getHeader();
43,216✔
905
    queryID = dnsHeader->id;
43,216✔
906
  }
43,216✔
907

908
  if (result == ProcessQueryResult::SendAnswer) {
43,216✔
909
    TCPResponse response;
20,483✔
910
    {
20,483✔
911
      const auto dnsHeader = dnsQuestion.getHeader();
20,483✔
912
      memcpy(&response.d_cleartextDH, dnsHeader.get(), sizeof(response.d_cleartextDH));
20,483✔
913
    }
20,483✔
914
    response.d_idstate = std::move(ids);
20,483✔
915
    response.d_idstate.origID = queryID;
20,483✔
916
    response.d_idstate.selfGenerated = true;
20,483✔
917
    response.d_idstate.cs = d_ci.cs;
20,483✔
918
    response.d_buffer = std::move(query);
20,483✔
919

920
    d_state = State::idle;
20,483✔
921
    ++d_currentQueriesCount;
20,483✔
922
    queueResponse(state, now, std::move(response), false);
20,483✔
923
    return QueryProcessingResult::SelfAnswered;
20,483✔
924
  }
20,483✔
925

926
  if (result != ProcessQueryResult::PassToBackend || backend == nullptr) {
22,733!
927
    return QueryProcessingResult::NoBackend;
×
928
  }
×
929

930
  dnsQuestion.ids.origID = queryID;
22,733✔
931

932
  ++d_currentQueriesCount;
22,733✔
933

934
  std::string proxyProtocolPayload;
22,733✔
935
  if (backend->isDoH()) {
22,733✔
936
    vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", ids.qname.toLogString(), QType(ids.qtype).toString(), d_proxiedRemote.toStringWithPort(), getProtocol().toString(), query.size(), backend->getNameWithAddr());
30✔
937

938
    /* we need to do this _before_ creating the cross protocol query because
939
       after that the buffer will have been moved */
940
    if (backend->d_config.useProxyProtocol) {
30✔
941
      proxyProtocolPayload = getProxyProtocolPayload(dnsQuestion);
1✔
942
    }
1✔
943

944
    auto cpq = std::make_unique<TCPCrossProtocolQuery>(std::move(query), std::move(ids), backend, state);
30✔
945
    cpq->query.d_proxyProtocolPayload = std::move(proxyProtocolPayload);
30✔
946

947
    backend->passCrossProtocolQuery(std::move(cpq));
30✔
948
    return QueryProcessingResult::Forwarded;
30✔
949
  }
30✔
950
  if (!backend->isTCPOnly() && forwardViaUDPFirst()) {
22,703✔
951
    if (streamID) {
65!
952
      auto unit = getDOHUnit(*streamID);
65✔
953
      if (unit) {
65!
954
        dnsQuestion.ids.du = std::move(unit);
65✔
955
      }
65✔
956
    }
65✔
957
    if (assignOutgoingUDPQueryToBackend(backend, queryID, dnsQuestion, query)) {
65✔
958
      return QueryProcessingResult::Forwarded;
64✔
959
    }
64✔
960
    restoreDOHUnit(std::move(dnsQuestion.ids.du));
1✔
961
    // fallback to the normal flow
962
  }
1✔
963

964
  prependSizeToTCPQuery(query, 0);
22,639✔
965

966
  auto downstreamConnection = getDownstreamConnection(backend, dnsQuestion.proxyProtocolValues, now);
22,639✔
967

968
  if (backend->d_config.useProxyProtocol) {
22,639✔
969
    /* if we ever sent a TLV over a connection, we can never go back */
970
    if (!d_proxyProtocolPayloadHasTLV) {
58✔
971
      d_proxyProtocolPayloadHasTLV = dnsQuestion.proxyProtocolValues && !dnsQuestion.proxyProtocolValues->empty();
34!
972
    }
34✔
973

974
    proxyProtocolPayload = getProxyProtocolPayload(dnsQuestion);
58✔
975
  }
58✔
976

977
  if (dnsQuestion.proxyProtocolValues) {
22,639✔
978
    downstreamConnection->setProxyProtocolValuesSent(std::move(dnsQuestion.proxyProtocolValues));
31✔
979
  }
31✔
980

981
  TCPQuery tcpquery(std::move(query), std::move(ids));
22,639✔
982
  tcpquery.d_proxyProtocolPayload = std::move(proxyProtocolPayload);
22,639✔
983

984
  vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", tcpquery.d_idstate.qname.toLogString(), QType(tcpquery.d_idstate.qtype).toString(), d_proxiedRemote.toStringWithPort(), getProtocol().toString(), tcpquery.d_buffer.size(), backend->getNameWithAddr());
22,639✔
985
  std::shared_ptr<TCPQuerySender> incoming = state;
22,639✔
986
  downstreamConnection->queueQuery(incoming, std::move(tcpquery));
22,639✔
987
  return QueryProcessingResult::Forwarded;
22,639✔
988
}
22,703✔
989

990
void IncomingTCPConnectionState::handleIOCallback(int desc, FDMultiplexer::funcparam_t& param)
991
{
3,001✔
992
  auto conn = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
3,001✔
993
  if (desc != conn->d_handler.getDescriptor()) {
3,001!
994
    // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay): __PRETTY_FUNCTION__ is fine
995
    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(desc) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->d_handler.getDescriptor()));
×
996
  }
×
997

998
  conn->handleIO();
3,001✔
999
}
3,001✔
1000

1001
void IncomingTCPConnectionState::handleHandshakeDone(const struct timeval& now)
1002
{
2,737✔
1003
  if (d_handler.isTLS()) {
2,737✔
1004
    if (!d_handler.hasTLSSessionBeenResumed()) {
666✔
1005
      ++d_ci.cs->tlsNewSessions;
430✔
1006
      dnsdist::IncomingConcurrentTCPConnectionsManager::accountTLSNewSession(d_ci.remote);
430✔
1007
    }
430✔
1008
    else {
236✔
1009
      ++d_ci.cs->tlsResumptions;
236✔
1010
      dnsdist::IncomingConcurrentTCPConnectionsManager::accountTLSResumedSession(d_ci.remote);
236✔
1011
    }
236✔
1012
    if (d_handler.getResumedFromInactiveTicketKey()) {
666✔
1013
      ++d_ci.cs->tlsInactiveTicketKey;
8✔
1014
    }
8✔
1015
    if (d_handler.getUnknownTicketKey()) {
666✔
1016
      ++d_ci.cs->tlsUnknownTicketKey;
6✔
1017
    }
6✔
1018
  }
666✔
1019

1020
  d_handshakeDoneTime = now;
2,737✔
1021
}
2,737✔
1022

1023
/*
 * Read the proxy protocol header sent by the client into d_buffer, growing the
 * amount of data to read (d_proxyProtocolNeed) as isProxyHeaderComplete() asks for more.
 * Keeps reading while the connection is active and the last read did not block.
 *
 * Returns Done once the header has been received and parsed (buffer and read
 * position are then reset), Error when the header is invalid or rejected by
 * handleProxyProtocol(), and Reading when more data is needed.
 */
IncomingTCPConnectionState::ProxyProtocolResult IncomingTCPConnectionState::handleProxyProtocolPayload()
{
  do {
    DEBUGLOG("reading proxy protocol header");
    auto readStatus = d_handler.tryRead(d_buffer, d_currentPos, d_proxyProtocolNeed, false, isProxyPayloadOutsideTLS());
    if (readStatus != IOState::Done) {
      /* would block: the loop condition ends the loop */
      d_lastIOBlocked = true;
      continue;
    }

    d_buffer.resize(d_currentPos);
    ssize_t remaining = isProxyHeaderComplete(d_buffer);
    if (remaining == 0) {
      vinfolog("Unable to consume proxy protocol header in packet from TCP client %s", d_ci.remote.toStringWithPort());
      ++dnsdist::metrics::g_stats.proxyProtocolInvalid;
      return ProxyProtocolResult::Error;
    }

    if (remaining < 0) {
      /* incomplete header: read -remaining more bytes. Keep looping, since we
         might have buffered data */
      d_proxyProtocolNeed += -remaining;
      d_buffer.resize(d_currentPos + d_proxyProtocolNeed);
      continue;
    }

    /* proxy header received */
    std::vector<ProxyProtocolValue> values;
    if (!handleProxyProtocol(d_ci.remote, true, dnsdist::configuration::getCurrentRuntimeConfiguration().d_ACL, d_buffer, d_proxiedRemote, d_proxiedDestination, values)) {
      vinfolog("Error handling the Proxy Protocol received from TCP client %s", d_ci.remote.toStringWithPort());
      return ProxyProtocolResult::Error;
    }

    if (!values.empty()) {
      d_proxyProtocolValues = make_unique<std::vector<ProxyProtocolValue>>(std::move(values));
    }

    d_currentPos = 0;
    d_proxyProtocolNeed = 0;
    d_buffer.clear();
    return ProxyProtocolResult::Done;
  } while (active() && !d_lastIOBlocked);

  return ProxyProtocolResult::Reading;
}
1066

1067
/*
 * Attempt (or continue) the handshake with the client. Once it is done, move on to
 * reading a proxy protocol header when one is expected inside TLS, or to reading
 * the query size otherwise. Returns the IO state of the handshake attempt.
 */
IOState IncomingTCPConnectionState::handleHandshake(const struct timeval& now)
{
  DEBUGLOG("doing handshake");
  auto iostate = d_handler.tryHandshake();
  if (iostate != IOState::Done) {
    d_lastIOBlocked = true;
    return iostate;
  }

  DEBUGLOG("handshake done");
  handleHandshakeDone(now);

  const bool proxyHeaderInsideTLS = d_ci.cs != nullptr && d_ci.cs->d_enableProxyProtocol && !isProxyPayloadOutsideTLS() && expectProxyProtocolFrom(d_ci.remote);
  if (!proxyHeaderInsideTLS) {
    d_state = State::readingQuerySize;
    return iostate;
  }

  d_state = State::readingProxyProtocolHeader;
  d_buffer.resize(s_proxyProtocolMinimumHeaderSize);
  d_proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize;
  return iostate;
}
1090

1091
/*
 * Called once a complete query of d_querySize bytes has been read from the client.
 * Hands it to handleQuery() and terminates the client connection when the query was
 * rejected (too small, invalid headers, dropped, or no backend).
 *
 * Returns NeedRead / NeedWrite when the connection is still active, no longer idle,
 * and its IO state is waiting for a read / write; IOState::Done otherwise.
 */
IOState IncomingTCPConnectionState::handleIncomingQueryReceived(const struct timeval& now)
{
  DEBUGLOG("query received");
  d_buffer.resize(d_querySize);

  d_state = State::idle;
  auto processingResult = handleQuery(std::move(d_buffer), now, std::nullopt);
  switch (processingResult) {
  case QueryProcessingResult::TooSmall:
    /* fall-through */
  case QueryProcessingResult::InvalidHeaders:
    /* fall-through */
  case QueryProcessingResult::Dropped:
    /* fall-through */
  case QueryProcessingResult::NoBackend:
    terminateClientConnection();
    break;
  default:
    break;
  }

  /* the state might have been updated in the meantime, we don't want to override it
     in that case */
  if (active() && d_state != State::idle) {
    if (d_ioState->isWaitingForRead()) {
      return IOState::NeedRead;
    }
    if (d_ioState->isWaitingForWrite()) {
      return IOState::NeedWrite;
    }
  }
  return IOState::Done;
}
1125

1126
void IncomingTCPConnectionState::handleExceptionDuringIO(const std::exception& exp)
1127
{
2,386✔
1128
  if (d_state == State::idle || d_state == State::waitingForQuery) {
2,386✔
1129
    /* no need to increase any counters in that case, the client is simply done with us */
1130
  }
1,985✔
1131
  else if (d_state == State::doingHandshake || d_state == State::readingProxyProtocolHeader || d_state == State::waitingForQuery || d_state == State::readingQuerySize || d_state == State::readingQuery) {
401!
1132
    ++d_ci.cs->tcpDiedReadingQuery;
401✔
1133
  }
401✔
1134
  else if (d_state == State::sendingResponse) {
×
1135
    /* unlikely to happen here, the exception should be handled in sendResponse() */
1136
    ++d_ci.cs->tcpDiedSendingResponse;
×
1137
  }
×
1138

1139
  if (d_ioState->isWaitingForWrite() || d_queriesCount == 0) {
2,387✔
1140
    DEBUGLOG("Got an exception while handling TCP query: " << exp.what());
401✔
1141
    vinfolog("Got an exception while handling (%s) TCP query from %s: %s", (d_ioState->isWaitingForRead() ? "reading" : "writing"), d_ci.remote.toStringWithPort(), exp.what());
401!
1142
  }
401✔
1143
  else {
1,985✔
1144
    vinfolog("Closing TCP client connection with %s: %s", d_ci.remote.toStringWithPort(), exp.what());
1,985✔
1145
    DEBUGLOG("Closing TCP client connection: " << exp.what());
1,985✔
1146
  }
1,985✔
1147
  /* remove this FD from the IO multiplexer */
1148
  terminateClientConnection();
2,386✔
1149
}
2,386✔
1150

1151
bool IncomingTCPConnectionState::readIncomingQuery(const timeval& now, IOState& iostate)
1152
{
48,230✔
1153
  if (!d_lastIOBlocked && (d_state == State::waitingForQuery || d_state == State::readingQuerySize)) {
48,230!
1154
    DEBUGLOG("reading query size");
48,084✔
1155
    d_buffer.resize(sizeof(uint16_t));
48,084✔
1156
    d_readIOsCurrentQuery++;
48,084✔
1157
    iostate = d_handler.tryRead(d_buffer, d_currentPos, sizeof(uint16_t));
48,084✔
1158
    if (d_currentPos > 0) {
48,084✔
1159
      /* if we got at least one byte, we can't go around sending responses */
1160
      d_state = State::readingQuerySize;
43,222✔
1161
    }
43,222✔
1162

1163
    if (iostate == IOState::Done) {
48,084✔
1164
      DEBUGLOG("query size received");
43,218✔
1165
      d_state = State::readingQuery;
43,218✔
1166
      d_querySizeReadTime = now;
43,218✔
1167
      if (d_queriesCount == 0) {
43,218✔
1168
        d_firstQuerySizeReadTime = now;
2,126✔
1169
      }
2,126✔
1170
      d_querySize = d_buffer.at(0) * 256 + d_buffer.at(1);
43,218✔
1171
      if (d_querySize < sizeof(dnsheader)) {
43,218✔
1172
        /* go away */
1173
        terminateClientConnection();
2✔
1174
        return true;
2✔
1175
      }
2✔
1176

1177
      d_buffer.resize(d_querySize);
43,216✔
1178
      d_currentPos = 0;
43,216✔
1179
    }
43,216✔
1180
    else {
4,866✔
1181
      d_lastIOBlocked = true;
4,866✔
1182
    }
4,866✔
1183
  }
48,084✔
1184

1185
  if (!d_lastIOBlocked && d_state == State::readingQuery) {
48,228!
1186
    DEBUGLOG("reading query");
43,362✔
1187
    d_readIOsCurrentQuery++;
43,362✔
1188
    iostate = d_handler.tryRead(d_buffer, d_currentPos, d_querySize);
43,362✔
1189
    if (iostate == IOState::Done) {
43,362✔
1190
      iostate = handleIncomingQueryReceived(now);
43,210✔
1191
    }
43,210✔
1192
    else {
152✔
1193
      d_lastIOBlocked = true;
152✔
1194
    }
152✔
1195
  }
43,362✔
1196

1197
  return false;
48,228✔
1198
}
48,230✔
1199

1200
/*
 * Drive the IO state machine of this incoming client connection: TLS handshake,
 * proxy protocol header, reading queries and sending responses.
 * Loops while the IO state computed for the iteration is NeedRead or NeedWrite and
 * the last IO operation did not block, so that data already buffered by the TLS
 * layer gets consumed. Re-entrant calls return immediately (see below).
 */
void IncomingTCPConnectionState::handleIO()
1201
{
28,234✔
1202
  // let's make sure we are not already in handleIO() below in the stack:
1203
  // this might happen when we have a response available on the backend socket
1204
  // right after forwarding the query, and then a query waiting for us on the
1205
  // client socket right after forwarding the response, and then a response available
1206
  // on the backend socket right after forwarding the query.. you get the idea.
1207
  if (d_handlingIO) {
28,234✔
1208
    return;
20,223✔
1209
  }
20,223✔
1210
  dnsdist::tcp::HandlingIOGuard reentryGuard(d_handlingIO);
8,011✔
1211

1212
  // why do we loop? Because the TLS layer does buffering, and thus can have data ready to read
1213
  // even though the underlying socket is not ready, so we need to actually ask for the data first
1214
  IOState iostate = IOState::Done;
8,011✔
1215
  // 'now' is sampled once and reused for every iteration of the loop below
  timeval now{};
8,011✔
1216
  gettimeofday(&now, nullptr);
8,011✔
1217

1218
  do {
48,878✔
1219
    iostate = IOState::Done;
48,878✔
1220
    // NOTE(review): every path below that returns before ioGuard.release() relies on
    // the guard's destructor to deal with d_ioState ("will be handled by the ioGuard") - confirm in IOStateGuard
    IOStateGuard ioGuard(d_ioState);
48,878✔
1221

1222
    if (maxConnectionDurationReached(dnsdist::configuration::getCurrentRuntimeConfiguration().d_maxTCPConnectionDuration, now)) {
48,878✔
1223
      vinfolog("Terminating TCP connection from %s because it reached the maximum TCP connection duration", d_ci.remote.toStringWithPort());
1!
1224
      // will be handled by the ioGuard
1225
      // handleNewIOState(state, IOState::Done, fd, handleIOCallback);
1226
      return;
1✔
1227
    }
1✔
1228

1229
    const auto& immutable = dnsdist::configuration::getImmutableConfiguration();
48,877✔
1230
    // a client needing too many read IO events for a single query is banned for a while,
    // and we return without releasing ioGuard, as above
    if (immutable.d_maxTCPReadIOsPerQuery > 0 && d_readIOsCurrentQuery >= immutable.d_maxTCPReadIOsPerQuery) {
48,877✔
1231
      vinfolog("Terminating TCP connection from %s for reaching the maximum number of read IO events per query (%d)", d_ci.remote.toStringWithPort(), immutable.d_maxTCPReadIOsPerQuery);
1!
1232
      dnsdist::IncomingConcurrentTCPConnectionsManager::banClientFor(d_ci.remote, time(nullptr), immutable.d_tcpBanDurationForExceedingMaxReadIOsPerQuery);
1✔
1233
      return;
1✔
1234
    }
1✔
1235

1236
    // reset for this iteration: the steps below set it when an IO operation would block
    d_lastIOBlocked = false;
48,876✔
1237

1238
    try {
48,876✔
1239
      // first pass: decide whether a proxy protocol header is expected before the TLS handshake
      if (d_state == State::starting) {
48,876✔
1240
        if (d_ci.cs != nullptr && d_ci.cs->d_enableProxyProtocol && isProxyPayloadOutsideTLS() && expectProxyProtocolFrom(d_ci.remote)) {
2,531!
1241
          d_state = State::readingProxyProtocolHeader;
1✔
1242
          d_buffer.resize(s_proxyProtocolMinimumHeaderSize);
1✔
1243
          d_proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize;
1✔
1244
        }
1✔
1245
        else {
2,530✔
1246
          d_state = State::doingHandshake;
2,530✔
1247
        }
2,530✔
1248
      }
2,531✔
1249

1250
      if (d_state == State::doingHandshake) {
48,876✔
1251
        iostate = handleHandshake(now);
2,901✔
1252
      }
2,901✔
1253

1254
      // once the header is complete: do the handshake if the header came outside TLS,
      // otherwise start reading the query size
      if (!d_lastIOBlocked && d_state == State::readingProxyProtocolHeader) {
48,876✔
1255
        auto status = handleProxyProtocolPayload();
17✔
1256
        if (status == ProxyProtocolResult::Done) {
17✔
1257
          d_buffer.resize(sizeof(uint16_t));
9✔
1258

1259
          if (isProxyPayloadOutsideTLS()) {
9✔
1260
            d_state = State::doingHandshake;
1✔
1261
            iostate = handleHandshake(now);
1✔
1262
          }
1✔
1263
          else {
8✔
1264
            d_state = State::readingQuerySize;
8✔
1265
          }
8✔
1266
        }
9✔
1267
        else if (status == ProxyProtocolResult::Error) {
8✔
1268
          iostate = IOState::Done;
3✔
1269
        }
3✔
1270
        else {
5✔
1271
          iostate = IOState::NeedRead;
5✔
1272
        }
5✔
1273
      }
17✔
1274

1275
      if (!d_lastIOBlocked && (d_state == State::waitingForQuery || d_state == State::readingQuerySize || d_state == State::readingQuery)) {
48,876✔
1276
        // true means the connection has been terminated (announced query size too small)
        if (readIncomingQuery(now, iostate)) {
48,230✔
1277
          return;
2✔
1278
        }
2✔
1279
      }
48,230✔
1280

1281
      // continue writing d_currentResponse from d_currentPos
      if (!d_lastIOBlocked && d_state == State::sendingResponse) {
48,874✔
1282
        DEBUGLOG("sending response");
13✔
1283
        iostate = d_handler.tryWrite(d_currentResponse.d_buffer, d_currentPos, d_currentResponse.d_buffer.size());
13✔
1284
        if (iostate == IOState::Done) {
13!
1285
          DEBUGLOG("response sent from " << __PRETTY_FUNCTION__);
13✔
1286
          handleResponseSent(d_currentResponse, d_currentResponse.d_buffer.size());
13✔
1287
          d_state = State::idle;
13✔
1288
        }
13✔
1289
        else {
×
1290
          d_lastIOBlocked = true;
×
1291
        }
×
1292
      }
13✔
1293

1294
      if (active() && !d_lastIOBlocked && iostate == IOState::Done && (d_state == State::idle || d_state == State::waitingForQuery)) {
48,874!
1295
        // try sending queued responses
1296
        DEBUGLOG("send responses, if any");
2,764✔
1297
        auto state = shared_from_this();
2,764✔
1298
        iostate = sendQueuedResponses(state, now);
2,764✔
1299

1300
        if (!d_lastIOBlocked && active() && iostate == IOState::Done) {
2,764!
1301
          // if the query has been passed to a backend, or dropped, and the responses have been sent,
1302
          // we can start reading again
1303
          if (canAcceptNewQueries(now)) {
2,761✔
1304
            resetForNewQuery();
217✔
1305
            iostate = IOState::NeedRead;
217✔
1306
          }
217✔
1307
          else {
2,544✔
1308
            d_state = State::idle;
2,544✔
1309
            iostate = IOState::Done;
2,544✔
1310
          }
2,544✔
1311
        }
2,761✔
1312
      }
2,764✔
1313

1314
      // sanity check: states we do not expect to see here are only logged
      if (d_state != State::idle && d_state != State::doingHandshake && d_state != State::readingProxyProtocolHeader && d_state != State::waitingForQuery && d_state != State::readingQuerySize && d_state != State::readingQuery && d_state != State::sendingResponse) {
48,874!
1315
        vinfolog("Unexpected state %d in handleIOCallback", static_cast<int>(d_state));
×
1316
      }
×
1317
    }
48,874✔
1318
    catch (const std::exception& exp) {
48,876✔
1319
      /* most likely an EOF because the other end closed the connection,
1320
         but it might also be a real IO error or something else.
1321
         Let's just drop the connection
1322
      */
1323
      handleExceptionDuringIO(exp);
2,387✔
1324
    }
2,387✔
1325

1326
    // the connection might have been terminated by any of the steps above
    if (!active()) {
48,873✔
1327
      DEBUGLOG("state is no longer active");
2,442✔
1328
      return;
2,442✔
1329
    }
2,442✔
1330

1331
    // record what to wait for next: d_ioState->update() when iostate is Done, updateIO() otherwise
    auto sharedPtrToConn = shared_from_this();
46,431✔
1332
    if (iostate == IOState::Done) {
46,431✔
1333
      d_ioState->update(iostate, handleIOCallback, sharedPtrToConn);
2,547✔
1334
    }
2,547✔
1335
    else {
43,884✔
1336
      updateIO(iostate, now);
43,884✔
1337
    }
43,884✔
1338
    // normal end of the iteration: release the guard (the early returns above skip this)
    ioGuard.release();
46,431✔
1339
  } while ((iostate == IOState::NeedRead || iostate == IOState::NeedWrite) && !d_lastIOBlocked);
46,431✔
1340
}
8,011✔
1341

1342
void IncomingTCPConnectionState::notifyIOError(const struct timeval& now, TCPResponse&& response)
1343
{
61✔
1344
  if (std::this_thread::get_id() != d_creatorThreadID) {
61✔
1345
    /* empty buffer will signal an IO error */
1346
    response.d_buffer.clear();
18✔
1347
    handleCrossProtocolResponse(now, std::move(response));
18✔
1348
    return;
18✔
1349
  }
18✔
1350

1351
  auto sharedPtrToConn = shared_from_this();
43✔
1352
  --sharedPtrToConn->d_currentQueriesCount;
43✔
1353
  sharedPtrToConn->d_hadErrors = true;
43✔
1354

1355
  if (sharedPtrToConn->d_state == State::sendingResponse) {
43✔
1356
    /* if we have responses to send, let's do that first */
1357
  }
2✔
1358
  else if (!sharedPtrToConn->d_queuedResponses.empty()) {
41!
1359
    /* stop reading and send what we have */
1360
    try {
×
1361
      auto iostate = sendQueuedResponses(sharedPtrToConn, now);
×
1362

1363
      if (sharedPtrToConn->active() && iostate != IOState::Done) {
×
1364
        // we need to update the state right away, nobody will do that for us
1365
        updateIO(iostate, now);
×
1366
      }
×
1367
    }
×
1368
    catch (const std::exception& e) {
×
1369
      vinfolog("Exception in notifyIOError: %s", e.what());
×
1370
    }
×
1371
  }
×
1372
  else {
41✔
1373
    // the backend code already tried to reconnect if it was possible
1374
    sharedPtrToConn->terminateClientConnection();
41✔
1375
  }
41✔
1376
}
43✔
1377

1378
/*
 * Run the XFR response rule chain on a zone transfer response.
 * Returns false when applyRulesToResponse() fails, true otherwise. Unless the
 * rules made the response asynchronous, the extended DNS error recorded in
 * dnsResponse.ids (if any) is added to the response.
 */
static bool processXFRResponse(DNSResponse& dnsResponse)
{
  const auto& ruleChains = dnsdist::configuration::getCurrentRuntimeConfiguration().d_ruleChains;
  const auto& xfrRules = dnsdist::rules::getResponseRuleChain(ruleChains, dnsdist::rules::ResponseRuleChain::XFRResponseRules);

  if (!applyRulesToResponse(xfrRules, dnsResponse)) {
    return false;
  }

  const bool stillSynchronous = !dnsResponse.isAsynchronous();
  if (stillSynchronous && dnsResponse.ids.d_extendedError) {
    dnsdist::edns::addExtendedDNSError(dnsResponse.getMutableData(), dnsResponse.getMaximumSize(), dnsResponse.ids.d_extendedError->infoCode, dnsResponse.ids.d_extendedError->extraText);
  }

  return true;
}
1397

1398
void IncomingTCPConnectionState::handleXFRResponse(const struct timeval& now, TCPResponse&& response)
1399
{
449✔
1400
  if (std::this_thread::get_id() != d_creatorThreadID) {
449!
1401
    handleCrossProtocolResponse(now, std::move(response));
×
1402
    return;
×
1403
  }
×
1404

1405
  std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this();
449✔
1406
  auto& ids = response.d_idstate;
449✔
1407
  std::shared_ptr<DownstreamState> backend = response.d_ds ? response.d_ds : (response.d_connection ? response.d_connection->getDS() : nullptr);
449!
1408
  DNSResponse dnsResponse(ids, response.d_buffer, backend);
449✔
1409
  dnsResponse.d_incomingTCPState = state;
449✔
1410
  memcpy(&response.d_cleartextDH, dnsResponse.getHeader().get(), sizeof(response.d_cleartextDH));
449✔
1411

1412
  if (!processXFRResponse(dnsResponse)) {
449!
1413
    state->terminateClientConnection();
×
1414
    return;
×
1415
  }
×
1416

1417
  queueResponse(state, now, std::move(response), true);
449✔
1418
}
449✔
1419

1420
void IncomingTCPConnectionState::handleTimeout(std::shared_ptr<IncomingTCPConnectionState>& state, bool write)
1421
{
18✔
1422
  vinfolog("Timeout while %s TCP client %s", (write ? "writing to" : "reading from"), state->d_ci.remote.toStringWithPort());
18!
1423
  DEBUGLOG("client timeout");
18✔
1424
  DEBUGLOG("Processed " << state->d_queriesCount << " queries, current count is " << state->d_currentQueriesCount << ", " << state->d_ownedConnectionsToBackend.size() << " owned connections, " << state->d_queuedResponses.size() << " response queued");
18✔
1425

1426
  if (write || state->d_currentQueriesCount == 0) {
18✔
1427
    ++state->d_ci.cs->tcpClientTimeouts;
12✔
1428
    state->d_ioState.reset();
12✔
1429
  }
12✔
1430
  else {
6✔
1431
    DEBUGLOG("Going idle");
6✔
1432
    /* we still have some queries in flight, let's just stop reading for now */
1433
    state->d_state = State::idle;
6✔
1434
    state->d_ioState->update(IOState::Done, handleIOCallback, state);
6✔
1435
  }
6✔
1436
}
18✔
1437

1438
static void handleIncomingTCPQuery(int pipefd, FDMultiplexer::funcparam_t& param)
1439
{
2,638✔
1440
  (void)pipefd;
2,638✔
1441
  auto* threadData = boost::any_cast<TCPClientThreadData*>(param);
2,638✔
1442

1443
  std::unique_ptr<ConnectionInfo> citmp{nullptr};
2,638✔
1444
  try {
2,638✔
1445
    auto tmp = threadData->queryReceiver.receive();
2,638✔
1446
    if (!tmp) {
2,638!
1447
      return;
×
1448
    }
×
1449
    citmp = std::move(*tmp);
2,638✔
1450
  }
2,638✔
1451
  catch (const std::exception& e) {
2,638✔
1452
    throw std::runtime_error("Error while reading from the TCP query channel: " + std::string(e.what()));
×
1453
  }
×
1454

1455
  g_tcpclientthreads->decrementQueuedCount();
2,638✔
1456

1457
  timeval now{};
2,638✔
1458
  gettimeofday(&now, nullptr);
2,638✔
1459

1460
  if (citmp->cs->dohFrontend) {
2,638✔
1461
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
201✔
1462
    auto state = std::make_shared<IncomingHTTP2Connection>(std::move(*citmp), *threadData, now);
201✔
1463
    state->handleIO();
201✔
1464
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
201✔
1465
  }
201✔
1466
  else {
2,437✔
1467
    auto state = std::make_shared<IncomingTCPConnectionState>(std::move(*citmp), *threadData, now);
2,437✔
1468
    state->handleIO();
2,437✔
1469
  }
2,437✔
1470
}
2,638✔
1471

1472
static void handleCrossProtocolQuery(int pipefd, FDMultiplexer::funcparam_t& param)
1473
{
228✔
1474
  (void)pipefd;
228✔
1475
  auto* threadData = boost::any_cast<TCPClientThreadData*>(param);
228✔
1476

1477
  std::unique_ptr<CrossProtocolQuery> cpq{nullptr};
228✔
1478
  try {
228✔
1479
    auto tmp = threadData->crossProtocolQueryReceiver.receive();
228✔
1480
    if (!tmp) {
228!
1481
      return;
×
1482
    }
×
1483
    cpq = std::move(*tmp);
228✔
1484
  }
228✔
1485
  catch (const std::exception& e) {
228✔
1486
    throw std::runtime_error("Error while reading from the TCP cross-protocol channel: " + std::string(e.what()));
×
1487
  }
×
1488

1489
  timeval now{};
228✔
1490
  gettimeofday(&now, nullptr);
228✔
1491

1492
  std::shared_ptr<TCPQuerySender> tqs = cpq->getTCPQuerySender();
228✔
1493
  auto query = std::move(cpq->query);
228✔
1494
  auto downstreamServer = std::move(cpq->downstream);
228✔
1495

1496
  try {
228✔
1497
    auto downstream = t_downstreamTCPConnectionsManager.getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::string());
228✔
1498

1499
    prependSizeToTCPQuery(query.d_buffer, query.d_idstate.d_proxyProtocolPayloadSize);
228✔
1500

1501
    vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", query.d_idstate.qname.toLogString(), QType(query.d_idstate.qtype).toString(), query.d_idstate.origRemote.toStringWithPort(), query.d_idstate.protocol.toString(), query.d_buffer.size(), downstreamServer->getNameWithAddr());
228✔
1502

1503
    downstream->queueQuery(tqs, std::move(query));
228✔
1504
  }
228✔
1505
  catch (...) {
228✔
1506
    tqs->notifyIOError(now, std::move(query));
×
1507
  }
×
1508
}
228✔
1509

1510
static void handleCrossProtocolResponse(int pipefd, FDMultiplexer::funcparam_t& param)
1511
{
284✔
1512
  (void)pipefd;
284✔
1513
  auto* threadData = boost::any_cast<TCPClientThreadData*>(param);
284✔
1514

1515
  std::unique_ptr<TCPCrossProtocolResponse> cpr{nullptr};
284✔
1516
  try {
284✔
1517
    auto tmp = threadData->crossProtocolResponseReceiver.receive();
284✔
1518
    if (!tmp) {
284!
1519
      return;
×
1520
    }
×
1521
    cpr = std::move(*tmp);
284✔
1522
  }
284✔
1523
  catch (const std::exception& e) {
284✔
1524
    throw std::runtime_error("Error while reading from the TCP cross-protocol response: " + std::string(e.what()));
×
1525
  }
×
1526

1527
  auto& response = *cpr;
284✔
1528

1529
  try {
284✔
1530
    if (response.d_response.d_buffer.empty()) {
284✔
1531
      response.d_state->notifyIOError(response.d_now, std::move(response.d_response));
24✔
1532
    }
24✔
1533
    else if (response.d_response.d_idstate.qtype == QType::AXFR || response.d_response.d_idstate.qtype == QType::IXFR) {
260!
1534
      response.d_state->handleXFRResponse(response.d_now, std::move(response.d_response));
×
1535
    }
×
1536
    else {
260✔
1537
      response.d_state->handleResponse(response.d_now, std::move(response.d_response));
260✔
1538
    }
260✔
1539
  }
284✔
1540
  catch (...) {
284✔
1541
    /* no point bubbling up from there */
1542
  }
×
1543
}
284✔
1544

1545
/* One listening socket to accept TCP connections on. tcpClientThread() and
   tcpAcceptorThread() build one entry for a frontend's main address (local/tcpFD)
   and one for each of its d_additionalAddresses. */
struct TCPAcceptorParam
{
  // NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members)
  ClientState& clientState; // frontend the socket belongs to
  ComboAddress local; // local address the socket is bound to
  int socket{-1}; // listening socket descriptor
};

/* Declared ahead of its definition because tcpClientThread() uses it in its accept callback. */
static void acceptNewConnection(const TCPAcceptorParam& param, TCPClientThreadData* threadData);
1554

1555
static void scanForTimeouts(const TCPClientThreadData& data, const timeval& now)
1556
{
11,395✔
1557
  auto expiredReadConns = data.mplexer->getTimeouts(now, false);
11,395✔
1558
  for (const auto& cbData : expiredReadConns) {
11,395✔
1559
    if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
11✔
1560
      auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
1✔
1561
      if (cbData.first == state->d_handler.getDescriptor()) {
1!
1562
        vinfolog("Timeout (read) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
1!
1563
        state->handleTimeout(state, false);
1✔
1564
      }
1✔
1565
    }
1✔
1566
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
10✔
1567
    else if (cbData.second.type() == typeid(std::shared_ptr<IncomingHTTP2Connection>)) {
10!
1568
      auto state = boost::any_cast<std::shared_ptr<IncomingHTTP2Connection>>(cbData.second);
1569
      if (cbData.first == state->d_handler.getDescriptor()) {
×
1570
        vinfolog("Timeout (read) from remote H2 client %s", state->d_ci.remote.toStringWithPort());
×
1571
        std::shared_ptr<IncomingTCPConnectionState> parentState = state;
1572
        state->handleTimeout(parentState, false);
1573
      }
1574
    }
1575
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
10✔
1576
    else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
10!
1577
      auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
10✔
1578
      vinfolog("Timeout (read) from remote backend %s", conn->getBackendName());
10!
1579
      conn->handleTimeout(now, false);
10✔
1580
    }
10✔
1581
  }
11✔
1582

1583
  auto expiredWriteConns = data.mplexer->getTimeouts(now, true);
11,395✔
1584
  for (const auto& cbData : expiredWriteConns) {
11,395!
1585
    if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
×
1586
      auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
×
1587
      if (cbData.first == state->d_handler.getDescriptor()) {
×
1588
        vinfolog("Timeout (write) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
×
1589
        state->handleTimeout(state, true);
×
1590
      }
×
1591
    }
×
1592
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
1593
    else if (cbData.second.type() == typeid(std::shared_ptr<IncomingHTTP2Connection>)) {
×
1594
      auto state = boost::any_cast<std::shared_ptr<IncomingHTTP2Connection>>(cbData.second);
1595
      if (cbData.first == state->d_handler.getDescriptor()) {
×
1596
        vinfolog("Timeout (write) from remote H2 client %s", state->d_ci.remote.toStringWithPort());
×
1597
        std::shared_ptr<IncomingTCPConnectionState> parentState = state;
1598
        state->handleTimeout(parentState, true);
1599
      }
1600
    }
1601
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
1602
    else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
×
1603
      auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
×
1604
      vinfolog("Timeout (write) from remote backend %s", conn->getBackendName());
×
1605
      conn->handleTimeout(now, true);
×
1606
    }
×
1607
  }
×
1608
}
11,395✔
1609

1610
static void dumpTCPStates(const TCPClientThreadData& data)
1611
{
×
1612
  /* just to keep things clean in the output, debug only */
1613
  static std::mutex s_lock;
×
1614
  auto lock = std::scoped_lock(s_lock);
×
1615
  if (g_tcpStatesDumpRequested > 0) {
×
1616
    /* no race here, we took the lock so it can only be increased in the meantime */
1617
    --g_tcpStatesDumpRequested;
×
1618
    infolog("Dumping the TCP states, as requested:");
×
1619
    data.mplexer->runForAllWatchedFDs([](bool isRead, int desc, const FDMultiplexer::funcparam_t& param, struct timeval ttd) {
×
1620
      timeval lnow{};
×
1621
      gettimeofday(&lnow, nullptr);
×
1622
      if (ttd.tv_sec > 0) {
×
1623
        infolog("- Descriptor %d is in %s state, TTD in %d", desc, (isRead ? "read" : "write"), (ttd.tv_sec - lnow.tv_sec));
×
1624
      }
×
1625
      else {
×
1626
        infolog("- Descriptor %d is in %s state, no TTD set", desc, (isRead ? "read" : "write"));
×
1627
      }
×
1628

1629
      if (param.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
×
1630
        auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
×
1631
        infolog(" - %s", state->toString());
×
1632
      }
×
1633
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
1634
      else if (param.type() == typeid(std::shared_ptr<IncomingHTTP2Connection>)) {
×
1635
        auto state = boost::any_cast<std::shared_ptr<IncomingHTTP2Connection>>(param);
1636
        infolog(" - %s", state->toString());
1637
      }
1638
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
1639
      else if (param.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
×
1640
        auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(param);
×
1641
        infolog(" - %s", conn->toString());
×
1642
      }
×
1643
      else if (param.type() == typeid(TCPClientThreadData*)) {
×
1644
        infolog(" - Worker thread pipe");
×
1645
      }
×
1646
    });
×
1647
    infolog("The TCP/DoT client cache has %d active and %d idle outgoing connections cached", t_downstreamTCPConnectionsManager.getActiveCount(), t_downstreamTCPConnectionsManager.getIdleCount());
×
1648
  }
×
1649
}
×
1650

1651
// NOLINTNEXTLINE(performance-unnecessary-value-param): you are wrong, clang-tidy, go home
1652
static void tcpClientThread(pdns::channel::Receiver<ConnectionInfo>&& queryReceiver, pdns::channel::Receiver<CrossProtocolQuery>&& crossProtocolQueryReceiver, pdns::channel::Receiver<TCPCrossProtocolResponse>&& crossProtocolResponseReceiver, pdns::channel::Sender<TCPCrossProtocolResponse>&& crossProtocolResponseSender, std::vector<ClientState*> tcpAcceptStates)
1653
{
3,611✔
1654
  /* we get launched with a pipe on which we receive file descriptors from clients that we own
1655
     from that point on */
1656

1657
  setThreadName("dnsdist/tcpClie");
3,611✔
1658

1659
  try {
3,611✔
1660
    TCPClientThreadData data;
3,611✔
1661
    data.crossProtocolResponseSender = std::move(crossProtocolResponseSender);
3,611✔
1662
    data.queryReceiver = std::move(queryReceiver);
3,611✔
1663
    data.crossProtocolQueryReceiver = std::move(crossProtocolQueryReceiver);
3,611✔
1664
    data.crossProtocolResponseReceiver = std::move(crossProtocolResponseReceiver);
3,611✔
1665

1666
    data.mplexer->addReadFD(data.queryReceiver.getDescriptor(), handleIncomingTCPQuery, &data);
3,611✔
1667
    data.mplexer->addReadFD(data.crossProtocolQueryReceiver.getDescriptor(), handleCrossProtocolQuery, &data);
3,611✔
1668
    data.mplexer->addReadFD(data.crossProtocolResponseReceiver.getDescriptor(), handleCrossProtocolResponse, &data);
3,611✔
1669

1670
    /* only used in single acceptor mode for now */
1671
    std::vector<TCPAcceptorParam> acceptParams;
3,611✔
1672
    acceptParams.reserve(tcpAcceptStates.size());
3,611✔
1673

1674
    for (auto& state : tcpAcceptStates) {
3,611!
1675
      acceptParams.emplace_back(TCPAcceptorParam{*state, state->local, state->tcpFD});
×
1676
      for (const auto& [addr, socket] : state->d_additionalAddresses) {
×
1677
        acceptParams.emplace_back(TCPAcceptorParam{*state, addr, socket});
×
1678
      }
×
1679
    }
×
1680

1681
    auto acceptCallback = [&data](int socket, FDMultiplexer::funcparam_t& funcparam) {
3,611✔
1682
      (void)socket;
×
1683
      const auto* acceptorParam = boost::any_cast<const TCPAcceptorParam*>(funcparam);
×
1684
      acceptNewConnection(*acceptorParam, &data);
×
1685
    };
×
1686

1687
    for (const auto& param : acceptParams) {
3,611!
1688
      setNonBlocking(param.socket);
×
1689
      data.mplexer->addReadFD(param.socket, acceptCallback, &param);
×
1690
    }
×
1691

1692
    timeval now{};
3,611✔
1693
    gettimeofday(&now, nullptr);
3,611✔
1694
    time_t lastTimeoutScan = now.tv_sec;
3,611✔
1695
    time_t lastConfigRefresh = now.tv_sec;
3,611✔
1696

1697
    for (;;) {
37,743✔
1698
      data.mplexer->run(&now);
37,743✔
1699

1700
      if (now.tv_sec > lastConfigRefresh) {
37,743✔
1701
        lastConfigRefresh = now.tv_sec;
11,176✔
1702
        dnsdist::configuration::refreshLocalRuntimeConfiguration();
11,176✔
1703
      }
11,176✔
1704

1705
      try {
37,743✔
1706
        t_downstreamTCPConnectionsManager.cleanupClosedConnections(now);
37,743✔
1707
        dnsdist::IncomingConcurrentTCPConnectionsManager::cleanup(time(nullptr));
37,743✔
1708

1709
        if (now.tv_sec > lastTimeoutScan) {
37,743✔
1710
          lastTimeoutScan = now.tv_sec;
11,402✔
1711
          scanForTimeouts(data, now);
11,402✔
1712

1713
          if (g_tcpStatesDumpRequested > 0) {
11,402!
1714
            dumpTCPStates(data);
×
1715
          }
×
1716
        }
11,402✔
1717
      }
37,743✔
1718
      catch (const std::exception& e) {
37,743✔
1719
        warnlog("Error in TCP worker thread: %s", e.what());
×
1720
      }
×
1721
    }
37,743✔
1722
  }
3,611✔
1723
  catch (const std::exception& e) {
3,611✔
1724
    errlog("Fatal error in TCP worker thread: %s", e.what());
×
1725
  }
×
1726
}
3,611✔
1727

1728
static void acceptNewConnection(const TCPAcceptorParam& param, TCPClientThreadData* threadData)
1729
{
3,118✔
1730
  auto& clientState = param.clientState;
3,118✔
1731
  const bool checkACL = clientState.dohFrontend == nullptr || (!clientState.dohFrontend->d_trustForwardedForHeader && clientState.dohFrontend->d_earlyACLDrop);
3,118!
1732
  const int socket = param.socket;
3,118✔
1733
  bool tcpClientCountIncremented = false;
3,118✔
1734
  ComboAddress remote;
3,118✔
1735
  remote.sin4.sin_family = param.local.sin4.sin_family;
3,118✔
1736

1737
  tcpClientCountIncremented = false;
3,118✔
1738
  try {
3,118✔
1739
    socklen_t remlen = remote.getSocklen();
3,118✔
1740
    ConnectionInfo connInfo(&clientState);
3,118✔
1741
#ifdef HAVE_ACCEPT4
3,118✔
1742
    // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
1743
    connInfo.fd = accept4(socket, reinterpret_cast<struct sockaddr*>(&remote), &remlen, SOCK_NONBLOCK);
3,118✔
1744
#else
1745
    // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
1746
    connInfo.fd = accept(socket, reinterpret_cast<struct sockaddr*>(&remote), &remlen);
1747
#endif
1748
    // will be decremented when the ConnectionInfo object is destroyed, no matter the reason
1749
    auto concurrentConnections = ++clientState.tcpCurrentConnections;
3,118✔
1750

1751
    if (connInfo.fd < 0) {
3,118!
1752
      throw std::runtime_error((boost::format("accepting new connection on socket: %s") % stringerror()).str());
×
1753
    }
×
1754

1755
    if (checkACL && !dnsdist::configuration::getCurrentRuntimeConfiguration().d_ACL.match(remote)) {
3,118✔
1756
      ++dnsdist::metrics::g_stats.aclDrops;
9✔
1757
      vinfolog("Dropped TCP connection from %s because of ACL", remote.toStringWithPort());
9✔
1758
      return;
9✔
1759
    }
9✔
1760

1761
    if (clientState.d_tcpConcurrentConnectionsLimit > 0 && concurrentConnections > clientState.d_tcpConcurrentConnectionsLimit) {
3,109✔
1762
      vinfolog("Dropped TCP connection from %s because of concurrent connections limit", remote.toStringWithPort());
3!
1763
      return;
3✔
1764
    }
3✔
1765

1766
    if (concurrentConnections > clientState.tcpMaxConcurrentConnections.load()) {
3,106✔
1767
      clientState.tcpMaxConcurrentConnections.store(concurrentConnections);
586✔
1768
    }
586✔
1769

1770
#ifndef HAVE_ACCEPT4
1771
    if (!setNonBlocking(connInfo.fd)) {
1772
      return;
1773
    }
1774
#endif
1775

1776
    setTCPNoDelay(connInfo.fd); // disable NAGLE
3,106✔
1777

1778
    const auto maxTCPQueuedConnections = dnsdist::configuration::getImmutableConfiguration().d_maxTCPQueuedConnections;
3,106✔
1779
    if (maxTCPQueuedConnections > 0 && g_tcpclientthreads->getQueuedCount() >= maxTCPQueuedConnections) {
3,106!
1780
      vinfolog("Dropping TCP connection from %s because we have too many queued already", remote.toStringWithPort());
×
1781
      return;
×
1782
    }
×
1783

1784
    auto connectionResult = dnsdist::IncomingConcurrentTCPConnectionsManager::accountNewTCPConnection(remote, connInfo.cs->hasTLS());
3,106✔
1785
    if (connectionResult == dnsdist::IncomingConcurrentTCPConnectionsManager::NewConnectionResult::Denied) {
3,106✔
1786
      return;
6✔
1787
    }
6✔
1788
    tcpClientCountIncremented = true;
3,100✔
1789
    if (connectionResult == dnsdist::IncomingConcurrentTCPConnectionsManager::NewConnectionResult::Restricted) {
3,100!
1790
      connInfo.d_restricted = true;
×
1791
    }
×
1792

1793
    vinfolog("Got TCP connection from %s", remote.toStringWithPort());
3,100✔
1794

1795
    connInfo.remote = remote;
3,100✔
1796

1797
    if (threadData == nullptr) {
3,100✔
1798
      if (!g_tcpclientthreads->passConnectionToThread(std::make_unique<ConnectionInfo>(std::move(connInfo)))) {
2,638!
1799
        if (tcpClientCountIncremented) {
×
1800
          dnsdist::IncomingConcurrentTCPConnectionsManager::accountClosedTCPConnection(remote);
×
1801
        }
×
1802
      }
×
1803
    }
2,638✔
1804
    else {
462✔
1805
      timeval now{};
462✔
1806
      gettimeofday(&now, nullptr);
462✔
1807

1808
      if (connInfo.cs->dohFrontend) {
462!
1809
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
1810
        auto state = std::make_shared<IncomingHTTP2Connection>(std::move(connInfo), *threadData, now);
1811
        state->handleIO();
1812
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
1813
      }
×
1814
      else {
462✔
1815
        auto state = std::make_shared<IncomingTCPConnectionState>(std::move(connInfo), *threadData, now);
462✔
1816
        state->handleIO();
462✔
1817
      }
462✔
1818
    }
462✔
1819
  }
3,100✔
1820
  catch (const std::exception& e) {
3,118✔
1821
    errlog("While reading a TCP question: %s", e.what());
×
1822
    if (tcpClientCountIncremented) {
×
1823
      dnsdist::IncomingConcurrentTCPConnectionsManager::accountClosedTCPConnection(remote);
×
1824
    }
×
1825
  }
×
1826
  catch (...) {
3,118✔
1827
  }
×
1828
}
3,118✔
1829

1830
/* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and
1831
   they will hand off to worker threads & spawn more of them if required
1832
*/
1833
#ifndef USE_SINGLE_ACCEPTOR_THREAD
1834
void tcpAcceptorThread(const std::vector<ClientState*>& states)
1835
{
462✔
1836
  setThreadName("dnsdist/tcpAcce");
462✔
1837

1838
  std::vector<TCPAcceptorParam> params;
462✔
1839
  params.reserve(states.size());
462✔
1840

1841
  for (const auto& state : states) {
462✔
1842
    params.emplace_back(TCPAcceptorParam{*state, state->local, state->tcpFD});
462✔
1843
    for (const auto& [addr, socket] : state->d_additionalAddresses) {
462!
1844
      params.emplace_back(TCPAcceptorParam{*state, addr, socket});
×
1845
    }
×
1846
  }
462✔
1847

1848
  if (params.size() == 1) {
462!
1849
    while (true) {
3,580✔
1850
      dnsdist::configuration::refreshLocalRuntimeConfiguration();
3,118✔
1851
      acceptNewConnection(params.at(0), nullptr);
3,118✔
1852
    }
3,118✔
1853
  }
462✔
1854
  else {
×
1855
    auto acceptCallback = [](int socket, FDMultiplexer::funcparam_t& funcparam) {
×
1856
      (void)socket;
×
1857
      const auto* acceptorParam = boost::any_cast<const TCPAcceptorParam*>(funcparam);
×
1858
      acceptNewConnection(*acceptorParam, nullptr);
×
1859
    };
×
1860

1861
    auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent(params.size()));
×
1862
    for (const auto& param : params) {
×
1863
      mplexer->addReadFD(param.socket, acceptCallback, &param);
×
1864
    }
×
1865

1866
    timeval now{};
×
1867
    time_t lastConfigRefresh = now.tv_sec;
×
1868
    while (true) {
×
1869
      mplexer->run(&now, -1);
×
1870

1871
      if (now.tv_sec > lastConfigRefresh) {
×
1872
        lastConfigRefresh = now.tv_sec;
×
1873
        dnsdist::configuration::refreshLocalRuntimeConfiguration();
×
1874
      }
×
1875
    }
×
1876
  }
×
1877
}
462✔
1878
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc