• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

karellen / kubernator / 18180360618

02 Oct 2025 01:15AM UTC coverage: 75.914% (-0.1%) from 76.019%
18180360618

push

github

arcivanov
Release 1.0.21

615 of 961 branches covered (64.0%)

Branch coverage included in aggregate %.

2395 of 3004 relevant lines covered (79.73%)

4.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.43
/src/main/python/kubernator/proc.py
1
# -*- coding: utf-8 -*-
2
#
3
#   Copyright 2020 Express Systems USA, Inc
4
#   Copyright 2021 Karellen, Inc.
5
#
6
#   Licensed under the Apache License, Version 2.0 (the "License");
7
#   you may not use this file except in compliance with the License.
8
#   You may obtain a copy of the License at
9
#
10
#       http://www.apache.org/licenses/LICENSE-2.0
11
#
12
#   Unless required by applicable law or agreed to in writing, software
13
#   distributed under the License is distributed on an "AS IS" BASIS,
14
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
#   See the License for the specific language governing permissions and
16
#   limitations under the License.
17
#
18

19
import logging
6✔
20
import os
6✔
21
from collections.abc import Callable
6✔
22
from functools import partial
6✔
23
from io import BytesIO
6✔
24
from subprocess import Popen, PIPE, DEVNULL, CalledProcessError, TimeoutExpired
6✔
25
from typing import Union, IO, BinaryIO, TextIO, AnyStr, Iterable
6✔
26

27
from gevent import spawn, Timeout
6✔
28

29
from kubernator.api import StringIO
6✔
30

31
# Only `run` is exported through star-imports; the run_pass_through_capturing and
# run_capturing_out helpers defined in this module must be imported by name.
__all__ = ["run"]
6✔
32

33
# NOTE(review): ProcessRunner calls logger.trace(), which is not a method of the stdlib
# logging.Logger — presumably a TRACE level is registered elsewhere in kubernator; confirm.
logger = logging.getLogger("kubernator.proc")
6✔
34

35

36
def stream_writer_buf(pipe: BinaryIO, source):
    """Write binary input to *pipe* and close the pipe when finished.

    :param pipe: writable binary stream; it is used as a context manager, so
        it is closed on exit even if a write fails.
    :param source: either a payload handed to ``pipe.write`` in one call, or
        a callable taking no arguments that returns an iterable of chunks,
        each of which is written in order.
    """
    with pipe:
        # Normalise both accepted input shapes to "an iterable of chunks".
        chunks = source() if isinstance(source, Callable) else (source,)
        for chunk in chunks:
            pipe.write(chunk)
×
43

44

45
def stream_writer_text(pipe: TextIO, source):
    """Write text input to *pipe* and close the pipe when finished.

    :param pipe: writable text stream; it is used as a context manager, so it
        is closed on exit even if a write fails.
    :param source: either a payload handed to ``pipe.write`` in one call, or
        a callable taking no arguments whose result is passed to
        ``pipe.writelines``.
    """
    with pipe:
        if not isinstance(source, Callable):
            # Plain payload: a single write is enough.
            pipe.write(source)
        else:
            pipe.writelines(source())
6✔
51

52

53
def stream_reader_buf(pipe: BinaryIO, sink_func):
    """Drain a binary *pipe*, passing each chunk read to *sink_func*.

    Data is read with ``readinto`` into one reusable 16384-byte buffer, and
    *sink_func* receives a ``memoryview`` slice of that buffer covering the
    bytes just read. The buffer is overwritten by the next read, so a sink
    that keeps the data must copy it. Reading stops at the first falsy
    ``readinto`` result.

    :param pipe: readable binary stream supporting ``readinto``.
    :param sink_func: callable invoked once per chunk.
    """
    scratch = bytearray(16384)
    while True:
        count = pipe.readinto(scratch)
        if not count:
            break
        sink_func(memoryview(scratch)[:count])
×
57

58

59
def stream_reader_line(pipe: TextIO, sink_func):
    """Drain *pipe* by iteration, passing every item it yields to *sink_func*.

    :param pipe: iterable stream; a text stream yields one line per
        iteration, with the line terminator left in place.
    :param sink_func: callable invoked once per item, in order.
    """
    for text_line in pipe:
        sink_func(text_line)
6✔
62

63

64
class ProcessRunner:
    """Start a child process and pump its standard streams on gevent greenlets.

    ``stdout`` and ``stderr`` each accept:

    * ``None`` - the stream is connected to ``DEVNULL``;
    * a callable - the stream becomes a ``PIPE`` and a greenlet feeds the
      callable through ``stream_reader_line`` (when ``universal_newlines`` is
      true) or ``stream_reader_buf`` (otherwise);
    * anything else (file descriptor, file object, ``subprocess`` constant) -
      passed to ``Popen`` unchanged.

    ``stdin`` follows the same rules, except that a ``bytes``/``str`` payload
    or a callable turns it into a ``PIPE`` that a greenlet fills through
    ``stream_writer_text`` / ``stream_writer_buf``.

    :param args: command passed to ``Popen``.
    :param safe_args: representation of the command used for logging and in
        raised exceptions instead of ``args``; falls back to ``args`` when
        falsy.
    :param universal_newlines: forwarded to ``Popen``; also selects the
        text or binary stream pump functions.
    :param kwargs: extra ``Popen`` keyword arguments. ``env`` defaults to
        ``os.environ`` when not supplied.
    """

    def __init__(self, args,
                 stdout: Union[None, int, IO, Callable[[AnyStr], None]],
                 stderr: Union[None, int, IO, Callable[[AnyStr], None]],
                 stdin: Union[None, int, bytes, str, IO, Callable[[], Iterable[AnyStr]]] = DEVNULL,
                 *,
                 safe_args=None, universal_newlines=True, **kwargs):
        self._safe_args = safe_args or args
        logger.trace("Starting %r", self._safe_args)

        if "env" not in kwargs:
            kwargs["env"] = os.environ

        self._proc = Popen(args,
                           stdout=PIPE if isinstance(stdout, Callable) else (stdout if stdout is not None else DEVNULL),
                           stderr=PIPE if isinstance(stderr, Callable) else (stderr if stderr is not None else DEVNULL),
                           stdin=PIPE if isinstance(stdin, (Callable, bytes, str)) else
                           (stdin if stdin is not None else DEVNULL),
                           universal_newlines=universal_newlines,
                           **kwargs)

        # A pump greenlet exists only for streams that were turned into a PIPE above.
        self._stdin_writer = (spawn(partial(stream_writer_text if universal_newlines else stream_writer_buf,
                                            self._proc.stdin, stdin))
                              if isinstance(stdin, (Callable, bytes, str)) else None)
        self._stdout_reader = spawn(partial(stream_reader_line if universal_newlines else stream_reader_buf,
                                            self._proc.stdout, stdout)) if isinstance(stdout, Callable) else None
        self._stderr_reader = spawn(partial(stream_reader_line if universal_newlines else stream_reader_buf,
                                            self._proc.stderr, stderr)) if isinstance(stderr, Callable) else None

    @property
    def stdout(self):
        """The child's stdout pipe; raises ``RuntimeError`` unless stdout was given as a callable."""
        if not self._stdout_reader:
            raise RuntimeError("not available")
        return self._proc.stdout

    @property
    def stderr(self):
        """The child's stderr pipe; raises ``RuntimeError`` unless stderr was given as a callable."""
        if not self._stderr_reader:
            raise RuntimeError("not available")
        return self._proc.stderr

    @property
    def stdin(self):
        """The child's stdin pipe; raises ``RuntimeError`` unless stdin is fed by a writer greenlet."""
        if not self._stdin_writer:
            raise RuntimeError("not available")
        return self._proc.stdin

    def wait(self, fail=True, timeout=None, _out_func=None, _stderr_func=None):
        """Wait for the process to exit and for all stream pumps to finish.

        :param fail: when true, a non-zero exit status raises
            ``CalledProcessError`` instead of being returned.
        :param timeout: seconds to wait for the process and the pump
            greenlets together; ``None`` waits indefinitely.
        :param _out_func: optional callable returning captured stdout to
            attach to the ``CalledProcessError``.
        :param _stderr_func: optional callable returning captured stderr to
            attach to the ``CalledProcessError``.
        :return: the process exit status.
        :raises TimeoutExpired: if ``timeout`` elapses first. The child is not
            terminated by this method; use ``terminate()`` / ``kill()``.
        :raises CalledProcessError: if ``fail`` is true and the exit status
            is non-zero.
        """
        # gevent throws the exception object it is handed. TimeoutExpired cannot be
        # constructed without its `cmd` and `timeout` arguments, so passing the bare
        # class would surface as a TypeError on expiry; hand over a ready instance.
        with Timeout(timeout, TimeoutExpired(self._safe_args, timeout)):
            retcode = self._proc.wait()
            if self._stdin_writer:
                self._stdin_writer.join()
            if self._stdout_reader:
                self._stdout_reader.join()
            if self._stderr_reader:
                self._stderr_reader.join()
        if fail and retcode:
            output = None
            stderr = None
            if _out_func:
                output = _out_func()
            if _stderr_func:
                stderr = _stderr_func()
            raise CalledProcessError(retcode, self._safe_args, output=output, stderr=stderr)
        return retcode

    def terminate(self):
        """Forward to ``Popen.terminate`` on the child process."""
        self._proc.terminate()

    def kill(self):
        """Forward to ``Popen.kill`` on the child process."""
        self._proc.kill()
×
135

136

137
# `run` is the name exported in __all__; calling it constructs a ProcessRunner,
# which starts the child process immediately.
run = ProcessRunner
6✔
138

139

140
def run_pass_through_capturing(args, stdout_logger, stderr_logger, stdin=DEVNULL, *, safe_args=None,
                               universal_newlines=True, **kwargs):
    """Run a command, forwarding its output to loggers while also capturing it.

    Every piece of stdout/stderr data is first appended to an in-memory
    buffer and then passed to the matching logger callable.

    :param args: command to run.
    :param stdout_logger: callable receiving each piece of stdout data.
    :param stderr_logger: callable receiving each piece of stderr data.
    :param stdin: input for the process, forwarded to ``run``.
    :param safe_args: forwarded to ``run``.
    :param universal_newlines: selects text buffers when true and ``BytesIO``
        buffers otherwise; also forwarded to ``run``.
    :param kwargs: forwarded to ``run``.
    :return: ``(captured_stdout, captured_stderr)``.
    """

    def new_buffer():
        return StringIO(trimmed=False) if universal_newlines else BytesIO()

    def tee(buffer, forward):
        # Build a sink that records the data before handing it to the logger.
        def sink(data):
            buffer.write(data)
            forward(data)

        return sink

    captured_out = new_buffer()
    captured_err = new_buffer()

    proc = run(args, tee(captured_out, stdout_logger), tee(captured_err, stderr_logger), stdin,
               safe_args=safe_args, universal_newlines=universal_newlines, **kwargs)
    # The getters let wait() attach whatever was captured to an error it raises.
    proc.wait(_out_func=captured_out.getvalue, _stderr_func=captured_err.getvalue)
    return captured_out.getvalue(), captured_err.getvalue()
6✔
157

158

159
def run_capturing_out(args, stderr_logger, stdin=DEVNULL, *, safe_args=None, universal_newlines=True, **kwargs):
    """Run a command and return everything it wrote to stdout.

    :param args: command to run.
    :param stderr_logger: passed to ``run`` as the stderr handler.
    :param stdin: input for the process, forwarded to ``run``.
    :param safe_args: forwarded to ``run``.
    :param universal_newlines: selects a text buffer when true and a
        ``BytesIO`` buffer otherwise; also forwarded to ``run``.
    :param kwargs: forwarded to ``run``.
    :return: the captured stdout.
    """
    if universal_newlines:
        captured = StringIO(trimmed=False)
    else:
        captured = BytesIO()

    runner = run(args, captured.write, stderr_logger, stdin,
                 safe_args=safe_args, universal_newlines=universal_newlines, **kwargs)
    # The getter lets wait() attach the captured output to an error it raises.
    runner.wait(_out_func=captured.getvalue)
    return captured.getvalue()
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc