• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

maxlambrecht / rust-spiffe / 21214457836

21 Jan 2026 03:02PM UTC coverage: 83.127% (-0.07%) from 83.194%
21214457836

push

github

web-flow
feat(observability): consume log args when features are disabled (#263)

Signed-off-by: Max Lambrecht <maxlambrecht@gmail.com>

19 of 56 new or added lines in 7 files covered. (33.93%)

19 existing lines in 5 files now uncovered.

4700 of 5654 relevant lines covered (83.13%)

299.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.23
/spiffe/src/jwt_source/supervisor.rs
1
use super::builder::{ReconnectConfig, ResourceLimits};
2
use super::errors::{JwtSourceError, MetricsErrorKind};
3
use super::limits::validate_bundle_set;
4
use super::metrics::MetricsRecorder;
5
use super::source::Inner;
6
use super::types::ClientFactory;
7
use crate::bundle::jwt::JwtBundleSet;
8
use crate::prelude::{debug, info, warn};
9
use crate::workload_api::error::WorkloadApiError;
10
use crate::workload_api::supervisor_common::{
11
    self, ErrorKey, ErrorTracker, StreamPhase, MAX_CONSECUTIVE_SAME_ERROR,
12
};
13
use crate::workload_api::WorkloadApiClient;
14
use std::sync::Arc;
15
use std::time::Duration;
16
use tokio_stream::StreamExt;
17
use tokio_util::sync::CancellationToken;
18

19
/// Attempts to create a Workload API client.
20
///
21
/// Records metrics and logs errors.
22
///
23
/// On error, records `ClientCreation` metric. The caller should call `record_reconnect`
24
/// for steady-state reconnections (not for initial sync).
25
pub(super) async fn try_create_client(
29✔
26
    make_client: &ClientFactory,
29✔
27
    _min_backoff: Duration,
29✔
28
    backoff: &mut Duration,
29✔
29
    error_tracker: &mut ErrorTracker,
29✔
30
    metrics: Option<&dyn MetricsRecorder>,
29✔
31
) -> Result<WorkloadApiClient, WorkloadApiError> {
29✔
32
    match (make_client)().await {
29✔
33
        Ok(c) => {
29✔
34
            // Only log recovery if there were significant consecutive failures (>= 3) of the same type
35
            if error_tracker.last_error_kind() == Some(ErrorKey::ClientCreation)
29✔
36
                && error_tracker.consecutive_count() >= 3
×
37
            {
×
38
                debug!(
×
39
                    "Client creation recovered after {} consecutive failures",
×
40
                    error_tracker.consecutive_count()
×
41
                );
×
42
            }
29✔
43
            // Only reset tracker if the last error was client creation (actual recovery).
44
            // Don't reset if tracking stream connection errors (e.g., NoIdentityIssued).
45
            if error_tracker.last_error_kind() == Some(ErrorKey::ClientCreation) {
29✔
46
                error_tracker.reset();
×
47
            }
29✔
48
            Ok(c)
29✔
49
        }
50
        Err(e) => {
×
51
            let error_kind = ErrorKey::ClientCreation;
×
52
            let should_warn = error_tracker.record_error(error_kind);
×
53

54
            if should_warn {
×
55
                warn!(
×
56
                    "Failed to create WorkloadApiClient; retrying: error={}, backoff_ms={}",
×
57
                    e,
×
58
                    backoff.as_millis()
×
59
                );
×
60
            } else {
×
61
                debug!(
×
62
                    "Failed to create WorkloadApiClient (repeated); retrying: error={}, backoff_ms={}, consecutive_failures={}",
×
63
                    e,
×
64
                    backoff.as_millis(),
×
65
                    error_tracker.consecutive_count()
×
66
                );
×
67
            }
×
68

69
            if let Some(m) = metrics {
×
70
                m.record_error(MetricsErrorKind::ClientCreation);
×
71
            }
×
72
            Err(e)
×
73
        }
74
    }
75
}
29✔
76

77
/// Attempts to connect to the JWT bundle stream.
78
///
79
/// Records metrics and logs errors. Resets backoff to `min_backoff` on success.
80
///
81
/// On error, records `StreamConnect` metric. The caller should call `record_reconnect`
82
/// for steady-state reconnections (not for initial sync).
83
pub(super) async fn try_connect_stream(
29✔
84
    client: &WorkloadApiClient,
29✔
85
    min_backoff: Duration,
29✔
86
    backoff: &mut Duration,
29✔
87
    error_tracker: &mut ErrorTracker,
29✔
88
    metrics: Option<&dyn MetricsRecorder>,
29✔
89
    phase: StreamPhase,
29✔
90
    supervisor_id: Option<u64>,
29✔
91
) -> Result<
29✔
92
    impl tokio_stream::Stream<Item = Result<JwtBundleSet, WorkloadApiError>> + Send + 'static,
29✔
93
    WorkloadApiError,
29✔
94
> {
29✔
95
    match client.stream_jwt_bundles().await {
29✔
96
        Ok(s) => {
29✔
97
            let id_suffix = supervisor_id.map_or_else(String::new, |id| format!(", id={id}"));
29✔
98

99
            // Only log recovery if the last error was a stream connection failure
100
            if error_tracker.last_error_kind() == Some(ErrorKey::StreamConnect)
29✔
101
                && error_tracker.consecutive_count() > 0
×
102
            {
×
103
                info!(
×
104
                    "Stream connection recovered after {} consecutive failures (phase={:?}{})",
×
105
                    error_tracker.consecutive_count(),
×
NEW
106
                    phase,
×
NEW
107
                    id_suffix
×
UNCOV
108
                );
×
109
            }
29✔
110
            error_tracker.reset();
29✔
111
            info!(
29✔
112
                "Connected to Workload API JWT bundle stream (phase={:?}{})",
29✔
113
                phase, id_suffix
114
            );
115
            *backoff = min_backoff;
29✔
116
            Ok(s)
29✔
117
        }
118
        Err(e) => {
×
119
            // Handle "no identity issued" as a distinct transient state
120
            if matches!(e, WorkloadApiError::NoIdentityIssued) {
×
121
                let error_kind = ErrorKey::NoIdentityIssued;
×
122
                let should_warn = error_tracker.record_error(error_kind);
×
123

124
                if should_warn {
×
125
                    warn!("No identity issued yet; waiting before retry");
×
126
                } else {
×
127
                    debug!(
×
128
                        "No identity issued yet (repeated); waiting before retry: consecutive_failures={}",
×
129
                        error_tracker.consecutive_count()
×
130
                    );
×
131
                }
×
132

133
                // Don't record this as a stream error metric (it's expected/transient)
134
                return Err(e);
×
135
            }
×
136

137
            let error_kind = ErrorKey::StreamConnect;
×
138
            let should_warn = error_tracker.record_error(error_kind);
×
139

140
            if should_warn {
×
141
                warn!(
×
142
                    "Failed to connect to Workload API stream; retrying: error={}, backoff_ms={}",
×
143
                    e,
×
144
                    backoff.as_millis()
×
145
                );
×
146
            } else {
×
147
                debug!(
×
148
                    "Failed to connect to Workload API stream (repeated); retrying: error={}, backoff_ms={}, consecutive_failures={}",
×
149
                    e,
×
150
                    backoff.as_millis(),
×
151
                    error_tracker.consecutive_count()
×
152
                );
×
153
            }
×
154

155
            if let Some(m) = metrics {
×
156
                m.record_error(MetricsErrorKind::StreamConnect);
×
157
            }
×
158
            Err(e)
×
159
        }
160
    }
161
}
29✔
162

163
pub(super) async fn initial_sync_with_retry(
23✔
164
    make_client: &ClientFactory,
23✔
165
    cancel: &CancellationToken,
23✔
166
    reconnect: ReconnectConfig,
23✔
167
    limits: ResourceLimits,
23✔
168
    metrics: Option<&dyn MetricsRecorder>,
23✔
169
) -> Result<Arc<JwtBundleSet>, JwtSourceError> {
23✔
170
    let mut backoff = reconnect.min_backoff;
23✔
171
    let mut error_tracker = ErrorTracker::new(MAX_CONSECUTIVE_SAME_ERROR);
23✔
172

173
    loop {
174
        if cancel.is_cancelled() {
23✔
175
            return Err(JwtSourceError::Closed);
×
176
        }
23✔
177

178
        match try_sync_once(
23✔
179
            make_client,
23✔
180
            limits,
23✔
181
            metrics,
23✔
182
            reconnect.min_backoff,
23✔
183
            &mut backoff,
23✔
184
            &mut error_tracker,
23✔
185
        )
186
        .await
23✔
187
        {
188
            Ok(v) => return Ok(v),
23✔
189
            Err(e) => {
×
190
                // Record InitialSyncFailed as an umbrella metric for any failed attempt.
191
                // Specific metrics (ClientCreation, StreamConnect, StreamError, StreamEnded,
192
                // LimitMaxBundles, LimitMaxBundleJwksBytes) are already recorded in try_sync_once().
193
                // Detailed logs are also produced by try_create_client/try_connect_stream/stream read,
194
                // so we avoid duplicate outer logs here to reduce noise.
195
                if let Some(m) = metrics {
×
196
                    m.record_error(MetricsErrorKind::InitialSyncFailed);
×
197
                }
×
198
                if sleep_or_cancel(cancel, backoff).await {
×
199
                    return Err(JwtSourceError::Closed);
×
200
                }
×
201
                // Choose backoff policy based on error type
202
                match &e {
×
203
                    JwtSourceError::Source(WorkloadApiError::NoIdentityIssued) => {
×
204
                        // Use slower backoff for "no identity issued" (expected transient state)
×
205
                        backoff = next_backoff_for_no_identity(backoff);
×
206
                        warn!(
×
207
                            "Initial sync: no identity issued, using backoff_ms={}",
×
208
                            backoff.as_millis()
×
209
                        );
×
210
                    }
×
211
                    _ => {
×
212
                        // Use standard exponential backoff for other errors
×
213
                        backoff = next_backoff(backoff, reconnect.max_backoff);
×
214
                    }
×
215
                }
216
            }
217
        }
218
    }
219
}
23✔
220

221
#[allow(clippy::too_many_arguments)]
222
async fn try_sync_once(
23✔
223
    make_client: &ClientFactory,
23✔
224
    limits: ResourceLimits,
23✔
225
    metrics: Option<&dyn MetricsRecorder>,
23✔
226
    min_backoff: Duration,
23✔
227
    backoff: &mut Duration,
23✔
228
    error_tracker: &mut ErrorTracker,
23✔
229
) -> Result<Arc<JwtBundleSet>, JwtSourceError> {
23✔
230
    // Use shared client creation logic (records ClientCreation metric on failure).
231
    // Initial sync does not record reconnect metrics (it's not a reconnect).
232
    let client =
23✔
233
        match try_create_client(make_client, min_backoff, backoff, error_tracker, metrics).await {
23✔
234
            Ok(c) => c,
23✔
235
            Err(e) => {
×
236
                return Err(JwtSourceError::Source(e));
×
237
            }
238
        };
239

240
    // Use shared stream connection logic (records StreamConnect metric on failure).
241
    let mut stream = match try_connect_stream(
23✔
242
        &client,
23✔
243
        min_backoff,
23✔
244
        backoff,
23✔
245
        error_tracker,
23✔
246
        metrics,
23✔
247
        StreamPhase::InitialSync,
23✔
248
        None,
23✔
249
    )
250
    .await
23✔
251
    {
252
        Ok(s) => s,
23✔
253
        Err(e) => {
×
254
            return Err(JwtSourceError::Source(e));
×
255
        }
256
    };
257

258
    match stream.next().await {
23✔
259
        Some(Ok(bundle_set)) => {
23✔
260
            validate_bundle_set(&bundle_set, limits, metrics).inspect_err(|e| {
23✔
NEW
261
                warn!("Initial JWT bundle set rejected; will retry: error={e}");
×
UNCOV
262
            })?;
×
263

264
            Ok(Arc::new(bundle_set))
23✔
265
        }
266
        Some(Err(e)) => {
×
267
            // Record StreamError for stream read errors.
NEW
268
            warn!("Initial sync: Workload API stream error; will retry: error={e}");
×
UNCOV
269
            if let Some(m) = metrics {
×
270
                m.record_error(MetricsErrorKind::StreamError);
×
271
            }
×
272
            Err(JwtSourceError::Source(e))
×
273
        }
274
        None => {
275
            // Record StreamEnded for empty stream.
UNCOV
276
            warn!("Initial sync: Workload API stream ended immediately; will retry");
×
277
            if let Some(m) = metrics {
×
278
                m.record_error(MetricsErrorKind::StreamEnded);
×
279
            }
×
280
            Err(JwtSourceError::StreamEnded)
×
281
        }
282
    }
283
}
23✔
284

285
pub(super) use supervisor_common::{next_backoff, next_backoff_for_no_identity, sleep_or_cancel};
286

287
impl Inner {
288
    pub(super) async fn run_update_supervisor(&self, cancellation_token: CancellationToken) {
14✔
289
        // Generate a unique supervisor ID for diagnostics (detects duplicate supervisors/repeated constructions).
290
        let supervisor_id = fastrand::u64(..);
14✔
291
        info!("Starting update supervisor: id={}", supervisor_id);
14✔
292

293
        let mut backoff = self.reconnect().min_backoff;
14✔
294
        let mut error_tracker = ErrorTracker::new(MAX_CONSECUTIVE_SAME_ERROR);
14✔
295

296
        loop {
297
            if cancellation_token.is_cancelled() {
14✔
298
                debug!("Cancellation signal received; stopping updates");
8✔
299
                return;
8✔
300
            }
6✔
301

302
            let Ok(client) = try_create_client(
6✔
303
                self.make_client(),
6✔
304
                self.reconnect().min_backoff,
6✔
305
                &mut backoff,
6✔
306
                &mut error_tracker,
6✔
307
                self.metrics(),
6✔
308
            )
309
            .await
6✔
310
            else {
311
                if self
×
312
                    .backoff_and_maybe_cancel(&cancellation_token, backoff)
×
313
                    .await
×
314
                {
315
                    return;
×
316
                }
×
317
                backoff = next_backoff(backoff, self.reconnect().max_backoff);
×
318
                continue;
×
319
            };
320

321
            match try_connect_stream(
6✔
322
                &client,
6✔
323
                self.reconnect().min_backoff,
6✔
324
                &mut backoff,
6✔
325
                &mut error_tracker,
6✔
326
                self.metrics(),
6✔
327
                StreamPhase::Supervisor,
6✔
328
                Some(supervisor_id),
6✔
329
            )
330
            .await
6✔
331
            {
332
                Ok(mut stream) => {
6✔
333
                    // Process stream updates. Returns true if cancelled, false if reconnect needed.
334
                    let cancelled = self
6✔
335
                        .process_stream_updates(&mut stream, &cancellation_token, supervisor_id)
6✔
336
                        .await;
6✔
337
                    if cancelled {
×
338
                        return;
×
339
                    }
×
340

341
                    // Stream ended or errored. Sleep/backoff before retrying.
342
                    if self
×
343
                        .backoff_and_maybe_cancel(&cancellation_token, backoff)
×
344
                        .await
×
345
                    {
346
                        return;
×
347
                    }
×
348
                    backoff = next_backoff(backoff, self.reconnect().max_backoff);
×
349
                }
350
                Err(stream_err) => {
×
351
                    // Choose backoff policy based on error type
352
                    match stream_err {
×
353
                        WorkloadApiError::NoIdentityIssued => {
×
354
                            // Use slower backoff for "no identity issued" (expected transient state)
×
355
                            backoff = next_backoff_for_no_identity(backoff);
×
356
                            warn!(
×
357
                                "No identity issued: using backoff_ms={}",
×
358
                                backoff.as_millis()
×
359
                            );
×
360
                        }
×
361
                        _ => {
×
362
                            // Use standard exponential backoff for other errors
×
363
                            backoff = next_backoff(backoff, self.reconnect().max_backoff);
×
364
                        }
×
365
                    }
366

367
                    if self
×
368
                        .backoff_and_maybe_cancel(&cancellation_token, backoff)
×
369
                        .await
×
370
                    {
371
                        return;
×
372
                    }
×
373
                }
374
            }
375
        }
376
    }
8✔
377

378
    async fn backoff_and_maybe_cancel(&self, token: &CancellationToken, backoff: Duration) -> bool {
×
379
        if let Some(metrics) = self.metrics() {
×
380
            metrics.record_reconnect();
×
381
        }
×
382
        sleep_or_cancel(token, backoff).await
×
383
    }
×
384

385
    async fn process_stream_updates(
6✔
386
        &self,
6✔
387
        stream: &mut (impl tokio_stream::Stream<Item = Result<JwtBundleSet, WorkloadApiError>>
6✔
388
                  + Unpin
6✔
389
                  + Send
6✔
390
                  + 'static),
6✔
391
        cancellation_token: &CancellationToken,
6✔
392
        supervisor_id: u64,
6✔
393
    ) -> bool {
6✔
394
        let mut update_rejection_tracker = ErrorTracker::new(MAX_CONSECUTIVE_SAME_ERROR);
6✔
395

396
        loop {
397
            let item = tokio::select! {
12✔
398
                () = cancellation_token.cancelled() => {
12✔
UNCOV
399
                    debug!("Cancellation signal received; stopping update loop");
×
400
                    return true;
×
401
                }
402
                v = stream.next() => v,
12✔
403
            };
404

405
            match item {
6✔
406
                Some(Ok(bundle_set)) => {
6✔
407
                    match self.apply_update(std::sync::Arc::new(bundle_set)) {
6✔
408
                        Ok(()) => {
409
                            if update_rejection_tracker.consecutive_count() > 0 {
6✔
410
                                info!(
×
411
                                    "Update validation recovered after {} consecutive failures",
×
412
                                    update_rejection_tracker.consecutive_count(),
×
413
                                );
×
414
                                update_rejection_tracker.reset();
×
415
                            }
6✔
416
                            info!("JWT bundle set updated");
6✔
417
                        }
NEW
418
                        Err(e) => {
×
419
                            let should_warn =
×
420
                                update_rejection_tracker.record_error(ErrorKey::UpdateRejected);
×
421

422
                            if should_warn {
×
NEW
423
                                warn!("Rejected JWT bundle set update: error={e}");
×
424
                            } else {
×
425
                                debug!(
×
426
                                    "Rejected JWT bundle set update (repeated): error={}, consecutive_rejections={}",
×
NEW
427
                                    e,
×
428
                                    update_rejection_tracker.consecutive_count()
×
429
                                );
×
430
                            }
×
431
                            // Metrics already recorded by apply_update(); do not double-count.
432
                        }
433
                    }
434
                }
NEW
435
                Some(Err(e)) => {
×
NEW
436
                    warn!("Workload API stream error; reconnecting: id={supervisor_id}, error={e}");
×
437
                    self.record_error(MetricsErrorKind::StreamError);
×
438
                    return false;
×
439
                }
440
                None => {
NEW
441
                    warn!("Workload API stream ended; reconnecting: id={supervisor_id}");
×
UNCOV
442
                    self.record_error(MetricsErrorKind::StreamEnded);
×
443
                    return false;
×
444
                }
445
            }
446
        }
447
    }
×
448
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc