• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Unleash / unleash-edge / 15441100167

04 Jun 2025 11:28AM UTC coverage: 78.265% (+10.3%) from 67.995%
15441100167

Pull #970

github

web-flow
Merge 8ad457ed6 into 34ad3228b
Pull Request #970: task(rust): Update Rust version to 1.87.0

10140 of 12956 relevant lines covered (78.26%)

158.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/server/src/http/instance_data.rs
1
use chrono::Duration;
2
use reqwest::{StatusCode, Url};
3
use std::sync::Arc;
4
use tokio::sync::RwLock;
5

6
use crate::cli::{CliArgs, EdgeMode};
7
use crate::error::EdgeError;
8
use crate::http::unleash_client::{ClientMetaInformation, UnleashClient, new_reqwest_client};
9
use crate::metrics::edge_metrics::EdgeInstanceData;
10
use prometheus::Registry;
11
use tracing::{debug, warn};
12

13
#[derive(Debug, Clone)]
14
pub struct InstanceDataSender {
15
    pub unleash_client: Arc<UnleashClient>,
16
    pub registry: Registry,
17
    pub token: String,
18
    pub base_path: String,
19
}
20

21
#[derive(Debug, Clone)]
22
pub enum InstanceDataSending {
23
    SendNothing,
24
    SendInstanceData(InstanceDataSender),
25
}
26

27
impl InstanceDataSending {
28
    pub fn from_args(
×
29
        args: CliArgs,
×
30
        instance_data: Arc<EdgeInstanceData>,
×
31
        registry: Registry,
×
32
    ) -> Result<Self, EdgeError> {
×
33
        match args.mode {
×
34
            EdgeMode::Edge(edge_args) => {
×
35
                let identifier = instance_data.identifier.clone();
×
36
                edge_args
×
37
                    .tokens
×
38
                    .first()
×
39
                    .map(|token| {
×
40
                        let client_meta_information = ClientMetaInformation {
×
41
                            app_name: args.app_name,
×
42
                            instance_id: identifier.clone(),
×
43
                            connection_id: identifier,
×
44
                        };
×
45
                        let http_client = new_reqwest_client(
×
46
                            edge_args.skip_ssl_verification,
×
47
                            edge_args.client_identity.clone(),
×
48
                            edge_args.upstream_certificate_file.clone(),
×
49
                            Duration::seconds(edge_args.upstream_request_timeout),
×
50
                            Duration::seconds(edge_args.upstream_socket_timeout),
×
51
                            client_meta_information.clone(),
×
52
                        )
×
53
                        .expect(
×
54
                            "Could not construct reqwest client for posting observability data",
×
55
                        );
×
56
                        let unleash_client = Url::parse(&edge_args.upstream_url.clone())
×
57
                            .map(|url| {
×
58
                                UnleashClient::from_url(
×
59
                                    url,
×
60
                                    args.auth_headers
×
61
                                        .upstream_auth_header
×
62
                                        .clone()
×
63
                                        .unwrap_or("Authorization".to_string()),
×
64
                                    http_client,
×
65
                                    client_meta_information.clone(),
×
66
                                )
×
67
                            })
×
68
                            .map(|c| {
×
69
                                c.with_custom_client_headers(
×
70
                                    edge_args.custom_client_headers.clone(),
×
71
                                )
×
72
                            })
×
73
                            .map(Arc::new)
×
74
                            .map_err(|_| {
×
75
                                EdgeError::InvalidServerUrl(edge_args.upstream_url.clone())
×
76
                            })
×
77
                            .expect("Could not construct UnleashClient");
×
78
                        let instance_data_sender = InstanceDataSender {
×
79
                            unleash_client,
×
80
                            token: token.clone(),
×
81
                            base_path: args.http.base_path.clone(),
×
82
                            registry,
×
83
                        };
×
84
                        InstanceDataSending::SendInstanceData(instance_data_sender)
×
85
                    })
×
86
                    .map(Ok)
×
87
                    .unwrap_or(Ok(InstanceDataSending::SendNothing))
×
88
            }
89
            _ => Ok(InstanceDataSending::SendNothing),
×
90
        }
91
    }
×
92
}
93

94
pub async fn send_instance_data(
×
95
    instance_data_sender: &InstanceDataSender,
×
96
    our_instance_data: Arc<EdgeInstanceData>,
×
97
    downstream_instance_data: Arc<RwLock<Vec<EdgeInstanceData>>>,
×
98
) -> Result<(), EdgeError> {
×
99
    let observed_data = our_instance_data.observe(
×
100
        &instance_data_sender.registry,
×
101
        downstream_instance_data.read().await.clone(),
×
102
        &instance_data_sender.base_path,
×
103
    );
×
104
    instance_data_sender
×
105
        .unleash_client
×
106
        .post_edge_observability_data(observed_data, &instance_data_sender.token)
×
107
        .await
×
108
}
×
109
pub async fn loop_send_instance_data(
×
110
    instance_data_sender: Arc<InstanceDataSending>,
×
111
    our_instance_data: Arc<EdgeInstanceData>,
×
112
    downstream_instance_data: Arc<RwLock<Vec<EdgeInstanceData>>>,
×
113
) {
×
114
    let mut errors = 0;
×
115
    let delay = std::time::Duration::from_secs(60);
×
116
    loop {
117
        tokio::time::sleep(std::time::Duration::from_secs(60) + delay * std::cmp::min(errors, 10))
×
118
            .await;
×
119
        match instance_data_sender.as_ref() {
×
120
            InstanceDataSending::SendNothing => {
121
                debug!("No instance data sender found. Doing nothing.");
×
122
                continue;
×
123
            }
124
            InstanceDataSending::SendInstanceData(instance_data_sender) => {
×
125
                let status = send_instance_data(
×
126
                    instance_data_sender,
×
127
                    our_instance_data.clone(),
×
128
                    downstream_instance_data.clone(),
×
129
                )
×
130
                .await;
×
131
                if let Err(e) = status {
×
132
                    match e {
×
133
                        EdgeError::EdgeMetricsRequestError(status, _) => {
×
134
                            if status == StatusCode::NOT_FOUND {
×
135
                                debug!(
×
136
                                    "Our upstream is not running a version that supports edge metrics."
×
137
                                );
138
                                errors += 1;
×
139
                                downstream_instance_data.write().await.clear();
×
140
                                our_instance_data.clear_time_windowed_metrics();
×
141
                            } else if status == StatusCode::FORBIDDEN {
×
142
                                warn!(
×
143
                                    "Upstream edge metrics said our token wasn't allowed to post data"
×
144
                                );
145
                                errors += 1;
×
146
                                downstream_instance_data.write().await.clear();
×
147
                                our_instance_data.clear_time_windowed_metrics();
×
148
                            }
×
149
                        }
150
                        _ => {
151
                            warn!("Failed to post instance data due to unknown error {e:?}");
×
152
                        }
153
                    }
154
                } else {
155
                    debug!("Successfully posted observability metrics.");
×
156
                    errors = 0;
×
157
                    downstream_instance_data.write().await.clear();
×
158
                    our_instance_data.clear_time_windowed_metrics();
×
159
                }
160
            }
161
        }
162
    }
163
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc