• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Nic30 / hwtLib / 33c42d21-d820-4417-a1b1-9b8f8645c374

19 Oct 2025 05:57PM UTC coverage: 92.767% (-0.05%) from 92.814%
33c42d21-d820-4417-a1b1-9b8f8645c374

push

circleci

Nic30
style

5469 of 6692 branches covered (81.72%)

40491 of 43648 relevant lines covered (92.77%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.83
/hwtLib/amba/axi4SSegmentedSimFrameUtils.py
1
from typing import Union, Sequence, Generator, Deque, Self
1✔
2

3
from hdlConvertorAst.to.hdlUtils import iter_with_last
1✔
4
from hwt.code import Concat
1✔
5
from hwt.hdl.const import HConst
1✔
6
from hwt.hdl.types.bitsConst import HBitsConst
1✔
7
from hwt.hdl.types.struct import HStruct
1✔
8
from hwt.pyUtils.typingFuture import override
1✔
9
from hwt.synthesizer.vectorUtils import iterBits
1✔
10
from hwtLib.abstract.simFrameUtils import SimFrameUtils
1✔
11
from hwtLib.amba.axi4SSegmented import Axi4StreamSegmented, \
1✔
12
    _Axi4StreamSegmentedWord, Axi4StreamSegmentedAgentWordType
13
from hwtLib.types.ctypes import uint8_t
1✔
14
from pyMathBitPrecise.bit_utils import get_bit_range
1✔
15

16

17
class Axi4StreamSegmentedFrameUtils(SimFrameUtils[Axi4StreamSegmentedAgentWordType]):
1✔
18

19
    def __init__(self, SEGMENT_DATA_WIDTH:int, SEGMENT_CNT:int, USER_SEGMENT_T: HStruct, BYTE_WIDTH:int=8,
1✔
20
                 SUPPORT_ZLP:bool=False, USE_SOF:bool=False, ERROR_WIDTH:int=0, PACK_SEGMENT_BITS:bool=False):
21
        self.SEGMENT_CNT = SEGMENT_CNT
1✔
22
        self.BYTE_WIDTH = BYTE_WIDTH
1✔
23
        self.SEGMENT_DATA_WIDTH = SEGMENT_DATA_WIDTH
1✔
24
        self.SEGMENT_BYTE_CNT = SEGMENT_DATA_WIDTH // BYTE_WIDTH
1✔
25
        assert SEGMENT_DATA_WIDTH % BYTE_WIDTH == 0
1✔
26
        self.USER_SEGMENT_T = USER_SEGMENT_T
1✔
27
        self.SUPPORT_ZLP = SUPPORT_ZLP
1✔
28
        self.USE_SOF = USE_SOF
1✔
29
        self.ERROR_WIDTH = ERROR_WIDTH
1✔
30
        self.PACK_SEGMENT_BITS = PACK_SEGMENT_BITS
1✔
31
        self.USE_EMPTY = Axi4StreamSegmented._hasEmpty(SEGMENT_DATA_WIDTH, BYTE_WIDTH, SUPPORT_ZLP)
1✔
32
        self.USE_ENABLE = Axi4StreamSegmented._hasEnable(SEGMENT_CNT)
1✔
33

34
    @override
1✔
35
    @classmethod
1✔
36
    def from_HwIO(cls, axiss: Axi4StreamSegmented) -> Self:
1✔
37
        return cls(axiss.SEGMENT_DATA_WIDTH,
1✔
38
                   axiss.SEGMENT_CNT,
39
                   axiss.USER_SEGMENT_T,
40
                   SUPPORT_ZLP=axiss.SUPPORT_ZLP,
41
                   USE_SOF=axiss.USE_SOF,
42
                   ERROR_WIDTH=axiss.ERROR_WIDTH,
43
                   PACK_SEGMENT_BITS=axiss.PACK_SEGMENT_BITS)
44

45
    @override
1✔
46
    def pack_frame(self, frameVal: Union[HConst, Sequence[int]])\
1✔
47
            ->Generator[Axi4StreamSegmentedAgentWordType, None, None]:
48
        """
49
        pack data of structure into words on Axi4StreamSegmented interface
50
        
51
        :param frameVal: value to be send, HConst instance or list of int for each byte
52
        
53
        :returns: generator of tuples tuples (data, USER_SEGMENT_T)
54
        """
55
        frameVal, frameIsZeroLen = self._toHConst(frameVal)
1✔
56
        byte_cnt = self.SEGMENT_BYTE_CNT
1✔
57
        USER_SEGMENT_T = self.USER_SEGMENT_T
1✔
58
        USE_SOF = self.USE_SOF
1✔
59
        USE_ENABLE = self.USE_ENABLE
1✔
60
        USE_EMPTY = self.USE_EMPTY
1✔
61
        ERROR_WIDTH = self.ERROR_WIDTH
1✔
62
        if frameIsZeroLen:
1!
63
            userData = {"eof": 1}
×
64
            if USE_ENABLE:
×
65
                userData["enable"] = 1
×
66
            if USE_EMPTY:
×
67
                userData["empty"] = byte_cnt
×
68
            if USE_SOF:
×
69
                userData["sof"] = 1
×
70
            if ERROR_WIDTH:
×
71
                userData["err"] = 0
×
72

73
            yield (None, USER_SEGMENT_T.from_py(userData))
×
74
            return
×
75

76
        dataWidth = self.SEGMENT_DATA_WIDTH
1✔
77
        words = iterBits(frameVal, bitsInOne=dataWidth,
1✔
78
                         skipPadding=False, fillup=True)
79
        first = True
1✔
80
        for last, data in iter_with_last(words):
1✔
81
            assert data._dtype.bit_length() == dataWidth, data._dtype.bit_length()
1✔
82
            sof = first
1✔
83
            if first:
1!
84
                first = False
1✔
85
            eof = last
1✔
86

87
            userData = {"eof":eof}
1✔
88
            if USE_ENABLE:
1!
89
                userData["enable"] = 1
1✔
90
            if USE_EMPTY:
1!
91
                if last:
1!
92
                    leftOverSize = (frameVal._dtype.bit_length() // self.BYTE_WIDTH) % byte_cnt
1✔
93
                    if leftOverSize:
1!
94
                        empty = byte_cnt - leftOverSize
1✔
95
                    else:
96
                        empty = 0
×
97
                else:
98
                    empty = 0
×
99
                userData["empty"] = empty
1✔
100

101
            if USE_SOF:
1!
102
                userData["sof"] = sof
×
103

104
            if ERROR_WIDTH:
1!
105
                userData["err"] = 0
×
106

107
            yield (data, USER_SEGMENT_T.from_py(userData))
1✔
108

109
    @override
1✔
110
    def concatWordBits(self, frameBeats: Sequence[Axi4StreamSegmentedAgentWordType]):
1✔
111
        for segments in frameBeats:
×
112
            # in lowest bits first format
113
            assert segments
×
114
            word: list[HBitsConst] = []
×
115
            fillerSegmentCnt = self.SEGMENT_CNT - len(segments)
×
116
            for data, _ in segments:
×
117
                word.append(data)
×
118
            for _ in range(fillerSegmentCnt):
×
119
                word.append(data._dtype.from_py(None))
×
120

121
            for data, user in segments:
×
122
                for f in user._dtype.fields:
×
123
                    word.append(getattr(user, f.name))
×
124
            
125
            for _ in range(fillerSegmentCnt):
×
126
                for f in user._dtype.fields:
×
127
                    if f.name == "enable":
×
128
                        word.append(f.dtype.from_py(0))
×
129
                    else:
130
                        word.append(f.dtype.from_py(None))
×
131

132
            yield Concat(*reversed(word))
×
133

134
    @override
1✔
135
    def send_bytes(self, data_B:Union[bytes, list[int]], ag_data:Deque[Axi4StreamSegmentedAgentWordType], offset:int=0) -> list[Axi4StreamSegmentedAgentWordType]:
1✔
136
        if data_B:
1!
137
            if isinstance(data_B, bytes):
1!
138
                data_B = [int(x) for x in data_B]
×
139
            t = uint8_t[len(data_B) + offset]
1✔
140
            _data_B = t.from_py([None for _ in range(offset)] + data_B)
1✔
141
        else:
142
            assert self.SUPPORT_ZLP
×
143
            _data_B = data_B
×
144

145
        f = self.pack_frame(_data_B)
1✔
146
        SEGMENT_CNT = self.SEGMENT_CNT
1✔
147
        if ag_data:
1✔
148
            # possibly fill unused segments into existing last word
149
            lastWord = ag_data[-1]
1✔
150
            if len(lastWord) < SEGMENT_CNT:
1✔
151
                newSegmentsForLastWord = []
1✔
152
                for _ in range(SEGMENT_CNT - len(lastWord)):
1✔
153
                    try:
1✔
154
                        newSegmentsForLastWord.append(next(f))
1✔
155
                    except StopIteration:
×
156
                        break
×
157
                ag_data[-1] = tuple((*lastWord, *newSegmentsForLastWord))
1✔
158

159
        eof = False
1✔
160
        while not eof:
1✔
161
            segments = []
1✔
162
            for _ in range(SEGMENT_CNT):
1!
163
                try:
1✔
164
                    seg = next(f)
1✔
165
                    segments.append(seg)
1✔
166
                except StopIteration:
1✔
167
                    eof = True
1✔
168
                    break
1✔
169
                if segments:
1!
170
                    ag_data.append(tuple(segments))
1✔
171

172
    @override
1✔
173
    def receive_bytes(self, ag_data:Deque[Axi4StreamSegmentedAgentWordType]) -> tuple[int, list[Union[int, HBitsConst]], bool]:
1✔
174
        """
175
        :param ag_data: list of axi stream segmented words, number of item in tuple depends on use_keep and use_id
176
        :returns: tuple (startSegmentIndex, data bytes, had error flag)
177
        """
178
        data_B = []
1✔
179
        sof = True
1✔
180
        eof = False
1✔
181
        err = 0
1✔
182
        first = True
1✔
183
        SEGMENT_BYTE_CNT = self.SEGMENT_BYTE_CNT
1✔
184
        SEGMENT_CNT = self.SEGMENT_CNT
1✔
185
        USE_SOF = self.USE_SOF
1✔
186
        ERROR_WIDTH = self.ERROR_WIDTH
1✔
187

188
        while ag_data:
1!
189
            try:
1✔
190
                _segmentIndex, data, user = _Axi4StreamSegmentedWord.popSegmentWordFromAgentData(ag_data, SEGMENT_CNT)
1✔
191
            except IndexError:
×
192
                break
×
193
            enable = bool(user.enable)
1✔
194
            if data_B:
1!
195
                assert enable, "All segments between sof-eof must have enable=1 or whole word must have valid=0"
×
196
            else:
197
                if not enable:
1!
198
                    # skipping unused segments before end of frame
199
                    continue
×
200

201
            if USE_SOF:
1!
202
                sof = bool(user.sof)
×
203
                assert sof == first, ("Soft must be 1 in the first segment of the frame", sof, first)
×
204
            else:
205
                sof = first
1✔
206

207
            if ERROR_WIDTH:
1!
208
                err |= int(user.err)
×
209

210
            eof = bool(user.eof)
1✔
211
            empty = int(user.empty)
1✔
212
            if empty != 0:
1!
213
                assert eof, "non-full word in the middle of the packet"
1✔
214

215
            for i in range(SEGMENT_BYTE_CNT - empty):
1✔
216
                d = get_bit_range(data.val, i * 8, 8)
1✔
217
                if get_bit_range(data.vld_mask, i * 8, 8) != 0xff:
1✔
218
                    raise AssertionError(
219
                        "Data not valid but it should be"
220
                        f' based on value of "empty" B_i:{i:d}, empty:{empty:d}, vld_mask:0x{data.vld_mask:x}')
221
                data_B.append(d)
1✔
222

223
            if first:
1!
224
                segmentIndex = _segmentIndex
1✔
225
                first = False
1✔
226

227
            if eof:
1!
228
                break
1✔
229

230
        if not eof:
1!
231
            if data_B:
×
232
                raise ValueError("Unfinished frame", data_B)
×
233
            else:
234
                raise ValueError("No frame available")
×
235

236
        return segmentIndex, data_B, err
1✔
237

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc