• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 14664796329

25 Apr 2025 12:38PM UTC coverage: 86.671%. First build
14664796329

Pull #809

github

web-flow
Merge 521140ccd into 668e85803
Pull Request #809: feat: renku data tasks

122 of 132 new or added lines in 4 files covered. (92.42%)

20606 of 23775 relevant lines covered (86.67%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.75
/components/renku_data_services/message_queue/db.py
1
"""Adapters for project database classes."""
2

3
from __future__ import annotations
2✔
4

5
import logging
2✔
6
from collections.abc import Callable
2✔
7
from datetime import UTC, datetime
2✔
8

9
from sqlalchemy import delete, select
2✔
10
from sqlalchemy.ext.asyncio import AsyncSession
2✔
11
from sqlalchemy.orm import Session
2✔
12

13
from renku_data_services import errors
2✔
14
from renku_data_services.message_queue import orm as schemas
2✔
15
from renku_data_services.message_queue.interface import IMessageQueue
2✔
16
from renku_data_services.message_queue.models import Event, Reprovisioning
2✔
17

18
# Module-level logger, named after this module so log records can be filtered per module.
logger = logging.getLogger(__name__)
2✔
19

20

21
class EventRepository:
    """Repository for events.

    Events are stored as ``EventORM`` rows and forwarded to the configured message queue.
    """

    def __init__(
        self,
        session_maker: Callable[..., AsyncSession],
        message_queue: IMessageQueue,
    ) -> None:
        """Create the repository.

        Args:
            session_maker: Callable returning an ``AsyncSession``; called once per operation.
            message_queue: Queue that pending events are sent to.
        """
        self.session_maker = session_maker
        self.message_queue: IMessageQueue = message_queue

    async def get_pending_events(self) -> list[schemas.EventORM]:
        """Get all pending events, ordered by their UTC timestamp."""
        async with self.session_maker() as session:
            stmt = select(schemas.EventORM).order_by(schemas.EventORM.timestamp_utc)
            events_orm = await session.scalars(stmt)
            return list(events_orm.all())

    async def send_pending_events(self) -> None:
        """Get all pending events and send them.

        We lock rows that get sent and keep sending, in batches of up to 100, until there are
        no more events or until a whole batch fails to send.

        An event whose delivery fails is logged and left in the table so that a later call can
        retry it. If not a single event of a batch could be delivered the method returns instead
        of re-selecting the same rows again, which would otherwise loop forever while the message
        queue is unavailable.
        """
        n_sent_events = 0

        while True:
            async with self.session_maker() as session, session.begin():
                stmt = (
                    select(schemas.EventORM)
                    # lock retrieved rows, skip already locked ones, to deal with concurrency
                    .with_for_update(skip_locked=True)
                    .limit(100)
                    .order_by(schemas.EventORM.timestamp_utc)
                )
                result = await session.scalars(stmt)
                events_orm = result.all()

                if not events_orm:
                    break

                n_sent_in_batch = 0
                for event in events_orm:
                    try:
                        await self.message_queue.send_message(event.dump())
                        await session.delete(event)  # this has to be done in the same transaction to not get a deadlock
                    except Exception as e:
                        # Best effort: keep the event for a later retry and carry on with the rest of the batch.
                        logger.warning(f"Couldn't send event {event.id}: {event.payload} on queue {event.queue}: {e}")
                    else:
                        n_sent_in_batch += 1

                n_sent_events += n_sent_in_batch

            if n_sent_in_batch == 0:
                # Every event in this batch failed; the next query would return the same rows again.
                break

        if n_sent_events > 0:
            logger.info(f"sent {n_sent_events} events to the message queue")

    async def store_event(self, session: AsyncSession | Session, event: Event) -> int:
        """Store an event.

        The event is only added to the caller's session; committing is left to the caller.

        NOTE(review): the session is not flushed here, so a database-generated id may not be
        populated yet when it is returned - confirm against ``EventORM``.
        """
        event_orm = schemas.EventORM.load(event)
        session.add(event_orm)

        return event_orm.id

    async def delete_event(self, id: int) -> None:
        """Delete the event with the given id in its own transaction."""
        async with self.session_maker() as session, session.begin():
            stmt = delete(schemas.EventORM).where(schemas.EventORM.id == id)
            await session.execute(stmt)

    async def delete_all_events(self) -> None:
        """Delete all events. This is only used when testing reprovisioning."""
        async with self.session_maker() as session, session.begin():
            await session.execute(delete(schemas.EventORM))
1✔
91

92

93
class ReprovisioningRepository:
    """Repository for Reprovisioning.

    Stores, reads and removes ``ReprovisioningORM`` rows through sessions from ``session_maker``.
    """

    def __init__(self, session_maker: Callable[..., AsyncSession]) -> None:
        """Keep the session factory; a new session is opened for every operation."""
        self.session_maker = session_maker

    async def start(self) -> Reprovisioning:
        """Create a new reprovisioning.

        Raises:
            errors.ConflictError: if a reprovisioning row already exists.
        """
        async with self.session_maker() as session:
            async with session.begin():
                query = select(schemas.ReprovisioningORM)
                existing = await session.scalar(query)
                if existing:
                    raise errors.ConflictError(message="A reprovisioning is already in progress")

                # The start date is stored with second precision.
                started_at = datetime.now(UTC).replace(microsecond=0)
                new_row = schemas.ReprovisioningORM(start_date=started_at)
                session.add(new_row)

                return new_row.dump()

    async def get_active_reprovisioning(self) -> Reprovisioning | None:
        """Get current reprovisioning, or ``None`` when there is none."""
        async with self.session_maker() as session:
            query = select(schemas.ReprovisioningORM)
            current = await session.scalar(query)
            if not current:
                return None
            return current.dump()

    async def stop(self) -> None:
        """Stop current reprovisioning by deleting every reprovisioning row."""
        async with self.session_maker() as session:
            async with session.begin():
                remove_all = delete(schemas.ReprovisioningORM)
                await session.execute(remove_all)
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc