• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Nic30 / hwtLib / 66ace1bd-fd2b-4b24-93d9-8e675610c989

pending completion
66ace1bd-fd2b-4b24-93d9-8e675610c989

push

circleci

Nic30
StreamNode: rm members from ack expression if they do not have sync sig.

9688 of 10705 branches covered (90.5%)

15 of 15 new or added lines in 1 file covered. (100.0%)

37527 of 40106 relevant lines covered (93.57%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.87
/hwtLib/handshaked/streamNode.py
1
from typing import List, Optional, Tuple, Union, Dict
1✔
2

3
from hwt.code import And, Or
1✔
4
from hwt.hdl.statements.assignmentContainer import HdlAssignmentContainer
1✔
5
from hwt.hdl.types.defs import BIT
1✔
6
from hwt.hdl.value import HValue
1✔
7
from hwt.synthesizer.interface import Interface
1✔
8
from hwt.synthesizer.rtlLevel.constants import NOT_SPECIFIED
1✔
9
from hwt.synthesizer.rtlLevel.rtlSignal import RtlSignal
1✔
10

11

12
def _get_ready_signal(intf: Union[Interface, Tuple[RtlSignal, RtlSignal]]) -> RtlSignal:
    """
    Resolve the ready signal of a stream member.

    :param intf: object exposing the signal as ``rd`` or ``ready`` attribute
        or a (valid, ready) tuple
    :return: ``intf.rd`` if the attribute exists, else the second item
        of the tuple, else ``intf.ready``
    :raises AttributeError: if intf is not a tuple and has neither ``rd``
        nor ``ready`` attribute
    """
    try:
        return intf.rd
    except AttributeError:
        # no "rd" attribute, try the other supported forms
        pass

    # builtin tuple is the runtime type, typing.Tuple is meant for annotations only
    if isinstance(intf, tuple):
        _, rd = intf
        return rd

    return intf.ready
1✔
23

24

25
def _get_valid_signal(intf: Union[Interface, Tuple[RtlSignal, RtlSignal]]) -> RtlSignal:
    """
    Resolve the valid signal of a stream member.

    :param intf: object exposing the signal as ``vld`` or ``valid`` attribute
        or a (valid, ready) tuple
    :return: ``intf.vld`` if the attribute exists, else the first item
        of the tuple, else ``intf.valid``
    :raises AttributeError: if intf is not a tuple and has neither ``vld``
        nor ``valid`` attribute
    """
    try:
        return intf.vld
    except AttributeError:
        # no "vld" attribute, try the other supported forms
        pass

    # builtin tuple is the runtime type, typing.Tuple is meant for annotations only
    if isinstance(intf, tuple):
        vld, _ = intf
        return vld

    return intf.valid
1✔
36

37

38
def _exStreamMemberAck(m) -> RtlSignal:
    """
    :param m: tuple (cond, node) where node provides ack() method
    :return: ack of the node and-ed with the condition of this member
    """
    cond, node = m
    node_ack = node.ack()
    return cond & node_ack
1✔
41

42

43
class ExclusiveStreamGroups(list):
    """
    List of (cond, StreamNode instance) tuples.
    Only one stream from this group can be activated at the time.
    """

    def __hash__(self):
        # hash by identity, plain list does not provide any hash
        return id(self)

    def sync(self, enSig=None) -> List[HdlAssignmentContainer]:
        """
        Build valid/ready synchronization logic for every node in this group

        :param enSig: optional signal to enable this group of nodes,
            if specified it is and-ed with the condition of each node
        :return: list of assignments which are responsible for synchronization
            of streams (results of node.sync() in the order of members)
        """
        assignments = []
        for cond, node in self:
            node_en = cond if enSig is None else cond & enSig
            assignments += node.sync(node_en)
        return assignments

    def ack(self) -> RtlSignal:
        """
        :return: expression which is high when a transaction can be made
            over at least one child streaming node
        """
        member_acks = [_exStreamMemberAck(member) for member in self]
        return Or(*member_acks)
1✔
74

75

76
# Type of a stream member: an Interface or a (valid, ready) tuple
# where each item is a RtlSignal or an int
INTERFACE_OR_VLD_RD_TUPLE = Union[Interface, Tuple[Union[RtlSignal, int], Union[RtlSignal, int]]]
1✔
77

78

79
class StreamNode():
    """
    Group of stream master and slave interfaces to synchronize them to each other

    :ivar ~.masters: interfaces which are inputs into this node
    :ivar ~.slaves: interfaces which are outputs of this node
    :ivar ~.extraConds: {dict interface : extraConditionSignal}
        where extra conditions will be added to expression for channel enable.
        For master it means it will obtain ready=1 only if extraConditionSignal
        is 1.
        For slave it means it will obtain valid=1 only
        if extraConditionSignal is 1.
        All interfaces have to wait on each other so if an extraCond!=1 it causes
        blocking on all interfaces if not overridden by skipWhen.
    :note: instead of interface it is possible to use tuple (valid, ready) signal
    :ivar ~.skipWhen: dict interface : skipSignal
        where if skipSignal is high interface is disconnected from stream
        sync node and others do not have to wait on it
        (master does not need to have valid and slave ready)
    :attention: skipWhen has higher priority
    """

    def __init__(self,
                 masters: Optional[List[INTERFACE_OR_VLD_RD_TUPLE]]=None,
                 slaves: Optional[List[INTERFACE_OR_VLD_RD_TUPLE]]=None,
                 extraConds: Optional[Dict[INTERFACE_OR_VLD_RD_TUPLE, RtlSignal]]=None,
                 skipWhen: Optional[Dict[INTERFACE_OR_VLD_RD_TUPLE, RtlSignal]]=None):
        # None is replaced with a fresh container so instances never share state
        if masters is None:
            masters = []
        if slaves is None:
            slaves = []
        if extraConds is None:
            extraConds = {}
        if skipWhen is None:
            skipWhen = {}

        self.masters = masters
        self.slaves = slaves
        self.extraConds = extraConds
        self.skipWhen = skipWhen

    @staticmethod
    def _syncMember(intf, ack, enSig, getDrivenSignal) -> list:
        """
        Create driver of ready (for master) or valid (for slave) of a single member

        :param intf: member of the node
        :param ack: expression which should drive the signal of the member
        :param enSig: optional enable which is and-ed with ack
        :param getDrivenSignal: function used to get the driven signal from intf
        :return: list of statements generated for this member
        """
        if enSig is not None:
            ack = ack & enSig

        if isinstance(intf, ExclusiveStreamGroups):
            # the group generates synchronization for its own members
            return intf.sync(ack)

        sig = getDrivenSignal(intf)
        if isinstance(sig, int):
            # constant 1 in place of signal, there is nothing to drive
            assert sig == 1
            return []

        return [sig(ack), ]

    def sync(self, enSig: Optional[RtlSignal]=None) -> List[HdlAssignmentContainer]:
        """
        Create synchronization logic between streams
        (generate valid/ready synchronization logic for interfaces)

        :param enSig: optional signal to enable this node
        :return: list of assignments which are responsible for synchronization of streams
        """
        masters = self.masters
        slaves = self.slaves

        if not masters and not slaves:
            # node is empty
            assert not self.extraConds
            assert not self.skipWhen
            return []

        # check if there is not any mess in extraConds/skipWhen
        for i in self.extraConds:
            assert i in masters or i in slaves, i

        for i in self.skipWhen:
            assert i in masters or i in slaves, i

        # this expression container is there to allow usage of this function
        # in usual hdl containers like If, Switch etc...
        expression = []
        for m in masters:
            r = self.ackForMaster(m)
            expression.extend(self._syncMember(m, r, enSig, _get_ready_signal))

        for s in slaves:
            v = self.ackForSlave(s)
            expression.extend(self._syncMember(s, v, enSig, _get_valid_signal))

        return expression

    def _ackOfMember(self, intf, getSyncSignal):
        """
        :param intf: member of the node
        :param getSyncSignal: function used to get valid (for master)
            or ready (for slave) from intf
        :return: ack expression of a single member, constant 1 if the sync signal
            of the member is constant (extraCond and skipWhen are not applied
            in that case)
        """
        extra, skip = self.getExtraAndSkip(intf)
        if isinstance(intf, ExclusiveStreamGroups):
            a = intf.ack()
        else:
            a = getSyncSignal(intf)

        if isinstance(a, (int, HValue)):
            assert int(a) == 1, (intf, a)
            return BIT.from_py(1)

        if extra:
            a = And(a, *extra)

        if skip is not None:
            a = Or(a, skip)

        return a

    def ack(self) -> RtlSignal:
        """
        :return: expression which is high when transaction can be made over interfaces
            (python True if the node has no masters and no slaves)
        """
        # every interface has to have skip flag or it has to be ready/valid
        # and extraCond has to be True if present
        acks = [self._ackOfMember(m, _get_valid_signal) for m in self.masters]
        acks.extend(self._ackOfMember(s, _get_ready_signal) for s in self.slaves)

        if not acks:
            return True

        return And(*acks)

    def getExtraAndSkip(self, intf: INTERFACE_OR_VLD_RD_TUPLE) -> Tuple[List[RtlSignal], Optional[RtlSignal]]:
        """
        :return: tuple (extra, skip) where extra is a list with the extraCond
            of the interface (empty list if there is none) and skip
            is the skipWhen flag of the interface or None
        """
        try:
            extra = [self.extraConds[intf], ]
        except KeyError:
            extra = []

        try:
            skip = self.skipWhen[intf]
        except KeyError:
            skip = None

        return extra, skip

    def _syncSignalOrSkip(self, intf, getSyncSignal):
        """
        :param intf: member of the node
        :param getSyncSignal: function used to get valid/ready from intf
        :return: sync signal of the member or-ed with its skipWhen flag (if any),
            constant 1 if the sync signal is an int
        """
        try:
            skip = self.skipWhen[intf]
        except KeyError:
            skip = None
        else:
            assert skip is not None

        if isinstance(intf, ExclusiveStreamGroups):
            sig = intf.ack()
        else:
            sig = getSyncSignal(intf)

        if isinstance(sig, int):
            assert sig == 1, sig
            return BIT.from_py(1)

        if skip is None:
            return sig

        return sig | skip

    def vld(self, intf: INTERFACE_OR_VLD_RD_TUPLE) -> RtlSignal:
        """
        :return: valid signal of master interface for synchronization of others
        """
        return self._syncSignalOrSkip(intf, _get_valid_signal)

    def rd(self, intf: INTERFACE_OR_VLD_RD_TUPLE) -> RtlSignal:
        """
        :return: ready signal of slave interface for synchronization of others
        """
        return self._syncSignalOrSkip(intf, _get_ready_signal)

    def _ackOfOthers(self, intf, masters, slaves):
        """
        :param intf: member for which the driver is generated
        :param masters: masters which intf has to wait for
        :param slaves: slaves which intf has to wait for
        :return: and of valid of masters, ready of slaves and extraCond of intf
            (constant 1 if there is no such condition) masked by negated
            skipWhen flag of intf if it is specified
        """
        extra, skip = self.getExtraAndSkip(intf)

        conds = [*map(self.vld, masters),
                 *map(self.rd, slaves),
                 *extra]
        if conds:
            ack = And(*conds)
        else:
            ack = BIT.from_py(1)

        if skip is not None:
            ack = ack & ~skip

        return ack

    def ackForMaster(self, master: INTERFACE_OR_VLD_RD_TUPLE) -> RtlSignal:
        """
        :return: driver of ready signal for master
        """
        others = (m for m in self.masters if m is not master)
        return self._ackOfOthers(master, others, self.slaves)

    def ackForSlave(self, slave: INTERFACE_OR_VLD_RD_TUPLE) -> RtlSignal:
        """
        :return: driver of valid signal for slave
        """
        others = (s for s in self.slaves if s is not slave)
        return self._ackOfOthers(slave, self.masters, others)

    def __repr__format_intf_list(self, intf_list):
        """
        :return: string with items of intf_list, each item is formatted
            together with its extraCond/skipWhen if there is any
        """
        res = []
        for i in intf_list:
            # NOT_SPECIFIED is used as a default to distinguish a missing record
            extraCond = self.extraConds.get(i, NOT_SPECIFIED)
            skipWhen = self.skipWhen.get(i, NOT_SPECIFIED)
            if extraCond is not NOT_SPECIFIED and skipWhen is not NOT_SPECIFIED:
                res.append(f"({i},\n            extraCond={extraCond},\n            skipWhen={skipWhen})")
            elif extraCond is not NOT_SPECIFIED:
                res.append(f"({i},\n            extraCond={extraCond})")
            elif skipWhen is not NOT_SPECIFIED:
                res.append(f"({i},\n            skipWhen={skipWhen})")
            else:
                res.append(f"{i}")

        return ',\n        '.join(res)

    def __repr__(self):
        masters = self.__repr__format_intf_list(self.masters)
        slaves = self.__repr__format_intf_list(self.slaves)
        return f"""<{self.__class__.__name__}
    masters=[
        {masters:s}],
    slaves=[
        {slaves:s}],
>"""
365

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc