• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sdementen / piecash / 9548188709

17 Jun 2024 01:03PM UTC coverage: 84.923% (-0.4%) from 85.329%
9548188709

Pull #228

github

web-flow
Merge ff556ac34 into a7fb6dae4
Pull Request #228: fix migration to SA1.4

10 of 14 new or added lines in 1 file covered. (71.43%)

3 existing lines in 3 files now uncovered.

1870 of 2202 relevant lines covered (84.92%)

1.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.95
/piecash/sa_extra.py
1
from __future__ import division, unicode_literals
2✔
2
from __future__ import print_function
2✔
3

4
import datetime
2✔
5
import logging
2✔
6
import sys
2✔
7
import unicodedata
2✔
8

9
import pytz
2✔
10
import tzlocal
2✔
11
from sqlalchemy import (
2✔
12
    types,
13
    Table,
14
    MetaData,
15
    ForeignKeyConstraint,
16
    event,
17
    create_engine,
18
)
19
from sqlalchemy.dialects import sqlite
2✔
20
from sqlalchemy.ext.compiler import compiles
2✔
21
from sqlalchemy.ext.hybrid import hybrid_property
2✔
22
from sqlalchemy.orm import exc as orm_exc
2✔
23
from sqlalchemy.orm import sessionmaker, object_session
2✔
24

25
try:
2✔
26
    # sqlalchemy 1.4 and greater changes `as_declarative` to create a
27
    # `registry` object, on which `as_declarative_base` is called.
28
    #
29
    # For unclear reasons, the `constructor` keyword is not forwarded
30
    # to the `registry` constructor in `as_declarative`, even though
31
    # it's defined for `registry.__init__` and not allowed by
32
    # `registry.as_declarative_base`.
33
    #
34
    # This redefinition of `as_declarative` passes the `constructor` keyword
35
    # onto `registry`, just as is done in `declarative_base`.
36
    #
37
    # I am using the existence of `sqlalchemy.orm.registry` (new in SA 1.4)
38
    # as the marker for whether `constructor` is supported in `as_declarative`
39
    from sqlalchemy.orm import registry
2✔
40
    from sqlalchemy.orm.decl_base import _declarative_constructor
2✔
41

42
    def as_declarative(**kw):
2✔
43
        bind, metadata, class_registry, constructor = (
2✔
44
            kw.pop("bind", None),
45
            kw.pop("metadata", None),
46
            kw.pop("class_registry", None),
47
            kw.pop("constructor", _declarative_constructor),
48
        )
49

50
        return registry(_bind=bind, metadata=metadata, class_registry=class_registry, constructor=constructor).as_declarative_base(**kw)
2✔
51

NEW
52
except ImportError:
×
53
    # `as_declarative` was under `sqlalchemy.ext.declarative` prior to 1.4
NEW
54
    from sqlalchemy.ext.declarative import as_declarative
×
55

56

57
def __init__blocked(self, *args, **kwargs):
2✔
NEW
58
    raise NotImplementedError("Objects of type {} cannot be created from scratch " "(only read)".format(self.__class__.__name__))
×
59

60

61
@as_declarative(constructor=__init__blocked)
2✔
62
class DeclarativeBase(object):
2✔
63
    @property
2✔
64
    def book(self):
2✔
65
        """Return the gnc book holding the object"""
66
        s = object_session(self)
2✔
67
        return s and s.book
2✔
68

69
    def object_to_validate(self, change):
2✔
70
        """yield the objects to validate when the object is modified (change="new" "deleted" or "dirty").
71

72
        For instance, if the object is a Split, if it changes, we want to revalidate not the split
73
        but its transaction and its lot (if any). split.object_to_validate should yeild both split.transaction
74
        and split.lot
75
        """
76
        return
2✔
UNCOV
77
        yield
×
78

79
    def on_book_add(self):
2✔
80
        """Call when the object is added to a book"""
81
        pass
2✔
82

83
    def validate(self):
2✔
84
        """This must be reimplemented for object requiring validation"""
85
        raise NotImplementedError(self)
×
86

87
    def get_all_changes(self):
2✔
88
        try:
2✔
89
            return self.book.session._all_changes[id(self)]
2✔
90
        except KeyError:
2✔
91
            return {"STATE_CHANGES": ["unchanged"], "OBJECT": self}
2✔
92

93
    def __repr__(self):
2✔
94
        return str(self)
2✔
95

96

97
tz = pytz.timezone(str(tzlocal.get_localzone()))
2✔
98
utc = pytz.utc
2✔
99

100

101
@compiles(sqlite.DATE, "sqlite")
2✔
102
def compile_date(element, compiler, **kw):
2✔
103
    return "TEXT(8)"  # % element.__class__.__name__
2✔
104

105

106
@compiles(sqlite.DATETIME, "sqlite")
2✔
107
def compile_datetime(element, compiler, **kw):
2✔
108
    """data type for the date field
109

110
    note: it went from TEXT(14) in 2.6 to TEXT(19) in 2.8 to accommodate
111
    for the new ISO format of date in sqlite"""
112
    return "TEXT(14)"
2✔
113

114

115
class _DateTime(types.TypeDecorator):
2✔
116
    """Used to customise the DateTime type for sqlite (ie without the separators as in gnucash"""
117

118
    impl = types.TypeEngine
2✔
119

120
    def load_dialect_impl(self, dialect):
2✔
121
        if dialect.name == "sqlite":
2✔
122
            return sqlite.DATETIME(
2✔
123
                storage_format="%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d",
124
                regexp=r"(\d{4})-?(\d{2})-?(\d{2}) ?(\d{2}):?(\d{2}):?(\d{2})",
125
            )
126
        else:
127
            return types.DateTime()
2✔
128

129
    def process_bind_param(self, value, dialect):
2✔
130
        if value is not None:
2✔
131
            assert isinstance(value, datetime.datetime), "value {} is not of type datetime.datetime but type {}".format(value, type(value))
2✔
132
            if value.microsecond != 0:
2✔
133
                logging.warning("A datetime has been given with microseconds which are not saved in the database")
2✔
134

135
            if not value.tzinfo:
2✔
136
                value = tz.localize(value)
2✔
137

138
            return value.astimezone(utc).replace(tzinfo=None)
2✔
139

140
    def process_result_value(self, value, dialect):
2✔
141
        if value is not None:
2✔
142
            return utc.localize(value).astimezone(tz)
2✔
143

144

145
class _DateAsDateTime(types.TypeDecorator):
2✔
146
    """Used to customise the DateTime type for sqlite (ie without the separators as in gnucash"""
147

148
    impl = types.TypeEngine
2✔
149

150
    def __init__(self, neutral_time=True, *args, **kwargs):
2✔
151
        super(_DateAsDateTime, self).__init__(*args, **kwargs)
2✔
152
        self.neutral_time = neutral_time
2✔
153

154
    def load_dialect_impl(self, dialect):
2✔
155
        if dialect.name == "sqlite":
2✔
156
            return sqlite.DATETIME(
2✔
157
                storage_format="%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d",
158
                regexp=r"(\d{4})-?(\d{2})-?(\d{2}) ?(\d{2}):?(\d{2}):?(\d{2})",
159
            )
160
        else:
161
            return types.DateTime()
2✔
162

163
    def process_bind_param(self, value, dialect):
2✔
164
        if value is not None:
2✔
165
            assert isinstance(value, datetime.date) and not isinstance(
2✔
166
                value, datetime.datetime
167
            ), "value {} is not of type datetime.date but type {}".format(value, type(value))
168
            if self.neutral_time:
2✔
169
                result = datetime.datetime.combine(value, datetime.time(10, 59, 0))
2✔
170
            else:
171
                result = tz.localize(datetime.datetime.combine(value, datetime.time(0, 0, 0))).astimezone(utc)
2✔
172
            return result.replace(tzinfo=None)
2✔
173

174
    def process_result_value(self, value, dialect):
2✔
175
        if value is not None:
2✔
176
            r = utc.localize(value).astimezone(tz)
2✔
177
            return r.date()
2✔
178

179

180
class _Date(types.TypeDecorator):
2✔
181
    """Used to customise the DateTime type for sqlite (ie without the separators as in gnucash"""
182

183
    impl = types.TypeEngine
2✔
184
    is_sqlite = False
2✔
185

186
    def load_dialect_impl(self, dialect):
2✔
187
        if dialect.name == "sqlite":
2✔
188
            return sqlite.DATE(
2✔
189
                storage_format="%(year)04d%(month)02d%(day)02d",
190
                regexp=r"(\d{4})(\d{2})(\d{2})",
191
            )
192
        else:
193
            return types.Date()
2✔
194

195

196
def mapped_to_slot_property(col, slot_name, slot_transform=lambda x: x):
2✔
197
    """Assume the attribute in the class as the same name as the table column with "_" prepended"""
198
    col_name = "_{}".format(col.name)
2✔
199

200
    def fget(self):
2✔
201
        return getattr(self, col_name)
2✔
202

203
    def fset(self, value):
2✔
204
        v = slot_transform(value)
2✔
205
        if v is None:
2✔
206
            if slot_name in self:
2✔
207
                del self[slot_name]
×
208
        else:
209
            self[slot_name] = v
2✔
210

211
        setattr(self, col_name, value)
2✔
212

213
    def expr(cls):
2✔
214
        return col
2✔
215

216
    return hybrid_property(
2✔
217
        fget=fget,
218
        fset=fset,
219
        expr=expr,
220
    )
221

222

223
def pure_slot_property(slot_name, slot_transform=lambda x: x, ignore_invalid_slot=False):
2✔
224
    """
225
    Create a property (class must have slots) that maps to a slot
226

227
    :param slot_name: name of the slot
228
    :param slot_transform: transformation to operate before assigning value
229
    :param ignore_invalid_slot: True if incorrect values (usually due to deleted data)
230
        should be converted to None
231
    :return:
232
    """
233

234
    def fget(self):
2✔
235
        # return None if the slot does not exist. alternative could be to raise an exception
236
        try:
2✔
237
            return self[slot_name].value
2✔
238
        except KeyError:
2✔
239
            return None
2✔
240
        except orm_exc.NoResultFound:
2✔
241
            if ignore_invalid_slot:
2✔
242
                return None
2✔
243
            else:
244
                raise
×
245

246
    def fset(self, value):
2✔
247
        v = slot_transform(value)
2✔
248
        if v is None:
2✔
249
            if slot_name in self:
×
250
                del self[slot_name]
×
251
        else:
252
            self[slot_name] = v
2✔
253

254
    return hybrid_property(
2✔
255
        fget=fget,
256
        fset=fset,
257
    )
258

259

260
def kvp_attribute(name, to_gnc=lambda v: v, from_gnc=lambda v: v, default=None):
2✔
261
    def getter(self):
2✔
262
        try:
2✔
263
            return from_gnc(self[name].value)
2✔
264
        except KeyError:
2✔
265
            return default
2✔
266

267
    def setter(self, value):
2✔
268
        if value == default:
2✔
269
            try:
2✔
270
                del self[name]
2✔
271
            except KeyError:
2✔
272
                pass
2✔
273
        else:
274
            self[name] = to_gnc(value)
2✔
275

276
    return property(getter, setter)
2✔
277

278

279
def get_foreign_keys(metadata, engine):
2✔
280
    """Retrieve all foreign keys from metadata bound to an engine
281
    :param metadata:
282
    :param engine:
283
    :return:
284
    """
285
    reflected_metadata = MetaData()
×
286
    for table_name in list(metadata.tables.keys()):
×
287
        table = Table(
×
288
            table_name,
289
            reflected_metadata,
290
            autoload=True,
291
            autoload_with=engine,
292
        )
293

294
        for constraint in table.constraints:
×
295
            if not isinstance(constraint, ForeignKeyConstraint):
×
296
                continue
×
297
            yield constraint
×
298

299

300
Session = sessionmaker(autoflush=False)
2✔
301

302

303
def create_piecash_engine(uri_conn, **kwargs):
2✔
304
    eng = create_engine(uri_conn, **kwargs)
2✔
305

306
    if eng.name == "sqlite":
2✔
307
        # add proper isolation code for sqlite engine
308
        @event.listens_for(eng, "connect")
2✔
309
        def do_connect(dbapi_connection, connection_record):
2✔
310
            # disable pysqlite's emitting of the BEGIN statement entirely.
311
            # also stops it from emitting COMMIT before any DDL.
312
            # print("=========================== in DO CONNECT")
313
            # dbapi_connection.isolation_level = "IMMEDIATE"
314
            # dbapi_connection.isolation_level = "EXCLUSIVE"
315
            pass
2✔
316

317
        @event.listens_for(eng, "begin")
2✔
318
        def do_begin(conn):
2✔
319
            # emit our own BEGIN
320
            # print("=========================== in DO BEGIN")
321
            # conn.execute("BEGIN EXCLUSIVE")
322
            pass
2✔
323

324
    return eng
2✔
325

326

327
class ChoiceType(types.TypeDecorator):
2✔
328
    impl = types.INTEGER()
2✔
329

330
    def __init__(self, choices, **kw):
2✔
331
        self.choices = dict(choices)
2✔
332
        super(ChoiceType, self).__init__(**kw)
2✔
333

334
    def process_bind_param(self, value, dialect):
2✔
335
        try:
2✔
336
            return [k for k, v in self.choices.items() if v == value][0]
2✔
337
        except IndexError:
×
338
            # print("Value '{}' is not in [{}]".format(", ".join(self.choices.values())))
NEW
339
            raise ValueError("Value '{}' is not in choices [{}]".format(value, ", ".join(self.choices.values())))
×
340

341
    def process_result_value(self, value, dialect):
2✔
342
        return self.choices[value]
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc