• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

johntruckenbrodt / pyroSAR / 8967262012

06 May 2024 09:54AM UTC coverage: 52.274% (+0.4%) from 51.879%
8967262012

Pull #305

github

johntruckenbrodt
Merge branch 'main' of https://github.com/johntruckenbrodt/pyroSAR into feature/config
Pull Request #305: revise configuration handling

154 of 174 new or added lines in 5 files covered. (88.51%)

1 existing line in 1 file now uncovered.

3643 of 6969 relevant lines covered (52.27%)

0.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.52
/pyroSAR/examine.py
1
###############################################################################
2
# Examination of SAR processing software
3
# Copyright (c) 2019-2024, the pyroSAR Developers.
4

5
# This file is part of the pyroSAR Project. It is subject to the
6
# license terms in the LICENSE.txt file found in the top-level
7
# directory of this distribution and at
8
# https://github.com/johntruckenbrodt/pyroSAR/blob/master/LICENSE.txt.
9
# No part of the pyroSAR project, including this file, may be
10
# copied, modified, propagated, or distributed except according
11
# to the terms contained in the LICENSE.txt file.
12
###############################################################################
13
import json
1✔
14
import os
1✔
15
import shutil
1✔
16
import platform
1✔
17
import re
1✔
18
import warnings
1✔
19
import subprocess as sp
1✔
20
import importlib.resources
1✔
21

22
from pyroSAR.config import ConfigHandler
1✔
23
from spatialist.ancillary import finder, run
1✔
24

25
import logging
1✔
26

27
log = logging.getLogger(__name__)
1✔
28

29
config = ConfigHandler()
1✔
30

31

32
class ExamineSnap(object):
1✔
33
    """
34
    Class to check if ESA SNAP is installed.
35
    Upon initialization, this class searches for relevant binaries and the accompanying
36
    relative directory structure, which uniquely identify an ESA SNAP installation on a system.
37
    First, all relevant file and folder names are read from the pyroSAR config file if it exists
38
    and their existence is verified.
39
    If this fails, a system check is performed to find relevant binaries in the system PATH variable and
40
    additional files and folders relative to them.
41
    Furthermore, a snap.auxdata.properties file is scanned for auxiliary data URLs and local storage location.
42
    This is used by SNAP to manage data from e.g. the SRTM mission. In case SNAP is not installed, the respective
43
    information is read from a default file delivered with pyroSAR. This has the advantage of using the SNAP download
44
    URLs and local directory structure without having SNAP installed such that it can be adapted by other SAR software.
45
    """
46
    
47
    def __init__(self):
1✔
48
        # update legacy config files
49
        if 'OUTPUT' in config.sections:
1✔
NEW
50
            config.remove_section('OUTPUT')
×
51
        if 'SNAP' in config.sections:
1✔
52
            snap_keys = config.keys('SNAP')
1✔
53
            for key in ['auxdata', 'auxdatapath', 'properties']:
1✔
54
                if key in snap_keys:
1✔
NEW
55
                    config.remove_option(section='SNAP', key=key)
×
56
        
57
        # define some attributes which identify SNAP
58
        self.identifiers = ['path', 'gpt', 'etc']
1✔
59
        
60
        # a list of relevant sections
61
        self.sections = ['SNAP', 'SNAP_SUFFIX']
1✔
62
        
63
        # set attributes path, gpt, etc, __suffices
64
        self.__read_config()
1✔
65
        
66
        # if SNAP could not be identified from the config attributes, do a system search for it
67
        # sets attributes path, gpt, etc
68
        if not self.__is_identified():
1✔
69
            log.debug('identifying SNAP')
1✔
70
            self.__identify_snap()
1✔
71
        
72
        # if the SNAP suffices attribute was not yet identified,
73
        # point it to the default file delivered with pyroSAR
74
        if not hasattr(self, 'suffices'):
1✔
75
            dir_data = importlib.resources.files('pyroSAR') / 'snap' / 'data'
1✔
76
            fname_suffices = str(dir_data / 'snap.suffices.properties')
1✔
77
            with open(fname_suffices, 'r') as infile:
1✔
78
                content = infile.read().split('\n')
1✔
79
            self.__suffices = {k: v for k, v in [x.split('=') for x in content]}
1✔
80
        
81
        # SNAP property read/modification interface
82
        self.snap_properties = SnapProperties(path=os.path.dirname(self.etc))
1✔
83
        
84
        # update the config file: this scans for config changes and re-writes the config file if any are found
85
        self.__update_config()
1✔
86
    
87
    def __getattr__(self, item):
1✔
88
        raise AttributeError("'ExamineSnap' object has no attribute '{}'".format(item))
1✔
89
    
90
    def __is_identified(self):
1✔
91
        """
92
        Check if SNAP has been properly identified, i.e. all paths in self.identifiers
93
        have been detected and confirmed.
94
        
95
        Returns
96
        -------
97
        bool
98
        """
99
        return sum([hasattr(self, x) for x in self.identifiers]) == len(self.identifiers)
1✔
100
    
101
    def __identify_snap(self):
1✔
102
        """
103
        do a comprehensive search for an ESA SNAP installation
104
        
105
        Returns
106
        -------
107
        bool
108
            has the SNAP properties file been changed?
109
        """
110
        # create a list of possible SNAP executables
111
        defaults = ['snap64.exe', 'snap32.exe', 'snap.exe', 'snap']
1✔
112
        paths = os.environ['PATH'].split(os.path.pathsep)
1✔
113
        options = [os.path.join(path, option) for path in paths for option in defaults]
1✔
114
        options = [x for x in options if os.path.isfile(x)]
1✔
115
        
116
        if not hasattr(self, 'path') or not os.path.isfile(self.path):
1✔
117
            executables = options
1✔
118
        else:
119
            executables = [self.path] + options
×
120
        
121
        if len(executables) == 0:
1✔
122
            log.debug("could not detect any potential 'snap' executables")
×
123
        
124
        # for each possible SNAP executable, check whether additional files and directories exist relative to it
125
        # to confirm whether it actually is an ESA SNAP installation or something else like e.g. the Ubuntu App Manager
126
        for path in executables:
1✔
127
            log.debug('checking candidate {}'.format(path))
1✔
128
            if os.path.islink(path):
1✔
129
                path = os.path.realpath(path)
1✔
130
            
131
            # check whether a directory etc exists relative to the SNAP executable
132
            etc = os.path.join(os.path.dirname(os.path.dirname(path)), 'etc')
1✔
133
            if not os.path.isdir(etc):
1✔
134
                log.debug("could not find the 'etc' directory")
×
135
                continue
×
136
            
137
            # check the content of the etc directory
138
            config_files = os.listdir(etc)
1✔
139
            expected = ['snap.auxdata.properties', 'snap.clusters',
1✔
140
                        'snap.conf', 'snap.properties']
141
            for name in expected:
1✔
142
                if name not in config_files:
1✔
NEW
143
                    log.debug("could not find the 'snap.auxdata.properties' file")
×
NEW
144
                    continue
×
145
            
146
            # identify the gpt executable
147
            gpt_candidates = finder(os.path.dirname(path), ['gpt', 'gpt.exe'])
1✔
148
            if len(gpt_candidates) == 0:
1✔
149
                log.debug("could not find the 'gpt' executable")
×
150
                continue
×
151
            else:
152
                gpt = gpt_candidates[0]
1✔
153
            
154
            self.path = path
1✔
155
            self.etc = etc
1✔
156
            self.gpt = gpt
1✔
157
            return
1✔
158
        
159
        log.warning('SNAP could not be identified. If you have installed it please add the path to the SNAP '
×
160
                    'executables (bin subdirectory) to the PATH environment. '
161
                    'E.g. in the Linux .bashrc file add the following line:\nexport PATH=$PATH:path/to/snap/bin"')
162
    
163
    def __read_config(self):
1✔
164
        """
165
        This method reads the config.ini to examine the snap paths.
166
        If the snap paths are not in the config.ini or the paths are
167
        wrong they will be automatically created.
168

169
        Returns
170
        -------
171

172
        """
173
        for attr in self.identifiers:
1✔
174
            self.__read_config_attr(attr, section='SNAP')
1✔
175
        
176
        suffices = {}
1✔
177
        if 'SNAP_SUFFIX' in config.sections:
1✔
178
            suffices = config['SNAP_SUFFIX']
1✔
179
        if len(suffices.keys()) > 0:
1✔
180
            self.__suffices = suffices
1✔
181
    
182
    def __read_config_attr(self, attr, section):
1✔
183
        """
184
        read an attribute from the config file and set it as an object attribute
185
        
186
        Parameters
187
        ----------
188
        attr: str
189
            the attribute name
190
        section: str
191
            the config section to read the attribute from
192
        
193
        Returns
194
        -------
195
        
196
        """
197
        if section in config.sections:
1✔
198
            if attr in config[section].keys():
1✔
199
                val = config[section][attr]
1✔
200
                if os.path.exists(val):
1✔
201
                    # log.info('setting attribute {}'.format(attr))
202
                    setattr(self, attr, val)
1✔
203
    
204
    def __update_config(self):
1✔
205
        for section in self.sections:
1✔
206
            if section not in config.sections:
1✔
207
                # log.info('creating section {}..'.format(section))
208
                config.add_section(section)
1✔
209
        
210
        for key in self.identifiers:
1✔
211
            if hasattr(self, key):
1✔
212
                self.__update_config_attr(key, getattr(self, key), 'SNAP')
1✔
213
        
214
        for key in sorted(self.__suffices.keys()):
1✔
215
            self.__update_config_attr(key, self.__suffices[key], 'SNAP_SUFFIX')
1✔
216
    
217
    @staticmethod
1✔
218
    def __update_config_attr(attr, value, section):
1✔
219
        if isinstance(value, list):
1✔
UNCOV
220
            value = json.dumps(value)
×
221
        
222
        if attr not in config[section].keys() or config[section][attr] != value:
1✔
223
            # log.info('updating attribute {0}:{1}..'.format(section, attr))
224
            # log.info('  {0} -> {1}'.format(repr(config[section][attr]), repr(value)))
225
            config.set(section, key=attr, value=value, overwrite=True)
1✔
226
    
227
    def get_suffix(self, operator):
1✔
228
        """
229
        get the file name suffix for an operator
230
        
231
        Parameters
232
        ----------
233
        operator: str
234
            the name of the operator
235

236
        Returns
237
        -------
238
        str or None
239
            the file suffix or None if unknown
240
        
241
        Examples
242
        --------
243
        >>> from pyroSAR.examine import ExamineSnap
244
        >>> config = ExamineSnap()
245
        >>> print(config.get_suffix('Terrain-Flattening'))
246
        'TF'
247
        """
248
        if operator in self.__suffices.keys():
1✔
249
            return self.__suffices[operator]
1✔
250
        else:
251
            return None
1✔
252
    
253
    def get_version(self, module):
1✔
254
        """
255
        Read the version and date of different SNAP modules.
256
        This scans a file 'messages.log', which is re-written every time SNAP is started.
257
        
258
        Parameters
259
        ----------
260
        module: str
261
            one of the following
262
            
263
            - core
264
            - desktop
265
            - rstb
266
            - opttbx
267
            - microwavetbx
268

269
        Returns
270
        -------
271
        dict
272
            a dictionary with keys 'version' and 'date'
273
        """
274
        # base search patterns for finding the right lines
275
        patterns = {'core': r'org\.esa\.snap\.snap\.core',
1✔
276
                    'desktop': r'org\.esa\.snap\.snap\.ui',
277
                    'rstb': r'org\.csa\.rstb\.rstb\.kit',
278
                    'opttbx': r'eu\.esa\.opt\.opttbx\.kit',
279
                    'microwavetbx': r'eu\.esa\.microwavetbx\.microwavetbx\.kit'}
280
        
281
        if module in patterns.keys():
1✔
282
            pattern = patterns[module]
1✔
283
            pattern += r' \[(?P<version>[0-9.]+) [0-9.]+ (?P<date>[0-9]{12})'
1✔
284
        else:
285
            raise RuntimeError('module not supported')
×
286
        
287
        system = platform.system()
1✔
288
        if system in ['Linux', 'Darwin']:
1✔
289
            path = os.path.join(os.path.expanduser('~'), '.snap', 'system')
1✔
290
        elif system == 'Windows':
×
291
            path = os.path.join(os.environ['APPDATA'], 'SNAP')
×
292
        else:
293
            raise RuntimeError('operating system not supported')
×
294
        
295
        conda_env_path = os.environ.get('CONDA_PREFIX')
1✔
296
        if conda_env_path is not None and conda_env_path in self.gpt:
1✔
297
            fname = os.path.join(conda_env_path, 'snap', '.snap', 'system', 'var', 'log', 'messages.log')
×
298
        else:
299
            fname = os.path.join(path, 'var', 'log', 'messages.log')
1✔
300
        
301
        if not os.path.isfile(fname):
1✔
302
            try:
1✔
303
                # This will start SNAP and immediately stop it because of the invalid argument.
304
                # Currently, this seems to be the only way to create the messages.log file if it does not exist.
305
                sp.check_call([self.path, '--nosplash', '--dummytest', '--console', 'suppress'])
1✔
306
            except sp.CalledProcessError:
1✔
307
                pass
1✔
308
        
309
        if not os.path.isfile(fname):
1✔
310
            raise RuntimeError("cannot find 'messages.log' to read SNAP module versions from.")
×
311
        
312
        with open(fname, 'r') as m:
1✔
313
            content = m.read()
1✔
314
        match = re.search(pattern, content)
1✔
315
        if match is None:
1✔
316
            raise RuntimeError('cannot read version information from {}.\nPlease restart SNAP.'.format(fname))
×
317
        return match.groupdict()
1✔
318
    
319
    @property
1✔
320
    def auxdatapath(self):
1✔
321
        out = self.snap_properties['AuxDataPath']
1✔
322
        if out is None:
1✔
323
            out = os.path.join(self.userpath, 'auxdata')
1✔
324
        return out
1✔
325
    
326
    @auxdatapath.setter
1✔
327
    def auxdatapath(self, value):
1✔
NEW
328
        self.snap_properties['AuxDataPath'] = value
×
329
    
330
    @property
1✔
331
    def userpath(self):
1✔
332
        return self.snap_properties.userpath
1✔
333
    
334
    @userpath.setter
1✔
335
    def userpath(self, value):
1✔
NEW
336
        self.snap_properties.userpath = value
×
337

338

339
class ExamineGamma(object):
1✔
340
    """
341
    Class to check if GAMMA is installed.
342
    
343
    Examples
344
    --------
345
    >>> from pyroSAR.examine import ExamineGamma
346
    >>> config = ExamineGamma()
347
    >>> print(config.home)
348
    >>> print(config.version)
349
    
350
    """
351
    
352
    def __init__(self):
1✔
353
        home_sys = os.environ.get('GAMMA_HOME')
1✔
354
        if home_sys is not None and not os.path.isdir(home_sys):
1✔
355
            warnings.warn('found GAMMA_HOME environment variable, but directory does not exist')
×
356
            home_sys = None
×
357
        
358
        self.__read_config()
1✔
359
        
360
        if hasattr(self, 'home'):
1✔
361
            if home_sys is not None and self.home != home_sys:
×
362
                log.info('the value of GAMMA_HOME is different to that in the pyroSAR configuration;\n'
×
363
                         '  was: {}\n'
364
                         '  is : {}\n'
365
                         'resetting the configuration and deleting parsed modules'
366
                         .format(self.home, home_sys))
367
                parsed = os.path.join(os.path.dirname(self.fname), 'gammaparse')
×
368
                shutil.rmtree(parsed)
×
369
                self.home = home_sys
×
370
        if not hasattr(self, 'home'):
1✔
371
            if home_sys is not None:
1✔
372
                setattr(self, 'home', home_sys)
×
373
            else:
374
                raise RuntimeError('could not read GAMMA installation directory')
1✔
375
        self.version = re.search('GAMMA_SOFTWARE[-/](?P<version>[0-9]{8})',
×
376
                                 getattr(self, 'home')).group('version')
377
        
378
        try:
×
379
            out, err = run(['which', 'gdal-config'], void=False)
×
380
            gdal_config = out.strip('\n')
×
381
            self.gdal_config = gdal_config
×
NEW
382
        except sp.CalledProcessError:
×
383
            raise RuntimeError('could not find command gdal-config.')
×
384
        self.__update_config()
×
385
    
386
    def __read_config(self):
1✔
387
        self.fname = config.file
1✔
388
        if 'GAMMA' in config.sections:
1✔
389
            attr = config['GAMMA']
×
390
            for key, value in attr.items():
×
391
                setattr(self, key, value)
×
392
    
393
    def __update_config(self):
1✔
394
        if 'GAMMA' not in config.sections:
×
395
            config.add_section('GAMMA')
×
396
        
397
        for attr in ['home', 'version']:
×
398
            self.__update_config_attr(attr, getattr(self, attr), 'GAMMA')
×
399
    
400
    @staticmethod
1✔
401
    def __update_config_attr(attr, value, section):
1✔
402
        if isinstance(value, list):
×
403
            value = json.dumps(value)
×
404
        
405
        if attr not in config[section].keys() or config[section][attr] != value:
×
406
            config.set(section, key=attr, value=value, overwrite=True)
×
407

408

409
class SnapProperties(object):
1✔
410
    """
411
    SNAP configuration interface. This class enables reading and modifying
412
    SNAP configuration in properties files. Modified properties are directly
413
    written to the files.
414
    Currently, the files `snap.properties` and `snap.auxdata.properties` are
415
    supported. These files can be found in two locations:
416
    
417
    - `<SNAP installation directory>/etc`
418
    - `<user directory>/.snap/etc`
419
    
420
    Configuration in the latter has higher priority and modified properties will
421
    always be written there so that the installation directory is not modified.
422

423
    Parameters
424
    ----------
425
    path: str
426
        SNAP installation directory path
427
    
428
    Examples
429
    --------
430
    >>> from pyroSAR.examine import SnapProperties
431
    >>> config = SnapProperties()
432
    >>> config['snap.userdir'] = '/path/to/snap/auxdata'
433
    """
434
    
435
    def __init__(self, path):
1✔
436
        self.pattern = r'^(?P<comment>#?)(?P<key>[\w\.]*)[ ]*=[ ]*(?P<value>.*)\n*'
1✔
437
        self.properties_path = os.path.join(path, 'etc', 'snap.properties')
1✔
438
        self.auxdata_properties_path = os.path.join(path, 'etc', 'snap.auxdata.properties')
1✔
439
        
440
        log.debug(f"reading {self.properties_path}")
1✔
441
        self.properties = self._to_dict(self.properties_path)
1✔
442
        self.auxdata_properties = self._to_dict(self.auxdata_properties_path)
1✔
443
        
444
        self._dicts = [self.properties, self.auxdata_properties]
1✔
445
        
446
        # some properties need to be read from the default directory path to
447
        # be visible to SNAP
448
        userpath_default = os.path.join(os.path.expanduser('~'), '.snap')
1✔
449
        properties_path_default = os.path.join(userpath_default, 'etc', 'snap.properties')
1✔
450
        log.debug(f"reading {properties_path_default}")
1✔
451
        self.properties.update(self._to_dict(properties_path_default))
1✔
452
        
453
        if self.userpath is None:
1✔
NEW
454
            self.userpath = userpath_default
×
455
        log.debug(f"reading {self.userpath_properties}")
1✔
456
        conf = self._to_dict(self.userpath_properties)
1✔
457
        log.debug(f"updating keys {list(conf.keys())}")
1✔
458
        self.properties.update(conf)
1✔
459
        self.auxdata_properties.update(self._to_dict(self.userpath_auxdata_properties))
1✔
460
    
461
    def __getitem__(self, key):
1✔
462
        """
463
        
464
        Parameters
465
        ----------
466
        key: str
467
        
468

469
        Returns
470
        -------
471

472
        """
473
        for section in self._dicts:
1✔
474
            if key in section:
1✔
475
                return section[key]
1✔
476
        raise KeyError(f'could not find key {key}')
1✔
477
    
478
    def __setitem__(self, key, value):
1✔
479
        """
480
        
481
        Parameters
482
        ----------
483
        key: str
484
        value: Any
485

486
        Returns
487
        -------
488

489
        """
490
        if value == self[key] and isinstance(value, type(self[key])):
1✔
NEW
491
            return
×
492
        if key in self.properties:
1✔
493
            self.properties[key] = value
1✔
494
        else:
NEW
495
            self.auxdata_properties[key] = value
×
496
        if value is not None:
1✔
497
            value = str(value).encode('unicode-escape').decode()
1✔
498
        if key in ['snap.home', 'snap.userdir']:
1✔
499
            path = os.path.join(os.path.expanduser('~'),
1✔
500
                                '.snap', 'etc', 'snap.properties')
501
        elif key in self.properties:
1✔
502
            path = self.userpath_properties
1✔
NEW
503
        elif key in self.auxdata_properties:
×
NEW
504
            path = self.userpath_auxdata_properties
×
505
        else:
NEW
506
            raise KeyError(f'unknown key {key}')
×
507
        if os.path.isfile(path):
1✔
508
            with open(path, 'r') as f:
1✔
509
                content = f.read()
1✔
510
        else:
511
            content = ''
1✔
512
        pattern = r'#?{}[ ]*=[ ]*(?P<value>.*)'.format(key)
1✔
513
        match = re.search(pattern, content)
1✔
514
        if match:
1✔
515
            repl = f'#{key} =' if value is None else f'{key} = {value}'
1✔
516
            content = content.replace(match.group(), repl)
1✔
517
        else:
518
            content += f'\n{key} = {value}'
1✔
519
        
520
        os.makedirs(os.path.dirname(path), exist_ok=True)
1✔
521
        log.debug(f"writing key '{key}' with value '{value}' to '{path}'")
1✔
522
        with open(path, 'w') as f:
1✔
523
            f.write(content)
1✔
524
    
525
    def _to_dict(self, path):
1✔
526
        """
527
        
528
        Parameters
529
        ----------
530
        path: str
531

532
        Returns
533
        -------
534
        dict
535
        """
536
        out = {}
1✔
537
        if os.path.isfile(path):
1✔
538
            with open(path, 'r') as f:
1✔
539
                for line in f:
1✔
540
                    if re.search(self.pattern, line):
1✔
541
                        match = re.match(re.compile(self.pattern), line)
1✔
542
                        comment, key, value = match.groups()
1✔
543
                        value = self._string_convert(value)
1✔
544
                        out[key] = value if comment == '' else None
1✔
545
        return out
1✔
546
    
547
    @staticmethod
1✔
548
    def _string_convert(string):
1✔
549
        if string.lower() == 'none':
1✔
NEW
550
            return None
×
551
        elif string.lower() == 'true':
1✔
552
            return True
1✔
553
        elif string.lower() == 'false':
1✔
NEW
554
            return False
×
555
        else:
556
            try:
1✔
557
                return int(string)
1✔
558
            except ValueError:
1✔
559
                try:
1✔
560
                    return float(string)
1✔
561
                except ValueError:
1✔
562
                    return string.replace('\\\\', '\\')
1✔
563
    
564
    def keys(self):
1✔
565
        """
566
        
567
        Returns
568
        -------
569
        list[str]
570
            all known SNAP property keys
571
        """
NEW
572
        keys = []
×
NEW
573
        for item in self._dicts:
×
NEW
574
            keys.extend(list(item.keys()))
×
NEW
575
        return sorted(keys)
×
576
    
577
    @property
1✔
578
    def userpath(self):
1✔
579
        out = self['snap.userdir']
1✔
580
        if out is None:
1✔
581
            return os.path.join(os.path.expanduser('~'), '.snap')
1✔
582
        else:
583
            return out
1✔
584
    
585
    @userpath.setter
1✔
586
    def userpath(self, value):
1✔
587
        self['snap.userdir'] = value
1✔
588
    
589
    @property
1✔
590
    def userpath_auxdata_properties(self):
1✔
591
        return os.path.join(os.path.expanduser('~'), '.snap',
1✔
592
                            'etc', 'snap.auxdata.properties')
593
    
594
    @property
1✔
595
    def userpath_properties(self):
1✔
596
        return os.path.join(os.path.expanduser('~'), '.snap',
1✔
597
                            'etc', 'snap.properties')
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc