• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

maxlambrecht / rust-spiffe / 21214457836

21 Jan 2026 03:02PM UTC coverage: 83.127% (-0.07%) from 83.194%
21214457836

push

github

web-flow
feat(observability): consume log args when features are disabled (#263)

Signed-off-by: Max Lambrecht <maxlambrecht@gmail.com>

19 of 56 new or added lines in 7 files covered. (33.93%)

19 existing lines in 5 files now uncovered.

4700 of 5654 relevant lines covered (83.13%)

299.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.08
/spiffe/src/jwt_source/source.rs
1
use super::builder::{JwtSourceBuilder, ReconnectConfig, ResourceLimits};
2
use super::errors::{JwtSourceError, MetricsErrorKind};
3
use super::limits::validate_bundle_set;
4
use super::metrics::MetricsRecorder;
5
use super::supervisor::initial_sync_with_retry;
6
use super::types::ClientFactory;
7
use crate::bundle::BundleSource;
8
use crate::prelude::warn;
9
use crate::workload_api::WorkloadApiClient;
10
use crate::{JwtBundle, JwtBundleSet, JwtSvid, SpiffeId, TrustDomain};
11
use arc_swap::ArcSwap;
12
use std::fmt::Debug;
13
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14
use std::sync::Arc;
15
use std::time::Duration;
16
use tokio::sync::{watch, Mutex};
17
use tokio::task::JoinHandle;
18
use tokio_util::sync::CancellationToken;
19

20
#[cfg(test)]
21
use crate::WorkloadApiError;
22

23
/// Handle for receiving update notifications from a [`JwtSource`].
24
///
25
/// This type wraps the underlying notification mechanism and provides a clean
26
/// API for detecting when the JWT bundle set has been updated.
27
///
28
/// Cloning this handle creates another receiver that shares the same update
29
/// stream. Each receiver observes the latest sequence number; if a receiver
30
/// is slow to consume updates, intermediate sequence numbers may be skipped
31
/// (this is the standard behavior of `watch` channels).
32
///
33
/// # Examples
34
///
35
/// ```no_run
36
/// # use spiffe::JwtSource;
37
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
38
/// let source = JwtSource::new().await?;
39
/// let mut updates = source.updated();
40
///
41
/// // Wait for an update
42
/// updates.changed().await?;
43
/// println!("Update sequence: {}", updates.last());
44
///
45
/// // Wait for another update
46
/// updates.changed().await?;
47
/// println!("New sequence: {}", updates.last());
48
/// # Ok(())
49
/// # }
50
/// ```
51
#[derive(Clone, Debug)]
52
pub struct JwtSourceUpdates {
53
    rx: watch::Receiver<u64>,
54
}
55

56
impl JwtSourceUpdates {
57
    /// Waits for the next update and returns the new sequence number.
58
    ///
59
    /// This method waits for the next rotation after initial synchronization.
60
    /// The initial sync does not trigger a notification; only subsequent updates
61
    /// are notified.
62
    ///
63
    /// This method will return an error if the source has been closed or
64
    /// the internal update task has terminated (e.g., due to shutdown or
65
    /// an internal error).
66
    ///
67
    /// # Errors
68
    ///
69
    /// Returns [`JwtSourceError::Closed`] if the source has been shut down
70
    /// or the internal update task has terminated. This can occur due to:
71
    /// - Explicit shutdown via [`JwtSource::shutdown`] or [`JwtSource::shutdown_with_timeout`]
72
    /// - Internal task termination (e.g., supervisor task panic)
73
    pub async fn changed(&mut self) -> Result<u64, JwtSourceError> {
7✔
74
        self.rx
7✔
75
            .changed()
7✔
76
            .await
7✔
77
            .map_err(|_| JwtSourceError::Closed)?;
6✔
78
        Ok(*self.rx.borrow())
6✔
79
    }
6✔
80

81
    /// Returns the last sequence number without waiting.
82
    ///
83
    /// This method never blocks and always returns immediately with the
84
    /// current sequence number.
85
    pub fn last(&self) -> u64 {
21✔
86
        *self.rx.borrow()
21✔
87
    }
21✔
88

89
    /// Waits for the sequence number to satisfy a predicate.
90
    ///
91
    /// This method first checks the current sequence number; if the predicate
92
    /// is already satisfied, it returns immediately without waiting. Otherwise,
93
    /// it repeatedly calls `changed()` until the predicate returns `true`.
94
    ///
95
    /// # Errors
96
    ///
97
    /// Returns an error if the source has been closed.
98
    pub async fn wait_for<F>(&mut self, mut f: F) -> Result<u64, JwtSourceError>
7✔
99
    where
7✔
100
        F: FnMut(&u64) -> bool,
7✔
101
    {
7✔
102
        let current = self.last();
7✔
103
        if f(&current) {
7✔
104
            return Ok(current);
2✔
105
        }
5✔
106
        loop {
107
            let seq = self.changed().await?;
5✔
108
            if f(&seq) {
5✔
109
                return Ok(seq);
5✔
110
            }
×
111
        }
112
    }
7✔
113
}
114

115
/// Live source of JWT bundles from the SPIFFE Workload API.
116
///
117
/// `JwtSource` performs an initial sync before returning from [`JwtSource::new`] or
118
/// [`JwtSourceBuilder::build`]. Updates are applied atomically and can be observed via
119
/// [`JwtSource::updated`].
120
///
121
/// The source automatically:
122
/// - Maintains a cached view of the latest JWT bundles
123
/// - Handles bundle rotation transparently
124
/// - Reconnects with exponential backoff on transient failures
125
/// - Validates resource limits before publishing updates
126
///
127
/// Unlike X.509 SVIDs which are streamed continuously, JWT SVIDs are fetched on-demand
128
/// with specific audiences. Use [`JwtSource::get_jwt_svid`] to fetch JWT SVIDs as needed.
129
///
130
/// Use [`JwtSource::shutdown`] or [`JwtSource::shutdown_configured`] to stop background tasks.
131
///
132
#[derive(Clone, Debug)]
133
pub struct JwtSource {
134
    inner: Arc<Inner>,
135
}
136

137
pub(super) struct Inner {
138
    // Atomically replaced, last-known-good JWT bundle set.
139
    bundle_set: ArcSwap<JwtBundleSet>,
140
    limits: ResourceLimits,
141

142
    // Cached client for on-demand SVID fetching (recreated on failure).
143
    // Uses Option to allow lazy initialization.
144
    // Protected by Mutex to prevent concurrent creation.
145
    cached_client: ArcSwap<Option<Arc<WorkloadApiClient>>>,
146
    client_creation_mutex: Mutex<()>,
147

148
    // Supervisor configuration and dependencies.
149
    reconnect: ReconnectConfig,
150
    make_client: ClientFactory,
151
    metrics: Option<Arc<dyn MetricsRecorder>>,
152

153
    // Lifecycle / shutdown.
154
    closed: AtomicBool,
155
    cancel: CancellationToken,
156
    shutdown_timeout: Option<Duration>,
157

158
    // Update notifications (monotonic sequence).
159
    update_seq: AtomicU64,
160
    update_tx: watch::Sender<u64>,
161
    update_rx: watch::Receiver<u64>,
162

163
    // Supervisor task handle (joined/aborted at shutdown).
164
    supervisor: Mutex<Option<JoinHandle<()>>>,
165
}
166

167
impl Inner {
168
    pub(super) fn reconnect(&self) -> ReconnectConfig {
26✔
169
        self.reconnect
26✔
170
    }
26✔
171
    pub(super) fn metrics(&self) -> Option<&dyn MetricsRecorder> {
12✔
172
        self.metrics.as_deref()
12✔
173
    }
12✔
174
    pub(super) fn make_client(&self) -> &ClientFactory {
6✔
175
        &self.make_client
6✔
176
    }
6✔
177

178
    /// Gets the cached client or creates a new one if not available.
179
    ///
180
    /// This method is safe to call concurrently. If multiple tasks call this
181
    /// simultaneously when the cache is empty, only one will create the client;
182
    /// the others will wait and reuse the newly created client.
183
    pub(super) async fn get_or_recreate_client(
2✔
184
        &self,
2✔
185
    ) -> Result<Arc<WorkloadApiClient>, JwtSourceError> {
2✔
186
        // Fast path: check cache first
187
        let cached = self.cached_client.load();
2✔
188
        if let Some(client) = cached.as_ref() {
2✔
189
            return Ok(Arc::clone(client));
2✔
190
        }
×
191

192
        // Slow path: serialize client creation to avoid races
193
        let _guard = self.client_creation_mutex.lock().await;
×
194

195
        // Double-check: another task might have created it while we waited
196
        let cached = self.cached_client.load();
×
197
        if let Some(client) = cached.as_ref() {
×
198
            return Ok(Arc::clone(client));
×
199
        }
×
200

201
        // We're the first, create the client
202
        self.recreate_client_inner().await
×
203
    }
2✔
204

205
    /// Recreates the cached client and stores it atomically.
206
    ///
207
    /// This method should be called when the cached client fails or becomes invalid.
208
    /// It serializes creation to avoid concurrent recreation.
209
    pub(super) async fn recreate_client(&self) -> Result<Arc<WorkloadApiClient>, JwtSourceError> {
×
210
        let _guard = self.client_creation_mutex.lock().await;
×
211
        self.recreate_client_inner().await
×
212
    }
×
213

214
    /// Internal helper that actually creates and stores the client.
215
    /// Must be called while holding `client_creation_mutex`.
216
    async fn recreate_client_inner(&self) -> Result<Arc<WorkloadApiClient>, JwtSourceError> {
×
217
        let client = (self.make_client)().await.map_err(JwtSourceError::Source)?;
×
218
        let client_arc = Arc::new(client);
×
219
        self.cached_client
×
220
            .store(Arc::new(Some(Arc::clone(&client_arc))));
×
221
        Ok(client_arc)
×
222
    }
×
223
}
224

225
impl Debug for Inner {
226
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
227
        f.debug_struct("JwtSource")
×
228
            .field("bundle_set", &"<ArcSwap<JwtBundleSet>>")
×
229
            .field(
×
230
                "cached_client",
×
231
                &"<ArcSwap<Option<Arc<WorkloadApiClient>>>>",
×
232
            )
×
233
            .field("client_creation_mutex", &"<Mutex<()>>")
×
234
            .field("reconnect", &self.reconnect)
×
235
            .field("limits", &self.limits)
×
236
            .field("make_client", &"<ClientFactory>")
×
237
            .field(
×
238
                "metrics",
×
239
                &self.metrics.as_ref().map(|_| "<MetricsRecorder>"),
×
240
            )
241
            .field("shutdown_timeout", &self.shutdown_timeout)
×
242
            .field("closed", &self.closed.load(Ordering::Relaxed))
×
243
            .field("cancel", &self.cancel)
×
244
            .field("update_seq", &self.update_seq)
×
245
            .field("update_tx", &"<watch::Sender<u64>>")
×
246
            .field("update_rx", &"<watch::Receiver<u64>>")
×
247
            .field("supervisor", &"<Mutex<Option<JoinHandle<()>>>>")
×
248
            .finish()
×
249
    }
×
250
}
251

252
impl JwtSource {
253
    /// Creates a `JwtSource` using the default Workload API endpoint.
254
    ///
255
    /// The endpoint is resolved from `SPIFFE_ENDPOINT_SOCKET`.
256
    ///
257
    /// On success, the returned source is already synchronized with the agent and will keep
258
    /// updating in the background until it is closed.
259
    ///
260
    /// # Errors
261
    ///
262
    /// Returns a [`JwtSourceError`] if:
263
    /// - the Workload API endpoint cannot be resolved or connected to,
264
    /// - the initial synchronization with the Workload API does not complete successfully.
265
    pub async fn new() -> Result<Self, JwtSourceError> {
14✔
266
        JwtSourceBuilder::new().build().await
14✔
267
    }
14✔
268

269
    /// Creates a builder for configuring a [`JwtSource`].
270
    ///
271
    /// The builder allows customizing how the source connects to the SPIFFE
272
    /// Workload API and how JWT material is managed (e.g. endpoint selection,
273
    /// reconnection behavior, resource limits).
274
    ///
275
    /// # Examples
276
    ///
277
    /// ```no_run
278
    /// use spiffe::jwt_source::JwtSource;
279
    ///
280
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
281
    /// let source = JwtSource::builder()
282
    ///     .endpoint("unix:///tmp/spire-agent/public/api.sock")
283
    ///     .build()
284
    ///     .await?;
285
    ///
286
    /// # Ok(())
287
    /// # }
288
    /// ```
289
    pub fn builder() -> JwtSourceBuilder {
×
290
        JwtSourceBuilder::new()
×
291
    }
×
292

293
    /// Returns a handle for receiving update notifications.
294
    ///
295
    /// The handle yields a monotonically increasing sequence number on each
296
    /// successful update to the JWT bundle set. This can be used to detect when
297
    /// the bundle set has changed without polling.
298
    ///
299
    /// **Note:** The initial sequence number is 0. Notifications are only sent
300
    /// for rotations that occur after initial synchronization completes. The initial
301
    /// sync does not trigger a notification.
302
    ///
303
    /// # Examples
304
    ///
305
    /// ```no_run
306
    /// # use spiffe::JwtSource;
307
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
308
    /// let source = JwtSource::new().await?;
309
    /// let mut updates = source.updated();
310
    /// // Wait for the first update notification
311
    /// updates.changed().await?;
312
    /// println!("Update sequence: {}", updates.last());
313
    /// # Ok(())
314
    /// # }
315
    /// ```
316
    pub fn updated(&self) -> JwtSourceUpdates {
6✔
317
        JwtSourceUpdates {
6✔
318
            rx: self.inner.update_rx.clone(),
6✔
319
        }
6✔
320
    }
6✔
321

322
    /// Returns `true` if the source appears healthy and can likely provide bundles.
323
    ///
324
    /// This method checks that:
325
    /// - The source is not closed or cancelled
326
    /// - There are bundles available
327
    ///
328
    /// **Note:** This check is inherently racy. Between calling `is_healthy()` and
329
    /// `bundle_set()`, the source may be shut down or the bundle set may change. Use this
330
    /// for best-effort health checks (e.g., monitoring), not for synchronization.
331
    /// If you need a guaranteed check, call `bundle_set()` directly and handle the error.
332
    ///
333
    /// # Examples
334
    ///
335
    /// ```no_run
336
    /// # use spiffe::JwtSource;
337
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
338
    /// let source = JwtSource::new().await?;
339
    ///
340
    /// if source.is_healthy() {
341
    ///     println!("Source appears healthy");
342
    /// } else {
343
    ///     println!("Source is unhealthy");
344
    /// }
345
    /// # Ok(())
346
    /// # }
347
    /// ```
348
    pub fn is_healthy(&self) -> bool {
14✔
349
        if self.inner.closed.load(Ordering::Acquire) || self.inner.cancel.is_cancelled() {
14✔
350
            return false;
1✔
351
        }
13✔
352

353
        let bundle_set = self.inner.bundle_set.load();
13✔
354
        !bundle_set.is_empty()
13✔
355
    }
14✔
356

357
    /// Returns the current JWT bundle set.
358
    ///
359
    /// # Errors
360
    ///
361
    /// Returns a [`JwtSourceError`] if the source is closed.
362
    pub fn bundle_set(&self) -> Result<Arc<JwtBundleSet>, JwtSourceError> {
2✔
363
        self.assert_open()?;
2✔
364
        Ok(self.inner.bundle_set.load_full())
1✔
365
    }
2✔
366

367
    /// Returns the current bundle for the trust domain, or `None` if unavailable.
368
    ///
369
    /// This is a convenience method that returns `None` instead of an error
370
    /// when the bundle cannot be retrieved. Use this when `None` is an acceptable
371
    /// value for your use case.
372
    ///
373
    /// **Note:** This method swallows all errors, including `Closed`. If you need
374
    /// to detect shutdown, use [`JwtSource::bundle_for_trust_domain`] instead.
375
    ///
376
    /// # Examples
377
    ///
378
    /// ```no_run
379
    /// # use spiffe::{TrustDomain, JwtSource};
380
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
381
    /// let source = JwtSource::new().await?;
382
    /// let trust_domain = TrustDomain::new("example.org")?;
383
    ///
384
    /// if let Some(bundle) = source.try_bundle_for_trust_domain(&trust_domain) {
385
    ///     println!("Got bundle for {}", trust_domain);
386
    /// } else {
387
    ///     println!("No bundle available for {}", trust_domain);
388
    /// }
389
    /// # Ok(())
390
    /// # }
391
    /// ```
392
    pub fn try_bundle_for_trust_domain(&self, td: &TrustDomain) -> Option<Arc<JwtBundle>> {
2✔
393
        self.bundle_for_trust_domain(td).ok().flatten()
2✔
394
    }
2✔
395

396
    /// Fetches a JWT SVID for the given audience.
397
    ///
398
    /// Unlike X.509 SVIDs which are streamed continuously, JWT SVIDs are fetched
399
    /// on-demand with specific audiences. This method makes a one-shot request to
400
    /// the Workload API to fetch a JWT SVID.
401
    ///
402
    /// # Errors
403
    ///
404
    /// Returns a [`JwtSourceError`] if:
405
    /// - the source is closed
406
    /// - the Workload API request fails
407
    /// - no SVID is returned or the SVID cannot be parsed
408
    ///
409
    /// # Examples
410
    ///
411
    /// ```no_run
412
    /// # use spiffe::JwtSource;
413
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
414
    /// let source = JwtSource::new().await?;
415
    ///
416
    /// // Fetch a JWT SVID for specific audiences
417
    /// let jwt_svid = source.get_jwt_svid(&["service-a", "service-b"]).await?;
418
    /// println!("SPIFFE ID: {}", jwt_svid.spiffe_id());
419
    ///
420
    /// # Ok(())
421
    /// # }
422
    /// ```
423
    pub async fn get_jwt_svid<I>(&self, audience: I) -> Result<JwtSvid, JwtSourceError>
2✔
424
    where
2✔
425
        I: IntoIterator,
2✔
426
        I::Item: AsRef<str>,
2✔
427
    {
2✔
428
        self.get_jwt_svid_with_id(audience, None).await
2✔
429
    }
2✔
430

431
    /// Fetches a JWT SVID for the given audience and optional SPIFFE ID.
432
    ///
433
    /// This method automatically retries once if the initial request fails (e.g., due to
434
    /// a closed connection). On retry, the cached client is recreated to handle transient
435
    /// connection issues.
436
    ///
437
    /// # Errors
438
    ///
439
    /// Returns a [`JwtSourceError`] if:
440
    /// - the source is closed
441
    /// - the Workload API request fails (after retry)
442
    /// - no SVID is returned or the SVID cannot be parsed
443
    /// # Examples
444
    ///
445
    /// ```no_run
446
    /// # use spiffe::JwtSource;
447
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
448
    /// let source = JwtSource::new().await?;
449
    ///
450
    /// // Fetch for a specific SPIFFE ID
451
    /// let spiffe_id = "spiffe://example.org/myservice".parse()?;
452
    /// let jwt_svid = source.get_jwt_svid_with_id(&["service-a"], Some(&spiffe_id)).await?;
453
    /// # Ok(())
454
    /// # }
455
    pub async fn get_jwt_svid_with_id<I>(
4✔
456
        &self,
4✔
457
        audience: I,
4✔
458
        spiffe_id: Option<&SpiffeId>,
4✔
459
    ) -> Result<JwtSvid, JwtSourceError>
4✔
460
    where
4✔
461
        I: IntoIterator,
4✔
462
        I::Item: AsRef<str>,
4✔
463
    {
4✔
464
        self.assert_open()?;
4✔
465

466
        // Collect audience into Vec to allow reuse on retry
467
        let audience_vec: Vec<String> = audience
2✔
468
            .into_iter()
2✔
469
            .map(|a| a.as_ref().to_string())
2✔
470
            .collect();
2✔
471

472
        let client = self.inner.get_or_recreate_client().await?;
2✔
473

474
        // Try to fetch the SVID
475
        match client.fetch_jwt_svid(&audience_vec, spiffe_id).await {
2✔
476
            Ok(svid) => Ok(svid),
2✔
477
            Err(_e) => {
×
478
                // On failure, invalidate the cached client and try once more
479
                // This handles transient connection issues (e.g., connection closed)
480
                self.assert_open()?; // Check if closed before retry
×
481
                let new_client = self.inner.recreate_client().await?;
×
482
                new_client
×
483
                    .fetch_jwt_svid(&audience_vec, spiffe_id)
×
484
                    .await
×
485
                    .map_err(JwtSourceError::FetchJwtSvid)
×
486
            }
487
        }
488
    }
4✔
489

490
    /// Cancels background tasks and waits for termination.
491
    ///
492
    /// This method is idempotent. Calling it multiple times is safe and has no
493
    /// additional effect after the first invocation.
494
    ///
495
    /// The shutdown request is best-effort. Background tasks are signaled to stop
496
    /// and awaited before returning.
497
    ///
498
    /// **Note:** This method may wait indefinitely if background tasks don't respond.
499
    /// For production use, prefer [`JwtSource::shutdown_with_timeout`] or
500
    /// [`JwtSource::shutdown_configured`].
501
    pub async fn shutdown(&self) {
6✔
502
        if self.inner.closed.swap(true, Ordering::AcqRel) {
6✔
503
            return;
×
504
        }
6✔
505
        self.inner.cancel.cancel();
6✔
506

507
        if let Some(handle) = self.inner.supervisor.lock().await.take() {
6✔
508
            if let Err(e) = handle.await {
6✔
NEW
509
                warn!("Error joining supervisor task during shutdown: error={e}");
×
510
                self.inner
×
511
                    .record_error(MetricsErrorKind::SupervisorJoinFailed);
×
512
            }
6✔
513
        }
×
514
    }
6✔
515

516
    /// Cancels background tasks and waits for termination with a timeout.
517
    ///
518
    /// This method attempts graceful shutdown first: it signals cancellation and
519
    /// waits up to `timeout` for the supervisor task to complete. If the timeout
520
    /// is exceeded, the task is forcefully aborted.
521
    ///
522
    /// This method is idempotent. Calling it multiple times is safe and has no
523
    /// additional effect after the first invocation.
524
    ///
525
    /// # Errors
526
    ///
527
    /// Returns [`JwtSourceError::ShutdownTimeout`] if graceful shutdown does not
528
    /// complete within the timeout and the task must be aborted.
529
    ///
530
    /// # Examples
531
    ///
532
    /// ```no_run
533
    /// use spiffe::JwtSource;
534
    /// use std::time::Duration;
535
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
536
    /// let source = JwtSource::new().await?;
537
    /// // Shutdown with 10 second timeout (graceful, then abort if needed)
538
    /// source.shutdown_with_timeout(Duration::from_secs(10)).await?;
539
    /// # Ok(())
540
    /// # }
541
    /// ```
542
    pub async fn shutdown_with_timeout(&self, timeout: Duration) -> Result<(), JwtSourceError> {
3✔
543
        if self.inner.closed.swap(true, Ordering::AcqRel) {
3✔
544
            return Ok(());
1✔
545
        }
2✔
546
        self.inner.cancel.cancel();
2✔
547

548
        let Some(mut handle) = self.inner.supervisor.lock().await.take() else {
2✔
549
            return Ok(());
×
550
        };
551

552
        match tokio::time::timeout(timeout, &mut handle).await {
2✔
553
            Ok(Ok(())) => Ok(()),
2✔
NEW
554
            Ok(Err(e)) => {
×
NEW
555
                warn!("Error joining supervisor task during shutdown: error={e}");
×
556
                self.inner
×
557
                    .record_error(MetricsErrorKind::SupervisorJoinFailed);
×
558
                Ok(())
×
559
            }
560
            Err(_) => {
UNCOV
561
                warn!("Shutdown timeout exceeded; aborting supervisor task");
×
562
                handle.abort();
×
563
                // Wait for the abort to take effect
564
                let _ = handle.await;
×
565
                Err(JwtSourceError::ShutdownTimeout)
×
566
            }
567
        }
568
    }
3✔
569

570
    /// Cancels background tasks and waits for termination using the configured timeout.
571
    ///
572
    /// This is a convenience method that uses the timeout configured in the builder.
573
    /// If no timeout was configured, this method will wait indefinitely (same as `shutdown()`).
574
    ///
575
    /// # Errors
576
    ///
577
    /// Returns [`JwtSourceError::ShutdownTimeout`] if the configured shutdown timeout is exceeded.
578
    pub async fn shutdown_configured(&self) -> Result<(), JwtSourceError> {
3✔
579
        if let Some(timeout) = self.inner.shutdown_timeout {
3✔
580
            self.shutdown_with_timeout(timeout).await
3✔
581
        } else {
582
            self.shutdown().await;
×
583
            Ok(())
×
584
        }
585
    }
3✔
586
}
587

588
impl JwtSource {
589
    pub(super) async fn build_with(
23✔
590
        make_client: ClientFactory,
23✔
591
        reconnect: ReconnectConfig,
23✔
592
        limits: ResourceLimits,
23✔
593
        metrics: Option<Arc<dyn MetricsRecorder>>,
23✔
594
        shutdown_timeout: Option<Duration>,
23✔
595
    ) -> Result<JwtSource, JwtSourceError> {
23✔
596
        let reconnect = super::builder::normalize_reconnect(reconnect);
23✔
597

598
        let (update_tx, update_rx) = watch::channel(0u64);
23✔
599
        let cancel = CancellationToken::new();
23✔
600

601
        let initial_bundle_set =
23✔
602
            initial_sync_with_retry(&make_client, &cancel, reconnect, limits, metrics.as_deref())
23✔
603
                .await?;
23✔
604

605
        // Create initial client for on-demand SVID fetching
606
        let initial_client = make_client().await.map_err(JwtSourceError::Source)?;
23✔
607
        let initial_client_arc = Arc::new(initial_client);
23✔
608

609
        let inner = Arc::new(Inner {
23✔
610
            bundle_set: ArcSwap::from(initial_bundle_set),
23✔
611
            cached_client: ArcSwap::from(Arc::new(Some(initial_client_arc))),
23✔
612
            client_creation_mutex: Mutex::new(()),
23✔
613
            reconnect,
23✔
614
            make_client,
23✔
615
            limits,
23✔
616
            metrics,
23✔
617
            shutdown_timeout,
23✔
618
            closed: AtomicBool::new(false),
23✔
619
            cancel,
23✔
620
            update_seq: AtomicU64::new(0),
23✔
621
            update_tx,
23✔
622
            update_rx,
23✔
623
            supervisor: Mutex::new(None),
23✔
624
        });
23✔
625

626
        let task_inner = Arc::clone(&inner);
23✔
627
        let token = task_inner.cancel.clone();
23✔
628
        let handle = tokio::spawn(async move {
23✔
629
            task_inner.run_update_supervisor(token).await;
14✔
630
        });
8✔
631

632
        *inner.supervisor.lock().await = Some(handle);
23✔
633

634
        Ok(Self { inner })
23✔
635
    }
23✔
636

637
    /// Test-only constructor: wraps a caller-supplied bundle set in a `JwtSource`
    /// without performing the initial sync and without spawning the supervisor task.
    ///
    /// No client is cached and the client factory always fails, so unit tests stay
    /// deterministic and need no real Workload API client.
    #[cfg(test)]
    pub(super) fn new_for_test(
        initial_bundle_set: Arc<JwtBundleSet>,
        reconnect: ReconnectConfig,
        limits: ResourceLimits,
        metrics: Option<Arc<dyn MetricsRecorder>>,
    ) -> JwtSource {
        // Same boundary normalization as `build_with`.
        let reconnect = super::builder::normalize_reconnect(reconnect);

        let cancel = CancellationToken::new();
        let (update_tx, update_rx) = watch::channel(0u64);

        // Factory that never yields a client.
        let failing_factory: ClientFactory =
            Arc::new(|| Box::pin(async move { Err(WorkloadApiError::EmptyResponse) }));

        let inner = Arc::new(Inner {
            bundle_set: ArcSwap::from(initial_bundle_set),
            cached_client: ArcSwap::from(Arc::new(None)),
            client_creation_mutex: Mutex::new(()),
            reconnect,
            make_client: failing_factory,
            limits,
            metrics,
            shutdown_timeout: None,
            closed: AtomicBool::new(false),
            cancel,
            update_seq: AtomicU64::new(0),
            update_tx,
            update_rx,
            supervisor: Mutex::new(None),
        });

        JwtSource { inner }
    }
678

679
    fn assert_open(&self) -> Result<(), JwtSourceError> {
29✔
680
        if self.inner.closed.load(Ordering::Acquire) || self.inner.cancel.is_cancelled() {
29✔
681
            return Err(JwtSourceError::Closed);
5✔
682
        }
24✔
683
        Ok(())
24✔
684
    }
29✔
685
}
686

687
impl Inner {
688
    pub(super) fn record_error(&self, kind: MetricsErrorKind) {
1✔
689
        if let Some(metrics) = self.metrics.as_deref() {
1✔
690
            metrics.record_error(kind);
1✔
691
        }
1✔
692
    }
1✔
693

694
    pub(super) fn record_update(&self) {
6✔
695
        if let Some(metrics) = self.metrics.as_deref() {
6✔
696
            metrics.record_update();
1✔
697
        }
5✔
698
    }
6✔
699

700
    pub(super) fn apply_update(
7✔
701
        &self,
7✔
702
        new_bundle_set: Arc<JwtBundleSet>,
7✔
703
    ) -> Result<(), JwtSourceError> {
7✔
704
        // validate_bundle_set() already records limit-specific metrics.
705
        // We only record UpdateRejected here if validation fails, and the supervisor loop
706
        // should NOT record it again to avoid double-counting.
707
        match self.validate_bundle_set(&new_bundle_set) {
7✔
708
            Ok(()) => {
709
                self.bundle_set.store(new_bundle_set);
6✔
710
                self.record_update();
6✔
711
                self.notify_update();
6✔
712
                Ok(())
6✔
713
            }
714
            Err(e) => {
1✔
715
                // Record UpdateRejected for any validation failure (limit metrics already recorded in validate_bundle_set).
716
                self.record_error(MetricsErrorKind::UpdateRejected);
1✔
717
                Err(e)
1✔
718
            }
719
        }
720
    }
7✔
721

722
    pub(super) fn notify_update(&self) {
6✔
723
        let next = self.update_seq.fetch_add(1, Ordering::Relaxed) + 1;
6✔
724
        let _ = self.update_tx.send(next);
6✔
725
    }
6✔
726

727
    pub(super) fn validate_bundle_set(
7✔
728
        &self,
7✔
729
        bundle_set: &JwtBundleSet,
7✔
730
    ) -> Result<(), JwtSourceError> {
7✔
731
        validate_bundle_set(bundle_set, self.limits, self.metrics.as_deref())
7✔
732
    }
7✔
733
}
734

735
impl Drop for JwtSource {
736
    fn drop(&mut self) {
25✔
737
        // Best-effort cancellation. Do not block in Drop.
738
        self.inner.cancel.cancel();
25✔
739
    }
25✔
740
}
741

742
impl BundleSource for JwtSource {
743
    type Item = JwtBundle;
744
    type Error = JwtSourceError;
745

746
    fn bundle_for_trust_domain(
23✔
747
        &self,
23✔
748
        trust_domain: &TrustDomain,
23✔
749
    ) -> Result<Option<Arc<Self::Item>>, Self::Error> {
23✔
750
        self.assert_open()?;
23✔
751
        let bundle_set = self.inner.bundle_set.load();
21✔
752
        Ok(bundle_set.get(trust_domain))
21✔
753
    }
23✔
754
}
755

756
#[cfg(test)]
757
#[allow(clippy::unwrap_used, clippy::expect_used)]
758
mod tests {
759
    use super::super::errors::MetricsErrorKind;
760
    use super::super::metrics::MetricsRecorder;
761
    use super::*;
762
    use crate::bundle::jwt::JwtAuthority;
763
    use std::collections::HashMap;
764
    use std::sync::{Arc, Mutex};
765
    use std::time::Duration;
766
    use tokio::sync::watch;
767

768
    fn jwk_with_kid(kid: &str) -> JwtAuthority {
3✔
769
        let json = format!(
3✔
770
            r#"{{
3✔
771
                "kty": "oct",
3✔
772
                "kid": "{kid}",
3✔
773
                "k": "AyM1SysPpbyDfgZld3umj1qzKObwVMkoqQ-EstJQLr_T-1qS0gZH75aKtMN3Yj0iPS4hcgUuTwjAzZr1Z9CAow"
3✔
774
            }}"#
3✔
775
        );
776
        JwtAuthority::from_jwk_json(json.as_bytes()).expect("valid JWK JSON")
3✔
777
    }
3✔
778

779
    fn create_test_bundle_set() -> Arc<JwtBundleSet> {
1✔
780
        let trust_domain = TrustDomain::new("example.org").unwrap();
1✔
781
        let mut bundle = JwtBundle::new(trust_domain);
1✔
782
        bundle.add_jwt_authority(jwk_with_kid("kid-1"));
1✔
783
        let mut bundle_set = JwtBundleSet::new();
1✔
784
        bundle_set.add_bundle(bundle);
1✔
785
        Arc::new(bundle_set)
1✔
786
    }
1✔
787

788
    #[tokio::test]
789
    async fn test_wait_for_immediate_satisfaction() {
1✔
790
        let (tx, rx) = watch::channel(5u64);
1✔
791
        let mut updates = JwtSourceUpdates { rx };
1✔
792

793
        // Predicate is already satisfied (current value is 5, which is > 3)
794
        let result = updates.wait_for(|&seq| seq > 3).await;
1✔
795
        assert!(result.is_ok());
1✔
796
        assert_eq!(result.unwrap(), 5);
1✔
797

798
        // Update the value
799
        let _ = tx.send(10);
1✔
800

801
        // Wait for predicate to be satisfied again (should return immediately with new value)
802
        let result = updates.wait_for(|&seq| seq > 8).await;
1✔
803
        assert!(result.is_ok());
1✔
804
        assert_eq!(result.unwrap(), 10);
1✔
805
    }
1✔
806

807
    #[tokio::test]
808
    async fn test_wait_for_waits_when_not_satisfied() {
1✔
809
        let (tx, rx) = watch::channel(1u64);
1✔
810
        let mut updates = JwtSourceUpdates { rx };
1✔
811

812
        // Spawn a task to update the value after a short delay
813
        let tx_clone = tx.clone();
1✔
814
        tokio::spawn(async move {
1✔
815
            tokio::time::sleep(Duration::from_millis(50)).await;
1✔
816
            let _ = tx_clone.send(5);
1✔
817
        });
1✔
818

819
        // Predicate is not satisfied initially (1 is not > 3)
820
        // Should wait and then return when value becomes 5
821
        let result = tokio::time::timeout(Duration::from_secs(1), updates.wait_for(|&seq| seq > 3))
2✔
822
            .await
1✔
823
            .expect("Should complete within timeout");
1✔
824
        assert!(result.is_ok());
1✔
825
        assert_eq!(result.unwrap(), 5);
1✔
826
    }
1✔
827

828
    #[tokio::test]
829
    async fn test_updated_only_notifies_on_rotations_after_initial_sync() {
1✔
830
        // Verify that updated().changed() only notifies on rotations after initial sync,
831
        // not on the initial sync itself. The initial sequence number is 0.
832
        let (tx, rx) = watch::channel(0u64);
1✔
833
        let mut updates = JwtSourceUpdates { rx: rx.clone() };
1✔
834

835
        // Initial value is 0, so changed() should wait for an update
836
        let tx_clone = tx.clone();
1✔
837
        tokio::spawn(async move {
1✔
838
            tokio::time::sleep(Duration::from_millis(50)).await;
1✔
839
            // Simulate first rotation after initial sync
840
            let _ = tx_clone.send(1);
1✔
841
        });
1✔
842

843
        // Should wait and then return when value becomes 1 (first rotation)
844
        let result = tokio::time::timeout(Duration::from_secs(1), updates.changed())
1✔
845
            .await
1✔
846
            .expect("Should complete within timeout");
1✔
847
        assert!(result.is_ok());
1✔
848
        assert_eq!(result.unwrap(), 1);
1✔
849
        assert_eq!(updates.last(), 1);
1✔
850
    }
1✔
851

852
    #[tokio::test]
853
    async fn test_updated_initial_sequence_is_zero() {
1✔
854
        // Verify that the initial sequence number is 0
855
        let (_tx, rx) = watch::channel(0u64);
1✔
856
        let updates = JwtSourceUpdates { rx };
1✔
857
        assert_eq!(updates.last(), 0);
1✔
858
    }
1✔
859

860
    /// Test metrics recorder that counts error recordings by kind.
861
    struct TestMetricsRecorder {
862
        counts: Arc<Mutex<HashMap<MetricsErrorKind, u64>>>,
863
    }
864

865
    impl TestMetricsRecorder {
866
        fn new() -> Self {
2✔
867
            Self {
2✔
868
                counts: Arc::new(Mutex::new(HashMap::new())),
2✔
869
            }
2✔
870
        }
2✔
871

872
        fn count(&self, kind: MetricsErrorKind) -> u64 {
6✔
873
            *self.counts.lock().unwrap().get(&kind).unwrap_or(&0)
6✔
874
        }
6✔
875
    }
876

877
    impl MetricsRecorder for TestMetricsRecorder {
878
        fn record_update(&self) {}
×
879
        fn record_reconnect(&self) {}
×
880
        fn record_error(&self, kind: MetricsErrorKind) {
3✔
881
            *self.counts.lock().unwrap().entry(kind).or_insert(0) += 1;
3✔
882
        }
3✔
883
    }
884

885
    #[test]
    fn test_metrics_recorded_exactly_once_per_rejected_update() {
        // Verify that UpdateRejected and the limit metric are each recorded exactly once
        // per rejected update, with no double-counting.
        use super::super::builder::ResourceLimits;

        let metrics = Arc::new(TestMetricsRecorder::new());
        let limits = ResourceLimits {
            max_bundles: Some(0), // Limit that will be exceeded
            max_bundle_jwks_bytes: Some(1000),
        };

        // Create source using test seam, starting from an empty bundle set.
        let source = JwtSource::new_for_test(
            Arc::new(JwtBundleSet::new()),
            ReconnectConfig::default(),
            limits,
            Some(metrics.clone()),
        );

        // The shared fixture holds exactly one bundle, which exceeds max_bundles=0,
        // so this update must be rejected.
        let result = source.inner.apply_update(create_test_bundle_set());

        // Should fail with ResourceLimitExceeded
        assert!(matches!(
            result,
            Err(JwtSourceError::ResourceLimitExceeded {
                kind: super::super::errors::LimitKind::MaxBundles,
                ..
            })
        ));

        // Verify metrics recorded exactly once
        assert_eq!(metrics.count(MetricsErrorKind::LimitMaxBundles), 1);
        assert_eq!(metrics.count(MetricsErrorKind::UpdateRejected), 1);
        // Verify no other limit metrics were recorded
        assert_eq!(metrics.count(MetricsErrorKind::LimitMaxBundleJwksBytes), 0);
    }
1✔
931

932
    #[test]
    fn test_new_with_normalizes_reconnect_config() {
        // A source created through the `new_for_test` seam must end up with a normalized
        // reconnect config: an inverted min/max pair is expected to come out swapped.
        use super::super::builder::ResourceLimits;
        use std::time::Duration;

        let shorter = Duration::from_secs(1);
        let longer = Duration::from_secs(10);

        // Deliberately inverted: min_backoff is larger than max_backoff.
        let inverted = ReconnectConfig {
            min_backoff: longer,
            max_backoff: shorter,
        };

        let source = JwtSource::new_for_test(
            create_test_bundle_set(),
            inverted,
            ResourceLimits::default(),
            None,
        );

        // The stored config must have the two bounds in ascending order.
        let normalized = &source.inner.reconnect;
        assert_eq!(normalized.min_backoff, shorter);
        assert_eq!(normalized.max_backoff, longer);
    }
1✔
958

959
    #[test]
    fn test_initial_sync_validation_records_correct_metrics() {
        // Verify that validation records limit metrics,
        // but does NOT record UpdateRejected (that's recorded by apply_update).
        use super::super::builder::ResourceLimits;
        use super::super::limits::validate_bundle_set;

        let metrics = Arc::new(TestMetricsRecorder::new());
        let limits = ResourceLimits {
            max_bundles: Some(0), // Limit that will be exceeded
            max_bundle_jwks_bytes: Some(1000),
        };

        // The shared fixture holds exactly one bundle, which exceeds max_bundles=0.
        let bundle_set = create_test_bundle_set();

        // Validate bundle set (simulating validation used in both initial sync and updates)
        let result = validate_bundle_set(
            &*bundle_set,
            limits,
            Some(metrics.as_ref() as &dyn MetricsRecorder),
        );

        // Should fail with ResourceLimitExceeded
        assert!(matches!(
            result,
            Err(JwtSourceError::ResourceLimitExceeded {
                kind: super::super::errors::LimitKind::MaxBundles,
                ..
            })
        ));

        // Verify limit metric was recorded
        assert_eq!(metrics.count(MetricsErrorKind::LimitMaxBundles), 1);
        // Verify UpdateRejected was NOT recorded (that's recorded by apply_update, not validate_bundle_set)
        assert_eq!(metrics.count(MetricsErrorKind::UpdateRejected), 0);
        // Verify no other limit metrics were recorded
        assert_eq!(metrics.count(MetricsErrorKind::LimitMaxBundleJwksBytes), 0);
    }
1✔
1002

1003
    #[test]
    fn test_resource_limits_unlimited() {
        // `ResourceLimits::unlimited()` must leave every limit unset.
        use super::super::builder::ResourceLimits;

        let ResourceLimits {
            max_bundles,
            max_bundle_jwks_bytes,
        } = ResourceLimits::unlimited();
        assert_eq!(max_bundles, None);
        assert_eq!(max_bundle_jwks_bytes, None);
    }
1✔
1012

1013
    #[test]
    fn test_resource_limits_default_limits() {
        // `ResourceLimits::default_limits()` must yield the conservative defaults:
        // 200 bundles and 4 MiB of JWKS bytes per bundle.
        use super::super::builder::ResourceLimits;

        let ResourceLimits {
            max_bundles,
            max_bundle_jwks_bytes,
        } = ResourceLimits::default_limits();
        assert_eq!(max_bundles, Some(200));
        assert_eq!(max_bundle_jwks_bytes, Some(4 * 1024 * 1024));
    }
1✔
1022

1023
    #[test]
    fn test_resource_limits_mixed() {
        // A limited field and an unlimited field can coexist in one `ResourceLimits`.
        use super::super::builder::ResourceLimits;

        let ResourceLimits {
            max_bundles,
            max_bundle_jwks_bytes,
        } = ResourceLimits {
            max_bundles: Some(50),
            max_bundle_jwks_bytes: None, // Unlimited
        };

        assert_eq!(max_bundles, Some(50));
        assert_eq!(max_bundle_jwks_bytes, None);
    }
1✔
1036
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc