• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HDFGroup / hermes / 4837525566

pending completion
4837525566

Pull #515

github

GitHub
Merge f018521fc into 87672e106
Pull Request #515: v1.0

5502 of 5502 new or added lines in 117 files covered. (100.0%)

4997 of 7300 relevant lines covered (68.45%)

6194762.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.28
/src/buffer_organizer.cc
1
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *
2
 * Distributed under BSD 3-Clause license.                                   *
3
 * Copyright by The HDF Group.                                               *
4
 * Copyright by the Illinois Institute of Technology.                        *
5
 * All rights reserved.                                                      *
6
 *                                                                           *
7
 * This file is part of Hermes. The full Hermes copyright notice, including  *
8
 * terms governing use, modification, and redistribution, is contained in    *
9
 * the COPYING file, which can be found at the top directory. If you do not  *
10
 * have access to the file, you may request a copy from help@hdfgroup.org.   *
11
 * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
12

13
#include "buffer_organizer.h"

#include <cmath>

#include "metadata_manager.h"
#include "borg_io_clients/borg_io_client_factory.h"
#include "hermes.h"
#include "bucket.h"
18

19
namespace hermes {
20

21
/**====================================
22
 * BORG I/O thread manager
23
 * ===================================*/
24

25
/** Spawn the enqueuing thread */
26
void BorgIoThreadManager::SpawnFlushMonitor(int num_threads) {
28✔
27
  auto flush_scheduler = [](void *args) {
56✔
28
    HILOG(kDebug, "Flushing scheduler thread has started")
28✔
29
    (void) args;
28✔
30
    auto borg = &HERMES->borg_;
28✔
31
    while (HERMES_THREAD_MANAGER->Alive()) {
523✔
32
      borg->LocalEnqueueFlushes();
495✔
33
      // TODO(llogan): make configurable
34
      tl::thread::self().sleep(*HERMES->rpc_.server_engine_, 1000);
990✔
35
    }
36
    HERMES_BORG_IO_THREAD_MANAGER->Join();
28✔
37
    HILOG(kDebug, "Flush scheduler thread has stopped")
28✔
38
  };
28✔
39
  HERMES_THREAD_MANAGER->Spawn(flush_scheduler);
28✔
40
}
28✔
41

42
/** Spawn the flushing I/O threads */
void BorgIoThreadManager::SpawnFlushWorkers(int num_threads) {
  // Define flush worker thread function
  // The function will continue working until all pending flushes have
  // been processed
  // Each worker receives a pointer to its queue-slot id (see Spawn below),
  // looks up its own queue in the shared queues_ array, and drains it.
  auto flush = [](void *params) {
    BufferOrganizer *borg = &HERMES->borg_;
    int *id = reinterpret_cast<int*>(params);
    BorgIoThreadQueue &bq = (*borg->queues_)[*id];
    BorgIoThreadQueueInfo& bq_info = bq.GetSecond();
    _BorgIoThreadQueue& queue = bq.GetFirst();
    HILOG(kDebug, "Flushing worker {} has started", bq_info.id_)
    // Keep running while the manager is alive; after shutdown begins,
    // keep draining until this worker's pending load reaches zero.
    while (HERMES_BORG_IO_THREAD_MANAGER->Alive() ||
          (!HERMES_BORG_IO_THREAD_MANAGER->Alive() && bq_info.load_)) {
      borg->LocalProcessFlushes(bq_info, queue);
      tl::thread::self().sleep(*HERMES->rpc_.server_engine_, 1);
    }
    HILOG(kDebug, "Flushing worker {} has stopped", bq_info.id_)
  };

  // Create the flushing threads
  for (int i = 0; i < num_threads; ++i) {
    BorgIoThreadQueue &bq = (*queues_)[i];
    BorgIoThreadQueueInfo& bq_info = bq.GetSecond();
    bq_info.id_ = i;
    bq_info.load_ = 0;
    // NOTE: &bq_info.id_ must outlive the worker thread; it lives inside
    // the shared queues_ structure, not on this stack frame.
    HERMES_THREAD_MANAGER->Spawn(flush, &bq_info.id_);
  }
}
71

72
/** Wait for flushing to complete */
73
void BorgIoThreadManager::WaitForFlush() {
3,304✔
74
  HILOG(kDebug, "Waiting for all flushing to complete {}",
3,304✔
75
        HERMES->rpc_.node_id_)
76
  while (IsFlushing()) {
4,707✔
77
    tl::thread::self().sleep(*HERMES->rpc_.server_engine_, 5);
2,806✔
78
  }
79
  HILOG(kDebug, "Finished flushing {}",
3,304✔
80
        HERMES->rpc_.node_id_)
81
}
3,304✔
82

83
/**====================================
84
 * SHM Init
85
 * ===================================*/
86

87
/**
 * Initialize the BORG
 * REQUIRES mdm to be initialized already.
 * */
void BufferOrganizer::shm_init(hipc::ShmArchive<BufferOrganizerShm> &header,
                               hipc::Allocator *alloc) {
  // Bind mdm_/rpc_/queues_ pointers from the shared-memory header.
  // NOTE(review): this reads (*header).queues_ before HSHM_MAKE_AR below
  // constructs it — presumably the archive's storage address is stable so
  // the cached pointer stays valid; confirm against hipc::ShmArchive.
  shm_deserialize(header);

  // Initialize device information
  // A target with an empty mount dir is RAM-backed; anything else is
  // flushed through the POSIX I/O client.
  for (TargetInfo &target : (*mdm_->targets_)) {
    DeviceInfo &dev_info =
        (*mdm_->devices_)[target.id_.GetDeviceId()];
    if (dev_info.mount_dir_->size() == 0) {
      dev_info.io_api_ = IoInterface::kRam;
    } else {
      dev_info.io_api_ = IoInterface::kPosix;
    }
    auto io_client = borg::BorgIoClientFactory::Get(dev_info.io_api_);
    io_client->Init(dev_info);
  }

  // Print out device info
  mdm_->PrintDeviceInfo();

  // Spawn the thread for flushing blobs
  // One queue per worker thread, constructed in shared memory.
  int num_threads = HERMES->server_config_.borg_.num_threads_;
  HSHM_MAKE_AR((*header).queues_, alloc, num_threads)
  HERMES_BORG_IO_THREAD_MANAGER->queues_ = queues_;
  HERMES_BORG_IO_THREAD_MANAGER->SpawnFlushMonitor(num_threads);
  HERMES_BORG_IO_THREAD_MANAGER->SpawnFlushWorkers(num_threads);
}
118

119
/**====================================
120
 * SHM Deserialization
121
 * ===================================*/
122

123
/** Deserialize the BORG from shared memory */
124
void BufferOrganizer::shm_deserialize(
105✔
125
    hipc::ShmArchive<BufferOrganizerShm> &header)  {
126
  mdm_ = &HERMES->mdm_;
105✔
127
  rpc_ = &HERMES->rpc_;
105✔
128
  queues_ = (*header).queues_.get();
105✔
129
  HERMES_BORG_IO_THREAD_MANAGER->queues_ = queues_;
105✔
130
}
105✔
131

132
/**====================================
133
 * Destructors
134
 * ===================================*/
135

136
/** Finalize the BORG */
void BufferOrganizer::shm_destroy_main() {
  // Tear down the shared-memory flush queues. The other members set in
  // shm_deserialize (mdm_, rpc_) are non-owning pointers into HERMES and
  // need no cleanup here.
  queues_->shm_destroy();
}
140

141
/**====================================
142
 * BORG Methods
143
 * ===================================*/
144

145
/** Stores a blob into a set of buffers */
RPC void BufferOrganizer::LocalPlaceBlobInBuffers(
    const Blob &blob, std::vector<BufferInfo> &buffers) {
  AUTO_TRACE(1)
  // Running offset into `blob`; advanced only for buffers written locally.
  // NOTE(review): remote buffers are skipped without advancing blob_off,
  // which assumes the local buffers' data is packed at the front of `blob`
  // (or that all buffers are local) — confirm against the global caller.
  size_t blob_off = 0;
  for (BufferInfo &buffer_info : buffers) {
    // Only write buffers that live on this node
    if (buffer_info.tid_.GetNodeId() != mdm_->rpc_->node_id_) {
      continue;
    }
    TIMER_START("DeviceInfo")
    DeviceInfo &dev_info =
        (*mdm_->devices_)[buffer_info.tid_.GetDeviceId()];
    // Sanity check: the write must fit within the device's capacity
    if (buffer_info.t_off_ + buffer_info.blob_size_ >
        dev_info.capacity_) {
      HELOG(kFatal, "Out of bounds: attempting to write to offset: {} / {} "
            "on device {}: {}",
            buffer_info.t_off_ + buffer_info.blob_size_,
            dev_info.capacity_,
            buffer_info.tid_.GetDeviceId(),
            dev_info.mount_point_->str())
    }
    auto io_client = borg::BorgIoClientFactory::Get(dev_info.io_api_);
    TIMER_END()

    TIMER_START("IO")
    // Write this buffer's slice of the blob at its target offset
    bool ret = io_client->Write(dev_info,
                                blob.data() + blob_off,
                                buffer_info.t_off_,
                                buffer_info.blob_size_);
    TIMER_END()
    blob_off += buffer_info.blob_size_;
    if (!ret) {
      mdm_->PrintDeviceInfo();
      HELOG(kFatal, "Could not perform I/O in BORG."
            " Writing to target ID:"
            " (node_id: {}, tgt_id: {}, dev_id: {},"
            " t_off: {}, blob_size: {})",
            buffer_info.tid_.GetNodeId(),
            buffer_info.tid_.GetIndex(),
            buffer_info.tid_.GetDeviceId(),
            buffer_info.t_off_,
            buffer_info.blob_size_)
    }
  }
}
190

191
/** Globally store a blob into a set of buffers */
192
void BufferOrganizer::GlobalPlaceBlobInBuffers(
×
193
    const Blob &blob, std::vector<BufferInfo> &buffers) {
194
  AUTO_TRACE(1)
195
  // Get the nodes to transfer buffers to
196
  size_t total_size;
×
197
  auto unique_nodes = BufferPool::GroupByNodeId(buffers, total_size);
×
198

199
  // Send the buffers to each node
200
  for (auto &[node_id, size] : unique_nodes) {
×
201
    if (NODE_ID_IS_LOCAL(node_id)) {
×
202
      LocalPlaceBlobInBuffers(blob, buffers);
×
203
    } else {
204
      rpc_->IoCall<void>(
×
205
          node_id, "RpcPlaceBlobInBuffers",
206
          IoType::kWrite, blob.data(), blob.size(),
207
          blob.size(), buffers);
×
208
    }
209
  }
210
}
×
211

212
/** Reads a blob from a set of buffers */
RPC void BufferOrganizer::LocalReadBlobFromBuffers(
    Blob &blob, std::vector<BufferInfo> &buffers) {
  AUTO_TRACE(1)
  // Running offset into `blob`; advanced only for locally-read buffers,
  // so `blob` is expected to be sized for this node's portion alone
  // (see GlobalReadBlobFromBuffers, which allocates per-node blobs).
  size_t blob_off = 0;
  for (BufferInfo &buffer_info : buffers) {
    // Only read buffers that live on this node
    if (buffer_info.tid_.GetNodeId() != mdm_->rpc_->node_id_) {
      continue;
    }
    DeviceInfo &dev_info =
        (*mdm_->devices_)[buffer_info.tid_.GetDeviceId()];
    // Sanity check: the read must fall within the device's capacity
    if (buffer_info.t_off_ + buffer_info.blob_size_ >
        dev_info.capacity_) {
      HELOG(kFatal, "Out of bounds: attempting to read from offset: {} / {}"
            " on device {}: {}",
            buffer_info.t_off_ + buffer_info.blob_size_,
            dev_info.capacity_,
            buffer_info.tid_.GetDeviceId(),
            dev_info.mount_point_->str())
    }
    auto io_client = borg::BorgIoClientFactory::Get(dev_info.io_api_);
    bool ret = io_client->Read(dev_info,
                               blob.data() + blob_off,
                               buffer_info.t_off_,
                               buffer_info.blob_size_);
    blob_off += buffer_info.blob_size_;
    if (!ret) {
      HELOG(kFatal, "Could not perform I/O in BORG."
                    " reading from target ID:"
                    " (node_id: {}, tgt_id: {}, dev_id: {},"
                    " t_off: {}, blob_size: {})",
            buffer_info.tid_.GetNodeId(),
            buffer_info.tid_.GetIndex(),
            buffer_info.tid_.GetDeviceId(),
            buffer_info.t_off_,
            buffer_info.blob_size_)
    }
  }
}
251

252
/** The Global form of ReadBlobFromBuffers */
Blob BufferOrganizer::GlobalReadBlobFromBuffers(
    std::vector<BufferInfo> &buffers) {
  AUTO_TRACE(1)
  // Get the nodes to transfer buffers to
  size_t total_size = 0;
  auto unique_nodes = BufferPool::GroupByNodeId(buffers, total_size);

  // Send the buffers to each node: one per-node blob, sized for that
  // node's portion, filled locally or via bulk-transfer RPC.
  std::vector<Blob> blobs;
  blobs.reserve(unique_nodes.size());
  for (auto &[node_id, size] : unique_nodes) {
    blobs.emplace_back(size);
    if (NODE_ID_IS_LOCAL(node_id)) {
      LocalReadBlobFromBuffers(blobs.back(), buffers);
    } else {
      rpc_->IoCall<void>(
          node_id, "RpcReadBlobFromBuffers",
          IoType::kRead, blobs.back().data(), size,
          size, buffers);
    }
  }

  // If the blob was only on one node, the single per-node blob IS the
  // result; move it out directly.
  if (unique_nodes.size() == 1) {
    return std::move(blobs.back());
  }

  // Merge the blobs at the end: each buffer's data is copied from its
  // node's temporary blob (read sequentially per node, tracked by
  // tmp_blob_off) to its logical position blob_off_ in the full blob.
  hapi::Blob blob(total_size);
  for (size_t i = 0; i < unique_nodes.size(); ++i) {
    auto &[node_id, size] = unique_nodes[i];
    auto &tmp_blob = blobs[i];
    size_t tmp_blob_off = 0;
    for (BufferInfo &info : buffers) {
      if (info.tid_.GetNodeId() != node_id) {
        continue;
      }
      memcpy(blob.data() + info.blob_off_,
             tmp_blob.data() + tmp_blob_off,
             info.blob_size_);
      tmp_blob_off += info.blob_size_;
    }
  }
  return blob;
}
298

299
/** Re-organize blobs based on a score */
300
void BufferOrganizer::GlobalOrganizeBlob(const std::string &bucket_name,
×
301
                                         const std::string &blob_name,
302
                                         float score) {
303
  AUTO_TRACE(1)
304
  auto bkt = HERMES->GetBucket(bucket_name);
×
305
  BlobId blob_id;
×
306
  bkt.GetBlobId(blob_name, blob_id);
×
307
  float blob_score = bkt.GetBlobScore(blob_id);
×
308
  Context ctx;
×
309

310
  HILOG(kDebug, "Changing blob score from: {} to {}", blob_score, score)
×
311

312
  // Skip organizing if below threshold
313
  if (abs(blob_score - score) < .05) {
×
314
    return;
×
315
  }
316

317
  // Lock the blob to ensure it doesn't get modified
318
  bkt.LockBlob(blob_id, MdLockType::kExternalWrite);
×
319

320
  // Get the blob
321
  hapi::Blob blob;
×
322
  bkt.Get(blob_id, blob, ctx);
×
323

324
  // Re-emplace the blob with new score
325
  BlobId tmp_id;
×
326
  ctx.blob_score_ = score;
×
327
  bkt.Put(blob_name, blob, tmp_id, ctx);
×
328

329
  // Unlock the blob
330
  bkt.UnlockBlob(blob_id, MdLockType::kExternalWrite);
×
331
}
332

333
/**====================================
334
 * BORG Flushing methods
335
 * ===================================*/
336

337
/** Flush all blobs registered in this daemon */
void BufferOrganizer::LocalEnqueueFlushes() {
  auto mdm = &HERMES->mdm_;
  // Acquire the read lock on the blob map
  ScopedRwReadLock blob_map_lock(mdm->header_->lock_[kBlobMapLock],
                                 kBORG_LocalEnqueueFlushes);
  // Begin checking for blobs which need flushing
  size_t count = 0;
  for (hipc::pair<BlobId, BlobInfo>& blob_p : *mdm->blob_map_) {
    BlobId &blob_id = blob_p.GetFirst();
    BlobInfo &blob_info = blob_p.GetSecond();
    // Verify that flush is needing to happen: a blob is dirty when its
    // modification count has advanced past the last flushed count
    if (blob_info.mod_count_ == blob_info.last_flush_) {
      continue;
    }
    // Check if bucket has flush trait; blobs in buckets without one are
    // never flushed by the BORG
    TagId &bkt_id = blob_info.tag_id_;
    size_t blob_size = blob_info.blob_size_;
    std::vector<Trait*> traits = HERMES->GetTraits(bkt_id,
                                                   HERMES_TRAIT_FLUSH);
    if (traits.size() == 0) {
      continue;
    }
    // Schedule the blob on an I/O worker thread
    HERMES_BORG_IO_THREAD_MANAGER->Enqueue(bkt_id, blob_id, blob_size,
                                           std::move(traits));
    count += 1;
  }
  if (count) {
    HILOG(kDebug, "Flushing {} blobs", count);
  }
}
369

370
/** Actually process flush operations */
void BufferOrganizer::LocalProcessFlushes(
    BorgIoThreadQueueInfo &bq_info,
    _BorgIoThreadQueue& queue) {
  // Begin flushing; holds the flush lock for the entire drain of the queue
  ScopedRwWriteLock flush_lock(mdm_->header_->lock_[kFlushLock],
                               kBORG_LocalProcessFlushes);
  // Process tasks until the queue is empty.
  // NOTE: every exit path below must decrement bq_info.load_ by the task's
  // blob size, since Enqueue added it when the task was scheduled.
  auto entry = hipc::make_uptr<BorgIoTask>();
  while (!queue.pop(*entry).IsNull()) {
    BorgIoTask &task = *entry;
    HILOG(kDebug, "Attempting to flush blob {}", task.blob_id_);
    Blob blob;

    // Verify the blob exists and then read lock it; the blob may have been
    // deleted between enqueue and processing
    auto iter = mdm_->blob_map_->find(task.blob_id_);
    if (iter.is_end()) {
      bq_info.load_.fetch_sub(task.blob_size_);
      HILOG(kDebug, "Finished BORG task for blob {} and load {}",
            task.blob_id_, bq_info.load_.load())
      continue;
    }
    hipc::pair<BlobId, BlobInfo>& blob_info_p = *iter;
    BlobInfo &blob_info = blob_info_p.GetSecond();
    std::string blob_name = blob_info.name_->str();

    // Verify that flush is needing to happen (another worker or an earlier
    // task may have already flushed this modification)
    if (blob_info.mod_count_ == blob_info.last_flush_) {
      bq_info.load_.fetch_sub(task.blob_size_);
      HILOG(kDebug, "Finished BORG task for blob {} and load {}",
            task.blob_id_, bq_info.load_.load())
      continue;
    }
    // Record the mod count we are flushing BEFORE doing the I/O, so
    // concurrent modifications during the flush still appear dirty later
    size_t last_flush = blob_info.mod_count_;
    blob_info.last_flush_ = last_flush;

    // Get the current blob from Hermes
    api::Bucket bkt = HERMES->GetBucket(task.bkt_id_);
    bkt.Get(task.blob_id_, blob, bkt.GetContext());
    HILOG(kDebug, "Flushing blob {} ({}) of size {}",
          blob_name,
          task.blob_id_,
          blob.size())
    // Run every flush trait attached to the bucket over this blob
    FlushTraitParams trait_params;
    for (Trait *trait : task.traits_) {
      trait_params.blob_ = &blob;
      trait_params.blob_name_ = blob_name;
      trait_params.bkt_ = &bkt;
      trait->Run(HERMES_TRAIT_FLUSH, &trait_params);
    }

    // Dequeue
    bq_info.load_.fetch_sub(task.blob_size_);
    HILOG(kDebug, "Finished BORG task for blob {} and load {}",
          task.blob_id_, bq_info.load_.load())
  }
}
427

428
/** Barrier for all flushing to complete */
429
void BufferOrganizer::LocalWaitForFullFlush() {
3,304✔
430
  HILOG(kInfo, "Full synchronous flush on node {}", rpc_->node_id_)
3,304✔
431
  LocalEnqueueFlushes();
3,304✔
432
  HERMES_BORG_IO_THREAD_MANAGER->WaitForFlush();
3,304✔
433
  HILOG(kInfo, "Finished synchronous flush on node {}", rpc_->node_id_)
3,304✔
434
}
3,304✔
435

436
/** Barrier for all I/O in Hermes to flush */
437
void BufferOrganizer::GlobalWaitForFullFlush() {
3,304✔
438
  for (int i = 0; i < (int)rpc_->hosts_.size(); ++i) {
6,608✔
439
    int node_id = i + 1;
3,304✔
440
    HILOG(kInfo, "Wait for flush on node {}", node_id)
3,304✔
441
    rpc_->Call<bool>(node_id, "RpcWaitForFullFlush");
3,304✔
442
  }
443
}
3,304✔
444

445
}  // namespace hermes
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc