OGGM / oggm, build 17036427554
18 Aug 2025 09:19AM UTC coverage: 84.387% (-0.03%) from 84.412%

Pull Request #1795: add new observation handling
Merge f58c0836a into 0793d4813 (github / web-flow)

208 of 217 new or added lines in 7 files covered (95.85%)
28 existing lines in 2 files now uncovered
12404 of 14699 relevant lines covered (84.39%)
3.78 hits per line

Source file: /oggm/workflow.py (81.45% covered)

"""Wrappers for the single tasks, multi processor handling."""
# Built ins
import logging
import os
import shutil
from collections.abc import Sequence
# External libs
import multiprocessing
import numpy as np
import pandas as pd
import xarray as xr
from scipy import optimize as optimization

# Locals
import oggm
from oggm import cfg, tasks, utils
from oggm.core import centerlines, flowline, climate, gis
from oggm.exceptions import InvalidParamsError, InvalidWorkflowError
from oggm.utils import global_task

# MPI
try:
    import oggm.mpi as ogmpi
    _have_ogmpi = True
except ImportError:
    _have_ogmpi = False

# Module logger
log = logging.getLogger(__name__)

# Multiprocessing Pool
_mp_manager = None
_mp_pool = None


def _init_pool_globals(_cfg_contents, global_lock):
    cfg.unpack_config(_cfg_contents)
    utils.lock = global_lock


def init_mp_pool(reset=False):
    """Necessary because at import time, cfg might be uninitialized"""
    global _mp_manager, _mp_pool
    if _mp_pool and _mp_manager and not reset:
        return _mp_pool

    cfg.CONFIG_MODIFIED = False
    if _mp_pool:
        _mp_pool.terminate()
        _mp_pool = None
    if _mp_manager:
        cfg.set_manager(None)
        _mp_manager.shutdown()
        _mp_manager = None

    if cfg.PARAMS['use_mp_spawn']:
        mp = multiprocessing.get_context('spawn')
    else:
        mp = multiprocessing

    _mp_manager = mp.Manager()

    cfg.set_manager(_mp_manager)
    cfg_contents = cfg.pack_config()

    global_lock = _mp_manager.Lock()

    mpp = cfg.PARAMS['mp_processes']
    _mp_pool = mp.Pool(mpp, initializer=_init_pool_globals,
                       initargs=(cfg_contents, global_lock))
    return _mp_pool


def _merge_dicts(*dicts):
    r = {}
    for d in dicts:
        r.update(d)
    return r


class _pickle_copier(object):
    """Pickleable alternative to functools.partial,
    which is not pickleable in Python 2 and thus doesn't work
    with multiprocessing."""

    def __init__(self, func, kwargs):
        self.call_func = func
        self.out_kwargs = kwargs

    def _call_internal(self, call_func, gdir, kwargs):
        # If the function is None, assume gdir is a tuple with the task function
        if not call_func:
            call_func, gdir = gdir

        # Merge main kwargs with per-task kwargs
        kwargs = _merge_dicts(self.out_kwargs, kwargs)

        # If gdir is a sequence, again assume it's a tuple with per-gdir kwargs.
        if isinstance(gdir, Sequence) and not isinstance(gdir, str):
            gdir, gdir_kwargs = gdir
            kwargs.update(gdir_kwargs)

        return call_func(gdir, **kwargs)

    def __call__(self, arg):
        res = None
        for func in self.call_func:
            func, kwargs = func
            res = self._call_internal(func, arg, kwargs)
        return res


def reset_multiprocessing():
    """Reset multiprocessing state

    Call this if you changed configuration parameters mid-run and need them to
    be re-propagated to child processes.
    """
    global _mp_pool
    if _mp_pool:
        _mp_pool.terminate()
        _mp_pool = None
    cfg.CONFIG_MODIFIED = False
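
# A minimal usage sketch (illustrative only, not part of the original module):
# after changing run parameters mid-script, reset the multiprocessing state so
# that the next pool picks up the new values. The surrounding setup is assumed.
#
#     from oggm import cfg
#     cfg.PARAMS['use_multiprocessing'] = True
#     cfg.PARAMS['mp_processes'] = 4
#     reset_multiprocessing()  # next execute_entity_task() spawns a fresh pool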


def execute_entity_task(task, gdirs, **kwargs):
    """Execute a task on gdirs.

    If you asked for multiprocessing, it will do it.

    If ``task`` has more arguments than `gdir` they have to be keyword
    arguments.

    Parameters
    ----------
    task : function or sequence of functions
         The entity task(s) to apply.
         Can be None, in which case each gdir is expected to be a tuple of (task, gdir).
         When passing a sequence, each item can also optionally be a tuple of (task, dictionary).
         In this case the dictionary items will be passed to the task as kwargs.
    gdirs : list of :py:class:`oggm.GlacierDirectory` objects
        The glacier directories to process.
        Each individual gdir can optionally be a tuple of (gdir, dictionary).
        In this case, the values in the dictionary will be passed to the task as
        keyword arguments for that specific gdir.

    Returns
    -------
    List of results from task. Last task if a list of tasks was given.
    """

    # Normalize task into list of tuples for simplicity
    if not isinstance(task, Sequence):
        task = [task]
    tasks = []
    for t in task:
        if isinstance(t, tuple):
            tasks.append(t)
        else:
            tasks.append((t, {}))

    # Reject global tasks
    for t in tasks:
        if t[0].__dict__.get('is_global_task', False):
            raise InvalidWorkflowError('execute_entity_task cannot be used on '
                                       'global tasks.')

    # Should be iterable
    gdirs = utils.tolist(gdirs)
    ng = len(gdirs)
    if ng == 0:
        log.workflow('Called execute_entity_task on 0 glaciers. Returning...')
        return

    log.workflow('Execute entity tasks [%s] on %d glaciers',
                 ', '.join([t[0].__name__ for t in tasks]), ng)

    pc = _pickle_copier(tasks, kwargs)

    if _have_ogmpi:
        if ogmpi.OGGM_MPI_COMM is not None:
            return ogmpi.mpi_master_spin_tasks(pc, gdirs)

    if cfg.PARAMS['use_multiprocessing'] and ng > 1:
        mppool = init_mp_pool(cfg.CONFIG_MODIFIED)
        out = mppool.map(pc, gdirs, chunksize=1)
    else:
        if ng > 3:
            log.workflow('WARNING: you are trying to run an entity task on '
                         '%d glaciers with multiprocessing turned off. OGGM '
                         'will run faster with multiprocessing turned on.', ng)
        out = [pc(gdir) for gdir in gdirs]

    return out
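
# A hedged usage sketch (not part of the original module): the different ways
# tasks and keyword arguments can be passed, as described in the docstring
# above. ``gdirs`` is assumed to be a list of GlacierDirectory objects
# prepared beforehand with init_glacier_directories().
#
#     from oggm import tasks
#     # one task, same kwargs for every glacier
#     execute_entity_task(tasks.glacier_masks, gdirs)
#     # a chain of tasks; the second one carries task-specific kwargs
#     execute_entity_task([tasks.compute_centerlines,
#                          (tasks.catchment_area, {})], gdirs)
#     # per-glacier kwargs: each item is a (gdir, dict) tuple
#     execute_entity_task(tasks.define_glacier_region,
#                         [(gd, {}) for gd in gdirs])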


def execute_parallel_tasks(gdir, tasks):
    """Execute a list of tasks on a single gdir (experimental!).

    This is useful when running a non-sequential list of tasks on a gdir,
    mostly for e.g. different experiments with different output files.

    Parameters
    ----------
    gdir : :py:class:`oggm.GlacierDirectory`
         the directory to process.
    tasks : list
         the list of entity tasks to apply.
         Optionally, each list element can be a tuple, with the first element
         being the task, and the second element a dict that
         will be passed to the task function as ``**kwargs``.
    """

    pc = _pickle_copier(None, {})

    _tasks = []
    for task in tasks:
        kwargs = {}
        if isinstance(task, Sequence):
            task, kwargs = task
        _tasks.append((task, (gdir, kwargs)))

    if _have_ogmpi:
        if ogmpi.OGGM_MPI_COMM is not None:
            ogmpi.mpi_master_spin_tasks(pc, _tasks)
            return

    if cfg.PARAMS['use_multiprocessing']:
        mppool = init_mp_pool(cfg.CONFIG_MODIFIED)
        mppool.map(pc, _tasks, chunksize=1)
    else:
        for task, (gd, kw) in _tasks:
            task(gd, **kw)
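
# A minimal sketch (illustrative only): running a non-sequential list of tasks
# on a single, already prepared ``gdir``.
#
#     from oggm import tasks
#     execute_parallel_tasks(gdir,
#                            [tasks.glacier_masks,
#                             (tasks.compute_centerlines, {})])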


def gdir_from_prepro(entity, from_prepro_level=None,
                     prepro_border=None, prepro_rgi_version=None,
                     base_url=None):

    if prepro_border is None:
        prepro_border = int(cfg.PARAMS['border'])
    if prepro_rgi_version is None:
        prepro_rgi_version = cfg.PARAMS['rgi_version']

    if isinstance(entity, pd.Series):
        try:
            rid = entity.RGIId
        except AttributeError:
            rid = entity.rgi_id
    else:
        rid = entity

    tar_base = utils.get_prepro_gdir(prepro_rgi_version, rid, prepro_border,
                                     from_prepro_level, base_url=base_url)
    from_tar = os.path.join(tar_base.replace('.tar', ''), rid + '.tar.gz')
    return oggm.GlacierDirectory(entity, from_tar=from_tar)


def gdir_from_tar(entity, from_tar):

    try:
        rgi_id = entity.RGIId
    except AttributeError:
        rgi_id = entity

    from_tar = os.path.join(from_tar, '{}'.format(rgi_id[:8]),
                            '{}.tar'.format(rgi_id[:11]))
    assert os.path.exists(from_tar), 'tarfile does not exist'
    from_tar = os.path.join(from_tar.replace('.tar', ''), rgi_id + '.tar.gz')
    return oggm.GlacierDirectory(entity, from_tar=from_tar)


def _check_rgi_input(rgidf=None, err_on_lvl2=False):
    """Complain if the input has duplicates."""

    if rgidf is None:
        return

    msg = ('You have glaciers with connectivity level 2 in your list. '
           'OGGM does not provide pre-processed directories for these.')

    # Check if dataframe or list of strs
    is_dataframe = isinstance(rgidf, pd.DataFrame)
    if is_dataframe:
        try:
            rgi_ids = rgidf.RGIId
            # if dataframe we can also check for connectivity
            if 'Connect' in rgidf and np.any(rgidf['Connect'] == 2):
                if err_on_lvl2:
                    raise RuntimeError(msg)
        except AttributeError:
            # RGI7
            rgi_ids = rgidf.rgi_id
    else:
        rgi_ids = utils.tolist(rgidf)
        # Check for connectivity level 2 here as well
        not_good_ids = pd.read_csv(utils.get_demo_file('rgi6_ids_conn_lvl2.csv'),
                                   index_col=0)
        try:
            if err_on_lvl2 and len(not_good_ids.loc[rgi_ids]) > 0:
                raise RuntimeError(msg)
        except KeyError:
            # We're good
            pass

    u, c = np.unique(rgi_ids, return_counts=True)
    if len(u) < len(rgi_ids):
        raise InvalidWorkflowError('Found duplicates in the list of '
                                   'RGI IDs: {}'.format(u[c > 1]))


def _isdir(path):
    """os.path.isdir, returning False instead of an error on non-string/path-like objects
    """
    try:
        return os.path.isdir(path)
    except TypeError:
        return False


def init_glacier_directories(rgidf=None, *, reset=False, force=False,
                             from_prepro_level=None, prepro_border=None,
                             prepro_rgi_version=None, prepro_base_url=None,
                             from_tar=False, delete_tar=False):
    """Initializes the list of Glacier Directories for this run.

    This is the very first task to do (always). If the directories are already
    available in the working directory, use them. If not, create new ones.

    **Careful**: when starting from a pre-processed directory with
    `from_prepro_level` or `from_tar`, the existing directories will be overwritten!

    Parameters
    ----------
    rgidf : GeoDataFrame or list of ids, optional for pre-computed runs
        the RGI glacier outlines. If unavailable, OGGM will parse the
        information from the glacier directories found in the working
        directory. It is required for new runs.
    reset : bool
        delete the existing glacier directories if found.
    force : bool
        setting `reset=True` will trigger a yes/no question to the user. Set
        `force=True` to avoid this.
    from_prepro_level : int
        get the gdir data from the official pre-processed pool. If this
        argument is set, the existing directories will be overwritten!
    prepro_border : int
        for `from_prepro_level` only: if you want to override the default
        behavior which is to use `cfg.PARAMS['border']`
    prepro_rgi_version : str
        for `from_prepro_level` only: if you want to override the default
        behavior which is to use `cfg.PARAMS['rgi_version']`
    prepro_base_url : str
        for `from_prepro_level` only: the preprocessed directory url from
        which to download the directories (became mandatory in OGGM v1.6)
    from_tar : bool or str, default=False
        extract the gdir data from a tar file. If set to `True`,
        will check for a tar file at the expected location in `base_dir`.
        If this argument is set, the existing directories will be overwritten!
    delete_tar : bool, default=False
        delete the original tar file after extraction.

    Returns
    -------
    gdirs : list of :py:class:`oggm.GlacierDirectory` objects
        the initialised glacier directories
    """

    _check_rgi_input(rgidf, err_on_lvl2=from_prepro_level)

    if reset and not force:
        reset = utils.query_yes_no('Delete all glacier directories?')

    if from_prepro_level:
        url = utils.get_prepro_base_url(base_url=prepro_base_url,
                                        border=prepro_border,
                                        prepro_level=from_prepro_level,
                                        rgi_version=prepro_rgi_version)
        if cfg.PARAMS['has_internet'] and not utils.url_exists(url):
            raise InvalidParamsError("base url seems unreachable with these "
                                     "parameters: {}".format(url))
        if ('oggm_v1.4' in url and
                from_prepro_level >= 3 and
                not cfg.PARAMS['prcp_fac']):
            log.warning('You seem to be using v1.4 directories with a more '
                        'recent version of OGGM. While this is possible, be '
                        'aware that some defaults parameters have changed. '
                        'See the documentation for details: '
                        'http://docs.oggm.org/en/stable/whats-new.html')

    # if reset delete also the log directory
    if reset:
        fpath = os.path.join(cfg.PATHS['working_dir'], 'log')
        if os.path.exists(fpath):
            shutil.rmtree(fpath)

    if rgidf is None:
        # Infer the glacier directories from folders available in working dir
        if reset:
            raise ValueError('Cannot use reset without setting rgidf')
        log.workflow('init_glacier_directories by parsing all available '
                     'folders (this takes time: if possible, provide rgidf '
                     'instead).')
        # The dirs should be there already
        gl_dir = os.path.join(cfg.PATHS['working_dir'], 'per_glacier')
        gdirs = []
        for root, _, files in os.walk(gl_dir):
            if files and ('outlines.shp' in files or
                          'outlines.tar.gz' in files):
                gdirs.append(oggm.GlacierDirectory(os.path.basename(root)))
    else:
        # Create glacier directories from input
        # Check if dataframe or list of str
        try:
            entities = []
            for _, entity in rgidf.iterrows():
                entities.append(entity)
        except AttributeError:
            entities = utils.tolist(rgidf)

        if from_prepro_level is not None:
            log.workflow('init_glacier_directories from prepro level {} on '
                         '{} glaciers.'.format(from_prepro_level,
                                               len(entities)))
            # Read the hash dictionary before we use multiproc
            if cfg.PARAMS['dl_verify']:
                utils.get_dl_verify_data('cluster.klima.uni-bremen.de')
            gdirs = execute_entity_task(gdir_from_prepro, entities,
                                        from_prepro_level=from_prepro_level,
                                        prepro_border=prepro_border,
                                        prepro_rgi_version=prepro_rgi_version,
                                        base_url=prepro_base_url)
        else:
            # We can set the intersects file automatically here
            if (cfg.PARAMS['use_intersects'] and
                    len(cfg.PARAMS['intersects_gdf']) == 0 and
                    not from_tar):
                try:
                    rgi_ids = np.unique(np.sort([entity.rgi_id for entity in
                                                 entities]))
                    if len(rgi_ids[0]) == 23:
                        # RGI7
                        assert rgi_ids[0].split('-')[1] == 'v7.0'
                        if rgi_ids[0].split('-')[2] == 'C':
                            # No need for intersects
                            fp = []
                            rgi_version = '70C'
                        else:
                            rgi_version = '70G'
                            fp = utils.get_rgi_intersects_entities(rgi_ids,
                                                                   version=rgi_version)

                    else:
                        rgi_version = rgi_ids[0].split('-')[0][-2:]
                        if rgi_version == '60':
                            rgi_version = '62'
                        fp = utils.get_rgi_intersects_entities(rgi_ids,
                                                               version=rgi_version)
                    cfg.set_intersects_db(fp)
                except AttributeError:
                    # RGI V6
                    try:
                        rgi_ids = np.unique(np.sort([entity.RGIId for entity in
                                                     entities]))
                        rgi_version = rgi_ids[0].split('-')[0][-2:]
                        if rgi_version == '60':
                            rgi_version = '62'
                        fp = utils.get_rgi_intersects_entities(rgi_ids,
                                                               version=rgi_version)
                        cfg.set_intersects_db(fp)
                    except AttributeError:
                        # List of str
                        pass

            if _isdir(from_tar):
                gdirs = execute_entity_task(gdir_from_tar, entities,
                                            from_tar=from_tar)
            else:
                gdirs = execute_entity_task(utils.GlacierDirectory, entities,
                                            reset=reset,
                                            from_tar=from_tar,
                                            delete_tar=delete_tar)

    return gdirs
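
# A hedged usage sketch (not part of the original module): typical
# initialisation from the pre-processed directories. The base url is left as a
# placeholder; use the one documented for your OGGM version.
#
#     from oggm import cfg, workflow
#     cfg.initialize()
#     cfg.PATHS['working_dir'] = '/path/to/wdir'
#     gdirs = workflow.init_glacier_directories(
#         ['RGI60-11.00897'], from_prepro_level=3, prepro_border=80,
#         prepro_base_url='https://...')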


@global_task(log)
def gis_prepro_tasks(gdirs):
    """Run all flowline preprocessing tasks on a list of glaciers.

    Parameters
    ----------
    gdirs : list of :py:class:`oggm.GlacierDirectory` objects
        the glacier directories to process
    """

    task_list = [
        tasks.define_glacier_region,
        tasks.glacier_masks,
        tasks.compute_centerlines,
        tasks.initialize_flowlines,
        tasks.compute_downstream_line,
        tasks.compute_downstream_bedshape,
        tasks.catchment_area,
        tasks.catchment_intersections,
        tasks.catchment_width_geom,
        tasks.catchment_width_correction
    ]
    for task in task_list:
        execute_entity_task(task, gdirs)


@global_task(log)
def climate_tasks(gdirs, settings_filesuffix='', input_filesuffix=None,
                  overwrite_gdir=False, override_missing=None):
    """Run all climate related entity tasks on a list of glaciers.

    Parameters
    ----------
    gdirs : list of :py:class:`oggm.GlacierDirectory` objects
        the glacier directories to process
    input_filesuffix: str
        the filesuffix of the input inversion flowlines which should be used
        (useful for conducting multiple experiments in the same gdir)
    """

    # Process climate data
    execute_entity_task(tasks.process_climate_data, gdirs,
                        settings_filesuffix=settings_filesuffix)
    # mass balance and the apparent mass balance
    execute_entity_task(tasks.mb_calibration_from_hugonnet_mb, gdirs,
                        settings_filesuffix=settings_filesuffix,
                        override_missing=override_missing,
                        overwrite_gdir=overwrite_gdir)
    execute_entity_task(tasks.apparent_mb_from_any_mb, gdirs,
                        settings_filesuffix=settings_filesuffix,
                        input_filesuffix=input_filesuffix)
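
# A hedged sketch of how these shortcut global tasks are typically chained
# (illustrative only; assumes gdirs were created with
# init_glacier_directories and that default settings are used):
#
#     gis_prepro_tasks(gdirs)
#     climate_tasks(gdirs)
#     inversion_tasks(gdirs)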


@global_task(log)
def inversion_tasks(gdirs, settings_filesuffix='', input_filesuffix=None,
                    output_filesuffix=None,
                    glen_a=None, fs=None, filter_inversion_output=True,
                    add_to_log_file=True):
    """Run all ice thickness inversion tasks on a list of glaciers.

    Quite useful to deal with calving glaciers as well.

    Parameters
    ----------
    gdirs : list of :py:class:`oggm.GlacierDirectory` objects
        the glacier directories to process
    settings_filesuffix: str
        You can use a different set of settings by providing a filesuffix. This
        is useful for sensitivity experiments.
    input_filesuffix: str
        The filesuffix of the input inversion flowlines. If None the
        settings_filesuffix will be used.
    output_filesuffix: str
        The filesuffix used for saving resulting inversion files to the gdir. If
        None the settings_filesuffix will be used.
    add_to_log_file : bool
        if the called entity tasks should write into the log of the gdir. Default True
    """

    if input_filesuffix is None:
        input_filesuffix = settings_filesuffix

    if output_filesuffix is None:
        output_filesuffix = settings_filesuffix

    # We use the settings of the first gdir for defining general parameters
    gdirs[0].settings_filesuffix = settings_filesuffix

    if gdirs[0].settings['use_kcalving_for_inversion']:
        # Differentiate between calving and non-calving glaciers
        gdirs_nc = []
        gdirs_c = []
        for gd in gdirs:
            if gd.is_tidewater:
                gdirs_c.append(gd)
            else:
                gdirs_nc.append(gd)

        log.workflow('Starting inversion tasks for {} tidewater and {} '
                     'non-tidewater glaciers.'.format(len(gdirs_c),
                                                      len(gdirs_nc)))

        if gdirs_nc:
            execute_entity_task(tasks.prepare_for_inversion, gdirs_nc,
                                settings_filesuffix=settings_filesuffix,
                                # only use input_filesuffix for the first task,
                                # as subsequent tasks use the results of
                                # previous tasks
                                input_filesuffix=input_filesuffix,
                                output_filesuffix=output_filesuffix,
                                add_to_log_file=add_to_log_file)
            execute_entity_task(tasks.mass_conservation_inversion, gdirs_nc,
                                settings_filesuffix=settings_filesuffix,
                                input_filesuffix=output_filesuffix,
                                output_filesuffix=output_filesuffix,
                                glen_a=glen_a, fs=fs,
                                add_to_log_file=add_to_log_file)
            if filter_inversion_output:
                execute_entity_task(tasks.filter_inversion_output, gdirs_nc,
                                    settings_filesuffix=settings_filesuffix,
                                    input_filesuffix=output_filesuffix,
                                    output_filesuffix=output_filesuffix,
                                    add_to_log_file=add_to_log_file)

        if gdirs_c:
            execute_entity_task(tasks.find_inversion_calving_from_any_mb,
                                gdirs_c,
                                settings_filesuffix=settings_filesuffix,
                                input_filesuffix=output_filesuffix,
                                output_filesuffix=output_filesuffix,
                                glen_a=glen_a, fs=fs,
                                add_to_log_file=add_to_log_file)
    else:
        execute_entity_task(tasks.prepare_for_inversion, gdirs,
                            settings_filesuffix=settings_filesuffix,
                            # only use input_filesuffix for the first task,
                            # as subsequent tasks use the results of
                            # previous tasks
                            input_filesuffix=input_filesuffix,
                            output_filesuffix=output_filesuffix,
                            add_to_log_file=add_to_log_file)
        execute_entity_task(tasks.mass_conservation_inversion, gdirs,
                            settings_filesuffix=settings_filesuffix,
                            input_filesuffix=output_filesuffix,
                            output_filesuffix=output_filesuffix,
                            glen_a=glen_a, fs=fs,
                            add_to_log_file=add_to_log_file)
        if filter_inversion_output:
            execute_entity_task(tasks.filter_inversion_output, gdirs,
                                settings_filesuffix=settings_filesuffix,
                                input_filesuffix=output_filesuffix,
                                output_filesuffix=output_filesuffix,
                                add_to_log_file=add_to_log_file)
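
# A minimal sketch (illustrative only): rerun the inversion with a scaled
# Glen A and store the results under a hypothetical filesuffix so that the
# default results are not overwritten.
#
#     gdirs[0].settings_filesuffix = ''
#     def_a = gdirs[0].settings['inversion_glen_a']
#     inversion_tasks(gdirs, glen_a=2 * def_a,
#                     output_filesuffix='_glen_a_x2')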


@global_task(log)
def calibrate_inversion_from_volume(gdirs, settings_filesuffix='',
                                    observations_filesuffix='',
                                    overwrite_observations=False,
                                    ref_volume_m3=None,
                                    ref_volume_year=None,
                                    rgi_ids_in_ref_volume=None,
                                    input_filesuffix=None,
                                    output_filesuffix=None,
                                    fs=0, a_bounds=(0.1, 10),
                                    apply_fs_on_mismatch=False,
                                    error_on_mismatch=True,
                                    filter_inversion_output=True,
                                    add_to_log_file=True):
    """Fit the total volume of the glaciers to the provided estimate.

    This method finds the "best Glen A" to match all glaciers in gdirs with
    a valid inverted volume.

    Parameters
    ----------
    gdirs : list of :py:class:`oggm.GlacierDirectory` objects
        the glacier directories to process
    settings_filesuffix: str
        You can use a different set of settings by providing a filesuffix. This
        is useful for sensitivity experiments.
    observations_filesuffix: str
        You can provide a filesuffix for the reference volume to use. If you
        provide ref_volume_m3, then this value will be stored in the
        observations file, if ref_volume_m3 is not already present. If you want
        to force the use of the provided value and override the current one,
        set overwrite_observations to True.
    overwrite_observations : bool
        If you want to overwrite already existing observation values in the
        provided observations file set this to True. Default is False.
    ref_volume_m3 : float
        Option to give your own total glacier volume to match to
    ref_volume_year : int or None
        The year when the reference volume is valid. If None the RGI date is
        used.
    rgi_ids_in_ref_volume : list or None
        Use this if the reference volume is only valid for part of the provided
        gdirs, e.g. because some glaciers do not have a volume estimate
        available. The inversion is still performed on all gdirs in the end.
        If None all gdirs are used. Default is None.
    input_filesuffix: str
        The filesuffix of the input inversion flowlines. If None the
        settings_filesuffix will be used.
    output_filesuffix: str
        The filesuffix used for saving resulting inversion files to the gdir. If
        None the settings_filesuffix will be used.
    fs : float
        invert with sliding (default: no)
    a_bounds: tuple
        factor to apply to default A
    apply_fs_on_mismatch: false
        on mismatch, try to apply an arbitrary value of fs (fs = 5.7e-20 from
        Oerlemans) and try to optimize A again.
    error_on_mismatch: bool
        sometimes the given bounds do not allow to find a zero mismatch:
        this will normally raise an error, but you can switch this off,
        use the closest value instead and move on.
    filter_inversion_output : bool
        whether or not to apply terminus thickness filtering on the inversion
        output (needs the downstream lines to work).
    add_to_log_file : bool
        if the called entity tasks should write into the log of the gdir. Default True

    Returns
    -------
    a dataframe with the individual glacier volumes
    """

    if input_filesuffix is None:
        input_filesuffix = settings_filesuffix

    if output_filesuffix is None:
        output_filesuffix = settings_filesuffix

    gdirs = utils.tolist(gdirs)

    for gdir in gdirs:
        gdir.observations_filesuffix = observations_filesuffix

    # check if the reference volume refers to all gdirs
    if rgi_ids_in_ref_volume is not None:
        gdirs_use = [gdir for gdir in gdirs
                     if gdir.rgi_id in rgi_ids_in_ref_volume]
    else:
        gdirs_use = gdirs

    if any('ref_volume_m3' in gdir.observations for gdir in gdirs):
        ref_volume_m3_file = sum([gdir.observations['ref_volume_m3']['value']
                                  for gdir in gdirs_use])
        if ref_volume_m3 is None:
            # if no reference volume is provided use the one from the obs-file
            ref_volume_m3 = ref_volume_m3_file
        elif np.isclose(ref_volume_m3_file, ref_volume_m3, rtol=1e-2):
            # ok the provided ref volume is the same as stored in the obs-file
            pass
        elif not overwrite_observations:
            raise InvalidWorkflowError(
                'You have provided a reference volume, but there is already '
                'one stored in the current observations file (filesuffix = '
                f'{observations_filesuffix})! If you want to overwrite set '
                f'overwrite_observations = True.')
        else:
            for gdir in gdirs:
                if 'ref_volume_m3' in gdir.observations:
                    gdir.observations['ref_volume_m3']['value'] = None

    # Optimize the diff to ref, using the settings of the first gdir
    gdirs_use[0].settings_filesuffix = settings_filesuffix
    def_a = gdirs_use[0].settings['inversion_glen_a']

    # create a container to store the individual modelled glacier volumes
    rids = [gdir.rgi_id for gdir in gdirs_use]
    df = pd.DataFrame(index=rids)

    def compute_vol(x):
        inversion_tasks(gdirs_use, settings_filesuffix=settings_filesuffix,
                        input_filesuffix=input_filesuffix,
                        output_filesuffix=output_filesuffix,
                        glen_a=x*def_a, fs=fs,
                        filter_inversion_output=filter_inversion_output,
                        add_to_log_file=add_to_log_file)
        odf = df.copy()
        odf['oggm'] = execute_entity_task(tasks.get_inversion_volume, gdirs_use,
                                          input_filesuffix=output_filesuffix,
                                          add_to_log_file=add_to_log_file)
        return odf

    def to_minimize(x):
        log.workflow('Volume estimate optimisation with '
                     'A factor: {} and fs: {}'.format(x, fs))
        odf = compute_vol(x)
        return ref_volume_m3 - odf.oggm.sum()

    try:
        out_fac, r = optimization.brentq(to_minimize, *a_bounds, rtol=1e-2,
                                         full_output=True)
        if r.converged:
            log.workflow('calibrate_inversion_from_volume '
                         'converged after {} iterations and fs={}. The '
                         'resulting Glen A factor is {}.'
                         ''.format(r.iterations, fs, out_fac))
        else:
            raise ValueError('Unexpected error in optimization.brentq')
    except ValueError:
        # Ok can't find an A. Log for debug:
        odf1 = compute_vol(a_bounds[0]).sum() * 1e-9
        odf2 = compute_vol(a_bounds[1]).sum() * 1e-9
        ref_vol_1 = ref_volume_m3 * 1e-9
        ref_vol_2 = ref_volume_m3 * 1e-9
        msg = ('calibration from volume estimate CAN\'T converge with fs={}.\n'
               'Bound values (km3):\nRef={:.3f} OGGM={:.3f} for A factor {}\n'
               'Ref={:.3f} OGGM={:.3f} for A factor {}'
               ''.format(fs,
                         ref_vol_1, odf1.oggm, a_bounds[0],
                         ref_vol_2, odf2.oggm, a_bounds[1]))
        if apply_fs_on_mismatch and fs == 0 and odf2.oggm > ref_vol_2:
            do_filter = filter_inversion_output
            return calibrate_inversion_from_volume(gdirs,
                                                   settings_filesuffix=settings_filesuffix,
                                                   observations_filesuffix=observations_filesuffix,
                                                   overwrite_observations=overwrite_observations,
                                                   ref_volume_m3=ref_volume_m3,
                                                   rgi_ids_in_ref_volume=rgi_ids_in_ref_volume,
                                                   input_filesuffix=input_filesuffix,
                                                   output_filesuffix=output_filesuffix,
                                                   fs=5.7e-20, a_bounds=a_bounds,
                                                   apply_fs_on_mismatch=False,
                                                   error_on_mismatch=error_on_mismatch,
                                                   filter_inversion_output=do_filter,
                                                   add_to_log_file=add_to_log_file)
        if error_on_mismatch:
            raise ValueError(msg)

        out_fac = a_bounds[int(abs(ref_vol_1 - odf1.oggm) >
                               abs(ref_vol_2 - odf2.oggm))]
        log.workflow(msg)
        log.workflow('We use A factor = {} and fs = {} and move on.'
                     ''.format(out_fac, fs))

    # Compute the final volume with the correct A for all gdirs
    rids = [gdir.rgi_id for gdir in gdirs]
    df = pd.DataFrame(index=rids)
    inversion_tasks(gdirs, settings_filesuffix=settings_filesuffix,
                    input_filesuffix=input_filesuffix,
                    output_filesuffix=output_filesuffix,
                    glen_a=out_fac*def_a, fs=fs,
                    filter_inversion_output=filter_inversion_output,
                    add_to_log_file=add_to_log_file)
    df['vol_oggm_m3'] = execute_entity_task(tasks.get_inversion_volume, gdirs,
                                            input_filesuffix=output_filesuffix,
                                            add_to_log_file=add_to_log_file)
    # add the actually derived volume to the observations file
    for gdir in gdirs:
        vol_single = df.vol_oggm_m3.loc[gdir.rgi_id]
        if ref_volume_year is None:
            year_single = gdir.rgi_date
        else:
            year_single = ref_volume_year
        if 'ref_volume_m3' in gdir.observations:
            current_vol = gdir.observations['ref_volume_m3']
            current_vol['value'] = vol_single
        else:
            current_vol = {'value': vol_single}
        current_vol['year'] = year_single
        gdir.observations['ref_volume_m3'] = current_vol

    return df
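
# A hedged usage sketch (not part of the original module): calibrate against a
# user-provided regional volume estimate. The volume value is hypothetical.
#
#     df = calibrate_inversion_from_volume(gdirs,
#                                          ref_volume_m3=1.5e9,
#                                          ref_volume_year=2020,
#                                          apply_fs_on_mismatch=True)
#     total_oggm_km3 = df['vol_oggm_m3'].sum() * 1e-9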


@global_task(log)
def calibrate_inversion_from_consensus(gdirs, settings_filesuffix='',
                                       observations_filesuffix='',
                                       overwrite_observations=False,
                                       input_filesuffix=None,
                                       output_filesuffix=None,
                                       ignore_missing=True,
                                       fs=0, a_bounds=(0.1, 10),
                                       apply_fs_on_mismatch=False,
                                       error_on_mismatch=True,
                                       filter_inversion_output=True,
                                       add_to_log_file=True):
    """Fit the total volume of the glaciers to the 2019 consensus estimate.

    This method finds the "best Glen A" to match all glaciers in gdirs with
    a valid inverted volume.

    Parameters
    ----------
    gdirs : list of :py:class:`oggm.GlacierDirectory` objects
        the glacier directories to process
    settings_filesuffix: str
        You can use a different set of settings by providing a filesuffix. This
        is useful for sensitivity experiments.
    observations_filesuffix: str
        You can provide a filesuffix where the reference volume used will be
        stored.
    overwrite_observations : bool
        If you want to overwrite already existing observation values in the
        provided observations file set this to True. Default is False.
    input_filesuffix: str
        The filesuffix of the input inversion flowlines. If None the
        settings_filesuffix will be used.
    output_filesuffix: str
        The filesuffix used for saving resulting inversion files to the gdir. If
        None the settings_filesuffix will be used.
    ignore_missing : bool
        set this to true to silence the error if some glaciers could not be
        found in the consensus estimate.
    fs : float
        invert with sliding (default: no)
    a_bounds: tuple
        factor to apply to default A
    apply_fs_on_mismatch: false
        on mismatch, try to apply an arbitrary value of fs (fs = 5.7e-20 from
        Oerlemans) and try to optimize A again.
    error_on_mismatch: bool
        sometimes the given bounds do not allow to find a zero mismatch:
        this will normally raise an error, but you can switch this off,
        use the closest value instead and move on.
    filter_inversion_output : bool
        whether or not to apply terminus thickness filtering on the inversion
        output (needs the downstream lines to work).
    add_to_log_file : bool
        if the called entity tasks should write into the log of the gdir. Default True

    Returns
    -------
    a dataframe with the individual glacier volumes
    """

    if input_filesuffix is None:
        input_filesuffix = settings_filesuffix

    if output_filesuffix is None:
        output_filesuffix = settings_filesuffix

    gdirs = utils.tolist(gdirs)

    # Get the ref data for the glaciers we have
    df = pd.read_hdf(utils.get_demo_file('rgi62_itmix_df.h5'))
    rids = [gdir.rgi_id for gdir in gdirs]

    found_ids = df.index.intersection(rids)
    if not ignore_missing and (len(found_ids) != len(rids)):
        raise InvalidWorkflowError('Could not find matching indices in the '
                                   'consensus estimate for all provided '
                                   'glaciers. Set ignore_missing=True to '
                                   'ignore this error.')

    df = df.reindex(rids)

    # only use those glaciers which have an itmix estimate
    odf = df.copy().dropna()
    ref_volume_m3 = odf.vol_itmix_m3.sum()
    rgi_ids_in_ref_volume = list(odf.index)

    # do the calibration
    df_oggm = calibrate_inversion_from_volume(
        gdirs, settings_filesuffix=settings_filesuffix,
        observations_filesuffix=observations_filesuffix,
        overwrite_observations=overwrite_observations,
        ref_volume_m3=ref_volume_m3,
        rgi_ids_in_ref_volume=rgi_ids_in_ref_volume,
        input_filesuffix=input_filesuffix,
        output_filesuffix=output_filesuffix,
        fs=fs, a_bounds=a_bounds,
        apply_fs_on_mismatch=apply_fs_on_mismatch,
        error_on_mismatch=error_on_mismatch,
        filter_inversion_output=filter_inversion_output,
        add_to_log_file=add_to_log_file)

    # add the oggm volume to the dataframe which will be returned
    df['vol_oggm_m3'] = df_oggm['vol_oggm_m3']

    # add the original estimates to the observations file
    for gdir in gdirs:
        gdir.observations_filesuffix = observations_filesuffix
        ref_volume_single = gdir.observations['ref_volume_m3']
        for col in df.columns:
            if col != 'vol_oggm_m3':
                ref_volume_single[col] = df[col].loc[gdir.rgi_id]
        gdir.observations['ref_volume_m3'] = ref_volume_single
    return df
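
# A minimal usage sketch (illustrative only): calibrate the default Glen A to
# the 2019 consensus ice thickness estimate bundled with OGGM's demo data, then
# compare the totals.
#
#     df = calibrate_inversion_from_consensus(gdirs,
#                                             apply_fs_on_mismatch=True)
#     print(df[['vol_itmix_m3', 'vol_oggm_m3']].sum())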


@global_task(log)
def merge_glacier_tasks(gdirs, settings_filesuffix='',
                        main_rgi_id=None, return_all=False, buffer=None,
                        **kwargs):
    """Shortcut function: run all tasks to merge tributaries to a main glacier

    Parameters
    ----------
    gdirs : list of :py:class:`oggm.GlacierDirectory`
        all glaciers, main and tributary. Preprocessed and initialised
    main_rgi_id: str
        RGI ID of the main glacier of interest. If None is provided merging
        will start based upon the largest glacier
    return_all : bool
        if main_rgi_id is given and return_all = False: only the main glacier
        is returned
        if main_rgi_id is given and return_all = True: the main glacier and every
        remaining glacier from the initial gdirs list is returned, possibly
        merged as well.
    buffer : float
        buffer around a flowline to first better find an overlap with another
        flowline. And second assure some distance between the lines at a
        junction. Will default to `cfg.PARAMS['kbuffer']`.
    kwargs: keyword argument for the recursive merging

    Returns
    -------
    merged_gdirs: list of all merged :py:class:`oggm.GlacierDirectory`
    """

    if len(gdirs) > 100:
        raise InvalidParamsError('this could take time! I should include an '
                                 'optional parameter to ignore this.')

    # sort all glaciers descending by area
    gdirs.sort(key=lambda x: x.rgi_area_m2, reverse=True)

    # if a main glacier is asked for, put it in first position
    if main_rgi_id is not None:
        gdir_main = [gd for gd in gdirs if gd.rgi_id == main_rgi_id][0]
        gdirs.remove(gdir_main)
        gdirs = [gdir_main] + gdirs

    merged_gdirs = []
    while len(gdirs) > 1:
        # main glacier is always the first: either given or the largest one
        gdir_main = gdirs.pop(0)
        gdir_merged, gdirs = _recursive_merging(gdirs, gdir_main, **kwargs)
        merged_gdirs.append(gdir_merged)

    # now we have gdirs which contain all the necessary flowlines,
    # time to clean them up
    for gdir in merged_gdirs:
        flowline.clean_merged_flowlines(
            gdir, settings_filesuffix=settings_filesuffix, buffer=buffer)

    if main_rgi_id is not None and return_all is False:
        return [gd for gd in merged_gdirs if main_rgi_id in gd.rgi_id][0]

    # add the remaining glaciers to the final list
    merged_gdirs = merged_gdirs + gdirs

    return merged_gdirs
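
# A hedged sketch (illustrative only): merge the tributaries of one main
# glacier. The RGI id is a placeholder for the glacier of interest.
#
#     gdir_merged = merge_glacier_tasks(gdirs,
#                                       main_rgi_id='RGI60-11.00897',
#                                       return_all=False)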


def _recursive_merging(gdirs, gdir_main, glcdf=None, dem_source=None,
                       filename='climate_historical', input_filesuffix=''):
    """Recursive function to merge all tributary glaciers.

    This function should start with the largest glacier and then be called
    upon all smaller glaciers.

    Parameters
    ----------
    gdirs : list of :py:class:`oggm.GlacierDirectory`
        all glaciers, main and tributary. Preprocessed and initialised
    gdir_main: :py:class:`oggm.GlacierDirectory`
        the current main glacier which the others are merged into
    glcdf: geopandas.GeoDataFrame
        which contains the main glaciers, will be downloaded if None
    filename: str
        Baseline climate file
    dem_source: str
        the DEM source to use
    input_filesuffix: str
        Filesuffix to the climate file

    Returns
    -------
    merged_gdir: :py:class:`oggm.GlacierDirectory`
        the merged current main glacier
    gdirs : list of :py:class:`oggm.GlacierDirectory`
        updated list of glaciers, with the already merged ones removed
    """
    # find glaciers which intersect with the main one
    tributaries = centerlines.intersect_downstream_lines(gdir_main,
                                                         candidates=gdirs)
    if len(tributaries) == 0:
        # if no tributaries: nothing to do
        return gdir_main, gdirs

    # separate those glaciers which are not already found to be a tributary
    gdirs = [gd for gd in gdirs if gd not in tributaries]

    gdirs_to_merge = []

    for trib in tributaries:
        # for each tributary: check if we can merge additional glaciers to it
        merged, gdirs = _recursive_merging(gdirs, trib, glcdf=glcdf,
                                           filename=filename,
                                           input_filesuffix=input_filesuffix,
                                           dem_source=dem_source)
        gdirs_to_merge.append(merged)

    # create merged glacier directory
    gdir_merged = utils.initialize_merged_gdir(
        gdir_main, tribs=gdirs_to_merge, glcdf=glcdf, filename=filename,
        input_filesuffix=input_filesuffix, dem_source=dem_source)

    flowline.merge_to_one_glacier(gdir_merged, gdirs_to_merge,
                                  filename=filename,
                                  input_filesuffix=input_filesuffix)

    return gdir_merged, gdirs
1096

1097

1098
@global_task(log)
9✔
1099
def merge_gridded_data(gdirs, output_folder=None,
9✔
1100
                       output_filename='gridded_data_merged',
1101
                       output_grid=None,
1102
                       input_file='gridded_data',
1103
                       input_filesuffix='',
1104
                       included_variables='all',
1105
                       preserve_totals=True,
1106
                       smooth_radius=None,
1107
                       use_glacier_mask=True,
1108
                       add_topography=False,
1109
                       keep_dem_file=False,
1110
                       interp='nearest',
1111
                       use_multiprocessing=True,
1112
                       return_dataset=True,
1113
                       reset=False):
1114
    """ This function takes a list of glacier directories and combines their
1115
    gridded_data into a new NetCDF file and saves it into the output_folder. It
1116
    also could merge data from different source files if you provide a list
1117
    of input_file(s) (together with a list of input_filesuffix and a list of
1118
    included_variables).
1119

1120
    Attention: You always should check the first gdir from gdirs as this
1121
    defines the projection of the resulting dataset and the data which is
1122
    merged, if included_variables is set to 'all'.
1123

1124
    Parameters
1125
    ----------
1126
    gdirs : list of :py:class:`oggm.GlacierDirectory` objects
1127
        The glacier directories which should be combined. If an additonal
1128
        dimension than x or y is given (e.g. time) we assume it has the same
1129
        length for all gdirs (we currently do not check). The first gdir in the
1130
        list serves as the template for the merged gridded_data (it defines the
1131
        used projection, if you want to merge all variables they are taken from
1132
        the input data of the first gdir).
1133
    output_folder : str
1134
        Folder where the intermediate files and the final combined gridded data
1135
        should be stored. Default is cfg.PATHS['working_dir']
1136
    output_filename : str
1137
        The name for the resulting file. Default is 'gridded_data_merged'.
1138
    output_grid : salem.gis.Grid
1139
        You can provide a custom grid on which the gridded data should be
1140
        merged on. If None, a combined grid of all gdirs will be constructed.
1141
        Default is None.
1142
    input_file : str or list
1143
        The file(s) which should be merged. If a list is provided the data of
1144
        all files is merged into the same dataset. Default is 'gridded_data'.
1145
    input_filesuffix : str or list
1146
        Potential filesuffix for the input file(s). If input_file is a list,
1147
        input_filesuffix should also be a list of the same length.
1148
        Default is ''.
1149
    included_variables : str or list or list of lists
1150
        The variable(s) which should be merged from the input_file(s). For one
1151
        variable it can be provided as str, otherwise as a list. If set to
1152
        'all' we merge everything. If input_file is a list, include_variables
1153
        should be a list of lists with the same length, where the lists define
1154
        the variables for the individual input_files. Furthermore, if you only
1155
        want to merge a subset of the variables you can define the variable as
1156
        a tuple with the first element being the variable name and the second
1157
        element being the selected coordinates as a dictionary (e.g.
1158
        ('variable', {'time': [0, 1, 2]})). Default is 'all'.
1159
    preserve_totals : bool
1160
        If True we preserve the total value of all float-variables of the
1161
        original file. The total value is defined as the sum of all grid cell
1162
        values times the area of the grid cell (e.g. preserving ice volume).
1163
        Default is True.
1164
    smooth_radius : int
1165
        pixel size of the gaussian smoothing, only used if preserve_totals is
1166
        True. Default is to use cfg.PARAMS['smooth_window'] (i.e. a size in
1167
        meters). Set to zero to suppress smoothing.
1168
    use_glacier_mask : bool
1169
        If True only the data cropped by the glacier mask is included in the
1170
        merged file. You must make sure that the variable 'glacier_mask' exists
1171
        in the input_file(s) (which is the oggm default). Default is True.
1172
    add_topography : bool or str
1173
        If True we try to add the default DEM source of the first glacier
1174
        directory of gdirs. Alternatively you could define a DEM source
1175
        directly as string. Default is False.
1176
    keep_dem_file : bool
1177
        If we add a topography to the merged gridded_data we save the DEM as
1178
        a tiff in the output_folder as an intermediate step. If keep_dem_file
1179
        is True we will keep this file, otherwise we delete it at the end.
1180
        Default is False.
1181
    interp : str
1182
        The interpolation method used by salem.Grid.map_gridded_data. Currently
1183
        available 'nearest' (default), 'linear', or 'spline'.
1184
    use_multiprocessing : bool
1185
        If True the merging is done in parallel using multiprocessing. This
1186
        could require a lot of memory. Default is True.
1187
    return_dataset : bool
1188
        If True the merged dataset is returned. Default is True.
1189
    reset : bool
1190
        If the file defined in output_filename already exists and reset is
1191
        False an error is raised. If reset is True and the file exists it is
1192
        deleted before merging. Default is False.
1193
    """
1194

1195
    # check if output_folder exists, otherwise creates it
1196
    if output_folder is None:
3✔
1197
        output_folder = cfg.PATHS['working_dir']
2✔
1198
    utils.mkdir(output_folder)
3✔
1199

1200
    # for some data we want to set zero values outside the outline to nan
1201
    # (e.g. for visualization purposes)
1202
    vars_setting_zero_to_nan = ['distributed_thickness', 'simulated_thickness',
3✔
1203
                                'consensus_ice_thickness',
1204
                                'millan_ice_thickness']
1205

1206
    # check if file already exists
1207
    fpath = os.path.join(output_folder, f'{output_filename}.nc')
3✔
1208
    if os.path.exists(fpath):
3✔
1209
        if reset:
2✔
1210
            os.remove(fpath)
2✔
1211
        else:
1212
            raise InvalidWorkflowError(f'The file {output_filename}.nc already'
×
1213
                                       f' exists in the output folder. If you '
1214
                                       f'want to replace it set reset=True!')
1215

1216
    if not isinstance(input_file, list):
3✔
1217
        input_file = [input_file]
3✔
1218
    if not isinstance(input_filesuffix, list):
3✔
1219
        input_filesuffix = [input_filesuffix]
3✔
1220
    if not isinstance(included_variables, list):
3✔
1221
        # special case if only one variable should be merged
1222
        included_variables = [included_variables]
2✔
1223
    if len(input_file) == 1:
3✔
1224
        # in the case of one input file we still convert included_variables
1225
        # into a list of lists
1226
        included_variables = [included_variables]
3✔
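    # after this normalisation, input_file and input_filesuffix are parallel
    # lists and included_variables is a list of lists, one entry per input
    # file, e.g. (illustrative values) input_file=['gridded_data'],
    # input_filesuffix=[''], included_variables=[['distributed_thickness']]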
1227

1228
    if output_grid is None:
3✔
1229
        # create a combined salem.Grid object, which serves as canvas/boundaries of
1230
        # the combined glacier region
1231
        output_grid = utils.combine_grids(gdirs)
3✔
1232

1233
    if add_topography:
3✔
1234
        # ok, let's get a DEM and add it to the final file
1235
        if isinstance(add_topography, str):
1✔
1236
            dem_source = add_topography
×
1237
            dem_gdir = None
×
1238
        else:
1239
            dem_source = None
1✔
1240
            dem_gdir = gdirs[0]
1✔
1241
        gis.get_dem_for_grid(output_grid, output_folder,
1✔
1242
                             source=dem_source, gdir=dem_gdir)
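        # this writes an intermediate dem.tif for the combined grid into
        # output_folder (deleted again below unless keep_dem_file is True)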
1243
        # unwrapped is needed to execute process_dem without the entity_task
1244
        # overhead (which would require a valid gdir)
1245
        gis.process_dem.unwrapped(gdir=None, grid=output_grid,
1✔
1246
                                  fpath=output_folder,
1247
                                  output_filename=output_filename)
1248
        if not keep_dem_file:
1✔
1249
            fpath = os.path.join(output_folder, 'dem.tif')
1✔
1250
            if os.path.exists(fpath):
1✔
1251
                os.remove(fpath)
1✔
1252

1253
            # also delete diagnostics
1254
            fpath = os.path.join(output_folder, 'dem_diagnostics.json')
1✔
1255
            if os.path.exists(fpath):
1✔
1256
                os.remove(fpath)
1✔
1257

1258
    with gis.GriddedNcdfFile(grid=output_grid, fpath=output_folder,
3✔
1259
                             basename=output_filename) as nc:
1260

1261
        # add the data of one file after another to the merged dataset
1262
        for in_file, in_filesuffix, included_var in zip(input_file,
3✔
1263
                                                        input_filesuffix,
1264
                                                        included_variables):
1265

1266
            # if we want to save all variables, take them from the first gdir
1267
            if 'all' in included_var:
3✔
1268
                with xr.open_dataset(
×
1269
                        gdirs[0].get_filepath(in_file,
1270
                                              filesuffix=in_filesuffix)) as ds:
1271
                    included_var = list(ds.data_vars)
×
1272

1273
            # add one variable after another
1274
            for var in included_var:
3✔
1275
                # check if we only want to merge a subset of the variable
1276
                if isinstance(var, tuple):
3✔
1277
                    var, slice_of_var = var
1✔
1278
                else:
1279
                    slice_of_var = None
3✔
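                # e.g. var = ('simulated_thickness', {'time': [0, 1, 2]})
                # unpacks to var = 'simulated_thickness' and
                # slice_of_var = {'time': [0, 1, 2]}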
1280

1281
                # do not merge topo variables; for this we have add_topography
1282
                if var in ['topo', 'topo_smoothed', 'topo_valid_mask']:
3✔
1283
                    continue
×
1284

1285
                # check dimensions; any dimension other than y or x is added to the file
1286
                with xr.open_dataset(
3✔
1287
                        gdirs[0].get_filepath(in_file,
1288
                                              filesuffix=in_filesuffix)) as ds:
1289
                    ds_templ = ds
3✔
1290
                dims = ds_templ[var].dims
3✔
1291

1292
                dim_lengths = []
3✔
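                # spatial dims (y, x) take their length from the output grid;
                # any other dim (e.g. time) is created once in the merged
                # file below, together with its coordinate values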
1293
                for dim in dims:
3✔
1294
                    if dim == 'y':
3✔
1295
                        dim_lengths.append(output_grid.ny)
3✔
1296
                    elif dim == 'x':
3✔
1297
                        dim_lengths.append(output_grid.nx)
3✔
1298
                    else:
1299
                        if slice_of_var is not None:
1✔
1300
                            # only keep the selected part of the variable
1301
                            if dim in slice_of_var:
1✔
1302
                                dim_var = ds_templ[var][dim].sel(
1✔
1303
                                    {dim: slice_of_var[dim]})
1304
                            else:
1305
                                dim_var = ds_templ[var][dim]
×
1306
                        else:
1307
                            dim_var = ds_templ[var][dim]
×
1308
                        if dim not in nc.dimensions:
1✔
1309
                            nc.createDimension(dim, len(dim_var))
1✔
1310
                            v = nc.createVariable(dim, 'f4', (dim,), zlib=True)
1✔
1311
                            # add attributes
1312
                            for attr in dim_var.attrs:
1✔
1313
                                setattr(v, attr, dim_var.attrs[attr])
×
1314
                            if slice_of_var is not None:
1✔
1315
                                if dim in slice_of_var:
1✔
1316
                                    v[:] = slice_of_var[dim]
1✔
1317
                                else:
1318
                                    v[:] = dim_var.values
×
1319
                            else:
1320
                                v[:] = dim_var.values
×
1321
                            # also add potential coords (e.g. calendar_year)
1322
                            for coord in dim_var.coords:
1✔
1323
                                if coord != dim:
1✔
1324
                                    if slice_of_var is not None:
1✔
1325
                                        if dim in slice_of_var:
1✔
1326
                                            coord_val = ds_templ[coord].sel(
1✔
1327
                                                {dim: slice_of_var[dim]}).values
1328
                                        else:
1329
                                            coord_val = ds_templ[coord].values
×
1330
                                    else:
1331
                                        coord_val = ds_templ[coord].values
×
1332
                                    tmp_coord = nc.createVariable(
1✔
1333
                                        coord, 'f4', (dim,))
1334
                                    tmp_coord[:] = coord_val
1✔
1335
                                    for attr in ds_templ[coord].attrs:
1✔
1336
                                        setattr(tmp_coord, attr,
×
1337
                                                ds_templ[coord].attrs[attr])
1338
                        dim_lengths.append(len(dim_var))
1✔
1339

1340
                # before merging, add variable attributes to the final file
1341
                v = nc.createVariable(var, 'f4', dims, zlib=True)
3✔
1342
                for attr in ds_templ[var].attrs:
3✔
1343
                    setattr(v, attr, ds_templ[var].attrs[attr])
3✔
1344

1345
                kwargs_reproject = dict(
3✔
1346
                    variable=var,
1347
                    target_grid=output_grid,
1348
                    filename=in_file,
1349
                    filesuffix=in_filesuffix,
1350
                    use_glacier_mask=use_glacier_mask,
1351
                    interp=interp,
1352
                    preserve_totals=preserve_totals,
1353
                    smooth_radius=smooth_radius,
1354
                    slice_of_variable=slice_of_var,
1355
                )
1356

1357
                if use_multiprocessing:
3✔
1358
                    r_data = execute_entity_task(
3✔
1359
                        gis.reproject_gridded_data_variable_to_grid,
1360
                        gdirs,
1361
                        **kwargs_reproject
1362
                    )
1363

1364
                    # if we continue_on_error and there was a file or a variable
1365
                    # missing, some entries could be None; here we filter them out
1366
                    r_data = list(filter(lambda e: e is not None, r_data))
3✔
1367

1368
                    r_data = np.sum(r_data, axis=0)
3✔
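                    # summing the per-glacier arrays onto the common grid
                    # assumes the reprojected data is zero outside each
                    # glacier (see use_glacier_mask)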
1369
                    if var in vars_setting_zero_to_nan:
3✔
1370
                        r_data = np.where(r_data == 0, np.nan, r_data)
3✔
1371

1372
                    v[:] = r_data
3✔
1373
                else:
1374
                    # if we do not use multiprocessing we have to loop over the
1375
                    # gdirs and add the data one after another
1376
                    r_data = np.zeros(dim_lengths)
1✔
1377
                    for gdir in gdirs:
1✔
1378
                        tmp_data = gis.reproject_gridded_data_variable_to_grid(
1✔
1379
                            gdir, **kwargs_reproject)
1380
                        if tmp_data is not None:
1✔
1381
                            r_data += tmp_data
1✔
1382

1383
                    if var in vars_setting_zero_to_nan:
1✔
1384
                        r_data = np.where(r_data == 0, np.nan, r_data)
1✔
1385

1386
                    v[:] = r_data
1✔
1387

1388
        # add some metadata to the merged dataset
1389
        nc.nr_of_merged_glaciers = len(gdirs)
3✔
1390
        nc.rgi_ids = [gdir.rgi_id for gdir in gdirs]
3✔
1391

1392
    # finally, restore potential additional time coordinates (written to the
    # netCDF file as plain data variables) as coordinates again
1393
    fp = os.path.join(output_folder, output_filename + '.nc')
3✔
1394
    ds_was_adapted = False
3✔
1395

1396
    with xr.open_dataset(fp) as ds:
3✔
1397
        for time_var in ['calendar_year', 'calendar_month',
3✔
1398
                         'hydro_year', 'hydro_month']:
1399
            if time_var in ds.data_vars:
3✔
1400
                ds = ds.set_coords(time_var)
1✔
1401
                ds_was_adapted = True
1✔
1402
        ds_adapted = ds.load()
3✔
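        # load into memory so the file handle can be closed before we
        # (potentially) overwrite the same file below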
1403

1404
    if ds_was_adapted:
3✔
1405
        ds_adapted.to_netcdf(fp)
1✔
1406

1407
    if return_dataset:
3✔
1408
        return ds_adapted
2✔