
PowerDNS / pdns / 17129807640

21 Aug 2025 02:23PM UTC coverage: 65.961% (+0.008%) from 65.953%

Pull #16013 (github, web-flow): Merge 7b9d23664 into 5566651aa
Pull Request #16013: update keyblocks with non-SHA1 signing keys

42083 of 92400 branches covered (45.54%)
Branch coverage included in aggregate %.
127968 of 165404 relevant lines covered (77.37%)
5605035.52 hits per line

Source File: /pdns/recursordist/rec-tcp.cc (65.47% covered)

/*
 * This file is part of PowerDNS or dnsdist.
 * Copyright -- PowerDNS.COM B.V. and its contributors
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of version 2 of the GNU General Public License as
 * published by the Free Software Foundation.
 *
 * In addition, for the avoidance of any doubt, permission is granted to
 * link this program with OpenSSL and to (re)distribute the binaries
 * produced as the result of such linking.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include "rec-main.hh"

#include "arguments.hh"
#include "logger.hh"
#include "mplexer.hh"
#include "uuid-utils.hh"

// OLD PRE 5.0.0 situation:
//
// When pdns-distributes-queries is false with reuseport true (the default since 4.9.0), TCP queries
// are read and handled by worker threads. If the kernel balancing is OK for TCP sockets (observed
// to be good on Debian bullseye, but not good on e.g. MacOS), the TCP handling is no extra burden.
// In the case of MacOS all incoming TCP queries are handled by a single worker, while incoming UDP
// queries do get distributed round-robin over the worker threads.  Do note that TCP queries might
// need to wait until g_maxUDPQueriesPerRound is reached.
//
// In the case of pdns-distributes-queries true and reuseport false the queries were read and
// initially processed by the distributor thread(s).
//
// Initial processing consists of parsing, calling gettag and checking if we have a packet cache
// hit. If that does not produce a hit, the query is passed to an mthread in the same way as with
// UDP queries, but do note that the mthread processing is serviced by the distributor thread. The
// final answer will be sent by the same distributor thread that originally picked up the query.
//
// Changing this, i.e. having incoming TCP queries handled by worker threads, is somewhat more complex
// than for UDP, as the socket must remain available in the distributor thread (for reading more
// queries), but the TCP socket must also be passed to a worker thread so it can write its
// answer. The in-flight bookkeeping also has to be aware of how a query is handled to do the
// accounting properly. I am not sure if changing the current setup is worth all this trouble,
// especially since the default is now to not use pdns-distributes-queries, which works well in many
// cases.
//
// NEW SITUATION SINCE 5.0.0:
//
// The drawbacks mentioned in https://github.com/PowerDNS/pdns/issues/8394 are no longer true, so an
// alternative approach would be to introduce dedicated TCP worker thread(s).
//
// This approach was implemented in https://github.com/PowerDNS/pdns/pull/13195. The distributor and
// worker thread(s) now no longer process TCP queries.
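Since 5.0.0, TCP queries are served by dedicated TCP worker thread(s). How the recursor moves work between its threads is not shown in this file, so the sketch below only illustrates the general producer/consumer hand-off such a design can use; it is not a description of the PowerDNS implementation. FdQueue and runTCPWorker are hypothetical names, and the "serving" is reduced to recording the descriptors received:

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical hand-off queue: a producer pushes accepted TCP file descriptors,
// a dedicated TCP worker thread pops and serves them.
class FdQueue
{
public:
  void push(int fd)
  {
    {
      std::lock_guard<std::mutex> lock(d_mutex);
      d_fds.push(fd);
    }
    d_cond.notify_one();
  }

  // No more descriptors will be pushed; wakes up waiting workers.
  void close()
  {
    {
      std::lock_guard<std::mutex> lock(d_mutex);
      d_closed = true;
    }
    d_cond.notify_all();
  }

  // Blocks until a descriptor is available; returns std::nullopt once the
  // queue is closed and drained.
  std::optional<int> pop()
  {
    std::unique_lock<std::mutex> lock(d_mutex);
    d_cond.wait(lock, [this] { return d_closed || !d_fds.empty(); });
    if (d_fds.empty()) {
      return std::nullopt;
    }
    int fd = d_fds.front();
    d_fds.pop();
    return fd;
  }

private:
  std::mutex d_mutex;
  std::condition_variable d_cond;
  std::queue<int> d_fds;
  bool d_closed{false};
};

// Stand-in for a TCP worker loop: it only records the descriptors it received,
// where a real worker would register each one with its own event multiplexer.
std::vector<int> runTCPWorker(FdQueue& queue)
{
  std::vector<int> served;
  while (auto fd = queue.pop()) {
    served.push_back(*fd);
  }
  return served;
}
```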

size_t g_tcpMaxQueriesPerConn;
unsigned int g_maxTCPClients;
unsigned int g_maxTCPPerClient;
int g_tcpTimeout;
bool g_anyToTcp;

uint16_t TCPConnection::s_maxInFlight;

using tcpClientCounts_t = map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan>;
static thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts = std::make_unique<tcpClientCounts_t>();

static void handleRunningTCPQuestion(int fileDesc, FDMultiplexer::funcparam_t& var);

#if 0
#define TCPLOG(tcpsock, x)                                 \
  do {                                                     \
    cerr << []() { timeval t; gettimeofday(&t, nullptr); return t.tv_sec % 10  + t.tv_usec/1000000.0; }() << " FD " << (tcpsock) << ' ' << x; \
  } while (0)
#else
// We do not define this as empty since that produces a duplicate case label warning from clang-tidy
#define TCPLOG(pid, x) /* NOLINT(cppcoreguidelines-macro-usage) */ \
  while (false) {                                                  \
    cerr << x; /* NOLINT(bugprone-macro-parentheses) */            \
  }
#endif
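The disabled TCPLOG variant expands to a while (false) loop rather than to nothing; according to the comment, an empty definition triggers a clang-tidy duplicate case label warning. A minimal sketch of the same pattern, using two hypothetical macros DEMO_LOG_ON and DEMO_LOG_OFF, shows what the loop form implies: the logged expression must still compile, but it is never evaluated.

```cpp
#include <cassert>
#include <sstream>

// Counts how often the logged expression is actually evaluated.
static int g_evaluations = 0;

static int expensive()
{
  ++g_evaluations;
  return 42;
}

// Hypothetical enabled variant: streams x into out.
#define DEMO_LOG_ON(out, x) \
  do {                      \
    (out) << x;             \
  } while (0)

// Hypothetical disabled variant in the style of TCPLOG: the body must still
// compile, but the loop condition is false, so it is never executed.
#define DEMO_LOG_OFF(out, x) \
  while (false) {            \
    (out) << x;              \
  }
```

A side effect of this choice is that a stale TCPLOG call referring to a variable that no longer exists still fails to compile, even with logging switched off.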

std::atomic<uint32_t> TCPConnection::s_currentConnections;

TCPConnection::TCPConnection(int fileDesc, const ComboAddress& addr) :
  data(2, 0), d_remote(addr), d_fd(fileDesc)
{
  ++s_currentConnections;
  (*t_tcpClientCounts)[d_remote]++;
}

TCPConnection::~TCPConnection()
{
  try {
    if (closesocket(d_fd) < 0) {
      g_slogtcpin->info(Logr::Error, "Error closing socket for TCPConnection");
    }
  }
  catch (const PDNSException& e) {
    g_slogtcpin->error(Logr::Error, e.reason, "Error closing TCPConnection socket", "exception", Logging::Loggable("PDNSException"));
  }

  // Pre-decrement, so the entry is erased once the last connection from this address is gone
  if (t_tcpClientCounts && t_tcpClientCounts->count(d_remote) != 0 && --(*t_tcpClientCounts)[d_remote] == 0) {
    t_tcpClientCounts->erase(d_remote);
  }
  --s_currentConnections;
}
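The constructor and destructor keep t_tcpClientCounts in step with the connections open in the current thread, which is presumably what allows a per-client limit such as g_maxTCPPerClient to be enforced. A sketch of that bookkeeping, assuming a std::map keyed by the address as a plain string (the recursor keys on ComboAddress, comparing addresses only) and assuming a limit of 0 means unlimited; mayAccept, connectionOpened and connectionClosed are hypothetical names:

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical per-thread table: connections currently open, per client address.
// The address is a plain string here; the recursor keys on ComboAddress and
// compares addresses only, ignoring the port.
using ClientCounts = std::map<std::string, uint32_t>;

// Would one more connection from addr stay within maxPerClient?
// Assumption of this sketch: 0 means "no limit".
bool mayAccept(const ClientCounts& counts, const std::string& addr, uint32_t maxPerClient)
{
  if (maxPerClient == 0) {
    return true;
  }
  auto iter = counts.find(addr);
  return iter == counts.end() || iter->second < maxPerClient;
}

void connectionOpened(ClientCounts& counts, const std::string& addr)
{
  ++counts[addr];
}

void connectionClosed(ClientCounts& counts, const std::string& addr)
{
  auto iter = counts.find(addr);
  if (iter == counts.end()) {
    return; // unbalanced close: nothing to undo
  }
  // Decrement first, then test, so the entry disappears with the last connection.
  if (--iter->second == 0) {
    counts.erase(iter);
  }
}
```

Decrementing before the comparison matters: testing the old value against zero would leave an entry with a count of 0 behind for every address that ever connected.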

static void terminateTCPConnection(int fileDesc)
{
  try {
    t_fdm->removeReadFD(fileDesc);
  }
  catch (const FDMultiplexerException& fde) {
  }
}

static void sendErrorOverTCP(std::unique_ptr<DNSComboWriter>& comboWriter, int rcode)
{
  std::vector<uint8_t> packet;
  if (comboWriter->d_mdp.d_header.qdcount == 0U) {
    /* header-only */
    packet.resize(sizeof(dnsheader));
  }
  else {
    DNSPacketWriter packetWriter(packet, comboWriter->d_mdp.d_qname, comboWriter->d_mdp.d_qtype, comboWriter->d_mdp.d_qclass);
    if (comboWriter->d_mdp.hasEDNS()) {
      /* we try to add the EDNS OPT RR even for truncated answers,
         as rfc6891 states:
         "The minimal response MUST be the DNS header, question section, and an
         OPT record.  This MUST also occur when a truncated response (using
         the DNS header's TC bit) is returned."
      */
      packetWriter.addOpt(512, 0, 0);
      packetWriter.commit();
    }
  }

  auto& header = reinterpret_cast<dnsheader&>(packet.at(0)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast) safe cast
  header.aa = 0;
  header.ra = 1;
  header.qr = 1;
  header.tc = 0;
  header.id = comboWriter->d_mdp.d_header.id;
  header.rd = comboWriter->d_mdp.d_header.rd;
  header.cd = comboWriter->d_mdp.d_header.cd;
  header.rcode = rcode;

  sendResponseOverTCP(comboWriter, packet);
}
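sendErrorOverTCP() sets the header flags through the dnsheader bitfields. On the wire these flags form the second 16-bit word of the DNS header (RFC 1035, section 4.1.1, with the AD and CD bits from RFC 4035). A hypothetical helper, makeErrorFlags, that computes that word with the same choices (QR and RA set, AA and TC clear, RD and CD copied from the query) makes the bit positions explicit; Opcode, Z and AD are simply left at zero in this sketch:

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: builds the flags word of a DNS header, laid out as
//   bit 15 QR | 14-11 Opcode | 10 AA | 9 TC | 8 RD | 7 RA | 6 Z | 5 AD | 4 CD | 3-0 RCODE
// The result is in host byte order; it would still need htons() before being sent.
uint16_t makeErrorFlags(bool queryRD, bool queryCD, uint8_t rcode)
{
  assert(rcode <= 0x0F); // only the 4-bit header RCODE fits here
  uint16_t flags = 0;
  flags |= 0x8000; // QR: this is a response
  // AA (0x0400) and TC (0x0200) stay clear
  if (queryRD) {
    flags |= 0x0100; // RD: echo the client's "recursion desired"
  }
  flags |= 0x0080; // RA: recursion available
  if (queryCD) {
    flags |= 0x0010; // CD: echo the client's "checking disabled"
  }
  flags |= rcode; // e.g. 4 = NOTIMP, 2 = SERVFAIL
  return flags;
}
```

For a NOTIMP reply to a query with RD set, this gives 0x8184.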

void finishTCPReply(std::unique_ptr<DNSComboWriter>& comboWriter, bool hadError, bool updateInFlight)
{
  // update tcp connection status, closing if needed and doing the fd multiplexer accounting
  if (updateInFlight && comboWriter->d_tcpConnection->d_requestsInFlight > 0) {
    comboWriter->d_tcpConnection->d_requestsInFlight--;
  }

  // In the code below, we try to remove the fd from the set, but
  // we don't know if another mthread already did the remove, so we can get a
  // "Tried to remove unlisted fd" exception.  Note that an inflight < limit test
  // will not work since we do not know if the other mthread got an error or not.
  if (hadError) {
    terminateTCPConnection(comboWriter->d_socket);
    comboWriter->d_socket = -1;
    return;
  }
  comboWriter->d_tcpConnection->queriesCount++;
  if ((g_tcpMaxQueriesPerConn > 0 && comboWriter->d_tcpConnection->queriesCount >= g_tcpMaxQueriesPerConn) || (comboWriter->d_tcpConnection->isDropOnIdle() && comboWriter->d_tcpConnection->d_requestsInFlight == 0)) {
    try {
      t_fdm->removeReadFD(comboWriter->d_socket);
    }
    catch (FDMultiplexerException&) {
    }
    comboWriter->d_socket = -1;
    return;
  }

  Utility::gettimeofday(&g_now, nullptr); // needs to be updated
  struct timeval ttd = g_now;

  // If we cross from max to max-1 in flight requests, the fd was not listened to, add it back
  if (updateInFlight && comboWriter->d_tcpConnection->d_requestsInFlight == TCPConnection::s_maxInFlight - 1) {
    // A read error might have happened. If we add the fd back, it will most likely error again.
    // This is not a big issue, the next handleRunningTCPQuestion() call will see another read error
    // and take action.
    ttd.tv_sec += g_tcpTimeout;
    t_fdm->addReadFD(comboWriter->d_socket, handleRunningTCPQuestion, comboWriter->d_tcpConnection, &ttd);
    return;
  }
  // fd might have been removed by read error code, or a read timeout, so expect an exception
  try {
    t_fdm->setReadTTD(comboWriter->d_socket, ttd, g_tcpTimeout);
  }
  catch (const FDMultiplexerException&) {
    // but if the FD was removed because of a timeout while we were sending a response,
    // we need to re-arm it. If it was an error it will error again.
    ttd.tv_sec += g_tcpTimeout;
    t_fdm->addReadFD(comboWriter->d_socket, handleRunningTCPQuestion, comboWriter->d_tcpConnection, &ttd);
  }
}
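finishTCPReply() and the end of doProcessTCPQuestion() together implement per-connection back-pressure: once s_maxInFlight queries from one connection are being resolved, the socket is removed from the multiplexer, and it is added back when the count drops from the maximum to one below it. The hypothetical InFlightGate class below models only that counting; its boolean results stand in for the removeReadFD() and addReadFD() calls, and, unlike the original, it also remembers explicitly whether reading is paused:

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical model of per-connection back-pressure: reading pauses once
// maxInFlight queries are being resolved and resumes when one of them finishes.
class InFlightGate
{
public:
  explicit InFlightGate(uint16_t maxInFlight) :
    d_max(maxInFlight) {}

  // A complete query was handed to a resolver task.
  // Returns true if the caller should stop reading from the socket.
  bool queryStarted()
  {
    ++d_inFlight;
    if (d_inFlight >= d_max) {
      d_reading = false;
      return true;
    }
    return false;
  }

  // A response was written.
  // Returns true if the caller should start reading from the socket again.
  bool queryFinished()
  {
    if (d_inFlight > 0) {
      --d_inFlight;
    }
    // Crossing from max to max-1 is the moment to re-arm reading.
    if (!d_reading && d_inFlight == d_max - 1) {
      d_reading = true;
      return true;
    }
    return false;
  }

  bool reading() const { return d_reading; }
  uint16_t inFlight() const { return d_inFlight; }

private:
  uint16_t d_max;
  uint16_t d_inFlight{0};
  bool d_reading{true};
};
```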

/*
 * A helper class that by default closes the incoming TCP connection on destruction.
 * If you want to keep the connection alive, call keep() on the guard object.
 */
class RunningTCPQuestionGuard
{
public:
  RunningTCPQuestionGuard(const RunningTCPQuestionGuard&) = default;
  RunningTCPQuestionGuard(RunningTCPQuestionGuard&&) = delete;
  RunningTCPQuestionGuard& operator=(const RunningTCPQuestionGuard&) = default;
  RunningTCPQuestionGuard& operator=(RunningTCPQuestionGuard&&) = delete;
  RunningTCPQuestionGuard(int fileDesc) :
    d_fd(fileDesc) {}
  ~RunningTCPQuestionGuard()
  {
    if (d_fd != -1) {
      terminateTCPConnection(d_fd);
      d_fd = -1;
    }
  }
  void keep()
  {
    d_fd = -1;
  }
  bool handleTCPReadResult(int /* fd */, ssize_t bytes)
  {
    if (bytes == 0) {
      /* EOF */
      return false;
    }
    if (bytes < 0) {
      if (errno != EAGAIN && errno != EWOULDBLOCK) {
        return false;
      }
    }
    keep();
    return true;
  }

private:
  int d_fd{-1};
};
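RunningTCPQuestionGuard is a scope guard: every return path out of handleRunningTCPQuestion() terminates the connection unless keep() was called first. A stripped-down version of the same idea, with a hypothetical CloseGuard class that calls a caller-supplied function instead of terminateTCPConnection(), shows both outcomes. This sketch deletes the copy operations so that only one object can ever perform the close:

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical scope guard: closes the descriptor on destruction unless disarmed.
class CloseGuard
{
public:
  CloseGuard(int fd, std::function<void(int)> closer) :
    d_fd(fd), d_closer(std::move(closer)) {}
  CloseGuard(const CloseGuard&) = delete;
  CloseGuard& operator=(const CloseGuard&) = delete;

  ~CloseGuard()
  {
    if (d_fd != -1) {
      d_closer(d_fd); // reached by every early return that did not call keep()
    }
  }

  // The connection stays open: disarm the guard.
  void keep() { d_fd = -1; }

private:
  int d_fd;
  std::function<void(int)> d_closer;
};
```

The benefit of this style is that error paths need no cleanup code of their own: they simply return, and only the success paths have to remember to call keep().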

static void handleNotify(std::unique_ptr<DNSComboWriter>& comboWriter, const DNSName& qname)
{
  if (!t_allowNotifyFrom || !t_allowNotifyFrom->match(comboWriter->d_mappedSource)) {
    if (!g_quiet) {
      g_slogtcpin->info(Logr::Error, "Dropping TCP NOTIFY, address not matched by allow-notify-from", "source", Logging::Loggable(comboWriter->d_mappedSource));
    }

    t_Counters.at(rec::Counter::sourceDisallowedNotify)++;
    return;
  }

  if (!isAllowNotifyForZone(qname)) {
    if (!g_quiet) {
      g_slogtcpin->info(Logr::Error, "Dropping TCP NOTIFY, zone not matched by allow-notify-for", "source", Logging::Loggable(comboWriter->d_mappedSource), "zone", Logging::Loggable(qname));
    }

    t_Counters.at(rec::Counter::zoneDisallowedNotify)++;
    return;
  }
}

static void doProtobufLogQuery(bool logQuery, LocalStateHolder<LuaConfigItems>& luaconfsLocal, const std::unique_ptr<DNSComboWriter>& comboWriter, const DNSName& qname, QType qtype, QClass qclass, const dnsheader* dnsheader, const shared_ptr<TCPConnection>& conn, const boost::optional<uint32_t>& ednsVersion)
{
  try {
    if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && comboWriter->d_policyTags.empty())) {
      protobufLogQuery(luaconfsLocal, comboWriter->d_uuid, comboWriter->d_source, comboWriter->d_destination, comboWriter->d_mappedSource, comboWriter->d_ednssubnet.getSource(), true, conn->qlen, qname, qtype, qclass, comboWriter->d_policyTags, comboWriter->d_requestorId, comboWriter->d_deviceId, comboWriter->d_deviceName, comboWriter->d_meta, ednsVersion, *dnsheader);
    }
  }
  catch (const std::exception& e) {
    if (g_logCommonErrors) {
      g_slogtcpin->error(Logr::Warning, e.what(), "Error parsing a TCP query packet for edns subnet", "exception", Logging::Loggable("std::exception"), "remote", Logging::Loggable(conn->d_remote));
    }
  }
}

static void doProcessTCPQuestion(std::unique_ptr<DNSComboWriter>& comboWriter, shared_ptr<TCPConnection>& conn, RunningTCPQuestionGuard& tcpGuard, int fileDesc)
{
  RecThreadInfo::self().incNumberOfDistributedQueries();
  struct timeval start{};
  Utility::gettimeofday(&start, nullptr);

  DNSName qname;
  uint16_t qtype = 0;
  uint16_t qclass = 0;
  bool needEDNSParse = false;
  string requestorId;
  string deviceId;
  string deviceName;
  bool logQuery = false;
  bool qnameParsed = false;
  boost::optional<uint32_t> ednsVersion;

  comboWriter->d_eventTrace.setEnabled(SyncRes::s_event_trace_enabled != 0);
  // eventTrace uses monotonic time, while OpenTelemetry uses absolute time. setEnabled()
  // establishes the reference point; get an absolute TS as close as possible to the
  // eventTrace start of trace time.
  auto traceTS = pdns::trace::timestamp();
  comboWriter->d_eventTrace.add(RecEventTrace::ReqRecv);
  if (SyncRes::eventTraceEnabled(SyncRes::event_trace_to_ot)) {
    comboWriter->d_otTrace.clear();
    comboWriter->d_otTrace.start_time_unix_nano = traceTS;
  }
  auto luaconfsLocal = g_luaconfs.getLocal();
  if (checkProtobufExport(luaconfsLocal)) {
    needEDNSParse = true;
  }
  logQuery = t_protobufServers.servers && luaconfsLocal->protobufExportConfig.logQueries;
  comboWriter->d_logResponse = t_protobufServers.servers && luaconfsLocal->protobufExportConfig.logResponses;

  if (needEDNSParse || (t_pdl && (t_pdl->hasGettagFFIFunc() || t_pdl->hasGettagFunc())) || comboWriter->d_mdp.d_header.opcode == static_cast<unsigned>(Opcode::Notify)) {

    try {
      EDNSOptionViewMap ednsOptions;
      comboWriter->d_ecsParsed = true;
      comboWriter->d_ecsFound = false;
      getQNameAndSubnet(conn->data, &qname, &qtype, &qclass,
                        comboWriter->d_ecsFound, &comboWriter->d_ednssubnet,
                        (g_gettagNeedsEDNSOptions || SyncRes::eventTraceEnabled(SyncRes::event_trace_to_ot)) ? &ednsOptions : nullptr, ednsVersion);
      qnameParsed = true;

      if (SyncRes::eventTraceEnabled(SyncRes::event_trace_to_ot)) {
        pdns::trace::extractOTraceIDs(ednsOptions, comboWriter->d_otTrace);
      }
      if (t_pdl) {
        try {
          if (t_pdl->hasGettagFFIFunc()) {
            RecursorLua4::FFIParams params(qname, qtype, comboWriter->d_local, comboWriter->d_remote, comboWriter->d_destination, comboWriter->d_source, comboWriter->d_ednssubnet.getSource(), comboWriter->d_data, comboWriter->d_gettagPolicyTags, comboWriter->d_records, ednsOptions, comboWriter->d_proxyProtocolValues, requestorId, deviceId, deviceName, comboWriter->d_routingTag, comboWriter->d_rcode, comboWriter->d_ttlCap, comboWriter->d_variable, true, logQuery, comboWriter->d_logResponse, comboWriter->d_followCNAMERecords, comboWriter->d_extendedErrorCode, comboWriter->d_extendedErrorExtra, comboWriter->d_responsePaddingDisabled, comboWriter->d_meta);
            auto match = comboWriter->d_eventTrace.add(RecEventTrace::LuaGetTagFFI);
            comboWriter->d_tag = t_pdl->gettag_ffi(params);
            comboWriter->d_eventTrace.add(RecEventTrace::LuaGetTagFFI, comboWriter->d_tag, false, match);
          }
          else if (t_pdl->hasGettagFunc()) {
            auto match = comboWriter->d_eventTrace.add(RecEventTrace::LuaGetTag);
            comboWriter->d_tag = t_pdl->gettag(comboWriter->d_source, comboWriter->d_ednssubnet.getSource(), comboWriter->d_destination, qname, qtype, &comboWriter->d_gettagPolicyTags, comboWriter->d_data, ednsOptions, true, requestorId, deviceId, deviceName, comboWriter->d_routingTag, comboWriter->d_proxyProtocolValues);
            comboWriter->d_eventTrace.add(RecEventTrace::LuaGetTag, comboWriter->d_tag, false, match);
          }
          // Copy d_gettagPolicyTags to d_policyTags, so other Lua hooks see them and can add their
          // own. Before storing into the packetcache, the tags in d_gettagPolicyTags will be
          // cleared by addPolicyTagsToPBMessageIfNeeded() so they do *not* end up in the PC. When an
          // Protobuf message is constructed, one part comes from the PC (including the tags
          // set by non-gettag hooks), and the tags in d_gettagPolicyTags will be added by the code
          // constructing the PB message.
          comboWriter->d_policyTags = comboWriter->d_gettagPolicyTags;
        }
        catch (const MOADNSException& moadnsexception) {
          if (g_logCommonErrors) {
            g_slogtcpin->error(moadnsexception.what(), "Error parsing a query packet for tag determination", "qname", Logging::Loggable(qname), "exception", Logging::Loggable("MOADNSException"));
          }
        }
        catch (const std::exception& stdException) {
          g_rateLimitedLogger.log(g_slogtcpin, "Error parsing a query packet for tag determination", stdException, "qname", Logging::Loggable(qname), "remote", Logging::Loggable(conn->d_remote));
        }
      }
    }
    catch (const std::exception& stdException) {
      g_rateLimitedLogger.log(g_slogtcpin, "Error parsing a query packet for tag determination, setting tag=0", stdException, "remote", Logging::Loggable(conn->d_remote));
    }
  }

  if (comboWriter->d_tag == 0 && !comboWriter->d_responsePaddingDisabled && g_paddingFrom.match(comboWriter->d_remote)) {
    comboWriter->d_tag = g_paddingTag;
  }

  const dnsheader_aligned headerdata(conn->data.data());
  const struct dnsheader* dnsheader = headerdata.get();

  if (t_protobufServers.servers || t_outgoingProtobufServers.servers) {
    comboWriter->d_requestorId = std::move(requestorId);
    comboWriter->d_deviceId = std::move(deviceId);
    comboWriter->d_deviceName = std::move(deviceName);
    comboWriter->d_uuid = getUniqueID();
  }

  if (t_protobufServers.servers) {
    doProtobufLogQuery(logQuery, luaconfsLocal, comboWriter, qname, qtype, qclass, dnsheader, conn, ednsVersion);
  }

  if (t_pdl) {
    bool ipf = t_pdl->ipfilter(comboWriter->d_source, comboWriter->d_destination, *dnsheader, comboWriter->d_eventTrace);
    if (ipf) {
      if (!g_quiet) {
        g_slogtcpin->info(Logr::Info, "Dropped TCP question based on policy", "remote", Logging::Loggable(conn->d_remote), "source", Logging::Loggable(comboWriter->d_source));
      }
      t_Counters.at(rec::Counter::policyDrops)++;
      return;
    }
  }

  if (comboWriter->d_mdp.d_header.qr) {
    t_Counters.at(rec::Counter::ignoredCount)++;
    if (g_logCommonErrors) {
      g_slogtcpin->info(Logr::Error, "Ignoring answer from TCP client on server socket", "remote", Logging::Loggable(comboWriter->getRemote()));
    }
    return;
  }
  if (comboWriter->d_mdp.d_header.opcode != static_cast<unsigned>(Opcode::Query) && comboWriter->d_mdp.d_header.opcode != static_cast<unsigned>(Opcode::Notify)) {
    t_Counters.at(rec::Counter::ignoredCount)++;
    if (g_logCommonErrors) {
      g_slogtcpin->info(Logr::Error, "Ignoring unsupported opcode from TCP client", "remote", Logging::Loggable(comboWriter->getRemote()), "opcode", Logging::Loggable(Opcode::to_s(comboWriter->d_mdp.d_header.opcode)));
    }
    sendErrorOverTCP(comboWriter, RCode::NotImp);
    tcpGuard.keep();
    return;
  }
  if (dnsheader->qdcount == 0U) {
    t_Counters.at(rec::Counter::emptyQueriesCount)++;
    if (g_logCommonErrors) {
      g_slogtcpin->info(Logr::Error, "Ignoring empty (qdcount == 0) query on server socket", "remote", Logging::Loggable(comboWriter->getRemote()));
    }
    sendErrorOverTCP(comboWriter, RCode::NotImp);
    tcpGuard.keep();
    return;
  }
  {
    // We have read a proper query
    ++t_Counters.at(rec::Counter::qcounter);
    ++t_Counters.at(rec::Counter::tcpqcounter);
    if (comboWriter->d_source.sin4.sin_family == AF_INET6) {
      ++t_Counters.at(rec::Counter::ipv6qcounter);
    }

    if (comboWriter->d_mdp.d_header.opcode == static_cast<unsigned>(Opcode::Notify)) {
      handleNotify(comboWriter, qname);
    }

    string response;
    RecursorPacketCache::OptPBData pbData{boost::none};

    if (comboWriter->d_mdp.d_header.opcode == static_cast<unsigned>(Opcode::Query)) {
      /* It might seem like a good idea to skip the packet cache lookup if we know that the answer is not cacheable,
         but it means that the hash would not be computed. If some script decides at a later time to mark back the answer
         as cacheable we would cache it with a wrong tag, so better safe than sorry. */
      auto match = comboWriter->d_eventTrace.add(RecEventTrace::PCacheCheck);
      bool cacheHit = checkForCacheHit(qnameParsed, comboWriter->d_tag, conn->data, qname, qtype, qclass, g_now, response, comboWriter->d_qhash, pbData, true, comboWriter->d_source, comboWriter->d_mappedSource);
      comboWriter->d_eventTrace.add(RecEventTrace::PCacheCheck, cacheHit, false, match);

      if (cacheHit) {
        if (!g_quiet) {
          g_slogtcpin->info(Logr::Notice, "TCP question answered from packet cache", "tag", Logging::Loggable(comboWriter->d_tag),
                            "qname", Logging::Loggable(qname), "qtype", Logging::Loggable(QType(qtype)),
                            "source", Logging::Loggable(comboWriter->d_source), "remote", Logging::Loggable(comboWriter->d_remote));
        }

        bool hadError = sendResponseOverTCP(comboWriter, response);
        finishTCPReply(comboWriter, hadError, false);
        struct timeval now{};
        Utility::gettimeofday(&now, nullptr);
        uint64_t spentUsec = uSec(now - start);
        t_Counters.at(rec::Histogram::cumulativeAnswers)(spentUsec);
        comboWriter->d_eventTrace.add(RecEventTrace::AnswerSent);

        if (t_protobufServers.servers && comboWriter->d_logResponse && (!luaconfsLocal->protobufExportConfig.taggedOnly || (pbData && pbData->d_tagged))) {
          struct timeval tval{
            0, 0};
          protobufLogResponse(qname, qtype, dnsheader, luaconfsLocal, pbData, tval, true, comboWriter->d_source, comboWriter->d_destination, comboWriter->d_mappedSource, comboWriter->d_ednssubnet, comboWriter->d_uuid, comboWriter->d_requestorId, comboWriter->d_deviceId, comboWriter->d_deviceName, comboWriter->d_meta, comboWriter->d_eventTrace, comboWriter->d_otTrace, comboWriter->d_policyTags);
        }

        if (comboWriter->d_eventTrace.enabled() && SyncRes::eventTraceEnabled(SyncRes::event_trace_to_log)) {
          g_slogtcpin->info(Logr::Info, comboWriter->d_eventTrace.toString()); // More fancy?
        }
        tcpGuard.keep();
        t_Counters.updateSnap(g_regressionTestMode);
        return;
      } // cache hit
    } // query opcode

    if (comboWriter->d_mdp.d_header.opcode == static_cast<unsigned>(Opcode::Notify)) {
      if (!g_quiet) {
        g_slogtcpin->info(Logr::Notice, "Got NOTIFY", "qname", Logging::Loggable(qname), "source", Logging::Loggable(comboWriter->d_source), "remote", Logging::Loggable(comboWriter->d_remote));
      }

      requestWipeCaches(qname);

      // the operation will now be treated as a Query, generating
      // a normal response, as the rest of the code does not
      // check dh->opcode, but we need to ensure that the response
      // to this request does not get put into the packet cache
      comboWriter->d_variable = true;
    }

    // setup for startDoResolve() in an mthread
    ++conn->d_requestsInFlight;
    if (conn->d_requestsInFlight >= TCPConnection::s_maxInFlight) {
      t_fdm->removeReadFD(fileDesc); // should no longer awake ourselves when there is data to read
    }
    else {
      Utility::gettimeofday(&g_now, nullptr); // needed?
      struct timeval ttd = g_now;
      t_fdm->setReadTTD(fileDesc, ttd, g_tcpTimeout);
    }
    tcpGuard.keep();
    g_multiTasker->makeThread(startDoResolve, comboWriter.release()); // deletes dc
  } // good query
}
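handleRunningTCPQuestion(), which follows, reads each query using the DNS-over-TCP framing of RFC 1035, section 4.2.2: a two-byte, big-endian length followed by the message itself. Because recv() may return any prefix of that, the BYTE0, BYTE1 and GETQUESTION states record how far the current message has been read. A byte-at-a-time sketch of the same state machine, using a hypothetical TCPFramer class that works on in-memory chunks instead of a socket:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical incremental decoder for DNS-over-TCP framing (RFC 1035, section 4.2.2):
// every message is preceded by its length as a two-byte, big-endian integer.
// Chunks may split the length prefix or the body at any byte boundary.
class TCPFramer
{
public:
  // Consumes one chunk and appends every message completed by it to out.
  void feed(const std::vector<uint8_t>& chunk, std::vector<std::vector<uint8_t>>& out)
  {
    for (uint8_t byte : chunk) {
      switch (d_state) {
      case State::Byte0: // high byte of the length
        d_len = static_cast<uint16_t>(byte << 8);
        d_state = State::Byte1;
        break;
      case State::Byte1: // low byte of the length
        d_len = static_cast<uint16_t>(d_len | byte);
        d_body.clear();
        if (d_len == 0) {
          out.emplace_back(); // empty frame; a caller would probably reject it
          d_state = State::Byte0;
        }
        else {
          d_state = State::Body;
        }
        break;
      case State::Body:
        d_body.push_back(byte);
        if (d_body.size() == static_cast<size_t>(d_len)) {
          out.push_back(std::move(d_body));
          d_body.clear(); // a moved-from vector must be reset before reuse
          d_state = State::Byte0;
        }
        break;
      }
    }
  }

private:
  enum class State { Byte0, Byte1, Body };
  State d_state{State::Byte0};
  uint16_t d_len{0};
  std::vector<uint8_t> d_body;
};
```

Processing byte by byte keeps the sketch short. The recursor instead asks recv() for exactly the number of bytes still missing (2 for the length, then qlen - bytesread for the body), so it never reads past the end of the current message.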

static void handleRunningTCPQuestion(int fileDesc, FDMultiplexer::funcparam_t& var) // NOLINT(readability-function-cognitive-complexity)
{
  auto conn = boost::any_cast<shared_ptr<TCPConnection>>(var);

  RunningTCPQuestionGuard tcpGuard{fileDesc};

  if (conn->state == TCPConnection::PROXYPROTOCOLHEADER) {
    ssize_t bytes = recv(conn->getFD(), &conn->data.at(conn->proxyProtocolGot), conn->proxyProtocolNeed, 0);
    if (bytes <= 0) {
      tcpGuard.handleTCPReadResult(fileDesc, bytes);
      return;
    }

    conn->proxyProtocolGot += bytes;
    conn->data.resize(conn->proxyProtocolGot);
    ssize_t remaining = isProxyHeaderComplete(conn->data);
    if (remaining == 0) {
      if (g_logCommonErrors) {
        g_slogtcpin->info(Logr::Error, "Unable to consume proxy protocol header in packet from TCP client", "remote", Logging::Loggable(conn->d_remote));
      }
      ++t_Counters.at(rec::Counter::proxyProtocolInvalidCount);
      return;
    }
    if (remaining < 0) {
      conn->proxyProtocolNeed = -remaining;
      conn->data.resize(conn->proxyProtocolGot + conn->proxyProtocolNeed);
      tcpGuard.keep();
      return;
    }
    {
      /* proxy header received */
      /* we ignore the TCP field for now, but we could properly set whether
         the connection was received over UDP or TCP if needed */
      bool tcp = false;
      bool proxy = false;
      size_t used = parseProxyHeader(conn->data, proxy, conn->d_source, conn->d_destination, tcp, conn->proxyProtocolValues);
      if (used <= 0) {
        if (g_logCommonErrors) {
          g_slogtcpin->info(Logr::Error, "Unable to parse proxy protocol header in packet from TCP client", "remote", Logging::Loggable(conn->d_remote));
        }
        ++t_Counters.at(rec::Counter::proxyProtocolInvalidCount);
        return;
      }
      if (static_cast<size_t>(used) > g_proxyProtocolMaximumSize) {
        if (g_logCommonErrors) {
          g_slogtcpin->info(Logr::Error, "Proxy protocol header in packet from TCP client is larger than proxy-protocol-maximum-size", "remote", Logging::Loggable(conn->d_remote), "size", Logging::Loggable(used));
        }
        ++t_Counters.at(rec::Counter::proxyProtocolInvalidCount);
        return;
      }

      /* Now that we have retrieved the address of the client, as advertised by the proxy
         via the proxy protocol header, check that it is allowed by our ACL */
      /* note that if the proxy header used a 'LOCAL' command, the original source and destination are untouched so everything should be fine */
      conn->d_mappedSource = conn->d_source;
      if (t_proxyMapping) {
        if (const auto* iter = t_proxyMapping->lookup(conn->d_source)) {
          conn->d_mappedSource = iter->second.address;
          ++iter->second.stats.netmaskMatches;
        }
      }
      if (t_remotes) {
        t_remotes->push_back(conn->d_source);
      }
      if (t_allowFrom && !t_allowFrom->match(&conn->d_mappedSource)) {
        if (!g_quiet) {
          g_slogtcpin->info(Logr::Error, "Dropping TCP query, address not matched by allow-from", "remote", Logging::Loggable(conn->d_remote));
        }

        ++t_Counters.at(rec::Counter::unauthorizedTCP);
        return;
      }

      conn->data.resize(2);
      conn->state = TCPConnection::BYTE0;
    }
  }

  if (conn->state == TCPConnection::BYTE0) {
    ssize_t bytes = recv(conn->getFD(), conn->data.data(), 2, 0);
    if (bytes == 1) {
      conn->state = TCPConnection::BYTE1;
    }
    if (bytes == 2) {
      conn->qlen = (((unsigned char)conn->data[0]) << 8) + (unsigned char)conn->data[1];
      conn->data.resize(conn->qlen);
      conn->bytesread = 0;
      conn->state = TCPConnection::GETQUESTION;
    }
    if (bytes <= 0) {
      tcpGuard.handleTCPReadResult(fileDesc, bytes);
      return;
    }
  }

  if (conn->state == TCPConnection::BYTE1) {
    ssize_t bytes = recv(conn->getFD(), &conn->data[1], 1, 0);
    if (bytes == 1) {
      conn->state = TCPConnection::GETQUESTION;
      conn->qlen = (((unsigned char)conn->data[0]) << 8) + (unsigned char)conn->data[1];
      conn->data.resize(conn->qlen);
      conn->bytesread = 0;
    }
    if (bytes <= 0) {
      if (!tcpGuard.handleTCPReadResult(fileDesc, bytes)) {
        if (g_logCommonErrors) {
          g_slogtcpin->info(Logr::Error, "TCP client disconnected after first byte", "remote", Logging::Loggable(conn->d_remote));
        }
      }
      return;
    }
  }

  if (conn->state == TCPConnection::GETQUESTION) {
    ssize_t bytes = recv(conn->getFD(), &conn->data[conn->bytesread], conn->qlen - conn->bytesread, 0);
    if (bytes <= 0) {
      if (!tcpGuard.handleTCPReadResult(fileDesc, bytes)) {
        if (g_logCommonErrors) {
          g_slogtcpin->info(Logr::Error, "TCP client disconnected while reading question body", "remote", Logging::Loggable(conn->d_remote));
        }
      }
      return;
    }
    if (bytes > std::numeric_limits<std::uint16_t>::max()) {
      if (g_logCommonErrors) {
        g_slogtcpin->info(Logr::Error, "TCP client sent an invalid question size while reading question body", "remote", Logging::Loggable(conn->d_remote));
      }
      return;
    }
    conn->bytesread += (uint16_t)bytes;
    if (conn->bytesread == conn->qlen) {
      conn->state = TCPConnection::BYTE0;
      std::unique_ptr<DNSComboWriter> comboWriter;
      try {
        comboWriter = std::make_unique<DNSComboWriter>(conn->data, g_now, t_pdl);
      }
      catch (const MOADNSException& mde) {
        t_Counters.at(rec::Counter::clientParseError)++;
        if (g_logCommonErrors) {
          g_slogtcpin->info(Logr::Error, "Unable to parse packet from TCP client", "remote", Logging::Loggable(conn->d_remote));
        }
        return;
      }

      comboWriter->d_tcpConnection = conn; // carry the torch
      comboWriter->setSocket(conn->getFD()); // this is the only time a copy is made of the actual fd
      comboWriter->d_tcp = true;
      comboWriter->setRemote(conn->d_remote); // the address the query was received from
852✔
655
      comboWriter->setSource(conn->d_source); // the address we assume the query is coming from, might be set by proxy protocol
852✔
656
      ComboAddress dest;
852✔
657
      dest.reset();
852✔
658
      dest.sin4.sin_family = conn->d_remote.sin4.sin_family;
852✔
659
      socklen_t len = dest.getSocklen();
852✔
660
      getsockname(conn->getFD(), reinterpret_cast<sockaddr*>(&dest), &len); // if this fails, we're ok with it NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
852✔
661
      comboWriter->setLocal(dest); // the address we received the query on
852✔
662
      comboWriter->setDestination(conn->d_destination); // the address we assume the query is received on, might be set by proxy protocol
852✔
663
      comboWriter->setMappedSource(conn->d_mappedSource); // the address we assume the query is coming from after table based mapping
852✔
664
      /* we can't move this if we want to be able to access the values in
665
         all queries sent over this connection */
666
      comboWriter->d_proxyProtocolValues = conn->proxyProtocolValues;
852✔
667

668
      doProcessTCPQuestion(comboWriter, conn, tcpGuard, fileDesc);
852✔
669
    } // reading query
852✔
670
  }
1,041✔
671
  // more to come
672
  tcpGuard.keep();
1,041✔
673
}
1,041✔
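
// Illustrative sketch, not part of the recursor: the BYTE0 / BYTE1 /
// GETQUESTION states in handleRunningTCPQuestion() implement DNS-over-TCP
// framing. Each message is preceded by a two-byte length field in network
// byte order (RFC 1035, section 4.2.2). The hypothetical class
// rec_tcp_sketch::FramedReader below replays that state machine on in-memory
// chunks instead of a socket, which makes the partial-read cases easy to
// follow: only one length byte available, or a body split over several reads.
// The recursor asks recv() for all remaining bytes of the current field,
// whereas this sketch walks each chunk byte by byte.
#include <cstddef>
#include <cstdint>
#include <vector>

namespace rec_tcp_sketch
{
class FramedReader
{
public:
  // Consume one chunk (as a single recv() call might return it) and append
  // every message completed by that chunk to `out`.
  void feed(const std::vector<std::uint8_t>& chunk, std::vector<std::vector<std::uint8_t>>& out)
  {
    for (const std::uint8_t byte : chunk) {
      switch (d_state) {
      case State::LengthHigh: // plays the role of BYTE0
        d_len = static_cast<std::uint16_t>(byte << 8);
        d_state = State::LengthLow;
        break;
      case State::LengthLow: // plays the role of BYTE1
        d_len = static_cast<std::uint16_t>(d_len | byte);
        d_body.clear();
        d_state = State::Body;
        if (d_len == 0) {
          // Zero-length frame: this sketch simply reports an empty message
          out.push_back(d_body);
          d_state = State::LengthHigh;
        }
        break;
      case State::Body: // plays the role of GETQUESTION
        d_body.push_back(byte);
        if (d_body.size() == d_len) {
          out.push_back(d_body);
          d_state = State::LengthHigh;
        }
        break;
      }
    }
  }

private:
  enum class State
  {
    LengthHigh,
    LengthLow,
    Body
  };
  State d_state{State::LengthHigh};
  std::uint16_t d_len{0};
  std::vector<std::uint8_t> d_body;
};
} // namespace rec_tcp_sketch
// Usage: feeding {0x00}, then {0x03, 'a', 'b'}, then {'c'} completes the
// single message "abc" only on the third call. This mirrors a client whose
// length prefix and body arrive in separate TCP segments.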

//! Handle new incoming TCP connection
void handleNewTCPQuestion(int fileDesc, [[maybe_unused]] FDMultiplexer::funcparam_t& var)
{
  ComboAddress addr;
  socklen_t addrlen = sizeof(addr);
  int newsock = accept(fileDesc, reinterpret_cast<struct sockaddr*>(&addr), &addrlen); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
  if (newsock < 0) {
    return;
  }
  auto closeSock = [newsock](rec::Counter cnt, const string& msg) {
    try {
      closesocket(newsock);
      t_Counters.at(cnt)++;
      // We want this bump to percolate up without too much delay
      t_Counters.updateSnap(false);
    }
    catch (const PDNSException& e) {
      g_slogtcpin->error(Logr::Error, e.reason, msg, "exception", Logging::Loggable("PDNSException"));
    }
  };

  if (TCPConnection::getCurrentConnections() >= g_maxTCPClients) {
    closeSock(rec::Counter::tcpOverflow, "Error closing TCP socket after an overflow drop");
    return;
  }
  if (g_multiTasker->numProcesses() >= g_maxMThreads) {
    closeSock(rec::Counter::overCapacityDrops, "Error closing TCP socket after an over capacity drop");
    return;
  }

  ComboAddress destaddr;
  socklen_t len = sizeof(destaddr);
  getsockname(newsock, reinterpret_cast<sockaddr*>(&destaddr), &len); // if this fails, we're ok with it NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
  bool fromProxyProtocolSource = expectProxyProtocol(addr, destaddr);
  if (!fromProxyProtocolSource && t_remotes) {
    t_remotes->push_back(addr);
  }
  ComboAddress mappedSource = addr;
  if (!fromProxyProtocolSource && t_proxyMapping) {
    if (const auto* iter = t_proxyMapping->lookup(addr)) {
      mappedSource = iter->second.address;
      ++iter->second.stats.netmaskMatches;
    }
  }
  if (!fromProxyProtocolSource && t_allowFrom && !t_allowFrom->match(&mappedSource)) {
    if (!g_quiet) {
      g_slogtcpin->info(Logr::Error, "dropping TCP query address neither matched by allow-from nor proxy-protocol-from", "source", Logging::Loggable(mappedSource));
    }
    closeSock(rec::Counter::unauthorizedTCP, "Error closing TCP socket after an ACL drop");
    return;
  }

  if (g_maxTCPPerClient > 0 && t_tcpClientCounts->count(addr) > 0 && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) {
    closeSock(rec::Counter::tcpClientOverflow, "Error closing TCP socket after a client overflow drop");
    return;
  }

  setNonBlocking(newsock);
  setTCPNoDelay(newsock);
  std::shared_ptr<TCPConnection> tcpConn = std::make_shared<TCPConnection>(newsock, addr);
  tcpConn->d_source = addr;
  tcpConn->d_destination = destaddr;
  tcpConn->d_mappedSource = mappedSource;

  if (fromProxyProtocolSource) {
    tcpConn->proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize;
    tcpConn->data.resize(tcpConn->proxyProtocolNeed);
    tcpConn->state = TCPConnection::PROXYPROTOCOLHEADER;
  }
  else {
    tcpConn->state = TCPConnection::BYTE0;
  }

  timeval ttd{};
  Utility::gettimeofday(&ttd, nullptr);
  ttd.tv_sec += g_tcpTimeout;

  t_fdm->addReadFD(tcpConn->getFD(), handleRunningTCPQuestion, tcpConn, &ttd);
}
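
// Illustrative sketch, not part of the recursor: the order in which
// handleNewTCPQuestion() above turns away a freshly accepted connection,
// reduced to a pure function. Each rejection reason corresponds to the
// counter the real code bumps: tcpOverflow, overCapacityDrops,
// unauthorizedTCP and tcpClientOverflow. The names
// rec_tcp_sketch::AdmitInput and admitNewTCPConnection(), and their field
// names, are hypothetical. In the real code the values come from
// TCPConnection::getCurrentConnections(), g_multiTasker->numProcesses(),
// t_allowFrom and t_tcpClientCounts.
#include <cstdint>

namespace rec_tcp_sketch
{
enum class AdmitResult
{
  Accept,
  TcpOverflow, // global limit on concurrent TCP clients reached
  OverCapacity, // too many mthreads already running
  Unauthorized, // source rejected by the ACL
  ClientOverflow // per-client connection limit reached
};

struct AdmitInput
{
  std::uint64_t currentConnections;
  std::uint64_t maxTCPClients;
  std::uint64_t runningMThreads;
  std::uint64_t maxMThreads;
  bool fromProxyProtocolSource; // ACL is checked later, once the proxy protocol header has been parsed
  bool allowedByACL; // true when no ACL is configured or the (mapped) source matches it
  std::uint64_t connectionsFromPeer; // connections already open from the same peer address
  std::uint64_t maxTCPPerClient; // 0 disables the per-client limit
};

inline AdmitResult admitNewTCPConnection(const AdmitInput& input)
{
  if (input.currentConnections >= input.maxTCPClients) {
    return AdmitResult::TcpOverflow;
  }
  if (input.runningMThreads >= input.maxMThreads) {
    return AdmitResult::OverCapacity;
  }
  if (!input.fromProxyProtocolSource && !input.allowedByACL) {
    return AdmitResult::Unauthorized;
  }
  if (input.maxTCPPerClient > 0 && input.connectionsFromPeer >= input.maxTCPPerClient) {
    return AdmitResult::ClientOverflow;
  }
  return AdmitResult::Accept;
}
} // namespace rec_tcp_sketch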

static void TCPIOHandlerIO(int fileDesc, FDMultiplexer::funcparam_t& var);

static void TCPIOHandlerStateChange(IOState oldstate, IOState newstate, std::shared_ptr<PacketID>& pid)
{
  TCPLOG(pid->tcpsock, "State transition " << int(oldstate) << "->" << int(newstate) << endl);

  pid->lowState = newstate;

  // handle state transitions
  switch (oldstate) {
  case IOState::NeedRead:

    switch (newstate) {
    case IOState::NeedWrite:
      TCPLOG(pid->tcpsock, "NeedRead -> NeedWrite: flip FD" << endl);
      t_fdm->alterFDToWrite(pid->tcpsock, TCPIOHandlerIO, pid);
      break;
    case IOState::NeedRead:
      break;
    case IOState::Done:
      TCPLOG(pid->tcpsock, "Done -> removeReadFD" << endl);
      t_fdm->removeReadFD(pid->tcpsock);
      break;
    case IOState::Async:
      throw std::runtime_error("TLS async mode not supported");
      break;
    }
    break;

  case IOState::NeedWrite:

    switch (newstate) {
    case IOState::NeedRead:
      TCPLOG(pid->tcpsock, "NeedWrite -> NeedRead: flip FD" << endl);
      t_fdm->alterFDToRead(pid->tcpsock, TCPIOHandlerIO, pid);
      break;
    case IOState::NeedWrite:
      break;
    case IOState::Done:
      TCPLOG(pid->tcpsock, "Done -> removeWriteFD" << endl);
      t_fdm->removeWriteFD(pid->tcpsock);
      break;
    case IOState::Async:
      throw std::runtime_error("TLS async mode not supported");
      break;
    }
    break;

  case IOState::Done:
    switch (newstate) {
    case IOState::NeedRead:
      TCPLOG(pid->tcpsock, "NeedRead: addReadFD" << endl);
      t_fdm->addReadFD(pid->tcpsock, TCPIOHandlerIO, pid);
      break;
    case IOState::NeedWrite:
      TCPLOG(pid->tcpsock, "NeedWrite: addWriteFD" << endl);
      t_fdm->addWriteFD(pid->tcpsock, TCPIOHandlerIO, pid);
      break;
    case IOState::Done:
      break;
    case IOState::Async:
      throw std::runtime_error("TLS async mode not supported");
      break;
    }
    break;

  case IOState::Async:
    throw std::runtime_error("TLS async mode not supported");
    break;
  }
}
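
// Illustrative sketch, not part of the recursor: TCPIOHandlerStateChange()
// above decides how the fd's registration with the multiplexer changes when
// the TCP/TLS handler moves from one IOState to another. The hypothetical
// function rec_tcp_sketch::fdActionFor() returns that decision as a value
// instead of calling t_fdm, so the transition table can be checked in
// isolation. IoState and FdAction are stand-ins defined here, not the real
// IOState type.
namespace rec_tcp_sketch
{
enum class IoState
{
  Done,
  NeedRead,
  NeedWrite,
  Async
};

enum class FdAction
{
  None, // registration unchanged
  AddRead, // t_fdm->addReadFD()
  AddWrite, // t_fdm->addWriteFD()
  FlipToRead, // t_fdm->alterFDToRead()
  FlipToWrite, // t_fdm->alterFDToWrite()
  RemoveRead, // t_fdm->removeReadFD()
  RemoveWrite, // t_fdm->removeWriteFD()
  Unsupported // the real code throws "TLS async mode not supported"
};

inline FdAction fdActionFor(IoState oldState, IoState newState)
{
  if (oldState == IoState::Async || newState == IoState::Async) {
    return FdAction::Unsupported;
  }
  if (oldState == newState) {
    return FdAction::None;
  }
  switch (oldState) {
  case IoState::Done:
    return newState == IoState::NeedRead ? FdAction::AddRead : FdAction::AddWrite;
  case IoState::NeedRead:
    return newState == IoState::NeedWrite ? FdAction::FlipToWrite : FdAction::RemoveRead;
  case IoState::NeedWrite:
    return newState == IoState::NeedRead ? FdAction::FlipToRead : FdAction::RemoveWrite;
  case IoState::Async:
    break;
  }
  return FdAction::Unsupported;
}
} // namespace rec_tcp_sketch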

static void TCPIOHandlerIO(int fileDesc, FDMultiplexer::funcparam_t& var)
{
  auto pid = boost::any_cast<std::shared_ptr<PacketID>>(var);
  assert(pid->tcphandler); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay): the definition of assert triggers it
  assert(fileDesc == pid->tcphandler->getDescriptor()); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay) idem
  IOState newstate = IOState::Done;

  TCPLOG(pid->tcpsock, "TCPIOHandlerIO: lowState " << int(pid->lowState) << endl);

  // In the code below, we want to update the state of the fd before calling sendEvent:
  // a sendEvent might close the fd, and some poll multiplexers do not like to manipulate a closed fd

  switch (pid->highState) {
  case TCPAction::DoingRead:
    TCPLOG(pid->tcpsock, "highState: Reading" << endl);
    // In arecvtcp, the buffer was resized already so inWanted bytes will fit
    // try reading
    try {
      newstate = pid->tcphandler->tryRead(pid->inMSG, pid->inPos, pid->inWanted);
      switch (newstate) {
      case IOState::Done:
      case IOState::NeedRead:
        TCPLOG(pid->tcpsock, "tryRead: Done or NeedRead " << int(newstate) << ' ' << pid->inPos << '/' << pid->inWanted << endl);
        TCPLOG(pid->tcpsock, "TCPIOHandlerIO " << pid->inWanted << ' ' << pid->inIncompleteOkay << endl);
        if (pid->inPos == pid->inWanted || (pid->inIncompleteOkay && pid->inPos > 0)) {
          pid->inMSG.resize(pid->inPos); // old content (if there) + new bytes read, only relevant for the inIncompleteOkay case
          newstate = IOState::Done;
          TCPIOHandlerStateChange(pid->lowState, newstate, pid);
          g_multiTasker->sendEvent(pid, &pid->inMSG);
          return;
        }
        break;
      case IOState::NeedWrite:
        break;
      case IOState::Async:
        throw std::runtime_error("TLS async mode not supported");
        break;
      }
    }
    catch (const std::exception& e) {
      newstate = IOState::Done;
      TCPLOG(pid->tcpsock, "read exception..." << e.what() << endl);
      PacketBuffer empty;
      TCPIOHandlerStateChange(pid->lowState, newstate, pid);
      g_multiTasker->sendEvent(pid, &empty); // this conveys error status
      return;
    }
    break;

  case TCPAction::DoingWrite:
    TCPLOG(pid->tcpsock, "highState: Writing" << endl);
    try {
      TCPLOG(pid->tcpsock, "tryWrite: " << pid->outPos << '/' << pid->outMSG.size() << ' ' << " -> ");
      newstate = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size());
      TCPLOG(pid->tcpsock, pid->outPos << '/' << pid->outMSG.size() << endl);
      switch (newstate) {
      case IOState::Done: {
        TCPLOG(pid->tcpsock, "tryWrite: Done" << endl);
        TCPIOHandlerStateChange(pid->lowState, newstate, pid);
        g_multiTasker->sendEvent(pid, &pid->outMSG); // send back what we sent to convey everything is ok
        return;
      }
      case IOState::NeedRead:
        TCPLOG(pid->tcpsock, "tryWrite: NeedRead" << endl);
        break;
      case IOState::NeedWrite:
        TCPLOG(pid->tcpsock, "tryWrite: NeedWrite" << endl);
        break;
      case IOState::Async:
        throw std::runtime_error("TLS async mode not supported");
        break;
      }
    }
    catch (const std::exception& e) {
      newstate = IOState::Done;
      TCPLOG(pid->tcpsock, "write exception..." << e.what() << endl);
      PacketBuffer sent;
      TCPIOHandlerStateChange(pid->lowState, newstate, pid);
      g_multiTasker->sendEvent(pid, &sent); // we convey error status by sending empty string
      return;
    }
    break;
  }

  // Cases that did not end up doing a sendEvent
  TCPIOHandlerStateChange(pid->lowState, newstate, pid);
}

void checkFastOpenSysctl([[maybe_unused]] bool active, [[maybe_unused]] Logr::log_t log)
{
#ifdef __linux__
  string line;
  if (readFileIfThere("/proc/sys/net/ipv4/tcp_fastopen", &line)) {
    int flag = std::stoi(line);
    if (active && !(flag & 1)) {
      log->info(Logr::Error, "tcp-fast-open-connect enabled but net.ipv4.tcp_fastopen does not allow it");
    }
    if (!active && !(flag & 2)) {
      log->info(Logr::Error, "tcp-fast-open enabled but net.ipv4.tcp_fastopen does not allow it");
    }
  }
  else {
    log->info(Logr::Notice, "Cannot determine if kernel settings allow fast-open");
  }
#else
  log->info(Logr::Notice, "Cannot determine if kernel settings allow fast-open");
#endif
}
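
// Illustrative sketch, not part of the recursor: on Linux,
// net.ipv4.tcp_fastopen is a bit mask. Flag 0x1 enables Fast Open for
// outgoing (client) connections and flag 0x2 enables it for listening
// (server) sockets. checkFastOpenSysctl() above tests 0x1 when `active` is
// true and 0x2 otherwise. The hypothetical function
// rec_tcp_sketch::fastOpenAllowed() applies the same test to the text read
// from /proc/sys/net/ipv4/tcp_fastopen. As a design choice of this sketch, an
// unparsable value is treated as "not allowed" instead of letting std::stoi
// throw.
#include <stdexcept>
#include <string>

namespace rec_tcp_sketch
{
constexpr int tfoClientFlag = 0x1; // data in the SYN of outgoing connections
constexpr int tfoServerFlag = 0x2; // accept data in the SYN of incoming connections

// `outgoing` plays the role of the `active` parameter of checkFastOpenSysctl()
inline bool fastOpenAllowed(const std::string& sysctlValue, bool outgoing)
{
  int flag = 0;
  try {
    flag = std::stoi(sysctlValue); // trailing "\n" from the proc file is ignored
  }
  catch (const std::exception&) { // std::invalid_argument or std::out_of_range
    return false;
  }
  return (flag & (outgoing ? tfoClientFlag : tfoServerFlag)) != 0;
}
} // namespace rec_tcp_sketch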

void checkTFOconnect(Logr::log_t log)
{
  try {
    Socket socket(AF_INET, SOCK_STREAM);
    socket.setNonBlocking();
    socket.setFastOpenConnect();
  }
  catch (const NetworkError& e) {
    log->error(Logr::Error, e.what(), "tcp-fast-open-connect enabled but returned error");
  }
}

LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr<TCPIOHandler>& handler)
{
  TCPLOG(handler->getDescriptor(), "asendtcp called " << data.size() << endl);

  auto pident = std::make_shared<PacketID>();
  pident->tcphandler = handler;
  pident->tcpsock = handler->getDescriptor();
  pident->outMSG = data;
  pident->highState = TCPAction::DoingWrite;

  IOState state = IOState::Done;
  try {
    TCPLOG(pident->tcpsock, "Initial tryWrite: " << pident->outPos << '/' << pident->outMSG.size() << ' ' << " -> ");
    state = handler->tryWrite(pident->outMSG, pident->outPos, pident->outMSG.size());
    TCPLOG(pident->tcpsock, pident->outPos << '/' << pident->outMSG.size() << endl);

    if (state == IOState::Done) {
      TCPLOG(pident->tcpsock, "asendtcp success A" << endl);
      return LWResult::Result::Success;
    }
  }
  catch (const std::exception& e) {
    TCPLOG(pident->tcpsock, "tryWrite() exception..." << e.what() << endl);
    return LWResult::Result::PermanentError;
  }

  // Will set pident->lowState
  TCPIOHandlerStateChange(IOState::Done, state, pident);

  PacketBuffer packet;
  int ret = g_multiTasker->waitEvent(pident, &packet, g_networkTimeoutMsec);
  TCPLOG(pident->tcpsock, "asendtcp waitEvent returned " << ret << ' ' << packet.size() << '/' << data.size() << ' ');
  if (ret == 0) {
    TCPLOG(pident->tcpsock, "timeout" << endl);
    TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
    return LWResult::Result::Timeout;
  }
  if (ret == -1) { // error
    TCPLOG(pident->tcpsock, "PermanentError" << endl);
    TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
    return LWResult::Result::PermanentError;
  }
  if (packet.size() != data.size()) { // main loop tells us what it sent out, or empty in case of an error
    // fd housekeeping done by TCPIOHandlerIO
    TCPLOG(pident->tcpsock, "PermanentError size mismatch" << endl);
    return LWResult::Result::PermanentError;
  }

  TCPLOG(pident->tcpsock, "asendtcp success" << endl);
  return LWResult::Result::Success;
}
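
// Illustrative sketch, not part of the recursor: once asendtcp() has handed
// an unfinished write to the event loop, it interprets the result of
// waitEvent() in three steps. A return of 0 means the timeout expired and -1
// means an error. Otherwise the event loop echoes back the buffer it sent, so
// any size mismatch is a permanent error. That includes the empty buffer
// TCPIOHandlerIO() passes on an exception. SendOutcome and
// classifySendWait() are hypothetical stand-ins for LWResult::Result and the
// inline checks above.
#include <cstddef>

namespace rec_tcp_sketch
{
enum class SendOutcome
{
  Success,
  Timeout,
  PermanentError
};

// waitRet: result of waiting for the event loop (0 = timeout, -1 = error)
// echoedSize: size of the buffer the event loop handed back
// sentSize: size of the buffer the caller asked to send
inline SendOutcome classifySendWait(int waitRet, std::size_t echoedSize, std::size_t sentSize)
{
  if (waitRet == 0) {
    return SendOutcome::Timeout;
  }
  if (waitRet == -1) {
    return SendOutcome::PermanentError;
  }
  if (echoedSize != sentSize) {
    return SendOutcome::PermanentError;
  }
  return SendOutcome::Success;
}
} // namespace rec_tcp_sketch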

LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, shared_ptr<TCPIOHandler>& handler, const bool incompleteOkay)
{
  TCPLOG(handler->getDescriptor(), "arecvtcp called " << len << ' ' << data.size() << endl);
  data.resize(len);

  // We might have data already available from the TLS layer, try to get that into the buffer
  size_t pos = 0;
  IOState state = IOState::Done;
  try {
    TCPLOG(handler->getDescriptor(), "calling tryRead() " << len << endl);
    state = handler->tryRead(data, pos, len);
    TCPLOG(handler->getDescriptor(), "arecvtcp tryRead() returned " << int(state) << ' ' << pos << '/' << len << endl);
    switch (state) {
    case IOState::Done:
    case IOState::NeedRead:
      if (pos == len || (incompleteOkay && pos > 0)) {
        data.resize(pos);
        TCPLOG(handler->getDescriptor(), "arecvtcp success A" << endl);
        return LWResult::Result::Success;
      }
      break;
    case IOState::NeedWrite:
      break;
    case IOState::Async:
      throw std::runtime_error("TLS async mode not supported");
      break;
    }
  }
  catch (const std::exception& e) {
    TCPLOG(handler->getDescriptor(), "tryRead() exception..." << e.what() << endl);
    return LWResult::Result::PermanentError;
  }

  auto pident = std::make_shared<PacketID>();
  pident->tcphandler = handler;
  pident->tcpsock = handler->getDescriptor();
  // We might have a partial result
  pident->inMSG = std::move(data);
  pident->inPos = pos;
  pident->inWanted = len;
  pident->inIncompleteOkay = incompleteOkay;
  pident->highState = TCPAction::DoingRead;

  data.clear();

  // Will set pident->lowState
  TCPIOHandlerStateChange(IOState::Done, state, pident);

  int ret = g_multiTasker->waitEvent(pident, &data, authWaitTimeMSec(g_multiTasker));
  TCPLOG(pident->tcpsock, "arecvtcp " << ret << ' ' << data.size() << ' ');
  if (ret == 0) {
    TCPLOG(pident->tcpsock, "timeout" << endl);
    TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
    return LWResult::Result::Timeout;
  }
  if (ret == -1) {
    TCPLOG(pident->tcpsock, "PermanentError" << endl);
    TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
    return LWResult::Result::PermanentError;
  }
  if (data.empty()) { // error, EOF or other
    // fd housekeeping done by TCPIOHandlerIO
    TCPLOG(pident->tcpsock, "EOF" << endl);
    return LWResult::Result::PermanentError;
  }

  TCPLOG(pident->tcpsock, "arecvtcp success" << endl);
  return LWResult::Result::Success;
}
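
// Illustrative sketch, not part of the recursor: arecvtcp() and
// TCPIOHandlerIO() use the same rule to decide whether a read can be reported
// to the caller. Either all `wanted` bytes have arrived, or the caller
// accepted a short read (incompleteOkay) and at least one byte has arrived.
// In both cases the buffer is then trimmed to the bytes actually read. The
// helper name rec_tcp_sketch::completeRead() is hypothetical.
#include <cstddef>
#include <cstdint>
#include <vector>

namespace rec_tcp_sketch
{
// Returns true, after trimming `buffer` to the `pos` bytes read so far, when
// the read may be reported; returns false when more data must be awaited.
inline bool completeRead(std::vector<std::uint8_t>& buffer, std::size_t pos, std::size_t wanted, bool incompleteOkay)
{
  if (pos == wanted || (incompleteOkay && pos > 0)) {
    buffer.resize(pos);
    return true;
  }
  return false;
}
} // namespace rec_tcp_sketch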

// The last two arguments to makeTCPServerSockets are used for logging purposes only
unsigned int makeTCPServerSockets(deferredAdd_t& deferredAdds, std::set<int>& tcpSockets, Logr::log_t log, bool doLog, unsigned int instances)
{
  vector<string> localAddresses;
  vector<string> logVec;
  stringtok(localAddresses, ::arg()["local-address"], " ,");

  if (localAddresses.empty()) {
    throw PDNSException("No local address specified");
  }

#ifdef TCP_DEFER_ACCEPT
  auto first = true;
#endif
  const uint16_t defaultLocalPort = ::arg().asNum("local-port");
  const vector<string> defaultVector = {"127.0.0.1", "::1"};
  const auto configIsDefault = localAddresses == defaultVector;

  for (const auto& localAddress : localAddresses) {
    ComboAddress address{localAddress, defaultLocalPort};
    auto socketFd = FDWrapper(socket(address.sin6.sin6_family, SOCK_STREAM, 0));
    if (socketFd < 0) {
      throw PDNSException("Making a TCP server socket for resolver: " + stringerror());
    }
    setCloseOnExec(socketFd);

    int tmp = 1;
    if (setsockopt(socketFd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof tmp) < 0) {
      int err = errno;
      log->error(Logr::Critical, err, "Setsockopt failed for TCP listening socket");
      _exit(1);
    }
    if (address.sin6.sin6_family == AF_INET6 && setsockopt(socketFd, IPPROTO_IPV6, IPV6_V6ONLY, &tmp, sizeof(tmp)) < 0) {
      int err = errno;
      log->error(Logr::Warning, err, "Failed to set IPv6 socket to IPv6 only, continuing anyhow");
    }

#ifdef TCP_DEFER_ACCEPT
    if (setsockopt(socketFd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &tmp, sizeof tmp) >= 0) {
      if (first) {
        log->info(Logr::Info, "Enabled TCP data-ready filter for (slight) DoS protection");
      }
    }
#endif

    if (::arg().mustDo("non-local-bind")) {
      Utility::setBindAny(AF_INET, socketFd);
    }

    if (g_reusePort) {
#if defined(SO_REUSEPORT_LB)
      try {
        SSetsockopt(socketFd, SOL_SOCKET, SO_REUSEPORT_LB, 1);
      }
      catch (const std::exception& e) {
        throw PDNSException(std::string("SO_REUSEPORT_LB: ") + e.what());
      }
#elif defined(SO_REUSEPORT)
      try {
        SSetsockopt(socketFd, SOL_SOCKET, SO_REUSEPORT, 1);
      }
      catch (const std::exception& e) {
        throw PDNSException(std::string("SO_REUSEPORT: ") + e.what());
      }
#endif
    }

    if (SyncRes::s_tcp_fast_open > 0) {
      checkFastOpenSysctl(false, log);
#ifdef TCP_FASTOPEN
      if (setsockopt(socketFd, IPPROTO_TCP, TCP_FASTOPEN, &SyncRes::s_tcp_fast_open, sizeof SyncRes::s_tcp_fast_open) < 0) {
        int err = errno;
        log->error(Logr::Error, err, "Failed to enable TCP Fast Open for listening socket");
      }
#else
      log->info(Logr::Warning, "TCP Fast Open configured but not supported for listening socket");
#endif
    }

    socklen_t socklen = address.sin4.sin_family == AF_INET ? sizeof(address.sin4) : sizeof(address.sin6);
    if (::bind(socketFd, reinterpret_cast<struct sockaddr*>(&address), socklen) < 0) { // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
      int err = errno;
      if (!configIsDefault || address != ComboAddress{"::1", defaultLocalPort}) {
        throw PDNSException("Binding TCP server socket for " + address.toStringWithPort() + ": " + stringerror(err));
      }
      log->info(Logr::Warning, "Cannot listen on this address, skipping", "proto", Logging::Loggable("TCP"), "address", Logging::Loggable(address), "error", Logging::Loggable(stringerror(err)));
      continue;
    }

    setNonBlocking(socketFd);
    try {
      setSocketSendBuffer(socketFd, 65000);
    }
    catch (const std::exception& e) {
      log->error(Logr::Error, e.what(), "Exception while setting socket send buffer");
    }

    listen(socketFd, 128);
    deferredAdds.emplace_back(socketFd, handleNewTCPQuestion);
    tcpSockets.insert(socketFd);
    logVec.emplace_back(address.toStringWithPort());

    // we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
    //  - fd is not that which we know here, but returned from accept()

#ifdef TCP_DEFER_ACCEPT
    first = false;
#endif
    socketFd.release(); // to avoid auto-close by FDWrapper
  }
  if (doLog) {
    log->info(Logr::Info, "Listening for queries", "protocol", Logging::Loggable("TCP"), "addresses", Logging::IterLoggable(logVec.cbegin(), logVec.cend()), "socketInstances", Logging::Loggable(instances), "reuseport", Logging::Loggable(g_reusePort));
  }
  return localAddresses.size();
}
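
// Illustrative sketch, not part of the recursor: makeTCPServerSockets()
// treats a bind() failure as fatal except in one case. When local-address is
// left at its default ("127.0.0.1" and "::1"), failing to bind ::1 is only
// logged and skipped, presumably so that hosts without IPv6 still start. The
// hypothetical function rec_tcp_sketch::bindFailureIsFatal() expresses that
// rule on the configured address strings. The real code compares ComboAddress
// values that also carry local-port.
#include <string>
#include <vector>

namespace rec_tcp_sketch
{
inline bool bindFailureIsFatal(const std::vector<std::string>& configuredAddresses, const std::string& failedAddress)
{
  const std::vector<std::string> defaults = {"127.0.0.1", "::1"};
  const bool configIsDefault = configuredAddresses == defaults;
  // Same shape as the condition in makeTCPServerSockets(): only "::1" under
  // the default configuration is tolerated
  return !configIsDefault || failedAddress != "::1";
}
} // namespace rec_tcp_sketch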

© 2025 Coveralls, Inc