• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tarantool / test-run / 9430631951

08 Jun 2024 06:11PM UTC coverage: 62.574% (-0.03%) from 62.605%
9430631951

push

github

ylobankov
Calculate parallel jobs based on available CPUs

We're going to enable new ARM64 runners in CI, where test-run is invoked
in a Docker container. At the same time, the runner is an LXD container
created by the service provider. In these circumstances, the Docker
container sees all the 128 online CPUs, but the runner may have
only some of them available (depending on the pricing plan).

Python 3.3+ has a function to determine available CPUs, so we can reduce
the parallelism to this value. We fall back to the online CPUs count on
Python < 3.3.

After this change, test-run follows a CPU affinity mask set by `taskset`
(and, I guess, by `numactl`).

The change is similar to replacing `nproc --all` with `nproc`.

The change only affects the default behavior, which can be overwritten
by passing the `--jobs` (or `-j`) CLI option or using the
`TEST_RUN_JOBS` environment variable.

Reported in https://github.com/tarantool/tarantool/pull/10102

761 of 1568 branches covered (48.53%)

Branch coverage included in aggregate %.

8 of 11 new or added lines in 2 files covered. (72.73%)

2 existing lines in 1 file now uncovered.

2959 of 4377 relevant lines covered (67.6%)

0.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.85
/lib/utils.py
1
import errno
1✔
2
import os
1✔
3
import sys
1✔
4
import collections
1✔
5
import signal
1✔
6
import fcntl
1✔
7
import difflib
1✔
8
import time
1✔
9
import json
1✔
10
import subprocess
1✔
11
import multiprocessing
1✔
12
from lib.colorer import color_stdout
1✔
13

14
try:
1✔
15
    # Python3.5 or above
16
    from signal import Signals
1✔
17
except ImportError:
×
18
    # Python2
19
    Signals = None
×
20

21
try:
1✔
22
    # Python 3.3+.
23
    from shlex import quote as _shlex_quote
1✔
24
except ImportError:
×
25
    # Python 2.7.
26
    from pipes import quote as _shlex_quote
×
27

28
try:
1✔
29
    # Python 3.3+.
30
    from shutil import get_terminal_size
1✔
31
except ImportError:
×
32
    # Python 2.7.
33
    get_terminal_size = None
×
34

35
try:
1✔
36
    # Python 3.3+
37
    from os import sched_getaffinity
1✔
NEW
38
except ImportError:
×
NEW
39
    sched_getaffinity = None
×
40

41
UNIX_SOCKET_LEN_LIMIT = 107
1✔
42

43
# Useful for very coarse version differentiation.
44
PY3 = sys.version_info[0] == 3
1✔
45
PY2 = sys.version_info[0] == 2
1✔
46

47
if PY2:
1!
48
    FileNotFoundError = IOError
×
49

50
if PY3:
1!
51
    string_types = str,
1✔
52
    integer_types = int,
1✔
53
else:
54
    string_types = basestring,      # noqa: F821
×
55
    integer_types = (int, long)     # noqa: F821
×
56

57

58
def check_libs():
    """ Make the bundled libraries importable or terminate.

    The 'msgpack-python' and 'tarantool-python' directories located
    next to this file are put in front of sys.path (unless they are
    already listed there). Then both modules are imported; on the
    first ImportError the problem is reported and the process exits
    with status 1.
    """
    bundled = [
        ('msgpack', 'msgpack-python'),
        ('tarantool', 'tarantool-python')
    ]
    here = os.path.dirname(os.path.abspath(__file__))

    # All the paths are registered before any import is attempted.
    for _module, subdir in bundled:
        candidate = os.path.join(here, subdir)
        if candidate in sys.path:
            continue
        sys.path = [candidate] + sys.path

    for module, _subdir in bundled:
        try:
            __import__(module)
        except ImportError as exc:
            color_stdout("\n\nNo %s library found\n" % module,
                         schema='error')
            print(exc)
            sys.exit(1)
78

79

80
def non_empty_valgrind_logs(paths_to_log):
    """ Return those of the given log paths that exist and are not
    empty, in the order they were given.
    """
    return [
        log_path for log_path in paths_to_log
        if os.path.exists(log_path) and os.path.getsize(log_path) != 0
    ]
87

88

89
def print_tail_n(filename, num_lines=None):
    """ Print the last num_lines lines of a file using the 'tail'
    color schema. The whole file is printed when num_lines is None.
    """
    with open(filename, "r") as src:
        # A deque bounded by maxlen keeps only the most recent items;
        # maxlen=None leaves it unbounded.
        last_lines = collections.deque(src, maxlen=num_lines)
    for text in last_lines:
        color_stdout(text, schema='tail')
97

98

99
def find_in_path(name):
    """ Find an executable file by name.

    The current directory is searched first, then each directory
    listed in the PATH environment variable (a missing PATH is
    treated as empty).

    :param name: file name of the executable to look for
    :returns: path to the first match, or '' if nothing is found
    """
    search_path = os.curdir + os.pathsep + os.environ.get("PATH", "")
    for _dir in search_path.split(os.pathsep):
        exe = os.path.join(_dir, name)
        # os.access(..., X_OK) also succeeds for a searchable
        # directory, so require a regular file explicitly.
        if os.path.isfile(exe) and os.access(exe, os.X_OK):
            return exe
    return ''
106

107

108
# http://stackoverflow.com/a/2549950
#
# Signal name <-> signal number mappings built from the attributes of
# the signal module whose names start with 'SIG' but not with 'SIG_'.
# The names are walked in the reverse alphabetical order, so when
# several names share a number the alphabetically first one ends up
# in SIGNAMES.
SIGNUMS = dict(
    (name, int(value))
    for name, value in reversed(sorted(signal.__dict__.items()))
    if name.startswith('SIG') and not name.startswith('SIG_'))
SIGNAMES = dict(
    (number, name)
    for name, number in reversed(sorted(SIGNUMS.items())))
115

116

117
def signame(signal):
    """ Return the name of a signal.

    An integer or a Signals member is looked up in SIGNAMES, a
    string is returned as is. Any other type raises TypeError.
    """
    if isinstance(signal, integer_types):
        name = SIGNAMES[signal]
    elif Signals and isinstance(signal, Signals):
        name = SIGNAMES[int(signal)]
    elif isinstance(signal, string_types):
        name = signal
    else:
        raise TypeError(
            'signame(): signal argument of unexpected type: {}'.format(
                str(type(signal))))
    return name
126

127

128
def signum(signal):
    """ Return the number of a signal.

    An integer is returned as is, a Signals member is converted to
    int, a string is looked up in SIGNUMS (the 'SIG' prefix is added
    when it is missing). Any other type raises TypeError.
    """
    if isinstance(signal, integer_types):
        number = signal
    elif Signals and isinstance(signal, Signals):
        number = int(signal)
    elif isinstance(signal, string_types):
        full_name = signal if signal.startswith('SIG') else 'SIG' + signal
        number = SIGNUMS[full_name]
    else:
        raise TypeError(
            'signum(): signal argument of unexpected type: {}'.format(
                str(type(signal))))
    return number
139

140

141
def warn_unix_sockets_at_start(vardir):
    """ Warn when a sample unix socket path under vardir is too long.

    A sample relative socket path (presumably one of the longest
    paths tests create -- verify against the test suites) is joined
    with the resolved vardir. If the resolved result is longer than
    UNIX_SOCKET_LEN_LIMIT, two warning lines are printed.

    :param vardir: path to the directory with test artifacts
    """
    max_unix_socket_rel = '???_replication/autobootstrap_guest3.control'
    real_vardir = os.path.realpath(vardir)
    max_unix_socket_abs = os.path.join(real_vardir, max_unix_socket_rel)
    max_unix_socket_real = os.path.realpath(max_unix_socket_abs)
    if len(max_unix_socket_real) <= UNIX_SOCKET_LEN_LIMIT:
        return
    color_stdout(
        'WARNING: unix sockets can become longer than %d symbols:\n'
        % UNIX_SOCKET_LEN_LIMIT,
        schema='error')
    color_stdout('WARNING: for example: "%s" has length %d\n' %
                 (max_unix_socket_real, len(max_unix_socket_real)),
                 schema='error')
154

155

156
def warn_unix_socket(path):
    """ Warn once per path when a unix socket path is too long.

    The path is resolved with os.path.realpath(). Nothing is printed
    if the resolved path fits into UNIX_SOCKET_LEN_LIMIT or a warning
    about it was already issued.

    :param path: path to a unix socket
    """
    real_path = os.path.realpath(path)
    if len(real_path) <= UNIX_SOCKET_LEN_LIMIT or \
            real_path in warn_unix_socket.warned:
        return
    color_stdout(
        '\nWARNING: unix socket\'s "%s" path has length %d symbols that is '
        'longer than %d. That likely will cause failing of tests.\n' %
        (real_path, len(real_path), UNIX_SOCKET_LEN_LIMIT), schema='error')
    warn_unix_socket.warned.add(real_path)


# Resolved paths that were already reported by warn_unix_socket().
warn_unix_socket.warned = set()
169

170

171
def safe_makedirs(directory):
    """ Create a directory with all its parents if it does not exist.

    It is safe to call this concurrently from several processes: a
    failure of os.makedirs() is ignored when the directory exists
    afterwards.

    :param directory: path to the directory to create
    :raises OSError: if the directory cannot be created (permission
        denied, a regular file in the way, ...)
    """
    if os.path.isdir(directory):
        return
    try:
        os.makedirs(directory)
    except OSError:
        # Another process may have created the directory between the
        # check above and os.makedirs(); that is not an error. Any
        # failure that leaves us without the directory is real and
        # must not be hidden.
        if not os.path.isdir(directory):
            raise
179

180

181
def format_process(pid):
    """ Describe a process as 'process <pid> [<state>; <cmdline>]'.

    The state is the 'State' field of /proc/<pid>/status and the
    command line is /proc/<pid>/cmdline with NUL separators replaced
    by spaces. Each of them is 'unknown' if the corresponding file
    cannot be read (or, for the command line, is empty).
    """
    cmdline = status = 'unknown'

    try:
        with open('/proc/%d/cmdline' % pid, 'r') as f:
            raw_cmdline = f.read()
        cmdline = ' '.join(raw_cmdline.split('\0')).strip() or cmdline
    except (OSError, IOError):
        pass

    try:
        with open('/proc/%d/status' % pid, 'r') as f:
            for line in f:
                # Lines without a colon yield an empty separator.
                key, colon, value = line.partition(':')
                if colon and key == 'State':
                    status = value.strip()
    except (OSError, IOError):
        pass

    return 'process %d [%s; %s]' % (pid, status, cmdline)
200

201

202
def proc_stat_rss_supported():
    """ Tell whether /proc/<pid>/status exists as a regular file for
    the current process.
    """
    own_status_path = '/proc/%d/status' % os.getpid()
    return os.path.isfile(own_status_path)
204

205

206
def get_proc_stat_rss(pid):
    """ Return the number from the 'VmRSS' field of /proc/<pid>/status.

    Zero is returned if the file cannot be read or has no such field.
    """
    rss = 0
    try:
        with open('/proc/%d/status' % pid, 'r') as f:
            for line in f:
                # Lines without a colon yield an empty separator.
                key, colon, value = line.partition(':')
                if colon and key == 'VmRSS':
                    # Only the leading number is taken; whatever
                    # follows it in the field is dropped.
                    rss = int(value.strip().split()[0])
    except (OSError, IOError):
        pass
    return rss
219

220

221
def set_fd_cloexec(socket):
    """ Set the FD_CLOEXEC flag on a descriptor, keeping the other
    descriptor flags intact.
    """
    current_flags = fcntl.fcntl(socket, fcntl.F_GETFD)
    new_flags = current_flags | fcntl.FD_CLOEXEC
    fcntl.fcntl(socket, fcntl.F_SETFD, new_flags)
224

225

226
def print_unidiff(filepath_a, filepath_b):
    """ Write a unified diff of two files via color_stdout.

    A file that cannot be read is treated as empty and gets the
    current time as its timestamp; a message is printed in addition
    if the file does not exist.
    """
    def load(path):
        """ Return (lines, modification time string) of a file. """
        try:
            with open(path, 'r') as src:
                content = src.readlines()
                stamp = time.ctime(os.stat(path).st_mtime)
        except Exception:
            if not os.path.exists(path):
                color_stdout('[File does not exist: {}]\n'.format(path),
                             schema='error')
            return [], time.ctime()
        return content, stamp

    lines_a, time_a = load(filepath_a)
    lines_b, time_b = load(filepath_b)
    diff = difflib.unified_diff(lines_a, lines_b,
                                fromfile=filepath_a, tofile=filepath_b,
                                fromfiledate=time_a, tofiledate=time_b)
    color_stdout.writeout_unidiff(diff)
252

253

254
def prefix_each_line(prefix, data):
    """ Put prefix before each line of data.

    Trailing newlines of data are dropped first; each line of the
    result (including the last one) ends with a newline.
    """
    body = data.rstrip('\n')
    return ''.join(prefix + chunk + '\n' for chunk in body.split('\n'))
258

259

260
def just_and_trim(src, width):
    """ Fit a string into exactly width characters.

    A shorter string is padded with spaces on the right. A longer
    one is cut and gets '>' as the last character to mark the
    truncation.

    :param src: string to fit
    :param width: target width
    :returns: string of length width ('' if width is not positive)
    """
    # Without this guard a non-positive width would produce a
    # negative slice bound below, which cuts from the END of the
    # string and gives a result longer than requested.
    if width <= 0:
        return ''
    if len(src) > width:
        return src[:width - 1] + '>'
    return src.ljust(width)
264

265

266
def xlog_rows(xlog_path):
    """ Parse xlog / snapshot file.

        Assume tarantool and tarantoolctl is in PATH.

        This is a generator: it yields rows of the file decoded from
        the JSON output of `tarantoolctl cat`, one object per output
        line. FileNotFoundError is raised (on the first iteration)
        if the file does not exist.
    """
    if not os.path.exists(xlog_path):
        raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT),
                                xlog_path)
    cmd = ['tarantoolctl', 'cat', xlog_path, '--format=json', '--show-system']
    with open(os.devnull, 'w') as devnull:
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=devnull)
    try:
        # The whole output is read before the first row is yielded.
        lines = process.stdout.readlines()
    finally:
        # Release the pipe and reap the child: otherwise each call
        # leaks a file descriptor and leaves a zombie process.
        process.stdout.close()
        process.wait()
    for line in lines:
        yield json.loads(bytes_to_str(line))
278

279

280
def extract_schema_from_snapshot(snapshot_path):
    """
    Extract schema version from snapshot.

    Assume tarantool and tarantoolctl is in PATH.

    The rows of the snapshot are scanned for the first INSERT into
    the space with id 272 whose tuple starts with 'version'.

    Example of record:

     {
       "HEADER": {"lsn":2, "type": "INSERT", "timestamp": 1584694286.0031},
       "BODY": {"space_id": 272, "tuple": ["version", 2, 3, 1]}
     }

    :returns: (2, 3, 1) for the record above, None if there is no
              such record
    """
    BOX_SCHEMA_ID = 272
    for row in xlog_rows(snapshot_path):
        if row['HEADER']['type'] != 'INSERT':
            continue
        if row['BODY']['space_id'] != BOX_SCHEMA_ID:
            continue
        record = row['BODY']['tuple']
        if record[0] == 'version':
            return tuple(record[1:])
    return None
303

304

305
def assert_bytes(b):
    """ Raise ValueError unless the type of the given value is
        exactly <bytes>.
    """
    if type(b) is bytes:
        return
    raise ValueError('Internal error: expected {}, got {}: {}'.format(
        str(bytes), str(type(b)), repr(b)))
311

312

313
def assert_str(s):
    """ Raise ValueError unless the type of the given value is
        exactly <str>.
    """
    if type(s) is str:
        return
    raise ValueError('Internal error: expected {}, got {}: {}'.format(
        str(str), str(type(s)), repr(s)))
319

320

321
def bytes_to_str(b):
    """ Convert <bytes> to <str> assuming the UTF-8 encoding.

        The argument is verified to be <bytes> first.

        No-op on Python 2.
    """
    assert_bytes(b)
    return b if PY2 else b.decode('utf-8')
330

331

332
def str_to_bytes(s):
    """ Convert <str> to <bytes> using the UTF-8 encoding.

        The argument is verified to be <str> first.

        No-op on Python 2.
    """
    assert_str(s)
    return s if PY2 else s.encode('utf-8')
341

342

343
def parse_tag_line(line):
    """ Extract tags from a '<comment> tags: a, b, c' line.

    Everything after the first colon is split by commas. The tags
    are stripped of surrounding whitespace; empty ones (an empty
    list, a trailing or doubled comma) are dropped.

    :param line: the line to parse
    :returns: list of tags, [] if there are none
    """
    tags_str = line.partition(':')[2]
    stripped = (tag.strip() for tag in tags_str.split(','))
    return [tag for tag in stripped if tag]
346

347

348
def find_tags(filename):
    """ Extract tags from a first comment in the file.

    Only '.lua' / '.sql' files ('--' comments) and '.py' files ('#'
    comments) are inspected; an empty list is returned for any
    other file. The scan stops at the first line that is neither
    empty, nor a shebang, nor a single-line comment.
    """
    # TODO: Support multiline comments. See exclude_tests() in
    # lib/server.py.
    if filename.endswith(('.lua', '.sql')):
        comment_marker = '--'
    elif filename.endswith('.py'):
        comment_marker = '#'
    else:
        return []

    tags_prefix = comment_marker + ' tags:'
    found = []
    with open(filename, 'r') as src:
        for raw_line in src:
            text = raw_line.rstrip('\n')
            if text.startswith(tags_prefix):
                found.extend(parse_tag_line(text))
                continue
            # Skip empty lines, a shebang and ordinary comments.
            if text == '' or text.startswith(('#!', comment_marker)):
                continue
            break
    return found
375

376

377
def prepend_path(p):
    """ Add an absolute path into PATH (at start) if it is not already there.

    A missing or empty PATH is replaced by the given path alone: it
    is not joined with an empty string, because an empty PATH entry
    stands for the current directory.

    :param p: path to add; it is converted to an absolute one
    """
    p = os.path.abspath(p)
    current = os.environ.get('PATH', '')
    if not current:
        os.environ['PATH'] = p
        return
    if p in current.split(os.pathsep):
        return
    os.environ['PATH'] = os.pathsep.join((p, current))
384

385

386
def shlex_quote(s):
    """ Quote a string with the quote() function imported at the top
    of the module (shlex.quote, or pipes.quote as a fallback).
    """
    quoted = _shlex_quote(s)
    return quoted
388

389

390
def terminal_columns():
    """ Return the terminal width in columns as reported by
    get_terminal_size(), or 80 where that function is unavailable.
    """
    if not get_terminal_size:
        return 80
    return get_terminal_size().columns
394

395

396
def cpu_count():
    """
    Return the count of CPUs available for the current process.

    The result is the same as one from the `nproc` command.

    It may be smaller than all the online CPUs count. For example,
    an LXD container may have limited available CPUs or it may be
    reduced by `taskset` or `numactl` commands.

    If it is impossible to determine the available CPUs count (for
    example on Python < 3.3), fall back to the all online CPUs
    count.
    """
    if not sched_getaffinity:
        return multiprocessing.cpu_count()
    # Zero stands for the calling process.
    return len(sched_getaffinity(0))
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc