• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Nic30 / hwtLib / 170208f5-5e39-4024-ae82-f3378a16eefe

pending completion
170208f5-5e39-4024-ae82-f3378a16eefe

push

circleci

Nic30
StreamNode.__repr__: improve readability

9688 of 10705 branches covered (90.5%)

37526 of 40104 relevant lines covered (93.57%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.81
/hwtLib/amba/datapump/r_aligned_test.py
1
#!/usr/bin/env python3e
2
# -*- coding: utf-8 -*-
3

4
from hwtLib.amba.axi3 import Axi3
1✔
5
from hwtLib.amba.axi3Lite import Axi3Lite
1✔
6
from hwtLib.amba.axi4 import Axi4
1✔
7
from hwtLib.amba.axi4Lite import Axi4Lite
1✔
8
from hwtLib.amba.axiLite_comp.sim.ram import Axi4LiteSimRam
1✔
9
from hwtLib.amba.axi_comp.sim.ram import AxiSimRam
1✔
10
from hwtLib.amba.constants import RESP_OKAY
1✔
11
from hwtLib.amba.datapump.r import Axi_rDatapump
1✔
12
from hwtLib.amba.datapump.test import Axi_datapumpTC
1✔
13
from hwtSimApi.constants import CLK_PERIOD
1✔
14
from pyMathBitPrecise.bit_utils import mask
1✔
15

16

17
class Axi4_rDatapumpTC(Axi_datapumpTC):
1✔
18
    LEN_MAX_VAL = 255
1✔
19
    DATA_WIDTH = 64
1✔
20
    CHUNK_WIDTH = DATA_WIDTH
1✔
21
    ALIGNAS = DATA_WIDTH
1✔
22

23
    @classmethod
1✔
24
    def setUpClass(cls):
1✔
25
        u = cls.u = Axi_rDatapump(axiCls=Axi4)
1✔
26
        u.DATA_WIDTH = cls.DATA_WIDTH
1✔
27
        u.CHUNK_WIDTH = cls.CHUNK_WIDTH
1✔
28
        assert cls.CHUNK_WIDTH == cls.DATA_WIDTH
1✔
29
        u.MAX_CHUNKS = cls.LEN_MAX_VAL + 1
1✔
30
        u.ALIGNAS = cls.ALIGNAS
1✔
31
        cls.compileSim(u)
1✔
32

33
    def test_nop(self):
1✔
34
        u = self.u
1✔
35
        self.runSim(20 * CLK_PERIOD)
1✔
36

37
        self.assertEmpty(u.axi.ar._ag.data)
1✔
38
        self.assertEmpty(u.driver.r._ag.data)
1✔
39

40
    def test_notSplitedReq(self):
1✔
41
        u = self.u
1✔
42

43
        req = u.driver._ag.req
1✔
44

45
        # download one word from addr 0x100
46
        req.data.append(self.mkReq(0x100, 0))
1✔
47
        self.runSim((self.LEN_MAX_VAL + 3) * CLK_PERIOD)
1✔
48

49
        self.assertEqual(len(req.data), 0)
1✔
50
        self.assertEqual(len(u.axi.ar._ag.data), 1)
1✔
51
        self.assertEqual(len(u.driver._ag.r.data), 0)
1✔
52

53
    def test_notSplitedReqWithData(self):
1✔
54
        u = self.u
1✔
55
        r = u.axi.r._ag
1✔
56

57
        ar_ref, driver_r_ref = self.spotReadMemcpyTransactions(0x100, 0, None)
1✔
58
        # extra data without any request
59
        for i in range(2):
1✔
60
            r.data.append(self.rTrans(i + 78))
1✔
61

62
        self.runSim((self.LEN_MAX_VAL + 7) * CLK_PERIOD)
1✔
63

64
        self.assertValSequenceEqual(u.axi.ar._ag.data, ar_ref)
1✔
65
        self.assertValSequenceEqual(u.driver.r._ag.data, driver_r_ref)
1✔
66
        # 2. is now beeing sended (but it should not finish as there is not a request for it)
67
        self.assertEqual(len(r.data), 2 - 1)
1✔
68

69
    def test_maxNotSplitedReqWithData(self):
1✔
70
        u = self.u
1✔
71

72
        req = u.driver.req._ag
1✔
73
        r = u.axi.r._ag
1✔
74

75
        ar_ref, driver_r_ref = self.spotReadMemcpyTransactions(0x100, self.LEN_MAX_VAL, None)
1✔
76

77
        # dummy data after the transaction
78
        r.data.extend([
1✔
79
            self.rTrans(11),
80
            self.rTrans(12),
81
        ])
82
        self.runSim((self.LEN_MAX_VAL + 6) * CLK_PERIOD)
1✔
83

84
        self.assertEmpty(req.data)
1✔
85
        self.assertValSequenceEqual(u.axi.ar._ag.data, ar_ref)
1✔
86
        self.assertValSequenceEqual(u.driver.r._ag.data, driver_r_ref)
1✔
87

88
        # 2. is now beeing sended (but it should not finish as there is not a request for it)
89
        self.assertEqual(len(r.data), 2 - 1)
1✔
90

91
    def test_maxReq(self):
1✔
92
        ar_ref, driver_r_ref = self.spotReadMemcpyTransactions(0x100, 2 * self.LEN_MAX_VAL + 1, None, addData=False)
1✔
93
        self.runSim((2 * self.LEN_MAX_VAL + 3) * CLK_PERIOD)
1✔
94

95
        self.check_r_trans(ar_ref, driver_r_ref)
1✔
96

97
    def test_maxOverlap(self):
1✔
98
        u = self.u
1✔
99
        MAX_TRANS_OVERLAP = u.MAX_TRANS_OVERLAP
1✔
100

101
        ar_ref, _ = self.spotReadMemcpyTransactions(0x100, 2 * MAX_TRANS_OVERLAP, 0, addData=False)
1✔
102

103
        self.runSim((2 * MAX_TRANS_OVERLAP + 6) * CLK_PERIOD)
1✔
104

105
        self.assertEqual(len(u.driver._ag.req.data), MAX_TRANS_OVERLAP)
1✔
106
        self.assertEqual(len(u.driver.r._ag.data), 0)
1✔
107
        self.assertValSequenceEqual(u.axi.ar._ag.data, ar_ref[:16])
1✔
108
        self.assertEqual(len(u.axi.r._ag.data), 0)
1✔
109

110
    def test_multipleShortest(self, N=64):
1✔
111
        ar_ref, driver_r_ref = self.spotReadMemcpyTransactions(0x0, N - 1, 0)
1✔
112
        self.runSim((N + 5) * CLK_PERIOD)
1✔
113
        self.check_r_trans(ar_ref, driver_r_ref)
1✔
114

115
    def test_endstrb(self, FRAME_LEN=0):
1✔
116
        ar_ref, driver_r_ref = [], []
1✔
117
        addr_step = self.DATA_WIDTH // 8
1✔
118
        for i in range(0, addr_step, self.CHUNK_WIDTH // 8):  # for all possible end offsets
1✔
119
            _ar_ref, _driver_r_ref = self.spotReadMemcpyTransactions((i + FRAME_LEN + 1) * addr_step,
1✔
120
                                                                 FRAME_LEN,
121
                                                                 None,
122
                                                                 lastWordByteCnt=i + self.CHUNK_WIDTH // 8)
123
            ar_ref.extend(_ar_ref)
1✔
124
            driver_r_ref.extend(_driver_r_ref)
1✔
125

126
        self.runSim((len(driver_r_ref) + 5) * 2 * CLK_PERIOD)
1✔
127

128
        self.check_r_trans(ar_ref, driver_r_ref)
1✔
129

130
    def test_endstrbMultiFrame(self):
1✔
131
        self.test_endstrb(self.LEN_MAX_VAL)
1✔
132

133
    def test_multipleSplited(self, FRAMES=4):
1✔
134
        ar_ref, driver_r_ref = self.spotReadMemcpyTransactions(0x100, FRAMES * (self.LEN_MAX_VAL + 1) - 1, None)
1✔
135
        self.runSim((len(driver_r_ref) + 5) * CLK_PERIOD)
1✔
136
        self.check_r_trans(ar_ref, driver_r_ref)
1✔
137

138
    def test_randomized(self, N=24):
1✔
139
        u = self.u
1✔
140

141
        if u.AXI_CLS in (Axi3Lite, Axi4Lite):
1✔
142
            m = Axi4LiteSimRam(axi=u.axi)
1✔
143
        else:
144
            m = AxiSimRam(axi=u.axi)
1✔
145

146
        MAGIC = 99
1✔
147
        self.randomize(u.driver.r)
1✔
148
        self.randomize(u.driver.req)
1✔
149

150
        self.randomize(u.axi.ar)
1✔
151
        self.randomize(u.axi.r)
1✔
152

153
        r_ref = []
1✔
154
        for _ in range(N):
1✔
155
            size = int(self._rand.random() * self.LEN_MAX_VAL) + 1
1✔
156
            data = [MAGIC + i2 for i2 in range(size)]
1✔
157
            a = m.calloc(size, u.DATA_WIDTH // 8, initValues=data)
1✔
158
            _, _r_ref = self.spotReadMemcpyTransactions(a, size - 1, None, data=data)
1✔
159
            r_ref.extend(_r_ref)
1✔
160
            MAGIC += size
1✔
161

162
        self.runSim(len(r_ref) * 5 * CLK_PERIOD)
1✔
163

164
        self.assertEmpty(u.driver.req._ag.data)
1✔
165
        self.assertValSequenceEqual(u.driver.r._ag.data, r_ref)
1✔
166

167
    def test_simpleUnalignedWithData(self, N=1, WORDS=1, OFFSET_B=None, randomize=False):
1✔
168
        u = self.u
1✔
169

170
        req = u.driver._ag.req
1✔
171
        if randomize:
1!
172
            self.randomize(u.driver.req)
×
173
            self.randomize(u.driver.r)
×
174
            self.randomize(u.axi.ar)
×
175
            self.randomize(u.axi.r)
×
176

177
        if u.AXI_CLS in (Axi3Lite, Axi4Lite):
1✔
178
            m = Axi4LiteSimRam(axi=u.axi)
1✔
179
        else:
180
            m = AxiSimRam(axi=u.axi)
1✔
181

182
        if OFFSET_B is None:
1!
183
            if self.ALIGNAS == self.DATA_WIDTH:
1✔
184
                # to trigger an alignment error
185
                OFFSET_B = 1
1✔
186
            else:
187
                OFFSET_B = self.ALIGNAS // 8
1✔
188

189
        addr_step = u.DATA_WIDTH // 8
1✔
190
        offset = 8
1✔
191
        ref_r_frames = []
1✔
192
        for i in range(N):
1✔
193
            data = [ (i + i2) & 0xff for i2 in range((WORDS + 1) * addr_step)]
1✔
194
            a = m.calloc((WORDS + 1) * addr_step, 1, initValues=data)
1✔
195
            rem = (self.CHUNK_WIDTH % self.DATA_WIDTH) // 8
1✔
196
            req.data.append(self.mkReq(a + OFFSET_B, WORDS - 1, rem=rem))
1✔
197
            if rem == 0:
1✔
198
                rem = self.DATA_WIDTH
1✔
199
            ref_r_frames.append(data[OFFSET_B:((WORDS - 1) * addr_step) + rem + OFFSET_B])
1✔
200

201
        t = (10 + N) * CLK_PERIOD
1✔
202
        if randomize:
1!
203
            t *= 6
×
204
        self.runSim(t)
1✔
205
        if u.ALIGNAS == u.DATA_WIDTH:
1✔
206
            # unsupported alignment check if error is set
207
            self.assertValEqual(u.errorAlignment._ag.data[-1], 1)
1✔
208

209
        else:
210
            if u.ALIGNAS != 8:
1✔
211
                # if alignment is on 1B the but the errorAlignment can not happen
212
                self.assertValEqual(u.errorAlignment._ag.data[-1], 0)
1✔
213

214
            ar_ref = []
1✔
215
            if u.axi.LEN_WIDTH == 0:
1✔
216
                for i in range(N):
1✔
217
                    for w_i in range(WORDS + 1):
1✔
218
                        ar_ref.append(
1✔
219
                            self.aTrans(0x100 + (i + w_i) * addr_step, WORDS, 0)
220
                        )
221
            else:
222
                for i in range(N):
1✔
223
                    ar_ref.append(
1✔
224
                        self.aTrans(0x100 + i * addr_step, WORDS, 0)
225
                    )
226

227
            driver_r = u.driver.r._ag.data
1✔
228
            for ref_frame in ref_r_frames:
1✔
229
                offset = None
1✔
230
                r_data = []
1✔
231
                for w_i in range(WORDS):
1✔
232
                    data, strb, last = driver_r.popleft()
1✔
233
                    self.assertValEqual(last, int(w_i == WORDS - 1))
1✔
234

235
                    for B_i in range(addr_step):
1✔
236
                        if strb[B_i]:
1✔
237
                            if offset is None:
1✔
238
                                offset = B_i + (addr_step * w_i)
1✔
239
                            B = int(data[(B_i + 1) * 8: B_i * 8])
1✔
240
                            r_data.append(B)
1✔
241

242
                self.assertEqual(offset, 0)
1✔
243
                self.assertSequenceEqual(r_data, ref_frame)
1✔
244

245
        self.assertEmpty(u.axi.ar._ag.data)
1✔
246
        self.assertEmpty(u.axi.r._ag.data)
1✔
247

248

249
class Axi3_rDatapumpTC(Axi4_rDatapumpTC):
1✔
250
    LEN_MAX_VAL = 15
1✔
251

252
    @classmethod
1✔
253
    def setUpClass(cls):
1✔
254
        u = Axi_rDatapump(axiCls=Axi3)
1✔
255
        u.DATA_WIDTH = cls.DATA_WIDTH
1✔
256
        u.CHUNK_WIDTH = cls.CHUNK_WIDTH
1✔
257
        assert cls.CHUNK_WIDTH == cls.DATA_WIDTH
1✔
258
        u.MAX_CHUNKS = cls.LEN_MAX_VAL + 1
1✔
259
        u.ALIGNAS = cls.ALIGNAS
1✔
260
        cls.compileSim(u)
1✔
261

262

263
class Axi3Lite_rDatapumpTC(Axi4_rDatapumpTC):
1✔
264
    LEN_MAX_VAL = 3
1✔
265
    MAX_CHUNKS = 1
1✔
266
    CHUNK_WIDTH = Axi4_rDatapumpTC.DATA_WIDTH
1✔
267
    ALIGNAS = Axi4_rDatapumpTC.ALIGNAS
1✔
268

269
    @classmethod
1✔
270
    def setUpClass(cls):
1✔
271
        u = Axi_rDatapump(axiCls=Axi3Lite)
1✔
272
        u.DATA_WIDTH = cls.DATA_WIDTH
1✔
273
        u.CHUNK_WIDTH = cls.CHUNK_WIDTH
1✔
274
        assert cls.CHUNK_WIDTH == cls.DATA_WIDTH
1✔
275
        u.MAX_CHUNKS = cls.LEN_MAX_VAL + 1
1✔
276
        u.ALIGNAS = cls.ALIGNAS
1✔
277
        cls.compileSim(u)
1✔
278

279
    def rTrans(self, data, _id=0, resp=RESP_OKAY, last=True):
1✔
280
        assert last
1✔
281
        assert _id == 0
1✔
282
        return (data, resp)
1✔
283

284
    def rDriverTrans(self, data, last, strb=mask(64 // 8), id_=0):
1✔
285
        assert id_ == 0
1✔
286
        return (data, strb, int(last))
1✔
287

288

289
Axi_rDatapump_alignedTCs = [
1✔
290
    Axi3_rDatapumpTC,
291
    Axi4_rDatapumpTC,
292
    Axi3Lite_rDatapumpTC,
293
]
294

295
if __name__ == "__main__":
296
    import unittest
297
    testLoader = unittest.TestLoader()
298
    # suite = unittest.TestSuite([Axi4Lite_rDatapump_16b_from_64bTC("test_notSplitedReqWithData")])
299
    loadedTcs = [testLoader.loadTestsFromTestCase(tc) for tc in Axi_rDatapump_alignedTCs]
300
    suite = unittest.TestSuite(loadedTcs)
301
    runner = unittest.TextTestRunner(verbosity=3)
302
    runner.run(suite)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc