Coveralls logob
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

matrix-org / synapse / 4532

23 Sep 2019 - 19:39 coverage decreased (-49.7%) to 17.596%
4532

Pull #6079

buildkite

Richard van der Hoff
update changelog
Pull Request #6079: Add submit_url response parameter to msisdn /requestToken

359 of 12986 branches covered (2.76%)

Branch coverage included in aggregate %.

0 of 7 new or added lines in 1 file covered. (0.0%)

18869 existing lines in 281 files now uncovered.

8809 of 39116 relevant lines covered (22.52%)

0.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.09
/synapse/storage/__init__.py
1
# -*- coding: utf-8 -*-
2
# Copyright 2014-2016 OpenMarket Ltd
3
# Copyright 2018 New Vector Ltd
4
#
5
# Licensed under the Apache License, Version 2.0 (the "License");
6
# you may not use this file except in compliance with the License.
7
# You may obtain a copy of the License at
8
#
9
#     http://www.apache.org/licenses/LICENSE-2.0
10
#
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS,
13
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
# See the License for the specific language governing permissions and
15
# limitations under the License.
16

17
import calendar
1×
18
import logging
1×
19
import time
1×
20

21
from twisted.internet import defer
1×
22

23
from synapse.api.constants import PresenceState
1×
24
from synapse.storage.devices import DeviceStore
1×
25
from synapse.storage.user_erasure_store import UserErasureStore
1×
26
from synapse.util.caches.stream_change_cache import StreamChangeCache
1×
27

28
from .account_data import AccountDataStore
1×
29
from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore
1×
30
from .client_ips import ClientIpStore
1×
31
from .deviceinbox import DeviceInboxStore
1×
32
from .directory import DirectoryStore
1×
33
from .e2e_room_keys import EndToEndRoomKeyStore
1×
34
from .end_to_end_keys import EndToEndKeyStore
1×
35
from .engines import PostgresEngine
1×
36
from .event_federation import EventFederationStore
1×
37
from .event_push_actions import EventPushActionsStore
1×
38
from .events import EventsStore
1×
39
from .events_bg_updates import EventsBackgroundUpdatesStore
1×
40
from .filtering import FilteringStore
1×
41
from .group_server import GroupServerStore
1×
42
from .keys import KeyStore
1×
43
from .media_repository import MediaRepositoryStore
1×
44
from .monthly_active_users import MonthlyActiveUsersStore
1×
45
from .openid import OpenIdStore
1×
46
from .presence import PresenceStore, UserPresenceState
1×
47
from .profile import ProfileStore
1×
48
from .push_rule import PushRuleStore
1×
49
from .pusher import PusherStore
1×
50
from .receipts import ReceiptsStore
1×
51
from .registration import RegistrationStore
1×
52
from .rejections import RejectionsStore
1×
53
from .relations import RelationsStore
1×
54
from .room import RoomStore
1×
55
from .roommember import RoomMemberStore
1×
56
from .search import SearchStore
1×
57
from .signatures import SignatureStore
1×
58
from .state import StateStore
1×
59
from .stats import StatsStore
1×
60
from .stream import StreamStore
1×
61
from .tags import TagsStore
1×
62
from .transactions import TransactionStore
1×
63
from .user_directory import UserDirectoryStore
1×
64
from .util.id_generators import ChainedIdGenerator, IdGenerator, StreamIdGenerator
1×
65

66
logger = logging.getLogger(__name__)
1×
67

68

69
class DataStore(
1×
70
    EventsBackgroundUpdatesStore,
71
    RoomMemberStore,
72
    RoomStore,
73
    RegistrationStore,
74
    StreamStore,
75
    ProfileStore,
76
    PresenceStore,
77
    TransactionStore,
78
    DirectoryStore,
79
    KeyStore,
80
    StateStore,
81
    SignatureStore,
82
    ApplicationServiceStore,
83
    EventsStore,
84
    EventFederationStore,
85
    MediaRepositoryStore,
86
    RejectionsStore,
87
    FilteringStore,
88
    PusherStore,
89
    PushRuleStore,
90
    ApplicationServiceTransactionStore,
91
    ReceiptsStore,
92
    EndToEndKeyStore,
93
    EndToEndRoomKeyStore,
94
    SearchStore,
95
    TagsStore,
96
    AccountDataStore,
97
    EventPushActionsStore,
98
    OpenIdStore,
99
    ClientIpStore,
100
    DeviceStore,
101
    DeviceInboxStore,
102
    UserDirectoryStore,
103
    GroupServerStore,
104
    UserErasureStore,
105
    MonthlyActiveUsersStore,
106
    StatsStore,
107
    RelationsStore,
108
):
109
    def __init__(self, db_conn, hs):
1×
UNCOV
110
        self.hs = hs
!
UNCOV
111
        self._clock = hs.get_clock()
!
UNCOV
112
        self.database_engine = hs.database_engine
!
113

UNCOV
114
        self._stream_id_gen = StreamIdGenerator(
!
115
            db_conn,
116
            "events",
117
            "stream_ordering",
118
            extra_tables=[("local_invites", "stream_id")],
119
        )
UNCOV
120
        self._backfill_id_gen = StreamIdGenerator(
!
121
            db_conn,
122
            "events",
123
            "stream_ordering",
124
            step=-1,
125
            extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
126
        )
UNCOV
127
        self._presence_id_gen = StreamIdGenerator(
!
128
            db_conn, "presence_stream", "stream_id"
129
        )
UNCOV
130
        self._device_inbox_id_gen = StreamIdGenerator(
!
131
            db_conn, "device_max_stream_id", "stream_id"
132
        )
UNCOV
133
        self._public_room_id_gen = StreamIdGenerator(
!
134
            db_conn, "public_room_list_stream", "stream_id"
135
        )
UNCOV
136
        self._device_list_id_gen = StreamIdGenerator(
!
137
            db_conn, "device_lists_stream", "stream_id"
138
        )
139

UNCOV
140
        self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
!
UNCOV
141
        self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
!
UNCOV
142
        self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
!
UNCOV
143
        self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
!
UNCOV
144
        self._push_rules_stream_id_gen = ChainedIdGenerator(
!
145
            self._stream_id_gen, db_conn, "push_rules_stream", "stream_id"
146
        )
UNCOV
147
        self._pushers_id_gen = StreamIdGenerator(
!
148
            db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
149
        )
UNCOV
150
        self._group_updates_id_gen = StreamIdGenerator(
!
151
            db_conn, "local_group_updates", "stream_id"
152
        )
153

UNCOV
154
        if isinstance(self.database_engine, PostgresEngine):
Branches [[0, 155], [0, 159]] missed. !
UNCOV
155
            self._cache_id_gen = StreamIdGenerator(
!
156
                db_conn, "cache_invalidation_stream", "stream_id"
157
            )
158
        else:
159
            self._cache_id_gen = None
!
160

UNCOV
161
        self._presence_on_startup = self._get_active_presence(db_conn)
!
162

UNCOV
163
        presence_cache_prefill, min_presence_val = self._get_cache_dict(
!
164
            db_conn,
165
            "presence_stream",
166
            entity_column="user_id",
167
            stream_column="stream_id",
168
            max_value=self._presence_id_gen.get_current_token(),
169
        )
UNCOV
170
        self.presence_stream_cache = StreamChangeCache(
!
171
            "PresenceStreamChangeCache",
172
            min_presence_val,
173
            prefilled_cache=presence_cache_prefill,
174
        )
175

UNCOV
176
        max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
!
UNCOV
177
        device_inbox_prefill, min_device_inbox_id = self._get_cache_dict(
!
178
            db_conn,
179
            "device_inbox",
180
            entity_column="user_id",
181
            stream_column="stream_id",
182
            max_value=max_device_inbox_id,
183
            limit=1000,
184
        )
UNCOV
185
        self._device_inbox_stream_cache = StreamChangeCache(
!
186
            "DeviceInboxStreamChangeCache",
187
            min_device_inbox_id,
188
            prefilled_cache=device_inbox_prefill,
189
        )
190
        # The federation outbox and the local device inbox uses the same
191
        # stream_id generator.
UNCOV
192
        device_outbox_prefill, min_device_outbox_id = self._get_cache_dict(
!
193
            db_conn,
194
            "device_federation_outbox",
195
            entity_column="destination",
196
            stream_column="stream_id",
197
            max_value=max_device_inbox_id,
198
            limit=1000,
199
        )
UNCOV
200
        self._device_federation_outbox_stream_cache = StreamChangeCache(
!
201
            "DeviceFederationOutboxStreamChangeCache",
202
            min_device_outbox_id,
203
            prefilled_cache=device_outbox_prefill,
204
        )
205

UNCOV
206
        device_list_max = self._device_list_id_gen.get_current_token()
!
UNCOV
207
        self._device_list_stream_cache = StreamChangeCache(
!
208
            "DeviceListStreamChangeCache", device_list_max
209
        )
UNCOV
210
        self._device_list_federation_stream_cache = StreamChangeCache(
!
211
            "DeviceListFederationStreamChangeCache", device_list_max
212
        )
213

UNCOV
214
        events_max = self._stream_id_gen.get_current_token()
!
UNCOV
215
        curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict(
!
216
            db_conn,
217
            "current_state_delta_stream",
218
            entity_column="room_id",
219
            stream_column="stream_id",
220
            max_value=events_max,  # As we share the stream id with events token
221
            limit=1000,
222
        )
UNCOV
223
        self._curr_state_delta_stream_cache = StreamChangeCache(
!
224
            "_curr_state_delta_stream_cache",
225
            min_curr_state_delta_id,
226
            prefilled_cache=curr_state_delta_prefill,
227
        )
228

UNCOV
229
        _group_updates_prefill, min_group_updates_id = self._get_cache_dict(
!
230
            db_conn,
231
            "local_group_updates",
232
            entity_column="user_id",
233
            stream_column="stream_id",
234
            max_value=self._group_updates_id_gen.get_current_token(),
235
            limit=1000,
236
        )
UNCOV
237
        self._group_updates_stream_cache = StreamChangeCache(
!
238
            "_group_updates_stream_cache",
239
            min_group_updates_id,
240
            prefilled_cache=_group_updates_prefill,
241
        )
242

UNCOV
243
        self._stream_order_on_start = self.get_room_max_stream_ordering()
!
UNCOV
244
        self._min_stream_order_on_start = self.get_room_min_stream_ordering()
!
245

246
        # Used in _generate_user_daily_visits to keep track of progress
UNCOV
247
        self._last_user_visit_update = self._get_start_of_day()
!
248

UNCOV
249
        super(DataStore, self).__init__(db_conn, hs)
!
250

251
    def take_presence_startup_info(self):
1×
UNCOV
252
        active_on_startup = self._presence_on_startup
!
UNCOV
253
        self._presence_on_startup = None
!
UNCOV
254
        return active_on_startup
!
255

256
    def _get_active_presence(self, db_conn):
1×
257
        """Fetch non-offline presence from the database so that we can register
258
        the appropriate time outs.
259
        """
260

UNCOV
261
        sql = (
!
262
            "SELECT user_id, state, last_active_ts, last_federation_update_ts,"
263
            " last_user_sync_ts, status_msg, currently_active FROM presence_stream"
264
            " WHERE state != ?"
265
        )
UNCOV
266
        sql = self.database_engine.convert_param_style(sql)
!
267

UNCOV
268
        txn = db_conn.cursor()
!
UNCOV
269
        txn.execute(sql, (PresenceState.OFFLINE,))
!
UNCOV
270
        rows = self.cursor_to_dict(txn)
!
UNCOV
271
        txn.close()
!
272

UNCOV
273
        for row in rows:
Branches [[0, 274], [0, 276]] missed. !
274
            row["currently_active"] = bool(row["currently_active"])
!
275

UNCOV
276
        return [UserPresenceState(**row) for row in rows]
Branches [[0, 276], [0, 256]] missed. !
277

278
    def count_daily_users(self):
1×
279
        """
280
        Counts the number of users who used this homeserver in the last 24 hours.
281
        """
282
        yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
!
283
        return self.runInteraction("count_daily_users", self._count_users, yesterday)
!
284

285
    def count_monthly_users(self):
1×
286
        """
287
        Counts the number of users who used this homeserver in the last 30 days.
288
        Note this method is intended for phonehome metrics only and is different
289
        from the mau figure in synapse.storage.monthly_active_users which,
290
        amongst other things, includes a 3 day grace period before a user counts.
291
        """
292
        thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
!
293
        return self.runInteraction(
!
294
            "count_monthly_users", self._count_users, thirty_days_ago
295
        )
296

297
    def _count_users(self, txn, time_from):
1×
298
        """
299
        Returns number of users seen in the past time_from period
300
        """
301
        sql = """
!
302
            SELECT COALESCE(count(*), 0) FROM (
303
                SELECT user_id FROM user_ips
304
                WHERE last_seen > ?
305
                GROUP BY user_id
306
            ) u
307
        """
308
        txn.execute(sql, (time_from,))
!
309
        count, = txn.fetchone()
!
310
        return count
!
311

312
    def count_r30_users(self):
1×
313
        """
314
        Counts the number of 30 day retained users, defined as:-
315
         * Users who have created their accounts more than 30 days ago
316
         * Where last seen at most 30 days ago
317
         * Where account creation and last_seen are > 30 days apart
318

319
         Returns counts globaly for a given user as well as breaking
320
         by platform
321
        """
322

323
        def _count_r30_users(txn):
!
324
            thirty_days_in_secs = 86400 * 30
!
325
            now = int(self._clock.time())
!
326
            thirty_days_ago_in_secs = now - thirty_days_in_secs
!
327

328
            sql = """
!
329
                SELECT platform, COALESCE(count(*), 0) FROM (
330
                     SELECT
331
                        users.name, platform, users.creation_ts * 1000,
332
                        MAX(uip.last_seen)
333
                     FROM users
334
                     INNER JOIN (
335
                         SELECT
336
                         user_id,
337
                         last_seen,
338
                         CASE
339
                             WHEN user_agent LIKE '%%Android%%' THEN 'android'
340
                             WHEN user_agent LIKE '%%iOS%%' THEN 'ios'
341
                             WHEN user_agent LIKE '%%Electron%%' THEN 'electron'
342
                             WHEN user_agent LIKE '%%Mozilla%%' THEN 'web'
343
                             WHEN user_agent LIKE '%%Gecko%%' THEN 'web'
344
                             ELSE 'unknown'
345
                         END
346
                         AS platform
347
                         FROM user_ips
348
                     ) uip
349
                     ON users.name = uip.user_id
350
                     AND users.appservice_id is NULL
351
                     AND users.creation_ts < ?
352
                     AND uip.last_seen/1000 > ?
353
                     AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
354
                     GROUP BY users.name, platform, users.creation_ts
355
                ) u GROUP BY platform
356
            """
357

358
            results = {}
!
359
            txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
!
360

361
            for row in txn:
Branches [[0, 362], [0, 366]] missed. !
362
                if row[0] == "unknown":
!
363
                    pass
364
                results[row[0]] = row[1]
!
365

366
            sql = """
!
367
                SELECT COALESCE(count(*), 0) FROM (
368
                    SELECT users.name, users.creation_ts * 1000,
369
                                                        MAX(uip.last_seen)
370
                    FROM users
371
                    INNER JOIN (
372
                        SELECT
373
                        user_id,
374
                        last_seen
375
                        FROM user_ips
376
                    ) uip
377
                    ON users.name = uip.user_id
378
                    AND appservice_id is NULL
379
                    AND users.creation_ts < ?
380
                    AND uip.last_seen/1000 > ?
381
                    AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
382
                    GROUP BY users.name, users.creation_ts
383
                ) u
384
            """
385

386
            txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))
!
387

388
            count, = txn.fetchone()
!
389
            results["all"] = count
!
390

391
            return results
!
392

393
        return self.runInteraction("count_r30_users", _count_r30_users)
!
394

395
    def _get_start_of_day(self):
1×
396
        """
397
        Returns millisecond unixtime for start of UTC day.
398
        """
UNCOV
399
        now = time.gmtime()
!
UNCOV
400
        today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0))
!
UNCOV
401
        return today_start * 1000
!
402

403
    def generate_user_daily_visits(self):
1×
404
        """
405
        Generates daily visit data for use in cohort/ retention analysis
406
        """
407

UNCOV
408
        def _generate_user_daily_visits(txn):
!
UNCOV
409
            logger.info("Calling _generate_user_daily_visits")
!
UNCOV
410
            today_start = self._get_start_of_day()
!
UNCOV
411
            a_day_in_milliseconds = 24 * 60 * 60 * 1000
!
UNCOV
412
            now = self.clock.time_msec()
!
413

UNCOV
414
            sql = """
!
415
                INSERT INTO user_daily_visits (user_id, device_id, timestamp)
416
                    SELECT u.user_id, u.device_id, ?
417
                    FROM user_ips AS u
418
                    LEFT JOIN (
419
                      SELECT user_id, device_id, timestamp FROM user_daily_visits
420
                      WHERE timestamp = ?
421
                    ) udv
422
                    ON u.user_id = udv.user_id AND u.device_id=udv.device_id
423
                    INNER JOIN users ON users.name=u.user_id
424
                    WHERE last_seen > ? AND last_seen <= ?
425
                    AND udv.timestamp IS NULL AND users.is_guest=0
426
                    AND users.appservice_id IS NULL
427
                    GROUP BY u.user_id, u.device_id
428
            """
429

430
            # This means that the day has rolled over but there could still
431
            # be entries from the previous day. There is an edge case
432
            # where if the user logs in at 23:59 and overwrites their
433
            # last_seen at 00:01 then they will not be counted in the
434
            # previous day's stats - it is important that the query is run
435
            # often to minimise this case.
UNCOV
436
            if today_start > self._last_user_visit_update:
Branches [[0, 437], [0, 449]] missed. !
437
                yesterday_start = today_start - a_day_in_milliseconds
!
438
                txn.execute(
!
439
                    sql,
440
                    (
441
                        yesterday_start,
442
                        yesterday_start,
443
                        self._last_user_visit_update,
444
                        today_start,
445
                    ),
446
                )
447
                self._last_user_visit_update = today_start
!
448

UNCOV
449
            txn.execute(
!
450
                sql, (today_start, today_start, self._last_user_visit_update, now)
451
            )
452
            # Update _last_user_visit_update to now. The reason to do this
453
            # rather just clamping to the beginning of the day is to limit
454
            # the size of the join - meaning that the query can be run more
455
            # frequently
UNCOV
456
            self._last_user_visit_update = now
!
457

UNCOV
458
        return self.runInteraction(
!
459
            "generate_user_daily_visits", _generate_user_daily_visits
460
        )
461

462
    def get_users(self):
1×
463
        """Function to reterive a list of users in users table.
464

465
        Args:
466
        Returns:
467
            defer.Deferred: resolves to list[dict[str, Any]]
468
        """
469
        return self._simple_select_list(
!
470
            table="users",
471
            keyvalues={},
472
            retcols=["name", "password_hash", "is_guest", "admin", "user_type"],
473
            desc="get_users",
474
        )
475

476
    @defer.inlineCallbacks
1×
477
    def get_users_paginate(self, order, start, limit):
478
        """Function to reterive a paginated list of users from
479
        users list. This will return a json object, which contains
480
        list of users and the total number of users in users table.
481

482
        Args:
483
            order (str): column name to order the select by this column
484
            start (int): start number to begin the query from
485
            limit (int): number of rows to reterive
486
        Returns:
487
            defer.Deferred: resolves to json object {list[dict[str, Any]], count}
488
        """
489
        users = yield self.runInteraction(
!
490
            "get_users_paginate",
491
            self._simple_select_list_paginate_txn,
492
            table="users",
493
            keyvalues={"is_guest": False},
494
            orderby=order,
495
            start=start,
496
            limit=limit,
497
            retcols=["name", "password_hash", "is_guest", "admin", "user_type"],
498
        )
499
        count = yield self.runInteraction("get_users_paginate", self.get_user_count_txn)
!
500
        retval = {"users": users, "total": count}
!
501
        return retval
!
502

503
    def search_users(self, term):
1×
504
        """Function to search users list for one or more users with
505
        the matched term.
506

507
        Args:
508
            term (str): search term
509
            col (str): column to query term should be matched to
510
        Returns:
511
            defer.Deferred: resolves to list[dict[str, Any]]
512
        """
513
        return self._simple_search_list(
!
514
            table="users",
515
            term=term,
516
            col="name",
517
            retcols=["name", "password_hash", "is_guest", "admin", "user_type"],
518
            desc="search_users",
519
        )
520

521

522
def are_all_users_on_domain(txn, database_engine, domain):
1×
UNCOV
523
    sql = database_engine.convert_param_style(
!
524
        "SELECT COUNT(*) FROM users WHERE name NOT LIKE ?"
525
    )
UNCOV
526
    pat = "%:" + domain
!
UNCOV
527
    txn.execute(sql, (pat,))
!
UNCOV
528
    num_not_matching = txn.fetchall()[0][0]
!
UNCOV
529
    if num_not_matching == 0:
Branches [[0, 530], [0, 531]] missed. !
UNCOV
530
        return True
!
531
    return False
!
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2019 Coveralls, LLC