Coveralls logo
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

matrix-org / synapse / 4532

23 Sep 2019 - 19:39 coverage decreased (-49.7%) to 17.596%
4532

Pull #6079

buildkite

Richard van der Hoff
update changelog
Pull Request #6079: Add submit_url response parameter to msisdn /requestToken

359 of 12986 branches covered (2.76%)

Branch coverage included in aggregate %.

0 of 7 new or added lines in 1 file covered. (0.0%)

18869 existing lines in 281 files now uncovered.

8809 of 39116 relevant lines covered (22.52%)

0.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

15.52
/synapse/storage/room.py
1
# -*- coding: utf-8 -*-
2
# Copyright 2014-2016 OpenMarket Ltd
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15

16
import collections
1×
17
import logging
1×
18
import re
1×
19

20
from canonicaljson import json
1×
21

22
from twisted.internet import defer
1×
23

24
from synapse.api.errors import StoreError
1×
25
from synapse.storage._base import SQLBaseStore
1×
26
from synapse.storage.search import SearchStore
1×
27
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
1×
28

29
logger = logging.getLogger(__name__)
1×
30

31

32
OpsLevel = collections.namedtuple(
1×
33
    "OpsLevel", ("ban_level", "kick_level", "redact_level")
34
)
35

36
RatelimitOverride = collections.namedtuple(
1×
37
    "RatelimitOverride", ("messages_per_second", "burst_count")
38
)
39

40

41
class RoomWorkerStore(SQLBaseStore):
1×
42
    def get_room(self, room_id):
1×
43
        """Retrieve a room.
44

45
        Args:
46
            room_id (str): The ID of the room to retrieve.
47
        Returns:
48
            A dict containing the room information, or None if the room is unknown.
49
        """
UNCOV
50
        return self._simple_select_one(
!
51
            table="rooms",
52
            keyvalues={"room_id": room_id},
53
            retcols=("room_id", "is_public", "creator"),
54
            desc="get_room",
55
            allow_none=True,
56
        )
57

58
    def get_public_room_ids(self):
1×
UNCOV
59
        return self._simple_select_onecol(
!
60
            table="rooms",
61
            keyvalues={"is_public": True},
62
            retcol="room_id",
63
            desc="get_public_room_ids",
64
        )
65

66
    @cached(num_args=2, max_entries=100)
1×
67
    def get_public_room_ids_at_stream_id(self, stream_id, network_tuple):
68
        """Get public rooms for a particular list, or across all lists.
69

70
        Args:
71
            stream_id (int)
72
            network_tuple (ThirdPartyInstanceID): The list to use (None, None)
73
                means the main list, None means all lists.
74
        """
UNCOV
75
        return self.runInteraction(
!
76
            "get_public_room_ids_at_stream_id",
77
            self.get_public_room_ids_at_stream_id_txn,
78
            stream_id,
79
            network_tuple=network_tuple,
80
        )
81

82
    def get_public_room_ids_at_stream_id_txn(self, txn, stream_id, network_tuple):
1×
UNCOV
83
        return {
Branches [[0, 83], [0, 82]] missed. !
84
            rm
85
            for rm, vis in self.get_published_at_stream_id_txn(
86
                txn, stream_id, network_tuple=network_tuple
87
            ).items()
88
            if vis
89
        }
90

91
    def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple):
1×
UNCOV
92
        if network_tuple:
Branches [[0, 95], [0, 116]] missed. !
93
            # We want to get from a particular list. No aggregation required.
94

UNCOV
95
            sql = """
!
96
                SELECT room_id, visibility FROM public_room_list_stream
97
                INNER JOIN (
98
                    SELECT room_id, max(stream_id) AS stream_id
99
                    FROM public_room_list_stream
100
                    WHERE stream_id <= ? %s
101
                    GROUP BY room_id
102
                ) grouped USING (room_id, stream_id)
103
            """
104

UNCOV
105
            if network_tuple.appservice_id is not None:
Branches [[0, 106], [0, 111]] missed. !
UNCOV
106
                txn.execute(
!
107
                    sql % ("AND appservice_id = ? AND network_id = ?",),
108
                    (stream_id, network_tuple.appservice_id, network_tuple.network_id),
109
                )
110
            else:
UNCOV
111
                txn.execute(sql % ("AND appservice_id IS NULL",), (stream_id,))
!
UNCOV
112
            return dict(txn)
!
113
        else:
114
            # We want to get from all lists, so we need to aggregate the results
115

UNCOV
116
            logger.info("Executing full list")
!
117

UNCOV
118
            sql = """
!
119
                SELECT room_id, visibility
120
                FROM public_room_list_stream
121
                INNER JOIN (
122
                    SELECT
123
                        room_id, max(stream_id) AS stream_id, appservice_id,
124
                        network_id
125
                    FROM public_room_list_stream
126
                    WHERE stream_id <= ?
127
                    GROUP BY room_id, appservice_id, network_id
128
                ) grouped USING (room_id, stream_id)
129
            """
130

UNCOV
131
            txn.execute(sql, (stream_id,))
!
132

UNCOV
133
            results = {}
!
134
            # A room is visible if it's visible on any list.
UNCOV
135
            for room_id, visibility in txn:
Branches [[0, 136], [0, 138]] missed. !
UNCOV
136
                results[room_id] = bool(visibility) or results.get(room_id, False)
!
137

UNCOV
138
            return results
!
139

140
    def get_public_room_changes(self, prev_stream_id, new_stream_id, network_tuple):
1×
141
        def get_public_room_changes_txn(txn):
!
142
            then_rooms = self.get_public_room_ids_at_stream_id_txn(
!
143
                txn, prev_stream_id, network_tuple
144
            )
145

146
            now_rooms_dict = self.get_published_at_stream_id_txn(
!
147
                txn, new_stream_id, network_tuple
148
            )
149

150
            now_rooms_visible = set(rm for rm, vis in now_rooms_dict.items() if vis)
Branches [[0, 150], [0, 151]] missed. !
151
            now_rooms_not_visible = set(
Branches [[0, 152], [0, 155]] missed. !
152
                rm for rm, vis in now_rooms_dict.items() if not vis
153
            )
154

155
            newly_visible = now_rooms_visible - then_rooms
!
156
            newly_unpublished = now_rooms_not_visible & then_rooms
!
157

158
            return newly_visible, newly_unpublished
!
159

160
        return self.runInteraction(
!
161
            "get_public_room_changes", get_public_room_changes_txn
162
        )
163

164
    @cached(max_entries=10000)
1×
165
    def is_room_blocked(self, room_id):
UNCOV
166
        return self._simple_select_one_onecol(
!
167
            table="blocked_rooms",
168
            keyvalues={"room_id": room_id},
169
            retcol="1",
170
            allow_none=True,
171
            desc="is_room_blocked",
172
        )
173

174
    @cachedInlineCallbacks(max_entries=10000)
1×
175
    def get_ratelimit_for_user(self, user_id):
176
        """Check if there are any overrides for ratelimiting for the given
177
        user
178

179
        Args:
180
            user_id (str)
181

182
        Returns:
183
            RatelimitOverride if there is an override, else None. If the contents
184
            of RatelimitOverride are None or 0 then ratelimiting has been
185
            disabled for that user entirely.
186
        """
UNCOV
187
        row = yield self._simple_select_one(
!
188
            table="ratelimit_override",
189
            keyvalues={"user_id": user_id},
190
            retcols=("messages_per_second", "burst_count"),
191
            allow_none=True,
192
            desc="get_ratelimit_for_user",
193
        )
194

UNCOV
195
        if row:
Branches [[0, 196], [0, 201]] missed. !
196
            return RatelimitOverride(
!
197
                messages_per_second=row["messages_per_second"],
198
                burst_count=row["burst_count"],
199
            )
200
        else:
UNCOV
201
            return None
!
202

203

204
class RoomStore(RoomWorkerStore, SearchStore):
1×
205
    @defer.inlineCallbacks
1×
206
    def store_room(self, room_id, room_creator_user_id, is_public):
207
        """Stores a room.
208

209
        Args:
210
            room_id (str): The desired room ID, can be None.
211
            room_creator_user_id (str): The user ID of the room creator.
212
            is_public (bool): True to indicate that this room should appear in
213
            public room lists.
214
        Raises:
215
            StoreError if the room could not be stored.
216
        """
UNCOV
217
        try:
!
218

UNCOV
219
            def store_room_txn(txn, next_id):
!
UNCOV
220
                self._simple_insert_txn(
!
221
                    txn,
222
                    "rooms",
223
                    {
224
                        "room_id": room_id,
225
                        "creator": room_creator_user_id,
226
                        "is_public": is_public,
227
                    },
228
                )
UNCOV
229
                if is_public:
Branches [[0, 219], [0, 230]] missed. !
UNCOV
230
                    self._simple_insert_txn(
!
231
                        txn,
232
                        table="public_room_list_stream",
233
                        values={
234
                            "stream_id": next_id,
235
                            "room_id": room_id,
236
                            "visibility": is_public,
237
                        },
238
                    )
239

UNCOV
240
            with self._public_room_id_gen.get_next() as next_id:
!
UNCOV
241
                yield self.runInteraction("store_room_txn", store_room_txn, next_id)
!
UNCOV
242
        except Exception as e:
!
UNCOV
243
            logger.error("store_room with room_id=%s failed: %s", room_id, e)
!
UNCOV
244
            raise StoreError(500, "Problem creating room.")
!
245

246
    @defer.inlineCallbacks
1×
247
    def set_room_is_public(self, room_id, is_public):
UNCOV
248
        def set_room_is_public_txn(txn, next_id):
!
UNCOV
249
            self._simple_update_one_txn(
!
250
                txn,
251
                table="rooms",
252
                keyvalues={"room_id": room_id},
253
                updatevalues={"is_public": is_public},
254
            )
255

UNCOV
256
            entries = self._simple_select_list_txn(
!
257
                txn,
258
                table="public_room_list_stream",
259
                keyvalues={
260
                    "room_id": room_id,
261
                    "appservice_id": None,
262
                    "network_id": None,
263
                },
264
                retcols=("stream_id", "visibility"),
265
            )
266

UNCOV
267
            entries.sort(key=lambda r: r["stream_id"])
Branches [[0, 267], [0, 269]] missed. !
268

UNCOV
269
            add_to_stream = True
!
UNCOV
270
            if entries:
Branches [[0, 271], [0, 273]] missed. !
271
                add_to_stream = bool(entries[-1]["visibility"]) != is_public
!
272

UNCOV
273
            if add_to_stream:
Branches [[0, 248], [0, 274]] missed. !
UNCOV
274
                self._simple_insert_txn(
!
275
                    txn,
276
                    table="public_room_list_stream",
277
                    values={
278
                        "stream_id": next_id,
279
                        "room_id": room_id,
280
                        "visibility": is_public,
281
                        "appservice_id": None,
282
                        "network_id": None,
283
                    },
284
                )
285

UNCOV
286
        with self._public_room_id_gen.get_next() as next_id:
!
UNCOV
287
            yield self.runInteraction(
!
288
                "set_room_is_public", set_room_is_public_txn, next_id
289
            )
UNCOV
290
        self.hs.get_notifier().on_new_replication_data()
!
291

292
    @defer.inlineCallbacks
1×
293
    def set_room_is_public_appservice(
294
        self, room_id, appservice_id, network_id, is_public
295
    ):
296
        """Edit the appservice/network specific public room list.
297

298
        Each appservice can have a number of published room lists associated
299
        with them, keyed off of an appservice defined `network_id`, which
300
        basically represents a single instance of a bridge to a third party
301
        network.
302

303
        Args:
304
            room_id (str)
305
            appservice_id (str)
306
            network_id (str)
307
            is_public (bool): Whether to publish or unpublish the room from the
308
                list.
309
        """
310

UNCOV
311
        def set_room_is_public_appservice_txn(txn, next_id):
!
UNCOV
312
            if is_public:
Branches [[0, 313], [0, 327]] missed. !
UNCOV
313
                try:
!
UNCOV
314
                    self._simple_insert_txn(
!
315
                        txn,
316
                        table="appservice_room_list",
317
                        values={
318
                            "appservice_id": appservice_id,
319
                            "network_id": network_id,
320
                            "room_id": room_id,
321
                        },
322
                    )
323
                except self.database_engine.module.IntegrityError:
!
324
                    # We've already inserted, nothing to do.
325
                    return
!
326
            else:
UNCOV
327
                self._simple_delete_txn(
!
328
                    txn,
329
                    table="appservice_room_list",
330
                    keyvalues={
331
                        "appservice_id": appservice_id,
332
                        "network_id": network_id,
333
                        "room_id": room_id,
334
                    },
335
                )
336

UNCOV
337
            entries = self._simple_select_list_txn(
!
338
                txn,
339
                table="public_room_list_stream",
340
                keyvalues={
341
                    "room_id": room_id,
342
                    "appservice_id": appservice_id,
343
                    "network_id": network_id,
344
                },
345
                retcols=("stream_id", "visibility"),
346
            )
347

UNCOV
348
            entries.sort(key=lambda r: r["stream_id"])
Branches [[0, 348], [0, 350]] missed. !
349

UNCOV
350
            add_to_stream = True
!
UNCOV
351
            if entries:
Branches [[0, 352], [0, 354]] missed. !
UNCOV
352
                add_to_stream = bool(entries[-1]["visibility"]) != is_public
!
353

UNCOV
354
            if add_to_stream:
Branches [[0, 311], [0, 355]] missed. !
UNCOV
355
                self._simple_insert_txn(
!
356
                    txn,
357
                    table="public_room_list_stream",
358
                    values={
359
                        "stream_id": next_id,
360
                        "room_id": room_id,
361
                        "visibility": is_public,
362
                        "appservice_id": appservice_id,
363
                        "network_id": network_id,
364
                    },
365
                )
366

UNCOV
367
        with self._public_room_id_gen.get_next() as next_id:
!
UNCOV
368
            yield self.runInteraction(
!
369
                "set_room_is_public_appservice",
370
                set_room_is_public_appservice_txn,
371
                next_id,
372
            )
UNCOV
373
        self.hs.get_notifier().on_new_replication_data()
!
374

375
    def get_room_count(self):
1×
376
        """Retrieve the total number of rooms
377
        """
378

379
        def f(txn):
!
380
            sql = "SELECT count(*)  FROM rooms"
!
381
            txn.execute(sql)
!
382
            row = txn.fetchone()
!
383
            return row[0] or 0
!
384

385
        return self.runInteraction("get_rooms", f)
!
386

387
    def _store_room_topic_txn(self, txn, event):
1×
UNCOV
388
        if hasattr(event, "content") and "topic" in event.content:
Branches [[0, 387], [0, 389]] missed. !
UNCOV
389
            self.store_event_search_txn(
!
390
                txn, event, "content.topic", event.content["topic"]
391
            )
392

393
    def _store_room_name_txn(self, txn, event):
1×
UNCOV
394
        if hasattr(event, "content") and "name" in event.content:
Branches [[0, 393], [0, 395]] missed. !
UNCOV
395
            self.store_event_search_txn(
!
396
                txn, event, "content.name", event.content["name"]
397
            )
398

399
    def _store_room_message_txn(self, txn, event):
1×
UNCOV
400
        if hasattr(event, "content") and "body" in event.content:
Branches [[0, 399], [0, 401]] missed. !
UNCOV
401
            self.store_event_search_txn(
!
402
                txn, event, "content.body", event.content["body"]
403
            )
404

405
    def add_event_report(
1×
406
        self, room_id, event_id, user_id, reason, content, received_ts
407
    ):
408
        next_id = self._event_reports_id_gen.get_next()
!
409
        return self._simple_insert(
!
410
            table="event_reports",
411
            values={
412
                "id": next_id,
413
                "received_ts": received_ts,
414
                "room_id": room_id,
415
                "event_id": event_id,
416
                "user_id": user_id,
417
                "reason": reason,
418
                "content": json.dumps(content),
419
            },
420
            desc="add_event_report",
421
        )
422

423
    def get_current_public_room_stream_id(self):
1×
UNCOV
424
        return self._public_room_id_gen.get_current_token()
!
425

426
    def get_all_new_public_rooms(self, prev_id, current_id, limit):
1×
427
        def get_all_new_public_rooms(txn):
!
428
            sql = """
!
429
                SELECT stream_id, room_id, visibility, appservice_id, network_id
430
                FROM public_room_list_stream
431
                WHERE stream_id > ? AND stream_id <= ?
432
                ORDER BY stream_id ASC
433
                LIMIT ?
434
            """
435

436
            txn.execute(sql, (prev_id, current_id, limit))
!
437
            return txn.fetchall()
!
438

439
        if prev_id == current_id:
Branches [[0, 440], [0, 442]] missed. !
440
            return defer.succeed([])
!
441

442
        return self.runInteraction("get_all_new_public_rooms", get_all_new_public_rooms)
!
443

444
    @defer.inlineCallbacks
1×
445
    def block_room(self, room_id, user_id):
446
        """Marks the room as blocked. Can be called multiple times.
447

448
        Args:
449
            room_id (str): Room to block
450
            user_id (str): Who blocked it
451

452
        Returns:
453
            Deferred
454
        """
UNCOV
455
        yield self._simple_upsert(
!
456
            table="blocked_rooms",
457
            keyvalues={"room_id": room_id},
458
            values={},
459
            insertion_values={"user_id": user_id},
460
            desc="block_room",
461
        )
UNCOV
462
        yield self.runInteraction(
!
463
            "block_room_invalidation",
464
            self._invalidate_cache_and_stream,
465
            self.is_room_blocked,
466
            (room_id,),
467
        )
468

469
    def get_media_mxcs_in_room(self, room_id):
1×
470
        """Retrieves all the local and remote media MXC URIs in a given room
471

472
        Args:
473
            room_id (str)
474

475
        Returns:
476
            The local and remote media as two lists of MXC URI strings of
477
            the form "mxc://<hostname>/<media_id>".
478
        """
479

480
        def _get_media_mxcs_in_room_txn(txn):
!
481
            local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id)
!
482
            local_media_mxcs = []
!
483
            remote_media_mxcs = []
!
484

485
            # Convert the IDs to MXC URIs
486
            for media_id in local_mxcs:
Branches [[0, 487], [0, 488]] missed. !
487
                local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id))
!
488
            for hostname, media_id in remote_mxcs:
Branches [[0, 489], [0, 491]] missed. !
489
                remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))
!
490

491
            return local_media_mxcs, remote_media_mxcs
!
492

493
        return self.runInteraction("get_media_ids_in_room", _get_media_mxcs_in_room_txn)
!
494

495
    def quarantine_media_ids_in_room(self, room_id, quarantined_by):
1×
496
        """For a room loops through all events with media and quarantines
497
        the associated media
498
        """
499

UNCOV
500
        def _quarantine_media_in_room_txn(txn):
!
UNCOV
501
            local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id)
!
UNCOV
502
            total_media_quarantined = 0
!
503

504
            # Now update all the tables to set the quarantined_by flag
505

UNCOV
506
            txn.executemany(
Branches [[0, 512], [0, 515]] missed. !
507
                """
508
                UPDATE local_media_repository
509
                SET quarantined_by = ?
510
                WHERE media_id = ?
511
            """,
512
                ((quarantined_by, media_id) for media_id in local_mxcs),
513
            )
514

UNCOV
515
            txn.executemany(
Branches [[0, 522], [0, 527]] missed. !
516
                """
517
                    UPDATE remote_media_cache
518
                    SET quarantined_by = ?
519
                    WHERE media_origin = ? AND media_id = ?
520
                """,
521
                (
522
                    (quarantined_by, origin, media_id)
523
                    for origin, media_id in remote_mxcs
524
                ),
525
            )
526

UNCOV
527
            total_media_quarantined += len(local_mxcs)
!
UNCOV
528
            total_media_quarantined += len(remote_mxcs)
!
529

UNCOV
530
            return total_media_quarantined
!
531

UNCOV
532
        return self.runInteraction(
!
533
            "quarantine_media_in_room", _quarantine_media_in_room_txn
534
        )
535

536
    def _get_media_mxcs_in_room_txn(self, txn, room_id):
1×
537
        """Retrieves all the local and remote media MXC URIs in a given room
538

539
        Args:
540
            txn (cursor)
541
            room_id (str)
542

543
        Returns:
544
            A list of local media IDs, and a list of remote media as tuples
545
            of (hostname, media ID).
546
        """
UNCOV
547
        mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)")
!
548

UNCOV
549
        next_token = self.get_current_events_token() + 1
!
UNCOV
550
        local_media_mxcs = []
!
UNCOV
551
        remote_media_mxcs = []
!
552

UNCOV
553
        while next_token:
Branches [[0, 554], [0, 585]] missed. !
UNCOV
554
            sql = """
!
555
                SELECT stream_ordering, json FROM events
556
                JOIN event_json USING (room_id, event_id)
557
                WHERE room_id = ?
558
                    AND stream_ordering < ?
559
                    AND contains_url = ? AND outlier = ?
560
                ORDER BY stream_ordering DESC
561
                LIMIT ?
562
            """
UNCOV
563
            txn.execute(sql, (room_id, next_token, True, False, 100))
!
564

UNCOV
565
            next_token = None
!
UNCOV
566
            for stream_ordering, content_json in txn:
Branches [[0, 553], [0, 567]] missed. !
UNCOV
567
                next_token = stream_ordering
!
UNCOV
568
                event_json = json.loads(content_json)
!
UNCOV
569
                content = event_json["content"]
!
UNCOV
570
                content_url = content.get("url")
!
UNCOV
571
                thumbnail_url = content.get("info", {}).get("thumbnail_url")
!
572

UNCOV
573
                for url in (content_url, thumbnail_url):
Branches [[0, 566], [0, 574]] missed. !
UNCOV
574
                    if not url:
Branches [[0, 575], [0, 576]] missed. !
UNCOV
575
                        continue
!
UNCOV
576
                    matches = mxc_re.match(url)
!
UNCOV
577
                    if matches:
Branches [[0, 573], [0, 578]] missed. !
UNCOV
578
                        hostname = matches.group(1)
!
UNCOV
579
                        media_id = matches.group(2)
!
UNCOV
580
                        if hostname == self.hs.hostname:
Branches [[0, 581], [0, 583]] missed. !
UNCOV
581
                            local_media_mxcs.append(media_id)
!
582
                        else:
583
                            remote_media_mxcs.append((hostname, media_id))
!
584

UNCOV
585
        return local_media_mxcs, remote_media_mxcs
!
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2019 Coveralls, LLC