• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Nic30 / hwt / 8b945dd2-6cbf-43b3-be54-881b32d4f5e4

10 Jun 2025 09:35PM UTC coverage: 84.734% (-2.6%) from 87.284%
8b945dd2-6cbf-43b3-be54-881b32d4f5e4

push

circleci

Nic30
test: fix call of unittestMain

3415 of 4468 branches covered (76.43%)

Branch coverage included in aggregate %.

11877 of 13579 relevant lines covered (87.47%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.16
/hwt/code.py
1
from operator import and_, or_, xor, add, eq
1✔
2
from types import GeneratorType
1✔
3
from typing import Union, Sequence, Optional, Tuple
1✔
4

5
from hdlConvertorAst.to.hdlUtils import iter_with_last
1✔
6
from hwt.code_utils import _mkOp, _HwIOToRtlSignal
1✔
7
from hwt.hdl.const import HConst
1✔
8
from hwt.hdl.operatorDefs import concatFn
1✔
9
from hwt.hdl.statements.codeBlockContainer import HdlStmCodeBlockContainer
1✔
10
from hwt.hdl.statements.ifContainter import IfContainer
1✔
11
from hwt.hdl.statements.statement import HwtSyntaxError, HdlStatement
1✔
12
from hwt.hdl.statements.switchContainer import SwitchContainer
1✔
13
from hwt.hdl.statements.utils.listOfHdlStatements import ListOfHdlStatement
1✔
14
from hwt.hdl.types.bits import HBits
1✔
15
from hwt.hdl.types.enum import HEnum
1✔
16
from hwt.hdl.types.typeCast import toHVal
1✔
17
from hwt.mainBases import HwIOBase, HwModuleBase
1✔
18
from hwt.mainBases import RtlSignalBase
1✔
19
from hwt.math import log2ceil
1✔
20
from hwt.pyUtils.arrayQuery import arr_any
1✔
21
from hwt.synthesizer.rtlLevel.rtlSignalWalkers import \
1✔
22
    discoverEventDependency
23

24

25
class CodeBlock(HdlStmCodeBlockContainer):
1✔
26
    """
27
    Container for list of statements
28
    """
29

30
    def __init__(self, *statements: Sequence[HdlStatement]):
1✔
31
        super(CodeBlock, self).__init__()
1✔
32
        self._register_stements(statements, self.statements)
1✔
33
        self.rank = sum(map(lambda s: s.rank, statements))
1✔
34

35
        if self._outputs:
1!
36
            ctx = self._get_rtl_context()
1✔
37
            ctx.statements.add(self)
1✔
38

39

40
class If(IfContainer):
1✔
41
    """
42
    If statement generator
43
    """
44

45
    def __init__(self, cond: Union[RtlSignalBase, HwIOBase], *statements: Sequence[HdlStatement]):
1✔
46
        """
47
        :param cond: condition in if statement
48
        :param statements: list of statements which should be active
49
            if condition is met
50
        """
51
        cond_sig = _HwIOToRtlSignal(cond)
1✔
52
        if not isinstance(cond_sig, RtlSignalBase):
1!
53
            raise HwtSyntaxError("Condition is not signal, it is not certain"
×
54
                                 " if this is an error or desire ", cond_sig)
55

56
        assert cond_sig._dtype.bit_length() == 1, cond_sig
1✔
57
        super(If, self).__init__(cond_sig)
1✔
58
        self.rank = 1
1✔
59
        self._inputs.append(cond_sig)
1✔
60
        cond_sig._rtlEndpoints.append(self)
1✔
61

62
        ev_dep = arr_any(discoverEventDependency(cond_sig), lambda x: True)
1✔
63
        self._event_dependent_from_branch = 0 if ev_dep else None
1✔
64

65
        self._register_stements(statements, self.ifTrue)
1✔
66
        self._get_rtl_context().statements.add(self)
1✔
67

68
    def Elif(self, cond: Union[RtlSignalBase, HwIOBase], *statements: Sequence[HdlStatement]):
1✔
69
        assert self.parentStm is None
1✔
70
        self.rank += 1
1✔
71
        cond_sig = _HwIOToRtlSignal(cond)
1✔
72

73
        assert cond_sig._dtype.bit_length() == 1, cond_sig
1✔
74
        ev_dep = arr_any(discoverEventDependency(cond_sig), lambda x: True)
1✔
75
        self._event_dependent_from_branch = len(self.elIfs) + 1 if ev_dep else None
1✔
76

77
        self._inputs.append(cond_sig)
1✔
78
        cond_sig._rtlEndpoints.append(self)
1✔
79

80
        stms = ListOfHdlStatement()
1✔
81
        self.elIfs.append((cond_sig, stms))
1✔
82
        self._register_stements(statements, stms)
1✔
83

84
        return self
1✔
85

86
    def Else(self, *statements: Sequence[HdlStatement]):
1✔
87
        assert self.parentStm is None
1✔
88
        if self.ifFalse is not None:
1!
89
            raise HwtSyntaxError(
×
90
                "Else on this if-then-else statement was already used")
91

92
        self.rank += 1
1✔
93

94
        self.ifFalse = ListOfHdlStatement()
1✔
95
        self._register_stements(statements, self.ifFalse)
1✔
96
        return self
1✔
97

98

99
class Switch(SwitchContainer):
1✔
100
    """
101
    Switch statement generator
102
    """
103

104
    def __init__(self, switchOn: Union[RtlSignalBase, HwIOBase]):
1✔
105
        switchOn = _HwIOToRtlSignal(switchOn)
1✔
106
        if not isinstance(switchOn, RtlSignalBase):
1!
107
            raise HwtSyntaxError("Select is not signal, it is not certain"
×
108
                                 " if this is an error or desire")
109
        if arr_any(discoverEventDependency(switchOn), lambda x: True):
1!
110
            raise HwtSyntaxError("Can not switch on result of event operator")
×
111

112
        super(Switch, self).__init__(switchOn, [])
1✔
113
        switchOn._rtlCtx.statements.add(self)
1✔
114
        self._inputs.append(switchOn)
1✔
115
        switchOn._rtlEndpoints.append(self)
1✔
116

117
    def add_cases(self, tupesValStms: Sequence[Tuple[Union[HConst, int], Sequence[HdlStatement]]]):
1✔
118
        """
119
        Add multiple case statements from iterable of tuples
120
        (caseVal, statements)
121
        """
122
        s = self
1✔
123
        for val, statements in tupesValStms:
1✔
124
            s = s.Case(val, statements)
1✔
125
        return s
1✔
126

127
    def Case(self, caseVal: Union[HConst, int], *statements: Sequence[HdlStatement]):
1✔
128
        "c-like case of switch statement"
129
        assert self.parentStm is None
1✔
130
        caseVal = toHVal(caseVal, self.switchOn._dtype)
1✔
131

132
        assert isinstance(caseVal, HConst), caseVal
1✔
133
        assert caseVal._is_full_valid(), "Cmp with invalid value"
1✔
134
        assert caseVal not in self._case_value_index, (
1✔
135
            "Switch statement already has case for value ", caseVal)
136

137
        self.rank += 1
1✔
138
        stms = ListOfHdlStatement()
1✔
139
        self._case_value_index[caseVal] = len(self.cases)
1✔
140
        self.cases.append((caseVal, stms))
1✔
141
        self._register_stements(statements, stms)
1✔
142

143
        return self
1✔
144

145
    def Default(self, *statements: Sequence[HdlStatement]):
1✔
146
        """c-like default of switch statement
147
        """
148
        assert self.parentStm is None
1✔
149
        self.rank += 1
1✔
150
        self.default = ListOfHdlStatement()
1✔
151
        self._register_stements(statements, self.default)
1✔
152
        return self
1✔
153

154

155
def SwitchLogic(cases: Sequence[Tuple[Union[RtlSignalBase, HwIOBase, HConst, bool], Sequence[HdlStatement]]],
1✔
156
                default: Optional[Sequence[HdlStatement]]=None):
157
    """
158
    Generate if tree for cases like (syntax sugar for large generated elifs)
159

160
    ..code-block:: python
161
        if cond0:
162
            statements0
163
        elif cond1:
164
            statements1
165
        else:
166
            default
167

168
    :param case: iterable of tuples (condition, statements)
169
    :param default: default statements
170
    """
171
    assigTop = None
1✔
172
    hasElse = False
1✔
173
    for last, (cond, statements) in iter_with_last(cases):
1✔
174
        if isinstance(cond, (RtlSignalBase, HwIOBase)):
1✔
175
            if assigTop is None:
1✔
176
                assigTop = If(cond,
1✔
177
                             statements
178
                           )
179
            else:
180
                assigTop = assigTop.Elif(cond, statements)
1✔
181
        else:
182
            if cond:
1!
183
                if assigTop is None:
1!
184
                    assigTop = statements
1✔
185
                else:
186
                    assigTop.Else(statements)
×
187
                    hasElse = True
×
188

189
                if last or isinstance(cases, GeneratorType):
1!
190
                    # allow True as a condition for default
191
                    break
1✔
192

193
            raise HwtSyntaxError("Condition is not a signal, it is not certain"
×
194
                                 " if this is an error or desire ", cond, cases)
195

196
    if assigTop is None:
1✔
197
        if default is None:
1!
198
            return []
×
199
        else:
200
            return default
1✔
201
    else:
202
        if hasElse:
1!
203
            return assigTop
×
204
        elif default is not None:
1✔
205
            assigTop = assigTop.Else(default)
1✔
206

207
        return assigTop
1✔
208

209

210
def In(sigOrConst: Union[RtlSignalBase, HwIOBase, HConst], iterable: Sequence[Union[RtlSignalBase, HwIOBase, HConst]]):
1✔
211
    """
212
    HDL convertible "in" operator, check if any of items
213
    in "iterable" equals "sigOrConst"
214
    """
215
    res = None
1✔
216
    for i in iterable:
1✔
217
        i = toHVal(i)
1✔
218
        if res is None:
1✔
219
            res = sigOrConst._eq(i)
1✔
220
        else:
221
            res = res | sigOrConst._eq(i)
1✔
222

223
    assert res is not None, "argument iterable is empty"
1✔
224
    return res
1✔
225

226

227
def StaticForEach(parentModule: HwModuleBase, items, bodyFn, name=""):
1✔
228
    """
229
    Generate for loop for static items
230

231
    :param parentModule: HwModule where this code should be instantiated
232
    :param items: items which this "for" iterating on
233
    :param bodyFn: function which fn(item, index) or fn(item)
234
        returns (statementList, ack).
235
        It's content is performed in every iteration.
236
        When ack is high loop will fall to next iteration
237
    """
238

239
    items = list(items)
1✔
240
    itemsCnt = len(items)
1✔
241
    if itemsCnt == 0:
1!
242
        # if there are no items there is nothing to generate
243
        return []
×
244
    elif itemsCnt == 1:
1✔
245
        # if there is only one item do not generate counter logic generate
246
        return bodyFn(items[0], 0)
1✔
247
    else:
248
        # if there is multiple items we have to generate counter logic
249
        index = parentModule._reg(name + "for_index",
1✔
250
                                HBits(log2ceil(itemsCnt + 1), signed=False),
251
                                def_val=0)
252
        ackSig = parentModule._sig(name + "for_ack")
1✔
253

254
        statementLists = []
1✔
255
        for i, (statementList, ack) in [(i, bodyFn(item, i))
1✔
256
                                        for i, item in enumerate(items)]:
257
            statementLists.append(statementList + [(ackSig(ack)), ])
1✔
258

259
        If(ackSig,
1✔
260
           If(index._eq(itemsCnt - 1),
261
              index(0)
262
              ).Else(
263
               index(index + 1)
264
           )
265
           )
266

267
        return Switch(index)\
1✔
268
            .add_cases(
269
            enumerate(statementLists)
270
        ).Default(
271
            bodyFn(items[0], 0)[0],
272
            ackSig(True)
273
        )
274

275

276
class FsmBuilder(Switch):
1✔
277
    """
278
    A syntax sugar which automatically construct the state transition switch and state register
279

280
    :ivar ~.stateReg: register with state
281
    """
282

283
    def __init__(self, parentModule: HwModuleBase, stateT, stateRegName="st"):
1✔
284
        """
285
        :param parentModule: parent HwModule where FSM should be builded
286
        :param stateT: enum type of state
287
        :param stateRegName: name of register where sate is stored
288
        """
289
        if isinstance(stateT, HEnum):
1!
290
            beginVal = stateT.from_py(stateT._allValues[0])
1✔
291
        else:
292
            beginVal = 0
×
293

294
        self.stateReg = parentModule._reg(stateRegName, stateT, beginVal)
1✔
295
        Switch.__init__(self, self.stateReg)
1✔
296

297
    def Trans(self, stateFrom, *condAndNextState):
1✔
298
        """
299
        :param stateFrom: apply when FSM is in this state
300
        :param condAndNextState: tuples (condition, newState),
301
            last does not to have condition
302

303
        :attention: transitions has priority, first has the biggest
304
        :attention: if stateFrom is None it is evaluated as default
305
        """
306
        top = []
1✔
307
        last = True
1✔
308

309
        for cAndS in reversed(condAndNextState):
1✔
310
            if last is True:
1✔
311
                last = False
1✔
312
                # if this is last trans. it does not have to condition
313
                try:
1✔
314
                    condition, newvalue = cAndS
1✔
315
                except TypeError:
1✔
316
                    top = self.stateReg(cAndS)
1✔
317
                    continue
1✔
318
                top = []
1✔
319

320
            else:
321
                condition, newvalue = cAndS
1✔
322

323
            # building decision tree
324
            top = \
1✔
325
                If(condition,
326
                   self.stateReg(newvalue)
327
                ).Else(
328
                    top
329
                )
330
        if stateFrom is None:
1!
331
            return Switch.Default(self, top)
×
332
        else:
333
            return Switch.Case(self, stateFrom, top)
1✔
334

335
    def Default(self, *condAndNextState):
1✔
336
        d = self.Trans(None, *condAndNextState)
×
337
        d.stateReg = self.stateReg
×
338
        return d
×
339

340

341
# variadic operator functions
342
And = _mkOp(and_)
1✔
343
Add = _mkOp(add)
1✔
344
Or = _mkOp(or_)
1✔
345
Xor = _mkOp(xor)  # :note: xor is bitwise !=
1✔
346
Xnor = _mkOp(eq)  # :note: xnor is bitwise ==
1✔
347
Concat = _mkOp(concatFn)
1✔
348

349

350
def ror(sig:Union[RtlSignalBase, HConst], howMany: int) -> RtlSignalBase:
1✔
351
    "Rotate right"
352
    if sig._dtype.bit_length() == 1:
1✔
353
        return sig
1✔
354

355
    if isinstance(howMany, int):
1!
356
        return sig[howMany:]._concat(sig[:howMany])
1✔
357
    elif isinstance(howMany, HConst):
×
358
        return ror(sig, int(howMany))
×
359
    else:
360
        t = howMany._dtype
×
361
        if not isinstance(t, HBits) or t.signed:
×
362
            raise NotImplementedError(t)
363
        res = sig
×
364
        for i in range(1, t.domain_size() - 1):
×
365
            res = howMany._eq(i)._ternary(ror(sig, i), res)
×
366
        return  res
×
367

368

369
def rol(sig:Union[RtlSignalBase, HConst], howMany:Union[RtlSignalBase, int]) -> RtlSignalBase:
1✔
370
    "Rotate left"
371
    if isinstance(howMany, int):
1!
372
        width = sig._dtype.bit_length()
1✔
373
        if width == 1:
1!
374
            return sig
×
375
        return sig[(width - howMany):]._concat(sig[:(width - howMany)])
1✔
376
    elif isinstance(howMany, HConst):
×
377
        return rol(sig, int(howMany))
×
378
    else:
379
        t = howMany._dtype
×
380
        if not isinstance(t, HBits) or t.signed:
×
381
            raise NotImplementedError(t)
382
        res = sig
×
383
        for i in range(1, t.domain_size() - 1):
×
384
            res = howMany._eq(i)._ternary(rol(sig, i), res)
×
385
        return  res
×
386

387

388
def replicate(n:int, v:Union[RtlSignalBase, HConst]):
1✔
389
    assert n > 0, n
1✔
390
    return Concat(*(v for _ in range(n)))
1✔
391

392

393
def segment_get(n:Union[RtlSignalBase, HConst],
1✔
394
                segmentWidth:int,
395
                segmentIndex:Union[RtlSignalBase, HConst, int]):
396
    """
397
    This function gets bits from bit vector as if it was an array of items of "segmentWidth" bits
398
    """
399
    return n[segmentWidth * (segmentIndex + 1): segmentWidth * segmentIndex]
×
400

401

402
def split_to_segments(n:Union[RtlSignalBase, HConst], maxSegmentWidth:int, allowLastToBeSmaller=False, extendLast=False):
1✔
403
    """
404
    Split bit vector to a segments of up to maxSegmentWidth bits, lower bits first.
405
    """
406
    offset = 0
×
407
    segments = []
×
408
    width = n._dtype.bit_length()
×
409
    if not allowLastToBeSmaller and not extendLast:
×
410
        assert width % maxSegmentWidth == 0, (width, maxSegmentWidth)
×
411

412
    while True:
×
413
        end = min(offset + maxSegmentWidth, width)
×
414
        segments.append(n[end:offset])
×
415
        if end == width:
×
416
            if extendLast and end != offset + maxSegmentWidth:
×
417
                segments[-1] = segments[-1]._ext(maxSegmentWidth)
×
418
            break
×
419
        offset = end
×
420
    return segments
×
421

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc