• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bcpearce / hass-motion-detection-addon / 21126887229

19 Jan 2026 05:54AM UTC coverage: 17.99% (-32.0%) from 49.971%
21126887229

Pull #26

github

web-flow
Merge c98d1d60e into 0206cb867
Pull Request #26: Additional test coverage

292 of 1865 branches covered (15.66%)

Branch coverage included in aggregate %.

8 of 22 new or added lines in 3 files covered. (36.36%)

827 existing lines in 29 files now uncovered.

347 of 1687 relevant lines covered (20.57%)

28.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/Callback/AsyncFileSave.cxx
1
#include "Logger.h"
2
#include "WindowsWrapper.h"
3

4
#include "Callback/AsyncFileSave.h"
5

6
#include "Util/BufferOperations.h"
7
#include "Util/Tools.h"
8

9
#include <ranges>
#include <system_error>
10

11
#if __linux__
12
#include <errno.h>
13
#include <fcntl.h>
14
#include <signal.h>
15
#include <string.h>
16
#define IO_SIGNAL SIGUSR1
17
#endif
18

19
namespace {
20
// Next identifier handed to a transfer context. SaveFileAtEndpoint stores the
// current value as the easy handle's CURLOPT_PRIVATE data and as the key into
// easyCtxs_, then increments it. Being a namespace-scope static, the counter
// is shared by every AsyncFileSave in this translation unit and is a plain,
// unsynchronized integer.
static size_t contextId{1};
21

22
#if _WIN32
23
// Translates a Win32 error code into "Error (<code>): <system message>".
//
// dwErrorCode: value obtained from GetLastError() or handed to an I/O
//              completion routine.
// Returns the formatted message, or a fixed fallback string when the system
// could not provide a description for the code.
std::string GetErrorMessage(DWORD dwErrorCode) {
  LPVOID lpMsgBuf{nullptr};

  // With FORMAT_MESSAGE_ALLOCATE_BUFFER the system allocates the text and
  // stores its address through the (reinterpreted) buffer argument; the
  // caller must release it with LocalFree.
  FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM |
                    FORMAT_MESSAGE_IGNORE_INSERTS,
                NULL, dwErrorCode, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
                reinterpret_cast<LPTSTR>(&lpMsgBuf), 0, NULL);

  if (!lpMsgBuf) {
    return "Error, unable to receive error message";
  }

  // Build the result before releasing the system buffer. The release used to
  // sit after the return statement and therefore never ran, leaking the
  // buffer on every call.
  std::string message;
  try {
    message = std::format(TEXT("Error ({}): {}"), dwErrorCode,
                          static_cast<LPTSTR>(lpMsgBuf));
  } catch (...) {
    LocalFree(lpMsgBuf);
    throw;
  }
  LocalFree(lpMsgBuf);
  return message;
}
39

40
void LogLastError() {
41
  const auto error = GetLastError();
42
  LOGGER->error("Win32 Error ({}): {}", error, GetErrorMessage(error));
43
}
44

45
#elif __linux__
46

47
void LogLastError() {
×
48
  const auto error = errno;
×
49
  LOGGER->error("Linux Error ({}): {}", error, strerror(error));
×
50
}
×
51

52
#endif
53

54
} // namespace
55

56
namespace callback {
57

UNCOV
58
// Builds a saver that downloads `url` (optionally authenticating with
// `user` / `password`) into `dstPath`.
//
// If `dstPath` does not exist yet it is created, including missing parents.
// The spare download buffer and the ring of saved file paths are pre-sized
// with their default capacities.
AsyncFileSave::AsyncFileSave(std::shared_ptr<TaskScheduler> pSched,
                             const std::filesystem::path &dstPath,
                             const boost::url &url, const std::string &user,
                             const std::string &password)
    : AsyncDebouncer(pSched), pSched_{pSched}, dstPath_{dstPath}, url_{url},
      user_{user}, password_{password} {
  const bool destinationMissing = !std::filesystem::exists(dstPath_);
  if (destinationMissing) {
    std::filesystem::create_directories(dstPath_);
  }

  spareBuf_.reserve(defaultJpgBufferSize);
  savedFilePaths_.set_capacity(defaultSavedFilePathsSize);
}
×
70

UNCOV
71
// Detaches every socket still tracked in socketCtxs_: the scheduler stops
// watching it and the per-socket pointer registered with the curl multi
// handle is cleared.
AsyncFileSave::~AsyncFileSave() noexcept {
  for (const auto &entry : socketCtxs_) {
    const auto socketFd = entry.second->sockfd;
    pSched_->disableBackgroundHandling(socketFd);
    wCurlMulti_(curl_multi_assign, socketFd, nullptr);
  }
}
×
77

UNCOV
78
void AsyncFileSave::Register() {
×
UNCOV
79
  wCurlMulti_(curl_multi_setopt, CURLMOPT_SOCKETFUNCTION, SocketCallback);
×
UNCOV
80
  wCurlMulti_(curl_multi_setopt, CURLMOPT_SOCKETDATA, this);
×
UNCOV
81
  wCurlMulti_(curl_multi_setopt, CURLMOPT_TIMERFUNCTION, TimeoutCallback);
×
UNCOV
82
  wCurlMulti_(curl_multi_setopt, CURLMOPT_TIMERDATA, this);
×
83

UNCOV
84
  int runningHandles{0};
×
UNCOV
85
  wCurlMulti_(curl_multi_socket_action, CURL_SOCKET_TIMEOUT, 0,
×
86
              &runningHandles);
87

88
#ifdef __linux__
UNCOV
89
  InstallHandlers();
×
90
#endif
UNCOV
91
}
×
92

UNCOV
93
// Starts an asynchronous download of url_ into a freshly created file.
//
// Destination selection:
//  - dstPath_ is a directory and `_dst` is empty: a file named after the
//    current system time ("%Y-%m-%d_%H-%M-%S" + ".jpg") inside dstPath_;
//  - `_dst` has no parent path (e.g. a bare file name): dstPath_ / `_dst`;
//  - otherwise `_dst` is used exactly as given.
//
// The file is opened for asynchronous writing, a curl easy handle is
// configured to collect the response body into the context's buffer, and the
// handle is attached to the multi handle. The context is stored in easyCtxs_
// under the current contextId, which is then incremented.
//
// Any exception raised while opening the file or configuring the transfer is
// logged and swallowed; nothing propagates to the caller.
void AsyncFileSave::SaveFileAtEndpoint(const std::filesystem::path &_dst) {

  auto dst = dstPath_;
  if (std::filesystem::is_directory(dstPath_) && _dst.empty()) {
    // no name supplied: format a filename based on the current time
    const auto now = std::chrono::system_clock::now();
    const auto fileName = std::format("{:%Y-%m-%d_%H-%M-%S}.jpg", now);
    dst /= fileName;
  } else if (!_dst.has_parent_path()) {
    // no directory component: place it inside the download directory
    dst /= _dst;
  } else {
    // the path names its own directory: use it as is
    dst = _dst;
  }

  auto pCtx = std::make_shared<_CurlEasyContext>();

  try {
#if _WIN32
    pCtx->writeData.hFile =
        CreateFile(dst.string().c_str(), GENERIC_WRITE, 0, NULL, CREATE_ALWAYS,
                   FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL);
    // Mirror the Linux branch: do not start a transfer whose output file
    // could not be opened.
    if (pCtx->writeData.hFile == INVALID_HANDLE_VALUE) {
      throw std::runtime_error(GetErrorMessage(GetLastError()));
    }
    ZeroMemory(&pCtx->writeData.overlapped, sizeof(pCtx->writeData.overlapped));
    pCtx->writeData.overlapped.Offset = 0xFFFFFFFF;
    pCtx->writeData.overlapped.OffsetHigh = 0xFFFFFFFF;
    // hEvent carries the context pointer; FileIOCompletionRoutine reads it
    // back to find the context that finished.
    pCtx->writeData.overlapped.hEvent = static_cast<HANDLE>(pCtx.get());
#elif __linux__
    pCtx->writeData._aiocb.aio_fildes = open(
        dst.string().c_str(), O_CREAT | O_WRONLY | O_ASYNC | O_TRUNC, 0666);
    if (pCtx->writeData._aiocb.aio_fildes == -1) {
      throw std::runtime_error(strerror(errno));
    }
    pCtx->writeData._aiocb.aio_reqprio = 0;
    pCtx->writeData._aiocb.aio_offset = 0;
    // Completion is reported through IO_SIGNAL; the signal payload carries
    // the context pointer that AioSigHandler reads back.
    pCtx->writeData._aiocb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
    pCtx->writeData._aiocb.aio_sigevent.sigev_signo = IO_SIGNAL;
    pCtx->writeData._aiocb.aio_sigevent.sigev_value.sival_ptr = pCtx.get();
#endif
    pCtx->writeData.dstPath = std::move(dst);
    // Reuse the spare buffer handed back by the previous transfer.
    pCtx->writeData.buf = std::move(spareBuf_);
    pCtx->writeData.buf.clear();
    pCtx->pHandler = this->weak_from_this();
    pCtx->contextId = contextId;

    pCtx->wCurl(curl_easy_setopt, CURLOPT_WRITEFUNCTION,
                util::FillBufferCallback);
    // CheckMultiInfo reads this value back to locate the context.
    pCtx->wCurl(curl_easy_setopt, CURLOPT_PRIVATE, contextId);

    pCtx->wCurl(curl_easy_setopt, CURLOPT_URL, url_.c_str());
    pCtx->wCurl(curl_easy_setopt, CURLOPT_HTTPGET, 1L);
    pCtx->wCurl(curl_easy_setopt, CURLOPT_WRITEDATA, &pCtx->writeData.buf);

    if (!user_.empty() && !password_.empty()) {
      pCtx->wCurl(curl_easy_setopt, CURLOPT_HTTPAUTH, CURLAUTH_DIGEST);
      pCtx->wCurl(curl_easy_setopt, CURLOPT_USERNAME, user_.c_str());
      pCtx->wCurl(curl_easy_setopt, CURLOPT_PASSWORD, password_.c_str());
    }

    // Attach last. CURLOPT_PRIVATE is already set above, so nothing that can
    // fail runs on the easy handle once the multi handle references it.
    wCurlMulti_(curl_multi_add_handle, pCtx->wCurl.pCurl_);

    easyCtxs_[contextId] = pCtx;
    ++contextId;
  } catch (const std::exception &e) {
    LOGGER->error(e.what());
  } catch (...) {
    LOGGER->error("Unknown error setting up Async File Save");
  }
}
×
163

164
// Detector callback: triggers a save when the payload reports at least one
// region of interest and the debouncer currently allows an update.
//
// No previous-frame state is kept. The earlier function-local `lastRois`
// static was never assigned, so its emptiness test was always true and it
// has been removed without changing behaviour.
// NOTE(review): real rising-edge detection would need per-instance state
// (a new data member); repeated saves during continuous motion are
// presumably suppressed by the debounce window alone - confirm against
// AsyncDebouncer.
void AsyncFileSave::operator()(detector::Payload data) {

  const bool motionPresent = !data.rois.empty();

  if (UpdateAllowed() && motionPresent) {
    SaveFileAtEndpoint();
    Debounce(debounceTime);
  }

  // UpdateAllowed() is queried again on purpose: it is evaluated after the
  // Debounce call above, exactly as before.
  const bool reschedule = !UpdateAllowed() && motionPresent;
  if (reschedule) {
    Debounce(debounceTime);
  }
}
×
180

181
const boost::circular_buffer<std::filesystem::path> &
UNCOV
182
AsyncFileSave::GetSavedFilePaths() const {
×
UNCOV
183
  return savedFilePaths_;
×
184
}
185

UNCOV
186
void AsyncFileSave::SetLimitSavedFilePaths(size_t limit) {
×
UNCOV
187
  while (savedFilePaths_.size() > limit) {
×
188
    if (!std::filesystem::remove(savedFilePaths_.front())) {
×
189
      LOGGER->warn("Failed to remove file: {}", savedFilePaths_.front());
×
190
    } else {
191
      LOGGER->info("Removed file: {}", savedFilePaths_.front());
×
192
    }
193
    savedFilePaths_.pop_front();
×
194
  }
UNCOV
195
  savedFilePaths_.set_capacity(limit);
×
UNCOV
196
}
×
197

198
#if _WIN32
199

200
// Closes the file handle if one was opened; a failing CloseHandle is logged
// with the Win32 error code and its description.
AsyncFileSave::Win32Overlapped::~Win32Overlapped() noexcept {
  if (hFile == INVALID_HANDLE_VALUE) {
    return; // nothing was opened
  }
  if (CloseHandle(hFile)) {
    return; // closed cleanly
  }
  const auto closeError = GetLastError();
  LOGGER->error("Failed to close {} with error ({}): {}", dstPath, closeError,
                GetErrorMessage(closeError));
}
207

208
#elif __linux__
209

UNCOV
210
// Closes the descriptor used for the asynchronous write.
//
// A descriptor of -1 means open() failed in SaveFileAtEndpoint; there is
// nothing to close, and calling close(-1) would only produce a misleading
// "Failed to close" log entry. A failing close() on a real descriptor is
// logged with errno and its description.
AsyncFileSave::LinuxAioFile::~LinuxAioFile() {
  if (_aiocb.aio_fildes == -1) {
    return;
  }
  if (close(_aiocb.aio_fildes) == -1) {
    // read errno once so the code and the text always describe the same error
    const int closeErrno = errno;
    LOGGER->error("Failed to close {} with error ({}): {}", dstPath,
                  closeErrno, strerror(closeErrno));
  }
}
×
216

217
#endif
218

UNCOV
219
void AsyncFileSave::CheckMultiInfo() {
×
UNCOV
220
  CURLMsg *message{nullptr};
×
221

UNCOV
222
  int pending{0};
×
223

UNCOV
224
  auto &wCurlMulti = wCurlMulti_;
×
UNCOV
225
  while ((message = curl_multi_info_read(wCurlMulti.pCurl_, &pending))) {
×
UNCOV
226
    switch (message->msg) {
×
UNCOV
227
    case CURLMSG_DONE: {
×
UNCOV
228
      size_t ctxId{0};
×
UNCOV
229
      CURL *easy = message->easy_handle;
×
230

UNCOV
231
      curl_easy_getinfo(easy, CURLINFO_PRIVATE, &ctxId);
×
UNCOV
232
      auto it = easyCtxs_.find(ctxId);
×
UNCOV
233
      if (it != easyCtxs_.end()) {
×
UNCOV
234
        auto &pCtx = it->second;
×
235
        try {
UNCOV
236
          char *effectiveMethod{nullptr};
×
UNCOV
237
          curl_easy_getinfo(easy, CURLINFO_EFFECTIVE_METHOD, &effectiveMethod);
×
UNCOV
238
          int responseCode{0};
×
UNCOV
239
          curl_easy_getinfo(easy, CURLINFO_RESPONSE_CODE, &responseCode);
×
240

UNCOV
241
          switch (responseCode) {
×
UNCOV
242
          case 200:
×
243
          case 201:
UNCOV
244
            if (util::NoCaseCmp(effectiveMethod, "GET")) {
×
245
#if _WIN32
246
              const auto res = WriteFileEx(
247
                  pCtx->writeData.hFile, pCtx->writeData.buf.data(),
248
                  pCtx->writeData.buf.size(), &pCtx->writeData.overlapped,
249
                  AsyncFileSave::FileIOCompletionRoutine);
250
              if (res == ERROR) {
251
                LogLastError();
252
                std::filesystem::remove(pCtx->writeData.dstPath);
253
              }
254
#elif __linux__
UNCOV
255
              pCtx->writeData._aiocb.aio_buf = pCtx->writeData.buf.data();
×
UNCOV
256
              pCtx->writeData._aiocb.aio_nbytes = pCtx->writeData.buf.size();
×
UNCOV
257
              const auto res = aio_write(&pCtx->writeData._aiocb);
×
UNCOV
258
              if (res == -1) {
×
259
                LogLastError();
×
260
                std::filesystem::remove(pCtx->writeData.dstPath);
×
261
              }
262
#endif
263
            }
UNCOV
264
            break;
×
265
          default:
×
266
            LOGGER->info("CURL Request failed with response code {}",
×
267
                         responseCode);
268
            std::filesystem::remove(pCtx->writeData.dstPath);
×
269
            wCurlMulti(curl_multi_remove_handle, pCtx->wCurl.pCurl_);
×
270
            RemoveContext(pCtx.get());
×
271
            return;
×
272
          }
273
        } catch (const std::exception &e) {
×
274
          LOGGER->error(e.what());
×
275
        }
×
276

277
        // cleanup
UNCOV
278
        wCurlMulti(curl_multi_remove_handle, pCtx->wCurl.pCurl_);
×
279
      }
UNCOV
280
      break;
×
281
    }
282
    default:
×
283
      LOGGER->warn("CURLMSG default");
×
284
      break;
×
285
    }
286
  }
287
}
288

UNCOV
289
int AsyncFileSave::SocketCallback(CURL *easy, curl_socket_t s, int action,
×
290
                                  AsyncFileSave *asyncFileSave,
291
                                  _CurlSocketContext *curlSocketContext) {
UNCOV
292
  if (auto pAfs = asyncFileSave->weak_from_this().lock()) {
×
UNCOV
293
    std::shared_ptr<_CurlSocketContext> pCtx;
×
UNCOV
294
    if (curlSocketContext) {
×
295
      pCtx = static_cast<_CurlSocketContext *>(curlSocketContext)
UNCOV
296
                 ->shared_from_this();
×
297
    };
UNCOV
298
    switch (action) {
×
UNCOV
299
    case CURL_POLL_IN:
×
300
    case CURL_POLL_OUT:
301
    case CURL_POLL_INOUT: {
UNCOV
302
      if (!pCtx) {
×
UNCOV
303
        pCtx = std::make_shared<_CurlSocketContext>();
×
UNCOV
304
        pCtx->sockfd = s;
×
UNCOV
305
        pCtx->pHandler = pAfs;
×
UNCOV
306
        pAfs->socketCtxs_[s] = pCtx; // save to the map for pointer preservation
×
307
      }
308

UNCOV
309
      pAfs->wCurlMulti_(curl_multi_assign, s, pCtx.get());
×
310

UNCOV
311
      int flags{0};
×
UNCOV
312
      flags |= (action != CURL_POLL_IN) ? SOCKET_WRITABLE : 0;
×
UNCOV
313
      flags |= (action != CURL_POLL_OUT) ? SOCKET_READABLE : 0;
×
UNCOV
314
      pAfs->pSched_->setBackgroundHandling(
×
UNCOV
315
          s, flags, AsyncFileSave::BackgroundHandlerProc, pCtx.get());
×
UNCOV
316
    } break;
×
UNCOV
317
    case CURL_POLL_REMOVE:
×
UNCOV
318
      pAfs->pSched_->disableBackgroundHandling(s);
×
UNCOV
319
      pAfs->wCurlMulti_(curl_multi_assign, s, nullptr);
×
UNCOV
320
      pAfs->socketCtxs_.erase(s);
×
UNCOV
321
      break;
×
322
    }
UNCOV
323
  }
×
UNCOV
324
  return 0;
×
325
}
326

UNCOV
327
int AsyncFileSave::TimeoutCallback(CURLM *multi, int timeoutMs,
×
328
                                   AsyncFileSave *asyncFileSave) {
UNCOV
329
  if (asyncFileSave) {
×
UNCOV
330
    if (timeoutMs < 0) {
×
UNCOV
331
      asyncFileSave->pSched_->unscheduleDelayedTask(asyncFileSave->token_);
×
332
    } else {
UNCOV
333
      if (timeoutMs == 0) {
×
UNCOV
334
        timeoutMs = 1;
×
335
      }
UNCOV
336
      asyncFileSave->token_ = asyncFileSave->pSched_->scheduleDelayedTask(
×
UNCOV
337
          timeoutMs * 1000, AsyncFileSave::TimeoutHandlerProc, asyncFileSave);
×
338
    }
339
  }
UNCOV
340
  return 0;
×
341
}
342

UNCOV
343
void AsyncFileSave::BackgroundHandlerProc(void *curlSocketContext_clientData,
×
344
                                          int mask) {
UNCOV
345
  if (curlSocketContext_clientData) {
×
UNCOV
346
    int flags{0};
×
UNCOV
347
    if (mask & SOCKET_READABLE) {
×
UNCOV
348
      flags |= CURL_CSELECT_IN;
×
349
    }
UNCOV
350
    if (mask & SOCKET_WRITABLE) {
×
UNCOV
351
      flags |= CURL_CSELECT_OUT;
×
352
    }
353
    auto csc = static_cast<_CurlSocketContext *>(curlSocketContext_clientData)
UNCOV
354
                   ->shared_from_this();
×
UNCOV
355
    if (auto pAfs = csc->pHandler.lock()) {
×
UNCOV
356
      int runningHandles{0};
×
UNCOV
357
      pAfs->wCurlMulti_(curl_multi_socket_action, csc->sockfd, flags,
×
358
                        &runningHandles);
UNCOV
359
      pAfs->CheckMultiInfo();
×
UNCOV
360
    }
×
UNCOV
361
  }
×
UNCOV
362
}
×
363

UNCOV
364
void AsyncFileSave::TimeoutHandlerProc(void *asyncFileSave_clientData) {
×
UNCOV
365
  if (asyncFileSave_clientData) {
×
UNCOV
366
    auto pAfs = static_cast<AsyncFileSave *>(asyncFileSave_clientData);
×
UNCOV
367
    int runningHandles{-1};
×
UNCOV
368
    pAfs->wCurlMulti_(curl_multi_socket_action, CURL_SOCKET_TIMEOUT, 0,
×
369
                      &runningHandles);
UNCOV
370
    pAfs->CheckMultiInfo();
×
371
  }
UNCOV
372
}
×
373

374
#if _WIN32
375

376
// Completion routine passed to WriteFileEx in CheckMultiInfo.
//
// Logs a non-zero error code, reports a short write, and finally releases the
// context whose pointer was stored in OVERLAPPED::hEvent by
// SaveFileAtEndpoint. Nothing is released when hEvent is null.
VOID CALLBACK AsyncFileSave::FileIOCompletionRoutine(
    __in DWORD dwErrorCode, __in DWORD dwNumberOfBytesTransferred,
    __in LPOVERLAPPED lpOverlapped) {
  if (dwErrorCode != NOERROR) {
    LOGGER->error(GetErrorMessage(dwErrorCode));
  }

  auto *const pCtx = static_cast<_CurlEasyContext *>(lpOverlapped->hEvent);
  if (pCtx == nullptr) {
    return;
  }

  const auto expectedBytes = pCtx->writeData.buf.size();
  if (dwNumberOfBytesTransferred != expectedBytes) {
    LOGGER->error("File IO incomplete for {}, {} bytes written of {}",
                  pCtx->writeData.dstPath.string(),
                  dwNumberOfBytesTransferred, expectedBytes);
  }
  RemoveContext(pCtx);
}
392

393
#elif __linux__
394

UNCOV
395
void AsyncFileSave::AioSigHandler(int sig, siginfo_t *si, void *) {
×
UNCOV
396
  if (si->si_code == SI_ASYNCIO) {
×
UNCOV
397
    auto *pCtx = static_cast<_CurlEasyContext *>(si->si_value.sival_ptr);
×
UNCOV
398
    const ssize_t bytesWritten = aio_return(&pCtx->writeData._aiocb);
×
UNCOV
399
    if (bytesWritten == -1) {
×
400
      LOGGER->error(strerror(errno));
×
UNCOV
401
    } else if (bytesWritten == pCtx->writeData.buf.size()) {
×
UNCOV
402
      LOGGER->info("File IO complete {}", pCtx->writeData.dstPath);
×
403
    } else {
404
      LOGGER->info("Filo IO incomplete {}, {} bytes written",
×
405
                   pCtx->writeData.dstPath, bytesWritten);
×
406
      return; // do not remove context yet
×
407
    }
UNCOV
408
    if (auto pHandler = pCtx->pHandler.lock()) {
×
UNCOV
409
      pHandler->pSched_->scheduleDelayedTask(
×
410
          0,
UNCOV
411
          [](void *clientData) {
×
UNCOV
412
            if (!clientData) {
×
413
              return;
×
414
            }
UNCOV
415
            RemoveContext(static_cast<_CurlEasyContext *>(clientData));
×
416
          },
417
          pCtx);
UNCOV
418
    }
×
419
  }
420
}
421

UNCOV
422
void AsyncFileSave::InstallHandlers() {
×
UNCOV
423
  struct sigaction sa{};
×
UNCOV
424
  sigaction(IO_SIGNAL, nullptr, &sa);
×
UNCOV
425
  if (sa.sa_sigaction == AioSigHandler) {
×
426
    return;
×
427
  }
UNCOV
428
  memset(&sa, 0, sizeof(sa));
×
UNCOV
429
  sa.sa_flags = SA_RESTART | SA_SIGINFO;
×
UNCOV
430
  sa.sa_sigaction = AioSigHandler;
×
UNCOV
431
  if (sigaction(IO_SIGNAL, &sa, nullptr) == -1) {
×
432
    throw std::runtime_error(strerror(errno));
×
433
  }
434
}
435

436
#endif
437

UNCOV
438
// Finishes a transfer context: records its destination file as saved,
// recycles its buffer and drops the context from its owner's easyCtxs_.
//
// When the ring of saved paths is full, the oldest file is deleted from disk
// before the new path is pushed. The deletion uses the non-throwing
// std::filesystem::remove overload because this function is invoked from
// scheduler and I/O completion callbacks, where an exception has no
// handler to land in.
//
// pCtx: context to retire; null is a no-op. Nothing happens either when the
//       owning AsyncFileSave can no longer be locked. The context may be
//       destroyed by this call, so pCtx must not be used afterwards.
void AsyncFileSave::RemoveContext(_CurlEasyContext *pCtx) {
  if (!pCtx) {
    return; // no-op
  }
  const auto pHandler = pCtx->pHandler.lock();
  if (!pHandler) {
    return;
  }

  auto &saved = pHandler->savedFilePaths_;
  // A ring with capacity 0 reports full() while holding nothing; front()
  // must not be called on it.
  if (saved.full() && !saved.empty()) {
    const auto &oldFile = saved.front();
    std::error_code ec;
    if (std::filesystem::remove(oldFile, ec)) {
      LOGGER->info("Removed {}, maximum file buffer size reached ({})",
                   oldFile, saved.capacity());
    } else {
      LOGGER->warn(
          "Failed to remove {}, maximum saved images reached ({}), but "
          "old data has not been deleted, the disk may begin to fill",
          oldFile, saved.capacity());
    }
  }
  saved.push_back(pCtx->writeData.dstPath);

  // Avoid reallocating a buffer: hand this one back as the owner's spare
  pHandler->spareBuf_.swap(pCtx->writeData.buf);

  // Copy the key first; erasing may destroy the context that holds it.
  const auto ctxId = pCtx->contextId;
  pHandler->easyCtxs_.erase(ctxId);
}
462

463
} // namespace callback
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc