
SwissDataScienceCenter / renku-data-services · build 15638916243

13 Jun 2025 02:33PM UTC · coverage: 87.034% (+0.009%) from 87.025%

push · github · web-flow
fix: Skip authz if admin is requesting data connectors (#887)

For reprovisioning it is necessary to get all entities. When selecting
data connectors from the database, there was always a call to authz first
to get the identifiers of all resources the requesting user has read
access to. For internal service admins this does not work, because they
are not modelled in authz. For admins in general the call can be skipped,
because admins are defined to see everything.

This change skips the authz call if the requesting user is an admin.
Additionally, the search reprovisioning code now uses an internal service
admin to do its work when triggered internally (rather than by an endpoint).
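
In the repository layer, the shape of that fix is likely a short-circuit before the authz lookup. The following is a minimal sketch, not the actual code from data_connectors/db.py (which this page does not show): APIUser.is_admin and the get_data_connectors name appear in the listing below, while authz_client, resources_with_permission, and query_by_ids are hypothetical stand-ins.

# Minimal sketch of the admin short-circuit described in the commit
# message. authz_client, resources_with_permission and query_by_ids are
# hypothetical names, not the real renku-data-services API.
async def get_data_connectors(user, pagination):
    if user.is_admin:
        # Admins are defined to see everything, and internal service
        # admins are not modelled in authz at all, so skip the lookup.
        allowed_ids = None  # None means: do not filter by id
    else:
        allowed_ids = await authz_client.resources_with_permission(user, "read")
    return await query_by_ids(allowed_ids, pagination)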

23 of 24 new or added lines in 4 files covered. (95.83%)

1 existing line in 1 file now uncovered.

21869 of 25127 relevant lines covered (87.03%)

1.53 hits per line

Source File

/components/renku_data_services/search/reprovision.py (93.26% covered)
"""Code for reprovisioning the search index."""

from collections.abc import AsyncGenerator, Callable
from datetime import datetime

from renku_data_services.app_config import logging
from renku_data_services.base_api.pagination import PaginationRequest
from renku_data_services.base_models.core import APIUser
from renku_data_services.data_connectors.db import DataConnectorRepository
from renku_data_services.data_connectors.models import DataConnector, GlobalDataConnector
from renku_data_services.errors.errors import ForbiddenError
from renku_data_services.message_queue.db import ReprovisioningRepository
from renku_data_services.message_queue.models import Reprovisioning
from renku_data_services.namespace.db import GroupRepository
from renku_data_services.namespace.models import Group
from renku_data_services.project.db import ProjectRepository
from renku_data_services.project.models import Project
from renku_data_services.search.db import SearchUpdatesRepo
from renku_data_services.solr.solr_client import DefaultSolrClient, SolrClientConfig
from renku_data_services.users.db import UserRepo
from renku_data_services.users.models import UserInfo

logger = logging.getLogger(__name__)


class SearchReprovision:
    """Encapsulates routines to reprovision the index."""

    def __init__(
        self,
        search_updates_repo: SearchUpdatesRepo,
        reprovisioning_repo: ReprovisioningRepository,
        solr_config: SolrClientConfig,
        user_repo: UserRepo,
        group_repo: GroupRepository,
        project_repo: ProjectRepository,
        data_connector_repo: DataConnectorRepository,
    ) -> None:
        self._search_updates_repo = search_updates_repo
        self._reprovisioning_repo = reprovisioning_repo
        self._solr_config = solr_config
        self._user_repo = user_repo
        self._group_repo = group_repo
        self._project_repo = project_repo
        self._data_connector_repo = data_connector_repo

    async def run_reprovision(self, admin: APIUser) -> int:
        """Start a reprovisioning if not already running."""
        reprovision = await self.acquire_reprovision()
        return await self.init_reprovision(admin, reprovision)

    async def acquire_reprovision(self) -> Reprovisioning:
        """Acquire the reprovisioning slot. Throws if already taken."""
        return await self._reprovisioning_repo.start()

    async def kill_reprovision_lock(self) -> None:
        """Remove an existing reprovisioning lock."""
        return await self._reprovisioning_repo.stop()

    async def get_current_reprovision(self) -> Reprovisioning | None:
        """Return the currently active reprovisioning lock, if any."""
        return await self._reprovisioning_repo.get_active_reprovisioning()

    async def _get_all_data_connectors(
        self, user: APIUser, per_page: int = 20
    ) -> AsyncGenerator[DataConnector | GlobalDataConnector, None]:
        """Get all data connectors, retrieving `per_page` at a time."""
        preq = PaginationRequest(page=1, per_page=per_page)
        result: tuple[list[DataConnector | GlobalDataConnector], int] | None = None
        count: int = 0
        # result[1] is the total number of data connectors reported by the
        # repository; keep paging until that many have been yielded.
        while result is None or result[1] > count:
            result = await self._data_connector_repo.get_data_connectors(user=user, pagination=preq)
            count = count + len(result[0])
            preq = PaginationRequest(page=preq.page + 1, per_page=per_page)
            for dc in result[0]:
                yield dc

    async def init_reprovision(self, admin: APIUser, reprovisioning: Reprovisioning) -> int:
        """Initiate reprovisioning by inserting documents into the staging table.

        Deletes all renku entities in the solr core, then walks all
        entities in the postgres database and inserts solr documents
        into the `search_updates` table. A background process queries
        this table and will eventually update solr with these entries.
        """
        if not admin.is_admin:
            raise ForbiddenError(message="Only Renku administrators are allowed to start search reprovisioning.")

        def log_counter(c: int) -> None:
            if c % 50 == 0:
                logger.info(f"Inserted {c} entities into the staging table...")

        counter = 0
        try:
            logger.info(f"Starting reprovisioning with ID {reprovisioning.id}")
            started = datetime.now()
            # Reset the staging table and wipe the solr core before re-inserting.
            await self._search_updates_repo.clear_all()
            async with DefaultSolrClient(self._solr_config) as client:
                await client.delete("_type:*")

            all_users = self._user_repo.get_all_users(requested_by=admin)
            counter = await self.__update_entities(all_users, "user", started, counter, log_counter)
            logger.info(f"Done adding user entities to the search_updates table. Record count: {counter}.")

            all_groups = self._group_repo.get_all_groups(requested_by=admin)
            counter = await self.__update_entities(all_groups, "group", started, counter, log_counter)
            logger.info(f"Done adding group entities to the search_updates table. Record count: {counter}.")

            all_projects = self._project_repo.get_all_projects(requested_by=admin)
            counter = await self.__update_entities(all_projects, "project", started, counter, log_counter)
            logger.info(f"Done adding project entities to the search_updates table. Record count: {counter}.")

            all_dcs = self._get_all_data_connectors(admin, per_page=20)
            counter = await self.__update_entities(all_dcs, "data connector", started, counter, log_counter)
            logger.info(f"Done adding data connector entities to the search_updates table. Record count: {counter}.")

            logger.info(f"Inserted {counter} entities into the staging table.")
        except Exception as e:
            logger.error("Error while reprovisioning entities!", exc_info=e)
            # TODO error handling: skip or fail?
        finally:
            await self._reprovisioning_repo.stop()

        return counter

    async def __update_entities(
        self,
        iter: AsyncGenerator[Project | Group | UserInfo | DataConnector | GlobalDataConnector, None],
        name: str,
        started: datetime,
        counter: int,
        on_count: Callable[[int], None],
    ) -> int:
        """Insert all entities from `iter` into the staging table, counting as it goes."""
        try:
            async for entity in iter:
                try:
                    await self._search_updates_repo.insert(entity, started)
                    counter += 1
                    on_count(counter)
                except Exception as e:
                    # A single failing entity is logged and skipped.
                    logger.error(f"Error updating search entry for {name} {entity.id}: {e}", exc_info=e)
        except Exception as e:
            logger.error(f"Error updating search entry for {name}s: {e}", exc_info=e)

        return counter
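
A note on the paging pattern in _get_all_data_connectors: it wraps a page-based repository API into a single async stream by comparing the number of items yielded so far against the total reported with each page. Below is a self-contained toy version of that pattern; the in-memory get_page function stands in for DataConnectorRepository.get_data_connectors and nothing here is the real renku-data-services API.

# Toy version of the paging-generator pattern used above; runnable as-is.
import asyncio
from collections.abc import AsyncGenerator

ITEMS = [f"dc-{i}" for i in range(7)]

async def get_page(page: int, per_page: int) -> tuple[list[str], int]:
    """Return one page of items plus the total count, like the repository does."""
    start = (page - 1) * per_page
    return ITEMS[start : start + per_page], len(ITEMS)

async def get_all(per_page: int = 3) -> AsyncGenerator[str, None]:
    page, seen = 1, 0
    total: int | None = None
    # Keep fetching pages until we have yielded `total` items.
    while total is None or seen < total:
        items, total = await get_page(page, per_page)
        seen += len(items)
        page += 1
        for item in items:
            yield item

async def main() -> None:
    async for item in get_all():
        print(item)  # dc-0 ... dc-6, fetched three at a time

asyncio.run(main())

The termination condition mirrors `result[1] > count` in the method above: the loop stops once the count of yielded items reaches the total the repository reports.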