• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hasgeek / coaster / 9244418196

26 May 2024 03:39PM UTC coverage: 84.467% (-4.8%) from 89.263%
9244418196

push

github

web-flow
Add async support for Quart+Flask (#470)

This commit bumps the version number from 0.7 to 0.8 as it has extensive changes:

* Ruff replaces black, isort and flake8 for linting and formatting
* All decorators now support async functions and provide async wrapper implementations
* Some obsolete modules have been removed
* Pagination from Flask-SQLAlchemy is now included, removing that dependency (but still used in tests)
* New `compat` module provides wrappers for both Quart and Flask, and is used by all other modules
* Some tests run using Quart. The vast majority of tests are not upgraded, nor are there tests for async decorators, so overall line coverage has dropped significantly. Comprehensive test coverage is still pending; for now we are using Funnel's tests as the extended test suite

648 of 1023 new or added lines in 29 files covered. (63.34%)

138 existing lines in 17 files now uncovered.

3948 of 4674 relevant lines covered (84.47%)

3.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.77
/src/coaster/sqlalchemy/functions.py
1
"""SQLAlchemy helper functions."""
4✔
2

3
from __future__ import annotations
4✔
4

5
from datetime import datetime
4✔
6
from typing import Any, Optional, TypeVar, Union, cast, overload
4✔
7

8
import sqlalchemy as sa
4✔
9
import sqlalchemy.exc as sa_exc
4✔
10
import sqlalchemy.orm as sa_orm
4✔
11
from sqlalchemy import inspect
4✔
12
from sqlalchemy.ext.compiler import compiles
4✔
13
from sqlalchemy.orm import DeclarativeBase
4✔
14

15
from .query import relationship
4✔
16

17
# Names exported by ``from <this module> import *``
__all__ = [
    'make_timestamp_columns',
    'failsafe_add',
    'add_primary_relationship',
    'auto_init_default',
    'idfilters',
]
24

25
# Generic type variable; ``failsafe_add`` uses it to tie its return type to the
# type of the instance it is given
T = TypeVar('T')
4✔
26

27
# --- SQL functions --------------------------------------------------------------------
28

29

30
# Provide sqlalchemy.func.utcnow()
31
# Adapted from https://docs.sqlalchemy.org/en/14/core/compiler.html
32
# #utc-timestamp-function
33
class UtcNow(sa.sql.functions.GenericFunction):
    """Provide ``sqlalchemy.func.utcnow()`` that guarantees UTC timestamp."""

    # SQL type of the value this function expression produces
    type = sa.TIMESTAMP()
    # Name under which ``GenericFunction`` registers this class on ``sa.func``
    identifier = 'utcnow'
    # Declares that this subclass adds no state affecting SQLAlchemy's
    # statement cache key, so the parent's caching scheme can be reused
    inherit_cache = True
4✔
39

40

41
@compiles(UtcNow)
def _utcnow_default(_element: UtcNow, _compiler: Any, **_kwargs: Any) -> str:
    """Render ``utcnow()`` as ``CURRENT_TIMESTAMP`` when no dialect override applies."""
    return 'CURRENT_TIMESTAMP'
4✔
44

45

46
@compiles(UtcNow, 'mysql')
def _utcnow_mysql(  # pragma: no cover
    _element: UtcNow,
    _compiler: Any,
    **_kwargs: Any,
) -> str:
    """Render ``utcnow()`` as ``UTC_TIMESTAMP()`` for the ``mysql`` dialect."""
    return 'UTC_TIMESTAMP()'
53

54

55
@compiles(UtcNow, 'mssql')
def _utcnow_mssql(  # pragma: no cover
    _element: UtcNow,
    _compiler: Any,
    **_kwargs: Any,
) -> str:
    """Render ``utcnow()`` as ``SYSUTCDATETIME()`` for the ``mssql`` dialect."""
    return 'SYSUTCDATETIME()'
62

63

64
# --- Helper functions -----------------------------------------------------------------
65

66

67
def make_timestamp_columns(
    timezone: bool = False,
) -> tuple[sa.Column[datetime], sa.Column[datetime]]:
    """
    Build a ``(created_at, updated_at)`` pair of non-nullable timestamp columns.

    Both columns use ``sa.func.utcnow()`` as their default, and ``updated_at``
    also uses it as its ``onupdate`` value.

    :param timezone: Passed through to ``sa.TIMESTAMP`` for both columns
    :return: Tuple of the new ``created_at`` and ``updated_at`` columns
    """
    created_at: sa.Column[datetime] = sa.Column(
        'created_at',
        sa.TIMESTAMP(timezone=timezone),
        default=sa.func.utcnow(),
        nullable=False,
    )
    updated_at: sa.Column[datetime] = sa.Column(
        'updated_at',
        sa.TIMESTAMP(timezone=timezone),
        default=sa.func.utcnow(),
        onupdate=sa.func.utcnow(),
        nullable=False,
    )
    return created_at, updated_at
86

87

88
# Session types accepted by ``failsafe_add``: a plain or a scoped session
session_type = Union[sa_orm.Session, sa_orm.scoped_session]
4✔
89

90

91
@overload
def failsafe_add(__session: session_type, __instance: Any, /) -> None: ...


@overload
def failsafe_add(__session: session_type, __instance: T, /, **filters: Any) -> T: ...


def failsafe_add(
    __session: session_type, __instance: T, /, **filters: Any
) -> Optional[T]:
    """
    Add and commit a new instance in a nested transaction (using SQL SAVEPOINT).

    Gracefully handles failure in case a conflicting entry is already in the
    database, which may occur due to parallel requests causing race conditions
    in a production environment with multiple workers.

    Returns the instance saved to database if no error occurred, or loaded from
    database using the provided filters if an error occurred. If the filters fail
    to load from the database, the original IntegrityError is re-raised, as it
    is assumed to imply that the commit failed because of missing or invalid
    data, not because of a duplicate entry.

    However, when no filters are provided, nothing is returned and IntegrityError
    is also suppressed as there is no way to distinguish between data validation
    failure and an existing conflicting record in the database. Use this option
    when failures are acceptable but the cost of verification is not.

    Usage: ``failsafe_add(db.session, instance, **filters)`` where filters
    are the parameters passed to ``Model.query.filter_by(**filters).one()``
    to load the instance.

    You must commit the transaction as usual after calling ``failsafe_add``.

    :param __session: Database session (positional only)
    :param __instance: Instance to commit (positional only)
    :param filters: Filters used to load the existing instance from the
        database in case the commit fails. Optional: when omitted, an
        IntegrityError is suppressed and ``None`` is returned
    :return: Instance that is in the database when filters are given, else
        ``None``
    :raises sqlalchemy.exc.IntegrityError: If the insert failed and the filters
        matched no existing row
    :raises sqlalchemy.exc.MultipleResultsFound: If the insert failed and the
        filters matched more than one row (deliberately not trapped)
    """
    if __instance in __session:
        # This instance is already in the session, most likely due to a
        # save-update cascade. SQLAlchemy will flush before beginning a
        # nested transaction, which defeats the purpose of nesting, so
        # remove it for now and add it back inside the SAVEPOINT.
        __session.expunge(__instance)
    savepoint = __session.begin_nested()
    try:
        __session.add(__instance)
        savepoint.commit()
    except sa_exc.IntegrityError as e:
        savepoint.rollback()
        if not filters:
            # Without filters a duplicate cannot be told apart from invalid data,
            # so the failure is deliberately swallowed
            return None
        try:
            return __session.query(__instance.__class__).filter_by(**filters).one()
        except sa_exc.NoResultFound:  # Do not trap the other, MultipleResultsFound
            # Re-raise the original error. ``from None`` hides the NoResultFound
            # context without making the exception its own ``__cause__``
            raise e from None
    return __instance if filters else None
4✔
152

153

154
def add_primary_relationship(
    parent: type[DeclarativeBase],
    childrel: str,
    child: type[DeclarativeBase],
    parentrel: str,
    parentcol: str,
) -> sa.Table:
    """
    Add support for the primary child of a parent, given a one-to-many relationship.

    This is achieved by creating a secondary table to hold the reference, and if the
    database is PostgreSQL, by adding a trigger to ensure foreign key integrity.

    A SQLAlchemy relationship named ``parent.childrel`` is added that makes
    usage seamless within SQLAlchemy. Assigning a child whose ``parentrel``
    attribute is not the parent being assigned to raises :exc:`ValueError`.

    The secondary table is named after the parent and child tables, with
    ``_primary`` appended, in the form ``parent_child_primary``. This table can
    be found in the metadata in the ``parent.metadata.tables`` dictionary. It
    also receives ``created_at`` and ``updated_at`` columns, which are
    timezone-aware unless the parent sets ``__with_timezone__`` to a false value.

    Multi-column primary keys on either parent or child are unsupported at
    this time.

    :param parent: The parent model (on which this relationship will be added)
    :param childrel: The name of the relationship to the child that will be
        added
    :param child: The child model
    :param parentrel: Name of the existing relationship on the child model
        that refers back to the parent model
    :param parentcol: Name of the existing table column on the child model
        that refers back to the parent model
    :return: Secondary table that was created
    """
    parent_table_name = cast(str, parent.__tablename__)
    child_table_name = cast(str, child.__tablename__)
    primary_table_name = parent_table_name + '_' + child_table_name + '_primary'
    parent_id_columns = [c.name for c in sa.inspect(parent).primary_key]
    child_id_columns = [c.name for c in sa.inspect(child).primary_key]

    # Column layout of the secondary table: parent key(s), child key(s), timestamps.
    # Column types are given as ``None`` and accompanied by a ForeignKey.
    primary_table_columns: list[sa.Column] = (
        [
            # Only the parent side is part of the primary key, so a parent can
            # have at most one row (one primary child) in this table
            sa.Column(
                parent_table_name + '_' + name,
                None,
                sa.ForeignKey(parent_table_name + '.' + name, ondelete='CASCADE'),
                primary_key=True,
                nullable=False,
            )
            for name in parent_id_columns
        ]
        + [
            sa.Column(
                child_table_name + '_' + name,
                None,
                sa.ForeignKey(child_table_name + '.' + name, ondelete='CASCADE'),
                nullable=False,
            )
            for name in child_id_columns
        ]
        + cast(
            list[sa.Column],
            list(
                make_timestamp_columns(
                    timezone=getattr(parent, '__with_timezone__', True)
                )
            ),
        )
    )

    primary_table = sa.Table(
        primary_table_name, parent.metadata, *primary_table_columns
    )
    # ``relationship`` is this package's helper from ``.query``
    rel = relationship(child, uselist=False, secondary=primary_table)
    setattr(parent, childrel, rel)

    # Python-side validation: reject a child that does not point back at this parent
    @sa.event.listens_for(rel, 'set')
    def _validate_child(
        target: Any, value: Any, _oldvalue: Any, _initiator: Any
    ) -> None:
        # Falsy values (such as None, to unset the primary child) are not validated
        if value and getattr(value, parentrel) != target:
            raise ValueError("The target is not affiliated with this parent")

    # Database-side validation (PostgreSQL only): a trigger that repeats the
    # same check on every insert or update of the secondary table. Only the
    # first primary key column of each side is used.
    # XXX: To support multi-column primary keys, update this SQL function
    sa.event.listen(
        primary_table,
        'after_create',
        # spell-checker:ignore parentcol plpgsql
        sa.DDL(
            '''
            CREATE FUNCTION %(function)s() RETURNS TRIGGER AS $$
            DECLARE
                target RECORD;
            BEGIN
                IF (NEW.%(rhs)s IS NOT NULL) THEN
                    SELECT %(parentcol)s INTO target FROM %(child_table_name)s
                    WHERE %(child_id_column)s = NEW.%(rhs)s;
                    IF (target.%(parentcol)s != NEW.%(lhs)s) THEN
                        RAISE foreign_key_violation USING
                        MESSAGE = 'The target is not affiliated with this parent';
                    END IF;
                END IF;
                RETURN NEW;
            END;
            $$ LANGUAGE plpgsql;
            CREATE TRIGGER %(trigger)s BEFORE INSERT OR UPDATE
            ON %(table)s
            FOR EACH ROW EXECUTE PROCEDURE %(function)s();
            ''',
            context={
                'table': primary_table_name,
                'function': f'{primary_table_name}_validate',
                'trigger': f'{primary_table_name}_trigger',
                'parentcol': parentcol,
                'child_table_name': child_table_name,
                'child_id_column': child_id_columns[0],
                'lhs': f'{parent_table_name}_{parent_id_columns[0]}',
                'rhs': f'{child_table_name}_{child_id_columns[0]}',
            },
        ).execute_if(dialect='postgresql'),
    )

    # Remove the trigger and its function before the table itself is dropped
    sa.event.listen(
        primary_table,
        'before_drop',
        sa.DDL(
            '''
            DROP TRIGGER %(trigger)s ON %(table)s;
            DROP FUNCTION %(function)s();
            ''',
            context={
                'table': primary_table_name,
                'trigger': f'{primary_table_name}_trigger',
                'function': f'{primary_table_name}_validate',
            },
        ).execute_if(dialect='postgresql'),
    )
    return primary_table
4✔
291

292

293
def auto_init_default(
    column: Union[sa_orm.ColumnProperty, sa_orm.InstrumentedAttribute],
) -> None:
    """
    Make a column produce its default value the first time it is read.

    SQLAlchemy normally fills in column defaults on commit, so reading the
    attribute on a new instance before then yields None. This helper installs an
    ``init_scalar`` listener that supplies the default at read time. Usage::

        auto_init_default(MyModel.column)

    Only Python-side defaults (a scalar or a callable) are supported; a
    SQL-level default makes the listener raise :exc:`NotImplementedError`.
    """
    default = (
        column.columns[0].default
        if isinstance(column, sa_orm.ColumnProperty)
        else column.default
    )

    @sa.event.listens_for(column, 'init_scalar', retval=True, propagate=True)
    def init_scalar(_target: Any, value: Any, dict_: dict[str, Any]) -> Optional[Any]:
        # The column may have no default at all (for instance when a subclass
        # overrides the column without one), in which case there is nothing to do.
        if not default:
            return None
        if default.is_callable:
            # Callable defaults are called with None for their context argument
            initial = default.arg(None)
        elif not default.is_scalar:
            raise NotImplementedError(
                "Can't invoke pre-default for a SQL-level column default"
            )
        else:
            initial = default.arg
        # Store the value in the instance dict under the column's key and return it
        dict_[column.key] = initial
        return initial
4✔
326

327

328
def idfilters(obj: DeclarativeBase) -> Optional[list[sa.BinaryExpression]]:
    """
    Build SQLAlchemy filter expressions matching the identity of the given object.

    Each primary key column of the object's mapper is compared for equality with
    the corresponding value of the object's identity. This is useful when
    querying for membership in a lazy relationship. With DynamicMapped
    (``lazy='dynamic'``)::

        filtered_query = parent.children.filter(*idfilters(child))

    Or with WriteOnlyMapped (``lazy='write_only'``)::

        filtered_select = parent.children.select().where(*idfilters(child))

    Returns None when the object has no persistent identity.
    """
    state = inspect(obj)
    identity = state.identity
    if identity is None:
        return None
    # Pairing columns with identity values covers single and composite keys alike
    return [
        column == value
        for column, value in zip(state.mapper.primary_key, identity)
    ]
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc