• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ChristianTremblay / BAC0 / 17868482439

19 Sep 2025 07:59PM UTC coverage: 39.741% (-1.0%) from 40.7%
17868482439

push

github

web-flow
Merge pull request #567 from ChristianTremblay/develop

Task fix + Mypy

150 of 437 new or added lines in 24 files covered. (34.32%)

19 existing lines in 9 files now uncovered.

2270 of 5712 relevant lines covered (39.74%)

0.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

11.9
/BAC0/db/sql.py
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
5
# Licensed under LGPLv3, see file LICENSE in this source tree.
6
#
7
"""
8
sql.py -
9
"""
10

11
import os.path
1✔
12

13
# --- standard Python modules ---
14
import pickle
1✔
15

16
# --- 3rd party modules ---
17
import aiosqlite
1✔
18
import sqlite3
1✔
19
import functools
1✔
20
import asyncio as _asyncio
1✔
21

22
from ..core.io.IOExceptions import (
1✔
23
    DataError,
24
    NoResponseFromController,
25
    RemovedPointException,
26
)
27
from ..core.utils.lookfordependency import pandas_if_available
1✔
28

29
_PANDAS, pd, sql, Timestamp = pandas_if_available()
1✔
30
# --- this application's modules ---
31

32
# ------------------------------------------------------------------------------
33

34

35
class SQLMixin(object):
    """
    Use SQL to persist a device's contents.  By saving the device contents to an SQL
    database, you can work with the device's data while offline, or while the device
    is not available.
    """

    async def _read_from_sql(self, request, db_name):
        """
        Run a SELECT against the device's SQLite file and return the result
        as a pandas DataFrame.

        The async context managers guarantee that both the connection and the
        cursor are closed as soon as the query completes.

        :param request: SQL query string to execute.
        :param db_name: database file name, without the ".db" extension.
        :returns: DataFrame built from the fetched rows, with the cursor's
                  column names as DataFrame columns.
        """
        async with aiosqlite.connect(f"{db_name}.db") as con:
            async with con.execute(request) as cursor:
                rows = await cursor.fetchall()
                columns = [description[0] for description in cursor.description]
                return pd.DataFrame(rows, columns=columns)

    def dev_properties_df(self):
        """
        Return a copy of the device properties dict prepared for persistence.

        Live or transient entries are stripped: the network object cannot be
        pickled, "pss" is removed, and the objects list is emptied (points are
        stored separately by points_properties_df).
        """
        dic = self.properties.asdict.copy()
        dic.pop("network", None)
        dic["objects_list"] = []
        dic.pop("pss", None)
        return dic

    def points_properties_df(self):
        """
        Return a DataFrame of point name -> point properties in preparation
        for storage in SQL.  Live objects (device, network) and transient
        flags (simulated, overridden) are removed first.
        """
        pprops = {}
        for each in self.points:
            p = each.properties.asdict.copy()
            p.pop("device", None)
            p.pop("network", None)
            p.pop("simulated", None)
            p.pop("overridden", None)
            pprops[str(each.properties.name)] = p

        return pd.DataFrame(pprops)

    def backup_histories_df(self, resampling="1s"):
        """
        Build a dataframe of the point histories.

        By default, the dataframe will be resampled at 1-second intervals and
        NaN values will be forward filled then backward filled, so no NaN
        remains and analytics are easier.

        This can be disabled using resampling=False (or 0), in which case the
        raw histories are returned unresampled.

        Analog values are resampled using mean(), giving intermediate results
        between two records.  Binary/multistate values use last(), so we never
        get a meaningless 0.5.

        If saving a DB that already exists, previous resampling will survive
        the merge of old data and new data.

        :param resampling: a valid pandas resampling frequency string, or
                           False/0 to disable resampling.
        :returns: DataFrame of histories, or None when pandas is unavailable.
        """
        if not _PANDAS:
            self.log("Pandas is required to create dataframe.", level="error")
            return
        backup = {}
        # Anything that is not a frequency string disables resampling.  The
        # previous code left resampling_needed/resampling_freq undefined for
        # unexpected values (e.g. None), raising NameError further down.
        if isinstance(resampling, str):
            resampling_needed = True
            resampling_freq = resampling
        else:
            resampling_needed = False
            resampling_freq = None

        def extract_value_and_string(val):
            # Histories store enum states as "<int>:<state string>".
            if isinstance(val, str):
                if ":" in val:
                    # maxsplit=1 : the state text may itself contain ":"
                    _v, _s = val.split(":", 1)
                    return (int(_v), _s)
                elif val == "active":
                    val = 1
                elif val == "inactive":
                    val = 0
            return (int(val), "unknown")

        for point in self.points:
            _name = str(point.properties.name)
            try:
                if (
                    "binary" in point.properties.type
                    or "multi" in point.properties.type
                ):
                    _str_his = point.history.apply(
                        lambda x: extract_value_and_string(x)[1]
                    )
                    _val_his = point.history.apply(
                        lambda x: extract_value_and_string(x)[0]
                    )
                    # Only resample when requested; resampling unconditionally
                    # crashed here when resampling was disabled.
                    if resampling_needed:
                        _str_his = _str_his.resample(resampling_freq).last()
                        _val_his = _val_his.resample(resampling_freq).last()
                    backup[f"{_name}_str"] = _str_his
                    backup[_name] = _val_his
                elif "analog" in point.properties.type:
                    # With resampling disabled, keep the raw history instead of
                    # silently dropping analog points as the old code did.
                    backup[_name] = (
                        point.history.resample(resampling_freq).mean()
                        if resampling_needed
                        else point.history
                    )
                else:
                    # Other point types are not included in the SQL backup.
                    continue

            except Exception as error:
                try:
                    self.log(
                        f"{self.properties.name} ({self.properties.device.properties.address}) | Error in resampling {point.properties.name} | {error} (probably not enough points)",
                        level="error",
                    )
                except AttributeError as error:
                    raise DataError(
                        f"Cannot save, missing required information : {error}"
                    )
                # Fallback path: try again with explicit ".str"/".val" columns
                # plus the raw history column.
                if (
                    "binary" in point.properties.type
                    or "multi" in point.properties.type
                ):
                    _str_his = point.history.apply(
                        lambda x: extract_value_and_string(x)[1]
                    )
                    _val_his = point.history.apply(
                        lambda x: extract_value_and_string(x)[0]
                    )
                    _raw_his = point.history
                    if resampling_needed:
                        _str_his = _str_his.resample(resampling_freq).last()
                        _val_his = _val_his.resample(resampling_freq).last()
                        _raw_his = _raw_his.resample(resampling_freq).last()
                    backup[f"{_name}.str"] = _str_his
                    backup[f"{_name}.val"] = _val_his
                    backup[_name] = _raw_his
                elif "analog" in point.properties.type:
                    backup[_name] = (
                        point.history.resample(resampling_freq).mean()
                        if resampling_needed
                        else point.history
                    )
                else:
                    continue

        df = pd.DataFrame({k: pd.Series(v) for k, v in backup.items()})
        if resampling_needed:
            # Final pass removes NaN introduced by merging histories with
            # different timestamps.
            return df.resample(resampling_freq).last().ffill().bfill()
        else:
            return df

    async def save(self, filename=None, resampling=None):
        """
        Save the point histories to sqlite3 database.
        Save the device object properties to a pickle file so the device can be reloaded.

        Resampling : valid Pandas resampling frequency. If 0 or False, dataframe will not be resampled on save.
        """
        if not _PANDAS:
            self.log("Pandas is required to save to SQLite.", level="error")
            return

        if filename:
            # Strip only a trailing ".db"; the old split(".")[0] destroyed
            # filenames containing dots (e.g. "floor1.ahu.db" -> "floor1").
            if filename.endswith(".db"):
                filename = filename[: -len(".db")]
            self.properties.db_name = filename
        else:
            self.properties.db_name = f"Device_{self.properties.device_id}"

        if resampling is None:
            resampling = self.properties.save_resampling

        def _df_to_backup():
            # Build the histories dataframe; on known data errors return an
            # empty frame so the save can still proceed with properties.
            try:
                return self.backup_histories_df(resampling=resampling)
            except (DataError, NoResponseFromController):
                self.log("Impossible to save right now, error in data", level="error")
                return pd.DataFrame()

        # Does file exist? If so, append only data newer than the last record.
        if os.path.isfile(f"{self.properties.db_name}.db"):
            try:
                his = await self._read_from_sql(
                    'select * from "history"', self.properties.db_name
                )
                his.index = his["index"].apply(Timestamp)
                last = his.index[-1]
                df_to_backup = _df_to_backup()[last:]
            except Exception:
                # Any problem reading the existing DB: fall back to a full backup.
                df_to_backup = _df_to_backup()
        else:
            self.log("Creating a new backup database", level="debug")
            df_to_backup = _df_to_backup()

        if df_to_backup is None:
            return
        # DataFrames that will be saved to SQL
        async with aiosqlite.connect(f"{self.properties.db_name}.db") as con:
            try:
                # Read only the existing index values so we can append only new rows
                try:
                    async with con.execute('SELECT "index" FROM history') as cursor:
                        rows = await cursor.fetchall()
                        existing_index = {r[0] for r in rows}
                except sqlite3.OperationalError:
                    # First save: the history table does not exist yet.  The
                    # old code let this escape to the outer handler and the
                    # very first save never wrote any history.
                    existing_index = set()

                if df_to_backup.empty:
                    self._log.debug("No new rows to append to history DB")
                    return

                # Filter df_to_backup to rows not already present in DB (by index)
                try:
                    mask_new = ~df_to_backup.index.map(lambda x: x in existing_index)
                except Exception:
                    # Fallback to string comparison of index values
                    mask_new = ~df_to_backup.index.map(
                        lambda x: str(x) in existing_index
                    )

                df_to_append = df_to_backup.loc[mask_new]
                if df_to_append.empty:
                    self._log.debug("No new rows after deduplication, skipping append")
                else:
                    # pandas' DataFrame.to_sql is synchronous and expects a
                    # sqlite3/SQLAlchemy connection.  Run it in a thread to
                    # avoid blocking the event loop.
                    def _sync_df_to_sql(df_local, db_name):
                        conn = sqlite3.connect(f"{db_name}.db")
                        try:
                            df_local.to_sql(
                                "history",
                                con=conn,
                                index_label="index",
                                index=True,
                                if_exists="append",
                            )
                        finally:
                            conn.close()

                    loop = _asyncio.get_running_loop()
                    # run the append in the default ThreadPoolExecutor
                    await loop.run_in_executor(
                        None,
                        functools.partial(
                            _sync_df_to_sql, df_to_append, self.properties.db_name
                        ),
                    )

            except Exception as error:
                # Include the actual error instead of swallowing it silently.
                self._log.error(f"Error saving to SQL database: {error}")

        # Saving other properties to a pickle file...
        prop_backup = {"device": self.dev_properties_df()}
        prop_backup["points"] = self.points_properties_df()
        try:
            with open(f"{self.properties.db_name}.bin", "wb") as file:
                pickle.dump(prop_backup, file)
            if self.properties.clear_history_on_save:
                self.clear_histories()

            self.log(f"Device saved to {self.properties.db_name}.db", level="info")
        except Exception as error:
            self._log.error(f"Error saving to pickle file: {error}")

    async def points_from_sql(self, db_name):
        """
        Retrieve point list from SQL database.

        :param db_name: database file name without the ".db" extension.
        :returns: list of point names (history columns minus the index column),
                  or an empty list if the database cannot be read.
        """
        try:
            points = await self._read_from_sql("SELECT * FROM history;", db_name)
            # First column is the "index" column; the rest are point names.
            return list(points.columns.values)[1:]
        except Exception:
            self._log.warning(f"No history retrieved from {db_name}.db:")
            return []

    async def his_from_sql(self, db_name, point):
        """
        Retrieve point histories from SQL database.

        :param db_name: database file name without the ".db" extension.
        :param point: name of the point column to extract.
        :returns: pandas Series of the point's history, indexed by timestamp.
        """
        his = await self._read_from_sql('select * from "history"', db_name)
        his.index = his["index"].apply(Timestamp)
        return his.set_index("index")[point]

    async def value_from_sql(self, db_name, point):
        """
        Take last known value as the value
        """
        # Await the coroutine first; the old code awaited the result of
        # .last_valid_index() called on the coroutine object (TypeError).
        his = await self.his_from_sql(db_name, point)
        return his.last_valid_index()

    def read_point_prop(self, device_name, point):
        """
        Points properties retrieved from pickle.

        :param device_name: pickle file name without the ".bin" extension.
        :param point: point name to look up.
        :raises RemovedPointException: if the point is not in the saved file.
        """
        with open(f"{device_name}.bin", "rb") as file:
            try:
                _point = pickle.load(file)["points"][point]
            except KeyError:
                raise RemovedPointException(f"{point} not found (probably deleted)")
            return _point

    def read_dev_prop(self, device_name):
        """
        Device properties retrieved from pickle.

        :param device_name: pickle file name without the ".bin" extension.
        :raises ValueError: if the file is missing or truncated.
        """
        self.log("Reading prop from DB file", level="debug")
        try:
            with open(f"{device_name}.bin", "rb") as file:
                return pickle.load(file)["device"]
        except (EOFError, FileNotFoundError):
            self._log.error("Error reading device properties")
            raise ValueError(f"Cannot read device properties from {device_name}.bin")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc