/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
 * Distributed under BSD 3-Clause license.                                   *
 * Copyright by The HDF Group.                                               *
 * Copyright by the Illinois Institute of Technology.                        *
 * All rights reserved.                                                      *
 *                                                                           *
 * This file is part of Hermes. The full Hermes copyright notice, including  *
 * terms governing use, modification, and redistribution, is contained in    *
 * the COPYING file, which can be found at the top directory. If you do not  *
 * have access to the file, you may request a copy from help@hdfgroup.org.   *
 * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */

#include "buffer_pool.h"
#include "buffer_pool_internal.h"

#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <assert.h>
#include <dlfcn.h>
#include <fcntl.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>

#include <cmath>
#include <iostream>
#include <set>
#include <utility>
#include <vector>

#include <glog/logging.h>
#include "mpi.h"

#include "metadata_management.h"
#include "rpc.h"

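// NOTE: The .cc files included below are compiled directly into this
// translation unit (a unity-style build) rather than being linked as
// separate objects.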
#include "debug_state.cc"
#include "memory_management.cc"
#include "config_parser.cc"
#include "utils.cc"
#include "traits.cc"

#if defined(HERMES_COMMUNICATION_MPI)
#include "communication_mpi.cc"
#elif defined(HERMES_COMMUNICATION_ZMQ)
#include "communication_zmq.cc"
#else
#error "Communication implementation required " \
  "(e.g., -DHERMES_COMMUNICATION_MPI)."
#endif

#if defined(HERMES_RPC_THALLIUM)
#include "rpc_thallium.cc"
#else
#error "RPC implementation required (e.g., -DHERMES_RPC_THALLIUM)."
#endif

#include "metadata_management.cc"
#include "buffer_organizer.cc"

#if defined(HERMES_MDM_STORAGE_STBDS)
#include "metadata_storage_stb_ds.cc"
#else
#error "Metadata storage implementation required " \
  "(e.g., -DHERMES_MDM_STORAGE_STBDS)."
#endif

/**
 * @file buffer_pool.cc
 *
 * Implementation of a BufferPool that lives in shared memory. Other processes
 * interact with the BufferPool by requesting buffers through the `GetBuffers`
 * call to reserve a set of `BufferID`s and then using those IDs for I/O. Remote
 * processes can request remote buffers via the `GetBuffers` RPC call.
 */

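// NOTE: A minimal usage sketch of the flow described above. This is
// illustrative only; `ctx`, `rpc`, and `target` are hypothetical placeholders,
// not names defined in this file:
//
//   PlacementSchema schema{{4 * 1024 * 1024, target}};  // 4 MiB on one Target
//   std::vector<BufferID> ids = GetBuffers(ctx, schema);
//   if (!ids.empty()) {
//     // ...perform I/O against the reserved BufferIDs...
//     ReleaseBuffers(ctx, rpc, ids);
//   }
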
namespace hermes {

/** Comparison operator. */
bool operator==(const BufferID &lhs, const BufferID &rhs) {
  return lhs.as_int == rhs.as_int;
}

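/**
   Shut down Hermes on this rank: stops the global SystemViewState update
   thread, shuts down the RPC clients, releases the SharedMemoryContext, and,
   on non-application cores, unlinks the shared memory segment before
   destroying \a trans_arena.
 */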
void Finalize(SharedMemoryContext *context, CommunicationContext *comm,
              RpcContext *rpc, const char *shmem_name, Arena *trans_arena,
              bool is_application_core, bool force_rpc_shutdown) {
  WorldBarrier(comm);
  if (!is_application_core && comm->first_on_node) {
    StopGlobalSystemViewStateUpdateThread(rpc);
  }
  WorldBarrier(comm);
  ShutdownRpcClients(rpc);

  if (is_application_core) {
    ReleaseSharedMemoryContext(context);
    HERMES_DEBUG_CLIENT_CLOSE();
  }
  WorldBarrier(comm);
  if (!is_application_core) {
    if (comm->first_on_node) {
      bool is_daemon =
        (comm->world_size == comm->num_nodes) && !force_rpc_shutdown;
      FinalizeRpcContext(rpc, is_daemon);
      LocalShutdownBufferOrganizer(context);
    }
    SubBarrier(comm);
    ReleaseSharedMemoryContext(context);
    shm_unlink(shmem_name);
    HERMES_DEBUG_SERVER_CLOSE();
  }
  DestroyArena(trans_arena);
  // TODO(chogan): https://github.com/HDFGroup/hermes/issues/323
  // google::ShutdownGoogleLogging();
}

/**
   Lock \a header buffer.
 */
void LockBuffer(BufferHeader *header) {
  while (true) {
    bool expected = false;
    if (header->locked.compare_exchange_weak(expected, true)) {
      break;
    }
  }
}

/**
   Unlock \a header buffer.
 */
void UnlockBuffer(BufferHeader *header) {
  header->locked.store(false);
}

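/** Get the BufferPool from \a context SharedMemoryContext. */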
BufferPool *GetBufferPoolFromContext(SharedMemoryContext *context) {
  BufferPool *result = (BufferPool *)(context->shm_base +
                                      context->buffer_pool_offset);

  return result;
}

/** Get Device from \a context SharedMemoryContext and \a header BufferHeader */
Device *GetDeviceFromHeader(SharedMemoryContext *context,
                            BufferHeader *header) {
  BufferPool *pool = GetBufferPoolFromContext(context);
  Device *devices_base = (Device *)(context->shm_base + pool->devices_offset);
  Device *result = devices_base + header->device_id;

  return result;
}

/** Get Target from \a context SharedMemoryContext and \a index index. */
Target *GetTarget(SharedMemoryContext *context, int index) {
  BufferPool *pool = GetBufferPoolFromContext(context);
  Target *targets_base = (Target *)(context->shm_base + pool->targets_offset);
  Target *result = targets_base + index;

  return result;
}

/** Get Target from \a context SharedMemoryContext and \a id TargetID. */
Target *GetTargetFromId(SharedMemoryContext *context, TargetID id) {
  Target *result = GetTarget(context, id.bits.index);

  return result;
}

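/** Get the bandwidths of \a targets TargetIDs from \a context. */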
std::vector<f32> GetBandwidths(SharedMemoryContext *context,
                               const std::vector<TargetID> &targets) {
  std::vector<f32> result(targets.size(), 0);

  for (size_t i = 0; i < targets.size(); i++) {
    Device *device = GetDeviceById(context, targets[i].bits.device_id);
    result[i] = device->bandwidth_mbps;
  }

  return result;
}

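/** Get Device from \a context SharedMemoryContext and \a device_id DeviceID. */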
Device *GetDeviceById(SharedMemoryContext *context, DeviceID device_id) {
  BufferPool *pool = GetBufferPoolFromContext(context);
  Device *devices_base = (Device *)(context->shm_base + pool->devices_offset);
  Device *result = devices_base + device_id;

  return result;
}

/** Get DeviceID from \a target_id TargetID. */
DeviceID GetDeviceIdFromTargetId(TargetID target_id) {
  DeviceID result = target_id.bits.device_id;

  return result;
}

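/** Get the base of the BufferHeader array from \a context SharedMemoryContext. */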
BufferHeader *GetHeadersBase(SharedMemoryContext *context) {
  BufferPool *pool = GetBufferPoolFromContext(context);
  BufferHeader *result = (BufferHeader *)(context->shm_base +
                                          pool->headers_offset);

  return result;
}

/** Get BufferHeader from \a context SharedMemoryContext and \a index index. */
inline BufferHeader *GetHeaderByIndex(SharedMemoryContext *context, u32 index) {
  [[maybe_unused]] BufferPool *pool = GetBufferPoolFromContext(context);
  BufferHeader *headers = GetHeadersBase(context);
  assert(index < pool->total_headers);
  BufferHeader *result = headers + index;

  return result;
}

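/** Get BufferHeader from \a context SharedMemoryContext and \a id BufferID. */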
BufferHeader *GetHeaderByBufferId(SharedMemoryContext *context,
                                  BufferID id) {
  BufferHeader *result = GetHeaderByIndex(context, id.bits.header_index);

  return result;
}

/** Reset \a header BufferHeader. */
void ResetHeader(BufferHeader *header) {
  if (header) {
    // NOTE(chogan): Keep `id` the same. They should never change.
    header->next_free.as_int = 0;
    // NOTE(chogan): Keep `data_offset` because splitting/merging may reuse it
    header->used = 0;
    header->capacity = 0;
    header->device_id = 0;
    header->in_use = 0;
    header->locked = 0;
  }
}

/** Make \a header BufferHeader dormant by setting capacity to 0. */
static inline void MakeHeaderDormant(BufferHeader *header) {
  header->capacity = 0;
}

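/** Check if \a header BufferHeader is dormant (i.e., its capacity is 0). */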
bool HeaderIsDormant(BufferHeader *header) {
  bool result = header->capacity == 0;

  return result;
}

#if 0
BufferHeader *GetFirstDormantHeader(SharedMemoryContext *context) {
  BufferPool *pool = GetBufferPoolFromContext(context);
  BufferHeader *headers = GetHeadersBase(context);
  BufferHeader *result = 0;

  for (u32 i = 0; i < pool->num_headers; ++i) {
    BufferHeader *header = &headers[i];
    if (HeaderIsDormant(header)) {
      result = header;
      break;
    }
  }

  return result;
}
#endif

#if 0
f32 ComputeFragmentationScore(SharedMemoryContext *context) {
  BufferPool *pool = GetBufferPoolFromContext(context);
  BufferHeader *headers = GetHeadersBase(context);

  u32 total_used_headers = 0;
  f32 usage_score = 0;
  for (u32 i = 0; i < pool->num_headers; ++i) {
    BufferHeader *header = &headers[i];

    if (header->in_use) {
      total_used_headers++;

      // TODO(chogan): Is a lock really necessary? Need to check where
      // `used` and `capacity` get mutated. We only read them here. Could
      // just make both variables atomics.
      LockBuffer(header);
      f32 percentage_used = header->used / header->capacity;
      usage_score += percentage_used;
      UnlockBuffer(header);
    }
  }

  f32 percentage_of_used_headers = ((f32)total_used_headers /
                                    (f32)pool->num_headers);
  f32 max_usage_score = 1.0f * pool->num_headers;
  f32 optimal_usage = percentage_of_used_headers * max_usage_score;
  f32 result = optimal_usage - usage_score;

  // TODO(chogan): Reorganize BufferPool if score is > 0.5 or 0.6
  return result;
}
#endif

/**
   Get slab unit size from \a context SharedMemoryContext, \a device_id
   Device ID, and \a slab_index slab index.
*/
i32 GetSlabUnitSize(SharedMemoryContext *context, DeviceID device_id,
                    int slab_index) {
  BufferPool *pool = GetBufferPoolFromContext(context);
  i32 result = 0;
  i32 *slab_unit_sizes = nullptr;

  if (device_id < pool->num_devices) {
    slab_unit_sizes = (i32 *)(context->shm_base +
                              pool->slab_unit_sizes_offsets[device_id]);
    if (slab_index < pool->num_slabs[device_id]) {
      result = slab_unit_sizes[slab_index];
    } else {
      // TODO(chogan): @logging
    }
  } else {
    // TODO(chogan): @logging
  }

  return result;
}

/**
   Get slab buffer size from \a context SharedMemoryContext, \a device_id
   Device ID, and \a slab_index slab index.
*/
i32 GetSlabBufferSize(SharedMemoryContext *context, DeviceID device_id,
                      int slab_index) {
  BufferPool *pool = GetBufferPoolFromContext(context);
  i32 *slab_sizes = nullptr;
  i32 result = 0;

  if (device_id < pool->num_devices) {
    slab_sizes = (i32 *)(context->shm_base +
                         pool->slab_buffer_sizes_offsets[device_id]);
    if (slab_index < pool->num_slabs[device_id]) {
      result = slab_sizes[slab_index];
    } else {
      LOG(WARNING) << "Requested info for a non-existent slab "
                   << "(requested slab index: " << slab_index
                   << ", num slabs: " << pool->num_slabs[device_id]
                   << ")" << std::endl;
    }
  } else {
    LOG(WARNING) << "Requested info for a non-existent Device "
                 << "(requested id: " << device_id << ", num devices: "
                 << pool->num_devices << ")" << std::endl;
  }

  return result;
}

/**
   Get a pointer to the free list of \a device_id Device ID from \a context
   SharedMemoryContext.
*/
BufferID *GetFreeListPtr(SharedMemoryContext *context, DeviceID device_id) {
  BufferPool *pool = GetBufferPoolFromContext(context);
  BufferID *result = nullptr;

  if (device_id < pool->num_devices) {
    result =
      (BufferID *)(context->shm_base + pool->free_list_offsets[device_id]);
  }

  return result;
}

/**
  Get slab index from \a context SharedMemoryContext and \a header BufferHeader.
*/
int GetSlabIndexFromHeader(SharedMemoryContext *context, BufferHeader *header) {
  BufferPool *pool = GetBufferPoolFromContext(context);
  DeviceID device_id = header->device_id;
  i32 units = header->capacity / pool->block_sizes[device_id];
  int result = 0;

  for (int i = 0; i < pool->num_slabs[device_id]; ++i) {
    if (GetSlabUnitSize(context, device_id, i) == units) {
      result = i;
      break;
    }
  }

  return result;
}

/**
  Check if \a buffer_id BufferID lives on a different node than \a comm
  CommunicationContext.
*/
bool BufferIsRemote(CommunicationContext *comm, BufferID buffer_id) {
  bool result = (u32)comm->node_id != buffer_id.bits.node_id;

  return result;
}

/**
  Check if \a buffer_id BufferID lives on a different node than \a rpc
  RpcContext.
*/
bool BufferIsRemote(RpcContext *rpc, BufferID buffer_id) {
  bool result = rpc->node_id != buffer_id.bits.node_id;

  return result;
}

/** Check if \a id BufferID is null. */
bool IsNullBufferId(BufferID id) {
  bool result = id.as_int == 0;

  return result;
}

/**
  Check if the device backing \a id BufferID from \a context
  SharedMemoryContext is byte-addressable.
*/
bool BufferIsByteAddressable(SharedMemoryContext *context, BufferID id) {
  BufferHeader *header = GetHeaderByBufferId(context, id);
  Device *device = GetDeviceFromHeader(context, header);
  bool result = device->is_byte_addressable;

  return result;
}

/**
  Get the first free BufferID from \a context SharedMemoryContext,
  \a device_id DeviceID, and \a slab_index slab index.
*/
BufferID PeekFirstFreeBufferId(SharedMemoryContext *context, DeviceID device_id,
                               int slab_index) {
  BufferPool *pool = GetBufferPoolFromContext(context);
  BufferID result = {};
  BufferID *free_list = GetFreeListPtr(context, device_id);
  if (slab_index < pool->num_slabs[device_id]) {
    result = free_list[slab_index];
  }

  return result;
}

/**
  Set the value at \a slab_index slab index of the free list
  from \a context SharedMemoryContext and \a device_id DeviceID
  to \a new_id BufferID.
*/
void SetFirstFreeBufferId(SharedMemoryContext *context, DeviceID device_id,
                          int slab_index, BufferID new_id) {
  BufferPool *pool = GetBufferPoolFromContext(context);
  BufferID *free_list = GetFreeListPtr(context, device_id);
  if (slab_index < pool->num_slabs[device_id]) {
    free_list[slab_index] = new_id;
  }
}

/**
  Get the array of per-slab available-buffer counts
  from \a context SharedMemoryContext and \a device_id DeviceID.
*/
std::atomic<u32> *GetAvailableBuffersArray(SharedMemoryContext *context,
                                           DeviceID device_id) {
  BufferPool *pool = GetBufferPoolFromContext(context);
  std::atomic<u32> *result =
    (std::atomic<u32> *)(context->shm_base +
                         pool->buffers_available_offsets[device_id]);

  return result;
}

/**
  Get the number of available buffers from \a context SharedMemoryContext
  and \a device_id DeviceID at \a slab_index slab index.
*/
static u32 GetNumBuffersAvailable(SharedMemoryContext *context,
                                  DeviceID device_id, int slab_index) {
  std::atomic<u32> *buffers_available = GetAvailableBuffersArray(context,
                                                                 device_id);
  u32 result = 0;
  if (buffers_available) {
    result = buffers_available[slab_index].load();
  }

  return result;
}

/**
  Get the number of available buffers from \a context SharedMemoryContext
  and \a device_id DeviceID.
*/
u32 GetNumBuffersAvailable(SharedMemoryContext *context, DeviceID device_id) {
  BufferPool *pool = GetBufferPoolFromContext(context);
  u32 result = 0;
  for (int slab = 0; slab < pool->num_slabs[device_id]; ++slab) {
    result += GetNumBuffersAvailable(context, device_id, slab);
  }

  return result;
}

#if 0
static u64 GetNumBytesRemaining(SharedMemoryContext *context,
                                DeviceID device_id, int slab_index) {
  u32 num_free_buffers = GetNumBuffersAvailable(context, device_id, slab_index);
  u32 buffer_size = GetSlabBufferSize(context, device_id, slab_index);
  u64 result = num_free_buffers * buffer_size;

  return result;
}

static u64 GetNumBytesRemaining(SharedMemoryContext *context, DeviceID id) {
  BufferPool *pool = GetBufferPoolFromContext(context);
  u64 result = 0;
  for (int i = 0; i < pool->num_slabs[id]; ++i) {
    result += GetNumBytesRemaining(context, id, i);
  }

  return result;
}
#endif

/**
  Decrement the number of available buffers at \a slab_index slab index
  from \a context SharedMemoryContext and \a device_id DeviceID.
*/
static void DecrementAvailableBuffers(SharedMemoryContext *context,
                                      DeviceID device_id, int slab_index) {
  std::atomic<u32> *buffers_available = GetAvailableBuffersArray(context,
                                                                 device_id);
  if (buffers_available) {
    buffers_available[slab_index].fetch_sub(1);
  }
}

/**
  Increment the number of available buffers at \a slab_index slab index
  from \a context SharedMemoryContext and \a device_id DeviceID.
*/
static void IncrementAvailableBuffers(SharedMemoryContext *context,
                                      DeviceID device_id, int slab_index) {
  std::atomic<u32> *buffers_available = GetAvailableBuffersArray(context,
                                                                 device_id);
  if (buffers_available) {
    buffers_available[slab_index].fetch_add(1);
  }
}

/**
  Update buffering capacities of \a device_id DeviceID by \a adjustment
  from \a context SharedMemoryContext.
*/
void UpdateBufferingCapacities(SharedMemoryContext *context, i64 adjustment,
                               DeviceID device_id) {
  BufferPool *pool = GetBufferPoolFromContext(context);

  // NOTE(chogan): Update local capacities, which will eventually be reflected
  // in the global SystemViewState.
  // TODO(chogan): I think Target capacities will supersede the global system
  // view state once we have topologies. For now we track both node capacities
  // and global capacities.
  pool->capacity_adjustments[device_id].fetch_add(adjustment);

  // TODO(chogan): DeviceID is currently equal to TargetID, but that will change
  // once we have topologies. This function will need to support TargetIDs
  // instead of DeviceID.
  Target *target = GetTarget(context, device_id);
  target->remaining_space.fetch_add(adjustment);
}

/**
  Release local \a buffer_id buffer from \a context SharedMemoryContext.
*/
void LocalReleaseBuffer(SharedMemoryContext *context, BufferID buffer_id) {
  BufferPool *pool = GetBufferPoolFromContext(context);
  BufferHeader *header_to_free = GetHeaderByIndex(context,
                                                  buffer_id.bits.header_index);
  if (header_to_free) {
    BeginTicketMutex(&pool->ticket_mutex);
    header_to_free->used = 0;
    header_to_free->in_use = false;
    int slab_index = GetSlabIndexFromHeader(context, header_to_free);
    DeviceID device_id = header_to_free->device_id;
    header_to_free->next_free = PeekFirstFreeBufferId(context, device_id,
                                                      slab_index);
    SetFirstFreeBufferId(context, device_id, slab_index, buffer_id);
    IncrementAvailableBuffers(context, device_id, slab_index);

    i64 capacity_adjustment = header_to_free->capacity;
    UpdateBufferingCapacities(context, capacity_adjustment, device_id);

    EndTicketMutex(&pool->ticket_mutex);
  }
}

/**
  Release \a buffer_id buffer from \a context SharedMemoryContext, making an
  RPC call through \a rpc RpcContext if the buffer lives on a remote node.
*/
void ReleaseBuffer(SharedMemoryContext *context, RpcContext *rpc,
                   BufferID buffer_id) {
  u32 target_node = buffer_id.bits.node_id;
  if (target_node == rpc->node_id) {
    LocalReleaseBuffer(context, buffer_id);
  } else {
    RpcCall<bool>(rpc, target_node, "RemoteReleaseBuffer", buffer_id);
  }
}

/**
  Release \a buffer_ids buffers from \a context SharedMemoryContext and
  \a rpc RpcContext (remote buffers are released via RPC).
*/
void ReleaseBuffers(SharedMemoryContext *context, RpcContext *rpc,
                    const std::vector<BufferID> &buffer_ids) {
  for (auto id : buffer_ids) {
    ReleaseBuffer(context, rpc, id);
  }
}

/**
  Release local \a buffer_ids buffers from \a context SharedMemoryContext.
*/
void LocalReleaseBuffers(SharedMemoryContext *context,
                         const std::vector<BufferID> &buffer_ids) {
  for (auto id : buffer_ids) {
    LocalReleaseBuffer(context, id);
  }
}

/**
  Get the BufferID of a free buffer at \a slab_index slab index
  from \a context SharedMemoryContext and \a device_id DeviceID.
*/
BufferID GetFreeBuffer(SharedMemoryContext *context, DeviceID device_id,
                       int slab_index) {
  BufferPool *pool = GetBufferPoolFromContext(context);
  BufferID result = {};

  BeginTicketMutex(&pool->ticket_mutex);
  BufferID id = PeekFirstFreeBufferId(context, device_id, slab_index);
  if (!IsNullBufferId(id)) {
    u32 header_index = id.bits.header_index;
    BufferHeader *header = GetHeaderByIndex(context, header_index);
    header->in_use = true;
    result = header->id;
    SetFirstFreeBufferId(context, device_id, slab_index, header->next_free);
    DecrementAvailableBuffers(context, device_id, slab_index);

    i64 capacity_adjustment = -(i64)header->capacity;
    UpdateBufferingCapacities(context, capacity_adjustment, device_id);
  }
  EndTicketMutex(&pool->ticket_mutex);

  return result;
}

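/**
  Reserve buffers that satisfy \a schema PlacementSchema from \a context
  SharedMemoryContext. This is an all-or-nothing operation: if the full
  request cannot be satisfied, all acquired buffers are released and an
  empty vector is returned.
*/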
std::vector<BufferID> GetBuffers(SharedMemoryContext *context,
                                 const PlacementSchema &schema) {
  BufferPool *pool = GetBufferPoolFromContext(context);

  bool failed = false;
  std::vector<BufferID> result;
  for (auto [size_left, target] : schema) {
    DeviceID device_id = GetDeviceIdFromTargetId(target);
    std::vector<size_t> num_buffers(pool->num_slabs[device_id], 0);

    // NOTE(chogan): naive buffer selection algorithm: fill with largest
    // buffers first
    for (int i = pool->num_slabs[device_id] - 1; i >= 0; --i) {
      size_t buffer_size = GetSlabBufferSize(context, device_id, i);
      size_t num_buffers = buffer_size ? size_left / buffer_size : 0;

      while (num_buffers > 0) {
        BufferID id = GetFreeBuffer(context, device_id, i);
        if (id.as_int) {
          result.push_back(id);
          BufferHeader *header = GetHeaderByBufferId(context, id);
          header->used = buffer_size;
          size_left -= buffer_size;
          num_buffers--;
        } else {
          // NOTE(chogan): Out of buffers in this slab. Go to next slab.
          break;
        }
      }
    }

    if (size_left > 0) {
      size_t buffer_size = GetSlabBufferSize(context, device_id, 0);
      BufferID id = GetFreeBuffer(context, device_id, 0);
      size_t used = std::min(buffer_size, size_left);
      size_left -= used;
      if (id.as_int && size_left == 0) {
        result.push_back(id);
        BufferHeader *header = GetHeaderByBufferId(context, id);
        header->used = used;
      } else {
        failed = true;
        DLOG(INFO) << "Not enough buffers to fulfill request" << std::endl;
      }
    }
  }

  if (failed) {
    // NOTE(chogan): All or none operation. Must release the acquired buffers if
    // we didn't get all we asked for
    LocalReleaseBuffers(context, result);
    result.clear();
  }

  return result;
}

/**
   Get buffer size from \a context SharedMemoryContext and \a id BufferID.
*/
u32 LocalGetBufferSize(SharedMemoryContext *context, BufferID id) {
  BufferHeader *header = GetHeaderByBufferId(context, id);
  u32 result = header->used;

  return result;
}

/**
   Get the size of buffer \a id BufferID from \a context SharedMemoryContext,
   making an RPC call through \a rpc RpcContext if the buffer is remote.
*/
u32 GetBufferSize(SharedMemoryContext *context, RpcContext *rpc, BufferID id) {
  u32 result = 0;
  if (BufferIsRemote(rpc, id)) {
    result = RpcCall<u32>(rpc, id.bits.node_id, "RemoteGetBufferSize", id);
  } else {
    result = LocalGetBufferSize(context, id);
  }

  return result;
}

/**
   Get the total size of the BLOB described by \a buffer_ids BufferIdArray
   from \a context SharedMemoryContext and \a rpc RpcContext.
*/
size_t GetBlobSize(SharedMemoryContext *context, RpcContext *rpc,
                   BufferIdArray *buffer_ids) {
  size_t result = 0;
  // TODO(chogan): @optimization Combine all ids on same node into 1 RPC
  for (u32 i = 0; i < buffer_ids->length; ++i) {
    u32 size = GetBufferSize(context, rpc, buffer_ids->ids[i]);
    result += size;
  }

  return result;
}

/**
   Get the size of the BLOB identified by \a blob_id BlobID from \a context
   SharedMemoryContext, \a rpc RpcContext, and \a arena Arena.
*/
size_t GetBlobSizeById(SharedMemoryContext *context, RpcContext *rpc,
                       Arena *arena, BlobID blob_id) {
  size_t result = 0;
  BufferIdArray buffer_ids =
    GetBufferIdsFromBlobId(arena, context, rpc, blob_id, NULL);

  if (BlobIsInSwap(blob_id)) {
    SwapBlob swap_blob = IdArrayToSwapBlob(buffer_ids);
    result = swap_blob.size;
  } else {
    result = GetBlobSize(context, rpc, &buffer_ids);
  }

  return result;
}

/**
   Get buffer offset of \a id buffer from \a context SharedMemoryContext.
*/
ptrdiff_t GetBufferOffset(SharedMemoryContext *context, BufferID id) {
  BufferHeader *header = GetHeaderByIndex(context, id.bits.header_index);
  ptrdiff_t result = header->data_offset;

  return result;
}

789
/**
790
   Get RAM buffer pointer of \a id buffer from \a context SharedMemoryContext.
791
*/  
792
u8 *GetRamBufferPtr(SharedMemoryContext *context, BufferID buffer_id) {
369,237✔
793
  ptrdiff_t data_offset = GetBufferOffset(context, buffer_id);
369,237✔
794
  u8 *result = context->shm_base + data_offset;
369,237✔
795

796
  return result;
369,237✔
797
}
798

/** Make BufferID using \a node_id and \a header_index. */
BufferID MakeBufferId(u32 node_id, u32 header_index) {
  BufferID result = {};
  result.bits.node_id = node_id;
  result.bits.header_index = header_index;

  return result;
}

/**
  Partition RAM buffers by \a block_size for \a buffer_count buffers
  with \a buffer_size size in \a arena Arena.
*/
void PartitionRamBuffers(Arena *arena, i32 buffer_size, i32 buffer_count,
                         int block_size) {
  for (int i = 0; i < buffer_count; ++i) {
    int num_blocks_needed = buffer_size / block_size;
    [[maybe_unused]] u8 *first_buffer = PushArray<u8>(arena, block_size);

    for (int block = 0; block < num_blocks_needed - 1; ++block) {
      // NOTE(chogan): @optimization Since we don't have to store these
      // pointers, the only effect of this loop is that the arena will end up
      // with the correct "next free" address. This function isn't really
      // necessary; it's mainly just testing that everything adds up correctly.
      [[maybe_unused]] u8 *buffer = PushArray<u8>(arena, block_size);
      // NOTE(chogan): Make sure the buffers are perfectly aligned (no holes or
      // padding is introduced)
      [[maybe_unused]] i32 buffer_block_offset = (block + 1) * block_size;
      assert((u8 *)first_buffer == ((u8 *)buffer) - buffer_block_offset);
    }
  }
}

/**
   Make \a end_index - \a start_index number of BufferHeaders.
 */
BufferID MakeBufferHeaders(Arena *arena, int buffer_size, u32 start_index,
                           u32 end_index, int node_id, DeviceID device_id,
                           ptrdiff_t initial_offset, u8 **header_begin) {
  BufferHeader dummy = {};
  BufferHeader *previous = &dummy;

  for (u32 i = start_index, j = 0; i < end_index; ++i, ++j) {
    BufferHeader *header = PushClearedStruct<BufferHeader>(arena);
    header->id = MakeBufferId(node_id, i);
    header->capacity = buffer_size;
    header->device_id = device_id;

    // NOTE(chogan): Stored as offset from base address of shared memory
    header->data_offset =
      (ptrdiff_t)buffer_size * (ptrdiff_t)j + initial_offset;

    previous->next_free = header->id;
    previous = header;

    // NOTE(chogan): Store the address of the first header so we can later
    // compute the `headers_offset`
    if (i == 0 && header_begin) {
      *header_begin = (u8 *)header;
    }
  }

  return dummy.next_free;
}

/**
   Initialize devices.
 */
Device *InitDevices(Arena *arena, Config *config, f32 &min_bw, f32 &max_bw) {
  min_bw = FLT_MAX;
  max_bw = 0;

  Device *result = PushArray<Device>(arena, config->num_devices);

  for (int i = 0; i < config->num_devices; ++i) {
    Device *device = result + i;
    device->bandwidth_mbps = config->bandwidths[i];

    if (device->bandwidth_mbps > max_bw) {
      max_bw = device->bandwidth_mbps;
    }
    if (device->bandwidth_mbps < min_bw) {
      min_bw = device->bandwidth_mbps;
    }

    device->latency_ns = config->latencies[i];
    device->id = i;
    device->is_shared = config->is_shared_device[i];
    // TODO(chogan): @configuration Get this from cmake.
    device->has_fallocate = false;
    size_t path_length = config->mount_points[i].size();

    if (path_length == 0) {
      device->is_byte_addressable = true;
    } else {
      if (path_length < kMaxPathLength) {
        snprintf(device->mount_point, path_length + 1, "%s",
                 config->mount_points[i].c_str());
      } else {
        LOG(ERROR) << "Mount point path " << config->mount_points[i]
                   << " exceeds max length of " << kMaxPathLength << std::endl;
      }
    }
  }

  return result;
}

/** Initialize targets. */
Target *InitTargets(Arena *arena, Config *config, Device *devices,
                    int node_id) {
  Target *result = PushClearedArray<Target>(arena, config->num_targets);

  if (config->num_targets != config->num_devices) {
    HERMES_NOT_IMPLEMENTED_YET;
  }

  for (int i = 0; i < config->num_targets; ++i) {
    TargetID id = {};
    id.bits.node_id = node_id;
    id.bits.device_id = (DeviceID)i;
    id.bits.index = i;
    Target *target = result + i;
    target->id = id;
    target->capacity = config->capacities[i];
    target->remaining_space.store(config->capacities[i]);
    target->speed.store(devices[i].bandwidth_mbps);
  }

  return result;
}

/**
   Merge RAM buffer free list.

   \todo (chogan): needs more testing for the case when the free list has
   been jumbled for a while. Currently, we just test a nice linear free list.
*/
void MergeRamBufferFreeList(SharedMemoryContext *context, int slab_index) {
  BufferPool *pool = GetBufferPoolFromContext(context);

  if (slab_index < 0 || slab_index >= pool->num_slabs[0] - 1) {
    // TODO(chogan): @logging
    return;
  }

  // TODO(chogan): @configuration Assumes RAM is first Device
  int this_slab_unit_size = GetSlabUnitSize(context, 0, slab_index);
  int bigger_slab_unit_size = GetSlabUnitSize(context, 0, slab_index + 1);

  // TODO(chogan): Currently just handling the case where the next slab size is
  // perfectly divisible by this slab's size
  if (this_slab_unit_size == 0 ||
      bigger_slab_unit_size % this_slab_unit_size != 0) {
    // TODO(chogan): @logging
    return;
  }

  int merge_factor = bigger_slab_unit_size / this_slab_unit_size;
  int new_slab_size_in_bytes = bigger_slab_unit_size * pool->block_sizes[0];
  int old_slab_size_in_bytes = this_slab_unit_size * pool->block_sizes[0];

  BeginTicketMutex(&pool->ticket_mutex);
  // TODO(chogan): Assuming first Device is RAM
  DeviceID device_id = 0;
  BufferID id = PeekFirstFreeBufferId(context, device_id, slab_index);

  while (id.as_int != 0) {
    BufferHeader *header_to_merge = GetHeaderByIndex(context,
                                                     id.bits.header_index);
    ptrdiff_t initial_offset = header_to_merge->data_offset;

    // NOTE(chogan): First go through the next `merge_factor` buffers and see if
    // a merge is possible.
    bool have_enough_buffers = false;
    bool buffers_are_contiguous = true;
    bool buffer_offsets_are_ascending = true;
    BufferID id_copy = id;
    for (int i = 0;
         i < merge_factor && id_copy.as_int != 0 && buffers_are_contiguous;
         ++i) {
      BufferHeader *header = GetHeaderByBufferId(context, id_copy);
      if (i != 0) {
        // NOTE(chogan): Contiguous buffers may be in either ascending or
        // descending order
        int buffer_offset = i * old_slab_size_in_bytes;
        buffer_offsets_are_ascending = header->data_offset > initial_offset;
        buffer_offset *= buffer_offsets_are_ascending ? 1 : -1;
        if (initial_offset + buffer_offset != header->data_offset) {
          buffers_are_contiguous = false;
        }
      }
      if (i == merge_factor - 1) {
        have_enough_buffers = true;
      }
      id_copy = header->next_free;
    }

    if (have_enough_buffers && buffers_are_contiguous) {
      int saved_free_list_count = 0;
      // TODO(chogan): What's the max this number could be? Could save the tail
      // and attach to the end of the free list rather than doing all this
      // popping, saving, and pushing
      const int max_saved_entries = 64;
      BufferID saved_free_list_entries[max_saved_entries] = {};
      // NOTE(chogan): Pop `merge_factor` entries off the free list for
      // `slab_index`
      id_copy = id;
      for (int i = 0;
           i < merge_factor;
           ++i) {
        BufferHeader *header = GetHeaderByBufferId(context, id_copy);

        while (header->id.as_int !=
               PeekFirstFreeBufferId(context, device_id, slab_index).as_int) {
          // NOTE(chogan): It's possible that the buffer we're trying to pop
          // from the free list is not at the beginning of the list. In that
          // case, we have to pop and save all the free buffers before the one
          // we're interested in, and then restore them to the free list later.
          assert(saved_free_list_count < max_saved_entries);
          saved_free_list_entries[saved_free_list_count++] =
            PeekFirstFreeBufferId(context, device_id, slab_index);

          BufferID first_id = PeekFirstFreeBufferId(context, device_id,
                                                    slab_index);
          BufferHeader *first_free = GetHeaderByBufferId(context, first_id);
          SetFirstFreeBufferId(context, device_id, slab_index,
                               first_free->next_free);
        }

        SetFirstFreeBufferId(context, device_id, slab_index, header->next_free);
        id_copy = header->next_free;
        MakeHeaderDormant(header);
      }

      ResetHeader(header_to_merge);
      if (buffer_offsets_are_ascending) {
        header_to_merge->data_offset = initial_offset;
      } else {
        // NOTE(chogan): In this case `initial_offset` points at the beginning
        // of the last of `this_slab_unit_size` buffers. We need to back it up
        // to the correct position.
        ptrdiff_t back_up_count = old_slab_size_in_bytes * (merge_factor - 1);
        header_to_merge->data_offset = initial_offset - back_up_count;
      }
      header_to_merge->capacity = new_slab_size_in_bytes;

      // NOTE(chogan): Add the new header to the next size up's free list
      header_to_merge->next_free = PeekFirstFreeBufferId(context, device_id,
                                                         slab_index + 1);
      SetFirstFreeBufferId(context, device_id, slab_index + 1,
                           header_to_merge->id);

      while (saved_free_list_count > 0) {
        // NOTE(chogan): Restore headers that we popped and saved.
        BufferID saved_id = saved_free_list_entries[--saved_free_list_count];
        BufferHeader *saved_header = GetHeaderByBufferId(context, saved_id);
        saved_header->next_free = PeekFirstFreeBufferId(context, device_id,
                                                        slab_index);
        SetFirstFreeBufferId(context, device_id, slab_index, saved_header->id);
      }

      id = id_copy;
    } else {
      // NOTE(chogan): Didn't have enough contiguous buffers for a merge. Try
      // the next free buffer.
      id = header_to_merge->next_free;
    }
  }
  EndTicketMutex(&pool->ticket_mutex);
}

/**
   Split RAM buffer free list.

   \todo (chogan): Needs more testing for the case when the free list has
   been jumbled for a while. Currently we just test a nice linear free list.
*/
void SplitRamBufferFreeList(SharedMemoryContext *context, int slab_index) {
  BufferPool *pool = GetBufferPoolFromContext(context);

  if (slab_index < 1 || slab_index >= pool->num_slabs[0]) {
    // TODO(chogan): @logging
    return;
  }

  int this_slab_unit_size = GetSlabUnitSize(context, 0, slab_index);
  int smaller_slab_unit_size = GetSlabUnitSize(context, 0, slab_index - 1);

  // TODO(chogan): Currently just handling the case where this slab size is
  // perfectly divisible by the next size down
  assert(smaller_slab_unit_size &&
         this_slab_unit_size % smaller_slab_unit_size == 0);

  int split_factor = this_slab_unit_size / smaller_slab_unit_size;
  int new_slab_size_in_bytes = smaller_slab_unit_size * pool->block_sizes[0];

  // TODO(chogan): @optimization We don't really want to wait for a long queue
  // on the ticket mutex. If we need to split, we want to stop the world and do
  // it immediately.
  BeginTicketMutex(&pool->ticket_mutex);
  // TODO(chogan): Assuming first Device is RAM
  DeviceID device_id = 0;
  BufferID id = PeekFirstFreeBufferId(context, device_id, slab_index);
  u32 unused_header_index = 0;
  BufferHeader *headers = GetHeadersBase(context);
  BufferHeader *next_unused_header = &headers[unused_header_index];

  while (id.as_int != 0) {
    BufferHeader *header_to_split = GetHeaderByIndex(context,
                                                     id.bits.header_index);
    ptrdiff_t old_data_offset = header_to_split->data_offset;
    SetFirstFreeBufferId(context, device_id, slab_index,
                         header_to_split->next_free);
    id = header_to_split->next_free;

    for (int i = 0; i < split_factor; ++i) {
      // NOTE(chogan): Find the next dormant header. This is easy to optimize
      // when splitting since we can keep the live and dormant headers separate
      // and store `first_dormant_header`, but this isn't possible when merging
      // (because we can't move headers that are in use). So, we have to scan
      // the array.
      if (i == 0) {
        // NOTE(chogan): Reuse this header as the first unused one
        next_unused_header = header_to_split;
      } else {
        while (!HeaderIsDormant(next_unused_header)) {
          // NOTE(chogan): Assumes first Device is RAM
          if (++unused_header_index >= pool->num_headers[0]) {
            unused_header_index = 0;
          }
          next_unused_header = &headers[unused_header_index];
        }
      }

      ResetHeader(next_unused_header);
      next_unused_header->data_offset = old_data_offset;
      next_unused_header->capacity = new_slab_size_in_bytes;

      next_unused_header->next_free = PeekFirstFreeBufferId(context, device_id,
                                                            slab_index - 1);
      SetFirstFreeBufferId(context, device_id, slab_index - 1,
                           next_unused_header->id);

      old_data_offset += new_slab_size_in_bytes;
    }
  }
  EndTicketMutex(&pool->ticket_mutex);
}

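/**
  Initialize the BufferPool in shared memory: partitions the RAM capacity into
  block-aligned buffers, builds per-slab free lists and BufferHeaders for
  every Device, and records the BufferPool offset in the ShmemClientInfo at
  the start of the segment. Returns the offset of the BufferPool from
  \a shmem_base.
*/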
ptrdiff_t InitBufferPool(u8 *shmem_base, Arena *buffer_pool_arena,
                         Arena *scratch_arena, i32 node_id, Config *config) {
  ScopedTemporaryMemory scratch(scratch_arena);

  i32 **buffer_counts = PushArray<i32*>(scratch, config->num_devices);
  i32 **slab_buffer_sizes = PushArray<i32*>(scratch, config->num_devices);
  i32 *header_counts = PushArray<i32>(scratch, config->num_devices);

  size_t total_ram_bytes = 0;

  for (int device = 0; device < config->num_devices; ++device) {
    slab_buffer_sizes[device] = PushArray<i32>(scratch,
                                               config->num_slabs[device]);
    buffer_counts[device] = PushArray<i32>(scratch, config->num_slabs[device]);

    for (int slab = 0; slab < config->num_slabs[device]; ++slab) {
      slab_buffer_sizes[device][slab] = (config->block_sizes[device] *
                                         config->slab_unit_sizes[device][slab]);
      f32 slab_percentage = config->desired_slab_percentages[device][slab];
      size_t bytes_for_slab = (size_t)((f32)config->capacities[device] *
                                       slab_percentage);
      buffer_counts[device][slab] = (bytes_for_slab /
                                     slab_buffer_sizes[device][slab]);
      if (device == 0) {
        total_ram_bytes += bytes_for_slab;
      }
    }
  }

  // TODO(chogan): @configuration Assumes first Device is RAM
  // TODO(chogan): Allow splitting and merging for every Device

  // NOTE(chogan): We need one header per RAM block to allow for splitting and
  // merging
  header_counts[0] = total_ram_bytes / config->block_sizes[0];

  for (int device = 1; device < config->num_devices; ++device) {
    header_counts[device] = 0;
    for (int slab = 0; slab < config->num_slabs[device]; ++slab) {
      header_counts[device] += buffer_counts[device][slab];
    }
  }

  // NOTE(chogan): Anything stored in the buffer_pool_arena (besides buffers)
  // needs to be accounted for here. It would be nice to have a compile time
  // check that makes sure anything we allocate for the buffer pool but outside
  // of it gets accounted for here.

  i32 max_headers_needed = 0;
  size_t free_lists_size = 0;
  size_t slab_metadata_size = 0;
  for (int device = 0; device < config->num_devices; ++device) {
    max_headers_needed += header_counts[device];
    free_lists_size += config->num_slabs[device] * sizeof(BufferID);
    // NOTE(chogan): The '* 2' is because we have an i32 for both slab unit size
    // and slab buffer size
    slab_metadata_size += config->num_slabs[device] * sizeof(i32) * 2;
    // NOTE(chogan): buffers_available array
    slab_metadata_size += config->num_slabs[device] * sizeof(u32);
  }

  size_t client_info_size = sizeof(ShmemClientInfo);
  size_t headers_size = max_headers_needed * sizeof(BufferHeader);
  size_t devices_size = config->num_devices * sizeof(Device);
  size_t buffer_pool_size = (sizeof(BufferPool) + free_lists_size +
                             slab_metadata_size);

  // IMPORTANT(chogan): Currently, no additional bytes are added for alignment.
  // However, if we add more metadata to the BufferPool in the future, automatic
  // alignment could make this number larger than we think. `PushSize` will
  // print out when it adds alignment padding, so for now we can monitor that.
  // In the future it would be nice to have a programmatic way to account for
  // alignment padding.
  size_t required_bytes_for_metadata = (client_info_size + headers_size +
                                        buffer_pool_size + devices_size);
  LOG(INFO) << required_bytes_for_metadata
            << " bytes required for BufferPool metadata" << std::endl;

  size_t required_bytes_for_metadata_rounded =
    RoundUpToMultiple(required_bytes_for_metadata, config->block_sizes[0]);
  i32 num_blocks_reserved_for_metadata = (required_bytes_for_metadata_rounded /
                                          config->block_sizes[0]);

  if (buffer_counts[0][0] >= num_blocks_reserved_for_metadata) {
    // NOTE(chogan): Remove some of the smallest RAM buffers to make room for
    // metadata
    buffer_counts[0][0] -= num_blocks_reserved_for_metadata;
    // NOTE(chogan): We need fewer headers because we have fewer buffers now
    header_counts[0] -= num_blocks_reserved_for_metadata;
  } else {
    if (required_bytes_for_metadata > config->capacities[0]) {
      LOG(FATAL) << "Insufficient memory for BufferPool. Increase "
                 << "capacities_mb[0] or buffer_pool_arena_percentage\n";
    }
  }

  // NOTE(chogan): Adjust the config capacity for RAM to reflect the actual
  // capacity for buffering (excluding BufferPool metadata).
  size_t actual_ram_buffer_capacity = 0;
  for (int slab = 0; slab < config->num_slabs[0]; ++slab) {
    size_t slab_bytes =
      (size_t)buffer_counts[0][slab] * (size_t)slab_buffer_sizes[0][slab];
    actual_ram_buffer_capacity += slab_bytes;
  }
  config->capacities[0] = actual_ram_buffer_capacity;

  u32 total_headers = 0;
  for (DeviceID device = 0; device < config->num_devices; ++device) {
    total_headers += header_counts[device];
  }

  int *num_buffers = PushArray<int>(scratch_arena, config->num_devices);
  int total_buffers = 0;
  for (int device = 0; device < config->num_devices; ++device) {
    DLOG(INFO) << "Device: " << device << std::endl;
    num_buffers[device] = 0;
    for (int slab = 0; slab < config->num_slabs[device]; ++slab) {
      DLOG(INFO) << "    " << slab << "-Buffers: "
                 << buffer_counts[device][slab] << std::endl;
      num_buffers[device] += buffer_counts[device][slab];
    }
    total_buffers += num_buffers[device];
    DLOG(INFO) << "    Num Headers: " << header_counts[device] << std::endl;
    DLOG(INFO) << "    Num Buffers: " << num_buffers[device] << std::endl;
  }
  DLOG(INFO) << "Total Buffers: " << total_buffers << std::endl;

1275
  // Build RAM buffers.
1276

1277
  // NOTE(chogan): Store offsets to the MDM and BPM at the beginning of shared
1278
  // memory so other processes can pick it up.
1279
  ShmemClientInfo *client_info = PushStruct<ShmemClientInfo>(buffer_pool_arena);
42✔
1280

1281
  // TODO(chogan): @configuration Assumes the first Device is RAM
1282
  for (int slab = 0; slab < config->num_slabs[0]; ++slab) {
201✔
1283
    PartitionRamBuffers(buffer_pool_arena, slab_buffer_sizes[0][slab],
159✔
1284
                        buffer_counts[0][slab], config->block_sizes[0]);
159✔
1285
  }
1286

1287
  // Init Devices and Targets
1288

1289
  f32 min_bw = 0;
42✔
1290
  f32 max_bw = 0;
42✔
1291
  Device *devices = InitDevices(buffer_pool_arena, config, min_bw, max_bw);
42✔
1292

1293
  Target *targets = InitTargets(buffer_pool_arena, config, devices, node_id);
42✔
1294

1295
  // Create Free Lists
1296

1297
  BufferID **free_lists = PushArray<BufferID*>(scratch_arena,
42✔
1298
                                               config->num_devices);
1299
  for (int device = 0; device < config->num_devices; ++device) {
204✔
1300
    free_lists[device] = PushArray<BufferID>(scratch_arena,
162✔
1301
                                           config->num_slabs[device]);
1302
  }
1303

1304
  // Build BufferHeaders
1305

1306
  u32 start = 0;
42✔
1307
  u8 *header_begin = 0;
42✔
1308
  ptrdiff_t initial_offset = sizeof(ShmemClientInfo);
42✔
1309
  // TODO(chogan): @configuration Assumes first Device is RAM
1310
  for (i32 i = 0; i < config->num_slabs[0]; ++i) {
201✔
1311
    u32 end = start + buffer_counts[0][i];
159✔
1312
    DeviceID ram_device_id = 0;
159✔
1313
    free_lists[ram_device_id][i] =
159✔
1314
      MakeBufferHeaders(buffer_pool_arena, slab_buffer_sizes[0][i], start, end,
159✔
1315
                        node_id, ram_device_id, initial_offset, &header_begin);
159✔
1316
    start = end;
159✔
1317
    initial_offset += (ptrdiff_t)buffer_counts[0][i] * slab_buffer_sizes[0][i];
159✔
1318
  }
1319

1320
  // NOTE(chogan): Add remaining unused RAM headers
1321
  for (u32 i = num_buffers[0]; i < (u32)header_counts[0]; ++i) {
9,616,130✔
1322
    BufferHeader *header = PushClearedStruct<BufferHeader>(buffer_pool_arena);
9,616,090✔
1323
    header->id = MakeBufferId(node_id, i);
9,616,090✔
1324
    start += 1;
9,616,090✔
1325
  }
1326

1327
  // NOTE(chogan): Add headers for the other Devices
1328
  for (int device = 1; device < config->num_devices; ++device) {
162✔
1329
    for (int slab = 0; slab < config->num_slabs[device]; ++slab) {
582✔
1330
      // NOTE(chogan): File buffering scheme is one file per slab per Device
1331
      u32 end = start + buffer_counts[device][slab];
462✔
1332
      free_lists[device][slab] =
462✔
1333
        MakeBufferHeaders(buffer_pool_arena, slab_buffer_sizes[device][slab],
462✔
1334
                          start, end, node_id, device, 0, &header_begin);
462✔
1335
      start = end;
462✔
1336
    }
1337
  }
1338

1339
  // Build BufferPool
1340

1341
  BufferPool *pool = PushClearedStruct<BufferPool>(buffer_pool_arena);
42✔
1342
  pool->headers_offset = header_begin - shmem_base;
42✔
1343
  pool->devices_offset = (u8 *)devices - shmem_base;
42✔
1344
  pool->targets_offset = (u8 *)targets - shmem_base;
42✔
1345
  pool->num_devices = config->num_devices;
42✔
1346
  pool->total_headers = total_headers;
42✔
1347
  pool->min_device_bw_mbps = min_bw;
42✔
1348
  pool->max_device_bw_mbps = max_bw;
42✔
1349

1350
  for (int device = 0; device < config->num_devices; ++device) {
204✔
1351
    pool->block_sizes[device] = config->block_sizes[device];
162✔
1352
    pool->num_headers[device] = header_counts[device];
162✔
1353
    pool->num_slabs[device] = config->num_slabs[device];
162✔
1354
    BufferID *free_list = PushArray<BufferID>(buffer_pool_arena,
162✔
1355
                                              config->num_slabs[device]);
1356
    i32 *slab_unit_sizes = PushArray<i32>(buffer_pool_arena,
162✔
1357
                                          config->num_slabs[device]);
1358
    i32 *slab_buffer_sizes_for_device = PushArray<i32>(buffer_pool_arena,
162✔
1359
                                            config->num_slabs[device]);
1360
    std::atomic<u32> *available_buffers =
1361
      PushArray<std::atomic<u32>>(buffer_pool_arena, config->num_slabs[device]);
162✔
1362

1363
    for (int slab = 0; slab < config->num_slabs[device]; ++slab) {
783✔
1364
      free_list[slab] = free_lists[device][slab];
621✔
1365
      slab_unit_sizes[slab] = config->slab_unit_sizes[device][slab];
621✔
1366
      slab_buffer_sizes_for_device[slab] = slab_buffer_sizes[device][slab];
621✔
1367
      available_buffers[slab] = buffer_counts[device][slab];
621✔
1368
    }
1369
    pool->free_list_offsets[device] = (u8 *)free_list - shmem_base;
162✔
1370
    pool->slab_unit_sizes_offsets[device] = (u8 *)slab_unit_sizes - shmem_base;
162✔
1371
    pool->slab_buffer_sizes_offsets[device] =
162✔
1372
      ((u8 *)slab_buffer_sizes_for_device - shmem_base);
162✔
1373
    pool->buffers_available_offsets[device] =
162✔
1374
      ((u8 *)available_buffers - shmem_base);
162✔
1375
  }
1376

1377
  ptrdiff_t buffer_pool_offset = (u8 *)pool - shmem_base;
42✔
1378
  client_info->bpm_offset = buffer_pool_offset;
42✔
1379

1380
  return buffer_pool_offset;
84✔
1381
}
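
// Usage sketch (illustrative, not part of the build): a process that has
// mapped the segment turns the stored offsets back into pointers by adding
// them to its own mapping base:
//
//   u8 *base = context->shm_base;
//   BufferPool *pool = (BufferPool *)(base + context->buffer_pool_offset);
//   BufferID *ram_free_list = (BufferID *)(base + pool->free_list_offsets[0]);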

/**
   Write the buffer pool to the file \a file.
*/
void SerializeBufferPoolToFile(SharedMemoryContext *context, FILE *file) {
  size_t result = fwrite(context->shm_base, context->shm_size, 1, file);

  if (result < 1) {
    FailedLibraryCall("fwrite");
  }
}
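
// Usage sketch (illustrative): dump the mapped segment for offline inspection.
// The dump file name here is hypothetical.
//
//   FILE *dump = FopenOrTerminate("buffer_pool.dump", "wb");
//   SerializeBufferPoolToFile(context, dump);
//   if (fclose(dump) != 0) {
//     FailedLibraryCall("fclose");
//   }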

// Per-rank application-side initialization

void MakeFullShmemName(char *dest, const char *base) {
  size_t base_name_length = strlen(base);
  snprintf(dest, base_name_length + 1, "%s", base);
  char *username = getenv("USER");
  if (username) {
    size_t username_length = strlen(username);
    size_t total_length = base_name_length + username_length + 1;

    if (total_length < kMaxBufferPoolShmemNameLength) {
      snprintf(dest + base_name_length, username_length + 1, "%s", username);
    } else {
      LOG(ERROR) << "Shared memory name " << base << username
                 << " exceeds max length of " << kMaxBufferPoolShmemNameLength
                 << std::endl;
    }
  } else {
    // TODO(chogan): Use pid?
  }
}
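
// Example (illustrative): with the hypothetical base "/hermes_buffers" and
// USER=alice, dest becomes "/hermes_buffersalice"; if USER is unset, dest is
// just the base name.
//
//   char shmem_name[kMaxBufferPoolShmemNameLength];
//   MakeFullShmemName(shmem_name, "/hermes_buffers");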

/** Terminate if fopen() fails. Otherwise, return the file pointer. */
FILE *FopenOrTerminate(const char *fname, const char *mode) {
  FILE *result = fopen(fname, mode);

  if (!result) {
    LOG(ERROR) << "Failed to open file at " << fname << ": ";
    perror(nullptr);
    LOG(FATAL) << "Terminating...";
  }

  return result;
}

/** Terminate if open() fails. Otherwise, return the file descriptor. */
int OpenOrTerminate(const std::string &fname, int flags, mode_t mode = 0) {
  int result = open(fname.c_str(), flags, mode);

  if (result == -1) {
    LOG(ERROR) << "Failed to open file at " << fname << ": ";
    perror(nullptr);
    LOG(FATAL) << "Terminating...";
  }

  return result;
}

void InitFilesForBuffering(SharedMemoryContext *context,
                           CommunicationContext &comm) {
  BufferPool *pool = GetBufferPoolFromContext(context);
  context->buffering_filenames.resize(pool->num_devices);

  // TODO(chogan): Check the limit for open files via getrlimit. We might have
  // to do some smarter opening and closing to stay under the limit. Could also
  // increase the soft limit to the hard limit.
  for (int device_id = 0; device_id < pool->num_devices; ++device_id) {
    Device *device = GetDeviceById(context, device_id);
    char *mount_point = &device->mount_point[0];

    if (strlen(mount_point) == 0) {
      // NOTE(chogan): RAM Device. No need for a file.
      continue;
    }

    bool ends_in_slash = mount_point[strlen(mount_point) - 1] == '/';
    context->buffering_filenames[device_id].resize(pool->num_slabs[device_id]);

    for (int slab = 0; slab < pool->num_slabs[device_id]; ++slab) {
      std::string node = (device->is_shared ? std::string("_node") +
                          std::to_string(comm.node_id) : "");
      context->buffering_filenames[device_id][slab] =
        std::string(std::string(mount_point) + (ends_in_slash ? "" : "/") +
                    "device" + std::to_string(device_id) + "_slab" +
                    std::to_string(slab) + node + ".hermes");

      int buffering_file_fd = 0;
      bool reserve_space = false;

      if (comm.proc_kind == ProcessKind::kHermes) {
        if (device->is_shared && comm.sub_proc_id == 0) {
          // Only one rank should create the file for shared devices
          reserve_space = true;
        }

        if (!device->is_shared && comm.first_on_node) {
          // One rank per node creates the file for node-local devices
          reserve_space = true;
        }

        if (reserve_space) {
          int open_flags = O_RDWR | O_CREAT | O_TRUNC;
          int open_mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
          buffering_file_fd =
            OpenOrTerminate(context->buffering_filenames[device_id][slab],
                            open_flags, open_mode);

          u32 num_buffers = GetNumBuffersAvailable(context, device_id, slab);
          i32 buffer_size = GetSlabBufferSize(context, device_id, slab);
          size_t this_slabs_capacity = (size_t)num_buffers * buffer_size;

          // TODO(chogan): Use posix_fallocate when it is available
          // if (device->has_fallocate) {
          //   int fallocate_result = posix_fallocate(buffering_file_fd, 0,
          //                                          this_slabs_capacity);
          // }

          int ftruncate_result = ftruncate(buffering_file_fd,
                                           this_slabs_capacity);
          if (ftruncate_result) {
            LOG(ERROR) << "Failed to allocate buffering file at "
                       << context->buffering_filenames[device_id][slab] << ": ";
            FailedLibraryCall("ftruncate");
          }
        }
      }

      WorldBarrier(&comm);

      if (!reserve_space) {
        // File should already be created, so we just open it
        int open_flags = O_RDWR;
        buffering_file_fd =
          OpenOrTerminate(context->buffering_filenames[device_id][slab],
                          open_flags);
      }
      context->open_files[device_id][slab] = buffering_file_fd;
    }
  }
}
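
// Example (illustrative): a device with the hypothetical mount point
// "/mnt/nvme" yields "/mnt/nvme/device1_slab0.hermes" for device 1, slab 0.
// Devices marked is_shared additionally get a "_node<id>" suffix so that each
// node keeps its own buffering file on the shared mount.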

/** Initialize shared memory. */
u8 *InitSharedMemory(const char *shmem_name, size_t total_size) {
  u8 *result = 0;
  int shmem_fd =
    shm_open(shmem_name, O_CREAT | O_RDWR | O_TRUNC, S_IRUSR | S_IWUSR);
  if (shmem_fd < 0) {
    // shm_unlink(shmem_name);
    FailedLibraryCall("shm_open");
  }

  int ftruncate_result = ftruncate(shmem_fd, total_size);
  if (ftruncate_result != 0) {
    FailedLibraryCall("ftruncate");
  }

  result = (u8 *)mmap(0, total_size, PROT_READ | PROT_WRITE, MAP_SHARED,
                      shmem_fd, 0);

  if (result == MAP_FAILED) {
    FailedLibraryCall("mmap");
  }
  if (close(shmem_fd) == -1) {
    FailedLibraryCall("close");
  }

  return result;
}
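
// Usage sketch (illustrative): the Hermes core creates and sizes the segment;
// application ranks later attach to the same name:
//
//   u8 *shm_base = InitSharedMemory(shmem_name, total_size);       // core
//   SharedMemoryContext ctx = GetSharedMemoryContext(shmem_name);  // clients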

SharedMemoryContext GetSharedMemoryContext(char *shmem_name) {
  SharedMemoryContext result = {};

  int shmem_fd = shm_open(shmem_name, O_RDWR, S_IRUSR | S_IWUSR);

  if (shmem_fd >= 0) {
    struct stat shm_stat;
    if (fstat(shmem_fd, &shm_stat) == 0) {
      u8 *shm_base = (u8 *)mmap(0, shm_stat.st_size, PROT_READ | PROT_WRITE,
                                MAP_SHARED, shmem_fd, 0);
      close(shmem_fd);

      if (shm_base != MAP_FAILED) {
        // NOTE(chogan): BPM and MDM offsets are stored at the beginning of the
        // shared memory segment
        ShmemClientInfo *client_info = (ShmemClientInfo *)shm_base;
        result.buffer_pool_offset = client_info->bpm_offset;
        result.metadata_manager_offset = client_info->mdm_offset;
        result.shm_base = shm_base;
        result.shm_size = shm_stat.st_size;
      } else {
        // TODO(chogan): @logging Error handling
        perror("mmap failed");
      }
    } else {
      // TODO(chogan): @logging Error handling
      perror("fstat failed");
    }
  } else {
    // TODO(chogan): @logging Error handling
    perror("shm_open failed");
  }

  return result;
}

/** Unmap shared memory. */
void UnmapSharedMemory(SharedMemoryContext *context) {
  munmap(context->shm_base, context->shm_size);
}

/** Close buffering files. */
void CloseBufferingFiles(SharedMemoryContext *context) {
  BufferPool *pool = GetBufferPoolFromContext(context);

  for (int device_id = 0; device_id < pool->num_devices; ++device_id) {
    for (int slab = 0; slab < pool->num_slabs[device_id]; ++slab) {
      if (context->open_files[device_id][slab]) {
        int close_result = close(context->open_files[device_id][slab]);
        if (close_result != 0) {
          FailedLibraryCall("close");
        }
      }
    }
  }

  if (context->swap_file) {
    if (fclose(context->swap_file) != 0) {
      FailedLibraryCall("fclose");
    }
  }
}

void ReleaseSharedMemoryContext(SharedMemoryContext *context) {
  CloseBufferingFiles(context);
  UnmapSharedMemory(context);
}

// I/O clients

/** Write to the local buffer identified by \a id. */
size_t LocalWriteBufferById(SharedMemoryContext *context, BufferID id,
                            const Blob &blob, size_t offset) {
  BufferHeader *header = GetHeaderByIndex(context, id.bits.header_index);
  Device *device = GetDeviceFromHeader(context, header);
  size_t write_size = header->used;

  u8 *at = (u8 *)blob.data + offset;
  if (device->is_byte_addressable) {
    u8 *dest = GetRamBufferPtr(context, header->id);
    memcpy(dest, at, write_size);
  } else {
    int slab_index = GetSlabIndexFromHeader(context, header);
    const char *filename =
      context->buffering_filenames[device->id][slab_index].c_str();
    // TODO(chogan): Use context->open_files
    int fd = open(filename, O_WRONLY);

    if (fd != -1) {
      if (flock(fd, LOCK_EX) != 0) {
        FailedLibraryCall("flock");
      }

      ssize_t bytes_written = pwrite(fd, at, write_size, header->data_offset);
      if (bytes_written == -1 || (size_t)bytes_written != write_size) {
        FailedLibraryCall("pwrite");
      }

      if (flock(fd, LOCK_UN) != 0) {
        FailedLibraryCall("flock");
      }

      if (close(fd) != 0) {
        FailedLibraryCall("close");
      }
    } else {
      FailedLibraryCall("open");
    }
  }

  return write_size;
}
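
// NOTE: for file-backed devices the function above opens the slab file, holds
// an exclusive flock() for the duration of a single pwrite(), and closes the
// file again. A minimal sketch of that per-call pattern, assuming a valid fd:
//
//   flock(fd, LOCK_EX);
//   pwrite(fd, src, size, offset);
//   flock(fd, LOCK_UN);
//
// The TODO about context->open_files points at reusing the already opened
// descriptors instead of paying open()/close() on every call.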

void WriteBlobToBuffers(SharedMemoryContext *context, RpcContext *rpc,
                        const Blob &blob,
                        const std::vector<BufferID> &buffer_ids) {
  size_t bytes_left_to_write = blob.size;
  size_t offset = 0;
  // TODO(chogan): @optimization Handle sequential buffers as one I/O operation
  // TODO(chogan): @optimization Aggregate multiple RPCs into one
  for (const auto &id : buffer_ids) {
    size_t bytes_written = 0;
    if (BufferIsRemote(rpc, id)) {
      // TODO(chogan): @optimization Set up bulk transfer if blob.size is > 4K
      // TODO(chogan): @optimization Only send the portion of the blob we have
      // to write.
      // TODO(chogan): @optimization Avoid copy
      std::vector<u8> data(blob.size);
      memcpy(data.data(), blob.data, blob.size);
      bytes_written = RpcCall<size_t>(rpc, id.bits.node_id,
                                      "RemoteWriteBufferById", id, data,
                                      offset);
    } else {
      bytes_written = LocalWriteBufferById(context, id, blob, offset);
    }
    bytes_left_to_write -= bytes_written;
    offset += bytes_written;
  }
  assert(offset == blob.size);
  assert(bytes_left_to_write == 0);
}

/** Read from the local buffer identified by \a id. */
size_t LocalReadBufferById(SharedMemoryContext *context, BufferID id,
                           Blob *blob, size_t read_offset) {
  BufferHeader *header = GetHeaderByIndex(context, id.bits.header_index);
  Device *device = GetDeviceFromHeader(context, header);
  size_t read_size = header->used;
  size_t result = 0;

  if (read_size > 0) {
    if (device->is_byte_addressable) {
      u8 *src = GetRamBufferPtr(context, header->id);
      memcpy((u8 *)blob->data + read_offset, src, read_size);
      result = read_size;
    } else {
      int slab_index = GetSlabIndexFromHeader(context, header);
      const char *filename =
        context->buffering_filenames[device->id][slab_index].c_str();
      int fd = open(filename, O_RDONLY);

      if (fd != -1) {
        if (flock(fd, LOCK_SH) != 0) {
          FailedLibraryCall("flock");
        }

        ssize_t bytes_read = pread(fd, (u8 *)blob->data + read_offset,
                                   read_size, header->data_offset);
        if (bytes_read == -1 || (size_t)bytes_read != read_size) {
          FailedLibraryCall("pread");
        }

        if (flock(fd, LOCK_UN) != 0) {
          FailedLibraryCall("flock");
        }

        if (close(fd) != 0) {
          FailedLibraryCall("close");
        }

        result = bytes_read;
      } else {
        FailedLibraryCall("open");
      }
    }
  }

  return result;
}

size_t ReadBlobFromBuffers(SharedMemoryContext *context, RpcContext *rpc,
                           Blob *blob, BufferIdArray *buffer_ids,
                           u32 *buffer_sizes) {
  size_t total_bytes_read = 0;
  // TODO(chogan): @optimization Handle sequential buffers as one I/O operation
  for (u32 i = 0; i < buffer_ids->length; ++i) {
    size_t bytes_read = 0;
    BufferID id = buffer_ids->ids[i];
    if (BufferIsRemote(rpc, id)) {
      // TODO(chogan): @optimization Aggregate multiple RPCs to same node into
      // one RPC.
      if (buffer_sizes[i] > KILOBYTES(4)) {
        size_t bytes_transferred = BulkRead(rpc, id.bits.node_id,
                                            "RemoteBulkReadBufferById",
                                            blob->data + total_bytes_read,
                                            buffer_sizes[i], id);
        if (bytes_transferred != buffer_sizes[i]) {
          LOG(ERROR) << "BulkRead only transferred " << bytes_transferred
                     << " out of " << buffer_sizes[i] << " bytes" << std::endl;
        }
        bytes_read += bytes_transferred;
      } else {
        std::vector<u8> data =
          RpcCall<std::vector<u8>>(rpc, id.bits.node_id, "RemoteReadBufferById",
                                   id);
        bytes_read = data.size();
        // TODO(chogan): @optimization Avoid the copy
        u8 *read_dest = (u8 *)blob->data + total_bytes_read;
        memcpy(read_dest, data.data(), bytes_read);
      }
    } else {
      bytes_read = LocalReadBufferById(context, id, blob, total_bytes_read);
    }
    total_bytes_read += bytes_read;
  }

  if (total_bytes_read != blob->size) {
    LOG(ERROR) << __func__ << ": expected to read a Blob of size " << blob->size
               << " but only read " << total_bytes_read << std::endl;
  }

  return total_bytes_read;
}

/** Read the BLOB identified by \a blob_id into \a blob. */
size_t ReadBlobById(SharedMemoryContext *context, RpcContext *rpc, Arena *arena,
                    Blob blob, BlobID blob_id) {
  size_t result = 0;

  BufferIdArray buffer_ids = {};
  if (hermes::BlobIsInSwap(blob_id)) {
    buffer_ids = GetBufferIdsFromBlobId(arena, context, rpc, blob_id, NULL);
    SwapBlob swap_blob = IdArrayToSwapBlob(buffer_ids);
    result = ReadFromSwap(context, blob, swap_blob);
  } else {
    u32 *buffer_sizes = 0;
    buffer_ids = GetBufferIdsFromBlobId(arena, context, rpc, blob_id,
                                        &buffer_sizes);
    result = ReadBlobFromBuffers(context, rpc, &blob, &buffer_ids,
                                 buffer_sizes);
  }

  return result;
}

/** Read the BLOB identified by \a blob_id into the api::Blob \a dest. */
size_t ReadBlobById(SharedMemoryContext *context, RpcContext *rpc, Arena *arena,
                    api::Blob &dest, BlobID blob_id) {
  hermes::Blob blob = {};
  blob.data = dest.data();
  blob.size = dest.size();
  size_t result = ReadBlobById(context, rpc, arena, blob, blob_id);

  return result;
}
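
// Usage sketch (illustrative): read a whole BLOB into a caller-owned
// api::Blob, assuming blob_size was obtained via GetBlobSizeById() as in
// StdIoPersistBlob() below:
//
//   api::Blob data(blob_size);
//   size_t bytes_read = ReadBlobById(context, rpc, arena, data, blob_id);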

/** Open swap file. */
void OpenSwapFile(SharedMemoryContext *context, u32 node_id) {
  if (!context->swap_file) {
    MetadataManager *mdm = GetMetadataManagerFromContext(context);
    std::string swap_path = GetSwapFilename(mdm, node_id);
    context->swap_file = fopen(swap_path.c_str(), "a+");

    if (!context->swap_file) {
      FailedLibraryCall("fopen");
    }
  }
}

/** Write \a blob data to a swap file. */
SwapBlob WriteToSwap(SharedMemoryContext *context, Blob blob, u32 node_id,
                     BucketID bucket_id) {
  SwapBlob result = {};

  OpenSwapFile(context, node_id);
  if (fseek(context->swap_file, 0, SEEK_END) != 0) {
    FailedLibraryCall("fseek");
  }

  long int file_position = ftell(context->swap_file);
  if (file_position == -1) {
    FailedLibraryCall("ftell");
  }
  result.offset = file_position;

  if (fwrite(blob.data, 1, blob.size, context->swap_file) != blob.size) {
    FailedLibraryCall("fwrite");
  }

  if (fflush(context->swap_file) != 0) {
    FailedLibraryCall("fflush");
  }

  result.node_id = node_id;
  result.bucket_id = bucket_id;
  result.size = blob.size;

  return result;
}

/** Put \a data into the swap space and attach it to \a bucket_id. */
SwapBlob PutToSwap(SharedMemoryContext *context, RpcContext *rpc,
                   const std::string &name, BucketID bucket_id, const u8 *data,
                   size_t size) {
  hermes::Blob blob = {};
  blob.data = (u8 *)data;
  blob.size = size;

  u32 target_node = rpc->node_id;
  SwapBlob swap_blob = WriteToSwap(context, blob, target_node, bucket_id);
  std::vector<BufferID> buffer_ids = SwapBlobToVec(swap_blob);
  AttachBlobToBucket(context, rpc, name.c_str(), bucket_id, buffer_ids,
                     kSwapTargetId, true);

  return swap_blob;
}

/** Read \a blob data from the swap file. */
size_t ReadFromSwap(SharedMemoryContext *context, Blob blob,
                    SwapBlob swap_blob) {
  u32 node_id = swap_blob.node_id;
  OpenSwapFile(context, node_id);
  if (fseek(context->swap_file, swap_blob.offset, SEEK_SET) != 0) {
    FailedLibraryCall("fseek");
  }

  if (fread(blob.data, 1, swap_blob.size, context->swap_file) !=
      swap_blob.size) {
    FailedLibraryCall("fread");
  }

  return swap_blob.size;
}
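
// Round-trip sketch (illustrative): WriteToSwap() records where the bytes
// landed in the returned SwapBlob, and ReadFromSwap() seeks back to that
// record:
//
//   SwapBlob swap_blob = WriteToSwap(context, blob, node_id, bucket_id);
//   size_t bytes_read = ReadFromSwap(context, blob, swap_blob);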

/** Place \a blob according to the placement \a schema. */
api::Status PlaceBlob(SharedMemoryContext *context, RpcContext *rpc,
                      PlacementSchema &schema, Blob blob,
                      const std::string &name, BucketID bucket_id,
                      const api::Context &ctx,
                      bool called_from_buffer_organizer) {
  api::Status result;

  if (ContainsBlob(context, rpc, bucket_id, name)
      && !called_from_buffer_organizer) {
    // TODO(chogan) @optimization If the existing buffers are already large
    // enough to hold the new Blob, then we don't need to release them.
    // Additionally, no metadata operations would be required.
    DestroyBlobByName(context, rpc, bucket_id, name);
  }

  HERMES_BEGIN_TIMED_BLOCK("GetBuffers");
  std::vector<BufferID> buffer_ids = GetBuffers(context, schema);
  HERMES_END_TIMED_BLOCK();

  if (buffer_ids.size()) {
    HERMES_BEGIN_TIMED_BLOCK("WriteBlobToBuffers");
    WriteBlobToBuffers(context, rpc, blob, buffer_ids);
    HERMES_END_TIMED_BLOCK();

    std::pair<size_t, TargetID> max_target =
      *std::max_element(schema.begin(), schema.end(),
                        [](const auto& lhs, const auto& rhs) {
                          return lhs.first < rhs.first;
                        });
    TargetID effective_target = max_target.second;

    // NOTE(chogan): Update all metadata associated with this Put
    AttachBlobToBucket(context, rpc, name.c_str(), bucket_id, buffer_ids,
                       effective_target, false, called_from_buffer_organizer);
  } else {
    if (ctx.disable_swap) {
      result = PLACE_SWAP_BLOB_TO_BUF_FAILED;
    } else {
      if (called_from_buffer_organizer) {
        result = PLACE_SWAP_BLOB_TO_BUF_FAILED;
        LOG(ERROR) << result.Msg();
      } else {
        SwapBlob swap_blob = PutToSwap(context, rpc, name, bucket_id, blob.data,
                                       blob.size);
        result = BLOB_IN_SWAP_PLACE;
        LOG(WARNING) << result.Msg();
        RpcCall<void>(rpc, rpc->node_id, "BO::PlaceInHierarchy", swap_blob,
                      name, ctx);
      }
    }
  }

  return result;
}
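
// Usage sketch (illustrative): a Put path first computes a PlacementSchema
// (via the data placement engine, not shown here) and then hands it to
// PlaceBlob(); false means the call does not come from the BufferOrganizer:
//
//   api::Status status = PlaceBlob(context, rpc, schema, blob, name,
//                                  bucket_id, ctx, false);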

/** Persist a Bucket's BLOBs to \a file_name using stdio. */
api::Status StdIoPersistBucket(SharedMemoryContext *context, RpcContext *rpc,
                               Arena *arena, BucketID bucket_id,
                               const std::string &file_name,
                               const std::string &open_mode) {
  api::Status result;
  FILE *file = fopen(file_name.c_str(), open_mode.c_str());

  if (file) {
    std::vector<BlobID> blob_ids = GetBlobIds(context, rpc, bucket_id);
    for (size_t i = 0; i < blob_ids.size(); ++i) {
      ScopedTemporaryMemory scratch(arena);
      size_t blob_size = GetBlobSizeById(context, rpc, arena, blob_ids[i]);
      // TODO(chogan): @optimization We could use the actual Hermes buffers as
      // the write buffer rather than collecting the whole blob into memory. For
      // now we pay the cost of data copy in order to only do one I/O call.
      api::Blob data(blob_size);
      size_t num_bytes = blob_size > 0 ? sizeof(data[0]) * blob_size : 0;
      if (ReadBlobById(context, rpc, arena, data, blob_ids[i]) == blob_size) {
        // TODO(chogan): For now we just write the blobs in the order in which
        // they were `Put`, but once we have a Trait that represents a file
        // mapping, we'll need pwrite and offsets.
        if (fwrite(data.data(), 1, num_bytes, file) != num_bytes) {
          result = STDIO_FWRITE_FAILED;
          int saved_errno = errno;
          LOG(ERROR) << result.Msg() << strerror(saved_errno);
          break;
        }
      } else {
        result = READ_BLOB_FAILED;
        LOG(ERROR) << result.Msg();
        break;
      }
    }

    if (fclose(file) != 0) {
      result = STDIO_FCLOSE_FAILED;
      int saved_errno = errno;
      LOG(ERROR) << result.Msg() << strerror(saved_errno);
    }
  } else {
    result = STDIO_FOPEN_FAILED;
    int saved_errno = errno;
    LOG(ERROR) << result.Msg() << strerror(saved_errno);
  }

  return result;
}

/** Persist the BLOB \a blob_id to file descriptor \a fd at \a offset. */
api::Status StdIoPersistBlob(SharedMemoryContext *context, RpcContext *rpc,
                             Arena *arena, BlobID blob_id, int fd,
                             const i32 &offset) {
  api::Status result;

  if (fd > -1) {
    ScopedTemporaryMemory scratch(arena);
    size_t blob_size = GetBlobSizeById(context, rpc, arena, blob_id);
    // TODO(chogan): @optimization We could use the actual Hermes buffers as
    // the write buffer rather than collecting the whole blob into memory. For
    // now we pay the cost of data copy in order to only do one I/O call.
    api::Blob data(blob_size);
    size_t num_bytes = blob_size > 0 ? sizeof(data[0]) * blob_size : 0;
    if (ReadBlobById(context, rpc, arena, data, blob_id) == blob_size) {
      // EnsureNoNull(data);
      VLOG(1) << "STDIO flush: "
              << "num_bytes=" << num_bytes << ", "
              << "fd=" << fd << ", "
              << "offset=" << offset << ", "
              << "blob_id=" << blob_id.bits.buffer_ids_offset
              << std::endl;
      ssize_t bytes_written = pwrite(fd, data.data(), num_bytes, offset);
      if (bytes_written == -1 || (size_t)bytes_written != num_bytes) {
        // TODO(chogan):
        // result = POSIX_PWRITE_ERROR;
        // LOG(ERROR) << result.Msg() << strerror(errno);
        FailedLibraryCall("pwrite");
      }
    }
  } else {
    result = INVALID_FILE;
    LOG(ERROR) << result.Msg();
  }

  return result;
}
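
// Usage sketch (illustrative; the file name is hypothetical): flush one BLOB
// to byte offset 0 of an open file descriptor:
//
//   int fd = OpenOrTerminate("persisted.bin", O_WRONLY | O_CREAT,
//                            S_IRUSR | S_IWUSR);
//   api::Status status = StdIoPersistBlob(context, rpc, arena, blob_id, fd, 0);
//   if (close(fd) != 0) {
//     FailedLibraryCall("close");
//   }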

}  // namespace hermes