• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

feihoo87 / waveforms / 16361817426

18 Jul 2025 03:57AM UTC coverage: 52.861%. First build
16361817426

push

github

feihoo87
Add mollifier function and update version to 2.2.0

- Introduced a new mollifier function for smooth transitions in waveforms.
- Added support for the mollifier in the Waveform class and its associated formatting.
- Updated version number to 2.2.0 to reflect the new feature addition.

14 of 51 new or added lines in 3 files covered. (27.45%)

1321 of 2499 relevant lines covered (52.86%)

6.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.86
/waveforms/waveform.py
1
from fractions import Fraction
12✔
2
from typing import Generator, Iterable, cast
12✔
3

4
import numpy as np
12✔
5
from numpy import e, inf, pi
12✔
6
from numpy.typing import NDArray
12✔
7
from scipy.signal import sosfilt
12✔
8

9
from ._waveform import (
12✔
10
    _D, COS, COSH, DRAG, ERF, EXP, EXPONENTIALCHIRP, GAUSSIAN, HYPERBOLICCHIRP,
11
    INTERP, LINEAR, LINEARCHIRP, MOLLIFIER, NDIGITS, SINC, SINH, _baseFunc,
12
    _baseFunc_latex, _const, _half, _one, _zero, add, basic_wave, calc_parts,
13
    filter, is_const, merge_waveform, mul, pow, registerBaseFunc,
14
    registerBaseFuncLatex, registerDerivative, shift, simplify, wave_sum)
15

16

17
def _test_spec_num(num, spec):
12✔
18
    x = Fraction(num / spec).limit_denominator(1000000000)
×
19
    if x.denominator <= 24:
×
20
        return True, x, 1
×
21
    x = Fraction(spec * num).limit_denominator(1000000000)
×
22
    if x.denominator <= 24:
×
23
        return True, x, -1
×
24
    return False, x, 0
×
25

26

27
def _spec_num_latex(num):
12✔
28
    for spec, spec_latex in [(1, ''), (np.sqrt(2), '\\sqrt{2}'),
×
29
                             (np.sqrt(3), '\\sqrt{3}'),
30
                             (np.sqrt(5), '\\sqrt{5}'),
31
                             (np.log(2), '\\log{2}'), (np.log(3), '\\log{3}'),
32
                             (np.log(5), '\\log{5}'), (np.e, 'e'),
33
                             (np.pi, '\\pi'), (np.pi**2, '\\pi^2'),
34
                             (np.sqrt(np.pi), '\\sqrt{\\pi}')]:
35
        flag, x, sign = _test_spec_num(num, spec)
×
36
        if flag:
×
37
            if sign < 0:
×
38
                spec_latex = f"\\frac{{{1}}}{{{spec_latex}}}"
×
39
            if x.denominator == 1:
×
40
                if x.numerator == 1:
×
41
                    return f"{spec_latex}"
×
42
                else:
43
                    return f"{x.numerator:g}{spec_latex}"
×
44
            else:
45
                if x.numerator < 0:
×
46
                    return f"-\\frac{{{-x.numerator}}}{{{x.denominator}}}{spec_latex}"
×
47
                else:
48
                    return f"\\frac{{{x.numerator}}}{{{x.denominator}}}{spec_latex}"
×
49
    return f"{num:g}"
×
50

51

52
def _num_latex(num):
12✔
53
    if num == -np.inf:
×
54
        return r"-\infty"
×
55
    elif num == np.inf:
×
56
        return r"\infty"
×
57
    if num.imag > 0:
×
58
        return f"\\left({_num_latex(num.real)}+{_num_latex(num.imag)}j\\right)"
×
59
    elif num.imag < 0:
×
60
        return f"\\left({_num_latex(num.real)}-{_num_latex(-num.imag)}j\\right)"
×
61
    s = _spec_num_latex(num.real)
×
62
    if s == '' and round(num.real) == 1:
×
63
        return '1'
×
64
    if "e" in s:
×
65
        a, n = s.split("e")
×
66
        n = float(n)
×
67
        s = f"{a} \\times 10^{{{n:g}}}"
×
68
    return s
×
69

70

71
def _fun_latex(fun):
12✔
72
    funID, *args, shift = fun
×
73
    if _baseFunc_latex[funID] is None:
×
74
        shift = _num_latex(shift)
×
75
        if shift == "0":
×
76
            shift = ""
×
77
        elif shift[0] != '-':
×
78
            shift = "+" + shift
×
79
        return r"\mathrm{Func}" + f"{funID}(t{shift}, ...)"
×
80
    return _baseFunc_latex[funID](shift, *args)
×
81

82

83
def _wav_latex(wav):
12✔
84

85
    if wav == _zero:
×
86
        return "0"
×
87
    elif is_const(wav):
×
88
        return f"{wav[1][0]}"
×
89

90
    sum_expr = []
×
91
    for mul, amp in zip(*wav):
×
92
        if mul == ((), ()):
×
93
            sum_expr.append(_num_latex(amp))
×
94
            continue
×
95
        mul_expr = []
×
96
        amp = _num_latex(amp)
×
97
        if amp != "1":
×
98
            mul_expr.append(amp)
×
99
        for fun, n in zip(*mul):
×
100
            fun_expr = _fun_latex(fun)
×
101
            if n != 1:
×
102
                mul_expr.append(fun_expr + "^{" + f"{n}" + "}")
×
103
            else:
104
                mul_expr.append(fun_expr)
×
105
        sum_expr.append(''.join(mul_expr))
×
106

107
    ret = sum_expr[0]
×
108
    for expr in sum_expr[1:]:
×
109
        if expr[0] == '-':
×
110
            ret += expr
×
111
        else:
112
            ret += "+" + expr
×
113
    return ret
×
114

115

116
class Waveform:
12✔
117
    __slots__ = ('bounds', 'seq', 'max', 'min', 'start', 'stop', 'sample_rate',
12✔
118
                 'filters', 'label')
119

120
    def __init__(self, bounds=(+inf, ), seq=(_zero, ), min=-inf, max=inf):
12✔
121
        self.bounds = bounds
12✔
122
        self.seq = seq
12✔
123
        self.max = max
12✔
124
        self.min = min
12✔
125
        self.start = None
12✔
126
        self.stop = None
12✔
127
        self.sample_rate = None
12✔
128
        self.filters: tuple[np.ndarray, float] | None = None
12✔
129
        self.label = None
12✔
130

131
    @staticmethod
12✔
132
    def _begin(bounds, seq):
12✔
133
        for i, s in enumerate(seq):
×
134
            if s is not _zero:
×
135
                if i == 0:
×
136
                    return -inf
×
137
                return bounds[i - 1]
×
138
        return inf
×
139

140
    @staticmethod
12✔
141
    def _end(bounds, seq):
12✔
142
        N = len(bounds)
×
143
        for i, s in enumerate(seq[::-1]):
×
144
            if s is not _zero:
×
145
                if i == 0:
×
146
                    return inf
×
147
                return bounds[N - i - 1]
×
148
        return -inf
×
149

150
    @property
12✔
151
    def begin(self):
12✔
152
        if self.start is None:
×
153
            return self._begin(self.bounds, self.seq)
×
154
        else:
155
            return max(self.start, self._begin(self.bounds, self.seq))
×
156

157
    @property
12✔
158
    def end(self):
12✔
159
        if self.stop is None:
×
160
            return self._end(self.bounds, self.seq)
×
161
        else:
162
            return min(self.stop, self._end(self.bounds, self.seq))
×
163

164
    def sample(
12✔
165
        self,
166
        sample_rate=None,
167
        out: np.ndarray | None = None,
168
        chunk_size=None,
169
        function_lib=None,
170
        filters: tuple[np.ndarray, float] | None = None
171
    ) -> np.ndarray | Iterable[np.ndarray]:
172
        if sample_rate is None:
12✔
173
            sample_rate = self.sample_rate
12✔
174
        if self.start is None or self.stop is None or sample_rate is None:
12✔
175
            raise ValueError(
×
176
                f'Waveform is not initialized. {self.start=}, {self.stop=}, {sample_rate=}'
177
            )
178
        if filters is None:
12✔
179
            filters = self.filters
12✔
180
        if chunk_size is None:
12✔
181
            x = np.arange(self.start, self.stop, 1 / sample_rate)
12✔
182
            sig = self.__call__(x, out=out, function_lib=function_lib)
12✔
183
            if filters is not None:
12✔
184
                sos, initial = filters
12✔
185
                if not isinstance(sos, np.ndarray):
12✔
186
                    sos = np.array(sos)
×
187
                elif not sos.flags.writeable:
12✔
188
                    sos = sos.copy()
×
189
                if initial:
12✔
NEW
190
                    sig = cast(np.ndarray, sosfilt(sos,
×
191
                                                   sig - initial)) + initial
192
                else:
193
                    sig = cast(np.ndarray, sosfilt(sos, sig))
12✔
194
            return cast(np.ndarray, sig)
12✔
195
        else:
196
            return self._sample_iter(sample_rate, chunk_size, out,
×
197
                                     function_lib, filters)
198

199
    def _sample_iter(
12✔
200
        self, sample_rate, chunk_size, out: np.ndarray | None, function_lib,
201
        filters: tuple[np.ndarray, float] | None
202
    ) -> Generator[np.ndarray, None, None]:
NEW
203
        start = cast(float, self.start)
×
204
        start_n = 0
×
205
        if filters is not None:
×
206
            sos, initial = filters
×
207
            if not isinstance(sos, np.ndarray):
×
208
                sos = np.array(sos)
×
209
            elif not sos.flags.writeable:
×
210
                sos = sos.copy()
×
211
            # zi = sosfilt_zi(sos)
212
            zi = np.zeros((sos.shape[0], 2))
×
213
        length = chunk_size / sample_rate
×
NEW
214
        while start < cast(float, self.stop):
×
NEW
215
            if start + length > cast(float, self.stop):
×
NEW
216
                length = cast(float, self.stop) - start
×
NEW
217
                stop = cast(float, self.stop)
×
218
                size = round((stop - start) * sample_rate)
×
219
            else:
220
                stop = start + length
×
221
                size = chunk_size
×
222
            x = np.linspace(start, stop, size, endpoint=False)
×
223

224
            if filters is None:
×
225
                if out is not None:
×
NEW
226
                    yield cast(
×
227
                        np.ndarray,
228
                        self.__call__(x,
229
                                      out=out[start_n:],
230
                                      function_lib=function_lib))
231
                else:
NEW
232
                    yield cast(np.ndarray,
×
233
                               self.__call__(x, function_lib=function_lib))
234
            else:
NEW
235
                sig = cast(np.ndarray,
×
236
                           self.__call__(x, function_lib=function_lib))
237
                if initial:
×
238
                    sig -= initial
×
239
                sig, zi = sosfilt(sos, sig, zi=zi)
×
240
                if initial:
×
241
                    sig += initial
×
242
                if out is not None:
×
243
                    out[start_n:start_n + size] = sig
×
NEW
244
                yield cast(np.ndarray, sig)
×
245

246
            start = stop
×
247
            start_n += chunk_size
×
248

249
    @staticmethod
12✔
250
    def _tolist(bounds, seq, ret=None):
12✔
251
        if ret is None:
12✔
252
            ret = []
×
253
        ret.append(len(bounds))
12✔
254
        for seq, b in zip(seq, bounds):
12✔
255
            ret.append(b)
12✔
256
            tlist, amplist = seq
12✔
257
            ret.append(len(amplist))
12✔
258
            for t, amp in zip(tlist, amplist):
12✔
259
                ret.append(amp)
12✔
260
                mtlist, nlist = t
12✔
261
                ret.append(len(nlist))
12✔
262
                for fun, n in zip(mtlist, nlist):
12✔
263
                    ret.append(n)
12✔
264
                    ret.append(len(fun))
12✔
265
                    ret.extend(fun)
12✔
266
        return ret
12✔
267

268
    @staticmethod
12✔
269
    def _fromlist(l, pos=0):
12✔
270

271
        def _read(l, pos, size):
12✔
272
            try:
12✔
273
                return tuple(l[pos:pos + size]), pos + size
12✔
274
            except:
×
275
                raise ValueError('Invalid waveform format')
×
276

277
        (nseg, ), pos = _read(l, pos, 1)
12✔
278
        bounds = []
12✔
279
        seq = []
12✔
280
        for _ in range(nseg):
12✔
281
            (b, nsum), pos = _read(l, pos, 2)
12✔
282
            bounds.append(b)
12✔
283
            amp = []
12✔
284
            t = []
12✔
285
            for _ in range(nsum):
12✔
286
                (a, nmul), pos = _read(l, pos, 2)
12✔
287
                amp.append(a)
12✔
288
                nlst = []
12✔
289
                mt = []
12✔
290
                for _ in range(nmul):
12✔
291
                    (n, nfun), pos = _read(l, pos, 2)
12✔
292
                    nlst.append(n)
12✔
293
                    fun, pos = _read(l, pos, nfun)
12✔
294
                    mt.append(fun)
12✔
295
                t.append((tuple(mt), tuple(nlst)))
12✔
296
            seq.append((tuple(t), tuple(amp)))
12✔
297

298
        return tuple(bounds), tuple(seq), pos
12✔
299

300
    def tolist(self):
12✔
301
        l = [self.max, self.min, self.start, self.stop, self.sample_rate]
12✔
302
        if self.filters is None:
12✔
303
            l.append(None)
12✔
304
        else:
305
            sos, initial = self.filters
12✔
306
            sos = list(sos.reshape(-1))
12✔
307
            l.append(len(sos))
12✔
308
            l.extend(sos)
12✔
309
            l.append(initial)
12✔
310

311
        return self._tolist(self.bounds, self.seq, l)
12✔
312

313
    @classmethod
12✔
314
    def fromlist(cls, l):
12✔
315
        w = cls()
12✔
316
        pos = 6
12✔
317
        (w.max, w.min, w.start, w.stop, w.sample_rate, sos_size) = l[:pos]
12✔
318
        if sos_size is not None:
12✔
319
            sos = np.array(l[pos:pos + sos_size]).reshape(-1, 6)
12✔
320
            pos += sos_size
12✔
321
            initial = l[pos]
12✔
322
            pos += 1
12✔
323
            w.filters = sos, initial
12✔
324

325
        w.bounds, w.seq, pos = cls._fromlist(l, pos)
12✔
326
        return w
12✔
327

328
    def totree(self):
12✔
329
        if self.filters is None:
12✔
330
            header = (self.max, self.min, self.start, self.stop,
12✔
331
                      self.sample_rate, None)
332
        else:
333
            header = (self.max, self.min, self.start, self.stop,
12✔
334
                      self.sample_rate, self.filters)
335
        body = []
12✔
336

337
        for seq, b in zip(self.seq, self.bounds):
12✔
338
            tlist, amplist = seq
12✔
339
            new_seq = []
12✔
340
            for t, amp in zip(tlist, amplist):
12✔
341
                mtlist, nlist = t
12✔
342
                new_t = []
12✔
343
                for fun, n in zip(mtlist, nlist):
12✔
344
                    new_t.append((n, fun))
12✔
345
                new_seq.append((amp, tuple(new_t)))
12✔
346
            body.append((b, tuple(new_seq)))
12✔
347
        return header, tuple(body)
12✔
348

349
    @staticmethod
12✔
350
    def fromtree(tree):
12✔
351
        w = Waveform()
12✔
352
        header, body = tree
12✔
353

354
        (w.max, w.min, w.start, w.stop, w.sample_rate, w.filters) = header
12✔
355
        bounds = []
12✔
356
        seqs = []
12✔
357
        for b, seq in body:
12✔
358
            bounds.append(b)
12✔
359
            amp_list = []
12✔
360
            t_list = []
12✔
361
            for amp, t in seq:
12✔
362
                amp_list.append(amp)
12✔
363
                n_list = []
12✔
364
                mt_list = []
12✔
365
                for n, mt in t:
12✔
366
                    n_list.append(n)
12✔
367
                    mt_list.append(mt)
12✔
368
                t_list.append((tuple(mt_list), tuple(n_list)))
12✔
369
            seqs.append((tuple(t_list), tuple(amp_list)))
12✔
370
        w.bounds = tuple(bounds)
12✔
371
        w.seq = tuple(seqs)
12✔
372
        return w
12✔
373

374
    def simplify(self, eps=1e-15):
12✔
375
        seq = [simplify(self.seq[0], eps)]
12✔
376
        bounds = [self.bounds[0]]
12✔
377
        for expr, b in zip(self.seq[1:], self.bounds[1:]):
12✔
378
            expr = simplify(expr, eps)
12✔
379
            if expr == seq[-1]:
12✔
380
                seq.pop()
×
381
                bounds.pop()
×
382
            seq.append(expr)
12✔
383
            bounds.append(b)
12✔
384
        return Waveform(tuple(bounds), tuple(seq))
12✔
385

386
    def filter(self, low=0, high=inf, eps=1e-15):
12✔
387
        seq = []
×
388
        for expr in self.seq:
×
389
            seq.append(filter(expr, low, high, eps))
×
390
        return Waveform(self.bounds, tuple(seq))
×
391

392
    def _comb(self, other, oper):
12✔
393
        return Waveform(*merge_waveform(self.bounds, self.seq, other.bounds,
12✔
394
                                        other.seq, oper))
395

396
    def __pow__(self, n):
12✔
397
        return Waveform(self.bounds, tuple(pow(w, n) for w in self.seq))
12✔
398

399
    def __add__(self, other):
12✔
400
        if isinstance(other, Waveform):
12✔
401
            return self._comb(other, add)
12✔
402
        else:
403
            return self + const(other)
12✔
404

405
    def __radd__(self, v):
12✔
406
        return const(v) + self
×
407

408
    def __ior__(self, other):
12✔
409
        return self | other
×
410

411
    def __or__(self, other):
12✔
412
        if isinstance(other, (int, float, complex)):
×
413
            other = const(other)
×
414
        w = self.marker + other.marker
×
415

416
        def _or(a, b):
×
417
            if a != _zero or b != _zero:
×
418
                return _one
×
419
            else:
420
                return _zero
×
421

422
        return self._comb(other, _or)
×
423

424
    def __iand__(self, other):
12✔
425
        return self & other
×
426

427
    def __and__(self, other):
12✔
428
        if isinstance(other, (int, float, complex)):
×
429
            other = const(other)
×
430
        w = self.marker + other.marker
×
431

432
        def _and(a, b):
×
433
            if a != _zero and b != _zero:
×
434
                return _one
×
435
            else:
436
                return _zero
×
437

438
        return self._comb(other, _and)
×
439

440
    @property
12✔
441
    def marker(self):
12✔
442
        w = self.simplify()
×
443
        return Waveform(w.bounds,
×
444
                        tuple(_zero if s == _zero else _one for s in w.seq))
445

446
    def mask(self, edge=0):
12✔
447
        w = self.marker
×
448
        in_wave = w.seq[0] == _zero
×
449
        bounds = []
×
450
        seq = []
×
451

452
        if w.seq[0] == _zero:
×
453
            in_wave = False
×
454
            b = w.bounds[0] - edge
×
455
            bounds.append(b)
×
456
            seq.append(_zero)
×
457

458
        for b, s in zip(w.bounds[1:], w.seq[1:]):
×
459
            if not in_wave and s != _zero:
×
460
                in_wave = True
×
461
                bounds.append(b + edge)
×
462
                seq.append(_one)
×
463
            elif in_wave and s == _zero:
×
464
                in_wave = False
×
465
                b = b - edge
×
466
                if b > bounds[-1]:
×
467
                    bounds.append(b)
×
468
                    seq.append(_zero)
×
469
                else:
470
                    bounds.pop()
×
471
                    bounds.append(b)
×
472
        return Waveform(tuple(bounds), tuple(seq))
×
473

474
    def __mul__(self, other):
12✔
475
        if isinstance(other, Waveform):
12✔
476
            return self._comb(other, mul)
12✔
477
        else:
478
            return self * const(other)
×
479

480
    def __rmul__(self, v):
12✔
481
        return const(v) * self
12✔
482

483
    def __truediv__(self, other):
12✔
484
        if isinstance(other, Waveform):
12✔
485
            raise TypeError('division by waveform')
×
486
        else:
487
            return self * const(1 / other)
12✔
488

489
    def __neg__(self):
12✔
490
        return -1 * self
12✔
491

492
    def __sub__(self, other):
12✔
493
        return self + (-other)
12✔
494

495
    def __rsub__(self, v):
12✔
496
        return v + (-self)
×
497

498
    def __rshift__(self, time):
12✔
499
        return Waveform(
12✔
500
            tuple(round(bound + time, NDIGITS) for bound in self.bounds),
501
            tuple(shift(expr, time) for expr in self.seq))
502

503
    def __lshift__(self, time):
12✔
504
        return self >> (-time)
12✔
505

506
    @staticmethod
12✔
507
    def _merge_parts(
12✔
508
        parts: list[tuple[int, int, np.ndarray | int | float | complex]],
509
        out: list[tuple[int, int, np.ndarray | int | float | complex]]
510
    ) -> list[tuple[int, int, np.ndarray | int | float | complex]]:
511
        # TODO: merge parts
512
        raise NotImplementedError
×
513

514
    @staticmethod
12✔
515
    def _fill_parts(parts, out):
12✔
516
        for start, stop, part in parts:
12✔
517
            out[start:stop] += part
12✔
518

519
    def __call__(
12✔
520
        self,
521
        x,
522
        frag=False,
523
        out: np.ndarray | None = None,
524
        accumulate=False,
525
        function_lib=None
526
    ) -> NDArray[np.float64] | list[tuple[int, int,
527
                                          NDArray[np.float64]]] | np.float64:
528
        if function_lib is None:
12✔
529
            function_lib = _baseFunc
12✔
530
        if isinstance(x, (int, float, complex)):
12✔
NEW
531
            return cast(
×
532
                NDArray[np.float64],
533
                self.__call__(np.array([x]), function_lib=function_lib))[0]
534
        parts, dtype = calc_parts(self.bounds, self.seq, x, function_lib,
12✔
535
                                  self.min, self.max)
536
        if not frag:
12✔
537
            if out is None:
12✔
538
                out = np.zeros_like(x, dtype=dtype)
12✔
539
            elif not accumulate:
×
540
                out *= 0
×
541
            self._fill_parts(parts, out)
12✔
542
        else:
543
            if out is None:
×
544
                return parts
×
545
            else:
546
                if not accumulate:
×
547
                    out.clear()
×
548
                    out.extend(parts)
×
549
                else:
550
                    self._merge_parts(parts, out)
×
551
        return out
12✔
552

553
    def __hash__(self):
12✔
554
        return hash((self.max, self.min, self.start, self.stop,
×
555
                     self.sample_rate, self.bounds, self.seq))
556

557
    def __eq__(self, o: object) -> bool:
12✔
558
        if isinstance(o, (int, float, complex)):
12✔
559
            return self == const(o)
12✔
560
        elif isinstance(o, Waveform):
12✔
561
            a = self.simplify()
12✔
562
            b = o.simplify()
12✔
563
            return a.seq == b.seq and a.bounds == b.bounds and (
12✔
564
                a.max, a.min, a.start, a.stop) == (b.max, b.min, b.start,
565
                                                   b.stop)
566
        else:
567
            return False
×
568

569
    def _repr_latex_(self):
12✔
570
        parts = []
×
571
        start = -np.inf
×
572
        for end, wav in zip(self.bounds, self.seq):
×
573
            e_str = _wav_latex(wav)
×
574
            start_str = _num_latex(start)
×
575
            end_str = _num_latex(end)
×
576
            parts.append(e_str + r",~~&t\in" + f"({start_str},{end_str}" +
×
577
                         (']' if end < np.inf else ')'))
578
            start = end
×
579
        if len(parts) == 1:
×
580
            expr = ''.join(['f(t)=', *parts[0].split('&')])
×
581
        else:
582
            expr = '\n'.join([
×
583
                r"f(t)=\begin{cases}", (r"\\" + '\n').join(parts),
584
                r"\end{cases}"
585
            ])
586
        return "$$\n{}\n$$".format(expr)
×
587

588
    def _play(self, time_unit, volume=1.0):
12✔
589
        import pyaudio
×
590

591
        CHUNK = 1024
×
592
        RATE = 48000
×
593

594
        dynamic_volume = 1.0
×
595
        amp = 2**15 * 0.999 * volume * dynamic_volume
×
596

597
        p = pyaudio.PyAudio()
×
598
        try:
×
599
            stream = p.open(format=pyaudio.paInt16,
×
600
                            channels=1,
601
                            rate=RATE,
602
                            output=True)
603
            try:
×
604
                for data in self.sample(sample_rate=RATE / time_unit,
×
605
                                        chunk_size=CHUNK):
606
                    lim = np.abs(data).max()
×
607
                    if lim > 0 and dynamic_volume > 1.0 / lim:
×
608
                        dynamic_volume = 1.0 / lim
×
609
                        amp = 2**15 * 0.99 * volume * dynamic_volume
×
610
                    data = (amp * data).astype(np.int16)
×
611
                    stream.write(bytes(data.data))
×
612
            finally:
613
                stream.stop_stream()
×
614
                stream.close()
×
615
        finally:
616
            p.terminate()
×
617

618
    def play(self, time_unit=1, volume=1.0):
12✔
619
        import multiprocessing as mp
×
620
        p = mp.Process(target=self._play,
×
621
                       args=(time_unit, volume),
622
                       daemon=True)
623
        p.start()
×
624

625

626
class WaveVStack(Waveform):
12✔
627

628
    def __init__(self, wlist: list[Waveform] = []):
12✔
629
        self.wlist = [(w.bounds, w.seq) for w in wlist]
12✔
630
        self.start = None
12✔
631
        self.stop = None
12✔
632
        self.sample_rate = None
12✔
633
        self.offset = 0
12✔
634
        self.shift = 0
12✔
635
        self.filters = None
12✔
636
        self.label = None
12✔
637
        self.function_lib = None
12✔
638

639
    def __begin(self):
12✔
640
        if self.wlist:
×
641
            v = [self._begin(bounds, seq) for bounds, seq in self.wlist]
×
642
            return min(v)
×
643
        else:
644
            return -inf
×
645

646
    def __end(self):
12✔
647
        if self.wlist:
×
648
            v = [self._end(bounds, seq) for bounds, seq in self.wlist]
×
649
            return max(v)
×
650
        else:
651
            return inf
×
652

653
    @property
12✔
654
    def begin(self):
12✔
655
        if self.start is None:
×
656
            return self.__begin()
×
657
        else:
658
            return max(self.start, self.__begin())
×
659

660
    @property
12✔
661
    def end(self):
12✔
662
        if self.stop is None:
×
663
            return self.__end()
×
664
        else:
665
            return min(self.stop, self.__end())
×
666

667
    def __call__(self, x, frag=False, out=None, function_lib=None):
12✔
668
        assert frag is False, 'WaveVStack does not support frag mode'
12✔
669
        out = np.full_like(x, self.offset, dtype=complex)
12✔
670
        if self.shift != 0:
12✔
671
            x = x - self.shift
12✔
672
        if function_lib is None:
12✔
673
            if self.function_lib is None:
12✔
674
                function_lib = _baseFunc
12✔
675
            else:
676
                function_lib = self.function_lib
×
677
        for bounds, seq in self.wlist:
12✔
678
            parts, dtype = calc_parts(bounds, seq, x, function_lib)
12✔
679
            self._fill_parts(parts, out)
12✔
680
        return out.real
12✔
681

682
    def tolist(self):
12✔
683
        l = [
12✔
684
            self.start,
685
            self.stop,
686
            self.offset,
687
            self.shift,
688
            self.sample_rate,
689
        ]
690
        if self.filters is None:
12✔
691
            l.append(None)
12✔
692
        else:
693
            sos, initial = self.filters
12✔
694
            sos = list(sos.reshape(-1))
12✔
695
            l.append(len(sos))
12✔
696
            l.extend(sos)
12✔
697
            l.append(initial)
12✔
698
        l.append(len(self.wlist))
12✔
699
        for bounds, seq in self.wlist:
12✔
700
            self._tolist(bounds, seq, l)
12✔
701
        return l
12✔
702

703
    @classmethod
12✔
704
    def fromlist(cls, l):
12✔
705
        w = cls()
12✔
706
        pos = 6
12✔
707
        w.start, w.stop, w.offset, w.shift, w.sample_rate, sos_size = l[:pos]
12✔
708
        if sos_size is not None:
12✔
709
            sos = np.array(l[pos:pos + sos_size]).reshape(-1, 6)
12✔
710
            pos += sos_size
12✔
711
            initial = l[pos]
12✔
712
            pos += 1
12✔
713
            w.filters = sos, initial
12✔
714
        n = l[pos]
12✔
715
        pos += 1
12✔
716
        for _ in range(n):
12✔
717
            bounds, seq, pos = cls._fromlist(l, pos)
12✔
718
            w.wlist.append((bounds, seq))
12✔
719
        return w
12✔
720

721
    def simplify(self, eps=1e-15):
12✔
722
        if not self.wlist:
12✔
723
            return zero()
12✔
724
        bounds, seq = wave_sum(self.wlist)
12✔
725
        wav = Waveform(bounds=bounds, seq=seq)
12✔
726
        if self.offset != 0:
12✔
727
            wav += self.offset
×
728
        if self.shift != 0:
12✔
729
            wav >>= self.shift
×
730
        wav = wav.simplify(eps)
12✔
731
        wav.start = self.start
12✔
732
        wav.stop = self.stop
12✔
733
        wav.sample_rate = self.sample_rate
12✔
734
        return wav
12✔
735

736
    @staticmethod
12✔
737
    def _rshift(wlist, time):
12✔
738
        if time == 0:
×
739
            return wlist
×
740
        return [(tuple(round(bound + time, NDIGITS) for bound in bounds),
×
741
                 tuple(shift(expr, time) for expr in seq))
742
                for bounds, seq in wlist]
743

744
    def __rshift__(self, time):
12✔
745
        ret = WaveVStack()
12✔
746
        ret.wlist = self.wlist
12✔
747
        ret.sample_rate = self.sample_rate
12✔
748
        ret.start = self.start
12✔
749
        ret.stop = self.stop
12✔
750
        ret.shift = self.shift + time
12✔
751
        ret.offset = self.offset
12✔
752
        return ret
12✔
753

754
    def __add__(self, other):
12✔
755
        ret = WaveVStack()
12✔
756
        ret.wlist.extend(self.wlist)
12✔
757
        if isinstance(other, WaveVStack):
12✔
758
            if other.shift != self.shift:
×
759
                ret.wlist = self._rshift(ret.wlist, self.shift)
×
760
                ret.wlist.extend(self._rshift(other.wlist, other.shift))
×
761
            else:
762
                ret.wlist.extend(other.wlist)
×
763
            ret.offset = self.offset + other.offset
×
764
        elif isinstance(other, Waveform):
12✔
765
            other <<= self.shift
12✔
766
            ret.wlist.append((other.bounds, other.seq))
12✔
767
        else:
768
            # ret.wlist.append(((+inf, ), (_const(1.0 * other), )))
769
            ret.offset += other
12✔
770
        return ret
12✔
771

772
    def __radd__(self, v):
12✔
773
        return self + v
×
774

775
    def __mul__(self, other):
12✔
776
        if isinstance(other, Waveform):
12✔
777
            other = other.simplify() << self.shift
12✔
778
            ret = WaveVStack([Waveform(*w) * other for w in self.wlist])
12✔
779
            if self.offset != 0:
12✔
780
                w = other * self.offset
×
781
                ret.wlist.append((w.bounds, w.seq))
×
782
            return ret
12✔
783
        else:
784
            ret = WaveVStack([Waveform(*w) * other for w in self.wlist])
×
785
            ret.offset = self.offset * other
×
786
            return ret
×
787

788
    def __rmul__(self, v):
12✔
789
        return self * v
×
790

791
    def __eq__(self, other):
12✔
792
        if self.wlist:
×
793
            return False
×
794
        else:
795
            return zero() == other
×
796

797
    def _repr_latex_(self):
12✔
798
        return r"\sum_{i=1}^{" + f"{len(self.wlist)}" + r"}" + r"f_i(t)"
×
799

800
    def __getstate__(self) -> tuple:
12✔
801
        function_lib = self.function_lib
×
802
        if function_lib:
×
803
            try:
×
804
                import dill
×
805
                function_lib = dill.dumps(function_lib)
×
806
            except:
×
807
                function_lib = None
×
808
        return (self.wlist, self.start, self.stop, self.sample_rate,
×
809
                self.offset, self.shift, self.filters, self.label,
810
                function_lib)
811

812
    def __setstate__(self, state: tuple) -> None:
12✔
813
        (self.wlist, self.start, self.stop, self.sample_rate, self.offset,
×
814
         self.shift, self.filters, self.label, function_lib) = state
815
        if function_lib:
×
816
            try:
×
817
                import dill
×
818
                function_lib = dill.loads(function_lib)
×
819
            except:
×
820
                function_lib = None
×
821
        self.function_lib = function_lib
×
822

823

824
def play(data, rate=48000):
12✔
825
    import io
×
826

827
    import pyaudio
×
828

829
    CHUNK = 1024
×
830

831
    max_amp = np.max(np.abs(data))
×
832

833
    if max_amp > 1:
×
834
        data /= max_amp
×
835

836
    data = np.array(2**15 * 0.999 * data, dtype=np.int16)
×
837
    buff = io.BytesIO(data.data)
×
838
    p = pyaudio.PyAudio()
×
839

840
    try:
×
841
        stream = p.open(format=pyaudio.paInt16,
×
842
                        channels=1,
843
                        rate=rate,
844
                        output=True)
845
        try:
×
846
            while True:
×
847
                data = buff.read(CHUNK)
×
848
                if data:
×
849
                    stream.write(data)
×
850
                else:
851
                    break
×
852
        finally:
853
            stream.stop_stream()
×
854
            stream.close()
×
855
    finally:
856
        p.terminate()
×
857

858

859
_zero_waveform = Waveform()
12✔
860
_one_waveform = Waveform(seq=(_one, ))
12✔
861

862

863
def zero():
12✔
864
    return _zero_waveform
12✔
865

866

867
def one():
12✔
868
    return _one_waveform
12✔
869

870

871
def const(c):
12✔
872
    return Waveform(seq=(_const(1.0 * c), ))
12✔
873

874

875
# register base function
876
def _format_LINEAR(shift, *args):
12✔
877
    if shift != 0:
×
878
        shift = _num_latex(-shift)
×
879
        if shift[0] == '-':
×
880
            return f"(t{shift})"
×
881
        else:
882
            return f"(t+{shift})"
×
883
    else:
884
        return 't'
×
885

886

887
def _format_GAUSSIAN(shift, *args):
12✔
888
    sigma = _num_latex(args[0] / np.sqrt(2))
×
889
    shift = _num_latex(-shift)
×
890
    if shift != '0':
×
891
        if shift[0] != '-':
×
892
            shift = '+' + shift
×
893
        if sigma == '1':
×
894
            return ('\\exp\\left[-\\frac{\\left(t' + shift +
×
895
                    '\\right)^2}{2}\\right]')
896
        else:
897
            return ('\\exp\\left[-\\frac{1}{2}\\left(\\frac{t' + shift + '}{' +
×
898
                    sigma + '}\\right)^2\\right]')
899
    else:
900
        if sigma == '1':
×
901
            return ('\\exp\\left(-\\frac{t^2}{2}\\right)')
×
902
        else:
903
            return ('\\exp\\left[-\\frac{1}{2}\\left(\\frac{t}{' + sigma +
×
904
                    '}\\right)^2\\right]')
905

906

907
def _format_SINC(shift, *args):
12✔
908
    shift = _num_latex(-shift)
×
909
    bw = _num_latex(args[0])
×
910
    if shift != '0':
×
911
        if shift[0] != '-':
×
912
            shift = '+' + shift
×
913
        if bw == '1':
×
914
            return '\\mathrm{sinc}(t' + shift + ')'
×
915
        else:
916
            return '\\mathrm{sinc}[' + bw + '(t' + shift + ')]'
×
917
    else:
918
        if bw == '1':
×
919
            return '\\mathrm{sinc}(t)'
×
920
        else:
921
            return '\\mathrm{sinc}(' + bw + 't)'
×
922

923

924
def _format_COSINE(shift, *args):
12✔
925
    freq = args[0] / 2 / np.pi
×
926
    phase = -shift * freq
×
927
    freq = _num_latex(freq)
×
928
    if freq == '1':
×
929
        freq = ''
×
930
    phase = _num_latex(phase)
×
931
    if phase == '0':
×
932
        phase = ''
×
933
    elif phase[0] != '-':
×
934
        phase = '+' + phase
×
935
    if phase != '':
×
936
        return f'\\cos\\left[2\\pi\\left({freq}t{phase}\\right)\\right]'
×
937
    elif freq != '':
×
938
        return f'\\cos\\left(2\\pi\\times {freq}t\\right)'
×
939
    else:
940
        return '\\cos\\left(2\\pi t\\right)'
×
941

942

943
def _format_ERF(shift, *args):
12✔
944
    if shift > 0:
×
945
        return '\\mathrm{erf}(\\frac{t-' + f"{_num_latex(shift)}" + '}{' + f'{args[0]:g}' + '})'
×
946
    elif shift < 0:
×
947
        return '\\mathrm{erf}(\\frac{t+' + f"{_num_latex(-shift)}" + '}{' + f'{args[0]:g}' + '})'
×
948
    else:
949
        return '\\mathrm{erf}(\\frac{t}{' + f'{args[0]:g}' + '})'
×
950

951

952
def _format_COSH(shift, *args):
12✔
953
    if shift > 0:
×
954
        return '\\cosh(\\frac{t-' + f"{_num_latex(shift)}" + '}{' + f'{1/args[0]:g}' + '})'
×
955
    elif shift < 0:
×
956
        return '\\cosh(\\frac{t+' + f"{_num_latex(-shift)}" + '}{' + f'{1/args[0]:g}' + '})'
×
957
    else:
958
        return '\\cosh(\\frac{t}{' + f'{1/args[0]:g}' + '})'
×
959

960

961
def _format_SINH(shift, *args):
12✔
962
    if shift > 0:
×
963
        return '\\sinh(\\frac{t-' + f"{_num_latex(shift)}" + '}{' + f'{args[0]:g}' + '})'
×
964
    elif shift < 0:
×
965
        return '\\sinh(\\frac{t+' + f"{_num_latex(-shift)}" + '}{' + f'{args[0]:g}' + '})'
×
966
    else:
967
        return '\\sinh(\\frac{t}{' + f'{args[0]:g}' + '})'
×
968

969

970
def _format_EXP(shift, *args):
12✔
971
    if _num_latex(shift) and shift > 0:
×
972
        return '\\exp\\left(-' + f'{args[0]:g}' + '\\left(t-' + f"{_num_latex(shift)}" + '\\right)\\right)'
×
973
    elif _num_latex(-shift) and shift < 0:
×
974
        return '\\exp\\left(-' + f'{args[0]:g}' + '\\left(t+' + f"{_num_latex(-shift)}" + '\\right)\\right)'
×
975
    else:
976
        return '\\exp\\left(-' + f'{args[0]:g}' + 't\\right)'
×
977

978

979
def _format_DRAG(shift, *args):
12✔
980
    return f"DRAG(...)"
×
981

982

983
def _format_MOLLIFIER(shift, *args):
12✔
NEW
984
    r = _num_latex(args[0])
×
NEW
985
    d = _num_latex(args[1])
×
NEW
986
    shift_str = _num_latex(-shift)
×
NEW
987
    if shift_str == '0':
×
NEW
988
        shift_str = ''
×
NEW
989
    elif shift_str[0] != '-':
×
NEW
990
        shift_str = '+' + shift_str
×
991

NEW
992
    if d == '0':
×
NEW
993
        return f"\\mathrm{{Mollifier}}\\left(t{shift_str}, r={r}\\right)"
×
NEW
994
    elif d == '1':
×
NEW
995
        return f"\\mathrm{{Mollifier}}'\\left(t{shift_str}, r={r}\\right)"
×
NEW
996
    elif d == '2':
×
NEW
997
        return f"\\mathrm{{Mollifier}}''\\left(t{shift_str}, r={r}\\right)"
×
998
    else:
NEW
999
        return f"\\mathrm{{Mollifier}}^{{({d})}}\\left(t{shift_str}, r={r}\\right)"
×
1000

1001

1002
registerBaseFuncLatex(LINEAR, _format_LINEAR)
12✔
1003
registerBaseFuncLatex(GAUSSIAN, _format_GAUSSIAN)
12✔
1004
registerBaseFuncLatex(ERF, _format_ERF)
12✔
1005
registerBaseFuncLatex(COS, _format_COSINE)
12✔
1006
registerBaseFuncLatex(SINC, _format_SINC)
12✔
1007
registerBaseFuncLatex(EXP, _format_EXP)
12✔
1008
registerBaseFuncLatex(COSH, _format_COSH)
12✔
1009
registerBaseFuncLatex(SINH, _format_SINH)
12✔
1010
registerBaseFuncLatex(DRAG, _format_DRAG)
12✔
1011
registerBaseFuncLatex(MOLLIFIER, _format_MOLLIFIER)
12✔
1012

1013

1014
def D(wav: Waveform, d: int = 1) -> Waveform:
12✔
1015
    """derivative
1016

1017
    Parameters
1018
    ----------
1019
    wav : Waveform
1020
        The waveform to take the derivative of.
1021
    d : int, optional
1022
        The order of the derivative, by default 1.
1023
    """
NEW
1024
    assert d >= 0 and isinstance(d, int), "d must be a non-negative integer"
×
NEW
1025
    if d == 0:
×
NEW
1026
        return wav
×
NEW
1027
    elif d == 1:
×
NEW
1028
        return Waveform(bounds=wav.bounds, seq=tuple(_D(x) for x in wav.seq))
×
1029
    else:
NEW
1030
        return D(D(wav, d - 1), 1)
×
1031

1032

1033
def convolve(a, b):
12✔
1034
    pass
×
1035

1036

1037
def sign():
12✔
1038
    return Waveform(bounds=(0, +inf), seq=(_const(-1), _one))
×
1039

1040

1041
def step(edge, type='erf'):
12✔
1042
    """
1043
    type: "erf", "cos", "linear"
1044
    """
1045
    if edge == 0:
12✔
1046
        return Waveform(bounds=(0, +inf), seq=(_zero, _one))
12✔
1047
    if type == 'cos':
12✔
1048
        rise = add(_half,
×
1049
                   mul(_half, basic_wave(COS, pi / edge, shift=0.5 * edge)))
1050
        return Waveform(bounds=(round(-edge / 2,
×
1051
                                      NDIGITS), round(edge / 2,
1052
                                                      NDIGITS), +inf),
1053
                        seq=(_zero, rise, _one))
1054
    elif type == 'linear':
12✔
1055
        rise = add(_half, mul(_const(1 / edge), basic_wave(LINEAR)))
12✔
1056
        return Waveform(bounds=(round(-edge / 2,
12✔
1057
                                      NDIGITS), round(edge / 2,
1058
                                                      NDIGITS), +inf),
1059
                        seq=(_zero, rise, _one))
1060
    else:
1061
        std_sq2 = edge / 5
×
1062
        # rise = add(_half, mul(_half, basic_wave(ERF, std_sq2)))
1063
        rise = ((((), ()), (((ERF, std_sq2, 0), ), (1, ))), (0.5, 0.5))
×
1064
        return Waveform(bounds=(-round(edge, NDIGITS), round(edge,
×
1065
                                                             NDIGITS), +inf),
1066
                        seq=(_zero, rise, _one))
1067

1068

1069
def square(width, edge=0, type='erf'):
12✔
1070
    if width <= 0:
12✔
1071
        return zero()
×
1072
    if edge == 0:
12✔
1073
        return Waveform(bounds=(round(-0.5 * width,
12✔
1074
                                      NDIGITS), round(0.5 * width,
1075
                                                      NDIGITS), +inf),
1076
                        seq=(_zero, _one, _zero))
1077
    else:
1078
        return ((step(edge, type=type) << width / 2) -
12✔
1079
                (step(edge, type=type) >> width / 2))
1080

1081

1082
def gaussian(width, plateau=0.0):
12✔
1083
    if width <= 0 and plateau <= 0.0:
12✔
1084
        return zero()
×
1085
    # width is two times FWHM
1086
    # std_sq2 = width / (4 * np.sqrt(np.log(2)))
1087
    std_sq2 = width / 3.3302184446307908
12✔
1088
    # std is set to give total pulse area same as a square
1089
    # std_sq2 = width/np.sqrt(np.pi)
1090
    if round(0.5 * plateau, NDIGITS) <= 0.0:
12✔
1091
        return Waveform(bounds=(round(-0.75 * width,
12✔
1092
                                      NDIGITS), round(0.75 * width,
1093
                                                      NDIGITS), +inf),
1094
                        seq=(_zero, basic_wave(GAUSSIAN, std_sq2), _zero))
1095
    else:
1096
        return Waveform(bounds=(round(-0.75 * width - 0.5 * plateau,
×
1097
                                      NDIGITS), round(-0.5 * plateau, NDIGITS),
1098
                                round(0.5 * plateau, NDIGITS),
1099
                                round(0.75 * width + 0.5 * plateau,
1100
                                      NDIGITS), +inf),
1101
                        seq=(_zero,
1102
                             basic_wave(GAUSSIAN,
1103
                                        std_sq2,
1104
                                        shift=-0.5 * plateau), _one,
1105
                             basic_wave(GAUSSIAN, std_sq2,
1106
                                        shift=0.5 * plateau), _zero))
1107

1108

1109
def cos(w, phi=0):
12✔
1110
    if w == 0:
12✔
1111
        return const(np.cos(phi))
×
1112
    if w < 0:
12✔
1113
        phi = -phi
×
1114
        w = -w
×
1115
    return Waveform(seq=(basic_wave(COS, w, shift=-phi / w), ))
12✔
1116

1117

1118
def sin(w, phi=0):
12✔
1119
    if w == 0:
12✔
1120
        return const(np.sin(phi))
×
1121
    if w < 0:
12✔
1122
        phi = -phi + pi
×
1123
        w = -w
×
1124
    return Waveform(seq=(basic_wave(COS, w, shift=(pi / 2 - phi) / w), ))
12✔
1125

1126

1127
def exp(alpha):
12✔
1128
    if isinstance(alpha, complex):
12✔
1129
        if alpha.real == 0:
12✔
1130
            return cos(alpha.imag) + 1j * sin(alpha.imag)
×
1131
        else:
1132
            return exp(alpha.real) * (cos(alpha.imag) + 1j * sin(alpha.imag))
12✔
1133
    else:
1134
        return Waveform(seq=(basic_wave(EXP, alpha), ))
12✔
1135

1136

1137
def sinc(bw):
12✔
1138
    if bw <= 0:
×
1139
        return zero()
×
1140
    width = 100 / bw
×
1141
    return Waveform(bounds=(round(-0.5 * width,
×
1142
                                  NDIGITS), round(0.5 * width, NDIGITS), +inf),
1143
                    seq=(_zero, basic_wave(SINC, bw), _zero))
1144

1145

1146
def cosPulse(width, plateau=0.0):
12✔
1147
    # cos = basic_wave(COS, 2*np.pi/width)
1148
    # pulse = mul(add(cos, _one), _half)
1149
    if round(0.5 * plateau, NDIGITS) > 0:
×
1150
        return square(plateau + 0.5 * width, edge=0.5 * width, type='cos')
×
1151
    if width <= 0:
×
1152
        return zero()
×
1153
    pulse = ((((), ()), (((COS, 6.283185307179586 / width, 0), ), (1, ))),
×
1154
             (0.5, 0.5))
1155
    return Waveform(bounds=(round(-0.5 * width,
×
1156
                                  NDIGITS), round(0.5 * width, NDIGITS), +inf),
1157
                    seq=(_zero, pulse, _zero))
1158

1159

1160
def hanning(width, plateau=0.0):
12✔
1161
    return cosPulse(width, plateau=plateau)
×
1162

1163

1164
def cosh(w):
12✔
1165
    return Waveform(seq=(basic_wave(COSH, w), ))
×
1166

1167

1168
def sinh(w):
12✔
1169
    return Waveform(seq=(basic_wave(SINH, w), ))
×
1170

1171

1172
def coshPulse(width, eps=1.0, plateau=0.0):
12✔
1173
    """Cosine hyperbolic pulse with the following im
1174

1175
    pulse edge shape:
1176
            cosh(eps / 2) - cosh(eps * t / T)
1177
    f(t) = -----------------------------------
1178
                  cosh(eps / 2) - 1
1179
    where T is the pulse width and eps is the pulse edge steepness.
1180
    The pulse is defined for t in [-T/2, T/2].
1181

1182
    In case of plateau > 0, the pulse is defined as:
1183
           | f(t + plateau/2)   if t in [-T/2 - plateau/2, - plateau/2]
1184
    g(t) = | 1                  if t in [-plateau/2, plateau/2]
1185
           | f(t - plateau/2)   if t in [plateau/2, T/2 + plateau/2]
1186

1187
    Parameters
1188
    ----------
1189
    width : float
1190
        Pulse width.
1191
    eps : float
1192
        Pulse edge steepness.
1193
    plateau : float
1194
        Pulse plateau.
1195
    """
1196
    if width <= 0 and plateau <= 0:
×
1197
        return zero()
×
1198
    w = eps / width
×
1199
    A = np.cosh(eps / 2)
×
1200

1201
    if plateau == 0.0 or round(-0.5 * plateau, NDIGITS) == round(
×
1202
            0.5 * plateau, NDIGITS):
1203
        pulse = ((((), ()), (((COSH, w, 0), ), (1, ))), (A / (A - 1),
×
1204
                                                         -1 / (A - 1)))
1205
        return Waveform(bounds=(round(-0.5 * width,
×
1206
                                      NDIGITS), round(0.5 * width,
1207
                                                      NDIGITS), +inf),
1208
                        seq=(_zero, pulse, _zero))
1209
    else:
1210
        raising = ((((), ()), (((COSH, w, -0.5 * plateau), ), (1, ))),
×
1211
                   (A / (A - 1), -1 / (A - 1)))
1212
        falling = ((((), ()), (((COSH, w, 0.5 * plateau), ), (1, ))),
×
1213
                   (A / (A - 1), -1 / (A - 1)))
1214
        return Waveform(bounds=(round(-0.5 * width - 0.5 * plateau,
×
1215
                                      NDIGITS), round(-0.5 * plateau, NDIGITS),
1216
                                round(0.5 * plateau, NDIGITS),
1217
                                round(0.5 * width + 0.5 * plateau,
1218
                                      NDIGITS), +inf),
1219
                        seq=(_zero, raising, _one, falling, _zero))
1220

1221

1222
def general_cosine(duration, *arg):
12✔
1223
    wav = zero()
×
1224
    arg = np.asarray(arg)
×
1225
    arg /= arg[::2].sum()
×
1226
    for i, a in enumerate(arg, start=1):
×
1227
        wav += a / 2 * (1 - (-1)**i * cos(i * 2 * pi / duration))
×
1228
    return wav * square(duration)
×
1229

1230

1231
def slepian(duration, *arg):
12✔
1232
    wav = zero()
×
1233
    arg = np.asarray(arg)
×
1234
    arg /= arg[::2].sum()
×
1235
    for i, a in enumerate(arg, start=1):
×
1236
        wav += a / 2 * (1 - (-1)**i * cos(i * 2 * pi / duration))
×
1237
    return wav * square(duration)
×
1238

1239

1240
def mollifier(width, plateau: float = 0.0, d: int = 0):
12✔
1241
    """
1242
    Mollifier function is a smooth function that is 1 at the origin and 0 outside a certain radius.
1243
    It is defined as:
1244

1245
    f(x) = exp(1 / ((x / r) ^ 2 - 1) + 1)  in case |x| < r
1246
         = 0                           in case |x| >= r
1247
    where r = width / 2 is the radius of the mollifier.
1248

1249
    The parameter plateau is the width of the plateau.
1250
    The parameter d is the order of the derivative.
1251
    """
NEW
1252
    assert d >= 0 and isinstance(d, int), "d must be a non-negative integer"
×
NEW
1253
    assert width > 0, "width must be positive"
×
1254

NEW
1255
    if plateau <= 0:
×
NEW
1256
        return Waveform(bounds=(-0.5 * width, 0.5 * width, inf),
×
1257
                        seq=(_zero, basic_wave(MOLLIFIER, width / 2,
1258
                                               d), _zero))
1259
    else:
NEW
1260
        return Waveform(bounds=(-0.5 * width - 0.5 * plateau, -0.5 * plateau,
×
1261
                                0.5 * plateau, 0.5 * width + 0.5 * plateau,
1262
                                inf),
1263
                        seq=(_zero,
1264
                             basic_wave(MOLLIFIER,
1265
                                        width / 2,
1266
                                        d,
1267
                                        shift=-0.5 * plateau), _one,
1268
                             basic_wave(MOLLIFIER,
1269
                                        width / 2,
1270
                                        d,
1271
                                        shift=0.5 * plateau), _zero))
1272

1273

1274
def _poly(*a):
12✔
1275
    """
1276
    a[0] + a[1] * t + a[2] * t**2 + ...
1277
    """
1278
    t = []
12✔
1279
    amp = []
12✔
1280
    if a[0] != 0:
12✔
1281
        t.append(((), ()))
12✔
1282
        amp.append(a[0])
12✔
1283
    for n, a_ in enumerate(a[1:], start=1):
12✔
1284
        if a_ != 0:
12✔
1285
            t.append((((LINEAR, 0), ), (n, )))
12✔
1286
            amp.append(a_)
12✔
1287
    return tuple(t), tuple(a)
12✔
1288

1289

1290
def poly(a):
12✔
1291
    """
1292
    a[0] + a[1] * t + a[2] * t**2 + ...
1293
    """
1294
    return Waveform(seq=(_poly(*a), ))
12✔
1295

1296

1297
def t():
12✔
1298
    return Waveform(seq=((((LINEAR, 0), ), (1, )), (1, )))
×
1299

1300

1301
def drag(freq, width, plateau=0, delta=0, block_freq=None, phase=0, t0=0):
12✔
1302
    phase += pi * delta * (width + plateau)
×
1303
    if plateau <= 0:
×
1304
        return Waveform(seq=(_zero,
×
1305
                             basic_wave(DRAG, t0, freq, width, delta,
1306
                                        block_freq, phase), _zero),
1307
                        bounds=(round(t0, NDIGITS), round(t0 + width,
1308
                                                          NDIGITS), +inf))
1309
    elif width <= 0:
×
1310
        w = 2 * pi * (freq + delta)
×
1311
        return Waveform(
×
1312
            seq=(_zero,
1313
                 basic_wave(COS, w,
1314
                            shift=(phase + 2 * pi * delta * t0) / w), _zero),
1315
            bounds=(round(t0, NDIGITS), round(t0 + plateau, NDIGITS), +inf))
1316
    else:
1317
        w = 2 * pi * (freq + delta)
×
1318
        return Waveform(
×
1319
            seq=(_zero,
1320
                 basic_wave(DRAG, t0, freq, width, delta, block_freq, phase),
1321
                 basic_wave(COS, w, shift=(phase + 2 * pi * delta * t0) / w),
1322
                 basic_wave(DRAG, t0 + plateau, freq, width, delta, block_freq,
1323
                            phase - 2 * pi * delta * plateau), _zero),
1324
            bounds=(round(t0, NDIGITS), round(t0 + width / 2, NDIGITS),
1325
                    round(t0 + width / 2 + plateau,
1326
                          NDIGITS), round(t0 + width + plateau,
1327
                                          NDIGITS), +inf))
1328

1329

1330
def chirp(f0, f1, T, phi0=0, type='linear'):
12✔
1331
    """
1332
    A chirp is a signal in which the frequency increases (up-chirp)
1333
    or decreases (down-chirp) with time. In some sources, the term
1334
    chirp is used interchangeably with sweep signal.
1335

1336
    type: "linear", "exponential", "hyperbolic"
1337
    """
1338
    if f0 == f1:
12✔
1339
        return sin(f0, phi0)
×
1340
    if T <= 0:
12✔
1341
        raise ValueError('T must be positive')
×
1342

1343
    if type == 'linear':
12✔
1344
        # f(t) = f1 * (t/T) + f0 * (1 - t/T)
1345
        return Waveform(bounds=(0, round(T, NDIGITS), +inf),
12✔
1346
                        seq=(_zero, basic_wave(LINEARCHIRP, f0, f1, T,
1347
                                               phi0), _zero))
1348
    elif type in ['exp', 'exponential', 'geometric']:
12✔
1349
        # f(t) = f0 * (f1/f0) ** (t/T)
1350
        if f0 == 0:
12✔
1351
            raise ValueError('f0 must be non-zero')
×
1352
        alpha = np.log(f1 / f0) / T
12✔
1353
        return Waveform(bounds=(0, round(T, NDIGITS), +inf),
12✔
1354
                        seq=(_zero,
1355
                             basic_wave(EXPONENTIALCHIRP, f0, alpha,
1356
                                        phi0), _zero))
1357
    elif type in ['hyperbolic', 'hyp']:
12✔
1358
        # f(t) = f0 * f1 / (f0 * (t/T) + f1 * (1-t/T))
1359
        if f0 * f1 == 0:
12✔
1360
            return const(np.sin(phi0))
×
1361
        k = (f0 - f1) / (f1 * T)
12✔
1362
        return Waveform(bounds=(0, round(T, NDIGITS), +inf),
12✔
1363
                        seq=(_zero, basic_wave(HYPERBOLICCHIRP, f0, k,
1364
                                               phi0), _zero))
1365
    else:
1366
        raise ValueError(f'unknown type {type}')
×
1367

1368

1369
def interp(x, y):
12✔
1370
    seq, bounds = [_zero], [x[0]]
×
1371
    for x1, x2, y1, y2 in zip(x[:-1], x[1:], y[:-1], y[1:]):
×
1372
        if x2 == x1:
×
1373
            continue
×
1374
        seq.append(
×
1375
            add(
1376
                mul(_const((y2 - y1) / (x2 - x1)), basic_wave(LINEAR,
1377
                                                              shift=x1)),
1378
                _const(y1)))
1379
        bounds.append(x2)
×
1380
    bounds.append(inf)
×
1381
    seq.append(_zero)
×
1382
    return Waveform(seq=tuple(seq),
×
1383
                    bounds=tuple(round(b, NDIGITS)
1384
                                 for b in bounds)).simplify()
1385

1386

1387
def cut(wav, start=None, stop=None, head=None, tail=None, min=None, max=None):
12✔
1388
    offset = 0
×
1389
    if start is not None and head is not None:
×
1390
        offset = head - wav(np.array([1.0 * start]))[0]
×
1391
    elif stop is not None and tail is not None:
×
1392
        offset = tail - wav(np.array([1.0 * stop]))[0]
×
1393
    wav = wav + offset
×
1394

1395
    if start is not None:
×
1396
        wav = wav * (step(0) >> start)
×
1397
    if stop is not None:
×
1398
        wav = wav * ((1 - step(0)) >> stop)
×
1399
    if min is not None:
×
1400
        wav.min = min
×
1401
    if max is not None:
×
1402
        wav.max = max
×
1403
    return wav
×
1404

1405

1406
def function(fun, *args, start=None, stop=None):
12✔
1407
    TYPEID = registerBaseFunc(fun)
×
1408
    seq = (basic_wave(TYPEID, *args), )
×
1409
    wav = Waveform(seq=seq)
×
1410
    if start is not None:
×
1411
        wav = wav * (step(0) >> start)
×
1412
    if stop is not None:
×
1413
        wav = wav * ((1 - step(0)) >> stop)
×
1414
    return wav
×
1415

1416

1417
def samplingPoints(start, stop, points):
12✔
1418
    return Waveform(bounds=(round(start, NDIGITS), round(stop, NDIGITS), inf),
×
1419
                    seq=(_zero, basic_wave(INTERP, start, stop,
1420
                                           tuple(points)), _zero))
1421

1422

1423
def mixing(I,
12✔
1424
           Q=None,
1425
           *,
1426
           phase=0.0,
1427
           freq=0.0,
1428
           ratioIQ=1.0,
1429
           phaseDiff=0.0,
1430
           block_freq=None,
1431
           DRAGScaling=None):
1432
    """SSB or envelope mixing
1433
    """
1434
    if Q is None:
×
1435
        I = I
×
1436
        Q = zero()
×
1437

1438
    w = 2 * pi * freq
×
1439
    if freq != 0.0:
×
1440
        # SSB mixing
1441
        Iout = I * cos(w, -phase) + Q * sin(w, -phase)
×
1442
        Qout = -I * sin(w, -phase + phaseDiff) + Q * cos(w, -phase + phaseDiff)
×
1443
    else:
1444
        # envelope mixing
1445
        Iout = I * np.cos(-phase) + Q * np.sin(-phase)
×
1446
        Qout = -I * np.sin(-phase) + Q * np.cos(-phase)
×
1447

1448
    # apply DRAG
1449
    if block_freq is not None and block_freq != freq:
×
1450
        a = block_freq / (block_freq - freq)
×
1451
        b = 1 / (block_freq - freq)
×
1452
        I = a * Iout + b / (2 * pi) * D(Qout)
×
1453
        Q = a * Qout - b / (2 * pi) * D(Iout)
×
1454
        Iout, Qout = I, Q
×
1455
    elif DRAGScaling is not None and DRAGScaling != 0:
×
1456
        # 2 * pi * scaling * (freq - block_freq) = 1
1457
        I = (1 - w * DRAGScaling) * Iout - DRAGScaling * D(Qout)
×
1458
        Q = (1 - w * DRAGScaling) * Qout + DRAGScaling * D(Iout)
×
1459
        Iout, Qout = I, Q
×
1460

1461
    Qout = ratioIQ * Qout
×
1462

1463
    return Iout, Qout
×
1464

1465

1466
__all__ = [
12✔
1467
    'D', 'Waveform', 'chirp', 'const', 'cos', 'cosh', 'coshPulse', 'cosPulse',
1468
    'cut', 'drag', 'exp', 'function', 'gaussian', 'general_cosine', 'hanning',
1469
    'interp', 'mixing', 'mollifier', 'one', 'poly', 'registerBaseFunc',
1470
    'registerDerivative', 'samplingPoints', 'sign', 'sin', 'sinc', 'sinh',
1471
    'square', 'step', 't', 'zero'
1472
]
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc