• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 8360018096

20 Mar 2024 01:46PM UTC coverage: 88.8% (+0.2%) from 88.58%
8360018096

push

gihub-action

web-flow
feat: refactor messages for cleaner code and reliability (#150)

* refactor messaging code

* add background job for resending events

237 of 261 new or added lines in 9 files covered. (90.8%)

1 existing line in 1 file now uncovered.

5098 of 5741 relevant lines covered (88.8%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.8
/components/renku_data_services/message_queue/redis_queue.py
1
"""Message queue implementation for redis streams."""
1✔
2

3
import base64
1✔
4
import copy
1✔
5
import glob
1✔
6
import inspect
1✔
7
import json
1✔
8
from dataclasses import dataclass
1✔
9
from datetime import datetime
1✔
10
from functools import wraps
1✔
11
from io import BytesIO
1✔
12
from pathlib import Path
1✔
13
from types import NoneType, UnionType
1✔
14
from typing import Callable, Optional, Type, TypeVar, Union
1✔
15

16
from dataclasses_avroschema.schema_generator import AvroModel
1✔
17
from dataclasses_avroschema.utils import standardize_custom_type
1✔
18
from fastavro import parse_schema, schemaless_reader, schemaless_writer
1✔
19
from ulid import ULID
1✔
20

21
from renku_data_services.message_queue.avro_models.io.renku.events.v1.header import Header
1✔
22
from renku_data_services.message_queue.avro_models.io.renku.events.v1.project_authorization_added import (
1✔
23
    ProjectAuthorizationAdded,
24
)
25
from renku_data_services.message_queue.avro_models.io.renku.events.v1.project_authorization_removed import (
1✔
26
    ProjectAuthorizationRemoved,
27
)
28
from renku_data_services.message_queue.avro_models.io.renku.events.v1.project_authorization_updated import (
1✔
29
    ProjectAuthorizationUpdated,
30
)
31
from renku_data_services.message_queue.avro_models.io.renku.events.v1.project_created import ProjectCreated
1✔
32
from renku_data_services.message_queue.avro_models.io.renku.events.v1.project_removed import ProjectRemoved
1✔
33
from renku_data_services.message_queue.avro_models.io.renku.events.v1.project_updated import ProjectUpdated
1✔
34
from renku_data_services.message_queue.avro_models.io.renku.events.v1.user_added import UserAdded
1✔
35
from renku_data_services.message_queue.avro_models.io.renku.events.v1.user_removed import UserRemoved
1✔
36
from renku_data_services.message_queue.avro_models.io.renku.events.v1.user_updated import UserUpdated
1✔
37
from renku_data_services.message_queue.config import RedisConfig
1✔
38
from renku_data_services.message_queue.interface import IMessageQueue
1✔
39

40
_root = Path(__file__).parent.resolve()
_filter = f"{_root}/schemas/**/*.avsc"


def _load_schemas(pattern: str) -> dict[str, dict]:
    """Load all avro schema files matching a glob pattern.

    Every matching file is parsed as JSON. Schemas that carry a ``name`` are registered under their full
    name, i.e. ``<namespace>.<name>`` when a non-empty ``namespace`` is present and plain ``<name>``
    otherwise. Files whose top-level JSON has no ``name`` are skipped.

    Args:
        pattern: Glob pattern (``**`` is expanded recursively) selecting the schema files.

    Returns:
        Mapping of full schema name to the parsed schema.
    """
    schemas: dict[str, dict] = {}
    for schema_path in glob.glob(pattern, recursive=True):
        # Read as UTF-8 explicitly so parsing does not depend on the platform's default encoding.
        with open(schema_path, encoding="utf-8") as schema_file:
            schema = json.load(schema_file)
        if "name" not in schema:
            continue
        full_name = schema["name"]
        namespace = schema.get("namespace")
        if namespace:
            full_name = f"{namespace}.{full_name}"
        schemas[full_name] = schema
    return schemas


# Named schemas handed to fastavro so that references between schema files can be resolved.
# Loading happens inside a helper so that loop temporaries don't end up in the module namespace.
_schemas: dict[str, dict] = _load_schemas(_filter)
1✔
52

53

54
def serialize_binary(obj: AvroModel) -> bytes:
    """Encode a message as schemaless avro binary, based on its original schema.

    The schema is taken from the object's ``_schema`` attribute when it has one and from
    ``avro_schema()`` otherwise; the module's named schemas are passed along when parsing it.
    """
    raw_schema = getattr(obj, "_schema", obj.avro_schema())
    parsed_schema = parse_schema(schema=json.loads(raw_schema), named_schemas=_schemas)
    buffer = BytesIO()
    record = obj.asdict(standardize_factory=standardize_custom_type)
    schemaless_writer(buffer, parsed_schema, record)
    return buffer.getvalue()
1✔
60

61

62
# Type variable bound to AvroModel, used to tie the model class passed to `deserialize_binary` to its return type.
T = TypeVar("T", bound=AvroModel)
1✔
63

64

65
def deserialize_binary(data: bytes, model: Type[T]) -> T:
    """Decode schemaless avro binary data into an instance of ``model``.

    The schema comes from the model's ``_schema`` attribute when it has one and from ``avro_schema()``
    otherwise. It is used as both the writer and the reader schema.
    """
    stream = BytesIO(data)
    raw_schema = getattr(model, "_schema", model.avro_schema())
    parsed_schema = parse_schema(schema=json.loads(raw_schema), named_schemas=_schemas)

    record = schemaless_reader(stream, parsed_schema, parsed_schema)
    stream.flush()
    return model.parse_obj(record)  # type: ignore
1✔
75

76

77
def create_header(message_type: str, content_type: str = "application/avro+binary") -> Header:
    """Create a message header.

    Args:
        message_type: Value stored in the header's ``type`` field.
        content_type: Value stored in the header's ``dataContentType`` field.

    Returns:
        A header with source ``renku-data-services``, schema version ``1``, the current UTC time and a
        freshly generated ULID (hex) as request id.
    """
    # Imported locally because only `datetime` itself is imported at module level.
    from datetime import timezone

    # `datetime.utcnow()` is deprecated since Python 3.12. Dropping the tzinfo again keeps the value a
    # naive UTC timestamp, exactly what `utcnow()` produced, so the serialized header does not change.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    return Header(
        type=message_type,
        source="renku-data-services",
        dataContentType=content_type,
        schemaVersion="1",
        time=now,
        requestId=ULID().hex,
    )
87

88

89
def dispatch_message(transform: Callable[..., Union[AvroModel, Optional[AvroModel]]]):
1✔
90
    """Sends a message on the message queue.
91

92
    The transform method is called with the arguments and result of the wrapped method. It is responsible for
93
    creating the message type to dispatch. The message is sent based on the return type of the transform method.
94
    This wrapper takes care of guaranteed at-least-once delivery of messages by using a backup 'events' table that
95
    stores messages for redelivery shold sending fail. For this to work correctly, the messages need to be stored
96
    in the events table in the same database transaction as the metadata update that they are related to.
97
    All this is to ensure that downstream consumers are kept up to date. They are expected to handle multiple
98
    delivery of the same message correctly.
99
    This code addresses these potential error cases:
100
    - Data being persisted in our database but no message being sent due to errors/restarts of the service at the
101
      wrong time.
102
    - Redis not being available.
103
    Downstream consumers are expected to handle the following:
104
    - The same message being delivered more than once. Deduplication can be done due to the message ids being
105
      the identical.
106
    - Messages being delivered out of order. This should be super rare, e.g. a user edits a project, message delivery
107
      fails duf to redis being down, the user then deletes the project and message delivery works. Then the first
108
      message is delivered again and this works, meaning downstream the project deletion arrives before the project
109
      update. Order can be maintained due to the timestamps in the messages.
110
    """
111

112
    def decorator(f):
1✔
113
        @wraps(f)
1✔
114
        async def message_wrapper(self, session, *args, **kwargs):
1✔
115
            result = await f(self, session, *args, **kwargs)
1✔
116
            payload = transform(result, *args, **kwargs)
1✔
117

118
            if payload is None:
1✔
119
                # don't send message if transform returned None
NEW
120
                return result
×
121

122
            signature = inspect.signature(transform).return_annotation
1✔
123

124
            # Handle type unions
125
            non_none_types = None
1✔
126
            if isinstance(signature, UnionType):
1✔
127
                non_none_types = [t for t in signature.__args__ if t != NoneType]
1✔
128
            elif isinstance(signature, str) and " | " in signature:
1✔
129
                non_none_types = [t for t in signature.split(" | ") if t != "None"]
1✔
130

131
            if non_none_types is not None:
1✔
132
                if len(non_none_types) != 1:
1✔
NEW
133
                    raise NotImplementedError(f"Only optional types are supported, got {signature}")
×
134
                signature = non_none_types[0]
1✔
135
            if not isinstance(signature, str):
1✔
136
                # depending on 'from _future_ import annotations' this can be a string or a type
137
                signature = signature.__qualname__
1✔
138

139
            match signature:
1✔
140
                case ProjectCreated.__qualname__:
1✔
141
                    queue_name = "project.created"
1✔
142
                case ProjectUpdated.__qualname__:
1✔
143
                    queue_name = "project.updated"
1✔
144
                case ProjectRemoved.__qualname__:
1✔
145
                    queue_name = "project.removed"
1✔
146
                case UserAdded.__qualname__:
1✔
147
                    queue_name = "user.added"
1✔
148
                case UserUpdated.__qualname__:
1✔
149
                    queue_name = "user.updated"
1✔
150
                case UserRemoved.__qualname__:
1✔
151
                    queue_name = "user.removed"
1✔
152
                case ProjectAuthorizationAdded.__qualname__:
1✔
153
                    queue_name = "projectAuth.added"
1✔
154
                case ProjectAuthorizationUpdated.__qualname__:
1✔
155
                    queue_name = "projectAuth.updated"
1✔
156
                case ProjectAuthorizationRemoved.__qualname__:
1✔
157
                    queue_name = "projectAuth.removed"
1✔
NEW
158
                case _:
×
NEW
159
                    raise NotImplementedError(f"Can't create message using transform {transform}:{signature}")
×
160
            headers = create_header(queue_name)
1✔
161
            message_id = ULID().hex
1✔
162
            message: dict[bytes | memoryview | str | int | float, bytes | memoryview | str | int | float] = {
1✔
163
                "id": message_id,
164
                "headers": headers.serialize_json(),
165
                "payload": base64.b64encode(serialize_binary(payload)).decode(),
166
            }
167
            event_id = await self.event_repo.store_event(session, queue_name, message)
1✔
168
            session.commit()
1✔
169

170
            try:
1✔
171
                await self.message_queue.send_message(queue_name, message)
1✔
172
            except:  # noqa:E722
1✔
173
                return result
1✔
NEW
174
            await self.event_repo.delete_event(event_id)
×
175
            return result
1✔
176

177
        return message_wrapper
1✔
178

179
    return decorator
1✔
180

181

182
@dataclass
1✔
183
class RedisQueue(IMessageQueue):
1✔
184
    """Redis streams queue implementation."""
1✔
185

186
    config: RedisConfig
1✔
187

188
    async def send_message(
1✔
189
        self,
190
        channel: str,
191
        message: dict[bytes | memoryview | str | int | float, bytes | memoryview | str | int | float],
192
    ):
193
        """Send a message on a channel."""
194
        message = copy.copy(message)
1✔
195
        if "payload" in message:
1✔
196
            message["payload"] = base64.b64decode(message["payload"])  # type: ignore
1✔
197

198
        await self.config.redis_connection.xadd(channel, message)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc