• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 11784329519

11 Nov 2024 06:40PM UTC coverage: 86.369% (-4.5%) from 90.823%
11784329519

push

github

olevski
fix: run the /post-init.sh script if present

Co-authored-by: Flora Thiebaut <flora.thiebaut@sdsc.ethz.ch>

1 of 1 new or added line in 1 file covered. (100.0%)

269 existing lines in 26 files now uncovered.

14991 of 17357 relevant lines covered (86.37%)

1.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.96
/components/renku_data_services/message_queue/db.py
1
"""Adapters for project database classes."""
2✔
2

3
from __future__ import annotations
2✔
4

5
from collections.abc import Callable
2✔
6
from datetime import UTC, datetime
2✔
7

8
from sanic.log import logger
2✔
9
from sqlalchemy import delete, select
2✔
10
from sqlalchemy.ext.asyncio import AsyncSession
2✔
11
from sqlalchemy.orm import Session
2✔
12

13
from renku_data_services import errors
2✔
14
from renku_data_services.message_queue import orm as schemas
2✔
15
from renku_data_services.message_queue.interface import IMessageQueue
2✔
16
from renku_data_services.message_queue.models import Event, Reprovisioning
2✔
17

18

19
class EventRepository:
2✔
20
    """Repository for events."""
2✔
21

22
    def __init__(
2✔
23
        self,
24
        session_maker: Callable[..., AsyncSession],
25
        message_queue: IMessageQueue,
26
    ) -> None:
27
        self.session_maker = session_maker
2✔
28
        self.message_queue: IMessageQueue = message_queue
2✔
29

30
    async def _get_pending_events(self) -> list[schemas.EventORM]:
2✔
31
        """Get all pending events."""
32
        async with self.session_maker() as session:
1✔
33
            stmt = select(schemas.EventORM).order_by(schemas.EventORM.timestamp_utc)
1✔
34
            events_orm = await session.scalars(stmt)
1✔
35
            return list(events_orm.all())
1✔
36

37
    async def send_pending_events(self) -> None:
2✔
38
        """Get all pending events and send them.
39

40
        We lock rows that get sent and keep sending until there are no more events.
41
        """
42
        n_total_events = 0
1✔
43

44
        while True:
1✔
45
            async with self.session_maker() as session, session.begin():
1✔
46
                stmt = (
1✔
47
                    select(schemas.EventORM)
48
                    # lock retrieved rows, skip already locked ones, to deal with concurrency
49
                    .with_for_update(skip_locked=True)
50
                    .limit(100)
51
                    .order_by(schemas.EventORM.timestamp_utc)
52
                )
53
                result = await session.scalars(stmt)
1✔
54
                events_orm = result.all()
1✔
55

56
                new_events_count = len(events_orm)
1✔
57
                if new_events_count == 0:
1✔
58
                    break
1✔
59

60
                n_total_events += new_events_count
1✔
61

62
                for event in events_orm:
1✔
63
                    try:
1✔
64
                        await self.message_queue.send_message(event.dump())
1✔
65

66
                        await session.delete(event)  # this has to be done in the same transaction to not get a deadlock
1✔
67
                    except Exception as e:
×
68
                        logger.warning(f"couldn't send event {event.payload} on queue {event.queue}: {e}")
×
69

70
        if n_total_events > 0:
1✔
71
            logger.info(f"sent {n_total_events} events to the message queue")
1✔
72

73
    async def store_event(self, session: AsyncSession | Session, event: Event) -> int:
2✔
74
        """Store an event."""
75
        event_orm = schemas.EventORM.load(event)
2✔
76
        session.add(event_orm)
2✔
77

78
        return event_orm.id
2✔
79

80
    async def delete_event(self, id: int) -> None:
2✔
81
        """Delete an event."""
82
        async with self.session_maker() as session, session.begin():
×
83
            stmt = delete(schemas.EventORM).where(schemas.EventORM.id == id)
×
UNCOV
84
            await session.execute(stmt)
×
85

86
    async def delete_all_events(self) -> None:
2✔
87
        """Delete all events. This is only used when testing reprovisioning."""
88
        async with self.session_maker() as session, session.begin():
1✔
89
            await session.execute(delete(schemas.EventORM))
1✔
90

91

92
class ReprovisioningRepository:
2✔
93
    """Repository for Reprovisioning."""
2✔
94

95
    def __init__(self, session_maker: Callable[..., AsyncSession]) -> None:
2✔
96
        self.session_maker = session_maker
2✔
97

98
    async def start(self) -> Reprovisioning:
2✔
99
        """Create a new reprovisioning."""
100
        async with self.session_maker() as session, session.begin():
2✔
101
            active_reprovisioning = await session.scalar(select(schemas.ReprovisioningORM))
2✔
102
            if active_reprovisioning:
2✔
103
                raise errors.ConflictError(message="A reprovisioning is already in progress")
1✔
104

105
            reprovisioning_orm = schemas.ReprovisioningORM(start_date=datetime.now(UTC).replace(microsecond=0))
2✔
106
            session.add(reprovisioning_orm)
2✔
107

108
            return reprovisioning_orm.dump()
2✔
109

110
    async def get_active_reprovisioning(self) -> Reprovisioning | None:
2✔
111
        """Get current reprovisioning."""
112
        async with self.session_maker() as session:
2✔
113
            active_reprovisioning = await session.scalar(select(schemas.ReprovisioningORM))
2✔
114
            return active_reprovisioning.dump() if active_reprovisioning else None
2✔
115

116
    async def stop(self) -> None:
2✔
117
        """Stop current reprovisioning."""
118
        async with self.session_maker() as session, session.begin():
2✔
119
            await session.execute(delete(schemas.ReprovisioningORM))
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc