• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PowerDNS / pdns / 12321902803

13 Dec 2024 07:34PM UTC coverage: 66.359% (+1.6%) from 64.78%
12321902803

Pull #14970

github

web-flow
Merge e3a7df61c into 3dfd8e317
Pull Request #14970: boost > std optional

26084 of 54744 branches covered (47.65%)

Branch coverage included in aggregate %.

14 of 15 new or added lines in 2 files covered. (93.33%)

1863 existing lines in 52 files now uncovered.

85857 of 113946 relevant lines covered (75.35%)

4412729.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.47
/pdns/recursordist/rec-tcp.cc
1
/*
2
 * This file is part of PowerDNS or dnsdist.
3
 * Copyright -- PowerDNS.COM B.V. and its contributors
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of version 2 of the GNU General Public License as
7
 * published by the Free Software Foundation.
8
 *
9
 * In addition, for the avoidance of any doubt, permission is granted to
10
 * link this program with OpenSSL and to (re)distribute the binaries
11
 * produced as the result of such linking.
12
 *
13
 * This program is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
 * GNU General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU General Public License
19
 * along with this program; if not, write to the Free Software
20
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
 */
22

23
#include "rec-main.hh"
24

25
#include "arguments.hh"
26
#include "logger.hh"
27
#include "mplexer.hh"
28
#include "uuid-utils.hh"
29

30
// OLD PRE 5.0.0 situation:
31
//
32
// When pdns-distributes-queries is false with reuseport true (the default since 4.9.0), TCP queries
33
// are read and handled by worker threads. If the kernel balancing is OK for TCP sockets (observed
34
// to be good on Debian bullseye, but not good on e.g. MacOS), the TCP handling is no extra burden.
35
// In the case of MacOS all incoming TCP queries are handled by a single worker, while incoming UDP
36
// queries do get distributed round-robin over the worker threads.  Do note the TCP queries might
37
// need to wait until the g_maxUDPQueriesPerRound is reached.
38
//
39
// In the case of pdns-distributes-queries true and reuseport false the queries were read and
40
// initially processed by the distributor thread(s).
41
//
42
// Initial processing consists of parsing, calling gettag and checking if we have a packet cache
43
// hit. If that does not produce a hit, the query is passed to an mthread in the same way as with
44
// UDP queries, but do note that the mthread processing is serviced by the distributor thread. The
45
// final answer will be sent by the same distributor thread that originally picked up the query.
46
//
47
// Changing this, and having incoming TCP queries handled by worker threads is somewhat more complex
48
// than UDP, as the socket must remain available in the distributor thread (for reading more
49
// queries), but the TCP socket must also be passed to a worker thread so it can write its
50
// answer. The in-flight bookkeeping also has to be aware of how a query is handled to do the
51
// accounting properly. I am not sure if changing the current setup is worth all this trouble,
52
// especially since the default is now to not use pdns-distributes-queries, which works well in many
53
// cases.
54
//
55
// NEW SITUATION SINCE 5.0.0:
56
//
57
// The drawbacks mentioned in https://github.com/PowerDNS/pdns/issues/8394 are no longer true, so an
58
// alternative approach would be to introduce dedicated TCP worker thread(s).
59
//
60
// This approach was implemented in https://github.com/PowerDNS/pdns/pull/13195. The distributor and
61
// worker thread(s) now no longer process TCP queries.
62

63
size_t g_tcpMaxQueriesPerConn;
64
unsigned int g_maxTCPClients;
65
unsigned int g_maxTCPPerClient;
66
int g_tcpTimeout;
67
bool g_anyToTcp;
68

69
uint16_t TCPConnection::s_maxInFlight;
70

71
thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts;
72

73
static void handleRunningTCPQuestion(int fileDesc, FDMultiplexer::funcparam_t& var);
74

75
#if 0
// Debug variant: prefixes each message with a timestamp (seconds modulo 10, with sub-second
// fraction) and the socket the message is about.
#define TCPLOG(sock, x)                                                                                      \
  do {                                                                                                       \
    timeval tcplogStamp{};                                                                                   \
    gettimeofday(&tcplogStamp, nullptr);                                                                     \
    cerr << (tcplogStamp.tv_sec % 10 + tcplogStamp.tv_usec / 1000000.0) << " FD " << (sock) << ' ' << x; \
  } while (0)
#else
// Compiled-out variant. It is deliberately not empty, as an empty expansion produces a duplicate
// case label warning from clang-tidy; the loop body never runs, so x is never evaluated.
#define TCPLOG(sock, x) /* NOLINT(cppcoreguidelines-macro-usage) */ \
  while (false) {                                                   \
    cerr << x; /* NOLINT(bugprone-macro-parentheses) */             \
  }
#endif
87

88
std::atomic<uint32_t> TCPConnection::s_currentConnections;
89

90
TCPConnection::TCPConnection(int fileDesc, const ComboAddress& addr) :
91
  data(2, 0), d_remote(addr), d_fd(fileDesc)
92
{
371✔
93
  ++s_currentConnections;
371✔
94
  (*t_tcpClientCounts)[d_remote]++;
371✔
95
}
371✔
96

97
// Close the connection's socket (logging, never throwing, on failure) and undo the accounting
// done by the constructor: the per-remote count in t_tcpClientCounts and s_currentConnections.
TCPConnection::~TCPConnection()
{
  try {
    if (closesocket(d_fd) < 0) {
      SLOG(g_log << Logger::Error << "Error closing socket for TCPConnection" << endl,
           g_slogtcpin->info(Logr::Error, "Error closing socket for TCPConnection"));
    }
  }
  catch (const PDNSException& e) {
    SLOG(g_log << Logger::Error << "Error closing TCPConnection socket: " << e.reason << endl,
         g_slogtcpin->error(Logr::Error, e.reason, "Error closing TCPConnection socket", "exception", Logging::Loggable("PDNSException")));
  }

  // Decrement this remote's connection count and drop the entry once its last connection is
  // gone. The previous `count-- == 0` test looked at the value *before* decrementing, so entries
  // were left behind at zero and the map grew by one entry per distinct client ever seen.
  auto entry = t_tcpClientCounts->find(d_remote);
  if (entry != t_tcpClientCounts->end()) {
    if (entry->second <= 1) {
      t_tcpClientCounts->erase(entry);
    }
    else {
      --entry->second;
    }
  }
  --s_currentConnections;
}
115

116
static void terminateTCPConnection(int fileDesc)
117
{
368✔
118
  try {
368✔
119
    t_fdm->removeReadFD(fileDesc);
368✔
120
  }
368✔
121
  catch (const FDMultiplexerException& fde) {
368✔
122
  }
×
123
}
368✔
124

125
static void sendErrorOverTCP(std::unique_ptr<DNSComboWriter>& comboWriter, int rcode)
126
{
×
127
  std::vector<uint8_t> packet;
×
128
  if (comboWriter->d_mdp.d_header.qdcount == 0U) {
×
129
    /* header-only */
130
    packet.resize(sizeof(dnsheader));
×
131
  }
×
132
  else {
×
133
    DNSPacketWriter packetWriter(packet, comboWriter->d_mdp.d_qname, comboWriter->d_mdp.d_qtype, comboWriter->d_mdp.d_qclass);
×
134
    if (comboWriter->d_mdp.hasEDNS()) {
×
135
      /* we try to add the EDNS OPT RR even for truncated answers,
136
         as rfc6891 states:
137
         "The minimal response MUST be the DNS header, question section, and an
138
         OPT record.  This MUST also occur when a truncated response (using
139
         the DNS header's TC bit) is returned."
140
      */
141
      packetWriter.addOpt(512, 0, 0);
×
142
      packetWriter.commit();
×
143
    }
×
144
  }
×
145

146
  auto& header = reinterpret_cast<dnsheader&>(packet.at(0)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast) safe cast
×
147
  header.aa = 0;
×
148
  header.ra = 1;
×
149
  header.qr = 1;
×
150
  header.tc = 0;
×
151
  header.id = comboWriter->d_mdp.d_header.id;
×
152
  header.rd = comboWriter->d_mdp.d_header.rd;
×
153
  header.cd = comboWriter->d_mdp.d_header.cd;
×
154
  header.rcode = rcode;
×
155

156
  sendResponseOverTCP(comboWriter, packet);
×
157
}
×
158

159
void finishTCPReply(std::unique_ptr<DNSComboWriter>& comboWriter, bool hadError, bool updateInFlight)
160
{
242✔
161
  // update tcp connection status, closing if needed and doing the fd multiplexer accounting
162
  if (updateInFlight && comboWriter->d_tcpConnection->d_requestsInFlight > 0) {
242!
163
    comboWriter->d_tcpConnection->d_requestsInFlight--;
204✔
164
  }
204✔
165

166
  // In the code below, we try to remove the fd from the set, but
167
  // we don't know if another mthread already did the remove, so we can get a
168
  // "Tried to remove unlisted fd" exception.  Not that an inflight < limit test
169
  // will not work since we do not know if the other mthread got an error or not.
170
  if (hadError) {
242!
171
    terminateTCPConnection(comboWriter->d_socket);
×
172
    comboWriter->d_socket = -1;
×
173
    return;
×
174
  }
×
175
  comboWriter->d_tcpConnection->queriesCount++;
242✔
176
  if ((g_tcpMaxQueriesPerConn > 0 && comboWriter->d_tcpConnection->queriesCount >= g_tcpMaxQueriesPerConn) || (comboWriter->d_tcpConnection->isDropOnIdle() && comboWriter->d_tcpConnection->d_requestsInFlight == 0)) {
242!
177
    try {
3✔
178
      t_fdm->removeReadFD(comboWriter->d_socket);
3✔
179
    }
3✔
180
    catch (FDMultiplexerException&) {
3✔
181
    }
×
182
    comboWriter->d_socket = -1;
3✔
183
    return;
3✔
184
  }
3✔
185

186
  Utility::gettimeofday(&g_now, nullptr); // needs to be updated
239✔
187
  struct timeval ttd = g_now;
239✔
188

189
  // If we cross from max to max-1 in flight requests, the fd was not listened to, add it back
190
  if (updateInFlight && comboWriter->d_tcpConnection->d_requestsInFlight == TCPConnection::s_maxInFlight - 1) {
239!
191
    // A read error might have happened. If we add the fd back, it will most likely error again.
192
    // This is not a big issue, the next handleTCPClientReadable() will see another read error
193
    // and take action.
194
    ttd.tv_sec += g_tcpTimeout;
×
195
    t_fdm->addReadFD(comboWriter->d_socket, handleRunningTCPQuestion, comboWriter->d_tcpConnection, &ttd);
×
196
    return;
×
197
  }
×
198
  // fd might have been removed by read error code, or a read timeout, so expect an exception
199
  try {
239✔
200
    t_fdm->setReadTTD(comboWriter->d_socket, ttd, g_tcpTimeout);
239✔
201
  }
239✔
202
  catch (const FDMultiplexerException&) {
239✔
203
    // but if the FD was removed because of a timeout while we were sending a response,
204
    // we need to re-arm it. If it was an error it will error again.
205
    ttd.tv_sec += g_tcpTimeout;
×
206
    t_fdm->addReadFD(comboWriter->d_socket, handleRunningTCPQuestion, comboWriter->d_tcpConnection, &ttd);
×
207
  }
×
208
}
239✔
209

210
/*
211
 * A helper class that by default closes the incoming TCP connection on destruct
212
 * If you want to keep the connection alive, call keep() on the guard object
213
 */
214
class RunningTCPQuestionGuard
215
{
216
public:
217
  RunningTCPQuestionGuard(const RunningTCPQuestionGuard&) = default;
218
  RunningTCPQuestionGuard(RunningTCPQuestionGuard&&) = delete;
219
  RunningTCPQuestionGuard& operator=(const RunningTCPQuestionGuard&) = default;
220
  RunningTCPQuestionGuard& operator=(RunningTCPQuestionGuard&&) = delete;
221
  RunningTCPQuestionGuard(int fileDesc) :
222
    d_fd(fileDesc) {}
936✔
223
  ~RunningTCPQuestionGuard()
224
  {
936✔
225
    if (d_fd != -1) {
936✔
226
      terminateTCPConnection(d_fd);
368✔
227
      d_fd = -1;
368✔
228
    }
368✔
229
  }
936✔
230
  void keep()
231
  {
810✔
232
    d_fd = -1;
810✔
233
  }
810✔
234
  bool handleTCPReadResult(int /* fd */, ssize_t bytes)
235
  {
399✔
236
    if (bytes == 0) {
399✔
237
      /* EOF */
238
      return false;
358✔
239
    }
358✔
240
    if (bytes < 0) {
41!
241
      if (errno != EAGAIN && errno != EWOULDBLOCK) {
41!
242
        return false;
×
243
      }
×
244
    }
41✔
245
    keep();
41✔
246
    return true;
41✔
247
  }
41✔
248

249
private:
250
  int d_fd{-1};
251
};
252

253
static void handleNotify(std::unique_ptr<DNSComboWriter>& comboWriter, const DNSName& qname)
254
{
1✔
255
  if (!t_allowNotifyFrom || !t_allowNotifyFrom->match(comboWriter->d_mappedSource)) {
1!
256
    if (!g_quiet) {
×
257
      SLOG(g_log << Logger::Error << "[" << g_multiTasker->getTid() << "] dropping TCP NOTIFY from " << comboWriter->d_mappedSource.toString() << ", address not matched by allow-notify-from" << endl,
×
258
           g_slogtcpin->info(Logr::Error, "Dropping TCP NOTIFY, address not matched by allow-notify-from", "source", Logging::Loggable(comboWriter->d_mappedSource)));
×
259
    }
×
260

261
    t_Counters.at(rec::Counter::sourceDisallowedNotify)++;
×
262
    return;
×
263
  }
×
264

265
  if (!isAllowNotifyForZone(qname)) {
1!
266
    if (!g_quiet) {
×
267
      SLOG(g_log << Logger::Error << "[" << g_multiTasker->getTid() << "] dropping TCP NOTIFY from " << comboWriter->d_mappedSource.toString() << ", for " << qname.toLogString() << ", zone not matched by allow-notify-for" << endl,
×
268
           g_slogtcpin->info(Logr::Error, "Dropping TCP NOTIFY,  zone not matched by allow-notify-for", "source", Logging::Loggable(comboWriter->d_mappedSource), "zone", Logging::Loggable(qname)));
×
269
    }
×
270

271
    t_Counters.at(rec::Counter::zoneDisallowedNotify)++;
×
272
    return;
×
273
  }
×
274
}
1✔
275

276
static void doProtobufLogQuery(bool logQuery, LocalStateHolder<LuaConfigItems>& luaconfsLocal, const std::unique_ptr<DNSComboWriter>& comboWriter, const DNSName& qname, QType qtype, QClass qclass, const dnsheader* dnsheader, const shared_ptr<TCPConnection>& conn, const boost::optional<uint32_t>& ednsVersion)
277
{
4✔
278
  try {
4✔
279
    if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && comboWriter->d_policyTags.empty())) {
4!
280
      protobufLogQuery(luaconfsLocal, comboWriter->d_uuid, comboWriter->d_source, comboWriter->d_destination, comboWriter->d_mappedSource, comboWriter->d_ednssubnet.source, true, conn->qlen, qname, qtype, qclass, comboWriter->d_policyTags, comboWriter->d_requestorId, comboWriter->d_deviceId, comboWriter->d_deviceName, comboWriter->d_meta, ednsVersion, *dnsheader);
×
281
    }
×
282
  }
4✔
283
  catch (const std::exception& e) {
4✔
284
    if (g_logCommonErrors) {
×
285
      SLOG(g_log << Logger::Warning << "Error parsing a TCP query packet for edns subnet: " << e.what() << endl,
×
286
           g_slogtcpin->error(Logr::Warning, e.what(), "Error parsing a TCP query packet for edns subnet", "exception", Logging::Loggable("std::exception"), "remote", Logging::Loggable(conn->d_remote)));
×
287
    }
×
288
  }
×
289
}
4✔
290

291
static void doProcessTCPQuestion(std::unique_ptr<DNSComboWriter>& comboWriter, shared_ptr<TCPConnection>& conn, RunningTCPQuestionGuard& tcpGuard, int fileDesc)
292
{
244✔
293
  RecThreadInfo::self().incNumberOfDistributedQueries();
244✔
294
  struct timeval start
244✔
295
  {
244✔
296
  };
244✔
297
  Utility::gettimeofday(&start, nullptr);
244✔
298

299
  DNSName qname;
244✔
300
  uint16_t qtype = 0;
244✔
301
  uint16_t qclass = 0;
244✔
302
  bool needEDNSParse = false;
244✔
303
  string requestorId;
244✔
304
  string deviceId;
244✔
305
  string deviceName;
244✔
306
  bool logQuery = false;
244✔
307
  bool qnameParsed = false;
244✔
308
  boost::optional<uint32_t> ednsVersion;
244✔
309

310
  comboWriter->d_eventTrace.setEnabled(SyncRes::s_event_trace_enabled != 0);
244✔
311
  comboWriter->d_eventTrace.add(RecEventTrace::ReqRecv);
244✔
312
  auto luaconfsLocal = g_luaconfs.getLocal();
244✔
313
  if (checkProtobufExport(luaconfsLocal)) {
244✔
314
    needEDNSParse = true;
4✔
315
  }
4✔
316
  logQuery = t_protobufServers.servers && luaconfsLocal->protobufExportConfig.logQueries;
244!
317
  comboWriter->d_logResponse = t_protobufServers.servers && luaconfsLocal->protobufExportConfig.logResponses;
244!
318

319
  if (needEDNSParse || (t_pdl && (t_pdl->hasGettagFFIFunc() || t_pdl->hasGettagFunc())) || comboWriter->d_mdp.d_header.opcode == static_cast<unsigned>(Opcode::Notify)) {
244✔
320

321
    try {
41✔
322
      EDNSOptionViewMap ednsOptions;
41✔
323
      comboWriter->d_ecsParsed = true;
41✔
324
      comboWriter->d_ecsFound = false;
41✔
325
      getQNameAndSubnet(conn->data, &qname, &qtype, &qclass,
41✔
326
                        comboWriter->d_ecsFound, &comboWriter->d_ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr, ednsVersion);
41✔
327
      qnameParsed = true;
41✔
328

329
      if (t_pdl) {
41✔
330
        try {
40✔
331
          if (t_pdl->hasGettagFFIFunc()) {
40✔
332
            RecursorLua4::FFIParams params(qname, qtype, comboWriter->d_local, comboWriter->d_remote, comboWriter->d_destination, comboWriter->d_source, comboWriter->d_ednssubnet.source, comboWriter->d_data, comboWriter->d_gettagPolicyTags, comboWriter->d_records, ednsOptions, comboWriter->d_proxyProtocolValues, requestorId, deviceId, deviceName, comboWriter->d_routingTag, comboWriter->d_rcode, comboWriter->d_ttlCap, comboWriter->d_variable, true, logQuery, comboWriter->d_logResponse, comboWriter->d_followCNAMERecords, comboWriter->d_extendedErrorCode, comboWriter->d_extendedErrorExtra, comboWriter->d_responsePaddingDisabled, comboWriter->d_meta);
23✔
333
            comboWriter->d_eventTrace.add(RecEventTrace::LuaGetTagFFI);
23✔
334
            comboWriter->d_tag = t_pdl->gettag_ffi(params);
23✔
335
            comboWriter->d_eventTrace.add(RecEventTrace::LuaGetTagFFI, comboWriter->d_tag, false);
23✔
336
          }
23✔
337
          else if (t_pdl->hasGettagFunc()) {
17!
338
            comboWriter->d_eventTrace.add(RecEventTrace::LuaGetTag);
17✔
339
            comboWriter->d_tag = t_pdl->gettag(comboWriter->d_source, comboWriter->d_ednssubnet.source, comboWriter->d_destination, qname, qtype, &comboWriter->d_gettagPolicyTags, comboWriter->d_data, ednsOptions, true, requestorId, deviceId, deviceName, comboWriter->d_routingTag, comboWriter->d_proxyProtocolValues);
17✔
340
            comboWriter->d_eventTrace.add(RecEventTrace::LuaGetTag, comboWriter->d_tag, false);
17✔
341
          }
17✔
342
          // Copy d_gettagPolicyTags to d_policyTags, so other Lua hooks see them and can add their
343
          // own. Before storing into the packetcache, the tags in d_gettagPolicyTags will be
344
          // cleared by addPolicyTagsToPBMessageIfNeeded() so they do *not* end up in the PC. When an
345
          // Protobuf message is constructed, one part comes from the PC (including the tags
346
          // set by non-gettag hooks), and the tags in d_gettagPolicyTags will be added by the code
347
          // constructing the PB message.
348
          comboWriter->d_policyTags = comboWriter->d_gettagPolicyTags;
40✔
349
        }
40✔
350
        catch (const MOADNSException& moadnsexception) {
40✔
351
          if (g_logCommonErrors) {
×
352
            g_slogtcpin->error(moadnsexception.what(), "Error parsing a query packet for tag determination", "qname", Logging::Loggable(qname), "excepion", Logging::Loggable("MOADNSException"));
×
353
          }
×
354
        }
×
355
        catch (const std::exception& stdException) {
40✔
356
          g_rateLimitedLogger.log(g_slogtcpin, "Error parsing a query packet for tag determination", stdException, "qname", Logging::Loggable(qname), "remote", Logging::Loggable(conn->d_remote));
×
357
        }
×
358
      }
40✔
359
    }
41✔
360
    catch (const std::exception& stdException) {
41✔
361
      g_rateLimitedLogger.log(g_slogudpin, "Error parsing a query packet for tag determination, setting tag=0", stdException, "remote", Logging::Loggable(conn->d_remote));
×
362
    }
×
363
  }
41✔
364

365
  if (comboWriter->d_tag == 0 && !comboWriter->d_responsePaddingDisabled && g_paddingFrom.match(comboWriter->d_remote)) {
244!
366
    comboWriter->d_tag = g_paddingTag;
×
367
  }
×
368

369
  const dnsheader_aligned headerdata(conn->data.data());
244✔
370
  const struct dnsheader* dnsheader = headerdata.get();
244✔
371

372
  if (t_protobufServers.servers || t_outgoingProtobufServers.servers) {
244!
373
    comboWriter->d_requestorId = std::move(requestorId);
4✔
374
    comboWriter->d_deviceId = std::move(deviceId);
4✔
375
    comboWriter->d_deviceName = std::move(deviceName);
4✔
376
    comboWriter->d_uuid = getUniqueID();
4✔
377
  }
4✔
378

379
  if (t_protobufServers.servers) {
244✔
380
    doProtobufLogQuery(logQuery, luaconfsLocal, comboWriter, qname, qtype, qclass, dnsheader, conn, ednsVersion);
4✔
381
  }
4✔
382

383
  if (t_pdl) {
244✔
384
    bool ipf = t_pdl->ipfilter(comboWriter->d_source, comboWriter->d_destination, *dnsheader, comboWriter->d_eventTrace);
188✔
385
    if (ipf) {
188✔
386
      if (!g_quiet) {
2!
387
        SLOG(g_log << Logger::Notice << RecThreadInfo::id() << " [" << g_multiTasker->getTid() << "/" << g_multiTasker->numProcesses() << "] DROPPED TCP question from " << comboWriter->d_source.toStringWithPort() << (comboWriter->d_source != comboWriter->d_remote ? " (via " + comboWriter->d_remote.toStringWithPort() + ")" : "") << " based on policy" << endl,
2✔
388
             g_slogtcpin->info(Logr::Info, "Dropped TCP question based on policy", "remote", Logging::Loggable(conn->d_remote), "source", Logging::Loggable(comboWriter->d_source)));
2✔
389
      }
2✔
390
      t_Counters.at(rec::Counter::policyDrops)++;
2✔
391
      return;
2✔
392
    }
2✔
393
  }
188✔
394

395
  if (comboWriter->d_mdp.d_header.qr) {
242!
396
    t_Counters.at(rec::Counter::ignoredCount)++;
×
397
    if (g_logCommonErrors) {
×
398
      SLOG(g_log << Logger::Error << "Ignoring answer from TCP client " << comboWriter->getRemote() << " on server socket!" << endl,
×
399
           g_slogtcpin->info(Logr::Error, "Ignoring answer from TCP client on server socket", "remote", Logging::Loggable(comboWriter->getRemote())));
×
400
    }
×
401
    return;
×
402
  }
×
403
  if (comboWriter->d_mdp.d_header.opcode != static_cast<unsigned>(Opcode::Query) && comboWriter->d_mdp.d_header.opcode != static_cast<unsigned>(Opcode::Notify)) {
242!
404
    t_Counters.at(rec::Counter::ignoredCount)++;
×
405
    if (g_logCommonErrors) {
×
406
      SLOG(g_log << Logger::Error << "Ignoring unsupported opcode " << Opcode::to_s(comboWriter->d_mdp.d_header.opcode) << " from TCP client " << comboWriter->getRemote() << " on server socket!" << endl,
×
407
           g_slogtcpin->info(Logr::Error, "Ignoring unsupported opcode from TCP client", "remote", Logging::Loggable(comboWriter->getRemote()), "opcode", Logging::Loggable(Opcode::to_s(comboWriter->d_mdp.d_header.opcode))));
×
408
    }
×
409
    sendErrorOverTCP(comboWriter, RCode::NotImp);
×
410
    tcpGuard.keep();
×
411
    return;
×
412
  }
×
413
  if (dnsheader->qdcount == 0U) {
242!
414
    t_Counters.at(rec::Counter::emptyQueriesCount)++;
×
415
    if (g_logCommonErrors) {
×
416
      SLOG(g_log << Logger::Error << "Ignoring empty (qdcount == 0) query from " << comboWriter->getRemote() << " on server socket!" << endl,
×
417
           g_slogtcpin->info(Logr::Error, "Ignoring empty (qdcount == 0) query on server socket", "remote", Logging::Loggable(comboWriter->getRemote())));
×
418
    }
×
419
    sendErrorOverTCP(comboWriter, RCode::NotImp);
×
420
    tcpGuard.keep();
×
421
    return;
×
422
  }
×
423
  {
242✔
424
    // We have read a proper query
425
    ++t_Counters.at(rec::Counter::qcounter);
242✔
426
    ++t_Counters.at(rec::Counter::tcpqcounter);
242✔
427
    if (comboWriter->d_source.sin4.sin_family == AF_INET6) {
242✔
428
      ++t_Counters.at(rec::Counter::ipv6qcounter);
15✔
429
    }
15✔
430

431
    if (comboWriter->d_mdp.d_header.opcode == static_cast<unsigned>(Opcode::Notify)) {
242✔
432
      handleNotify(comboWriter, qname);
1✔
433
    }
1✔
434

435
    string response;
242✔
436
    RecursorPacketCache::OptPBData pbData{boost::none};
242✔
437

438
    if (comboWriter->d_mdp.d_header.opcode == static_cast<unsigned>(Opcode::Query)) {
242✔
439
      /* It might seem like a good idea to skip the packet cache lookup if we know that the answer is not cacheable,
440
         but it means that the hash would not be computed. If some script decides at a later time to mark back the answer
441
         as cacheable we would cache it with a wrong tag, so better safe than sorry. */
442
      comboWriter->d_eventTrace.add(RecEventTrace::PCacheCheck);
241✔
443
      bool cacheHit = checkForCacheHit(qnameParsed, comboWriter->d_tag, conn->data, qname, qtype, qclass, g_now, response, comboWriter->d_qhash, pbData, true, comboWriter->d_source, comboWriter->d_mappedSource);
241✔
444
      comboWriter->d_eventTrace.add(RecEventTrace::PCacheCheck, cacheHit, false);
241✔
445

446
      if (cacheHit) {
241✔
447
        if (!g_quiet) {
38!
448
          SLOG(g_log << Logger::Notice << RecThreadInfo::id() << " TCP question answered from packet cache tag=" << comboWriter->d_tag << " from " << comboWriter->d_source.toStringWithPort() << (comboWriter->d_source != comboWriter->d_remote ? " (via " + comboWriter->d_remote.toStringWithPort() + ")" : "") << endl,
38✔
449
               g_slogtcpin->info(Logr::Notice, "TCP question answered from packet cache", "tag", Logging::Loggable(comboWriter->d_tag),
38✔
450
                                 "qname", Logging::Loggable(qname), "qtype", Logging::Loggable(QType(qtype)),
38✔
451
                                 "source", Logging::Loggable(comboWriter->d_source), "remote", Logging::Loggable(comboWriter->d_remote)));
38✔
452
        }
38✔
453

454
        bool hadError = sendResponseOverTCP(comboWriter, response);
38✔
455
        finishTCPReply(comboWriter, hadError, false);
38✔
456
        struct timeval now
38✔
457
        {
38✔
458
        };
38✔
459
        Utility::gettimeofday(&now, nullptr);
38✔
460
        uint64_t spentUsec = uSec(now - start);
38✔
461
        t_Counters.at(rec::Histogram::cumulativeAnswers)(spentUsec);
38✔
462
        comboWriter->d_eventTrace.add(RecEventTrace::AnswerSent);
38✔
463

464
        if (t_protobufServers.servers && comboWriter->d_logResponse && (!luaconfsLocal->protobufExportConfig.taggedOnly || !pbData || pbData->d_tagged)) {
38!
465
          struct timeval tval
2✔
466
          {
2✔
467
            0, 0
2✔
468
          };
2✔
469
          protobufLogResponse(dnsheader, luaconfsLocal, pbData, tval, true, comboWriter->d_source, comboWriter->d_destination, comboWriter->d_mappedSource, comboWriter->d_ednssubnet, comboWriter->d_uuid, comboWriter->d_requestorId, comboWriter->d_deviceId, comboWriter->d_deviceName, comboWriter->d_meta, comboWriter->d_eventTrace, comboWriter->d_policyTags);
2✔
470
        }
2✔
471

472
        if (comboWriter->d_eventTrace.enabled() && (SyncRes::s_event_trace_enabled & SyncRes::event_trace_to_log) != 0) {
38!
473
          SLOG(g_log << Logger::Info << comboWriter->d_eventTrace.toString() << endl,
×
474
               g_slogtcpin->info(Logr::Info, comboWriter->d_eventTrace.toString())); // More fancy?
×
475
        }
×
476
        tcpGuard.keep();
38✔
477
        t_Counters.updateSnap(g_regressionTestMode);
38✔
478
        return;
38✔
479
      } // cache hit
38✔
480
    } // query opcode
241✔
481

482
    if (comboWriter->d_mdp.d_header.opcode == static_cast<unsigned>(Opcode::Notify)) {
204✔
483
      if (!g_quiet) {
1!
484
        SLOG(g_log << Logger::Notice << RecThreadInfo::id() << " got NOTIFY for " << qname.toLogString() << " from " << comboWriter->d_source.toStringWithPort() << (comboWriter->d_source != comboWriter->d_remote ? " (via " + comboWriter->d_remote.toStringWithPort() + ")" : "") << endl,
1✔
485
             g_slogtcpin->info(Logr::Notice, "Got NOTIFY", "qname", Logging::Loggable(qname), "source", Logging::Loggable(comboWriter->d_source), "remote", Logging::Loggable(comboWriter->d_remote)));
1✔
486
      }
1✔
487

488
      requestWipeCaches(qname);
1✔
489

490
      // the operation will now be treated as a Query, generating
491
      // a normal response, as the rest of the code does not
492
      // check dh->opcode, but we need to ensure that the response
493
      // to this request does not get put into the packet cache
494
      comboWriter->d_variable = true;
1✔
495
    }
1✔
496

497
    // setup for startDoResolve() in an mthread
498
    ++conn->d_requestsInFlight;
204✔
499
    if (conn->d_requestsInFlight >= TCPConnection::s_maxInFlight) {
204!
500
      t_fdm->removeReadFD(fileDesc); // should no longer awake ourselves when there is data to read
×
501
    }
×
502
    else {
204✔
503
      Utility::gettimeofday(&g_now, nullptr); // needed?
204✔
504
      struct timeval ttd = g_now;
204✔
505
      t_fdm->setReadTTD(fileDesc, ttd, g_tcpTimeout);
204✔
506
    }
204✔
507
    tcpGuard.keep();
204✔
508
    g_multiTasker->makeThread(startDoResolve, comboWriter.release()); // deletes dc
204✔
509
  } // good query
204✔
510
}
204✔
511

512
static void handleRunningTCPQuestion(int fileDesc, FDMultiplexer::funcparam_t& var) // NOLINT(readability-function-cognitive-complexity)
513
{
936✔
514
  auto conn = boost::any_cast<shared_ptr<TCPConnection>>(var);
936✔
515

516
  RunningTCPQuestionGuard tcpGuard{fileDesc};
936✔
517

518
  if (conn->state == TCPConnection::PROXYPROTOCOLHEADER) {
936✔
519
    ssize_t bytes = recv(conn->getFD(), &conn->data.at(conn->proxyProtocolGot), conn->proxyProtocolNeed, 0);
126✔
520
    if (bytes <= 0) {
126✔
521
      tcpGuard.handleTCPReadResult(fileDesc, bytes);
6✔
522
      return;
6✔
523
    }
6✔
524

525
    conn->proxyProtocolGot += bytes;
120✔
526
    conn->data.resize(conn->proxyProtocolGot);
120✔
527
    ssize_t remaining = isProxyHeaderComplete(conn->data);
120✔
528
    if (remaining == 0) {
120✔
529
      if (g_logCommonErrors) {
4!
530
        SLOG(g_log << Logger::Error << "Unable to consume proxy protocol header in packet from TCP client " << conn->d_remote.toStringWithPort() << endl,
4✔
531
             g_slogtcpin->info(Logr::Error, "Unable to consume proxy protocol header in packet from TCP client", "remote", Logging::Loggable(conn->d_remote)));
4✔
532
      }
4✔
533
      ++t_Counters.at(rec::Counter::proxyProtocolInvalidCount);
4✔
534
      return;
4✔
535
    }
4✔
536
    if (remaining < 0) {
116✔
537
      conn->proxyProtocolNeed = -remaining;
95✔
538
      conn->data.resize(conn->proxyProtocolGot + conn->proxyProtocolNeed);
95✔
539
      tcpGuard.keep();
95✔
540
      return;
95✔
541
    }
95✔
542
    {
21✔
543
      /* proxy header received */
544
      /* we ignore the TCP field for now, but we could properly set whether
545
         the connection was received over UDP or TCP if needed */
546
      bool tcp = false;
21✔
547
      bool proxy = false;
21✔
548
      size_t used = parseProxyHeader(conn->data, proxy, conn->d_source, conn->d_destination, tcp, conn->proxyProtocolValues);
21✔
549
      if (used <= 0) {
21!
550
        if (g_logCommonErrors) {
×
551
          SLOG(g_log << Logger::Error << "Unable to parse proxy protocol header in packet from TCP client " << conn->d_remote.toStringWithPort() << endl,
×
552
               g_slogtcpin->info(Logr::Error, "Unable to parse proxy protocol header in packet from TCP client", "remote", Logging::Loggable(conn->d_remote)));
×
553
        }
×
554
        ++t_Counters.at(rec::Counter::proxyProtocolInvalidCount);
×
555
        return;
×
556
      }
×
557
      if (static_cast<size_t>(used) > g_proxyProtocolMaximumSize) {
21✔
558
        if (g_logCommonErrors) {
2!
559
          SLOG(g_log << Logger::Error << "Proxy protocol header in packet from TCP client " << conn->d_remote.toStringWithPort() << " is larger than proxy-protocol-maximum-size (" << used << "), dropping" << endl,
2✔
560
               g_slogtcpin->info(Logr::Error, "Proxy protocol header in packet from TCP client is larger than proxy-protocol-maximum-size", "remote", Logging::Loggable(conn->d_remote), "size", Logging::Loggable(used)));
2✔
561
        }
2✔
562
        ++t_Counters.at(rec::Counter::proxyProtocolInvalidCount);
2✔
563
        return;
2✔
564
      }
2✔
565

566
      /* Now that we have retrieved the address of the client, as advertised by the proxy
567
         via the proxy protocol header, check that it is allowed by our ACL */
568
      /* note that if the proxy header used a 'LOCAL' command, the original source and destination are untouched so everything should be fine */
569
      conn->d_mappedSource = conn->d_source;
19✔
570
      if (t_proxyMapping) {
19!
571
        if (const auto* iter = t_proxyMapping->lookup(conn->d_source)) {
×
572
          conn->d_mappedSource = iter->second.address;
×
573
          ++iter->second.stats.netmaskMatches;
×
574
        }
×
575
      }
×
576
      if (t_remotes) {
19!
577
        t_remotes->push_back(conn->d_source);
19✔
578
      }
19✔
579
      if (t_allowFrom && !t_allowFrom->match(&conn->d_mappedSource)) {
19!
580
        if (!g_quiet) {
4!
581
          SLOG(g_log << Logger::Error << "[" << g_multiTasker->getTid() << "] dropping TCP query from " << conn->d_mappedSource.toString() << ", address not matched by allow-from" << endl,
4✔
582
               g_slogtcpin->info(Logr::Error, "Dropping TCP query, address not matched by allow-from", "remote", Logging::Loggable(conn->d_remote)));
4✔
583
        }
4✔
584

585
        ++t_Counters.at(rec::Counter::unauthorizedTCP);
4✔
586
        return;
4✔
587
      }
4✔
588

589
      conn->data.resize(2);
15✔
590
      conn->state = TCPConnection::BYTE0;
15✔
591
    }
15✔
592
  }
15✔
593

594
  if (conn->state == TCPConnection::BYTE0) {
825✔
595
    ssize_t bytes = recv(conn->getFD(), conn->data.data(), 2, 0);
598✔
596
    if (bytes == 1) {
598✔
597
      conn->state = TCPConnection::BYTE1;
2✔
598
    }
2✔
599
    if (bytes == 2) {
598✔
600
      conn->qlen = (((unsigned char)conn->data[0]) << 8) + (unsigned char)conn->data[1];
244✔
601
      conn->data.resize(conn->qlen);
244✔
602
      conn->bytesread = 0;
244✔
603
      conn->state = TCPConnection::GETQUESTION;
244✔
604
    }
244✔
605
    if (bytes <= 0) {
598✔
606
      tcpGuard.handleTCPReadResult(fileDesc, bytes);
352✔
607
      return;
352✔
608
    }
352✔
609
  }
598✔
610

611
  if (conn->state == TCPConnection::BYTE1) {
473✔
612
    ssize_t bytes = recv(conn->getFD(), &conn->data[1], 1, 0);
4✔
613
    if (bytes == 1) {
4✔
614
      conn->state = TCPConnection::GETQUESTION;
2✔
615
      conn->qlen = (((unsigned char)conn->data[0]) << 8) + (unsigned char)conn->data[1];
2✔
616
      conn->data.resize(conn->qlen);
2✔
617
      conn->bytesread = 0;
2✔
618
    }
2✔
619
    if (bytes <= 0) {
4✔
620
      if (!tcpGuard.handleTCPReadResult(fileDesc, bytes)) {
2!
621
        if (g_logCommonErrors) {
×
622
          SLOG(g_log << Logger::Error << "TCP client " << conn->d_remote.toStringWithPort() << " disconnected after first byte" << endl,
×
623
               g_slogtcpin->info(Logr::Error, "TCP client disconnected after first byte", "remote", Logging::Loggable(conn->d_remote)));
×
624
        }
×
625
      }
×
626
      return;
2✔
627
    }
2✔
628
  }
4✔
629

630
  if (conn->state == TCPConnection::GETQUESTION) {
471!
631
    ssize_t bytes = recv(conn->getFD(), &conn->data[conn->bytesread], conn->qlen - conn->bytesread, 0);
471✔
632
    if (bytes <= 0) {
471✔
633
      if (!tcpGuard.handleTCPReadResult(fileDesc, bytes)) {
39✔
634
        if (g_logCommonErrors) {
2!
635
          SLOG(g_log << Logger::Error << "TCP client " << conn->d_remote.toStringWithPort() << " disconnected while reading question body" << endl,
2✔
636
               g_slogtcpin->info(Logr::Error, "TCP client disconnected while reading question body", "remote", Logging::Loggable(conn->d_remote)));
2✔
637
        }
2✔
638
      }
2✔
639
      return;
39✔
640
    }
39✔
641
    if (bytes > std::numeric_limits<std::uint16_t>::max()) {
432!
642
      if (g_logCommonErrors) {
×
643
        SLOG(g_log << Logger::Error << "TCP client " << conn->d_remote.toStringWithPort() << " sent an invalid question size while reading question body" << endl,
×
644
             g_slogtcpin->info(Logr::Error, "TCP client sent an invalid question size while reading question body", "remote", Logging::Loggable(conn->d_remote)));
×
645
      }
×
646
      return;
×
647
    }
×
648
    conn->bytesread += (uint16_t)bytes;
432✔
649
    if (conn->bytesread == conn->qlen) {
432✔
650
      conn->state = TCPConnection::BYTE0;
244✔
651
      std::unique_ptr<DNSComboWriter> comboWriter;
244✔
652
      try {
244✔
653
        comboWriter = std::make_unique<DNSComboWriter>(conn->data, g_now, t_pdl);
244✔
654
      }
244✔
655
      catch (const MOADNSException& mde) {
244✔
656
        t_Counters.at(rec::Counter::clientParseError)++;
×
657
        if (g_logCommonErrors) {
×
658
          SLOG(g_log << Logger::Error << "Unable to parse packet from TCP client " << conn->d_remote.toStringWithPort() << endl,
×
659
               g_slogtcpin->info(Logr::Error, "Unable to parse packet from TCP client", "remte", Logging::Loggable(conn->d_remote)));
×
660
        }
×
661
        return;
×
662
      }
×
663

664
      comboWriter->d_tcpConnection = conn; // carry the torch
244✔
665
      comboWriter->setSocket(conn->getFD()); // this is the only time a copy is made of the actual fd
244✔
666
      comboWriter->d_tcp = true;
244✔
667
      comboWriter->setRemote(conn->d_remote); // the address the query was received from
244✔
668
      comboWriter->setSource(conn->d_source); // the address we assume the query is coming from, might be set by proxy protocol
244✔
669
      ComboAddress dest;
244✔
670
      dest.reset();
244✔
671
      dest.sin4.sin_family = conn->d_remote.sin4.sin_family;
244✔
672
      socklen_t len = dest.getSocklen();
244✔
673
      getsockname(conn->getFD(), reinterpret_cast<sockaddr*>(&dest), &len); // if this fails, we're ok with it NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
244✔
674
      comboWriter->setLocal(dest); // the address we received the query on
244✔
675
      comboWriter->setDestination(conn->d_destination); // the address we assume the query is received on, might be set by proxy protocol
244✔
676
      comboWriter->setMappedSource(conn->d_mappedSource); // the address we assume the query is coming from after table based mapping
244✔
677
      /* we can't move this if we want to be able to access the values in
678
         all queries sent over this connection */
679
      comboWriter->d_proxyProtocolValues = conn->proxyProtocolValues;
244✔
680

681
      doProcessTCPQuestion(comboWriter, conn, tcpGuard, fileDesc);
244✔
682
    } // reading query
244✔
683
  }
432✔
684
  // more to come
685
  tcpGuard.keep();
432✔
686
}
432✔
687

688
//! Handle new incoming TCP connection
689
void handleNewTCPQuestion(int fileDesc, [[maybe_unused]] FDMultiplexer::funcparam_t& var)
690
{
371✔
691
  ComboAddress addr;
371✔
692
  socklen_t addrlen = sizeof(addr);
371✔
693
  int newsock = accept(fileDesc, reinterpret_cast<struct sockaddr*>(&addr), &addrlen); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
371✔
694
  if (newsock < 0) {
371!
695
    return;
×
696
  }
×
697
  auto closeSock = [newsock](rec::Counter cnt, const string& msg) {
371✔
698
    try {
×
699
      closesocket(newsock);
×
700
      t_Counters.at(cnt)++;
×
701
      // We want this bump to percolate up without too much delay
702
      t_Counters.updateSnap(false);
×
703
    }
×
704
    catch (const PDNSException& e) {
×
705
      g_slogtcpin->error(Logr::Error, e.reason, msg, "exception", Logging::Loggable("PDNSException"));
×
706
    }
×
707
  };
×
708

709
  if (TCPConnection::getCurrentConnections() >= g_maxTCPClients) {
371!
710
    closeSock(rec::Counter::tcpOverflow, "Error closing TCP socket after an overflow drop");
×
711
    return;
×
712
  }
×
713
  if (g_multiTasker->numProcesses() >= g_maxMThreads) {
371!
714
    closeSock(rec::Counter::overCapacityDrops, "Error closing TCP socket after an over capacity drop");
×
715
    return;
×
716
  }
×
717

718
  ComboAddress destaddr;
371✔
719
  socklen_t len = sizeof(destaddr);
371✔
720
  getsockname(newsock, reinterpret_cast<sockaddr*>(&destaddr), &len); // if this fails, we're ok with it NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
371✔
721
  bool fromProxyProtocolSource = expectProxyProtocol(addr, destaddr);
371✔
722
  if (!fromProxyProtocolSource && t_remotes) {
371!
723
    t_remotes->push_back(addr);
340✔
724
  }
340✔
725
  ComboAddress mappedSource = addr;
371✔
726
  if (!fromProxyProtocolSource && t_proxyMapping) {
371✔
727
    if (const auto* iter = t_proxyMapping->lookup(addr)) {
5!
728
      mappedSource = iter->second.address;
5✔
729
      ++iter->second.stats.netmaskMatches;
5✔
730
    }
5✔
731
  }
5✔
732
  if (!fromProxyProtocolSource && t_allowFrom && !t_allowFrom->match(&mappedSource)) {
371!
733
    if (!g_quiet) {
×
734
      SLOG(g_log << Logger::Error << "[" << g_multiTasker->getTid() << "] dropping TCP query from " << mappedSource.toString() << ", address neither matched by allow-from nor proxy-protocol-from" << endl,
×
735
           g_slogtcpin->info(Logr::Error, "dropping TCP query address neither matched by allow-from nor proxy-protocol-from", "source", Logging::Loggable(mappedSource)));
×
736
    }
×
737
    closeSock(rec::Counter::unauthorizedTCP, "Error closing TCP socket after an ACL drop");
×
738
    return;
×
739
  }
×
740

741
  if (g_maxTCPPerClient > 0 && t_tcpClientCounts->count(addr) > 0 && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) {
371!
742
    closeSock(rec::Counter::tcpClientOverflow, "Error closing TCP socket after a client overflow drop");
×
743
    return;
×
744
  }
×
745

746
  setNonBlocking(newsock);
371✔
747
  setTCPNoDelay(newsock);
371✔
748
  std::shared_ptr<TCPConnection> tcpConn = std::make_shared<TCPConnection>(newsock, addr);
371✔
749
  tcpConn->d_source = addr;
371✔
750
  tcpConn->d_destination = destaddr;
371✔
751
  tcpConn->d_mappedSource = mappedSource;
371✔
752

753
  if (fromProxyProtocolSource) {
371✔
754
    tcpConn->proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize;
31✔
755
    tcpConn->data.resize(tcpConn->proxyProtocolNeed);
31✔
756
    tcpConn->state = TCPConnection::PROXYPROTOCOLHEADER;
31✔
757
  }
31✔
758
  else {
340✔
759
    tcpConn->state = TCPConnection::BYTE0;
340✔
760
  }
340✔
761

762
  timeval ttd{};
371✔
763
  Utility::gettimeofday(&ttd, nullptr);
371✔
764
  ttd.tv_sec += g_tcpTimeout;
371✔
765

766
  t_fdm->addReadFD(tcpConn->getFD(), handleRunningTCPQuestion, tcpConn, &ttd);
371✔
767
}
371✔
768

769
static void TCPIOHandlerIO(int fileDesc, FDMultiplexer::funcparam_t& var);
770

771
static void TCPIOHandlerStateChange(IOState oldstate, IOState newstate, std::shared_ptr<PacketID>& pid)
772
{
66✔
773
  TCPLOG(pid->tcpsock, "State transation " << int(oldstate) << "->" << int(newstate) << endl);
66✔
774

775
  pid->lowState = newstate;
66✔
776

777
  // handle state transitions
778
  switch (oldstate) {
66!
779
  case IOState::NeedRead:
29✔
780

781
    switch (newstate) {
29!
782
    case IOState::NeedWrite:
×
783
      TCPLOG(pid->tcpsock, "NeedRead -> NeedWrite: flip FD" << endl);
×
784
      t_fdm->alterFDToWrite(pid->tcpsock, TCPIOHandlerIO, pid);
×
785
      break;
×
UNCOV
786
    case IOState::NeedRead:
×
UNCOV
787
      break;
×
788
    case IOState::Done:
29!
789
      TCPLOG(pid->tcpsock, "Done -> removeReadFD" << endl);
29✔
790
      t_fdm->removeReadFD(pid->tcpsock);
29✔
791
      break;
29✔
792
    case IOState::Async:
×
793
      throw std::runtime_error("TLS async mode not supported");
×
794
      break;
×
795
    }
29✔
796
    break;
29✔
797

798
  case IOState::NeedWrite:
29✔
799

800
    switch (newstate) {
6!
801
    case IOState::NeedRead:
4✔
802
      TCPLOG(pid->tcpsock, "NeedWrite -> NeedRead: flip FD" << endl);
4✔
803
      t_fdm->alterFDToRead(pid->tcpsock, TCPIOHandlerIO, pid);
4✔
804
      break;
4✔
805
    case IOState::NeedWrite:
×
806
      break;
×
807
    case IOState::Done:
2✔
808
      TCPLOG(pid->tcpsock, "Done -> removeWriteFD" << endl);
2✔
809
      t_fdm->removeWriteFD(pid->tcpsock);
2✔
810
      break;
2✔
811
    case IOState::Async:
×
812
      throw std::runtime_error("TLS async mode not supported");
×
813
      break;
×
814
    }
6✔
815
    break;
6✔
816

817
  case IOState::Done:
31✔
818
    switch (newstate) {
31!
819
    case IOState::NeedRead:
25✔
820
      TCPLOG(pid->tcpsock, "NeedRead: addReadFD" << endl);
25✔
821
      t_fdm->addReadFD(pid->tcpsock, TCPIOHandlerIO, pid);
25✔
822
      break;
25✔
823
    case IOState::NeedWrite:
6✔
824
      TCPLOG(pid->tcpsock, "NeedWrite: addWriteFD" << endl);
6✔
825
      t_fdm->addWriteFD(pid->tcpsock, TCPIOHandlerIO, pid);
6✔
826
      break;
6✔
827
    case IOState::Done:
×
828
      break;
×
829
    case IOState::Async:
×
830
      throw std::runtime_error("TLS async mode not supported");
×
831
      break;
×
832
    }
31✔
833
    break;
31✔
834

835
  case IOState::Async:
31!
836
    throw std::runtime_error("TLS async mode not supported");
×
837
    break;
×
838
  }
66✔
839
}
66✔
840

841
static void TCPIOHandlerIO(int fileDesc, FDMultiplexer::funcparam_t& var)
842
{
35✔
843
  auto pid = boost::any_cast<std::shared_ptr<PacketID>>(var);
35✔
844
  assert(pid->tcphandler); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay): def off assert triggers it
35✔
845
  assert(fileDesc == pid->tcphandler->getDescriptor()); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay) idem
×
846
  IOState newstate = IOState::Done;
×
847

848
  TCPLOG(pid->tcpsock, "TCPIOHandlerIO: lowState " << int(pid->lowState) << endl);
35✔
849

850
  // In the code below, we want to update the state of the fd before calling sendEvent
851
  // a sendEvent might close the fd, and some poll multiplexers do not like to manipulate a closed fd
852

853
  switch (pid->highState) {
35!
854
  case TCPAction::DoingRead:
25✔
855
    TCPLOG(pid->tcpsock, "highState: Reading" << endl);
25✔
856
    // In arecvtcp, the buffer was resized already so inWanted bytes will fit
857
    // try reading
858
    try {
25✔
859
      newstate = pid->tcphandler->tryRead(pid->inMSG, pid->inPos, pid->inWanted);
25✔
860
      switch (newstate) {
25✔
861
      case IOState::Done:
13✔
862
      case IOState::NeedRead:
24✔
863
        TCPLOG(pid->tcpsock, "tryRead: Done or NeedRead " << int(newstate) << ' ' << pid->inPos << '/' << pid->inWanted << endl);
24✔
864
        TCPLOG(pid->tcpsock, "TCPIOHandlerIO " << pid->inWanted << ' ' << pid->inIncompleteOkay << endl);
24✔
865
        if (pid->inPos == pid->inWanted || (pid->inIncompleteOkay && pid->inPos > 0)) {
24!
866
          pid->inMSG.resize(pid->inPos); // old content (if there) + new bytes read, only relevant for the inIncompleteOkay case
24✔
867
          newstate = IOState::Done;
24✔
868
          TCPIOHandlerStateChange(pid->lowState, newstate, pid);
24✔
869
          g_multiTasker->sendEvent(pid, &pid->inMSG);
24✔
870
          return;
24✔
871
        }
24✔
UNCOV
872
        break;
×
UNCOV
873
      case IOState::NeedWrite:
×
874
        break;
×
875
      case IOState::Async:
×
876
        throw std::runtime_error("TLS async mode not supported");
×
877
        break;
×
878
      }
25✔
879
    }
25✔
880
    catch (const std::exception& e) {
25✔
881
      newstate = IOState::Done;
1✔
882
      TCPLOG(pid->tcpsock, "read exception..." << e.what() << endl);
1✔
883
      PacketBuffer empty;
1✔
884
      TCPIOHandlerStateChange(pid->lowState, newstate, pid);
1✔
885
      g_multiTasker->sendEvent(pid, &empty); // this conveys error status
1✔
886
      return;
1✔
887
    }
1✔
UNCOV
888
    break;
×
889

890
  case TCPAction::DoingWrite:
10✔
891
    TCPLOG(pid->tcpsock, "highState: Writing" << endl);
10✔
892
    try {
10✔
893
      TCPLOG(pid->tcpsock, "tryWrite: " << pid->outPos << '/' << pid->outMSG.size() << ' ' << " -> ");
10✔
894
      newstate = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size());
10✔
895
      TCPLOG(pid->tcpsock, pid->outPos << '/' << pid->outMSG.size() << endl);
10✔
896
      switch (newstate) {
10!
897
      case IOState::Done: {
6✔
898
        TCPLOG(pid->tcpsock, "tryWrite: Done" << endl);
6✔
899
        TCPIOHandlerStateChange(pid->lowState, newstate, pid);
6✔
900
        g_multiTasker->sendEvent(pid, &pid->outMSG); // send back what we sent to convey everything is ok
6✔
901
        return;
6✔
902
      }
×
903
      case IOState::NeedRead:
4✔
904
        TCPLOG(pid->tcpsock, "tryWrite: NeedRead" << endl);
4✔
905
        break;
4✔
906
      case IOState::NeedWrite:
×
907
        TCPLOG(pid->tcpsock, "tryWrite: NeedWrite" << endl);
×
908
        break;
×
909
      case IOState::Async:
×
910
        throw std::runtime_error("TLS async mode not supported");
×
911
        break;
×
912
      }
10✔
913
    }
10✔
914
    catch (const std::exception& e) {
10✔
915
      newstate = IOState::Done;
×
916
      TCPLOG(pid->tcpsock, "write exception..." << e.what() << endl);
×
917
      PacketBuffer sent;
×
918
      TCPIOHandlerStateChange(pid->lowState, newstate, pid);
×
919
      g_multiTasker->sendEvent(pid, &sent); // we convey error status by sending empty string
×
920
      return;
×
921
    }
×
922
    break;
4✔
923
  }
35✔
924

925
  // Cases that did not end up doing a sendEvent
926
  TCPIOHandlerStateChange(pid->lowState, newstate, pid);
4✔
927
}
4✔
928

929
void checkFastOpenSysctl([[maybe_unused]] bool active, [[maybe_unused]] Logr::log_t log)
930
{
×
931
#ifdef __linux__
×
932
  string line;
×
933
  if (readFileIfThere("/proc/sys/net/ipv4/tcp_fastopen", &line)) {
×
934
    int flag = std::stoi(line);
×
935
    if (active && !(flag & 1)) {
×
936
      SLOG(g_log << Logger::Error << "tcp-fast-open-connect enabled but net.ipv4.tcp_fastopen does not allow it" << endl,
×
937
           log->info(Logr::Error, "tcp-fast-open-connect enabled but net.ipv4.tcp_fastopen does not allow it"));
×
938
    }
×
939
    if (!active && !(flag & 2)) {
×
940
      SLOG(g_log << Logger::Error << "tcp-fast-open enabled but net.ipv4.tcp_fastopen does not allow it" << endl,
×
941
           log->info(Logr::Error, "tcp-fast-open enabled but net.ipv4.tcp_fastopen does not allow it"));
×
942
    }
×
943
  }
×
944
  else {
×
945
    SLOG(g_log << Logger::Notice << "Cannot determine if kernel settings allow fast-open" << endl,
×
946
         log->info(Logr::Notice, "Cannot determine if kernel settings allow fast-open"));
×
947
  }
×
948
#else
949
  SLOG(g_log << Logger::Notice << "Cannot determine if kernel settings allow fast-open" << endl,
950
       log->info(Logr::Notice, "Cannot determine if kernel settings allow fast-open"));
951
#endif
952
}
×
953

954
/*
 * Probe whether TCP Fast Open can be enabled on outgoing connections. It creates a throw-away
 * non-blocking IPv4 TCP socket and tries to enable fast-open-connect on it. A NetworkError is
 * logged rather than propagated.
 */
void checkTFOconnect(Logr::log_t log)
{
  try {
    Socket probe(AF_INET, SOCK_STREAM);
    probe.setNonBlocking();
    probe.setFastOpenConnect();
    // probe is closed when it goes out of scope
  }
  catch (const NetworkError& failure) {
    SLOG(g_log << Logger::Error << "tcp-fast-open-connect enabled but returned error: " << failure.what() << endl,
         log->error(Logr::Error, failure.what(), "tcp-fast-open-connect enabled but returned error"));
  }
}
966

967
/*
 * Send data over handler from within an mthread.
 *
 * It first tries an immediate write. If that does not complete, it registers the fd with the
 * multiplexer (TCPIOHandlerIO continues the write) and waits up to g_networkTimeoutMsec for the
 * result.
 *
 * @return Success when everything was written, Timeout when the wait expired, PermanentError
 *         on a write exception, a waitEvent error, or a size mismatch (e.g. an empty error reply).
 */
LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr<TCPIOHandler>& handler)
{
  TCPLOG(handler->getDescriptor(), "asendtcp called " << data.size() << endl);

  auto pident = std::make_shared<PacketID>();
  pident->tcphandler = handler;
  pident->tcpsock = handler->getDescriptor();
  pident->outMSG = data;
  pident->highState = TCPAction::DoingWrite;

  IOState state = IOState::Done;
  try {
    TCPLOG(pident->tcpsock, "Initial tryWrite: " << pident->outPos << '/' << pident->outMSG.size() << ' ' << " -> ");
    state = handler->tryWrite(pident->outMSG, pident->outPos, pident->outMSG.size());
    TCPLOG(pident->tcpsock, pident->outPos << '/' << pident->outMSG.size() << endl);
  }
  catch (const std::exception& e) {
    TCPLOG(pident->tcpsock, "tryWrite() exception..." << e.what() << endl);
    return LWResult::Result::PermanentError;
  }

  if (state == IOState::Done) {
    // Fast path: written in one go, no need to involve the multiplexer
    TCPLOG(pident->tcpsock, "asendtcp success A" << endl);
    return LWResult::Result::Success;
  }

  // Registers the fd and sets pident->lowState
  TCPIOHandlerStateChange(IOState::Done, state, pident);

  PacketBuffer echoed;
  const int waitResult = g_multiTasker->waitEvent(pident, &echoed, g_networkTimeoutMsec);
  TCPLOG(pident->tcpsock, "asendtcp waitEvent returned " << waitResult << ' ' << echoed.size() << '/' << data.size() << ' ');

  switch (waitResult) {
  case 0:
    TCPLOG(pident->tcpsock, "timeout" << endl);
    TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
    return LWResult::Result::Timeout;
  case -1: // error
    TCPLOG(pident->tcpsock, "PermanentError" << endl);
    TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
    return LWResult::Result::PermanentError;
  default:
    break;
  }

  // The main loop echoes what it sent, or an empty buffer on error;
  // fd housekeeping was already done by TCPIOHandlerIO
  if (echoed.size() != data.size()) {
    TCPLOG(pident->tcpsock, "PermanentError size mismatch" << endl);
    return LWResult::Result::PermanentError;
  }

  TCPLOG(pident->tcpsock, "asendtcp success" << endl);
  return LWResult::Result::Success;
}
1018

1019
/*
 * Receive up to len bytes from handler into data, from within an mthread.
 *
 * It first tries to use whatever the handler can deliver right away (e.g. data already buffered by
 * the TLS layer). If that is not enough, it registers the fd with the multiplexer (TCPIOHandlerIO
 * continues the read) and waits up to authWaitTimeMSec(g_multiTasker).
 *
 * @param data            receives the bytes read; resized to the number of bytes actually read on success
 * @param len             number of bytes wanted
 * @param incompleteOkay  accept a short (but non-empty) read as success
 * @return Success, Timeout, or PermanentError (read exception, waitEvent error, EOF/empty result)
 */
LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, shared_ptr<TCPIOHandler>& handler, const bool incompleteOkay)
{
  TCPLOG(handler->getDescriptor(), "arecvtcp called " << len << ' ' << data.size() << endl);
  data.resize(len);

  // We might have data already available from the TLS layer, try to get that into the buffer
  size_t pos = 0;
  IOState state = IOState::Done;
  try {
    TCPLOG(handler->getDescriptor(), "calling tryRead() " << len << endl);
    state = handler->tryRead(data, pos, len);
    TCPLOG(handler->getDescriptor(), "arcvtcp tryRead() returned " << int(state) << ' ' << pos << '/' << len << endl);
    if (state == IOState::Async) {
      throw std::runtime_error("TLS async mode not supported");
    }
    const bool readAttempted = state == IOState::Done || state == IOState::NeedRead;
    const bool satisfied = pos == len || (incompleteOkay && pos > 0);
    if (readAttempted && satisfied) {
      data.resize(pos);
      TCPLOG(handler->getDescriptor(), "acecvtcp success A" << endl);
      return LWResult::Result::Success;
    }
  }
  catch (const std::exception& e) {
    TCPLOG(handler->getDescriptor(), "tryRead() exception..." << e.what() << endl);
    return LWResult::Result::PermanentError;
  }

  // Hand the (possibly partially filled) buffer over to the multiplexer-driven read
  auto pident = std::make_shared<PacketID>();
  pident->tcphandler = handler;
  pident->tcpsock = handler->getDescriptor();
  pident->inMSG = std::move(data);
  pident->inPos = pos;
  pident->inWanted = len;
  pident->inIncompleteOkay = incompleteOkay;
  pident->highState = TCPAction::DoingRead;

  data.clear();

  // Registers the fd and sets pident->lowState
  TCPIOHandlerStateChange(IOState::Done, state, pident);

  const int waitResult = g_multiTasker->waitEvent(pident, &data, authWaitTimeMSec(g_multiTasker));
  TCPLOG(pident->tcpsock, "arecvtcp " << waitResult << ' ' << data.size() << ' ');

  switch (waitResult) {
  case 0:
    TCPLOG(pident->tcpsock, "timeout" << endl);
    TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
    return LWResult::Result::Timeout;
  case -1:
    TCPLOG(pident->tcpsock, "PermanentError" << endl);
    TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
    return LWResult::Result::PermanentError;
  default:
    break;
  }

  // An empty reply means error, EOF or other; fd housekeeping was done by TCPIOHandlerIO
  if (data.empty()) {
    TCPLOG(pident->tcpsock, "EOF" << endl);
    return LWResult::Result::PermanentError;
  }

  TCPLOG(pident->tcpsock, "arecvtcp success" << endl);
  return LWResult::Result::Success;
}
1088

1089
// The two last arguments to makeTCPServerSockets are used for logging purposes only
1090
unsigned int makeTCPServerSockets(deferredAdd_t& deferredAdds, std::set<int>& tcpSockets, Logr::log_t log, bool doLog, unsigned int instances)
1091
{
152✔
1092
  vector<string> localAddresses;
152✔
1093
  stringtok(localAddresses, ::arg()["local-address"], " ,");
152✔
1094

1095
  if (localAddresses.empty()) {
152!
1096
    throw PDNSException("No local address specified");
×
1097
  }
×
1098

1099
#ifdef TCP_DEFER_ACCEPT
152✔
1100
  auto first = true;
152✔
1101
#endif
152✔
1102
  const uint16_t defaultLocalPort = ::arg().asNum("local-port");
152✔
1103
  for (const auto& localAddress : localAddresses) {
153✔
1104
    ComboAddress address{localAddress, defaultLocalPort};
153✔
1105
    const int socketFd = socket(address.sin6.sin6_family, SOCK_STREAM, 0);
153✔
1106
    if (socketFd < 0) {
153!
1107
      throw PDNSException("Making a TCP server socket for resolver: " + stringerror());
×
1108
    }
×
1109

1110
    setCloseOnExec(socketFd);
153✔
1111

1112
    int tmp = 1;
153✔
1113
    if (setsockopt(socketFd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof tmp) < 0) {
153!
1114
      int err = errno;
×
1115
      SLOG(g_log << Logger::Error << "Setsockopt failed for TCP listening socket" << endl,
×
1116
           log->error(Logr::Critical, err, "Setsockopt failed for TCP listening socket"));
×
1117
      _exit(1);
×
1118
    }
×
1119
    if (address.sin6.sin6_family == AF_INET6 && setsockopt(socketFd, IPPROTO_IPV6, IPV6_V6ONLY, &tmp, sizeof(tmp)) < 0) {
153!
1120
      int err = errno;
×
1121
      SLOG(g_log << Logger::Error << "Failed to set IPv6 socket to IPv6 only, continuing anyhow: " << stringerror(err) << endl,
×
1122
           log->error(Logr::Warning, err, "Failed to set IPv6 socket to IPv6 only, continuing anyhow"));
×
1123
    }
×
1124

1125
#ifdef TCP_DEFER_ACCEPT
153✔
1126
    if (setsockopt(socketFd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &tmp, sizeof tmp) >= 0) {
153!
1127
      if (first) {
153✔
1128
        SLOG(g_log << Logger::Info << "Enabled TCP data-ready filter for (slight) DoS protection" << endl,
152✔
1129
             log->info(Logr::Info, "Enabled TCP data-ready filter for (slight) DoS protection"));
152✔
1130
      }
152✔
1131
    }
153✔
1132
#endif
153✔
1133

1134
    if (::arg().mustDo("non-local-bind")) {
153!
1135
      Utility::setBindAny(AF_INET, socketFd);
×
1136
    }
×
1137

1138
    if (g_reusePort) {
153✔
1139
#if defined(SO_REUSEPORT_LB)
1140
      try {
1141
        SSetsockopt(socketFd, SOL_SOCKET, SO_REUSEPORT_LB, 1);
1142
      }
1143
      catch (const std::exception& e) {
1144
        throw PDNSException(std::string("SO_REUSEPORT_LB: ") + e.what());
1145
      }
1146
#elif defined(SO_REUSEPORT)
1147
      try {
141✔
1148
        SSetsockopt(socketFd, SOL_SOCKET, SO_REUSEPORT, 1);
141✔
1149
      }
141✔
1150
      catch (const std::exception& e) {
141✔
1151
        throw PDNSException(std::string("SO_REUSEPORT: ") + e.what());
×
1152
      }
×
1153
#endif
141✔
1154
    }
141✔
1155

1156
    if (SyncRes::s_tcp_fast_open > 0) {
153!
1157
      checkFastOpenSysctl(false, log);
×
1158
#ifdef TCP_FASTOPEN
×
1159
      if (setsockopt(socketFd, IPPROTO_TCP, TCP_FASTOPEN, &SyncRes::s_tcp_fast_open, sizeof SyncRes::s_tcp_fast_open) < 0) {
×
1160
        int err = errno;
×
1161
        SLOG(g_log << Logger::Error << "Failed to enable TCP Fast Open for listening socket: " << stringerror(err) << endl,
×
1162
             log->error(Logr::Error, err, "Failed to enable TCP Fast Open for listening socket"));
×
1163
      }
×
1164
#else
1165
      SLOG(g_log << Logger::Warning << "TCP Fast Open configured but not supported for listening socket" << endl,
1166
           log->info(Logr::Warning, "TCP Fast Open configured but not supported for listening socket"));
1167
#endif
1168
    }
×
1169

1170
    socklen_t socklen = address.sin4.sin_family == AF_INET ? sizeof(address.sin4) : sizeof(address.sin6);
153✔
1171
    if (::bind(socketFd, reinterpret_cast<struct sockaddr*>(&address), socklen) < 0) { // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
153!
1172
      throw PDNSException("Binding TCP server socket for " + address.toStringWithPort() + ": " + stringerror());
×
1173
    }
×
1174

1175
    setNonBlocking(socketFd);
153✔
1176
    try {
153✔
1177
      setSocketSendBuffer(socketFd, 65000);
153✔
1178
    }
153✔
1179
    catch (const std::exception& e) {
153✔
1180
      SLOG(g_log << Logger::Error << e.what() << endl,
×
1181
           log->error(Logr::Error, e.what(), "Exception while setting socket send buffer"));
×
1182
    }
×
1183

1184
    listen(socketFd, 128);
153✔
1185
    deferredAdds.emplace_back(socketFd, handleNewTCPQuestion);
153✔
1186
    tcpSockets.insert(socketFd);
153✔
1187

1188
    // we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
1189
    //  - fd is not that which we know here, but returned from accept()
1190

1191
#ifdef TCP_DEFER_ACCEPT
153✔
1192
    first = false;
153✔
1193
#endif
153✔
1194
  }
153✔
1195
  if (doLog) {
152!
1196
    log->info(Logr::Info, "Listening for queries", "protocol", Logging::Loggable("TCP"), "addresses", Logging::IterLoggable(localAddresses.cbegin(), localAddresses.cend()), "socketInstances", Logging::Loggable(instances), "reuseport", Logging::Loggable(g_reusePort));
152✔
1197
  }
152✔
1198
  return localAddresses.size();
152✔
1199
}
152✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc