• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer / 28558654667

02 Jul 2026 01:18AM UTC coverage: 18.466%. First build
28558654667

Pull #366

github

web-flow
Merge 93c85619d into 1e1d153d0
Pull Request #366: fixed we do not do multi process lock.

7276 of 53576 branches covered (13.58%)

Branch coverage included in aggregate %.

25 of 35 new or added lines in 2 files covered. (71.43%)

4792 of 11778 relevant lines covered (40.69%)

1109.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.89
/src/dftracer/core/buffer/buffer.cpp
1
#include <dftracer/core/buffer/buffer.h>
2
template <>
3
std::shared_ptr<dftracer::BufferManager>
4
    dftracer::Singleton<dftracer::BufferManager>::instance = nullptr;
5
template <>
6
bool dftracer::Singleton<dftracer::BufferManager>::stop_creating_instances =
7
    false;
8
namespace dftracer {
9

10
void BufferManager::compress_and_write_if_needed(size_t size, bool force) {
2,169✔
11
  if (force || buffer_pos + size > this->config->write_buffer_size) {
2,169!
12
    // On forced flush, serialize any remaining aggregated data
13
    if (force && this->config->aggregation_enable) {
64!
NEW
14
      auto data = dftracer::AggregatedDataType();
×
NEW
15
      bool has_data = this->aggregator->get_previous_aggregations(data, false);
×
NEW
16
      if (has_data) {
×
NEW
17
        size_t agg_size = this->serializer->aggregated(
×
NEW
18
            buffer + buffer_pos + size, 0, rank, data);
×
NEW
19
        size += agg_size;
×
20
      }
NEW
21
    }
×
22

23
    if (this->config->compression) {
64!
24
      size = this->compressor->compress(buffer, buffer_pos + size);
64✔
25
      DFTRACER_LOG_DEBUG(
55✔
26
          "BufferManager.compress_and_write_if_needed compressed size %zu "
27
          "bytes",
28
          size);
29
    } else {
NEW
30
      size = buffer_pos + size;
×
31
    }
32
    if (size > 0) {
64!
33
      size = this->writer->write(buffer, size, true);
64✔
34
      DFTRACER_LOG_DEBUG(
55✔
35
          "BufferManager.compress_and_write_if_needed wrote %zu bytes", size);
36
    }
37
    buffer_pos = 0;
64✔
38
  } else {
39
    buffer_pos += size;
2,105✔
40
    DFTRACER_LOG_DEBUG(
1,575✔
41
        "BufferManager.compress_and_write_if_needed buffer_pos %zu not writing",
42
        buffer_pos);
43
  }
44
}
2,169✔
45
int BufferManager::initialize(const char* filename, HashType hostname_hash) {
64✔
46
  DFTRACER_LOG_DEBUG("BufferManager.initialize %s %d", filename, hostname_hash);
55✔
47
  this->config =
48
      dftracer::Singleton<dftracer::ConfigurationManager>::get_instance();
64✔
49
  if (buffer == nullptr) {
64!
50
    buffer = (char*)malloc(this->config->write_buffer_size + 16 * 1024);
64✔
51
  }
52
  buffer_pos = 0;
64✔
53
  if (!buffer) {
64!
54
    DFTRACER_LOG_ERROR("BufferManager.BufferManager Failed to allocate buffer");
×
55
  }
56
  this->writer = dftracer::Singleton<dftracer::STDIOWriter>::get_instance();
64✔
57
  this->writer->initialize(filename);
64✔
58
  this->serializer = dftracer::Singleton<dftracer::JsonLines>::get_instance();
64✔
59
  this->aggregator = dftracer::Singleton<dftracer::Aggregator>::get_instance();
64✔
60
  if (this->config->compression) {
64!
61
    this->compressor =
62
        dftracer::Singleton<dftracer::ZlibCompression>::get_instance();
64✔
63
    this->compressor->initialize(this->config->write_buffer_size);
64✔
64
  }
65
  size_t size = this->serializer->initialize(buffer, hostname_hash);
64✔
66
  compress_and_write_if_needed(size);
64✔
67
  return 0;
64✔
68
}
69

70
int BufferManager::finalize(int index, ProcessID process_id, bool end_sym) {
64✔
71
  std::unique_lock<std::shared_mutex> lock(mtx);
64✔
72
  if (buffer) {
64!
73
    size_t size = 0;
64✔
74
    if (this->config->aggregation_enable) {
64!
75
      auto data = dftracer::AggregatedDataType();
×
76
      this->aggregator->get_previous_aggregations(data, true);
×
77
      size = this->serializer->aggregated(buffer + buffer_pos, index,
×
78
                                          process_id, data);
79
      this->aggregator->finalize();
×
80
    }
×
81
    auto end_size =
82
        this->serializer->finalize(buffer + buffer_pos + size, end_sym);
64✔
83
    compress_and_write_if_needed(size + end_size, true);
64✔
84

85
    if (this->config->compression) this->compressor->finalize();
64!
86
    this->writer->finalize(index);
64✔
87
    free(buffer);
64✔
88
    buffer = nullptr;
64✔
89
    buffer_pos = 0;
64✔
90
  }
91
  return 0;
64✔
92
}
64✔
93

94
void BufferManager::log_data_event(int index, ConstEventNameType event_name,
1,430✔
95
                                   ConstEventNameType category,
96
                                   TimeResolution start_time,
97
                                   TimeResolution duration,
98
                                   dftracer::Metadata* metadata,
99
                                   ProcessID process_id, ThreadID tid) {
100
  std::unique_lock<std::shared_mutex> lock(mtx);
1,430✔
101
  DFTRACER_LOG_DEBUG("BufferManager.log_data_event %d", index);
1,065✔
102
  size_t size = 0;
1,430✔
103
  bool enable_tracing = true;
1,430✔
104
  if (this->config->aggregation_enable && strcmp(category, "dftracer") != 0) {
1,430!
105
    enable_tracing = false;
×
106
    auto aggregated_key =
107
        AggregatedKey{category, event_name, start_time,     duration,
108
                      tid,      metadata,   get_app_name(), &rank};
×
109
    if (this->config->aggregation_type ==
×
110
        AggregationType::AGGREGATION_TYPE_SELECTIVE) {
111
      enable_tracing = !this->aggregator->should_aggregate(&aggregated_key);
×
112
    }
113
    if (!enable_tracing) {
×
114
      // Accumulate data; returns true when time interval changes
115
      bool interval_changed = this->aggregator->aggregate(aggregated_key);
×
116
      // Serialize aggregated data when moving to new time interval
117
      if (interval_changed) {
×
118
        auto data = dftracer::AggregatedDataType();
×
119
        this->aggregator->get_previous_aggregations(data, false);
×
120
        if (!data.empty()) {
×
121
          size = this->serializer->aggregated(buffer + buffer_pos, index,
×
122
                                              process_id, data);
123

124
          DFTRACER_LOG_DEBUG(
×
125
              "BufferManager.log_data_event serialized aggregated size %zu "
126
              "bytes",
127
              size);
128
        }
129
      }
×
130
    }
131
  }
×
132
  if (enable_tracing) {
1,430!
133
    size =
134
        this->serializer->data(buffer + buffer_pos, index, event_name, category,
1,430✔
135
                               start_time, duration, metadata, process_id, tid);
136
    DFTRACER_LOG_DEBUG(
1,065✔
137
        "BufferManager.log_data_event serialized tracing size %zu bytes", size);
138
  }
139
  compress_and_write_if_needed(size);
1,430✔
140
}
1,430✔
141

142
void BufferManager::log_counter_event(int index, ConstEventNameType name,
112✔
143
                                      ConstEventNameType category,
144
                                      TimeResolution start_time,
145
                                      ProcessID process_id, ThreadID thread_id,
146
                                      dftracer::Metadata* metadata) {
147
  std::unique_lock<std::shared_mutex> lock(mtx);
112!
148
  DFTRACER_LOG_DEBUG("BufferManager.log_counter_event %d", index);
×
149
  size_t size =
150
      this->serializer->counter(buffer + buffer_pos, index, name, category,
112!
151
                                start_time, process_id, thread_id, metadata);
152
  compress_and_write_if_needed(size);
112!
153
}
112✔
154

155
void BufferManager::log_metadata_event(ConstEventNameType name,
499✔
156
                                       ConstEventNameType value,
157
                                       ConstEventNameType ph,
158
                                       ProcessID process_id, ThreadID tid,
159
                                       bool is_string) {
160
  std::unique_lock<std::shared_mutex> lock(mtx);
499✔
161
  DFTRACER_LOG_DEBUG("BufferManager.log_metadata_event %s", value);
455✔
162
  size_t size = this->serializer->metadata(buffer + buffer_pos, name, value, ph,
499✔
163
                                           process_id, tid, is_string);
164
  compress_and_write_if_needed(size);
499✔
165
}
499✔
166
}  // namespace dftracer
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc