• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Unleash / unleash-edge / #1670

27 Feb 2025 08:30AM UTC coverage: 66.679% (-0.2%) from 66.885%
#1670

push

web-flow
feat: app_name and instance_id added as label to metrics (#780)

* feat: app_name and instance_id added as label to metrics

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>

25 of 47 new or added lines in 2 files covered. (53.19%)

1 existing line in 1 file now uncovered.

1749 of 2623 relevant lines covered (66.68%)

1.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/server/src/http/instance_data.rs
1
use chrono::Duration;
2
use reqwest::{StatusCode, Url};
3
use std::sync::Arc;
4
use tokio::sync::RwLock;
5

6
use crate::cli::{CliArgs, EdgeMode};
7
use crate::error::EdgeError;
8
use crate::http::unleash_client::{new_reqwest_client, ClientMetaInformation, UnleashClient};
9
use crate::metrics::edge_metrics::EdgeInstanceData;
10
use prometheus::Registry;
11
use tracing::{debug, warn};
12

13
/// Bundle of everything `send_instance_data` needs to post this instance's
/// observability data to the upstream.
#[derive(Debug, Clone)]
14
pub struct InstanceDataSender {
15
    /// Client on which `post_edge_observability_data` is called.
    pub unleash_client: Arc<UnleashClient>,
16
    /// Prometheus registry handed to `EdgeInstanceData::observe`.
    pub registry: Registry,
17
    /// Token passed along with every observability post; `from_args` fills it
    /// with the first configured token.
    pub token: String,
18
    /// Base path handed to `EdgeInstanceData::observe`; `from_args` copies it
    /// from `args.http.base_path`.
    pub base_path: String,
19
}
20

21
/// Whether this process posts instance data upstream, and with what.
#[derive(Debug, Clone)]
22
pub enum InstanceDataSending {
23
    /// Nothing is posted; `loop_send_instance_data` returns when it sees this variant.
    SendNothing,
24
    /// Instance data is posted using the contained sender.
    SendInstanceData(InstanceDataSender),
25
}
26

27
impl InstanceDataSending {
28
    pub fn from_args(
×
29
        args: CliArgs,
30
        instance_data: Arc<EdgeInstanceData>,
31
        registry: Registry,
32
    ) -> Result<Self, EdgeError> {
33
        match args.mode {
×
34
            EdgeMode::Edge(edge_args) => {
×
35
                let instance_id = instance_data.identifier.clone();
×
36
                edge_args
×
37
                    .tokens
38
                    .first()
39
                    .map(|token| {
×
40
                        let client_meta_information = ClientMetaInformation {
×
41
                            app_name: args.app_name,
×
42
                            instance_id,
×
43
                        };
44
                        let http_client = new_reqwest_client(
×
45
                            edge_args.skip_ssl_verification,
×
46
                            edge_args.client_identity.clone(),
×
47
                            edge_args.upstream_certificate_file.clone(),
×
48
                            Duration::seconds(edge_args.upstream_request_timeout),
×
49
                            Duration::seconds(edge_args.upstream_socket_timeout),
×
50
                            client_meta_information.clone(),
×
51
                        )
52
                        .expect(
53
                            "Could not construct reqwest client for posting observability data",
54
                        );
55
                        let unleash_client = Url::parse(&edge_args.upstream_url.clone())
×
56
                            .map(|url| {
×
57
                                UnleashClient::from_url(
×
58
                                    url,
×
59
                                    args.token_header.token_header.clone(),
×
60
                                    http_client,
×
NEW
61
                                    client_meta_information.clone(),
×
62
                                )
63
                            })
64
                            .map(|c| {
×
65
                                c.with_custom_client_headers(
×
66
                                    edge_args.custom_client_headers.clone(),
×
67
                                )
68
                            })
69
                            .map(Arc::new)
70
                            .map_err(|_| {
×
71
                                EdgeError::InvalidServerUrl(edge_args.upstream_url.clone())
×
72
                            })
73
                            .expect("Could not construct UnleashClient");
×
74
                        let instance_data_sender = InstanceDataSender {
×
75
                            unleash_client,
×
76
                            token: token.clone(),
×
77
                            base_path: args.http.base_path.clone(),
×
78
                            registry,
×
79
                        };
80
                        InstanceDataSending::SendInstanceData(instance_data_sender)
×
81
                    })
82
                    .map(Ok)
83
                    .unwrap_or(Ok(InstanceDataSending::SendNothing))
×
84
            }
85
            _ => Ok(InstanceDataSending::SendNothing),
×
86
        }
87
    }
88
}
89

90
pub async fn send_instance_data(
×
91
    instance_data_sender: &InstanceDataSender,
92
    our_instance_data: Arc<EdgeInstanceData>,
93
    downstream_instance_data: Arc<RwLock<Vec<EdgeInstanceData>>>,
94
) -> Result<(), EdgeError> {
95
    let observed_data = our_instance_data.observe(
×
96
        &instance_data_sender.registry,
×
97
        downstream_instance_data.read().await.clone(),
×
98
        &instance_data_sender.base_path,
×
99
    );
100
    instance_data_sender
×
101
        .unleash_client
102
        .post_edge_observability_data(observed_data, &instance_data_sender.token)
×
103
        .await
×
104
}
105
pub async fn loop_send_instance_data(
×
106
    instance_data_sender: Arc<InstanceDataSending>,
107
    our_instance_data: Arc<EdgeInstanceData>,
108
    downstream_instance_data: Arc<RwLock<Vec<EdgeInstanceData>>>,
109
) {
110
    let mut errors = 0;
×
111
    let delay = std::time::Duration::from_secs(60);
×
112
    loop {
×
113
        tokio::time::sleep(std::time::Duration::from_secs(60) + delay * std::cmp::min(errors, 10))
×
114
            .await;
×
115
        match instance_data_sender.as_ref() {
×
116
            InstanceDataSending::SendNothing => {
117
                debug!("No instance data sender found. Doing nothing.");
×
118
                return;
119
            }
120
            InstanceDataSending::SendInstanceData(instance_data_sender) => {
×
121
                let status = send_instance_data(
122
                    instance_data_sender,
123
                    our_instance_data.clone(),
×
124
                    downstream_instance_data.clone(),
×
125
                )
126
                .await;
×
127
                if let Err(e) = status {
×
128
                    match e {
×
129
                        EdgeError::EdgeMetricsRequestError(status, _) => {
×
130
                            if status == StatusCode::NOT_FOUND {
×
131
                                debug!("Our upstream is not running a version that supports edge metrics.");
×
132
                                errors += 1;
×
133
                                downstream_instance_data.write().await.clear();
×
134
                                our_instance_data.requests_since_last_report.clear();
×
135
                            } else if status == StatusCode::FORBIDDEN {
×
136
                                warn!("Upstream edge metrics said our token wasn't allowed to post data");
×
137
                                errors += 1;
×
138
                                downstream_instance_data.write().await.clear();
×
139
                                our_instance_data.requests_since_last_report.clear();
×
140
                            }
141
                        }
142
                        _ => {
143
                            warn!("Failed to post instance data due to unknown error {e:?}");
×
144
                        }
145
                    }
146
                } else {
147
                    debug!("Successfully posted observability metrics.");
×
148
                    errors = 0;
×
149
                    downstream_instance_data.write().await.clear();
×
150
                    our_instance_data.requests_since_last_report.clear();
×
151
                }
152
            }
153
        }
154
    }
155
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc