• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

feihoo87 / waveforms / 16159025029

09 Jul 2025 02:41AM UTC coverage: 53.366% (-0.03%) from 53.393%
16159025029

push

github

feihoo87
Update version to 2.1.1 following recent enhancements and maintain backward compatibility.

1 of 1 new or added line in 1 file covered. (100.0%)

207 existing lines in 2 files now uncovered.

1316 of 2466 relevant lines covered (53.37%)

6.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.97
/waveforms/waveform.py
1
from fractions import Fraction
12✔
2

3
import numpy as np
12✔
4
from numpy import e, inf, pi
12✔
5
from scipy.signal import sosfilt
12✔
6

7
from ._waveform import (_D, COS, COSH, DRAG, ERF, EXP, EXPONENTIALCHIRP,
12✔
8
                        GAUSSIAN, HYPERBOLICCHIRP, INTERP, LINEAR, LINEARCHIRP,
9
                        NDIGITS, SINC, SINH, _baseFunc, _baseFunc_latex,
10
                        _const, _half, _one, _zero, add, basic_wave,
11
                        calc_parts, filter, is_const, merge_waveform, mul, pow,
12
                        registerBaseFunc, registerBaseFuncLatex,
13
                        registerDerivative, shift, simplify, wave_sum)
14

15

16
def _test_spec_num(num, spec):
12✔
17
    x = Fraction(num / spec).limit_denominator(1000000000)
×
18
    if x.denominator <= 24:
×
19
        return True, x, 1
×
20
    x = Fraction(spec * num).limit_denominator(1000000000)
×
21
    if x.denominator <= 24:
×
22
        return True, x, -1
×
23
    return False, x, 0
×
24

25

26
def _spec_num_latex(num):
    """Render ``num`` as LaTeX, preferring a closed form over a decimal.

    Each known constant is tried in turn (1, square roots, logarithms, e,
    pi, ...).  The first one for which ``_test_spec_num`` reports a match
    decides the output; if none matches, ``num`` is formatted with ``:g``.
    """
    candidates = [
        (1, ''),
        (np.sqrt(2), '\\sqrt{2}'),
        (np.sqrt(3), '\\sqrt{3}'),
        (np.sqrt(5), '\\sqrt{5}'),
        (np.log(2), '\\log{2}'),
        (np.log(3), '\\log{3}'),
        (np.log(5), '\\log{5}'),
        (np.e, 'e'),
        (np.pi, '\\pi'),
        (np.pi**2, '\\pi^2'),
        (np.sqrt(np.pi), '\\sqrt{\\pi}'),
    ]
    for spec, spec_latex in candidates:
        matched, frac, sign = _test_spec_num(num, spec)
        if not matched:
            continue
        if sign < 0:
            # num is a multiple of 1/spec rather than of spec
            spec_latex = f"\\frac{{{1}}}{{{spec_latex}}}"
        top, bottom = frac.numerator, frac.denominator
        if bottom == 1:
            if top == 1:
                return f"{spec_latex}"
            return f"{top:g}{spec_latex}"
        if top < 0:
            # keep the minus sign in front of the fraction
            return f"-\\frac{{{-top}}}{{{bottom}}}{spec_latex}"
        return f"\\frac{{{top}}}{{{bottom}}}{spec_latex}"
    return f"{num:g}"
49

50

51
def _num_latex(num):
    """Return a LaTeX string for the (possibly complex) number ``num``.

    Infinities become ``\\infty``; numbers with a non-zero imaginary part
    are rendered as ``\\left(a+bj\\right)`` / ``\\left(a-bj\\right)``; real
    numbers go through ``_spec_num_latex``.  A result in ``:g`` scientific
    notation (``1.5e+06``) is rewritten as ``1.5 \\times 10^{6}``.
    """
    import re

    if num == -np.inf:
        return r"-\infty"
    elif num == np.inf:
        return r"\infty"
    if num.imag > 0:
        return f"\\left({_num_latex(num.real)}+{_num_latex(num.imag)}j\\right)"
    elif num.imag < 0:
        return f"\\left({_num_latex(num.real)}-{_num_latex(-num.imag)}j\\right)"
    s = _spec_num_latex(num.real)
    if s == '' and round(num.real) == 1:
        return '1'
    # Only rewrite genuine "<mantissa>e<exponent>" output.  A plain
    # `"e" in s` test also fires on the LaTeX for Euler's number ("e",
    # "2e", "\frac{1}{2}e"), and a naive split breaks when a symbol such as
    # "\pi" follows the exponent.
    sci = re.match(r"(-?\d+(?:\.\d+)?)e([+-]\d+)(.*)$", s, re.DOTALL)
    if sci:
        mantissa, exponent, rest = sci.groups()
        n = float(exponent)
        s = f"{mantissa} \\times 10^{{{n:g}}}{rest}"
    return s
68

69

70
def _fun_latex(fun):
    """Render one basis-function tuple ``(funID, *args, shift)`` as LaTeX.

    If a formatter is registered for ``funID`` in ``_baseFunc_latex`` it is
    called as ``formatter(shift, *args)``.  Otherwise a generic
    ``\\mathrm{Func}<id>(t<shift>, ...)`` placeholder is produced.
    """
    funID, *args, shift = fun
    formatter = _baseFunc_latex[funID]
    if formatter is not None:
        return formatter(shift, *args)

    shift_str = _num_latex(shift)
    if shift_str == "0":
        shift_str = ""
    elif shift_str[0] != '-':
        # a positive shift needs an explicit plus sign
        shift_str = "+" + shift_str
    return r"\mathrm{Func}" + f"{funID}(t{shift_str}, ...)"
80

81

82
def _wav_latex(wav):
    """Render one waveform expression (a sum of products) as LaTeX.

    ``wav`` is unpacked as ``(terms, amplitudes)``; each term is in turn
    unpacked as ``(functions, powers)``.  The empty term ``((), ())`` is
    rendered as its bare amplitude.
    """
    if wav == _zero:
        return "0"
    if is_const(wav):
        return f"{wav[1][0]}"

    terms = []
    for factors, amplitude in zip(*wav):
        amp_str = _num_latex(amplitude)
        if factors == ((), ()):
            terms.append(amp_str)
            continue
        # a unit amplitude is left implicit
        pieces = [] if amp_str == "1" else [amp_str]
        for fun, power in zip(*factors):
            fun_str = _fun_latex(fun)
            if power != 1:
                fun_str = fun_str + "^{" + f"{power}" + "}"
            pieces.append(fun_str)
        terms.append(''.join(pieces))

    result = terms[0]
    for term in terms[1:]:
        # a negative term already carries its own sign
        result += term if term[0] == '-' else "+" + term
    return result
113

114

115
class Waveform:
    """Piecewise analytic waveform.

    ``seq[i]`` is the expression used on the segment that ends at
    ``bounds[i]``; the first segment starts at ``-inf``.  ``min`` / ``max``
    are forwarded to ``calc_parts`` on evaluation.  ``start``, ``stop``
    and ``sample_rate`` describe the default sampling window; ``filters``
    is an optional ``(sos, initial)`` pair applied by :meth:`sample`.
    """
    __slots__ = ('bounds', 'seq', 'max', 'min', 'start', 'stop', 'sample_rate',
                 'filters', 'label')

    def __init__(self, bounds=(+inf, ), seq=(_zero, ), min=-inf, max=inf):
        self.bounds = bounds
        self.seq = seq
        self.max = max
        self.min = min
        self.start = None
        self.stop = None
        self.sample_rate = None
        self.filters = None
        self.label = None

    @staticmethod
    def _begin(bounds, seq):
        """Return the left edge of the first non-zero segment (``inf`` if none)."""
        for i, s in enumerate(seq):
            # Compare by value: a zero segment rebuilt by fromlist()/fromtree()
            # or unpickling is equal to, but not the same object as, _zero.
            if s != _zero:
                if i == 0:
                    return -inf
                return bounds[i - 1]
        return inf

    @staticmethod
    def _end(bounds, seq):
        """Return the right edge of the last non-zero segment (``-inf`` if none)."""
        N = len(bounds)
        for i, s in enumerate(seq[::-1]):
            if s != _zero:
                if i == 0:
                    return inf
                return bounds[N - i - 1]
        return -inf

    @property
    def begin(self):
        """Start of the non-zero support, clipped to ``start`` when it is set."""
        if self.start is None:
            return self._begin(self.bounds, self.seq)
        else:
            return max(self.start, self._begin(self.bounds, self.seq))

    @property
    def end(self):
        """End of the non-zero support, clipped to ``stop`` when it is set."""
        if self.stop is None:
            return self._end(self.bounds, self.seq)
        else:
            return min(self.stop, self._end(self.bounds, self.seq))

    @staticmethod
    def _prepare_sos(sos):
        """Return ``sos`` as a writeable ndarray, copying only when needed."""
        if not isinstance(sos, np.ndarray):
            return np.array(sos)
        if not sos.flags.writeable:
            return sos.copy()
        return sos

    def sample(self,
               sample_rate=None,
               out=None,
               chunk_size=None,
               function_lib=None,
               filters=None):
        """Sample the waveform on ``[start, stop)``.

        Args:
            sample_rate: samples per time unit; defaults to ``self.sample_rate``.
            out: optional output buffer forwarded to ``__call__``.
            chunk_size: when given, a generator yielding chunks of at most
                this many samples is returned instead of one array.
            function_lib: basis-function table forwarded to ``__call__``.
            filters: ``(sos, initial)`` pair; defaults to ``self.filters``.

        Raises:
            ValueError: if ``start``, ``stop`` or the sample rate is unset.
        """
        if sample_rate is None:
            sample_rate = self.sample_rate
        if self.start is None or self.stop is None or sample_rate is None:
            raise ValueError(
                f'Waveform is not initialized. {self.start=}, {self.stop=}, {sample_rate=}'
            )
        if filters is None:
            filters = self.filters
        if chunk_size is None:
            x = np.arange(self.start, self.stop, 1 / sample_rate)
            sig = self.__call__(x, out=out, function_lib=function_lib)
            if filters is not None:
                sos, initial = filters
                sos = self._prepare_sos(sos)
                if initial:
                    # filter relative to the initial level, then restore it
                    sig = sosfilt(sos, sig - initial) + initial
                else:
                    sig = sosfilt(sos, sig)
            return sig
        else:
            return self._sample_iter(sample_rate, chunk_size, out,
                                     function_lib, filters)

    def _sample_iter(self, sample_rate, chunk_size, out, function_lib,
                     filters):
        """Yield the sampled waveform chunk by chunk (see :meth:`sample`)."""
        start = self.start
        start_n = 0
        if filters is not None:
            sos, initial = filters
            # same normalisation as the one-shot path in sample()
            sos = self._prepare_sos(sos)
            # zi = sosfilt_zi(sos)
            zi = np.zeros((sos.shape[0], 2))
        length = chunk_size / sample_rate
        while start < self.stop:
            if start + length > self.stop:
                # last, possibly shorter, chunk
                length = self.stop - start
                stop = self.stop
                size = round((stop - start) * sample_rate)
            else:
                stop = start + length
                size = chunk_size
            x = np.linspace(start, stop, size, endpoint=False)

            if filters is None:
                if out is not None:
                    # Hand __call__ exactly this chunk's window; passing the
                    # open-ended tail made it return (and clear) everything
                    # from start_n to the end of the buffer.
                    yield self.__call__(x,
                                        out=out[start_n:start_n + size],
                                        function_lib=function_lib)
                else:
                    yield self.__call__(x, function_lib=function_lib)
            else:
                sig = self.__call__(x, function_lib=function_lib)
                if initial:
                    sig -= initial
                # carry the filter state over to the next chunk
                sig, zi = sosfilt(sos, sig, zi=zi)
                if initial:
                    sig += initial
                if out is not None:
                    out[start_n:start_n + size] = sig
                yield sig

            start = stop
            start_n += chunk_size

    @staticmethod
    def _tolist(bounds, seq, ret=None):
        """Append a flat, length-prefixed encoding of ``(bounds, seq)`` to ``ret``.

        Layout: ``nseg, [bound, nsum, [amp, nmul, [n, nfun, *fun]...]...]...``.
        """
        if ret is None:
            ret = []
        ret.append(len(bounds))
        for expr, b in zip(seq, bounds):
            ret.append(b)
            tlist, amplist = expr
            ret.append(len(amplist))
            for t, amp in zip(tlist, amplist):
                ret.append(amp)
                mtlist, nlist = t
                ret.append(len(nlist))
                for fun, n in zip(mtlist, nlist):
                    ret.append(n)
                    ret.append(len(fun))
                    ret.extend(fun)
        return ret

    @staticmethod
    def _fromlist(l, pos=0):
        """Decode the layout written by :meth:`_tolist`, starting at ``pos``.

        Returns:
            ``(bounds, seq, pos)`` where ``pos`` is the index just past the
            consumed data.

        Raises:
            ValueError: if the list ends before a complete record was read.
        """

        def _read(l, pos, size):
            try:
                chunk = tuple(l[pos:pos + size])
            except (TypeError, ValueError, IndexError) as err:
                raise ValueError('Invalid waveform format') from err
            # Slicing never fails on short input, it just returns fewer
            # items, so truncation has to be detected explicitly.
            if len(chunk) != size:
                raise ValueError('Invalid waveform format')
            return chunk, pos + size

        (nseg, ), pos = _read(l, pos, 1)
        bounds = []
        seq = []
        for _ in range(nseg):
            (b, nsum), pos = _read(l, pos, 2)
            bounds.append(b)
            amp = []
            t = []
            for _ in range(nsum):
                (a, nmul), pos = _read(l, pos, 2)
                amp.append(a)
                nlst = []
                mt = []
                for _ in range(nmul):
                    (n, nfun), pos = _read(l, pos, 2)
                    nlst.append(n)
                    fun, pos = _read(l, pos, nfun)
                    mt.append(fun)
                t.append((tuple(mt), tuple(nlst)))
            seq.append((tuple(t), tuple(amp)))

        return tuple(bounds), tuple(seq), pos

    def tolist(self):
        """Serialize to a flat list: header, optional filter, then the body."""
        l = [self.max, self.min, self.start, self.stop, self.sample_rate]
        if self.filters is None:
            l.append(None)
        else:
            sos, initial = self.filters
            # sample() accepts a non-ndarray sos, so tolerate one here too
            sos = list(np.asarray(sos).reshape(-1))
            l.append(len(sos))
            l.extend(sos)
            l.append(initial)

        return self._tolist(self.bounds, self.seq, l)

    @classmethod
    def fromlist(cls, l):
        """Rebuild a waveform from the list produced by :meth:`tolist`."""
        w = cls()
        pos = 6
        (w.max, w.min, w.start, w.stop, w.sample_rate, sos_size) = l[:pos]
        if sos_size is not None:
            sos = np.array(l[pos:pos + sos_size]).reshape(-1, 6)
            pos += sos_size
            initial = l[pos]
            pos += 1
            w.filters = sos, initial

        w.bounds, w.seq, pos = cls._fromlist(l, pos)
        return w

    def totree(self):
        """Serialize to a nested ``(header, body)`` tuple structure."""
        # self.filters is already None when no filter is set, so a single
        # header expression covers both cases.
        header = (self.max, self.min, self.start, self.stop,
                  self.sample_rate, self.filters)
        body = []

        for expr, b in zip(self.seq, self.bounds):
            tlist, amplist = expr
            new_seq = []
            for t, amp in zip(tlist, amplist):
                mtlist, nlist = t
                new_t = [(n, fun) for fun, n in zip(mtlist, nlist)]
                new_seq.append((amp, tuple(new_t)))
            body.append((b, tuple(new_seq)))
        return header, tuple(body)

    @staticmethod
    def fromtree(tree):
        """Rebuild a waveform from the structure produced by :meth:`totree`."""
        w = Waveform()
        header, body = tree

        (w.max, w.min, w.start, w.stop, w.sample_rate, w.filters) = header
        bounds = []
        seqs = []
        for b, seq in body:
            bounds.append(b)
            amp_list = []
            t_list = []
            for amp, t in seq:
                amp_list.append(amp)
                n_list = []
                mt_list = []
                for n, mt in t:
                    n_list.append(n)
                    mt_list.append(mt)
                t_list.append((tuple(mt_list), tuple(n_list)))
            seqs.append((tuple(t_list), tuple(amp_list)))
        w.bounds = tuple(bounds)
        w.seq = tuple(seqs)
        return w

    def simplify(self, eps=1e-15):
        """Return a new waveform with simplified, merged segments.

        Each segment expression is passed through ``simplify(expr, eps)``
        and adjacent segments whose expressions compare equal are merged.

        NOTE(review): the result is built with default ``min``/``max`` and
        without ``start``/``stop``/``sample_rate``/``filters``/``label`` --
        confirm that dropping them is intended.
        """
        seq = [simplify(self.seq[0], eps)]
        bounds = [self.bounds[0]]
        for expr, b in zip(self.seq[1:], self.bounds[1:]):
            expr = simplify(expr, eps)
            if expr == seq[-1]:
                # same expression as the previous segment: extend it
                seq.pop()
                bounds.pop()
            seq.append(expr)
            bounds.append(b)
        return Waveform(tuple(bounds), tuple(seq))

    def filter(self, low=0, high=inf, eps=1e-15):
        """Apply the module-level ``filter(expr, low, high, eps)`` to every segment."""
        seq = []
        for expr in self.seq:
            seq.append(filter(expr, low, high, eps))
        return Waveform(self.bounds, tuple(seq))

    def _comb(self, other, oper):
        """Combine with ``other`` segment-wise using ``oper`` via ``merge_waveform``."""
        return Waveform(*merge_waveform(self.bounds, self.seq, other.bounds,
                                        other.seq, oper))

    def __pow__(self, n):
        return Waveform(self.bounds, tuple(pow(w, n) for w in self.seq))

    def __add__(self, other):
        if isinstance(other, Waveform):
            return self._comb(other, add)
        else:
            return self + const(other)

    def __radd__(self, v):
        return const(v) + self

    def __ior__(self, other):
        return self | other

    def __or__(self, other):
        """Logical OR: one where either operand's segment is not ``_zero``."""
        if isinstance(other, (int, float, complex)):
            other = const(other)

        def _or(a, b):
            if a != _zero or b != _zero:
                return _one
            else:
                return _zero

        return self._comb(other, _or)

    def __iand__(self, other):
        return self & other

    def __and__(self, other):
        """Logical AND: one where both operands' segments are not ``_zero``."""
        if isinstance(other, (int, float, complex)):
            other = const(other)

        def _and(a, b):
            if a != _zero and b != _zero:
                return _one
            else:
                return _zero

        return self._comb(other, _and)

    @property
    def marker(self):
        """Waveform that is one on every non-zero segment of the simplified form."""
        w = self.simplify()
        return Waveform(w.bounds,
                        tuple(_zero if s == _zero else _one for s in w.seq))

    def mask(self, edge=0):
        """Return the marker with every non-zero region widened by ``edge``.

        A zero gap that would shrink to nothing is absorbed into the
        neighbouring non-zero regions.
        """
        w = self.marker
        bounds = []
        seq = []

        # Seed the output with the first segment.  The non-zero case used to
        # be skipped entirely, which lost the first region (and produced an
        # empty waveform for a single non-zero segment).
        if w.seq[0] == _zero:
            in_wave = False
            bounds.append(w.bounds[0] - edge)
            seq.append(_zero)
        else:
            in_wave = True
            bounds.append(w.bounds[0] + edge)
            seq.append(_one)

        for b, s in zip(w.bounds[1:], w.seq[1:]):
            if s != _zero:
                if in_wave:
                    # adjacent non-zero segments: extend the open region
                    bounds[-1] = b + edge
                else:
                    in_wave = True
                    bounds.append(b + edge)
                    seq.append(_one)
            elif in_wave:
                in_wave = False
                b = b - edge
                if b > bounds[-1]:
                    bounds.append(b)
                    seq.append(_zero)
                else:
                    # the gap vanished after widening: merge it away
                    bounds.pop()
                    bounds.append(b)
        return Waveform(tuple(bounds), tuple(seq))

    def __mul__(self, other):
        if isinstance(other, Waveform):
            return self._comb(other, mul)
        else:
            return self * const(other)

    def __rmul__(self, v):
        return const(v) * self

    def __truediv__(self, other):
        if isinstance(other, Waveform):
            raise TypeError('division by waveform')
        else:
            return self * const(1 / other)

    def __neg__(self):
        return -1 * self

    def __sub__(self, other):
        return self + (-other)

    def __rsub__(self, v):
        return v + (-self)

    def __rshift__(self, time):
        """Return the waveform delayed by ``time``."""
        return Waveform(
            tuple(round(bound + time, NDIGITS) for bound in self.bounds),
            tuple(shift(expr, time) for expr in self.seq))

    def __lshift__(self, time):
        """Return the waveform advanced by ``time``."""
        return self >> (-time)

    @staticmethod
    def _merge_parts(
        parts: list[tuple[int, int, np.ndarray | int | float | complex]],
        out: list[tuple[int, int, np.ndarray | int | float | complex]]
    ) -> list[tuple[int, int, np.ndarray | int | float | complex]]:
        # TODO: merge parts
        raise NotImplementedError

    @staticmethod
    def _fill_parts(parts, out):
        """Add every ``(start, stop, part)`` fragment into ``out[start:stop]``."""
        for start, stop, part in parts:
            out[start:stop] += part

    def __call__(self,
                 x,
                 frag=False,
                 out=None,
                 accumulate=False,
                 function_lib=None):
        """Evaluate the waveform at ``x``.

        Args:
            x: a Python scalar or an array of sample times.
            frag: when true, return (or store in the list ``out``) the raw
                fragments from ``calc_parts`` instead of a dense array.
            out: optional buffer (dense mode) or list (``frag`` mode).
            accumulate: add to the current content of ``out`` rather than
                overwriting it.
            function_lib: basis-function table; defaults to ``_baseFunc``.
        """
        if function_lib is None:
            function_lib = _baseFunc
        if isinstance(x, (int, float, complex)):
            return self.__call__(np.array([x]), function_lib=function_lib)[0]
        parts, dtype = calc_parts(self.bounds, self.seq, x, function_lib,
                                  self.min, self.max)
        if not frag:
            if out is None:
                out = np.zeros_like(x, dtype=dtype)
            elif not accumulate:
                # Assign rather than multiply by zero: NaN * 0 and inf * 0
                # are NaN, so stale non-finite values would survive.
                out[...] = 0
            self._fill_parts(parts, out)
        else:
            if out is None:
                return parts
            else:
                if not accumulate:
                    out.clear()
                    out.extend(parts)
                else:
                    self._merge_parts(parts, out)
        return out

    def __hash__(self):
        # NOTE(review): this hashes the raw fields while __eq__ compares the
        # simplified forms, so waveforms that compare equal may hash
        # differently -- confirm whether that is intended.
        return hash((self.max, self.min, self.start, self.stop,
                     self.sample_rate, self.bounds, self.seq))

    def __eq__(self, o: object) -> bool:
        """Compare simplified forms; plain numbers are treated as constants."""
        if isinstance(o, (int, float, complex)):
            return self == const(o)
        elif isinstance(o, Waveform):
            a = self.simplify()
            b = o.simplify()
            return a.seq == b.seq and a.bounds == b.bounds and (
                a.max, a.min, a.start, a.stop) == (b.max, b.min, b.start,
                                                   b.stop)
        else:
            return False

    def _repr_latex_(self):
        """Return a ``$$ ... $$`` LaTeX rendering (a ``cases`` block if piecewise)."""
        parts = []
        start = -np.inf
        for end, wav in zip(self.bounds, self.seq):
            e_str = _wav_latex(wav)
            start_str = _num_latex(start)
            end_str = _num_latex(end)
            parts.append(e_str + r",~~&t\in" + f"({start_str},{end_str}" +
                         (']' if end < np.inf else ')'))
            start = end
        if len(parts) == 1:
            # single segment: drop the alignment marker used by "cases"
            expr = ''.join(['f(t)=', *parts[0].split('&')])
        else:
            expr = '\n'.join([
                r"f(t)=\begin{cases}", (r"\\" + '\n').join(parts),
                r"\end{cases}"
            ])
        return "$$\n{}\n$$".format(expr)

    def _play(self, time_unit, volume=1.0):
        """Stream the waveform to the default audio output (blocking)."""
        import pyaudio

        CHUNK = 1024
        RATE = 48000

        dynamic_volume = 1.0
        amp = 2**15 * 0.999 * volume * dynamic_volume

        p = pyaudio.PyAudio()
        try:
            stream = p.open(format=pyaudio.paInt16,
                            channels=1,
                            rate=RATE,
                            output=True)
            try:
                for data in self.sample(sample_rate=RATE / time_unit,
                                        chunk_size=CHUNK):
                    lim = np.abs(data).max()
                    # only ever turn the volume down, so loud chunks do not clip
                    if lim > 0 and dynamic_volume > 1.0 / lim:
                        dynamic_volume = 1.0 / lim
                        amp = 2**15 * 0.99 * volume * dynamic_volume
                    data = (amp * data).astype(np.int16)
                    stream.write(bytes(data.data))
            finally:
                stream.stop_stream()
                stream.close()
        finally:
            p.terminate()

    def play(self, time_unit=1, volume=1.0):
        """Play the waveform in a daemon subprocess; returns immediately."""
        import multiprocessing as mp
        p = mp.Process(target=self._play,
                       args=(time_unit, volume),
                       daemon=True)
        p.start()
605

606

607
class WaveVStack(Waveform):
    """Lazy sum of waveforms.

    The value is ``offset + sum_i w_i(t - shift)`` where ``wlist`` holds
    the ``(bounds, seq)`` pair of every ``w_i``.  The terms are only
    merged into a single :class:`Waveform` by :meth:`simplify`.
    """

    def __init__(self, wlist: list[Waveform] | None = None):
        # None instead of a shared mutable default argument
        self.wlist = [] if wlist is None else [(w.bounds, w.seq)
                                               for w in wlist]
        self.start = None
        self.stop = None
        self.sample_rate = None
        self.offset = 0
        self.shift = 0
        self.filters = None
        self.label = None
        self.function_lib = None

    def __begin(self):
        if self.wlist:
            v = [self._begin(bounds, seq) for bounds, seq in self.wlist]
            return min(v)
        else:
            return -inf

    def __end(self):
        if self.wlist:
            v = [self._end(bounds, seq) for bounds, seq in self.wlist]
            return max(v)
        else:
            return inf

    @property
    def begin(self):
        if self.start is None:
            return self.__begin()
        else:
            return max(self.start, self.__begin())

    @property
    def end(self):
        if self.stop is None:
            return self.__end()
        else:
            return min(self.stop, self.__end())

    def __call__(self, x, frag=False, out=None, function_lib=None):
        """Evaluate the stack at ``x`` and return the real part.

        ``out`` is accepted for signature compatibility but a fresh complex
        buffer is always allocated.
        """
        assert frag is False, 'WaveVStack does not support frag mode'
        out = np.full_like(x, self.offset, dtype=complex)
        if self.shift != 0:
            x = x - self.shift
        if function_lib is None:
            if self.function_lib is None:
                function_lib = _baseFunc
            else:
                function_lib = self.function_lib
        for bounds, seq in self.wlist:
            parts, dtype = calc_parts(bounds, seq, x, function_lib)
            self._fill_parts(parts, out)
        return out.real

    def tolist(self):
        """Serialize to a flat list: header, optional filter, then every term."""
        l = [
            self.start,
            self.stop,
            self.offset,
            self.shift,
            self.sample_rate,
        ]
        if self.filters is None:
            l.append(None)
        else:
            sos, initial = self.filters
            # tolerate a filter given as a nested list
            sos = list(np.asarray(sos).reshape(-1))
            l.append(len(sos))
            l.extend(sos)
            l.append(initial)
        l.append(len(self.wlist))
        for bounds, seq in self.wlist:
            self._tolist(bounds, seq, l)
        return l

    @classmethod
    def fromlist(cls, l):
        """Rebuild a stack from the list produced by :meth:`tolist`."""
        w = cls()
        pos = 6
        w.start, w.stop, w.offset, w.shift, w.sample_rate, sos_size = l[:pos]
        if sos_size is not None:
            sos = np.array(l[pos:pos + sos_size]).reshape(-1, 6)
            pos += sos_size
            initial = l[pos]
            pos += 1
            w.filters = sos, initial
        n = l[pos]
        pos += 1
        for _ in range(n):
            bounds, seq, pos = cls._fromlist(l, pos)
            w.wlist.append((bounds, seq))
        return w

    def simplify(self, eps=1e-15):
        """Merge all terms, the offset and the shift into one :class:`Waveform`."""
        if not self.wlist and self.offset == 0:
            return zero()
        if self.wlist:
            bounds, seq = wave_sum(self.wlist)
            wav = Waveform(bounds=bounds, seq=seq)
        else:
            # no terms, but a non-zero offset that must not be dropped
            wav = Waveform()
        if self.offset != 0:
            wav += self.offset
        if self.shift != 0:
            wav >>= self.shift
        wav = wav.simplify(eps)
        wav.start = self.start
        wav.stop = self.stop
        wav.sample_rate = self.sample_rate
        return wav

    @staticmethod
    def _rshift(wlist, time):
        """Return ``wlist`` with every term delayed by ``time``."""
        if time == 0:
            return wlist
        return [(tuple(round(bound + time, NDIGITS) for bound in bounds),
                 tuple(shift(expr, time) for expr in seq))
                for bounds, seq in wlist]

    def __rshift__(self, time):
        """Return the stack delayed by ``time`` (only ``shift`` is updated)."""
        ret = WaveVStack()
        # copy, so that the two stacks never share one mutable list
        ret.wlist = list(self.wlist)
        ret.sample_rate = self.sample_rate
        ret.start = self.start
        ret.stop = self.stop
        ret.shift = self.shift + time
        ret.offset = self.offset
        return ret

    def __add__(self, other):
        """Add another stack, a waveform or a scalar, keeping the sum lazy."""
        ret = WaveVStack()
        ret.wlist.extend(self.wlist)
        # The result keeps this stack's frame; these two used to be lost.
        ret.offset = self.offset
        ret.shift = self.shift
        if isinstance(other, WaveVStack):
            if other.shift != self.shift:
                # different frames: bake both shifts into the terms
                ret.wlist = list(self._rshift(ret.wlist, self.shift))
                ret.wlist.extend(self._rshift(other.wlist, other.shift))
                ret.shift = 0
            else:
                ret.wlist.extend(other.wlist)
            ret.offset = self.offset + other.offset
        elif isinstance(other, Waveform):
            # express the new term in this stack's unshifted frame
            other = other << self.shift
            ret.wlist.append((other.bounds, other.seq))
        else:
            # ret.wlist.append(((+inf, ), (_const(1.0 * other), )))
            ret.offset += other
        return ret

    def __radd__(self, v):
        return self + v

    def __mul__(self, other):
        """Multiply every term (and the offset) by a waveform or a scalar."""
        if isinstance(other, Waveform):
            # express the factor in this stack's unshifted frame
            other = other.simplify() << self.shift
            ret = WaveVStack([Waveform(*w) * other for w in self.wlist])
            if self.offset != 0:
                w = other * self.offset
                ret.wlist.append((w.bounds, w.seq))
        else:
            ret = WaveVStack([Waveform(*w) * other for w in self.wlist])
            ret.offset = self.offset * other
        # the terms are still in the unshifted frame, so keep the shift
        ret.shift = self.shift
        return ret

    def __rmul__(self, v):
        return self * v

    def __eq__(self, other):
        """Cheap equality: a stack with terms is never reported as equal.

        A stack without terms is compared through its constant value.
        """
        if self.wlist:
            return False
        elif self.offset != 0:
            # an empty stack with an offset is that constant, not zero
            return const(self.offset) == other
        else:
            return zero() == other

    def _repr_latex_(self):
        return r"\sum_{i=1}^{" + f"{len(self.wlist)}" + r"}" + r"f_i(t)"

    def __getstate__(self) -> tuple:
        function_lib = self.function_lib
        if function_lib:
            try:
                import dill
                function_lib = dill.dumps(function_lib)
            except Exception:
                # best effort: without dill, or for an unpicklable table,
                # the function library is simply not persisted
                function_lib = None
        return (self.wlist, self.start, self.stop, self.sample_rate,
                self.offset, self.shift, self.filters, self.label,
                function_lib)

    def __setstate__(self, state: tuple) -> None:
        (self.wlist, self.start, self.stop, self.sample_rate, self.offset,
         self.shift, self.filters, self.label, function_lib) = state
        if function_lib:
            try:
                import dill
                # SECURITY: dill.loads executes arbitrary code; only
                # unpickle state that comes from a trusted source.
                function_lib = dill.loads(function_lib)
            except Exception:
                function_lib = None
        self.function_lib = function_lib
803

804

805
def play(data, rate=48000):
    """Play ``data`` as 16-bit mono audio through PyAudio.

    Parameters
    ----------
    data : array-like
        Samples to play.  If the peak absolute value exceeds 1 the samples
        are scaled down so the peak is 1.  The caller's array is never
        modified.  Empty input is a no-op.
    rate : int
        Sample rate handed to the output stream.
    """
    import io

    samples = np.asarray(data)
    if samples.size == 0:
        # Nothing to play; also avoids np.max failing on an empty array.
        return

    # Imported only once there is something to play.
    import pyaudio

    CHUNK = 1024

    max_amp = np.max(np.abs(samples))
    if max_amp > 1:
        # Out-of-place division: does not mutate the caller's array and
        # also works for integer input.
        samples = samples / max_amp

    pcm = np.array(2**15 * 0.999 * samples, dtype=np.int16)
    buff = io.BytesIO(pcm.tobytes())
    p = pyaudio.PyAudio()

    try:
        stream = p.open(format=pyaudio.paInt16,
                        channels=1,
                        rate=rate,
                        output=True)
        try:
            # read() returns b'' at end of buffer, which stops the iterator.
            for chunk in iter(lambda: buff.read(CHUNK), b''):
                stream.write(chunk)
        finally:
            stream.stop_stream()
            stream.close()
    finally:
        p.terminate()
×
838

839

840
# Shared module-level instances returned by zero() and one().  They are
# handed out by reference, so callers must not modify them in place.
_zero_waveform = Waveform()
_one_waveform = Waveform(seq=(_one, ))
12✔
842

843

844
def zero():
    """Return the shared module-level zero waveform instance."""
    return _zero_waveform
12✔
846

847

848
def one():
    """Return the shared module-level unit waveform instance."""
    return _one_waveform
12✔
850

851

852
def const(c):
    """Return a waveform built from the constant ``1.0 * c``."""
    value = 1.0 * c
    return Waveform(seq=(_const(value), ))
12✔
854

855

856
# register base function
857
def _format_LINEAR(shift, *args):
12✔
UNCOV
858
    if shift != 0:
×
859
        shift = _num_latex(-shift)
×
UNCOV
860
        if shift[0] == '-':
×
861
            return f"(t{shift})"
×
862
        else:
UNCOV
863
            return f"(t+{shift})"
×
864
    else:
865
        return 't'
×
866

867

868
def _format_GAUSSIAN(shift, *args):
    """LaTeX for the Gaussian base function.

    ``args[0] / sqrt(2)`` is printed as sigma; the sigma fraction is left
    out when it formats as ``'1'`` and the shift when it formats as ``'0'``.
    """
    sigma = _num_latex(args[0] / np.sqrt(2))
    offset = _num_latex(-shift)
    shifted = offset != '0'
    if shifted and offset[0] != '-':
        offset = '+' + offset

    if sigma == '1':
        if shifted:
            return ('\\exp\\left[-\\frac{\\left(t' + offset +
                    '\\right)^2}{2}\\right]')
        return '\\exp\\left(-\\frac{t^2}{2}\\right)'

    numerator = 't' + offset if shifted else 't'
    return ('\\exp\\left[-\\frac{1}{2}\\left(\\frac{' + numerator + '}{' +
            sigma + '}\\right)^2\\right]')
886

887

888
def _format_SINC(shift, *args):
    """LaTeX for the sinc base function with bandwidth ``args[0]``."""
    offset = _num_latex(-shift)
    bw = _num_latex(args[0])

    if offset == '0':
        if bw == '1':
            return '\\mathrm{sinc}(t)'
        return '\\mathrm{sinc}(' + bw + 't)'

    if offset[0] != '-':
        offset = '+' + offset
    if bw == '1':
        return '\\mathrm{sinc}(t' + offset + ')'
    return '\\mathrm{sinc}[' + bw + '(t' + offset + ')]'
×
903

904

905
def _format_COSINE(shift, *args):
    """LaTeX for the cosine base function.

    ``args[0]`` is divided by 2*pi and printed as the frequency; the shift
    is printed as a phase in units of cycles (``-shift * frequency``).
    """
    cycles = args[0] / 2 / np.pi
    phase_value = -shift * cycles

    freq = _num_latex(cycles)
    if freq == '1':
        freq = ''

    phase = _num_latex(phase_value)
    if phase == '0':
        # No phase term: choose between the explicit-frequency and bare form.
        if freq != '':
            return f'\\cos\\left(2\\pi\\times {freq}t\\right)'
        return '\\cos\\left(2\\pi t\\right)'

    if phase[0] != '-':
        phase = '+' + phase
    return f'\\cos\\left[2\\pi\\left({freq}t{phase}\\right)\\right]'
×
922

923

924
def _format_ERF(shift, *args):
12✔
UNCOV
925
    if shift > 0:
×
926
        return '\\mathrm{erf}(\\frac{t-' + f"{_num_latex(shift)}" + '}{' + f'{args[0]:g}' + '})'
×
UNCOV
927
    elif shift < 0:
×
UNCOV
928
        return '\\mathrm{erf}(\\frac{t+' + f"{_num_latex(-shift)}" + '}{' + f'{args[0]:g}' + '})'
×
929
    else:
930
        return '\\mathrm{erf}(\\frac{t}{' + f'{args[0]:g}' + '})'
×
931

932

933
def _format_COSH(shift, *args):
12✔
UNCOV
934
    if shift > 0:
×
935
        return '\\cosh(\\frac{t-' + f"{_num_latex(shift)}" + '}{' + f'{1/args[0]:g}' + '})'
×
UNCOV
936
    elif shift < 0:
×
UNCOV
937
        return '\\cosh(\\frac{t+' + f"{_num_latex(-shift)}" + '}{' + f'{1/args[0]:g}' + '})'
×
938
    else:
939
        return '\\cosh(\\frac{t}{' + f'{1/args[0]:g}' + '})'
×
940

941

942
def _format_SINH(shift, *args):
12✔
UNCOV
943
    if shift > 0:
×
944
        return '\\sinh(\\frac{t-' + f"{_num_latex(shift)}" + '}{' + f'{args[0]:g}' + '})'
×
UNCOV
945
    elif shift < 0:
×
UNCOV
946
        return '\\sinh(\\frac{t+' + f"{_num_latex(-shift)}" + '}{' + f'{args[0]:g}' + '})'
×
947
    else:
948
        return '\\sinh(\\frac{t}{' + f'{args[0]:g}' + '})'
×
949

950

951
def _format_EXP(shift, *args):
    """LaTeX for the exponential base function with rate ``args[0]``."""
    if _num_latex(shift) and shift > 0:
        argument = '\\left(t-' + f"{_num_latex(shift)}" + '\\right)'
    elif _num_latex(-shift) and shift < 0:
        argument = '\\left(t+' + f"{_num_latex(-shift)}" + '\\right)'
    else:
        argument = 't'
    return '\\exp\\left(-' + f'{args[0]:g}' + argument + '\\right)'
×
958

959

960
def _format_DRAG(shift, *args):
12✔
UNCOV
961
    return f"DRAG(...)"
×
962

963

964
# Attach a LaTeX formatter to each built-in base function type, in the
# same order as before.
for _typeid, _formatter in (
    (LINEAR, _format_LINEAR),
    (GAUSSIAN, _format_GAUSSIAN),
    (ERF, _format_ERF),
    (COS, _format_COSINE),
    (SINC, _format_SINC),
    (EXP, _format_EXP),
    (COSH, _format_COSH),
    (SINH, _format_SINH),
    (DRAG, _format_DRAG),
):
    registerBaseFuncLatex(_typeid, _formatter)
# Do not leave the loop variables behind in the module namespace.
del _typeid, _formatter
12✔
973

974

975
def D(wav):
    """Derivative: apply ``_D`` to every piece of ``wav``, keeping its bounds."""
    bounds = wav.bounds
    pieces = tuple(_D(piece) for piece in wav.seq)
    return Waveform(bounds=bounds, seq=pieces)
×
979

980

981
def convolve(a, b):
    """Placeholder: not implemented; ignores its arguments and returns None."""
    return None
×
983

984

985
def sign():
    """Return a two-piece waveform: -1 up to the bound at 0, +1 after it."""
    negative = _const(-1)
    return Waveform(bounds=(0, +inf), seq=(negative, _one))
×
987

988

989
def step(edge, type='erf'):
    """Return a 0 -> 1 step centered at t = 0.

    edge: width parameter of the rising edge; 0 gives an ideal step.
    type: "erf", "cos", "linear".  Any other value is treated as "erf".
    """
    if edge == 0:
        return Waveform(bounds=(0, +inf), seq=(_zero, _one))

    if type == 'cos':
        # Half a cosine period, rising from 0 to 1 across the edge.
        rise = add(_half,
                   mul(_half, basic_wave(COS, pi / edge, shift=0.5 * edge)))
        left = round(-edge / 2, NDIGITS)
        right = round(edge / 2, NDIGITS)
    elif type == 'linear':
        rise = add(_half, mul(_const(1 / edge), basic_wave(LINEAR)))
        left = round(-edge / 2, NDIGITS)
        right = round(edge / 2, NDIGITS)
    else:
        std_sq2 = edge / 5
        # Same as add(_half, mul(_half, basic_wave(ERF, std_sq2))), written
        # out as a literal expression.
        rise = ((((), ()), (((ERF, std_sq2, 0), ), (1, ))), (0.5, 0.5))
        left = -round(edge, NDIGITS)
        right = round(edge, NDIGITS)

    return Waveform(bounds=(left, right, +inf), seq=(_zero, rise, _one))
1015

1016

1017
def square(width, edge=0, type='erf'):
    """Return a unit pulse of the given ``width`` centered at t = 0.

    A non-positive width gives the zero waveform.  With ``edge == 0`` the
    pulse is an ideal rectangle; otherwise it is the difference of two
    ``step(edge, type)`` waveforms shifted by ``width / 2`` either way.
    """
    if width <= 0:
        return zero()
    if edge != 0:
        rising = step(edge, type=type) << width / 2
        falling = step(edge, type=type) >> width / 2
        return rising - falling
    left = round(-0.5 * width, NDIGITS)
    right = round(0.5 * width, NDIGITS)
    return Waveform(bounds=(left, right, +inf), seq=(_zero, _one, _zero))
1028

1029

1030
def gaussian(width, plateau=0.0):
    """Return a Gaussian pulse, optionally with a flat top.

    The Gaussian parameter is ``width / 3.3302184446307908``, i.e.
    ``width / (4 * sqrt(ln 2))``, so ``width`` is two times the FWHM.  Each
    Gaussian edge is cut off ``0.75 * width`` from its center.  When half of
    ``plateau`` rounds to a positive value, a unit plateau of that length
    is inserted between the rising and falling halves.
    """
    if width <= 0 and plateau <= 0.0:
        return zero()

    # An alternative choice, width / sqrt(pi), would give the same total
    # pulse area as a square pulse; it is not the one used here.
    std_sq2 = width / 3.3302184446307908

    if round(0.5 * plateau, NDIGITS) <= 0.0:
        left = round(-0.75 * width, NDIGITS)
        right = round(0.75 * width, NDIGITS)
        return Waveform(bounds=(left, right, +inf),
                        seq=(_zero, basic_wave(GAUSSIAN, std_sq2), _zero))

    bounds = (
        round(-0.75 * width - 0.5 * plateau, NDIGITS),
        round(-0.5 * plateau, NDIGITS),
        round(0.5 * plateau, NDIGITS),
        round(0.75 * width + 0.5 * plateau, NDIGITS),
        +inf,
    )
    rising = basic_wave(GAUSSIAN, std_sq2, shift=-0.5 * plateau)
    falling = basic_wave(GAUSSIAN, std_sq2, shift=0.5 * plateau)
    return Waveform(bounds=bounds, seq=(_zero, rising, _one, falling, _zero))
1055

1056

1057
def cos(w, phi=0):
    """Return the waveform cos(w * t + phi).

    ``w == 0`` gives the constant ``cos(phi)``.  A negative ``w`` is folded
    to the positive one using cos(-x) = cos(x).
    """
    if w == 0:
        return const(np.cos(phi))
    if w < 0:
        w, phi = -w, -phi
    return Waveform(seq=(basic_wave(COS, w, shift=-phi / w), ))
12✔
1064

1065

1066
def sin(w, phi=0):
    """Return the waveform sin(w * t + phi), built from a shifted cosine.

    ``w == 0`` gives the constant ``sin(phi)``.  A negative ``w`` is folded
    to the positive one using sin(-x) = sin(x + pi).
    """
    if w == 0:
        return const(np.sin(phi))
    if w < 0:
        w, phi = -w, -phi + pi
    delay = (pi / 2 - phi) / w
    return Waveform(seq=(basic_wave(COS, w, shift=delay), ))
12✔
1073

1074

1075
def exp(alpha):
    """Return the exponential waveform for rate ``alpha``.

    A ``complex`` alpha is split into ``cos + 1j * sin`` of its imaginary
    part, multiplied by the real-rate exponential when the real part is
    non-zero.
    """
    if not isinstance(alpha, complex):
        return Waveform(seq=(basic_wave(EXP, alpha), ))
    if alpha.real == 0:
        return cos(alpha.imag) + 1j * sin(alpha.imag)
    envelope = exp(alpha.real)
    return envelope * (cos(alpha.imag) + 1j * sin(alpha.imag))
12✔
1083

1084

1085
def sinc(bw):
    """Return a sinc pulse of bandwidth ``bw`` on a window of ``100 / bw``.

    A non-positive ``bw`` gives the zero waveform.
    """
    if bw <= 0:
        return zero()
    width = 100 / bw
    left = round(-0.5 * width, NDIGITS)
    right = round(0.5 * width, NDIGITS)
    return Waveform(bounds=(left, right, +inf),
                    seq=(_zero, basic_wave(SINC, bw), _zero))
1092

1093

1094
def cosPulse(width, plateau=0.0):
    """Return a raised-cosine pulse, optionally with a flat top.

    When half of ``plateau`` rounds to a positive value, the pulse is a
    square of width ``plateau + width / 2`` with cosine edges of
    ``width / 2``.  Otherwise it is ``0.5 + 0.5 * cos(2*pi*t / width)`` on
    ``[-width/2, width/2]``; a non-positive width gives the zero waveform.
    """
    if round(0.5 * plateau, NDIGITS) > 0:
        return square(plateau + 0.5 * width, edge=0.5 * width, type='cos')
    if width <= 0:
        return zero()
    # Same as mul(add(basic_wave(COS, 2*pi/width), _one), _half), written
    # out as a literal expression.
    constant_term = ((), ())
    cosine_term = (((COS, 6.283185307179586 / width, 0), ), (1, ))
    pulse = ((constant_term, cosine_term), (0.5, 0.5))
    left = round(-0.5 * width, NDIGITS)
    right = round(0.5 * width, NDIGITS)
    return Waveform(bounds=(left, right, +inf), seq=(_zero, pulse, _zero))
1106

1107

1108
def hanning(width, plateau=0.0):
    """Alias of ``cosPulse`` with the same arguments."""
    pulse = cosPulse(width, plateau=plateau)
    return pulse
×
1110

1111

1112
def cosh(w):
    """Return a single-piece waveform of the COSH base function with ``w``."""
    piece = basic_wave(COSH, w)
    return Waveform(seq=(piece, ))
×
1114

1115

1116
def sinh(w):
    """Return a single-piece waveform of the SINH base function with ``w``."""
    piece = basic_wave(SINH, w)
    return Waveform(seq=(piece, ))
×
1118

1119

1120
def coshPulse(width, eps=1.0, plateau=0.0):
    """Hyperbolic-cosine pulse with an optional plateau.

    Pulse edge shape:
            cosh(eps / 2) - cosh(eps * t / T)
    f(t) = -----------------------------------
                  cosh(eps / 2) - 1
    where T is the pulse width and eps is the pulse edge steepness.
    The pulse is defined for t in [-T/2, T/2].

    In case of plateau > 0, the pulse is defined as:
           | f(t + plateau/2)   if t in [-T/2 - plateau/2, - plateau/2]
    g(t) = | 1                  if t in [-plateau/2, plateau/2]
           | f(t - plateau/2)   if t in [plateau/2, T/2 + plateau/2]

    Parameters
    ----------
    width : float
        Pulse width.
    eps : float
        Pulse edge steepness.
    plateau : float
        Pulse plateau.
    """
    if width <= 0 and plateau <= 0:
        return zero()
    w = eps / width
    A = np.cosh(eps / 2)
    # f(t) = A / (A - 1) - cosh(w * t) / (A - 1)
    amplitudes = (A / (A - 1), -1 / (A - 1))
    constant_term = ((), ())

    def edge(shift):
        # Literal expression for f(t - shift).
        return ((constant_term, (((COSH, w, shift), ), (1, ))), amplitudes)

    no_plateau = plateau == 0.0 or round(-0.5 * plateau, NDIGITS) == round(
        0.5 * plateau, NDIGITS)
    if no_plateau:
        left = round(-0.5 * width, NDIGITS)
        right = round(0.5 * width, NDIGITS)
        return Waveform(bounds=(left, right, +inf),
                        seq=(_zero, edge(0), _zero))

    bounds = (
        round(-0.5 * width - 0.5 * plateau, NDIGITS),
        round(-0.5 * plateau, NDIGITS),
        round(0.5 * plateau, NDIGITS),
        round(0.5 * width + 0.5 * plateau, NDIGITS),
        +inf,
    )
    raising = edge(-0.5 * plateau)
    falling = edge(0.5 * plateau)
    return Waveform(bounds=bounds,
                    seq=(_zero, raising, _one, falling, _zero))
1168

1169

1170
def general_cosine(duration, *arg):
    """Return a sum-of-cosines window of length ``duration``.

    The coefficients ``arg`` are normalized by the sum of every other
    coefficient (``arg[0] + arg[2] + ...``).  The i-th coefficient
    (1-based) weights ``(1 - (-1)**i * cos(2*pi*i*t / duration)) / 2``, and
    the sum is multiplied by ``square(duration)``.

    Integer coefficients are accepted: normalization uses an out-of-place
    division, so the array is promoted to float instead of raising on an
    in-place integer division.
    """
    coeffs = np.asarray(arg)
    coeffs = coeffs / coeffs[::2].sum()
    wav = zero()
    for i, a in enumerate(coeffs, start=1):
        # Rebind rather than ``+=``: ``wav`` starts as the shared zero()
        # instance, which must not be modified in place.
        wav = wav + a / 2 * (1 - (-1)**i * cos(i * 2 * pi / duration))
    return wav * square(duration)
×
1177

1178

1179
def slepian(duration, *arg):
    """Return a sum-of-cosines window of length ``duration``.

    The computation is the same as ``general_cosine``: the coefficients
    ``arg`` are normalized by the sum of every other coefficient, the i-th
    coefficient (1-based) weights
    ``(1 - (-1)**i * cos(2*pi*i*t / duration)) / 2``, and the sum is
    multiplied by ``square(duration)``.

    Integer coefficients are accepted: normalization uses an out-of-place
    division, so the array is promoted to float instead of raising on an
    in-place integer division.
    """
    coeffs = np.asarray(arg)
    coeffs = coeffs / coeffs[::2].sum()
    wav = zero()
    for i, a in enumerate(coeffs, start=1):
        # Rebind rather than ``+=``: ``wav`` starts as the shared zero()
        # instance, which must not be modified in place.
        wav = wav + a / 2 * (1 - (-1)**i * cos(i * 2 * pi / duration))
    return wav * square(duration)
×
1186

1187

1188
def _poly(*a):
12✔
1189
    """
1190
    a[0] + a[1] * t + a[2] * t**2 + ...
1191
    """
1192
    t = []
12✔
1193
    amp = []
12✔
1194
    if a[0] != 0:
12✔
1195
        t.append(((), ()))
12✔
1196
        amp.append(a[0])
12✔
1197
    for n, a_ in enumerate(a[1:], start=1):
12✔
1198
        if a_ != 0:
12✔
1199
            t.append((((LINEAR, 0), ), (n, )))
12✔
1200
            amp.append(a_)
12✔
1201
    return tuple(t), tuple(a)
12✔
1202

1203

1204
def poly(a):
    """
    Waveform of the polynomial a[0] + a[1] * t + a[2] * t**2 + ...

    ``a`` is an iterable of coefficients, lowest order first.
    """
    expression = _poly(*a)
    return Waveform(seq=(expression, ))
12✔
1209

1210

1211
def t():
    """Return the waveform f(t) = t.

    ``seq`` holds exactly one expression ``((term, ), (amplitude, ))``,
    the same shape that ``_poly`` and the literal expressions in ``step``
    and ``coshPulse`` use.  The previous value passed the bare term and
    ``(1, )`` as two separate ``seq`` entries.
    """
    linear_term = (((LINEAR, 0), ), (1, ))
    expression = ((linear_term, ), (1, ))
    return Waveform(seq=(expression, ))
×
1213

1214

1215
def drag(freq, width, plateau=0, delta=0, block_freq=None, phase=0, t0=0):
    """Return a DRAG pulse starting at ``t0``, optionally with a plateau.

    ``phase`` is first advanced by ``pi * delta * (width + plateau)``.
    - ``plateau <= 0``: a single DRAG piece on ``[t0, t0 + width]``.
    - ``width <= 0``: a plain cosine at ``freq + delta`` on
      ``[t0, t0 + plateau]``.
    - otherwise: the first half of a DRAG piece, the cosine for the
      plateau, then the second half of a DRAG piece delayed by ``plateau``.
    """
    phase += pi * delta * (width + plateau)
    start = round(t0, NDIGITS)

    if plateau <= 0:
        pulse = basic_wave(DRAG, t0, freq, width, delta, block_freq, phase)
        return Waveform(seq=(_zero, pulse, _zero),
                        bounds=(start, round(t0 + width, NDIGITS), +inf))

    w = 2 * pi * (freq + delta)
    if width <= 0:
        carrier = basic_wave(COS, w, shift=(phase + 2 * pi * delta * t0) / w)
        return Waveform(seq=(_zero, carrier, _zero),
                        bounds=(start, round(t0 + plateau, NDIGITS), +inf))

    rising = basic_wave(DRAG, t0, freq, width, delta, block_freq, phase)
    carrier = basic_wave(COS, w, shift=(phase + 2 * pi * delta * t0) / w)
    falling = basic_wave(DRAG, t0 + plateau, freq, width, delta, block_freq,
                         phase - 2 * pi * delta * plateau)
    bounds = (
        start,
        round(t0 + width / 2, NDIGITS),
        round(t0 + width / 2 + plateau, NDIGITS),
        round(t0 + width + plateau, NDIGITS),
        +inf,
    )
    return Waveform(seq=(_zero, rising, carrier, falling, _zero),
                    bounds=bounds)
1242

1243

1244
def chirp(f0, f1, T, phi0=0, type='linear'):
    """
    A chirp is a signal in which the frequency increases (up-chirp)
    or decreases (down-chirp) with time. In some sources, the term
    chirp is used interchangeably with sweep signal.

    The chirp is non-zero on [0, T] only.

    type: "linear", "exponential" (also "exp", "geometric"),
          "hyperbolic" (also "hyp")

    Raises ValueError if T is not positive, if f0 is zero for an
    exponential chirp, or if type is unknown.
    """
    if f0 == f1:
        # NOTE(review): sin() takes its first argument as the angular
        # frequency ``w``; confirm that passing f0 unscaled (and returning
        # a waveform not limited to [0, T]) is intended here.
        return sin(f0, phi0)
    if T <= 0:
        raise ValueError('T must be positive')

    def windowed(body):
        # Zero outside [0, T].
        return Waveform(bounds=(0, round(T, NDIGITS), +inf),
                        seq=(_zero, body, _zero))

    if type == 'linear':
        # f(t) = f1 * (t/T) + f0 * (1 - t/T)
        return windowed(basic_wave(LINEARCHIRP, f0, f1, T, phi0))

    if type in ('exp', 'exponential', 'geometric'):
        # f(t) = f0 * (f1/f0) ** (t/T)
        if f0 == 0:
            raise ValueError('f0 must be non-zero')
        alpha = np.log(f1 / f0) / T
        return windowed(basic_wave(EXPONENTIALCHIRP, f0, alpha, phi0))

    if type in ('hyperbolic', 'hyp'):
        # f(t) = f0 * f1 / (f0 * (t/T) + f1 * (1-t/T))
        if f0 * f1 == 0:
            return const(np.sin(phi0))
        k = (f0 - f1) / (f1 * T)
        return windowed(basic_wave(HYPERBOLICCHIRP, f0, k, phi0))

    raise ValueError(f'unknown type {type}')
×
1281

1282

1283
def interp(x, y):
    """Return the piecewise-linear waveform through the points ``(x, y)``.

    The waveform is zero before ``x[0]`` and after the last point.
    Consecutive points with the same ``x`` are skipped.  The result is
    simplified before being returned.
    """
    pieces = [_zero]
    edges = [x[0]]
    for x1, x2, y1, y2 in zip(x[:-1], x[1:], y[:-1], y[1:]):
        if x2 != x1:
            # Line through (x1, y1) and (x2, y2): slope * (t - x1) + y1.
            slope = _const((y2 - y1) / (x2 - x1))
            line = add(mul(slope, basic_wave(LINEAR, shift=x1)), _const(y1))
            pieces.append(line)
            edges.append(x2)
    edges.append(inf)
    pieces.append(_zero)
    rounded = tuple(round(edge, NDIGITS) for edge in edges)
    return Waveform(seq=tuple(pieces), bounds=rounded).simplify()
1299

1300

1301
def cut(wav, start=None, stop=None, head=None, tail=None, min=None, max=None):
    """Limit ``wav`` to a time window and optionally pin an end value.

    If ``start`` and ``head`` are both given, a constant is added so the
    value at ``start`` equals ``head``; otherwise, if ``stop`` and ``tail``
    are both given, the value at ``stop`` is pinned to ``tail``.  The
    waveform is then zeroed before ``start`` and after ``stop`` (when
    given), and ``min`` / ``max`` are stored on the result when given.
    """

    def value_at(position):
        # Evaluate the waveform at a single time point.
        return wav(np.array([1.0 * position]))[0]

    if start is not None and head is not None:
        offset = head - value_at(start)
    elif stop is not None and tail is not None:
        offset = tail - value_at(stop)
    else:
        offset = 0
    wav = wav + offset

    if start is not None:
        wav = wav * (step(0) >> start)
    if stop is not None:
        wav = wav * ((1 - step(0)) >> stop)
    if min is not None:
        wav.min = min
    if max is not None:
        wav.max = max
    return wav
×
1318

1319

1320
def function(fun, *args, start=None, stop=None):
    """Register ``fun`` as a base function and wrap it in a waveform.

    ``args`` are passed on to ``basic_wave`` together with the registered
    type id.  The waveform is zeroed before ``start`` and after ``stop``
    when those are given.
    """
    type_id = registerBaseFunc(fun)
    wav = Waveform(seq=(basic_wave(type_id, *args), ))
    if start is not None:
        wav = wav * (step(0) >> start)
    if stop is not None:
        wav = wav * ((1 - step(0)) >> stop)
    return wav
×
1329

1330

1331
def samplingPoints(start, stop, points):
    """Return a waveform that is an INTERP piece on [start, stop], zero outside.

    ``points`` is stored as a tuple in the INTERP base function, along with
    the unrounded ``start`` and ``stop``.
    """
    body = basic_wave(INTERP, start, stop, tuple(points))
    window = (round(start, NDIGITS), round(stop, NDIGITS), inf)
    return Waveform(bounds=window, seq=(_zero, body, _zero))
1335

1336

1337
def mixing(I,
           Q=None,
           *,
           phase=0.0,
           freq=0.0,
           ratioIQ=1.0,
           phaseDiff=0.0,
           block_freq=None,
           DRAGScaling=None):
    """SSB or envelope mixing

    Returns the pair ``(Iout, Qout)``.  With ``freq != 0`` the inputs are
    mixed with cos/sin carriers at ``2*pi*freq`` (``phaseDiff`` is added to
    the Q-channel carrier phase); with ``freq == 0`` they are only rotated
    by ``-phase``.  A DRAG correction is then applied when ``block_freq``
    is given and differs from ``freq``, or else when ``DRAGScaling`` is
    non-zero.  Finally ``Qout`` is scaled by ``ratioIQ``.
    """
    if Q is None:
        Q = zero()

    w = 2 * pi * freq
    if freq != 0.0:
        # SSB mixing
        Iout = I * cos(w, -phase) + Q * sin(w, -phase)
        Qout = -I * sin(w, -phase + phaseDiff) + Q * cos(w, -phase + phaseDiff)
    else:
        # envelope mixing
        Iout = I * np.cos(-phase) + Q * np.sin(-phase)
        Qout = -I * np.sin(-phase) + Q * np.cos(-phase)

    # apply DRAG; both new channels are built from the uncorrected pair
    if block_freq is not None and block_freq != freq:
        a = block_freq / (block_freq - freq)
        b = 1 / (block_freq - freq)
        Iout, Qout = (a * Iout + b / (2 * pi) * D(Qout),
                      a * Qout - b / (2 * pi) * D(Iout))
    elif DRAGScaling is not None and DRAGScaling != 0:
        # 2 * pi * scaling * (freq - block_freq) = 1
        Iout, Qout = ((1 - w * DRAGScaling) * Iout - DRAGScaling * D(Qout),
                      (1 - w * DRAGScaling) * Qout + DRAGScaling * D(Iout))

    Qout = ratioIQ * Qout

    return Iout, Qout
×
1378

1379

1380
# Public API of this module.
__all__ = [
    'D',
    'Waveform',
    'chirp',
    'const',
    'cos',
    'cosh',
    'coshPulse',
    'cosPulse',
    'cut',
    'drag',
    'exp',
    'function',
    'gaussian',
    'general_cosine',
    'hanning',
    'interp',
    'mixing',
    'one',
    'poly',
    'registerBaseFunc',
    'registerDerivative',
    'samplingPoints',
    'sign',
    'sin',
    'sinc',
    'sinh',
    'square',
    'step',
    't',
    'zero',
]
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc