• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

maxlambrecht / rust-spiffe / 21196161572

21 Jan 2026 03:30AM UTC coverage: 83.194% (-0.9%) from 84.111%
21196161572

push

github

web-flow
Supervisor refactor: shared utilities, improved no-identity backoff, and better diagnostics (#257)

Signed-off-by: Max Lambrecht <maxlambrecht@gmail.com>

77 of 244 new or added lines in 3 files covered. (31.56%)

2 existing lines in 2 files now uncovered.

4688 of 5635 relevant lines covered (83.19%)

300.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.0
/spiffe/src/workload_api/supervisor_common.rs
1
//! Common supervisor utilities shared between X.509 and JWT source supervisors.
2
//!
3
//! This module contains reusable components for managing Workload API connections,
4
//! error tracking, and backoff policies.
5

6
#![allow(dead_code)] // Items are used by x509_source and jwt_source supervisor modules
7

8
use std::time::Duration;
9
use tokio::time::sleep;
10
use tokio_util::sync::CancellationToken;
11

12
/// Supervisor-wide limit on how many times in a row the same error kind is reported
/// at WARN before further repeats are suppressed.
///
/// The limit is shared by the following error categories:
/// - client creation failures
/// - stream connection failures
/// - update validation rejections
pub(crate) const MAX_CONSECUTIVE_SAME_ERROR: u32 = 3;
19

20
/// Identifies which stage of a stream's life a connection attempt belongs to.
///
/// Carried along for diagnostics so that failures during the initial sync can be told
/// apart from failures in the steady-state supervisor loop.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum StreamPhase {
    /// The first synchronization, performed while the source is being constructed.
    InitialSync,
    /// The long-running supervisor loop that keeps the stream alive afterwards.
    Supervisor,
}
28

29
/// Category of a supervisor error, used as a cheap grouping key.
///
/// `ErrorTracker` compares these values to decide whether an error repeats the previous
/// one. Being a plain `Copy` enum, a key needs neither a string literal nor a heap
/// allocation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum ErrorKey {
    /// Creating the Workload API client failed.
    ClientCreation,
    /// Connecting the update stream failed.
    StreamConnect,
    /// A received update was rejected.
    UpdateRejected,
    /// No identity has been issued (an expected, transient state).
    NoIdentityIssued,
}
44

45
/// Helper for tracking repeated errors to suppress log noise.
46
///
47
/// This tracks consecutive occurrences of the same error kind and suppresses
48
/// log warnings after the first N consecutive occurrences. For the first N
49
/// consecutive occurrences of each error kind, logs are emitted at WARN level.
50
/// After that, logs are downgraded to DEBUG level to reduce noise.
51
///
52
/// When a different error kind occurs or errors stop, the counter resets.
53
pub(crate) struct ErrorTracker {
54
    last_error_kind: Option<ErrorKey>,
55
    consecutive_same_error: u32,
56
    max_consecutive: u32,
57
}
58

59
impl ErrorTracker {
60
    pub(crate) fn new(max_consecutive: u32) -> Self {
81✔
61
        Self {
81✔
62
            last_error_kind: None,
81✔
63
            consecutive_same_error: 0,
81✔
64
            max_consecutive,
81✔
65
        }
81✔
66
    }
81✔
67

NEW
68
    pub(crate) fn record_error(&mut self, error_kind: ErrorKey) -> bool {
×
NEW
69
        let should_warn = self.last_error_kind != Some(error_kind)
×
NEW
70
            || self.consecutive_same_error < self.max_consecutive;
×
71

NEW
72
        if self.last_error_kind == Some(error_kind) {
×
NEW
73
            self.consecutive_same_error += 1;
×
NEW
74
        } else {
×
NEW
75
            self.consecutive_same_error = 1;
×
NEW
76
            self.last_error_kind = Some(error_kind);
×
NEW
77
        }
×
78

NEW
79
        should_warn
×
NEW
80
    }
×
81

82
    pub(crate) fn reset(&mut self) {
56✔
83
        self.consecutive_same_error = 0;
56✔
84
        self.last_error_kind = None;
56✔
85
    }
56✔
86

87
    pub(crate) fn consecutive_count(&self) -> u32 {
10✔
88
        self.consecutive_same_error
10✔
89
    }
10✔
90

91
    pub(crate) fn last_error_kind(&self) -> Option<ErrorKey> {
168✔
92
        self.last_error_kind
168✔
93
    }
168✔
94
}
95

NEW
96
pub(crate) async fn sleep_or_cancel(token: &CancellationToken, dur: Duration) -> bool {
×
NEW
97
    tokio::select! {
×
NEW
98
        () = token.cancelled() => true,
×
NEW
99
        () = sleep(dur) => false,
×
100
    }
NEW
101
}
×
102

103
/// Exponential backoff with small jitter.
104
///
105
/// Computes the next backoff duration by:
106
/// 1. Doubling the current duration (exponential growth)
107
/// 2. Clamping to the maximum duration
108
/// 3. Adding small jitter (0-10% of the base) to prevent synchronized reconnect storms
109
///
110
/// The jitter is especially important in container fleets that start simultaneously.
111
///
112
/// Note: Jitter is calculated in milliseconds, which may result in sub-millisecond
113
/// precision loss for very small durations. This is acceptable for backoff purposes.
114
#[allow(clippy::cast_possible_truncation)]
NEW
115
pub(crate) fn next_backoff(current: Duration, max: Duration) -> Duration {
×
NEW
116
    let cur = current.as_millis().min(u128::from(u64::MAX)) as u64;
×
NEW
117
    let max = max.as_millis().min(u128::from(u64::MAX)) as u64;
×
118

NEW
119
    let base = (cur.saturating_mul(2)).min(max);
×
NEW
120
    if base == 0 {
×
NEW
121
        return Duration::from_millis(0);
×
NEW
122
    }
×
123

NEW
124
    let jitter = base / 10;
×
NEW
125
    let add = if jitter > 0 {
×
NEW
126
        fastrand::u64(0..=jitter)
×
127
    } else {
NEW
128
        0
×
129
    };
130

NEW
131
    Duration::from_millis((base.saturating_add(add)).min(max))
×
NEW
132
}
×
133

134
/// Slower backoff policy for "no identity issued" condition.
135
///
136
/// This is an expected transient state (workload may not be registered yet),
137
/// so we use a gentler backoff: starts at 1s, exponential up to 10s, with jitter.
NEW
138
pub(crate) fn next_backoff_for_no_identity(current: Duration) -> Duration {
×
139
    const MIN_BACKOFF_MS: u64 = 1000; // 1 second
140
    const MAX_BACKOFF_MS: u64 = 10000; // 10 seconds
141

NEW
142
    let current_with_min = current.max(Duration::from_millis(MIN_BACKOFF_MS));
×
NEW
143
    next_backoff(current_with_min, Duration::from_millis(MAX_BACKOFF_MS))
×
NEW
144
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc