• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Unleash / unleash-edge / #1813

18 Mar 2025 03:24PM UTC coverage: 68.003% (+1.2%) from 66.841%
#1813

push

web-flow
feat: consumption metrics (#829)

137 of 175 new or added lines in 5 files covered. (78.29%)

1 existing line in 1 file now uncovered.

1934 of 2844 relevant lines covered (68.0%)

1.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/server/src/http/instance_data.rs
1
use chrono::Duration;
2
use reqwest::{StatusCode, Url};
3
use std::sync::Arc;
4
use tokio::sync::RwLock;
5

6
use crate::cli::{CliArgs, EdgeMode};
7
use crate::error::EdgeError;
8
use crate::http::unleash_client::{ClientMetaInformation, UnleashClient, new_reqwest_client};
9
use crate::metrics::edge_metrics::EdgeInstanceData;
10
use prometheus::Registry;
11
use tracing::{debug, warn};
12

13
#[derive(Debug, Clone)]
14
pub struct InstanceDataSender {
15
    pub unleash_client: Arc<UnleashClient>,
16
    pub registry: Registry,
17
    pub token: String,
18
    pub base_path: String,
19
}
20

21
#[derive(Debug, Clone)]
22
pub enum InstanceDataSending {
23
    SendNothing,
24
    SendInstanceData(InstanceDataSender),
25
}
26

27
impl InstanceDataSending {
28
    pub fn from_args(
×
29
        args: CliArgs,
30
        instance_data: Arc<EdgeInstanceData>,
31
        registry: Registry,
32
    ) -> Result<Self, EdgeError> {
33
        match args.mode {
×
34
            EdgeMode::Edge(edge_args) => {
×
35
                let identifier = instance_data.identifier.clone();
×
36
                edge_args
×
37
                    .tokens
38
                    .first()
39
                    .map(|token| {
×
40
                        let client_meta_information = ClientMetaInformation {
×
41
                            app_name: args.app_name,
×
42
                            instance_id: identifier.clone(),
×
43
                            connection_id: identifier,
×
44
                        };
45
                        let http_client = new_reqwest_client(
×
46
                            edge_args.skip_ssl_verification,
×
47
                            edge_args.client_identity.clone(),
×
48
                            edge_args.upstream_certificate_file.clone(),
×
49
                            Duration::seconds(edge_args.upstream_request_timeout),
×
50
                            Duration::seconds(edge_args.upstream_socket_timeout),
×
51
                            client_meta_information.clone(),
×
52
                        )
53
                        .expect(
54
                            "Could not construct reqwest client for posting observability data",
55
                        );
56
                        let unleash_client = Url::parse(&edge_args.upstream_url.clone())
×
57
                            .map(|url| {
×
58
                                UnleashClient::from_url(
×
59
                                    url,
×
60
                                    args.token_header.token_header.clone(),
×
61
                                    http_client,
×
62
                                    client_meta_information.clone(),
×
63
                                )
64
                            })
65
                            .map(|c| {
×
66
                                c.with_custom_client_headers(
×
67
                                    edge_args.custom_client_headers.clone(),
×
68
                                )
69
                            })
70
                            .map(Arc::new)
71
                            .map_err(|_| {
×
72
                                EdgeError::InvalidServerUrl(edge_args.upstream_url.clone())
×
73
                            })
74
                            .expect("Could not construct UnleashClient");
×
75
                        let instance_data_sender = InstanceDataSender {
×
76
                            unleash_client,
×
77
                            token: token.clone(),
×
78
                            base_path: args.http.base_path.clone(),
×
79
                            registry,
×
80
                        };
81
                        InstanceDataSending::SendInstanceData(instance_data_sender)
×
82
                    })
83
                    .map(Ok)
84
                    .unwrap_or(Ok(InstanceDataSending::SendNothing))
×
85
            }
86
            _ => Ok(InstanceDataSending::SendNothing),
×
87
        }
88
    }
89
}
90

91
pub async fn send_instance_data(
×
92
    instance_data_sender: &InstanceDataSender,
93
    our_instance_data: Arc<EdgeInstanceData>,
94
    downstream_instance_data: Arc<RwLock<Vec<EdgeInstanceData>>>,
95
) -> Result<(), EdgeError> {
96
    let observed_data = our_instance_data.observe(
×
97
        &instance_data_sender.registry,
×
98
        downstream_instance_data.read().await.clone(),
×
99
        &instance_data_sender.base_path,
×
100
    );
101
    instance_data_sender
×
102
        .unleash_client
103
        .post_edge_observability_data(observed_data, &instance_data_sender.token)
×
104
        .await
×
105
}
106
pub async fn loop_send_instance_data(
×
107
    instance_data_sender: Arc<InstanceDataSending>,
108
    our_instance_data: Arc<EdgeInstanceData>,
109
    downstream_instance_data: Arc<RwLock<Vec<EdgeInstanceData>>>,
110
) {
111
    let mut errors = 0;
×
112
    let delay = std::time::Duration::from_secs(60);
×
113
    loop {
×
114
        tokio::time::sleep(std::time::Duration::from_secs(60) + delay * std::cmp::min(errors, 10))
×
115
            .await;
×
116
        match instance_data_sender.as_ref() {
×
117
            InstanceDataSending::SendNothing => {
118
                debug!("No instance data sender found. Doing nothing.");
×
119
                continue;
120
            }
121
            InstanceDataSending::SendInstanceData(instance_data_sender) => {
×
122
                let status = send_instance_data(
123
                    instance_data_sender,
124
                    our_instance_data.clone(),
×
125
                    downstream_instance_data.clone(),
×
126
                )
127
                .await;
×
128
                if let Err(e) = status {
×
129
                    match e {
×
130
                        EdgeError::EdgeMetricsRequestError(status, _) => {
×
131
                            if status == StatusCode::NOT_FOUND {
×
132
                                debug!(
×
133
                                    "Our upstream is not running a version that supports edge metrics."
134
                                );
135
                                errors += 1;
×
136
                                downstream_instance_data.write().await.clear();
×
NEW
137
                                our_instance_data.clear_time_windowed_metrics();
×
138
                            } else if status == StatusCode::FORBIDDEN {
×
139
                                warn!(
×
140
                                    "Upstream edge metrics said our token wasn't allowed to post data"
141
                                );
142
                                errors += 1;
×
143
                                downstream_instance_data.write().await.clear();
×
NEW
144
                                our_instance_data.clear_time_windowed_metrics();
×
145
                            }
146
                        }
147
                        _ => {
148
                            warn!("Failed to post instance data due to unknown error {e:?}");
×
149
                        }
150
                    }
151
                } else {
152
                    debug!("Successfully posted observability metrics.");
×
153
                    errors = 0;
×
154
                    downstream_instance_data.write().await.clear();
×
NEW
155
                    our_instance_data.clear_time_windowed_metrics();
×
156
                }
157
            }
158
        }
159
    }
160
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc