
matrix-org / synapse, build 4532 (buildkite)

Pull Request #6079: Add submit_url response parameter to msisdn /requestToken
Commit by Richard van der Hoff: "update changelog"

23 Sep 2019 at 19:39: coverage decreased (-49.7%) to 17.596%

359 of 12986 branches covered (2.76%)

Branch coverage included in aggregate %.

0 of 7 new or added lines in 1 file covered (0.0%).

18869 existing lines in 281 files now uncovered.

8809 of 39116 relevant lines covered (22.52%)

0.23 hits per line
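
Taken together, these figures give the aggregate reported above: (8809 + 359) covered lines and branches out of (39116 + 12986) relevant ones, i.e. 9168 / 52102 ≈ 17.596%.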

Source File

/synapse/storage/events_bg_updates.py (7.11% covered)

# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from six import text_type

from canonicaljson import json

from twisted.internet import defer

from synapse.storage.background_updates import BackgroundUpdateStore

logger = logging.getLogger(__name__)


class EventsBackgroundUpdatesStore(BackgroundUpdateStore):

    EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
    EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
    DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"

    def __init__(self, db_conn, hs):
        super(EventsBackgroundUpdatesStore, self).__init__(db_conn, hs)

        self.register_background_update_handler(
            self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
        )
        self.register_background_update_handler(
            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
            self._background_reindex_fields_sender,
        )

        self.register_background_index_update(
            "event_contains_url_index",
            index_name="event_contains_url_index",
            table="events",
            columns=["room_id", "topological_ordering", "stream_ordering"],
            where_clause="contains_url = true AND outlier = false",
        )

        # an event_id index on event_search is useful for the purge_history
        # api. Plus it means we get to enforce some integrity with a UNIQUE
        # clause
        self.register_background_index_update(
            "event_search_event_id_idx",
            index_name="event_search_event_id_idx",
            table="event_search",
            columns=["event_id"],
            unique=True,
            psql_only=True,
        )

        self.register_background_update_handler(
            self.DELETE_SOFT_FAILED_EXTREMITIES, self._cleanup_extremities_bg_update
        )

    @defer.inlineCallbacks
    def _background_reindex_fields_sender(self, progress, batch_size):
        target_min_stream_id = progress["target_min_stream_id_inclusive"]
        max_stream_id = progress["max_stream_id_exclusive"]
        rows_inserted = progress.get("rows_inserted", 0)

        INSERT_CLUMP_SIZE = 1000

        def reindex_txn(txn):
            sql = (
                "SELECT stream_ordering, event_id, json FROM events"
                " INNER JOIN event_json USING (event_id)"
                " WHERE ? <= stream_ordering AND stream_ordering < ?"
                " ORDER BY stream_ordering DESC"
                " LIMIT ?"
            )

            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))

            rows = txn.fetchall()
            if not rows:
                return 0

            min_stream_id = rows[-1][0]

            update_rows = []
            for row in rows:
                try:
                    event_id = row[1]
                    event_json = json.loads(row[2])
                    sender = event_json["sender"]
                    content = event_json["content"]

                    contains_url = "url" in content
                    if contains_url:
                        contains_url &= isinstance(content["url"], text_type)
                except (KeyError, AttributeError):
                    # If the event is missing a necessary field then
                    # skip over it.
                    continue

                update_rows.append((sender, contains_url, event_id))

            sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"

            for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
                clump = update_rows[index : index + INSERT_CLUMP_SIZE]
                txn.executemany(sql, clump)

            progress = {
                "target_min_stream_id_inclusive": target_min_stream_id,
                "max_stream_id_exclusive": min_stream_id,
                "rows_inserted": rows_inserted + len(rows),
            }

            self._background_update_progress_txn(
                txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
            )

            return len(rows)

        result = yield self.runInteraction(
            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
        )

        if not result:
            yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)

        return result

    @defer.inlineCallbacks
    def _background_reindex_origin_server_ts(self, progress, batch_size):
        target_min_stream_id = progress["target_min_stream_id_inclusive"]
        max_stream_id = progress["max_stream_id_exclusive"]
        rows_inserted = progress.get("rows_inserted", 0)

        INSERT_CLUMP_SIZE = 1000

        def reindex_search_txn(txn):
            sql = (
                "SELECT stream_ordering, event_id FROM events"
                " WHERE ? <= stream_ordering AND stream_ordering < ?"
                " ORDER BY stream_ordering DESC"
                " LIMIT ?"
            )

            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))

            rows = txn.fetchall()
            if not rows:
                return 0

            min_stream_id = rows[-1][0]
            event_ids = [row[1] for row in rows]

            rows_to_update = []

            chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)]
            for chunk in chunks:
                ev_rows = self._simple_select_many_txn(
                    txn,
                    table="event_json",
                    column="event_id",
                    iterable=chunk,
                    retcols=["event_id", "json"],
                    keyvalues={},
                )

                for row in ev_rows:
                    event_id = row["event_id"]
                    event_json = json.loads(row["json"])
                    try:
                        origin_server_ts = event_json["origin_server_ts"]
                    except (KeyError, AttributeError):
                        # If the event is missing a necessary field then
                        # skip over it.
                        continue

                    rows_to_update.append((origin_server_ts, event_id))

            sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"

            for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
                clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
                txn.executemany(sql, clump)

            progress = {
                "target_min_stream_id_inclusive": target_min_stream_id,
                "max_stream_id_exclusive": min_stream_id,
                "rows_inserted": rows_inserted + len(rows_to_update),
            }

            self._background_update_progress_txn(
                txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
            )

            return len(rows_to_update)

        result = yield self.runInteraction(
            self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
        )

        if not result:
            yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)

        return result

    @defer.inlineCallbacks
    def _cleanup_extremities_bg_update(self, progress, batch_size):
        """Background update to clean out extremities that should have been
        deleted previously.

        Mainly used to deal with the aftermath of #5269.
        """

        # This works by first copying all existing forward extremities into the
        # `_extremities_to_check` table at start up, and then checking each
        # event in that table whether we have any descendants that are not
        # soft-failed/rejected. If that is the case then we delete that event
        # from the forward extremities table.
        #
        # For efficiency, we do this in batches by recursively pulling out all
        # descendants of a batch until we find the non soft-failed/rejected
        # events, i.e. the set of descendants whose chain of prev events back
        # to the batch of extremities are all soft-failed or rejected.
        # Typically, we won't find any such events as extremities will rarely
        # have any descendants, but if they do then we should delete those
        # extremities.

        def _cleanup_extremities_bg_update_txn(txn):
            # The set of extremity event IDs that we're checking this round
            original_set = set()

            # A dict[str, set[str]] of event ID to their prev events.
            graph = {}

            # The set of descendants of the original set that are not rejected
            # nor soft-failed. Ancestors of these events should be removed
            # from the forward extremities table.
            non_rejected_leaves = set()

            # Set of event IDs that have been soft failed, and for which we
            # should check if they have descendants which haven't been soft
            # failed.
            soft_failed_events_to_lookup = set()

            # First, we get `batch_size` events from the table, pulling out
            # their successor events, if any, and the successor events'
            # rejection status.
            txn.execute(
                """SELECT prev_event_id, event_id, internal_metadata,
                    rejections.event_id IS NOT NULL, events.outlier
                FROM (
                    SELECT event_id AS prev_event_id
                    FROM _extremities_to_check
                    LIMIT ?
                ) AS f
                LEFT JOIN event_edges USING (prev_event_id)
                LEFT JOIN events USING (event_id)
                LEFT JOIN event_json USING (event_id)
                LEFT JOIN rejections USING (event_id)
                """,
                (batch_size,),
            )

            for prev_event_id, event_id, metadata, rejected, outlier in txn:
                original_set.add(prev_event_id)

                if not event_id or outlier:
                    # Common case where the forward extremity doesn't have any
                    # descendants.
                    continue

                graph.setdefault(event_id, set()).add(prev_event_id)

                soft_failed = False
                if metadata:
                    soft_failed = json.loads(metadata).get("soft_failed")

                if soft_failed or rejected:
                    soft_failed_events_to_lookup.add(event_id)
                else:
                    non_rejected_leaves.add(event_id)

            # Now we recursively check all the soft-failed descendants we
            # found above in the same way, until we have nothing left to
            # check.
            while soft_failed_events_to_lookup:
                # We only want to do 100 at a time, so we split given list
                # into two.
                batch = list(soft_failed_events_to_lookup)
                to_check, to_defer = batch[:100], batch[100:]
                soft_failed_events_to_lookup = set(to_defer)

                sql = """SELECT prev_event_id, event_id, internal_metadata,
                    rejections.event_id IS NOT NULL
                    FROM event_edges
                    INNER JOIN events USING (event_id)
                    INNER JOIN event_json USING (event_id)
                    LEFT JOIN rejections USING (event_id)
                    WHERE
                        prev_event_id IN (%s)
                        AND NOT events.outlier
                """ % (
                    ",".join("?" for _ in to_check),
                )
                txn.execute(sql, to_check)

                for prev_event_id, event_id, metadata, rejected in txn:
                    if event_id in graph:
                        # Already handled this event previously, but we still
                        # want to record the edge.
                        graph[event_id].add(prev_event_id)
                        continue

                    graph[event_id] = {prev_event_id}

                    soft_failed = json.loads(metadata).get("soft_failed")
                    if soft_failed or rejected:
                        soft_failed_events_to_lookup.add(event_id)
                    else:
                        non_rejected_leaves.add(event_id)

            # We have a set of non-soft-failed descendants, so we recurse up
            # the graph to find all ancestors and add them to the set of event
            # IDs that we can delete from forward extremities table.
            to_delete = set()
            while non_rejected_leaves:
                event_id = non_rejected_leaves.pop()
                prev_event_ids = graph.get(event_id, set())
                non_rejected_leaves.update(prev_event_ids)
                to_delete.update(prev_event_ids)

            to_delete.intersection_update(original_set)

            deleted = self._simple_delete_many_txn(
                txn=txn,
                table="event_forward_extremities",
                column="event_id",
                iterable=to_delete,
                keyvalues={},
            )

            logger.info(
                "Deleted %d forward extremities of %d checked, to clean up #5269",
                deleted,
                len(original_set),
            )

            if deleted:
                # We now need to invalidate the caches of these rooms
                rows = self._simple_select_many_txn(
                    txn,
                    table="events",
                    column="event_id",
                    iterable=to_delete,
                    keyvalues={},
                    retcols=("room_id",),
                )
                room_ids = set(row["room_id"] for row in rows)
                for room_id in room_ids:
                    txn.call_after(
                        self.get_latest_event_ids_in_room.invalidate, (room_id,)
                    )

            self._simple_delete_many_txn(
                txn=txn,
                table="_extremities_to_check",
                column="event_id",
                iterable=original_set,
                keyvalues={},
            )

            return len(original_set)

        num_handled = yield self.runInteraction(
            "_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn
        )

        if not num_handled:
            yield self._end_background_update(self.DELETE_SOFT_FAILED_EXTREMITIES)

            def _drop_table_txn(txn):
                txn.execute("DROP TABLE _extremities_to_check")

            yield self.runInteraction(
                "_cleanup_extremities_bg_update_drop_table", _drop_table_txn
            )

        return num_handled
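
The subtlest step in _cleanup_extremities_bg_update_txn is the final upward walk: starting from the non-rejected leaves, it follows prev-event edges back through the accumulated graph, and any original extremity reached that way is safe to delete. A toy run of that same loop, with made-up event IDs:

# Extremity "A" has a surviving descendant "C" reached via soft-failed "B",
# so "A" should be deleted; extremity "X" has no descendants, so it stays.
graph = {"B": {"A"}, "C": {"B"}}   # event ID -> its prev events
original_set = {"A", "X"}          # extremities being checked this round
non_rejected_leaves = {"C"}        # descendants that are not soft-failed/rejected

to_delete = set()
while non_rejected_leaves:
    event_id = non_rejected_leaves.pop()
    prev_event_ids = graph.get(event_id, set())
    non_rejected_leaves.update(prev_event_ids)
    to_delete.update(prev_event_ids)

to_delete.intersection_update(original_set)
assert to_delete == {"A"}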
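
More generally, the three handlers above share one contract: each invocation processes up to batch_size rows, persists its own progress, and returns the number handled; a falsy return (after calling _end_background_update) retires the update. The sketch below is a minimal synchronous stand-in for that driving loop, not Synapse's actual Deferred-based scheduler; drive_background_update, load_progress, and toy_handler are hypothetical names.

def drive_background_update(load_progress, handler, batch_size=100):
    # Re-read the persisted progress each round, since the handler (not
    # this loop) is responsible for saving it; stop on an empty batch.
    total = 0
    while True:
        handled = handler(load_progress(), batch_size)
        if not handled:
            return total
        total += handled


# Toy handler mirroring the progress-dict shape used by
# _background_reindex_fields_sender above.
_rows = list(range(250))
_state = {"rows_inserted": 0}

def toy_handler(progress, batch_size):
    done = progress["rows_inserted"]
    batch = _rows[done : done + batch_size]
    _state["rows_inserted"] = done + len(batch)  # "persist" progress
    return len(batch)

assert drive_background_update(lambda: dict(_state), toy_handler) == 250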