• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Unleash / unleash-edge / 16366014373

18 Jul 2025 08:30AM UTC coverage: 78.338% (+0.2%) from 78.139%
16366014373

push

github

web-flow
fix: shared reqwest client so keep alive timeout is not exceeded (#1045)

21 of 87 new or added lines in 3 files covered. (24.14%)

2 existing lines in 2 files now uncovered.

10719 of 13683 relevant lines covered (78.34%)

5491.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/server/src/http/instance_data.rs
1
use reqwest::{StatusCode, Url};
2
use std::sync::Arc;
3
use tokio::sync::RwLock;
4

5
use crate::cli::{CliArgs, EdgeMode};
6
use crate::error::EdgeError;
7
use crate::http::unleash_client::{ClientMetaInformation, UnleashClient};
8
use crate::metrics::edge_metrics::EdgeInstanceData;
9
use prometheus::Registry;
10
use tracing::{debug, warn};
11

12
#[derive(Debug, Clone)]
13
pub struct InstanceDataSender {
14
    pub unleash_client: Arc<UnleashClient>,
15
    pub registry: Registry,
16
    pub token: String,
17
    pub base_path: String,
18
}
19

20
#[derive(Debug, Clone)]
21
pub enum InstanceDataSending {
22
    SendNothing,
23
    SendInstanceData(InstanceDataSender),
24
}
25

26
impl InstanceDataSending {
27
    pub fn from_args(
×
28
        args: CliArgs,
×
NEW
29
        client_meta_information: &ClientMetaInformation,
×
NEW
30
        http_client: reqwest::Client,
×
31
        registry: Registry,
×
32
    ) -> Result<Self, EdgeError> {
×
33
        match args.mode {
×
NEW
34
            EdgeMode::Edge(edge_args) => edge_args
×
NEW
35
                .tokens
×
NEW
36
                .first()
×
NEW
37
                .map(|token| {
×
NEW
38
                    let unleash_client = Url::parse(&edge_args.upstream_url.clone())
×
NEW
39
                        .map(|url| {
×
NEW
40
                            UnleashClient::from_url(
×
NEW
41
                                url,
×
NEW
42
                                args.auth_headers
×
NEW
43
                                    .upstream_auth_header
×
NEW
44
                                    .clone()
×
NEW
45
                                    .unwrap_or("Authorization".to_string()),
×
NEW
46
                                http_client,
×
NEW
47
                                client_meta_information.clone(),
×
NEW
48
                            )
×
NEW
49
                        })
×
NEW
50
                        .map(|c| {
×
NEW
51
                            c.with_custom_client_headers(edge_args.custom_client_headers.clone())
×
NEW
52
                        })
×
NEW
53
                        .map(Arc::new)
×
NEW
54
                        .map_err(|_| EdgeError::InvalidServerUrl(edge_args.upstream_url.clone()))
×
NEW
55
                        .expect("Could not construct UnleashClient");
×
NEW
56
                    let instance_data_sender = InstanceDataSender {
×
NEW
57
                        unleash_client,
×
NEW
58
                        token: token.clone(),
×
NEW
59
                        base_path: args.http.base_path.clone(),
×
NEW
60
                        registry,
×
NEW
61
                    };
×
NEW
62
                    InstanceDataSending::SendInstanceData(instance_data_sender)
×
NEW
63
                })
×
NEW
64
                .map(Ok)
×
NEW
65
                .unwrap_or(Ok(InstanceDataSending::SendNothing)),
×
UNCOV
66
            _ => Ok(InstanceDataSending::SendNothing),
×
67
        }
68
    }
×
69
}
70

71
pub async fn send_instance_data(
×
72
    instance_data_sender: &InstanceDataSender,
×
73
    our_instance_data: Arc<EdgeInstanceData>,
×
74
    downstream_instance_data: Arc<RwLock<Vec<EdgeInstanceData>>>,
×
75
) -> Result<(), EdgeError> {
×
76
    let observed_data = our_instance_data.observe(
×
77
        &instance_data_sender.registry,
×
78
        downstream_instance_data.read().await.clone(),
×
79
        &instance_data_sender.base_path,
×
80
    );
×
81
    instance_data_sender
×
82
        .unleash_client
×
83
        .post_edge_observability_data(observed_data, &instance_data_sender.token)
×
84
        .await
×
85
}
×
86
pub async fn loop_send_instance_data(
×
87
    instance_data_sender: Arc<InstanceDataSending>,
×
88
    our_instance_data: Arc<EdgeInstanceData>,
×
89
    downstream_instance_data: Arc<RwLock<Vec<EdgeInstanceData>>>,
×
90
) {
×
91
    let mut errors = 0;
×
92
    let delay = std::time::Duration::from_secs(60);
×
93
    loop {
94
        tokio::time::sleep(std::time::Duration::from_secs(60) + delay * std::cmp::min(errors, 10))
×
95
            .await;
×
96
        match instance_data_sender.as_ref() {
×
97
            InstanceDataSending::SendNothing => {
98
                debug!("No instance data sender found. Doing nothing.");
×
99
                continue;
×
100
            }
101
            InstanceDataSending::SendInstanceData(instance_data_sender) => {
×
102
                let status = send_instance_data(
×
103
                    instance_data_sender,
×
104
                    our_instance_data.clone(),
×
105
                    downstream_instance_data.clone(),
×
106
                )
×
107
                .await;
×
108
                if let Err(e) = status {
×
109
                    match e {
×
110
                        EdgeError::EdgeMetricsRequestError(status, _) => {
×
111
                            if status == StatusCode::NOT_FOUND {
×
112
                                debug!(
×
113
                                    "Our upstream is not running a version that supports edge metrics."
×
114
                                );
115
                                errors += 1;
×
116
                                downstream_instance_data.write().await.clear();
×
117
                                our_instance_data.clear_time_windowed_metrics();
×
118
                            } else if status == StatusCode::FORBIDDEN {
×
119
                                warn!(
×
120
                                    "Upstream edge metrics said our token wasn't allowed to post data"
×
121
                                );
122
                                errors += 1;
×
123
                                downstream_instance_data.write().await.clear();
×
124
                                our_instance_data.clear_time_windowed_metrics();
×
125
                            }
×
126
                        }
127
                        _ => {
128
                            warn!("Failed to post instance data due to unknown error {e:?}");
×
129
                        }
130
                    }
131
                } else {
132
                    debug!("Successfully posted observability metrics.");
×
133
                    errors = 0;
×
134
                    downstream_instance_data.write().await.clear();
×
135
                    our_instance_data.clear_time_windowed_metrics();
×
136
                }
137
            }
138
        }
139
    }
140
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc