• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

qutech / qupulse / 12966231730

25 Jan 2025 03:31PM UTC coverage: 88.582%. First build
12966231730

Pull #885

github

web-flow
Merge 738c882a8 into b2dbe4328
Pull Request #885: Improve ProgramBuilder documentation

27 of 28 new or added lines in 2 files covered. (96.43%)

18465 of 20845 relevant lines covered (88.58%)

5.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.59
/qupulse/program/linspace.py
1
# SPDX-FileCopyrightText: 2014-2024 Quantum Technology Group and Chair of Software Engineering, RWTH Aachen University
2
#
3
# SPDX-License-Identifier: GPL-3.0-or-later
4

5
import abc
6✔
6
import contextlib
6✔
7
import dataclasses
6✔
8
import numpy as np
6✔
9
from dataclasses import dataclass
6✔
10
from typing import Mapping, Optional, Sequence, ContextManager, Iterable, Tuple, Union, Dict, List, Iterator
6✔
11

12
from qupulse import ChannelID, MeasurementWindow
6✔
13
from qupulse.parameter_scope import Scope, MappedScope, FrozenDict
6✔
14
from qupulse.program import (ProgramBuilder, HardwareTime, HardwareVoltage, Waveform, RepetitionCount, TimeType,
6✔
15
                             SimpleExpression)
16
from qupulse.program.waveforms import MultiChannelWaveform
6✔
17

18
# Default grid used to unify increments: factors are divided by this value and rounded when a DepKey is built.
# The increment values themselves stay plain floats.
DEFAULT_INCREMENT_RESOLUTION: float = 1e-9
21

22

23
@dataclass(frozen=True)
class DepKey:
    """Identifier for the way a set command depends on the iteration indices.

    The factors are stored as integers, i.e. rounded multiples of a resolution, so that two keys compare equal
    independently of floating point rounding errors.

    These objects allow backends which support it to track multiple amplitudes at once.
    """
    factors: Tuple[int, ...]

    @classmethod
    def from_voltages(cls, voltages: Sequence[float], resolution: float):
        """Create a key from floating point factors.

        Args:
            voltages: One factor per iteration index. Trailing zeros are dropped.
            resolution: The factors are divided by this value and rounded to the nearest integer.

        Returns:
            The key holding the rounded factors.
        """
        trimmed = voltages
        # trailing zeros are stripped so they do not distinguish otherwise equal keys
        while trimmed and trimmed[-1] == 0:
            trimmed = trimmed[:-1]
        quantized = [int(round(factor / resolution)) for factor in trimmed]
        return cls(tuple(quantized))
38

39

40
@dataclass
class LinSpaceNode:
    """AST node for a program that supports linear spacing of set points as well as nested sequencing and
    repetitions.

    This base class only defines the interface; both methods raise :class:`NotImplementedError`.
    """

    def dependencies(self) -> Mapping[int, set]:
        """Collect the dependencies of this node.

        Returns:
            A mapping whose values are sets of dependencies. In ``LinSpaceHold`` the keys are the positions in
            its ``factors`` tuple and the values are sets of factor tuples.
        """
        raise NotImplementedError

    def reversed(self, offset: int, lengths: list):
        """Get the time reversed version of this linspace node. Since this is a non-local operation the arguments
        give the context.

        Args:
            offset:  Number of active iterations that are not reversed
            lengths: Lengths of the currently active iterations that have to be reversed

        Returns:
            Time reversed version.
        """
        raise NotImplementedError
59

60

61
@dataclass
class LinSpaceHold(LinSpaceNode):
    """Hold voltages for a given time. The voltages and the time may depend on the iteration index."""

    # per channel: voltage base value and the optional factors that scale the iteration indices
    bases: Tuple[float, ...]
    factors: Tuple[Optional[Tuple[float, ...]], ...]

    # hold duration base value and the optional factors that scale the iteration indices
    duration_base: TimeType
    duration_factors: Optional[Tuple[TimeType, ...]]

    def dependencies(self) -> Mapping[int, set]:
        """Map each channel index that has non-empty factors to a one-element set holding these factors."""
        result = {}
        for channel, channel_factors in enumerate(self.factors):
            if channel_factors:
                result[channel] = {channel_factors}
        return result

    def reversed(self, offset: int, lengths: list):
        """Return the time reversed hold.

        Factors at positions ``>= offset`` are negated and the base is moved to the end point of the original
        sweep. Without active reversed iterations (``lengths`` empty) the node itself is returned.

        Args:
            offset:  Number of leading factors that are left untouched
            lengths: Lengths of the iterations that are reversed

        Returns:
            The reversed node.
        """
        if not lengths:
            return self

        # If the iteration length is `n`, the starting point is shifted by `n - 1`
        steps = [length - 1 for length in lengths]

        new_bases = []
        new_factors = []
        for ch_base, ch_factors in zip(self.bases, self.factors):
            if ch_factors is not None and len(ch_factors) > offset:
                kept, flipped = ch_factors[:offset], ch_factors[offset:]
                new_bases.append(ch_base + sum(step * factor for factor, step in zip(flipped, steps)))
                new_factors.append(kept + tuple(-f for f in flipped))
            else:
                # nothing to reverse for this channel
                new_bases.append(ch_base)
                new_factors.append(ch_factors)

        duration_base = self.duration_base
        duration_factors = self.duration_factors
        if duration_factors is not None and len(duration_factors) > offset:
            flipped = duration_factors[offset:]
            duration_base = duration_base + sum((step * factor for factor, step in zip(flipped, steps)),
                                                TimeType(0))
            duration_factors = duration_factors[:offset] + tuple(-f for f in flipped)

        return LinSpaceHold(tuple(new_bases), tuple(new_factors),
                            duration_base=duration_base, duration_factors=duration_factors)
102

103

104
@dataclass
class LinSpaceArbitraryWaveform(LinSpaceNode):
    """This is just a wrapper to pipe arbitrary waveforms through the system."""
    waveform: Waveform
    channels: Tuple[ChannelID, ...]

    def dependencies(self) -> Mapping[int, set]:
        """Return an empty mapping because this node carries no iteration index factors.

        Without this override the base class raises :class:`NotImplementedError` as soon as the node is part of
        the body of a repetition or iteration node whose dependencies are collected.
        """
        return {}

    def reversed(self, offset: int, lengths: list):
        """Return a node that wraps the reversed waveform on the same channels.

        Args:
            offset:  Unused
            lengths: Unused

        Returns:
            The reversed node.
        """
        return LinSpaceArbitraryWaveform(
            waveform=self.waveform.reversed(),
            channels=self.channels,
        )
115

116

117
@dataclass
class LinSpaceRepeat(LinSpaceNode):
    """Repeat the body count times."""
    body: Tuple[LinSpaceNode, ...]
    count: int

    def dependencies(self):
        """Merge the dependencies of all body nodes per key."""
        merged = {}
        for child in self.body:
            for channel, child_deps in child.dependencies().items():
                if channel not in merged:
                    merged[channel] = set()
                merged[channel].update(child_deps)
        return merged

    def reversed(self, offset: int, counts: list):
        """Return a repetition with the same count whose body nodes are reversed and in reversed order.

        Args:
            offset: Forwarded to the body nodes
            counts: Forwarded to the body nodes

        Returns:
            The reversed node.
        """
        reversed_body = tuple(child.reversed(offset, counts) for child in self.body[::-1])
        return LinSpaceRepeat(reversed_body, self.count)
132

133

134
@dataclass
class LinSpaceIter(LinSpaceNode):
    """Iteration in linear space are restricted to range 0 to length.

    Offsets and spacing are stored in the hold node."""
    body: Tuple[LinSpaceNode, ...]
    length: int

    def dependencies(self):
        """Merge the dependencies of all body nodes after stripping the index that is set by this iteration."""
        result = {}
        for child in self.body:
            for channel, child_deps in child.dependencies().items():
                # remove the last element of each dependency because this iteration sets it
                # -> no external dependency
                outer = {dep[:-1] for dep in child_deps}
                if outer == {()}:
                    continue
                result.setdefault(channel, set()).update(outer)
        return result

    def reversed(self, offset: int, lengths: list):
        """Return an iteration of the same length whose body nodes are reversed and in reversed order.

        The own length is pushed onto ``lengths`` while the body is processed and removed again afterwards.

        Args:
            offset:  Forwarded to the body nodes
            lengths: Lengths of the enclosing iterations that are reversed. Temporarily modified in place.

        Returns:
            The reversed node.
        """
        lengths.append(self.length)
        reversed_body = tuple(child.reversed(offset, lengths) for child in self.body[::-1])
        lengths.pop()
        return LinSpaceIter(reversed_body, self.length)
157

158

159
class LinSpaceBuilder(ProgramBuilder):
    """This program builder supports efficient translation of pulse templates that use symbolic linearly
    spaced voltages and durations.

    The channel identifiers are reduced to their index in the given channel tuple.

    Arbitrary waveforms are passed through as :class:`LinSpaceArbitraryWaveform` nodes.
    """

    def __init__(self, channels: Tuple[ChannelID, ...]):
        super().__init__()
        self._name_to_idx = {ch_name: position for position, ch_name in enumerate(channels)}
        self._idx_to_name = channels

        # stack of node lists; the first entry is the root program, the last entry receives new nodes
        self._stack = [[]]
        # (index name, range) pairs of the currently active iterations, outermost first
        self._ranges = []

    def _root(self):
        """The node list at the bottom of the stack."""
        return self._stack[0]

    def _get_rng(self, idx_name: str) -> range:
        """Look up the range of an active iteration by its index name."""
        return self._get_ranges()[idx_name]

    def inner_scope(self, scope: Scope) -> Scope:
        """This function is necessary to inject program builder specific parameter implementations into the build
        process.

        While an iteration is active, the innermost index name is overwritten with a :class:`SimpleExpression`.
        """
        if not self._ranges:
            return scope
        name, _ = self._ranges[-1]
        return scope.overwrite({name: SimpleExpression(base=0, offsets={name: 1})})

    def _get_ranges(self):
        """The active iterations as a new dict from index name to range."""
        return dict(self._ranges)

    def hold_voltage(self, duration: HardwareTime, voltages: Mapping[ChannelID, HardwareVoltage]):
        """Append a :class:`LinSpaceHold` node to the innermost node list.

        Args:
            duration: Hold duration. May be a :class:`SimpleExpression`.
            voltages: Voltage per channel name. Values that are not floats are read via ``base`` and ``offsets``.
        """
        indexed = sorted((self._name_to_idx[ch_name], value) for ch_name, value in voltages.items())
        ordered_values = [value for _, value in indexed]

        ranges = self._get_ranges()
        bases = []
        factors = []
        for value in ordered_values:
            if isinstance(value, float):
                # constant voltage without index dependency
                bases.append(value)
                factors.append(None)
                continue

            offsets = value.offsets
            base = value.base
            incs = []
            for rng_name, rng in ranges.items():
                offset = offsets.get(rng_name, None)
                # the range start is absorbed in the base, the range step scales the increment
                start = 0. + rng.start * offset if offset else 0.
                step = 0. + rng.step * offset if offset else 0.
                base += start
                incs.append(step)
            bases.append(base)
            factors.append(tuple(incs))

        if isinstance(duration, SimpleExpression):
            # NOTE(review): offsets is forwarded unconverted although LinSpaceHold annotates a tuple - confirm
            duration_factors = duration.offsets
            duration_base = duration.base
        else:
            duration_factors = None
            duration_base = duration

        hold = LinSpaceHold(bases=tuple(bases),
                            factors=tuple(factors),
                            duration_base=duration_base,
                            duration_factors=duration_factors)
        self._stack[-1].append(hold)

    def play_arbitrary_waveform(self, waveform: Waveform):
        """Append a :class:`LinSpaceArbitraryWaveform` for all channels of this builder."""
        wrapped = LinSpaceArbitraryWaveform(waveform, self._idx_to_name)
        return self._stack[-1].append(wrapped)

    def measure(self, measurements: Optional[Sequence[MeasurementWindow]]):
        """Ignores measurements"""
        pass

    def with_repetition(self, repetition_count: RepetitionCount,
                        measurements: Optional[Sequence[MeasurementWindow]] = None) -> Iterable['ProgramBuilder']:
        """Generator that yields this builder at most once to collect the body of a repetition.

        Nothing is yielded for a repetition count of zero. After the consumer resumes the generator, the nodes
        that were collected are wrapped in a :class:`LinSpaceRepeat` unless the body is empty.
        """
        if repetition_count == 0:
            return
        self._stack.append([])
        yield self
        body = self._stack.pop()
        if body:
            self._stack[-1].append(LinSpaceRepeat(body=tuple(body), count=repetition_count))

    @contextlib.contextmanager
    def with_sequence(self,
                      measurements: Optional[Sequence[MeasurementWindow]] = None) -> ContextManager['ProgramBuilder']:
        """Context manager that yields this builder unchanged. The measurements are not used."""
        yield self

    def new_subprogram(self, global_transformation: 'Transformation' = None) -> ContextManager['ProgramBuilder']:
        """Not supported by this builder.

        Raises:
            NotImplementedError: always
        """
        raise NotImplementedError('Not implemented yet (postponed)')

    def with_iteration(self, index_name: str, rng: range,
                       measurements: Optional[Sequence[MeasurementWindow]] = None) -> Iterable['ProgramBuilder']:
        """Generator that yields this builder at most once to collect the body of an iteration.

        Nothing is yielded for an empty range. While the body is collected, the range is registered as active.
        Afterwards the collected nodes are wrapped in a :class:`LinSpaceIter` unless the body is empty.
        """
        if len(rng) == 0:
            return
        self._stack.append([])
        self._ranges.append((index_name, rng))
        yield self
        body = self._stack.pop()
        self._ranges.pop()
        if body:
            self._stack[-1].append(LinSpaceIter(body=tuple(body), length=len(rng)))

    @contextlib.contextmanager
    def time_reversed(self) -> ContextManager['LinSpaceBuilder']:
        """Context manager that collects nodes and appends them time reversed to the enclosing node list."""
        self._stack.append([])
        yield self
        inner = self._stack.pop()
        # the iterations that are active outside of this context are not reversed
        offset = len(self._ranges)
        target = self._stack[-1]
        for node in inner[::-1]:
            target.append(node.reversed(offset, []))

    def to_program(self) -> Optional[Sequence[LinSpaceNode]]:
        """Return the root node list or ``None`` if it is empty."""
        return self._root() or None
283

284

285
@dataclass
class LoopLabel:
    """Command that marks the start of a loop. The matching ``LoopJmp`` carries the same ``idx``."""
    # identifier of the loop
    idx: int
    # loop count associated with the label
    count: int
289

290

291
@dataclass
class Increment:
    """Command that adds ``value`` to the amplitude identified by ``dependency_key`` on ``channel``."""
    # index of the channel
    channel: int
    # amount that is added
    value: float
    # identifies which tracked amplitude of the channel is incremented
    dependency_key: DepKey
296

297

298
@dataclass
class Set:
    """Command that sets ``channel`` to ``value``. The key defaults to the empty dependency key."""
    # index of the channel
    channel: int
    # value that is set
    value: float
    # identifies which tracked amplitude of the channel is set
    key: DepKey = dataclasses.field(default_factory=lambda: DepKey(()))
303

304

305
@dataclass
class Wait:
    """Command that lets the given duration pass."""
    # time to wait
    duration: TimeType
308

309

310
@dataclass
class LoopJmp:
    """Command that marks the end of a loop. It refers to the ``LoopLabel`` with the same ``idx``."""
    # identifier of the loop
    idx: int
313

314

315
@dataclass
class Play:
    """Command that plays an arbitrary waveform on the given channels."""
    # waveform that is played
    waveform: Waveform
    # channels of the waveform; variable length like LinSpaceArbitraryWaveform.channels from which it is filled
    channels: Tuple[ChannelID, ...]
319

320

321
# Any of the command types that can be part of a translated command sequence
Command = Union[Increment, Set, LoopLabel, LoopJmp, Wait, Play]
322

323

324
@dataclass(frozen=True)
class DepState:
    """Base value and iteration indices at which a dependency was last updated."""
    base: float
    iterations: Tuple[int, ...]

    def required_increment_from(self, previous: 'DepState', factors: Sequence[float]) -> float:
        """Calculate the required increment from the previous state to the current given the factors that determine
        the voltage dependency of each index.

        By convention there are only two possible values for each iteration index integer in self: 0 or the last
        index. The three possible increments for each iteration are none, regular and jump to next line.

        The previous dependency state can have a different iteration length if the trailing factors now or during
        the last iteration are zero.

        Args:
            previous: The previous state to calculate the required increment from. It has to belong to the same
                DepKey.
            factors: The number of factors has to be the same as the current number of iterations.

        Returns:
            The increment
        """
        assert len(self.iterations) == len(factors)

        increment = self.base - previous.base
        for old, new, factor in zip(previous.iterations, self.iterations, factors):
            if old < new:
                assert old == 0
                # regular iteration, although the new value will probably be > 1, the resulting increment will be
                # applied multiple times so only one factor is needed.
                increment += factor
            elif old > new:
                assert new == 0
                # we need to jump back. The old value gives us the number of increments to reverse
                increment -= factor * old
            # old == new: we are still in the same iteration of this sweep -> nothing to add
        return increment
368

369

370
@dataclass
class _TranslationState:
    """This is the state of a translation of a LinSpace program to a command sequence."""

    # index that the next created loop gets
    label_num: int = dataclasses.field(default=0)
    # the commands that were generated so far
    commands: List[Command] = dataclasses.field(default_factory=list)
    # current index of each active iteration, outermost first
    iterations: List[int] = dataclasses.field(default_factory=list)
    # per channel: the dependency key of the most recently emitted set or increment
    active_dep: Dict[int, DepKey] = dataclasses.field(default_factory=dict)
    # per channel and dependency key: the state at the last update
    dep_states: Dict[int, Dict[DepKey, DepState]] = dataclasses.field(default_factory=dict)
    # per channel: the last value that was set without index dependency
    plain_voltage: Dict[int, float] = dataclasses.field(default_factory=dict)
    # resolution used to create dependency keys
    resolution: float = dataclasses.field(default_factory=lambda: DEFAULT_INCREMENT_RESOLUTION)

    def new_loop(self, count: int):
        """Create a label/jump pair with a fresh loop index."""
        idx = self.label_num
        self.label_num = idx + 1
        return LoopLabel(idx, count), LoopJmp(idx)

    def get_dependency_state(self, dependencies: Mapping[int, set]):
        """Collect the currently stored states (or ``None``) of all given dependencies into a set."""
        states = set()
        for ch, deps in dependencies.items():
            channel_states = self.dep_states.get(ch, {})
            for dep in deps:
                states.add(channel_states.get(DepKey.from_voltages(dep, self.resolution), None))
        return states

    def set_voltage(self, channel: int, value: float):
        """Emit a plain ``Set`` unless the channel already holds this plain value."""
        key = DepKey(())
        if self.active_dep.get(channel, None) == key and self.plain_voltage.get(channel, None) == value:
            return
        self.commands.append(Set(channel, value, key))
        self.active_dep[channel] = key
        self.plain_voltage[channel] = value

    def _add_repetition_node(self, node: LinSpaceRepeat):
        """Translate a repetition to a loop. If the body changes the dependency state, the first pass is emitted
        in front of the loop and the loop count is reduced by one."""
        pre_dep_state = self.get_dependency_state(node.dependencies())
        label, jmp = self.new_loop(node.count)
        label_position = len(self.commands)
        self.commands.append(label)
        self.add_node(node.body)
        post_dep_state = self.get_dependency_state(node.dependencies())
        if pre_dep_state != post_dep_state:
            # The commands emitted so far become the first pass: the label is moved behind them and the body is
            # translated a second time as the actual loop body.
            # NOTE(review): for count == 1 this yields a label with count 0 followed by a second body - confirm
            del self.commands[label_position]
            self.commands.append(label)
            label.count -= 1
            self.add_node(node.body)
        self.commands.append(jmp)

    def _add_iteration_node(self, node: LinSpaceIter):
        """Translate an iteration. The first pass is emitted with index 0. The remaining passes are emitted as a
        loop whose body is translated with the index set to the last value."""
        self.iterations.append(0)
        self.add_node(node.body)

        remaining = node.length - 1
        if remaining > 0:
            self.iterations[-1] = remaining
            label, jmp = self.new_loop(remaining)
            self.commands.append(label)
            self.add_node(node.body)
            self.commands.append(jmp)
        self.iterations.pop()

    def _set_indexed_voltage(self, channel: int, base: float, factors: Sequence[float]):
        """Emit a ``Set`` the first time a dependency key is seen on a channel and an ``Increment`` afterwards."""
        dep_key = DepKey.from_voltages(voltages=factors, resolution=self.resolution)
        new_dep_state = DepState(base, iterations=tuple(self.iterations))

        channel_states = self.dep_states.setdefault(channel, {})
        previous = channel_states.get(dep_key, None)
        if previous is None:
            assert all(it == 0 for it in self.iterations)
            self.commands.append(Set(channel, base, dep_key))
        else:
            inc = new_dep_state.required_increment_from(previous=previous, factors=factors)

            # a zero increment is still emitted if another key is active because it signals to activate this
            # amplitude register
            if inc or self.active_dep.get(channel, None) != dep_key:
                self.commands.append(Increment(channel, inc, dep_key))
        self.active_dep[channel] = dep_key
        channel_states[dep_key] = new_dep_state

    def _add_hold_node(self, node: LinSpaceHold):
        """Emit the voltage commands of all channels followed by a ``Wait``.

        Raises:
            NotImplementedError: if the duration has factors
        """
        if node.duration_factors:
            raise NotImplementedError("TODO")

        for ch, (base, factors) in enumerate(zip(node.bases, node.factors)):
            if factors is None:
                self.set_voltage(ch, base)
            else:
                self._set_indexed_voltage(ch, base, factors)

        self.commands.append(Wait(node.duration_base))

    def add_node(self, node: Union[LinSpaceNode, Sequence[LinSpaceNode]]):
        """Translate a (sequence of) linspace node(s) to commands and add it to the internal command list.

        Raises:
            TypeError: if the node type is not handled
        """
        if isinstance(node, Sequence):
            for lin_node in node:
                self.add_node(lin_node)
            return
        if isinstance(node, LinSpaceRepeat):
            self._add_repetition_node(node)
            return
        if isinstance(node, LinSpaceIter):
            self._add_iteration_node(node)
            return
        if isinstance(node, LinSpaceHold):
            self._add_hold_node(node)
            return
        if isinstance(node, LinSpaceArbitraryWaveform):
            self.commands.append(Play(node.waveform, node.channels))
            return
        raise TypeError("The node type is not handled", type(node), node)
485

486

487
def to_increment_commands(linspace_nodes: Sequence[LinSpaceNode]) -> List[Command]:
    """Translate the given linspace node tree to a minimal sequence of set and increment commands as well as loops.

    Args:
        linspace_nodes: The nodes to translate.

    Returns:
        The command list of a fresh translation state after all nodes were added.
    """
    translation = _TranslationState()
    translation.add_node(linspace_nodes)
    return translation.commands
492

493

494
class LinSpaceVM:
    """Executes a command sequence and records the channel values over time in ``history``."""

    def __init__(self, channels: int,
                 sample_resolution: TimeType = TimeType.from_fraction(1, 2)):
        """
        Args:
            channels: Number of channels.
            sample_resolution: Time step that is used to sample the waveform of ``Play`` commands.
        """
        self.current_values = [np.nan] * channels
        self.sample_resolution = sample_resolution
        self.time = TimeType(0)
        # one register bank per channel that maps a dependency key to its current value
        self.registers = tuple({} for _ in range(channels))

        self.history: List[Tuple[TimeType, Tuple[float, ...]]] = []

        self.commands = None
        self.label_targets = None
        self.label_counts = None
        self.current_command = None

    def change_state(self, cmd: Union[Set, Increment, Wait, Play]):
        """Apply a command that is not related to loops.

        Args:
            cmd: The command to apply.

        Raises:
            NotImplementedError: if the command type is not handled
        """
        if isinstance(cmd, Play):
            dt = self.sample_resolution
            t = TimeType(0)
            total_duration = cmd.waveform.duration
            # The last step is shortened to end exactly at the waveform duration. The loop ends after the sample
            # at the end was taken because the step becomes zero.
            while t <= total_duration and dt > 0:
                sample_time = np.array([float(t)])
                for idx, ch in enumerate(cmd.channels):
                    # store the sampled value itself (list.append returns None and must not be assigned)
                    self.current_values[idx] = cmd.waveform.get_sampled(channel=ch, sample_times=sample_time)[0]
                self.history.append(
                    (self.time, self.current_values.copy())
                )
                dt = min(total_duration - t, self.sample_resolution)
                self.time += dt
                t += dt
        elif isinstance(cmd, Wait):
            # the values that are held are recorded at the start of the wait
            self.history.append(
                (self.time, self.current_values.copy())
            )
            self.time += cmd.duration
        elif isinstance(cmd, Set):
            self.current_values[cmd.channel] = cmd.value
            self.registers[cmd.channel][cmd.key] = cmd.value
        elif isinstance(cmd, Increment):
            value = self.registers[cmd.channel][cmd.dependency_key]
            value += cmd.value
            self.registers[cmd.channel][cmd.dependency_key] = value
            self.current_values[cmd.channel] = value
        else:
            raise NotImplementedError(cmd)

    def set_commands(self, commands: Sequence[Command]):
        """Load a command sequence, resolve the loop label targets and reset the command pointer.

        Args:
            commands: The commands to execute.
        """
        self.commands = []
        self.label_targets = {}
        self.label_counts = {}
        self.current_command = None

        for cmd in commands:
            self.commands.append(cmd)
            if isinstance(cmd, LoopLabel):
                # a loop label signifies a reset count followed by the actual label that targets the following
                # command
                assert cmd.idx not in self.label_targets
                self.label_targets[cmd.idx] = len(self.commands)

        self.current_command = 0

    def step(self):
        """Execute the command at the current position and advance the command pointer."""
        cmd = self.commands[self.current_command]
        if isinstance(cmd, LoopJmp):
            if self.label_counts[cmd.idx] > 0:
                self.label_counts[cmd.idx] -= 1
                self.current_command = self.label_targets[cmd.idx]
            else:
                # ignore jump
                self.current_command += 1
        elif isinstance(cmd, LoopLabel):
            # the pass that follows the label is already the first one
            self.label_counts[cmd.idx] = cmd.count - 1
            self.current_command += 1
        else:
            self.change_state(cmd)
            self.current_command += 1

    def run(self):
        """Execute commands until the command pointer is behind the last command."""
        while self.current_command < len(self.commands):
            self.step()
575

576

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc