Coveralls logob
Coveralls logo
  • Home
  • Features
  • Pricing
  • Docs
  • Sign In

matrix-org / synapse / 4532

23 Sep 2019 - 19:39 coverage decreased (-49.7%) to 17.596%
4532

Pull #6079

buildkite

Richard van der Hoff
update changelog
Pull Request #6079: Add submit_url response parameter to msisdn /requestToken

359 of 12986 branches covered (2.76%)

Branch coverage included in aggregate %.

0 of 7 new or added lines in 1 file covered. (0.0%)

18869 existing lines in 281 files now uncovered.

8809 of 39116 relevant lines covered (22.52%)

0.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.63
/synapse/state/__init__.py
1
# -*- coding: utf-8 -*-
2
# Copyright 2014-2016 OpenMarket Ltd
3
# Copyright 2018 New Vector Ltd
4
#
5
# Licensed under the Apache License, Version 2.0 (the "License");
6
# you may not use this file except in compliance with the License.
7
# You may obtain a copy of the License at
8
#
9
#     http://www.apache.org/licenses/LICENSE-2.0
10
#
11
# Unless required by applicable law or agreed to in writing, software
12
# distributed under the License is distributed on an "AS IS" BASIS,
13
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
# See the License for the specific language governing permissions and
15
# limitations under the License.
16

17
import logging
1×
18
from collections import namedtuple
1×
19

20
from six import iteritems, itervalues
1×
21

22
import attr
1×
23
from frozendict import frozendict
1×
24
from prometheus_client import Histogram
1×
25

26
from twisted.internet import defer
1×
27

28
from synapse.api.constants import EventTypes
1×
29
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions
1×
30
from synapse.events.snapshot import EventContext
1×
31
from synapse.logging.utils import log_function
1×
32
from synapse.state import v1, v2
1×
33
from synapse.util.async_helpers import Linearizer
1×
34
from synapse.util.caches import get_cache_factor_for
1×
35
from synapse.util.caches.expiringcache import ExpiringCache
1×
36
from synapse.util.metrics import Measure
1×
37

38
logger = logging.getLogger(__name__)
1×
39

40

41
# Metrics for number of state groups involved in a resolution.
42
state_groups_histogram = Histogram(
1×
43
    "synapse_state_number_state_groups_in_resolution",
44
    "Number of state groups used when performing a state resolution",
45
    buckets=(1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"),
46
)
47

48

49
KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key"))
1×
50

51

52
SIZE_OF_CACHE = 100000 * get_cache_factor_for("state_cache")
1×
53
EVICTION_TIMEOUT_SECONDS = 60 * 60
1×
54

55

56
_NEXT_STATE_ID = 1
1×
57

58
POWER_KEY = (EventTypes.PowerLevels, "")
1×
59

60

61
def _gen_state_id():
1×
62
    global _NEXT_STATE_ID
UNCOV
63
    s = "X%d" % (_NEXT_STATE_ID,)
!
UNCOV
64
    _NEXT_STATE_ID += 1
!
UNCOV
65
    return s
!
66

67

68
class _StateCacheEntry(object):
1×
69
    __slots__ = ["state", "state_group", "state_id", "prev_group", "delta_ids"]
1×
70

71
    def __init__(self, state, state_group, prev_group=None, delta_ids=None):
1×
72
        # dict[(str, str), str] map  from (type, state_key) to event_id
UNCOV
73
        self.state = frozendict(state)
!
74

75
        # the ID of a state group if one and only one is involved.
76
        # otherwise, None otherwise?
UNCOV
77
        self.state_group = state_group
!
78

UNCOV
79
        self.prev_group = prev_group
!
UNCOV
80
        self.delta_ids = frozendict(delta_ids) if delta_ids is not None else None
!
81

82
        # The `state_id` is a unique ID we generate that can be used as ID for
83
        # this collection of state. Usually this would be the same as the
84
        # state group, but on worker instances we can't generate a new state
85
        # group each time we resolve state, so we generate a separate one that
86
        # isn't persisted and is used solely for caches.
87
        # `state_id` is either a state_group (and so an int) or a string. This
88
        # ensures we don't accidentally persist a state_id as a stateg_group
UNCOV
89
        if state_group:
Branches [[0, 90], [0, 92]] missed. !
UNCOV
90
            self.state_id = state_group
!
91
        else:
UNCOV
92
            self.state_id = _gen_state_id()
!
93

94
    def __len__(self):
1×
UNCOV
95
        return len(self.state)
!
96

97

98
class StateHandler(object):
1×
99
    """Fetches bits of state from the stores, and does state resolution
100
    where necessary
101
    """
102

103
    def __init__(self, hs):
1×
UNCOV
104
        self.clock = hs.get_clock()
!
UNCOV
105
        self.store = hs.get_datastore()
!
UNCOV
106
        self.hs = hs
!
UNCOV
107
        self._state_resolution_handler = hs.get_state_resolution_handler()
!
108

109
    @defer.inlineCallbacks
1×
110
    def get_current_state(
1×
111
        self, room_id, event_type=None, state_key="", latest_event_ids=None
112
    ):
113
        """ Retrieves the current state for the room. This is done by
114
        calling `get_latest_events_in_room` to get the leading edges of the
115
        event graph and then resolving any of the state conflicts.
116

117
        This is equivalent to getting the state of an event that were to send
118
        next before receiving any new events.
119

120
        If `event_type` is specified, then the method returns only the one
121
        event (or None) with that `event_type` and `state_key`.
122

123
        Returns:
124
            map from (type, state_key) to event
125
        """
UNCOV
126
        if not latest_event_ids:
Branches [[0, 127], [0, 129]] missed. !
UNCOV
127
            latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
!
128

UNCOV
129
        logger.debug("calling resolve_state_groups from get_current_state")
!
UNCOV
130
        ret = yield self.resolve_state_groups_for_events(room_id, latest_event_ids)
!
UNCOV
131
        state = ret.state
!
132

UNCOV
133
        if event_type:
Branches [[0, 134], [0, 140]] missed. !
UNCOV
134
            event_id = state.get((event_type, state_key))
!
UNCOV
135
            event = None
!
UNCOV
136
            if event_id:
Branches [[0, 137], [0, 138]] missed. !
UNCOV
137
                event = yield self.store.get_event(event_id, allow_none=True)
!
UNCOV
138
            return event
!
139

UNCOV
140
        state_map = yield self.store.get_events(
!
141
            list(state.values()), get_prev_content=False
142
        )
UNCOV
143
        state = {
Branches [[0, 143], [0, 147]] missed. !
144
            key: state_map[e_id] for key, e_id in iteritems(state) if e_id in state_map
145
        }
146

UNCOV
147
        return state
!
148

149
    @defer.inlineCallbacks
1×
150
    def get_current_state_ids(self, room_id, latest_event_ids=None):
1×
151
        """Get the current state, or the state at a set of events, for a room
152

153
        Args:
154
            room_id (str):
155

156
            latest_event_ids (iterable[str]|None): if given, the forward
157
                extremities to resolve. If None, we look them up from the
158
                database (via a cache)
159

160
        Returns:
161
            Deferred[dict[(str, str), str)]]: the state dict, mapping from
162
                (event_type, state_key) -> event_id
163
        """
UNCOV
164
        if not latest_event_ids:
Branches [[0, 165], [0, 167]] missed. !
UNCOV
165
            latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
!
166

UNCOV
167
        logger.debug("calling resolve_state_groups from get_current_state_ids")
!
UNCOV
168
        ret = yield self.resolve_state_groups_for_events(room_id, latest_event_ids)
!
UNCOV
169
        state = ret.state
!
170

UNCOV
171
        return state
!
172

173
    @defer.inlineCallbacks
1×
174
    def get_current_users_in_room(self, room_id, latest_event_ids=None):
1×
175
        """
176
        Get the users who are currently in a room.
177

178
        Args:
179
            room_id (str): The ID of the room.
180
            latest_event_ids (List[str]|None): Precomputed list of latest
181
                event IDs. Will be computed if None.
182
        Returns:
183
            Deferred[Dict[str,ProfileInfo]]: Dictionary of user IDs to their
184
                profileinfo.
185
        """
UNCOV
186
        if not latest_event_ids:
Branches [[0, 187], [0, 188]] missed. !
UNCOV
187
            latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
!
UNCOV
188
        logger.debug("calling resolve_state_groups from get_current_users_in_room")
!
UNCOV
189
        entry = yield self.resolve_state_groups_for_events(room_id, latest_event_ids)
!
UNCOV
190
        joined_users = yield self.store.get_joined_users_from_state(room_id, entry)
!
UNCOV
191
        return joined_users
!
192

193
    @defer.inlineCallbacks
1×
194
    def get_current_hosts_in_room(self, room_id, latest_event_ids=None):
1×
UNCOV
195
        if not latest_event_ids:
Branches [[0, 196], [0, 197]] missed. !
UNCOV
196
            latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
!
UNCOV
197
        logger.debug("calling resolve_state_groups from get_current_hosts_in_room")
!
UNCOV
198
        entry = yield self.resolve_state_groups_for_events(room_id, latest_event_ids)
!
UNCOV
199
        joined_hosts = yield self.store.get_joined_hosts(room_id, entry)
!
UNCOV
200
        return joined_hosts
!
201

202
    @defer.inlineCallbacks
1×
203
    def compute_event_context(self, event, old_state=None):
1×
204
        """Build an EventContext structure for the event.
205

206
        This works out what the current state should be for the event, and
207
        generates a new state group if necessary.
208

209
        Args:
210
            event (synapse.events.EventBase):
211
            old_state (dict|None): The state at the event if it can't be
212
                calculated from existing events. This is normally only specified
213
                when receiving an event from federation where we don't have the
214
                prev events for, e.g. when backfilling.
215
        Returns:
216
            synapse.events.snapshot.EventContext:
217
        """
218

UNCOV
219
        if event.internal_metadata.is_outlier():
Branches [[0, 223], [0, 245]] missed. !
220
            # If this is an outlier, then we know it shouldn't have any current
221
            # state. Certainly store.get_current_state won't return any, and
222
            # persisting the event won't store the state group.
UNCOV
223
            if old_state:
Branches [[0, 224], [0, 232]] missed. !
224
                prev_state_ids = {(s.type, s.state_key): s.event_id for s in old_state}
Branches [[0, 224], [0, 225]] missed. !
225
                if event.is_state():
Branches [[0, 226], [0, 230]] missed. !
226
                    current_state_ids = dict(prev_state_ids)
!
227
                    key = (event.type, event.state_key)
!
228
                    current_state_ids[key] = event.event_id
!
229
                else:
230
                    current_state_ids = prev_state_ids
!
231
            else:
UNCOV
232
                current_state_ids = {}
!
UNCOV
233
                prev_state_ids = {}
!
234

235
            # We don't store state for outliers, so we don't generate a state
236
            # group for it.
UNCOV
237
            context = EventContext.with_state(
!
238
                state_group=None,
239
                current_state_ids=current_state_ids,
240
                prev_state_ids=prev_state_ids,
241
            )
242

UNCOV
243
            return context
!
244

UNCOV
245
        if old_state:
Branches [[0, 250], [0, 279]] missed. !
246
            # We already have the state, so we don't need to calculate it.
247
            # Let's just correctly fill out the context and create a
248
            # new state group for it.
249

UNCOV
250
            prev_state_ids = {(s.type, s.state_key): s.event_id for s in old_state}
Branches [[0, 250], [0, 252]] missed. !
251

UNCOV
252
            if event.is_state():
Branches [[0, 253], [0, 261]] missed. !
UNCOV
253
                key = (event.type, event.state_key)
!
UNCOV
254
                if key in prev_state_ids:
Branches [[0, 255], [0, 258]] missed. !
UNCOV
255
                    replaces = prev_state_ids[key]
!
UNCOV
256
                    if replaces != event.event_id:  # Paranoia check
Branches [[0, 257], [0, 258]] missed. !
UNCOV
257
                        event.unsigned["replaces_state"] = replaces
!
UNCOV
258
                current_state_ids = dict(prev_state_ids)
!
UNCOV
259
                current_state_ids[key] = event.event_id
!
260
            else:
UNCOV
261
                current_state_ids = prev_state_ids
!
262

UNCOV
263
            state_group = yield self.store.store_state_group(
!
264
                event.event_id,
265
                event.room_id,
266
                prev_group=None,
267
                delta_ids=None,
268
                current_state_ids=current_state_ids,
269
            )
270

UNCOV
271
            context = EventContext.with_state(
!
272
                state_group=state_group,
273
                current_state_ids=current_state_ids,
274
                prev_state_ids=prev_state_ids,
275
            )
276

UNCOV
277
            return context
!
278

UNCOV
279
        logger.debug("calling resolve_state_groups from compute_event_context")
!
280

UNCOV
281
        entry = yield self.resolve_state_groups_for_events(
!
282
            event.room_id, event.prev_event_ids()
283
        )
284

UNCOV
285
        prev_state_ids = entry.state
!
UNCOV
286
        prev_group = None
!
UNCOV
287
        delta_ids = None
!
288

UNCOV
289
        if event.is_state():
Branches [[0, 293], [0, 321]] missed. !
290
            # If this is a state event then we need to create a new state
291
            # group for the state after this event.
292

UNCOV
293
            key = (event.type, event.state_key)
!
UNCOV
294
            if key in prev_state_ids:
Branches [[0, 295], [0, 298]] missed. !
UNCOV
295
                replaces = prev_state_ids[key]
!
UNCOV
296
                event.unsigned["replaces_state"] = replaces
!
297

UNCOV
298
            current_state_ids = dict(prev_state_ids)
!
UNCOV
299
            current_state_ids[key] = event.event_id
!
300

UNCOV
301
            if entry.state_group:
Branches [[0, 304], [0, 306]] missed. !
302
                # If the state at the event has a state group assigned then
303
                # we can use that as the prev group
UNCOV
304
                prev_group = entry.state_group
!
UNCOV
305
                delta_ids = {key: event.event_id}
!
UNCOV
306
            elif entry.prev_group:
Branches [[0, 309], [0, 313]] missed. !
307
                # If the state at the event only has a prev group, then we can
308
                # use that as a prev group too.
UNCOV
309
                prev_group = entry.prev_group
!
UNCOV
310
                delta_ids = dict(entry.delta_ids)
!
UNCOV
311
                delta_ids[key] = event.event_id
!
312

UNCOV
313
            state_group = yield self.store.store_state_group(
!
314
                event.event_id,
315
                event.room_id,
316
                prev_group=prev_group,
317
                delta_ids=delta_ids,
318
                current_state_ids=current_state_ids,
319
            )
320
        else:
UNCOV
321
            current_state_ids = prev_state_ids
!
UNCOV
322
            prev_group = entry.prev_group
!
UNCOV
323
            delta_ids = entry.delta_ids
!
324

UNCOV
325
            if entry.state_group is None:
Branches [[0, 326], [0, 335]] missed. !
UNCOV
326
                entry.state_group = yield self.store.store_state_group(
!
327
                    event.event_id,
328
                    event.room_id,
329
                    prev_group=entry.prev_group,
330
                    delta_ids=entry.delta_ids,
331
                    current_state_ids=current_state_ids,
332
                )
UNCOV
333
                entry.state_id = entry.state_group
!
334

UNCOV
335
            state_group = entry.state_group
!
336

UNCOV
337
        context = EventContext.with_state(
!
338
            state_group=state_group,
339
            current_state_ids=current_state_ids,
340
            prev_state_ids=prev_state_ids,
341
            prev_group=prev_group,
342
            delta_ids=delta_ids,
343
        )
344

UNCOV
345
        return context
!
346

347
    @defer.inlineCallbacks
1×
348
    def resolve_state_groups_for_events(self, room_id, event_ids):
349
        """ Given a list of event_ids this method fetches the state at each
350
        event, resolves conflicts between them and returns them.
351

352
        Args:
353
            room_id (str)
354
            event_ids (list[str])
355
            explicit_room_version (str|None): If set uses the the given room
356
                version to choose the resolution algorithm. If None, then
357
                checks the database for room version.
358

359
        Returns:
360
            Deferred[_StateCacheEntry]: resolved state
361
        """
UNCOV
362
        logger.debug("resolve_state_groups event_ids %s", event_ids)
!
363

364
        # map from state group id to the state in that state group (where
365
        # 'state' is a map from state key to event id)
366
        # dict[int, dict[(str, str), str]]
UNCOV
367
        state_groups_ids = yield self.store.get_state_groups_ids(room_id, event_ids)
!
368

UNCOV
369
        if len(state_groups_ids) == 0:
Branches [[0, 370], [0, 371]] missed. !
UNCOV
370
            return _StateCacheEntry(state={}, state_group=None)
!
UNCOV
371
        elif len(state_groups_ids) == 1:
Branches [[0, 372], [0, 383]] missed. !
UNCOV
372
            name, state_list = list(state_groups_ids.items()).pop()
!
373

UNCOV
374
            prev_group, delta_ids = yield self.store.get_state_group_delta(name)
!
375

UNCOV
376
            return _StateCacheEntry(
!
377
                state=state_list,
378
                state_group=name,
379
                prev_group=prev_group,
380
                delta_ids=delta_ids,
381
            )
382

UNCOV
383
        room_version = yield self.store.get_room_version(room_id)
!
384

UNCOV
385
        result = yield self._state_resolution_handler.resolve_state_groups(
!
386
            room_id,
387
            room_version,
388
            state_groups_ids,
389
            None,
390
            state_res_store=StateResolutionStore(self.store),
391
        )
UNCOV
392
        return result
!
393

394
    @defer.inlineCallbacks
1×
395
    def resolve_events(self, room_version, state_sets, event):
UNCOV
396
        logger.info(
!
397
            "Resolving state for %s with %d groups", event.room_id, len(state_sets)
398
        )
UNCOV
399
        state_set_ids = [
Branches [[0, 400], [0, 403]] missed. !
400
            {(ev.type, ev.state_key): ev.event_id for ev in st} for st in state_sets
401
        ]
402

UNCOV
403
        state_map = {ev.event_id: ev for st in state_sets for ev in st}
Branches [[0, 403], [0, 405]] missed. !
404

UNCOV
405
        with Measure(self.clock, "state._resolve_events"):
!
UNCOV
406
            new_state = yield resolve_events_with_store(
!
407
                room_version,
408
                state_set_ids,
409
                event_map=state_map,
410
                state_res_store=StateResolutionStore(self.store),
411
            )
412

UNCOV
413
        new_state = {key: state_map[ev_id] for key, ev_id in iteritems(new_state)}
Branches [[0, 413], [0, 415]] missed. !
414

UNCOV
415
        return new_state
!
416

417

418
class StateResolutionHandler(object):
1×
419
    """Responsible for doing state conflict resolution.
420

421
    Note that the storage layer depends on this handler, so all functions must
422
    be storage-independent.
423
    """
424

425
    def __init__(self, hs):
1×
UNCOV
426
        self.clock = hs.get_clock()
!
427

428
        # dict of set of event_ids -> _StateCacheEntry.
UNCOV
429
        self._state_cache = None
!
UNCOV
430
        self.resolve_linearizer = Linearizer(name="state_resolve_lock")
!
431

UNCOV
432
        self._state_cache = ExpiringCache(
!
433
            cache_name="state_cache",
434
            clock=self.clock,
435
            max_len=SIZE_OF_CACHE,
436
            expiry_ms=EVICTION_TIMEOUT_SECONDS * 1000,
437
            iterable=True,
438
            reset_expiry_on_get=True,
439
        )
440

441
    @defer.inlineCallbacks
1×
442
    @log_function
1×
443
    def resolve_state_groups(
444
        self, room_id, room_version, state_groups_ids, event_map, state_res_store
445
    ):
446
        """Resolves conflicts between a set of state groups
447

448
        Always generates a new state group (unless we hit the cache), so should
449
        not be called for a single state group
450

451
        Args:
452
            room_id (str): room we are resolving for (used for logging)
453
            room_version (str): version of the room
454
            state_groups_ids (dict[int, dict[(str, str), str]]):
455
                 map from state group id to the state in that state group
456
                (where 'state' is a map from state key to event id)
457

458
            event_map(dict[str,FrozenEvent]|None):
459
                a dict from event_id to event, for any events that we happen to
460
                have in flight (eg, those currently being persisted). This will be
461
                used as a starting point fof finding the state we need; any missing
462
                events will be requested via state_res_store.
463

464
                If None, all events will be fetched via state_res_store.
465

466
            state_res_store (StateResolutionStore)
467

468
        Returns:
469
            Deferred[_StateCacheEntry]: resolved state
470
        """
UNCOV
471
        logger.debug("resolve_state_groups state_groups %s", state_groups_ids.keys())
!
472

UNCOV
473
        group_names = frozenset(state_groups_ids.keys())
!
474

UNCOV
475
        with (yield self.resolve_linearizer.queue(group_names)):
!
UNCOV
476
            if self._state_cache is not None:
Branches [[0, 477], [0, 481]] missed. !
UNCOV
477
                cache = self._state_cache.get(group_names, None)
!
UNCOV
478
                if cache:
Branches [[0, 479], [0, 481]] missed. !
UNCOV
479
                    return cache
!
480

UNCOV
481
            logger.info(
!
482
                "Resolving state for %s with %d groups", room_id, len(state_groups_ids)
483
            )
484

UNCOV
485
            state_groups_histogram.observe(len(state_groups_ids))
!
486

487
            # start by assuming we won't have any conflicted state, and build up the new
488
            # state map by iterating through the state groups. If we discover a conflict,
489
            # we give up and instead use `resolve_events_with_store`.
490
            #
491
            # XXX: is this actually worthwhile, or should we just let
492
            # resolve_events_with_store do it?
UNCOV
493
            new_state = {}
!
UNCOV
494
            conflicted_state = False
!
UNCOV
495
            for st in itervalues(state_groups_ids):
Branches [[0, 496], [0, 504]] missed. !
UNCOV
496
                for key, e_id in iteritems(st):
Branches [[0, 497], [0, 501]] missed. !
UNCOV
497
                    if key in new_state:
Branches [[0, 498], [0, 500]] missed. !
UNCOV
498
                        conflicted_state = True
!
UNCOV
499
                        break
!
UNCOV
500
                    new_state[key] = e_id
!
UNCOV
501
                if conflicted_state:
Branches [[0, 495], [0, 502]] missed. !
UNCOV
502
                    break
!
503

UNCOV
504
            if conflicted_state:
Branches [[0, 505], [0, 519]] missed. !
UNCOV
505
                logger.info("Resolving conflicted state for %r", room_id)
!
UNCOV
506
                with Measure(self.clock, "state._resolve_events"):
!
UNCOV
507
                    new_state = yield resolve_events_with_store(
!
508
                        room_version,
509
                        list(itervalues(state_groups_ids)),
510
                        event_map=event_map,
511
                        state_res_store=state_res_store,
512
                    )
513

514
            # if the new state matches any of the input state groups, we can
515
            # use that state group again. Otherwise we will generate a state_id
516
            # which will be used as a cache key for future resolutions, but
517
            # not get persisted.
518

UNCOV
519
            with Measure(self.clock, "state.create_group_ids"):
!
UNCOV
520
                cache = _make_state_cache_entry(new_state, state_groups_ids)
!
521

UNCOV
522
            if self._state_cache is not None:
Branches [[0, 523], [0, 525]] missed. !
UNCOV
523
                self._state_cache[group_names] = cache
!
524

UNCOV
525
            return cache
!
526

527

528
def _make_state_cache_entry(new_state, state_groups_ids):
1×
529
    """Given a resolved state, and a set of input state groups, pick one to base
530
    a new state group on (if any), and return an appropriately-constructed
531
    _StateCacheEntry.
532

533
    Args:
534
        new_state (dict[(str, str), str]): resolved state map (mapping from
535
           (type, state_key) to event_id)
536

537
        state_groups_ids (dict[int, dict[(str, str), str]]):
538
                 map from state group id to the state in that state group
539
                (where 'state' is a map from state key to event id)
540

541
    Returns:
542
        _StateCacheEntry
543
    """
544
    # if the new state matches any of the input state groups, we can
545
    # use that state group again. Otherwise we will generate a state_id
546
    # which will be used as a cache key for future resolutions, but
547
    # not get persisted.
548

549
    # first look for exact matches
UNCOV
550
    new_state_event_ids = set(itervalues(new_state))
!
UNCOV
551
    for sg, state in iteritems(state_groups_ids):
Branches [[0, 552], [0, 565]] missed. !
UNCOV
552
        if len(new_state_event_ids) != len(state):
Branches [[0, 553], [0, 555]] missed. !
UNCOV
553
            continue
!
554

UNCOV
555
        old_state_event_ids = set(itervalues(state))
!
UNCOV
556
        if new_state_event_ids == old_state_event_ids:
Branches [[0, 551], [0, 558]] missed. !
557
            # got an exact match.
UNCOV
558
            return _StateCacheEntry(state=new_state, state_group=sg)
!
559

560
    # TODO: We want to create a state group for this set of events, to
561
    # increase cache hits, but we need to make sure that it doesn't
562
    # end up as a prev_group without being added to the database
563

564
    # failing that, look for the closest match.
UNCOV
565
    prev_group = None
!
UNCOV
566
    delta_ids = None
!
567

UNCOV
568
    for old_group, old_state in iteritems(state_groups_ids):
Branches [[0, 569], [0, 574]] missed. !
UNCOV
569
        n_delta_ids = {k: v for k, v in iteritems(new_state) if old_state.get(k) != v}
Branches [[0, 569], [0, 570]] missed. !
UNCOV
570
        if not delta_ids or len(n_delta_ids) < len(delta_ids):
Branches [[0, 568], [0, 571]] missed. !
UNCOV
571
            prev_group = old_group
!
UNCOV
572
            delta_ids = n_delta_ids
!
573

UNCOV
574
    return _StateCacheEntry(
!
575
        state=new_state, state_group=None, prev_group=prev_group, delta_ids=delta_ids
576
    )
577

578

579
def resolve_events_with_store(room_version, state_sets, event_map, state_res_store):
1×
580
    """
581
    Args:
582
        room_version(str): Version of the room
583

584
        state_sets(list): List of dicts of (type, state_key) -> event_id,
585
            which are the different state groups to resolve.
586

587
        event_map(dict[str,FrozenEvent]|None):
588
            a dict from event_id to event, for any events that we happen to
589
            have in flight (eg, those currently being persisted). This will be
590
            used as a starting point fof finding the state we need; any missing
591
            events will be requested via state_map_factory.
592

593
            If None, all events will be fetched via state_map_factory.
594

595
        state_res_store (StateResolutionStore)
596

597
    Returns
598
        Deferred[dict[(str, str), str]]:
599
            a map from (type, state_key) to event_id.
600
    """
UNCOV
601
    v = KNOWN_ROOM_VERSIONS[room_version]
!
UNCOV
602
    if v.state_res == StateResolutionVersions.V1:
Branches [[0, 603], [0, 607]] missed. !
UNCOV
603
        return v1.resolve_events_with_store(
!
604
            state_sets, event_map, state_res_store.get_events
605
        )
606
    else:
UNCOV
607
        return v2.resolve_events_with_store(
!
608
            room_version, state_sets, event_map, state_res_store
609
        )
610

611

612
@attr.s
1×
613
class StateResolutionStore(object):
1×
614
    """Interface that allows state resolution algorithms to access the database
615
    in well defined way.
616

617
    Args:
618
        store (DataStore)
619
    """
620

621
    store = attr.ib()
1×
622

623
    def get_events(self, event_ids, allow_rejected=False):
1×
624
        """Get events from the database
625

626
        Args:
627
            event_ids (list): The event_ids of the events to fetch
628
            allow_rejected (bool): If True return rejected events.
629

630
        Returns:
631
            Deferred[dict[str, FrozenEvent]]: Dict from event_id to event.
632
        """
633

UNCOV
634
        return self.store.get_events(
!
635
            event_ids,
636
            check_redacted=False,
637
            get_prev_content=False,
638
            allow_rejected=allow_rejected,
639
        )
640

641
    def get_auth_chain(self, event_ids):
1×
642
        """Gets the full auth chain for a set of events (including rejected
643
        events).
644

645
        Includes the given event IDs in the result.
646

647
        Note that:
648
            1. All events must be state events.
649
            2. For v1 rooms this may not have the full auth chain in the
650
               presence of rejected events
651

652
        Args:
653
            event_ids (list): The event IDs of the events to fetch the auth
654
                chain for. Must be state events.
655

656
        Returns:
657
            Deferred[list[str]]: List of event IDs of the auth chain.
658
        """
659

UNCOV
660
        return self.store.get_auth_chain_ids(event_ids, include_given=True)
!
Troubleshooting · Open an Issue · Sales · Support · ENTERPRISE · CAREERS · STATUS
BLOG · TWITTER · Legal & Privacy · Supported CI Services · What's a CI service? · Automated Testing

© 2019 Coveralls, LLC