• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

STEllAR-GROUP / hpx / #882

31 Aug 2023 07:44PM UTC coverage: 41.798% (-44.7%) from 86.546%
#882

push

19442 of 46514 relevant lines covered (41.8%)

126375.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/libs/full/checkpoint/examples/1d_stencil_4_checkpoint.cpp
1
//  Copyright (c) 2014-2025 Hartmut Kaiser
2
//  Copyright (c) 2014 Patricia Grubel
3
//  Copyright (c) 2018 Adrian Serio
4
//
5
//  SPDX-License-Identifier: BSL-1.0
6
//  Distributed under the Boost Software License, Version 1.0. (See accompanying
7
//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8

9
// This is the fourth in a series of examples demonstrating the development of a
10
// fully distributed solver for a simple 1D heat distribution problem.
11
//
12
// This example builds on example three. It futurizes the code from that
13
// example. Compared to example two this code runs much more efficiently. It
14
// allows for changing the amount of work executed in one HPX thread which
15
// enables tuning the performance for the optimal grain size of the computation.
16
// This example is still fully local but demonstrates nice scalability on SMP
17
// machines.
18
//
19
// In this variation of stencil we use the save_checkpoint and
20
// restore_checkpoint functions to back up the state of the application every n
21
// time steps.
22
//
23

24
#include <hpx/hpx.hpp>
25
#include <hpx/hpx_init.hpp>
26

27
#include <hpx/algorithm.hpp>
28
#include <hpx/execution.hpp>
29
#include <hpx/modules/checkpoint.hpp>
30
#include <hpx/modules/iterator_support.hpp>
31
#include <hpx/modules/serialization.hpp>
32

33
#include <cstddef>
34
#include <cstdint>
35
#include <fstream>
36
#include <iostream>
37
#include <memory>
38
#include <utility>
39
#include <vector>
40

41
#include "print_time_results.hpp"
42

43
///////////////////////////////////////////////////////////////////////////////
44
// Command-line variables
45
bool header = true;    // print the csv heading row (cleared by --no-header)
46
double k = 0.5;        // heat transfer coefficient (bound to option --k)
47
double dt = 1.;        // time step (bound to option --dt)
48
double dx = 1.;        // grid spacing (bound to option --dx)
49

50
inline std::size_t idx(std::size_t i, int dir, std::size_t size)
51
{
52
    if (i == 0 && dir == -1)
×
53
        return size - 1;
54
    if (i == size - 1 && dir == +1)
×
55
        return 0;
56

57
    HPX_ASSERT((i + dir) < size);
58

59
    return i + dir;
×
60
}
61

62
///////////////////////////////////////////////////////////////////////////////
63
// Our partition data type
64
struct partition_data
×
65
{
66
private:
67
    using buffer_type = hpx::serialization::serialize_buffer<double>;
68

69
public:
70
    partition_data() = default;
71

72
    explicit partition_data(std::size_t size)
×
73
      : data_(new double[size], size, buffer_type::take)
×
74
      , size_(size)
×
75
    {
76
    }
×
77

78
    partition_data(std::size_t size, double initial_value)
×
79
      : data_(new double[size], size, buffer_type::take)
×
80
      , size_(size)
×
81
    {
82
        double const base_value = initial_value * static_cast<double>(size);
×
83
        for (std::size_t i = 0; i != size; ++i)
×
84
            data_[i] = base_value + static_cast<double>(i);
×
85
    }
×
86

87
    partition_data(partition_data const& old_part)
×
88
      : data_(new double[old_part.size()], old_part.size(), buffer_type::take)
×
89
      , size_(old_part.size())
90
    {
×
91
        for (std::size_t i = 0; i < old_part.size(); i++)
92
        {
×
93
            data_[i] = old_part[i];
94
        }
×
95
    }
96

×
97
    double& operator[](std::size_t idx)
98
    {
99
        return data_[idx];
100
    }
101
    double operator[](std::size_t idx) const
102
    {
103
        return data_[idx];
104
    }
×
105

106
    std::size_t size() const
107
    {
108
        return size_;
109
    }
×
110

111
private:
112
    buffer_type data_;
113
    std::size_t size_ = 0;
114

115
    // Serialization Definitions
116
    friend class hpx::serialization::access;
117
    template <typename Volume>
118
    void serialize(Volume& vol, unsigned int const)
119
    {
×
120
        // clang-format off
121
        vol & data_ & size_;
122
        // clang-format on
×
123
    }
124
};
×
125

126
std::ostream& operator<<(std::ostream& os, partition_data const& c)
127
{
×
128
    os << "{";
129
    for (std::size_t i = 0; i != c.size(); ++i)
×
130
    {
×
131
        if (i != 0)
132
            os << ", ";
×
133
        os << c[i];
×
134
    }
135
    os << "}";
136
    return os;
×
137
}
×
138

139
///////////////////////////////////////////////////////////////////////////////
140
// Checkpoint Function
141

142
struct backup
143
{
144
    std::vector<hpx::util::checkpoint> bin;
145
    std::string file_name_;
146

147
    backup(std::string const& file_name, std::size_t np)
148
      : bin(np)
×
149
      , file_name_(file_name)
×
150
    {
×
151
    }
152
    backup(backup&& old) noexcept
×
153
      : bin(std::move(old.bin))
154
      , file_name_(std::move(old.file_name_))
×
155
    {
×
156
    }
157
    ~backup() = default;
158

×
159
    void save(partition_data const& status, std::size_t index)
160
    {
×
161
        bin[index] = hpx::util::save_checkpoint(hpx::launch::sync, status);
162
    }
×
163

×
164
    void write()
165
    {
×
166
        hpx::util::checkpoint const archive_data =
167
            hpx::util::save_checkpoint(hpx::launch::sync, bin);
168

×
169
        // Make sure file stream is binary for Windows/Mac machines
170
        std::ofstream file_archive(
171
            file_name_, std::ios::binary | std::ios::out);
172
        if (file_archive.is_open())
×
173
        {
×
174
            file_archive << archive_data;
175
        }
×
176
        else
177
        {
178
            std::cout << "Error opening file!" << std::endl;
179
        }
180
    }
181

×
182
    void revive(std::vector<std::vector<hpx::shared_future<partition_data>>>& U,
183
        std::size_t nx)
×
184
    {
185
        hpx::util::checkpoint temp_archive;
186
        // Make sure file stream is binary for Windows/Mac machines
×
187
        std::ifstream ist(file_name_, std::ios::binary | std::ios::in);
188
        ist >> temp_archive;
×
189
        hpx::util::restore_checkpoint(temp_archive, bin);
×
190
        for (std::size_t i = 0; i < U[0].size(); i++)
×
191
        {
×
192
            partition_data temp(nx, static_cast<double>(i));
193
            hpx::util::restore_checkpoint(bin[i], temp);
×
194
            //Check
195
            for (std::size_t e = 0; e < temp.size(); e++)
196
            {
×
197
                std::cout << temp[e] << ", ";
198
            }
×
199
            std::cout << std::endl;
200
            U[0][i] = hpx::make_ready_future(temp);
201
        }
×
202
    }
203
};
×
204

205
void print(
206
    std::vector<std::vector<hpx::shared_future<partition_data>>> const& U)
×
207
{
208
    for (std::size_t out = 0; out < U[0].size(); out++)
209
    {
×
210
        partition_data print_buff(U[0][out].get());
211
        for (std::size_t inner = 0; inner < print_buff.size(); inner++)
×
212
        {
×
213
            std::cout << print_buff[inner] << ", ";
214
            if (inner % 9 == 0 && inner != 0)
×
215
                std::cout << std::endl;
×
216
        }
217
    }
218
    std::cout << std::endl;
219
}
220
void print_space(std::vector<hpx::shared_future<partition_data>> const& next)
×
221
{
×
222
    for (std::size_t out = 0; out < next.size(); out++)
223
    {
×
224
        partition_data print_buff(next[out].get());
225
        for (std::size_t inner = 0; inner < print_buff.size(); inner++)
×
226
        {
×
227
            std::cout << print_buff[inner] << ", ";
228
            if (inner % 9 == 0 && inner != 0)
×
229
                std::cout << std::endl;
×
230
        }
231
    }
232
    std::cout << std::endl;
233
}
234

×
235
///////////////////////////////////////////////////////////////////////////////
236
struct stepper
237
{
238
    // Our data for one time step
239
    using partition = hpx::shared_future<partition_data>;
240
    using space = std::vector<partition>;
241

242
    // Our operator
243
    static double heat(double left, double middle, double right)
244
    {
245
        return middle + (k * dt / (dx * dx)) * (left - 2 * middle + right);
246
    }
×
247

248
    // The partitioned operator, it invokes the heat operator above on all
249
    // elements of a partition.
250
    static partition_data heat_part(partition_data const& left,
251
        partition_data const& middle, partition_data const& right)
×
252
    {
253
        std::size_t const size = middle.size();
254
        partition_data next(size);
255

×
256
        next[0] = heat(left[size - 1], middle[0], middle[1]);
257

×
258
        for (std::size_t i = 1; i != size - 1; ++i)
259
        {
×
260
            next[i] = heat(middle[i - 1], middle[i], middle[i + 1]);
261
        }
×
262

263
        next[size - 1] = heat(middle[size - 2], middle[size - 1], right[0]);
264

×
265
        return next;
266
    }
×
267

268
    // do all the work on 'np' partitions, 'nx' data points each, for 'nt'
269
    // time steps, limit depth of dependency tree to 'nd'
270
    static hpx::future<space> do_work(std::size_t np, std::size_t nx,
271
        std::size_t nt, std::uint64_t nd, std::uint64_t cp, std::string rsf,
×
272
        std::string fn)
273
    {
274
        using hpx::dataflow;
275
        using hpx::unwrapping;
276

277
        // Set up Check-pointing
278
        std::size_t num_c = nt / cp;    // Number of checkpoints to be made
279
        std::cout << "Number of checkpoints to be made: " << num_c << std::endl;
×
280
        std::vector<std::string> v_file_names(num_c, fn);
281
        std::vector<backup> container;
×
282

×
283
        // Initialize checkpoint file names
284
        for (std::size_t i = 0; i < num_c; i++)
285
        {
×
286
            v_file_names[i] =
287
                v_file_names[i] + "_" + std::to_string((i + 1) * cp);
288
            container.push_back(backup(v_file_names[i], np));
×
289
        }
×
290

291
        // Container to wait on all held futures
292
        std::vector<hpx::future<void>> backup_complete;
293

×
294
        // U[t][i] is the state of position i at time t.
295
        std::vector<space> U(2);
296
        for (space& s : U)
×
297
            s.resize(np);
×
298

×
299
        // Initial conditions: f(0, i) = i
300
        auto range = hpx::util::counting_shape(np);
301
        using hpx::execution::par;
302
        hpx::ranges::for_each(par, range, [&U, nx](std::size_t i) {
303
            U[0][i] = hpx::make_ready_future(
×
304
                partition_data(nx, static_cast<double>(i)));
×
305
        });
×
306

×
307
        //Initialize from backup
308
        if (rsf != "")
309
        {
×
310
            backup restart(rsf, np);
311
            restart.revive(U, nx);
×
312
        }
×
313

314
        //Check
315
        std::cout << "Initialization Check" << std::endl;
316
        print(U);
317

×
318
        // limit depth of dependency tree
319
        auto sem = std::make_shared<hpx::sliding_semaphore>(nd);
320

321
        auto Op = unwrapping(&stepper::heat_part);
322

×
323
        // Actual time step loop
324
        for (std::size_t t = 0; t != nt; ++t)
325
        {
×
326
            space const& current = U[t % 2];
327
            space& next = U[(t + 1) % 2];
×
328

×
329
            for (std::size_t i = 0; i != np; ++i)
330
            {
×
331
                next[i] =
332
                    dataflow(hpx::launch::async, Op, current[idx(i, -1, np)],
333
                        current[i], current[idx(i, +1, np)]);
×
334

×
335
                //Checkpoint
336
                if (t % cp == 0 && t != 0)
337
                {
×
338
                    next[i] =
339
                        next[i].then([&container, i, t, cp](partition&& p) {
340
                            partition_data value(p.get());
×
341
                            container[(t / cp) - 1].save(value, i);
×
342
                            partition f_value = hpx::make_ready_future(value);
×
343
                            return f_value;
×
344
                        });
×
345
                }
×
346
            }
347

348
            //Print Checkpoint to file
349
            if (t % cp == 0 && t != 0)
350
            {
×
351
                hpx::future<void> f_print = hpx::when_all(next).then(
352
                    [&container, t, cp](hpx::future<space>&&) {
×
353
                        container[(t / cp) - 1].write();
×
354
                    });
×
355
                backup_complete.push_back(std::move(f_print));
×
356
            }
357

358
            //Check
359
            if (t % cp == 0 && t != 0)
360
            {
×
361
                std::cout << "Checkpoint Check:" << std::endl;
362
                print_space(next);
363
            }
×
364

365
            // every nd time steps, attach additional continuation which will
366
            // trigger the semaphore once computation has reached this point
367
            if ((t % nd) == 0)
368
            {
×
369
                next[0].then([sem, t](partition&&) {
370
                    // inform semaphore about new lower limit
×
371
                    sem->signal(static_cast<std::int64_t>(t));
372
                });
×
373
            }
374

375
            // suspend if the tree has become too deep, the continuation above
376
            // will resume this thread once the computation has caught up
377
            sem->wait(static_cast<std::int64_t>(t));
378
        }
×
379

380
        // Wait on Checkpoint Printing
381
        hpx::wait_all(backup_complete);
382

383
        //Begin Test
384
        //Create a new test vector and resize it
385
        std::vector<space> Z(2);
386
        for (space& y : Z)
×
387
        {
×
388
            y.resize(np);
389
        }
×
390

391
        backup test(v_file_names[0], np);
392
        std::cout << std::endl;
×
393
        std::cout << "Revive Check:" << std::endl;
394
        test.revive(Z, nx);
395
        std::cout << std::endl;
×
396
        //End Test
397

398
        // Return the solution at time-step 'nt'.
399
        return hpx::when_all(U[nt % 2]);
400
    }
×
401
};
×
402

403
///////////////////////////////////////////////////////////////////////////////
404
// HPX entry point: reads the run configuration from 'vm', runs the solver,
// optionally prints the final solution, reports the timing and shuts the
// runtime down. Returns the result of hpx::finalize().
int hpx_main(hpx::program_options::variables_map& vm)
{
    // All numeric options are stored as std::uint64_t.
    auto const number = [&vm](char const* option) {
        return vm[option].as<std::uint64_t>();
    };

    std::uint64_t const np = number("np");    // Number of partitions.
    std::uint64_t const nx = number("nx");    // Number of grid points.
    std::uint64_t const nt = number("nt");    // Number of steps.
    std::uint64_t const nd = number("nd");    // Max depth of dep tree.
    std::uint64_t const cp = number("cp");    // Num. steps to checkpoint
    std::string const rsf = vm["restart-file"].as<std::string>();
    std::string const fn = vm["output-file"].as<std::string>();

    if (vm.count("no-header") != 0)
    {
        header = false;
    }

    // Measure execution time.
    std::uint64_t const start = hpx::chrono::high_resolution_clock::now();

    // Execute nt time steps on nx grid points and wait for every partition
    // of the final solution.
    stepper::space solution =
        stepper::do_work(np, nx, nt, nd, cp, rsf, fn).get();
    hpx::wait_all(solution);

    std::uint64_t const elapsed =
        hpx::chrono::high_resolution_clock::now() - start;

    // Print the final solution
    if (vm.count("results") != 0)
    {
        for (std::size_t part = 0; part != np; ++part)
        {
            std::cout << "U[" << part << "] = " << solution[part].get()
                      << std::endl;
        }
    }

    std::uint64_t const os_thread_count = hpx::get_os_thread_count();
    print_time_results(os_thread_count, elapsed, nx, np, nt, header);

    return hpx::finalize();
}
×
449

×
450
int main(int argc, char* argv[])
451
{
×
452
    using namespace hpx::program_options;
453

454
    // Configure application-specific options.
455
    options_description desc_commandline;
456

×
457
    desc_commandline.add_options()(
458
        "results", "print generated results (default: false)")("nx",
×
459
        value<std::uint64_t>()->default_value(10),
×
460
        "Local x dimension (of each partition)")("nt",
×
461
        value<std::uint64_t>()->default_value(45),
×
462
        "Number of time steps")("nd", value<std::uint64_t>()->default_value(10),
×
463
        "Number of time steps to allow the dependency tree to grow to")("np",
×
464
        value<std::uint64_t>()->default_value(10),
×
465
        "Number of partitions")("k", value<double>(&k)->default_value(0.5),
×
466
        "Heat transfer coefficient (default: 0.5)")("dt",
×
467
        value<double>(&dt)->default_value(1.0),
×
468
        "Timestep unit (default: 1.0[s])")(
×
469
        "dx", value<double>(&dx)->default_value(1.0), "Local x dimension")("cp",
×
470
        value<std::uint64_t>()->default_value(44),
×
471
        "Number of steps to checkpoint")(
×
472
        "no-header", "do not print out the csv header row")("restart-file",
×
473
        value<std::string>()->default_value(""),
×
474
        "Start application from restart file")("output-file",
×
475
        value<std::string>()->default_value("1d.archive"),
×
476
        "Base name of archive file");
×
477

478
    // Initialize and run HPX
479
    hpx::init_params init_args;
480
    init_args.desc_cmdline = desc_commandline;
×
481

×
482
    return hpx::init(argc, argv, init_args);
483
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc