• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 25993736976

17 May 2026 02:36PM UTC coverage: 85.199% (-0.1%) from 85.297%
25993736976

Pull #1557

github

web-flow
Merge 40bca0c24 into bf6e0603b
Pull Request #1557: Sender cancel state

34 of 58 new or added lines in 6 files covered. (58.62%)

90 existing lines in 5 files now uncovered.

11691 of 13722 relevant lines covered (85.2%)

394.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.75
/payjoin-cli/src/app/v1.rs
1
use std::collections::HashMap;
2
use std::net::SocketAddr;
3
use std::sync::Arc;
4

5
use anyhow::{anyhow, Context, Result};
6
use http_body_util::combinators::BoxBody;
7
use http_body_util::{BodyExt, Full};
8
use hyper::body::{Bytes, Incoming};
9
use hyper::server::conn::http1;
10
use hyper::service::service_fn;
11
use hyper::{Method, Request, Response, StatusCode};
12
use hyper_util::rt::TokioIo;
13
use payjoin::bitcoin::consensus::encode::serialize_hex;
14
use payjoin::bitcoin::{Amount, FeeRate};
15
use payjoin::receive::v1::{PayjoinProposal, UncheckedOriginalPayload};
16
use payjoin::receive::Error;
17
use payjoin::send::v1::SenderBuilder;
18
use payjoin::{ImplementationError, IntoUrl, Uri, UriExt};
19
use tokio::net::TcpListener;
20
use tokio::sync::watch;
21

22
use super::config::Config;
23
use super::wallet::BitcoindWallet;
24
use super::App as AppTrait;
25
use crate::app::{handle_interrupt, http_agent};
26
use crate::db::Database;
27

28
struct Headers<'a>(&'a hyper::HeaderMap);
29
impl payjoin::receive::v1::Headers for Headers<'_> {
30
    fn get_header(&self, key: &str) -> Option<&str> {
6✔
31
        self.0.get(key).map(|v| v.to_str()).transpose().ok().flatten()
6✔
32
    }
6✔
33
}
34

35
#[derive(Clone)]
36
pub(crate) struct App {
37
    config: Config,
38
    db: Arc<Database>,
39
    wallet: BitcoindWallet,
40
    interrupt: watch::Receiver<()>,
41
}
42

43
#[async_trait::async_trait]
44
impl AppTrait for App {
45
    async fn new(config: Config) -> Result<Self> {
5✔
46
        let db = Arc::new(Database::create(&config.db_path)?);
47
        let (interrupt_tx, interrupt_rx) = watch::channel(());
48
        tokio::spawn(handle_interrupt(interrupt_tx));
49
        let wallet = BitcoindWallet::new(&config.bitcoind).await?;
50
        let app = Self { config, db, wallet, interrupt: interrupt_rx };
51
        app.wallet()
52
            .network()
53
            .context("Failed to connect to bitcoind. Check config RPC connection.")?;
54
        Ok(app)
55
    }
5✔
56

57
    fn wallet(&self) -> BitcoindWallet { self.wallet.clone() }
16✔
58

59
    async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> {
2✔
60
        let uri =
61
            Uri::try_from(bip21).map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?;
×
62
        let uri = uri.assume_checked();
63
        let uri = uri.check_pj_supported().map_err(|_| anyhow!("URI does not support Payjoin"))?;
×
64
        let amount = uri.amount.ok_or_else(|| anyhow!("please specify the amount in the Uri"))?;
×
65
        let psbt = self.create_original_psbt(&uri.address, amount, fee_rate)?;
66
        let fallback_tx = psbt.clone().extract_tx()?;
67
        let (req, ctx) = SenderBuilder::new(psbt, uri.clone())
68
            .build_recommended(fee_rate)
69
            .with_context(|| "Failed to build payjoin request")?
70
            .create_v1_post_request();
71
        let http = http_agent(&self.config)?;
72
        let body = String::from_utf8(req.body.clone()).unwrap();
73
        println!("Sending Original PSBT to {}", req.url);
74
        let response = match http
75
            .post(req.url)
76
            .header("Content-Type", req.content_type)
77
            .body(body.clone())
78
            .send()
79
            .await
80
        {
81
            Ok(response) => response,
82
            Err(e) => {
83
                tracing::error!("HTTP request failed: {e}");
84
                println!("Payjoin failed. To broadcast the fallback transaction, run:");
85
                println!(
86
                    "  bitcoin-cli -rpcwallet=<wallet> sendrawtransaction {:#}",
87
                    serialize_hex(&fallback_tx)
88
                );
89
                return Err(anyhow!("HTTP request failed: {e}"));
90
            }
91
        };
92
        let psbt = match ctx.process_response(&response.bytes().await?) {
93
            Ok(psbt) => psbt,
94
            Err(e) => {
95
                tracing::error!("Error processing response: {e:?}");
96
                println!("Payjoin failed. To broadcast the fallback transaction, run:");
97
                println!(
98
                    "  bitcoin-cli -rpcwallet=<wallet> sendrawtransaction {:#}",
99
                    serialize_hex(&fallback_tx)
100
                );
101
                return Err(anyhow!("Failed to process response {e}"));
102
            }
103
        };
104

105
        self.process_pj_response(psbt)?;
106
        Ok(())
107
    }
2✔
108

109
    #[allow(clippy::incompatible_msrv)]
110
    async fn receive_payjoin(&self, amount: Amount) -> Result<()> {
3✔
111
        let mut interrupt = self.interrupt.clone();
112
        tokio::select! {
113
            res = self.start_http_server(amount) => { res?; }
114
            _ = interrupt.changed() => {
115
                println!("Interrupted.");
116
            }
117
        }
118
        Ok(())
119
    }
3✔
120

121
    #[cfg(feature = "v2")]
122
    async fn resume_payjoins(&self) -> Result<()> {
×
123
        unimplemented!("resume_payjoins not implemented for v1");
124
    }
×
125

126
    #[cfg(feature = "v2")]
127
    async fn history(&self) -> Result<()> {
×
128
        unimplemented!("history not implemented for v1");
129
    }
×
130

131
    #[cfg(feature = "v2")]
132
    async fn fallback_sender(&self, _session_id: crate::db::v2::SessionId) -> Result<()> {
×
133
        anyhow::bail!("fallback is only supported for v2 (BIP77) sessions")
134
    }
×
135

136
    #[cfg(feature = "v2")]
NEW
137
    async fn cancel_sender(&self, _session_id: crate::db::v2::SessionId) -> Result<()> {
×
138
        anyhow::bail!("cancel is only supported for v2 (BIP77) sessions")
NEW
UNCOV
139
    }
×
140
}
141

142
impl App {
143
    fn construct_payjoin_uri(&self, amount: Amount, endpoint: impl IntoUrl) -> Result<String> {
3✔
144
        let pj_receiver_address = self.wallet.get_new_address()?;
3✔
145

146
        let mut pj_uri = payjoin::receive::v1::build_v1_pj_uri(
3✔
147
            &pj_receiver_address,
3✔
148
            endpoint,
3✔
149
            payjoin::OutputSubstitution::Enabled,
3✔
UNCOV
150
        )?;
×
151
        pj_uri.amount = Some(amount);
3✔
152

153
        Ok(pj_uri.to_string())
3✔
154
    }
3✔
155

156
    async fn start_http_server(&self, amount: Amount) -> Result<()> {
3✔
157
        let port = self.config.v1()?.port;
3✔
158
        let addr = SocketAddr::from(([0, 0, 0, 0], port));
3✔
159
        let listener = TcpListener::bind(addr).await?;
3✔
160

161
        let mut endpoint = self.config.v1()?.pj_endpoint.clone();
3✔
162

163
        // If --port 0 is specified, a free port is chosen, so we need to set it
164
        // on the endpoint which must not have a port.
165
        if port == 0 {
3✔
166
            endpoint.set_port(Some(listener.local_addr()?.port()));
3✔
UNCOV
167
        }
×
168

169
        let pj_uri_string = self.construct_payjoin_uri(amount, endpoint.as_str())?;
3✔
170
        println!(
3✔
171
            "Listening at {}. Configured to accept payjoin at BIP 21 Payjoin Uri:",
172
            listener.local_addr()?
3✔
173
        );
174

175
        #[cfg(not(feature = "_manual-tls"))]
176
        println!(
177
            "Make sure to configure a reverse proxy to handle TLS termination for the listener"
178
        );
179

180
        println!("{pj_uri_string}");
3✔
181

182
        let app = self.clone();
3✔
183

184
        #[cfg(feature = "_manual-tls")]
185
        let tls_acceptor = self.init_tls_acceptor()?;
3✔
186
        while let Ok((stream, _)) = listener.accept().await {
6✔
187
            let app = app.clone();
3✔
188
            #[cfg(feature = "_manual-tls")]
189
            let tls_acceptor = tls_acceptor.clone();
3✔
190
            tokio::spawn(async move {
3✔
191
                #[cfg(feature = "_manual-tls")]
192
                let stream = match tls_acceptor.accept(stream).await {
3✔
193
                    Ok(tls_stream) => tls_stream,
3✔
194
                    Err(e) => {
×
195
                        tracing::error!("TLS accept error: {e}");
×
UNCOV
196
                        return;
×
197
                    }
198
                };
199

200
                let _ = http1::Builder::new()
3✔
201
                    .serve_connection(
3✔
202
                        TokioIo::new(stream),
3✔
203
                        service_fn(move |req| app.clone().handle_web_request(req)),
3✔
204
                    )
205
                    .await;
3✔
206
            });
3✔
207
        }
208
        Ok(())
×
UNCOV
209
    }
×
210

211
    #[cfg(feature = "_manual-tls")]
212
    fn init_tls_acceptor(&self) -> Result<tokio_rustls::TlsAcceptor> {
3✔
213
        use std::sync::Arc;
214

215
        use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer};
216
        use tokio_rustls::rustls::ServerConfig;
217
        use tokio_rustls::TlsAcceptor;
218

219
        let key_der = std::fs::read(
3✔
220
            self.config
3✔
221
                .certificate_key
3✔
222
                .as_ref()
3✔
223
                .expect("certificate key is required if listening with tls"),
3✔
UNCOV
224
        )?;
×
225
        let key = PrivateKeyDer::try_from(key_der.clone())
3✔
226
            .map_err(|e| anyhow::anyhow!("Could not parse key: {}", e))?;
3✔
227

228
        let cert_der = std::fs::read(
3✔
229
            self.config
3✔
230
                .root_certificate
3✔
231
                .as_ref()
3✔
232
                .expect("certificate key is required if listening with tls"),
3✔
UNCOV
233
        )?;
×
234
        let certs = vec![CertificateDer::from(cert_der)];
3✔
235

236
        let mut server_config = ServerConfig::builder()
3✔
237
            .with_no_client_auth()
3✔
238
            .with_single_cert(certs, key)
3✔
239
            .map_err(|e| anyhow::anyhow!("TLS error: {}", e))?;
3✔
240

241
        server_config.alpn_protocols =
3✔
242
            vec![b"h2".to_vec(), b"http/1.1".to_vec(), b"http/1.0".to_vec()];
3✔
243

244
        Ok(TlsAcceptor::from(Arc::new(server_config)))
3✔
245
    }
3✔
246

247
    async fn handle_web_request(
3✔
248
        self,
3✔
249
        req: Request<Incoming>,
3✔
250
    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>> {
3✔
251
        tracing::trace!("Received {} request to {}", req.method(), req.uri().path());
3✔
252
        let mut response = match (req.method(), req.uri().path()) {
3✔
253
            (&Method::GET, "/bip21") => {
×
254
                let query_string = req.uri().query().unwrap_or("");
×
255
                tracing::trace!("{:?}, {query_string:?}", req.method());
×
256
                let query_url = payjoin::Url::parse(&format!("http://localhost/?{query_string}"))
×
257
                    .expect("valid query URL");
×
258
                let query_params: HashMap<String, String> =
×
259
                    query_url.query_pairs().into_iter().collect();
×
260
                let amount = query_params.get("amount").map(|amt| {
×
261
                    Amount::from_btc(amt.parse().expect("Failed to parse amount")).unwrap()
×
262
                });
×
263
                self.handle_get_bip21(amount)
×
264
                    .map_err(|e| {
×
265
                        tracing::error!("Error handling request: {e}");
×
266
                        Response::builder().status(500).body(full(e.to_string())).unwrap()
×
267
                    })
×
UNCOV
268
                    .unwrap_or_else(|err_resp| err_resp)
×
269
            }
270
            (&Method::POST, _) => self
3✔
271
                .handle_payjoin_post(req)
3✔
272
                .await
3✔
273
                .map_err(|e| {
3✔
274
                    let json = payjoin::receive::JsonReply::from(&e);
×
275
                    tracing::error!("Error handling request: {e}");
×
276
                    Response::builder()
×
277
                        .status(json.status_code())
×
278
                        .body(full(json.to_json().to_string()))
×
279
                        .unwrap()
×
UNCOV
280
                })
×
281
                .unwrap_or_else(|err_resp| err_resp),
3✔
UNCOV
282
            _ => Response::builder().status(StatusCode::NOT_FOUND).body(full("Not found")).unwrap(),
×
283
        };
284
        response
3✔
285
            .headers_mut()
3✔
286
            .insert("Access-Control-Allow-Origin", hyper::header::HeaderValue::from_static("*"));
3✔
287
        Ok(response)
3✔
288
    }
3✔
289

290
    fn handle_get_bip21(
×
291
        &self,
×
292
        amount: Option<Amount>,
×
293
    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Error> {
×
294
        let v1_config = self.config.v1().map_err(|e| {
×
295
            Error::Implementation(ImplementationError::from(e.into_boxed_dyn_error()))
×
296
        })?;
×
297
        let address = self.wallet.get_new_address().map_err(|e| {
×
298
            Error::Implementation(ImplementationError::from(e.into_boxed_dyn_error()))
×
299
        })?;
×
300
        let uri_string = if let Some(amount) = amount {
×
UNCOV
301
            format!(
×
302
                "{}?amount={}&pj={}",
303
                address.to_qr_uri(),
×
UNCOV
304
                amount.to_btc(),
×
305
                v1_config.pj_endpoint
306
            )
307
        } else {
UNCOV
308
            format!("{}?pj={}", address.to_qr_uri(), v1_config.pj_endpoint)
×
309
        };
310
        let uri = Uri::try_from(uri_string.clone()).map_err(|_| {
×
311
            Error::Implementation(ImplementationError::from(
×
312
                anyhow!("Could not parse payjoin URI string.").into_boxed_dyn_error(),
×
313
            ))
×
314
        })?;
×
UNCOV
315
        let _ = uri.assume_checked(); // we just got it from bitcoind above
×
316

317
        Ok(Response::new(full(uri_string)))
×
UNCOV
318
    }
×
319

320
    async fn handle_payjoin_post(
3✔
321
        &self,
3✔
322
        req: Request<Incoming>,
3✔
323
    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Error> {
3✔
324
        let (parts, body) = req.into_parts();
3✔
325
        let headers = Headers(&parts.headers);
3✔
326
        let query_string = parts.uri.query().unwrap_or("");
3✔
327
        let body = body
3✔
328
            .collect()
3✔
329
            .await
3✔
330
            .map_err(|e| Error::Implementation(ImplementationError::new(e)))?
3✔
331
            .to_bytes();
3✔
332
        let proposal = UncheckedOriginalPayload::from_request(&body, query_string, headers)?;
3✔
333

334
        let payjoin_proposal = self.process_v1_proposal(proposal)?;
3✔
335
        let psbt = payjoin_proposal.psbt();
3✔
336
        let body = psbt.to_string();
3✔
337
        println!(
3✔
338
            "Responded with Payjoin proposal {}",
339
            psbt.clone().extract_tx_unchecked_fee_rate().compute_txid()
3✔
340
        );
341
        Ok(Response::new(full(body)))
3✔
342
    }
3✔
343

344
    fn process_v1_proposal(
3✔
345
        &self,
3✔
346
        proposal: UncheckedOriginalPayload,
3✔
347
    ) -> Result<PayjoinProposal, Error> {
3✔
348
        let wallet = self.wallet();
3✔
349

350
        // Receive Check 1: Can Broadcast
351
        let proposal = proposal.check_broadcast_suitability(None, |tx| {
3✔
352
            wallet
3✔
353
                .can_broadcast(tx)
3✔
354
                .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
3✔
355
        })?;
3✔
356
        tracing::trace!("check1");
3✔
357

358
        // in a payment processor where the sender could go offline, this is where you schedule to broadcast the original_tx
359
        let _to_broadcast_in_failure_case = proposal.extract_tx_to_schedule_broadcast();
3✔
360

361
        // Receive Check 2: receiver can't sign for proposal inputs
362
        let proposal = proposal.check_inputs_not_owned(&mut |input| {
3✔
363
            wallet.is_mine(input).map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
3✔
364
        })?;
3✔
365
        tracing::trace!("check2");
3✔
366

367
        // Receive Check 3: have we seen this input before? More of a check for non-interactive i.e. payment processor receivers.
368
        let payjoin = proposal.check_no_inputs_seen_before(&mut |input| {
3✔
369
            Ok(self.db.insert_input_seen_before(*input)?)
3✔
370
        })?;
3✔
371
        tracing::trace!("check3");
3✔
372

373
        let payjoin = payjoin.identify_receiver_outputs(&mut |output_script| {
6✔
374
            wallet
6✔
375
                .is_mine(output_script)
6✔
376
                .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
6✔
377
        })?;
6✔
378

379
        let payjoin = payjoin
3✔
380
            .substitute_receiver_script(
3✔
381
                &self
3✔
382
                    .wallet
3✔
383
                    .get_new_address()
3✔
384
                    .map_err(|e| {
3✔
385
                        Error::Implementation(ImplementationError::from(e.into_boxed_dyn_error()))
×
UNCOV
386
                    })?
×
387
                    .script_pubkey(),
3✔
388
            )
389
            .map_err(|e| Error::Implementation(ImplementationError::new(e)))?
3✔
390
            .commit_outputs();
3✔
391

392
        let wants_fee_range = try_contributing_inputs(payjoin.clone(), &self.wallet)
3✔
393
            .map_err(Error::Implementation)?;
3✔
394
        let provisional_payjoin =
3✔
395
            wants_fee_range.apply_fee_range(None, self.config.max_fee_rate)?;
3✔
396

397
        let payjoin_proposal = provisional_payjoin.finalize_proposal(|psbt| {
3✔
398
            self.wallet
3✔
399
                .process_psbt(psbt)
3✔
400
                .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
3✔
401
        })?;
3✔
402
        Ok(payjoin_proposal)
3✔
403
    }
3✔
404
}
405

406
fn try_contributing_inputs(
3✔
407
    payjoin: payjoin::receive::v1::WantsInputs,
3✔
408
    wallet: &BitcoindWallet,
3✔
409
) -> Result<payjoin::receive::v1::WantsFeeRange, ImplementationError> {
3✔
410
    let candidate_inputs =
3✔
411
        wallet.list_unspent().map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))?;
3✔
412

413
    if candidate_inputs.is_empty() {
3✔
414
        return Err(ImplementationError::from(
×
415
            anyhow::anyhow!(
×
416
                "No spendable UTXOs available in wallet. Please fund your wallet before resuming this session"
×
417
            )
×
418
            .into_boxed_dyn_error(),
×
UNCOV
419
        ));
×
420
    }
3✔
421

422
    let selected_input =
3✔
423
        payjoin.try_preserving_privacy(candidate_inputs).map_err(ImplementationError::new)?;
3✔
424

425
    Ok(payjoin
3✔
426
        .contribute_inputs(vec![selected_input])
3✔
427
        .map_err(ImplementationError::new)?
3✔
428
        .commit_inputs())
3✔
429
}
3✔
430

431
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
3✔
432
    Full::new(chunk.into()).map_err(|never| match never {}).boxed()
3✔
433
}
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc