• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Ouranosinc / xscen / 10908227621

17 Sep 2024 05:41PM UTC coverage: 79.281%. First build
10908227621

Pull #452

github

web-flow
Merge e730c3c29 into 5e11da5f2
Pull Request #452: Cookiecutter update

115 of 182 new or added lines in 13 files covered. (63.19%)

3440 of 4339 relevant lines covered (79.28%)

4.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.64
/src/xscen/scripting.py
1
"""A collection of various convenience objects and functions to use in scripts."""
2

3
import logging
6✔
4
import mimetypes
6✔
5
import os
6✔
6
import shutil as sh
6✔
7
import signal
6✔
8
import smtplib
6✔
9
import sys
6✔
10
import time
6✔
11
import warnings
6✔
12
from contextlib import contextmanager
6✔
13
from distutils.dir_util import copy_tree
6✔
14
from email.message import EmailMessage
6✔
15
from io import BytesIO
6✔
16
from pathlib import Path
6✔
17
from traceback import format_exception
6✔
18
from typing import Optional, Union
6✔
19

20
import xarray as xr
6✔
21
from matplotlib.figure import Figure
6✔
22

23
from .catalog import ProjectCatalog
6✔
24
from .config import parse_config
6✔
25
from .utils import get_cat_attrs
6✔
26

27
logger = logging.getLogger(__name__)
6✔
28

29
__all__ = [
6✔
30
    "TimeoutException",
31
    "measure_time",
32
    "move_and_delete",
33
    "save_and_update",
34
    "send_mail",
35
    "send_mail_on_exit",
36
    "skippable",
37
    "timeout",
38
]
39

40

41
@parse_config
6✔
42
def send_mail(
6✔
43
    *,
44
    subject: str,
45
    msg: str,
46
    to: Optional[str] = None,
47
    server: str = "127.0.0.1",
48
    port: int = 25,
49
    attachments: Optional[
50
        list[Union[tuple[str, Union[Figure, os.PathLike]], Figure, os.PathLike]]
51
    ] = None,
52
) -> None:
53
    """Send email.
54

55
    Email a single address through a login-less SMTP server.
56
    The default values of server and port should work out-of-the-box on Ouranos's systems.
57

58
    Parameters
59
    ----------
60
    subject: str
61
      Subject line.
62
    msg: str
63
      Main content of the email. Can be UTF-8 and multi-line.
64
    to: str, optional
65
      Email address to which send the email. If None (default), the email is sent to "{os.getlogin()}@{os.uname().nodename}".
66
      On unix systems simply put your real email address in `$HOME/.forward` to receive the emails sent to this local address.
67
    server : str
68
      SMTP server url. Defaults to 127.0.0.1, the local host. This function does not try to log-in.
69
    port: int
70
      Port of the SMTP service on the server. Defaults to 25, which is usually the default port on unix-like systems.
71
    attachments : list of paths or matplotlib figures or tuples of a string and a path or figure, optional
72
      List of files to attach to the email.
73
      Elements of the list can be paths, the mimetypes of those is guessed and the files are read and sent.
74
      Elements can also be matplotlib Figures which are send as png image (savefig) with names like "Figure00.png".
75
      Finally, elements can be tuples of a filename to use in the email and the attachment, handled as above.
76

77
    Returns
78
    -------
79
    None
80
    """
81
    # Inspired by https://betterprogramming.pub/how-to-send-emails-with-attachments-using-python-dd37c4b6a7fd
82
    email = EmailMessage()
×
83
    email["Subject"] = subject
×
84
    email["From"] = f"{os.getlogin()}@{os.uname().nodename}"
×
85
    email["To"] = to or f"{os.getlogin()}@{os.uname().nodename}"
×
86
    email.set_content(msg)
×
87

88
    for i, att in enumerate(attachments or []):
×
89
        fname = None
×
90
        if isinstance(att, tuple):
×
91
            fname, att = att
×
92
        if isinstance(att, Figure):
×
93
            data = BytesIO()
×
94
            att.savefig(data, format="png")
×
95
            data.seek(0)
×
96
            email.add_attachment(
×
97
                data.read(),
98
                maintype="image",
99
                subtype="png",
100
                filename=fname or f"Figure{i:02d}.png",
101
            )
102
        else:  # a path
103
            attpath = Path(att)
×
104
            ctype, encoding = mimetypes.guess_type(attpath)
×
105
            if ctype is None or encoding is not None:
×
106
                ctype = "application/octet-stream"
×
107
            maintype, subtype = ctype.split("/", 1)
×
108

109
            with attpath.open("rb") as fp:
×
110
                email.add_attachment(
×
111
                    fp.read(),
112
                    maintype=maintype,
113
                    subtype=subtype,
114
                    filename=fname or attpath.name,
115
                )
116

117
    with smtplib.SMTP(host=server, port=port) as SMTP:
×
118
        SMTP.send_message(email)
×
119

120

121
class ExitWatcher:
6✔
122
    """An object that watches system exits and exceptions before the python exits."""
123

124
    def __init__(self):
6✔
125
        self.code = None
6✔
126
        self.error = None
6✔
127
        self.hooked = False
6✔
128

129
    def hook(self):
6✔
130
        """Hooks the watcher to the system by monkeypatching `sys` with its own methods."""
131
        if not self.hooked:
6✔
132
            self.orig_exit = sys.exit
6✔
133
            self.orig_excepthook = sys.excepthook
6✔
134
            sys.exit = self.exit
6✔
135
            sys.excepthook = self.err_handler
6✔
136
            self.hooked = True
6✔
137
        else:
138
            warnings.warn("Exit hooks have already been overrided.")
×
139

140
    def unhook(self):
6✔
141
        if self.hooked:
×
142
            sys.exit = self.orig_exit
×
143
            sys.excepthook = self.orig_excepthook
×
144
        else:
145
            raise ValueError("Exit hooks were not overriden, can't unhook.")
×
146

147
    def exit(self, code=0):
6✔
148
        self.code = code
×
149
        self.orig_exit(code)
×
150

151
    def err_handler(self, *exc_info):
6✔
152
        self.error = exc_info
×
153
        self.orig_excepthook(*exc_info)
×
154

155

156
exit_watcher = ExitWatcher()
6✔
157
exit_watcher.hook()
6✔
158

159

160
@parse_config
6✔
161
def send_mail_on_exit(
6✔
162
    *,
163
    subject: Optional[str] = None,
164
    msg_ok: Optional[str] = None,
165
    msg_err: Optional[str] = None,
166
    on_error_only: bool = False,
167
    skip_ctrlc: bool = True,
168
    **mail_kwargs,
169
) -> None:
170
    """Send an email with content depending on how the system exited.
171

172
    This function is best used by registering it with `atexit`. Calls :py:func:`send_mail`.
173

174
    Parameters
175
    ----------
176
    subject : str, optional
177
      Email subject. Will be appended by "Success", "No errors" or "Failure" depending
178
      on how the system exits.
179
    msg_ok : str, optional
180
      Content of the email if the system exists successfully.
181
    msg_err : str, optional
182
      Content of the email id the system exists with a non-zero code or with an error.
183
      The message will be appended by the exit code or with the error traceback.
184
    on_error_only : boolean
185
      Whether to only send an email on a non-zero/error exit.
186
    skip_ctrlc : boolean
187
      If True (default), exiting with a KeyboardInterrupt will not send an email.
188
    mail_kwargs
189
      Other arguments passed to :py:func:`send_mail`.
190
      The `to` argument is necessary for this function to work.
191

192
    Returns
193
    -------
194
    None
195

196
    Example
197
    -------
198
    Send an eamil titled "Woups" upon non-successful program exit. We assume the `to`
199
    field was given in the config.
200

201
    >>> import atexit
202
    >>> atexit.register(send_mail_on_exit, subject="Woups", on_error_only=True)
203
    """
204
    subject = subject or "Workflow"
×
205
    msg_err = msg_err or "Workflow exited with some errors."
×
206
    if (
×
207
        not on_error_only
208
        and exit_watcher.error is None
209
        and exit_watcher.code in [None, 0]
210
    ):
211
        send_mail(
×
212
            subject=subject + " - Success",
213
            msg=msg_ok or "Workflow exited successfully.",
214
            **mail_kwargs,
215
        )
216
    elif exit_watcher.error is None and (exit_watcher.code or 0) > 0:
×
217
        send_mail(
×
218
            subject=subject + " - No errors",
219
            msg=f"{msg_err}\nSystem exited with code {exit_watcher.code}.",
220
            **mail_kwargs,
221
        )
222
    elif exit_watcher.error is not None and (
×
223
        exit_watcher.error[0] is not KeyboardInterrupt or not skip_ctrlc
224
    ):
225
        tb = "".join(format_exception(*exit_watcher.error))
×
226
        msg_err = f"{msg_err}\n\n{tb}"
×
227
        send_mail(subject=subject + " - Failure", msg=msg_err, **mail_kwargs)
×
228

229

230
@parse_config
6✔
231
class measure_time:
6✔
232
    """Context for timing a code block.
233

234
    Parameters
235
    ----------
236
    name : str, optional
237
      A name to give to the block being timed, for meaningful logging.
238
    cpu : boolean
239
      If True, the CPU time is also measured and logged.
240
    logger : logging.Logger, optional
241
      The logger object to use when sending Info messages with the measured time.
242
      Defaults to a logger from this module.
243
    """
244

245
    def __init__(
6✔
246
        self,
247
        name: Optional[str] = None,
248
        cpu: bool = False,
249
        logger: logging.Logger = logger,
250
    ):
251
        self.name = name or ""
×
252
        self.cpu = cpu
×
253
        self.logger = logger
×
254

255
    def __enter__(self):  # noqa: D105
6✔
256
        self.start = time.perf_counter()
×
257
        self.start_cpu = time.process_time()
×
NEW
258
        msg = f"Started process {self.name}."
×
NEW
259
        self.logger.info(msg)
×
260
        return
×
261

262
    def __exit__(self, *args, **kwargs):  # noqa: D105
6✔
263
        elapsed = time.perf_counter() - self.start
×
264
        elapsed_cpu = time.process_time() - self.start_cpu
×
265
        occ = elapsed_cpu / elapsed
×
266
        s = f"Process {self.name} done in {elapsed:.02f} s"
×
267
        if self.cpu:
×
268
            s += f" and used {elapsed_cpu:.02f} of cpu time ({occ:.1%} % occupancy)."
×
269

270
        self.logger.info(s)
×
271

272

273
# FIXME: This should be written as "TimeoutError"
274
class TimeoutException(Exception):  # noqa: N818
6✔
275
    """An exception raised with a timeout occurs."""
276

277
    def __init__(self, seconds: int, task: str = "", **kwargs):
6✔
278
        self.msg = f"Task {task} timed out after {seconds} seconds"
×
279
        super().__init__(self.msg, **kwargs)
×
280

281

282
@contextmanager
6✔
283
def timeout(seconds: int, task: str = ""):
6✔
284
    """Timeout context manager.
285

286
    Only one can be used at a time, this is not multithread-safe : it cannot be used in
287
    another thread than the main one, but multithreading can be used in parallel.
288

289
    Parameters
290
    ----------
291
    seconds : int
292
      Number of seconds after which the context exits with a TimeoutException.
293
      If None or negative, no timeout is set and this context does nothing.
294
    task : str, optional
295
      A name to give to the task, allowing a more meaningful exception.
296
    """
297
    if seconds is None or seconds <= 0:
×
298
        yield
×
299
    else:
300

301
        def _timeout_handler(signum, frame):
×
302
            raise TimeoutException(seconds, task)
×
303

304
        old_handler = signal.signal(signal.SIGALRM, _timeout_handler)
×
305
        signal.alarm(seconds)
×
306
        try:
×
307
            yield
×
308
        finally:
309
            signal.alarm(0)
×
310
            signal.signal(signal.SIGALRM, old_handler)
×
311

312

313
@contextmanager
6✔
314
def skippable(
6✔
315
    seconds: int = 2, task: str = "", logger: Optional[logging.Logger] = None
316
):
317
    """Skippable context manager.
318

319
    When CTRL-C (SIGINT, KeyboardInterrupt) is sent within the context,
320
    this catches it, prints to the log and gives a timeout during which a subsequent
321
    interruption will stop the script. Otherwise, the context exits normally.
322

323
    This is meant to be used within a loop so that we can skip some iterations:
324

325
    .. code-block:: python
326

327
        for i in iterable:
328
            with skippable(2, i):
329
                some_skippable_code()
330

331
    Parameters
332
    ----------
333
    seconds: int
334
      Number of seconds to wait for a second CTRL-C.
335
    task : str
336
      A name for the skippable task, to have an explicit script.
337
    logger : logging.Logger, optional
338
      The logger to use when printing the messages. The interruption signal is
339
      notified with ERROR, while the skipping is notified with INFO.
340
      If not given (default), a brutal print is used.
341
    """
342
    if logger:
×
343
        err = logger.error
×
344
        inf = logger.info
×
345
    else:
346
        err = inf = print
×
347
    try:
×
348
        yield
×
349
    except KeyboardInterrupt:
×
350
        err("Received SIGINT. Do it again to stop the script.")
×
351
        time.sleep(seconds)
×
352
        inf(f"Skipping {task}.")
×
353

354

355
def save_and_update(
6✔
356
    ds: xr.Dataset,
357
    pcat: ProjectCatalog,
358
    path: Optional[Union[str, os.PathLike]] = None,
359
    file_format: Optional[str] = None,
360
    build_path_kwargs: Optional[dict] = None,
361
    save_kwargs: Optional[dict] = None,
362
    update_kwargs: Optional[dict] = None,
363
):
364
    """
365
    Construct the path, save and delete.
366

367
    This function can be used after each task of a workflow.
368

369
    Parameters
370
    ----------
371
    ds: xr.Dataset
372
        Dataset to save.
373
    pcat: ProjectCatalog
374
        Catalog to update after saving the dataset.
375
    path: str or os.pathlike, optional
376
        Path where to save the dataset.
377
        If the string contains variables in curly bracket. They will be filled by catalog attributes.
378
        If None, the `catutils.build_path` fonction will be used to create a path.
379
    file_format: {'nc', 'zarr'}
380
        Format of the file.
381
        If None, look for the following in order: build_path_kwargs['format'], a suffix in path, ds.attrs['cat:format'].
382
        If nothing is found, it will default to zarr.
383
    build_path_kwargs: dict, optional
384
        Arguments to pass to `build_path`.
385
    save_kwargs: dict, optional
386
        Arguments to pass to `save_to_netcdf` or `save_to_zarr`.
387
    update_kwargs: dict, optional
388
        Arguments to pass to `update_from_ds`.
389
    """
390
    build_path_kwargs = build_path_kwargs or {}
6✔
391
    save_kwargs = save_kwargs or {}
6✔
392
    update_kwargs = update_kwargs or {}
6✔
393

394
    # try to guess file format if not given.
395
    if file_format is None:
6✔
396
        if "format" in build_path_kwargs:
6✔
397
            file_format = build_path_kwargs.get("format")
×
398
        elif path is not None and Path(path).suffix:
6✔
399
            file_format = Path(path).suffix.split(".")[-1]
6✔
400
        else:
401
            file_format = ds.attrs.get("cat:format", "zarr")
×
402

403
    # get path
404
    if path is not None:
6✔
405
        path = str(path).format(
6✔
406
            **get_cat_attrs(ds, var_as_str=True)
407
        )  # fill path with attrs
408
    else:  # if path is not given build it
409
        build_path_kwargs.setdefault("format", file_format)
6✔
410
        from .catutils import build_path
6✔
411

412
        path = build_path(ds, **build_path_kwargs)
6✔
413

414
    # save
415
    if file_format == "zarr":
6✔
416
        from .io import save_to_zarr
6✔
417

418
        save_to_zarr(ds, path, **save_kwargs)
6✔
419
    elif file_format == "nc":
6✔
420
        from .io import save_to_netcdf
6✔
421

422
        save_to_netcdf(ds, path, **save_kwargs)
6✔
423
    else:
424
        raise ValueError(f"file_format {file_format} is not valid. Use zarr or nc.")
×
425

426
    # update catalog
427
    pcat.update_from_ds(ds=ds, path=path, **update_kwargs)
6✔
428

429
    msg = f"File {path} has been saved successfully and the catalog was updated."
6✔
430
    logger.info(msg)
6✔
431

432

433
def move_and_delete(
6✔
434
    moving: list[list[Union[str, os.PathLike]]],
435
    pcat: ProjectCatalog,
436
    deleting: Optional[list[Union[str, os.PathLike]]] = None,
437
    copy: bool = False,
438
):
439
    """
440
    First, move files, then update the catalog with new locations. Finally, delete directories.
441

442
    This function can be used at the end of for loop in a workflow to clean temporary files.
443

444
    Parameters
445
    ----------
446
    moving: list of lists of str or os.PathLike
447
        list of lists of path of files to move, following the format: [[source 1, destination1], [source 2, destination2],...]
448
    pcat: ProjectCatalog
449
        Catalog to update with new destinations
450
    deleting: list of str or os.PathLike, optional
451
        list of directories to be deleted including all contents and recreated empty.
452
        E.g. the working directory of a workflow.
453
    copy: bool, optional
454
        If True, copy directories instead of moving them.
455

456
    """
457
    if isinstance(moving, list) and isinstance(moving[0], list):
6✔
458
        for files in moving:
6✔
459
            source, dest = files[0], files[1]
6✔
460
            if Path(source).exists():
6✔
461
                if copy:
6✔
462
                    msg = f"Copying {source} to {dest}."
6✔
463
                    logger.info(msg)
6✔
464
                    copied_files = copy_tree(source, dest)
6✔
465
                    for f in copied_files:
6✔
466
                        # copied files don't include zarr files
467
                        if f[-16:] == ".zarr/.zmetadata":
6✔
468
                            ds = xr.open_dataset(f[:-11])
6✔
469
                            pcat.update_from_ds(ds=ds, path=f[:-11])
6✔
470
                        if f[-3:] == ".nc":
6✔
471
                            ds = xr.open_dataset(f)
×
472
                            pcat.update_from_ds(ds=ds, path=f)
×
473
                else:
474
                    msg = f"Moving {source} to {dest}."
6✔
475
                    logger.info(msg)
6✔
476
                    sh.move(source, dest)
6✔
477
                if Path(dest).suffix in [".zarr", ".nc"]:
6✔
478
                    ds = xr.open_dataset(dest)
6✔
479
                    pcat.update_from_ds(ds=ds, path=dest)
6✔
480
            else:
NEW
481
                msg = f"You are trying to move {source}, but it does not exist."
×
NEW
482
                logger.info(msg)
×
483
    else:
484
        raise ValueError("`moving` should be a list of lists.")
×
485

486
    # erase workdir content if this is the last step
487
    if isinstance(deleting, list):
6✔
488
        for dir_to_delete in deleting:
6✔
489
            if Path(dir_to_delete).exists() and Path(dir_to_delete).is_dir():
6✔
490
                msg = f"Deleting content inside {dir_to_delete}."
6✔
491
                logger.info(msg)
6✔
492
                sh.rmtree(dir_to_delete)
6✔
493
                Path(dir_to_delete).mkdir()
6✔
494
    elif deleting is None:
6✔
495
        pass
6✔
496
    else:
497
        raise ValueError("`deleting` should be a list.")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc