• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PredatorCZ / PreCore / 487

02 Oct 2023 03:56PM UTC coverage: 55.265% (-0.04%) from 55.309%
487

push

github-actions-ci

PredatorCZ
fix sending wrong paths to pack context

7 of 7 new or added lines in 1 file covered. (100.0%)

4146 of 7502 relevant lines covered (55.27%)

8926.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/app/batch.cpp
1
#include "spike/app/batch.hpp"
2
#include "nlohmann/json.hpp"
3
#include "spike/app/console.hpp"
4
#include "spike/io/binreader.hpp"
5
#include "spike/io/stat.hpp"
6
#include "spike/master_printer.hpp"
7
#include <cinttypes>
8
#include <deque>
9
#include <future>
10

11
// Selects how work items are invoked in this file. When false, each item runs
// inside a try/catch and failures are reported through printerror. When true,
// items are invoked without a try/catch wrapper.
// NOTE(review): the name reads inverted relative to that behavior; presumably
// "catch" refers to an attached debugger - confirm.
static constexpr bool CATCH_EXCEPTIONS = false;
12

13
struct WorkerThread {
×
14
  MultiThreadManagerImpl &manager;
15
  std::promise<void> state;
16
  std::function<void(std::string)> func{};
17
  void operator()();
18
};
19

20
struct MultiThreadManagerImpl {
21
  using FuncType = MultiThreadManager::FuncType;
22

23
  MultiThreadManagerImpl(size_t capacityPerThread)
×
24
      : capacity(capacityPerThread * std::thread::hardware_concurrency()) {
×
25
    const size_t minWorkerCount = std::thread::hardware_concurrency();
×
26

27
    for (size_t i = 0; i < minWorkerCount; i++) {
×
28
      std::promise<void> promise;
×
29
      states.emplace_back(promise.get_future());
×
30
      workers.emplace_back(WorkerThread{*this, std::move(promise)});
×
31
      pthread_setname_np(workers.back().native_handle(), "batch_worker");
×
32
    }
33
  }
34

35
  void Push(FuncType item) {
×
36
    {
37
      std::unique_lock<std::mutex> lk(mutex);
×
38
      hasSpace.wait(lk, [&] { return queue.size() < capacity; });
×
39
      queue.push_back(std::move(item));
×
40
    }
41
    canProcess.notify_one();
×
42
  }
43

44
  void Wait() {
×
45
    if (queue.empty() && workingWorkers == 0) {
×
46
      return;
×
47
    }
48
    std::mutex mutex_;
×
49
    std::unique_lock<std::mutex> lk(mutex_);
50
    finsihedBatch.wait(lk,
×
51
                       [&] { return queue.empty() && workingWorkers == 0; });
×
52
  }
53

54
  FuncType Pop() {
×
55
    FuncType retval;
56
    {
57
      std::unique_lock<std::mutex> lk(mutex);
×
58
      canProcess.wait(
×
59
          lk, [&] { return !queue.empty() || (done && queue.empty()); });
×
60

61
      if (!queue.empty()) [[likely]] {
×
62
        workingWorkers++;
63
        retval = std::move(queue.front());
×
64
        queue.pop_front();
×
65
      }
66
    }
67
    hasSpace.notify_one();
×
68
    return retval;
×
69
  }
70

71
  ~MultiThreadManagerImpl() {
×
72
    done = true;
×
73
    canProcess.notify_all();
×
74
    for (auto &future : states) {
×
75
      if (future.valid()) {
×
76
        future.get();
×
77
      }
78
    }
79
    for (auto &w : workers) {
×
80
      if (w.joinable()) {
×
81
        w.join();
×
82
      }
83
    }
84
  }
85

86
  std::vector<std::thread> workers;
87
  std::vector<std::future<void>> states;
88
  std::deque<FuncType> queue;
89
  std::atomic_int32_t workingWorkers;
90
  size_t capacity;
91
  bool done = false;
92
  std::mutex mutex;
93
  std::condition_variable canProcess;
94
  std::condition_variable hasSpace;
95
  std::condition_variable finsihedBatch;
96
};
97

98
void WorkerThread::operator()() {
×
99
  while (true) {
100
    {
101
      auto item = manager.Pop();
×
102

103
      if (!item) [[unlikely]] {
×
104
        manager.workingWorkers--;
×
105
        manager.finsihedBatch.notify_all();
×
106
        break;
107
      }
108

109
      if constexpr (CATCH_EXCEPTIONS) {
110
        item();
111

112
        if (auto curException = std::current_exception(); curException) {
113
          state.set_exception(curException);
114
        }
115
      } else {
116
        try {
117
          item();
118
        } catch (const std::exception &e) {
×
119
          printerror(e.what());
×
120
        } catch (...) {
×
121
          printerror("Uncaught exception");
×
122
        }
×
123
      }
124
    }
125

126
    manager.workingWorkers--;
×
127
    manager.finsihedBatch.notify_all();
×
128
  }
129

130
  state.set_value();
×
131
}
132

133
MultiThreadManager::MultiThreadManager(size_t capacity_)
×
134
    : pi(std::make_unique<MultiThreadManagerImpl>(capacity_)) {}
×
135

136
MultiThreadManager::~MultiThreadManager() = default;
×
137

138
void MultiThreadManager::Push(FuncType item) { pi->Push(std::move(item)); }
×
139

140
void MultiThreadManager::Wait() { pi->Wait(); }
×
141

142
/// Runs the work item immediately on the calling thread.
///
/// Unless CATCH_EXCEPTIONS is set, every exception thrown by the item is
/// reported through printerror and swallowed - including non-std exceptions,
/// matching what the threaded worker loop does for the same item.
void SimpleManager::Push(SimpleManager::FuncType item) {
  if constexpr (CATCH_EXCEPTIONS) {
    item();
  } else {
    try {
      item();
    } catch (const std::exception &e) {
      printerror(e.what());
    } catch (...) {
      printerror("Uncaught exception");
    }
  }
}
153

154
struct ScanningFoldersBar : LoadingBar {
155
  char buffer[512]{};
156
  size_t modifyPos = 0;
157

158
  ScanningFoldersBar(std::string_view folder)
×
159
      : LoadingBar({buffer, sizeof(buffer)}) {
×
160
    static constexpr std::string_view part1("Scanning folder: ");
161
    strncpy(buffer, part1.data(), part1.size());
162
    modifyPos = part1.size();
×
163
    strncpy(buffer + modifyPos, folder.data(), folder.size());
×
164
    modifyPos += folder.size();
×
165
  }
166

167
  void Update(size_t numFolders, size_t numFiles, size_t foundFiles) {
168
    snprintf(buffer + modifyPos, sizeof(buffer) - modifyPos,
169
             " %4" PRIuMAX " folders, %4" PRIuMAX " files, %4" PRIuMAX
170
             " found.",
171
             numFolders, numFiles, foundFiles);
172
  }
173
};
174

175
/// Sets up the path filters from the application info.
///
/// Without batch control filters, every regular filter is registered with both
/// the scanner and the loader filter. With them, the scanner only looks for
/// "batch.json$", control filters are registered with the loader and batch
/// control filters, and regular filters with the loader and supplemental ones.
Batch::Batch(APPContext *ctx_, size_t queueCapacity)
    : ctx(ctx_), manager(queueCapacity) {
  auto &appInfo = *ctx->info;
  const bool batchControlled = !appInfo.batchControlFilters.empty();

  if (batchControlled) {
    scanner.AddFilter(std::string_view("batch.json$"));

    for (auto &controlFilter : appInfo.batchControlFilters) {
      loaderFilter.AddFilter(controlFilter);
      batchControlFilter.AddFilter(controlFilter);
    }

    for (auto &fileFilter : appInfo.filters) {
      loaderFilter.AddFilter(fileFilter);
      supplementalFilter.AddFilter(fileFilter);
    }

    return;
  }

  for (auto &fileFilter : appInfo.filters) {
    scanner.AddFilter(fileFilter);
    loaderFilter.AddFilter(fileFilter);
  }
}
196

197
void Batch::AddFile(std::string path) {
×
198
  auto type = FileType(path);
×
199
  switch (type) {
×
200
  case FileType_e::Directory: {
×
201
    auto scanBar = AppendNewLogLine<ScanningFoldersBar>(path);
×
202
    scanner.scanCbData = scanBar;
×
203
    scanner.scanCb = [](void *data, size_t numFolders, size_t numFiles,
×
204
                        size_t foundFiles) {
205
      auto barData = static_cast<ScanningFoldersBar *>(data);
206
      barData->Update(numFolders, numFiles, foundFiles);
207
    };
208
    scanner.Clear();
×
209
    scanner.Scan(path);
×
210
    scanBar->Finish();
211
    if (keepFinishLines) {
×
212
      ReleaseLogLines(scanBar);
×
213
    } else {
214
      RemoveLogLines(scanBar);
×
215
    }
216

217
    if (updateFileCount && scanner.Files().size()) {
×
218
      updateFileCount(scanner.Files().size() - 1);
×
219
    }
220

221
    if (forEachFolder) {
×
222
      AppPackStats stats{};
223
      stats.numFiles = scanner.Files().size();
224

225
      for (auto &f : scanner) {
×
226
        stats.totalSizeFileNames += f.size() + 1;
×
227
      }
228

229
      forEachFolder(path, stats);
×
230
    }
231

232
    for (auto &f : scanner) {
×
233
      manager.Push([&, iCtx{MakeIOContext(f)}] {
×
234
        forEachFile(iCtx.get());
×
235
        iCtx->Finish();
×
236
      });
×
237
    }
238

239
    manager.Wait();
240

241
    if (forEachFolderFinish) {
×
242
      forEachFolderFinish();
243
    }
244

245
    break;
246
  }
247

248
  default: {
249
    const size_t found = path.find(".zip");
250
    if (found != path.npos) {
×
251
      if (found + 4 == path.size()) {
×
252
        if (rootZips.contains(path)) {
×
253
          break;
254
        }
255

256
        if (zips.contains(path)) {
×
257
          zips.erase(path);
258
        }
259
        rootZips.emplace(path);
260

261
        auto labelData = "Loading ZIP vfs: " + path;
×
262
        auto loadBar = AppendNewLogLine<LoadingBar>(labelData);
×
263
        const bool loadFiltered = ctx->info->filteredLoad;
×
264
        auto fctx = loadFiltered ? MakeZIPContext(path, loaderFilter, {})
×
265
                                 : MakeZIPContext(path);
×
266

267
        AFileInfo zFile(path);
×
268
        fctx->basePath = zFile.GetFullPathNoExt();
×
269

270
        loadBar->Finish();
271
        if (keepFinishLines) {
×
272
          ReleaseLogLines(loadBar);
×
273
        } else {
274
          RemoveLogLines(loadBar);
×
275
        }
276

277
        auto Iterate = [&](auto &&what) {
×
278
          auto vfsIter = fctx->Iter();
×
279

280
          for (auto f : vfsIter) {
×
281
            auto item = f.AsView();
×
282
            if (size_t lastSlash = item.find_last_of("/\\");
×
283
                lastSlash != item.npos) {
284
              item.remove_prefix(lastSlash + 1);
×
285
            }
286

287
            if (scanner.IsFiltered(item)) {
×
288
              what(f);
×
289
            }
290
          }
291
        };
292

×
293
        if (forEachFolder) {
×
294
          auto zipPath = path.substr(0, path.size() - 4);
295
          AppPackStats stats{};
×
296

×
297
          Iterate([&](auto &f) {
×
298
            stats.numFiles++;
299
            stats.totalSizeFileNames += f.AsView().size() + 1;
×
300
          });
301

302
          forEachFolder(zipPath, stats);
×
303
        }
×
304

305
        Iterate([&](auto &f) {
306
          manager.Push([&, zInstance(fctx->Instance(f))] {
307
            forEachFile(zInstance.get());
×
308
            zInstance->Finish();
×
309
          });
310
        });
×
311

×
312
        manager.Wait();
×
313

314
        if (forEachFolderFinish) {
×
315
          forEachFolderFinish();
316
        }
317

×
318
        fctx->Finish();
319
        break;
320
      } else if (path[found + 4] == '/') {
321
        auto sub = path.substr(0, found + 4);
322
        if (rootZips.contains(sub)) {
323
          break;
×
324
        }
×
325

×
326
        auto foundZip = zips.find(sub);
327
        auto filterString = "^" + path.substr(found + 5);
×
328

×
329
        if (es::IsEnd(zips, foundZip)) {
×
330
          PathFilter pVec;
×
331
          pVec.AddFilter(filterString);
332
          zips.emplace(std::move(sub), std::move(pVec));
×
333
        } else {
334
          foundZip->second.AddFilter(filterString);
335
        }
×
336
        break;
×
337
      }
×
338
    }
×
339

340
    if (type == FileType_e::File) {
×
341
      if (!ctx->info->batchControlFilters.empty()) {
342
        if (!path.ends_with("batch.json")) {
343
          printerror("Expected json bach, got: " << path);
344
          break;
×
345
        }
346

347
        BinReader mainFile(path);
348
        std::string pathDir(AFileInfo(path).GetFolder());
×
349
        nlohmann::json batch(nlohmann::json::parse(mainFile.BaseStream()));
350

×
351
        if (updateFileCount) {
×
352
          updateFileCount(batch.size());
×
353
        }
354

355
        for (auto &group : batch) {
356
          std::vector<std::string> supplementals;
357
          std::string controlPath;
×
358

359
          for (std::string item : group) {
×
360
            if (batchControlFilter.IsFiltered(item)) {
×
361
              if (!controlPath.empty()) {
×
362
                printerror("Dupicate main file for batch: " << item);
×
363
                continue;
×
364
              }
×
365

366
              controlPath = pathDir + item;
367
              continue;
368
            }
369

370
            supplementals.emplace_back(pathDir + item);
×
371
          }
×
372

×
373
          manager.Push(
×
374
              [&, iCtx{MakeIOContext(controlPath, std::move(supplementals))}] {
×
375
                forEachFile(iCtx.get());
376
                iCtx->Finish();
377
              });
×
378
        }
×
379
      } else {
×
380
        manager.Push([&, iCtx{MakeIOContext(path)}] {
381
          forEachFile(iCtx.get());
×
382
          iCtx->Finish();
×
383
        });
384
      }
385
      break;
×
386
    }
×
387
    printerror("Invalid path: " << path);
388
    break;
389
  }
×
390
  }
×
391
}
×
392

×
393
void Batch::FinishBatch() {
×
394
  for (auto &[zip, paths] : zips) {
395
    auto labelData = "Loading ZIP vfs: " + zip;
396
    auto loadBar = AppendNewLogLine<LoadingBar>(labelData);
×
397
    const bool loadFiltered = ctx->info->filteredLoad;
×
398
    auto fctx = loadFiltered ? MakeZIPContext(zip, loaderFilter, paths)
399
                             : MakeZIPContext(zip);
400

×
401
    AFileInfo zFile(zip);
402
    fctx->basePath = zFile.GetFullPathNoExt();
403

×
404
    loadBar->Finish();
×
405
    if (keepFinishLines) {
×
406
      ReleaseLogLines(loadBar);
×
407
    } else {
×
408
      RemoveLogLines(loadBar);
409
    }
410

×
411
    auto Iterate = [&, &paths = paths](auto &&what) {
×
412
      auto vfsIter = fctx->Iter();
×
413

×
414
      for (auto f : vfsIter) {
415
        auto item = f.AsView();
416
        if (size_t lastSlash = item.find_last_of("/\\");
417
            lastSlash != item.npos) {
×
418
          item.remove_prefix(lastSlash + 1);
×
419
        }
420

421
        if (scanner.IsFiltered(item) && paths.IsFiltered(f.AsView())) {
422
          what(f);
423
        }
×
424
      }
×
425
    };
×
426

×
427
    if (forEachFolder) {
×
428
      auto zipPath = zip.substr(0, zip.size() - 4);
×
429
      AppPackStats stats{};
×
430

431
      Iterate([&](auto &f) {
×
432
        stats.numFiles++;
×
433
        stats.totalSizeFileNames += f.AsView().size() + 1;
434
      });
435

×
436
      forEachFolder(std::move(zipPath), stats);
×
437
    }
438

×
439
    Iterate([&](auto &f) {
440
      manager.Push([&, zInstance(fctx->Instance(f))] {
441
        forEachFile(zInstance.get());
×
442
        zInstance->Finish();
×
443
      });
444
    });
×
445

×
446
    manager.Wait();
×
447

448
    if (forEachFolderFinish) {
×
449
      forEachFolderFinish();
450
    }
451

×
452
    fctx->Finish();
×
453
  }
454
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc