• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PowerDNS / pdns / 18837422702

27 Oct 2025 10:15AM UTC coverage: 73.025% (+0.02%) from 73.004%
18837422702

Pull #16375

github

web-flow
Merge 2f23fc90d into 82ea647b4
Pull Request #16375: dnsdist: Include a Date: response header for rejected HTTP1 requests

38279 of 63122 branches covered (60.64%)

Branch coverage included in aggregate %.

0 of 11 new or added lines in 1 file covered. (0.0%)

10680 existing lines in 98 files now uncovered.

127481 of 163870 relevant lines covered (77.79%)

5384548.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.27
/pdns/channel.hh
1
/*
2
 * This file is part of PowerDNS or dnsdist.
3
 * Copyright -- PowerDNS.COM B.V. and its contributors
4
 *
5
 * This program is free software; you can redistribute it and/or modify
6
 * it under the terms of version 2 of the GNU General Public License as
7
 * published by the Free Software Foundation.
8
 *
9
 * In addition, for the avoidance of any doubt, permission is granted to
10
 * link this program with OpenSSL and to (re)distribute the binaries
11
 * produced as the result of such linking.
12
 *
13
 * This program is distributed in the hope that it will be useful,
14
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
 * GNU General Public License for more details.
17
 *
18
 * You should have received a copy of the GNU General Public License
19
 * along with this program; if not, write to the Free Software
20
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21
 */
22
#pragma once
23
#include <memory>
24
#include <optional>
25

26
#include "misc.hh"
27

28
/* g++ defines __SANITIZE_THREAD__
29
   clang++ supports the nice __has_feature(thread_sanitizer),
30
   let's merge them */
31
#if defined(__has_feature)
32
#if __has_feature(thread_sanitizer)
33
#define __SANITIZE_THREAD__ 1
34
#endif
35
#endif
36

37
#if __SANITIZE_THREAD__
38
#if defined __has_include
39
#if __has_include(<sanitizer/tsan_interface.h>)
40
#include <sanitizer/tsan_interface.h>
41
#else /* __has_include(<sanitizer/tsan_interface.h>) */
42
extern "C" void __tsan_acquire(void* addr);
43
extern "C" void __tsan_release(void* addr);
44
#endif /* __has_include(<sanitizer/tsan_interface.h>) */
45
#else /* defined __has_include */
46
extern "C" void __tsan_acquire(void* addr);
47
extern "C" void __tsan_release(void* addr);
48
#endif /* defined __has_include */
49
#endif /* __SANITIZE_THREAD__ */
50

51
namespace pdns
52
{
53
namespace channel
54
{
55
  /* Selects whether the sending end of an object queue is switched to
     non-blocking mode when the queue is created by createObjectQueue(). */
  enum class SenderBlockingMode
  {
    SenderNonBlocking,
    SenderBlocking,
  };
60
  /* Selects whether the receiving end of an object queue is switched to
     non-blocking mode when the queue is created by createObjectQueue(). */
  enum class ReceiverBlockingMode
  {
    ReceiverNonBlocking,
    ReceiverBlocking,
  };
65

66
  /**
67
   * The sender's end of a channel used to pass objects between threads.
68
   *
69
   * A sender can be used by several threads in a safe way.
70
   */
71
  template <typename T, typename D = std::default_delete<T>>
72
  class Sender
73
  {
74
  public:
75
    Sender() = default;
11,816✔
76
    Sender(FDWrapper&& descriptor) :
77
      d_fd(std::move(descriptor))
12,139✔
78
    {
12,620✔
79
    }
12,620✔
80
    Sender(const Sender&) = delete;
81
    Sender& operator=(const Sender&) = delete;
82
    Sender(Sender&&) = default;
25,222✔
83
    Sender& operator=(Sender&&) = default;
11,801✔
84
    ~Sender() = default;
33,636✔
85
    /**
86
     * \brief Try to send the supplied object to the other end of that channel. Might block if the channel was created in blocking mode.
87
     *
88
     * \return True if the object was properly sent, False if the channel is full.
89
     *
90
     * \throw runtime_error if the channel is broken, for example if the other end has been closed.
91
     */
92
    bool send(std::unique_ptr<T, D>&&) const;
93
    void close();
94

95
  private:
96
    FDWrapper d_fd;
97
  };
98

99
  /**
100
   * The receiver's end of a channel used to pass objects between threads.
101
   *
102
   * A receiver can be used by several threads in a safe way, but in that case spurious wake up might happen.
103
   */
104
  template <typename T, typename D = std::default_delete<T>>
105
  class Receiver
106
  {
107
  public:
108
    Receiver() = default;
10,991✔
109
    Receiver(FDWrapper&& descriptor, bool throwOnEOF = true) :
110
      d_fd(std::move(descriptor)), d_throwOnEOF(throwOnEOF)
12,139✔
111
    {
12,620✔
112
    }
12,620✔
113
    Receiver(const Receiver&) = delete;
114
    Receiver& operator=(const Receiver&) = delete;
115
    Receiver(Receiver&&) = default;
25,639✔
116
    Receiver& operator=(Receiver&&) = default;
11,213✔
117
    ~Receiver() = default;
25,770✔
118
    /**
119
     * \brief Try to read an object sent by the other end of that channel. Might block if the channel was created in blocking mode.
120
     *
121
     * \return An object if one was available, and std::nullopt otherwise.
122
     *
123
     * \throw runtime_error if the channel is broken, for example if the other end has been closed.
124
     */
125
    std::optional<std::unique_ptr<T, D>> receive();
126
    std::optional<std::unique_ptr<T, D>> receive(D deleter);
127

128
    /**
129
     * \brief Get a descriptor that can be used with an I/O multiplexer to wait for an object to become available.
130
     *
131
     * \return A valid descriptor or -1 if the Receiver was not properly initialized.
132
     */
133
    int getDescriptor() const
134
    {
2,205,813✔
135
      return d_fd.getHandle();
2,205,813✔
136
    }
2,205,813✔
137
    /**
138
     * \brief Whether the remote end has closed the channel.
139
     */
140
    bool isClosed() const
141
    {
15✔
142
      return d_closed;
15✔
143
    }
15✔
144

145
  private:
146
    FDWrapper d_fd;
147
    bool d_closed{false};
148
    bool d_throwOnEOF{true};
149
  };
150

151
  /**
152
   * \brief Create a channel to pass objects between threads, accepting multiple senders and receivers.
153
   *
154
   * \return A pair of Sender and Receiver objects.
155
   *
156
   * \throw runtime_error if the channel creation failed.
157
   */
158
  template <typename T, typename D = std::default_delete<T>>
159
  std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(SenderBlockingMode senderBlockingMode = SenderBlockingMode::SenderNonBlocking, ReceiverBlockingMode receiverBlockingMode = ReceiverBlockingMode::ReceiverNonBlocking, size_t pipeBufferSize = 0, bool throwOnEOF = true);
160

161
  /**
162
   * The notifier's end of a channel used to communicate between threads.
163
   *
164
   * A notifier can be used by several threads in a safe way.
165
   */
166
  class Notifier
167
  {
168
  public:
169
    Notifier() = default;
408✔
170
    Notifier(FDWrapper&&);
171
    Notifier(const Notifier&) = delete;
172
    Notifier& operator=(const Notifier&) = delete;
173
    Notifier(Notifier&&) = default;
411✔
174
    Notifier& operator=(Notifier&&) = default;
408✔
175
    ~Notifier() = default;
836✔
176

177
    /**
178
     * \brief Queue a notification to wake up the other end of the channel.
179
     *
180
     * \return True if the notification was properly sent, False if the channel is full.
181
     *
182
     * \throw runtime_error if the channel is broken, for example if the other end has been closed.
183
     */
184
    bool notify() const;
185

186
  private:
187
    FDWrapper d_fd;
188
  };
189

190
  /**
191
   * The waiter's end of a channel used to communicate between threads.
192
   *
193
   * A waiter can be used by several threads in a safe way, but in that case spurious wake up might happen.
194
   */
195
  class Waiter
196
  {
197
  public:
198
    Waiter() = default;
402✔
199
    Waiter(FDWrapper&&, bool throwOnEOF = true);
200
    Waiter(const Waiter&) = delete;
201
    Waiter& operator=(const Waiter&) = delete;
202
    Waiter(Waiter&&) = default;
411✔
203
    Waiter& operator=(Waiter&&) = default;
402✔
204
    ~Waiter() = default;
830✔
205

206
    /**
207
     * \brief Clear all notifications queued on that channel, if any.
208
     */
209
    void clear();
210
    /**
211
     * \brief Get a descriptor that can be used with an I/O multiplexer to wait for a notification to arrive.
212
     *
213
     * \return A valid descriptor or -1 if the Waiter was not properly initialized.
214
     */
215
    int getDescriptor() const;
216
    /**
217
     * \brief Whether the remote end has closed the channel.
218
     */
219
    bool isClosed() const
220
    {
12✔
221
      return d_closed;
12✔
222
    }
12✔
223

224
  private:
225
    FDWrapper d_fd;
226
    bool d_closed{false};
227
    bool d_throwOnEOF{true};
228
  };
229

230
  /**
231
   * \brief Create a channel to notify one thread from another one, accepting multiple senders and receivers.
232
   *
233
   * \return A pair of Notifier and Waiter objects.
234
   *
235
   * \throw runtime_error if the channel creation failed.
236
   */
237
  std::pair<Notifier, Waiter> createNotificationQueue(bool nonBlocking = true, size_t pipeBufferSize = 0, bool throwOnEOF = true);
238

239
  template <typename T, typename D>
240
  bool Sender<T, D>::send(std::unique_ptr<T, D>&& object) const
241
  {
2,140,300✔
242
    /* we cannot touch the initial unique pointer after writing to the pipe,
243
       not even to release it, so let's transfer it to a local object */
244
    auto localObj = std::move(object);
2,140,300✔
245
    auto ptr = localObj.get();
2,140,300✔
246
    static_assert(sizeof(ptr) <= PIPE_BUF, "Writes up to PIPE_BUF are guaranted not to interleaved and to either fully succeed or fail");
2,140,300✔
247
    while (true) {
2,140,300✔
248
#if __SANITIZE_THREAD__
249
      __tsan_release(ptr);
250
#endif /* __SANITIZE_THREAD__ */
251
      ssize_t sent = write(d_fd.getHandle(), &ptr, sizeof(ptr));
2,140,300✔
252

253
      if (sent == sizeof(ptr)) {
2,140,300!
254
        // coverity[leaked_storage]
255
        localObj.release();
2,140,297✔
256
        return true;
2,140,297✔
257
      }
2,140,297✔
258
      else if (sent == 0) {
3!
259
#if __SANITIZE_THREAD__
260
        __tsan_acquire(ptr);
261
#endif /* __SANITIZE_THREAD__ */
262
        throw std::runtime_error("Unable to write to channel: remote end has been closed");
×
263
      }
×
264
      else {
3✔
265
#if __SANITIZE_THREAD__
266
        __tsan_acquire(ptr);
267
#endif /* __SANITIZE_THREAD__ */
268
        if (errno == EINTR) {
3!
269
          continue;
×
270
        }
×
271
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
3!
272
          object = std::move(localObj);
3✔
273
          return false;
3✔
274
        }
3✔
275
        else {
×
276
          throw std::runtime_error("Unable to write to channel:" + stringerror());
×
277
        }
×
278
      }
3✔
279
    }
2,140,300✔
280
  }
2,140,300✔
281

282
  template <typename T, typename D>
283
  void Sender<T, D>::close()
284
  {
12✔
285
    d_fd.reset();
12✔
286
  }
12✔
287

288
  template <typename T, typename D>
289
  std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive()
290
  {
2,140,964✔
291
    return receive(D());
2,140,964✔
292
  }
2,140,964✔
293

294
  template <typename T, typename D>
295
  std::optional<std::unique_ptr<T, D>> Receiver<T, D>::receive(D deleter)
296
  {
2,140,977✔
297
    while (true) {
2,140,977✔
298
      std::optional<std::unique_ptr<T, D>> result;
2,140,971✔
299
      T* objPtr{nullptr};
2,140,971✔
300
      ssize_t got = read(d_fd.getHandle(), &objPtr, sizeof(objPtr));
2,140,971✔
301
      if (got == sizeof(objPtr)) {
2,140,971✔
302
#if __SANITIZE_THREAD__
303
        __tsan_acquire(objPtr);
304
#endif /* __SANITIZE_THREAD__ */
305
        return std::unique_ptr<T, D>(objPtr, deleter);
2,140,297✔
306
      }
2,140,297✔
307
      else if (got == 0) {
674!
308
        d_closed = true;
12✔
309
        if (!d_throwOnEOF) {
12!
310
          return result;
9✔
311
        }
9✔
312
        throw std::runtime_error("EOF while reading from Channel receiver");
3✔
313
      }
12✔
314
      else if (got == -1) {
662!
315
        if (errno == EINTR) {
178!
316
          continue;
×
317
        }
×
318
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
178!
319
          return result;
178✔
320
        }
178✔
321
        throw std::runtime_error("Error while reading from Channel receiver: " + stringerror());
×
322
      }
178✔
323
      else {
484✔
324
        throw std::runtime_error("Partial read from Channel receiver");
484✔
325
      }
484✔
326
    }
2,140,971✔
327
  }
2,140,977✔
328

329
  template <typename T, typename D>
330
  std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(SenderBlockingMode senderBlockingMode, ReceiverBlockingMode receiverBlockingMode, size_t pipeBufferSize, bool throwOnEOF)
331
  {
12,620✔
332
    int fds[2] = {-1, -1};
12,620✔
333
    if (pipe(fds) < 0) {
12,620!
334
      throw std::runtime_error("Error creating channel pipe: " + stringerror());
×
335
    }
×
336

337
    FDWrapper sender(fds[1]);
12,620✔
338
    FDWrapper receiver(fds[0]);
12,620✔
339
    if (receiverBlockingMode == ReceiverBlockingMode::ReceiverNonBlocking && !setNonBlocking(receiver.getHandle())) {
12,620!
340
      int err = errno;
×
341
      throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err));
×
342
    }
×
343

344
    if (senderBlockingMode == SenderBlockingMode::SenderNonBlocking && !setNonBlocking(sender.getHandle())) {
12,620!
345
      int err = errno;
×
346
      throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err));
×
347
    }
×
348

349
    if (pipeBufferSize > 0 && getPipeBufferSize(receiver.getHandle()) < pipeBufferSize) {
12,620!
UNCOV
350
      setPipeBufferSize(receiver.getHandle(), pipeBufferSize);
11,730✔
UNCOV
351
    }
11,730✔
352

353
    return {Sender<T, D>(std::move(sender)), Receiver<T, D>(std::move(receiver), throwOnEOF)};
12,620✔
354
  }
12,620✔
355
}
356
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc