• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 15642374656

13 Jun 2025 07:22PM UTC coverage: 84.642% (-1.4%) from 86.035%
15642374656

Pull #768

github

web-flow
Merge 605fdfd6b into be0597869
Pull Request #768: Add PartialEq/Eq to errors for easier comparison

22 of 162 new or added lines in 13 files covered. (13.58%)

304 existing lines in 9 files now uncovered.

7137 of 8432 relevant lines covered (84.64%)

550.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.43
/payjoin-cli/src/app/v2/mod.rs
1
use std::sync::{Arc, Mutex};
2

3
use anyhow::{anyhow, Context, Result};
4
use payjoin::bitcoin::consensus::encode::serialize_hex;
5
use payjoin::bitcoin::psbt::Psbt;
6
use payjoin::bitcoin::{Amount, FeeRate};
7
use payjoin::receive::v2::{
8
    NewReceiver, PayjoinProposal, ProvisionalProposal, Receiver, UncheckedProposal, WantsInputs,
9
    WithContext,
10
};
11
use payjoin::receive::{Error, ReplyableError};
12
use payjoin::send::v2::{Sender, SenderBuilder, WithReplyKey};
13
use payjoin::{ImplementationError, Uri};
14
use tokio::sync::watch;
15

16
use super::config::Config;
17
use super::wallet::BitcoindWallet;
18
use super::App as AppTrait;
19
use crate::app::v2::ohttp::{unwrap_ohttp_keys_or_else_fetch, RelayManager};
20
use crate::app::{handle_interrupt, http_agent};
21
use crate::db::v2::{ReceiverPersister, SenderPersister};
22
use crate::db::Database;
23

24
mod ohttp;
25

26
#[derive(Clone)]
27
pub(crate) struct App {
28
    config: Config,
29
    db: Arc<Database>,
30
    wallet: BitcoindWallet,
31
    interrupt: watch::Receiver<()>,
32
    relay_manager: Arc<Mutex<RelayManager>>,
33
}
34

35
#[async_trait::async_trait]
36
impl AppTrait for App {
37
    fn new(config: Config) -> Result<Self> {
4✔
38
        let db = Arc::new(Database::create(&config.db_path)?);
4✔
39
        let relay_manager = Arc::new(Mutex::new(RelayManager::new()));
4✔
40
        let (interrupt_tx, interrupt_rx) = watch::channel(());
4✔
41
        tokio::spawn(handle_interrupt(interrupt_tx));
4✔
42
        let wallet = BitcoindWallet::new(&config.bitcoind)?;
4✔
43
        let app = Self { config, db, wallet, interrupt: interrupt_rx, relay_manager };
4✔
44
        app.wallet()
4✔
45
            .network()
4✔
46
            .context("Failed to connect to bitcoind. Check config RPC connection.")?;
4✔
47
        Ok(app)
4✔
48
    }
4✔
49

50
    fn wallet(&self) -> BitcoindWallet { self.wallet.clone() }
10✔
51

52
    async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> {
2✔
53
        use payjoin::UriExt;
54
        let uri =
2✔
55
            Uri::try_from(bip21).map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?;
2✔
56
        let uri = uri.assume_checked();
2✔
57
        let uri = uri.check_pj_supported().map_err(|_| anyhow!("URI does not support Payjoin"))?;
2✔
58
        let url = uri.extras.endpoint();
2✔
59
        // match bip21 to send_session public_key
60
        let req_ctx = match self.db.get_send_session(url)? {
2✔
61
            Some(send_session) => send_session,
1✔
62
            None => {
63
                let psbt = self.create_original_psbt(&uri, fee_rate)?;
1✔
64
                let mut persister = SenderPersister::new(self.db.clone());
1✔
65
                let new_sender = SenderBuilder::new(psbt, uri.clone())
1✔
66
                    .build_recommended(fee_rate)
1✔
67
                    .with_context(|| "Failed to build payjoin request")?;
1✔
68
                let storage_token = new_sender
1✔
69
                    .persist(&mut persister)
1✔
70
                    .map_err(|e| anyhow!("Failed to persist sender: {}", e))?;
1✔
71
                Sender::load(storage_token, &persister)
1✔
72
                    .map_err(|e| anyhow!("Failed to load sender: {}", e))?
1✔
73
            }
74
        };
75
        self.spawn_payjoin_sender(req_ctx).await
2✔
76
    }
4✔
77

78
    async fn receive_payjoin(&self, amount: Amount) -> Result<()> {
1✔
79
        let address = self.wallet().get_new_address()?;
1✔
80
        let ohttp_keys =
1✔
81
            unwrap_ohttp_keys_or_else_fetch(&self.config, None, self.relay_manager.clone())
1✔
82
                .await?
1✔
83
                .ohttp_keys;
84
        let mut persister = ReceiverPersister::new(self.db.clone());
1✔
85
        let new_receiver = NewReceiver::new(
1✔
86
            address,
1✔
87
            self.config.v2()?.pj_directory.clone(),
1✔
88
            ohttp_keys.clone(),
1✔
89
            None,
1✔
90
        )?;
×
91
        let storage_token = new_receiver
1✔
92
            .persist(&mut persister)
1✔
93
            .map_err(|e| anyhow!("Failed to persist receiver: {}", e))?;
1✔
94
        let session = Receiver::load(storage_token, &persister)
1✔
95
            .map_err(|e| anyhow!("Failed to load receiver: {}", e))?;
1✔
96
        self.spawn_payjoin_receiver(session, Some(amount)).await
1✔
97
    }
2✔
98

99
    #[allow(clippy::incompatible_msrv)]
100
    async fn resume_payjoins(&self) -> Result<()> {
1✔
101
        let recv_sessions = self.db.get_recv_sessions()?;
1✔
102
        let send_sessions = self.db.get_send_sessions()?;
1✔
103

104
        if recv_sessions.is_empty() && send_sessions.is_empty() {
1✔
105
            println!("No sessions to resume.");
×
106
            return Ok(());
×
107
        }
1✔
108

1✔
109
        let mut tasks = Vec::new();
1✔
110

111
        for session in recv_sessions {
2✔
112
            let self_clone = self.clone();
1✔
113
            tasks.push(tokio::spawn(async move {
1✔
114
                self_clone.spawn_payjoin_receiver(session, None).await
1✔
115
            }));
1✔
116
        }
1✔
117

118
        for session in send_sessions {
1✔
119
            let self_clone = self.clone();
×
120
            tasks.push(tokio::spawn(async move { self_clone.spawn_payjoin_sender(session).await }));
×
121
        }
×
122

123
        let mut interrupt = self.interrupt.clone();
1✔
124
        tokio::select! {
1✔
125
            _ = async {
1✔
126
                for task in tasks {
1✔
127
                    let _ = task.await;
1✔
128
                }
129
            } => {
×
130
                println!("All resumed sessions completed.");
×
131
            }
×
132
            _ = interrupt.changed() => {
1✔
133
                println!("Resumed sessions were interrupted.");
1✔
134
            }
1✔
135
        }
136
        Ok(())
1✔
137
    }
2✔
138
}
139

140
impl App {
141
    #[allow(clippy::incompatible_msrv)]
142
    async fn spawn_payjoin_sender(&self, mut req_ctx: Sender<WithReplyKey>) -> Result<()> {
2✔
143
        let mut interrupt = self.interrupt.clone();
2✔
144
        tokio::select! {
2✔
145
            res = self.long_poll_post(&mut req_ctx) => {
2✔
146
                self.process_pj_response(res?)?;
1✔
147
                self.db.clear_send_session(req_ctx.endpoint())?;
1✔
148
            }
149
            _ = interrupt.changed() => {
2✔
150
                println!("Interrupted. Call `send` with the same arguments to resume this session or `resume` to resume all sessions.");
1✔
151
            }
1✔
152
        }
153
        Ok(())
2✔
154
    }
2✔
155

156
    #[allow(clippy::incompatible_msrv)]
157
    async fn spawn_payjoin_receiver(
2✔
158
        &self,
2✔
159
        mut session: Receiver<WithContext>,
2✔
160
        amount: Option<Amount>,
2✔
161
    ) -> Result<()> {
2✔
162
        println!("Receive session established");
2✔
163
        let mut pj_uri = session.pj_uri();
2✔
164
        pj_uri.amount = amount;
2✔
165
        let ohttp_relay = self
2✔
166
            .unwrap_relay_or_else_fetch(Some(session.pj_uri().extras.endpoint().clone()))
2✔
167
            .await?;
2✔
168

169
        println!("Request Payjoin by sharing this Payjoin Uri:");
2✔
170
        println!("{pj_uri}");
2✔
171

2✔
172
        let mut interrupt = self.interrupt.clone();
2✔
173
        let receiver = tokio::select! {
2✔
174
            res = self.long_poll_fallback(&mut session) => res,
2✔
175
            _ = interrupt.changed() => {
2✔
176
                println!("Interrupted. Call the `resume` command to resume all sessions.");
1✔
177
                return Ok(());
1✔
178
            }
179
        }?;
×
180

181
        println!("Fallback transaction received. Consider broadcasting this to get paid if the Payjoin fails:");
1✔
182
        println!("{}", serialize_hex(&receiver.extract_tx_to_schedule_broadcast()));
1✔
183
        let mut payjoin_proposal = match self.process_v2_proposal(receiver.clone()) {
1✔
184
            Ok(proposal) => proposal,
1✔
185
            Err(Error::ReplyToSender(e)) => {
×
186
                return Err(handle_recoverable_error(e, receiver, &ohttp_relay).await);
×
187
            }
188
            Err(e) => return Err(e.into()),
×
189
        };
190
        let (req, ohttp_ctx) = payjoin_proposal
1✔
191
            .extract_req(ohttp_relay)
1✔
192
            .map_err(|e| anyhow!("v2 req extraction failed {}", e))?;
1✔
193
        println!("Got a request from the sender. Responding with a Payjoin proposal.");
1✔
194
        let res = post_request(req).await?;
1✔
195
        payjoin_proposal
1✔
196
            .process_res(&res.bytes().await?, ohttp_ctx)
1✔
197
            .map_err(|e| anyhow!("Failed to deserialize response {}", e))?;
1✔
198
        let payjoin_psbt = payjoin_proposal.psbt().clone();
1✔
199
        println!(
1✔
200
            "Response successful. Watch mempool for successful Payjoin. TXID: {}",
1✔
201
            payjoin_psbt.extract_tx_unchecked_fee_rate().clone().compute_txid()
1✔
202
        );
1✔
203
        self.db.clear_recv_session()?;
1✔
204
        Ok(())
1✔
205
    }
2✔
206

207
    async fn long_poll_post(&self, req_ctx: &mut Sender<WithReplyKey>) -> Result<Psbt> {
2✔
208
        let ohttp_relay = self.unwrap_relay_or_else_fetch(Some(req_ctx.endpoint().clone())).await?;
2✔
209

210
        match req_ctx.extract_v2(ohttp_relay.clone()) {
2✔
211
            Ok((req, ctx)) => {
2✔
212
                println!("Posting Original PSBT Payload request...");
2✔
213
                let response = post_request(req).await?;
2✔
214
                println!("Sent fallback transaction");
2✔
215
                let v2_ctx = Arc::new(ctx.process_response(&response.bytes().await?)?);
2✔
216
                loop {
217
                    let (req, ohttp_ctx) = v2_ctx.extract_req(&ohttp_relay)?;
3✔
218
                    let response = post_request(req).await?;
3✔
219
                    match v2_ctx.process_response(&response.bytes().await?, ohttp_ctx) {
2✔
220
                        Ok(Some(psbt)) => return Ok(psbt),
1✔
221
                        Ok(None) => {
1✔
222
                            println!("No response yet.");
1✔
223
                        }
1✔
224
                        Err(re) => {
×
225
                            println!("{re}");
×
226
                            log::debug!("{re:?}");
×
227
                            return Err(anyhow!("Response error").context(re));
×
228
                        }
229
                    }
230
                }
231
            }
232
            Err(_) => {
233
                let (req, v1_ctx) = req_ctx.extract_v1();
×
234
                println!("Posting Original PSBT Payload request...");
×
235
                let response = post_request(req).await?;
×
236
                println!("Sent fallback transaction");
×
237
                match v1_ctx.process_response(&response.bytes().await?) {
×
238
                    Ok(psbt) => Ok(psbt),
×
239
                    Err(re) => {
×
240
                        println!("{re}");
×
241
                        log::debug!("{re:?}");
×
242
                        Err(anyhow!("Response error").context(re))
×
243
                    }
244
                }
245
            }
246
        }
247
    }
1✔
248

249
    async fn long_poll_fallback(
2✔
250
        &self,
2✔
251
        session: &mut Receiver<WithContext>,
2✔
252
    ) -> Result<Receiver<UncheckedProposal>> {
2✔
253
        let ohttp_relay = self
2✔
254
            .unwrap_relay_or_else_fetch(Some(session.pj_uri().extras.endpoint().clone()))
2✔
255
            .await?;
2✔
256

257
        loop {
258
            let (req, context) = session.extract_req(&ohttp_relay)?;
2✔
259
            println!("Polling receive request...");
2✔
260
            let ohttp_response = post_request(req).await?;
2✔
261
            let proposal = session
1✔
262
                .process_res(ohttp_response.bytes().await?.to_vec().as_slice(), context)
1✔
263
                .map_err(|_| anyhow!("GET fallback failed"))?;
1✔
264
            log::debug!("got response");
1✔
265
            if let Some(proposal) = proposal {
1✔
266
                break Ok(proposal);
1✔
267
            }
×
268
        }
269
    }
1✔
270

271
    fn process_v2_proposal(
1✔
272
        &self,
1✔
273
        proposal: Receiver<UncheckedProposal>,
1✔
274
    ) -> Result<Receiver<PayjoinProposal>, Error> {
1✔
275
        let wallet = self.wallet();
1✔
276

1✔
277
        // in a payment processor where the sender could go offline, this is where you schedule to broadcast the original_tx
1✔
278
        let _to_broadcast_in_failure_case = proposal.extract_tx_to_schedule_broadcast();
1✔
279

280
        // Receive Check 1: Can Broadcast
281
        let proposal =
1✔
282
            proposal.check_broadcast_suitability(None, |tx| Ok(wallet.can_broadcast(tx)?))?;
1✔
283
        log::trace!("check1");
1✔
284

285
        // Receive Check 2: receiver can't sign for proposal inputs
286
        let proposal = proposal.check_inputs_not_owned(|input| Ok(wallet.is_mine(input)?))?;
1✔
287
        log::trace!("check2");
1✔
288

289
        // Receive Check 3: have we seen this input before? More of a check for non-interactive i.e. payment processor receivers.
290
        let payjoin = proposal
1✔
291
            .check_no_inputs_seen_before(|input| Ok(self.db.insert_input_seen_before(*input)?))?;
1✔
292
        log::trace!("check3");
1✔
293

294
        let payjoin = payjoin
1✔
295
            .identify_receiver_outputs(|output_script| Ok(wallet.is_mine(output_script)?))?
2✔
296
            .commit_outputs();
1✔
297

298
        let provisional_payjoin = try_contributing_inputs(payjoin.clone(), &wallet)
1✔
299
            .map_err(ReplyableError::Implementation)?;
1✔
300

301
        let payjoin_proposal = provisional_payjoin.finalize_proposal(
1✔
302
            |psbt| Ok(wallet.process_psbt(psbt)?),
1✔
303
            None,
1✔
304
            self.config.max_fee_rate,
1✔
305
        )?;
1✔
306
        let payjoin_proposal_psbt = payjoin_proposal.psbt();
1✔
307
        log::debug!("Receiver's Payjoin proposal PSBT Rsponse: {payjoin_proposal_psbt:#?}");
1✔
308
        Ok(payjoin_proposal)
1✔
309
    }
1✔
310

311
    async fn unwrap_relay_or_else_fetch(
6✔
312
        &self,
6✔
313
        directory: Option<payjoin::Url>,
6✔
314
    ) -> Result<payjoin::Url> {
6✔
315
        let selected_relay =
6✔
316
            self.relay_manager.lock().expect("Lock should not be poisoned").get_selected_relay();
6✔
317
        let ohttp_relay = match selected_relay {
6✔
318
            Some(relay) => relay,
1✔
319
            None =>
320
                unwrap_ohttp_keys_or_else_fetch(&self.config, directory, self.relay_manager.clone())
5✔
321
                    .await?
5✔
322
                    .relay_url,
323
        };
324
        Ok(ohttp_relay)
6✔
325
    }
6✔
326
}
327

328
/// Handle request error by sending an error response over the directory
UNCOV
329
async fn handle_recoverable_error(
×
UNCOV
330
    e: ReplyableError,
×
UNCOV
331
    mut receiver: Receiver<UncheckedProposal>,
×
UNCOV
332
    ohttp_relay: &payjoin::Url,
×
UNCOV
333
) -> anyhow::Error {
×
UNCOV
334
    let to_return = anyhow!("Replied with error: {}", e);
×
UNCOV
335
    let (err_req, err_ctx) = match receiver.extract_err_req(&e.into(), ohttp_relay) {
×
UNCOV
336
        Ok(req_ctx) => req_ctx,
×
UNCOV
337
        Err(e) => return anyhow!("Failed to extract error request: {}", e),
×
338
    };
339

UNCOV
340
    let err_response = match post_request(err_req).await {
×
UNCOV
341
        Ok(response) => response,
×
UNCOV
342
        Err(e) => return anyhow!("Failed to post error request: {}", e),
×
343
    };
344

345
    let err_bytes = match err_response.bytes().await {
×
346
        Ok(bytes) => bytes,
×
347
        Err(e) => return anyhow!("Failed to get error response bytes: {}", e),
×
348
    };
349

350
    if let Err(e) = receiver.process_err_res(&err_bytes, err_ctx) {
×
351
        return anyhow!("Failed to process error response: {}", e);
×
UNCOV
352
    }
×
UNCOV
353

×
354
    to_return
×
355
}
×
356

357
fn try_contributing_inputs(
1✔
358
    payjoin: Receiver<WantsInputs>,
1✔
359
    wallet: &BitcoindWallet,
1✔
360
) -> Result<Receiver<ProvisionalProposal>, ImplementationError> {
1✔
361
    let candidate_inputs = wallet.list_unspent()?;
1✔
362

363
    let selected_input =
1✔
364
        payjoin.try_preserving_privacy(candidate_inputs).map_err(ImplementationError::new)?;
1✔
365

366
    Ok(payjoin
1✔
367
        .contribute_inputs(vec![selected_input])
1✔
368
        .map_err(ImplementationError::new)?
1✔
369
        .commit_inputs())
1✔
370
}
1✔
371

372
async fn post_request(req: payjoin::Request) -> Result<reqwest::Response> {
8✔
373
    let http = http_agent()?;
8✔
374
    http.post(req.url)
8✔
375
        .header("Content-Type", req.content_type)
8✔
376
        .body(req.body)
8✔
377
        .send()
8✔
378
        .await
8✔
379
        .map_err(map_reqwest_err)
6✔
380
}
6✔
381

UNCOV
382
fn map_reqwest_err(e: reqwest::Error) -> anyhow::Error {
×
383
    match e.status() {
×
UNCOV
384
        Some(status_code) => anyhow!("HTTP request failed: {} {}", status_code, e),
×
UNCOV
385
        None => anyhow!("No HTTP response: {}", e),
×
386
    }
UNCOV
387
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc