• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

INTI-CMNB / KiAuto / 6981974383

24 Nov 2023 02:31PM UTC coverage: 88.592%. First build
6981974383

push

github

set-soft
Forced test

3021 of 3410 relevant lines covered (88.59%)

3.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.85
/kiauto/file_util.py
1
# -*- coding: utf-8 -*-
2
# Copyright (c) 2020-2022 Salvador E. Tropea
3
# Copyright (c) 2020-2022 Instituto Nacional de Tecnologïa Industrial
4
# Copyright (c) 2019 Jesse Vincent (@obra)
5
# Copyright (c) 2018-2019 Seppe Stas (@seppestas) (Productize SPRL)
6
# Based on ideas by: Scott Bezek (@scottbez1)
7
# License: Apache 2.0
8
# Project: KiAuto (formerly kicad-automation-scripts)
9
# Adapted from: https://github.com/obra/kicad-automation-scripts
10
"""
6✔
11
Utilities related to the filesystem and file names.
12

13
Note: Only wait_for_file_created_by_process is from the original project.
14
"""
15
import os
6✔
16
import time
6✔
17
import re
6✔
18
import shutil
6✔
19
import atexit
6✔
20
from subprocess import DEVNULL
6✔
21
# python3-psutil
22
import psutil
6✔
23
import json
6✔
24

25
from kiauto.misc import WRONG_ARGUMENTS, KICAD_VERSION_5_99, Config, READ_ONLY_PROBLEM, RULES_KEY
6✔
26
from kiauto import log
6✔
27
logger = log.get_logger(__name__)
6✔
28
time_out_scale = 1.0
6✔
29

30

31
def set_time_out_scale(scale):
6✔
32
    global time_out_scale
33
    time_out_scale = scale
6✔
34

35

36
def wait_for_file_created_by_process(pid, file):
6✔
37
    global time_out_scale
38
    timeout = 15*time_out_scale
3✔
39
    process = psutil.Process(pid)
3✔
40
    DELAY = 0.2
3✔
41
    for j in range(2):
3✔
42
        # 2 passes: 1) for the file to be created 2) extend the timeout if created
43
        logger.debug('Waiting for file %s (pid %d) (timeout: %f)', file, pid, timeout)
3✔
44
        for i in range(int(timeout/DELAY)):
3✔
45
            kicad_died = False
3✔
46
            try:
3✔
47
                open_files = process.open_files()
3✔
48
            except psutil.AccessDenied:
×
49
                # Is our child, this access denied is because we are listing
50
                # files for other process that took the pid of the old KiCad.
51
                kicad_died = True
×
52
            if kicad_died:
3✔
53
                raise RuntimeError('KiCad unexpectedly died')
54
            logger.debug(open_files)
3✔
55
            if os.path.isfile(file):
3✔
56
                file_open = False
3✔
57
                for open_file in open_files:
3✔
58
                    if open_file.path == file:
3✔
59
                        file_open = True
3✔
60
                if file_open:
3✔
61
                    logger.debug('Waiting for process to close file')
3✔
62
                else:
63
                    return
3✔
64
            else:
65
                logger.debug('Waiting for process to create file')
3✔
66
            time.sleep(DELAY)
3✔
67
        # If the file was created assume KiCad is working
68
        if os.path.isfile(file):
×
69
            timeout = 45*time_out_scale
×
70
        else:
71
            # The file wasn't even created, don't wait much
72
            timeout = 1*time_out_scale
×
73

74
    raise RuntimeError('Timed out waiting for creation of %s' % file)
75

76

77
def load_filters(cfg, file):
6✔
78
    """ Load errors filters """
79
    if not os.path.isfile(file):
6✔
80
        logger.error("Filter file `{}` doesn't exist".format(file))
6✔
81
        exit(WRONG_ARGUMENTS)
6✔
82
    logger.debug('Loading filter errors')
6✔
83
    with open(file, 'r') as f:
6✔
84
        ln = 1
6✔
85
        fl = 0
6✔
86
        for line in f:
6✔
87
            line = line.rstrip()
6✔
88
            if len(line) > 0 and line[0] != '#':
6✔
89
                m = re.search(r'^(\S+)\s*,(.*)$', line)
6✔
90
                if m:
6✔
91
                    cfg.err_filters.append([m.group(1), m.group(2)])
6✔
92
                    fl = fl+1
6✔
93
                else:
94
                    logger.error('Syntax error at line {} in filter file `{}`: `{}`'.format(ln, file, line))
6✔
95
                    logger.error('Use `ERROR_NUMBER,REGEX` format')
6✔
96
                    exit(WRONG_ARGUMENTS)
6✔
97
            ln = ln+1
6✔
98
        logger.info('Loaded {} error filters from `{}`'.format(fl, file))
6✔
99

100

101
def add_filter(cfg, id, regex):
6✔
102
    cfg.err_filters.append((id, regex))
×
103

104

105
def apply_filters(cfg, err_name, wrn_name):
6✔
106
    """ Apply the error filters to the list of errors and unconnecteds """
107
    if len(cfg.err_filters) == 0:
6✔
108
        return (0, 0)
6✔
109
    skip_err = 0
6✔
110
    for i, err in enumerate(cfg.errs):
6✔
111
        for f in cfg.err_filters:
6✔
112
            if err.startswith('({})'.format(f[0])):
6✔
113
                m = re.search(f[1], err)
6✔
114
                if m:
6✔
115
                    skip_err += 1
6✔
116
                    logger.warning('Ignoring '+err)
6✔
117
                    logger.debug('Matched regex `{}`'.format(f[1]))
6✔
118
                    cfg.errs[i] = None
6✔
119
                    break
6✔
120
    if skip_err:
6✔
121
        logger.info('Ignoring {} {}'.format(skip_err, err_name))
6✔
122
    skip_wrn = 0
6✔
123
    for i, wrn in enumerate(cfg.wrns):
6✔
124
        for f in cfg.err_filters:
6✔
125
            if wrn.startswith('({})'.format(f[0])):
6✔
126
                m = re.search(f[1], wrn)
6✔
127
                if m:
6✔
128
                    skip_wrn += 1
6✔
129
                    logger.info('Ignoring '+wrn)
6✔
130
                    logger.debug('Matched regex `{}`'.format(f[1]))
6✔
131
                    cfg.wrns[i] = None
6✔
132
                    break
6✔
133
    if skip_wrn:
6✔
134
        logger.info('Ignoring {} {}'.format(skip_wrn, wrn_name))
6✔
135
    return skip_err, skip_wrn
6✔
136

137

138
def list_errors(cfg):
6✔
139
    for err in cfg.errs:
6✔
140
        if err:
6✔
141
            if "; Severity: warning" in err:
6✔
142
                logger.warning(re.sub(" +; Severity: warning\n?", '', err))
4✔
143
            else:
144
                logger.error(re.sub(" +; Severity: error\n?", '', err))
6✔
145

146

147
def list_warnings(cfg):
6✔
148
    for wrn in cfg.wrns:
6✔
149
        if wrn:
6✔
150
            if "; Severity: error" in wrn:
6✔
151
                logger.error(re.sub(" +; Severity: error\n?", ''), wrn)
×
152
            else:
153
                logger.warning(re.sub(" +; Severity: warning\n?", '', wrn))
6✔
154

155

156
def check_kicad_config_dir(cfg):
6✔
157
    if not os.path.isdir(cfg.kicad_conf_path):
6✔
158
        logger.debug('Creating KiCad config dir')
2✔
159
        os.makedirs(cfg.kicad_conf_path, exist_ok=True)
2✔
160

161

162
def remove_lib_table(fname):
6✔
163
    if os.path.isfile(fname):
×
164
        logger.debug('Removing '+fname)
×
165
        os.remove(fname)
×
166

167

168
def check_lib_table(fuser, fsys):
6✔
169
    if not os.path.isfile(fuser):
6✔
170
        logger.debug('Missing default '+os.path.basename(fuser))
6✔
171
        for f in fsys:
6✔
172
            if os.path.isfile(f):
6✔
173
                shutil.copy2(f, fuser)
6✔
174
                logger.debug('Copied {} to {}'.format(f, fuser))
6✔
175
                return
6✔
176
        logger.warning('Missing default system symbol table '+fsys[0] +
177
                       ' creating an empty one')  # pragma: no cover
178
        with open(fuser, 'wt') as f:
×
179
            f.write('({} )\n'.format(os.path.basename(fuser).replace('-', '_')))
×
180
        atexit.register(remove_lib_table, fuser)
×
181

182

183
def restore_one_config(name, fname, fbkp):
6✔
184
    if fbkp and os.path.exists(fbkp):
6✔
185
        if os.path.exists(fname):
6✔
186
            os.remove(fname)
6✔
187
        os.rename(fbkp, fname)
6✔
188
        logger.debug('Restoring old %s config', name)
6✔
189
        return None
6✔
190
    return fbkp
6✔
191

192

193
def restore_config(cfg):
6✔
194
    """ Restore original user configuration """
195
    cfg.conf_eeschema_bkp = restore_one_config('eeschema', cfg.conf_eeschema, cfg.conf_eeschema_bkp)
6✔
196
    cfg.conf_kicad_bkp = restore_one_config('KiCad common', cfg.conf_kicad, cfg.conf_kicad_bkp)
6✔
197
    cfg.conf_hotkeys_bkp = restore_one_config('user hotkeys', cfg.conf_hotkeys, cfg.conf_hotkeys_bkp)
6✔
198
    cfg.conf_pcbnew_bkp = restore_one_config('pcbnew', cfg.conf_pcbnew, cfg.conf_pcbnew_bkp)
6✔
199
    cfg.conf_colors_bkp = restore_one_config('colors', cfg.conf_colors, cfg.conf_colors_bkp)
6✔
200
    cfg.conf_3dview_bkp = restore_one_config('3D viewer', cfg.conf_3dview, cfg.conf_3dview_bkp)
6✔
201

202

203
def backup_config(name, file, err, cfg):
6✔
204
    config_file = file
6✔
205
    old_config_file = file+'.pre_script'
6✔
206
    logger.debug(name+' config: '+config_file)
6✔
207
    # If we have an old back-up ask for the user to solve it
208
    if os.path.isfile(old_config_file):
6✔
209
        logger.error(name+' config back-up found (%s)', old_config_file)
6✔
210
        logger.error('It could contain your %s configuration, rename it to %s or discard it.', name.lower(), config_file)
6✔
211
        exit(err)
6✔
212
    if os.path.isfile(config_file):
6✔
213
        logger.debug('Moving current config to '+old_config_file)
6✔
214
        os.rename(config_file, old_config_file)
6✔
215
        atexit.register(restore_config, cfg)
6✔
216
        return old_config_file
6✔
217
    return None
6✔
218

219

220
def create_user_hotkeys(cfg):
6✔
221
    logger.debug('Creating a user hotkeys config')
4✔
222
    with open(cfg.conf_hotkeys, "wt") as text_file:
4✔
223
        text_file.write('common.Control.print\tCtrl+P\n')
4✔
224
        text_file.write('common.Control.plot\tCtrl+Shift+P\n')
4✔
225
        text_file.write('common.Control.show3DViewer\tAlt+3\n')
4✔
226
        text_file.write('eeschema.EditorControl.exportNetlist\tCtrl+Shift+N\n')
4✔
227
        text_file.write('eeschema.EditorControl.generateBOM\tCtrl+Shift+B\n')
4✔
228
        text_file.write('eeschema.InspectionTool.runERC\t{}\n'.format(RULES_KEY))
4✔
229
        text_file.write('pcbnew.DRCTool.runDRC\t{}\n'.format(RULES_KEY))
4✔
230
        text_file.write('pcbnew.ZoneFiller.zoneFillAll\tB\n')
4✔
231
        text_file.write('pcbnew.EditorControl.generateD356File\tAlt+Shift+E\n')
4✔
232
        text_file.write('3DViewer.Control.rotateXclockwise\tAlt+X\n')
4✔
233
        text_file.write('3DViewer.Control.rotateXcounterclockwise\tAlt+Shift+X\n')
4✔
234
        text_file.write('3DViewer.Control.rotateYclockwise\tAlt+Y\n')
4✔
235
        text_file.write('3DViewer.Control.rotateYcounterclockwise\tAlt+Shift+Y\n')
4✔
236
        text_file.write('3DViewer.Control.rotateZclockwise\tAlt+Z\n')
4✔
237
        text_file.write('3DViewer.Control.rotateZcounterclockwise\tAlt+Shift+Z\n')
4✔
238

239

240
def create_kicad_config(cfg):
6✔
241
    logger.debug('Creating a KiCad common config')
6✔
242
    with open(cfg.conf_kicad, "wt") as text_file:
6✔
243
        if cfg.conf_kicad_json:
6✔
244
            kiconf = {"environment": {"show_warning_dialog": False}}
4✔
245
            kiconf['graphics'] = {"cairo_antialiasing_mode": 0, "opengl_antialiasing_mode": 0}
4✔
246
            kiconf['system'] = {"editor_name": "/bin/cat"}
4✔
247
            # Copy the environment vars if available
248
            if cfg.conf_kicad_bkp:
4✔
249
                vars = Config.get_config_vars_json(cfg.conf_kicad_bkp)
4✔
250
                if vars:
4✔
251
                    kiconf['environment']['vars'] = vars
×
252
            text_file.write(json.dumps(kiconf))
4✔
253
            logger.debug(json.dumps(kiconf))
4✔
254
        else:
255
            text_file.write('ShowEnvVarWarningDialog=0\n')
2✔
256
            text_file.write('Editor=/bin/cat\n')
2✔
257
            # Copy the environment vars if available
258
            if cfg.conf_kicad_bkp:
2✔
259
                vars = Config.get_config_vars_ini(cfg.conf_kicad_bkp)
2✔
260
                if vars:
2✔
261
                    text_file.write('[EnvironmentVariables]\n')
2✔
262
                    for key in vars:
2✔
263
                        text_file.write(key.upper()+'='+vars[key]+'\n')
2✔
264

265

266
def restore_autosave(name):
6✔
267
    """ Restores de auto save information """
268
    old_name = name[:-11]
×
269
    if os.path.isfile(name):
×
270
        logger.debug('Restoring {} -> {}'.format(name, old_name))
×
271
        os.rename(name, old_name)
×
272

273

274
def check_input_file(cfg, no_file, no_ext):
6✔
275
    # Check the schematic/PCB is there
276
    if not os.path.isfile(cfg.input_file):
6✔
277
        logger.error(cfg.input_file+' does not exist')
6✔
278
        exit(no_file)
6✔
279
    # If we pass a name without extension KiCad will try to create a kicad_sch/kicad_pcb
280
    # The extension can be anything.
281
    ext = os.path.splitext(cfg.input_file)[1]
6✔
282
    if not ext:
6✔
283
        logger.error('Input files must use an extension, otherwise KiCad will reject them.')
6✔
284
        exit(no_ext)
6✔
285
    if cfg.kicad_version >= KICAD_VERSION_5_99 and ext == '.sch':
6✔
286
        logger.warning('Using old format files is not recommended. Convert them first.')
×
287
    # KiCad 6 uses #auto_saved_files# to store autosave info
288
    fauto = os.path.join(os.path.dirname(cfg.input_file), '#auto_saved_files#')
6✔
289
    if os.path.isfile(fauto):
6✔
290
        logger.warning('Partially saved project detected, please double check it')
×
291
        # Rename it so KiCad doesn't ask about restoring autosaved files
292
        fauto_new = fauto+'.moved_away'
×
293
        logger.debug('Renaming {} -> {}'.format(fauto, fauto_new))
×
294
        try:
×
295
            os.rename(fauto, fauto_new)
×
296
        except PermissionError:
×
297
            # Read-only directory or file system, give up
298
            logger.error('Unable to rename `{}` please remove it manually'.format(fauto))
×
299
            exit(READ_ONLY_PROBLEM)
×
300
        # Restore it at exit
301
        atexit.register(restore_autosave, fauto_new)
×
302

303

304
def memorize_project(cfg):
6✔
305
    """ Detect the .pro filename and try to read it and its mtime.
306
        If KiCad changes it then we'll try to revert the changes """
307
    cfg.pro_stat = None
6✔
308
    cfg.pro_content = None
6✔
309
    cfg.prl_stat = None
6✔
310
    cfg.prl_content = None
6✔
311
    name_no_ext = os.path.splitext(cfg.input_file)[0]
6✔
312
    cfg.pro_name = name_no_ext+'.'+cfg.pro_ext
6✔
313
    if not os.path.isfile(cfg.pro_name):
6✔
314
        cfg.pro_name = name_no_ext+'.pro'
6✔
315
        if not os.path.isfile(cfg.pro_name):
6✔
316
            logger.warning('KiCad project file not found')
6✔
317
            cfg.pro_name = name_no_ext+'.'+cfg.pro_ext
6✔
318
            return
6✔
319
        if cfg.kicad_version >= KICAD_VERSION_5_99:
×
320
            logger.warning('Using old format projects is not recommended. Convert them first.')
×
321
    if cfg.pro_name[-4:] == '.pro':
6✔
322
        cfg.pro_stat = cfg.start_pro_stat
2✔
323
    else:
324
        cfg.pro_stat = cfg.start_kicad_pro_stat
4✔
325
    with open(cfg.pro_name, 'rb') as f:
6✔
326
        cfg.pro_content = f.read()
6✔
327
    atexit.register(restore_project, cfg)
6✔
328
    if cfg.prl_ext:
6✔
329
        cfg.prl_name = name_no_ext+'.'+cfg.prl_ext
4✔
330
        if not os.path.isfile(cfg.prl_name):
4✔
331
            return
×
332
        cfg.prl_stat = cfg.start_kicad_prl_stat
4✔
333
        with open(cfg.prl_name, 'rb') as f:
4✔
334
            cfg.prl_content = f.read()
4✔
335

336

337
def _restore_project(name, stat_v, content):
6✔
338
    logger.debug('Checking if %s was modified', name)
6✔
339
    if stat_v and content:
6✔
340
        pro_found = False
6✔
341
        if os.path.isfile(name):
6✔
342
            new_stat = os.stat(name)
6✔
343
            pro_found = True
6✔
344
        else:  # pragma: no cover
345
            logger.warning('Project file lost')
346
        if not pro_found or new_stat.st_mtime != stat_v.st_mtime:
6✔
347
            logger.debug('Restoring the project file')
5✔
348
            os.rename(name, name+'-bak')
5✔
349
            with open(name, 'wb') as f:
5✔
350
                f.write(content)
5✔
351
            os.utime(name, times=(stat_v.st_atime, stat_v.st_mtime))
5✔
352

353

354
def restore_project(cfg):
6✔
355
    """ If the .pro was modified try to restore it """
356
    _restore_project(cfg.pro_name, cfg.pro_stat, cfg.pro_content)
6✔
357
    if cfg.prl_ext and cfg.prl_stat:
6✔
358
        _restore_project(cfg.prl_name, cfg.prl_stat, cfg.prl_content)
4✔
359

360

361
def get_log_files(out_dir, app_name, also_interposer=False):
6✔
362
    if log.get_level() > 2:
6✔
363
        os.makedirs(out_dir, exist_ok=True)
6✔
364
        flog_out = open(os.path.join(out_dir, app_name+'_out.log'), 'wt')
6✔
365
        flog_err = open(os.path.join(out_dir, app_name+'_err.log'), 'wt')
6✔
366
        logger.debug('Redirecting '+app_name+' output to '+app_name+'*.log')
6✔
367
    else:
368
        flog_out = flog_err = DEVNULL
6✔
369
    if also_interposer:
6✔
370
        os.makedirs(out_dir, exist_ok=True)
3✔
371
        fname = os.path.join(out_dir, app_name+'_interposer.log')
3✔
372
        flog_int = open(fname, 'wt')
3✔
373
        logger.debug('Saving '+app_name+' interposer dialog to '+fname)
3✔
374
    else:
375
        flog_int = DEVNULL
6✔
376
    return (flog_out, flog_err, flog_int)
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc