• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PowerDNS / pdns / 19741624072

27 Nov 2025 03:45PM UTC coverage: 73.086% (+0.02%) from 73.065%
19741624072

Pull #16570

github

web-flow
Merge 08a2cdb1d into f94a3f63f
Pull Request #16570: rec: rewrite all unwrap calls in web.rs

38523 of 63408 branches covered (60.75%)

Branch coverage included in aggregate %.

128044 of 164496 relevant lines covered (77.84%)

6531485.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.78
/pdns/remote_logger.cc
1
#include <unistd.h>
2
#include "threadname.hh"
3
#include "remote_logger.hh"
4
#include <sys/uio.h>
5
#ifdef HAVE_CONFIG_H
6
#include "config.h"
7
#endif
8
#ifdef RECURSOR
9
#include "logger.hh"
10
#else
11
#include "dolog.hh"
12
#endif
13
#include "logging.hh"
14

15
bool CircularWriteBuffer::hasRoomFor(const std::string& str) const
16
{
856✔
17
  if (d_buffer.size() + 2 + str.size() > d_buffer.capacity()) {
856!
18
    return false;
×
19
  }
×
20

21
  return true;
856✔
22
}
856✔
23

24
bool CircularWriteBuffer::write(const std::string& str)
25
{
428✔
26
  if (str.size() > std::numeric_limits<uint16_t>::max() || !hasRoomFor(str)) {
428!
27
    return false;
×
28
  }
×
29

30
  uint16_t len = htons(str.size());
428✔
31
  const char* ptr = reinterpret_cast<const char*>(&len);
428✔
32
  d_buffer.insert(d_buffer.end(), ptr, ptr + 2);
428✔
33
  d_buffer.insert(d_buffer.end(), str.begin(), str.end());
428✔
34

35
  return true;
428✔
36
}
428✔
37

38
bool CircularWriteBuffer::flush(int fd)
39
{
981✔
40
  if (d_buffer.empty()) {
981✔
41
    // not optional, we report EOF otherwise
42
    return false;
577✔
43
  }
577✔
44

45
  auto arr1 = d_buffer.array_one();
404✔
46
  auto arr2 = d_buffer.array_two();
404✔
47

48
  struct iovec iov[2];
404✔
49
  int pos = 0;
404✔
50
  for(const auto& arr : {arr1, arr2}) {
808✔
51
    if(arr.second) {
808✔
52
      iov[pos].iov_base = arr.first;
408✔
53
      iov[pos].iov_len = arr.second;
408✔
54
      ++pos;
408✔
55
    }
408✔
56
  }
808✔
57

58
  ssize_t res = 0;
404✔
59
  do {
404✔
60
    res = writev(fd, iov, pos);
404✔
61

62
    if (res < 0) {
404!
63
      if (errno == EINTR) {
×
64
        continue;
×
65
      }
×
66

67
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
×
68
        return false;
×
69
      }
×
70

71
      /* we can't be sure we haven't sent a partial message,
72
         and we don't want to send the remaining part after reconnecting */
73
      d_buffer.clear();
×
74
      throw std::runtime_error("Couldn't flush a thing: " + stringerror());
×
75
    }
×
76
    else if (!res) {
404!
77
      /* we can't be sure we haven't sent a partial message,
78
         and we don't want to send the remaining part after reconnecting */
79
      d_buffer.clear();
×
80
      throw std::runtime_error("EOF");
×
81
    }
×
82
  }
404✔
83
  while (res < 0);
404!
84

85
  if (static_cast<size_t>(res) == d_buffer.size()) {
404✔
86
    d_buffer.clear();
403✔
87
  }
403✔
88
  else {
1✔
89
    while (res--) {
1!
90
      d_buffer.pop_front();
×
91
    }
×
92
  }
1✔
93

94
  return true;
404✔
95
}
404✔
96

97
/* Map a Result to a human-readable description.
   The numeric value of `r` is used as an index into a fixed table; any value
   of 4 or above yields the last entry, "?". The returned reference points to
   a function-local static and stays valid for the life of the program. */
const std::string& RemoteLoggerInterface::toErrorString(Result r)
{
  static const std::array<std::string,5> messages = {
    "Queued",
    "Queue full, dropping",
    "Not sending too large protobuf message",
    "Submiting to queue failed",
    "?"
  };

  const auto index = static_cast<unsigned int>(r);
  if (index >= 4U) {
    return messages[4];
  }
  return messages[index];
}
109

110
/* Set up a remote logger for `remote` and start its maintenance thread.

   `timeout` is the value later handed to the socket's connect() call,
   `maxQueuedBytes` sizes the write buffer, and `reconnectWaitTime` is the
   number of seconds the maintenance thread sleeps between two rounds.
   Unless `asyncConnect` is set, a first connection attempt is made right
   here; its outcome is not checked. */
RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect) :
  d_remote(remote),
  d_timeout(timeout),
  d_reconnectWaitTime(reconnectWaitTime),
  d_asyncConnect(asyncConnect),
  d_runtime({CircularWriteBuffer(maxQueuedBytes), nullptr})
{
  if (!d_asyncConnect) {
    reconnect();
  }

  /* started last, the thread uses the members initialized above */
  d_thread = std::thread(&RemoteLogger::maintenanceThread, this);
}
118

119
bool RemoteLogger::reconnect()
120
{
251✔
121
  try {
251✔
122
    auto newSock = make_unique<Socket>(d_remote.sin4.sin_family, SOCK_STREAM, 0);
251✔
123
    newSock->setNonBlocking();
251✔
124
    newSock->connect(d_remote, d_timeout);
251✔
125

126
    {
251✔
127
      /* we are now successfully connected, time to take the lock and update the
128
         socket */
129
      auto runtime = d_runtime.lock();
251✔
130
      runtime->d_socket = std::move(newSock);
251✔
131
    }
251✔
132
  }
251✔
133
  catch (const std::exception& e) {
251✔
134
#ifdef RECURSOR
135
    SLOG(g_log<<Logger::Warning<<"Error connecting to remote logger "<<d_remote.toStringWithPort()<<": "<<e.what()<<std::endl,
136
         g_slog->withName("protobuf")->error(Logr::Error, e.what(), "Exception while connecting to remote logger", "address", Logging::Loggable(d_remote)));
137
#else
138
    warnlog("Error connecting to remote logger %s: %s", d_remote.toStringWithPort(), e.what());
139
#endif
140

141
    return false;
×
142
  }
×
143
  return true;
251✔
144
}
251✔
145

146
/* Queue `data` for delivery to the remote end.

   Returns:
   - Result::TooLarge when `data` does not fit in a 16-bit length;
   - Result::PipeFull when the buffer has no room left and flushing could not
     free enough of it (or there is no socket to flush to);
   - Result::OtherError when flushing raised an exception, in which case the
     socket is dropped;
   - Result::Queued when the message was stored.
   The matching counter in d_stats is incremented in every case.

   The runtime state stays locked for the whole call. */
RemoteLoggerInterface::Result RemoteLogger::queueData(const std::string& data)
{
  auto runtime = d_runtime.lock();

  if (data.size() > std::numeric_limits<uint16_t>::max()) {
    ++runtime->d_stats.d_tooLarge;
    return Result::TooLarge;
  }

  if (!runtime->d_writer.hasRoomFor(data)) {
    /* not connected, queue is full, just drop */
    if (!runtime->d_socket) {
      ++runtime->d_stats.d_pipeFull;
      return Result::PipeFull;
    }
    try {
      /* we try to flush some data */
      if (!runtime->d_writer.flush(runtime->d_socket->getHandle())) {
        /* but failed, let's just drop */
        ++runtime->d_stats.d_pipeFull;
        return Result::PipeFull;
      }

      /* see if we freed enough data */
      if (!runtime->d_writer.hasRoomFor(data)) {
        /* we didn't */
        ++runtime->d_stats.d_pipeFull;
        return Result::PipeFull;
      }
    }
    catch(const std::exception& e) {
      /* the write failed: drop the socket so that the next check finds it unset */
      runtime->d_socket.reset();
      ++runtime->d_stats.d_otherError;
      return Result::OtherError;
    }
  }

  /* both conditions write() checks (size limit, available room) have been
     verified above, so its return value is not inspected */
  runtime->d_writer.write(data);
#ifdef RECURSOR
  extern bool g_regressionTestMode;
  /* In regression test mode the data is pushed out right away. The socket
     may be unset at this point (asynchronous connect, failed connection
     attempt, or reset after a write error), so it must be checked before
     being dereferenced. Note that flush() may still throw from here. */
  if (g_regressionTestMode && runtime->d_socket) {
    runtime->d_writer.flush(runtime->d_socket->getHandle());
  }
#endif
  ++runtime->d_stats.d_queued;
  return Result::Queued;
}
194

195
void RemoteLogger::maintenanceThread()
196
{
251✔
197
  try {
251✔
198
#ifdef RECURSOR
220✔
199
    string threadName = "rec/remlog";
220✔
200
#else
201
    string threadName = "dnsdist/remLog";
31✔
202
#endif
31✔
203
    setThreadName(threadName);
251✔
204

205
    for (;;) {
867✔
206
      if (d_exiting) {
867✔
207
        break;
220✔
208
      }
220✔
209

210
      bool connected = true;
647✔
211
      if (d_runtime.lock()->d_socket == nullptr) {
647!
212
        // if it was unset, it will remain so, we are the only ones setting it!
213
        connected = reconnect();
×
214
      }
×
215

216
      /* we will just go to sleep if the reconnection just failed */
217
      if (connected) {
647✔
218
        try {
646✔
219
          /* we don't want to take the lock while trying to reconnect */
220
          auto runtime = d_runtime.lock();
646✔
221
          if (runtime->d_socket) { // check if it is set
647✔
222
            /* if flush() returns false, it means that we couldn't flush anything yet
223
               either because there is nothing to flush, or because the outgoing TCP
224
               buffer is full. That's fine by us */
225
            runtime->d_writer.flush(runtime->d_socket->getHandle());
647✔
226
          }
647✔
227
          else {
2,147,483,647✔
228
            connected = false;
2,147,483,647✔
229
          }
2,147,483,647✔
230
        }
646✔
231
        catch (const std::exception& e) {
646✔
232
          d_runtime.lock()->d_socket.reset();
×
233
          connected = false;
×
234
        }
×
235

236
        if (!connected) {
646!
237
          /* let's try to reconnect right away, we are about to sleep anyway */
238
          reconnect();
×
239
        }
×
240
      }
645✔
241

242
      sleep(d_reconnectWaitTime);
647✔
243
    }
646✔
244
  }
251✔
245
  catch (const std::exception& e)
251✔
246
  {
251✔
247
    SLOG(cerr << "Remote Logger's maintenance thread died on: " << e.what() << endl,
×
248
         g_slog->withName("protobuf")->error(Logr::Error, e.what(), "Remote Logger's maintenance thread died"));
×
249
  }
×
250
  catch (...) {
251✔
251
    SLOG(cerr << "Remote Logger's maintenance thread died on unknown exception" << endl,
×
252
         g_slog->withName("protobuf")->info(Logr::Error, "Remote Logger's maintenance thread died"));
×
253
  }
×
254
}
251✔
255

256
/* Ask the maintenance thread to stop, then wait for it to finish.
   The thread only looks at d_exiting at the top of each round, so this may
   block until its current sleep() is over. */
RemoteLogger::~RemoteLogger()
{
  d_exiting = true;

  /* join() throws on a thread that is not joinable, and an exception leaving
     a destructor terminates the process, hence the check */
  if (d_thread.joinable()) {
    d_thread.join();
  }
}
262

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc