• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 24684607356

20 Apr 2026 06:55PM UTC coverage: 84.903% (+0.4%) from 84.502%
24684607356

Pull #1377

github

web-flow
Merge 716a65517 into f93247e3a
Pull Request #1377: Use internal Url struct in favor of url::Url to minimize the url dep in payjoin

467 of 487 new or added lines in 16 files covered. (95.89%)

2 existing lines in 2 files now uncovered.

11343 of 13360 relevant lines covered (84.9%)

401.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.46
/payjoin-cli/src/app/v1.rs
1
use std::collections::HashMap;
2
use std::net::SocketAddr;
3
use std::str::FromStr;
4
use std::sync::Arc;
5

6
use anyhow::{anyhow, Context, Result};
7
use http_body_util::combinators::BoxBody;
8
use http_body_util::{BodyExt, Full};
9
use hyper::body::{Bytes, Incoming};
10
use hyper::server::conn::http1;
11
use hyper::service::service_fn;
12
use hyper::{Method, Request, Response, StatusCode};
13
use hyper_util::rt::TokioIo;
14
use payjoin::bitcoin::psbt::Psbt;
15
use payjoin::bitcoin::{Amount, FeeRate};
16
use payjoin::receive::v1::{PayjoinProposal, UncheckedOriginalPayload};
17
use payjoin::receive::Error;
18
use payjoin::send::v1::SenderBuilder;
19
use payjoin::{ImplementationError, IntoUrl, Uri, UriExt};
20
use tokio::net::TcpListener;
21
use tokio::sync::watch;
22

23
use super::config::Config;
24
use super::wallet::BitcoindWallet;
25
use super::App as AppTrait;
26
use crate::app::{handle_interrupt, http_agent};
27
use crate::db::Database;
28

29
struct Headers<'a>(&'a hyper::HeaderMap);
30
impl payjoin::receive::v1::Headers for Headers<'_> {
31
    fn get_header(&self, key: &str) -> Option<&str> {
6✔
32
        self.0.get(key).map(|v| v.to_str()).transpose().ok().flatten()
6✔
33
    }
6✔
34
}
35

36
#[derive(Clone)]
37
pub(crate) struct App {
38
    config: Config,
39
    db: Arc<Database>,
40
    wallet: BitcoindWallet,
41
    interrupt: watch::Receiver<()>,
42
}
43

44
#[async_trait::async_trait]
45
impl AppTrait for App {
46
    async fn new(config: Config) -> Result<Self> {
5✔
47
        let db = Arc::new(Database::create(&config.db_path)?);
48
        let (interrupt_tx, interrupt_rx) = watch::channel(());
49
        tokio::spawn(handle_interrupt(interrupt_tx));
50
        let wallet = BitcoindWallet::new(&config.bitcoind).await?;
51
        let app = Self { config, db, wallet, interrupt: interrupt_rx };
52
        app.wallet()
53
            .network()
54
            .context("Failed to connect to bitcoind. Check config RPC connection.")?;
55
        Ok(app)
56
    }
5✔
57

58
    fn wallet(&self) -> BitcoindWallet { self.wallet.clone() }
16✔
59

60
    async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> {
2✔
61
        let uri =
62
            Uri::try_from(bip21).map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?;
×
63
        let uri = uri.assume_checked();
64
        let uri = uri.check_pj_supported().map_err(|_| anyhow!("URI does not support Payjoin"))?;
×
65
        let amount = uri.amount.ok_or_else(|| anyhow!("please specify the amount in the Uri"))?;
×
66
        let psbt = self.create_original_psbt(&uri.address, amount, fee_rate)?;
67
        let (req, ctx) = SenderBuilder::new(psbt, uri.clone())
68
            .build_recommended(fee_rate)
69
            .with_context(|| "Failed to build payjoin request")?
70
            .create_v1_post_request();
71
        let http = http_agent(&self.config)?;
72
        let body = String::from_utf8(req.body.clone()).unwrap();
73
        println!("Sending fallback request to {}", &req.url);
74
        let response = http
75
            .post(req.url)
76
            .header("Content-Type", req.content_type)
77
            .body(body.clone())
78
            .send()
79
            .await
80
            .with_context(|| "HTTP request failed")?;
81
        let fallback_tx = Psbt::from_str(&body)
82
            .map_err(|e| anyhow!("Failed to load PSBT from base64: {}", e))?
×
83
            .extract_tx()?;
84
        println!("Sent fallback transaction txid: {}", fallback_tx.compute_txid());
85
        println!(
86
            "Sent fallback transaction hex: {:#}",
87
            payjoin::bitcoin::consensus::encode::serialize_hex(&fallback_tx)
88
        );
89
        let psbt = ctx.process_response(&response.bytes().await?).map_err(|e| {
×
90
            tracing::debug!("Error processing response: {e:?}");
×
91
            anyhow!("Failed to process response {e}")
×
92
        })?;
×
93

94
        self.process_pj_response(psbt)?;
95
        Ok(())
96
    }
2✔
97

98
    #[allow(clippy::incompatible_msrv)]
99
    async fn receive_payjoin(&self, amount: Amount) -> Result<()> {
3✔
100
        let mut interrupt = self.interrupt.clone();
101
        tokio::select! {
102
            res = self.start_http_server(amount) => { res?; }
103
            _ = interrupt.changed() => {
104
                println!("Interrupted.");
105
            }
106
        }
107
        Ok(())
108
    }
3✔
109

110
    #[cfg(feature = "v2")]
111
    async fn resume_payjoins(&self) -> Result<()> {
×
112
        unimplemented!("resume_payjoins not implemented for v1");
113
    }
×
114

115
    #[cfg(feature = "v2")]
116
    async fn history(&self) -> Result<()> {
×
117
        unimplemented!("history not implemented for v1");
118
    }
×
119
}
120

121
impl App {
122
    fn construct_payjoin_uri(&self, amount: Amount, endpoint: impl IntoUrl) -> Result<String> {
3✔
123
        let pj_receiver_address = self.wallet.get_new_address()?;
3✔
124

125
        let mut pj_uri = payjoin::receive::v1::build_v1_pj_uri(
3✔
126
            &pj_receiver_address,
3✔
127
            endpoint,
3✔
128
            payjoin::OutputSubstitution::Enabled,
3✔
129
        )?;
×
130
        pj_uri.amount = Some(amount);
3✔
131

132
        Ok(pj_uri.to_string())
3✔
133
    }
3✔
134

135
    async fn start_http_server(&self, amount: Amount) -> Result<()> {
3✔
136
        let port = self.config.v1()?.port;
3✔
137
        let addr = SocketAddr::from(([0, 0, 0, 0], port));
3✔
138
        let listener = TcpListener::bind(addr).await?;
3✔
139

140
        let mut endpoint = self.config.v1()?.pj_endpoint.clone();
3✔
141

142
        // If --port 0 is specified, a free port is chosen, so we need to set it
143
        // on the endpoint which must not have a port.
144
        if port == 0 {
3✔
145
            endpoint.set_port(Some(listener.local_addr()?.port()));
3✔
UNCOV
146
        }
×
147

148
        let pj_uri_string = self.construct_payjoin_uri(amount, endpoint.as_str())?;
3✔
149
        println!(
3✔
150
            "Listening at {}. Configured to accept payjoin at BIP 21 Payjoin Uri:",
151
            listener.local_addr()?
3✔
152
        );
153

154
        #[cfg(not(feature = "_manual-tls"))]
155
        println!(
156
            "Make sure to configure a reverse proxy to handle TLS termination for the listener"
157
        );
158

159
        println!("{pj_uri_string}");
3✔
160

161
        let app = self.clone();
3✔
162

163
        #[cfg(feature = "_manual-tls")]
164
        let tls_acceptor = self.init_tls_acceptor()?;
3✔
165
        while let Ok((stream, _)) = listener.accept().await {
6✔
166
            let app = app.clone();
3✔
167
            #[cfg(feature = "_manual-tls")]
168
            let tls_acceptor = tls_acceptor.clone();
3✔
169
            tokio::spawn(async move {
3✔
170
                #[cfg(feature = "_manual-tls")]
171
                let stream = match tls_acceptor.accept(stream).await {
3✔
172
                    Ok(tls_stream) => tls_stream,
3✔
173
                    Err(e) => {
×
174
                        tracing::error!("TLS accept error: {e}");
×
175
                        return;
×
176
                    }
177
                };
178

179
                let _ = http1::Builder::new()
3✔
180
                    .serve_connection(
3✔
181
                        TokioIo::new(stream),
3✔
182
                        service_fn(move |req| app.clone().handle_web_request(req)),
3✔
183
                    )
184
                    .await;
3✔
185
            });
3✔
186
        }
187
        Ok(())
×
188
    }
×
189

190
    #[cfg(feature = "_manual-tls")]
191
    fn init_tls_acceptor(&self) -> Result<tokio_rustls::TlsAcceptor> {
3✔
192
        use std::sync::Arc;
193

194
        use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer};
195
        use tokio_rustls::rustls::ServerConfig;
196
        use tokio_rustls::TlsAcceptor;
197

198
        let key_der = std::fs::read(
3✔
199
            self.config
3✔
200
                .certificate_key
3✔
201
                .as_ref()
3✔
202
                .expect("certificate key is required if listening with tls"),
3✔
203
        )?;
×
204
        let key = PrivateKeyDer::try_from(key_der.clone())
3✔
205
            .map_err(|e| anyhow::anyhow!("Could not parse key: {}", e))?;
3✔
206

207
        let cert_der = std::fs::read(
3✔
208
            self.config
3✔
209
                .root_certificate
3✔
210
                .as_ref()
3✔
211
                .expect("certificate key is required if listening with tls"),
3✔
212
        )?;
×
213
        let certs = vec![CertificateDer::from(cert_der)];
3✔
214

215
        let mut server_config = ServerConfig::builder()
3✔
216
            .with_no_client_auth()
3✔
217
            .with_single_cert(certs, key)
3✔
218
            .map_err(|e| anyhow::anyhow!("TLS error: {}", e))?;
3✔
219

220
        server_config.alpn_protocols =
3✔
221
            vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
3✔
222

223
        Ok(TlsAcceptor::from(Arc::new(server_config)))
3✔
224
    }
3✔
225

226
    async fn handle_web_request(
3✔
227
        self,
3✔
228
        req: Request<Incoming>,
3✔
229
    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>> {
3✔
230
        tracing::trace!("Received {} request to {}", req.method(), req.uri().path());
3✔
231
        let mut response = match (req.method(), req.uri().path()) {
3✔
232
            (&Method::GET, "/bip21") => {
×
233
                let query_string = req.uri().query().unwrap_or("");
×
234
                tracing::trace!("{:?}, {query_string:?}", req.method());
×
NEW
235
                let query_url = payjoin::Url::parse(&format!("http://localhost/?{query_string}"))
×
NEW
236
                    .expect("valid query URL");
×
NEW
237
                let query_params: HashMap<String, String> =
×
NEW
238
                    query_url.query_pairs().into_iter().collect();
×
239
                let amount = query_params.get("amount").map(|amt| {
×
240
                    Amount::from_btc(amt.parse().expect("Failed to parse amount")).unwrap()
×
241
                });
×
242
                self.handle_get_bip21(amount)
×
243
                    .map_err(|e| {
×
244
                        tracing::error!("Error handling request: {e}");
×
245
                        Response::builder().status(500).body(full(e.to_string())).unwrap()
×
246
                    })
×
247
                    .unwrap_or_else(|err_resp| err_resp)
×
248
            }
249
            (&Method::POST, _) => self
3✔
250
                .handle_payjoin_post(req)
3✔
251
                .await
3✔
252
                .map_err(|e| {
3✔
253
                    let json = payjoin::receive::JsonReply::from(&e);
×
254
                    tracing::error!("Error handling request: {e}");
×
255
                    Response::builder()
×
256
                        .status(json.status_code())
×
257
                        .body(full(json.to_json().to_string()))
×
258
                        .unwrap()
×
259
                })
×
260
                .unwrap_or_else(|err_resp| err_resp),
3✔
261
            _ => Response::builder().status(StatusCode::NOT_FOUND).body(full("Not found")).unwrap(),
×
262
        };
263
        response
3✔
264
            .headers_mut()
3✔
265
            .insert("Access-Control-Allow-Origin", hyper::header::HeaderValue::from_static("*"));
3✔
266
        Ok(response)
3✔
267
    }
3✔
268

269
    fn handle_get_bip21(
×
270
        &self,
×
271
        amount: Option<Amount>,
×
272
    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Error> {
×
273
        let v1_config = self.config.v1().map_err(|e| {
×
274
            Error::Implementation(ImplementationError::from(e.into_boxed_dyn_error()))
×
275
        })?;
×
276
        let address = self.wallet.get_new_address().map_err(|e| {
×
277
            Error::Implementation(ImplementationError::from(e.into_boxed_dyn_error()))
×
278
        })?;
×
279
        let uri_string = if let Some(amount) = amount {
×
280
            format!(
×
281
                "{}?amount={}&pj={}",
282
                address.to_qr_uri(),
×
283
                amount.to_btc(),
×
284
                v1_config.pj_endpoint
285
            )
286
        } else {
287
            format!("{}?pj={}", address.to_qr_uri(), v1_config.pj_endpoint)
×
288
        };
289
        let uri = Uri::try_from(uri_string.clone()).map_err(|_| {
×
290
            Error::Implementation(ImplementationError::from(
×
291
                anyhow!("Could not parse payjoin URI string.").into_boxed_dyn_error(),
×
292
            ))
×
293
        })?;
×
294
        let _ = uri.assume_checked(); // we just got it from bitcoind above
×
295

296
        Ok(Response::new(full(uri_string)))
×
297
    }
×
298

299
    async fn handle_payjoin_post(
3✔
300
        &self,
3✔
301
        req: Request<Incoming>,
3✔
302
    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Error> {
3✔
303
        let (parts, body) = req.into_parts();
3✔
304
        let headers = Headers(&parts.headers);
3✔
305
        let query_string = parts.uri.query().unwrap_or("");
3✔
306
        let body = body
3✔
307
            .collect()
3✔
308
            .await
3✔
309
            .map_err(|e| Error::Implementation(ImplementationError::new(e)))?
3✔
310
            .to_bytes();
3✔
311
        let proposal = UncheckedOriginalPayload::from_request(&body, query_string, headers)?;
3✔
312

313
        let payjoin_proposal = self.process_v1_proposal(proposal)?;
3✔
314
        let psbt = payjoin_proposal.psbt();
3✔
315
        let body = psbt.to_string();
3✔
316
        println!(
3✔
317
            "Responded with Payjoin proposal {}",
318
            psbt.clone().extract_tx_unchecked_fee_rate().compute_txid()
3✔
319
        );
320
        Ok(Response::new(full(body)))
3✔
321
    }
3✔
322

323
    fn process_v1_proposal(
3✔
324
        &self,
3✔
325
        proposal: UncheckedOriginalPayload,
3✔
326
    ) -> Result<PayjoinProposal, Error> {
3✔
327
        let wallet = self.wallet();
3✔
328

329
        // Receive Check 1: Can Broadcast
330
        let proposal = proposal.check_broadcast_suitability(None, |tx| {
3✔
331
            wallet
3✔
332
                .can_broadcast(tx)
3✔
333
                .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
3✔
334
        })?;
3✔
335
        tracing::trace!("check1");
3✔
336

337
        // in a payment processor where the sender could go offline, this is where you schedule to broadcast the original_tx
338
        let _to_broadcast_in_failure_case = proposal.extract_tx_to_schedule_broadcast();
3✔
339

340
        // Receive Check 2: receiver can't sign for proposal inputs
341
        let proposal = proposal.check_inputs_not_owned(&mut |input| {
3✔
342
            wallet.is_mine(input).map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
3✔
343
        })?;
3✔
344
        tracing::trace!("check2");
3✔
345

346
        // Receive Check 3: have we seen this input before? More of a check for non-interactive i.e. payment processor receivers.
347
        let payjoin = proposal.check_no_inputs_seen_before(&mut |input| {
3✔
348
            Ok(self.db.insert_input_seen_before(*input)?)
3✔
349
        })?;
3✔
350
        tracing::trace!("check3");
3✔
351

352
        let payjoin = payjoin.identify_receiver_outputs(&mut |output_script| {
6✔
353
            wallet
6✔
354
                .is_mine(output_script)
6✔
355
                .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
6✔
356
        })?;
6✔
357

358
        let payjoin = payjoin
3✔
359
            .substitute_receiver_script(
3✔
360
                &self
3✔
361
                    .wallet
3✔
362
                    .get_new_address()
3✔
363
                    .map_err(|e| {
3✔
364
                        Error::Implementation(ImplementationError::from(e.into_boxed_dyn_error()))
×
365
                    })?
×
366
                    .script_pubkey(),
3✔
367
            )
368
            .map_err(|e| Error::Implementation(ImplementationError::new(e)))?
3✔
369
            .commit_outputs();
3✔
370

371
        let wants_fee_range = try_contributing_inputs(payjoin.clone(), &self.wallet)
3✔
372
            .map_err(Error::Implementation)?;
3✔
373
        let provisional_payjoin =
3✔
374
            wants_fee_range.apply_fee_range(None, self.config.max_fee_rate)?;
3✔
375

376
        let payjoin_proposal = provisional_payjoin.finalize_proposal(|psbt| {
3✔
377
            self.wallet
3✔
378
                .process_psbt(psbt)
3✔
379
                .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
3✔
380
        })?;
3✔
381
        Ok(payjoin_proposal)
3✔
382
    }
3✔
383
}
384

385
fn try_contributing_inputs(
3✔
386
    payjoin: payjoin::receive::v1::WantsInputs,
3✔
387
    wallet: &BitcoindWallet,
3✔
388
) -> Result<payjoin::receive::v1::WantsFeeRange, ImplementationError> {
3✔
389
    let candidate_inputs =
3✔
390
        wallet.list_unspent().map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))?;
3✔
391

392
    if candidate_inputs.is_empty() {
3✔
393
        return Err(ImplementationError::from(
×
394
            anyhow::anyhow!(
×
395
                "No spendable UTXOs available in wallet. Please fund your wallet before resuming this session"
×
396
            )
×
397
            .into_boxed_dyn_error(),
×
398
        ));
×
399
    }
3✔
400

401
    let selected_input =
3✔
402
        payjoin.try_preserving_privacy(candidate_inputs).map_err(ImplementationError::new)?;
3✔
403

404
    Ok(payjoin
3✔
405
        .contribute_inputs(vec![selected_input])
3✔
406
        .map_err(ImplementationError::new)?
3✔
407
        .commit_inputs())
3✔
408
}
3✔
409

410
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
3✔
411
    Full::new(chunk.into()).map_err(|never| match never {}).boxed()
3✔
412
}
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc