Coveralls logo
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

matrix-org / synapse / 4532

23 Sep 2019 - 19:39 coverage decreased (-49.7%) to 17.596%
4532

Pull #6079

buildkite

Richard van der Hoff
update changelog
Pull Request #6079: Add submit_url response parameter to msisdn /requestToken

359 of 12986 branches covered (2.76%)

Branch coverage included in aggregate %.

0 of 7 new or added lines in 1 file covered. (0.0%)

18869 existing lines in 281 files now uncovered.

8809 of 39116 relevant lines covered (22.52%)

0.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

14.91
/synapse/handlers/typing.py
1
# -*- coding: utf-8 -*-
2
# Copyright 2014-2016 OpenMarket Ltd
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15

16
import logging
1×
17
from collections import namedtuple
1×
18

19
from twisted.internet import defer
1×
20

21
from synapse.api.errors import AuthError, SynapseError
1×
22
from synapse.logging.context import run_in_background
1×
23
from synapse.types import UserID, get_domain_from_id
1×
24
from synapse.util.caches.stream_change_cache import StreamChangeCache
1×
25
from synapse.util.metrics import Measure
1×
26
from synapse.util.wheel_timer import WheelTimer
1×
27

28
logger = logging.getLogger(__name__)
1×
29

30

31
# A tiny object useful for storing a user's membership in a room, as a mapping
32
# key
33
RoomMember = namedtuple("RoomMember", ("room_id", "user_id"))
1×
34

35

36
# How often we expect remote servers to resend us typing notifications.
37
FEDERATION_TIMEOUT = 60 * 1000
1×
38

39
# How often to resend typing across federation.
40
FEDERATION_PING_INTERVAL = 40 * 1000
1×
41

42

43
class TypingHandler(object):
1×
44
    def __init__(self, hs):
1×
UNCOV
45
        self.store = hs.get_datastore()
!
UNCOV
46
        self.server_name = hs.config.server_name
!
UNCOV
47
        self.auth = hs.get_auth()
!
UNCOV
48
        self.is_mine_id = hs.is_mine_id
!
UNCOV
49
        self.notifier = hs.get_notifier()
!
UNCOV
50
        self.state = hs.get_state_handler()
!
51

UNCOV
52
        self.hs = hs
!
53

UNCOV
54
        self.clock = hs.get_clock()
!
UNCOV
55
        self.wheel_timer = WheelTimer(bucket_size=5000)
!
56

UNCOV
57
        self.federation = hs.get_federation_sender()
!
58

UNCOV
59
        hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu)
!
60

UNCOV
61
        hs.get_distributor().observe("user_left_room", self.user_left_room)
!
62

UNCOV
63
        self._member_typing_until = {}  # clock time we expect to stop
!
UNCOV
64
        self._member_last_federation_poke = {}
!
65

UNCOV
66
        self._latest_room_serial = 0
!
UNCOV
67
        self._reset()
!
68

69
        # caches which room_ids changed at which serials
UNCOV
70
        self._typing_stream_change_cache = StreamChangeCache(
!
71
            "TypingStreamChangeCache", self._latest_room_serial
72
        )
73

UNCOV
74
        self.clock.looping_call(self._handle_timeouts, 5000)
!
75

76
    def _reset(self):
1×
77
        """
78
        Reset the typing handler's data caches.
79
        """
80
        # map room IDs to serial numbers
UNCOV
81
        self._room_serials = {}
!
82
        # map room IDs to sets of users currently typing
UNCOV
83
        self._room_typing = {}
!
84

85
    def _handle_timeouts(self):
1×
UNCOV
86
        logger.debug("Checking for typing timeouts")
!
87

UNCOV
88
        now = self.clock.time_msec()
!
89

UNCOV
90
        members = set(self.wheel_timer.fetch(now))
!
91

UNCOV
92
        for member in members:
Branches [[0, 85], [0, 93]] missed. !
UNCOV
93
            if not self.is_typing(member):
Branches [[0, 95], [0, 97]] missed. !
94
                # Nothing to do if they're no longer typing
UNCOV
95
                continue
!
96

UNCOV
97
            until = self._member_typing_until.get(member, None)
!
UNCOV
98
            if not until or until <= now:
Branches [[0, 99], [0, 105]] missed. !
UNCOV
99
                logger.info("Timing out typing for: %s", member.user_id)
!
UNCOV
100
                self._stopped_typing(member)
!
UNCOV
101
                continue
!
102

103
            # Check if we need to resend a keep alive over federation for this
104
            # user.
105
            if self.hs.is_mine_id(member.user_id):
Branches [[0, 106], [0, 112]] missed. !
106
                last_fed_poke = self._member_last_federation_poke.get(member, None)
!
107
                if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
Branches [[0, 108], [0, 112]] missed. !
108
                    run_in_background(self._push_remote, member=member, typing=True)
!
109

110
            # Add a paranoia timer to ensure that we always have a timer for
111
            # each person typing.
112
            self.wheel_timer.insert(now=now, obj=member, then=now + 60 * 1000)
!
113

114
    def is_typing(self, member):
1×
UNCOV
115
        return member.user_id in self._room_typing.get(member.room_id, [])
!
116

117
    @defer.inlineCallbacks
1×
118
    def started_typing(self, target_user, auth_user, room_id, timeout):
UNCOV
119
        target_user_id = target_user.to_string()
!
UNCOV
120
        auth_user_id = auth_user.to_string()
!
121

UNCOV
122
        if not self.is_mine_id(target_user_id):
Branches [[0, 123], [0, 125]] missed. !
123
            raise SynapseError(400, "User is not hosted on this Home Server")
!
124

UNCOV
125
        if target_user_id != auth_user_id:
Branches [[0, 126], [0, 128]] missed. !
126
            raise AuthError(400, "Cannot set another user's typing state")
!
127

UNCOV
128
        yield self.auth.check_joined_room(room_id, target_user_id)
!
129

UNCOV
130
        logger.debug("%s has started typing in %s", target_user_id, room_id)
!
131

UNCOV
132
        member = RoomMember(room_id=room_id, user_id=target_user_id)
!
133

UNCOV
134
        was_present = member.user_id in self._room_typing.get(room_id, set())
!
135

UNCOV
136
        now = self.clock.time_msec()
!
UNCOV
137
        self._member_typing_until[member] = now + timeout
!
138

UNCOV
139
        self.wheel_timer.insert(now=now, obj=member, then=now + timeout)
!
140

UNCOV
141
        if was_present:
Branches [[0, 143], [0, 145]] missed. !
142
            # No point sending another notification
143
            return None
!
144

UNCOV
145
        self._push_update(member=member, typing=True)
!
146

147
    @defer.inlineCallbacks
1×
148
    def stopped_typing(self, target_user, auth_user, room_id):
UNCOV
149
        target_user_id = target_user.to_string()
!
UNCOV
150
        auth_user_id = auth_user.to_string()
!
151

UNCOV
152
        if not self.is_mine_id(target_user_id):
Branches [[0, 153], [0, 155]] missed. !
153
            raise SynapseError(400, "User is not hosted on this Home Server")
!
154

UNCOV
155
        if target_user_id != auth_user_id:
Branches [[0, 156], [0, 158]] missed. !
156
            raise AuthError(400, "Cannot set another user's typing state")
!
157

UNCOV
158
        yield self.auth.check_joined_room(room_id, target_user_id)
!
159

UNCOV
160
        logger.debug("%s has stopped typing in %s", target_user_id, room_id)
!
161

UNCOV
162
        member = RoomMember(room_id=room_id, user_id=target_user_id)
!
163

UNCOV
164
        self._stopped_typing(member)
!
165

166
    @defer.inlineCallbacks
1×
167
    def user_left_room(self, user, room_id):
UNCOV
168
        user_id = user.to_string()
!
UNCOV
169
        if self.is_mine_id(user_id):
Branches [[0, 166], [0, 170]] missed. !
UNCOV
170
            member = RoomMember(room_id=room_id, user_id=user_id)
!
UNCOV
171
            yield self._stopped_typing(member)
!
172

173
    def _stopped_typing(self, member):
1×
UNCOV
174
        if member.user_id not in self._room_typing.get(member.room_id, set()):
Branches [[0, 176], [0, 178]] missed. !
175
            # No point
UNCOV
176
            return None
!
177

UNCOV
178
        self._member_typing_until.pop(member, None)
!
UNCOV
179
        self._member_last_federation_poke.pop(member, None)
!
180

UNCOV
181
        self._push_update(member=member, typing=False)
!
182

183
    def _push_update(self, member, typing):
1×
UNCOV
184
        if self.hs.is_mine_id(member.user_id):
Branches [[0, 186], [0, 188]] missed. !
185
            # Only send updates for changes to our own users.
UNCOV
186
            run_in_background(self._push_remote, member, typing)
!
187

UNCOV
188
        self._push_update_local(member=member, typing=typing)
!
189

190
    @defer.inlineCallbacks
1×
191
    def _push_remote(self, member, typing):
UNCOV
192
        try:
!
UNCOV
193
            users = yield self.state.get_current_users_in_room(member.room_id)
!
UNCOV
194
            self._member_last_federation_poke[member] = self.clock.time_msec()
!
195

UNCOV
196
            now = self.clock.time_msec()
!
UNCOV
197
            self.wheel_timer.insert(
!
198
                now=now, obj=member, then=now + FEDERATION_PING_INTERVAL
199
            )
200

UNCOV
201
            for domain in set(get_domain_from_id(u) for u in users):
Branches [[0, 201], [0, 190], [0, 202]] missed. !
UNCOV
202
                if domain != self.server_name:
Branches [[0, 201], [0, 203]] missed. !
UNCOV
203
                    logger.debug("sending typing update to %s", domain)
!
UNCOV
204
                    self.federation.build_and_send_edu(
!
205
                        destination=domain,
206
                        edu_type="m.typing",
207
                        content={
208
                            "room_id": member.room_id,
209
                            "user_id": member.user_id,
210
                            "typing": typing,
211
                        },
212
                        key=member,
213
                    )
214
        except Exception:
!
215
            logger.exception("Error pushing typing notif to remotes")
!
216

217
    @defer.inlineCallbacks
1×
218
    def _recv_edu(self, origin, content):
UNCOV
219
        room_id = content["room_id"]
!
UNCOV
220
        user_id = content["user_id"]
!
221

UNCOV
222
        member = RoomMember(user_id=user_id, room_id=room_id)
!
223

224
        # Check that the string is a valid user id
UNCOV
225
        user = UserID.from_string(user_id)
!
226

UNCOV
227
        if user.domain != origin:
Branches [[0, 228], [0, 233]] missed. !
UNCOV
228
            logger.info(
!
229
                "Got typing update from %r with bad 'user_id': %r", origin, user_id
230
            )
UNCOV
231
            return
!
232

UNCOV
233
        users = yield self.state.get_current_users_in_room(room_id)
!
UNCOV
234
        domains = set(get_domain_from_id(u) for u in users)
Branches [[0, 234], [0, 236]] missed. !
235

UNCOV
236
        if self.server_name in domains:
Branches [[0, 217], [0, 237]] missed. !
UNCOV
237
            logger.info("Got typing update from %s: %r", user_id, content)
!
UNCOV
238
            now = self.clock.time_msec()
!
UNCOV
239
            self._member_typing_until[member] = now + FEDERATION_TIMEOUT
!
UNCOV
240
            self.wheel_timer.insert(now=now, obj=member, then=now + FEDERATION_TIMEOUT)
!
UNCOV
241
            self._push_update_local(member=member, typing=content["typing"])
!
242

243
    def _push_update_local(self, member, typing):
1×
UNCOV
244
        room_set = self._room_typing.setdefault(member.room_id, set())
!
UNCOV
245
        if typing:
Branches [[0, 246], [0, 248]] missed. !
UNCOV
246
            room_set.add(member.user_id)
!
247
        else:
UNCOV
248
            room_set.discard(member.user_id)
!
249

UNCOV
250
        self._latest_room_serial += 1
!
UNCOV
251
        self._room_serials[member.room_id] = self._latest_room_serial
!
UNCOV
252
        self._typing_stream_change_cache.entity_has_changed(
!
253
            member.room_id, self._latest_room_serial
254
        )
255

UNCOV
256
        self.notifier.on_new_event(
!
257
            "typing_key", self._latest_room_serial, rooms=[member.room_id]
258
        )
259

260
    def get_all_typing_updates(self, last_id, current_id):
1×
261
        if last_id == current_id:
Branches [[0, 262], [0, 264]] missed. !
262
            return []
!
263

264
        changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
!
265
            last_id
266
        )
267

268
        if changed_rooms is None:
Branches [[0, 269], [0, 271]] missed. !
269
            changed_rooms = self._room_serials
!
270

271
        rows = []
!
272
        for room_id in changed_rooms:
Branches [[0, 273], [0, 277]] missed. !
273
            serial = self._room_serials[room_id]
!
274
            if last_id < serial <= current_id:
Branches [[0, 272], [0, 275]] missed. !
275
                typing = self._room_typing[room_id]
!
276
                rows.append((serial, room_id, list(typing)))
!
277
        rows.sort()
!
278
        return rows
!
279

280
    def get_current_token(self):
1×
UNCOV
281
        return self._latest_room_serial
!
282

283

284
class TypingNotificationEventSource(object):
1×
285
    def __init__(self, hs):
1×
UNCOV
286
        self.hs = hs
!
UNCOV
287
        self.clock = hs.get_clock()
!
288
        # We can't call get_typing_handler here because there's a cycle:
289
        #
290
        #   Typing -> Notifier -> TypingNotificationEventSource -> Typing
291
        #
UNCOV
292
        self.get_typing_handler = hs.get_typing_handler
!
293

294
    def _make_event_for(self, room_id):
1×
UNCOV
295
        typing = self.get_typing_handler()._room_typing[room_id]
!
UNCOV
296
        return {
!
297
            "type": "m.typing",
298
            "room_id": room_id,
299
            "content": {"user_ids": list(typing)},
300
        }
301

302
    def get_new_events(self, from_key, room_ids, **kwargs):
1×
UNCOV
303
        with Measure(self.clock, "typing.get_new_events"):
!
UNCOV
304
            from_key = int(from_key)
!
UNCOV
305
            handler = self.get_typing_handler()
!
306

UNCOV
307
            events = []
!
UNCOV
308
            for room_id in room_ids:
Branches [[0, 309], [0, 316]] missed. !
UNCOV
309
                if room_id not in handler._room_serials:
Branches [[0, 310], [0, 311]] missed. !
UNCOV
310
                    continue
!
UNCOV
311
                if handler._room_serials[room_id] <= from_key:
Branches [[0, 312], [0, 314]] missed. !
UNCOV
312
                    continue
!
313

UNCOV
314
                events.append(self._make_event_for(room_id))
!
315

UNCOV
316
            return events, handler._latest_room_serial
!
317

318
    def get_current_key(self):
1×
UNCOV
319
        return self.get_typing_handler()._latest_room_serial
!
320

321
    def get_pagination_rows(self, user, pagination_config, key):
1×
322
        return [], pagination_config.from_key
!
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2019 Coveralls, LLC