• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 9193809848

22 May 2024 03:04PM UTC coverage: 89.933% (+0.05%) from 89.888%
9193809848

push

gihub-action

web-flow
chore(ci): Update renku actions to v1.11.3 (#218)

7781 of 8652 relevant lines covered (89.93%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.89
/components/renku_data_services/message_queue/redis_queue.py
1
"""Message queue implementation for redis streams."""
1✔
2

3
import base64
1✔
4
import copy
1✔
5
import glob
1✔
6
import json
1✔
7
import logging
1✔
8
from collections.abc import Awaitable, Callable
1✔
9
from dataclasses import dataclass
1✔
10
from datetime import datetime
1✔
11
from functools import wraps
1✔
12
from io import BytesIO
1✔
13
from pathlib import Path
1✔
14
from typing import Any, Concatenate, ParamSpec, Protocol, TypeVar
1✔
15

16
from dataclasses_avroschema.schema_generator import AvroModel
1✔
17
from dataclasses_avroschema.utils import standardize_custom_type
1✔
18
from fastavro import parse_schema, schemaless_reader, schemaless_writer
1✔
19
from sqlalchemy.ext.asyncio import AsyncSession
1✔
20
from ulid import ULID
1✔
21

22
from renku_data_services.errors import errors
1✔
23
from renku_data_services.message_queue import AmbiguousEvent
1✔
24
from renku_data_services.message_queue.avro_models.io.renku.events.v1.header import Header
1✔
25
from renku_data_services.message_queue.config import RedisConfig
1✔
26
from renku_data_services.message_queue.converters import EventConverter
1✔
27
from renku_data_services.message_queue.db import EventRepository
1✔
28
from renku_data_services.message_queue.interface import IMessageQueue
1✔
29

30
_root = Path(__file__).parent.resolve()
_filter = f"{_root}/schemas/**/*.avsc"


def _load_named_schemas(pattern: str) -> dict[str, dict[str, Any]]:
    """Load every named Avro schema file matching ``pattern``, keyed by its full name.

    The full name is ``<namespace>.<name>`` when the schema has a non-empty namespace, otherwise just
    ``<name>``. Files are visited in sorted path order, so if two files define the same full name the
    one that sorts last deterministically wins. Files whose top-level JSON value is not an object with
    a ``name`` (e.g. a bare union list) are skipped.

    Args:
        pattern: A ``glob`` pattern; ``**`` matches recursively.

    Returns:
        A mapping of full schema name to the parsed JSON schema definition.
    """
    schemas: dict[str, dict[str, Any]] = {}
    for path in sorted(glob.glob(pattern, recursive=True)):
        # .avsc files are JSON, so read them as UTF-8 regardless of the platform's locale encoding.
        with open(path, encoding="utf-8") as fh:
            schema = json.load(fh)
        if not isinstance(schema, dict) or "name" not in schema:
            continue
        name = schema["name"]
        namespace = schema.get("namespace")
        if namespace:
            name = f"{namespace}.{name}"
        schemas[name] = schema
    return schemas


# Named schemas (keyed by full name), passed to fastavro's ``parse_schema`` as ``named_schemas``.
_schemas = _load_named_schemas(_filter)
1✔
42

43

44
def serialize_binary(obj: AvroModel) -> bytes:
    """Serialize a message with avro, making sure to use the original schema.

    The schema is the JSON string in the ``_schema`` attribute when one is present; otherwise it is
    generated with ``obj.avro_schema()``. It is parsed together with the module's named schemas so
    that references to other named types can be resolved.

    Args:
        obj: The Avro model instance to serialize.

    Returns:
        The schemaless Avro binary encoding of ``obj``.
    """
    # Only generate a schema when no stored one exists. Passing ``obj.avro_schema()`` as the
    # ``getattr`` default would evaluate it eagerly on every call.
    schema_json = getattr(obj, "_schema", None)
    if schema_json is None:
        schema_json = obj.avro_schema()
    schema = parse_schema(schema=json.loads(schema_json), named_schemas=_schemas)
    with BytesIO() as buffer:
        schemaless_writer(buffer, schema, obj.asdict(standardize_factory=standardize_custom_type))
        return buffer.getvalue()
1✔
50

51

52
TAvro = TypeVar("TAvro", bound=AvroModel)


def deserialize_binary(data: bytes, model: type[TAvro]) -> TAvro:
    """Deserialize an avro binary message, using the original schema.

    The schema is the JSON string in the model's ``_schema`` attribute when one is present; otherwise
    it is generated with ``model.avro_schema()``. It is parsed together with the module's named schemas.

    Args:
        data: The schemaless Avro binary payload.
        model: The Avro model class to decode into.

    Returns:
        An instance of ``model`` built from the decoded payload.
    """
    # Only generate a schema when no stored one exists. Passing ``model.avro_schema()`` as the
    # ``getattr`` default would evaluate it eagerly on every call.
    schema_json = getattr(model, "_schema", None)
    if schema_json is None:
        schema_json = model.avro_schema()
    schema = parse_schema(schema=json.loads(schema_json), named_schemas=_schemas)

    with BytesIO(data) as input_stream:
        # The same schema is used as writer and reader schema.
        # NOTE(review): this assumes the message was written with this model's schema.
        payload = schemaless_reader(input_stream, schema, schema)

    return model.parse_obj(payload)  # type: ignore
1✔
65

66

67
def create_header(
    message_type: str, content_type: str = "application/avro+binary", schema_version: str = "1"
) -> Header:
    """Create a message header.

    Args:
        message_type: Value of the header's ``type`` field (``dispatch_message`` passes the event's queue name).
        content_type: Value of the header's ``dataContentType`` field.
        schema_version: Value of the header's ``schemaVersion`` field.

    Returns:
        A ``Header`` with source ``renku-data-services``, the current UTC time and a fresh ULID request id.
    """
    from datetime import timezone

    return Header(
        type=message_type,
        source="renku-data-services",
        dataContentType=content_type,
        schemaVersion=schema_version,
        # Timezone-aware UTC. ``datetime.utcnow()`` is deprecated and returns a naive value,
        # which serializers may interpret as local time.
        time=datetime.now(timezone.utc),
        requestId=ULID().hex,
    )
79

80

81
class WithMessageQueue(Protocol):
    """The protocol required for a class to send messages to a message queue.

    ``dispatch_message`` binds the wrapped method's ``self`` to this protocol. It stores outgoing
    events through ``event_repo`` and sends them through ``event_repo.message_queue``.
    """

    @property
    def event_repo(self) -> EventRepository:
        """Returns the event repository."""
        ...
×
88

89

90
_P = ParamSpec("_P")
_T = TypeVar("_T")
_WithMessageQueue = TypeVar("_WithMessageQueue", bound=WithMessageQueue)


def dispatch_message(
    event_type: type[AvroModel] | AmbiguousEvent,
) -> Callable[
    [Callable[Concatenate[_WithMessageQueue, _P], Awaitable[_T]]],
    Callable[Concatenate[_WithMessageQueue, _P], Awaitable[_T]],
]:
    """Sends a message on the message queue.

    The wrapped method must be called with a ``session`` keyword argument holding an ``AsyncSession``.
    Its result is converted into events with ``EventConverter.to_events(result, event_type)`` and one
    message is dispatched per event; a ``None`` result dispatches nothing.
    This wrapper takes care of guaranteed at-least-once delivery of messages by using a backup 'events' table that
    stores messages for redelivery should sending fail. For this to work correctly, the messages need to be stored
    in the events table in the same database transaction as the metadata update that they are related to.
    Every event is stored in the events table and is deleted from it only after it was sent to redis
    successfully. Once a send fails, the remaining events are still stored but not sent, so they are
    left for redelivery rather than overtaking the failed one.
    All this is to ensure that downstream consumers are kept up to date. They are expected to handle multiple
    delivery of the same message correctly.
    This code addresses these potential error cases:
    - Data being persisted in our database but no message being sent due to errors/restarts of the service at the
      wrong time.
    - Redis not being available.
    Downstream consumers are expected to handle the following:
    - The same message being delivered more than once. Deduplication can be done because the message ids are
      identical.
    - Messages being delivered out of order. This should be super rare, e.g. a user edits a project, message delivery
      fails due to redis being down, the user then deletes the project and message delivery works. Then the first
      message is delivered again and this works, meaning downstream the project deletion arrives before the project
      update. Order can be maintained due to the timestamps in the messages.
    """

    def decorator(
        f: Callable[Concatenate[_WithMessageQueue, _P], Awaitable[_T]],
    ) -> Callable[Concatenate[_WithMessageQueue, _P], Awaitable[_T]]:
        @wraps(f)
        async def message_wrapper(self: _WithMessageQueue, *args: _P.args, **kwargs: _P.kwargs):
            session = kwargs.get("session")
            if not isinstance(session, AsyncSession):
                raise errors.ProgrammingError(
                    message="The decorator that populates the message queue expects a valid database session "
                    f"in the keyword arguments instead it got {type(session)}."
                )
            result = await f(self, *args, **kwargs)
            if result is None:
                return result  # type: ignore[unreachable]
            events = EventConverter.to_events(result, event_type)

            schema_version = "2"
            # Set to False after the first failed send. Later events are then only stored for redelivery.
            # Previously the wrapper returned at that point, so later events were never stored at all.
            redis_available = True
            for event in events:
                message_id = ULID().hex
                headers = create_header(event.queue, schema_version=schema_version).serialize_json()
                message: dict[str, Any] = {
                    "id": message_id,
                    "headers": headers,
                    "payload": base64.b64encode(serialize_binary(event.payload)).decode(),
                }
                # Stored with the caller's session so the event belongs to the same transaction.
                event_id = await self.event_repo.store_event(session, event.queue, message)

                if not redis_available:
                    continue
                try:
                    await self.event_repo.message_queue.send_message(event.queue, message)
                except Exception as err:
                    # Best effort: the event stays in the events table and is not deleted.
                    logging.warning(
                        "Could not insert event message to redis queue because of %s "
                        "events have been added to postgres, will attempt to send them later.",
                        err,
                    )
                    redis_available = False
                    continue
                await self.event_repo.delete_event(event_id)
            return result

        return message_wrapper

    return decorator
1✔
164

165

166
@dataclass
class RedisQueue(IMessageQueue):
    """Message queue backed by redis streams."""

    # Holds the redis connection that messages are written to.
    config: RedisConfig

    async def send_message(
        self,
        channel: str,
        message: dict[str, Any],
    ):
        """Append ``message`` as an entry to the redis stream named ``channel``.

        If the message has a ``payload`` entry, it is base64-decoded to raw bytes before sending.
        The decoding is applied to a shallow copy, so the caller's dictionary is left unchanged.
        """
        outgoing = copy.copy(message)
        if "payload" in outgoing:
            encoded_payload = outgoing["payload"]
            outgoing["payload"] = base64.b64decode(encoded_payload)  # type: ignore

        stream_client = self.config.redis_connection
        await stream_client.xadd(channel, outgoing)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc