• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Ouranosinc / xscen / 12917303355

22 Jan 2025 09:21PM UTC coverage: 87.135%. First build
12917303355

Pull #516

github

web-flow
Merge 0a8d7a20b into 9b98317db
Pull Request #516: Address warnings

7 of 10 new or added lines in 2 files covered. (70.0%)

3935 of 4516 relevant lines covered (87.13%)

5.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.06
/src/xscen/scripting.py
1
"""A collection of various convenience objects and functions to use in scripts."""
2

3
import logging
6✔
4
import mimetypes
6✔
5
import os
6✔
6
import shutil as sh
6✔
7
import signal
6✔
8
import smtplib
6✔
9
import sys
6✔
10
import time
6✔
11
import warnings
6✔
12
from contextlib import contextmanager
6✔
13
from email.message import EmailMessage
6✔
14
from io import BytesIO
6✔
15
from pathlib import Path
6✔
16
from traceback import format_exception
6✔
17

18
import xarray as xr
6✔
19
from matplotlib.figure import Figure
6✔
20

21
from .catalog import ProjectCatalog
6✔
22
from .config import parse_config
6✔
23
from .utils import get_cat_attrs
6✔
24

25
# Module-level logger shared by the helpers below.
logger = logging.getLogger(__name__)

# Names exported by ``from xscen.scripting import *``.
__all__ = [
    "TimeoutException",
    "measure_time",
    "move_and_delete",
    "save_and_update",
    "send_mail",
    "send_mail_on_exit",
    "skippable",
    "timeout",
]
37

38

39
def _default_address() -> str:
    """Return the ``user@hostname`` address of the user running this process.

    :py:func:`os.getlogin` raises :py:class:`OSError` when the process has no
    controlling terminal (e.g. cron or batch-scheduler jobs), so we fall back to
    :py:func:`getpass.getuser`, which reads the login name from the environment
    or the password database.
    """
    import getpass

    try:
        user = os.getlogin()
    except OSError:
        user = getpass.getuser()
    return f"{user}@{os.uname().nodename}"


@parse_config
def send_mail(
    *,
    subject: str,
    msg: str,
    to: str | None = None,
    server: str = "127.0.0.1",
    port: int = 25,
    attachments: None | (
        list[tuple[str, Figure | os.PathLike] | Figure | os.PathLike]
    ) = None,
) -> None:
    """Send email.

    Email a single address through a login-less SMTP server.
    The default values of server and port should work out-of-the-box on Ouranos's systems.

    Parameters
    ----------
    subject : str
      Subject line.
    msg : str
      Main content of the email. Can be UTF-8 and multi-line.
    to : str, optional
      Email address to which send the email. If None (default), the email is sent to the local
      address of the current user, "{user}@{hostname}" (the same address is used as sender).
      On unix systems simply put your real email address in `$HOME/.forward` to receive the emails sent to this local address.
    server : str
      SMTP server url. Defaults to 127.0.0.1, the local host. This function does not try to log-in.
    port : int
      Port of the SMTP service on the server. Defaults to 25, which is usually the default port on unix-like systems.
    attachments : list of paths or matplotlib figures or tuples of a string and a path or figure, optional
      List of files to attach to the email.
      Elements of the list can be paths, the mimetypes of those is guessed and the files are read and sent.
      Elements can also be matplotlib Figures which are sent as png images (savefig) with names like "Figure00.png".
      Finally, elements can be tuples of a filename to use in the email and the attachment, handled as above.

    Returns
    -------
    None
    """
    # Inspired by https://betterprogramming.pub/how-to-send-emails-with-attachments-using-python-dd37c4b6a7fd
    sender = _default_address()
    email = EmailMessage()
    email["Subject"] = subject
    email["From"] = sender
    email["To"] = to or sender
    email.set_content(msg)

    for i, att in enumerate(attachments or []):
        fname = None
        if isinstance(att, tuple):
            fname, att = att
        if isinstance(att, Figure):
            data = BytesIO()
            att.savefig(data, format="png")
            email.add_attachment(
                data.getvalue(),
                maintype="image",
                subtype="png",
                filename=fname or f"Figure{i:02d}.png",
            )
        else:  # a path
            attpath = Path(att)
            ctype, encoding = mimetypes.guess_type(attpath)
            # Unknown types and encoded/compressed files (e.g. ".gz") are sent as raw bytes.
            if ctype is None or encoding is not None:
                ctype = "application/octet-stream"
            maintype, subtype = ctype.split("/", 1)
            email.add_attachment(
                attpath.read_bytes(),
                maintype=maintype,
                subtype=subtype,
                filename=fname or attpath.name,
            )

    with smtplib.SMTP(host=server, port=port) as smtp:
        smtp.send_message(email)
117

118

119
class ExitWatcher:
    """An object that watches system exits and exceptions before the python exits.

    Once hooked, calls to ``sys.exit`` record their exit code in ``code`` and uncaught
    exceptions record their ``(type, value, traceback)`` triple in ``error``. Both are
    then forwarded to the original ``sys.exit`` / ``sys.excepthook``.
    """

    def __init__(self):
        self.code = None  # code passed to the patched ``sys.exit``, if it was called
        self.error = None  # exc_info triple received by the patched ``sys.excepthook``
        self.hooked = False  # whether ``sys`` currently points to this watcher's methods

    def hook(self):
        """Hooks the watcher to the system by monkeypatching `sys` with its own methods.

        Emits a warning and does nothing if the watcher is already hooked.
        """
        if self.hooked:
            warnings.warn("Exit hooks have already been overrided.", stacklevel=2)
            return
        self.orig_exit = sys.exit
        self.orig_excepthook = sys.excepthook
        sys.exit = self.exit
        sys.excepthook = self.err_handler
        self.hooked = True

    def unhook(self):
        """Restore the ``sys.exit`` and ``sys.excepthook`` that were in place before :py:meth:`hook`.

        Raises
        ------
        ValueError
            If the watcher is not currently hooked.
        """
        if not self.hooked:
            raise ValueError("Exit hooks were not overriden, can't unhook.")
        sys.exit = self.orig_exit
        sys.excepthook = self.orig_excepthook
        # Without this reset, a later `hook()` would refuse to re-install the watcher.
        self.hooked = False

    def exit(self, code=0):
        """Record `code`, then exit through the original ``sys.exit``."""
        self.code = code
        self.orig_exit(code)

    def err_handler(self, *exc_info):
        """Record the uncaught exception, then delegate to the original ``sys.excepthook``."""
        self.error = exc_info
        self.orig_excepthook(*exc_info)
152

153

154
# Created and hooked at import time so that `send_mail_on_exit` can later inspect
# how the interpreter is exiting (exit code or uncaught exception).
exit_watcher = ExitWatcher()
exit_watcher.hook()
156

157

158
@parse_config
def send_mail_on_exit(
    *,
    subject: str | None = None,
    msg_ok: str | None = None,
    msg_err: str | None = None,
    on_error_only: bool = False,
    skip_ctrlc: bool = True,
    **mail_kwargs,
) -> None:
    """Send an email with content depending on how the system exited.

    This function is best used by registering it with `atexit`. Calls :py:func:`send_mail`.

    Parameters
    ----------
    subject : str, optional
      Email subject. Will be appended by "Success", "No errors" or "Failure" depending
      on how the system exits.
    msg_ok : str, optional
      Content of the email if the system exits successfully.
    msg_err : str, optional
      Content of the email if the system exits with a non-zero code or with an error.
      The message will be appended by the exit code or with the error traceback.
    on_error_only : boolean
      Whether to only send an email on a non-zero/error exit.
    skip_ctrlc : boolean
      If True (default), exiting with a KeyboardInterrupt will not send an email.
    mail_kwargs
      Other arguments passed to :py:func:`send_mail`.
      The `to` argument is necessary for this function to work.

    Returns
    -------
    None

    Examples
    --------
    Send an email titled "Woups" upon non-successful program exit. We assume the `to`
    field was given in the config.

    >>> import atexit
    >>> atexit.register(send_mail_on_exit, subject="Woups", on_error_only=True)
    """
    subject = subject or "Workflow"
    msg_err = msg_err or "Workflow exited with some errors."
    error = exit_watcher.error
    code = exit_watcher.code
    # `sys.exit()`, `sys.exit(0)` and `sys.exit(None)` are successful exits. Any other code,
    # including negative integers and message strings (`sys.exit("reason")`), is a failure.
    # (Comparing the code with `> 0` missed negative codes and crashed on strings.)
    failed_exit = code not in (None, 0)

    if error is None:
        if failed_exit:
            send_mail(
                subject=subject + " - No errors",
                msg=f"{msg_err}\nSystem exited with code {code}.",
                **mail_kwargs,
            )
        elif not on_error_only:
            send_mail(
                subject=subject + " - Success",
                msg=msg_ok or "Workflow exited successfully.",
                **mail_kwargs,
            )
    elif not (skip_ctrlc and issubclass(error[0], KeyboardInterrupt)):
        tb = "".join(format_exception(*error))
        send_mail(
            subject=subject + " - Failure", msg=f"{msg_err}\n\n{tb}", **mail_kwargs
        )
226

227

228
@parse_config
class measure_time:
    """Context for timing a code block.

    The start of the block and its wall-clock duration are logged at INFO level.
    Entering the context returns the `measure_time` instance itself.

    Parameters
    ----------
    name : str, optional
      A name to give to the block being timed, for meaningful logging.
    cpu : boolean
      If True, the CPU time is also measured and logged.
    logger : logging.Logger, optional
      The logger object to use when sending Info messages with the measured time.
      Defaults to a logger from this module.
    """

    def __init__(
        self,
        name: str | None = None,
        cpu: bool = False,
        logger: logging.Logger = logger,
    ):
        self.name = name or ""
        self.cpu = cpu
        self.logger = logger

    def __enter__(self):  # noqa: D105
        self.start = time.perf_counter()
        self.start_cpu = time.process_time()
        msg = f"Started process {self.name}."
        self.logger.info(msg)
        return self

    def __exit__(self, *args, **kwargs):  # noqa: D105
        # Returns None, so exceptions raised inside the block still propagate.
        elapsed = time.perf_counter() - self.start
        elapsed_cpu = time.process_time() - self.start_cpu
        s = f"Process {self.name} done in {elapsed:.02f} s"
        if self.cpu:
            # The `%` format spec already multiplies by 100 and appends the percent sign;
            # guard against a zero wall-clock interval on coarse clocks.
            occupancy = f"{elapsed_cpu / elapsed:.1%}" if elapsed > 0 else "n/a"
            s += f" and used {elapsed_cpu:.02f} of cpu time ({occupancy} occupancy)."

        self.logger.info(s)
269

270

271
# FIXME: This should be written as "TimeoutError"
272
class TimeoutException(Exception):  # noqa: N818
    """An exception raised when a timeout occurs.

    Parameters
    ----------
    seconds : int
      The timeout duration, in seconds, that was exceeded.
    task : str, optional
      Name of the task that timed out, used in the message.
    **kwargs
      Ignored, kept for backward compatibility. :py:class:`Exception` accepts no keyword
      arguments, so forwarding them always raised a TypeError.
    """

    def __init__(self, seconds: int, task: str = "", **kwargs):
        self.seconds = seconds
        self.task = task
        # Avoid a doubled space in the message when no task name is given.
        label = f"Task {task}" if task else "Task"
        self.msg = f"{label} timed out after {seconds} seconds"
        super().__init__(self.msg)

    def __reduce__(self):
        # Exceptions are rebuilt from `self.args` when unpickled, but `args` holds the
        # formatted message, not the constructor arguments. Rebuild from the latter so the
        # exception survives a round-trip (e.g. between multiprocessing workers).
        return type(self), (self.seconds, self.task), self.__dict__
278

279

280
@contextmanager
6✔
281
def timeout(seconds: int, task: str = ""):
6✔
282
    """Timeout context manager.
283

284
    Only one can be used at a time, this is not multithread-safe : it cannot be used in
285
    another thread than the main one, but multithreading can be used in parallel.
286

287
    Parameters
288
    ----------
289
    seconds : int
290
      Number of seconds after which the context exits with a TimeoutException.
291
      If None or negative, no timeout is set and this context does nothing.
292
    task : str, optional
293
      A name to give to the task, allowing a more meaningful exception.
294
    """
295
    if seconds is None or seconds <= 0:
×
296
        yield
×
297
    else:
298

299
        # FIXME: These variables are not used
300
        def _timeout_handler(signum, frame):  # noqa: F841
×
301
            raise TimeoutException(seconds, task)
×
302

303
        old_handler = signal.signal(signal.SIGALRM, _timeout_handler)
×
304
        signal.alarm(seconds)
×
305
        try:
×
306
            yield
×
307
        finally:
308
            signal.alarm(0)
×
309
            signal.signal(signal.SIGALRM, old_handler)
×
310

311

312
@contextmanager
6✔
313
def skippable(seconds: int = 2, task: str = "", logger: logging.Logger | None = None):
6✔
314
    """Skippable context manager.
315

316
    When CTRL-C (SIGINT, KeyboardInterrupt) is sent within the context,
317
    this catches it, prints to the log and gives a timeout during which a subsequent
318
    interruption will stop the script. Otherwise, the context exits normally.
319

320
    This is meant to be used within a loop so that we can skip some iterations:
321

322
    .. code-block:: python
323

324
        for i in iterable:
325
            with skippable(2, i):
326
                some_skippable_code()
327

328
    Parameters
329
    ----------
330
    seconds: int
331
      Number of seconds to wait for a second CTRL-C.
332
    task : str
333
      A name for the skippable task, to have an explicit script.
334
    logger : logging.Logger, optional
335
      The logger to use when printing the messages. The interruption signal is
336
      notified with ERROR, while the skipping is notified with INFO.
337
      If not given (default), a brutal print is used.
338
    """
339
    if logger:
×
340
        err = logger.error
×
341
        inf = logger.info
×
342
    else:
343
        err = inf = print
×
344
    try:
×
345
        yield
×
346
    except KeyboardInterrupt:
×
347
        err("Received SIGINT. Do it again to stop the script.")
×
348
        time.sleep(seconds)
×
349
        inf(f"Skipping {task}.")
×
350

351

352
def save_and_update(
    ds: xr.Dataset,
    pcat: ProjectCatalog,
    path: str | os.PathLike | None = None,
    file_format: str | None = None,
    build_path_kwargs: dict | None = None,
    save_kwargs: dict | None = None,
    update_kwargs: dict | None = None,
):
    """
    Construct the path, save the dataset and update the catalog.

    This function can be used after each task of a workflow.

    Parameters
    ----------
    ds : xr.Dataset
        Dataset to save.
    pcat : ProjectCatalog
        Catalog to update after saving the dataset.
    path : str or os.PathLike, optional
        Path where to save the dataset.
        If the string contains variables in curly brackets, they will be filled by catalog attributes.
        If None, the `catutils.build_path` function will be used to create a path.
    file_format : {'nc', 'zarr'}, optional
        Format of the file.
        If None, look for the following in order: build_path_kwargs['format'], a suffix in path, ds.attrs['cat:format'].
        If nothing is found, it will default to zarr.
    build_path_kwargs : dict, optional
        Arguments to pass to `build_path`. The given dictionary is not modified.
    save_kwargs : dict, optional
        Arguments to pass to `save_to_netcdf` or `save_to_zarr`.
    update_kwargs : dict, optional
        Arguments to pass to `update_from_ds`.

    Raises
    ------
    ValueError
        If the resolved file format is neither "zarr" nor "nc".
    """
    # Work on a copy: the `setdefault("format", ...)` below must not leak into the caller's
    # dictionary (often a shared config entry reused for datasets of different formats).
    build_path_kwargs = dict(build_path_kwargs or {})
    save_kwargs = save_kwargs or {}
    update_kwargs = update_kwargs or {}

    # try to guess file format if not given.
    if file_format is None:
        if "format" in build_path_kwargs:
            file_format = build_path_kwargs["format"]
        elif path is not None and (suffix := Path(path).suffix):
            file_format = suffix[1:]  # drop the leading "."
        else:
            file_format = ds.attrs.get("cat:format", "zarr")

    # get path
    if path is not None:
        path = str(path).format(
            **get_cat_attrs(ds, var_as_str=True)
        )  # fill path with attrs
    else:  # if path is not given build it
        build_path_kwargs.setdefault("format", file_format)
        from .catutils import build_path

        path = build_path(ds, **build_path_kwargs)

    # save
    if file_format == "zarr":
        from .io import save_to_zarr

        save_to_zarr(ds, path, **save_kwargs)
    elif file_format == "nc":
        from .io import save_to_netcdf

        save_to_netcdf(ds, path, **save_kwargs)
    else:
        raise ValueError(f"file_format {file_format} is not valid. Use zarr or nc.")

    # update catalog
    pcat.update_from_ds(ds=ds, path=path, **update_kwargs)

    msg = f"File {path} has been saved successfully and the catalog was updated."
    logger.info(msg)
428

429

430
def _update_catalog_from_file(pcat: ProjectCatalog, path: str | os.PathLike) -> None:
    """Open the dataset stored at `path` and add or refresh its entry in `pcat`."""
    with warnings.catch_warnings():
        # Silence RuntimeWarning about failed guess of backend engines
        warnings.simplefilter("ignore", category=RuntimeWarning)
        ds = xr.open_dataset(path)
    # Close the dataset once the catalog is updated so no file handle is leaked.
    with ds:
        pcat.update_from_ds(ds=ds, path=path)


def _copy_and_list(source: str | os.PathLike, dest: str | os.PathLike) -> list[Path]:
    """Copy `source` (a single file or a directory tree) to `dest`; return the paths of the files written."""
    if not Path(source).is_dir():
        return [Path(sh.copy2(source, dest))]

    written: list[Path] = []

    def _copy_file(src, dst):
        out = sh.copy2(src, dst)
        written.append(Path(out))
        return out

    # `shutil.copytree` only returns the destination directory, so each copied file
    # is recorded through its `copy_function` hook.
    sh.copytree(source, dest, dirs_exist_ok=True, copy_function=_copy_file)
    return written


def move_and_delete(
    moving: list[list[str | os.PathLike]],
    pcat: ProjectCatalog,
    deleting: list[str | os.PathLike] | None = None,
    copy: bool = False,
):
    """
    First, move files, then update the catalog with new locations. Finally, delete directories.

    This function can be used at the end of for loop in a workflow to clean temporary files.

    Parameters
    ----------
    moving : list of lists of str or os.PathLike
        List of [source, destination] pairs of paths to move, following the format:
        [[source 1, destination1], [source 2, destination2],...]. An empty list moves nothing.
        Destinations ending in ".zarr" or ".nc" are added to the catalog. Missing sources are logged and skipped.
    pcat : ProjectCatalog
        Catalog to update with new destinations.
    deleting : list of str or os.PathLike, optional
        List of directories to be deleted, including all contents, and recreated empty. e.g. The working directory of a workflow.
    copy : bool, optional
        If True, copy instead of moving. Directories are copied recursively and every copied
        netCDF file, and every copied zarr store (found through its ".zmetadata" file),
        is added to the catalog.

    Raises
    ------
    ValueError
        If `moving` is not a list of lists, or if `deleting` is neither None nor a list.
        Both are checked before any file is touched.
    """
    if not isinstance(moving, list) or (moving and not isinstance(moving[0], list)):
        raise ValueError("`moving` should be a list of lists.")
    if deleting is not None and not isinstance(deleting, list):
        raise ValueError("`deleting` should be a list.")

    for files in moving:
        source, dest = files[0], files[1]
        if not Path(source).exists():
            msg = f"You are trying to move {source}, but it does not exist."
            logger.info(msg)
            continue

        # Datasets already added to the catalog for this pair, so none is opened twice.
        registered: set[Path] = set()
        if copy:
            msg = f"Copying {source} to {dest}."
            logger.info(msg)
            for f in _copy_and_list(source, dest):
                # Zarr stores are directories: register the store itself, identified by
                # its consolidated metadata file.
                if f.name == ".zmetadata" and f.parent.suffix == ".zarr":
                    store = f.parent
                elif f.suffix == ".nc":
                    store = f
                else:
                    continue
                if store not in registered:
                    _update_catalog_from_file(pcat, str(store))
                    registered.add(store)
        else:
            msg = f"Moving {source} to {dest}."
            logger.info(msg)
            sh.move(source, dest)

        if Path(dest).suffix in [".zarr", ".nc"] and Path(dest) not in registered:
            _update_catalog_from_file(pcat, dest)

    # erase workdir content if this is the last step
    for dir_to_delete in deleting or []:
        if Path(dir_to_delete).is_dir():
            msg = f"Deleting content inside {dir_to_delete}."
            logger.info(msg)
            sh.rmtree(dir_to_delete)
            Path(dir_to_delete).mkdir()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc