• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PowerDNS / pdns / 19741624072

27 Nov 2025 03:45PM UTC coverage: 73.086% (+0.02%) from 73.065%
19741624072

Pull #16570

github

web-flow
Merge 08a2cdb1d into f94a3f63f
Pull Request #16570: rec: rewrite all unwrap calls in web.rs

38523 of 63408 branches covered (60.75%)

Branch coverage included in aggregate %.

128044 of 164496 relevant lines covered (77.84%)

6531485.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.72
/pdns/dnsdistdist/dnsdist-async.cc
1
/*
2
 * This file is part of PowerDNS or dnsdist.
3
 * Copyright -- PowerDNS.COM B.V. and its contributors
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of version 2 of the GNU General Public License as
7
 * published by the Free Software Foundation.
8
 *
9
 * In addition, for the avoidance of any doubt, permission is granted to
10
 * link this program with OpenSSL and to (re)distribute the binaries
11
 * produced as the result of such linking.
12
 *
13
 * This program is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
 * GNU General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU General Public License
19
 * along with this program; if not, write to the Free Software
20
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
 */
22
#include "dnsdist-async.hh"
23
#include "dnsdist-internal-queries.hh"
24
#include "dolog.hh"
25
#include "mplexer.hh"
26
#include "threadname.hh"
27

28
namespace dnsdist
29
{
30

31
AsynchronousHolder::Data::Data(bool failOpen) :
32
  d_failOpen(failOpen)
412✔
33
{
412✔
34
  auto [notifier, waiter] = pdns::channel::createNotificationQueue(true);
412✔
35
  // NOLINTNEXTLINE(cppcoreguidelines-prefer-member-initializer): how I am supposed to do that?
36
  d_waiter = std::move(waiter);
412✔
37
  // NOLINTNEXTLINE(cppcoreguidelines-prefer-member-initializer): how I am supposed to do that?
38
  d_notifier = std::move(notifier);
412✔
39
}
412✔
40

41
AsynchronousHolder::AsynchronousHolder(bool failOpen) :
42
  d_data(std::make_shared<Data>(failOpen))
412✔
43
{
412✔
44
  std::thread main([data = this->d_data] { mainThread(data); });
412✔
45
  main.detach();
412✔
46
}
412✔
47

48
AsynchronousHolder::~AsynchronousHolder()
49
{
12✔
50
  try {
12✔
51
    stop();
12✔
52
  }
12✔
53
  catch (...) {
12✔
54
  }
×
55
}
12✔
56

57
bool AsynchronousHolder::notify() const
58
{
804✔
59
  return d_data->d_notifier.notify();
804✔
60
}
804✔
61

62
bool AsynchronousHolder::wait(AsynchronousHolder::Data& data, FDMultiplexer& mplexer, std::vector<int>& readyFDs, int atMostMs)
63
{
854✔
64
  readyFDs.clear();
854✔
65
  mplexer.getAvailableFDs(readyFDs, atMostMs);
854✔
66
  if (readyFDs.empty()) {
854✔
67
    /* timeout */
68
    return true;
62✔
69
  }
62✔
70

71
  data.d_waiter.clear();
792✔
72
  return false;
792✔
73
}
854✔
74

75
void AsynchronousHolder::stop()
76
{
424✔
77
  {
424✔
78
    auto content = d_data->d_content.lock();
424✔
79
    d_data->d_done = true;
424✔
80
  }
424✔
81

82
  notify();
424✔
83
}
424✔
84

85
// NOLINTNEXTLINE(performance-unnecessary-value-param): this is a long-lived thread, and we want to make sure the reference count of the shared pointer has been increased
86
void AsynchronousHolder::mainThread(std::shared_ptr<Data> data)
87
{
412✔
88
  setThreadName("dnsdist/async");
412✔
89
  struct timeval now{};
412✔
90
  std::list<std::pair<uint16_t, std::unique_ptr<CrossProtocolQuery>>> expiredEvents;
412✔
91

92
  auto mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent(1));
412✔
93
  mplexer->addReadFD(data->d_waiter.getDescriptor(), [](int, FDMultiplexer::funcparam_t&) {});
412✔
94
  std::vector<int> readyFDs;
412✔
95

96
  while (true) {
1,269✔
97
    bool shouldWait = true;
1,269✔
98
    int timeout = -1;
1,269✔
99
    dnsdist::configuration::refreshLocalRuntimeConfiguration();
1,269✔
100

101
    {
1,269✔
102
      auto content = data->d_content.lock();
1,269✔
103
      if (data->d_done) {
1,269✔
104
        return;
412✔
105
      }
412✔
106

107
      if (!content->empty()) {
857✔
108
        gettimeofday(&now, nullptr);
378✔
109
        struct timeval next = getNextTTD(*content);
378✔
110
        if (next <= now) {
378✔
111
          pickupExpired(*content, now, expiredEvents);
3✔
112
          shouldWait = false;
3✔
113
        }
3✔
114
        else {
375✔
115
          auto remainingUsec = uSec(next - now);
375✔
116
          timeout = static_cast<int>(std::round(static_cast<double>(remainingUsec) / 1000.0));
375✔
117
          if (timeout == 0 && remainingUsec > 0) {
375!
118
            /* if we have less than 1 ms, let's wait at least 1 ms */
119
            timeout = 1;
1✔
120
          }
1✔
121
        }
375✔
122
      }
378✔
123
    }
857✔
124

125
    if (shouldWait) {
857✔
126
      auto timedOut = wait(*data, *mplexer, readyFDs, timeout);
854✔
127
      if (timedOut) {
854✔
128
        auto content = data->d_content.lock();
62✔
129
        gettimeofday(&now, nullptr);
62✔
130
        pickupExpired(*content, now, expiredEvents);
62✔
131
      }
62✔
132
    }
854✔
133

134
    while (!expiredEvents.empty()) {
915✔
135
      auto [queryID, query] = std::move(expiredEvents.front());
58✔
136
      expiredEvents.pop_front();
58✔
137
      if (!data->d_failOpen) {
58✔
138
        vinfolog("Asynchronous query %d has expired at %d.%d, notifying the sender", queryID, now.tv_sec, now.tv_usec);
6!
139
        auto sender = query->getTCPQuerySender();
6✔
140
        if (sender) {
6!
141
          TCPResponse tresponse(std::move(query->query));
6✔
142
          sender->notifyIOError(now, std::move(tresponse));
6✔
143
        }
6✔
144
      }
6✔
145
      else {
52✔
146
        vinfolog("Asynchronous query %d has expired at %d.%d, resuming", queryID, now.tv_sec, now.tv_usec);
52!
147
        resumeQuery(std::move(query));
52✔
148
      }
52✔
149
    }
58✔
150
  }
857✔
151
}
412✔
152

153
void AsynchronousHolder::push(uint16_t asyncID, uint16_t queryID, const struct timeval& ttd, std::unique_ptr<CrossProtocolQuery>&& query)
154
{
383✔
155
  bool needNotify = false;
383✔
156
  {
383✔
157
    auto content = d_data->d_content.lock();
383✔
158
    if (!content->empty()) {
383✔
159
      /* the thread is already waiting on a TTD expiry in addition to notifications,
160
         let's not wake it unless our TTD comes before the current one */
161
      const struct timeval next = getNextTTD(*content);
3✔
162
      if (ttd < next) {
3!
163
        needNotify = true;
×
164
      }
×
165
    }
3✔
166
    else {
380✔
167
      /* the thread is currently only waiting for a notify */
168
      needNotify = true;
380✔
169
    }
380✔
170
    content->insert({std::move(query), ttd, asyncID, queryID});
383✔
171
  }
383✔
172

173
  if (needNotify) {
383✔
174
    notify();
380✔
175
  }
380✔
176
}
383✔
177

178
std::unique_ptr<CrossProtocolQuery> AsynchronousHolder::get(uint16_t asyncID, uint16_t queryID)
179
{
334✔
180
  /* no need to notify, worst case the thread wakes up for nothing because this was the next TTD */
181
  auto content = d_data->d_content.lock();
334✔
182
  auto contentIt = content->find(std::tie(queryID, asyncID));
334✔
183
  if (contentIt == content->end()) {
334✔
184
    struct timeval now{};
12✔
185
    gettimeofday(&now, nullptr);
12✔
186
    vinfolog("Asynchronous object %d not found at %d.%d", queryID, now.tv_sec, now.tv_usec);
12!
187
    return nullptr;
12✔
188
  }
12✔
189

190
  auto result = std::move(contentIt->d_query);
322✔
191
  content->erase(contentIt);
322✔
192
  return result;
322✔
193
}
334✔
194

195
void AsynchronousHolder::pickupExpired(content_t& content, const struct timeval& now, std::list<std::pair<uint16_t, std::unique_ptr<CrossProtocolQuery>>>& events)
196
{
65✔
197
  auto& idx = content.get<TTDTag>();
65✔
198
  for (auto contentIt = idx.begin(); contentIt != idx.end() && contentIt->d_ttd < now;) {
123✔
199
    events.emplace_back(contentIt->d_queryID, std::move(contentIt->d_query));
58✔
200
    contentIt = idx.erase(contentIt);
58✔
201
  }
58✔
202
}
65✔
203

204
struct timeval AsynchronousHolder::getNextTTD(const content_t& content)
205
{
381✔
206
  if (content.empty()) {
381!
207
    throw std::runtime_error("AsynchronousHolder::getNextTTD() called on an empty holder");
×
208
  }
×
209

210
  return content.get<TTDTag>().begin()->d_ttd;
381✔
211
}
381✔
212

213
bool AsynchronousHolder::empty()
214
{
39✔
215
  return d_data->d_content.read_only_lock()->empty();
39✔
216
}
39✔
217

218
static bool resumeResponse(std::unique_ptr<CrossProtocolQuery>&& response)
219
{
142✔
220
  try {
142✔
221
    auto& ids = response->query.d_idstate;
142✔
222
    DNSResponse dnsResponse = response->getDR();
142✔
223

224
    auto result = processResponseAfterRules(response->query.d_buffer, dnsResponse, ids.cs->muted);
142✔
225
    if (!result) {
142!
226
      /* easy */
227
      return true;
×
228
    }
×
229

230
    auto sender = response->getTCPQuerySender();
142✔
231
    if (sender) {
142!
232
      struct timeval now{};
142✔
233
      gettimeofday(&now, nullptr);
142✔
234

235
      TCPResponse resp(std::move(response->query.d_buffer), std::move(response->query.d_idstate), nullptr, response->downstream);
142✔
236
      resp.d_async = true;
142✔
237
      sender->handleResponse(now, std::move(resp));
142✔
238
    }
142✔
239
  }
142✔
240
  catch (const std::exception& e) {
142✔
241
    vinfolog("Got exception while resuming cross-protocol response: %s", e.what());
×
242
    return false;
×
243
  }
×
244

245
  return true;
142✔
246
}
142✔
247

248
static LockGuarded<std::deque<std::unique_ptr<CrossProtocolQuery>>> s_asynchronousEventsQueue;
249

250
bool queueQueryResumptionEvent(std::unique_ptr<CrossProtocolQuery>&& query)
251
{
602✔
252
  s_asynchronousEventsQueue.lock()->push_back(std::move(query));
602✔
253
  return true;
602✔
254
}
602✔
255

256
void handleQueuedAsynchronousEvents()
257
{
1,985✔
258
  while (true) {
2,587✔
259
    std::unique_ptr<CrossProtocolQuery> query;
2,587✔
260
    {
2,587✔
261
      // we do not want to hold the lock while resuming
262
      auto queue = s_asynchronousEventsQueue.lock();
2,587✔
263
      if (queue->empty()) {
2,587✔
264
        return;
1,985✔
265
      }
1,985✔
266

267
      query = std::move(queue->front());
602✔
268
      queue->pop_front();
602✔
269
    }
602✔
270
    if (query && !resumeQuery(std::move(query))) {
602!
271
      vinfolog("Unable to resume asynchronous query event");
×
272
    }
×
273
  }
602✔
274
}
1,985✔
275

276
bool resumeQuery(std::unique_ptr<CrossProtocolQuery>&& query)
277
{
654✔
278
  if (query->d_isResponse) {
654✔
279
    return resumeResponse(std::move(query));
142✔
280
  }
142✔
281

282
  DNSQuestion dnsQuestion = query->getDQ();
512✔
283

284
  auto result = processQueryAfterRules(dnsQuestion, query->downstream);
512✔
285
  if (result == ProcessQueryResult::Drop) {
512✔
286
    /* easy */
287
    return true;
12✔
288
  }
12✔
289
  if (result == ProcessQueryResult::PassToBackend) {
500✔
290
    if (query->downstream == nullptr) {
476!
291
      return false;
×
292
    }
×
293

294
#ifdef HAVE_DNS_OVER_HTTPS
476✔
295
    if (dnsQuestion.ids.du != nullptr) {
476✔
296
      dnsQuestion.ids.du->downstream = query->downstream;
55✔
297
    }
55✔
298
#endif
476✔
299

300
    if (query->downstream->isTCPOnly() || !(dnsQuestion.getProtocol().isUDP() || dnsQuestion.getProtocol() == dnsdist::Protocol::DoH)) {
476✔
301
      query->downstream->passCrossProtocolQuery(std::move(query));
128✔
302
      return true;
128✔
303
    }
128✔
304

305
    auto queryID = dnsQuestion.getHeader()->id;
348✔
306
    /* at this point 'du', if it is not nullptr, is owned by the DoHCrossProtocolQuery
307
       which will stop existing when we return, so we need to increment the reference count
308
    */
309
    return assignOutgoingUDPQueryToBackend(query->downstream, queryID, dnsQuestion, query->query.d_buffer);
348✔
310
  }
476✔
311
  if (result == ProcessQueryResult::SendAnswer) {
24!
312
    auto sender = query->getTCPQuerySender();
24✔
313
    if (!sender) {
24!
314
      return false;
×
315
    }
×
316

317
    struct timeval now{};
24✔
318
    gettimeofday(&now, nullptr);
24✔
319

320
    TCPResponse response(std::move(query->query.d_buffer), std::move(query->query.d_idstate), nullptr, query->downstream);
24✔
321
    response.d_async = true;
24✔
322
    response.d_idstate.selfGenerated = true;
24✔
323

324
    try {
24✔
325
      sender->handleResponse(now, std::move(response));
24✔
326
      return true;
24✔
327
    }
24✔
328
    catch (const std::exception& e) {
24✔
329
      vinfolog("Got exception while resuming cross-protocol self-answered query: %s", e.what());
×
330
      return false;
×
331
    }
×
332
  }
24✔
333
  if (result == ProcessQueryResult::Asynchronous) {
×
334
    /* nope */
335
    errlog("processQueryAfterRules returned 'asynchronous' while trying to resume an already asynchronous query");
×
336
    return false;
×
337
  }
×
338

339
  return false;
×
340
}
×
341

342
bool suspendQuery(DNSQuestion& dnsQuestion, uint16_t asyncID, uint16_t queryID, uint32_t timeoutMs)
343
{
202✔
344
  if (!g_asyncHolder) {
202!
345
    return false;
×
346
  }
×
347

348
  struct timeval now{};
202✔
349
  gettimeofday(&now, nullptr);
202✔
350
  struct timeval ttd = now;
202✔
351
  ttd.tv_sec += timeoutMs / 1000;
202✔
352
  ttd.tv_usec += static_cast<decltype(ttd.tv_usec)>((timeoutMs % 1000) * 1000);
202✔
353
  normalizeTV(ttd);
202✔
354

355
  vinfolog("Suspending asynchronous query %d at %d.%d until %d.%d", queryID, now.tv_sec, now.tv_usec, ttd.tv_sec, ttd.tv_usec);
202!
356
  auto query = getInternalQueryFromDQ(dnsQuestion, false);
202✔
357

358
  g_asyncHolder->push(asyncID, queryID, ttd, std::move(query));
202✔
359
  return true;
202✔
360
}
202✔
361

362
bool suspendResponse(DNSResponse& dnsResponse, uint16_t asyncID, uint16_t queryID, uint32_t timeoutMs)
363
{
166✔
364
  if (!g_asyncHolder) {
166!
365
    return false;
×
366
  }
×
367

368
  struct timeval now{};
166✔
369
  gettimeofday(&now, nullptr);
166✔
370
  struct timeval ttd = now;
166✔
371
  ttd.tv_sec += timeoutMs / 1000;
166✔
372
  ttd.tv_usec += static_cast<decltype(ttd.tv_usec)>((timeoutMs % 1000) * 1000);
166✔
373
  normalizeTV(ttd);
166✔
374

375
  vinfolog("Suspending asynchronous response %d at %d.%d until %d.%d", queryID, now.tv_sec, now.tv_usec, ttd.tv_sec, ttd.tv_usec);
166!
376
  auto query = getInternalQueryFromDQ(dnsResponse, true);
166✔
377
  query->d_isResponse = true;
166✔
378
  query->downstream = dnsResponse.d_downstream;
166✔
379

380
  g_asyncHolder->push(asyncID, queryID, ttd, std::move(query));
166✔
381
  return true;
166✔
382
}
166✔
383

384
std::unique_ptr<AsynchronousHolder> g_asyncHolder;
385
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc