Coveralls logob
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

matrix-org / synapse / 4532

23 Sep 2019 - 19:39 coverage decreased (-49.7%) to 17.596%
4532

Pull #6079

buildkite

Richard van der Hoff
update changelog
Pull Request #6079: Add submit_url response parameter to msisdn /requestToken

359 of 12986 branches covered (2.76%)

Branch coverage included in aggregate %.

0 of 7 new or added lines in 1 file covered. (0.0%)

18869 existing lines in 281 files now uncovered.

8809 of 39116 relevant lines covered (22.52%)

0.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

8.61
/synapse/storage/prepare_database.py
1
# -*- coding: utf-8 -*-
2
# Copyright 2014 - 2016 OpenMarket Ltd
3
# Copyright 2018 New Vector Ltd
4
#
5
# Licensed under the Apache License, Version 2.0 (the "License");
6
# you may not use this file except in compliance with the License.
7
# You may obtain a copy of the License at
8
#
9
#     http://www.apache.org/licenses/LICENSE-2.0
10
#
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS,
13
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
# See the License for the specific language governing permissions and
15
# limitations under the License.
16

17
import fnmatch
1×
18
import imp
1×
19
import logging
1×
20
import os
1×
21
import re
1×
22

23
from synapse.storage.engines.postgres import PostgresEngine
1×
24

25
logger = logging.getLogger(__name__)
1×
26

27

28
# Remember to update this number every time a change is made to database
29
# schema files, so the users will be informed on server restarts.
30
SCHEMA_VERSION = 56
1×
31

32
dir_path = os.path.abspath(os.path.dirname(__file__))
1×
33

34

35
class PrepareDatabaseException(Exception):
    """Raised when the database schema cannot be prepared for use."""
37

38

39
class UpgradeDatabaseException(PrepareDatabaseException):
    """Raised when an existing database cannot be upgraded to the current schema."""
41

42

43
def prepare_database(db_conn, database_engine, config):
    """Get the database behind `db_conn` ready for use.

    An empty database has the full schema created from scratch. A database
    that already records a schema version is upgraded if `config` is given.
    With `config` set to None no upgrade is attempted: the recorded version
    must already equal `SCHEMA_VERSION`.

    Everything runs on a single cursor. The connection is committed on
    success and rolled back (with the exception re-raised) on any failure.

    Args:
        db_conn: the database connection to prepare
        database_engine: the engine object matching `db_conn`
        config (synapse.config.homeserver.HomeServerConfig|None):
            application config, or None if we are connecting to an existing
            database which we expect to be configured already

    Raises:
        UpgradeDatabaseException: if `config` is None and the existing
            database is not at `SCHEMA_VERSION`.
    """
    try:
        cur = db_conn.cursor()
        schema_state = _get_or_create_schema_state(cur, database_engine)

        if not schema_state:
            # No version recorded yet: build the schema from scratch.
            _setup_new_database(cur, database_engine)
        else:
            user_version, delta_files, upgraded = schema_state

            if config is not None:
                _upgrade_existing_database(
                    cur, user_version, delta_files, upgraded, database_engine, config
                )
            elif user_version != SCHEMA_VERSION:
                # Without a config we expect the DB to have been upgraded
                # already, so a version mismatch is fatal.
                raise UpgradeDatabaseException("Database needs to be upgraded")

        # Dynamic modules may ship their own schema files; these can only be
        # discovered through the config.
        if config is not None:
            _apply_module_schemas(cur, database_engine, config)

        cur.close()
        db_conn.commit()
    except Exception:
        db_conn.rollback()
        raise
85

86

87
def _setup_new_database(cur, database_engine):
    """Create the schema in an empty database.

    A base "full schema" is chosen and executed, then every delta newer than
    that base is applied.

    The "full_schemas" directory contains one subdirectory per version. The
    subdirectory with the highest version that is less than or equal to
    `SCHEMA_VERSION` is selected, and all of its `.sql` files (plus the
    `.sql.postgres` or `.sql.sqlite` files matching the engine) are executed
    in sorted order.

    Example directory structure:

        schema/
            delta/
                ...
            full_schemas/
                3/
                    test.sql
                    ...
                11/
                    foo.sql
                    bar.sql
                ...

    In the example foo.sql and bar.sql would be run, and then any delta files
    for versions strictly greater than 11.

    Args:
        cur: database cursor
        database_engine: the database engine in use; decides which
            engine-specific schema files are picked up

    Raises:
        PrepareDatabaseException: if no usable full schema directory exists.
    """
    full_schemas_dir = os.path.join(dir_path, "schema", "full_schemas")
    entries = os.listdir(full_schemas_dir)

    specific = "postgres" if isinstance(database_engine, PostgresEngine) else "sqlite"

    pattern = re.compile(r"^\d+(\.sql)?$")
    specific_pattern = re.compile(r"^\d+(\.sql." + specific + r")?$")

    # Gather (version, path) for every version directory we could start from.
    candidates = []
    for entry in entries:
        match = pattern.match(entry) or specific_pattern.match(entry)
        entry_path = os.path.join(full_schemas_dir, entry)
        if not (match and os.path.isdir(entry_path)):
            logger.debug("Ignoring entry '%s' in 'full_schemas'", entry)
            continue

        ver = int(match.group(0))
        if ver <= SCHEMA_VERSION:
            candidates.append((ver, entry_path))

    if not candidates:
        raise PrepareDatabaseException(
            "Could not find a suitable base set of full schemas"
        )

    max_current_ver, sql_dir = max(candidates, key=lambda candidate: candidate[0])

    logger.debug("Initialising schema v%d", max_current_ver)

    schema_entries = os.listdir(sql_dir)
    generic_files = fnmatch.filter(schema_entries, "*.sql")
    engine_files = fnmatch.filter(schema_entries, "*.sql." + specific)

    for schema_file in sorted(generic_files + engine_files):
        sql_loc = os.path.join(sql_dir, schema_file)
        logger.debug("Applying schema %s", sql_loc)
        executescript(cur, sql_loc)

    # Record the base version; `upgraded` is False because it came from a
    # full schema rather than from applying deltas.
    insert_sql = database_engine.convert_param_style(
        "INSERT INTO schema_version (version, upgraded) VALUES (?,?)"
    )
    cur.execute(insert_sql, (max_current_ver, False))

    # Now bring the fresh schema up to SCHEMA_VERSION via the deltas.
    _upgrade_existing_database(
        cur,
        current_version=max_current_ver,
        applied_delta_files=[],
        upgraded=False,
        database_engine=database_engine,
        config=None,
        is_empty=True,
    )
173

174

175
def _upgrade_existing_database(
    cur,
    current_version,
    applied_delta_files,
    upgraded,
    database_engine,
    config,
    is_empty=False,
):
    """Upgrades an existing database.

    Delta files can either be SQL stored in *.sql files, or python modules
    in *.py.

    There can be multiple delta files per version. Synapse will keep track of
    which delta files have been applied, and will apply any that haven't been
    even if there has been no version bump. This is useful for development
    where orthogonal schema changes may happen on separate branches.

    Different delta files for the same version *must* be orthogonal and give
    the same result when applied in any order. No guarantees are made on the
    order of execution of these scripts.

    This is a no-op if current_version == SCHEMA_VERSION and every delta file
    for that version has already been applied.

    Example directory structure:

        schema/
            delta/
                11/
                    foo.sql
                    ...
                12/
                    foo.sql
                    bar.py
                ...
            full_schemas/
                ...

    In the example, if current_version is 11, then foo.sql will be run if and
    only if `upgraded` is True. Then `foo.sql` and `bar.py` would be run in
    some arbitrary order.

    Args:
        cur (Cursor)
        current_version (int): The current version of the schema.
        applied_delta_files (list): A list of deltas that have already been
            applied.
        upgraded (bool): Whether the current version was generated by having
            applied deltas or from full schema file. If `False` the function
            will never apply delta files for the given `current_version`, since
            the current_version wasn't generated by applying those delta files.
        database_engine: The database engine in use. Decides which
            engine-specific delta files (`*.sql.postgres` or `*.sql.sqlite`)
            are applied, and converts the parameter style of the bookkeeping
            queries.
        config: Passed as `config` to the `run_upgrade` function of python
            delta modules.
        is_empty (bool): If True, python delta modules only have `run_create`
            called and `run_upgrade` is skipped.

    Raises:
        ValueError: if `current_version` is newer than `SCHEMA_VERSION`.
        UpgradeDatabaseException: if the delta directory for a version cannot
            be listed.
    """
    # Function-scope import: the `imp` module previously used to load python
    # deltas is deprecated and was removed in Python 3.12.
    import importlib.util

    if current_version > SCHEMA_VERSION:
        raise ValueError(
            "Cannot use this database as it is too "
            + "new for the server to understand"
        )

    # If the current version came from a full schema, its own deltas are
    # already baked in, so start from the next version.
    start_ver = current_version
    if not upgraded:
        start_ver += 1

    logger.debug("applied_delta_files: %s", applied_delta_files)

    if isinstance(database_engine, PostgresEngine):
        specific_engine_extension = ".postgres"
    else:
        specific_engine_extension = ".sqlite"

    specific_engine_extensions = (".sqlite", ".postgres")

    for v in range(start_ver, SCHEMA_VERSION + 1):
        logger.info("Upgrading schema to v%d", v)

        delta_dir = os.path.join(dir_path, "schema", "delta", str(v))

        try:
            directory_entries = os.listdir(delta_dir)
        except OSError:
            logger.exception("Could not open delta dir for version %d", v)
            raise UpgradeDatabaseException(
                "Could not open delta dir for version %d" % (v,)
            )

        directory_entries.sort()
        for file_name in directory_entries:
            relative_path = os.path.join(str(v), file_name)
            logger.debug("Found file: %s", relative_path)
            if relative_path in applied_delta_files:
                continue

            absolute_path = os.path.join(dir_path, "schema", "delta", relative_path)
            root_name, ext = os.path.splitext(file_name)
            if ext == ".py":
                # This is a python upgrade module. We need to import it under
                # some package name and then execute its `run_create` and
                # (for non-empty databases) `run_upgrade` functions.
                module_name = "synapse.storage.v%d_%s" % (v, root_name)
                spec = importlib.util.spec_from_file_location(
                    module_name, absolute_path
                )
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
                logger.info("Running script %s", relative_path)
                module.run_create(cur, database_engine)
                if not is_empty:
                    module.run_upgrade(cur, database_engine, config=config)
            elif ext == ".pyc" or file_name == "__pycache__":
                # Sometimes .pyc files turn up anyway even though we've
                # disabled their generation; e.g. from distribution package
                # installers. Silently skip it
                continue
            elif ext == ".sql":
                # A plain old .sql file, just read and execute it
                logger.info("Applying schema %s", relative_path)
                executescript(cur, absolute_path)
            elif ext == specific_engine_extension and root_name.endswith(".sql"):
                # A .sql file specific to our engine; just read and execute it
                logger.info("Applying engine-specific schema %s", relative_path)
                executescript(cur, absolute_path)
            elif ext in specific_engine_extensions and root_name.endswith(".sql"):
                # A .sql file for a different engine; skip it.
                continue
            else:
                # Not a valid delta file.
                logger.warning(
                    "Found directory entry that did not end in .py or .sql: %s",
                    relative_path,
                )
                continue

            # Mark as done.
            cur.execute(
                database_engine.convert_param_style(
                    "INSERT INTO applied_schema_deltas (version, file) VALUES (?,?)"
                ),
                (v, relative_path),
            )

            # Record the version reached; `upgraded` is True because it was
            # reached by applying a delta.
            cur.execute("DELETE FROM schema_version")
            cur.execute(
                database_engine.convert_param_style(
                    "INSERT INTO schema_version (version, upgraded) VALUES (?,?)"
                ),
                (v, True),
            )
319

320

321
def _apply_module_schemas(txn, database_engine, config):
    """Apply the schema files of every dynamic module that provides them.

    Iterates over `config.password_providers` and, for each module exposing
    `get_db_schema_files`, applies the files it returns.

    Args:
        txn: database cursor
        database_engine: synapse database engine class
        config (synapse.config.homeserver.HomeServerConfig):
            application config
    """
    for provider, _provider_config in config.password_providers:
        if hasattr(provider, "get_db_schema_files"):
            # Applied files are recorded under the fully qualified name.
            qualified_name = ".".join((provider.__module__, provider.__name__))
            schema_files = provider.get_db_schema_files()
            _apply_module_schema_files(
                txn, database_engine, qualified_name, schema_files
            )
337

338

339
def _apply_module_schema_files(cur, database_engine, modname, names_and_streams):
    """Apply the not-yet-applied schema files of a single module.

    Files already listed in `applied_module_schemas` for `modname` are
    skipped. Each remaining file is executed statement by statement and then
    recorded in that table.

    Args:
        cur: database cursor
        database_engine: synapse database engine class
        modname (str): fully qualified name of the module
        names_and_streams (Iterable[(str, file)]): the names and streams of
            schemas to be applied

    Raises:
        PrepareDatabaseException: if a file to be applied does not have a
            `.sql` extension.
    """
    select_sql = database_engine.convert_param_style(
        "SELECT file FROM applied_module_schemas WHERE module_name = ?"
    )
    cur.execute(select_sql, (modname,))
    already_applied = {applied for (applied,) in cur}

    for name, stream in names_and_streams:
        if name in already_applied:
            continue

        extension = os.path.splitext(name)[1]
        if extension != ".sql":
            raise PrepareDatabaseException(
                "only .sql files are currently supported for module schemas"
            )

        logger.info("applying schema %s for %s", name, modname)
        for sql in get_statements(stream):
            cur.execute(sql)

        # Mark as done.
        insert_sql = database_engine.convert_param_style(
            "INSERT INTO applied_module_schemas (module_name, file) VALUES (?,?)"
        )
        cur.execute(insert_sql, (modname, name))
377

378

379
def get_statements(f):
    """Split an iterable of SQL text lines into individual statements.

    Comments are removed: `/* ... */` block comments (inline or spanning
    several lines) and `--` / `//` line comments. Statements are delimited
    by `;` and may span several lines; the pieces are joined with a single
    space. Comment markers and semicolons inside string literals are not
    treated specially.

    Text after the final semicolon is never yielded.

    Args:
        f (Iterable[str]): the lines to parse, e.g. an open text file.

    Yields:
        str: each statement, stripped and without its trailing semicolon.
    """
    statement_buffer = ""
    in_comment = False  # If we're in a /* ... */ style comment

    for line in f:
        line = line.strip()

        if in_comment:
            # Check if this line contains an end to the comment
            comments = line.split("*/", 1)
            if len(comments) == 1:
                continue
            line = comments[1]
            in_comment = False

        # Remove inline block comments. The match must be non-greedy: with
        # two comments on one line a greedy match would also delete the SQL
        # between them.
        line = re.sub(r"/\*.*?\*/", " ", line)

        # Does this line start a comment?
        comments = line.split("/*", 1)
        if len(comments) > 1:
            line = comments[0]
            in_comment = True

        # Deal with line comments
        line = line.split("--", 1)[0]
        line = line.split("//", 1)[0]

        # Find *all* semicolons. We need to treat first and last entry
        # specially.
        statements = line.split(";")

        # We must prepend statement_buffer to the first statement
        first_statement = "%s %s" % (statement_buffer.strip(), statements[0].strip())
        statements[0] = first_statement

        # Every entry, except the last, is a full statement
        for statement in statements[:-1]:
            yield statement.strip()

        # The last entry did *not* end in a semicolon, so we store it for the
        # next semicolon we find
        statement_buffer = statements[-1].strip()
422

423

424
def executescript(txn, schema_path):
    """Execute every SQL statement in the file at `schema_path` on `txn`.

    Args:
        txn: database cursor
        schema_path (str): path of the SQL file to run
    """
    with open(schema_path, "r") as schema_file:
        for sql in get_statements(schema_file):
            txn.execute(sql)
428

429

430
def _get_or_create_schema_state(txn, database_engine):
    """Read the recorded schema state, creating the bookkeeping tables first.

    Args:
        txn: database cursor
        database_engine: the database engine in use

    Returns:
        tuple[int, list, bool]|None: `(current_version, applied_deltas,
        upgraded)`, where `applied_deltas` lists the delta files recorded for
        versions >= `current_version`. None if no (non-zero) version is
        recorded.
    """
    # Bluntly try creating the schema_version tables.
    executescript(txn, os.path.join(dir_path, "schema", "schema_version.sql"))

    txn.execute("SELECT version, upgraded FROM schema_version")
    row = txn.fetchone()
    if not row:
        return None

    current_version = int(row[0])
    upgraded = bool(row[1])
    if not current_version:
        return None

    deltas_sql = database_engine.convert_param_style(
        "SELECT file FROM applied_schema_deltas WHERE version >= ?"
    )
    txn.execute(deltas_sql, (current_version,))
    applied_deltas = [delta for (delta,) in txn]
    return current_version, applied_deltas, upgraded
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2019 Coveralls, LLC