• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / sqlacodegen / 16157977885

09 Jul 2025 01:17AM UTC coverage: 97.387% (+0.008%) from 97.379%
16157977885

Pull #411

github

web-flow
Merge fd2afcaa9 into e4c32c5e7
Pull Request #411: Fixed same-name imports from wrong package

23 of 24 new or added lines in 3 files covered. (95.83%)

29 existing lines in 2 files now uncovered.

1416 of 1454 relevant lines covered (97.39%)

4.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.39
/src/sqlacodegen/utils.py
1
from __future__ import annotations
5✔
2

3
import re
5✔
4
from collections.abc import Mapping
5✔
5
from typing import Any, Literal, cast
5✔
6

7
from sqlalchemy import PrimaryKeyConstraint, UniqueConstraint
5✔
8
from sqlalchemy.engine import Connection, Engine
5✔
9
from sqlalchemy.sql import ClauseElement
5✔
10
from sqlalchemy.sql.elements import TextClause
5✔
11
from sqlalchemy.sql.schema import (
5✔
12
    CheckConstraint,
13
    ColumnCollectionConstraint,
14
    Constraint,
15
    ForeignKeyConstraint,
16
    Index,
17
    Table,
18
)
19

20
_re_postgresql_nextval_sequence = re.compile(r"nextval\('(.+)'::regclass\)")
5✔
21
_re_postgresql_sequence_delimiter = re.compile(r'(.*?)([."]|$)')
5✔
22

23

24
def get_column_names(constraint: ColumnCollectionConstraint) -> list[str]:
5✔
25
    return list(constraint.columns.keys())
5✔
26

27

28
def get_constraint_sort_key(constraint: Constraint) -> str:
5✔
29
    if isinstance(constraint, CheckConstraint):
5✔
30
        return f"C{constraint.sqltext}"
5✔
31
    elif isinstance(constraint, ColumnCollectionConstraint):
5✔
32
        return constraint.__class__.__name__[0] + repr(get_column_names(constraint))
5✔
33
    else:
UNCOV
34
        return str(constraint)
×
35

36

37
def get_compiled_expression(statement: ClauseElement, bind: Engine | Connection) -> str:
5✔
38
    """Return the statement in a form where any placeholders have been filled in."""
39
    return str(statement.compile(bind, compile_kwargs={"literal_binds": True}))
5✔
40

41

42
def get_common_fk_constraints(
5✔
43
    table1: Table, table2: Table
44
) -> set[ForeignKeyConstraint]:
45
    """
46
    Return a set of foreign key constraints the two tables have against each other.
47

48
    """
49
    c1 = {
5✔
50
        c
51
        for c in table1.constraints
52
        if isinstance(c, ForeignKeyConstraint) and c.elements[0].column.table == table2
53
    }
54
    c2 = {
5✔
55
        c
56
        for c in table2.constraints
57
        if isinstance(c, ForeignKeyConstraint) and c.elements[0].column.table == table1
58
    }
59
    return c1.union(c2)
5✔
60

61

62
def uses_default_name(constraint: Constraint | Index) -> bool:
5✔
63
    if not constraint.name or constraint.table is None:
5✔
64
        return True
5✔
65

66
    table = constraint.table
5✔
67
    values: dict[str, Any] = {
5✔
68
        "table_name": table.name,
69
        "constraint_name": constraint.name,
70
    }
71
    if isinstance(constraint, (Index, ColumnCollectionConstraint)):
5✔
72
        values.update(
5✔
73
            {
74
                "column_0N_name": "".join(col.name for col in constraint.columns),
75
                "column_0_N_name": "_".join(col.name for col in constraint.columns),
76
                "column_0N_label": "".join(
77
                    col.label(col.name).name for col in constraint.columns
78
                ),
79
                "column_0_N_label": "_".join(
80
                    col.label(col.name).name for col in constraint.columns
81
                ),
82
                "column_0N_key": "".join(
83
                    col.key for col in constraint.columns if col.key
84
                ),
85
                "column_0_N_key": "_".join(
86
                    col.key for col in constraint.columns if col.key
87
                ),
88
            }
89
        )
90
        if constraint.columns:
5✔
91
            columns = constraint.columns.values()
5✔
92
            values.update(
5✔
93
                {
94
                    "column_0_name": columns[0].name,
95
                    "column_0_label": columns[0].label(columns[0].name).name,
96
                    "column_0_key": columns[0].key,
97
                }
98
            )
99

100
    key: Literal["fk", "pk", "ix", "ck", "uq"]
101
    if isinstance(constraint, Index):
5✔
102
        key = "ix"
5✔
103
    elif isinstance(constraint, CheckConstraint):
5✔
104
        key = "ck"
5✔
105
    elif isinstance(constraint, UniqueConstraint):
5✔
106
        key = "uq"
5✔
107
    elif isinstance(constraint, PrimaryKeyConstraint):
5✔
108
        key = "pk"
5✔
109
    elif isinstance(constraint, ForeignKeyConstraint):
5✔
110
        key = "fk"
5✔
111
        values.update(
5✔
112
            {
113
                "referred_table_name": constraint.referred_table,
114
                "referred_column_0_name": constraint.elements[0].column.name,
115
                "referred_column_0N_name": "".join(
116
                    fk.column.name for fk in constraint.elements
117
                ),
118
                "referred_column_0_N_name": "_".join(
119
                    fk.column.name for fk in constraint.elements
120
                ),
121
                "referred_column_0_label": constraint.elements[0]
122
                .column.label(constraint.elements[0].column.name)
123
                .name,
124
                "referred_fk.column_0N_label": "".join(
125
                    fk.column.label(fk.column.name).name for fk in constraint.elements
126
                ),
127
                "referred_fk.column_0_N_label": "_".join(
128
                    fk.column.label(fk.column.name).name for fk in constraint.elements
129
                ),
130
                "referred_fk.column_0_key": constraint.elements[0].column.key,
131
                "referred_fk.column_0N_key": "".join(
132
                    fk.column.key for fk in constraint.elements if fk.column.key
133
                ),
134
                "referred_fk.column_0_N_key": "_".join(
135
                    fk.column.key for fk in constraint.elements if fk.column.key
136
                ),
137
            }
138
        )
139
    else:
UNCOV
140
        raise TypeError(f"Unknown constraint type: {constraint.__class__.__qualname__}")
×
141

142
    try:
5✔
143
        convention = cast(
5✔
144
            Mapping[str, str],
145
            table.metadata.naming_convention,
146
        )[key]
147
        return constraint.name == (convention % values)
5✔
148
    except KeyError:
5✔
149
        return False
5✔
150

151

152
def render_callable(
5✔
153
    name: str,
154
    *args: object,
155
    kwargs: Mapping[str, object] | None = None,
156
    indentation: str = "",
157
) -> str:
158
    """
159
    Render a function call.
160

161
    :param name: name of the callable
162
    :param args: positional arguments
163
    :param kwargs: keyword arguments
164
    :param indentation: if given, each argument will be rendered on its own line with
165
        this value used as the indentation
166

167
    """
168
    if kwargs:
5✔
169
        args += tuple(f"{key}={value}" for key, value in kwargs.items())
5✔
170

171
    if indentation:
5✔
172
        prefix = f"\n{indentation}"
5✔
173
        suffix = "\n"
5✔
174
        delimiter = f",\n{indentation}"
5✔
175
    else:
176
        prefix = suffix = ""
5✔
177
        delimiter = ", "
5✔
178

179
    rendered_args = delimiter.join(str(arg) for arg in args)
5✔
180
    return f"{name}({prefix}{rendered_args}{suffix})"
5✔
181

182

183
def qualified_table_name(table: Table) -> str:
5✔
184
    if table.schema:
5✔
185
        return f"{table.schema}.{table.name}"
5✔
186
    else:
187
        return str(table.name)
5✔
188

189

190
def decode_postgresql_sequence(clause: TextClause) -> tuple[str | None, str | None]:
5✔
191
    match = _re_postgresql_nextval_sequence.match(clause.text)
5✔
192
    if not match:
5✔
UNCOV
193
        return None, None
×
194

195
    schema: str | None = None
5✔
196
    sequence: str = ""
5✔
197
    in_quotes = False
5✔
198
    for match in _re_postgresql_sequence_delimiter.finditer(match.group(1)):
5✔
199
        sequence += match.group(1)
5✔
200
        if match.group(2) == '"':
5✔
201
            in_quotes = not in_quotes
5✔
202
        elif match.group(2) == ".":
5✔
203
            if in_quotes:
5✔
204
                sequence += "."
5✔
205
            else:
206
                schema, sequence = sequence, ""
5✔
207

208
    return schema, sequence
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc