• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PredatorCZ / PreCore / 461

pending completion
461

push

github-actions-ci

PredatorCZ
update readme

3204 of 6096 relevant lines covered (52.56%)

354.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.06
/src/app/out_cache.cpp
1
/*  Cache format generator for seekless zip loading
2

3
    Copyright 2021-2023 Lukas Cone
4

5
    Licensed under the Apache License, Version 2.0 (the "License");
6
    you may not use this file except in compliance with the License.
7
    You may obtain a copy of the License at
8

9
        http://www.apache.org/licenses/LICENSE-2.0
10

11
    Unless required by applicable law or agreed to in writing, software
12
    distributed under the License is distributed on an "AS IS" BASIS,
13
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
    See the License for the specific language governing permissions and
15
    limitations under the License.
16
*/
17

18
#include "spike/app/cache.hpp"
19
#include "spike/app/console.hpp"
20
#include "spike/app/tmp_storage.hpp"
21
#include "spike/io/binwritter_stream.hpp"
22
#include "spike/io/fileinfo.hpp"
23
#include <algorithm>
24
#include <barrier>
25
#include <cinttypes>
26
#include <fstream>
27
#include <functional>
28
#include <future>
29
#include <map>
30
#include <set>
31
#include <sstream>
32
#include <string>
33
#include <vector>
34

35
// Process-wide string-pool search statistics. Metric-enabled StringSlider
// instances add their local counters here when destroyed; GlobalMetrics()
// reports them.
static std::atomic_size_t searchHitCount{0};
static std::atomic_size_t searchMissCount{0};
37

38
struct StringSlider {
39
  std::string buffer;
40

41
private:
42
  std::vector<std::thread> workingThreads;
43
  std::map<std::thread::id, int64> threadResults;
44
  struct SearchArgs {
45
    size_t chunkBegin;
46
    size_t splitPoint;
47
    std::string_view str;
48
    std::boyer_moore_horspool_searcher<std::string_view::iterator> *searcher;
49
    bool endThread;
50
  };
51
  std::map<std::thread::id, SearchArgs> threadArgs;
52

53
  // All threads finished searching, notify main thread and resume ready state
54
  struct {
55
    std::map<std::thread::id, int64> *threadResults;
56
    std::atomic_bool *finishedSearch;
57
    int64 *resultOffset;
58
    void operator()() noexcept {
×
59
      // PrintInfo("Processing worker data");
60
      *resultOffset = -1;
×
61
      for (auto [_, off] : *threadResults) {
×
62
        if (off >= 0) {
×
63
          *resultOffset = off;
×
64
          break;
×
65
        }
66
      }
67

68
      // PrintInfo("Search end");
69

70
      *finishedSearch = true;
×
71
      finishedSearch->notify_all();
×
72
    }
73
  } workersDone;
74

75
  std::barrier<decltype(workersDone)> workerSync;
76

77
  // Call when threads are in ready state and notify main thread
78
  struct {
79
    std::atomic_bool *runWorkers;
80
    void operator()() noexcept {
2✔
81
      runWorkers->wait(false);
2✔
82
      *runWorkers = false;
2✔
83
      runWorkers->notify_all();
2✔
84
    }
2✔
85
  } workersReadyCb;
86
  std::barrier<decltype(workersReadyCb)> workersReady;
87

88
  std::atomic_bool stopWorkers;
89
  std::atomic_bool finishedSearch;
90
  std::atomic_bool runWorkers;
91

92
  int64 resultOffset = -1;
93

94
  std::atomic_bool &allowThreads;
95
  bool doMetrics = false;
96
  size_t searchHitCount = 0;
97
  size_t searchMissCount = 0;
98

99
public:
100
  StringSlider(std::atomic_bool &allowThreads, bool metrics)
2✔
101
      : workersDone{&threadResults, &finishedSearch, &resultOffset},
2✔
102
        workerSync(std::thread::hardware_concurrency(), workersDone),
2✔
103
        workersReadyCb{&runWorkers},
2✔
104
        workersReady(std::thread::hardware_concurrency(), workersReadyCb),
4✔
105
        allowThreads(allowThreads), doMetrics(metrics) {
2✔
106
    const size_t numThreads = std::thread::hardware_concurrency();
2✔
107

108
    auto SearcherThread = [this, numThreads] {
8✔
109
      while (true) {
110
        // PrintInfo("Thread wait");
111
        workersReady.arrive_and_wait();
4✔
112

113
        // PrintInfo("Thread begin");
114

115
        if (stopWorkers) {
4✔
116
          return;
4✔
117
        }
118

119
        try {
120
          SearchArgs &args = threadArgs.at(std::this_thread::get_id());
×
121

122
          std::string_view item;
123
          if (args.endThread) {
×
124
            item = std::string_view(buffer.data() + args.chunkBegin,
×
125
                                    buffer.end().operator->());
126
          } else {
127
            item = std::string_view(buffer.data() + args.chunkBegin,
×
128
                                    args.splitPoint + args.str.size());
×
129
          }
130

131
          auto found = std::search(item.begin(), item.end(), *args.searcher);
×
132
          int64 offset = -1;
133

134
          if (found != item.end()) {
×
135
            offset =
136
                std::distance(const_cast<const char *>(buffer.data()), &*found);
137
          }
138

139
          threadResults.at(std::this_thread::get_id()) = offset;
×
140
        } catch (...) {
×
141
          std::terminate();
×
142
        }
143

144
        // PrintInfo("Thread end");
145
        workerSync.arrive_and_wait();
×
146
        // PrintInfo("Thread end wait");
147
      }
148
    };
2✔
149

150
    workingThreads.resize(numThreads);
2✔
151
    for (size_t i = 0; i < numThreads; i++) {
6✔
152
      workingThreads.at(i) = std::thread(SearcherThread);
4✔
153
      auto &curThread = workingThreads.at(i);
154
      threadArgs.emplace(curThread.get_id(), SearchArgs{});
4✔
155
      threadResults.emplace(curThread.get_id(), -1);
4✔
156
      pthread_setname_np(curThread.native_handle(), "cache_srch_wrkr");
4✔
157
    }
158
  }
2✔
159

160
  ~StringSlider() {
2✔
161
    stopWorkers = true;
162
    runWorkers = true;
163
    runWorkers.notify_all();
164

165
    for (auto &w : workingThreads) {
6✔
166
      if (w.joinable()) {
4✔
167
        w.join();
4✔
168
      }
169
    }
170

171
    if (doMetrics) {
2✔
172
      ::searchHitCount += this->searchHitCount;
1✔
173
      ::searchMissCount += this->searchMissCount;
1✔
174
    }
175
  }
2✔
176

177
  std::string::iterator FindString(std::string_view str) {
8,619✔
178
    if (str.size() > buffer.size()) [[unlikely]] {
8,619✔
179
      if (std::string_view(str).starts_with(buffer)) [[unlikely]] {
3✔
180
        buffer = str;
2✔
181
        return buffer.begin();
182
      } else {
183
        return buffer.end();
184
      }
185
    }
186
    auto searcher = std::boyer_moore_horspool_searcher(str.begin(), str.end());
8,616✔
187

188
    if (size_t bSize = buffer.size(); allowThreads && bSize > 1'000'000) {
8,616✔
189
      const size_t numThreads = workingThreads.size();
190
      size_t splitPoint = bSize / numThreads;
×
191
      size_t chunkBegin = 0;
192

193
      for (size_t i = 0; i < numThreads; i++) {
×
194
        threadArgs.at(workingThreads.at(i).get_id()) = SearchArgs{
×
195
            .chunkBegin = chunkBegin,
196
            .splitPoint = splitPoint,
197
            .str = str,
198
            .searcher = &searcher,
199
            .endThread = i + 1 == numThreads,
×
200
        };
201
        chunkBegin += splitPoint;
×
202
      }
203

204
      // PrintInfo("Notifying workers");
205
      finishedSearch = false;
206
      runWorkers.wait(true);
207
      runWorkers = true;
208
      runWorkers.notify_all();
209
      // PrintInfo("Waiting for workers");
210
      finishedSearch.wait(false);
211
      // PrintInfo("Search done");
212

213
      if (resultOffset >= 0) {
×
214
        return std::next(buffer.begin(), resultOffset);
215
      }
216

217
      return buffer.end();
218
    } else {
219
      return std::search(buffer.begin(), buffer.end(), searcher);
8,616✔
220
    }
221
  }
222

223
  size_t InsertString(std::string_view str) {
8,619✔
224
    auto found = FindString(str);
8,619✔
225

226
    if (found == buffer.end()) {
8,619✔
227
      searchMissCount++;
1,237✔
228
      // todo add tail compare?
229
      buffer.append(str.data(), str.size());
1,237✔
230
      return buffer.size() - str.size();
1,237✔
231
    } else {
232
      searchHitCount++;
7,382✔
233
      return std::distance(buffer.begin(), found);
7,382✔
234
    }
235
  }
236
};
237

238
struct SliderString {
239
  size_t offset;
240
  size_t size;
241
  StringSlider *base;
242

243
  bool operator<(const SliderString &other) const {
57,039✔
244
    return std::string_view(base->buffer.data() + offset, size) <
57,039✔
245
           std::string_view(other.base->buffer.data() + other.offset,
57,039✔
246
                            other.size);
57,039✔
247
  }
248
};
249

250
// Cache image layout. CacheGeneratorImpl::Write emits the CacheBaseHeader,
// then three 4-byte fields (cacheSize, rootOffset, entriesOffset), then the
// slider string table.
static constexpr size_t ROOT_OFFSET = sizeof(CacheBaseHeader) + 4;
static constexpr size_t ENTRIES_OFFSET = sizeof(CacheBaseHeader) + 8;
static constexpr size_t STRING_OFFSET = sizeof(CacheBaseHeader) + 12;
// Distance from a node's base offset to its parent-pointer slot. For a final
// entry the slot follows zipOffset and zipSize (assumes 8-byte size_t).
static constexpr size_t HYBRIDLEAF_PARENTPTR = 4;
static constexpr size_t FINAL_PARENTPTR = 16;
255

256
struct FinalEntry : SliderString {
257
  size_t zipOffset;
258
  size_t zipSize;
259
  size_t totalFileNameSize;
260
  mutable size_t wrOffset = 0;
261

262
  void Write(BinWritterRef wr) const {
1,738✔
263
    wrOffset = wr.Tell();
1,738✔
264
    wr.Write(zipOffset);
1,738✔
265
    wr.Write(zipSize);
1,738✔
266
    wr.Write<uint32>(0);
1,738✔
267
    wr.Write<uint16>(size);
1,738✔
268
    wr.Write<uint16>(totalFileNameSize);
1,738✔
269

270
    if (size < 9) {
1,738✔
271
      wr.WriteBuffer(base->buffer.data() + offset, size);
91✔
272
      wr.ApplyPadding(8);
91✔
273
    } else {
274
      const int32 stringOffset = STRING_OFFSET + offset;
1,647✔
275
      const int32 thisOffset = wr.Tell();
1,647✔
276
      wr.Write(stringOffset - thisOffset);
1,647✔
277
      wr.Write<uint32>(0);
1,647✔
278
    }
279
  }
1,738✔
280
};
281

282
struct HybridLeafGen {
283
  std::map<SliderString, const FinalEntry *> finals;
284
  std::map<SliderString, size_t> childrenIds;
285
  SliderString partName;
286

287
  size_t Write(BinWritterRef wr, const std::vector<size_t> &childrenOffsets) {
126✔
288
    wr.ApplyPadding(4);
126✔
289
    for (auto &[_, id] : childrenIds) {
251✔
290
      const int32 childOffset = childrenOffsets.at(id);
125✔
291
      const int32 thisOffset = wr.Tell();
125✔
292
      wr.Write((childOffset - thisOffset) / 4);
125✔
293
    }
294

295
    const size_t retVal = wr.Tell() - HYBRIDLEAF_PARENTPTR;
126✔
296

297
    // fixup parent offset for children
298
    wr.Push();
299
    for (auto &[_, id] : childrenIds) {
251✔
300
      const size_t childOffset = childrenOffsets.at(id);
125✔
301
      wr.Seek(childOffset + HYBRIDLEAF_PARENTPTR);
125✔
302
      const int32 thisOffset = retVal;
125✔
303
      const int32 memberOffset = wr.Tell();
125✔
304
      wr.Write((thisOffset - memberOffset) / 4);
125✔
305
    }
306
    wr.Pop();
307

308
    wr.Write<uint32>(0);
126✔
309
    if (!partName.size) {
126✔
310
      wr.Write<uint32>(0);
1✔
311
    } else if (partName.size < 5) {
125✔
312
      wr.WriteBuffer(partName.base->buffer.data() + partName.offset,
43✔
313
                     partName.size);
314
      wr.ApplyPadding(4);
43✔
315
    } else {
316
      const int32 pathPartOffset_ = STRING_OFFSET + partName.offset;
82✔
317
      const int32 thisOffset = wr.Tell();
82✔
318
      wr.Write(pathPartOffset_ - thisOffset);
82✔
319
    }
320

321
    wr.Write<uint16>(childrenIds.size());
126✔
322
    wr.Write<uint16>(partName.size);
126✔
323
    wr.Write<uint32>(finals.size());
126✔
324

325
    for (auto &[_, entry] : finals) {
1,864✔
326
      const int32 finalOffset = entry->wrOffset;
1,738✔
327
      const int32 thisOffset = wr.Tell();
1,738✔
328
      wr.Write((finalOffset - thisOffset) / 4);
1,738✔
329
    }
330

331
    wr.Push();
332
    for (auto &[_, entry] : finals) {
1,864✔
333
      const size_t finalOffset = entry->wrOffset;
1,738✔
334
      wr.Seek(finalOffset + FINAL_PARENTPTR);
1,738✔
335
      const int32 thisOffset = retVal;
1,738✔
336
      const int32 memberOffset = wr.Tell();
1,738✔
337
      wr.Write((thisOffset - memberOffset) / 4);
1,738✔
338
    }
339
    wr.Pop();
340

341
    return retVal;
126✔
342
  }
343
};
344

345
struct CacheGeneratorImpl {
346
  std::multiset<FinalEntry> totalCache;
347
  HybridLeafGen root{};
348
  std::vector<std::vector<HybridLeafGen>> levels;
349
  std::atomic_bool allowThreads;
350
  StringSlider slider{allowThreads, true};
351
  StringSlider sliderTiny{allowThreads, false};
352
  size_t maxPathSize = 0;
353

354
  void AddFile(std::string_view fileName, size_t offset, size_t size) {
1,738✔
355
    maxPathSize = std::max(fileName.size(), maxPathSize);
1,738✔
356
    std::vector<SliderString> parts;
1,738✔
357
    SliderString finalKey{};
1,738✔
358

359
    {
360
      AFileInfo f(fileName);
1,738✔
361
      auto fileParts = f.Explode();
1,738✔
362
      parts.reserve(fileParts.size());
1,738✔
363
      auto finalPart = fileParts.back();
1,738✔
364
      fileParts.pop_back();
365

366
      for (auto &p : fileParts) {
8,619✔
367
        auto &sliderRef = p.size() > 4 ? slider : sliderTiny;
6,881✔
368
        const size_t position = sliderRef.InsertString(p);
6,881✔
369
        parts.emplace_back(SliderString{position, p.size(), &sliderRef});
6,881✔
370
      }
371

372
      finalKey.size = finalPart.size();
1,738✔
373

374
      if (finalPart.size() < 9) {
1,738✔
375
        const size_t position = sliderTiny.InsertString(finalPart);
91✔
376
        finalKey.offset = position;
91✔
377
        finalKey.base = &sliderTiny;
91✔
378
      } else {
379
        const size_t position = slider.InsertString(finalPart);
1,647✔
380
        finalKey.offset = position;
1,647✔
381
        finalKey.base = &slider;
1,647✔
382
      }
383
    }
384

385
    const size_t numLevels = parts.size();
386

387
    if (numLevels > levels.size()) {
1,738✔
388
      levels.resize(numLevels);
5✔
389
    }
390

391
    auto AddFinal = [&](HybridLeafGen &where) {
1,738✔
392
      FinalEntry entry{finalKey, offset, size, fileName.size()};
1,738✔
393
      auto entryIter = totalCache.emplace(entry);
1,738✔
394
      where.finals.emplace(finalKey, entryIter.operator->());
1,738✔
395
    };
3,476✔
396

397
    if (!numLevels) {
1,738✔
398
      AddFinal(root);
7✔
399
      return;
400
    }
401

402
    auto AddChild = [&](HybridLeafGen &where, size_t index) {
6,881✔
403
      auto found = where.childrenIds.find(parts.at(index));
7,006✔
404

405
      if (es::IsEnd(where.childrenIds, found)) {
6,881✔
406
        where.childrenIds.emplace(parts.at(index), levels.at(index).size());
7,006✔
407
        levels.at(index).emplace_back();
125✔
408
        auto &levelItem = levels.at(index).back();
409
        levelItem.partName = parts.at(index);
125✔
410
        return &levelItem;
125✔
411
      } else {
412
        return &levels.at(index).at(found->second);
6,756✔
413
      }
414
    };
1,731✔
415

416
    auto *leaf = AddChild(root, 0);
1,731✔
417

418
    for (size_t l = 1; l < numLevels; l++) {
6,881✔
419
      leaf = AddChild(*leaf, l);
5,150✔
420
    }
421

422
    AddFinal(*leaf);
1,731✔
423
  }
424

425
  void Write(BinWritterRef wr, CacheBaseHeader &hdr,
1✔
426
             DetailedProgressBar *progress) {
427
    hdr.numFiles = totalCache.size();
1✔
428
    hdr.numLevels = levels.size() + 1;
1✔
429
    hdr.maxPathSize = maxPathSize;
1✔
430
    wr.Write(hdr);
431
    wr.Skip(12);
1✔
432
    wr.WriteContainer(slider.buffer);
433
    wr.ApplyPadding();
1✔
434

435
    if (progress) {
1✔
436
      progress->ItemCount(totalCache.size() + levels.size());
1✔
437
    }
438

439
    const int32 entriesOffset = (wr.Tell() - ENTRIES_OFFSET) / 4;
1✔
440
    int32 rootOffset;
441

442
    for (auto &f : totalCache) {
1,739✔
443
      f.Write(wr);
1,738✔
444
      if (progress) {
1,738✔
445
        (*progress)++;
446
      }
447
    }
448

449
    {
450
      std::vector<size_t> childrenOffsets;
1✔
451

452
      for (int64 l = levels.size() - 1; l >= 0; l--) {
10✔
453
        if (progress) {
9✔
454
          (*progress)++;
455
        }
456
        std::vector<size_t> newChildrenOffsets;
9✔
457

458
        for (auto &l : levels.at(l)) {
134✔
459
          auto retVal = l.Write(wr, childrenOffsets);
125✔
460
          newChildrenOffsets.push_back(retVal);
125✔
461
        }
462

463
        std::swap(childrenOffsets, newChildrenOffsets);
464
      }
465

466
      const int32 rootOffset_ = root.Write(wr, childrenOffsets);
1✔
467
      rootOffset = (rootOffset_ - ROOT_OFFSET) / 4;
1✔
468
    }
469

470
    const uint32 cacheSize = wr.Tell();
1✔
471

472
    wr.Push();
473
    wr.Seek(0);
474
    wr.Write(hdr);
475
    wr.Write(cacheSize);
476
    wr.Write(rootOffset);
477
    wr.Write(entriesOffset);
478
    wr.Pop();
479
  }
1✔
480
};
481

482
struct WALThread {
483
  std::string walFile;
484
  std::ofstream walStreamIn;
485
  std::ifstream walStreamOut;
486
  std::atomic_size_t sharedCounter;
487
  std::atomic_bool isDone;
488
  std::atomic<CounterLine *> totalCount{nullptr};
489
  CounterLine *totalCountOwned{nullptr};
490
  CacheGeneratorImpl generator;
491
  std::promise<void> state;
492
  std::future<void> exception;
493

494
  WALThread()
1✔
495
      : walFile(RequestTempFile()), walStreamIn(walFile), walStreamOut(walFile),
1✔
496
        exception(state.get_future()) {
1✔
497
    if (walStreamIn.fail()) {
1✔
498
      throw std::runtime_error("Failed to create wal file.");
×
499
    }
500
    if (walStreamOut.fail()) {
1✔
501
      throw std::runtime_error("Failed to open wal file.");
×
502
    }
503
  }
1✔
504

505
  WALThread(WALThread &&) = delete;
506
  WALThread(const WALThread &) = delete;
507

508
  void Loop() {
1✔
509
    size_t lastOffset = 0;
510

511
    while (!isDone || sharedCounter > 0) {
1,740✔
512
      if (sharedCounter == 0) {
1,739✔
513
        std::this_thread::sleep_for(std::chrono::milliseconds(5));
1✔
514
        continue;
1✔
515
      }
516

517
      if (!totalCountOwned && totalCount) {
1,738✔
518
        totalCountOwned = totalCount;
1✔
519
      }
520

521
      std::string curData;
522
      if (std::getline(walStreamOut, curData).eof()) {
1,738✔
523
        std::this_thread::sleep_for(std::chrono::milliseconds(5));
×
524
        walStreamOut.clear();
×
525
        walStreamOut.seekg(lastOffset);
×
526
        continue;
527
      }
528

529
      lastOffset = walStreamOut.tellg();
1,738✔
530

531
      char path[0x1000]{};
1,738✔
532
      size_t zipOffset;
533
      size_t fileSize;
534
      std::sscanf(curData.c_str(), "%[^;];%zx;%zx", path, &zipOffset,
1,738✔
535
                  &fileSize);
536
      sharedCounter--;
537
      generator.AddFile(path, zipOffset, fileSize);
1,738✔
538

539
      if (totalCountOwned) {
1,738✔
540
        (*totalCountOwned)++;
541
      }
542

543
      if (auto curException = std::current_exception(); curException) {
1,738✔
544
        state.set_exception(curException);
×
545
        std::terminate();
×
546
      }
547
    }
548

549
    state.set_value();
1✔
550
  }
1✔
551
};
552

553
/// Creates the WAL state and starts the thread that consumes it.
CacheGenerator::CacheGenerator()
    : workThread(std::make_unique<WALThread>()),
      // Capture the heap-allocated WALThread rather than `this`: it stays at
      // the same address even if this CacheGenerator is relocated, so the
      // worker never dereferences a dangling CacheGenerator pointer.
      // NOTE(review): relies on workThread being declared before walThread in
      // the class (initialization follows declaration order), as before.
      walThread([wal = workThread.get()] { wal->Loop(); }) {
  pthread_setname_np(walThread.native_handle(), "cache_wal");
}
1✔
558

559
/// WaitAndWrite normally joins the WAL thread. If it was never reached, the
/// worker would wait for isDone forever, and destroying a still-joinable
/// std::thread calls std::terminate. Shut the worker down here instead.
CacheGenerator::~CacheGenerator() {
  if (walThread.joinable()) {
    workThread->isDone = true;
    // Let pending records reach the file so the worker can drain and exit.
    // NOTE(review): assumes es::Dispose is safe on an already-disposed stream.
    es::Dispose(workThread->walStreamIn);
    walThread.join();
    es::Dispose(workThread->walStreamOut);
    std::remove(workThread->walFile.c_str());
  }
}
1✔
560

561
void CacheGenerator::AddFile(std::string_view fileName, size_t zipOffset,
1,738✔
562
                             size_t fileSize) {
563
  workThread->walStreamIn << fileName << ';' << std::hex << zipOffset << ';'
3,476✔
564
                          << fileSize << '\n';
1,738✔
565
  if (workThread->walStreamIn.fail()) {
1,738✔
566
    throw std::runtime_error("Failed to add file to WAL stream");
×
567
  }
568
  workThread->sharedCounter++;
569
}
1,738✔
570

571
/// Finishes WAL processing (switching the string pools to multi-threaded
/// search), rethrows any worker failure, and writes the cache image to `wr`.
void CacheGenerator::WaitAndWrite(BinWritterRef wr) {
  workThread->isDone = true;
  workThread->generator.allowThreads = true;
  // Dispose of the writer so buffered records become readable by the worker.
  es::Dispose(workThread->walStreamIn);
  DetailedProgressBar *prog = nullptr;

  if (size_t count = workThread->sharedCounter; count > 1000) {
    prog = AppendNewLogLine<DetailedProgressBar>("Cache: ");
    prog->ItemCount(count);
    workThread->totalCount = prog;
  }

  // Join before inspecting the result: if the worker failed, get() rethrows,
  // and a walThread left joinable would terminate the process on destruction.
  if (walThread.joinable()) {
    walThread.join();
  }

  // Remove the temporary WAL file on both success and failure.
  es::Dispose(workThread->walStreamOut);
  std::remove(workThread->walFile.c_str());

  try {
    if (workThread->exception.valid()) {
      workThread->exception.get();
    }

    workThread->generator.Write(wr, meta, prog);
  } catch (...) {
    if (prog) {
      RemoveLogLines(prog);
    }
    throw;
  }

  if (prog) {
    RemoveLogLines(prog);
  }
}
1✔
600

601
/// Snapshot of the string-pool search hit/miss totals that metric-enabled
/// StringSlider instances have added when they were destroyed.
CacheGenerator::Metrics CacheGenerator::GlobalMetrics() {
  CacheGenerator::Metrics totals{searchHitCount, searchMissCount};
  return totals;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc