• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ChristianTremblay / BAC0 / 13463214558

21 Feb 2025 06:45PM UTC coverage: 41.108% (-0.2%) from 41.343%
13463214558

push

github

ChristianTremblay
format...

2219 of 5398 relevant lines covered (41.11%)

0.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.27
/BAC0/scripts/Base.py
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
5
# Licensed under LGPLv3, see file LICENSE in this source tree.
6
#
7
"""
8
Doc here
9
"""
10
import asyncio
1✔
11
import random
1✔
12
import sys
1✔
13
import typing as t
1✔
14
from collections import defaultdict
1✔
15

16
# --- bacpypes3 (third-party BACnet stack) modules ---
17
from bacpypes3.basetypes import DeviceStatus, HostNPort, ObjectTypesSupported
1✔
18
from bacpypes3.json.util import sequence_to_json
1✔
19
from bacpypes3.local.device import DeviceObject
1✔
20
from bacpypes3.local.networkport import NetworkPortObject
1✔
21
from bacpypes3.pdu import Address
1✔
22
from bacpypes3.primitivedata import CharacterString
1✔
23
from bacpypes3.vendor import VendorInfo, get_vendor_info
1✔
24

25
# --- this application's modules ---
26
from ..core.app.asyncApp import (
1✔
27
    BAC0Application,
28
)  # BAC0BBMDDeviceApplication,; BAC0ForeignDeviceApplication,
29
from ..core.functions.GetIPAddr import validate_ip_address
1✔
30
from ..core.functions.TimeSync import TimeHandler
1✔
31
from ..core.io.IOExceptions import InitializationError, UnknownObjectError
1✔
32
from ..core.utils.notes import note_and_log
1✔
33
from ..tasks.TaskManager import stopAllTasks
1✔
34

35
# --- 3rd party modules ---
36

37
# ------------------------------------------------------------------------------
38

39

40
@note_and_log
class LocalObjects:
    """
    Subscriptable accessor for the objects held by a device's application.

    ``local_objects[(obj_type, instance)]`` is resolved through the
    application's ``get_object_id`` and ``local_objects["name"]`` through
    its ``get_object_name``.
    """

    def __init__(self, device):
        # Only the reference is stored; the application is reached lazily
        # through ``device.this_application.app`` on each lookup.
        self.device = device

    def __getitem__(self, obj):
        """
        Return the local object matching ``obj``.

        :param obj: either a 2-tuple ``(obj_type, instance)`` or a ``str`` name.
        :raises UnknownObjectError: when the lookup yields ``None`` or when
            ``obj`` is neither a tuple nor a string.
        """
        found = None
        if isinstance(obj, str):
            found = self.device.this_application.app.get_object_name(obj)
        elif isinstance(obj, tuple):
            # Unpack first so a tuple of the wrong length fails right here.
            kind, number = obj
            found = self.device.this_application.app.get_object_id((kind, number))

        if found is not None:
            return found
        raise UnknownObjectError(f"Can't find {obj} in local device")
57

58

59
def charstring(val):
    """Return ``CharacterString(val)`` for ``str`` input; return anything else unchanged."""
    if not isinstance(val, str):
        return val
    return CharacterString(val)
61

62

63
@note_and_log
class Base:
    """
    Build a running BACnet/IP device that accepts WhoIs and IAm requests.

    Initialization requires some minimal information about the local device.
    The constructor stores the settings, then calls :meth:`startApp`, which
    builds the configuration and instantiates ``BAC0Application``.

    :param localIPAddr: address of the local interface (default
        ``Address("127.0.0.1/24")``). It must pass ``validate_ip_address``
        and must not already be in ``Base._used_ips``.
    :param networkNumber: stored on the instance as ``networkNumber``.
    :param localObjName: object name of the local device (default ``"BAC0"``).
    :param deviceId: device instance number. When falsy, ``3056177`` plus a
        random integer offset between 0 and 1000 is used.
    :param firmwareRevision: stored on the instance; defaults to a string
        derived from ``sys.version``.
    :param maxAPDULengthAccepted: stored on the instance (default ``"1024"``).
    :param maxSegmentsAccepted: stored on the instance (default ``"1024"``).
    :param segmentationSupported: stored on the instance
        (default ``"segmentedBoth"``).
    :param bbmdAddress: when not ``None`` the device is started in
        ``"foreign"`` mode and registers to this host.
    :param bbmdTTL: lifetime passed along with the foreign device registration.
    :param bdtable: when truthy (and ``bbmdAddress`` is ``None``) the device is
        started in ``"bbmd"`` mode and this table is handed to the application.
    :param modelName: model name written to the device configuration.
    :param vendorId: vendor identifier used to create/fetch the ``VendorInfo``.
    :param vendorName: stored on the instance as a ``CharacterString``.
    :param description: stored on the instance as a ``CharacterString``.
    :param location: stored on the instance as a ``CharacterString``.
    :param timezone: passed to ``TimeHandler(tz=...)``.
    :param json_file: forwarded to ``BAC0Application``.
    :raises InitializationError: if the address is already used, is invalid,
        or if :meth:`startApp` fails.
    """

    # Addresses currently claimed by running instances (shared by all instances).
    _used_ips: t.Set[Address] = set()
    _last_cov_identifier = 0
    # Class-level dict, hence shared by every instance.
    _running_cov_tasks = {}

    def __init__(
        self,
        localIPAddr: Address = Address("127.0.0.1/24"),
        networkNumber: t.Optional[int] = None,
        localObjName: str = "BAC0",
        deviceId: t.Optional[int] = None,
        firmwareRevision: str = "".join(sys.version.split("|")[:2]),
        maxAPDULengthAccepted: str = "1024",
        maxSegmentsAccepted: str = "1024",
        segmentationSupported: str = "segmentedBoth",
        bbmdAddress: t.Optional[str] = None,
        bbmdTTL: int = 0,
        bdtable: t.Optional[list] = None,
        modelName: str = "BAC0 Scripting Tool",
        vendorId: int = 842,
        vendorName: str = "SERVISYS inc.",
        description: str = "http://christiantremblay.github.io/BAC0/",
        location: str = "Bromont, Québec",
        timezone: str = "America/Montreal",
        json_file: t.Optional[str] = None,
    ):
        self.log("Configurating app", level="debug")

        # Register Servisys
        try:
            _BAC0_vendor = VendorInfo(vendorId)
        except RuntimeError:
            # we are re-running the script... forgive us: reuse the vendor
            # info that already exists for this identifier.
            _BAC0_vendor = get_vendor_info(vendorId)
        _BAC0_vendor.register_object_class(
            ObjectTypesSupported.networkPort, NetworkPortObject
        )
        _BAC0_vendor.register_object_class(ObjectTypesSupported.device, DeviceObject)

        self.timehandler = TimeHandler(tz=timezone)

        self.response = None
        self._initialized = False
        self._started = False
        self._stopped = False

        if localIPAddr in Base._used_ips:
            raise InitializationError(
                "IP Address provided ({}) already used by BAC0. Check if another software is using port 47808 on this network interface. If so, you can define multiple IP per interface. Or specify another IP using BAC0.lite(ip='IP/mask')".format(
                    localIPAddr
                )
            )

        if validate_ip_address(localIPAddr):
            self.localIPAddr = localIPAddr
        else:
            raise InitializationError(
                "IP Address provided ({}) invalid. Check if another software is using port 47808 on this network interface. If so, you can define multiple IP per interface. Or specify another IP using BAC0.lite(ip='IP/mask')".format(
                    localIPAddr
                )
            )
        self.networkNumber = networkNumber

        # NOTE(review): the truthiness test means deviceId=0 is treated like
        # "not provided" and replaced by a random id - confirm this is intended.
        self.Boid = (
            int(deviceId) if deviceId else (3056177 + int(random.uniform(0, 1000)))
        )

        self.segmentationSupported = segmentationSupported
        self.maxSegmentsAccepted = maxSegmentsAccepted
        self.localObjName = localObjName
        self.local_objects = LocalObjects(device=self)

        self.maxAPDULengthAccepted = maxAPDULengthAccepted
        self.vendorId = vendorId
        self.vendorName = charstring(vendorName)
        self.modelName = charstring(modelName)
        self.description = charstring(description)
        self.location = charstring(location)

        self.discoveredDevices: t.Optional[t.Dict[t.Tuple[str, int], int]] = None
        self.systemStatus = DeviceStatus(1)

        self.bbmdAddress = bbmdAddress
        self.bbmdTTL = bbmdTTL
        self.bdtable = bdtable

        self.firmwareRevision = firmwareRevision
        self._ric = {}
        self.subscription_contexts = {}
        self.database = None
        self.json_file = json_file

        try:
            self.startApp()
        except InitializationError as error:
            # Re-raise with the requested address, keeping the original cause.
            raise InitializationError(
                f"Gros probleme : {error}. Address requested : {localIPAddr}"
            ) from error

    def startApp(self):
        """
        Define the local device and create the BACnet application.

        Builds the configuration mapping from the instance attributes, picks
        the BACnet/IP mode (``"foreign"`` when ``bbmdAddress`` is set,
        ``"bbmd"`` when ``bdtable`` is truthy, ``"normal"`` otherwise),
        instantiates ``BAC0Application`` and records the address in
        ``Base._used_ips``.

        :raises InitializationError: when an ``OSError`` is raised while
            starting.
        """
        self.log("Create Local Device", level="debug")
        try:
            app_type = "BACnet/IP App"

            class config(defaultdict):
                "Simple class to mimic args dot retrieval"

                def __init__(self, cfg):
                    for k, v in cfg.items():
                        self[k] = v

                def __getattr__(self, key):
                    # Attribute access falls back to item access.
                    return self[key]

            if self.bbmdAddress is not None:
                mode = "foreign"
            elif self.bdtable:
                mode = "bbmd"
            else:
                mode = "normal"
            cfg = {
                "BAC0": {
                    "bbmdAddress": self.bbmdAddress,
                    "bdt": self.bdtable,
                    "ttl": self.bbmdTTL,
                },
                "device": {
                    "object-name": self.localObjName,
                    # "firmware-revision": self.firmwareRevision,
                    "vendor-identifier": self.vendorId,
                    # NOTE(review): hard-coded, self.vendorName is not used here.
                    "vendor-name": "Servisys inc.",
                    "object-identifier": f"device,{self.Boid}",
                    "object-list": [f"device,{self.Boid}", "network-port,1"],
                    "model-name": self.modelName,
                    # "max-apdu-length-accepted": self.maxAPDULengthAccepted,
                    # "max-segments-accepted": self.maxSegmentsAccepted,
                    # "location": self.location,
                    # "description": self.description
                },
                "network-port": {
                    "ip-address": str(self.localIPAddr),
                    "ip-subnet-mask": str(self.localIPAddr.netmask),
                    "bacnet-ip-udp-port": self.localIPAddr.addrPort,
                    # NOTE(review): self.networkNumber is not forwarded here.
                    "network-number": None,
                    "fd-bbmd-address": sequence_to_json(HostNPort(self.bbmdAddress)),
                    "fd-subscription-lifetime": self.bbmdTTL,
                    "bacnet-ip-mode": mode,
                },
            }
            if mode == "bbmd":
                # bdt_json_seq = [f"BDTEntry({addr})" for addr in self.bdtable]
                cfg["network-port"]["bbmdBroadcastDistributionTable"] = self.bdtable

            _cfg = config(cfg)

            self.this_application = BAC0Application(
                _cfg, self.localIPAddr, json_file=self.json_file
            )
            if mode == "bbmd":
                self._log.info(f"Populating BDT with {self.bdtable}")
                self.this_application.populate_bdt()

            if mode == "foreign":
                self._log.info(
                    f"Registering as a foreign device to host {self.bbmdAddress} for {self.bbmdTTL} seconds"
                )
                self.this_application.register_as_foreign_device_to(
                    host=self.bbmdAddress, lifetime=self.bbmdTTL
                )

            self.log("Starting", level="debug")
            self._initialized = True

            try:
                Base._used_ips.add(self.localIPAddr)
                self.log(f"Registered as {app_type} | mode {mode}", level="info")
                self._started = True
            except OSError as error:
                self.log(f"Error opening socket: {error}", level="warning")
                raise InitializationError(f"Error opening socket: {error}") from error
            self.log("Running", level="debug")
        except OSError as error:
            self.log(f"an error has occurred: {error}", level="error")
            raise InitializationError(f"Error starting app: {error}") from error

    def register_foreign_device(self, addr=None, ttl=0):
        """Not implemented: always raises ``NotImplementedError``."""
        # self.this_application.register_to_bbmd(addr, ttl)
        raise NotImplementedError()

    def unregister_foreign_device(self):
        """Call ``unregister_from_bbmd()`` on the application."""
        self.this_application.unregister_from_bbmd()

    def disconnect(self) -> asyncio.Task:
        """
        Schedule :meth:`_disconnect` as an asyncio task and return that task.

        ``asyncio.create_task`` needs a running event loop, so this must be
        called from within one.
        """
        task = asyncio.create_task(self._disconnect())
        return task

    async def _disconnect(self):
        """
        Stop the BACnet stack.  Free the IP socket.

        Awaits ``stopAllTasks()``, closes the application, updates the state
        flags and removes the address from ``Base._used_ips``.
        """
        self.log("Stopping All running tasks", level="debug")
        await stopAllTasks()
        self.log("Stopping BACnet stack", level="debug")
        # Freeing socket
        self.this_application.app.close()

        self._stopped = True
        # self.t.join()
        self._started = False
        # Release the address so another instance may use it.
        Base._used_ips.discard(self.localIPAddr)
        self.log("BACnet stopped", level="info")

    @property
    def routing_table(self):
        """
        Routing Table will give all the details about routers and how they
        connect BACnet networks together.

        It's a decoded presentation of the application's
        ``nsap.router_info_cache``.

        Returns a dict with the address of routers (as ``str``) as key and a
        ``Router`` description as value. The dict is rebuilt on every access.
        """

        class Router:
            """Plain record describing one router entry."""

            def __init__(self, snet, address, dnets, path=None):
                self.source_network: int = snet
                self.address: Address = address
                self.destination_networks: set = dnets
                self.path: list = path

            def __repr__(self):
                return "Source Network: {} | Address: {} | Destination Networks: {} | Path: {}".format(
                    self.source_network,
                    self.address,
                    self.destination_networks,
                    self.path,
                )

        self._routers = {}

        self._ric = self.this_application.app.nsap.router_info_cache

        # First pass: one Router per (source network, address) key.
        for router, dnets in self._ric.router_dnets.items():
            snet, address = router
            self._routers[str(address)] = Router(snet, address, dnets, path=[])
        # Second pass: attach each known path and its status to its router.
        for path, router_info in self._ric.path_info.items():
            router_address, router_status = router_info
            snet, dnet = path
            self._routers[str(router_address)].path.append((path, router_status))

        return self._routers

    @staticmethod
    def extract_value_from_primitive_data(value):
        """
        Convert a value to the matching plain Python type.

        ``float``, ``int`` and ``str`` instances (including subclasses) are
        passed through the corresponding builtin constructor; anything else
        is returned unchanged. ``bool`` is an ``int`` subclass, so booleans
        come back as ``int``.

        Declared as a static method: the function takes the value as its only
        argument, so a ``classmethod`` would have bound the class to ``value``
        and rejected any call that actually passes a value.
        """
        if isinstance(value, float):
            return float(value)
        # elif isinstance(value, Boolean):
        #    if value == int(1):
        #        return True
        #    else:
        #        return False
        elif isinstance(value, int):
            return int(value)
        elif isinstance(value, str):
            return str(value)
        else:
            return value
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc