• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PowerDNS / pdns / 12595591960

03 Jan 2025 09:27AM UTC coverage: 62.774% (+2.5%) from 60.245%
12595591960

Pull #15008

github

web-flow
Merge c2a2749d3 into 788f396a7
Pull Request #15008: Do not follow CNAME records for ANY or CNAME queries

30393 of 78644 branches covered (38.65%)

Branch coverage included in aggregate %.

105822 of 138350 relevant lines covered (76.49%)

4613078.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.13
/pdns/distributor.hh
1
/*
2
 * This file is part of PowerDNS or dnsdist.
3
 * Copyright -- PowerDNS.COM B.V. and its contributors
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of version 2 of the GNU General Public License as
7
 * published by the Free Software Foundation.
8
 *
9
 * In addition, for the avoidance of any doubt, permission is granted to
10
 * link this program with OpenSSL and to (re)distribute the binaries
11
 * produced as the result of such linking.
12
 *
13
 * This program is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
 * GNU General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU General Public License
19
 * along with this program; if not, write to the Free Software
20
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
 */
22
#pragma once
23
#ifdef HAVE_CONFIG_H
24
#include "config.h"
25
#endif
26
#include <string>
27
#include <deque>
28
#include <queue>
29
#include <vector>
30
#include <thread>
31
#include "threadname.hh"
32
#include <unistd.h>
33

34
#include "channel.hh"
35
#include "logger.hh"
36
#include "dns.hh"
37
#include "dnsbackend.hh"
38
#include "pdnsexception.hh"
39
#include "arguments.hh"
40
#include <atomic>
41
#include "statbag.hh"
42
#include "gss_context.hh"
43

44
extern StatBag S;
45

46
/** the Distributor template class enables you to multithread slow question/answer 
47
    processes. 
48
    
49
    Questions are posed to the Distributor, which returns the answer via a callback.
50

51
    The Distributor spawns sufficient backends, and if they thrown an exception,
52
    it will cycle the backend but drop the query that was active during the exception.
53
*/
54

55
template<class Answer, class Question, class Backend> class Distributor
56
{
57
public:
58
  static Distributor* Create(int n=1); //!< Create a new Distributor with \param n threads
59
  typedef std::function<void(std::unique_ptr<Answer>&, int)> callback_t;
60
  virtual int question(Question&, callback_t callback) =0; //!< Submit a question to the Distributor
61
  virtual int getQueueSize() =0; //!< Returns length of question queue
62
  virtual bool isOverloaded() =0;
63
  virtual ~Distributor() { cerr<<__func__<<endl;}
×
64
};
65

66
template<class Answer, class Question, class Backend> class SingleThreadDistributor
67
    : public Distributor<Answer, Question, Backend>
68
{
69
public:
70
  SingleThreadDistributor(const SingleThreadDistributor&) = delete;
71
  void operator=(const SingleThreadDistributor&) = delete;
72
  SingleThreadDistributor();
73
  typedef std::function<void(std::unique_ptr<Answer>&, int)> callback_t;
74
  int question(Question&, callback_t callback) override; //!< Submit a question to the Distributor
75
  int getQueueSize() override {
×
76
    return 0;
×
77
  }
×
78

79
  bool isOverloaded() override
80
  {
×
81
    return false;
×
82
  }
×
83

84
private:
85
  std::unique_ptr<Backend> b{nullptr};
86
};
87

88
template<class Answer, class Question, class Backend> class MultiThreadDistributor
89
    : public Distributor<Answer, Question, Backend>
90
{
91
public:
92
  MultiThreadDistributor(const MultiThreadDistributor&) = delete;
93
  void operator=(const MultiThreadDistributor&) = delete;
94
  MultiThreadDistributor(int n);
95
  typedef std::function<void(std::unique_ptr<Answer>&, int)> callback_t;
96
  int question(Question&, callback_t callback) override; //!< Submit a question to the Distributor
97
  void distribute(int n);
98
  int getQueueSize() override {
1✔
99
    return d_queued;
1✔
100
  }
1✔
101

102
  struct QuestionData
103
  {
104
    QuestionData(const Question& query): Q(query)
105
    {
1,203✔
106
      start = Q.d_dt.udiff();
1,203✔
107
    }
1,203✔
108

109
    Question Q;
110
    callback_t callback{nullptr};
111
    int id{0};
112
    int start{0};
113
  };
114

115
  bool isOverloaded() override
116
  {
×
117
    return d_overloadQueueLength && (d_queued > d_overloadQueueLength);
×
118
  }
×
119

120
private:
121
  std::vector<pdns::channel::Sender<QuestionData>> d_senders;
122
  std::vector<pdns::channel::Receiver<QuestionData>> d_receivers;
123
  time_t d_last_started{0};
124
  std::atomic<unsigned int> d_queued{0};
125
  unsigned int d_overloadQueueLength{0};
126
  unsigned int d_maxQueueLength{0};
127
  int d_nextid{0};
128
  int d_num_threads{0};
129
};
130

131
template<class Answer, class Question, class Backend> Distributor<Answer,Question,Backend>* Distributor<Answer,Question,Backend>::Create(int n)
132
{
3✔
133
    if( n == 1 )
3!
134
      return new SingleThreadDistributor<Answer,Question,Backend>();
×
135
    else
3✔
136
      return new MultiThreadDistributor<Answer,Question,Backend>( n );
3✔
137
}
3✔
138

139
template<class Answer, class Question, class Backend>SingleThreadDistributor<Answer,Question,Backend>::SingleThreadDistributor()
140
{
×
141
  g_log<<Logger::Error<<"Only asked for 1 backend thread - operating unthreaded"<<endl;
×
142
  try {
×
143
    b=make_unique<Backend>();
×
144
  }
×
145
  catch(const PDNSException &AE) {
×
146
    g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
×
147
    _exit(1);
×
148
  }
×
149
  catch(const std::exception& e) {
×
150
    g_log<<Logger::Error<<"Distributor caught fatal exception: "<<e.what()<<endl;
×
151
    _exit(1);
×
152
  }
×
153
  catch(...) {
×
154
    g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
×
155
    _exit(1);
×
156
  }
×
157
}
×
158

159
template<class Answer, class Question, class Backend>MultiThreadDistributor<Answer,Question,Backend>::MultiThreadDistributor(int numberOfThreads) :
160
  d_last_started(time(nullptr)), d_overloadQueueLength(::arg().asNum("overload-queue-length")), d_maxQueueLength(::arg().asNum("max-queue-length")), d_num_threads(numberOfThreads)
161
{
3✔
162
  if (numberOfThreads < 1) {
3!
163
    g_log<<Logger::Error<<"Asked for fewer than 1 threads, nothing to do"<<endl;
×
164
    _exit(1);
×
165
  }
×
166

167
  for (int distributorIdx = 0; distributorIdx < numberOfThreads; distributorIdx++) {
17✔
168
    auto [sender, receiver] = pdns::channel::createObjectQueue<QuestionData>(pdns::channel::SenderBlockingMode::SenderBlocking, pdns::channel::ReceiverBlockingMode::ReceiverBlocking);
14✔
169
    d_senders.push_back(std::move(sender));
14✔
170
    d_receivers.push_back(std::move(receiver));
14✔
171
  }
14✔
172

173
  g_log<<Logger::Warning<<"About to create "<<numberOfThreads<<" backend threads for UDP"<<endl;
3✔
174

175
  for (int distributorIdx = 0; distributorIdx < numberOfThreads; distributorIdx++) {
17✔
176
    std::thread t([=](){distribute(distributorIdx);});
14✔
177
    t.detach();
14✔
178
    Utility::usleep(50000); // we've overloaded mysql in the past :-)
14✔
179
  }
14✔
180
  g_log<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl;
3✔
181
}
3✔
182

183

184
// start of a new thread
185
template<class Answer, class Question, class Backend>void MultiThreadDistributor<Answer,Question,Backend>::distribute(int ournum)
186
{
14✔
187
  // this is the longest name we can use, not a typo
188
  setThreadName("pdns/distributo");
14✔
189

190
  try {
14✔
191
    auto b = make_unique<Backend>(); // this will answer our questions
14✔
192
    int queuetimeout = ::arg().asNum("queue-limit");
14✔
193
    auto& receiver = d_receivers.at(ournum);
14✔
194

195
    for (;;) {
1,207✔
196
      auto tempQD = receiver.receive();
1,207✔
197
      if (!tempQD) {
1,207!
198
        unixDie("read");
×
199
      }
×
200
      --d_queued;
1,207✔
201
      auto questionData = std::move(*tempQD);
1,207✔
202
      std::unique_ptr<Answer> a = nullptr;
1,207✔
203
      if (queuetimeout && questionData->Q.d_dt.udiff() > queuetimeout * 1000) {
1,207!
204
        S.inc("timedout-packets");
×
205
        continue;
×
206
      }
×
207

208
      bool allowRetry = true;
1,207✔
209
retry:
1,207✔
210
      // this is the only point where we interact with the backend (synchronous)
211
      try {
1,201✔
212
        if (!b) {
1,201!
213
          allowRetry = false;
1✔
214
          b = make_unique<Backend>();
1✔
215
        }
1✔
216
        a = b->question(questionData->Q);
1,201✔
217
      }
1,201✔
218
      catch (const PDNSException &e) {
1,201✔
219
        b.reset();
×
220
        if (!allowRetry) {
×
221
          g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
×
222
          a = questionData->Q.replyPacket();
×
223

224
          a->setRcode(RCode::ServFail);
×
225
          S.inc("servfail-packets");
×
226
          S.ringAccount("servfail-queries", questionData->Q.qdomain, questionData->Q.qtype);
×
227
        } else {
×
228
          g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
×
229
          goto retry;
×
230
        }
×
231
      }
×
232
      catch (...) {
1,201✔
233
        b.reset();
1✔
234
        if (!allowRetry) {
1!
235
          g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<std::this_thread::get_id()<<endl;
×
236
          a = questionData->Q.replyPacket();
×
237

238
          a->setRcode(RCode::ServFail);
×
239
          S.inc("servfail-packets");
×
240
          S.ringAccount("servfail-queries", questionData->Q.qdomain, questionData->Q.qtype);
×
241
        } else {
1✔
242
          g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<std::this_thread::get_id()<<" (retry once)"<<endl;
1✔
243
          goto retry;
1✔
244
        }
1✔
245
      }
1✔
246

247
      questionData->callback(a, questionData->start);
1,199✔
248
#ifdef ENABLE_GSS_TSIG
1,199✔
249
      if (g_doGssTSIG && a != nullptr) {
1,199!
250
        questionData->Q.cleanupGSS(a->d.rcode);
×
251
      }
×
252
#endif
1,199✔
253
      questionData.reset();
1,199✔
254
    }
1,199✔
255

256
    b.reset();
6✔
257
  }
6✔
258
  catch (const PDNSException &AE) {
14✔
259
    g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
×
260
    _exit(1);
×
261
  }
×
262
  catch (const std::exception& e) {
14✔
263
    g_log<<Logger::Error<<"Distributor caught fatal exception: "<<e.what()<<endl;
×
264
    _exit(1);
×
265
  }
×
266
  catch (...) {
14✔
267
    g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
×
268
    _exit(1);
×
269
  }
×
270
}
14✔
271

272
template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question& q, callback_t callback)
273
{
×
274
  int start = q.d_dt.udiff();
×
275
  std::unique_ptr<Answer> a = nullptr;
×
276
  bool allowRetry=true;
×
277
retry:
×
278
  try {
×
279
    if (!b) {
×
280
      allowRetry=false;
×
281
      b=make_unique<Backend>();
×
282
    }
×
283
    a=b->question(q); // a can be NULL!
×
284
  }
×
285
  catch(const PDNSException &e) {
×
286
    b.reset();
×
287
    if (!allowRetry) {
×
288
      g_log<<Logger::Error<<"Backend error: "<<e.reason<<endl;
×
289
      a=q.replyPacket();
×
290

291
      a->setRcode(RCode::ServFail);
×
292
      S.inc("servfail-packets");
×
293
      S.ringAccount("servfail-queries", q.qdomain, q.qtype);
×
294
    } else {
×
295
      g_log<<Logger::Notice<<"Backend error (retry once): "<<e.reason<<endl;
×
296
      goto retry;
×
297
    }
×
298
  }
×
299
  catch(...) {
×
300
    b.reset();
×
301
    if (!allowRetry) {
×
302
      g_log<<Logger::Error<<"Caught unknown exception in Distributor thread "<<std::this_thread::get_id()<<endl;
×
303
      a=q.replyPacket();
×
304

305
      a->setRcode(RCode::ServFail);
×
306
      S.inc("servfail-packets");
×
307
      S.ringAccount("servfail-queries", q.qdomain, q.qtype);
×
308
    } else {
×
309
      g_log<<Logger::Warning<<"Caught unknown exception in Distributor thread "<<std::this_thread::get_id()<<" (retry once)"<<endl;
×
310
      goto retry;
×
311
    }
×
312
  }
×
313
  callback(a, start);
×
314
#ifdef ENABLE_GSS_TSIG
×
315
  if (g_doGssTSIG && a != nullptr) {
×
316
    q.cleanupGSS(a->d.rcode);
×
317
  }
×
318
#endif
×
319
  return 0;
×
320
}
×
321

322
struct DistributorFatal{};
323

324
template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question& q, callback_t callback)
325
{
1,203✔
326
  // this is passed to other process over pipe and released there
327
  auto questionData = std::make_unique<QuestionData>(q);
1,203✔
328
  auto ret = questionData->id = d_nextid++; // might be deleted after write!
1,203✔
329
  questionData->callback = callback;
1,203✔
330

331
  ++d_queued;
1,203✔
332
  if (!d_senders.at(questionData->id % d_senders.size()).send(std::move(questionData))) {
1,203!
333
    --d_queued;
×
334
    questionData.reset();
×
335
    unixDie("write");
×
336
  }
×
337

338
  if (d_queued > d_maxQueueLength) {
1,203!
339
    g_log<<Logger::Error<< d_queued <<" questions waiting for database/backend attention. Limit is "<<::arg().asNum("max-queue-length")<<", respawning"<<endl;
1✔
340
    // this will leak the entire contents of all pipes, nothing will be freed. Respawn when this happens!
341
    throw DistributorFatal();
1✔
342
  }
1✔
343

344
  return ret;
1,202✔
345
}
1,203✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc