• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jayvynl / django-clickhouse-backend / 18678782196

21 Oct 2025 09:07AM UTC coverage: 85.299% (+0.02%) from 85.279%
18678782196

Pull #140

github

web-flow
Merge 5e6850a3b into a08b87117
Pull Request #140: Add pre-commit with ruff

1 of 1 new or added line in 1 file covered. (100.0%)

21 existing lines in 1 file now uncovered.

3197 of 3748 relevant lines covered (85.3%)

24.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.84
/clickhouse_backend/patch/migrations.py
1
from django.apps.registry import Apps
19✔
2
from django.db import DatabaseError
29✔
3
from django.db import models as django_models
29✔
4
from django.db.migrations import Migration
29✔
5
from django.db.migrations.exceptions import IrreversibleError, MigrationSchemaMissing
29✔
6
from django.db.migrations.operations.fields import FieldOperation
29✔
7
from django.db.migrations.operations.models import (
29✔
8
    DeleteModel,
9
    IndexOperation,
10
    ModelOperation,
11
    RenameModel,
12
)
13
from django.db.migrations.recorder import MigrationRecorder
29✔
14
from django.utils.timezone import now
29✔
15

16
__all__ = ["patch_migrations", "patch_migration_recorder", "patch_migration"]
29✔
17

18

19
def _should_distribute_migrations(connection):
29✔
20
    """
21
    Check if the connection is configured for distributed migrations.
22
    """
23
    return getattr(connection, "distributed_migrations", False) and getattr(
29✔
24
        connection, "migration_cluster", None
25
    )
26

27

28
def _get_model_table_name(connection):
29✔
29
    """
30
    Return the name of the table that will be used by the MigrationRecorder.
31
    If distributed migrations are enabled, return the distributed table name.
32
    Otherwise, return the regular django_migrations table name.
33
    """
34
    if _should_distribute_migrations(connection):
29✔
35
        return "distributed_django_migrations"
29✔
36
    return "django_migrations"
29✔
37

38

39
def _check_replicas(connection):
29✔
40
    """
41
    Check if the connection has replicas configured for the migration cluster.
42
    """
43
    if hasattr(connection, "has_replicas"):
29✔
44
        return connection.has_replicas
×
45

46
    with connection.cursor() as cursor:
29✔
47
        replica_count = _get_replicas(connection.migration_cluster, cursor)
29✔
48
    return replica_count >= 1
29✔
49

50

51
def _get_replicas(cluster_name, cursor):
29✔
52
    cursor.execute(
29✔
53
        "select replica_num from system.clusters where cluster=%s", [cluster_name]
54
    )
55
    res = cursor.fetchone()
29✔
56
    if not res:
29✔
57
        return 0
29✔
58
    return res[0]
29✔
59

60

61
def patch_migrations():
29✔
62
    patch_migration_recorder()
29✔
63
    patch_migration()
29✔
64

65

66
def patch_migration_recorder():
29✔
67
    def Migration(self):
29✔
68
        """
69
        Lazy load to avoid AppRegistryNotReady if installed apps import
70
        MigrationRecorder.
71
        """
72
        if self._migration_class is None:
29✔
73
            if self.connection.vendor == "clickhouse":
29✔
74
                from clickhouse_backend import models
29✔
75
                from clickhouse_backend.models import currentDatabase
29✔
76

77
                # Only create a distributed migration model if the connection
78
                # has distributed migrations enabled and a migration cluster is set.
79
                # otherwise, create a regular merge tree.
80
                if _should_distribute_migrations(self.connection):
29✔
81
                    has_replicas = _check_replicas(self.connection)
29✔
82

83
                    Engine = models.MergeTree
29✔
84
                    if has_replicas:
29✔
85
                        Engine = models.ReplicatedMergeTree
29✔
86

87
                    self.connection.has_replicas = has_replicas
29✔
88

89
                    class _Migration(models.ClickhouseModel):
29✔
90
                        app = models.StringField(max_length=255)
29✔
91
                        name = models.StringField(max_length=255)
29✔
92
                        applied = models.DateTime64Field(default=now)
29✔
93
                        deleted = models.BoolField(default=False)
29✔
94

95
                        class Meta:
29✔
96
                            apps = Apps()
29✔
97
                            app_label = "migrations"
29✔
98
                            db_table = "django_migrations"
29✔
99
                            engine = Engine(order_by=("app", "name"))
29✔
100
                            cluster = self.connection.migration_cluster
29✔
101

102
                        def __str__(self):
29✔
UNCOV
103
                            return "Migration %s for %s" % (self.name, self.app)
×
104

105
                    class Migration(models.ClickhouseModel):
29✔
106
                        app = models.StringField(max_length=255)
29✔
107
                        name = models.StringField(max_length=255)
29✔
108
                        applied = models.DateTime64Field(default=now)
29✔
109
                        deleted = models.BoolField(default=False)
29✔
110

111
                        class Meta:
29✔
112
                            apps = Apps()
29✔
113
                            app_label = "migrations"
29✔
114
                            db_table = _get_model_table_name(self.connection)
29✔
115
                            engine = models.Distributed(
29✔
116
                                self.connection.migration_cluster,
117
                                currentDatabase(),
118
                                _Migration._meta.db_table,
119
                                models.Rand(),
120
                            )
121
                            cluster = self.connection.migration_cluster
29✔
122

123
                    Migration._meta.local_model_class = _Migration
29✔
124

125
                else:
126

127
                    class Migration(models.ClickhouseModel):
29✔
128
                        app = models.StringField(max_length=255)
29✔
129
                        name = models.StringField(max_length=255)
29✔
130
                        applied = models.DateTime64Field(default=now)
29✔
131
                        deleted = models.BoolField(default=False)
29✔
132

133
                        class Meta:
29✔
134
                            apps = Apps()
29✔
135
                            app_label = "migrations"
29✔
136
                            db_table = _get_model_table_name(self.connection)
29✔
137
                            engine = models.MergeTree(order_by=("app", "name"))
29✔
138
                            cluster = getattr(
29✔
139
                                self.connection, "migration_cluster", None
140
                            )
141

142
                        def __str__(self):
29✔
UNCOV
143
                            return "Migration %s for %s" % (self.name, self.app)
×
144

145
            else:
146

147
                class Migration(django_models.Model):
29✔
148
                    app = django_models.CharField(max_length=255)
29✔
149
                    name = django_models.CharField(max_length=255)
29✔
150
                    applied = django_models.DateTimeField(default=now)
29✔
151

152
                    class Meta:
29✔
153
                        apps = Apps()
29✔
154
                        app_label = "migrations"
29✔
155
                        db_table = "django_migrations"
29✔
156

157
                    def __str__(self):
29✔
UNCOV
158
                        return "Migration %s for %s" % (self.name, self.app)
×
159

160
            self._migration_class = Migration
29✔
161
        return self._migration_class
29✔
162

163
    def has_table(self):
29✔
164
        """Return True if the django_migrations table exists."""
165
        # Assert migration table won't be deleted once created.
166
        if not getattr(self, "_has_table", False):
29✔
167
            with self.connection.cursor() as cursor:
29✔
168
                table = self.Migration._meta.db_table
29✔
169
                tables = self.connection.introspection.table_names(cursor)
29✔
170
                self._has_table = table in tables
29✔
171
                if self._has_table and self.connection.vendor == "clickhouse":
29✔
172
                    # fix https://github.com/jayvynl/django-clickhouse-backend/issues/51
173
                    cursor.execute(
29✔
174
                        f"ALTER table {table} ADD COLUMN IF NOT EXISTS deleted Bool"
175
                    )
176
        return self._has_table
29✔
177

178
    def ensure_schema(self):
29✔
179
        """Ensure the table exists and has the correct schema."""
180
        # If the table's there, that's fine - we've never changed its schema
181
        # in the codebase.
182
        if self.has_table():
29✔
183
            return
29✔
184

185
        # In case of distributed migrations, we need to ensure the local model exists first and
186
        # then create the distributed model.
187
        try:
29✔
188
            with self.connection.schema_editor() as editor:
29✔
189
                if (
29✔
190
                    editor.connection.vendor == "clickhouse"
191
                    and _should_distribute_migrations(editor.connection)
192
                ):
193
                    with editor.connection.cursor() as cursor:
29✔
194
                        tables = editor.connection.introspection.table_names(cursor)
29✔
195
                    local_model_class = self.Migration._meta.local_model_class
29✔
196
                    local_table = local_model_class._meta.db_table
29✔
197
                    if local_table not in tables:
29✔
198
                        # Create the local model first
199
                        editor.create_model(self.Migration._meta.local_model_class)
29✔
200

201
                editor.create_model(self.Migration)
29✔
UNCOV
202
        except DatabaseError as exc:
×
UNCOV
203
            raise MigrationSchemaMissing(
×
204
                "Unable to create the django_migrations table (%s)" % exc
205
            )
206

207
    def migration_qs(self):
29✔
208
        if self.connection.vendor == "clickhouse":
29✔
209
            return self.Migration.objects.using(self.connection.alias).filter(
29✔
210
                deleted=False
211
            )
212
        return self.Migration.objects.using(self.connection.alias)
29✔
213

214
    def record_applied(self, app, name):
29✔
215
        """Record that a migration was applied."""
216
        self.ensure_schema()
29✔
217
        if self.connection.vendor == "clickhouse" and (
29✔
218
            self.Migration.objects.using(self.connection.alias)
219
            .filter(app=app, name=name)
220
            .exists()
221
        ):
222
            self.Migration.objects.using(self.connection.alias).filter(
29✔
223
                app=app, name=name
224
            ).settings(mutations_sync=1).update(deleted=False)
225
        else:
226
            self.migration_qs.create(app=app, name=name)
29✔
227

228
    def record_unapplied(self, app, name):
29✔
229
        """Record that a migration was unapplied."""
230
        self.ensure_schema()
29✔
231
        if self.connection.vendor == "clickhouse":
29✔
232
            self.migration_qs.filter(app=app, name=name).settings(
29✔
233
                mutations_sync=1
234
            ).update(deleted=True)
235
        else:
236
            self.migration_qs.filter(app=app, name=name).delete()
×
237

238
    def flush(self):
29✔
239
        """Delete all migration records. Useful for testing migrations."""
240
        if self.connection.vendor == "clickhouse":
29✔
241
            self.migration_qs.settings(mutations_sync=1).delete()
29✔
242
        else:
UNCOV
243
            self.migration_qs.all().delete()
×
244

245
    MigrationRecorder.Migration = property(Migration)
29✔
246
    MigrationRecorder.has_table = has_table
29✔
247
    MigrationRecorder.ensure_schema = ensure_schema
29✔
248
    MigrationRecorder.migration_qs = property(migration_qs)
29✔
249
    MigrationRecorder.record_applied = record_applied
29✔
250
    MigrationRecorder.record_unapplied = record_unapplied
29✔
251
    MigrationRecorder.flush = flush
29✔
252

253

254
def patch_migration():
29✔
255
    def apply(self, project_state, schema_editor, collect_sql=False):
29✔
256
        """
257
        Take a project_state representing all migrations prior to this one
258
        and a schema_editor for a live database and apply the migration
259
        in a forwards order.
260

261
        Return the resulting project state for efficient reuse by following
262
        Migrations.
263
        """
264
        applied_on_remote = False
29✔
265
        if getattr(schema_editor.connection, "migration_cluster", None):
29✔
266
            _table = _get_model_table_name(schema_editor.connection)
29✔
267

268
            with schema_editor.connection.cursor() as cursor:
29✔
269
                cursor.execute(
29✔
270
                    "select EXISTS(select 1 from clusterAllReplicas(%s, currentDatabase(), %s)"
271
                    " where app=%s and name=%s and deleted=false)",
272
                    [
273
                        schema_editor.connection.migration_cluster,
274
                        _table,
275
                        self.app_label,
276
                        self.name,
277
                    ],
278
                )
279
                (applied_on_remote,) = cursor.fetchone()
29✔
280
        for operation in self.operations:
29✔
281
            # If this operation cannot be represented as SQL, place a comment
282
            # there instead
283
            if collect_sql:
29✔
284
                schema_editor.collected_sql.append("--")
×
285
                schema_editor.collected_sql.append("-- %s" % operation.describe())
×
UNCOV
286
                schema_editor.collected_sql.append("--")
×
UNCOV
287
                if not operation.reduces_to_sql:
×
UNCOV
288
                    schema_editor.collected_sql.append(
×
289
                        "-- THIS OPERATION CANNOT BE WRITTEN AS SQL"
290
                    )
UNCOV
291
                    continue
×
UNCOV
292
                collected_sql_before = len(schema_editor.collected_sql)
×
293
            # Save the state before the operation has run
294
            old_state = project_state.clone()
29✔
295
            operation.state_forwards(self.app_label, project_state)
29✔
296

297
            # Run the operation
298
            # Ensure queries on cluster are only executed once.
299
            model_name = None
29✔
300
            skip_database_forwards = False
29✔
301
            if isinstance(operation, (IndexOperation, FieldOperation)):
29✔
302
                model_name = operation.model_name_lower
29✔
303
            elif isinstance(operation, ModelOperation):
29✔
304
                model_name = operation.name_lower
29✔
305
            if model_name:
29✔
306
                if isinstance(operation, (RenameModel, DeleteModel)):
29✔
307
                    model_state = old_state.models[self.app_label, model_name]
29✔
308
                else:
309
                    model_state = project_state.models[self.app_label, model_name]
29✔
310
                if model_state.options.get("cluster") and applied_on_remote:
29✔
UNCOV
311
                    skip_database_forwards = True
×
312
            if not skip_database_forwards:
29✔
313
                operation.database_forwards(
29✔
314
                    self.app_label, schema_editor, old_state, project_state
315
                )
316
            if collect_sql and collected_sql_before == len(schema_editor.collected_sql):
29✔
UNCOV
317
                schema_editor.collected_sql.append("-- (no-op)")
×
318
        return project_state
29✔
319

320
    def unapply(self, project_state, schema_editor, collect_sql=False):
29✔
321
        """
322
        Take a project_state representing all migrations prior to this one
323
        and a schema_editor for a live database and apply the migration
324
        in a reverse order.
325

326
        The backwards migration process consists of two phases:
327

328
        1. The intermediate states from right before the first until right
329
           after the last operation inside this migration are preserved.
330
        2. The operations are applied in reverse order using the states
331
           recorded in step 1.
332
        """
333
        unapplied_on_remote = False
29✔
334
        if getattr(schema_editor.connection, "migration_cluster", None):
29✔
335
            _table = _get_model_table_name(schema_editor.connection)
29✔
336

337
            with schema_editor.connection.cursor() as cursor:
29✔
338
                cursor.execute(
29✔
339
                    "select EXISTS(select 1 from clusterAllReplicas(%s, currentDatabase(), %s)"
340
                    " where app=%s and name=%s and deleted=true)",
341
                    [
342
                        schema_editor.connection.migration_cluster,
343
                        _table,
344
                        self.app_label,
345
                        self.name,
346
                    ],
347
                )
348
                (unapplied_on_remote,) = cursor.fetchone()
29✔
349
        # Construct all the intermediate states we need for a reverse migration
350
        to_run = []
29✔
351
        new_state = project_state
29✔
352
        # Phase 1
353
        for operation in self.operations:
29✔
354
            # If it's irreversible, error out
355
            if not operation.reversible:
29✔
UNCOV
356
                raise IrreversibleError(
×
357
                    "Operation %s in %s is not reversible" % (operation, self)
358
                )
359
            # Preserve new state from previous run to not tamper the same state
360
            # over all operations
361
            new_state = new_state.clone()
29✔
362
            old_state = new_state.clone()
29✔
363
            operation.state_forwards(self.app_label, new_state)
29✔
364
            to_run.insert(0, (operation, old_state, new_state))
29✔
365

366
        # Phase 2
367
        for operation, to_state, from_state in to_run:
29✔
368
            if collect_sql:
29✔
369
                schema_editor.collected_sql.append("--")
×
370
                schema_editor.collected_sql.append("-- %s" % operation.describe())
×
UNCOV
371
                schema_editor.collected_sql.append("--")
×
UNCOV
372
                if not operation.reduces_to_sql:
×
UNCOV
373
                    schema_editor.collected_sql.append(
×
374
                        "-- THIS OPERATION CANNOT BE WRITTEN AS SQL"
375
                    )
UNCOV
376
                    continue
×
UNCOV
377
                collected_sql_before = len(schema_editor.collected_sql)
×
378
            # Ensure queries on cluster are only executed once.
379
            model_name = None
29✔
380
            skip_database_backwards = False
29✔
381
            if isinstance(operation, (IndexOperation, FieldOperation)):
29✔
382
                model_name = operation.model_name_lower
29✔
383
            elif isinstance(operation, ModelOperation):
29✔
384
                model_name = operation.name_lower
29✔
385
            if model_name:
29✔
386
                if isinstance(operation, (RenameModel, DeleteModel)):
29✔
387
                    model_state = to_state.models[self.app_label, model_name]
29✔
388
                else:
389
                    model_state = from_state.models[self.app_label, model_name]
29✔
390
                if model_state.options.get("cluster") and unapplied_on_remote:
29✔
UNCOV
391
                    skip_database_backwards = True
×
392
            if not skip_database_backwards:
29✔
393
                operation.database_backwards(
29✔
394
                    self.app_label, schema_editor, from_state, to_state
395
                )
396
            if collect_sql and collected_sql_before == len(schema_editor.collected_sql):
29✔
UNCOV
397
                schema_editor.collected_sql.append("-- (no-op)")
×
398
        return project_state
29✔
399

400
    Migration.apply = apply
29✔
401
    Migration.unapply = unapply
29✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc