• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Unleash / unleash-edge / #1603

20 Feb 2025 01:15PM UTC coverage: 64.335% (-6.0%) from 70.33%
#1603

push

web-flow
feat: Add edge observability (#713)

get latency for own endpoints from prometheus
get latency for upstream endpoints from prometheus
get process stats from prometheus
instantiate on startup
---------

Co-authored-by: Nuno Góis <github@nunogois.com>

27 of 252 new or added lines in 6 files covered. (10.71%)

1 existing line in 1 file now uncovered.

1582 of 2459 relevant lines covered (64.34%)

1.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/server/src/http/instance_data.rs
1
use chrono::Duration;
2
use reqwest::{StatusCode, Url};
3
use std::sync::Arc;
4
use tokio::sync::RwLock;
5

6
use crate::cli::{CliArgs, EdgeMode};
7
use crate::error::EdgeError;
8
use crate::http::unleash_client::{new_reqwest_client, ClientMetaInformation, UnleashClient};
9
use crate::metrics::edge_metrics::EdgeInstanceData;
10
use prometheus::Registry;
11
use tracing::{debug, warn};
12

13
#[derive(Debug, Clone)]
14
pub struct InstanceDataSender {
15
    pub unleash_client: Arc<UnleashClient>,
16
    pub registry: Registry,
17
    pub token: String,
18
    pub base_path: String,
19
}
20

21
#[derive(Debug, Clone)]
22
pub enum InstanceDataSending {
23
    SendNothing,
24
    SendInstanceData(InstanceDataSender),
25
}
26

27
impl InstanceDataSending {
NEW
28
    pub fn from_args(
×
29
        args: CliArgs,
30
        instance_data: Arc<EdgeInstanceData>,
31
        registry: Registry,
32
    ) -> Result<Self, EdgeError> {
NEW
33
        match args.mode {
×
NEW
34
            EdgeMode::Edge(edge_args) => {
×
NEW
35
                let instance_id = instance_data.identifier.clone();
×
NEW
36
                edge_args
×
37
                    .tokens
38
                    .first()
NEW
39
                    .map(|token| {
×
NEW
40
                        let client_meta_information = ClientMetaInformation {
×
NEW
41
                            app_name: args.app_name,
×
NEW
42
                            instance_id,
×
43
                        };
NEW
44
                        let http_client = new_reqwest_client(
×
NEW
45
                            edge_args.skip_ssl_verification,
×
NEW
46
                            edge_args.client_identity.clone(),
×
NEW
47
                            edge_args.upstream_certificate_file.clone(),
×
NEW
48
                            Duration::seconds(edge_args.upstream_request_timeout),
×
NEW
49
                            Duration::seconds(edge_args.upstream_socket_timeout),
×
NEW
50
                            client_meta_information.clone(),
×
51
                        )
52
                        .expect(
53
                            "Could not construct reqwest client for posting observability data",
54
                        );
NEW
55
                        let unleash_client = Url::parse(&edge_args.upstream_url.clone())
×
NEW
56
                            .map(|url| {
×
NEW
57
                                UnleashClient::from_url(
×
NEW
58
                                    url,
×
NEW
59
                                    args.token_header.token_header.clone(),
×
NEW
60
                                    http_client,
×
61
                                )
62
                            })
NEW
63
                            .map(|c| {
×
NEW
64
                                c.with_custom_client_headers(
×
NEW
65
                                    edge_args.custom_client_headers.clone(),
×
66
                                )
67
                            })
68
                            .map(Arc::new)
NEW
69
                            .map_err(|_| {
×
NEW
70
                                EdgeError::InvalidServerUrl(edge_args.upstream_url.clone())
×
71
                            })
NEW
72
                            .expect("Could not construct UnleashClient");
×
NEW
73
                        let instance_data_sender = InstanceDataSender {
×
NEW
74
                            unleash_client,
×
NEW
75
                            token: token.clone(),
×
NEW
76
                            base_path: args.http.base_path.clone(),
×
NEW
77
                            registry,
×
78
                        };
NEW
79
                        InstanceDataSending::SendInstanceData(instance_data_sender)
×
80
                    })
81
                    .map(Ok)
NEW
82
                    .unwrap_or(Ok(InstanceDataSending::SendNothing))
×
83
            }
NEW
84
            _ => Ok(InstanceDataSending::SendNothing),
×
85
        }
86
    }
87
}
88

NEW
89
pub async fn send_instance_data(
×
90
    instance_data_sender: &InstanceDataSender,
91
    our_instance_data: Arc<EdgeInstanceData>,
92
    downstream_instance_data: Arc<RwLock<Vec<EdgeInstanceData>>>,
93
) -> Result<(), EdgeError> {
NEW
94
    let observed_data = our_instance_data.observe(
×
NEW
95
        &instance_data_sender.registry,
×
NEW
96
        downstream_instance_data.read().await.clone(),
×
NEW
97
        &instance_data_sender.base_path,
×
98
    );
NEW
99
    instance_data_sender
×
100
        .unleash_client
NEW
101
        .post_edge_observability_data(observed_data, &instance_data_sender.token)
×
NEW
102
        .await
×
103
}
NEW
104
pub async fn loop_send_instance_data(
×
105
    instance_data_sender: Arc<InstanceDataSending>,
106
    our_instance_data: Arc<EdgeInstanceData>,
107
    downstream_instance_data: Arc<RwLock<Vec<EdgeInstanceData>>>,
108
) {
NEW
109
    let mut errors = 0;
×
NEW
110
    let delay = std::time::Duration::from_secs(60);
×
NEW
111
    loop {
×
NEW
112
        tokio::time::sleep(std::time::Duration::from_secs(60) + delay * std::cmp::min(errors, 10))
×
NEW
113
            .await;
×
NEW
114
        match instance_data_sender.as_ref() {
×
115
            InstanceDataSending::SendNothing => {
NEW
116
                debug!("No instance data sender found. Doing nothing.");
×
117
                return;
118
            }
NEW
119
            InstanceDataSending::SendInstanceData(instance_data_sender) => {
×
120
                let status = send_instance_data(
121
                    instance_data_sender,
NEW
122
                    our_instance_data.clone(),
×
NEW
123
                    downstream_instance_data.clone(),
×
124
                )
NEW
125
                .await;
×
NEW
126
                if let Err(e) = status {
×
NEW
127
                    match e {
×
NEW
128
                        EdgeError::EdgeMetricsRequestError(status, _) => {
×
NEW
129
                            if status == StatusCode::NOT_FOUND {
×
NEW
130
                                debug!("Our upstream is not running a version that supports edge metrics.");
×
NEW
131
                                errors += 1;
×
NEW
132
                                downstream_instance_data.write().await.clear();
×
NEW
133
                            } else if status == StatusCode::FORBIDDEN {
×
NEW
134
                                warn!("Upstream edge metrics said our token wasn't allowed to post data");
×
NEW
135
                                errors += 1;
×
NEW
136
                                downstream_instance_data.write().await.clear();
×
137
                            }
138
                        }
139
                        _ => {
NEW
140
                            warn!("Failed to post instance data due to unknown error {e:?}");
×
141
                        }
142
                    }
143
                } else {
NEW
144
                    debug!("Successfully posted observability metrics.");
×
NEW
145
                    errors = 0;
×
NEW
146
                    downstream_instance_data.write().await.clear();
×
147
                }
148
            }
149
        }
150
    }
151
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc