• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

INTI-CMNB / KiAuto / 6981974383

24 Nov 2023 02:31PM UTC coverage: 88.592%. First build
6981974383

push

github

set-soft
Forced test

3021 of 3410 relevant lines covered (88.59%)

3.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.69
/kiauto/ui_automation.py
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3
# Copyright (c) 2020-2022 Salvador E. Tropea
4
# Copyright (c) 2020-2022 Instituto Nacional de Tecnología Industrial
5
# Copyright (c) 2019 Jesse Vincent (@obra)
6
# Copyright (c) 2018-2019 Seppe Stas (@seppestas) (Productize SPRL)
7
# Copyright (c) 2015-2016 Scott Bezek (@scottbez1)
8
# License: Apache 2.0
9
# Project: KiAuto (formerly kicad-automation-scripts)
10
# Adapted from: https://github.com/obra/kicad-automation-scripts
11
"""
6✔
12
Utility functions for UI automation with xdotool in a virtual framebuffer
13
with XVFB. Also includes utilities for accessing the clipboard for easily
14
and efficiently copy-pasting strings in the UI.
15

16
Based on splitflap/electronics/scripts/export_util.py by Scott Bezek
17
"""
18
import argparse
6✔
19
from contextlib import contextmanager
6✔
20
import os
6✔
21
import re
6✔
22
import shutil
6✔
23
import signal
6✔
24
from subprocess import Popen, CalledProcessError, TimeoutExpired, call, check_output, STDOUT, DEVNULL, run, PIPE
6✔
25
import sys
6✔
26
from tempfile import mkdtemp
6✔
27
import time
6✔
28
# python3-xvfbwrapper
29
from xvfbwrapper import Xvfb
6✔
30
from kiauto.file_util import get_log_files
6✔
31
from kiauto.misc import KICAD_VERSION_5_99, MISSING_TOOL, KICAD_DIED, __version__
6✔
32
import kiauto.misc
6✔
33

34
from kiauto import log
6✔
35
# Directory used to store the window captures, assigned by wait_window_get_ref()
img_tmp_dir = None
# Factor applied to the time-outs of the wait helpers, changed using set_time_out_scale()
time_out_scale = 1.0
# Logger shared by all the helpers defined in this module
logger = log.get_logger(__name__)
38

39

40
def set_time_out_scale(scale):
    """ Stores `scale` as the module-wide factor applied to the time-outs
        computed by the wait helpers of this module. """
    global time_out_scale
    time_out_scale = scale
43

44

45
class PopenContext(Popen):
    """ Popen variant whose context exit also stops the child and waits for it. """

    def __exit__(self, type, value, traceback):
        """ Closes the stderr/stdin pipes, sends SIGTERM to the child's process
            group when the `with` body raised, and then waits for the child.

            Only `type` is examined, `value` and `traceback` are ignored.
            Returns None, so an exception raised in the body keeps propagating. """
        child = self.pid
        logger.debug("Closing pipe with %d", child)
        # Note: we currently don't communicate with the child, so the pipes are never set.
        # The code is kept in case it becomes needed, but is excluded from the coverage.
        # Closing stdin needs extra handling, implemented in the parent class but not here.
        # stdout is deliberately left alone: closing it can generate a deadlock.
        for pipe in (self.stderr, self.stdin):
            if pipe:
                pipe.close()  # pragma: no cover
        if type:
            logger.debug("Terminating %d", child)
            # KiCad nightly uses a shell script as intermediate to set up the environment
            # and run the proper binary. A plain "terminate" would only stop that script,
            # so the signal goes to the whole process group instead.
            try:
                os.killpg(os.getpgid(child), signal.SIGTERM)
            except ProcessLookupError:
                # Nothing to signal
                pass
        # Wait for the process to finish, to avoid zombies (up to 3 seconds).
        try:
            self.wait(3)
        except TimeoutExpired:  # pragma: no cover
            still_alive = True
        else:
            still_alive = False
        if not still_alive:
            return
        # We shouldn't get here: kill the whole group and wait up to 10 seconds
        logger.debug("Killing %d", child)  # pragma: no cover
        os.killpg(os.getpgid(child), signal.SIGKILL)  # pragma: no cover
        self.wait(10)  # pragma: no cover
85

86

87
def wait_xserver(out_dir, num_try):
    """ Runs a probe command until it succeeds, to know the X server is usable.

        out_dir: directory passed to get_log_files() to collect the probe output.
        num_try: attempt number, the pause applied after a successful probe is
                 0.5*(num_try+1)*(num_try+1)*time_out_scale seconds.
        Raises RuntimeError when all the probes fail. """
    DELAY = 0.5
    attempts = int(10*time_out_scale/DELAY)
    logger.debug('Waiting for virtual X server ...')
    display = os.environ.get('DISPLAY')
    if display is None:
        logger.error('Missing DISPLAY on wait_xserver!')
    else:
        logger.debug('Current DISPLAY is '+display)
    # Use the first probe tool found in the PATH
    candidates = (['setxkbmap', '-query'], ['xset', 'q'])
    cmd = next((c for c in candidates if shutil.which(c[0])), None)
    if cmd is None:  # pragma: no cover
        # Nothing to probe with, `ls` is used as a stand-in command
        cmd = ['ls']
        logger.warning('No setxkbmap nor xset available, unable to verify if X is running')
    probe_msg = 'Checking using '+str(cmd)
    for _attempt in range(attempts):
        logger.debug(probe_msg)
        flog_out, flog_err, _ = get_log_files(out_dir, cmd[0])
        if call(cmd, stdout=flog_out, stderr=flog_err, close_fds=True):
            # Non-zero exit: not ready yet
            logger.debug('   Retry')
            time.sleep(DELAY)
            continue
        time_wait = 0.5*(num_try+1)*(num_try+1)*time_out_scale
        if log.get_level() > 2:
            logger.debug(str(cmd)+' returned 0')
            logger.debug('Waiting {} seconds before using the X server'.format(time_wait))
        # On GitLab setxkbmap was seen to succeed while recordmydesktop and KiCad failed
        # to connect to X. Waiting here is one possible solution, another is detecting
        # that KiCad exited.
        time.sleep(time_wait)
        return
    raise RuntimeError('Timed out waiting for virtual X server')
120

121

122
def wait_wm():
    """ Polls `wmctrl -m` until it exits with 0.

        When wmctrl isn't installed it just sleeps 2 seconds and returns.
        Raises RuntimeError when all the attempts fail. """
    DELAY = 0.5
    attempts = int(10*time_out_scale/DELAY)
    logger.debug('Waiting for Window Manager ...')
    if not shutil.which('wmctrl'):  # pragma: no cover
        logger.warning('No wmctrl, unable to verify if WM is running')
        time.sleep(2)
        return
    cmd = ['wmctrl', '-m']
    logger.debug('Checking using '+str(cmd))
    for _ in range(attempts):
        if call(cmd, stdout=DEVNULL, stderr=STDOUT, close_fds=True) == 0:
            return
        logger.debug('   Retry')
        time.sleep(DELAY)
    raise RuntimeError('Timed out waiting for WM server')
141

142

143
@contextmanager
def start_wm(do_it):
    """ Context manager that keeps fluxbox running while the body executes.

        do_it: when false nothing is started and the body just runs. """
    if not do_it:
        yield
        return
    cmd = ['fluxbox']
    logger.debug('Starting WM: '+str(cmd))
    with PopenContext(cmd, stdout=DEVNULL, stderr=DEVNULL, close_fds=True, start_new_session=True) as wm_proc:
        # Don't hand over control until the WM is detected
        wait_wm()
        try:
            yield
        finally:
            logger.debug('Terminating the WM')
            # Fluxbox sometimes will ignore SIGTERM, we can just kill it
            wm_proc.kill()
158

159

160
@contextmanager
def start_record(do_record, video_dir, video_name):
    """ Context manager that runs recordmydesktop while the body executes.

        do_record: when false nothing is started and the body just runs.
        video_dir: directory for the video, also passed to get_log_files().
        video_name: file name of the video inside `video_dir`. """
    if not do_record:
        yield
        return
    cmd = ['recordmydesktop', '--overwrite', '--no-sound', '--no-frame', '--on-the-fly-encoding']
    cmd += ['-o', os.path.join(video_dir, video_name)]
    logger.debug('Recording session with: '+str(cmd))
    flog_out, flog_err, _ = get_log_files(video_dir, cmd[0])
    with PopenContext(cmd, stdout=flog_out, stderr=flog_err, close_fds=True, start_new_session=True) as screencast_proc:
        try:
            yield
        finally:
            logger.debug('Terminating the session recorder')
            screencast_proc.terminate()
176

177

178
@contextmanager
def start_x11vnc(do_it, old_display):
    """ Context manager that runs an x11vnc server, bound to localhost, on the
        current DISPLAY while the body executes.

        do_it: when false nothing is started and the body just runs.
        old_display: display mentioned in the hint logged for the user, ':0' is
                     used when it is None.
        If x11vnc isn't installed an error is logged and the body runs anyway. """
    if not do_it:
        yield
        return
    if not shutil.which('x11vnc'):
        logger.error("x11vnc isn't installed, please install it")
        yield
        return
    cmd = ['x11vnc', '-display', os.environ['DISPLAY'], '-localhost']
    logger.debug('Starting VNC server: '+str(cmd))
    with PopenContext(cmd, stdout=DEVNULL, stderr=DEVNULL, close_fds=True, start_new_session=True) as x11vnc_proc:
        viewer_display = ':0' if old_display is None else old_display
        logger.debug('To monitor the Xvfb now you can start: "ssvncviewer '+viewer_display+'"(or similar)')
        try:
            yield
        finally:
            logger.debug('Terminating the x11vnc server')
            x11vnc_proc.terminate()
198

199

200
@contextmanager
def recorded_xvfb(cfg, num_try=0):
    """ Context manager that runs the body inside an Xvfb, optionally with an
        x11vnc server, a window manager and a session recorder.

        cfg: object providing rec_width, rec_height, colordepth, output_dir,
             start_x11vnc, use_wm, record, video_dir and video_name.
             `cfg.record` is cleared when recordmydesktop isn't installed.
        num_try: attempt number, forwarded to wait_xserver(). """
    # Remember the DISPLAY we had before Xvfb is started
    previous_display = os.environ.get('DISPLAY')
    if cfg.record and shutil.which('recordmydesktop') is None:
        logger.error('To record the session please install `recordmydesktop`')
        cfg.record = False
    with Xvfb(width=cfg.rec_width, height=cfg.rec_height, colordepth=cfg.colordepth):
        wait_xserver(cfg.output_dir, num_try)
        # The helpers are entered left to right and exited in the reverse order
        with start_x11vnc(cfg.start_x11vnc, previous_display), \
             start_wm(cfg.use_wm), \
             start_record(cfg.record, cfg.video_dir, cfg.video_name):
            yield
212

213

214
def xdotool(command, id=None):
    """ Runs xdotool with the arguments in `command` and returns its decoded stdout.

        command: list of xdotool arguments. It is not modified, a copy is used, so
                 the same list can be passed again (i.e. when retrying).
        id: when not None `--window <id>` is inserted right after the first
            element of `command`.
        Raises CalledProcessError (from check_output) when xdotool fails.
        The stderr of xdotool is discarded. """
    args = list(command)
    if id is not None:
        args[1:1] = ['--window', str(id)]
    full_cmd = ['xdotool'] + args
    logger.debug(full_cmd)
    return check_output(full_cmd, stderr=DEVNULL).decode()
221

222

223
# def clipboard_store(string):
224
#     # I don't know how to use Popen/run to make it run with pipes without
225
#     # either blocking or losing the messages.
226
#     # Using files works really well.
227
#     logger.debug('Clipboard store "'+string+'"')
228
#     # Write the text to a file
229
#     fd_in, temp_in = tempfile.mkstemp(text=True)
230
#     os.write(fd_in, string.encode())
231
#     os.close(fd_in)
232
#     # Capture output
233
#     fd_out, temp_out = tempfile.mkstemp(text=True)
234
#     process = Popen(['xclip', '-selection', 'clipboard', temp_in], stdout=fd_out, stderr=STDOUT)
235
#     ret_code = process.wait()
236
#     os.remove(temp_in)
237
#     os.lseek(fd_out, 0, os.SEEK_SET)
238
#     ret_text = os.read(fd_out, 1000)
239
#     os.close(fd_out)
240
#     os.remove(temp_out)
241
#     ret_text = ret_text.decode()
242
#     if ret_text:  # pragma: no cover
243
#         logger.error('Failed to store string in clipboard')
244
#         logger.error(ret_text)
245
#         raise
246
#     if ret_code:  # pragma: no cover
247
#         logger.error('Failed to store string in clipboard')
248
#         logger.error('xclip returned %d' % ret_code)
249
#         raise
250

251

252
def text_replace(string):
    """ Used to replace a text in an input text widget.
        Sends ctrl+a and then types `string`, all using one xdotool invocation.
        The delay passed to xdotool is 12 scaled by time_out_scale. """
    key_delay = str(int(12*time_out_scale))
    select_all = ['key', '--delay', key_delay, 'ctrl+a']
    type_text = ['type', '--delay', key_delay, string]
    cmd = select_all + type_text
    logger.debug('text_replace with: {}'.format(cmd))
    xdotool(cmd)
258

259

260
def clipboard_retrieve():
    """ Returns the content of the `clipboard` selection as reported by xclip.

        The stderr of xclip is merged with its stdout, so any message printed by
        xclip is part of the returned text.
        The process is used as a context manager so its pipe is closed and the
        child is waited, no zombie is left behind. """
    with Popen(['xclip', '-o', '-selection', 'clipboard'], stdout=PIPE, stderr=STDOUT) as p:
        output = p.stdout.read().decode()
    logger.debug('Clipboard retrieve "'+output+'"')
    return output
267

268

269
def get_windows(all=False):
    """ Returns a list of (window_id, window_name) tuples obtained using xdotool.

        all: when false only the visible windows are listed.
        A window whose name can't be retrieved gets '** No longer there **' as name. """
    if all:
        cmd = ['search', '--name', '.*']
    else:
        cmd = ['search', '--onlyvisible', '--name', '.*']
    window_ids = xdotool(cmd).splitlines()
    verbose = log.get_level() >= 2
    res = []
    for wid in window_ids:
        try:
            # Remove the last character of the output (the end of line)
            title = xdotool(['getwindowname', wid])[:-1]
        except CalledProcessError:
            title = '** No longer there **'
        res.append((wid, title))
        if verbose:
            logger.debug('get_windows {} {}'.format(wid, title))
    if verbose:
        logger.debug('get_windows end of list')
    return res
287

288

289
def debug_window(id=None):  # pragma: no cover
    """ Dumps debug information: the properties of a window (using xprop), the
        output of `vmstat -s` and `uptime`, and the list of windows.

        id: window to inspect, the focused window is used when None.
        Does nothing when log.get_level() is below 2. Tools that aren't
        installed are skipped. """
    if log.get_level() < 2:
        return
    if shutil.which('xprop'):
        if id is None:
            try:
                id = xdotool(['getwindowfocus']).rstrip()
            except CalledProcessError:
                logger.debug('xdotool getwindowfocus failed!')
        if id:
            call(['xprop', '-id', id])
    # System status, only for the tools we have
    for tool_cmd in (['vmstat', '-s'], ['uptime']):
        if shutil.which(tool_cmd[0]):
            call(tool_cmd)
    # First the visible windows and then all of them
    for title, list_all in (("Visible windows:", False), ("All windows:", True)):
        logger.debug(title)
        for wid, wname in get_windows(all=list_all):
            logger.debug("Window ID: `{}` ; name: `{}`".format(wid, wname))
311

312

313
def wait_focused(id, timeout=10):
    """ Polls the focused window, every 0.5 seconds, until it is `id`.

        id: window id, compared with the stripped output of `xdotool getwindowfocus`.
        timeout: seconds to wait, scaled by time_out_scale. At least one check is
                 always done.
        Raises RuntimeError when the window didn't get the focus in time.
        A CalledProcessError from xdotool isn't handled here. """
    timeout *= time_out_scale
    DELAY = 0.5
    logger.debug('Waiting for %s window to get focus...', id)
    cur_id = None
    # Note: this must be max(), using min() limits the loop to only one check
    for _ in range(max(int(timeout/DELAY), 1)):
        cur_id = xdotool(['getwindowfocus']).rstrip()
        logger.debug('Currently focused id: %s', cur_id)
        if cur_id == id:
            return
        time.sleep(DELAY)
    debug_window(cur_id)  # pragma: no cover
    raise RuntimeError('Timed out waiting for %s window to get focus' % id)
326

327

328
def wait_not_focused(id, timeout=10):
    """ Polls the focused window, every 0.5 seconds, until it is no longer `id`.

        id: window id, compared with the stripped output of `xdotool getwindowfocus`.
        timeout: seconds to wait, scaled by time_out_scale.
        A failure of xdotool is taken as "no window has the focus" and ends the wait.
        Raises RuntimeError when the window keeps the focus after the time-out. """
    timeout *= time_out_scale
    DELAY = 0.5
    logger.debug('Waiting for %s window to lose focus...', id)
    # Defined here so the time-out report also works when the loop is skipped
    # (scaled time-out smaller than DELAY)
    cur_id = None
    for _ in range(int(timeout/DELAY)):
        try:
            cur_id = xdotool(['getwindowfocus']).rstrip()
        except CalledProcessError:
            # When no window is available xdotool receives ID=1 and exits with error
            return
        logger.debug('Currently focused id: %s', cur_id)
        if cur_id != id:
            return
        time.sleep(DELAY)
    debug_window(cur_id)  # pragma: no cover
    raise RuntimeError('Timed out waiting for %s window to lose focus' % id)
345

346

347
def search_visible_windows(regex, others=None):
    """ Workaround for problems in xdotool failing to match window names.
        The names of the visible windows are matched here, using `re`.

        regex: pattern for the name of the wanted window.
        others: optional list of patterns for windows that must not be there.
        Returns the list of ids of the windows matching `regex`.
        Raises ValueError, with the offending pattern, when one of `others` matches. """
    wanted_rx = re.compile(regex)
    patterns = others if others is not None else []
    unwanted = [(p, re.compile(p)) for p in patterns]
    windows = get_windows()
    # First check if we have one of the "others".
    # Note: the main window can be focused with a dialog over it. Finding one of these
    #       dialogs means we have a problem, no matter if the main window is focused.
    for pattern, rx in unwanted:
        if any(rx.search(title) for _, title in windows):
            raise ValueError(pattern)
    # Now collect the windows we need
    return [wid for wid, title in windows if wanted_rx.search(title)]
367

368

369
def wait_for_window(name, window_regex, timeout=10, focus=True, skip_id=0, others=None, popen_obj=None):
    """ Polls the visible windows, every 0.5 seconds, until one matches `window_regex`.

        name: description of the window, used for the messages.
        window_regex: pattern for the window name.
        timeout: seconds to wait, scaled by time_out_scale.
        focus: when true the window is focused (and we wait for the focus).
        skip_id: a match with this id is ignored and the wait continues.
        others: optional list of patterns for windows that abort the wait.
        popen_obj: optional process, the wait is aborted if it finished.
        Returns the list of ids of the matching windows.
        Raises ValueError (with the pattern) when one of `others` shows up,
        CalledProcessError when `popen_obj` finished and RuntimeError on time-out. """
    timeout *= time_out_scale
    DELAY = 0.5
    logger.info('Waiting for "%s" ...', name)
    if skip_id:
        logger.debug('Will skip %s', skip_id)

    for _ in range(int(timeout/DELAY)):
        try:
            matches = search_visible_windows(window_regex, others=others)
            if matches:
                count = len(matches)
                logger.debug('Found %s window (%d)', name, count)
                # With more than one match the second is the one used
                win_id = matches[0] if count == 1 else matches[1]
                logger.debug('Window id: %s', win_id)
                if win_id == skip_id:
                    logger.debug('Skipped')
                else:
                    if focus:
                        xdotool(['windowfocus', '--sync', win_id])
                        wait_focused(win_id, timeout)
                    return matches
        except CalledProcessError:
            # A failing xdotool is tolerated, unless the process is gone
            if popen_obj and popen_obj.poll() is not None:
                raise
        # Check if we have a list of alternative windows
        for other in others or ():
            if search_visible_windows(other):
                raise ValueError(other)
        if popen_obj:
            # Is KiCad running?
            ret_code = popen_obj.poll()
            if ret_code is not None:
                raise CalledProcessError(ret_code, 'KiCad')
        time.sleep(DELAY)
    debug_window()  # pragma: no cover
    raise RuntimeError('Timed out waiting for %s window' % name)
411

412

413
def wait_point(cfg):
    """ Stops until the user presses enter, only when `cfg.wait_for_key` is set. """
    if not cfg.wait_for_key:
        return
    input('Press a key')
416

417

418
def capture_window_region(window_id, x, y, w, h, name, to_capture=None):
    """ Capture a region of a window to a file.

        window_id: window passed to the `import` tool.
        x, y, w, h: position and size of the region, used as crop geometry.
        name: file name for the image, created inside `img_tmp_dir`.
        to_capture: time-out for the `import` run, None means no limit.
        Exits with MISSING_TOOL when `import` isn't installed. """
    geometry = '{0}x{1}+{2}+{3}'.format(w, h, x, y)
    logger.debug('Capturing region {} from window {}'.format(geometry, window_id))
    target = os.path.join(img_tmp_dir, name)
    if shutil.which('import') is None:
        logger.error("import isn't installed, please install it.\nThis is part of ImageMagick and GraphicsMagic packages.")
        sys.exit(MISSING_TOOL)
    cmd = ['import', '-window', str(window_id), '-crop', geometry, target]
    raw = check_output(cmd, stderr=DEVNULL, timeout=to_capture)
    logger.debug('Import output: ' + raw.decode())
429

430

431
def wait_window_get_ref(window_id, x, y, w, h):
    """ Takes a region of a window as reference image.
        A new temporary directory is created, stored in `img_tmp_dir`, and the
        region is captured there as `wait_ref.png`. """
    global img_tmp_dir
    img_tmp_dir = mkdtemp(prefix='tmp-kiauto-images-')
    capture_window_region(window_id, x, y, w, h, name="wait_ref.png")
436

437

438
def wait_window_change(window_id, x, y, w, h, time_out, to_capture):
    """ Waits for a change in a window region.
        The region is captured once per second and compared, using the `compare`
        tool, with the `wait_ref.png` reference found in `img_tmp_dir`.

        time_out: seconds to wait, int(time_out + 0.9) captures are tried.
        to_capture: time-out for each capture.
        Returns when `compare` reports a non-zero value or after the last try,
        in both cases `img_tmp_dir` is removed. """
    for _ in range(int(time_out + 0.9)):
        capture_window_region(window_id, x, y, w, h, "current.png", to_capture)
        current, wait_ref, difference = (os.path.join(img_tmp_dir, fname)
                                         for fname in ("current.png", "wait_ref.png", "difference.png"))
        compare_cmd = ['compare', '-fuzz', '5%', '-metric', 'AE', current, wait_ref, difference]
        # The metric is read from the stderr of the tool
        res = run(compare_cmd, stderr=PIPE).stderr.decode()
        ae = int(res)
        logger.debug('Difference ' + res)
        if ae:
            # The region changed
            break
        time.sleep(1)
    shutil.rmtree(img_tmp_dir)
454

455

456
def open_dialog_with_retry(msg, keys, desc, w_name, cfg, id_dest=None):
    """ Sends `keys` using xdotool and waits for the window they should open,
        trying a second time if the window doesn't show up.

        msg: message logged before starting.
        keys: xdotool command used to open the dialog.
        desc: description of the window, used for the messages.
        w_name: pattern for the name of the window.
        cfg: object providing wait_for_key, kicad_version and popen_obj.
        id_dest: optional window to which the keys are sent.
        Returns what wait_for_window() returned.
        Exits with KICAD_DIED when wait_for_window() raises CalledProcessError. """
    logger.info(msg)
    wait_point(cfg)
    if cfg.kicad_version >= KICAD_VERSION_5_99:
        # KiCad 6 has a very slow start-up
        time.sleep(1)
    xdotool(keys, id=id_dest)
    try:
        return wait_for_window(desc, w_name, popen_obj=cfg.popen_obj, others=['pcbnew Warning'])
    except RuntimeError:  # pragma: no cover
        # Perhaps the main window wasn't available yet, try again
        pass
    except CalledProcessError as e:
        logger.error(str(e))
        sys.exit(KICAD_DIED)
    except ValueError:
        logger.warning('Got pcbnew warning dialog, trying to dismiss it')
        xdotool(['key', 'Return'])
    # Second and last attempt, this time without looking for the warning dialog
    logger.info('"{}" did not open, retrying'.format(desc))
    xdotool(keys, id=id_dest)
    try:
        return wait_for_window(desc, w_name, popen_obj=cfg.popen_obj)
    except CalledProcessError as e:
        logger.error(str(e))
        sys.exit(KICAD_DIED)
486

487

488
def show_info():
    """ Prints the KiAuto version, where it is installed, the Python interpreter
        in use, the KiCad version reported by pcbnew and the path of the tools. """
    script = os.path.abspath(sys.argv[0])
    module_dir = os.path.dirname(kiauto.misc.__file__)
    print("This is KiAuto v"+__version__)
    print("Installed at: "+script)
    print("Using kiauto module from: "+module_dir)
    print("Interpreted by Python: {} (v{})".format(sys.executable, sys.version.replace('\n', ' ')))
    print("Tools:")
    try:
        import pcbnew
        kicad_version = pcbnew.GetBuildVersion()
        print("- kicad: {} (v{})".format(shutil.which('kicad'), kicad_version))
    except ImportError:
        print("ERROR: Failed to import pcbnew Python module."
              " Is KiCad installed?"
              " Do you need to add it to PYTHONPATH?")
    # None is printed for the tools that aren't installed
    for tool in ('xdotool', 'recordmydesktop', 'xsltproc', 'xclip', 'convert'):
        print("- "+tool+": "+str(shutil.which(tool)))
507

508

509
class ShowInfoAction(argparse.Action):
    """ argparse action that prints the information from show_info() and then
        finishes the program with exit status 0. """

    def __call__(self, parser, namespace, values, option_string=None):
        """ Invoked by argparse when the option is found, the arguments aren't used. """
        show_info()
        # sys.exit() is used because the exit() builtin is added by the `site`
        # module for interactive use and isn't guaranteed to exist
        sys.exit(0)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc