• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

gocept / gocept.testdb / 6761143091

10 Oct 2023 06:25AM UTC coverage: 95.04%. Remained the same
6761143091

push

github

web-flow
[pre-commit.ci] pre-commit autoupdate (#42)

updates:
- [github.com/pre-commit/pre-commit-hooks: v4.4.0 → v4.5.0](https://github.com/pre-commit/pre-commit-hooks/compare/v4.4.0...v4.5.0)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

95 of 111 branches covered (0.0%)

Branch coverage included in aggregate %.

499 of 514 relevant lines covered (97.08%)

0.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.27
/src/gocept/testdb/postgres.py
1
from .base import Database
1✔
2
import os
1✔
3
import sqlalchemy
1✔
4
import subprocess
1✔
5

6

7
class PostgreSQL(Database):
1✔
8

9
    protocol = 'postgresql'
1✔
10
    environ_prefix = 'POSTGRES'
1✔
11

12
    def __init__(self, encoding=None, db_template=None,
                 force_template=False, lc_collate=None,
                 *args, **kw):
        """Remember the PostgreSQL-specific options.

        All remaining positional and keyword arguments are handed to the
        base class constructor unchanged.

        :param encoding: when truthy, passed to ``createdb`` as ``-E``.
        :param db_template: name of the template database; when truthy,
            ``create()`` builds new databases from it.
        :param force_template: when true, ``create_template()`` drops and
            rebuilds an already existing template database.
        :param lc_collate: when not ``None``, passed to ``createdb`` as
            ``--lc-collate``.
        """
        super().__init__(*args, **kw)
        # Plain attribute storage; the order of these assignments is
        # irrelevant.
        self.force_template = force_template
        self.lc_collate = lc_collate
        self.db_template = db_template
        self.encoding = encoding
1✔
20

21
    def login_args(self, command, extra_args=()):
        """Return the argument list for running a PostgreSQL client tool.

        The list starts with ``command`` and ``-h <db_host>``; ``-p`` and
        ``-U`` are added only when ``db_port`` / ``db_user`` are truthy.
        ``extra_args`` are appended at the end.

        :param command: name of the executable, e.g. ``'psql'``.
        :param extra_args: iterable of further command line arguments.
        :returns: a new list of arguments.
        """
        command_line = [command, '-h', self.db_host]
        optional_flags = (
            ('-p', self.db_port),
            ('-U', self.db_user),
        )
        for flag, value in optional_flags:
            if value:
                command_line += [flag, value]
        # ``+=`` on a list accepts any iterable, exactly like ``extend``.
        command_line += extra_args
        return command_line
1✔
31

32
    def create(self):
        """Create the database ``self.db_name``.

        Without a template database the schema is loaded directly via
        ``create_db_from_schema``. With a template, the template is
        (re)built first and the database is then created from it.

        :raises SystemExit: re-raised from ``create_template()`` after a
            best-effort attempt to drop the template, or raised when the
            database cannot be created from the template.
        """
        if not self.db_template:
            self.create_db_from_schema(self.db_name)
            return

        try:
            self.create_template()
        except SystemExit as exit_request:
            # Do not leave a half-built template behind; a failure of the
            # clean-up itself must not hide the original exit request.
            try:
                self.drop_db(self.db_template)
            except BaseException:  # pragma: no cover
                pass
            raise exit_request

        try:
            self.create_db(self.db_name, db_template=self.db_template)
        except AssertionError:  # pragma: no cover
            message = (
                "Could not create database %r from template %r" %
                (self.db_name, self.db_template))
            raise SystemExit(message)
1✔
52

53
    def create_template(self):
        """Make sure the template database matches the schema file.

        The modification time of ``self.schema_path`` (``0`` when there is
        no schema path) is compared with the value stored in an existing
        template database. An existing template is kept only when the
        values are equal and ``force_template`` is false; otherwise it is
        dropped and rebuilt, and the new modification time is stored.
        """
        schema_mtime = (
            0 if self.schema_path is None
            else int(os.path.getmtime(self.schema_path)))

        template = self.db_template
        if template in self.list_db_names():
            # The stored value is read before looking at force_template,
            # so a broken template still raises here.
            stored_mtime = self._get_db_mtime(template)
            if not (self.force_template or schema_mtime != stored_mtime):
                return
            self.drop_db(template)

        self.create_db_from_schema(template)
        self._set_db_mtime(template, schema_mtime)
1✔
68

69
    def create_db(self, db_name, db_template=None, lc_collate=None):
        """Create the database ``db_name`` by running ``createdb``.

        :param db_name: name of the database to create.
        :param db_template: optional template database, passed as ``-T``.
        :param lc_collate: optional collation, passed as ``--lc-collate``;
            falls back to ``self.lc_collate`` when omitted.
        :raises AssertionError: if ``createdb`` exits with a non-zero
            status; the message is the command line that was run.
        """
        if lc_collate is None:
            lc_collate = self.lc_collate

        create_args = []
        if db_template is not None:
            # Use the argument, not self.db_template, so callers can name
            # a template explicitly.
            create_args.extend(['-T', db_template])
        if lc_collate is not None:
            create_args.extend(['--lc-collate', lc_collate])
            # NOTE(review): together with db_template this passes two -T
            # options to createdb -- confirm which one is meant to win.
            create_args.extend(['-T', 'template0'])
        if self.encoding:
            create_args.extend(['-E', self.encoding])
        args = self.login_args('createdb', create_args + [db_name])

        # Run the command outside of an ``assert`` statement: asserts are
        # removed under ``python -O`` and the database would never be
        # created. AssertionError is kept because callers catch it.
        if subprocess.call(args) != 0:
            raise AssertionError(" ".join(map(str, args)))
1✔
80

81
    def create_schema(self, db_name):
        """Load the schema file ``self.schema_path`` into ``db_name``.

        ``psql`` is run with ``ON_ERROR_STOP=true`` and ``--quiet``.

        :param db_name: name of the database to load the schema into.
        :raises AssertionError: if ``psql`` exits with a non-zero status;
            the message is the command line that was run.
        """
        args = self.login_args(
            'psql', ['-f', self.schema_path,
                     '-v', 'ON_ERROR_STOP=true', '--quiet',
                     db_name])
        # Run the command outside of an ``assert`` statement: asserts are
        # removed under ``python -O`` and the schema would never be loaded.
        if subprocess.call(args) != 0:
            raise AssertionError(" ".join(map(str, args)))
87

88
    def pg_list_db_items(self):
        """Return the rows printed by ``psql -l -A`` as lists of columns.

        The first two and the last output lines are skipped; of the
        remaining lines only those containing ``|`` are kept, each split
        at ``|``.

        :returns: a list of lists of strings, one inner list per row.
        """
        # Unaligned output (-A) separates the columns with a plain "|".
        listing = subprocess.Popen(
            self.login_args('psql', ['-l', '-A']),
            stdout=subprocess.PIPE)
        stdout_data = listing.communicate()[0]
        decoded_rows = (
            raw_row.decode('us-ascii')
            for raw_row in stdout_data.splitlines()[2:-1])
        return [row.split('|') for row in decoded_rows if '|' in row]
1✔
98

99
    def list_db_names(self):
        """Return the first column of every row of ``pg_list_db_items()``."""
        names = []
        for columns in self.pg_list_db_items():
            names.append(columns[0])
        return names
1✔
101

102
    def _get_db_mtime(self, database):
        """Return the schema modification time stored in ``database``.

        Reads ``schema_mtime`` from the ``tmp_functest`` table.

        :param database: name of the database to query.
        :returns: the first column of the first row, or ``0`` when the
            table is empty or the row is falsy.
        """
        engine = sqlalchemy.create_engine(self.get_dsn(database))
        conn = engine.connect()
        try:
            cursor = conn.execute(sqlalchemy.text(
                'SELECT schema_mtime FROM tmp_functest;'
            )).cursor
            # A default keeps an empty table from raising StopIteration,
            # so the ``0`` fallback below is actually reachable.
            row = next(cursor, None)
        finally:
            # Always release the connection, even when the query fails, so
            # it cannot linger on the database afterwards.
            conn.invalidate()
            conn.close()
            engine.dispose()
        return row[0] if row else 0
1✔
115

116
    def _set_db_mtime(self, database, mtime):
        """Store ``mtime`` in the ``tmp_functest`` table of ``database``.

        :param database: name of the database to write to.
        :param mtime: value inserted into the ``schema_mtime`` column.
        """
        engine = sqlalchemy.create_engine(self.get_dsn(database))
        try:
            with engine.begin() as conn:
                conn.execute(
                    sqlalchemy.text(
                        'INSERT INTO tmp_functest (schema_mtime) VALUES (:mtime);'
                    ),
                    {"mtime": mtime}
                )
        finally:
            # Dispose of the engine even when the INSERT fails, so no
            # pooled connection to the database is left behind.
            engine.dispose()
1✔
127

128
    def drop_all(self, drop_template=False):
        """Drop every listed database that matches the naming scheme.

        :param drop_template: when true, the database named
            ``self.db_template`` is dropped as well.
        """
        for candidate in self.list_db_names():
            is_requested_template = (
                candidate == self.db_template and drop_template)
            if (is_requested_template or
                    self._matches_db_naming_scheme(candidate)):
                self.drop_db(candidate)
1✔
133

134
    def drop_db(self, db_name):
        """Drop the database ``db_name`` by running ``dropdb``.

        If the first attempt fails, three SQL statements are sent to the
        ``postgres`` database via ``psql`` (set ``datallowconn`` to false,
        set the connection limit to 1, call ``pg_terminate_backend`` for
        the sessions on ``db_name``) and ``dropdb`` is tried once more.

        :param db_name: name of the database to drop.
        :raises AssertionError: if the second ``dropdb`` attempt also
            exits with a non-zero status.
        """
        # Run the commands outside of ``assert`` statements: asserts are
        # removed under ``python -O`` and nothing would be dropped.
        if subprocess.call(self.login_args('dropdb', [db_name])) == 0:
            return

        # NOTE(review): db_name is spliced into the SQL text unescaped --
        # confirm that callers only pass trusted names.
        for command in [
            ("UPDATE pg_database SET datallowconn = false "
             "WHERE datname = '" + db_name + "'"),
            'ALTER DATABASE "' + db_name + '" CONNECTION LIMIT 1',
            ("SELECT pg_terminate_backend(pid) FROM pg_stat_activity "
             "WHERE datname = '" + db_name + "'"),
        ]:
            # Best effort: the exit status of these commands is ignored.
            subprocess.call(self.login_args('psql', [
                '--dbname=postgres', '-c', command]))

        retry_args = self.login_args('dropdb', [db_name])
        if subprocess.call(retry_args) != 0:
            raise AssertionError(" ".join(map(str, retry_args)))
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc