• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Nic30 / hwtLib / 97b914ce-e803-4d6c-b69e-5a382af1f922

05 Nov 2025 05:49PM UTC coverage: 92.735% (-0.03%) from 92.769%
97b914ce-e803-4d6c-b69e-5a382af1f922

push

circleci

Nic30
feat(logic): CRC_POLY utility functions

5467 of 6696 branches covered (81.65%)

5 of 22 new or added lines in 1 file covered. (22.73%)

3 existing lines in 2 files now uncovered.

40492 of 43664 relevant lines covered (92.74%)

0.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.04
/hwtLib/examples/axi/debugbusmonitor_test.py
1
#!/usr/bin/env python3
2
# -*- coding: utf-8 -*-
3

4
from collections import deque
1✔
5
from io import StringIO
1✔
6
from math import ceil
1✔
7
import os
1✔
8
import threading
1✔
9
import unittest
1✔
10

11
from hwt.simulator.simTestCase import SimTestCase
1✔
12
from hwtLib.examples.axi.debugbusmonitor import DebugBusMonitorExampleAxi
1✔
13
from hwtLib.tools.debug_bus_monitor_ctl import DebugBusMonitorCtl, words_to_int
1✔
14
from hwtSimApi.constants import CLK_PERIOD
1✔
15
from hwtSimApi.triggers import Timer, StopSimumulation, WaitWriteOnly
1✔
16
from pyMathBitPrecise.bit_utils import ValidityError
1✔
17

18

19
class DebugBusMonitorCtlSim(DebugBusMonitorCtl):
    """
    DebugBusMonitorCtl whose memory reads are served by the simulated AXI
    interface ``tc.dut.s`` of a test case.

    The instance is meant to be used from a thread which runs next to the
    simulator; the two sides are synchronized with the locks and flags
    stored on the test case.

    :ivar tc: test case which owns the DUT and the synchronization primitives
        (``sim_done``, ``ctl_thread_request_pending``,
        ``ctl_thread_has_requests``, ``r_data_available``)
    """

    def __init__(self, tc):
        # NOTE(review): the meaning of the 0 argument is defined
        # by DebugBusMonitorCtl.__init__ - confirm there
        DebugBusMonitorCtl.__init__(self, 0)
        self.tc = tc

    def read(self, addr, size):
        """
        Read ``size`` bytes starting at ``addr`` using one read request
        per bus word.

        :param addr: address of the first word to read
        :param size: number of bytes to read
        :return: ``bytes`` of length ``size`` (little-endian) built by
            ``words_to_int`` from the received words
        :raises AssertionError: if the simulation is marked as done while
            data is still expected
        """
        tc: "DebugBusMonitorExampleAxiTC" = self.tc
        bus = tc.dut.s
        bytes_per_word = bus.DATA_WIDTH // 8
        collected = []
        for _ in range(ceil(size / bytes_per_word)):
            assert not tc.sim_done
            # enqueue the read address request for the simulation agent
            ar_agent = bus.ar._ag
            request = ar_agent.create_addr_req(addr)
            ar_agent.data.append(request)
            # tell the simulator that there is work to do and wake it up
            # if it is parked waiting for a request
            tc.ctl_thread_request_pending = True
            if tc.ctl_thread_has_requests.locked():
                tc.ctl_thread_has_requests.release()

            rx_queue = bus.r._ag.data
            while not rx_queue:
                assert not tc.sim_done
                # block until the simulator signals that read data arrived
                tc.r_data_available.acquire()
                tc.ctl_thread_request_pending = False

            beat = rx_queue.popleft()[0]
            try:
                word = int(beat)
            except ValidityError:
                # value is not fully valid, keep only the bits
                # selected by vld_mask
                word = beat.val & beat.vld_mask

            collected.append(word)
            addr += bytes_per_word

        return words_to_int(collected, bytes_per_word, size).to_bytes(size, "little")
1✔
56

57

58
def run_DebugBusMonitorCtlSim(tc, out_txt, out_dot):
    """
    Body of the control thread: dump the monitored state through
    the simulated bus and then tell the simulator to stop.

    :param tc: test case which owns the simulation; its ``sim_done`` flag is set
        and its ``ctl_thread_has_requests`` lock is released at the end
    :param out_txt: output object passed to ``dump_txt``
    :param out_dot: output object passed to ``dump_dot``
    """
    try:
        db = DebugBusMonitorCtlSim(tc)
        db.dump_txt(out_txt)
        db.dump_dot(out_dot)
    finally:
        # Always mark the end of the control thread, even if the dump failed,
        # otherwise the simulator never learns that it should stop.
        tc.sim_done = True
        # The simulator may be parked in ctl_thread_has_requests.acquire()
        # waiting for another request; wake it up so it can check sim_done.
        if tc.ctl_thread_has_requests.locked():
            tc.ctl_thread_has_requests.release()
1✔
63

64

65
class DebugBusMonitorExampleAxiTC(SimTestCase):
    """
    Test of DebugBusMonitorExampleAxi where the monitor is read by
    DebugBusMonitorCtlSim running in a separate thread next to the simulator.
    """

    @classmethod
    def setUpClass(cls):
        # the simulation model is compiled once and shared by all tests
        dut = cls.dut = DebugBusMonitorExampleAxi()
        dut.DATA_WIDTH = 32
        cls.compileSim(dut)

    def setUp(self):
        SimTestCase.setUp(self)
        # set once the control thread finished (or the test is giving up)
        self.sim_done = False
        self.r_data_available = threading.Lock()
        self.r_data_available.acquire()  # locked means there are no data
        self.ctl_thread_request_pending = False
        self.ctl_thread_has_requests = threading.Lock()
        self.ctl_thread_has_requests.acquire()  # locked means there are no requests

    def test_dump(self):
        """
        Run the simulation together with the control thread and compare
        the text and dot dumps with the expected values.
        """
        dut = self.dut
        tc = self

        class SpyDeque(deque):
            """
            deque which notifies the control thread about each appended item.
            """

            def __init__(self, tc):
                super(SpyDeque, self).__init__()
                self.tc = tc

            def append(self, x):
                # store the item first so the woken reader never sees
                # an empty queue
                super(SpyDeque, self).append(x)
                if self.tc.r_data_available.locked():
                    self.tc.r_data_available.release()

        dut.s.r._ag.data = SpyDeque(self)
        dut.din0._ag.data.extend([1, 2])
        dut.din1._ag.data.extend([3, 4])
        dut.din2._ag.data.extend([5, 6])

        def sim_init():
            yield WaitWriteOnly()
            dut.dout1._ag.setEnable(False)

        def time_sync():
            while True:
                if dut.s.r._ag.data and tc.r_data_available.locked():
                    # if more data was produced by simulator notify the tool control thread
                    tc.r_data_available.release()
                yield Timer(CLK_PERIOD)
                if tc.sim_done:
                    raise StopSimumulation()
                if not tc.ctl_thread_request_pending:
                    # if no control operation is pending wait until some is requested
                    tc.ctl_thread_has_requests.acquire()

        self.procs.extend([time_sync(), sim_init()])

        buff_txt = StringIO()
        buff_dot = StringIO()

        ctl_thread = threading.Thread(target=run_DebugBusMonitorCtlSim,
                                      args=(self, buff_txt, buff_dot))
        ctl_thread.start()
        try:
            # actually takes less time as the simulation is stopped after ctl_thread end
            self.runSim(200000 * CLK_PERIOD)
        finally:
            # handle the case where something went wrong and ctl thread is still running
            # (in finally so the thread is also released if the simulation raised)
            self.sim_done = True
            if self.r_data_available.locked():
                self.r_data_available.release()
            ctl_thread.join()

        d = buff_txt.getvalue()
        self.assertEqual(d, """\
din0:
  data: 0x0
  vld: 0
  rd: 1
din0_snapshot:
  data: 0x2
  vld: 1
  rd: 1
dout0:
  data: 0x0
  vld: 0
  rd: 1
dout0_snapshot:
  data: 0x2
  vld: 1
  rd: 1
din1:
  data: 0x4
  vld: 1
  rd: 0
din1_snapshot:
  data: 0x3
  vld: 1
  rd: 1
reg:
  dataIn:
    data: 0x4
    vld: 1
    rd: 0
  dataIn_snapshot:
    data: 0x4
    vld: 1
    rd: 0
  dataOut:
    data: 0x3
    vld: 1
    rd: 0
  dataOut_snapshot:
    data: 0x0
    vld: 0
    rd: 0
dout1:
  data: 0x3
  vld: 1
  rd: 0
dout1_snapshot:
  data: 0x0
  vld: 0
  rd: 0
din2:
  data: 0x0
  vld: 0
  rd: 1
din2_snapshot:
  data: 0x0
  vld: 0
  rd: 1
dout2:
  data: 0x0
  vld: 0
  rd: 1
dout2_snapshot:
  data: 0x0
  vld: 0
  rd: 1
""")
        dot_file = os.path.join(os.path.dirname(__file__), "DebugBusMonitorExampleAxiTC.dot")
        # uncomment to regenerate the reference file
        # with open(dot_file, "w") as f:
        #    f.write(buff_dot.getvalue())
        with open(dot_file, "r") as f:
            self.assertEqual(buff_dot.getvalue(), f.read())
1✔
207

208

209
if __name__ == "__main__":
    # the runner does not depend on the suite, so it is prepared first
    runner = unittest.TextTestRunner(verbosity=3)
    testLoader = unittest.TestLoader()
    # to run only a single test build the suite by hand instead, e.g.:
    # suite = unittest.TestSuite([DebugBusMonitorExampleAxiTC("test_dump")])
    suite = testLoader.loadTestsFromTestCase(DebugBusMonitorExampleAxiTC)
    runner.run(suite)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc