• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Ouranosinc / xscen / 13186761960

06 Feb 2025 07:51PM UTC coverage: 88.051%. First build
13186761960

Pull #519

github

web-flow
Merge c9f9f3628 into fc049a239
Pull Request #519: Drop setuptools, use flit

0 of 3 new or added lines in 2 files covered. (0.0%)

3935 of 4469 relevant lines covered (88.05%)

5.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.06
/src/xscen/scripting.py
1
"""A collection of various convenience objects and functions to use in scripts."""
2

3
import logging
6✔
4
import mimetypes
6✔
5
import os
6✔
6
import shutil as sh
6✔
7
import signal
6✔
8
import smtplib
6✔
9
import sys
6✔
10
import time
6✔
11
import warnings
6✔
12
from contextlib import contextmanager
6✔
13
from email.message import EmailMessage
6✔
14
from io import BytesIO
6✔
15
from pathlib import Path
6✔
16
from traceback import format_exception
6✔
17

18
import xarray as xr
6✔
19
from matplotlib.figure import Figure
6✔
20

21
from .catalog import ProjectCatalog
6✔
22
from .config import parse_config
6✔
23
from .utils import get_cat_attrs
6✔
24

25
logger = logging.getLogger(__name__)
6✔
26

27
__all__ = [
6✔
28
    "TimeoutException",
29
    "measure_time",
30
    "move_and_delete",
31
    "save_and_update",
32
    "send_mail",
33
    "send_mail_on_exit",
34
    "skippable",
35
    "timeout",
36
]
37

38

39
@parse_config
def send_mail(
    *,
    subject: str,
    msg: str,
    to: str | None = None,
    server: str = "127.0.0.1",
    port: int = 25,
    attachments: None | (
        list[tuple[str, Figure | os.PathLike] | Figure | os.PathLike]
    ) = None,
) -> None:
    """Send email.

    Email a single address through a login-less SMTP server.
    The default values of server and port should work out-of-the-box on Ouranos's systems.

    Parameters
    ----------
    subject : str
      Subject line.
    msg : str
      Main content of the email. Can be UTF-8 and multi-line.
    to : str, optional
      Email address to which the email is sent. If None (default), the email is sent
      to the local address "{user}@{os.uname().nodename}", where ``user`` is given by
      ``os.getlogin()`` or, when that fails, by ``getpass.getuser()``.
      On unix systems simply put your real email address in `$HOME/.forward` to receive
      the emails sent to this local address.
    server : str
      SMTP server url. Defaults to 127.0.0.1, the local host. This function does not try to log-in.
    port : int
      Port of the SMTP service on the server. Defaults to 25, which is usually the default port on unix-like systems.
    attachments : list of paths or matplotlib figures or tuples of a string and a path or figure, optional
      List of files to attach to the email.
      Elements of the list can be paths, the mimetypes of those is guessed and the files are read and sent.
      Elements can also be matplotlib Figures which are sent as png image (savefig) with names like "Figure00.png".
      Finally, elements can be tuples of a filename to use in the email and the attachment, handled as above.

    Returns
    -------
    None
    """
    # Inspired by https://betterprogramming.pub/how-to-send-emails-with-attachments-using-python-dd37c4b6a7fd
    import getpass

    # os.getlogin() raises OSError when the process has no controlling terminal
    # (cron, nohup, batch schedulers); fall back to the environment/passwd lookup.
    try:
        user = os.getlogin()
    except OSError:
        user = getpass.getuser()
    local_address = f"{user}@{os.uname().nodename}"

    email = EmailMessage()
    email["Subject"] = subject
    email["From"] = local_address
    email["To"] = to or local_address
    email.set_content(msg)

    for i, att in enumerate(attachments or []):
        fname = None
        if isinstance(att, tuple):
            # (filename, attachment) pair: the explicit filename wins below.
            fname, att = att
        if isinstance(att, Figure):
            buffer = BytesIO()
            att.savefig(buffer, format="png")
            email.add_attachment(
                buffer.getvalue(),
                maintype="image",
                subtype="png",
                filename=fname or f"Figure{i:02d}.png",
            )
        else:  # a path
            attpath = Path(att)
            ctype, encoding = mimetypes.guess_type(attpath)
            if ctype is None or encoding is not None:
                # Unknown or encoded (e.g. compressed) content: send as raw bytes.
                ctype = "application/octet-stream"
            maintype, subtype = ctype.split("/", 1)

            email.add_attachment(
                attpath.read_bytes(),
                maintype=maintype,
                subtype=subtype,
                filename=fname or attpath.name,
            )

    with smtplib.SMTP(host=server, port=port) as smtp:
        smtp.send_message(email)
117

118

119
class ExitWatcher:
    """An object that watches system exits and exceptions before the python exits.

    Once hooked, calls to `sys.exit` and uncaught exceptions reaching
    `sys.excepthook` go through this object, which records the exit code and the
    exception information before delegating to the original functions.
    """

    def __init__(self):
        # Exit code passed to the last `sys.exit` call that went through the watcher.
        self.code = None
        # (type, value, traceback) of the last exception seen by the excepthook.
        self.error = None
        # Whether `sys.exit` and `sys.excepthook` are currently replaced by this watcher.
        self.hooked = False
        # Original functions, saved by `hook` and restored by `unhook`.
        self.orig_exit = None
        self.orig_excepthook = None

    def hook(self):
        """Hooks the watcher to the system by monkeypatching `sys` with its own methods."""
        if not self.hooked:
            self.orig_exit = sys.exit
            self.orig_excepthook = sys.excepthook
            sys.exit = self.exit
            sys.excepthook = self.err_handler
            self.hooked = True
        else:
            warnings.warn("Exit hooks have already been overridden.", stacklevel=2)

    def unhook(self):
        """Restore the original `sys.exit` and `sys.excepthook`.

        Raises
        ------
        ValueError
            If the watcher is not currently hooked.
        """
        if self.hooked:
            sys.exit = self.orig_exit
            sys.excepthook = self.orig_excepthook
            # Mark the watcher as released so it can be hooked again later.
            self.hooked = False
        else:
            raise ValueError("Exit hooks were not overridden. Cannot unhook.")

    def exit(self, code=0):
        """Record the exit code, then call the original `sys.exit`."""
        self.code = code
        self.orig_exit(code)

    def err_handler(self, *exc_info):
        """Record the exception information, then call the original `sys.excepthook`."""
        self.error = exc_info
        self.orig_excepthook(*exc_info)
152

153

154
exit_watcher = ExitWatcher()
6✔
155
exit_watcher.hook()
6✔
156

157

158
@parse_config
def send_mail_on_exit(
    *,
    subject: str | None = None,
    msg_ok: str | None = None,
    msg_err: str | None = None,
    on_error_only: bool = False,
    skip_ctrlc: bool = True,
    **mail_kwargs,
) -> None:
    """Send an email with content depending on how the system exited.

    This function is best used by registering it with `atexit`. Calls :py:func:`send_mail`.

    Parameters
    ----------
    subject : str, optional
        Email subject. Defaults to "Workflow". It is suffixed with " - Success" on a
        clean exit, " - No errors" when the system exits with a non-zero code but
        without an exception, and " - Failure" when an uncaught exception was recorded.
    msg_ok : str, optional
        Content of the email if the system exits successfully.
    msg_err : str, optional
        Content of the email if the system exits with a non-zero code or with an error.
        The message will be appended by the exit code or with the error traceback.
    on_error_only : boolean
        Whether to only send an email on a non-zero/error exit.
    skip_ctrlc : boolean
        If True (default), exiting with a KeyboardInterrupt will not send an email.
    mail_kwargs
        Other arguments passed to :py:func:`send_mail`.
        The `to` argument is necessary for this function to work.

    Example
    -------
    Send an email titled "Woups" upon non-successful program exit. We assume the `to`
    field was given in the config.

    >>> import atexit
    >>> atexit.register(send_mail_on_exit, subject="Woups", on_error_only=True)
    """
    subject = subject or "Workflow"
    msg_err = msg_err or "Workflow exited with some errors."
    error = exit_watcher.error
    code = exit_watcher.code

    if error is not None:
        # An uncaught exception was recorded by the excepthook.
        if error[0] is KeyboardInterrupt and skip_ctrlc:
            return
        tb = "".join(format_exception(*error))
        send_mail(
            subject=subject + " - Failure",
            msg=f"{msg_err}\n\n{tb}",
            **mail_kwargs,
        )
    elif code in (None, 0):
        # Clean exit: `sys.exit` was not called, or was called with 0.
        if not on_error_only:
            send_mail(
                subject=subject + " - Success",
                msg=msg_ok or "Workflow exited successfully.",
                **mail_kwargs,
            )
    else:
        # Any other exit code is an abnormal exit. `sys.exit` also accepts
        # non-integer objects (e.g. a message string), so the code is not
        # compared numerically.
        send_mail(
            subject=subject + " - No errors",
            msg=f"{msg_err}\nSystem exited with code {code}.",
            **mail_kwargs,
        )
222

223

224
@parse_config
class measure_time:
    """Context for timing a code block.

    The wall-clock time spent inside the ``with`` block is logged at INFO level
    when the block exits; a "Started" message is logged when it is entered.

    Parameters
    ----------
    name : str, optional
      A name to give to the block being timed, for meaningful logging.
    cpu : boolean
      If True, the CPU time is also measured and logged.
    logger : logging.Logger, optional
      The logger object to use when sending Info messages with the measured time.
      Defaults to a logger from this module.
    """

    def __init__(
        self,
        name: str | None = None,
        cpu: bool = False,
        logger: logging.Logger = logger,
    ):
        self.name = name or ""
        self.cpu = cpu
        self.logger = logger

    def __enter__(self):
        """Start the timers, log the start message and return the context object."""
        self.start = time.perf_counter()
        self.start_cpu = time.process_time()
        msg = f"Started process {self.name}."
        self.logger.info(msg)
        return self

    def __exit__(self, *args, **kwargs):
        """Log the elapsed wall-clock time and, if requested, the CPU time."""
        elapsed = time.perf_counter() - self.start
        elapsed_cpu = time.process_time() - self.start_cpu
        s = f"Process {self.name} done in {elapsed:.02f} s"
        if self.cpu:
            # Guard against a zero elapsed time on coarse clocks.
            occ = elapsed_cpu / elapsed if elapsed > 0 else 0.0
            # The `%` format spec already appends the percent sign.
            s += f" and used {elapsed_cpu:.02f} of cpu time ({occ:.1%} occupancy)."

        self.logger.info(s)
265

266

267
# FIXME: This should be written as "TimeoutError"
268
class TimeoutException(Exception):  # noqa: N818
    """An exception raised when a timeout occurs.

    Parameters
    ----------
    seconds : int
      Number of seconds reported in the message.
    task : str, optional
      Name of the task reported in the message.
    kwargs
      Passed on to the parent `Exception` initializer.
    """

    def __init__(self, seconds: int, task: str = "", **kwargs):
        message = f"Task {task} timed out after {seconds} seconds"
        # Keep the formatted message available as an attribute.
        self.msg = message
        super().__init__(message, **kwargs)
274

275

276
@contextmanager
def timeout(seconds: int, task: str = ""):
    """Timeout context manager.

    The timeout relies on the SIGALRM signal: only one can be active at a time and
    it is not multithread-safe. It cannot be used in another thread than the main
    one, but multithreading can be used in parallel.

    Parameters
    ----------
    seconds : int
      Number of seconds after which the context exits with a TimeoutException.
      If None or negative, no timeout is set and this context does nothing.
    task : str, optional
      A name to give to the task, allowing a more meaningful exception.
    """
    no_timeout = seconds is None or seconds <= 0
    if no_timeout:
        yield
        return

    # The handler signature is imposed by `signal.signal`; both arguments are unused.
    def _raise_timeout(signum, frame):  # noqa: F841
        raise TimeoutException(seconds, task)

    previous_handler = signal.signal(signal.SIGALRM, _raise_timeout)
    signal.alarm(seconds)
    try:
        yield
    finally:
        # Cancel any pending alarm and put back the handler that was installed before.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, previous_handler)
306

307

308
@contextmanager
6✔
309
def skippable(seconds: int = 2, task: str = "", logger: logging.Logger | None = None):
6✔
310
    """Skippable context manager.
311

312
    When CTRL-C (SIGINT, KeyboardInterrupt) is sent within the context,
313
    this catches it, prints to the log and gives a timeout during which a subsequent
314
    interruption will stop the script. Otherwise, the context exits normally.
315

316
    This is meant to be used within a loop so that we can skip some iterations:
317

318
    .. code-block:: python
319

320
        for i in iterable:
321
            with skippable(2, i):
322
                some_skippable_code()
323

324
    Parameters
325
    ----------
326
    seconds: int
327
      Number of seconds to wait for a second CTRL-C.
328
    task : str
329
      A name for the skippable task, to have an explicit script.
330
    logger : logging.Logger, optional
331
      The logger to use when printing the messages. The interruption signal is
332
      notified with ERROR, while the skipping is notified with INFO.
333
      If not given (default), a brutal print is used.
334
    """
335
    if logger:
×
336
        err = logger.error
×
337
        inf = logger.info
×
338
    else:
339
        err = inf = print
×
340
    try:
×
341
        yield
×
342
    except KeyboardInterrupt:
×
343
        err("Received SIGINT. Do it again to stop the script.")
×
344
        time.sleep(seconds)
×
345
        inf(f"Skipping {task}.")
×
346

347

348
def save_and_update(
    ds: xr.Dataset,
    pcat: ProjectCatalog,
    path: str | os.PathLike | None = None,
    file_format: str | None = None,
    build_path_kwargs: dict | None = None,
    save_kwargs: dict | None = None,
    update_kwargs: dict | None = None,
):
    """
    Construct the path, save the dataset and update the catalog.

    This function can be used after each task of a workflow.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset to save.
    pcat : ProjectCatalog
        Catalog to update after saving the dataset.
    path : str or os.PathLike, optional
        Path where to save the dataset.
        If the string contains variables in curly brackets, they will be filled by catalog attributes.
        If None, the `catutils.build_path` function will be used to create a path.
    file_format : {'nc', 'zarr'}
        Format of the file.
        If None, look for the following in order: build_path_kwargs['format'], a suffix in path, ds.attrs['cat:format'].
        If nothing is found, it will default to zarr.
    build_path_kwargs : dict, optional
        Arguments to pass to `build_path`.
    save_kwargs : dict, optional
        Arguments to pass to `save_to_netcdf` or `save_to_zarr`.
    update_kwargs : dict, optional
        Arguments to pass to `update_from_ds`.

    Raises
    ------
    ValueError
        If the resolved file format is neither "zarr" nor "nc".
    """
    build_path_kwargs = build_path_kwargs or {}
    save_kwargs = save_kwargs or {}
    update_kwargs = update_kwargs or {}

    # Resolve the file format when the caller did not give one.
    if file_format is None:
        if "format" in build_path_kwargs:
            file_format = build_path_kwargs["format"]
        else:
            suffix = Path(path).suffix if path is not None else ""
            if suffix:
                # ".nc" -> "nc"
                file_format = suffix.rpartition(".")[2]
            else:
                file_format = ds.attrs.get("cat:format", "zarr")

    # Resolve the output path.
    if path is None:
        # No path given: build it.
        build_path_kwargs.setdefault("format", file_format)
        from .catutils import build_path

        path = build_path(ds, **build_path_kwargs)
    else:
        # Fill the curly-bracket fields of the given path with the catalog attributes.
        cat_attrs = get_cat_attrs(ds, var_as_str=True)
        path = str(path).format(**cat_attrs)

    # Pick the writer matching the format, then save.
    if file_format == "zarr":
        from .io import save_to_zarr as write_dataset
    elif file_format == "nc":
        from .io import save_to_netcdf as write_dataset
    else:
        raise ValueError(f"file_format {file_format} is not valid. Use zarr or nc.")
    write_dataset(ds, path, **save_kwargs)

    # Record the saved file in the catalog.
    pcat.update_from_ds(ds=ds, path=path, **update_kwargs)

    logger.info(f"File {path} has been saved successfully and the catalog was updated.")
424

425

426
def move_and_delete(
    moving: list[list[str | os.PathLike]],
    pcat: ProjectCatalog,
    deleting: list[str | os.PathLike] | None = None,
    copy: bool = False,
):
    """
    First, move files, then update the catalog with new locations. Finally, delete directories.

    This function can be used at the end of for loop in a workflow to clean temporary files.

    Parameters
    ----------
    moving : list of lists of str or os.PathLike
        list of lists of path of files to move, following the format: [[source 1, destination1], [source 2, destination2],...]
        An empty list is accepted and moves nothing.
    pcat : ProjectCatalog
        Catalog to update with new destinations
    deleting : list of str or os.PathLike, optional
        List of directories to be deleted, including all contents, and recreated empty. e.g. The working directory of a workflow.
    copy : bool, optional
        If True, copy directories instead of moving them. The zarr stores (those with a
        `.zmetadata` file) and netCDF files found inside the copied directory are added
        to the catalog at their new location.

    Raises
    ------
    ValueError
        If `moving` is not a list of lists, or if `deleting` is neither None nor a list.
    """
    if not isinstance(moving, list) or (moving and not isinstance(moving[0], list)):
        raise ValueError("`moving` should be a list of lists.")

    for files in moving:
        source, dest = files[0], files[1]
        if not Path(source).exists():
            msg = f"You are trying to move {source}, but it does not exist."
            logger.info(msg)
            continue

        if copy:
            msg = f"Copying {source} to {dest}."
            logger.info(msg)
            # `copytree` returns the destination directory, not the copied files,
            # so the copied datasets are listed from the source tree instead.
            sh.copytree(source, dest, dirs_exist_ok=True)
            for dataset_path in _move_and_delete_copied_datasets(source, dest):
                _move_and_delete_register(pcat, str(dataset_path))
        else:
            msg = f"Moving {source} to {dest}."
            logger.info(msg)
            sh.move(source, dest)

        # The destination itself is a dataset.
        if Path(dest).suffix in [".zarr", ".nc"]:
            _move_and_delete_register(pcat, dest)

    # erase workdir content if this is the last step
    if deleting is None:
        return
    if not isinstance(deleting, list):
        raise ValueError("`deleting` should be a list.")
    for dir_to_delete in deleting:
        target = Path(dir_to_delete)
        if target.is_dir():
            msg = f"Deleting content inside {dir_to_delete}."
            logger.info(msg)
            sh.rmtree(dir_to_delete)
            # Recreate the directory, now empty.
            target.mkdir()


def _move_and_delete_copied_datasets(source, dest) -> list:
    """List the destination paths of the datasets found below `source`, excluding `dest` itself."""
    source_root, dest_root = Path(source), Path(dest)
    found = []
    for item in sorted(source_root.rglob("*")):
        target = dest_root / item.relative_to(source_root)
        if item.name == ".zmetadata" and item.parent.suffix == ".zarr":
            # A zarr store is identified by its consolidated metadata file.
            dataset_path = target.parent
        elif item.suffix == ".nc" and item.is_file():
            dataset_path = target
        else:
            continue
        # The top-level destination is registered separately by the caller.
        if dataset_path != dest_root:
            found.append(dataset_path)
    return found


def _move_and_delete_register(pcat, path) -> None:
    """Open the dataset stored at `path` and add it to the catalog `pcat`."""
    with warnings.catch_warnings():
        # Silence RuntimeWarning about failed guess of backend engines
        warnings.simplefilter("ignore", category=RuntimeWarning)
        ds = xr.open_dataset(path)
    pcat.update_from_ds(ds=ds, path=path)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc