• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ergoplatform / sigma-rust / 20193621239

13 Dec 2025 02:45PM UTC coverage: 78.457% (-0.006%) from 78.463%
20193621239

Pull #839

github

web-flow
Merge 64a7742ab into 2f840d387
Pull Request #839: docs(iOS): Modernize README with Xcode 15+ and comprehensive guides

11967 of 15253 relevant lines covered (78.46%)

3.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.52
/ergo-rest/src/api/peer_discovery_internals/non_chrome.rs
1
//! This module contains the implementation of `peer_discovery`.  It's structured as 2 separate
2
//! tasks:
3
//!
4
//!  - Task 1 is responsible for tracking which nodes are active/inactive and making sure that any
5
//!    given ergo node is queried exactly once.
6
//!  - Task 2's job is to wait for a URL from task 1, make the actual HTTP requests to that URL, and
7
//!    to report the result back to task 1.
8
//! ```text
9
//!                              <ergo node URL>
10
//!               __________________________________________________
11
//!              |                                                  |
12
//!              |                                                  v
13
//!  /----------------------\                   /----------------------\
14
//!  | 1. Track node status |                   | 2. HTTP request task |
15
//!  \----------------------/                   \----------------------/
16
//!              ^                                                  |
17
//!              |__________________________________________________|
18
//!                <active node| non-active node| list of peers>
19
//! ```
20
use super::PeerDiscoverySettings;
21
use crate::api::peer_discovery_internals::get_peers_all;
22
use crate::error::PeerDiscoveryError;
23
use crate::{api::node::get_info, NodeConf, PeerInfo};
24
use async_trait::async_trait;
25
use bounded_integer::BoundedU16;
26
use bounded_vec::NonEmptyVec;
27
use ergo_chain_types::PeerAddr;
28
use std::fmt::Debug;
29
use std::{collections::HashSet, time::Duration};
30
use url::Url;
31

32
// Uncomment the following to enable logging on WASM through the `console_log` macro. Taken from
33
// https://rustwasm.github.io/wasm-bindgen/examples/console-log.html#srclibrs
34
//#[cfg(target_arch = "wasm32")]
35
//use wasm_bindgen::prelude::*;
36
//
37
//
38
//#[cfg(target_arch = "wasm32")]
39
//#[wasm_bindgen]
40
//extern "C" {
41
//    // Use `js_namespace` here to bind `console.log(..)` instead of just
42
//    // `log(..)`
43
//    #[wasm_bindgen(js_namespace = console)]
44
//    fn log(s: &str);
45
//}
46
//
47
//#[cfg(target_arch = "wasm32")]
48
//macro_rules! console_log {
49
// Note that this is using the `log` function imported above during
50
// `bare_bones`
51
//($($t:tt)*) => (log(&format_args!($($t)*).to_string()))
52
//}
53

54
pub(crate) async fn peer_discovery_inner(
1✔
55
    seeds: NonEmptyVec<Url>,
56
    max_parallel_tasks: BoundedU16<1, { u16::MAX }>,
57
    timeout: Duration,
58
) -> Result<Vec<Url>, PeerDiscoveryError> {
59
    let settings = PeerDiscoverySettings {
60
        max_parallel_tasks,
61
        task_2_buffer_length: max_parallel_tasks.get() as usize,
2✔
62
        global_timeout: timeout,
63
        timeout_of_individual_node_request: Duration::from_secs(4),
1✔
64
    };
65
    #[cfg(not(target_arch = "wasm32"))]
66
    let (tx_msg, rx_msg) = tokio::sync::mpsc::channel::<Msg>(settings.task_2_buffer_length);
1✔
67
    #[cfg(not(target_arch = "wasm32"))]
68
    let (tx_url, rx_url) = tokio::sync::mpsc::channel::<Url>(settings.task_2_buffer_length);
2✔
69
    #[cfg(not(target_arch = "wasm32"))]
70
    let url_stream = tokio_stream::wrappers::ReceiverStream::new(rx_url);
2✔
71
    #[cfg(not(target_arch = "wasm32"))]
72
    let msg_stream = tokio_stream::wrappers::ReceiverStream::new(rx_msg);
2✔
73

74
    #[cfg(target_arch = "wasm32")]
75
    let (tx_msg, rx_msg) = futures::channel::mpsc::channel::<Msg>(settings.task_2_buffer_length);
76
    #[cfg(target_arch = "wasm32")]
77
    let (tx_url, rx_url) = futures::channel::mpsc::channel::<Url>(settings.task_2_buffer_length);
78
    #[cfg(target_arch = "wasm32")]
79
    let url_stream = rx_url;
80
    #[cfg(target_arch = "wasm32")]
81
    let msg_stream = rx_msg;
82

83
    peer_discovery_impl(seeds, tx_msg, msg_stream, tx_url, url_stream, settings).await
3✔
84
}
85

86
/// Implementation of `peer_discovery`.
87
async fn peer_discovery_impl<
1✔
88
    SendMsg: 'static + ChannelInfallibleSender<Msg> + Clone + Send + Sync,
89
    SendUrl: 'static + ChannelInfallibleSender<Url> + ChannelTrySender<Url> + Clone + Send + Sync,
90
>(
91
    seeds: NonEmptyVec<Url>,
92
    tx_msg: SendMsg,
93
    msg_stream: impl futures::Stream<Item = Msg> + Send + 'static,
94
    mut tx_url: SendUrl,
95
    url_stream: impl futures::Stream<Item = Url> + Send + 'static,
96
    settings: PeerDiscoverySettings,
97
) -> Result<Vec<Url>, PeerDiscoveryError> {
98
    use futures::future::FutureExt;
99
    use futures::StreamExt;
100

101
    let mut seeds_set: HashSet<Url> = HashSet::new();
1✔
102

103
    for mut seed_url in seeds {
4✔
104
        #[allow(clippy::unwrap_used)]
105
        seed_url.set_port(None).unwrap();
2✔
106
        seeds_set.insert(seed_url);
1✔
107
    }
108

109
    // Task 2 from the schematic above
110
    spawn_http_request_task(
111
        tx_msg,
1✔
112
        url_stream,
1✔
113
        settings.max_parallel_tasks,
1✔
114
        settings.timeout_of_individual_node_request,
1✔
115
    );
116

117
    // Start with requests to seed nodes.
118
    for url in &seeds_set {
3✔
119
        tx_url.infallible_send(url.clone()).await;
4✔
120
    }
121

122
    // (*) This variable represents the number of URLs that need to be checked to see whether it
123
    // corresponds to an active Ergo node. `count` is crucial to allow this function to terminate,
124
    // as once it reaches zero we break the loop below. This leads us to drop `tx_url`, which is the
125
    // sender side of the receiver stream `rx_url_stream`, allowing task 1 to end.
126
    let mut count = seeds_set.len();
1✔
127

128
    let mut visited_active_peers = HashSet::new();
1✔
129
    let mut visited_peers = HashSet::new();
1✔
130

131
    // Stack of peers to evaluate. Used as a growable buffer for when the (tx_url, rx_url) channel
132
    // gets full.
133
    let mut peer_stack: Vec<PeerInfo> = vec![];
1✔
134

135
    // Here we spawn a task that triggers a signal after `settings.global_timeout` has elapsed.
136
    #[cfg(target_arch = "wasm32")]
137
    let rx_timeout_signal = {
×
138
        let (tx, rx) = futures::channel::oneshot::channel::<()>();
×
139
        wasm_bindgen_futures::spawn_local(async move {
×
140
            let _ = crate::wasm_timer::Delay::new(settings.global_timeout).await;
×
141
            let _ = tx.send(());
×
142
        });
143
        rx.into_stream()
×
144
    };
145

146
    #[cfg(not(target_arch = "wasm32"))]
147
    let rx_timeout_signal = {
×
148
        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
2✔
149
        tokio::spawn(async move {
4✔
150
            tokio::time::sleep(settings.global_timeout).await;
3✔
151
            let _ = tx.send(());
1✔
152
        });
153
        rx.into_stream()
1✔
154
    };
155

156
    // In addition to listening for `Msg`s from the HTTP request task, we need to watch for the
157
    // timeout signal so we can exit early. The solution is to combine the streams.
158
    enum C {
159
        RxMsg(Msg),
160
        RxTimeoutSignal,
161
    }
162

163
    type CombinedStream = std::pin::Pin<Box<dyn futures::stream::Stream<Item = C> + Send>>;
×
164

165
    let streams: Vec<CombinedStream> = vec![
3✔
166
        msg_stream.map(C::RxMsg).boxed(),
2✔
167
        rx_timeout_signal.map(|_| C::RxTimeoutSignal).boxed(),
4✔
168
    ];
169
    let mut combined_stream = futures::stream::select_all(streams);
1✔
170

171
    // This variable equals to true as long as we're checking for new peer nodes. It is set to false
172
    // once the global timeout is reached.
173
    let mut add_peers = true;
1✔
174

175
    'loop_: while let Some(n) = combined_stream.next().await {
6✔
176
        match n {
1✔
177
            C::RxMsg(p) => {
1✔
178
                // Try pushing as many peers as can be allowed in the (tx_url, rx_url) channel
179
                while let Some(peer) = peer_stack.pop() {
3✔
180
                    let mut url = peer.addr.as_http_url();
2✔
181
                    #[allow(clippy::unwrap_used)]
182
                    url.set_port(None).unwrap();
2✔
183
                    if !visited_peers.contains(&url) {
2✔
184
                        match tx_url.try_send(url.clone()) {
2✔
185
                            Ok(_) => {
×
186
                                visited_peers.insert(url);
1✔
187
                                count += 1;
1✔
188
                            }
189
                            Err(TrySendError::Full) => {
×
190
                                // Push it back on the stack, try again later.
191
                                peer_stack.push(peer);
1✔
192
                                break;
×
193
                            }
194
                            Err(TrySendError::Closed) => {
×
195
                                return Err(PeerDiscoveryError::MpscSender);
×
196
                            }
197
                        }
198
                    }
199
                }
200
                match p {
1✔
201
                    Msg::AddActiveNode(mut url) => {
1✔
202
                        #[allow(clippy::unwrap_used)]
203
                        url.set_port(None).unwrap();
2✔
204
                        visited_active_peers.insert(url.clone());
1✔
205
                        visited_peers.insert(url);
1✔
206
                        count -= 1;
1✔
207
                        if count == 0 {
1✔
208
                            break 'loop_;
×
209
                        }
210
                    }
211
                    Msg::AddInactiveNode(mut url) => {
1✔
212
                        #[allow(clippy::unwrap_used)]
213
                        url.set_port(None).unwrap();
2✔
214
                        visited_peers.insert(url);
1✔
215
                        count -= 1;
1✔
216
                        if count == 0 {
1✔
217
                            break 'loop_;
×
218
                        }
219
                    }
220
                    Msg::CheckPeers(mut peers) => {
1✔
221
                        use rand::seq::SliceRandom;
222
                        use rand::thread_rng;
223
                        peers.shuffle(&mut thread_rng());
2✔
224
                        if add_peers {
1✔
225
                            peer_stack.extend(peers);
1✔
226
                        }
227
                    }
228
                }
229
            }
230
            C::RxTimeoutSignal => {
×
231
                add_peers = false;
1✔
232
                peer_stack.clear();
2✔
233
            }
234
        }
235
    }
236

237
    drop(tx_url);
1✔
238
    let coll: Vec<_> = visited_active_peers
2✔
239
        .difference(&seeds_set)
1✔
240
        .cloned()
241
        .collect();
242

243
    // Uncomment for debugging
244

245
    //#[cfg(not(target_arch = "wasm32"))]
246
    //println!(
247
    //    "Total # nodes visited: {}, # peers found: {}",
248
    //    visited_peers.len(),
249
    //    coll.len()
250
    //);
251
    //
252
    //#[cfg(target_arch = "wasm32")]
253
    //console_log!(
254
    //    "Total # nodes visited: {}, # peers found: {}",
255
    //    visited_peers.len(),
256
    //    coll.len()
257
    //);
258
    Ok(coll)
1✔
259
}
260

261
/// Given a stream that receives URLs of full ergo nodes, spawn a task (task 2 in the schematic
262
/// above) which checks if it is active.  If so, request its peers. In all cases, a message (enum
263
/// `Msg`) is sent out to notify the listener.
264
fn spawn_http_request_task<
1✔
265
    SendMsg: ChannelInfallibleSender<Msg> + Clone + Send + Sync + 'static,
266
>(
267
    tx_peer: SendMsg,
268
    url_stream: impl futures::Stream<Item = Url> + Send + 'static,
269
    max_parallel_requests: BoundedU16<1, { u16::MAX }>,
270
    request_timeout_duration: Duration,
271
) {
272
    use futures::StreamExt;
273

274
    // Note that `tokio` - the de facto standard async runtime - is not supported on WASM. We need
275
    // to spawn tasks for HTTP requests, and for WASM we rely on the `wasm_bindgen_futures` crate.
276
    #[cfg(not(target_arch = "wasm32"))]
277
    let spawn_fn = tokio::spawn;
1✔
278

279
    #[cfg(target_arch = "wasm32")]
280
    let spawn_fn = wasm_bindgen_futures::spawn_local;
×
281

282
    let mapped_stream = url_stream
2✔
283
        .map(move |mut url| {
2✔
284
            let mut tx_peer = tx_peer.clone();
2✔
285
            async move {
3✔
286
                // `tokio::spawn` returns a `JoinHandle` which we make sure to drop. If we don't drop
287
                // and instead await on it, performance suffers greatly (~ 5x slower). In WASM case
288
                // we don't need to worry because `wasm_bindgen_futures::spawn_local` returns ().
289
                let _handle = spawn_fn(async move {
4✔
290
                    // Query node at url.
291
                    #[allow(clippy::unwrap_used)]
×
292
                    url.set_port(Some(9053)).unwrap();
2✔
293
                    #[allow(clippy::unwrap_used)]
×
294
                    let node_conf = NodeConf {
1✔
295
                        addr: PeerAddr::try_from(&url).unwrap(),
1✔
296
                        api_key: None,
1✔
297
                        timeout: Some(request_timeout_duration),
1✔
298
                    };
299

300
                    // If active, look up its peers.
301
                    match get_info(node_conf).await {
2✔
302
                        Ok(_) => {
×
303
                            match get_peers_all(node_conf).await {
3✔
304
                                Ok(peers) => {
1✔
305
                                    // It's important to send this message before the `AddActiveNode`
306
                                    // message below, to ensure an accurate `count` variable in task 1;
307
                                    // see (*) above in `peer_discovery_inner`.
308
                                    tx_peer.infallible_send(Msg::CheckPeers(peers)).await;
2✔
309
                                    tx_peer
3✔
310
                                        .infallible_send(Msg::AddActiveNode(url.clone()))
1✔
311
                                        .await;
2✔
312
                                }
313
                                Err(_) => {
×
314
                                    #[allow(clippy::unwrap_used)]
×
315
                                    tx_peer.infallible_send(Msg::AddInactiveNode(url)).await;
×
316
                                }
317
                            }
318
                        }
319
                        Err(_) => {
1✔
320
                            #[allow(clippy::unwrap_used)]
×
321
                            tx_peer.infallible_send(Msg::AddInactiveNode(url)).await;
3✔
322
                        }
323
                    }
324
                });
325
            }
326
        })
327
        .buffer_unordered(max_parallel_requests.get() as usize); // Allow for parallel requests
2✔
328

329
    // Note: We need to define another binding to the spawn function to get around the Rust type
330
    // checker.
331
    #[cfg(not(target_arch = "wasm32"))]
332
    let spawn_fn_new = tokio::spawn;
×
333

334
    #[cfg(target_arch = "wasm32")]
335
    let spawn_fn_new = wasm_bindgen_futures::spawn_local;
×
336

337
    // (*) Run stream to completion.
338
    spawn_fn_new(mapped_stream.for_each(|_| async move {}));
5✔
339
}
340

341
/// Used in the implementation of `peer_discovery`
342
#[derive(Debug)]
343
pub(crate) enum Msg {
344
    /// Indicates that the ergo node at the given URL is active. This means that a GET request
345
    /// to the node's /info endpoint responds with code 200 OK.
346
    AddActiveNode(Url),
347
    /// Indicates that the ergo node at the given URL is inactive. This means that a GET request
348
    /// to the node's /info endpoint does not respond with code 200 OK.
349
    AddInactiveNode(Url),
350
    /// A list of peers of an active ergo node, returned from a GET on the /peers/all endpoint.
351
    CheckPeers(Vec<PeerInfo>),
352
}
353

354
/// This trait abstracts over the `send` method of channel senders, assuming no failure.
355
#[async_trait]
356
trait ChannelInfallibleSender<T> {
357
    /// A send that cannot fail.
358
    async fn infallible_send(&mut self, value: T);
359
}
360

361
#[cfg(not(target_arch = "wasm32"))]
362
#[async_trait]
363
impl<T: Debug + Send> ChannelInfallibleSender<T> for tokio::sync::mpsc::Sender<T> {
364
    async fn infallible_send(&mut self, value: T) {
8✔
365
        // If error results, just discard it.
366
        let _ = self.send(value).await;
6✔
367
    }
368
}
369

370
#[cfg(target_arch = "wasm32")]
371
#[async_trait]
372
impl<T: Debug + Send> ChannelInfallibleSender<T> for futures::channel::mpsc::Sender<T> {
373
    async fn infallible_send(&mut self, value: T) {
×
374
        use futures::sink::SinkExt;
375
        // If error results, just discard it.
376
        let _ = self.send(value).await;
×
377
    }
378
}
379

380
/// This trait abstracts over the `try_send` method of channel senders
381
trait ChannelTrySender<T> {
382
    fn try_send(&mut self, value: T) -> Result<(), TrySendError>;
383
}
384

385
/// Errors that can return from `try_send(..)` calls are converted into the following enum.
386
enum TrySendError {
387
    /// Receiver's buffer is full
388
    Full,
389
    /// Receiver is no longer active. Either it was specifically closed or dropped.
390
    Closed,
391
}
392

393
#[cfg(not(target_arch = "wasm32"))]
394
impl<T> ChannelTrySender<T> for tokio::sync::mpsc::Sender<T> {
395
    fn try_send(&mut self, value: T) -> Result<(), TrySendError> {
1✔
396
        use tokio::sync::mpsc::error::TrySendError as TokioTrySendError;
397
        match tokio::sync::mpsc::Sender::try_send(self, value) {
1✔
398
            Ok(()) => Ok(()),
1✔
399
            Err(TokioTrySendError::Full(_)) => Err(TrySendError::Full),
1✔
400
            Err(TokioTrySendError::Closed(_)) => Err(TrySendError::Closed),
×
401
        }
402
    }
403
}
404

405
#[cfg(target_arch = "wasm32")]
406
impl<T> ChannelTrySender<T> for futures::channel::mpsc::Sender<T> {
407
    fn try_send(&mut self, value: T) -> Result<(), TrySendError> {
×
408
        match futures::channel::mpsc::Sender::try_send(self, value) {
×
409
            Ok(_) => Ok(()),
×
410
            Err(e) => {
×
411
                if e.is_full() {
×
412
                    Err(TrySendError::Full)
×
413
                } else {
414
                    Err(TrySendError::Closed)
×
415
                }
416
            }
417
        }
418
    }
419
}
420

421
#[cfg(test)]
422
#[allow(clippy::unwrap_used)]
423
mod tests {
424
    use super::*;
425
    use std::str::FromStr;
426

427
    #[test]
428
    fn test_get_peers_all() {
429
        let runtime_inner = tokio::runtime::Builder::new_multi_thread()
430
            .enable_all()
431
            .build()
432
            .unwrap();
433
        let node_conf = NodeConf {
434
            addr: PeerAddr::from_str("213.239.193.208:9053").unwrap(),
435
            api_key: None,
436
            timeout: Some(Duration::from_secs(5)),
437
        };
438
        let res = runtime_inner.block_on(async { get_peers_all(node_conf).await.unwrap() });
439
        assert!(!res.is_empty())
440
    }
441
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc