• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payu-org / payu / 6489602581

12 Oct 2023 12:25AM UTC coverage: 45.573% (+2.8%) from 42.772%
6489602581

push

github

web-flow
Merge pull request #363 from jo-basevi/358-date-based-frequency

Add support for date-based restart frequency

111 of 147 new or added lines in 10 files covered. (75.51%)

2 existing lines in 1 file now uncovered.

1580 of 3467 relevant lines covered (45.57%)

1.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

29.8
/payu/models/fms.py
1
"""Driver interface to the FMS model framework.
2

3
:copyright: Copyright 2011 Marshall Ward, see AUTHORS for details
4
:license: Apache License, Version 2.0, see LICENSE for details
5
"""
6
from __future__ import print_function
3✔
7

8
from collections import defaultdict
3✔
9
from pathlib import Path
3✔
10
import multiprocessing
3✔
11
import os
3✔
12
import resource as res
3✔
13
import shlex
3✔
14
import shutil
3✔
15
import subprocess as sp
3✔
16
import sys
3✔
17
from itertools import count
3✔
18
import fnmatch
3✔
19

20
import cftime
3✔
21

22
from payu.models.model import Model
3✔
23
from payu import envmod
3✔
24
from payu.fsops import required_libs
3✔
25

26
# There is a limit on the number of command line arguments in a forked
27
# MPI process. This applies only to mppnccombine-fast. The limit is higher
28
# than this, but mppnccombine-fast is very slow with large numbers of files
29
# Used by fms_collate: when an explicit tile list longer than this is handed
# to the MPI collation tool, a "collation will be slow" warning is printed.
MPI_FORK_MAX_FILE_LIMIT = 1000
3✔
30

31

32
def cmdthread(cmd, cwd):
    """Run a shell-style command line and capture everything it prints.

    The combined stdout/stderr is captured rather than streamed so that the
    caller can print it in one piece once all parallel collations have
    finished, instead of having the outputs interleaved.

    Parameters
    ----------
    cmd: str
        Command line, split into arguments with ``shlex.split``.
    cwd: str
        Directory in which the command is executed.

    Returns
    -------
    tuple
        ``(returncode, output)``. ``returncode`` is ``None`` when the command
        exited successfully, otherwise the non-zero exit status. ``output``
        is the captured stdout and stderr of the command.
    """
    argv = shlex.split(cmd)
    try:
        captured = sp.check_output(argv, cwd=cwd, stderr=sp.STDOUT)
    except sp.CalledProcessError as failure:
        # A non-zero exit is reported to the caller, not raised
        return failure.returncode, failure.output
    return None, captured
×
44

45

46
def get_uncollated_files(dir):
    """Return the names of the uncollated tile files found in a directory.

    A tile file is one whose name ends in ``.nc.<digits>`` (for example
    ``ocean.nc.0003``). Names that merely match the ``*.nc.*`` glob but do
    not have that shape (``notes.nc.txt``, ``odd.nc.``) are ignored.

    Parameters
    ----------
    dir: str or path-like
        Directory to search. Sub-directories are not searched.

    Returns
    -------
    list of str
        File names (without any directory component) sorted numerically by
        the tile number in the final suffix. Empty if ``dir`` is not an
        existing directory.
    """
    if not os.path.isdir(dir):
        return []

    def tile_index(path):
        """Return the integer tile number of ``path``, or None if not a tile."""
        suffixes = path.suffixes
        # Names such as 'odd.nc.' or '.nc.0001' match the glob but can have
        # fewer than two suffixes, so guard before indexing from the end.
        if len(suffixes) < 2 or suffixes[-2] != '.nc':
            return None
        digits = suffixes[-1][1:]
        # isdecimal() accepts only characters that int() can parse
        return int(digits) if digits.isdecimal() else None

    indexed_names = []
    for path in Path(dir).glob('*.nc.*'):
        index = tile_index(path)
        if index is not None:
            indexed_names.append((index, path.name))

    # Sort numerically on the tile number only and strip off path information
    indexed_names.sort(key=lambda pair: pair[0])
    return [name for _, name in indexed_names]
60

61
def fms_collate(model):
    """
    Collate model output using mppnccombine. This is broken out of the Fms
    class so that it can be used by other models.

    Uncollated tile files in ``model.output_path`` (and, when the ``restart``
    collate option is set and a prior restart exists, in
    ``model.prior_restart_path``) are grouped by base name, and one collation
    command per group is run in a pool of worker processes. The behaviour is
    controlled by the ``collate`` section of the experiment configuration
    (keys read here: ``mpi``, ``exe``, ``flags``, ``ignore``, ``restart``,
    ``glob``, ``ncpus`` and ``threads``).

    Parameters
    ----------
    model: :py:class:`~payu.models.model`
        The payu model class or child thereof

    Raises
    ------
    AssertionError
        If no collation executable could be located.
    SystemExit
        Raised through ``sys.exit(-1)`` when any collation command exits with
        a non-zero status; the captured output of each failed command is
        written to stderr first.
    """

    # Set the stacksize to be unlimited
    res.setrlimit(res.RLIMIT_STACK, (res.RLIM_INFINITY, res.RLIM_INFINITY))

    collate_config = model.expt.config.get('collate', {})

    # The mpi flag implies using mppnccombine-fast
    mpi = collate_config.get('mpi', False)

    if mpi:
        # Must use envmod to be able to load mpi modules for collation
        envmod.setup()
        model.expt.load_modules()
        default_exe = 'mppnccombine-fast'
    else:
        default_exe = 'mppnccombine'

    # Locate the FMS collation tool: an explicit 'exe' setting wins, otherwise
    # look for the default executable in the laboratory bin directory
    bin_path = model.expt.lab.bin_path
    mppnc_path = collate_config.get('exe')
    if mppnc_path is None:
        if default_exe in os.listdir(bin_path):
            mppnc_path = os.path.join(bin_path, default_exe)
    elif not os.path.isabs(mppnc_path):
        mppnc_path = os.path.join(bin_path, mppnc_path)

    assert mppnc_path, 'No mppnccombine program found'

    # Check config for collate command line options
    collate_flags = collate_config.get('flags')
    if collate_flags is None:
        collate_flags = '-r' if mpi else '-n4 -z -m -r'

    if mpi:
        # The output file is the first argument after the flags
        # and mppnccombine-fast uses an explicit -o flag to specify
        # the output
        collate_flags = " ".join([collate_flags, '-o'])
        envmod.lib_update(required_libs(mppnc_path), 'libmpi.so')

    # Import list of collated files to ignore; a single entry is accepted
    # as shorthand for a one-element list
    collate_ignore = collate_config.get('ignore')
    if collate_ignore is None:
        collate_ignore = []
    elif not isinstance(collate_ignore, list):
        collate_ignore = [collate_ignore]

    # Gather the uncollated tile files, keyed by the directory they live in
    tile_fnames = {
        model.output_path: get_uncollated_files(model.output_path),
    }

    if (collate_config.get('restart', False) and
            model.prior_restart_path is not None):
        # Add uncollated restart files
        tile_fnames[model.prior_restart_path] = get_uncollated_files(
            model.prior_restart_path)

    # mnc_tiles[directory][collated file name] -> list of tile file names
    mnc_tiles = defaultdict(lambda: defaultdict(list))
    for t_dir, t_fnames in tile_fnames.items():
        for t_fname in t_fnames:
            # Dropping the numeric tile suffix gives the collated file name
            t_base = os.path.splitext(t_fname)[0]

            # Skip any files listed in the ignore list
            if t_base in collate_ignore:
                continue

            mnc_tiles[t_dir][t_base].append(t_fname)

    if mpi and collate_config.get('glob', True):
        for t_dir, t_bases in mnc_tiles.items():
            # The tiles of each directory must be globbed against that same
            # directory, which is also the working directory of the command
            dir_listing = os.listdir(t_dir)
            for t_base, t_files in list(t_bases.items()):
                globstr = "{}.*".format(t_base)
                # Try an equivalent glob and check the same files are returned
                mnc_glob = fnmatch.filter(dir_listing, globstr)
                if t_files == sorted(mnc_glob):
                    t_bases[t_base] = [globstr, ]
                    print("Note: using globstr ({}) for collating {}"
                          .format(globstr, t_base))
                else:
                    print("Warning: cannot use globstr {} to collate {}"
                          .format(globstr, t_base))
                    if len(t_files) > MPI_FORK_MAX_FILE_LIMIT:
                        print("Warning: large number of tiles: {} "
                              .format(len(t_files)))
                        print("Warning: collation will be slow and may fail")

    cpucount = int(collate_config.get('ncpus',
                   multiprocessing.cpu_count()))

    if mpi:
        # Default to one for mpi
        nprocesses = int(collate_config.get('threads', 1))
    else:
        nprocesses = int(collate_config.get('threads', cpucount))

    ncpusperprocess = cpucount // nprocesses

    if ncpusperprocess == 1 and mpi:
        print("Warning: running collate with mpirun on a single processor")

    pool = multiprocessing.Pool(processes=nprocesses)

    # Collate each tileset into a single file
    results = []
    try:
        for output_path, tilesets in mnc_tiles.items():
            for nc_fname, tiles in tilesets.items():
                nc_path = os.path.join(output_path, nc_fname)

                # Remove the collated file if it already exists, since it is
                # probably from a failed collation attempt
                # TODO: Validate this somehow
                if os.path.isfile(nc_path):
                    os.remove(nc_path)

                cmd = ' '.join([mppnc_path, collate_flags, nc_fname,
                                ' '.join(tiles)])
                if mpi:
                    cmd = "mpirun -n {} {}".format(ncpusperprocess, cmd)

                print(cmd)
                results.append(
                    pool.apply_async(cmdthread, args=(cmd, output_path)))
    finally:
        # Always shut the pool down, even if submitting a job failed, so
        # worker processes are not left behind
        pool.close()
        pool.join()

    # TODO: Categorise the return codes
    failed = False
    for p, result in enumerate(results):
        rc, op = result.get()
        if rc is not None:
            failed = True
            print('payu: error: Thread {p} crashed with error code '
                  '{rc}.'.format(p=p, rc=rc), file=sys.stderr)
            print(' Error message:', file=sys.stderr)
            print(op.decode(), file=sys.stderr)

    if failed:
        sys.exit(-1)
×
226

227

228
class Fms(Model):
    """payu driver for models built on the FMS framework."""

    def __init__(self, expt, name, config):
        """Initialise the driver by delegating to the base model class."""

        # payu initialisation
        super().__init__(expt, name, config)

    def set_model_pathnames(self):
        """Set the model paths, adding the FMS work sub-directories.

        On top of the paths set by the base class, this defines the
        ``RESTART`` and ``INPUT`` directories inside ``work_path``; the
        initialisation path is the same directory as the input path.
        """

        super().set_model_pathnames()

        # Define local FMS directories
        self.work_restart_path = os.path.join(self.work_path, 'RESTART')
        self.work_input_path = os.path.join(self.work_path, 'INPUT')
        self.work_init_path = self.work_input_path

    def archive(self, **kwargs):
        """Archive the run: drop the INPUT directory and move the restarts.

        Keyword arguments are accepted but not used here.
        """
        super().archive()

        # Remove the 'INPUT' path
        shutil.rmtree(self.work_input_path, ignore_errors=True)

        # Archive restart files before processing model output.
        # os.rmdir only removes an empty directory, so an existing restart
        # directory that still holds files raises OSError here.
        if os.path.isdir(self.restart_path):
            os.rmdir(self.restart_path)

        shutil.move(self.work_restart_path, self.restart_path)

    def collate(self):
        """Collate the tiled output of this model with ``fms_collate``."""
        fms_collate(self)

    def get_restart_datetime(self, restart_path):
        """Given a restart path, parse the restart files and
        return a cftime datetime (for date-based restart pruning)

        The date is read from ``ocean_solo.res`` in ``restart_path``. The
        first field of the first non-blank line is an integer calendar code
        (1: 360_day, 2: julian, 3: proleptic_gregorian, 4: noleap), and the
        first six fields of the last non-blank line are the year, month,
        day, hour, minute and second.

        Raises
        ------
        NotImplementedError
            If ``ocean_solo.res`` does not exist in ``restart_path``.
        KeyError
            If the calendar code is not one of the four listed above.
        """
        # Check for ocean_solo.res file
        ocean_solo_path = os.path.join(restart_path, 'ocean_solo.res')
        if not os.path.exists(ocean_solo_path):
            raise NotImplementedError(
                'Cannot find ocean_solo.res file, which is required for '
                'date-based restart pruning')

        with open(ocean_solo_path, 'r') as ocean_solo:
            # Skip blank lines so that trailing empty lines at the end of
            # the file are not mistaken for the date record
            lines = [line for line in ocean_solo if line.strip()]

        calendar_int = int(lines[0].split()[0])
        cftime_calendars = {
            1: "360_day",
            2: "julian",
            3: "proleptic_gregorian",
            4: "noleap"
        }
        calendar = cftime_calendars[calendar_int]

        # Only the first six fields are numeric; the rest of the line is
        # not parsed
        last_date_line = lines[-1].split()
        date_values = [int(i) for i in last_date_line[:6]]
        year, month, day, hour, minute, second = date_values
        return cftime.datetime(year, month, day, hour, minute, second,
                               calendar=calendar)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc