• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

twaugh / journal-brief / 236

24 Jun 2020 - 16:50 coverage: 96.173% (+0.1%) from 96.04%
236

Pull #45

travis-ci

web-flow
Merge 6b0317e88 into 8a9284111
Pull Request #45: Reduce flake8 warnings

22 of 24 new or added lines in 3 files covered. (91.67%)

779 of 810 relevant lines covered (96.17%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.16
/journal_brief/config.py
1
"""
2
Copyright (c) 2015 Tim Waugh <tim@cyberelk.net>
3

4
## This program is free software; you can redistribute it and/or modify
5
## it under the terms of the GNU General Public License as published by
6
## the Free Software Foundation; either version 2 of the License, or
7
## (at your option) any later version.
8

9
## This program is distributed in the hope that it will be useful,
10
## but WITHOUT ANY WARRANTY; without even the implied warranty of
11
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
## GNU General Public License for more details.
13

14
## You should have received a copy of the GNU General Public License
15
## along with this program; if not, write to the Free Software
16
## Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17
"""
18

19
import errno
1×
20
from journal_brief import list_formatters
1×
21
from journal_brief.constants import CONFIG_DIR, PACKAGE, PRIORITY_MAP
1×
22
from logging import getLogger
1×
23
import os
1×
24
import re
1×
25
import sre_constants
1×
26
import yaml
1×
27

28

29
log = getLogger(__name__)
1×
30

31

32
class ConfigError(Exception):
    """Base class for the configuration errors defined in this module."""


class SyntaxError(ConfigError):
    """
    The configuration file could not be scanned as YAML.

    Note: inside this module the name shadows the built-in SyntaxError.
    """

    def __init__(self, config_file, scanner_error):
        """
        :param config_file: path of the file that failed to load; it is
            read again here so the offending line can be quoted
        :param scanner_error: the YAML error; its `problem` (message) and
            `problem_mark` (position) attributes are used
        """
        super().__init__(scanner_error.problem)
        self.scanner_error = scanner_error
        with open(config_file, 'rt') as cfp:
            self.config_lines = cfp.readlines()

    def __str__(self):
        """
        Describe the problem, quote the offending line and point at the
        offending column with a caret.
        """
        mark = self.scanner_error.problem_mark

        # PyYAML marks count lines and columns from zero; report them
        # counting from one, as PyYAML's own messages do.
        ret = ('{sev}: {problem}\n'
               '  in "{file}", line {line}, column {column}:\n'.format(
                   sev='error',
                   problem=self.scanner_error.problem,
                   file=mark.name,
                   line=mark.line + 1,
                   column=mark.column + 1))

        # Quote the line only when it exists, instead of asserting: the
        # mark can legitimately sit on the first line or in column zero.
        if 0 <= mark.line < len(self.config_lines):
            quoted = self.config_lines[mark.line]
            if not quoted.endswith('\n'):
                # last line of a file without a trailing newline
                quoted += '\n'

            ret += quoted
            ret += ' ' * mark.column + '^'

        return ret
1×
59

60

61
class SemanticError(ConfigError):
    """
    The configuration has content that failed validation.
    """

    def __init__(self, message, item, conf, index=None):
        """
        :param message: description of what is wrong
        :param item: the keyword or value the message refers to
        :param conf: configuration fragment, dumped as YAML in the report
        :param index: optional position of the fragment within a list
        """
        super(SemanticError, self).__init__(message)
        self.message, self.item = message, item
        self.conf, self.index = conf, index

    def __str__(self):
        """
        Render the message followed by a YAML dump of the fragment.
        """
        dumped = yaml.dump(self.conf, indent=2, default_flow_style=False)
        location = ''
        if self.index is not None:
            location = 'at item {index}, '.format(index=self.index)
            if self.index > 0:
                # Put a '(...)' marker after the first dumped line;
                # presumably it stands for the list items that precede
                # this one.
                dumped_lines = dumped.split('\n')
                dumped = '\n'.join(dumped_lines[:1] +
                                   ['(...)'] +
                                   dumped_lines[1:])

        template = "{sev}: {item}: {message}\n  {at}in:\n{conf}"
        return template.format(sev='error',
                               item=self.item,
                               message=self.message,
                               at=location,
                               conf=dumped)
89

90

91
def load_config(config_file):
    """
    Read a YAML configuration file.

    :param config_file: path of the file to read
    :return: the loaded content, or an empty dict when the file does not
        exist or its content is empty/falsy
    :raises SyntaxError: (this module's class) when the YAML scanner
        rejects the file; the error is logged first
    :raises IOError: for any I/O failure other than a missing file
    """
    try:
        with open(config_file) as config_fp:
            try:
                loaded = yaml.safe_load(config_fp)
            except yaml.scanner.ScannerError as scanner_error:
                problem = SyntaxError(config_file, scanner_error)
                log.error(problem)
                raise problem from scanner_error
    except IOError as ex:
        # A missing file simply means "no configuration".
        if ex.errno == errno.ENOENT:
            return {}

        raise

    return loaded or {}
1×
111

112

113
class Config(dict):
    """
    Settings loaded from a YAML configuration file, held as a dict.

    The file's content is layered over built-in defaults and validated
    when the instance is created.
    """

    # Every top-level keyword a configuration may contain.
    ALLOWED_KEYWORDS = {
        'cursor-file',
        'debug',
        'exclusions',
        'inclusions',
        'output',
        'priority',
    }

    def __init__(self, config_file=None):
        """
        Load and validate the configuration.

        :param config_file: path of the file to load; when not given,
            '<PACKAGE>.conf' inside CONFIG_DIR is used
        :raises SemanticError: when the top level is not a map or when
            validation fails; every problem found is logged and the
            first one is raised
        """
        if not config_file:
            conf_filename = '{0}.conf'.format(PACKAGE)
            config_file = os.path.join(CONFIG_DIR, conf_filename)

        default_config = {'cursor-file': 'cursor'}
        config = load_config(config_file)

        if not isinstance(config, dict):
            error = SemanticError('must be a map', 'top level', config)
            log.error(error)
            raise error

        default_config.update(config)
        super().__init__(default_config)
        exceptions = list(self.validate())
        for exception in exceptions:
            log.error("%s", exception)
        if exceptions:
            raise exceptions[0]

    def validate(self):
        """
        Run every validator in turn.

        :return: generator yielding one SemanticError per problem found
        """
        # Only the string keys of PRIORITY_MAP are listed in messages.
        valid_prios = sorted(prio for prio in PRIORITY_MAP
                             if isinstance(prio, str))

        yield from self.validate_allowed_keywords()
        yield from self.validate_cursor_file()
        yield from self.validate_debug()
        yield from self.validate_inclusions_or_exclusions(valid_prios,
                                                          'exclusions')
        yield from self.validate_inclusions_or_exclusions(valid_prios,
                                                          'inclusions')
        yield from self.validate_output()
        yield from self.validate_priority(valid_prios)

    def validate_allowed_keywords(self):
        """Yield an error for each key not in ALLOWED_KEYWORDS."""
        for unexpected_key in set(self) - self.ALLOWED_KEYWORDS:
            yield SemanticError('unexpected keyword', unexpected_key,
                                {unexpected_key: self[unexpected_key]})

    def validate_cursor_file(self):
        """Yield an error unless 'cursor-file' is a string or an int."""
        if 'cursor-file' not in self:
            return

        if not isinstance(self['cursor-file'], (str, int)):
            yield SemanticError('expected string', 'cursor-file',
                                {'cursor-file': self['cursor-file']})

    def validate_debug(self):
        """Yield an error unless 'debug' is a bool or an int."""
        if 'debug' not in self:
            return

        if not isinstance(self['debug'], (bool, int)):
            yield SemanticError('expected bool', 'debug',
                                {'debug': self['debug']})

    def validate_output(self):
        """
        Yield an error for each 'output' entry that is not a known
        formatter.  'output' may be a single value or a list of values.
        """
        if 'output' not in self:
            return

        formatters = list_formatters()
        output = self['output']
        outputs = output if isinstance(output, list) else [output]

        for output in outputs:
            if output not in formatters:
                yield SemanticError('invalid output format, must be in %s' %
                                    formatters, output,
                                    {'output': self['output']})

    def validate_priority(self, valid_prios):
        """
        Yield an error unless 'priority' is a key of PRIORITY_MAP.

        :param valid_prios: priority names to list in the error message
        """
        if 'priority' not in self:
            return

        try:
            valid = self['priority'] in PRIORITY_MAP
        except TypeError:
            # The membership test itself failed (e.g. an unhashable
            # value such as a list), so it cannot be a priority.
            valid = False

        if not valid:
            yield SemanticError('invalid priority, must be in %s' %
                                valid_prios, 'priority',
                                {'priority': self['priority']})

    def validate_inclusions_or_exclusions(self, valid_prios, key):
        """
        Yield an error if self[key] is not a list, otherwise the errors
        found in its rules.

        :param valid_prios: priority names to list in error messages
        :param key: 'inclusions' or 'exclusions'
        """
        if key not in self:
            return

        if not isinstance(self[key], list):
            yield SemanticError('must be a list', key,
                                {key: self[key]})
            return

        yield from self.find_bad_rules(valid_prios, key)

    def priority_rule_is_valid(self, values):
        """
        Tell whether a PRIORITY rule holds only keys of PRIORITY_MAP.

        :param values: a single priority or a list of priorities
        :return: True if valid; False if not, or if the membership test
            raised TypeError
        """
        # Return from the try/except directly: a `return` inside
        # `finally` would swallow any unexpected exception.
        try:
            if isinstance(values, list):
                return all(value in PRIORITY_MAP for value in values)

            return values in PRIORITY_MAP
        except TypeError:
            return False

    def find_bad_rule_values(self, key, index, field, values):
        """
        Yield an error for each unacceptable value of one rule field.

        Lists and maps are rejected.  For exclusions, a string of the
        form '/.../' is additionally checked to compile as a regular
        expression.

        :param key: 'inclusions' or 'exclusions'
        :param index: position of the rule within self[key]
        :param field: name of the field the values belong to
        :param values: list of values to check
        """
        for value in values:
            if isinstance(value, (list, dict)):
                yield SemanticError('must be a string', value,
                                    {key: [{field: values}]},
                                    index=index)
                continue

            if (key == 'exclusions' and
                    isinstance(value, str) and
                    value.startswith('/') and
                    value.endswith('/')):
                try:
                    # TODO: use this computed value
                    re.compile(value[1:-1])
                except re.error as ex:
                    # re.error is the public name for the exception
                    # the deprecated sre_constants module exposes.
                    yield SemanticError(ex.args[0], value,
                                        {key: [{field: values}]},
                                        index=index)

    def find_bad_rules(self, valid_prios, key):
        """
        Yield an error for each problem in the rules listed in self[key].

        Each rule must be a map.  Its 'PRIORITY' field must be a valid
        priority or list of priorities; every other field must be a list
        of acceptable values.

        :param valid_prios: priority names to list in error messages
        :param key: 'inclusions' or 'exclusions'
        """
        log.debug("%s:", key)
        for index, rule in enumerate(self[key]):
            if not isinstance(rule, dict):
                yield SemanticError('must be a map', key, {key: [rule]}, index)
                continue

            log.debug('-')
            for field, values in rule.items():
                log.debug("%s: %r", field, values)
                if field == 'PRIORITY':
                    if not self.priority_rule_is_valid(values):
                        message = ('must be list or priority (%s)' %
                                   valid_prios)
                        yield SemanticError(message, field,
                                            {key: [{field: values}]},
                                            index=index)

                    continue

                if not isinstance(values, list):
                    yield SemanticError('must be a list', field,
                                        {key: [{field: values}]},
                                        index=index)
                    continue

                yield from self.find_bad_rule_values(key, index,
                                                     field, values)
1×
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2023 Coveralls, Inc