• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

thombashi / sqliteschema / 13611228496

02 Mar 2025 02:32AM UTC coverage: 80.126% (+0.08%) from 80.042%
13611228496

push

github

thombashi
Update README

Signed-off-by: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>

144 of 186 branches covered (77.42%)

383 of 478 relevant lines covered (80.13%)

14.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.24
/sqliteschema/_extractor.py
1
"""
2
.. codeauthor:: Tsuyoshi Hombashi <tsuyoshi.hombashi@gmail.com>
3
"""
4

5
import os.path
18✔
6
import re
18✔
7
import sqlite3
18✔
8
from collections import OrderedDict
18✔
9
from collections.abc import Iterator, Mapping
18✔
10
from textwrap import dedent
18✔
11
from typing import TYPE_CHECKING, Any, Optional, Union, cast
18✔
12

13
import typepy
18✔
14

15
from ._const import MAX_VERBOSITY_LEVEL, SQLITE_SYSTEM_TABLES, SchemaHeader
18✔
16
from ._error import DataNotFoundError, OperationalError
18✔
17
from ._logger import logger
18✔
18
from ._schema import SQLiteTableSchema
18✔
19

20

21
if TYPE_CHECKING:
18!
22
    import simplesqlite
×
23

24

25
def stash_row_factory(func: Any) -> Any:
18✔
26
    def wrapper(*args: list[Any], **kwargs: Any) -> Any:
18✔
27
        db: SQLiteSchemaExtractor = cast(SQLiteSchemaExtractor, args[0])
18✔
28
        stash_row_factory = db._con.row_factory
18✔
29
        db._con.row_factory = None
18✔
30

31
        try:
18✔
32
            result = func(*args, **kwargs)
18✔
33
        finally:
34
            db._con.row_factory = stash_row_factory
18✔
35

36
        return result
18✔
37

38
    return wrapper
18✔
39

40

41
class SQLiteSchemaExtractor:
18✔
42
    """A SQLite database file schema extractor class.
43

44
    Args:
45
        database_source (str or simplesqlite.SimpleSQLite or sqlite3.Connection):
46
            SQLite database source to extract schema information.
47
    """
48

49
    global_debug_query = False
18✔
50

51
    _SQLITE_MASTER_TABLE_NAME = "master"
18✔
52
    _SQLITE_MASTER_ATTR_NAME_LIST = ["tbl_name", "sql", "type", "name", "rootpage"]
18✔
53

54
    _RE_FOREIGN_KEY = re.compile("FOREIGN KEY")
18✔
55
    _RE_ATTR_NAME = re.compile(r"^'.+?'|^\".+?\"|^\[.+?\]")
18✔
56

57
    _RE_NOT_NULL = re.compile("NOT NULL", re.IGNORECASE)
18✔
58
    _RE_PRIMARY_KEY = re.compile("PRIMARY KEY", re.IGNORECASE)
18✔
59
    _RE_UNIQUE = re.compile("UNIQUE", re.IGNORECASE)
18✔
60
    _RE_AUTO_INC = re.compile("AUTOINCREMENT", re.IGNORECASE)
18✔
61

62
    _RE_MULTI_LINE_COMMENT = re.compile(
18✔
63
        rf"/\*(?P<{SchemaHeader.COMMENT}>.*?)\*/", re.MULTILINE | re.DOTALL
64
    )
65
    _RE_SINGLE_LINE_COMMENT = re.compile(rf"[\s]*--(?P<{SchemaHeader.COMMENT}>.+)", re.MULTILINE)
18✔
66

67
    def __init__(
18✔
68
        self,
69
        database_source: Union[str, "simplesqlite.SimpleSQLite", sqlite3.Connection],
70
        max_workers: Optional[int] = None,
71
    ) -> None:
72
        from simplesqlite import SimpleSQLite
18✔
73

74
        is_connection_required = True
18✔
75

76
        if isinstance(database_source, SimpleSQLite) and database_source.is_connected():
18✔
77
            assert database_source.connection
18✔
78
            self._con: sqlite3.Connection = database_source.connection
18✔
79
            is_connection_required = False
18✔
80
        elif isinstance(database_source, sqlite3.Connection):
18✔
81
            self._con = database_source
18✔
82
            is_connection_required = False
18✔
83

84
        if is_connection_required:
18✔
85
            assert not isinstance(database_source, sqlite3.Connection)
18✔
86

87
            if not os.path.isfile(database_source):
18✔
88
                raise OSError(f"file not found: {database_source}")
18✔
89

90
            try:
18✔
91
                self._con = sqlite3.connect(database_source)
18✔
92
            except sqlite3.OperationalError as e:
×
93
                raise OperationalError(e)
×
94

95
        self.__con_sqlite_master: Optional[sqlite3.Connection] = None
18✔
96
        self.__total_changes: Optional[int] = None
18✔
97

98
        self.max_workers = max_workers
18✔
99

100
    @stash_row_factory
18✔
101
    def fetch_table_names(
18✔
102
        self, include_system_table: bool = False, include_view: bool = False
103
    ) -> list[str]:
104
        """
105
        :return: List of table names in the database.
106
        :rtype: list
107
        """
108

109
        self._con.row_factory = None
18✔
110
        cur = self._con.cursor()
18✔
111

112
        if include_view:
18✔
113
            where_query = "TYPE in ('table', 'view')"
18✔
114
        else:
115
            where_query = "TYPE='table'"
18✔
116

117
        result = cur.execute(f"SELECT name FROM sqlite_master WHERE {where_query}")
18✔
118
        if result is None:
18!
119
            return []
×
120

121
        table_names = [record[0] for record in result.fetchall()]
18!
122

123
        if include_system_table:
18!
124
            return table_names
×
125

126
        return [table for table in table_names if table not in SQLITE_SYSTEM_TABLES]
18!
127

128
    @stash_row_factory
18✔
129
    def fetch_view_names(self) -> list[str]:
18✔
130
        """
131
        :return: List of view names in the database.
132
        :rtype: list
133
        """
134

135
        self._con.row_factory = None
18✔
136
        cur = self._con.cursor()
18✔
137

138
        result = cur.execute("SELECT name FROM sqlite_master WHERE TYPE='view'")
18✔
139
        if result is None:
18!
140
            return []
×
141

142
        return [record[0] for record in result.fetchall()]
18!
143

144
    def fetch_table_schema(self, table_name: str) -> SQLiteTableSchema:
18✔
145
        return SQLiteTableSchema(
18✔
146
            table_name,
147
            schema_map=self.__fetch_table_metadata(table_name),
148
            max_workers=self.max_workers,
149
        )
150

151
    def fetch_database_schema(self) -> Iterator[SQLiteTableSchema]:
18✔
152
        for table_name in self.fetch_table_names():
18✔
153
            if table_name in self.fetch_view_names():
18!
154
                continue
×
155

156
            yield self.fetch_table_schema(table_name)
18✔
157

158
    def fetch_database_schema_as_dict(self) -> dict:
18✔
159
        database_schema = {}
18✔
160
        for table_schema in self.fetch_database_schema():
18✔
161
            database_schema.update(table_schema.as_dict())
18✔
162

163
        return database_schema
18✔
164

165
    @stash_row_factory
18✔
166
    def fetch_sqlite_master(self) -> list[dict]:
18✔
167
        """
168
        Get sqlite_master table information as a list of dictionaries.
169

170
        :return: sqlite_master table information.
171
        :rtype: list
172

173
        :Sample Code:
174
            .. code:: python
175

176
                from sqliteschema import SQLiteSchemaExtractor
177

178
                print(json.dumps(SQLiteSchemaExtractor("sample.sqlite").fetch_sqlite_master(), indent=4))
179

180
        :Output:
181
            .. code-block:: json
182

183
                [
184
                    {
185
                        "tbl_name": "sample_table",
186
                        "sql": "CREATE TABLE 'sample_table' ('a' INTEGER, 'b' REAL, 'c' TEXT, 'd' REAL, 'e' TEXT)",
187
                        "type": "table",
188
                        "name": "sample_table",
189
                        "rootpage": 2
190
                    },
191
                    {
192
                        "tbl_name": "sample_table",
193
                        "sql": "CREATE INDEX sample_table_a_index ON sample_table('a')",
194
                        "type": "index",
195
                        "name": "sample_table_a_index",
196
                        "rootpage": 3
197
                    }
198
                ]
199
        """
200

201
        sqlite_master_record_list = []
18✔
202
        cur = self._con.cursor()
18✔
203
        result = cur.execute(
18✔
204
            "SELECT {:s} FROM sqlite_master".format(", ".join(self._SQLITE_MASTER_ATTR_NAME_LIST))
205
        )
206

207
        for record in result.fetchall():
18✔
208
            sqlite_master_record_list.append(
18!
209
                {
210
                    attr_name: item
211
                    for attr_name, item in zip(self._SQLITE_MASTER_ATTR_NAME_LIST, record)
212
                }
213
            )
214

215
        return sqlite_master_record_list
18✔
216

217
    def dumps(
18✔
218
        self,
219
        output_format: Optional[str] = None,
220
        verbosity_level: int = MAX_VERBOSITY_LEVEL,
221
        **kwargs: Any,
222
    ) -> str:
223
        dump_list = []
18✔
224

225
        for table_schema in self.fetch_database_schema():
18✔
226
            dump_list.append(
18✔
227
                table_schema.dumps(
228
                    output_format=output_format, verbosity_level=verbosity_level, **kwargs
229
                )
230
            )
231

232
        return "\n".join(dump_list)
18✔
233

234
    def _extract_attr_name(self, schema: str) -> str:
18✔
235
        _RE_SINGLE_QUOTES = re.compile("^'.+?'")
18✔
236
        _RE_DOUBLE_QUOTES = re.compile('^".+?"')
18✔
237
        _RE_BRACKETS = re.compile(r"^\[.+?\]")
18✔
238

239
        match_attr_name = self._RE_ATTR_NAME.search(schema)
18✔
240
        if match_attr_name is None:
18✔
241
            attr_name = schema.split()[0]
18✔
242
        else:
243
            attr_name = match_attr_name.group()
18✔
244

245
        if _RE_SINGLE_QUOTES.search(attr_name):
18✔
246
            return attr_name.strip("'")
18✔
247

248
        if _RE_DOUBLE_QUOTES.search(attr_name):
18✔
249
            return attr_name.strip('"')
18✔
250

251
        if _RE_BRACKETS.search(attr_name):
18✔
252
            return attr_name.strip("[]")
18✔
253

254
        return attr_name
18✔
255

256
    def _extract_attr_type(self, schema: str) -> Optional[str]:
18✔
257
        match_attr_name = self._RE_ATTR_NAME.search(schema)
18✔
258
        if match_attr_name is None:
18✔
259
            try:
18✔
260
                return schema.split()[1]
18✔
261
            except IndexError:
18✔
262
                return None
18✔
263

264
        schema_wo_name = self._RE_ATTR_NAME.sub("", schema).strip()
18✔
265

266
        if not schema_wo_name:
18✔
267
            return None
18✔
268

269
        return schema_wo_name.split()[0]
18✔
270

271
    def _extract_attr_constraints(self, schema: str) -> str:
18✔
272
        attr_name_match = self._RE_ATTR_NAME.search(schema)
18✔
273
        if attr_name_match is None:
18✔
274
            return " ".join(schema.split()[2:])
18✔
275

276
        schema_wo_name = self._RE_ATTR_NAME.sub("", schema).strip()
18✔
277

278
        return " ".join(schema_wo_name.split()[1:])
18✔
279

280
    @stash_row_factory
18✔
281
    def _fetch_table_schema_text(self, table_name: str, schema_type: str) -> list[str]:
18✔
282
        if table_name in SQLITE_SYSTEM_TABLES:
18!
283
            logger.debug(f"skip fetching sqlite system table: {table_name:s}")
×
284
            return []
×
285

286
        self.__update_sqlite_master_db()
18✔
287

288
        result = self.__execute_sqlite_master(
18✔
289
            "SELECT {:s} FROM {:s} WHERE {:s} AND {:s}".format(
290
                "sql",
291
                self._SQLITE_MASTER_TABLE_NAME,
292
                "{:s} = '{:s}'".format("tbl_name", table_name),
293
                "{:s} = '{:s}'".format("type", schema_type),
294
            ),
295
            self.global_debug_query,
296
        )
297

298
        try:
18✔
299
            return result.fetchone()[0]
18✔
300
        except TypeError:
18✔
301
            raise DataNotFoundError(f"data not found in '{self._SQLITE_MASTER_TABLE_NAME}' table")
18✔
302

303
        raise RuntimeError("failed to fetch table schema")
304

305
    def _parse_table_schema_text(self, table_name: str, table_schema_text: str) -> list[dict]:
18✔
306
        index_query_list = self._fetch_index_schema(table_name)
18✔
307
        table_metadata: list[dict] = []
18✔
308

309
        table_attr_text = table_schema_text.split("(", maxsplit=1)[1].rsplit(")", maxsplit=1)[0]
18✔
310
        item_count = 0
18✔
311

312
        for attr_item in re.split("[,\n]", table_attr_text):
18✔
313
            attr_item = attr_item.strip()
18✔
314
            if not attr_item:
18✔
315
                continue
18✔
316

317
            if self._RE_FOREIGN_KEY.search(attr_item) is not None:
18✔
318
                continue
18✔
319

320
            match = self._RE_MULTI_LINE_COMMENT.search(
18✔
321
                attr_item
322
            ) or self._RE_SINGLE_LINE_COMMENT.search(attr_item)
323
            comment = ""
18✔
324
            if match:
18✔
325
                comment = match.group(SchemaHeader.COMMENT).strip()
18✔
326

327
            if table_metadata and comment:
18✔
328
                table_metadata[item_count - 1][SchemaHeader.COMMENT] = comment
18✔
329
                continue
18✔
330

331
            values: dict[str, Any] = OrderedDict()
18✔
332
            attr_name = self._extract_attr_name(attr_item)
18✔
333
            re_index = re.compile(re.escape(attr_name))
18✔
334

335
            values[SchemaHeader.ATTR_NAME] = attr_name
18✔
336
            values[SchemaHeader.INDEX] = False
18✔
337
            values[SchemaHeader.DATA_TYPE] = self._extract_attr_type(attr_item)
18✔
338

339
            try:
18✔
340
                constraint = self._extract_attr_constraints(attr_item)
18✔
341
            except IndexError:
×
342
                continue
×
343

344
            values[SchemaHeader.NULLABLE] = (
18✔
345
                "NO" if self._RE_NOT_NULL.search(constraint) is not None else "YES"
346
            )
347
            values[SchemaHeader.KEY] = self.__extract_key_constraint(constraint)
18✔
348
            values[SchemaHeader.DEFAULT] = self.__extract_default_value(constraint)
18✔
349

350
            if values[SchemaHeader.KEY] in ("PRI", "UNI"):
18✔
351
                values[SchemaHeader.INDEX] = True
18✔
352
            else:
353
                for index_query in index_query_list:
18✔
354
                    if re_index.search(index_query) is not None:
18✔
355
                        values[SchemaHeader.INDEX] = True
18✔
356
                        break
18✔
357

358
            values[SchemaHeader.EXTRA] = ", ".join(self.__extract_extra(constraint))
18✔
359

360
            table_metadata.append(values)
18✔
361
            item_count += 1
18✔
362

363
        return table_metadata
18✔
364

365
    def _fetch_index_schema(self, table_name: str) -> list[str]:
18✔
366
        self.__update_sqlite_master_db()
18✔
367

368
        result = self.__execute_sqlite_master(
18✔
369
            "SELECT {:s} FROM {:s} WHERE {:s} AND {:s}".format(
370
                "sql",
371
                self._SQLITE_MASTER_TABLE_NAME,
372
                "{:s} = '{:s}'".format("tbl_name", table_name),
373
                "{:s} = '{:s}'".format("type", "index"),
374
            ),
375
            self.global_debug_query,
376
        )
377

378
        try:
18✔
379
            return [
18!
380
                record[0] for record in result.fetchall() if typepy.is_not_empty_sequence(record[0])
381
            ]
382
        except TypeError:
×
383
            raise DataNotFoundError(f"index not found in '{table_name}'")
×
384

385
    def __fetch_table_metadata(self, table_name: str) -> Mapping[str, list[Mapping[str, Any]]]:
18✔
386
        metadata: dict[str, list] = OrderedDict()
18✔
387

388
        if table_name in self.fetch_view_names():
18!
389
            # can not extract metadata from views
390
            return {}
×
391

392
        table_schema_text = self._fetch_table_schema_text(table_name, "table")
18✔
393
        metadata[table_name] = self._parse_table_schema_text(table_name, table_schema_text)
18✔
394

395
        return metadata
18✔
396

397
    def __extract_key_constraint(self, constraint: str) -> str:
18✔
398
        if self._RE_PRIMARY_KEY.search(constraint):
18✔
399
            return "PRI"
18✔
400

401
        if self._RE_UNIQUE.search(constraint):
18✔
402
            return "UNI"
18✔
403

404
        return ""
18✔
405

406
    def __extract_default_value(self, constraint: str) -> str:
18✔
407
        regexp_default = re.compile("DEFAULT (?P<value>.+)", re.IGNORECASE)
18✔
408
        match = regexp_default.search(constraint)
18✔
409

410
        if match:
18✔
411
            return match.group("value")
18✔
412

413
        if self._RE_NOT_NULL.search(constraint):
18✔
414
            return ""
18✔
415

416
        return "NULL"
18✔
417

418
    def __extract_extra(self, constraint: str) -> list[str]:
18✔
419
        extra_list = []
18✔
420
        if self._RE_AUTO_INC.search(constraint):
18✔
421
            extra_list.append("AUTOINCREMENT")
18✔
422

423
        return extra_list
18✔
424

425
    def __execute_sqlite_master(self, query: str, is_logging: bool = True) -> sqlite3.Cursor:
18✔
426
        if is_logging:
18!
427
            logger.debug(query)
×
428

429
        assert self.__con_sqlite_master is not None
18✔
430

431
        return self.__con_sqlite_master.execute(query)
18✔
432

433
    def __update_sqlite_master_db(self) -> None:
18✔
434
        try:
18✔
435
            if self.__total_changes == self._con.total_changes:
18✔
436
                """
12✔
437
                logger.debug(
438
                    "skipping the {} table update. updates not found after the last update.".format(
439
                        self._SQLITE_MASTER_TABLE_NAME
440
                    )
441
                )
442
                """
443
                return
18✔
444
        except AttributeError:
×
445
            pass
446

447
        if self.__con_sqlite_master:
18!
448
            self.__con_sqlite_master.close()
×
449

450
        self.__con_sqlite_master = sqlite3.connect(":memory:")
18✔
451
        sqlite_master = self.fetch_sqlite_master()
18✔
452

453
        if typepy.is_empty_sequence(sqlite_master):
18!
454
            return
×
455

456
        sqlite_master_records = [
18!
457
            [record[attr] for attr in self._SQLITE_MASTER_ATTR_NAME_LIST]
458
            for record in sqlite_master
459
        ]
460
        self.__execute_sqlite_master(
18✔
461
            dedent(
462
                """\
463
                CREATE TABLE {:s} (
464
                    tbl_name TEXT NOT NULL,
465
                    sql TEXT,
466
                    type TEXT NOT NULL,
467
                    name TEXT NOT NULL,
468
                    rootpage INTEGER NOT NULL
469
                )
470
                """
471
            ).format(self._SQLITE_MASTER_TABLE_NAME),
472
            False,
473
        )
474
        self.__con_sqlite_master.executemany(
18✔
475
            f"INSERT INTO {self._SQLITE_MASTER_TABLE_NAME:s} VALUES (?,?,?,?,?)",
476
            sqlite_master_records,
477
        )
478

479
        if self.global_debug_query:
18!
480
            logger.debug(
×
481
                "insert {:d} records into {:s}".format(
482
                    len(sqlite_master_records), self._SQLITE_MASTER_TABLE_NAME
483
                )
484
            )
485

486
        self.__con_sqlite_master.commit()
18✔
487

488
        self.__total_changes = self._con.total_changes
18✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc