• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ChristianTremblay / BAC0 / 17868458554

19 Sep 2025 07:57PM UTC coverage: 39.741% (-1.0%) from 40.7%
17868458554

Pull #567

github

web-flow
Merge c1290eebb into df4d3e57e
Pull Request #567: Task fix + Mypy

150 of 437 new or added lines in 24 files covered. (34.32%)

19 existing lines in 9 files now uncovered.

2270 of 5712 relevant lines covered (39.74%)

0.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.67
/BAC0/scripts/Base.py
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
5
# Licensed under LGPLv3, see file LICENSE in this source tree.
6
#
7
"""
8
Doc here
9
"""
10
import asyncio
1✔
11
import random
1✔
12
import sys
1✔
13
import typing as t
1✔
14
from collections import defaultdict
1✔
15

16
# --- standard Python modules ---
17
from bacpypes3.basetypes import DeviceStatus, HostNPort, ObjectTypesSupported
1✔
18
from bacpypes3.json.util import sequence_to_json
1✔
19
from bacpypes3.local.device import DeviceObject
1✔
20
from bacpypes3.local.networkport import NetworkPortObject
1✔
21
from bacpypes3.pdu import Address
1✔
22
from bacpypes3.primitivedata import CharacterString, ObjectIdentifier
1✔
23
from bacpypes3.vendor import VendorInfo, get_vendor_info
1✔
24

25
# --- this application's modules ---
26
from ..core.app.asyncApp import (
1✔
27
    BAC0Application,
28
)  # BAC0BBMDDeviceApplication,; BAC0ForeignDeviceApplication,
29
from ..core.functions.GetIPAddr import validate_ip_address
1✔
30
from ..core.functions.TimeSync import TimeHandler
1✔
31
from ..core.io.IOExceptions import InitializationError, UnknownObjectError
1✔
32
from ..core.utils.notes import note_and_log
1✔
33
from ..tasks.TaskManager import stopAllTasks
1✔
34

35
# --- 3rd party modules ---
36

37
# ------------------------------------------------------------------------------
38

39

40
@note_and_log
1✔
41
class LocalObjects(object):
1✔
42
    def __init__(self, device):
1✔
43
        self.device = device
1✔
44

45
    def __getitem__(self, obj):
1✔
46
        item = None
×
47
        if isinstance(obj, tuple):
×
48
            obj_type, instance = obj
×
49
            item = self.device.this_application.app.get_object_id((obj_type, instance))
×
50
        elif isinstance(obj, str):
×
51
            name = obj
×
52
            item = self.device.this_application.app.get_object_name(name)
×
53
        if item is None:
×
54
            raise UnknownObjectError(f"Can't find {obj} in local device")
×
55
        else:
56
            return item
×
57

58

59
def charstring(val):
1✔
60
    return CharacterString(val) if isinstance(val, str) else val
1✔
61

62
class DiscoveredDevice(t.TypedDict):
1✔
63
    object_instance: ObjectIdentifier
1✔
64
    address: Address
1✔
65
    network_number: t.Set[int]
1✔
66
    vendor_id: int
1✔
67
    vendor_name: str
1✔
68

69
@note_and_log
1✔
70
class Base:
1✔
71
    """
72
    Build a running BACnet/IP device that accepts WhoIs and IAm requests
73
    Initialization requires some minimial information about the local device.
74

75
    :param localIPAddr='127.0.0.1':
76
    :param localObjName='BAC0':
77
    :param deviceId=None:
78
    :param maxAPDULengthAccepted='1024':
79
    :param maxSegmentsAccepted='1024':
80
    :param segmentationSupported='segmentedBoth':
81
    """
82

83
    _used_ips: t.Set[Address] = set()
1✔
84
    _last_cov_identifier = 0
1✔
85
    _running_cov_tasks = {}
1✔
86

87
    def __init__(
1✔
88
        self,
89
        localIPAddr: Address = Address("127.0.0.1/24"),
90
        networkNumber: t.Optional[int] = None,
91
        localObjName: str = "BAC0",
92
        deviceId: t.Optional[int] = None,
93
        firmwareRevision: str = "".join(sys.version.split("|")[:2]),
94
        maxAPDULengthAccepted: str = "1024",
95
        maxSegmentsAccepted: str = "1024",
96
        segmentationSupported: str = "segmentedBoth",
97
        bbmdAddress: t.Optional[str] = None,
98
        bbmdTTL: t.Optional[int] = 0,
99
        bdtable: t.Optional[list] = None,
100
        modelName: str = "BAC0 Scripting Tool",
101
        vendorId: int = 842,
102
        vendorName: str = "SERVISYS inc.",
103
        description: str = "http://christiantremblay.github.io/BAC0/",
104
        location: str = "Bromont, Québec",
105
        timezone: str = "America/Montreal",
106
        json_file: t.Optional[str] = None,
107
    ):
108
        self.log("Configurating app", level="debug")
1✔
109

110
        # Register Servisys
111
        try:
1✔
112
            _BAC0_vendor = VendorInfo(vendorId)
1✔
113
        except RuntimeError:
1✔
114
            pass  # we are re-running the script... forgive us
1✔
115
            _BAC0_vendor = get_vendor_info(vendorId)
1✔
116
        _BAC0_vendor.register_object_class(
1✔
117
            ObjectTypesSupported.networkPort, NetworkPortObject
118
        )
119
        _BAC0_vendor.register_object_class(ObjectTypesSupported.device, DeviceObject)
1✔
120

121
        self.timehandler = TimeHandler(tz=timezone)
1✔
122

123
        self.response = None
1✔
124
        self._initialized = False
1✔
125
        self._started = False
1✔
126
        self._stopped = False
1✔
127

128
        if localIPAddr in Base._used_ips:
1✔
129
            raise InitializationError(
×
130
                "IP Address provided ({}) already used by BAC0. Check if another software is using port 47808 on this network interface. If so, you can define multiple IP per interface. Or specify another IP using BAC0.lite(ip='IP/mask')".format(
131
                    localIPAddr
132
                )
133
            )
134

135
        if validate_ip_address(localIPAddr):
1✔
136
            self.localIPAddr = localIPAddr
1✔
137
        else:
138
            raise InitializationError(
×
139
                "IP Address provided ({}) invalid. Check if another software is using port 47808 on this network interface. If so, you can define multiple IP per interface. Or specify another IP using BAC0.lite(ip='IP/mask')".format(
140
                    localIPAddr
141
                )
142
            )
143
        self.networkNumber = networkNumber
1✔
144

145
        self.Boid = (
1✔
146
            int(deviceId) if deviceId else (3056177 + int(random.uniform(0, 1000)))
147
        )
148

149
        self.segmentationSupported = segmentationSupported
1✔
150
        self.maxSegmentsAccepted = maxSegmentsAccepted
1✔
151
        self.localObjName = localObjName
1✔
152
        self.local_objects = LocalObjects(device=self)
1✔
153

154
        self.maxAPDULengthAccepted = maxAPDULengthAccepted
1✔
155
        self.vendorId = vendorId
1✔
156
        self.vendorName = charstring(vendorName)
1✔
157
        self.modelName = charstring(modelName)
1✔
158
        self.description = charstring(description)
1✔
159
        self.location = charstring(location)
1✔
160

161
        self.discoveredDevices: t.Optional[t.Dict[str, DiscoveredDevice]] = None
1✔
162
        self.systemStatus = DeviceStatus(1)
1✔
163

164
        self.bbmdAddress = bbmdAddress
1✔
165
        self.bbmdTTL = bbmdTTL
1✔
166
        self.bdtable = bdtable
1✔
167

168
        self.firmwareRevision = firmwareRevision
1✔
169
        self._ric = {}
1✔
170
        self.subscription_contexts = {}
1✔
171
        # Cannot reference db.InfluxDB directly since it's an optional import
172
        self.database: t.Optional[t.Any] = None
1✔
173
        self.json_file = json_file
1✔
174

175
        try:
1✔
176
            self.startApp()
1✔
177
        except InitializationError as error:
×
178
            raise InitializationError(
×
179
                f"Gros probleme : {error}. Address requested : {localIPAddr}"
180
            )
181

182
    def startApp(self):
1✔
183
        """
184
        Define the local device, including services supported.
185
        Once defined, start the BACnet stack in its own thread.
186
        """
187
        self.log("Create Local Device", level="debug")
1✔
188
        try:
1✔
189
            app_type = "BACnet/IP App"
1✔
190

191
            class config(defaultdict):
1✔
192
                "Simple class to mimic args dot retrieval"
193

194
                def __init__(self, cfg):
1✔
195
                    for k, v in cfg.items():
1✔
196
                        self[k] = v
1✔
197

198
                def __getattr__(self, key):
1✔
199
                    return self[key]
×
200

201
            if self.bbmdAddress is not None:
1✔
202
                mode = "foreign"
×
203
            elif self.bdtable:
1✔
204
                mode = "bbmd"
×
205
            else:
206
                mode = "normal"
1✔
207
            cfg = {
1✔
208
                "BAC0": {
209
                    "bbmdAddress": self.bbmdAddress,
210
                    "bdt": self.bdtable,
211
                    "ttl": self.bbmdTTL,
212
                },
213
                "device": {
214
                    "object-name": self.localObjName,
215
                    # "firmware-revision": self.firmwareRevision,
216
                    "vendor-identifier": self.vendorId,
217
                    "vendor-name": "Servisys inc.",
218
                    "object-identifier": f"device,{self.Boid}",
219
                    "object-list": [f"device,{self.Boid}", "network-port,1"],
220
                    "model-name": self.modelName,
221
                    # "max-apdu-length-accepted": self.maxAPDULengthAccepted,
222
                    # "max-segments-accepted": self.maxSegmentsAccepted,
223
                    # "location": self.location,
224
                    # "description": self.description
225
                },
226
                "network-port": {
227
                    "ip-address": str(self.localIPAddr),
228
                    "ip-subnet-mask": str(self.localIPAddr.netmask),
229
                    "bacnet-ip-udp-port": self.localIPAddr.addrPort,
230
                    "network-number": None,
231
                    "fd-bbmd-address": sequence_to_json(HostNPort(self.bbmdAddress)),
232
                    "fd-subscription-lifetime": self.bbmdTTL,
233
                    "bacnet-ip-mode": mode,
234
                },
235
            }
236
            if mode == "bbmd":
1✔
237
                # bdt_json_seq = [f"BDTEntry({addr})" for addr in self.bdtable]
238
                cfg["network-port"]["bbmdBroadcastDistributionTable"] = self.bdtable
×
239

240
            _cfg = config(cfg)
1✔
241

242
            self.this_application = BAC0Application(
1✔
243
                _cfg, self.localIPAddr, json_file=self.json_file
244
            )
245
            if mode == "bbmd":
1✔
246
                self._log.info(f"Populating BDT with {self.bdtable}")
×
247
                self.this_application.populate_bdt()
×
248

249
            if mode == "foreign":
1✔
250
                self._log.info(
×
251
                    f"Registering as a foreign device to host {self.bbmdAddress} for {self.bbmdTTL} seconds"
252
                )
NEW
253
                if self.bbmdAddress is None or self.bbmdTTL is None:
×
NEW
254
                    raise ValueError("Missing bbmdAddress and/or bbmdTTL")
×
UNCOV
255
                self.this_application.register_as_foreign_device_to(
×
256
                    host=self.bbmdAddress, lifetime=self.bbmdTTL
257
                )
258

259
            self.log("Starting", level="debug")
1✔
260
            self._initialized = True
1✔
261

262
            try:
1✔
263
                Base._used_ips.add(self.localIPAddr)
1✔
264
                self.log(f"Registered as {app_type} | mode {mode}", level="info")
1✔
265
                self._started = True
1✔
266
            except OSError as error:
×
267
                self.log(f"Error opening socket: {error}", level="warning")
×
268
                raise InitializationError(f"Error opening socket: {error}")
×
269
            self.log("Running", level="debug")
1✔
270
        except OSError as error:
×
271
            self.log(f"an error has occurred: {error}", level="error")
×
272
            raise InitializationError(f"Error starting app: {error}")
×
273
            self.log("finally", level="debug")
274

275
    def register_foreign_device(self, addr=None, ttl=0):
1✔
276
        # self.this_application.register_to_bbmd(addr, ttl)
277
        raise NotImplementedError()
×
278

279
    def unregister_foreign_device(self):
1✔
280
        self.this_application.unregister_from_bbmd()
×
281

282
    def disconnect(self) -> asyncio.Task:
1✔
283
        task = asyncio.create_task(self._disconnect())
×
284
        return task
×
285

286
    async def _disconnect(self):
1✔
287
        """
288
        Stop the BACnet stack.  Free the IP socket.
289
        """
290
        self.log("Stopping All running tasks", level="debug")
1✔
291
        await stopAllTasks()
1✔
292
        self.log("Stopping BACnet stack", level="debug")
1✔
293
        # Freeing socket
294
        self.this_application.app.close()
1✔
295

296
        self._stopped = True  # Stop stack thread
1✔
297
        # self.t.join()
298
        self._started = False
1✔
299
        Base._used_ips.discard(self.localIPAddr)
1✔
300
        self.log("BACnet stopped", level="info")
1✔
301

302
    @property
1✔
303
    def routing_table(self):
1✔
304
        """
305
        Routing Table will give all the details about routers and how they
306
        connect BACnet networks together.
307

308
        It's a decoded presentation of what bacpypes.router_info_cache contains.
309

310
        Returns a dict with the address of routers as key.
311
        """
312

313
        class Router:
×
314
            def __init__(self, snet, address, dnets, path=None):
×
315
                self.source_network: int = snet
×
316
                self.address: Address = address
×
317
                self.destination_networks: set = dnets
×
318
                self.path: list = path
×
319

320
            def __repr__(self):
×
321
                return "Source Network: {} | Address: {} | Destination Networks: {} | Path: {}".format(
×
322
                    self.source_network,
323
                    self.address,
324
                    self.destination_networks,
325
                    self.path,
326
                )
327

328
        self._routers = {}
×
329

330
        self._ric = self.this_application.app.nsap.router_info_cache
×
331

332
        for router, dnets in self._ric.router_dnets.items():
×
333
            snet, address = router
×
334
            self._routers[str(address)] = Router(snet, address, dnets, path=[])
×
335
        for path, router_info in self._ric.path_info.items():
×
336
            router_address, router_status = router_info
×
337
            snet, dnet = path
×
338
            self._routers[str(router_address)].path.append((path, router_status))
×
339

340
        return self._routers
×
341

342
    @classmethod
1✔
343
    def extract_value_from_primitive_data(cls, value):
1✔
344
        if isinstance(value, float):
×
345
            return float(value)
×
346
        # elif isinstance(value, Boolean):
347
        #    if value == int(1):
348
        #        return True
349
        #    else:
350
        #        return False
351
        elif isinstance(value, int):
×
352
            return int(value)
×
353
        elif isinstance(value, str):
×
354
            return str(value)
×
355
        else:
356
            return value
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc