• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ergoplatform / sigma-rust / 19957094785

05 Dec 2025 08:23AM UTC coverage: 86.918% (+8.5%) from 78.463%
19957094785

Pull #837

github

web-flow
Merge dec08367a into 2f840d387
Pull Request #837: Split TransactionHintsBag hints properly

44 of 53 new or added lines in 13 files covered. (83.02%)

1621 existing lines in 221 files now uncovered.

27453 of 31585 relevant lines covered (86.92%)

253204.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.53
/ergo-rest/src/api/peer_discovery_internals/non_chrome.rs
1
//! This module contains the implementation of `peer_discovery`.  It's structured as 2 separate
2
//! tasks:
3
//!
4
//!  - Task 1 is responsible for tracking which nodes are active/inactive and making sure that any
5
//!    given ergo node is queried exactly once.
6
//!  - Task 2's job is to wait for a URL from task 1, make the actual HTTP requests to that URL, and
7
//!    to report the result back to task 1.
8
//! ```text
9
//!                              <ergo node URL>
10
//!               __________________________________________________
11
//!              |                                                  |
12
//!              |                                                  v
13
//!  /----------------------\                   /----------------------\
14
//!  | 1. Track node status |                   | 2. HTTP request task |
15
//!  \----------------------/                   \----------------------/
16
//!              ^                                                  |
17
//!              |__________________________________________________|
18
//!                <active node| non-active node| list of peers>
19
//! ```
20
use super::PeerDiscoverySettings;
21
use crate::api::peer_discovery_internals::get_peers_all;
22
use crate::error::PeerDiscoveryError;
23
use crate::{api::node::get_info, NodeConf, PeerInfo};
24
use async_trait::async_trait;
25
use bounded_integer::BoundedU16;
26
use bounded_vec::NonEmptyVec;
27
use ergo_chain_types::PeerAddr;
28
use std::fmt::Debug;
29
use std::{collections::HashSet, time::Duration};
30
use url::Url;
31

32
// Uncomment the following to enable logging on WASM through the `console_log` macro. Taken from
33
// https://rustwasm.github.io/wasm-bindgen/examples/console-log.html#srclibrs
34
//#[cfg(target_arch = "wasm32")]
35
//use wasm_bindgen::prelude::*;
36
//
37
//
38
//#[cfg(target_arch = "wasm32")]
39
//#[wasm_bindgen]
40
//extern "C" {
41
//    // Use `js_namespace` here to bind `console.log(..)` instead of just
42
//    // `log(..)`
43
//    #[wasm_bindgen(js_namespace = console)]
44
//    fn log(s: &str);
45
//}
46
//
47
//#[cfg(target_arch = "wasm32")]
48
//macro_rules! console_log {
49
// Note that this is using the `log` function imported above during
50
// `bare_bones`
51
//($($t:tt)*) => (log(&format_args!($($t)*).to_string()))
52
//}
53

54
pub(crate) async fn peer_discovery_inner(
2✔
55
    seeds: NonEmptyVec<Url>,
2✔
56
    max_parallel_tasks: BoundedU16<1, { u16::MAX }>,
2✔
57
    timeout: Duration,
2✔
58
) -> Result<Vec<Url>, PeerDiscoveryError> {
2✔
59
    let settings = PeerDiscoverySettings {
2✔
60
        max_parallel_tasks,
2✔
61
        task_2_buffer_length: max_parallel_tasks.get() as usize,
2✔
62
        global_timeout: timeout,
2✔
63
        timeout_of_individual_node_request: Duration::from_secs(4),
2✔
64
    };
2✔
65
    #[cfg(not(target_arch = "wasm32"))]
66
    let (tx_msg, rx_msg) = tokio::sync::mpsc::channel::<Msg>(settings.task_2_buffer_length);
2✔
67
    #[cfg(not(target_arch = "wasm32"))]
68
    let (tx_url, rx_url) = tokio::sync::mpsc::channel::<Url>(settings.task_2_buffer_length);
2✔
69
    #[cfg(not(target_arch = "wasm32"))]
70
    let url_stream = tokio_stream::wrappers::ReceiverStream::new(rx_url);
2✔
71
    #[cfg(not(target_arch = "wasm32"))]
72
    let msg_stream = tokio_stream::wrappers::ReceiverStream::new(rx_msg);
2✔
73

74
    #[cfg(target_arch = "wasm32")]
75
    let (tx_msg, rx_msg) = futures::channel::mpsc::channel::<Msg>(settings.task_2_buffer_length);
76
    #[cfg(target_arch = "wasm32")]
77
    let (tx_url, rx_url) = futures::channel::mpsc::channel::<Url>(settings.task_2_buffer_length);
78
    #[cfg(target_arch = "wasm32")]
79
    let url_stream = rx_url;
80
    #[cfg(target_arch = "wasm32")]
81
    let msg_stream = rx_msg;
82

83
    peer_discovery_impl(seeds, tx_msg, msg_stream, tx_url, url_stream, settings).await
2✔
84
}
2✔
85

86
/// Implementation of `peer_discovery`.
87
async fn peer_discovery_impl<
2✔
88
    SendMsg: 'static + ChannelInfallibleSender<Msg> + Clone + Send + Sync,
2✔
89
    SendUrl: 'static + ChannelInfallibleSender<Url> + ChannelTrySender<Url> + Clone + Send + Sync,
2✔
90
>(
2✔
91
    seeds: NonEmptyVec<Url>,
2✔
92
    tx_msg: SendMsg,
2✔
93
    msg_stream: impl futures::Stream<Item = Msg> + Send + 'static,
2✔
94
    mut tx_url: SendUrl,
2✔
95
    url_stream: impl futures::Stream<Item = Url> + Send + 'static,
2✔
96
    settings: PeerDiscoverySettings,
2✔
97
) -> Result<Vec<Url>, PeerDiscoveryError> {
2✔
98
    use futures::future::FutureExt;
99
    use futures::StreamExt;
100

101
    let mut seeds_set: HashSet<Url> = HashSet::new();
2✔
102

103
    for mut seed_url in seeds {
30✔
104
        #[allow(clippy::unwrap_used)]
28✔
105
        seed_url.set_port(None).unwrap();
28✔
106
        seeds_set.insert(seed_url);
28✔
107
    }
28✔
108

109
    // Task 2 from the schematic above
110
    spawn_http_request_task(
2✔
111
        tx_msg,
2✔
112
        url_stream,
2✔
113
        settings.max_parallel_tasks,
2✔
114
        settings.timeout_of_individual_node_request,
2✔
115
    );
116

117
    // Start with requests to seed nodes.
118
    for url in &seeds_set {
30✔
119
        tx_url.infallible_send(url.clone()).await;
28✔
120
    }
121

122
    // (*) This variable represents the number of URLs that need to be checked to see whether it
123
    // corresponds to an active Ergo node. `count` is crucial to allow this function to terminate,
124
    // as once it reaches zero we break the loop below. This leads us to drop `tx_url`, which is the
125
    // sender side of the receiver stream `rx_url_stream`, allowing task 1 to end.
126
    let mut count = seeds_set.len();
2✔
127

128
    let mut visited_active_peers = HashSet::new();
2✔
129
    let mut visited_peers = HashSet::new();
2✔
130

131
    // Stack of peers to evaluate. Used as a growable buffer for when the (tx_url, rx_url) channel
132
    // gets full.
133
    let mut peer_stack: Vec<PeerInfo> = vec![];
2✔
134

135
    // Here we spawn a task that triggers a signal after `settings.global_timeout` has elapsed.
136
    #[cfg(target_arch = "wasm32")]
137
    let rx_timeout_signal = {
138
        let (tx, rx) = futures::channel::oneshot::channel::<()>();
139
        wasm_bindgen_futures::spawn_local(async move {
140
            let _ = wasmtimer::tokio::sleep(settings.global_timeout).await;
141
            let _ = tx.send(());
142
        });
143
        rx.into_stream()
144
    };
145

146
    #[cfg(not(target_arch = "wasm32"))]
147
    let rx_timeout_signal = {
2✔
148
        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
2✔
149
        tokio::spawn(async move {
2✔
150
            tokio::time::sleep(settings.global_timeout).await;
2✔
151
            let _ = tx.send(());
1✔
152
        });
1✔
153
        rx.into_stream()
2✔
154
    };
155

156
    // In addition to listening for `Msg`s from the HTTP request task, we need to watch for the
157
    // timeout signal so we can exit early. The solution is to combine the streams.
158
    enum C {
159
        RxMsg(Msg),
160
        RxTimeoutSignal,
161
    }
162

163
    type CombinedStream = std::pin::Pin<Box<dyn futures::stream::Stream<Item = C> + Send>>;
164

165
    let streams: Vec<CombinedStream> = vec![
2✔
166
        msg_stream.map(C::RxMsg).boxed(),
2✔
167
        rx_timeout_signal.map(|_| C::RxTimeoutSignal).boxed(),
2✔
168
    ];
169
    let mut combined_stream = futures::stream::select_all(streams);
2✔
170

171
    // This variable equals to true as long as we're checking for new peer nodes. It is set to false
172
    // once the global timeout is reached.
173
    let mut add_peers = true;
2✔
174

175
    'loop_: while let Some(n) = combined_stream.next().await {
1,666✔
176
        match n {
1,666✔
177
            C::RxMsg(p) => {
1,665✔
178
                // Try pushing as many peers as can be allowed in the (tx_url, rx_url) channel
179
                while let Some(peer) = peer_stack.pop() {
13,232✔
180
                    let mut url = peer.addr.as_http_url();
11,990✔
181
                    #[allow(clippy::unwrap_used)]
182
                    url.set_port(None).unwrap();
11,990✔
183
                    if !visited_peers.contains(&url) {
11,990✔
184
                        match tx_url.try_send(url.clone()) {
2,002✔
185
                            Ok(_) => {
1,579✔
186
                                visited_peers.insert(url);
1,579✔
187
                                count += 1;
1,579✔
188
                            }
1,579✔
189
                            Err(TrySendError::Full) => {
190
                                // Push it back on the stack, try again later.
191
                                peer_stack.push(peer);
423✔
192
                                break;
423✔
193
                            }
194
                            Err(TrySendError::Closed) => {
195
                                return Err(PeerDiscoveryError::MpscSender);
×
196
                            }
197
                        }
198
                    }
9,988✔
199
                }
200
                match p {
1,665✔
201
                    Msg::AddActiveNode(mut url) => {
58✔
202
                        #[allow(clippy::unwrap_used)]
203
                        url.set_port(None).unwrap();
58✔
204
                        visited_active_peers.insert(url.clone());
58✔
205
                        visited_peers.insert(url);
58✔
206
                        count -= 1;
58✔
207
                        if count == 0 {
58✔
208
                            break 'loop_;
×
209
                        }
58✔
210
                    }
211
                    Msg::AddInactiveNode(mut url) => {
1,549✔
212
                        #[allow(clippy::unwrap_used)]
213
                        url.set_port(None).unwrap();
1,549✔
214
                        visited_peers.insert(url);
1,549✔
215
                        count -= 1;
1,549✔
216
                        if count == 0 {
1,549✔
217
                            break 'loop_;
2✔
218
                        }
1,547✔
219
                    }
220
                    Msg::CheckPeers(mut peers) => {
58✔
221
                        use rand::seq::SliceRandom;
222
                        use rand::thread_rng;
223
                        peers.shuffle(&mut thread_rng());
58✔
224
                        if add_peers {
58✔
225
                            peer_stack.extend(peers);
36✔
226
                        }
36✔
227
                    }
228
                }
229
            }
230
            C::RxTimeoutSignal => {
1✔
231
                add_peers = false;
1✔
232
                peer_stack.clear();
1✔
233
            }
1✔
234
        }
235
    }
236

237
    drop(tx_url);
2✔
238
    let coll: Vec<_> = visited_active_peers
2✔
239
        .difference(&seeds_set)
2✔
240
        .cloned()
2✔
241
        .collect();
2✔
242

243
    // Uncomment for debugging
244

245
    //#[cfg(not(target_arch = "wasm32"))]
246
    //println!(
247
    //    "Total # nodes visited: {}, # peers found: {}",
248
    //    visited_peers.len(),
249
    //    coll.len()
250
    //);
251
    //
252
    //#[cfg(target_arch = "wasm32")]
253
    //console_log!(
254
    //    "Total # nodes visited: {}, # peers found: {}",
255
    //    visited_peers.len(),
256
    //    coll.len()
257
    //);
258
    Ok(coll)
2✔
259
}
2✔
260

261
/// Given a stream that receives URLs of full ergo nodes, spawn a task (task 2 in the schematic
262
/// above) which checks if it is active.  If so, request its peers. In all cases, a message (enum
263
/// `Msg`) is sent out to notify the listener.
264
fn spawn_http_request_task<
2✔
265
    SendMsg: ChannelInfallibleSender<Msg> + Clone + Send + Sync + 'static,
2✔
266
>(
2✔
267
    tx_peer: SendMsg,
2✔
268
    url_stream: impl futures::Stream<Item = Url> + Send + 'static,
2✔
269
    max_parallel_requests: BoundedU16<1, { u16::MAX }>,
2✔
270
    request_timeout_duration: Duration,
2✔
271
) {
2✔
272
    use futures::StreamExt;
273

274
    // Note that `tokio` - the de facto standard async runtime - is not supported on WASM. We need
275
    // to spawn tasks for HTTP requests, and for WASM we rely on the `wasm_bindgen_futures` crate.
276
    #[cfg(not(target_arch = "wasm32"))]
277
    let spawn_fn = tokio::spawn;
2✔
278

279
    #[cfg(target_arch = "wasm32")]
280
    let spawn_fn = wasm_bindgen_futures::spawn_local;
281

282
    let mapped_stream = url_stream
2✔
283
        .map(move |mut url| {
1,607✔
284
            let mut tx_peer = tx_peer.clone();
1,607✔
285
            async move {
1,607✔
286
                // `tokio::spawn` returns a `JoinHandle` which we make sure to drop. If we don't drop
287
                // and instead await on it, performance suffers greatly (~ 5x slower). In WASM case
288
                // we don't need to worry because `wasm_bindgen_futures::spawn_local` returns ().
289
                let _handle = spawn_fn(async move {
1,607✔
290
                    // Query node at url.
291
                    #[allow(clippy::unwrap_used)]
292
                    url.set_port(Some(9053)).unwrap();
1,607✔
293
                    #[allow(clippy::unwrap_used)]
294
                    let node_conf = NodeConf {
1,607✔
295
                        addr: PeerAddr::try_from(&url).unwrap(),
1,607✔
296
                        api_key: None,
1,607✔
297
                        timeout: Some(request_timeout_duration),
1,607✔
298
                    };
1,607✔
299

300
                    // If active, look up its peers.
301
                    match get_info(node_conf).await {
1,607✔
302
                        Ok(_) => {
303
                            match get_peers_all(node_conf).await {
58✔
304
                                Ok(peers) => {
58✔
305
                                    // It's important to send this message before the `AddActiveNode`
306
                                    // message below, to ensure an accurate `count` variable in task 1;
307
                                    // see (*) above in `peer_discovery_inner`.
308
                                    tx_peer.infallible_send(Msg::CheckPeers(peers)).await;
58✔
309
                                    tx_peer
58✔
310
                                        .infallible_send(Msg::AddActiveNode(url.clone()))
58✔
311
                                        .await;
58✔
312
                                }
313
                                Err(_) => {
314
                                    #[allow(clippy::unwrap_used)]
UNCOV
315
                                    tx_peer.infallible_send(Msg::AddInactiveNode(url)).await;
×
316
                                }
317
                            }
318
                        }
319
                        Err(_) => {
320
                            #[allow(clippy::unwrap_used)]
321
                            tx_peer.infallible_send(Msg::AddInactiveNode(url)).await;
1,549✔
322
                        }
323
                    }
324
                });
1,607✔
325
            }
1,607✔
326
        })
1,607✔
327
        .buffer_unordered(max_parallel_requests.get() as usize); // Allow for parallel requests
2✔
328

329
    // Note: We need to define another binding to the spawn function to get around the Rust type
330
    // checker.
331
    #[cfg(not(target_arch = "wasm32"))]
332
    let spawn_fn_new = tokio::spawn;
2✔
333

334
    #[cfg(target_arch = "wasm32")]
335
    let spawn_fn_new = wasm_bindgen_futures::spawn_local;
336

337
    // (*) Run stream to completion.
338
    spawn_fn_new(mapped_stream.for_each(|_| async move {}));
1,607✔
339
}
2✔
340

341
/// Used in the implementation of `peer_discovery`
342
#[derive(Debug)]
343
pub(crate) enum Msg {
344
    /// Indicates that the ergo node at the given URL is active. This means that a GET request
345
    /// to the node's /info endpoint responds with code 200 OK.
346
    AddActiveNode(Url),
347
    /// Indicates that the ergo node at the given URL is inactive. This means that a GET request
348
    /// to the node's /info endpoint does not respond with code 200 OK.
349
    AddInactiveNode(Url),
350
    /// A list of peers of an active ergo node, returned from a GET on the /peers/all endpoint.
351
    CheckPeers(Vec<PeerInfo>),
352
}
353

354
/// This trait abstracts over the `send` method of channel senders, assuming no failure.
355
#[async_trait]
356
trait ChannelInfallibleSender<T> {
357
    /// A send that cannot fail.
358
    async fn infallible_send(&mut self, value: T);
359
}
360

361
#[cfg(not(target_arch = "wasm32"))]
362
#[async_trait]
363
impl<T: Debug + Send> ChannelInfallibleSender<T> for tokio::sync::mpsc::Sender<T> {
364
    async fn infallible_send(&mut self, value: T) {
1,693✔
365
        // If error results, just discard it.
366
        let _ = self.send(value).await;
367
    }
1,693✔
368
}
369

370
#[cfg(target_arch = "wasm32")]
371
#[async_trait]
372
impl<T: Debug + Send> ChannelInfallibleSender<T> for futures::channel::mpsc::Sender<T> {
373
    async fn infallible_send(&mut self, value: T) {
374
        use futures::sink::SinkExt;
375
        // If error results, just discard it.
376
        let _ = self.send(value).await;
377
    }
378
}
379

380
/// This trait abstracts over the `try_send` method of channel senders
381
trait ChannelTrySender<T> {
382
    fn try_send(&mut self, value: T) -> Result<(), TrySendError>;
383
}
384

385
/// Errors that can return from `try_send(..)` calls are converted into the following enum.
386
enum TrySendError {
387
    /// Receiver's buffer is full
388
    Full,
389
    /// Receiver is no longer active. Either it was specifically closed or dropped.
390
    Closed,
391
}
392

393
#[cfg(not(target_arch = "wasm32"))]
394
impl<T> ChannelTrySender<T> for tokio::sync::mpsc::Sender<T> {
395
    fn try_send(&mut self, value: T) -> Result<(), TrySendError> {
2,002✔
396
        use tokio::sync::mpsc::error::TrySendError as TokioTrySendError;
397
        match tokio::sync::mpsc::Sender::try_send(self, value) {
2,002✔
398
            Ok(()) => Ok(()),
1,579✔
399
            Err(TokioTrySendError::Full(_)) => Err(TrySendError::Full),
423✔
400
            Err(TokioTrySendError::Closed(_)) => Err(TrySendError::Closed),
×
401
        }
402
    }
2,002✔
403
}
404

405
#[cfg(target_arch = "wasm32")]
406
impl<T> ChannelTrySender<T> for futures::channel::mpsc::Sender<T> {
407
    fn try_send(&mut self, value: T) -> Result<(), TrySendError> {
408
        match futures::channel::mpsc::Sender::try_send(self, value) {
409
            Ok(_) => Ok(()),
410
            Err(e) => {
411
                if e.is_full() {
412
                    Err(TrySendError::Full)
413
                } else {
414
                    Err(TrySendError::Closed)
415
                }
416
            }
417
        }
418
    }
419
}
420

421
#[cfg(test)]
422
#[allow(clippy::unwrap_used)]
423
mod tests {
424
    use super::*;
425
    use std::str::FromStr;
426

427
    #[test]
428
    fn test_get_peers_all() {
1✔
429
        let runtime_inner = tokio::runtime::Builder::new_multi_thread()
1✔
430
            .enable_all()
1✔
431
            .build()
1✔
432
            .unwrap();
1✔
433
        let node_conf = NodeConf {
1✔
434
            addr: PeerAddr::from_str("213.239.193.208:9053").unwrap(),
1✔
435
            api_key: None,
1✔
436
            timeout: Some(Duration::from_secs(5)),
1✔
437
        };
1✔
438
        let res = runtime_inner.block_on(async { get_peers_all(node_conf).await.unwrap() });
1✔
439
        assert!(!res.is_empty())
1✔
440
    }
1✔
441
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc