• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HDFGroup / hermes / 4485341203

pending completion
4485341203

push

github

Hyo-Kyung Lee
ci: update checkout and cache action versions

6193 of 7556 relevant lines covered (81.96%)

646738.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.37
/src/api/hermes.cc
1
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
2
 * Distributed under BSD 3-Clause license.                                   *
3
 * Copyright by The HDF Group.                                               *
4
 * Copyright by the Illinois Institute of Technology.                        *
5
 * All rights reserved.                                                      *
6
 *                                                                           *
7
 * This file is part of Hermes. The full Hermes copyright notice, including  *
8
 * terms governing use, modification, and redistribution, is contained in    *
9
 * the COPYING file, which can be found at the top directory. If you do not  *
10
 * have access to the file, you may request a copy from help@hdfgroup.org.   *
11
 * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
12

13
#include <sys/mman.h>
14

15
#include <cmath>
16
#include <fstream>
17

18
#include <glog/logging.h>
19

20
#include "utils.h"
21
#include "hermes.h"
22
#include "bucket.h"
23
#include "buffer_pool.h"
24
#include "buffer_pool_internal.h"
25
#include "metadata_management_internal.h"
26
#include "config_parser.h"
27

28
namespace hermes {
29

30
namespace api {
31

32
std::string GetVersion() {
6✔
33
  std::string result(HERMES_VERSION_STRING);
12✔
34

35
  return result;
6✔
36
}
37

38
int Context::default_buffer_organizer_retries;
39
PlacementPolicy Context::default_placement_policy;
40
bool Context::default_rr_split;
41

42
/**
 * Rename a Bucket (currently a stub).
 *
 * Only logs the requested rename; no Bucket metadata is touched here.
 *
 * @param old_name Current Bucket name.
 * @param new_name Requested new Bucket name.
 * @param ctx      Per-call options (not used yet).
 * @return A default-constructed Status.
 */
Status RenameBucket(const std::string &old_name,
                    const std::string &new_name,
                    Context & /*ctx*/) {
  Status status;

  LOG(INFO) << "Renaming Bucket from " << old_name << " to "
            << new_name << '\n';

  return status;
}
53

54
/**
 * Move a Blob from one Bucket to another (not implemented yet).
 *
 * Logs the requested transfer, then hits HERMES_NOT_IMPLEMENTED_YET.
 *
 * @param src_bkt       Bucket currently holding the Blob (unused).
 * @param src_blob_name Name of the Blob in the source Bucket.
 * @param dst_bkt       Destination Bucket (unused).
 * @param dst_blob_name Name for the Blob in the destination Bucket.
 * @param ctx           Per-call options (unused).
 * @return A default-constructed Status, if HERMES_NOT_IMPLEMENTED_YET returns.
 */
Status TransferBlob(const Bucket & /*src_bkt*/,
                    const std::string &src_blob_name,
                    Bucket & /*dst_bkt*/,
                    const std::string &dst_blob_name,
                    Context & /*ctx*/) {
  Status status;

  LOG(INFO) << "Transferring Blob from " << src_blob_name << " to "
            << dst_blob_name << '\n';
  HERMES_NOT_IMPLEMENTED_YET;

  return status;
}
71

72
bool Hermes::IsApplicationCore() {
65✔
73
  bool result = comm_.proc_kind == ProcessKind::kApp;
65✔
74

75
  return result;
65✔
76
}
77

78
bool Hermes::IsFirstRankOnNode() {
×
79
  bool result = comm_.first_on_node;
×
80

81
  return result;
×
82
}
83

84
void Hermes::AppBarrier() {
16✔
85
  hermes::SubBarrier(&comm_);
16✔
86
}
16✔
87

88
bool Hermes::BucketContainsBlob(const std::string &bucket_name,
7,381✔
89
                                const std::string &blob_name) {
90
  BucketID bucket_id = GetBucketId(&context_, &rpc_, bucket_name.c_str());
7,381✔
91
  bool result = hermes::ContainsBlob(&context_, &rpc_, bucket_id, blob_name);
7,381✔
92

93
  return result;
7,381✔
94
}
95

96
bool Hermes::BucketExists(const std::string &bucket_name) {
989✔
97
  BucketID id = hermes::GetBucketId(&context_, &rpc_, bucket_name.c_str());
989✔
98
  bool result = !IsNullBucketId(id);
989✔
99

100
  return result;
989✔
101
}
102

103
int Hermes::GetProcessRank() {
4✔
104
  int result = comm_.sub_proc_id;
4✔
105

106
  return result;
4✔
107
}
108

109
int Hermes::GetNodeId() {
×
110
  int result = comm_.node_id;
×
111

112
  return result;
×
113
}
114

115
int Hermes::GetNumProcesses() {
4✔
116
  int result = comm_.app_size;
4✔
117

118
  return result;
4✔
119
}
120

121
void *Hermes::GetAppCommunicator() {
2✔
122
  void *result = hermes::GetAppCommunicator(&comm_);
2✔
123

124
  return result;
2✔
125
}
126

127
void Hermes::Finalize(bool force_rpc_shutdown) {
51✔
128
  hermes::Finalize(&context_, &comm_, &rpc_, shmem_name_.c_str(), &trans_arena_,
51✔
129
                   IsApplicationCore(), force_rpc_shutdown);
51✔
130
  is_initialized = false;
51✔
131
}
51✔
132

133
void Hermes::FinalizeClient(bool stop_daemon) {
8✔
134
  hermes::FinalizeClient(&context_, &rpc_, &comm_, &trans_arena_, stop_daemon);
8✔
135
}
8✔
136

137
void Hermes::RemoteFinalize() {
×
138
  hermes::RpcCall<void>(&rpc_, rpc_.node_id, "RemoteFinalize");
×
139
}
×
140

141
void Hermes::RunDaemon() {
×
142
  hermes::RunDaemon(&context_, &rpc_, &comm_, &trans_arena_,
×
143
                    shmem_name_.c_str());
144
}
×
145

146
}  // namespace api
147

148
/** get arena information from \a config configuration */
149
ArenaInfo GetArenaInfo(Config *config) {
59✔
150
  size_t page_size = sysconf(_SC_PAGESIZE);
59✔
151
  // NOTE(chogan): Assumes first Device is RAM
152
  size_t total_hermes_memory = RoundDownToMultiple(config->capacities[0],
59✔
153
                                                   page_size);
154
  size_t total_pages = total_hermes_memory / page_size;
59✔
155
  size_t pages_left = total_pages;
59✔
156

157
  ArenaInfo result = {};
59✔
158

159
  for (int i = kArenaType_Count - 1; i > kArenaType_BufferPool; --i) {
177✔
160
    size_t desired_pages =
118✔
161
      std::floor(config->arena_percentages[i] * total_pages);
118✔
162
    // NOTE(chogan): Each arena gets 1 page at minimum
163
    size_t pages = std::max(desired_pages, 1UL);
118✔
164
    pages_left -= pages;
118✔
165
    size_t num_bytes = pages * page_size;
118✔
166
    result.sizes[i] = num_bytes;
118✔
167
    result.total += num_bytes;
118✔
168
  }
169

170
  if (pages_left == 0) {
59✔
171
    // TODO(chogan): @errorhandling
172
    HERMES_NOT_IMPLEMENTED_YET;
×
173
  }
174

175
  // NOTE(chogan): BufferPool Arena gets remainder of pages
176
  result.sizes[kArenaType_BufferPool] = pages_left * page_size;
59✔
177
  result.total += result.sizes[kArenaType_BufferPool];
59✔
178

179
  return result;
59✔
180
}
181

182
/** get hosts from \a host_file file  */
183
std::vector<std::string> GetHostsFromFile(const std::string &host_file) {
2✔
184
  std::vector<std::string> result;
2✔
185
  std::fstream file;
4✔
186
  LOG(INFO) << "Reading hosts from " << host_file;
2✔
187

188
  file.open(host_file.c_str(), std::ios::in);
2✔
189
  if (file.is_open()) {
2✔
190
    std::string file_line;
4✔
191
    while (getline(file, file_line)) {
8✔
192
      if (!file_line.empty()) {
6✔
193
        result.emplace_back(file_line);
4✔
194
      }
195
    }
196
  } else {
197
    LOG(FATAL) << "Failed to open host file " << host_file;
×
198
  }
199
  file.close();
2✔
200

201
  return result;
4✔
202
}
203

204
/** push host names */
205
void PushHostNames(Arena *arenas, RpcContext *rpc,
42✔
206
                   const std::vector<std::string> &host_names,
207
                   MetadataManager *mdm, u8 *shmem_base) {
208
  rpc->host_names = PushArray<ShmemString>(&arenas[kArenaType_MetaData],
42✔
209
                                           host_names.size());
42✔
210
  for (size_t i = 0; i < host_names.size(); ++i) {
86✔
211
    char *host_name_mem = PushArray<char>(&arenas[kArenaType_MetaData],
44✔
212
                                          host_names[i].size());
44✔
213
    MakeShmemString(&rpc->host_names[i], (u8 *)host_name_mem, host_names[i]);
44✔
214
  }
215
  mdm->host_names_offset = (u8 *)rpc->host_names - (u8 *)shmem_base;
42✔
216
}
42✔
217

218
/**
 * Initialize the node-shared Hermes core state.
 *
 * Calls InitSharedMemory to obtain a region named
 * `config->buffer_pool_shmem_name` and places every non-Transient arena in it
 * back to back. Inside those arenas it then sets up the BufferPool, the
 * MetadataManager, the RPC state and the host-name table.
 *
 * @param config     Hermes configuration.
 * @param comm       Communication context; only `node_id` is read here.
 * @param arena_info Per-arena sizes (from GetArenaInfo in the caller).
 * @param arenas     Arena array indexed by arena type; non-Transient entries
 *                   are initialized to point into the shared region.
 * @param rpc        RPC context; `state` and `host_names` are set here.
 * @return A SharedMemoryContext describing the shared region.
 */
SharedMemoryContext InitHermesCore(Config *config, CommunicationContext *comm,
                                   ArenaInfo *arena_info, Arena *arenas,
                                   RpcContext *rpc) {
  // The Transient arena is per rank (skipped in the loop below), so it is not
  // part of the shared region's size.
  size_t shmem_size = (arena_info->total -
                       arena_info->sizes[kArenaType_Transient]);
  u8 *shmem_base = InitSharedMemory(config->buffer_pool_shmem_name, shmem_size);
  LOG(INFO) << "HERMES CORE" << std::endl;

  // NOTE(chogan): Initialize shared arenas
  // Arenas are laid out contiguously in arena-type order starting at offset 0.
  ptrdiff_t base_offset = 0;
  for (int i = 0; i < kArenaType_Count; ++i) {
    if (i == kArenaType_Transient) {
      // NOTE(chogan): Transient arena exists per rank, not in shared memory
      continue;
    }
    size_t arena_size = arena_info->sizes[i];
    InitArena(&arenas[i], arena_size, shmem_base + base_offset);
    base_offset += arena_size;
  }

  SharedMemoryContext context = {};
  context.shm_base = shmem_base;
  context.shm_size = shmem_size;
  context.buffer_pool_offset = InitBufferPool(context.shm_base,
                                              &arenas[kArenaType_BufferPool],
                                              &arenas[kArenaType_Transient],
                                              comm->node_id, config);

  // Locations inside the region are stored as offsets from shmem_base rather
  // than raw pointers (presumably because other processes may map the region
  // at a different address).
  MetadataManager *mdm =
    PushClearedStruct<MetadataManager>(&arenas[kArenaType_MetaData]);
  context.metadata_manager_offset = (u8 *)mdm - (u8 *)shmem_base;

  rpc->state = CreateRpcState(&arenas[kArenaType_MetaData]);
  mdm->rpc_state_offset = (u8 *)rpc->state - shmem_base;

  // Host names come either from the configured host file (whose line count
  // must match rpc->num_nodes) or directly from the config.
  if (rpc->use_host_file) {
    std::vector<std::string> host_names =
      GetHostsFromFile(config->rpc_server_host_file);
    CHECK_EQ(host_names.size(), rpc->num_nodes);
    PushHostNames(arenas, rpc, host_names, mdm, shmem_base);
  } else {
    PushHostNames(arenas, rpc, config->host_names, mdm, shmem_base);
  }
  InitMetadataManager(mdm, rpc, &arenas[kArenaType_MetaData], config);
  InitMetadataStorage(&context, mdm, &arenas[kArenaType_MetaData], config);

  // The start of the region is treated as a ShmemClientInfo header that
  // publishes the MetadataManager offset.
  // NOTE(review): offset 0 is also where the first shared arena begins —
  // confirm that arena reserves room for this header.
  ShmemClientInfo *client_info = (ShmemClientInfo *)shmem_base;
  client_info->mdm_offset = context.metadata_manager_offset;

  return context;
}
270

271
/**
 * Bootstrap per-process arenas and communication, and (on the first Hermes
 * process of a node) the shared Hermes core.
 *
 * Starts the Transient arena on a 4 KiB malloc'd buffer. It then computes the
 * arena sizes and overwrites `config->capacities[0]` with the BufferPool arena
 * size. Next it initializes communication and grows the Transient arena to the
 * size InitCommunication reports, and initializes the RPC context. Only when
 * this process is ProcessKind::kHermes and first on its node does it call
 * InitHermesCore.
 *
 * @param arenas     Arena array indexed by arena type.
 * @param config     Hermes configuration (`capacities[0]` is modified).
 * @param comm       Communication context to initialize.
 * @param rpc        RPC context to initialize.
 * @param is_daemon  Forwarded to InitCommunication.
 * @param is_adapter Forwarded to InitCommunication.
 * @return The SharedMemoryContext from InitHermesCore, or a zero-initialized
 *         context when this process does not create the core.
 */
SharedMemoryContext
BootstrapSharedMemory(Arena *arenas, Config *config, CommunicationContext *comm,
                      RpcContext *rpc, bool is_daemon, bool is_adapter) {
  // NOTE(review): the malloc result is not checked, and ownership of this
  // buffer after GrowArena is not visible here — confirm it is released.
  size_t bootstrap_size = KILOBYTES(4);
  u8 *bootstrap_memory = (u8 *)malloc(bootstrap_size);
  InitArena(&arenas[kArenaType_Transient], bootstrap_size, bootstrap_memory);

  ArenaInfo arena_info = GetArenaInfo(config);
  // NOTE(chogan): The buffering capacity for the RAM Device is the size of the
  // BufferPool Arena
  config->capacities[0] = arena_info.sizes[kArenaType_BufferPool];
  size_t trans_arena_size =
    InitCommunication(comm, &arenas[kArenaType_Transient],
                      arena_info.sizes[kArenaType_Transient], is_daemon,
                      is_adapter);

  GrowArena(&arenas[kArenaType_Transient], trans_arena_size);
  comm->state = arenas[kArenaType_Transient].base;

  InitRpcContext(rpc, comm->num_nodes, comm->node_id, config);

  SharedMemoryContext result = {};
  if (comm->proc_kind == ProcessKind::kHermes && comm->first_on_node) {
    result = InitHermesCore(config, comm, &arena_info, arenas, rpc);
  }

  return result;
}
300

301
// TODO(chogan): https://github.com/HDFGroup/hermes/issues/323
// Disabled glog setup, kept for reference until the issue above is resolved.
// When enabled it logs to stderr, sets the minimum log level to 0 unless
// GLOG_minloglevel is present in the environment, sets verbosity (FLAGS_v) to
// 0, and initializes glog with the program name "hermes".
#if 0
static void InitGlog() {
  FLAGS_logtostderr = 1;
  const char kMinLogLevel[] = "GLOG_minloglevel";
  char *min_log_level = getenv(kMinLogLevel);

  if (!min_log_level) {
    FLAGS_minloglevel = 0;
  }

  FLAGS_v = 0;

  google::InitGoogleLogging("hermes");
}
#endif
317

318
/**
 * Create and fully initialize a Hermes instance from an in-memory Config.
 *
 * Every participating process must call this: it contains several
 * WorldBarrier calls. Hermes (ProcessKind::kHermes) processes also start the
 * RPC server, the buffer organizer and the system-view-state update thread.
 * Other processes attach to the existing shared-memory segment instead.
 *
 * @param config     Hermes configuration. `buffer_pool_shmem_name` is
 *                   rewritten to its full name, and `capacities[0]` is
 *                   modified during bootstrap.
 * @param is_daemon  Forwarded to BootstrapSharedMemory.
 * @param is_adapter Forwarded to BootstrapSharedMemory.
 * @return The initialized instance, with `is_initialized` set to true.
 */
std::shared_ptr<api::Hermes> InitHermes(Config *config, bool is_daemon,
                                        bool is_adapter) {
  // TODO(chogan): https://github.com/HDFGroup/hermes/issues/323
  // InitGlog();

  // Expand the configured shared-memory name in place (via MakeFullShmemName),
  // using a copy of the original as the base.
  std::string base_shmem_name(config->buffer_pool_shmem_name);
  MakeFullShmemName(config->buffer_pool_shmem_name, base_shmem_name.c_str());

  // TODO(chogan): Do we need a transfer window arena? We can probably just use
  // the transient arena for this.
  Arena arenas[kArenaType_Count] = {};
  CommunicationContext comm = {};
  RpcContext rpc = {};
  SharedMemoryContext context =
    BootstrapSharedMemory(arenas, config, &comm, &rpc, is_daemon, is_adapter);

  WorldBarrier(&comm);
  std::shared_ptr<api::Hermes> result = nullptr;

  if (comm.proc_kind == ProcessKind::kHermes) {
    result = std::make_shared<api::Hermes>(context);
    // NOTE(review): shmem_name_ is only set on this branch; other processes
    // pass an empty name to hermes::Finalize — confirm that is intended.
    result->shmem_name_ = std::string(config->buffer_pool_shmem_name);
  } else {
    // Non-Hermes processes look up the existing segment by name.
    context = GetSharedMemoryContext(config->buffer_pool_shmem_name);
    SubBarrier(&comm);
    result = std::make_shared<api::Hermes>(context);
    // NOTE(chogan): Give every App process a valid pointer to the internal RPC
    // state in shared memory
    MetadataManager *mdm = GetMetadataManagerFromContext(&context);
    rpc.state = (void *)(context.shm_base + mdm->rpc_state_offset);
    rpc.host_names =
      (ShmemString *)((u8 *)context.shm_base + mdm->host_names_offset);
  }

  InitFilesForBuffering(&context, comm);

  WorldBarrier(&comm);

  rpc.node_id = comm.node_id;
  rpc.num_nodes = comm.num_nodes;

  // Copy the locally built state into the instance; the servers started below
  // take pointers to these members, not to the locals.
  result->trans_arena_ = arenas[kArenaType_Transient];
  result->comm_ = comm;
  result->context_ = context;
  result->rpc_ = rpc;

  // NOTE(chogan): The RPC servers have to be started here because they need to
  // save a reference to the context and rpc instances that are members of the
  // Hermes instance.
  if (comm.proc_kind == ProcessKind::kHermes) {
    std::string rpc_server_addr =
      GetRpcAddress(&result->rpc_, config, result->rpc_.node_id,
                    config->rpc_port);
    std::string bo_address =
      GetRpcAddress(&result->rpc_, config, result->rpc_.node_id,
                    config->buffer_organizer_port);

    result->rpc_.start_server(&result->context_, &result->rpc_,
                              &result->trans_arena_, rpc_server_addr.c_str(),
                              config->rpc_num_threads);

    StartBufferOrganizer(&result->context_, &result->rpc_,
                         &result->trans_arena_, bo_address.c_str(),
                         config->bo_num_threads, config->buffer_organizer_port);

    double sleep_ms = config->system_view_state_update_interval_ms;
    StartGlobalSystemViewStateUpdateThread(&result->context_, &result->rpc_,
                                           &result->trans_arena_, sleep_ms);
  }

  WorldBarrier(&comm);

  // Publish config-derived defaults used by api::Context.
  api::Context::default_buffer_organizer_retries =
    config->num_buffer_organizer_retries;
  api::Context::default_placement_policy = config->default_placement_policy;
  api::Context::default_rr_split = config->default_rr_split;
  RoundRobin::InitDevices(config, result);

  InitRpcClients(&result->rpc_);

  // NOTE(chogan): Can only initialize the neighborhood Targets once the RPC
  // clients have been initialized.
  InitNeighborhoodTargets(&result->context_, &result->rpc_);

  result->is_initialized = true;

  WorldBarrier(&comm);

  return result;
}
408

409
namespace api {
410

411
std::shared_ptr<Hermes> InitHermes(const char *config_file, bool is_daemon,
47✔
412
                                   bool is_adapter) {
413
  u16 endian_test = 0x1;
47✔
414
  char *endian_ptr = (char *)&endian_test;
47✔
415
  if (endian_ptr[0] != 1) {
47✔
416
    LOG(FATAL) << "Big endian machines not supported yet." << std::endl;
×
417
  }
418

419
  LOG(INFO) << "Initializing hermes config" << std::endl;
47✔
420

421
  hermes::Config config = {};
94✔
422
  hermes::InitConfig(&config, config_file);
47✔
423
  std::shared_ptr<Hermes> result = InitHermes(&config, is_daemon, is_adapter);
47✔
424

425
  LOG(INFO) << "Initialized hermes config" << std::endl;
47✔
426

427
  return result;
94✔
428
}
429

430
}  // namespace api
431

432
std::shared_ptr<api::Hermes> InitHermesClient(const char *config_file) {
8✔
433
  std::shared_ptr<api::Hermes> result =
434
    api::InitHermes(config_file, false, true);
8✔
435

436
  return result;
8✔
437
}
438

439
std::shared_ptr<api::Hermes> InitHermesDaemon(char *config_file) {
22✔
440
  std::shared_ptr<api::Hermes> result = api::InitHermes(config_file, true);
22✔
441

442
  return result;
22✔
443
}
444

445
/** Initialize Hermes as a (non-adapter) daemon from an in-memory Config. */
std::shared_ptr<api::Hermes> InitHermesDaemon(Config *config) {
  return InitHermes(config, true, false);
}
450

451
}  // namespace hermes
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc