• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

apowers313 / roc / 10463305481

20 Aug 2024 12:41AM UTC coverage: 93.864% (-0.01%) from 93.875%
10463305481

push

github

apowers313
refactor saliency map to use new grid

19 of 19 new or added lines in 1 file covered. (100.0%)

7 existing lines in 2 files now uncovered.

1744 of 1858 relevant lines covered (93.86%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.7
/roc/component.py
1
"""This module defines the Component base class, which is instantiated for
1✔
2
nearly every part of the system. It implements interfaces for communications,
3
initialization, shutdown, etc.
4
"""
5

6
from __future__ import annotations
1✔
7

8
# import traceback
9
from abc import ABC
1✔
10
from typing import TYPE_CHECKING, Any, TypeVar, cast
1✔
11
from weakref import WeakSet
1✔
12

13
from typing_extensions import Self
1✔
14

15
from .config import Config
1✔
16
from .logger import logger
1✔
17

18
if TYPE_CHECKING:
1✔
19
    from .event import BusConnection, Event, EventBus
×
20

21
# Component instances created by Component.init(), keyed by their
# "name:type" registry string. Emptied again by Component.reset().
loaded_components: dict[str, Component] = {}
# Weak references to Component instances; every Component adds itself in
# __init__, and Component.get_component_count() reports the size of this set.
component_set: WeakSet[Component] = WeakSet()

# Type variable tying the EventBus passed to Component.connect_bus to the
# BusConnection it returns.
T = TypeVar("T")
25

26

27
class Component(ABC):
    """An abstract component class for building pieces of ROC that will talk to each other.

    Every instance registers itself in the module-level ``component_set`` (a
    ``WeakSet``) on construction and keeps the bus connections it has opened
    in ``bus_conns``, keyed by bus name.
    """

    name: str = "<name unassigned>"
    type: str = "<type unassigned>"

    def __init__(self) -> None:
        component_set.add(self)
        self.bus_conns: dict[str, BusConnection[Any]] = {}
        logger.trace(f"++ incrementing component count: {self.name}:{self.type} {self}")

    def __del__(self) -> None:
        # Remove (not re-add) the dying instance so the tracked count goes down.
        # ``discard`` is a no-op if the WeakSet has already dropped the entry.
        component_set.discard(self)
        logger.trace(f"-- decrementing component count: {self.name}:{self.type} {self}")

    def connect_bus(self, bus: EventBus[T]) -> BusConnection[T]:
        """Create a new bus connection for the component, storing the result for
        later shutdown.

        Args:
            bus (EventBus[T]): The event bus to attach to

        Raises:
            ValueError: if the bus has already been connected to by this component

        Returns:
            BusConnection[T]: The bus connection for listening or sending events
        """
        if bus.name in self.bus_conns:
            raise ValueError(
                f"Component '{self.name}' attempting duplicate connection to bus '{bus.name}'"
            )

        conn = bus.connect(self)
        self.bus_conns[bus.name] = conn
        return conn

    def event_filter(self, e: Event[Any]) -> bool:
        """A filter for any incoming events. By default it filters out events
        sent by itself, but it is especially useful for creating new filters in
        sub-classes.

        Args:
            e (Event[Any]): The event to be evaluated

        Returns:
            bool: True if the event should be sent, False if it should be dropped
        """
        return e.src is not self

    def shutdown(self) -> None:
        """De-initializes the component by signalling completion on its buses.

        For every stored bus connection, ``on_completed()`` is called on each
        observer of the attached bus's subject. The ``bus_conns`` mapping
        itself is left untouched.
        """
        logger.debug(f"Component {self.name}:{self.type} shutting down.")

        for conn in self.bus_conns.values():
            for obs in conn.attached_bus.subject.observers:
                obs.on_completed()

    @staticmethod
    def init() -> None:
        """Loads all components registered as `auto` and perception components
        in the `perception_components` config field.

        Each loaded instance is stored in ``loaded_components`` under its
        ``name:type`` registry string.
        """
        settings = Config.get()
        logger.debug(f"perception components from settings: {settings.perception_components}")
        # union() returns a new set, so default_components is not modified.
        component_list = default_components.union(settings.perception_components)
        logger.debug(f"Component.init: default components: {component_list}")

        # TODO: shutdown previously loaded components

        for reg_str in component_list:
            logger.trace(f"Loading component: {reg_str} ...")
            comp_name, comp_type = reg_str.split(":")
            loaded_components[reg_str] = Component.get(comp_name, comp_type)

    @classmethod
    def get(cls, name: str, type: str, *args: Any, **kwargs: Any) -> Self:
        """Retrieves a component with the specified name from the registry and
        creates a new version of it with the specified args. Used by
        `Component.init` and for testing.

        Args:
            name (str): The name of the component to get, as specified during
                its registration
            type (str): The type of the component to get, as specified during
                its registration
            args (Any): Fixed position arguments to pass to the Component
                constructor
            kwargs (Any): Keyword args to pass to the Component constructor

        Raises:
            KeyError: if no component is registered under the name and type

        Returns:
            Self: the component that was created, cast as the calling class.
            (e.g. `Perception.get(...)` will return a Perception component and
            `Action.get(...)` will return an Action component)
        """
        reg_str = _component_registry_key(name, type)
        return cast(Self, component_registry[reg_str](*args, **kwargs))

    @staticmethod
    def get_component_count() -> int:
        """Returns the number of currently created Components. The number goes
        up on __init__ and down on __del__. Primarily used for testing to ensure
        Components are being shutdown appropriately.

        Returns:
            int: The number of currently active Component instances
        """
        return len(component_set)

    @staticmethod
    def get_loaded_components() -> list[str]:
        """Returns the names and types of all initiated components.

        Returns:
            list[str]: A list of the names and types of components, as strings.
        """
        return list(loaded_components)

    @staticmethod
    def deregister(name: str, type: str) -> None:
        """Removes a component from the Component registry. Primarily used for testing.

        Args:
            name (str): The name of the Component to deregister
            type (str): The type of the Component to deregister

        Raises:
            KeyError: if no component is registered under the name and type
        """
        reg_str = _component_registry_key(name, type)
        del component_registry[reg_str]

    @staticmethod
    def reset() -> None:
        """Shuts down all components"""
        # First the components created by init(), which are then forgotten.
        for reg_str, component in loaded_components.items():
            logger.trace(f"Shutting down component: {reg_str}.")
            component.shutdown()

        loaded_components.clear()

        # Then every instance still tracked in component_set, which also
        # covers components that were not created through init().
        for component in component_set:
            component.shutdown()
181

182

183
# Type variable bounded by Component (accepts Component or any subclass).
WrappedComponentBase = TypeVar("WrappedComponentBase", bound=Component)
# Maps a "name:type" key to the registered Component class. Filled by the
# register_component decorator, read by Component.get, and pruned by
# Component.deregister.
component_registry: dict[str, type[Component]] = {}
# Registry keys of components registered with auto=True; Component.init
# instantiates these in addition to the configured perception components.
default_components: set[str] = set()
186

187

188
def _component_registry_key(name: str, type: str) -> str:
    """Build the registry key for a component.

    Args:
        name (str): The component's name
        type (str): The component's type

    Returns:
        str: The name and type joined by a colon, e.g. ``"name:type"``
    """
    separator = ":"
    return f"{name}{separator}{type}"
190

191

192
class register_component:
    """A decorator to register a new component.

    Applying ``@register_component(name, type)`` to a Component subclass files
    the class in ``component_registry`` under its ``name:type`` key and stamps
    ``name`` and ``type`` onto the class. With ``auto=True`` the key is also
    added to ``default_components``.
    """

    def __init__(self, name: str, type: str, *, auto: bool = False) -> None:
        self.name, self.type, self.auto = name, type, auto

    def __call__(self, cls: type[Component]) -> type[Component]:
        """Register ``cls`` and return it unchanged apart from its name/type.

        Args:
            cls (type[Component]): The component class being decorated

        Raises:
            ValueError: if a component is already registered under the same key

        Returns:
            type[Component]: The same class that was passed in
        """
        logger.trace(f"Registering component: {self.name}:{self.type} (auto={self.auto})")

        key = _component_registry_key(self.name, self.type)
        if key in component_registry:
            raise ValueError(f"Registering duplicate component name: '{self.name}'")

        component_registry[key] = cls
        if self.auto:
            default_components.add(key)

        cls.name, cls.type = self.name, self.type
        return cls
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc