• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HDFGroup / hermes / 4837888970

pending completion
4837888970

Pull #515

github

GitHub
Merge a0aa46f48 into 87672e106
Pull Request #515: v1.0

5501 of 5501 new or added lines in 117 files covered. (100.0%)

4996 of 7299 relevant lines covered (68.45%)

6141431.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.12
/src/buffer_pool.cc
1
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
2
* Distributed under BSD 3-Clause license.                                   *
3
* Copyright by The HDF Group.                                               *
4
* Copyright by the Illinois Institute of Technology.                        *
5
* All rights reserved.                                                      *
6
*                                                                           *
7
* This file is part of Hermes. The full Hermes copyright notice, including  *
8
* terms governing use, modification, and redistribution, is contained in    *
9
* the COPYING file, which can be found at the top directory. If you do not  *
10
* have access to the file, you may request a copy from help@hdfgroup.org.   *
11
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
12

13
#include "buffer_pool.h"
14
#include "metadata_manager.h"
15
#include "hermes.h"
16
#include "buffer_organizer.h"
17

18
namespace hermes {
19

20
/**====================================
21
 * Default Constructor
22
 * ===================================*/
23

24
/**
* Initialize the BPM and its shared memory.
* REQUIRES mdm to be initialized already.
*
* @param header the shared-memory archive backing this BufferPool
* @param alloc the shared-memory allocator used to construct the free lists
* */
void BufferPool::shm_init(hipc::ShmArchive<BufferPoolShm> &header,
                          hipc::Allocator *alloc) {
  // Wire up local pointers (header_, target_allocs_, and the mdm_/borg_/rpc_
  // subsystems) before the header is populated below
  shm_deserialize(header);
  // Initialize header
  HSHM_MAKE_AR0(header_->free_lists_, alloc);
  // Free lists are indexed by the triple: [target] [cpu] [page_size]
  header_->ntargets_ = mdm_->targets_->size();
  header_->concurrency_ = HERMES_SYSTEM_INFO->ncpu_;
  header_->nslabs_ = 0;
  // Get the maximum number of slab sizes over all targets
  for (TargetInfo &target : *mdm_->targets_) {
    int dev_id = target.id_.GetDeviceId();
    DeviceInfo &dev_info = (*mdm_->devices_)[dev_id];
    if (header_->nslabs_ < dev_info.slab_sizes_->size()) {
      header_->nslabs_ = dev_info.slab_sizes_->size();
    }
  }
  // Create target free lists: one (stat, list) pair per
  // (target, cpu, slab size) combination
  target_allocs_->resize(header_->ntargets_ * header_->concurrency_ *
                         header_->nslabs_);
  // Initialize target free lists
  for (u16 target_id = 0; target_id < header_->ntargets_; ++target_id) {
    for (size_t cpu = 0; cpu < header_->concurrency_; ++cpu) {
      // Get the target + device info
      TargetInfo &target = (*mdm_->targets_)[target_id];
      int dev_id = target.id_.GetDeviceId();
      DeviceInfo &dev_info = (*mdm_->devices_)[dev_id];
      // Each CPU lane gets a disjoint, equal-sized region of the target
      size_t size_per_core = target.max_cap_ / header_->concurrency_;
      if (size_per_core < MEGABYTES(1)) {
        HELOG(kFatal, "The capacity of the target {} ({} bytes)"
              " is not enough to give each of the {} CPUs at least"
              " 1MB of space",
              dev_info.mount_point_->str(),
              target.max_cap_, header_->concurrency_)
      }

      // Initialize the core's metadata (region offset/size + its lock)
      BpFreeListStat *target_stat;
      GetTargetStatForCpu(target_id, cpu, target_stat);
      target_stat->region_off_ = cpu * size_per_core;
      target_stat->region_size_ = size_per_core;
      target_stat->lock_.Init();
    }
  }
}
73

74
/**====================================
75
 * SHM Deserialize
76
 * ===================================*/
77

78
/** Deserialize the BPM from shared memory */
79
void BufferPool::shm_deserialize(hipc::ShmArchive<BufferPoolShm> &header) {
105✔
80
  mdm_ = &HERMES->mdm_;
105✔
81
  borg_ = &HERMES->borg_;
105✔
82
  rpc_ = &HERMES->rpc_;
105✔
83
  header_ = header.get();
105✔
84
  target_allocs_ = header_->free_lists_.get();
105✔
85
}
105✔
86

87
/**====================================
88
 * Destructor
89
 * ===================================*/
90

91
/** Destroy the BPM shared memory (releases the free-list vector). */
void BufferPool::shm_destroy_main() {
  target_allocs_->shm_destroy();
}
95

96
/**====================================
97
 * Allocate Buffers
98
 * ===================================*/
99

100
/**
101
* Allocate buffers from the targets according to the schema
102
* */
103
std::vector<BufferInfo>
104
BufferPool::LocalAllocateAndSetBuffers(PlacementSchema &schema,
27,909✔
105
                                       const Blob &blob) {
106
  AUTO_TRACE(1)
107
  std::vector<BufferInfo> buffers;
27,909✔
108
  size_t blob_off = 0;
27,909✔
109
  int cpu = hshm::NodeThreadId().hash() % header_->concurrency_;
27,909✔
110

111
  TIMER_START("AllocateBuffers")
112
  for (SubPlacement &plcmnt : schema.plcmnts_) {
55,830✔
113
    // Get the target and device in the placement schema
114
    if (plcmnt.tid_.GetNodeId() != mdm_->rpc_->node_id_) {
27,921✔
115
      blob_off += plcmnt.size_;
×
116
      continue;
×
117
    }
118
    int dev_id = plcmnt.tid_.GetDeviceId();
27,921✔
119
    DeviceInfo &dev_info = (*mdm_->devices_)[dev_id];
27,921✔
120

121
    // Get the number of each buffer size to allocate
122
    size_t buffer_count;
27,921✔
123
    std::vector<BpCoin> coins = CoinSelect(dev_info,
27,921✔
124
                                           plcmnt.size_,
125
                                           buffer_count);
55,842✔
126

127
    // Allocate buffers
128
    AllocateBuffers(plcmnt.size_,
27,921✔
129
                    coins,
130
                    plcmnt.tid_,
27,921✔
131
                    cpu,
132
                    blob_off,
133
                    buffers);
134
  }
135
  TIMER_END()
136
  borg_->LocalPlaceBlobInBuffers(blob, buffers);
27,909✔
137
  return buffers;
27,909✔
138
}
139

140
/**
141
 * The RPC of LocalAllocateAndSendBuffers
142
 * */
143
std::vector<BufferInfo>
144
BufferPool::GlobalAllocateAndSetBuffers(PlacementSchema &schema,
27,895✔
145
                                        const Blob &blob) {
146
  AUTO_TRACE(1)
147
  // Get the nodes to transfer buffers to
148
  size_t total_size;
27,895✔
149
  auto unique_nodes = GroupByNodeId(schema, total_size);
27,895✔
150
  std::vector<BufferInfo> info;
27,895✔
151

152
  // Send the buffers to each node
153
  for (auto &[node_id, size] : unique_nodes) {
55,790✔
154
    std::vector<BufferInfo> sub_info;
55,790✔
155
    if (NODE_ID_IS_LOCAL(node_id)) {
27,895✔
156
      sub_info = LocalAllocateAndSetBuffers(schema, blob);
27,895✔
157
    } else {
158
      sub_info = rpc_->IoCall<std::vector<BufferInfo>>(
×
159
          node_id, "RpcAllocateAndSetBuffers",
160
          IoType::kWrite, blob.data(), blob.size(),
161
          blob.size(), schema);
×
162
    }
163

164
    // Concatenate
165
    info.reserve(info.size() + sub_info.size());
27,895✔
166
    for (BufferInfo &tmp_info : sub_info) {
56,317✔
167
      info.emplace_back(tmp_info);
28,422✔
168
    }
169
  }
170

171
  return info;
55,790✔
172
}
173

174
/**
 * Allocate requested slabs from this target.
 * If the target runs out of space, it will provision from the next target.
 * We assume there is a baseline target, which can house an infinite amount
 * of data. This would be the PFS in an HPC machine.
 *
 * @param total_size the total amount of data being placed in this target
 * @param coins The requested number of slabs to allocate from this target
 * @param tid The ID of the (ideal) target to allocate from
 * @param cpu the CPU lane used to select per-CPU free lists
 * @param blob_off [out] The current size of the blob which has been placed
 * @param buffers [out] The buffers which were allocated
 * */
void BufferPool::AllocateBuffers(size_t total_size,
                                 std::vector<BpCoin> &coins,
                                 TargetId tid,
                                 int cpu,
                                 size_t &blob_off,
                                 std::vector<BufferInfo> &buffers) {
  // Get this target's stack allocator
  size_t rem_size = total_size;

  // Allocate each slab size
  for (size_t slab_id = 0; slab_id < coins.size(); ++slab_id) {
    size_t slab_size = coins[slab_id].slab_size_;
    size_t slab_count = coins[slab_id].count_;
    while (slab_count) {
      // AllocateSlabs decrements slab_count for each slab it places;
      // a nonzero count afterwards means this target is out of space
      AllocateSlabs(rem_size, slab_size, slab_count, slab_id, tid,
                    cpu, blob_off, buffers);

      // Go to the next target if there was not enough space in this target
      if (slab_count > 0) {
        i32 target_id = tid.GetIndex() + 1;
        if (target_id >= (i32)mdm_->targets_->size()) {
          // NOTE(review): assumes HELOG(kFatal) aborts; otherwise the
          // index below would be out of bounds -- confirm
          HELOG(kFatal, "BORG ran out of space on all targets."
                " This shouldn't happen."
                " Please increase the amount of space dedicated to PFS.")
        }
        tid = (*mdm_->targets_)[target_id].id_;
      }
    }
  }
}
217

218
/**
 * Allocate a set of slabs of particular size from this target.
 *
 * @param rem_size the amount of data remaining that needs to be allocated
 * @param slab_size Size of the slabs to allocate
 * @param slab_count [in,out] The count of this slab size to allocate;
 * decremented once per slab successfully placed, so a nonzero value on
 * return means the target ran out of space
 * @param slab_id The offset of the slab in the device's slab list
 * @param tid The target to allocate slabs from
 * @param cpu the CPU lane whose free list is tried first
 * @param blob_off [out] The current size of the blob which has been placed
 * @param buffers [out] The buffers which were allocated
 * */
void BufferPool::AllocateSlabs(size_t &rem_size,
                               size_t slab_size,
                               size_t &slab_count,
                               size_t slab_id,
                               TargetId tid,
                               int cpu,
                               size_t &blob_off,
                               std::vector<BufferInfo> &buffers) {
  int cpu_off = 0;
  int ncpu = header_->concurrency_;

  // Get the free list for this CPU
  BpFreeList *free_list;
  BpFreeListStat *free_list_stat;
  BpFreeListStat *target_stat;
  GetFreeListForCpu(tid.GetIndex(), cpu, slab_id, free_list,
                    free_list_stat);
  GetTargetStatForCpu(tid.GetIndex(), cpu, target_stat);
  target_stat->lock_.Lock(10);

  while (slab_count > 0) {
    BpSlot slot =
        AllocateSlabSize(cpu, slab_size,
                         free_list, free_list_stat, target_stat);
    if (slot.IsNull()) {
      // This CPU lane is exhausted: release its lock, rotate to the next
      // lane, and retry under that lane's lock.
      // NOTE(review): cpu_off < ncpu permits ncpu rotations, so the
      // starting lane is revisited once before giving up -- confirm intended
      if (cpu_off < ncpu) {
        cpu = (cpu + 1) % ncpu;
        target_stat->lock_.Unlock();
        GetFreeListForCpu(tid.GetIndex(), cpu, slab_id, free_list,
                          free_list_stat);
        GetTargetStatForCpu(tid.GetIndex(), cpu, target_stat);
        target_stat->lock_.Lock(11);
        cpu_off += 1;
        continue;
      } else {
        // Every lane on this target is exhausted; the caller will fall
        // back to the next target
        target_stat->lock_.Unlock();
        return;
      }
    }
    // Record the allocated slab as a buffer covering the next chunk of
    // the blob
    buffers.emplace_back();
    BufferInfo &info = buffers.back();
    info.t_off_ = slot.t_off_;
    info.t_size_ = slot.t_size_;
    info.t_slab_ = slab_id;
    info.blob_off_ = blob_off;
    info.blob_size_ = slot.t_size_;
    if (rem_size < slot.t_size_) {
      // The final buffer may be only partially filled by the blob
      info.blob_size_ = rem_size;
    }
    rem_size -= info.blob_size_;
    info.tid_ = tid;
    blob_off += info.blob_size_;
    // Capacity accounting is charged per full slab, not per bytes used
    mdm_->LocalUpdateTargetCapacity(tid, -static_cast<off64_t>(slab_size));
    --slab_count;
  }
  target_stat->lock_.Unlock();
}
287

288
/**
289
 * Allocate a buffer of a particular size
290
 *
291
 * @param slab_id The size of slab to allocate
292
 * @param tid The target to (ideally) allocate the slab from
293
 * @param coin The buffer size information
294
 *
295
 * @return returns a BufferPool (BP) slot. The slot is NULL if the
296
 * target didn't have enough remaining space.
297
 * */
298
BpSlot BufferPool::AllocateSlabSize(int cpu,
132,392✔
299
                                    size_t slab_size,
300
                                    BpFreeList *free_list,
301
                                    BpFreeListStat *free_list_stat,
302
                                    BpFreeListStat *target_stat) {
303
  BpSlot slot(0, 0);
132,392✔
304

305
  // Case 1: Slab is cached on this core
306
  if (free_list->size()) {
132,392✔
307
    auto first = free_list->begin();
2,136✔
308
    slot = *first;
2,136✔
309
    free_list->erase(first);
2,136✔
310
    return slot;
2,136✔
311
  }
312

313
  // Case 2: Allocate slab from stack
314
  if (slot.IsNull() && target_stat->region_size_ >= slab_size) {
130,256✔
315
    slot.t_off_ = target_stat->region_off_.fetch_add(slab_size);
26,576✔
316
    slot.t_size_ = slab_size;
26,576✔
317
    target_stat->region_size_.fetch_sub(slab_size);
26,576✔
318
    return slot;
26,576✔
319
  }
320

321
  // Case 3: Coalesce
322
  // TOOD(llogan)
323

324
  // Case 4: No more space left in this target.
325
  return slot;
103,680✔
326
}
327

328
/**
329
   * Determines a reasonable allocation of buffers based on the size of I/O.
330
   * Returns the number of each slab size to allocate
331
   * */
332
std::vector<BpCoin> BufferPool::CoinSelect(DeviceInfo &dev_info,
27,921✔
333
                                           size_t total_size,
334
                                           size_t &buffer_count) {
335
  std::vector<BpCoin> coins(dev_info.slab_sizes_->size());
27,921✔
336
  size_t rem_size = total_size;
27,921✔
337
  buffer_count = 0;
27,921✔
338

339
  while (rem_size) {
55,848✔
340
    // Find the slab size nearest to the rem_size
341
    size_t i = 0, slab_size = 0;
27,927✔
342
    for (size_t &tmp_slab_size : *dev_info.slab_sizes_) {
100,133✔
343
      slab_size = tmp_slab_size;
100,108✔
344
      if (slab_size >= rem_size) {
100,108✔
345
        break;
346
      }
347
      ++i;
72,206✔
348
    }
349
    if (i == dev_info.slab_sizes_->size()) { i -= 1; }
27,927✔
350

351
    // Divide rem_size into slabs
352
    if (rem_size > slab_size) {
27,927✔
353
      coins[i].count_ += rem_size / slab_size;
25✔
354
      coins[i].slab_size_ = slab_size;
25✔
355
      rem_size %= slab_size;
25✔
356
    } else {
357
      coins[i].count_ += 1;
27,902✔
358
      coins[i].slab_size_ = slab_size;
27,902✔
359
      rem_size = 0;
27,902✔
360
    }
361
    buffer_count += coins[i].count_;
27,927✔
362
  }
363

364
  return coins;
27,921✔
365
}
366

367
/**====================================
368
 * Free Buffers
369
 * ===================================*/
370

371
/**
372
 * Free buffers from the BufferPool
373
 * */
374
bool BufferPool::LocalReleaseBuffers(std::vector<BufferInfo> &buffers) {
2,432✔
375
  AUTO_TRACE(1)
376
  HILOG(kDebug, "Releasing buffers")
2,432✔
377
  int cpu = hshm::NodeThreadId().hash() % header_->concurrency_;
2,432✔
378
  for (BufferInfo &info : buffers) {
5,667✔
379
    // Acquire the main CPU lock for the target
380
    BpFreeListStat *target_stat;
3,235✔
381
    GetTargetStatForCpu(info.tid_.GetIndex(), cpu, target_stat);
3,235✔
382
    hshm::ScopedMutex bpm_lock(target_stat->lock_, 12);
6,470✔
383

384
    // Get this core's free list for the page_size
385
    BpFreeListStat *free_list_stat;
3,235✔
386
    BpFreeList *free_list;
3,235✔
387
    GetFreeListForCpu(info.tid_.GetIndex(), cpu, info.t_slab_,
3,235✔
388
                      free_list, free_list_stat);
389
    free_list->emplace_front(info.t_off_, info.t_size_);
3,235✔
390
  }
391
  return true;
2,432✔
392
}
393

394
/**
395
 * Free buffers from the BufferPool (global)
396
 * */
397
bool BufferPool::GlobalReleaseBuffers(std::vector<BufferInfo> &buffers) {
2,418✔
398
  AUTO_TRACE(1)
399
  // Get the nodes to transfer buffers to
400
  size_t total_size;
2,418✔
401
  auto unique_nodes = GroupByNodeId(buffers, total_size);
2,418✔
402

403
  // Send the buffers to each node
404
  for (auto &[node_id, size] : unique_nodes) {
4,836✔
405
    if (NODE_ID_IS_LOCAL(node_id)) {
2,418✔
406
      LocalReleaseBuffers(buffers);
2,418✔
407
    } else {
408
      rpc_->Call<bool>(node_id, "RpcReleaseBuffers", buffers);
×
409
    }
410
  }
411

412
  return true;
4,836✔
413
}
414

415
/**====================================
416
 * Helper Methods
417
 * ===================================*/
418

419
/** Get a free list reference */
420
void BufferPool::GetFreeListForCpu(u16 target_id, int cpu, int slab_id,
134,836✔
421
                                   BpFreeList* &free_list,
422
                                   BpFreeListStat* &free_list_stat) {
423
  size_t cpu_free_list_idx = header_->GetCpuFreeList(target_id,
134,836✔
424
                                                     cpu, slab_id);
425
  if (cpu_free_list_idx >= target_allocs_->size()) {
134,836✔
426
    HELOG(kFatal, "For some reason, the CPU free list was "
×
427
          "not allocated properly and overflowed.")
428
  }
429
  BpFreeListPair &free_list_p =
134,836✔
430
      (*target_allocs_)[cpu_free_list_idx];
134,836✔
431
  free_list_stat = &free_list_p.GetFirst();
134,836✔
432
  free_list = &free_list_p.GetSecond();
134,836✔
433
}
134,836✔
434

435
/** Get the stack allocator from the cpu */
436
void BufferPool::GetTargetStatForCpu(u16 target_id, int cpu,
135,060✔
437
                                     BpFreeListStat* &target_stat) {
438
  size_t tgt_free_list_start = header_->GetCpuTargetStat(target_id, cpu);
135,060✔
439
  BpFreeListPair &free_list_p =
135,060✔
440
      (*target_allocs_)[tgt_free_list_start];
135,060✔
441
  target_stat = &free_list_p.GetFirst();
135,060✔
442
}
135,060✔
443

444
}  // namespace hermes
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc