• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ChristianTremblay / BAC0 / 10761719058

08 Sep 2024 05:13PM UTC coverage: 38.955% (+0.04%) from 38.92%
10761719058

push

github

ChristianTremblay
Merge branch 'async' of https://github.com/ChristianTremblay/BAC0 into async

90 of 171 new or added lines in 18 files covered. (52.63%)

1008 existing lines in 18 files now uncovered.

2072 of 5319 relevant lines covered (38.95%)

0.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

11.26
/BAC0/db/sql.py
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright (C) 2015 by Christian Tremblay, P.Eng <christian.tremblay@servisys.com>
5
# Licensed under LGPLv3, see file LICENSE in this source tree.
6
#
7
"""
8
sql.py -
9
"""
10

11
import os.path
1✔
12

13
# --- standard Python modules ---
14
import pickle
1✔
15

16
# --- 3rd party modules ---
17
import aiosqlite
1✔
18

19
from ..core.io.IOExceptions import (
1✔
20
    DataError,
21
    NoResponseFromController,
22
    RemovedPointException,
23
)
24
from ..core.utils.lookfordependency import pandas_if_available
1✔
25

26
_PANDAS, pd, sql, Timestamp = pandas_if_available()
1✔
27
# --- this application's modules ---
28

29
# ------------------------------------------------------------------------------
30

31

32
class SQLMixin(object):
1✔
33
    """
34
    Use SQL to persist a device's contents.  By saving the device contents to an SQL
35
    database, you can work with the device's data while offline, or while the device
36
    is not available.
37
    """
38

39
    async def _read_from_sql(self, request, db_name):
1✔
40
        """
41
        Using the contextlib, I hope to close the connection to database when
42
        not in use
43
        """
44
        async with aiosqlite.connect(f"{db_name}.db") as con:
×
45
            async with con.execute(request) as cursor:
×
46
                rows = await cursor.fetchall()
×
UNCOV
47
                columns = [description[0] for description in cursor.description]
×
UNCOV
48
                return pd.DataFrame(rows, columns=columns)
×
49

50
    def dev_properties_df(self):
1✔
UNCOV
51
        dic = self.properties.asdict.copy()
×
52
        dic.pop("network", None)
×
53
        dic["objects_list"] = []
×
54
        dic.pop("pss", None)
×
55
        return dic
×
56

57
    def points_properties_df(self):
1✔
58
        """
59
        Return a dictionary of point/point_properties in preparation for storage in SQL.
60
        """
61
        pprops = {}
×
UNCOV
62
        for each in self.points:
×
UNCOV
63
            p = each.properties.asdict.copy()
×
UNCOV
64
            p.pop("device", None)
×
UNCOV
65
            p.pop("network", None)
×
UNCOV
66
            p.pop("simulated", None)
×
UNCOV
67
            p.pop("overridden", None)
×
UNCOV
68
            pprops[str(each.properties.name)] = p
×
69

UNCOV
70
        return pd.DataFrame(pprops)
×
71

72
    def backup_histories_df(self, resampling="1s"):
1✔
73
        """
74
        Build a dataframe of the point histories
75
        By default, dataframe will be resampled for 1sec intervals,
76
        NaN will be forward filled then backward filled. This way, no
77
        NaN values will remains and analytics will be easier.
78

79
        Please note that this can be disabled using resampling=False
80

81
        In the process of building the dataframe, analog values are
82
        resampled using the mean() function. So we have intermediate
83
        results between to records.
84

85
        For binary values, we'll use .last() so we won't get a 0.5 value
86
        which means nothing in this context.
87

88
        If saving a DB that already exists, previous resampling will survive
89
        the merge of old data and new data.
90
        """
UNCOV
91
        if not _PANDAS:
×
92
            self.log("Pandas is required to create dataframe.", level="error")
×
93
            return
×
94
        backup = {}
×
95
        if isinstance(resampling, str):
×
96
            resampling_needed = True
×
97
            resampling_freq = resampling
×
98
        elif resampling in [0, False]:
×
99
            resampling_needed = False
×
100

101
        def extract_value_and_string(val):
×
UNCOV
102
            if isinstance(val, str):
×
UNCOV
103
                if ":" in val:
×
104
                    _v, _s = val.split(":")
×
105
                    return (int(_v), _s)
×
106
                elif val == "active":
×
107
                    val = 1
×
UNCOV
108
                elif val == "inactive":
×
UNCOV
109
                    val = 0
×
UNCOV
110
            return (int(val), "unknown")
×
111

112
        # print(resampling, resampling_freq, resampling_needed)
UNCOV
113
        for point in self.points:
×
UNCOV
114
            _name = str(point.properties.name)
×
UNCOV
115
            try:
×
116
                if (
×
117
                    "binary" in point.properties.type
118
                    or "multi" in point.properties.type
119
                ):
UNCOV
120
                    backup[f"{_name}_str"] = (
×
121
                        point.history.apply(lambda x: extract_value_and_string(x)[1])
122
                        .resample(resampling_freq)
123
                        .last()
124
                    )
UNCOV
125
                    backup[_name] = (
×
126
                        point.history.apply(lambda x: extract_value_and_string(x)[0])
127
                        .resample(resampling_freq)
128
                        .last()
129
                    )
130
                elif resampling_needed and "analog" in point.properties.type:
×
131
                    backup[_name] = point.history.resample(resampling_freq).mean()
×
132
                else:
133
                    # backup[point.properties.name] = point.history.resample(
134
                    #    resampling_freq
135
                    # ).last()
136
                    continue
×
137

UNCOV
138
            except Exception as error:
×
139
                try:
×
UNCOV
140
                    self.log(
×
141
                        f"{self.properties.name} ({self.properties.device.properties.address}) | Error in resampling {point.properties.name} | {error} (probably not enough points)",
142
                        level="error",
143
                    )
UNCOV
144
                except AttributeError as error:
×
UNCOV
145
                    raise DataError(
×
146
                        f"Cannot save, missing required information : {error}"
147
                    )
148
                if (
×
149
                    "binary" in point.properties.type
150
                    or "multi" in point.properties.type
151
                ):
UNCOV
152
                    backup[f"{_name}.str"] = (
×
153
                        point.history.apply(lambda x: extract_value_and_string(x)[1])
154
                        .resample(resampling_freq)
155
                        .last()
156
                    )
UNCOV
157
                    backup[f"{_name}.val"] = (
×
158
                        point.history.apply(lambda x: extract_value_and_string(x)[0])
159
                        .resample(resampling_freq)
160
                        .last()
161
                    )
162
                    backup[_name] = point.history.resample(resampling_freq).last()
×
UNCOV
163
                elif "analog" in point.properties.type:
×
164
                    backup[_name] = point.history.resample(resampling_freq).mean()
×
165
                else:
166
                    # backup[point.properties.name] = point.history
UNCOV
167
                    continue
×
168

UNCOV
169
        df = pd.DataFrame(dict([(k, pd.Series(v)) for k, v in backup.items()]))
×
UNCOV
170
        if resampling_needed:
×
UNCOV
171
            return df.resample(resampling_freq).last().ffill().bfill()
×
172
        else:
173
            return df
×
174

175
    async def save(self, filename=None, resampling=None):
1✔
176
        """
177
        Save the point histories to sqlite3 database.
178
        Save the device object properties to a pickle file so the device can be reloaded.
179

180
        Resampling : valid Pandas resampling frequency. If 0 or False, dataframe will not be resampled on save.
181
        """
182
        if not _PANDAS:
×
UNCOV
183
            self.log("Pandas is required to save to SQLite.", level="error")
×
184
            return
×
185

UNCOV
186
        if filename:
×
UNCOV
187
            if ".db" in filename:
×
UNCOV
188
                filename = filename.split(".")[0]
×
189
            self.properties.db_name = filename
×
190
        else:
191
            self.properties.db_name = f"Device_{self.properties.device_id}"
×
192

193
        if resampling is None:
×
194
            resampling = self.properties.save_resampling
×
195

196
        # Does file exist? If so, append data
197

198
        def _df_to_backup():
×
UNCOV
199
            try:
×
UNCOV
200
                return self.backup_histories_df(resampling=resampling)
×
201
            except (DataError, NoResponseFromController):
×
202
                self.log("Impossible to save right now, error in data", level="error")
×
203
                return pd.DataFrame()
×
204

205
        if os.path.isfile(f"{self.properties.db_name}.db"):
×
UNCOV
206
            try:
×
UNCOV
207
                his = await self._read_from_sql(
×
208
                    'select * from "history"', self.properties.db_name
209
                )
UNCOV
210
                his.index = his["index"].apply(Timestamp)
×
211
                last = his.index[-1]
×
212
                df_to_backup = _df_to_backup()[last:]
×
UNCOV
213
            except Exception:
×
214
                df_to_backup = _df_to_backup()
×
215

216
        else:
217
            self.log("Creating a new backup database", level="debug")
×
218
            df_to_backup = _df_to_backup()
×
219

220
        if df_to_backup is None:
×
221
            return
×
222
        # DataFrames that will be saved to SQL
UNCOV
223
        async with aiosqlite.connect(f"{self.properties.db_name}.db") as con:
×
UNCOV
224
            try:
×
UNCOV
225
                async with con.execute("SELECT * FROM history") as cursor:
×
UNCOV
226
                    data = await cursor.fetchall()
×
UNCOV
227
                    columns = [description[0] for description in cursor.description]
×
UNCOV
228
                    data = pd.DataFrame(data, columns=columns)
×
229
                    df = pd.concat([data, df_to_backup], sort=True)
×
UNCOV
230
                    sql.to_sql(
×
231
                        df_to_backup,
232
                        name="history",
233
                        con=con,
234
                        index_label="index",
235
                        index=True,
236
                        if_exists="append",
237
                    )
238
            except Exception:
×
239
                # df = df_to_backup
NEW
240
                self._log.error("Error saving to SQL database")
×
241

242
            # asyncio.run(
243
            #    None, df_to_backup.to_sql, "history", con, None, "append", True, "index"
244
            # )
245

246
        # Saving other properties to a pickle file...
247
        prop_backup = {"device": self.dev_properties_df()}
×
248
        prop_backup["points"] = self.points_properties_df()
×
249
        try:
×
UNCOV
250
            with open(f"{self.properties.db_name}.bin", "wb") as file:
×
UNCOV
251
                pickle.dump(prop_backup, file)
×
UNCOV
252
            if self.properties.clear_history_on_save:
×
UNCOV
253
                self.clear_histories()
×
254

255
            self.log(f"Device saved to {self.properties.db_name}.db", level="info")
×
256
        except Exception as error:
×
257
            self._log.error(f"Error saving to pickle file: {error}")
×
258

259
    async def points_from_sql(self, db_name):
1✔
260
        """
261
        Retrieve point list from SQL database
262
        """
UNCOV
263
        try:
×
UNCOV
264
            points = await self._read_from_sql("SELECT * FROM history;", db_name)
×
UNCOV
265
            return list(points.columns.values)[1:]
×
NEW
266
        except Exception:
×
267
            self._log.warning(f"No history retrieved from {db_name}.db:")
×
268
            return []
×
269

270
    async def his_from_sql(self, db_name, point):
1✔
271
        """
272
        Retrive point histories from SQL database
273
        """
274
        his = await self._read_from_sql('select * from "history"', db_name)
×
UNCOV
275
        his.index = his["index"].apply(Timestamp)
×
UNCOV
276
        return his.set_index("index")[point]
×
277

278
    async def value_from_sql(self, db_name, point):
1✔
279
        """
280
        Take last known value as the value
281
        """
282
        return await self.his_from_sql(db_name, point).last_valid_index()
×
283

284
    def read_point_prop(self, device_name, point):
1✔
285
        """
286
        Points properties retrieved from pickle
287
        """
UNCOV
288
        with open(f"{device_name}.bin", "rb") as file:
×
UNCOV
289
            try:
×
UNCOV
290
                _point = pickle.load(file)["points"][point]
×
291
            except KeyError:
×
292
                raise RemovedPointException(f"{point} not found (probably deleted)")
×
293
            return _point
×
294

295
    def read_dev_prop(self, device_name):
1✔
296
        """
297
        Device properties retrieved from pickle
298
        """
UNCOV
299
        self.log("Reading prop from DB file", level="debug")
×
UNCOV
300
        try:
×
UNCOV
301
            with open(f"{device_name}.bin", "rb") as file:
×
UNCOV
302
                return pickle.load(file)["device"]
×
UNCOV
303
        except (EOFError, FileNotFoundError):
×
UNCOV
304
            self._log.error("Error reading device properties")
×
UNCOV
305
            raise ValueError
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc