• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

davidfischer-ch / pytoolbox / 8424226214

25 Mar 2024 05:18PM UTC coverage: 67.055% (-0.003%) from 67.058%
8424226214

push

github

davidfischer-ch
Add param in doc

0 of 1 new or added line in 1 file covered. (0.0%)

39 existing lines in 3 files now uncovered.

7547 of 11255 relevant lines covered (67.05%)

1.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.71
/pytoolbox/subprocess.py
1
from __future__ import annotations
2✔
2

3
from collections.abc import Iterable
2✔
4
from pathlib import Path
2✔
5
from typing import Final, TypeAlias
2✔
6
import errno
2✔
7
import fcntl
2✔
8
import grp
2✔
9
import logging
2✔
10
import multiprocessing
2✔
11
import os
2✔
12
import pwd
2✔
13
import random
2✔
14
import re
2✔
15
import setuptools.archive_util
2✔
16
import shlex
2✔
17
import shutil
2✔
18
import subprocess
2✔
19
import threading
2✔
20
import time
2✔
21

22
from . import exceptions, filesystem, module
2✔
23
from .logging import LoggerType, get_logger
2✔
24
from .decorators import deprecated
2✔
25

26
log = logging.getLogger(__name__)
2✔
27

28
_all = module.All(globals())
2✔
29

30
# Template using the same four keys as the dictionary returned by `cmd`, every value being None.
EMPTY_CMD_RETURN: Final[dict[str, None]] = dict.fromkeys(
    ('process', 'stdout', 'stderr', 'returncode'))
36

37
# import Popen on steroids if available
38
try:
2✔
39
    from psutil import NoSuchProcess, Popen
2✔
40
except ImportError:
2✔
41
    from subprocess import Popen
2✔
42
    NoSuchProcess = None
2✔
43

44
try:
2✔
45
    from shlex import quote
2✔
46
except ImportError:
×
47
    from pipes import quote  # pylint: disable=deprecated-module
×
48

49

50
# Better to warn the user than to silently convert Any value to a string!
51
# None will be stripped automatically
52
CallArgType: TypeAlias = int | float | str | Path | None
2✔
53
CallArgsType: TypeAlias = str | Iterable[CallArgType]
2✔
54

55

56
def kill(process):
    """
    Kill `process`, ignoring the error raised when the process does not exist anymore.

    Any other exception raised by ``process.kill()`` is propagated to the caller.
    """
    try:
        process.kill()
    except OSError as ex:
        # ESRCH is the "no such process" error code: there is nothing left to kill
        if ex.errno == errno.ESRCH:
            return
        raise
    except Exception as ex:  # pylint:disable=broad-except
        # NoSuchProcess is None when psutil is not available
        if NoSuchProcess and isinstance(ex, NoSuchProcess):
            return
        raise
×
65

66

67
def su(user, group):  # pylint:disable=invalid-name
    """
    Build a callable that switches the current process to the given user and group.

    `user` and `group` can each be given as a name (a string, resolved with the `pwd` and `grp`
    modules) or directly as a numeric id. The group id is set before the user id.

    **Example usage**

    >> import subprocess
    >> subprocess.call(['ls', '/'], preexec_fn=su(1000, 1000))
    >> subprocess.call(['ls', '/'], preexec_fn=su('root', 'root'))
    """
    def set_ids():
        gid = grp.getgrnam(group).gr_gid if isinstance(group, str) else group
        os.setgid(gid)
        uid = pwd.getpwnam(user).pw_uid if isinstance(user, str) else user
        os.setuid(uid)

    return set_ids
×
81

82

83
# http://stackoverflow.com/a/7730201/190597
84
def make_async(fd):  # pylint:disable=invalid-name
    """Set the O_NONBLOCK flag on a file descriptor, keeping its other status flags."""
    current_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
87

88

89
# http://stackoverflow.com/a/7730201/190597
90
def read_async(fd):  # pylint:disable=invalid-name
    """
    Return the result of ``fd.read()``, or an empty string if the read fails with EAGAIN.

    Any other I/O error is propagated to the caller.
    """
    try:
        content = fd.read()
    except IOError as ex:
        if ex.errno != errno.EAGAIN:
            raise
        content = ''
    return content
×
98

99

100
def to_args_list(args: CallArgsType | None) -> list[str]:
    """
    Convert `args` to a list of strings.

    A string is split using shell-like syntax. Any other iterable has its items converted to
    strings, the None items being dropped. A falsy `args` gives an empty list.
    """
    if not args:
        return []
    if isinstance(args, str):
        return shlex.split(args)
    return [str(arg) for arg in args if arg is not None]
2✔
104

105

106
def to_args_string(args: CallArgsType | None) -> str:
    """
    Convert `args` to a single command-line string.

    A string is returned as is. Any other iterable has its non-None items converted to strings,
    shell-quoted and joined with spaces. A falsy `args` gives an empty string.
    """
    if not args:
        return ''
    if isinstance(args, str):
        return args
    quoted = [quote(str(arg)) for arg in args if arg is not None]
    return ' '.join(quoted)
2✔
110

111

112
# --------------------------------------------------------------------------------------------------
113

114
def raw_cmd(arguments: CallArgsType, *, shell: bool = False, **kwargs) -> Popen:
    """
    Launch a subprocess.

    This function ensures that:

    * subprocess arguments will be converted to a string if `shell` is True
    * subprocess.args is set to the arguments of the subprocess

    :param arguments: The command to execute, as a string or an iterable of arguments.
    :param shell: Forwarded to the Popen constructor.
    :param kwargs: Any other argument of the Popen constructor.
    :return: The process that was started.
    """
    args_list = to_args_list(arguments)
    if shell:
        popen_args = to_args_string(args_list)
    else:
        popen_args = args_list
    process = Popen(popen_args, shell=shell, **kwargs)
    # Only set the attribute when the Popen implementation does not provide it already
    if not hasattr(process, 'args'):
        process.args = args_list
    return process
2✔
128

129

130
# thanks http://stackoverflow.com/questions/1191374$
131
def _communicate_with_timeout(*, data, process, input):  # pylint:disable=redefined-builtin
2✔
132
    data['stdout'], data['stderr'] = process.communicate(input=input)
2✔
133

134

135
def cmd(  # pylint:disable=too-many-arguments,too-many-branches,too-many-locals,too-many-statements
    command: CallArgsType,
    *,
    user: str | None = None,
    input: str | None = None,  # pylint:disable=redefined-builtin
    cli_input: str | None = None,
    cli_output: bool = False,
    communicate: bool = True,
    timeout: float | None = None,
    fail: bool = True,
    log: LoggerType = log,  # pylint:disable=redefined-outer-name
    tries: int = 1,
    delay_min: float = 5,
    delay_max: float = 10,
    success_codes: tuple[int, ...] = (0, ),
    **kwargs
) -> dict:
    """
    Calls the `command` and returns a dictionary with process, stdout, stderr, and the returncode.

    Returned stdout and stderr will be None if `communicate` is set to False, the returncode is
    then the outcome of a single poll of the process (None while it is still running).

    If the program cannot be executed and `fail` is False, the returned dictionary has process set
    to None, stdout set to an empty string, stderr set to the exception and returncode set to 2.

    :param command: The command to execute.
    :param user: If set, this will use ``sudo -u <user> ...`` to execute `command` as `user`.
    :param input: If set, sent to stdin (if `communicate` is True).
    :param cli_input: If set, sent to stdin (no condition).
    :param cli_output: Set to True to output (in real-time) stdout to stdout and stderr to stderr.
    :param fail: Set to False to avoid the exception `exceptions.CalledProcessError`.
    :param log: A function to log/print details about what is executed/any failure, can be a logger.
    :param communicate: Set to True to communicate with the process, this is a locking call
                        (if timeout is None).
    :param timeout: Time-out for the communication with the process, in seconds.
    :param tries: How many times you want the command to be retried? Must be at least 1.
    :param delay_min: Minimum delay to sleep after every attempt, communicate must be True.
    :param delay_max: Maximum delay to sleep after every attempt, communicate must be True.
    :param success_codes: Terraform plan may return code 3 for modified, so success_codes=(0, 3).
    :param kwargs: Any argument of the :mod:`subprocess`.Popen constructor
                   excepting stdin, stdout and stderr
    :raises ValueError: If `tries` is lower than 1.
    :raises OSError: If the program cannot be executed and `fail` is True.
    :raises exceptions.CalledProcessError: If the last attempt failed and `fail` is True.

    The delay will be a random number in range (`delay_min`, `delay_max`).

    """
    if tries < 1:
        # Without at least one attempt there would be no result to return
        raise ValueError(f'tries must be at least 1, got {tries!r}')

    log = get_logger(log)

    # Process arguments
    args_list = to_args_list(command)
    if user is not None:
        # Prefix the normalized list, not the raw command: the latter may be a plain string
        # (that would be spread character by character) or contain None/Path/number items
        args_list = ['sudo', '-u', user, *args_list]
    args_string = to_args_string(args_list)

    # log the execution
    log.debug(''.join([
        'Execute ',
        '' if input is None else f'echo {repr(input)} | ',
        args_string,
        '' if cli_input is None else f' < {repr(cli_input)}'
    ]))

    for trial in range(tries):  # noqa
        # create the sub-process
        try:
            process = Popen(
                args_list,
                stdin=subprocess.PIPE,
                stdout=None if cli_output else subprocess.PIPE,
                stderr=None if cli_output else subprocess.PIPE, **kwargs)
        except OSError as ex:
            # Unable to execute the program (e.g. does not exist)
            log.exception(ex)
            if fail:
                raise
            return {'process': None, 'stdout': '', 'stderr': ex, 'returncode': 2}

        # Write to stdin (answer to questions, ...)
        if cli_input is not None:
            process.stdin.write(cli_input)
            process.stdin.flush()

        # Interact with the process and wait for the process to terminate
        if communicate:
            data: dict = {}
            thread = threading.Thread(
                target=_communicate_with_timeout,
                kwargs={'data': data, 'input': input, 'process': process})
            thread.start()
            thread.join(timeout=timeout)
            if thread.is_alive():
                try:
                    process.terminate()
                except OSError as ex:
                    # Manage race condition with process that may terminate just after the call to
                    # thread.is_alive() !
                    if ex.errno != errno.ESRCH:
                        raise
                # Always wait for the communication thread, even if the process was already gone,
                # so that `data` is filled before being read
                thread.join()
            stdout, stderr = data['stdout'], data['stderr']
        else:
            # get a return code that may be None of course ...
            process.poll()
            stdout = stderr = None

        result = {
            'process': process,
            'stdout': stdout,
            'stderr': stderr,
            'returncode': process.returncode
        }

        if not communicate and process.returncode is None:
            # The process is still running: there is no outcome to evaluate or to retry yet
            break

        if process.returncode in success_codes:
            break

        # failed attempt, may retry
        do_retry = trial < tries - 1
        delay = random.uniform(delay_min, delay_max)
        log.warning(' '.join([
            f'Attempt {trial + 1} out of {tries}:',
            f'Will retry in {delay} seconds' if do_retry else 'Failed'
        ]))

        # raise if this is the last try
        if fail and not do_retry:
            raise exceptions.CalledProcessError(
                returncode=process.returncode,
                cmd=args_string,
                stdout=stdout,
                stderr=stderr)

        if do_retry:
            time.sleep(delay)

    return result
2✔
266

267

268
# --------------------------------------------------------------------------------------------------
269

270
@deprecated
def git_clone_or_pull(*args, **kwargs) -> None:
    """Deprecated alias forwarding every argument to ``pytoolbox.git.clone_or_pull``."""
    # Imported at call time only
    from pytoolbox.git import clone_or_pull as _clone_or_pull  # pylint:disable=import-outside-toplevel
    return _clone_or_pull(*args, **kwargs)
×
274

275

276
# --------------------------------------------------------------------------------------------------
277

278
def make(
    archive: Path,
    directory: Path,
    *,
    with_cmake: bool = False,
    configure_options: str = '',
    install: bool = True,
    remove_temporary: bool = True,
    make_options: str = f'-j{multiprocessing.cpu_count()}',
    **kwargs
) -> dict[str, dict]:
    """
    Build and optionally install a piece of software from source.

    :param archive: The archive to unpack into `directory`.
    :param directory: Where the archive is unpacked and from where the commands are executed.
    :param with_cmake: If True, create a ``build`` directory and run cmake from it instead of
                       running ``./configure`` (`configure_options` is then not used).
    :param configure_options: Options appended to ``./configure``.
    :param install: Set to False to skip ``make install``.
    :param remove_temporary: Set to False to keep `directory` once done.
    :param make_options: Options appended to ``make``.
    :param kwargs: Forwarded to every call to :func:`cmd`.
    :return: The result of every executed command, indexed by step
             ('cmake' or 'configure', 'make', 'make install').
    """
    setuptools.archive_util.unpack_archive(archive, directory)
    with filesystem.chdir(directory):
        # First step: generate the build files with cmake or with the configure script
        if with_cmake:
            filesystem.makedirs(Path('build'))
            os.chdir('build')
            steps = [('cmake', 'cmake -DCMAKE_BUILD_TYPE=RELEASE ..')]
        else:
            steps = [('configure', f'./configure {configure_options}')]
        steps.append(('make', f'make {make_options}'))
        if install:
            steps.append(('make install', 'make install'))
        # Steps are executed in order, from the current directory
        results = {step: cmd(step_command, **kwargs) for step, step_command in steps}
    if remove_temporary:
        shutil.rmtree(directory)

    return results
306

307

308
# --------------------------------------------------------------------------------------------------
309

310
def rsync(  # pylint:disable=too-many-arguments,too-many-locals
    source: Path,
    destination: Path,
    *,
    source_is_dir: bool = False,
    destination_is_dir: bool = False,
    archive: bool = True,
    delete: bool = False,
    exclude_vcs: bool = False,
    progress: bool = False,
    recursive: bool = False,
    simulate: bool = False,
    excludes: Iterable[str] | None = None,
    includes: Iterable[str] | None = None,
    rsync_path: Path | None = None,
    size_only: bool = False,
    extra: str | None = None,
    extra_args: list[CallArgType] | None = None,
    **kwargs
) -> dict:
    """
    Execute the famous rsync remote (or local) synchronization tool.

    :param source: What to synchronize, a separator is appended if this is a directory.
    :param destination: Where to synchronize, a separator is appended if this is a directory.
    :param source_is_dir: Set to True to handle `source` as a directory even if not detected as such.
    :param destination_is_dir: Same as `source_is_dir`, for `destination`.
    :param archive: Toggle the ``-a`` option.
    :param delete: Toggle the ``--delete`` option.
    :param exclude_vcs: Set to True to exclude the ``.svn`` and ``.git`` entries.
    :param progress: Toggle the ``--progress`` option.
    :param recursive: Toggle the ``-r`` option.
    :param simulate: Toggle the ``--dry-run`` option.
    :param excludes: One ``--exclude=...`` option is added for each item.
    :param includes: One ``--include=...`` option is added for each item.
    :param rsync_path: If set, value for the ``--rsync-path`` option.
    :param size_only: Toggle the ``--size-only`` option.
    :param extra: If set, value for the ``-e`` option.
    :param extra_args: If set, additional arguments added right before source and destination.
    :param kwargs: Forwarded to :func:`cmd`.
    :return: The result of :func:`cmd`.
    """
    append_source_sep = source.is_dir() or source_is_dir
    source_string = f'{source}{os.sep}' if append_source_sep else str(source)
    append_destination_sep = destination.is_dir() or destination_is_dir
    destination_string = f'{destination}{os.sep}' if append_destination_sep else str(destination)

    # Simple on/off options, in the order they appear on the command line
    flags = (
        ('-a', archive),
        ('--delete', delete),
        ('--progress', progress),
        ('-r', recursive),
        ('--dry-run', simulate),
        ('--size-only', size_only),
    )
    command: list[CallArgType] = ['rsync']
    command.extend(flag for flag, enabled in flags if enabled)

    if rsync_path is not None:
        command.extend(['--rsync-path', rsync_path])
    if extra is not None:
        command.extend(['-e', extra])
    if excludes is not None:
        command.extend(f'--exclude={e}' for e in excludes)
    if includes is not None:
        command.extend(f'--include={i}' for i in includes)
    if exclude_vcs:
        command.extend(['--exclude=.svn', '--exclude=.git'])
    if extra_args is not None:
        command.extend(extra_args)
    command.extend([source_string, destination_string])

    # Falsy arguments (None, empty strings, ...) are dropped from the final command
    return cmd(list(filter(None, command)), **kwargs)
364

365

366
def screen_kill(name: str | None = None, *, fail: bool = True, **kwargs):
    """
    Kill all screen instances called `name` or all if `name` is None.

    :param name: Filter forwarded to :func:`screen_list`.
    :param fail: Forwarded to :func:`cmd` when asking an instance to quit.
    :param kwargs: Forwarded to both :func:`screen_list` and :func:`cmd`.
    """
    instance_names = screen_list(name=name, **kwargs)
    for instance_name in instance_names:
        quit_command = ['screen', '-S', instance_name, '-X', 'quit']
        cmd(quit_command, fail=fail, **kwargs)
370

371

372
def screen_launch(name: str, command: CallArgsType, **kwargs) -> dict:
    """
    Launch a new named screen instance (``screen -dmS <name> <command>``).

    :param name: Name given to the screen instance.
    :param command: The command to execute inside the instance.
    :param kwargs: Forwarded to :func:`cmd`.
    :return: The result of :func:`cmd`.
    """
    screen_command = ['screen', '-dmS', name]
    screen_command.extend(to_args_list(command))
    return cmd(screen_command, **kwargs)
2✔
375

376

377
def screen_list(name: str | None = None, **kwargs) -> list[str]:
    """
    Returns a list containing all instances of screen. Can be filtered by `name`.

    An empty list is returned when there is no output to parse.

    :param name: If set, only list the instances matching this name.
    :param kwargs: Forwarded to :func:`cmd`.
    """
    screens = cmd(['screen', '-ls', name], fail=False, **kwargs)['stdout']
    if not screens:
        # `cmd` gives an empty string when the program cannot be executed, and None when it does
        # not communicate with the process or when cli_output is enabled: nothing to decode
        return []
    if isinstance(screens, bytes):
        # NOTE(review): stdout is presumably already a string when text mode is enabled through
        # kwargs, hence the conditional decoding
        screens = screens.decode('utf-8')
    return re.findall(r'\s+(\d+.\S+)\s+\(.*\).*', screens)
381

382

383
@deprecated
def ssh(*args, **kwargs) -> dict:
    """Deprecated alias forwarding every argument to ``pytoolbox.ssh.ssh``."""
    # Imported at call time only
    from pytoolbox.ssh import ssh as forward_to
    return forward_to(*args, **kwargs)
387

388

389
__all__ = _all.diff(globals())
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc