• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

xzkostyan / clickhouse-sqlalchemy / 23057175585

13 Mar 2026 03:10PM UTC coverage: 85.465%. First build
23057175585

Pull #401

github

web-flow
Merge 3b97a645d into 25721c3cd
Pull Request #401: Fix compatibility with asynch 0.3.1+, Alembic 1.18+, and SQLAlchemy 2.0.44+ (fixes #400)

26 of 45 new or added lines in 4 files covered. (57.78%)

2499 of 2924 relevant lines covered (85.47%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

18.28
/clickhouse_sqlalchemy/alembic/comparators.py
1
import logging
2

3
from alembic import __version__ as alembic_version
1✔
4
from alembic.autogenerate import comparators
1✔
5
from alembic.operations.ops import ModifyTableOps
1✔
6
from sqlalchemy import schema as sa_schema
1✔
7
from sqlalchemy import text
1✔
8

9
from clickhouse_sqlalchemy.sql.schema import Table
1✔
10
from . import operations
1✔
11

12
logger = logging.getLogger(__name__)
1✔
13

14
# Split the Alembic version string into components, converting the purely
# numeric ones to int and leaving anything else (e.g. "0b1") as a string.
alembic_version = tuple(
    (int(x) if x.isdigit() else x) for x in alembic_version.split('.')
)
# Compare major and minor only. A three-component comparison raises
# TypeError when the patch component is a non-numeric string (str vs int)
# and reports False for a two-component version such as "1.18".
_alembic_118 = alembic_version[:2] >= (1, 18)

# _compare_columns is imported from a different module depending on the
# Alembic version.
if _alembic_118:
    from alembic.autogenerate.compare.tables import _compare_columns
else:
    from alembic.autogenerate.compare import _compare_columns
×
23

24

25
def _extract_to_table_name(create_table_query):
1✔
26
    query = create_table_query
×
27
    # Naive inner name detection
28
    brace = query.index('(')
×
29
    inner_name = query[query.index(' TO ', 0, brace) + 4:brace - 1].strip(' `')
×
30
    return inner_name.split('.')[1] if '.' in inner_name else inner_name
×
31

32

33
# Registering a comparator with
# .dispatch_for('schema', qualifier='clickhouse') overrides Alembic's
# default ('schema', 'default') comparators. We only implement our own
# comparison for materialized views, so to keep the defaults active they
# are also registered as "clickhouse" comparators.
if _alembic_118:
    # Iterate over a snapshot: dispatch_for() is called on the same
    # dispatcher and is expected to add entries to its registry, and a dict
    # must not change size while it is being iterated.
    for (target, qualifier, priority), fns in list(
        comparators._registry.items()
    ):
        if target == 'schema' and qualifier == 'default':
            for fn, _ in fns:
                comparators.dispatch_for(
                    'schema', qualifier='clickhouse', priority=priority
                )(fn)
else:
    for fn in comparators._registry[('schema', 'default')]:
        comparators.dispatch_for('schema', qualifier='clickhouse')(fn)
×
47

48

49
@comparators.dispatch_for('schema', qualifier='clickhouse')
def compare_mat_view(autogen_context, upgrade_ops, schemas):
    """Autogenerate operations for added, removed and changed mat. views.

    View names registered in ``metadata.info['mat_views']`` are compared
    with the names returned by ``dialect.get_view_names``:

    * names only in the metadata produce a create operation;
    * names only in the database produce a drop operation;
    * names in both have the columns of their storage table compared, and
      any difference produces drop/modify/create (``TO`` table storage) or
      detach/modify/attach (``.inner`` storage) operations.

    All resulting operations are appended to ``upgrade_ops.ops``.

    :param autogen_context: Alembic autogenerate context; its connection,
        dialect, metadata and inspector are used.
    :param upgrade_ops: container whose ``ops`` list receives the
        operations.
    :param schemas: not used by this comparator.
    """
    connection = autogen_context.connection
    dialect = autogen_context.dialect
    metadata = autogen_context.metadata

    database_engine = dialect._execute(
        connection,
        text(
            'SELECT engine '
            'FROM system.databases '
            'WHERE database = currentDatabase()'
        ), scalar=True
    )

    # With an "Atomic" database engine every view is handled as a
    # "TO table" view, regardless of view.to.
    is_atomic = database_engine.lower() == 'atomic'
    logger.info('Database engine: %s', database_engine)
    if is_atomic:
        logger.info('Using "TO table" for materialized views storage')
    else:
        logger.info('Using ".inner" for materialized views storage by default')

    all_mat_views = set(dialect.get_view_names(connection))

    metadata_mat_views = metadata.info.setdefault('mat_views', set())

    statement_compiler = dialect.statement_compiler(dialect, None)
    ddl_compiler = dialect.ddl_compiler(dialect, None)

    # Views declared in the metadata but missing from the database.
    for name in sorted(metadata_mat_views.difference(all_mat_views)):
        view = metadata.mat_views[name]

        selectable = statement_compiler.process(
            view.mv_selectable, literal_binds=True
        )

        logger.info('Detected added materialized view %s', name)

        if is_atomic or view.to:
            create = operations.CreateMatViewToTableOp(
                view.name, selectable, view.inner_table.name
            )

        else:
            inner_table = view.inner_table
            engine = ddl_compiler.process(inner_table.engine)

            create = operations.CreateMatViewOp(
                view.name, selectable, engine, *inner_table.columns
            )

        upgrade_ops.ops.append(create)

    existing_metadata = sa_schema.MetaData()
    inspector = autogen_context.inspector

    removed_mat_views = all_mat_views.difference(metadata_mat_views)
    existing_mat_views = all_mat_views.intersection(metadata_mat_views)

    # Load the stored definitions of all database views with one query.
    mat_view_params_by_name = {}
    if removed_mat_views | existing_mat_views:
        rv = dialect._execute(
            connection,
            text(
                'SELECT name, as_select, engine_full, create_table_query '
                'FROM system.tables '
                'WHERE database = currentDatabase() AND name IN :names'
            ), names=list(removed_mat_views | existing_mat_views)
        )
        mat_view_params_by_name = {x.name: x for x in rv}

    # Views present in the database but no longer declared in the metadata.
    for name in sorted(removed_mat_views):
        logger.info('Detected removed materialized view %s', name)
        params = mat_view_params_by_name[name]

        # A query without a "TO" clause makes the extraction raise
        # ValueError; such a view is dropped as an ".inner" view.
        try:
            inner_name = _extract_to_table_name(params.create_table_query)
        except ValueError:
            inner_name = None

        if inner_name:
            drop = operations.DropMatViewToTableOp(
                name, params.as_select, inner_name
            )
        else:
            table = Table(name, existing_metadata)
            inspector.reflect_table(table, include_columns=None)

            drop = operations.DropMatViewOp(
                name, params.as_select, params.engine_full, *table.columns
            )

        upgrade_ops.ops.append(drop)

    # Views present on both sides: compare the storage table columns.
    for name in sorted(existing_mat_views):
        view = metadata.mat_views[name]
        params = mat_view_params_by_name[name]
        if is_atomic or view.to:
            inner_name = _extract_to_table_name(params.create_table_query)
        else:
            inner_name = '.inner.' + name

        conn_table = Table(inner_name, existing_metadata)
        inspector.reflect_table(conn_table, include_columns=None)

        if not autogen_context.run_object_filters(
            view, name, 'mat_view', False, conn_table
        ):
            # Skip only the filtered-out view. Returning here would
            # silently drop the comparison of all remaining views.
            continue

        metadata_table = view.inner_table
        modify_table_ops = ModifyTableOps(name, [])
        schema = None
        with _compare_columns(
                schema,
                inner_name,
                conn_table,
                metadata_table,
                modify_table_ops,
                autogen_context,
                inspector,
        ):
            comparators.dispatch('table')(
                autogen_context,
                modify_table_ops,
                schema,
                inner_name,
                conn_table,
                metadata_table,
            )

        if not modify_table_ops.is_empty():
            selectable = statement_compiler.process(
                view.mv_selectable, literal_binds=True
            )
            engine = ddl_compiler.process(metadata_table.engine)

            # The table modifications are placed between removing and
            # re-adding the view.
            if is_atomic or view.to:
                ops = [
                    operations.DropMatViewToTableOp(
                        name, params.as_select, metadata_table.name
                    ),
                    modify_table_ops,
                    operations.CreateMatViewToTableOp(
                        view.name, selectable, metadata_table.name
                    )
                ]
            else:
                ops = [
                    operations.DetachMatViewOp(
                        name, params.as_select, engine,
                        *metadata_table.columns
                    ),
                    modify_table_ops,
                    operations.AttachMatViewOp(
                        name, selectable, engine, *metadata_table.columns
                    )
                ]

            upgrade_ops.ops.extend(ops)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc