• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Unleash / unleash-edge / #1670

27 Feb 2025 08:30AM UTC coverage: 66.679% (-0.2%) from 66.885%
#1670

push

web-flow
feat: app_name and instance_id added as label to metrics (#780)

* feat: app_name and instance_id added as label to metrics

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>

25 of 47 new or added lines in 2 files covered. (53.19%)

1 existing line in 1 file now uncovered.

1749 of 2623 relevant lines covered (66.68%)

1.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.71
/server/src/http/unleash_client.rs
1
use std::collections::HashMap;
2
use std::fs;
3
use std::path::PathBuf;
4
use std::str::FromStr;
5

6
use actix_web::http::header::EntityTag;
7
use chrono::Duration;
8
use chrono::Utc;
9
use lazy_static::lazy_static;
10
use prometheus::{register_histogram_vec, register_int_gauge_vec, HistogramVec, IntGaugeVec, Opts};
11
use reqwest::header::{HeaderMap, HeaderName};
12
use reqwest::{header, Client};
13
use reqwest::{ClientBuilder, Identity, RequestBuilder, StatusCode, Url};
14
use serde::{Deserialize, Serialize};
15
use tracing::{debug, error};
16
use tracing::{info, trace, warn};
17
use unleash_types::client_features::{ClientFeatures, ClientFeaturesDelta};
18
use unleash_types::client_metrics::ClientApplication;
19

20
use crate::cli::ClientIdentity;
21
use crate::error::EdgeError::EdgeMetricsRequestError;
22
use crate::error::{CertificateError, FeatureError};
23
use crate::http::headers::{
24
    UNLEASH_APPNAME_HEADER, UNLEASH_CLIENT_SPEC_HEADER, UNLEASH_INSTANCE_ID_HEADER,
25
};
26
use crate::metrics::client_metrics::MetricsBatch;
27
use crate::metrics::edge_metrics::EdgeInstanceData;
28
use crate::tls::build_upstream_certificate;
29
use crate::types::{
30
    ClientFeaturesDeltaResponse, ClientFeaturesResponse, EdgeResult, EdgeToken,
31
    TokenValidationStatus, ValidateTokensRequest,
32
};
33
use crate::urls::UnleashUrls;
34
use crate::{error::EdgeError, types::ClientFeaturesRequest};
35

36
lazy_static! {
37
    pub static ref CLIENT_REGISTER_FAILURES: IntGaugeVec = register_int_gauge_vec!(
4✔
38
        Opts::new(
1✔
39
            "client_register_failures",
40
            "Why we failed to register upstream"
41
        ),
42
        &["status_code", "app_name", "instance_id"]
43
    )
44
    .unwrap();
45
    pub static ref CLIENT_FEATURE_FETCH: HistogramVec = register_histogram_vec!(
4✔
46
        "client_feature_fetch",
47
        "Timings for fetching features in milliseconds",
48
        &["status_code", "app_name", "instance_id"],
49
        vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 5000.0]
2✔
50
    )
51
    .unwrap();
52
    pub static ref CLIENT_FEATURE_DELTA_FETCH: HistogramVec = register_histogram_vec!(
4✔
53
        "client_feature_delta_fetch",
54
        "Timings for fetching feature deltas in milliseconds",
55
        &["status_code", "app_name", "instance_id"],
56
        vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 5000.0]
2✔
57
    )
58
    .unwrap();
59
    pub static ref METRICS_UPLOAD: HistogramVec = register_histogram_vec!(
4✔
60
        "client_metrics_upload",
61
        "Timings for uploading client metrics in milliseconds",
62
        &["status_code", "app_name", "instance_id"],
63
        vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0]
2✔
64
    )
65
    .unwrap();
66
    pub static ref INSTANCE_DATA_UPLOAD: HistogramVec = register_histogram_vec!(
4✔
67
        "instance_data_upload",
68
        "Timings for uploading Edge instance data in milliseconds",
69
        &["status_code", "app_name", "instance_id"],
70
        vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0]
2✔
71
    )
72
    .unwrap();
73
    pub static ref CLIENT_FEATURE_FETCH_FAILURES: IntGaugeVec = register_int_gauge_vec!(
4✔
74
        Opts::new(
1✔
75
            "client_feature_fetch_failures",
76
            "Why we failed to fetch features"
77
        ),
78
        &["status_code", "app_name", "instance_id"]
79
    )
80
    .unwrap();
81
    pub static ref TOKEN_VALIDATION_FAILURES: IntGaugeVec = register_int_gauge_vec!(
4✔
82
        Opts::new(
1✔
83
            "token_validation_failures",
84
            "Why we failed to validate tokens"
85
        ),
86
        &["status_code", "app_name", "instance_id"]
87
    )
88
    .unwrap();
89
    pub static ref UPSTREAM_VERSION: IntGaugeVec = register_int_gauge_vec!(
4✔
90
        Opts::new(
1✔
91
            "upstream_version",
92
            "The server type (Unleash or Edge) and version of the upstream we're connected to"
93
        ),
94
        &["server", "version", "app_name", "instance_id"]
95
    )
96
    .unwrap();
97
}
98

99
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
100
pub struct ClientMetaInformation {
101
    pub app_name: String,
102
    pub instance_id: String,
103
}
104

105
impl Default for ClientMetaInformation {
106
    fn default() -> Self {
2✔
107
        Self {
108
            app_name: "unleash-edge".into(),
2✔
109
            instance_id: format!("unleash-edge@{}", ulid::Ulid::new().to_string()),
4✔
110
        }
111
    }
112
}
113

114
impl ClientMetaInformation {
115
    pub fn test_config() -> Self {
1✔
116
        Self {
117
            app_name: "test-app-name".into(),
1✔
118
            instance_id: "test-instance-id".into(),
1✔
119
        }
120
    }
121
}
122

123
#[derive(Clone, Debug, Default)]
124
pub struct UnleashClient {
125
    pub urls: UnleashUrls,
126
    backing_client: Client,
127
    custom_headers: HashMap<String, String>,
128
    token_header: String,
129
    meta_info: ClientMetaInformation,
130
}
131

132
fn load_pkcs12(id: &ClientIdentity) -> EdgeResult<Identity> {
1✔
133
    let pfx = fs::read(id.pkcs12_identity_file.clone().unwrap()).map_err(|e| {
2✔
134
        EdgeError::ClientCertificateError(CertificateError::Pkcs12ArchiveNotFound(format!("{e:?}")))
×
135
    })?;
136
    Identity::from_pkcs12_der(
137
        &pfx,
1✔
138
        &id.pkcs12_passphrase.clone().unwrap_or_else(|| "".into()),
1✔
139
    )
140
    .map_err(|e| {
1✔
141
        EdgeError::ClientCertificateError(CertificateError::Pkcs12IdentityGeneration(format!(
3✔
142
            "{e:?}"
143
        )))
144
    })
145
}
146

147
fn load_pkcs8(id: &ClientIdentity) -> EdgeResult<Identity> {
1✔
148
    let cert = fs::read(id.pkcs8_client_certificate_file.clone().unwrap()).map_err(|e| {
2✔
149
        EdgeError::ClientCertificateError(CertificateError::Pem8ClientCertNotFound(format!("{e:}")))
×
150
    })?;
151
    let key = fs::read(id.pkcs8_client_key_file.clone().unwrap()).map_err(|e| {
2✔
152
        EdgeError::ClientCertificateError(CertificateError::Pem8ClientKeyNotFound(format!("{e:?}")))
×
153
    })?;
154
    Identity::from_pkcs8_pem(&cert, &key).map_err(|e| {
2✔
155
        EdgeError::ClientCertificateError(CertificateError::Pem8IdentityGeneration(format!(
×
156
            "{e:?}"
157
        )))
158
    })
159
}
160

161
fn build_identity(tls: Option<ClientIdentity>) -> EdgeResult<ClientBuilder> {
3✔
162
    tls.map_or_else(
1✔
163
        || Ok(ClientBuilder::new()),
4✔
164
        |tls| {
1✔
165
            let req_identity = if tls.pkcs12_identity_file.is_some() {
2✔
166
                // We're going to assume that we're using pkcs#12
167
                load_pkcs12(&tls)
2✔
168
            } else if tls.pkcs8_client_certificate_file.is_some() {
2✔
169
                load_pkcs8(&tls)
2✔
170
            } else {
171
                Err(EdgeError::ClientCertificateError(
×
172
                    CertificateError::NoCertificateFiles,
×
173
                ))
174
            };
175
            req_identity.map(|id| ClientBuilder::new().identity(id))
3✔
176
        },
177
    )
178
}
179

180
pub fn new_reqwest_client(
3✔
181
    skip_ssl_verification: bool,
182
    client_identity: Option<ClientIdentity>,
183
    upstream_certificate_file: Option<PathBuf>,
184
    connect_timeout: Duration,
185
    socket_timeout: Duration,
186
    client_meta_information: ClientMetaInformation,
187
) -> EdgeResult<Client> {
188
    build_identity(client_identity)
4✔
189
        .and_then(|builder| {
5✔
190
            build_upstream_certificate(upstream_certificate_file).map(|cert| match cert {
8✔
191
                Some(c) => builder.add_root_certificate(c),
×
192
                None => builder,
2✔
193
            })
194
        })
195
        .and_then(|client| {
4✔
196
            let mut header_map = HeaderMap::new();
2✔
197
            header_map.insert(
4✔
198
                UNLEASH_APPNAME_HEADER,
199
                header::HeaderValue::from_str(&client_meta_information.app_name)
4✔
200
                    .expect("Could not add app name as a header"),
201
            );
202
            header_map.insert(
2✔
203
                UNLEASH_INSTANCE_ID_HEADER,
204
                header::HeaderValue::from_str(&client_meta_information.instance_id).unwrap(),
2✔
205
            );
206
            header_map.insert(
2✔
207
                UNLEASH_CLIENT_SPEC_HEADER,
208
                header::HeaderValue::from_static(unleash_yggdrasil::SUPPORTED_SPEC_VERSION),
1✔
209
            );
210

211
            client
14✔
212
                .user_agent(format!("unleash-edge-{}", crate::types::build::PKG_VERSION))
4✔
213
                .default_headers(header_map)
2✔
214
                .danger_accept_invalid_certs(skip_ssl_verification)
3✔
215
                .timeout(socket_timeout.to_std().unwrap())
6✔
216
                .connect_timeout(connect_timeout.to_std().unwrap())
2✔
217
                .build()
218
                .map_err(|e| EdgeError::ClientBuildError(format!("{e:?}")))
×
219
        })
220
}
221

222
#[derive(Clone, Debug, Serialize, Deserialize)]
223
pub struct EdgeTokens {
224
    pub tokens: Vec<EdgeToken>,
225
}
226

227
impl UnleashClient {
228
    pub fn from_url(
1✔
229
        server_url: Url,
230
        token_header: String,
231
        backing_client: Client,
232
        client_meta_information: ClientMetaInformation,
233
    ) -> Self {
234
        Self {
235
            urls: UnleashUrls::from_base_url(server_url),
1✔
236
            backing_client,
237
            custom_headers: Default::default(),
1✔
238
            token_header,
239
            meta_info: client_meta_information,
240
        }
241
    }
242

243
    pub fn new(server_url: &str, instance_id_opt: Option<String>) -> Result<Self, EdgeError> {
1✔
244
        use ulid::Ulid;
245

246
        let instance_id = instance_id_opt.unwrap_or_else(|| Ulid::new().to_string());
3✔
247
        let client_meta_info = ClientMetaInformation {
248
            instance_id,
249
            app_name: "test-client".into(),
1✔
250
        };
251
        Ok(Self {
3✔
252
            urls: UnleashUrls::from_str(server_url)?,
3✔
253
            backing_client: new_reqwest_client(
4✔
254
                false,
255
                None,
2✔
256
                None,
2✔
257
                Duration::seconds(5),
2✔
258
                Duration::seconds(5),
2✔
259
                client_meta_info.clone(),
2✔
260
            )
261
            .unwrap(),
262
            custom_headers: Default::default(),
1✔
263
            token_header: "Authorization".to_string(),
3✔
264
            meta_info: client_meta_info.clone(),
1✔
265
        })
266
    }
267

268
    #[cfg(test)]
269
    pub fn new_insecure(server_url: &str) -> Result<Self, EdgeError> {
1✔
270
        Ok(Self {
2✔
271
            urls: UnleashUrls::from_str(server_url)?,
1✔
272
            backing_client: new_reqwest_client(
2✔
273
                true,
274
                None,
1✔
275
                None,
1✔
276
                Duration::seconds(5),
1✔
277
                Duration::seconds(5),
1✔
278
                ClientMetaInformation::test_config(),
1✔
279
            )
280
            .unwrap(),
281
            custom_headers: Default::default(),
1✔
282
            token_header: "Authorization".to_string(),
1✔
283
            meta_info: ClientMetaInformation::test_config(),
1✔
284
        })
285
    }
286

287
    fn client_features_req(&self, req: ClientFeaturesRequest) -> RequestBuilder {
1✔
288
        let client_req = self
3✔
289
            .backing_client
290
            .get(self.urls.client_features_url.to_string())
2✔
291
            .headers(self.header_map(Some(req.api_key)));
2✔
292
        if let Some(tag) = req.etag {
2✔
293
            client_req.header(header::IF_NONE_MATCH, tag.to_string())
1✔
294
        } else {
295
            client_req
1✔
296
        }
297
    }
298

299
    fn client_features_delta_req(&self, req: ClientFeaturesRequest) -> RequestBuilder {
2✔
300
        let client_req = self
6✔
301
            .backing_client
302
            .get(self.urls.client_features_delta_url.to_string())
4✔
303
            .headers(self.header_map(Some(req.api_key)));
4✔
304
        if let Some(tag) = req.etag {
2✔
305
            client_req.header(header::IF_NONE_MATCH, tag.to_string())
1✔
306
        } else {
307
            client_req
2✔
308
        }
309
    }
310

311
    fn header_map(&self, api_key: Option<String>) -> HeaderMap {
1✔
312
        let mut header_map = HeaderMap::new();
1✔
313
        let token_header: HeaderName = HeaderName::from_str(self.token_header.as_str()).unwrap();
3✔
314
        if let Some(key) = api_key {
1✔
315
            header_map.insert(token_header, key.parse().unwrap());
2✔
316
        }
317
        for (header_name, header_value) in self.custom_headers.iter() {
3✔
318
            let key = HeaderName::from_str(header_name.as_str()).unwrap();
2✔
319
            header_map.insert(key, header_value.parse().unwrap());
2✔
320
        }
321
        header_map
1✔
322
    }
323

324
    pub fn with_custom_client_headers(self, custom_headers: Vec<(String, String)>) -> Self {
1✔
325
        Self {
326
            custom_headers: custom_headers.iter().cloned().collect(),
2✔
327
            ..self
328
        }
329
    }
330

331
    pub async fn register_as_client(
1✔
332
        &self,
333
        api_key: String,
334
        application: ClientApplication,
335
    ) -> EdgeResult<()> {
336
        self.backing_client
9✔
337
            .post(self.urls.client_register_app_url.to_string())
2✔
338
            .headers(self.header_map(Some(api_key)))
2✔
339
            .json(&application)
1✔
340
            .send()
341
            .await
4✔
342
            .map_err(|e| {
1✔
343
                warn!("Failed to register client: {e:?}");
5✔
344
                EdgeError::ClientRegisterError
1✔
345
            })
346
            .map(|r| {
2✔
347
                if !r.status().is_success() {
2✔
348
                    CLIENT_REGISTER_FAILURES
2✔
349
                        .with_label_values(&[
1✔
350
                            r.status().as_str(),
1✔
351
                            &self.meta_info.app_name,
1✔
352
                            &self.meta_info.instance_id,
1✔
353
                        ])
UNCOV
354
                        .inc();
×
355
                    warn!(
6✔
356
                        "Failed to register client upstream with status code {}",
357
                        r.status()
1✔
358
                    );
359
                }
360
            })
361
    }
362

363
    pub async fn get_client_features(
1✔
364
        &self,
365
        request: ClientFeaturesRequest,
366
    ) -> EdgeResult<ClientFeaturesResponse> {
367
        let start_time = Utc::now();
1✔
368
        let response = self
7✔
369
            .client_features_req(request.clone())
1✔
370
            .send()
371
            .await
4✔
372
            .map_err(|e| {
1✔
373
                warn!("Failed to fetch. Due to [{e:?}] - Will retry");
5✔
374
                match e.status() {
2✔
375
                    Some(s) => EdgeError::ClientFeaturesFetchError(FeatureError::Retriable(s)),
×
376
                    None => EdgeError::ClientFeaturesFetchError(FeatureError::NotFound),
1✔
377
                }
378
            })?;
379
        let stop_time = Utc::now();
1✔
380
        CLIENT_FEATURE_FETCH
3✔
381
            .with_label_values(&[
1✔
382
                &response.status().as_u16().to_string(),
1✔
383
                &self.meta_info.app_name,
1✔
384
                &self.meta_info.instance_id,
1✔
385
            ])
386
            .observe(
387
                stop_time
2✔
388
                    .signed_duration_since(start_time)
1✔
389
                    .num_milliseconds() as f64,
390
            );
391
        if response.status() == StatusCode::NOT_MODIFIED {
2✔
392
            Ok(ClientFeaturesResponse::NoUpdate(
1✔
393
                request.etag.expect("Got NOT_MODIFIED without an ETag"),
1✔
394
            ))
395
        } else if response.status().is_success() {
3✔
396
            let etag = response
3✔
397
                .headers()
398
                .get("ETag")
399
                .or_else(|| response.headers().get("etag"))
3✔
400
                .and_then(|etag| EntityTag::from_str(etag.to_str().unwrap()).ok());
3✔
401
            let features = response.json::<ClientFeatures>().await.map_err(|e| {
3✔
402
                warn!("Could not parse features response to internal representation");
×
403
                EdgeError::ClientFeaturesParseError(e.to_string())
×
404
            })?;
405
            Ok(ClientFeaturesResponse::Updated(features, etag))
1✔
406
        } else if response.status() == StatusCode::FORBIDDEN {
3✔
407
            CLIENT_FEATURE_FETCH_FAILURES
2✔
408
                .with_label_values(&[
1✔
409
                    response.status().as_str(),
1✔
410
                    &self.meta_info.app_name,
1✔
411
                    &self.meta_info.instance_id,
1✔
412
                ])
413
                .inc();
414
            Err(EdgeError::ClientFeaturesFetchError(
1✔
415
                FeatureError::AccessDenied,
1✔
416
            ))
417
        } else if response.status() == StatusCode::UNAUTHORIZED {
×
418
            CLIENT_FEATURE_FETCH_FAILURES
×
NEW
419
                .with_label_values(&[
×
NEW
420
                    response.status().as_str(),
×
NEW
421
                    &self.meta_info.app_name,
×
NEW
422
                    &self.meta_info.instance_id,
×
423
                ])
424
                .inc();
425
            warn!(
×
426
                "Failed to get features. Url: [{}]. Status code: [401]",
427
                self.urls.client_features_url.to_string()
428
            );
429
            Err(EdgeError::ClientFeaturesFetchError(
×
430
                FeatureError::AccessDenied,
×
431
            ))
432
        } else if response.status() == StatusCode::NOT_FOUND {
×
433
            CLIENT_FEATURE_FETCH_FAILURES
×
NEW
434
                .with_label_values(&[
×
NEW
435
                    response.status().as_str(),
×
NEW
436
                    &self.meta_info.app_name,
×
NEW
437
                    &self.meta_info.instance_id,
×
438
                ])
439
                .inc();
440
            warn!(
×
441
                "Failed to get features. Url: [{}]. Status code: [{}]",
442
                self.urls.client_features_url.to_string(),
443
                response.status().as_str()
444
            );
445
            Err(EdgeError::ClientFeaturesFetchError(FeatureError::NotFound))
×
446
        } else {
447
            CLIENT_FEATURE_FETCH_FAILURES
×
NEW
448
                .with_label_values(&[
×
NEW
449
                    response.status().as_str(),
×
NEW
450
                    &self.meta_info.app_name,
×
NEW
451
                    &self.meta_info.instance_id,
×
452
                ])
453
                .inc();
454
            Err(EdgeError::ClientFeaturesFetchError(
×
455
                FeatureError::Retriable(response.status()),
×
456
            ))
457
        }
458
    }
459

460
    pub async fn get_client_features_delta(
2✔
461
        &self,
462
        request: ClientFeaturesRequest,
463
    ) -> EdgeResult<ClientFeaturesDeltaResponse> {
464
        let start_time = Utc::now();
2✔
465
        let response = self
9✔
466
            .client_features_delta_req(request.clone())
2✔
467
            .send()
468
            .await
4✔
469
            .map_err(|e| {
1✔
470
                warn!("Failed to fetch. Due to [{e:?}] - Will retry");
5✔
471
                match e.status() {
2✔
472
                    Some(s) => EdgeError::ClientFeaturesFetchError(FeatureError::Retriable(s)),
×
473
                    None => EdgeError::ClientFeaturesFetchError(FeatureError::NotFound),
1✔
474
                }
475
            })?;
476
        let stop_time = Utc::now();
1✔
477
        CLIENT_FEATURE_DELTA_FETCH
3✔
478
            .with_label_values(&[
1✔
479
                &response.status().as_u16().to_string(),
1✔
480
                &self.meta_info.app_name,
1✔
481
                &self.meta_info.instance_id,
1✔
482
            ])
483
            .observe(
484
                stop_time
2✔
485
                    .signed_duration_since(start_time)
1✔
486
                    .num_milliseconds() as f64,
487
            );
488
        if response.status() == StatusCode::NOT_MODIFIED {
1✔
489
            Ok(ClientFeaturesDeltaResponse::NoUpdate(
×
490
                request.etag.expect("Got NOT_MODIFIED without an ETag"),
×
491
            ))
492
        } else if response.status().is_success() {
3✔
493
            let etag = response
3✔
494
                .headers()
495
                .get("ETag")
496
                .or_else(|| response.headers().get("etag"))
1✔
497
                .and_then(|etag| EntityTag::from_str(etag.to_str().unwrap()).ok());
3✔
498
            let features = response.json::<ClientFeaturesDelta>().await.map_err(|e| {
2✔
499
                warn!("Could not parse features response to internal representation");
×
500
                EdgeError::ClientFeaturesParseError(e.to_string())
×
501
            })?;
502
            Ok(ClientFeaturesDeltaResponse::Updated(features, etag))
1✔
503
        } else if response.status() == StatusCode::FORBIDDEN {
×
504
            CLIENT_FEATURE_FETCH_FAILURES
×
NEW
505
                .with_label_values(&[response.status().as_str(), &self.meta_info.app_name])
×
506
                .inc();
507
            Err(EdgeError::ClientFeaturesFetchError(
×
508
                FeatureError::AccessDenied,
×
509
            ))
510
        } else if response.status() == StatusCode::UNAUTHORIZED {
×
511
            CLIENT_FEATURE_FETCH_FAILURES
×
512
                .with_label_values(&[response.status().as_str()])
×
513
                .inc();
514
            warn!(
×
515
                "Failed to get features. Url: [{}]. Status code: [401]",
516
                self.urls.client_features_delta_url.to_string()
517
            );
518
            Err(EdgeError::ClientFeaturesFetchError(
×
519
                FeatureError::AccessDenied,
×
520
            ))
521
        } else if response.status() == StatusCode::NOT_FOUND {
×
522
            CLIENT_FEATURE_FETCH_FAILURES
×
523
                .with_label_values(&[response.status().as_str()])
×
524
                .inc();
525
            warn!(
×
526
                "Failed to get features. Url: [{}]. Status code: [{}]",
527
                self.urls.client_features_delta_url.to_string(),
528
                response.status().as_str()
529
            );
530
            Err(EdgeError::ClientFeaturesFetchError(FeatureError::NotFound))
×
531
        } else {
532
            CLIENT_FEATURE_FETCH_FAILURES
×
533
                .with_label_values(&[response.status().as_str()])
×
534
                .inc();
535
            Err(EdgeError::ClientFeaturesFetchError(
×
536
                FeatureError::Retriable(response.status()),
×
537
            ))
538
        }
539
    }
540

541
    pub async fn send_batch_metrics(&self, request: MetricsBatch) -> EdgeResult<()> {
×
542
        trace!("Sending metrics to old /edge/metrics endpoint");
×
543
        let result = self
×
544
            .backing_client
545
            .post(self.urls.edge_metrics_url.to_string())
×
546
            .headers(self.header_map(None))
×
547
            .json(&request)
×
548
            .send()
549
            .await
×
550
            .map_err(|e| {
×
551
                info!("Failed to send batch metrics: {e:?}");
×
552
                EdgeError::EdgeMetricsError
×
553
            })?;
554
        if result.status().is_success() {
×
555
            Ok(())
×
556
        } else {
557
            match result.status() {
×
558
                StatusCode::BAD_REQUEST => Err(EdgeError::EdgeMetricsRequestError(
×
559
                    result.status(),
×
560
                    result.json().await.ok(),
×
561
                )),
562
                _ => Err(EdgeMetricsRequestError(result.status(), None)),
×
563
            }
564
        }
565
    }
566

567
    pub async fn send_bulk_metrics_to_client_endpoint(
1✔
568
        &self,
569
        request: MetricsBatch,
570
        token: &str,
571
    ) -> EdgeResult<()> {
572
        trace!("Sending metrics to bulk endpoint");
5✔
573
        let started_at = Utc::now();
1✔
574
        let result = self
8✔
575
            .backing_client
576
            .post(self.urls.client_bulk_metrics_url.to_string())
2✔
577
            .headers(self.header_map(Some(token.to_string())))
2✔
578
            .json(&request)
1✔
579
            .send()
580
            .await
4✔
581
            .map_err(|e| {
×
582
                info!("Failed to send metrics to /api/client/metrics/bulk endpoint {e:?}");
×
583
                EdgeError::EdgeMetricsError
×
584
            })?;
585
        let ended = Utc::now();
1✔
586
        METRICS_UPLOAD
3✔
587
            .with_label_values(&[
1✔
588
                result.status().as_str(),
1✔
589
                &self.meta_info.app_name,
1✔
590
                &self.meta_info.instance_id,
1✔
591
            ])
592
            .observe(ended.signed_duration_since(started_at).num_milliseconds() as f64);
2✔
593
        if result.status().is_success() {
2✔
594
            Ok(())
1✔
595
        } else {
596
            match result.status() {
2✔
597
                StatusCode::BAD_REQUEST => Err(EdgeMetricsRequestError(
×
598
                    result.status(),
×
599
                    result.json().await.ok(),
×
600
                )),
601
                _ => Err(EdgeMetricsRequestError(result.status(), None)),
2✔
602
            }
603
        }
604
    }
605

606
    #[tracing::instrument(skip(self, instance_data, token))]
607
    pub async fn post_edge_observability_data(
608
        &self,
609
        instance_data: EdgeInstanceData,
610
        token: &str,
611
    ) -> EdgeResult<()> {
612
        let started_at = Utc::now();
×
613
        let result = self
×
614
            .backing_client
615
            .post(self.urls.edge_instance_data_url.to_string())
×
616
            .headers(self.header_map(Some(token.into())))
×
617
            .timeout(Duration::seconds(3).to_std().unwrap())
×
618
            .json(&instance_data)
×
619
            .send()
620
            .await
×
621
            .map_err(|e| {
×
622
                info!("Failed to send instance data: {e:?}");
×
623
                EdgeError::EdgeMetricsError
×
624
            })?;
625
        let ended_at = Utc::now();
×
626
        INSTANCE_DATA_UPLOAD
×
NEW
627
            .with_label_values(&[
×
NEW
628
                result.status().as_str(),
×
NEW
629
                &self.meta_info.app_name,
×
NEW
630
                &self.meta_info.instance_id,
×
631
            ])
632
            .observe(
633
                ended_at
×
634
                    .signed_duration_since(started_at)
×
635
                    .num_milliseconds() as f64,
636
            );
637
        let r = if result.status().is_success() {
×
638
            Ok(())
×
639
        } else {
640
            match result.status() {
×
641
                StatusCode::BAD_REQUEST => Err(EdgeMetricsRequestError(
×
642
                    result.status(),
×
643
                    result.json().await.ok(),
×
644
                )),
645
                _ => Err(EdgeMetricsRequestError(result.status(), None)),
×
646
            }
647
        };
648
        debug!("Sent instance data to upstream server");
×
649
        r
×
650
    }
651

652
    pub async fn validate_tokens(
1✔
653
        &self,
654
        request: ValidateTokensRequest,
655
    ) -> EdgeResult<Vec<EdgeToken>> {
656
        let check_api_suffix = || {
1✔
657
            let base_url = self.urls.base_url.to_string();
×
658
            if base_url.ends_with("/api") || base_url.ends_with("/api/") {
×
659
                error!("Try passing the instance URL without '/api'.");
×
660
            }
661
        };
662

663
        let result = self
9✔
664
            .backing_client
665
            .post(self.urls.edge_validate_url.to_string())
2✔
666
            .headers(self.header_map(None))
2✔
667
            .json(&request)
1✔
668
            .send()
669
            .await
4✔
670
            .map_err(|e| {
1✔
671
                info!("Failed to validate tokens: [{e:?}]");
5✔
672
                EdgeError::EdgeTokenError
1✔
673
            })?;
674
        match result.status() {
2✔
675
            StatusCode::OK => {
676
                let token_response = result.json::<EdgeTokens>().await.map_err(|e| {
2✔
677
                    warn!("Failed to parse validation response with error: {e:?}");
×
678
                    EdgeError::EdgeTokenParseError
×
679
                })?;
680
                Ok(token_response
2✔
681
                    .tokens
682
                    .into_iter()
683
                    .map(|t| {
1✔
684
                        let remaining_info =
1✔
685
                            EdgeToken::try_from(t.token.clone()).unwrap_or_else(|_| t.clone());
2✔
686
                        EdgeToken {
1✔
687
                            token: t.token.clone(),
1✔
688
                            token_type: t.token_type,
1✔
689
                            environment: t.environment.or(remaining_info.environment),
1✔
690
                            projects: t.projects,
1✔
691
                            status: TokenValidationStatus::Validated,
1✔
692
                        }
693
                    })
694
                    .collect())
695
            }
696
            s => {
×
697
                TOKEN_VALIDATION_FAILURES
×
NEW
698
                    .with_label_values(&[
×
NEW
699
                        result.status().as_str(),
×
NEW
700
                        &self.meta_info.app_name,
×
NEW
701
                        &self.meta_info.instance_id,
×
702
                    ])
703
                    .inc();
704
                error!(
×
705
                    "Failed to validate tokens. Requested url: [{}]. Got status: {:?}",
706
                    self.urls.edge_validate_url.to_string(),
707
                    s
708
                );
709
                check_api_suffix();
×
710
                Err(EdgeError::TokenValidationError(
×
711
                    reqwest::StatusCode::from_u16(s.as_u16()).unwrap(),
×
712
                ))
713
            }
714
        }
715
    }
716
}
717

718
#[cfg(test)]
719
mod tests {
720
    use std::path::PathBuf;
721
    use std::str::FromStr;
722

723
    use crate::cli::ClientIdentity;
724
    use crate::http::unleash_client::new_reqwest_client;
725
    use crate::{
726
        cli::TlsOptions,
727
        middleware::as_async_middleware::as_async_middleware,
728
        tls,
729
        types::{
730
            ClientFeaturesRequest, ClientFeaturesResponse, EdgeToken, TokenValidationStatus,
731
            ValidateTokensRequest,
732
        },
733
    };
734
    use actix_http::{body::MessageBody, HttpService, TlsAcceptorConfig};
735
    use actix_http_test::{test_server, TestServer};
736
    use actix_middleware_etag::Etag;
737
    use actix_service::map_config;
738
    use actix_web::{
739
        dev::{AppConfig, ServiceRequest, ServiceResponse},
740
        http::header::EntityTag,
741
        web, App, HttpResponse,
742
    };
743
    use chrono::Duration;
744
    use unleash_types::client_features::{ClientFeature, ClientFeatures};
745

746
    use super::{ClientMetaInformation, EdgeTokens, UnleashClient};
747

748
    impl ClientFeaturesRequest {
749
        pub(crate) fn new(api_key: String, etag: Option<String>) -> Self {
750
            Self {
751
                api_key,
752
                etag: etag.map(EntityTag::new_weak),
753
            }
754
        }
755
    }
756

757
    const TEST_TOKEN: &str = "[]:development.08bce4267a3b1aa";
758
    fn two_client_features() -> ClientFeatures {
759
        ClientFeatures {
760
            version: 2,
761
            features: vec![
762
                ClientFeature {
763
                    name: "test1".into(),
764
                    feature_type: Some("release".into()),
765
                    ..Default::default()
766
                },
767
                ClientFeature {
768
                    name: "test2".into(),
769
                    feature_type: Some("release".into()),
770
                    ..Default::default()
771
                },
772
            ],
773
            segments: None,
774
            query: None,
775
            meta: None,
776
        }
777
    }
778

779
    async fn return_client_features() -> HttpResponse {
780
        HttpResponse::Ok().json(two_client_features())
781
    }
782

783
    async fn return_validate_tokens() -> HttpResponse {
784
        HttpResponse::Ok().json(EdgeTokens {
785
            tokens: vec![EdgeToken {
786
                token: TEST_TOKEN.into(),
787
                ..Default::default()
788
            }],
789
        })
790
    }
791

792
    async fn test_features_server() -> TestServer {
793
        test_server(move || {
794
            HttpService::new(map_config(
795
                App::new()
796
                    .wrap(Etag)
797
                    .service(
798
                        web::resource("/api/client/features")
799
                            .route(web::get().to(return_client_features)),
800
                    )
801
                    .service(
802
                        web::resource("/edge/validate")
803
                            .route(web::post().to(return_validate_tokens)),
804
                    )
805
                    .service(
806
                        web::resource("/api/edge/validate")
807
                            .route(web::post().to(HttpResponse::Forbidden)),
808
                    ),
809
                |_| AppConfig::default(),
810
            ))
811
            .tcp()
812
        })
813
        .await
814
    }
815

816
    async fn test_features_server_with_untrusted_ssl() -> TestServer {
817
        test_server(move || {
818
            let tls_options = TlsOptions {
819
                tls_server_cert: Some("../examples/server.crt".into()),
820
                tls_enable: true,
821
                tls_server_key: Some("../examples/server.key".into()),
822
                tls_server_port: 443,
823
            };
824
            let server_config = tls::config(tls_options).unwrap();
825
            let tls_acceptor_config =
826
                TlsAcceptorConfig::default().handshake_timeout(std::time::Duration::from_secs(5));
827
            HttpService::new(map_config(
828
                App::new()
829
                    .wrap(Etag)
830
                    .service(
831
                        web::resource("/api/client/features")
832
                            .route(web::get().to(return_client_features)),
833
                    )
834
                    .service(
835
                        web::resource("/edge/validate")
836
                            .route(web::post().to(return_validate_tokens)),
837
                    ),
838
                |_| AppConfig::default(),
839
            ))
840
            .rustls_0_23_with_config(server_config, tls_acceptor_config)
841
        })
842
        .await
843
    }
844

845
    async fn validate_api_key_middleware(
846
        req: ServiceRequest,
847
        srv: crate::middleware::as_async_middleware::Next<impl MessageBody + 'static>,
848
    ) -> Result<ServiceResponse<impl MessageBody>, actix_web::Error> {
849
        let res = if req
850
            .headers()
851
            .get("X-Api-Key")
852
            .map(|key| key.to_str().unwrap() == "MyMagicKey")
853
            .unwrap_or(false)
854
        {
855
            srv.call(req).await?.map_into_left_body()
856
        } else {
857
            req.into_response(HttpResponse::Forbidden().finish())
858
                .map_into_right_body()
859
        };
860
        Ok(res)
861
    }
862

863
    async fn test_features_server_with_required_custom_header() -> TestServer {
864
        test_server(move || {
865
            HttpService::new(map_config(
866
                App::new()
867
                    .wrap(Etag)
868
                    .wrap(as_async_middleware(validate_api_key_middleware))
869
                    .service(
870
                        web::resource("/api/client/features")
871
                            .route(web::get().to(return_client_features)),
872
                    )
873
                    .service(
874
                        web::resource("/edge/validate")
875
                            .route(web::post().to(return_validate_tokens)),
876
                    ),
877
                |_| AppConfig::default(),
878
            ))
879
            .tcp()
880
        })
881
        .await
882
    }
883

884
    fn expected_etag(features: ClientFeatures) -> String {
885
        let hash = features.xx3_hash().unwrap();
886
        let len = serde_json::to_string(&features)
887
            .map(|string| string.len())
888
            .unwrap();
889
        format!("{len:x}-{hash}")
890
    }
891

892
    #[actix_web::test]
893
    async fn client_can_get_features() {
894
        let srv = test_features_server().await;
895
        let tag = EntityTag::new_weak(expected_etag(two_client_features()));
896
        let client = UnleashClient::new(srv.url("/").as_str(), None).unwrap();
897
        let client_features_result = client
898
            .get_client_features(ClientFeaturesRequest::new("somekey".to_string(), None))
899
            .await;
900
        assert!(client_features_result.is_ok());
901
        let client_features_response = client_features_result.unwrap();
902
        match client_features_response {
903
            ClientFeaturesResponse::Updated(f, e) => {
904
                assert!(e.is_some());
905
                assert_eq!(e.unwrap(), tag);
906
                assert!(!f.features.is_empty());
907
            }
908
            _ => panic!("Got no update when expecting an update"),
909
        }
910
    }
911

912
    #[actix_web::test]
913
    async fn client_handles_304() {
914
        let srv = test_features_server().await;
915
        let tag = expected_etag(two_client_features());
916
        let client = UnleashClient::new(srv.url("/").as_str(), None).unwrap();
917
        let client_features_result = client
918
            .get_client_features(ClientFeaturesRequest::new(
919
                TEST_TOKEN.to_string(),
920
                Some(tag.clone()),
921
            ))
922
            .await;
923
        assert!(client_features_result.is_ok());
924
        let client_features_response = client_features_result.unwrap();
925
        match client_features_response {
926
            ClientFeaturesResponse::NoUpdate(t) => {
927
                assert_eq!(t, EntityTag::new_weak(tag));
928
            }
929
            _ => panic!("Got an update when no update was expected"),
930
        }
931
    }
932

933
    #[actix_web::test]
934
    async fn can_validate_token() {
935
        let srv = test_features_server().await;
936
        let client = UnleashClient::new(srv.url("/").as_str(), None).unwrap();
937
        let validate_result = client
938
            .validate_tokens(ValidateTokensRequest {
939
                tokens: vec![TEST_TOKEN.to_string()],
940
            })
941
            .await;
942
        match validate_result {
943
            Ok(token_status) => {
944
                assert_eq!(token_status.len(), 1);
945
                let validated_token = token_status.first().unwrap();
946
                assert_eq!(validated_token.status, TokenValidationStatus::Validated);
947
                assert_eq!(validated_token.environment, Some("development".into()))
948
            }
949
            Err(e) => {
950
                panic!("Error validating token: {e}");
951
            }
952
        }
953
    }
954

955
    #[test]
956
    pub fn can_parse_entity_tag() {
957
        let etag = EntityTag::from_str("W/\"b5e6-DPC/1RShRw1J/jtxvRtTo1jf4+o\"").unwrap();
958
        assert!(etag.weak);
959
    }
960

961
    #[test]
962
    pub fn parse_entity_tag() {
963
        let optimal_304_tag = EntityTag::from_str("\"76d8bb0e:2841\"");
964
        assert!(optimal_304_tag.is_ok());
965
    }
966

967
    #[actix_web::test]
968
    pub async fn custom_client_headers_are_sent_along() {
969
        let custom_headers = vec![("X-Api-Key".to_string(), "MyMagicKey".to_string())];
970
        let srv = test_features_server_with_required_custom_header().await;
971
        let client_without_extra_headers = UnleashClient::new(srv.url("/").as_str(), None).unwrap();
972
        let client_with_headers = client_without_extra_headers
973
            .clone()
974
            .with_custom_client_headers(custom_headers);
975
        let res = client_without_extra_headers
976
            .get_client_features(ClientFeaturesRequest {
977
                api_key: "notneeded".into(),
978
                etag: None,
979
            })
980
            .await;
981
        assert!(res.is_err());
982
        let authed_res = client_with_headers
983
            .get_client_features(ClientFeaturesRequest {
984
                api_key: "notneeded".into(),
985
                etag: None,
986
            })
987
            .await;
988
        assert!(authed_res.is_ok());
989
    }
990

991
    #[actix_web::test]
992
    pub async fn disabling_ssl_verification_allows_communicating_with_upstream_unleash_with_self_signed_cert(
993
    ) {
994
        let srv = test_features_server_with_untrusted_ssl().await;
995
        let client = UnleashClient::new_insecure(srv.surl("/").as_str()).unwrap();
996

997
        let validate_result = client
998
            .validate_tokens(ValidateTokensRequest {
999
                tokens: vec![TEST_TOKEN.to_string()],
1000
            })
1001
            .await;
1002

1003
        assert!(validate_result.is_ok());
1004
    }
1005

1006
    #[actix_web::test]
1007
    pub async fn not_disabling_ssl_verification_fails_communicating_with_upstream_unleash_with_self_signed_cert(
1008
    ) {
1009
        let srv = test_features_server_with_untrusted_ssl().await;
1010
        let client = UnleashClient::new(srv.surl("/").as_str(), None).unwrap();
1011

1012
        let validate_result = client
1013
            .validate_tokens(ValidateTokensRequest {
1014
                tokens: vec![TEST_TOKEN.to_string()],
1015
            })
1016
            .await;
1017

1018
        assert!(validate_result.is_err());
1019
    }
1020

1021
    #[cfg(target_os = "linux")]
1022
    #[test]
1023
    pub fn can_instantiate_pkcs_12_client() {
1024
        let pfx = "./testdata/pkcs12/snakeoil.pfx";
1025
        let passphrase = "password";
1026
        let identity = ClientIdentity {
1027
            pkcs8_client_certificate_file: None,
1028
            pkcs8_client_key_file: None,
1029
            pkcs12_identity_file: Some(PathBuf::from(pfx)),
1030
            pkcs12_passphrase: Some(passphrase.into()),
1031
        };
1032
        let client = new_reqwest_client(
1033
            false,
1034
            Some(identity),
1035
            None,
1036
            Duration::seconds(5),
1037
            Duration::seconds(5),
1038
            ClientMetaInformation {
1039
                app_name: "test-client".into(),
1040
                instance_id: "test-pkcs12".into(),
1041
            },
1042
        );
1043
        assert!(client.is_ok());
1044
    }
1045

1046
    #[test]
1047
    pub fn should_throw_error_if_wrong_passphrase_to_pfx_file() {
1048
        let pfx = "./testdata/pkcs12/snakeoil.pfx";
1049
        let passphrase = "wrongpassword";
1050
        let identity = ClientIdentity {
1051
            pkcs8_client_certificate_file: None,
1052
            pkcs8_client_key_file: None,
1053
            pkcs12_identity_file: Some(PathBuf::from(pfx)),
1054
            pkcs12_passphrase: Some(passphrase.into()),
1055
        };
1056
        let client = new_reqwest_client(
1057
            false,
1058
            Some(identity),
1059
            None,
1060
            Duration::seconds(5),
1061
            Duration::seconds(5),
1062
            ClientMetaInformation {
1063
                app_name: "test-client".into(),
1064
                instance_id: "test-pkcs12".into(),
1065
            },
1066
        );
1067
        assert!(client.is_err());
1068
    }
1069

1070
    #[test]
1071
    pub fn can_instantiate_pkcs_8_client() {
1072
        let key = "./testdata/pkcs8/snakeoil.key";
1073
        let cert = "./testdata/pkcs12/snakeoil.pem";
1074
        let identity = ClientIdentity {
1075
            pkcs8_client_certificate_file: Some(cert.into()),
1076
            pkcs8_client_key_file: Some(key.into()),
1077
            pkcs12_identity_file: None,
1078
            pkcs12_passphrase: None,
1079
        };
1080
        let client = new_reqwest_client(
1081
            false,
1082
            Some(identity),
1083
            None,
1084
            Duration::seconds(5),
1085
            Duration::seconds(5),
1086
            ClientMetaInformation {
1087
                app_name: "test-client".into(),
1088
                instance_id: "test-pkcs8".into(),
1089
            },
1090
        );
1091
        assert!(client.is_ok());
1092
    }
1093
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc