OGGM / oggm, build 5975837719 (push via GitHub, web-flow)
25 Aug 2023 12:24PM UTC. Coverage: 85.547% (+0.05%) from 85.502%
Commit: Tiny edit on index page regarding youtube (#1628)
11441 of 13374 relevant lines covered (85.55%), 3.76 hits per line

Source file: /oggm/workflow.py (75.68% covered)

"""Wrappers for the single tasks, multi processor handling."""
# Built ins
import logging
import os
import shutil
from collections.abc import Sequence
# External libs
import multiprocessing
import numpy as np
import pandas as pd
from scipy import optimize as optimization

# Locals
import oggm
from oggm import cfg, tasks, utils
from oggm.core import centerlines, flowline, climate
from oggm.exceptions import InvalidParamsError, InvalidWorkflowError
from oggm.utils import global_task

# MPI
try:
    import oggm.mpi as ogmpi
    _have_ogmpi = True
except ImportError:
    _have_ogmpi = False

# Module logger
log = logging.getLogger(__name__)

# Multiprocessing Pool
_mp_manager = None
_mp_pool = None


def _init_pool_globals(_cfg_contents, global_lock):
    cfg.unpack_config(_cfg_contents)
    utils.lock = global_lock


def init_mp_pool(reset=False):
    """Necessary because at import time, cfg might be uninitialized"""
    global _mp_manager, _mp_pool
    if _mp_pool and _mp_manager and not reset:
        return _mp_pool

    cfg.CONFIG_MODIFIED = False
    if _mp_pool:
        _mp_pool.terminate()
        _mp_pool = None
    if _mp_manager:
        cfg.set_manager(None)
        _mp_manager.shutdown()
        _mp_manager = None

    if cfg.PARAMS['use_mp_spawn']:
        mp = multiprocessing.get_context('spawn')
    else:
        mp = multiprocessing

    _mp_manager = mp.Manager()

    cfg.set_manager(_mp_manager)
    cfg_contents = cfg.pack_config()

    global_lock = _mp_manager.Lock()

    mpp = cfg.PARAMS['mp_processes']
    _mp_pool = mp.Pool(mpp, initializer=_init_pool_globals,
                       initargs=(cfg_contents, global_lock))
    return _mp_pool


def _merge_dicts(*dicts):
    r = {}
    for d in dicts:
        r.update(d)
    return r


class _pickle_copier(object):
    """Pickleable alternative to functools.partial,
    which is not pickleable in Python 2 and thus doesn't work
    with multiprocessing."""

    def __init__(self, func, kwargs):
        self.call_func = func
        self.out_kwargs = kwargs

    def _call_internal(self, call_func, gdir, kwargs):
        # If the function is None, assume gdir is a tuple with the task function
        if not call_func:
            call_func, gdir = gdir

        # Merge main kwargs with per-task kwargs
        kwargs = _merge_dicts(self.out_kwargs, kwargs)

        # If gdir is a sequence, again assume it's a tuple with per-gdir kwargs.
        if isinstance(gdir, Sequence) and not isinstance(gdir, str):
            gdir, gdir_kwargs = gdir
            kwargs.update(gdir_kwargs)

        return call_func(gdir, **kwargs)

    def __call__(self, arg):
        res = None
        for func in self.call_func:
            func, kwargs = func
            res = self._call_internal(func, arg, kwargs)
        return res


def reset_multiprocessing():
    """Reset multiprocessing state

    Call this if you changed configuration parameters mid-run and need them to
    be re-propagated to child processes.
    """
    global _mp_pool
    if _mp_pool:
        _mp_pool.terminate()
        _mp_pool = None
    cfg.CONFIG_MODIFIED = False
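

# Usage sketch (illustrative, not part of the original module): per the
# docstring above, reset the pool after changing config parameters mid-run so
# that child processes pick up the new values. `some_task` is a placeholder.
#
#     from oggm import cfg, workflow
#     cfg.PARAMS['use_multiprocessing'] = True
#     workflow.execute_entity_task(some_task, gdirs)
#     cfg.PARAMS['continue_on_error'] = True  # changed mid-run
#     workflow.reset_multiprocessing()        # re-propagate to child processes
#     workflow.execute_entity_task(some_task, gdirs)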


def execute_entity_task(task, gdirs, **kwargs):
    """Execute a task on gdirs.

    If you asked for multiprocessing, it will do it.

    If ``task`` has more arguments than `gdir` they have to be keyword
    arguments.

    Parameters
    ----------
    task : function or sequence of functions
         The entity task(s) to apply.
         Can be None, in which case each gdir is expected to be a tuple of (task, gdir).
         When passing a sequence, each item can also optionally be a tuple of (task, dictionary).
         In this case the dictionary items will be passed to the task as kwargs.
    gdirs : list of :py:class:`oggm.GlacierDirectory` objects
        The glacier directories to process.
        Each individual gdir can optionally be a tuple of (gdir, dictionary).
        In this case, the values in the dictionary will be passed to the task as
        keyword arguments for that specific gdir.

    Returns
    -------
    List of results from task. Last task if a list of tasks was given.
    """

    # Normalize task into list of tuples for simplicity
    if not isinstance(task, Sequence):
        task = [task]
    tasks = []
    for t in task:
        if isinstance(t, tuple):
            tasks.append(t)
        else:
            tasks.append((t, {}))

    # Reject global tasks
    for t in tasks:
        if t[0].__dict__.get('is_global_task', False):
            raise InvalidWorkflowError('execute_entity_task cannot be used on '
                                       'global tasks.')

    # Should be iterable
    gdirs = utils.tolist(gdirs)
    ng = len(gdirs)
    if ng == 0:
        log.workflow('Called execute_entity_task on 0 glaciers. Returning...')
        return

    log.workflow('Execute entity tasks [%s] on %d glaciers',
                 ', '.join([t[0].__name__ for t in tasks]), ng)

    pc = _pickle_copier(tasks, kwargs)

    if _have_ogmpi:
        if ogmpi.OGGM_MPI_COMM is not None:
            return ogmpi.mpi_master_spin_tasks(pc, gdirs)

    if cfg.PARAMS['use_multiprocessing'] and ng > 1:
        mppool = init_mp_pool(cfg.CONFIG_MODIFIED)
        out = mppool.map(pc, gdirs, chunksize=1)
    else:
        if ng > 3:
            log.workflow('WARNING: you are trying to run an entity task on '
                         '%d glaciers with multiprocessing turned off. OGGM '
                         'will run faster with multiprocessing turned on.', ng)
        out = [pc(gdir) for gdir in gdirs]

    return out
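

# Usage sketch (illustrative, not part of the original module): the calling
# conventions documented above. `my_task` and `values` are hypothetical.
#
#     from oggm import tasks, workflow
#
#     # One entity task on all gdirs
#     workflow.execute_entity_task(tasks.glacier_masks, gdirs)
#
#     # A sequence of tasks, with per-task kwargs as (task, dict) tuples
#     workflow.execute_entity_task([tasks.glacier_masks,
#                                   (my_task, {'some_kwarg': 1})], gdirs)
#
#     # Per-gdir kwargs as (gdir, dict) tuples
#     pairs = [(gd, {'some_kwarg': v}) for gd, v in zip(gdirs, values)]
#     workflow.execute_entity_task(my_task, pairs)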


def execute_parallel_tasks(gdir, tasks):
    """Execute a list of tasks on a single gdir (experimental!).

    This is useful when running a non-sequential list of tasks on a gdir,
    mostly for e.g. different experiments with different output files.

    Parameters
    ----------
    gdir : :py:class:`oggm.GlacierDirectory`
         the directory to process.
    tasks : list
         the list of entity tasks to apply.
         Optionally, each list element can be a tuple, with the first element
         being the task, and the second element a dict that
         will be passed to the task function as ``**kwargs``.
    """

    pc = _pickle_copier(None, {})

    _tasks = []
    for task in tasks:
        kwargs = {}
        if isinstance(task, Sequence):
            task, kwargs = task
        _tasks.append((task, (gdir, kwargs)))

    if _have_ogmpi:
        if ogmpi.OGGM_MPI_COMM is not None:
            ogmpi.mpi_master_spin_tasks(pc, _tasks)
            return

    if cfg.PARAMS['use_multiprocessing']:
        mppool = init_mp_pool(cfg.CONFIG_MODIFIED)
        mppool.map(pc, _tasks, chunksize=1)
    else:
        for task, (gd, kw) in _tasks:
            task(gd, **kw)
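

# Usage sketch (illustrative, not part of the original module): several
# independent runs on one gdir, with hypothetical tasks and filesuffixes.
#
#     workflow.execute_parallel_tasks(
#         gdir,
#         [task_a,                                    # plain task
#          (task_b, {'output_filesuffix': '_exp1'}),  # task with kwargs
#          (task_b, {'output_filesuffix': '_exp2'})])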


def gdir_from_prepro(entity, from_prepro_level=None,
                     prepro_border=None, prepro_rgi_version=None,
                     base_url=None):

    if prepro_border is None:
        prepro_border = int(cfg.PARAMS['border'])
    if prepro_rgi_version is None:
        prepro_rgi_version = cfg.PARAMS['rgi_version']

    if isinstance(entity, pd.Series):
        try:
            rid = entity.RGIId
        except AttributeError:
            rid = entity.rgi_id
    else:
        rid = entity

    tar_base = utils.get_prepro_gdir(prepro_rgi_version, rid, prepro_border,
                                     from_prepro_level, base_url=base_url)
    from_tar = os.path.join(tar_base.replace('.tar', ''), rid + '.tar.gz')
    return oggm.GlacierDirectory(entity, from_tar=from_tar)


def gdir_from_tar(entity, from_tar):

    try:
        rgi_id = entity.RGIId
    except AttributeError:
        rgi_id = entity

    from_tar = os.path.join(from_tar, '{}'.format(rgi_id[:8]),
                            '{}.tar'.format(rgi_id[:11]))
    assert os.path.exists(from_tar), 'tarfile does not exist'
    from_tar = os.path.join(from_tar.replace('.tar', ''), rgi_id + '.tar.gz')
    return oggm.GlacierDirectory(entity, from_tar=from_tar)


def _check_rgi_input(rgidf=None, err_on_lvl2=False):
    """Complain if the input has duplicates."""

    if rgidf is None:
        return

    msg = ('You have glaciers with connectivity level 2 in your list. '
           'OGGM does not provide pre-processed directories for these.')

    # Check if dataframe or list of strs
    is_dataframe = isinstance(rgidf, pd.DataFrame)
    if is_dataframe:
        try:
            rgi_ids = rgidf.RGIId
            # if dataframe we can also check for connectivity
            if 'Connect' in rgidf and np.any(rgidf['Connect'] == 2):
                if err_on_lvl2:
                    raise RuntimeError(msg)
        except AttributeError:
            # RGI7
            rgi_ids = rgidf.rgi_id
    else:
        rgi_ids = utils.tolist(rgidf)
        # Check for connectivity level 2 here as well
        not_good_ids = pd.read_csv(utils.get_demo_file('rgi6_ids_conn_lvl2.csv'),
                                   index_col=0)
        try:
            if err_on_lvl2 and len(not_good_ids.loc[rgi_ids]) > 0:
                raise RuntimeError(msg)
        except KeyError:
            # We're good
            pass

    u, c = np.unique(rgi_ids, return_counts=True)
    if len(u) < len(rgi_ids):
        raise InvalidWorkflowError('Found duplicates in the list of '
                                   'RGI IDs: {}'.format(u[c > 1]))


def _isdir(path):
    """os.path.isdir, returning False instead of an error on non-string/path-like objects
    """
    try:
        return os.path.isdir(path)
    except TypeError:
        return False


def init_glacier_directories(rgidf=None, *, reset=False, force=False,
                             from_prepro_level=None, prepro_border=None,
                             prepro_rgi_version=None, prepro_base_url=None,
                             from_tar=False, delete_tar=False):
    """Initializes the list of Glacier Directories for this run.

    This is the very first task to do (always). If the directories are already
    available in the working directory, use them. If not, create new ones.

    **Careful**: when starting from a pre-processed directory with
    `from_prepro_level` or `from_tar`, the existing directories will be overwritten!

    Parameters
    ----------
    rgidf : GeoDataFrame or list of ids, optional for pre-computed runs
        the RGI glacier outlines. If unavailable, OGGM will parse the
        information from the glacier directories found in the working
        directory. It is required for new runs.
    reset : bool
        delete the existing glacier directories if found.
    force : bool
        setting `reset=True` will trigger a yes/no question to the user. Set
        `force=True` to avoid this.
    from_prepro_level : int
        get the gdir data from the official pre-processed pool. If this
        argument is set, the existing directories will be overwritten!
    prepro_border : int
        for `from_prepro_level` only: if you want to override the default
        behavior which is to use `cfg.PARAMS['border']`
    prepro_rgi_version : str
        for `from_prepro_level` only: if you want to override the default
        behavior which is to use `cfg.PARAMS['rgi_version']`
    prepro_base_url : str
        for `from_prepro_level` only: the preprocessed directory url from
        which to download the directories (became mandatory in OGGM v1.6)
    from_tar : bool or str, default=False
        extract the gdir data from a tar file. If set to `True`,
        will check for a tar file at the expected location in `base_dir`.
        If this argument is set, the existing directories will be overwritten!
    delete_tar : bool, default=False
        delete the original tar file after extraction.

    Returns
    -------
    gdirs : list of :py:class:`oggm.GlacierDirectory` objects
        the initialised glacier directories
    """

    _check_rgi_input(rgidf, err_on_lvl2=from_prepro_level)

    if reset and not force:
        reset = utils.query_yes_no('Delete all glacier directories?')

    if from_prepro_level:
        url = utils.get_prepro_base_url(base_url=prepro_base_url,
                                        border=prepro_border,
                                        prepro_level=from_prepro_level,
                                        rgi_version=prepro_rgi_version)
        if cfg.PARAMS['has_internet'] and not utils.url_exists(url):
            raise InvalidParamsError("base url seems unreachable with these "
                                     "parameters: {}".format(url))
        if ('oggm_v1.4' in url and
                from_prepro_level >= 3 and
                not cfg.PARAMS['prcp_fac']):
            log.warning('You seem to be using v1.4 directories with a more '
                        'recent version of OGGM. While this is possible, be '
                        'aware that some defaults parameters have changed. '
                        'See the documentation for details: '
                        'http://docs.oggm.org/en/stable/whats-new.html')

    # if reset delete also the log directory
    if reset:
        fpath = os.path.join(cfg.PATHS['working_dir'], 'log')
        if os.path.exists(fpath):
            shutil.rmtree(fpath)

    if rgidf is None:
        # Infer the glacier directories from folders available in working dir
        if reset:
            raise ValueError('Cannot use reset without setting rgidf')
        log.workflow('init_glacier_directories by parsing all available '
                     'folders (this takes time: if possible, provide rgidf '
                     'instead).')
        # The dirs should be there already
        gl_dir = os.path.join(cfg.PATHS['working_dir'], 'per_glacier')
        gdirs = []
        for root, _, files in os.walk(gl_dir):
            if files and ('outlines.shp' in files or
                          'outlines.tar.gz' in files):
                gdirs.append(oggm.GlacierDirectory(os.path.basename(root)))
    else:
        # Create glacier directories from input
        # Check if dataframe or list of str
        try:
            entities = []
            for _, entity in rgidf.iterrows():
                entities.append(entity)
        except AttributeError:
            entities = utils.tolist(rgidf)

        if from_prepro_level is not None:
            log.workflow('init_glacier_directories from prepro level {} on '
                         '{} glaciers.'.format(from_prepro_level,
                                               len(entities)))
            # Read the hash dictionary before we use multiproc
            if cfg.PARAMS['dl_verify']:
                utils.get_dl_verify_data('cluster.klima.uni-bremen.de')
            gdirs = execute_entity_task(gdir_from_prepro, entities,
                                        from_prepro_level=from_prepro_level,
                                        prepro_border=prepro_border,
                                        prepro_rgi_version=prepro_rgi_version,
                                        base_url=prepro_base_url)
        else:
            # We can set the intersects file automatically here
            if (cfg.PARAMS['use_intersects'] and
                    len(cfg.PARAMS['intersects_gdf']) == 0 and
                    not from_tar):
                try:
                    rgi_ids = np.unique(np.sort([entity.rgi_id for entity in
                                                 entities]))
                    rgi_version = rgi_ids[0].split('-')[0][-2:]
                    fp = utils.get_rgi_intersects_entities(rgi_ids,
                                                           version=rgi_version)
                    cfg.set_intersects_db(fp)
                except AttributeError:
                    # RGI V6
                    try:
                        rgi_ids = np.unique(np.sort([entity.RGIId for entity in
                                                     entities]))
                        rgi_version = rgi_ids[0].split('-')[0][-2:]
                        fp = utils.get_rgi_intersects_entities(rgi_ids,
                                                               version=rgi_version)
                        cfg.set_intersects_db(fp)
                    except AttributeError:
                        # List of str
                        pass

            if _isdir(from_tar):
                gdirs = execute_entity_task(gdir_from_tar, entities,
                                            from_tar=from_tar)
            else:
                gdirs = execute_entity_task(utils.GlacierDirectory, entities,
                                            reset=reset,
                                            from_tar=from_tar,
                                            delete_tar=delete_tar)

    return gdirs
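

# Usage sketch (illustrative, not part of the original module): a typical run
# start from the pre-processed pool. The working directory and base URL are
# placeholders; use the server path documented for your OGGM version.
#
#     from oggm import cfg, workflow
#
#     cfg.initialize()
#     cfg.PATHS['working_dir'] = '/path/to/wdir'
#     gdirs = workflow.init_glacier_directories(
#         ['RGI60-11.00897'], from_prepro_level=3,
#         prepro_base_url='https://cluster.klima.uni-bremen.de/...')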


@global_task(log)
def gis_prepro_tasks(gdirs):
    """Run all flowline preprocessing tasks on a list of glaciers.

    Parameters
    ----------
    gdirs : list of :py:class:`oggm.GlacierDirectory` objects
        the glacier directories to process
    """

    task_list = [
        tasks.define_glacier_region,
        tasks.glacier_masks,
        tasks.compute_centerlines,
        tasks.initialize_flowlines,
        tasks.compute_downstream_line,
        tasks.compute_downstream_bedshape,
        tasks.catchment_area,
        tasks.catchment_intersections,
        tasks.catchment_width_geom,
        tasks.catchment_width_correction
    ]
    for task in task_list:
        execute_entity_task(task, gdirs)


@global_task(log)
def climate_tasks(gdirs, overwrite_gdir=False, override_missing=None):
    """Run all climate related entity tasks on a list of glaciers.

    Parameters
    ----------
    gdirs : list of :py:class:`oggm.GlacierDirectory` objects
        the glacier directories to process
    """

    # Process climate data
    execute_entity_task(tasks.process_climate_data, gdirs)
    # mass balance and the apparent mass balance
    execute_entity_task(tasks.mb_calibration_from_geodetic_mb, gdirs,
                        override_missing=override_missing,
                        overwrite_gdir=overwrite_gdir)
    execute_entity_task(tasks.apparent_mb_from_any_mb, gdirs)
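

# Usage sketch (illustrative, not part of the original module): the two
# shortcut wrappers above are typically chained after initialisation.
#
#     gdirs = workflow.init_glacier_directories(rgidf)
#     workflow.gis_prepro_tasks(gdirs)
#     workflow.climate_tasks(gdirs)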


@global_task(log)
def inversion_tasks(gdirs, glen_a=None, fs=None, filter_inversion_output=True,
                    add_to_log_file=True):
    """Run all ice thickness inversion tasks on a list of glaciers.

    Quite useful to deal with calving glaciers as well.

    Parameters
    ----------
    gdirs : list of :py:class:`oggm.GlacierDirectory` objects
        the glacier directories to process
    add_to_log_file : bool
        whether the called entity tasks should write into the log of the
        gdir. Default is True
    """

    if cfg.PARAMS['use_kcalving_for_inversion']:
        # Differentiate between calving and non-calving glaciers
        gdirs_nc = []
        gdirs_c = []
        for gd in gdirs:
            if gd.is_tidewater:
                gdirs_c.append(gd)
            else:
                gdirs_nc.append(gd)

        log.workflow('Starting inversion tasks for {} tidewater and {} '
                     'non-tidewater glaciers.'.format(len(gdirs_c),
                                                      len(gdirs_nc)))

        if gdirs_nc:
            execute_entity_task(tasks.prepare_for_inversion, gdirs_nc,
                                add_to_log_file=add_to_log_file)
            execute_entity_task(tasks.mass_conservation_inversion, gdirs_nc,
                                glen_a=glen_a, fs=fs,
                                add_to_log_file=add_to_log_file)
            if filter_inversion_output:
                execute_entity_task(tasks.filter_inversion_output, gdirs_nc,
                                    add_to_log_file=add_to_log_file)

        if gdirs_c:
            execute_entity_task(tasks.find_inversion_calving_from_any_mb,
                                gdirs_c,
                                glen_a=glen_a, fs=fs,
                                add_to_log_file=add_to_log_file)
    else:
        execute_entity_task(tasks.prepare_for_inversion, gdirs,
                            add_to_log_file=add_to_log_file)
        execute_entity_task(tasks.mass_conservation_inversion, gdirs,
                            glen_a=glen_a, fs=fs,
                            add_to_log_file=add_to_log_file)
        if filter_inversion_output:
            execute_entity_task(tasks.filter_inversion_output, gdirs,
                                add_to_log_file=add_to_log_file)
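

# Usage sketch (illustrative, not part of the original module): running the
# inversion with a scaled creep parameter, as the calibration helper below
# does internally. The factor 2.4 is an arbitrary example.
#
#     glen_a = 2.4 * cfg.PARAMS['inversion_glen_a']
#     workflow.inversion_tasks(gdirs, glen_a=glen_a, fs=0)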


@global_task(log)
def calibrate_inversion_from_consensus(gdirs, ignore_missing=True,
                                       fs=0, a_bounds=(0.1, 10),
                                       apply_fs_on_mismatch=False,
                                       error_on_mismatch=True,
                                       filter_inversion_output=True,
                                       volume_m3_reference=None,
                                       add_to_log_file=True):
    """Fit the total volume of the glaciers to the 2019 consensus estimate.

    This method finds the "best Glen A" to match all glaciers in gdirs with
    a valid inverted volume.

    Parameters
    ----------
    gdirs : list of :py:class:`oggm.GlacierDirectory` objects
        the glacier directories to process
    ignore_missing : bool
        set this to True to silence the error if some glaciers could not be
        found in the consensus estimate.
    fs : float
        invert with sliding (default: no)
    a_bounds : tuple
        factor to apply to default A
    apply_fs_on_mismatch : bool
        on mismatch, try to apply an arbitrary value of fs (fs = 5.7e-20 from
        Oerlemans) and try to optimize A again.
    error_on_mismatch : bool
        sometimes the given bounds do not allow to find a zero mismatch:
        this will normally raise an error, but you can switch this off to
        use the closest value instead and move on.
    filter_inversion_output : bool
        whether or not to apply terminus thickness filtering on the inversion
        output (needs the downstream lines to work).
    volume_m3_reference : float
        option to provide your own total glacier volume to match to
    add_to_log_file : bool
        whether the called entity tasks should write into the log of the
        gdir. Default is True

    Returns
    -------
    a dataframe with the individual glacier volumes
    """

    gdirs = utils.tolist(gdirs)

    # Get the ref data for the glaciers we have
    df = pd.read_hdf(utils.get_demo_file('rgi62_itmix_df.h5'))
    rids = [gdir.rgi_id for gdir in gdirs]

    found_ids = df.index.intersection(rids)
    if not ignore_missing and (len(found_ids) != len(rids)):
        raise InvalidWorkflowError('Could not find matching indices in the '
                                   'consensus estimate for all provided '
                                   'glaciers. Set ignore_missing=True to '
                                   'ignore this error.')

    df = df.reindex(rids)

    # Optimize the diff to ref
    def_a = cfg.PARAMS['inversion_glen_a']

    def compute_vol(x):
        inversion_tasks(gdirs, glen_a=x*def_a, fs=fs,
                        filter_inversion_output=filter_inversion_output,
                        add_to_log_file=add_to_log_file)
        odf = df.copy()
        odf['oggm'] = execute_entity_task(tasks.get_inversion_volume, gdirs,
                                          add_to_log_file=add_to_log_file)
        # if the user provides a glacier volume, all glaciers are considered;
        # the dropna() below excludes glaciers where no ITMIX volume is available
        if volume_m3_reference is None:
            return odf.dropna()
        else:
            return odf

    def to_minimize(x):
        log.workflow('Consensus estimate optimisation with '
                     'A factor: {} and fs: {}'.format(x, fs))
        odf = compute_vol(x)
        if volume_m3_reference is None:
            return odf.vol_itmix_m3.sum() - odf.oggm.sum()
        else:
            return volume_m3_reference - odf.oggm.sum()

    try:
        out_fac, r = optimization.brentq(to_minimize, *a_bounds, rtol=1e-2,
                                         full_output=True)
        if r.converged:
            log.workflow('calibrate_inversion_from_consensus '
                         'converged after {} iterations and fs={}. The '
                         'resulting Glen A factor is {}.'
                         ''.format(r.iterations, fs, out_fac))
        else:
            raise ValueError('Unexpected error in optimization.brentq')
    except ValueError:
        # OK, we can't find an A. Log for debugging:
        odf1 = compute_vol(a_bounds[0]).sum() * 1e-9
        odf2 = compute_vol(a_bounds[1]).sum() * 1e-9
        if volume_m3_reference is None:
            ref_vol_1 = odf1.vol_itmix_m3
            ref_vol_2 = odf2.vol_itmix_m3
        else:
            ref_vol_1 = volume_m3_reference * 1e-9
            ref_vol_2 = volume_m3_reference * 1e-9
        msg = ('calibration from consensus estimate CAN\'T converge with fs={}.\n'
               'Bound values (km3):\nRef={:.3f} OGGM={:.3f} for A factor {}\n'
               'Ref={:.3f} OGGM={:.3f} for A factor {}'
               ''.format(fs,
                         ref_vol_1, odf1.oggm, a_bounds[0],
                         ref_vol_2, odf2.oggm, a_bounds[1]))
        if apply_fs_on_mismatch and fs == 0 and odf2.oggm > ref_vol_2:
            return calibrate_inversion_from_consensus(gdirs,
                                                      ignore_missing=ignore_missing,
                                                      fs=5.7e-20, a_bounds=a_bounds,
                                                      apply_fs_on_mismatch=False,
                                                      error_on_mismatch=error_on_mismatch,
                                                      volume_m3_reference=volume_m3_reference,
                                                      filter_inversion_output=filter_inversion_output,
                                                      add_to_log_file=add_to_log_file)
        if error_on_mismatch:
            raise ValueError(msg)

        out_fac = a_bounds[int(abs(ref_vol_1 - odf1.oggm) >
                               abs(ref_vol_2 - odf2.oggm))]
        log.workflow(msg)
        log.workflow('We use A factor = {} and fs = {} and move on.'
                     ''.format(out_fac, fs))

    # Compute the final volume with the correct A
    inversion_tasks(gdirs, glen_a=out_fac*def_a, fs=fs,
                    filter_inversion_output=filter_inversion_output,
                    add_to_log_file=add_to_log_file)
    df['vol_oggm_m3'] = execute_entity_task(tasks.get_inversion_volume, gdirs,
                                            add_to_log_file=add_to_log_file)
    return df
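

# Usage sketch (illustrative, not part of the original module): calibrating to
# the consensus estimate once preprocessing and climate tasks have run.
#
#     vols = workflow.calibrate_inversion_from_consensus(
#         gdirs, apply_fs_on_mismatch=True)
#     total_km3 = vols.vol_oggm_m3.sum() * 1e-9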


@global_task(log)
def merge_glacier_tasks(gdirs, main_rgi_id=None, return_all=False, buffer=None,
                        **kwargs):
    """Shortcut function: run all tasks to merge tributaries to a main glacier

    Parameters
    ----------
    gdirs : list of :py:class:`oggm.GlacierDirectory`
        all glaciers, main and tributary. Preprocessed and initialised
    main_rgi_id : str
        RGI ID of the main glacier of interest. If None is provided, merging
        will start based upon the largest glacier
    return_all : bool
        if main_rgi_id is given and return_all = False: only the main glacier
        is returned
        if main_rgi_id is given and return_all = True: the main glacier and
        every remaining glacier from the initial gdirs list is returned,
        possibly merged as well.
    buffer : float
        buffer around a flowline, to first better find an overlap with another
        flowline and second to assure some distance between the lines at a
        junction. Will default to `cfg.PARAMS['kbuffer']`.
    kwargs : keyword arguments for the recursive merging

    Returns
    -------
    merged_gdirs : list of all merged :py:class:`oggm.GlacierDirectory`
    """

    if len(gdirs) > 100:
        raise InvalidParamsError('this could take time! I should include an '
                                 'optional parameter to ignore this.')

    # sort all glaciers descending by area
    gdirs.sort(key=lambda x: x.rgi_area_m2, reverse=True)

    # if main glacier is asked, put it in first position
    if main_rgi_id is not None:
        gdir_main = [gd for gd in gdirs if gd.rgi_id == main_rgi_id][0]
        gdirs.remove(gdir_main)
        gdirs = [gdir_main] + gdirs

    merged_gdirs = []
    while len(gdirs) > 1:
        # main glacier is always the first: either given or the largest one
        gdir_main = gdirs.pop(0)
        gdir_merged, gdirs = _recursive_merging(gdirs, gdir_main, **kwargs)
        merged_gdirs.append(gdir_merged)

    # now we have gdirs which contain all the necessary flowlines,
    # time to clean them up
    for gdir in merged_gdirs:
        flowline.clean_merged_flowlines(gdir, buffer=buffer)

    if main_rgi_id is not None and return_all is False:
        return [gd for gd in merged_gdirs if main_rgi_id in gd.rgi_id][0]

    # add the remaining glaciers to the final list
    merged_gdirs = merged_gdirs + gdirs

    return merged_gdirs
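

# Usage sketch (illustrative, not part of the original module): merging all
# tributaries into one main glacier, identified here by an example RGI ID.
#
#     gdir_merged = workflow.merge_glacier_tasks(
#         gdirs, main_rgi_id='RGI60-11.00897', return_all=False)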


def _recursive_merging(gdirs, gdir_main, glcdf=None, dem_source=None,
                       filename='climate_historical', input_filesuffix=''):
    """Recursive function to merge all tributary glaciers.

    This function should start with the largest glacier and then be called
    upon all smaller glaciers.

    Parameters
    ----------
    gdirs : list of :py:class:`oggm.GlacierDirectory`
        all glaciers, main and tributary. Preprocessed and initialised
    gdir_main : :py:class:`oggm.GlacierDirectory`
        the current main glacier where the others are merged to
    glcdf : geopandas.GeoDataFrame
        which contains the main glaciers, will be downloaded if None
    filename : str
        Baseline climate file
    dem_source : str
        the DEM source to use
    input_filesuffix : str
        Filesuffix to the climate file

    Returns
    -------
    merged_gdir : :py:class:`oggm.GlacierDirectory`
        the merged current main glacier
    gdirs : list of :py:class:`oggm.GlacierDirectory`
        updated list of glaciers, with the already merged ones removed
    """
    # find glaciers which intersect with the main
    tributaries = centerlines.intersect_downstream_lines(gdir_main,
                                                         candidates=gdirs)
    if len(tributaries) == 0:
        # if no tributaries: nothing to do
        return gdir_main, gdirs

    # separate those glaciers which are not already found to be a tributary
    gdirs = [gd for gd in gdirs if gd not in tributaries]

    gdirs_to_merge = []

    for trib in tributaries:
        # for each tributary: check if we can merge additional glaciers to it
        merged, gdirs = _recursive_merging(gdirs, trib, glcdf=glcdf,
                                           filename=filename,
                                           input_filesuffix=input_filesuffix,
                                           dem_source=dem_source)
        gdirs_to_merge.append(merged)

    # create merged glacier directory
    gdir_merged = utils.initialize_merged_gdir(
        gdir_main, tribs=gdirs_to_merge, glcdf=glcdf, filename=filename,
        input_filesuffix=input_filesuffix, dem_source=dem_source)

    flowline.merge_to_one_glacier(gdir_merged, gdirs_to_merge,
                                  filename=filename,
                                  input_filesuffix=input_filesuffix)

    return gdir_merged, gdirs