• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SwissDataScienceCenter / renku-data-services / 19171102871

07 Nov 2025 02:18PM UTC coverage: 86.819% (-0.02%) from 86.838%
19171102871

Pull #1102

github

web-flow
Merge 8d0671560 into b462e7159
Pull Request #1102: ignore: test PR for codeql action update

22849 of 26318 relevant lines covered (86.82%)

1.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.26
/components/renku_data_services/search/reprovision.py
1
"""Code for reprovisioning the search index."""
2

3
from collections.abc import AsyncGenerator, Callable
2✔
4
from datetime import datetime
2✔
5

6
from renku_data_services.app_config import logging
2✔
7
from renku_data_services.base_api.pagination import PaginationRequest
2✔
8
from renku_data_services.base_models.core import APIUser
2✔
9
from renku_data_services.data_connectors.db import DataConnectorRepository
2✔
10
from renku_data_services.data_connectors.models import DataConnector, GlobalDataConnector
2✔
11
from renku_data_services.errors.errors import ForbiddenError
2✔
12
from renku_data_services.message_queue.db import ReprovisioningRepository
2✔
13
from renku_data_services.message_queue.models import Reprovisioning
2✔
14
from renku_data_services.namespace.db import GroupRepository
2✔
15
from renku_data_services.namespace.models import Group
2✔
16
from renku_data_services.project.db import ProjectRepository
2✔
17
from renku_data_services.project.models import Project
2✔
18
from renku_data_services.search.db import SearchUpdatesRepo
2✔
19
from renku_data_services.solr.solr_client import DefaultSolrClient, SolrClientConfig
2✔
20
from renku_data_services.users.db import UserRepo
2✔
21
from renku_data_services.users.models import UserInfo
2✔
22

23
# Module-level logger named after this module; used below to report reprovisioning progress and errors.
logger = logging.getLogger(__name__)
2✔
24

25

26
class SearchReprovision:
    """Encapsulates routines to reprovision the index.

    Reprovisioning clears the staging table and the solr core, then walks
    through all users, groups, projects and data connectors and inserts
    them into the staging table via the search updates repository.
    """

    def __init__(
        self,
        search_updates_repo: SearchUpdatesRepo,
        reprovisioning_repo: ReprovisioningRepository,
        solr_config: SolrClientConfig,
        user_repo: UserRepo,
        group_repo: GroupRepository,
        project_repo: ProjectRepository,
        data_connector_repo: DataConnectorRepository,
    ) -> None:
        """Store the repositories and the solr configuration used while reprovisioning."""
        self._search_updates_repo = search_updates_repo
        self._reprovisioning_repo = reprovisioning_repo
        self._solr_config = solr_config
        self._user_repo = user_repo
        self._group_repo = group_repo
        self._project_repo = project_repo
        self._data_connector_repo = data_connector_repo

    @staticmethod
    def _ensure_admin(admin: APIUser) -> None:
        """Raise `ForbiddenError` unless the given user is an administrator."""
        if not admin.is_admin:
            raise ForbiddenError(message="Only Renku administrators are allowed to start search reprovisioning.")

    async def run_reprovision(self, admin: APIUser) -> int:
        """Start a reprovisioning if not already running.

        Returns the number of entities inserted into the staging table.
        Raises `ForbiddenError` if `admin` is not an administrator.
        """
        # Check permissions *before* acquiring the slot: `init_reprovision` rejects
        # non-admins before it reaches its `finally`, so acquiring first would leave
        # the slot taken with nothing releasing it.
        self._ensure_admin(admin)
        reprovision = await self.acquire_reprovision()
        return await self.init_reprovision(admin, reprovision)

    async def acquire_reprovision(self) -> Reprovisioning:
        """Acquire a reprovisioning slot. Throws if already taken."""
        return await self._reprovisioning_repo.start()

    async def kill_reprovision_lock(self) -> None:
        """Removes an existing reprovisioning lock."""
        return await self._reprovisioning_repo.stop()

    async def get_current_reprovision(self) -> Reprovisioning | None:
        """Return the current reprovisioning lock."""
        return await self._reprovisioning_repo.get_active_reprovisioning()

    async def _get_all_data_connectors(
        self, user: APIUser, per_page: int = 20
    ) -> AsyncGenerator[DataConnector | GlobalDataConnector, None]:
        """Get all data connectors, retrieving `per_page` each time.

        Pages are requested one after another, starting at page 1, until the
        number of yielded connectors reaches the total reported by the
        repository or a page comes back empty.
        """
        preq = PaginationRequest(page=1, per_page=per_page)
        count: int = 0
        while True:
            page, total = await self._data_connector_repo.get_data_connectors(user=user, pagination=preq)
            for dc in page:
                yield dc
            count += len(page)
            # An empty page means there is nothing more to fetch. Without this guard
            # the loop would never end if the reported total stays above the number
            # of connectors actually returned.
            if not page or count >= total:
                break
            preq = PaginationRequest(page=preq.page + 1, per_page=per_page)

    async def init_reprovision(self, admin: APIUser, reprovisioning: Reprovisioning) -> int:
        """Initiates reprovisioning by inserting documents into the staging table.

        Deletes all renku entities in the solr core. Then it goes
        through all entities in the postgres database and inserts
        solr documents into the `search_update` table. A background
        process is querying this table and will eventually update
        solr with these entries.

        Returns the number of entities inserted into the staging table.
        Raises `ForbiddenError` if `admin` is not an administrator; in that
        case the reprovisioning lock is not touched. Any other error is
        logged and swallowed, and the lock is released in every case once
        the work has been attempted.
        """

        self._ensure_admin(admin)

        def log_counter(c: int) -> None:
            # Report progress only every 50 inserted entities to keep the log readable.
            if c % 50 == 0:
                logger.info(f"Inserted {c}. entities into staging table...")

        counter = 0
        try:
            logger.info(f"Starting reprovisioning with ID {reprovisioning.id}")
            # NOTE(review): naive local timestamp; confirm what the staging table expects.
            started = datetime.now()
            await self._search_updates_repo.clear_all()
            async with DefaultSolrClient(self._solr_config) as client:
                await client.delete("_type:*")

            all_users = self._user_repo.get_all_users(requested_by=admin)
            counter = await self.__update_entities(all_users, "user", started, counter, log_counter)
            logger.info(f"Done adding user entities to search_updates table. Record count: {counter}.")

            all_groups = self._group_repo.get_all_groups(requested_by=admin)
            counter = await self.__update_entities(all_groups, "group", started, counter, log_counter)
            logger.info(f"Done adding group entities to search_updates table. Record count: {counter}")

            all_projects = self._project_repo.get_all_projects(requested_by=admin)
            counter = await self.__update_entities(all_projects, "project", started, counter, log_counter)
            logger.info(f"Done adding project entities to search_updates table. Record count: {counter}")

            all_dcs = self._get_all_data_connectors(admin, per_page=20)
            counter = await self.__update_entities(all_dcs, "data connector", started, counter, log_counter)
            logger.info(f"Done adding dataconnector entities to search_updates table. Record count: {counter}")

            logger.info(f"Inserted {counter} entities into the staging table.")
        except Exception as e:
            logger.error("Error while reprovisioning entities!", exc_info=e)
            ## TODO error handling. skip or fail?
        finally:
            # Always release the reprovisioning lock, whether or not the run succeeded.
            await self._reprovisioning_repo.stop()

        return counter

    async def __update_entities(
        self,
        iter: AsyncGenerator[Project | Group | UserInfo | DataConnector | GlobalDataConnector, None],
        name: str,
        started: datetime,
        counter: int,
        on_count: Callable[[int], None],
    ) -> int:
        """Insert every entity produced by `iter` into the staging table.

        `name` is only used in log messages. `counter` is the running total
        so far; it is incremented for each successful insert, passed to
        `on_count`, and the new total is returned. A failure for a single
        entity is logged and the loop continues; a failure of the iteration
        itself is logged and ends the loop early.
        """
        try:
            async for entity in iter:
                try:
                    await self._search_updates_repo.insert(entity, started)
                    counter += 1
                    on_count(counter)
                except Exception as e:
                    logger.error(f"Error updating search entry for {name} {entity.id}: {e}", exc_info=e)
        except Exception as e:
            logger.error(f"Error updating search entry for {name}s: {e}", exc_info=e)

        return counter
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc