• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

davidfischer-ch / pytoolbox / 8542654626

03 Apr 2024 05:02PM UTC coverage: 67.555%. Remained the same
8542654626

push

github

web-flow
Rename migrations

7708 of 11410 relevant lines covered (67.55%)

1.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.3
/pytoolbox/subprocess.py
1
from __future__ import annotations
2✔
2

3
from collections.abc import Callable, Iterable
2✔
4
from pathlib import Path
2✔
5
from typing import TypeAlias, TypedDict
2✔
6
import errno
2✔
7
import fcntl
2✔
8
import grp
2✔
9
import multiprocessing
2✔
10
import os
2✔
11
import pwd
2✔
12
import random
2✔
13
import re
2✔
14
import setuptools.archive_util
2✔
15
import shlex
2✔
16
import shutil
2✔
17
import subprocess
2✔
18
import threading
2✔
19
import time
2✔
20

21
from . import exceptions, filesystem, module
2✔
22
from .decorators import deprecated
2✔
23
from .logging import get_logger, LoggerType
2✔
24

25
_all = module.All(globals())
2✔
26

27
# import Popen on steroids if available
28
try:
2✔
29
    from psutil import NoSuchProcess, Popen
2✔
30
except ImportError:
2✔
31
    from subprocess import Popen
2✔
32
    NoSuchProcess = None
2✔
33

34
try:
2✔
35
    from shlex import quote
2✔
36
except ImportError:
×
37
    from pipes import quote  # pylint: disable=deprecated-module
×
38

39

40
# Better to warn the user than to silently convert any value (Any) to a string!
41
# None will be stripped automatically
42
CallArgType: TypeAlias = int | float | str | Path | None
2✔
43
CallArgsType: TypeAlias = str | Iterable[CallArgType]
2✔
44

45

46
class CallResult(TypedDict):
    """Dictionary returned by :func:`cmd` describing one executed command."""

    # Handle of the launched process, or None when the process could not be created.
    process: Popen | None
    # Return code reported by the process handle.
    returncode: int
    # Data gathered from the standard output of the process, if any.
    stdout: bytes | None
    # Data gathered from the standard error of the process, if any.
    stderr: bytes | None
    # The OSError raised while creating the process, None otherwise.
    exception: OSError | None
2✔
52

53

54
def kill(process: Popen) -> None:
    """
    Kill `process`, silently ignoring the error raised when it does not exist anymore.

    An :class:`OSError` is ignored only if its errno is ``ESRCH``. Any other exception is ignored
    only if psutil's ``NoSuchProcess`` is available and the exception is an instance of it.
    Everything else is re-raised.
    """
    try:
        process.kill()
    except Exception as error:  # pylint:disable=broad-except
        if isinstance(error, OSError):
            already_gone = error.errno == errno.ESRCH
        else:
            # NoSuchProcess is None when psutil is not installed
            already_gone = bool(NoSuchProcess) and isinstance(error, NoSuchProcess)
        if not already_gone:
            raise
×
63

64

65
def su(user: str | int, group: str | int) -> Callable:  # pylint:disable=invalid-name
    """
    Build a function that switches the current process to the given user and group.

    A string is looked up by name (group or password database) when the returned function is
    called, any other value is used directly as the numeric id. The group id is set before the
    user id.

    **Example usage**

    >> import subprocess
    >> subprocess.call(['ls', '/'], preexec_fn=su(1000, 1000))
    >> subprocess.call(['ls', '/'], preexec_fn=su('root', 'root'))
    """
    def set_ids():
        if isinstance(group, str):
            gid = grp.getgrnam(group).gr_gid
        else:
            gid = group
        os.setgid(gid)

        if isinstance(user, str):
            uid = pwd.getpwnam(user).pw_uid
        else:
            uid = user
        os.setuid(uid)

    return set_ids
×
79

80

81
# http://stackoverflow.com/a/7730201/190597
82
def make_async(fd) -> None:  # pylint:disable=invalid-name
    """Set the O_NONBLOCK flag on a file descriptor, keeping its other status flags."""
    current_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
85

86

87
# http://stackoverflow.com/a/7730201/190597
88
def read_async(fd) -> str:  # pylint:disable=invalid-name
    """
    Return whatever ``fd.read()`` yields, or an empty string if the read fails with EAGAIN.

    Any other :class:`IOError` is propagated to the caller.
    """
    try:
        content = fd.read()
    except IOError as error:
        if error.errno != errno.EAGAIN:
            raise
        content = ''
    return content
×
96

97

98
def to_args_list(args: CallArgsType | None) -> list[str]:
    """
    Convert command arguments to a list of strings.

    A string is split with :func:`shlex.split`; for any other iterable every element is
    converted with :class:`str` and ``None`` elements are dropped. A falsy `args` gives ``[]``.
    """
    if not args:
        return []
    if isinstance(args, str):
        return shlex.split(args)
    return [str(arg) for arg in args if arg is not None]
2✔
102

103

104
def to_args_string(args: CallArgsType | None) -> str:
    """
    Convert command arguments to a single command-line string.

    A string is returned unchanged; for any other iterable every element that is not ``None`` is
    converted with :class:`str`, shell-quoted and joined with spaces. A falsy `args` gives ``''``.
    """
    if not args:
        return ''
    if isinstance(args, str):
        return args
    quoted = (quote(str(arg)) for arg in args if arg is not None)
    return ' '.join(quoted)
2✔
108

109

110
# --------------------------------------------------------------------------------------------------
111

112

113
def raw_cmd(arguments: CallArgsType, *, shell: bool = False, **kwargs) -> Popen:
    """
    Launch a subprocess and return its process handle.

    This function ensures that:

    * the arguments are given to Popen as a single string if `shell` is True, as a list otherwise
    * the returned process has an ``args`` attribute (set to the list of arguments if missing)

    Any extra keyword argument is forwarded to the Popen constructor.
    """
    args_list = to_args_list(arguments)
    popen_args = to_args_string(args_list) if shell else args_list
    process = Popen(popen_args, shell=shell, **kwargs)
    if not hasattr(process, 'args'):
        process.args = args_list
    return process
2✔
127

128

129
# thanks http://stackoverflow.com/questions/1191374$
130
def _communicate_with_timeout(*, data, process, input) -> None:  # pylint:disable=redefined-builtin
    """Run ``process.communicate(input=input)`` and store both outputs into the `data` mapping."""
    stdout, stderr = process.communicate(input=input)
    data['stdout'] = stdout
    data['stderr'] = stderr
2✔
132

133

134
# TODO Refine type hints with overloads
135
def cmd(  # pylint:disable=too-many-arguments,too-many-branches,too-many-locals,too-many-statements
    command: CallArgsType,
    *,
    user: str | None = None,
    input: str | None = None,  # pylint:disable=redefined-builtin
    cli_input: str | None = None,
    cli_output: bool = False,
    communicate: bool = True,
    timeout: float | None = None,
    fail: bool = True,
    log: LoggerType = None,  # pylint:disable=redefined-outer-name
    tries: int = 1,
    delay_min: float = 5,
    delay_max: float = 10,
    success_codes: Iterable[int] = (0, ),
    **kwargs
):
    """
    Calls the `command` and returns a dictionary with process, stdout, stderr, and the returncode.

    Returned returncode, stdout and stderr will be None if `communicate` is set to False.

    :param command: The command to execute.
    :param user: If set, this will use ``sudo -u <user> ...`` to execute `command` as `user`.
    :param input: If set, sent to stdin (if `communicate` is True).
    :param cli_input: If set, sent to stdin (no condition).
    :param cli_output: Set to True to output (in real-time) stdout to stdout and stderr to stderr.
    :param fail: Set to False to avoid the exception `exceptions.CalledProcessError`.
    :param log: If set then override the default logger named `pytoolbox.subprocess.cmd.<binary>`.
    :param communicate: Set to True to communicate with the process, this is a locking call
                        (if timeout is None).
    :param timeout: Time-out for the communication with the process, in seconds.
    :param tries: How many times the command may be attempted, must be at least 1.
    :param delay_min: Minimum delay to sleep after every attempt communicate must be True.
    :param delay_max: Maximum delay to sleep after every attempt communicate must be True.
    :param success_codes: Terraform plan may return code 3 for modified, so success_codes=(0, 3).
    :param kwargs: Any argument of the :mod:`subprocess`.Popen constructor
                   excepting stdin, stdout and stderr

    The delay will be a random number in range (`delay_min`, `delay_max`).

    :return: A dictionary with the keys process, returncode, stdout, stderr and exception.
             If the process cannot be created and `fail` is False, process is None, returncode
             is 2 and exception is the OSError that was raised.
    :raises ValueError: If `tries` is lower than 1.
    :raises OSError: If the process cannot be created and `fail` is True.
    :raises exceptions.CalledProcessError: If the last attempt did not return one of the
                                           `success_codes` and `fail` is True.
    """
    # Without at least one attempt there would be no result to return
    if tries < 1:
        raise ValueError(f'tries must be at least 1, got {tries!r}')

    # Materialize once: a one-shot iterable would be exhausted by the first membership test
    success_codes = tuple(success_codes)

    # Process arguments
    process_cmd: list[str] = to_args_list(command)
    log = get_logger(log or f'{__name__}.cmd.{Path(process_cmd[0]).name}')

    if user is not None:
        process_cmd = ['sudo', '-u', user, *process_cmd]

    # log the execution
    log.debug(''.join([
        'Execute ',
        '' if input is None else f'echo {repr(input)} | ',
        to_args_string(process_cmd),
        '' if cli_input is None else f' < {repr(cli_input)}'
    ]))

    for trial in range(tries):  # noqa
        # create the sub-process
        try:
            process = Popen(
                process_cmd,
                stdin=subprocess.PIPE,
                stdout=None if cli_output else subprocess.PIPE,
                stderr=None if cli_output else subprocess.PIPE,
                **kwargs)
        except OSError as ex:
            # Unable to execute the program (e.g. does not exist)
            log.exception(ex)
            if fail:
                raise
            return {
                'process': None,
                'returncode': 2,
                'stdout': None,
                'stderr': None,
                'exception': ex
            }

        # Write to stdin (answer to questions, ...)
        if cli_input is not None:
            process.stdin.write(cli_input)
            process.stdin.flush()

        # Interact with the process and wait for the process to terminate
        if communicate:
            data: dict = {}
            thread = threading.Thread(
                target=_communicate_with_timeout,
                kwargs={'data': data, 'input': input, 'process': process})
            thread.start()
            thread.join(timeout=timeout)
            if thread.is_alive():
                try:
                    process.terminate()
                except OSError as ex:
                    # Manage race condition with process that may terminate just after the call to
                    # thread.is_alive() !
                    if ex.errno != errno.ESRCH:
                        raise
                # Always wait for the worker, even if the process was already gone, otherwise
                # `data` may not be filled yet when it is read below
                thread.join()
            stdout, stderr = data['stdout'], data['stderr']
        else:
            # get a return code that may be None of course ...
            # NOTE(review): a None return code is not in `success_codes`, so a process that is
            # still running is handled below as a failed attempt - confirm this is intended
            process.poll()
            stdout = stderr = None

        result: CallResult = {
            'process': process,
            'returncode': process.returncode,
            'stdout': stdout,
            'stderr': stderr,
            'exception': None
        }

        if process.returncode in success_codes:
            break

        # failed attempt, may retry
        do_retry = trial < tries - 1
        delay = random.uniform(delay_min, delay_max)
        log.warning(' '.join([
            f'Attempt {trial + 1} out of {tries}:',
            f'Will retry in {delay} seconds' if do_retry else 'Failed'
        ]))

        # raise if this is the last try
        if fail and not do_retry:
            raise exceptions.CalledProcessError(
                cmd=process_cmd,
                returncode=process.returncode,
                stdout=stdout,
                stderr=stderr)

        if do_retry:
            time.sleep(delay)

    return result
2✔
273

274

275
# --------------------------------------------------------------------------------------------------
276

277

278
def make(
    archive: Path,
    directory: Path,
    *,
    with_cmake: bool = False,
    configure_options: str = '',
    install: bool = True,
    remove_temporary: bool = True,
    make_options: str = f'-j{multiprocessing.cpu_count()}',
    **kwargs
) -> dict[str, CallResult]:
    """
    Unpack a source archive then configure, build and optionally install the software.

    :param archive: The archive to unpack.
    :param directory: Where the archive is unpacked and where the build takes place.
    :param with_cmake: Set to True to run ``cmake`` from a ``build`` sub-directory instead of
                       ``./configure``.
    :param configure_options: Options appended to the ``./configure`` command.
    :param install: Set to False to skip ``make install``.
    :param remove_temporary: Set to False to keep `directory` once the build is done.
    :param make_options: Options appended to the ``make`` command.
    :param kwargs: Forwarded to every call to :func:`cmd`.
    :return: The result of every executed step, keyed by ``cmake`` or ``configure``, ``make``
             and ``make install``.
    """
    setuptools.archive_util.unpack_archive(archive, directory)

    results = {}
    with filesystem.chdir(directory):
        if not with_cmake:
            results['configure'] = cmd(f'./configure {configure_options}', **kwargs)
        else:
            # The next steps are run from the build sub-directory
            filesystem.makedirs(Path('build'))
            os.chdir('build')
            results['cmake'] = cmd('cmake -DCMAKE_BUILD_TYPE=RELEASE ..', **kwargs)

        results['make'] = cmd(f'make {make_options}', **kwargs)

        if install:
            results['make install'] = cmd('make install', **kwargs)

    if remove_temporary:
        shutil.rmtree(directory)

    return results
306

307

308
# --------------------------------------------------------------------------------------------------
309

310

311
def rsync(  # pylint:disable=too-many-arguments,too-many-locals
    source: Path,
    destination: Path,
    *,
    source_is_dir: bool = False,
    destination_is_dir: bool = False,
    archive: bool = True,
    delete: bool = False,
    exclude_vcs: bool = False,
    progress: bool = False,
    recursive: bool = False,
    simulate: bool = False,
    excludes: Iterable[str] | None = None,
    includes: Iterable[str] | None = None,
    rsync_path: Path | None = None,
    size_only: bool = False,
    extra: str | None = None,
    extra_args: Iterable[CallArgType] | None = None,
    **kwargs
) -> CallResult:
    """
    Execute the famous rsync remote (or local) synchronization tool.

    :param source: What to synchronize.
    :param destination: Where to synchronize it.
    :param source_is_dir: Set to True to append a path separator to `source` even if it is not
                          an existing local directory.
    :param destination_is_dir: Same as `source_is_dir`, for `destination`.
    :param archive: Adds ``-a``.
    :param delete: Adds ``--delete``.
    :param exclude_vcs: Adds ``--exclude=.svn`` and ``--exclude=.git``.
    :param progress: Adds ``--progress``.
    :param recursive: Adds ``-r``.
    :param simulate: Adds ``--dry-run``.
    :param excludes: Adds one ``--exclude=<value>`` per value.
    :param includes: Adds one ``--include=<value>`` per value.
    :param rsync_path: Adds ``--rsync-path <value>``.
    :param size_only: Adds ``--size-only``.
    :param extra: Adds ``-e <value>``.
    :param extra_args: Arguments appended just before the source and the destination.
    :param kwargs: Forwarded to :func:`cmd`.
    :return: The result of :func:`cmd`.
    """
    source_string = str(source)
    if source.is_dir() or source_is_dir:
        source_string += os.sep

    destination_string = str(destination)
    if destination.is_dir() or destination_is_dir:
        destination_string += os.sep

    # Disabled options are set to None and stripped before the call
    command: list[CallArgType] = [
        'rsync',
        '-a' if archive else None,
        '--delete' if delete else None,
        '--progress' if progress else None,
        '-r' if recursive else None,
        '--dry-run' if simulate else None,
        '--size-only' if size_only else None
    ]

    if rsync_path is not None:
        command += ['--rsync-path', rsync_path]
    if extra is not None:
        command += ['-e', extra]
    if excludes is not None:
        command += [f'--exclude={e}' for e in excludes]
    if includes is not None:
        command += [f'--include={i}' for i in includes]
    if exclude_vcs:
        command += ['--exclude=.svn', '--exclude=.git']
    if extra_args is not None:
        command.extend(extra_args)
    command += [source_string, destination_string]

    # Strip only the placeholders (None and empty strings): a plain truthiness test would also
    # drop a legitimate numeric 0 given in `extra_args` (e.g. ['--timeout', 0])
    return cmd([c for c in command if c is not None and c != ''], **kwargs)
365

366

367
def screen_kill(name: str | None = None, *, fail: bool = True, **kwargs):
    """
    Kill all screen instances called `name` or all if `name` is None.

    The instances are found with :func:`screen_list`, then a ``quit`` command is sent to each of
    them with :func:`cmd`. `fail` and `kwargs` are forwarded to :func:`cmd`.
    """
    instance_names = screen_list(name=name, **kwargs)
    for instance_name in instance_names:
        quit_command = ['screen', '-S', instance_name, '-X', 'quit']
        cmd(quit_command, fail=fail, **kwargs)
371

372

373
def screen_launch(name: str, command: CallArgsType, **kwargs) -> CallResult:
    """
    Launch `command` in a new screen instance called `name` (``screen -dmS <name> ...``).

    `kwargs` are forwarded to :func:`cmd`, whose result is returned.
    """
    launch_command = ['screen', '-dmS', name]
    launch_command.extend(to_args_list(command))
    return cmd(launch_command, **kwargs)
2✔
376

377

378
def screen_list(name: str | None = None, **kwargs) -> list[str]:
    """
    Returns a list containing all instances of screen. Can be filtered by `name`.

    The instances are parsed from the standard output of ``screen -ls [name]``, decoded as UTF-8.
    A failure of the command is not raised (``fail=False``) and no output gives an empty list.

    :param name: If set, given to ``screen -ls`` to filter the instances.
    :param kwargs: Forwarded to :func:`cmd`.
    :return: The instances, as ``<digits>.<name>`` strings.
    """
    screens = cmd(['screen', '-ls', name], fail=False, **kwargs)['stdout']
    # The dot is escaped to match the literal separator between the digits and the name
    return re.findall(r'\s+(\d+\.\S+)\s+\(.*\).*', (screens or b'').decode('utf-8'))
382

383

384
__all__ = _all.diff(globals())
385

386

387
# Deprecated ---------------------------------------------------------------------------------------
388

389

390
@deprecated('Use pytoolbox.git.clone_or_pull instead (drop-in replacement)')
def git_clone_or_pull(*args, **kwargs) -> None:  # pragma: no cover
    """Deprecated alias forwarding every argument to ``pytoolbox.git.clone_or_pull``."""
    # Imported when called, not at module import time
    from pytoolbox.git import clone_or_pull as _clone_or_pull  # pylint:disable=import-outside-toplevel
    result = _clone_or_pull(*args, **kwargs)
    return result
394

395

396
@deprecated('Use pytoolbox.ssh.ssh instead (drop-in replacement)')
def ssh(*args, **kwargs) -> dict:  # pragma: no cover
    """Deprecated alias forwarding every argument to ``pytoolbox.ssh.ssh``."""
    # Imported when called, not at module import time
    from pytoolbox.ssh import ssh as _ssh
    result = _ssh(*args, **kwargs)
    return result
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc