• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PowerDNS / pdns / 19741624072

27 Nov 2025 03:45PM UTC coverage: 73.086% (+0.02%) from 73.065%
19741624072

Pull #16570

github

web-flow
Merge 08a2cdb1d into f94a3f63f
Pull Request #16570: rec: rewrite all unwrap calls in web.rs

38523 of 63408 branches covered (60.75%)

Branch coverage included in aggregate %.

128044 of 164496 relevant lines covered (77.84%)

6531485.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.89
/pdns/dnsdistdist/dnsdist-backend.cc
1
/*
2
 * This file is part of PowerDNS or dnsdist.
3
 * Copyright -- PowerDNS.COM B.V. and its contributors
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of version 2 of the GNU General Public License as
7
 * published by the Free Software Foundation.
8
 *
9
 * In addition, for the avoidance of any doubt, permission is granted to
10
 * link this program with OpenSSL and to (re)distribute the binaries
11
 * produced as the result of such linking.
12
 *
13
 * This program is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
 * GNU General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU General Public License
19
 * along with this program; if not, write to the Free Software
20
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
 */
22

23
// for OpenBSD, sys/socket.h needs to come before net/if.h
24
#include <sys/socket.h>
25
#include <net/if.h>
26

27
#include <boost/format.hpp>
28

29
#include "config.h"
30
#include "dnsdist.hh"
31
#include "dnsdist-backend.hh"
32
#include "dnsdist-backoff.hh"
33
#include "dnsdist-metrics.hh"
34
#include "dnsdist-nghttp2.hh"
35
#include "dnsdist-random.hh"
36
#include "dnsdist-rings.hh"
37
#include "dnsdist-snmp.hh"
38
#include "dnsdist-tcp.hh"
39
#include "dnsdist-xsk.hh"
40
#include "dolog.hh"
41
#include "xsk.hh"
42

43
/* Hand a cross-protocol (UDP-to-TCP/DoT/DoH) query over to the appropriate
   client-thread pool for this backend.
   Returns true if a worker thread accepted the query, false otherwise
   (including when the relevant thread pool does not exist yet). */
bool DownstreamState::passCrossProtocolQuery(std::unique_ptr<CrossProtocolQuery>&& cpq)
{
#if defined(HAVE_DNS_OVER_HTTPS) && defined(HAVE_NGHTTP2)
  /* a non-empty DoH path means this backend is reached over DoH, so the
     query goes to the outgoing DoH client threads instead of the TCP ones */
  if (!d_config.d_dohPath.empty()) {
    return g_dohClientThreads && g_dohClientThreads->passCrossProtocolQueryToThread(std::move(cpq));
  }
#endif
  return g_tcpclientthreads && g_tcpclientthreads->passCrossProtocolQueryToThread(std::move(cpq));
}
52

53
#ifdef HAVE_XSK
54
/* Register the local address of the given backend socket as an XSK (AF_XDP)
   destination: record it in d_socketSourceAddresses, advertise it to the XSK
   layer, and add a worker route on every XSK socket owned by this backend.
   Silently returns if the local address cannot be determined. */
void DownstreamState::addXSKDestination(int fd)
{
  auto socklen = d_config.remote.getSocklen();
  ComboAddress local;
  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast): sorry, it's the API
  if (getsockname(fd, reinterpret_cast<sockaddr*>(&local), &socklen) != 0) {
    return;
  }

  {
    /* scoped so the write lock is released before touching the XSK layer */
    auto addresses = d_socketSourceAddresses.write_lock();
    addresses->push_back(local);
  }
  dnsdist::xsk::addDestinationAddress(local);
  /* d_xskSockets and d_xskInfos are parallel arrays: entry idx of one
     matches entry idx of the other */
  for (size_t idx = 0; idx < d_xskSockets.size(); idx++) {
    d_xskSockets.at(idx)->addWorkerRoute(d_xskInfos.at(idx), local);
  }
}
72

73
/* Inverse of addXSKDestination(): deregister the local address of the given
   backend socket from the XSK layer and drop the matching worker route on
   every XSK socket. Note that, unlike addXSKDestination(), this does NOT
   remove the address from d_socketSourceAddresses; callers clear that
   container themselves (see reconnect()). */
void DownstreamState::removeXSKDestination(int fd)
{
  auto socklen = d_config.remote.getSocklen();
  ComboAddress local;
  // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast): sorry, it's the API
  if (getsockname(fd, reinterpret_cast<sockaddr*>(&local), &socklen) != 0) {
    return;
  }

  dnsdist::xsk::removeDestinationAddress(local);
  for (auto& xskSocket : d_xskSockets) {
    xskSocket->removeWorkerRoute(local);
  }
}
87
#endif /* HAVE_XSK */
88

89
/* (Re-)establish all UDP sockets to this backend.
   @param initialAttempt true when called from the initial connection at
          startup, which affects both logging and whether the responder
          thread is (re)started on success.
   @return true if every socket connected (or the backend address is the
           'any' address, a special no-op case), false otherwise.
   Only one reconnection may run at a time: the try_lock on connectLock makes
   concurrent callers bail out immediately instead of queueing up. */
bool DownstreamState::reconnect(bool initialAttempt)
{
  std::unique_lock<std::mutex> lock(connectLock, std::try_to_lock);
  if (!lock.owns_lock() || isStopped()) {
    /* we are already reconnecting or stopped anyway */
    return false;
  }

  if (IsAnyAddress(d_config.remote)) {
    return true;
  }

  connected = false;
#ifdef HAVE_XSK
  /* the source addresses are tied to the old sockets, so they have to be
     cleared before we start re-creating sockets */
  if (!d_xskInfos.empty()) {
    auto addresses = d_socketSourceAddresses.write_lock();
    addresses->clear();
  }
#endif /* HAVE_XSK */

  for (auto& fd : sockets) {
    /* tear down the previous socket, if any, before creating its replacement */
    if (fd != -1) {
      if (sockets.size() > 1) {
        (*mplexer.lock())->removeReadFD(fd);
      }
#ifdef HAVE_XSK
      if (!d_xskInfos.empty()) {
        removeXSKDestination(fd);
      }
#endif /* HAVE_XSK */
      /* shutdown() is needed to wake up recv() in the responderThread */
      shutdown(fd, SHUT_RDWR);
      close(fd);
      fd = -1;
    }
    fd = SSocket(d_config.remote.sin4.sin_family, SOCK_DGRAM, 0);

#ifdef SO_BINDTODEVICE
    if (!d_config.sourceItfName.empty()) {
      int res = setsockopt(fd, SOL_SOCKET, SO_BINDTODEVICE, d_config.sourceItfName.c_str(), d_config.sourceItfName.length());
      if (res != 0) {
        /* non-fatal: we keep going with the default interface */
        infolog("Error setting up the interface on backend socket '%s': %s", d_config.remote.toStringWithPort(), stringerror());
      }
    }
#endif

    if (!IsAnyAddress(d_config.sourceAddr)) {
#ifdef IP_BIND_ADDRESS_NO_PORT
      /* defer local port allocation to connect() time, avoiding
         ephemeral-port exhaustion when many sockets share a source address */
      if (d_config.ipBindAddrNoPort) {
        SSetsockopt(fd, SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1);
      }
#endif
      SBind(fd, d_config.sourceAddr);
    }

    try {
      setDscp(fd, d_config.remote.sin4.sin_family, d_config.dscp);
      SConnect(fd, d_config.tcpFastOpen, d_config.remote);
      if (sockets.size() > 1) {
        (*mplexer.lock())->addReadFD(fd, [](int, boost::any) {});
      }
#ifdef HAVE_XSK
      if (!d_xskInfos.empty()) {
        addXSKDestination(fd);
      }
#endif /* HAVE_XSK */
      connected = true;
    }
    catch (const std::runtime_error& error) {
      if (initialAttempt || dnsdist::configuration::getCurrentRuntimeConfiguration().d_verbose) {
        if (!IsAnyAddress(d_config.sourceAddr) || !d_config.sourceItfName.empty()) {
          infolog("Error connecting to new server with address %s (source address: %s, source interface: %s): %s", d_config.remote.toStringWithPort(), IsAnyAddress(d_config.sourceAddr) ? "not set" : d_config.sourceAddr.toString(), d_config.sourceItfName.empty() ? "not set" : d_config.sourceItfName, error.what());
        }
        else {
          infolog("Error connecting to new server with address %s: %s", d_config.remote.toStringWithPort(), error.what());
        }
      }
      connected = false;
      break;
    }
  }

  /* if at least one (re-)connection failed, close all sockets */
  if (!connected) {
#ifdef HAVE_XSK
    if (!d_xskInfos.empty()) {
      auto addresses = d_socketSourceAddresses.write_lock();
      addresses->clear();
    }
#endif /* HAVE_XSK */
    for (auto& fd : sockets) {
      if (fd != -1) {
#ifdef HAVE_XSK
        if (!d_xskInfos.empty()) {
          removeXSKDestination(fd);
        }
#endif /* HAVE_XSK */
        if (sockets.size() > 1) {
          try {
            (*mplexer.lock())->removeReadFD(fd);
          }
          catch (const FDMultiplexerException& e) {
            /* some sockets might not have been added to the multiplexer
               yet, that's fine */
          }
        }
        /* shutdown() is needed to wake up recv() in the responderThread */
        shutdown(fd, SHUT_RDWR);
        close(fd);
        fd = -1;
      }
    }
  }

  if (connected) {
    /* release the lock before notifying waiters in waitUntilConnected() */
    lock.unlock();
    d_connectedWait.notify_all();
    if (!initialAttempt) {
      /* we need to be careful not to start this
         thread too soon, as the creation should only
         happen after the configuration has been parsed */
      start();
    }
  }

  return connected;
}
216

217
void DownstreamState::waitUntilConnected()
218
{
×
219
  if (d_stopped) {
×
220
    return;
×
221
  }
×
222
  if (connected) {
×
223
    return;
×
224
  }
×
225
  {
×
226
    std::unique_lock<std::mutex> lock(connectLock);
×
227
    d_connectedWait.wait(lock, [this]{
×
228
      return connected.load();
×
229
    });
×
230
  }
×
231
}
×
232

233
/* Permanently stop this backend. Idempotent: only the first call does any
   work. Marks the state as stopped, then shuts down every open socket so
   that any recv() blocked in a responder thread wakes up and notices the
   stopped flag. */
void DownstreamState::stop()
{
  if (d_stopped) {
    return;
  }
  d_stopped = true;

  {
    /* hold both the connection lock and the multiplexer lock so we cannot
       race with a concurrent reconnect() while tearing the sockets down */
    auto tlock = std::scoped_lock(connectLock);
    auto slock = mplexer.lock();

    for (auto& fd : sockets) {
      if (fd != -1) {
        /* shutdown() is needed to wake up recv() in the responderThread */
        shutdown(fd, SHUT_RDWR);
      }
    }
  }
}
252

253
/* (Re)compute the consistent-hashing entries for this server: one hash per
   unit of weight, each derived from the server UUID plus the weight index,
   perturbed by the globally configured hash perturbation. The resulting
   vector is kept sorted. Sets hashesComputed on completion. */
void DownstreamState::hash()
{
  const auto hashPerturbation = dnsdist::configuration::getImmutableConfiguration().d_hashPerturbation;
  vinfolog("Computing hashes for id=%s and weight=%d, hash_perturbation=%d", *d_config.id, d_config.d_weight, hashPerturbation);
  auto weight = d_config.d_weight;
  auto idStr = boost::str(boost::format("%s") % *d_config.id);
  auto lockedHashes = hashes.write_lock();
  lockedHashes->clear();
  lockedHashes->reserve(weight);
  while (weight > 0) {
    /* e.g. "<uuid>-42": unique per (server, weight-slot) pair */
    std::string uuid = boost::str(boost::format("%s-%d") % idStr % weight);
    // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast): sorry, it's the burtle API
    unsigned int wshash = burtleCI(reinterpret_cast<const unsigned char*>(uuid.c_str()), uuid.size(), hashPerturbation);
    lockedHashes->push_back(wshash);
    --weight;
  }
  /* sorted order lets lookups use binary search over the ring */
  std::sort(lockedHashes->begin(), lockedHashes->end());
  hashesComputed = true;
}
272

273
/* Replace this server's UUID. If the consistent hashes were already
   computed they depend on the UUID, so they are recomputed; otherwise
   computation stays deferred until first needed. */
void DownstreamState::setId(const boost::uuids::uuid& newId)
{
  d_config.id = newId;
  // compute hashes only if already done
  if (hashesComputed) {
    hash();
  }
}
281

282
void DownstreamState::setWeight(int newWeight)
283
{
1,055✔
284
  if (newWeight < 1) {
1,055!
285
    errlog("Error setting server's weight: downstream weight value must be greater than 0.");
×
286
    return ;
×
287
  }
×
288

289
  d_config.d_weight = newWeight;
1,055✔
290

291
  if (hashesComputed) {
1,055!
292
    hash();
×
293
  }
×
294
}
1,055✔
295

296
/* Construct a backend from its (moved-in) configuration.
   @param config  backend configuration, consumed by this constructor
   @param tlsCtx  TLS context for DoT/DoH backends, may be null
   @param connect when true, UDP sockets are connected immediately
                  (skipped for TCP-only backends and the 'any' address)
   Throws std::runtime_error if the backend is DoH but the outgoing DoH
   worker thread count was explicitly configured to 0. */
DownstreamState::DownstreamState(DownstreamState::Config&& config, std::shared_ptr<TLSCtx> tlsCtx, bool connect): d_config(std::move(config)), d_tlsCtx(std::move(tlsCtx))
{
  threadStarted.clear();

  if (d_config.d_qpsLimit > 0) {
    d_qpsLimiter = QPSLimiter(d_config.d_qpsLimit, d_config.d_qpsLimit);
  }

  /* keep the provided UUID (recomputing hashes if needed) or mint a new one */
  if (d_config.id) {
    setId(*d_config.id);
  }
  else {
    d_config.id = getUniqueID();
  }

  if (d_config.d_weight > 0) {
    setWeight(d_config.d_weight);
  }

  /* lazy health-checking keeps a bounded window of recent query outcomes;
     lazily-checked backends start out marked as up */
  if (d_config.d_availability == Availability::Auto && d_config.d_healthCheckMode == HealthCheckMode::Lazy && d_config.d_lazyHealthCheckSampleSize > 0) {
    d_lazyHealthCheckStats.lock()->d_lastResults.set_capacity(d_config.d_lazyHealthCheckSampleSize);
    setUpStatus(true);
  }

  setName(d_config.name);

  if (d_tlsCtx && !d_config.d_dohPath.empty()) {
#ifdef HAVE_NGHTTP2
    /* this is a DoH backend: make sure at least one outgoing DoH worker
       will exist, or fail loudly if the user forced the count to 0 */
    auto outgoingDoHWorkerThreads = dnsdist::configuration::getImmutableConfiguration().d_outgoingDoHWorkers;
    if (dnsdist::configuration::isImmutableConfigurationDone() && outgoingDoHWorkerThreads && *outgoingDoHWorkerThreads == 0) {
      throw std::runtime_error("Error: setOutgoingDoHWorkerThreads() is set to 0 so no outgoing DoH worker thread is available to serve queries");
    }

    if (!dnsdist::configuration::isImmutableConfigurationDone() && (!outgoingDoHWorkerThreads || *outgoingDoHWorkerThreads == 0)) {
      dnsdist::configuration::updateImmutableConfiguration([](dnsdist::configuration::ImmutableConfiguration& immutableConfig) {
        immutableConfig.d_outgoingDoHWorkers = 1;
      });
    }
#endif /* HAVE_NGHTTP2 */
  }

  if (connect && !isTCPOnly()) {
    if (!IsAnyAddress(d_config.remote)) {
      connectUDPSockets();
    }
  }

  sw.start();
}
345

346

347
/* Start the responder thread(s) for this backend, at most once (guarded by
   the threadStarted flag) and only when the backend is connected. XSK
   responder threads, when XSK is in use, are started in addition to the
   regular UDP responder thread. All threads are detached and optionally
   pinned to the configured CPU set. */
void DownstreamState::start()
{
  if (connected && !threadStarted.test_and_set()) {
#ifdef HAVE_XSK
    for (auto& xskInfo : d_xskInfos) {
      auto xskResponderThread = std::thread(dnsdist::xsk::XskResponderThread, shared_from_this(), xskInfo);
      if (!d_config.d_cpus.empty()) {
        mapThreadToCPUList(xskResponderThread.native_handle(), d_config.d_cpus);
      }
      xskResponderThread.detach();
    }
#endif /* HAVE_XSK */

    /* shared_from_this() keeps the backend alive as long as the detached
       responder thread is running */
    auto tid = std::thread(responderThread, shared_from_this());
    if (!d_config.d_cpus.empty()) {
      mapThreadToCPUList(tid.native_handle(), d_config.d_cpus);
    }
    tid.detach();
  }
}
367

368
/* Size the per-query state containers and the UDP socket array according to
   the configuration, set up a multiplexer when more than one socket is used,
   then perform the initial connection attempt. */
void DownstreamState::connectUDPSockets()
{
  const auto& config = dnsdist::configuration::getImmutableConfiguration();
  if (config.d_randomizeIDsToBackend) {
    /* randomized IDs use the map (d_idStatesMap) instead of the vector */
    idStates.clear();
  }
  else {
    idStates.resize(config.d_maxUDPOutstanding);
  }
  sockets.resize(d_config.d_numberOfSockets);

  if (sockets.size() > 1) {
    *(mplexer.lock()) = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent(sockets.size()));
  }

  for (auto& fd : sockets) {
    fd = -1;
  }

  reconnect(true);
}
389

390
/* Close any backend UDP sockets that are still open. */
DownstreamState::~DownstreamState()
{
  for (auto& sock : sockets) {
    if (sock < 0) {
      continue;
    }
    close(sock);
    sock = -1;
  }
}
399

400
void DownstreamState::incCurrentConnectionsCount()
401
{
2,583✔
402
  auto currentConnectionsCount = ++tcpCurrentConnections;
2,583✔
403
  if (currentConnectionsCount > tcpMaxConcurrentConnections) {
2,583✔
404
    tcpMaxConcurrentConnections.store(currentConnectionsCount);
865✔
405
  }
865✔
406
}
2,583✔
407

408
int DownstreamState::pickSocketForSending()
409
{
2,671✔
410
  size_t numberOfSockets = sockets.size();
2,671✔
411
  if (numberOfSockets == 1) {
2,671!
412
    return sockets[0];
2,671✔
413
  }
2,671✔
414

415
  size_t idx{0};
×
416
  if (dnsdist::configuration::getImmutableConfiguration().d_randomizeUDPSocketsToBackend) {
×
417
    idx = dnsdist::getRandomValue(numberOfSockets);
×
418
  }
×
419
  else {
×
420
    idx = socketsOffset++;
×
421
  }
×
422

423
  return sockets[idx % numberOfSockets];
×
424
}
2,671✔
425

426
void DownstreamState::pickSocketsReadyForReceiving(std::vector<int>& ready)
427
{
3,080✔
428
  ready.clear();
3,080✔
429

430
  if (sockets.size() == 1) {
3,080!
431
    ready.push_back(sockets[0]);
3,080✔
432
    return ;
3,080✔
433
  }
3,080✔
434

435
  (*mplexer.lock())->getAvailableFDs(ready, 1000);
×
436
}
×
437

438
static bool isIDSExpired(const IDState& ids, uint8_t udpTimeout)
439
{
42✔
440
  auto age = ids.age.load();
42✔
441
  return age > udpTimeout;
42✔
442
}
42✔
443

444
/* Handle one timed-out in-flight UDP query: release its slot, update the
   timeout counters, run the timeout response rule chain (falling back to
   the DoH timeout path when no rule handled it), record a synthetic
   response in the rings when response recording is on, and feed the lazy
   health-check statistics. Callers are expected to have exclusive access
   to 'ids' (see handleUDPTimeouts()). */
void DownstreamState::handleUDPTimeout(IDState& ids)
{
  ids.age = 0;
  ids.inUse = false;
  ++reuseds;
  --outstanding;
  ++dnsdist::metrics::g_stats.downstreamTimeouts; // this is an 'actively' discovered timeout
  vinfolog("Had a downstream timeout from %s (%s) for query for %s|%s from %s",
           d_config.remote.toStringWithPort(), getName(),
           ids.internal.qname.toLogString(), QType(ids.internal.qtype).toString(), ids.internal.origRemote.toStringWithPort());

  const auto& chains = dnsdist::configuration::getCurrentRuntimeConfiguration().d_ruleChains;
  const auto& timeoutRespRules = dnsdist::rules::getResponseRuleChain(chains, dnsdist::rules::ResponseRuleChain::TimeoutResponseRules);
  auto sender = ids.internal.du == nullptr ? nullptr : ids.internal.du->getQuerySender();
  if (!handleTimeoutResponseRules(timeoutRespRules, ids.internal, shared_from_this(), sender)) {
    /* no rule produced an answer: let the DoH layer finish the transaction */
    DOHUnitInterface::handleTimeout(std::move(ids.internal.du));
  }

  if (g_rings.shouldRecordResponses()) {
    timespec now{};
    gettime(&now);

    /* build a fake DNS header carrying the original ID and flags so the
       timeout shows up in the rings like a (failed) response */
    dnsheader fake{};
    memset(&fake, 0, sizeof(fake));
    fake.id = ids.internal.origID;
    uint16_t* flags = getFlagsFromDNSHeader(&fake);
    *flags = ids.internal.origFlags;

    /* max unsigned int as the latency marks this entry as a timeout */
    g_rings.insertResponse(now, ids.internal.origRemote, ids.internal.qname, ids.internal.qtype, std::numeric_limits<unsigned int>::max(), 0, fake, d_config.remote, getProtocol());
  }

  reportTimeoutOrError();
}
477

478
void DownstreamState::reportResponse(uint8_t rcode)
479
{
36,345✔
480
  if (d_config.d_availability == Availability::Auto && d_config.d_healthCheckMode == HealthCheckMode::Lazy && d_config.d_lazyHealthCheckSampleSize > 0) {
36,345!
481
    bool failure = d_config.d_lazyHealthCheckMode == LazyHealthCheckMode::TimeoutOrServFail ? rcode == RCode::ServFail : false;
654!
482
    d_lazyHealthCheckStats.lock()->d_lastResults.push_back(failure);
654✔
483
  }
654✔
484
}
36,345✔
485

486
void DownstreamState::reportTimeoutOrError()
487
{
213✔
488
  if (d_config.d_availability == Availability::Auto && d_config.d_healthCheckMode == HealthCheckMode::Lazy && d_config.d_lazyHealthCheckSampleSize > 0) {
213!
489
    d_lazyHealthCheckStats.lock()->d_lastResults.push_back(true);
108✔
490
  }
108✔
491
}
213✔
492

493
/* Periodic sweep over the in-flight UDP query states, expiring the ones
   whose age has passed the UDP timeout (per-backend timeout when set,
   global one otherwise). Non-UDP backends are skipped. The storage differs
   with the ID-randomization setting: a locked map when randomizing, a
   fixed-size vector of lockable slots otherwise. */
void DownstreamState::handleUDPTimeouts()
{
  if (getProtocol() != dnsdist::Protocol::DoUDP) {
    return;
  }

  const auto& config = dnsdist::configuration::getImmutableConfiguration();
  const auto udpTimeout = d_config.udpTimeout > 0 ? d_config.udpTimeout : config.d_udpTimeout;
  if (config.d_randomizeIDsToBackend) {
    auto map = d_idStatesMap.lock();
    /* manual iteration because expired entries are erased while walking */
    for (auto it = map->begin(); it != map->end(); ) {
      auto& ids = it->second;
      if (isIDSExpired(ids, udpTimeout)) {
        handleUDPTimeout(ids);
        it = map->erase(it);
        continue;
      }
      ++ids.age;
      ++it;
    }
  }
  else {
    if (outstanding.load() > 0) {
      for (IDState& ids : idStates) {
        if (!ids.isInUse()) {
          continue;
        }
        if (!isIDSExpired(ids, udpTimeout)) {
          ++ids.age;
          continue;
        }
        /* try to take exclusive ownership of the slot; a failure means a
           responder thread is touching it right now, so skip it this pass */
        auto guard = ids.acquire();
        if (!guard) {
          continue;
        }
        /* check again, now that we have locked this state */
        if (ids.isInUse() && isIDSExpired(ids, udpTimeout)) {
          handleUDPTimeout(ids);
        }
      }
    }
  }
}
536

537
/* Store the state of an in-flight query and return the backend-side query
   ID under which it was filed. With randomized IDs, a random 16-bit ID is
   drawn (retrying a few times on collision); otherwise IDs are assigned
   round-robin over the idStates vector. In both schemes, colliding with a
   still-in-use slot evicts the previous query, which is accounted as a
   reuse/timeout (and its DoH unit, if any, is completed as a timeout). */
uint16_t DownstreamState::saveState(InternalQueryState&& state)
{
  const auto& config = dnsdist::configuration::getImmutableConfiguration();
  if (config.d_randomizeIDsToBackend) {
    /* if the state is already in use we will retry,
       up to 5 five times. The last selected one is used
       even if it was already in use */
    size_t remainingAttempts = 5;
    auto map = d_idStatesMap.lock();

    do {
      uint16_t selectedID = dnsdist::getRandomValue(std::numeric_limits<uint16_t>::max());
      auto [it, inserted] = map->emplace(selectedID, IDState());

      if (!inserted) {
        remainingAttempts--;
        if (remainingAttempts > 0) {
          continue;
        }

        /* out of attempts: evict the existing entry, finishing its DoH
           transaction (if any) as a timeout */
        auto oldDU = std::move(it->second.internal.du);
        ++reuseds;
        ++dnsdist::metrics::g_stats.downstreamTimeouts;
        DOHUnitInterface::handleTimeout(std::move(oldDU));
      }
      else {
        ++outstanding;
      }

      it->second.internal = std::move(state);
      it->second.age.store(0);

      return it->first;
    }
    while (true);
  }

  /* sequential scheme: walk the slots round-robin until one can be locked */
  do {
    uint16_t selectedID = (idOffset++) % idStates.size();
    IDState& ids = idStates[selectedID];
    auto guard = ids.acquire();
    if (!guard) {
      continue;
    }
    if (ids.isInUse()) {
      /* we are reusing a state, no change in outstanding but if there was an existing DOHUnit we need
         to handle it because it's about to be overwritten. */
      auto oldDU = std::move(ids.internal.du);
      ++reuseds;
      ++dnsdist::metrics::g_stats.downstreamTimeouts;
      DOHUnitInterface::handleTimeout(std::move(oldDU));
    }
    else {
      ++outstanding;
    }
    ids.internal = std::move(state);
    ids.age.store(0);
    ids.inUse = true;
    return selectedID;
  }
  while (true);
}
599

600
/* Put a query state back under a specific ID (used when a query could not
   actually be sent after saveState()). If the ID is meanwhile occupied by
   another query, the restored state is dropped and accounted as a
   reuse/timeout, completing its DoH transaction (if any) as a timeout. */
void DownstreamState::restoreState(uint16_t id, InternalQueryState&& state)
{
  const auto& config = dnsdist::configuration::getImmutableConfiguration();
  if (config.d_randomizeIDsToBackend) {
    auto map = d_idStatesMap.lock();

    auto [it, inserted] = map->emplace(id, IDState());
    if (!inserted) {
      /* already used */
      ++reuseds;
      ++dnsdist::metrics::g_stats.downstreamTimeouts;
      DOHUnitInterface::handleTimeout(std::move(state.du));
    }
    else {
      it->second.internal = std::move(state);
      ++outstanding;
    }
    return;
  }

  auto& ids = idStates[id];
  auto guard = ids.acquire();
  if (!guard) {
    /* already used */
    ++reuseds;
    ++dnsdist::metrics::g_stats.downstreamTimeouts;
    DOHUnitInterface::handleTimeout(std::move(state.du));
    return;
  }
  if (ids.isInUse()) {
    /* already used */
    ++reuseds;
    ++dnsdist::metrics::g_stats.downstreamTimeouts;
    DOHUnitInterface::handleTimeout(std::move(state.du));
    return;
  }
  ids.internal = std::move(state);
  ids.inUse = true;
  ++outstanding;
}
640

641
std::optional<InternalQueryState> DownstreamState::getState(uint16_t id)
642
{
2,664✔
643
  std::optional<InternalQueryState> result = std::nullopt;
2,664✔
644
  const auto& config = dnsdist::configuration::getImmutableConfiguration();
2,664✔
645
  if (config.d_randomizeIDsToBackend) {
2,664✔
646
    auto map = d_idStatesMap.lock();
20✔
647

648
    auto it = map->find(id);
20✔
649
    if (it == map->end()) {
20!
650
      return result;
×
651
    }
×
652

653
    result = std::move(it->second.internal);
20✔
654
    map->erase(it);
20✔
655
    --outstanding;
20✔
656
    return result;
20✔
657
  }
20✔
658

659
  if (id > idStates.size()) {
2,644!
660
    return result;
×
661
  }
×
662

663
  auto& ids = idStates[id];
2,644✔
664
  auto guard = ids.acquire();
2,644✔
665
  if (!guard) {
2,644!
666
    return result;
×
667
  }
×
668

669
  if (ids.isInUse()) {
2,644!
670
    result = std::move(ids.internal);
2,644✔
671
    --outstanding;
2,644✔
672
  }
2,644✔
673
  ids.inUse = false;
2,644✔
674
  return result;
2,644✔
675
}
2,644✔
676

677
/* Decide whether a health-check query should be sent to this backend now.
   Only 'Auto' availability backends are ever checked. Active mode counts
   down d_nextCheck and fires once per checkInterval. Lazy mode is a small
   state machine (Healthy -> PotentialFailure -> Failed) driven by the
   sliding window of recent query outcomes.
   @param currentTime optional wall-clock override, mainly so callers/tests
          can avoid repeated time() calls
   @return true when a health-check should be sent */
bool DownstreamState::healthCheckRequired(std::optional<time_t> currentTime)
{
  if (d_config.d_availability != DownstreamState::Availability::Auto) {
    return false;
  }

  if (d_config.d_healthCheckMode == DownstreamState::HealthCheckMode::Lazy) {
    auto stats = d_lazyHealthCheckStats.lock();
    if (stats->d_status == LazyHealthCheckStats::LazyStatus::PotentialFailure) {
      vinfolog("Sending health-check query for %s which is still in the Potential Failure state", getNameWithAddr());
      return true;
    }
    if (stats->d_status == LazyHealthCheckStats::LazyStatus::Failed) {
      auto now = currentTime ? *currentTime : time(nullptr);
      if (stats->d_nextCheck <= now) {
        /* we update the next check time here because the check might time out,
           and we do not want to send a second check during that time unless
           the timer is actually very short */
        vinfolog("Sending health-check query for %s which is still in the Failed state", getNameWithAddr());
        updateNextLazyHealthCheck(*stats, true, now);
        return true;
      }
      return false;
    }
    if (stats->d_status == LazyHealthCheckStats::LazyStatus::Healthy) {
      auto& lastResults = stats->d_lastResults;
      size_t totalCount = lastResults.size();
      /* not enough samples yet to make a meaningful decision */
      if (totalCount < d_config.d_lazyHealthCheckMinSampleCount) {
        return false;
      }

      size_t failures = 0;
      for (const auto& result : lastResults) {
        if (result) {
          ++failures;
        }
      }

      /* compare the observed failure percentage against the threshold */
      const auto maxFailureRate = static_cast<float>(d_config.d_lazyHealthCheckThreshold);
      auto current = (100.0 * failures) / totalCount;
      if (current >= maxFailureRate) {
        lastResults.clear();
        vinfolog("Backend %s reached the lazy health-check threshold (%f%% out of %f%%, looking at sample of %d items with %d failures), moving to Potential Failure state", getNameWithAddr(), current, maxFailureRate, totalCount, failures);
        stats->d_status = LazyHealthCheckStats::LazyStatus::PotentialFailure;
        consecutiveSuccessfulChecks = 0;
        /* we update the next check time here because the check might time out,
           and we do not want to send a second check during that time unless
           the timer is actually very short */
        updateNextLazyHealthCheck(*stats, true);
        return true;
      }
    }

    return false;
  }
  if (d_config.d_healthCheckMode == DownstreamState::HealthCheckMode::Active) {

    if (d_nextCheck > 1) {
      --d_nextCheck;
      return false;
    }

    d_nextCheck = d_config.checkInterval;
    return true;
  }

  return false;
}
745

746
time_t DownstreamState::getNextLazyHealthCheck()
747
{
24✔
748
  auto stats = d_lazyHealthCheckStats.lock();
24✔
749
  return stats->d_nextCheck;
24✔
750
}
24✔
751

752
/* Schedule the next lazy health-check for this backend.
   @param stats the lazy health-check stats entry to update; the caller is
          expected to hold the d_lazyHealthCheckStats lock it came from
   @param checkScheduled true when a check is about to be sent, so the next
          interval should assume one more failure (and skip some logging)
   @param currentTime optional wall-clock override
   Without exponential backoff, the next check is simply one failed-interval
   away. With it, the delay grows with the number of consecutive failed
   checks, capped at d_lazyHealthCheckMaxBackOff, with overflow guards on
   the multiplication and the addition to 'now'. */
void DownstreamState::updateNextLazyHealthCheck(LazyHealthCheckStats& stats, bool checkScheduled, std::optional<time_t> currentTime)
{
  auto now = currentTime ? *currentTime : time(nullptr);
  if (d_config.d_lazyHealthCheckUseExponentialBackOff) {
    if (stats.d_status == DownstreamState::LazyHealthCheckStats::LazyStatus::PotentialFailure) {
      /* we are still in the "up" state, we need to send the next query quickly to
         determine if the backend is really down */
      stats.d_nextCheck = now + d_config.checkInterval;
      vinfolog("Backend %s is in potential failure state, next check in %d seconds", getNameWithAddr(), d_config.checkInterval.load());
    }
    else if (consecutiveSuccessfulChecks > 0) {
      /* we are in 'Failed' state, but just had one (or more) successful check,
         so we want the next one to happen quite quickly as the backend might
         be available again. */
      stats.d_nextCheck = now + d_config.d_lazyHealthCheckFailedInterval;
      if (!checkScheduled) {
        vinfolog("Backend %s is in failed state but had %d consecutive successful checks, next check in %d seconds", getNameWithAddr(), std::to_string(consecutiveSuccessfulChecks), d_config.d_lazyHealthCheckFailedInterval);
      }
    }
    else {
      uint16_t failedTests = currentCheckFailures;
      if (checkScheduled) {
        /* we are planning the check after that one, which will only
           occur if there is a failure */
        failedTests++;
      }

      time_t backOff = d_config.d_lazyHealthCheckMaxBackOff;
      const ExponentialBackOffTimer backOffTimer(d_config.d_lazyHealthCheckMaxBackOff);
      auto backOffCoeffTmp = backOffTimer.get(failedTests - 1);
      /* backOffCoeffTmp cannot be higher than d_config.d_lazyHealthCheckMaxBackOff */
      const auto backOffCoeff = static_cast<time_t>(backOffCoeffTmp);
      /* guard against time_t overflow in the multiplication below */
      if ((std::numeric_limits<time_t>::max() / d_config.d_lazyHealthCheckFailedInterval) >= backOffCoeff) {
        backOff = d_config.d_lazyHealthCheckFailedInterval * backOffCoeff;
        if (backOff > d_config.d_lazyHealthCheckMaxBackOff || (std::numeric_limits<time_t>::max() - now) <= backOff) {
          backOff = d_config.d_lazyHealthCheckMaxBackOff;
        }
      }

      stats.d_nextCheck = now + backOff;
      vinfolog("Backend %s is in failed state and has failed %d consecutive checks, next check in %d seconds", getNameWithAddr(), failedTests, backOff);
    }
  }
  else {
    stats.d_nextCheck = now + d_config.d_lazyHealthCheckFailedInterval;
    vinfolog("Backend %s is in %s state, next check in %d seconds", getNameWithAddr(), (stats.d_status == DownstreamState::LazyHealthCheckStats::LazyStatus::PotentialFailure ? "potential failure" : "failed"), d_config.d_lazyHealthCheckFailedInterval);
  }
}
800

801
/* Process the outcome of one health-check for this backend.

   @param initial   true when this is the startup check: thresholds
                    (minRiseSuccesses / maxCheckFailures) are bypassed and
                    the state is applied immediately.
   @param newResult true when the check succeeded, false on failure.

   Updates the failure metric, the consecutive success/failure counters,
   the lazy health-check bookkeeping (when in Lazy mode) and, when enough
   checks have accumulated, flips the backend's up/down status, possibly
   reconnecting the UDP socket and emitting an SNMP trap. */
void DownstreamState::submitHealthCheckResult(bool initial, bool newResult)
{
  if (!newResult) {
    ++d_healthCheckMetrics.d_failures;
  }

  if (initial) {
    /* if this is the initial health-check, at startup, we do not care
       about the minimum number of failed/successful health-checks */
    if (!IsAnyAddress(d_config.remote)) {
      infolog("Marking downstream %s as '%s'", getNameWithAddr(), newResult ? "up" : "down");
    }
    setUpStatus(newResult);
    if (newResult == false) {
      currentCheckFailures++;
      if (d_config.d_healthCheckMode == DownstreamState::HealthCheckMode::Lazy) {
        /* in lazy mode a failed initial check moves us straight to Failed
           and schedules the next probe */
        auto stats = d_lazyHealthCheckStats.lock();
        stats->d_status = LazyHealthCheckStats::LazyStatus::Failed;
        updateNextLazyHealthCheck(*stats, false);
      }
    }
    handleServerStateChange(getNameWithAddr(), newResult);
    return;
  }

  /* newState is the status we will end up in; it starts as the raw check
     result and may be overridden by the rise/fall thresholds below */
  bool newState = newResult;

  if (newResult) {
    /* check succeeded */
    currentCheckFailures = 0;
    consecutiveSuccessfulChecks++;

    if (!upStatus.load(std::memory_order_relaxed)) {
      /* we were previously marked as "down" and had a successful health-check,
         let's see if this is enough to move to the "up" state or if we need
         more successful health-checks for that */
      if (consecutiveSuccessfulChecks < d_config.minRiseSuccesses) {
        /* we need more than one successful check to rise
           and we didn't reach the threshold yet, let's stay down */
        newState = false;

        if (d_config.d_healthCheckMode == DownstreamState::HealthCheckMode::Lazy) {
          /* still down: keep the lazy scheduler probing */
          auto stats = d_lazyHealthCheckStats.lock();
          updateNextLazyHealthCheck(*stats, false);
        }
      }
    }

    if (newState) {
      if (d_config.d_healthCheckMode == DownstreamState::HealthCheckMode::Lazy) {
        /* healthy again: reset the lazy stats window */
        auto stats = d_lazyHealthCheckStats.lock();
        vinfolog("Backend %s had %d successful checks, moving to Healthy", getNameWithAddr(), std::to_string(consecutiveSuccessfulChecks));
        stats->d_status = LazyHealthCheckStats::LazyStatus::Healthy;
        stats->d_lastResults.clear();
      }
    }
  }
  else {
    /* check failed */
    consecutiveSuccessfulChecks = 0;

    currentCheckFailures++;

    if (upStatus.load(std::memory_order_relaxed)) {
      /* we were previously marked as "up" and failed a health-check,
         let's see if this is enough to move to the "down" state or if
         need more failed checks for that */
      if (currentCheckFailures < d_config.maxCheckFailures) {
        /* we need more than one failure to be marked as down,
           and we did not reach the threshold yet, let's stay up */
        newState = true;
      }
      else if (d_config.d_healthCheckMode == DownstreamState::HealthCheckMode::Lazy) {
        auto stats = d_lazyHealthCheckStats.lock();
        vinfolog("Backend %s failed its health-check, moving from Potential failure to Failed", getNameWithAddr());
        stats->d_status = LazyHealthCheckStats::LazyStatus::Failed;
        /* restart the failure count for the back-off computation */
        currentCheckFailures = 1;
        updateNextLazyHealthCheck(*stats, false);
      }
    }
  }

  if (newState != upStatus.load(std::memory_order_relaxed)) {
    /* we are actually moving to a new state */
    if (!IsAnyAddress(d_config.remote)) {
      infolog("Marking downstream %s as '%s'", getNameWithAddr(), newState ? "up" : "down");
    }

    /* moving to "up" over UDP may require (re)connecting the socket; a
       failed reconnect keeps us down */
    if (newState && !isTCPOnly() && (!connected || d_config.reconnectOnUp)) {
      newState = reconnect();
    }

    setUpStatus(newState);
    if (g_snmpAgent != nullptr && dnsdist::configuration::getImmutableConfiguration().d_snmpTrapsEnabled) {
      g_snmpAgent->sendBackendStatusChangeTrap(*this);
    }
    /* NOTE(review): this passes newResult (the raw check outcome) rather than
       newState (the status actually applied, possibly altered by reconnect()
       above) — looks intentional elsewhere in this branch uses newState;
       confirm which one the state-change hook should receive */
    handleServerStateChange(getNameWithAddr(), newResult);
  }
}
1,592✔
900

901
#ifdef HAVE_XSK
902
/* Pick, at random, one of the source addresses bound to the sockets used
   to reach this backend, for use as the XSK sending address.
   Blocks until the backend is connected, and throws a std::runtime_error
   if no source address is available. */
[[nodiscard]] ComboAddress DownstreamState::pickSourceAddressForSending()
{
  /* the source addresses are only populated once we are connected */
  if (!connected) {
    waitUntilConnected();
  }

  auto sourceAddresses = d_socketSourceAddresses.read_lock();
  const auto count = sourceAddresses->size();
  if (count == 0) {
    throw std::runtime_error("No source address available for sending XSK data to backend " + getNameWithAddr());
  }
  const auto position = dnsdist::getRandomValue(count) % count;
  return (*sourceAddresses)[position];
}
×
916

917
/* Attach a set of XSK (AF_XDP) sockets to this backend.

   @param xsks sockets to use; all are assumed to share the same interface
               (the first one is used to derive interface name and MAC).
   @throws std::runtime_error if no source address can be derived from the
           interface when none was explicitly configured.

   Fills in the source address (when unset or "any") and source MAC from
   the interface, creates one bidirectional XskWorker per socket, and
   finally (re)connects the backend sockets. */
void DownstreamState::registerXsk(std::vector<std::shared_ptr<XskSocket>>& xsks)
{
  d_xskSockets = xsks;

  /* no explicit source address configured: derive one from the interface */
  if (d_config.sourceAddr.sin4.sin_family == 0 || (IsAnyAddress(d_config.sourceAddr))) {
    const auto& ifName = xsks.at(0)->getInterfaceName();
    auto addresses = getListOfAddressesOfNetworkInterface(ifName);
    if (addresses.empty()) {
      throw std::runtime_error("Unable to get source address from interface " + ifName);
    }

    if (addresses.size() > 1) {
      warnlog("More than one address configured on interface %s, picking the first one (%s) for XSK. Set the 'source' parameter on 'newServer' if you want to use a different address.", ifName, addresses.at(0).toString());
    }
    d_config.sourceAddr = addresses.at(0);
  }
  d_config.sourceMACAddr = d_xskSockets.at(0)->getSourceMACAddress();

  /* one worker per socket, sharing the socket's empty-frame pool */
  for (auto& xsk : d_xskSockets) {
    auto xskInfo = XskWorker::create(XskWorker::Type::Bidirectional, xsk->sharedEmptyFrameOffset);
    d_xskInfos.push_back(xskInfo);
    xsk->addWorker(xskInfo);
  }
  reconnect(false);
}
×
942
#endif /* HAVE_XSK */
943

944
/* Parse the 'source' parameter of a backend into an address and/or an
   interface, storing the result into config.

   @param source user-supplied source specification (see forms below)
   @param config backend configuration to fill in (sourceAddr, sourceItf,
                 sourceItfName)
   @return true on success, false if the interface name is invalid.
   @throws whatever ComboAddress throws for a malformed address in the
           "addr@itf" form. */
bool DownstreamState::parseSourceParameter(const std::string& source, DownstreamState::Config& config)
{
  /* handle source in the following forms:
     - v4 address ("192.0.2.1")
     - v6 address ("2001:DB8::1")
     - interface name ("eth0")
     - v4 address and interface name ("192.0.2.1@eth0")
     - v6 address and interface name ("2001:DB8::1@eth0")
  */
  std::string::size_type pos = source.find('@');
  if (pos == std::string::npos) {
    /* no '@', try to parse that as a valid v4/v6 address */
    try {
      config.sourceAddr = ComboAddress(source);
      return true;
    }
    catch (...) {
      /* deliberately swallowed: not an address, fall through and try to
         interpret the whole string as an interface name */
    }
  }

  /* try to parse as interface name, or v4/v6@itf */
  config.sourceItfName = source.substr(pos == std::string::npos ? 0 : pos + 1);
  unsigned int itfIdx = if_nametoindex(config.sourceItfName.c_str());
  /* if_nametoindex() returns 0 when the interface does not exist */
  if (itfIdx != 0) {
    if (pos == 0 || pos == std::string::npos) {
      /* "eth0" or "@eth0" */
      config.sourceItf = itfIdx;
    }
    else {
      /* "192.0.2.1@eth0" */
      config.sourceAddr = ComboAddress(source.substr(0, pos));
      config.sourceItf = itfIdx;
    }
#ifdef SO_BINDTODEVICE
    if (!dnsdist::configuration::isImmutableConfigurationDone()) {
      /* we need to retain CAP_NET_RAW to be able to set SO_BINDTODEVICE in the health checks */
      dnsdist::configuration::updateImmutableConfiguration([](dnsdist::configuration::ImmutableConfiguration& currentConfig) {
        currentConfig.d_capabilitiesToRetain.insert("CAP_NET_RAW");
      });
    }
#endif
    return true;
  }

  warnlog("Dismissing source %s because '%s' is not a valid interface name", source, config.sourceItfName);
  return false;
}
4✔
991

992
/* Translate an availability keyword ("auto", "lazy", "up", "down",
   case-insensitive) into the corresponding availability and health-check
   mode settings in config. Returns false for an unknown keyword. */
bool DownstreamState::parseAvailabilityConfigFromStr(DownstreamState::Config& config, const std::string& str)
{
  /* "auto" and "lazy" both enable automatic health-checking and differ
     only in the mode used */
  if (pdns_iequals(str, "auto") || pdns_iequals(str, "lazy")) {
    config.d_availability = DownstreamState::Availability::Auto;
    config.d_healthCheckMode = pdns_iequals(str, "lazy") ? DownstreamState::HealthCheckMode::Lazy : DownstreamState::HealthCheckMode::Active;
    return true;
  }
  /* "up" / "down" force the administrative state, no health-checking */
  if (pdns_iequals(str, "up")) {
    config.d_availability = DownstreamState::Availability::Up;
    return true;
  }
  if (pdns_iequals(str, "down")) {
    config.d_availability = DownstreamState::Availability::Down;
    return true;
  }
  return false;
}
×
1014

1015
unsigned int DownstreamState::getQPSLimit() const
1016
{
293✔
1017
  return d_qpsLimiter ? d_qpsLimiter->getRate() : 0U;
293!
1018
}
293✔
1019

1020
size_t ServerPool::countServers(bool upOnly) const
1021
{
501✔
1022
  size_t count = 0;
501✔
1023
  for (const auto& server : d_servers) {
532✔
1024
    if (!upOnly || std::get<1>(server)->isUp() ) {
532✔
1025
      count++;
516✔
1026
    }
516✔
1027
  }
532✔
1028

1029
  return count;
501✔
1030
}
501✔
1031

1032
size_t ServerPool::poolLoad() const
1033
{
15✔
1034
  size_t load = 0;
15✔
1035
  for (const auto& server : d_servers) {
21✔
1036
    size_t serverOutstanding = std::get<1>(server)->outstanding.load();
21✔
1037
    load += serverOutstanding;
21✔
1038
  }
21✔
1039
  return load;
15✔
1040
}
15✔
1041

1042
bool ServerPool::hasAtLeastOneServerAvailable() const
1043
{
24✔
1044
  // NOLINTNEXTLINE(readability-use-anyofallof): no it's not more readable
1045
  for (const auto& server : d_servers) {
24✔
1046
    if (std::get<1>(server)->isUp()) {
21✔
1047
      return true;
12✔
1048
    }
12✔
1049
  }
21✔
1050
  return false;
12✔
1051
}
24✔
1052

1053
/* Return a non-owning reference to the pool's numbered server list,
   kept sorted by the servers' 'order' setting (see addServer()). */
const ServerPolicy::NumberedServerVector& ServerPool::getServers() const
{
  return d_servers;
}
11,300✔
1057

1058
void ServerPool::addServer(std::shared_ptr<DownstreamState>& server)
1059
{
993✔
1060
  auto count = static_cast<unsigned int>(d_servers.size());
993✔
1061
  d_servers.emplace_back(++count, server);
993✔
1062
  /* we need to reorder based on the server 'order' */
1063
  std::stable_sort(d_servers.begin(), d_servers.end(), [](const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& lhs, const std::pair<unsigned int,std::shared_ptr<DownstreamState> >& rhs) {
993✔
1064
      return lhs.second->d_config.order < rhs.second->d_config.order;
849✔
1065
    });
849✔
1066
  /* and now we need to renumber for Lua (custom policies) */
1067
  size_t idx = 1;
993✔
1068
  for (auto& serv : d_servers) {
1,381✔
1069
    serv.first = idx++;
1,381✔
1070
  }
1,381✔
1071

1072
  updateConsistency();
993✔
1073
}
993✔
1074

1075
/* Remove the first occurrence of the given server from this pool,
   renumbering the remaining entries so the 1-based numbering (used by
   Lua custom policies) stays contiguous. */
void ServerPool::removeServer(shared_ptr<DownstreamState>& server)
{
  bool removed = false;
  unsigned int number = 1;
  auto it = d_servers.begin();
  while (it != d_servers.end()) {
    if (!removed && it->second == server) {
      it = d_servers.erase(it);
      removed = true;
      continue;
    }
    /* entries before the removed one keep their number; the ones after
       shift down by one */
    it->first = number;
    ++number;
    ++it;
  }

  if (removed && !d_isConsistent) {
    /* removing a server from an inconsistent pool may make it consistent */
    updateConsistency();
  }
}
28✔
1099

1100
/* Recompute whether all servers of this pool agree on their ECS,
   zero-scope and TCP-only settings, caching the common values on the
   pool itself when they do. An empty pool is considered consistent. */
void ServerPool::updateConsistency()
{
  bool consistent{true};
  bool first{true};
  bool useECS{false};
  bool tcpOnly{false};
  bool zeroScope{true};

  for (const auto& serverPair : d_servers) {
    const auto& server = serverPair.second;
    if (first) {
      /* the first server sets the reference values the others are
         compared against */
      first = false;
      useECS = server->d_config.useECS;
      tcpOnly = server->isTCPOnly();
      /* note the inversion: zeroScope is "zero-scope enabled", the
         config flag is "disabled" */
      zeroScope = !server->d_config.disableZeroScope;
      continue;
    }
    if (consistent) {
      if (server->d_config.useECS != useECS) {
        consistent = false;
      }
      /* because of the inversion above, equality here means this
         server's zero-scope setting DIFFERS from the first one's */
      if (server->d_config.disableZeroScope == zeroScope) {
        consistent = false;
      }
    }
    /* checked outside the 'consistent' guard: d_tcpOnly must end up true
       only if every single server is TCP-only */
    if (server->isTCPOnly() != tcpOnly) {
      consistent = false;
      tcpOnly = false;
    }
  }

  d_tcpOnly = tcpOnly;
  if (consistent) {
    /* at this point we know that all servers agree
       on these settings, so let's just use the same
       values for the pool itself */
    d_useECS = useECS;
    d_zeroScope = zeroScope;
  }
  d_isConsistent = consistent;
}
1,046✔
1141

1142
/* Explicitly enable or disable EDNS Client Subnet zero-scope caching for
   this pool, then refresh the consistency flags against the servers. */
void ServerPool::setZeroScope(bool enabled)
{
  d_zeroScope = enabled;
  updateConsistency();
}
16✔
1147

1148
/* Explicitly enable or disable EDNS Client Subnet for this pool, then
   refresh the consistency flags against the servers. */
void ServerPool::setECS(bool useECS)
{
  d_useECS = useECS;
  updateConsistency();
}
27✔
1153

1154
namespace dnsdist::backend
1155
{
1156
void registerNewBackend(std::shared_ptr<DownstreamState>& backend)
1157
{
914✔
1158
  dnsdist::configuration::updateRuntimeConfiguration([&backend](dnsdist::configuration::RuntimeConfiguration& config) {
914✔
1159
    auto& backends = config.d_backends;
914✔
1160
    backends.push_back(backend);
914✔
1161
    std::stable_sort(backends.begin(), backends.end(), [](const std::shared_ptr<DownstreamState>& lhs, const std::shared_ptr<DownstreamState>& rhs) {
1,054✔
1162
      return lhs->d_config.order < rhs->d_config.order;
1,054✔
1163
    });
1,054✔
1164
  });
914✔
1165
}
914✔
1166
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc