• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

corridor / sqlalchemy-history / 25801782794

13 May 2026 01:20PM UTC coverage: 95.68%. First build
25801782794

Pull #170

github

web-flow
Merge 8135f8bd4 into 5fe88b105
Pull Request #170: refactor: add PEP 484 type annotations across the codebase (#164)

286 of 320 new or added lines in 22 files covered. (89.38%)

5471 of 5718 relevant lines covered (95.68%)

9.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.59
/sqlalchemy_history/fetcher.py
1
"""Fetcher Module helps traverse across versions for a given versioned object."""
2

3
import operator
10✔
4
import typing as t
10✔
5

6

7
if t.TYPE_CHECKING:
10✔
NEW
8
    from sqlalchemy_history.manager import VersioningManager
×
9

10
import sqlalchemy as sa
10✔
11
from sqlalchemy.ext.asyncio import async_object_session
10✔
12
from sqlalchemy.orm import aliased, object_session
10✔
13
from sqlalchemy_utils import get_primary_keys, identity
10✔
14

15
from sqlalchemy_history.utils import end_tx_column_name, tx_column_name
10✔
16

17

18
def parent_identity(obj_or_class: t.Any) -> tuple:
    """Return the primary-key attributes of ``obj_or_class`` without the transaction key.

    Every key yielded by ``get_primary_keys(obj_or_class)`` is looked up as an
    attribute on ``obj_or_class``; the key equal to
    ``tx_column_name(obj_or_class)`` is left out. The same function therefore
    works for a class (yielding its attributes) and for an instance (yielding
    its values).

    :param obj_or_class: a versioned object or its class.
    :return: tuple of the remaining primary-key attributes, in iteration order.
    """
    collected = []
    for column_key in get_primary_keys(obj_or_class):
        # The transaction column distinguishes versions, not parents: skip it.
        if column_key == tx_column_name(obj_or_class):
            continue
        collected.append(getattr(obj_or_class, column_key))
    return tuple(collected)
24

25

26
def eqmap(callback: t.Callable, iterable: t.Iterable) -> t.Iterator[bool]:
    """Lazily compare, position by position, the results of ``callback`` on two items.

    ``callback`` is applied to every element of ``iterable``; the resulting
    sequences are zipped together and each zipped pair is compared with ``==``.
    Because every zipped group is unpacked into exactly two names,
    ``iterable`` must contain exactly two elements whenever there is anything
    to compare.

    :param callback: function producing an iterable for each element.
    :param iterable: the two items whose projections are compared.
    :return: generator yielding ``left == right`` for each aligned pair.
    """
    # Nothing runs until the generator is first advanced, as with any generator.
    projections = [callback(item) for item in iterable]
    yield from (left == right for left, right in zip(*projections))
10✔
29

30

31
def parent_criteria(obj: t.Any, class_: t.Optional[type] = None) -> t.Iterator[bool]:
    """Build the comparisons that match ``obj``'s parent identity against a class.

    ``parent_identity`` is evaluated for both the class and ``obj`` and the
    two results are compared element-wise through ``eqmap``.

    :param obj: the version object supplying the identity values.
    :param class_: class to compare against; defaults to ``obj.__class__``.
    :return: the generator produced by ``eqmap``.
    """
    target_class = obj.__class__ if class_ is None else class_
    return eqmap(parent_identity, (target_class, obj))
10✔
35

36

37
class VersionObjectFetcher:
    """Base class for strategies that move between versions of a versioned object.

    The navigation methods (``previous``, ``next``, ``aprevious``, ``anext``)
    call ``self.previous_query`` / ``self.next_query``. Those two methods are
    not defined on this class, so a concrete subclass has to provide them.
    """

    def __init__(self, manager: "VersioningManager") -> None:
        """Keep a reference to the versioning manager.

        :param manager: the ``VersioningManager`` this fetcher is attached to.
        """
        self.manager = manager

    def previous(self, obj: t.Any) -> t.Optional[t.Any]:
        """
        Returns the previous version relative to this version in the version
        history. If current version is the first version this method returns
        None.
        """
        session = object_session(obj)
        return session.scalars(self.previous_query(obj).limit(1)).first()

    def index(self, obj: t.Any) -> int:
        """
        Return the index of this version in the version history.
        """
        session = object_session(obj)
        return session.scalar(self._index_query(obj))

    def next(self, obj: t.Any) -> t.Optional[t.Any]:
        """
        Returns the next version relative to this version in the version
        history. If current version is the last version this method returns
        None.
        """
        session = object_session(obj)
        return session.scalars(self.next_query(obj).limit(1)).first()

    async def aprevious(self, obj: t.Any) -> t.Optional[t.Any]:
        """
        Returns the previous version relative to this version in the version
        history. If current version is the first version this method returns
        None.

        Use this when working with async SQLAlchemy.
        """
        async_session = async_object_session(obj)
        return (await async_session.scalars(self.previous_query(obj).limit(1))).first()

    async def aindex(self, obj: t.Any) -> int:
        """
        Return the index of this version in the version history.

        Use this when working with async SQLAlchemy.
        """
        async_session = async_object_session(obj)
        return await async_session.scalar(self._index_query(obj))

    async def anext(self, obj: t.Any) -> t.Optional[t.Any]:
        """
        Returns the next version relative to this version in the version
        history. If current version is the last version this method returns
        None.

        Use this when working with async SQLAlchemy.
        """
        async_session = async_object_session(obj)
        return (await async_session.scalars(self.next_query(obj).limit(1))).first()

    def _transaction_id_subquery(
        self, obj: t.Any, next_or_prev: str = "next", alias: t.Optional[t.Any] = None
    ) -> sa.ScalarSelect:
        """
        Build a scalar subquery selecting the neighbouring transaction id.

        With ``next_or_prev == "next"`` the subquery selects the smallest
        transaction value greater than ``obj``'s; for any other value it
        selects the largest transaction value smaller than ``obj``'s. In both
        cases rows are restricted to those whose remaining primary-key columns
        equal ``obj``'s.

        :param obj: the version object to navigate from.
        :param next_or_prev: ``"next"`` for the following transaction,
            anything else for the preceding one.
        :param alias: optional aliased selectable to read columns from; it
            must expose ``.original`` and ``.c``. When omitted, a fresh
            ``aliased(obj.__class__)`` is used.
        :return: the scalar subquery.
        """
        if next_or_prev == "next":
            op = operator.gt
            func = sa.func.min
        else:
            op = operator.lt
            func = sa.func.max

        if alias is None:
            alias = aliased(obj.__class__)
            table = alias.__table__
            attrs = alias.c if hasattr(alias, "c") else alias
        else:
            table = alias.original
            attrs = alias.c

        # Resolve the transaction column name once instead of per use.
        tx_column = tx_column_name(obj)
        same_parent = [
            getattr(attrs, pk) == getattr(obj, pk)
            for pk in get_primary_keys(obj.__class__)
            if pk != tx_column
        ]
        query = (
            sa.select(func(getattr(attrs, tx_column)))
            .select_from(table)
            .where(
                sa.and_(
                    op(getattr(attrs, tx_column), getattr(obj, tx_column)),
                    *same_parent,
                )
            )
            .correlate(table)
        )
        return query.scalar_subquery()

    def _next_prev_query(self, obj: t.Any, next_or_prev: str = "next") -> sa.Select:
        """
        Build the select that fetches the neighbouring version of ``obj``.

        The result selects ``obj.__class__`` rows whose transaction column
        equals the value produced by ``_transaction_id_subquery`` and whose
        parent identity matches ``obj``'s.

        :param obj: the version object to navigate from.
        :param next_or_prev: forwarded to ``_transaction_id_subquery``.
        :return: the select statement.
        """
        # _transaction_id_subquery already returns a scalar subquery, so it is
        # used directly rather than being wrapped a second time.
        subquery = self._transaction_id_subquery(obj, next_or_prev=next_or_prev)

        return sa.select(obj.__class__).filter(
            sa.and_(getattr(obj.__class__, tx_column_name(obj)) == subquery, *parent_criteria(obj))
        )

    def _index_query(self, obj: t.Any) -> sa.Select:
        """
        Returns the query needed for fetching the index of this record relative
        to version history.

        The index is a labelled (``"position"``) count of rows whose
        transaction column is lower than ``obj``'s, selected for the row
        matching ``obj``'s identity.
        """
        alias = aliased(obj.__class__)
        tx_column = tx_column_name(obj)

        # NOTE(review): this count filters on the transaction column only, not
        # on the parent identity -- confirm counting other parents' rows is intended.
        subquery = (
            sa.select(sa.func.count("1"))
            .select_from(alias.__table__)
            .where(getattr(alias, tx_column) < getattr(obj, tx_column))
            .correlate(alias.__table__)
            .label("position")
        )
        return (
            sa.select(subquery)
            .select_from(obj.__table__)
            .where(sa.and_(*eqmap(identity, (obj.__class__, obj))))
            .order_by(getattr(obj.__class__, tx_column))
        )
162

163

164
class SubqueryFetcher(VersionObjectFetcher):
    """Fetcher whose neighbour queries are built by ``_next_prev_query``."""

    def previous_query(self, obj: t.Any) -> sa.Select:
        """
        Build the query for the version that precedes ``obj`` in the version
        history.
        """
        return self._next_prev_query(obj, next_or_prev="previous")

    def next_query(self, obj: t.Any) -> sa.Select:
        """
        Build the query for the version that follows ``obj`` in the version
        history.
        """
        return self._next_prev_query(obj, next_or_prev="next")
10✔
178

179

180
class ValidityFetcher(VersionObjectFetcher):
    """Fetcher that pairs versions through their transaction and end-transaction columns."""

    def next_query(self, obj: t.Any) -> sa.Select:
        """
        Build the query for the version that follows ``obj`` in the version
        history: the row of the same parent whose transaction column equals
        ``obj``'s end-transaction value.
        """
        version_class = obj.__class__
        starts_where_obj_ends = getattr(version_class, tx_column_name(obj)) == getattr(
            obj, end_tx_column_name(obj)
        )
        return sa.select(version_class).filter(sa.and_(starts_where_obj_ends, *parent_criteria(obj)))

    def previous_query(self, obj: t.Any) -> sa.Select:
        """
        Build the query for the version that precedes ``obj`` in the version
        history: the row of the same parent whose end-transaction column
        equals ``obj``'s transaction value.
        """
        version_class = obj.__class__
        ends_where_obj_starts = getattr(version_class, end_tx_column_name(obj)) == getattr(
            obj, tx_column_name(obj)
        )
        return sa.select(version_class).filter(sa.and_(ends_where_obj_starts, *parent_criteria(obj)))
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc