• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PredatorCZ / PreCore / 505

24 Mar 2024 11:59AM UTC coverage: 54.243% (-0.8%) from 55.056%
505

push

github

PredatorCZ
fix more build fails

4142 of 7636 relevant lines covered (54.24%)

8779.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/app/batch.cpp
1
#include "spike/app/batch.hpp"
2
#include "nlohmann/json.hpp"
3
#include "spike/app/console.hpp"
4
#include "spike/io/binreader.hpp"
5
#include "spike/io/stat.hpp"
6
#include "spike/master_printer.hpp"
7
#include <cinttypes>
8
#include <deque>
9
#include <future>
10

11
static constexpr bool CATCH_EXCEPTIONS = false;
12

13
struct WorkerThread {
×
14
  MultiThreadManagerImpl &manager;
15
  std::promise<void> state;
16
  std::function<void(std::string)> func{};
17
  void operator()();
18
};
19

20
struct MultiThreadManagerImpl {
21
  using FuncType = MultiThreadManager::FuncType;
22

23
  MultiThreadManagerImpl(size_t capacityPerThread)
×
24
      : capacity(capacityPerThread * std::thread::hardware_concurrency()) {
×
25
    const size_t minWorkerCount = std::thread::hardware_concurrency();
×
26

27
    for (size_t i = 0; i < minWorkerCount; i++) {
×
28
      std::promise<void> promise;
×
29
      states.emplace_back(promise.get_future());
×
30
      workers.emplace_back(WorkerThread{*this, std::move(promise)});
×
31
      pthread_setname_np(workers.back().native_handle(), "batch_worker");
×
32
    }
33
  }
34

35
  void Push(FuncType item) {
×
36
    {
37
      std::unique_lock<std::mutex> lk(mutex);
×
38
      hasSpace.wait(lk, [&] { return queue.size() < capacity; });
×
39
      queue.push_back(std::move(item));
×
40
    }
41
    canProcess.notify_one();
×
42
  }
43

44
  void Wait() {
×
45
    if (queue.empty() && workingWorkers == 0) {
×
46
      return;
×
47
    }
48
    std::mutex mutex_;
×
49
    std::unique_lock<std::mutex> lk(mutex_);
50
    finsihedBatch.wait(lk,
×
51
                       [&] { return queue.empty() && workingWorkers == 0; });
×
52
  }
53

54
  FuncType Pop() {
×
55
    FuncType retval;
56
    {
57
      std::unique_lock<std::mutex> lk(mutex);
×
58
      canProcess.wait(
×
59
          lk, [&] { return !queue.empty() || (done && queue.empty()); });
×
60

61
      if (!queue.empty()) [[likely]] {
×
62
        workingWorkers++;
63
        retval = std::move(queue.front());
×
64
        queue.pop_front();
×
65
      }
66
    }
67
    hasSpace.notify_one();
×
68
    return retval;
×
69
  }
70

71
  ~MultiThreadManagerImpl() {
×
72
    done = true;
×
73
    canProcess.notify_all();
×
74
    for (auto &future : states) {
×
75
      if (future.valid()) {
×
76
        future.get();
×
77
      }
78
    }
79
    for (auto &w : workers) {
×
80
      if (w.joinable()) {
×
81
        w.join();
×
82
      }
83
    }
84
  }
85

86
  std::vector<std::thread> workers;
87
  std::vector<std::future<void>> states;
88
  std::deque<FuncType> queue;
89
  std::atomic_int32_t workingWorkers;
90
  size_t capacity;
91
  bool done = false;
92
  std::mutex mutex;
93
  std::condition_variable canProcess;
94
  std::condition_variable hasSpace;
95
  std::condition_variable finsihedBatch;
96
};
97

98
void WorkerThread::operator()() {
×
99
  while (true) {
100
    {
101
      auto item = manager.Pop();
×
102

103
      if (!item) [[unlikely]] {
×
104
        manager.workingWorkers--;
×
105
        manager.finsihedBatch.notify_all();
×
106
        break;
107
      }
108

109
      if constexpr (CATCH_EXCEPTIONS) {
110
        item();
111

112
        if (auto curException = std::current_exception(); curException) {
113
          state.set_exception(curException);
114
        }
115
      } else {
116
        try {
117
          item();
118
        } catch (const std::exception &e) {
×
119
          PrintError(e.what());
×
120
        } catch (...) {
×
121
          PrintError("Uncaught exception");
122
        }
×
123
      }
124
    }
125

126
    manager.workingWorkers--;
×
127
    manager.finsihedBatch.notify_all();
×
128
  }
129

130
  state.set_value();
×
131
}
132

133
MultiThreadManager::MultiThreadManager(size_t capacity_)
×
134
    : pi(std::make_unique<MultiThreadManagerImpl>(capacity_)) {}
×
135

136
MultiThreadManager::~MultiThreadManager() = default;
×
137

138
void MultiThreadManager::Push(FuncType item) { pi->Push(std::move(item)); }
×
139

140
void MultiThreadManager::Wait() { pi->Wait(); }
×
141

142
// Runs `item` immediately on the calling thread.
// Unless CATCH_EXCEPTIONS is set, anything derived from std::exception thrown
// by the item is reported through PrintError instead of reaching the caller.
void SimpleManager::Push(SimpleManager::FuncType item) {
  if constexpr (!CATCH_EXCEPTIONS) {
    try {
      item();
    } catch (const std::exception &err) {
      PrintError(err.what());
    }
  } else {
    item();
  }
}
153

154
struct ScanningFoldersBar : LoadingBar {
155
  char buffer[512]{};
156
  size_t modifyPos = 0;
157

158
  ScanningFoldersBar(std::string_view folder)
×
159
      : LoadingBar({buffer, sizeof(buffer)}) {
×
160
    static constexpr std::string_view part1("Scanning folder: ");
161
    strncpy(buffer, part1.data(), part1.size());
162
    modifyPos = part1.size();
×
163
    strncpy(buffer + modifyPos, folder.data(), folder.size());
×
164
    modifyPos += folder.size();
×
165
  }
166

167
  void Update(size_t numFolders, size_t numFiles, size_t foundFiles) {
168
    snprintf(buffer + modifyPos, sizeof(buffer) - modifyPos,
169
             " %4" PRIuMAX " folders, %4" PRIuMAX " files, %4" PRIuMAX
170
             " found.",
171
             numFolders, numFiles, foundFiles);
172
  }
173
};
174

175
// Sets up the filters from the application info.
//
// With batch control filters present, the scanner only looks for
// "batch.json$"; the control filters feed the loader and batch control
// filters, the regular filters feed the loader and supplemental filters.
// Without them, the regular filters feed both the scanner and the loader.
Batch::Batch(APPContext *ctx_, size_t queueCapacity)
    : ctx(ctx_), manager(queueCapacity) {
  const bool hasBatchControl = !ctx->info->batchControlFilters.empty();

  if (hasBatchControl) {
    scanner.AddFilter(std::string_view("batch.json$"));

    for (auto &controlFilter : ctx->info->batchControlFilters) {
      loaderFilter.AddFilter(controlFilter);
      batchControlFilter.AddFilter(controlFilter);
    }

    for (auto &fileFilter : ctx->info->filters) {
      loaderFilter.AddFilter(fileFilter);
      supplementalFilter.AddFilter(fileFilter);
    }
  } else {
    for (auto &fileFilter : ctx->info->filters) {
      scanner.AddFilter(fileFilter);
      loaderFilter.AddFilter(fileFilter);
    }
  }
}
196

197
// Queues one work item per group found in `batch`.
//
// Every entry of a group is prefixed with `batchPath`. The first entry
// accepted by `batchControlFilter` becomes the main file, further accepted
// entries are reported as duplicates and dropped, all remaining entries
// become supplemental files. Groups without a main file are reported and
// skipped.
void Batch::AddBatch(nlohmann::json &batch, const std::string &batchPath) {
  for (auto &group : batch) {
    std::string mainFile;
    std::vector<std::string> extraFiles;

    for (std::string entry : group) {
      if (!batchControlFilter.IsFiltered(entry)) {
        extraFiles.emplace_back(batchPath + entry);
      } else if (mainFile.empty()) {
        mainFile = batchPath + entry;
      } else {
        PrintError("Dupicate main file in batch group: ", entry);
      }
    }

    if (mainFile.empty()) {
      PrintError("Main file not provided for batch group");
      continue;
    }

    manager.Push([&, iCtx{MakeIOContext(mainFile, std::move(extraFiles))}] {
      forEachFile(iCtx.get());
      iCtx->Finish();
    });
  }
}
228

229
void Batch::AddFile(std::string path) {
×
230
  auto type = FileType(path);
×
231
  switch (type) {
×
232
  case FileType_e::Directory: {
×
233
    auto scanBar = AppendNewLogLine<ScanningFoldersBar>(path);
×
234
    scanner.scanCbData = scanBar;
×
235
    scanner.scanCb = [](void *data, size_t numFolders, size_t numFiles,
×
236
                        size_t foundFiles) {
237
      auto barData = static_cast<ScanningFoldersBar *>(data);
238
      barData->Update(numFolders, numFiles, foundFiles);
239
    };
240
    scanner.Clear();
×
241
    scanner.Scan(path);
×
242
    scanBar->Finish();
243
    if (keepFinishLines) {
×
244
      ReleaseLogLines(scanBar);
×
245
    } else {
246
      RemoveLogLines(scanBar);
×
247
    }
248

249
    if (updateFileCount && scanner.Files().size()) {
×
250
      updateFileCount(scanner.Files().size() - 1);
×
251
    }
252

253
    if (forEachFolder) {
×
254
      forEachFolder(path, scanner.Files().size());
×
255
    }
256

257
    for (auto &f : scanner) {
×
258
      manager.Push([&, iCtx{MakeIOContext(f)}] {
×
259
        forEachFile(iCtx.get());
×
260
        iCtx->Finish();
×
261
      });
×
262
    }
263

264
    manager.Wait();
265

266
    if (forEachFolderFinish) {
×
267
      forEachFolderFinish();
268
    }
269

270
    break;
271
  }
272

273
  default: {
274
    const size_t found = path.find(".zip");
275
    if (found != path.npos) {
×
276
      if (found + 4 == path.size()) {
×
277
        if (rootZips.contains(path)) {
×
278
          break;
279
        }
280

281
        if (zips.contains(path)) {
×
282
          zips.erase(path);
283
        }
284
        rootZips.emplace(path);
285

286
        auto labelData = "Loading ZIP vfs: " + path;
×
287
        auto loadBar = AppendNewLogLine<LoadingBar>(labelData);
×
288
        const bool loadFiltered = ctx->info->filteredLoad;
×
289
        auto fctx = loadFiltered ? MakeZIPContext(path, loaderFilter, {})
×
290
                                 : MakeZIPContext(path);
×
291

292
        AFileInfo zFile(path);
×
293
        fctx->basePath = zFile.GetFullPathNoExt();
×
294

295
        loadBar->Finish();
296
        if (keepFinishLines) {
×
297
          ReleaseLogLines(loadBar);
×
298
        } else {
299
          RemoveLogLines(loadBar);
×
300
        }
301

302
        auto Iterate = [&](auto &&what) {
×
303
          auto vfsIter = fctx->Iter();
×
304

305
          for (auto f : vfsIter) {
×
306
            auto item = f.AsView();
×
307
            if (size_t lastSlash = item.find_last_of("/\\");
×
308
                lastSlash != item.npos) {
309
              item.remove_prefix(lastSlash + 1);
×
310
            }
311

312
            if (scanner.IsFiltered(item)) {
×
313
              what(f);
×
314
            }
315
          }
316
        };
317

×
318
        if (forEachFolder) {
×
319
          auto zipPath = path.substr(0, path.size() - 4);
320
          size_t numFiles = 0;
×
321

×
322
          Iterate([&](auto &) { numFiles++; });
×
323

324
          forEachFolder(zipPath, numFiles);
×
325
        }
326

327
        Iterate([&](auto &f) {
×
328
          manager.Push([&, zInstance(fctx->Instance(f))] {
×
329
            forEachFile(zInstance.get());
330
            zInstance->Finish();
331
          });
332
        });
×
333

×
334
        manager.Wait();
335

×
336
        if (forEachFolderFinish) {
×
337
          forEachFolderFinish();
×
338
        }
339

×
340
        fctx->Finish();
341
        break;
342
      } else if (path[found + 4] == '/') {
×
343
        auto sub = path.substr(0, found + 4);
344
        if (rootZips.contains(sub)) {
345
          break;
346
        }
347

348
        auto foundZip = zips.find(sub);
×
349
        auto filterString = "^" + path.substr(found + 5);
×
350

×
351
        if (es::IsEnd(zips, foundZip)) {
352
          PathFilter pVec;
×
353
          pVec.AddFilter(filterString);
354
          zips.emplace(std::move(sub), std::move(pVec));
×
355
        } else {
356
          foundZip->second.AddFilter(filterString);
357
        }
×
358
        break;
×
359
      }
×
360
    }
×
361

362
    if (type == FileType_e::File) {
×
363
      if (!ctx->info->batchControlFilters.empty()) {
364
        if (!path.ends_with("batch.json")) {
365
          PrintError("Expected json bach, got: ", path);
366
          break;
×
367
        }
368

369
        BinReader mainFile(path);
370
        std::string pathDir(AFileInfo(path).GetFolder());
×
371
        nlohmann::json batch(nlohmann::json::parse(mainFile.BaseStream()));
372

×
373
        if (updateFileCount) {
×
374
          updateFileCount(batch.size());
×
375
        }
376

377
        AddBatch(batch, pathDir);
378
      } else {
379
        manager.Push([&, iCtx{MakeIOContext(path)}] {
×
380
          forEachFile(iCtx.get());
381
          iCtx->Finish();
×
382
        });
×
383
      }
×
384
      break;
×
385
    }
×
386
    PrintError("Invalid path: ", path);
×
387
    break;
388
  }
389
  }
390
}
391

392
void Batch::FinishBatch() {
×
393
  for (auto &[zip, paths] : zips) {
×
394
    auto labelData = "Loading ZIP vfs: " + zip;
×
395
    auto loadBar = AppendNewLogLine<LoadingBar>(labelData);
×
396
    const bool loadFiltered = ctx->info->filteredLoad;
×
397
    auto fctx = loadFiltered ? MakeZIPContext(zip, loaderFilter, paths)
398
                             : MakeZIPContext(zip);
399

×
400
    AFileInfo zFile(zip);
×
401
    fctx->basePath = zFile.GetFullPathNoExt();
×
402

403
    loadBar->Finish();
×
404
    if (keepFinishLines) {
×
405
      ReleaseLogLines(loadBar);
406
    } else {
407
      RemoveLogLines(loadBar);
×
408
    }
409

×
410
    auto Iterate = [&, &paths = paths](auto &&what) {
×
411
      auto vfsIter = fctx->Iter();
×
412

×
413
      for (auto f : vfsIter) {
414
        auto item = f.AsView();
415
        if (size_t lastSlash = item.find_last_of("/\\");
416
            lastSlash != item.npos) {
×
417
          item.remove_prefix(lastSlash + 1);
×
418
        }
419

420
        if (scanner.IsFiltered(item) && paths.IsFiltered(f.AsView())) {
421
          what(f);
422
        }
×
423
      }
×
424
    };
×
425

×
426
    if (forEachFolder) {
×
427
      auto zipPath = zip.substr(0, zip.size() - 4);
×
428
      size_t numFiles = 0;
×
429

430
      Iterate([&](auto &) { numFiles++; });
×
431

×
432
      forEachFolder(std::move(zipPath), numFiles);
433
    }
434

×
435
    Iterate([&](auto &f) {
×
436
      manager.Push([&, zInstance(fctx->Instance(f))] {
437
        forEachFile(zInstance.get());
×
438
        zInstance->Finish();
439
      });
440
    });
×
441

×
442
    manager.Wait();
443

×
444
    if (forEachFolderFinish) {
×
445
      forEachFolderFinish();
×
446
    }
447

×
448
    fctx->Finish();
449
  }
450
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc