• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 17331582027

29 Aug 2025 06:34PM UTC coverage: 85.916% (-0.05%) from 85.967%
17331582027

Pull #1020

github

web-flow
Merge a530f9ab1 into 285239539
Pull Request #1020: use tracing crate instead of log

32 of 42 new or added lines in 15 files covered. (76.19%)

55 existing lines in 2 files now uncovered.

8168 of 9507 relevant lines covered (85.92%)

491.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.76
/payjoin-cli/src/app/v1.rs
1
use std::collections::HashMap;
2
use std::net::SocketAddr;
3
use std::str::FromStr;
4
use std::sync::Arc;
5

6
use anyhow::{anyhow, Context, Result};
7
use http_body_util::combinators::BoxBody;
8
use http_body_util::{BodyExt, Full};
9
use hyper::body::{Bytes, Incoming};
10
use hyper::server::conn::http1;
11
use hyper::service::service_fn;
12
use hyper::{Method, Request, Response, StatusCode};
13
use hyper_util::rt::TokioIo;
14
use payjoin::bitcoin::psbt::Psbt;
15
use payjoin::bitcoin::{Amount, FeeRate};
16
use payjoin::receive::v1::{PayjoinProposal, UncheckedOriginalPayload};
17
use payjoin::receive::ReplyableError::{self, Implementation, V1};
18
use payjoin::send::v1::SenderBuilder;
19
use payjoin::{ImplementationError, IntoUrl, Uri, UriExt};
20
use tokio::net::TcpListener;
21
use tokio::sync::watch;
22

23
use super::config::Config;
24
use super::wallet::BitcoindWallet;
25
use super::App as AppTrait;
26
use crate::app::{handle_interrupt, http_agent};
27
use crate::db::Database;
28

29
struct Headers<'a>(&'a hyper::HeaderMap);
30
impl payjoin::receive::v1::Headers for Headers<'_> {
31
    fn get_header(&self, key: &str) -> Option<&str> {
6✔
32
        self.0.get(key).map(|v| v.to_str()).transpose().ok().flatten()
6✔
33
    }
6✔
34
}
35

36
#[derive(Clone)]
37
pub(crate) struct App {
38
    config: Config,
39
    db: Arc<Database>,
40
    wallet: BitcoindWallet,
41
    interrupt: watch::Receiver<()>,
42
}
43

44
#[async_trait::async_trait]
45
impl AppTrait for App {
46
    async fn new(config: Config) -> Result<Self> {
5✔
47
        let db = Arc::new(Database::create(&config.db_path)?);
48
        let (interrupt_tx, interrupt_rx) = watch::channel(());
49
        tokio::spawn(handle_interrupt(interrupt_tx));
50
        let wallet = BitcoindWallet::new(&config.bitcoind).await?;
51
        let app = Self { config, db, wallet, interrupt: interrupt_rx };
52
        app.wallet()
53
            .network()
54
            .context("Failed to connect to bitcoind. Check config RPC connection.")?;
55
        Ok(app)
56
    }
5✔
57

58
    /// Returns a clone of the wallet handle held by this app.
    fn wallet(&self) -> BitcoindWallet {
        self.wallet.clone()
    }
16✔
59

60
    async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> {
2✔
61
        let uri =
62
            Uri::try_from(bip21).map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?;
×
63
        let uri = uri.assume_checked();
64
        let uri = uri.check_pj_supported().map_err(|_| anyhow!("URI does not support Payjoin"))?;
×
65
        let amount = uri.amount.ok_or_else(|| anyhow!("please specify the amount in the Uri"))?;
×
66
        let psbt = self.create_original_psbt(&uri.address, amount, fee_rate)?;
67
        let (req, ctx) = SenderBuilder::new(psbt, uri.clone())
68
            .build_recommended(fee_rate)
69
            .with_context(|| "Failed to build payjoin request")?
70
            .create_v1_post_request();
71
        let http = http_agent(&self.config)?;
72
        let body = String::from_utf8(req.body.clone()).unwrap();
73
        println!("Sending fallback request to {}", &req.url);
74
        let response = http
75
            .post(req.url)
76
            .header("Content-Type", req.content_type)
77
            .body(body.clone())
78
            .send()
79
            .await
80
            .with_context(|| "HTTP request failed")?;
81
        let fallback_tx = Psbt::from_str(&body)
82
            .map_err(|e| anyhow!("Failed to load PSBT from base64: {}", e))?
×
83
            .extract_tx()?;
84
        println!("Sent fallback transaction txid: {}", fallback_tx.compute_txid());
85
        println!(
86
            "Sent fallback transaction hex: {:#}",
87
            payjoin::bitcoin::consensus::encode::serialize_hex(&fallback_tx)
88
        );
89
        let psbt = ctx.process_response(&response.bytes().await?).map_err(|e| {
×
NEW
90
            tracing::debug!("Error processing response: {e:?}");
×
91
            anyhow!("Failed to process response {e}")
×
92
        })?;
×
93

94
        self.process_pj_response(psbt)?;
95
        Ok(())
96
    }
2✔
97

98
    #[allow(clippy::incompatible_msrv)]
99
    async fn receive_payjoin(&self, amount: Amount) -> Result<()> {
3✔
100
        let mut interrupt = self.interrupt.clone();
101
        tokio::select! {
102
            res = self.start_http_server(amount) => { res?; }
103
            _ = interrupt.changed() => {
104
                println!("Interrupted.");
105
            }
106
        }
107
        Ok(())
108
    }
3✔
109

110
    /// Not implemented for v1: calling this panics via `unimplemented!`.
    #[cfg(feature = "v2")]
    async fn resume_payjoins(&self) -> Result<()> {
        unimplemented!("resume_payjoins not implemented for v1")
    }
×
114
}
115

116
impl App {
117
    fn construct_payjoin_uri(&self, amount: Amount, endpoint: impl IntoUrl) -> Result<String> {
3✔
118
        let pj_receiver_address = self.wallet.get_new_address()?;
3✔
119

120
        let mut pj_uri = payjoin::receive::v1::build_v1_pj_uri(
3✔
121
            &pj_receiver_address,
3✔
122
            endpoint,
3✔
123
            payjoin::OutputSubstitution::Enabled,
3✔
124
        )?;
×
125
        pj_uri.amount = Some(amount);
3✔
126

127
        Ok(pj_uri.to_string())
3✔
128
    }
3✔
129

130
    async fn start_http_server(&self, amount: Amount) -> Result<()> {
3✔
131
        let port = self.config.v1()?.port;
3✔
132
        let addr = SocketAddr::from(([0, 0, 0, 0], port));
3✔
133
        let listener = TcpListener::bind(addr).await?;
3✔
134

135
        let mut endpoint = self.config.v1()?.pj_endpoint.clone();
3✔
136

137
        // If --port 0 is specified, a free port is chosen, so we need to set it
138
        // on the endpoint which must not have a port.
139
        if port == 0 {
3✔
140
            endpoint
3✔
141
                .set_port(Some(listener.local_addr()?.port()))
3✔
142
                .expect("setting port must succeed");
3✔
143
        }
×
144

145
        let pj_uri_string = self.construct_payjoin_uri(amount, endpoint)?;
3✔
146
        println!(
3✔
147
            "Listening at {}. Configured to accept payjoin at BIP 21 Payjoin Uri:",
3✔
148
            listener.local_addr()?
3✔
149
        );
150
        println!("{pj_uri_string}");
3✔
151

152
        let app = self.clone();
3✔
153

154
        #[cfg(feature = "_manual-tls")]
155
        let tls_acceptor = self.init_tls_acceptor()?;
3✔
156
        while let Ok((stream, _)) = listener.accept().await {
6✔
157
            let app = app.clone();
3✔
158
            #[cfg(feature = "_manual-tls")]
159
            let tls_acceptor = tls_acceptor.clone();
3✔
160
            tokio::spawn(async move {
3✔
161
                #[cfg(feature = "_manual-tls")]
162
                let stream = match tls_acceptor.accept(stream).await {
3✔
163
                    Ok(tls_stream) => tls_stream,
3✔
164
                    Err(e) => {
×
NEW
165
                        tracing::error!("TLS accept error: {e}");
×
166
                        return;
×
167
                    }
168
                };
169

170
                let _ = http1::Builder::new()
3✔
171
                    .serve_connection(
3✔
172
                        TokioIo::new(stream),
3✔
173
                        service_fn(move |req| app.clone().handle_web_request(req)),
3✔
174
                    )
175
                    .await;
3✔
176
            });
3✔
177
        }
178
        Ok(())
×
179
    }
×
180

181
    /// Builds the TLS acceptor used by the v1 HTTP server.
    ///
    /// Reads the DER private key from `config.certificate_key` and the DER
    /// certificate from `config.root_certificate`, then creates a server
    /// configuration without client authentication.
    ///
    /// # Errors
    ///
    /// Returns an error (rather than panicking) if either path is missing from
    /// the configuration, if a file cannot be read, if the key cannot be
    /// parsed, or if rustls rejects the certificate/key pair.
    #[cfg(feature = "_manual-tls")]
    fn init_tls_acceptor(&self) -> Result<tokio_rustls::TlsAcceptor> {
        use std::sync::Arc;

        use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer};
        use tokio_rustls::rustls::ServerConfig;
        use tokio_rustls::TlsAcceptor;

        let key_path = self.config.certificate_key.as_ref().ok_or_else(|| {
            anyhow::anyhow!("certificate key is required if listening with tls")
        })?;
        let key_der = std::fs::read(key_path)?;
        let key = PrivateKeyDer::try_from(key_der)
            .map_err(|e| anyhow::anyhow!("Could not parse key: {}", e))?;

        let cert_path = self.config.root_certificate.as_ref().ok_or_else(|| {
            anyhow::anyhow!("root certificate is required if listening with tls")
        })?;
        let cert_der = std::fs::read(cert_path)?;
        let certs = vec![CertificateDer::from(cert_der)];

        let mut server_config = ServerConfig::builder()
            .with_no_client_auth()
            .with_single_cert(certs, key)
            .map_err(|e| anyhow::anyhow!("TLS error: {}", e))?;

        // Connections are served with hyper's HTTP/1 builder only, so "h2" must
        // not be offered via ALPN: a client that negotiated it would then speak
        // a protocol the server cannot parse.
        server_config.alpn_protocols = vec![b"http/1.1".to_vec(), b"http/1.0".to_vec()];

        Ok(TlsAcceptor::from(Arc::new(server_config)))
    }
3✔
216

217
    async fn handle_web_request(
3✔
218
        self,
3✔
219
        req: Request<Incoming>,
3✔
220
    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>> {
3✔
221
        tracing::debug!("Received request: {req:?}");
3✔
222
        let mut response = match (req.method(), req.uri().path()) {
3✔
223
            (&Method::GET, "/bip21") => {
×
224
                let query_string = req.uri().query().unwrap_or("");
×
NEW
225
                tracing::debug!("{:?}, {query_string:?}", req.method());
×
226
                let query_params: HashMap<_, _> =
×
227
                    url::form_urlencoded::parse(query_string.as_bytes()).into_owned().collect();
×
228
                let amount = query_params.get("amount").map(|amt| {
×
229
                    Amount::from_btc(amt.parse().expect("Failed to parse amount")).unwrap()
×
230
                });
×
231
                self.handle_get_bip21(amount)
×
232
                    .map_err(|e| {
×
NEW
233
                        tracing::error!("Error handling request: {e}");
×
234
                        Response::builder().status(500).body(full(e.to_string())).unwrap()
×
235
                    })
×
236
                    .unwrap_or_else(|err_resp| err_resp)
×
237
            }
238
            (&Method::POST, _) => self
3✔
239
                .handle_payjoin_post(req)
3✔
240
                .await
3✔
241
                .map_err(|e| match e {
3✔
242
                    V1(e) => {
×
NEW
243
                        tracing::error!("Error handling request: {e}");
×
244
                        Response::builder().status(400).body(full(e.to_string())).unwrap()
×
245
                    }
246
                    e => {
×
NEW
247
                        tracing::error!("Error handling request: {e}");
×
248
                        Response::builder().status(500).body(full(e.to_string())).unwrap()
×
249
                    }
250
                })
×
251
                .unwrap_or_else(|err_resp| err_resp),
3✔
252
            _ => Response::builder().status(StatusCode::NOT_FOUND).body(full("Not found")).unwrap(),
×
253
        };
254
        response
3✔
255
            .headers_mut()
3✔
256
            .insert("Access-Control-Allow-Origin", hyper::header::HeaderValue::from_static("*"));
3✔
257
        Ok(response)
3✔
258
    }
3✔
259

260
    fn handle_get_bip21(
×
261
        &self,
×
262
        amount: Option<Amount>,
×
263
    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ReplyableError> {
×
264
        let v1_config = self
×
265
            .config
×
266
            .v1()
×
267
            .map_err(|e| Implementation(ImplementationError::from(e.into_boxed_dyn_error())))?;
×
268
        let address = self
×
269
            .wallet
×
270
            .get_new_address()
×
271
            .map_err(|e| Implementation(ImplementationError::from(e.into_boxed_dyn_error())))?;
×
272
        let uri_string = if let Some(amount) = amount {
×
273
            format!(
×
274
                "{}?amount={}&pj={}",
×
275
                address.to_qr_uri(),
×
276
                amount.to_btc(),
×
277
                v1_config.pj_endpoint
278
            )
279
        } else {
280
            format!("{}?pj={}", address.to_qr_uri(), v1_config.pj_endpoint)
×
281
        };
282
        let uri = Uri::try_from(uri_string.clone()).map_err(|_| {
×
283
            Implementation(ImplementationError::from(
×
284
                anyhow!("Could not parse payjoin URI string.").into_boxed_dyn_error(),
×
285
            ))
×
286
        })?;
×
287
        let _ = uri.assume_checked(); // we just got it from bitcoind above
×
288

289
        Ok(Response::new(full(uri_string)))
×
290
    }
×
291

292
    async fn handle_payjoin_post(
3✔
293
        &self,
3✔
294
        req: Request<Incoming>,
3✔
295
    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ReplyableError> {
3✔
296
        let (parts, body) = req.into_parts();
3✔
297
        let headers = Headers(&parts.headers);
3✔
298
        let query_string = parts.uri.query().unwrap_or("");
3✔
299
        let body = body
3✔
300
            .collect()
3✔
301
            .await
3✔
302
            .map_err(|e| Implementation(ImplementationError::new(e)))?
3✔
303
            .to_bytes();
3✔
304
        let proposal = UncheckedOriginalPayload::from_request(&body, query_string, headers)?;
3✔
305

306
        let payjoin_proposal = self.process_v1_proposal(proposal)?;
3✔
307
        let psbt = payjoin_proposal.psbt();
3✔
308
        let body = psbt.to_string();
3✔
309
        println!(
3✔
310
            "Responded with Payjoin proposal {}",
3✔
311
            psbt.clone().extract_tx_unchecked_fee_rate().compute_txid()
3✔
312
        );
313
        Ok(Response::new(full(body)))
3✔
314
    }
3✔
315

316
    fn process_v1_proposal(
3✔
317
        &self,
3✔
318
        proposal: UncheckedOriginalPayload,
3✔
319
    ) -> Result<PayjoinProposal, ReplyableError> {
3✔
320
        let wallet = self.wallet();
3✔
321

322
        // Receive Check 1: Can Broadcast
323
        let proposal = proposal.check_broadcast_suitability(None, |tx| {
3✔
324
            wallet
3✔
325
                .can_broadcast(tx)
3✔
326
                .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
3✔
327
        })?;
3✔
328
        tracing::trace!("check1");
3✔
329

330
        // in a payment processor where the sender could go offline, this is where you schedule to broadcast the original_tx
331
        let _to_broadcast_in_failure_case = proposal.extract_tx_to_schedule_broadcast();
3✔
332

333
        // Receive Check 2: receiver can't sign for proposal inputs
334
        let proposal = proposal.check_inputs_not_owned(&mut |input| {
3✔
335
            wallet.is_mine(input).map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
3✔
336
        })?;
3✔
337
        tracing::trace!("check2");
3✔
338

339
        // Receive Check 3: have we seen this input before? More of a check for non-interactive i.e. payment processor receivers.
340
        let payjoin = proposal.check_no_inputs_seen_before(&mut |input| {
3✔
341
            Ok(self.db.insert_input_seen_before(*input)?)
3✔
342
        })?;
3✔
343
        tracing::trace!("check3");
3✔
344

345
        let payjoin = payjoin.identify_receiver_outputs(&mut |output_script| {
6✔
346
            wallet
6✔
347
                .is_mine(output_script)
6✔
348
                .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
6✔
349
        })?;
6✔
350

351
        let payjoin = payjoin
3✔
352
            .substitute_receiver_script(
3✔
353
                &self
3✔
354
                    .wallet
3✔
355
                    .get_new_address()
3✔
356
                    .map_err(|e| {
3✔
357
                        Implementation(ImplementationError::from(e.into_boxed_dyn_error()))
×
358
                    })?
×
359
                    .script_pubkey(),
3✔
360
            )
361
            .map_err(|e| Implementation(ImplementationError::new(e)))?
3✔
362
            .commit_outputs();
3✔
363

364
        let wants_fee_range = try_contributing_inputs(payjoin.clone(), &self.wallet)
3✔
365
            .map_err(ReplyableError::Implementation)?;
3✔
366
        let provisional_payjoin =
3✔
367
            wants_fee_range.apply_fee_range(None, self.config.max_fee_rate)?;
3✔
368

369
        let payjoin_proposal = provisional_payjoin.finalize_proposal(|psbt| {
3✔
370
            self.wallet
3✔
371
                .process_psbt(psbt)
3✔
372
                .map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))
3✔
373
        })?;
3✔
374
        Ok(payjoin_proposal)
3✔
375
    }
3✔
376
}
377

378
fn try_contributing_inputs(
3✔
379
    payjoin: payjoin::receive::v1::WantsInputs,
3✔
380
    wallet: &BitcoindWallet,
3✔
381
) -> Result<payjoin::receive::v1::WantsFeeRange, ImplementationError> {
3✔
382
    let candidate_inputs =
3✔
383
        wallet.list_unspent().map_err(|e| ImplementationError::from(e.into_boxed_dyn_error()))?;
3✔
384

385
    let selected_input =
3✔
386
        payjoin.try_preserving_privacy(candidate_inputs).map_err(ImplementationError::new)?;
3✔
387

388
    Ok(payjoin
3✔
389
        .contribute_inputs(vec![selected_input])
3✔
390
        .map_err(ImplementationError::new)?
3✔
391
        .commit_inputs())
3✔
392
}
3✔
393

394
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
3✔
395
    Full::new(chunk.into()).map_err(|never| match never {}).boxed()
3✔
396
}
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc