• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / sqlacodegen / 12938863942

23 Jan 2025 10:01PM UTC coverage: 97.025% (-2.0%) from 99.035%
12938863942

push

github

web-flow
SQLModel Code generation fixes  (#358)

12 of 15 new or added lines in 1 file covered. (80.0%)

33 existing lines in 3 files now uncovered.

1337 of 1378 relevant lines covered (97.02%)

4.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.39
/src/sqlacodegen/utils.py
1
from __future__ import annotations
5✔
2

3
import re
5✔
4
from collections.abc import Mapping
5✔
5
from typing import Any
5✔
6

7
from sqlalchemy import PrimaryKeyConstraint, UniqueConstraint
5✔
8
from sqlalchemy.engine import Connection, Engine
5✔
9
from sqlalchemy.sql import ClauseElement
5✔
10
from sqlalchemy.sql.elements import TextClause
5✔
11
from sqlalchemy.sql.schema import (
5✔
12
    CheckConstraint,
13
    ColumnCollectionConstraint,
14
    Constraint,
15
    ForeignKeyConstraint,
16
    Index,
17
    Table,
18
)
19

20
_re_postgresql_nextval_sequence = re.compile(r"nextval\('(.+)'::regclass\)")
5✔
21
_re_postgresql_sequence_delimiter = re.compile(r'(.*?)([."]|$)')
5✔
22

23

24
def get_column_names(constraint: ColumnCollectionConstraint) -> list[str]:
5✔
25
    return list(constraint.columns.keys())
5✔
26

27

28
def get_constraint_sort_key(constraint: Constraint) -> str:
5✔
29
    if isinstance(constraint, CheckConstraint):
5✔
30
        return f"C{constraint.sqltext}"
5✔
31
    elif isinstance(constraint, ColumnCollectionConstraint):
5✔
32
        return constraint.__class__.__name__[0] + repr(get_column_names(constraint))
5✔
33
    else:
34
        return str(constraint)
×
35

36

37
def get_compiled_expression(statement: ClauseElement, bind: Engine | Connection) -> str:
5✔
38
    """Return the statement in a form where any placeholders have been filled in."""
39
    return str(statement.compile(bind, compile_kwargs={"literal_binds": True}))
5✔
40

41

42
def get_common_fk_constraints(
5✔
43
    table1: Table, table2: Table
44
) -> set[ForeignKeyConstraint]:
45
    """
46
    Return a set of foreign key constraints the two tables have against each other.
47

48
    """
49
    c1 = {
5✔
50
        c
51
        for c in table1.constraints
52
        if isinstance(c, ForeignKeyConstraint) and c.elements[0].column.table == table2
53
    }
54
    c2 = {
5✔
55
        c
56
        for c in table2.constraints
57
        if isinstance(c, ForeignKeyConstraint) and c.elements[0].column.table == table1
58
    }
59
    return c1.union(c2)
5✔
60

61

62
def uses_default_name(constraint: Constraint | Index) -> bool:
5✔
63
    if not constraint.name or constraint.table is None:
5✔
64
        return True
5✔
65

66
    table = constraint.table
5✔
67
    values: dict[str, Any] = {
5✔
68
        "table_name": table.name,
69
        "constraint_name": constraint.name,
70
    }
71
    if isinstance(constraint, (Index, ColumnCollectionConstraint)):
5✔
72
        values.update(
5✔
73
            {
74
                "column_0N_name": "".join(col.name for col in constraint.columns),
75
                "column_0_N_name": "_".join(col.name for col in constraint.columns),
76
                "column_0N_label": "".join(
77
                    col.label(col.name).name for col in constraint.columns
78
                ),
79
                "column_0_N_label": "_".join(
80
                    col.label(col.name).name for col in constraint.columns
81
                ),
82
                "column_0N_key": "".join(
83
                    col.key for col in constraint.columns if col.key
84
                ),
85
                "column_0_N_key": "_".join(
86
                    col.key for col in constraint.columns if col.key
87
                ),
88
            }
89
        )
90
        if constraint.columns:
5✔
91
            columns = constraint.columns.values()
5✔
92
            values.update(
5✔
93
                {
94
                    "column_0_name": columns[0].name,
95
                    "column_0_label": columns[0].label(columns[0].name).name,
96
                    "column_0_key": columns[0].key,
97
                }
98
            )
99

100
    if isinstance(constraint, Index):
5✔
101
        key = "ix"
5✔
102
    elif isinstance(constraint, CheckConstraint):
5✔
103
        key = "ck"
5✔
104
    elif isinstance(constraint, UniqueConstraint):
5✔
105
        key = "uq"
5✔
106
    elif isinstance(constraint, PrimaryKeyConstraint):
5✔
107
        key = "pk"
5✔
108
    elif isinstance(constraint, ForeignKeyConstraint):
5✔
109
        key = "fk"
5✔
110
        values.update(
5✔
111
            {
112
                "referred_table_name": constraint.referred_table,
113
                "referred_column_0_name": constraint.elements[0].column.name,
114
                "referred_column_0N_name": "".join(
115
                    fk.column.name for fk in constraint.elements
116
                ),
117
                "referred_column_0_N_name": "_".join(
118
                    fk.column.name for fk in constraint.elements
119
                ),
120
                "referred_column_0_label": constraint.elements[0]
121
                .column.label(constraint.elements[0].column.name)
122
                .name,
123
                "referred_fk.column_0N_label": "".join(
124
                    fk.column.label(fk.column.name).name for fk in constraint.elements
125
                ),
126
                "referred_fk.column_0_N_label": "_".join(
127
                    fk.column.label(fk.column.name).name for fk in constraint.elements
128
                ),
129
                "referred_fk.column_0_key": constraint.elements[0].column.key,
130
                "referred_fk.column_0N_key": "".join(
131
                    fk.column.key for fk in constraint.elements if fk.column.key
132
                ),
133
                "referred_fk.column_0_N_key": "_".join(
134
                    fk.column.key for fk in constraint.elements if fk.column.key
135
                ),
136
            }
137
        )
138
    else:
UNCOV
139
        raise TypeError(f"Unknown constraint type: {constraint.__class__.__qualname__}")
×
140

141
    try:
5✔
142
        convention: str = table.metadata.naming_convention[key]
5✔
143
        return constraint.name == (convention % values)
5✔
144
    except KeyError:
5✔
145
        return False
5✔
146

147

148
def render_callable(
5✔
149
    name: str,
150
    *args: object,
151
    kwargs: Mapping[str, object] | None = None,
152
    indentation: str = "",
153
) -> str:
154
    """
155
    Render a function call.
156

157
    :param name: name of the callable
158
    :param args: positional arguments
159
    :param kwargs: keyword arguments
160
    :param indentation: if given, each argument will be rendered on its own line with
161
        this value used as the indentation
162

163
    """
164
    if kwargs:
5✔
165
        args += tuple(f"{key}={value}" for key, value in kwargs.items())
5✔
166

167
    if indentation:
5✔
168
        prefix = f"\n{indentation}"
5✔
169
        suffix = "\n"
5✔
170
        delimiter = f",\n{indentation}"
5✔
171
    else:
172
        prefix = suffix = ""
5✔
173
        delimiter = ", "
5✔
174

175
    rendered_args = delimiter.join(str(arg) for arg in args)
5✔
176
    return f"{name}({prefix}{rendered_args}{suffix})"
5✔
177

178

179
def qualified_table_name(table: Table) -> str:
5✔
180
    if table.schema:
5✔
181
        return f"{table.schema}.{table.name}"
5✔
182
    else:
183
        return str(table.name)
5✔
184

185

186
def decode_postgresql_sequence(clause: TextClause) -> tuple[str | None, str | None]:
5✔
187
    match = _re_postgresql_nextval_sequence.match(clause.text)
5✔
188
    if not match:
5✔
UNCOV
189
        return None, None
×
190

191
    schema: str | None = None
5✔
192
    sequence: str = ""
5✔
193
    in_quotes = False
5✔
194
    for match in _re_postgresql_sequence_delimiter.finditer(match.group(1)):
5✔
195
        sequence += match.group(1)
5✔
196
        if match.group(2) == '"':
5✔
197
            in_quotes = not in_quotes
5✔
198
        elif match.group(2) == ".":
5✔
199
            if in_quotes:
5✔
200
                sequence += "."
5✔
201
            else:
202
                schema, sequence = sequence, ""
5✔
203

204
    return schema, sequence
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc