• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PowerDNS / pdns / 18554073066

16 Oct 2025 07:43AM UTC coverage: 55.336% (-0.01%) from 55.346%
18554073066

Pull #16270

github

web-flow
Merge 6b7c2bd00 into 8f33ac184
Pull Request #16270: dnsdist-2.0.x: Backport 15267: Fix the build-packages workflow

18455 of 57264 branches covered (32.23%)

Branch coverage included in aggregate %.

51954 of 69975 relevant lines covered (74.25%)

2390307.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.89
/pdns/remote_logger.cc
1
#include <unistd.h>
2
#include "threadname.hh"
3
#include "remote_logger.hh"
4
#include <sys/uio.h>
5
#ifdef HAVE_CONFIG_H
6
#include "config.h"
7
#endif
8
#ifdef RECURSOR
9
#include "logger.hh"
10
#else
11
#include "dolog.hh"
12
#endif
13
#include "logging.hh"
14

15
bool CircularWriteBuffer::hasRoomFor(const std::string& str) const
16
{
108✔
17
  if (d_buffer.size() + 2 + str.size() > d_buffer.capacity()) {
108!
18
    return false;
×
19
  }
×
20

21
  return true;
108✔
22
}
108✔
23

24
bool CircularWriteBuffer::write(const std::string& str)
25
{
54✔
26
  if (str.size() > std::numeric_limits<uint16_t>::max() || !hasRoomFor(str)) {
54!
27
    return false;
×
28
  }
×
29

30
  uint16_t len = htons(str.size());
54✔
31
  const char* ptr = reinterpret_cast<const char*>(&len);
54✔
32
  d_buffer.insert(d_buffer.end(), ptr, ptr + 2);
54✔
33
  d_buffer.insert(d_buffer.end(), str.begin(), str.end());
54✔
34

35
  return true;
54✔
36
}
54✔
37

38
bool CircularWriteBuffer::flush(int fd)
39
{
91✔
40
  if (d_buffer.empty()) {
91✔
41
    // not optional, we report EOF otherwise
42
    return false;
60✔
43
  }
60✔
44

45
  auto arr1 = d_buffer.array_one();
31✔
46
  auto arr2 = d_buffer.array_two();
31✔
47

48
  struct iovec iov[2];
31✔
49
  int pos = 0;
31✔
50
  for(const auto& arr : {arr1, arr2}) {
64✔
51
    if(arr.second) {
64✔
52
      iov[pos].iov_base = arr.first;
32✔
53
      iov[pos].iov_len = arr.second;
32✔
54
      ++pos;
32✔
55
    }
32✔
56
  }
64✔
57

58
  ssize_t res = 0;
31✔
59
  do {
31✔
60
    res = writev(fd, iov, pos);
31✔
61

62
    if (res < 0) {
31!
63
      if (errno == EINTR) {
×
64
        continue;
×
65
      }
×
66

67
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
×
68
        return false;
×
69
      }
×
70

71
      /* we can't be sure we haven't sent a partial message,
72
         and we don't want to send the remaining part after reconnecting */
73
      d_buffer.clear();
×
74
      throw std::runtime_error("Couldn't flush a thing: " + stringerror());
×
75
    }
×
76
    else if (!res) {
31!
77
      /* we can't be sure we haven't sent a partial message,
78
         and we don't want to send the remaining part after reconnecting */
79
      d_buffer.clear();
×
80
      throw std::runtime_error("EOF");
×
81
    }
×
82
  }
31✔
83
  while (res < 0);
31!
84

85
  if (static_cast<size_t>(res) == d_buffer.size()) {
31!
86
    d_buffer.clear();
31✔
87
  }
31✔
88
  else {
×
89
    while (res--) {
×
90
      d_buffer.pop_front();
×
91
    }
×
92
  }
×
93

94
  return true;
31✔
95
}
31✔
96

97
/* Return a human-readable description of a queueing result. The result is
   converted to its numeric value and used as an index into the table below;
   any value past the named entries yields the final "?" entry. */
const std::string& RemoteLoggerInterface::toErrorString(Result r)
{
  static const std::array<std::string,5> messages = {
    "Queued",
    "Queue full, dropping",
    "Not sending too large protobuf message",
    "Submiting to queue failed",
    "?"
  };

  const auto index = static_cast<unsigned int>(r);
  if (index >= messages.size() - 1) {
    return messages.back();
  }
  return messages[index];
}
×
109

110
/* Set up a logger sending to `remote`. The write buffer is created with
   `maxQueuedBytes` as its size argument. Unless `asyncConnect` is set, a
   first connection attempt is made right away, before the maintenance thread
   is started. */
RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect) :
  d_remote(remote),
  d_timeout(timeout),
  d_reconnectWaitTime(reconnectWaitTime),
  d_asyncConnect(asyncConnect),
  d_runtime({CircularWriteBuffer(maxQueuedBytes), nullptr})
{
  const bool connectNow = !d_asyncConnect;
  if (connectNow) {
    reconnect();
  }

  /* the thread takes care of flushing the buffer and of reconnecting */
  d_thread = std::thread(&RemoteLogger::maintenanceThread, this);
}
19✔
118

119
bool RemoteLogger::reconnect()
120
{
19✔
121
  try {
19✔
122
    auto newSock = make_unique<Socket>(d_remote.sin4.sin_family, SOCK_STREAM, 0);
19✔
123
    newSock->setNonBlocking();
19✔
124
    newSock->connect(d_remote, d_timeout);
19✔
125

126
    {
19✔
127
      /* we are now successfully connected, time to take the lock and update the
128
         socket */
129
      auto runtime = d_runtime.lock();
19✔
130
      runtime->d_socket = std::move(newSock);
19✔
131
    }
19✔
132
  }
19✔
133
  catch (const std::exception& e) {
19✔
134
#ifdef RECURSOR
135
    SLOG(g_log<<Logger::Warning<<"Error connecting to remote logger "<<d_remote.toStringWithPort()<<": "<<e.what()<<std::endl,
136
         g_slog->withName("protobuf")->error(Logr::Error, e.what(), "Exception while connecting to remote logger", "address", Logging::Loggable(d_remote)));
137
#else
138
    warnlog("Error connecting to remote logger %s: %s", d_remote.toStringWithPort(), e.what());
×
139
#endif
×
140

141
    return false;
×
142
  }
×
143
  return true;
19✔
144
}
19✔
145

146
/* Queue `data` for delivery to the remote logger, updating the matching
   statistics counter.
   Returns Result::TooLarge when the size of the message does not fit in 16
   bits, Result::PipeFull when no room could be made for it,
   Result::OtherError when flushing threw (the socket is then dropped), and
   Result::Queued otherwise. */
RemoteLoggerInterface::Result RemoteLogger::queueData(const std::string& data)
{
  auto runtime = d_runtime.lock();

  if (data.size() > std::numeric_limits<uint16_t>::max()) {
    ++runtime->d_stats.d_tooLarge;
    return Result::TooLarge;
  }

  if (!runtime->d_writer.hasRoomFor(data)) {
    if (!runtime->d_socket) {
      /* not connected and the queue is full, there is nothing we can do
         but drop the message */
      ++runtime->d_stats.d_pipeFull;
      return Result::PipeFull;
    }

    try {
      /* try to make room by flushing; drop the message if nothing could be
         flushed or if what was flushed did not free enough space */
      const int handle = runtime->d_socket->getHandle();
      if (!runtime->d_writer.flush(handle) || !runtime->d_writer.hasRoomFor(data)) {
        ++runtime->d_stats.d_pipeFull;
        return Result::PipeFull;
      }
    }
    catch (const std::exception&) {
      /* the write failed, get rid of the socket */
      runtime->d_socket.reset();
      ++runtime->d_stats.d_otherError;
      return Result::OtherError;
    }
  }

  runtime->d_writer.write(data);
  ++runtime->d_stats.d_queued;
  return Result::Queued;
}
54✔
188

189
void RemoteLogger::maintenanceThread() 
190
{
19✔
191
  try {
19✔
192
#ifdef RECURSOR
193
    string threadName = "rec/remlog";
194
#else
195
    string threadName = "dnsdist/remLog";
19✔
196
#endif
19✔
197
    setThreadName(threadName);
19✔
198

199
    for (;;) {
94✔
200
      if (d_exiting) {
94!
201
        break;
×
202
      }
×
203

204
      bool connected = true;
94✔
205
      if (d_runtime.lock()->d_socket == nullptr) {
94!
206
        // if it was unset, it will remain so, we are the only ones setting it!
207
        connected = reconnect();
×
208
      }
×
209

210
      /* we will just go to sleep if the reconnection just failed */
211
      if (connected) {
94!
212
        try {
94✔
213
          /* we don't want to take the lock while trying to reconnect */
214
          auto runtime = d_runtime.lock();
94✔
215
          if (runtime->d_socket) { // check if it is set
94!
216
            /* if flush() returns false, it means that we couldn't flush anything yet
217
               either because there is nothing to flush, or because the outgoing TCP
218
               buffer is full. That's fine by us */
219
            runtime->d_writer.flush(runtime->d_socket->getHandle());
94✔
220
          }
94✔
221
          else {
×
222
            connected = false;
×
223
          }
×
224
        }
94✔
225
        catch (const std::exception& e) {
94✔
226
          d_runtime.lock()->d_socket.reset();
×
227
          connected = false;
×
228
        }
×
229

230
        if (!connected) {
94!
231
          /* let's try to reconnect right away, we are about to sleep anyway */
232
          reconnect();
×
233
        }
×
234
      }
94✔
235

236
      sleep(d_reconnectWaitTime);
94✔
237
    }
94✔
238
  }
19✔
239
  catch (const std::exception& e)
19✔
240
  {
19✔
241
    SLOG(cerr << "Remote Logger's maintenance thread died on: " << e.what() << endl,
×
242
         g_slog->withName("protobuf")->error(Logr::Error, e.what(), "Remote Logger's maintenance thread died"));
×
243
  }
×
244
  catch (...) {
19✔
245
    SLOG(cerr << "Remote Logger's maintenance thread died on unknown exception" << endl,
×
246
         g_slog->withName("protobuf")->info(Logr::Error, "Remote Logger's maintenance thread died"));
×
247
  }
×
248
}
19✔
249

250
/* Ask the maintenance thread to stop and wait for it to finish. The thread
   only checks d_exiting once per loop iteration, after sleeping, so this can
   block for up to d_reconnectWaitTime seconds. */
RemoteLogger::~RemoteLogger()
{
  d_exiting = true;

  /* join() throws on a thread that is not joinable, and a destructor must
     not let an exception escape */
  if (d_thread.joinable()) {
    d_thread.join();
  }
}
×
256

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc