• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 13350991407

16 Feb 2025 02:43AM UTC coverage: 79.468% (+0.2%) from 79.269%
13350991407

Pull #538

github

web-flow
Merge 0e46f80b5 into 2627ef20f
Pull Request #538: Make payjoin-cli v1 / v2 features additive

363 of 422 new or added lines in 6 files covered. (86.02%)

2 existing lines in 1 file now uncovered.

4122 of 5187 relevant lines covered (79.47%)

887.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.85
/payjoin-cli/src/app/v1.rs
1
use std::collections::HashMap;
2
use std::net::SocketAddr;
3
use std::str::FromStr;
4
use std::sync::Arc;
5

6
use anyhow::{anyhow, Context, Result};
7
use bitcoincore_rpc::bitcoin::Amount;
8
use http_body_util::combinators::BoxBody;
9
use http_body_util::{BodyExt, Full};
10
use hyper::body::{Buf, Bytes, Incoming};
11
use hyper::server::conn::http1;
12
use hyper::service::service_fn;
13
use hyper::{Method, Request, Response, StatusCode};
14
use hyper_util::rt::TokioIo;
15
use payjoin::bitcoin::psbt::Psbt;
16
use payjoin::bitcoin::FeeRate;
17
use payjoin::receive::v1::{PayjoinProposal, UncheckedProposal};
18
use payjoin::receive::ReplyableError::{self, Implementation, V1};
19
use payjoin::send::v1::SenderBuilder;
20
use payjoin::{Uri, UriExt};
21
use tokio::net::TcpListener;
22
use tokio::sync::watch;
23

24
use super::config::Config;
25
use super::wallet::BitcoindWallet;
26
use super::App as AppTrait;
27
use crate::app::{handle_interrupt, http_agent};
28
use crate::db::Database;
29
#[cfg(feature = "_danger-local-https")]
30
pub const LOCAL_CERT_FILE: &str = "localhost.der";
31

32
struct Headers<'a>(&'a hyper::HeaderMap);
33
impl payjoin::receive::v1::Headers for Headers<'_> {
34
    fn get_header(&self, key: &str) -> Option<&str> {
4✔
35
        self.0.get(key).map(|v| v.to_str()).transpose().ok().flatten()
4✔
36
    }
4✔
37
}
38

39
#[derive(Clone)]
40
pub(crate) struct App {
41
    config: Config,
42
    db: Arc<Database>,
43
    wallet: BitcoindWallet,
44
    interrupt: watch::Receiver<()>,
45
}
46

47
#[async_trait::async_trait]
48
impl AppTrait for App {
49
    fn new(config: Config) -> Result<Self> {
4✔
50
        let db = Arc::new(Database::create(&config.db_path)?);
4✔
51
        let (interrupt_tx, interrupt_rx) = watch::channel(());
4✔
52
        tokio::spawn(handle_interrupt(interrupt_tx));
4✔
53
        let wallet = BitcoindWallet::new(&config.bitcoind)?;
4✔
54
        let app = Self { config, db, wallet, interrupt: interrupt_rx };
4✔
55
        app.wallet()
4✔
56
            .network()
4✔
57
            .context("Failed to connect to bitcoind. Check config RPC connection.")?;
4✔
58
        Ok(app)
4✔
59
    }
4✔
60

61
    /// Returns a clone of the wallet handle held by this app.
    fn wallet(&self) -> BitcoindWallet {
        self.wallet.clone()
    }
14✔
62

63
    async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> {
2✔
64
        let uri =
2✔
65
            Uri::try_from(bip21).map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?;
2✔
66
        let uri = uri.assume_checked();
2✔
67
        let uri = uri.check_pj_supported().map_err(|_| anyhow!("URI does not support Payjoin"))?;
2✔
68
        let psbt = self.create_original_psbt(&uri, fee_rate)?;
2✔
69
        let (req, ctx) = SenderBuilder::new(psbt, uri.clone())
2✔
70
            .build_recommended(fee_rate)
2✔
71
            .with_context(|| "Failed to build payjoin request")?
2✔
72
            .extract_v1()?;
2✔
73
        let http = http_agent()?;
2✔
74
        let body = String::from_utf8(req.body.clone()).unwrap();
2✔
75
        println!("Sending fallback request to {}", &req.url);
2✔
76
        let response = http
2✔
77
            .post(req.url)
2✔
78
            .header("Content-Type", req.content_type)
2✔
79
            .body(body.clone())
2✔
80
            .send()
2✔
81
            .await
2✔
82
            .with_context(|| "HTTP request failed")?;
2✔
83
        let fallback_tx = Psbt::from_str(&body)
2✔
84
            .map_err(|e| anyhow!("Failed to load PSBT from base64: {}", e))?
2✔
85
            .extract_tx()?;
2✔
86
        println!("Sent fallback transaction txid: {}", fallback_tx.compute_txid());
2✔
87
        println!(
2✔
88
            "Sent fallback transaction hex: {:#}",
2✔
89
            payjoin::bitcoin::consensus::encode::serialize_hex(&fallback_tx)
2✔
90
        );
2✔
91
        let psbt = ctx.process_response(&mut response.bytes().await?.to_vec().as_slice()).map_err(
2✔
92
            |e| {
2✔
93
                log::debug!("Error processing response: {:?}", e);
×
94
                anyhow!("Failed to process response {}", e)
×
95
            },
2✔
96
        )?;
2✔
97

98
        self.process_pj_response(psbt)?;
2✔
99
        Ok(())
2✔
100
    }
4✔
101

102
    #[allow(clippy::incompatible_msrv)]
103
    async fn receive_payjoin(&self, amount: Amount) -> Result<()> {
2✔
104
        let pj_uri_string = self.construct_payjoin_uri(amount, None)?;
2✔
105
        println!(
2✔
106
            "Debug - Config: port={}, pj_endpoint={}",
2✔
107
            self.config.v1()?.port,
2✔
108
            self.config.v1()?.pj_endpoint
2✔
109
        );
110
        println!("Debug - URI about to be printed: {}", pj_uri_string);
2✔
111
        println!(
2✔
112
            "Listening at {}. Configured to accept payjoin at BIP 21 Payjoin Uri:",
2✔
113
            self.config.v1()?.port
2✔
114
        );
115
        println!("{}", pj_uri_string);
2✔
116

2✔
117
        let mut interrupt = self.interrupt.clone();
2✔
118
        tokio::select! {
2✔
119
            res = self.start_http_server() => { res?; }
2✔
120
            _ = interrupt.changed() => {
2✔
121
                println!("Interrupted.");
2✔
122
            }
2✔
123
        }
124
        Ok(())
2✔
125
    }
4✔
126

127
    /// Resuming sessions is not supported by the v1 app.
    ///
    /// # Errors
    ///
    /// Always returns an error, so callers can report the unsupported
    /// operation instead of the process panicking.
    #[cfg(feature = "v2")]
    async fn resume_payjoins(&self) -> Result<()> {
        Err(anyhow!("resume_payjoins not implemented for v1"))
    }
×
131
}
132

133
impl App {
134
    fn construct_payjoin_uri(
2✔
135
        &self,
2✔
136
        amount: Amount,
2✔
137
        fallback_target: Option<&str>,
2✔
138
    ) -> Result<String> {
2✔
139
        let pj_receiver_address = self.wallet.get_new_address()?;
2✔
140
        let pj_part = match fallback_target {
2✔
141
            Some(target) => target,
×
142
            None => self.config.v1()?.pj_endpoint.as_str(),
2✔
143
        };
144
        let pj_part = payjoin::Url::parse(pj_part)
2✔
145
            .map_err(|e| anyhow!("Failed to parse pj_endpoint: {}", e))?;
2✔
146

147
        let mut pj_uri =
2✔
148
            payjoin::receive::v1::build_v1_pj_uri(&pj_receiver_address, &pj_part, false);
2✔
149
        pj_uri.amount = Some(amount);
2✔
150

2✔
151
        Ok(pj_uri.to_string())
2✔
152
    }
2✔
153

154
    async fn start_http_server(&self) -> Result<()> {
2✔
155
        let addr = SocketAddr::from(([0, 0, 0, 0], self.config.v1()?.port));
2✔
156
        let listener = TcpListener::bind(addr).await?;
2✔
157
        let app = self.clone();
2✔
158

159
        #[cfg(feature = "_danger-local-https")]
160
        let tls_acceptor = Self::init_tls_acceptor()?;
2✔
161
        while let Ok((stream, _)) = listener.accept().await {
4✔
162
            let app = app.clone();
2✔
163
            #[cfg(feature = "_danger-local-https")]
2✔
164
            let tls_acceptor = tls_acceptor.clone();
2✔
165
            tokio::spawn(async move {
2✔
166
                #[cfg(feature = "_danger-local-https")]
167
                let stream = match tls_acceptor.accept(stream).await {
2✔
168
                    Ok(tls_stream) => tls_stream,
2✔
169
                    Err(e) => {
×
170
                        log::error!("TLS accept error: {}", e);
×
171
                        return;
×
172
                    }
173
                };
174

175
                let _ = http1::Builder::new()
2✔
176
                    .serve_connection(
2✔
177
                        TokioIo::new(stream),
2✔
178
                        service_fn(move |req| app.clone().handle_web_request(req)),
2✔
179
                    )
2✔
180
                    .await;
2✔
181
            });
2✔
182
        }
183
        Ok(())
×
184
    }
×
185

186
    /// Creates a TLS acceptor backed by a freshly generated self-signed
    /// certificate for `localhost`.
    ///
    /// The DER-encoded certificate is also written to `LOCAL_CERT_FILE` inside
    /// the system temporary directory.
    ///
    /// # Errors
    ///
    /// Fails if the certificate cannot be generated or serialized, if the file
    /// cannot be written, or if the TLS server configuration is rejected.
    #[cfg(feature = "_danger-local-https")]
    fn init_tls_acceptor() -> Result<tokio_rustls::TlsAcceptor> {
        use std::io::Write;

        use rustls::pki_types::{CertificateDer, PrivateKeyDer};
        use rustls::ServerConfig;
        use tokio_rustls::TlsAcceptor;

        let cert = rcgen::generate_simple_self_signed(vec!["localhost".to_string()])?;
        let cert_der = cert.serialize_der()?;
        let mut local_cert_path = std::env::temp_dir();
        local_cert_path.push(LOCAL_CERT_FILE);
        let mut file = std::fs::File::create(local_cert_path)?;
        file.write_all(&cert_der)?;
        let key = PrivateKeyDer::try_from(cert.serialize_private_key_der())
            .map_err(|e| anyhow::anyhow!("Could not parse key: {}", e))?;
        let certs = vec![CertificateDer::from(cert_der)];
        let mut server_config = ServerConfig::builder()
            .with_no_client_auth()
            .with_single_cert(certs, key)
            .map_err(|e| anyhow::anyhow!("TLS error: {}", e))?;
        // Only HTTP/1 protocols are advertised: connections are served with
        // `http1::Builder`, so a client that negotiated `h2` could not be served.
        server_config.alpn_protocols = vec![b"http/1.1".to_vec(), b"http/1.0".to_vec()];
        Ok(TlsAcceptor::from(Arc::new(server_config)))
    }
2✔
211

212
    async fn handle_web_request(
2✔
213
        self,
2✔
214
        req: Request<Incoming>,
2✔
215
    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>> {
2✔
216
        log::debug!("Received request: {:?}", req);
2✔
217
        let mut response = match (req.method(), req.uri().path()) {
2✔
218
            (&Method::GET, "/bip21") => {
×
219
                let query_string = req.uri().query().unwrap_or("");
×
220
                log::debug!("{:?}, {:?}", req.method(), query_string);
×
221
                let query_params: HashMap<_, _> =
×
222
                    url::form_urlencoded::parse(query_string.as_bytes()).into_owned().collect();
×
223
                let amount = query_params.get("amount").map(|amt| {
×
224
                    Amount::from_btc(amt.parse().expect("Failed to parse amount")).unwrap()
×
225
                });
×
226
                self.handle_get_bip21(amount)
×
227
                    .map_err(|e| {
×
228
                        log::error!("Error handling request: {}", e);
×
229
                        Response::builder().status(500).body(full(e.to_string())).unwrap()
×
230
                    })
×
231
                    .unwrap_or_else(|err_resp| err_resp)
×
232
            }
233
            (&Method::POST, _) => self
2✔
234
                .handle_payjoin_post(req)
2✔
235
                .await
2✔
236
                .map_err(|e| match e {
2✔
237
                    V1(e) => {
×
238
                        log::error!("Error handling request: {}", e);
×
239
                        Response::builder().status(400).body(full(e.to_string())).unwrap()
×
240
                    }
241
                    e => {
×
242
                        log::error!("Error handling request: {}", e);
×
243
                        Response::builder().status(500).body(full(e.to_string())).unwrap()
×
244
                    }
245
                })
2✔
246
                .unwrap_or_else(|err_resp| err_resp),
2✔
247
            _ => Response::builder().status(StatusCode::NOT_FOUND).body(full("Not found")).unwrap(),
×
248
        };
249
        response
2✔
250
            .headers_mut()
2✔
251
            .insert("Access-Control-Allow-Origin", hyper::header::HeaderValue::from_static("*"));
2✔
252
        Ok(response)
2✔
253
    }
2✔
254

255
    fn handle_get_bip21(
×
256
        &self,
×
257
        amount: Option<Amount>,
×
258
    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ReplyableError> {
×
NEW
259
        let v1_config = self.config.v1().map_err(|e| Implementation(e.into()))?;
×
NEW
260
        let address = self.wallet.get_new_address().map_err(|e| Implementation(e.into()))?;
×
261
        let uri_string = if let Some(amount) = amount {
×
262
            format!(
×
263
                "{}?amount={}&pj={}",
×
264
                address.to_qr_uri(),
×
265
                amount.to_btc(),
×
NEW
266
                v1_config.pj_endpoint
×
267
            )
×
268
        } else {
NEW
269
            format!("{}?pj={}", address.to_qr_uri(), v1_config.pj_endpoint)
×
270
        };
271
        let uri = Uri::try_from(uri_string.clone())
×
272
            .map_err(|_| Implementation(anyhow!("Could not parse payjoin URI string.").into()))?;
×
273
        let _ = uri.assume_checked(); // we just got it from bitcoind above
×
274

×
275
        Ok(Response::new(full(uri_string)))
×
276
    }
×
277

278
    async fn handle_payjoin_post(
2✔
279
        &self,
2✔
280
        req: Request<Incoming>,
2✔
281
    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ReplyableError> {
2✔
282
        let (parts, body) = req.into_parts();
2✔
283
        let headers = Headers(&parts.headers);
2✔
284
        let query_string = parts.uri.query().unwrap_or("");
2✔
285
        let body = body.collect().await.map_err(|e| Implementation(e.into()))?.aggregate().reader();
2✔
286
        let proposal = UncheckedProposal::from_request(body, query_string, headers)?;
2✔
287

288
        let payjoin_proposal = self.process_v1_proposal(proposal)?;
2✔
289
        let psbt = payjoin_proposal.psbt();
2✔
290
        let body = psbt.to_string();
2✔
291
        println!(
2✔
292
            "Responded with Payjoin proposal {}",
2✔
293
            psbt.clone().extract_tx_unchecked_fee_rate().compute_txid()
2✔
294
        );
2✔
295
        Ok(Response::new(full(body)))
2✔
296
    }
2✔
297

298
    fn process_v1_proposal(
2✔
299
        &self,
2✔
300
        proposal: UncheckedProposal,
2✔
301
    ) -> Result<PayjoinProposal, ReplyableError> {
2✔
302
        let wallet = self.wallet();
2✔
303

2✔
304
        // in a payment processor where the sender could go offline, this is where you schedule to broadcast the original_tx
2✔
305
        let _to_broadcast_in_failure_case = proposal.extract_tx_to_schedule_broadcast();
2✔
306

307
        // Receive Check 1: Can Broadcast
308
        let proposal =
2✔
309
            proposal.check_broadcast_suitability(None, |tx| Ok(wallet.can_broadcast(tx)?))?;
2✔
310
        log::trace!("check1");
2✔
311

312
        // Receive Check 2: receiver can't sign for proposal inputs
313
        let proposal = proposal.check_inputs_not_owned(|input| Ok(wallet.is_mine(input)?))?;
2✔
314
        log::trace!("check2");
2✔
315

316
        // Receive Check 3: have we seen this input before? More of a check for non-interactive i.e. payment processor receivers.
317
        let payjoin = proposal
2✔
318
            .check_no_inputs_seen_before(|input| Ok(self.db.insert_input_seen_before(*input)?))?;
2✔
319
        log::trace!("check3");
2✔
320

321
        let payjoin = payjoin
2✔
322
            .identify_receiver_outputs(|output_script| Ok(wallet.is_mine(output_script)?))?;
4✔
323

324
        let payjoin = payjoin
2✔
325
            .substitute_receiver_script(
2✔
326
                &self
2✔
327
                    .wallet
2✔
328
                    .get_new_address()
2✔
329
                    .map_err(|e| Implementation(e.into()))?
2✔
330
                    .script_pubkey(),
2✔
331
            )
2✔
332
            .map_err(|e| Implementation(e.into()))?
2✔
333
            .commit_outputs();
2✔
334

2✔
335
        let provisional_payjoin = try_contributing_inputs(payjoin.clone(), &self.wallet)
2✔
336
            .unwrap_or_else(|e| {
2✔
337
                log::warn!("Failed to contribute inputs: {}", e);
×
338
                payjoin.commit_inputs()
×
339
            });
2✔
340

341
        let payjoin_proposal = provisional_payjoin.finalize_proposal(
2✔
342
            |psbt| Ok(self.wallet.process_psbt(psbt)?),
2✔
343
            None,
2✔
344
            self.config.max_fee_rate,
2✔
345
        )?;
2✔
346
        Ok(payjoin_proposal)
2✔
347
    }
2✔
348
}
349

350
fn try_contributing_inputs(
2✔
351
    payjoin: payjoin::receive::v1::WantsInputs,
2✔
352
    wallet: &BitcoindWallet,
2✔
353
) -> Result<payjoin::receive::v1::ProvisionalProposal> {
2✔
354
    let candidate_inputs = wallet.list_unspent()?;
2✔
355
    let selected_input = payjoin
2✔
356
        .try_preserving_privacy(candidate_inputs)
2✔
357
        .map_err(|e| anyhow!("Failed to make privacy preserving selection: {}", e))?;
2✔
358
    log::debug!("selected input: {:#?}", selected_input);
2✔
359

360
    Ok(payjoin
2✔
361
        .contribute_inputs(vec![selected_input])
2✔
362
        .expect("This shouldn't happen. Failed to contribute inputs.")
2✔
363
        .commit_inputs())
2✔
364
}
2✔
365

366
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
2✔
367
    Full::new(chunk.into()).map_err(|never| match never {}).boxed()
2✔
368
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc