Coveralls logob
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

matrix-org / synapse / 4532

23 Sep 2019 - 19:39 coverage decreased (-49.7%) to 17.596%
4532

Pull #6079

buildkite

Richard van der Hoff
update changelog
Pull Request #6079: Add submit_url response parameter to msisdn /requestToken

359 of 12986 branches covered (2.76%)

Branch coverage included in aggregate %.

0 of 7 new or added lines in 1 file covered. (0.0%)

18869 existing lines in 281 files now uncovered.

8809 of 39116 relevant lines covered (22.52%)

0.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

12.74
/synapse/push/pusherpool.py
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
# Copyright 2015, 2016 OpenMarket Ltd
4
#
5
# Licensed under the Apache License, Version 2.0 (the "License");
6
# you may not use this file except in compliance with the License.
7
# You may obtain a copy of the License at
8
#
9
#     http://www.apache.org/licenses/LICENSE-2.0
10
#
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS,
13
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
# See the License for the specific language governing permissions and
15
# limitations under the License.
16

17
import logging
1×
18

19
from twisted.internet import defer
1×
20

21
from synapse.metrics.background_process_metrics import run_as_background_process
1×
22
from synapse.push import PusherConfigException
1×
23
from synapse.push.pusher import PusherFactory
1×
24
from synapse.util.async_helpers import concurrently_execute
1×
25

26
logger = logging.getLogger(__name__)
1×
27

28

29
class PusherPool:
1×
30
    """
31
    The pusher pool. This is responsible for dispatching notifications of new events to
32
    the http and email pushers.
33

34
    It provides three methods which are designed to be called by the rest of the
35
    application: `start`, `on_new_notifications`, and `on_new_receipts`: each of these
36
    delegates to each of the relevant pushers.
37

38
    Note that it is expected that each pusher will have its own 'processing' loop which
39
    will send out the notifications in the background, rather than blocking until the
40
    notifications are sent; accordingly Pusher.on_started, Pusher.on_new_notifications and
41
    Pusher.on_new_receipts are not expected to return deferreds.
42
    """
43

44
    def __init__(self, _hs):
1×
UNCOV
45
        self.hs = _hs
!
UNCOV
46
        self.pusher_factory = PusherFactory(_hs)
!
UNCOV
47
        self._should_start_pushers = _hs.config.start_pushers
!
UNCOV
48
        self.store = self.hs.get_datastore()
!
UNCOV
49
        self.clock = self.hs.get_clock()
!
UNCOV
50
        self.pushers = {}
!
51

52
    def start(self):
1×
53
        """Starts the pushers off in a background process.
54
        """
UNCOV
55
        if not self._should_start_pushers:
Branches [[0, 56], [0, 58]] missed. !
56
            logger.info("Not starting pushers because they are disabled in the config")
!
57
            return
!
UNCOV
58
        run_as_background_process("start_pushers", self._start_pushers)
!
59

60
    @defer.inlineCallbacks
1×
61
    def add_pusher(
1×
62
        self,
63
        user_id,
64
        access_token,
65
        kind,
66
        app_id,
67
        app_display_name,
68
        device_display_name,
69
        pushkey,
70
        lang,
71
        data,
72
        profile_tag="",
73
    ):
74
        """Creates a new pusher and adds it to the pool
75

76
        Returns:
77
            Deferred[EmailPusher|HttpPusher]
78
        """
UNCOV
79
        time_now_msec = self.clock.time_msec()
!
80

81
        # we try to create the pusher just to validate the config: it
82
        # will then get pulled out of the database,
83
        # recreated, added and started: this means we have only one
84
        # code path adding pushers.
UNCOV
85
        self.pusher_factory.create_pusher(
!
86
            {
87
                "id": None,
88
                "user_name": user_id,
89
                "kind": kind,
90
                "app_id": app_id,
91
                "app_display_name": app_display_name,
92
                "device_display_name": device_display_name,
93
                "pushkey": pushkey,
94
                "ts": time_now_msec,
95
                "lang": lang,
96
                "data": data,
97
                "last_stream_ordering": None,
98
                "last_success": None,
99
                "failing_since": None,
100
            }
101
        )
102

103
        # create the pusher setting last_stream_ordering to the current maximum
104
        # stream ordering in event_push_actions, so it will process
105
        # pushes from this point onwards.
UNCOV
106
        last_stream_ordering = (
!
107
            yield self.store.get_latest_push_action_stream_ordering()
108
        )
109

UNCOV
110
        yield self.store.add_pusher(
!
111
            user_id=user_id,
112
            access_token=access_token,
113
            kind=kind,
114
            app_id=app_id,
115
            app_display_name=app_display_name,
116
            device_display_name=device_display_name,
117
            pushkey=pushkey,
118
            pushkey_ts=time_now_msec,
119
            lang=lang,
120
            data=data,
121
            last_stream_ordering=last_stream_ordering,
122
            profile_tag=profile_tag,
123
        )
UNCOV
124
        pusher = yield self.start_pusher_by_id(app_id, pushkey, user_id)
!
125

UNCOV
126
        return pusher
!
127

128
    @defer.inlineCallbacks
1×
129
    def remove_pushers_by_app_id_and_pushkey_not_user(
130
        self, app_id, pushkey, not_user_id
131
    ):
UNCOV
132
        to_remove = yield self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)
!
UNCOV
133
        for p in to_remove:
Branches [[0, 128], [0, 134]] missed. !
UNCOV
134
            if p["user_name"] != not_user_id:
Branches [[0, 133], [0, 135]] missed. !
UNCOV
135
                logger.info(
!
136
                    "Removing pusher for app id %s, pushkey %s, user %s",
137
                    app_id,
138
                    pushkey,
139
                    p["user_name"],
140
                )
UNCOV
141
                yield self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
!
142

143
    @defer.inlineCallbacks
1×
144
    def remove_pushers_by_access_token(self, user_id, access_tokens):
145
        """Remove the pushers for a given user corresponding to a set of
146
        access_tokens.
147

148
        Args:
149
            user_id (str): user to remove pushers for
150
            access_tokens (Iterable[int]): access token *ids* to remove pushers
151
                for
152
        """
UNCOV
153
        tokens = set(access_tokens)
!
UNCOV
154
        for p in (yield self.store.get_pushers_by_user_id(user_id)):
Branches [[0, 143], [0, 155]] missed. !
UNCOV
155
            if p["access_token"] in tokens:
Branches [[0, 154], [0, 156]] missed. !
UNCOV
156
                logger.info(
!
157
                    "Removing pusher for app id %s, pushkey %s, user %s",
158
                    p["app_id"],
159
                    p["pushkey"],
160
                    p["user_name"],
161
                )
UNCOV
162
                yield self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
!
163

164
    @defer.inlineCallbacks
1×
165
    def on_new_notifications(self, min_stream_id, max_stream_id):
UNCOV
166
        if not self.pushers:
Branches [[0, 168], [0, 170]] missed. !
167
            # nothing to do here.
UNCOV
168
            return
!
169

UNCOV
170
        try:
!
UNCOV
171
            users_affected = yield self.store.get_push_action_users_in_range(
!
172
                min_stream_id, max_stream_id
173
            )
174

UNCOV
175
            for u in users_affected:
Branches [[0, 164], [0, 176]] missed. !
UNCOV
176
                if u in self.pushers:
Branches [[0, 175], [0, 177]] missed. !
UNCOV
177
                    for p in self.pushers[u].values():
Branches [[0, 175], [0, 178]] missed. !
UNCOV
178
                        p.on_new_notifications(min_stream_id, max_stream_id)
!
179

180
        except Exception:
!
181
            logger.exception("Exception in pusher on_new_notifications")
!
182

183
    @defer.inlineCallbacks
1×
184
    def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
UNCOV
185
        if not self.pushers:
Branches [[0, 187], [0, 189]] missed. !
186
            # nothing to do here.
UNCOV
187
            return
!
188

UNCOV
189
        try:
!
190
            # Need to subtract 1 from the minimum because the lower bound here
191
            # is not inclusive
UNCOV
192
            updated_receipts = yield self.store.get_all_updated_receipts(
!
193
                min_stream_id - 1, max_stream_id
194
            )
195
            # This returns a tuple, user_id is at index 3
UNCOV
196
            users_affected = set([r[3] for r in updated_receipts])
Branches [[0, 196], [0, 198]] missed. !
197

UNCOV
198
            for u in users_affected:
Branches [[0, 183], [0, 199]] missed. !
UNCOV
199
                if u in self.pushers:
Branches [[0, 198], [0, 200]] missed. !
UNCOV
200
                    for p in self.pushers[u].values():
Branches [[0, 198], [0, 201]] missed. !
UNCOV
201
                        p.on_new_receipts(min_stream_id, max_stream_id)
!
202

203
        except Exception:
!
204
            logger.exception("Exception in pusher on_new_receipts")
!
205

206
    @defer.inlineCallbacks
1×
207
    def start_pusher_by_id(self, app_id, pushkey, user_id):
208
        """Look up the details for the given pusher, and start it
209

210
        Returns:
211
            Deferred[EmailPusher|HttpPusher|None]: The pusher started, if any
212
        """
UNCOV
213
        if not self._should_start_pushers:
Branches [[0, 214], [0, 216]] missed. !
214
            return
!
215

UNCOV
216
        resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)
!
217

UNCOV
218
        pusher_dict = None
!
UNCOV
219
        for r in resultlist:
Branches [[0, 220], [0, 223]] missed. !
UNCOV
220
            if r["user_name"] == user_id:
Branches [[0, 219], [0, 221]] missed. !
UNCOV
221
                pusher_dict = r
!
222

UNCOV
223
        pusher = None
!
UNCOV
224
        if pusher_dict:
Branches [[0, 225], [0, 227]] missed. !
UNCOV
225
            pusher = yield self._start_pusher(pusher_dict)
!
226

UNCOV
227
        return pusher
!
228

229
    @defer.inlineCallbacks
1×
230
    def _start_pushers(self):
231
        """Start all the pushers
232

233
        Returns:
234
            Deferred
235
        """
UNCOV
236
        pushers = yield self.store.get_all_pushers()
!
UNCOV
237
        logger.info("Starting %d pushers", len(pushers))
!
238

239
        # Stagger starting up the pushers so we don't completely drown the
240
        # process on start up.
UNCOV
241
        yield concurrently_execute(self._start_pusher, pushers, 10)
!
242

UNCOV
243
        logger.info("Started pushers")
!
244

245
    @defer.inlineCallbacks
1×
246
    def _start_pusher(self, pusherdict):
247
        """Start the given pusher
248

249
        Args:
250
            pusherdict (dict):
251

252
        Returns:
253
            Deferred[EmailPusher|HttpPusher]
254
        """
UNCOV
255
        try:
!
UNCOV
256
            p = self.pusher_factory.create_pusher(pusherdict)
!
257
        except PusherConfigException as e:
Branches [[0, 258], [0, 266]] missed. !
258
            logger.warning(
!
259
                "Pusher incorrectly configured user=%s, appid=%s, pushkey=%s: %s",
260
                pusherdict.get("user_name"),
261
                pusherdict.get("app_id"),
262
                pusherdict.get("pushkey"),
263
                e,
264
            )
265
            return
!
266
        except Exception:
!
267
            logger.exception("Couldn't start a pusher: caught Exception")
!
268
            return
!
269

UNCOV
270
        if not p:
Branches [[0, 271], [0, 273]] missed. !
UNCOV
271
            return
!
272

UNCOV
273
        appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"])
!
UNCOV
274
        byuser = self.pushers.setdefault(pusherdict["user_name"], {})
!
275

UNCOV
276
        if appid_pushkey in byuser:
Branches [[0, 277], [0, 278]] missed. !
277
            byuser[appid_pushkey].on_stop()
!
UNCOV
278
        byuser[appid_pushkey] = p
!
279

280
        # Check if there *may* be push to process. We do this as this check is a
281
        # lot cheaper to do than actually fetching the exact rows we need to
282
        # push.
UNCOV
283
        user_id = pusherdict["user_name"]
!
UNCOV
284
        last_stream_ordering = pusherdict["last_stream_ordering"]
!
UNCOV
285
        if last_stream_ordering:
Branches [[0, 286], [0, 292]] missed. !
UNCOV
286
            have_notifs = yield self.store.get_if_maybe_push_in_range_for_user(
!
287
                user_id, last_stream_ordering
288
            )
289
        else:
290
            # We always want to default to starting up the pusher rather than
291
            # risk missing push.
UNCOV
292
            have_notifs = True
!
293

UNCOV
294
        p.on_started(have_notifs)
!
295

UNCOV
296
        return p
!
297

298
    @defer.inlineCallbacks
1×
299
    def remove_pusher(self, app_id, pushkey, user_id):
UNCOV
300
        appid_pushkey = "%s:%s" % (app_id, pushkey)
!
301

UNCOV
302
        byuser = self.pushers.get(user_id, {})
!
303

UNCOV
304
        if appid_pushkey in byuser:
Branches [[0, 305], [0, 308]] missed. !
UNCOV
305
            logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
!
UNCOV
306
            byuser[appid_pushkey].on_stop()
!
UNCOV
307
            del byuser[appid_pushkey]
!
UNCOV
308
        yield self.store.delete_pusher_by_app_id_pushkey_user_id(
!
309
            app_id, pushkey, user_id
310
        )
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2019 Coveralls, LLC