
SwissDataScienceCenter / renku-data-services / 8783180501

22 Apr 2024 11:22AM UTC coverage: 90.292% (+0.05%) from 90.244%

push · gihub-action · web-flow
feat: add support for entity tags on Renku 2.0 projects (#141)

Add support for entity tags on the project API endpoints.

82 of 86 new or added lines in 10 files covered. (95.35%)

20 existing lines in 5 files now uncovered.

5822 of 6448 relevant lines covered (90.29%)

0.9 hits per line

Source File: /components/renku_data_services/message_queue/redis_queue.py (96.69% covered)
"""Message queue implementation for redis streams."""

import base64
import copy
import glob
import inspect
import json
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime
from functools import wraps
from io import BytesIO
from pathlib import Path
from types import NoneType, UnionType
from typing import Optional, TypeVar, Union

from dataclasses_avroschema.schema_generator import AvroModel
from dataclasses_avroschema.utils import standardize_custom_type
from fastavro import parse_schema, schemaless_reader, schemaless_writer
from sqlalchemy.ext.asyncio import AsyncSession
from ulid import ULID

from renku_data_services.message_queue.avro_models.io.renku.events.v1.header import Header
from renku_data_services.message_queue.avro_models.io.renku.events.v1.project_authorization_added import (
    ProjectAuthorizationAdded,
)
from renku_data_services.message_queue.avro_models.io.renku.events.v1.project_authorization_removed import (
    ProjectAuthorizationRemoved,
)
from renku_data_services.message_queue.avro_models.io.renku.events.v1.project_authorization_updated import (
    ProjectAuthorizationUpdated,
)
from renku_data_services.message_queue.avro_models.io.renku.events.v1.project_created import ProjectCreated
from renku_data_services.message_queue.avro_models.io.renku.events.v1.project_removed import ProjectRemoved
from renku_data_services.message_queue.avro_models.io.renku.events.v1.project_updated import ProjectUpdated
from renku_data_services.message_queue.avro_models.io.renku.events.v1.user_added import UserAdded
from renku_data_services.message_queue.avro_models.io.renku.events.v1.user_removed import UserRemoved
from renku_data_services.message_queue.avro_models.io.renku.events.v1.user_updated import UserUpdated
from renku_data_services.message_queue.config import RedisConfig
from renku_data_services.message_queue.interface import IMessageQueue

_root = Path(__file__).parent.resolve()
_filter = f"{_root}/schemas/**/*.avsc"
_schemas = {}
for file in glob.glob(_filter, recursive=True):
    with open(file) as f:
        _schema = json.load(f)
        if "name" in _schema:
            _name = _schema["name"]
            _namespace = _schema.get("namespace")
            if _namespace:
                _name = f"{_namespace}.{_name}"
            _schemas[_name] = _schema
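
# Illustrative note (not part of the original module): after the loop above,
# _schemas maps fully qualified Avro names to their parsed schema dicts so that
# parse_schema() can resolve named references, e.g. something like:
#
#   _schemas["io.renku.events.v1.ProjectCreated"] == {
#       "type": "record",
#       "name": "ProjectCreated",
#       "namespace": "io.renku.events.v1",
#       "fields": [...],  # actual fields depend on the shipped .avsc files
#   }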


def serialize_binary(obj: AvroModel) -> bytes:
    """Serialize a message with avro, making sure to use the original schema."""
    schema = parse_schema(schema=json.loads(getattr(obj, "_schema", obj.avro_schema())), named_schemas=_schemas)
    fo = BytesIO()
    schemaless_writer(fo, schema, obj.asdict(standardize_factory=standardize_custom_type))
    return fo.getvalue()


T = TypeVar("T", bound=AvroModel)


def deserialize_binary(data: bytes, model: type[T]) -> T:
    """Deserialize an avro binary message, using the original schema."""
    input_stream = BytesIO(data)
    schema = parse_schema(schema=json.loads(getattr(model, "_schema", model.avro_schema())), named_schemas=_schemas)

    payload = schemaless_reader(input_stream, schema, schema)
    input_stream.flush()
    obj = model.parse_obj(payload)  # type: ignore

    return obj
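
# Usage sketch (hypothetical, not part of the original module): round-trip an
# event through the two helpers above. ProjectCreated's constructor arguments
# are elided because its fields are defined by the generated avro model.
#
#   event = ProjectCreated(...)                        # any AvroModel subclass
#   raw = serialize_binary(event)                      # schemaless avro bytes
#   restored = deserialize_binary(raw, ProjectCreated)
#   assert isinstance(restored, ProjectCreated)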


def create_header(message_type: str, content_type: str = "application/avro+binary") -> Header:
    """Create a message header."""
    return Header(
        type=message_type,
        source="renku-data-services",
        dataContentType=content_type,
        schemaVersion="1",
        time=datetime.utcnow(),
        requestId=ULID().hex,
    )
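
# Illustrative example (not part of the original module): create_header("project.created")
# returns a Header whose type is "project.created", source is "renku-data-services",
# dataContentType is "application/avro+binary", schemaVersion is "1", time is the
# current UTC timestamp and requestId is a fresh ULID hex string.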


def dispatch_message(transform: Callable[..., Union[AvroModel, Optional[AvroModel]]]):
    """Send a message on the message queue.

    The transform method is called with the arguments and result of the wrapped method. It is responsible for
    creating the message type to dispatch. The message is sent based on the return type of the transform method.
    This wrapper takes care of guaranteed at-least-once delivery of messages by using a backup 'events' table that
    stores messages for redelivery should sending fail. For this to work correctly, the messages need to be stored
    in the events table in the same database transaction as the metadata update that they are related to.
    All this is to ensure that downstream consumers are kept up to date. They are expected to handle multiple
    deliveries of the same message correctly.
    This code addresses these potential error cases:
    - Data being persisted in our database but no message being sent due to errors/restarts of the service at the
      wrong time.
    - Redis not being available.
    Downstream consumers are expected to handle the following:
    - The same message being delivered more than once. Deduplication is possible because the message ids are
      identical.
    - Messages being delivered out of order. This should be very rare, e.g. a user edits a project, message delivery
      fails due to redis being down, the user then deletes the project and message delivery works. Then the first
      message is delivered again and succeeds, meaning downstream the project deletion arrives before the project
      update. Order can be maintained due to the timestamps in the messages.
    """

    def decorator(f):
        @wraps(f)
        async def message_wrapper(self, session: AsyncSession, *args, **kwargs):
            result = await f(self, session, *args, **kwargs)
            payload = transform(result, *args, **kwargs)

            if payload is None:
                # don't send a message if transform returned None
                return result

            signature = inspect.signature(transform).return_annotation

            # Handle type unions
            non_none_types = None
            if isinstance(signature, UnionType):
                non_none_types = [t for t in signature.__args__ if t != NoneType]
            elif isinstance(signature, str) and " | " in signature:
                non_none_types = [t for t in signature.split(" | ") if t != "None"]

            if non_none_types is not None:
                if len(non_none_types) != 1:
                    raise NotImplementedError(f"Only optional types are supported, got {signature}")
                signature = non_none_types[0]
            if not isinstance(signature, str):
                # depending on 'from __future__ import annotations' this can be a string or a type
                signature = signature.__qualname__

            match signature:
                case ProjectCreated.__qualname__:
                    queue_name = "project.created"
                case ProjectUpdated.__qualname__:
                    queue_name = "project.updated"
                case ProjectRemoved.__qualname__:
                    queue_name = "project.removed"
                case UserAdded.__qualname__:
                    queue_name = "user.added"
                case UserUpdated.__qualname__:
                    queue_name = "user.updated"
                case UserRemoved.__qualname__:
                    queue_name = "user.removed"
                case ProjectAuthorizationAdded.__qualname__:
                    queue_name = "projectAuth.added"
                case ProjectAuthorizationUpdated.__qualname__:
                    queue_name = "projectAuth.updated"
                case ProjectAuthorizationRemoved.__qualname__:
                    queue_name = "projectAuth.removed"
                case _:
                    raise NotImplementedError(f"Can't create message using transform {transform}:{signature}")
            headers = create_header(queue_name)
            message_id = ULID().hex
            message: dict[bytes | memoryview | str | int | float, bytes | memoryview | str | int | float] = {
                "id": message_id,
                "headers": headers.serialize_json(),
                "payload": base64.b64encode(serialize_binary(payload)).decode(),
            }
            event_id = await self.event_repo.store_event(session, queue_name, message)
            await session.commit()

            try:
                await self.message_queue.send_message(queue_name, message)
            except:  # noqa:E722
                return result
            await self.event_repo.delete_event(event_id)
            return result

        return message_wrapper

    return decorator
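
# Usage sketch (hypothetical, not part of the original module): decorating a
# repository method. The transform gets the wrapped method's result and its
# arguments and returns the avro event to publish; returning None skips
# publishing. The class is expected to expose 'event_repo' and 'message_queue'
# attributes, as used by the wrapper above. 'ProjectRepository',
# '_created_event' and 'insert_project' are illustrative names only.
#
#   def _created_event(result, *args, **kwargs) -> ProjectCreated:
#       return ProjectCreated(...)  # build the event from the persisted project
#
#   class ProjectRepository:
#       @dispatch_message(_created_event)
#       async def insert_project(self, session: AsyncSession, project):
#           ...  # persist 'project' in the same transaction and return it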


@dataclass
class RedisQueue(IMessageQueue):
    """Redis streams queue implementation."""

    config: RedisConfig

    async def send_message(
        self,
        channel: str,
        message: dict[bytes | memoryview | str | int | float, bytes | memoryview | str | int | float],
    ):
        """Send a message on a channel."""
        message = copy.copy(message)
        if "payload" in message:
            message["payload"] = base64.b64decode(message["payload"])  # type: ignore

        await self.config.redis_connection.xadd(channel, message)
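
# Usage sketch (hypothetical, not part of the original module): sending a
# prepared message directly through RedisQueue. RedisConfig comes from
# renku_data_services.message_queue.config; 'redis_connection' is assumed to be
# an async redis client, as implied by the awaited xadd() call above.
#
#   queue = RedisQueue(config=RedisConfig(...))
#   message = {
#       "id": ULID().hex,
#       "headers": create_header("project.created").serialize_json(),
#       "payload": base64.b64encode(serialize_binary(event)).decode(),
#   }
#   await queue.send_message("project.created", message)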