• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PowerDNS / pdns / 16615247828

30 Jul 2025 06:33AM UTC coverage: 65.845% (-0.03%) from 65.87%
16615247828

Pull #15942

github

web-flow
Merge 3e4243857 into 4a7b6a621
Pull Request #15942: Optimize reload-zones logic to reduce thread scheduling times

42051 of 92438 branches covered (45.49%)

Branch coverage included in aggregate %.

4 of 4 new or added lines in 1 file covered. (100.0%)

48 existing lines in 12 files now uncovered.

127942 of 165732 relevant lines covered (77.2%)

5529729.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.36
/pdns/remote_logger.cc
1
#include <unistd.h>
2
#include "threadname.hh"
3
#include "remote_logger.hh"
4
#include <sys/uio.h>
5
#ifdef HAVE_CONFIG_H
6
#include "config.h"
7
#endif
8
#ifdef RECURSOR
9
#include "logger.hh"
10
#else
11
#include "dolog.hh"
12
#endif
13
#include "logging.hh"
14

15
bool CircularWriteBuffer::hasRoomFor(const std::string& str) const
16
{
496✔
17
  if (d_buffer.size() + 2 + str.size() > d_buffer.capacity()) {
496!
18
    return false;
×
19
  }
×
20

21
  return true;
496✔
22
}
496✔
23

24
bool CircularWriteBuffer::write(const std::string& str)
25
{
248✔
26
  if (str.size() > std::numeric_limits<uint16_t>::max() || !hasRoomFor(str)) {
248!
27
    return false;
×
28
  }
×
29

30
  uint16_t len = htons(str.size());
248✔
31
  const char* ptr = reinterpret_cast<const char*>(&len);
248✔
32
  d_buffer.insert(d_buffer.end(), ptr, ptr + 2);
248✔
33
  d_buffer.insert(d_buffer.end(), str.begin(), str.end());
248✔
34

35
  return true;
248✔
36
}
248✔
37

38
bool CircularWriteBuffer::flush(int fd)
39
{
871✔
40
  if (d_buffer.empty()) {
871✔
41
    // not optional, we report EOF otherwise
42
    return false;
753✔
43
  }
753✔
44

45
  auto arr1 = d_buffer.array_one();
118✔
46
  auto arr2 = d_buffer.array_two();
118✔
47

48
  struct iovec iov[2];
118✔
49
  int pos = 0;
118✔
50
  for(const auto& arr : {arr1, arr2}) {
236✔
51
    if(arr.second) {
236✔
52
      iov[pos].iov_base = arr.first;
118✔
53
      iov[pos].iov_len = arr.second;
118✔
54
      ++pos;
118✔
55
    }
118✔
56
  }
236✔
57

58
  ssize_t res = 0;
118✔
59
  do {
118✔
60
    res = writev(fd, iov, pos);
118✔
61

62
    if (res < 0) {
118!
63
      if (errno == EINTR) {
×
64
        continue;
×
65
      }
×
66

67
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
×
68
        return false;
×
69
      }
×
70

71
      /* we can't be sure we haven't sent a partial message,
72
         and we don't want to send the remaining part after reconnecting */
73
      d_buffer.clear();
×
74
      throw std::runtime_error("Couldn't flush a thing: " + stringerror());
×
75
    }
×
76
    else if (!res) {
118!
77
      /* we can't be sure we haven't sent a partial message,
78
         and we don't want to send the remaining part after reconnecting */
79
      d_buffer.clear();
×
80
      throw std::runtime_error("EOF");
×
81
    }
×
82
  }
118✔
83
  while (res < 0);
118!
84

85
  if (static_cast<size_t>(res) == d_buffer.size()) {
118!
86
    d_buffer.clear();
118✔
87
  }
118✔
UNCOV
88
  else {
×
UNCOV
89
    while (res--) {
×
90
      d_buffer.pop_front();
×
91
    }
×
UNCOV
92
  }
×
93

94
  return true;
118✔
95
}
118✔
96

97
/* Returns a human-readable description of the result 'r'.
   The numerical value of 'r' is used as an index into the table below, and
   any value past the fourth entry yields "?".
   NOTE(review): the table order presumably mirrors the declaration order of
   the Result enumerators, which is not visible from this file -- confirm. */
const std::string& RemoteLoggerInterface::toErrorString(Result r)
{
  static const std::array<std::string,5> s_descriptions = {
    "Queued",
    "Queue full, dropping",
    "Not sending too large protobuf message",
    "Submiting to queue failed",
    "?"
  };
  constexpr unsigned int fallback = 4U;

  const auto idx = static_cast<unsigned int>(r);
  return s_descriptions[idx < fallback ? idx : fallback];
}
×
109

110
/* Builds a logger sending its data to 'remote'.
   'timeout' is passed to the socket connect() call, 'maxQueuedBytes' sizes
   the write buffer and 'reconnectWaitTime' is the number of seconds the
   maintenance thread sleeps between two iterations.
   Unless 'asyncConnect' is set, a first connection attempt is made right
   away, before the maintenance thread is started. */
RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect):
  d_remote(remote),
  d_timeout(timeout),
  d_reconnectWaitTime(reconnectWaitTime),
  d_asyncConnect(asyncConnect),
  d_runtime({CircularWriteBuffer(maxQueuedBytes), nullptr})
{
  if (!d_asyncConnect) {
    /* the result is ignored on purpose: the maintenance thread retries
       as long as no socket is set */
    reconnect();
  }

  d_thread = std::thread(&RemoteLogger::maintenanceThread, this);
}
239✔
118

119
bool RemoteLogger::reconnect()
120
{
239✔
121
  try {
239✔
122
    auto newSock = make_unique<Socket>(d_remote.sin4.sin_family, SOCK_STREAM, 0);
239✔
123
    newSock->setNonBlocking();
239✔
124
    newSock->connect(d_remote, d_timeout);
239✔
125

126
    {
239✔
127
      /* we are now successfully connected, time to take the lock and update the
128
         socket */
129
      auto runtime = d_runtime.lock();
239✔
130
      runtime->d_socket = std::move(newSock);
239✔
131
    }
239✔
132
  }
239✔
133
  catch (const std::exception& e) {
239✔
134
#ifdef RECURSOR
135
    SLOG(g_log<<Logger::Warning<<"Error connecting to remote logger "<<d_remote.toStringWithPort()<<": "<<e.what()<<std::endl,
136
         g_slog->withName("protobuf")->error(Logr::Error, e.what(), "Exception while connecting to remote logger", "address", Logging::Loggable(d_remote)));
137
#else
138
    warnlog("Error connecting to remote logger %s: %s", d_remote.toStringWithPort(), e.what());
139
#endif
140

141
    return false;
×
142
  }
×
143
  return true;
239✔
144
}
239✔
145

146
/* Queues 'data' for transmission and updates the matching counter.
   The runtime lock is held for the whole call.
   Returns:
   - Result::TooLarge when 'data' is longer than what a uint16_t can describe
   - Result::PipeFull when there is no room for it, even after trying to
     flush the pending data to the socket
   - Result::OtherError when that flush attempt threw, in which case the
     socket is dropped
   - Result::Queued otherwise */
RemoteLoggerInterface::Result RemoteLogger::queueData(const std::string& data)
{
  auto runtime = d_runtime.lock();
  auto& stats = runtime->d_stats;
  auto& writer = runtime->d_writer;

  if (data.size() > std::numeric_limits<uint16_t>::max()) {
    ++stats.d_tooLarge;
    return Result::TooLarge;
  }

  if (!writer.hasRoomFor(data)) {
    if (!runtime->d_socket) {
      /* not connected, queue is full, just drop */
      ++stats.d_pipeFull;
      return Result::PipeFull;
    }

    try {
      /* we try to flush some data, then see if we freed enough room;
         if either step falls short, let's just drop */
      const bool flushed = writer.flush(runtime->d_socket->getHandle());
      if (!flushed || !writer.hasRoomFor(data)) {
        ++stats.d_pipeFull;
        return Result::PipeFull;
      }
    }
    catch (const std::exception&) {
      /* writing to the socket failed, get rid of it */
      runtime->d_socket.reset();
      ++stats.d_otherError;
      return Result::OtherError;
    }
  }

  writer.write(data);
  ++stats.d_queued;
  return Result::Queued;
}
248✔
188

189
void RemoteLogger::maintenanceThread() 
190
{
239✔
191
  try {
239✔
192
#ifdef RECURSOR
220✔
193
    string threadName = "rec/remlog";
220✔
194
#else
195
    string threadName = "dnsdist/remLog";
19✔
196
#endif
19✔
197
    setThreadName(threadName);
239✔
198

199
    for (;;) {
1,091✔
200
      if (d_exiting) {
1,091✔
201
        break;
220✔
202
      }
220✔
203

204
      bool connected = true;
871✔
205
      if (d_runtime.lock()->d_socket == nullptr) {
871!
206
        // if it was unset, it will remain so, we are the only ones setting it!
207
        connected = reconnect();
×
208
      }
×
209

210
      /* we will just go to sleep if the reconnection just failed */
211
      if (connected) {
872✔
212
        try {
872✔
213
          /* we don't want to take the lock while trying to reconnect */
214
          auto runtime = d_runtime.lock();
872✔
215
          if (runtime->d_socket) { // check if it is set
872!
216
            /* if flush() returns false, it means that we couldn't flush anything yet
217
               either because there is nothing to flush, or because the outgoing TCP
218
               buffer is full. That's fine by us */
219
            runtime->d_writer.flush(runtime->d_socket->getHandle());
872✔
220
          }
872✔
UNCOV
221
          else {
×
UNCOV
222
            connected = false;
×
UNCOV
223
          }
×
224
        }
872✔
225
        catch (const std::exception& e) {
872✔
226
          d_runtime.lock()->d_socket.reset();
×
227
          connected = false;
×
228
        }
×
229

230
        if (!connected) {
872!
231
          /* let's try to reconnect right away, we are about to sleep anyway */
232
          reconnect();
×
233
        }
×
234
      }
870✔
235

236
      sleep(d_reconnectWaitTime);
871✔
237
    }
869✔
238
  }
239✔
239
  catch (const std::exception& e)
239✔
240
  {
239✔
241
    SLOG(cerr << "Remote Logger's maintenance thread died on: " << e.what() << endl,
×
242
         g_slog->withName("protobuf")->error(Logr::Error, e.what(), "Remote Logger's maintenance thread died"));
×
243
  }
×
244
  catch (...) {
239✔
245
    SLOG(cerr << "Remote Logger's maintenance thread died on unknown exception" << endl,
×
246
         g_slog->withName("protobuf")->info(Logr::Error, "Remote Logger's maintenance thread died"));
×
247
  }
×
248
}
239✔
249

250
/* Stops the maintenance thread and waits for it to finish.
   The thread only looks at d_exiting at the start of each iteration and
   sleeps for d_reconnectWaitTime seconds at the end of it, so this can block
   for about that long. */
RemoteLogger::~RemoteLogger()
{
  /* ask the maintenance thread to leave its loop */
  d_exiting = true;

  /* and wait until it has done so */
  d_thread.join();
}
220✔
256

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc