• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 14228368139

02 Apr 2025 07:54PM UTC coverage: 80.879% (-0.7%) from 81.608%
14228368139

Pull #631

github

web-flow
Merge fe2ceba4a into fde867b93
Pull Request #631: Create clear command in payjoin-cli

26 of 89 new or added lines in 5 files covered. (29.21%)

1 existing line in 1 file now uncovered.

5279 of 6527 relevant lines covered (80.88%)

708.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.36
/payjoin-cli/src/app/v2.rs
1
use std::sync::Arc;
2

3
use anyhow::{anyhow, Context, Result};
4
use payjoin::bitcoin::consensus::encode::serialize_hex;
5
use payjoin::bitcoin::psbt::Psbt;
6
use payjoin::bitcoin::{Amount, FeeRate};
7
use payjoin::receive::v2::{NewReceiver, Receiver, UncheckedProposal};
8
use payjoin::receive::{Error, ImplementationError, ReplyableError};
9
use payjoin::send::v2::{Sender, SenderBuilder};
10
use payjoin::Uri;
11
use tokio::sync::watch;
12

13
use super::config::Config;
14
use super::wallet::BitcoindWallet;
15
use super::App as AppTrait;
16
use crate::app::{handle_interrupt, http_agent};
17
use crate::db::v2::{ReceiverPersister, SenderPersister};
18
use crate::db::Database;
19

20
/// State of the payjoin-cli v2 application: configuration, session database, wallet and
/// interrupt signal. It is `Clone` because `resume_payjoins` clones it into every spawned task.
#[derive(Clone)]
21
pub(crate) struct App {
22
    /// Application configuration; read for the database path, the bitcoind settings, the v2
    /// settings (`config.v2()`) and `max_fee_rate`.
    config: Config,
23
    /// Shared handle to the database that stores send/receive sessions and seen inputs.
    db: Arc<Database>,
24
    /// Wallet built from `config.bitcoind` in `new`.
    wallet: BitcoindWallet,
25
    /// Receiving side of the watch channel whose sender is handed to `handle_interrupt` in `new`;
    /// the long-running operations race against `interrupt.changed()`.
    interrupt: watch::Receiver<()>,
26
}
27

28
#[async_trait::async_trait]
29
impl AppTrait for App {
30
    fn new(config: Config) -> Result<Self> {
4✔
31
        let db = Arc::new(Database::create(&config.db_path)?);
4✔
32
        let (interrupt_tx, interrupt_rx) = watch::channel(());
4✔
33
        tokio::spawn(handle_interrupt(interrupt_tx));
4✔
34
        let wallet = BitcoindWallet::new(&config.bitcoind)?;
4✔
35
        let app = Self { config, db, wallet, interrupt: interrupt_rx };
4✔
36
        app.wallet()
4✔
37
            .network()
4✔
38
            .context("Failed to connect to bitcoind. Check config RPC connection.")?;
4✔
39
        Ok(app)
4✔
40
    }
4✔
41

42
    /// Returns a clone of the wallet held by this app.
    fn wallet(&self) -> BitcoindWallet {
        Clone::clone(&self.wallet)
    }
10✔
43

44
    async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> {
2✔
45
        use payjoin::UriExt;
46
        let uri =
2✔
47
            Uri::try_from(bip21).map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?;
2✔
48
        let uri = uri.assume_checked();
2✔
49
        let uri = uri.check_pj_supported().map_err(|_| anyhow!("URI does not support Payjoin"))?;
2✔
50
        let url = uri.extras.endpoint();
2✔
51
        // match bip21 to send_session public_key
52
        let req_ctx = match self.db.get_send_session(url)? {
2✔
53
            Some(send_session) => send_session,
1✔
54
            None => {
55
                let psbt = self.create_original_psbt(&uri, fee_rate)?;
1✔
56
                let mut persister = SenderPersister::new(self.db.clone());
1✔
57
                let new_sender = SenderBuilder::new(psbt, uri.clone())
1✔
58
                    .build_recommended(fee_rate)
1✔
59
                    .with_context(|| "Failed to build payjoin request")?;
1✔
60
                let storage_token = new_sender
1✔
61
                    .persist(&mut persister)
1✔
62
                    .map_err(|e| anyhow!("Failed to persist sender: {}", e))?;
1✔
63
                Sender::load(storage_token, &persister)
1✔
64
                    .map_err(|e| anyhow!("Failed to load sender: {}", e))?
1✔
65
            }
66
        };
67
        self.spawn_payjoin_sender(req_ctx).await
2✔
68
    }
4✔
69

70
    async fn receive_payjoin(&self, amount: Amount) -> Result<()> {
1✔
71
        let address = self.wallet().get_new_address()?;
1✔
72
        let ohttp_keys = unwrap_ohttp_keys_or_else_fetch(&self.config).await?;
1✔
73
        let mut persister = ReceiverPersister::new(self.db.clone());
1✔
74
        let new_receiver = NewReceiver::new(
1✔
75
            address,
1✔
76
            self.config.v2()?.pj_directory.clone(),
1✔
77
            ohttp_keys.clone(),
1✔
78
            None,
1✔
79
        )?;
×
80
        let storage_token = new_receiver
1✔
81
            .persist(&mut persister)
1✔
82
            .map_err(|e| anyhow!("Failed to persist receiver: {}", e))?;
1✔
83
        let session = Receiver::load(storage_token, &persister)
1✔
84
            .map_err(|e| anyhow!("Failed to load receiver: {}", e))?;
1✔
85
        self.spawn_payjoin_receiver(session, Some(amount)).await
1✔
86
    }
2✔
87

88
    #[allow(clippy::incompatible_msrv)]
89
    async fn resume_payjoins(&self) -> Result<()> {
1✔
90
        let recv_sessions = self.db.get_recv_sessions()?;
1✔
91
        let send_sessions = self.db.get_send_sessions()?;
1✔
92

93
        if recv_sessions.is_empty() && send_sessions.is_empty() {
1✔
94
            println!("No sessions to resume.");
×
95
            return Ok(());
×
96
        }
1✔
97

1✔
98
        let mut tasks = Vec::new();
1✔
99

100
        for session in recv_sessions {
2✔
101
            let self_clone = self.clone();
1✔
102
            tasks.push(tokio::spawn(async move {
1✔
103
                self_clone.spawn_payjoin_receiver(session, None).await
1✔
104
            }));
1✔
105
        }
1✔
106

107
        for session in send_sessions {
1✔
108
            let self_clone = self.clone();
×
109
            tasks.push(tokio::spawn(async move { self_clone.spawn_payjoin_sender(session).await }));
×
110
        }
×
111

112
        let mut interrupt = self.interrupt.clone();
1✔
113
        tokio::select! {
1✔
114
            _ = async {
1✔
115
                for task in tasks {
1✔
116
                    let _ = task.await;
1✔
117
                }
118
            } => {
×
119
                println!("All resumed sessions completed.");
×
120
            }
×
121
            _ = interrupt.changed() => {
1✔
122
                println!("Resumed sessions were interrupted.");
1✔
123
            }
1✔
124
        }
125
        Ok(())
1✔
126
    }
2✔
127

NEW
128
    async fn clear_payjoins(&self, bip_21: Option<&str>) -> Result<()> {
×
129
        use payjoin::UriExt;
NEW
130
        if let Some(bip_21) = bip_21 {
×
NEW
131
            let uri = Uri::try_from(bip_21)
×
NEW
132
                .map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?;
×
NEW
133
            let uri = uri.assume_checked();
×
NEW
134
            let uri =
×
NEW
135
                uri.check_pj_supported().map_err(|_| anyhow!("URI does not support Payjoin"))?;
×
NEW
136
            let recv_session = self.db.get_recv_session(uri.extras.endpoint())?;
×
NEW
137
            let send_session = self.db.get_send_session(uri.extras.endpoint())?;
×
138

NEW
139
            if recv_session.is_some() || send_session.is_some() {
×
NEW
140
                if recv_session.is_some() {
×
NEW
141
                    println!("Removing receiver session with uri: {}", uri);
×
NEW
142
                    self.db.clear_recv_session(uri.extras.endpoint())?;
×
NEW
143
                }
×
NEW
144
                if send_session.is_some() {
×
NEW
145
                    println!("Removing sender session with uri: {}", uri);
×
NEW
146
                    self.db.clear_send_session(uri.extras.endpoint())?;
×
NEW
147
                }
×
NEW
148
            } else {
×
NEW
149
                println!("No matching session found with uri: {}", uri);
×
NEW
150
            }
×
151
        } else {
NEW
152
            let send_sessions = self.db.get_send_sessions()?;
×
NEW
153
            self.db.clear_recv_sessions()?;
×
NEW
154
            for psbt in send_sessions {
×
NEW
155
                self.db.clear_send_session(psbt.endpoint())?;
×
156
            }
NEW
157
            println!("All sessions removed.")
×
158
        }
159

NEW
160
        Ok(())
×
NEW
161
    }
×
162
}
163

164
impl App {
165
    #[allow(clippy::incompatible_msrv)]
166
    async fn spawn_payjoin_sender(&self, mut req_ctx: Sender) -> Result<()> {
2✔
167
        let mut interrupt = self.interrupt.clone();
2✔
168
        tokio::select! {
2✔
169
            res = self.long_poll_post(&mut req_ctx) => {
2✔
170
                self.process_pj_response(res?)?;
1✔
171
                self.db.clear_send_session(req_ctx.endpoint())?;
1✔
172
            }
173
            _ = interrupt.changed() => {
2✔
174
                println!("Interrupted. Call `send` with the same arguments to resume this session or `resume` to resume all sessions.");
1✔
175
            }
1✔
176
        }
177
        Ok(())
2✔
178
    }
2✔
179

180
    #[allow(clippy::incompatible_msrv)]
181
    async fn spawn_payjoin_receiver(
2✔
182
        &self,
2✔
183
        mut session: Receiver,
2✔
184
        amount: Option<Amount>,
2✔
185
    ) -> Result<()> {
2✔
186
        println!("Receive session established");
2✔
187
        let mut pj_uri = session.pj_uri();
2✔
188
        pj_uri.amount = amount;
2✔
189
        println!("Request Payjoin by sharing this Payjoin Uri:");
2✔
190
        println!("{}", pj_uri);
2✔
191

2✔
192
        let mut interrupt = self.interrupt.clone();
2✔
193
        let receiver = tokio::select! {
2✔
194
            res = self.long_poll_fallback(&mut session) => res,
2✔
195
            _ = interrupt.changed() => {
2✔
196
                println!("Interrupted. Call the `resume` command to resume all sessions.");
1✔
197
                return Ok(());
1✔
198
            }
199
        }?;
×
200

201
        println!("Fallback transaction received. Consider broadcasting this to get paid if the Payjoin fails:");
1✔
202
        println!("{}", serialize_hex(&receiver.extract_tx_to_schedule_broadcast()));
1✔
203
        let mut payjoin_proposal = match self.process_v2_proposal(receiver.clone()) {
1✔
204
            Ok(proposal) => proposal,
1✔
205
            Err(Error::ReplyToSender(e)) => {
×
206
                return Err(
×
207
                    handle_recoverable_error(e, receiver, &self.config.v2()?.ohttp_relay).await
×
208
                );
209
            }
210
            Err(e) => return Err(e.into()),
×
211
        };
212
        let (req, ohttp_ctx) = payjoin_proposal
1✔
213
            .extract_req(&self.config.v2()?.ohttp_relay)
1✔
214
            .map_err(|e| anyhow!("v2 req extraction failed {}", e))?;
1✔
215
        println!("Got a request from the sender. Responding with a Payjoin proposal.");
1✔
216
        let res = post_request(req).await?;
1✔
217
        payjoin_proposal
1✔
218
            .process_res(&res.bytes().await?, ohttp_ctx)
1✔
219
            .map_err(|e| anyhow!("Failed to deserialize response {}", e))?;
1✔
220
        let payjoin_psbt = payjoin_proposal.psbt().clone();
1✔
221
        println!(
1✔
222
            "Response successful. Watch mempool for successful Payjoin. TXID: {}",
1✔
223
            payjoin_psbt.extract_tx_unchecked_fee_rate().clone().compute_txid()
1✔
224
        );
1✔
225
        self.db.clear_recv_sessions()?;
1✔
226
        Ok(())
1✔
227
    }
2✔
228

229
    async fn long_poll_post(&self, req_ctx: &mut Sender) -> Result<Psbt> {
2✔
230
        match req_ctx.extract_v2(self.config.v2()?.ohttp_relay.clone()) {
2✔
231
            Ok((req, ctx)) => {
2✔
232
                println!("Posting Original PSBT Payload request...");
2✔
233
                let response = post_request(req).await?;
2✔
234
                println!("Sent fallback transaction");
2✔
235
                let v2_ctx = Arc::new(ctx.process_response(&response.bytes().await?)?);
2✔
236
                loop {
237
                    let (req, ohttp_ctx) =
3✔
238
                        v2_ctx.extract_req(self.config.v2()?.ohttp_relay.clone())?;
3✔
239
                    let response = post_request(req).await?;
3✔
240
                    match v2_ctx.process_response(&response.bytes().await?, ohttp_ctx) {
2✔
241
                        Ok(Some(psbt)) => return Ok(psbt),
1✔
242
                        Ok(None) => {
1✔
243
                            println!("No response yet.");
1✔
244
                        }
1✔
245
                        Err(re) => {
×
246
                            println!("{}", re);
×
247
                            log::debug!("{:?}", re);
×
248
                            return Err(anyhow!("Response error").context(re));
×
249
                        }
250
                    }
251
                }
252
            }
253
            Err(_) => {
254
                let (req, v1_ctx) = req_ctx.extract_v1();
×
255
                println!("Posting Original PSBT Payload request...");
×
256
                let response = post_request(req).await?;
×
257
                println!("Sent fallback transaction");
×
258
                match v1_ctx.process_response(&mut response.bytes().await?.to_vec().as_slice()) {
×
259
                    Ok(psbt) => Ok(psbt),
×
260
                    Err(re) => {
×
261
                        println!("{}", re);
×
262
                        log::debug!("{:?}", re);
×
263
                        Err(anyhow!("Response error").context(re))
×
264
                    }
265
                }
266
            }
267
        }
268
    }
1✔
269

270
    async fn long_poll_fallback(
2✔
271
        &self,
2✔
272
        session: &mut payjoin::receive::v2::Receiver,
2✔
273
    ) -> Result<payjoin::receive::v2::UncheckedProposal> {
2✔
274
        loop {
275
            let (req, context) = session.extract_req(&self.config.v2()?.ohttp_relay)?;
2✔
276
            println!("Polling receive request...");
2✔
277
            let ohttp_response = post_request(req).await?;
2✔
278
            let proposal = session
1✔
279
                .process_res(ohttp_response.bytes().await?.to_vec().as_slice(), context)
1✔
280
                .map_err(|_| anyhow!("GET fallback failed"))?;
1✔
281
            log::debug!("got response");
1✔
282
            if let Some(proposal) = proposal {
1✔
283
                break Ok(proposal);
1✔
284
            }
×
285
        }
286
    }
1✔
287

288
    fn process_v2_proposal(
1✔
289
        &self,
1✔
290
        proposal: payjoin::receive::v2::UncheckedProposal,
1✔
291
    ) -> Result<payjoin::receive::v2::PayjoinProposal, Error> {
1✔
292
        let wallet = self.wallet();
1✔
293

1✔
294
        // in a payment processor where the sender could go offline, this is where you schedule to broadcast the original_tx
1✔
295
        let _to_broadcast_in_failure_case = proposal.extract_tx_to_schedule_broadcast();
1✔
296

297
        // Receive Check 1: Can Broadcast
298
        let proposal =
1✔
299
            proposal.check_broadcast_suitability(None, |tx| Ok(wallet.can_broadcast(tx)?))?;
1✔
300
        log::trace!("check1");
1✔
301

302
        // Receive Check 2: receiver can't sign for proposal inputs
303
        let proposal = proposal.check_inputs_not_owned(|input| Ok(wallet.is_mine(input)?))?;
1✔
304
        log::trace!("check2");
1✔
305

306
        // Receive Check 3: have we seen this input before? More of a check for non-interactive i.e. payment processor receivers.
307
        let payjoin = proposal
1✔
308
            .check_no_inputs_seen_before(|input| Ok(self.db.insert_input_seen_before(*input)?))?;
1✔
309
        log::trace!("check3");
1✔
310

311
        let payjoin = payjoin
1✔
312
            .identify_receiver_outputs(|output_script| Ok(wallet.is_mine(output_script)?))?
2✔
313
            .commit_outputs();
1✔
314

315
        let provisional_payjoin = try_contributing_inputs(payjoin.clone(), &wallet)
1✔
316
            .map_err(ReplyableError::Implementation)?;
1✔
317

318
        let payjoin_proposal = provisional_payjoin.finalize_proposal(
1✔
319
            |psbt| Ok(wallet.process_psbt(psbt)?),
1✔
320
            None,
1✔
321
            self.config.max_fee_rate,
1✔
322
        )?;
1✔
323
        let payjoin_proposal_psbt = payjoin_proposal.psbt();
1✔
324
        log::debug!("Receiver's Payjoin proposal PSBT Rsponse: {:#?}", payjoin_proposal_psbt);
1✔
325
        Ok(payjoin_proposal)
1✔
326
    }
1✔
327
}
328

329
/// Handle request error by sending an error response over the directory
330
async fn handle_recoverable_error(
×
331
    e: ReplyableError,
×
332
    mut receiver: UncheckedProposal,
×
333
    ohttp_relay: &payjoin::Url,
×
334
) -> anyhow::Error {
×
335
    let to_return = anyhow!("Replied with error: {}", e);
×
336
    let (err_req, err_ctx) = match receiver.extract_err_req(&e.into(), ohttp_relay) {
×
337
        Ok(req_ctx) => req_ctx,
×
338
        Err(e) => return anyhow!("Failed to extract error request: {}", e),
×
339
    };
340

341
    let err_response = match post_request(err_req).await {
×
342
        Ok(response) => response,
×
343
        Err(e) => return anyhow!("Failed to post error request: {}", e),
×
344
    };
345

346
    let err_bytes = match err_response.bytes().await {
×
347
        Ok(bytes) => bytes,
×
348
        Err(e) => return anyhow!("Failed to get error response bytes: {}", e),
×
349
    };
350

351
    if let Err(e) = receiver.process_err_res(&err_bytes, err_ctx) {
×
352
        return anyhow!("Failed to process error response: {}", e);
×
353
    }
×
354

×
355
    to_return
×
356
}
×
357

358
fn try_contributing_inputs(
1✔
359
    payjoin: payjoin::receive::v2::WantsInputs,
1✔
360
    wallet: &BitcoindWallet,
1✔
361
) -> Result<payjoin::receive::v2::ProvisionalProposal, ImplementationError> {
1✔
362
    let candidate_inputs = wallet.list_unspent()?;
1✔
363

364
    let selected_input =
1✔
365
        payjoin.try_preserving_privacy(candidate_inputs).map_err(ImplementationError::from)?;
1✔
366

367
    Ok(payjoin
1✔
368
        .contribute_inputs(vec![selected_input])
1✔
369
        .map_err(ImplementationError::from)?
1✔
370
        .commit_inputs())
1✔
371
}
1✔
372

373
/// Returns the OHTTP keys from the v2 config when present; otherwise fetches them through the
/// configured OHTTP relay from the payjoin directory.
///
/// With the `_danger-local-https` feature enabled, the fetch uses the local certificate
/// returned by `read_local_cert`.
///
/// # Errors
/// Fails if the v2 config is unavailable, the local certificate cannot be read, or the fetch
/// fails.
async fn unwrap_ohttp_keys_or_else_fetch(config: &Config) -> Result<payjoin::OhttpKeys> {
    match config.v2()?.ohttp_keys.clone() {
        Some(keys) => {
            println!("Using OHTTP Keys from config");
            Ok(keys)
        }
        None => {
            println!("Bootstrapping private network transport over Oblivious HTTP");
            let ohttp_relay = config.v2()?.ohttp_relay.clone();
            let payjoin_directory = config.v2()?.pj_directory.clone();
            #[cfg(feature = "_danger-local-https")]
            let ohttp_keys = {
                let cert_der = crate::app::read_local_cert()?;
                payjoin::io::fetch_ohttp_keys_with_cert(ohttp_relay, payjoin_directory, cert_der)
                    .await?
            };
            #[cfg(not(feature = "_danger-local-https"))]
            let ohttp_keys = payjoin::io::fetch_ohttp_keys(ohttp_relay, payjoin_directory).await?;
            Ok(ohttp_keys)
        }
    }
}
1✔
392

393
async fn post_request(req: payjoin::Request) -> Result<reqwest::Response> {
8✔
394
    let http = http_agent()?;
8✔
395
    http.post(req.url)
8✔
396
        .header("Content-Type", req.content_type)
8✔
397
        .body(req.body)
8✔
398
        .send()
8✔
399
        .await
8✔
400
        .map_err(map_reqwest_err)
6✔
401
}
6✔
402

403
fn map_reqwest_err(e: reqwest::Error) -> anyhow::Error {
×
404
    match e.status() {
×
405
        Some(status_code) => anyhow!("HTTP request failed: {} {}", status_code, e),
×
406
        None => anyhow!("No HTTP response: {}", e),
×
407
    }
408
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc