• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

payjoin / rust-payjoin / 12856536243

19 Jan 2025 07:30PM UTC coverage: 60.548% (+0.06%) from 60.489%
12856536243

Pull #491

github

web-flow
Merge 13bf515d5 into 4bbac3446
Pull Request #491: use FeeRate for max_fee_rate arg, and default it to BROADCAST_MIN

7 of 13 new or added lines in 6 files covered. (53.85%)

5 existing lines in 1 file now uncovered.

2939 of 4854 relevant lines covered (60.55%)

953.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/payjoin-cli/src/app/v2.rs
1
use std::str::FromStr;
2
use std::sync::Arc;
3

4
use anyhow::{anyhow, Context, Result};
5
use bitcoincore_rpc::RpcApi;
6
use payjoin::bitcoin::consensus::encode::serialize_hex;
7
use payjoin::bitcoin::psbt::Psbt;
8
use payjoin::bitcoin::{Amount, FeeRate};
9
use payjoin::receive::v2::{Receiver, UncheckedProposal};
10
use payjoin::receive::Error;
11
use payjoin::send::v2::{Sender, SenderBuilder};
12
use payjoin::{bitcoin, Uri};
13
use tokio::signal;
14
use tokio::sync::watch;
15

16
use super::config::AppConfig;
17
use super::App as AppTrait;
18
use crate::app::{http_agent, input_pair_from_list_unspent};
19
use crate::db::Database;
20

21
#[derive(Clone)]
22
pub(crate) struct App {
23
    config: AppConfig,
24
    db: Arc<Database>,
25
    interrupt: watch::Receiver<()>,
26
}
27

28
#[async_trait::async_trait]
29
impl AppTrait for App {
30
    fn new(config: AppConfig) -> Result<Self> {
×
31
        let db = Arc::new(Database::create(&config.db_path)?);
×
32
        let (interrupt_tx, interrupt_rx) = watch::channel(());
×
33
        tokio::spawn(handle_interrupt(interrupt_tx));
×
34
        let app = Self { config, db, interrupt: interrupt_rx };
×
35
        app.bitcoind()?
×
36
            .get_blockchain_info()
×
37
            .context("Failed to connect to bitcoind. Check config RPC connection.")?;
×
38
        Ok(app)
×
39
    }
×
40

41
    fn bitcoind(&self) -> Result<bitcoincore_rpc::Client> {
×
42
        match &self.config.bitcoind_cookie {
×
43
            Some(cookie) => bitcoincore_rpc::Client::new(
×
44
                self.config.bitcoind_rpchost.as_str(),
×
45
                bitcoincore_rpc::Auth::CookieFile(cookie.into()),
×
46
            ),
×
47
            None => bitcoincore_rpc::Client::new(
×
48
                self.config.bitcoind_rpchost.as_str(),
×
49
                bitcoincore_rpc::Auth::UserPass(
×
50
                    self.config.bitcoind_rpcuser.clone(),
×
51
                    self.config.bitcoind_rpcpassword.clone(),
×
52
                ),
×
53
            ),
×
54
        }
55
        .with_context(|| "Failed to connect to bitcoind")
×
56
    }
×
57

58
    async fn send_payjoin(&self, bip21: &str, fee_rate: FeeRate) -> Result<()> {
×
59
        use payjoin::UriExt;
60
        let uri =
×
61
            Uri::try_from(bip21).map_err(|e| anyhow!("Failed to create URI from BIP21: {}", e))?;
×
62
        let uri = uri.assume_checked();
×
63
        let uri = uri.check_pj_supported().map_err(|_| anyhow!("URI does not support Payjoin"))?;
×
64
        let url = uri.extras.endpoint();
×
65
        // match bip21 to send_session public_key
66
        let req_ctx = match self.db.get_send_session(url)? {
×
67
            Some(send_session) => send_session,
×
68
            None => {
69
                let psbt = self.create_original_psbt(&uri, fee_rate)?;
×
70
                let mut req_ctx = SenderBuilder::new(psbt, uri.clone())
×
71
                    .build_recommended(fee_rate)
×
72
                    .with_context(|| "Failed to build payjoin request")?;
×
73
                self.db.insert_send_session(&mut req_ctx, url)?;
×
74
                req_ctx
×
75
            }
76
        };
77
        self.spawn_payjoin_sender(req_ctx).await
×
78
    }
×
79

80
    async fn receive_payjoin(self, amount: Amount) -> Result<()> {
×
81
        let address = self.bitcoind()?.get_new_address(None, None)?.assume_checked();
×
82
        let ohttp_keys = unwrap_ohttp_keys_or_else_fetch(&self.config).await?;
×
83
        let session =
×
84
            Receiver::new(address, self.config.pj_directory.clone(), ohttp_keys.clone(), None);
×
85
        self.db.insert_recv_session(session.clone())?;
×
86
        self.spawn_payjoin_receiver(session, Some(amount)).await
×
87
    }
×
88
}
89

90
impl App {
91
    #[allow(clippy::incompatible_msrv)]
92
    async fn spawn_payjoin_sender(&self, mut req_ctx: Sender) -> Result<()> {
×
93
        let mut interrupt = self.interrupt.clone();
×
94
        tokio::select! {
×
95
            res = self.long_poll_post(&mut req_ctx) => {
×
96
                self.process_pj_response(res?)?;
×
97
                self.db.clear_send_session(req_ctx.endpoint())?;
×
98
            }
99
            _ = interrupt.changed() => {
×
100
                println!("Interrupted. Call `send` with the same arguments to resume this session or `resume` to resume all sessions.");
×
101
            }
×
102
        }
103
        Ok(())
×
104
    }
×
105

106
    #[allow(clippy::incompatible_msrv)]
107
    async fn spawn_payjoin_receiver(
×
108
        &self,
×
109
        mut session: Receiver,
×
110
        amount: Option<Amount>,
×
111
    ) -> Result<()> {
×
112
        println!("Receive session established");
×
113
        let mut pj_uri = session.pj_uri();
×
114
        pj_uri.amount = amount;
×
115

×
116
        println!("Request Payjoin by sharing this Payjoin Uri:");
×
117
        println!("{}", pj_uri);
×
118

×
119
        let mut interrupt = self.interrupt.clone();
×
120
        let receiver = tokio::select! {
×
121
            res = self.long_poll_fallback(&mut session) => res,
×
122
            _ = interrupt.changed() => {
×
123
                println!("Interrupted. Call the `resume` command to resume all sessions.");
×
124
                return Ok(());
×
125
            }
126
        }?;
×
127

128
        println!("Fallback transaction received. Consider broadcasting this to get paid if the Payjoin fails:");
×
129
        println!("{}", serialize_hex(&receiver.extract_tx_to_schedule_broadcast()));
×
130
        let mut payjoin_proposal = match self.process_v2_proposal(receiver.clone()) {
×
131
            Ok(proposal) => proposal,
×
132
            Err(e) => {
×
133
                return Err(handle_request_error(e, receiver, &self.config.ohttp_relay).await);
×
134
            }
135
        };
136
        let (req, ohttp_ctx) = payjoin_proposal
×
137
            .extract_v2_req(&self.config.ohttp_relay)
×
138
            .map_err(|e| anyhow!("v2 req extraction failed {}", e))?;
×
139
        println!("Got a request from the sender. Responding with a Payjoin proposal.");
×
140
        let res = post_request(req).await?;
×
141
        payjoin_proposal
×
142
            .process_res(&res.bytes().await?, ohttp_ctx)
×
143
            .map_err(|e| anyhow!("Failed to deserialize response {}", e))?;
×
144
        let payjoin_psbt = payjoin_proposal.psbt().clone();
×
145
        println!(
×
146
            "Response successful. Watch mempool for successful Payjoin. TXID: {}",
×
147
            payjoin_psbt.extract_tx_unchecked_fee_rate().clone().compute_txid()
×
148
        );
×
149
        self.db.clear_recv_session()?;
×
150
        Ok(())
×
151
    }
×
152

153
    #[allow(clippy::incompatible_msrv)]
154
    pub async fn resume_payjoins(&self) -> Result<()> {
×
155
        let recv_sessions = self.db.get_recv_sessions()?;
×
156
        let send_sessions = self.db.get_send_sessions()?;
×
157

158
        if recv_sessions.is_empty() && send_sessions.is_empty() {
×
159
            println!("No sessions to resume.");
×
160
            return Ok(());
×
161
        }
×
162

×
163
        let mut tasks = Vec::new();
×
164

165
        for session in recv_sessions {
×
166
            let self_clone = self.clone();
×
167
            tasks.push(tokio::spawn(async move {
×
168
                self_clone.spawn_payjoin_receiver(session, None).await
×
169
            }));
×
170
        }
×
171

172
        for session in send_sessions {
×
173
            let self_clone = self.clone();
×
174
            tasks.push(tokio::spawn(async move { self_clone.spawn_payjoin_sender(session).await }));
×
175
        }
×
176

177
        let mut interrupt = self.interrupt.clone();
×
178
        tokio::select! {
×
179
            _ = async {
×
180
                for task in tasks {
×
181
                    let _ = task.await;
×
182
                }
183
            } => {
×
184
                println!("All resumed sessions completed.");
×
185
            }
×
186
            _ = interrupt.changed() => {
×
187
                println!("Resumed sessions were interrupted.");
×
188
            }
×
189
        }
190
        Ok(())
×
191
    }
×
192

193
    async fn long_poll_post(&self, req_ctx: &mut Sender) -> Result<Psbt> {
×
194
        match req_ctx.extract_v2(self.config.ohttp_relay.clone()) {
×
195
            Ok((req, ctx)) => {
×
196
                println!("Posting Original PSBT Payload request...");
×
197
                let response = post_request(req).await?;
×
198
                println!("Sent fallback transaction");
×
199
                let v2_ctx = Arc::new(ctx.process_response(&response.bytes().await?)?);
×
200
                loop {
201
                    let (req, ohttp_ctx) = v2_ctx.extract_req(self.config.ohttp_relay.clone())?;
×
202
                    let response = post_request(req).await?;
×
203
                    match v2_ctx.process_response(&response.bytes().await?, ohttp_ctx) {
×
204
                        Ok(Some(psbt)) => return Ok(psbt),
×
205
                        Ok(None) => {
×
206
                            println!("No response yet.");
×
207
                        }
×
208
                        Err(re) => {
×
209
                            println!("{}", re);
×
210
                            log::debug!("{:?}", re);
×
211
                            return Err(anyhow!("Response error").context(re));
×
212
                        }
213
                    }
214
                }
215
            }
216
            Err(_) => {
217
                let (req, v1_ctx) = req_ctx.extract_v1()?;
×
218
                println!("Posting Original PSBT Payload request...");
×
219
                let response = post_request(req).await?;
×
220
                println!("Sent fallback transaction");
×
221
                match v1_ctx.process_response(&mut response.bytes().await?.to_vec().as_slice()) {
×
222
                    Ok(psbt) => Ok(psbt),
×
223
                    Err(re) => {
×
224
                        println!("{}", re);
×
225
                        log::debug!("{:?}", re);
×
226
                        Err(anyhow!("Response error").context(re))
×
227
                    }
228
                }
229
            }
230
        }
231
    }
×
232

233
    async fn long_poll_fallback(
×
234
        &self,
×
235
        session: &mut payjoin::receive::v2::Receiver,
×
236
    ) -> Result<payjoin::receive::v2::UncheckedProposal> {
×
237
        loop {
238
            let (req, context) = session.extract_req(&self.config.ohttp_relay)?;
×
239
            println!("Polling receive request...");
×
240
            let ohttp_response = post_request(req).await?;
×
241
            let proposal = session
×
242
                .process_res(ohttp_response.bytes().await?.to_vec().as_slice(), context)
×
243
                .map_err(|_| anyhow!("GET fallback failed"))?;
×
244
            log::debug!("got response");
×
245
            if let Some(proposal) = proposal {
×
246
                break Ok(proposal);
×
247
            }
×
248
        }
249
    }
×
250

251
    fn process_v2_proposal(
×
252
        &self,
×
253
        proposal: payjoin::receive::v2::UncheckedProposal,
×
254
    ) -> Result<payjoin::receive::v2::PayjoinProposal, Error> {
×
255
        let bitcoind = self.bitcoind().map_err(|e| Error::Implementation(e.into()))?;
×
256

257
        // in a payment processor where the sender could go offline, this is where you schedule to broadcast the original_tx
258
        let _to_broadcast_in_failure_case = proposal.extract_tx_to_schedule_broadcast();
×
259

260
        // The network is used for checks later
261
        let network =
×
262
            bitcoind.get_blockchain_info().map_err(|e| Error::Implementation(e.into()))?.chain;
×
263
        // Receive Check 1: Can Broadcast
264
        let proposal = proposal.check_broadcast_suitability(None, |tx| {
×
265
            let raw_tx = bitcoin::consensus::encode::serialize_hex(&tx);
×
266
            let mempool_results = bitcoind
×
267
                .test_mempool_accept(&[raw_tx])
×
268
                .map_err(|e| Error::Implementation(e.into()))?;
×
269
            match mempool_results.first() {
×
270
                Some(result) => Ok(result.allowed),
×
271
                None => Err(Error::Implementation(
×
272
                    anyhow!("No mempool results returned on broadcast check").into(),
×
273
                )),
×
274
            }
275
        })?;
×
276
        log::trace!("check1");
×
277

278
        // Receive Check 2: receiver can't sign for proposal inputs
279
        let proposal = proposal.check_inputs_not_owned(|input| {
×
280
            if let Ok(address) = bitcoin::Address::from_script(input, network) {
×
281
                bitcoind
×
282
                    .get_address_info(&address)
×
283
                    .map(|info| info.is_mine.unwrap_or(false))
×
284
                    .map_err(|e| Error::Implementation(e.into()))
×
285
            } else {
286
                Ok(false)
×
287
            }
288
        })?;
×
289
        log::trace!("check2");
×
290

291
        // Receive Check 3: have we seen this input before? More of a check for non-interactive i.e. payment processor receivers.
292
        let payjoin = proposal.check_no_inputs_seen_before(|input| {
×
293
            self.db.insert_input_seen_before(*input).map_err(|e| Error::Implementation(e.into()))
×
294
        })?;
×
295
        log::trace!("check3");
×
296

297
        let payjoin = payjoin
×
298
            .identify_receiver_outputs(|output_script| {
×
299
                if let Ok(address) = bitcoin::Address::from_script(output_script, network) {
×
300
                    bitcoind
×
301
                        .get_address_info(&address)
×
302
                        .map(|info| info.is_mine.unwrap_or(false))
×
303
                        .map_err(|e| Error::Implementation(e.into()))
×
304
                } else {
305
                    Ok(false)
×
306
                }
307
            })?
×
308
            .commit_outputs();
×
309

×
310
        let provisional_payjoin = try_contributing_inputs(payjoin.clone(), &bitcoind)
×
311
            .unwrap_or_else(|e| {
×
312
                log::warn!("Failed to contribute inputs: {}", e);
×
313
                payjoin.commit_inputs()
×
314
            });
×
315

316
        let payjoin_proposal = provisional_payjoin.finalize_proposal(
×
317
            |psbt: &Psbt| {
×
318
                bitcoind
×
319
                    .wallet_process_psbt(&psbt.to_string(), None, None, Some(false))
×
320
                    .map(|res| {
×
321
                        Psbt::from_str(&res.psbt).map_err(|e| Error::Implementation(e.into()))
×
322
                    })
×
323
                    .map_err(|e| Error::Implementation(e.into()))?
×
324
            },
×
325
            Some(bitcoin::FeeRate::MIN),
×
NEW
326
            self.config.max_fee_rate.unwrap_or(FeeRate::BROADCAST_MIN),
×
327
        )?;
×
328
        let payjoin_proposal_psbt = payjoin_proposal.psbt();
×
329
        log::debug!("Receiver's Payjoin proposal PSBT Rsponse: {:#?}", payjoin_proposal_psbt);
×
330
        Ok(payjoin_proposal)
×
331
    }
×
332
}
333

334
/// Handle request error by sending an error response over the directory
335
async fn handle_request_error(
×
336
    e: Error,
×
337
    mut receiver: UncheckedProposal,
×
338
    ohttp_relay: &payjoin::Url,
×
339
) -> anyhow::Error {
×
340
    let (err_req, err_ctx) = match receiver.extract_err_req(&e, ohttp_relay) {
×
341
        Ok(req_ctx) => req_ctx,
×
342
        Err(e) => return anyhow!("Failed to extract error request: {}", e),
×
343
    };
344

345
    let err_response = match post_request(err_req).await {
×
346
        Ok(response) => response,
×
347
        Err(e) => return anyhow!("Failed to post error request: {}", e),
×
348
    };
349

350
    let err_bytes = match err_response.bytes().await {
×
351
        Ok(bytes) => bytes,
×
352
        Err(e) => return anyhow!("Failed to get error response bytes: {}", e),
×
353
    };
354

355
    if let Err(e) = receiver.process_err_res(&err_bytes, err_ctx) {
×
356
        return anyhow!("Failed to process error response: {}", e);
×
357
    }
×
358

×
359
    e.into()
×
360
}
×
361

362
fn try_contributing_inputs(
×
363
    payjoin: payjoin::receive::v2::WantsInputs,
×
364
    bitcoind: &bitcoincore_rpc::Client,
×
365
) -> Result<payjoin::receive::v2::ProvisionalProposal> {
×
366
    let candidate_inputs = bitcoind
×
367
        .list_unspent(None, None, None, None, None)
×
368
        .context("Failed to list unspent from bitcoind")?
×
369
        .into_iter()
×
370
        .map(input_pair_from_list_unspent);
×
371
    let selected_input = payjoin
×
372
        .try_preserving_privacy(candidate_inputs)
×
373
        .map_err(|e| anyhow!("Failed to make privacy preserving selection: {}", e))?;
×
374
    log::debug!("selected input: {:#?}", selected_input);
×
375

376
    Ok(payjoin
×
377
        .contribute_inputs(vec![selected_input])
×
378
        .expect("This shouldn't happen. Failed to contribute inputs.")
×
379
        .commit_inputs())
×
380
}
×
381

382
async fn unwrap_ohttp_keys_or_else_fetch(config: &AppConfig) -> Result<payjoin::OhttpKeys> {
×
383
    if let Some(keys) = config.ohttp_keys.clone() {
×
384
        println!("Using OHTTP Keys from config");
×
385
        Ok(keys)
×
386
    } else {
387
        println!("Bootstrapping private network transport over Oblivious HTTP");
×
388
        let ohttp_relay = config.ohttp_relay.clone();
×
389
        let payjoin_directory = config.pj_directory.clone();
×
390
        #[cfg(feature = "_danger-local-https")]
391
        let ohttp_keys = {
×
392
            let cert_der = crate::app::read_local_cert()?;
×
393
            payjoin::io::fetch_ohttp_keys_with_cert(ohttp_relay, payjoin_directory, cert_der)
×
394
                .await?
×
395
        };
396
        #[cfg(not(feature = "_danger-local-https"))]
397
        let ohttp_keys = payjoin::io::fetch_ohttp_keys(ohttp_relay, payjoin_directory).await?;
398
        Ok(ohttp_keys)
×
399
    }
400
}
×
401

402
async fn handle_interrupt(tx: watch::Sender<()>) {
×
403
    if let Err(e) = signal::ctrl_c().await {
×
404
        eprintln!("Error setting up Ctrl-C handler: {}", e);
×
405
    }
×
406
    let _ = tx.send(());
×
407
}
×
408

409
async fn post_request(req: payjoin::Request) -> Result<reqwest::Response> {
×
410
    let http = http_agent()?;
×
411
    http.post(req.url)
×
412
        .header("Content-Type", req.content_type)
×
413
        .body(req.body)
×
414
        .send()
×
415
        .await
×
416
        .map_err(map_reqwest_err)
×
417
}
×
418

419
fn map_reqwest_err(e: reqwest::Error) -> anyhow::Error {
×
420
    match e.status() {
×
421
        Some(status_code) => anyhow!("HTTP request failed: {} {}", status_code, e),
×
422
        None => anyhow!("No HTTP response: {}", e),
×
423
    }
424
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc