• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payu-org / payu / 12861176675

20 Jan 2025 04:45AM UTC coverage: 59.009% (+0.2%) from 58.821%
12861176675

Pull #550

github

web-flow
Merge 24442dcec into 20a8e76fc
Pull Request #550: Fix job.yaml qstat call

9 of 30 new or added lines in 5 files covered. (30.0%)

15 existing lines in 1 file now uncovered.

2918 of 4945 relevant lines covered (59.01%)

1.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.5
/payu/experiment.py
1
"""payu.experiment ===============
2

3
   Interface to an individual experiment managed by payu
4

5
   :copyright: Copyright 2011 Marshall Ward, see AUTHORS for details.
6
   :license: Apache License, Version 2.0, see LICENSE for details.
7
"""
8

9
from __future__ import print_function
3✔
10

11
# Standard Library
12
import datetime
3✔
13
import errno
3✔
14
import os
3✔
15
import re
3✔
16
import resource
3✔
17
import sys
3✔
18
import shlex
3✔
19
import shutil
3✔
20
import subprocess as sp
3✔
21
import sysconfig
3✔
22
from pathlib import Path
3✔
23

24
# Extensions
25
import yaml
3✔
26
from packaging import version
3✔
27

28
# Local
29
from payu import envmod
3✔
30
from payu.fsops import mkdir_p, make_symlink, read_config, movetree
3✔
31
from payu.fsops import list_archive_dirs
3✔
32
from payu.fsops import run_script_command
3✔
33
from payu.fsops import needs_subprocess_shell
3✔
34
from payu.schedulers import index as scheduler_index, DEFAULT_SCHEDULER_CONFIG
3✔
35
from payu.models import index as model_index
3✔
36
import payu.profilers
3✔
37
from payu.runlog import Runlog
3✔
38
from payu.manifest import Manifest
3✔
39
from payu.calendar import parse_date_offset
3✔
40
from payu.sync import SyncToRemoteArchive
3✔
41
from payu.metadata import Metadata
3✔
42

43
# Environment module support on vayu
# TODO: To be removed
# Modules never unloaded by load_modules(), even when non-essential
core_modules = ['python', 'payu']

# Default payu parameters
# NOTE(review): presumably the default restart-pruning frequency used by the
# restart-retention logic elsewhere in payu — confirm against that code
default_restart_freq = 5
49

50

51
class Experiment(object):
3✔
52

53
    def __init__(self, lab, reproduce=False, force=False, metadata_off=False):
        """Initialise an experiment inside laboratory *lab*.

        Parameters
        ----------
        lab : laboratory object providing ``archive_path``/``work_path``
        reproduce : bool
            Re-run using recorded manifests; may also be enabled via the
            ``PAYU_REPRODUCE`` environment variable (set under PBS).
        force : bool
            Proceed even if a work path already exists; may also be enabled
            via the ``PAYU_FORCE`` environment variable.
        metadata_off : bool
            Disable experiment metadata (UUID / experiment name) handling.
        """
        self.lab = lab
        # Check laboratory directories are writable
        self.lab.initialize()

        if not force:
            # check environment for force flag under PBS
            self.force = os.environ.get('PAYU_FORCE', False)
        else:
            self.force = force

        # Recorded so run() can report total walltime in job.yaml
        self.start_time = datetime.datetime.now()

        # Initialise experiment metadata - uuid and experiment name
        self.metadata = Metadata(Path(lab.archive_path), disabled=metadata_off)
        self.metadata.setup()

        # TODO: replace with dict, check versions via key-value pairs
        self.modules = set()

        # TODO: __init__ should not be a config dumping ground!
        self.config = read_config()

        # Payu experiment type
        self.debug = self.config.get('debug', False)
        self.postscript = self.config.get('postscript')
        self.repeat_run = self.config.get('repeat', False)

        # Configuration
        self.expand_shell_vars = True   # TODO: configurable

        # Model run time
        self.runtime = None
        if ('calendar' in self.config and
                'runtime' in self.config['calendar']):
            self.runtime = self.config['calendar']['runtime']

        # Stacksize
        # NOTE: Possible PBS issue in setting non-unlimited stacksizes
        stacksize = self.config.get('stacksize', 'unlimited')
        self.set_stacksize(stacksize)

        # Initialize the submodels
        self.init_models()

        # TODO: Move to run/collate/sweep?
        # NOTE: order matters — counters need archive_path from pathnames
        self.set_expt_pathnames()
        self.set_counters()

        for model in self.models:
            model.set_input_paths()

        self.set_output_paths()

        # Create metadata file and move to archive
        self.metadata.write_metadata(restart_path=self.prior_restart_path)

        if not reproduce:
            # check environment for reproduce flag under PBS
            reproduce = os.environ.get('PAYU_REPRODUCE', False)

        # Initialize manifest
        self.manifest = Manifest(self.config.get('manifest', {}),
                                 reproduce=reproduce)

        # Miscellaneous configurations
        # TODO: Move this stuff somewhere else
        self.userscripts = self.config.get('userscripts', {})

        self.profilers = []

        init_script = self.userscripts.get('init')
        if init_script:
            self.run_userscript(init_script)

        self.runlog = Runlog(self)

        # XXX: Temporary spot for the payu path
        #      This is horrible; payu/cli.py does this much more safely!
        #      But also does not even store it in os.environ!
        default_payu_bin = os.path.dirname(sys.argv[0])
        payu_bin = os.environ.get('PAYU_PATH', default_payu_bin)

        self.payu_path = os.path.join(payu_bin, 'payu')

        self.run_id = None

        # Populated later by setup_modules()
        self.user_modules_paths = None
142
    def init_models(self):
        """Instantiate the driver objects for the model and any submodels."""
        self.model_name = self.config.get('model')
        assert self.model_name

        # Configuration fields forwarded from config.yaml to each driver
        model_fields = ['model', 'exe', 'input', 'ncpus', 'npernode', 'build',
                        'mpthreads', 'exe_prefix']

        # XXX: Temporarily adding this to model config...
        model_fields += ['mask']

        # TODO: Rename this to self.submodels
        self.models = []

        submodels = self.config.get('submodels', [])

        # The top-level model settings are treated as an implicit submodel
        primary_config = {fld: self.config[fld]
                          for fld in model_fields if fld in self.config}
        primary_config['name'] = self.model_name
        submodels.append(primary_config)

        for sub_cfg in submodels:
            driver_cls = model_index[sub_cfg['model']]
            self.models.append(driver_cls(self, sub_cfg['name'], sub_cfg))

        # Load the top-level model
        if self.model_name:
            driver_cls = model_index[self.model_name]
            top_cfg = {fld: self.config[fld]
                       for fld in model_fields if fld in self.config}
            self.model = driver_cls(self, self.model_name, top_cfg)
            self.model.top_level_model = True
        else:
            self.model = None

178
    def set_counters(self):
        """Set the current run counter and the number of remaining runs."""
        # Assume that ``set_paths`` has already been called
        assert self.archive_path

        # A counter exported by a prior PBS submission takes precedence
        env_counter = os.environ.get('PAYU_CURRENT_RUN')
        self.counter = int(env_counter) if env_counter else None

        self.n_runs = int(os.environ.get('PAYU_N_RUNS', 1))

        # Initialize counter if unset
        if self.counter is None:
            # Check for restart index
            last_restart = self.max_output_index(output_type="restart")
            if last_restart is not None:
                self.counter = last_restart + 1
            else:
                # Now look for output directories,
                # as repeat runs do not generate restart files.
                last_output = self.max_output_index()
                self.counter = 0 if last_output is None else last_output + 1

205
    def max_output_index(self, output_type="output"):
        """Given a output directory type (output or restart),
        return the maximum index of output directories found.

        Returns ``None`` when the archive directory does not exist or
        contains no directories of the requested type.
        """
        try:
            output_dirs = list_archive_dirs(archive_path=self.archive_path,
                                            dir_type=output_type)
        except EnvironmentError as exc:
            # A missing archive simply means no outputs exist yet
            if exc.errno == errno.ENOENT:
                output_dirs = None
            else:
                raise

        if output_dirs and len(output_dirs):
            # NOTE: the previous ``lstrip(output_type)`` stripped a
            # *character set*, not the prefix; slice off the prefix length
            # to extract the numeric index robustly.
            return int(output_dirs[-1][len(output_type):])

220
    def set_stacksize(self, stacksize):
        """Set the soft stack size limit of the current process.

        Parameters
        ----------
        stacksize : the string ``'unlimited'`` or an integer size in bytes

        Raises
        ------
        ValueError
            If *stacksize* is neither ``'unlimited'`` nor an integer.
        """
        if stacksize == 'unlimited':
            stacksize = resource.RLIM_INFINITY
        elif not isinstance(stacksize, int):
            # An assert would be silently stripped under ``python -O``;
            # raise an explicit error for invalid configuration instead.
            raise ValueError(
                "payu: stacksize must be 'unlimited' or an integer, "
                "got: {0!r}".format(stacksize)
            )

        # Soft limit set as requested; hard limit left unlimited
        resource.setrlimit(resource.RLIMIT_STACK,
                           (stacksize, resource.RLIM_INFINITY))

230
    def setup_modules(self):
        """Setup modules and get paths added to $PATH by user-modules"""
        envmod.setup()

        # Get user modules info from config
        modules_config = self.config.get('modules', {})
        user_modulepaths = modules_config.get('use', [])
        user_modules = modules_config.get('load', [])

        # Run module use + load commands for user-defined modules, and
        # get a set of paths and loaded modules added by loading the modules
        loaded_mods, paths = envmod.setup_user_modules(user_modules,
                                                       user_modulepaths)
        self.user_modules_paths = paths
        if loaded_mods is None:
            self.loaded_user_modules = []
        else:
            self.loaded_user_modules = loaded_mods

245
    def load_modules(self):
        """Load the environment modules needed for this run.

        Adds the scheduler and MPI modules, unloads non-essential modules
        that were not loaded by the user's config, then loads the
        model-dependent and profiler modules.
        """
        # Scheduler
        sched_modname = self.config.get('scheduler', 'pbs')
        self.modules.add(sched_modname)

        # MPI library
        mpi_config = self.config.get('mpi', {})

        # Assign MPI module paths
        mpi_modpath = mpi_config.get('modulepath', None)
        if mpi_modpath:
            envmod.module('use', mpi_modpath)

        mpi_modname = mpi_config.get('module', 'openmpi')
        self.modules.add(mpi_modname)

        # Unload non-essential modules
        loaded_mods = os.environ.get('LOADEDMODULES', '').split(':')

        for mod in loaded_mods:
            if len(mod) > 0:
                # NOTE(review): looks like leftover debug output
                print('mod '+mod)
                mod_base = mod.split('/')[0]
                # Keep core modules and anything the user's config loaded
                if (mod_base not in core_modules and
                        mod not in self.loaded_user_modules):
                    envmod.module('unload', mod)

        # Now load model-dependent modules
        for mod in self.modules:
            envmod.module('load', mod)

        envmod.module('list')

        for prof in self.profilers:
            prof.load_modules()

        # TODO: Consolidate this profiling stuff
        c_ipm = self.config.get('ipm', False)
        if c_ipm:
            # A string value selects a specific IPM version
            if isinstance(c_ipm, str):
                ipm_mod = os.path.join('ipm', c_ipm)
            else:
                ipm_mod = 'ipm/2.0.2'

            envmod.module('load', ipm_mod)
            os.environ['IPM_LOGDIR'] = self.work_path

        if self.config.get('mpiP', False):
            envmod.module('load', 'mpiP')

        if self.config.get('hpctoolkit', False):
            envmod.module('load', 'hpctoolkit')

        if self.debug:
            envmod.module('load', 'totalview')

301
    def set_expt_pathnames(self):
3✔
302

303
        # Local "control" path default used to be applied here,
304
        # but now done in read_config
305
        self.control_path = self.config.get('control_path')
3✔
306

307
        # Experiment name
308
        self.name = self.metadata.experiment_name
3✔
309

310
        # Experiment subdirectories
311
        self.archive_path = os.path.join(self.lab.archive_path, self.name)
3✔
312
        self.work_path = os.path.join(self.lab.work_path, self.name)
3✔
313

314
        # Symbolic link paths to output
315
        self.work_sym_path = os.path.join(self.control_path, 'work')
3✔
316
        self.archive_sym_path = os.path.join(self.control_path, 'archive')
3✔
317

318
        for model in self.models:
3✔
319
            model.set_model_pathnames()
3✔
320
            model.set_local_pathnames()
3✔
321

322
        # Stream output filenames
323
        # TODO: per-model output streams?
324
        self.stdout_fname = self.lab.model_type + '.out'
3✔
325
        self.stderr_fname = self.lab.model_type + '.err'
3✔
326

327
        self.job_fname = 'job.yaml'
3✔
328
        self.env_fname = 'env.yaml'
3✔
329

330
        self.output_fnames = (self.stderr_fname,
3✔
331
                              self.stdout_fname,
332
                              self.job_fname,
333
                              self.env_fname)
334

335
    def set_output_paths(self):
        """Resolve the output and restart paths for the current counter,
        including the prior run's paths when they exist."""
        # Local archive paths

        # Check to see if we've provided a hard coded path -- valid for collate
        dir_path = os.environ.get('PAYU_DIR_PATH')
        if dir_path is not None:
            self.output_path = os.path.normpath(dir_path)
        else:
            output_dir = 'output{0:03}'.format(self.counter)
            self.output_path = os.path.join(self.archive_path, output_dir)

        # TODO: check case counter == 0
        prior_output_dir = 'output{0:03}'.format(self.counter - 1)
        prior_output_path = os.path.join(self.archive_path, prior_output_dir)
        if os.path.exists(prior_output_path):
            self.prior_output_path = prior_output_path
        else:
            self.prior_output_path = None

        # Local restart paths
        restart_dir = 'restart{0:03}'.format(self.counter)
        self.restart_path = os.path.join(self.archive_path, restart_dir)

        # Prior restart path

        # Check if a user restart directory is available
        user_restart_dir = self.config.get('restart')
        # A user-supplied restart only applies to a first (or repeat) run
        if (self.counter == 0 or self.repeat_run) and user_restart_dir:
            # TODO: Some user friendliness needed...
            assert (os.path.isdir(user_restart_dir))
            self.prior_restart_path = user_restart_dir
        else:
            prior_restart_dir = 'restart{0:03}'.format(self.counter - 1)
            prior_restart_path = os.path.join(self.archive_path,
                                              prior_restart_dir)
            # Repeat runs deliberately ignore archived restarts
            if os.path.exists(prior_restart_path) and not self.repeat_run:
                self.prior_restart_path = prior_restart_path
            else:
                self.prior_restart_path = None
                if self.counter > 0 and not self.repeat_run:
                    # TODO: This warning should be replaced with an abort in
                    #       setup
                    print('payu: warning: No restart files found.')

        for model in self.models:
            model.set_model_output_paths()

383
    def build_model(self):
        """Fetch every model's codebase, then compile each model."""
        self.load_modules()

        # All codebases are fetched before any build begins
        for driver in self.models:
            driver.get_codebase()

        for driver in self.models:
            driver.build_model()

393
    def check_payu_version(self):
3✔
394
        """Check current payu version is greater than minimum required
395
        payu version, if configured"""
396
        # TODO: Move this function to a setup file if setup is moved to
397
        # a separate file?
398
        minimum_version_fieldname = "payu_minimum_version"
3✔
399
        if minimum_version_fieldname not in self.config:
3✔
400
            # Skip version check
401
            return
3✔
402

403
        minimum_version = str(self.config[minimum_version_fieldname])
3✔
404
        try:
3✔
405
            # Attempt to parse the version
406
            parsed_minimum_version = version.parse(minimum_version)
3✔
407
        except version.InvalidVersion:
3✔
408
            raise ValueError(
3✔
409
                "Invalid version in configuration file (config.yaml) for "
410
                f"'{minimum_version_fieldname}': {minimum_version}"
411
            )
412

413
        # Get the current version of the package
414
        current_version = payu.__version__
3✔
415

416
        # Compare versions
417
        if version.parse(current_version) < parsed_minimum_version:
3✔
418
            raise RuntimeError(
3✔
419
                f"Payu version {current_version} does not meet the configured "
420
                f"minimum version. A version >= {minimum_version} is "
421
                "required to run this configuration."
422
            )
423

424
    def setup(self, force_archive=False):
        """Prepare the work directory for a run.

        Checks the payu version and that no stale output/work paths exist,
        creates the work directory, copies config and manifests into it,
        sets up executables and profilers, and runs any user setup script.

        Parameters
        ----------
        force_archive : bool
            Create the archive directory and its symlink immediately.
        """
        # Check version
        self.check_payu_version()

        # Confirm that no output path already exists
        if os.path.exists(self.output_path):
            sys.exit('payu: error: Output path already exists: '
                     '{path}.'.format(path=self.output_path))

        # Confirm that no work path already exists
        if os.path.exists(self.work_path):
            if self.force:
                print('payu: work path already exists.\n'
                      '      Sweeping as --force option is True.')
                self.sweep()
            else:
                sys.exit('payu: error: work path already exists: {path}.\n'
                         '             payu sweep and then payu run'
                         .format(path=self.work_path))

        mkdir_p(self.work_path)

        if force_archive:
            mkdir_p(self.archive_path)
            make_symlink(self.archive_path, self.archive_sym_path)

        # Archive the payu config
        # TODO: This just copies the existing config.yaml file, but we should
        #       reconstruct a new file including default values
        config_src = os.path.join(self.control_path, 'config.yaml')
        config_dst = os.path.join(self.work_path)
        shutil.copy(config_src, config_dst)

        # Stripe directory in Lustre
        # TODO: Make this more configurable
        do_stripe = self.config.get('stripedio', False)
        if do_stripe:
            cmd = 'lfs setstripe -c 8 -s 8m {0}'.format(self.work_path)
            sp.check_call(shlex.split(cmd))

        make_symlink(self.work_path, self.work_sym_path)

        # Set up executable paths - first search through paths added by modules
        self.setup_modules()
        for model in self.models:
            model.setup_executable_paths()

        # Set up all file manifests
        self.manifest.setup()

        for model in self.models:
            model.setup()

        # Call the macro-model setup
        if len(self.models) > 1:
            self.model.setup()

        self.manifest.check_manifests()

        # Copy manifests to work directory so they are archived on completion
        manifest_path = os.path.join(self.work_path, 'manifests')
        self.manifest.copy_manifests(manifest_path)

        setup_script = self.userscripts.get('setup')
        if setup_script:
            self.run_userscript(setup_script)

        # Profiler setup
        expt_profs = self.config.get('profilers', [])
        if not isinstance(expt_profs, list):
            expt_profs = [expt_profs]

        for prof_name in expt_profs:
            ProfType = payu.profilers.index[prof_name]
            prof = ProfType(self)
            self.profilers.append(prof)

            # Testing
            prof.setup()

        # Check restart pruning for valid configuration values and
        # warns user if more restarts than expected would be pruned
        if self.archiving():
            self.get_restarts_to_prune()

509
    def run(self, *user_flags):
        """Execute the model via MPI and record job information.

        Builds the mpirun (or scheduler-specific) command line from the
        experiment and per-model configuration, runs it with stdout/stderr
        redirected to the log files, writes ``env.yaml``/``job.yaml``, and
        on failure backs up logs and aborts.

        Parameters
        ----------
        *user_flags : extra MPI flags appended to the command line
        """
        self.load_modules()

        f_out = open(self.stdout_fname, 'w')
        f_err = open(self.stderr_fname, 'w')

        # Set MPI environment variables
        env = self.config.get('env')

        # Explicitly check for `None`, in case of an empty `env:` entry
        if env is None:
            env = {}

        for var in env:

            if env[var] is None:
                env_value = ''
            else:
                env_value = str(env[var])

            os.environ[var] = env_value

        mpi_config = self.config.get('mpi', {})
        mpi_runcmd = mpi_config.get('runcmd', 'mpirun')

        if self.config.get('scalasca', False):
            mpi_runcmd = ' '.join(['scalasca -analyze', mpi_runcmd])

        # MPI runtime flags
        mpi_flags = mpi_config.get('flags', [])
        if not mpi_flags:
            # DEPRECATED: confusing and a duplication of flags config
            if 'mpirun' in self.config:
                mpi_flags = self.config.get('mpirun')
                print('payu: warning: mpirun config option is deprecated.'
                      '  Use mpi: flags option instead')
            else:
                mpi_flags = []

        if not isinstance(mpi_flags, list):
            mpi_flags = [mpi_flags]

        # TODO: More uniform support needed here
        if self.config.get('scalasca', False):
            mpi_flags = ['\"{0}\"'.format(f) for f in mpi_flags]

        # XXX: I think this may be broken
        if user_flags:
            mpi_flags.extend(list(user_flags))

        if self.debug:
            mpi_flags.append('--debug')

        # One program segment per model, joined with ':' (MPMD launch)
        mpi_progs = []
        for model in self.models:

            # Skip models without executables (e.g. couplers)
            if not model.exec_path_local:
                continue

            mpi_config = self.config.get('mpi', {})
            mpi_module = mpi_config.get('module', None)

            # Update MPI library module (if not explicitly set)
            # TODO: Check for MPI library mismatch across multiple binaries
            if mpi_module is None and model.required_libs is not None:
                envmod.lib_update(
                    model.required_libs,
                    'libmpi.so'
                )

            model_prog = []

            wdir_arg = '-wdir'
            if self.config.get('scheduler') == 'slurm':
                # Option to set the working directory differs in slurm
                wdir_arg = '--chdir'
            model_prog.append(f'{wdir_arg} {model.work_path}')

            # Append any model-specific MPI flags
            model_flags = model.config.get('mpiflags', [])
            if not isinstance(model_flags, list):
                model_prog.append(model_flags)
            else:
                model_prog.extend(model_flags)

            model_ncpus = model.config.get('ncpus')
            if model_ncpus:
                if self.config.get('scheduler') == 'slurm':
                    model_prog.append('-n {0}'.format(model_ncpus))
                else:
                    # Default to preferred mpirun syntax
                    model_prog.append('-np {0}'.format(model_ncpus))

            model_npernode = model.config.get('npernode')
            # TODO: New Open MPI format?
            if model_npernode:
                if model_npernode % 2 == 0:
                    npernode_flag = ('-map-by ppr:{0}:socket'
                                     ''.format(model_npernode / 2))
                else:
                    npernode_flag = ('-map-by ppr:{0}:node'
                                     ''.format(model_npernode))

                if self.config.get('scalasca', False):
                    npernode_flag = '\"{0}\"'.format(npernode_flag)
                model_prog.append(npernode_flag)

            if self.config.get('hpctoolkit', False):
                os.environ['HPCRUN_EVENT_LIST'] = 'WALLCLOCK@5000'
                model_prog.append('hpcrun')

            for prof in self.profilers:
                if prof.runscript:
                    model_prog.append(prof.runscript)

            model_prog.append(model.exec_prefix)

            # Use the full path to symlinked exec_name in work as some
            # older MPI libraries complained executable was not in PATH
            model_prog.append(os.path.join(model.work_path, model.exec_name))

            mpi_progs.append(' '.join(model_prog))

        cmd = '{runcmd} {flags} {exes}'.format(
            runcmd=mpi_runcmd,
            flags=' '.join(mpi_flags),
            exes=' : '.join(mpi_progs)
        )

        for prof in self.profilers:
            cmd = prof.wrapper(cmd)

        # Expand shell variables inside flags
        if self.expand_shell_vars:
            cmd = os.path.expandvars(cmd)

        # TODO: Consider making this default
        if self.config.get('coredump', False):
            enable_core_dump()

        # Dump out environment
        with open(self.env_fname, 'w') as file:
            file.write(yaml.dump(dict(os.environ), default_flow_style=False))

        self.runlog.create_manifest()
        if self.runlog.enabled:
            self.runlog.commit()

        # NOTE: This may not be necessary, since env seems to be getting
        # correctly updated.  Need to look into this.
        print(cmd)
        if env:
            # TODO: Replace with mpirun -x flag inputs
            proc = sp.Popen(shlex.split(cmd), stdout=f_out, stderr=f_err,
                            env=os.environ.copy())
            proc.wait()
            rc = proc.returncode
        else:
            rc = sp.call(shlex.split(cmd), stdout=f_out, stderr=f_err)

        f_out.close()
        f_err.close()

        self.finish_time = datetime.datetime.now()

        scheduler_name = self.config.get('scheduler', DEFAULT_SCHEDULER_CONFIG)
        scheduler = scheduler_index[scheduler_name]()

        info = scheduler.get_job_info()

        if info is None:
            # Not being run under PBS, reverse engineer environment
            info = {
                'PAYU_PATH': os.path.dirname(self.payu_path)
            }

        # Add extra information to save to jobinfo
        info.update(
            {
                'PAYU_CONTROL_DIR': self.control_path,
                'PAYU_RUN_ID': self.run_id,
                'PAYU_CURRENT_RUN': self.counter,
                'PAYU_N_RUNS':  self.n_runs,
                'PAYU_JOB_STATUS': rc,
                'PAYU_START_TIME': self.start_time.isoformat(),
                'PAYU_FINISH_TIME': self.finish_time.isoformat(),
                'PAYU_WALLTIME': "{0} s".format(
                    (self.finish_time - self.start_time).total_seconds()
                ),
            }
        )

        # Dump job info
        with open(self.job_fname, 'w') as file:
            file.write(yaml.dump(info, default_flow_style=False))

        # Remove any empty output files (e.g. logs)
        for fname in os.listdir(self.work_path):
            fpath = os.path.join(self.work_path, fname)
            if os.path.getsize(fpath) == 0:
                os.remove(fpath)

        # Clean up any profiling output
        # TODO: Move after `rc` code check?
        for prof in self.profilers:
            prof.postprocess()

        # TODO: Need a model-specific cleanup method call here
        # NOTE: This does not appear to catch hanging jobs killed by PBS
        if rc != 0:
            # Backup logs for failed runs
            error_log_dir = os.path.join(self.archive_path, 'error_logs')
            mkdir_p(error_log_dir)

            # NOTE: This is only implemented for PBS scheduler
            job_id = scheduler.get_job_id(short=False)

            # Fall back to a run-id prefix when no scheduler job id exists
            if job_id == '' or job_id is None:
                job_id = str(self.run_id)[:6]

            for fname in self.output_fnames:

                src = os.path.join(self.control_path, fname)

                stem, suffix = os.path.splitext(fname)
                dest = os.path.join(error_log_dir,
                                    ".".join((stem, job_id)) + suffix)

                print(src, dest)

                shutil.copyfile(src, dest)

            # Create the symlink to the logs if it does not exist
            make_symlink(self.archive_path, self.archive_sym_path)

            error_script = self.userscripts.get('error')
            if error_script:
                self.run_userscript(error_script)

            # Terminate payu
            sys.exit('payu: Model exited with error code {0}; aborting.'
                     ''.format(rc))

        # Decrement run counter on successful run
        stop_file_path = os.path.join(self.control_path, 'stop_run')
        if os.path.isfile(stop_file_path):
            assert os.stat(stop_file_path).st_size == 0
            os.remove(stop_file_path)
            print('payu: Stop file detected; terminating resubmission.')
            self.n_runs = 0
        else:
            self.n_runs -= 1

        # Move logs to archive (or delete if empty)
        for f in self.output_fnames:
            f_path = os.path.join(self.control_path, f)
            if os.path.getsize(f_path) == 0:
                os.remove(f_path)
            else:
                shutil.move(f_path, self.work_path)

        run_script = self.userscripts.get('run')
        if run_script:
            self.run_userscript(run_script)

775
    def archiving(self):
3✔
776
        """
777
        Determine whether to run archive step based on config.yaml settings.
778
        Default to True when archive settings are absent.
779
        """
780
        archive_config = self.config.get('archive', {})
3✔
781
        return archive_config.get('enable', True)
3✔
782

783
    def archive(self, force_prune_restarts=False):
        """Archive a completed run.

        Moves the work directory to the output path, prunes outdated
        restart directories, and launches follow-up collate/profile jobs
        as configured.  Bails out early if archiving is disabled in
        config.yaml or there is no work directory to archive.

        Parameters
        ----------
        force_prune_restarts : bool
            Passed through to get_restarts_to_prune; when True, restarts
            are pruned without the "more than expected" safety check.
        """
        if not self.archiving():
            print('payu: not archiving due to config.yaml setting.')
            return

        # Check there is a work directory, otherwise bail
        if not os.path.exists(self.work_sym_path):
            sys.exit('payu: error: No work directory to archive.')

        mkdir_p(self.archive_path)
        make_symlink(self.archive_path, self.archive_sym_path)

        # Remove work symlink
        if os.path.islink(self.work_sym_path):
            os.remove(self.work_sym_path)

        mkdir_p(self.restart_path)

        # Each model driver archives its own component first
        for model in self.models:
            model.archive()

        # Postprocess the model suite
        if len(self.models) > 1:
            self.model.archive()

        # Double-check that the run path does not exist
        if os.path.exists(self.output_path):
            sys.exit('payu: error: Output path already exists.')

        movetree(self.work_path, self.output_path)

        # Remove any outdated restart files; pruning failures are reported
        # but never abort the archive step
        try:
            restarts_to_prune = self.get_restarts_to_prune(
                force=force_prune_restarts)
        except Exception as e:
            print(e)
            print("payu: error: Skipping pruning restarts")
            restarts_to_prune = []

        for restart in restarts_to_prune:
            restart_path = os.path.join(self.archive_path, restart)
            # Only delete real directories; ignore symbolic restart links
            if (os.path.isdir(restart_path) and
                    not os.path.islink(restart_path)):
                shutil.rmtree(restart_path)

        # Ensure dynamic library support for subsequent python calls
        # (the collate/profile subprocesses below re-invoke this Python)
        ld_libpaths = os.environ.get('LD_LIBRARY_PATH', None)
        py_libpath = sysconfig.get_config_var('LIBDIR')
        if ld_libpaths is None:
            os.environ['LD_LIBRARY_PATH'] = py_libpath
        elif py_libpath not in ld_libpaths.split(':'):
            os.environ['LD_LIBRARY_PATH'] = f'{py_libpath}:{ld_libpaths}'

        # Collation is enabled by default unless disabled in config.yaml
        collate_config = self.config.get('collate', {})
        collating = collate_config.get('enable', True)
        if collating:
            cmd = '{python} {payu} collate -i {expt}'.format(
                python=sys.executable,
                payu=self.payu_path,
                expt=self.counter
            )
            sp.check_call(shlex.split(cmd))

        if self.config.get('hpctoolkit', False):
            cmd = '{python} {payu} profile -i {expt}'.format(
                python=sys.executable,
                payu=self.payu_path,
                expt=self.counter
            )
            sp.check_call(shlex.split(cmd))

        archive_script = self.userscripts.get('archive')
        if archive_script:
            self.run_userscript(archive_script)

        # Ensure postprocessing runs if model not collating
        # (otherwise the collate job triggers postprocessing itself)
        if not collating:
            self.postprocess()
863

864
    def collate(self):
        """Collate the outputs of every model in the experiment.

        Loads any user-defined environment modules first, then delegates
        to each model driver's own collate implementation.
        """
        self.setup_modules()
        for submodel in self.models:
            submodel.collate()
870

871
    def profile(self):
        """Run the profiling step of each model driver in turn."""
        for submodel in self.models:
            submodel.profile()
874

875
    def postprocess(self):
        """Submit any postprocessing scripts or remote syncing if enabled.

        If a ``postscript`` is configured, it is submitted to PBS via
        qsub.  If remote syncing is enabled under the ``sync`` config
        section, a ``payu sync`` job is launched; when a postscript is
        also queued, the latest outputs are excluded from the sync since
        the postscript may still be operating on them.
        """
        # First submit postprocessing script
        if self.postscript:
            self.set_userscript_env_vars()
            envmod.setup()
            envmod.module('load', 'pbs')

            cmd = 'qsub {script}'.format(script=self.postscript)

            # Commands with shell constructs (pipes, redirects, ...) must
            # run through the shell; plain commands are tokenised instead
            if needs_subprocess_shell(cmd):
                sp.check_call(cmd, shell=True)
            else:
                sp.check_call(shlex.split(cmd))

        # Submit a sync script if remote syncing is enabled
        sync_config = self.config.get('sync', {})
        syncing = sync_config.get('enable', False)
        if syncing:
            cmd = '{python} {payu} sync'.format(
                python=sys.executable,
                payu=self.payu_path
            )

            if self.postscript:
                # Fixed typo in user-facing warning: "lastest" -> "latest"
                print('payu: warning: postscript is configured, so by default '
                      'the latest outputs will not be synced. To sync the '
                      'latest output, after the postscript job has completed '
                      'run:\n'
                      '    payu sync')
                # Plain string: there are no placeholders to interpolate
                cmd += ' --sync-ignore-last'

            sp.check_call(shlex.split(cmd))
909

910
    def sync(self):
        """Sync the archive to a remote location.

        Any user-defined 'sync' script is executed first, then the rsync
        commands are dispatched via SyncToRemoteArchive.
        """
        envmod.setup()

        # Run any user scripts before syncing archive
        script = self.userscripts.get('sync')
        if script:
            self.run_userscript(script)

        # Run rsync commands
        SyncToRemoteArchive(self).run()
919

920
    def resubmit(self):
        """Submit the next `payu run` job for this experiment."""
        start = self.counter + 1
        cmd = (f'{sys.executable} {self.payu_path} '
               f'run -i {start} -n {self.n_runs}')
        sp.call(shlex.split(cmd))
930

931
    def set_userscript_env_vars(self):
        """Export output-directory and current-run information as
        environment variables so user-scripts can access them."""
        os.environ['PAYU_CURRENT_OUTPUT_DIR'] = self.output_path
        os.environ['PAYU_CURRENT_RESTART_DIR'] = self.restart_path
        os.environ['PAYU_ARCHIVE_DIR'] = self.archive_path
        os.environ['PAYU_CURRENT_RUN'] = str(self.counter)
942

943
    def run_userscript(self, script_cmd: str):
        """Run a user-defined script or subcommand from the control
        directory, with payu run info exported to the environment."""
        self.set_userscript_env_vars()
        control_dir = Path(self.control_path)
        run_script_command(script_cmd, control_path=control_dir)
949

950
    def sweep(self, hard_sweep=False):
        """Clean up the control directory after a run.

        Moves scheduler stdout/stderr logs into <archive>/pbs_logs,
        deletes stdout/err files and yaml dumps, and removes the work
        directory and its symlink.

        Parameters
        ----------
        hard_sweep : bool
            When True, also delete the entire archive directory and its
            symlink.
        """
        # TODO: Fix the IO race conditions!

        # TODO: model outstreams and pbs logs need to be handled separately
        default_job_name = os.path.basename(os.getcwd())
        # PBS job names are truncated to 15 characters
        short_job_name = str(self.config.get('jobname', default_job_name))[:15]

        # Log files: <job>.o / <job>.e plus the collate/profile/sync
        # variants which use a further-truncated name with a suffix
        log_filenames = [short_job_name + '.o', short_job_name + '.e']
        for postfix in ['_c.o', '_c.e', '_p.o', '_p.e', '_s.o', '_s.e']:
            log_filenames.append(short_job_name[:13] + postfix)

        logs = [
            f for f in os.listdir(os.curdir) if os.path.isfile(f) and (
                f.startswith(tuple(log_filenames))
            )
        ]

        pbs_log_path = os.path.join(self.archive_path, 'pbs_logs')
        legacy_pbs_log_path = os.path.join(self.control_path, 'pbs_logs')

        # Migrate logs from the legacy control-directory location
        if os.path.isdir(legacy_pbs_log_path):
            # TODO: New path may still exist!
            assert not os.path.isdir(pbs_log_path)
            print('payu: Moving pbs_logs to {0}'.format(pbs_log_path))
            shutil.move(legacy_pbs_log_path, pbs_log_path)
        else:
            mkdir_p(pbs_log_path)

        for f in logs:
            print('Moving log {0}'.format(f))
            shutil.move(f, os.path.join(pbs_log_path, f))

        if hard_sweep:
            if os.path.isdir(self.archive_path):
                print('Removing archive path {0}'.format(self.archive_path))
                cmd = 'rm -rf {0}'.format(self.archive_path)
                cmd = shlex.split(cmd)
                rc = sp.call(cmd)
                assert rc == 0

            if os.path.islink(self.archive_sym_path):
                print('Removing symlink {0}'.format(self.archive_sym_path))
                os.remove(self.archive_sym_path)

        # Remove stdout/err and yaml dumps
        for f in self.output_fnames:
            if os.path.isfile(f):
                os.remove(f)

        if os.path.isdir(self.work_path):
            print('Removing work path {0}'.format(self.work_path))
            cmd = 'rm -rf {0}'.format(self.work_path)
            cmd = shlex.split(cmd)
            rc = sp.call(cmd)
            assert rc == 0

        if os.path.islink(self.work_sym_path):
            print('Removing symlink {0}'.format(self.work_sym_path))
            os.remove(self.work_sym_path)
1009

1010
    def get_restarts_to_prune(self,
                              ignore_intermediate_restarts=False,
                              force=False):
        """Returns a list of restart directory names that can be pruned.

        Names are relative to the archive directory; callers join them
        onto ``self.archive_path``.

        Parameters
        ----------
        ignore_intermediate_restarts : bool
            When True, also include restarts that are only temporarily
            retained (i.e. would be pruned at a later archive step) and
            skip the "more than expected" safety check.
        force : bool
            When True, prune even if more restarts than expected would be
            deleted.  Can also be set via the PAYU_FORCE_PRUNE_RESTARTS
            environment variable.
        """
        # Check if archive path exists
        if not os.path.exists(self.archive_path):
            return []

        # Sorted list of restart directories in archive
        restarts = list_archive_dirs(archive_path=self.archive_path,
                                     dir_type='restart')
        # Map restart dir name -> numeric index, e.g. 'restart005' -> 5
        restart_indices = {}
        for restart in restarts:
            restart_indices[restart] = int(restart.lstrip('restart'))

        # TODO: Previous logic was to prune all restarts if self.repeat_run
        # Still need to figure out what should happen in this case
        if self.repeat_run:
            # Fix: return bare directory names, consistent with every other
            # return path of this method.  Callers (e.g. archive()) join the
            # returned names onto archive_path themselves, so returning
            # already-joined absolute paths here would be double-joined and
            # only worked by accident when archive_path was absolute.
            return list(restarts)

        # Use restart_freq to decide what restarts to prune
        restarts_to_prune = []
        intermediate_restarts, previous_intermediate_restarts = [], []
        restart_freq = self.config.get('restart_freq', default_restart_freq)
        if isinstance(restart_freq, int):
            # Using integer frequency to prune restarts: keep every
            # restart whose index is a multiple of restart_freq
            for restart, restart_index in restart_indices.items():
                if not restart_index % restart_freq == 0:
                    intermediate_restarts.append(restart)
                else:
                    # Add any intermediate restarts to restarts to prune
                    restarts_to_prune.extend(intermediate_restarts)
                    previous_intermediate_restarts = intermediate_restarts
                    intermediate_restarts = []
        else:
            # Using date-based frequency to prune restarts
            try:
                date_offset = parse_date_offset(restart_freq)
            except ValueError:
                print('payu: error: Invalid configuration for restart_freq:',
                      restart_freq)
                raise

            next_dt = None
            for restart in restarts:
                # Use model-driver to parse restart directory for a datetime
                restart_path = os.path.join(self.archive_path, restart)
                try:
                    restart_dt = self.model.get_restart_datetime(restart_path)
                except NotImplementedError:
                    print('payu: error: Date-based restart pruning is not '
                          f'implemented for the {self.model.model_type} '
                          'model. To use integer based restart pruning, '
                          'set restart_freq to an integer value.')
                    raise
                except FileNotFoundError as e:
                    # A restart missing its datetime file is skipped, not
                    # pruned
                    print(f'payu: warning: Ignoring {restart} from date-based '
                          f'restart pruning. Error: {e}')
                    continue
                except Exception:
                    print('payu: error: Error parsing restart directory ',
                          f'{restart} for a datetime to prune restarts.')
                    raise

                if (next_dt is not None and restart_dt < next_dt):
                    intermediate_restarts.append(restart)
                else:
                    # Keep the earliest datetime and use last kept datetime
                    # as point of reference when adding the next time interval
                    next_dt = date_offset.add_to_datetime(restart_dt)

                    # Add intermediate restarts to restarts to prune
                    restarts_to_prune.extend(intermediate_restarts)
                    previous_intermediate_restarts = intermediate_restarts
                    intermediate_restarts = []

        if ignore_intermediate_restarts:
            # Return all restarts that'll eventually be pruned
            restarts_to_prune.extend(intermediate_restarts)
            return restarts_to_prune

        if not force:
            # check environment for --force-prune-restarts flag
            # NOTE(review): any non-empty string (even "False") is truthy
            # here, so the variable acts as a presence flag - confirm
            # intended
            force = os.environ.get('PAYU_FORCE_PRUNE_RESTARTS', False)

        # Flag to check whether more restarts than expected will be deleted
        is_unexpected = restarts_to_prune != previous_intermediate_restarts

        # Restart_history override
        restart_history = self.config.get('restart_history', None)
        if restart_history is not None:
            if not isinstance(restart_history, int):
                raise ValueError("payu: error: restart_history is not an "
                                 f"integer value: {restart_history}")

            if len(restarts) > 0:
                max_index = restart_indices[restarts[-1]]
                index_bound = max_index - restart_history

                # Keep restart_history latest restarts, in addition to the
                # permanently saved restarts defined by restart_freq
                restarts_to_prune.extend(intermediate_restarts)
                restarts_to_prune = [res for res in restarts_to_prune
                                     if restart_indices[res] <= index_bound]

                # Expect at most 1 restart to be pruned with restart_history
                is_unexpected = len(restarts_to_prune) > 1

        # Log out warning if more restarts than expected will be deleted
        if not force and is_unexpected:
            config_info = f'restart pruning frequency of {restart_freq}'
            if restart_history:
                config_info += f' and restart history of {restart_history}'

            print(f'payu: warning: Current {config_info} would result in '
                  'following restarts being pruned: '
                  f'{" ".join(restarts_to_prune)}\n'
                  'If this is expected, use --force-prune-restarts flag at '
                  'next run or archive (if running archive manually), e.g.:\n'
                  '     payu run --force-prune-restarts, or\n'
                  '     payu archive --force-prune-restarts\n'
                  'Otherwise, no restarts will be pruned')

            # Return empty list to prevent restarts being pruned
            restarts_to_prune = []

        return restarts_to_prune
1138

1139

1140
def enable_core_dump():
    """Configure the process so that model crashes produce core dumps.

    Newer Intel compilers support 'FOR_DUMP_CORE_FILE' while most support
    'decfort_dump_flag'.  Setting both for now, but there may be a more
    platform-independent way to support this.
    """
    # Enable Fortran core dump
    os.environ.update({
        'FOR_DUMP_CORE_FILE': 'TRUE',
        'decfort_dump_flag': 'TRUE',
    })

    # Allow unlimited core dump file sizes
    unlimited = (resource.RLIM_INFINITY, resource.RLIM_INFINITY)
    resource.setrlimit(resource.RLIMIT_CORE, unlimited)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc