• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nbiotcloud / ucdp / 18093583429

29 Sep 2025 10:16AM UTC coverage: 90.562% (-6.1%) from 96.647%
18093583429

push

github

web-flow
Merge pull request #132 from nbiotcloud/update-deps

update dependencies

4673 of 5160 relevant lines covered (90.56%)

10.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.57
/src/ucdp/modbase.py
1
#
2
# MIT License
3
#
4
# Copyright (c) 2024-2025 nbiotcloud
5
#
6
# Permission is hereby granted, free of charge, to any person obtaining a copy
7
# of this software and associated documentation files (the "Software"), to deal
8
# in the Software without restriction, including without limitation the rights
9
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10
# copies of the Software, and to permit persons to whom the Software is
11
# furnished to do so, subject to the following conditions:
12
#
13
# The above copyright notice and this permission notice shall be included in all
14
# copies or substantial portions of the Software.
15
#
16
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22
# SOFTWARE.
23
#
24
"""
25
Base Hardware Module.
26

27
[BaseMod][ucdp.modbase.BaseMod] defines the base interface which is **common to all hardware modules**.
28
"""
29

30
from abc import abstractmethod
12✔
31
from functools import cached_property
12✔
32
from inspect import getmro
12✔
33
from typing import Any, ClassVar, Literal, Optional, TypeAlias, Union, no_type_check
12✔
34

35
from aligntext import align
12✔
36
from caseconverter import snakecase
12✔
37
from uniquer import uniquetuple
12✔
38

39
from .assigns import Assigns, Drivers, Note, Source
12✔
40
from .baseclassinfo import get_baseclassinfos
12✔
41
from .clkrel import ClkRel
12✔
42
from .clkrelbase import BaseClkRel
12✔
43
from .const import Const
12✔
44
from .consts import UPWARDS
12✔
45
from .doc import Doc
12✔
46
from .docutil import doc_from_type
12✔
47
from .exceptions import LockError
12✔
48
from .expr import BoolOp, Expr
12✔
49
from .exprparser import ExprParser, Parseable, cast_booltype
12✔
50
from .flipflop import FlipFlop
12✔
51
from .ident import Ident, Idents
12✔
52
from .iterutil import namefilter
12✔
53
from .logging import LOGGER
12✔
54
from .modref import ModRef, get_modclsname
12✔
55
from .modutil import get_modbaseinfos
12✔
56
from .mux import Mux
12✔
57
from .namespace import Namespace
12✔
58
from .nameutil import join_names, split_prefix
12✔
59
from .object import Field, NamedObject, Object, PrivateField, computed_field
12✔
60
from .orientation import FWD, IN, Direction, Orientation
12✔
61
from .param import Param
12✔
62
from .routepath import Routeables, RoutePath, parse_routepaths
12✔
63
from .signal import BaseSignal, Port, Signal
12✔
64
from .typebase import BaseType
12✔
65
from .typedescriptivestruct import DescriptiveStructType
12✔
66
from .typestruct import StructItem
12✔
67

68
# Type of the class-level `tags` attribute of modules: a set of strings.
ModTags: TypeAlias = set[str]
12✔
69
# Allowed `on_error` modes of the routing methods: raise the error, log it as warning or ignore it.
RoutingError: TypeAlias = Literal["error", "warn", "ignore"]
12✔
70

71

72
class BaseMod(NamedObject):
12✔
73
    """
74
    Hardware Module.
75

76
    Args:
77
        parent: Parent Module. `None` by default for top module.
78
        name: Instance name. Required if parent is provided.
79

80
    Keyword Args:
81
        title (str): Title
82
        descr (str): Description
83
        comment (str): Comment
84
        paramdict (dict): Parameter values for this instance.
85
    """
86

87
    filelists: ClassVar[Any] = ()
12✔
88
    tags: ClassVar[ModTags] = ModTags()
12✔
89

90
    parent: Optional["BaseMod"] = None
12✔
91
    paramdict: dict = Field(default_factory=dict, repr=False)
12✔
92

93
    title: str | None = None
12✔
94
    descr: str | None = None
12✔
95
    comment: str | None = None
12✔
96

97
    has_hiername: bool = True
12✔
98

99
    virtual: bool = False
12✔
100

101
    # private
102

103
    drivers: Drivers = Field(default_factory=Drivers, init=False, repr=False)
12✔
104
    namespace: Idents = Field(default_factory=Idents, init=False, repr=False)
12✔
105
    params: Idents = Field(default_factory=Idents, init=False, repr=False)
12✔
106
    ports: Idents = Field(default_factory=Idents, init=False, repr=False)
12✔
107
    portssignals: Idents = Field(default_factory=Idents, init=False, repr=False)
12✔
108
    insts: Namespace = Field(default_factory=Namespace, init=False, repr=False)
12✔
109

110
    _has_build_dep: bool = PrivateField(default=False)
12✔
111
    _has_build_final: bool = PrivateField(default=False)
12✔
112

113
    __is_locked: bool = PrivateField(default=False)
12✔
114
    __instcons: dict[str, tuple[Assigns, ExprParser]] = PrivateField(default_factory=dict)
12✔
115
    __flipflops: dict[int, FlipFlop] = PrivateField(default_factory=dict)
12✔
116
    __muxes: Namespace = PrivateField(default_factory=Namespace)
12✔
117
    __parents = PrivateField(default_factory=list)
12✔
118

119
    def __init__(self, parent: Optional["BaseMod"] = None, name: str | None = None, **kwargs):
        """
        Create Module Instance.

        Args:
            parent: Parent module. `None` for a top module.
            name: Instance name. Derived from the class name if omitted on a top module.

        Raises:
            NameError: The class name does not end with 'Mod'.
            ValueError: `parent` is given, but `name` is missing.
        """
        cls = type(self)
        clsname = cls.__name__
        if not clsname.endswith("Mod"):
            raise NameError(f"Name of {cls} MUST end with 'Mod'")
        if not name and parent:
            raise ValueError("'name' is required for sub modules.")
        if not name:
            # top module without explicit name: use the snake-cased class name without 'Mod'
            name = snakecase(clsname.removesuffix("Mod"))
        super().__init__(parent=parent, name=name, **kwargs)  # type: ignore[call-arg]
128

129
    @property
    def doc(self) -> Doc:
        """Documentation container built from `title`, `descr` and `comment`."""
        title, descr, comment = self.title, self.descr, self.comment
        return Doc(title=title, descr=descr, comment=comment)
133

134
    @property
    def basename(self) -> str:
        """Base name: the second item `split_prefix` returns for the instance name."""
        parts = split_prefix(self.name)
        return parts[1]
138

139
    @property
    @abstractmethod
    def modname(self) -> str:
        """
        Module Name.

        Declared abstract: has to be implemented by derived module classes.
        """
143

144
    @property
    @abstractmethod
    def topmodname(self) -> str:
        """
        Top Module Name.

        Declared abstract: has to be implemented by derived module classes.
        """
148

149
    @property
    def libname(self) -> str:
        """Library name, taken from the `name` attribute of `libpath`."""
        libpath = self.libpath
        return libpath.name
153

154
    @property
    @abstractmethod
    def libpath(self) -> str:
        """
        Library Path.

        Declared abstract: has to be implemented by derived module classes.
        """
        # NOTE(review): annotated as `str`, but `libname` reads `.name` from this value, so
        # implementations presumably return a `pathlib.Path` - confirm and align the annotation.
158

159
    @cached_property
    def qualname(self) -> str:
        """Qualified name: `libname` and `modname` joined by a dot. Cached after first access."""
        libname = self.libname
        modname = self.modname
        return f"{libname}.{modname}"
163

164
    @cached_property
    def basequalnames(self) -> tuple[str, ...]:
        """Qualified names (`libname.modname`) of the base modules, reduced by `uniquetuple`. Cached."""
        qualnames = (f"{bci.libname}.{bci.modname}" for bci in get_modbaseinfos(self))
        return uniquetuple(qualnames)
168

169
    @classmethod
    def get_modref(cls, minimal: bool = False) -> ModRef:
        """
        Python Class Reference.

        Keyword Args:
            minimal: Leave `modclsname` empty (`None`) if the class name equals the one
                `get_modclsname` returns for the module name.
        """
        bci = next(get_baseclassinfos(cls))
        clsname = bci.clsname
        if minimal and clsname == get_modclsname(bci.modname):
            # class name is the default one - no need to spell it out
            modclsname = None
        else:
            modclsname = clsname
        return ModRef(libname=bci.libname, modname=bci.modname, modclsname=modclsname)
180

181
    @classmethod
    def get_basemodrefs(cls) -> tuple[ModRef, ...]:
        """Python Class References of the base modules, one per item of `get_modbaseinfos`."""

        def to_modref(bci) -> ModRef:
            return ModRef(libname=bci.libname, modname=bci.modname, modclsname=bci.clsname)

        return tuple(map(to_modref, get_modbaseinfos(cls)))
192

193
    @property
    def hiername(self) -> str:
        """
        Hierarchical Name.

        Base names of this module and all its parents, joined by `join_names`, outermost first.
        Modules with `has_hiername` disabled are left out.
        """
        names: list[str] = []
        mod: BaseMod | None = self
        while mod is not None:
            if mod.has_hiername:
                names.append(split_prefix(mod.name)[1])
            mod = mod.parent
        # collected from the inside out - the name starts at the outermost module
        names.reverse()
        return join_names(*names)
203

204
    @property
    @abstractmethod
    def is_tb(self) -> bool:
        """
        Determine if module belongs to Testbench or Design.

        Declared abstract: has to be implemented by derived module classes.
        """
208

209
    @property
    def path(self) -> tuple["BaseMod", ...]:
        """Path: all modules from the outermost parent down to this module."""
        chain = [self]
        parent = self.parent
        while parent:
            chain.append(parent)
            parent = parent.parent
        # collected from the inside out
        return tuple(reversed(chain))
218

219
    @property
    def inst(self) -> str:
        """
        Path String.

        The `modname` of the outermost module followed by the instance names down to
        this module, separated by '/'.
        """
        names: list[str] = []
        mod = self
        while mod.parent:
            names.append(mod.name)
            mod = mod.parent
        # `mod` is the outermost module now, which contributes its module name
        names.append(mod.modname)
        return "/".join(reversed(names))
229

230
    @computed_field
    @cached_property
    def assigns(self) -> Assigns:
        """Assignments: ports and signals are targets, the namespace provides the sources. Cached."""
        targets, sources = self.portssignals, self.namespace
        return Assigns(targets=targets, sources=sources, drivers=self.drivers)
235

236
    @computed_field
    @cached_property
    def parser(self) -> ExprParser:
        """Expression Parser working on the module namespace. Cached."""
        namespace = self.namespace
        context = str(self)
        return ExprParser(namespace=namespace, context=context)
241

242
    @computed_field
    @cached_property
    def _router(self) -> "Router":
        """Router of this module. Created on first access and cached."""
        router = Router(mod=self)
        return router
247

248
    @property
    def parents(self) -> tuple["BaseMod", ...]:
        """Parents registered via `set_parent`, in order of registration."""
        return (*self.__parents,)
252

253
    @classmethod
    def build_top(cls, **kwargs) -> "BaseMod":
        """
        Build Top Instance.

        Instantiate the module without parent; all keyword arguments are forwarded
        to the constructor.
        """
        top = cls(**kwargs)
        return top
261

262
    def add_param(
        self,
        arg: BaseType | Param,
        name: str | None = None,
        title: str | None = None,
        descr: str | None = None,
        comment: str | None = None,
        ifdef: str | None = None,
        exist_ok: bool = False,
    ) -> Param:
        """
        Add Module Parameter (:any:`Param`).

        An instance specific value is taken (and removed) from `paramdict`, if there is one
        for the parameter name.

        Args:
            arg: Type or Parameter
            name: Name. Mandatory if arg is a Type. Must be omitted if arg is a Parameter.

        Keyword Args:
            title: Full Spoken Name.
            descr: Documentation Description.
            comment: Source Code Comment.
            ifdef: IFDEF pragma
            exist_ok: Do not complain about already existing item

        Returns:
            The parameter added to the module.

        Raises:
            LockError: Module is already locked.
        """
        if isinstance(arg, Param):
            assert name is None
            # a ready-made parameter brings its own name
            paramname = arg.name
        else:
            paramname = name
        # Check the lock before touching `paramdict`: a rejected call must not consume the value.
        if self.__is_locked:
            raise LockError(f"{self}: Cannot add parameter {paramname!r}.")
        value = self.paramdict.pop(paramname, None)
        if isinstance(arg, Param):
            param: Param = arg.new(value=value)
        else:
            doc = doc_from_type(arg, title=title, descr=descr, comment=comment)
            param = Param(type_=arg, name=name, doc=doc, ifdef=ifdef, value=value)
        self.namespace.add(param, exist_ok=exist_ok)
        self.params.add(param, exist_ok=exist_ok)
        return param
300

301
    def add_const(
        self,
        arg: BaseType | Const,
        name: str | None = None,
        title: str | None = None,
        descr: str | None = None,
        comment: str | None = None,
        ifdef: str | None = None,
        exist_ok: bool = False,
    ) -> Const:
        """
        Add Module Internal Constant (:any:`Const`).

        Args:
            arg: Type or Constant
            name: Name. Mandatory if arg is a Type. Must be omitted if arg is a Constant.

        Keyword Args:
            title: Full Spoken Name.
            descr: Documentation Description.
            comment: Source Code Comment.
            ifdef: IFDEF pragma
            exist_ok: Do not complain about already existing item

        Returns:
            The constant added to the module.

        Raises:
            LockError: Module is already locked.
        """
        if isinstance(arg, Const):
            assert name is None
            const: Const = arg
            # a ready-made constant brings its own name; `name` is None here
            constname = arg.name
        else:
            doc = doc_from_type(arg, title=title, descr=descr, comment=comment)
            const = Const(type_=arg, name=name, doc=doc, ifdef=ifdef)
            constname = name
        if self.__is_locked:
            raise LockError(f"{self}: Cannot add constant {constname!r}.")
        self.namespace.add(const, exist_ok=exist_ok)
        return const
336

337
    def add_type_consts(self, type_: BaseType, exist_ok: bool = False, only=None, name=None, item_suffix="e"):
        """
        Add description of `type_` as local parameters.

        `type_` is wrapped into a `DescriptiveStructType`, which is added as constant.

        Args:
            type_: Type to be described.

        Keyword Args:
            exist_ok (bool): Do not complain, if parameter already exists.
            name (str): Name of the local parameter. Based on the class name of `type_` by default.
            only (str): Limit parameters to these listed in here, separated by ';'
            item_suffix (str): Enumeration Suffix.
        """
        if not name:
            name = snakecase(type_.__class__.__name__.removesuffix("Type"))
        if only:
            matches = namefilter(only)

            def filter_(item: StructItem):
                return matches(item.name)

            descriptive = DescriptiveStructType(type_, filter_=filter_, enumitem_suffix=item_suffix)
        else:
            descriptive = DescriptiveStructType(type_, enumitem_suffix=item_suffix)
        self.add_const(
            descriptive,
            name,
            exist_ok=exist_ok,
            title=descriptive.title,
            descr=descriptive.descr,
            comment=descriptive.comment,
        )
361

362
    def add_port(
        self,
        type_: BaseType,
        name: str,
        direction: Direction | None = None,
        title: str | None = None,
        descr: str | None = None,
        comment: str | None = None,
        ifdef: str | None = None,
        route: Routeables | None = None,
        clkrel: str | Port | BaseClkRel | None = None,
    ) -> Port:
        """
        Add Module Port (:any:`Port`).

        Args:
            type_: Type.
            name: Name.

        Keyword Args:
            direction: Signal Direction. Derived from `name` via `Direction.from_name` if omitted,
                       `IN` if the name gives no hint.
            title: Full Spoken Name.
            descr: Documentation Description.
            comment: Source Code Comment.
            ifdef: IFDEF mapping
            route: Routes (iterable or string separated by ';')
            clkrel: Clock relation. Clock relation instance, clock signal or name of a clock port.

        Returns:
            The port added to the module.

        Raises:
            LockError: Module is already locked.
        """
        doc = doc_from_type(type_, title, descr, comment)
        if direction is None:
            direction = Direction.from_name(name) or IN
        resolved = self._resolve_clkrel(clkrel) if clkrel is not None else None
        port = Port(type_, name, direction=direction, doc=doc, ifdef=ifdef, clkrel=resolved)
        if self.__is_locked:
            raise LockError(f"{self}: Cannot add port {name!r}.")
        # a port is known as identifier, as assignable target and as port
        for idents in (self.namespace, self.portssignals, self.ports):
            idents[name] = port
        for routepath in parse_routepaths(route):
            self._router.add(RoutePath(expr=port), routepath)
        return port
404

405
    def _resolve_clkrel(self, clkrel: str | Port | BaseClkRel) -> BaseClkRel:
        """
        Turn `clkrel` into a clock relation.

        Clock relations are returned as they are. A signal becomes the clock of a new `ClkRel`,
        a string is looked up in `ports` first.

        Raises:
            ValueError: `clkrel` is of any other type.
        """
        if isinstance(clkrel, BaseClkRel):
            return clkrel
        if isinstance(clkrel, BaseSignal):
            clk = clkrel
        elif isinstance(clkrel, str):
            clk = self.ports[clkrel]
        else:
            raise ValueError(f"Invalid {clkrel=}")
        return ClkRel(clk=clk)
414

415
    def add_signal(
        self,
        type_: BaseType,
        name: str,
        direction: Orientation = FWD,
        title: str | None = None,
        descr: str | None = None,
        comment: str | None = None,
        ifdef: str | None = None,
        route: Routeables | None = None,
        clkrel: str | Port | BaseClkRel | None = None,
    ) -> Signal:
        """
        Add Module Internal Signal (:any:`Signal`).

        Args:
            type_: Type.
            name: Name.

        Keyword Args:
            direction: Signal Orientation. `FWD` by default.
            title: Full Spoken Name.
            descr: Documentation Description.
            comment: Source Code Comment.
            ifdef: IFDEF mapping
            route: Routes (iterable or string separated by ';')
            clkrel: Clock relation. Clock relation instance, clock signal or name of a clock port.

        Returns:
            The signal added to the module.

        Raises:
            LockError: Module is already locked.
        """
        doc = doc_from_type(type_, title, descr, comment)
        resolved = self._resolve_clkrel(clkrel) if clkrel is not None else None
        signal = Signal(type_, name, direction=direction, doc=doc, ifdef=ifdef, clkrel=resolved)
        if self.__is_locked:
            raise LockError(f"{self}: Cannot add signal {name!r}.")
        # a signal is known as identifier and as assignable target, but not as port
        for idents in (self.namespace, self.portssignals):
            idents[name] = signal
        for routepath in parse_routepaths(route):
            self._router.add(RoutePath(expr=signal), routepath)
        return signal
454

455
    def add_port_or_signal(
        self,
        type_: BaseType,
        name: str,
        direction: Direction | None = None,
        title: str | None = None,
        descr: str | None = None,
        comment: str | None = None,
        ifdef: str | None = None,
        route: Routeables | None = None,
        clkrel: str | Port | BaseClkRel | None = None,
    ) -> BaseSignal:
        """
        Add Module Port (:any:`Port`) or Signal (:any:`Signal`) depending on name.

        A port is added if `direction` is given or `Direction.from_name` derives one
        from `name`. Otherwise a signal is added.

        Args:
            type_: Type.
            name: Name.

        Keyword Args:
            direction: Signal Direction. Derived from `name` if omitted.
            title: Full Spoken Name.
            descr: Documentation Description.
            comment: Source Code Comment.
            ifdef: IFDEF mapping
            route: Routes (iterable or string separated by ';')
            clkrel: Clock relation.
        """
        # options handed over unchanged to either method
        common = {
            "title": title,
            "descr": descr,
            "comment": comment,
            "ifdef": ifdef,
            "route": route,
            "clkrel": clkrel,
        }
        resolved = Direction.from_name(name) if direction is None else direction
        if resolved is None:
            return self.add_signal(type_, name, **common)
        return self.add_port(type_, name, direction=resolved, **common)
507

508
    def set_parent(self, parent: "BaseMod") -> None:
        """
        Set Parent.

        Appends `parent` to the parents exposed by `parents`.
        Do not use this method, unless you really know what you are doing.
        """
        registered = self.__parents
        registered.append(parent)
515

516
    def assign(
        self,
        target: Parseable,
        source: Parseable | Note,
        cast: bool = False,
        overwrite: bool = False,
    ):
        """
        Assign `source` to `target`.

        The assignment is done **without** routing.

        Args:
            target: Target to be driven. Must be known within this module.
            source: Source driving target. Must be known within this module.

        Keyword Args:
            cast (bool): Cast. `False` by default.
            overwrite (bool): Overwrite existing assignment.

        Raises:
            LockError: Module is already locked.
        """
        if self.__is_locked:
            raise LockError(f"{self}: Cannot add assign '{source}' to '{target}'.")
        assigntarget: BaseSignal = self.parser.parse(target, only=BaseSignal)  # type: ignore[assignment]
        assignsource: Source = self.parser.parse_note(source, only=Source)  # type: ignore[arg-type]
        self.assigns.set(assigntarget, assignsource, cast=cast, overwrite=overwrite)
543

544
    def add_inst(self, inst: "BaseMod") -> None:
        """
        Add Submodule `inst`.

        Registers this module as parent of `inst` and prepares the instance connections:
        assignments targeting the ports of `inst` plus a parser for its port names.

        Args:
            inst: Instance.

        Raises:
            LockError: Module is already locked.
        """
        if self.__is_locked:
            raise LockError(f"{self}: Cannot add instance '{inst}'.")
        inst.set_parent(self)
        self.insts.add(inst)  # type: ignore[arg-type]
        # targets are the instance ports, sources come from the namespace of this module
        self.__instcons[inst.name] = (
            Assigns(targets=inst.ports, sources=self.namespace, drivers=Drivers(), inst=True),
            ExprParser(namespace=inst.ports, context=str(inst)),
        )
558

559
    def get_inst(self, inst_or_name: Union["BaseMod", str]) -> "BaseMod":
        """
        Get Module Instance.

        Args:
            inst_or_name: Sub-module instance or a path of instance names separated by '/'.
                          The `UPWARDS` path element steps to the parent.

        Raises:
            ValueError: The instance is not known or a path element cannot be resolved.
        """
        if isinstance(inst_or_name, str):
            inst = self
            for part in inst_or_name.split("/"):
                if part != UPWARDS:
                    try:
                        inst = inst.insts.get_dym(part)  # type: ignore[assignment]
                    except ValueError as exc:
                        raise ValueError(f"{self} has no sub-module {exc}") from None
                    continue
                if inst.parent is None:
                    raise ValueError(f"{self}: {inst} has no parent.")
                inst = inst.parent
            return inst
        # module given: look it up by its instance name
        try:
            return self.insts[inst_or_name.name]
        except KeyError:
            raise ValueError(f"{inst_or_name} is not a sub-module of {self}") from None
580

581
    def set_instcon(
        self,
        inst: Union["BaseMod", str],
        port: Parseable,
        expr: Parseable,
        cast: bool = False,
        overwrite: bool = False,
    ):
        """
        Connect `port` of `inst` to `expr` without routing.

        Args:
            inst: Module Instance
            port: Port to be connected. Must be known within module instance.
            expr: Expression. Must be known within this module.

        Keyword Args:
            cast: Cast. `False` by default.
            overwrite: Overwrite existing assignment.

        Raises:
            LockError: Module is already locked.
        """
        if self.__is_locked:
            raise LockError(f"{self}: Cannot connect '{port}' of'{inst}' to '{expr}'.")
        mod: BaseMod = self.get_inst(inst)
        instassigns, instparser = self.__instcons[mod.name]
        # the port is resolved within the instance, the expression within this module
        assigntarget: BaseSignal = instparser.parse(port, only=BaseSignal)  # type: ignore[assignment]
        assignsource: Source = self.parser.parse_note(expr, only=Source)  # type: ignore[arg-type]
        instassigns.set(assigntarget, assignsource, cast=cast, overwrite=overwrite)
610

611
    def get_instcons(self, inst: Union["BaseMod", str]) -> Assigns:
        """Retrieve All Instance Connections Of `inst` (instance or instance name)."""
        mod: BaseMod = self.get_inst(inst)
        instassigns, _ = self.__instcons[mod.name]
        return instassigns
615

616
    def add_flipflop(
        self,
        type_: BaseType,
        name: str,
        clk: Parseable,
        rst_an: Parseable,
        nxt: Parseable | None = None,
        rst: Parseable | None = None,
        ena: Parseable | None = None,
        route: Routeables | None = None,
    ) -> Signal:
        """
        Add Module Internal Flip-Flop.

        Flip-flops with identical clock, reset, synchronous reset and enable share
        one `FlipFlop` instance.

        Args:
            type_: Type.
            name: Name of the flip-flop output signal, which is added to the module.
            clk: Clock. Has to resolve to a signal of this module.
            rst_an: Reset. Has to resolve to a signal of this module.

        Keyword Args:
            nxt: Next Value. A signal named after the basename of the output
                 with suffix `_nxt_s` is added by default.
            rst: Synchronous Reset.
            ena: Enable Condition.
            route: Routing of flip-flop output.

        Returns:
            The flip-flop output signal.

        Raises:
            LockError: Module is already locked.
        """
        parser = self.parser
        if self.__is_locked:
            raise LockError(f"{self}: Cannot add flipflop {name!r}.")
        out = self.add_signal(type_, name)
        clk_sig: BaseSignal = parser.parse(clk, only=BaseSignal)  # type: ignore[assignment]
        rst_an_sig: BaseSignal = parser.parse(rst_an, only=BaseSignal)  # type: ignore[assignment]
        # TODO: check connectable of nxt and out?
        nxt_expr = parser.parse(nxt) if nxt is not None else self.add_signal(type_, f"{out.basename}_nxt_s")
        rst_expr: BoolOp | None = None
        if rst is not None:
            rst_expr = cast_booltype(parser.parse(rst))
        ena_expr: BoolOp | None = None
        if ena is not None:
            ena_expr = cast_booltype(parser.parse(ena))
        flipflop = self._create_flipflop(clk_sig, rst_an_sig, rst_expr, ena_expr)
        flipflop.set(out, nxt_expr)
        for routepath in parse_routepaths(route):
            self._router.add(RoutePath(expr=out), routepath)
        return out
667

668
    def _create_flipflop(
        self,
        clk: BaseSignal,
        rst_an: BaseSignal,
        rst: BoolOp | None = None,
        ena: BoolOp | None = None,
    ) -> FlipFlop:
        """Return the `FlipFlop` for this clock/reset/enable combination, creating it on first use."""
        known = self.__flipflops
        # NOTE(review): entries are keyed by the hash of the combination, not by the combination
        # itself - presumably because the items do not compare like plain values; confirm.
        key = hash((clk, rst_an, rst, ena))
        existing = known.get(key)
        if existing is not None:
            return existing
        created = FlipFlop(
            clk=clk,
            rst_an=rst_an,
            rst=rst,
            ena=ena,
            targets=self.portssignals,
            sources=self.namespace,
            drivers=self.drivers,
        )
        known[key] = created
        return created
691

692
    @property
    def flipflops(self) -> tuple[FlipFlop, ...]:
        """
        Flip Flops.

        All flip-flop groups created so far, in order of creation.
        """
        return (*self.__flipflops.values(),)
698

699
    def add_mux(
        self,
        name,
        title: str | None = None,
        descr: str | None = None,
        comment: str | None = None,
    ) -> Mux:
        """
        Add Multiplexer with `name` And Return It For Filling.

        Args:
            name (str): Name.

        Keyword Args:
            title (str): Full Spoken Name.
            descr (str): Documentation Description.
            comment (str): Source Code Comment.

        Raises:
            LockError: Module is already locked.

        See :any:`Mux.set()` how to fill the multiplexer.
        """
        if self.__is_locked:
            raise LockError(f"{self}: Cannot add mux {name!r}.")
        doc = Doc(title=title, descr=descr, comment=comment)
        # `drivers=self.drivers` is currently not handed over (disabled in the code base)
        mux = Mux(
            name=name,
            targets=self.portssignals,
            namespace=self.namespace,
            parser=self.parser,
            doc=doc,
        )
        self.__muxes[name] = mux
        return mux
731

732
    @property
    def muxes(self) -> tuple[Mux, ...]:
        """
        All Multiplexer added via `add_mux`.
        """
        return (*self.__muxes.values(),)
738

739
    def get_mux(self, mux: Mux | str) -> Mux:
        """Get Multiplexer by name. A given multiplexer is looked up by its name."""
        muxname = mux if isinstance(mux, str) else mux.name
        return self.__muxes.get_dym(muxname)  # type: ignore[return-value]
744

745
    @property
    def is_locked(self) -> bool:
        """
        Return If Module Is Already Completed And Locked For Modification.

        Locking is done by the build process **automatically** and **MUST NOT** be done earlier or later.
        Use a different module type or enumeration or struct type, if you have issues with locking.
        """
        locked = self.__is_locked
        return locked
754

755
    def lock(self):
        """
        Lock.

        Locks every `Namespace` found when iterating over the module and marks the module as locked.

        Locking is done via this method by the build process **automatically** and **MUST NOT** be done earlier or
        later. Use a different module type or enumeration or struct type, if you have issues with locking.

        Raises:
            LockError: Module is already locked.
        """
        if self.__is_locked:
            raise LockError(f"{self} is already locked. Cannot lock again.")
        # iterating the module yields pairs; only the second item is of interest here
        for _, obj in self:
            if not isinstance(obj, Namespace):
                continue
            obj.lock()
        self.__is_locked = True
768

769
    def check_lock(self):
        """
        Check if module is locked for modifications.

        Raises:
            LockError: Module is already locked.
        """
        if not self.__is_locked:
            return
        raise LockError(f"{self}: Is already locked for modifications.")
773

774
    def con(self, port: Routeables, source: Routeables, on_error: RoutingError = "error"):
        """
        Connect `port` to `source`.

        The routes are handed to the router of the most recently set parent; `port` is
        resolved relative to the name of this instance.

        Raises:
            TypeError: Module has no parent.
        """
        if not self.__parents:
            raise TypeError(f"{self} is top module. Connections cannot be made.")
        router = self.__parents[-1]._router
        subtargets = parse_routepaths(port, basepath=self.name)
        for subtarget in subtargets:
            for subsource in parse_routepaths(source):
                router.add(subtarget, subsource, on_error=on_error)
783

784
    def route(self, target: Routeables, source: Routeables, on_error: RoutingError = "error"):
        """Route `source` to `target` within the actual module. Every source is routed to every target."""
        subtargets = parse_routepaths(target)
        for subtarget in subtargets:
            for subsource in parse_routepaths(source):
                self._router.add(subtarget, subsource, on_error=on_error)
790

791
    def __str__(self):
        """Return module reference with instance path, library name and module name."""
        modref = self.get_modref()
        inst, libname, modname = self.inst, self.libname, self.modname
        return f"<{modref}(inst={inst!r}, libname={libname!r}, modname={modname!r})>"
794

795
    def __repr__(self):
        """Return module reference with instance path, library name and module name."""
        modref = self.get_modref()
        inst, libname, modname = self.inst, self.libname, self.modname
        return f"<{modref}(inst={inst!r}, libname={libname!r}, modname={modname!r})>"
798

799
    def get_overview(self) -> str:
        """
        Return Brief Module Overview.

        This Module Overview is intended to be overwritten by the user.
        The base implementation returns an empty string.
        """
        return ""
806

807
    def get_info(self, sub: bool = False) -> str:
        """
        Module Information.

        Markdown text with a header line and the tables of parameters and ports.

        Keyword Args:
            sub: Append the table of sub-modules.
        """
        parts = [
            f"## `{self.libname}.{self.modname}` (`{self.get_modref()}`)",
            self._get_ident_info("Parameters", self.params),
            self._get_ident_info("Ports", self.ports),
        ]
        if sub:
            parts += [self._get_sub_info()]
        return "\n\n".join(parts)
818

819
    def _get_ident_info(self, title: str, idents: Idents):
        """Return a section headed by `title` with a name/type table of `idents`, or '-' if there are none."""
        lines = [f"### {title}", ""]
        if not idents:
            lines.append("-")
            return "\n".join(lines)
        rows = [("Name ", "Type"), ("----", "----")]
        for level, ident in idents.leveliter():
            # nested identifiers are indented by two spaces per level
            indent = "  " * level
            dinfo = f" ({ident.direction})" if ident.direction else ""
            rows.append((f"{indent}{ident.name}{dinfo}", f"{indent}{ident.type_}"))
        lines.append(align(rows, seps=(" | ", " |"), sepfirst="| "))
        return "\n".join(lines)
839

840
    def _get_sub_info(self) -> str:
        """Return the 'Submodules' section with a name/module table of all instances, or '-' if there are none."""
        lines = ["### Submodules", ""]
        if not self.insts:
            lines.append("-")
            return "\n".join(lines)
        rows = [("Name", "Module"), ("----", "------")]
        for inst in self.insts:
            rows.append((f"`{inst.name}`", f"`{inst.libname}.{inst.modname}`"))
        lines.append(align(rows, seps=(" | ", " |"), sepfirst="| "))
        return "\n".join(lines)
852

853

854
class Router(Object):
    """
    Signal Router of a Module.

    Routes are queued with `add` and turned into assignments and instance
    connections of `mod` by `flush`. Paths flagged with `create` cause the
    missing port or signal to be created, derived from the opposite side.
    """

    # Module the routes belong to.
    mod: BaseMod
    # Pending routes as `(tpath, spath, on_error)`; filled by `add`, drained by `flush`.
    __routes: list[tuple[RoutePath, RoutePath, RoutingError]] = PrivateField(default_factory=list)

    def add(self, tpath: RoutePath, spath: RoutePath, on_error: RoutingError = "error") -> None:
        """
        Queue a Route from `spath` to `tpath`.

        Creation requested by one of the paths is tried immediately (see `_create`).
        The route itself is established later by `flush`.

        Args:
            tpath: Target path.
            spath: Source path.

        Keyword Args:
            on_error: Error policy applied by `flush` if routing fails.
        """
        LOGGER.debug("%s: router: add '%s' to '%s'", self.mod, spath, tpath)
        route = self._create(tpath, spath, on_error)
        self.__routes.append(route)

    def flush(self) -> None:
        """
        Create Pending Routes.

        Every queued route gets another creation attempt and is then routed.
        A failing route is handled according to its error policy:
        `"ignore"` logs the error at info level, `"warn"` logs it as a warning and
        anything else re-raises it. The queue is cleared after all routes were
        processed; a re-raised error leaves the queue untouched.
        """
        for pending in self.__routes:
            tpath, spath, on_error = self._create(*pending)
            try:
                self._route(tpath, spath)
            except Exception as exc:
                if on_error not in ("ignore", "warn"):
                    raise
                if on_error == "ignore":
                    LOGGER.info("Ignored: %s", exc)
                else:
                    LOGGER.warning(exc)
        self.__routes.clear()

    def _create(
        self, tpath: RoutePath, spath: RoutePath, on_error: RoutingError
    ) -> tuple[RoutePath, RoutePath, RoutingError]:
        """
        Try to Create the Path Flagged with `create`.

        `tpath` takes precedence: `spath` is only looked at if `tpath` does not
        request creation. After a successful creation the returned path has its
        `create` flag reset; otherwise the paths are returned unchanged, so
        the creation can be tried again later.
        """
        if tpath.create:
            if self.__create(spath, tpath):
                return tpath.new(create=False), spath, on_error
            return tpath, spath, on_error
        if spath.create and self.__create(tpath, spath):
            return tpath, spath.new(create=False), on_error
        return tpath, spath, on_error

    @no_type_check  # TODO: fix types
    def __create(self, rpath: RoutePath, cpath: RoutePath) -> bool:
        """
        Create `cpath` based on `rpath`.

        Returns:
            `True` if the port or signal has been created, `False` if one of the
            referenced instances or the reference identifier could not be resolved.
        """
        assert not rpath.create
        assert cpath.create
        mod = self.mod
        try:
            # Reference side: module and identifier the new port/signal is derived from
            if rpath.path:
                rmod = mod.get_inst(rpath.path)
            else:
                rmod = mod
            rident = rmod.parser.parse(rpath.expr, only=Ident)
            # Creation side: module which receives the new port/signal
            if cpath.path:
                cmod = mod.get_inst(cpath.path)
            else:
                cmod = mod
        except (ValueError, NameError, KeyError):
            return False
        else:
            self.__create_port_or_signal(cmod, rident, cpath.expr)
            return True

    @no_type_check  # TODO: fix types
    def _route(self, tpath: RoutePath, spath: RoutePath):  # noqa: C901, PLR0912, PLR0915
        """
        Establish the Route from `spath` to `tpath`.

        Depending on whether the paths point into submodules, the route becomes
        an instance connection, an assignment within `mod` or both sides get
        connected through a newly added signal.
        """
        mod = self.mod
        LOGGER.debug("%s router: routing %r to %r", mod, spath, tpath)

        # Modules the paths refer to; an empty path refers to `mod` itself
        tmod = mod if not tpath.path else mod.get_inst(tpath.path)
        smod = mod if not spath.path else mod.get_inst(spath.path)

        # Expressions on both sides; `Note` instances are taken as they are
        texpr = tpath.expr if isinstance(tpath.expr, Note) else tmod.parser.parse(tpath.expr)
        sexpr = spath.expr if isinstance(spath.expr, Note) else smod.parser.parse(spath.expr)
        tident = texpr if isinstance(texpr, Ident) else None
        sident = sexpr if isinstance(sexpr, Ident) else None
        tparts = tpath.parts
        sparts = spath.parts

        # At least one side has to be an identifier, it serves as reference
        rident = tident or sident
        assert rident is not None
        if tident and tident.direction is not None and sident and sident.direction is not None:
            direction = tident.direction * sident.direction
        else:
            direction = rident.direction

        cast = _merge_cast(tpath.cast, spath.cast)
        assert len(tparts) in (0, 1)
        assert len(sparts) in (0, 1)

        if tparts and sparts:
            # Source and target are submodules
            assert tparts[0] != UPWARDS
            assert sparts[0] != UPWARDS
            tcon = mod.get_instcons(tmod).get(tident) if tident is not None else None
            scon = mod.get_instcons(smod).get(sident) if sident is not None else None
            if tcon is None:
                if scon is None:
                    # Neither side is connected yet: connect both via a new signal
                    name = join_names(split_prefix(tmod.name)[1], rident.name, "s")
                    rsig = mod.add_signal(rident.type_, name, ifdef=rident.ifdef, direction=direction)
                    Router.__routesubmod(mod, tmod, rident, texpr, rsig, tname=tpath.expr, cast=tpath.cast)
                    Router.__routesubmod(mod, smod, rident, sexpr, rsig, tname=spath.expr, cast=spath.cast)
                else:
                    # Reuse the existing connection of the source side
                    Router.__routesubmod(mod, tmod, rident, texpr, scon, tname=tpath.expr, cast=tpath.cast)
            elif scon is None:
                # Reuse the existing connection of the target side
                Router.__routesubmod(mod, smod, rident, sexpr, tcon, tname=spath.expr, cast=spath.cast)
            else:
                # Both sides are connected already: tie the connections together
                mod.assign(tcon, scon, cast=cast)
            return

        if tparts:
            # Only the target is a submodule
            assert tparts[0] != UPWARDS
            tcon = mod.get_instcons(tmod).get(tident) if tident is not None else None
            if tcon is None:
                Router.__routesubmod(mod, tmod, rident, texpr, sexpr, tpath.expr, spath.expr, cast=cast)
                return
            if sexpr is None:
                sexpr = Router.__create_port_or_signal(mod, rident, spath.expr)
            mod.assign(tcon, sexpr, cast=cast)
            return

        if sparts:
            # Only the source is a submodule
            scon = mod.get_instcons(smod).get(sident) if sident is not None else None
            assert sparts[0] != UPWARDS
            if scon is None:
                Router.__routesubmod(mod, smod, rident, sexpr, texpr, spath.expr, tpath.expr, cast=cast)
                return
            if texpr is None:
                texpr = Router.__create_port_or_signal(mod, rident, tpath.expr)
            mod.assign(scon, texpr, cast=cast)
            return

        # Both sides are located within `mod`
        if texpr is None:
            texpr = Router.__create_port_or_signal(mod, rident, tpath.expr)
        if sexpr is None:
            sexpr = Router.__create_port_or_signal(mod, rident, spath.expr)
        mod.assign(texpr, sexpr, cast=cast)

    @staticmethod
    def __routesubmod(
        mod: BaseMod, submod: BaseMod, rident: Ident, texpr, sexpr, tname=None, sname=None, cast=False
    ) -> Expr:
        """
        Connect Port `texpr` of Instance `submod` to `sexpr` within `mod`.

        A missing `texpr` is added as port `tname` to `submod`, a missing `sexpr`
        is created as port or signal `sname` in `mod`. Both take type and `ifdef`
        from `rident`.

        Returns:
            The expression the port has been connected to.

        Raises:
            ValueError: `texpr` is not a port.
            TypeError: The connection was rejected; the message is prefixed with `mod`.
        """
        if texpr is None:
            assert tname is not None
            assert rident is not None and rident.type_
            texpr = submod.add_port(rident.type_, tname, ifdef=rident.ifdef)
        if sexpr is None:
            sexpr = Router.__create_port_or_signal(mod, rident, sname)
        if not isinstance(texpr, Port):
            raise ValueError(f"Cannot route {type(texpr)} to module instance {submod}")
        try:
            mod.set_instcon(submod, texpr, sexpr, cast=cast)
        except TypeError as err:
            raise TypeError(f"{mod}: {err}") from None
        else:
            return sexpr

    @staticmethod
    def __create_port_or_signal(mod: BaseMod, rident: Ident, name: str) -> BaseSignal:
        """
        Add Port or Signal `name` to `mod`, Typed like `rident`.

        A port is added if `Direction.from_name(name)` yields a direction,
        a signal otherwise. The `ifdef` of `rident` is forwarded in both cases.
        """
        assert isinstance(rident, Ident)
        assert name is not None
        assert rident is not None and rident.type_ is not None, (mod, name)
        type_ = rident.type_
        direction = Direction.from_name(name)
        if direction is None:
            signal = mod.add_signal(type_, name, ifdef=rident.ifdef)
        else:
            signal = mod.add_port(type_, name, ifdef=rident.ifdef, direction=direction)
        LOGGER.debug("%s: router: creating %r", mod, signal)
        return signal
12✔
1010

1011

1012
def _merge_cast(one, other):
12✔
1013
    # TODO: get rid of this.
1014
    if one or other:
12✔
1015
        return True
×
1016
    if one is None or other is None:
12✔
1017
        return None
×
1018
    return False
12✔
1019

1020

1021
# Type of a module class: `BaseMod` itself or any class derived from it.
ModCls: TypeAlias = type[BaseMod]
12✔
1022
# Set of module classes (`BaseMod` itself or classes derived from it).
ModClss: TypeAlias = set[type[BaseMod]]
12✔
1023

1024

1025
def get_modbaseclss(cls):
    """
    Get Module Base Classes.

    Returns:
        List of the classes in the method resolution order of `cls`, starting
        with `cls` itself and ending right before `BaseMod`. The complete
        method resolution order is returned if `BaseMod` is not part of it.
    """
    mro = getmro(cls)
    for idx, basecls in enumerate(mro):
        if basecls is BaseMod:
            # Everything from `BaseMod` on is not module specific anymore
            return list(mro[:idx])
    return list(mro)
12✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc