• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nansencenter / django-geo-spaas-processing / 13811548473

12 Mar 2025 12:15PM UTC coverage: 100.0%. Remained the same
13811548473

Pull #133

github

aperrin66
add syntool config for current from CMEMS 008_046
Pull Request #133: Syntool config for CMEMS L4 current

6 of 6 new or added lines in 1 file covered. (100.0%)

53 existing lines in 11 files now uncovered.

1590 of 1590 relevant lines covered (100.0%)

4.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

100.0
/geospaas_processing/utils.py
1
"""Utility functions for geospaas_processing"""
2
import gzip
5✔
3
import logging
5✔
4
import math
5✔
5
import os
5✔
6
import os.path
5✔
7
import posixpath
5✔
8
import re
5✔
9
import shutil
5✔
10
import stat
5✔
11
import tarfile
5✔
12
import time
5✔
13
import yaml
5✔
14
import zipfile
5✔
15
from contextlib import contextmanager
5✔
16
from collections.abc import Sequence
5✔
17
from urllib.parse import urlparse
5✔
18

19
import paramiko
5✔
20
import requests
5✔
21
import scp
5✔
22
try:
5✔
23
    from redis import Redis
5✔
24
except ImportError:  # pragma: no cover
25
    Redis = None
26

27

28
# Module-level logger for this utilities module
LOGGER = logging.getLogger(__name__)
# paramiko logs a lot at INFO level; only surface warnings and above
logging.getLogger('paramiko').setLevel(logging.WARNING)
LOCK_EXPIRE = 3600  # Lock expires in 1 hour
# Redis connection settings read from the environment; both must be set
# (and the redis library importable) for locking to actually use Redis
REDIS_HOST = os.getenv('GEOSPAAS_PROCESSING_REDIS_HOST', None)
REDIS_PORT = os.getenv('GEOSPAAS_PROCESSING_REDIS_PORT', None)
33

34

35
class CleanUpError(Exception):
    """Raised when an attempt to free storage space fails"""
37

38

39
class Storage():
    """Abstract representation of a storage location.

    Subclasses provide the filesystem primitives (get_block_size(),
    listdir(), stat(), remove(), ...); this base class implements the
    disk-space management logic (free_space()) on top of them.
    Unless stated otherwise, paths are relative to `self.path`.
    """

    # Prefix of the Redis key used by free_space() to serialize cleanups
    LOCK_PREFIX = 'lock_cleanup_'

    def __init__(self, **kwargs):
        """Requires a 'path' keyword argument pointing to the storage root"""
        self.path = kwargs['path']
        # filesystem block size, provided by the subclass implementation
        self.block_size = self.get_block_size()

    def get_files_size(self, files):
        """Returns the total size of several files in bytes"""
        return sum(self.get_file_size(file) for file in files)

    def get_file_size(self, file):
        """Get the size of one file in bytes"""
        raise NotImplementedError

    def get_block_size(self):
        """Get the block size of the file system"""
        raise NotImplementedError()

    def get_free_space(self):
        """Get the available space in bytes"""
        raise NotImplementedError()

    def listdir(self, dir_path):
        """
        Get the contents of a directory, as returned by `os.listdir()`.
        The path is relative to `self.path`.
        """
        raise NotImplementedError()

    def stat(self, path):
        """
        Get information about a file, as returned by `os.stat()`.
        The path is relative to `self.path`
        """
        raise NotImplementedError()

    def isfile(self, path):
        """Return True if the file is a regular file. The path is relative to `self.path`"""
        raise NotImplementedError()

    def isdir(self, path):
        """Return True if the file is a directory. The path is relative to `self.path`."""
        raise NotImplementedError()

    def remove(self, path):
        """Remove the target file. The path is relative to `self.path`."""
        raise NotImplementedError()

    def put(self, local_path, storage_path):
        """
        Put the file from the local filesystem located at `local_path` in the Storage.
        `storage_path` is the relative path in the Storage.
        """
        raise NotImplementedError()

    def _get_file_disk_usage(self, file_size):
        """
        Get the space occupied by a file on disk,
        taking into account the block size of the filesystem.
        """
        # round up to a whole number of filesystem blocks
        blocks = math.ceil(float(file_size) / float(self.block_size))
        return blocks * self.block_size

    def _get_removable_files(self):
        """
        Get the characteristics of the files that can be deleted from the directory.
        Returns a list of tuples containing:
            - the file path
            - the file size (disk usage in bytes)
            - the time of the last modification
        """
        dirs_to_process = ['']
        # NOTE(review): `depth` is incremented once per directory discovered,
        # so `max_depth` actually caps the total number of directories
        # processed, not the nesting depth of the tree — confirm intent.
        depth = 1
        max_depth = 1000
        removable_files = []
        while dirs_to_process and depth < max_depth:
            current_dir = dirs_to_process.pop()
            for file_name in self.listdir(current_dir):
                path = os.path.join(current_dir, file_name)
                if self.isfile(path):
                    try:
                        file_stat = self.stat(path)
                    except FileNotFoundError:
                        # the file vanished between listdir() and stat(); skip it
                        LOGGER.warning(
                            "%s has been removed while a cleanup was in progress", path)
                    except IsADirectoryError:
                        LOGGER.warning("%s has somehow been transformed into a directory while a" +
                                       " cleanup was in progress",
                                       path)
                    else:
                        removable_files.append((
                            path,
                            self._get_file_disk_usage(file_stat.st_size),
                            file_stat.st_mtime
                        ))
                elif self.isdir(path):
                    dirs_to_process.append(path)
                    depth += 1
        LOGGER.debug("Contents of %s directory: %s", self.path, removable_files)
        return removable_files

    @staticmethod
    def _sort_by_mtime(files):
        """
        Sorts a list of files by their modification time.
        The list should have the same structure as returned by `_get_removable_files()`
        """
        return sorted(files, key=lambda x: x[2])

    @staticmethod
    def _total_freeable_space(removable_files):
        """Return the total disk usage (in bytes) of all removable files"""
        total = 0
        for file_info in removable_files:
            # file_info == (path, disk_usage, mtime)
            total += file_info[1]
        return total

    def _delete_files(self, space_to_free, removable_files):
        """
        Deletes files from the removable_files list until enough space has been freed.
        removable_files must be sorted by decreasing priority (the first files in the list
        will be deleted first). space_to_free is in bytes.
        Returns a (freed_space, deleted_file_paths) tuple.
        """
        files_to_delete = []
        freed_space = 0

        # select files until the requested amount of space is reached
        for file_properties in removable_files:
            files_to_delete.append(file_properties[0])
            freed_space += file_properties[1]
            if freed_space >= space_to_free:
                break

        for file_path in files_to_delete:
            self.remove(file_path)

        return freed_space, files_to_delete

    def free_space(self, new_file_size):
        """
        Removes files from `self.path` until `new_file_size` bytes have been freed,
        starting with the oldest files.
        Returns a (freed_space, deleted_files) tuple; (0, []) when enough
        space is already available.
        Raises CleanUpError when enough space cannot be freed or the
        cleanup lock cannot be acquired after `max_retries` attempts.
        """
        max_retries = 30
        countdown = 20  # seconds between lock acquisition attempts
        retries = 0
        while retries < max_retries:
            # serialize cleanups of the same storage across processes
            with redis_lock(f"{self.LOCK_PREFIX}{self.path}", '') as acquired:
                if acquired:
                    current_free_space = self.get_free_space()
                    removable_files = self._sort_by_mtime(self._get_removable_files())
                    freeable_space = self._total_freeable_space(removable_files)

                    if new_file_size > freeable_space + current_free_space:
                        raise CleanUpError("Cannot free enough space")
                    elif new_file_size > current_free_space:
                        space_to_free = new_file_size - current_free_space
                        freed_space, deleted_files = self._delete_files(
                            space_to_free, removable_files)
                        LOGGER.info("Freed %d bytes by removing the following files: %s",
                                    freed_space, deleted_files)
                        return freed_space, deleted_files
                    else:
                        # enough space already available, nothing to delete
                        return 0, []
                else:
                    LOGGER.info("Waiting for concurrent cleanup to finish")
                    time.sleep(countdown)
                    retries += 1
        raise CleanUpError("Could not acquire cleanup lock")
210

211

212
class LocalStorage(Storage):
    """Storage location backed by a directory on a local disk"""

    def _absolute(self, relative_path):
        """Join a storage-relative path onto the storage root"""
        return os.path.join(self.path, relative_path)

    def get_file_size(self, file):
        """Size of one file in bytes"""
        return os.path.getsize(self._absolute(file))

    def get_block_size(self):
        """Block size of the filesystem containing the storage root"""
        return self.stat('').st_blksize

    def get_free_space(self):
        """Available space in bytes on the storage's filesystem"""
        return shutil.disk_usage(self.path).free

    def listdir(self, dir_path):
        """Directory listing, relative to the storage root"""
        return os.listdir(self._absolute(dir_path))

    def stat(self, path):
        """os.stat() result for a storage-relative path"""
        return os.stat(self._absolute(path))

    def isfile(self, path):
        """True if the storage-relative path is a regular file"""
        return os.path.isfile(self._absolute(path))

    def isdir(self, path):
        """True if the storage-relative path is a directory"""
        return os.path.isdir(self._absolute(path))

    def remove(self, path):
        """Delete the file at the storage-relative path"""
        os.remove(self._absolute(path))

    def put(self, local_path, storage_path):
        """Copy a local file into the storage"""
        shutil.copy(local_path, self._absolute(storage_path))
241

242

243
class RemoteStorage(Storage):
    """Represents a storage location on a remote Linux host accessible by SSH"""

    def __init__(self, **kwargs):
        """Connect to kwargs['host'] over SSH (settings looked up in
        ~/.ssh/config) and open an SFTP channel. Also requires the
        'path' keyword argument consumed by Storage.__init__().
        """
        self.host = kwargs['host']
        host_config = self.get_ssh_config()

        self.ssh_client = paramiko.SSHClient()
        self.ssh_client.load_system_host_keys()
        # assumes the host entry defines 'port', 'user' and at least one
        # 'identityfile' — TODO confirm against deployment config
        self.ssh_client.connect(self.host, host_config['port'], host_config['user'],
                                key_filename=host_config['identityfile'][0])

        self.sftp_client = self.ssh_client.open_sftp()

        # Storage.__init__() calls get_block_size(), so the SSH connection
        # must be established first
        super().__init__(**kwargs)

    def __del__(self):
        # close the SSH connection when the object is garbage-collected
        self.ssh_client.close()

    def get_ssh_config(self):
        """Read SSH configuration from ~/.ssh/config"""
        config = paramiko.SSHConfig.from_path(
            os.path.join(os.path.expanduser('~'), '.ssh', 'config'))
        return config.lookup(self.host)

    def get_file_size(self, file):
        """Size of a remote file in bytes, obtained by running `du` over SSH"""
        absolute_path = posixpath.join(self.path, file)
        _, stdout, _ = self.ssh_client.exec_command(f"du --bytes '{absolute_path}' | cut -f1")
        return int(stdout.read())

    def get_block_size(self):
        """Block size of the remote filesystem, obtained via `stat -f`"""
        _, stdout, _ = self.ssh_client.exec_command(f"stat -f --printf '%S' '{self.path}'")
        return int(stdout.read())

    def get_free_space(self):
        """Free space in bytes on the remote filesystem, parsed from `df` output"""
        _, stdout, _ = self.ssh_client.exec_command(f"df -B 1 -P '{self.path}'")
        # POSIX `df -P` output: second line, fourth column is "Available"
        return int(stdout.readlines()[1].split()[3])

    def listdir(self, dir_path):
        """Remote directory listing via SFTP.
        NOTE(review): uses os.path.join for a remote POSIX path; breaks if
        the local host is not POSIX — posixpath.join would be safer.
        """
        return self.sftp_client.listdir(os.path.join(self.path, dir_path))

    def stat(self, path):
        """Remote stat() via SFTP; result mimics os.stat()"""
        full_path = os.path.join(self.path, path)
        return self.sftp_client.stat(full_path)

    def isfile(self, path):
        """True if the remote path is a regular file"""
        mode = self.stat(path).st_mode
        return stat.S_ISREG(mode)

    def isdir(self, path):
        """True if the remote path is a directory"""
        mode = self.stat(path).st_mode
        return stat.S_ISDIR(mode)

    def remove(self, path):
        """Delete the remote file via SFTP"""
        self.sftp_client.remove(os.path.join(self.path, path))

    def put(self, local_path, storage_path):
        """Copy a local file or directory tree to the remote storage via SCP.
        NOTE(review): the mkdir path below is not shell-quoted, unlike the
        other exec_command calls — paths with spaces or shell metacharacters
        would break or be interpreted by the remote shell; confirm inputs
        are trusted.
        """
        remote_path = os.path.join(self.path, storage_path)
        # Create the directory where the files will be copied on the remote server
        _, stdout, _ = self.ssh_client.exec_command(f"mkdir -p {os.path.dirname(remote_path)}")
        stdout.channel.recv_exit_status() # wait for the command to finish executing
        # Copy the files
        with scp.SCPClient(self.ssh_client.get_transport()) as scp_client:
            scp_client.put(local_path, recursive=True, remote_path=remote_path)
307

308

309
@contextmanager
def redis_lock(lock_key, lock_value):
    """
    Context manager setting a distributed lock in Redis, inspired by:
    https://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html#ensuring-a-task-is-only-executed-one-at-a-time
    Yields True when the lock was acquired, False otherwise.
    When the redis library or the connection settings are unavailable,
    the lock is always considered acquired.
    """
    if Redis is None or not REDIS_HOST or not REDIS_PORT:
        # no locking backend available: behave as if the lock was acquired
        yield True
    else:
        cache = Redis(host=REDIS_HOST, port=REDIS_PORT)
        timeout_at = time.monotonic() + LOCK_EXPIRE
        acquired = cache.setnx(lock_key, lock_value)
        cache.expire(lock_key, LOCK_EXPIRE)
        try:
            yield acquired
        finally:
            # only release a lock we own, and only while it has not expired
            # (after expiry another process may have re-acquired it)
            if acquired and time.monotonic() < timeout_at:
                cache.delete(lock_key)
329

330

331
def redis_any_lock(lock_key_prefix='lock'):
    """Returns True if any key starting with `lock_key_prefix` is set in
    Redis. Returns False if no such key exists or if Redis is not
    available.
    """
    if Redis is None or not REDIS_HOST or not REDIS_PORT:
        return False
    cache = Redis(host=REDIS_HOST, port=REDIS_PORT)
    prefix = lock_key_prefix.encode('utf-8')
    return any(key.startswith(prefix) for key in cache.keys())
341

342

343
def gunzip(archive_path, out_dir):
    """Extracts the gzip archive contents to `out_dir`"""
    # output file name is the archive name without its '.gz' suffix
    output_name = re.sub(r'\.gz$', '', os.path.basename(archive_path))
    output_path = os.path.join(out_dir, output_name)
    with gzip.open(archive_path, 'rb') as compressed, \
            open(output_path, 'wb') as decompressed:
        shutil.copyfileobj(compressed, decompressed)
349

350

351
# Let shutil.unpack_archive() handle plain '.gz' files through gunzip()
shutil.register_unpack_format('gz', ['.gz'], gunzip)
352

353

354
def unarchive(in_file):
    """Extract contents if `in_file` is an archive. Supported formats
    are those supported by shutil's unpack_archive(), plus gzip.
    The files are extracted in a folder named like the archive, minus
    the extension.
    Returns the extraction directory, or None if the given file is not
    an archive (or is in an unsupported archive format).
    """
    # capture group 1 is the path without the archive extension;
    # the lookbehind keeps '.tar.gz' from matching the bare 'gz' branch
    archive_pattern = re.compile(
        r'(.*)\.('
        r'tar'
        r'|tar\.gz|tgz'
        r'|tar\.bz2|tbz2'
        r'|tar\.xz|txz'
        r'|zip'
        r'|(?<!tar\.)gz)$')
    match = archive_pattern.match(in_file)
    if not match:
        return None

    extract_dir = match.group(1)
    os.makedirs(extract_dir, exist_ok=True)
    try:
        shutil.unpack_archive(in_file, extract_dir)
    except shutil.ReadError:
        # don't leave an empty/partial directory behind
        shutil.rmtree(extract_dir)
        raise
    return extract_dir
384

385
def is_gzipfile(file_path):
    """Test if a file is a gzip archive by checking the magic number"""
    with open(file_path, 'rb') as stream:
        magic = stream.read(2)
    # gzip files start with the two-byte magic number 0x1f 0x8b
    return magic == b'\x1f\x8b'
390

391
def tar_gzip(file_path, force=False):
    """Makes the file a tar archive compressed with gzip if the file is not one already.
    Returns the path of the archive (or of the original file when it is
    already an archive). When `force` is True, an existing archive is
    rebuilt instead of reused.
    """
    # a regular file that is already an archive is returned untouched
    already_packed = os.path.isfile(file_path) and (
        tarfile.is_tarfile(file_path)
        or zipfile.is_zipfile(file_path)
        or is_gzipfile(file_path))
    if already_packed:
        return file_path

    archive_path = f"{file_path}.tar.gz"
    if os.path.isfile(archive_path) and not force:
        # reuse the previously created archive
        return archive_path
    if os.path.isfile(archive_path):
        os.remove(archive_path)

    with tarfile.open(archive_path, 'w:gz') as archive:
        archive.add(file_path, arcname=os.path.basename(file_path))
    return archive_path
409

410

411
def yaml_env_safe_load(stream):
    """Parses a YAML string with support for the !ENV tag.
    A string tagged with !ENV is replaced by the value of the
    environment variable whose name is that string (None when the
    variable is unset, mirroring os.getenv()).

    The constructor is registered on a private SafeLoader subclass:
    the original implementation called add_constructor() on the global
    yaml.SafeLoader, which made every subsequent plain yaml.safe_load()
    in the process resolve !ENV tags as a side effect, and re-registered
    the constructor on every call.
    """
    class EnvLoader(yaml.SafeLoader):
        """SafeLoader that resolves !ENV tags; scoped to this function"""

    EnvLoader.add_constructor('!ENV', lambda loader, node: os.getenv(node.value))
    return yaml.load(stream, Loader=EnvLoader)
418

419

420
class TrustDomainSession(requests.Session):
    """Session class which allows keeping authentication headers in
    case of redirection to the same domain
    """

    def should_strip_auth(self, old_url, new_url):
        """Keep the authentication header when redirecting to a
        different host in the same domain, for example from
        "scihub.copernicus.eu" to "apihub.copernicus.eu".
        Otherwise, defer to the parent class.
        """
        old_labels = urlparse(old_url).hostname.split('.')
        new_labels = urlparse(new_url).hostname.split('.')
        # "same domain" here means both hostnames have at least three
        # labels and share their last two (e.g. "copernicus.eu")
        same_domain = (
            len(old_labels) > 2
            and len(new_labels) > 2
            and old_labels[-2:] == new_labels[-2:])
        if same_domain:
            return False
        return super().should_strip_auth(old_url, new_url)
439

440

441
def http_request(http_method, *args, **kwargs):
    """Wrapper around requests.request() which runs the HTTP request
    inside a TrustDomainSession if authentication is provided. This
    makes it possible to follow redirections inside the same domain.
    """
    auth = kwargs.pop('auth', (None, None))
    # authentication counts as "provided" when a sequence contains at
    # least one non-None element, or a non-sequence value is truthy
    if isinstance(auth, Sequence):
        authenticated = any(item is not None for item in auth)
    else:
        authenticated = bool(auth)

    if not authenticated:
        return requests.request(http_method, *args, **kwargs)

    with TrustDomainSession() as session:
        session.auth = auth
        return session.request(http_method, *args, **kwargs)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc