• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nolar / kopf / 13720737485

07 Mar 2025 12:31PM UTC coverage: 89.819% (+0.4%) from 89.443%
13720737485

Pull #1031

github

web-flow
Merge 31b36db28 into 7f5046978
Pull Request #1031: Re-authenticate if the session is closed by a concurrent request

2123 of 2432 branches covered (87.29%)

Branch coverage included in aggregate %.

4 of 8 new or added lines in 3 files covered. (50.0%)

5 existing lines in 2 files now uncovered.

5499 of 6054 relevant lines covered (90.83%)

11.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.18
/kopf/_cogs/structs/credentials.py
1
"""
2
Authentication-related structures.
3

4
Kopf handles some rudimentary authentication directly, and exposes the ways
5
to implement custom authentication methods (via `on.login` handlers).
6

7
For that, a minimally sufficient data structure is introduced -- both
8
to bring all the credentials together in a structured and type-annotated way,
9
and to receive them from the operators' login-handlers with custom auth methods.
10

11
The "rudimentary" is defined as the information passed to the HTTP protocol
12
and TCP/SSL connection only, i.e. everything usable in a generic HTTP client,
13
and nothing more than that:
14

15
* TCP server host & port.
16
* SSL verification/ignorance flag.
17
* SSL certificate authority.
18
* SSL client certificate and its private key.
19
* HTTP ``Authorization: Basic username:password``.
20
* HTTP ``Authorization: Bearer token`` (or other schemes: Bearer, Digest, etc).
21
* URL's default namespace for the cases when this is implied.
22

23
.. seealso::
24
    :func:`authentication` and :mod:`piggybacking`.
25
"""
26
import asyncio
13✔
27
import collections
13✔
28
import dataclasses
13✔
29
import datetime
13✔
30
import random
13✔
31
from typing import AsyncIterable, AsyncIterator, Callable, Dict, List, \
13✔
32
                   Mapping, NewType, Optional, Tuple, TypeVar, cast
33

34
from kopf._cogs.aiokits import aiotoggles
13✔
35

36

37
class LoginError(Exception):
13✔
38
    """ Raised when the operator cannot login to the API. """
39

40

41
class AccessError(Exception):
13✔
42
    """ Raised when the operator cannot access the cluster API. """
43

44

45
@dataclasses.dataclass(frozen=True)
13✔
46
class ConnectionInfo:
13✔
47
    """
48
    A single endpoint with specific credentials and connection flags to use.
49
    """
50
    server: str  # e.g. "https://localhost:443"
13✔
51
    ca_path: Optional[str] = None
13✔
52
    ca_data: Optional[bytes] = None
13✔
53
    insecure: Optional[bool] = None
13✔
54
    username: Optional[str] = None
13✔
55
    password: Optional[str] = None
13✔
56
    scheme: Optional[str] = None  # RFC-7235/5.1: e.g. Bearer, Basic, Digest, etc.
13✔
57
    token: Optional[str] = None
13✔
58
    certificate_path: Optional[str] = None
13✔
59
    certificate_data: Optional[bytes] = None
13✔
60
    private_key_path: Optional[str] = None
13✔
61
    private_key_data: Optional[bytes] = None
13✔
62
    default_namespace: Optional[str] = None  # used for cluster objects' k8s-events.
13✔
63
    priority: int = 0
13✔
64
    expiration: Optional[datetime.datetime] = None  # TZ-aware or TZ-naive (implies UTC)
13✔
65

66

67
_T = TypeVar('_T', bound=object)
13✔
68

69
# Usually taken from the HandlerId (also a string), but semantically it is on its own.
70
VaultKey = NewType('VaultKey', str)
13✔
71

72

73
@dataclasses.dataclass
13✔
74
class VaultItem:
13✔
75
    """
76
    The actual item stored in the vault. It is never exposed externally.
77

78
    Used for proper garbage collection when the key is removed from the vault
79
    (to avoid orchestrating extra cache structures and keeping them in sync).
80

81
    The caches are populated by `Vault.extended` on-demand.
82
    """
83
    info: ConnectionInfo
13✔
84
    caches: Optional[Dict[str, object]] = None
13✔
85

86

87
class Vault(AsyncIterable[Tuple[VaultKey, ConnectionInfo]]):
13✔
88
    """
89
    A store for currently valid authentication methods.
90

91
    *Through we call it a vault to add a sense of security.*
92

93
    Normally, only one authentication method is used at a time in multiple
94
    methods and tasks (e.g. resource watching/patching, peering, etc.).
95

96
    Multiple methods to represent the same principal is an unusual case,
97
    but it is also possible as a side effect. Same for multiple distinct
98
    identities of a single operator.
99

100
    The credentials store is created once for an operator (a task),
101
    and is then used by multiple tasks running in parallel:
102

103
    * Consumed by the API client wrappers to authenticate in the API.
104
    * Reported by the API client wrappers if some of the credentials fail.
105
    * Populated by the authenticator background task when and if needed.
106

107
    .. seealso::
108
        :func:`auth.authenticated` and :func:`authentication`.
109
    """
110
    _current: Dict[VaultKey, VaultItem]
13✔
111
    _invalid: Dict[VaultKey, List[VaultItem]]
13✔
112

113
    def __init__(
13✔
114
            self,
115
            __src: Optional[Mapping[str, object]] = None,
116
    ) -> None:
117
        super().__init__()
13✔
118
        self._current = {}
13✔
119
        self._invalid = collections.defaultdict(list)
13✔
120
        self._lock = asyncio.Lock()
13✔
121
        self._next_expiration: Optional[datetime.datetime] = None
13✔
122

123
        if __src is not None:
13✔
124
            self._update_converted(__src)
13✔
125

126
        # Mark a pre-populated vault to be usable instantly,
127
        # or trigger the initial authentication for an empty vault.
128
        self._ready = aiotoggles.Toggle(not self.is_empty())
13✔
129

130
    def __repr__(self) -> str:
13✔
131
        return f'<{self.__class__.__name__}: {self._current!r}>'
×
132

133
    def __bool__(self) -> bool:
13✔
134
        raise NotImplementedError("The vault should not be evaluated as bool.")
13✔
135

136
    async def __aiter__(
13✔
137
            self,
138
    ) -> AsyncIterator[Tuple[VaultKey, ConnectionInfo]]:
139
        async for key, item in self._items():
13✔
140
            yield key, item.info
13✔
141

142
    async def extended(
13✔
143
            self,
144
            factory: Callable[[ConnectionInfo], _T],
145
            purpose: Optional[str] = None,
146
    ) -> AsyncIterator[Tuple[VaultKey, ConnectionInfo, _T]]:
147
        """
148
        Iterate the connection info items with their cached object.
149

150
        The cached objects are identified by the purpose (an arbitrary string).
151
        Multiple types of objects can be cached under different names.
152

153
        The factory is a one-argument function of a `ConnectionInfo`,
154
        that returns the object to be cached for this connection info.
155
        It is called only once per item and purpose.
156
        """
157
        purpose = purpose if purpose is not None else repr(factory)
13✔
158
        async for key, item in self._items():
13✔
159
            if item.caches is None:  # quick-check with no locking overhead.
13✔
160
                async with self._lock:
13✔
161
                    if item.caches is None:  # securely synchronised check.
13!
162
                        item.caches = {}
13✔
163
            if purpose not in item.caches:  # quick-check with no locking overhead.
13✔
164
                async with self._lock:
13✔
165
                    if purpose not in item.caches:  # securely synchronised check.
13!
166
                        item.caches[purpose] = factory(item.info)
13✔
167
            yield key, item.info, cast(_T, item.caches[purpose])
13✔
168

169
    async def _items(
13✔
170
            self,
171
    ) -> AsyncIterator[Tuple[VaultKey, VaultItem]]:
172
        """
173
        Yield the raw items as stored in the vault in random order.
174

175
        The items are yielded until either all of them are depleted,
176
        or until the yielded one does not fail (no `.invalidate` call made).
177
        Restart on every re-authentication (if new items are added).
178
        """
179

180
        # Yield the connection infos until either all of them are depleted,
181
        # or until the yielded one does not fail (no `.invalidate` call made).
182
        # Restart on every re-authentication (if new items are added).
183
        while True:
9✔
184

185
            # Whether on the 1st run, or during the active re-authentication,
186
            # ensure that the items are ready before yielding them.
187
            await self._ready.wait_for(True)
13✔
188

189
            # Check for expiration strictly after a possible re-authentication.
190
            # This might cause another re-authentication if the credentials are expired at creation.
191
            await self.expire()
13✔
192

193
            # Select the items to yield and let it (i.e. a consumer) work.
194
            async with self._lock:
13✔
195
                yielded_key, yielded_item = self.select()
13✔
196
            yield yielded_key, yielded_item
13✔
197

198
            # If the yielded item has been invalidated, assume that this item has failed.
199
            # Otherwise (the item is in the list), it has succeeded -- we are done.
200
            # Note: checked by identity, in case a similar item is re-added as a different object.
201
            async with self._lock:
13!
202
                if yielded_key in self._current and self._current[yielded_key] is yielded_item:
13!
203
                    break
13✔
204

205
    def select(self) -> Tuple[VaultKey, VaultItem]:
13✔
206
        """
207
        Select the next item (not the info!) to try (and do so infinitely).
208

209
        .. warning::
210
            This method is not async/await-safe: if the data change on the go,
211
            it can lead to improper items returned.
212
        """
213
        if not self._current:
13✔
214
            raise LoginError("Ran out of valid credentials. Consider installing "
13✔
215
                             "an API client library or adding a login handler. See more: "
216
                             "https://kopf.readthedocs.io/en/stable/authentication/")
217
        prioritised: Dict[int, List[Tuple[VaultKey, VaultItem]]]
218
        prioritised = collections.defaultdict(list)
13✔
219
        for key, item in self._current.items():
13✔
220
            prioritised[item.info.priority].append((key, item))
13✔
221
        top_priority = max(list(prioritised.keys()))
13✔
222
        key, item = random.choice(prioritised[top_priority])
13✔
223
        return key, item
13✔
224

225
    async def expire(self) -> None:
13✔
226
        """
227
        Discard the expired credentials, and re-authenticate as needed.
228

229
        Unlike invalidation, the expired credentials are not remembered
230
        and not blocked from reappearing.
231
        """
232
        now = datetime.datetime.now(datetime.timezone.utc)
13✔
233

234
        # Quick & lockless for speed: it is done on every API call, we have no time for locks.
235
        if self._next_expiration is not None and now >= self._next_expiration:
13✔
236
            async with self._lock:
13✔
237
                for key, item in list(self._current.items()):
13✔
238
                    expiration = item.info.expiration
13✔
239
                    if expiration is not None:
13!
240
                        if expiration.tzinfo is None:
13!
241
                            expiration = expiration.replace(tzinfo=datetime.timezone.utc)
×
242
                        if now >= expiration:
13✔
243
                            await self._flush_caches(item)
13✔
244
                            del self._current[key]
13✔
245
                self._update_expiration()
13✔
246
                need_reauth = not self._current  # i.e. nothing is left at all
13✔
247

248
            # Initiate a re-authentication activity, and block until it is finished.
249
            if need_reauth:
13✔
250
                await self._ready.turn_to(False)
13✔
251
                await self._ready.wait_for(True)
13✔
252

253
    async def invalidate(
13✔
254
            self,
255
            key: VaultKey,
256
            *,
257
            exc: Optional[Exception] = None,
258
    ) -> None:
259
        """
260
        Discard the specified credentials, and re-authenticate as needed.
261

262
        Multiple calls can be made for a single authenticator and credentials,
263
        if used for multiple requests at the same time (a common case).
264
        All of them will be blocked the same way, until one and only one
265
        re-authentication happens in a background task. They will be
266
        unblocked at the same instant once the new credentials are ready.
267

268
        If the re-authentication fails in the background task, this method
269
        re-raises the original exception (most likely a HTTP 401 error),
270
        and lets the client tasks to fail in their own stack.
271
        The background task continues to run and tries to re-authenticate
272
        on the next API calls until cancelled due to the operator exit.
273
        """
274

275
        # Exclude the failed connection items from the list of available ones.
276
        # But keep a short history of invalid items, so that they are not re-added.
277
        async with self._lock:
13✔
278
            if key in self._current:
13!
279
                await self._flush_caches(self._current[key])
13✔
280
                self._invalid[key] = self._invalid[key][-2:] + [self._current[key]]
13✔
281
                del self._current[key]
13✔
282
                self._update_expiration()
13✔
283
            need_reauth = not self._current  # i.e. nothing is left at all
13✔
284

285
        # Initiate a re-authentication activity, and block until it is finished.
286
        if need_reauth:
13✔
287
            await self._ready.turn_to(False)
13✔
288
            await self._ready.wait_for(True)
13✔
289

290
        # If the re-auth has failed, re-raise the original exception in the current stack.
291
        # If the original exception is unknown, raise normally on the next iteration's yield.
292
        # The error here is optional -- for better stack traces of the original exception `exc`.
293
        # Keep in mind, this routine is called in parallel from many tasks for the same keys.
294
        async with self._lock:
13✔
295
            if not self._current:
13✔
296
                if exc is not None:
13✔
297
                    raise LoginError("Ran out of valid credentials. Consider installing "
13✔
298
                                     "an API client library or adding a login handler. See more: "
299
                                     "https://kopf.readthedocs.io/en/stable/authentication/") from exc
300

301
    async def populate(
13✔
302
            self,
303
            __src: Mapping[str, object],
304
    ) -> None:
305
        """
306
        Add newly retrieved credentials.
307

308
        Used by :func:`authentication` to add newly retrieved credentials
309
        from the authentication activity handlers. Some of the credentials
310
        can be duplicates of the existing ones -- only one of them is used then.
311
        """
312

313
        # Remember the new info items (or replace the old ones). If we already see that the item
314
        # is invalid (as seen in our short per-key history), we keep it as such -- this prevents
315
        # consistently invalid credentials from causing infinite re-authentication again and again.
316
        async with self._lock:
13✔
317
            self._update_converted(__src)
13✔
318

319
        # Notify the consuming tasks (client wrappers) that new credentials are ready to be used.
320
        # Those tasks can be blocked in `vault.invalidate()` if there are no credentials left.
321
        await self._ready.turn_to(True)
13✔
322

323
    def is_empty(self) -> bool:
13✔
324
        now = datetime.datetime.now(datetime.timezone.utc)
13✔
325
        expirations = [
13✔
326
            dt if dt is None or dt.tzinfo is not None else dt.replace(tzinfo=datetime.timezone.utc)
327
            for dt in (item.info.expiration for item in self._current.values())
328
        ]
329
        return all(dt is not None and now >= dt for dt in expirations)  # i.e. expired
13✔
330

331
    async def wait_for_readiness(self) -> None:
13✔
UNCOV
332
        await self._ready.wait_for(True)
×
333

334
    async def wait_for_emptiness(self) -> None:
13✔
335
        await self._ready.wait_for(False)
13✔
336

337
    async def close(self) -> None:
13✔
338
        """
339
        Finalize all the cached objects when the operator is ending.
340
        """
341
        async with self._lock:
13✔
342
            for key in self._current:
13✔
343
                await self._flush_caches(self._current[key])
13✔
344

345
    async def _flush_caches(
13✔
346
            self,
347
            item: VaultItem,
348
    ) -> None:
349
        """
350
        Call the finalizers and garbage-collect the cached objects.
351

352
        Mainly used to garbage-collect aiohttp sessions and its derivatives
353
        when the connection info items are removed from the vault -- so that
354
        the sessions/connectors would not complain that they were not close.
355

356
        Built-in garbage-collection is not sufficient, as it is synchronous,
357
        and cannot call the async coroutines like `aiohttp.ClientSession.close`.
358

359
        .. note::
360
            Currently, we assume the ``close()`` method only (both sync/async).
361
            There is no need to generalise to customizable finalizer callbacks.
362
            This can change in the future.
363
        """
364

365
        # Close the closable objects.
366
        if item.caches:
13✔
367
            for obj in item.caches.values():
13✔
368
                if hasattr(obj, 'close'):
13!
369
                    if asyncio.iscoroutinefunction(getattr(obj, 'close')):
13!
370
                        await getattr(obj, 'close')()
13✔
371
                    else:
UNCOV
372
                        getattr(obj, 'close')()
×
373

374
        # Garbage-collect other resources (e.g. files, memory, etc).
375
        item.caches = None
13✔
376

377
    def _update_converted(
13✔
378
            self,
379
            __src: Mapping[str, object],
380
    ) -> None:
381
        for key, info in __src.items():
13✔
382
            key = VaultKey(str(key))
13✔
383
            if not isinstance(info, ConnectionInfo):
13!
UNCOV
384
                raise ValueError("Only ConnectionInfo instances are currently accepted.")
×
385
            if info not in [data.info for data in self._invalid[key]]:
13✔
386
                self._current[key] = VaultItem(info=info)
13✔
387
        self._update_expiration()
13✔
388

389
    def _update_expiration(self) -> None:
13✔
390
        expirations = [
13✔
391
            dt if dt.tzinfo is not None else dt.replace(tzinfo=datetime.timezone.utc)
392
            for dt in (item.info.expiration for item in self._current.values())
393
            if dt is not None
394
        ]
395
        self._next_expiration = min(expirations) if expirations else None
13✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc