• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nbiotcloud / ucdp / 20206835012

14 Dec 2025 10:48AM UTC coverage: 90.366% (+0.08%) from 90.288%
20206835012

Pull #150

github

iccode17
add title, descr, comment aliases
Pull Request #150: add title, descr, comment aliases

20 of 24 new or added lines in 2 files covered. (83.33%)

55 existing lines in 2 files now uncovered.

4887 of 5408 relevant lines covered (90.37%)

13.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.74
/src/ucdp/modbase.py
1
#
2
# MIT License
3
#
4
# Copyright (c) 2024-2025 nbiotcloud
5
#
6
# Permission is hereby granted, free of charge, to any person obtaining a copy
7
# of this software and associated documentation files (the "Software"), to deal
8
# in the Software without restriction, including without limitation the rights
9
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10
# copies of the Software, and to permit persons to whom the Software is
11
# furnished to do so, subject to the following conditions:
12
#
13
# The above copyright notice and this permission notice shall be included in all
14
# copies or substantial portions of the Software.
15
#
16
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22
# SOFTWARE.
23
#
24
"""
25
Base Hardware Module.
26

27
[BaseMod][ucdp.modbase.BaseMod] defines the base interface which is **common to all hardware modules**.
28
"""
29

30
import warnings
15✔
31
from abc import abstractmethod
15✔
32
from functools import cached_property
15✔
33
from inspect import getmro
15✔
34
from typing import Any, ClassVar, Literal, Optional, TypeAlias, Union, no_type_check
15✔
35

36
from aligntext import align
15✔
37
from caseconverter import snakecase
15✔
38
from uniquer import uniquetuple
15✔
39

40
from .assigns import Assigns, Drivers, Note, Source
15✔
41
from .baseclassinfo import get_baseclassinfos
15✔
42
from .clkrel import ClkRel
15✔
43
from .clkrelbase import BaseClkRel
15✔
44
from .const import Const
15✔
45
from .consts import UPWARDS
15✔
46
from .define import Defines, cast_defines
15✔
47
from .doc import Doc
15✔
48
from .docutil import doc_from_type
15✔
49
from .exceptions import LockError
15✔
50
from .expr import BoolOp, Expr
15✔
51
from .exprparser import ExprParser, Parseable, cast_booltype
15✔
52
from .flipflop import FlipFlop
15✔
53
from .ident import Ident, Idents
15✔
54
from .ifdef import Ifdefs, cast_ifdefs, resolve_ifdefs
15✔
55
from .iterutil import namefilter
15✔
56
from .logging import LOGGER
15✔
57
from .modref import ModRef, get_modclsname
15✔
58
from .modutil import get_modbaseinfos
15✔
59
from .mux import Mux
15✔
60
from .namespace import Namespace
15✔
61
from .nameutil import join_names, split_prefix
15✔
62
from .object import Field, NamedObject, Object, PrivateField, computed_field
15✔
63
from .orientation import FWD, IN, Direction, Orientation
15✔
64
from .param import Param
15✔
65
from .routepath import Routeables, RoutePath, parse_routepaths
15✔
66
from .signal import BaseSignal, Port, Signal
15✔
67
from .typebase import BaseType
15✔
68
from .typedescriptivestruct import DescriptiveStructType
15✔
69
from .typestruct import StructItem
15✔
70

71
ModTags: TypeAlias = set[str]
15✔
72
RoutingError: TypeAlias = Literal["error", "warn", "ignore"]
15✔
73

74

75
class BaseMod(NamedObject):
15✔
76
    """
77
    Hardware Module.
78

79
    Args:
80
        parent: Parent Module. `None` by default for top module.
81
        name: Instance name. Required if parent is provided.
82

83
    Keyword Args:
84
        title (str): Title
85
        descr (str): Description
86
        comment (str): Comment
87
        paramdict (dict): Parameter values for this instance.
88
    """
89

90
    filelists: ClassVar[Any] = ()
15✔
91
    tags: ClassVar[ModTags] = ModTags()
15✔
92

93
    parent: Optional["BaseMod"] = None
15✔
94
    paramdict: dict = Field(default_factory=dict, repr=False)
15✔
95

96
    title: str | None = None
15✔
97
    descr: str | None = None
15✔
98
    comment: str | None = None
15✔
99

100
    has_hiername: bool = True
15✔
101

102
    virtual: bool = False
15✔
103

104
    # private
105

106
    drivers: Drivers = Field(default_factory=Drivers, init=False, repr=False)
15✔
107
    defines: Defines | None  # initialized by __init__
12✔
108
    namespace: Idents = Field(repr=False)  # initialized by __init__
15✔
109
    params: Idents = Field(default_factory=Idents, init=False, repr=False)
15✔
110
    ports: Idents = Field(default_factory=Idents, init=False, repr=False)
15✔
111
    portssignals: Idents = Field(default_factory=Idents, init=False, repr=False)
15✔
112
    insts: Namespace = Field(default_factory=Namespace, init=False, repr=False)
15✔
113

114
    _has_build_dep: bool = PrivateField(default=False)
15✔
115
    _has_build_final: bool = PrivateField(default=False)
15✔
116

117
    __is_locked: bool = PrivateField(default=False)
15✔
118
    __instcons: dict[str, tuple[Assigns, ExprParser]] = PrivateField(default_factory=dict)
15✔
119
    __flipflops: dict[int, FlipFlop] = PrivateField(default_factory=dict)
15✔
120
    __muxes: Namespace = PrivateField(default_factory=Namespace)
15✔
121
    __parents = PrivateField(default_factory=list)
15✔
122

123
    def __init__(self, parent: Optional["BaseMod"] = None, name: str | None = None, defines=None, **kwargs):
        """
        Create Module.

        The class name must end with `Mod`. Without `name`, a module without parent is named
        after its class: the `Mod` suffix is removed and the rest converted to snake case.
        `defines` are casted by `cast_defines` and, if not empty, pre-populate the namespace.
        """
        modcls = self.__class__
        clsname = modcls.__name__
        if not clsname.endswith("Mod"):
            raise NameError(f"Name of {modcls} MUST end with 'Mod'")
        if not name and parent:
            raise ValueError("'name' is required for sub modules.")
        if not name:
            name = snakecase(clsname.removesuffix("Mod"))
        defines = cast_defines(defines)
        namespace = Idents()
        if defines:
            namespace.update(defines)
        super().__init__(parent=parent, name=name, namespace=namespace, defines=defines, **kwargs)  # type: ignore[call-arg]
15✔
136

137
    @property
    def doc(self) -> Doc:
        """Documentation container built from `title`, `descr` and `comment`."""
        title, descr, comment = self.title, self.descr, self.comment
        return Doc(title=title, descr=descr, comment=comment)
15✔
141

142
    @property
    def basename(self) -> str:
        """Base Name: second element of `split_prefix()` applied to the instance name."""
        parts = split_prefix(self.name)
        return parts[1]
15✔
146

147
    @property
    @abstractmethod
    def modname(self) -> str:
        """
        Module Name.

        Abstract: has to be provided by derived classes.
        """
151

152
    @property
    @abstractmethod
    def topmodname(self) -> str:
        """
        Top Module Name.

        Abstract: has to be provided by derived classes.
        """
156

157
    @property
    def libname(self) -> str:
        """Library Name, taken from the `name` attribute of `libpath`."""
        libpath = self.libpath
        return libpath.name
15✔
161

162
    @property
    @abstractmethod
    def libpath(self) -> str:
        """
        Library Path.

        Abstract: has to be provided by derived classes.

        NOTE(review): `libname` reads `.name` from this value, so implementations presumably
        return a path-like object rather than a plain `str` - confirm and adjust the annotation.
        """
166

167
    @cached_property
    def qualname(self) -> str:
        """Qualified Name: `libname` and `modname` joined by a dot."""
        libname = self.libname
        modname = self.modname
        return f"{libname}.{modname}"
15✔
171

172
    @cached_property
    def basequalnames(self) -> tuple[str, ...]:
        """Qualified Names (Library Name + Module Name) as reported by `get_modbaseinfos()`, made unique."""

        def iter_qualnames():
            for info in get_modbaseinfos(self):
                yield f"{info.libname}.{info.modname}"

        return uniquetuple(iter_qualnames())
15✔
176

177
    @classmethod
    def get_modref(cls, minimal: bool = False) -> ModRef:
        """
        Python Class Reference.

        Keyword Args:
            minimal: Leave `modclsname` empty if the class name equals the one
                     `get_modclsname()` derives from the module name.
        """
        info = next(get_baseclassinfos(cls))
        if minimal and info.clsname == get_modclsname(info.modname):
            modclsname = None
        else:
            modclsname = info.clsname
        return ModRef(libname=info.libname, modname=info.modname, modclsname=modclsname)
188

189
    @classmethod
    def get_basemodrefs(cls) -> tuple[ModRef, ...]:
        """Python Class References, one for every entry reported by `get_modbaseinfos()`."""
        modrefs = [
            ModRef(libname=info.libname, modname=info.modname, modclsname=info.clsname)
            for info in get_modbaseinfos(cls)
        ]
        return tuple(modrefs)
200

201
    @property
    def hiername(self) -> str:
        """
        Hierarchical Name.

        Joins the base names of this module and all its ancestors, top-most first.
        Modules with `has_hiername` unset are left out.
        """
        basenames: list[str] = []
        current: BaseMod | None = self
        # Collect bottom-up and flip once at the end.
        while current is not None:
            if current.has_hiername:
                basenames.append(split_prefix(current.name)[1])
            current = current.parent
        basenames.reverse()
        return join_names(*basenames)
15✔
211

212
    @property
    @abstractmethod
    def is_tb(self) -> bool:
        """
        Determine if module belongs to Testbench or Design.

        Abstract: has to be provided by derived classes.
        """
216

217
    @property
    def path(self) -> tuple["BaseMod", ...]:
        """Path: all modules from the top-most ancestor down to this module."""
        chain = [self]
        ancestor = self.parent
        # Collect bottom-up, then reverse to get the top-most module first.
        while ancestor:
            chain.append(ancestor)
            ancestor = ancestor.parent
        return tuple(reversed(chain))
15✔
226

227
    @property
    def inst(self) -> str:
        """
        Path String.

        Module name of the top-most module followed by the instance names
        down to this module, separated by '/'.
        """
        current = self
        names: list[str] = []
        while current.parent:
            names.append(current.name)
            current = current.parent
        # `current` is the top-most module now, it contributes its module name.
        names.append(current.modname)
        return "/".join(reversed(names))
15✔
237

238
    @computed_field
    @cached_property
    def assigns(self) -> Assigns:
        """Assignments."""
        # Created on first access and cached afterwards. The container works on this module's
        # `portssignals` (targets), `namespace` (sources) and `drivers`.
        return Assigns(targets=self.portssignals, sources=self.namespace, drivers=self.drivers)
×
243

244
    @computed_field
    @cached_property
    def parser(self) -> ExprParser:
        """Expression Parser."""
        # Created on first access and cached afterwards. Resolves against this module's
        # `namespace`; the string representation of the module serves as context.
        return ExprParser(namespace=self.namespace, context=str(self))
15✔
249

250
    @computed_field
    @cached_property
    def _router(self) -> "Router":
        """Router."""
        # Created on first access and cached afterwards.
        # NOTE(review): `Router` is not among the imports, presumably it is defined further
        # down in this module - confirm.
        return Router(mod=self)
15✔
255

256
    @property
    def parents(self) -> tuple["BaseMod", ...]:
        """Parents registered via `set_parent()`, in order of registration."""
        return (*self.__parents,)
15✔
260

261
    @classmethod
    def build_top(cls, **kwargs) -> "BaseMod":
        """
        Build Top Instance.

        Instantiate the module with `kwargs` and return it as top module.
        """
        top = cls(**kwargs)
        return top
15✔
269

270
    def add_param(
        self,
        arg: BaseType | Param,
        name: str | None = None,
        title: str | None = None,
        descr: str | None = None,
        comment: str | None = None,
        ifdef: str | None = None,
        ifdefs: Ifdefs | str | None = None,
        exist_ok: bool = False,
    ) -> Param | None:
        """
        Add Module Parameter (:any:`Param`).

        A value stored in `paramdict` under the parameter name is removed from there and
        used as parameter value.

        Args:
            arg: Type or Parameter
            name: Name. Mandatory if arg is a Type. Must be omitted if arg is a Parameter.

        Keyword Args:
            title: Full Spoken Name. Only used if arg is a Type.
            descr: Documentation Description. Only used if arg is a Type.
            comment: Source Code Comment. Only used if arg is a Type.
            ifdef: IFDEF pragma. Obsolete.
            ifdefs: IFDEFs pragmas.
            exist_ok: Do not complain about already existing item

        Returns:
            The added parameter. `None` if `resolve_ifdefs()` yields `None` for the given IFDEFs.

        Raises:
            LockError: Module is already locked.
        """
        if ifdef:
            # stacklevel=2 attributes the warning to the calling code instead of this method.
            warnings.warn(
                "add_param(..., ifdef=...) is obsolete, please use ifdefs=", category=DeprecationWarning, stacklevel=2
            )
        ifdefs = cast_ifdefs(ifdefs or ifdef)
        rifdefs = resolve_ifdefs(self.defines, ifdefs)
        if rifdefs is None:
            return None
        # Check the lock before touching `paramdict`, so a rejected call leaves the module unchanged.
        if self.__is_locked:
            lockedname = arg.name if isinstance(arg, Param) else name
            raise LockError(f"{self}: Cannot add parameter {lockedname!r}.")
        if isinstance(arg, Param):
            assert name is None
            value = self.paramdict.pop(arg.name, None)
            param: Param = arg.new(value=value)
        else:
            type_: BaseType = arg
            doc = doc_from_type(type_, title=title, descr=descr, comment=comment)
            value = self.paramdict.pop(name, None)
            param = Param(type_=type_, name=name, doc=doc, ifdefs=rifdefs, value=value)
        self.namespace.add(param, exist_ok=exist_ok)
        self.params.add(param, exist_ok=exist_ok)
        return param
15✔
318

319
    def add_const(
        self,
        arg: BaseType | Const,
        name: str | None = None,
        title: str | None = None,
        descr: str | None = None,
        comment: str | None = None,
        ifdef: str | None = None,
        ifdefs: Ifdefs | str | None = None,
        exist_ok: bool = False,
    ) -> Const | None:
        """
        Add Module Internal Constant (:any:`Const`).

        Args:
            arg: Type or Constant
            name: Name. Mandatory if arg is a Type. Must be omitted if arg is a Constant.

        Keyword Args:
            title: Full Spoken Name. Only used if arg is a Type.
            descr: Documentation Description. Only used if arg is a Type.
            comment: Source Code Comment. Only used if arg is a Type.
            ifdef: IFDEF pragma. Obsolete.
            ifdefs: IFDEFs pragmas.
            exist_ok: Do not complain about already existing item

        Returns:
            The added constant. `None` if `resolve_ifdefs()` yields `None` for the given IFDEFs.

        Raises:
            LockError: Module is already locked.
        """
        if ifdef:
            # stacklevel=2 attributes the warning to the calling code instead of this method.
            warnings.warn(
                "add_const(..., ifdef=...) is obsolete, please use ifdefs=", category=DeprecationWarning, stacklevel=2
            )
        ifdefs = cast_ifdefs(ifdefs or ifdef)
        rifdefs = resolve_ifdefs(self.defines, ifdefs)
        if rifdefs is None:
            return None
        if isinstance(arg, Const):
            const: Const = arg
            assert name is None
            # `name` is not given in this case, report the name of the constant itself.
            constname = const.name
        else:
            type_: BaseType = arg
            doc = doc_from_type(type_, title=title, descr=descr, comment=comment)
            const = Const(type_=type_, name=name, doc=doc, ifdefs=rifdefs)
            constname = name
        if self.__is_locked:
            raise LockError(f"{self}: Cannot add constant {constname!r}.")
        self.namespace.add(const, exist_ok=exist_ok)
        return const
15✔
364

365
    def add_type_consts(self, type_: BaseType, exist_ok: bool = False, only=None, name=None, item_suffix="e"):
        """
        Add description of `type_` as local parameters.

        `type_` is wrapped into a `DescriptiveStructType`, which is added as constant.

        Args:
            type_: Type to be described.

        Keyword Args:
            exist_ok (bool): Do not complain, if parameter already exists.
            only (str): Limit parameters to these listed in here, separated by ';'
            name (str): Name of the local parameter. Based on `type_` class name by default.
            item_suffix (str): Enumeration Suffix.
        """
        if not name:
            name = snakecase(type_.__class__.__name__.removesuffix("Type"))
        if not only:
            type_ = DescriptiveStructType(type_, enumitem_suffix=item_suffix)
        else:
            patfilter = namefilter(only)

            def filter_(item: StructItem):
                return patfilter(item.name)

            type_ = DescriptiveStructType(type_, filter_=filter_, enumitem_suffix=item_suffix)
        self.add_const(type_, name, exist_ok=exist_ok, title=type_.title, descr=type_.descr, comment=type_.comment)
15✔
389

390
    def add_port(
        self,
        type_: BaseType,
        name: str,
        direction: Direction | None = None,
        title: str | None = None,
        descr: str | None = None,
        comment: str | None = None,
        ifdef: str | None = None,
        ifdefs: Ifdefs | str | None = None,
        route: Routeables | None = None,
        clkrel: str | Port | BaseClkRel | None = None,
    ) -> Port | None:
        """
        Add Module Port (:any:`Port`).

        Args:
            type_: Type.
            name: Name.

        Keyword Args:
            direction: Signal Direction. Derived from `name` via `Direction.from_name()` if omitted,
                       `IN` if that yields nothing.
            title: Full Spoken Name.
            descr: Documentation Description.
            comment: Source Code Comment. Default is 'title'
            ifdef: IFDEF pragma. Obsolete.
            ifdefs: IFDEFs pragmas.
            route: Routes (iterable or string separated by ';')
            clkrel: Clock relation. A string is looked up in the already added ports.

        Returns:
            The added port. `None` if `resolve_ifdefs()` yields `None` for the given IFDEFs.

        Raises:
            LockError: Module is already locked.
        """
        if ifdef:
            # stacklevel=2 attributes the warning to the calling code instead of this method.
            warnings.warn(
                "add_port(..., ifdef=...) is obsolete, please use ifdefs=", category=DeprecationWarning, stacklevel=2
            )
        doc = doc_from_type(type_, title, descr, comment)
        if direction is None:
            direction = Direction.from_name(name) or IN
        if clkrel is not None:
            clkrel = self._resolve_clkrel(clkrel)
        ifdefs = cast_ifdefs(ifdefs or ifdef)
        rifdefs = resolve_ifdefs(self.defines, ifdefs)
        if rifdefs is None:
            return None
        port = Port(type_, name, direction=direction, doc=doc, ifdefs=rifdefs, clkrel=clkrel)
        if self.__is_locked:
            raise LockError(f"{self}: Cannot add port {name!r}.")
        # A port is known as identifier, as assignable port/signal and as port.
        self.namespace[name] = port
        self.portssignals[name] = port
        self.ports[name] = port
        for routepath in parse_routepaths(route):
            self._router.add(RoutePath(expr=port), routepath)
        return port
15✔
442

443
    def _resolve_clkrel(self, clkrel: str | Port | BaseClkRel) -> BaseClkRel:
        """
        Turn `clkrel` into a clock relation.

        Clock relations are returned as they are. Signals are wrapped into a `ClkRel`.
        Strings are taken as name of an already added port, which is wrapped likewise.
        Anything else causes a `ValueError`.
        """
        if isinstance(clkrel, BaseClkRel):
            return clkrel
        if isinstance(clkrel, BaseSignal):
            clk = clkrel
        elif isinstance(clkrel, str):
            clk = self.ports[clkrel]
        else:
            raise ValueError(f"Invalid {clkrel=}")
        return ClkRel(clk=clk)
×
452

453
    def add_signal(
        self,
        type_: BaseType,
        name: str,
        direction: Orientation = FWD,
        title: str | None = None,
        descr: str | None = None,
        comment: str | None = None,
        ifdef: str | None = None,
        ifdefs: Ifdefs | str | None = None,
        route: Routeables | None = None,
        clkrel: str | Port | BaseClkRel | None = None,
    ) -> Signal | None:
        """
        Add Module Internal Signal (:any:`Signal`).

        Args:
            type_: Type.
            name: Name.

        Keyword Args:
            direction: Signal Orientation. `FWD` by default, there is no detection based on `name`.
            title: Full Spoken Name.
            descr: Documentation Description.
            comment: Source Code Comment. Default is 'title'
            ifdef: IFDEF pragma. Obsolete.
            ifdefs: IFDEFs pragmas.
            route: Routes (iterable or string separated by ';')
            clkrel: Clock relation. A string is looked up in the already added ports.

        Returns:
            The added signal. `None` if `resolve_ifdefs()` yields `None` for the given IFDEFs.

        Raises:
            LockError: Module is already locked.
        """
        if ifdef:
            # stacklevel=2 attributes the warning to the calling code instead of this method.
            warnings.warn(
                "add_signal(..., ifdef=...) is obsolete, please use ifdefs=", category=DeprecationWarning, stacklevel=2
            )
        doc = doc_from_type(type_, title, descr, comment)
        if clkrel is not None:
            clkrel = self._resolve_clkrel(clkrel)
        ifdefs = cast_ifdefs(ifdefs or ifdef)
        rifdefs = resolve_ifdefs(self.defines, ifdefs)
        if rifdefs is None:
            return None
        signal = Signal(type_, name, direction=direction, doc=doc, ifdefs=rifdefs, clkrel=clkrel)
        if self.__is_locked:
            raise LockError(f"{self}: Cannot add signal {name!r}.")
        # A signal is known as identifier and as assignable port/signal, but not as port.
        self.namespace[name] = signal
        self.portssignals[name] = signal
        for routepath in parse_routepaths(route):
            self._router.add(RoutePath(expr=signal), routepath)
        return signal
15✔
502

503
    def add_port_or_signal(
        self,
        type_: BaseType,
        name: str,
        direction: Direction | None = None,
        title: str | None = None,
        descr: str | None = None,
        comment: str | None = None,
        ifdef: str | None = None,
        ifdefs: Ifdefs | str | None = None,
        route: Routeables | None = None,
        clkrel: str | Port | BaseClkRel | None = None,
    ) -> BaseSignal | None:
        """
        Add Module Port (:any:`Port`) or Signal (:any:`Signal`) depending on name.

        A port is added, if `direction` is given or `Direction.from_name()` derives one from
        `name`. Otherwise a signal is added.

        Args:
            type_: Type.
            name: Name.

        Keyword Args:
            direction: Signal Direction. Derived from `name` via `Direction.from_name()` if omitted.
            title: Full Spoken Name.
            descr: Documentation Description.
            comment: Source Code Comment. Default is 'title'
            ifdef: IFDEF pragma. Obsolete.
            ifdefs: IFDEFs pragmas.
            route: Routes (iterable or string separated by ';')
            clkrel: Clock relation.

        Returns:
            Whatever `add_port()` or `add_signal()` returns.
        """
        if ifdef:
            # stacklevel=2 attributes the warning to the calling code instead of this method.
            warnings.warn(
                "add_port_or_signal(..., ifdef=...) is obsolete, please use ifdefs=",
                category=DeprecationWarning,
                stacklevel=2,
            )
        # The obsolete `ifdef` is merged into `ifdefs` here and not forwarded,
        # so the methods called below do not warn a second time.
        ifdefs = cast_ifdefs(ifdefs or ifdef)
        if direction is None:
            direction = Direction.from_name(name)
        if direction is None:
            return self.add_signal(
                type_,
                name,
                title=title,
                descr=descr,
                comment=comment,
                ifdefs=ifdefs,
                route=route,
                clkrel=clkrel,
            )
        return self.add_port(
            type_,
            name,
            direction=direction,
            title=title,
            descr=descr,
            comment=comment,
            ifdefs=ifdefs,
            route=route,
            clkrel=clkrel,
        )
564

565
    def set_parent(self, parent: "BaseMod") -> None:
        """
        Set Parent.

        Appends `parent` to the list returned by `parents`. The `parent` attribute is not touched.

        Do not use this method, unless you really really really know what you are doing.
        """
        self.__parents.append(parent)
15✔
572

573
    def assign(
        self,
        target: Parseable,
        source: Parseable | Note,
        cast: bool = False,
        overwrite: bool = False,
    ):
        """
        Assign `source` to `target`.

        The assignment is done **without** routing.

        Args:
            target: Target to be driven. Must be known within this module.
            source: Source driving target. Must be known within this module.

        Keyword Args:
            cast (bool): Cast. `False` by default.
            overwrite (bool): Overwrite existing assignment.

        Raises:
            LockError: Module is already locked.
        """
        if self.__is_locked:
            raise LockError(f"{self}: Cannot add assign '{source}' to '{target}'.")
        exprparser = self.parser
        driven: BaseSignal = exprparser.parse(target, only=BaseSignal)  # type: ignore[assignment]
        driver: Source = exprparser.parse_note(source, only=Source)  # type: ignore[arg-type]
        self.assigns.set(driven, driver, cast=cast, overwrite=overwrite)
×
600

601
    def add_inst(self, inst: "BaseMod") -> None:
        """
        Add Submodule `inst`.

        Registers this module as parent of `inst`, adds `inst` to `insts` and prepares the
        connection container and port parser used by `set_instcon()`.

        Args:
            inst: Instance.

        Raises:
            LockError: Module is already locked.
        """
        if self.__is_locked:
            raise LockError(f"{self}: Cannot add instance '{inst}'.")
        inst.set_parent(self)
        self.insts.add(inst)  # type: ignore[arg-type]
        # Ports of the instance are the targets, identifiers of this module are the sources.
        instassigns = Assigns(targets=inst.ports, sources=self.namespace, drivers=Drivers(), inst=True)
        portparser = ExprParser(namespace=inst.ports, context=str(inst))
        self.__instcons[inst.name] = (instassigns, portparser)
15✔
615

616
    def get_inst(self, inst_or_name: Union["BaseMod", str]) -> "BaseMod":
        """
        Get Module Instance.

        Args:
            inst_or_name: Module instance or instance path. A module is looked up by its name
                          within the sub-modules of this module. A path is a string of instance
                          names separated by '/', resolved step by step starting at this module.
                          A path element equal to `UPWARDS` steps to the parent.

        Raises:
            ValueError: Instance is not known or there is no parent to step up to.
        """
        if not isinstance(inst_or_name, str):
            try:
                return self.insts[inst_or_name.name]
            except KeyError:
                raise ValueError(f"{inst_or_name} is not a sub-module of {self}") from None
        inst = self
        for part in inst_or_name.split("/"):
            if part == UPWARDS:
                if inst.parent is None:
                    raise ValueError(f"{self}: {inst} has no parent.")
                inst = inst.parent
            else:
                try:
                    inst = inst.insts.get_dym(part)  # type: ignore[assignment]
                except ValueError as exc:
                    # Name the module whose lookup failed. It differs from `self` on nested paths.
                    raise ValueError(f"{inst} has no sub-module {exc}") from None
        return inst
15✔
637

638
    def set_instcon(
        self,
        inst: Union["BaseMod", str],
        port: Parseable,
        expr: Parseable,
        cast: bool = False,
        overwrite: bool = False,
    ):
        """
        Connect `port` of `inst` to `expr`.

        The connection is done **without** routing.

        Args:
            inst: Module Instance
            port: Port to be connected. Must be known within module instance.
            expr: Expression. Must be known within this module.

        Keyword Args:
            cast: Cast. `False` by default.
            overwrite: Overwrite existing assignment.

        Raises:
            LockError: Module is already locked.
        """
        if self.__is_locked:
            raise LockError(f"{self}: Cannot connect '{port}' of'{inst}' to '{expr}'.")
        instname = self.get_inst(inst).name
        instassigns, portparser = self.__instcons[instname]
        # The port is parsed in the scope of the instance, the expression in the scope of this module.
        instport: BaseSignal = portparser.parse(port, only=BaseSignal)  # type: ignore[assignment]
        driver: Source = self.parser.parse_note(expr, only=Source)  # type: ignore[arg-type]
        instassigns.set(instport, driver, cast=cast, overwrite=overwrite)
15✔
667

668
    def get_instcons(self, inst: Union["BaseMod", str]) -> Assigns:
        """Retrieve All Instance Connections Of `inst`."""
        instname = self.get_inst(inst).name
        instassigns, _ = self.__instcons[instname]
        return instassigns
15✔
672

673
    def add_flipflop(
        self,
        type_: BaseType,
        name: str,
        clk: Parseable,
        rst_an: Parseable,
        nxt: Parseable | None = None,
        rst: Parseable | None = None,
        ena: Parseable | None = None,
        route: Routeables | None = None,
    ) -> Signal:
        """
        Add Module Internal Flip-Flop.

        Adds the output signal `name`. Flip-flops with identical `clk`, `rst_an`, `rst` and
        `ena` share one `FlipFlop` container.

        Args:
            type_: Type.
            name: Name.
            clk: Clock. Must resolve to a signal within this module.
            rst_an: Reset. Must resolve to a signal within this module.

        Keyword Args:
            nxt: Next Value. A new signal named after the basename of `name` with _nxt_s by default.
            rst: Synchronous Reset.
            ena: Enable Condition.
            route: Routing of flip-flop output.

        Returns:
            Output signal of the flip-flop.

        Raises:
            LockError: Module is already locked.
        """
        parser = self.parser
        if self.__is_locked:
            raise LockError(f"{self}: Cannot add flipflop {name!r}.")
        # NOTE(review): `add_signal` is annotated to return `Signal | None`; `out` is used
        # without a `None` check below - presumably it cannot be `None` without ifdefs, confirm.
        out = self.add_signal(type_, name)
        # clk
        clk_sig: BaseSignal = parser.parse(clk, only=BaseSignal)  # type: ignore[assignment]
        # rst_an
        rst_an_sig: BaseSignal = parser.parse(rst_an, only=BaseSignal)  # type: ignore[assignment]
        # nxt
        if nxt is None:
            nxt = self.add_signal(type_, f"{out.basename}_nxt_s")
        else:
            nxt = parser.parse(nxt)
            # TODO: check connectable of nxt and out?
        # rst
        rst_expr: BoolOp | None = cast_booltype(parser.parse(rst)) if rst is not None else None
        # ena
        ena_expr: BoolOp | None = cast_booltype(parser.parse(ena)) if ena is not None else None
        # flipflop
        flipflop = self._create_flipflop(clk_sig, rst_an_sig, rst_expr, ena_expr)
        flipflop.set(out, nxt)
        # route
        for routepath in parse_routepaths(route):
            self._router.add(RoutePath(expr=out), routepath)
        return out
15✔
724

725
    def _create_flipflop(
        self,
        clk: BaseSignal,
        rst_an: BaseSignal,
        rst: BoolOp | None = None,
        ena: BoolOp | None = None,
    ) -> FlipFlop:
        """
        Return the flip-flop container for the given clock, reset and enable combination.

        Containers are kept per hash of `(clk, rst_an, rst, ena)`. An existing one is reused,
        otherwise a new one is created and remembered.
        """
        known = self.__flipflops
        key = hash((clk, rst_an, rst, ena))
        if key in known:
            return known[key]
        created = FlipFlop(
            clk=clk,
            rst_an=rst_an,
            rst=rst,
            ena=ena,
            targets=self.portssignals,
            sources=self.namespace,
            drivers=self.drivers,
        )
        known[key] = created
        return created
15✔
748

749
    @property
    def flipflops(self) -> tuple[FlipFlop, ...]:
        """
        Flip Flops.

        All flip-flop containers created so far, in order of creation.
        """
        return (*self.__flipflops.values(),)
15✔
755

756
    def add_mux(
        self,
        name,
        title: str | None = None,
        descr: str | None = None,
        comment: str | None = None,
    ) -> Mux:
        """
        Add Multiplexer with `name` And Return It For Filling.

        Args:
            name (str): Name.

        Keyword Args:
            title (str): Full Spoken Name.
            descr (str): Documentation Description.
            comment (str): Source Code Comment.

        Raises:
            LockError: Module is already locked.

        See :any:`Mux.set()` how to fill the multiplexer.
        """
        if self.__is_locked:
            raise LockError(f"{self}: Cannot add mux {name!r}.")
        doc = Doc(title=title, descr=descr, comment=comment)
        mux = Mux(
            name=name,
            targets=self.portssignals,
            namespace=self.namespace,
            parser=self.parser,
            doc=doc,
        )
        self.__muxes[name] = mux
        return mux
15✔
788

789
    @property
    def muxes(self) -> tuple[Mux, ...]:
        """
        All Multiplexers Added To This Module.

        The result is a tuple built from the values of the internal multiplexer container.
        """
        registered = self.__muxes
        return tuple(registered.values())
15✔
795

796
    def get_mux(self, mux: Mux | str) -> Mux:
        """
        Get Multiplexer.

        `mux` is either the multiplexer name or an object carrying the name in its `name`
        attribute. The lookup itself is delegated to `get_dym` of the multiplexer container.
        """
        muxname = mux if isinstance(mux, str) else mux.name
        return self.__muxes.get_dym(muxname)  # type: ignore[return-value]
15✔
801

802
    @property
    def is_locked(self) -> bool:
        """
        Tell Whether The Module Is Completed And Locked For Modification.

        Locking is done by the build process **automatically** and **MUST NOT** be done earlier or later.
        Use a different module type or enumeration or struct type, if you have issues with locking.
        """
        locked = self.__is_locked
        return locked
15✔
811

812
    def lock(self):
        """
        Lock The Module And All Of Its Namespaces.

        Locking is done via this method by the build process **automatically** and **MUST NOT** be done earlier or
        later. Use a different module type or enumeration or struct type, if you have issues with locking.

        Raises:
            LockError: If the module is already locked.
        """
        if self.__is_locked:
            raise LockError(f"{self} is already locked. Cannot lock again.")
        # Iterating the module yields pairs; only `Namespace` values are locked.
        namespaces = (value for _, value in self if isinstance(value, Namespace))
        for namespace in namespaces:
            namespace.lock(ensure=True)
        self.__is_locked = True
15✔
825

826
    def check_lock(self):
        """
        Ensure The Module Is Still Open For Modifications.

        Raises:
            LockError: If the module is already locked.
        """
        if not self.__is_locked:
            return
        raise LockError(f"{self}: Is already locked for modifications.")
×
830

831
    def con(self, port: Routeables, source: Routeables, on_error: RoutingError = "error"):
        """
        Connect `port` Of This Module Instance To `source`.

        The request is queued on the router of the last entry of this module's parents.
        Every route path parsed from `port` (relative to this instance name) is combined
        with every route path parsed from `source`.

        Args:
            port: Port(s) of this module instance.
            source: Source(s) to connect to.

        Keyword Args:
            on_error: Error handling policy forwarded to the router.

        Raises:
            TypeError: If the module has no parent, i.e. is a top module.
        """
        if not self.__parents:
            raise TypeError(f"{self} is top module. Connections cannot be made.")
        parent_router = self.__parents[-1]._router
        for target_path in parse_routepaths(port, basepath=self.name):
            for source_path in parse_routepaths(source):
                parent_router.add(target_path, source_path, on_error=on_error)
15✔
840

841
    def route(self, target: Routeables, source: Routeables, on_error: RoutingError = "error"):
        """
        Route `source` To `target` Within This Module.

        Every route path parsed from `target` is combined with every route path parsed
        from `source` and queued on this module's own router.

        Args:
            target: Target(s) of the route.
            source: Source(s) of the route.

        Keyword Args:
            on_error: Error handling policy forwarded to the router.
        """
        add_route = self._router.add
        for target_path in parse_routepaths(target):
            for source_path in parse_routepaths(source):
                add_route(target_path, source_path, on_error=on_error)
×
847

848
    def __str__(self):
        """Return Module Reference With Instance, Library And Module Name (And Defines, If Any)."""
        modref = self.get_modref()
        extra = ""
        defines = self.defines
        if defines:
            # Defines are shown as a plain name-to-value dictionary.
            extra = " defines=" + repr({item.name: item.value for item in defines})
        fields = f"inst={self.inst!r}, libname={self.libname!r}, modname={self.modname!r}"
        return f"<{modref}({fields}{extra})>"
15✔
855

856
    def __repr__(self):
        """Return The Same Text As `str()`."""
        return self.__str__()
15✔
858

859
    def get_overview(self) -> str:
        """
        Return Brief Module Overview.

        The default overview is empty. This method is intended to be overwritten by the user.
        """
        overview = ""
        return overview
×
866

867
    def get_info(self, sub: bool = False) -> str:
        """
        Module Information.

        Builds a header line followed by a "Parameters" and a "Ports" section.
        All sections are separated by an empty line.

        Keyword Args:
            sub: Append the submodule section too.
        """
        sections = [f"## `{self.libname}.{self.modname}` (`{self.get_modref()}`)"]
        sections.append(self._get_ident_info("Parameters", self.params))
        sections.append(self._get_ident_info("Ports", self.ports))
        if sub:
            sections.append(self._get_sub_info())
        return "\n\n".join(sections)
×
878

879
    def _get_ident_info(self, title: str, idents: Idents):
        """
        Return A Section Headed `title` Listing `idents`.

        The section consists of a level-3 heading, an empty line and either an aligned
        name/type table (one row per identifier from `idents.leveliter()`, indented by
        two spaces per level) or a single "-" if `idents` is empty.
        """
        heading = f"### {title}"
        if not idents:
            return "\n".join((heading, "", "-"))
        rows = [("Name ", "Type"), ("----", "----")]
        for level, ident in idents.leveliter():
            indent = "  " * level
            # The direction is only mentioned when the identifier has one.
            direction = f" ({ident.direction})" if ident.direction else ""
            rows.append((f"{indent}{ident.name}{direction}", f"{indent}{ident.type_}"))
        table = align(rows, seps=(" | ", " |"), sepfirst="| ")
        return "\n".join((heading, "", table))
×
899

900
    def _get_sub_info(self) -> str:
        """
        Return The "Submodules" Section.

        The section consists of a level-3 heading, an empty line and either an aligned
        table with instance name and `libname.modname` of every instance or a single "-"
        if there are no instances.
        """
        insts = self.insts
        if insts:
            rows = [("Name", "Module"), ("----", "------")]
            rows.extend((f"`{inst.name}`", f"`{inst.libname}.{inst.modname}`") for inst in insts)
            body = align(rows, seps=(" | ", " |"), sepfirst="| ")
        else:
            body = "-"
        return "\n".join(("### Submodules", "", body))
×
912

913

914
class Router(Object):
    """
    Router Of One Module.

    Routes are queued by :any:`Router.add()` and turned into ports, signals, instance
    connections and assignments on `mod` by :any:`Router.flush()`.
    """

    mod: BaseMod
    __routes: list[tuple[RoutePath, RoutePath, RoutingError]] = PrivateField(default_factory=list)

    def add(self, tpath: RoutePath, spath: RoutePath, on_error: RoutingError = "error") -> None:
        """
        Queue A Route From `spath` To `tpath`.

        Paths flagged with `create` are tried to be created right away, see `_create`.

        Args:
            tpath: Target path.
            spath: Source path.

        Keyword Args:
            on_error: Error handling policy applied when the route is flushed.
        """
        LOGGER.debug("%s: router: add '%s' to '%s'", self.mod, spath, tpath)
        pending = self._create(tpath, spath, on_error)
        self.__routes.append(pending)

    def flush(self) -> None:
        """
        Create Pending Routes.

        Every queued route passes `_create` once more and is routed afterwards.
        A failing route is logged on policy "ignore" (info) or "warn" (warning),
        any other policy re-raises the exception. The queue is cleared once all
        routes have been processed.
        """
        for queued in self.__routes:
            target, source, policy = self._create(*queued)
            try:
                self._route(target, source)
            except Exception as exc:
                if policy not in ("ignore", "warn"):
                    raise
                if policy == "ignore":
                    LOGGER.info("Ignored: %s", exc)
                else:
                    LOGGER.warning(exc)
        self.__routes.clear()

    def _create(
        self, tpath: RoutePath, spath: RoutePath, on_error: RoutingError
    ) -> tuple[RoutePath, RoutePath, RoutingError]:
        """
        Try To Create The Path Flagged With `create` Based On The Other Path.

        The target is checked first, the source is only considered if the target is not
        flagged. A successfully created path is replaced by `path.new(create=False)`.
        """
        if tpath.create:
            created = self.__create(spath, tpath)
            if created:
                tpath = tpath.new(create=False)
        elif spath.create and self.__create(tpath, spath):
            spath = spath.new(create=False)
        return tpath, spath, on_error

    @no_type_check  # TODO: fix types
    def __create(self, rpath: RoutePath, cpath: RoutePath) -> bool:
        """
        Create `cpath` based on `rpath`.

        Returns `False` if the referenced modules or the reference identifier cannot be
        resolved (`ValueError`, `NameError`, `KeyError`), otherwise `True`.
        """
        assert not rpath.create
        assert cpath.create
        mod = self.mod
        # Resolve reference path and the module to create on
        try:
            ref_mod = mod.get_inst(rpath.path) if rpath.path else mod
            ref_ident = ref_mod.parser.parse(rpath.expr, only=Ident)
            new_mod = mod.get_inst(cpath.path) if cpath.path else mod
        except (ValueError, NameError, KeyError):
            return False
        else:
            self.__create_port_or_signal(new_mod, ref_ident, cpath.expr)
            return True

    @no_type_check  # TODO: fix types
    def _route(self, tpath: RoutePath, spath: RoutePath):  # noqa: C901, PLR0912, PLR0915
        """
        Route `spath` To `tpath`.

        Depending on whether target and/or source refer to a submodule instance, the route
        becomes an instance connection (via `__routesubmod`) or an assignment on `mod`.
        """
        mod = self.mod
        LOGGER.debug("%s router: routing %r to %r", mod, spath, tpath)
        connect = Router.__routesubmod

        # Referenced modules; an empty path refers to `mod` itself
        tmod = mod.get_inst(tpath.path) if tpath.path else mod
        smod = mod.get_inst(spath.path) if spath.path else mod
        # Referenced expression/signal; notes are taken as they are
        texpr = tpath.expr if isinstance(tpath.expr, Note) else tmod.parser.parse(tpath.expr)
        sexpr = spath.expr if isinstance(spath.expr, Note) else smod.parser.parse(spath.expr)
        tident = texpr if isinstance(texpr, Ident) else None
        sident = sexpr if isinstance(sexpr, Ident) else None
        tparts, sparts = tpath.parts, spath.parts
        # One of the both sides need to exist
        rident = tident or sident
        assert rident is not None
        if tident and tident.direction is not None and sident and sident.direction is not None:
            direction = tident.direction * sident.direction
        else:
            direction = rident.direction

        def instcon(submod, ident):
            """Return what `ident` of instance `submod` is connected to, `None` without identifier."""
            return None if ident is None else mod.get_instcons(submod).get(ident)

        def existing_or_new(expr, rpath):
            """Return `expr`, a missing one is created on `mod` under the name `rpath.expr`."""
            if expr is None:
                return Router.__create_port_or_signal(mod, rident, rpath.expr)
            return expr

        cast = _merge_cast(tpath.cast, spath.cast)
        assert len(tparts) in (0, 1)
        assert len(sparts) in (0, 1)
        if tparts and sparts:
            # source and target are submodules
            assert tparts[0] != UPWARDS
            assert sparts[0] != UPWARDS
            tcon = instcon(tmod, tident)
            scon = instcon(smod, sident)
            if tcon is not None and scon is not None:
                mod.assign(tcon, scon, cast=cast)
            elif tcon is not None:
                connect(mod, smod, rident, sexpr, tcon, tname=spath.expr, cast=spath.cast)
            elif scon is not None:
                connect(mod, tmod, rident, texpr, scon, tname=tpath.expr, cast=tpath.cast)
            else:
                # neither side is connected yet: join both via a new signal
                signame = join_names(split_prefix(tmod.name)[1], rident.name, "s")
                rsig = mod.add_signal(rident.type_, signame, ifdefs=rident.ifdefs, direction=direction)
                connect(mod, tmod, rident, texpr, rsig, tname=tpath.expr, cast=tpath.cast)
                connect(mod, smod, rident, sexpr, rsig, tname=spath.expr, cast=spath.cast)
        elif tparts:
            # target is submodule, source belongs to `mod`
            assert tparts[0] != UPWARDS
            tcon = instcon(tmod, tident)
            if tcon is None:
                connect(mod, tmod, rident, texpr, sexpr, tname=tpath.expr, sname=spath.expr, cast=cast)
            else:
                mod.assign(tcon, existing_or_new(sexpr, spath), cast=cast)
        elif sparts:
            # source is submodule, target belongs to `mod`
            scon = instcon(smod, sident)
            assert sparts[0] != UPWARDS
            if scon is None:
                connect(mod, smod, rident, sexpr, texpr, tname=spath.expr, sname=tpath.expr, cast=cast)
            else:
                mod.assign(scon, existing_or_new(texpr, tpath), cast=cast)
        else:
            # connect signals of `mod`
            texpr = existing_or_new(texpr, tpath)
            sexpr = existing_or_new(sexpr, spath)
            mod.assign(texpr, sexpr, cast=cast)

    @staticmethod
    def __routesubmod(
        mod: BaseMod, submod: BaseMod, rident: Ident, texpr, sexpr, tname=None, sname=None, cast=False
    ) -> Expr:
        """
        Connect Port `texpr` Of Instance `submod` To `sexpr` Within `mod`.

        A missing `texpr` is created as port `tname` on `submod`, a missing `sexpr` is
        created as port or signal `sname` on `mod`, both with the type of `rident`.

        Raises:
            ValueError: If `texpr` is not a port.
            TypeError: If the instance connection fails with a `TypeError`.

        Returns:
            The expression the port got connected to.
        """
        if texpr is None:
            assert tname is not None
            assert rident is not None and rident.type_
            texpr = submod.add_port(rident.type_, tname, ifdefs=rident.ifdefs)
        if sexpr is None:
            sexpr = Router.__create_port_or_signal(mod, rident, sname)
        is_port = isinstance(texpr, Port)
        if not is_port:
            raise ValueError(f"Cannot route {type(texpr)} to module instance {submod}")
        try:
            mod.set_instcon(submod, texpr, sexpr, cast=cast)
        except TypeError as err:
            # Re-raise with the module as prefix, without the original context
            raise TypeError(f"{mod}: {err}") from None
        return sexpr

    @staticmethod
    def __create_port_or_signal(mod: BaseMod, rident: Ident, name: str) -> BaseSignal:
        """
        Create Port Or Signal `name` On `mod` With Type And Ifdefs Of `rident`.

        A port is created if `Direction.from_name(name)` yields a direction, otherwise a signal.
        """
        assert isinstance(rident, Ident)
        assert name is not None
        assert rident is not None and rident.type_ is not None, (mod, name)
        type_ = rident.type_
        direction = Direction.from_name(name)
        created: BaseSignal
        if direction is None:
            created = mod.add_signal(type_, name, ifdefs=rident.ifdefs)
        else:
            created = mod.add_port(type_, name, ifdefs=rident.ifdefs, direction=direction)
        LOGGER.debug("%s: router: creating %r", mod, created)
        return created
15✔
1070

1071

1072
def _merge_cast(one, other):
15✔
1073
    # TODO: get rid of this.
1074
    if one or other:
15✔
UNCOV
1075
        return True
×
1076
    if one is None or other is None:
15✔
1077
        return None
×
1078
    return False
15✔
1079

1080

1081
# A module class, i.e. `BaseMod` or any of its subclasses (the class, not an instance).
ModCls: TypeAlias = type[BaseMod]
# A set of module classes.
ModClss: TypeAlias = set[ModCls]
15✔
1083

1084

1085
def get_modbaseclss(cls):
    """
    Get Module Base Classes.

    Returns the classes of the method resolution order of `cls` as list, starting with
    `cls` itself and stopping right before `BaseMod`. Without `BaseMod` in the resolution
    order, all classes are returned.
    """
    mro = getmro(cls)
    for idx, basecls in enumerate(mro):
        if basecls is BaseMod:
            return list(mro[:idx])
    return list(mro)
15✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc