• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

LeanderCS / sqlalchemy-fake-model / #8

18 Sep 2025 03:50PM UTC coverage: 84.444% (-0.9%) from 85.366%
#8

push

coveralls-python

LeanderCS
Add smart field detector tests

1 of 1 new or added line in 1 file covered. (100.0%)

57 existing lines in 2 files now uncovered.

342 of 405 relevant lines covered (84.44%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.06
/sqlalchemy_fake_model/ModelFaker.py
1
import json
1✔
2
import logging
1✔
3
import random
1✔
4
import traceback
1✔
5
from datetime import date, datetime
1✔
6
from typing import Any, Dict, List, Optional, Union
1✔
7

8
from faker import Faker
1✔
9
from sqlalchemy import Column, ColumnDefault, Table
1✔
10
from sqlalchemy.exc import IntegrityError
1✔
11
from sqlalchemy.orm import ColumnProperty, Session
1✔
12

13
from .Enum.ModelColumnTypesEnum import ModelColumnTypesEnum
1✔
14
from .Error.InvalidAmountError import InvalidAmountError
1✔
15
from .Error.UniquenessError import UniquenessError
1✔
16
from .Model.ModelFakerConfig import ModelFakerConfig
1✔
17
from .SmartFieldDetector import SmartFieldDetector
1✔
18

19

20
class ModelFaker:
1✔
21
    """
22
    The ModelFaker class is a utility class that helps in generating fake data
23
    for a given SQLAlchemy model. It uses the faker library to generate fake
24
    data based on the column types of the model. It also handles relationships
25
    between models and can generate data for different relationships.
26
    """
27

28
    def __init__(
        self,
        model: Union[Table, ColumnProperty],
        db: Optional[Session] = None,
        faker: Optional[Faker] = None,
        config: Optional[ModelFakerConfig] = None,
    ) -> None:
        """
        Binds the faker to one model and resolves its collaborators.

        :param model: The SQLAlchemy model (or table) to generate rows for.
        :param db: Session used to persist the rows. When omitted, one is
            looked up through ``_get_framework_session``.
        :param faker: Faker instance to draw values from. When omitted, the
            instance held by the config is used, or else a new ``Faker``
            for the configured locale.
        :param config: Settings for this faker. A default
            ``ModelFakerConfig`` is created when omitted.
        :raises RuntimeError: If no session is passed and none can be
            discovered.
        """
        self.model = model

        session = db
        if not session:
            session = self._get_framework_session()
        self.db = session

        settings = config
        if not settings:
            settings = ModelFakerConfig()
        self.config = settings

        # Precedence: explicit argument, then the config's instance, then a
        # fresh Faker for the configured locale.
        generator = faker
        if not generator:
            generator = self.config.faker_instance
            if not generator:
                generator = Faker(self.config.locale)
        self.faker = generator

        self.logger = logging.getLogger(__name__)
        self._unique_values = {}

        # Smart detection is optional; ``None`` disables it downstream.
        if self.config.smart_detection:
            self.smart_detector = SmartFieldDetector(self.faker)
        else:
            self.smart_detector = None

        # Apply the configured seed to the Faker instance, if there is one.
        if self.config.seed is not None:
            self.faker.seed_instance(self.config.seed)
64

65
    def __enter__(self):
1✔
66
        """Context manager entry."""
67
        return self
1✔
68

69
    def __exit__(self, exc_type, exc_val, exc_tb):
1✔
70
        """Context manager exit with automatic cleanup."""
71
        if exc_type is not None:
1✔
72
            self.logger.error(f"Exception in ModelFaker context: {exc_val}")
1✔
73
            if hasattr(self.db, "rollback"):
1✔
74
                try:
1✔
75
                    self.db.rollback()
1✔
76
                    self.logger.info("Database transaction rolled back")
1✔
77
                except Exception as rollback_error:
×
78
                    self.logger.error(f"Failed to rollback: {rollback_error}")
×
79
        return False
1✔
80

81
    @staticmethod
    def _get_framework_session() -> Optional[Session]:
        """
        Tries to get the SQLAlchemy session from available frameworks.

        Flask, Tornado and Django are probed in that order. A framework that
        is not installed, or is installed but not configured for the current
        process, is skipped so that the next one can be tried.

        :return: The SQLAlchemy session if available.
        :raises RuntimeError: If no supported framework
            is installed or configured
        """
        try:
            from flask import current_app

            extension = current_app.extensions["sqlalchemy"]
            # Older Flask-SQLAlchemy releases register a state object that
            # exposes the extension as ``.db``; newer releases register the
            # extension object itself.
            return getattr(extension, "db", extension).session
        except (ImportError, KeyError, AttributeError, RuntimeError):
            # RuntimeError is what Flask raises when ``current_app`` is used
            # outside an application context.
            pass

        try:
            from tornado.web import Application

            # NOTE(review): this builds a brand-new Application, so "db" is
            # presumably only present if Tornado is patched or configured
            # globally; verify against the intended usage.
            return Application().settings["db"]
        except (ImportError, KeyError):
            pass

        try:
            from django.conf import settings
            from sqlalchemy import create_engine
            from sqlalchemy.orm import sessionmaker

            # NOTE(review): ENGINE presumably holds a Django backend path
            # rather than a SQLAlchemy URL; confirm this probe can succeed.
            engine = create_engine(settings.DATABASES["default"]["ENGINE"])
            return sessionmaker(bind=engine)()
        except Exception:
            # Deliberately broad: this is a best-effort probe, and both an
            # unconfigured Django and an unusable engine string raise
            # framework-specific errors that must not mask the
            # RuntimeError documented below.
            pass

        raise RuntimeError(
            "No SQLAlchemy session provided and no supported framework "
            "installed or configured."
        )
118

119
    def create(self, amount: Optional[int] = 1) -> None:
1✔
120
        """
121
        Creates the specified amount of fake data entries for the model.
122
        It handles exceptions and rolls back the session
123
        in case of any errors.
124

125
        :param amount: The number of fake data entries to create.
126
        :raises InvalidAmountError: If the amount is not an integer or negative.
127
        """
128
        if not isinstance(amount, int) or amount < 0:
1✔
129
            raise InvalidAmountError(amount)
1✔
130

131
        if amount <= self.config.bulk_size:
1✔
132
            self._create_single_batch(amount)
1✔
133
        else:
134
            self._create_bulk(amount)
1✔
135

136
    def _create_single_batch(self, amount: int) -> None:
        """
        Creates a single batch of records and commits it.

        Values are generated for every column that is not skipped. For a
        plain table the rows are written with one multi-row INSERT; for a
        mapped class one instance per row is added to the session.

        :param amount: The number of records to create in this batch.
        :raises UniquenessError: If the commit fails with an integrity error
            whose message mentions a unique or duplicate value.
        :raises IntegrityError: For any other integrity error.
        :raises RuntimeError: If anything else fails; the session is rolled
            back first.
        """
        try:
            # Whether a column is skipped depends only on the column and the
            # config, so decide once instead of once per generated row.
            columns = [
                column
                for column in self.__get_table_columns()
                if not self.__should_skip_field(column)
            ]
            batch_data = [
                {
                    column.name: self._generate_fake_data_with_overrides(
                        column
                    )
                    for column in columns
                }
                for _ in range(amount)
            ]

            if self.__is_many_to_many_relation_table():
                # Do not send an INSERT with an empty VALUES list when
                # there is nothing to insert.
                if batch_data:
                    self.db.execute(self.model.insert().values(batch_data))
            else:
                for data in batch_data:
                    self.db.add(self.model(**data))

            self.db.commit()
            self.logger.info(f"Successfully created {amount} records")

        except IntegrityError as e:
            self.db.rollback()
            self.logger.error(f"Integrity error in batch creation: {e}")
            message = str(e).lower()
            if "unique" in message or "duplicate" in message:
                # The failing column is not known at this point.
                raise UniquenessError(
                    "unknown_field", self.config.max_retries
                ) from e
            raise
        except Exception as e:
            self.db.rollback()
            self.logger.error(f"Failed to create batch: {e}")
            # Chain the cause so the original traceback stays attached.
            raise RuntimeError(
                f"Failed to commit: {e} {traceback.format_exc()}"
            ) from e
172

173
    def _create_bulk(self, amount: int) -> None:
1✔
174
        """Creates records in multiple batches for better performance."""
175
        remaining = amount
1✔
176
        created = 0
1✔
177

178
        while remaining > 0:
1✔
179
            batch_size = min(remaining, self.config.bulk_size)
1✔
180
            try:
1✔
181
                self._create_single_batch(batch_size)
1✔
182
                created += batch_size
1✔
183
                remaining -= batch_size
1✔
184
                self.logger.info(f"Created {created}/{amount} records")
1✔
UNCOV
185
            except Exception as e:
×
UNCOV
186
                self.logger.error(
×
187
                    f"Failed to create bulk batch at {created}/{amount}: {e}"
188
                )
189
                raise
×
190

191
    def _generate_fake_data(
        self, column: Column
    ) -> Optional[Union[str, int, bool, date, datetime, None]]:
        """
        Generates fake data for a given column based on its type.

        The checks run in this order: a JSON schema in ``column.doc``, enum
        types, foreign keys, then the remaining column types (string,
        integer, float, boolean, date, datetime, time, UUID, decimal,
        interval, large binary, JSON/JSONB).

        :param column: The SQLAlchemy column for which fake data
            needs to be generated.
        :return: The fake data generated for the column. For a type that is
            not handled, a random word if the column is a primary key and
            ``None`` otherwise.
        """
        column_type = column.type

        if column.doc:
            try:
                return str(self._generate_json_data(column.doc))
            except json.JSONDecodeError:
                # ``doc`` holds ordinary documentation rather than a JSON
                # schema, so generate the value from the column type.
                self.logger.debug(
                    f"Column doc of {column.name} is not JSON; "
                    "falling back to type-based generation"
                )

        # Enum has to be the first type to check, or otherwise it
        # uses the options of the corresponding type of the enum options
        if isinstance(column_type, ModelColumnTypesEnum.ENUM.value):
            # Draw through the Faker instance so that a configured seed
            # also governs enum choices.
            return self.faker.random_element(column_type.enums)

        if column.foreign_keys:
            related_attribute = list(column.foreign_keys)[0].column.name
            return getattr(
                self.__handle_relationship(column), related_attribute
            )

        # Primary keys are generated from their column type like any other
        # column; only unhandled types use the fallback at the end.

        if isinstance(column_type, ModelColumnTypesEnum.STRING.value):
            max_length = getattr(column_type, "length", None)
            if max_length is None:
                max_length = 255
            if max_length < 5:
                # Faker's text() rejects limits below five characters.
                return self.faker.pystr(max_chars=max_length)
            return self.faker.text(max_nb_chars=max_length)

        if isinstance(column_type, ModelColumnTypesEnum.INTEGER.value):
            info = column.info
            if not info:
                return self.faker.random_int()

            # Optional bounds supplied through ``Column(info={...})``.
            min_value = column.info.get("min", 1)
            max_value = column.info.get("max", 100)
            return self.faker.random_int(min=min_value, max=max_value)

        if isinstance(column_type, ModelColumnTypesEnum.FLOAT.value):
            precision = column_type.precision
            # A (precision, scale) pair bounds the value; a missing or plain
            # integer precision carries no scale, so no bound is applied.
            if not precision or not isinstance(precision, (tuple, list)):
                return self.faker.pyfloat()

            max_value = 10 ** (precision[0] - precision[1]) - 1
            return round(
                self.faker.pyfloat(min_value=0, max_value=max_value),
                precision[1],
            )

        if isinstance(column_type, ModelColumnTypesEnum.BOOLEAN.value):
            return self.faker.boolean()

        if isinstance(column_type, ModelColumnTypesEnum.DATE.value):
            return self.faker.date_object()

        if isinstance(column_type, ModelColumnTypesEnum.DATETIME.value):
            return self.faker.date_time()

        if isinstance(column_type, ModelColumnTypesEnum.TIME.value):
            return self.faker.time_object()

        if isinstance(column_type, ModelColumnTypesEnum.UUID.value):
            return self.faker.uuid4()

        if isinstance(column_type, ModelColumnTypesEnum.DECIMAL.value):
            precision = getattr(column_type, "precision", None)
            scale = getattr(column_type, "scale", None)
            if precision:
                # A missing or zero scale means no fractional digits; the
                # declared precision still bounds the integer part.
                scale = scale or 0
                max_digits = precision - scale
                max_value = 10**max_digits - 1
                return round(
                    self.faker.pyfloat(min_value=0, max_value=max_value), scale
                )
            return self.faker.pydecimal(
                left_digits=10, right_digits=2, positive=True
            )

        if isinstance(column_type, ModelColumnTypesEnum.INTERVAL.value):
            days = self.faker.random_int(min=1, max=365)
            return f"{days} days"

        if isinstance(column_type, ModelColumnTypesEnum.LARGEBINARY.value):
            return self.faker.binary(length=256)

        if isinstance(
            column_type,
            (
                ModelColumnTypesEnum.JSON.value,
                ModelColumnTypesEnum.JSONB.value,
            ),
        ):
            # Default document shape for JSON columns without a schema.
            json_structure = {
                "id": "integer",
                "name": "string",
                "active": "boolean",
            }
            return self._populate_json_structure(json_structure)

        if column.primary_key:
            # A primary key needs some value even when its type is not
            # handled above.
            return self.faker.word()

        return None
301

302
    def __handle_relationship(self, column: Column) -> Optional[Table]:
        """
        Provides a parent row for a foreign-key column.

        One fake entry is created for the related model, then the first row
        returned by an unfiltered query on that model is handed back. The
        caller reads the referenced attribute from it.

        :param column: The foreign-key column that needs a parent row.
        :return: The first row of the related model, which is not
            necessarily the row that was just created.
        """
        related_model = self.__get_related_class(column)

        parent_faker = ModelFaker(related_model, self.db)
        parent_faker.create()

        parent_query = self.db.query(related_model)
        return parent_query.first()
312

313
    def __is_many_to_many_relation_table(self) -> bool:
1✔
314
        """
315
        Checks if the model is a many-to-many relationship table.
316
        """
317
        return not hasattr(self.model, "__table__") and not hasattr(
1✔
318
            self.model, "__mapper__"
319
        )
320

321
    def __should_skip_field(self, column: Column) -> bool:
        """
        Decides whether no value should be generated for a column.

        A column is skipped when it is an auto-incrementing primary key,
        when its default value is to be kept, or when it is nullable and
        nullable fields are not being filled.

        :param column: The column to inspect.
        :return: A truthy value if the column must be left out.
        """
        auto_increment_key = (
            column.primary_key and self.__is_field_auto_increment(column)
        )
        if auto_increment_key:
            return auto_increment_key

        keeps_default = self.__has_field_default_value(column)
        if keeps_default:
            return keeps_default

        return self.__is_field_nullable(column)
330

331
    @staticmethod
    def __is_field_auto_increment(column: Column) -> bool:
        """
        Tells whether a column is an auto-incrementing integer column.

        :param column: The column to inspect.
        :return: The column's falsy ``autoincrement`` setting as-is, or
            otherwise whether the column has an integer type.
        """
        auto_increment = column.autoincrement
        if not auto_increment:
            return auto_increment

        # Only integer columns count as auto-incrementing here.
        integer_type = ModelColumnTypesEnum.INTEGER.value
        return isinstance(column.type, integer_type)
339

340
    def __has_field_default_value(self, column: Column) -> bool:
        """
        Tells whether a column's own default should be left in place.

        :param column: The column to inspect.
        :return: ``True`` if the column has a ``ColumnDefault`` with a
            non-``None`` argument and default fields are not being filled.
        """
        default = column.default
        if not isinstance(default, ColumnDefault):
            return False
        if default.arg is None:
            return False
        # The config can force defaulted fields to be filled anyway.
        return not self.config.fill_default_fields
349

350
    def __is_field_nullable(self, column: Column) -> bool:
        """
        Tells whether a nullable column should be left empty.

        :param column: The column to inspect.
        :return: ``True`` if ``column.nullable`` is exactly ``True`` and
            nullable fields are not being filled.
        """
        # Anything other than the literal True (including None) counts as
        # "not nullable" here.
        if column.nullable is not True:
            return False
        return not self.config.fill_nullable_fields
359

360
    def __get_table_columns(self) -> List[Column]:
        """
        Returns the columns to generate values for.

        :return: The model's own ``columns`` when it is a plain table,
            otherwise the columns of its ``__table__``.
        """
        if self.__is_many_to_many_relation_table():
            return self.model.columns
        return self.model.__table__.columns
369

370
    def __get_related_class(self, column: Column) -> Table:
        """
        Returns what a foreign-key column refers to.

        For a mapped model that has a relationship registered under the
        column's name, the related mapped class is returned. Otherwise the
        table targeted by the column's first foreign key is returned.

        :param column: The foreign-key column to resolve.
        :return: The related mapped class or the referenced table.
        """
        if not self.__is_many_to_many_relation_table():
            relationships = self.model.__mapper__.relationships
            # Test and look up with the same key, so that a column whose
            # ``key`` differs from its ``name`` cannot raise KeyError.
            relationship_name = column.name
            if relationship_name in relationships:
                return relationships[relationship_name].mapper.class_

        fk = list(column.foreign_keys)[0]

        return fk.column.table
386

387
    def _generate_json_data(self, docstring: str) -> Dict[str, Any]:
1✔
388
        """
389
        Generates JSON data based on the provided docstring.
390
        """
391
        json_structure = json.loads(docstring)
1✔
392

393
        return self._populate_json_structure(json_structure)
1✔
394

395
    def _populate_json_structure(
1✔
396
        self, structure: Union[Dict[str, Any], List[Any]]
397
    ) -> Any:
398
        """
399
        Populates the JSON structure with fake data based on the defined
400
        schema.
401
        """
402
        if isinstance(structure, dict):
1✔
403
            return {
1✔
404
                key: self._populate_json_structure(value)
405
                if isinstance(value, (dict, list))
406
                else self._generate_primitive(value)
407
                for key, value in structure.items()
408
            }
409

410
        if isinstance(structure, list):
1✔
411
            return [
1✔
412
                self._populate_json_structure(item)
413
                if isinstance(item, (dict, list))
414
                else self._generate_primitive(item)
415
                for item in structure
416
            ]
417

UNCOV
418
        return structure
×
419

420
    def _generate_fake_data_with_overrides(self, column: Column) -> Any:
        """
        Generates a value for a column, honouring overrides first.

        The value comes from the first source that applies: the configured
        field override for the column name (called without arguments), the
        smart detector if one is active and returns a value, and finally
        type-based generation.

        :param column: The column to generate a value for.
        :return: The generated value.
        """
        overrides = self.config.field_overrides
        if column.name in overrides:
            return overrides[column.name]()

        detector = self.smart_detector
        smart_value = (
            detector.detect_and_generate(column) if detector else None
        )
        # ``None`` from the detector means "no opinion".
        if smart_value is None:
            return self._generate_fake_data(column)

        return smart_value
433

434
    def _generate_primitive(self, primitive_type: str) -> Any:
1✔
435
        """
436
        Generates fake data for primitive types.
437
        """
438
        if primitive_type == "boolean":
1✔
UNCOV
439
            return self.faker.boolean()
×
440
        if primitive_type == "datetime":
1✔
441
            return self.faker.date_time().isoformat()
1✔
442
        if primitive_type == "date":
1✔
443
            return self.faker.date()
1✔
444
        if primitive_type == "integer":
1✔
445
            return self.faker.random_int()
1✔
446
        if primitive_type == "string":
1✔
447
            return self.faker.word()
1✔
448
        if primitive_type == "float":
1✔
449
            return self.faker.pyfloat()
1✔
UNCOV
450
        return self.faker.word()
×
451

452
    def create_batch(self, amount: int, commit: bool = False) -> List[Any]:
        """
        Builds a batch of model instances, optionally persisting them.

        By default the instances are only constructed. With ``commit=True``
        they are also added to the session and committed together.

        For a plain (association) table no instances can be constructed, so
        the returned list is empty and nothing is written.

        :param amount: Number of instances to create
        :param commit: Whether to commit the batch to database
        :return: List of created model instances
        :raises InvalidAmountError: If the amount is not an integer or
            is negative.
        """
        # Same validation as ``create``: a negative amount is rejected
        # instead of quietly producing an empty batch.
        if not isinstance(amount, int) or amount < 0:
            raise InvalidAmountError(amount)

        instances = []
        try:
            # Whether a column is skipped depends only on the column and the
            # config, so decide once instead of once per generated row.
            columns = [
                column
                for column in self.__get_table_columns()
                if not self.__should_skip_field(column)
            ]
            is_plain_table = self.__is_many_to_many_relation_table()

            for _ in range(amount):
                data = {
                    column.name: self._generate_fake_data_with_overrides(
                        column
                    )
                    for column in columns
                }

                if is_plain_table:
                    continue

                instance = self.model(**data)
                instances.append(instance)
                if commit:
                    self.db.add(instance)

            if commit and instances:
                self.db.commit()
                self.logger.info(
                    f"Committed batch of {len(instances)} instances"
                )

            return instances

        except Exception as e:
            # Only a committing call has pending session state to undo.
            if commit:
                self.db.rollback()
            self.logger.error(f"Failed to create batch: {e}")
            raise
493

494
    def create_with(
        self, overrides: Dict[str, Any], amount: int = 1
    ) -> List[Any]:
        """
        Creates and commits model instances with specific field overrides.

        A value given in ``overrides`` is always used for its column, even
        when that column would otherwise be skipped (nullable, defaulted or
        auto-incrementing). Keys that do not match a column name are
        ignored. All other columns get generated values.

        :param overrides: Dictionary of field values to override
        :param amount: Number of instances to create
        :return: List of created model instances; empty for a plain
            (association) table, whose rows are inserted directly.
        :raises InvalidAmountError: If the amount is not an integer or
            is negative.
        """
        # Same validation as ``create``.
        if not isinstance(amount, int) or amount < 0:
            raise InvalidAmountError(amount)

        instances = []
        try:
            columns = self.__get_table_columns()
            is_plain_table = self.__is_many_to_many_relation_table()

            for _ in range(amount):
                data = {}
                for column in columns:
                    if column.name in overrides:
                        # An explicit override wins over the skip rules;
                        # otherwise the caller's value would be dropped.
                        data[column.name] = overrides[column.name]
                    elif not self.__should_skip_field(column):
                        data[column.name] = (
                            self._generate_fake_data_with_overrides(column)
                        )

                if is_plain_table:
                    self.db.execute(self.model.insert().values(**data))
                else:
                    instance = self.model(**data)
                    instances.append(instance)
                    self.db.add(instance)

            self.db.commit()
            self.logger.info(
                f"Created {len(instances)} instances with overrides"
            )
            return instances

        except Exception as e:
            self.db.rollback()
            self.logger.error(f"Failed to create with overrides: {e}")
            raise
539

540
    def reset(self, confirm: bool = False) -> int:
1✔
541
        """
542
        Removes all records from the model's table.
543

544
        :param confirm: Must be True to actually perform the deletion
545
        :return: Number of deleted records
546
        """
547
        if not confirm:
1✔
548
            raise ValueError("Must set confirm=True to delete all records")
1✔
549

550
        try:
1✔
551
            if self.__is_many_to_many_relation_table():
1✔
UNCOV
552
                result = self.db.execute(self.model.delete())
×
UNCOV
553
                deleted_count = result.rowcount
×
554
            else:
555
                deleted_count = self.db.query(self.model).count()
1✔
556
                self.db.query(self.model).delete()
1✔
557

558
            self.db.commit()
1✔
559
            self.logger.info(
1✔
560
                f"Deleted {deleted_count} records from {self.model}"
561
            )
562
            return deleted_count
1✔
563

UNCOV
564
        except Exception as e:
×
UNCOV
565
            self.db.rollback()
×
UNCOV
566
            self.logger.error(f"Failed to reset table: {e}")
×
UNCOV
567
            raise
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc