• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / SpiNNMan / 6574804013

19 Oct 2023 12:47PM UTC coverage: 51.937% (+1.2%) from 50.777%
6574804013

Pull #327

github

Christian-B
typing changes
Pull Request #327: Type Annotations and Checking

105 of 1288 branches covered (0.0%)

Branch coverage included in aggregate %.

2375 of 2375 new or added lines in 180 files covered. (100.0%)

4775 of 8108 relevant lines covered (58.89%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

33.9
/spinnman/connections/connection_listener.py
1
# Copyright (c) 2015 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
from __future__ import annotations
1✔
15
import logging
1✔
16
from threading import Thread
1✔
17
from typing import Callable, Generic, List, TypeVar
1✔
18
from concurrent.futures import ThreadPoolExecutor, Future
1✔
19
from spinn_utilities.log import FormatAdapter
1✔
20
from spinnman.exceptions import SpinnmanEOFException
1✔
21
from spinnman.connections.abstract_classes import Listenable
1✔
22

23
#: :meta private:
24
T = TypeVar("T")
1✔
25
logger = FormatAdapter(logging.getLogger(__name__))
1✔
26
_POOL_SIZE = 4
1✔
27
_TIMEOUT = 1
1✔
28

29

30
class ConnectionListener(Thread, Generic[T]):
1✔
31
    """
32
    Thread that listens to a connection and calls callbacks with new
33
    messages when they arrive.
34
    """
35
    __slots__ = (
1✔
36
        "__callback_pool",
37
        "__callbacks",
38
        "__connection",
39
        "__done",
40
        "__timeout")
41

42
    def __init__(self, connection: Listenable[T],
1✔
43
                 n_processes: int = _POOL_SIZE, timeout: float = _TIMEOUT):
44
        """
45
        :param Listenable connection: A connection to listen to
46
        :param int n_processes:
47
            The number of threads to use when calling callbacks
48
        :param float timeout:
49
            How long to wait for messages before checking to see if the
50
            connection is to be terminated.
51
        """
52
        super().__init__(
×
53
            name=f"Connection listener for connection {connection}")
54
        self.daemon = True
×
55
        self.__connection = connection
×
56
        self.__timeout = timeout
×
57
        self.__callback_pool = ThreadPoolExecutor(max_workers=n_processes)
×
58
        self.__done = False
×
59
        self.__callbacks: List[Callable[[T], None]] = []
×
60

61
    def __run_step(self, handler: Callable[[], T]):
1✔
62
        """
63
        :param ~collections.abc.Callable handler:
64
        """
65
        if self.__connection.is_ready_to_receive(timeout=self.__timeout):
×
66
            message = handler()
×
67
            for callback in self.__callbacks:
×
68
                future = self.__callback_pool.submit(
×
69
                    callback, message)
70
                future.add_done_callback(self.__done_callback)
×
71

72
    def __done_callback(self, future: Future[None]):
1✔
73
        """
74
        :param ~concurrent.futures.Future future:
75
        """
76
        try:
×
77
            future.result()
×
78
        except Exception:  # pylint: disable=broad-except
×
79
            logger.exception("problem in listener call")
×
80

81
    def run(self) -> None:
1✔
82
        """
83
        Implements the listening thread.
84
        """
85
        with self.__callback_pool:
×
86
            handler = self.__connection.get_receive_method()
×
87
            while not self.__done:
×
88
                try:
×
89
                    self.__run_step(handler)
×
90
                except SpinnmanEOFException:
×
91
                    self.__done = True
×
92
                except Exception:  # pylint: disable=broad-except
×
93
                    if not self.__done:
×
94
                        logger.warning("problem when dispatching message",
×
95
                                       exc_info=True)
96

97
    def add_callback(self, callback: Callable[[T], None]):
1✔
98
        """
99
        Add a callback to be called when a message is received.
100

101
        :param ~collections.abc.Callable callback:
102
            A callable which takes a single parameter, which is the message
103
            received; the result of the callback will be ignored.
104
        """
105
        self.__callbacks.append(callback)
×
106

107
    def close(self) -> None:
1✔
108
        """
109
        Closes the listener.
110

111
        .. note::
112
            This does not close the provider of the messages; this instead
113
            marks the listener as closed.  The listener will not truly stop
114
            until the get message call returns.
115
        """
116
        self.__done = True
×
117
        self.join()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc