PredatorCZ / PreCore · build 460 (push, github-actions-ci) · pending completion
Commit "try fix coverage" by PredatorCZ
3204 of 6095 relevant lines covered (52.57%) · 354.19 hits per line

Source file: /src/app/out_cache.cpp (86.06% covered)
/*  Cache format generator for seekless zip loading
    Part of PreCore's Spike project

    Copyright 2021-2022 Lukas Cone

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "spike/app/cache.hpp"
#include "spike/app/console.hpp"
#include "spike/app/tmp_storage.hpp"
#include "spike/io/binwritter_stream.hpp"
#include "spike/io/fileinfo.hpp"
#include <algorithm>
#include <barrier>
#include <cinttypes>
#include <fstream>
#include <functional>
#include <future>
#include <map>
#include <set>
#include <sstream>
#include <string>
#include <vector>

static std::atomic_size_t searchHitCount;
static std::atomic_size_t searchMissCount;

// Deduplicating string pool: a string is appended only when it is not already
// present as a substring of the pooled buffer; large buffers are scanned by a
// pool of worker threads.
struct StringSlider {
  std::string buffer;

private:
  std::vector<std::thread> workingThreads;
  std::map<std::thread::id, int64> threadResults;
  struct SearchArgs {
    size_t chunkBegin;
    size_t splitPoint;
    std::string_view str;
    std::boyer_moore_horspool_searcher<std::string_view::iterator> *searcher;
    bool endThread;
  };
  std::map<std::thread::id, SearchArgs> threadArgs;

  // All threads finished searching, notify main thread and resume ready state
  struct {
    std::map<std::thread::id, int64> *threadResults;
    std::atomic_bool *finishedSearch;
    int64 *resultOffset;
    void operator()() noexcept {
      // PrintInfo("Processing worker data");
      *resultOffset = -1;
      for (auto [_, off] : *threadResults) {
        if (off >= 0) {
          *resultOffset = off;
          break;
        }
      }

      // PrintInfo("Search end");

      *finishedSearch = true;
      finishedSearch->notify_all();
    }
  } workersDone;

  std::barrier<decltype(workersDone)> workerSync;

  // Call when threads are in ready state and notify main thread
  struct {
    std::atomic_bool *runWorkers;
    void operator()() noexcept {
      runWorkers->wait(false);
      *runWorkers = false;
      runWorkers->notify_all();
    }
  } workersReadyCb;
  std::barrier<decltype(workersReadyCb)> workersReady;

  std::atomic_bool stopWorkers;
  std::atomic_bool finishedSearch;
  std::atomic_bool runWorkers;

  int64 resultOffset = -1;

  std::atomic_bool &allowThreads;
  bool doMetrics = false;
  size_t searchHitCount = 0;
  size_t searchMissCount = 0;

public:
  StringSlider(std::atomic_bool &allowThreads, bool metrics)
      : workersDone{&threadResults, &finishedSearch, &resultOffset},
        workerSync(std::thread::hardware_concurrency(), workersDone),
        workersReadyCb{&runWorkers},
        workersReady(std::thread::hardware_concurrency(), workersReadyCb),
        allowThreads(allowThreads), doMetrics(metrics) {
    const size_t numThreads = std::thread::hardware_concurrency();

    auto SearcherThread = [this, numThreads] {
      while (true) {
        // PrintInfo("Thread wait");
        workersReady.arrive_and_wait();

        // PrintInfo("Thread begin");

        if (stopWorkers) {
          return;
        }

        try {
          SearchArgs &args = threadArgs.at(std::this_thread::get_id());

          std::string_view item;
          if (args.endThread) {
            item = std::string_view(buffer.data() + args.chunkBegin,
                                    buffer.end().operator->());
          } else {
            item = std::string_view(buffer.data() + args.chunkBegin,
                                    args.splitPoint + args.str.size());
          }

          auto found = std::search(item.begin(), item.end(), *args.searcher);
          int64 offset = -1;

          if (found != item.end()) {
            offset =
                std::distance(const_cast<const char *>(buffer.data()), &*found);
          }

          threadResults.at(std::this_thread::get_id()) = offset;
        } catch (...) {
          std::terminate();
        }

        // PrintInfo("Thread end");
        workerSync.arrive_and_wait();
        // PrintInfo("Thread end wait");
      }
    };

    workingThreads.resize(numThreads);
    for (size_t i = 0; i < numThreads; i++) {
      workingThreads.at(i) = std::thread(SearcherThread);
      auto &curThread = workingThreads.at(i);
      threadArgs.emplace(curThread.get_id(), SearchArgs{});
      threadResults.emplace(curThread.get_id(), -1);
      pthread_setname_np(curThread.native_handle(), "cache_srch_wrkr");
    }
  }

  ~StringSlider() {
    stopWorkers = true;
    runWorkers = true;
    runWorkers.notify_all();

    for (auto &w : workingThreads) {
      if (w.joinable()) {
        w.join();
      }
    }

    if (doMetrics) {
      ::searchHitCount += this->searchHitCount;
      ::searchMissCount += this->searchMissCount;
    }
  }

  // Returns an iterator to the first occurrence of str within the pooled
  // buffer, or buffer.end() when absent. A str longer than the whole buffer
  // replaces the buffer when the buffer is its prefix.
  std::string::iterator FindString(std::string_view str) {
    if (str.size() > buffer.size()) [[unlikely]] {
      if (std::string_view(str).starts_with(buffer)) [[unlikely]] {
        buffer = str;
        return buffer.begin();
      } else {
        return buffer.end();
      }
    }
    auto searcher = std::boyer_moore_horspool_searcher(str.begin(), str.end());

    if (size_t bSize = buffer.size(); allowThreads && bSize > 1'000'000) {
      const size_t numThreads = workingThreads.size();
      size_t splitPoint = bSize / numThreads;
      size_t chunkBegin = 0;

      for (size_t i = 0; i < numThreads; i++) {
        threadArgs.at(workingThreads.at(i).get_id()) = SearchArgs{
            .chunkBegin = chunkBegin,
            .splitPoint = splitPoint,
            .str = str,
            .searcher = &searcher,
            .endThread = i + 1 == numThreads,
        };
        chunkBegin += splitPoint;
      }

      // PrintInfo("Notifying workers");
      finishedSearch = false;
      runWorkers.wait(true);
      runWorkers = true;
      runWorkers.notify_all();
      // PrintInfo("Waiting for workers");
      finishedSearch.wait(false);
      // PrintInfo("Search done");

      if (resultOffset >= 0) {
        return std::next(buffer.begin(), resultOffset);
      }

      return buffer.end();
    } else {
      return std::search(buffer.begin(), buffer.end(), searcher);
    }
  }

  // Returns the offset of str within the pooled buffer, appending it on a miss.
  size_t InsertString(std::string_view str) {
    auto found = FindString(str);

    if (found == buffer.end()) {
      searchMissCount++;
      // todo add tail compare?
      buffer.append(str.data(), str.size());
      return buffer.size() - str.size();
    } else {
      searchHitCount++;
      return std::distance(buffer.begin(), found);
    }
  }
};

// A lightweight (offset, size) view into a StringSlider buffer, ordered
// lexicographically by the referenced text.
struct SliderString {
  size_t offset;
  size_t size;
  StringSlider *base;

  bool operator<(const SliderString &other) const {
    return std::string_view(base->buffer.data() + offset, size) <
           std::string_view(other.base->buffer.data() + other.offset,
                            other.size);
  }
};

static constexpr size_t STRING_OFFSET = sizeof(CacheBaseHeader) + 12;
static constexpr size_t ENTRIES_OFFSET = sizeof(CacheBaseHeader) + 8;
static constexpr size_t ROOT_OFFSET = sizeof(CacheBaseHeader) + 4;
static constexpr size_t HYBRIDLEAF_PARENTPTR = 4;
static constexpr size_t FINAL_PARENTPTR = 16;

// Leaf record describing a single archived file: where it sits in the zip and
// how its name is stored.
struct FinalEntry : SliderString {
  size_t zipOffset;
  size_t zipSize;
  size_t totalFileNameSize;
  mutable size_t wrOffset = 0;

  void Write(BinWritterRef wr) const {
    wrOffset = wr.Tell();
    wr.Write(zipOffset);
    wr.Write(zipSize);
    wr.Write<uint32>(0);
    wr.Write<uint16>(size);
    wr.Write<uint16>(totalFileNameSize);

    if (size < 9) {
      wr.WriteBuffer(base->buffer.data() + offset, size);
      wr.ApplyPadding(8);
    } else {
      const int32 stringOffset = STRING_OFFSET + offset;
      const int32 thisOffset = wr.Tell();
      wr.Write(stringOffset - thisOffset);
      wr.Write<uint32>(0);
    }
  }
};

// Tree node for one path component; serializes relative offsets to its child
// nodes and to the final entries it owns.
struct HybridLeafGen {
  std::map<SliderString, const FinalEntry *> finals;
  std::map<SliderString, size_t> childrenIds;
  SliderString partName;

  size_t Write(BinWritterRef wr, const std::vector<size_t> &childrenOffsets) {
    wr.ApplyPadding(4);
    for (auto &[_, id] : childrenIds) {
      const int32 childOffset = childrenOffsets.at(id);
      const int32 thisOffset = wr.Tell();
      wr.Write((childOffset - thisOffset) / 4);
    }

    const size_t retVal = wr.Tell() - HYBRIDLEAF_PARENTPTR;

    // fixup parent offset for children
    wr.Push();
    for (auto &[_, id] : childrenIds) {
      const size_t childOffset = childrenOffsets.at(id);
      wr.Seek(childOffset + HYBRIDLEAF_PARENTPTR);
      const int32 thisOffset = retVal;
      const int32 memberOffset = wr.Tell();
      wr.Write((thisOffset - memberOffset) / 4);
    }
    wr.Pop();

    wr.Write<uint32>(0);
    if (!partName.size) {
      wr.Write<uint32>(0);
    } else if (partName.size < 5) {
      wr.WriteBuffer(partName.base->buffer.data() + partName.offset,
                     partName.size);
      wr.ApplyPadding(4);
    } else {
      const int32 pathPartOffset_ = STRING_OFFSET + partName.offset;
      const int32 thisOffset = wr.Tell();
      wr.Write(pathPartOffset_ - thisOffset);
    }

    wr.Write<uint16>(childrenIds.size());
    wr.Write<uint16>(partName.size);
    wr.Write<uint32>(finals.size());

    for (auto &[_, entry] : finals) {
      const int32 finalOffset = entry->wrOffset;
      const int32 thisOffset = wr.Tell();
      wr.Write((finalOffset - thisOffset) / 4);
    }

    wr.Push();
    for (auto &[_, entry] : finals) {
      const size_t finalOffset = entry->wrOffset;
      wr.Seek(finalOffset + FINAL_PARENTPTR);
      const int32 thisOffset = retVal;
      const int32 memberOffset = wr.Tell();
      wr.Write((thisOffset - memberOffset) / 4);
    }
    wr.Pop();

    return retVal;
  }
};

// Builds the in-memory path tree and string pools; Write serializes them into
// the final cache layout.
struct CacheGeneratorImpl {
  std::multiset<FinalEntry> totalCache;
  HybridLeafGen root{};
  std::vector<std::vector<HybridLeafGen>> levels;
  std::atomic_bool allowThreads;
  StringSlider slider{allowThreads, true};
  StringSlider sliderTiny{allowThreads, false};
  size_t maxPathSize = 0;

  void AddFile(std::string_view fileName, size_t offset, size_t size) {
    maxPathSize = std::max(fileName.size(), maxPathSize);
    std::vector<SliderString> parts;
    SliderString finalKey{};

    {
      AFileInfo f(fileName);
      auto fileParts = f.Explode();
      parts.reserve(fileParts.size());
      auto finalPart = fileParts.back();
      fileParts.pop_back();

      for (auto &p : fileParts) {
        auto &sliderRef = p.size() > 4 ? slider : sliderTiny;
        const size_t position = sliderRef.InsertString(p);
        parts.emplace_back(SliderString{position, p.size(), &sliderRef});
      }

      finalKey.size = finalPart.size();

      if (finalPart.size() < 9) {
        const size_t position = sliderTiny.InsertString(finalPart);
        finalKey.offset = position;
        finalKey.base = &sliderTiny;
      } else {
        const size_t position = slider.InsertString(finalPart);
        finalKey.offset = position;
        finalKey.base = &slider;
      }
    }

    const size_t numLevels = parts.size();

    if (numLevels > levels.size()) {
      levels.resize(numLevels);
    }

    auto AddFinal = [&](HybridLeafGen &where) {
      FinalEntry entry{finalKey, offset, size, fileName.size()};
      auto entryIter = totalCache.emplace(entry);
      where.finals.emplace(finalKey, entryIter.operator->());
    };

    if (!numLevels) {
      AddFinal(root);
      return;
    }

    auto AddChild = [&](HybridLeafGen &where, size_t index) {
      auto found = where.childrenIds.find(parts.at(index));

      if (es::IsEnd(where.childrenIds, found)) {
        where.childrenIds.emplace(parts.at(index), levels.at(index).size());
        levels.at(index).emplace_back();
        auto &levelItem = levels.at(index).back();
        levelItem.partName = parts.at(index);
        return &levelItem;
      } else {
        return &levels.at(index).at(found->second);
      }
    };

    auto *leaf = AddChild(root, 0);

    for (size_t l = 1; l < numLevels; l++) {
      leaf = AddChild(*leaf, l);
    }

    AddFinal(*leaf);
  }

  void Write(BinWritterRef wr, CacheBaseHeader &hdr,
             DetailedProgressBar *progress) {
    hdr.numFiles = totalCache.size();
    hdr.numLevels = levels.size() + 1;
    hdr.maxPathSize = maxPathSize;
    wr.Write(hdr);
    wr.Skip(12);
    wr.WriteContainer(slider.buffer);
    wr.ApplyPadding();

    if (progress) {
      progress->ItemCount(totalCache.size() + levels.size());
    }

    const int32 entriesOffset = (wr.Tell() - ENTRIES_OFFSET) / 4;
    int32 rootOffset;

    for (auto &f : totalCache) {
      f.Write(wr);
      if (progress) {
        (*progress)++;
      }
    }

    {
      std::vector<size_t> childrenOffsets;

      for (int64 l = levels.size() - 1; l >= 0; l--) {
        if (progress) {
          (*progress)++;
        }
        std::vector<size_t> newChildrenOffsets;

        for (auto &l : levels.at(l)) {
          auto retVal = l.Write(wr, childrenOffsets);
          newChildrenOffsets.push_back(retVal);
        }

        std::swap(childrenOffsets, newChildrenOffsets);
      }

      const int32 rootOffset_ = root.Write(wr, childrenOffsets);
      rootOffset = (rootOffset_ - ROOT_OFFSET) / 4;
    }

    const uint32 cacheSize = wr.Tell();

    wr.Push();
    wr.Seek(0);
    wr.Write(hdr);
    wr.Write(cacheSize);
    wr.Write(rootOffset);
    wr.Write(entriesOffset);
    wr.Pop();
  }
};

// Consumer side of the write-ahead log: a dedicated thread replays records
// appended by CacheGenerator::AddFile into the in-memory generator.
struct WALThread {
  std::string walFile;
  std::ofstream walStreamIn;
  std::ifstream walStreamOut;
  std::atomic_size_t sharedCounter;
  std::atomic_bool isDone;
  std::atomic<CounterLine *> totalCount{nullptr};
  CounterLine *totalCountOwned{nullptr};
  CacheGeneratorImpl generator;
  std::promise<void> state;
  std::future<void> exception;

  WALThread()
      : walFile(RequestTempFile()), walStreamIn(walFile), walStreamOut(walFile),
        exception(state.get_future()) {
    if (walStreamIn.fail()) {
      throw std::runtime_error("Failed to create wal file.");
    }
    if (walStreamOut.fail()) {
      throw std::runtime_error("Failed to open wal file.");
    }
  }

  WALThread(WALThread &&) = delete;
  WALThread(const WALThread &) = delete;

  void Loop() {
    size_t lastOffset = 0;

    while (!isDone || sharedCounter > 0) {
      if (sharedCounter == 0) {
        std::this_thread::sleep_for(std::chrono::milliseconds(5));
        continue;
      }

      if (!totalCountOwned && totalCount) {
        totalCountOwned = totalCount;
      }

      std::string curData;
      if (std::getline(walStreamOut, curData).eof()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(5));
        walStreamOut.clear();
        walStreamOut.seekg(lastOffset);
        continue;
      }

      lastOffset = walStreamOut.tellg();

      // WAL record layout: <path>;<zip offset, hex>;<file size, hex>
      char path[0x1000]{};
      size_t zipOffset;
      size_t fileSize;
      std::sscanf(curData.c_str(), "%[^;];%zx;%zx", path, &zipOffset,
                  &fileSize);
      sharedCounter--;
      generator.AddFile(path, zipOffset, fileSize);

      if (totalCountOwned) {
        (*totalCountOwned)++;
      }

      if (auto curException = std::current_exception(); curException) {
        state.set_exception(curException);
        std::terminate();
      }
    }

    state.set_value();
  }
};

CacheGenerator::CacheGenerator()
    : workThread(std::make_unique<WALThread>()),
      walThread([&] { workThread->Loop(); }) {
  pthread_setname_np(walThread.native_handle(), "cache_wal");
}

CacheGenerator::~CacheGenerator() = default;

void CacheGenerator::AddFile(std::string_view fileName, size_t zipOffset,
                             size_t fileSize) {
  workThread->walStreamIn << fileName << ';' << std::hex << zipOffset << ';'
                          << fileSize << '\n';
  if (workThread->walStreamIn.fail()) {
    throw std::runtime_error("Failed to add file to WAL stream");
  }
  workThread->sharedCounter++;
}

void CacheGenerator::WaitAndWrite(BinWritterRef wr) {
  workThread->isDone = true;
  workThread->generator.allowThreads = true;
  es::Dispose(workThread->walStreamIn);
  DetailedProgressBar *prog = nullptr;

  if (size_t count = workThread->sharedCounter; count > 1000) {
    prog = AppendNewLogLine<DetailedProgressBar>("Cache: ");
    prog->ItemCount(count);
    workThread->totalCount = prog;
  }

  if (workThread->exception.valid()) {
    workThread->exception.get();
  }

  if (walThread.joinable()) {
    walThread.join();
  }

  es::Dispose(workThread->walStreamOut);
  std::remove(workThread->walFile.c_str());

  workThread->generator.Write(wr, meta, prog);

  if (prog) {
    RemoveLogLines(prog);
  }
}

CacheGenerator::Metrics CacheGenerator::GlobalMetrics() {
  return {searchHitCount, searchMissCount};
}
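
For orientation, below is a minimal sketch of how the public CacheGenerator interface defined in this file might be driven. It is illustrative only: the archive paths, offsets, and sizes are hypothetical, and constructing a BinWritterRef over a std::ofstream is an assumption about spike/io/binwritter_stream.hpp rather than something shown here; callers may also need to pre-fill the remaining fields of the header metadata (the meta member) before writing.

#include "spike/app/cache.hpp"
#include "spike/io/binwritter_stream.hpp"
#include <fstream>

void BuildSeeklessCache() {
  CacheGenerator gen;

  // Hypothetical zip members: path, offset inside the archive, stored size.
  gen.AddFile("data/textures/wall.dds", 0x40, 0x8000);
  gen.AddFile("data/models/crate.obj", 0x8040, 0x1200);

  std::ofstream stream("archive.cache", std::ios::binary);
  BinWritterRef wr(stream); // assumed: BinWritterRef wraps a std::ostream
  gen.WaitAndWrite(wr);     // drains the WAL thread, then serializes the cache

  // Aggregated string-pool statistics from sliders that enabled metrics.
  auto metrics = gen.GlobalMetrics();
}

The WAL indirection keeps AddFile cheap: it only appends a text record, while the tree and string-pool construction runs on the background thread and is finalized by WaitAndWrite.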