• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 21486039766

29 Jan 2026 04:23PM UTC coverage: 83.673% (+0.5%) from 83.203%
21486039766

Pull #1296

github

web-flow
Merge 741fb9cbc into 1b6cc38b5
Pull Request #1296: Add standalone metrics service to Payjoin-service

130 of 144 new or added lines in 7 files covered. (90.28%)

18 existing lines in 3 files now uncovered.

10224 of 12219 relevant lines covered (83.67%)

431.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.64
/payjoin-service/src/lib.rs
1
use std::net::SocketAddr;
2

3
use axum::extract::State;
4
use axum::http::header::CONTENT_TYPE;
5
use axum::http::Method;
6
use axum::response::{IntoResponse, Response};
7
use axum::routing::get;
8
use axum::Router;
9
use config::Config;
10
use ohttp_relay::SentinelTag;
11
use rand::Rng;
12
use tokio_listener::{Listener, SystemOptions, UserOptions};
13
use tower::{Service, ServiceBuilder};
14
use tracing::info;
15

16
pub mod cli;
17
pub mod config;
18
pub mod metrics;
19
pub mod middleware;
20

21
use crate::metrics::MetricsService;
22
use crate::middleware::{track_connections, track_metrics};
23

24
#[derive(Clone)]
25
struct Services {
26
    directory: payjoin_directory::Service<payjoin_directory::FilesDb>,
27
    relay: ohttp_relay::Service,
28
}
29

30
pub async fn serve(config: Config) -> anyhow::Result<()> {
×
31
    let sentinel_tag = generate_sentinel_tag();
×
NEW
32
    let metrics = MetricsService::new()?;
×
33

34
    let services = Services {
×
35
        directory: init_directory(&config, sentinel_tag).await?,
×
36
        relay: ohttp_relay::Service::new(sentinel_tag).await,
×
37
    };
38

NEW
39
    let app = build_app(services, metrics.clone());
×
NEW
40
    let _ = spawn_metrics_server(config.metrics_port, metrics).await?;
×
41

42
    let listener =
×
43
        Listener::bind(&config.listener, &SystemOptions::default(), &UserOptions::default())
×
44
            .await?;
×
45
    info!("Payjoin service listening on {:?}", listener.local_addr());
×
46
    axum::serve(listener, app).await?;
×
47

48
    Ok(())
×
49
}
×
50

51
/// Serves payjoin-service with manual TLS configuration.
52
///
53
/// Binds to `config.listener` (use port 0 to let the OS assign a free port) and returns
54
/// the actual bound port, the metrics port, and a task handle.
55
///
56
/// If `tls_config` is provided, the server will use TLS for incoming connections.
57
/// The `root_store` is used for outgoing relay connections to the gateway.
58
#[cfg(feature = "_manual-tls")]
59
pub async fn serve_manual_tls(
24✔
60
    config: Config,
24✔
61
    tls_config: Option<axum_server::tls_rustls::RustlsConfig>,
24✔
62
    root_store: rustls::RootCertStore,
24✔
63
) -> anyhow::Result<(u16, u16, tokio::task::JoinHandle<anyhow::Result<()>>)> {
24✔
64
    use std::net::SocketAddr;
65

66
    let sentinel_tag = generate_sentinel_tag();
24✔
67
    let metrics = MetricsService::new()?;
24✔
68

69
    let services = Services {
24✔
70
        directory: init_directory(&config, sentinel_tag).await?,
24✔
71
        relay: ohttp_relay::Service::new_with_roots(root_store, sentinel_tag).await,
24✔
72
    };
73
    let app = build_app(services, metrics.clone());
24✔
74
    let metrics_port = spawn_metrics_server(config.metrics_port, metrics).await?;
24✔
75

76
    let addr: SocketAddr = config
24✔
77
        .listener
24✔
78
        .to_string()
24✔
79
        .parse()
24✔
80
        .map_err(|_| anyhow::anyhow!("TLS mode requires a TCP address (e.g., '[::]:8080')"))?;
24✔
81
    let listener = tokio::net::TcpListener::bind(addr).await?;
24✔
82
    let port = listener.local_addr()?.port();
24✔
83

84
    let handle = match tls_config {
24✔
85
        Some(tls) => {
14✔
86
            info!("Payjoin service listening on port {} with TLS", port);
14✔
87
            tokio::spawn(async move {
14✔
88
                axum_server::from_tcp_rustls(listener.into_std()?, tls)
14✔
89
                    .serve(app.into_make_service())
14✔
90
                    .await
14✔
91
                    .map_err(Into::into)
×
92
            })
×
93
        }
94
        None => {
95
            info!("Payjoin service listening on port {} without TLS", port);
10✔
96
            tokio::spawn(async move { axum::serve(listener, app).await.map_err(Into::into) })
10✔
97
        }
98
    };
99

100
    Ok((port, metrics_port, handle))
24✔
101
}
24✔
102

103
/// Generate random sentinel tag at startup.
104
/// The relay and directory share this tag in a best-effort attempt
105
/// at detecting self loops.
106
fn generate_sentinel_tag() -> SentinelTag { SentinelTag::new(rand::thread_rng().gen()) }
24✔
107

108
async fn init_directory(
24✔
109
    config: &Config,
24✔
110
    sentinel_tag: SentinelTag,
24✔
111
) -> anyhow::Result<payjoin_directory::Service<payjoin_directory::FilesDb>> {
24✔
112
    let db = payjoin_directory::FilesDb::init(config.timeout, config.storage_dir.clone()).await?;
24✔
113
    db.spawn_background_prune().await;
24✔
114

115
    let ohttp_keys_dir = config.storage_dir.join("ohttp-keys");
24✔
116
    let ohttp_config = init_ohttp_config(&ohttp_keys_dir)?;
24✔
117

118
    Ok(payjoin_directory::Service::new(db, ohttp_config.into(), sentinel_tag))
24✔
119
}
24✔
120

121
fn init_ohttp_config(
24✔
122
    ohttp_keys_dir: &std::path::Path,
24✔
123
) -> anyhow::Result<payjoin_directory::ServerKeyConfig> {
24✔
124
    std::fs::create_dir_all(ohttp_keys_dir)?;
24✔
125
    match payjoin_directory::read_server_config(ohttp_keys_dir) {
24✔
126
        Ok(config) => Ok(config),
×
127
        Err(_) => {
128
            let config = payjoin_directory::gen_ohttp_server_config()?;
24✔
129
            payjoin_directory::persist_new_key_config(config.clone(), ohttp_keys_dir)?;
24✔
130
            Ok(config)
24✔
131
        }
132
    }
133
}
24✔
134

135
fn build_app(services: Services, metrics: MetricsService) -> Router {
24✔
136
    Router::new()
24✔
137
        .fallback(route_request)
24✔
138
        .layer(
24✔
139
            ServiceBuilder::new()
24✔
140
                .layer(axum::middleware::from_fn_with_state(metrics.clone(), track_metrics))
24✔
141
                .layer(axum::middleware::from_fn_with_state(metrics, track_connections)),
24✔
142
        )
143
        .with_state(services)
24✔
144
}
24✔
145

146
fn build_metrics_app(metrics: MetricsService) -> Router {
24✔
147
    Router::new().route("/metrics", get(metrics_handler)).with_state(metrics)
24✔
148
}
24✔
149

150
async fn metrics_handler(State(metrics): State<MetricsService>) -> impl IntoResponse {
1✔
151
    match metrics.encode_metrics() {
1✔
152
        Ok(body) => (
1✔
153
            axum::http::StatusCode::OK,
1✔
154
            [(CONTENT_TYPE, "text/plain; version=0.0.4; charset=utf-8")],
1✔
155
            body,
1✔
156
        )
1✔
157
            .into_response(),
1✔
NEW
158
        Err(e) => (
×
NEW
159
            axum::http::StatusCode::INTERNAL_SERVER_ERROR,
×
NEW
160
            format!("Failed to encode metrics: {}", e),
×
NEW
161
        )
×
NEW
162
            .into_response(),
×
163
    }
164
}
1✔
165

166
async fn spawn_metrics_server(metrics_port: u16, metrics: MetricsService) -> anyhow::Result<u16> {
24✔
167
    let addr: SocketAddr =
24✔
168
        format!("[::]:{metrics_port}").parse().expect("metrics bind address is valid");
24✔
169
    let listener = tokio::net::TcpListener::bind(addr).await?;
24✔
170
    let actual_port = listener.local_addr()?.port();
24✔
171
    info!("Metrics server listening on [::]:{actual_port}");
24✔
172
    tokio::spawn(async move {
24✔
173
        let app = build_metrics_app(metrics);
24✔
174
        if let Err(e) = axum::serve(listener, app).await {
24✔
NEW
175
            tracing::error!("Metrics server error: {e}");
×
NEW
176
        }
×
NEW
177
    });
×
178
    Ok(actual_port)
24✔
179
}
24✔
180

181
async fn route_request(
129✔
182
    State(mut services): State<Services>,
129✔
183
    req: axum::extract::Request,
129✔
184
) -> Response {
129✔
185
    if is_relay_request(&req) {
129✔
186
        match services.relay.call(req).await {
46✔
187
            Ok(res) => res.into_response(),
46✔
UNCOV
188
            Err(e) => (axum::http::StatusCode::BAD_GATEWAY, e.to_string()).into_response(),
×
189
        }
190
    } else {
191
        // The directory service handles all other requests (including 404)
192
        match services.directory.call(req).await {
83✔
193
            Ok(res) => res.into_response(),
83✔
UNCOV
194
            Err(e) =>
×
UNCOV
195
                (axum::http::StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
×
196
        }
197
    }
198
}
129✔
199

200
/// Determines if a request should be routed to the OHTTP relay service.
201
///
202
/// Routing rules:
203
/// - `(OPTIONS, _)` => CORS preflight handling
204
/// - `(CONNECT, _)` => OHTTP bootstrap tunneling
205
/// - `(POST, "/")` => relay to default gateway (needed for backwards-compatibility only)
206
/// - `(POST, /http(s)://...)` => RFC 9540 opt-in gateway specified in path
207
/// - `(GET, /http(s)://...)` => OHTTP bootstrap via WebSocket with opt-in gateway
208
fn is_relay_request(req: &axum::extract::Request) -> bool {
129✔
209
    let method = req.method();
129✔
210
    let path = req.uri().path();
129✔
211

212
    match (method, path) {
129✔
213
        (&Method::OPTIONS, _) | (&Method::CONNECT, _) | (&Method::POST, "/") => true,
72✔
214
        (&Method::POST, p) | (&Method::GET, p)
35✔
215
            if p.starts_with("/http://") || p.starts_with("/https://") =>
46✔
216
            true,
35✔
217
        _ => false,
83✔
218
    }
219
}
129✔
220

221
#[cfg(test)]
222
mod tests {
223
    use std::sync::Arc;
224
    use std::time::Duration;
225

226
    use axum_server::tls_rustls::RustlsConfig;
227
    use payjoin_test_utils::{http_agent, local_cert_key, wait_for_service_ready};
228
    use rustls::pki_types::CertificateDer;
229
    use rustls::RootCertStore;
230
    use tempfile::tempdir;
231

232
    use super::*;
233

234
    async fn start_service(
4✔
235
        cert_der: Vec<u8>,
4✔
236
        key_der: Vec<u8>,
4✔
237
    ) -> (u16, u16, tokio::task::JoinHandle<anyhow::Result<()>>, tempfile::TempDir) {
4✔
238
        let tempdir = tempdir().unwrap();
4✔
239
        let config = Config {
4✔
240
            listener: "[::]:0".parse().expect("valid listener address"),
4✔
241
            storage_dir: tempdir.path().to_path_buf(),
4✔
242
            timeout: Duration::from_secs(2),
4✔
243
            metrics_port: 0,
4✔
244
        };
4✔
245

246
        let mut root_store = RootCertStore::empty();
4✔
247
        root_store.add(CertificateDer::from(cert_der.clone())).unwrap();
4✔
248
        let tls_config = RustlsConfig::from_der(vec![cert_der], key_der).await.unwrap();
4✔
249

250
        let (port, metrics_port, handle) =
4✔
251
            serve_manual_tls(config, Some(tls_config), root_store).await.unwrap();
4✔
252
        (port, metrics_port, handle, tempdir)
4✔
253
    }
4✔
254

255
    #[tokio::test]
256
    async fn self_loop_request_is_rejected() {
1✔
257
        let cert = local_cert_key();
1✔
258
        let cert_der = cert.cert.der().to_vec();
1✔
259
        let key_der = cert.signing_key.serialize_der();
1✔
260

261
        let (port, _metrics_port, _handle, _tempdir) =
1✔
262
            start_service(cert_der.clone(), key_der).await;
1✔
263

264
        let client = Arc::new(http_agent(cert_der.clone()).unwrap());
1✔
265
        let base_url = format!("https://localhost:{}", port);
1✔
266
        wait_for_service_ready(&base_url, client.clone()).await.unwrap();
1✔
267

268
        // Make a request through the relay that targets this same instance's directory.
269
        // The path format is /{gateway_url} where gateway_url points back to ourselves.
270
        let ohttp_req_url = format!("{}/{}", base_url, base_url);
1✔
271

272
        let response = client
1✔
273
            .post(&ohttp_req_url)
1✔
274
            .header("Content-Type", "message/ohttp-req")
1✔
275
            .body(vec![0u8; 100])
1✔
276
            .send()
1✔
277
            .await
1✔
278
            .expect("request should complete");
1✔
279

280
        assert_eq!(
1✔
281
            response.status(),
1✔
282
            axum::http::StatusCode::FORBIDDEN,
1✔
283
            "self-loop request should be rejected with 403 Forbidden"
1✔
284
        );
1✔
285
    }
1✔
286

287
    #[tokio::test]
288
    async fn cross_instance_request_is_accepted() {
1✔
289
        let cert = local_cert_key();
1✔
290
        let cert_der = cert.cert.der().to_vec();
1✔
291
        let key_der = cert.signing_key.serialize_der();
1✔
292

293
        let (relay_port, _relay_metrics, _relay_handle, _relay_tempdir) =
1✔
294
            start_service(cert_der.clone(), key_der.clone()).await;
1✔
295
        let (directory_port, _directory_metrics, _directory_handle, _directory_tempdir) =
1✔
296
            start_service(cert_der.clone(), key_der).await;
1✔
297

298
        let client = Arc::new(http_agent(cert_der).unwrap());
1✔
299
        let relay_url = format!("https://localhost:{}", relay_port);
1✔
300
        let directory_url = format!("https://localhost:{}", directory_port);
1✔
301

302
        wait_for_service_ready(&relay_url, client.clone()).await.unwrap();
1✔
303
        wait_for_service_ready(&directory_url, client.clone()).await.unwrap();
1✔
304

305
        // Make a request through the relay instance to the directory instance.
306
        // Since they're different instances with different sentinel tags, this should work.
307
        let ohttp_req_url = format!("{}/{}", relay_url, directory_url);
1✔
308

309
        let response = client
1✔
310
            .post(&ohttp_req_url)
1✔
311
            .header("Content-Type", "message/ohttp-req")
1✔
312
            .body(vec![0u8; 100])
1✔
313
            .send()
1✔
314
            .await
1✔
315
            .expect("request should complete");
1✔
316

317
        // The request may fail for other reasons (invalid OHTTP body), but not due to self-loop.
318
        assert_ne!(
1✔
319
            response.status(),
1✔
320
            axum::http::StatusCode::FORBIDDEN,
1✔
321
            "cross-instance request should not be rejected as forbidden"
1✔
322
        );
1✔
323
    }
1✔
324

325
    #[tokio::test]
326
    async fn metrics_endpoint_works() {
1✔
327
        let cert = local_cert_key();
1✔
328
        let cert_der = cert.cert.der().to_vec();
1✔
329
        let key_der = cert.signing_key.serialize_der();
1✔
330

331
        let (port, metrics_port, _handle, _tempdir) =
1✔
332
            start_service(cert_der.clone(), key_der).await;
1✔
333

334
        let client = Arc::new(http_agent(cert_der).unwrap());
1✔
335
        let base_url = format!("https://localhost:{}", port);
1✔
336
        wait_for_service_ready(&base_url, client.clone()).await.unwrap();
1✔
337

338
        let metrics_url = format!("http://localhost:{}/metrics", metrics_port);
1✔
339
        let http_client = reqwest::Client::new();
1✔
340
        let response =
1✔
341
            http_client.get(&metrics_url).send().await.expect("metrics request should work");
1✔
342

343
        assert_eq!(response.status(), axum::http::StatusCode::OK);
1✔
344
        let body = response.text().await.unwrap();
1✔
345
        assert!(body.contains("http_request_total"));
1✔
346
        assert!(body.contains("active_connections"));
1✔
347
    }
1✔
348
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc