• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PowerDNS / pdns / 19741624072

27 Nov 2025 03:45PM UTC coverage: 73.086% (+0.02%) from 73.065%
19741624072

Pull #16570

github

web-flow
Merge 08a2cdb1d into f94a3f63f
Pull Request #16570: rec: rewrite all unwrap calls in web.rs

38523 of 63408 branches covered (60.75%)

Branch coverage included in aggregate %.

128044 of 164496 relevant lines covered (77.84%)

6531485.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

4.17
/pdns/dnsdistdist/dnsdist-xsk.cc
1
/*
2
 * This file is part of PowerDNS or dnsdist.
3
 * Copyright -- PowerDNS.COM B.V. and its contributors
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of version 2 of the GNU General Public License as
7
 * published by the Free Software Foundation.
8
 *
9
 * In addition, for the avoidance of any doubt, permission is granted to
10
 * link this program with OpenSSL and to (re)distribute the binaries
11
 * produced as the result of such linking.
12
 *
13
 * This program is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
 * GNU General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU General Public License
19
 * along with this program; if not, write to the Free Software
20
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
 */
22
#include "dnsdist.hh"
23
#include "dnsdist-xsk.hh"
24

25
#ifdef HAVE_XSK
26
#include <sys/poll.h>
27

28
#include "dolog.hh"
29
#include "dnsdist-metrics.hh"
30
#include "dnsdist-proxy-protocol.hh"
31
#include "threadname.hh"
32
#include "xsk.hh"
33

34
namespace dnsdist::xsk
35
{
36
// Namespace-scope list of XskSocket objects, each held through a shared pointer.
std::vector<std::shared_ptr<XskSocket>> g_xsk;
37

38
void XskResponderThread(std::shared_ptr<DownstreamState> dss, std::shared_ptr<XskWorker> xskInfo)
39
{
×
40
  try {
×
41
    setThreadName("dnsdist/XskResp");
×
42
    auto pollfds = getPollFdsForWorker(*xskInfo);
×
43
    while (!dss->isStopped()) {
×
44
      poll(pollfds.data(), pollfds.size(), -1);
×
45
      dnsdist::configuration::refreshLocalRuntimeConfiguration();
×
46
      bool needNotify = false;
×
47
      if ((pollfds[0].revents & POLLIN) != 0) {
×
48
        needNotify = true;
×
49
        xskInfo->cleanSocketNotification();
×
50
        xskInfo->processIncomingFrames([&](XskPacket packet) {
×
51
          if (packet.getDataLen() < sizeof(dnsheader)) {
×
52
            xskInfo->markAsFree(packet);
×
53
            return;
×
54
          }
×
55
          const dnsheader_aligned dnsHeader(packet.getPayloadData());
×
56
          const auto queryId = dnsHeader->id;
×
57
          auto ids = dss->getState(queryId);
×
58
          if (ids) {
×
59
            if (!ids->isXSK()) {
×
60
              dss->restoreState(queryId, std::move(*ids));
×
61
              ids = std::nullopt;
×
62
            }
×
63
          }
×
64
          if (!ids) {
×
65
            xskInfo->markAsFree(packet);
×
66
            return;
×
67
          }
×
68
          auto response = packet.clonePacketBuffer();
×
69
          if (response.size() > packet.getCapacity()) {
×
70
            /* fallback to sending the packet via normal socket */
71
            ids->xskPacketHeader.clear();
×
72
          }
×
73
          if (!processResponderPacket(dss, response, std::move(*ids))) {
×
74
            xskInfo->markAsFree(packet);
×
75
            vinfolog("XSK packet dropped because processResponderPacket failed");
×
76
            return;
×
77
          }
×
78
          if (response.size() > packet.getCapacity()) {
×
79
            /* fallback to sending the packet via normal socket */
80
            sendUDPResponse(ids->cs->udpFD, response, ids->delayMsec, ids->hopLocal, ids->hopRemote);
×
81
            infolog("XSK packet falling back because packet is too large");
×
82
            xskInfo->markAsFree(packet);
×
83
            return;
×
84
          }
×
85
          packet.setHeader(ids->xskPacketHeader);
×
86
          if (!packet.setPayload(response)) {
×
87
            infolog("Unable to set XSK payload !");
×
88
          }
×
89
          if (ids->delayMsec > 0) {
×
90
            packet.addDelay(ids->delayMsec);
×
91
          }
×
92
          packet.updatePacket();
×
93
          xskInfo->pushToSendQueue(packet);
×
94
        });
×
95
      }
×
96
      if (needNotify) {
×
97
        xskInfo->notifyXskSocket();
×
98
      }
×
99
    }
×
100
  }
×
101
  catch (const std::exception& e) {
×
102
    errlog("XSK responder thread died because of exception: %s", e.what());
×
103
  }
×
104
  catch (const PDNSException& e) {
×
105
    errlog("XSK responder thread died because of PowerDNS exception: %s", e.reason);
×
106
  }
×
107
  catch (...) {
×
108
    errlog("XSK responder thread died because of an exception: %s", "unknown");
×
109
  }
×
110
}
×
111

112
bool XskIsQueryAcceptable(const XskPacket& packet, ClientState& clientState, bool& expectProxyProtocol)
113
{
×
114
  const auto& from = packet.getFromAddr();
×
115
  expectProxyProtocol = expectProxyProtocolFrom(from);
×
116
  if (!dnsdist::configuration::getCurrentRuntimeConfiguration().d_ACL.match(from) && !expectProxyProtocol) {
×
117
    vinfolog("Query from %s dropped because of ACL", from.toStringWithPort());
×
118
    ++dnsdist::metrics::g_stats.aclDrops;
×
119
    return false;
×
120
  }
×
121
  clientState.queries++;
×
122
  ++dnsdist::metrics::g_stats.queries;
×
123

124
  return true;
×
125
}
×
126

127
void XskRouter(std::shared_ptr<XskSocket> xsk)
128
{
×
129
  setThreadName("dnsdist/XskRouter");
×
130
  uint32_t failed = 0;
×
131
  // packets to be submitted for sending
132
  vector<XskPacket> fillInTx;
×
133
  const auto& fds = xsk->getDescriptors();
×
134
  // list of workers that need to be notified
135
  std::set<int> needNotify;
×
136
  std::vector<XskPacket> packets;
×
137
  while (true) {
×
138
    try {
×
139
      auto ready = xsk->wait(-1);
×
140
      dnsdist::configuration::refreshLocalRuntimeConfiguration();
×
141
      // descriptor 0 gets incoming AF_XDP packets
142
      if ((fds.at(0).revents & POLLIN) != 0) {
×
143
        xsk->recv(packets, 64, &failed);
×
144
        dnsdist::metrics::g_stats.nonCompliantQueries += failed;
×
145
        for (auto& packet : packets) {
×
146
          const auto dest = packet.getToAddr();
×
147
          auto worker = xsk->getWorkerByDestination(dest);
×
148
          if (!worker) {
×
149
            xsk->markAsFree(packet);
×
150
            continue;
×
151
          }
×
152
          worker->pushToProcessingQueue(packet);
×
153
          needNotify.insert(worker->workerWaker.getHandle());
×
154
        }
×
155
        for (auto socket : needNotify) {
×
156
          uint64_t value = 1;
×
157
          auto written = write(socket, &value, sizeof(value));
×
158
          if (written != sizeof(value)) {
×
159
            // oh, well, the worker is clearly overloaded
160
            // but there is nothing we can do about it,
161
            // and hopefully the queue will be processed eventually
162
          }
×
163
        }
×
164
        needNotify.clear();
×
165
        ready--;
×
166
      }
×
167
      for (size_t fdIndex = 1; fdIndex < fds.size() && ready > 0; fdIndex++) {
×
168
        if ((fds.at(fdIndex).revents & POLLIN) != 0) {
×
169
          ready--;
×
170
          const auto& info = xsk->getWorkerByDescriptor(fds.at(fdIndex).fd);
×
171
          info->processOutgoingFrames([&](XskPacket packet) {
×
172
            if ((packet.getFlags() & XskPacket::UPDATE) == 0) {
×
173
              xsk->markAsFree(packet);
×
174
              return;
×
175
            }
×
176
            if ((packet.getFlags() & XskPacket::DELAY) != 0) {
×
177
              xsk->pushDelayed(packet);
×
178
              return;
×
179
            }
×
180
            fillInTx.push_back(packet);
×
181
          });
×
182
          info->cleanWorkerNotification();
×
183
        }
×
184
      }
×
185
      xsk->pickUpReadyPacket(fillInTx);
×
186
      xsk->recycle(4096);
×
187
      xsk->fillFq();
×
188
      xsk->send(fillInTx);
×
189
    }
×
190
    catch (...) {
×
191
      vinfolog("Exception in XSK router loop");
×
192
    }
×
193
  }
×
194
}
×
195

196
/**
 * Thread body processing the frames queued on the XSK worker attached to
 * `clientState`.
 *
 * Runs forever: waits until the worker has incoming frames, refreshes the
 * thread-local runtime configuration, passes every frame to
 * XskProcessQuery() and then notifies the XSK socket. Frames for which
 * XskProcessQuery() returns true are updated and pushed to the send queue,
 * the others are released.
 *
 * @param clientState object whose `xskInfo` worker is served.
 */
void XskClientThread(ClientState* clientState)
{
  setThreadName("dnsdist/xskClient");
  auto worker = clientState->xskInfo;

  while (true) {
    // block until at least one frame is available
    while (!worker->hasIncomingFrames()) {
      worker->waitForXskSocket();
    }

    dnsdist::configuration::refreshLocalRuntimeConfiguration();

    worker->processIncomingFrames([&](XskPacket frame) {
      if (!XskProcessQuery(*clientState, frame)) {
        // nothing to send back for this frame
        worker->markAsFree(frame);
        return;
      }
      frame.updatePacket();
      worker->pushToSendQueue(frame);
    });

    worker->notifyXskSocket();
  }
}
×
218

219
/**
 * Returns the filesystem path of the XSK destinations map for one address
 * family.
 *
 * @param isV6 true to get the IPv6 map path, false to get the IPv4 one.
 */
static std::string getDestinationMap(bool isV6)
{
  if (isV6) {
    return "/sys/fs/bpf/dnsdist/xsk-destinations-v6";
  }
  return "/sys/fs/bpf/dnsdist/xsk-destinations-v4";
}
814✔
223

224
void addDestinationAddress(const ComboAddress& addr)
225
{
×
226
  auto map = getDestinationMap(addr.isIPv6());
×
227
  XskSocket::addDestinationAddress(map, addr);
×
228
}
×
229

230
void removeDestinationAddress(const ComboAddress& addr)
231
{
×
232
  auto map = getDestinationMap(addr.isIPv6());
×
233
  XskSocket::removeDestinationAddress(map, addr);
×
234
}
×
235

236
void clearDestinationAddresses()
237
{
814✔
238
  auto map = getDestinationMap(false);
814✔
239
  XskSocket::clearDestinationMap(map, false);
814✔
240
  map = getDestinationMap(true);
814✔
241
  XskSocket::clearDestinationMap(map, true);
814✔
242
}
814✔
243

244
}
245
#endif /* HAVE_XSK */
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc