
Unleash / unleash-edge / 16214554656

11 Jul 2025 07:37AM UTC coverage: 78.859%. First build 16214554656

Pull Request #1035: chore: apply formatter and enforce in the build
Merge 6ff5f05fe into a927a96d0 (github / web-flow)

168 of 180 new or added lines in 4 files covered. (93.33%)

10661 of 13519 relevant lines covered (78.86%)

5563.91 hits per line
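
The percentages above follow directly from the counts: 10661 / 13519 ≈ 0.7886, i.e. the 78.86% overall line rate, and 168 / 180 ≈ 93.33% for the lines this pull request adds or changes.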

Source File: /server/src/http/background_send_metrics.rs
use std::cmp::max;
use std::sync::Arc;

use chrono::Duration;
use dashmap::DashMap;
use lazy_static::lazy_static;
use prometheus::{IntGauge, IntGaugeVec, Opts, register_int_gauge, register_int_gauge_vec};
use reqwest::StatusCode;
use tracing::{error, info, trace, warn};

use crate::metrics::metric_batching::size_of_batch;
use crate::types::TokenRefresh;
use crate::{error::EdgeError, metrics::client_metrics::MetricsCache};

use super::refresher::feature_refresher::FeatureRefresher;

// Prometheus gauges tracking upstream metric-posting errors and the current send interval.
lazy_static! {
    pub static ref METRICS_UPSTREAM_HTTP_ERRORS: IntGaugeVec = register_int_gauge_vec!(
        Opts::new(
            "metrics_upstream_http_errors",
            "Failing requests against upstream metrics endpoint"
        ),
        &["status_code"]
    )
    .unwrap();
    pub static ref METRICS_UNEXPECTED_ERRORS: IntGauge =
        register_int_gauge!(Opts::new("metrics_send_error", "Failures to send metrics")).unwrap();
    pub static ref METRICS_UPSTREAM_OUTDATED: IntGaugeVec = register_int_gauge_vec!(
        Opts::new(
            "metrics_upstream_outdated",
            "Number of times we have tried to send metrics to an outdated endpoint"
        ),
        &["environment"]
    )
    .unwrap();
    pub static ref METRICS_UPSTREAM_CLIENT_BULK: IntGaugeVec = register_int_gauge_vec!(
        Opts::new(
            "metrics_upstream_client_bulk",
            "Number of times we have tried to send metrics to the client bulk endpoint"
        ),
        &["environment"]
    )
    .unwrap();
    pub static ref METRICS_INTERVAL_BETWEEN_SEND: IntGauge = register_int_gauge!(Opts::new(
        "metrics_interval_between_send",
        "Interval between sending metrics"
    ))
    .unwrap();
}

/// Decides whether the new client bulk endpoint can be used for `environment`:
/// returns `true` plus the matching client token if one is known, otherwise `false` and an empty token.
fn decide_where_to_post(
    environment: &String,
    known_tokens: Arc<DashMap<String, TokenRefresh>>,
) -> (bool, String) {
    if let Some(token_refresh) = known_tokens
        .iter()
        .find(|t| t.token.environment == Some(environment.to_string()))
    {
        METRICS_UPSTREAM_CLIENT_BULK
            .with_label_values(&[environment])
            .inc();
        (true, token_refresh.token.token.clone())
    } else {
        (false, "".into())
    }
}

/// Flushes all cached metrics once, used for the shutdown flush.
pub async fn send_metrics_one_shot(
    metrics_cache: Arc<MetricsCache>,
    feature_refresher: Arc<FeatureRefresher>,
) {
    let envs = metrics_cache.get_metrics_by_environment();
    for (env, batch) in envs.iter() {
        let (use_new_endpoint, token) =
            decide_where_to_post(env, feature_refresher.tokens_to_refresh.clone());
        let batches = metrics_cache.get_appropriately_sized_env_batches(batch);
        trace!("Posting {} batches for {env}", batches.len());
        for batch in batches {
            if !batch.applications.is_empty()
                || !batch.metrics.is_empty()
                || !batch.impact_metrics.is_empty()
            {
                let result = if use_new_endpoint {
                    feature_refresher
                        .unleash_client
                        .send_bulk_metrics_to_client_endpoint(batch.clone(), &token)
                        .await
                } else {
                    feature_refresher
                        .unleash_client
                        .send_batch_metrics(batch.clone(), None)
                        .await
                };
                if let Err(edge_error) = result {
                    warn!("Shut down metrics flush failed with {edge_error:?}")
                }
            }
        }
    }
}

/// Background loop that posts cached metrics upstream every `interval`,
/// backing off (and reinserting batches where appropriate) when upstream reports errors.
pub async fn send_metrics_task(
    metrics_cache: Arc<MetricsCache>,
    feature_refresher: Arc<FeatureRefresher>,
    send_interval: i64,
) {
    let mut failures = 0;
    let mut interval = Duration::seconds(send_interval);
    loop {
        trace!("Looping metrics");
        let envs = metrics_cache.get_metrics_by_environment();
        for (env, batch) in envs.iter() {
            let (use_new_endpoint, token) =
                decide_where_to_post(env, feature_refresher.tokens_to_refresh.clone());
            let batches = metrics_cache.get_appropriately_sized_env_batches(batch);
            trace!("Posting {} batches for {env}", batches.len());
            for batch in batches {
                if !batch.applications.is_empty()
                    || !batch.metrics.is_empty()
                    || !batch.impact_metrics.is_empty()
                {
                    let result = if use_new_endpoint {
                        feature_refresher
                            .unleash_client
                            .send_bulk_metrics_to_client_endpoint(batch.clone(), &token)
                            .await
                    } else {
                        feature_refresher
                            .unleash_client
                            .send_batch_metrics(batch.clone(), Some(interval.num_milliseconds()))
                            .await
                    };
                    if let Err(edge_error) = result {
                        match edge_error {
                            EdgeError::EdgeMetricsRequestError(status_code, message) => {
                                METRICS_UPSTREAM_HTTP_ERRORS
                                    .with_label_values(&[status_code.as_str()])
                                    .inc();
                                match status_code {
                                    StatusCode::PAYLOAD_TOO_LARGE => error!(
                                        "Metrics were too large. They were {}",
                                        size_of_batch(&batch)
                                    ),
                                    StatusCode::BAD_REQUEST => {
                                        error!(
                                            "Unleash said [{message:?}]. Dropping this metric bucket to avoid consuming too much memory"
                                        );
                                    }
                                    StatusCode::NOT_FOUND => {
                                        failures = 10;
                                        interval = new_interval(send_interval, failures);
                                        error!(
                                            "Upstream said we are trying to post to an endpoint that doesn't exist. backing off to {} seconds",
                                            interval.num_seconds()
                                        );
                                    }
                                    StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => {
                                        failures = 10;
                                        interval = new_interval(send_interval, failures);
                                        error!(
                                            "Upstream said we were not allowed to post metrics, backing off to {} seconds",
                                            interval.num_seconds()
                                        );
                                    }
                                    StatusCode::TOO_MANY_REQUESTS => {
                                        failures = max(10, failures + 1);
                                        interval = new_interval(send_interval, failures);
                                        info!(
                                            "Upstream said it was too busy, backing off to {} seconds",
                                            interval.num_seconds()
                                        );
                                        metrics_cache.reinsert_batch(batch);
                                    }
                                    StatusCode::INTERNAL_SERVER_ERROR
                                    | StatusCode::BAD_GATEWAY
                                    | StatusCode::SERVICE_UNAVAILABLE
                                    | StatusCode::GATEWAY_TIMEOUT => {
                                        failures = max(10, failures + 1);
                                        interval = new_interval(send_interval, failures);
                                        info!(
                                            "Upstream said it is struggling. It returned Http status {}. Backing off to {} seconds",
                                            status_code,
                                            interval.num_seconds()
                                        );
                                        metrics_cache.reinsert_batch(batch);
                                    }
                                    _ => {
                                        warn!(
                                            "Failed to send metrics. Status code was {status_code}. Will reinsert metrics for next attempt"
                                        );
                                        metrics_cache.reinsert_batch(batch);
                                    }
                                }
                            }
                            _ => {
                                warn!("Failed to send metrics: {edge_error:?}");
                                METRICS_UNEXPECTED_ERRORS.inc();
                            }
                        }
                    } else {
                        failures = max(0, failures - 1);
                        interval = new_interval(send_interval, failures);
                    }
                }
            }
        }
        trace!(
            "Done posting traces. Sleeping for {} seconds and then going again",
            interval.num_seconds()
        );
        METRICS_INTERVAL_BETWEEN_SEND.set(interval.num_seconds());
        tokio::time::sleep(std::time::Duration::from_secs(interval.num_seconds() as u64)).await;
    }
}

/// Linear backoff: each recorded failure adds one extra `send_interval` worth of seconds.
fn new_interval(send_interval: i64, failures: i64) -> Duration {
    let added_interval_from_failure = send_interval * failures;
    Duration::seconds(send_interval + added_interval_from_failure)
}

#[cfg(test)]
mod tests {
    use crate::http::background_send_metrics::new_interval;

    #[tokio::test]
    pub async fn new_interval_does_not_overflow() {
        let metrics = new_interval(300, 10);
        assert!(metrics.num_seconds() < 3305);
    }
}
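
The backoff used by send_metrics_task is linear: new_interval adds one extra send_interval per recorded failure, so with the 300 second base used in the existing test, ten failures give 300 + 300 * 10 = 3300 seconds, which is why the overflow test asserts a bound below 3305. A minimal additional test sketch, assuming it sits inside the same tests module so new_interval is in scope:

#[test]
fn backoff_interval_grows_linearly_with_failures() {
    // Assumed base interval of 300 seconds, matching the existing overflow test.
    assert_eq!(new_interval(300, 0).num_seconds(), 300); // no failures: plain interval
    assert_eq!(new_interval(300, 1).num_seconds(), 600); // one failure adds one extra interval
    assert_eq!(new_interval(300, 10).num_seconds(), 3300); // failures is pinned to 10 on 401/403/404
}

Since send_metrics_task loops forever, it is presumably spawned as a background task on the Tokio runtime (for example via tokio::spawn) by Edge's startup code; that wiring is outside this file.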