• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla-releng / balrog / #5457

05 May 2026 03:06PM UTC coverage: 89.879% (-0.06%) from 89.936%
#5457

Pull #3776

circleci

bhearsum
update deployment docs for argocd stage deployments
Pull Request #3776: chore: update deployment docs to reflect new mozcloud stage environment

2189 of 2574 branches covered (85.04%)

Branch coverage included in aggregate %.

5759 of 6269 relevant lines covered (91.86%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.02
/src/auslib/db.py
1
import itertools
1✔
2
import json
1✔
3
import logging
1✔
4
import re
1✔
5
import time
1✔
6
from collections import defaultdict
1✔
7
from copy import copy
1✔
8
from os import path
1✔
9

10
import migrate.versioning.api
1✔
11
import migrate.versioning.schema
1✔
12
import sqlalchemy.event
1✔
13
import sqlalchemy.types
1✔
14
from aiohttp import ClientSession
1✔
15
from sqlalchemy import JSON, BigInteger, Boolean, Column, Integer, MetaData, String, Table, Text, create_engine, func, join, select
1✔
16
from sqlalchemy.exc import SQLAlchemyError
1✔
17
from sqlalchemy.sql.expression import null
1✔
18
from sqlalchemy.sql.functions import max as sql_max
1✔
19

20
from auslib.blobs.base import createBlob, merge_dicts
1✔
21
from auslib.errors import PermissionDeniedError, ReadOnlyError, SignoffRequiredError
1✔
22
from auslib.global_state import cache
1✔
23
from auslib.util.rulematching import (
1✔
24
    matchBoolean,
25
    matchBuildID,
26
    matchChannel,
27
    matchCsv,
28
    matchLocale,
29
    matchMemory,
30
    matchRegex,
31
    matchSimpleExpression,
32
    matchVersion,
33
)
34
from auslib.util.signoffs import get_required_signoffs_for_product_channel
1✔
35
from auslib.util.statsd import statsd
1✔
36
from auslib.util.timestamp import getMillisecondTimestamp
1✔
37
from auslib.util.versions import get_version_class
1✔
38

39

40
def rows_to_dicts(rows):
    """Convert SQLAlchemy result rows to plain dicts.

    Useful when rows need to be mutated (SQLAlchemy rows are immutable) or
    serialized to JSON (SQLAlchemy rows get confused if you try to
    serialize them).
    """
    return list(map(dict, rows))
49

50

51
class AlreadySetupError(Exception):
    """Raised when a new database connection is requested while a previous
    one is still active."""

    _MESSAGE = "Can't connect to new database, still connected to previous one"

    def __str__(self):
        return self._MESSAGE
54

55

56
class TransactionError(SQLAlchemyError):
    """Raised when a transaction fails for any reason.

    AUSTransaction catches the underlying error and re-raises it as this
    type (with the original as __cause__), giving callers a single
    exception class to catch.
    """
58

59

60
class OutdatedDataError(SQLAlchemyError):
    """Raised when an update or delete fails because of outdated data.

    Concretely: the caller-supplied old_data_version no longer matched the
    row's current data_version, so the guarded statement affected zero rows.
    """
62

63

64
class MismatchedDataVersionError(SQLAlchemyError):
    """Raised when the data version of a scheduled change and its associated conditions
    row do not match after an insert or update.

    NOTE(review): the raising sites live in scheduled-changes code outside
    this chunk; the two rows are expected to share one data_version.
    """
67

68

69
class WrongNumberOfRowsError(SQLAlchemyError):
    """Raised when an update or delete fails because the clause matches more than one row.

    Also raised when the clause matches no rows at all (see
    AUSTable._returnRowOrRaise) -- the invariant is "exactly one row".
    """
71

72

73
class UpdateMergeError(SQLAlchemyError):
    """Raised when an update cannot be merged cleanly.

    NOTE(review): the raising sites are outside this chunk (presumably when
    an update cannot be reconciled with a pending Scheduled Change -- verify
    against the callers). Docstring added for consistency with the other
    exception classes here, replacing a bare ``pass``.
    """
75

76

77
class ChangeScheduledError(SQLAlchemyError):
    """Raised when a Scheduled Change cannot be created, modified, or deleted
    for data consistency reasons.

    For example, deleting a row that still has an active scheduled change is
    refused with this error (see AUSTable._sharedPrepareDelete).
    """
80

81

82
class JSONColumn(sqlalchemy.types.TypeDecorator):
    """Column type for values that are deserialized JSON (usually dicts) in
    memory but must be stored as text.

    Values are serialized with json.dumps just before storage and
    deserialized with json.loads just after retrieval. Falsy values
    (e.g. None) pass through untouched in both directions.
    """

    impl = Text
    cache_ok = True

    def process_bind_param(self, value, dialect):
        return json.dumps(value) if value else value

    def process_result_value(self, value, dialect):
        return json.loads(value) if value else value
100

101

102
class CompatibleBooleanColumn(sqlalchemy.types.TypeDecorator):
    """A Boolean column that is compatible with all of our supported
    database engines (mysql, sqlite).

    SQLAlchemy's built-in Boolean is unusable because it creates a CHECK
    constraint that makes it impossible to downgrade a database with
    sqlalchemy-migrate, so booleans are stored as 0/1 integers instead.
    """

    impl = Integer
    cache_ok = True

    def process_bind_param(self, value, dialect):
        if value is None:
            return None
        if not isinstance(value, bool):
            raise TypeError("{} is invalid type ({}), must be bool".format(value, type(value)))
        return 1 if value else 0

    def process_result_value(self, value, dialect):
        # The column may be nullable: NULLs must be preserved in case
        # consumers treat them differently than False.
        return None if value is None else bool(value)
128

129

130
def BlobColumn(impl=Text):
    """Build a column type that stores Release Blobs (ultimately dicts).

    Blobs are serialized (via their getJSON method) before storage and
    recreated with createBlob on retrieval. Some database engines (eg:
    mysql) may need an underlying type other than Text; pass it as ``impl``.
    """

    class cls(sqlalchemy.types.TypeDecorator):
        cache_ok = True

        def process_bind_param(self, value, dialect):
            return value.getJSON() if value else value

        def process_result_value(self, value, dialect):
            return createBlob(value) if value else value

    cls.impl = impl
    return cls
152

153

154
def verify_signoffs(potential_required_signoffs, signoffs):
    """Check that enough signoffs have been made, given:
    * A list of potential required signoffs
    * A list of signoffs that have been made

    The real requirement per role is the highest "signoffs_required" found
    among the potential required signoffs for that role. If any role falls
    short of its requirement, a SignoffRequiredError is raised. Returns None
    when nothing is required, or when every role is satisfied.
    """
    if not potential_required_signoffs:
        return
    if not signoffs:
        raise SignoffRequiredError("No Signoffs given")

    given = defaultdict(int)
    for signoff in signoffs:
        given[signoff["role"]] += 1

    needed = {}
    for rs in potential_required_signoffs:
        role = rs["role"]
        needed[role] = max(needed.get(role, 0), rs["signoffs_required"])

    for role, count in needed.items():
        if given[role] < count:
            raise SignoffRequiredError("Not enough signoffs for role '{}'".format(role))
177

178

179
class AUSTransaction(object):
    """Lazily-opened wrapper around a single database transaction.

    :param engine: engine to obtain a connection from
    :type engine: sqlalchemy.engine.base.Engine

    Nothing is opened until the first execute(); at that point a connection
    is checked out of the engine and a transaction begun on it. Usable as a
    context manager: on clean exit the transaction is committed, on an
    exception it is rolled back, and the connection is always closed.
    """

    def __init__(self, engine):
        self.engine = engine
        self.conn = None
        self.trans = None
        self.log = logging.getLogger(self.__class__.__name__)

    def _ensure_connection(self):
        # Lazily check out a connection and open the transaction on it.
        if self.conn is not None:
            return
        self.conn = self.engine.connect()
        self.trans = self.conn.begin()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        if self.conn is None:
            # Nothing was ever executed: nothing to commit, roll back, or close.
            return
        try:
            if exc_type:
                # The body raised: roll back and let the exception propagate.
                self.log.debug("exc is:", exc_info=True)
                self.rollback()
                return False
            # commit() issues its own rollback if it raises.
            self.commit()
        finally:
            # Always make sure the connection is closed, bug 740360
            self.close()

    def close(self):
        # For some reason, sometimes the connection appears to close itself...
        conn = self.conn
        if conn is not None and not conn.closed:
            conn.close()

    def execute(self, statement):
        self._ensure_connection()
        try:
            self.log.debug("Attempting to execute %s" % statement)
            return self.conn.execute(statement)
        except Exception as exc:
            self.log.debug("Caught exception")
            # Re-raise as our own TransactionError so errors are easily caught
            # by consumers; chaining with "from" keeps the original (far more
            # informative) traceback intact.
            self.rollback()
            raise TransactionError() from exc

    def commit(self):
        if self.trans is None:
            return
        try:
            self.trans.commit()
        except Exception as exc:
            self.rollback()
            raise TransactionError() from exc

    def rollback(self):
        if self.trans is None:
            return
        self.trans.rollback()
251

252

253
class AUSTable(object):
1✔
254
    """Base class for all AUS Tables. By default, all tables have a history
255
    table created for them, too, which mirrors their own structure and adds
256
    a record of who made a change, and when the change happened.
257

258
    :param history: Whether or not to create a history table for this table.
259
                    When True, a History object will be created for this
260
                    table, and all changes will be logged to it. Defaults
261
                    to True.
262
    :type history: bool
263
    :param versioned: Whether or not this table is versioned. When True,
264
                      an additional 'data_version' column will be added
265
                      to the Table, and its version increased with every
266
                      update. This is useful for detecting colliding
267
                      updates.
268

269
    :type versioned: bool
270
    :param scheduled_changes: Whether or not this table should allow changes
271
                              to be scheduled. When True, two additional tables
272
                              will be created: a $name_scheduled_changes, which
273
                              will contain data needed to schedule changes to
274
                              $name, and $name_scheduled_changes_history, which
275
                              tracks the history of a scheduled change.
276

277
    :type scheduled_changes: bool
278
    """
279

280
    def __init__(
        self,
        db,
        dialect,
        historyClass=None,
        historyKwargs=None,
        versioned=True,
        scheduled_changes=False,
        scheduled_changes_kwargs=None,
    ):
        """Wire up the table: the optional data_version column, attribute
        mirrors for the columns, and the optional history / scheduled-changes
        companion tables.

        :param db: the database object this table belongs to
        :param dialect: database dialect, passed through to companion tables
        :param historyClass: class used to build the history companion table;
                             when None, no history is recorded
        :param historyKwargs: extra keyword arguments for historyClass
        :param versioned: when True, append a non-nullable data_version
                          column used to detect colliding updates
        :type versioned: bool
        :param scheduled_changes: when True, create a ScheduledChangeTable
                                  companion for this table
        :type scheduled_changes: bool
        :param scheduled_changes_kwargs: extra keyword arguments for
                                         ScheduledChangeTable

        NOTE(review): self.table must already be set by the subclass before
        this runs -- it is read but never assigned here.
        """
        # None instead of mutable {} defaults: a shared default dict would be
        # common to every instance (classic mutable-default pitfall).
        if historyKwargs is None:
            historyKwargs = {}
        if scheduled_changes_kwargs is None:
            scheduled_changes_kwargs = {}
        self.db = db
        self.t = self.table
        # Enable versioning, if required
        if versioned:
            self.t.append_column(Column("data_version", Integer, nullable=False))
        self.versioned = versioned
        # Mirror the columns as attributes for easy access
        self.primary_key = []
        for col in self.table.columns:
            setattr(self, col.name, col)
            if col.primary_key:
                self.primary_key.append(col)
        # Set-up a history table to do logging in, if required
        if historyClass:
            self.history = historyClass(db, dialect, self.t.metadata, self, **historyKwargs)
        else:
            self.history = None
        # Set-up a scheduled changes table if required
        if scheduled_changes:
            self.scheduled_changes = ScheduledChangeTable(db, dialect, self.t.metadata, self, **scheduled_changes_kwargs)
        else:
            self.scheduled_changes = None
        self.log = logging.getLogger(self.__class__.__name__)
313

314
    # Can't do this in the constructor, because the engine is always
315
    # unset when we're instantiated
316
    def getEngine(self):
1✔
317
        return self.t.metadata.bind
1✔
318

319
    def _returnRowOrRaise(self, where, columns=None, transaction=None):
1✔
320
        """Return the row matching the where clause supplied. If no rows match or multiple rows match,
321
        a WrongNumberOfRowsError will be raised."""
322
        rows = self.select(where=where, columns=columns, transaction=transaction)
1✔
323
        if len(rows) == 0:
1!
324
            raise WrongNumberOfRowsError("where clause matched no rows")
×
325
        if len(rows) > 1:
1!
326
            raise WrongNumberOfRowsError("where clause matches multiple rows (primary keys: %s)" % rows)
×
327
        return rows[0]
1✔
328

329
    def _selectStatement(self, columns=None, where=None, order_by=None, limit=None, offset=None, distinct=False):
1✔
330
        """Create a SELECT statement on this table.
331

332
        :param columns: Column objects to select. Defaults to None, meaning select all columns
333
        :type columns: A sequence of sqlalchemy.schema.Column objects or column names as strings
334
        :param order_by: Columns to sort the rows by. Defaults to None, meaning no ORDER BY clause
335
        :type order_by: A sequence of sqlalchemy.schema.Column objects
336
        :param limit: Limit results to this many. Defaults to None, meaning no limit
337
        :type limit: int
338
        :param distinct: Whether or not to return only distinct rows. Default: False.
339
        :type distinct: bool
340

341
        :rtype: sqlalchemy.sql.expression.Select
342
        """
343
        if columns:
1✔
344
            table_columns = [(self.t.c[col] if isinstance(col, str) else col) for col in columns]
1✔
345
            query = select(table_columns, order_by=order_by, limit=limit, offset=offset, distinct=distinct)
1✔
346
        else:
347
            query = self.t.select(order_by=order_by, limit=limit, offset=offset, distinct=distinct)
1✔
348
        if where:
1✔
349
            for cond in where:
1✔
350
                query = query.where(cond)
1✔
351
        return query
1✔
352

353
    def select(self, where=None, transaction=None, **kwargs):
        """Run a SELECT against this table and return the rows as dicts.

        See AUSTable._selectStatement for the remaining keyword arguments.

        :param where: a list of SQLAlchemy clauses, or a mapping of column
                      names to values, which is converted to equality clauses.
        :type where: list of clauses or key/value pairs.
        :param transaction: transaction to execute in. When provided, the
                            caller must commit it; when None, a locally-scoped
                            transaction is created and committed.

        :rtype: list of dicts
        """
        # A mapping is shorthand for a set of equality clauses.
        if hasattr(where, "keys"):
            where = [getattr(self, key) == value for key, value in where.items()]

        query = self._selectStatement(where=where, **kwargs)

        if transaction:
            rows = transaction.execute(query).fetchall()
        else:
            with AUSTransaction(self.getEngine()) as trans:
                rows = trans.execute(query).fetchall()

        return rows_to_dicts(rows)
381

382
    def _insertStatement(self, **columns):
1✔
383
        """Create an INSERT statement for this table
384

385
        :param columns: Data to insert
386
        :type colmuns: dict
387

388
        :rtype: sqlalchemy.sql.express.Insert
389
        """
390
        for col in self.table.c:
1✔
391
            if col.primary_key and col.autoincrement and isinstance(col.type, Integer) and col.name in columns:
1!
392
                raise ValueError("Cannot set autoincrement primary key '%s' on insert" % col.name)
×
393
        table_columns = {k: columns[k] for k in columns.keys() if k in self.table.c}
1✔
394
        unconsumed_columns = {k: columns[k] for k in columns.keys() if k not in table_columns}
1✔
395
        return self.t.insert(values=table_columns), unconsumed_columns
1✔
396

397
    def _sharedPrepareInsert(self, trans, changed_by, **columns):
1✔
398
        """Prepare an INSERT statement for commit. If this table has versioning enabled,
399
        data_version will be set to 1. If this table has history enabled, two rows
400
        will be created in that table: one representing the current state (NULL),
401
        and one representing the new state.
402

403
        :rtype: sqlalchemy.engine.base.ResultProxy
404
        """
405
        data = columns.copy()
1✔
406
        if self.versioned:
1✔
407
            data["data_version"] = 1
1✔
408
        query, unconsumed_columns = self._insertStatement(**data)
1✔
409
        ret = trans.execute(query)
1✔
410
        return data, ret
1✔
411

412
    def _prepareInsert(self, trans, changed_by, **columns):
1✔
413
        data, ret = self._sharedPrepareInsert(trans, changed_by, **columns)
1✔
414
        if self.history:
1✔
415
            self.history.forInsert(ret.inserted_primary_key, data, changed_by, trans)
1✔
416
        return ret
1✔
417

418
    async def _asyncPrepareInsert(self, trans, changed_by, **columns):
1✔
419
        data, ret = self._sharedPrepareInsert(trans, changed_by, **columns)
1✔
420
        if self.history:
1!
421
            await self.history.forInsert(ret.inserted_primary_key, data, changed_by, trans)
1✔
422
        return ret
1✔
423

424
    def insert(self, changed_by=None, transaction=None, dryrun=False, **columns):
1✔
425
        """Perform an INSERT statement on this table. See AUSTable._insertStatement for
426
        a description of columns.
427

428
        :param changed_by: The username of the person inserting the row. Required when
429
                           history is enabled. Unused otherwise. No authorization checks are done
430
                           at this level.
431
        :type changed_by: str
432
        :param transaction: A transaction object to add the insert statement (and history changes) to.
433
                            If provided, you must commit the transaction yourself. If None, they will
434
                            be added to a locally-scoped transaction and committed.
435
        :param dryrun: If true, this insert statement will not actually be run.
436
        :type dryrun: bool
437

438
        :rtype: sqlalchemy.engine.base.ResultProxy
439
        """
440
        if self.history and not changed_by:
1!
441
            raise ValueError("changed_by must be passed for Tables that have history")
×
442

443
        if dryrun:
1✔
444
            self.log.debug("In dryrun mode, not doing anything...")
1✔
445
            return
1✔
446

447
        if transaction:
1✔
448
            return self._prepareInsert(transaction, changed_by, **columns)
1✔
449
        else:
450
            with AUSTransaction(self.getEngine()) as trans:
1✔
451
                return self._prepareInsert(trans, changed_by, **columns)
1✔
452

453
    async def async_insert(self, changed_by=None, transaction=None, dryrun=False, **columns):
1✔
454
        """Perform an INSERT statement on this table. See AUSTable._insertStatement for
455
        a description of columns.
456

457
        :param changed_by: The username of the person inserting the row. Required when
458
                           history is enabled. Unused otherwise. No authorization checks are done
459
                           at this level.
460
        :type changed_by: str
461
        :param transaction: A transaction object to add the insert statement (and history changes) to.
462
                            If provided, you must commit the transaction yourself. If None, they will
463
                            be added to a locally-scoped transaction and committed.
464
        :param dryrun: If true, this insert statement will not actually be run.
465
        :type dryrun: bool
466

467
        :rtype: sqlalchemy.engine.base.ResultProxy
468
        """
469
        if self.history and not changed_by:
1!
470
            raise ValueError("changed_by must be passed for Tables that have history")
×
471

472
        if dryrun:
1!
473
            self.log.debug("In dryrun mode, not doing anything...")
×
474
            return
×
475

476
        if transaction:
1!
477
            return await self._asyncPrepareInsert(transaction, changed_by, **columns)
1✔
478
        else:
479
            with AUSTransaction(self.getEngine()) as trans:
×
480
                return await self._asyncPrepareInsert(trans, changed_by, **columns)
×
481

482
    def _deleteStatement(self, where):
1✔
483
        """Create a DELETE statement for this table.
484

485
        :param where: Conditions to apply on this select.
486
        :type where: A sequence of sqlalchemy.sql.expression.ClauseElement objects
487

488
        :rtype: sqlalchemy.sql.expression.Delete
489
        """
490
        query = self.t.delete()
1✔
491
        if where:
1!
492
            for cond in where:
1✔
493
                query = query.where(cond)
1✔
494
        return query
1✔
495

496
    def _sharedPrepareDelete(self, trans, where, changed_by, old_data_version):
        """Execute a DELETE on ``trans`` for the single row matching ``where``.

        If versioning is enabled, the delete is guarded on
        ``old_data_version``; a mismatch (zero rows affected) raises
        OutdatedDataError. Rows with active scheduled changes may not be
        deleted (ChangeScheduledError). History recording is done by the
        callers (_prepareDelete / _asyncPrepareDelete), which need the
        deleted row.

        :rtype: (dict, sqlalchemy.engine.base.ResultProxy)
        """
        # Look up the row first: it must exist (and be unique), and its
        # primary key values are needed for the scheduled-changes check below.
        row = self._returnRowOrRaise(where=where, columns=self.primary_key, transaction=trans)

        if self.versioned:
            # Copy before appending so the caller's where list isn't mutated.
            where = copy(where)
            where.append(self.data_version == old_data_version)

        query = self._deleteStatement(where)

        ret = trans.execute(query)
        # Exactly one row must have been deleted; zero means the version
        # guard failed (someone else changed the row in the meantime).
        if ret.rowcount != 1:
            raise OutdatedDataError("Failed to delete row, old_data_version doesn't match current data_version")
        if self.scheduled_changes:
            # If this table has active scheduled changes we cannot allow it to be deleted
            sc_where = [self.scheduled_changes.complete == False]  # noqa
            for pk in self.primary_key:
                sc_where.append(getattr(self.scheduled_changes, "base_%s" % pk.name) == row[pk.name])
            if self.scheduled_changes.select(where=sc_where, transaction=trans):
                raise ChangeScheduledError("Cannot delete rows that have changes scheduled.")

        return row, ret
525

526
    def _prepareDelete(self, trans, where, changed_by, old_data_version):
1✔
527
        row, ret = self._sharedPrepareDelete(trans, where, changed_by, old_data_version)
1✔
528
        if self.history:
1!
529
            self.history.forDelete(row, changed_by, trans)
1✔
530

531
        return ret
1✔
532

533
    async def _asyncPrepareDelete(self, trans, where, changed_by, old_data_version):
1✔
534
        row, ret = self._sharedPrepareDelete(trans, where, changed_by, old_data_version)
1✔
535
        if self.history:
1!
536
            await self.history.forDelete(row, changed_by, trans)
1✔
537

538
        return ret
1✔
539

540
    def delete(self, where, changed_by=None, old_data_version=None, transaction=None, dryrun=False):
1✔
541
        """Perform a DELETE statement on this table. See AUSTable._deleteStatement for
542
        a description of `where`. To simplify versioning, this method can only
543
        delete a single row per invocation. If the where clause given would delete
544
        zero or multiple rows, a WrongNumberOfRowsError is raised.
545

546
        :param where: A list of SQLAlchemy clauses, or a key/value pair of columns and values.
547
        :type where: list of clauses or key/value pairs.
548
        :param changed_by: The username of the person deleting the row(s). Required when
549
                           history is enabled. Unused otherwise. No authorization checks are done
550
                           at this level.
551
        :type changed_by: str
552
        :param old_data_version: Previous version of the row to be deleted. If this version doesn't
553
                                 match the current version of the row, an OutdatedDataError will be
554
                                 raised and the delete will fail. Required when versioning is enabled.
555
        :type old_data_version: int
556
        :param transaction: A transaction object to add the delete statement (and history changes) to.
557
                            If provided, you must commit the transaction yourself. If None, they will
558
                            be added to a locally-scoped transaction and committed.
559
        :param dryrun: If true, this insert statement will not actually be run.
560
        :type dryrun: bool
561

562
        :rtype: sqlalchemy.engine.base.ResultProxy
563
        """
564
        # If "where" is key/value pairs, we need to convert it to SQLAlchemy
565
        # clauses before proceeding.
566
        if hasattr(where, "keys"):
1✔
567
            where = [getattr(self, k) == v for k, v in where.items()]
1✔
568

569
        if self.history and not changed_by:
1!
570
            raise ValueError("changed_by must be passed for Tables that have history")
×
571
        if self.versioned and not old_data_version:
1!
572
            raise ValueError("old_data_version must be passed for Tables that are versioned")
×
573

574
        if dryrun:
1✔
575
            self.log.debug("In dryrun mode, not doing anything...")
1✔
576
            return
1✔
577

578
        if transaction:
1✔
579
            return self._prepareDelete(transaction, where, changed_by, old_data_version)
1✔
580
        else:
581
            with AUSTransaction(self.getEngine()) as trans:
1✔
582
                return self._prepareDelete(trans, where, changed_by, old_data_version)
1✔
583

584
    async def async_delete(self, where, changed_by=None, old_data_version=None, transaction=None, dryrun=False):
1✔
585
        """Perform a DELETE statement on this table. See AUSTable._deleteStatement for
586
        a description of `where`. To simplify versioning, this method can only
587
        delete a single row per invocation. If the where clause given would delete
588
        zero or multiple rows, a WrongNumberOfRowsError is raised.
589

590
        :param where: A list of SQLAlchemy clauses, or a key/value pair of columns and values.
591
        :type where: list of clauses or key/value pairs.
592
        :param changed_by: The username of the person deleting the row(s). Required when
593
                           history is enabled. Unused otherwise. No authorization checks are done
594
                           at this level.
595
        :type changed_by: str
596
        :param old_data_version: Previous version of the row to be deleted. If this version doesn't
597
                                 match the current version of the row, an OutdatedDataError will be
598
                                 raised and the delete will fail. Required when versioning is enabled.
599
        :type old_data_version: int
600
        :param transaction: A transaction object to add the delete statement (and history changes) to.
601
                            If provided, you must commit the transaction yourself. If None, they will
602
                            be added to a locally-scoped transaction and committed.
603
        :param dryrun: If true, this insert statement will not actually be run.
604
        :type dryrun: bool
605

606
        :rtype: sqlalchemy.engine.base.ResultProxy
607
        """
608
        # If "where" is key/value pairs, we need to convert it to SQLAlchemy
609
        # clauses before proceeding.
610
        if hasattr(where, "keys"):
1✔
611
            where = [getattr(self, k) == v for k, v in where.items()]
1✔
612

613
        if self.history and not changed_by:
1!
614
            raise ValueError("changed_by must be passed for Tables that have history")
×
615
        if self.versioned and not old_data_version:
1!
616
            raise ValueError("old_data_version must be passed for Tables that are versioned")
×
617

618
        if dryrun:
1!
619
            self.log.debug("In dryrun mode, not doing anything...")
×
620
            return
×
621

622
        if transaction:
1!
623
            return await self._asyncPrepareDelete(transaction, where, changed_by, old_data_version)
1✔
624
        else:
625
            with AUSTransaction(self.getEngine()) as trans:
×
626
                return await self._asyncPrepareDelete(trans, where, changed_by, old_data_version)
×
627

628
    def _updateStatement(self, where, what):
1✔
629
        """Create an UPDATE statement for this table
630

631
        :param where: Conditions to apply to this UPDATE.
632
        :type where: A sequence of sqlalchemy.sql.expression.ClauseElement objects.
633
        :param what: Data to update
634
        :type what: dict
635

636
        :rtype: sqlalchemy.sql.expression.Update
637
        """
638
        table_what = {k: what[k] for k in what.keys() if k in self.table.c}
1✔
639
        unconsumed_columns = {k: what[k] for k in what.keys() if k not in table_what}
1✔
640
        query = self.t.update(values=table_what)
1✔
641
        if where:
1!
642
            for cond in where:
1✔
643
                query = query.where(cond)
1✔
644
        return query, unconsumed_columns
1✔
645

646
    def _sharedPrepareUpdate(self, trans, where, what, changed_by, old_data_version):
        """Execute an UPDATE on ``trans`` for the single row matching
        ``where``. If versioning is enabled, the update is guarded on
        ``old_data_version`` and data_version is bumped by 1; a guard
        mismatch raises OutdatedDataError.

        Note: ``what`` is mutated here (data_version is written into it) and
        the full new row is returned so callers can record history.

        :rtype: (dict, sqlalchemy.engine.base.ResultProxy)
        """
        # To do merge detection for tables with scheduled changes we need a
        # copy of the original row, and what will be changed. To record
        # history, we need a copy of the entire new row.
        orig_row = self._returnRowOrRaise(where=where, transaction=trans)
        new_row = orig_row.copy()
        if self.versioned:
            # Copy before appending so the caller's where list isn't mutated;
            # guard on the version the caller saw, bump it on the new data.
            where = copy(where)
            where.append(self.data_version == old_data_version)
            new_row["data_version"] += 1
            what["data_version"] = new_row["data_version"]

        # Copy the new data into the row
        for col in what:
            new_row[col] = what[col]

        query, unconsumed_columns = self._updateStatement(where, new_row)

        ret = trans.execute(query)
        # It's important that OutdatedDataError is raised as early as possible
        # because callers may be able to handle it gracefully (and continue
        # with their update). If we raise this _after_ adding history or merging
        # with Scheduled Changes, we may end up altering the history or
        # scheduled changes more than once if the caller ends up re-calling
        # AUSTable.update() after handling the OutdatedDataError.
        if ret.rowcount != 1:
            raise OutdatedDataError("Failed to update row, old_data_version doesn't match current data_version")
        if self.scheduled_changes:
            # Keep any pending scheduled change for this row consistent with
            # the update that was just made.
            self.scheduled_changes.mergeUpdate(orig_row, what, changed_by, trans)
        return new_row, ret
682

683
    def _prepareUpdate(self, trans, where, what, changed_by, old_data_version):
        """Run the shared UPDATE preparation and, when history is enabled,
        record the row's new state in the history table."""
        updated_row, result = self._sharedPrepareUpdate(trans, where, what, changed_by, old_data_version)
        if not self.history:
            return result
        self.history.forUpdate(updated_row, changed_by, trans)
        return result
688

689
    async def _asyncPrepareUpdate(self, trans, where, what, changed_by, old_data_version):
        """Async variant of _prepareUpdate: run the shared UPDATE preparation
        and, when history is enabled, await the history write."""
        updated_row, result = self._sharedPrepareUpdate(trans, where, what, changed_by, old_data_version)
        if not self.history:
            return result
        await self.history.forUpdate(updated_row, changed_by, trans)
        return result
695

696
    def update(self, where, what, changed_by=None, old_data_version=None, transaction=None, dryrun=False):
        """Perform an UPDATE statement on this table. See AUSTable._updateStatement
        for a description of `where` and `what`. Only a single row may be updated
        per invocation; a where clause matching zero or multiple rows raises a
        WrongNumberOfRowsError.

        :param where: A list of SQLAlchemy clauses, or a mapping of column names
                      to values.
        :type where: list of clauses or key/value pairs.
        :param what: Key/value pairs containing new values for the given columns.
        :type what: key/value pairs
        :param changed_by: Username of the person making the change. Required when
                           history is enabled; unused otherwise. No authorization
                           checks are done at this level.
        :type changed_by: str
        :param old_data_version: The data_version the caller believes the row has.
                                 If it doesn't match the row's current version an
                                 OutdatedDataError is raised and the update fails.
                                 Required when versioning is enabled.
        :type old_data_version: int
        :param transaction: Transaction to attach the update (and history changes)
                            to. If provided, the caller must commit it; if None, a
                            locally-scoped transaction is used and committed.
        :param dryrun: If true, this update statement will not actually be run.
        :type dryrun: bool

        :rtype: sqlalchemy.engine.base.ResultProxy
        """
        # A key/value "where" must become SQLAlchemy clauses before use.
        if hasattr(where, "keys"):
            where = [getattr(self, column) == value for column, value in where.items()]

        if self.history and not changed_by:
            raise ValueError("changed_by must be passed for Tables that have history")
        if self.versioned and not old_data_version:
            raise ValueError("update: old_data_version must be passed for Tables that are versioned")

        if dryrun:
            self.log.debug("In dryrun mode, not doing anything...")
            return

        if transaction:
            return self._prepareUpdate(transaction, where, what, changed_by, old_data_version)
        with AUSTransaction(self.getEngine()) as trans:
            return self._prepareUpdate(trans, where, what, changed_by, old_data_version)
741

742
    async def async_update(self, where, what, changed_by=None, old_data_version=None, transaction=None, dryrun=False):
        """Async variant of AUSTable.update. Perform an UPDATE statement on this
        table; see AUSTable._updateStatement for a description of `where` and
        `what`. Only a single row may be updated per invocation; a where clause
        matching zero or multiple rows raises a WrongNumberOfRowsError.

        :param where: A list of SQLAlchemy clauses, or a mapping of column names
                      to values.
        :type where: list of clauses or key/value pairs.
        :param what: Key/value pairs containing new values for the given columns.
        :type what: key/value pairs
        :param changed_by: Username of the person making the change. Required when
                           history is enabled; unused otherwise. No authorization
                           checks are done at this level.
        :type changed_by: str
        :param old_data_version: The data_version the caller believes the row has.
                                 If it doesn't match the row's current version an
                                 OutdatedDataError is raised and the update fails.
                                 Required when versioning is enabled.
        :type old_data_version: int
        :param transaction: Transaction to attach the update (and history changes)
                            to. If provided, the caller must commit it; if None, a
                            locally-scoped transaction is used and committed.
        :param dryrun: If true, this update statement will not actually be run.
        :type dryrun: bool

        :rtype: sqlalchemy.engine.base.ResultProxy
        """
        # A key/value "where" must become SQLAlchemy clauses before use.
        if hasattr(where, "keys"):
            where = [getattr(self, column) == value for column, value in where.items()]

        if self.history and not changed_by:
            raise ValueError("changed_by must be passed for Tables that have history")
        if self.versioned and not old_data_version:
            raise ValueError("update: old_data_version must be passed for Tables that are versioned")

        if dryrun:
            self.log.debug("In dryrun mode, not doing anything...")
            return

        if transaction:
            return await self._asyncPrepareUpdate(transaction, where, what, changed_by, old_data_version)
        with AUSTransaction(self.getEngine()) as trans:
            return await self._asyncPrepareUpdate(trans, where, what, changed_by, old_data_version)
787

788
    def count(self, column="*", where=None, transaction=None):
        """Return the number of rows in this table, optionally filtered by
        `where` (a sequence of SQLAlchemy clauses) and counted over `column`.
        Runs in the given transaction, or a locally-scoped one if None."""
        stmt = select(columns=[func.count(column)], from_obj=self.t)
        for cond in where or []:
            stmt = stmt.where(cond)
        if transaction:
            return transaction.execute(stmt).scalar()
        with AUSTransaction(self.getEngine()) as trans:
            return trans.execute(stmt).scalar()
799

800
    def getRecentChanges(self, limit=10, transaction=None):
        """Return up to `limit` rows from this table's history, newest first."""
        newest_first = self.history.timestamp.desc()
        return self.history.select(transaction=transaction, limit=limit, order_by=newest_first)
802

803

804
class GCSHistory:
    """History storage backed by Google Cloud Storage instead of a database
    table. Each change is written as a JSON blob named
    "<identifier>/<data_version>-<timestamp>-<changed_by>.json", where the
    identifier is the values of `identifier_columns` joined with "-".
    Provides the same forInsert/forDelete/forUpdate/getChange interface as
    HistoryTable."""

    def __init__(self, db, dialect, metadata, baseTable, buckets, identifier_columns, data_column):
        # db, dialect, metadata, and baseTable are accepted for signature
        # parity with other history classes, but are unused here.
        self.buckets = buckets
        self.identifier_columns = identifier_columns
        self.data_column = data_column

    def _getBucket(self, identifier):
        """Return the bucket factory whose configured substring appears in
        `identifier`.

        :raises KeyError: if no configured substring matches."""
        for substring, bucket in self.buckets.items():
            if substring in identifier:
                return bucket
        raise KeyError("Couldn't find bucket to place {} history in.".format(identifier))

    def _upload(self, identifier, bname, data):
        # One synchronous GCS upload, timed for monitoring. Shared by all of
        # the for* methods to avoid repeating the bucket/blob boilerplate.
        with statsd.timer("gcs_upload"):
            bucket = self._getBucket(identifier)(use_gcloud_aio=False)
            blob = bucket.blob(bname)
            blob.upload_from_string(data, content_type="application/json")

    def forInsert(self, insertedKeys, columns, changed_by, trans):
        """Record an insert. Two blobs are written: an empty one stamped 1ms
        in the past (the object did not exist before the insert), then the
        full new data."""
        timestamp = getMillisecondTimestamp()
        identifier = "-".join([columns.get(i) for i in self.identifier_columns])
        for data_version, ts, data in ((None, timestamp - 1, ""), (columns.get("data_version"), timestamp, json.dumps(columns[self.data_column]))):
            bname = "{}/{}-{}-{}.json".format(identifier, data_version, ts, changed_by)
            self._upload(identifier, bname, data)

    def forDelete(self, rowData, changed_by, trans):
        """Record a delete as an empty blob at the deleted row's data_version."""
        identifier = "-".join([rowData.get(i) for i in self.identifier_columns])
        bname = "{}/{}-{}-{}.json".format(identifier, rowData.get("data_version"), getMillisecondTimestamp(), changed_by)
        self._upload(identifier, bname, "")

    def forUpdate(self, rowData, changed_by, trans):
        """Record an update by writing the row's full new data."""
        identifier = "-".join([rowData.get(i) for i in self.identifier_columns])
        bname = "{}/{}-{}-{}.json".format(identifier, rowData.get("data_version"), getMillisecondTimestamp(), changed_by)
        self._upload(identifier, bname, json.dumps(rowData[self.data_column]))

    def getChange(self, change_id=None, column_values=None, data_version=None, transaction=None):
        """Fetch a single historical change identified by the full set of
        identifier column values plus a data_version. change_id and
        transaction are accepted for interface parity but unused.

        :raises ValueError: if the identifier columns or data_version are
                            missing, or the lookup doesn't match exactly one
                            blob."""
        if not set(self.identifier_columns).issubset(column_values.keys()) or not data_version:
            raise ValueError("Cannot find GCS changes without {} and data_version".format(self.identifier_columns))
        identifier = "-".join([column_values[i] for i in self.identifier_columns])
        bucket = self._getBucket(identifier)(use_gcloud_aio=False)
        blobs = [b for b in bucket.list_blobs(prefix="{}/{}".format(identifier, data_version))]
        if len(blobs) != 1:
            raise ValueError("Found {} blobs instead of 1".format(len(blobs)))
        return {tuple(self.identifier_columns): identifier, "data_version": data_version, self.data_column: json.loads(blobs[0].download_as_string())}
852

853

854
class GCSHistoryAsync:
    """Async variant of GCSHistory: records history as JSON blobs in Google
    Cloud Storage using gcloud-aio style buckets. Blob names follow
    "<identifier>/<data_version>-<timestamp>-<changed_by>.json"."""

    def __init__(self, db, dialect, metadata, baseTable, buckets, identifier_columns, data_column):
        # dialect, metadata, and baseTable are accepted for signature parity
        # with other history classes, but are unused here.
        self.db = db
        self.buckets = buckets
        self.identifier_columns = identifier_columns
        self.data_column = data_column

    def _getBucket(self, identifier):
        """Return the bucket factory whose configured substring appears in
        `identifier`.

        :raises KeyError: if no configured substring matches."""
        for substring, bucket in self.buckets.items():
            if substring in identifier:
                return bucket
        raise KeyError("Couldn't find bucket to place {} history in.".format(identifier))

    async def _upload(self, identifier, bname, data):
        # One async GCS upload, timed for monitoring. Shared by all of the
        # for* methods to avoid repeating the session/bucket/blob boilerplate.
        with statsd.timer("async_gcs_upload"):
            # Using a separate session for each request is not ideal, but it's
            # the only thing that seems to work. Ideally, we'd share one session
            # for the entire application, but we can't for two reasons:
            # 1) gcloud-aio won't close the sessions, which results in a lot of
            # errors (https://github.com/talkiq/gcloud-aio/issues/33)
            # 2) When bhearsum tried this it resulted in hangs that he suspected
            # were caused by connection re-use.
            async with ClientSession() as session:
                bucket = self._getBucket(identifier)(session=session)
                blob = bucket.new_blob(bname)
                await blob.upload(data, session=session)

    async def forInsert(self, insertedKeys, columns, changed_by, trans):
        """Record an insert. Two blobs are written: an empty one stamped 1ms
        in the past (the object did not exist before the insert), then the
        full new data."""
        timestamp = getMillisecondTimestamp()
        identifier = "-".join([columns.get(i) for i in self.identifier_columns])
        for data_version, ts, data in ((None, timestamp - 1, ""), (columns.get("data_version"), timestamp, json.dumps(columns[self.data_column]))):
            bname = "{}/{}-{}-{}.json".format(identifier, data_version, ts, changed_by)
            await self._upload(identifier, bname, data)

    async def forDelete(self, rowData, changed_by, trans):
        """Record a delete as an empty blob at the deleted row's data_version."""
        identifier = "-".join([rowData.get(i) for i in self.identifier_columns])
        bname = "{}/{}-{}-{}.json".format(identifier, rowData.get("data_version"), getMillisecondTimestamp(), changed_by)
        await self._upload(identifier, bname, "")

    async def forUpdate(self, rowData, changed_by, trans):
        """Record an update by writing the row's full new data."""
        identifier = "-".join([rowData.get(i) for i in self.identifier_columns])
        bname = "{}/{}-{}-{}.json".format(identifier, rowData.get("data_version"), getMillisecondTimestamp(), changed_by)
        await self._upload(identifier, bname, json.dumps(rowData[self.data_column]))
903

904

905
class HistoryTable(AUSTable):
    """Represents a history table that may be attached to another AUSTable.
    History tables mirror the structure of their `baseTable`, with the exception
    that nullable and primary_key attributes are always overwritten to be
    True and False respectively. Additionally, History tables have a unique
    change_id for each row, and record the username making a change, and the
    timestamp of each change. The methods forInsert, forDelete, and forUpdate
    will generate appropriate INSERTs to the History table given appropriate
    inputs, and are documented below. History tables are never versioned,
    and cannot have history of their own."""

    def __init__(self, db, dialect, metadata, baseTable):
        """Build the "<baseTable>_history" table: change_id/changed_by/timestamp
        bookkeeping columns plus a relaxed copy of every base table column."""
        self.baseTable = baseTable
        self.table = Table(
            "%s_history" % baseTable.t.name,
            metadata,
            Column("change_id", Integer, primary_key=True, autoincrement=True),
            Column("changed_by", String(100), nullable=False),
        )
        # Timestamps are stored as an integer, but actually contain
        # precision down to the millisecond, achieved through
        # multiplication.
        # SQLAlchemy's SQLite dialect doesn't fully support BigInteger.
        # The Column will work, but it ends up being a NullType Column which
        # breaks our upgrade unit tests. Because of this, we make sure to use
        # a plain Integer column for SQLite. In MySQL, an Integer is
        # Integer(11), which is too small for our needs.
        if dialect == "sqlite":
            self.table.append_column(Column("timestamp", Integer, nullable=False))
        else:
            self.table.append_column(Column("timestamp", BigInteger, nullable=False))
        self.base_primary_key = [pk.name for pk in baseTable.primary_key]
        for col in baseTable.t.columns:
            # Copy each base column, demoting primary keys and relaxing
            # nullability so partial (e.g. delete-marker) rows can be stored.
            newcol = col._copy()
            if col.primary_key:
                newcol.primary_key = False
            else:
                newcol.nullable = True
                # Setting unique to None because SQLAlchemy marks column attribute as None
                # unless they have been explicitly set to True or False.
                newcol.unique = None
            self.table.append_column(newcol)
        # History tables are never versioned and never have history of their own.
        AUSTable.__init__(self, db, dialect, historyClass=None, versioned=False)

    def getPointInTime(self, timestamp, transaction=None):
        """Return the state of every object in the base table as of `timestamp`,
        as a list of dicts. Rows whose most recent change at that time was a
        deletion are excluded."""
        # The inner query here gets one change id for every unique object in
        # the base table. Filtering by timestamp < provided timestamp means
        # we won't get any results most recent than the requested timestamp.
        # Grouping by the primary key and selecting the max change_id means
        # we'll get the most recent change_id (after applying the timestamp
        # filter) for every unique object.
        # The outer query simply retrieves the actual row data for each
        # change_id that the inner query found
        # Black wants to format this all on one line, which is more difficult
        # to read.
        # fmt: off
        q = (select(self.table.columns)
             .where(self.change_id.in_(
                 select([sql_max(self.change_id)])
                 .where(self.timestamp <= timestamp)
                 .group_by(*self.base_primary_key)
             )
        ))
        # fmt: on
        if transaction:
            result = transaction.execute(q).fetchall()
        else:
            with AUSTransaction(self.getEngine()) as trans:
                result = trans.execute(q).fetchall()

        rows = []
        # Filter out any rows who have no non-primary key data, because this
        # means the row has been deleted.
        non_primary_key_columns = [col.name for col in self.baseTable.t.columns if not col.primary_key]
        for row in result:
            if any([row[col] for col in non_primary_key_columns]):
                rows.append(row)

        return rows_to_dicts(rows)

    def forInsert(self, insertedKeys, columns, changed_by, trans):
        """Inserts cause two rows in the History table to be created. The first
        one records the primary key data and NULLs for other row data. This
        represents that the row did not exist prior to the insert. The
        timestamp for this row is 1 millisecond behind the real timestamp to
        reflect this. The second row records the full data of the row at the
        time of insert."""
        primary_key_data = {}
        # insertedKeys is positional, in the same order as base_primary_key.
        for i in range(0, len(self.base_primary_key)):
            name = self.base_primary_key[i]
            primary_key_data[name] = insertedKeys[i]
            # Make sure the primary keys are included in the second row as well
            columns[name] = insertedKeys[i]

        ts = getMillisecondTimestamp()
        query, _ = self._insertStatement(changed_by=changed_by, timestamp=ts - 1, **primary_key_data)
        trans.execute(query)
        query, _ = self._insertStatement(changed_by=changed_by, timestamp=ts, **columns)
        trans.execute(query)

    def forDelete(self, rowData, changed_by, trans):
        """Deletes cause a single row to be created, which only contains the
        primary key data. This represents that the row no longer exists."""
        row = {}
        # Only keep keys that correspond to actual history-table columns.
        table_row_data = {k: rowData[k] for k in rowData.keys() if k in self.table.c}
        for k in table_row_data:
            row[str(k)] = table_row_data[k]
        # Tack on history table information to the row
        row["changed_by"] = changed_by
        row["timestamp"] = getMillisecondTimestamp()
        query, _ = self._insertStatement(**row)
        trans.execute(query)

    def forUpdate(self, rowData, changed_by, trans):
        """Updates cause a single row to be created, which contains the full,
        new data of the row at the time of the update."""
        row = {}
        # Only keep keys that correspond to actual history-table columns.
        table_row_data = {k: rowData[k] for k in rowData.keys() if k in self.table.c}
        for k in table_row_data:
            row[str(k)] = table_row_data[k]
        row["changed_by"] = changed_by
        row["timestamp"] = getMillisecondTimestamp()
        query, _ = self._insertStatement(**row)
        trans.execute(query)

    def getChange(self, change_id=None, column_values=None, data_version=None, transaction=None):
        """Returns the unique change that matches the give change_id or
        combination of data_version and values for the specified columns.
        column_values is a dict that contains the column names that are
        versioned and their values.
        Ignores non primary key attributes specified in column_values.
        Returns None when zero or multiple changes match."""
        # if change_id is not None, we use it to get the change, ignoring
        # data_version and column_values
        by_change_id = False if change_id is None else True
        # column_names lists all primary keys as string keys with the column
        # objects as values
        column_names = {col.name: col for col in self.table.columns if col.name in self.base_primary_key}

        if not by_change_id:
            # we check if the entire primary key is present in column_values,
            # since there might be multiple rows that match an incomplete
            # primary key
            for col in column_names.keys():
                if col not in column_values.keys():
                    raise ValueError("Entire primary key not present")
            # data_version can only be queried for versioned tables
            if not self.baseTable.versioned:
                raise ValueError("data_version queried for non-versioned table")

            where = [self.data_version == data_version]
            self.log.debug("Querying for change_id by:")
            self.log.debug("data_version: %s", data_version)
            for col in column_names.keys():
                self.log.debug("%s: %s", column_names[col], column_values[col])
                where.append(column_names[col] == column_values[col])

            # To improve query efficiency we first get the change_id,
            # and _then_ get the entire row. This is because we may not be able
            # to query by an index depending which column_values we were given.
            # If we end up querying by column_values that don't have an index,
            # mysql will read many more rows than will be returned. This is
            # particularly bad on the releases_history table, where the "data"
            # column is often hundreds of kilobytes per row.
            # Additional details in https://github.com/mozilla-releng/balrog/pull/419#issuecomment-334851038
            change_ids = self.select(columns=[self.change_id], where=where, transaction=transaction)
            if len(change_ids) != 1:
                self.log.debug("Found %s changes when not querying by change_id, should have been 1", len(change_ids))
                return None
            change_id = change_ids[0]["change_id"]

        self.log.debug("Querying for full change by change_id %s", change_id)
        changes = self.select(where=[self.change_id == change_id], transaction=transaction)
        if len(changes) != 1:
            self.log.debug("Found %s changes when querying by change_id, should have been 1", len(changes))
            return None
        return changes[0]
1081

1082

1083
class ConditionsTable(AUSTable):
    """Stores the conditions attached to each scheduled change of a table.

    Scheduled changes may only have a single type of condition, but some
    conditions require multiple arguments. `condition_groups` defines each
    type of condition, and groups their args together for easier processing.
    """

    condition_groups = {"time": ("when",), "uptake": ("telemetry_product", "telemetry_channel", "telemetry_uptake")}

    def __init__(self, db, dialect, metadata, baseName, conditions, historyClass=HistoryTable):
        """Create the "<baseName>_conditions" table with columns for each
        enabled condition group.

        :param conditions: Names of the condition groups (keys of
                           condition_groups) enabled for this table.
        :raises ValueError: If `conditions` is empty or contains unknown names.
        """
        if not conditions:
            # NOTE: original message read "for for"; fixed duplicate word.
            raise ValueError("No conditions enabled, cannot initialize conditions for {}".format(baseName))
        if set(conditions) - set(self.condition_groups):
            raise ValueError("Unknown conditions in: {}".format(conditions))

        self.enabled_condition_groups = {k: v for k, v in self.condition_groups.items() if k in conditions}

        # sc_id mirrors the scheduled change's primary key, so it is not
        # autoincremented here.
        self.table = Table("{}_conditions".format(baseName), metadata, Column("sc_id", Integer, primary_key=True, autoincrement=False))

        if "uptake" in conditions:
            self.table.append_column(Column("telemetry_product", String(15)))
            self.table.append_column(Column("telemetry_channel", String(75)))
            self.table.append_column(Column("telemetry_uptake", Integer))

        if "time" in conditions:
            # SQLAlchemy's SQLite dialect doesn't fully support BigInteger,
            # so use a plain Integer there; millisecond timestamps overflow
            # MySQL's default Integer(11), hence BigInteger elsewhere.
            if dialect == "sqlite":
                self.table.append_column(Column("when", Integer))
            else:
                self.table.append_column(Column("when", BigInteger))

        super(ConditionsTable, self).__init__(db, dialect, historyClass=historyClass, versioned=True)

    def validate(self, conditions):
        """Validate a proposed set of condition arguments.

        :param conditions: Mapping of condition argument names to values.
                           Entries with falsy values are ignored.
        :raises ValueError: If no conditions remain after filtering, an
                            argument is unknown or belongs to a disabled
                            group, the arguments don't exactly match one
                            enabled group, or "when" is unparseable or in
                            the past.
        """
        # Drop falsy values so unset arguments don't count towards a group.
        conditions = {k: v for k, v in conditions.items() if v}
        if not conditions:
            raise ValueError("No conditions found")

        for c in conditions:
            for condition, args in self.condition_groups.items():
                if c in args:
                    if c in itertools.chain(*self.enabled_condition_groups.values()):
                        break
                    else:
                        raise ValueError("{} condition is disabled".format(condition))
            else:
                # The inner loop completed without finding the argument in
                # any known group.
                # BUGFIX: previously raised ValueError("Invalid condition: %s", c),
                # which passed the format string and value as two separate
                # exception args without ever formatting the message.
                raise ValueError("Invalid condition: {}".format(c))

        # The provided arguments must exactly match one enabled group;
        # partial or mixed groups are rejected.
        for group in self.enabled_condition_groups.values():
            if set(group) == set(conditions.keys()):
                break
        else:
            raise ValueError("Invalid combination of conditions: {}".format(conditions.keys()))

        if "when" in conditions:
            try:
                time.gmtime(conditions["when"] / 1000)
            except Exception:
                raise ValueError("Cannot parse 'when' as a unix timestamp.")

            if conditions["when"] < getMillisecondTimestamp():
                raise ValueError("Cannot schedule changes in the past")
1142

1143

1144
class ScheduledChangeTable(AUSTable):
    """A Table that stores the necessary information to schedule changes
    to the baseTable provided. A ScheduledChangeTable ends up mirroring the
    columns of its base, and adding the necessary ones to provide the schedule.
    By default, ScheduledChangeTables enable History on themselves.

    Each scheduled change is stored across three tables kept in lockstep:
    this one (the change payload + bookkeeping), a ConditionsTable (when the
    change fires), and a SignoffsTable (who has approved it)."""

    def __init__(self, db, dialect, metadata, baseTable, conditions=("time", "uptake"), historyClass=HistoryTable):
        # Companion tables are named after the base table, e.g. "rules" ->
        # "rules_scheduled_changes" (and its _conditions/_signoffs tables).
        table_name = "{}_scheduled_changes".format(baseTable.t.name)
        self.baseTable = baseTable
        self.table = Table(
            table_name,
            metadata,
            Column("sc_id", Integer, primary_key=True, autoincrement=True),
            Column("scheduled_by", String(100), nullable=False),
            Column("complete", Boolean, default=False),
            Column("change_type", String(50), nullable=False),
        )
        self.conditions = ConditionsTable(db, dialect, metadata, table_name, conditions, historyClass=historyClass)
        # Signoffs are configurable at runtime, which means that we always need
        # a Signoffs table, even if it may not be used immediately.
        self.signoffs = SignoffsTable(db, metadata, dialect, table_name)

        # The primary key column(s) are used in construct "where" clauses for
        # existing rows.
        self.base_primary_key = []
        # A ScheduledChangesTable requires all of the columns from its base
        # table, with a few tweaks:
        for col in baseTable.t.columns:
            if col.primary_key:
                self.base_primary_key.append(col.name)
            newcol = col._copy()
            # 1) Columns are prefixed with "base_", to make them easy to
            # identify and avoid conflicts.
            # Renaming a column requires to change both the key and the name
            # See https://github.com/zzzeek/sqlalchemy/blob/rel_0_7/lib/sqlalchemy/schema.py#L781
            # for background.
            newcol.key = newcol.name = "base_%s" % col.name
            # 2) Primary Key Integer Autoincrement columns from the baseTable become normal nullable
            # columns in ScheduledChanges because we can schedule changes that insert into baseTable
            # and the DB will handle inserting the correct value. However, nulls aren't allowed when
            # we schedule updates or deletes -this is enforced in self.validate().
            # For Primary Key columns that aren't Integer or Autoincrement but are nullable, we preserve
            # this non-nullability because we need a value to insert into the baseTable when the
            # scheduled change gets executed.
            # Non-Primary Key columns from the baseTable become nullable and non-unique in ScheduledChanges
            # because they aren't part of the ScheduledChanges business logic and become simple data storage.
            if col.primary_key:
                newcol.primary_key = False

                # Only integer columns can be AUTOINCREMENT. The isinstance statement guards
                # against false positives from SQLAlchemy.
                if col.autoincrement and isinstance(col.type, Integer):
                    newcol.nullable = True
            else:
                newcol.unique = None
                newcol.nullable = True

            self.table.append_column(newcol)

        super(ScheduledChangeTable, self).__init__(db, dialect, historyClass=historyClass, versioned=True)

    def _prefixColumns(self, columns):
        """Helper function which takes key/value pairs of columns for this
        scheduled changes table - which could contain some unprefixed base
        table columns - and returns key/values pairs of the same columns
        with the base table ones prefixed."""
        ret = {}
        base_columns = [c.name for c in self.baseTable.t.columns]
        for k, v in columns.items():
            if k in base_columns:
                ret["base_%s" % k] = v
            else:
                ret[k] = v
        return ret

    def _splitColumns(self, columns):
        """Because Scheduled Changes are stored across two Tables, we need to
        split out the parts that are in the main table from the parts that
        are stored in the conditions table in a few different places.

        Returns a (base_columns, condition_columns) pair of dicts."""
        base_columns = {}
        condition_columns = {}
        for cond_type in columns:
            if cond_type in itertools.chain(*self.conditions.condition_groups.values()):
                condition_columns[cond_type] = columns[cond_type]
            else:
                base_columns[cond_type] = columns[cond_type]

        return base_columns, condition_columns

    def _checkBaseTablePermissions(self, base_table_where, new_row, changed_by, transaction):
        """Verify that ``changed_by`` would be allowed to perform the change
        described by ``new_row`` directly against the base table, by invoking
        the base table's own method with dryrun=True (no rows are modified).
        Raises whatever the base table raises on permission failure."""
        if "change_type" not in new_row:
            raise ValueError("change_type needed to check Permission")

        if new_row.get("change_type") == "update":
            self.baseTable.update(base_table_where, new_row, changed_by, new_row["data_version"], transaction=transaction, dryrun=True)
        elif new_row.get("change_type") == "insert":
            self.baseTable.insert(changed_by, transaction=transaction, dryrun=True, **new_row)
        elif new_row.get("change_type") == "delete":
            self.baseTable.delete(base_table_where, changed_by, new_row["data_version"], transaction=transaction, dryrun=True)
        else:
            raise ValueError("Unknown Change Type")

    def _dataVersionsAreSynced(self, sc_id, transaction):
        """Return True if the scheduled-changes row and its conditions row
        carry the same data_version. The two tables are always written
        together, so a mismatch indicates a partial write."""
        sc_row = super(ScheduledChangeTable, self).select(where=[self.sc_id == sc_id], transaction=transaction, columns=[self.data_version])
        conditions_row = self.conditions.select(where=[self.conditions.sc_id == sc_id], transaction=transaction, columns=[self.conditions.data_version])
        if not sc_row or len(sc_row) != 1 or not conditions_row or len(conditions_row) != 1:
            return False
        self.log.debug("sc_row data version is %s", sc_row[0].get("data_version"))
        self.log.debug("conditions_row data version is %s", conditions_row[0].get("data_version"))
        if sc_row[0].get("data_version") != conditions_row[0].get("data_version"):
            return False

        return True

    def validate(self, base_columns, condition_columns, changed_by, sc_id=None, transaction=None):
        """Validate a new or updated scheduled change: primary-key presence
        per change_type, base-row existence/data_version for updates,
        PK-uniqueness for inserts, no duplicate pending scheduled change,
        condition validity, and base-table permissions for ``changed_by``.

        Raises ValueError, OutdatedDataError, or ChangeScheduledError."""
        # Depending on the change type, we may do some additional checks
        # against the base table PK columns. It's cleaner to build up these
        # early than do it later.
        base_table_where = []
        sc_table_where = []

        for pk in self.base_primary_key:
            base_column = getattr(self.baseTable, pk)
            if pk in base_columns:
                sc_table_where.append(getattr(self, "base_%s" % pk) == base_columns[pk])
                base_table_where.append(getattr(self.baseTable, pk) == base_columns[pk])
            # Non-Integer columns can have autoincrement set to True for some reason.
            # Any non-integer columns in the primary key are always required (because
            # autoincrement actually isn't a thing for them), and any Integer columns
            # that _aren't_ autoincrement are required as well.
            elif not isinstance(base_column.type, (sqlalchemy.types.Integer,)) or not base_column.autoincrement:
                # NOTE(review): the second positional arg is logging-style and is
                # never interpolated into the message — str(e) will be a tuple
                # repr. Probably intended as "... '%s' ..." % pk; confirm.
                raise ValueError("Missing primary key column '%s' which is not autoincrement", pk)

        if base_columns["change_type"] == "delete":
            for pk in self.base_primary_key:
                if pk not in base_columns:
                    raise ValueError("Missing primary key column %s. PK values needed for deletion" % (pk))
                if base_columns[pk] is None:
                    raise ValueError("%s value found to be None. PK value can not be None for deletion" % (pk))
        elif base_columns["change_type"] == "update":
            # For updates, we need to make sure that the baseTable row already
            # exists, and that the data version provided matches the current
            # version to ensure that someone isn't trying to schedule a change
            # against out-of-date data.
            current_data_version = self.baseTable.select(columns=(self.baseTable.data_version,), where=base_table_where, transaction=transaction)
            if not current_data_version:
                raise ValueError("Cannot create scheduled change with data_version for non-existent row")

            if current_data_version and current_data_version[0]["data_version"] != base_columns.get("data_version"):
                raise OutdatedDataError("Wrong data_version given for base table, cannot create scheduled change.")
        elif base_columns["change_type"] == "insert" and base_table_where:
            # If the base table row shouldn't already exist, we need to make sure they don't
            # to avoid getting an IntegrityError when the change is enacted.
            if self.baseTable.select(columns=(self.baseTable.data_version,), where=base_table_where, transaction=transaction):
                raise ValueError("Cannot schedule change for duplicate PK")

        # If we're validating a new scheduled change (sc_id is None), we need
        # to make sure that no other scheduled change already exists if a
        # primary key for the base table was provided (sc_table_where is not empty).
        if not sc_id and sc_table_where:
            sc_table_where.append(self.complete == False)  # noqa because we need to use == for sqlalchemy operator overloading to work
            if len(self.select(columns=[self.sc_id], where=sc_table_where)) > 0:
                raise ChangeScheduledError("Cannot schedule a change for a row with one already scheduled")

        self.conditions.validate(condition_columns)
        self._checkBaseTablePermissions(base_table_where, base_columns, changed_by, transaction)

    def auto_signoff(self, changed_by, transaction, sc_id, dryrun, columns):
        """Record a signoff on behalf of ``changed_by`` when it can be done
        unambiguously.

        - If the User scheduling a change only holds one of the required Roles, record a signoff with it.
        - If the User scheduling a change holds more than one of the required Roles, we cannot record a
          Signoff, because we don't know which Role we'd want to signoff with. The user will need to
          signoff manually in these cases.
        """
        user_roles = self.db.getUserRoles(username=changed_by, transaction=transaction)
        if len(user_roles):
            required_roles = set()
            required_signoffs = self.baseTable.getPotentialRequiredSignoffs([columns], transaction=transaction)
            if required_signoffs:
                required_roles.update([rs["role"] for rs in [obj for v in required_signoffs.values() for obj in v]])
            possible_signoffs = list(filter(lambda role: role["role"] in required_roles, user_roles))
            if len(possible_signoffs) == 1:
                self.signoffs.insert(changed_by=changed_by, transaction=transaction, dryrun=dryrun, sc_id=sc_id, role=possible_signoffs[0].get("role"))

    def select(self, where=None, transaction=None, **kwargs):
        """Select scheduled changes, merging each row with its (enabled)
        condition columns from the companion conditions table."""
        ret = []
        # We'll be retrieving condition information for each Scheduled Change,
        # and we'll need sc_id to do so.
        if kwargs.get("columns") is not None:
            # Columns can be specified as names or Column instances, so we must check for both.
            if "sc_id" not in kwargs["columns"] and self.sc_id not in kwargs["columns"]:
                kwargs["columns"].append(self.sc_id)
        for row in super(ScheduledChangeTable, self).select(where=where, transaction=transaction, **kwargs):
            columns = [getattr(self.conditions, c) for c in itertools.chain(*self.conditions.enabled_condition_groups.values())]
            conditions = self.conditions.select([self.conditions.sc_id == row["sc_id"]], transaction=transaction, columns=columns)
            row.update(conditions[0])
            ret.append(row)
        return ret

    def insert(self, changed_by, transaction=None, dryrun=False, **columns):
        """Schedule a new change. ``columns`` mixes base-table values and
        condition values; they are split, validated, and written to the
        scheduled-changes and conditions tables together.

        Returns the new sc_id (or None when dryrun is True)."""
        base_columns, condition_columns = self._splitColumns(columns)
        if "change_type" not in base_columns:
            raise ValueError("Change type is required")

        self.validate(base_columns=base_columns, condition_columns=condition_columns, changed_by=changed_by, transaction=transaction)

        base_columns = self._prefixColumns(base_columns)
        base_columns["scheduled_by"] = changed_by

        ret = super(ScheduledChangeTable, self).insert(changed_by=changed_by, transaction=transaction, dryrun=dryrun, **base_columns)
        if not dryrun:
            sc_id = ret.inserted_primary_key[0]
            self.conditions.insert(changed_by, transaction, dryrun, sc_id=sc_id, **condition_columns)
            if not self._dataVersionsAreSynced(sc_id, transaction):
                # NOTE(review): logging-style args here (vs. "%"-formatting in
                # update()) — the sc_id is not interpolated into the message.
                raise MismatchedDataVersionError("Conditions data version is out of sync with main table for sc_id %s", sc_id)
            self.auto_signoff(changed_by, transaction, sc_id, dryrun, columns)

            return sc_id

    def update(self, where, what, changed_by, old_data_version, transaction=None, dryrun=False):
        """Update pending scheduled changes matched by ``where`` with the
        (mixed base/condition) values in ``what``. Completed scheduled
        changes cannot be updated. Existing signoffs from other users are
        reset, since they approved a different version of the change."""
        base_what, condition_what = self._splitColumns(what)

        affected_ids = []
        # We need to check each Scheduled Change that would be affected by this
        # to ensure the new row will be valid.
        for row in self.select(where=where, transaction=transaction):
            # verify whether the scheduled change has already been completed or not. If completed,
            # then cannot modify the scheduled change anymore.
            if row.get("complete"):
                raise ValueError("Scheduled change already completed. Cannot update now.")

            affected_ids.append(row["sc_id"])
            # Before validation, we need to create the new version of the
            # Scheduled Change by combining the old one with the new data.
            # To do this, we need to split the columns up a bit. First,
            # separating the primary scheduled changes columns from the conditions...
            sc_columns, condition_columns = self._splitColumns(row)
            # ...and then combine taking the baseTable parts of sc_columns
            # and combining them with any new values provided in base_what.
            base_columns = {}
            for col in sc_columns:
                if not col.startswith("base_"):
                    continue
                base_col = col.replace("base_", "")
                if base_col in base_what:
                    base_columns[base_col] = base_what[base_col]
                elif sc_columns.get(col):
                    base_columns[base_col] = sc_columns[col]

            # As we need change_type in base_columns and it does not start with "base_". We assign it outside the loop
            base_columns["change_type"] = sc_columns["change_type"]

            # Similarly, we need to integrate the new values for any conditions
            # with the existing ones.
            condition_columns.update(condition_what)

            # Now that we have all that sorted out, we can validate the new values for everything.
            self.validate(base_columns, condition_columns, changed_by, sc_id=sc_columns["sc_id"], transaction=transaction)

            self.conditions.update([self.conditions.sc_id == sc_columns["sc_id"]], condition_columns, changed_by, old_data_version, transaction, dryrun=dryrun)

        base_what = self._prefixColumns(base_what)
        base_what["scheduled_by"] = changed_by
        ret = super(ScheduledChangeTable, self).update(where, base_what, changed_by, old_data_version, transaction, dryrun=dryrun)
        sc_id = ret.last_updated_params()["sc_id"]

        for sc_id in affected_ids:
            if not self._dataVersionsAreSynced(sc_id, transaction):
                raise MismatchedDataVersionError("Conditions data version is out of sync with main table for sc_id %s" % sc_id)

        # NOTE(review): `base_columns` (and the `sc_id` used here) are leftovers
        # from the last iteration of the loop above — if `where` matched no rows
        # this raises NameError, and with multiple matches only the last row's
        # values are used for auto-signoff. Confirm whether multi-row updates
        # are expected here.
        self.auto_signoff(changed_by, transaction, sc_id, dryrun, base_columns)

        # Any signoffs made by other users are for a previous version of the
        # change, so they must be reset.
        for sc_id in affected_ids:
            where_signOff = {"sc_id": sc_id}
            signOffs = self.signoffs.select(where=where_signOff, transaction=transaction, columns=["sc_id", "username"])
            for signOff in signOffs:
                if signOff["username"] != changed_by:
                    where_signOff.update({"username": signOff["username"]})
                    self.signoffs.delete(where=where_signOff, changed_by=changed_by, transaction=transaction, reset_signoff=True)

    def delete(self, where, changed_by=None, old_data_version=None, transaction=None, dryrun=False):
        """Delete pending scheduled changes matched by ``where`` (and their
        conditions rows). Completed scheduled changes cannot be deleted."""
        conditions_where = []
        for row in self.select(where=where, transaction=transaction):
            # verify whether the scheduled change has already been completed or not. If completed,
            # then cannot modify the scheduled change anymore.
            if row.get("complete"):
                raise ValueError("Scheduled change already completed. Cannot delete now.")

            conditions_where.append(self.conditions.sc_id == row["sc_id"])
            base_row = {col[5:]: row[col] for col in row if col.startswith("base_")}
            # we also need change_type in base_row to check permission
            base_row["change_type"] = row["change_type"]
            base_table_where = {pk: row["base_%s" % pk] for pk in self.base_primary_key}
            # TODO: What permissions *should* be required to delete a scheduled change?
            # It seems a bit odd to be checking base table update/insert here. Maybe
            # something broader should be required?
            self._checkBaseTablePermissions(base_table_where, base_row, changed_by, transaction)

        ret = super(ScheduledChangeTable, self).delete(where, changed_by, old_data_version, transaction, dryrun=dryrun)
        self.conditions.delete(conditions_where, changed_by, old_data_version, transaction, dryrun=dryrun)
        return ret

    async def asyncEnactChange(self, sc_id, enacted_by, transaction=None):
        """Enacts a previously scheduled change by running update, insert, or delete on
        the base table.

        NOTE(review): this is a near-duplicate of enactChange() below (async base
        table calls instead of sync); keep the two in sync. Unlike enactChange(),
        the insert branch here does not strip None-valued PK columns — confirm
        whether that difference is intentional."""
        if not self.db.hasPermission(enacted_by, "scheduled_change", "enact", transaction=transaction):
            raise PermissionDeniedError("%s is not allowed to enact scheduled changes", enacted_by)

        sc = self.select(where=[self.sc_id == sc_id], transaction=transaction)[0]
        what = {}
        change_type = sc["change_type"]
        for col in sc:
            if col.startswith("base_"):
                what[col[5:]] = sc[col]

        # The scheduled change is marked as complete first to avoid it being
        # updated unnecessarily when the base table's update method calls
        # mergeUpdate. If the base table update fails, this will get reverted
        # when the transaction is rolled back.
        # We explicitly avoid using ScheduledChangeTable's update() method here
        # because we don't want to trigger its validation of conditions. Doing so
        # would raise any exception for any timestamp based changes, because
        # they are already in the past when we're ready to enact them.
        # Updating in conditions table also so that history view can work
        # See : https://bugzilla.mozilla.org/show_bug.cgi?id=1333876
        self.conditions.update(
            where=[self.conditions.sc_id == sc_id], what={}, changed_by=sc["scheduled_by"], old_data_version=sc["data_version"], transaction=transaction
        )
        super(ScheduledChangeTable, self).update(
            where=[self.sc_id == sc_id], what={"complete": True}, changed_by=sc["scheduled_by"], old_data_version=sc["data_version"], transaction=transaction
        )

        signoffs = self.signoffs.select(where=[self.signoffs.sc_id == sc_id], transaction=transaction)

        # If the scheduled change had a data version, it means the row already
        # exists, and we need to use update() to enact it.
        if change_type == "delete":
            where = []
            for col in self.base_primary_key:
                where.append((getattr(self.baseTable, col) == sc["base_%s" % col]))
            await self.baseTable.async_delete(where, sc["scheduled_by"], sc["base_data_version"], transaction=transaction, signoffs=signoffs)
        elif change_type == "update":
            where = []
            for col in self.base_primary_key:
                where.append((getattr(self.baseTable, col) == sc["base_%s" % col]))
            await self.baseTable.async_update(where, what, sc["scheduled_by"], sc["base_data_version"], transaction=transaction, signoffs=signoffs)
        elif change_type == "insert":
            await self.baseTable.async_insert(sc["scheduled_by"], transaction=transaction, signoffs=signoffs, **what)
        else:
            raise ValueError("Unknown Change Type")

    def enactChange(self, sc_id, enacted_by, transaction=None):
        """Enacts a previously scheduled change by running update, insert, or delete on
        the base table.

        NOTE(review): near-duplicate of asyncEnactChange() above; keep in sync."""
        if not self.db.hasPermission(enacted_by, "scheduled_change", "enact", transaction=transaction):
            raise PermissionDeniedError("%s is not allowed to enact scheduled changes", enacted_by)

        sc = self.select(where=[self.sc_id == sc_id], transaction=transaction)[0]
        what = {}
        change_type = sc["change_type"]
        for col in sc:
            if col.startswith("base_"):
                what[col[5:]] = sc[col]

        # The scheduled change is marked as complete first to avoid it being
        # updated unnecessarily when the base table's update method calls
        # mergeUpdate. If the base table update fails, this will get reverted
        # when the transaction is rolled back.
        # We explicitly avoid using ScheduledChangeTable's update() method here
        # because we don't want to trigger its validation of conditions. Doing so
        # would raise any exception for any timestamp based changes, because
        # they are already in the past when we're ready to enact them.
        # Updating in conditions table also so that history view can work
        # See : https://bugzilla.mozilla.org/show_bug.cgi?id=1333876
        self.conditions.update(
            where=[self.conditions.sc_id == sc_id], what={}, changed_by=sc["scheduled_by"], old_data_version=sc["data_version"], transaction=transaction
        )
        super(ScheduledChangeTable, self).update(
            where=[self.sc_id == sc_id], what={"complete": True}, changed_by=sc["scheduled_by"], old_data_version=sc["data_version"], transaction=transaction
        )

        signoffs = self.signoffs.select(where=[self.signoffs.sc_id == sc_id], transaction=transaction)

        # If the scheduled change had a data version, it means the row already
        # exists, and we need to use update() to enact it.
        if change_type == "delete":
            where = []
            for col in self.base_primary_key:
                where.append((getattr(self.baseTable, col) == sc["base_%s" % col]))
            self.baseTable.delete(where, sc["scheduled_by"], sc["base_data_version"], transaction=transaction, signoffs=signoffs)
        elif change_type == "update":
            where = []
            for col in self.base_primary_key:
                where.append((getattr(self.baseTable, col) == sc["base_%s" % col]))
            self.baseTable.update(where, what, sc["scheduled_by"], sc["base_data_version"], transaction=transaction, signoffs=signoffs)
        elif change_type == "insert":
            for col in self.base_primary_key:
                # as we want sqlalchemy to return the automatically inserted id, we need to not pass it (for 1.4)
                if what[col] is None:
                    del what[col]
            self.baseTable.insert(sc["scheduled_by"], transaction=transaction, signoffs=signoffs, **what)
        else:
            raise ValueError("Unknown Change Type")

    def mergeUpdate(self, old_row, what, changed_by, transaction=None):
        """Merges an update to the base table into any changes that may be
        scheduled for the affected row. If the changes are unmergable
        (meaning: the scheduled change and the new version of the row modify
        the same columns), an UpdateMergeError is raised."""

        # Filter the update to only include fields that are different than
        # what's in the base (old_row).
        what = {k: v for k, v in what.items() if v != old_row.get(k)}

        # pyflakes thinks this should be "is False", but that's not how SQLAlchemy
        # works, so we need to shut it up.
        # http://stackoverflow.com/questions/18998010/flake8-complains-on-boolean-comparison-in-filter-clause
        where = [self.complete == False]  # noqa
        for col in self.base_primary_key:
            where.append((getattr(self, "base_%s" % col) == old_row[col]))

        scheduled_changes = self.select(where=where, transaction=transaction)

        if not scheduled_changes:
            self.log.debug("No scheduled changes found for update; nothing to do")
            return
        for sc in scheduled_changes:
            self.log.debug("Trying to merge update with scheduled change '%s'", sc["sc_id"])

            for col in what:
                # If the scheduled change is different than the old row it will
                # be modifying the row when enacted. If the update to the row
                # ("what") is also modifying the same column, this is a conflict
                # that the server cannot resolve.
                if sc["base_%s" % col] != old_row.get(col) and what.get(col) != old_row.get(col):
                    raise UpdateMergeError("Cannot safely merge change to '%s' with scheduled change '%s'", col, sc["sc_id"])

            # If we get here, the change is safely mergeable
            self.update(
                where=[self.sc_id == sc["sc_id"]], what=what, changed_by=sc["scheduled_by"], old_data_version=sc["data_version"], transaction=transaction
            )
            self.log.debug("Merged %s into scheduled change '%s'", what, sc["sc_id"])
1586
class RequiredSignoffsTable(AUSTable):
    """Base class for tables that describe signoff requirements.

    Each row states how many signoffs from a given role are needed for the
    data identified by the columns listed in ``decisionColumns``. Concrete
    subclasses must build ``self.table`` with those decision columns before
    calling this class' ``__init__``. Because a RequiredSignoffsTable
    consults its own rows when it is modified, changing signoff
    requirements may itself require signoff."""

    decisionColumns = []

    def __init__(self, db, dialect):
        # Every required-signoffs table carries the role being required and
        # how many holders of that role must sign off.
        for extra_col in (Column("role", String(50), primary_key=True), Column("signoffs_required", Integer, nullable=False)):
            self.table.append_column(extra_col)

        super(RequiredSignoffsTable, self).__init__(
            db, dialect, scheduled_changes=True, scheduled_changes_kwargs={"conditions": ["time"]}, historyClass=HistoryTable
        )

    def getPotentialRequiredSignoffs(self, affected_rows, transaction=None):
        """Return the signoff requirements whose decision columns match any
        of ``affected_rows``, as ``{"rs": [matching rows...]}``."""
        matching = []
        for affected in affected_rows:
            if not affected:
                continue
            criteria = {col: affected[col] for col in self.decisionColumns}
            matching.extend(self.select(where=criteria, transaction=transaction))
        return {"rs": matching}

    def validate(self, columns, transaction=None):
        """Reject rows that are missing a decision column value or that
        require more signoffs than there are users holding the role."""
        for col in self.decisionColumns:
            if columns[col] is None:
                raise ValueError("{} are required.".format(self.decisionColumns))
            roles_table = self.db.permissions.user_roles
            role_holders = roles_table.count(where=[roles_table.role == columns["role"]], transaction=transaction)

        if role_holders < columns["signoffs_required"]:
            target = ", ".join([columns[col] for col in self.decisionColumns])
            raise ValueError(
                "Cannot require {} signoffs for {} - only {} users hold the {} role".format(columns["signoffs_required"], target, role_holders, columns["role"])
            )

    def insert(self, changed_by, transaction=None, dryrun=False, signoffs=None, **columns):
        """Insert a new signoff requirement after validating it, checking
        the caller's permission, and (outside of dryrun) verifying any
        signoffs required for this change itself."""
        self.validate(columns, transaction=transaction)

        if not self.db.hasPermission(changed_by, "required_signoff", "create", transaction=transaction):
            raise PermissionDeniedError("{} is not allowed to create new Required Signoffs.".format(changed_by))

        if not dryrun:
            relevant = self.getPotentialRequiredSignoffs([columns], transaction=transaction)
            verify_signoffs([rs for group in relevant.values() for rs in group], signoffs)

        return super(RequiredSignoffsTable, self).insert(changed_by=changed_by, transaction=transaction, dryrun=dryrun, **columns)

    def update(self, where, what, changed_by, old_data_version, transaction=None, dryrun=False, signoffs=None):
        """Update matching signoff requirements, validating each resulting
        row and verifying permission/signoffs per affected row."""
        for current in self.select(where=where, transaction=transaction):
            updated = current.copy()
            updated.update(what)
            self.validate(updated, transaction=transaction)

            if not self.db.hasPermission(changed_by, "required_signoff", "modify", transaction=transaction):
                raise PermissionDeniedError("{} is not allowed to modify Required Signoffs.".format(changed_by))

            if not dryrun:
                relevant = self.getPotentialRequiredSignoffs([current, updated], transaction=transaction)
                verify_signoffs([rs for group in relevant.values() for rs in group], signoffs)

        return super(RequiredSignoffsTable, self).update(
            where=where, what=what, changed_by=changed_by, old_data_version=old_data_version, transaction=transaction, dryrun=dryrun
        )

    def delete(self, where, changed_by=None, old_data_version=None, transaction=None, dryrun=False, signoffs=None):
        """Delete matching signoff requirements after checking permission
        and (outside of dryrun) verifying signoffs per affected row."""
        if not self.db.hasPermission(changed_by, "required_signoff", "delete", transaction=transaction):
            raise PermissionDeniedError("{} is not allowed to remove Required Signoffs.".format(changed_by))

        if not dryrun:
            for current in self.select(where=where, transaction=transaction):
                relevant = self.getPotentialRequiredSignoffs([current], transaction=transaction)
                verify_signoffs([rs for group in relevant.values() for rs in group], signoffs)

        return super(RequiredSignoffsTable, self).delete(
            where=where, changed_by=changed_by, old_data_version=old_data_version, transaction=transaction, dryrun=dryrun
        )
1667

1668

1669
class ProductRequiredSignoffsTable(RequiredSignoffsTable):
    """Required Signoffs that are keyed on a (product, channel) pair."""

    # Columns that identify which updates a Required Signoff applies to.
    decisionColumns = ["product", "channel"]

    def __init__(self, db, metadata, dialect):
        self.table = Table(
            "product_req_signoffs",
            metadata,
            Column("product", String(15), primary_key=True),
            Column("channel", String(75), primary_key=True),
        )
        super(ProductRequiredSignoffsTable, self).__init__(db, dialect)
1675

1676

1677
class PermissionsRequiredSignoffsTable(RequiredSignoffsTable):
    """Required Signoffs that are keyed on product alone."""

    # Columns that identify which updates a Required Signoff applies to.
    decisionColumns = ["product"]

    def __init__(self, db, metadata, dialect):
        self.table = Table(
            "permissions_req_signoffs",
            metadata,
            Column("product", String(15), primary_key=True),
        )
        super(PermissionsRequiredSignoffsTable, self).__init__(db, dialect)
1683

1684

1685
class SignoffsTable(AUSTable):
    """Signoffs made against a Scheduled Change.

    Each row records that a user signed off on a Scheduled Change (sc_id)
    while holding a particular role. Signoffs are immutable: they can only be
    granted (insert) or revoked (delete), never updated.
    """

    def __init__(self, db, metadata, dialect, baseName):
        # One signoffs table exists per scheduled-change table, named after it.
        self.table = Table(
            "{}_signoffs".format(baseName),
            metadata,
            Column("sc_id", Integer, primary_key=True, autoincrement=False),
            Column("username", String(100), primary_key=True),
            Column("role", String(50), nullable=False),
        )
        # Because Signoffs cannot be modified, there's no possibility of an
        # update race, so they do not need to be versioned.
        super(SignoffsTable, self).__init__(db, dialect, versioned=False, historyClass=HistoryTable)

    def insert(self, changed_by=None, transaction=None, dryrun=False, **columns):
        """Record a signoff by ``changed_by`` on the given Scheduled Change.

        Raises ValueError if sc_id or role is missing, and
        PermissionDeniedError if the caller is signing off for someone else,
        is a system account, lacks the claimed role, or has already signed off
        under a different role. A duplicate signoff under the same role is a
        silent no-op.
        """
        if "sc_id" not in columns or "role" not in columns:
            raise ValueError("sc_id and role must be provided when signing off")
        if "username" in columns and columns["username"] != changed_by:
            raise PermissionDeniedError("Cannot signoff on behalf of another user")
        if changed_by in self.db.systemAccounts:
            raise PermissionDeniedError("System account cannot signoff")
        if not self.db.hasRole(changed_by, columns["role"], transaction=transaction):
            raise PermissionDeniedError("{} cannot signoff with role '{}'".format(changed_by, columns["role"]))

        existing_signoff = self.select({"sc_id": columns["sc_id"], "username": changed_by}, transaction)
        if existing_signoff:
            # It shouldn't be possible for there to be more than one signoff,
            # so not iterating over this should be fine.
            existing_signoff = existing_signoff[0]
            if existing_signoff["role"] != columns["role"]:
                raise PermissionDeniedError("Cannot signoff with a second role")
            # Signoff already made under the same role, we don't need to do
            # anything!
            return

        # The username is always taken from the authenticated caller.
        columns["username"] = changed_by
        super(SignoffsTable, self).insert(changed_by=changed_by, transaction=transaction, dryrun=dryrun, **columns)

    def update(self, where, what, changed_by=None, transaction=None, dryrun=False):
        """Always raises: signoffs are immutable by design."""
        raise AttributeError("Signoffs cannot be modified (only granted and revoked)")

    def delete(self, where, changed_by=None, transaction=None, dryrun=False, reset_signoff=False):
        """Revoke signoffs matching ``where``.

        Unless ``reset_signoff`` is set (an internal bypass of the permission
        checks), system accounts may not revoke, and a caller may only revoke
        a signoff if they hold the same role as the signer or are an admin.
        """
        if not reset_signoff:
            for row in self.select(where, transaction):
                if changed_by in self.db.systemAccounts:
                    raise PermissionDeniedError("System accounts cannot revoke a signoff")
                if not self.db.hasRole(changed_by, row["role"], transaction=transaction) and not self.db.isAdmin(changed_by, transaction=transaction):
                    raise PermissionDeniedError("Cannot revoke a signoff made by someone in a group you do not belong to")

        super(SignoffsTable, self).delete(where, changed_by=changed_by, transaction=transaction, dryrun=dryrun)
1✔
1734

1735

1736
class Rules(AUSTable):
    """The update rules table.

    Rules map incoming update queries (product, channel, version, platform
    details, etc.) to a Release ("mapping"), and carry the throttling and
    signoff-relevant metadata for doing so.
    """

    def __init__(self, db, metadata, dialect):
        self.table = Table(
            "rules",
            metadata,
            Column("rule_id", Integer, primary_key=True, autoincrement=True),
            Column("alias", String(50), unique=True),
            Column("priority", Integer),
            Column("mapping", String(100)),
            Column("fallbackMapping", String(100)),
            Column("backgroundRate", Integer),
            Column("update_type", String(15), nullable=False),
            Column("product", String(15)),
            Column("version", String(75)),
            Column("channel", String(75)),
            Column("buildTarget", String(75)),
            Column("buildID", String(20)),
            Column("locale", String(200)),
            Column("osVersion", String(1000)),
            Column("memory", String(100)),
            Column("instructionSet", String(1000)),
            Column("jaws", CompatibleBooleanColumn),
            Column("mig64", CompatibleBooleanColumn),
            Column("distribution", String(2000)),
            Column("distVersion", String(100)),
            Column("headerArchitecture", String(10)),
            Column("comment", String(500)),
        )

        AUSTable.__init__(self, db, dialect, scheduled_changes=True, historyClass=HistoryTable)

    def getPotentialRequiredSignoffs(self, affected_rows, transaction=None):
        """Return the Required Signoffs that could apply to ``affected_rows``.

        Returns a dict keyed on (product, channel) tuples, one entry per
        affected row, each mapping to the list of applicable Required
        Signoffs.
        """
        potential_required_signoffs = {}
        rows = []
        # The new row may change the product or channel, so we must look for
        # Signoffs for both.
        for row in affected_rows:
            if not row:
                continue
            rows.append(row)

        where = {}
        cond = []
        for row in rows:
            if not row.get("product"):
                # If product isn't present, or is None, it means the Rule affects
                # all products, and we must leave it out of the where clause. If
                # we included it, the query would only match rows where product is
                # NULL. Since we are returning all rs, we can safely breakout of this loop
                break
            cond.append(row["product"])
        else:  # nobreak
            # Every row named a product, so we can restrict the query.
            where = [self.db.productRequiredSignoffs.product.in_(tuple(cond))]

        q = self.db.productRequiredSignoffs.select(where=where, transaction=transaction)

        # map query result using product as the key
        q_map = {}
        for rs in q:
            if rs["product"] in q_map:
                q_map[rs["product"]].append(rs)
            else:
                q_map[rs["product"]] = [rs]

        for row in rows:
            potential_required_signoffs[(row.get("product"), row.get("channel"))] = get_required_signoffs_for_product_channel(
                row.get("product"), row.get("channel"), q_map, q
            )
        return potential_required_signoffs

    def _isAlias(self, id_or_alias):
        """Return True if ``id_or_alias`` looks like an alias (starts with a
        letter) rather than a numeric rule id."""
        if re.match("^[a-zA-Z][a-zA-Z0-9-]*$", str(id_or_alias)):
            return True
        return False

    def insert(self, changed_by, transaction=None, dryrun=False, signoffs=None, **columns):
        """Insert a new rule after permission and signoff verification.

        Returns the new rule_id (except in dry runs, where nothing is
        returned).
        """
        if not self.db.hasPermission(changed_by, "rule", "create", columns.get("product"), transaction):
            raise PermissionDeniedError("%s is not allowed to create new rules for product %s" % (changed_by, columns.get("product")))

        if not dryrun:
            potential_required_signoffs = [obj for v in self.getPotentialRequiredSignoffs([columns], transaction=transaction).values() for obj in v]
            verify_signoffs(potential_required_signoffs, signoffs)

        ret = super(Rules, self).insert(changed_by=changed_by, transaction=transaction, dryrun=dryrun, **columns)
        if not dryrun:
            return ret.inserted_primary_key[0]

    def getOrderedRules(self, where=None, transaction=None):
        """Returns all of the rules, sorted in ascending order"""
        return self.select(where=where, order_by=(self.priority, self.version, self.mapping), transaction=transaction)

    def getRulesMatchingQuery(self, updateQuery, fallbackChannel, transaction=None):
        """Returns all of the rules that match the given update query.
        For cases where a particular updateQuery channel has no
        fallback, fallbackChannel should match the channel from the query."""

        def getRawMatches():
            # Coarse database-level filter; the per-rule match functions below
            # do the fine-grained filtering in Python.
            where = [
                ((self.product == updateQuery["product"]) | (self.product == null()))
                & ((self.buildTarget == updateQuery["buildTarget"]) | (self.buildTarget == null()))
            ]

            if "headerArchitecture" in updateQuery:
                where.extend([(self.headerArchitecture == updateQuery.get("headerArchitecture")) | (self.headerArchitecture == null())])
            else:
                where.extend([self.headerArchitecture == null()])

            if "distVersion" in updateQuery:
                where.extend([(self.distVersion == updateQuery["distVersion"]) | (self.distVersion == null())])
            else:
                where.extend([self.distVersion == null()])

            self.log.debug("where: %s", where)
            return self.select(where=where, transaction=transaction)

        # This cache key is constructed from all parts of the updateQuery that
        # are used in the select() to get the "raw" rule matches. For the most
        # part, product and buildTarget will be the only applicable ones which
        # means we should get very high cache hit rates, as there's not a ton
        # of variability of possible combinations for those.
        cache_key = "%s:%s:%s:%s:%s" % (
            updateQuery["product"],
            updateQuery["buildTarget"],
            updateQuery.get("headerArchitecture"),
            updateQuery.get("distVersion"),
            updateQuery.get("force"),
        )
        rules = cache.get("rules", cache_key, getRawMatches)

        self.log.debug("Raw matches:")

        matchingRules = []
        for rule in rules:
            self.log.debug(rule)

            # Resolve special means for channel, version, and buildID - dropping
            # rules that don't match after resolution.
            if not matchChannel(rule["channel"], updateQuery["channel"], fallbackChannel):
                self.log.debug("%s doesn't match %s", rule["channel"], updateQuery["channel"])
                continue
            if not matchVersion(rule["version"], updateQuery["version"], get_version_class(updateQuery["product"])):
                self.log.debug("%s doesn't match %s", rule["version"], updateQuery["version"])
                continue
            if not matchBuildID(rule["buildID"], updateQuery.get("buildID", "")):
                self.log.debug("%s doesn't match %s", rule["buildID"], updateQuery.get("buildID"))
                continue
            if not matchMemory(rule["memory"], updateQuery.get("memory")):
                self.log.debug("%s doesn't match %s", rule["memory"], updateQuery.get("memory"))
                continue
            # To help keep the rules table compact, multiple OS versions may be
            # specified in a single rule. They are comma delimited, so we need to
            # break them out and create clauses for each one.
            if not matchSimpleExpression(rule["osVersion"], updateQuery.get("osVersion", "")):
                self.log.debug("%s doesn't match %s", rule["osVersion"], updateQuery.get("osVersion"))
                continue
            if not matchCsv(rule["instructionSet"], updateQuery.get("instructionSet", ""), substring=False):
                self.log.debug("%s doesn't match %s", rule["instructionSet"], updateQuery.get("instructionSet"))
                continue
            if not matchCsv(rule["distribution"], updateQuery.get("distribution", ""), substring=False):
                self.log.debug("%s doesn't match %s", rule["distribution"], updateQuery.get("distribution"))
                continue
            # Locales may be a comma delimited rule too, exact matches only
            if not matchLocale(rule["locale"], updateQuery.get("locale", "")):
                self.log.debug("%s doesn't match %s", rule["locale"], updateQuery.get("locale"))
                continue
            if not matchBoolean(rule["mig64"], updateQuery.get("mig64")):
                self.log.debug("%s doesn't match %s", rule["mig64"], updateQuery.get("mig64"))
                continue
            if not matchBoolean(rule["jaws"], updateQuery.get("jaws")):
                self.log.debug("%s doesn't match %s", rule["jaws"], updateQuery.get("jaws"))
                continue

            matchingRules.append(rule)

        self.log.debug("Reduced matches:")
        if self.log.isEnabledFor(logging.DEBUG):
            for r in matchingRules:
                self.log.debug(r)
        return matchingRules

    def getRule(self, id_or_alias, transaction=None):
        """Returns the unique rule that matches the given rule_id or alias,
        or None if zero or multiple rules match."""
        where = []
        # Figuring out which column to use ahead of times means there's only
        # one potential index for the database to use, which should make
        # queries faster (it will always use the most efficient one).
        if self._isAlias(id_or_alias):
            where.append(self.alias == id_or_alias)
        else:
            where.append(self.rule_id == id_or_alias)

        rules = self.select(where=where, transaction=transaction)
        found = len(rules)
        if found > 1 or found == 0:
            self.log.debug("Found %s rules, should have been 1", found)
            return None
        return rules[0]

    def update(self, where, what, changed_by, old_data_version, transaction=None, dryrun=False, signoffs=None):
        """Update rules matching ``where`` after permission and signoff checks."""
        # Rather than forcing callers to figure out whether the identifier
        # they have is an id or an alias, we handle it here.
        if "rule_id" in where and self._isAlias(where["rule_id"]):
            where["alias"] = where["rule_id"]
            del where["rule_id"]

        # If the product is being changed, we also need to make sure the user
        # has permission to modify _that_ product.
        if "product" in what:
            if not self.db.hasPermission(changed_by, "rule", "modify", what["product"], transaction):
                raise PermissionDeniedError("%s is not allowed to modify rules for product %s" % (changed_by, what["product"]))

        for current_rule in self.select(where=where, transaction=transaction):
            if not self.db.hasPermission(changed_by, "rule", "modify", current_rule["product"], transaction):
                raise PermissionDeniedError("%s is not allowed to modify rules for product %s" % (changed_by, current_rule["product"]))

            new_rule = current_rule.copy()
            new_rule.update(what)
            if not dryrun:
                # Signoffs must cover both the old and new state of the rule.
                potential_required_signoffs = [
                    obj for v in self.getPotentialRequiredSignoffs([current_rule, new_rule], transaction=transaction).values() for obj in v
                ]
                verify_signoffs(potential_required_signoffs, signoffs)

        return super(Rules, self).update(
            changed_by=changed_by, where=where, what=what, old_data_version=old_data_version, transaction=transaction, dryrun=dryrun
        )

    def delete(self, where, changed_by=None, old_data_version=None, transaction=None, dryrun=False, signoffs=None):
        """Delete rules matching ``where`` after permission and signoff checks."""
        # Accept an alias in place of a rule_id, as update() does.
        if "rule_id" in where and self._isAlias(where["rule_id"]):
            where["alias"] = where["rule_id"]
            del where["rule_id"]

        product = self.select(where=where, columns=[self.product], transaction=transaction)[0]["product"]
        if not self.db.hasPermission(changed_by, "rule", "delete", product, transaction):
            raise PermissionDeniedError("%s is not allowed to delete rules for product %s" % (changed_by, product))

        if not dryrun:
            for current_rule in self.select(where=where, transaction=transaction):
                potential_required_signoffs = [obj for v in self.getPotentialRequiredSignoffs([current_rule], transaction=transaction).values() for obj in v]
                verify_signoffs(potential_required_signoffs, signoffs)

        super(Rules, self).delete(changed_by=changed_by, where=where, old_data_version=old_data_version, transaction=transaction, dryrun=dryrun)
1✔
1978

1979

1980
class Releases(AUSTable):
1✔
1981
    def __init__(self, db, metadata, dialect, history_buckets, historyClass):
        """Set up the releases table.

        The "data" column holds the release blob and is created with a
        dialect-appropriate large-text type. History is only enabled when
        ``history_buckets`` is provided, since release history is stored in
        buckets rather than a database table.
        """
        self.domainAllowlist = []

        self.table = Table(
            "releases",
            metadata,
            Column("name", String(100), primary_key=True),
            Column("product", String(15), nullable=False),
            Column("read_only", Boolean, default=False),
        )
        if dialect == "mysql":
            # MySQL's TEXT is too small for large release blobs.
            from sqlalchemy.dialects.mysql import LONGTEXT

            dataType = LONGTEXT
        else:
            dataType = Text
        self.table.append_column(Column("data", BlobColumn(dataType), nullable=False))
        historyKwargs = {}
        if history_buckets:
            historyKwargs["buckets"] = history_buckets
            historyKwargs["identifier_columns"] = ["name"]
            historyKwargs["data_column"] = "data"
        else:
            # Can't have history without a bucket
            historyClass = None
        AUSTable.__init__(
            self, db, dialect, scheduled_changes=True, scheduled_changes_kwargs={"conditions": ["time"]}, historyClass=historyClass, historyKwargs=historyKwargs
        )
2009

2010
    def getPotentialRequiredSignoffs(self, affected_rows, transaction=None):
        """Return the Required Signoffs that could apply to ``affected_rows``.

        Returns a dict keyed by release name (or the literal key "rs" when no
        release info was found), each mapping to a list of Required Signoffs.
        """
        potential_required_signoffs = {}
        rows = []
        for row in affected_rows:
            if not row:
                continue
            rows.append(row)
        info = self.getReleaseInfo(names=[row["name"] for row in rows], transaction=transaction)
        # Releases do not affect live updates on their own, only the
        # product+channel combinations specified in Rules that point
        # to them. We need to find these Rules, and then return _their_
        # Required Signoffs.
        if info:
            relevant_rules = [rule_info for row in info for rule_info in row["rule_info"].values()]

            # get all rs as one query
            all_rs = self.db.rules.getPotentialRequiredSignoffs(relevant_rules, transaction=transaction)

            for row in info:
                rs = []
                potential_required_signoffs[row["name"]] = []
                for rule in row["rule_info"].values():
                    _rs = all_rs[(rule["product"], rule["channel"])]
                    rs.extend(_rs)
                potential_required_signoffs[row["name"]] = rs
        else:
            potential_required_signoffs["rs"] = []
        return potential_required_signoffs
1✔
2038

2039
    def getPotentialRequiredSignoffsForProduct(self, product, transaction=None):
        """Return the Required Signoffs relevant to ``product`` across all channels.

        Returns a dict with a single "rs" key. When the product has any
        Required Signoffs, the value holds, for each role, the entry demanding
        the greatest number of signoffs; otherwise it is an empty list.
        """
        result = {"rs": []}
        matching = self.db.productRequiredSignoffs.select(
            where=[self.db.productRequiredSignoffs.product == product], transaction=transaction
        )
        if matching:
            by_role = defaultdict(list)
            for entry in matching:
                by_role[entry["role"]].append(entry)
            # Keep only the strictest requirement per role.
            result["rs"] = [max(entries, default=None, key=lambda e: e["signoffs_required"]) for entries in by_role.values()]
        return result
1✔
2050

2051
    def setDomainAllowlist(self, domainAllowlist):
        # The allowlist is passed to blob.validate() during insert/update to
        # restrict which domains a release blob may reference.
        self.domainAllowlist = domainAllowlist
        self.domainAllowlist = domainAllowlist
1✔
2053

2054
    def getReleases(self, name=None, product=None, limit=None, transaction=None):
        """Return release rows (including their blobs) matching the filters.

        Each returned row carries name, product, data_version, and a "data"
        key holding the release blob (fetched via the caching blob getter).
        """
        self.log.debug("Looking for releases with:")
        self.log.debug("name: %s", name)
        self.log.debug("product: %s", product)
        where = []
        if name:
            where.append(self.name == name)
        if product:
            where.append(self.product == product)
        # We could get the "data" column here too, but getReleaseBlob knows how
        # to grab cached versions of that, so it's better to let it take care
        # of it.
        rows = self.select(columns=[self.name, self.product, self.data_version], where=where, limit=limit, transaction=transaction)
        for row in rows:
            row["data"] = self.getReleaseBlob(row["name"], transaction)
        return rows
1✔
2070

2071
    def getReleaseInfo(self, names=None, product=None, limit=None, transaction=None, nameOnly=False, name_prefix=None):
        """Return metadata (but not blobs) for releases matching the filters.

        Unless ``nameOnly`` is set, each row is augmented with "rule_ids" and
        "rule_info" describing the Rules that map (or fall back) to the
        release.
        """
        where = []
        if names:
            where.append(self.name.in_(tuple(names)))
        if product:
            where.append(self.product == product)
        if name_prefix:
            where.append(self.name.startswith(name_prefix))
        if nameOnly:
            column = [self.name]
        else:
            column = [self.name, self.product, self.data_version, self.read_only]

        rows = self.select(where=where, columns=column, limit=limit, transaction=transaction)

        if not nameOnly:
            # Join releases against rules on either mapping or fallbackMapping
            # to find every rule that references each release.
            j = join(
                self.db.releases.t,
                self.db.rules.t,
                ((self.db.releases.name == self.db.rules.mapping) | (self.db.releases.name == self.db.rules.fallbackMapping)),
            )
            if transaction:
                ref_list = transaction.execute(
                    select([self.db.releases.name, self.db.rules.rule_id, self.db.rules.product, self.db.rules.channel]).select_from(j)
                ).fetchall()
            else:
                ref_list = (
                    self.getEngine()
                    .execute(select([self.db.releases.name, self.db.rules.rule_id, self.db.rules.product, self.db.rules.channel]).select_from(j))
                    .fetchall()
                )

            for row in rows:
                refs = [ref for ref in ref_list if ref[0] == row["name"]]
                # Prune consumed references so later rows scan a shorter list.
                ref_list = [ref for ref in ref_list if ref[0] != row["name"]]
                row["rule_ids"] = [ref[1] for ref in refs]
                row["rule_info"] = {str(ref[1]): {"product": ref[2], "channel": ref[3]} for ref in refs}

        return rows
1✔
2110

2111
    def getReleaseNames(self, **kwargs):
        """Return release info rows restricted to the "name" column.

        Accepts the same keyword arguments as getReleaseInfo.
        """
        names_only = self.getReleaseInfo(nameOnly=True, **kwargs)
        return names_only
1✔
2113

2114
    def getReleaseBlob(self, name, transaction=None):
        """Return the blob for release ``name``, preferring cached copies.

        Keeps the "blob" and "blob_version" caches coherent with the database
        and each other. Raises KeyError if no release with that name exists.
        """
        # Putting the data_version and blob getters into these methods lets us
        # delegate the decision about whether or not to use the cached values
        # to the cache class. It will either return as a cached value, or use
        # the getter to return a fresh value (and cache it).
        def getDataVersion():
            try:
                return self.select(where=[self.name == name], columns=[self.data_version], limit=1, transaction=transaction)[0]
            except IndexError:
                raise KeyError("Couldn't find release with name '%s'" % name)

        data_version = cache.get("blob_version", name, getDataVersion)

        def getBlob():
            try:
                row = self.select(where=[self.name == name], columns=[self.data], limit=1, transaction=transaction)[0]
                blob = row["data"]
                return {"data_version": data_version, "blob": blob}
            except IndexError:
                raise KeyError("Couldn't find release with name '%s'" % name)

        def get_data_version(obj):
            # Cached values may be either a bare int or a row/dict carrying a
            # "data_version" key; normalize to an int for comparison.
            if isinstance(obj, int):
                return obj
            return obj["data_version"]

        cached_blob = cache.get("blob", name, getBlob)

        # Even though we may have retrieved a cached blob, we need to make sure
        # that it's not older than the one in the database. If the data version
        # of the cached blob and the latest data version don't match, we need
        # to update the cache with the latest blob.
        if get_data_version(data_version) > get_data_version(cached_blob["data_version"]):
            blob_info = getBlob()
            cache.put("blob", name, blob_info)
            blob = blob_info["blob"]
        else:
            # And while it's extremely unlikely, there is a remote possibility
            # that the cached blob actually has a newer data version than the
            # blob version cache. This can occur if the blob cache expired
            # between retrieving the cached data version and cached blob.
            # (Because the blob version cache ttl should be shorter than the
            # blob cache ttl, if the blob cache expired prior to retrieving the
            # data version, the blob version cache would've expired as well.
            # If we hit one of these cases, we should bring the blob version
            # cache up to date since we have it.
            if get_data_version(cached_blob["data_version"]) > get_data_version(data_version):
                cache.put("blob_version", name, data_version)
            blob = cached_blob["blob"]

        return blob
1✔
2165

2166
    def insert(self, changed_by, transaction=None, dryrun=False, signoffs=None, **columns):
        """Insert a new release after blob validation, permission, and signoff
        checks; primes the blob caches and returns the new primary key
        (except in dry runs).
        """
        if "name" not in columns or "product" not in columns or "data" not in columns:
            raise ValueError("name, product, and data are all required")

        blob = columns["data"]

        # The blob may only reference domains on the configured allowlist.
        blob.validate(columns["product"], self.domainAllowlist)
        if columns["name"] != blob["name"]:
            raise ValueError("name in database (%s) does not match name in blob (%s)" % (columns["name"], blob["name"]))

        if not self.db.hasPermission(changed_by, "release", "create", columns["product"], transaction):
            raise PermissionDeniedError("%s is not allowed to create releases for product %s" % (changed_by, columns["product"]))

        if not dryrun:
            potential_required_signoffs = [obj for v in self.getPotentialRequiredSignoffs([columns], transaction=transaction).values() for obj in v]
            verify_signoffs(potential_required_signoffs, signoffs)

        ret = super(Releases, self).insert(changed_by=changed_by, transaction=transaction, dryrun=dryrun, **columns)
        if not dryrun:
            # Prime the caches so the first read doesn't hit the database.
            cache.put("blob", columns["name"], {"data_version": 1, "blob": blob})
            cache.put("blob_version", columns["name"], 1)
            return ret.inserted_primary_key[0]
1✔
2188

2189
    def update(self, where, what, changed_by, old_data_version, transaction=None, dryrun=False, signoffs=None):
1✔
2190
        blob = what.get("data")
1✔
2191

2192
        current_releases = self.select(where=where, columns=[self.name, self.product, self.read_only], transaction=transaction)
1✔
2193
        for current_release in current_releases:
1✔
2194
            name = current_release["name"]
1✔
2195
            is_readonly_change = "read_only" in what and current_release["read_only"] != what["read_only"]
1✔
2196

2197
            if not is_readonly_change:
1✔
2198
                if "product" in what or "data" in what:
1!
2199
                    self._proceedIfNotReadOnly(current_release["name"], transaction=transaction)
1✔
2200

2201
                if blob:
1✔
2202
                    blob.validate(what.get("product", current_release["product"]), self.domainAllowlist)
1✔
2203
                    name = what.get("name", name)
1✔
2204
                    if name != blob["name"]:
1✔
2205
                        raise ValueError("name in database (%s) does not match name in blob (%s)" % (name, blob.get("name")))
1✔
2206

2207
                if not self.db.hasPermission(changed_by, "release", "modify", current_release["product"], transaction):
1✔
2208
                    raise PermissionDeniedError("%s is not allowed to modify releases for product %s" % (changed_by, current_release["product"]))
1✔
2209

2210
                if "product" in what:
1✔
2211
                    # If the product is being changed, we need to make sure the user
2212
                    # has permission to modify releases of that product, too.
2213
                    if not self.db.hasPermission(changed_by, "release", "modify", what["product"], transaction):
1✔
2214
                        raise PermissionDeniedError("%s is not allowed to modify releases for product %s" % (changed_by, what["product"]))
1✔
2215

2216
                new_release = current_release.copy()
1✔
2217
                new_release.update(what)
1✔
2218
                if not dryrun:
1✔
2219
                    potential_required_signoffs = [
1✔
2220
                        obj for v in self.getPotentialRequiredSignoffs([current_release, new_release], transaction=transaction).values() for obj in v
2221
                    ]
2222
                    verify_signoffs(potential_required_signoffs, signoffs)
1✔
2223
            else:
2224
                self.validate_readonly_change(
1✔
2225
                    where, what["read_only"], changed_by, release=current_release, transaction=transaction, dryrun=dryrun, signoffs=signoffs
2226
                )
2227

2228
        for release in current_releases:
1✔
2229
            name = current_release["name"]
1✔
2230
            new_data_version = old_data_version + 1
1✔
2231
            try:
1✔
2232
                super(Releases, self).update(
1✔
2233
                    where={"name": name}, what=what, changed_by=changed_by, old_data_version=old_data_version, transaction=transaction, dryrun=dryrun
2234
                )
2235
            except OutdatedDataError as e:
1✔
2236
                self.log.warning("Trying to merge update to release %s at data_version %s with the latest version.", name, old_data_version)
1✔
2237
                if blob is not None:
1!
2238
                    ancestor_change = self.history.getChange(data_version=old_data_version, column_values={"name": name}, transaction=transaction)
1✔
2239
                    # if we have no historical information about the ancestor blob
2240
                    if ancestor_change is None:
1!
2241
                        self.log.exception("Couldn't find history for release %s at data_version %s", name, old_data_version)
×
2242
                        raise
×
2243
                    ancestor_blob = ancestor_change.get("data")
1✔
2244
                    tip_release = self.getReleases(name=name, transaction=transaction)[0]
1✔
2245
                    tip_blob = tip_release.get("data")
1✔
2246
                    try:
1✔
2247
                        what["data"] = createBlob(merge_dicts(ancestor_blob, tip_blob, blob))
1✔
2248
                        self.log.warning("Successfully merged release %s at data_version %s with the latest version.", name, old_data_version)
1✔
2249
                        # ancestor_change is checked for None a few lines up
2250
                        self.log.warning(
1✔
2251
                            "ancestor_change is change_id %s, data_version %s", ancestor_change.get("change_id"), ancestor_change.get("data_version")
2252
                        )
2253
                        self.log.warning("tip release is data_version %s", tip_release.get("data_version"))
1✔
2254
                    except ValueError:
1✔
2255
                        self.log.exception("Couldn't merge release %s at data_version %s with the latest version.", name, old_data_version)
1✔
2256
                        # ancestor_change is checked for None a few lines up
2257
                        self.log.warning(
1✔
2258
                            "ancestor_change is change_id %s, data_version %s", ancestor_change.get("change_id"), ancestor_change.get("data_version")
2259
                        )
2260
                        self.log.warning("tip release is data_version %s", tip_release.get("data_version"))
1✔
2261
                        raise e
1✔
2262
                    # we want the data_version for the dictdiffer.merged blob to be one
2263
                    # more than that of the latest blob
2264
                    tip_data_version = tip_release["data_version"]
1✔
2265
                    super(Releases, self).update(
1✔
2266
                        where={"name": name}, what=what, changed_by=changed_by, old_data_version=tip_data_version, transaction=transaction, dryrun=dryrun
2267
                    )
2268
                    # cache will have a data_version of one plus the tip
2269
                    # data_version
2270
                    new_data_version = tip_data_version + 1
1✔
2271

2272
            if not dryrun:
1✔
2273
                cache.put("blob", name, {"data_version": new_data_version, "blob": blob})
1✔
2274
                cache.put("blob_version", name, new_data_version)
1✔
2275

2276
    def addLocaleToRelease(self, name, product, platform, locale, data, old_data_version, changed_by, transaction=None, alias=None):
        """Adds or updates the existing data for a specific platform + locale
        combination, in the release identified by 'name'. The data is
        validated before committing it, and a ValueError is raised if it is
        invalid.

        Raises ReadOnlyError if the release is read only, and
        PermissionDeniedError if 'changed_by' lacks the "release_locale"
        "modify" permission for the release's product. On success the blob
        caches are refreshed with the new data version.
        """
        self._proceedIfNotReadOnly(name, transaction=transaction)

        where = [self.name == name]
        # NOTE(review): the 'product' argument is immediately shadowed by the
        # product stored on the database row — the caller-supplied value is
        # never used for the permission check or validation.
        product = self.select(where=where, columns=[self.product], transaction=transaction)[0]["product"]
        if not self.db.hasPermission(changed_by, "release_locale", "modify", product, transaction):
            raise PermissionDeniedError("%s is not allowed to add builds for product %s" % (changed_by, product))

        releaseBlob = self.getReleaseBlob(name, transaction=transaction)
        if "platforms" not in releaseBlob:
            releaseBlob["platforms"] = {}

        if platform in releaseBlob["platforms"]:
            # If the platform we're given is aliased to another one, we need
            # to resolve that before doing any updating. If we don't, the data
            # will go into an aliased platform and be ignored!
            platform = releaseBlob.getResolvedPlatform(platform)

        if platform not in releaseBlob["platforms"]:
            releaseBlob["platforms"][platform] = {}

        if "locales" not in releaseBlob["platforms"][platform]:
            releaseBlob["platforms"][platform]["locales"] = {}

        releaseBlob["platforms"][platform]["locales"][locale] = data

        # we don't allow modification of existing platforms (aliased or not)
        if alias:
            for a in alias:
                if a not in releaseBlob["platforms"]:
                    releaseBlob["platforms"][a] = {"alias": platform}

        releaseBlob.validate(product, self.domainAllowlist)
        what = dict(data=releaseBlob)

        self.update(where=where, what=what, changed_by=changed_by, old_data_version=old_data_version, transaction=transaction)
        new_data_version = old_data_version + 1
        # Refresh the caches so subsequent reads see the merged blob without a DB hit.
        cache.put("blob", name, {"data_version": new_data_version, "blob": releaseBlob})
        cache.put("blob_version", name, new_data_version)
2320

2321
    def getLocale(self, name, platform, locale, transaction=None):
        """Return the locale-level data for (name, platform, locale).

        Raises a KeyError with a descriptive message when any piece of the
        path is missing from the release blob.
        """
        try:
            platforms = self.getReleaseBlob(name, transaction=transaction)["platforms"]
            return platforms[platform]["locales"][locale]
        except KeyError:
            raise KeyError("Couldn't find locale identified by: %s, %s, %s" % (name, platform, locale))
2327

2328
    def localeExists(self, name, platform, locale, transaction=None):
        """Return True iff the (name, platform, locale) combination exists."""
        exists = True
        try:
            self.getLocale(name, platform, locale, transaction)
        except KeyError:
            exists = False
        return exists
2334

2335
    def isMappedTo(self, name, transaction=None):
        """Return True iff any rule references release `name` via its
        mapping or fallbackMapping column."""
        rules = self.db.rules
        n_mapping = rules.count(where=[rules.mapping == name], transaction=transaction)
        n_fallback = rules.count(where=[rules.fallbackMapping == name], transaction=transaction)
        return bool(n_mapping or n_fallback)
2339

2340
    def delete(self, where, changed_by, old_data_version, transaction=None, dryrun=False, signoffs=None):
        """Delete a single release after verifying it is safe to do so.

        The where clause must match exactly one release. Deletion is refused
        (ValueError) when any rule still maps or falls back to the release,
        refused (ReadOnlyError) when it is read only, and refused
        (PermissionDeniedError) when 'changed_by' lacks the "release"
        "delete" permission for its product. Unless this is a dry run,
        required signoffs are verified before deleting and the blob caches
        are invalidated afterwards.
        """
        release = self.select(where=where, columns=[self.name, self.product], transaction=transaction)
        if len(release) != 1:
            raise ValueError("Where clause must match exactly one release to delete.")
        release = release[0]

        if self.isMappedTo(release["name"], transaction):
            msg = "%s has rules pointing to it. Hence it cannot be deleted." % (release["name"])
            raise ValueError(msg)

        self._proceedIfNotReadOnly(release["name"], transaction=transaction)
        if not self.db.hasPermission(changed_by, "release", "delete", release["product"], transaction):
            raise PermissionDeniedError("%s is not allowed to delete releases for product %s" % (changed_by, release["product"]))

        if not dryrun:
            potential_required_signoffs = [obj for v in self.getPotentialRequiredSignoffs([release], transaction=transaction).values() for obj in v]
            verify_signoffs(potential_required_signoffs, signoffs)

        super(Releases, self).delete(where=where, changed_by=changed_by, old_data_version=old_data_version, transaction=transaction, dryrun=dryrun)
        if not dryrun:
            # Drop stale cache entries now that the release is gone.
            cache.invalidate("blob", release["name"])
            cache.invalidate("blob_version", release["name"])
2362

2363
    def isReadOnly(self, name, limit=None, transaction=None):
        """Return the read_only flag of the release named `name`."""
        rows = self.select(where=[self.name == name], columns=[self.read_only], limit=limit, transaction=transaction)
        return rows[0]["read_only"]
2368

2369
    def _proceedIfNotReadOnly(self, name, limit=None, transaction=None):
        """Raise ReadOnlyError unless release `name` is writable."""
        read_only = self.isReadOnly(name, limit, transaction)
        if not read_only:
            return
        raise ReadOnlyError("Release '%s' is read only" % name)
2372

2373
    def validate_readonly_change(self, where, new_readonly_state, changed_by, release=None, transaction=None, dryrun=False, signoffs=None):
        """Validate that 'changed_by' may flip a release's read_only flag.

        Checks the "release_read_only" set/unset permission as appropriate,
        and when the release is being made writable again (read-only ->
        read-write), verifies that the required signoffs have been collected.

        @param release: optional pre-fetched release row containing name,
                        product and read_only; looked up via 'where' when
                        omitted.
        """
        if not release:
            release = self.select(where=where, columns=[self.name, self.product, self.read_only], transaction=transaction)[0]

        product = release["product"]

        if new_readonly_state:
            if not self.db.hasPermission(changed_by, "release_read_only", "set", product, transaction):
                raise PermissionDeniedError(f"{changed_by} is not allowed to mark {product} products read only")
        else:
            if not self.db.hasPermission(changed_by, "release_read_only", "unset", product, transaction):
                raise PermissionDeniedError(f"{changed_by} is not allowed to mark {product} products read write")

        # if release is moving from ro->rw
        if not dryrun and release["read_only"] and not new_readonly_state:

            def _map_required_signoffs(required_signoffs):
                # Flatten a collection of signoff lists into one flat list.
                return [obj for v in required_signoffs for obj in v]

            # If release is associated with a rule, get the required signoffs for the rule's product/channel.
            potential_required_signoffs = _map_required_signoffs(self.getPotentialRequiredSignoffs([release], transaction=transaction).values())

            # If no required signoffs is found, get the required signoffs considering all products/channels for the given release product.
            if not potential_required_signoffs:
                potential_required_signoffs = _map_required_signoffs(
                    self.getPotentialRequiredSignoffsForProduct(release["product"], transaction=transaction).values()
                )
            verify_signoffs(potential_required_signoffs, signoffs)
2401

2402
    def change_readonly(self, where, is_readonly, changed_by, old_data_version, transaction=None):
        """Set the read_only flag after validating permissions and signoffs."""
        self.validate_readonly_change(where, is_readonly, changed_by, transaction=transaction)
        what = {"read_only": is_readonly}
        super().update(where, what, changed_by=changed_by, old_data_version=old_data_version, transaction=transaction)
2405

2406

2407
class ReleasesJSON(AUSTable):
    """Releases stored as plain JSON rows (name, product, read_only, data),
    with scheduled-change support and (bucket-backed) history."""

    def __init__(self, db, metadata, dialect, history_buckets, historyClass):
        # No domain restrictions are enforced at this layer.
        self.domainAllowlist = []

        self.table = Table(
            "releases_json",
            metadata,
            Column("name", String(100), primary_key=True),
            Column("product", String(15), nullable=False),
            Column("read_only", Boolean, default=False),
            Column("data", JSON),
        )
        historyKwargs = {}
        if history_buckets:
            historyKwargs["buckets"] = history_buckets
            historyKwargs["identifier_columns"] = ["name"]
            historyKwargs["data_column"] = "data"
        else:
            # Can't have history without a bucket
            historyClass = None
        super(ReleasesJSON, self).__init__(
            db,
            dialect,
            scheduled_changes=True,
            scheduled_changes_kwargs={"conditions": ["time"]},
            historyClass=historyClass,
            historyKwargs=historyKwargs,
        )

    def getPotentialRequiredSignoffs(self, affected_rows, transaction=None):
        """Map each affected release name to the signoffs required by any
        rule whose mapping or fallbackMapping points at it."""
        potential_required_signoffs = defaultdict(list)

        for release in affected_rows:
            # Find every rule that references this release, along with its
            # product/channel, which determine the required signoffs.
            stmt = select([self.db.rules.rule_id, self.db.rules.product, self.db.rules.channel]).where(
                ((self.db.releases_json.name == self.db.rules.mapping) | (self.db.releases_json.name == self.db.rules.fallbackMapping))
                & (self.db.releases_json.name == release["name"])
            )

            if transaction:
                rule_info = transaction.execute(stmt).fetchall()
            else:
                rule_info = self.getEngine().execute(stmt).fetchall()

            rule_required_signoffs = self.db.rules.getPotentialRequiredSignoffs([dict(r) for r in rule_info], transaction)

            for rule in rule_info:
                rs = rule_required_signoffs[(rule["product"], rule["channel"])]
                potential_required_signoffs[release["name"]].extend(rs)

        return potential_required_signoffs

    def getPotentialRequiredSignoffsForProduct(self, product, transaction=None):
        """Return required signoffs for `product` across all channels,
        keeping only the strictest (highest signoffs_required) entry per role."""
        potential_required_signoffs = {"rs": []}
        where = [self.db.productRequiredSignoffs.product == product]
        product_rs = self.db.productRequiredSignoffs.select(where=where, transaction=transaction)
        if product_rs:
            role_map = defaultdict(list)
            for rs in product_rs:
                role_map[rs["role"]].append(rs)
            signoffs_required = [max(signoffs, default=None, key=lambda k: k["signoffs_required"]) for signoffs in role_map.values()]
            potential_required_signoffs["rs"] = signoffs_required
        return potential_required_signoffs

    async def async_insert(self, changed_by, transaction=None, dryrun=False, signoffs=None, **columns):
        """Insert a release row, verifying required signoffs first (unless dryrun)."""
        if not dryrun:
            potential_required_signoffs = [obj for v in self.getPotentialRequiredSignoffs([columns], transaction=transaction).values() for obj in v]
            verify_signoffs(potential_required_signoffs, signoffs)

        return await super(ReleasesJSON, self).async_insert(changed_by=changed_by, transaction=transaction, dryrun=dryrun, **columns)

    async def async_update(self, where, what, changed_by, old_data_version, transaction=None, dryrun=False, signoffs=None):
        """Update matching release rows, verifying signoffs when the data
        changes or the release moves from read-only to read-write."""
        for row in self.select(where=where, transaction=transaction):
            new_row = row.copy()
            new_row.update(what)
            is_readonly_change = row["data"] == new_row["data"] and "read_only" in what and row["read_only"] != what["read_only"]

            # Only do signoff checks when the data is being changed, or we're moving from read-only to read-write
            if not is_readonly_change or what["read_only"] is False:
                if not dryrun:
                    potential_required_signoffs = [
                        obj for v in self.getPotentialRequiredSignoffs([row, new_row], transaction=transaction).values() for obj in v
                    ]
                    verify_signoffs(potential_required_signoffs, signoffs)

        return await super(ReleasesJSON, self).async_update(
            where=where, what=what, changed_by=changed_by, old_data_version=old_data_version, transaction=transaction, dryrun=dryrun
        )

    async def async_delete(self, where, changed_by=None, old_data_version=None, transaction=None, dryrun=False, signoffs=None):
        """Delete matching release rows, verifying required signoffs first (unless dryrun)."""
        if not dryrun:
            for row in self.select(where=where, transaction=transaction):
                potential_required_signoffs = [obj for v in self.getPotentialRequiredSignoffs([row], transaction=transaction).values() for obj in v]
                verify_signoffs(potential_required_signoffs, signoffs)

        return await super(ReleasesJSON, self).async_delete(
            where=where, changed_by=changed_by, old_data_version=old_data_version, transaction=transaction, dryrun=dryrun
        )
2504

2505

2506
class ReleaseAssets(AUSTable):
    """Per-release asset rows keyed by (name, path), each holding a JSON
    data blob; supports scheduled changes and (bucket-backed) history."""

    def __init__(self, db, metadata, dialect, history_buckets, historyClass):
        self.table = Table(
            "release_assets",
            metadata,
            Column("name", String(100), primary_key=True),
            Column("path", String(200), primary_key=True),
            Column("data", JSON),
        )
        historyKwargs = {}
        if history_buckets:
            historyKwargs["buckets"] = history_buckets
            historyKwargs["identifier_columns"] = ["name", "path"]
            historyKwargs["data_column"] = "data"
        else:
            # Can't have history without a bucket
            historyClass = None

        super(ReleaseAssets, self).__init__(
            db, dialect, scheduled_changes=True, scheduled_changes_kwargs={"conditions": ["time"]}, historyClass=historyClass, historyKwargs=historyKwargs
        )

    def getPotentialRequiredSignoffs(self, affected_rows, transaction=None):
        """Map each affected (name, path) pair to the signoffs required by
        any rule whose mapping or fallbackMapping points at the release."""
        potential_required_signoffs = defaultdict(list)

        for release in affected_rows:
            stmt = select([self.db.rules.rule_id, self.db.rules.product, self.db.rules.channel]).where(
                ((self.db.release_assets.name == self.db.rules.mapping) | (self.db.release_assets.name == self.db.rules.fallbackMapping))
                & (self.db.release_assets.name == release["name"])
                & (self.db.release_assets.path == release["path"])
            )

            if transaction:
                rule_info = transaction.execute(stmt).fetchall()
            else:
                rule_info = self.getEngine().execute(stmt).fetchall()

            rule_required_signoffs = self.db.rules.getPotentialRequiredSignoffs([dict(r) for r in rule_info], transaction)

            for rule in rule_info:
                rs = rule_required_signoffs[(rule["product"], rule["channel"])]
                potential_required_signoffs[(release["name"], release["path"])].extend(rs)

        return potential_required_signoffs

    async def async_insert(self, changed_by, transaction=None, dryrun=False, signoffs=None, **columns):
        """Insert an asset row, verifying required signoffs first (unless dryrun)."""
        if not dryrun:
            potential_required_signoffs = [obj for v in self.getPotentialRequiredSignoffs([columns], transaction=transaction).values() for obj in v]
            verify_signoffs(potential_required_signoffs, signoffs)

        return await super(ReleaseAssets, self).async_insert(changed_by=changed_by, transaction=transaction, dryrun=dryrun, **columns)

    async def async_update(self, where, what, changed_by, old_data_version, transaction=None, dryrun=False, signoffs=None):
        """Update matching asset rows, verifying required signoffs first (unless dryrun)."""
        for row in self.select(where=where, transaction=transaction):
            new_row = row.copy()
            new_row.update(what)

            if not dryrun:
                potential_required_signoffs = [obj for v in self.getPotentialRequiredSignoffs([row, new_row], transaction=transaction).values() for obj in v]
                verify_signoffs(potential_required_signoffs, signoffs)

        return await super(ReleaseAssets, self).async_update(
            where=where, what=what, changed_by=changed_by, old_data_version=old_data_version, transaction=transaction, dryrun=dryrun
        )

    async def async_delete(self, where, changed_by=None, old_data_version=None, transaction=None, dryrun=False, signoffs=None):
        """Delete matching asset rows, verifying required signoffs first (unless dryrun)."""
        if not dryrun:
            for row in self.select(where=where, transaction=transaction):
                potential_required_signoffs = [obj for v in self.getPotentialRequiredSignoffs([row], transaction=transaction).values() for obj in v]
                verify_signoffs(potential_required_signoffs, signoffs)

        return await super(ReleaseAssets, self).async_delete(
            where=where, changed_by=changed_by, old_data_version=old_data_version, transaction=transaction, dryrun=dryrun
        )
2580

2581

2582
class UserRoles(AUSTable):
    """Association table mapping usernames to the roles they hold.

    Rows are only ever granted (inserted) or revoked (deleted); in-place
    modification is explicitly forbidden.
    """

    def __init__(self, db, metadata, dialect):
        columns = [
            Column("username", String(100), primary_key=True),
            Column("role", String(50), primary_key=True),
        ]
        self.table = Table("user_roles", metadata, *columns)
        super(UserRoles, self).__init__(db, dialect, historyClass=HistoryTable)

    def update(self, where, what, changed_by, old_data_version, transaction=None, dryrun=False):
        """Always raises: role rows are immutable once granted."""
        raise AttributeError("User roles cannot be modified (only granted and revoked)")
2589

2590

2591
class Permissions(AUSTable):
1✔
2592
    """allPermissions defines the structure and possible options for all
2593
    available permissions. Permissions can be limited to specific types
2594
    of actions. Eg: granting the "rule" permission with "actions" set to
2595
    ["create"] allows rules to be created but not modified or deleted.
2596
    Permissions that relate to rules or releases can be further limited
2597
    by product. Eg: granting the "release" permission with "products" set
2598
    to ["GMP"] allows the user to modify GMP releases, but not Firefox."""
2599

2600
    allPermissions = {
1✔
2601
        "admin": ["products"],
2602
        "emergency_shutoff": ["actions", "products"],
2603
        "release": ["actions", "products"],
2604
        "release_locale": ["actions", "products"],
2605
        "release_read_only": ["actions", "products"],
2606
        "rule": ["actions", "products"],
2607
        "pinnable_release": ["products"],
2608
        "permission": ["actions"],
2609
        "required_signoff": ["products"],
2610
        "scheduled_change": ["actions"],
2611
    }
2612

2613
    def __init__(self, db, metadata, dialect):
        # options is a JSON column holding the per-permission restrictions
        # (e.g. {"products": [...], "actions": [...]}).
        self.table = Table(
            "permissions",
            metadata,
            Column("permission", String(50), primary_key=True),
            Column("username", String(100), primary_key=True),
            Column("options", JSONColumn),
        )
        # Roles live in their own table but are managed through this one.
        self.user_roles = UserRoles(db, metadata, dialect)
        AUSTable.__init__(self, db, dialect, scheduled_changes=True, scheduled_changes_kwargs={"conditions": ["time"]}, historyClass=HistoryTable)
2623

2624
    def getPotentialRequiredSignoffs(self, affected_rows, transaction=None):
        """Collect the permissions-required-signoffs relevant to the given
        permission rows, keyed under "rs".

        When a row restricts itself to specific products, only those
        products' signoff requirements apply; otherwise every configured
        permissions signoff requirement applies.
        """
        potential_required_signoffs = {"rs": []}
        for row in affected_rows:
            if not row:
                continue
            # XXX: This kindof sucks because it means that we don't have great control
            # over the signoffs required permissions that don't specify products, or
            # don't support them.
            if "products" in self.allPermissions[row["permission"]] and row.get("options") and row["options"].get("products"):
                for product in row["options"]["products"]:
                    potential_required_signoffs["rs"].extend(self.db.permissionsRequiredSignoffs.select(where={"product": product}, transaction=transaction))
            else:
                potential_required_signoffs["rs"].extend(self.db.permissionsRequiredSignoffs.select(transaction=transaction))
        return potential_required_signoffs
2638

2639
    def assertPermissionExists(self, permission):
        """Raise ValueError if `permission` is not a known permission name."""
        if permission in self.allPermissions:
            return
        raise ValueError('Unknown permission "%s"' % permission)

2643
    def assertOptionsExist(self, permission, options):
        """Raise ValueError for the first option that is not valid for `permission`."""
        unknown = [o for o in options if o not in self.allPermissions[permission]]
        if unknown:
            raise ValueError('Unknown option "%s" for permission "%s"' % (unknown[0], permission))
2647

2648
    def getAllUsers(self, transaction=None):
        """Return a dict mapping every username that holds at least one
        permission to a {"roles": [...]} entry listing their granted roles
        (each role row includes its data_version).
        """
        res_users = self.select(columns=[self.username], distinct=True, transaction=transaction)
        users = {}
        # Fix: the old code wrapped the username comprehension in a redundant
        # list() call and built an intermediate list; iterate the rows directly.
        for r in res_users:
            user = r["username"]
            res_roles = self.user_roles.select(
                where=[self.user_roles.username == user], columns=[self.user_roles.role, self.user_roles.data_version], transaction=transaction
            )
            users[user] = {"roles": res_roles}
        return users
2658

2659
    def getAllPermissions(self, retrieving_as, transaction=None):
        """Return all permissions grouped by username, as
        {username: {permission: {"options": ..., "data_version": ...}}}.

        Raises PermissionDeniedError unless `retrieving_as` may view other
        users' permissions.
        """
        if not self.hasPermission(retrieving_as, "permission", "view", transaction=transaction):
            raise PermissionDeniedError("You are not authorized to view permissions of other users.")

        permissions_by_user = defaultdict(dict)
        for row in self.select(transaction=transaction):
            entry = {
                "options": row["options"],
                "data_version": row["data_version"],
            }
            permissions_by_user[row["username"]][row["permission"]] = entry
        return permissions_by_user
2670

2671
    def countAllUsers(self, transaction=None):
        """Return the number of distinct usernames holding any permission."""
        distinct_users = self.select(columns=[self.username], distinct=True, transaction=transaction)
        return len(distinct_users)
2674

2675
    def insert(self, changed_by, transaction=None, dryrun=False, signoffs=None, **columns):
        """Grant a permission to a user.

        Requires 'permission' and 'username' in columns; validates the
        permission name and any options, checks that 'changed_by' may create
        permissions, and verifies required signoffs unless this is a dry run.
        """
        if "permission" not in columns or "username" not in columns:
            raise ValueError("permission and username are required")

        self.assertPermissionExists(columns["permission"])
        if columns.get("options"):
            self.assertOptionsExist(columns["permission"], columns["options"])

        if not self.db.hasPermission(changed_by, "permission", "create", transaction=transaction):
            raise PermissionDeniedError("%s is not allowed to grant permissions" % changed_by)

        if not dryrun:
            potential_required_signoffs = [obj for v in self.getPotentialRequiredSignoffs([columns], transaction=transaction).values() for obj in v]
            verify_signoffs(potential_required_signoffs, signoffs)

        self.log.debug("granting %s to %s with options %s", columns["permission"], columns["username"], columns.get("options"))
        super(Permissions, self).insert(changed_by=changed_by, transaction=transaction, dryrun=dryrun, **columns)
        # NOTE(review): the usernames cache is invalidated even on dryrun —
        # harmless (it only forces a re-read), but presumably unintentional.
        cache.invalidate("users", "usernames")
        self.log.debug("successfully granted %s to %s with options %s", columns["permission"], columns["username"], columns.get("options"))
2694

2695
    def grantRole(self, username, role, changed_by, transaction=None):
        """Grant `role` to `username`.

        Raises PermissionDeniedError unless 'changed_by' may create
        permissions, and ValueError if the target user holds no permissions
        (roles may only be attached to known users).
        """
        if not self.hasPermission(changed_by, "permission", "create", transaction=transaction):
            raise PermissionDeniedError("%s is not allowed to grant user roles" % changed_by)

        if len(self.getUserPermissions(username, changed_by, transaction)) < 1:
            raise ValueError("Cannot grant a role to a user without any permissions")

        # Lazy %-style args: the message is only formatted when DEBUG logging
        # is enabled (previously built eagerly with str.format()).
        self.log.debug("granting %s role to %s", role, username)
        return self.user_roles.insert(changed_by, transaction, username=username, role=role)
2704

2705
    def update(self, where, what, changed_by, old_data_version, transaction=None, dryrun=False, signoffs=None):
        """Modify existing permission rows.

        Validates any new permission name and options, checks that
        'changed_by' may modify permissions, and verifies required signoffs
        for each affected row unless this is a dry run.
        """
        if "permission" in what:
            self.assertPermissionExists(what["permission"])

        for current_permission in self.select(where=where, transaction=transaction):
            if what.get("options"):
                # Options must be valid for the (possibly new) permission name.
                self.assertOptionsExist(what.get("permission", current_permission["permission"]), what["options"])

            if not self.db.hasPermission(changed_by, "permission", "modify", transaction=transaction):
                raise PermissionDeniedError("%s is not allowed to modify permissions" % changed_by)

            new_permission = current_permission.copy()
            new_permission.update(what)
            if not dryrun:
                # Signoffs must cover both the old and the new state of the row.
                potential_required_signoffs = [
                    obj for v in self.getPotentialRequiredSignoffs([current_permission, new_permission], transaction=transaction).values() for obj in v
                ]
                verify_signoffs(potential_required_signoffs, signoffs)

        super(Permissions, self).update(
            where=where, what=what, changed_by=changed_by, old_data_version=old_data_version, transaction=transaction, dryrun=dryrun
        )
2727

2728
    def delete(self, where, changed_by=None, old_data_version=None, transaction=None, dryrun=False, signoffs=None):
        """Revoke matching permissions.

        Checks that 'changed_by' may delete permissions and verifies required
        signoffs per affected row (unless dryrun). After deleting, any user
        left with zero permissions also has all of their roles revoked.
        """
        if not self.db.hasPermission(changed_by, "permission", "delete", transaction=transaction):
            # Fix: the message was previously passed as a second exception
            # argument ("%s ...", changed_by) and never interpolated.
            raise PermissionDeniedError("%s is not allowed to revoke permissions" % changed_by)

        usernames = set()
        for current_permission in self.select(where=where, transaction=transaction):
            usernames.add(current_permission["username"])
            if not dryrun:
                potential_required_signoffs = [
                    obj for v in self.getPotentialRequiredSignoffs([current_permission], transaction=transaction).values() for obj in v
                ]
                verify_signoffs(potential_required_signoffs, signoffs)

        if not dryrun:
            super(Permissions, self).delete(changed_by=changed_by, where=where, old_data_version=old_data_version, transaction=transaction)

            # A user with no permissions may not hold roles; revoke any leftovers.
            for u in usernames:
                if len(self.getUserPermissions(u, changed_by, transaction)) == 0:
                    for role in self.user_roles.select([self.user_roles.username == u], transaction=transaction):
                        self.revokeRole(u, role["role"], changed_by=changed_by, old_data_version=role["data_version"], transaction=transaction)

        cache.invalidate("users", "usernames")
2750

2751
    def revokeRole(self, username, role, changed_by=None, old_data_version=None, transaction=None):
        """Revoke `role` from `username`.

        Refuses (ValueError) when removing this holder would leave fewer
        users with the role than any Required Signoff configuration demands.
        """
        if not self.hasPermission(changed_by, "permission", "delete", transaction=transaction):
            # Fix: the message was previously passed as a second exception
            # argument ("%s ...", changed_by) and never interpolated.
            raise PermissionDeniedError("%s is not allowed to revoke user roles" % changed_by)

        role_signoffs = self.db.permissionsRequiredSignoffs.select(where={"role": role}, transaction=transaction)
        role_signoffs += self.db.productRequiredSignoffs.select(where={"role": role}, transaction=transaction)
        if role_signoffs:
            # The strictest configuration must still be satisfiable after removal.
            required = max(rs["signoffs_required"] for rs in role_signoffs)
            users_with_role = len(self.user_roles.select(where={"role": role}, transaction=transaction))
            if required > (users_with_role - 1):
                raise ValueError("Revoking {} role would make it impossible for Required Signoffs to be fulfilled".format(role))

        return self.user_roles.delete({"username": username, "role": role}, changed_by=changed_by, old_data_version=old_data_version, transaction=transaction)
2764

2765
    def getPermission(self, username, permission, transaction=None):
        """Return the row for ``username``'s ``permission``, or {} if absent."""
        rows = self.select(where=[self.username == username, self.permission == permission], transaction=transaction)
        if rows:
            return rows[0]
        return {}
2770

2771
    def getUserPermissions(self, username, retrieving_as, transaction=None):
        """Return ``username``'s permissions as a dict keyed by permission name,
        each value holding that row's ``options`` and ``data_version``.

        Raises:
            PermissionDeniedError: if ``retrieving_as`` asks about another user
                without holding the ``permission``/``view`` permission.
        """
        # Looking up someone else's permissions requires explicit "view"
        # access; letting anyone do it would make privilege escalation
        # attacks easier.
        if username != retrieving_as and not self.hasPermission(retrieving_as, "permission", "view", transaction=transaction):
            raise PermissionDeniedError("You are not authorized to view permissions of other users.")

        rows = self.select(columns=[self.permission, self.options, self.data_version], where=[self.username == username], transaction=transaction)
        return {row["permission"]: {"options": row["options"], "data_version": row["data_version"]} for row in rows}
2784

2785
    def getOptions(self, username, permission, transaction=None):
        """Return the ``options`` of ``username``'s ``permission``.

        Raises:
            ValueError: if the user does not hold that permission.
        """
        rows = self.select(columns=[self.options], where=[self.username == username, self.permission == permission], transaction=transaction)
        if not rows:
            raise ValueError('Permission "%s" doesn\'t exist' % permission)
        return rows[0]["options"]
2791

2792
    def getUserRoles(self, username, transaction=None):
        """Return the user's roles as a list of {"role", "data_version"} dicts."""
        rows = self.user_roles.select(
            where=[self.user_roles.username == username], columns=[self.user_roles.role, self.user_roles.data_version], distinct=True, transaction=transaction
        )
        roles = []
        for row in rows:
            roles.append({"role": row["role"], "data_version": row["data_version"]})
        return roles
2797

2798
    def isAdmin(self, username, transaction=None):
        """Return True iff ``username`` holds the "admin" permission."""
        admin_row = self.getPermission(username, "admin", transaction)
        return bool(admin_row)
2800

2801
    def hasPermission(self, username, thing, action, product=None, transaction=None):
        """Return True iff ``username`` may perform ``action`` on ``thing``
        (optionally scoped to ``product``).

        An "admin" permission grants everything, optionally limited to the
        products listed in its options. Otherwise the specific permission is
        consulted: options without "actions"/"products" place no limit on the
        corresponding dimension.
        """

        def allowed_for_product(options):
            # No options, or no "products" restriction, means every product
            # is permitted.
            return not (options and options.get("products") and product not in options["products"])

        admin_perm = self.getPermission(username, "admin", transaction=transaction)
        if admin_perm:
            # Product-scoped admin: an admin permission listing products only
            # covers those; one without options covers all products.
            return allowed_for_product(admin_perm["options"])

        perm = self.getPermission(username, thing, transaction=transaction)
        if not perm:
            return False

        options = perm["options"]
        if options:
            # A permission that doesn't explicitly list actions allows any
            # action; same for products.
            if options.get("actions") and action not in options["actions"]:
                return False
            if not allowed_for_product(options):
                return False
        return True
2827

2828
    def hasRole(self, username, role, transaction=None):
        """Return True iff ``username`` currently holds ``role``."""
        for entry in self.getUserRoles(username, transaction):
            if entry["role"] == role:
                return True
        return False
2831

2832
    def isKnownUser(self, username):
        """Return True iff ``username`` appears in any permission row.

        The distinct username list is served from the "users"/"usernames"
        cache; it is refreshed via the value_getter when the cache misses.
        """
        if not username:
            return False

        column = "username"

        def fetch_usernames():
            return [row[column] for row in self.select(columns=[column], distinct=True)]

        known_usernames = cache.get("users", "usernames", value_getter=fetch_usernames)
        return username in known_usernames
2844

2845

2846
class Dockerflow(AUSTable):
    """Single-row, unversioned table holding a "watchdog" counter that is
    bumped by health checks. No history is kept for it."""

    def __init__(self, db, metadata, dialect):
        self.table = Table("dockerflow", metadata, Column("watchdog", Integer, nullable=False))
        # Operational heartbeat state only: no history, no data versioning.
        AUSTable.__init__(self, db, dialect, historyClass=None, versioned=False)

    def getDockerflowEntry(self, transaction=None):
        """Return the single dockerflow row (raises IndexError if none exists)."""
        return self.select(transaction=transaction)[0]

    def incrementWatchdogValue(self, changed_by, transaction=None, dryrun=False):
        """Increment (or create, on first use) the watchdog counter and
        return its new value."""
        try:
            # Pass the transaction through so the read and the subsequent
            # write happen in the same transaction (previously the read ran
            # outside of it).
            value = self.getDockerflowEntry(transaction=transaction)
            where = [self.watchdog == value["watchdog"]]
            value["watchdog"] += 1
        except IndexError:
            # No row yet; the first increment inserts it.
            value = {"watchdog": 1}
            where = None

        self._putWatchdogValue(changed_by=changed_by, value=value, where=where, transaction=transaction, dryrun=dryrun)

        return value["watchdog"]

    def _putWatchdogValue(self, changed_by, value, where=None, transaction=None, dryrun=False):
        # where=None signals "row doesn't exist yet" -> insert; otherwise update.
        if where is None:
            super(Dockerflow, self).insert(changed_by=changed_by, transaction=transaction, dryrun=dryrun, watchdog=value["watchdog"])
        else:
            super(Dockerflow, self).update(where=where, what=value, changed_by=changed_by, transaction=transaction, dryrun=dryrun)
2872

2873

2874
class EmergencyShutoffs(AUSTable):
    """Tracks (product, channel) pairs for which updates have been shut off,
    with an optional free-form comment. Supports scheduled changes and keeps
    history."""

    def __init__(self, db, metadata, dialect):
        self.table = Table(
            "emergency_shutoffs",
            metadata,
            Column("product", String(15), nullable=False, primary_key=True),
            Column("channel", String(75), nullable=False, primary_key=True),
            Column("comment", String(500)),
        )
        AUSTable.__init__(self, db, dialect, scheduled_changes=True, scheduled_changes_kwargs={"conditions": ["time"]}, historyClass=HistoryTable)

    def insert(self, changed_by, transaction=None, dryrun=False, **columns):
        """Create a shutoff; requires the ``emergency_shutoff``/``create``
        permission for the product."""
        if not self.db.hasPermission(changed_by, "emergency_shutoff", "create", columns.get("product"), transaction):
            raise PermissionDeniedError("{} is not allowed to shut off updates for product {}".format(changed_by, columns.get("product")))

        result = super(EmergencyShutoffs, self).insert(changed_by=changed_by, transaction=transaction, dryrun=dryrun, **columns)
        if dryrun:
            return None
        return result.last_inserted_params()

    def getPotentialRequiredSignoffs(self, affected_rows, transaction=None):
        """Return product Required Signoffs whose channel matches the last
        affected row (rows without a channel match every signoff)."""
        collected = {"rs": []}
        row = affected_rows[-1]
        candidates = self.db.productRequiredSignoffs.select(where={"product": row["product"]}, transaction=transaction)
        for rs in candidates:
            if not row.get("channel") or matchRegex(row["channel"], rs["channel"]):
                collected["rs"].append(rs)
        return collected

    def delete(self, where, changed_by=None, old_data_version=None, transaction=None, dryrun=False, signoffs=None):
        """Remove a shutoff; requires the ``emergency_shutoff``/``delete``
        permission for the product, and (outside of dry runs) verified
        signoffs for every affected row."""
        product = self.select(where=where, columns=[self.product], transaction=transaction)[0]["product"]
        if not self.db.hasPermission(changed_by, "emergency_shutoff", "delete", product, transaction):
            raise PermissionDeniedError("%s is not allowed to delete shutoffs for product %s" % (changed_by, product))

        if not dryrun:
            for affected in self.select(where=where, transaction=transaction):
                needed = [obj for v in self.getPotentialRequiredSignoffs([affected], transaction=transaction).values() for obj in v]
                verify_signoffs(needed, signoffs)

        super(EmergencyShutoffs, self).delete(changed_by=changed_by, where=where, old_data_version=old_data_version, transaction=transaction, dryrun=dryrun)
2913

2914

2915
class PinnableReleasesTable(AUSTable):
    """Maps a (product, version, channel) "pin" to the release (``mapping``)
    that pinned clients should be served. Supports scheduled changes and
    keeps history."""

    def __init__(self, db, metadata, dialect):
        self.table = Table(
            "pinnable_releases",
            metadata,
            Column("product", String(15), nullable=False, primary_key=True),
            Column("version", String(75), nullable=False, primary_key=True),
            Column("channel", String(75), nullable=False, primary_key=True),
            Column("mapping", String(100), nullable=False),
        )
        AUSTable.__init__(self, db, dialect, scheduled_changes=True, scheduled_changes_kwargs={"conditions": ["time"]}, historyClass=HistoryTable)

    def getPotentialRequiredSignoffs(self, affected_rows, transaction=None):
        # Implementing this is required to schedule changes to this table;
        # pinnable releases themselves require no signoffs.
        return {}

    def insert(self, changed_by, transaction=None, dryrun=False, **columns):
        """Create a pin; requires the ``pinnable_release``/``create``
        permission for the product."""
        if not self.db.hasPermission(changed_by, "pinnable_release", "create", columns.get("product"), transaction):
            raise PermissionDeniedError("{} is not allowed to create pinnable releases for product {}".format(changed_by, columns.get("product")))

        ret = super(PinnableReleasesTable, self).insert(changed_by=changed_by, transaction=transaction, dryrun=dryrun, **columns)
        if not dryrun:
            return ret.last_inserted_params()

    def update(self, where, what, changed_by, old_data_version, transaction=None, dryrun=False, signoffs=None):
        """Update matching pins; requires the ``pinnable_release``/``modify``
        permission for the product."""
        product = self.select(where=where, columns=[self.product], transaction=transaction)[0]["product"]
        if not self.db.hasPermission(changed_by, "pinnable_release", "modify", product, transaction):
            raise PermissionDeniedError("{} is not allowed to modify pinnable releases for product {}".format(changed_by, product))

        ret = super(PinnableReleasesTable, self).update(
            where=where,
            what=what,
            changed_by=changed_by,
            old_data_version=old_data_version,
            transaction=transaction,
            dryrun=dryrun,
        )
        if not dryrun:
            return ret.last_updated_params()

    def delete(self, where, changed_by=None, old_data_version=None, transaction=None, dryrun=False, signoffs=None):
        """Delete matching pins; requires the ``pinnable_release``/``delete``
        permission for the product."""
        product = self.select(where=where, columns=[self.product], transaction=transaction)[0]["product"]
        if not self.db.hasPermission(changed_by, "pinnable_release", "delete", product, transaction):
            raise PermissionDeniedError("{} is not allowed to delete pinnable releases for product {}".format(changed_by, product))

        super(PinnableReleasesTable, self).delete(changed_by=changed_by, where=where, old_data_version=old_data_version, transaction=transaction, dryrun=dryrun)

    def getPinRow(self, product, channel, version, transaction=None):
        """Return the pin's row (``mapping`` and ``data_version``), or None if
        no such pin exists."""
        rows = self.select(
            where=[self.product == product, self.channel == channel, self.version == version],
            columns=[self.mapping, self.data_version],
            transaction=transaction,
        )
        if len(rows) == 0:
            return None
        return rows[0]

    def mappingHasPin(self, mapping, transaction=None):
        """Return True if any pin currently points at ``mapping``."""
        return self.count(where=[self.mapping == mapping], transaction=transaction) > 0

    def getPinMapping(self, product, channel, version, transaction=None):
        """Return just the pin's ``mapping``, or None if no such pin exists."""
        # Delegate to getPinRow rather than duplicating its query.
        row = self.getPinRow(product, channel, version, transaction=transaction)
        if row is None:
            return None
        return row["mapping"]
2980

2981

2982
# A helper that sets sql_mode. This should only be used with MySQL, and
2983
# lets us put the database in a stricter mode that will disallow things like
2984
# automatic data truncation.
2985
# From http://www.enricozini.org/2012/tips/sa-sqlmode-traditional/
2986
def my_on_connect(dbapi_con, connection_record):
    """SQLAlchemy "connect" event hook that switches a new MySQL session into
    TRADITIONAL sql_mode (rejecting, e.g., automatic data truncation)."""
    cursor = dbapi_con.cursor()
    cursor.execute("SET SESSION sql_mode='TRADITIONAL'")
2989

2990

2991
class AUSDatabase(object):
    """Top-level facade over all Balrog tables.

    Owns the SQLAlchemy engine/metadata, constructs every table object in
    setDburi(), delegates the common permission queries, and wraps the
    sqlalchemy-migrate schema create/upgrade/downgrade operations.
    """

    # Set by setDburi(); doubles as the "already configured" sentinel.
    engine = None
    # Location of the sqlalchemy-migrate repository shipped with auslib.
    migrate_repo = path.join(path.dirname(__file__), "migrate")

    def __init__(
        self,
        dburi=None,
        mysql_traditional_mode=False,
        releases_history_buckets=None,
        releases_history_class=GCSHistory,
        async_releases_history_class=GCSHistoryAsync,
    ):
        """Create a new AUSDatabase. Before this object is useful, dburi must be
        set, either through the constructor or setDburi()"""
        if dburi:
            self.setDburi(dburi, mysql_traditional_mode, releases_history_buckets, releases_history_class, async_releases_history_class)
        self.log = logging.getLogger(self.__class__.__name__)
        self.systemAccounts = []

    def setDburi(
        self,
        dburi,
        mysql_traditional_mode=False,
        releases_history_buckets=None,
        releases_history_class=GCSHistory,
        async_releases_history_class=GCSHistoryAsync,
    ):
        """Setup the database connection. Note that SQLAlchemy only opens a connection
        to the database when it needs to, however."""
        # Reconfiguring a live database object is not supported.
        if self.engine:
            raise AlreadySetupError()
        self.dburi = dburi
        self.metadata = MetaData()
        engine_kwargs = {"pool_recycle": 60}
        if dburi.startswith("sqlite"):
            # connexion 3.x runs the flask app in a thread, so we need to share the db connection
            from sqlalchemy.pool import StaticPool

            engine_kwargs.update({"poolclass": StaticPool, "connect_args": {"check_same_thread": False}})
        self.engine = create_engine(self.dburi, **engine_kwargs)
        if mysql_traditional_mode and "mysql" in dburi:
            # Put every new MySQL session into strict TRADITIONAL sql_mode.
            sqlalchemy.event.listen(self.engine, "connect", my_on_connect)
        dialect = self.engine.name
        # Table objects register themselves on self.metadata as they are built.
        self.rulesTable = Rules(self, self.metadata, dialect)
        self.releasesTable = Releases(self, self.metadata, dialect, releases_history_buckets, releases_history_class)
        self.releasesJSONTable = ReleasesJSON(self, self.metadata, dialect, releases_history_buckets, async_releases_history_class)
        self.releaseAssetsTable = ReleaseAssets(self, self.metadata, dialect, releases_history_buckets, async_releases_history_class)
        self.permissionsTable = Permissions(self, self.metadata, dialect)
        self.dockerflowTable = Dockerflow(self, self.metadata, dialect)
        self.productRequiredSignoffsTable = ProductRequiredSignoffsTable(self, self.metadata, dialect)
        self.permissionsRequiredSignoffsTable = PermissionsRequiredSignoffsTable(self, self.metadata, dialect)
        self.emergencyShutoffsTable = EmergencyShutoffs(self, self.metadata, dialect)
        self.pinnableReleasesTable = PinnableReleasesTable(self, self.metadata, dialect)
        self.metadata.bind = self.engine

    def setSystemAccounts(self, systemAccounts):
        self.systemAccounts = systemAccounts

    def setDomainAllowlist(self, domainAllowlist):
        self.releasesTable.setDomainAllowlist(domainAllowlist)

    # The methods below delegate to the Permissions table; see it for details.
    def isKnownUser(self, username):
        return self.permissions.isKnownUser(username)

    def isAdmin(self, *args, **kwargs):
        return self.permissions.isAdmin(*args, **kwargs)

    def hasPermission(self, *args, **kwargs):
        return self.permissions.hasPermission(*args, **kwargs)

    def hasRole(self, *args, **kwargs):
        return self.permissions.hasRole(*args, **kwargs)

    def getUserRoles(self, *args, **kwargs):
        return self.permissions.getUserRoles(*args, **kwargs)

    def create(self, version=None):
        # Migrate's "create" merely declares a database to be under its control,
        # it doesn't actually create tables or upgrade it. So we need to call it
        # and then do the upgrade to get to the state we want. We also have to
        # tell create that we're creating at version 0 of the database, otherwise
        # upgrade will do nothing!
        migrate.versioning.schema.ControlledSchema.create(self.engine, self.migrate_repo, 0)
        self.upgrade(version)

    def upgrade(self, version=None):
        # This method was taken from Buildbot:
        # https://github.com/buildbot/buildbot/blob/87108ec4088dc7fd5394ac3c1d0bd3b465300d92/master/buildbot/db/model.py#L455
        # http://code.google.com/p/sqlalchemy-migrate/issues/detail?id=100
        # means we cannot use the migrate.versioning.api module.  So these
        # methods perform similar wrapping functions to what is done by the API
        # functions, but without disposing of the engine.
        schema = migrate.versioning.schema.ControlledSchema(self.engine, self.migrate_repo)
        changeset = schema.changeset(version)
        for step, change in changeset:
            self.log.debug("migrating schema version %s -> %d" % (step, step + 1))
            schema.runchange(step, change, 1)

    def downgrade(self, version):
        # Migration scripts older than version 21 are not kept downgradable.
        if version < 21:
            raise ValueError("Cannot downgrade below version 21")
        schema = migrate.versioning.schema.ControlledSchema(self.engine, self.migrate_repo)
        changeset = schema.changeset(version)
        for step, change in changeset:
            self.log.debug("migrating schema version %s -> %d" % (step, step - 1))
            # Direction -1 runs each change's downgrade path.
            schema.runchange(step, change, -1)

    def reset(self):
        # Drop the engine reference so setDburi() can be called again.
        self.engine = None
        self.metadata.bind = None

    def begin(self):
        """Start and return a new AUSTransaction on this database's engine."""
        return AUSTransaction(self.engine)

    # Read-only accessors for the table objects built in setDburi().
    @property
    def rules(self):
        return self.rulesTable

    @property
    def releases(self):
        return self.releasesTable

    @property
    def releases_json(self):
        return self.releasesJSONTable

    @property
    def release_assets(self):
        return self.releaseAssetsTable

    @property
    def permissions(self):
        return self.permissionsTable

    @property
    def productRequiredSignoffs(self):
        return self.productRequiredSignoffsTable

    @property
    def permissionsRequiredSignoffs(self):
        return self.permissionsRequiredSignoffsTable

    @property
    def dockerflow(self):
        return self.dockerflowTable

    @property
    def emergencyShutoffs(self):
        return self.emergencyShutoffsTable

    @property
    def pinnable_releases(self):
        return self.pinnableReleasesTable
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc