• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ChristianTremblay / BAC0 / 17868482439

19 Sep 2025 07:59PM UTC coverage: 39.741% (-1.0%) from 40.7%
17868482439

push

github

web-flow
Merge pull request #567 from ChristianTremblay/develop

Task fix + Mypy

150 of 437 new or added lines in 24 files covered. (34.32%)

19 existing lines in 9 files now uncovered.

2270 of 5712 relevant lines covered (39.74%)

0.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.83
/BAC0/scripts/Lite.py
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
5
# Licensed under LGPLv3, see file LICENSE in this source tree.
6
#
7
"""
8
Lite is the base class to create a BACnet network
9
It uses provided args to register itself as a device in the network
10
and allow communication with other devices.
11

12
"""
13

14
import asyncio
1✔
15
import typing as t
1✔
16

17
# --- standard Python modules ---
18
import weakref
1✔
19

20
from bacpypes3 import __version__ as bacpypes_version
1✔
21
from bacpypes3.app import Application
1✔
22
from bacpypes3.pdu import Address
1✔
23
from bacpypes3.primitivedata import ObjectIdentifier
1✔
24

25
from BAC0.core.app.asyncApp import BAC0Application
1✔
26
from BAC0.scripts.Base import Base
1✔
27

28

29
from ..core.devices.Device import RPDeviceConnected, RPMDeviceConnected
1✔
30
from ..core.devices.Points import Point
1✔
31
from ..core.devices.Trends import _TrendLog
1✔
32
from ..core.devices.Virtuals import VirtualPoint
1✔
33
from ..core.functions.Alias import Alias
1✔
34
from ..core.functions.CoV import COVSubscription
1✔
35

36
# from ..core.functions.legacy.cov import CoV
37
# from ..core.functions.legacy.DeviceCommunicationControl import (
38
#    DeviceCommunicationControl,
39
# )
40
from ..core.functions.Discover import Discover
1✔
41
from ..core.functions.EventEnrollment import EventEnrollment
1✔
42
from ..core.functions.GetIPAddr import HostIP
1✔
43
from ..core.functions.Reinitialize import Reinitialize
1✔
44

45
# from ..core.functions.legacy.Reinitialize import Reinitialize
46
from ..core.functions.Schedule import Schedule
1✔
47
from ..core.functions.Text import TextMixin
1✔
48
from ..core.functions.TimeSync import TimeSync
1✔
49
from ..core.io.IOExceptions import (
1✔
50
    NoResponseFromController,
51
    NumerousPingFailures,
52
    Timeout,
53
    UnrecognizedService,
54
)
55
from ..core.io.Read import ReadProperty
1✔
56
from ..core.io.Simulate import Simulation
1✔
57
from ..core.io.Write import WriteProperty
1✔
58
from ..core.utils.lookfordependency import influxdb_if_available, rich_if_available
1✔
59

60
# from ..core.io.asynchronous.Write import WriteProperty
61
from ..core.utils.notes import note_and_log
1✔
62
from ..infos import __version__ as version
1✔
63

64
# --- this application's modules ---
65
from ..tasks.RecurringTask import RecurringTask
1✔
66
from ..tasks.TaskManager import Task
1✔
67

68
from ..db.influxdb import InfluxDB
1✔
69

70
RICH, rich = rich_if_available()
1✔
71
if RICH:
1✔
72
    from rich import pretty
1✔
73
    from rich.console import Console
1✔
74
    from rich.table import Table
1✔
75

76
    pretty.install()
1✔
77

78
INFLUXDB = False
1✔
79

80
# ------------------------------------------------------------------------------
81

82

83
@note_and_log
1✔
84
class Lite(
1✔
85
    Base,
86
    Discover,
87
    Alias,
88
    EventEnrollment,
89
    ReadProperty,
90
    WriteProperty,
91
    Simulation,
92
    TimeSync,
93
    Reinitialize,
94
    # DeviceCommunicationControl,
95
    COVSubscription,
96
    Schedule,
97
    # Calendar,
98
    TextMixin,
99
):
100
    """
101
    Build a BACnet application to accept read and write requests.
102
    [Basic Whois/IAm functions are implemented in parent BasicScript class.]
103
    Once created, execute a whois() to build a list of available controllers.
104
    Initialization requires information on the local device.
105

106
    :param ip='127.0.0.1': Address must be in the same subnet as the BACnet network
107
        [BBMD and Foreign Device - not supported]
108

109
    """
110

111
    def __init__(
1✔
112
        self,
113
        ip: t.Optional[str] = None,
114
        port: t.Optional[int] = None,
115
        mask: t.Optional[int] = None,
116
        bbmdAddress=None,
117
        bbmdTTL: int = 0,
118
        bdtable=None,
119
        ping: bool = True,
120
        ping_delay: int = 300,
121
        db_params: t.Optional[t.Dict[str, t.Any]] = None,
122
        **params,
123
    ) -> None:
124
        self._initialized = False
1✔
125
        self.log(
1✔
126
            f"Starting Asynchronous BAC0 version {version} ({self.__module__.split('.')[-1]})",
127
            level="info",
128
        )
129
        self.log(f"Using bacpypes3 version {bacpypes_version}", level="info")
1✔
130
        self.log("Use BAC0.log_level to adjust verbosity of the app.", level="info")
1✔
131
        self.log(
1✔
132
            "Ex. BAC0.log_level('silence') or BAC0.log_level('error')", level="info"
133
        )
134

135
        self.log("Configurating app", level="debug")
1✔
136
        self._registered_devices = weakref.WeakValueDictionary()
1✔
137

138
        # Ping task will deal with all registered device and disconnect them if they do not respond.
139

140
        self._ping_task = RecurringTask(
1✔
141
            self.ping_registered_devices,
142
            delay=ping_delay,
143
            name="Ping Registered Devices Task",
144
        )
145
        if ping:
1✔
146
            self._ping_task.start()
1✔
147

148
        self._cleanup_task = RecurringTask(
1✔
149
            Task.clean_tasklist, delay=60, name="Cleanup Tasks List"
150
        )
151
        self._cleanup_task.start()
1✔
152

153
        if ip is None:
1✔
154
            host = HostIP(port)
×
155
            mask = host.mask
×
156
            ip_addr = host.address
×
157
        else:
158
            try:
1✔
159
                ip, subnet_mask_and_port = ip.split("/")
1✔
160
                try:
1✔
161
                    mask_s, port_s = subnet_mask_and_port.split(":")
1✔
162
                    mask = int(mask_s)
×
163
                    port = int(port_s)
×
164
                except ValueError:
1✔
165
                    mask = int(subnet_mask_and_port)
1✔
166
            except ValueError:
×
167
                ip = ip
×
168

169
            if not mask:
1✔
170
                mask = 24
×
171
            if not port:
1✔
172
                port = 47808
1✔
173
            ip_addr = Address(f"{ip}/{mask}:{port}")
1✔
174
        self._log.info(
1✔
175
            f"Using ip : {ip_addr}/{mask} on port {ip_addr.addrPort} | broadcast : {ip_addr.addrBroadcastTuple[0]}"
176
        )
177

178
        Base.__init__(
1✔
179
            self,
180
            localIPAddr=ip_addr,
181
            bbmdAddress=bbmdAddress,
182
            bbmdTTL=bbmdTTL,
183
            bdtable=bdtable,
184
            **params,
185
        )
186
        self.log(f"Device instance (id) : {self.Boid}", level="info")
1✔
187
        self.bokehserver = False
1✔
188
        self._points_to_trend = weakref.WeakValueDictionary()
1✔
189

190
        INFLUXDB = influxdb_if_available(db_params['version'])[0] if db_params is not None else False
1✔
191
        # Activate InfluxDB if params are available
192
        if db_params and INFLUXDB:
1✔
193
            try:
×
194
                self.database = (
×
195
                    InfluxDB(db_params)
196
                    if db_params["name"].lower() == "influxdb"
197
                    else None
198
                )
199
                asyncio.create_task(
×
200
                    asyncio.wait_for(self.database._health(), timeout=5)
201
                )
202
            except TimeoutError:
×
203
                self._log.error(
×
204
                    "Unable to connect to InfluxDB. Please validate parameters"
205
                )
206
        if self.database:
1✔
207
            write_interval = db_params.get("write_interval", 60)
×
208
            self.create_save_to_influxdb_task(delay=write_interval)
×
209

210
        # Announce yourself
211

212
        self.i_am()
1✔
213

214
    def i_am(self):
1✔
215
        loop = asyncio.get_running_loop()
1✔
216
        loop.create_task(self._i_am())
1✔
217

218
    async def _i_am(self) -> None:
1✔
219
        while self.this_application.app is None or not asyncio.iscoroutinefunction(
1✔
220
            self.this_application.app.i_am
221
        ):
222
            await asyncio.sleep(0.01)
1✔
223
        _this_application: BAC0Application = self.this_application
×
224
        _app: Application = _this_application.app
×
225

226
        _res = await self.this_application.app.i_am()
×
227
        self._initialized = True
×
228

229
    def create_save_to_influxdb_task(self, delay: int = 60) -> None:
1✔
230
        self._write_to_db = RecurringTask(
×
231
            self.save_registered_devices_to_db,
232
            delay=delay,
233
            name="Write to InfluxDB Task",
234
        )
235
        self._write_to_db.start()
×
236

237
    async def save_registered_devices_to_db(self, delay: int = 60) -> None:
1✔
238
        if len(self.registered_devices) > 0:
×
239
            for each in self.registered_devices:
×
240
                try:
×
241
                    await self.database.write_points_lastvalue_to_db(each.points)
×
242
                except Exception as error:
×
243
                    self._log.error(
×
244
                        f"Error writing points of {each} to InfluxDB : {error}. Stopping task."
245
                    )
246
                    await self._write_to_db.stop()
×
247
                    self.log(
×
248
                        "Write to InfluxDB Task stopped. Restarting", level="warning"
249
                    )
250
                    self.create_save_to_influxdb_task(delay=delay)
×
251

252
    def register_device(
1✔
253
        self, device: t.Union[RPDeviceConnected, RPMDeviceConnected]
254
    ) -> None:
255
        oid = id(device)
1✔
256
        self._registered_devices[oid] = device
1✔
257

258
    async def ping_registered_devices(self) -> None:
1✔
259
        """
260
        Registered device on a network (self) are kept in a list (registered_devices).
261
        This function will allow pinging thoses device regularly to monitor them. In case
262
        of disconnected devices, we will disconnect the device (which will save it). Then
263
        we'll ping again until reconnection, where the device will be bring back online.
264

265
        To permanently disconnect a device, an explicit device.disconnect(unregister=True [default value])
266
        will be needed. This way, the device won't be in the registered_devices list and
267
        BAC0 won't try to ping it.
268
        """
269
        for each in self.registered_devices:
1✔
270
            if isinstance(each, RPDeviceConnected) or isinstance(
×
271
                each, RPMDeviceConnected
272
            ):
273
                try:
×
274
                    self._log.debug(
×
275
                        f"Ping {each.properties.name}|{each.properties.address}"
276
                    )
277
                    await each.ping()
×
278
                    if each.properties.ping_failures > 3:
×
279
                        raise NumerousPingFailures
×
280

281
                except NumerousPingFailures:
×
282
                    self._log.warning(
×
283
                        "{}|{} is offline, disconnecting it.".format(
284
                            each.properties.name, each.properties.address
285
                        )
286
                    )
287
                    await each._disconnect(unregister=False)
×
288

289
            else:
290
                device_id = each.properties.device_id
×
291
                addr = each.properties.address
×
292
                name = await self.read(f"{addr} device {device_id} objectName")
×
293
                if name == each.properties.name:
×
294
                    each.properties.ping_failures = 0
×
295
                    self._log.info(
×
296
                        "{}|{} is back online, reconnecting.".format(
297
                            each.properties.name, each.properties.address
298
                        )
299
                    )
300
                    each.connect(network=self)
×
301
                    each.poll(delay=each.properties.pollDelay)
×
302

303
    @property
1✔
304
    def registered_devices(self):
1✔
305
        """
306
        Devices that have been created using BAC0.device(args)
307
        """
308
        return list(self._registered_devices.values())
1✔
309

310
    def unregister_device(self, device):
1✔
311
        """
312
        Remove from the registered list
313
        """
314
        oid = id(device)
1✔
315
        try:
1✔
316
            del self._registered_devices[oid]
1✔
317
        except KeyError:
×
318
            pass
×
319

320
    def add_trend(self, point_to_trend: t.Union[Point, _TrendLog, VirtualPoint]) -> None:
1✔
321
        """
322
        Add point to the list of histories that will be handled by Bokeh
323

324
        Argument provided must be of type Point or TrendLog
325
        ex. bacnet.add_trend(controller['point_name'])
326
        """
327
        if (
×
328
            isinstance(point_to_trend, Point)
329
            or isinstance(point_to_trend, _TrendLog)
330
            or isinstance(point_to_trend, VirtualPoint)
331
        ):
332
            oid = id(point_to_trend)
×
333
            self._points_to_trend[oid] = point_to_trend
×
334
        else:
335
            raise TypeError("Please provide point containing history")
×
336

337
    def remove_trend(
1✔
338
        self, point_to_remove: t.Union[Point, _TrendLog, VirtualPoint]
339
    ) -> None:
340
        """
341
        Remove point from the list of histories that will be handled by Bokeh
342

343
        Argument provided must be of type Point or TrendLog
344
        ex. bacnet.remove_trend(controller['point_name'])
345
        """
346
        if (
×
347
            isinstance(point_to_remove, Point)
348
            or isinstance(point_to_remove, _TrendLog)
349
            or isinstance(point_to_remove, VirtualPoint)
350
        ):
351
            oid = id(point_to_remove)
×
352
        else:
353
            raise TypeError("Please provide point or trendLog containing history")
×
354
        if oid in self._points_to_trend.keys():
×
355
            del self._points_to_trend[oid]
×
356

357
    @property
1✔
358
    async def devices(self):
1✔
359
        await self._devices(_return_list=False)
×
360

361
    async def _devices(
1✔
362
        self, _return_list: bool = False
363
    ) -> t.List[t.Tuple[str, str, int, str, t.Set[int]]]:
364
        """
365
        This property will create a good looking table of all the discovered devices
366
        seen on the network.
367

368
        For that, some requests will be sent over the network to look for name,
369
        manufacturer, etc and in big network, this could be a long process.
370
        """
371

NEW
372
        lst: t.List[t.Tuple[str, str, int, str, t.Set[int]]] = []
×
373
        if self.discoveredDevices is not None:
×
374
            for k, v in self.discoveredDevices.items():
×
NEW
375
                object, instance = v["object_instance"]
×
NEW
376
                device_address = v["address"]
×
NEW
377
                network_number = v["network_number"]
×
378
                try:
×
379
                    deviceName, vendorName = await self.readMultiple(
×
380
                        f"{device_address} {object} {instance} objectName vendorName"
381
                    )
382
                except (UnrecognizedService, ValueError):
×
383
                    self._log.warning(
×
384
                        f"Unrecognized service for {object} {instance} | {device_address}"
385
                    )
386
                    try:
×
387
                        deviceName = await self.read(
×
388
                            f"{device_address} {object} {instance} objectName"
389
                        )
390
                        vendorName = await self.read(
×
391
                            f"{device_address} {object} {instance} vendorName"
392
                        )
393
                    except NoResponseFromController:
×
394
                        self.log(f"No response from {k}", level="warning")
×
395
                        continue
×
396
                except (NoResponseFromController, Timeout):
×
397
                    self.log(f"No response from {k}", level="warning")
×
398
                    continue
×
NEW
399
                lst.append((str(deviceName), str(vendorName), instance, device_address, network_number))
×
UNCOV
400
            if RICH:
×
401
                console = Console()
×
402
                table = Table(show_header=True, header_style="bold magenta")
×
403
                table.add_column("Network_number")
×
404
                table.add_column("Device Name")
×
405
                table.add_column("Address")
×
406
                table.add_column("Device Instance")
×
407
                table.add_column("Vendor Name")
×
408
                for each in lst:
×
409
                    deviceName, vendorName, devId, device_address, network_number = each
×
410
                    table.add_row(
×
411
                        f"{network_number}",
412
                        f"{deviceName}",
413
                        f"{device_address}",
414
                        f"{devId}",
415
                        f"{vendorName}",
416
                    )
417
                console.print(table)
×
418
        if _return_list:
×
NEW
419
            return lst
×
NEW
420
        return []
×
421

422
    @property
1✔
423
    def trends(self) -> t.List[t.Any]:
1✔
424
        """
425
        This will present a list of all registered trends used by Bokeh Server
426
        """
427
        return list(self._points_to_trend.values())
×
428

429
    @property
1✔
430
    def tasks(self) -> t.List[Task]:
1✔
431
        """
432
        This will present a list of all registered tasks
433
        """
434
        return Task.tasks
×
435

436
    def disconnect(self) -> asyncio.Task:
1✔
NEW
437
        return asyncio.create_task(self._disconnect())
×
438

439
    async def _disconnect(self) -> None:
1✔
440
        self.log("Disconnecting", level="debug")
1✔
441
        for each in self.registered_devices:
1✔
442
            await each._disconnect()
×
443
        await super()._disconnect()
1✔
444
        self._initialized = False
1✔
445

446
    def __repr__(self) -> str:
1✔
447
        return f"Bacnet Network using ip {self.localIPAddr} with device id {self.Boid}"
×
448

449
    def __getitem__(self, boid_or_localobject: t.Union[str, ObjectIdentifier, tuple]):
1✔
450
        """
451
        Retrieve an item from the application by its name or identifier.
452

453
        Args:
454
            boid_or_localobject (Union[str, ObjectIdentifier]): The name (as a string) or identifier (as an ObjectIdentifier) of the object to retrieve.
455

456
        Returns:
457
            Union[YourObjectType, None]: The object corresponding to the given name or identifier, or a registered device if the identifier matches a device ID. Returns None if the object or device is not found.
458

459
        Raises:
460
            KeyError: If the object or device is not found and logging is not enabled.
461
        """
462
        if isinstance(boid_or_localobject, str):
×
463
            item = self.this_application.app.objectName[boid_or_localobject]
×
464
        elif isinstance(boid_or_localobject, ObjectIdentifier):
×
465
            item = self.this_application.app.objectIdentifier[boid_or_localobject]
×
466
        elif isinstance(boid_or_localobject, tuple):
×
467
            _objId = ObjectIdentifier(boid_or_localobject)
×
468
            item = self.this_application.app.objectIdentifier[_objId]
×
469
        if item is None:
×
470
            for device in self._registered_devices:
×
471
                if str(device.properties.device_id) == str(boid_or_localobject):
×
472
                    return device
×
473
            self.log(f"{boid_or_localobject} not found", level="error")
×
474
        else:
475
            return item
×
476

477
    async def __aenter__(self):
1✔
478
        while not self._initialized:
1✔
479
            await asyncio.sleep(0.1)
×
480
        await asyncio.sleep(1)  # just to be sure we are ready
1✔
481
        self._log.info(
1✔
482
            f"{self.localObjName}|{self.Boid} connected. Entering context manager."
483
        )
484
        return self
1✔
485

486
    async def __aexit__(self, exc_type, exc_val, exc_tb):
1✔
487
        await self._disconnect()
1✔
488
        while self._initialized:
1✔
489
            await asyncio.sleep(0.1)
×
490
        self._log.info(
1✔
491
            f"{self.localObjName}|{self.Boid} disconnected. Exiting context manager."
492
        )
493

494
    def get_device_by_id(self, id):
1✔
495
        for each in self.registered_devices:
×
496
            if each.properties.device_id == id:
×
497
                return each
×
498
        self._log.error(f"Device {id} not found")
×
499
        raise ValueError("Device not found")
×
500

501
    def cov(
1✔
502
        self,
503
        address: str,
504
        objectID: t.Tuple[str, int],
505
        lifetime: int = 900,
506
        confirmed: bool = False,
507
        callback: t.Optional[
508
            t.Union[t.Callable[[str, t.Any], None], t.Awaitable[None]]
509
        ] = None,
510
    ):
511
        """
512
        Subscribe to COV notification for a given address and objectID
513
        If callback is provided, it will be called with the value of the COV notification
514
        So the function must be built to accept two arguments : property_identifier and property_value
515
        """
516
        cov_task = COVSubscription(
×
517
            address=address,
518
            objectID=objectID,
519
            confirmed=confirmed,
520
            lifetime=lifetime,
521
            callback=callback,
522
            BAC0App=self,
523
        )
524
        Base._running_cov_tasks[cov_task.process_identifier] = cov_task
×
525
        cov_task.task = asyncio.create_task(cov_task.run())
×
526
        self.log(
×
527
            f"COV subscription for {address}|{objectID} with id {cov_task.process_identifier} started",
528
            level="info",
529
        )
530

531
    def cancel_cov(self, task_id: int):
1✔
532
        self.log(f"Canceling COV subscription id {task_id}", level="info")
×
533
        process_identifer = task_id
×
534

535
        if process_identifer not in Base._running_cov_tasks:
×
536
            self.log(f"Task {process_identifer} not found", level="warning")
×
537
            return
×
538
        cov_subscription = Base._running_cov_tasks.pop(process_identifer)
×
539
        cov_subscription.stop()
×
540
        # await cov_subscription.task
541

542
    @property
1✔
543
    def cov_tasks(self):
1✔
544
        return Base._running_cov_tasks
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc