• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 8360018096

20 Mar 2024 01:46PM UTC coverage: 88.8% (+0.2%) from 88.58%
8360018096

push

gihub-action

web-flow
feat: refactor messages for cleaner code and reliability (#150)

* refactor messaging code

* add background job for resending events

237 of 261 new or added lines in 9 files covered. (90.8%)

1 existing line in 1 file now uncovered.

5098 of 5741 relevant lines covered (88.8%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.48
/components/renku_data_services/message_queue/db.py
1
"""Adapters for project database classes."""
1✔
2

3
from __future__ import annotations
1✔
4

5
from datetime import datetime, timedelta
1✔
6
from typing import Any, Callable
1✔
7

8
from sanic.log import logger
1✔
9
from sqlalchemy import delete, select
1✔
10
from sqlalchemy.ext.asyncio import AsyncSession
1✔
11

12
from renku_data_services.message_queue import orm as schemas
1✔
13
from renku_data_services.message_queue.interface import IMessageQueue
1✔
14

15

16
class EventRepository:
1✔
17
    """Repository for events."""
1✔
18

19
    def __init__(
1✔
20
        self,
21
        session_maker: Callable[..., AsyncSession],
22
        message_queue: IMessageQueue,
23
    ):
24
        self.session_maker = session_maker  # type: ignore[call-overload]
1✔
25
        self.message_queue: IMessageQueue = message_queue
1✔
26

27
    async def get_pending_events(self) -> list[schemas.EventORM]:
1✔
28
        """Get all pending events."""
29
        async with self.session_maker() as session:
1✔
30
            stmt = select(schemas.EventORM)
1✔
31
            result = await session.execute(stmt)
1✔
32
            events_orm = result.scalars().all()
1✔
33
            return list(events_orm)
1✔
34

35
    async def send_pending_events(self) -> None:
1✔
36
        """Get all pending events and resend them.
37

38
        This is to ensure that an event is sent at least once.
39
        """
40
        logger.info("resending missed events.")
1✔
41

42
        async with self.session_maker() as session:
1✔
43
            # we only consider events older than 5 seconds so we don't accidentally interfere with an ongoing operation
44
            stmt = select(schemas.EventORM).where(
1✔
45
                schemas.EventORM.timestamp_utc < datetime.utcnow() - timedelta(seconds=5)
46
            )
47
            result = await session.scalars(stmt)
1✔
48
            events_orm = result.all()
1✔
49

50
            num_events = len(events_orm)
1✔
51
            if num_events == 0:
1✔
52
                logger.info("no missed events to send")
1✔
53
                return
1✔
54
            for event in events_orm:
1✔
55
                try:
1✔
56
                    await self.message_queue.send_message(event.queue, event.payload)  # type:ignore
1✔
57

NEW
58
                    await self.delete_event(event.id)
×
NEW
59
                except Exception as e:
×
NEW
60
                    logger.warning(f"couldn't resend event {event.payload} on queue {event.queue}: {e}")
×
61

62
        logger.info(f"resent {num_events} events")
1✔
63

64
    async def store_event(self, session: AsyncSession, queue: str, message: dict[str, Any]) -> int:
1✔
65
        """Store an event."""
66
        event = schemas.EventORM(datetime.utcnow(), queue, message)
1✔
67
        session.add(event)
1✔
68

69
        return event.id
1✔
70

71
    async def delete_event(self, id: int):
1✔
72
        """Delete an event."""
73
        async with self.session_maker() as session:
1✔
74
            async with session.begin():
1✔
75
                stmt = delete(schemas.EventORM).where(schemas.EventORM.id == id)
1✔
76
                await session.execute(stmt)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc