• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PredatorCZ / PreCore / 460

pending completion
460

push

github-actions-ci

PredatorCZ
try fix coverage

3204 of 6095 relevant lines covered (52.57%)

354.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/app/batch.cpp
1
#include "spike/app/batch.hpp"
2
#include "nlohmann/json.hpp"
3
#include "spike/app/console.hpp"
4
#include "spike/io/binreader.hpp"
5
#include "spike/io/stat.hpp"
6
#include "spike/master_printer.hpp"
7
#include <cinttypes>
8
#include <deque>
9
#include <future>
10

11
static constexpr bool CATCH_EXCEPTIONS = false;
12

13
struct WorkerThread {
×
14
  MultiThreadManagerImpl &manager;
15
  std::promise<void> state;
16
  std::function<void(std::string)> func{};
17
  void operator()();
18
};
19

20
struct MultiThreadManagerImpl {
21
  using FuncType = MultiThreadManager::FuncType;
22

23
  MultiThreadManagerImpl(size_t capacityPerThread)
×
24
      : capacity(capacityPerThread * std::thread::hardware_concurrency()) {
×
25
    const size_t minWorkerCount = std::thread::hardware_concurrency();
×
26

27
    for (size_t i = 0; i < minWorkerCount; i++) {
×
28
      std::promise<void> promise;
×
29
      states.emplace_back(promise.get_future());
×
30
      workers.emplace_back(WorkerThread{*this, std::move(promise)});
×
31
      pthread_setname_np(workers.back().native_handle(), "batch_worker");
×
32
    }
33
  }
34

35
  void Push(FuncType item) {
×
36
    {
37
      std::unique_lock<std::mutex> lk(mutex);
×
38
      hasSpace.wait(lk, [&] { return queue.size() < capacity; });
×
39
      queue.push_back(std::move(item));
×
40
    }
41
    canProcess.notify_one();
×
42
  }
43

44
  void Wait() {
×
45
    if (queue.empty() && workingWorkers == 0) {
×
46
      return;
×
47
    }
48
    std::mutex mutex_;
×
49
    std::unique_lock<std::mutex> lk(mutex_);
50
    finsihedBatch.wait(lk,
×
51
                       [&] { return queue.empty() && workingWorkers == 0; });
×
52
  }
53

54
  FuncType Pop() {
×
55
    FuncType retval;
56
    {
57
      std::unique_lock<std::mutex> lk(mutex);
×
58
      canProcess.wait(
×
59
          lk, [&] { return !queue.empty() || (done && queue.empty()); });
×
60

61
      if (!queue.empty()) [[likely]] {
×
62
        workingWorkers++;
63
        retval = std::move(queue.front());
×
64
        queue.pop_front();
×
65
      }
66
    }
67
    hasSpace.notify_one();
×
68
    return retval;
×
69
  }
70

71
  ~MultiThreadManagerImpl() {
×
72
    done = true;
×
73
    canProcess.notify_all();
×
74
    for (auto &future : states) {
×
75
      if (future.valid()) {
×
76
        future.get();
×
77
      }
78
    }
79
    for (auto &w : workers) {
×
80
      if (w.joinable()) {
×
81
        w.join();
×
82
      }
83
    }
84
  }
85

86
  std::vector<std::thread> workers;
87
  std::vector<std::future<void>> states;
88
  std::deque<FuncType> queue;
89
  std::atomic_int32_t workingWorkers;
90
  size_t capacity;
91
  bool done = false;
92
  std::mutex mutex;
93
  std::condition_variable canProcess;
94
  std::condition_variable hasSpace;
95
  std::condition_variable finsihedBatch;
96
};
97

98
void WorkerThread::operator()() {
×
99
  while (true) {
100
    {
101
      auto item = manager.Pop();
×
102

103
      if (!item) [[unlikely]] {
×
104
        manager.workingWorkers--;
×
105
        manager.finsihedBatch.notify_all();
×
106
        break;
107
      }
108

109
      if constexpr (CATCH_EXCEPTIONS) {
110
        item();
111

112
        if (auto curException = std::current_exception(); curException) {
113
          state.set_exception(curException);
114
        }
115
      } else {
116
        try {
117
          item();
118
        } catch (const std::exception &e) {
×
119
          printerror(e.what());
×
120
        } catch (...) {
×
121
          printerror("Uncaught exception");
×
122
        }
×
123
      }
124
    }
125

126
    manager.workingWorkers--;
×
127
    manager.finsihedBatch.notify_all();
×
128
  }
129

130
  state.set_value();
×
131
}
132

133
MultiThreadManager::MultiThreadManager(size_t capacity_)
×
134
    : pi(std::make_unique<MultiThreadManagerImpl>(capacity_)) {}
×
135

136
// Defaulted in this translation unit, where MultiThreadManagerImpl is a
// complete type.
MultiThreadManager::~MultiThreadManager() = default;
×
137

138
/// Hands `item` over to MultiThreadManagerImpl::Push.
void MultiThreadManager::Push(FuncType item) {
  pi->Push(std::move(item));
}
×
139

140
/// Delegates to MultiThreadManagerImpl::Wait.
void MultiThreadManager::Wait() {
  pi->Wait();
}
×
141

142
/// Runs `item` immediately on the calling thread.
///
/// Unless CATCH_EXCEPTIONS is set, exceptions thrown by the task are reported
/// through printerror instead of propagating, the same way the threaded worker
/// reports them (including exceptions not derived from std::exception).
void SimpleManager::Push(SimpleManager::FuncType item) {
  if constexpr (CATCH_EXCEPTIONS) {
    item();
  } else {
    try {
      item();
    } catch (const std::exception &e) {
      printerror(e.what());
    } catch (...) {
      printerror("Uncaught exception");
    }
  }
}
153

154
struct ScanningFoldersBar : LoadingBar {
155
  char buffer[512]{};
156
  size_t modifyPos = 0;
157

158
  ScanningFoldersBar(std::string_view folder)
×
159
      : LoadingBar({buffer, sizeof(buffer)}) {
×
160
    static constexpr std::string_view part1("Scanning folder: ");
161
    strncpy(buffer, part1.data(), part1.size());
162
    modifyPos = part1.size();
×
163
    strncpy(buffer + modifyPos, folder.data(), folder.size());
×
164
    modifyPos += folder.size();
×
165
  }
166

167
  void Update(size_t numFolders, size_t numFiles, size_t foundFiles) {
168
    snprintf(buffer + modifyPos, sizeof(buffer) - modifyPos,
169
             " %4" PRIuMAX " folders, %4" PRIuMAX " files, %4" PRIuMAX
170
             " found.",
171
             numFolders, numFiles, foundFiles);
172
  }
173
};
174

175
/// Stores the application context, creates the task manager and registers
/// the path filters taken from the application info.
///
/// Without batch control filters, every file filter is added to the scanner
/// and to the loader filter. Otherwise the scanner only matches
/// "batch.json$", control filters go to the loader and batch control filters,
/// and file filters go to the loader and supplemental filters.
Batch::Batch(APPContext *ctx_, size_t queueCapacity)
    : ctx(ctx_), manager(queueCapacity) {
  auto &fileFilters = ctx->info->filters;
  auto &controlFilters = ctx->info->batchControlFilters;

  if (controlFilters.empty()) {
    for (auto &filter : fileFilters) {
      scanner.AddFilter(filter);
      loaderFilter.AddFilter(filter);
    }
    return;
  }

  scanner.AddFilter(std::string_view("batch.json$"));

  for (auto &filter : controlFilters) {
    loaderFilter.AddFilter(filter);
    batchControlFilter.AddFilter(filter);
  }

  for (auto &filter : fileFilters) {
    loaderFilter.AddFilter(filter);
    supplementalFilter.AddFilter(filter);
  }
}
196

197
void Batch::AddFile(std::string path) {
×
198
  auto type = FileType(path);
×
199
  switch (type) {
×
200
  case FileType_e::Directory: {
×
201
    auto scanBar = AppendNewLogLine<ScanningFoldersBar>(path);
×
202
    scanner.scanCbData = scanBar;
×
203
    scanner.scanCb = [](void *data, size_t numFolders, size_t numFiles,
×
204
                        size_t foundFiles) {
205
      auto barData = static_cast<ScanningFoldersBar *>(data);
206
      barData->Update(numFolders, numFiles, foundFiles);
207
    };
208
    scanner.Scan(path);
×
209
    scanBar->Finish();
210
    if (keepFinishLines) {
×
211
      ReleaseLogLines(scanBar);
×
212
    } else {
213
      RemoveLogLines(scanBar);
×
214
    }
215

216
    if (updateFileCount && scanner.Files().size()) {
×
217
      updateFileCount(scanner.Files().size() - 1);
×
218
    }
219

220
    if (forEachFolder) {
×
221
      AppPackStats stats{};
222
      stats.numFiles = scanner.Files().size();
223

224
      for (auto &f : scanner) {
×
225
        stats.totalSizeFileNames += f.size() + 1;
×
226
      }
227

228
      forEachFolder(path, stats);
×
229
    }
230

231
    for (auto &f : scanner) {
×
232
      manager.Push([&, iCtx{MakeIOContext(f)}] {
×
233
        forEachFile(iCtx.get());
×
234
        iCtx->Finish();
×
235
      });
×
236
    }
237

238
    manager.Wait();
239

240
    if (forEachFolderFinish) {
×
241
      forEachFolderFinish();
242
    }
243

244
    break;
245
  }
246

247
  default: {
248
    const size_t found = path.find(".zip");
249
    if (found != path.npos) {
×
250
      if (found + 4 == path.size()) {
×
251
        if (rootZips.contains(path)) {
×
252
          break;
253
        }
254

255
        if (zips.contains(path)) {
×
256
          zips.erase(path);
257
        }
258
        rootZips.emplace(path);
259

260
        auto labelData = "Loading ZIP vfs: " + path;
×
261
        auto loadBar = AppendNewLogLine<LoadingBar>(labelData);
×
262
        const bool loadFiltered = ctx->info->filteredLoad;
×
263
        auto fctx = loadFiltered ? MakeZIPContext(path, loaderFilter, {})
×
264
                                 : MakeZIPContext(path);
×
265

266
        AFileInfo zFile(path);
×
267
        fctx->basePath = zFile.GetFullPathNoExt();
×
268

269
        loadBar->Finish();
270
        if (keepFinishLines) {
×
271
          ReleaseLogLines(loadBar);
×
272
        } else {
273
          RemoveLogLines(loadBar);
×
274
        }
275

276
        auto Iterate = [&](auto &&what) {
×
277
          auto vfsIter = fctx->Iter();
×
278

279
          for (auto f : vfsIter) {
×
280
            auto item = f.AsView();
×
281
            if (size_t lastSlash = item.find_last_of("/\\");
×
282
                lastSlash != item.npos) {
283
              item.remove_prefix(lastSlash + 1);
×
284
            }
285

286
            if (scanner.IsFiltered(item)) {
×
287
              what(f);
×
288
            }
289
          }
290
        };
291

×
292
        if (forEachFolder) {
×
293
          auto zipPath = path.substr(0, path.size() - 4);
294
          AppPackStats stats{};
×
295

×
296
          Iterate([&](auto &f) {
×
297
            stats.numFiles++;
298
            stats.totalSizeFileNames += f.AsView().size() + 1;
×
299
          });
300

301
          forEachFolder(zipPath, stats);
×
302
        }
×
303

304
        Iterate([&](auto &f) {
305
          manager.Push([&, zInstance(fctx->Instance(f))] {
306
            forEachFile(zInstance.get());
×
307
            zInstance->Finish();
×
308
          });
309
        });
×
310

×
311
        manager.Wait();
×
312

313
        if (forEachFolderFinish) {
×
314
          forEachFolderFinish();
315
        }
316

×
317
        fctx->Finish();
318
        break;
319
      } else if (path[found + 4] == '/') {
320
        auto sub = path.substr(0, found + 4);
321
        if (rootZips.contains(sub)) {
322
          break;
×
323
        }
×
324

×
325
        auto foundZip = zips.find(sub);
326
        auto filterString = "^" + path.substr(found + 5);
×
327

×
328
        if (es::IsEnd(zips, foundZip)) {
×
329
          PathFilter pVec;
×
330
          pVec.AddFilter(filterString);
331
          zips.emplace(std::move(sub), std::move(pVec));
×
332
        } else {
333
          foundZip->second.AddFilter(filterString);
334
        }
×
335
        break;
×
336
      }
×
337
    }
×
338

339
    if (type == FileType_e::File) {
×
340
      if (!ctx->info->batchControlFilters.empty()) {
341
        if (!path.ends_with("batch.json")) {
342
          printerror("Expected json bach, got: " << path);
343
          break;
×
344
        }
345

346
        BinReader mainFile(path);
347
        std::string pathDir(AFileInfo(path).GetFolder());
×
348
        nlohmann::json batch(nlohmann::json::parse(mainFile.BaseStream()));
349

×
350
        if (updateFileCount) {
×
351
          updateFileCount(batch.size());
×
352
        }
353

354
        for (auto &group : batch) {
355
          std::vector<std::string> supplementals;
356
          std::string controlPath;
×
357

358
          for (std::string item : group) {
×
359
            if (batchControlFilter.IsFiltered(item)) {
×
360
              if (!controlPath.empty()) {
×
361
                printerror("Dupicate main file for batch: " << item);
×
362
                continue;
×
363
              }
×
364

365
              controlPath = pathDir + item;
366
              continue;
367
            }
368

369
            supplementals.emplace_back(pathDir + item);
×
370
          }
×
371

×
372
          manager.Push(
×
373
              [&, iCtx{MakeIOContext(controlPath, std::move(supplementals))}] {
×
374
                forEachFile(iCtx.get());
375
                iCtx->Finish();
376
              });
×
377
        }
×
378
      } else {
×
379
        manager.Push([&, iCtx{MakeIOContext(path)}] {
380
          forEachFile(iCtx.get());
×
381
          iCtx->Finish();
×
382
        });
383
      }
384
      break;
×
385
    }
×
386
    printerror("Invalid path: " << path);
387
    break;
388
  }
×
389
  }
×
390
}
×
391

×
392
void Batch::FinishBatch() {
×
393
  for (auto &[zip, paths] : zips) {
394
    auto labelData = "Loading ZIP vfs: " + zip;
395
    auto loadBar = AppendNewLogLine<LoadingBar>(labelData);
×
396
    const bool loadFiltered = ctx->info->filteredLoad;
×
397
    auto fctx = loadFiltered ? MakeZIPContext(zip, loaderFilter, paths)
398
                             : MakeZIPContext(zip);
399

×
400
    AFileInfo zFile(zip);
401
    fctx->basePath = zFile.GetFullPathNoExt();
402

×
403
    loadBar->Finish();
×
404
    if (keepFinishLines) {
×
405
      ReleaseLogLines(loadBar);
×
406
    } else {
×
407
      RemoveLogLines(loadBar);
408
    }
409

×
410
    auto Iterate = [&, &paths = paths](auto &&what) {
×
411
      auto vfsIter = fctx->Iter();
×
412

×
413
      for (auto f : vfsIter) {
414
        auto item = f.AsView();
415
        if (size_t lastSlash = item.find_last_of("/\\");
416
            lastSlash != item.npos) {
×
417
          item.remove_prefix(lastSlash + 1);
×
418
        }
419

420
        if (scanner.IsFiltered(item) && paths.IsFiltered(f.AsView())) {
421
          what(f);
422
        }
×
423
      }
×
424
    };
×
425

×
426
    if (forEachFolder) {
×
427
      auto zipPath = zip.substr(0, zip.size() - 4);
×
428
      AppPackStats stats{};
×
429

430
      Iterate([&](auto &f) {
×
431
        stats.numFiles++;
×
432
        stats.totalSizeFileNames += f.AsView().size() + 1;
433
      });
434

×
435
      forEachFolder(std::move(zipPath), stats);
×
436
    }
437

×
438
    Iterate([&](auto &f) {
439
      manager.Push([&, zInstance(fctx->Instance(f))] {
440
        forEachFile(zInstance.get());
×
441
        zInstance->Finish();
×
442
      });
443
    });
×
444

×
445
    manager.Wait();
×
446

447
    if (forEachFolderFinish) {
×
448
      forEachFolderFinish();
449
    }
450

×
451
    fctx->Finish();
×
452
  }
453
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc