• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

feihoo87 / waveforms / 15965960927

30 Jun 2025 06:51AM UTC coverage: 53.393% (+7.2%) from 46.214%
15965960927

push

github

feihoo87
Update GitHub Actions workflow to support Python 3.13 and enhance testing coverage

- Added Python 3.13 to the matrix for multi-platform builds.
- Reorganized steps for setting up Python and installing dependencies.
- Improved testing process by integrating coverage reporting with Coveralls.
- Streamlined package building and publishing steps, ensuring compatibility with the latest Python version.

1314 of 2461 relevant lines covered (53.39%)

4.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.53
/waveforms/waveform_parser.py
1
import os
8✔
2
import subprocess
8✔
3
from ast import literal_eval
8✔
4
from functools import lru_cache
8✔
5
from pathlib import Path
8✔
6

7
from antlr4 import *
8✔
8
from antlr4.error.ErrorListener import ErrorListener
8✔
9

10
from . import multy_drag, waveform
8✔
11

12

13
class WaveformParseError(Exception):
    """Error raised when a waveform expression cannot be parsed.

    It carries no state beyond the message passed to ``Exception``.
    """
8✔
16

17

18
class WaveformErrorListener(ErrorListener):
    """ANTLR error listener that converts reported syntax errors to exceptions."""

    def syntaxError(self, recognizer, offendingSymbol, line, column, msg, e):
        """Abort on the first reported syntax error.

        Only ``line``, ``column`` and ``msg`` are used; the remaining
        arguments are accepted to match the listener callback signature.

        Raises:
            WaveformParseError: Always, with the error position and message.
        """
        message = f"Syntax error at line {line}, column {column}: {msg}"
        raise WaveformParseError(message)
24

25

26
class WaveformVisitor:
    """Evaluate a waveform expression parse tree.

    Dispatch is by naming convention: a node whose class is called
    ``FooContext`` is handled by the method ``visitFooContext`` when one
    exists, and by :meth:`generic_visit` otherwise.
    """

    def __init__(self):
        # NOTE(review): this list is not consulted by get_function(); it
        # looks like an intended whitelist of callable names - confirm
        # whether it should be enforced.
        self.functions = [
            'D', 'chirp', 'const', 'cos', 'cosh', 'coshPulse', 'cosPulse',
            'cut', 'drag', 'drag_sin', 'drag_sinx', 'exp', 'gaussian',
            'general_cosine', 'hanning', 'interp', 'mixing', 'one', 'poly',
            'samplingPoints', 'sign', 'sin', 'sinc', 'sinh', 'square', 'step',
            't', 'zero'
        ]
        # Values substituted for CONSTANT tokens.
        self.constants = {
            'pi': waveform.pi,
            'e': waveform.e,
            'inf': waveform.inf
        }

    def get_function(self, name):
        """Look up a callable named ``name`` in ``waveform`` then ``multy_drag``.

        Names starting with an underscore and attributes that are not
        callable (constants, imported modules' data, ...) are treated as
        unknown, so an expression cannot reach private helpers or try to
        call a non-function.

        Raises:
            WaveformParseError: If no suitable callable is found.
        """
        if not name.startswith('_'):
            for mod in (waveform, multy_drag):
                candidate = getattr(mod, name, None)
                if callable(candidate):
                    return candidate
        raise WaveformParseError(f"Unknown function '{name}'")

    def visit(self, ctx):
        """Dispatch ``ctx`` to ``visit<ClassName>`` or to :meth:`generic_visit`."""
        handler = getattr(self, f'visit{type(ctx).__name__}',
                          self.generic_visit)
        return handler(ctx)

    def generic_visit(self, ctx):
        """Fallback for nodes without a dedicated handler.

        A node with exactly one child yields that child's value, a node with
        several children yields the list of their values, and a node without
        children yields its text.
        """
        children = getattr(ctx, 'children', None)
        if not children:
            return ctx.getText()
        if len(children) == 1:
            return self.visit(children[0])
        return [self.visit(child) for child in children]

    def visitExprContext(self, ctx):
        """Evaluate the top-level rule: an assignment or a plain expression."""
        assignment = ctx.assignment()
        return self.visit(assignment if assignment else ctx.expression())

    def visitAssignmentContext(self, ctx):
        """Reject assignments, which waveform expressions do not support.

        Raises:
            WaveformParseError: Always.
        """
        raise WaveformParseError("Assignment expressions are not supported")

    def _binary_operands(self, ctx):
        """Return ``(left, operator_text, right)`` for a binary expression node.

        The operator is read from the node's second child (index 1).
        """
        left = self.visit(ctx.expression(0))
        right = self.visit(ctx.expression(1))
        return left, ctx.getChild(1).getText(), right

    def visitPowerExpressionContext(self, ctx):
        """Evaluate ``left ** right``."""
        left, _, right = self._binary_operands(ctx)
        return left**right

    def visitMultiplyDivideExpressionContext(self, ctx):
        """Evaluate ``left * right``, or ``left / right`` for any other operator."""
        left, op, right = self._binary_operands(ctx)
        return left * right if op == '*' else left / right

    def visitAddSubtractExpressionContext(self, ctx):
        """Evaluate ``left + right``, or ``left - right`` for any other operator."""
        left, op, right = self._binary_operands(ctx)
        return left + right if op == '+' else left - right

    def visitShiftExpressionContext(self, ctx):
        """Evaluate ``left << right``, or ``left >> right`` for any other operator."""
        left, op, right = self._binary_operands(ctx)
        return left << right if op == '<<' else left >> right

    def visitParenthesesExpressionContext(self, ctx):
        """Evaluate the expression enclosed by the parentheses node."""
        return self.visit(ctx.expression())

    def visitUnaryMinusExpressionContext(self, ctx):
        """Evaluate the operand and negate it."""
        return -self.visit(ctx.expression())

    def visitFunctionCallExpressionContext(self, ctx):
        """Evaluate the wrapped function-call node."""
        return self.visit(ctx.functionCall())

    def visitConstantExpressionContext(self, ctx):
        """Return the value registered in ``self.constants`` for the token.

        Raises:
            WaveformParseError: If the constant name is not registered.
        """
        const_name = ctx.CONSTANT().getText()
        try:
            return self.constants[const_name]
        except KeyError:
            # Report through the parser's own error type instead of leaking
            # a bare KeyError; the KeyError adds no information.
            raise WaveformParseError(
                f"Unknown constant '{const_name}'") from None

    def visitNumberExpressionContext(self, ctx):
        """Convert a NUMBER token to a Python value with ``literal_eval``."""
        return literal_eval(ctx.NUMBER().getText())

    def visitStringExpressionContext(self, ctx):
        """Convert a STRING token to a Python value with ``literal_eval``."""
        return literal_eval(ctx.STRING().getText())

    def visitListExpressionContext(self, ctx):
        """Evaluate the wrapped list node."""
        return self.visit(ctx.list_())

    def visitTupleExpressionContext(self, ctx):
        """Evaluate the wrapped tuple node."""
        return self.visit(ctx.tuple_())

    def visitIdentifierExpressionContext(self, ctx):
        """Reject bare identifiers; variables are not supported.

        Raises:
            WaveformParseError: Always, naming the identifier.
        """
        var_name = ctx.ID().getText()
        raise WaveformParseError(f"Unknown identifier '{var_name}'")

    def visitNoArgFunctionContext(self, ctx):
        """Call the named function with no arguments."""
        return self.get_function(ctx.ID().getText())()

    def visitArgsFunctionContext(self, ctx):
        """Call the named function with positional arguments only."""
        func = self.get_function(ctx.ID().getText())
        return func(*self.visit(ctx.args()))

    def visitKwargsFunctionContext(self, ctx):
        """Call the named function with keyword arguments only."""
        func = self.get_function(ctx.ID().getText())
        return func(**self.visit(ctx.kwargs()))

    def visitArgsKwargsFunctionContext(self, ctx):
        """Call the named function with positional and keyword arguments."""
        func = self.get_function(ctx.ID().getText())
        # Positional arguments are evaluated before keyword arguments.
        args = self.visit(ctx.args())
        kwargs = self.visit(ctx.kwargs())
        return func(*args, **kwargs)

    def visitArgsContext(self, ctx):
        """Return the evaluated positional arguments as a list."""
        return [self.visit(expr) for expr in ctx.expression()]

    def visitKwargsContext(self, ctx):
        """Return the evaluated keyword arguments as a dict.

        When a key is repeated the last value wins.
        """
        return dict(self.visit(kwarg) for kwarg in ctx.kwarg())

    def visitKwargContext(self, ctx):
        """Return one keyword argument as a ``(name, value)`` pair."""
        return ctx.ID().getText(), self.visit(ctx.expression())

    def visitListContext(self, ctx):
        """Return the evaluated elements as a list (empty for ``[]``)."""
        return [self.visit(expr) for expr in ctx.expression()]

    def visitTupleContext(self, ctx):
        """Return the evaluated elements as a tuple."""
        return tuple(self.visit(expr) for expr in ctx.expression())
8✔
212

213

214
def _generate_antlr_parser():
    """Regenerate the ANTLR lexer/parser modules when they are missing or stale.

    Looks in this module's directory for ``Waveform.g4`` and for the
    generated ``WaveformLexer.py`` / ``WaveformParser.py``. Generation is
    skipped when both generated files exist and are newer than the grammar,
    or when both exist and no grammar file is present to regenerate from.
    Otherwise the ``antlr4`` command is tried first, then
    ``java -jar $ANTLR_JAR`` (default ``antlr-4.11.1-complete.jar``).

    Raises:
        WaveformParseError: If the grammar file is missing while generated
            files are also missing, or if both generator commands fail.
    """
    current_dir = Path(__file__).parent
    grammar_file = current_dir / "Waveform.g4"
    generated_files = (current_dir / "WaveformLexer.py",
                       current_dir / "WaveformParser.py")
    have_generated = all(path.exists() for path in generated_files)

    if not grammar_file.exists():
        if have_generated:
            # Nothing to compare against or regenerate from; keep what is
            # there rather than failing on grammar_file.stat().
            return
        raise WaveformParseError(
            f"Failed to generate ANTLR parser: grammar file '{grammar_file}' not found."
        )

    if have_generated:
        grammar_mtime = grammar_file.stat().st_mtime
        if all(path.stat().st_mtime > grammar_mtime
               for path in generated_files):
            return

    tool_args = [
        "-Dlanguage=Python3", "-o",
        str(current_dir),
        str(grammar_file)
    ]
    antlr_jar = os.environ.get('ANTLR_JAR', 'antlr-4.11.1-complete.jar')
    commands = (
        ["antlr4", *tool_args],
        # Fallback when the antlr4 launcher is unavailable or fails.
        ["java", "-jar", antlr_jar, *tool_args],
    )

    last_error = None
    for command in commands:
        try:
            subprocess.run(command,
                           capture_output=True,
                           text=True,
                           check=True)
        except (subprocess.CalledProcessError, FileNotFoundError) as err:
            last_error = err
        else:
            return

    raise WaveformParseError(
        "Failed to generate ANTLR parser. Please install ANTLR4 or set ANTLR_JAR environment variable."
    ) from last_error
254

255

256
def parse_waveform_expression(expr: str) -> waveform.Waveform:
    """Parse and evaluate a waveform expression with the generated ANTLR4 parser.

    The expression is tokenised, parsed starting at the ``expr`` rule, and
    evaluated by ``WaveformVisitor``. A plain ``int``/``float``/``complex``
    result is wrapped with ``waveform.const``; the value returned is the
    result's ``simplify()``.

    Args:
        expr: The expression text.

    Returns:
        The simplified result of evaluating ``expr``.

    Raises:
        WaveformParseError: For lexer or parser syntax errors, and for any
            other exception raised while importing the generated modules or
            evaluating the tree (chained as ``__cause__``).
    """
    try:
        # Imported lazily so a missing generated module is reported as a
        # WaveformParseError by the handler below.
        from .WaveformLexer import WaveformLexer
        from .WaveformParser import WaveformParser

        error_listener = WaveformErrorListener()

        # The lexer needs the raising listener as well: by default ANTLR
        # only prints token-recognition errors to the console and carries on,
        # so invalid characters would otherwise be skipped silently.
        lexer = WaveformLexer(InputStream(expr))
        lexer.removeErrorListeners()
        lexer.addErrorListener(error_listener)

        parser = WaveformParser(CommonTokenStream(lexer))
        parser.removeErrorListeners()
        parser.addErrorListener(error_listener)

        tree = parser.expr()
        result = WaveformVisitor().visit(tree)

        # Bare numbers become constant waveforms so the caller always gets
        # an object with simplify().
        if isinstance(result, (int, float, complex)):
            result = waveform.const(result)

        return result.simplify()

    except WaveformParseError:
        raise
    except Exception as e:
        raise WaveformParseError(
            f"Failed to parse expression '{expr}': {str(e)}") from e
292

293

294
@lru_cache(maxsize=1024)
def wave_eval(expr: str) -> "waveform.Waveform":
    """
    Parse and evaluate a waveform expression using ANTLR 4.

    Successful results are memoised per expression string (up to 1024
    entries), so repeated calls with the same ``expr`` return the same
    object. Failures are not cached.

    Args:
        expr: The expression string to parse

    Returns:
        A Waveform object representing the parsed expression

    Raises:
        SyntaxError: If parsing or evaluating the expression raises any
            exception; the original exception is attached as ``__cause__``
    """
    try:
        return parse_waveform_expression(expr)
    except Exception as e:
        # WaveformParseError is an Exception subclass, so one handler covers
        # both parse errors and unexpected failures.
        raise SyntaxError(
            f"Failed to parse expression '{expr}': {str(e)}") from e
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc