• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

feihoo87 / waveforms / 24607219516

18 Apr 2026 02:54PM UTC coverage: 52.648% (-0.04%) from 52.692%
24607219516

push

github

feihoo87
Update GitHub Actions workflow to download ANTLR4 jar and modify parser generation command

- Changed the installation step for ANTLR4 to download the jar file directly.
- Updated the command for generating the ANTLR parser to use the downloaded jar, ensuring compatibility with the workflow.
- Ensured pip and setuptools are upgraded before installing dependencies.

1332 of 2530 relevant lines covered (52.65%)

6.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.85
/waveforms/waveform.py
1
from __future__ import annotations
12✔
2

3
from fractions import Fraction
12✔
4
from typing import Generator, Iterable, cast
12✔
5

6
import numpy as np
12✔
7
from numpy import e, inf, pi
12✔
8
from numpy.typing import NDArray
12✔
9
from scipy.signal import sosfilt
12✔
10

11
from ._waveform import (_D, COS, COSH, D_GAUSSIAN, DRAG, ERF, EXP,
12✔
12
                        EXPONENTIALCHIRP, GAUSSIAN, HYPERBOLICCHIRP, INTERP,
13
                        LINEAR, LINEARCHIRP, MOLLIFIER, NDIGITS, SINC, SINH,
14
                        _baseFunc, _baseFunc_latex, _const, _half, _one, _zero,
15
                        add, basic_wave, calc_parts, filter, is_const,
16
                        merge_waveform, mul, pow, registerBaseFunc,
17
                        registerBaseFuncLatex, registerDerivative, shift,
18
                        simplify, wave_sum)
19

20

21
def _test_spec_num(num, spec):
12✔
22
    x = Fraction(num / spec).limit_denominator(1000000000)
×
23
    if x.denominator <= 24:
×
24
        return True, x, 1
×
25
    x = Fraction(spec * num).limit_denominator(1000000000)
×
26
    if x.denominator <= 24:
×
27
        return True, x, -1
×
28
    return False, x, 0
×
29

30

31
def _exp_num(s):
12✔
32
    if "e" in s:
×
33
        a, n = s.split("e")
×
34
        n = float(n)
×
35
        s = f"{a} \\times 10^{{{n:g}}}"
×
36
    return s
×
37

38

39
def _spec_num_latex(num):
    """Render a real number as LaTeX, preferring closed forms.

    ``num`` is tested against a table of special constants (1, square roots,
    logarithms, e, pi, ...). The first constant for which ``num`` is a simple
    rational multiple (or a simple rational divided by the constant) wins.
    If nothing matches, the ``%g`` representation of ``num`` is used.
    """
    candidates = [
        (1, ''),
        (np.sqrt(2), '\\sqrt{2}'),
        (np.sqrt(3), '\\sqrt{3}'),
        (np.sqrt(5), '\\sqrt{5}'),
        (np.log(2), '\\log{2}'),
        (np.log(3), '\\log{3}'),
        (np.log(5), '\\log{5}'),
        (np.e, 'e'),
        (np.pi, '\\pi'),
        (np.pi**2, '\\pi^2'),
        (np.sqrt(np.pi), '\\sqrt{\\pi}'),
    ]
    for spec, spec_latex in candidates:
        matched, frac, sign = _test_spec_num(num, spec)
        if not matched:
            continue
        if sign < 0:
            # num is a rational divided by the constant.
            spec_latex = f"\\frac{{{1}}}{{{spec_latex}}}"
        p, q = frac.numerator, frac.denominator
        if q != 1:
            if p < 0:
                return f"-\\frac{{{-p}}}{{{q}}}{spec_latex}"
            return f"\\frac{{{p}}}{{{q}}}{spec_latex}"
        if p == 1:
            return f"{spec_latex}"
        coefficient = _exp_num(f"{p:g}")
        return f"{coefficient}{spec_latex}"
    return _exp_num(f"{num:g}")
63

64

65
def _num_latex(num):
    """Render a real or complex number as a LaTeX string.

    Infinities map to ``\\infty``; numbers with a non-zero imaginary part are
    rendered as a parenthesised ``a+bj`` / ``a-bj`` pair; everything else is
    delegated to ``_spec_num_latex``.
    """
    if num == -np.inf:
        return r"-\infty"
    if num == np.inf:
        return r"\infty"

    imag = num.imag
    if imag > 0:
        return f"\\left({_num_latex(num.real)}+{_num_latex(imag)}j\\right)"
    if imag < 0:
        return f"\\left({_num_latex(num.real)}-{_num_latex(-imag)}j\\right)"

    real = num.real
    text = _spec_num_latex(real)
    # _spec_num_latex renders exactly 1 as the empty string (unit coefficient
    # of the constant "1"); spell it out here.
    if text == '' and round(real) == 1:
        return '1'
    return text
78

79

80
def _fun_latex(fun):
    """Render one base-function tuple ``(funID, *args, shift)`` as LaTeX.

    Uses the formatter registered in ``_baseFunc_latex`` for ``funID``; when
    the registered entry is ``None`` a generic ``Func<ID>(t+shift, ...)``
    placeholder is produced.
    """
    funID, *args, shift = fun
    formatter = _baseFunc_latex[funID]
    if formatter is not None:
        return formatter(shift, *args)

    shift = _num_latex(shift)
    if shift == "0":
        shift = ""
    elif shift[0] != '-':
        shift = "+" + shift
    return r"\mathrm{Func}" + f"{funID}(t{shift}, ...)"
90

91

92
def _wav_latex(wav):
    """Render one segment expression (a sum of products) as LaTeX.

    ``wav`` is a pair ``(terms, amplitudes)``; each term is itself a pair
    ``(functions, exponents)``. A term equal to ``((), ())`` is a bare
    constant.
    """
    if wav == _zero:
        return "0"
    if is_const(wav):
        return f"{wav[1][0]}"

    terms = []
    for factors, amp in zip(*wav):
        if factors == ((), ()):
            terms.append(_num_latex(amp))
            continue
        amp_text = _num_latex(amp)
        # A unit amplitude is left implicit.
        pieces = [] if amp_text == "1" else [amp_text]
        for fun, n in zip(*factors):
            fun_expr = _fun_latex(fun)
            if n != 1:
                fun_expr = fun_expr + "^{" + f"{n}" + "}"
            pieces.append(fun_expr)
        terms.append(''.join(pieces))

    ret = terms[0]
    for term in terms[1:]:
        # Terms that already carry a minus sign need no "+" separator.
        ret += term if term[0] == '-' else "+" + term
    return ret
123

124

125
class Waveform:
    """Piecewise symbolic waveform.

    ``bounds[i]`` is the upper edge of segment ``i`` (the first segment
    starts at ``-inf``) and ``seq[i]`` is the expression used on that
    segment. ``start``, ``stop`` and ``sample_rate`` configure
    :meth:`sample`; ``filters`` is an optional ``(sos, initial)`` pair applied
    to the sampled data; ``min``/``max`` are forwarded to ``calc_parts``
    (presumably as clipping limits -- confirm against ``_waveform``).
    """

    __slots__ = ('bounds', 'seq', 'max', 'min', 'start', 'stop', 'sample_rate',
                 'filters', 'label')

    def __init__(self, bounds=(+inf, ), seq=(_zero, ), min=-inf, max=inf):
        self.bounds = bounds
        self.seq = seq
        self.max = max
        self.min = min
        self.start = None
        self.stop = None
        self.sample_rate = None
        self.filters: tuple[np.ndarray, float] | None = None
        self.label = None

    @staticmethod
    def _begin(bounds, seq):
        """Return the lower edge of the first non-zero segment (``inf`` if none)."""
        for i, s in enumerate(seq):
            if s != _zero:
                if i == 0:
                    return -inf
                return bounds[i - 1]
        return inf

    @staticmethod
    def _end(bounds, seq):
        """Return the upper edge of the last non-zero segment (``-inf`` if none)."""
        N = len(bounds)
        for i, s in enumerate(seq[::-1]):
            if s != _zero:
                if i == 0:
                    return inf
                return bounds[N - i - 1]
        return -inf

    @property
    def begin(self):
        """Start of the non-zero region, clipped to ``start`` when it is set."""
        if self.start is None:
            return self._begin(self.bounds, self.seq)
        else:
            return max(self.start, self._begin(self.bounds, self.seq))

    @property
    def end(self):
        """End of the non-zero region, clipped to ``stop`` when it is set."""
        if self.stop is None:
            return self._end(self.bounds, self.seq)
        else:
            return min(self.stop, self._end(self.bounds, self.seq))

    @staticmethod
    def _writable_sos(sos):
        """Return ``sos`` as a writeable ``ndarray`` (copying only if needed).

        Shared by :meth:`sample` and :meth:`_sample_iter`, which both accept
        array-likes and read-only arrays for the filter coefficients.
        """
        if not isinstance(sos, np.ndarray):
            return np.array(sos)
        if not sos.flags.writeable:
            return sos.copy()
        return sos

    def sample(
        self,
        sample_rate=None,
        out: np.ndarray | None = None,
        chunk_size=None,
        function_lib=None,
        filters: tuple[np.ndarray, float] | None = None
    ) -> np.ndarray | Iterable[np.ndarray]:
        """Sample the waveform on ``[start, stop)``.

        Args:
            sample_rate: Overrides ``self.sample_rate`` when given.
            out: Optional pre-allocated output buffer.
            chunk_size: When given, a generator yielding chunks of at most
                this many samples is returned instead of one array.
            function_lib: Function table forwarded to ``__call__``.
            filters: ``(sos, initial)`` overriding ``self.filters``.

        Raises:
            ValueError: If ``start``, ``stop`` or the sample rate is unset.
        """
        if sample_rate is None:
            sample_rate = self.sample_rate
        if self.start is None or self.stop is None or sample_rate is None:
            raise ValueError(
                f'Waveform is not initialized. {self.start=}, {self.stop=}, {sample_rate=}'
            )
        if filters is None:
            filters = self.filters
        if chunk_size is not None:
            return self._sample_iter(sample_rate, chunk_size, out,
                                     function_lib, filters)

        x = np.arange(self.start, self.stop, 1 / sample_rate)
        sig = cast(np.ndarray,
                   self.__call__(x, out=out, function_lib=function_lib))
        if filters is not None:
            sos, initial = filters
            sos = self._writable_sos(sos)
            if initial:
                # Filter around the initial level so the filter starts settled.
                sig = cast(np.ndarray, sosfilt(sos,
                                               sig - initial)) + initial
            else:
                sig = cast(np.ndarray, sosfilt(sos, sig))
        return cast(np.ndarray, sig)

    def _sample_iter(
        self, sample_rate, chunk_size, out: np.ndarray | None, function_lib,
        filters: tuple[np.ndarray, float] | None
    ) -> Generator[np.ndarray, None, None]:
        """Yield the sampled waveform chunk by chunk.

        The filter state ``zi`` is carried across chunks so that chunked and
        one-shot filtering agree. When ``out`` is given, every chunk is also
        written to its own ``size``-long window of ``out``.
        """
        start = cast(float, self.start)
        start_n = 0
        if filters is not None:
            sos, initial = filters
            sos = self._writable_sos(sos)
            # zi = sosfilt_zi(sos)
            zi = np.zeros((sos.shape[0], 2))
        length = chunk_size / sample_rate
        while start < cast(float, self.stop):
            if start + length > cast(float, self.stop):
                # Last, possibly shorter, chunk.
                length = cast(float, self.stop) - start
                stop = cast(float, self.stop)
                size = round((stop - start) * sample_rate)
            else:
                stop = start + length
                size = chunk_size
            x = np.linspace(start, stop, size, endpoint=False)

            if filters is None:
                if out is not None:
                    # Hand over exactly this chunk's window. Passing the whole
                    # tail ``out[start_n:]`` made every yielded array span the
                    # remainder of the buffer instead of ``size`` samples.
                    yield cast(
                        np.ndarray,
                        self.__call__(x,
                                      out=out[start_n:start_n + size],
                                      function_lib=function_lib))
                else:
                    yield cast(np.ndarray,
                               self.__call__(x, function_lib=function_lib))
            else:
                sig = cast(np.ndarray,
                           self.__call__(x, function_lib=function_lib))
                if initial:
                    sig -= initial
                sig, zi = sosfilt(sos, sig, zi=zi)
                if initial:
                    sig += initial
                if out is not None:
                    out[start_n:start_n + size] = sig
                yield cast(np.ndarray, sig)

            start = stop
            start_n += chunk_size

    @staticmethod
    def _tolist(bounds, seq, ret=None):
        """Append the flat, length-prefixed encoding of ``(bounds, seq)`` to ``ret``.

        Layout: ``nseg, [bound, nsum, [amp, nmul, [n, nfun, *fun]...]...]...``.
        """
        if ret is None:
            ret = []
        ret.append(len(bounds))
        for expr, b in zip(seq, bounds):
            ret.append(b)
            tlist, amplist = expr
            ret.append(len(amplist))
            for t, amp in zip(tlist, amplist):
                ret.append(amp)
                mtlist, nlist = t
                ret.append(len(nlist))
                for fun, n in zip(mtlist, nlist):
                    ret.append(n)
                    ret.append(len(fun))
                    ret.extend(fun)
        return ret

    @staticmethod
    def _fromlist(l, pos=0):
        """Decode one ``(bounds, seq)`` pair written by :meth:`_tolist`.

        Returns:
            ``(bounds, seq, pos)`` with ``pos`` pointing past the consumed data.

        Raises:
            ValueError: If the list is truncated or otherwise malformed.
        """

        def _read(l, pos, size):
            try:
                chunk = tuple(l[pos:pos + size])
            except (TypeError, ValueError, IndexError) as err:
                raise ValueError('Invalid waveform format') from err
            # Slicing never raises on short input, so check the length
            # explicitly; otherwise truncated data would decode silently.
            if len(chunk) != size:
                raise ValueError('Invalid waveform format')
            return chunk, pos + size

        (nseg, ), pos = _read(l, pos, 1)
        bounds = []
        seq = []
        for _ in range(nseg):
            (b, nsum), pos = _read(l, pos, 2)
            bounds.append(b)
            amp = []
            t = []
            for _ in range(nsum):
                (a, nmul), pos = _read(l, pos, 2)
                amp.append(a)
                nlst = []
                mt = []
                for _ in range(nmul):
                    (n, nfun), pos = _read(l, pos, 2)
                    nlst.append(n)
                    fun, pos = _read(l, pos, nfun)
                    mt.append(fun)
                t.append((tuple(mt), tuple(nlst)))
            seq.append((tuple(t), tuple(amp)))

        return tuple(bounds), tuple(seq), pos

    def tolist(self):
        """Serialise the waveform to a flat list (inverse of :meth:`fromlist`)."""
        l = [self.max, self.min, self.start, self.stop, self.sample_rate]
        if self.filters is None:
            l.append(None)
        else:
            sos, initial = self.filters
            # ``sample`` accepts array-likes for ``sos``; accept them here too.
            sos = list(np.asarray(sos).reshape(-1))
            l.append(len(sos))
            l.extend(sos)
            l.append(initial)

        return self._tolist(self.bounds, self.seq, l)

    @classmethod
    def fromlist(cls, l):
        """Rebuild a waveform from the list produced by :meth:`tolist`."""
        w = cls()
        pos = 6
        (w.max, w.min, w.start, w.stop, w.sample_rate, sos_size) = l[:pos]
        if sos_size is not None:
            # SOS coefficients are stored flattened, six per section.
            sos = np.array(l[pos:pos + sos_size]).reshape(-1, 6)
            pos += sos_size
            initial = l[pos]
            pos += 1
            w.filters = sos, initial

        w.bounds, w.seq, pos = cls._fromlist(l, pos)
        return w

    def totree(self):
        """Serialise the waveform to a nested ``(header, body)`` tuple tree."""
        # ``self.filters`` is already ``None`` when unset, so one header
        # expression covers both cases.
        header = (self.max, self.min, self.start, self.stop,
                  self.sample_rate, self.filters)
        body = []

        for expr, b in zip(self.seq, self.bounds):
            tlist, amplist = expr
            new_seq = []
            for t, amp in zip(tlist, amplist):
                mtlist, nlist = t
                new_t = [(n, fun) for fun, n in zip(mtlist, nlist)]
                new_seq.append((amp, tuple(new_t)))
            body.append((b, tuple(new_seq)))
        return header, tuple(body)

    @staticmethod
    def fromtree(tree):
        """Rebuild a waveform from the tree produced by :meth:`totree`."""
        w = Waveform()
        header, body = tree

        (w.max, w.min, w.start, w.stop, w.sample_rate, w.filters) = header
        bounds = []
        seqs = []
        for b, seq in body:
            bounds.append(b)
            amp_list = []
            t_list = []
            for amp, t in seq:
                amp_list.append(amp)
                n_list = []
                mt_list = []
                for n, mt in t:
                    n_list.append(n)
                    mt_list.append(mt)
                t_list.append((tuple(mt_list), tuple(n_list)))
            seqs.append((tuple(t_list), tuple(amp_list)))
        w.bounds = tuple(bounds)
        w.seq = tuple(seqs)
        return w

    def simplify(self, eps=1e-15):
        """Simplify every segment and merge adjacent identical segments.

        The result is a fresh ``Waveform`` holding only ``bounds`` and
        ``seq``; ``min``/``max``/``start``/``stop``/``sample_rate``/
        ``filters``/``label`` are NOT carried over.
        """
        seq = [simplify(self.seq[0], eps)]
        bounds = [self.bounds[0]]
        for expr, b in zip(self.seq[1:], self.bounds[1:]):
            expr = simplify(expr, eps)
            if expr == seq[-1]:
                # Same expression as the previous segment: extend it.
                seq.pop()
                bounds.pop()
            seq.append(expr)
            bounds.append(b)
        return Waveform(tuple(bounds), tuple(seq))

    def filter(self, low=0, high=inf, eps=1e-15):
        """Apply the module-level ``filter`` to every segment expression."""
        seq = tuple(filter(expr, low, high, eps) for expr in self.seq)
        return Waveform(self.bounds, seq)

    def _comb(self, other, oper):
        """Combine two waveforms segment-wise with the binary ``oper``."""
        return Waveform(*merge_waveform(self.bounds, self.seq, other.bounds,
                                        other.seq, oper))

    def __pow__(self, n) -> Waveform:
        return Waveform(self.bounds, tuple(pow(w, n) for w in self.seq))

    def __add__(self, other) -> Waveform:
        if isinstance(other, Waveform):
            return self._comb(other, add)
        else:
            return self + const(other)

    def __radd__(self, v) -> Waveform:
        return const(v) + self

    def __ior__(self, other) -> Waveform:
        return self | other

    def __or__(self, other) -> Waveform:
        """Logical OR: one where either operand is non-zero, zero elsewhere."""
        if isinstance(other, (int, float, complex)):
            other = const(other)

        def _or(a, b):
            if a != _zero or b != _zero:
                return _one
            else:
                return _zero

        return self._comb(other, _or)

    def __iand__(self, other) -> Waveform:
        return self & other

    def __and__(self, other) -> Waveform:
        """Logical AND: one where both operands are non-zero, zero elsewhere."""
        if isinstance(other, (int, float, complex)):
            other = const(other)

        def _and(a, b):
            if a != _zero and b != _zero:
                return _one
            else:
                return _zero

        return self._comb(other, _and)

    @property
    def marker(self):
        """Waveform that is one wherever the simplified waveform is non-zero."""
        w = self.simplify()
        return Waveform(w.bounds,
                        tuple(_zero if s == _zero else _one for s in w.seq))

    def mask(self, edge: float = 0) -> Waveform:
        """Build a 0/1 mask from :attr:`marker`, moving edges by ``edge``.

        NOTE(review): when the first segment is non-zero nothing is emitted
        for it, and ``in_wave`` is always ``False`` on loop entry; the logic
        is reproduced unchanged -- confirm the intended semantics.
        """
        w = self.marker
        in_wave = w.seq[0] == _zero
        bounds = []
        seq = []

        if w.seq[0] == _zero:
            in_wave = False
            b = w.bounds[0] - edge
            bounds.append(b)
            seq.append(_zero)

        for b, s in zip(w.bounds[1:], w.seq[1:]):
            if not in_wave and s != _zero:
                in_wave = True
                bounds.append(b + edge)
                seq.append(_one)
            elif in_wave and s == _zero:
                in_wave = False
                b = b - edge
                if b > bounds[-1]:
                    bounds.append(b)
                    seq.append(_zero)
                else:
                    bounds.pop()
                    bounds.append(b)
        return Waveform(tuple(bounds), tuple(seq))

    def __mul__(self, other) -> Waveform:
        if isinstance(other, Waveform):
            return self._comb(other, mul)
        else:
            return self * const(other)

    def __rmul__(self, v) -> Waveform:
        return const(v) * self

    def __truediv__(self, other) -> Waveform:
        if isinstance(other, Waveform):
            raise TypeError('division by waveform')
        else:
            return self * const(1 / other)

    def __neg__(self) -> Waveform:
        return -1 * self

    def __sub__(self, other) -> Waveform:
        return self + (-other)

    def __rsub__(self, v) -> Waveform:
        return v + (-self)

    def __rshift__(self, time) -> Waveform:
        """Return the waveform delayed by ``time``."""
        return Waveform(
            tuple(round(bound + time, NDIGITS) for bound in self.bounds),
            tuple(shift(expr, time) for expr in self.seq))

    def __lshift__(self, time):
        """Return the waveform advanced by ``time``."""
        return self >> (-time)

    @staticmethod
    def _merge_parts(
        parts: list[tuple[int, int, np.ndarray | int | float | complex]],
        out: list[tuple[int, int, np.ndarray | int | float | complex]]
    ) -> list[tuple[int, int, np.ndarray | int | float | complex]]:
        # TODO: merge parts
        raise NotImplementedError

    @staticmethod
    def _fill_parts(parts, out):
        """Accumulate every ``(start, stop, values)`` part into ``out`` in place."""
        for start, stop, part in parts:
            out[start:stop] += part

    def __call__(
        self,
        x,
        frag=False,
        out: np.ndarray | list | None = None,
        accumulate=False,
        function_lib=None
    ) -> NDArray[np.float64 | np.complex128] | list[
            tuple[int, int, NDArray[np.float64 | np.complex128]] | int
            | float | complex] | np.float64:
        """Evaluate the waveform at ``x``.

        Args:
            x: Scalar or array of sample times.
            frag: When true, return the raw ``(start, stop, values)`` parts
                instead of a dense array.
            out: Optional output array (dense mode) or list (``frag`` mode).
            accumulate: Add to ``out`` instead of overwriting it. Not
                implemented for ``frag`` mode.
            function_lib: Function table; defaults to ``_baseFunc``.
        """
        if function_lib is None:
            function_lib = _baseFunc
        if isinstance(x, (int, float, complex)):
            # Scalars are evaluated through a one-element array.
            return cast(
                NDArray[np.float64],
                self.__call__(np.array([x]), function_lib=function_lib))[0]
        parts, dtype = calc_parts(self.bounds, self.seq, x, function_lib,
                                  self.min, self.max)
        if not frag:
            if out is None:
                out = np.zeros_like(x, dtype=dtype)
            elif not accumulate:
                out *= 0
            self._fill_parts(parts, out)
        else:
            if out is None:
                return cast(list, parts)
            else:
                out = cast(list, out)
                if not accumulate:
                    out.clear()
                    out.extend(parts)
                else:
                    self._merge_parts(parts, out)
        return out

    def __hash__(self):
        # NOTE(review): hashes the raw (unsimplified) content plus
        # start/stop/sample_rate, whereas __eq__ compares simplified content
        # only, so equal waveforms may hash differently. Left unchanged
        # because callers may rely on the current dict-key behaviour.
        return hash((self.max, self.min, self.start, self.stop,
                     self.sample_rate, self.bounds, self.seq))

    def __eq__(self, o: object) -> bool:
        """Compare by simplified content; numbers compare as constants."""
        if isinstance(o, (int, float, complex)):
            return self == const(o)
        elif isinstance(o, Waveform):
            a = self.simplify()
            b = o.simplify()
            # NOTE(review): ``simplify()`` returns fresh objects with default
            # max/min/start/stop, so the attribute comparison below is always
            # true as written. Behaviour kept unchanged.
            return a.seq == b.seq and a.bounds == b.bounds and (
                a.max, a.min, a.start, a.stop) == (b.max, b.min, b.start,
                                                   b.stop)
        else:
            return False

    def _repr_latex_(self):
        """Return a ``$$...$$`` LaTeX rendering (single formula or ``cases``)."""
        parts = []
        start = -np.inf
        for end, wav in zip(self.bounds, self.seq):
            e_str = _wav_latex(wav)
            start_str = _num_latex(start)
            end_str = _num_latex(end)
            parts.append(e_str + r",~~&t\in" + f"({start_str},{end_str}" +
                         (']' if end < np.inf else ')'))
            start = end
        if len(parts) == 1:
            expr = ''.join(['f(t)=', *parts[0].split('&')])
        else:
            expr = '\n'.join([
                r"f(t)=\begin{cases}", (r"\\" + '\n').join(parts),
                r"\end{cases}"
            ])
        return "$$\n{}\n$$".format(expr)

    def _play(self, time_unit, volume=1.0):
        """Stream the waveform to the default audio output (blocking)."""
        import pyaudio

        CHUNK = 1024
        RATE = 48000

        dynamic_volume = 1.0
        amp = 2**15 * 0.999 * volume * dynamic_volume

        p = pyaudio.PyAudio()
        try:
            stream = p.open(format=pyaudio.paInt16,
                            channels=1,
                            rate=RATE,
                            output=True)
            try:
                for data in self.sample(sample_rate=RATE / time_unit,
                                        chunk_size=CHUNK):
                    lim = np.abs(data).max()
                    # Only ever turn the gain down, so loud chunks do not
                    # clip and later quiet ones are not boosted.
                    if lim > 0 and dynamic_volume > 1.0 / lim:
                        dynamic_volume = 1.0 / lim
                        amp = 2**15 * 0.99 * volume * dynamic_volume
                    data = (amp * data).astype(np.int16)
                    stream.write(bytes(data.data))
            finally:
                stream.stop_stream()
                stream.close()
        finally:
            p.terminate()

    def play(self, time_unit=1, volume=1.0):
        """Play the waveform in a daemon subprocess (non-blocking)."""
        import multiprocessing as mp
        p = mp.Process(target=self._play,
                       args=(time_unit, volume),
                       daemon=True)
        p.start()
636

637

638
class WaveVStack(Waveform):
    """Lazy sum of waveforms.

    ``wlist`` holds the ``(bounds, seq)`` pairs of the summands. ``offset``
    is a constant added to the sum and ``shift`` a delay applied to all
    summands at evaluation time (see :meth:`__call__`).
    """

    def __init__(self, wlist: list[Waveform] | None = None):
        # ``None`` replaces the former mutable ``[]`` default.
        self.wlist = [(w.bounds, w.seq) for w in (wlist or ())]
        self.start = None
        self.stop = None
        self.sample_rate = None
        self.offset = 0
        self.shift = 0
        self.filters = None
        self.label = None
        self.function_lib = None

    def __begin(self):
        if self.wlist:
            v = [self._begin(bounds, seq) for bounds, seq in self.wlist]
            return min(v)
        else:
            return -inf

    def __end(self):
        if self.wlist:
            v = [self._end(bounds, seq) for bounds, seq in self.wlist]
            return max(v)
        else:
            return inf

    @property
    def begin(self):
        """Earliest start over all summands, clipped to ``start`` when set."""
        if self.start is None:
            return self.__begin()
        else:
            return max(self.start, self.__begin())

    @property
    def end(self):
        """Latest end over all summands, clipped to ``stop`` when set."""
        if self.stop is None:
            return self.__end()
        else:
            return min(self.stop, self.__end())

    def __call__(self, x, frag=False, out=None, function_lib=None):
        """Evaluate the stack at ``x`` and return the real part of the sum.

        NOTE: the ``out`` argument is accepted for signature compatibility
        but ignored; a fresh array is always allocated.
        """
        assert frag is False, 'WaveVStack does not support frag mode'
        out = np.full_like(x, self.offset, dtype=np.complex128)
        out = cast(NDArray[np.complex128], out)
        if self.shift != 0:
            # Summands are stored unshifted; evaluate them at delayed times.
            x = x - self.shift
        if function_lib is None:
            if self.function_lib is None:
                function_lib = _baseFunc
            else:
                function_lib = self.function_lib
        for bounds, seq in self.wlist:
            parts, dtype = calc_parts(bounds, seq, x, function_lib)
            self._fill_parts(parts, out)
        return out.real

    def tolist(self):
        """Serialise the stack to a flat list (inverse of :meth:`fromlist`)."""
        l = [
            self.start,
            self.stop,
            self.offset,
            self.shift,
            self.sample_rate,
        ]
        if self.filters is None:
            l.append(None)
        else:
            sos, initial = self.filters
            # Accept array-likes for ``sos`` as ``sample`` does.
            sos = list(np.asarray(sos).reshape(-1))
            l.append(len(sos))
            l.extend(sos)
            l.append(initial)
        l.append(len(self.wlist))
        for bounds, seq in self.wlist:
            self._tolist(bounds, seq, l)
        return l

    @classmethod
    def fromlist(cls, l):
        """Rebuild a stack from the list produced by :meth:`tolist`."""
        w = cls()
        pos = 6
        w.start, w.stop, w.offset, w.shift, w.sample_rate, sos_size = l[:pos]
        if sos_size is not None:
            sos = np.array(l[pos:pos + sos_size]).reshape(-1, 6)
            pos += sos_size
            initial = l[pos]
            pos += 1
            w.filters = sos, initial
        n = l[pos]
        pos += 1
        for _ in range(n):
            bounds, seq, pos = cls._fromlist(l, pos)
            w.wlist.append((bounds, seq))
        return w

    def simplify(self, eps=1e-15):
        """Collapse the stack into a single simplified ``Waveform``.

        An empty stack yields a fresh waveform (with ``offset`` applied)
        rather than the shared ``zero()`` singleton, so the constant offset
        is not lost and callers cannot mutate a module-level object.
        """
        if self.wlist:
            bounds, seq = wave_sum(self.wlist)
            wav = Waveform(bounds=bounds, seq=seq)
        else:
            wav = Waveform()
        if self.offset != 0:
            wav += self.offset
        if self.shift != 0:
            wav >>= self.shift
        wav = wav.simplify(eps)
        wav.start = self.start
        wav.stop = self.stop
        wav.sample_rate = self.sample_rate
        wav.filters = self.filters
        wav.label = self.label
        return wav

    @staticmethod
    def _rshift(wlist, time):
        """Return ``wlist`` with every summand delayed by ``time``."""
        if time == 0:
            return wlist
        return [(tuple(round(bound + time, NDIGITS) for bound in bounds),
                 tuple(shift(expr, time) for expr in seq))
                for bounds, seq in wlist]

    def __rshift__(self, time):
        """Return the stack delayed by ``time`` (lazy: only ``shift`` changes)."""
        ret = WaveVStack()
        # The summand list is shared, not copied; no method mutates a stack's
        # ``wlist`` after construction.
        ret.wlist = self.wlist
        ret.sample_rate = self.sample_rate
        ret.start = self.start
        ret.stop = self.stop
        ret.shift = self.shift + time
        ret.offset = self.offset
        ret.filters = self.filters
        ret.label = self.label
        ret.function_lib = self.function_lib
        return ret

    def __add__(self, other) -> WaveVStack:
        """Add a stack, a waveform or a scalar, keeping the sum lazy."""
        ret = WaveVStack()
        ret.wlist.extend(self.wlist)
        # Start from this stack's frame; previously both were reset to 0,
        # silently dropping a pending shift and the constant offset.
        ret.shift = self.shift
        ret.offset = self.offset
        if isinstance(other, WaveVStack):
            if other.shift != self.shift:
                # Different frames: materialise both shifts.
                ret.wlist = self._rshift(ret.wlist, self.shift)
                ret.wlist.extend(self._rshift(other.wlist, other.shift))
                ret.shift = 0
            else:
                ret.wlist.extend(other.wlist)
            ret.offset = self.offset + other.offset
        elif isinstance(other, Waveform):
            # Store the operand in this stack's unshifted frame.
            other = other << self.shift
            ret.wlist.append((other.bounds, other.seq))
        else:
            # ret.wlist.append(((+inf, ), (_const(1.0 * other), )))
            ret.offset += other
        ret.filters = self.filters
        ret.label = self.label
        ret.function_lib = self.function_lib
        return ret

    def __radd__(self, v) -> WaveVStack:
        return self + v

    def __mul__(self, other) -> WaveVStack:
        """Multiply every summand (and the offset) by a waveform or scalar."""
        if isinstance(other, Waveform):
            other = other.simplify() << self.shift
            ret = WaveVStack([Waveform(*w) * other for w in self.wlist])
            if self.offset != 0:
                # offset * other is no longer constant: keep it as a summand.
                w = other * self.offset
                ret.wlist.append((w.bounds, w.seq))
        else:
            ret = WaveVStack([Waveform(*w) * other for w in self.wlist])
            ret.offset = self.offset * other
        # The summands are still in this stack's unshifted frame.
        ret.shift = self.shift
        ret.filters = self.filters
        ret.label = self.label
        ret.function_lib = self.function_lib
        return ret

    def __rmul__(self, v) -> WaveVStack:
        return self * v

    def __eq__(self, other) -> bool:
        # A non-empty stack never compares equal; an empty one compares like
        # the zero waveform. Defining __eq__ without __hash__ also makes
        # instances unhashable.
        if self.wlist:
            return False
        else:
            return zero() == other

    def _repr_latex_(self):
        return r"\sum_{i=1}^{" + f"{len(self.wlist)}" + r"}" + r"f_i(t)"

    def __getstate__(self) -> tuple:
        """Pickle support; ``function_lib`` is serialised with ``dill`` if possible."""
        function_lib = self.function_lib
        if function_lib:
            try:
                import dill
                function_lib = dill.dumps(function_lib)
            except Exception:
                # Best effort: drop a library that cannot be serialised (or
                # when dill is unavailable) instead of failing the pickle.
                function_lib = None
        return (self.wlist, self.start, self.stop, self.sample_rate,
                self.offset, self.shift, self.filters, self.label,
                function_lib)

    def __setstate__(self, state: tuple) -> None:
        """Restore the state produced by :meth:`__getstate__`."""
        (self.wlist, self.start, self.stop, self.sample_rate, self.offset,
         self.shift, self.filters, self.label, function_lib) = state
        if function_lib:
            try:
                import dill
                # NOTE: like pickle, dill.loads must only see trusted data.
                function_lib = dill.loads(function_lib)
            except Exception:
                function_lib = None
        self.function_lib = function_lib
845

846

847
def play(data, rate=48000):
    """Play samples through a PyAudio output stream as mono 16-bit PCM.

    Parameters
    ----------
    data : array_like
        Samples to play. If the largest magnitude exceeds 1, the whole
        signal is divided by it before conversion to 16-bit integers.
        The object passed in is never modified.
    rate : int, optional
        Sample rate handed to the stream, by default 48000.
    """
    import io

    import pyaudio

    CHUNK = 1024  # number of bytes written to the stream per call

    samples = np.asarray(data)
    max_amp = np.max(np.abs(samples))

    if max_amp > 1:
        # Divide out of place: ``data /= max_amp`` would overwrite the
        # caller's array and fails for lists and integer arrays.
        samples = samples / max_amp

    pcm = np.array(2**15 * 0.999 * samples, dtype=np.int16)
    buff = io.BytesIO(pcm.tobytes())
    p = pyaudio.PyAudio()

    try:
        stream = p.open(format=pyaudio.paInt16,
                        channels=1,
                        rate=rate,
                        output=True)
        try:
            # read() returns b'' once the buffer is exhausted.
            for chunk in iter(lambda: buff.read(CHUNK), b''):
                stream.write(chunk)
        finally:
            stream.stop_stream()
            stream.close()
    finally:
        p.terminate()
×
880

881

882
# Shared instances handed out by zero() and one().
_zero_waveform, _one_waveform = Waveform(), Waveform(seq=(_one, ))
12✔
884

885

886
def zero():
    """Return the shared module-level ``_zero_waveform`` instance."""
    shared = _zero_waveform
    return shared
12✔
888

889

890
def one():
    """Return the shared module-level ``_one_waveform`` instance."""
    shared = _one_waveform
    return shared
12✔
892

893

894
def const(c):
    """Return a single-piece waveform built from ``_const(1.0 * c)``."""
    level = 1.0 * c  # same promotion as the literal expression (int -> float)
    piece = _const(level)
    return Waveform(seq=(piece, ))
12✔
896

897

898
# LaTeX formatters for the base functions (registered further below)
899
def _format_LINEAR(shift, *args):
    """LaTeX for the linear base function: ``t`` or ``(t±offset)``.

    ``shift`` is negated before formatting, so the rendered offset is
    ``-shift``. Extra arguments are ignored.
    """
    if shift == 0:
        return 't'
    offset = _num_latex(-shift)
    # A negative offset already carries its own sign character.
    if offset[0] == '-':
        return f"(t{offset})"
    return f"(t+{offset})"
×
908

909

910
def _format_GAUSSIAN(shift, *args):
    """LaTeX for the Gaussian base function.

    The rendered sigma is ``args[0] / sqrt(2)`` and the rendered offset is
    ``-shift``; a sigma of ``'1'`` and an offset of ``'0'`` are omitted
    from the output.
    """
    width = _num_latex(args[0] / np.sqrt(2))
    offset = _num_latex(-shift)
    if offset == '0':
        if width == '1':
            return ('\\exp\\left(-\\frac{t^2}{2}\\right)')
        # No offset to print; the general form below is reused with "t".
        offset = ''
    elif offset[0] != '-':
        offset = '+' + offset
    if width == '1':
        return ('\\exp\\left[-\\frac{\\left(t' + offset +
                '\\right)^2}{2}\\right]')
    return ('\\exp\\left[-\\frac{1}{2}\\left(\\frac{t' + offset + '}{' +
            width + '}\\right)^2\\right]')
928

929

930
def _format_SINC(shift, *args):
    """LaTeX for the sinc base function with bandwidth ``args[0]``.

    The rendered offset is ``-shift``; a bandwidth of ``'1'`` and an offset
    of ``'0'`` are omitted from the output.
    """
    offset = _num_latex(-shift)
    band = _num_latex(args[0])
    unit_band = band == '1'
    if offset == '0':
        if unit_band:
            return '\\mathrm{sinc}(t)'
        return '\\mathrm{sinc}(' + band + 't)'
    if offset[0] != '-':
        offset = '+' + offset
    if unit_band:
        return '\\mathrm{sinc}(t' + offset + ')'
    return '\\mathrm{sinc}[' + band + '(t' + offset + ')]'
×
945

946

947
def _format_COSINE(shift, *args):
    """LaTeX for the cosine base function.

    ``args[0]`` is divided by ``2*pi`` to get the printed frequency, and the
    printed phase is ``-shift`` times that frequency (i.e. in turns). A
    frequency of ``'1'`` and a phase of ``'0'`` are omitted.
    """
    cycles = args[0] / 2 / np.pi
    turns = -shift * cycles
    freq_tex = _num_latex(cycles)
    if freq_tex == '1':
        freq_tex = ''
    phase_tex = _num_latex(turns)
    if phase_tex == '0':
        # No phase term: pick between the explicit-frequency and bare forms.
        if freq_tex != '':
            return f'\\cos\\left(2\\pi\\times {freq_tex}t\\right)'
        return '\\cos\\left(2\\pi t\\right)'
    if phase_tex[0] != '-':
        phase_tex = '+' + phase_tex
    return f'\\cos\\left[2\\pi\\left({freq_tex}t{phase_tex}\\right)\\right]'
×
964

965

966
def _format_ERF(shift, *args):
    """LaTeX for the erf base function: ``erf((t∓|shift|) / args[0])``."""
    scale = f'{args[0]:g}'
    if shift > 0:
        numerator = 't-' + f"{_num_latex(shift)}"
    elif shift < 0:
        numerator = 't+' + f"{_num_latex(-shift)}"
    else:
        numerator = 't'
    return '\\mathrm{erf}(\\frac{' + numerator + '}{' + scale + '})'
×
973

974

975
def _format_COSH(shift, *args):
    """LaTeX for the cosh base function: ``cosh((t∓|shift|) / (1/args[0]))``."""
    # The denominator printed is the reciprocal of the stored parameter.
    scale = f'{1/args[0]:g}'
    if shift > 0:
        numerator = 't-' + f"{_num_latex(shift)}"
    elif shift < 0:
        numerator = 't+' + f"{_num_latex(-shift)}"
    else:
        numerator = 't'
    return '\\cosh(\\frac{' + numerator + '}{' + scale + '})'
×
982

983

984
def _format_SINH(shift, *args):
    """LaTeX for the sinh base function: ``sinh((t∓|shift|) / args[0])``."""
    # NOTE(review): _format_COSH prints 1/args[0] as the denominator while
    # this prints args[0] directly -- confirm against the SINH base
    # function definition which of the two is intended.
    scale = f'{args[0]:g}'
    if shift > 0:
        numerator = 't-' + f"{_num_latex(shift)}"
    elif shift < 0:
        numerator = 't+' + f"{_num_latex(-shift)}"
    else:
        numerator = 't'
    return '\\sinh(\\frac{' + numerator + '}{' + scale + '})'
×
991

992

993
def _format_EXP(shift, *args):
    """LaTeX for the exponential base function.

    Renders ``exp(-args[0] * (t∓|shift|))``; the shift term is printed only
    when its formatted value is a non-empty string.
    """
    # NOTE(review): the sign printed here is always "-", whereas exp() in
    # this module composes EXP as if it were e^{alpha t} -- confirm the
    # intended sign convention.
    rate = f'{args[0]:g}'
    delayed = _num_latex(shift)
    if delayed and shift > 0:
        inner = '\\left(t-' + f"{delayed}" + '\\right)'
    else:
        advanced = _num_latex(-shift)
        if advanced and shift < 0:
            inner = '\\left(t+' + f"{advanced}" + '\\right)'
        else:
            inner = 't'
    return '\\exp\\left(-' + rate + inner + '\\right)'
×
1000

1001

1002
def _format_DRAG(shift, *args):
12✔
1003
    return f"DRAG(...)"
×
1004

1005

1006
def _format_MOLLIFIER(shift, *args):
    """LaTeX for the mollifier base function.

    ``args[0]`` is printed as the radius ``r`` and ``args[1]`` selects the
    derivative mark: none for 0, primes for 1 and 2, ``^{(d)}`` otherwise.
    The printed offset is ``-shift`` (omitted when it formats as ``'0'``).
    """
    r = _num_latex(args[0])
    d = _num_latex(args[1])
    offset = _num_latex(-shift)
    if offset == '0':
        offset = ''
    elif offset[0] != '-':
        offset = '+' + offset

    primes = {'0': "", '1': "'", '2': "''"}
    mark = primes[d] if d in primes else f"^{{({d})}}"
    return f"\\mathrm{{Mollifier}}{mark}\\left(t{offset}, r={r}\\right)"
×
1023

1024

1025
def _format_D_GAUSSIAN(shift, *args):
    """LaTeX for the Gaussian-derivative base function.

    The printed sigma is ``args[0] / sqrt(2)``; ``args[1]`` is the
    derivative order, rendered as a ``d/dt`` prefix (none for order 0).
    The printed offset is ``-shift`` (omitted when it formats as ``'0'``).
    """
    width = _num_latex(args[0] / np.sqrt(2))
    order = args[1]
    offset = _num_latex(-shift)
    if offset == '0':
        offset = ''
    elif offset[0] != '-':
        offset = '+' + offset

    body = f"\\mathrm{{Gaussian}}\\left(t{offset}, \\sigma={width}\\right)"
    if order == 0:
        return body
    if order == 1:
        prefix = "\\frac{\\mathrm{d}}{\\mathrm{d}t}"
    else:
        prefix = (f"\\frac{{\\mathrm{{d}}^{{{order}}}}}"
                  f"{{\\mathrm{{d}}t^{{{order}}}}}")
    return prefix + body
×
1040

1041

1042
# Attach each LaTeX formatter to its base function, in the original order.
for _type_id, _formatter in (
    (LINEAR, _format_LINEAR),
    (GAUSSIAN, _format_GAUSSIAN),
    (ERF, _format_ERF),
    (COS, _format_COSINE),
    (SINC, _format_SINC),
    (EXP, _format_EXP),
    (COSH, _format_COSH),
    (SINH, _format_SINH),
    (DRAG, _format_DRAG),
    (MOLLIFIER, _format_MOLLIFIER),
    (D_GAUSSIAN, _format_D_GAUSSIAN),
):
    registerBaseFuncLatex(_type_id, _formatter)
# Keep the module namespace free of the loop variables.
del _type_id, _formatter
12✔
1053

1054

1055
def D(wav: Waveform, d: int = 1) -> Waveform:
    """Differentiate a waveform ``d`` times.

    Parameters
    ----------
    wav : Waveform
        The waveform to take the derivative of.
    d : int, optional
        The order of the derivative, by default 1. With ``d == 0`` the
        input object itself is returned.

    Returns
    -------
    Waveform
        A waveform with the same bounds whose pieces have each been passed
        through ``_D`` once per derivative order.
    """
    assert d >= 0 and isinstance(d, int), "d must be a non-negative integer"
    result = wav
    for _ in range(d):
        derived = tuple(_D(piece) for piece in result.seq)
        result = Waveform(bounds=result.bounds, seq=derived)
    return result
×
1072

1073

1074
def convolve(a, b):
    """Placeholder, not implemented: ignores both arguments, returns None."""
    return None
×
1076

1077

1078
def sign():
    """Two-piece waveform: ``_const(-1)`` up to the bound 0, ``_one`` after."""
    pieces = (_const(-1), _one)
    return Waveform(bounds=(0, +inf), seq=pieces)
×
1080

1081

1082
def step(edge, type='erf'):
    """Step from 0 to 1 around t = 0 with a selectable rising edge.

    Parameters
    ----------
    edge : float
        Edge size. With ``edge == 0`` the step is ideal (switches at 0).
        The 'cos' and 'linear' edges span ``[-edge/2, edge/2]``; the
        default edge spans ``[-edge, edge]``.
    type : str
        "erf", "cos", "linear". Any value other than 'cos' and 'linear'
        selects the erf edge.
    """
    if edge == 0:
        return Waveform(bounds=(0, +inf), seq=(_zero, _one))

    if type == 'cos':
        rise = add(_half,
                   mul(_half, basic_wave(COS, pi / edge, shift=0.5 * edge)))
        lower, upper = round(-edge / 2, NDIGITS), round(edge / 2, NDIGITS)
    elif type == 'linear':
        rise = add(_half, mul(_const(1 / edge), basic_wave(LINEAR)))
        lower, upper = round(-edge / 2, NDIGITS), round(edge / 2, NDIGITS)
    else:
        std_sq2 = edge / 5
        # Written out literally instead of
        # add(_half, mul(_half, basic_wave(ERF, std_sq2))).
        rise = ((((), ()), (((ERF, std_sq2, 0), ), (1, ))), (0.5, 0.5))
        lower, upper = -round(edge, NDIGITS), round(edge, NDIGITS)

    return Waveform(bounds=(lower, upper, +inf), seq=(_zero, rise, _one))
1108

1109

1110
def square(width: float, edge: float = 0, type: str = 'erf') -> Waveform:
    """Rectangular pulse of the given width centred on t = 0.

    Parameters
    ----------
    width : float
        Pulse width; a non-positive width yields ``zero()``.
    edge : float, optional
        Edge size forwarded to ``step``; 0 (default) gives sharp edges.
    type : str, optional
        Edge shape forwarded to ``step``, by default 'erf'.
    """
    if width <= 0:
        return zero()
    if edge == 0:
        left = round(-0.5 * width, NDIGITS)
        right = round(0.5 * width, NDIGITS)
        return Waveform(bounds=(left, right, +inf),
                        seq=(_zero, _one, _zero))
    # Difference of two steps shifted half a width in opposite directions.
    first = step(edge, type=type) << width / 2
    second = step(edge, type=type) >> width / 2
    return first - second
1121

1122

1123
def gaussian(width: float,
             plateau: float = 0.0,
             d: int | None = None) -> Waveform:
    """Gaussian pulse, optionally with a flat plateau in the middle.

    Parameters
    ----------
    width : float
        Two times the FWHM of the Gaussian. Each Gaussian edge is cut off
        ``0.75 * width`` away from its centre.
    plateau : float, optional
        Length of the flat section (value ``_one``) inserted between the
        rising and falling halves, by default 0.0.
    d : int | None, optional
        When given, the ``D_GAUSSIAN`` base function with this extra
        argument is used instead of ``GAUSSIAN``.

    Returns
    -------
    Waveform
        ``zero()`` when both ``width`` and ``plateau`` are non-positive.
    """
    if width <= 0 and plateau <= 0.0:
        return zero()

    # width / (4 * sqrt(ln 2)), i.e. width is two times FWHM.
    std_sq2 = width / 3.3302184446307908

    def base(shift):
        if d is None:
            return basic_wave(GAUSSIAN, std_sq2, shift=shift)
        return basic_wave(D_GAUSSIAN, std_sq2, d, shift=shift)

    half = 0.5 * plateau
    if round(half, NDIGITS) <= 0.0:
        # Plateau vanishes after rounding: a single Gaussian piece.
        limits = (round(-0.75 * width, NDIGITS),
                  round(0.75 * width, NDIGITS), +inf)
        return Waveform(bounds=limits, seq=(_zero, base(0), _zero))

    limits = (round(-0.75 * width - half, NDIGITS), round(-half, NDIGITS),
              round(half, NDIGITS), round(0.75 * width + half, NDIGITS),
              +inf)
    return Waveform(bounds=limits,
                    seq=(_zero, base(-half), _one, base(half), _zero))
1151

1152

1153
def cos(w: float, phi: float = 0) -> Waveform:
    """Cosine waveform with angular frequency ``w`` and phase ``phi``.

    ``w == 0`` degenerates to the constant ``cos(phi)``. A negative ``w``
    is folded to the positive side by negating both ``w`` and ``phi``.
    The phase is expressed as a time shift of ``-phi / w``.
    """
    if w == 0:
        return const(np.cos(phi))
    if w < 0:
        w, phi = -w, -phi
    piece = basic_wave(COS, w, shift=-phi / w)
    return Waveform(seq=(piece, ))
12✔
1160

1161

1162
def sin(w: float, phi: float = 0) -> Waveform:
    """Sine waveform with angular frequency ``w`` and phase ``phi``.

    Built from the ``COS`` base function with a time shift of
    ``(pi / 2 - phi) / w``. ``w == 0`` degenerates to the constant
    ``sin(phi)``; a negative ``w`` is folded to the positive side with the
    phase replaced by ``pi - phi``.
    """
    if w == 0:
        return const(np.sin(phi))
    if w < 0:
        w, phi = -w, -phi + pi
    piece = basic_wave(COS, w, shift=(pi / 2 - phi) / w)
    return Waveform(seq=(piece, ))
12✔
1169

1170

1171
def exp(alpha: float | complex) -> Waveform:
    """Exponential waveform for a real or complex rate ``alpha``.

    A non-complex ``alpha`` maps directly onto the ``EXP`` base function.
    A complex ``alpha`` is split into ``cos(imag) + 1j * sin(imag)``,
    multiplied by the real exponential unless the real part is zero.
    """
    if not isinstance(alpha, complex):
        return Waveform(seq=(basic_wave(EXP, alpha), ))
    rotation = cos(alpha.imag) + 1j * sin(alpha.imag)
    if alpha.real == 0:
        return rotation
    return exp(alpha.real) * rotation
12✔
1179

1180

1181
def sinc(bw: float) -> Waveform:
    """Sinc pulse with bandwidth ``bw``, truncated to a width of ``100 / bw``.

    A non-positive ``bw`` yields ``zero()``.
    """
    if bw <= 0:
        return zero()
    width = 100 / bw
    left = round(-0.5 * width, NDIGITS)
    right = round(0.5 * width, NDIGITS)
    return Waveform(bounds=(left, right, +inf),
                    seq=(_zero, basic_wave(SINC, bw), _zero))
1188

1189

1190
def cosPulse(width: float, plateau: float = 0.0) -> Waveform:
    """Raised-cosine pulse of the given width, optionally with a plateau.

    With a plateau that is still positive after rounding, the pulse is
    delegated to ``square(..., type='cos')`` with an edge of half the
    width. Otherwise a non-positive ``width`` yields ``zero()``.
    """
    if round(0.5 * plateau, NDIGITS) > 0:
        return square(plateau + 0.5 * width, edge=0.5 * width, type='cos')
    if width <= 0:
        return zero()
    # Literal piece 0.5 + 0.5 * COS with parameter 2*pi / width, i.e.
    # mul(add(cos, _one), _half) written out by hand.
    terms = (((), ()), (((COS, 6.283185307179586 / width, 0), ), (1, )))
    pulse = (terms, (0.5, 0.5))
    left = round(-0.5 * width, NDIGITS)
    right = round(0.5 * width, NDIGITS)
    return Waveform(bounds=(left, right, +inf), seq=(_zero, pulse, _zero))
1202

1203

1204
def hanning(width: float, plateau: float = 0.0) -> Waveform:
    """Alias of ``cosPulse`` with the same arguments."""
    pulse = cosPulse(width, plateau=plateau)
    return pulse
×
1206

1207

1208
def cosh(w: float) -> Waveform:
    """Single-piece waveform of the ``COSH`` base function with parameter ``w``."""
    piece = basic_wave(COSH, w)
    return Waveform(seq=(piece, ))
×
1210

1211

1212
def sinh(w: float) -> Waveform:
    """Single-piece waveform of the ``SINH`` base function with parameter ``w``."""
    piece = basic_wave(SINH, w)
    return Waveform(seq=(piece, ))
×
1214

1215

1216
def coshPulse(width: float,
              eps: float = 1.0,
              plateau: float = 0.0) -> Waveform:
    """Cosine hyperbolic pulse.

    The pulse edge shape is:
            cosh(eps / 2) - cosh(eps * t / T)
    f(t) = -----------------------------------
                  cosh(eps / 2) - 1
    where T is the pulse width and eps is the pulse edge steepness.
    The pulse is defined for t in [-T/2, T/2].

    In case of plateau > 0, the pulse is defined as:
           | f(t + plateau/2)   if t in [-T/2 - plateau/2, - plateau/2]
    g(t) = | 1                  if t in [-plateau/2, plateau/2]
           | f(t - plateau/2)   if t in [plateau/2, T/2 + plateau/2]

    Parameters
    ----------
    width : float
        Pulse width.
    eps : float
        Pulse edge steepness.
    plateau : float
        Pulse plateau.

    Returns
    -------
    Waveform
        ``zero()`` when both ``width`` and ``plateau`` are non-positive.
    """
    if width <= 0 and plateau <= 0:
        return zero()
    w = eps / width
    A = np.cosh(eps / 2)
    # Amplitudes of the constant term and of the cosh term in f(t).
    amps = (A / (A - 1), -1 / (A - 1))

    def edge(shift):
        return ((((), ()), (((COSH, w, shift), ), (1, ))), amps)

    no_plateau = plateau == 0.0 or round(-0.5 * plateau, NDIGITS) == round(
        0.5 * plateau, NDIGITS)
    if no_plateau:
        limits = (round(-0.5 * width, NDIGITS), round(0.5 * width, NDIGITS),
                  +inf)
        return Waveform(bounds=limits, seq=(_zero, edge(0), _zero))

    raising = edge(-0.5 * plateau)
    falling = edge(0.5 * plateau)
    limits = (round(-0.5 * width - 0.5 * plateau, NDIGITS),
              round(-0.5 * plateau, NDIGITS),
              round(0.5 * plateau, NDIGITS),
              round(0.5 * width + 0.5 * plateau, NDIGITS), +inf)
    return Waveform(bounds=limits,
                    seq=(_zero, raising, _one, falling, _zero))
1266

1267

1268
def general_cosine(duration: float, *arg: float) -> Waveform:
    """Sum of cosine harmonics, windowed to ``duration``.

    Parameters
    ----------
    duration : float
        Length of the ``square`` window; harmonic ``i`` uses the
        ``cos(i * 2 * pi / duration)`` waveform.
    *arg : float
        Harmonic coefficients, starting with harmonic 1. They are divided
        by the sum of every second coefficient (``arg[0] + arg[2] + ...``)
        before use. Integer coefficients are accepted.

    Returns
    -------
    Waveform
        Sum over ``i`` of ``a_i / 2 * (1 - (-1)**i * cos(...))`` multiplied
        by ``square(duration)``.
    """
    coeffs = np.asarray(arg)
    # Out-of-place true division: an in-place ``/=`` raises a casting error
    # when every coefficient is an int (the array would have integer dtype).
    coeffs = coeffs / coeffs[::2].sum()
    wav = zero()
    for i, a in enumerate(coeffs, start=1):
        # Rebind instead of ``+=`` so the shared zero() instance can never
        # be the target of an in-place update.
        wav = wav + a / 2 * (1 - (-1)**i * cos(i * 2 * pi / duration))
    return wav * square(duration)
×
1275

1276

1277
def slepian(duration: float, *arg: float) -> Waveform:
    """Sum of cosine harmonics, windowed to ``duration``.

    The construction is the same as ``general_cosine``.

    Parameters
    ----------
    duration : float
        Length of the ``square`` window; harmonic ``i`` uses the
        ``cos(i * 2 * pi / duration)`` waveform.
    *arg : float
        Harmonic coefficients, starting with harmonic 1. They are divided
        by the sum of every second coefficient (``arg[0] + arg[2] + ...``)
        before use. Integer coefficients are accepted.

    Returns
    -------
    Waveform
        Sum over ``i`` of ``a_i / 2 * (1 - (-1)**i * cos(...))`` multiplied
        by ``square(duration)``.
    """
    coeffs = np.asarray(arg)
    # Out-of-place true division: an in-place ``/=`` raises a casting error
    # when every coefficient is an int (the array would have integer dtype).
    coeffs = coeffs / coeffs[::2].sum()
    wav = zero()
    for i, a in enumerate(coeffs, start=1):
        # Rebind instead of ``+=`` so the shared zero() instance can never
        # be the target of an in-place update.
        wav = wav + a / 2 * (1 - (-1)**i * cos(i * 2 * pi / duration))
    return wav * square(duration)
×
1284

1285

1286
def mollifier(width: float, plateau: float = 0.0, d: int = 0) -> Waveform:
    """
    Mollifier function is a smooth function that is 1 at the origin and 0 outside a certain radius.
    It is defined as:

    f(x) = exp(1 / ((x / r) ^ 2 - 1) + 1)  in case |x| < r
         = 0                           in case |x| >= r
    where r = width / 2 is the radius of the mollifier.

    The parameter plateau is the width of the plateau.
    The parameter d is the order of the derivative.

    With a positive plateau the waveform has a rising piece (shifted by
    -plateau/2), a flat ``_one`` piece and a falling piece (shifted by
    +plateau/2).
    """
    assert d >= 0 and isinstance(d, int), "d must be a non-negative integer"
    assert width > 0, "width must be positive"

    radius = width / 2
    if plateau <= 0:
        core = basic_wave(MOLLIFIER, radius, d)
        return Waveform(bounds=(-0.5 * width, 0.5 * width, inf),
                        seq=(_zero, core, _zero))

    rising = basic_wave(MOLLIFIER, radius, d, shift=-0.5 * plateau)
    falling = basic_wave(MOLLIFIER, radius, d, shift=0.5 * plateau)
    limits = (-0.5 * width - 0.5 * plateau, -0.5 * plateau, 0.5 * plateau,
              0.5 * width + 0.5 * plateau, inf)
    return Waveform(bounds=limits,
                    seq=(_zero, rising, _one, falling, _zero))
1318

1319

1320
def _poly(*a):
    """
    a[0] + a[1] * t + a[2] * t**2 + ...

    Build the ``(terms, amplitudes)`` pair for a polynomial in ``t``.
    Zero coefficients are skipped, so both tuples always have the same
    length; with no non-zero coefficient (or no coefficients at all) the
    result is ``((), ())``.
    """
    terms = []
    amps = []
    for n, coeff in enumerate(a):
        if coeff == 0:
            continue
        # The constant term has no base function; higher orders are
        # LINEAR raised to the n-th power.
        terms.append(((), ()) if n == 0 else (((LINEAR, 0), ), (n, )))
        amps.append(coeff)
    # Return the filtered amplitudes, not the raw coefficients, so every
    # amplitude lines up with its term.
    return tuple(terms), tuple(amps)
12✔
1334

1335

1336
def poly(a):
    """
    a[0] + a[1] * t + a[2] * t**2 + ...

    Wrap the piece built by ``_poly`` from the coefficients in ``a`` into a
    single-piece waveform.
    """
    piece = _poly(*a)
    return Waveform(seq=(piece, ))
12✔
1341

1342

1343
def t():
    """Return the single-piece waveform of the unshifted ``LINEAR`` base.

    The piece is built with ``basic_wave(LINEAR)``, the same helper the
    linear ``step`` edge uses. The previous hand-written ``seq`` literal
    had one nesting level fewer than the ``(terms, amplitudes)`` pieces
    used elsewhere in this module, so ``seq`` held two malformed entries
    instead of one piece.
    """
    return Waveform(seq=(basic_wave(LINEAR), ))
×
1345

1346

1347
def drag(freq: float,
         width: float,
         plateau: float = 0,
         delta: float = 0,
         block_freq: float | None = None,
         phase: float = 0,
         t0: float = 0) -> Waveform:
    """DRAG pulse starting at ``t0``, optionally with a plateau.

    Parameters
    ----------
    freq, width, delta, block_freq
        Forwarded unchanged to the ``DRAG`` base function.
    plateau : float, optional
        Length of a cosine section inserted between the two DRAG halves;
        with ``plateau <= 0`` the pulse is a single DRAG piece.
    phase : float, optional
        Initial phase; ``pi * delta * (width + plateau)`` is added to it
        before use.
    t0 : float, optional
        Start time of the pulse.

    Returns
    -------
    Waveform
        One DRAG piece (no plateau), one cosine piece (``width <= 0``), or
        DRAG / cosine / DRAG pieces split at ``t0 + width / 2`` and
        ``t0 + width / 2 + plateau``.
    """
    phase += pi * delta * (width + plateau)
    start = round(t0, NDIGITS)

    if plateau <= 0:
        whole = basic_wave(DRAG, t0, freq, width, delta, block_freq, phase)
        return Waveform(seq=(_zero, whole, _zero),
                        bounds=(start, round(t0 + width, NDIGITS), +inf))

    # Both remaining cases contain the same cosine section.
    w = 2 * pi * (freq + delta)
    carrier = basic_wave(COS, w, shift=(phase + 2 * pi * delta * t0) / w)

    if width <= 0:
        return Waveform(seq=(_zero, carrier, _zero),
                        bounds=(start, round(t0 + plateau, NDIGITS), +inf))

    head = basic_wave(DRAG, t0, freq, width, delta, block_freq, phase)
    tail = basic_wave(DRAG, t0 + plateau, freq, width, delta, block_freq,
                      phase - 2 * pi * delta * plateau)
    limits = (start, round(t0 + width / 2, NDIGITS),
              round(t0 + width / 2 + plateau, NDIGITS),
              round(t0 + width + plateau, NDIGITS), +inf)
    return Waveform(seq=(_zero, head, carrier, tail, _zero), bounds=limits)
1380

1381

1382
def chirp(f0: float,
          f1: float,
          T: float,
          phi0: float = 0,
          type: str = 'linear') -> Waveform:
    """
    A chirp is a signal in which the frequency increases (up-chirp)
    or decreases (down-chirp) with time. In some sources, the term
    chirp is used interchangeably with sweep signal.

    type: "linear", "exponential", "hyperbolic"
    ("exp" / "geometric" and "hyp" are accepted as aliases).

    The chirp occupies [0, T] and is zero elsewhere. When ``f0 == f1`` the
    result is ``sin(f0, phi0)`` instead. Raises ValueError for a
    non-positive ``T``, for ``f0 == 0`` with the exponential type, and for
    an unknown ``type``.
    """
    if f0 == f1:
        return sin(f0, phi0)
    if T <= 0:
        raise ValueError('T must be positive')

    if type == 'linear':
        # f(t) = f1 * (t/T) + f0 * (1 - t/T)
        sweep = basic_wave(LINEARCHIRP, f0, f1, T, phi0)
    elif type in ('exp', 'exponential', 'geometric'):
        # f(t) = f0 * (f1/f0) ** (t/T)
        if f0 == 0:
            raise ValueError('f0 must be non-zero')
        alpha = np.log(f1 / f0) / T
        sweep = basic_wave(EXPONENTIALCHIRP, f0, alpha, phi0)
    elif type in ('hyperbolic', 'hyp'):
        # f(t) = f0 * f1 / (f0 * (t/T) + f1 * (1-t/T))
        if f0 * f1 == 0:
            return const(np.sin(phi0))
        k = (f0 - f1) / (f1 * T)
        sweep = basic_wave(HYPERBOLICCHIRP, f0, k, phi0)
    else:
        raise ValueError(f'unknown type {type}')

    return Waveform(bounds=(0, round(T, NDIGITS), +inf),
                    seq=(_zero, sweep, _zero))
×
1423

1424

1425
def interp(x: NDArray[np.float64], y: NDArray[np.float64]) -> Waveform:
    """Piecewise-linear waveform through the points ``(x[i], y[i])``.

    The waveform is zero before ``x[0]`` and after the last point.
    Consecutive points with equal ``x`` are skipped. Bounds are rounded to
    ``NDIGITS`` and the result is simplified.
    """
    pieces, edges = [_zero], [x[0]]
    for left, right, y_left, y_right in zip(x[:-1], x[1:], y[:-1], y[1:]):
        if right == left:
            continue
        slope = _const((y_right - y_left) / (right - left))
        ramp = mul(slope, basic_wave(LINEAR, shift=left))
        pieces.append(add(ramp, _const(y_left)))
        edges.append(right)
    pieces.append(_zero)
    edges.append(inf)
    rounded = tuple(round(b, NDIGITS) for b in edges)
    return Waveform(seq=tuple(pieces), bounds=rounded).simplify()
1441

1442

1443
def cut(wav: Waveform,
        start: float | None = None,
        stop: float | None = None,
        head: float | None = None,
        tail: float | None = None,
        min: float | None = None,
        max: float | None = None) -> Waveform:
    """Offset, window and annotate a waveform.

    Parameters
    ----------
    wav : Waveform
        Input waveform.
    start, stop : float | None
        When given, the waveform is multiplied by a sharp step so that it
        is zero before ``start`` / after ``stop``.
    head, tail : float | None
        A constant is added so that the value at ``start`` equals ``head``;
        if that pair is not given, so that the value at ``stop`` equals
        ``tail``.
    min, max : float | None
        When given, stored on the result as the ``min`` / ``max``
        attributes.
    """

    def value_at(when):
        # Evaluate the waveform at a single time point.
        return cast(NDArray[np.float64], wav(np.array([1.0 * when])))[0]

    if start is not None and head is not None:
        offset = head - value_at(start)
    elif stop is not None and tail is not None:
        offset = tail - value_at(stop)
    else:
        offset = 0
    wav = wav + offset

    if start is not None:
        wav = wav * (step(0) >> start)
    if stop is not None:
        wav = wav * ((1 - step(0)) >> stop)
    if min is not None:
        wav.min = min
    if max is not None:
        wav.max = max
    return wav
×
1468

1469

1470
def function(fun, *args, start=None, stop=None):
    """Wrap a callable as a waveform.

    ``fun`` is registered with ``registerBaseFunc`` and the returned type
    id is used with ``*args`` to build a single-piece waveform. When
    ``start`` / ``stop`` are given, the waveform is multiplied by a sharp
    step so that it is zero before ``start`` / after ``stop``.
    """
    type_id = registerBaseFunc(fun)
    wav = Waveform(seq=(basic_wave(type_id, *args), ))
    if start is not None:
        wav = wav * (step(0) >> start)
    if stop is not None:
        wav = wav * ((1 - step(0)) >> stop)
    return wav
×
1479

1480

1481
def samplingPoints(start, stop, points):
    """Waveform of the ``INTERP`` base function over ``[start, stop]``.

    ``points`` is converted to a tuple and passed to the base function
    together with the unrounded ``start`` and ``stop``; the waveform is
    zero outside the (rounded) bounds.
    """
    samples = basic_wave(INTERP, start, stop, tuple(points))
    limits = (round(start, NDIGITS), round(stop, NDIGITS), inf)
    return Waveform(bounds=limits, seq=(_zero, samples, _zero))
1485

1486

1487
def mixing(I: Waveform,
           Q: Waveform | None = None,
           *,
           phase: float = 0.0,
           freq: float = 0.0,
           ratioIQ: float = 1.0,
           phaseDiff: float = 0.0,
           block_freq: float | None = None,
           DRAGScaling: float | None = None) -> tuple[Waveform, Waveform]:
    """SSB or envelope mixing

    Parameters
    ----------
    I, Q : Waveform
        Input quadratures; a missing ``Q`` is replaced by ``zero()``.
    phase : float
        Mixing phase.
    freq : float
        Carrier frequency; 0 selects envelope mixing (constant rotation),
        anything else selects SSB mixing.
    ratioIQ : float
        Factor applied to the Q output at the very end.
    phaseDiff : float
        Extra phase applied to the Q output in SSB mixing.
    block_freq : float | None
        When given and different from ``freq``, the DRAG correction is
        derived from it.
    DRAGScaling : float | None
        Used for the DRAG correction when ``block_freq`` does not apply
        and the scaling is non-zero.

    Returns
    -------
    tuple[Waveform, Waveform]
        The mixed ``(I, Q)`` pair.
    """
    if Q is None:
        Q = zero()

    w = 2 * pi * freq
    if freq != 0.0:
        # SSB mixing
        Iout = I * cos(w, -phase) + Q * sin(w, -phase)
        Qout = -I * sin(w, -phase + phaseDiff) + Q * cos(w, -phase + phaseDiff)
    else:
        # envelope mixing
        # NOTE(review): phaseDiff is not used in this branch -- confirm
        # that this is intended.
        Iout = cast(Waveform, I * np.cos(-phase) + Q * np.sin(-phase))
        Qout = cast(Waveform, -I * np.sin(-phase) + Q * np.cos(-phase))

    # apply DRAG
    if block_freq is not None and block_freq != freq:
        a = block_freq / (block_freq - freq)
        b = 1 / (block_freq - freq)
        # Both right-hand sides are evaluated from the uncorrected pair
        # before either name is rebound.
        Iout, Qout = (a * Iout + b / (2 * pi) * D(Qout),
                      a * Qout - b / (2 * pi) * D(Iout))
    elif DRAGScaling is not None and DRAGScaling != 0:
        # 2 * pi * scaling * (freq - block_freq) = 1
        Iout, Qout = ((1 - w * DRAGScaling) * Iout - DRAGScaling * D(Qout),
                      (1 - w * DRAGScaling) * Qout + DRAGScaling * D(Iout))

    return Iout, ratioIQ * Qout
×
1528

1529

1530
# Public names exported by ``from <module> import *``.
__all__ = [
    "D",
    "Waveform",
    "chirp",
    "const",
    "cos",
    "cosh",
    "coshPulse",
    "cosPulse",
    "cut",
    "drag",
    "exp",
    "function",
    "gaussian",
    "general_cosine",
    "hanning",
    "interp",
    "mixing",
    "mollifier",
    "one",
    "poly",
    "registerBaseFunc",
    "registerDerivative",
    "samplingPoints",
    "sign",
    "sin",
    "sinc",
    "sinh",
    "square",
    "step",
    "t",
    "zero",
]
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc