• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 19181077268

07 Nov 2025 09:00PM UTC coverage: 86.352% (-0.5%) from 86.841%
19181077268

Pull #1059

github

web-flow
Merge fb47045e6 into 58a6e4765
Pull Request #1059: fix: patching of session custom resources

89 of 104 new or added lines in 5 files covered. (85.58%)

577 existing lines in 33 files now uncovered.

22791 of 26393 relevant lines covered (86.35%)

1.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.26
/components/renku_data_services/search/reprovision.py
1
"""Code for reprovisioning the search index."""
2

3
from collections.abc import AsyncGenerator, Callable
2✔
4
from datetime import datetime
2✔
5

6
from renku_data_services.app_config import logging
2✔
7
from renku_data_services.base_api.pagination import PaginationRequest
2✔
8
from renku_data_services.base_models.core import APIUser
2✔
9
from renku_data_services.data_connectors.db import DataConnectorRepository
2✔
10
from renku_data_services.data_connectors.models import DataConnector, GlobalDataConnector
2✔
11
from renku_data_services.errors.errors import ForbiddenError
2✔
12
from renku_data_services.message_queue.db import ReprovisioningRepository
2✔
13
from renku_data_services.message_queue.models import Reprovisioning
2✔
14
from renku_data_services.namespace.db import GroupRepository
2✔
15
from renku_data_services.namespace.models import Group
2✔
16
from renku_data_services.project.db import ProjectRepository
2✔
17
from renku_data_services.project.models import Project
2✔
18
from renku_data_services.search.db import SearchUpdatesRepo
2✔
19
from renku_data_services.solr.solr_client import DefaultSolrClient, SolrClientConfig
2✔
20
from renku_data_services.users.db import UserRepo
2✔
21
from renku_data_services.users.models import UserInfo
2✔
22

23
# Module-level logger, obtained through the project's logging wrapper
# (renku_data_services.app_config.logging) rather than the stdlib module.
logger = logging.getLogger(__name__)
2✔
24

25

26
class SearchReprovision:
    """Encapsulates routines to reprovision the search index.

    Reprovisioning removes all renku documents from solr and re-populates the
    `search_updates` staging table from the postgres database. A background
    process then reads that table and eventually updates solr. Only one
    reprovisioning may be active at a time; this is guarded by a lock managed
    through the `ReprovisioningRepository`.
    """

    def __init__(
        self,
        search_updates_repo: SearchUpdatesRepo,
        reprovisioning_repo: ReprovisioningRepository,
        solr_config: SolrClientConfig,
        user_repo: UserRepo,
        group_repo: GroupRepository,
        project_repo: ProjectRepository,
        data_connector_repo: DataConnectorRepository,
    ) -> None:
        self._search_updates_repo = search_updates_repo
        self._reprovisioning_repo = reprovisioning_repo
        self._solr_config = solr_config
        self._user_repo = user_repo
        self._group_repo = group_repo
        self._project_repo = project_repo
        self._data_connector_repo = data_connector_repo

    @staticmethod
    def _check_admin(user: APIUser) -> None:
        """Raise `ForbiddenError` unless `user` is a Renku administrator."""
        if not user.is_admin:
            raise ForbiddenError(message="Only Renku administrators are allowed to start search reprovisioning.")

    async def run_reprovision(self, admin: APIUser) -> int:
        """Start a reprovisioning if not already running.

        The admin check happens *before* the lock is acquired. A non-admin caller
        therefore never takes the reprovisioning slot, and never releases a slot
        held by someone else.

        Args:
            admin: The user requesting the reprovisioning; must be an admin.

        Returns:
            The number of entities inserted into the staging table.

        Raises:
            ForbiddenError: If `admin` is not a Renku administrator.
            Any error raised by `acquire_reprovision` when the slot is already taken.
        """
        self._check_admin(admin)
        reprovision = await self.acquire_reprovision()
        return await self.init_reprovision(admin, reprovision)

    async def acquire_reprovision(self) -> Reprovisioning:
        """Acquire a reprovisioning slot. Throws if already taken."""
        return await self._reprovisioning_repo.start()

    async def kill_reprovision_lock(self) -> None:
        """Removes an existing reprovisioning lock."""
        return await self._reprovisioning_repo.stop()

    async def get_current_reprovision(self) -> Reprovisioning | None:
        """Return the current reprovisioning lock, or None if there is none."""
        return await self._reprovisioning_repo.get_active_reprovisioning()

    async def _get_all_data_connectors(
        self, user: APIUser, per_page: int = 20
    ) -> AsyncGenerator[DataConnector | GlobalDataConnector, None]:
        """Yield all data connectors visible to `user`, fetching `per_page` at a time.

        Pages are requested until the number of yielded connectors reaches the
        total reported by the repository. Iteration also stops as soon as a page
        comes back empty. That can happen when connectors are deleted while
        paginating, leaving the reported total stale. Without this guard the loop
        would keep requesting ever-higher (empty) pages forever.
        """
        page = 1
        seen = 0
        while True:
            items, total = await self._data_connector_repo.get_data_connectors(
                user=user, pagination=PaginationRequest(page=page, per_page=per_page)
            )
            for dc in items:
                yield dc
            seen += len(items)
            if not items or seen >= total:
                return
            page += 1

    async def init_reprovision(self, admin: APIUser, reprovisioning: Reprovisioning) -> int:
        """Initiates reprovisioning by inserting documents into the staging table.

        First clears the `search_update` table and deletes all renku entities in
        the solr core. Then it goes through all entities in the postgres database
        and inserts solr documents into the `search_update` table. A background
        process queries this table and will eventually update solr with these
        entries.

        The reprovisioning lock is released on every exit path. This includes the
        case where `admin` is not an administrator; otherwise the caller's slot
        would stay taken until killed manually.

        Args:
            admin: The user performing the reprovisioning; must be an admin.
            reprovisioning: The lock previously obtained via `acquire_reprovision`.

        Returns:
            The number of entities inserted into the staging table. Errors during
            staging are logged, not raised, so this may be a partial count.

        Raises:
            ForbiddenError: If `admin` is not a Renku administrator.
        """
        try:
            # Checked inside the try so the `finally` below releases the lock,
            # but outside `_stage_all_entities` so its broad `except` cannot
            # swallow the ForbiddenError.
            self._check_admin(admin)
            return await self._stage_all_entities(admin, reprovisioning)
        finally:
            await self._reprovisioning_repo.stop()

    async def _stage_all_entities(self, admin: APIUser, reprovisioning: Reprovisioning) -> int:
        """Wipe solr and the staging table, then stage users, groups, projects and data connectors.

        Any error is logged and swallowed; the number of entities staged so far
        is returned.
        """

        def log_counter(c: int) -> None:
            # Periodic progress log so long reprovisionings are observable.
            if c % 50 == 0:
                logger.info(f"Inserted {c} entities into staging table...")

        counter = 0
        try:
            logger.info(f"Starting reprovisioning with ID {reprovisioning.id}")
            # NOTE(review): naive local timestamp; presumably what
            # search_updates_repo.insert expects — confirm before making it tz-aware.
            started = datetime.now()
            await self._search_updates_repo.clear_all()
            async with DefaultSolrClient(self._solr_config) as client:
                await client.delete("_type:*")

            all_users = self._user_repo.get_all_users(requested_by=admin)
            counter = await self.__update_entities(all_users, "user", started, counter, log_counter)
            logger.info(f"Done adding user entities to search_updates table. Record count: {counter}.")

            all_groups = self._group_repo.get_all_groups(requested_by=admin)
            counter = await self.__update_entities(all_groups, "group", started, counter, log_counter)
            logger.info(f"Done adding group entities to search_updates table. Record count: {counter}")

            all_projects = self._project_repo.get_all_projects(requested_by=admin)
            counter = await self.__update_entities(all_projects, "project", started, counter, log_counter)
            logger.info(f"Done adding project entities to search_updates table. Record count: {counter}")

            all_dcs = self._get_all_data_connectors(admin, per_page=20)
            counter = await self.__update_entities(all_dcs, "data connector", started, counter, log_counter)
            logger.info(f"Done adding dataconnector entities to search_updates table. Record count: {counter}")

            logger.info(f"Inserted {counter} entities into the staging table.")
        except Exception as e:
            logger.error("Error while reprovisioning entities!", exc_info=e)
            # TODO error handling. skip or fail?

        return counter

    async def __update_entities(
        self,
        entities: AsyncGenerator[Project | Group | UserInfo | DataConnector | GlobalDataConnector, None],
        name: str,
        started: datetime,
        counter: int,
        on_count: Callable[[int], None],
    ) -> int:
        """Insert every entity yielded by `entities` into the staging table.

        Args:
            entities: Async iterator over the entities to stage.
            name: Human-readable entity kind, used only in log messages.
            started: Timestamp passed to each `insert` call.
            counter: Running total to continue counting from.
            on_count: Called with the new running total after each successful insert.

        Returns:
            The updated running total. A failing insert is logged and skipped.
            A failure of the iterator itself is logged and ends this entity kind.
        """
        try:
            async for entity in entities:
                try:
                    await self._search_updates_repo.insert(entity, started)
                    counter += 1
                    on_count(counter)
                except Exception as e:
                    logger.error(f"Error updating search entry for {name} {entity.id}: {e}", exc_info=e)
        except Exception as e:
            logger.error(f"Error updating search entry for {name}s: {e}", exc_info=e)

        return counter
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc