• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ChristianTremblay / BAC0 / 9040788442

11 May 2024 03:25AM UTC coverage: 40.97% (+0.04%) from 40.935%
9040788442

push

github

ChristianTremblay
Github action must install bacpypes3, not the old branch from the time pypi version was bad...

2078 of 5072 relevant lines covered (40.97%)

0.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.0
/BAC0/scripts/Base.py
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
5
# Licensed under LGPLv3, see file LICENSE in this source tree.
6
#
7
"""
1✔
8
Doc here
9
"""
10
import asyncio
1✔
11
import random
1✔
12
import sys
1✔
13
import typing as t
1✔
14
from collections import defaultdict
1✔
15

16
# --- standard Python modules ---
17
from bacpypes3.basetypes import DeviceStatus, HostNPort, ObjectTypesSupported
1✔
18
from bacpypes3.json.util import sequence_to_json
1✔
19
from bacpypes3.local.device import DeviceObject
1✔
20
from bacpypes3.local.networkport import NetworkPortObject
1✔
21
from bacpypes3.pdu import Address
1✔
22
from bacpypes3.primitivedata import CharacterString
1✔
23
from bacpypes3.vendor import VendorInfo, get_vendor_info
1✔
24

25
# --- this application's modules ---
26
from ..core.app.asyncApp import (
1✔
27
    BAC0Application,
28
)  # BAC0BBMDDeviceApplication,; BAC0ForeignDeviceApplication,
29
from ..core.functions.GetIPAddr import validate_ip_address
1✔
30
from ..core.functions.TimeSync import TimeHandler
1✔
31
from ..core.io.IOExceptions import InitializationError, UnknownObjectError
1✔
32
from ..core.utils.notes import note_and_log
1✔
33
from ..tasks.TaskManager import stopAllTasks
1✔
34

35
# --- 3rd party modules ---
36

37
# ------------------------------------------------------------------------------
38

39

40
@note_and_log
1✔
41
class LocalObjects(object):
1✔
42
    def __init__(self, device):
1✔
43
        self.device = device
1✔
44

45
    def __getitem__(self, obj):
1✔
46
        item = None
×
47
        if isinstance(obj, tuple):
×
48
            obj_type, instance = obj
×
49
            item = self.device.this_application.app.get_object_id((obj_type, instance))
×
50
        elif isinstance(obj, str):
×
51
            name = obj
×
52
            item = self.device.this_application.app.get_object_name(name)
×
53
        if item is None:
×
54
            raise UnknownObjectError(f"Can't find {obj} in local device")
×
55
        else:
56
            return item
×
57

58

59
def charstring(val):
1✔
60
    return CharacterString(val) if isinstance(val, str) else val
1✔
61

62

63
@note_and_log
1✔
64
class Base:
1✔
65
    """
66
    Build a running BACnet/IP device that accepts WhoIs and IAm requests
67
    Initialization requires some minimial information about the local device.
68

69
    :param localIPAddr='127.0.0.1':
70
    :param localObjName='BAC0':
71
    :param deviceId=None:
72
    :param maxAPDULengthAccepted='1024':
73
    :param maxSegmentsAccepted='1024':
74
    :param segmentationSupported='segmentedBoth':
75
    """
76

77
    _used_ips: t.Set[Address] = set()
1✔
78

79
    def __init__(
1✔
80
        self,
81
        localIPAddr: str = "127.0.0.1",
82
        networkNumber: int = None,
83
        localObjName: str = "BAC0",
84
        deviceId: int = None,
85
        firmwareRevision: str = "".join(sys.version.split("|")[:2]),
86
        maxAPDULengthAccepted: str = "1024",
87
        maxSegmentsAccepted: str = "1024",
88
        segmentationSupported: str = "segmentedBoth",
89
        bbmdAddress: str = None,
90
        bbmdTTL: int = 0,
91
        bdtable: dict = None,
92
        modelName: str = "BAC0 Scripting Tool",
93
        vendorId: int = 842,
94
        vendorName: str = "SERVISYS inc.",
95
        description: str = "http://christiantremblay.github.io/BAC0/",
96
        location: str = "Bromont, Québec",
97
        timezone: str = "America/Montreal",
98
        json_file: str = None,
99
    ):
100
        self.log("Configurating app", level="debug")
1✔
101

102
        # Register Servisys
103
        try:
1✔
104
            _BAC0_vendor = VendorInfo(vendorId)
1✔
105
        except RuntimeError:
1✔
106
            pass  # we are re-running the script... forgive us
1✔
107
            _BAC0_vendor = get_vendor_info(vendorId)
1✔
108
        _BAC0_vendor.register_object_class(
1✔
109
            ObjectTypesSupported.networkPort, NetworkPortObject
110
        )
111
        _BAC0_vendor.register_object_class(ObjectTypesSupported.device, DeviceObject)
1✔
112

113
        self.timehandler = TimeHandler(tz=timezone)
1✔
114

115
        self.response = None
1✔
116
        self._initialized = False
1✔
117
        self._started = False
1✔
118
        self._stopped = False
1✔
119

120
        if localIPAddr in Base._used_ips:
1✔
121
            raise InitializationError(
×
122
                "IP Address provided ({}) already used by BAC0. Check if another software is using port 47808 on this network interface. If so, you can define multiple IP per interface. Or specify another IP using BAC0.lite(ip='IP/mask')".format(
123
                    localIPAddr
124
                )
125
            )
126

127
        if validate_ip_address(localIPAddr):
1✔
128
            self.localIPAddr = localIPAddr
1✔
129
        else:
130
            raise InitializationError(
×
131
                "IP Address provided ({}) invalid. Check if another software is using port 47808 on this network interface. If so, you can define multiple IP per interface. Or specify another IP using BAC0.lite(ip='IP/mask')".format(
132
                    localIPAddr
133
                )
134
            )
135
        self.networkNumber = networkNumber
1✔
136

137
        self.Boid = (
1✔
138
            int(deviceId) if deviceId else (3056177 + int(random.uniform(0, 1000)))
139
        )
140

141
        self.segmentationSupported = segmentationSupported
1✔
142
        self.maxSegmentsAccepted = maxSegmentsAccepted
1✔
143
        self.localObjName = localObjName
1✔
144
        self.local_objects = LocalObjects(device=self)
1✔
145

146
        self.maxAPDULengthAccepted = maxAPDULengthAccepted
1✔
147
        self.vendorId = vendorId
1✔
148
        self.vendorName = charstring(vendorName)
1✔
149
        self.modelName = charstring(modelName)
1✔
150
        self.description = charstring(description)
1✔
151
        self.location = charstring(location)
1✔
152

153
        self.discoveredDevices: t.Optional[t.Dict[t.Tuple[str, int], int]] = None
1✔
154
        self.systemStatus = DeviceStatus(1)
1✔
155

156
        self.bbmdAddress = bbmdAddress
1✔
157
        self.bbmdTTL = bbmdTTL
1✔
158
        self.bdtable = bdtable
1✔
159

160
        self.firmwareRevision = firmwareRevision
1✔
161
        self._ric = {}
1✔
162
        self.subscription_contexts = {}
1✔
163
        self.database = None
1✔
164
        self.json_file = json_file
1✔
165

166
        try:
1✔
167
            self.startApp()
1✔
168
        except InitializationError as error:
×
169
            raise InitializationError(
×
170
                f"Gros probleme : {error}. Address requested : {localIPAddr}"
171
            )
172

173
    def startApp(self):
1✔
174
        """
175
        Define the local device, including services supported.
176
        Once defined, start the BACnet stack in its own thread.
177
        """
178
        self.log("Create Local Device", level="debug")
1✔
179
        try:
1✔
180
            app_type = "BACnet/IP App"
1✔
181

182
            class config(defaultdict):
1✔
183
                "Simple class to mimic args dot retrieval"
184

185
                def __init__(self, cfg):
1✔
186
                    for k, v in cfg.items():
1✔
187
                        self[k] = v
1✔
188

189
                def __getattr__(self, key):
1✔
190
                    return self[key]
×
191

192
            if self.bbmdAddress is not None:
1✔
193
                mode = "foreign"
×
194
            elif self.bdtable:
1✔
195
                mode = "bbmd"
×
196
            else:
197
                mode = "normal"
1✔
198
            cfg = {
1✔
199
                "BAC0": {
200
                    "bbmdAddress": self.bbmdAddress,
201
                    "bdt": self.bdtable,
202
                    "ttl": self.bbmdTTL,
203
                },
204
                "device": {
205
                    "object-name": self.localObjName,
206
                    # "firmware-revision": self.firmwareRevision,
207
                    "vendor-identifier": self.vendorId,
208
                    "vendor-name": "Servisys inc.",
209
                    "object-identifier": f"device,{self.Boid}",
210
                    "object-list": [f"device,{self.Boid}", "network-port,1"],
211
                    "model-name": self.modelName,
212
                    # "max-apdu-length-accepted": self.maxAPDULengthAccepted,
213
                    # "max-segments-accepted": self.maxSegmentsAccepted,
214
                    # "location": self.location,
215
                    # "description": self.description
216
                },
217
                "network-port": {
218
                    "ip-address": str(self.localIPAddr),
219
                    "ip-subnet-mask": str(self.localIPAddr.netmask),
220
                    "bacnet-ip-udp-port": self.localIPAddr.addrPort,
221
                    "network-number": None,
222
                    "fd-bbmd-address": sequence_to_json(HostNPort(self.bbmdAddress)),
223
                    "fd-subscription-lifetime": self.bbmdTTL,
224
                    "bacnet-ip-mode": mode,
225
                },
226
            }
227

228
            self.this_application = BAC0Application(
1✔
229
                config(cfg), self.localIPAddr, json_file=self.json_file
230
            )
231
            self.log("Starting", level="debug")
1✔
232
            self._initialized = True
1✔
233

234
            try:
1✔
235
                Base._used_ips.add(self.localIPAddr)
1✔
236
                self.log(f"Registered as {app_type}", level="info")
1✔
237
                self._started = True
1✔
238
            except OSError as error:
×
239
                self.log(f"Error opening socket: {error}", level="warning")
×
240
                raise InitializationError(f"Error opening socket: {error}")
×
241
            self.log("Running", level="debug")
1✔
242
        except OSError as error:
×
243
            self.log(f"an error has occurred: {error}", level="error")
×
244
            raise InitializationError(f"Error starting app: {error}")
×
245
            self.log("finally", level="debug")
246

247
    def register_foreign_device(self, addr=None, ttl=0):
1✔
248
        # self.this_application.register_to_bbmd(addr, ttl)
249
        raise NotImplementedError()
×
250

251
    def unregister_foreign_device(self):
1✔
252
        self.this_application.unregister_from_bbmd()
×
253

254
    def disconnect(self) -> asyncio.Task:
1✔
255
        task = asyncio.create_task(self._disconnect())
×
256
        return task
×
257

258
    async def _disconnect(self):
1✔
259
        """
260
        Stop the BACnet stack.  Free the IP socket.
261
        """
262
        self.log("Stopping All running tasks", level="debug")
1✔
263
        await stopAllTasks()
1✔
264
        self.log("Stopping BACnet stack", level="debug")
1✔
265
        # Freeing socket
266
        self.this_application.app.close()
1✔
267

268
        self._stopped = True  # Stop stack thread
1✔
269
        # self.t.join()
270
        self._started = False
1✔
271
        Base._used_ips.discard(self.localIPAddr)
1✔
272
        self.log("BACnet stopped", level="info")
1✔
273

274
    @property
1✔
275
    def routing_table(self):
1✔
276
        """
277
        Routing Table will give all the details about routers and how they
278
        connect BACnet networks together.
279

280
        It's a decoded presentation of what bacpypes.router_info_cache contains.
281

282
        Returns a dict with the address of routers as key.
283
        """
284

285
        class Router:
×
286
            def __init__(self, snet, address, dnets, path=None):
×
287
                self.source_network: int = snet
×
288
                self.address: Address = address
×
289
                self.destination_networks: set = dnets
×
290
                self.path: list = path
×
291

292
            def __repr__(self):
×
293
                return "Source Network: {} | Address: {} | Destination Networks: {} | Path: {}".format(
×
294
                    self.source_network,
295
                    self.address,
296
                    self.destination_networks,
297
                    self.path,
298
                )
299

300
        self._routers = {}
×
301

302
        self._ric = self.this_application.app.nsap.router_info_cache
×
303

304
        for router, dnets in self._ric.router_dnets.items():
×
305
            snet, address = router
×
306
            self._routers[str(address)] = Router(snet, address, dnets, path=[])
×
307
        for path, router_info in self._ric.path_info.items():
×
308
            router_address, router_status = router_info
×
309
            snet, dnet = path
×
310
            self._routers[str(router_address)].path.append(
×
311
                (path, router_status)
312
            )
313

314
        return self._routers
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc