payu-org / payu · build 6489602581 (push, via github, committed by web-flow)
12 Oct 2023 12:25AM UTC · coverage: 45.573% (+2.8%) from 42.772%

Merge pull request #363 from jo-basevi/358-date-based-frequency

Add support for date-based restart frequency

111 of 147 new or added lines in 10 files covered. (75.51%)
2 existing lines in 1 file now uncovered.
1580 of 3467 relevant lines covered (45.57%)
1.37 hits per line

Source File: /payu/experiment.py (43.92% covered)
"""payu.experiment
   ===============

   Interface to an individual experiment managed by payu

   :copyright: Copyright 2011 Marshall Ward, see AUTHORS for details.
   :license: Apache License, Version 2.0, see LICENSE for details.
"""

from __future__ import print_function

# Standard Library
import datetime
import errno
import getpass
import os
import re
import resource
import sys
import shlex
import shutil
import subprocess as sp
import sysconfig

# Extensions
import yaml

# Local
from payu import envmod
from payu.fsops import mkdir_p, make_symlink, read_config, movetree
from payu.schedulers.pbs import get_job_info, pbs_env_init, get_job_id
from payu.models import index as model_index
import payu.profilers
from payu.runlog import Runlog
from payu.manifest import Manifest
from payu.calendar import parse_date_offset

# Environment module support on vayu
# TODO: To be removed
core_modules = ['python', 'payu']

# Default payu parameters
default_archive_url = 'dc.nci.org.au'
default_restart_freq = 5

class Experiment(object):

    def __init__(self, lab, reproduce=False, force=False):
        self.lab = lab

        if not force:
            # check environment for force flag under PBS
            self.force = os.environ.get('PAYU_FORCE', False)
        else:
            self.force = force

        self.start_time = datetime.datetime.now()

        # TODO: replace with dict, check versions via key-value pairs
        self.modules = set()

        # TODO: __init__ should not be a config dumping ground!
        self.config = read_config()

        # Payu experiment type
        self.debug = self.config.get('debug', False)
        self.postscript = self.config.get('postscript')
        self.repeat_run = self.config.get('repeat', False)

        # Configuration
        self.expand_shell_vars = True   # TODO: configurable

        # Model run time
        self.runtime = None
        if ('calendar' in self.config and
                'runtime' in self.config['calendar']):
            self.runtime = self.config['calendar']['runtime']

        # Stacksize
        # NOTE: Possible PBS issue in setting non-unlimited stacksizes
        stacksize = self.config.get('stacksize', 'unlimited')
        self.set_stacksize(stacksize)

        # Initialize the submodels
        self.init_models()

        # TODO: Move to run/collate/sweep?
        self.set_expt_pathnames()
        self.set_counters()

        for model in self.models:
            model.set_input_paths()

        self.set_output_paths()

        if not reproduce:
            # check environment for reproduce flag under PBS
            reproduce = os.environ.get('PAYU_REPRODUCE', False)

        # Initialize manifest
        self.manifest = Manifest(self.config.get('manifest', {}),
                                 reproduce=reproduce)

        # Miscellaneous configurations
        # TODO: Move this stuff somewhere else
        self.userscripts = self.config.get('userscripts', {})

        self.profilers = []

        init_script = self.userscripts.get('init')
        if init_script:
            self.run_userscript(init_script)

        self.runlog = Runlog(self)

        # XXX: Temporary spot for the payu path
        #      This is horrible; payu/cli.py does this much more safely!
        #      But also does not even store it in os.environ!
        default_payu_bin = os.path.dirname(sys.argv[0])
        payu_bin = os.environ.get('PAYU_PATH', default_payu_bin)

        self.payu_path = os.path.join(payu_bin, 'payu')

        self.run_id = None

    def init_models(self):

        self.model_name = self.config.get('model')
        assert self.model_name

        model_fields = ['model', 'exe', 'input', 'ncpus', 'npernode', 'build',
                        'mpthreads', 'exe_prefix']

        # XXX: Temporarily adding this to model config...
        model_fields += ['mask']

        # TODO: Rename this to self.submodels
        self.models = []

        submodels = self.config.get('submodels', [])

        submodel_config = dict((f, self.config[f]) for f in model_fields
                               if f in self.config)
        submodel_config['name'] = self.model_name

        submodels.append(submodel_config)

        for m_config in submodels:
            ModelType = model_index[m_config['model']]
            self.models.append(ModelType(self, m_config['name'], m_config))

        # Load the top-level model
        if self.model_name:
            ModelType = model_index[self.model_name]
            model_config = dict((f, self.config[f]) for f in model_fields
                                if f in self.config)
            self.model = ModelType(self, self.model_name, model_config)
            self.model.top_level_model = True
        else:
            self.model = None

    def set_counters(self):
        # Assume that ``set_paths`` has already been called
        assert self.archive_path

        current_counter = os.environ.get('PAYU_CURRENT_RUN')
        if current_counter:
            self.counter = int(current_counter)
        else:
            self.counter = None

        self.n_runs = int(os.environ.get('PAYU_N_RUNS', 1))

        # Initialize counter if unset
        if self.counter is None:
            # Check for restart index
            max_restart_index = self.max_output_index(output_type="restart")
            if max_restart_index is not None:
                self.counter = 1 + max_restart_index
            else:
                # Now look for output directories,
                # as repeat runs do not generate restart files.
                max_output_index = self.max_output_index()
                if max_output_index is not None:
                    self.counter = 1 + max_output_index
                else:
                    self.counter = 0

    def max_output_index(self, output_type="output"):
        """Given an output directory type (output or restart),
        return the maximum index of output directories found"""
        try:
            output_dirs = self.list_output_dirs(output_type)
        except EnvironmentError as exc:
            if exc.errno == errno.ENOENT:
                output_dirs = None
            else:
                raise

        if output_dirs and len(output_dirs):
            return int(output_dirs[-1].lstrip(output_type))

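    # Archive directories carry a zero-padded index of at least three
    # digits, e.g. 'output000' or 'restart042'; anything else in the
    # archive directory is ignored by the pattern below.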
    def list_output_dirs(self, output_type="output"):
        """Return a sorted list of restart or output directories in archive"""
        naming_pattern = re.compile(fr"^{output_type}[0-9][0-9][0-9]+$")
        dirs = [d for d in os.listdir(self.archive_path)
                if naming_pattern.match(d)]
        dirs.sort(key=lambda d: int(d.lstrip(output_type)))
        return dirs

    def set_stacksize(self, stacksize):

        if stacksize == 'unlimited':
            stacksize = resource.RLIM_INFINITY
        else:
            assert type(stacksize) is int

        resource.setrlimit(resource.RLIMIT_STACK,
                           (stacksize, resource.RLIM_INFINITY))

    def load_modules(self):
        # NOTE: This function is increasingly irrelevant, and may be removable.

        # Scheduler
        sched_modname = self.config.get('scheduler', 'pbs')
        self.modules.add(sched_modname)

        # MPI library
        mpi_config = self.config.get('mpi', {})

        # Assign MPI module paths
        mpi_modpath = mpi_config.get('modulepath', None)
        if mpi_modpath:
            envmod.module('use', mpi_modpath)

        mpi_modname = mpi_config.get('module', 'openmpi')
        self.modules.add(mpi_modname)

        # Unload non-essential modules
        loaded_mods = os.environ.get('LOADEDMODULES', '').split(':')

        for mod in loaded_mods:
            if len(mod) > 0:
                print('mod ' + mod)
                mod_base = mod.split('/')[0]
                if mod_base not in core_modules:
                    envmod.module('unload', mod)

        # Now load model-dependent modules
        for mod in self.modules:
            envmod.module('load', mod)

        # User-defined modules
        user_modules = self.config.get('modules', {}).get('load', [])
        for mod in user_modules:
            envmod.module('load', mod)

        envmod.module('list')

        for prof in self.profilers:
            prof.load_modules()

        # TODO: Consolidate this profiling stuff
        c_ipm = self.config.get('ipm', False)
        if c_ipm:
            if isinstance(c_ipm, str):
                ipm_mod = os.path.join('ipm', c_ipm)
            else:
                ipm_mod = 'ipm/2.0.2'

            envmod.module('load', ipm_mod)
            os.environ['IPM_LOGDIR'] = self.work_path

        if self.config.get('mpiP', False):
            envmod.module('load', 'mpiP')

        if self.config.get('hpctoolkit', False):
            envmod.module('load', 'hpctoolkit')

        if self.debug:
            envmod.module('load', 'totalview')

    def set_expt_pathnames(self):

        # Local "control" path default used to be applied here,
        # but now done in read_config
        self.control_path = self.config.get('control_path')

        # Experiment name
        self.name = self.config.get('experiment',
                                    os.path.basename(self.control_path))

        # Experiment subdirectories
        self.archive_path = os.path.join(self.lab.archive_path, self.name)
        self.work_path = os.path.join(self.lab.work_path, self.name)

        # Symbolic link paths to output
        self.work_sym_path = os.path.join(self.control_path, 'work')
        self.archive_sym_path = os.path.join(self.control_path, 'archive')

        for model in self.models:
            model.set_model_pathnames()
            model.set_local_pathnames()

        # Stream output filenames
        # TODO: per-model output streams?
        self.stdout_fname = self.lab.model_type + '.out'
        self.stderr_fname = self.lab.model_type + '.err'

        self.job_fname = 'job.yaml'
        self.env_fname = 'env.yaml'

        self.output_fnames = (self.stderr_fname,
                              self.stdout_fname,
                              self.job_fname,
                              self.env_fname)

    def set_output_paths(self):

        # Local archive paths

        # Check to see if we've provided a hard coded path -- valid for collate
        dir_path = os.environ.get('PAYU_DIR_PATH')
        if dir_path is not None:
            self.output_path = os.path.normpath(dir_path)
        else:
            output_dir = 'output{0:03}'.format(self.counter)
            self.output_path = os.path.join(self.archive_path, output_dir)

        # TODO: check case counter == 0
        prior_output_dir = 'output{0:03}'.format(self.counter - 1)
        prior_output_path = os.path.join(self.archive_path, prior_output_dir)
        if os.path.exists(prior_output_path):
            self.prior_output_path = prior_output_path
        else:
            self.prior_output_path = None

        # Local restart paths
        restart_dir = 'restart{0:03}'.format(self.counter)
        self.restart_path = os.path.join(self.archive_path, restart_dir)

        # Prior restart path

        # Check if a user restart directory is available
        user_restart_dir = self.config.get('restart')
        if (self.counter == 0 or self.repeat_run) and user_restart_dir:
            # TODO: Some user friendliness needed...
            assert (os.path.isdir(user_restart_dir))
            self.prior_restart_path = user_restart_dir
        else:
            prior_restart_dir = 'restart{0:03}'.format(self.counter - 1)
            prior_restart_path = os.path.join(self.archive_path,
                                              prior_restart_dir)
            if os.path.exists(prior_restart_path) and not self.repeat_run:
                self.prior_restart_path = prior_restart_path
            else:
                self.prior_restart_path = None
                if self.counter > 0 and not self.repeat_run:
                    # TODO: This warning should be replaced with an abort in
                    #       setup
                    print('payu: warning: No restart files found.')

        for model in self.models:
            model.set_model_output_paths()

    def build_model(self):

        self.load_modules()

        for model in self.models:
            model.get_codebase()

        for model in self.models:
            model.build_model()

    def setup(self, force_archive=False):

        # Confirm that no output path already exists
        if os.path.exists(self.output_path):
            sys.exit('payu: error: Output path already exists: '
                     '{path}.'.format(path=self.output_path))

        # Confirm that no work path already exists
        if os.path.exists(self.work_path):
            if self.force:
                print('payu: work path already exists.\n'
                      '      Sweeping as --force option is True.')
                self.sweep()
            else:
                sys.exit('payu: error: work path already exists: {path}.\n'
                         '             payu sweep and then payu run'
                         .format(path=self.work_path))

        mkdir_p(self.work_path)

        if force_archive:
            mkdir_p(self.archive_path)
            make_symlink(self.archive_path, self.archive_sym_path)

        # Archive the payu config
        # TODO: This just copies the existing config.yaml file, but we should
        #       reconstruct a new file including default values
        config_src = os.path.join(self.control_path, 'config.yaml')
        config_dst = os.path.join(self.work_path)
        shutil.copy(config_src, config_dst)

        # Stripe directory in Lustre
        # TODO: Make this more configurable
        do_stripe = self.config.get('stripedio', False)
        if do_stripe:
            cmd = 'lfs setstripe -c 8 -s 8m {0}'.format(self.work_path)
            sp.check_call(shlex.split(cmd))

        make_symlink(self.work_path, self.work_sym_path)

        # Set up all file manifests
        self.manifest.setup()

        for model in self.models:
            model.setup()

        # Call the macro-model setup
        if len(self.models) > 1:
            self.model.setup()

        self.manifest.check_manifests()

        # Copy manifests to work directory so they are archived on completion
        manifest_path = os.path.join(self.work_path, 'manifests')
        self.manifest.copy_manifests(manifest_path)

        setup_script = self.userscripts.get('setup')
        if setup_script:
            self.run_userscript(setup_script)

        # Profiler setup
        expt_profs = self.config.get('profilers', [])
        if not isinstance(expt_profs, list):
            expt_profs = [expt_profs]

        for prof_name in expt_profs:
            ProfType = payu.profilers.index[prof_name]
            prof = ProfType(self)
            self.profilers.append(prof)

            # Testing
            prof.setup()

        # Check restart pruning for valid configuration values and
        # warn the user if more restarts than expected would be pruned
        if self.config.get('archive', True):
            self.get_restarts_to_prune()

    def run(self, *user_flags):

        # XXX: This was previously done in reversion
        envmod.setup()

        # Add any user-defined module dir(s) to MODULEPATH
        for module_dir in self.config.get('modules', {}).get('use', []):
            envmod.module('use', module_dir)

        self.load_modules()

        f_out = open(self.stdout_fname, 'w')
        f_err = open(self.stderr_fname, 'w')

        # Set MPI environment variables
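        # For example, an 'env' mapping in config.yaml (hypothetical values)
        # is exported verbatim to the run environment:
        #   env:
        #     OMP_NUM_THREADS: 1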
        env = self.config.get('env')

        # Explicitly check for `None`, in case of an empty `env:` entry
        if env is None:
            env = {}

        for var in env:

            if env[var] is None:
                env_value = ''
            else:
                env_value = str(env[var])

            os.environ[var] = env_value

        mpi_config = self.config.get('mpi', {})
        mpi_runcmd = mpi_config.get('runcmd', 'mpirun')

        if self.config.get('scalasca', False):
            mpi_runcmd = ' '.join(['scalasca -analyze', mpi_runcmd])

        # MPI runtime flags
        mpi_flags = mpi_config.get('flags', [])
        if not mpi_flags:
            mpi_flags = self.config.get('mpirun', [])
            # TODO: Legacy config removal warning

        if not isinstance(mpi_flags, list):
            mpi_flags = [mpi_flags]

        # TODO: More uniform support needed here
        if self.config.get('scalasca', False):
            mpi_flags = ['\"{0}\"'.format(f) for f in mpi_flags]

        # XXX: I think this may be broken
        if user_flags:
            mpi_flags.extend(list(user_flags))

        if self.debug:
            mpi_flags.append('--debug')

        mpi_progs = []
        for model in self.models:

            # Skip models without executables (e.g. couplers)
            if not model.exec_path_local:
                continue

            mpi_config = self.config.get('mpi', {})
            mpi_module = mpi_config.get('module', None)

            # Update MPI library module (if not explicitly set)
            # TODO: Check for MPI library mismatch across multiple binaries
            if mpi_module is None and model.required_libs is not None:
                envmod.lib_update(
                    model.required_libs,
                    'libmpi.so'
                )

            model_prog = []

            wdir_arg = '-wdir'
            if self.config.get('scheduler') == 'slurm':
                # Option to set the working directory differs in slurm
                wdir_arg = '--chdir'
            model_prog.append(f'{wdir_arg} {model.work_path}')

            # Append any model-specific MPI flags
            model_flags = model.config.get('mpiflags', [])
            if not isinstance(model_flags, list):
                model_prog.append(model_flags)
            else:
                model_prog.extend(model_flags)

            model_ncpus = model.config.get('ncpus')
            if model_ncpus:
                if self.config.get('scheduler') == 'slurm':
                    model_prog.append('-n {0}'.format(model_ncpus))
                else:
                    # Default to preferred mpirun syntax
                    model_prog.append('-np {0}'.format(model_ncpus))

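            # Even per-node rank counts are split across two sockets, e.g.
            # npernode: 8 becomes '-map-by ppr:4:socket' (assuming
            # dual-socket nodes); odd counts are mapped by node instead.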
            model_npernode = model.config.get('npernode')
            # TODO: New Open MPI format?
            if model_npernode:
                if model_npernode % 2 == 0:
                    npernode_flag = ('-map-by ppr:{0}:socket'
                                     ''.format(model_npernode // 2))
                else:
                    npernode_flag = ('-map-by ppr:{0}:node'
                                     ''.format(model_npernode))

                if self.config.get('scalasca', False):
                    npernode_flag = '\"{0}\"'.format(npernode_flag)
                model_prog.append(npernode_flag)

            if self.config.get('hpctoolkit', False):
                os.environ['HPCRUN_EVENT_LIST'] = 'WALLCLOCK@5000'
                model_prog.append('hpcrun')

            for prof in self.profilers:
                if prof.runscript:
                    model_prog.append(prof.runscript)

            model_prog.append(model.exec_prefix)

            # Use the full path to symlinked exec_name in work as some
            # older MPI libraries complained executable was not in PATH
            model_prog.append(os.path.join(model.work_path, model.exec_name))

            mpi_progs.append(' '.join(model_prog))

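        # The result is an MPMD-style command with one program block per
        # model, e.g. (illustrative):
        #   mpirun <flags> -wdir <work>/ocean -np 240 <work>/ocean/exe : ...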
        cmd = '{runcmd} {flags} {exes}'.format(
            runcmd=mpi_runcmd,
            flags=' '.join(mpi_flags),
            exes=' : '.join(mpi_progs)
        )

        for prof in self.profilers:
            cmd = prof.wrapper(cmd)

        # Expand shell variables inside flags
        if self.expand_shell_vars:
            cmd = os.path.expandvars(cmd)

        # TODO: Consider making this default
        if self.config.get('coredump', False):
            enable_core_dump()

        # Dump out environment
        with open(self.env_fname, 'w') as file:
            file.write(yaml.dump(dict(os.environ), default_flow_style=False))

        self.runlog.create_manifest()
        if self.runlog.enabled:
            self.runlog.commit()

        # NOTE: This may not be necessary, since env seems to be getting
        # correctly updated.  Need to look into this.
        print(cmd)
        if env:
            # TODO: Replace with mpirun -x flag inputs
            proc = sp.Popen(shlex.split(cmd), stdout=f_out, stderr=f_err,
                            env=os.environ.copy())
            proc.wait()
            rc = proc.returncode
        else:
            rc = sp.call(shlex.split(cmd), stdout=f_out, stderr=f_err)

        f_out.close()
        f_err.close()

        self.finish_time = datetime.datetime.now()

        info = get_job_info()

        if info is None:
            # Not being run under PBS, reverse engineer environment
            info = {
                'PAYU_PATH': os.path.dirname(self.payu_path)
            }

        # Add extra information to save to jobinfo
        info.update(
            {
                'PAYU_CONTROL_DIR': self.control_path,
                'PAYU_RUN_ID': self.run_id,
                'PAYU_CURRENT_RUN': self.counter,
                'PAYU_N_RUNS': self.n_runs,
                'PAYU_JOB_STATUS': rc,
                'PAYU_START_TIME': self.start_time.isoformat(),
                'PAYU_FINISH_TIME': self.finish_time.isoformat(),
                'PAYU_WALLTIME': "{0} s".format(
                    (self.finish_time - self.start_time).total_seconds()
                ),
            }
        )

        # Dump job info
        with open(self.job_fname, 'w') as file:
            file.write(yaml.dump(info, default_flow_style=False))

        # Remove any empty output files (e.g. logs)
        for fname in os.listdir(self.work_path):
            fpath = os.path.join(self.work_path, fname)
            if os.path.getsize(fpath) == 0:
                os.remove(fpath)

        # Clean up any profiling output
        # TODO: Move after `rc` code check?
        for prof in self.profilers:
            prof.postprocess()

        # TODO: Need a model-specific cleanup method call here
        # NOTE: This does not appear to catch hanging jobs killed by PBS
        if rc != 0:
            # Backup logs for failed runs
            error_log_dir = os.path.join(self.archive_path, 'error_logs')
            mkdir_p(error_log_dir)

            # NOTE: This is PBS-specific
            job_id = get_job_id(short=False)

            if job_id == '':
                job_id = str(self.run_id)[:6]

            for fname in self.output_fnames:

                src = os.path.join(self.control_path, fname)

                stem, suffix = os.path.splitext(fname)
                dest = os.path.join(error_log_dir,
                                    ".".join((stem, job_id)) + suffix)

                print(src, dest)

                shutil.copyfile(src, dest)

            # Create the symlink to the logs if it does not exist
            make_symlink(self.archive_path, self.archive_sym_path)

            error_script = self.userscripts.get('error')
            if error_script:
                self.run_userscript(error_script)

            # Terminate payu
            sys.exit('payu: Model exited with error code {0}; aborting.'
                     ''.format(rc))

        # Decrement run counter on successful run
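        # An empty 'stop_run' file in the control directory acts as a manual
        # brake: it is removed and any further resubmission is cancelled.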
        stop_file_path = os.path.join(self.control_path, 'stop_run')
        if os.path.isfile(stop_file_path):
            assert os.stat(stop_file_path).st_size == 0
            os.remove(stop_file_path)
            print('payu: Stop file detected; terminating resubmission.')
            self.n_runs = 0
        else:
            self.n_runs -= 1

        # Move logs to archive (or delete if empty)
        for f in self.output_fnames:
            f_path = os.path.join(self.control_path, f)
            if os.path.getsize(f_path) == 0:
                os.remove(f_path)
            else:
                shutil.move(f_path, self.work_path)

        run_script = self.userscripts.get('run')
        if run_script:
            self.run_userscript(run_script)

    def archive(self, force_prune_restarts=False):
        if not self.config.get('archive', True):
            print('payu: not archiving due to config.yaml setting.')
            return

        # Check there is a work directory, otherwise bail
        if not os.path.exists(self.work_sym_path):
            sys.exit('payu: error: No work directory to archive.')

        mkdir_p(self.archive_path)
        make_symlink(self.archive_path, self.archive_sym_path)

        # Remove work symlink
        if os.path.islink(self.work_sym_path):
            os.remove(self.work_sym_path)

        mkdir_p(self.restart_path)

        for model in self.models:
            model.archive()

        # Postprocess the model suite
        if len(self.models) > 1:
            self.model.archive()

        # Double-check that the run path does not exist
        if os.path.exists(self.output_path):
            sys.exit('payu: error: Output path already exists.')

        movetree(self.work_path, self.output_path)

        # Remove any outdated restart files
        try:
            restarts_to_prune = self.get_restarts_to_prune(
                force=force_prune_restarts)
        except Exception as e:
            print(e)
            print("payu: error: Skipping pruning restarts")
            restarts_to_prune = []

        for restart in restarts_to_prune:
            restart_path = os.path.join(self.archive_path, restart)
            # Only delete real directories; ignore symbolic restart links
            if (os.path.isdir(restart_path) and
                    not os.path.islink(restart_path)):
                shutil.rmtree(restart_path)

        # Ensure dynamic library support for subsequent python calls
        ld_libpaths = os.environ.get('LD_LIBRARY_PATH', None)
        py_libpath = sysconfig.get_config_var('LIBDIR')
        if ld_libpaths is None:
            os.environ['LD_LIBRARY_PATH'] = py_libpath
        elif py_libpath not in ld_libpaths.split(':'):
            os.environ['LD_LIBRARY_PATH'] = f'{py_libpath}:{ld_libpaths}'

        collate_config = self.config.get('collate', {})
        collating = collate_config.get('enable', True)
        if collating:
            cmd = '{python} {payu} collate -i {expt}'.format(
                python=sys.executable,
                payu=self.payu_path,
                expt=self.counter
            )
            sp.check_call(shlex.split(cmd))

        if self.config.get('hpctoolkit', False):
            cmd = '{python} {payu} profile -i {expt}'.format(
                python=sys.executable,
                payu=self.payu_path,
                expt=self.counter
            )
            sp.check_call(shlex.split(cmd))

        archive_script = self.userscripts.get('archive')
        if archive_script:
            self.run_userscript(archive_script)

        # Ensure postprocess runs if model not collating
        if not collating and self.postscript:
            self.postprocess()

    def collate(self):
        for model in self.models:
            model.collate()

    def profile(self):
        for model in self.models:
            model.profile()

    def postprocess(self):
        """Submit a postprocessing script after collation"""
        assert self.postscript
        envmod.setup()
        envmod.module('load', 'pbs')

        cmd = 'qsub {script}'.format(script=self.postscript)

        cmd = shlex.split(cmd)
        rc = sp.call(cmd)
        assert rc == 0, 'Postprocessing script submission failed.'

    def remote_archive(self, config_name, archive_url=None,
                       max_rsync_attempts=1, rsync_protocol=None):

        if not archive_url:
            archive_url = default_archive_url

        archive_address = '{usr}@{url}'.format(usr=getpass.getuser(),
                                               url=archive_url)

        ssh_key_path = os.path.join(os.getenv('HOME'), '.ssh',
                                    'id_rsa_file_transfer')

        # Top-level path is implicitly set by the SSH key
        # (Usually /projects/[group])

        # Remote mkdir is currently not possible, so any new subdirectories
        # must be created before auto-archival

        remote_path = os.path.join(self.model_name, config_name, self.name)
        remote_url = '{addr}:{path}'.format(addr=archive_address,
                                            path=remote_path)

        # Rsync output and restart files
        rsync_cmd = ('rsync -a --safe-links -e "ssh -i {key}" '
                     ''.format(key=ssh_key_path))

        if rsync_protocol:
            rsync_cmd += '--protocol={0} '.format(rsync_protocol)

        run_cmd = rsync_cmd + '{src} {dst}'.format(src=self.output_path,
                                                   dst=remote_url)
        rsync_calls = [run_cmd]

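        # Restart directories are bundled and uploaded only on every fifth
        # run; other runs sync the model output alone.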
        if (self.counter % 5) == 0 and os.path.isdir(self.restart_path):
            # Tar restart files before rsyncing
            restart_tar_path = self.restart_path + '.tar.gz'

            cmd = ('tar -C {0} -czf {1} {2}'
                   ''.format(self.archive_path, restart_tar_path,
                             os.path.basename(self.restart_path)))
            sp.check_call(shlex.split(cmd))

            restart_cmd = ('{0} {1} {2}'
                           ''.format(rsync_cmd, restart_tar_path, remote_url))
            rsync_calls.append(restart_cmd)
        else:
            restart_tar_path = None

        for model in self.models:
            for input_path in model.input_paths:
                # Using explicit path separators to rename the input directory
                input_cmd = rsync_cmd + '{0} {1}'.format(
                    input_path + os.path.sep,
                    os.path.join(remote_url, 'input') + os.path.sep)
                rsync_calls.append(input_cmd)

        for cmd in rsync_calls:
            cmd = shlex.split(cmd)

            for rsync_attempt in range(max_rsync_attempts):
                rc = sp.Popen(cmd).wait()
                if rc == 0:
                    break
                else:
                    print('rsync failed, reattempting')
            assert rc == 0

        # TODO: Temporary; this should be integrated with the rsync call
        if restart_tar_path and os.path.exists(restart_tar_path):
            os.remove(restart_tar_path)

    def resubmit(self):
        next_run = self.counter + 1
        cmd = '{python} {payu} run -i {start} -n {n}'.format(
            python=sys.executable,
            payu=self.payu_path,
            start=next_run,
            n=self.n_runs
        )
        cmd = shlex.split(cmd)
        sp.call(cmd)

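    # User scripts are configured under 'userscripts' in config.yaml, keyed
    # by hook name (init, setup, run, error, archive), e.g. (hypothetical
    # values):
    #   userscripts:
    #     setup: 'ln -s ../forcing inputs'
    #     run: './scripts/postrun.sh'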
    def run_userscript(self, script_cmd):
        # First try to interpret the argument as a full command:
        try:
            sp.check_call(shlex.split(script_cmd))
        except EnvironmentError as exc:
            # Now try to run the script explicitly
            if exc.errno == errno.ENOENT:
                cmd = os.path.join(self.control_path, script_cmd)
                # Simplistic recursion check
                assert os.path.isfile(cmd)
                self.run_userscript(cmd)

            # If we get a "non-executable" error, then guess the type
            elif exc.errno == errno.EACCES:
                # TODO: Move outside
                ext_cmd = {'.py': sys.executable,
                           '.sh': '/bin/bash',
                           '.csh': '/bin/tcsh'}

                _, f_ext = os.path.splitext(script_cmd)
                shell_name = ext_cmd.get(f_ext)
                if shell_name:
                    print('payu: warning: Assuming that {0} is a {1} script '
                          'based on the filename extension.'
                          ''.format(os.path.basename(script_cmd),
                                    os.path.basename(shell_name)))
                    cmd = ' '.join([shell_name, script_cmd])
                    self.run_userscript(cmd)
                else:
                    # If we can't guess the shell, then abort
                    raise
        except sp.CalledProcessError as exc:
            # If the script runs but the output is bad, then warn the user
            print('payu: warning: user script \'{0}\' failed (error {1}).'
                  ''.format(script_cmd, exc.returncode))

    def sweep(self, hard_sweep=False):
        # TODO: Fix the IO race conditions!

        # TODO: model outstreams and pbs logs need to be handled separately
        default_job_name = os.path.basename(os.getcwd())
        short_job_name = str(self.config.get('jobname', default_job_name))[:15]

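        # PBS writes job logs as <jobname>.o<jobid> and <jobname>.e<jobid>;
        # the '_c' and '_p' patterns below additionally match logs from the
        # collate and postprocessing jobs (jobnames are truncated to PBS's
        # 15-character limit).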
        logs = [
            f for f in os.listdir(os.curdir) if os.path.isfile(f) and (
                f.startswith(short_job_name + '.o') or
                f.startswith(short_job_name + '.e') or
                f.startswith(short_job_name[:13] + '_c.o') or
                f.startswith(short_job_name[:13] + '_c.e') or
                f.startswith(short_job_name[:13] + '_p.o') or
                f.startswith(short_job_name[:13] + '_p.e')
            )
        ]

        pbs_log_path = os.path.join(self.archive_path, 'pbs_logs')
        legacy_pbs_log_path = os.path.join(self.control_path, 'pbs_logs')

        if os.path.isdir(legacy_pbs_log_path):
            # TODO: New path may still exist!
            assert not os.path.isdir(pbs_log_path)
            print('payu: Moving pbs_logs to {0}'.format(pbs_log_path))
            shutil.move(legacy_pbs_log_path, pbs_log_path)
        else:
            mkdir_p(pbs_log_path)

        for f in logs:
            print('Moving log {0}'.format(f))
            shutil.move(f, os.path.join(pbs_log_path, f))

        if hard_sweep:
            if os.path.isdir(self.archive_path):
                print('Removing archive path {0}'.format(self.archive_path))
                cmd = 'rm -rf {0}'.format(self.archive_path)
                cmd = shlex.split(cmd)
                rc = sp.call(cmd)
                assert rc == 0

            if os.path.islink(self.archive_sym_path):
                print('Removing symlink {0}'.format(self.archive_sym_path))
                os.remove(self.archive_sym_path)

        # Remove stdout/err and yaml dumps
        for f in self.output_fnames:
            if os.path.isfile(f):
                os.remove(f)

        if os.path.isdir(self.work_path):
            print('Removing work path {0}'.format(self.work_path))
            cmd = 'rm -rf {0}'.format(self.work_path)
            cmd = shlex.split(cmd)
            rc = sp.call(cmd)
            assert rc == 0

        if os.path.islink(self.work_sym_path):
            print('Removing symlink {0}'.format(self.work_sym_path))
            os.remove(self.work_sym_path)

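    # 'restart_freq' accepts an integer (keep every Nth restart index) or a
    # date-offset string handled by payu.calendar.parse_date_offset, e.g.
    # (hypothetical values) 'restart_freq: 5' or 'restart_freq: 1YS'.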
    def get_restarts_to_prune(self,
                              ignore_intermediate_restarts=False,
                              force=False):
        """Returns a list of restart directories that can be pruned"""
        # Check if archive path exists
        if not os.path.exists(self.archive_path):
            return []

        # List all restart directories in archive
        restarts = self.list_output_dirs(output_type='restart')

        # TODO: Previous logic was to prune all restarts if self.repeat_run
        # Still need to figure out what should happen in this case
        if self.repeat_run:
            return [os.path.join(self.archive_path, restart)
                    for restart in restarts]

        # Use restart_freq to decide what restarts to prune
        restarts_to_prune = []
        intermediate_restarts, previous_intermediate_restarts = [], []
        restart_freq = self.config.get('restart_freq', default_restart_freq)
        if isinstance(restart_freq, int):
            # Using integer frequency to prune restarts
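            # e.g. with restart_freq = 5, 'restart001'..'restart004' are
            # held as intermediates and pruned once 'restart005' appears;
            # indices divisible by 5 are kept permanently.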
            for restart in restarts:
                restart_idx = int(restart.lstrip('restart'))
                if not restart_idx % restart_freq == 0:
                    intermediate_restarts.append(restart)
                else:
                    # Add any intermediate restarts to restarts to prune
                    restarts_to_prune.extend(intermediate_restarts)
                    previous_intermediate_restarts = intermediate_restarts
                    intermediate_restarts = []
        else:
            # Using date-based frequency to prune restarts
            try:
                date_offset = parse_date_offset(restart_freq)
            except ValueError:
                print('payu: error: Invalid configuration for restart_freq:',
                      restart_freq)
                raise

            next_dt = None
            for restart in restarts:
                # Use model-driver to parse restart directory for a datetime
                restart_path = os.path.join(self.archive_path, restart)
                try:
                    restart_dt = self.model.get_restart_datetime(restart_path)
                except NotImplementedError:
                    print('payu: error: Date-based restart pruning is not '
                          f'implemented for the {self.model.model_type} '
                          'model. To use integer-based restart pruning, '
                          'set restart_freq to an integer value.')
                    raise
                except Exception:
                    print('payu: error: Error parsing restart directory '
                          f'{restart} for a datetime to prune restarts.')
                    raise

                if (next_dt is not None and restart_dt < next_dt):
                    intermediate_restarts.append(restart)
                else:
                    # Keep the earliest datetime and use last kept datetime
                    # as point of reference when adding the next time interval
                    next_dt = date_offset.add_to_datetime(restart_dt)

                    # Add intermediate restarts to restarts to prune
                    restarts_to_prune.extend(intermediate_restarts)
                    previous_intermediate_restarts = intermediate_restarts
                    intermediate_restarts = []

        if ignore_intermediate_restarts:
            # Return all restarts that'll eventually be pruned
            restarts_to_prune.extend(intermediate_restarts)
            return restarts_to_prune

        if not force:
            # check environment for --force-prune-restarts flag
            force = os.environ.get('PAYU_FORCE_PRUNE_RESTARTS', False)

        # Flag to check whether more restarts than expected will be deleted
        is_unexpected = restarts_to_prune != previous_intermediate_restarts

        # Restart_history override
        restart_history = self.config.get('restart_history', None)
        if restart_history is not None:
            if not isinstance(restart_history, int):
                raise ValueError("payu: error: restart_history is not an "
                                 f"integer value: {restart_history}")

            # Keep restart_history latest restarts, in addition to the
            # permanently saved restarts defined by restart_freq
            restarts_to_prune.extend(intermediate_restarts)
            max_index = self.max_output_index(output_type="restart")
            index_bound = max_index - restart_history
            restarts_to_prune = [res for res in restarts_to_prune
                                 if int(res.lstrip('restart')) <= index_bound]

            # Only expect at most 1 restart to be pruned with restart_history
            is_unexpected = len(restarts_to_prune) > 1

        # Log a warning if more restarts than expected will be deleted
        if not force and is_unexpected:
            config_info = f'restart pruning frequency of {restart_freq}'
            if restart_history:
                config_info += f' and restart history of {restart_history}'

            print(f'payu: warning: Current {config_info} would result in '
                  'the following restarts being pruned: '
                  f'{" ".join(restarts_to_prune)}\n'
                  'If this is expected, use the --force-prune-restarts flag '
                  'at the next run or archive (if running archive manually), '
                  'e.g.:\n'
                  '     payu run --force-prune-restarts, or\n'
                  '     payu archive --force-prune-restarts\n'
                  'Otherwise, no restarts will be pruned')

            # Return empty list to prevent restarts being pruned
            restarts_to_prune = []

        return restarts_to_prune


def enable_core_dump():
    # Newer Intel compilers support 'FOR_DUMP_CORE_FILE' while most support
    # 'decfort_dump_flag'.  Setting both for now, but there may be a more
    # platform-independent way to support this.

    # Enable Fortran core dump
    os.environ['FOR_DUMP_CORE_FILE'] = 'TRUE'
    os.environ['decfort_dump_flag'] = 'TRUE'

    # Allow unlimited core dump file sizes
    resource.setrlimit(resource.RLIMIT_CORE,
                       (resource.RLIM_INFINITY, resource.RLIM_INFINITY))