Coveralls logo
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

matrix-org / synapse / 4532

23 Sep 2019 - 19:39 coverage decreased (-49.7%) to 17.596%
4532

Pull #6079

buildkite

Richard van der Hoff
update changelog
Pull Request #6079: Add submit_url response parameter to msisdn /requestToken

359 of 12986 branches covered (2.76%)

Branch coverage included in aggregate %.

0 of 7 new or added lines in 1 file covered. (0.0%)

18869 existing lines in 281 files now uncovered.

8809 of 39116 relevant lines covered (22.52%)

0.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

12.04
/synapse/handlers/appservice.py
1
# -*- coding: utf-8 -*-
2
# Copyright 2015, 2016 OpenMarket Ltd
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15

16
import logging
1×
17

18
from six import itervalues
1×
19

20
from prometheus_client import Counter
1×
21

22
from twisted.internet import defer
1×
23

24
import synapse
1×
25
from synapse.api.constants import EventTypes
1×
26
from synapse.logging.context import make_deferred_yieldable, run_in_background
1×
27
from synapse.metrics import (
1×
28
    event_processing_loop_counter,
29
    event_processing_loop_room_count,
30
)
31
from synapse.metrics.background_process_metrics import run_as_background_process
1×
32
from synapse.util import log_failure
1×
33
from synapse.util.metrics import Measure
1×
34

35
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Prometheus counter that is incremented by the number of events in each batch
# handled by the application service notification loop in this module.
events_processed_counter = Counter(
    "synapse_handlers_appservice_events_processed",
    "Number of events processed by the application services handler",
)
38

39

40
class ApplicationServicesHandler(object):
    """Pushes new events to interested application services (ASes) and
    answers user / room alias / third party queries on their behalf.
    """

    def __init__(self, hs):
        """
        Args:
            hs: The object providing the datastore, clock, config, application
                service API and application service scheduler used by this
                handler (presumably the HomeServer - TODO confirm).
        """
        self.store = hs.get_datastore()
        self.is_mine_id = hs.is_mine_id
        self.appservice_api = hs.get_application_service_api()
        self.scheduler = hs.get_application_service_scheduler()
        # The scheduler is started lazily, the first time an event actually
        # needs to be pushed to a service.
        self.started_scheduler = False
        self.clock = hs.get_clock()
        self.notify_appservices = hs.config.notify_appservices

        # Highest ID that notify_interested_services has been called with.
        self.current_max = 0
        # True while a notify_interested_services loop is running, so that
        # only one loop drains events at a time.
        self.is_processing = False

    @defer.inlineCallbacks
    def notify_interested_services(self, current_id):
        """Notifies (pushes) all application services interested in this event.

        Pushing is done asynchronously, so this method won't block for any
        prolonged length of time.

        Args:
            current_id(int): The current maximum ID.
        """
        services = self.store.get_app_services()
        if not services or not self.notify_appservices:
            return

        self.current_max = max(self.current_max, current_id)
        if self.is_processing:
            # A loop is already running. It passes self.current_max to the
            # store on every iteration, so it will see the value set above.
            return

        with Measure(self.clock, "notify_interested_services"):
            self.is_processing = True
            try:
                # Maximum number of events fetched from the store per batch.
                limit = 100
                while True:
                    upper_bound, events = yield self.store.get_new_events_for_appservice(
                        self.current_max, limit
                    )

                    if not events:
                        break

                    # Group the batch by room: rooms are handled concurrently
                    # while events within one room are handled in order.
                    events_by_room = {}
                    for event in events:
                        events_by_room.setdefault(event.room_id, []).append(event)

                    yield make_deferred_yieldable(
                        defer.gatherResults(
                            [
                                run_in_background(self._handle_room_events, evs)
                                for evs in itervalues(events_by_room)
                            ],
                            consumeErrors=True,
                        )
                    )

                    # Only record the new position once the whole batch has
                    # been handled.
                    yield self.store.set_appservice_last_pos(upper_bound)

                    now = self.clock.time_msec()
                    ts = yield self.store.get_received_ts(events[-1].event_id)

                    synapse.metrics.event_processing_positions.labels(
                        "appservice_sender"
                    ).set(upper_bound)

                    events_processed_counter.inc(len(events))

                    event_processing_loop_room_count.labels("appservice_sender").inc(
                        len(events_by_room)
                    )

                    event_processing_loop_counter.labels("appservice_sender").inc()

                    synapse.metrics.event_processing_lag.labels(
                        "appservice_sender"
                    ).set(now - ts)
                    synapse.metrics.event_processing_last_ts.labels(
                        "appservice_sender"
                    ).set(ts)
            finally:
                # Always clear the flag, otherwise a failure in the loop would
                # stop every later call from processing events.
                self.is_processing = False

    @defer.inlineCallbacks
    def _handle_room_events(self, events):
        """Handle the given events one after another, in order.

        Args:
            events(list): The events to handle; the caller passes the events
                of a single room.
        """
        for event in events:
            yield self._handle_event(event)

    @defer.inlineCallbacks
    def _handle_event(self, event):
        """Submit a single event to every application service interested in it.

        Args:
            event: The event to push.
        """
        # Gather interested services
        services = yield self._get_services_for_event(event)
        if not services:
            return  # no services need notifying

        # Do we know this user exists? If not, poke the user
        # query API for all services which match that user regex.
        # This needs to block as these user queries need to be
        # made BEFORE pushing the event.
        yield self._check_user_exists(event.sender)
        if event.type == EventTypes.Member:
            yield self._check_user_exists(event.state_key)

        self._ensure_scheduler_started()

        # Fork off pushes to these services
        for service in services:
            self.scheduler.submit_event_for_as(service, event)

    def _ensure_scheduler_started(self):
        """Start the application service scheduler as a background process,
        unless this handler has already started it.
        """
        if self.started_scheduler:
            return

        def start_scheduler():
            return self.scheduler.start().addErrback(
                log_failure, "Application Services Failure"
            )

        run_as_background_process("as_scheduler", start_scheduler)
        self.started_scheduler = True

    @defer.inlineCallbacks
    def query_user_exists(self, user_id):
        """Check if any application service knows this user_id exists.

        Services are queried one at a time and querying stops at the first
        service which reports that the user exists.

        Args:
            user_id(str): The user to query if they exist on any AS.
        Returns:
            True if this user exists on at least one application service.
        """
        user_query_services = yield self._get_services_for_user(user_id=user_id)
        for user_service in user_query_services:
            is_known_user = yield self.appservice_api.query_user(user_service, user_id)
            if is_known_user:
                return True
        return False

    @defer.inlineCallbacks
    def query_room_alias_exists(self, room_alias):
        """Check if an application service knows this room alias exists.

        Args:
            room_alias(RoomAlias): The room alias to query.
        Returns:
            namedtuple: with keys "room_id" and "servers" or None if no
            association can be found.
        """
        room_alias_str = room_alias.to_string()
        services = self.store.get_app_services()
        alias_query_services = [
            s for s in services if s.is_interested_in_alias(room_alias_str)
        ]
        for alias_service in alias_query_services:
            is_known_alias = yield self.appservice_api.query_alias(
                alias_service, room_alias_str
            )
            if is_known_alias:
                # the alias exists now so don't query more ASes.
                result = yield self.store.get_association_from_room_alias(room_alias)
                return result

        # No interested service knows about this alias.
        return None

    @defer.inlineCallbacks
    def query_3pe(self, kind, protocol, fields):
        """Query every application service interested in a protocol and
        combine their results.

        The services are queried concurrently. Results from services whose
        query failed are left out rather than failing the whole lookup.

        Args:
            kind: Passed through to the application service API unchanged.
            protocol: The protocol to query; also used to select the services.
            fields: Passed through to the application service API unchanged.
        Returns:
            list: The concatenated results of all successful queries.
        """
        services = yield self._get_services_for_3pn(protocol)

        results = yield make_deferred_yieldable(
            defer.DeferredList(
                [
                    run_in_background(
                        self.appservice_api.query_3pe, service, kind, protocol, fields
                    )
                    for service in services
                ],
                consumeErrors=True,
            )
        )

        ret = []
        for success, result in results:
            if success:
                ret.extend(result)

        return ret

    @defer.inlineCallbacks
    def get_3pe_protocols(self, only_protocol=None):
        """Fetch the third party protocol information offered by the
        application services.

        Args:
            only_protocol: If not None, only this protocol is looked up.
        Returns:
            dict: Maps each protocol to its merged info. The "instances" lists
            of all services are concatenated and the other fields are taken
            from the first response. A protocol for which no service returned
            any info maps to an empty dict.
        """
        services = self.store.get_app_services()
        protocols = {}

        # Collect up all the individual protocol responses out of the ASes
        for s in services:
            for p in s.protocols:
                if only_protocol is not None and p != only_protocol:
                    continue

                # Register the protocol before querying, so that it appears in
                # the result even when the AS has no info for it.
                infos = protocols.setdefault(p, [])

                info = yield self.appservice_api.get_3pe_protocol(s, p)

                if info is not None:
                    infos.append(info)

        def _merge_instances(infos):
            if not infos:
                return {}

            # Merge the 'instances' lists of multiple results, but just take
            # the other fields from the first as they ought to be identical
            # copy the result so as not to corrupt the cached one
            combined = dict(infos[0])
            combined["instances"] = list(combined["instances"])

            for info in infos[1:]:
                combined["instances"].extend(info["instances"])

            return combined

        return {p: _merge_instances(infos) for p, infos in protocols.items()}

    @defer.inlineCallbacks
    def _get_services_for_event(self, event):
        """Retrieve a list of application services interested in this event.

        Args:
            event(Event): The event to check.
        Returns:
            list<ApplicationService>: A list of services interested in this
            event based on the service regex.
        """
        services = self.store.get_app_services()

        # we can't use a list comprehension here. Since python 3, list
        # comprehensions use a generator internally. This means you can't yield
        # inside of a list comprehension anymore.
        interested_list = []
        for s in services:
            if (yield s.is_interested(event, self.store)):
                interested_list.append(s)

        return interested_list

    def _get_services_for_user(self, user_id):
        """Retrieve the application services interested in a user.

        Args:
            user_id(str): The user to check.
        Returns:
            Deferred: Already fired with the list of interested services.
        """
        services = self.store.get_app_services()
        interested_list = [s for s in services if s.is_interested_in_user(user_id)]
        return defer.succeed(interested_list)

    def _get_services_for_3pn(self, protocol):
        """Retrieve the application services interested in a protocol.

        Args:
            protocol: The protocol to check.
        Returns:
            Deferred: Already fired with the list of interested services.
        """
        services = self.store.get_app_services()
        interested_list = [s for s in services if s.is_interested_in_protocol(protocol)]
        return defer.succeed(interested_list)

    @defer.inlineCallbacks
    def _is_unknown_user(self, user_id):
        """Check whether a user is one of ours but unknown to this server.

        Args:
            user_id(str): The user to check.
        Returns:
            bool: True if the user passes is_mine_id, is not in the datastore
            and is not the sender of any application service. False otherwise.
        """
        if not self.is_mine_id(user_id):
            # we don't know if they are unknown or not since it isn't one of our
            # users. We can't poke ASes.
            return False

        user_info = yield self.store.get_user_by_id(user_id)
        if user_info:
            return False

        # user not found; could be the AS though, so check.
        services = self.store.get_app_services()
        return not any(s.sender == user_id for s in services)

    @defer.inlineCallbacks
    def _check_user_exists(self, user_id):
        """Check that a user exists, asking the application services if the
        user is unknown to this server.

        Args:
            user_id(str): The user to check.
        Returns:
            The result of query_user_exists if the user is unknown, else True.
        """
        unknown_user = yield self._is_unknown_user(user_id)
        if unknown_user:
            exists = yield self.query_user_exists(user_id)
            return exists
        return True
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2019 Coveralls, LLC