• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

LeanderCS / sqlalchemy-fake-model / #23

23 Feb 2026 11:20AM UTC coverage: 81.124% (-3.0%) from 84.108%
#23

push

coveralls-python

LeanderCS
implement circular relation detection

Signed-off-by: Leander Cain Slotosch <slotosch.leander@outlook.de>

44 of 71 new or added lines in 1 file covered. (61.97%)

1 existing line in 1 file now uncovered.

361 of 445 relevant lines covered (81.12%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.08
/sqlalchemy_fake_model/ModelFaker.py
1
import json
1✔
2
import logging
1✔
3
import random
1✔
4
import traceback
1✔
5
from datetime import date, datetime
1✔
6
from typing import Any, Dict, List, Optional, Union
1✔
7

8
from faker import Faker
1✔
9
from sqlalchemy import Column, ColumnDefault, Table
1✔
10
from sqlalchemy.exc import IntegrityError
1✔
11
from sqlalchemy.orm import ColumnProperty, Session
1✔
12

13
from .Enum import ModelColumnTypesEnum
1✔
14
from .Error import InvalidAmountError, UniquenessError
1✔
15
from .Model import ModelFakerConfig
1✔
16
from .SmartFieldDetector import SmartFieldDetector
1✔
17

18

19
class ModelFaker:
1✔
20
    """
21
    The ModelFaker class is a utility class that helps in generating fake data
22
    for a given SQLAlchemy model. It uses the faker library to generate fake
23
    data based on the column types of the model. It also handles relationships
24
    between models and can generate data for different relationships.
25
    """
26

27
    def __init__(
        self,
        model: Union[Table, ColumnProperty],
        db: Optional[Session] = None,
        faker: Optional[Faker] = None,
        config: Optional[ModelFakerConfig] = None,
    ) -> None:
        """
        Sets up a faker bound to one model.

        :param model: The SQLAlchemy model (or table) to generate rows for.
        :param db: Session used for all database work. When omitted (or
            falsy) the session is looked up via ``_get_framework_session``.
        :param faker: Faker instance to use. When omitted, the instance
            from the configuration is used, and failing that a new ``Faker``
            is built for the configured locale.
        :param config: Configuration object. When omitted a default
            ``ModelFakerConfig`` is created.
        """
        self.model = model

        session = db
        if not session:
            session = self._get_framework_session()
        self.db = session

        settings = config
        if not settings:
            settings = ModelFakerConfig()
        self.config = settings

        generator = faker or settings.faker_instance
        if not generator:
            generator = Faker(settings.locale)
        self.faker = generator

        self.logger = logging.getLogger(__name__)

        # Per-instance bookkeeping, all empty at construction time.
        self._unique_values = {}
        self._relationship_cache = {}
        self._processing_relationships = set()

        # Smart detection is optional and switched on by the configuration.
        if settings.smart_detection:
            self.smart_detector = SmartFieldDetector(generator)
        else:
            self.smart_detector = None

        seed = settings.seed
        if seed is not None:
            generator.seed_instance(seed)
1✔
65

66
    def __enter__(self):
1✔
67
        """Context manager entry."""
68
        return self
1✔
69

70
    def __exit__(self, exc_type, exc_val, exc_tb):
1✔
71
        """Context manager exit with automatic cleanup."""
72
        if exc_type is not None:
1✔
73
            self.logger.error(f"Exception in ModelFaker context: {exc_val}")
1✔
74
            if hasattr(self.db, "rollback"):
1✔
75
                try:
1✔
76
                    self.db.rollback()
1✔
77
                    self.logger.info("Database transaction rolled back")
1✔
78
                except Exception as rollback_error:
×
79
                    self.logger.error(f"Failed to rollback: {rollback_error}")
×
80
        return False
1✔
81

82
    @staticmethod
1✔
83
    def _get_framework_session() -> Optional[Session]:
1✔
84
        """
85
        Tries to get the SQLAlchemy session from available frameworks.
86

87
        :return: The SQLAlchemy session if available.
88
        :raises RuntimeError: If no supported framework
89
            is installed or configured
90
        """
91
        try:
1✔
92
            from flask import current_app
1✔
93

94
            if "sqlalchemy" in current_app.extensions:
1✔
95
                db_ext = current_app.extensions["sqlalchemy"]
1✔
96

97
                # In Flask-SQLAlchemy >= 2.0, the db object is the extension
98
                # itself
99
                if hasattr(db_ext, "session"):
1✔
100
                    return db_ext.session
1✔
101

102
                # Some versions might have a different structure
103
                if hasattr(db_ext, "db") and hasattr(db_ext.db, "session"):
×
104
                    return db_ext.db.session
×
105

106
        except (ImportError, KeyError, AttributeError):
×
107
            pass
×
108

109
        try:
×
110
            from tornado.web import Application
×
111

112
            return Application().settings["db"]
×
113
        except (ImportError, KeyError):
×
114
            pass
×
115

116
        try:
×
117
            from django.conf import settings
×
118
            from sqlalchemy import create_engine
×
119
            from sqlalchemy.orm import sessionmaker
×
120

121
            engine = create_engine(settings.DATABASES["default"]["ENGINE"])
×
122
            return sessionmaker(bind=engine)()
×
123
        except (ImportError, KeyError, AttributeError):
×
124
            pass
×
125

126
        raise RuntimeError(
×
127
            "No SQLAlchemy session provided and no supported framework "
128
            "installed or configured."
129
        )
130

131
    def create(self, amount: Optional[int] = 1) -> None:
1✔
132
        """
133
        Creates the specified amount of fake data entries for the model.
134
        It handles exceptions and rolls back the session
135
        in case of any errors.
136

137
        :param amount: The number of fake data entries to create.
138
        :raises InvalidAmountError: If the amount is not an integer or
139
            negative.
140
        """
141
        if not isinstance(amount, int) or amount < 0:
1✔
142
            raise InvalidAmountError(amount)
1✔
143

144
        if amount <= self.config.bulk_size:
1✔
145
            self._create_single_batch(amount)
1✔
146
        else:
147
            self._create_bulk(amount)
1✔
148

149
    def _create_single_batch(self, amount: int) -> None:
1✔
150
        """Creates a single batch of records."""
151
        retries = 0
1✔
152
        max_retries = (
1✔
153
            self.config.max_retries if self.config.unique_constraints else 1
154
        )
155

156
        while retries < max_retries:
1✔
157
            try:
1✔
158
                batch_data = []
1✔
159

160
                for _ in range(amount):
1✔
161
                    data = {}
1✔
162
                    for column in self.__get_table_columns():
1✔
163
                        if self.__should_skip_field(column):
1✔
164
                            continue
1✔
165
                        data[column.name] = (
1✔
166
                            self._generate_fake_data_with_overrides(column)
167
                        )
168
                    batch_data.append(data)
1✔
169

170
                if self.__is_many_to_many_relation_table():
1✔
171
                    self.db.execute(self.model.insert().values(batch_data))
1✔
172
                else:
173
                    for data in batch_data:
1✔
174
                        self.db.add(self.model(**data))
1✔
175

176
                self.db.commit()
1✔
177
                self.logger.info(f"Successfully created {amount} records")
1✔
178
                return
1✔
179

180
            except IntegrityError as e:
1✔
NEW
181
                self.db.rollback()
×
NEW
182
                if "unique" in str(e).lower() or "duplicate" in str(e).lower():
×
NEW
183
                    retries += 1
×
NEW
184
                    if retries >= max_retries:
×
NEW
185
                        self.logger.error(
×
186
                            "Unique constraint violation after "
187
                            + f"{max_retries} retries: {e}"
188
                        )
NEW
189
                        raise UniquenessError(
×
190
                            "unknown_field", max_retries
191
                        ) from e
NEW
192
                    self.logger.warning(
×
193
                        "Unique constraint violation, retrying "
194
                        + f"({retries}/{max_retries}): {e}"
195
                    )
NEW
196
                    continue
×
NEW
197
                raise
×
198
            except Exception as e:
1✔
199
                self.db.rollback()
1✔
200
                self.logger.error(f"Failed to create batch: {e}")
1✔
201
                if isinstance(
1✔
202
                    e, (IntegrityError, UniquenessError, InvalidAmountError)
203
                ):
NEW
204
                    raise
×
205
                raise RuntimeError(
1✔
206
                    f"Failed to commit: {e} {traceback.format_exc()}"
207
                ) from e
208

209
    def _create_bulk(self, amount: int) -> None:
1✔
210
        """Creates records in multiple batches for better performance."""
211
        remaining = amount
1✔
212
        created = 0
1✔
213

214
        while remaining > 0:
1✔
215
            batch_size = min(remaining, self.config.bulk_size)
1✔
216
            try:
1✔
217
                self._create_single_batch(batch_size)
1✔
218
                created += batch_size
1✔
219
                remaining -= batch_size
1✔
220
                self.logger.info(f"Created {created}/{amount} records")
1✔
221
            except Exception as e:
×
222
                self.logger.error(
×
223
                    f"Failed to create bulk batch at {created}/{amount}: {e}"
224
                )
225
                raise
×
226

227
    def _generate_fake_data(
        self, column: Column
    ) -> Optional[Union[str, int, bool, date, datetime, None]]:
        """
        Generates fake data for a given column based on its type.

        Checks run in this order and the first match wins:

        1. a ``doc`` string on the column is parsed as a JSON structure
           description and the populated structure is returned as a JSON
           string;
        2. enum columns get a random member of ``column.type.enums``;
        3. foreign-key columns get the referenced attribute of a related
           record;
        4. the remaining column types (string, text, integer, float,
           boolean, date, datetime, time, UUID, decimal, interval, binary,
           JSON) get a type-specific value.

        Primary-key columns go through the same type checks as every other
        column, so an integer or UUID key receives an integer or UUID. Only
        a primary key of an unrecognised type falls back to a random word.

        :param column: The SQLAlchemy column for which fake data
            needs to be generated.
        :return: The fake data generated for the column, or ``None`` when
            the column type is not recognised.
        """
        column_type = column.type

        if column.doc:
            return json.dumps(self._generate_json_data(column.doc))

        # Enum has to be the first type to check, or otherwise it
        # uses the options of the corresponding type of the enum options
        if isinstance(column_type, ModelColumnTypesEnum.ENUM.value):
            return random.choice(column_type.enums)

        if column.foreign_keys:
            related_attribute = next(iter(column.foreign_keys)).column.name
            return getattr(
                self.__handle_relationship(column), related_attribute
            )

        if isinstance(column_type, ModelColumnTypesEnum.STRING.value):
            max_length = getattr(column_type, "length", None)
            if max_length is None:
                max_length = 255
            if max_length < 5:
                # Faker's text() refuses limits below 5 characters, so very
                # short columns get a random letter string of that length.
                return self.faker.pystr(max_chars=max_length)
            return self.faker.text(max_nb_chars=max_length)

        if isinstance(column_type, ModelColumnTypesEnum.TEXT.value):
            return self.faker.text(max_nb_chars=500)

        if isinstance(column_type, ModelColumnTypesEnum.INTEGER.value):
            info = column.info
            if not info:
                return self.faker.random_int()

            # Optional bounds supplied through ``Column(info={...})``.
            return self.faker.random_int(
                min=info.get("min", 1), max=info.get("max", 100)
            )

        if isinstance(column_type, ModelColumnTypesEnum.FLOAT.value):
            precision = column_type.precision
            # Only a (total digits, decimal places) pair can be turned into
            # a value range; a missing or plain integer precision gets an
            # unconstrained float instead of failing on indexing.
            if not precision or not isinstance(precision, (tuple, list)):
                return self.faker.pyfloat()

            max_value = 10 ** (precision[0] - precision[1]) - 1
            return round(
                self.faker.pyfloat(min_value=0, max_value=max_value),
                precision[1],
            )

        if isinstance(column_type, ModelColumnTypesEnum.BOOLEAN.value):
            return self.faker.boolean()

        if isinstance(column_type, ModelColumnTypesEnum.DATE.value):
            return self.faker.date_object()

        if isinstance(column_type, ModelColumnTypesEnum.DATETIME.value):
            return self.faker.date_time()

        if isinstance(column_type, ModelColumnTypesEnum.TIME.value):
            return self.faker.time_object()

        if isinstance(column_type, ModelColumnTypesEnum.UUID.value):
            return self.faker.uuid4()

        if isinstance(column_type, ModelColumnTypesEnum.DECIMAL.value):
            precision = getattr(column_type, "precision", None)
            scale = getattr(column_type, "scale", None)
            # ``scale`` may legitimately be 0 (whole numbers only), so test
            # it against None rather than for truthiness.
            if precision and scale is not None:
                max_digits = precision - scale
                max_value = 10**max_digits - 1
                return round(
                    self.faker.pyfloat(min_value=0, max_value=max_value), scale
                )
            return self.faker.pydecimal(
                left_digits=10, right_digits=2, positive=True
            )

        if isinstance(column_type, ModelColumnTypesEnum.INTERVAL.value):
            from datetime import timedelta

            # Interval columns carry ``timedelta`` values on the Python
            # side; a plain string is not accepted by every backend.
            return timedelta(days=self.faker.random_int(min=1, max=365))

        if isinstance(column_type, ModelColumnTypesEnum.LARGEBINARY.value):
            return self.faker.binary(length=256)

        if isinstance(
            column_type,
            (
                ModelColumnTypesEnum.JSON.value,
                ModelColumnTypesEnum.JSONB.value,
            ),
        ):
            # A column with a ``doc`` schema never gets here (handled at the
            # top), so only the default structure is needed.
            json_structure = {
                "id": "integer",
                "name": "string",
                "active": "boolean",
            }
            return self._populate_json_structure(json_structure)

        if column.primary_key:
            # Unrecognised key type: keep producing a non-null value.
            return self.faker.word()

        return None
×
342

343
    def __handle_relationship(self, column: Column) -> Any:
1✔
344
        """
345
        Handles the relationship of a column with another model.
346
        It creates a fake data entry for the parent model and returns its id.
347
        Reuses existing records when possible to avoid duplicates.
348
        """
349
        parent_model = self.__get_related_class(column)
1✔
350
        model_key = (
1✔
351
            parent_model.__name__
352
            if hasattr(parent_model, "__name__")
353
            else str(parent_model)
354
        )
355

356
        if model_key in self._processing_relationships:
1✔
NEW
357
            existing_record = self.db.query(parent_model).first()
×
NEW
358
            if existing_record:
×
NEW
359
                return existing_record
×
360

361
        self._processing_relationships.add(model_key)
1✔
362

363
        try:
1✔
364
            if model_key not in self._relationship_cache:
1✔
365
                existing_record = self.db.query(parent_model).first()
1✔
366
                if existing_record:
1✔
NEW
367
                    self._relationship_cache[model_key] = existing_record
×
368
                else:
369
                    ModelFaker(
1✔
370
                        parent_model, self.db, config=self.config
371
                    ).create()
372
                    self._relationship_cache[model_key] = self.db.query(
1✔
373
                        parent_model
374
                    ).first()
375
            else:
NEW
376
                if self.config.unique_constraints:
×
NEW
377
                    existing_record = self.db.query(parent_model).first()
×
NEW
378
                    if existing_record:
×
NEW
379
                        return existing_record
×
380

381
            return self._relationship_cache.get(model_key) or (
1✔
382
                self.db.query(parent_model).first()
383
            )
384
        finally:
385
            self._processing_relationships.discard(model_key)
1✔
386

387
    def __is_many_to_many_relation_table(self) -> bool:
1✔
388
        """
389
        Checks if the model is a many-to-many relationship table.
390
        """
391
        return not hasattr(self.model, "__table__") and not hasattr(
1✔
392
            self.model, "__mapper__"
393
        )
394

395
    def __should_skip_field(self, column: Column) -> bool:
        """
        Decides whether no value should be generated for ``column``.

        A column is skipped when it is an auto-incrementing primary key,
        when its default value is to be kept, or when it is nullable and
        nullable fields are not to be filled.
        """
        if column.primary_key and self.__is_field_auto_increment(column):
            return True

        return self.__has_field_default_value(
            column
        ) or self.__is_field_nullable(column)
404

405
    @staticmethod
1✔
406
    def __is_field_auto_increment(column: Column) -> bool:
1✔
407
        """
408
        Checks if a column is autoincrement.
409
        """
410
        return column.autoincrement and isinstance(
1✔
411
            column.type, ModelColumnTypesEnum.INTEGER.value
412
        )
413

414
    def __has_field_default_value(self, column: Column) -> bool:
        """
        Tells whether ``column`` has a default that should be left alone.

        That is the case when the column default is a ``ColumnDefault`` with
        a non-``None`` argument and the configuration does not ask for
        default fields to be filled.
        """
        default = column.default
        if not isinstance(default, ColumnDefault):
            return False
        if default.arg is None:
            return False

        return not self.config.fill_default_fields
423

424
    def __is_field_nullable(self, column: Column) -> bool:
1✔
425
        """
426
        Checks if a column is nullable.
427
        """
428
        return (
1✔
429
            column.nullable is not None
430
            and column.nullable is True
431
            and not self.config.fill_nullable_fields
432
        )
433

434
    def __get_table_columns(self) -> List[Column]:
1✔
435
        """
436
        Returns the columns of the model's table.
437
        """
438
        return (
1✔
439
            self.model.columns
440
            if self.__is_many_to_many_relation_table()
441
            else self.model.__table__.columns
442
        )
443

444
    def __get_related_class(self, column: Column) -> Any:
1✔
445
        """
446
        Returns the related class of a column if it has
447
        a relationship with another model.
448
        """
449
        if (
1✔
450
            not self.__is_many_to_many_relation_table()
451
            and column.name in self.model.__mapper__.relationships
452
        ):
453
            return self.model.__mapper__.relationships[
×
454
                column.key
455
            ].mapper.class_
456

457
        fk = next(iter(column.foreign_keys))
1✔
458

459
        return fk.column.table
1✔
460

461
    def _generate_json_data(self, docstring: str) -> Dict[str, Any]:
1✔
462
        """
463
        Generates JSON data based on the provided docstring.
464
        """
465
        json_structure = json.loads(docstring)
1✔
466

467
        return self._populate_json_structure(json_structure)
1✔
468

469
    def _populate_json_structure(
1✔
470
        self, structure: Union[Dict[str, Any], List[Any]]
471
    ) -> Any:
472
        """
473
        Populates the JSON structure with fake data based on the defined
474
        schema.
475
        """
476
        if isinstance(structure, dict):
1✔
477
            return {
1✔
478
                key: self._populate_json_structure(value)
479
                if isinstance(value, (dict, list))
480
                else self._generate_primitive(value)
481
                for key, value in structure.items()
482
            }
483

484
        if isinstance(structure, list):
1✔
485
            return [
1✔
486
                self._populate_json_structure(item)
487
                if isinstance(item, (dict, list))
488
                else self._generate_primitive(item)
489
                for item in structure
490
            ]
491

492
        return structure
×
493

494
    def _generate_fake_data_with_overrides(self, column: Column) -> Any:
1✔
495
        """
496
        Generates fake data with custom overrides and optional smart detection.
497
        """
498
        if column.name in self.config.field_overrides:
1✔
499
            return self.config.field_overrides[column.name]()
1✔
500

501
        if self.smart_detector:
1✔
502
            smart_value = self.smart_detector.detect_and_generate(column)
1✔
503
            if smart_value is not None:
1✔
504
                return smart_value
1✔
505

506
        return self._generate_fake_data(column)
1✔
507

508
    def _generate_primitive(
1✔
509
        self, primitive_type: str
510
    ) -> Union[str, int, float, bool, date, datetime]:
511
        """
512
        Generates fake data for primitive types.
513
        """
514
        if primitive_type == "boolean":
1✔
515
            return self.faker.boolean()
×
516
        if primitive_type == "datetime":
1✔
517
            return self.faker.date_time().isoformat()
1✔
518
        if primitive_type == "date":
1✔
519
            return self.faker.date()
1✔
520
        if primitive_type == "integer":
1✔
521
            return self.faker.random_int()
1✔
522
        if primitive_type == "string":
1✔
523
            return self.faker.word()
1✔
524
        if primitive_type == "float":
1✔
525
            return self.faker.pyfloat()
1✔
526
        return self.faker.word()
×
527

528
    def create_batch(self, amount: int, commit: bool = False) -> List[Any]:
1✔
529
        """
530
        Creates a batch of model instances without committing to database.
531

532
        :param amount: Number of instances to create
533
        :param commit: Whether to commit the batch to database
534
        :return: List of created model instances
535
        """
536
        if not isinstance(amount, int):
1✔
537
            raise InvalidAmountError(amount)
×
538

539
        instances = []
1✔
540
        try:
1✔
541
            for _ in range(amount):
1✔
542
                data = {}
1✔
543
                for column in self.__get_table_columns():
1✔
544
                    if self.__should_skip_field(column):
1✔
545
                        continue
1✔
546
                    data[column.name] = (
1✔
547
                        self._generate_fake_data_with_overrides(column)
548
                    )
549

550
                if not self.__is_many_to_many_relation_table():
1✔
551
                    instance = self.model(**data)
1✔
552
                    instances.append(instance)
1✔
553
                    if commit:
1✔
554
                        self.db.add(instance)
1✔
555

556
            if commit and instances:
1✔
557
                self.db.commit()
1✔
558
                self.logger.info(
1✔
559
                    f"Committed batch of {len(instances)} instances"
560
                )
561

562
            return instances
1✔
563

564
        except Exception as e:
×
565
            if commit:
×
566
                self.db.rollback()
×
567
            self.logger.error(f"Failed to create batch: {e}")
×
NEW
568
            if isinstance(
×
569
                e, (IntegrityError, UniquenessError, InvalidAmountError)
570
            ):
NEW
571
                raise
×
NEW
572
            raise RuntimeError(f"Failed to create batch: {e}") from e
×
573

574
    def create_with(
1✔
575
        self, overrides: Dict[str, Any], amount: int = 1
576
    ) -> List[Any]:
577
        """
578
        Creates model instances with specific field overrides.
579

580
        :param overrides: Dictionary of field values to override
581
        :param amount: Number of instances to create
582
        :return: List of created model instances
583
        """
584
        if not isinstance(amount, int):
1✔
585
            raise InvalidAmountError(amount)
×
586

587
        instances = []
1✔
588
        try:
1✔
589
            for _ in range(amount):
1✔
590
                data = {}
1✔
591
                for column in self.__get_table_columns():
1✔
592
                    if self.__should_skip_field(column):
1✔
593
                        continue
1✔
594

595
                    if column.name in overrides:
1✔
596
                        data[column.name] = overrides[column.name]
1✔
597
                    else:
598
                        data[column.name] = (
1✔
599
                            self._generate_fake_data_with_overrides(column)
600
                        )
601

602
                if self.__is_many_to_many_relation_table():
1✔
603
                    self.db.execute(self.model.insert().values(**data))
×
604
                else:
605
                    instance = self.model(**data)
1✔
606
                    instances.append(instance)
1✔
607
                    self.db.add(instance)
1✔
608

609
            self.db.commit()
1✔
610
            self.logger.info(
1✔
611
                f"Created {len(instances)} instances with overrides"
612
            )
613
            return instances
1✔
614

615
        except Exception as e:
×
616
            self.db.rollback()
×
617
            self.logger.error(f"Failed to create with overrides: {e}")
×
NEW
618
            if isinstance(
×
619
                e, (IntegrityError, UniquenessError, InvalidAmountError)
620
            ):
NEW
621
                raise
×
NEW
622
            raise RuntimeError(f"Failed to create with overrides: {e}") from e
×
623

624
    def reset(self, confirm: bool = False) -> int:
1✔
625
        """
626
        Removes all records from the model's table.
627

628
        :param confirm: Must be True to actually perform the deletion
629
        :return: Number of deleted records
630
        """
631
        if not confirm:
1✔
632
            raise ValueError("Must set confirm=True to delete all records")
1✔
633

634
        try:
1✔
635
            if self.__is_many_to_many_relation_table():
1✔
636
                result = self.db.execute(self.model.delete())
×
637
                deleted_count = result.rowcount
×
638
            else:
639
                deleted_count = self.db.query(self.model).delete()
1✔
640

641
            self.db.commit()
1✔
642
            self.logger.info(
1✔
643
                f"Deleted {deleted_count} records from {self.model}"
644
            )
645
            return deleted_count
1✔
646

647
        except Exception as e:
×
648
            self.db.rollback()
×
649
            self.logger.error(f"Failed to reset table: {e}")
×
650
            raise
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc