• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PowerDNS / pdns / 18903493638

29 Oct 2025 09:39AM UTC coverage: 73.004%. Remained the same
18903493638

Pull #16388

github

web-flow
Merge 1bddbd8fe into 82ea647b4
Pull Request #16388: gh actions build-packages: fix pattern for the download-artifacts action and publication issues

38272 of 63120 branches covered (60.63%)

Branch coverage included in aggregate %.

127434 of 163861 relevant lines covered (77.77%)

6052684.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.4
/pdns/dnsdistdist/dnsdist-tcp.cc
1
/*
2
 * This file is part of PowerDNS or dnsdist.
3
 * Copyright -- PowerDNS.COM B.V. and its contributors
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of version 2 of the GNU General Public License as
7
 * published by the Free Software Foundation.
8
 *
9
 * In addition, for the avoidance of any doubt, permission is granted to
10
 * link this program with OpenSSL and to (re)distribute the binaries
11
 * produced as the result of such linking.
12
 *
13
 * This program is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
 * GNU General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU General Public License
19
 * along with this program; if not, write to the Free Software
20
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
 */
22

23
#include <thread>
24
#include <netinet/tcp.h>
25
#include <queue>
26
#include <boost/format.hpp>
27

28
#include "dnsdist.hh"
29
#include "dnsdist-concurrent-connections.hh"
30
#include "dnsdist-dnsparser.hh"
31
#include "dnsdist-ecs.hh"
32
#include "dnsdist-edns.hh"
33
#include "dnsdist-nghttp2-in.hh"
34
#include "dnsdist-proxy-protocol.hh"
35
#include "dnsdist-rings.hh"
36
#include "dnsdist-tcp.hh"
37
#include "dnsdist-tcp-downstream.hh"
38
#include "dnsdist-downstream-connection.hh"
39
#include "dnsdist-tcp-upstream.hh"
40
#include "dnsparser.hh"
41
#include "dolog.hh"
42
#include "gettime.hh"
43
#include "lock.hh"
44
#include "sstuff.hh"
45
#include "tcpiohandler.hh"
46
#include "tcpiohandler-mplexer.hh"
47
#include "threadname.hh"
48

49
/* TCP: the grand design.
50
   We forward 'messages' between clients and downstream servers. Messages are 65k bytes large, tops.
51
   An answer might theoretically consist of multiple messages (for example, in the case of AXFR), initially
52
   we will not go there.
53

54
   In a sense there is a strong symmetry between UDP and TCP, once a connection to a downstream has been set up.
55
   This symmetry is broken because of head-of-line blocking within TCP though, necessitating additional connections
56
   to guarantee performance.
57

58
   So the idea is to have a 'pool' of available downstream connections, and forward messages to/from them and never queue.
59
   So whenever an answer comes in, we know where it needs to go.
60

61
   Let's start naively.
62
*/
63

64
// Atomic counter, presumably tracking pending requests to dump the state of TCP connections.
// NOTE(review): neither its writers nor its readers are visible in this chunk — confirm semantics.
std::atomic<uint64_t> g_tcpStatesDumpRequested{0};
65

66
IncomingTCPConnectionState::~IncomingTCPConnectionState()
{
  // Tell the per-client concurrent connection accounting that this connection is gone.
  // A destructor must never let an exception escape, so anything thrown is deliberately ignored.
  try {
    dnsdist::IncomingConcurrentTCPConnectionsManager::accountClosedTCPConnection(d_ci.remote);
  }
  catch (...) {
  }

  // Report per-connection metrics (query count, lifetime in milliseconds, read I/Os per query)
  // to the frontend this connection was accepted on, when there is one.
  if (d_ci.cs != nullptr) {
    timeval closedAt{};
    gettimeofday(&closedAt, nullptr);

    const auto lifetime = closedAt - d_connectionStartTime;
    const auto lifetimeMs = lifetime.tv_sec * 1000 + lifetime.tv_usec / 1000;
    const auto readIOsPerQuery = d_queriesCount > 0 ? d_readIOsTotal / d_queriesCount : d_readIOsTotal;
    d_ci.cs->updateTCPMetrics(d_queriesCount, lifetimeMs, readIOsPerQuery);
  }

  // Closing here is redundant with the member's own destruction, but doing it explicitly guarantees
  // the handler is closed before the ConnectionInfo (which closes the descriptor) is destroyed,
  // independently of the declaration order of the members in the class.
  d_handler.close();
}
88

89
dnsdist::Protocol IncomingTCPConnectionState::getProtocol() const
{
  // A DoH frontend wins over the transport-level check; otherwise TLS means DoT, plain TCP otherwise.
  if (!d_ci.cs->dohFrontend) {
    return d_handler.isTLS() ? dnsdist::Protocol::DoT : dnsdist::Protocol::DoTCP;
  }
  return dnsdist::Protocol::DoH;
}
99

100
size_t IncomingTCPConnectionState::clearAllDownstreamConnections()
101
{
190✔
102
  return t_downstreamTCPConnectionsManager.clear();
190✔
103
}
190✔
104

105
static std::pair<std::shared_ptr<TCPConnectionToBackend>, bool> getOwnedDownstreamConnection(std::map<std::shared_ptr<DownstreamState>, std::deque<std::shared_ptr<TCPConnectionToBackend>>>& ownedConnectionsToBackend, const std::shared_ptr<DownstreamState>& backend, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs)
106
{
22,644✔
107
  bool tlvsMismatch = false;
22,644✔
108
  auto connIt = ownedConnectionsToBackend.find(backend);
22,644✔
109
  if (connIt == ownedConnectionsToBackend.end()) {
22,644✔
110
    DEBUGLOG("no owned connection found for " << backend->getName());
22,607✔
111
    return {nullptr, tlvsMismatch};
22,607✔
112
  }
22,607✔
113

114
  for (auto& conn : connIt->second) {
37✔
115
    if (conn->canBeReused(true)) {
37!
116
      if (conn->matchesTLVs(tlvs)) {
37✔
117
        DEBUGLOG("Got one owned connection accepting more for " << backend->getName());
32✔
118
        conn->setReused();
32✔
119
        ++backend->tcpReusedConnections;
32✔
120
        return {conn, tlvsMismatch};
32✔
121
      }
32✔
122
      DEBUGLOG("Found one connection to " << backend->getName() << " but with different TLV values");
5✔
123
      tlvsMismatch = true;
5✔
124
    }
5✔
125
    DEBUGLOG("not accepting more for " << backend->getName());
5✔
126
  }
5✔
127

128
  return {nullptr, tlvsMismatch};
5✔
129
}
37✔
130

131
bool IncomingTCPConnectionState::isNearTCPLimits() const
132
{
222,220✔
133
  if (d_ci.d_restricted) {
222,220✔
134
    return true;
5✔
135
  }
5✔
136

137
  const auto tcpConnectionsOverloadThreshold = dnsdist::configuration::getImmutableConfiguration().d_tcpConnectionsOverloadThreshold;
222,215✔
138
  if (tcpConnectionsOverloadThreshold == 0) {
222,215✔
139
    return false;
558✔
140
  }
558✔
141

142
  const auto& clientState = d_ci.cs;
221,657✔
143
  if (clientState->d_tcpConcurrentConnectionsLimit > 0) {
221,657✔
144
    auto concurrentConnections = clientState->tcpCurrentConnections.load();
90✔
145
    auto current = (100 * concurrentConnections) / clientState->d_tcpConcurrentConnectionsLimit;
90✔
146
    if (current >= tcpConnectionsOverloadThreshold) {
90✔
147
      return true;
18✔
148
    }
18✔
149
  }
90✔
150

151
  return dnsdist::IncomingConcurrentTCPConnectionsManager::isClientOverThreshold(d_ci.remote);
221,639✔
152
}
221,657✔
153

154
std::shared_ptr<TCPConnectionToBackend> IncomingTCPConnectionState::getDownstreamConnection(std::shared_ptr<DownstreamState>& backend, const std::unique_ptr<std::vector<ProxyProtocolValue>>& tlvs, const struct timeval& now)
155
{
22,644✔
156
  auto [downstream, tlvsMismatch] = getOwnedDownstreamConnection(d_ownedConnectionsToBackend, backend, tlvs);
22,644✔
157

158
  if (!downstream) {
22,644✔
159
    if (backend->d_config.useProxyProtocol && tlvsMismatch) {
22,612✔
160
      clearOwnedDownstreamConnections(backend);
5✔
161
    }
5✔
162

163
    /* we don't have a connection to this backend owned yet, let's get one (it might not be a fresh one, though) */
164
    downstream = t_downstreamTCPConnectionsManager.getConnectionToDownstream(d_threadData.mplexer, backend, now, std::string());
22,612✔
165
    // if we had an existing connection but the TLVs are different, they are likely unique per query so do not bother keeping the connection
166
    // around
167
    if (backend->d_config.useProxyProtocol && !tlvsMismatch) {
22,612✔
168
      registerOwnedDownstreamConnection(downstream);
21✔
169
    }
21✔
170
  }
22,612✔
171

172
  return downstream;
22,644✔
173
}
22,644✔
174

175
// Forward declaration of the TCP worker thread entry point, needed because addTCPClientThread()
// below starts it with std::thread before its definition appears in the file.
static void tcpClientThread(pdns::channel::Receiver<ConnectionInfo>&& queryReceiver, pdns::channel::Receiver<CrossProtocolQuery>&& crossProtocolQueryReceiver, pdns::channel::Receiver<TCPCrossProtocolResponse>&& crossProtocolResponseReceiver, pdns::channel::Sender<TCPCrossProtocolResponse>&& crossProtocolResponseSender, std::vector<ClientState*> tcpAcceptStates);
176

177
TCPClientCollection::TCPClientCollection(size_t maxThreads, std::vector<ClientState*> tcpAcceptStates) :
  d_tcpclientthreads(maxThreads), d_maxthreads(maxThreads)
{
  // Try to start one worker per pre-allocated slot. addTCPClientThread() logs and skips a worker
  // it fails to create instead of throwing.
  for (size_t remaining = maxThreads; remaining != 0; --remaining) {
    addTCPClientThread(tcpAcceptStates);
  }
}
184

185
/* Creates the communication channels for a new TCP worker, starts the worker thread (detached)
   and stores the worker in the next free slot of d_tcpclientthreads.
   Failures are logged and the worker is skipped; no exception escapes this function. */
void TCPClientCollection::addTCPClientThread(std::vector<ClientState*>& tcpAcceptStates)
{
  try {
    vinfolog("Adding TCP Client thread");

    // Check for a free slot before creating anything: the channels below are backed by
    // OS resources (presumably pipes, given the configuration name — confirm) that would
    // otherwise be created only to be torn down immediately when the vector is full.
    if (d_numthreads >= d_tcpclientthreads.size()) {
      vinfolog("Adding a new TCP client thread would exceed the vector size (%d/%d), skipping. Consider increasing the maximum amount of TCP client threads with setMaxTCPClientThreads() in the configuration.", d_numthreads.load(), d_tcpclientthreads.size());
      return;
    }

    const auto internalPipeBufferSize = dnsdist::configuration::getImmutableConfiguration().d_tcpInternalPipeBufferSize;

    auto [queryChannelSender, queryChannelReceiver] = pdns::channel::createObjectQueue<ConnectionInfo>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, internalPipeBufferSize);

    auto [crossProtocolQueryChannelSender, crossProtocolQueryChannelReceiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, internalPipeBufferSize);

    auto [crossProtocolResponseChannelSender, crossProtocolResponseChannelReceiver] = pdns::channel::createObjectQueue<TCPCrossProtocolResponse>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, internalPipeBufferSize);

    // The collection keeps the sending ends; the receiving ends move to the worker thread.
    TCPWorkerThread worker(std::move(queryChannelSender), std::move(crossProtocolQueryChannelSender));

    try {
      std::thread clientThread(tcpClientThread, std::move(queryChannelReceiver), std::move(crossProtocolQueryChannelReceiver), std::move(crossProtocolResponseChannelReceiver), std::move(crossProtocolResponseChannelSender), tcpAcceptStates);
      clientThread.detach();
    }
    catch (const std::runtime_error& e) {
      // std::thread reports failures with std::system_error, which derives from std::runtime_error
      errlog("Error creating a TCP thread: %s", e.what());
      return;
    }

    d_tcpclientthreads.at(d_numthreads) = std::move(worker);
    ++d_numthreads;
  }
  catch (const std::exception& e) {
    errlog("Error creating TCP worker: %s", e.what());
  }
}
221

222
// Global owner of the TCP worker thread collection (TCPClientCollection).
// NOTE(review): its creation site is not visible in this chunk — presumably set once at startup, confirm.
std::unique_ptr<TCPClientCollection> g_tcpclientthreads;
223

224
/* Sends queued responses in order while the connection stays active.
   Returns the I/O state of the first write that did not complete; otherwise leaves the
   connection idle and returns IOState::Done. */
static IOState sendQueuedResponses(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now)
{
  auto& pending = state->d_queuedResponses;

  while (state->active() && !pending.empty()) {
    DEBUGLOG("queue size is " << pending.size() << ", sending the next one");
    TCPResponse next = std::move(pending.front());
    pending.pop_front();

    state->d_state = IncomingTCPConnectionState::State::idle;
    const IOState written = state->sendResponse(now, std::move(next));
    if (written != IOState::Done) {
      // blocked (or async): the caller has to wait before sending more
      return written;
    }
  }

  state->d_state = IncomingTCPConnectionState::State::idle;
  return IOState::Done;
}
242

243
/* Bookkeeping once a response has been fully written to the client: decrements the in-flight
   query count, logs the answer and records it via ::handleResponseSent(), then drops the buffer
   and the reference to the backend connection.
   AXFR/IXFR responses are skipped entirely. */
void IncomingTCPConnectionState::handleResponseSent(TCPResponse& currentResponse, size_t sentBytes)
{
  const auto& ids = currentResponse.d_idstate;
  if (ids.qtype == QType::AXFR || ids.qtype == QType::IXFR) {
    return;
  }

  --d_currentQueriesCount;

  const auto& backend = currentResponse.d_connection ? currentResponse.d_connection->getDS() : currentResponse.d_ds;
  if (ids.selfGenerated || !backend) {
    // no backend involved: no latency to report and no backend address
    ::handleResponseSent(ids, 0., d_ci.remote, ComboAddress(), static_cast<unsigned int>(currentResponse.d_buffer.size()), currentResponse.d_cleartextDH, ids.protocol, false);
  }
  else {
    const double elapsedUs = ids.queryRealTime.udiff();
    vinfolog("Got answer from %s, relayed to %s (%s, %d bytes), took %f us", backend->d_config.remote.toStringWithPort(), ids.origRemote.toStringWithPort(), getProtocol().toString(), sentBytes, elapsedUs);

    // a UDP backend that was actually reached over TCP is accounted as TCP
    auto backendProtocol = backend->getProtocol();
    if (backendProtocol == dnsdist::Protocol::DoUDP && !ids.forwardedOverUDP) {
      backendProtocol = dnsdist::Protocol::DoTCP;
    }
    ::handleResponseSent(ids, elapsedUs, d_ci.remote, backend->d_config.remote, static_cast<unsigned int>(sentBytes), currentResponse.d_cleartextDH, backendProtocol, true);
  }

  currentResponse.d_buffer.clear();
  currentResponse.d_connection.reset();
}
271

272
/* Inserts the two-byte, network-order DNS-over-TCP length field (RFC 1035 section 4.2.2) right
   after the first `proxyProtocolPayloadSize` bytes of `buffer` (presumably an already-serialized
   proxy protocol header; 0 when there is none), i.e. in front of the DNS payload.
   Throws std::runtime_error when the buffer holds no DNS payload past that prefix, or when the
   payload is too large to be described by a 16-bit length field. */
static void prependSizeToTCPQuery(PacketBuffer& buffer, size_t proxyProtocolPayloadSize)
{
  if (buffer.size() <= proxyProtocolPayloadSize) {
    throw std::runtime_error("The buffer size is smaller or equal to the proxy protocol payload size");
  }

  // Subtracting a zero-sized prefix is a no-op, so no special case is needed.
  const size_t payloadSize = buffer.size() - proxyProtocolPayloadSize;
  // The length field is only 16 bits wide: silently truncating a larger size would make the
  // receiver misparse the TCP stream, so refuse instead.
  constexpr size_t maxPayloadSize = 65535U;
  if (payloadSize > maxPayloadSize) {
    throw std::runtime_error("The DNS payload is too large to be sent over TCP");
  }

  const auto queryLen = static_cast<uint16_t>(payloadSize);
  const std::array<uint8_t, 2> sizeBytes{static_cast<uint8_t>(queryLen / 256), static_cast<uint8_t>(queryLen % 256)};
  /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes
     that could occur if we had to deal with the size during the processing,
     especially alignment issues */
  buffer.insert(buffer.begin() + static_cast<PacketBuffer::iterator::difference_type>(proxyProtocolPayloadSize), sizeBytes.begin(), sizeBytes.end());
}
285

286
bool IncomingTCPConnectionState::canAcceptNewQueries(const struct timeval& now)
287
{
46,167✔
288
  if (d_hadErrors) {
46,167✔
289
    DEBUGLOG("not accepting new queries because we encountered some error during the processing already");
2✔
290
    return false;
2✔
291
  }
2✔
292

293
  if (isNearTCPLimits()) {
46,165✔
294
    d_ci.d_restricted = true;
4✔
295
    DEBUGLOG("not accepting new queries because we already near our TCP limits");
4✔
296
    return false;
4✔
297
  }
4✔
298

299
  // for DoH, this is already handled by the underlying library
300
  if (!d_ci.cs->dohFrontend && d_currentQueriesCount >= d_ci.cs->d_maxInFlightQueriesPerConn) {
46,161✔
301
    DEBUGLOG("not accepting new queries because we already have " << d_currentQueriesCount << " out of " << d_ci.cs->d_maxInFlightQueriesPerConn);
2,792✔
302
    return false;
2,792✔
303
  }
2,792✔
304

305
  const auto& currentConfig = dnsdist::configuration::getCurrentRuntimeConfiguration();
43,369✔
306
  if (currentConfig.d_maxTCPQueriesPerConn != 0 && d_queriesCount > currentConfig.d_maxTCPQueriesPerConn) {
43,369✔
307
    vinfolog("not accepting new queries from %s because it reached the maximum number of queries per conn (%d / %d)", d_ci.remote.toStringWithPort(), d_queriesCount, currentConfig.d_maxTCPQueriesPerConn);
6!
308
    return false;
6✔
309
  }
6✔
310

311
  if (maxConnectionDurationReached(currentConfig.d_maxTCPConnectionDuration, now)) {
43,363!
312
    vinfolog("not accepting new queries from %s because it reached the maximum TCP connection duration", d_ci.remote.toStringWithPort());
×
313
    return false;
×
314
  }
×
315

316
  return true;
43,363✔
317
}
43,363✔
318

319
void IncomingTCPConnectionState::resetForNewQuery()
320
{
43,363✔
321
  d_buffer.clear();
43,363✔
322
  d_currentPos = 0;
43,363✔
323
  d_querySize = 0;
43,363✔
324
  d_state = State::waitingForQuery;
43,363✔
325
  d_readIOsTotal += d_readIOsCurrentQuery;
43,363✔
326
  d_readIOsCurrentQuery = 0;
43,363✔
327
}
43,363✔
328

329
/* Computes the deadline for the next read from the client, or boost::none when neither a
   maximum connection duration nor a read timeout applies.
   The deadline is the earlier of the remaining connection duration and the read timeout.
   When the connection is near the TCP limits, the duration is capped at 5 seconds and the read
   timeout is forced to 500 milliseconds. */
boost::optional<timeval> IncomingTCPConnectionState::getClientReadTTD(timeval now) const
{
  const auto& runtimeConfiguration = dnsdist::configuration::getCurrentRuntimeConfiguration();
  // Evaluate isNearTCPLimits() once: it re-reads the configuration and atomic connection
  // counters and consults the per-client accounting. Its answer can change from one call to
  // the next, so all decisions below must share a single snapshot.
  const bool nearLimits = isNearTCPLimits();
  if (!nearLimits && runtimeConfiguration.d_maxTCPConnectionDuration == 0 && runtimeConfiguration.d_tcpRecvTimeout == 0) {
    return boost::none;
  }

  size_t maxTCPConnectionDuration = runtimeConfiguration.d_maxTCPConnectionDuration;
  uint16_t tcpRecvTimeout = runtimeConfiguration.d_tcpRecvTimeout;
  uint32_t tcpRecvTimeoutUsec = 0U;
  if (nearLimits) {
    constexpr size_t maxTCPConnectionDurationNearLimits = 5U;
    constexpr uint32_t tcpRecvTimeoutUsecNearLimits = 500U * 1000U;
    maxTCPConnectionDuration = runtimeConfiguration.d_maxTCPConnectionDuration != 0 ? std::min(runtimeConfiguration.d_maxTCPConnectionDuration, maxTCPConnectionDurationNearLimits) : maxTCPConnectionDurationNearLimits;
    tcpRecvTimeout = 0;
    tcpRecvTimeoutUsec = tcpRecvTimeoutUsecNearLimits;
  }

  if (maxTCPConnectionDuration > 0) {
    auto elapsed = now.tv_sec - d_connectionStartTime.tv_sec;
    if (elapsed < 0 || (static_cast<size_t>(elapsed) >= maxTCPConnectionDuration)) {
      // duration exhausted, or the clock went backwards: expire right away
      return now;
    }
    auto remaining = maxTCPConnectionDuration - elapsed;
    if (!nearLimits && (runtimeConfiguration.d_tcpRecvTimeout == 0 || remaining <= static_cast<size_t>(runtimeConfiguration.d_tcpRecvTimeout))) {
      // the connection-duration deadline comes first
      now.tv_sec += static_cast<time_t>(remaining);
      return now;
    }
  }

  now.tv_sec += static_cast<time_t>(tcpRecvTimeout);
  now.tv_usec += tcpRecvTimeoutUsec;
  normalizeTV(now);
  return now;
}
364

365
/* Computes the deadline for writing to the client: the earlier of the remaining maximum
   connection duration and the send timeout, or boost::none when neither is configured. */
boost::optional<timeval> IncomingTCPConnectionState::getClientWriteTTD(const timeval& now) const
{
  const auto& runtimeConfiguration = dnsdist::configuration::getCurrentRuntimeConfiguration();
  const auto maxDuration = runtimeConfiguration.d_maxTCPConnectionDuration;
  const auto sendTimeout = runtimeConfiguration.d_tcpSendTimeout;

  if (maxDuration == 0 && sendTimeout == 0) {
    return boost::none;
  }

  timeval deadline(now);

  if (maxDuration > 0) {
    const auto elapsed = deadline.tv_sec - d_connectionStartTime.tv_sec;
    if (elapsed < 0 || static_cast<size_t>(elapsed) >= maxDuration) {
      // duration exhausted, or the clock went backwards: expire right away
      return deadline;
    }
    const auto remaining = maxDuration - elapsed;
    if (sendTimeout == 0 || remaining <= static_cast<size_t>(sendTimeout)) {
      deadline.tv_sec += static_cast<time_t>(remaining);
      return deadline;
    }
  }

  deadline.tv_sec += static_cast<time_t>(sendTimeout);
  return deadline;
}
389

390
bool IncomingTCPConnectionState::maxConnectionDurationReached(unsigned int maxConnectionDuration, const timeval& now) const
391
{
92,959✔
392
  if (maxConnectionDuration > 0) {
92,959✔
393
    time_t curtime = now.tv_sec;
788✔
394
    unsigned int elapsed = 0;
788✔
395
    if (curtime > d_connectionStartTime.tv_sec) { // To prevent issues when time goes backward
788✔
396
      elapsed = curtime - d_connectionStartTime.tv_sec;
183✔
397
    }
183✔
398
    if (elapsed >= maxConnectionDuration) {
788✔
399
      return true;
1✔
400
    }
1✔
401
  }
788✔
402

403
  return false;
92,958✔
404
}
92,959✔
405

406
void IncomingTCPConnectionState::registerOwnedDownstreamConnection(std::shared_ptr<TCPConnectionToBackend>& conn)
407
{
21✔
408
  const auto& downstream = conn->getDS();
21✔
409
  auto& queue = d_ownedConnectionsToBackend[downstream];
21✔
410
  // how many proxy-protocol enabled connections do we want to keep around?
411
  // - they are only usable for this incoming connection because of the proxy protocol header containing the source and destination addresses and ports
412
  // - if we have TLV values, and they are unique per query, keeping these is useless
413
  // - if there is no, or identical, TLV values, a single outgoing connection is enough if maxInFlight == 1, or if incoming maxInFlight == outgoing maxInFlight
414
  // so it makes sense to keep a few of them around if incoming maxInFlight is greater than outgoing maxInFlight
415

416
  auto incomingMaxInFlightQueriesPerConn = d_ci.cs->d_maxInFlightQueriesPerConn;
21✔
417
  incomingMaxInFlightQueriesPerConn = std::max(incomingMaxInFlightQueriesPerConn, static_cast<size_t>(1U));
21✔
418
  auto outgoingMaxInFlightQueriesPerConn = downstream->d_config.d_maxInFlightQueriesPerConn;
21✔
419
  outgoingMaxInFlightQueriesPerConn = std::max(outgoingMaxInFlightQueriesPerConn, static_cast<size_t>(1U));
21✔
420
  size_t maxCachedOutgoingConnections = std::min(static_cast<size_t>(incomingMaxInFlightQueriesPerConn / outgoingMaxInFlightQueriesPerConn), static_cast<size_t>(5U));
21✔
421

422
  queue.push_front(conn);
21✔
423
  if (queue.size() > maxCachedOutgoingConnections) {
21!
424
    queue.pop_back();
×
425
  }
×
426
}
21✔
427

428
void IncomingTCPConnectionState::clearOwnedDownstreamConnections(const std::shared_ptr<DownstreamState>& downstream)
429
{
5✔
430
  d_ownedConnectionsToBackend.erase(downstream);
5✔
431
}
5✔
432

433
/* called when the buffer has been set and the rules have been processed, and only from handleIO (sometimes indirectly via handleQuery) */
434
IOState IncomingTCPConnectionState::sendResponse(const struct timeval& now, TCPResponse&& response)
435
{
43,525✔
436
  (void)now;
43,525✔
437
  d_state = State::sendingResponse;
43,525✔
438

439
  const auto responseSize = static_cast<uint16_t>(response.d_buffer.size());
43,525✔
440
  const std::array<uint8_t, 2> sizeBytes{static_cast<uint8_t>(responseSize / 256), static_cast<uint8_t>(responseSize % 256)};
43,525✔
441
  /* prepend the size. Yes, this is not the most efficient way but it prevents mistakes
442
     that could occur if we had to deal with the size during the processing,
443
     especially alignment issues */
444
  response.d_buffer.insert(response.d_buffer.begin(), sizeBytes.begin(), sizeBytes.end());
43,525✔
445
  d_currentPos = 0;
43,525✔
446
  d_currentResponse = std::move(response);
43,525✔
447

448
  try {
43,525✔
449
    auto iostate = d_handler.tryWrite(d_currentResponse.d_buffer, d_currentPos, d_currentResponse.d_buffer.size());
43,525✔
450
    if (iostate == IOState::Done) {
43,525✔
451
      DEBUGLOG("response sent from " << __PRETTY_FUNCTION__);
43,506✔
452
      handleResponseSent(d_currentResponse, d_currentResponse.d_buffer.size());
43,506✔
453
      return iostate;
43,506✔
454
    }
43,506✔
455
    d_lastIOBlocked = true;
19✔
456
    DEBUGLOG("partial write");
19✔
457
    return iostate;
19✔
458
  }
43,525✔
459
  catch (const std::exception& e) {
43,525✔
460
    vinfolog("Closing TCP client connection with %s: %s", d_ci.remote.toStringWithPort(), e.what());
4!
461
    DEBUGLOG("Closing TCP client connection: " << e.what());
4✔
462
    ++d_ci.cs->tcpDiedSendingResponse;
4✔
463

464
    terminateClientConnection();
4✔
465

466
    return IOState::Done;
4✔
467
  }
4✔
468
}
43,525✔
469

470
void IncomingTCPConnectionState::terminateClientConnection()
471
{
2,509✔
472
  DEBUGLOG("terminating client connection");
2,509✔
473
  d_queuedResponses.clear();
2,509✔
474
  /* we have already released idle connections that could be reused,
475
     we don't care about the ones still waiting for responses */
476
  for (auto& backend : d_ownedConnectionsToBackend) {
2,509✔
477
    for (auto& conn : backend.second) {
15✔
478
      conn->release(true);
15✔
479
    }
15✔
480
  }
15✔
481
  d_ownedConnectionsToBackend.clear();
2,509✔
482

483
  /* meaning we will no longer be 'active' when the backend
484
     response or timeout comes in */
485
  d_ioState.reset();
2,509✔
486

487
  /* if we do have remaining async descriptors associated with this TLS
488
     connection, we need to defer the destruction of the TLS object until
489
     the engine has reported back, otherwise we have a use-after-free.. */
490
  auto afds = d_handler.getAsyncFDs();
2,509✔
491
  if (afds.empty()) {
2,509!
492
    d_handler.close();
2,509✔
493
  }
2,509✔
494
  else {
×
495
    /* we might already be waiting, but we might also not because sometimes we have already been
496
       notified via the descriptor, not received Async again, but the async job still exists.. */
497
    auto state = shared_from_this();
×
498
    for (const auto desc : afds) {
×
499
      try {
×
500
        state->d_threadData.mplexer->addReadFD(desc, handleAsyncReady, state);
×
501
      }
×
502
      catch (...) {
×
503
      }
×
504
    }
×
505
  }
×
506
}
2,509✔
507

508
void IncomingTCPConnectionState::queueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response, bool fromBackend)
509
{
43,669✔
510
  // queue response
511
  state->d_queuedResponses.emplace_back(std::move(response));
43,669✔
512
  DEBUGLOG("queueing response, state is " << (int)state->d_state << ", queue size is now " << state->d_queuedResponses.size());
43,669✔
513

514
  // when the response comes from a backend, there is a real possibility that we are currently
515
  // idle, and thus not trying to send the response right away would make our ref count go to 0.
516
  // Even if we are waiting for a query, we will not wake up before the new query arrives or a
517
  // timeout occurs
518
  if (state->d_state == State::idle || state->d_state == State::waitingForQuery) {
43,669✔
519
    auto iostate = sendQueuedResponses(state, now);
43,417✔
520

521
    if (iostate == IOState::Done && state->active()) {
43,417✔
522
      if (state->canAcceptNewQueries(now)) {
43,401✔
523
        state->resetForNewQuery();
43,146✔
524
        state->d_state = State::waitingForQuery;
43,146✔
525
        iostate = IOState::NeedRead;
43,146✔
526
      }
43,146✔
527
      else {
255✔
528
        state->d_state = State::idle;
255✔
529
      }
255✔
530
    }
43,401✔
531

532
    // for the same reason we need to update the state right away, nobody will do that for us
533
    if (state->active()) {
43,417✔
534
      state->updateIO(iostate, now);
43,413✔
535
      // if we have not finished reading every available byte, we _need_ to do an actual read
536
      // attempt before waiting for the socket to become readable again, because if there is
537
      // buffered data available the socket might never become readable again.
538
      // This is true as soon as we deal with TLS because TLS records are processed one by
539
      // one and might not match what we see at the application layer, so data might already
540
      // be available in the TLS library's buffers. This is especially true when OpenSSL's
541
      // read-ahead mode is enabled because then it buffers even more than one SSL record
542
      // for performance reasons.
543
      if (fromBackend && !state->d_lastIOBlocked) {
43,413✔
544
        state->handleIO();
22,803✔
545
      }
22,803✔
546
    }
43,413✔
547
  }
43,417✔
548
}
43,669✔
549

550
void IncomingTCPConnectionState::handleAsyncReady([[maybe_unused]] int desc, FDMultiplexer::funcparam_t& param)
551
{
×
552
  auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
×
553

554
  /* If we are here, the async jobs for this SSL* are finished
555
     so we should be able to remove all FDs */
556
  auto afds = state->d_handler.getAsyncFDs();
×
557
  for (const auto afd : afds) {
×
558
    try {
×
559
      state->d_threadData.mplexer->removeReadFD(afd);
×
560
    }
×
561
    catch (...) {
×
562
    }
×
563
  }
×
564

565
  if (state->active()) {
×
566
    /* and now we restart our own I/O state machine */
567
    state->handleIO();
×
568
  }
×
569
  else {
×
570
    /* we were only waiting for the engine to come back,
571
       to prevent a use-after-free */
572
    state->d_handler.close();
×
573
  }
×
574
}
×
575

576
void IncomingTCPConnectionState::updateIOForAsync(std::shared_ptr<IncomingTCPConnectionState>& conn)
577
{
×
578
  auto fds = conn->d_handler.getAsyncFDs();
×
579
  for (const auto desc : fds) {
×
580
    conn->d_threadData.mplexer->addReadFD(desc, handleAsyncReady, conn);
×
581
  }
×
582
  conn->d_ioState->update(IOState::Done, handleIOCallback, conn);
×
583
}
×
584

585
/* Moves the client-side I/O state machine to `newState`, with the write or read deadline
   matching that state. Async states are routed to updateIOForAsync(). */
void IncomingTCPConnectionState::updateIO(IOState newState, const struct timeval& now)
{
  auto self = shared_from_this();
  if (newState == IOState::Async) {
    updateIOForAsync(self);
    return;
  }

  const boost::optional<timeval> deadline = (newState == IOState::NeedWrite) ? getClientWriteTTD(now) : getClientReadTTD(now);
  d_ioState->update(newState, handleIOCallback, self, deadline);
}
595

596
/* called from the backend code when a new response has been received */
597
void IncomingTCPConnectionState::handleResponse(const struct timeval& now, TCPResponse&& response)
598
{
22,962✔
599
  if (std::this_thread::get_id() != d_creatorThreadID) {
22,962✔
600
    handleCrossProtocolResponse(now, std::move(response));
133✔
601
    return;
133✔
602
  }
133✔
603

604
  std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this();
22,829✔
605

606
  if (!response.isAsync() && response.d_connection && response.d_connection->getDS() && response.d_connection->getDS()->d_config.useProxyProtocol) {
22,829!
607
    // if we have added a TCP Proxy Protocol payload to a connection, don't release it to the general pool as no one else will be able to use it anyway
608
    if (!response.d_connection->willBeReusable(true)) {
51!
609
      // if it can't be reused even by us, well
610
      const auto connIt = state->d_ownedConnectionsToBackend.find(response.d_connection->getDS());
×
611
      if (connIt != state->d_ownedConnectionsToBackend.end()) {
×
612
        auto& list = connIt->second;
×
613

614
        for (auto it = list.begin(); it != list.end(); ++it) {
×
615
          if (*it == response.d_connection) {
×
616
            try {
×
617
              response.d_connection->release(true);
×
618
            }
×
619
            catch (const std::exception& e) {
×
620
              vinfolog("Error releasing connection: %s", e.what());
×
621
            }
×
622
            list.erase(it);
×
623
            break;
×
624
          }
×
625
        }
×
626
      }
×
627
    }
×
628
  }
51✔
629

630
  if (response.d_buffer.size() < sizeof(dnsheader)) {
22,829✔
631
    state->terminateClientConnection();
2✔
632
    return;
2✔
633
  }
2✔
634

635
  if (!response.isAsync()) {
22,827✔
636
    try {
22,743✔
637
      auto& ids = response.d_idstate;
22,743✔
638
      std::shared_ptr<DownstreamState> backend = response.d_ds ? response.d_ds : (response.d_connection ? response.d_connection->getDS() : nullptr);
22,743!
639
      if (backend == nullptr || !responseContentMatches(response.d_buffer, ids.qname, ids.qtype, ids.qclass, backend, dnsdist::configuration::getCurrentRuntimeConfiguration().d_allowEmptyResponse)) {
22,743!
640
        state->terminateClientConnection();
3✔
641
        return;
3✔
642
      }
3✔
643

644
      if (backend != nullptr) {
22,740!
645
        ++backend->responses;
22,740✔
646
      }
22,740✔
647

648
      DNSResponse dnsResponse(ids, response.d_buffer, backend);
22,740✔
649
      dnsResponse.d_incomingTCPState = state;
22,740✔
650

651
      memcpy(&response.d_cleartextDH, dnsResponse.getHeader().get(), sizeof(response.d_cleartextDH));
22,740✔
652

653
      if (!processResponse(response.d_buffer, dnsResponse, false)) {
22,740✔
654
        state->terminateClientConnection();
6✔
655
        return;
6✔
656
      }
6✔
657

658
      if (dnsResponse.isAsynchronous()) {
22,734✔
659
        /* we are done for now */
660
        return;
80✔
661
      }
80✔
662
    }
22,734✔
663
    catch (const std::exception& e) {
22,743✔
664
      vinfolog("Unexpected exception while handling response from backend: %s", e.what());
4!
665
      state->terminateClientConnection();
4✔
666
      return;
4✔
667
    }
4✔
668
  }
22,743✔
669

670
  ++dnsdist::metrics::g_stats.responses;
22,734✔
671
  ++state->d_ci.cs->responses;
22,734✔
672

673
  queueResponse(state, now, std::move(response), true);
22,734✔
674
}
22,734✔
675

676
struct TCPCrossProtocolResponse
677
{
678
  TCPCrossProtocolResponse(TCPResponse&& response, std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now) :
679
    d_response(std::move(response)), d_state(state), d_now(now)
284✔
680
  {
284✔
681
  }
284✔
682
  TCPCrossProtocolResponse(const TCPCrossProtocolResponse&) = delete;
683
  TCPCrossProtocolResponse& operator=(const TCPCrossProtocolResponse&) = delete;
684
  TCPCrossProtocolResponse(TCPCrossProtocolResponse&&) = delete;
685
  TCPCrossProtocolResponse& operator=(TCPCrossProtocolResponse&&) = delete;
686
  ~TCPCrossProtocolResponse() = default;
284✔
687

688
  TCPResponse d_response;
689
  std::shared_ptr<IncomingTCPConnectionState> d_state;
690
  struct timeval d_now;
691
};
692

693
class TCPCrossProtocolQuery : public CrossProtocolQuery
694
{
695
public:
696
  TCPCrossProtocolQuery(PacketBuffer&& buffer, InternalQueryState&& ids, std::shared_ptr<DownstreamState> backend, std::shared_ptr<IncomingTCPConnectionState> sender) :
697
    CrossProtocolQuery(InternalQuery(std::move(buffer), std::move(ids)), backend), d_sender(std::move(sender))
231✔
698
  {
231✔
699
  }
231✔
700
  TCPCrossProtocolQuery(const TCPCrossProtocolQuery&) = delete;
701
  TCPCrossProtocolQuery& operator=(const TCPCrossProtocolQuery&) = delete;
702
  TCPCrossProtocolQuery(TCPCrossProtocolQuery&&) = delete;
703
  TCPCrossProtocolQuery& operator=(TCPCrossProtocolQuery&&) = delete;
704
  ~TCPCrossProtocolQuery() override = default;
231✔
705

706
  std::shared_ptr<TCPQuerySender> getTCPQuerySender() override
707
  {
208✔
708
    return d_sender;
208✔
709
  }
208✔
710

711
  DNSQuestion getDQ() override
712
  {
178✔
713
    auto& ids = query.d_idstate;
178✔
714
    DNSQuestion dnsQuestion(ids, query.d_buffer);
178✔
715
    dnsQuestion.d_incomingTCPState = d_sender;
178✔
716
    return dnsQuestion;
178✔
717
  }
178✔
718

719
  DNSResponse getDR() override
720
  {
72✔
721
    auto& ids = query.d_idstate;
72✔
722
    DNSResponse dnsResponse(ids, query.d_buffer, downstream);
72✔
723
    dnsResponse.d_incomingTCPState = d_sender;
72✔
724
    return dnsResponse;
72✔
725
  }
72✔
726

727
private:
728
  std::shared_ptr<IncomingTCPConnectionState> d_sender;
729
};
730

731
/* Wrap a query and its state into a TCPCrossProtocolQuery whose sender is this
   connection (the query object holds a shared_ptr to it). */
std::unique_ptr<CrossProtocolQuery> IncomingTCPConnectionState::getCrossProtocolQuery(PacketBuffer&& query, InternalQueryState&& state, const std::shared_ptr<DownstreamState>& backend)
{
  auto self = shared_from_this();
  return std::make_unique<TCPCrossProtocolQuery>(std::move(query), std::move(state), backend, std::move(self));
}
4✔
735

736
/* Build a TCPCrossProtocolQuery out of a DNSQuestion that came in over TCP.
   The question's buffer and internal state are moved into the new object, so
   dnsQuestion must not be relied upon afterwards.
   Throws std::runtime_error if the question has no incoming TCP state. */
std::unique_ptr<CrossProtocolQuery> getTCPCrossProtocolQueryFromDQ(DNSQuestion& dnsQuestion)
{
  auto incoming = dnsQuestion.getIncomingTCPState();
  if (!incoming) {
    throw std::runtime_error("Trying to create a TCP cross protocol query without a valid TCP state");
  }

  // record the current query ID before the buffer and the state are moved away
  const auto currentID = dnsQuestion.getHeader()->id;
  dnsQuestion.ids.origID = currentID;

  auto& buffer = dnsQuestion.getMutableData();
  return std::make_unique<TCPCrossProtocolQuery>(std::move(buffer), std::move(dnsQuestion.ids), nullptr, std::move(incoming));
}
197✔
746

747
void IncomingTCPConnectionState::handleCrossProtocolResponse(const struct timeval& now, TCPResponse&& response)
748
{
284✔
749
  std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this();
284✔
750
  try {
284✔
751
    auto ptr = std::make_unique<TCPCrossProtocolResponse>(std::move(response), state, now);
284✔
752
    if (!state->d_threadData.crossProtocolResponseSender.send(std::move(ptr))) {
284!
753
      ++dnsdist::metrics::g_stats.tcpCrossProtocolResponsePipeFull;
×
754
      vinfolog("Unable to pass a cross-protocol response to the TCP worker thread because the pipe is full");
×
755
    }
×
756
  }
284✔
757
  catch (const std::exception& e) {
284✔
758
    vinfolog("Unable to pass a cross-protocol response to the TCP worker thread because we couldn't write to the pipe: %s", stringerror());
×
759
  }
×
760
}
284✔
761

762
IncomingTCPConnectionState::QueryProcessingResult IncomingTCPConnectionState::handleQuery(PacketBuffer&& queryIn, const struct timeval& now, std::optional<int32_t> streamID)
763
{
43,384✔
764
  auto query = std::move(queryIn);
43,384✔
765
  if (query.size() < sizeof(dnsheader)) {
43,384✔
766
    ++dnsdist::metrics::g_stats.nonCompliantQueries;
4✔
767
    ++d_ci.cs->nonCompliantQueries;
4✔
768
    return QueryProcessingResult::TooSmall;
4✔
769
  }
4✔
770

771
  ++d_queriesCount;
43,380✔
772
  ++d_ci.cs->queries;
43,380✔
773
  ++dnsdist::metrics::g_stats.queries;
43,380✔
774

775
  if (d_handler.isTLS()) {
43,380✔
776
    auto tlsVersion = d_handler.getTLSVersion();
41,276✔
777
    switch (tlsVersion) {
41,276✔
778
    case LibsslTLSVersion::TLS10:
×
779
      ++d_ci.cs->tls10queries;
×
780
      break;
×
781
    case LibsslTLSVersion::TLS11:
×
782
      ++d_ci.cs->tls11queries;
×
783
      break;
×
784
    case LibsslTLSVersion::TLS12:
14✔
785
      ++d_ci.cs->tls12queries;
14✔
786
      break;
14✔
787
    case LibsslTLSVersion::TLS13:
41,262✔
788
      ++d_ci.cs->tls13queries;
41,262✔
789
      break;
41,262✔
790
    default:
×
791
      ++d_ci.cs->tlsUnknownqueries;
×
792
    }
41,276✔
793
  }
41,276✔
794

795
  auto state = shared_from_this();
43,380✔
796
  InternalQueryState ids;
43,380✔
797
  ids.origDest = d_proxiedDestination;
43,380✔
798
  ids.origRemote = d_proxiedRemote;
43,380✔
799
  ids.cs = d_ci.cs;
43,380✔
800
  ids.queryRealTime.start();
43,380✔
801
  if (streamID) {
43,380✔
802
    ids.d_streamID = *streamID;
161✔
803
  }
161✔
804

805
  auto dnsCryptResponse = checkDNSCryptQuery(*d_ci.cs, query, ids.dnsCryptQuery, ids.queryRealTime.d_start.tv_sec, true);
43,380✔
806
  if (dnsCryptResponse) {
43,380!
807
    TCPResponse response;
×
808
    d_state = State::idle;
×
809
    ++d_currentQueriesCount;
×
810
    queueResponse(state, now, std::move(response), false);
×
811
    return QueryProcessingResult::SelfAnswered;
×
812
  }
×
813

814
  {
43,380✔
815
    /* this pointer will be invalidated the second the buffer is resized, don't hold onto it! */
816
    const dnsheader_aligned dnsHeader(query.data());
43,380✔
817
    if (!checkQueryHeaders(*dnsHeader, *d_ci.cs)) {
43,380✔
818
      return QueryProcessingResult::InvalidHeaders;
5✔
819
    }
5✔
820

821
    if (dnsHeader->qdcount == 0) {
43,375✔
822
      TCPResponse response;
3✔
823
      auto queryID = dnsHeader->id;
3✔
824
      dnsdist::PacketMangling::editDNSHeaderFromPacket(query, [](dnsheader& header) {
3✔
825
        header.rcode = RCode::NotImp;
3✔
826
        header.qr = true;
3✔
827
        return true;
3✔
828
      });
3✔
829
      response.d_idstate = std::move(ids);
3✔
830
      response.d_idstate.origID = queryID;
3✔
831
      response.d_idstate.selfGenerated = true;
3✔
832
      response.d_buffer = std::move(query);
3✔
833
      d_state = State::idle;
3✔
834
      ++d_currentQueriesCount;
3✔
835
      queueResponse(state, now, std::move(response), false);
3✔
836
      return QueryProcessingResult::SelfAnswered;
3✔
837
    }
3✔
838
  }
43,375✔
839

840
  pdns::trace::dnsdist::Tracer::Closer closer;
43,372✔
841
  if (auto tracer = ids.getTracer(); tracer != nullptr) {
43,372✔
842
    // TODO: figure out if this is a root span
843
    closer = tracer->openSpan("IncomingTCPConnectionState::handleQuery", tracer->getLastSpanID());
4✔
844
  }
4✔
845

846
  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast
847
  ids.qname = DNSName(reinterpret_cast<const char*>(query.data()), static_cast<int>(query.size()), sizeof(dnsheader), false, &ids.qtype, &ids.qclass);
43,372✔
848
  ids.protocol = getProtocol();
43,372✔
849
  if (ids.dnsCryptQuery) {
43,372✔
850
    ids.protocol = dnsdist::Protocol::DNSCryptTCP;
16✔
851
  }
16✔
852

853
  DNSQuestion dnsQuestion(ids, query);
43,372✔
854
  dnsdist::PacketMangling::editDNSHeaderFromPacket(dnsQuestion.getMutableData(), [&ids](dnsheader& header) {
43,372✔
855
    const uint16_t* flags = getFlagsFromDNSHeader(&header);
43,370✔
856
    ids.origFlags = *flags;
43,370✔
857
    return true;
43,370✔
858
  });
43,370✔
859
  dnsQuestion.d_incomingTCPState = state;
43,372✔
860
  dnsQuestion.sni = d_handler.getServerNameIndication();
43,372✔
861

862
  if (d_proxyProtocolValues) {
43,372✔
863
    /* we need to copy them, because the next queries received on that connection will
864
       need to get the _unaltered_ values */
865
    dnsQuestion.proxyProtocolValues = make_unique<std::vector<ProxyProtocolValue>>(*d_proxyProtocolValues);
34✔
866
  }
34✔
867

868
  if (dnsQuestion.ids.qtype == QType::AXFR || dnsQuestion.ids.qtype == QType::IXFR) {
43,372✔
869
    dnsQuestion.ids.skipCache = true;
27✔
870
  }
27✔
871

872
  if (forwardViaUDPFirst()) {
43,372✔
873
    // if there was no EDNS, we add it with a large buffer size
874
    // so we can use UDP to talk to the backend.
875
    const dnsheader_aligned dnsHeader(query.data());
155✔
876
    if (dnsHeader->arcount == 0U) {
155✔
877
      if (addEDNS(query, 4096, false, 4096, 0)) {
147!
878
        dnsQuestion.ids.ednsAdded = true;
147✔
879
      }
147✔
880
    }
147✔
881
  }
155✔
882

883
  if (streamID) {
43,372✔
884
    auto unit = getDOHUnit(*streamID);
155✔
885
    if (unit) {
155!
886
      dnsQuestion.ids.du = std::move(unit);
155✔
887
    }
155✔
888
  }
155✔
889

890
  std::shared_ptr<DownstreamState> backend;
43,372✔
891
  auto result = processQuery(dnsQuestion, backend);
43,372✔
892

893
  if (result == ProcessQueryResult::Asynchronous) {
43,372✔
894
    /* we are done for now */
895
    ++d_currentQueriesCount;
108✔
896
    return QueryProcessingResult::Asynchronous;
108✔
897
  }
108✔
898

899
  if (streamID) {
43,264✔
900
    restoreDOHUnit(std::move(dnsQuestion.ids.du));
119✔
901
  }
119✔
902

903
  if (result == ProcessQueryResult::Drop) {
43,264✔
904
    return QueryProcessingResult::Dropped;
39✔
905
  }
39✔
906

907
  // the buffer might have been invalidated by now
908
  uint16_t queryID{0};
43,225✔
909
  {
43,225✔
910
    const auto dnsHeader = dnsQuestion.getHeader();
43,225✔
911
    queryID = dnsHeader->id;
43,225✔
912
  }
43,225✔
913

914
  if (result == ProcessQueryResult::SendAnswer) {
43,225✔
915
    TCPResponse response;
20,483✔
916
    {
20,483✔
917
      const auto dnsHeader = dnsQuestion.getHeader();
20,483✔
918
      memcpy(&response.d_cleartextDH, dnsHeader.get(), sizeof(response.d_cleartextDH));
20,483✔
919
    }
20,483✔
920
    response.d_idstate = std::move(ids);
20,483✔
921
    response.d_idstate.origID = queryID;
20,483✔
922
    response.d_idstate.selfGenerated = true;
20,483✔
923
    response.d_idstate.cs = d_ci.cs;
20,483✔
924
    response.d_buffer = std::move(query);
20,483✔
925

926
    d_state = State::idle;
20,483✔
927
    ++d_currentQueriesCount;
20,483✔
928
    queueResponse(state, now, std::move(response), false);
20,483✔
929
    return QueryProcessingResult::SelfAnswered;
20,483✔
930
  }
20,483✔
931

932
  if (result != ProcessQueryResult::PassToBackend || backend == nullptr) {
22,742!
933
    return QueryProcessingResult::NoBackend;
×
934
  }
×
935

936
  dnsQuestion.ids.origID = queryID;
22,742✔
937

938
  ++d_currentQueriesCount;
22,742✔
939

940
  std::string proxyProtocolPayload;
22,742✔
941
  if (backend->isDoH()) {
22,742✔
942
    vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", ids.qname.toLogString(), QType(ids.qtype).toString(), d_proxiedRemote.toStringWithPort(), getProtocol().toString(), query.size(), backend->getNameWithAddr());
30✔
943

944
    /* we need to do this _before_ creating the cross protocol query because
945
       after that the buffer will have been moved */
946
    if (backend->d_config.useProxyProtocol) {
30✔
947
      proxyProtocolPayload = getProxyProtocolPayload(dnsQuestion);
1✔
948
    }
1✔
949

950
    auto cpq = std::make_unique<TCPCrossProtocolQuery>(std::move(query), std::move(ids), backend, state);
30✔
951
    cpq->query.d_proxyProtocolPayload = std::move(proxyProtocolPayload);
30✔
952

953
    backend->passCrossProtocolQuery(std::move(cpq));
30✔
954
    return QueryProcessingResult::Forwarded;
30✔
955
  }
30✔
956
  if (!backend->isTCPOnly() && forwardViaUDPFirst()) {
22,712✔
957
    if (streamID) {
65!
958
      auto unit = getDOHUnit(*streamID);
65✔
959
      if (unit) {
65!
960
        dnsQuestion.ids.du = std::move(unit);
65✔
961
      }
65✔
962
    }
65✔
963
    if (assignOutgoingUDPQueryToBackend(backend, queryID, dnsQuestion, query)) {
65✔
964
      return QueryProcessingResult::Forwarded;
64✔
965
    }
64✔
966
    restoreDOHUnit(std::move(dnsQuestion.ids.du));
1✔
967
    // fallback to the normal flow
968
  }
1✔
969

970
  prependSizeToTCPQuery(query, 0);
22,648✔
971

972
  auto downstreamConnection = getDownstreamConnection(backend, dnsQuestion.proxyProtocolValues, now);
22,648✔
973

974
  if (backend->d_config.useProxyProtocol) {
22,648✔
975
    /* if we ever sent a TLV over a connection, we can never go back */
976
    if (!d_proxyProtocolPayloadHasTLV) {
58✔
977
      d_proxyProtocolPayloadHasTLV = dnsQuestion.proxyProtocolValues && !dnsQuestion.proxyProtocolValues->empty();
34!
978
    }
34✔
979

980
    proxyProtocolPayload = getProxyProtocolPayload(dnsQuestion);
58✔
981
  }
58✔
982

983
  if (dnsQuestion.proxyProtocolValues) {
22,648✔
984
    downstreamConnection->setProxyProtocolValuesSent(std::move(dnsQuestion.proxyProtocolValues));
31✔
985
  }
31✔
986

987
  TCPQuery tcpquery(std::move(query), std::move(ids));
22,648✔
988
  tcpquery.d_proxyProtocolPayload = std::move(proxyProtocolPayload);
22,648✔
989

990
  vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", tcpquery.d_idstate.qname.toLogString(), QType(tcpquery.d_idstate.qtype).toString(), d_proxiedRemote.toStringWithPort(), getProtocol().toString(), tcpquery.d_buffer.size(), backend->getNameWithAddr());
22,648✔
991
  std::shared_ptr<TCPQuerySender> incoming = state;
22,648✔
992
  downstreamConnection->queueQuery(incoming, std::move(tcpquery));
22,648✔
993
  return QueryProcessingResult::Forwarded;
22,648✔
994
}
22,712✔
995

996
void IncomingTCPConnectionState::handleIOCallback(int desc, FDMultiplexer::funcparam_t& param)
997
{
3,007✔
998
  auto conn = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
3,007✔
999
  if (desc != conn->d_handler.getDescriptor()) {
3,007!
1000
    // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-array-to-pointer-decay): __PRETTY_FUNCTION__ is fine
1001
    throw std::runtime_error("Unexpected socket descriptor " + std::to_string(desc) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->d_handler.getDescriptor()));
×
1002
  }
×
1003

1004
  conn->handleIO();
3,007✔
1005
}
3,007✔
1006

1007
void IncomingTCPConnectionState::handleHandshakeDone(const struct timeval& now)
1008
{
2,760✔
1009
  if (d_handler.isTLS()) {
2,760✔
1010
    if (!d_handler.hasTLSSessionBeenResumed()) {
666✔
1011
      ++d_ci.cs->tlsNewSessions;
430✔
1012
      dnsdist::IncomingConcurrentTCPConnectionsManager::accountTLSNewSession(d_ci.remote);
430✔
1013
    }
430✔
1014
    else {
236✔
1015
      ++d_ci.cs->tlsResumptions;
236✔
1016
      dnsdist::IncomingConcurrentTCPConnectionsManager::accountTLSResumedSession(d_ci.remote);
236✔
1017
    }
236✔
1018
    if (d_handler.getResumedFromInactiveTicketKey()) {
666✔
1019
      ++d_ci.cs->tlsInactiveTicketKey;
8✔
1020
    }
8✔
1021
    if (d_handler.getUnknownTicketKey()) {
666✔
1022
      ++d_ci.cs->tlsUnknownTicketKey;
6✔
1023
    }
6✔
1024
  }
666✔
1025

1026
  d_handshakeDoneTime = now;
2,760✔
1027
}
2,760✔
1028

1029
/* Read and parse the proxy protocol header sent by the client, as far as the
   available data allows.
   Returns:
   - Done once a complete header has been parsed and accepted; the parsed values are
     stored in d_proxiedRemote / d_proxiedDestination / d_proxyProtocolValues, and
     d_buffer, d_currentPos and d_proxyProtocolNeed are reset.
   - Error if the header is invalid or handleProxyProtocol() rejects it.
   - Reading when the loop stops without a complete header: either the read did not
     complete (d_lastIOBlocked is then set) or the connection is no longer active. */
IncomingTCPConnectionState::ProxyProtocolResult IncomingTCPConnectionState::handleProxyProtocolPayload()
{
  do {
    DEBUGLOG("reading proxy protocol header");
    auto iostate = d_handler.tryRead(d_buffer, d_currentPos, d_proxyProtocolNeed, false, isProxyPayloadOutsideTLS());
    if (iostate == IOState::Done) {
      d_buffer.resize(d_currentPos);
      /* NOTE(review): judging from the handling below, 0 means "invalid header", a
         negative value the number of bytes still missing, and a positive value a
         complete header -- confirm against isProxyHeaderComplete() */
      ssize_t remaining = isProxyHeaderComplete(d_buffer);
      if (remaining == 0) {
        vinfolog("Unable to consume proxy protocol header in packet from TCP client %s", d_ci.remote.toStringWithPort());
        ++dnsdist::metrics::g_stats.proxyProtocolInvalid;
        return ProxyProtocolResult::Error;
      }
      if (remaining < 0) {
        /* grow the amount of data we expect by what is still missing */
        d_proxyProtocolNeed += -remaining;
        d_buffer.resize(d_currentPos + d_proxyProtocolNeed);
        /* we need to keep reading, since we might have buffered data */
      }
      else {
        /* proxy header received */
        std::vector<ProxyProtocolValue> proxyProtocolValues;
        if (!handleProxyProtocol(d_ci.remote, true, dnsdist::configuration::getCurrentRuntimeConfiguration().d_ACL, d_buffer, d_proxiedRemote, d_proxiedDestination, proxyProtocolValues)) {
          vinfolog("Error handling the Proxy Protocol received from TCP client %s", d_ci.remote.toStringWithPort());
          return ProxyProtocolResult::Error;
        }

        /* only keep TLV values when there are some */
        if (!proxyProtocolValues.empty()) {
          d_proxyProtocolValues = make_unique<std::vector<ProxyProtocolValue>>(std::move(proxyProtocolValues));
        }

        /* reset the reading state so the buffer can be reused for the queries */
        d_currentPos = 0;
        d_proxyProtocolNeed = 0;
        d_buffer.clear();
        return ProxyProtocolResult::Done;
      }
    }
    else {
      /* the read could not complete right now */
      d_lastIOBlocked = true;
    }
  } while (active() && !d_lastIOBlocked);

  return ProxyProtocolResult::Reading;
}
19✔
1072

1073
/* Advance the handshake. When it is not finished yet, mark the last IO as blocked
   and return the handler's IO state. Once it completes, do the post-handshake
   bookkeeping and move on to either the proxy protocol header (expected inside the
   TLS layer for this client) or the query size. */
IOState IncomingTCPConnectionState::handleHandshake(const struct timeval& now)
{
  DEBUGLOG("doing handshake");
  const auto result = d_handler.tryHandshake();
  if (result != IOState::Done) {
    d_lastIOBlocked = true;
    return result;
  }

  DEBUGLOG("handshake done");
  handleHandshakeDone(now);

  const bool proxyHeaderInsideTLS = d_ci.cs != nullptr && d_ci.cs->d_enableProxyProtocol && !isProxyPayloadOutsideTLS() && expectProxyProtocolFrom(d_ci.remote);
  if (!proxyHeaderInsideTLS) {
    d_state = State::readingQuerySize;
    return result;
  }

  d_state = State::readingProxyProtocolHeader;
  d_buffer.resize(s_proxyProtocolMinimumHeaderSize);
  d_proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize;
  return result;
}
2,925✔
1096

1097
/* Called once a complete query has been read into d_buffer: process it and work out
   which IO state the connection should wait for next.
   Connections whose query was too small, had invalid headers, was dropped or had no
   backend are terminated. If processing moved the connection to a non-idle state
   while it is still active, the IO state it is already waiting for (read or write)
   is returned; otherwise Done is returned. */
IOState IncomingTCPConnectionState::handleIncomingQueryReceived(const struct timeval& now)
{
  DEBUGLOG("query received");
  d_buffer.resize(d_querySize);

  d_state = State::idle;
  auto processingResult = handleQuery(std::move(d_buffer), now, std::nullopt);
  switch (processingResult) {
  case QueryProcessingResult::TooSmall:
    /* fall-through */
  case QueryProcessingResult::InvalidHeaders:
    /* fall-through */
  case QueryProcessingResult::Dropped:
    /* fall-through */
  case QueryProcessingResult::NoBackend:
    terminateClientConnection();
    break;
  default:
    break;
  }

  /* the state might have been updated in the meantime, we don't want to override it
     in that case */
  if (active() && d_state != State::idle) {
    if (d_ioState->isWaitingForRead()) {
      return IOState::NeedRead;
    }
    if (d_ioState->isWaitingForWrite()) {
      return IOState::NeedWrite;
    }
    return IOState::Done;
  }
  return IOState::Done;
}
43,215✔
1131

1132
void IncomingTCPConnectionState::handleExceptionDuringIO(const std::exception& exp)
1133
{
2,409✔
1134
  if (d_state == State::idle || d_state == State::waitingForQuery) {
2,409✔
1135
    /* no need to increase any counters in that case, the client is simply done with us */
1136
  }
1,994✔
1137
  else if (d_state == State::doingHandshake || d_state == State::readingProxyProtocolHeader || d_state == State::waitingForQuery || d_state == State::readingQuerySize || d_state == State::readingQuery) {
415!
1138
    ++d_ci.cs->tcpDiedReadingQuery;
415✔
1139
  }
415✔
1140
  else if (d_state == State::sendingResponse) {
×
1141
    /* unlikely to happen here, the exception should be handled in sendResponse() */
1142
    ++d_ci.cs->tcpDiedSendingResponse;
×
1143
  }
×
1144

1145
  if (d_ioState->isWaitingForWrite() || d_queriesCount == 0) {
2,409!
1146
    DEBUGLOG("Got an exception while handling TCP query: " << exp.what());
415✔
1147
    vinfolog("Got an exception while handling (%s) TCP query from %s: %s", (d_ioState->isWaitingForRead() ? "reading" : "writing"), d_ci.remote.toStringWithPort(), exp.what());
415!
1148
  }
415✔
1149
  else {
1,994✔
1150
    vinfolog("Closing TCP client connection with %s: %s", d_ci.remote.toStringWithPort(), exp.what());
1,994✔
1151
    DEBUGLOG("Closing TCP client connection: " << exp.what());
1,994✔
1152
  }
1,994✔
1153
  /* remove this FD from the IO multiplexer */
1154
  terminateClientConnection();
2,409✔
1155
}
2,409✔
1156

1157
/* Read the two-byte length prefix and then the query itself from the client, as far
   as the available data allows. Once a whole query has been read, it is handed to
   handleIncomingQueryReceived().
   iostate receives the result of the last read, or the value returned by
   handleIncomingQueryReceived().
   Returns true only when the announced query size is smaller than a DNS header: the
   client connection has then been terminated and the caller should stop. Returns
   false otherwise. */
bool IncomingTCPConnectionState::readIncomingQuery(const timeval& now, IOState& iostate)
{
  if (!d_lastIOBlocked && (d_state == State::waitingForQuery || d_state == State::readingQuerySize)) {
    DEBUGLOG("reading query size");
    d_buffer.resize(sizeof(uint16_t));
    d_readIOsCurrentQuery++;
    iostate = d_handler.tryRead(d_buffer, d_currentPos, sizeof(uint16_t));
    if (d_currentPos > 0) {
      /* if we got at least one byte, we can't go around sending responses */
      d_state = State::readingQuerySize;
    }

    if (iostate == IOState::Done) {
      DEBUGLOG("query size received");
      d_state = State::readingQuery;
      d_querySizeReadTime = now;
      if (d_queriesCount == 0) {
        d_firstQuerySizeReadTime = now;
      }
      /* the length prefix is big-endian: first byte is the most significant one */
      d_querySize = d_buffer.at(0) * 256 + d_buffer.at(1);
      if (d_querySize < sizeof(dnsheader)) {
        /* go away */
        terminateClientConnection();
        return true;
      }

      /* make room for the query itself and start reading it from the beginning */
      d_buffer.resize(d_querySize);
      d_currentPos = 0;
    }
    else {
      d_lastIOBlocked = true;
    }
  }

  if (!d_lastIOBlocked && d_state == State::readingQuery) {
    DEBUGLOG("reading query");
    d_readIOsCurrentQuery++;
    iostate = d_handler.tryRead(d_buffer, d_currentPos, d_querySize);
    if (iostate == IOState::Done) {
      iostate = handleIncomingQueryReceived(now);
    }
    else {
      d_lastIOBlocked = true;
    }
  }

  return false;
}
48,266✔
1205

1206
/* Drive the state machine of this incoming connection: handshake, proxy protocol
   header, reading queries, sending responses. It loops as long as the last step
   asked for more reading/writing and the last IO did not block. Before each
   iteration it enforces the maximum connection duration and the maximum number of
   read IO events per query; the latter also bans the client. */
void IncomingTCPConnectionState::handleIO()
{
  // let's make sure we are not already in handleIO() below in the stack:
  // this might happen when we have a response available on the backend socket
  // right after forwarding the query, and then a query waiting for us on the
  // client socket right after forwarding the response, and then a response available
  // on the backend socket right after forwarding the query.. you get the idea.
  if (d_handlingIO) {
    return;
  }
  dnsdist::tcp::HandlingIOGuard reentryGuard(d_handlingIO);

  // why do we loop? Because the TLS layer does buffering, and thus can have data ready to read
  // even though the underlying socket is not ready, so we need to actually ask for the data first
  IOState iostate = IOState::Done;
  timeval now{};
  gettimeofday(&now, nullptr);

  do {
    iostate = IOState::Done;
    /* presumably takes care of the IO state on early returns; it is released at the
       end of the iteration once the IO state has been updated explicitly */
    IOStateGuard ioGuard(d_ioState);

    if (maxConnectionDurationReached(dnsdist::configuration::getCurrentRuntimeConfiguration().d_maxTCPConnectionDuration, now)) {
      vinfolog("Terminating TCP connection from %s because it reached the maximum TCP connection duration", d_ci.remote.toStringWithPort());
      // will be handled by the ioGuard
      // handleNewIOState(state, IOState::Done, fd, handleIOCallback);
      return;
    }

    const auto& immutable = dnsdist::configuration::getImmutableConfiguration();
    if (immutable.d_maxTCPReadIOsPerQuery > 0 && d_readIOsCurrentQuery >= immutable.d_maxTCPReadIOsPerQuery) {
      vinfolog("Terminating TCP connection from %s for reaching the maximum number of read IO events per query (%d)", d_ci.remote.toStringWithPort(), immutable.d_maxTCPReadIOsPerQuery);
      dnsdist::IncomingConcurrentTCPConnectionsManager::banClientFor(d_ci.remote, time(nullptr), immutable.d_tcpBanDurationForExceedingMaxReadIOsPerQuery);
      return;
    }

    d_lastIOBlocked = false;

    try {
      /* first pass on a new connection: a proxy protocol header sent outside of TLS
         comes before the handshake */
      if (d_state == State::starting) {
        if (d_ci.cs != nullptr && d_ci.cs->d_enableProxyProtocol && isProxyPayloadOutsideTLS() && expectProxyProtocolFrom(d_ci.remote)) {
          d_state = State::readingProxyProtocolHeader;
          d_buffer.resize(s_proxyProtocolMinimumHeaderSize);
          d_proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize;
        }
        else {
          d_state = State::doingHandshake;
        }
      }

      if (d_state == State::doingHandshake) {
        iostate = handleHandshake(now);
      }

      if (!d_lastIOBlocked && d_state == State::readingProxyProtocolHeader) {
        auto status = handleProxyProtocolPayload();
        if (status == ProxyProtocolResult::Done) {
          d_buffer.resize(sizeof(uint16_t));

          /* header received outside of TLS: the handshake still has to happen */
          if (isProxyPayloadOutsideTLS()) {
            d_state = State::doingHandshake;
            iostate = handleHandshake(now);
          }
          else {
            d_state = State::readingQuerySize;
          }
        }
        else if (status == ProxyProtocolResult::Error) {
          iostate = IOState::Done;
        }
        else {
          iostate = IOState::NeedRead;
        }
      }

      if (!d_lastIOBlocked && (d_state == State::waitingForQuery || d_state == State::readingQuerySize || d_state == State::readingQuery)) {
        /* true means the connection has been terminated (invalid query size) */
        if (readIncomingQuery(now, iostate)) {
          return;
        }
      }

      if (!d_lastIOBlocked && d_state == State::sendingResponse) {
        DEBUGLOG("sending response");
        iostate = d_handler.tryWrite(d_currentResponse.d_buffer, d_currentPos, d_currentResponse.d_buffer.size());
        if (iostate == IOState::Done) {
          DEBUGLOG("response sent from " << __PRETTY_FUNCTION__);
          handleResponseSent(d_currentResponse, d_currentResponse.d_buffer.size());
          d_state = State::idle;
        }
        else {
          d_lastIOBlocked = true;
        }
      }

      if (active() && !d_lastIOBlocked && iostate == IOState::Done && (d_state == State::idle || d_state == State::waitingForQuery)) {
        // try sending queued responses
        DEBUGLOG("send responses, if any");
        auto state = shared_from_this();
        iostate = sendQueuedResponses(state, now);

        if (!d_lastIOBlocked && active() && iostate == IOState::Done) {
          // if the query has been passed to a backend, or dropped, and the responses have been sent,
          // we can start reading again
          if (canAcceptNewQueries(now)) {
            resetForNewQuery();
            iostate = IOState::NeedRead;
          }
          else {
            d_state = State::idle;
            iostate = IOState::Done;
          }
        }
      }

      if (d_state != State::idle && d_state != State::doingHandshake && d_state != State::readingProxyProtocolHeader && d_state != State::waitingForQuery && d_state != State::readingQuerySize && d_state != State::readingQuery && d_state != State::sendingResponse) {
        vinfolog("Unexpected state %d in handleIOCallback", static_cast<int>(d_state));
      }
    }
    catch (const std::exception& exp) {
      /* most likely an EOF because the other end closed the connection,
         but it might also be a real IO error or something else.
         Let's just drop the connection
      */
      handleExceptionDuringIO(exp);
    }

    if (!active()) {
      DEBUGLOG("state is no longer active");
      return;
    }

    /* register for the next IO event (or none, when Done) */
    auto sharedPtrToConn = shared_from_this();
    if (iostate == IOState::Done) {
      d_ioState->update(iostate, handleIOCallback, sharedPtrToConn);
    }
    else {
      updateIO(iostate, now);
    }
    ioGuard.release();
  } while ((iostate == IOState::NeedRead || iostate == IOState::NeedWrite) && !d_lastIOBlocked);
}
8,045✔
1347

1348
void IncomingTCPConnectionState::notifyIOError(const struct timeval& now, TCPResponse&& response)
1349
{
62✔
1350
  if (std::this_thread::get_id() != d_creatorThreadID) {
62✔
1351
    /* empty buffer will signal an IO error */
1352
    response.d_buffer.clear();
18✔
1353
    handleCrossProtocolResponse(now, std::move(response));
18✔
1354
    return;
18✔
1355
  }
18✔
1356

1357
  auto sharedPtrToConn = shared_from_this();
44✔
1358
  --sharedPtrToConn->d_currentQueriesCount;
44✔
1359
  sharedPtrToConn->d_hadErrors = true;
44✔
1360

1361
  if (sharedPtrToConn->d_state == State::sendingResponse) {
44✔
1362
    /* if we have responses to send, let's do that first */
1363
  }
2✔
1364
  else if (!sharedPtrToConn->d_queuedResponses.empty()) {
42!
1365
    /* stop reading and send what we have */
1366
    try {
×
1367
      auto iostate = sendQueuedResponses(sharedPtrToConn, now);
×
1368

1369
      if (sharedPtrToConn->active() && iostate != IOState::Done) {
×
1370
        // we need to update the state right away, nobody will do that for us
1371
        updateIO(iostate, now);
×
1372
      }
×
1373
    }
×
1374
    catch (const std::exception& e) {
×
1375
      vinfolog("Exception in notifyIOError: %s", e.what());
×
1376
    }
×
1377
  }
×
1378
  else {
42✔
1379
    // the backend code already tried to reconnect if it was possible
1380
    sharedPtrToConn->terminateClientConnection();
42✔
1381
  }
42✔
1382
}
44✔
1383

1384
/* Apply the XFR response rule chain to one message of a zone transfer response.
   Returns false when the rules refuse the response, true otherwise. This includes
   the case where a rule turned the processing asynchronous. For a synchronous
   response, an extended DNS error set on the query state is added to the message. */
static bool processXFRResponse(DNSResponse& dnsResponse)
{
  const auto& runtimeConfig = dnsdist::configuration::getCurrentRuntimeConfiguration();
  const auto& xfrRespRuleActions = dnsdist::rules::getResponseRuleChain(runtimeConfig.d_ruleChains, dnsdist::rules::ResponseRuleChain::XFRResponseRules);

  if (!applyRulesToResponse(xfrRespRuleActions, dnsResponse)) {
    return false;
  }

  /* asynchronous responses are left untouched here */
  if (!dnsResponse.isAsynchronous() && dnsResponse.ids.d_extendedError) {
    const auto& ede = *dnsResponse.ids.d_extendedError;
    dnsdist::edns::addExtendedDNSError(dnsResponse.getMutableData(), dnsResponse.getMaximumSize(), ede.infoCode, ede.extraText);
  }

  return true;
}
449✔
1403

1404
void IncomingTCPConnectionState::handleXFRResponse(const struct timeval& now, TCPResponse&& response)
1405
{
449✔
1406
  if (std::this_thread::get_id() != d_creatorThreadID) {
449!
1407
    handleCrossProtocolResponse(now, std::move(response));
×
1408
    return;
×
1409
  }
×
1410

1411
  std::shared_ptr<IncomingTCPConnectionState> state = shared_from_this();
449✔
1412
  auto& ids = response.d_idstate;
449✔
1413
  std::shared_ptr<DownstreamState> backend = response.d_ds ? response.d_ds : (response.d_connection ? response.d_connection->getDS() : nullptr);
449!
1414
  DNSResponse dnsResponse(ids, response.d_buffer, backend);
449✔
1415
  dnsResponse.d_incomingTCPState = state;
449✔
1416
  memcpy(&response.d_cleartextDH, dnsResponse.getHeader().get(), sizeof(response.d_cleartextDH));
449✔
1417

1418
  if (!processXFRResponse(dnsResponse)) {
449!
1419
    state->terminateClientConnection();
×
1420
    return;
×
1421
  }
×
1422

1423
  queueResponse(state, now, std::move(response), true);
449✔
1424
}
449✔
1425

1426
void IncomingTCPConnectionState::handleTimeout(std::shared_ptr<IncomingTCPConnectionState>& state, bool write)
1427
{
18✔
1428
  vinfolog("Timeout while %s TCP client %s", (write ? "writing to" : "reading from"), state->d_ci.remote.toStringWithPort());
18!
1429
  DEBUGLOG("client timeout");
18✔
1430
  DEBUGLOG("Processed " << state->d_queriesCount << " queries, current count is " << state->d_currentQueriesCount << ", " << state->d_ownedConnectionsToBackend.size() << " owned connections, " << state->d_queuedResponses.size() << " response queued");
18✔
1431

1432
  if (write || state->d_currentQueriesCount == 0) {
18✔
1433
    ++state->d_ci.cs->tcpClientTimeouts;
12✔
1434
    state->d_ioState.reset();
12✔
1435
  }
12✔
1436
  else {
6✔
1437
    DEBUGLOG("Going idle");
6✔
1438
    /* we still have some queries in flight, let's just stop reading for now */
1439
    state->d_state = State::idle;
6✔
1440
    state->d_ioState->update(IOState::Done, handleIOCallback, state);
6✔
1441
  }
6✔
1442
}
18✔
1443

1444
static void handleIncomingTCPQuery(int pipefd, FDMultiplexer::funcparam_t& param)
1445
{
2,661✔
1446
  (void)pipefd;
2,661✔
1447
  auto* threadData = boost::any_cast<TCPClientThreadData*>(param);
2,661✔
1448

1449
  std::unique_ptr<ConnectionInfo> citmp{nullptr};
2,661✔
1450
  try {
2,661✔
1451
    auto tmp = threadData->queryReceiver.receive();
2,661✔
1452
    if (!tmp) {
2,661!
1453
      return;
×
1454
    }
×
1455
    citmp = std::move(*tmp);
2,661✔
1456
  }
2,661✔
1457
  catch (const std::exception& e) {
2,661✔
1458
    throw std::runtime_error("Error while reading from the TCP query channel: " + std::string(e.what()));
×
1459
  }
×
1460

1461
  g_tcpclientthreads->decrementQueuedCount();
2,661✔
1462

1463
  timeval now{};
2,661✔
1464
  gettimeofday(&now, nullptr);
2,661✔
1465

1466
  if (citmp->cs->dohFrontend) {
2,661✔
1467
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
201✔
1468
    auto state = std::make_shared<IncomingHTTP2Connection>(std::move(*citmp), *threadData, now);
201✔
1469
    state->handleIO();
201✔
1470
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
201✔
1471
  }
201✔
1472
  else {
2,460✔
1473
    auto state = std::make_shared<IncomingTCPConnectionState>(std::move(*citmp), *threadData, now);
2,460✔
1474
    state->handleIO();
2,460✔
1475
  }
2,460✔
1476
}
2,661✔
1477

1478
static void handleCrossProtocolQuery(int pipefd, FDMultiplexer::funcparam_t& param)
1479
{
228✔
1480
  (void)pipefd;
228✔
1481
  auto* threadData = boost::any_cast<TCPClientThreadData*>(param);
228✔
1482

1483
  std::unique_ptr<CrossProtocolQuery> cpq{nullptr};
228✔
1484
  try {
228✔
1485
    auto tmp = threadData->crossProtocolQueryReceiver.receive();
228✔
1486
    if (!tmp) {
228!
1487
      return;
×
1488
    }
×
1489
    cpq = std::move(*tmp);
228✔
1490
  }
228✔
1491
  catch (const std::exception& e) {
228✔
1492
    throw std::runtime_error("Error while reading from the TCP cross-protocol channel: " + std::string(e.what()));
×
1493
  }
×
1494

1495
  timeval now{};
228✔
1496
  gettimeofday(&now, nullptr);
228✔
1497

1498
  std::shared_ptr<TCPQuerySender> tqs = cpq->getTCPQuerySender();
228✔
1499
  auto query = std::move(cpq->query);
228✔
1500
  auto downstreamServer = std::move(cpq->downstream);
228✔
1501

1502
  try {
228✔
1503
    auto downstream = t_downstreamTCPConnectionsManager.getConnectionToDownstream(threadData->mplexer, downstreamServer, now, std::string());
228✔
1504

1505
    prependSizeToTCPQuery(query.d_buffer, query.d_idstate.d_proxyProtocolPayloadSize);
228✔
1506

1507
    vinfolog("Got query for %s|%s from %s (%s, %d bytes), relayed to %s", query.d_idstate.qname.toLogString(), QType(query.d_idstate.qtype).toString(), query.d_idstate.origRemote.toStringWithPort(), query.d_idstate.protocol.toString(), query.d_buffer.size(), downstreamServer->getNameWithAddr());
228✔
1508

1509
    downstream->queueQuery(tqs, std::move(query));
228✔
1510
  }
228✔
1511
  catch (...) {
228✔
1512
    tqs->notifyIOError(now, std::move(query));
×
1513
  }
×
1514
}
228✔
1515

1516
static void handleCrossProtocolResponse(int pipefd, FDMultiplexer::funcparam_t& param)
1517
{
284✔
1518
  (void)pipefd;
284✔
1519
  auto* threadData = boost::any_cast<TCPClientThreadData*>(param);
284✔
1520

1521
  std::unique_ptr<TCPCrossProtocolResponse> cpr{nullptr};
284✔
1522
  try {
284✔
1523
    auto tmp = threadData->crossProtocolResponseReceiver.receive();
284✔
1524
    if (!tmp) {
284!
1525
      return;
×
1526
    }
×
1527
    cpr = std::move(*tmp);
284✔
1528
  }
284✔
1529
  catch (const std::exception& e) {
284✔
1530
    throw std::runtime_error("Error while reading from the TCP cross-protocol response: " + std::string(e.what()));
×
1531
  }
×
1532

1533
  auto& response = *cpr;
284✔
1534

1535
  try {
284✔
1536
    if (response.d_response.d_buffer.empty()) {
284✔
1537
      response.d_state->notifyIOError(response.d_now, std::move(response.d_response));
24✔
1538
    }
24✔
1539
    else if (response.d_response.d_idstate.qtype == QType::AXFR || response.d_response.d_idstate.qtype == QType::IXFR) {
260!
1540
      response.d_state->handleXFRResponse(response.d_now, std::move(response.d_response));
×
1541
    }
×
1542
    else {
260✔
1543
      response.d_state->handleResponse(response.d_now, std::move(response.d_response));
260✔
1544
    }
260✔
1545
  }
284✔
1546
  catch (...) {
284✔
1547
    /* no point bubbling up from there */
1548
  }
×
1549
}
284✔
1550

1551
struct TCPAcceptorParam
1552
{
1553
  // NOLINTNEXTLINE(cppcoreguidelines-avoid-const-or-ref-data-members)
1554
  ClientState& clientState;
1555
  ComboAddress local;
1556
  int socket{-1};
1557
};
1558

1559
static void acceptNewConnection(const TCPAcceptorParam& param, TCPClientThreadData* threadData);
1560

1561
static void scanForTimeouts(const TCPClientThreadData& data, const timeval& now)
1562
{
11,738✔
1563
  auto expiredReadConns = data.mplexer->getTimeouts(now, false);
11,738✔
1564
  for (const auto& cbData : expiredReadConns) {
11,738✔
1565
    if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
11✔
1566
      auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
1✔
1567
      if (cbData.first == state->d_handler.getDescriptor()) {
1!
1568
        vinfolog("Timeout (read) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
1!
1569
        state->handleTimeout(state, false);
1✔
1570
      }
1✔
1571
    }
1✔
1572
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
10✔
1573
    else if (cbData.second.type() == typeid(std::shared_ptr<IncomingHTTP2Connection>)) {
10!
1574
      auto state = boost::any_cast<std::shared_ptr<IncomingHTTP2Connection>>(cbData.second);
1575
      if (cbData.first == state->d_handler.getDescriptor()) {
×
1576
        vinfolog("Timeout (read) from remote H2 client %s", state->d_ci.remote.toStringWithPort());
×
1577
        std::shared_ptr<IncomingTCPConnectionState> parentState = state;
1578
        state->handleTimeout(parentState, false);
1579
      }
1580
    }
1581
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
10✔
1582
    else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
10!
1583
      auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
10✔
1584
      vinfolog("Timeout (read) from remote backend %s", conn->getBackendName());
10!
1585
      conn->handleTimeout(now, false);
10✔
1586
    }
10✔
1587
  }
11✔
1588

1589
  auto expiredWriteConns = data.mplexer->getTimeouts(now, true);
11,738✔
1590
  for (const auto& cbData : expiredWriteConns) {
11,738!
1591
    if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
×
1592
      auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
×
1593
      if (cbData.first == state->d_handler.getDescriptor()) {
×
1594
        vinfolog("Timeout (write) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
×
1595
        state->handleTimeout(state, true);
×
1596
      }
×
1597
    }
×
1598
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
1599
    else if (cbData.second.type() == typeid(std::shared_ptr<IncomingHTTP2Connection>)) {
×
1600
      auto state = boost::any_cast<std::shared_ptr<IncomingHTTP2Connection>>(cbData.second);
1601
      if (cbData.first == state->d_handler.getDescriptor()) {
×
1602
        vinfolog("Timeout (write) from remote H2 client %s", state->d_ci.remote.toStringWithPort());
×
1603
        std::shared_ptr<IncomingTCPConnectionState> parentState = state;
1604
        state->handleTimeout(parentState, true);
1605
      }
1606
    }
1607
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
1608
    else if (cbData.second.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
×
1609
      auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(cbData.second);
×
1610
      vinfolog("Timeout (write) from remote backend %s", conn->getBackendName());
×
1611
      conn->handleTimeout(now, true);
×
1612
    }
×
1613
  }
×
1614
}
11,738✔
1615

1616
static void dumpTCPStates(const TCPClientThreadData& data)
1617
{
×
1618
  /* just to keep things clean in the output, debug only */
1619
  static std::mutex s_lock;
×
1620
  auto lock = std::scoped_lock(s_lock);
×
1621
  if (g_tcpStatesDumpRequested > 0) {
×
1622
    /* no race here, we took the lock so it can only be increased in the meantime */
1623
    --g_tcpStatesDumpRequested;
×
1624
    infolog("Dumping the TCP states, as requested:");
×
1625
    data.mplexer->runForAllWatchedFDs([](bool isRead, int desc, const FDMultiplexer::funcparam_t& param, struct timeval ttd) {
×
1626
      timeval lnow{};
×
1627
      gettimeofday(&lnow, nullptr);
×
1628
      if (ttd.tv_sec > 0) {
×
1629
        infolog("- Descriptor %d is in %s state, TTD in %d", desc, (isRead ? "read" : "write"), (ttd.tv_sec - lnow.tv_sec));
×
1630
      }
×
1631
      else {
×
1632
        infolog("- Descriptor %d is in %s state, no TTD set", desc, (isRead ? "read" : "write"));
×
1633
      }
×
1634

1635
      if (param.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
×
1636
        auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
×
1637
        infolog(" - %s", state->toString());
×
1638
      }
×
1639
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
1640
      else if (param.type() == typeid(std::shared_ptr<IncomingHTTP2Connection>)) {
×
1641
        auto state = boost::any_cast<std::shared_ptr<IncomingHTTP2Connection>>(param);
1642
        infolog(" - %s", state->toString());
1643
      }
1644
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
1645
      else if (param.type() == typeid(std::shared_ptr<TCPConnectionToBackend>)) {
×
1646
        auto conn = boost::any_cast<std::shared_ptr<TCPConnectionToBackend>>(param);
×
1647
        infolog(" - %s", conn->toString());
×
1648
      }
×
1649
      else if (param.type() == typeid(TCPClientThreadData*)) {
×
1650
        infolog(" - Worker thread pipe");
×
1651
      }
×
1652
    });
×
1653
    infolog("The TCP/DoT client cache has %d active and %d idle outgoing connections cached", t_downstreamTCPConnectionsManager.getActiveCount(), t_downstreamTCPConnectionsManager.getIdleCount());
×
1654
  }
×
1655
}
×
1656

1657
// NOLINTNEXTLINE(performance-unnecessary-value-param): you are wrong, clang-tidy, go home
1658
static void tcpClientThread(pdns::channel::Receiver<ConnectionInfo>&& queryReceiver, pdns::channel::Receiver<CrossProtocolQuery>&& crossProtocolQueryReceiver, pdns::channel::Receiver<TCPCrossProtocolResponse>&& crossProtocolResponseReceiver, pdns::channel::Sender<TCPCrossProtocolResponse>&& crossProtocolResponseSender, std::vector<ClientState*> tcpAcceptStates)
1659
{
3,750✔
1660
  /* we get launched with a pipe on which we receive file descriptors from clients that we own
1661
     from that point on */
1662

1663
  setThreadName("dnsdist/tcpClie");
3,750✔
1664

1665
  try {
3,750✔
1666
    TCPClientThreadData data;
3,750✔
1667
    data.crossProtocolResponseSender = std::move(crossProtocolResponseSender);
3,750✔
1668
    data.queryReceiver = std::move(queryReceiver);
3,750✔
1669
    data.crossProtocolQueryReceiver = std::move(crossProtocolQueryReceiver);
3,750✔
1670
    data.crossProtocolResponseReceiver = std::move(crossProtocolResponseReceiver);
3,750✔
1671

1672
    data.mplexer->addReadFD(data.queryReceiver.getDescriptor(), handleIncomingTCPQuery, &data);
3,750✔
1673
    data.mplexer->addReadFD(data.crossProtocolQueryReceiver.getDescriptor(), handleCrossProtocolQuery, &data);
3,750✔
1674
    data.mplexer->addReadFD(data.crossProtocolResponseReceiver.getDescriptor(), handleCrossProtocolResponse, &data);
3,750✔
1675

1676
    /* only used in single acceptor mode for now */
1677
    std::vector<TCPAcceptorParam> acceptParams;
3,750✔
1678
    acceptParams.reserve(tcpAcceptStates.size());
3,750✔
1679

1680
    for (auto& state : tcpAcceptStates) {
3,750!
1681
      acceptParams.emplace_back(TCPAcceptorParam{*state, state->local, state->tcpFD});
×
1682
      for (const auto& [addr, socket] : state->d_additionalAddresses) {
×
1683
        acceptParams.emplace_back(TCPAcceptorParam{*state, addr, socket});
×
1684
      }
×
1685
    }
×
1686

1687
    auto acceptCallback = [&data](int socket, FDMultiplexer::funcparam_t& funcparam) {
3,750✔
1688
      (void)socket;
×
1689
      const auto* acceptorParam = boost::any_cast<const TCPAcceptorParam*>(funcparam);
×
1690
      acceptNewConnection(*acceptorParam, &data);
×
1691
    };
×
1692

1693
    for (const auto& param : acceptParams) {
3,750!
1694
      setNonBlocking(param.socket);
×
1695
      data.mplexer->addReadFD(param.socket, acceptCallback, &param);
×
1696
    }
×
1697

1698
    timeval now{};
3,750✔
1699
    gettimeofday(&now, nullptr);
3,750✔
1700
    time_t lastTimeoutScan = now.tv_sec;
3,750✔
1701
    time_t lastConfigRefresh = now.tv_sec;
3,750✔
1702

1703
    for (;;) {
38,798✔
1704
      data.mplexer->run(&now);
38,798✔
1705

1706
      if (now.tv_sec > lastConfigRefresh) {
38,798✔
1707
        lastConfigRefresh = now.tv_sec;
11,518✔
1708
        dnsdist::configuration::refreshLocalRuntimeConfiguration();
11,518✔
1709
      }
11,518✔
1710

1711
      try {
38,798✔
1712
        t_downstreamTCPConnectionsManager.cleanupClosedConnections(now);
38,798✔
1713
        dnsdist::IncomingConcurrentTCPConnectionsManager::cleanup(time(nullptr));
38,798✔
1714

1715
        if (now.tv_sec > lastTimeoutScan) {
38,798✔
1716
          lastTimeoutScan = now.tv_sec;
11,753✔
1717
          scanForTimeouts(data, now);
11,753✔
1718

1719
          if (g_tcpStatesDumpRequested > 0) {
11,753!
1720
            dumpTCPStates(data);
×
1721
          }
×
1722
        }
11,753✔
1723
      }
38,798✔
1724
      catch (const std::exception& e) {
38,798✔
1725
        warnlog("Error in TCP worker thread: %s", e.what());
×
1726
      }
×
1727
    }
38,798✔
1728
  }
3,750✔
1729
  catch (const std::exception& e) {
3,750✔
1730
    errlog("Fatal error in TCP worker thread: %s", e.what());
×
1731
  }
×
1732
}
3,750✔
1733

1734
static void acceptNewConnection(const TCPAcceptorParam& param, TCPClientThreadData* threadData)
1735
{
3,155✔
1736
  auto& clientState = param.clientState;
3,155✔
1737
  const bool checkACL = clientState.dohFrontend == nullptr || (!clientState.dohFrontend->d_trustForwardedForHeader && clientState.dohFrontend->d_earlyACLDrop);
3,155!
1738
  const int socket = param.socket;
3,155✔
1739
  bool tcpClientCountIncremented = false;
3,155✔
1740
  ComboAddress remote;
3,155✔
1741
  remote.sin4.sin_family = param.local.sin4.sin_family;
3,155✔
1742

1743
  tcpClientCountIncremented = false;
3,155✔
1744
  try {
3,155✔
1745
    socklen_t remlen = remote.getSocklen();
3,155✔
1746
    ConnectionInfo connInfo(&clientState);
3,155✔
1747
#ifdef HAVE_ACCEPT4
3,155✔
1748
    // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
1749
    connInfo.fd = accept4(socket, reinterpret_cast<struct sockaddr*>(&remote), &remlen, SOCK_NONBLOCK);
3,155✔
1750
#else
1751
    // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
1752
    connInfo.fd = accept(socket, reinterpret_cast<struct sockaddr*>(&remote), &remlen);
1753
#endif
1754
    // will be decremented when the ConnectionInfo object is destroyed, no matter the reason
1755
    auto concurrentConnections = ++clientState.tcpCurrentConnections;
3,155✔
1756

1757
    if (connInfo.fd < 0) {
3,155!
1758
      throw std::runtime_error((boost::format("accepting new connection on socket: %s") % stringerror()).str());
×
1759
    }
×
1760

1761
    if (checkACL && !dnsdist::configuration::getCurrentRuntimeConfiguration().d_ACL.match(remote)) {
3,155✔
1762
      ++dnsdist::metrics::g_stats.aclDrops;
9✔
1763
      vinfolog("Dropped TCP connection from %s because of ACL", remote.toStringWithPort());
9✔
1764
      return;
9✔
1765
    }
9✔
1766

1767
    if (clientState.d_tcpConcurrentConnectionsLimit > 0 && concurrentConnections > clientState.d_tcpConcurrentConnectionsLimit) {
3,146✔
1768
      vinfolog("Dropped TCP connection from %s because of concurrent connections limit", remote.toStringWithPort());
3!
1769
      return;
3✔
1770
    }
3✔
1771

1772
    if (concurrentConnections > clientState.tcpMaxConcurrentConnections.load()) {
3,143✔
1773
      clientState.tcpMaxConcurrentConnections.store(concurrentConnections);
601✔
1774
    }
601✔
1775

1776
#ifndef HAVE_ACCEPT4
1777
    if (!setNonBlocking(connInfo.fd)) {
1778
      return;
1779
    }
1780
#endif
1781

1782
    setTCPNoDelay(connInfo.fd); // disable NAGLE
3,143✔
1783

1784
    const auto maxTCPQueuedConnections = dnsdist::configuration::getImmutableConfiguration().d_maxTCPQueuedConnections;
3,143✔
1785
    if (maxTCPQueuedConnections > 0 && g_tcpclientthreads->getQueuedCount() >= maxTCPQueuedConnections) {
3,143!
1786
      vinfolog("Dropping TCP connection from %s because we have too many queued already", remote.toStringWithPort());
×
1787
      return;
×
1788
    }
×
1789

1790
    auto connectionResult = dnsdist::IncomingConcurrentTCPConnectionsManager::accountNewTCPConnection(remote, connInfo.cs->hasTLS());
3,143✔
1791
    if (connectionResult == dnsdist::IncomingConcurrentTCPConnectionsManager::NewConnectionResult::Denied) {
3,143✔
1792
      return;
6✔
1793
    }
6✔
1794
    tcpClientCountIncremented = true;
3,137✔
1795
    if (connectionResult == dnsdist::IncomingConcurrentTCPConnectionsManager::NewConnectionResult::Restricted) {
3,137!
1796
      connInfo.d_restricted = true;
×
1797
    }
×
1798

1799
    vinfolog("Got TCP connection from %s", remote.toStringWithPort());
3,137✔
1800

1801
    connInfo.remote = remote;
3,137✔
1802

1803
    if (threadData == nullptr) {
3,137✔
1804
      if (!g_tcpclientthreads->passConnectionToThread(std::make_unique<ConnectionInfo>(std::move(connInfo)))) {
2,661!
1805
        if (tcpClientCountIncremented) {
×
1806
          dnsdist::IncomingConcurrentTCPConnectionsManager::accountClosedTCPConnection(remote);
×
1807
        }
×
1808
      }
×
1809
    }
2,661✔
1810
    else {
476✔
1811
      timeval now{};
476✔
1812
      gettimeofday(&now, nullptr);
476✔
1813

1814
      if (connInfo.cs->dohFrontend) {
476!
1815
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
1816
        auto state = std::make_shared<IncomingHTTP2Connection>(std::move(connInfo), *threadData, now);
1817
        state->handleIO();
1818
#endif /* HAVE_DNS_OVER_HTTPS && HAVE_NGHTTP2 */
1819
      }
×
1820
      else {
476✔
1821
        auto state = std::make_shared<IncomingTCPConnectionState>(std::move(connInfo), *threadData, now);
476✔
1822
        state->handleIO();
476✔
1823
      }
476✔
1824
    }
476✔
1825
  }
3,137✔
1826
  catch (const std::exception& e) {
3,155✔
1827
    errlog("While reading a TCP question: %s", e.what());
×
1828
    if (tcpClientCountIncremented) {
×
1829
      dnsdist::IncomingConcurrentTCPConnectionsManager::accountClosedTCPConnection(remote);
×
1830
    }
×
1831
  }
×
1832
  catch (...) {
3,155✔
1833
  }
×
1834
}
3,155✔
1835

1836
/* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and
1837
   they will hand off to worker threads & spawn more of them if required
1838
*/
1839
#ifndef USE_SINGLE_ACCEPTOR_THREAD
1840
void tcpAcceptorThread(const std::vector<ClientState*>& states)
1841
{
476✔
1842
  setThreadName("dnsdist/tcpAcce");
476✔
1843

1844
  std::vector<TCPAcceptorParam> params;
476✔
1845
  params.reserve(states.size());
476✔
1846

1847
  for (const auto& state : states) {
476✔
1848
    params.emplace_back(TCPAcceptorParam{*state, state->local, state->tcpFD});
476✔
1849
    for (const auto& [addr, socket] : state->d_additionalAddresses) {
476!
1850
      params.emplace_back(TCPAcceptorParam{*state, addr, socket});
×
1851
    }
×
1852
  }
476✔
1853

1854
  if (params.size() == 1) {
476!
1855
    while (true) {
3,631✔
1856
      dnsdist::configuration::refreshLocalRuntimeConfiguration();
3,155✔
1857
      acceptNewConnection(params.at(0), nullptr);
3,155✔
1858
    }
3,155✔
1859
  }
476✔
1860
  else {
×
1861
    auto acceptCallback = [](int socket, FDMultiplexer::funcparam_t& funcparam) {
×
1862
      (void)socket;
×
1863
      const auto* acceptorParam = boost::any_cast<const TCPAcceptorParam*>(funcparam);
×
1864
      acceptNewConnection(*acceptorParam, nullptr);
×
1865
    };
×
1866

1867
    auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent(params.size()));
×
1868
    for (const auto& param : params) {
×
1869
      mplexer->addReadFD(param.socket, acceptCallback, &param);
×
1870
    }
×
1871

1872
    timeval now{};
×
1873
    time_t lastConfigRefresh = now.tv_sec;
×
1874
    while (true) {
×
1875
      mplexer->run(&now, -1);
×
1876

1877
      if (now.tv_sec > lastConfigRefresh) {
×
1878
        lastConfigRefresh = now.tv_sec;
×
1879
        dnsdist::configuration::refreshLocalRuntimeConfiguration();
×
1880
      }
×
1881
    }
×
1882
  }
×
1883
}
476✔
1884
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc