• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Nic30 / hwtLib / b469f1f6-6a00-4958-bfb0-f9fbf427a589

06 Jun 2024 06:38PM UTC coverage: 93.399% (-0.03%) from 93.431%
b469f1f6-6a00-4958-bfb0-f9fbf427a589

push

circleci

Nic30
docs

8040 of 9100 branches covered (88.35%)

39136 of 41902 relevant lines covered (93.4%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.25
/hwtLib/amba/axi_comp/cache/cacheWriteAllocWawOnlyWritePropagating_test.py
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3

4
from hwt.hdl.types.bits import HBits
1✔
5
from hwt.serializer.combLoopAnalyzer import CombLoopAnalyzer
1✔
6
from hwt.simulator.simTestCase import SimTestCase
1✔
7
from hwtLib.amba.axiLite_comp.sim.utils import axi_randomize_per_channel
1✔
8
from hwtLib.amba.axi_comp.cache.cacheWriteAllocWawOnlyWritePropagating import AxiCacheWriteAllocWawOnlyWritePropagating
1✔
9
from hwtLib.amba.constants import RESP_OKAY
1✔
10
from hwtLib.examples.errors.combLoops import freeze_set_of_sets
1✔
11
from hwtLib.mem.sim.segmentedArrayProxy import SegmentedArrayProxy
1✔
12
from hwtSimApi.constants import CLK_PERIOD
1✔
13
from pyMathBitPrecise.bit_utils import set_bit_range, mask, int_list_to_int
1✔
14
from hwt.hdl.types.bitsConst import HBitsConst
1✔
15

16

17
class AxiCacheWriteAllocWawOnlyWritePropagatingTC(SimTestCase):
1✔
18
    # number of words in transaction - 1
19
    LEN = 0
1✔
20

21
    @classmethod
    def setUpClass(cls):
        """Configure the cache under test and compile its simulator once for the class."""
        cache = AxiCacheWriteAllocWawOnlyWritePropagating()
        cls.dut = cache
        cache.IS_PREINITIALIZED = True
        cache.DATA_WIDTH = 32
        # LEN is "number of words in transaction - 1", so the cacheline holds LEN + 1 words
        cache.CACHE_LINE_SIZE = 4 * (cls.LEN + 1)
        cache.CACHE_LINE_CNT = 16
        cache.MAX_BLOCK_DATA_WIDTH = 8
        cache.WAY_CNT = 2
        # derived constants shared by the helper methods and the tests
        cls.ADDR_STEP = cache.CACHE_LINE_SIZE
        cls.WAY_CACHELINES = cache.CACHE_LINE_CNT // cache.WAY_CNT
        cls.compileSim(cache)
1✔
33

34
    def setUp(self):
        """Bind the TAGS and DATA proxies to the RAM segments of the simulation model."""
        SimTestCase.setUp(self)
        model = self.rtl_simulator.model
        dut = self.dut

        # tag memory: one record per way, accessed through its "children_<i>_inst" segments
        tag_mem = model.tag_array_inst.tag_mem_inst
        tag_segment_cnt = dut.tag_array.tag_record_t.bit_length() * dut.WAY_CNT // 8
        tag_segments = [
            getattr(tag_mem, f"children_{i:d}_inst").io.ram_memory
            for i in range(tag_segment_cnt)
        ]
        self.TAGS = SegmentedArrayProxy(tag_segments)

        # data memory: an item of the proxy is a whole cacheline (LEN + 1 words)
        data_mem = model.data_array_inst.data_array_inst
        data_segments = [
            getattr(data_mem, f"children_{i:d}_inst").io.ram_memory
            for i in range(dut.DATA_WIDTH // 8)
        ]
        self.DATA = SegmentedArrayProxy(data_segments, words_per_item=self.LEN + 1)
49

50
    def set_data(self, index: int, way: int, data: HBitsConst):
        """
        Store a cacheline into the memory behind the data array.

        :param index: index of the cacheline within the way
        :param way: way in which the cacheline is stored
        :param data: value of the whole cacheline
        """
        # same flat addressing as get_data(): the ways are stored one after another,
        # WAY_CACHELINES is computed once in setUpClass()
        self.DATA[way * self.WAY_CACHELINES + index] = data
1✔
54

55
    def build_cacheline(self, cacheline_words):
        """Pack the words of a cacheline into a single integer, DATA_WIDTH bits per word."""
        word_width = self.dut.DATA_WIDTH
        return int_list_to_int(cacheline_words, word_width)
1✔
57

58
    def get_data(self, index, way):
        """Read the cacheline stored at the index of the way from the data memory."""
        mem_index = way * self.WAY_CACHELINES + index
        return self.DATA[mem_index]
1✔
60

61
    def set_tag(self, addr, way):
        """
        Write a valid tag record for the address into the given way of the tag memory.

        :param addr: address of the cacheline, it has to have zero offset
        :param way: way in which the record is written
        :return: index of the cacheline parsed from the address
        """
        dut = self.dut
        tag, index, offset = dut.parse_addr_int(addr)
        assert offset == 0, addr
        record_t = dut.tag_array.tag_record_t
        record_w = record_t.bit_length()
        record = record_t.from_py({"tag": tag, "valid": 1})
        record_bits = record._reinterpret_cast(HBits(record_w))

        tag_word = self.TAGS[index]
        assert tag_word._is_full_valid(), (tag_word, index)
        # replace only the bits of this way, the records of the other ways are kept
        self.TAGS[index] = set_bit_range(tag_word.val, way * record_w, record_w, record_bits.val)

        return index
1✔
75

76
    def cacheline_insert(self, addr: int, way: int, data: HBitsConst):
        """Write the tag record first and then the data of the cacheline into the way."""
        self.set_data(self.set_tag(addr, way), way, data)
1✔
79

80
    def get_cachelines(self):
        """
        Collect all cachelines which are marked as valid in the tag memory.

        :return: dictionary {address of the cacheline: data}, the data is converted
            to int if it is fully valid, otherwise it is kept as read from the memory
        """
        dut = self.dut
        way_tags_t = dut.tag_array.tag_record_t[dut.WAY_CNT]
        raw_t = HBits(way_tags_t.bit_length())
        cachelines = {}
        for index in range(2 ** dut.INDEX_W):
            raw = self.TAGS[index]
            # reinterpret the raw bits as an array of tag records, one per way
            way_tags = raw_t.from_py(raw.val, raw.vld_mask)._reinterpret_cast(way_tags_t)
            for way, record in enumerate(way_tags):
                if not record.valid:
                    continue
                data = self.get_data(index, way)
                addr = dut.deparse_addr(record.tag, index, 0)
                cachelines[int(addr)] = int(data) if data._is_full_valid() else data
        return cachelines
1✔
96

97
    def randomize_all(self):
        """Apply axi_randomize_per_channel() on both AXI ports ("s" and "m") of the cache."""
        for axi in (self.dut.s, self.dut.m):
            axi_randomize_per_channel(self, axi)
1✔
101

102
    def test_utils(self):
        """Check that get_cachelines() reports exactly what cacheline_insert() stored."""
        self.TAGS.clean()
        self.DATA.clean()
        self.assertDictEqual(self.get_cachelines(), {})

        MAGIC = 99
        inserted = {}
        for way in range(self.dut.WAY_CNT):
            addr = (way * self.WAY_CACHELINES + 3) * self.ADDR_STEP
            data = MAGIC + way
            self.cacheline_insert(addr, way, data)
            inserted[addr] = data
            # checked after every insert so the previously stored lines are verified as well
            self.assertDictEqual(self.get_cachelines(), inserted)
1✔
114

115
    def test_no_comb_loops(self):
        """The CombLoopAnalyzer should not report any combinational loop in the cache."""
        analyzer = CombLoopAnalyzer()
        analyzer.visit_HwModule(self.dut)
        found_loops = freeze_set_of_sets(analyzer.report())
        # uncomment to print the loops which were found
        # for loop in found_loops:
        #     print(10 * "-")
        #     for sig in loop:
        #         print(sig.resolve()[1:])

        self.assertEqual(found_loops, frozenset())
1✔
125

126
    def test_nop(self):
        """Without any request nothing should appear on the checked channels."""
        self.randomize_all()
        self.runSim(50 * CLK_PERIOD)
        m, s = self.dut.m, self.dut.s
        for intf in (m.ar, m.aw, m.w, s.r, s.b):
            self.assertEmpty(intf._ag.data)
1✔
132

133
    def test_read_through(self, N=10, randomized=False):
        """
        Read N cachelines while the tag memory is clean.

        Every transaction should be checked if present in cache, should not be found
        and should be forwarded on axi "m" port.

        :param N: number of read requests
        :param randomized: if True the channels are randomized and the simulation
            time is multiplied by 3
        """
        dut = self.dut
        self.TAGS.clean()
        ar = dut.s.ar._ag
        # the id is wrapped in the same way as in the other tests so that N
        # may be larger than the number of the ids
        ID_MAX = 2 ** dut.ID_WIDTH
        ref = [
            ar.create_addr_req(addr=i * self.ADDR_STEP, _len=self.LEN, _id=i % ID_MAX)
            for i in range(N)
        ]
        ar.data.extend(ref)
        t = (N * (self.LEN + 1) + 5) * CLK_PERIOD
        if randomized:
            self.randomize_all()
            t *= 3

        self.runSim(t)
        # the requests have to be forwarded unchanged and in the same order
        self.assertValSequenceEqual(dut.m.ar._ag.data, ref)
        for x in [dut.m.aw, dut.m.w, dut.s.r, dut.s.b]:
            self.assertEmpty(x._ag.data)
1✔
152

153
    def test_read_from_everywhere(self, N_PER_WAY=10, MAGIC=99, randomized=False):
        """
        Preload cachelines into every way and read them through the "s" port.

        Every transaction should be read from the cache.
        """
        dut = self.dut
        self.TAGS.clean()
        self.DATA.clean()
        id_cnt = 2 ** dut.ID_WIDTH
        lines_per_way = self.WAY_CACHELINES
        ar = dut.s.ar._ag
        expected_r = []
        for way in range(dut.WAY_CNT):
            for n in range(N_PER_WAY):
                # wraps if more than WAY_CACHELINES lines per way are requested
                index = n % lines_per_way
                addr = (index + way * lines_per_way) * self.ADDR_STEP
                _id = index % id_cnt
                ar.data.append(ar.create_addr_req(addr=addr, _len=self.LEN, _id=_id))

                words = [way * MAGIC + index + word_i for word_i in range(self.LEN + 1)]
                expected_r.extend(
                    (_id, word, RESP_OKAY, int(word_i == self.LEN))
                    for word_i, word in enumerate(words)
                )
                self.cacheline_insert(addr, way, self.build_cacheline(words))

        sim_time = (10 + dut.WAY_CNT * N_PER_WAY * (self.LEN + 1)) * CLK_PERIOD
        if randomized:
            self.randomize_all()
            sim_time *= 3
        self.runSim(sim_time)
        # all requests consumed, nothing forwarded to "m" and no write response
        for intf in (dut.s.ar, dut.m.ar, dut.m.aw, dut.m.w, dut.s.b):
            self.assertEmpty(intf._ag.data, intf)

        self.assertValSequenceEqual(dut.s.r._ag.data, expected_r)
1✔
185

186
    def test_read_from_everywhere_r(self, N_PER_WAY=50, MAGIC=99):
        """Randomized variant of test_read_from_everywhere()."""
        self.test_read_from_everywhere(N_PER_WAY, MAGIC, randomized=True)
191

192
    def _test_write_to(self, N_PER_WAY=8, MAGIC=99, preallocate=False, randomized=False):
        """
        Write cachelines through the "s" port and check the content of the cache.

        Every cacheline should be stored in cache as there is plenty of space.

        :param preallocate: if True each cacheline is inserted with the data 0
            before it is written
        :param randomized: if True the channels are randomized and the simulation
            time is multiplied by 3
        """
        dut = self.dut
        self.TAGS.clean()
        self.DATA.clean()

        lines_per_way = self.WAY_CACHELINES
        id_cnt = 2 ** dut.ID_WIDTH
        aw = dut.s.aw._ag
        w_data = dut.s.w._ag.data
        strb = mask(dut.DATA_WIDTH // 8)
        expected = {}
        b_expected = []
        for way in range(dut.WAY_CNT):
            for n in range(N_PER_WAY):
                # wraps if more than WAY_CACHELINES lines per way are requested
                index = n % lines_per_way
                addr = (index + way * lines_per_way) * self.ADDR_STEP

                if preallocate:
                    self.cacheline_insert(addr, way, 0)

                _id = index % id_cnt
                aw.data.append(aw.create_addr_req(addr=addr, _len=self.LEN, _id=_id))

                words = [way * MAGIC + index + word_i for word_i in range(self.LEN + 1)]
                for word_i, word in enumerate(words):
                    w_data.append((word, strb, int(word_i == self.LEN)))
                expected[addr] = self.build_cacheline(words)

                b_expected.append((_id, RESP_OKAY))

        if preallocate:
            # before the simulation every preallocated cacheline contains 0
            self.assertDictEqual(self.get_cachelines(), dict.fromkeys(expected, 0))

        sim_time = (dut.WAY_CNT * N_PER_WAY * (self.LEN + 1) + 10) * CLK_PERIOD
        if randomized:
            sim_time *= 3
            self.randomize_all()

        self.runSim(sim_time)
        # all requests consumed, no read data and nothing forwarded to "m"
        for intf in (dut.s.aw, dut.s.r, dut.m.ar, dut.m.aw, dut.m.w):
            self.assertEmpty(intf._ag.data, intf)

        self.assertDictEqual(self.get_cachelines(), expected)
        self.assertValSequenceEqual(dut.s.b._ag.data, b_expected)
1✔
236

237
    def test_write_to_empty(self, N_PER_WAY=8, MAGIC=99, preallocate=False, randomized=False):
        """Run _test_write_to() with the arguments passed through unchanged."""
        self._test_write_to(
            N_PER_WAY=N_PER_WAY,
            MAGIC=MAGIC,
            preallocate=preallocate,
            randomized=randomized)
1✔
239

240
    def test_write_to_preallocated(self, N_PER_WAY=8, MAGIC=99):
        """Run _test_write_to() with the cachelines preallocated in the cache."""
        self._test_write_to(N_PER_WAY, MAGIC, preallocate=True)
1✔
242

243
    def test_write_to_empty_r(self, N_PER_WAY=50, MAGIC=99):
        """Randomized variant of the write test without preallocation."""
        self._test_write_to(N_PER_WAY, MAGIC, preallocate=False, randomized=True)
1✔
245

246
    def test_write_to_preallocated_r(self, N_PER_WAY=50, MAGIC=99):
        """Randomized variant of the write test with the cachelines preallocated."""
        self._test_write_to(N_PER_WAY, MAGIC, preallocate=True, randomized=True)
1✔
248

249
    def test_read_write_to_different_sets(self, N_PER_WAY=5, MAGIC=99, randomized=False):
        """
        Read and overwrite preloaded cachelines in the same simulation run.

        Each cacheline is preloaded by the test. The read is expected to return
        the preloaded data and after the simulation the cache is expected to contain
        the written data. Nothing is expected on the ar, aw and w channels of "m".

        :param N_PER_WAY: number of read/write request pairs per way
        :param MAGIC: multiplier used to generate distinct data for each way
        :param randomized: if True the channels are randomized and the simulation
            time is multiplied by 3
        """
        dut = self.dut
        self.TAGS.clean()
        self.DATA.clean()

        WAY_CACHELINES = self.WAY_CACHELINES
        ID_MAX = 2 ** dut.ID_WIDTH
        ar = dut.s.ar._ag
        aw = dut.s.aw._ag
        w_data = dut.s.w._ag.data
        M = mask(dut.DATA_WIDTH // 8)
        expected = {}
        b_expected = []
        expected_r = []
        for w in range(dut.WAY_CNT):
            for i in range(N_PER_WAY):
                # wraps if more than WAY_CACHELINES lines per way are requested
                i = i % WAY_CACHELINES
                addr = (i + w * WAY_CACHELINES) * self.ADDR_STEP
                _id = i % ID_MAX

                # the length of the burst has to match the LEN + 1 data words generated
                # below; each channel gets a request built by its own agent
                ar.data.append(ar.create_addr_req(addr=addr, _len=self.LEN, _id=_id))
                aw.data.append(aw.create_addr_req(addr=addr, _len=self.LEN, _id=_id))

                r_d_list = []
                w_d_list = []
                for w_i in range(self.LEN + 1):
                    last = int(w_i == self.LEN)

                    # the written data uses the offset WAY_CNT * MAGIC so that it differs
                    # from the preloaded data
                    write_d = (dut.WAY_CNT + w) * MAGIC + i + w_i
                    w_data.append((write_d, M, last))
                    w_d_list.append(write_d)

                    read_d = w * MAGIC + i + w_i
                    expected_r.append((_id, read_d, RESP_OKAY, last))
                    r_d_list.append(read_d)

                expected[addr] = self.build_cacheline(w_d_list)
                self.cacheline_insert(addr, w, self.build_cacheline(r_d_list))

                b_expected.append((_id, RESP_OKAY))

        t = (dut.WAY_CNT * N_PER_WAY * (self.LEN + 1) + 10) * CLK_PERIOD
        if randomized:
            t *= 3
            self.randomize_all()

        self.runSim(t)
        for x in [dut.s.aw, dut.m.ar, dut.m.aw, dut.m.w]:
            self.assertEmpty(x._ag.data, x)

        self.assertDictEqual(self.get_cachelines(), expected)
        self.assertValSequenceEqual(dut.s.b._ag.data, b_expected)
        self.assertValSequenceEqual(dut.s.r._ag.data, expected_r)
1✔
302

303
    # NOTE(review): placeholder - no test of the write victim flush follows here
304

305

306
class AxiCacheWriteAllocWawOnlyWritePropagating_len1TC(AxiCacheWriteAllocWawOnlyWritePropagatingTC):
    """The same tests as in the base class, run with two words per transaction."""

    # number of words in transaction - 1
    LEN = 1
1✔
308

309

310
# all test cases of this module, loaded by the runner under __main__
AxiCacheWriteAllocWawOnlyWritePropagatingTCs = [
    AxiCacheWriteAllocWawOnlyWritePropagatingTC,
    AxiCacheWriteAllocWawOnlyWritePropagating_len1TC,
]
314

315
if __name__ == "__main__":
    import unittest

    testLoader = unittest.TestLoader()
    # to run a single test use:
    # suite = unittest.TestSuite([AxiCacheWriteAllocWawOnlyWritePropagatingTC("test_write_to_empty")])
    loadedTcs = []
    for tc in AxiCacheWriteAllocWawOnlyWritePropagatingTCs:
        loadedTcs.append(testLoader.loadTestsFromTestCase(tc))
    suite = unittest.TestSuite(loadedTcs)
    runner = unittest.TextTestRunner(verbosity=3)
    runner.run(suite)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc