• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 10662564162

02 Sep 2024 07:16AM UTC coverage: 90.424% (-0.02%) from 90.446%
10662564162

push

github

web-flow
fix: fully reconnect to redis if master is not found (#354)

Co-authored-by: Ralf Grubenmann <ralf.grubenmann@gmail.com>

4 of 8 new or added lines in 2 files covered. (50.0%)

2 existing lines in 2 files now uncovered.

9263 of 10244 relevant lines covered (90.42%)

1.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.2
/components/renku_data_services/message_queue/redis_queue.py
1
"""Message queue implementation for redis streams."""
2✔
2

3
import copy
2✔
4
from collections.abc import Awaitable, Callable
2✔
5
from dataclasses import dataclass
2✔
6
from functools import wraps
2✔
7
from typing import Concatenate, ParamSpec, Protocol, TypeVar
2✔
8

9
from dataclasses_avroschema.schema_generator import AvroModel
2✔
10
from redis.asyncio.sentinel import MasterNotFoundError
2✔
11
from sqlalchemy.ext.asyncio import AsyncSession
2✔
12

13
from renku_data_services.errors import errors
2✔
14
from renku_data_services.message_queue import events
2✔
15
from renku_data_services.message_queue.config import RedisConfig
2✔
16
from renku_data_services.message_queue.converters import EventConverter
2✔
17
from renku_data_services.message_queue.db import EventRepository
2✔
18
from renku_data_services.message_queue.interface import IMessageQueue
2✔
19
from renku_data_services.message_queue.models import Event
2✔
20

21

22
class WithMessageQueue(Protocol):
    """Structural type for objects whose methods can be wrapped to emit queue messages.

    Any class that exposes an ``event_repo`` property satisfies this protocol.
    """

    @property
    def event_repo(self) -> EventRepository:
        """Return the repository used to persist events."""
        ...
×
29

30

31
# Captures the positional/keyword parameters of a decorated method (everything after ``self``).
_P = ParamSpec("_P")
# Return type of a decorated method.
_T = TypeVar("_T")
# Type of ``self`` in a decorated method; it must satisfy the WithMessageQueue protocol.
_WithMessageQueue = TypeVar("_WithMessageQueue", bound=WithMessageQueue)
2✔
34

35

36
def dispatch_message(
    event_type: type[AvroModel] | type[events.AmbiguousEvent],
) -> Callable[
    [Callable[Concatenate[_WithMessageQueue, _P], Awaitable[_T]]],
    Callable[Concatenate[_WithMessageQueue, _P], Awaitable[_T]],
]:
    """Create a decorator that records events for the result of an async method.

    The wrapped method is awaited first. If it returns ``None`` nothing is recorded. Otherwise the result is
    converted with ``EventConverter.to_events(result, event_type)`` and every resulting event is stored through
    ``self.event_repo.store_event`` using the database session passed to the wrapped method.

    Messages are stored in the database in the same transaction as the changed entities, and are sent by a background
    job to ensure delivery of messages and prevent messages being sent in case of failing transactions or due to
    exceptions.

    Args:
        event_type: The event type handed to ``EventConverter.to_events`` together with the method result.

    Returns:
        A decorator for async methods of classes that satisfy ``WithMessageQueue``.

    Raises:
        errors.ProgrammingError: Raised by the wrapped method when it is called without a ``session`` keyword
            argument that is an ``AsyncSession``.
    """

    def decorator(
        f: Callable[Concatenate[_WithMessageQueue, _P], Awaitable[_T]],
    ) -> Callable[Concatenate[_WithMessageQueue, _P], Awaitable[_T]]:
        @wraps(f)
        async def message_wrapper(self: _WithMessageQueue, *args: _P.args, **kwargs: _P.kwargs) -> _T:
            # The session is only looked up among keyword arguments; a positional session is not detected.
            session = kwargs.get("session")
            if not isinstance(session, AsyncSession):
                raise errors.ProgrammingError(
                    message="The decorator that populates the message queue expects a valid database session "
                    f"in the keyword arguments instead it got {type(session)}."
                )
            result = await f(self, *args, **kwargs)
            if result is None:
                # Nothing was produced, so there is nothing to convert into events.
                return result  # type: ignore[unreachable]
            # Deliberately not named ``events`` so the imported ``events`` module is not shadowed.
            pending_events = EventConverter.to_events(result, event_type)

            for event in pending_events:
                await self.event_repo.store_event(session, event)
            return result

        return message_wrapper

    return decorator
2✔
74

75

76
@dataclass
class RedisQueue(IMessageQueue):
    """Message queue implementation backed by redis streams."""

    # Holds the redis connection and the hook used to reset it.
    config: RedisConfig

    async def send_message(self, event: Event) -> None:
        """Append the serialized event to the redis stream named by ``event.queue``.

        If the first attempt raises ``MasterNotFoundError`` the redis connection is reset and the
        ``xadd`` is tried exactly once more; an error from the second attempt propagates to the caller.
        """
        payload = copy.copy(event.serialize())

        try:
            await self.config.redis_connection.xadd(event.queue, payload)
        except MasterNotFoundError:
            # Force a redis reconnection before the single retry.
            # NOTE(review): presumably ``redis_connection`` yields a fresh connection after the reset - confirm.
            self.config.reset_redis_connection()
            await self.config.redis_connection.xadd(event.queue, payload)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc