• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ChristianTremblay / BAC0 / 10785693407

10 Sep 2024 04:18AM UTC coverage: 40.293% (-7.1%) from 47.384%
10785693407

push

github

ChristianTremblay
coovers empty list of discovered devices

0 of 27 new or added lines in 1 file covered. (0.0%)

1874 existing lines in 29 files now uncovered.

2144 of 5321 relevant lines covered (40.29%)

0.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.35
/BAC0/scripts/Base.py
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
5
# Licensed under LGPLv3, see file LICENSE in this source tree.
6
#
7
"""
8
Doc here
9
"""
10
import asyncio
1✔
11
import random
1✔
12
import sys
1✔
13
import typing as t
1✔
14
from collections import defaultdict
1✔
15

16
# --- standard Python modules ---
17
from bacpypes3.basetypes import DeviceStatus, HostNPort, ObjectTypesSupported
1✔
18
from bacpypes3.json.util import sequence_to_json
1✔
19
from bacpypes3.local.device import DeviceObject
1✔
20
from bacpypes3.local.networkport import NetworkPortObject
1✔
21
from bacpypes3.pdu import Address
1✔
22
from bacpypes3.primitivedata import CharacterString
1✔
23
from bacpypes3.vendor import VendorInfo, get_vendor_info
1✔
24

25
# --- this application's modules ---
26
from ..core.app.asyncApp import (
1✔
27
    BAC0Application,
28
)  # BAC0BBMDDeviceApplication,; BAC0ForeignDeviceApplication,
29
from ..core.functions.GetIPAddr import validate_ip_address
1✔
30
from ..core.functions.TimeSync import TimeHandler
1✔
31
from ..core.io.IOExceptions import InitializationError, UnknownObjectError
1✔
32
from ..core.utils.notes import note_and_log
1✔
33
from ..tasks.TaskManager import stopAllTasks
1✔
34

35
# --- 3rd party modules ---
36

37
# ------------------------------------------------------------------------------
38

39

40
@note_and_log
1✔
41
class LocalObjects(object):
1✔
42
    def __init__(self, device):
1✔
43
        self.device = device
1✔
44

45
    def __getitem__(self, obj):
1✔
UNCOV
46
        item = None
×
UNCOV
47
        if isinstance(obj, tuple):
×
UNCOV
48
            obj_type, instance = obj
×
UNCOV
49
            item = self.device.this_application.app.get_object_id((obj_type, instance))
×
UNCOV
50
        elif isinstance(obj, str):
×
UNCOV
51
            name = obj
×
UNCOV
52
            item = self.device.this_application.app.get_object_name(name)
×
UNCOV
53
        if item is None:
×
UNCOV
54
            raise UnknownObjectError(f"Can't find {obj} in local device")
×
55
        else:
UNCOV
56
            return item
×
57

58

59
def charstring(val):
1✔
60
    return CharacterString(val) if isinstance(val, str) else val
1✔
61

62

63
@note_and_log
1✔
64
class Base:
1✔
65
    """
66
    Build a running BACnet/IP device that accepts WhoIs and IAm requests
67
    Initialization requires some minimial information about the local device.
68

69
    :param localIPAddr='127.0.0.1':
70
    :param localObjName='BAC0':
71
    :param deviceId=None:
72
    :param maxAPDULengthAccepted='1024':
73
    :param maxSegmentsAccepted='1024':
74
    :param segmentationSupported='segmentedBoth':
75
    """
76

77
    _used_ips: t.Set[Address] = set()
1✔
78

79
    def __init__(
1✔
80
        self,
81
        localIPAddr: str = "127.0.0.1",
82
        networkNumber: int = None,
83
        localObjName: str = "BAC0",
84
        deviceId: int = None,
85
        firmwareRevision: str = "".join(sys.version.split("|")[:2]),
86
        maxAPDULengthAccepted: str = "1024",
87
        maxSegmentsAccepted: str = "1024",
88
        segmentationSupported: str = "segmentedBoth",
89
        bbmdAddress: str = None,
90
        bbmdTTL: int = 0,
91
        bdtable: list = None,
92
        modelName: str = "BAC0 Scripting Tool",
93
        vendorId: int = 842,
94
        vendorName: str = "SERVISYS inc.",
95
        description: str = "http://christiantremblay.github.io/BAC0/",
96
        location: str = "Bromont, Québec",
97
        timezone: str = "America/Montreal",
98
        json_file: str = None,
99
    ):
100
        self.log("Configurating app", level="debug")
1✔
101

102
        # Register Servisys
103
        try:
1✔
104
            _BAC0_vendor = VendorInfo(vendorId)
1✔
105
        except RuntimeError:
1✔
106
            pass  # we are re-running the script... forgive us
1✔
107
            _BAC0_vendor = get_vendor_info(vendorId)
1✔
108
        _BAC0_vendor.register_object_class(
1✔
109
            ObjectTypesSupported.networkPort, NetworkPortObject
110
        )
111
        _BAC0_vendor.register_object_class(ObjectTypesSupported.device, DeviceObject)
1✔
112

113
        self.timehandler = TimeHandler(tz=timezone)
1✔
114

115
        self.response = None
1✔
116
        self._initialized = False
1✔
117
        self._started = False
1✔
118
        self._stopped = False
1✔
119

120
        if localIPAddr in Base._used_ips:
1✔
UNCOV
121
            raise InitializationError(
×
122
                "IP Address provided ({}) already used by BAC0. Check if another software is using port 47808 on this network interface. If so, you can define multiple IP per interface. Or specify another IP using BAC0.lite(ip='IP/mask')".format(
123
                    localIPAddr
124
                )
125
            )
126

127
        if validate_ip_address(localIPAddr):
1✔
128
            self.localIPAddr = localIPAddr
1✔
129
        else:
UNCOV
130
            raise InitializationError(
×
131
                "IP Address provided ({}) invalid. Check if another software is using port 47808 on this network interface. If so, you can define multiple IP per interface. Or specify another IP using BAC0.lite(ip='IP/mask')".format(
132
                    localIPAddr
133
                )
134
            )
135
        self.networkNumber = networkNumber
1✔
136

137
        self.Boid = (
1✔
138
            int(deviceId) if deviceId else (3056177 + int(random.uniform(0, 1000)))
139
        )
140

141
        self.segmentationSupported = segmentationSupported
1✔
142
        self.maxSegmentsAccepted = maxSegmentsAccepted
1✔
143
        self.localObjName = localObjName
1✔
144
        self.local_objects = LocalObjects(device=self)
1✔
145

146
        self.maxAPDULengthAccepted = maxAPDULengthAccepted
1✔
147
        self.vendorId = vendorId
1✔
148
        self.vendorName = charstring(vendorName)
1✔
149
        self.modelName = charstring(modelName)
1✔
150
        self.description = charstring(description)
1✔
151
        self.location = charstring(location)
1✔
152

153
        self.discoveredDevices: t.Optional[t.Dict[t.Tuple[str, int], int]] = None
1✔
154
        self.systemStatus = DeviceStatus(1)
1✔
155

156
        self.bbmdAddress = bbmdAddress
1✔
157
        self.bbmdTTL = bbmdTTL
1✔
158
        self.bdtable = bdtable
1✔
159

160
        self.firmwareRevision = firmwareRevision
1✔
161
        self._ric = {}
1✔
162
        self.subscription_contexts = {}
1✔
163
        self.database = None
1✔
164
        self.json_file = json_file
1✔
165

166
        try:
1✔
167
            self.startApp()
1✔
UNCOV
168
        except InitializationError as error:
×
UNCOV
169
            raise InitializationError(
×
170
                f"Gros probleme : {error}. Address requested : {localIPAddr}"
171
            )
172

173
    def startApp(self):
1✔
174
        """
175
        Define the local device, including services supported.
176
        Once defined, start the BACnet stack in its own thread.
177
        """
178
        self.log("Create Local Device", level="debug")
1✔
179
        try:
1✔
180
            app_type = "BACnet/IP App"
1✔
181

182
            class config(defaultdict):
1✔
183
                "Simple class to mimic args dot retrieval"
184

185
                def __init__(self, cfg):
1✔
186
                    for k, v in cfg.items():
1✔
187
                        self[k] = v
1✔
188

189
                def __getattr__(self, key):
1✔
UNCOV
190
                    return self[key]
×
191

192
            if self.bbmdAddress is not None:
1✔
UNCOV
193
                mode = "foreign"
×
194
            elif self.bdtable:
1✔
UNCOV
195
                mode = "bbmd"
×
196
            else:
197
                mode = "normal"
1✔
198
            cfg = {
1✔
199
                "BAC0": {
200
                    "bbmdAddress": self.bbmdAddress,
201
                    "bdt": self.bdtable,
202
                    "ttl": self.bbmdTTL,
203
                },
204
                "device": {
205
                    "object-name": self.localObjName,
206
                    # "firmware-revision": self.firmwareRevision,
207
                    "vendor-identifier": self.vendorId,
208
                    "vendor-name": "Servisys inc.",
209
                    "object-identifier": f"device,{self.Boid}",
210
                    "object-list": [f"device,{self.Boid}", "network-port,1"],
211
                    "model-name": self.modelName,
212
                    # "max-apdu-length-accepted": self.maxAPDULengthAccepted,
213
                    # "max-segments-accepted": self.maxSegmentsAccepted,
214
                    # "location": self.location,
215
                    # "description": self.description
216
                },
217
                "network-port": {
218
                    "ip-address": str(self.localIPAddr),
219
                    "ip-subnet-mask": str(self.localIPAddr.netmask),
220
                    "bacnet-ip-udp-port": self.localIPAddr.addrPort,
221
                    "network-number": None,
222
                    "fd-bbmd-address": sequence_to_json(HostNPort(self.bbmdAddress)),
223
                    "fd-subscription-lifetime": self.bbmdTTL,
224
                    "bacnet-ip-mode": mode,
225
                },
226
            }
227
            if mode == "bbmd":
1✔
228
                # bdt_json_seq = [f"BDTEntry({addr})" for addr in self.bdtable]
UNCOV
229
                cfg["network-port"]["bbmdBroadcastDistributionTable"] = self.bdtable
×
230

231
            _cfg = config(cfg)
1✔
232

233
            self.this_application = BAC0Application(
1✔
234
                _cfg, self.localIPAddr, json_file=self.json_file
235
            )
236
            if mode == "bbmd":
1✔
UNCOV
237
                self._log.info(f"Populating BDT with {self.bdtable}")
×
UNCOV
238
                self.this_application.populate_bdt()
×
239

240
            if mode == "foreign":
1✔
UNCOV
241
                self._log.info(
×
242
                    f"Registering as a foreign device to host {self.bbmdAddress} for {self.bbmdTTL} seconds"
243
                )
UNCOV
244
                self.this_application.register_as_foreign_device_to(
×
245
                    host=self.bbmdAddress, lifetime=self.bbmdTTL
246
                )
247

248
            self.log("Starting", level="debug")
1✔
249
            self._initialized = True
1✔
250

251
            try:
1✔
252
                Base._used_ips.add(self.localIPAddr)
1✔
253
                self.log(f"Registered as {app_type} | mode {mode}", level="info")
1✔
254
                self._started = True
1✔
UNCOV
255
            except OSError as error:
×
UNCOV
256
                self.log(f"Error opening socket: {error}", level="warning")
×
UNCOV
257
                raise InitializationError(f"Error opening socket: {error}")
×
258
            self.log("Running", level="debug")
1✔
259
        except OSError as error:
×
UNCOV
260
            self.log(f"an error has occurred: {error}", level="error")
×
UNCOV
261
            raise InitializationError(f"Error starting app: {error}")
×
262
            self.log("finally", level="debug")
263

264
    def register_foreign_device(self, addr=None, ttl=0):
1✔
265
        # self.this_application.register_to_bbmd(addr, ttl)
UNCOV
266
        raise NotImplementedError()
×
267

268
    def unregister_foreign_device(self):
1✔
UNCOV
269
        self.this_application.unregister_from_bbmd()
×
270

271
    def disconnect(self) -> asyncio.Task:
1✔
UNCOV
272
        task = asyncio.create_task(self._disconnect())
×
UNCOV
273
        return task
×
274

275
    async def _disconnect(self):
1✔
276
        """
277
        Stop the BACnet stack.  Free the IP socket.
278
        """
279
        self.log("Stopping All running tasks", level="debug")
1✔
280
        await stopAllTasks()
1✔
281
        self.log("Stopping BACnet stack", level="debug")
1✔
282
        # Freeing socket
283
        self.this_application.app.close()
1✔
284

285
        self._stopped = True  # Stop stack thread
1✔
286
        # self.t.join()
287
        self._started = False
1✔
288
        Base._used_ips.discard(self.localIPAddr)
1✔
289
        self.log("BACnet stopped", level="info")
1✔
290

291
    @property
1✔
292
    def routing_table(self):
1✔
293
        """
294
        Routing Table will give all the details about routers and how they
295
        connect BACnet networks together.
296

297
        It's a decoded presentation of what bacpypes.router_info_cache contains.
298

299
        Returns a dict with the address of routers as key.
300
        """
301

UNCOV
302
        class Router:
×
UNCOV
303
            def __init__(self, snet, address, dnets, path=None):
×
UNCOV
304
                self.source_network: int = snet
×
305
                self.address: Address = address
×
306
                self.destination_networks: set = dnets
×
307
                self.path: list = path
×
308

UNCOV
309
            def __repr__(self):
×
UNCOV
310
                return "Source Network: {} | Address: {} | Destination Networks: {} | Path: {}".format(
×
311
                    self.source_network,
312
                    self.address,
313
                    self.destination_networks,
314
                    self.path,
315
                )
316

UNCOV
317
        self._routers = {}
×
318

UNCOV
319
        self._ric = self.this_application.app.nsap.router_info_cache
×
320

UNCOV
321
        for router, dnets in self._ric.router_dnets.items():
×
UNCOV
322
            snet, address = router
×
UNCOV
323
            self._routers[str(address)] = Router(snet, address, dnets, path=[])
×
UNCOV
324
        for path, router_info in self._ric.path_info.items():
×
UNCOV
325
            router_address, router_status = router_info
×
UNCOV
326
            snet, dnet = path
×
UNCOV
327
            self._routers[str(router_address)].path.append((path, router_status))
×
328

329
        return self._routers
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc