• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4116183752

pending completion
4116183752

push

github

GitHub
refactor: Make `LmdbRoCache` and `LmdbRwCache` `Send` and `Sync` (#821)

790 of 790 new or added lines in 44 files covered. (100.0%)

23005 of 33842 relevant lines covered (67.98%)

56312.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.82
/dozer-api/src/rest/api_server.rs
1
use super::api_generator;
2
use crate::errors::ApiError;
3
use crate::rest::api_generator::health_route;
4
use crate::{
5
    auth::api::{auth_route, validate},
6
    PipelineDetails, RoCacheEndpoint,
7
};
8
use actix_cors::Cors;
9
use actix_web::{
10
    body::MessageBody,
11
    dev::{ServerHandle, Service, ServiceFactory, ServiceRequest, ServiceResponse},
12
    middleware::{Condition, Logger},
13
    rt, web, App, HttpMessage, HttpServer,
14
};
15
use actix_web_httpauth::middleware::HttpAuthentication;
16
use dozer_types::{crossbeam::channel::Sender, log::info, models::api_config::ApiRest};
17
use dozer_types::{
18
    models::api_security::ApiSecurity,
19
    serde::{self, Deserialize, Serialize},
20
};
21
use tracing_actix_web::TracingLogger;
22

23
/// CORS policy selection for the REST API server.
///
/// Converted into an `actix_cors::Cors` middleware by `ApiServer::get_cors`.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
×
24
#[serde(crate = "self::serde")]
25
pub enum CorsOptions {
26
    /// Maps to `Cors::permissive()`.
    Permissive,
27
    // origins, max_age
    /// Each listed origin is registered via `allowed_origin`; the second
    /// field is forwarded to `Cors::max_age`.
28
    Custom(Vec<String>, usize),
29
}
30
/// Configuration for the REST API server; `run` turns it into a live
/// `HttpServer`.
#[derive(Clone)]
×
31
pub struct ApiServer {
32
    // Forwarded to `HttpServer::shutdown_timeout` in `run`.
    // NOTE(review): unit is whatever that API expects (seconds per the
    // actix-web docs) — confirm.
    shutdown_timeout: u64,
33
    // Port the server binds to (combined with `host` as "host:port").
    port: u16,
34
    // CORS policy applied as the outermost `.wrap(...)` in `create_app_entry`.
    cors: CorsOptions,
35
    // When `Some`, bearer-token validation is enabled in `create_app_entry`.
    security: Option<ApiSecurity>,
36
    // Host/interface the server binds to.
    host: String,
37
}
38

39
// Default configuration: no security, permissive CORS, wildcard host on
// port 8080, shutdown timeout of 0.
impl Default for ApiServer {
40
    /// Returns an `ApiServer` with hard-coded defaults (see field values below).
    fn default() -> Self {
×
41
        Self {
×
42
            shutdown_timeout: 0,
×
43
            port: 8080,
×
44
            cors: CorsOptions::Permissive,
×
45
            security: None,
×
46
            // Wildcard address rather than loopback.
            host: "0.0.0.0".to_owned(),
×
47
        }
×
48
    }
×
49
}
50

51
impl ApiServer {
52
    /// Builds an `ApiServer` from the REST section of the API config.
    ///
    /// Host and port come from `rest_config`; `security` is stored as given.
    /// The shutdown timeout (0) and CORS policy (`Permissive`) are fixed here
    /// and are not read from the config.
    pub fn new(rest_config: ApiRest, security: Option<ApiSecurity>) -> Self {
×
53
        Self {
×
54
            shutdown_timeout: 0,
×
55
            // NOTE(review): `as u16` truncates silently if `rest_config.port`
            // is a wider integer holding a value above 65535 — the field's
            // type is not visible here; confirm and consider `u16::try_from`.
            port: rest_config.port as u16,
×
56
            cors: CorsOptions::Permissive,
×
57
            security,
×
58
            host: rest_config.host,
×
59
        }
×
60
    }
×
61
    /// Converts a `CorsOptions` value into the `Cors` middleware to install.
    fn get_cors(cors: CorsOptions) -> Cors {
8✔
62
        match cors {
8✔
63
            CorsOptions::Permissive => Cors::permissive(),
8✔
64
            // Start from `Cors::default()`, add every configured origin one by
            // one, then apply the configured max age.
            CorsOptions::Custom(origins, max_age) => origins
×
65
                .into_iter()
×
66
                .fold(Cors::default(), |cors, origin| cors.allowed_origin(&origin))
×
67
                .max_age(max_age),
×
68
        }
69
    }
8✔
70

71
    /// Assembles the actix `App` served by the REST API.
    ///
    /// * `security` - when `Some`, the value is registered as app data and the
    ///   bearer-token middleware (`validate`) is enabled; when `None` the
    ///   middleware is installed in a disabled `Condition`.
    /// * `cors` - converted to a `Cors` middleware via `get_cors`.
    /// * `cache_endpoints` - one scope is mounted per endpoint, at the
    ///   endpoint's `path`.
    ///
    /// Per-endpoint routes: `POST /count`, `POST /query`, `POST /oapi`,
    /// `GET /{id}`, and `GET` on both `/` and the empty path (both handled by
    /// `api_generator::list`). App-level routes: `POST /auth/token` and
    /// `GET /health`.
    pub fn create_app_entry(
8✔
72
        security: Option<ApiSecurity>,
8✔
73
        cors: CorsOptions,
8✔
74
        cache_endpoints: Vec<RoCacheEndpoint>,
8✔
75
    ) -> App<
8✔
76
        impl ServiceFactory<
8✔
77
            ServiceRequest,
8✔
78
            Response = ServiceResponse<impl MessageBody>,
8✔
79
            Config = (),
8✔
80
            InitError = (),
8✔
81
            Error = actix_web::Error,
8✔
82
        >,
8✔
83
    > {
8✔
84
        // Request logging middleware is always installed.
        let mut app = App::new()
8✔
85
            .wrap(Logger::default())
8✔
86
            .wrap(TracingLogger::default());
8✔
87

88
        let is_auth_configured = if let Some(api_security) = security {
8✔
89
            // Injecting API Security
90
            app = app.app_data(api_security);
4✔
91
            true
4✔
92
        } else {
93
            false
4✔
94
        };
95
        // The auth middleware is always constructed; `Condition` decides
        // whether it is actually applied.
        let auth_middleware =
8✔
96
            Condition::new(is_auth_configured, HttpAuthentication::bearer(validate));
8✔
97

8✔
98
        let cors_middleware = Self::get_cors(cors);
8✔
99

8✔
100
        // Fold over the endpoints, adding one scope to the app per endpoint.
        cache_endpoints
8✔
101
            .into_iter()
8✔
102
            .fold(app, |app, cache_endpoint| {
8✔
103
                let endpoint = cache_endpoint.endpoint.clone();
8✔
104
                let scope = endpoint.path.clone();
8✔
105
                let schema_name = endpoint.name;
8✔
106
                app.service(
8✔
107
                    web::scope(&scope)
8✔
108
                        // Inject pipeline_details for generated functions
                        // The closure owns `schema_name` and `cache_endpoint`
                        // and hands a fresh copy of each to the request's
                        // extensions before forwarding the request.
8✔
109
                        .wrap_fn(move |req, srv| {
14✔
110
                            req.extensions_mut().insert(PipelineDetails {
14✔
111
                                schema_name: schema_name.to_owned(),
14✔
112
                                cache_endpoint: cache_endpoint.clone(),
14✔
113
                            });
14✔
114
                            srv.call(req)
14✔
115
                        })
14✔
116
                        .route("/count", web::post().to(api_generator::count))
8✔
117
                        .route("/query", web::post().to(api_generator::query))
8✔
118
                        .route("/oapi", web::post().to(api_generator::generate_oapi))
8✔
119
                        .route("/{id}", web::get().to(api_generator::get))
8✔
120
                        .route("/", web::get().to(api_generator::list))
8✔
121
                        .route("", web::get().to(api_generator::list)),
8✔
122
                )
8✔
123
            })
8✔
124
            // Attach token generation route
8✔
125
            .route("/auth/token", web::post().to(auth_route))
8✔
126
            // Attach health route
8✔
127
            .route("/health", web::get().to(health_route))
8✔
128
            // Wrap Api Validator
            // NOTE(review): this wraps the whole app, so `/auth/token` and
            // `/health` presumably pass through it too when auth is enabled —
            // confirm `validate` lets them through as intended.
8✔
129
            .wrap(auth_middleware)
8✔
130
            // Wrap CORS around api validator. Required to return the right headers.
8✔
131
            .wrap(cors_middleware)
8✔
132
    }
8✔
133

134
    /// Binds and runs the REST server until it terminates.
    ///
    /// * `cache_endpoints` - endpoints exposed by the app; cloned each time
    ///   the app factory closure is invoked.
    /// * `tx` - receives the `ServerHandle` once the server has been started,
    ///   so the caller can stop it later (see `stop`).
    ///
    /// # Errors
    ///
    /// * `ApiError::PortAlreadyInUse` - wraps any error returned by `bind`.
    /// * `ApiError::InternalError` - wraps an error produced while awaiting
    ///   the running server.
    pub async fn run(
×
135
        &self,
×
136
        cache_endpoints: Vec<RoCacheEndpoint>,
×
137
        tx: Sender<ServerHandle>,
×
138
    ) -> Result<(), ApiError> {
×
139
        info!(
140
            "Starting Rest Api Server on http://{}:{} with security: {}",
×
141
            self.host,
×
142
            self.port,
×
143
            self.security
×
144
                .as_ref()
×
145
                .map_or("None".to_string(), |s| match s {
×
146
                    ApiSecurity::Jwt(_) => "JWT".to_string(),
×
147
                })
×
148
        );
149
        // Owned copies so the `move` factory closure below does not borrow `self`.
        let cors = self.cors.clone();
×
150
        let security = self.security.clone();
×
151
        let address = format!("{}:{}", self.host.to_owned(), self.port.to_owned());
×
152
        // The closure is an app factory: each invocation builds a new `App`
        // from clones of the captured configuration.
        let server = HttpServer::new(move || {
×
153
            ApiServer::create_app_entry(
×
154
                security.to_owned(),
×
155
                cors.to_owned(),
×
156
                cache_endpoints.clone(),
×
157
            )
×
158
        })
×
159
        .bind(address.to_owned())
×
160
        // NOTE(review): every bind failure is reported as `PortAlreadyInUse`,
        // whatever the underlying I/O error actually was.
        .map_err(ApiError::PortAlreadyInUse)?
×
161
        .shutdown_timeout(self.shutdown_timeout.to_owned())
×
162
        .run();
×
163

×
164
        // Best effort: a failed send (e.g. receiver already dropped) is ignored.
        let _ = tx.send(server.handle());
×
165
        server
×
166
            .await
×
167
            .map_err(|e| ApiError::InternalError(Box::new(e)))
×
168
    }
×
169

170
    /// Stops the server behind `server_handle`, blocking the calling thread
    /// until the stop future completes.
    ///
    /// A new actix `System` is created just to drive that future.
    /// NOTE(review): the `true` argument presumably requests a graceful stop
    /// (`ServerHandle::stop(graceful)`), and creating a fresh `System` suggests
    /// this is meant to be called from outside a running actix runtime —
    /// confirm both against the actix-web docs and the callers.
    pub fn stop(server_handle: ServerHandle) {
×
171
        rt::System::new().block_on(server_handle.stop(true));
×
172
    }
×
173
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc