• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Unleash / unleash-edge / 16346359433

17 Jul 2025 01:24PM UTC coverage: 78.139% (-0.8%) from 78.902%
16346359433

Pull #1044

github

web-flow
Merge 546605bb3 into 1b7ded904
Pull Request #1044: refactor: clean up main

10 of 131 new or added lines in 5 files covered. (7.63%)

52 existing lines in 2 files now uncovered.

10705 of 13700 relevant lines covered (78.14%)

5545.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/server/src/main.rs
1
use actix_allow_deny_middleware::{AllowList, DenyList};
2
use actix_middleware_etag::Etag;
3
use actix_web::dev::Server;
4
use actix_web::middleware::Logger;
5
use actix_web::{App, HttpServer, web};
6
use clap::Parser;
7
use dashmap::DashMap;
8
use futures::future::join_all;
9
use std::sync::Arc;
10
use tokio::sync::RwLock;
11
use ulid::Ulid;
12
use unleash_edge::error::EdgeError;
13
use unleash_edge::metrics::actix_web_prometheus_metrics::PrometheusMetrics;
14
use unleash_types::client_features::ClientFeatures;
15
use unleash_types::client_metrics::ConnectVia;
16
use utoipa::OpenApi;
17
use utoipa_swagger_ui::SwaggerUi;
18

19
use tracing::info;
20
use unleash_edge::builder::{EdgeInfo, build_caches_and_refreshers};
21
use unleash_edge::cli::{AuthHeaders, CliArgs, EdgeMode};
22
use unleash_edge::feature_cache::FeatureCache;
23
use unleash_edge::http::background_send_metrics::send_metrics_one_shot;
24
use unleash_edge::http::broadcaster::Broadcaster;
25
use unleash_edge::http::instance_data::InstanceDataSending;
26
use unleash_edge::http::refresher::feature_refresher::FeatureRefresher;
27
use unleash_edge::metrics::client_metrics::MetricsCache;
28
use unleash_edge::metrics::edge_metrics::EdgeInstanceData;
29
use unleash_edge::offline::offline_hotload;
30
use unleash_edge::persistence::{EdgePersistence, persist_data};
31
use unleash_edge::types::{EdgeResult, EdgeToken, TokenValidationStatus};
32
use unleash_edge::{client_api, frontend_api, health_checker, openapi, ready_checker};
33
use unleash_edge::{edge_api, prom_metrics};
34
use unleash_edge::{http::unleash_client::ClientMetaInformation, metrics::metrics_pusher};
35
use unleash_edge::{internal_backstage, tls};
36

NEW
37
fn setup_server(
×
NEW
38
    args: CliArgs,
×
NEW
39
    edge_info: EdgeInfo,
×
NEW
40
    metrics_middleware: PrometheusMetrics,
×
NEW
41
    instance_data_sender_for_app_context: Arc<InstanceDataSending>,
×
NEW
42
    metrics_cache: Arc<MetricsCache>,
×
NEW
43
    our_instance_data_for_app_context: Arc<EdgeInstanceData>,
×
NEW
44
    instances_observed_for_app_context: Arc<RwLock<Vec<EdgeInstanceData>>>,
×
NEW
45
) -> EdgeResult<Server> {
×
UNCOV
46
    let http_args = args.clone().http;
×
UNCOV
47
    let request_timeout = args.edge_request_timeout;
×
UNCOV
48
    let keepalive_timeout = args.edge_keepalive_timeout;
×
UNCOV
49

×
UNCOV
50
    let (
×
UNCOV
51
        (token_cache, features_cache, delta_cache_manager, engine_cache),
×
UNCOV
52
        token_validator,
×
UNCOV
53
        feature_refresher,
×
NEW
54
        _,
×
NEW
55
    ) = edge_info;
×
56

×
57
    let server = HttpServer::new(move || {
×
58
        let qs_config =
×
59
            serde_qs::actix::QsQueryConfig::default().qs_config(serde_qs::Config::new(5, false));
×
60

×
NEW
61
        let connect_via = ConnectVia {
×
NEW
62
            app_name: args.app_name.clone(),
×
NEW
63
            instance_id: our_instance_data_for_app_context.identifier.clone(),
×
NEW
64
        };
×
NEW
65

×
NEW
66
        let cors_middleware = args.http.cors.middleware();
×
67
        let mut app = App::new()
×
68
            .app_data(qs_config)
×
NEW
69
            .app_data(web::Data::new(args.token_header.clone()))
×
NEW
70
            .app_data(web::Data::new(args.trust_proxy.clone()))
×
NEW
71
            .app_data(web::Data::new(args.mode.clone()))
×
NEW
72
            .app_data(web::Data::new(connect_via))
×
73
            .app_data(web::Data::from(metrics_cache.clone()))
×
74
            .app_data(web::Data::from(token_cache.clone()))
×
75
            .app_data(web::Data::from(delta_cache_manager.clone()))
×
76
            .app_data(web::Data::from(features_cache.clone()))
×
77
            .app_data(web::Data::from(engine_cache.clone()))
×
NEW
78
            .app_data(web::Data::from(Broadcaster::new(
×
NEW
79
                delta_cache_manager.clone(),
×
NEW
80
            )))
×
NEW
81
            .app_data(web::Data::from(Arc::new(AuthHeaders::from(&args))))
×
82
            .app_data(web::Data::from(
×
83
                instance_data_sender_for_app_context.clone(),
×
84
            ))
×
85
            .app_data(web::Data::from(our_instance_data_for_app_context.clone()))
×
86
            .app_data(web::Data::from(instances_observed_for_app_context.clone()));
×
87

NEW
88
        if let Some(ref token_validator) = token_validator {
×
NEW
89
            app = app.app_data(web::Data::from(token_validator.clone()));
×
NEW
90
        }
×
NEW
91
        if let Some(ref refresher) = feature_refresher {
×
NEW
92
            app = app.app_data(web::Data::from(refresher.clone()));
×
NEW
93
        }
×
94

95
        app.service(
×
NEW
96
            web::scope(&args.http.base_path)
×
97
                .wrap(Etag)
×
98
                .wrap(actix_web::middleware::Compress::default())
×
99
                .wrap(actix_web::middleware::NormalizePath::default())
×
100
                .wrap(cors_middleware)
×
101
                .wrap(metrics_middleware.clone())
×
102
                .wrap(Logger::default())
×
103
                .service(web::scope("/internal-backstage").configure(|service_cfg| {
×
104
                    internal_backstage::configure_internal_backstage(
×
105
                        service_cfg,
×
NEW
106
                        args.internal_backstage.clone(),
×
107
                    )
×
108
                }))
×
109
                .service(
×
110
                    web::scope("/api")
×
111
                        .configure(client_api::configure_client_api)
×
112
                        .configure(|cfg| {
×
NEW
113
                            frontend_api::configure_frontend_api(cfg, args.disable_all_endpoint)
×
114
                        })
×
115
                        .wrap(DenyList::with_denied_ipnets(
×
NEW
116
                            &args.http.clone().deny_list.clone().unwrap_or_default(),
×
117
                        ))
×
118
                        .wrap(
×
NEW
119
                            args.http
×
NEW
120
                                .clone()
×
121
                                .allow_list
×
122
                                .clone()
×
123
                                .map(|list| AllowList::with_allowed_ipnets(&list))
×
NEW
124
                                .unwrap_or_default(),
×
125
                        ),
×
126
                )
×
127
                .service(
×
128
                    web::scope("/edge")
×
129
                        .configure(edge_api::configure_edge_api)
×
130
                        .wrap(DenyList::with_denied_ipnets(
×
NEW
131
                            &args.http.clone().deny_list.clone().unwrap_or_default(),
×
132
                        ))
×
133
                        .wrap(
×
NEW
134
                            args.http
×
NEW
135
                                .clone()
×
136
                                .allow_list
×
137
                                .clone()
×
138
                                .map(|list| AllowList::with_allowed_ipnets(&list))
×
NEW
139
                                .unwrap_or_default(),
×
140
                        ),
×
141
                )
×
142
                .service(
×
143
                    SwaggerUi::new("/swagger-ui/{_:.*}")
×
NEW
144
                        .url("/api-doc/openapi.json", openapi::ApiDoc::openapi()),
×
145
                ),
×
146
        )
×
147
    });
×
UNCOV
148
    let server = if http_args.tls.tls_enable {
×
UNCOV
149
        let config = tls::config(http_args.clone().tls)
×
UNCOV
150
            .expect("Was expecting to succeed in configuring TLS");
×
UNCOV
151
        server
×
NEW
152
            .bind_rustls_0_23(http_args.https_server_tuple(), config)
×
NEW
153
            .map_err(|e| EdgeError::TlsError(e.to_string()))?
×
UNCOV
154
            .bind(http_args.http_server_tuple())
×
155
    } else {
UNCOV
156
        server.bind(http_args.http_server_tuple())
×
157
    }
NEW
158
    .map_err(|e| EdgeError::ReadyCheckError(e.to_string()))?;
×
NEW
159
    let server = server
×
UNCOV
160
        .workers(http_args.workers)
×
UNCOV
161
        .shutdown_timeout(5)
×
UNCOV
162
        .keep_alive(std::time::Duration::from_secs(keepalive_timeout))
×
UNCOV
163
        .client_request_timeout(std::time::Duration::from_secs(request_timeout));
×
NEW
164
    Ok(server.run())
×
NEW
165
}
×
166

167
#[cfg(not(tarpaulin_include))]
168
#[actix_web::main]
169
async fn main() -> Result<(), anyhow::Error> {
170
    let args = CliArgs::parse();
171
    if args.markdown_help {
172
        clap_markdown::print_help_markdown::<CliArgs>();
173
        return Ok(());
174
    }
175

176
    match args.mode {
177
        EdgeMode::Health(health_args) => health_checker::check_health(health_args).await,
178
        EdgeMode::Ready(ready_args) => ready_checker::check_ready(ready_args).await,
179
        _ => run_server(args).await,
180
    }
NEW
181
    .map_err(|e| e.into())
×
182
}
183

NEW
184
async fn run_server(args: CliArgs) -> EdgeResult<()> {
×
NEW
185
    let app_name = args.app_name.clone();
×
NEW
186
    let app_id = Ulid::new();
×
NEW
187
    let edge_instance_data = Arc::new(EdgeInstanceData::new(&args.app_name, &app_id));
×
NEW
188
    let client_meta_information = ClientMetaInformation {
×
NEW
189
        app_name: args.app_name.clone(),
×
NEW
190
        instance_id: app_id.to_string(),
×
NEW
191
        connection_id: app_id.to_string(),
×
NEW
192
    };
×
NEW
193

×
NEW
194
    let metrics_middleware = prom_metrics::instantiate(
×
NEW
195
        None,
×
NEW
196
        args.internal_backstage.disable_metrics_endpoint,
×
NEW
197
        &args.log_format,
×
NEW
198
        &edge_instance_data.clone(),
×
NEW
199
    );
×
200

NEW
201
    let custom_headers = if let EdgeMode::Edge(edge) = &args.mode {
×
NEW
202
        edge.custom_client_headers.clone()
×
203
    } else {
NEW
204
        vec![]
×
205
    };
206

NEW
207
    let edge_info = build_caches_and_refreshers(args.clone(), app_id.to_string())
×
NEW
208
        .await
×
NEW
209
        .unwrap();
×
NEW
210

×
NEW
211
    let (
×
NEW
212
        (token_cache, features_cache, _, engine_cache),
×
NEW
213
        token_validator,
×
NEW
214
        feature_refresher,
×
NEW
215
        persistence,
×
NEW
216
    ) = edge_info.clone();
×
217

NEW
218
    let instance_data_sender: Arc<InstanceDataSending> = Arc::new(InstanceDataSending::from_args(
×
NEW
219
        args.clone(),
×
NEW
220
        edge_instance_data.clone(),
×
NEW
221
        metrics_middleware.registry.clone(),
×
NEW
222
    )?);
×
NEW
223
    let instance_data_sender_for_app_context = instance_data_sender.clone();
×
NEW
224
    let lazy_feature_cache = features_cache.clone();
×
NEW
225
    let lazy_token_cache = token_cache.clone();
×
NEW
226
    let lazy_engine_cache = engine_cache.clone();
×
NEW
227
    let lazy_feature_refresher = feature_refresher.clone();
×
NEW
228
    let metrics_cache = Arc::new(MetricsCache::default());
×
NEW
229
    let metrics_cache_clone = metrics_cache.clone();
×
NEW
230

×
NEW
231
    let instances_observed_for_app_context: Arc<RwLock<Vec<EdgeInstanceData>>> =
×
NEW
232
        Arc::new(RwLock::new(Vec::new()));
×
233

NEW
234
    let server = setup_server(
×
NEW
235
        args.clone(),
×
NEW
236
        edge_info,
×
NEW
237
        metrics_middleware.clone(),
×
NEW
238
        instance_data_sender_for_app_context,
×
NEW
239
        metrics_cache.clone(),
×
NEW
240
        edge_instance_data.clone(),
×
NEW
241
        instances_observed_for_app_context.clone(),
×
NEW
242
    )?;
×
243

NEW
244
    match &args.mode {
×
UNCOV
245
        EdgeMode::Edge(edge) => {
×
UNCOV
246
            let refresher_for_background = feature_refresher.clone().unwrap();
×
UNCOV
247
            if edge.streaming {
×
UNCOV
248
                let custom_headers = custom_headers.clone();
×
UNCOV
249
                if edge.delta {
×
250
                    tokio::spawn(async move {
×
251
                        let _ = refresher_for_background
×
252
                            .start_streaming_delta_background_task(
×
NEW
253
                                client_meta_information,
×
254
                                custom_headers,
×
255
                            )
×
256
                            .await;
×
257
                    });
×
UNCOV
258
                } else {
×
259
                    tokio::spawn(async move {
×
260
                        let _ = refresher_for_background
×
261
                            .start_streaming_features_background_task(
×
NEW
262
                                client_meta_information,
×
263
                                custom_headers,
×
264
                            )
×
265
                            .await;
×
266
                    });
×
UNCOV
267
                }
×
UNCOV
268
            }
×
269

UNCOV
270
            let refresher = feature_refresher.clone().unwrap();
×
NEW
271
            let validator = token_validator.clone().unwrap();
×
UNCOV
272

×
UNCOV
273
            tokio::select! {
×
NEW
274
                _ = server => {
×
UNCOV
275
                    info!("Actix is shutting down. Persisting data");
×
NEW
276
                    clean_shutdown(persistence, lazy_feature_cache.clone(), lazy_token_cache.clone(), metrics_cache_clone.clone(), feature_refresher.clone(), InstanceDataShutdownArgs { instance_data_sending: instance_data_sender.clone(), our_instance_data: edge_instance_data.clone(), downstream_instance_data: instances_observed_for_app_context.clone() }).await;
×
UNCOV
277
                                        info!("Actix was shutdown properly");
×
278
                },
UNCOV
279
                _ = refresher.start_refresh_features_background_task() => {
×
UNCOV
280
                    info!("Feature refresher unexpectedly shut down");
×
281
                }
UNCOV
282
                _ = unleash_edge::http::background_send_metrics::send_metrics_task(metrics_cache_clone.clone(), refresher.clone(), edge.metrics_interval_seconds.try_into().unwrap()) => {
×
UNCOV
283
                    info!("Metrics poster unexpectedly shut down");
×
284
                }
UNCOV
285
                _ = persist_data(persistence.clone(), lazy_token_cache.clone(), lazy_feature_cache.clone()) => {
×
UNCOV
286
                    info!("Persister was unexpectedly shut down");
×
287
                }
UNCOV
288
                _ = validator.schedule_validation_of_known_tokens(edge.token_revalidation_interval_seconds) => {
×
UNCOV
289
                    info!("Token validator validation of known tokens was unexpectedly shut down");
×
290
                }
NEW
291
                _ = validator.schedule_revalidation_of_startup_tokens(edge.tokens.clone(), lazy_feature_refresher.clone()) => {
×
UNCOV
292
                    info!("Token validator validation of startup tokens was unexpectedly shut down");
×
293
                }
NEW
294
                _ = metrics_pusher::prometheus_remote_write(metrics_middleware.registry.clone(), edge.prometheus_remote_write_url.clone(), edge.prometheus_push_interval, edge.prometheus_username.clone(), edge.prometheus_password.clone(), app_name) => {
×
UNCOV
295
                    info!("Prometheus push unexpectedly shut down");
×
296
                }
NEW
297
                _ = unleash_edge::http::instance_data::loop_send_instance_data(instance_data_sender.clone(), edge_instance_data.clone(), instances_observed_for_app_context.clone()) => {
×
UNCOV
298
                    info!("Instance data pusher unexpectedly quit");
×
299
                }
300
            }
301
        }
UNCOV
302
        EdgeMode::Offline(offline_args) if offline_args.reload_interval > 0 => {
×
UNCOV
303
            tokio::select! {
×
NEW
304
                _ = offline_hotload::start_hotload_loop(lazy_feature_cache, lazy_engine_cache, offline_args.clone()) => {
×
UNCOV
305
                    info!("Hotloader unexpectedly shut down.");
×
306
                },
NEW
307
                _ = server => {
×
UNCOV
308
                    info!("Actix is shutting down. No pending tasks.");
×
309
                },
310
            }
311
        }
UNCOV
312
        _ => tokio::select! {
×
NEW
313
            _ = server => {
×
UNCOV
314
                info!("Actix is shutting down. Persisting data");
×
NEW
315
                clean_shutdown(persistence, lazy_feature_cache.clone(), lazy_token_cache.clone(), metrics_cache_clone.clone(), feature_refresher.clone(), InstanceDataShutdownArgs { instance_data_sending: instance_data_sender.clone(), our_instance_data: edge_instance_data.clone(), downstream_instance_data: instances_observed_for_app_context.clone() }).await;
×
UNCOV
316
                info!("Actix was shutdown properly");
×
317

318
            }
319
        },
320
    };
321

UNCOV
322
    Ok(())
×
UNCOV
323
}
×
324

325
struct InstanceDataShutdownArgs {
326
    instance_data_sending: Arc<InstanceDataSending>,
327
    our_instance_data: Arc<EdgeInstanceData>,
328
    downstream_instance_data: Arc<RwLock<Vec<EdgeInstanceData>>>,
329
}
330

331
#[cfg(not(tarpaulin_include))]
332
async fn clean_shutdown(
×
333
    persistence: Option<Arc<dyn EdgePersistence>>,
×
334
    feature_cache: Arc<FeatureCache>,
×
335
    token_cache: Arc<DashMap<String, EdgeToken>>,
×
336
    metrics_cache: Arc<MetricsCache>,
×
337
    feature_refresher: Option<Arc<FeatureRefresher>>,
×
338
    instance_data_shutdown: InstanceDataShutdownArgs,
×
339
) {
×
340
    let tokens: Vec<EdgeToken> = token_cache
×
341
        .iter()
×
342
        .filter(|e| e.value().status == TokenValidationStatus::Validated)
×
343
        .map(|entry| entry.value().clone())
×
344
        .collect();
×
345

×
346
    let features: Vec<(String, ClientFeatures)> = feature_cache
×
347
        .iter()
×
348
        .map(|entry| (entry.key().clone(), entry.value().clone()))
×
349
        .collect();
×
350

351
    if let Some(persistence) = persistence {
×
352
        let res = join_all(vec![
×
353
            persistence.save_tokens(tokens),
×
354
            persistence.save_features(features),
×
355
        ])
×
356
        .await;
×
357
        if res.iter().all(|save| save.is_ok()) {
×
358
            info!("Successfully persisted data to storage backend");
×
359
        } else {
×
360
            res.iter()
×
361
                .filter(|save| save.is_err())
×
362
                .for_each(|failed_save| tracing::error!("Failed backing up: {failed_save:?}"));
×
363
        }
×
364
    }
×
365
    if let Some(feature_refresher) = feature_refresher {
×
366
        info!("Connected to an upstream, flushing last set of metrics");
×
367
        send_metrics_one_shot(metrics_cache, feature_refresher).await;
×
368
    }
×
369
    match instance_data_shutdown.instance_data_sending.as_ref() {
×
370
        InstanceDataSending::SendInstanceData(instance_data_sender) => {
×
371
            info!("Connected to an upstream, flushing last set of instance data");
×
372
            let _ = unleash_edge::http::instance_data::send_instance_data(
×
373
                instance_data_sender,
×
374
                instance_data_shutdown.our_instance_data,
×
375
                instance_data_shutdown.downstream_instance_data,
×
376
            )
×
377
            .await;
×
378
        }
379
        InstanceDataSending::SendNothing => {
380
            info!("No instance data sender configured, skipping flushing instance data");
×
381
        }
382
    }
383
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc