• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 13143813652

04 Feb 2025 08:01PM UTC coverage: 78.544% (-0.03%) from 78.576%
13143813652

Pull #526

github

web-flow
Merge 553a64477 into 20620b236
Pull Request #526: Only derive JsonError for errors that can return Json

121 of 162 new or added lines in 8 files covered. (74.69%)

22 existing lines in 3 files now uncovered.

3668 of 4670 relevant lines covered (78.54%)

984.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.68
/payjoin-cli/src/app/v2.rs
1
use std::str::FromStr;
2
use std::sync::Arc;
3

4
use anyhow::{anyhow, Context, Result};
5
use bitcoincore_rpc::RpcApi;
6
use payjoin::bitcoin::consensus::encode::serialize_hex;
7
use payjoin::bitcoin::psbt::Psbt;
8
use payjoin::bitcoin::{Amount, FeeRate};
9
use payjoin::receive::v2::{Receiver, UncheckedProposal};
10
use payjoin::receive::Error;
11
use payjoin::receive::ReplyableError::Implementation;
12
use payjoin::send::v2::{Sender, SenderBuilder};
13
use payjoin::{bitcoin, Uri};
14
use tokio::signal;
15
use tokio::sync::watch;
16

17
use super::config::AppConfig;
18
use super::App as AppTrait;
19
use crate::app::{http_agent, input_pair_from_list_unspent};
20
use crate::db::Database;
21

22
#[derive(Clone)]
23
pub(crate) struct App {
24
    config: AppConfig,
25
    db: Arc<Database>,
26
    interrupt: watch::Receiver<()>,
27
}
28

29
#[async_trait::async_trait]
30
impl AppTrait for App {
31
    fn new(config: AppConfig) -> Result<Self> {
4✔
32
        let db = Arc::new(Database::create(&config.db_path)?);
4✔
33
        let (interrupt_tx, interrupt_rx) = watch::channel(());
4✔
34
        tokio::spawn(handle_interrupt(interrupt_tx));
4✔
35
        let app = Self { config, db, interrupt: interrupt_rx };
4✔
36
        app.bitcoind()?
4✔
37
            .get_blockchain_info()
4✔
38
            .context("Failed to connect to bitcoind. Check config RPC connection.")?;
4✔
39
        Ok(app)
4✔
40
    }
4✔
41

42
    /// Creates a new bitcoind RPC client for the configured host.
    ///
    /// Cookie-file authentication is used when `bitcoind_cookie` is set;
    /// otherwise the configured RPC user and password are used.
    fn bitcoind(&self) -> Result<bitcoincore_rpc::Client> {
        let cfg = &self.config;
        let auth = if let Some(cookie) = &cfg.bitcoind_cookie {
            bitcoincore_rpc::Auth::CookieFile(cookie.into())
        } else {
            bitcoincore_rpc::Auth::UserPass(
                cfg.bitcoind_rpcuser.clone(),
                cfg.bitcoind_rpcpassword.clone(),
            )
        };
        bitcoincore_rpc::Client::new(cfg.bitcoind_rpchost.as_str(), auth)
            .with_context(|| "Failed to connect to bitcoind")
    }
11✔
58

59
    async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> {
2✔
60
        use payjoin::UriExt;
61
        let uri =
2✔
62
            Uri::try_from(bip21).map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?;
2✔
63
        let uri = uri.assume_checked();
2✔
64
        let uri = uri.check_pj_supported().map_err(|_| anyhow!("URI does not support Payjoin"))?;
2✔
65
        let url = uri.extras.endpoint();
2✔
66
        // match bip21 to send_session public_key
67
        let req_ctx = match self.db.get_send_session(url)? {
2✔
68
            Some(send_session) => send_session,
1✔
69
            None => {
70
                let psbt = self.create_original_psbt(&uri, fee_rate)?;
1✔
71
                let mut req_ctx = SenderBuilder::new(psbt, uri.clone())
1✔
72
                    .build_recommended(fee_rate)
1✔
73
                    .with_context(|| "Failed to build payjoin request")?;
1✔
74
                self.db.insert_send_session(&mut req_ctx, url)?;
1✔
75
                req_ctx
1✔
76
            }
77
        };
78
        self.spawn_payjoin_sender(req_ctx).await
2✔
79
    }
3✔
80

81
    async fn receive_payjoin(self, amount: Amount) -> Result<()> {
1✔
82
        let address = self.bitcoind()?.get_new_address(None, None)?.assume_checked();
1✔
83
        let ohttp_keys = unwrap_ohttp_keys_or_else_fetch(&self.config).await?;
1✔
84
        let session =
1✔
85
            Receiver::new(address, self.config.pj_directory.clone(), ohttp_keys.clone(), None);
1✔
86
        self.db.insert_recv_session(session.clone())?;
1✔
87
        self.spawn_payjoin_receiver(session, Some(amount)).await
1✔
88
    }
1✔
89
}
90

91
impl App {
92
    #[allow(clippy::incompatible_msrv)]
93
    async fn spawn_payjoin_sender(&self, mut req_ctx: Sender) -> Result<()> {
2✔
94
        let mut interrupt = self.interrupt.clone();
2✔
95
        tokio::select! {
2✔
96
            res = self.long_poll_post(&mut req_ctx) => {
2✔
97
                self.process_pj_response(res?)?;
1✔
98
                self.db.clear_send_session(req_ctx.endpoint())?;
1✔
99
            }
100
            _ = interrupt.changed() => {
2✔
101
                println!("Interrupted. Call `send` with the same arguments to resume this session or `resume` to resume all sessions.");
1✔
102
            }
1✔
103
        }
104
        Ok(())
2✔
105
    }
2✔
106

107
    #[allow(clippy::incompatible_msrv)]
108
    async fn spawn_payjoin_receiver(
2✔
109
        &self,
2✔
110
        mut session: Receiver,
2✔
111
        amount: Option<Amount>,
2✔
112
    ) -> Result<()> {
2✔
113
        println!("Receive session established");
2✔
114
        let mut pj_uri = session.pj_uri();
2✔
115
        pj_uri.amount = amount;
2✔
116

2✔
117
        println!("Request Payjoin by sharing this Payjoin Uri:");
2✔
118
        println!("{}", pj_uri);
2✔
119

2✔
120
        let mut interrupt = self.interrupt.clone();
2✔
121
        let receiver = tokio::select! {
2✔
122
            res = self.long_poll_fallback(&mut session) => res,
2✔
123
            _ = interrupt.changed() => {
2✔
124
                println!("Interrupted. Call the `resume` command to resume all sessions.");
×
125
                return Ok(());
×
126
            }
127
        }?;
×
128

129
        println!("Fallback transaction received. Consider broadcasting this to get paid if the Payjoin fails:");
1✔
130
        println!("{}", serialize_hex(&receiver.extract_tx_to_schedule_broadcast()));
1✔
131
        let mut payjoin_proposal = match self.process_v2_proposal(receiver.clone()) {
1✔
132
            Ok(proposal) => proposal,
1✔
133
            Err(e) => {
×
134
                return Err(handle_request_error(e, receiver, &self.config.ohttp_relay).await);
×
135
            }
136
        };
137
        let (req, ohttp_ctx) = payjoin_proposal
1✔
138
            .extract_v2_req(&self.config.ohttp_relay)
1✔
139
            .map_err(|e| anyhow!("v2 req extraction failed {}", e))?;
1✔
140
        println!("Got a request from the sender. Responding with a Payjoin proposal.");
1✔
141
        let res = post_request(req).await?;
1✔
142
        payjoin_proposal
1✔
143
            .process_res(&res.bytes().await?, ohttp_ctx)
1✔
144
            .map_err(|e| anyhow!("Failed to deserialize response {}", e))?;
1✔
145
        let payjoin_psbt = payjoin_proposal.psbt().clone();
1✔
146
        println!(
1✔
147
            "Response successful. Watch mempool for successful Payjoin. TXID: {}",
1✔
148
            payjoin_psbt.extract_tx_unchecked_fee_rate().clone().compute_txid()
1✔
149
        );
1✔
150
        self.db.clear_recv_session()?;
1✔
151
        Ok(())
1✔
152
    }
1✔
153

154
    #[allow(clippy::incompatible_msrv)]
155
    pub async fn resume_payjoins(&self) -> Result<()> {
1✔
156
        let recv_sessions = self.db.get_recv_sessions()?;
1✔
157
        let send_sessions = self.db.get_send_sessions()?;
1✔
158

159
        if recv_sessions.is_empty() && send_sessions.is_empty() {
1✔
160
            println!("No sessions to resume.");
×
161
            return Ok(());
×
162
        }
1✔
163

1✔
164
        let mut tasks = Vec::new();
1✔
165

166
        for session in recv_sessions {
2✔
167
            let self_clone = self.clone();
1✔
168
            tasks.push(tokio::spawn(async move {
1✔
169
                self_clone.spawn_payjoin_receiver(session, None).await
1✔
170
            }));
1✔
171
        }
1✔
172

173
        for session in send_sessions {
1✔
174
            let self_clone = self.clone();
×
175
            tasks.push(tokio::spawn(async move { self_clone.spawn_payjoin_sender(session).await }));
×
176
        }
×
177

178
        let mut interrupt = self.interrupt.clone();
1✔
179
        tokio::select! {
1✔
180
            _ = async {
1✔
181
                for task in tasks {
1✔
182
                    let _ = task.await;
1✔
183
                }
184
            } => {
×
185
                println!("All resumed sessions completed.");
×
186
            }
×
187
            _ = interrupt.changed() => {
1✔
188
                println!("Resumed sessions were interrupted.");
1✔
189
            }
1✔
190
        }
191
        Ok(())
1✔
192
    }
1✔
193

194
    async fn long_poll_post(&self, req_ctx: &mut Sender) -> Result<Psbt> {
2✔
195
        match req_ctx.extract_v2(self.config.ohttp_relay.clone()) {
2✔
196
            Ok((req, ctx)) => {
2✔
197
                println!("Posting Original PSBT Payload request...");
2✔
198
                let response = post_request(req).await?;
2✔
199
                println!("Sent fallback transaction");
2✔
200
                let v2_ctx = Arc::new(ctx.process_response(&response.bytes().await?)?);
2✔
201
                loop {
202
                    let (req, ohttp_ctx) = v2_ctx.extract_req(self.config.ohttp_relay.clone())?;
3✔
203
                    let response = post_request(req).await?;
3✔
204
                    match v2_ctx.process_response(&response.bytes().await?, ohttp_ctx) {
2✔
205
                        Ok(Some(psbt)) => return Ok(psbt),
1✔
206
                        Ok(None) => {
1✔
207
                            println!("No response yet.");
1✔
208
                        }
1✔
209
                        Err(re) => {
×
210
                            println!("{}", re);
×
211
                            log::debug!("{:?}", re);
×
212
                            return Err(anyhow!("Response error").context(re));
×
213
                        }
214
                    }
215
                }
216
            }
217
            Err(_) => {
218
                let (req, v1_ctx) = req_ctx.extract_v1()?;
×
219
                println!("Posting Original PSBT Payload request...");
×
220
                let response = post_request(req).await?;
×
221
                println!("Sent fallback transaction");
×
222
                match v1_ctx.process_response(&mut response.bytes().await?.to_vec().as_slice()) {
×
223
                    Ok(psbt) => Ok(psbt),
×
224
                    Err(re) => {
×
225
                        println!("{}", re);
×
226
                        log::debug!("{:?}", re);
×
227
                        Err(anyhow!("Response error").context(re))
×
228
                    }
229
                }
230
            }
231
        }
232
    }
1✔
233

234
    async fn long_poll_fallback(
2✔
235
        &self,
2✔
236
        session: &mut payjoin::receive::v2::Receiver,
2✔
237
    ) -> Result<payjoin::receive::v2::UncheckedProposal> {
2✔
238
        loop {
239
            let (req, context) = session.extract_req(&self.config.ohttp_relay)?;
2✔
240
            println!("Polling receive request...");
2✔
241
            let ohttp_response = post_request(req).await?;
2✔
242
            let proposal = session
1✔
243
                .process_res(ohttp_response.bytes().await?.to_vec().as_slice(), context)
1✔
244
                .map_err(|_| anyhow!("GET fallback failed"))?;
1✔
245
            log::debug!("got response");
1✔
246
            if let Some(proposal) = proposal {
1✔
247
                break Ok(proposal);
1✔
248
            }
×
249
        }
250
    }
1✔
251

252
    fn process_v2_proposal(
1✔
253
        &self,
1✔
254
        proposal: payjoin::receive::v2::UncheckedProposal,
1✔
255
    ) -> Result<payjoin::receive::v2::PayjoinProposal, Error> {
1✔
256
        let bitcoind = self.bitcoind().map_err(|e| Implementation(e.into()))?;
1✔
257

258
        // in a payment processor where the sender could go offline, this is where you schedule to broadcast the original_tx
259
        let _to_broadcast_in_failure_case = proposal.extract_tx_to_schedule_broadcast();
1✔
260

261
        // The network is used for checks later
262
        let network = bitcoind.get_blockchain_info().map_err(|e| Implementation(e.into()))?.chain;
1✔
263
        // Receive Check 1: Can Broadcast
264
        let proposal = proposal.check_broadcast_suitability(None, |tx| {
1✔
265
            let raw_tx = bitcoin::consensus::encode::serialize_hex(&tx);
1✔
266
            let mempool_results =
1✔
267
                bitcoind.test_mempool_accept(&[raw_tx]).map_err(|e| Implementation(e.into()))?;
1✔
268
            match mempool_results.first() {
1✔
269
                Some(result) => Ok(result.allowed),
1✔
NEW
270
                None => Err(Implementation(
×
271
                    anyhow!("No mempool results returned on broadcast check").into(),
×
272
                )),
×
273
            }
274
        })?;
1✔
275
        log::trace!("check1");
1✔
276

277
        // Receive Check 2: receiver can't sign for proposal inputs
278
        let proposal = proposal.check_inputs_not_owned(|input| {
1✔
279
            if let Ok(address) = bitcoin::Address::from_script(input, network) {
1✔
280
                bitcoind
1✔
281
                    .get_address_info(&address)
1✔
282
                    .map(|info| info.is_mine.unwrap_or(false))
1✔
283
                    .map_err(|e| Implementation(e.into()))
1✔
284
            } else {
285
                Ok(false)
×
286
            }
287
        })?;
1✔
288
        log::trace!("check2");
1✔
289

290
        // Receive Check 3: have we seen this input before? More of a check for non-interactive i.e. payment processor receivers.
291
        let payjoin = proposal.check_no_inputs_seen_before(|input| {
1✔
292
            self.db.insert_input_seen_before(*input).map_err(|e| Implementation(e.into()))
1✔
293
        })?;
1✔
294
        log::trace!("check3");
1✔
295

296
        let payjoin = payjoin
1✔
297
            .identify_receiver_outputs(|output_script| {
2✔
298
                if let Ok(address) = bitcoin::Address::from_script(output_script, network) {
2✔
299
                    bitcoind
2✔
300
                        .get_address_info(&address)
2✔
301
                        .map(|info| info.is_mine.unwrap_or(false))
2✔
302
                        .map_err(|e| Implementation(e.into()))
2✔
303
                } else {
304
                    Ok(false)
×
305
                }
306
            })?
2✔
307
            .commit_outputs();
1✔
308

1✔
309
        let provisional_payjoin = try_contributing_inputs(payjoin.clone(), &bitcoind)
1✔
310
            .unwrap_or_else(|e| {
1✔
311
                log::warn!("Failed to contribute inputs: {}", e);
×
312
                payjoin.commit_inputs()
×
313
            });
1✔
314

315
        let payjoin_proposal = provisional_payjoin.finalize_proposal(
1✔
316
            |psbt: &Psbt| {
1✔
317
                bitcoind
1✔
318
                    .wallet_process_psbt(&psbt.to_string(), None, None, Some(false))
1✔
319
                    .map(|res| Psbt::from_str(&res.psbt).map_err(|e| Implementation(e.into())))
1✔
320
                    .map_err(|e| Implementation(e.into()))?
1✔
321
            },
1✔
322
            None,
1✔
323
            self.config.max_fee_rate,
1✔
324
        )?;
1✔
325
        let payjoin_proposal_psbt = payjoin_proposal.psbt();
1✔
326
        log::debug!("Receiver's Payjoin proposal PSBT Rsponse: {:#?}", payjoin_proposal_psbt);
1✔
327
        Ok(payjoin_proposal)
1✔
328
    }
1✔
329
}
330

331
/// Handle request error by sending an error response over the directory
332
async fn handle_request_error(
×
333
    e: Error,
×
334
    mut receiver: UncheckedProposal,
×
335
    ohttp_relay: &payjoin::Url,
×
336
) -> anyhow::Error {
×
NEW
337
    if let Some((err_req, err_ctx)) = match receiver.extract_err_req(&e, ohttp_relay) {
×
338
        Ok(req_ctx) => req_ctx,
×
339
        Err(e) => return anyhow!("Failed to extract error request: {}", e),
×
340
    } {
NEW
341
        let err_response = match post_request(err_req).await {
×
NEW
342
            Ok(response) => response,
×
NEW
343
            Err(e) => return anyhow!("Failed to post error request: {}", e),
×
344
        };
345

NEW
346
        let err_bytes = match err_response.bytes().await {
×
NEW
347
            Ok(bytes) => bytes,
×
NEW
348
            Err(e) => return anyhow!("Failed to get error response bytes: {}", e),
×
349
        };
350

NEW
351
        if let Err(e) = receiver.process_err_res(&err_bytes, err_ctx) {
×
NEW
352
            return anyhow!("Failed to process error response: {}", e);
×
NEW
353
        }
×
354

×
NEW
355
        return e.into();
×
356
    }
×
NEW
357
    log::error!("Failed to extract error request: {}", e);
×
358
    e.into()
×
359
}
×
360

361
fn try_contributing_inputs(
1✔
362
    payjoin: payjoin::receive::v2::WantsInputs,
1✔
363
    bitcoind: &bitcoincore_rpc::Client,
1✔
364
) -> Result<payjoin::receive::v2::ProvisionalProposal> {
1✔
365
    let candidate_inputs = bitcoind
1✔
366
        .list_unspent(None, None, None, None, None)
1✔
367
        .context("Failed to list unspent from bitcoind")?
1✔
368
        .into_iter()
1✔
369
        .map(input_pair_from_list_unspent);
1✔
370
    let selected_input = payjoin
1✔
371
        .try_preserving_privacy(candidate_inputs)
1✔
372
        .map_err(|e| anyhow!("Failed to make privacy preserving selection: {}", e))?;
1✔
373
    log::debug!("selected input: {:#?}", selected_input);
1✔
374

375
    Ok(payjoin
1✔
376
        .contribute_inputs(vec![selected_input])
1✔
377
        .expect("This shouldn't happen. Failed to contribute inputs.")
1✔
378
        .commit_inputs())
1✔
379
}
1✔
380

381
/// Returns the OHTTP keys from `config` when present, otherwise fetches them
/// from the payjoin directory through the OHTTP relay.
///
/// With the `_danger-local-https` feature enabled the fetch uses the locally
/// read certificate; without it the plain fetch is used.
async fn unwrap_ohttp_keys_or_else_fetch(config: &AppConfig) -> Result<payjoin::OhttpKeys> {
    if let Some(keys) = config.ohttp_keys.clone() {
        println!("Using OHTTP Keys from config");
        return Ok(keys);
    }

    println!("Bootstrapping private network transport over Oblivious HTTP");
    let ohttp_relay = config.ohttp_relay.clone();
    let payjoin_directory = config.pj_directory.clone();
    #[cfg(feature = "_danger-local-https")]
    let ohttp_keys = {
        let cert_der = crate::app::read_local_cert()?;
        payjoin::io::fetch_ohttp_keys_with_cert(ohttp_relay, payjoin_directory, cert_der)
            .await?
    };
    #[cfg(not(feature = "_danger-local-https"))]
    let ohttp_keys = payjoin::io::fetch_ohttp_keys(ohttp_relay, payjoin_directory).await?;
    Ok(ohttp_keys)
}
1✔
400

401
async fn handle_interrupt(tx: watch::Sender<()>) {
4✔
402
    if let Err(e) = signal::ctrl_c().await {
4✔
403
        eprintln!("Error setting up Ctrl-C handler: {}", e);
×
404
    }
4✔
405
    let _ = tx.send(());
4✔
406
}
4✔
407

408
async fn post_request(req: payjoin::Request) -> Result<reqwest::Response> {
7✔
409
    let http = http_agent()?;
7✔
410
    http.post(req.url)
7✔
411
        .header("Content-Type", req.content_type)
7✔
412
        .body(req.body)
7✔
413
        .send()
7✔
414
        .await
7✔
415
        .map_err(map_reqwest_err)
6✔
416
}
6✔
417

418
fn map_reqwest_err(e: reqwest::Error) -> anyhow::Error {
×
419
    match e.status() {
×
420
        Some(status_code) => anyhow!("HTTP request failed: {} {}", status_code, e),
×
421
        None => anyhow!("No HTTP response: {}", e),
×
422
    }
423
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc