• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SpiNNakerManchester / SpiNNMan / 6574804013

19 Oct 2023 12:47PM UTC coverage: 51.937% (+1.2%) from 50.777%
6574804013

Pull #327

github

Christian-B
typing changes
Pull Request #327: Type Annotations and Checking

105 of 1288 branches covered (0.0%)

Branch coverage included in aggregate %.

2375 of 2375 new or added lines in 180 files covered. (100.0%)

4775 of 8108 relevant lines covered (58.89%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.76
/spinnman/spalloc/spalloc_boot_connection.py
1
# Copyright (c) 2022 The University of Manchester
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     https://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
"""
1✔
15
API of the client for the Spalloc web service.
16
"""
17

18
import struct
1✔
19
import time
1✔
20
from typing import Callable, Optional
1✔
21
from spinn_utilities.abstract_base import AbstractBase
1✔
22
from spinn_utilities.overrides import overrides
1✔
23
from spinnman.connections.abstract_classes import Listenable
1✔
24
from spinnman.connections.udp_packet_connections import BootConnection
1✔
25
from spinnman.connections.udp_packet_connections.boot_connection import (
1✔
26
    _ANTI_FLOOD_DELAY)
27
from spinnman.messages.spinnaker_boot import SpinnakerBootMessage
1✔
28
from .spalloc_proxied_connection import SpallocProxiedConnection
1✔
29

30
_ONE_SHORT = struct.Struct("<H")
1✔
31
_TWO_SHORTS = struct.Struct("<2H")
1✔
32
_TWO_SKIP: bytes = b'\0\0'
1✔
33
_NUM_UPDATE_TAG_TRIES = 3
1✔
34
_UPDATE_TAG_TIMEOUT = 1.0
1✔
35

36

37
class SpallocBootConnection(
1✔
38
        BootConnection, SpallocProxiedConnection,
39
        Listenable[SpinnakerBootMessage], metaclass=AbstractBase):
40
    """
41
    The socket interface supported by proxied boot sockets. The socket will
42
    always be talking to the root board of a job.
43
    This emulates a BootConnection.
44
    """
45
    __slots__ = ()
1✔
46

47
    @overrides(BootConnection.send_boot_message)
1✔
48
    def send_boot_message(self, boot_message: SpinnakerBootMessage):
1✔
49
        self.send(boot_message.bytestring)
×
50

51
        # Sleep between messages to avoid flooding the machine
52
        time.sleep(_ANTI_FLOOD_DELAY)
×
53

54
    @overrides(BootConnection.receive_boot_message)
1✔
55
    def receive_boot_message(
1✔
56
            self, timeout: Optional[float] = None) -> SpinnakerBootMessage:
57
        data = self.receive(timeout)
×
58
        return SpinnakerBootMessage.from_bytestring(data, 0)
×
59

60
    @overrides(Listenable.get_receive_method)
1✔
61
    def get_receive_method(  # type: ignore[override]
1✔
62
            self) -> Callable[[], SpinnakerBootMessage]:
63
        return self.receive_boot_message
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc