• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 9281692908

29 May 2024 07:03AM UTC coverage: 90.56% (-0.07%) from 90.625%
9281692908

push

gihub-action

web-flow
feat: always send messages in sanic background process (#217)

* switch to using sanic managed background process

37 of 54 new or added lines in 7 files covered. (68.52%)

9 existing lines in 3 files now uncovered.

7953 of 8782 relevant lines covered (90.56%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.64
/components/renku_data_services/message_queue/db.py
1
"""Adapters for message queue database classes."""
1✔
2

3
from __future__ import annotations
1✔
4

5
from collections.abc import Callable
1✔
6

7
from sanic.log import logger
1✔
8
from sqlalchemy import delete, select
1✔
9
from sqlalchemy.ext.asyncio import AsyncSession
1✔
10
from sqlalchemy.orm import Session
1✔
11

12
from renku_data_services.message_queue import orm as schemas
1✔
13
from renku_data_services.message_queue.interface import IMessageQueue
1✔
14
from renku_data_services.message_queue.models import Event
1✔
15

16

17
class EventRepository:
    """Repository for events.

    Reads and deletes ``EventORM`` rows through sessions created by ``session_maker``
    and forwards stored events to ``message_queue``.
    """

    def __init__(
        self,
        session_maker: Callable[..., AsyncSession],
        message_queue: IMessageQueue,
    ) -> None:
        """Keep the session factory and the message queue used to send events.

        Args:
            session_maker: Callable returning an ``AsyncSession``; it is called once per
                unit of work and the result is used as an async context manager.
            message_queue: Queue whose ``send_message`` is awaited for every stored event.
        """
        self.session_maker = session_maker  # type: ignore[call-overload]
        self.message_queue: IMessageQueue = message_queue

    async def _get_pending_events(self) -> list[schemas.EventORM]:
        """Get all pending events.

        Returns:
            Every ``EventORM`` row currently stored, without locking and in no particular order.
        """
        async with self.session_maker() as session:
            stmt = select(schemas.EventORM)
            events_orm = await session.scalars(stmt)
            return list(events_orm.all())

    async def send_pending_events(self) -> None:
        """Get all pending events and send them.

        Events are processed in batches of up to 100, oldest first. Each batch is selected
        with ``FOR UPDATE SKIP LOCKED`` inside its own transaction, so rows locked by
        another transaction are skipped rather than waited for.

        An event is deleted only after it was sent successfully; a failed send is logged
        and the row is left in place. The loop stops when no rows are returned, or when a
        whole batch could not be sent: without that second stop condition the same failing
        rows would be selected again immediately and the loop would never terminate while
        the queue is unavailable. Rows left behind are picked up by the next call.
        """
        while True:
            async with self.session_maker() as session, session.begin():
                stmt = (
                    select(schemas.EventORM)
                    # lock retrieved rows, skip already locked ones, to deal with concurrency
                    .with_for_update(skip_locked=True)
                    .limit(100)
                    .order_by(schemas.EventORM.timestamp_utc)
                )
                result = await session.scalars(stmt)
                events_orm = result.all()

                if not events_orm:
                    break

                sent_count = 0
                for event in events_orm:
                    try:
                        await self.message_queue.send_message(event.dump())

                        await session.delete(event)  # this has to be done in the same transaction to not get a deadlock
                    except Exception as e:
                        # Best effort: keep the row so that it can be retried later.
                        logger.warning(f"couldn't send event {event.payload} on queue {event.queue}: {e}")
                    else:
                        sent_count += 1

                # Report what was actually sent, not how many rows were fetched.
                logger.info(f"sent {sent_count} events")

                if sent_count == 0:
                    # No progress on a non-empty batch: retrying right away would fetch the
                    # same rows again, so stop instead of spinning.
                    break

    async def store_event(self, session: AsyncSession | Session, event: Event) -> int:
        """Store an event.

        The event is converted with ``EventORM.load`` and added to ``session``. Nothing is
        flushed or committed here; that is left to the caller's session handling.

        Args:
            session: Session the new row is added to.
            event: Event to persist.

        Returns:
            The ``id`` attribute of the newly created ORM object.
        """
        event_orm = schemas.EventORM.load(event)
        session.add(event_orm)

        # NOTE(review): no flush happens before this read, so a database-generated id may
        # still be unset at this point - confirm how EventORM assigns its id.
        return event_orm.id

    async def delete_event(self, id: int) -> None:
        """Delete an event.

        Args:
            id: Primary key of the event to remove. Nothing is raised here when no row
                matches the id.
        """
        async with self.session_maker() as session, session.begin():
            stmt = delete(schemas.EventORM).where(schemas.EventORM.id == id)
            await session.execute(stmt)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc