• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PowerDNS / pdns / 12321902803

13 Dec 2024 07:34PM UTC coverage: 66.359% (+1.6%) from 64.78%
12321902803

Pull #14970

github

web-flow
Merge e3a7df61c into 3dfd8e317
Pull Request #14970: boost > std optional

26084 of 54744 branches covered (47.65%)

Branch coverage included in aggregate %.

14 of 15 new or added lines in 2 files covered. (93.33%)

1863 existing lines in 52 files now uncovered.

85857 of 113946 relevant lines covered (75.35%)

4412729.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.89
/pdns/remote_logger.cc
1
#include <unistd.h>
2
#include "threadname.hh"
3
#include "remote_logger.hh"
4
#include <sys/uio.h>
5
#ifdef HAVE_CONFIG_H
6
#include "config.h"
7
#endif
8
#ifdef PDNS_CONFIG_ARGS
9
#include "logger.hh"
10
#define WE_ARE_RECURSOR
11
#else
12
#include "dolog.hh"
13
#endif
14
#include "logging.hh"
15

16
bool CircularWriteBuffer::hasRoomFor(const std::string& str) const
17
{
352✔
18
  if (d_buffer.size() + 2 + str.size() > d_buffer.capacity()) {
352!
19
    return false;
×
20
  }
×
21

22
  return true;
352✔
23
}
352✔
24

25
bool CircularWriteBuffer::write(const std::string& str)
26
{
176✔
27
  if (str.size() > std::numeric_limits<uint16_t>::max() || !hasRoomFor(str)) {
176!
28
    return false;
×
29
  }
×
30

31
  uint16_t len = htons(str.size());
176✔
32
  const char* ptr = reinterpret_cast<const char*>(&len);
176✔
33
  d_buffer.insert(d_buffer.end(), ptr, ptr + 2);
176✔
34
  d_buffer.insert(d_buffer.end(), str.begin(), str.end());
176✔
35

36
  return true;
176✔
37
}
176✔
38

39
bool CircularWriteBuffer::flush(int fd)
40
{
924✔
41
  if (d_buffer.empty()) {
924✔
42
    // not optional, we report EOF otherwise
43
    return false;
840✔
44
  }
840✔
45

46
  auto arr1 = d_buffer.array_one();
84✔
47
  auto arr2 = d_buffer.array_two();
84✔
48

49
  struct iovec iov[2];
84✔
50
  int pos = 0;
84✔
51
  for(const auto& arr : {arr1, arr2}) {
168✔
52
    if(arr.second) {
168✔
53
      iov[pos].iov_base = arr.first;
84✔
54
      iov[pos].iov_len = arr.second;
84✔
55
      ++pos;
84✔
56
    }
84✔
57
  }
168✔
58

59
  ssize_t res = 0;
84✔
60
  do {
84✔
61
    res = writev(fd, iov, pos);
84✔
62

63
    if (res < 0) {
84!
64
      if (errno == EINTR) {
×
65
        continue;
×
66
      }
×
67

68
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
×
69
        return false;
×
70
      }
×
71

72
      /* we can't be sure we haven't sent a partial message,
73
         and we don't want to send the remaining part after reconnecting */
74
      d_buffer.clear();
×
75
      throw std::runtime_error("Couldn't flush a thing: " + stringerror());
×
76
    }
×
77
    else if (!res) {
84!
78
      /* we can't be sure we haven't sent a partial message,
79
         and we don't want to send the remaining part after reconnecting */
80
      d_buffer.clear();
×
81
      throw std::runtime_error("EOF");
×
82
    }
×
83
  }
84✔
84
  while (res < 0);
84!
85

86
  if (static_cast<size_t>(res) == d_buffer.size()) {
84!
87
    d_buffer.clear();
84✔
88
  }
84✔
UNCOV
89
  else {
×
UNCOV
90
    while (res--) {
×
91
      d_buffer.pop_front();
×
92
    }
×
UNCOV
93
  }
×
94

95
  return true;
84✔
96
}
84✔
97

98
/* Maps a Result to a human-readable description.
   The numeric value of `r` is used as an index into a fixed table; any
   value beyond the known entries yields the final "?" placeholder.
   The returned reference points to a function-local static string. */
const std::string& RemoteLoggerInterface::toErrorString(Result r)
{
  static const std::array<std::string,5> descriptions = {
    "Queued",
    "Queue full, dropping",
    "Not sending too large protobuf message",
    "Submiting to queue failed",
    "?"
  };

  const auto index = static_cast<unsigned int>(r);
  if (index < descriptions.size()) {
    return descriptions[index];
  }
  /* unknown value: fall back to the last entry */
  return descriptions.back();
}
×
110

111
/* Builds a logger sending to `remote`.
   - timeout: passed to the socket connect() call made by reconnect()
   - maxQueuedBytes: size handed to the CircularWriteBuffer holding queued data
   - reconnectWaitTime: number of seconds the maintenance thread sleeps
     between two iterations
   - asyncConnect: when false, a first connection attempt is made right
     here, before the maintenance thread is started; its result is not
     checked, the maintenance thread reconnects when no socket is set. */
RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect) :
  d_remote(remote),
  d_timeout(timeout),
  d_reconnectWaitTime(reconnectWaitTime),
  d_asyncConnect(asyncConnect),
  d_runtime({CircularWriteBuffer(maxQueuedBytes), nullptr})
{
  const bool connectNow = !d_asyncConnect;
  if (connectNow) {
    reconnect();
  }

  d_thread = std::thread([this]() { maintenanceThread(); });
}
220✔
119

120
bool RemoteLogger::reconnect()
121
{
220✔
122
  try {
220✔
123
    auto newSock = make_unique<Socket>(d_remote.sin4.sin_family, SOCK_STREAM, 0);
220✔
124
    newSock->setNonBlocking();
220✔
125
    newSock->connect(d_remote, d_timeout);
220✔
126

127
    {
220✔
128
      /* we are now successfully connected, time to take the lock and update the
129
         socket */
130
      auto runtime = d_runtime.lock();
220✔
131
      runtime->d_socket = std::move(newSock);
220✔
132
    }
220✔
133
  }
220✔
134
  catch (const std::exception& e) {
220✔
UNCOV
135
#ifdef WE_ARE_RECURSOR
×
UNCOV
136
    SLOG(g_log<<Logger::Warning<<"Error connecting to remote logger "<<d_remote.toStringWithPort()<<": "<<e.what()<<std::endl,
×
UNCOV
137
         g_slog->withName("protobuf")->error(Logr::Error, e.what(), "Exception while connecting to remote logger", "address", Logging::Loggable(d_remote)));
×
138
#else
139
    warnlog("Error connecting to remote logger %s: %s", d_remote.toStringWithPort(), e.what());
140
#endif
141

142
    return false;
×
143
  }
×
144
  return true;
218✔
145
}
220✔
146

147
/* Queues `data` for delivery to the remote end, holding the runtime lock
   for the whole call.

   Returns:
     - Result::TooLarge when the size of `data` does not fit in 16 bits
     - Result::PipeFull when the buffer has no room and either there is
       no socket, flushing sent nothing, or flushing did not free enough
       room
     - Result::OtherError when flushing threw; the socket is dropped
     - Result::Queued once the data has been written to the buffer
   The matching statistics counter is incremented in every case. */
RemoteLoggerInterface::Result RemoteLogger::queueData(const std::string& data)
{
  auto runtime = d_runtime.lock();
  auto& stats = runtime->d_stats;
  auto& writer = runtime->d_writer;

  if (data.size() > std::numeric_limits<uint16_t>::max()) {
    ++stats.d_tooLarge;
    return Result::TooLarge;
  }

  if (!writer.hasRoomFor(data)) {
    if (!runtime->d_socket) {
      /* not connected, queue is full, just drop */
      ++stats.d_pipeFull;
      return Result::PipeFull;
    }

    try {
      /* try to flush some data, then see whether that freed enough room;
         the second check only runs when the flush wrote something */
      const bool roomAvailable = writer.flush(runtime->d_socket->getHandle()) && writer.hasRoomFor(data);
      if (!roomAvailable) {
        ++stats.d_pipeFull;
        return Result::PipeFull;
      }
    }
    catch (const std::exception&) {
      runtime->d_socket.reset();
      ++stats.d_otherError;
      return Result::OtherError;
    }
  }

  writer.write(data);
  ++stats.d_queued;
  return Result::Queued;
}
176✔
189

190
void RemoteLogger::maintenanceThread() 
191
{
220✔
192
  try {
220✔
193
#ifdef WE_ARE_RECURSOR
220✔
194
    string threadName = "rec/remlog";
220✔
195
#else
196
    string threadName = "dnsdist/remLog";
197
#endif
198
    setThreadName(threadName);
220✔
199

200
    for (;;) {
926✔
201
      if (d_exiting) {
926!
202
        break;
×
203
      }
×
204

205
      bool connected = true;
926✔
206
      if (d_runtime.lock()->d_socket == nullptr) {
926!
207
        // if it was unset, it will remain so, we are the only ones setting it!
208
        connected = reconnect();
×
209
      }
×
210

211
      /* we will just go to sleep if the reconnection just failed */
212
      if (connected) {
926✔
213
        try {
925✔
214
          /* we don't want to take the lock while trying to reconnect */
215
          auto runtime = d_runtime.lock();
925✔
216
          if (runtime->d_socket) { // check if it is set
925!
217
            /* if flush() returns false, it means that we couldn't flush anything yet
218
               either because there is nothing to flush, or because the outgoing TCP
219
               buffer is full. That's fine by us */
220
            runtime->d_writer.flush(runtime->d_socket->getHandle());
925✔
221
          }
925✔
222
          else {
×
223
            connected = false;
×
224
          }
×
225
        }
925✔
226
        catch (const std::exception& e) {
925✔
227
          d_runtime.lock()->d_socket.reset();
×
228
          connected = false;
×
229
        }
×
230

231
        if (!connected) {
925!
232
          /* let's try to reconnect right away, we are about to sleep anyway */
233
          reconnect();
×
234
        }
×
235
      }
925✔
236

237
      sleep(d_reconnectWaitTime);
926✔
238
    }
926✔
239
  }
220✔
240
  catch (const std::exception& e)
220✔
241
  {
220✔
242
    SLOG(cerr << "Remote Logger's maintenance thread died on: " << e.what() << endl,
×
243
         g_slog->withName("protobuf")->error(Logr::Error, e.what(), "Remote Logger's maintenance thread died"));
×
244
  }
×
245
  catch (...) {
220✔
246
    SLOG(cerr << "Remote Logger's maintenance thread died on unknown exception" << endl,
×
247
         g_slog->withName("protobuf")->info(Logr::Error, "Remote Logger's maintenance thread died"));
×
248
  }
×
249
}
220✔
250

251
/* Asks the maintenance thread to stop and waits for it to finish.
   The thread only looks at d_exiting at the top of each iteration, so
   this can block until its current sleep is over. */
RemoteLogger::~RemoteLogger()
{
  d_exiting = true;

  /* join() throws on a thread that is not joinable, and an exception
     escaping a destructor terminates the process */
  if (d_thread.joinable()) {
    d_thread.join();
  }
}
×
257

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc