• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PowerDNS / pdns / 12323094430

13 Dec 2024 09:11PM UTC coverage: 64.759% (-0.02%) from 64.78%
12323094430

Pull #14970

github

web-flow
Merge 3e4597ff7 into 3dfd8e317
Pull Request #14970: boost > std optional

37533 of 88820 branches covered (42.26%)

Branch coverage included in aggregate %.

17 of 19 new or added lines in 4 files covered. (89.47%)

79 existing lines in 16 files now uncovered.

125890 of 163537 relevant lines covered (76.98%)

4110788.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.86
/pdns/distributor.hh
1
/*
2
 * This file is part of PowerDNS or dnsdist.
3
 * Copyright -- PowerDNS.COM B.V. and its contributors
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of version 2 of the GNU General Public License as
7
 * published by the Free Software Foundation.
8
 *
9
 * In addition, for the avoidance of any doubt, permission is granted to
10
 * link this program with OpenSSL and to (re)distribute the binaries
11
 * produced as the result of such linking.
12
 *
13
 * This program is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
 * GNU General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU General Public License
19
 * along with this program; if not, write to the Free Software
20
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
 */
22
#pragma once
23
#ifdef HAVE_CONFIG_H
24
#include "config.h"
25
#endif
26
#include <string>
27
#include <deque>
28
#include <queue>
29
#include <vector>
30
#include <thread>
31
#include "threadname.hh"
32
#include <unistd.h>
33

34
#include "channel.hh"
35
#include "logger.hh"
36
#include "dns.hh"
37
#include "dnsbackend.hh"
38
#include "pdnsexception.hh"
39
#include "arguments.hh"
40
#include <atomic>
41
#include "statbag.hh"
42
#include "gss_context.hh"
43

44
extern StatBag S;
45

46
/** the Distributor template class enables you to multithread slow question/answer 
47
    processes. 
48
    
49
    Questions are posed to the Distributor, which returns the answer via a callback.
50

51
    The Distributor spawns sufficient backends, and if they throw an exception,
52
    it will cycle the backend but drop the query that was active during the exception.
53
*/
54

55
/**
 * Abstract interface implemented by SingleThreadDistributor and
 * MultiThreadDistributor.
 *
 * A Question is submitted with question(); the resulting Answer is handed to
 * the supplied callback.
 */
template<class Answer, class Question, class Backend> class Distributor
{
public:
  //! Completion callback: receives the answer (the pointer may be null) and the
  //! value of the question's d_dt.udiff() sampled when the question was submitted
  using callback_t = std::function<void(std::unique_ptr<Answer>&, int)>;

  static Distributor* Create(int n=1); //!< Create a new Distributor with \param n threads
  virtual int question(Question&, callback_t callback) =0; //!< Submit a question to the Distributor
  virtual int getQueueSize() =0; //!< Returns length of question queue
  virtual bool isOverloaded() =0; //!< Returns true when the implementation considers its queue too long

  // Polymorphic base: keep the destructor virtual, but do not print anything
  // from it (the previous body wrote the function name to stderr).
  virtual ~Distributor() = default;
};
65

66
template<class Answer, class Question, class Backend> class SingleThreadDistributor
67
    : public Distributor<Answer, Question, Backend>
68
{
69
public:
70
  SingleThreadDistributor(const SingleThreadDistributor&) = delete;
71
  void operator=(const SingleThreadDistributor&) = delete;
72
  SingleThreadDistributor();
73
  typedef std::function<void(std::unique_ptr<Answer>&, int)> callback_t;
74
  int question(Question&, callback_t callback) override; //!< Submit a question to the Distributor
75
  int getQueueSize() override {
×
76
    return 0;
×
77
  }
×
78

79
  bool isOverloaded() override
80
  {
734✔
81
    return false;
734✔
82
  }
734✔
83

84
private:
85
  std::unique_ptr<Backend> b{nullptr};
86
};
87

88
template<class Answer, class Question, class Backend> class MultiThreadDistributor
89
    : public Distributor<Answer, Question, Backend>
90
{
91
public:
92
  MultiThreadDistributor(const MultiThreadDistributor&) = delete;
93
  void operator=(const MultiThreadDistributor&) = delete;
94
  MultiThreadDistributor(int n);
95
  typedef std::function<void(std::unique_ptr<Answer>&, int)> callback_t;
96
  int question(Question&, callback_t callback) override; //!< Submit a question to the Distributor
97
  void distribute(int n);
98
  int getQueueSize() override {
10✔
99
    return d_queued;
10✔
100
  }
10✔
101

102
  struct QuestionData
103
  {
104
    QuestionData(const Question& query): Q(query)
105
    {
87,274✔
106
      start = Q.d_dt.udiff();
87,274✔
107
    }
87,274✔
108

109
    Question Q;
110
    callback_t callback{nullptr};
111
    int id{0};
112
    int start{0};
113
  };
114

115
  bool isOverloaded() override
116
  {
86,072✔
117
    return d_overloadQueueLength && (d_queued > d_overloadQueueLength);
86,072!
118
  }
86,072✔
119

120
private:
121
  std::vector<pdns::channel::Sender<QuestionData>> d_senders;
122
  std::vector<pdns::channel::Receiver<QuestionData>> d_receivers;
123
  time_t d_last_started{0};
124
  std::atomic<unsigned int> d_queued{0};
125
  unsigned int d_overloadQueueLength{0};
126
  unsigned int d_maxQueueLength{0};
127
  int d_nextid{0};
128
  int d_num_threads{0};
129
};
130

131
/**
 * Factory: exactly one thread requested yields a SingleThreadDistributor, any
 * other count is passed on to a MultiThreadDistributor. The caller receives a
 * raw pointer to a heap-allocated object.
 */
template<class Answer, class Question, class Backend> Distributor<Answer,Question,Backend>* Distributor<Answer,Question,Backend>::Create(int n)
{
  if (n != 1) {
    return new MultiThreadDistributor<Answer,Question,Backend>(n);
  }
  return new SingleThreadDistributor<Answer,Question,Backend>();
}
162✔
138

139
/**
 * Creates the one backend used for all questions. Any exception thrown while
 * constructing it is logged and terminates the process via _exit(1).
 */
template<class Answer, class Question, class Backend>SingleThreadDistributor<Answer,Question,Backend>::SingleThreadDistributor()
{
  g_log << Logger::Error << "Only asked for 1 backend thread - operating unthreaded" << endl;

  try {
    b = std::make_unique<Backend>();
  }
  catch (const PDNSException& pdnsError) {
    g_log << Logger::Error << "Distributor caught fatal exception: " << pdnsError.reason << endl;
    _exit(1);
  }
  catch (const std::exception& stdError) {
    g_log << Logger::Error << "Distributor caught fatal exception: " << stdError.what() << endl;
    _exit(1);
  }
  catch (...) {
    g_log << Logger::Error << "Caught an unknown exception when creating backend, probably" << endl;
    _exit(1);
  }
}
15✔
158

159
/**
 * Sets up one blocking object queue per worker and then launches the workers.
 *
 * \param numberOfThreads number of worker threads to start; a value below 1
 *        is logged as an error and terminates the process via _exit(1).
 *
 * Queue limits are read from the "overload-queue-length" and
 * "max-queue-length" settings.
 */
template<class Answer, class Question, class Backend>MultiThreadDistributor<Answer,Question,Backend>::MultiThreadDistributor(int numberOfThreads) :
  d_last_started(time(nullptr)), d_overloadQueueLength(::arg().asNum("overload-queue-length")), d_maxQueueLength(::arg().asNum("max-queue-length")), d_num_threads(numberOfThreads)
{
  if (numberOfThreads < 1) {
    g_log<<Logger::Error<<"Asked for fewer than 1 threads, nothing to do"<<endl;
    _exit(1);
  }

  // Create every channel before starting any thread: distribute() keeps a
  // reference into d_receivers, so the vector must not grow once workers run.
  for (int distributorIdx = 0; distributorIdx < numberOfThreads; distributorIdx++) {
    auto [sender, receiver] = pdns::channel::createObjectQueue<QuestionData>(pdns::channel::SenderBlockingMode::SenderBlocking, pdns::channel::ReceiverBlockingMode::ReceiverBlocking);
    d_senders.push_back(std::move(sender));
    d_receivers.push_back(std::move(receiver));
  }

  g_log<<Logger::Warning<<"About to create "<<numberOfThreads<<" backend threads for UDP"<<endl;

  for (int distributorIdx = 0; distributorIdx < numberOfThreads; distributorIdx++) {
    // Capture `this` explicitly: relying on [=] to capture it implicitly is
    // deprecated since C++20 and hides the fact that the thread uses the object.
    std::thread worker([this, distributorIdx]() { distribute(distributorIdx); });
    worker.detach();
    Utility::usleep(50000); // we've overloaded mysql in the past :-)
  }
  g_log<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl;
}
147✔
182

183

184
// start of a new thread
185
/**
 * Body of one worker thread.
 *
 * Creates its own Backend, then loops forever: receive a QuestionData from
 * receiver number \param ournum, ask the backend, and invoke the question's
 * callback with the answer.
 *
 * - A question whose d_dt.udiff() exceeds "queue-limit" * 1000 is counted as
 *   "timedout-packets" and dropped without invoking its callback.
 * - If the backend throws, it is discarded and the question is retried once on
 *   a fresh backend; a second failure (or a failure on a backend that had just
 *   been recreated) is answered with ServFail.
 * - Any exception escaping the loop is logged and terminates the process via
 *   _exit(1).
 */
template<class Answer, class Question, class Backend>void MultiThreadDistributor<Answer,Question,Backend>::distribute(int ournum)
{
  // this is the longest name we can use, not a typo
  setThreadName("pdns/distributo");

  try {
    auto b = make_unique<Backend>(); // this will answer our questions
    const int queuetimeout = ::arg().asNum("queue-limit");
    // widened so that a large "queue-limit" cannot overflow int when scaled by 1000
    const long long queuetimeoutScaled = static_cast<long long>(queuetimeout) * 1000;
    auto& receiver = d_receivers.at(ournum);

    for (;;) {
      auto tempQD = receiver.receive();
      if (!tempQD) {
        unixDie("read");
      }
      --d_queued;
      auto questionData = std::move(*tempQD);
      std::unique_ptr<Answer> a = nullptr;
      if (queuetimeout && questionData->Q.d_dt.udiff() > queuetimeoutScaled) {
        S.inc("timedout-packets");
        continue;
      }

      // Builds the ServFail reply and accounts for it; used by both handlers below
      auto makeServFail = [&questionData]() {
        auto reply = questionData->Q.replyPacket();

        reply->setRcode(RCode::ServFail);
        S.inc("servfail-packets");
        S.ringAccount("servfail-queries", questionData->Q.qdomain, questionData->Q.qtype);
        return reply;
      };

      bool allowRetry = true;
      for (bool done = false; !done;) {
        // this is the only point where we interact with the backend (synchronous)
        try {
          if (!b) {
            // a freshly created backend gets a single attempt only
            allowRetry = false;
            b = make_unique<Backend>();
          }
          a = b->question(questionData->Q);
          done = true;
        }
        catch (const PDNSException &e) {
          b.reset();
          if (!allowRetry) {
            g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
            a = makeServFail();
            done = true;
          } else {
            g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
          }
        }
        catch (...) {
          b.reset();
          if (!allowRetry) {
            g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<std::this_thread::get_id()<<endl;
            a = makeServFail();
            done = true;
          } else {
            g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<std::this_thread::get_id()<<" (retry once)"<<endl;
          }
        }
      }

      questionData->callback(a, questionData->start);
#ifdef ENABLE_GSS_TSIG
      if (g_doGssTSIG && a != nullptr) {
        questionData->Q.cleanupGSS(a->d.rcode);
      }
#endif
      questionData.reset();
    }
    // not reached: the loop above only ends through an exception
  }
  catch (const PDNSException &AE) {
    g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
    _exit(1);
  }
  catch (const std::exception& e) {
    g_log<<Logger::Error<<"Distributor caught fatal exception: "<<e.what()<<endl;
    _exit(1);
  }
  catch (...) {
    g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
    _exit(1);
  }
}
446✔
271

272
/**
 * Answers \param q synchronously on the calling thread and passes the result
 * to \param callback together with q.d_dt.udiff() as sampled on entry.
 *
 * A backend that throws is discarded; the question is retried once on a fresh
 * backend, and if that also fails (or the backend had only just been
 * recreated) a ServFail reply is produced instead.
 *
 * \return always 0
 */
template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question& q, callback_t callback)
{
  const int start = q.d_dt.udiff();
  std::unique_ptr<Answer> a;

  // Builds the ServFail reply and accounts for it; shared by both handlers
  auto servFailReply = [&q]() {
    auto reply = q.replyPacket();

    reply->setRcode(RCode::ServFail);
    S.inc("servfail-packets");
    S.ringAccount("servfail-queries", q.qdomain, q.qtype);
    return reply;
  };

  bool allowRetry = true;
  bool finished = false;
  while (!finished) {
    try {
      if (!b) {
        // a freshly created backend gets a single attempt only
        allowRetry = false;
        b = make_unique<Backend>();
      }
      a = b->question(q); // a can be NULL!
      finished = true;
    }
    catch (const PDNSException& backendError) {
      b.reset();
      if (allowRetry) {
        g_log << Logger::Notice << "Backend error (retry once): " << backendError.reason << endl;
      }
      else {
        g_log << Logger::Error << "Backend error: " << backendError.reason << endl;
        a = servFailReply();
        finished = true;
      }
    }
    catch (...) {
      b.reset();
      if (allowRetry) {
        g_log << Logger::Warning << "Caught unknown exception in Distributor thread " << std::this_thread::get_id() << " (retry once)" << endl;
      }
      else {
        g_log << Logger::Error << "Caught unknown exception in Distributor thread " << std::this_thread::get_id() << endl;
        a = servFailReply();
        finished = true;
      }
    }
  }

  callback(a, start);
#ifdef ENABLE_GSS_TSIG
  if (g_doGssTSIG && a != nullptr) {
    q.cleanupGSS(a->d.rcode);
  }
#endif
  return 0;
}
734✔
321

322
//! Thrown by MultiThreadDistributor::question() when more questions are queued than max-queue-length allows
struct DistributorFatal
{
};
323

324
/**
 * Queues a copy of \param q for a worker thread; \param callback is invoked by
 * that worker once the question has been handled.
 *
 * The target worker is chosen as id modulo the number of senders.
 *
 * \return the id assigned to this question
 * \throws DistributorFatal when, after sending, more than d_maxQueueLength
 *         questions are waiting
 */
template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question& q, callback_t callback)
{
  // ownership is handed to a distributor thread through the channel and released there
  auto questionData = std::make_unique<QuestionData>(q);

  // Keep our own copy of the id: questionData might be deleted after write!
  const int ret = d_nextid;
  // Advance the counter in unsigned arithmetic; incrementing a signed int past
  // its maximum is undefined behaviour, and this counter grows with every query.
  d_nextid = static_cast<int>(static_cast<unsigned int>(d_nextid) + 1U);

  questionData->id = ret;
  questionData->callback = std::move(callback);

  ++d_queued;
  if (!d_senders.at(questionData->id % d_senders.size()).send(std::move(questionData))) {
    // nothing was handed over: undo the accounting and free the question ourselves
    --d_queued;
    questionData.reset();
    unixDie("write");
  }

  if (d_queued > d_maxQueueLength) {
    g_log<<Logger::Error<< d_queued <<" questions waiting for database/backend attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl;
    // this will leak the entire contents of all pipes, nothing will be freed. Respawn when this happens!
    throw DistributorFatal();
  }

  return ret;
}
87,274✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc