• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

agronholm / sqlacodegen / 7052416992

30 Nov 2023 09:20PM UTC coverage: 99.035% (+1.4%) from 97.636%
7052416992

push

github

agronholm
Updated GitHub actions

1129 of 1140 relevant lines covered (99.04%)

7.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.43
/src/sqlacodegen/utils.py
1
from __future__ import annotations
10✔
2

3
import re
10✔
4
from collections.abc import Mapping
10✔
5
from typing import Any
10✔
6

7
from sqlalchemy import PrimaryKeyConstraint, UniqueConstraint
10✔
8
from sqlalchemy.engine import Connection, Engine
10✔
9
from sqlalchemy.sql import ClauseElement
10✔
10
from sqlalchemy.sql.elements import TextClause
10✔
11
from sqlalchemy.sql.schema import (
10✔
12
    CheckConstraint,
13
    ColumnCollectionConstraint,
14
    Constraint,
15
    ForeignKeyConstraint,
16
    Index,
17
    Table,
18
)
19

20
# Matches text of the form ``nextval('...'::regclass)`` and captures whatever sits
# between the single quotes (group 1).
_re_postgresql_nextval_sequence = re.compile(r"nextval\('(.+)'::regclass\)")
10✔
21
# Lazily captures a run of text (group 1) followed by the delimiter that ends it
# (group 2): a dot, a double quote, or the end of the string.
_re_postgresql_sequence_delimiter = re.compile(r'(.*?)([."]|$)')
10✔
22

23

24
def get_column_names(constraint: ColumnCollectionConstraint) -> list[str]:
    """
    Return the keys of the constraint's column collection as a list.

    :param constraint: the constraint whose columns are inspected
    :return: the keys of ``constraint.columns``, in the collection's own order

    """
    return [*constraint.columns.keys()]
10✔
26

27

28
def get_constraint_sort_key(constraint: Constraint) -> str:
    """
    Build the string key used to order a constraint.

    Check constraints yield ``"C"`` followed by their SQL text. Any other column
    collection constraint yields the first letter of its class name followed by
    the ``repr()`` of its column names. Everything else falls back to
    ``str(constraint)``.

    :param constraint: the constraint to produce a key for
    :return: the sort key

    """
    # Check constraints must be tested before the generic column collection case
    if isinstance(constraint, CheckConstraint):
        return f"C{constraint.sqltext}"

    if not isinstance(constraint, ColumnCollectionConstraint):
        return str(constraint)

    type_initial = constraint.__class__.__name__[0]
    return type_initial + repr(get_column_names(constraint))
×
35

36

37
def get_compiled_expression(statement: ClauseElement, bind: Engine | Connection) -> str:
    """
    Render a statement as a string with its placeholders filled in.

    :param statement: the clause element to compile
    :param bind: the engine or connection handed to ``statement.compile()``
    :return: the string form of the statement compiled with ``literal_binds``
        enabled

    """
    compiled = statement.compile(bind, compile_kwargs={"literal_binds": True})
    return str(compiled)
40

41

42
def get_common_fk_constraints(
    table1: Table, table2: Table
) -> set[ForeignKeyConstraint]:
    """
    Collect the foreign key constraints that link two tables in either direction.

    A constraint is included when it belongs to one of the tables and the column
    referenced by its first element belongs to the other table.

    :param table1: the first table
    :param table2: the second table
    :return: the matching foreign key constraints of both tables

    """

    def pointing_at(source: Table, target: Table) -> set[ForeignKeyConstraint]:
        # Foreign keys declared on ``source`` whose first element refers to ``target``
        found = set()
        for candidate in source.constraints:
            if not isinstance(candidate, ForeignKeyConstraint):
                continue

            if candidate.elements[0].column.table == target:
                found.add(candidate)

        return found

    forward = pointing_at(table1, table2)
    backward = pointing_at(table2, table1)
    return forward | backward
10✔
60

61

62
def uses_default_name(constraint: Constraint | Index) -> bool:
    """
    Check whether a constraint or index is named according to the naming convention.

    The convention template for the constraint's type is looked up in
    ``table.metadata.naming_convention``, rendered with the substitution tokens
    collected here, and compared against the constraint's actual name.

    :param constraint: the constraint or index to check
    :return: ``True`` if the constraint has no name, is not attached to a table, or
        its name equals the rendered convention; ``False`` if the name differs, no
        convention exists for this constraint type, or the convention uses a token
        that is not available
    :raises TypeError: if the constraint is not of a supported type

    """
    if not constraint.name or constraint.table is None:
        return True

    table = constraint.table
    values: dict[str, Any] = {
        "table_name": table.name,
        "constraint_name": constraint.name,
    }
    if isinstance(constraint, (Index, ColumnCollectionConstraint)):
        # Gather each per-column attribute once and reuse it for every token
        columns = list(constraint.columns)
        names = [col.name for col in columns]
        labels = [col.label(col.name).name for col in columns]
        keys = [col.key for col in columns if col.key]
        values.update(
            {
                "column_0N_name": "".join(names),
                "column_0_N_name": "_".join(names),
                "column_0N_label": "".join(labels),
                "column_0_N_label": "_".join(labels),
                "column_0N_key": "".join(keys),
                "column_0_N_key": "_".join(keys),
            }
        )
        if columns:
            # The single-column tokens only exist when there is a first column
            values.update(
                {
                    "column_0_name": names[0],
                    "column_0_label": labels[0],
                    "column_0_key": columns[0].key,
                }
            )

    if isinstance(constraint, Index):
        key = "ix"
    elif isinstance(constraint, CheckConstraint):
        key = "ck"
    elif isinstance(constraint, UniqueConstraint):
        key = "uq"
    elif isinstance(constraint, PrimaryKeyConstraint):
        key = "pk"
    elif isinstance(constraint, ForeignKeyConstraint):
        key = "fk"
        referred = [fk.column for fk in constraint.elements]
        referred_names = [col.name for col in referred]
        referred_labels = [col.label(col.name).name for col in referred]
        referred_keys = [col.key for col in referred if col.key]
        values.update(
            {
                # The token is a name, so pass the table's name, not the Table
                "referred_table_name": constraint.referred_table.name,
                "referred_column_0_name": referred_names[0],
                "referred_column_0N_name": "".join(referred_names),
                "referred_column_0_N_name": "_".join(referred_names),
                "referred_column_0_label": referred_labels[0],
                "referred_column_0N_label": "".join(referred_labels),
                "referred_column_0_N_label": "_".join(referred_labels),
                "referred_column_0_key": referred[0].key,
                "referred_column_0N_key": "".join(referred_keys),
                "referred_column_0_N_key": "_".join(referred_keys),
            }
        )
    else:
        raise TypeError(f"Unknown constraint type: {constraint.__class__.__qualname__}")

    try:
        # KeyError covers both a missing convention entry and an unknown token
        convention: str = table.metadata.naming_convention[key]
        return constraint.name == (convention % values)
    except KeyError:
        return False
10✔
146

147

148
def render_callable(
    name: str,
    *args: object,
    kwargs: Mapping[str, object] | None = None,
    indentation: str = "",
) -> str:
    """
    Produce the source text of a call to the named callable.

    :param name: name of the callable
    :param args: positional arguments, rendered with ``str()``
    :param kwargs: keyword arguments, rendered as ``key=value`` after the
        positional arguments
    :param indentation: if non-empty, every argument is placed on its own line and
        prefixed with this string

    """
    pieces = list(args)
    if kwargs:
        pieces.extend(f"{key}={value}" for key, value in kwargs.items())

    # A non-empty indentation switches from the one-line to the multi-line layout
    opening = f"\n{indentation}" if indentation else ""
    separator = f",\n{indentation}" if indentation else ", "
    closing = "\n" if indentation else ""

    joined = separator.join([str(piece) for piece in pieces])
    return f"{name}({opening}{joined}{closing})"
10✔
177

178

179
def qualified_table_name(table: Table) -> str:
    """
    Return the table's name, prefixed with its schema when it has one.

    :param table: the table to name
    :return: ``schema.name`` if ``table.schema`` is truthy, otherwise just the name

    """
    if not table.schema:
        return str(table.name)

    return f"{table.schema}.{table.name}"
10✔
184

185

186
def decode_postgresql_sequence(clause: TextClause) -> tuple[str | None, str | None]:
    """
    Extract the schema and sequence name from a ``nextval(...)`` text clause.

    Double quotes are dropped from the result and toggle a quoted state; a dot
    seen while quoted is kept as part of the name, while an unquoted dot ends the
    schema part.

    :param clause: the text clause to inspect
    :return: ``(schema, sequence)``, where ``schema`` is ``None`` if no unquoted dot
        was found, or ``(None, None)`` if the clause text does not start with a
        ``nextval('...'::regclass)`` expression

    """
    found = _re_postgresql_nextval_sequence.match(clause.text)
    if found is None:
        return None, None

    schema: str | None = None
    sequence: str = ""
    quoted = False
    for token in _re_postgresql_sequence_delimiter.finditer(found.group(1)):
        text, delimiter = token.groups()
        sequence += text
        if delimiter == '"':
            quoted = not quoted
        elif delimiter == "." and quoted:
            # A dot inside quotes belongs to the identifier itself
            sequence += "."
        elif delimiter == ".":
            # An unquoted dot separates the schema from the sequence name
            schema, sequence = sequence, ""

    return schema, sequence
10✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc