• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 21284432896

23 Jan 2026 11:22AM UTC coverage: 82.83% (-0.4%) from 83.203%
21284432896

Pull #1296

github

web-flow
Merge f9fc9008c into 45689ba2c
Pull Request #1296: WIP:Add standalone metrics service to Payjoin-service

0 of 55 new or added lines in 1 file covered. (0.0%)

11 existing lines in 1 file now uncovered.

10140 of 12242 relevant lines covered (82.83%)

430.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.58
/payjoin-service/src/lib.rs
1
use axum::extract::State;
2
use axum::http::Method;
3
use axum::response::{IntoResponse, Response};
4
use axum::Router;
5
use config::Config;
6
use ohttp_relay::SentinelTag;
7
use rand::Rng;
8
use tokio_listener::{Listener, SystemOptions, UserOptions};
9
use tower::Service;
10
use tracing::info;
11

12
pub mod cli;
13
pub mod config;
14
pub mod metrics;
15

16
#[derive(Clone)]
17
struct Services {
18
    directory: payjoin_directory::Service<payjoin_directory::FilesDb>,
19
    relay: ohttp_relay::Service,
20
}
21

UNCOV
22
pub async fn serve(config: Config) -> anyhow::Result<()> {
×
UNCOV
23
    let sentinel_tag = generate_sentinel_tag();
×
24

UNCOV
25
    let services = Services {
×
UNCOV
26
        directory: init_directory(&config, sentinel_tag).await?,
×
27
        relay: ohttp_relay::Service::new(sentinel_tag).await,
×
28
    };
29
    let app = Router::new().fallback(route_request).with_state(services);
×
30

31
    let listener =
×
32
        Listener::bind(&config.listener, &SystemOptions::default(), &UserOptions::default())
×
33
            .await?;
×
34
    info!("Payjoin service listening on {:?}", listener.local_addr());
×
35
    axum::serve(listener, app).await?;
×
36

37
    Ok(())
×
UNCOV
38
}
×
39

40
/// Starts payjoin-service on a TCP socket, optionally terminating TLS itself.
///
/// `config.listener` must render as a TCP socket address; use port 0 to let the
/// OS pick a free port. The server runs on a spawned task, and this function
/// returns as soon as the socket is bound.
///
/// * `tls_config` - when `Some`, incoming connections are served over TLS;
///   when `None`, plain TCP is used.
/// * `root_store` - root certificates handed to the relay service.
///
/// Returns the port that was actually bound together with the handle of the
/// spawned server task.
///
/// # Errors
///
/// Fails if the directory cannot be initialised, if `config.listener` is not a
/// TCP address, or if binding the socket fails.
#[cfg(feature = "_manual-tls")]
pub async fn serve_manual_tls(
    config: Config,
    tls_config: Option<axum_server::tls_rustls::RustlsConfig>,
    root_store: rustls::RootCertStore,
) -> anyhow::Result<(u16, tokio::task::JoinHandle<anyhow::Result<()>>)> {
    use std::net::SocketAddr;

    let tag = generate_sentinel_tag();
    let directory = init_directory(&config, tag).await?;
    let relay = ohttp_relay::Service::new_with_roots(root_store, tag).await;
    let app = Router::new().fallback(route_request).with_state(Services { directory, relay });

    // The listener setting is round-tripped through its string form so that
    // anything which is not a plain socket address is rejected here.
    let listener_spec = config.listener.to_string();
    let addr = listener_spec
        .parse::<SocketAddr>()
        .map_err(|_| anyhow::anyhow!("TLS mode requires a TCP address (e.g., '[::]:8080')"))?;
    let listener = tokio::net::TcpListener::bind(addr).await?;
    let port = listener.local_addr()?.port();

    let handle = if let Some(tls) = tls_config {
        info!("Payjoin service listening on port {} with TLS", port);
        tokio::spawn(async move {
            let std_listener = listener.into_std()?;
            axum_server::from_tcp_rustls(std_listener, tls)
                .serve(app.into_make_service())
                .await
                .map_err(Into::into)
        })
    } else {
        info!("Payjoin service listening on port {} without TLS", port);
        tokio::spawn(async move { axum::serve(listener, app).await.map_err(Into::into) })
    };

    Ok((port, handle))
}
23✔
89

90
/// Generate random sentinel tag at startup.
91
/// The relay and directory share this tag in a best-effort attempt
92
/// at detecting self loops.
93
fn generate_sentinel_tag() -> SentinelTag { SentinelTag::new(rand::thread_rng().gen()) }
23✔
94

95
async fn init_directory(
23✔
96
    config: &Config,
23✔
97
    sentinel_tag: SentinelTag,
23✔
98
) -> anyhow::Result<payjoin_directory::Service<payjoin_directory::FilesDb>> {
23✔
99
    let db = payjoin_directory::FilesDb::init(config.timeout, config.storage_dir.clone()).await?;
23✔
100
    db.spawn_background_prune().await;
23✔
101

102
    let ohttp_keys_dir = config.storage_dir.join("ohttp-keys");
23✔
103
    let ohttp_config = init_ohttp_config(&ohttp_keys_dir)?;
23✔
104
    let metrics = payjoin_directory::metrics::Metrics::new();
23✔
105

106
    Ok(payjoin_directory::Service::new(db, ohttp_config.into(), metrics, sentinel_tag))
23✔
107
}
23✔
108

109
fn init_ohttp_config(
23✔
110
    ohttp_keys_dir: &std::path::Path,
23✔
111
) -> anyhow::Result<payjoin_directory::ServerKeyConfig> {
23✔
112
    std::fs::create_dir_all(ohttp_keys_dir)?;
23✔
113
    match payjoin_directory::read_server_config(ohttp_keys_dir) {
23✔
UNCOV
114
        Ok(config) => Ok(config),
×
115
        Err(_) => {
116
            let config = payjoin_directory::gen_ohttp_server_config()?;
23✔
117
            payjoin_directory::persist_new_key_config(config.clone(), ohttp_keys_dir)?;
23✔
118
            Ok(config)
23✔
119
        }
120
    }
121
}
23✔
122

123
async fn route_request(
128✔
124
    State(mut services): State<Services>,
128✔
125
    req: axum::extract::Request,
128✔
126
) -> Response {
128✔
127
    if is_relay_request(&req) {
128✔
128
        match services.relay.call(req).await {
46✔
129
            Ok(res) => res.into_response(),
46✔
UNCOV
130
            Err(e) => (axum::http::StatusCode::BAD_GATEWAY, e.to_string()).into_response(),
×
131
        }
132
    } else {
133
        // The directory service handles all other requests (including 404)
134
        match services.directory.call(req).await {
82✔
135
            Ok(res) => res.into_response(),
82✔
UNCOV
136
            Err(e) =>
×
UNCOV
137
                (axum::http::StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
×
138
        }
139
    }
140
}
128✔
141

142
/// Determines if a request should be routed to the OHTTP relay service.
143
///
144
/// Routing rules:
145
/// - `(OPTIONS, _)` => CORS preflight handling
146
/// - `(CONNECT, _)` => OHTTP bootstrap tunneling
147
/// - `(POST, "/")` => relay to default gateway (needed for backwards-compatibility only)
148
/// - `(POST, /http(s)://...)` => RFC 9540 opt-in gateway specified in path
149
/// - `(GET, /http(s)://...)` => OHTTP bootstrap via WebSocket with opt-in gateway
150
fn is_relay_request(req: &axum::extract::Request) -> bool {
128✔
151
    let method = req.method();
128✔
152
    let path = req.uri().path();
128✔
153

154
    match (method, path) {
128✔
155
        (&Method::OPTIONS, _) | (&Method::CONNECT, _) | (&Method::POST, "/") => true,
72✔
156
        (&Method::POST, p) | (&Method::GET, p)
35✔
157
            if p.starts_with("/http://") || p.starts_with("/https://") =>
45✔
158
            true,
35✔
159
        _ => false,
82✔
160
    }
161
}
128✔
162

163
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use axum_server::tls_rustls::RustlsConfig;
    use payjoin_test_utils::{http_agent, local_cert_key, wait_for_service_ready};
    use rustls::pki_types::CertificateDer;
    use rustls::RootCertStore;
    use tempfile::tempdir;

    use super::*;

    /// Starts a TLS-enabled service instance on an OS-assigned port.
    ///
    /// The given certificate is used both as the server certificate and as the
    /// only trusted root for outgoing relay connections. The temporary storage
    /// directory is returned so that it stays alive as long as the caller
    /// keeps it.
    async fn start_service(
        cert_der: Vec<u8>,
        key_der: Vec<u8>,
    ) -> (u16, tokio::task::JoinHandle<anyhow::Result<()>>, tempfile::TempDir) {
        let storage = tempdir().unwrap();

        let mut trusted_roots = RootCertStore::empty();
        trusted_roots.add(CertificateDer::from(cert_der.clone())).unwrap();
        let tls = RustlsConfig::from_der(vec![cert_der], key_der).await.unwrap();

        let config = Config {
            listener: "[::]:0".parse().expect("valid listener address"),
            storage_dir: storage.path().to_path_buf(),
            timeout: Duration::from_secs(2),
        };

        let (port, handle) = serve_manual_tls(config, Some(tls), trusted_roots).await.unwrap();
        (port, handle, storage)
    }

    /// A relay request whose gateway is the very same instance must be refused.
    #[tokio::test]
    async fn self_loop_request_is_rejected() {
        let cert = local_cert_key();
        let cert_der = cert.cert.der().to_vec();
        let key_der = cert.signing_key.serialize_der();

        let (port, _handle, _tempdir) = start_service(cert_der.clone(), key_der).await;

        let client = Arc::new(http_agent(cert_der).unwrap());
        let base_url = format!("https://localhost:{port}");
        wait_for_service_ready(&base_url, client.clone()).await.unwrap();

        // The relay path has the form /{gateway_url}; here the gateway URL points
        // back at the instance that receives the request.
        let ohttp_req_url = format!("{base_url}/{base_url}");

        let response = client
            .post(&ohttp_req_url)
            .header("Content-Type", "message/ohttp-req")
            .body(vec![0u8; 100])
            .send()
            .await
            .expect("request should complete");

        assert_eq!(
            response.status(),
            axum::http::StatusCode::FORBIDDEN,
            "self-loop request should be rejected with 403 Forbidden"
        );
    }

    /// A relay request that targets a different instance must not be refused as a self loop.
    #[tokio::test]
    async fn cross_instance_request_is_accepted() {
        let cert = local_cert_key();
        let cert_der = cert.cert.der().to_vec();
        let key_der = cert.signing_key.serialize_der();

        let (relay_port, _relay_handle, _relay_tempdir) =
            start_service(cert_der.clone(), key_der.clone()).await;
        let (directory_port, _directory_handle, _directory_tempdir) =
            start_service(cert_der.clone(), key_der).await;

        let client = Arc::new(http_agent(cert_der).unwrap());
        let relay_url = format!("https://localhost:{relay_port}");
        let directory_url = format!("https://localhost:{directory_port}");

        wait_for_service_ready(&relay_url, client.clone()).await.unwrap();
        wait_for_service_ready(&directory_url, client.clone()).await.unwrap();

        // Go through the relay instance to the directory instance. Each instance
        // generated its own sentinel tag, so this is not a self loop.
        let ohttp_req_url = format!("{relay_url}/{directory_url}");

        let response = client
            .post(&ohttp_req_url)
            .header("Content-Type", "message/ohttp-req")
            .body(vec![0u8; 100])
            .send()
            .await
            .expect("request should complete");

        // The request may fail for other reasons (invalid OHTTP body), but not due to self-loop.
        assert_ne!(
            response.status(),
            axum::http::StatusCode::FORBIDDEN,
            "cross-instance request should not be rejected as forbidden"
        );
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc