• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tfcollins / telemetry / 12804737190

16 Jan 2025 08:17AM UTC coverage: 37.59% (+4.2%) from 33.437%
12804737190

push

github

web-flow
Merge pull request #45 from trishaange01/pytest_parser

Update pytest xml parser and markdown gist results

78 of 107 new or added lines in 2 files covered. (72.9%)

1 existing line in 1 file now uncovered.

524 of 1394 relevant lines covered (37.59%)

0.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.9
/telemetry/gparser/parser.py
1
from urllib.parse import urlparse
×
2
import json
×
3
import os
×
4
import re
×
5
import traceback
×
6
import junitparser
×
7
import telemetry
×
8
from junitparser import JUnitXml, Skipped, Error, Failure
×
NEW
9
import xml.etree.ElementTree as ET
×
NEW
10
import ast
×
11

12
def get_parser(url, grabber=None):
    '''Factory method that provides the appropriate parser object(s) based on the given url.

    The url is tested against each known artifact pattern in declaration
    order and the first match wins.

    url: location of the artifact to parse.
    grabber: optional object forwarded unchanged to the parser constructor.

    Returns a single parser instance, or a list of parser instances when the
    matching entry maps to several parser classes (the XML reports).

    Raises Exception when no pattern matches the url.
    '''
    # Raw strings so that "\." reaches the regex engine without relying on
    # Python's handling of unrecognised string escapes.
    parsers = {
        r'^.*dmesg_[a-zA-Z0-9-]+\.log': Dmesg,
        r'^.*dmesg_.+err\.log': DmesgError,
        r'^.*dmesg_.+warn\.log': DmesgWarning,
        r'^.*enumerated_devs\.log': EnumeratedDevs,
        r'^.*missing_devs\.log': MissingDevs,
        r'^.*pyadi-iio.*\.xml': [PytestFailure, PytestSkipped, PytestError],
        r'^.*HWTestResults\.xml': [MatlabFailure, MatlabSkipped, MatlabError],
        r'^.*info\.txt': InfoTxt,
    }

    # find parser
    for pattern, parser in parsers.items():
        if re.match(pattern, url):
            if isinstance(parser, list):
                # one parser instance per result type, all reading the same file
                return [p(url, grabber) for p in parser]
            return parser(url, grabber)

    raise Exception("Cannot find Parser for {}".format(url))
×
33

34
def remove_suffix(input_string, suffix):
    '''Return input_string without its trailing suffix.

    The string is returned unchanged when suffix is empty or when
    input_string does not end with it.
    '''
    if not suffix or not input_string.endswith(suffix):
        return input_string
    keep = len(input_string) - len(suffix)
    return input_string[:keep]
1✔
38

39
class Parser():
    '''Base class for a parser object.

    Constructing a parser immediately runs initialize(), which derives the
    job and file metadata from the url, downloads the artifact through the
    grabber and fills payload_raw, payload and payload_param.
    '''

    def __init__(self, url, grabber=None):
        '''Store the url, pick a grabber and parse the artifact.

        url: location of the artifact.
        grabber: object exposing download_file(url, file_name); when falsy a
            telemetry.grabber.Grabber() is created.
        '''
        self.url = url
        self.server = None
        self.job = None
        self.job_no = None
        self.job_date = None
        self.file_name = None
        self.file_info = None
        self.target_board = None
        self.artifact_info_type = None
        self.payload_raw = []
        self.payload = []
        self.payload_param = []
        if not grabber:
            self.grabber = telemetry.grabber.Grabber()
        else:
            self.grabber = grabber
        self.initialize()

    def initialize(self):
        '''Populate every attribute from the url and the downloaded file.'''
        url_ = urlparse(self.url)
        # a path holding at least two "/job/" segments is treated as multilevel
        self.multilevel = len(url_.path.split('/job/')) > 2

        # initialize attributes
        self.server = url_.scheme + '://' + url_.netloc + '/' + url_.path.split('/')[1]
        self.job, self.job_no, self.job_date = self.get_job_info()
        file_info = self.get_file_info()
        self.file_name = file_info[0]
        self.file_info = file_info[1]
        self.target_board = file_info[2]
        self.artifact_info_type = file_info[3]
        self.payload_raw = self.get_payload_raw()
        payload_parsed = self.get_payload_parsed()
        if isinstance(payload_parsed, tuple):
            # subclasses may return (payload, payload_param) in one go
            self.payload = payload_parsed[0]
            self.payload_param = payload_parsed[1]
        else:
            self.payload = payload_parsed
            self.payload_param = self.get_payload_param()

    def show_info(self):
        '''returns the instance attribute dictionary'''
        return self.__dict__

    def get_job_info(self):
        '''returns jenkins project name, job no and job date

        Raises Exception for urls that are not multilevel.
        '''
        if self.multilevel:
            path_parts = urlparse(self.url).path.split('/')
            job = path_parts[3] + '/' + path_parts[5]
            job_no = path_parts[6]
            # TODO: get job date using jenkins api
            job_date = None
            return (job, job_no, job_date)

        raise Exception("Does not support non multilevel yet!")

    def get_file_info(self):
        '''returns file name, file info, target_board, artifact_info_type

        The file name is split on "_": the first part is the target board and
        the next two parts (minus a trailing ".log") form the artifact type.
        Raises Exception for urls that are not multilevel.
        '''
        if self.multilevel:
            url = urlparse(self.url)
            file_name = url.path.split('/')[-1]
            file_info = file_name.split('_')
            target_board = file_info[0]
            artifact_info_type = file_info[1] + '_' + file_info[2]
            artifact_info_type = remove_suffix(artifact_info_type, ".log")
            return (file_name, file_info, target_board, artifact_info_type)

        raise Exception("Does not support non multilevel yet!")

    def get_payload_raw(self):
        '''Download the artifact and return its lines, stripped.

        Any failure is printed and results in an empty (or partial) list; the
        downloaded file is removed afterwards when it exists.
        '''
        payload = []
        # Bound before the try block: if the download itself fails, the
        # cleanup below must not trip over an unassigned name.
        file_path = None
        try:
            file_path = self.grabber.download_file(self.url, self.file_name)
            with open(file_path, "r") as f:
                payload = [l.strip() for l in f.readlines()]
        except Exception:
            traceback.print_exc()
            print("Error Parsing File!")
        finally:
            if file_path and os.path.exists(file_path):
                os.remove(file_path)
        return payload

    def get_payload_parsed(self):
        '''Split each raw line into (timestamp, message) where possible.

        Lines of the form "[ <number>.<digits>] text" become a tuple of the
        timestamp string and the text; every other line is kept as is.
        '''
        payload = []
        for p in self.payload_raw:
            # try to extract timestamp from data; the lazy ".*?" keeps all
            # integer digits of the timestamp inside the captured group
            x = re.search(r"\[.*?(\d+\.\d*)\]\s(.*)", p)
            if x:
                payload.append((x.group(1), x.group(2)))
            else:
                payload.append(p)
        return payload

    def get_payload_param(self):
        '''returns one "NA" placeholder per payload entry'''
        return ["NA" for _ in self.payload]
1✔
143

144
class Dmesg(Parser):
    '''Parser for dmesg logs named "dmesg_<board>.log" or "dmesg_<board>_<level>.log".'''

    def __init__(self, url, grabber):
        super().__init__(url, grabber)

    def get_file_info(self):
        '''returns file name, file info, target_board, artifact_info_type

        Raises Exception for urls that are not multilevel.
        '''
        if not self.multilevel:
            raise Exception("Does not support non multilevel yet!")

        file_name = urlparse(self.url).path.split('/')[-1]
        parts = file_name.split('_')
        # NOTE(review): for a two-part name ("dmesg_<board>.log") the board
        # field keeps its ".log" extension - confirm this is intended.
        board = parts[1]
        info_type = parts[0]
        if len(parts) == 3:
            # three-part names carry the log level as the last part
            info_type = info_type + '_' + parts[2]
        info_type = remove_suffix(info_type, ".log")
        return (file_name, parts, board, info_type)
×
163

164
class DmesgError(Dmesg):
    '''Dmesg parser used for files matching "dmesg_*err.log"; adds no behaviour of its own.'''

    def __init__(self, url, grabber):
        # nothing extra to set up, defer entirely to Dmesg
        super().__init__(url, grabber)
1✔
168

169
class DmesgWarning(Dmesg):
    '''Dmesg parser used for files matching "dmesg_*warn.log"; adds no behaviour of its own.'''

    def __init__(self, url, grabber):
        # nothing extra to set up, defer entirely to Dmesg
        super().__init__(url, grabber)
1✔
173

174
class EnumeratedDevs(Parser):
    '''Parser used for "enumerated_devs.log" artifacts; relies on the base Parser behaviour.'''

    def __init__(self, url, grabber):
        # nothing extra to set up, defer entirely to Parser
        super().__init__(url, grabber)
×
178

179
class MissingDevs(Parser):
    '''Parser used for "missing_devs.log" artifacts; relies on the base Parser behaviour.'''

    def __init__(self, url, grabber):
        # nothing extra to set up, defer entirely to Parser
        super().__init__(url, grabber)
×
183

184
class xmlParser(Parser):
    '''Base parser for JUnit-style XML test reports.

    Concrete subclasses are named "<Framework><Result>" (for example
    "MatlabFailure"); the class name decides which junitparser result type
    is collected from the report.
    '''

    def __init__(self, url, grabber):
        super(xmlParser, self).__init__(url, grabber)

    def get_file_info(self):
        '''returns file name, file info, target_board, artifact_info_type

        file info and artifact_info_type are the class name split at its
        second capital letter and lower-cased ("MatlabFailure" ->
        "matlab_failure"). Raises Exception for urls that are not multilevel.
        '''
        if self.multilevel:
            url = urlparse(self.url)
            url_path = url.path.split('/')
            file_name = url_path[-1]
            parser_type = type(self).__name__
            # positions of the capital letters; the second one separates the
            # framework name from the result type
            x = [i for i, c in enumerate(parser_type) if c.isupper()]
            file_info = (parser_type[:x[1]] + '_' + parser_type[x[1]:]).lower()
            target_board = file_name.replace('_', '-')
            target_board = remove_suffix(target_board, "-reports.xml")
            target_board = remove_suffix(target_board, "-HWTestResults.xml")
            artifact_info_type = file_info
            return (file_name, file_info, target_board, artifact_info_type)

        raise Exception("Does not support non multilevel yet!")

    def get_payload_raw(self):
        '''Return the names of the test cases whose first result matches this parser's type.

        Any failure is printed and results in an empty (or partial) list; the
        downloaded file is removed afterwards when it exists.
        '''
        payload = []
        # Bound before the try block: if the download itself fails, the
        # cleanup below must not trip over an unassigned name.
        file_path = None
        try:
            file_path = self.grabber.download_file(self.url, self.file_name)
            # Parser
            xml = JUnitXml.fromfile(file_path)
            # "matlab_failure" -> junitparser.Failure
            resultType = getattr(junitparser, self.file_info.split("_")[1].capitalize())
            for suite in xml:
                for case in suite:
                    if case.result and type(case.result[0]) is resultType:
                        payload.append(case.name)
        except Exception:
            traceback.print_exc()
            print("Error Parsing File!")
        finally:
            if file_path and os.path.exists(file_path):
                os.remove(file_path)
        return payload

    def get_payload_parsed(self):
        '''Split each test case name into a procedure name and its parameter.

        Returns (procedures, params), two lists of the same length; a name
        without exactly one bracketed parameter section gets "NA".
        '''
        procedure = []
        param = []
        for payload_str in self.payload_raw:
            # remove "-adi.xxxx" device name segments
            payload_str = re.sub(r"-adi\.\w*", "", payload_str)
            # remove multiple dashes
            payload_str = re.sub(r"-+", "-", payload_str)
            # replace () from MATLAB xml with []
            payload_str = payload_str.replace("(", "[").replace(")", "]")
            procedure_param = payload_str.split("[")
            name = procedure_param[0]
            procedure.append(name)
            if len(procedure_param) != 2:
                param.append("NA")
                continue
            raw_param = procedure_param[1]
            # default: everything inside the brackets (drop the closing "]")
            value = raw_param[:-1]
            if any(x in name for x in ["profile_write", "write_profile"]):
                # remove path from profile filename; keep the default when the
                # parameter does not look like a file name
                found = re.search(r"(\w*\..*)]", raw_param)
                if found:
                    value = found.group(1)
            param.append(value)
        return (procedure, param)
×
247

NEW
248
class pytestxml_parser(xmlParser):
    '''Parser for pytest XML reports that renders each failed test as markdown.

    Only test cases holding both a <properties> and a <failure> element are
    reported, regardless of which subclass is instantiated. Each payload
    entry holds a linked test name, the list of test parameters and the
    exception summary.
    '''

    # documentation page linked from a test name when its module is unknown
    _DEFAULT_TEST_LINK = "https://analogdevicesinc.github.io/pyadi-iio/dev/test_attr.html"
    # pyadi-iio test module -> documentation page of that module
    _TEST_LINKS = {
        "test.attr_tests": "https://analogdevicesinc.github.io/pyadi-iio/dev/test_attr.html",
        "test.dma_tests": "https://analogdevicesinc.github.io/pyadi-iio/dev/test_dma.html",
        "test.generics": "https://analogdevicesinc.github.io/pyadi-iio/dev/test_generics.html",
    }

    def __init__(self, url, grabber):
        super(pytestxml_parser, self).__init__(url, grabber)

    @staticmethod
    def _format_param(param):
        '''Expand a "name={...}" parameter into one line per dictionary entry; other parameters are returned unchanged.'''
        name, sep, value = param.partition("=")
        value = value.strip()
        if not sep or not value.startswith("{"):
            return param
        try:
            parsed = ast.literal_eval(value)
        except (ValueError, TypeError, SyntaxError):
            # not a well-formed literal (e.g. cut in half by the "-" split):
            # show the text as it is instead of aborting the whole report
            return param
        if not isinstance(parsed, dict) or not parsed:
            return param
        lines = [name + "="]
        lines.extend("'" + str(key) + "'" + ": " + str(val) for key, val in parsed.items())
        return "   \n".join(lines)

    @classmethod
    def _describe_parameters(cls, test_name):
        '''Split "name[p1-p2]" into the bare test name and a markdown parameter list.'''
        base_name, bracket, raw_params = test_name.partition("[")
        items = []
        if bracket:
            if raw_params.endswith("]"):
                raw_params = raw_params[:-1]
            # a dash followed by a digit is kept: it is a negative number,
            # not a separator between parameters
            pieces = re.split(r'-(?!\d)', raw_params.strip())
            items = [cls._format_param(piece) for piece in pieces]
        return base_name, "\n  - ".join(["**Parameters:**"] + items)

    @staticmethod
    def _failing_statement(failure):
        '''Return the last line of the failure text starting with ">", without the marker.'''
        for line in reversed((failure.text or "").splitlines()):
            if line.startswith(">"):
                return line[1:].lstrip()
        return ""

    @staticmethod
    def _exception_summary(prop_value, failing_statement):
        '''Collapse a possibly multi-line exception message to one line and append the failing statement.'''
        text = prop_value or ""
        lines = text.splitlines()
        if len(lines) > 1:
            # drop empty lines and leading spaces, then join into one line
            stripped = [line.lstrip() for line in lines if line]
            if stripped:
                text = " ".join(stripped)
        return "\n" + "  " + text + " ( " + failing_statement + " )"

    def get_payload_raw(self):
        '''Return one markdown description per failed test case of the report.

        Any failure is printed and results in an empty (or partial) list; the
        downloaded file is removed afterwards when it exists.
        '''
        payload = []
        # Bound before the try block: if the download itself fails, the
        # cleanup below must not trip over an unassigned name.
        file_path = None
        try:
            file_path = self.grabber.download_file(self.url, self.file_name)
            # Load and parse the XML using element tree
            root = ET.parse(file_path).getroot()

            # Iterate over all test cases
            for testcase in root.findall(".//testcase"):
                properties = testcase.find("properties")
                failure = testcase.find("failure")
                if properties is None or failure is None:
                    continue

                full_name = testcase.get("name") or ""
                test_name, param_display = self._describe_parameters(full_name)
                failing_statement = self._failing_statement(failure)

                test_name_function = ""
                test_function_module = ""
                exctype_message = ""
                # Iterate through each property in the properties tag
                for prop in properties.findall("property"):
                    prop_name = prop.get("name")
                    prop_value = prop.get("value")
                    if prop_name == "exception_type_and_message":
                        exctype_message = self._exception_summary(prop_value, failing_statement)
                    elif prop_name == "test_name_function":
                        test_name_function = prop_value
                    elif prop_name == "test_function_module":
                        test_function_module = prop_value

                # Default link keeps the full parametrized name; a known test
                # module links the bare name to the page of that module.
                test_name_link = f"[{full_name}]({self._DEFAULT_TEST_LINK})"
                doc_page = self._TEST_LINKS.get(test_function_module)
                if doc_page is not None:
                    if test_name_function:
                        test_permalink = doc_page + "#" + test_function_module + "." + test_name_function
                    else:
                        test_permalink = doc_page
                    test_name_link = f"[{test_name}]({test_permalink})"

                # Compile the test details
                test_details_final = "<br><br>".join([test_name_link, param_display])
                payload.append(test_details_final + "\n\n" + exctype_message)

        except Exception:
            traceback.print_exc()
            print("Error Parsing File!")
        finally:
            if file_path and os.path.exists(file_path):
                os.remove(file_path)
        return payload

    def get_payload_parsed(self):
        '''returns the raw payload unchanged, paired with the index of each entry'''
        payload = self.payload_raw
        payload_param = list(range(len(payload)))
        return (payload, payload_param)
1✔
387

NEW
388
class PytestFailure(pytestxml_parser):
    '''pytest report parser whose artifact type resolves to "pytest_failure"; parsing is inherited unchanged.'''

    def __init__(self, url, grabber):
        # nothing extra to set up, defer entirely to pytestxml_parser
        super().__init__(url, grabber)
1✔
NEW
391
class PytestSkipped(pytestxml_parser):
    '''pytest report parser whose artifact type resolves to "pytest_skipped"; parsing is inherited unchanged.'''

    def __init__(self, url, grabber):
        # nothing extra to set up, defer entirely to pytestxml_parser
        super().__init__(url, grabber)
×
NEW
394
class PytestError(pytestxml_parser):
    '''pytest report parser whose artifact type resolves to "pytest_error"; parsing is inherited unchanged.'''

    def __init__(self, url, grabber):
        # nothing extra to set up, defer entirely to pytestxml_parser
        super().__init__(url, grabber)
×
397

398
class MatlabFailure(xmlParser):
    '''XML report parser whose artifact type resolves to "matlab_failure"; parsing is inherited from xmlParser.'''

    def __init__(self, url, grabber):
        # nothing extra to set up, defer entirely to xmlParser
        super().__init__(url, grabber)
×
401
class MatlabSkipped(xmlParser):
    '''XML report parser whose artifact type resolves to "matlab_skipped"; parsing is inherited from xmlParser.'''

    def __init__(self, url, grabber):
        # nothing extra to set up, defer entirely to xmlParser
        super().__init__(url, grabber)
×
404
class MatlabError(xmlParser):
    '''XML report parser whose artifact type resolves to "matlab_error"; parsing is inherited from xmlParser.'''

    def __init__(self, url, grabber):
        # nothing extra to set up, defer entirely to xmlParser
        super().__init__(url, grabber)
×
407

408
class InfoTxt(Parser):
    '''Parser for "info.txt" artifacts made of "KEY: value" lines and "- item" list lines.'''

    def __init__(self, url, grabber):
        # The patterns must exist before the base constructor runs, because
        # it calls initialize(), which in turn reads them.
        # Two-group patterns capture (key, value); the single-group pattern
        # captures a list item.
        self.regex_patterns = [
            r"(BRANCH):\s(.+)$",
            r"(PR_ID):\s(.+)$",
            r"(TIMESTAMP):\s(.+)$",
            r"(DIRECTION):\s(.+)$",
            r"(Triggered\sby):\s(.+)$",
            r"(COMMIT\sSHA):\s(.+)$",
            r"(COMMIT_DATE):\s(.+)$",
            r"-\s([^:\s]+)$",
        ]
        super(InfoTxt, self).__init__(url, grabber)

    def get_file_info(self):
        '''returns file name, file info, target_board, artifact_info_type

        Only the file name comes from the url; the other fields are fixed.
        Raises Exception for urls that are not multilevel.
        '''
        if self.multilevel:
            url = urlparse(self.url)
            file_name = url.path.split('/')[-1]
            file_info = "NA"
            target_board = "NA"
            artifact_info_type = "info_txt"
            return (file_name, file_info, target_board, artifact_info_type)

        raise Exception("Does not support non multilevel yet!")

    def get_payload_raw(self):
        '''Return the stripped lines of the file that match at least one known pattern.

        Any failure is printed and results in an empty (or partial) list; the
        downloaded file is removed afterwards when it exists.
        '''
        payload = []
        # Bound before the try block: if the download itself fails, the
        # cleanup below must not trip over an unassigned name.
        file_path = None
        try:
            file_path = self.grabber.download_file(self.url, self.file_name)
            with open(file_path, "r") as f:
                for l in f.readlines():
                    line = l.strip()
                    if any(re.search(p, line) for p in self.regex_patterns):
                        payload.append(line)
        except Exception:
            traceback.print_exc()
            print("Error Parsing File!")
        finally:
            if file_path and os.path.exists(file_path):
                os.remove(file_path)

        return payload

    def get_payload_parsed(self):
        '''Turn the raw lines into parallel lists of names and values.

        A "KEY: value" match yields (KEY, value); a list-item match yields
        ("Built projects", item). A line matching several patterns adds one
        entry per match.
        '''
        payload = list()
        payload_param = list()
        for l in self.payload_raw:
            for p in self.regex_patterns:
                x = re.search(p, l)
                if not x:
                    continue
                groups = x.groups()
                if len(groups) == 1:
                    payload.append("Built projects")
                    payload_param.append(groups[0])
                elif len(groups) == 2:
                    payload.append(groups[0])
                    payload_param.append(groups[1])
        return (payload, payload_param)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc