• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Unleash / unleash-edge / #1444

21 Jan 2025 08:37AM UTC coverage: 19.689% (-53.4%) from 73.06%
#1444

push

web-flow
dep-update: bump serde_json from 1.0.135 to 1.0.137 (#678)

Bumps [serde_json](https://github.com/serde-rs/json) from 1.0.135 to 1.0.137.
- [Release notes](https://github.com/serde-rs/json/releases)
- [Commits](https://github.com/serde-rs/json/compare/v1.0.135...v1.0.137)

---
updated-dependencies:
- dependency-name: serde_json
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

405 of 2057 relevant lines covered (19.69%)

0.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/server/src/http/unleash_client.rs
1
use std::collections::HashMap;
2
use std::fs;
3
use std::path::PathBuf;
4
use std::str::FromStr;
5

6
use actix_web::http::header::EntityTag;
7
use chrono::Duration;
8
use chrono::Utc;
9
use lazy_static::lazy_static;
10
use prometheus::{register_histogram_vec, register_int_gauge_vec, HistogramVec, IntGaugeVec, Opts};
11
use reqwest::header::{HeaderMap, HeaderName};
12
use reqwest::{header, Client};
13
use reqwest::{ClientBuilder, Identity, RequestBuilder, StatusCode, Url};
14
use serde::{Deserialize, Serialize};
15
use tracing::error;
16
use tracing::{info, trace, warn};
17
use unleash_types::client_features::{ClientFeatures, ClientFeaturesDelta};
18
use unleash_types::client_metrics::ClientApplication;
19

20
use crate::cli::ClientIdentity;
21
use crate::error::EdgeError::EdgeMetricsRequestError;
22
use crate::error::{CertificateError, FeatureError};
23
use crate::http::headers::{
24
    UNLEASH_APPNAME_HEADER, UNLEASH_CLIENT_SPEC_HEADER, UNLEASH_INSTANCE_ID_HEADER,
25
};
26
use crate::metrics::client_metrics::MetricsBatch;
27
use crate::tls::build_upstream_certificate;
28
use crate::types::{
29
    ClientFeaturesDeltaResponse, ClientFeaturesResponse, EdgeResult, EdgeToken,
30
    TokenValidationStatus, ValidateTokensRequest,
31
};
32
use crate::urls::UnleashUrls;
33
use crate::{error::EdgeError, types::ClientFeaturesRequest};
34

35
lazy_static! {
36
    pub static ref CLIENT_REGISTER_FAILURES: IntGaugeVec = register_int_gauge_vec!(
×
37
        Opts::new(
×
38
            "client_register_failures",
39
            "Why we failed to register upstream"
40
        ),
41
        &["status_code"]
42
    )
43
    .unwrap();
44
    pub static ref CLIENT_FEATURE_FETCH: HistogramVec = register_histogram_vec!(
×
45
        "client_feature_fetch",
46
        "Timings for fetching features in milliseconds",
47
        &["status_code"],
48
        vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 5000.0]
×
49
    )
50
    .unwrap();
51
    pub static ref CLIENT_FEATURE_DELTA_FETCH: HistogramVec = register_histogram_vec!(
×
52
        "client_feature_delta_fetch",
53
        "Timings for fetching feature deltas in milliseconds",
54
        &["status_code"],
55
        vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 5000.0]
×
56
    )
57
    .unwrap();
58
    pub static ref CLIENT_FEATURE_FETCH_FAILURES: IntGaugeVec = register_int_gauge_vec!(
×
59
        Opts::new(
×
60
            "client_feature_fetch_failures",
61
            "Why we failed to fetch features"
62
        ),
63
        &["status_code"]
64
    )
65
    .unwrap();
66
    pub static ref TOKEN_VALIDATION_FAILURES: IntGaugeVec = register_int_gauge_vec!(
×
67
        Opts::new(
×
68
            "token_validation_failures",
69
            "Why we failed to validate tokens"
70
        ),
71
        &["status_code"]
72
    )
73
    .unwrap();
74
    pub static ref UPSTREAM_VERSION: IntGaugeVec = register_int_gauge_vec!(
×
75
        Opts::new(
×
76
            "upstream_version",
77
            "The server type (Unleash or Edge) and version of the upstream we're connected to"
78
        ),
79
        &["server", "version"]
80
    )
81
    .unwrap();
82
}
83

84
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
85
pub struct ClientMetaInformation {
86
    pub app_name: String,
87
    pub instance_id: String,
88
}
89

90
impl Default for ClientMetaInformation {
91
    fn default() -> Self {
×
92
        Self {
93
            app_name: "unleash-edge".into(),
×
94
            instance_id: format!("unleash-edge@{}", ulid::Ulid::new().to_string()),
×
95
        }
96
    }
97
}
98

99
impl ClientMetaInformation {
100
    pub fn test_config() -> Self {
×
101
        Self {
102
            app_name: "test-app-name".into(),
×
103
            instance_id: "test-instance-id".into(),
×
104
        }
105
    }
106
}
107

108
#[derive(Clone, Debug, Default)]
109
pub struct UnleashClient {
110
    pub urls: UnleashUrls,
111
    backing_client: Client,
112
    custom_headers: HashMap<String, String>,
113
    token_header: String,
114
}
115

116
fn load_pkcs12(id: &ClientIdentity) -> EdgeResult<Identity> {
×
117
    let pfx = fs::read(id.pkcs12_identity_file.clone().unwrap()).map_err(|e| {
×
118
        EdgeError::ClientCertificateError(CertificateError::Pkcs12ArchiveNotFound(format!("{e:?}")))
×
119
    })?;
120
    Identity::from_pkcs12_der(
121
        &pfx,
×
122
        &id.pkcs12_passphrase.clone().unwrap_or_else(|| "".into()),
×
123
    )
124
    .map_err(|e| {
×
125
        EdgeError::ClientCertificateError(CertificateError::Pkcs12IdentityGeneration(format!(
×
126
            "{e:?}"
127
        )))
128
    })
129
}
130

131
fn load_pkcs8(id: &ClientIdentity) -> EdgeResult<Identity> {
×
132
    let cert = fs::read(id.pkcs8_client_certificate_file.clone().unwrap()).map_err(|e| {
×
133
        EdgeError::ClientCertificateError(CertificateError::Pem8ClientCertNotFound(format!("{e:}")))
×
134
    })?;
135
    let key = fs::read(id.pkcs8_client_key_file.clone().unwrap()).map_err(|e| {
×
136
        EdgeError::ClientCertificateError(CertificateError::Pem8ClientKeyNotFound(format!("{e:?}")))
×
137
    })?;
138
    Identity::from_pkcs8_pem(&cert, &key).map_err(|e| {
×
139
        EdgeError::ClientCertificateError(CertificateError::Pem8IdentityGeneration(format!(
×
140
            "{e:?}"
141
        )))
142
    })
143
}
144

145
fn build_identity(tls: Option<ClientIdentity>) -> EdgeResult<ClientBuilder> {
×
146
    tls.map_or_else(
×
147
        || Ok(ClientBuilder::new()),
×
148
        |tls| {
×
149
            let req_identity = if tls.pkcs12_identity_file.is_some() {
×
150
                // We're going to assume that we're using pkcs#12
151
                load_pkcs12(&tls)
×
152
            } else if tls.pkcs8_client_certificate_file.is_some() {
×
153
                load_pkcs8(&tls)
×
154
            } else {
155
                Err(EdgeError::ClientCertificateError(
×
156
                    CertificateError::NoCertificateFiles,
×
157
                ))
158
            };
159
            req_identity.map(|id| ClientBuilder::new().identity(id))
×
160
        },
161
    )
162
}
163

164
pub fn new_reqwest_client(
×
165
    skip_ssl_verification: bool,
166
    client_identity: Option<ClientIdentity>,
167
    upstream_certificate_file: Option<PathBuf>,
168
    connect_timeout: Duration,
169
    socket_timeout: Duration,
170
    client_meta_information: ClientMetaInformation,
171
) -> EdgeResult<Client> {
172
    build_identity(client_identity)
×
173
        .and_then(|builder| {
×
174
            build_upstream_certificate(upstream_certificate_file).map(|cert| match cert {
×
175
                Some(c) => builder.add_root_certificate(c),
×
176
                None => builder,
×
177
            })
178
        })
179
        .and_then(|client| {
×
180
            let mut header_map = HeaderMap::new();
×
181
            header_map.insert(
×
182
                UNLEASH_APPNAME_HEADER,
183
                header::HeaderValue::from_str(&client_meta_information.app_name)
×
184
                    .expect("Could not add app name as a header"),
185
            );
186
            header_map.insert(
×
187
                UNLEASH_INSTANCE_ID_HEADER,
188
                header::HeaderValue::from_str(&client_meta_information.instance_id).unwrap(),
×
189
            );
190
            header_map.insert(
×
191
                UNLEASH_CLIENT_SPEC_HEADER,
192
                header::HeaderValue::from_static(unleash_yggdrasil::SUPPORTED_SPEC_VERSION),
×
193
            );
194

195
            client
×
196
                .user_agent(format!("unleash-edge-{}", crate::types::build::PKG_VERSION))
×
197
                .default_headers(header_map)
×
198
                .danger_accept_invalid_certs(skip_ssl_verification)
×
199
                .timeout(socket_timeout.to_std().unwrap())
×
200
                .connect_timeout(connect_timeout.to_std().unwrap())
×
201
                .build()
202
                .map_err(|e| EdgeError::ClientBuildError(format!("{e:?}")))
×
203
        })
204
}
205

206
#[derive(Clone, Debug, Serialize, Deserialize)]
207
pub struct EdgeTokens {
208
    pub tokens: Vec<EdgeToken>,
209
}
210

211
impl UnleashClient {
212
    pub fn from_url(server_url: Url, token_header: String, backing_client: Client) -> Self {
×
213
        Self {
214
            urls: UnleashUrls::from_base_url(server_url),
×
215
            backing_client,
216
            custom_headers: Default::default(),
×
217
            token_header,
218
        }
219
    }
220

221
    pub fn new(server_url: &str, instance_id_opt: Option<String>) -> Result<Self, EdgeError> {
×
222
        use ulid::Ulid;
223

224
        let instance_id = instance_id_opt.unwrap_or_else(|| Ulid::new().to_string());
×
225
        Ok(Self {
×
226
            urls: UnleashUrls::from_str(server_url)?,
×
227
            backing_client: new_reqwest_client(
×
228
                false,
229
                None,
×
230
                None,
×
231
                Duration::seconds(5),
×
232
                Duration::seconds(5),
×
233
                ClientMetaInformation {
×
234
                    instance_id,
×
235
                    app_name: "test-client".into(),
×
236
                },
237
            )
238
            .unwrap(),
239
            custom_headers: Default::default(),
×
240
            token_header: "Authorization".to_string(),
×
241
        })
242
    }
243

244
    /// Test-only constructor: like [`UnleashClient::new`] but with certificate
    /// verification disabled and the fixed `ClientMetaInformation::test_config()`.
    ///
    /// # Errors
    /// Returns an error when `server_url` cannot be parsed or the HTTP client
    /// cannot be built (the latter previously panicked).
    #[cfg(test)]
    pub fn new_insecure(server_url: &str) -> Result<Self, EdgeError> {
        let urls = UnleashUrls::from_str(server_url)?;
        let backing_client = new_reqwest_client(
            true,
            None,
            None,
            Duration::seconds(5),
            Duration::seconds(5),
            ClientMetaInformation::test_config(),
        )?;
        Ok(Self {
            urls,
            backing_client,
            custom_headers: Default::default(),
            token_header: "Authorization".to_string(),
        })
    }
262

263
    fn client_features_req(&self, req: ClientFeaturesRequest) -> RequestBuilder {
×
264
        let client_req = self
×
265
            .backing_client
266
            .get(self.urls.client_features_url.to_string())
×
267
            .headers(self.header_map(Some(req.api_key)));
×
268
        if let Some(tag) = req.etag {
×
269
            client_req.header(header::IF_NONE_MATCH, tag.to_string())
×
270
        } else {
271
            client_req
×
272
        }
273
    }
274

275
    fn client_features_delta_req(&self, req: ClientFeaturesRequest) -> RequestBuilder {
×
276
        let client_req = self
×
277
            .backing_client
278
            .get(self.urls.client_features_delta_url.to_string())
×
279
            .headers(self.header_map(Some(req.api_key)));
×
280
        if let Some(tag) = req.etag {
×
281
            client_req.header(header::IF_NONE_MATCH, tag.to_string())
×
282
        } else {
283
            client_req
×
284
        }
285
    }
286

287
    fn header_map(&self, api_key: Option<String>) -> HeaderMap {
×
288
        let mut header_map = HeaderMap::new();
×
289
        let token_header: HeaderName = HeaderName::from_str(self.token_header.as_str()).unwrap();
×
290
        if let Some(key) = api_key {
×
291
            header_map.insert(token_header, key.parse().unwrap());
×
292
        }
293
        for (header_name, header_value) in self.custom_headers.iter() {
×
294
            let key = HeaderName::from_str(header_name.as_str()).unwrap();
×
295
            header_map.insert(key, header_value.parse().unwrap());
×
296
        }
297
        header_map
×
298
    }
299

300
    pub fn with_custom_client_headers(self, custom_headers: Vec<(String, String)>) -> Self {
×
301
        Self {
302
            custom_headers: custom_headers.iter().cloned().collect(),
×
303
            ..self
304
        }
305
    }
306

307
    pub async fn register_as_client(
×
308
        &self,
309
        api_key: String,
310
        application: ClientApplication,
311
    ) -> EdgeResult<()> {
312
        self.backing_client
×
313
            .post(self.urls.client_register_app_url.to_string())
×
314
            .headers(self.header_map(Some(api_key)))
×
315
            .json(&application)
×
316
            .send()
317
            .await
×
318
            .map_err(|e| {
×
319
                warn!("Failed to register client: {e:?}");
×
320
                EdgeError::ClientRegisterError
×
321
            })
322
            .map(|r| {
×
323
                if !r.status().is_success() {
×
324
                    CLIENT_REGISTER_FAILURES
×
325
                        .with_label_values(&[r.status().as_str()])
×
326
                        .inc();
×
327
                    warn!(
×
328
                        "Failed to register client upstream with status code {}",
329
                        r.status()
×
330
                    );
331
                }
332
            })
333
    }
334

335
    pub async fn get_client_features(
×
336
        &self,
337
        request: ClientFeaturesRequest,
338
    ) -> EdgeResult<ClientFeaturesResponse> {
339
        let start_time = Utc::now();
×
340
        let response = self
×
341
            .client_features_req(request.clone())
×
342
            .send()
343
            .await
×
344
            .map_err(|e| {
×
345
                warn!("Failed to fetch. Due to [{e:?}] - Will retry");
×
346
                match e.status() {
×
347
                    Some(s) => EdgeError::ClientFeaturesFetchError(FeatureError::Retriable(s)),
×
348
                    None => EdgeError::ClientFeaturesFetchError(FeatureError::NotFound),
×
349
                }
350
            })?;
351
        let stop_time = Utc::now();
×
352
        CLIENT_FEATURE_FETCH
×
353
            .with_label_values(&[&response.status().as_u16().to_string()])
×
354
            .observe(
355
                stop_time
×
356
                    .signed_duration_since(start_time)
×
357
                    .num_milliseconds() as f64,
358
            );
359
        if response.status() == StatusCode::NOT_MODIFIED {
×
360
            Ok(ClientFeaturesResponse::NoUpdate(
×
361
                request.etag.expect("Got NOT_MODIFIED without an ETag"),
×
362
            ))
363
        } else if response.status().is_success() {
×
364
            let etag = response
×
365
                .headers()
366
                .get("ETag")
367
                .or_else(|| response.headers().get("etag"))
×
368
                .and_then(|etag| EntityTag::from_str(etag.to_str().unwrap()).ok());
×
369
            let features = response.json::<ClientFeatures>().await.map_err(|e| {
×
370
                warn!("Could not parse features response to internal representation");
×
371
                EdgeError::ClientFeaturesParseError(e.to_string())
×
372
            })?;
373
            Ok(ClientFeaturesResponse::Updated(features, etag))
×
374
        } else if response.status() == StatusCode::FORBIDDEN {
×
375
            CLIENT_FEATURE_FETCH_FAILURES
×
376
                .with_label_values(&[response.status().as_str()])
×
377
                .inc();
378
            Err(EdgeError::ClientFeaturesFetchError(
×
379
                FeatureError::AccessDenied,
×
380
            ))
381
        } else if response.status() == StatusCode::UNAUTHORIZED {
×
382
            CLIENT_FEATURE_FETCH_FAILURES
×
383
                .with_label_values(&[response.status().as_str()])
×
384
                .inc();
385
            warn!(
×
386
                "Failed to get features. Url: [{}]. Status code: [401]",
387
                self.urls.client_features_url.to_string()
388
            );
389
            Err(EdgeError::ClientFeaturesFetchError(
×
390
                FeatureError::AccessDenied,
×
391
            ))
392
        } else if response.status() == StatusCode::NOT_FOUND {
×
393
            CLIENT_FEATURE_FETCH_FAILURES
×
394
                .with_label_values(&[response.status().as_str()])
×
395
                .inc();
396
            warn!(
×
397
                "Failed to get features. Url: [{}]. Status code: [{}]",
398
                self.urls.client_features_url.to_string(),
399
                response.status().as_str()
400
            );
401
            Err(EdgeError::ClientFeaturesFetchError(FeatureError::NotFound))
×
402
        } else {
403
            CLIENT_FEATURE_FETCH_FAILURES
×
404
                .with_label_values(&[response.status().as_str()])
×
405
                .inc();
406
            Err(EdgeError::ClientFeaturesFetchError(
×
407
                FeatureError::Retriable(response.status()),
×
408
            ))
409
        }
410
    }
411

412
    pub async fn get_client_features_delta(
×
413
        &self,
414
        request: ClientFeaturesRequest,
415
    ) -> EdgeResult<ClientFeaturesDeltaResponse> {
416
        let start_time = Utc::now();
×
417
        let response = self
×
418
            .client_features_delta_req(request.clone())
×
419
            .send()
420
            .await
×
421
            .map_err(|e| {
×
422
                warn!("Failed to fetch. Due to [{e:?}] - Will retry");
×
423
                match e.status() {
×
424
                    Some(s) => EdgeError::ClientFeaturesFetchError(FeatureError::Retriable(s)),
×
425
                    None => EdgeError::ClientFeaturesFetchError(FeatureError::NotFound),
×
426
                }
427
            })?;
428
        let stop_time = Utc::now();
×
429
        CLIENT_FEATURE_DELTA_FETCH
×
430
            .with_label_values(&[&response.status().as_u16().to_string()])
×
431
            .observe(
432
                stop_time
×
433
                    .signed_duration_since(start_time)
×
434
                    .num_milliseconds() as f64,
435
            );
436
        if response.status() == StatusCode::NOT_MODIFIED {
×
437
            Ok(ClientFeaturesDeltaResponse::NoUpdate(
×
438
                request.etag.expect("Got NOT_MODIFIED without an ETag"),
×
439
            ))
440
        } else if response.status().is_success() {
×
441
            let etag = response
×
442
                .headers()
443
                .get("ETag")
444
                .or_else(|| response.headers().get("etag"))
×
445
                .and_then(|etag| EntityTag::from_str(etag.to_str().unwrap()).ok());
×
446
            let features = response.json::<ClientFeaturesDelta>().await.map_err(|e| {
×
447
                warn!("Could not parse features response to internal representation");
×
448
                EdgeError::ClientFeaturesParseError(e.to_string())
×
449
            })?;
450
            Ok(ClientFeaturesDeltaResponse::Updated(features, etag))
×
451
        } else if response.status() == StatusCode::FORBIDDEN {
×
452
            CLIENT_FEATURE_FETCH_FAILURES
×
453
                .with_label_values(&[response.status().as_str()])
×
454
                .inc();
455
            Err(EdgeError::ClientFeaturesFetchError(
×
456
                FeatureError::AccessDenied,
×
457
            ))
458
        } else if response.status() == StatusCode::UNAUTHORIZED {
×
459
            CLIENT_FEATURE_FETCH_FAILURES
×
460
                .with_label_values(&[response.status().as_str()])
×
461
                .inc();
462
            warn!(
×
463
                "Failed to get features. Url: [{}]. Status code: [401]",
464
                self.urls.client_features_delta_url.to_string()
465
            );
466
            Err(EdgeError::ClientFeaturesFetchError(
×
467
                FeatureError::AccessDenied,
×
468
            ))
469
        } else if response.status() == StatusCode::NOT_FOUND {
×
470
            CLIENT_FEATURE_FETCH_FAILURES
×
471
                .with_label_values(&[response.status().as_str()])
×
472
                .inc();
473
            warn!(
×
474
                "Failed to get features. Url: [{}]. Status code: [{}]",
475
                self.urls.client_features_delta_url.to_string(),
476
                response.status().as_str()
477
            );
478
            Err(EdgeError::ClientFeaturesFetchError(FeatureError::NotFound))
×
479
        } else {
480
            CLIENT_FEATURE_FETCH_FAILURES
×
481
                .with_label_values(&[response.status().as_str()])
×
482
                .inc();
483
            Err(EdgeError::ClientFeaturesFetchError(
×
484
                FeatureError::Retriable(response.status()),
×
485
            ))
486
        }
487
    }
488

489
    pub async fn send_batch_metrics(&self, request: MetricsBatch) -> EdgeResult<()> {
×
490
        trace!("Sending metrics to old /edge/metrics endpoint");
×
491
        let result = self
×
492
            .backing_client
493
            .post(self.urls.edge_metrics_url.to_string())
×
494
            .headers(self.header_map(None))
×
495
            .json(&request)
×
496
            .send()
497
            .await
×
498
            .map_err(|e| {
×
499
                info!("Failed to send batch metrics: {e:?}");
×
500
                EdgeError::EdgeMetricsError
×
501
            })?;
502
        if result.status().is_success() {
×
503
            Ok(())
×
504
        } else {
505
            match result.status() {
×
506
                StatusCode::BAD_REQUEST => Err(EdgeError::EdgeMetricsRequestError(
×
507
                    result.status(),
×
508
                    result.json().await.ok(),
×
509
                )),
510
                _ => Err(EdgeMetricsRequestError(result.status(), None)),
×
511
            }
512
        }
513
    }
514

515
    pub async fn send_bulk_metrics_to_client_endpoint(
×
516
        &self,
517
        request: MetricsBatch,
518
        token: &str,
519
    ) -> EdgeResult<()> {
520
        trace!("Sending metrics to bulk endpoint");
×
521
        let result = self
×
522
            .backing_client
523
            .post(self.urls.client_bulk_metrics_url.to_string())
×
524
            .headers(self.header_map(Some(token.to_string())))
×
525
            .json(&request)
×
526
            .send()
527
            .await
×
528
            .map_err(|e| {
×
529
                info!("Failed to send metrics to /api/client/metrics/bulk endpoint {e:?}");
×
530
                EdgeError::EdgeMetricsError
×
531
            })?;
532
        if result.status().is_success() {
×
533
            Ok(())
×
534
        } else {
535
            match result.status() {
×
536
                StatusCode::BAD_REQUEST => Err(EdgeMetricsRequestError(
×
537
                    result.status(),
×
538
                    result.json().await.ok(),
×
539
                )),
540
                _ => Err(EdgeMetricsRequestError(result.status(), None)),
×
541
            }
542
        }
543
    }
544

545
    pub async fn validate_tokens(
×
546
        &self,
547
        request: ValidateTokensRequest,
548
    ) -> EdgeResult<Vec<EdgeToken>> {
549
        let check_api_suffix = || {
×
550
            let base_url = self.urls.base_url.to_string();
×
551
            if base_url.ends_with("/api") || base_url.ends_with("/api/") {
×
552
                error!("Try passing the instance URL without '/api'.");
×
553
            }
554
        };
555

556
        let result = self
×
557
            .backing_client
558
            .post(self.urls.edge_validate_url.to_string())
×
559
            .headers(self.header_map(None))
×
560
            .json(&request)
×
561
            .send()
562
            .await
×
563
            .map_err(|e| {
×
564
                info!("Failed to validate tokens: [{e:?}]");
×
565
                EdgeError::EdgeTokenError
×
566
            })?;
567
        match result.status() {
×
568
            StatusCode::OK => {
569
                let token_response = result.json::<EdgeTokens>().await.map_err(|e| {
×
570
                    warn!("Failed to parse validation response with error: {e:?}");
×
571
                    EdgeError::EdgeTokenParseError
×
572
                })?;
573
                Ok(token_response
×
574
                    .tokens
575
                    .into_iter()
576
                    .map(|t| {
×
577
                        let remaining_info =
×
578
                            EdgeToken::try_from(t.token.clone()).unwrap_or_else(|_| t.clone());
×
579
                        EdgeToken {
×
580
                            token: t.token.clone(),
×
581
                            token_type: t.token_type,
×
582
                            environment: t.environment.or(remaining_info.environment),
×
583
                            projects: t.projects,
×
584
                            status: TokenValidationStatus::Validated,
×
585
                        }
586
                    })
587
                    .collect())
588
            }
589
            s => {
×
590
                TOKEN_VALIDATION_FAILURES
×
591
                    .with_label_values(&[result.status().as_str()])
×
592
                    .inc();
593
                error!(
×
594
                    "Failed to validate tokens. Requested url: [{}]. Got status: {:?}",
595
                    self.urls.edge_validate_url.to_string(),
596
                    s
597
                );
598
                check_api_suffix();
×
599
                Err(EdgeError::TokenValidationError(
×
600
                    reqwest::StatusCode::from_u16(s.as_u16()).unwrap(),
×
601
                ))
602
            }
603
        }
604
    }
605
}
606

607
#[cfg(test)]
608
mod tests {
609
    use std::path::PathBuf;
610
    use std::str::FromStr;
611

612
    use actix_http::{body::MessageBody, HttpService, TlsAcceptorConfig};
613
    use actix_http_test::{test_server, TestServer};
614
    use actix_middleware_etag::Etag;
615
    use actix_service::map_config;
616
    use actix_web::{
617
        dev::{AppConfig, ServiceRequest, ServiceResponse},
618
        http::header::EntityTag,
619
        web, App, HttpResponse,
620
    };
621
    use chrono::Duration;
622
    use unleash_types::client_features::{ClientFeature, ClientFeatures};
623
    use crate::cli::ClientIdentity;
624
    use crate::http::unleash_client::new_reqwest_client;
625
    use crate::{
626
        cli::TlsOptions,
627
        middleware::as_async_middleware::as_async_middleware,
628
        tls,
629
        types::{
630
            ClientFeaturesRequest, ClientFeaturesResponse, EdgeToken, TokenValidationStatus,
631
            ValidateTokensRequest,
632
        },
633
    };
634

635
    use super::{EdgeTokens, UnleashClient, ClientMetaInformation};
636

637
    impl ClientFeaturesRequest {
638
        pub(crate) fn new(api_key: String, etag: Option<String>) -> Self {
639
            Self {
640
                api_key,
641
                etag: etag.map(EntityTag::new_weak),
642
            }
643
        }
644
    }
645

646
    const TEST_TOKEN: &str = "[]:development.08bce4267a3b1aa";
647
    fn two_client_features() -> ClientFeatures {
648
        ClientFeatures {
649
            version: 2,
650
            features: vec![
651
                ClientFeature {
652
                    name: "test1".into(),
653
                    feature_type: Some("release".into()),
654
                    ..Default::default()
655
                },
656
                ClientFeature {
657
                    name: "test2".into(),
658
                    feature_type: Some("release".into()),
659
                    ..Default::default()
660
                },
661
            ],
662
            segments: None,
663
            query: None,
664
            meta: None,
665
        }
666
    }
667

668
    async fn return_client_features() -> HttpResponse {
669
        HttpResponse::Ok().json(two_client_features())
670
    }
671

672
    async fn return_validate_tokens() -> HttpResponse {
673
        HttpResponse::Ok().json(EdgeTokens {
674
            tokens: vec![EdgeToken {
675
                token: TEST_TOKEN.into(),
676
                ..Default::default()
677
            }],
678
        })
679
    }
680

681
    async fn test_features_server() -> TestServer {
682
        test_server(move || {
683
            HttpService::new(map_config(
684
                App::new()
685
                    .wrap(Etag)
686
                    .service(
687
                        web::resource("/api/client/features")
688
                            .route(web::get().to(return_client_features)),
689
                    )
690
                    .service(
691
                        web::resource("/edge/validate")
692
                            .route(web::post().to(return_validate_tokens)),
693
                    )
694
                    .service(
695
                        web::resource("/api/edge/validate")
696
                            .route(web::post().to(HttpResponse::Forbidden)),
697
                    ),
698
                |_| AppConfig::default(),
699
            ))
700
            .tcp()
701
        })
702
        .await
703
    }
704

705
    async fn test_features_server_with_untrusted_ssl() -> TestServer {
706
        test_server(move || {
707
            let tls_options = TlsOptions {
708
                tls_server_cert: Some("../examples/server.crt".into()),
709
                tls_enable: true,
710
                tls_server_key: Some("../examples/server.key".into()),
711
                tls_server_port: 443,
712
            };
713
            let server_config = tls::config(tls_options).unwrap();
714
            let tls_acceptor_config =
715
                TlsAcceptorConfig::default().handshake_timeout(std::time::Duration::from_secs(5));
716
            HttpService::new(map_config(
717
                App::new()
718
                    .wrap(Etag)
719
                    .service(
720
                        web::resource("/api/client/features")
721
                            .route(web::get().to(return_client_features)),
722
                    )
723
                    .service(
724
                        web::resource("/edge/validate")
725
                            .route(web::post().to(return_validate_tokens)),
726
                    ),
727
                |_| AppConfig::default(),
728
            ))
729
            .rustls_0_23_with_config(server_config, tls_acceptor_config)
730
        })
731
        .await
732
    }
733

734
    async fn validate_api_key_middleware(
735
        req: ServiceRequest,
736
        srv: crate::middleware::as_async_middleware::Next<impl MessageBody + 'static>,
737
    ) -> Result<ServiceResponse<impl MessageBody>, actix_web::Error> {
738
        let res = if req
739
            .headers()
740
            .get("X-Api-Key")
741
            .map(|key| key.to_str().unwrap() == "MyMagicKey")
742
            .unwrap_or(false)
743
        {
744
            srv.call(req).await?.map_into_left_body()
745
        } else {
746
            req.into_response(HttpResponse::Forbidden().finish())
747
                .map_into_right_body()
748
        };
749
        Ok(res)
750
    }
751

752
    async fn test_features_server_with_required_custom_header() -> TestServer {
753
        test_server(move || {
754
            HttpService::new(map_config(
755
                App::new()
756
                    .wrap(Etag)
757
                    .wrap(as_async_middleware(validate_api_key_middleware))
758
                    .service(
759
                        web::resource("/api/client/features")
760
                            .route(web::get().to(return_client_features)),
761
                    )
762
                    .service(
763
                        web::resource("/edge/validate")
764
                            .route(web::post().to(return_validate_tokens)),
765
                    ),
766
                |_| AppConfig::default(),
767
            ))
768
            .tcp()
769
        })
770
        .await
771
    }
772

773
    fn expected_etag(features: ClientFeatures) -> String {
774
        let hash = features.xx3_hash().unwrap();
775
        let len = serde_json::to_string(&features)
776
            .map(|string| string.len())
777
            .unwrap();
778
        format!("{len:x}-{hash}")
779
    }
780

781
    #[actix_web::test]
782
    async fn client_can_get_features() {
783
        let srv = test_features_server().await;
784
        let tag = EntityTag::new_weak(expected_etag(two_client_features()));
785
        let client = UnleashClient::new(srv.url("/").as_str(), None).unwrap();
786
        let client_features_result = client
787
            .get_client_features(ClientFeaturesRequest::new("somekey".to_string(), None))
788
            .await;
789
        assert!(client_features_result.is_ok());
790
        let client_features_response = client_features_result.unwrap();
791
        match client_features_response {
792
            ClientFeaturesResponse::Updated(f, e) => {
793
                assert!(e.is_some());
794
                assert_eq!(e.unwrap(), tag);
795
                assert!(!f.features.is_empty());
796
            }
797
            _ => panic!("Got no update when expecting an update"),
798
        }
799
    }
800

801
    #[actix_web::test]
802
    async fn client_handles_304() {
803
        let srv = test_features_server().await;
804
        let tag = expected_etag(two_client_features());
805
        let client = UnleashClient::new(srv.url("/").as_str(), None).unwrap();
806
        let client_features_result = client
807
            .get_client_features(ClientFeaturesRequest::new(
808
                TEST_TOKEN.to_string(),
809
                Some(tag.clone()),
810
            ))
811
            .await;
812
        assert!(client_features_result.is_ok());
813
        let client_features_response = client_features_result.unwrap();
814
        match client_features_response {
815
            ClientFeaturesResponse::NoUpdate(t) => {
816
                assert_eq!(t, EntityTag::new_weak(tag));
817
            }
818
            _ => panic!("Got an update when no update was expected"),
819
        }
820
    }
821

822
    #[actix_web::test]
823
    async fn can_validate_token() {
824
        let srv = test_features_server().await;
825
        let client = UnleashClient::new(srv.url("/").as_str(), None).unwrap();
826
        let validate_result = client
827
            .validate_tokens(ValidateTokensRequest {
828
                tokens: vec![TEST_TOKEN.to_string()],
829
            })
830
            .await;
831
        match validate_result {
832
            Ok(token_status) => {
833
                assert_eq!(token_status.len(), 1);
834
                let validated_token = token_status.first().unwrap();
835
                assert_eq!(validated_token.status, TokenValidationStatus::Validated);
836
                assert_eq!(validated_token.environment, Some("development".into()))
837
            }
838
            Err(e) => {
839
                panic!("Error validating token: {e}");
840
            }
841
        }
842
    }
843

844
    /// A header value prefixed with `W/` must parse into an `EntityTag` whose
    /// `weak` flag is set.
    #[test]
    pub fn can_parse_entity_tag() {
        let raw_header = "W/\"b5e6-DPC/1RShRw1J/jtxvRtTo1jf4+o\"";
        let parsed = EntityTag::from_str(raw_header).unwrap();
        assert!(parsed.weak);
    }
849

850
    /// A quoted tag without the `W/` prefix must be accepted by the
    /// `EntityTag` parser.
    #[test]
    pub fn parse_entity_tag() {
        assert!(EntityTag::from_str("\"76d8bb0e:2841\"").is_ok());
    }
855

856
    #[actix_web::test]
857
    pub async fn custom_client_headers_are_sent_along() {
858
        let custom_headers = vec![("X-Api-Key".to_string(), "MyMagicKey".to_string())];
859
        let srv = test_features_server_with_required_custom_header().await;
860
        let client_without_extra_headers = UnleashClient::new(srv.url("/").as_str(), None).unwrap();
861
        let client_with_headers = client_without_extra_headers
862
            .clone()
863
            .with_custom_client_headers(custom_headers);
864
        let res = client_without_extra_headers
865
            .get_client_features(ClientFeaturesRequest {
866
                api_key: "notneeded".into(),
867
                etag: None,
868
            })
869
            .await;
870
        assert!(res.is_err());
871
        let authed_res = client_with_headers
872
            .get_client_features(ClientFeaturesRequest {
873
                api_key: "notneeded".into(),
874
                etag: None,
875
            })
876
            .await;
877
        assert!(authed_res.is_ok());
878
    }
879

880
    #[actix_web::test]
881
    pub async fn disabling_ssl_verification_allows_communicating_with_upstream_unleash_with_self_signed_cert(
882
    ) {
883
        let srv = test_features_server_with_untrusted_ssl().await;
884
        let client = UnleashClient::new_insecure(srv.surl("/").as_str()).unwrap();
885

886
        let validate_result = client
887
            .validate_tokens(ValidateTokensRequest {
888
                tokens: vec![TEST_TOKEN.to_string()],
889
            })
890
            .await;
891

892
        assert!(validate_result.is_ok());
893
    }
894

895
    #[actix_web::test]
896
    pub async fn not_disabling_ssl_verification_fails_communicating_with_upstream_unleash_with_self_signed_cert(
897
    ) {
898
        let srv = test_features_server_with_untrusted_ssl().await;
899
        let client = UnleashClient::new(srv.surl("/").as_str(), None).unwrap();
900

901
        let validate_result = client
902
            .validate_tokens(ValidateTokensRequest {
903
                tokens: vec![TEST_TOKEN.to_string()],
904
            })
905
            .await;
906

907
        assert!(validate_result.is_err());
908
    }
909

910
    /// Building a reqwest client from the snakeoil PKCS#12 identity file with
    /// its passphrase must succeed. Only compiled on Linux.
    #[cfg(target_os = "linux")]
    #[test]
    pub fn can_instantiate_pkcs_12_client() {
        let identity = ClientIdentity {
            pkcs8_client_certificate_file: None,
            pkcs8_client_key_file: None,
            pkcs12_identity_file: Some(PathBuf::from("./testdata/pkcs12/snakeoil.pfx")),
            pkcs12_passphrase: Some("password".into()),
        };
        let meta_information = ClientMetaInformation {
            app_name: "test-client".into(),
            instance_id: "test-pkcs12".into(),
        };
        let client = new_reqwest_client(
            false,
            Some(identity),
            None,
            Duration::seconds(5),
            Duration::seconds(5),
            meta_information,
        );
        assert!(client.is_ok());
    }
934

935
    /// Building a reqwest client from the snakeoil PKCS#12 identity file with
    /// the passphrase `wrongpassword` must return an error.
    #[test]
    pub fn should_throw_error_if_wrong_passphrase_to_pfx_file() {
        let identity = ClientIdentity {
            pkcs8_client_certificate_file: None,
            pkcs8_client_key_file: None,
            pkcs12_identity_file: Some(PathBuf::from("./testdata/pkcs12/snakeoil.pfx")),
            pkcs12_passphrase: Some("wrongpassword".into()),
        };
        let meta_information = ClientMetaInformation {
            app_name: "test-client".into(),
            instance_id: "test-pkcs12".into(),
        };
        let client = new_reqwest_client(
            false,
            Some(identity),
            None,
            Duration::seconds(5),
            Duration::seconds(5),
            meta_information,
        );
        assert!(client.is_err());
    }
958

959
    /// Building a reqwest client from a PKCS#8 key file plus a PEM certificate
    /// file must succeed.
    #[test]
    pub fn can_instantiate_pkcs_8_client() {
        // NOTE(review): the certificate is read from the `pkcs12` directory while
        // the key comes from `pkcs8` — confirm this pairing is intentional.
        let certificate_path = "./testdata/pkcs12/snakeoil.pem";
        let key_path = "./testdata/pkcs8/snakeoil.key";
        let identity = ClientIdentity {
            pkcs8_client_certificate_file: Some(certificate_path.into()),
            pkcs8_client_key_file: Some(key_path.into()),
            pkcs12_identity_file: None,
            pkcs12_passphrase: None,
        };
        let meta_information = ClientMetaInformation {
            app_name: "test-client".into(),
            instance_id: "test-pkcs8".into(),
        };
        let client = new_reqwest_client(
            false,
            Some(identity),
            None,
            Duration::seconds(5),
            Duration::seconds(5),
            meta_information,
        );
        assert!(client.is_ok());
    }
982
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc