• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PowerDNS / pdns / 14400190872

11 Apr 2025 09:35AM UTC coverage: 63.46% (-0.02%) from 63.483%
14400190872

Pull #15420

github

web-flow
Merge 8143d6a52 into f0eaaf61c
Pull Request #15420: dnsdist: Add Lua bindings for the incoming network interface

41673 of 100404 branches covered (41.51%)

Branch coverage included in aggregate %.

36 of 36 new or added lines in 3 files covered. (100.0%)

58 existing lines in 15 files now uncovered.

128658 of 168002 relevant lines covered (76.58%)

3841118.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.96
/pdns/recursordist/rec-tcp.cc
1
/*
2
 * This file is part of PowerDNS or dnsdist.
3
 * Copyright -- PowerDNS.COM B.V. and its contributors
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of version 2 of the GNU General Public License as
7
 * published by the Free Software Foundation.
8
 *
9
 * In addition, for the avoidance of any doubt, permission is granted to
10
 * link this program with OpenSSL and to (re)distribute the binaries
11
 * produced as the result of such linking.
12
 *
13
 * This program is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
 * GNU General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU General Public License
19
 * along with this program; if not, write to the Free Software
20
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
 */
22

23
#include "rec-main.hh"
24

25
#include "arguments.hh"
26
#include "logger.hh"
27
#include "mplexer.hh"
28
#include "uuid-utils.hh"
29

30
// OLD PRE 5.0.0 situation:
31
//
32
// When pdns-distributes-queries is false with reuseport true (the default since 4.9.0), TCP queries
33
// are read and handled by worker threads. If the kernel balancing is OK for TCP sockets (observed
34
// to be good on Debian bullseye, but not good on e.g. MacOS), the TCP handling is no extra burden.
35
// In the case of MacOS all incoming TCP queries are handled by a single worker, while incoming UDP
36
// queries do get distributed round-robin over the worker threads.  Do note the TCP queries might
37
// need to wait until the g_maxUDPQueriesPerRound is reached.
38
//
39
// In the case of pdns-distributes-queries true and reuseport false the queries were read and
40
// initially processed by the distributor thread(s).
41
//
42
// Initial processing consists of parsing, calling gettag and checking if we have a packet cache
43
// hit. If that does not produce a hit, the query is passed to an mthread in the same way as with
44
// UDP queries, but do note that the mthread processing is serviced by the distributor thread. The
45
// final answer will be sent by the same distributor thread that originally picked up the query.
46
//
47
// Changing this, and having incoming TCP queries handled by worker threads is somewhat more complex
48
// than UDP, as the socket must remain available in the distributor thread (for reading more
49
// queries), but the TCP socket must also be passed to a worker thread so it can write its
50
// answer. The in-flight bookkeeping also has to be aware of how a query is handled to do the
51
// accounting properly. I am not sure if changing the current setup is worth all this trouble,
52
// especially since the default is now to not use pdns-distributes-queries, which works well in many
53
// cases.
54
//
55
// NEW SITUATION SINCE 5.0.0:
56
//
57
// The drawbacks mentioned in https://github.com/PowerDNS/pdns/issues/8394 are no longer true, so an
58
// alternative approach would be to introduce dedicated TCP worker thread(s).
59
//
60
// This approach was implemented in https://github.com/PowerDNS/pdns/pull/13195. The distributor and
61
// worker thread(s) now no longer process TCP queries.
62

63
// Per-connection query budget: finishTCPReply() stops reading from a connection once its
// queriesCount reaches this value. A value of 0 disables the check.
size_t g_tcpMaxQueriesPerConn;
64
// NOTE(review): not referenced in this part of the file; presumably the cap on the total
// number of simultaneous incoming TCP connections -- confirm against the accept path.
unsigned int g_maxTCPClients;
65
// NOTE(review): not referenced in this part of the file; presumably the cap on simultaneous
// connections per client address (see t_tcpClientCounts below) -- confirm.
unsigned int g_maxTCPPerClient;
66
// Read timeout for incoming TCP connections. It is added to timeval::tv_sec when read
// deadlines are (re)armed, so it is expressed in seconds.
int g_tcpTimeout;
67
// NOTE(review): not referenced in this part of the file -- semantics to be confirmed.
bool g_anyToTcp;
68

69
// In-flight query threshold per connection: once d_requestsInFlight reaches this value the
// connection's descriptor is no longer watched for reads, until a reply brings it back to
// s_maxInFlight - 1.
uint16_t TCPConnection::s_maxInFlight;
70

71
// Number of live TCPConnection objects per client, keyed with the address-only comparator.
using tcpClientCounts_t = map<ComboAddress, uint32_t, ComboAddress::addressOnlyLessThan>;
72
// One tally per thread: incremented in the TCPConnection constructor and decremented in its
// destructor.
static thread_local std::unique_ptr<tcpClientCounts_t> t_tcpClientCounts = std::make_unique<tcpClientCounts_t>();
73

74
static void handleRunningTCPQuestion(int fileDesc, FDMultiplexer::funcparam_t& var);
75

76
// TCPLOG(fd, x): optional debug tracing. In the `#if 0` branch below it writes a coarse
// timestamp (seconds modulo 10 plus the microsecond fraction), " FD ", the descriptor and `x`
// to cerr. In the branch that is compiled by default the statement sits in a `while (false)`
// body: `x` is still compiled but never executed.
#if 0
77
#define TCPLOG(tcpsock, x)                                 \
78
  do {                                                     \
79
    cerr << []() { timeval t; gettimeofday(&t, nullptr); return t.tv_sec % 10  + t.tv_usec/1000000.0; }() << " FD " << (tcpsock) << ' ' << x; \
80
  } while (0)
81
#else
82
// We do not define this as empty since that produces a duplicate case label warning from clang-tidy
83
#define TCPLOG(pid, x) /* NOLINT(cppcoreguidelines-macro-usage) */ \
84
  while (false) {                                                  \
1,488✔
85
    cerr << x; /* NOLINT(bugprone-macro-parentheses) */            \
×
86
  }
×
87
#endif
88

89
std::atomic<uint32_t> TCPConnection::s_currentConnections;
90

91
// Wraps an accepted TCP socket. The receive buffer starts out as two zero bytes, and both the
// global connection counter and this thread's per-client tally are incremented.
TCPConnection::TCPConnection(int fileDesc, const ComboAddress& addr) :
  data(2, 0),
  d_remote(addr),
  d_fd(fileDesc)
{
  ++s_currentConnections;

  // operator[] creates the entry (value-initialised to 0) for a client we have not seen yet
  auto& connectionsFromClient = (*t_tcpClientCounts)[d_remote];
  ++connectionsFromClient;
}
381✔
97

98
// Closes the socket and undoes the bookkeeping done by the constructor.
//
// Failures while closing are logged, never propagated: this is a destructor.
//
// The per-client entry is erased as soon as the last connection of that client goes away.
// The previous code tested the value *before* the decrement, so an entry dropping from 1 to 0
// was kept forever and the per-thread map accumulated one zero-count entry per client address
// ever seen.
TCPConnection::~TCPConnection()
{
  try {
    if (closesocket(d_fd) < 0) {
      SLOG(g_log << Logger::Error << "Error closing socket for TCPConnection" << endl,
           g_slogtcpin->info(Logr::Error, "Error closing socket for TCPConnection"));
    }
  }
  catch (const PDNSException& e) {
    SLOG(g_log << Logger::Error << "Error closing TCPConnection socket: " << e.reason << endl,
         g_slogtcpin->error(Logr::Error, e.reason, "Error closing TCPConnection socket", "exception", Logging::Loggable("PDNSException")));
  }

  if (t_tcpClientCounts) {
    auto entry = t_tcpClientCounts->find(d_remote);
    if (entry != t_tcpClientCounts->end()) {
      if (entry->second <= 1) {
        // Last connection of this client (or a stale zero entry): drop the entry instead of
        // leaving a 0 behind or wrapping around below zero.
        t_tcpClientCounts->erase(entry);
      }
      else {
        --entry->second;
      }
    }
  }
  --s_currentConnections;
}
381✔
116

117
static void terminateTCPConnection(int fileDesc)
118
{
378✔
119
  try {
378✔
120
    t_fdm->removeReadFD(fileDesc);
378✔
121
  }
378✔
122
  catch (const FDMultiplexerException& fde) {
378✔
123
  }
×
124
}
378✔
125

126
static void sendErrorOverTCP(std::unique_ptr<DNSComboWriter>& comboWriter, int rcode)
127
{
×
128
  std::vector<uint8_t> packet;
×
129
  if (comboWriter->d_mdp.d_header.qdcount == 0U) {
×
130
    /* header-only */
131
    packet.resize(sizeof(dnsheader));
×
132
  }
×
133
  else {
×
134
    DNSPacketWriter packetWriter(packet, comboWriter->d_mdp.d_qname, comboWriter->d_mdp.d_qtype, comboWriter->d_mdp.d_qclass);
×
135
    if (comboWriter->d_mdp.hasEDNS()) {
×
136
      /* we try to add the EDNS OPT RR even for truncated answers,
137
         as rfc6891 states:
138
         "The minimal response MUST be the DNS header, question section, and an
139
         OPT record.  This MUST also occur when a truncated response (using
140
         the DNS header's TC bit) is returned."
141
      */
142
      packetWriter.addOpt(512, 0, 0);
×
143
      packetWriter.commit();
×
144
    }
×
145
  }
×
146

147
  auto& header = reinterpret_cast<dnsheader&>(packet.at(0)); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast) safe cast
×
148
  header.aa = 0;
×
149
  header.ra = 1;
×
150
  header.qr = 1;
×
151
  header.tc = 0;
×
152
  header.id = comboWriter->d_mdp.d_header.id;
×
153
  header.rd = comboWriter->d_mdp.d_header.rd;
×
154
  header.cd = comboWriter->d_mdp.d_header.cd;
×
155
  header.rcode = rcode;
×
156

157
  sendResponseOverTCP(comboWriter, packet);
×
158
}
×
159

160
void finishTCPReply(std::unique_ptr<DNSComboWriter>& comboWriter, bool hadError, bool updateInFlight)
161
{
447✔
162
  // update tcp connection status, closing if needed and doing the fd multiplexer accounting
163
  if (updateInFlight && comboWriter->d_tcpConnection->d_requestsInFlight > 0) {
447!
164
    comboWriter->d_tcpConnection->d_requestsInFlight--;
407✔
165
  }
407✔
166

167
  // In the code below, we try to remove the fd from the set, but
168
  // we don't know if another mthread already did the remove, so we can get a
169
  // "Tried to remove unlisted fd" exception.  Not that an inflight < limit test
170
  // will not work since we do not know if the other mthread got an error or not.
171
  if (hadError) {
447!
172
    terminateTCPConnection(comboWriter->d_socket);
×
173
    comboWriter->d_socket = -1;
×
174
    return;
×
175
  }
×
176
  comboWriter->d_tcpConnection->queriesCount++;
447✔
177
  if ((g_tcpMaxQueriesPerConn > 0 && comboWriter->d_tcpConnection->queriesCount >= g_tcpMaxQueriesPerConn) || (comboWriter->d_tcpConnection->isDropOnIdle() && comboWriter->d_tcpConnection->d_requestsInFlight == 0)) {
447!
178
    try {
3✔
179
      t_fdm->removeReadFD(comboWriter->d_socket);
3✔
180
    }
3✔
181
    catch (FDMultiplexerException&) {
3✔
182
    }
×
183
    comboWriter->d_socket = -1;
3✔
184
    return;
3✔
185
  }
3✔
186

187
  Utility::gettimeofday(&g_now, nullptr); // needs to be updated
444✔
188
  struct timeval ttd = g_now;
444✔
189

190
  // If we cross from max to max-1 in flight requests, the fd was not listened to, add it back
191
  if (updateInFlight && comboWriter->d_tcpConnection->d_requestsInFlight == TCPConnection::s_maxInFlight - 1) {
444✔
192
    // A read error might have happened. If we add the fd back, it will most likely error again.
193
    // This is not a big issue, the next handleTCPClientReadable() will see another read error
194
    // and take action.
195
    ttd.tv_sec += g_tcpTimeout;
1✔
196
    t_fdm->addReadFD(comboWriter->d_socket, handleRunningTCPQuestion, comboWriter->d_tcpConnection, &ttd);
1✔
197
    return;
1✔
198
  }
1✔
199
  // fd might have been removed by read error code, or a read timeout, so expect an exception
200
  try {
443✔
201
    t_fdm->setReadTTD(comboWriter->d_socket, ttd, g_tcpTimeout);
443✔
202
  }
443✔
203
  catch (const FDMultiplexerException&) {
443✔
204
    // but if the FD was removed because of a timeout while we were sending a response,
205
    // we need to re-arm it. If it was an error it will error again.
206
    ttd.tv_sec += g_tcpTimeout;
×
207
    t_fdm->addReadFD(comboWriter->d_socket, handleRunningTCPQuestion, comboWriter->d_tcpConnection, &ttd);
×
208
  }
×
209
}
443✔
210

211
/*
212
 * A helper class that by default closes the incoming TCP connection on destruct
213
 * If you want to keep the connection alive, call keep() on the guard object
214
 */
215
class RunningTCPQuestionGuard
216
{
217
public:
218
  RunningTCPQuestionGuard(const RunningTCPQuestionGuard&) = default;
219
  RunningTCPQuestionGuard(RunningTCPQuestionGuard&&) = delete;
220
  RunningTCPQuestionGuard& operator=(const RunningTCPQuestionGuard&) = default;
221
  RunningTCPQuestionGuard& operator=(RunningTCPQuestionGuard&&) = delete;
222
  RunningTCPQuestionGuard(int fileDesc) :
223
    d_fd(fileDesc) {}
1,139✔
224
  ~RunningTCPQuestionGuard()
225
  {
1,139✔
226
    if (d_fd != -1) {
1,139✔
227
      terminateTCPConnection(d_fd);
378✔
228
      d_fd = -1;
378✔
229
    }
378✔
230
  }
1,139✔
231
  void keep()
232
  {
1,208✔
233
    d_fd = -1;
1,208✔
234
  }
1,208✔
235
  bool handleTCPReadResult(int /* fd */, ssize_t bytes)
236
  {
395✔
237
    if (bytes == 0) {
395✔
238
      /* EOF */
239
      return false;
368✔
240
    }
368✔
241
    if (bytes < 0) {
27!
242
      if (errno != EAGAIN && errno != EWOULDBLOCK) {
27!
243
        return false;
×
244
      }
×
245
    }
27✔
246
    keep();
27✔
247
    return true;
27✔
248
  }
27✔
249

250
private:
251
  int d_fd{-1};
252
};
253

254
static void handleNotify(std::unique_ptr<DNSComboWriter>& comboWriter, const DNSName& qname)
255
{
1✔
256
  if (!t_allowNotifyFrom || !t_allowNotifyFrom->match(comboWriter->d_mappedSource)) {
1!
257
    if (!g_quiet) {
×
258
      SLOG(g_log << Logger::Error << "[" << g_multiTasker->getTid() << "] dropping TCP NOTIFY from " << comboWriter->d_mappedSource.toString() << ", address not matched by allow-notify-from" << endl,
×
259
           g_slogtcpin->info(Logr::Error, "Dropping TCP NOTIFY, address not matched by allow-notify-from", "source", Logging::Loggable(comboWriter->d_mappedSource)));
×
260
    }
×
261

262
    t_Counters.at(rec::Counter::sourceDisallowedNotify)++;
×
263
    return;
×
264
  }
×
265

266
  if (!isAllowNotifyForZone(qname)) {
1!
267
    if (!g_quiet) {
×
268
      SLOG(g_log << Logger::Error << "[" << g_multiTasker->getTid() << "] dropping TCP NOTIFY from " << comboWriter->d_mappedSource.toString() << ", for " << qname.toLogString() << ", zone not matched by allow-notify-for" << endl,
×
269
           g_slogtcpin->info(Logr::Error, "Dropping TCP NOTIFY,  zone not matched by allow-notify-for", "source", Logging::Loggable(comboWriter->d_mappedSource), "zone", Logging::Loggable(qname)));
×
270
    }
×
271

272
    t_Counters.at(rec::Counter::zoneDisallowedNotify)++;
×
273
    return;
×
274
  }
×
275
}
1✔
276

277
static void doProtobufLogQuery(bool logQuery, LocalStateHolder<LuaConfigItems>& luaconfsLocal, const std::unique_ptr<DNSComboWriter>& comboWriter, const DNSName& qname, QType qtype, QClass qclass, const dnsheader* dnsheader, const shared_ptr<TCPConnection>& conn, const boost::optional<uint32_t>& ednsVersion)
278
{
8✔
279
  try {
8✔
280
    if (logQuery && !(luaconfsLocal->protobufExportConfig.taggedOnly && comboWriter->d_policyTags.empty())) {
8!
281
      protobufLogQuery(luaconfsLocal, comboWriter->d_uuid, comboWriter->d_source, comboWriter->d_destination, comboWriter->d_mappedSource, comboWriter->d_ednssubnet.getSource(), true, conn->qlen, qname, qtype, qclass, comboWriter->d_policyTags, comboWriter->d_requestorId, comboWriter->d_deviceId, comboWriter->d_deviceName, comboWriter->d_meta, ednsVersion, *dnsheader);
2✔
282
    }
2✔
283
  }
8✔
284
  catch (const std::exception& e) {
8✔
285
    if (g_logCommonErrors) {
×
286
      SLOG(g_log << Logger::Warning << "Error parsing a TCP query packet for edns subnet: " << e.what() << endl,
×
287
           g_slogtcpin->error(Logr::Warning, e.what(), "Error parsing a TCP query packet for edns subnet", "exception", Logging::Loggable("std::exception"), "remote", Logging::Loggable(conn->d_remote)));
×
288
    }
×
289
  }
×
290
}
8✔
291

292
static void doProcessTCPQuestion(std::unique_ptr<DNSComboWriter>& comboWriter, shared_ptr<TCPConnection>& conn, RunningTCPQuestionGuard& tcpGuard, int fileDesc)
293
{
449✔
294
  RecThreadInfo::self().incNumberOfDistributedQueries();
449✔
295
  struct timeval start{};
449✔
296
  Utility::gettimeofday(&start, nullptr);
449✔
297

298
  DNSName qname;
449✔
299
  uint16_t qtype = 0;
449✔
300
  uint16_t qclass = 0;
449✔
301
  bool needEDNSParse = false;
449✔
302
  string requestorId;
449✔
303
  string deviceId;
449✔
304
  string deviceName;
449✔
305
  bool logQuery = false;
449✔
306
  bool qnameParsed = false;
449✔
307
  boost::optional<uint32_t> ednsVersion;
449✔
308

309
  comboWriter->d_eventTrace.setEnabled(SyncRes::s_event_trace_enabled != 0);
449✔
310
  comboWriter->d_eventTrace.add(RecEventTrace::ReqRecv);
449✔
311
  auto luaconfsLocal = g_luaconfs.getLocal();
449✔
312
  if (checkProtobufExport(luaconfsLocal)) {
449✔
313
    needEDNSParse = true;
8✔
314
  }
8✔
315
  logQuery = t_protobufServers.servers && luaconfsLocal->protobufExportConfig.logQueries;
449✔
316
  comboWriter->d_logResponse = t_protobufServers.servers && luaconfsLocal->protobufExportConfig.logResponses;
449!
317

318
  if (needEDNSParse || (t_pdl && (t_pdl->hasGettagFFIFunc() || t_pdl->hasGettagFunc())) || comboWriter->d_mdp.d_header.opcode == static_cast<unsigned>(Opcode::Notify)) {
449✔
319

320
    try {
45✔
321
      EDNSOptionViewMap ednsOptions;
45✔
322
      comboWriter->d_ecsParsed = true;
45✔
323
      comboWriter->d_ecsFound = false;
45✔
324
      getQNameAndSubnet(conn->data, &qname, &qtype, &qclass,
45✔
325
                        comboWriter->d_ecsFound, &comboWriter->d_ednssubnet, g_gettagNeedsEDNSOptions ? &ednsOptions : nullptr, ednsVersion);
45✔
326
      qnameParsed = true;
45✔
327

328
      if (t_pdl) {
45✔
329
        try {
44✔
330
          if (t_pdl->hasGettagFFIFunc()) {
44✔
331
            RecursorLua4::FFIParams params(qname, qtype, comboWriter->d_local, comboWriter->d_remote, comboWriter->d_destination, comboWriter->d_source, comboWriter->d_ednssubnet.getSource(), comboWriter->d_data, comboWriter->d_gettagPolicyTags, comboWriter->d_records, ednsOptions, comboWriter->d_proxyProtocolValues, requestorId, deviceId, deviceName, comboWriter->d_routingTag, comboWriter->d_rcode, comboWriter->d_ttlCap, comboWriter->d_variable, true, logQuery, comboWriter->d_logResponse, comboWriter->d_followCNAMERecords, comboWriter->d_extendedErrorCode, comboWriter->d_extendedErrorExtra, comboWriter->d_responsePaddingDisabled, comboWriter->d_meta);
23✔
332
            comboWriter->d_eventTrace.add(RecEventTrace::LuaGetTagFFI);
23✔
333
            comboWriter->d_tag = t_pdl->gettag_ffi(params);
23✔
334
            comboWriter->d_eventTrace.add(RecEventTrace::LuaGetTagFFI, comboWriter->d_tag, false);
23✔
335
          }
23✔
336
          else if (t_pdl->hasGettagFunc()) {
21!
337
            comboWriter->d_eventTrace.add(RecEventTrace::LuaGetTag);
21✔
338
            comboWriter->d_tag = t_pdl->gettag(comboWriter->d_source, comboWriter->d_ednssubnet.getSource(), comboWriter->d_destination, qname, qtype, &comboWriter->d_gettagPolicyTags, comboWriter->d_data, ednsOptions, true, requestorId, deviceId, deviceName, comboWriter->d_routingTag, comboWriter->d_proxyProtocolValues);
21✔
339
            comboWriter->d_eventTrace.add(RecEventTrace::LuaGetTag, comboWriter->d_tag, false);
21✔
340
          }
21✔
341
          // Copy d_gettagPolicyTags to d_policyTags, so other Lua hooks see them and can add their
342
          // own. Before storing into the packetcache, the tags in d_gettagPolicyTags will be
343
          // cleared by addPolicyTagsToPBMessageIfNeeded() so they do *not* end up in the PC. When an
344
          // Protobuf message is constructed, one part comes from the PC (including the tags
345
          // set by non-gettag hooks), and the tags in d_gettagPolicyTags will be added by the code
346
          // constructing the PB message.
347
          comboWriter->d_policyTags = comboWriter->d_gettagPolicyTags;
44✔
348
        }
44✔
349
        catch (const MOADNSException& moadnsexception) {
44✔
350
          if (g_logCommonErrors) {
×
351
            g_slogtcpin->error(moadnsexception.what(), "Error parsing a query packet for tag determination", "qname", Logging::Loggable(qname), "excepion", Logging::Loggable("MOADNSException"));
×
352
          }
×
353
        }
×
354
        catch (const std::exception& stdException) {
44✔
355
          g_rateLimitedLogger.log(g_slogtcpin, "Error parsing a query packet for tag determination", stdException, "qname", Logging::Loggable(qname), "remote", Logging::Loggable(conn->d_remote));
×
356
        }
×
357
      }
44✔
358
    }
45✔
359
    catch (const std::exception& stdException) {
45✔
360
      g_rateLimitedLogger.log(g_slogudpin, "Error parsing a query packet for tag determination, setting tag=0", stdException, "remote", Logging::Loggable(conn->d_remote));
×
361
    }
×
362
  }
45✔
363

364
  if (comboWriter->d_tag == 0 && !comboWriter->d_responsePaddingDisabled && g_paddingFrom.match(comboWriter->d_remote)) {
449!
365
    comboWriter->d_tag = g_paddingTag;
×
366
  }
×
367

368
  const dnsheader_aligned headerdata(conn->data.data());
449✔
369
  const struct dnsheader* dnsheader = headerdata.get();
449✔
370

371
  if (t_protobufServers.servers || t_outgoingProtobufServers.servers) {
449!
372
    comboWriter->d_requestorId = std::move(requestorId);
8✔
373
    comboWriter->d_deviceId = std::move(deviceId);
8✔
374
    comboWriter->d_deviceName = std::move(deviceName);
8✔
375
    comboWriter->d_uuid = getUniqueID();
8✔
376
  }
8✔
377

378
  if (t_protobufServers.servers) {
449✔
379
    doProtobufLogQuery(logQuery, luaconfsLocal, comboWriter, qname, qtype, qclass, dnsheader, conn, ednsVersion);
8✔
380
  }
8✔
381

382
  if (t_pdl) {
449✔
383
    bool ipf = t_pdl->ipfilter(comboWriter->d_source, comboWriter->d_destination, *dnsheader, comboWriter->d_eventTrace);
193✔
384
    if (ipf) {
193✔
385
      if (!g_quiet) {
2!
386
        SLOG(g_log << Logger::Notice << RecThreadInfo::id() << " [" << g_multiTasker->getTid() << "/" << g_multiTasker->numProcesses() << "] DROPPED TCP question from " << comboWriter->d_source.toStringWithPort() << (comboWriter->d_source != comboWriter->d_remote ? " (via " + comboWriter->d_remote.toStringWithPort() + ")" : "") << " based on policy" << endl,
2✔
387
             g_slogtcpin->info(Logr::Info, "Dropped TCP question based on policy", "remote", Logging::Loggable(conn->d_remote), "source", Logging::Loggable(comboWriter->d_source)));
2✔
388
      }
2✔
389
      t_Counters.at(rec::Counter::policyDrops)++;
2✔
390
      return;
2✔
391
    }
2✔
392
  }
193✔
393

394
  if (comboWriter->d_mdp.d_header.qr) {
447!
395
    t_Counters.at(rec::Counter::ignoredCount)++;
×
396
    if (g_logCommonErrors) {
×
397
      SLOG(g_log << Logger::Error << "Ignoring answer from TCP client " << comboWriter->getRemote() << " on server socket!" << endl,
×
398
           g_slogtcpin->info(Logr::Error, "Ignoring answer from TCP client on server socket", "remote", Logging::Loggable(comboWriter->getRemote())));
×
399
    }
×
400
    return;
×
401
  }
×
402
  if (comboWriter->d_mdp.d_header.opcode != static_cast<unsigned>(Opcode::Query) && comboWriter->d_mdp.d_header.opcode != static_cast<unsigned>(Opcode::Notify)) {
447!
403
    t_Counters.at(rec::Counter::ignoredCount)++;
×
404
    if (g_logCommonErrors) {
×
405
      SLOG(g_log << Logger::Error << "Ignoring unsupported opcode " << Opcode::to_s(comboWriter->d_mdp.d_header.opcode) << " from TCP client " << comboWriter->getRemote() << " on server socket!" << endl,
×
406
           g_slogtcpin->info(Logr::Error, "Ignoring unsupported opcode from TCP client", "remote", Logging::Loggable(comboWriter->getRemote()), "opcode", Logging::Loggable(Opcode::to_s(comboWriter->d_mdp.d_header.opcode))));
×
407
    }
×
408
    sendErrorOverTCP(comboWriter, RCode::NotImp);
×
409
    tcpGuard.keep();
×
410
    return;
×
411
  }
×
412
  if (dnsheader->qdcount == 0U) {
447!
413
    t_Counters.at(rec::Counter::emptyQueriesCount)++;
×
414
    if (g_logCommonErrors) {
×
415
      SLOG(g_log << Logger::Error << "Ignoring empty (qdcount == 0) query from " << comboWriter->getRemote() << " on server socket!" << endl,
×
416
           g_slogtcpin->info(Logr::Error, "Ignoring empty (qdcount == 0) query on server socket", "remote", Logging::Loggable(comboWriter->getRemote())));
×
417
    }
×
418
    sendErrorOverTCP(comboWriter, RCode::NotImp);
×
419
    tcpGuard.keep();
×
420
    return;
×
421
  }
×
422
  {
447✔
423
    // We have read a proper query
424
    ++t_Counters.at(rec::Counter::qcounter);
447✔
425
    ++t_Counters.at(rec::Counter::tcpqcounter);
447✔
426
    if (comboWriter->d_source.sin4.sin_family == AF_INET6) {
447✔
427
      ++t_Counters.at(rec::Counter::ipv6qcounter);
15✔
428
    }
15✔
429

430
    if (comboWriter->d_mdp.d_header.opcode == static_cast<unsigned>(Opcode::Notify)) {
447✔
431
      handleNotify(comboWriter, qname);
1✔
432
    }
1✔
433

434
    string response;
447✔
435
    RecursorPacketCache::OptPBData pbData{boost::none};
447✔
436

437
    if (comboWriter->d_mdp.d_header.opcode == static_cast<unsigned>(Opcode::Query)) {
447✔
438
      /* It might seem like a good idea to skip the packet cache lookup if we know that the answer is not cacheable,
439
         but it means that the hash would not be computed. If some script decides at a later time to mark back the answer
440
         as cacheable we would cache it with a wrong tag, so better safe than sorry. */
441
      comboWriter->d_eventTrace.add(RecEventTrace::PCacheCheck);
446✔
442
      bool cacheHit = checkForCacheHit(qnameParsed, comboWriter->d_tag, conn->data, qname, qtype, qclass, g_now, response, comboWriter->d_qhash, pbData, true, comboWriter->d_source, comboWriter->d_mappedSource);
446✔
443
      comboWriter->d_eventTrace.add(RecEventTrace::PCacheCheck, cacheHit, false);
446✔
444

445
      if (cacheHit) {
446✔
446
        if (!g_quiet) {
40!
447
          SLOG(g_log << Logger::Notice << RecThreadInfo::id() << " TCP question answered from packet cache tag=" << comboWriter->d_tag << " from " << comboWriter->d_source.toStringWithPort() << (comboWriter->d_source != comboWriter->d_remote ? " (via " + comboWriter->d_remote.toStringWithPort() + ")" : "") << endl,
40✔
448
               g_slogtcpin->info(Logr::Notice, "TCP question answered from packet cache", "tag", Logging::Loggable(comboWriter->d_tag),
40✔
449
                                 "qname", Logging::Loggable(qname), "qtype", Logging::Loggable(QType(qtype)),
40✔
450
                                 "source", Logging::Loggable(comboWriter->d_source), "remote", Logging::Loggable(comboWriter->d_remote)));
40✔
451
        }
40✔
452

453
        bool hadError = sendResponseOverTCP(comboWriter, response);
40✔
454
        finishTCPReply(comboWriter, hadError, false);
40✔
455
        struct timeval now{};
40✔
456
        Utility::gettimeofday(&now, nullptr);
40✔
457
        uint64_t spentUsec = uSec(now - start);
40✔
458
        t_Counters.at(rec::Histogram::cumulativeAnswers)(spentUsec);
40✔
459
        comboWriter->d_eventTrace.add(RecEventTrace::AnswerSent);
40✔
460

461
        if (t_protobufServers.servers && comboWriter->d_logResponse && (!luaconfsLocal->protobufExportConfig.taggedOnly || (pbData && pbData->d_tagged))) {
40!
462
          struct timeval tval{
3✔
463
            0, 0};
3✔
464
          protobufLogResponse(dnsheader, luaconfsLocal, pbData, tval, true, comboWriter->d_source, comboWriter->d_destination, comboWriter->d_mappedSource, comboWriter->d_ednssubnet, comboWriter->d_uuid, comboWriter->d_requestorId, comboWriter->d_deviceId, comboWriter->d_deviceName, comboWriter->d_meta, comboWriter->d_eventTrace, comboWriter->d_policyTags);
3✔
465
        }
3✔
466

467
        if (comboWriter->d_eventTrace.enabled() && (SyncRes::s_event_trace_enabled & SyncRes::event_trace_to_log) != 0) {
40!
468
          SLOG(g_log << Logger::Info << comboWriter->d_eventTrace.toString() << endl,
×
469
               g_slogtcpin->info(Logr::Info, comboWriter->d_eventTrace.toString())); // More fancy?
×
470
        }
×
471
        tcpGuard.keep();
40✔
472
        t_Counters.updateSnap(g_regressionTestMode);
40✔
473
        return;
40✔
474
      } // cache hit
40✔
475
    } // query opcode
446✔
476

477
    if (comboWriter->d_mdp.d_header.opcode == static_cast<unsigned>(Opcode::Notify)) {
407✔
478
      if (!g_quiet) {
1!
479
        SLOG(g_log << Logger::Notice << RecThreadInfo::id() << " got NOTIFY for " << qname.toLogString() << " from " << comboWriter->d_source.toStringWithPort() << (comboWriter->d_source != comboWriter->d_remote ? " (via " + comboWriter->d_remote.toStringWithPort() + ")" : "") << endl,
1✔
480
             g_slogtcpin->info(Logr::Notice, "Got NOTIFY", "qname", Logging::Loggable(qname), "source", Logging::Loggable(comboWriter->d_source), "remote", Logging::Loggable(comboWriter->d_remote)));
1✔
481
      }
1✔
482

483
      requestWipeCaches(qname);
1✔
484

485
      // the operation will now be treated as a Query, generating
486
      // a normal response, as the rest of the code does not
487
      // check dh->opcode, but we need to ensure that the response
488
      // to this request does not get put into the packet cache
489
      comboWriter->d_variable = true;
1✔
490
    }
1✔
491

492
    // setup for startDoResolve() in an mthread
493
    ++conn->d_requestsInFlight;
407✔
494
    if (conn->d_requestsInFlight >= TCPConnection::s_maxInFlight) {
407✔
495
      t_fdm->removeReadFD(fileDesc); // should no longer awake ourselves when there is data to read
1✔
496
    }
1✔
497
    else {
406✔
498
      Utility::gettimeofday(&g_now, nullptr); // needed?
406✔
499
      struct timeval ttd = g_now;
406✔
500
      t_fdm->setReadTTD(fileDesc, ttd, g_tcpTimeout);
406✔
501
    }
406✔
502
    tcpGuard.keep();
407✔
503
    g_multiTasker->makeThread(startDoResolve, comboWriter.release()); // deletes dc
407✔
504
  } // good query
407✔
505
}
407✔
506

507
static void handleRunningTCPQuestion(int fileDesc, FDMultiplexer::funcparam_t& var) // NOLINT(readability-function-cognitive-complexity)
508
{
1,139✔
509
  auto conn = boost::any_cast<shared_ptr<TCPConnection>>(var);
1,139✔
510

511
  RunningTCPQuestionGuard tcpGuard{fileDesc};
1,139✔
512

513
  if (conn->state == TCPConnection::PROXYPROTOCOLHEADER) {
1,139✔
514
    ssize_t bytes = recv(conn->getFD(), &conn->data.at(conn->proxyProtocolGot), conn->proxyProtocolNeed, 0);
128✔
515
    if (bytes <= 0) {
128✔
516
      tcpGuard.handleTCPReadResult(fileDesc, bytes);
6✔
517
      return;
6✔
518
    }
6✔
519

520
    conn->proxyProtocolGot += bytes;
122✔
521
    conn->data.resize(conn->proxyProtocolGot);
122✔
522
    ssize_t remaining = isProxyHeaderComplete(conn->data);
122✔
523
    if (remaining == 0) {
122✔
524
      if (g_logCommonErrors) {
4!
525
        SLOG(g_log << Logger::Error << "Unable to consume proxy protocol header in packet from TCP client " << conn->d_remote.toStringWithPort() << endl,
4✔
526
             g_slogtcpin->info(Logr::Error, "Unable to consume proxy protocol header in packet from TCP client", "remote", Logging::Loggable(conn->d_remote)));
4✔
527
      }
4✔
528
      ++t_Counters.at(rec::Counter::proxyProtocolInvalidCount);
4✔
529
      return;
4✔
530
    }
4✔
531
    if (remaining < 0) {
118✔
532
      conn->proxyProtocolNeed = -remaining;
96✔
533
      conn->data.resize(conn->proxyProtocolGot + conn->proxyProtocolNeed);
96✔
534
      tcpGuard.keep();
96✔
535
      return;
96✔
536
    }
96✔
537
    {
22✔
538
      /* proxy header received */
539
      /* we ignore the TCP field for now, but we could properly set whether
540
         the connection was received over UDP or TCP if needed */
541
      bool tcp = false;
22✔
542
      bool proxy = false;
22✔
543
      size_t used = parseProxyHeader(conn->data, proxy, conn->d_source, conn->d_destination, tcp, conn->proxyProtocolValues);
22✔
544
      if (used <= 0) {
22!
545
        if (g_logCommonErrors) {
×
546
          SLOG(g_log << Logger::Error << "Unable to parse proxy protocol header in packet from TCP client " << conn->d_remote.toStringWithPort() << endl,
×
547
               g_slogtcpin->info(Logr::Error, "Unable to parse proxy protocol header in packet from TCP client", "remote", Logging::Loggable(conn->d_remote)));
×
548
        }
×
549
        ++t_Counters.at(rec::Counter::proxyProtocolInvalidCount);
×
550
        return;
×
551
      }
×
552
      if (static_cast<size_t>(used) > g_proxyProtocolMaximumSize) {
22✔
553
        if (g_logCommonErrors) {
2!
554
          SLOG(g_log << Logger::Error << "Proxy protocol header in packet from TCP client " << conn->d_remote.toStringWithPort() << " is larger than proxy-protocol-maximum-size (" << used << "), dropping" << endl,
2✔
555
               g_slogtcpin->info(Logr::Error, "Proxy protocol header in packet from TCP client is larger than proxy-protocol-maximum-size", "remote", Logging::Loggable(conn->d_remote), "size", Logging::Loggable(used)));
2✔
556
        }
2✔
557
        ++t_Counters.at(rec::Counter::proxyProtocolInvalidCount);
2✔
558
        return;
2✔
559
      }
2✔
560

561
      /* Now that we have retrieved the address of the client, as advertised by the proxy
562
         via the proxy protocol header, check that it is allowed by our ACL */
563
      /* note that if the proxy header used a 'LOCAL' command, the original source and destination are untouched so everything should be fine */
564
      conn->d_mappedSource = conn->d_source;
20✔
565
      if (t_proxyMapping) {
20!
566
        if (const auto* iter = t_proxyMapping->lookup(conn->d_source)) {
×
567
          conn->d_mappedSource = iter->second.address;
×
568
          ++iter->second.stats.netmaskMatches;
×
569
        }
×
570
      }
×
571
      if (t_remotes) {
20!
572
        t_remotes->push_back(conn->d_source);
20✔
573
      }
20✔
574
      if (t_allowFrom && !t_allowFrom->match(&conn->d_mappedSource)) {
20!
575
        if (!g_quiet) {
4!
576
          SLOG(g_log << Logger::Error << "[" << g_multiTasker->getTid() << "] dropping TCP query from " << conn->d_mappedSource.toString() << ", address not matched by allow-from" << endl,
4✔
577
               g_slogtcpin->info(Logr::Error, "Dropping TCP query, address not matched by allow-from", "remote", Logging::Loggable(conn->d_remote)));
4✔
578
        }
4✔
579

580
        ++t_Counters.at(rec::Counter::unauthorizedTCP);
4✔
581
        return;
4✔
582
      }
4✔
583

584
      conn->data.resize(2);
16✔
585
      conn->state = TCPConnection::BYTE0;
16✔
586
    }
16✔
587
  }
16✔
588

589
  if (conn->state == TCPConnection::BYTE0) {
1,027✔
590
    ssize_t bytes = recv(conn->getFD(), conn->data.data(), 2, 0);
813✔
591
    if (bytes == 1) {
813✔
592
      conn->state = TCPConnection::BYTE1;
2✔
593
    }
2✔
594
    if (bytes == 2) {
813✔
595
      conn->qlen = (((unsigned char)conn->data[0]) << 8) + (unsigned char)conn->data[1];
450✔
596
      conn->data.resize(conn->qlen);
450✔
597
      conn->bytesread = 0;
450✔
598
      conn->state = TCPConnection::GETQUESTION;
450✔
599
    }
450✔
600
    if (bytes <= 0) {
813✔
601
      tcpGuard.handleTCPReadResult(fileDesc, bytes);
361✔
602
      return;
361✔
603
    }
361✔
604
  }
813✔
605

606
  if (conn->state == TCPConnection::BYTE1) {
666✔
607
    ssize_t bytes = recv(conn->getFD(), &conn->data[1], 1, 0);
4✔
608
    if (bytes == 1) {
4✔
609
      conn->state = TCPConnection::GETQUESTION;
2✔
610
      conn->qlen = (((unsigned char)conn->data[0]) << 8) + (unsigned char)conn->data[1];
2✔
611
      conn->data.resize(conn->qlen);
2✔
612
      conn->bytesread = 0;
2✔
613
    }
2✔
614
    if (bytes <= 0) {
4✔
615
      if (!tcpGuard.handleTCPReadResult(fileDesc, bytes)) {
2!
616
        if (g_logCommonErrors) {
×
617
          SLOG(g_log << Logger::Error << "TCP client " << conn->d_remote.toStringWithPort() << " disconnected after first byte" << endl,
×
618
               g_slogtcpin->info(Logr::Error, "TCP client disconnected after first byte", "remote", Logging::Loggable(conn->d_remote)));
×
619
        }
×
620
      }
×
621
      return;
2✔
622
    }
2✔
623
  }
4✔
624

625
  if (conn->state == TCPConnection::GETQUESTION) {
664!
626
    ssize_t bytes = recv(conn->getFD(), &conn->data[conn->bytesread], conn->qlen - conn->bytesread, 0);
664✔
627
    if (bytes <= 0) {
664✔
628
      if (!tcpGuard.handleTCPReadResult(fileDesc, bytes)) {
26✔
629
        if (g_logCommonErrors) {
3!
630
          SLOG(g_log << Logger::Error << "TCP client " << conn->d_remote.toStringWithPort() << " disconnected while reading question body" << endl,
3✔
631
               g_slogtcpin->info(Logr::Error, "TCP client disconnected while reading question body", "remote", Logging::Loggable(conn->d_remote)));
3✔
632
        }
3✔
633
      }
3✔
634
      return;
26✔
635
    }
26✔
636
    if (bytes > std::numeric_limits<std::uint16_t>::max()) {
638!
637
      if (g_logCommonErrors) {
×
638
        SLOG(g_log << Logger::Error << "TCP client " << conn->d_remote.toStringWithPort() << " sent an invalid question size while reading question body" << endl,
×
639
             g_slogtcpin->info(Logr::Error, "TCP client sent an invalid question size while reading question body", "remote", Logging::Loggable(conn->d_remote)));
×
640
      }
×
641
      return;
×
642
    }
×
643
    conn->bytesread += (uint16_t)bytes;
638✔
644
    if (conn->bytesread == conn->qlen) {
638✔
645
      conn->state = TCPConnection::BYTE0;
449✔
646
      std::unique_ptr<DNSComboWriter> comboWriter;
449✔
647
      try {
449✔
648
        comboWriter = std::make_unique<DNSComboWriter>(conn->data, g_now, t_pdl);
449✔
649
      }
449✔
650
      catch (const MOADNSException& mde) {
449✔
651
        t_Counters.at(rec::Counter::clientParseError)++;
×
652
        if (g_logCommonErrors) {
×
653
          SLOG(g_log << Logger::Error << "Unable to parse packet from TCP client " << conn->d_remote.toStringWithPort() << endl,
×
654
               g_slogtcpin->info(Logr::Error, "Unable to parse packet from TCP client", "remte", Logging::Loggable(conn->d_remote)));
×
655
        }
×
656
        return;
×
657
      }
×
658

659
      comboWriter->d_tcpConnection = conn; // carry the torch
449✔
660
      comboWriter->setSocket(conn->getFD()); // this is the only time a copy is made of the actual fd
449✔
661
      comboWriter->d_tcp = true;
449✔
662
      comboWriter->setRemote(conn->d_remote); // the address the query was received from
449✔
663
      comboWriter->setSource(conn->d_source); // the address we assume the query is coming from, might be set by proxy protocol
449✔
664
      ComboAddress dest;
449✔
665
      dest.reset();
449✔
666
      dest.sin4.sin_family = conn->d_remote.sin4.sin_family;
449✔
667
      socklen_t len = dest.getSocklen();
449✔
668
      getsockname(conn->getFD(), reinterpret_cast<sockaddr*>(&dest), &len); // if this fails, we're ok with it NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
449✔
669
      comboWriter->setLocal(dest); // the address we received the query on
449✔
670
      comboWriter->setDestination(conn->d_destination); // the address we assume the query is received on, might be set by proxy protocol
449✔
671
      comboWriter->setMappedSource(conn->d_mappedSource); // the address we assume the query is coming from after table based mapping
449✔
672
      /* we can't move this if we want to be able to access the values in
673
         all queries sent over this connection */
674
      comboWriter->d_proxyProtocolValues = conn->proxyProtocolValues;
449✔
675

676
      doProcessTCPQuestion(comboWriter, conn, tcpGuard, fileDesc);
449✔
677
    } // reading query
449✔
678
  }
638✔
679
  // more to come
680
  tcpGuard.keep();
638✔
681
}
638✔
682

683
//! Handle new incoming TCP connection
684
void handleNewTCPQuestion(int fileDesc, [[maybe_unused]] FDMultiplexer::funcparam_t& var)
685
{
381✔
686
  ComboAddress addr;
381✔
687
  socklen_t addrlen = sizeof(addr);
381✔
688
  int newsock = accept(fileDesc, reinterpret_cast<struct sockaddr*>(&addr), &addrlen); // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
381✔
689
  if (newsock < 0) {
381!
690
    return;
×
691
  }
×
692
  auto closeSock = [newsock](rec::Counter cnt, const string& msg) {
381✔
693
    try {
×
694
      closesocket(newsock);
×
695
      t_Counters.at(cnt)++;
×
696
      // We want this bump to percolate up without too much delay
697
      t_Counters.updateSnap(false);
×
698
    }
×
699
    catch (const PDNSException& e) {
×
700
      g_slogtcpin->error(Logr::Error, e.reason, msg, "exception", Logging::Loggable("PDNSException"));
×
701
    }
×
702
  };
×
703

704
  if (TCPConnection::getCurrentConnections() >= g_maxTCPClients) {
381!
705
    closeSock(rec::Counter::tcpOverflow, "Error closing TCP socket after an overflow drop");
×
706
    return;
×
707
  }
×
708
  if (g_multiTasker->numProcesses() >= g_maxMThreads) {
381!
709
    closeSock(rec::Counter::overCapacityDrops, "Error closing TCP socket after an over capacity drop");
×
710
    return;
×
711
  }
×
712

713
  ComboAddress destaddr;
381✔
714
  socklen_t len = sizeof(destaddr);
381✔
715
  getsockname(newsock, reinterpret_cast<sockaddr*>(&destaddr), &len); // if this fails, we're ok with it NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
381✔
716
  bool fromProxyProtocolSource = expectProxyProtocol(addr, destaddr);
381✔
717
  if (!fromProxyProtocolSource && t_remotes) {
381!
718
    t_remotes->push_back(addr);
349✔
719
  }
349✔
720
  ComboAddress mappedSource = addr;
381✔
721
  if (!fromProxyProtocolSource && t_proxyMapping) {
381✔
722
    if (const auto* iter = t_proxyMapping->lookup(addr)) {
5!
723
      mappedSource = iter->second.address;
5✔
724
      ++iter->second.stats.netmaskMatches;
5✔
725
    }
5✔
726
  }
5✔
727
  if (!fromProxyProtocolSource && t_allowFrom && !t_allowFrom->match(&mappedSource)) {
381!
728
    if (!g_quiet) {
×
729
      SLOG(g_log << Logger::Error << "[" << g_multiTasker->getTid() << "] dropping TCP query from " << mappedSource.toString() << ", address neither matched by allow-from nor proxy-protocol-from" << endl,
×
730
           g_slogtcpin->info(Logr::Error, "dropping TCP query address neither matched by allow-from nor proxy-protocol-from", "source", Logging::Loggable(mappedSource)));
×
731
    }
×
732
    closeSock(rec::Counter::unauthorizedTCP, "Error closing TCP socket after an ACL drop");
×
733
    return;
×
734
  }
×
735

736
  if (g_maxTCPPerClient > 0 && t_tcpClientCounts->count(addr) > 0 && (*t_tcpClientCounts)[addr] >= g_maxTCPPerClient) {
381!
737
    closeSock(rec::Counter::tcpClientOverflow, "Error closing TCP socket after a client overflow drop");
×
738
    return;
×
739
  }
×
740

741
  setNonBlocking(newsock);
381✔
742
  setTCPNoDelay(newsock);
381✔
743
  std::shared_ptr<TCPConnection> tcpConn = std::make_shared<TCPConnection>(newsock, addr);
381✔
744
  tcpConn->d_source = addr;
381✔
745
  tcpConn->d_destination = destaddr;
381✔
746
  tcpConn->d_mappedSource = mappedSource;
381✔
747

748
  if (fromProxyProtocolSource) {
381✔
749
    tcpConn->proxyProtocolNeed = s_proxyProtocolMinimumHeaderSize;
32✔
750
    tcpConn->data.resize(tcpConn->proxyProtocolNeed);
32✔
751
    tcpConn->state = TCPConnection::PROXYPROTOCOLHEADER;
32✔
752
  }
32✔
753
  else {
349✔
754
    tcpConn->state = TCPConnection::BYTE0;
349✔
755
  }
349✔
756

757
  timeval ttd{};
381✔
758
  Utility::gettimeofday(&ttd, nullptr);
381✔
759
  ttd.tv_sec += g_tcpTimeout;
381✔
760

761
  t_fdm->addReadFD(tcpConn->getFD(), handleRunningTCPQuestion, tcpConn, &ttd);
381✔
762
}
381✔
763

764
static void TCPIOHandlerIO(int fileDesc, FDMultiplexer::funcparam_t& var);
765

766
static void TCPIOHandlerStateChange(IOState oldstate, IOState newstate, std::shared_ptr<PacketID>& pid)
767
{
185✔
768
  TCPLOG(pid->tcpsock, "State transation " << int(oldstate) << "->" << int(newstate) << endl);
185✔
769

770
  pid->lowState = newstate;
185✔
771

772
  // handle state transitions
773
  switch (oldstate) {
185✔
774
  case IOState::NeedRead:
54✔
775

776
    switch (newstate) {
54!
777
    case IOState::NeedWrite:
×
778
      TCPLOG(pid->tcpsock, "NeedRead -> NeedWrite: flip FD" << endl);
×
779
      t_fdm->alterFDToWrite(pid->tcpsock, TCPIOHandlerIO, pid);
×
780
      break;
×
781
    case IOState::NeedRead:
1✔
782
      break;
1✔
783
    case IOState::Done:
53✔
784
      TCPLOG(pid->tcpsock, "Done -> removeReadFD" << endl);
53✔
785
      t_fdm->removeReadFD(pid->tcpsock);
53✔
786
      break;
53✔
787
    case IOState::Async:
×
788
      throw std::runtime_error("TLS async mode not supported");
×
789
      break;
×
790
    }
54✔
791
    break;
54✔
792

793
  case IOState::NeedWrite:
54✔
794

795
    switch (newstate) {
41!
796
    case IOState::NeedRead:
4✔
797
      TCPLOG(pid->tcpsock, "NeedWrite -> NeedRead: flip FD" << endl);
4✔
798
      t_fdm->alterFDToRead(pid->tcpsock, TCPIOHandlerIO, pid);
4✔
799
      break;
4✔
800
    case IOState::NeedWrite:
×
801
      break;
×
802
    case IOState::Done:
37✔
803
      TCPLOG(pid->tcpsock, "Done -> removeWriteFD" << endl);
37✔
804
      t_fdm->removeWriteFD(pid->tcpsock);
37✔
805
      break;
37✔
806
    case IOState::Async:
×
807
      throw std::runtime_error("TLS async mode not supported");
×
808
      break;
×
809
    }
41✔
810
    break;
41✔
811

812
  case IOState::Done:
89✔
813
    switch (newstate) {
89✔
814
    case IOState::NeedRead:
49✔
815
      TCPLOG(pid->tcpsock, "NeedRead: addReadFD" << endl);
49✔
816
      t_fdm->addReadFD(pid->tcpsock, TCPIOHandlerIO, pid);
49✔
817
      break;
49✔
818
    case IOState::NeedWrite:
41✔
819
      TCPLOG(pid->tcpsock, "NeedWrite: addWriteFD" << endl);
41✔
820
      t_fdm->addWriteFD(pid->tcpsock, TCPIOHandlerIO, pid);
41✔
821
      break;
41✔
822
    case IOState::Done:
×
823
      break;
×
824
    case IOState::Async:
×
825
      throw std::runtime_error("TLS async mode not supported");
×
826
      break;
×
827
    }
89✔
828
    break;
90✔
829

830
  case IOState::Async:
90!
831
    throw std::runtime_error("TLS async mode not supported");
×
832
    break;
×
833
  }
185✔
834
}
185✔
835

836
static void TCPIOHandlerIO(int fileDesc, FDMultiplexer::funcparam_t& var)
837
{
95✔
838
  auto pid = boost::any_cast<std::shared_ptr<PacketID>>(var);
95✔
839
  assert(pid->tcphandler); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay): def off assert triggers it
95✔
840
  assert(fileDesc == pid->tcphandler->getDescriptor()); // NOLINT(cppcoreguidelines-pro-bounds-array-to-pointer-decay) idem
×
841
  IOState newstate = IOState::Done;
×
842

843
  TCPLOG(pid->tcpsock, "TCPIOHandlerIO: lowState " << int(pid->lowState) << endl);
95✔
844

845
  // In the code below, we want to update the state of the fd before calling sendEvent
846
  // a sendEvent might close the fd, and some poll multiplexers do not like to manipulate a closed fd
847

848
  switch (pid->highState) {
95!
849
  case TCPAction::DoingRead:
50✔
850
    TCPLOG(pid->tcpsock, "highState: Reading" << endl);
50✔
851
    // In arecvtcp, the buffer was resized already so inWanted bytes will fit
852
    // try reading
853
    try {
50✔
854
      newstate = pid->tcphandler->tryRead(pid->inMSG, pid->inPos, pid->inWanted);
50✔
855
      switch (newstate) {
50!
856
      case IOState::Done:
49✔
857
      case IOState::NeedRead:
50✔
858
        TCPLOG(pid->tcpsock, "tryRead: Done or NeedRead " << int(newstate) << ' ' << pid->inPos << '/' << pid->inWanted << endl);
50✔
859
        TCPLOG(pid->tcpsock, "TCPIOHandlerIO " << pid->inWanted << ' ' << pid->inIncompleteOkay << endl);
50✔
860
        if (pid->inPos == pid->inWanted || (pid->inIncompleteOkay && pid->inPos > 0)) {
50!
861
          pid->inMSG.resize(pid->inPos); // old content (if there) + new bytes read, only relevant for the inIncompleteOkay case
49✔
862
          newstate = IOState::Done;
49✔
863
          TCPIOHandlerStateChange(pid->lowState, newstate, pid);
49✔
864
          g_multiTasker->sendEvent(pid, &pid->inMSG);
49✔
865
          return;
49✔
866
        }
49✔
867
        break;
1✔
868
      case IOState::NeedWrite:
1!
869
        break;
×
870
      case IOState::Async:
×
871
        throw std::runtime_error("TLS async mode not supported");
×
872
        break;
×
873
      }
50✔
874
    }
50✔
875
    catch (const std::exception& e) {
50✔
876
      newstate = IOState::Done;
×
877
      TCPLOG(pid->tcpsock, "read exception..." << e.what() << endl);
×
878
      PacketBuffer empty;
×
879
      TCPIOHandlerStateChange(pid->lowState, newstate, pid);
×
880
      g_multiTasker->sendEvent(pid, &empty); // this conveys error status
×
881
      return;
×
882
    }
×
883
    break;
1✔
884

885
  case TCPAction::DoingWrite:
45✔
886
    TCPLOG(pid->tcpsock, "highState: Writing" << endl);
45✔
887
    try {
45✔
888
      TCPLOG(pid->tcpsock, "tryWrite: " << pid->outPos << '/' << pid->outMSG.size() << ' ' << " -> ");
45✔
889
      newstate = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size());
45✔
890
      TCPLOG(pid->tcpsock, pid->outPos << '/' << pid->outMSG.size() << endl);
45✔
891
      switch (newstate) {
45!
892
      case IOState::Done: {
41✔
893
        TCPLOG(pid->tcpsock, "tryWrite: Done" << endl);
41✔
894
        TCPIOHandlerStateChange(pid->lowState, newstate, pid);
41✔
895
        g_multiTasker->sendEvent(pid, &pid->outMSG); // send back what we sent to convey everything is ok
41✔
896
        return;
41✔
897
      }
×
898
      case IOState::NeedRead:
4✔
899
        TCPLOG(pid->tcpsock, "tryWrite: NeedRead" << endl);
4✔
900
        break;
4✔
901
      case IOState::NeedWrite:
×
902
        TCPLOG(pid->tcpsock, "tryWrite: NeedWrite" << endl);
×
903
        break;
×
904
      case IOState::Async:
×
905
        throw std::runtime_error("TLS async mode not supported");
×
906
        break;
×
907
      }
45✔
908
    }
45✔
909
    catch (const std::exception& e) {
45✔
UNCOV
910
      newstate = IOState::Done;
×
UNCOV
911
      TCPLOG(pid->tcpsock, "write exception..." << e.what() << endl);
×
UNCOV
912
      PacketBuffer sent;
×
UNCOV
913
      TCPIOHandlerStateChange(pid->lowState, newstate, pid);
×
UNCOV
914
      g_multiTasker->sendEvent(pid, &sent); // we convey error status by sending empty string
×
UNCOV
915
      return;
×
UNCOV
916
    }
×
917
    break;
4✔
918
  }
95✔
919

920
  // Cases that did not end up doing a sendEvent
921
  TCPIOHandlerStateChange(pid->lowState, newstate, pid);
5✔
922
}
5✔
923

924
void checkFastOpenSysctl([[maybe_unused]] bool active, [[maybe_unused]] Logr::log_t log)
925
{
×
926
#ifdef __linux__
×
927
  string line;
×
928
  if (readFileIfThere("/proc/sys/net/ipv4/tcp_fastopen", &line)) {
×
929
    int flag = std::stoi(line);
×
930
    if (active && !(flag & 1)) {
×
931
      SLOG(g_log << Logger::Error << "tcp-fast-open-connect enabled but net.ipv4.tcp_fastopen does not allow it" << endl,
×
932
           log->info(Logr::Error, "tcp-fast-open-connect enabled but net.ipv4.tcp_fastopen does not allow it"));
×
933
    }
×
934
    if (!active && !(flag & 2)) {
×
935
      SLOG(g_log << Logger::Error << "tcp-fast-open enabled but net.ipv4.tcp_fastopen does not allow it" << endl,
×
936
           log->info(Logr::Error, "tcp-fast-open enabled but net.ipv4.tcp_fastopen does not allow it"));
×
937
    }
×
938
  }
×
939
  else {
×
940
    SLOG(g_log << Logger::Notice << "Cannot determine if kernel settings allow fast-open" << endl,
×
941
         log->info(Logr::Notice, "Cannot determine if kernel settings allow fast-open"));
×
942
  }
×
943
#else
944
  SLOG(g_log << Logger::Notice << "Cannot determine if kernel settings allow fast-open" << endl,
945
       log->info(Logr::Notice, "Cannot determine if kernel settings allow fast-open"));
946
#endif
947
}
×
948

949
/**
 * Probe whether the fast-open-connect option can be set on a TCP socket.
 *
 * A throw-away non-blocking IPv4 stream socket is created and setFastOpenConnect() is
 * called on it; a NetworkError raised along the way is logged as an error.
 *
 * \param log structured logger to report to
 */
void checkTFOconnect(Logr::log_t log)
{
  try {
    Socket probe(AF_INET, SOCK_STREAM);
    probe.setNonBlocking();
    probe.setFastOpenConnect();
  }
  catch (const NetworkError& e) {
    SLOG(g_log << Logger::Error << "tcp-fast-open-connect enabled but returned error: " << e.what() << endl,
         log->error(Logr::Error, e.what(), "tcp-fast-open-connect enabled but returned error"));
  }
}
×
961

962
/**
 * Send `data` over an outgoing TCP (or TLS) connection, waiting for completion if needed.
 *
 * A first tryWrite() is attempted directly. If it does not finish, the descriptor is
 * registered with the multiplexer and the caller waits (up to g_networkTimeoutMsec) for
 * TCPIOHandlerIO() to report the result through an event.
 *
 * \param data    bytes to send
 * \param handler I/O handler wrapping the connection
 * \return Success when everything was written, Timeout when the wait timed out,
 *         PermanentError on an exception, a wait error or a size mismatch
 */
LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr<TCPIOHandler>& handler)
{
  TCPLOG(handler->getDescriptor(), "asendtcp called " << data.size() << endl);

  auto pident = std::make_shared<PacketID>();
  pident->tcphandler = handler;
  pident->tcpsock = handler->getDescriptor();
  pident->outMSG = data;
  pident->highState = TCPAction::DoingWrite;

  IOState state = IOState::Done;
  try {
    TCPLOG(pident->tcpsock, "Initial tryWrite: " << pident->outPos << '/' << pident->outMSG.size() << ' ' << " -> ");
    state = handler->tryWrite(pident->outMSG, pident->outPos, pident->outMSG.size());
    TCPLOG(pident->tcpsock, pident->outPos << '/' << pident->outMSG.size() << endl);

    if (state == IOState::Done) {
      TCPLOG(pident->tcpsock, "asendtcp success A" << endl);
      return LWResult::Result::Success;
    }
  }
  catch (const std::exception& e) {
    TCPLOG(pident->tcpsock, "tryWrite() exception..." << e.what() << endl);
    return LWResult::Result::PermanentError;
  }

  // Will set pident->lowState
  TCPIOHandlerStateChange(IOState::Done, state, pident);

  PacketBuffer echoed;
  const int waitResult = g_multiTasker->waitEvent(pident, &echoed, g_networkTimeoutMsec);
  TCPLOG(pident->tcpsock, "asendtcp waitEvent returned " << waitResult << ' ' << echoed.size() << '/' << data.size() << ' ');
  switch (waitResult) {
  case 0:
    TCPLOG(pident->tcpsock, "timeout" << endl);
    TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
    return LWResult::Result::Timeout;
  case -1: // error
    TCPLOG(pident->tcpsock, "PermanentError" << endl);
    TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
    return LWResult::Result::PermanentError;
  default:
    break;
  }

  // main loop tells us what it sent out, or empty in case of an error
  if (echoed.size() != data.size()) {
    // fd housekeeping done by TCPIOHandlerIO
    TCPLOG(pident->tcpsock, "PermanentError size mismatch" << endl);
    return LWResult::Result::PermanentError;
  }

  TCPLOG(pident->tcpsock, "asendtcp success" << endl);
  return LWResult::Result::Success;
}
41✔
1013

1014
/**
 * Receive up to `len` bytes from an outgoing TCP (or TLS) connection into `data`.
 *
 * `data` is resized to `len` and a first tryRead() is attempted directly. If that does not
 * satisfy the request, the partially filled buffer is moved into a PacketID, the descriptor
 * is registered with the multiplexer and the caller waits for TCPIOHandlerIO() to deliver
 * the result through an event.
 *
 * \param data           receives the bytes read
 * \param len            number of bytes wanted
 * \param handler        I/O handler wrapping the connection
 * \param incompleteOkay when true, any non-empty read counts as success
 * \return Success, Timeout when the wait timed out, PermanentError on an exception,
 *         a wait error or an empty result
 */
LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, shared_ptr<TCPIOHandler>& handler, const bool incompleteOkay)
{
  TCPLOG(handler->getDescriptor(), "arecvtcp called " << len << ' ' << data.size() << endl);
  data.resize(len);

  // We might have data already available from the TLS layer, try to get that into the buffer
  size_t pos = 0;
  IOState state = IOState::Done;
  try {
    TCPLOG(handler->getDescriptor(), "calling tryRead() " << len << endl);
    state = handler->tryRead(data, pos, len);
    TCPLOG(handler->getDescriptor(), "arcvtcp tryRead() returned " << int(state) << ' ' << pos << '/' << len << endl);
    if (state == IOState::Async) {
      throw std::runtime_error("TLS async mode not supported");
    }
    if (state == IOState::Done || state == IOState::NeedRead) {
      const bool satisfied = pos == len || (incompleteOkay && pos > 0);
      if (satisfied) {
        data.resize(pos);
        TCPLOG(handler->getDescriptor(), "acecvtcp success A" << endl);
        return LWResult::Result::Success;
      }
    }
  }
  catch (const std::exception& e) {
    TCPLOG(handler->getDescriptor(), "tryRead() exception..." << e.what() << endl);
    return LWResult::Result::PermanentError;
  }

  auto pident = std::make_shared<PacketID>();
  pident->tcphandler = handler;
  pident->tcpsock = handler->getDescriptor();
  // We might have a partial result
  pident->inMSG = std::move(data);
  pident->inPos = pos;
  pident->inWanted = len;
  pident->inIncompleteOkay = incompleteOkay;
  pident->highState = TCPAction::DoingRead;

  // leave the moved-from buffer in a defined, empty state for waitEvent() to fill
  data.clear();

  // Will set pident->lowState
  TCPIOHandlerStateChange(IOState::Done, state, pident);

  const int waitResult = g_multiTasker->waitEvent(pident, &data, authWaitTimeMSec(g_multiTasker));
  TCPLOG(pident->tcpsock, "arecvtcp " << waitResult << ' ' << data.size() << ' ');
  switch (waitResult) {
  case 0:
    TCPLOG(pident->tcpsock, "timeout" << endl);
    TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
    return LWResult::Result::Timeout;
  case -1:
    TCPLOG(pident->tcpsock, "PermanentError" << endl);
    TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
    return LWResult::Result::PermanentError;
  default:
    break;
  }

  if (data.empty()) { // error, EOF or other
    // fd housekeeping done by TCPIOHandlerIO
    TCPLOG(pident->tcpsock, "EOF" << endl);
    return LWResult::Result::PermanentError;
  }

  TCPLOG(pident->tcpsock, "arecvtcp success" << endl);
  return LWResult::Result::Success;
}
49✔
1083

1084
// The two last arguments to makeTCPServerSockets are used for logging purposes only
1085
unsigned int makeTCPServerSockets(deferredAdd_t& deferredAdds, std::set<int>& tcpSockets, Logr::log_t log, bool doLog, unsigned int instances)
1086
{
157✔
1087
  vector<string> localAddresses;
157✔
1088
  vector<string> logVec;
157✔
1089
  stringtok(localAddresses, ::arg()["local-address"], " ,");
157✔
1090

1091
  if (localAddresses.empty()) {
157!
1092
    throw PDNSException("No local address specified");
×
1093
  }
×
1094

1095
#ifdef TCP_DEFER_ACCEPT
157✔
1096
  auto first = true;
157✔
1097
#endif
157✔
1098
  const uint16_t defaultLocalPort = ::arg().asNum("local-port");
157✔
1099
  for (const auto& localAddress : localAddresses) {
158✔
1100
    ComboAddress address{localAddress, defaultLocalPort};
158✔
1101
    const int socketFd = socket(address.sin6.sin6_family, SOCK_STREAM, 0);
158✔
1102
    if (socketFd < 0) {
158!
1103
      throw PDNSException("Making a TCP server socket for resolver: " + stringerror());
×
1104
    }
×
1105
    logVec.emplace_back(address.toStringWithPort());
158✔
1106
    setCloseOnExec(socketFd);
158✔
1107

1108
    int tmp = 1;
158✔
1109
    if (setsockopt(socketFd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof tmp) < 0) {
158!
1110
      int err = errno;
×
1111
      SLOG(g_log << Logger::Error << "Setsockopt failed for TCP listening socket" << endl,
×
1112
           log->error(Logr::Critical, err, "Setsockopt failed for TCP listening socket"));
×
1113
      _exit(1);
×
1114
    }
×
1115
    if (address.sin6.sin6_family == AF_INET6 && setsockopt(socketFd, IPPROTO_IPV6, IPV6_V6ONLY, &tmp, sizeof(tmp)) < 0) {
158!
1116
      int err = errno;
×
1117
      SLOG(g_log << Logger::Error << "Failed to set IPv6 socket to IPv6 only, continuing anyhow: " << stringerror(err) << endl,
×
1118
           log->error(Logr::Warning, err, "Failed to set IPv6 socket to IPv6 only, continuing anyhow"));
×
1119
    }
×
1120

1121
#ifdef TCP_DEFER_ACCEPT
158✔
1122
    if (setsockopt(socketFd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &tmp, sizeof tmp) >= 0) {
158!
1123
      if (first) {
158✔
1124
        SLOG(g_log << Logger::Info << "Enabled TCP data-ready filter for (slight) DoS protection" << endl,
157✔
1125
             log->info(Logr::Info, "Enabled TCP data-ready filter for (slight) DoS protection"));
157✔
1126
      }
157✔
1127
    }
158✔
1128
#endif
158✔
1129

1130
    if (::arg().mustDo("non-local-bind")) {
158!
1131
      Utility::setBindAny(AF_INET, socketFd);
×
1132
    }
×
1133

1134
    if (g_reusePort) {
158✔
1135
#if defined(SO_REUSEPORT_LB)
1136
      try {
1137
        SSetsockopt(socketFd, SOL_SOCKET, SO_REUSEPORT_LB, 1);
1138
      }
1139
      catch (const std::exception& e) {
1140
        throw PDNSException(std::string("SO_REUSEPORT_LB: ") + e.what());
1141
      }
1142
#elif defined(SO_REUSEPORT)
1143
      try {
143✔
1144
        SSetsockopt(socketFd, SOL_SOCKET, SO_REUSEPORT, 1);
143✔
1145
      }
143✔
1146
      catch (const std::exception& e) {
143✔
1147
        throw PDNSException(std::string("SO_REUSEPORT: ") + e.what());
×
1148
      }
×
1149
#endif
143✔
1150
    }
143✔
1151

1152
    if (SyncRes::s_tcp_fast_open > 0) {
158!
1153
      checkFastOpenSysctl(false, log);
×
1154
#ifdef TCP_FASTOPEN
×
1155
      if (setsockopt(socketFd, IPPROTO_TCP, TCP_FASTOPEN, &SyncRes::s_tcp_fast_open, sizeof SyncRes::s_tcp_fast_open) < 0) {
×
1156
        int err = errno;
×
1157
        SLOG(g_log << Logger::Error << "Failed to enable TCP Fast Open for listening socket: " << stringerror(err) << endl,
×
1158
             log->error(Logr::Error, err, "Failed to enable TCP Fast Open for listening socket"));
×
1159
      }
×
1160
#else
1161
      SLOG(g_log << Logger::Warning << "TCP Fast Open configured but not supported for listening socket" << endl,
1162
           log->info(Logr::Warning, "TCP Fast Open configured but not supported for listening socket"));
1163
#endif
1164
    }
×
1165

1166
    socklen_t socklen = address.sin4.sin_family == AF_INET ? sizeof(address.sin4) : sizeof(address.sin6);
158✔
1167
    if (::bind(socketFd, reinterpret_cast<struct sockaddr*>(&address), socklen) < 0) { // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
158!
1168
      throw PDNSException("Binding TCP server socket for " + address.toStringWithPort() + ": " + stringerror());
×
1169
    }
×
1170

1171
    setNonBlocking(socketFd);
158✔
1172
    try {
158✔
1173
      setSocketSendBuffer(socketFd, 65000);
158✔
1174
    }
158✔
1175
    catch (const std::exception& e) {
158✔
1176
      SLOG(g_log << Logger::Error << e.what() << endl,
×
1177
           log->error(Logr::Error, e.what(), "Exception while setting socket send buffer"));
×
1178
    }
×
1179

1180
    listen(socketFd, 128);
158✔
1181
    deferredAdds.emplace_back(socketFd, handleNewTCPQuestion);
158✔
1182
    tcpSockets.insert(socketFd);
158✔
1183

1184
    // we don't need to update g_listenSocketsAddresses since it doesn't work for TCP/IP:
1185
    //  - fd is not that which we know here, but returned from accept()
1186

1187
#ifdef TCP_DEFER_ACCEPT
158✔
1188
    first = false;
158✔
1189
#endif
158✔
1190
  }
158✔
1191
  if (doLog) {
157!
1192
    log->info(Logr::Info, "Listening for queries", "protocol", Logging::Loggable("TCP"), "addresses", Logging::IterLoggable(logVec.cbegin(), logVec.cend()), "socketInstances", Logging::Loggable(instances), "reuseport", Logging::Loggable(g_reusePort));
157✔
1193
  }
157✔
1194
  return localAddresses.size();
157✔
1195
}
157✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc