Coveralls logob
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

matrix-org / synapse / 4532

23 Sep 2019 - 19:39 coverage decreased (-49.7%) to 17.596%
4532

Pull #6079

buildkite

Richard van der Hoff
update changelog
Pull Request #6079: Add submit_url response parameter to msisdn /requestToken

359 of 12986 branches covered (2.76%)

Branch coverage included in aggregate %.

0 of 7 new or added lines in 1 file covered. (0.0%)

18869 existing lines in 281 files now uncovered.

8809 of 39116 relevant lines covered (22.52%)

0.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

13.62
/synapse/storage/event_push_actions.py
1
# -*- coding: utf-8 -*-
2
# Copyright 2015 OpenMarket Ltd
3
# Copyright 2018 New Vector Ltd
4
#
5
# Licensed under the Apache License, Version 2.0 (the "License");
6
# you may not use this file except in compliance with the License.
7
# You may obtain a copy of the License at
8
#
9
#     http://www.apache.org/licenses/LICENSE-2.0
10
#
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS,
13
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
# See the License for the specific language governing permissions and
15
# limitations under the License.
16

17
import logging
1×
18

19
from six import iteritems
1×
20

21
from canonicaljson import json
1×
22

23
from twisted.internet import defer
1×
24

25
from synapse.metrics.background_process_metrics import run_as_background_process
1×
26
from synapse.storage._base import LoggingTransaction, SQLBaseStore
1×
27
from synapse.util.caches.descriptors import cachedInlineCallbacks
1×
28

29
logger = logging.getLogger(__name__)
1×
30

31

32
# Default actions for an ordinary (non-highlight) notification. A row whose
# actions equal this list is stored as "" by _serialize_action.
DEFAULT_NOTIF_ACTION = ["notify", {"set_tweak": "highlight", "value": False}]

# Default actions for a highlight notification. A highlight row whose actions
# equal this list is likewise stored as "" by _serialize_action.
DEFAULT_HIGHLIGHT_ACTION = [
    "notify",
    {"set_tweak": "sound", "value": "default"},
    {"set_tweak": "highlight"},
]
38

39

40
def _serialize_action(actions, is_highlight):
    """Custom serializer for actions. This allows us to "compress" common actions.

    We use the fact that most users have the same actions for notifs (and for
    highlights).
    We store these default actions as the empty string rather than the full JSON.
    Since the empty string isn't valid JSON there is no risk of this clashing with
    any real JSON actions

    Args:
        actions (list): the push actions to serialize.
        is_highlight: truthy to compare against DEFAULT_HIGHLIGHT_ACTION,
            falsy to compare against DEFAULT_NOTIF_ACTION.

    Returns:
        str: "" if `actions` equals the relevant default, otherwise the JSON
        encoding of `actions`.
    """
    if is_highlight:
        default_actions = DEFAULT_HIGHLIGHT_ACTION
    else:
        default_actions = DEFAULT_NOTIF_ACTION

    if actions == default_actions:
        # We use empty string as the column is non-NULL
        return ""
    return json.dumps(actions)
!
56

57

58
def _deserialize_action(actions, is_highlight):
    """Custom deserializer for actions. This allows us to "compress" common actions

    Inverse of _serialize_action: a falsy `actions` value (such as the empty
    string) stands for the default action list.

    Args:
        actions (str): the stored value; either JSON or a falsy placeholder.
        is_highlight: truthy to select DEFAULT_HIGHLIGHT_ACTION for the
            placeholder, falsy to select DEFAULT_NOTIF_ACTION.

    Returns:
        The decoded JSON value, or the relevant module-level default list.
        Note that the default list object itself is returned, not a copy.
    """
    if actions:
        return json.loads(actions)

    return DEFAULT_HIGHLIGHT_ACTION if is_highlight else DEFAULT_NOTIF_ACTION
!
68

69

70
class EventPushActionsWorkerStore(SQLBaseStore):
1×
71
    def __init__(self, db_conn, hs):
        """Initialise the store and compute the initial stream-ordering markers.

        Args:
            db_conn: database connection. A cursor is opened on it for the
                initial lookup, and it is passed on to the parent constructor.
            hs: passed on to the parent constructor.
        """
        super(EventPushActionsWorkerStore, self).__init__(db_conn, hs)

        # These get correctly set by _find_stream_orderings_for_times_txn
        self.stream_ordering_month_ago = None
        self.stream_ordering_day_ago = None

        cur = LoggingTransaction(
            db_conn.cursor(),
            name="_find_stream_orderings_for_times_txn",
            database_engine=self.database_engine,
        )
        try:
            self._find_stream_orderings_for_times_txn(cur)
        finally:
            # Release the cursor even if the initial lookup raises.
            cur.close()

        # Periodically recompute the markers. The interval is 10 * 60 * 1000,
        # presumably milliseconds (i.e. ten minutes) -- confirm against
        # Clock.looping_call.
        self.find_stream_orderings_looping_call = self._clock.looping_call(
            self._find_stream_orderings_for_times, 10 * 60 * 1000
        )
        # NOTE(review): not read anywhere in the visible part of the file;
        # presumably tuning knobs for notification rotation.
        self._rotate_delay = 3
        self._rotate_count = 10000
!
91

92
    @cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
    def get_unread_event_push_actions_by_room_for_user(
        self, room_id, user_id, last_read_event_id
    ):
        """Get the unread notification counts for a user in a room.

        Runs _get_unread_counts_by_receipt_txn in a database interaction.

        Args:
            room_id (str)
            user_id (str)
            last_read_event_id (str): event from which unread actions are
                counted.

        Returns:
            Deferred[dict]: a dict with the keys "notify_count" and
            "highlight_count".
        """
        ret = yield self.runInteraction(
            "get_unread_event_push_actions_by_room",
            self._get_unread_counts_by_receipt_txn,
            room_id,
            user_id,
            last_read_event_id,
        )
        return ret
!
104

105
    def _get_unread_counts_by_receipt_txn(
1×
106
        self, txn, room_id, user_id, last_read_event_id
107
    ):
UNCOV
108
        sql = (
!
109
            "SELECT stream_ordering"
110
            " FROM events"
111
            " WHERE room_id = ? AND event_id = ?"
112
        )
UNCOV
113
        txn.execute(sql, (room_id, last_read_event_id))
!
UNCOV
114
        results = txn.fetchall()
!
UNCOV
115
        if len(results) == 0:
Branches [[0, 116], [0, 118]] missed. !
116
            return {"notify_count": 0, "highlight_count": 0}
!
117

UNCOV
118
        stream_ordering = results[0][0]
!
119

UNCOV
120
        return self._get_unread_counts_by_pos_txn(
!
121
            txn, room_id, user_id, stream_ordering
122
        )
123

124
    def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering):
1×
125

126
        # First get number of notifications.
127
        # We don't need to put a notif=1 clause as all rows always have
128
        # notif=1
UNCOV
129
        sql = (
!
130
            "SELECT count(*)"
131
            " FROM event_push_actions ea"
132
            " WHERE"
133
            " user_id = ?"
134
            " AND room_id = ?"
135
            " AND stream_ordering > ?"
136
        )
137

UNCOV
138
        txn.execute(sql, (user_id, room_id, stream_ordering))
!
UNCOV
139
        row = txn.fetchone()
!
UNCOV
140
        notify_count = row[0] if row else 0
!
141

UNCOV
142
        txn.execute(
!
143
            """
144
            SELECT notif_count FROM event_push_summary
145
            WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
146
        """,
147
            (room_id, user_id, stream_ordering),
148
        )
UNCOV
149
        rows = txn.fetchall()
!
UNCOV
150
        if rows:
Branches [[0, 151], [0, 154]] missed. !
151
            notify_count += rows[0][0]
!
152

153
        # Now get the number of highlights
UNCOV
154
        sql = (
!
155
            "SELECT count(*)"
156
            " FROM event_push_actions ea"
157
            " WHERE"
158
            " highlight = 1"
159
            " AND user_id = ?"
160
            " AND room_id = ?"
161
            " AND stream_ordering > ?"
162
        )
163

UNCOV
164
        txn.execute(sql, (user_id, room_id, stream_ordering))
!
UNCOV
165
        row = txn.fetchone()
!
UNCOV
166
        highlight_count = row[0] if row else 0
!
167

UNCOV
168
        return {"notify_count": notify_count, "highlight_count": highlight_count}
!
169

170
    @defer.inlineCallbacks
    def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
        """Get the users that have push actions in a stream ordering range.

        Args:
            min_stream_ordering (int): inclusive lower bound.
            max_stream_ordering (int): inclusive upper bound.

        Returns:
            Deferred[list[str]]: the distinct user_ids found.
        """

        def _get_push_action_users_in_range_txn(txn):
            sql = (
                "SELECT DISTINCT(user_id) FROM event_push_actions WHERE"
                " stream_ordering >= ? AND stream_ordering <= ?"
            )
            txn.execute(sql, (min_stream_ordering, max_stream_ordering))
            return [r[0] for r in txn]

        ret = yield self.runInteraction(
            "get_push_action_users_in_range", _get_push_action_users_in_range_txn
        )
        return ret
!
182

183
    @defer.inlineCallbacks
    def get_unread_push_actions_for_user_in_range_for_http(
        self, user_id, min_stream_ordering, max_stream_ordering, limit=20
    ):
        """Get a list of the most recent unread push actions for a given user,
        within the given stream ordering range. Called by the httppusher.

        Args:
            user_id (str): The user to fetch push actions for.
            min_stream_ordering(int): The exclusive lower bound on the
                stream ordering of event push actions to fetch.
            max_stream_ordering(int): The inclusive upper bound on the
                stream ordering of event push actions to fetch.
            limit (int): The maximum number of rows to return.
        Returns:
            A promise which resolves to a list of dicts with the keys "event_id",
            "room_id", "stream_ordering", "actions".
            The list will be ordered by ascending stream_ordering.
            The list will have between 0~limit entries.
        """
        # find rooms that have a read receipt in them and return the next
        # push actions
        def get_after_receipt(txn):
            sql = (
                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
                "   ep.highlight "
                " FROM ("
                "   SELECT room_id,"
                "       MAX(stream_ordering) as stream_ordering"
                "   FROM events"
                "   INNER JOIN receipts_linearized USING (room_id, event_id)"
                "   WHERE receipt_type = 'm.read' AND user_id = ?"
                "   GROUP BY room_id"
                ") AS rl,"
                " event_push_actions AS ep"
                " WHERE"
                "   ep.room_id = rl.room_id"
                "   AND ep.stream_ordering > rl.stream_ordering"
                "   AND ep.user_id = ?"
                "   AND ep.stream_ordering > ?"
                "   AND ep.stream_ordering <= ?"
                " ORDER BY ep.stream_ordering ASC LIMIT ?"
            )
            args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
            txn.execute(sql, args)
            return txn.fetchall()

        after_read_receipt = yield self.runInteraction(
            "get_unread_push_actions_for_user_in_range_http_arr", get_after_receipt
        )

        # There are rooms with push actions in them but you don't have a read receipt in
        # them e.g. rooms you've been invited to, so get push actions for rooms which do
        # not have read receipts in them too.
        def get_no_receipt(txn):
            sql = (
                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
                "   ep.highlight "
                " FROM event_push_actions AS ep"
                " INNER JOIN events AS e USING (room_id, event_id)"
                " WHERE"
                "   ep.room_id NOT IN ("
                "     SELECT room_id FROM receipts_linearized"
                "       WHERE receipt_type = 'm.read' AND user_id = ?"
                "       GROUP BY room_id"
                "   )"
                "   AND ep.user_id = ?"
                "   AND ep.stream_ordering > ?"
                "   AND ep.stream_ordering <= ?"
                " ORDER BY ep.stream_ordering ASC LIMIT ?"
            )
            args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
            txn.execute(sql, args)
            return txn.fetchall()

        no_read_receipt = yield self.runInteraction(
            "get_unread_push_actions_for_user_in_range_http_nrr", get_no_receipt
        )

        # Make a list of dicts from the two sets of results. Columns are
        # (event_id, room_id, stream_ordering, actions, highlight).
        notifs = [
            {
                "event_id": row[0],
                "room_id": row[1],
                "stream_ordering": row[2],
                "actions": _deserialize_action(row[3], row[4]),
            }
            for row in after_read_receipt + no_read_receipt
        ]

        # Now sort it so it's ordered correctly, since currently it will
        # contain results from the first query, correctly ordered, followed
        # by results from the second query, but we want them all ordered
        # by stream_ordering, oldest first.
        notifs.sort(key=lambda r: r["stream_ordering"])

        # Take only up to the limit. We have to stop at the limit because
        # one of the subqueries may have hit the limit.
        return notifs[:limit]
!
283

284
    @defer.inlineCallbacks
    def get_unread_push_actions_for_user_in_range_for_email(
        self, user_id, min_stream_ordering, max_stream_ordering, limit=20
    ):
        """Get a list of the most recent unread push actions for a given user,
        within the given stream ordering range. Called by the emailpusher

        Args:
            user_id (str): The user to fetch push actions for.
            min_stream_ordering(int): The exclusive lower bound on the
                stream ordering of event push actions to fetch.
            max_stream_ordering(int): The inclusive upper bound on the
                stream ordering of event push actions to fetch.
            limit (int): The maximum number of rows to return.
        Returns:
            A promise which resolves to a list of dicts with the keys "event_id",
            "room_id", "stream_ordering", "actions", "received_ts".
            The list will be ordered by descending received_ts.
            The list will have between 0~limit entries.
        """
        # find rooms that have a read receipt in them and return the most recent
        # push actions
        def get_after_receipt(txn):
            sql = (
                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
                "  ep.highlight, e.received_ts"
                " FROM ("
                "   SELECT room_id,"
                "       MAX(stream_ordering) as stream_ordering"
                "   FROM events"
                "   INNER JOIN receipts_linearized USING (room_id, event_id)"
                "   WHERE receipt_type = 'm.read' AND user_id = ?"
                "   GROUP BY room_id"
                ") AS rl,"
                " event_push_actions AS ep"
                " INNER JOIN events AS e USING (room_id, event_id)"
                " WHERE"
                "   ep.room_id = rl.room_id"
                "   AND ep.stream_ordering > rl.stream_ordering"
                "   AND ep.user_id = ?"
                "   AND ep.stream_ordering > ?"
                "   AND ep.stream_ordering <= ?"
                " ORDER BY ep.stream_ordering DESC LIMIT ?"
            )
            args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
            txn.execute(sql, args)
            return txn.fetchall()

        after_read_receipt = yield self.runInteraction(
            "get_unread_push_actions_for_user_in_range_email_arr", get_after_receipt
        )

        # There are rooms with push actions in them but you don't have a read receipt in
        # them e.g. rooms you've been invited to, so get push actions for rooms which do
        # not have read receipts in them too.
        def get_no_receipt(txn):
            sql = (
                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
                "   ep.highlight, e.received_ts"
                " FROM event_push_actions AS ep"
                " INNER JOIN events AS e USING (room_id, event_id)"
                " WHERE"
                "   ep.room_id NOT IN ("
                "     SELECT room_id FROM receipts_linearized"
                "       WHERE receipt_type = 'm.read' AND user_id = ?"
                "       GROUP BY room_id"
                "   )"
                "   AND ep.user_id = ?"
                "   AND ep.stream_ordering > ?"
                "   AND ep.stream_ordering <= ?"
                " ORDER BY ep.stream_ordering DESC LIMIT ?"
            )
            args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
            txn.execute(sql, args)
            return txn.fetchall()

        no_read_receipt = yield self.runInteraction(
            "get_unread_push_actions_for_user_in_range_email_nrr", get_no_receipt
        )

        # Make a list of dicts from the two sets of results. Columns are
        # (event_id, room_id, stream_ordering, actions, highlight, received_ts).
        notifs = [
            {
                "event_id": row[0],
                "room_id": row[1],
                "stream_ordering": row[2],
                "actions": _deserialize_action(row[3], row[4]),
                "received_ts": row[5],
            }
            for row in after_read_receipt + no_read_receipt
        ]

        # Now sort it so it's ordered correctly, since currently it will
        # contain results from the first query, correctly ordered, followed
        # by results from the second query, but we want them all ordered
        # by received_ts (most recent first). A missing (falsy) received_ts
        # is treated as 0 and so sorts last.
        notifs.sort(key=lambda r: -(r["received_ts"] or 0))

        # Now return the first `limit`
        return notifs[:limit]
!
384

385
    def get_if_maybe_push_in_range_for_user(self, user_id, min_stream_ordering):
        """A fast check to see if there might be something to push for the
        user since the given stream ordering. May return false positives.

        Useful to know whether to bother starting a pusher on start up or not.

        Args:
            user_id (str)
            min_stream_ordering (int): exclusive lower bound on the stream
                ordering of push actions to look for.

        Returns:
            Deferred[bool]: True if there may be push to process, False if
            there definitely isn't.
        """

        def _get_if_maybe_push_in_range_for_user_txn(txn):
            sql = """
                SELECT 1 FROM event_push_actions
                WHERE user_id = ? AND stream_ordering > ?
                LIMIT 1
            """

            txn.execute(sql, (user_id, min_stream_ordering))
            # Any row at all means there is at least one candidate action.
            return bool(txn.fetchone())

        return self.runInteraction(
            "get_if_maybe_push_in_range_for_user",
            _get_if_maybe_push_in_range_for_user_txn,
        )
414

415
    def add_push_actions_to_staging(self, event_id, user_id_actions):
        """Add the push actions for the event to the push action staging area.

        Args:
            event_id (str)
            user_id_actions (dict[str, list[dict|str]]): A dictionary mapping
                user_id to list of push actions, where an action can either be
                a string or dict.

        Returns:
            Deferred, or None if `user_id_actions` is empty (in which case
            nothing is written).
        """

        if not user_id_actions:
            return

        # This is a helper function for generating the necessary tuple that
        # can be used to insert into the `event_push_actions_staging` table.
        def _gen_entry(user_id, actions):
            is_highlight = 1 if _action_has_highlight(actions) else 0
            return (
                event_id,  # event_id column
                user_id,  # user_id column
                _serialize_action(actions, is_highlight),  # actions column
                1,  # notif column
                is_highlight,  # highlight column
            )

        def _add_push_actions_to_staging_txn(txn):
            # We don't use _simple_insert_many here to avoid the overhead
            # of generating lists of dicts.

            sql = """
                INSERT INTO event_push_actions_staging
                    (event_id, user_id, actions, notif, highlight)
                VALUES (?, ?, ?, ?, ?)
            """

            txn.executemany(
                sql,
                (
                    _gen_entry(user_id, actions)
                    for user_id, actions in iteritems(user_id_actions)
                ),
            )

        return self.runInteraction(
            "add_push_actions_to_staging", _add_push_actions_to_staging_txn
        )
464

465
    @defer.inlineCallbacks
    def remove_push_actions_from_staging(self, event_id):
        """Called if we failed to persist the event to ensure that stale push
        actions don't build up in the DB

        Any exception raised by the delete is logged and swallowed.

        Args:
            event_id (str)

        Returns:
            Deferred: resolves to the result of the delete, or None if the
            delete failed.
        """

        try:
            res = yield self._simple_delete(
                table="event_push_actions_staging",
                keyvalues={"event_id": event_id},
                desc="remove_push_actions_from_staging",
            )
            return res
        except Exception:
            # this method is called from an exception handler, so propagating
            # another exception here really isn't helpful - there's nothing
            # the caller can do about it. Just log the exception and move on.
            logger.exception(
                "Error removing push actions after event persistence failure"
            )
488

489
    def _find_stream_orderings_for_times(self):
        """Recompute the day-ago/month-ago stream orderings in the background.

        Runs _find_stream_orderings_for_times_txn in a database interaction,
        wrapped as a background process. Scheduled from __init__ via a
        looping call.

        Returns:
            Whatever run_as_background_process returns.
        """
        return run_as_background_process(
            "event_push_action_stream_orderings",
            self.runInteraction,
            "_find_stream_orderings_for_times",
            self._find_stream_orderings_for_times_txn,
        )
496

497
    def _find_stream_orderings_for_times_txn(self, txn):
        """Update stream_ordering_month_ago and stream_ordering_day_ago.

        Each attribute is set to the stream ordering of the first event
        received on or after (now - 30 days) and (now - 1 day) respectively,
        where "now" is read from self._clock.time_msec() for each lookup.

        Args:
            txn: database transaction/cursor to run the lookups on.
        """
        logger.info("Searching for stream ordering 1 month ago")
        self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
            txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
        )
        logger.info(
            "Found stream ordering 1 month ago: it's %d", self.stream_ordering_month_ago
        )
        logger.info("Searching for stream ordering 1 day ago")
        self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
            txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
        )
        logger.info(
            "Found stream ordering 1 day ago: it's %d", self.stream_ordering_day_ago
        )
512

513
    def find_first_stream_ordering_after_ts(self, ts):
        """Gets the stream ordering corresponding to a given timestamp.

        Specifically, finds the stream_ordering of the first event that was
        received on or after the timestamp. This is done by a binary search on
        the events table, since there is no index on received_ts, so is
        relatively slow.

        Args:
            ts (int): timestamp in millis

        Returns:
            Deferred[int]: stream ordering of the first event received on/after
                the timestamp
        """
        return self.runInteraction(
            "_find_first_stream_ordering_after_ts_txn",
            self._find_first_stream_ordering_after_ts_txn,
            ts,
        )
533

534
    @staticmethod
1×
535
    def _find_first_stream_ordering_after_ts_txn(txn, ts):
536
        """
537
        Find the stream_ordering of the first event that was received on or
538
        after a given timestamp. This is relatively slow as there is no index
539
        on received_ts but we can then use this to delete push actions before
540
        this.
541

542
        received_ts must necessarily be in the same order as stream_ordering
543
        and stream_ordering is indexed, so we manually binary search using
544
        stream_ordering
545

546
        Args:
547
            txn (twisted.enterprise.adbapi.Transaction):
548
            ts (int): timestamp to search for
549

550
        Returns:
551
            int: stream ordering
552
        """
UNCOV
553
        txn.execute("SELECT MAX(stream_ordering) FROM events")
!
UNCOV
554
        max_stream_ordering = txn.fetchone()[0]
!
555

UNCOV
556
        if max_stream_ordering is None:
Branches [[0, 557], [0, 566]] missed. !
UNCOV
557
            return 0
!
558

559
        # We want the first stream_ordering in which received_ts is greater
560
        # than or equal to ts. Call this point X.
561
        #
562
        # We maintain the invariants:
563
        #
564
        #   range_start <= X <= range_end
565
        #
UNCOV
566
        range_start = 0
!
UNCOV
567
        range_end = max_stream_ordering + 1
!
568

569
        # Given a stream_ordering, look up the timestamp at that
570
        # stream_ordering.
571
        #
572
        # The array may be sparse (we may be missing some stream_orderings).
573
        # We treat the gaps as the same as having the same value as the
574
        # preceding entry, because we will pick the lowest stream_ordering
575
        # which satisfies our requirement of received_ts >= ts.
576
        #
577
        # For example, if our array of events indexed by stream_ordering is
578
        # [10, <none>, 20], we should treat this as being equivalent to
579
        # [10, 10, 20].
580
        #
UNCOV
581
        sql = (
!
582
            "SELECT received_ts FROM events"
583
            " WHERE stream_ordering <= ?"
584
            " ORDER BY stream_ordering DESC"
585
            " LIMIT 1"
586
        )
587

UNCOV
588
        while range_end - range_start > 0:
Branches [[0, 589], [0, 608]] missed. !
UNCOV
589
            middle = (range_end + range_start) // 2
!
UNCOV
590
            txn.execute(sql, (middle,))
!
UNCOV
591
            row = txn.fetchone()
!
UNCOV
592
            if row is None:
Branches [[0, 594], [0, 597]] missed. !
593
                # no rows with stream_ordering<=middle
UNCOV
594
                range_start = middle + 1
!
UNCOV
595
                continue
!
596

UNCOV
597
            middle_ts = row[0]
!
UNCOV
598
            if ts > middle_ts:
Branches [[0, 601], [0, 606]] missed. !
599
                # we got a timestamp lower than the one we were looking for.
600
                # definitely need to look higher: X > middle.
UNCOV
601
                range_start = middle + 1
!
602
            else:
603
                # we got a timestamp higher than (or the same as) the one we
604
                # were looking for. We aren't yet sure about the point we
605
                # looked up, but we can be sure that X <= middle.
UNCOV
606
                range_end = middle
!
607

UNCOV
608
        return range_end
!
609

610

611
class EventPushActionsStore(EventPushActionsWorkerStore):
    """Storage methods for the push-action tables.

    On top of EventPushActionsWorkerStore this adds the code that moves rows
    from event_push_actions_staging into event_push_actions, the queries
    used to list and purge push actions, and a periodic job which folds old
    non-highlight notifications into event_push_summary.
    """

    EPA_HIGHLIGHT_INDEX = "epa_highlight_index"

    def __init__(self, db_conn, hs):
        """Register the background index updates and start the rotation loop.

        Args:
            db_conn: passed straight through to the parent constructor
            hs: passed straight through to the parent constructor
        """
        super(EventPushActionsStore, self).__init__(db_conn, hs)

        # Index over (user_id, stream_ordering).
        self.register_background_index_update(
            self.EPA_HIGHLIGHT_INDEX,
            index_name="event_push_actions_u_highlight",
            table="event_push_actions",
            columns=["user_id", "stream_ordering"],
        )

        # Partial index covering only rows where highlight=1. The update name
        # and the index name are the same string.
        highlights_index = "event_push_actions_highlights_index"
        self.register_background_index_update(
            highlights_index,
            index_name=highlights_index,
            table="event_push_actions",
            columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
            where_clause="highlight=1",
        )

        # Guards against two rotations running at the same time.
        self._doing_notif_rotation = False

        # NOTE(review): presumably 30 minutes expressed in milliseconds --
        # confirm against the clock's looping_call contract.
        rotate_interval = 30 * 60 * 1000
        self._rotate_notif_loop = self._clock.looping_call(
            self._start_rotate_notifs, rotate_interval
        )

    def _set_push_actions_for_event_and_users_txn(
        self, txn, events_and_contexts, all_events_and_contexts
    ):
        """Copies staged push actions into the main event_push_actions table
        for every event in `events_and_contexts`, then clears the staging
        rows for every event in `all_events_and_contexts`.

        Args:
            txn: the database transaction to run in
            events_and_contexts (list[(EventBase, EventContext)]): events
                we are persisting
            all_events_and_contexts (list[(EventBase, EventContext)]): all
                events that we were going to persist. This includes events
                we've already persisted, etc, that wouldn't appear in
                events_and_contexts.
        """

        copy_from_staging_sql = """
            INSERT INTO event_push_actions (
                room_id, event_id, user_id, actions, stream_ordering,
                topological_ordering, notif, highlight
            )
            SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight
            FROM event_push_actions_staging
            WHERE event_id = ?
        """

        if events_and_contexts:
            txn.executemany(
                copy_from_staging_sql,
                (
                    (
                        ev.room_id,
                        ev.internal_metadata.stream_ordering,
                        ev.depth,
                        ev.event_id,
                    )
                    for ev, _ in events_and_contexts
                ),
            )

        # The staging rows are still present at this point, so use them to
        # work out which (room, user) cache entries need invalidating once
        # the transaction completes.
        for ev, _ in events_and_contexts:
            staged_user_ids = self._simple_select_onecol_txn(
                txn,
                table="event_push_actions_staging",
                keyvalues={"event_id": ev.event_id},
                retcol="user_id",
            )

            for staged_user_id in staged_user_ids:
                txn.call_after(
                    self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
                    (ev.room_id, staged_user_id),
                )

        # Finally empty the staging area for *all* events that were being
        # persisted, not just the ones copied above.
        txn.executemany(
            "DELETE FROM event_push_actions_staging WHERE event_id = ?",
            ((ev.event_id,) for ev, _ in all_events_and_contexts),
        )

    @defer.inlineCallbacks
    def get_push_actions_for_user(
        self, user_id, before=None, limit=50, only_highlight=False
    ):
        """Fetch a user's push actions, newest (highest stream_ordering) first.

        Args:
            user_id: the user whose push actions to fetch
            before: if truthy, only return actions with a stream_ordering
                strictly below this value
            limit: maximum number of rows to return
            only_highlight: if truthy, only return rows with highlight = 1

        Returns:
            Deferred[list[dict]]: one dict per row, with the "actions" entry
            replaced by the result of `_deserialize_action`.
        """

        def fetch_rows_txn(txn):
            extra_filters = []
            params = [user_id]

            if before:
                extra_filters.append("AND epa.stream_ordering < ?")
                params.append(before)

            if only_highlight:
                extra_filters.append("AND epa.highlight = 1")

            # The LIMIT placeholder comes last in the statement.
            params.append(limit)

            # NB. This assumes event_ids are globally unique since
            # it makes the query easier to index
            sql = (
                "SELECT epa.event_id, epa.room_id,"
                " epa.stream_ordering, epa.topological_ordering,"
                " epa.actions, epa.highlight, epa.profile_tag, e.received_ts"
                " FROM event_push_actions epa, events e"
                " WHERE epa.event_id = e.event_id"
                " AND epa.user_id = ? %s"
                " ORDER BY epa.stream_ordering DESC"
                " LIMIT ?" % (" ".join(extra_filters),)
            )
            txn.execute(sql, params)
            return self.cursor_to_dict(txn)

        push_actions = yield self.runInteraction(
            "get_push_actions_for_user", fetch_rows_txn
        )
        for push_action in push_actions:
            push_action["actions"] = _deserialize_action(
                push_action["actions"], push_action["highlight"]
            )
        return push_actions

    @defer.inlineCallbacks
    def get_time_of_last_push_action_before(self, stream_ordering):
        """Look up the received_ts of the push action with the lowest
        stream_ordering that is strictly greater than `stream_ordering`.

        NOTE(review): despite the method name, the query looks forwards from
        `stream_ordering` (`>` with ascending order) -- confirm with callers
        that this is the intended direction.

        Args:
            stream_ordering: the stream ordering to search from

        Returns:
            Deferred: the received_ts of the matching event, or None if no
            row matched.
        """

        def fetch_ts_txn(txn):
            sql = (
                "SELECT e.received_ts"
                " FROM event_push_actions AS ep"
                " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
                " WHERE ep.stream_ordering > ?"
                " ORDER BY ep.stream_ordering ASC"
                " LIMIT 1"
            )
            txn.execute(sql, (stream_ordering,))
            return txn.fetchone()

        row = yield self.runInteraction(
            "get_time_of_last_push_action_before", fetch_ts_txn
        )
        if not row:
            return None
        return row[0]

    @defer.inlineCallbacks
    def get_latest_push_action_stream_ordering(self):
        """Return the largest stream_ordering in event_push_actions.

        Returns:
            Deferred: the MAX(stream_ordering) value, or 0 when that value is
            falsy (e.g. NULL because the table has no rows).
        """

        def fetch_max_txn(txn):
            txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
            return txn.fetchone()

        row = yield self.runInteraction(
            "get_latest_push_action_stream_ordering", fetch_max_txn
        )
        latest = row[0]
        return latest if latest else 0

    def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
        """Delete every push action recorded for one event.

        Args:
            txn: the database transaction to run in
            room_id: room the event is in
            event_id: the event whose push actions should be removed
        """
        # We only know the room here, not the affected users, so the cache
        # has to be invalidated for the whole room.
        txn.call_after(
            self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
            (room_id,),
        )

        delete_sql = "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?"
        txn.execute(delete_sql, (room_id, event_id))

    def _remove_old_push_actions_before_txn(
        self, txn, room_id, user_id, stream_ordering
    ):
        """
        Purges old push actions, and the matching push summary rows, for a
        user and room up to a given stream_ordering.

        Highlighted notifications get a longer lifetime: they are only
        removed once they are also older than `stream_ordering_month_ago`,
        so that users can still get a list of recent highlights.

        Args:
            txn: The transaction
            room_id: Room ID to delete from
            user_id: user ID to delete for
            stream_ordering: The highest stream ordering which is eligible
                for deletion; rows with a stream_ordering less than or equal
                to this value are considered.
        """
        txn.call_after(
            self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
            (room_id, user_id),
        )

        # We need to join on the events table to get the received_ts for
        # event_push_actions and sqlite won't let us use a join in a delete so
        # we can't just delete where received_ts < x. Furthermore we can
        # only identify event_push_actions by a tuple of room_id, event_id
        # so we can't use a subquery.
        # Instead, we look up the stream ordering for the last event in that
        # room received before the threshold time and delete event_push_actions
        # in the room with a stream_ordering before that.
        purge_actions_sql = (
            "DELETE FROM event_push_actions "
            " WHERE user_id = ? AND room_id = ? AND "
            " stream_ordering <= ?"
            " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)"
        )
        txn.execute(
            purge_actions_sql,
            (user_id, room_id, stream_ordering, self.stream_ordering_month_ago),
        )

        purge_summary_sql = """
            DELETE FROM event_push_summary
            WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
        """
        txn.execute(purge_summary_sql, (room_id, user_id, stream_ordering))

    def _start_rotate_notifs(self):
        """Run `_rotate_notifs` via run_as_background_process under the name
        "rotate_notifs", returning whatever that call returns.
        """
        return run_as_background_process("rotate_notifs", self._rotate_notifs)

    @defer.inlineCallbacks
    def _rotate_notifs(self):
        """Repeatedly rotate batches of notifications until caught up.

        Does nothing if a rotation is already in progress or if
        `stream_ordering_day_ago` is still None. Between batches it sleeps
        for `_rotate_delay`.
        """
        if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
            return
        self._doing_notif_rotation = True

        try:
            finished = False
            while not finished:
                logger.info("Rotating notifications")

                finished = yield self.runInteraction(
                    "_rotate_notifs", self._rotate_notifs_txn
                )
                if not finished:
                    yield self.hs.get_clock().sleep(self._rotate_delay)
        finally:
            # Always release the guard, even if a batch failed.
            self._doing_notif_rotation = False

    def _rotate_notifs_txn(self, txn):
        """Archives one batch of older notifications into event_push_summary.

        Args:
            txn: the database transaction to run in

        Returns:
            bool: True if the archiving process has caught up with
            `stream_ordering_day_ago`, False if more batches remain.
        """

        last_rotated_position = self._simple_select_one_onecol_txn(
            txn,
            table="event_push_summary_stream_ordering",
            keyvalues={},
            retcol="stream_ordering",
        )

        # We don't want to try and rotate millions of rows at once, so we cap
        # the maximum stream ordering we'll rotate before: find the row that
        # sits `_rotate_count` rows after the last rotated position.
        batch_cap_sql = """
            SELECT stream_ordering FROM event_push_actions
            WHERE stream_ordering > ?
            ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
        """
        txn.execute(batch_cap_sql, (last_rotated_position, self._rotate_count))
        cap_row = txn.fetchone()

        if not cap_row:
            # Fewer rows than the batch size remain, so go all the way.
            rotate_to_stream_ordering = self.stream_ordering_day_ago
            caught_up = True
        else:
            (batch_cap_stream_ordering,) = cap_row
            rotate_to_stream_ordering = min(
                self.stream_ordering_day_ago, batch_cap_stream_ordering
            )
            caught_up = batch_cap_stream_ordering >= self.stream_ordering_day_ago

        logger.info("Rotating notifications up to: %s", rotate_to_stream_ordering)

        self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering)

        # We have caught up iff we were limited by `stream_ordering_day_ago`
        return caught_up

    def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
        """Fold non-highlight push actions into event_push_summary.

        Counts the non-highlight rows between the stored rotation position
        (inclusive) and `rotate_to_stream_ordering` (exclusive) per user and
        room, writes the new totals to event_push_summary, deletes the
        counted rows and stores `rotate_to_stream_ordering` as the new
        rotation position.

        Args:
            txn: the database transaction to run in
            rotate_to_stream_ordering: exclusive upper bound of the range of
                stream orderings to rotate
        """
        last_rotated_position = self._simple_select_one_onecol_txn(
            txn,
            table="event_push_summary_stream_ordering",
            keyvalues={},
            retcol="stream_ordering",
        )
        rotation_range = (last_rotated_position, rotate_to_stream_ordering)

        # Calculate the new counts that should be upserted into event_push_summary
        sql = """
            SELECT user_id, room_id,
                coalesce(old.notif_count, 0) + upd.notif_count,
                upd.stream_ordering,
                old.user_id
            FROM (
                SELECT user_id, room_id, count(*) as notif_count,
                    max(stream_ordering) as stream_ordering
                FROM event_push_actions
                WHERE ? <= stream_ordering AND stream_ordering < ?
                    AND highlight = 0
                GROUP BY user_id, room_id
            ) AS upd
            LEFT JOIN event_push_summary AS old USING (user_id, room_id)
        """

        txn.execute(sql, rotation_range)
        rows = txn.fetchall()

        logger.info("Rotating notifications, handling %d rows", len(rows))

        # The last column is `old.user_id`: when it is NULL there is no
        # existing summary row, so the new total is inserted ...
        self._simple_insert_many_txn(
            txn,
            table="event_push_summary",
            values=[
                {
                    "user_id": user,
                    "room_id": room,
                    "notif_count": total,
                    "stream_ordering": newest,
                }
                for user, room, total, newest, existing_user in rows
                if existing_user is None
            ],
        )

        # ... otherwise the existing summary row is updated in place.
        update_summary_sql = """
                UPDATE event_push_summary SET notif_count = ?, stream_ordering = ?
                WHERE user_id = ? AND room_id = ?
            """
        txn.executemany(
            update_summary_sql,
            (
                (total, newest, user, room)
                for user, room, total, newest, existing_user in rows
                if existing_user is not None
            ),
        )

        # The counted rows are now represented in the summary, so drop them.
        txn.execute(
            "DELETE FROM event_push_actions"
            " WHERE ? <= stream_ordering AND stream_ordering < ? AND highlight = 0",
            rotation_range,
        )

        logger.info("Rotating notifications, deleted %s push actions", txn.rowcount)

        # Record how far the rotation has progressed.
        txn.execute(
            "UPDATE event_push_summary_stream_ordering SET stream_ordering = ?",
            (rotate_to_stream_ordering,),
        )
950

951

952
def _action_has_highlight(actions):
1×
UNCOV
953
    for action in actions:
Branches [[0, 954], [0, 960]] missed. !
UNCOV
954
        try:
!
UNCOV
955
            if action.get("set_tweak", None) == "highlight":
Branches [[0, 953], [0, 956]] missed. !
UNCOV
956
                return action.get("value", True)
!
UNCOV
957
        except AttributeError:
!
UNCOV
958
            pass
!
959

UNCOV
960
    return False
!
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2019 Coveralls, LLC