
HDFGroup / hermes, build 4851036339 (GitHub)
Pull Request #515: v1.0 (merge aee741046 into 87672e106)

5501 of 5501 new or added lines in 117 files covered (100.0%)
4997 of 7299 relevant lines covered (68.46%)
6131966.73 hits per line

Source File

/src/buffer_pool.h (72.73% of lines covered)

/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
 * Distributed under BSD 3-Clause license.                                   *
 * Copyright by The HDF Group.                                               *
 * Copyright by the Illinois Institute of Technology.                        *
 * All rights reserved.                                                      *
 *                                                                           *
 * This file is part of Hermes. The full Hermes copyright notice, including  *
 * terms governing use, modification, and redistribution, is contained in    *
 * the COPYING file, which can be found at the top directory. If you do not  *
 * have access to the file, you may request a copy from help@hdfgroup.org.   *
 * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */

#ifndef HERMES_SRC_BUFFER_POOL_H_
#define HERMES_SRC_BUFFER_POOL_H_

#include "hermes_types.h"
#include "rpc.h"

namespace hermes {

class MetadataManager;
class BufferOrganizer;
class BufferPool;

struct BpCoin {
  size_t count_;
  size_t slab_size_;

  BpCoin() : count_(0), slab_size_(0) {}
};

struct BpSlot {
  size_t t_off_;    /**< Offset of the buffer in the target */
  size_t t_size_;   /**< Size of the buffer in the target */

  BpSlot() : t_size_(0) {}

  BpSlot(size_t t_off, size_t t_size) : t_off_(t_off), t_size_(t_size) {}

  bool IsNull() {
    return t_size_ == 0;
  }
};

struct BpFreeListStat {
  std::atomic<size_t> region_off_;  /**< Current offset in the target */
  std::atomic<size_t> region_size_; /**< Current space remaining in the tgt */
  size_t page_size_;  /**< The size of page in this buffer list */
  size_t cur_count_;  /**< Current number of pages allocated */
  size_t max_count_;  /**< Max pages allocated at one time */
  Mutex lock_;        /**< The modifier lock for this slot */

  /** Default constructor */
  BpFreeListStat() = default;

  /** Copy constructor */
  BpFreeListStat(const BpFreeListStat &other) {
    strong_copy(other);
  }

  /** Copy assignment operator */
  BpFreeListStat& operator=(const BpFreeListStat &other) {
    strong_copy(other);
    return *this;
  }

  /** Move constructor */
  BpFreeListStat(BpFreeListStat &&other) {
    strong_copy(other);
  }

  /** Move assignment operator */
  BpFreeListStat& operator=(BpFreeListStat &&other) {
    strong_copy(other);
    return *this;
  }

  /** Internal copy */
  void strong_copy(const BpFreeListStat &other) {
    region_off_ = other.region_off_.load();
    region_size_ = other.region_size_.load();
    page_size_ = other.page_size_;
    cur_count_ = other.cur_count_;
    max_count_ = other.max_count_;
  }
};

/** Represents the list of available buffers */
typedef hipc::slist<BpSlot> BpFreeList;

/** Represents a cache of buffer size in the target */
typedef hipc::pair<BpFreeListStat, BpFreeList> BpFreeListPair;

/** Represents the set of targets */
typedef hipc::vector<BpFreeListPair> BpTargetAllocs;

/**
 * The shared-memory representation of the BufferPool
 * */
struct BufferPoolShm {
  hipc::ShmArchive<BpTargetAllocs> free_lists_;
  u16 ntargets_;
  size_t concurrency_;
  size_t nslabs_;

  /**
   * Get the free list of the target
   * This is where region_off_ & region_size_ in the BpFreeListStat are valid
   * */
  size_t GetCpuTargetStat(u16 target, int cpu) {
    return target * concurrency_ * nslabs_ + cpu * nslabs_;
  }

  /**
   * Get the start of the vector of the free list for the CPU in the target
   * This is where page_size_, cur_count_, and max_count_ are valid.
   *
   * [target] [cpu] [slab_id]
   * */
  size_t GetCpuFreeList(u16 target, int cpu, int slab_id) {
    return target * concurrency_ * nslabs_ + cpu * nslabs_ + slab_id;
  }
};
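
/*
 * Illustrative sketch (not part of the original header): how the flat
 * [target] [cpu] [slab_id] indexing above resolves, assuming hypothetical
 * values concurrency_ = 4 (CPUs per target) and nslabs_ = 3 (slab sizes
 * per CPU):
 *
 *   BufferPoolShm shm;
 *   shm.concurrency_ = 4;
 *   shm.nslabs_ = 3;
 *   // Slab 2 of CPU 1 in target 0:  0*4*3 + 1*3 + 2 = 5
 *   size_t slab_idx = shm.GetCpuFreeList(0, 1, 2);
 *   // The per-CPU target stat sits at the start of that CPU's slab range:
 *   size_t stat_idx = shm.GetCpuTargetStat(0, 1);  // 0*4*3 + 1*3 = 3
 */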

/**
 * Responsible for managing the buffering space of all node-local targets.
 * */
class BufferPool {
 private:
  MetadataManager *mdm_;
  BufferOrganizer *borg_;
  RPC_TYPE *rpc_;
  BufferPoolShm *header_;
  /** Per-target allocator */
  BpTargetAllocs *target_allocs_;

 public:
  /**====================================
   * Default Constructor
   * ===================================*/

  /** Default constructor */
  BufferPool() = default;

  /**====================================
   * SHM Init
   * ===================================*/

  /**
   * Initialize the BPM and its shared memory.
   * REQUIRES mdm to be initialized already.
   * */
  void shm_init(hipc::ShmArchive<BufferPoolShm> &header,
                hipc::Allocator *alloc);

  /**====================================
   * SHM Deserialize
   * ===================================*/

  /** Deserialize the BPM from shared memory */
  void shm_deserialize(hipc::ShmArchive<BufferPoolShm> &header);

  /**====================================
   * Destructor
   * ===================================*/

  /** Destroy the BPM shared memory. */
  void shm_destroy_main();

  /**====================================
   * Allocate Buffers
   * ===================================*/

  /**
   * Allocate buffers from the targets according to the schema
   * */
  RPC std::vector<BufferInfo>
  LocalAllocateAndSetBuffers(PlacementSchema &schema,
                             const Blob &blob);
  std::vector<BufferInfo>
  GlobalAllocateAndSetBuffers(PlacementSchema &schema,
                              const Blob &blob);

  /**
   * Determines a reasonable allocation of buffers based on the size of I/O.
   * Returns the number of each slab size to allocate
   * */
  std::vector<BpCoin> CoinSelect(DeviceInfo &dev_info,
                                 size_t total_size,
                                 size_t &buffer_count);
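
  /*
   * Illustrative only (not part of the original header): what the returned
   * vector encodes. Each BpCoin pairs a slab size with the number of buffers
   * of that size to allocate; the slab sizes below are hypothetical and the
   * real values come from the DeviceInfo slab configuration.
   *
   *   size_t buffer_count = 0;
   *   size_t io_size = 1048576 + 4096;  // 1 MiB + 4 KiB blob
   *   std::vector<BpCoin> coins = CoinSelect(dev_info, io_size, buffer_count);
   *   // With slab sizes {4 KiB, 64 KiB, 1 MiB}, a reasonable selection is
   *   // one 1 MiB buffer plus one 4 KiB buffer:
   *   //   coins[0] = {count_ = 1, slab_size_ = 4096}
   *   //   coins[2] = {count_ = 1, slab_size_ = 1048576}
   *   //   buffer_count == 2
   */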

  /**
   * Allocate requested slabs from this target.
   * If the target runs out of space, it will provision from the next target.
   * We assume there is a baseline target, which can house an infinite amount
   * of data. This would be the PFS in an HPC machine.
   *
   * @param total_size the total amount of data being placed in this target
   * @param coins The requested number of slabs to allocate from this target
   * @param tid The ID of the (ideal) target to allocate from
   * @param cpu The CPU we are currently scheduled on
   * @param blob_off [out] The current size of the blob which has been placed
   * @param buffers [out] The buffers which were allocated
   * */
  void AllocateBuffers(size_t total_size,
                       std::vector<BpCoin> &coins,
                       TargetId tid,
                       int cpu,
                       size_t &blob_off,
                       std::vector<BufferInfo> &buffers);
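
  /*
   * Illustrative only (not part of the original header): the intended call
   * pattern for the fallback behavior described above. The names dev_info,
   * ideal_tid, cpu, and blob are hypothetical; the actual placement logic
   * lives in buffer_pool.cc.
   *
   *   size_t blob_off = 0;
   *   size_t buffer_count = 0;
   *   std::vector<BufferInfo> buffers;
   *   std::vector<BpCoin> coins =
   *       CoinSelect(dev_info, blob.size(), buffer_count);
   *   AllocateBuffers(blob.size(), coins, ideal_tid, cpu, blob_off, buffers);
   *   // On return, blob_off reports how much of the blob has been placed,
   *   // and buffers lists the slabs actually reserved (possibly from a
   *   // lower-priority target if ideal_tid ran out of space).
   */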

  /**
   * Allocate a set of slabs of particular size from this target.
   *
   * @param rem_size the amount of data remaining that needs to be allocated
   * @param slab_size Size of the slabs to allocate
   * @param slab_count The count of this slab size to allocate
   * @param slab_id The offset of the slab in the device's slab list
   * @param tid The target to allocate slabs from
   * @param blob_off [out] The current size of the blob which has been placed
   * @param buffers [out] The buffers which were allocated
   * */
  void AllocateSlabs(size_t &rem_size,
                     size_t slab_size,
                     size_t &slab_count,
                     size_t slab_id,
                     TargetId tid,
                     int cpu,
                     size_t &blob_off,
                     std::vector<BufferInfo> &buffers);

  /**
   * Allocate a buffer of a particular size
   *
   * @param slab_id The size of slab to allocate
   * @param tid The target to (ideally) allocate the slab from
   * @param coin The buffer size information
   *
   * @return returns a BufferPool (BP) slot. The slot is NULL if the
   * target didn't have enough remaining space.
   * */
  BpSlot AllocateSlabSize(int cpu,
                          size_t slab_size,
                          BpFreeList *free_list,
                          BpFreeListStat *stat,
                          BpFreeListStat *target_stat);


  /**====================================
   * Free Buffers
   * ===================================*/

  /**
   * Free buffers from the BufferPool
   * */
  RPC bool LocalReleaseBuffers(std::vector<BufferInfo> &buffers);
  bool GlobalReleaseBuffers(std::vector<BufferInfo> &buffers);

  /**====================================
   * Helper Methods
   * ===================================*/

  /** Get a free list reference */
  void GetFreeListForCpu(u16 target_id, int cpu, int slab_id,
                         BpFreeList* &free_list,
                         BpFreeListStat* &free_list_stat);

  /** Get the stack allocator from the cpu */
  void GetTargetStatForCpu(u16 target_id, int cpu,
                           BpFreeListStat* &target_stat);

  /** Find instance of unique target if it exists */
  static std::vector<std::pair<i32, size_t>>::iterator
  FindUniqueNodeId(std::vector<std::pair<i32, size_t>> &unique_nodes,
                   i32 node_id) {
    for (auto iter = unique_nodes.begin(); iter != unique_nodes.end(); ++iter) {
      if (iter->first == node_id) {
        return iter;
      }
    }
    return unique_nodes.end();
  }

  /** Get the unique set of targets */
  static std::vector<std::pair<i32, size_t>>
  GroupByNodeId(std::vector<BufferInfo> &buffers, size_t &total_size) {
    total_size = 0;
    std::vector<std::pair<i32, size_t>> unique_nodes;
    for (BufferInfo &info : buffers) {
      auto iter = FindUniqueNodeId(unique_nodes, info.tid_.GetNodeId());
      if (iter == unique_nodes.end()) {
        unique_nodes.emplace_back(info.tid_.GetNodeId(), info.blob_size_);
      } else {
        (*iter).second += info.blob_size_;
      }
      total_size += info.blob_size_;
    }
    return unique_nodes;
  }

  /** Get the unique set of targets */
  static std::vector<std::pair<i32, size_t>>
  GroupByNodeId(PlacementSchema &schema, size_t &total_size) {
    total_size = 0;
    std::vector<std::pair<i32, size_t>> unique_nodes;
    for (auto &plcmnt : schema.plcmnts_) {
      auto iter = FindUniqueNodeId(unique_nodes, plcmnt.tid_.GetNodeId());
      if (iter == unique_nodes.end()) {
        unique_nodes.emplace_back(plcmnt.tid_.GetNodeId(), plcmnt.size_);
      } else {
        (*iter).second += plcmnt.size_;
      }
      total_size += plcmnt.size_;
    }
    return unique_nodes;
  }
};

}  // namespace hermes

#endif  // HERMES_SRC_BUFFER_POOL_H_