• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ergoplatform / sigma-rust / 19909456309

03 Dec 2025 09:29PM UTC coverage: 86.947% (+8.5%) from 78.463%
19909456309

Pull #838

github

web-flow
Merge 717ebc4b7 into 2f840d387
Pull Request #838: Fix CI, bump dependencies and rust toolchain

20 of 24 new or added lines in 12 files covered. (83.33%)

1614 existing lines in 221 files now uncovered.

27478 of 31603 relevant lines covered (86.95%)

506307.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.53
/ergo-rest/src/api/peer_discovery_internals/non_chrome.rs
1
//! This module contains the implementation of `peer_discovery`.  It's structured as 2 separate
2
//! tasks:
3
//!
4
//!  - Task 1 is responsible for tracking which nodes are active/inactive and making sure that any
5
//!    given ergo node is queried exactly once.
6
//!  - Task 2's job is to wait for a URL from task 1, make the actual HTTP requests to that URL, and
7
//!    to report the result back to task 1.
8
//! ```text
9
//!                              <ergo node URL>
10
//!               __________________________________________________
11
//!              |                                                  |
12
//!              |                                                  v
13
//!  /----------------------\                   /----------------------\
14
//!  | 1. Track node status |                   | 2. HTTP request task |
15
//!  \----------------------/                   \----------------------/
16
//!              ^                                                  |
17
//!              |__________________________________________________|
18
//!                <active node| non-active node| list of peers>
19
//! ```
20
use super::PeerDiscoverySettings;
21
use crate::api::peer_discovery_internals::get_peers_all;
22
use crate::error::PeerDiscoveryError;
23
use crate::{api::node::get_info, NodeConf, PeerInfo};
24
use async_trait::async_trait;
25
use bounded_integer::BoundedU16;
26
use bounded_vec::NonEmptyVec;
27
use ergo_chain_types::PeerAddr;
28
use std::fmt::Debug;
29
use std::{collections::HashSet, time::Duration};
30
use url::Url;
31

32
// Uncomment the following to enable logging on WASM through the `console_log` macro. Taken from
33
// https://rustwasm.github.io/wasm-bindgen/examples/console-log.html#srclibrs
34
//#[cfg(target_arch = "wasm32")]
35
//use wasm_bindgen::prelude::*;
36
//
37
//
38
//#[cfg(target_arch = "wasm32")]
39
//#[wasm_bindgen]
40
//extern "C" {
41
//    // Use `js_namespace` here to bind `console.log(..)` instead of just
42
//    // `log(..)`
43
//    #[wasm_bindgen(js_namespace = console)]
44
//    fn log(s: &str);
45
//}
46
//
47
//#[cfg(target_arch = "wasm32")]
48
//macro_rules! console_log {
49
// Note that this is using the `log` function imported above during
50
// `bare_bones`
51
//($($t:tt)*) => (log(&format_args!($($t)*).to_string()))
52
//}
53

54
pub(crate) async fn peer_discovery_inner(
4✔
55
    seeds: NonEmptyVec<Url>,
4✔
56
    max_parallel_tasks: BoundedU16<1, { u16::MAX }>,
4✔
57
    timeout: Duration,
4✔
58
) -> Result<Vec<Url>, PeerDiscoveryError> {
4✔
59
    let settings = PeerDiscoverySettings {
4✔
60
        max_parallel_tasks,
4✔
61
        task_2_buffer_length: max_parallel_tasks.get() as usize,
4✔
62
        global_timeout: timeout,
4✔
63
        timeout_of_individual_node_request: Duration::from_secs(4),
4✔
64
    };
4✔
65
    #[cfg(not(target_arch = "wasm32"))]
66
    let (tx_msg, rx_msg) = tokio::sync::mpsc::channel::<Msg>(settings.task_2_buffer_length);
4✔
67
    #[cfg(not(target_arch = "wasm32"))]
68
    let (tx_url, rx_url) = tokio::sync::mpsc::channel::<Url>(settings.task_2_buffer_length);
4✔
69
    #[cfg(not(target_arch = "wasm32"))]
70
    let url_stream = tokio_stream::wrappers::ReceiverStream::new(rx_url);
4✔
71
    #[cfg(not(target_arch = "wasm32"))]
72
    let msg_stream = tokio_stream::wrappers::ReceiverStream::new(rx_msg);
4✔
73

74
    #[cfg(target_arch = "wasm32")]
75
    let (tx_msg, rx_msg) = futures::channel::mpsc::channel::<Msg>(settings.task_2_buffer_length);
76
    #[cfg(target_arch = "wasm32")]
77
    let (tx_url, rx_url) = futures::channel::mpsc::channel::<Url>(settings.task_2_buffer_length);
78
    #[cfg(target_arch = "wasm32")]
79
    let url_stream = rx_url;
80
    #[cfg(target_arch = "wasm32")]
81
    let msg_stream = rx_msg;
82

83
    peer_discovery_impl(seeds, tx_msg, msg_stream, tx_url, url_stream, settings).await
4✔
84
}
4✔
85

86
/// Implementation of `peer_discovery`.
87
async fn peer_discovery_impl<
4✔
88
    SendMsg: 'static + ChannelInfallibleSender<Msg> + Clone + Send + Sync,
4✔
89
    SendUrl: 'static + ChannelInfallibleSender<Url> + ChannelTrySender<Url> + Clone + Send + Sync,
4✔
90
>(
4✔
91
    seeds: NonEmptyVec<Url>,
4✔
92
    tx_msg: SendMsg,
4✔
93
    msg_stream: impl futures::Stream<Item = Msg> + Send + 'static,
4✔
94
    mut tx_url: SendUrl,
4✔
95
    url_stream: impl futures::Stream<Item = Url> + Send + 'static,
4✔
96
    settings: PeerDiscoverySettings,
4✔
97
) -> Result<Vec<Url>, PeerDiscoveryError> {
4✔
98
    use futures::future::FutureExt;
99
    use futures::StreamExt;
100

101
    let mut seeds_set: HashSet<Url> = HashSet::new();
4✔
102

103
    for mut seed_url in seeds {
60✔
104
        #[allow(clippy::unwrap_used)]
56✔
105
        seed_url.set_port(None).unwrap();
56✔
106
        seeds_set.insert(seed_url);
56✔
107
    }
56✔
108

109
    // Task 2 from the schematic above
110
    spawn_http_request_task(
4✔
111
        tx_msg,
4✔
112
        url_stream,
4✔
113
        settings.max_parallel_tasks,
4✔
114
        settings.timeout_of_individual_node_request,
4✔
115
    );
116

117
    // Start with requests to seed nodes.
118
    for url in &seeds_set {
60✔
119
        tx_url.infallible_send(url.clone()).await;
56✔
120
    }
121

122
    // (*) This variable represents the number of URLs that need to be checked to see whether it
123
    // corresponds to an active Ergo node. `count` is crucial to allow this function to terminate,
124
    // as once it reaches zero we break the loop below. This leads us to drop `tx_url`, which is the
125
    // sender side of the receiver stream `rx_url_stream`, allowing task 1 to end.
126
    let mut count = seeds_set.len();
4✔
127

128
    let mut visited_active_peers = HashSet::new();
4✔
129
    let mut visited_peers = HashSet::new();
4✔
130

131
    // Stack of peers to evaluate. Used as a growable buffer for when the (tx_url, rx_url) channel
132
    // gets full.
133
    let mut peer_stack: Vec<PeerInfo> = vec![];
4✔
134

135
    // Here we spawn a task that triggers a signal after `settings.global_timeout` has elapsed.
136
    #[cfg(target_arch = "wasm32")]
137
    let rx_timeout_signal = {
138
        let (tx, rx) = futures::channel::oneshot::channel::<()>();
139
        wasm_bindgen_futures::spawn_local(async move {
140
            let _ = wasmtimer::tokio::sleep(settings.global_timeout).await;
141
            let _ = tx.send(());
142
        });
143
        rx.into_stream()
144
    };
145

146
    #[cfg(not(target_arch = "wasm32"))]
147
    let rx_timeout_signal = {
4✔
148
        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
4✔
149
        tokio::spawn(async move {
4✔
150
            tokio::time::sleep(settings.global_timeout).await;
4✔
151
            let _ = tx.send(());
2✔
152
        });
2✔
153
        rx.into_stream()
4✔
154
    };
155

156
    // In addition to listening for `Msg`s from the HTTP request task, we need to watch for the
157
    // timeout signal so we can exit early. The solution is to combine the streams.
158
    enum C {
159
        RxMsg(Msg),
160
        RxTimeoutSignal,
161
    }
162

163
    type CombinedStream = std::pin::Pin<Box<dyn futures::stream::Stream<Item = C> + Send>>;
164

165
    let streams: Vec<CombinedStream> = vec![
4✔
166
        msg_stream.map(C::RxMsg).boxed(),
4✔
167
        rx_timeout_signal.map(|_| C::RxTimeoutSignal).boxed(),
4✔
168
    ];
169
    let mut combined_stream = futures::stream::select_all(streams);
4✔
170

171
    // This variable equals to true as long as we're checking for new peer nodes. It is set to false
172
    // once the global timeout is reached.
173
    let mut add_peers = true;
4✔
174

175
    'loop_: while let Some(n) = combined_stream.next().await {
3,193✔
176
        match n {
3,193✔
177
            C::RxMsg(p) => {
3,191✔
178
                // Try pushing as many peers as can be allowed in the (tx_url, rx_url) channel
179
                while let Some(peer) = peer_stack.pop() {
26,697✔
180
                    let mut url = peer.addr.as_http_url();
24,107✔
181
                    #[allow(clippy::unwrap_used)]
182
                    url.set_port(None).unwrap();
24,107✔
183
                    if !visited_peers.contains(&url) {
24,107✔
184
                        match tx_url.try_send(url.clone()) {
3,638✔
185
                            Ok(_) => {
3,037✔
186
                                visited_peers.insert(url);
3,037✔
187
                                count += 1;
3,037✔
188
                            }
3,037✔
189
                            Err(TrySendError::Full) => {
190
                                // Push it back on the stack, try again later.
191
                                peer_stack.push(peer);
601✔
192
                                break;
601✔
193
                            }
194
                            Err(TrySendError::Closed) => {
195
                                return Err(PeerDiscoveryError::MpscSender);
×
196
                            }
197
                        }
198
                    }
20,469✔
199
                }
200
                match p {
3,191✔
201
                    Msg::AddActiveNode(mut url) => {
98✔
202
                        #[allow(clippy::unwrap_used)]
203
                        url.set_port(None).unwrap();
98✔
204
                        visited_active_peers.insert(url.clone());
98✔
205
                        visited_peers.insert(url);
98✔
206
                        count -= 1;
98✔
207
                        if count == 0 {
98✔
208
                            break 'loop_;
×
209
                        }
98✔
210
                    }
211
                    Msg::AddInactiveNode(mut url) => {
2,995✔
212
                        #[allow(clippy::unwrap_used)]
213
                        url.set_port(None).unwrap();
2,995✔
214
                        visited_peers.insert(url);
2,995✔
215
                        count -= 1;
2,995✔
216
                        if count == 0 {
2,995✔
217
                            break 'loop_;
4✔
218
                        }
2,991✔
219
                    }
220
                    Msg::CheckPeers(mut peers) => {
98✔
221
                        use rand::seq::SliceRandom;
222
                        use rand::thread_rng;
223
                        peers.shuffle(&mut thread_rng());
98✔
224
                        if add_peers {
98✔
225
                            peer_stack.extend(peers);
71✔
226
                        }
71✔
227
                    }
228
                }
229
            }
230
            C::RxTimeoutSignal => {
2✔
231
                add_peers = false;
2✔
232
                peer_stack.clear();
2✔
233
            }
2✔
234
        }
235
    }
236

237
    drop(tx_url);
4✔
238
    let coll: Vec<_> = visited_active_peers
4✔
239
        .difference(&seeds_set)
4✔
240
        .cloned()
4✔
241
        .collect();
4✔
242

243
    // Uncomment for debugging
244

245
    //#[cfg(not(target_arch = "wasm32"))]
246
    //println!(
247
    //    "Total # nodes visited: {}, # peers found: {}",
248
    //    visited_peers.len(),
249
    //    coll.len()
250
    //);
251
    //
252
    //#[cfg(target_arch = "wasm32")]
253
    //console_log!(
254
    //    "Total # nodes visited: {}, # peers found: {}",
255
    //    visited_peers.len(),
256
    //    coll.len()
257
    //);
258
    Ok(coll)
4✔
259
}
4✔
260

261
/// Given a stream that receives URLs of full ergo nodes, spawn a task (task 2 in the schematic
262
/// above) which checks if it is active.  If so, request its peers. In all cases, a message (enum
263
/// `Msg`) is sent out to notify the listener.
264
fn spawn_http_request_task<
4✔
265
    SendMsg: ChannelInfallibleSender<Msg> + Clone + Send + Sync + 'static,
4✔
266
>(
4✔
267
    tx_peer: SendMsg,
4✔
268
    url_stream: impl futures::Stream<Item = Url> + Send + 'static,
4✔
269
    max_parallel_requests: BoundedU16<1, { u16::MAX }>,
4✔
270
    request_timeout_duration: Duration,
4✔
271
) {
4✔
272
    use futures::StreamExt;
273

274
    // Note that `tokio` - the de facto standard async runtime - is not supported on WASM. We need
275
    // to spawn tasks for HTTP requests, and for WASM we rely on the `wasm_bindgen_futures` crate.
276
    #[cfg(not(target_arch = "wasm32"))]
277
    let spawn_fn = tokio::spawn;
4✔
278

279
    #[cfg(target_arch = "wasm32")]
280
    let spawn_fn = wasm_bindgen_futures::spawn_local;
281

282
    let mapped_stream = url_stream
4✔
283
        .map(move |mut url| {
3,093✔
284
            let mut tx_peer = tx_peer.clone();
3,093✔
285
            async move {
3,093✔
286
                // `tokio::spawn` returns a `JoinHandle` which we make sure to drop. If we don't drop
287
                // and instead await on it, performance suffers greatly (~ 5x slower). In WASM case
288
                // we don't need to worry because `wasm_bindgen_futures::spawn_local` returns ().
289
                let _handle = spawn_fn(async move {
3,093✔
290
                    // Query node at url.
291
                    #[allow(clippy::unwrap_used)]
292
                    url.set_port(Some(9053)).unwrap();
3,093✔
293
                    #[allow(clippy::unwrap_used)]
294
                    let node_conf = NodeConf {
3,093✔
295
                        addr: PeerAddr::try_from(&url).unwrap(),
3,093✔
296
                        api_key: None,
3,093✔
297
                        timeout: Some(request_timeout_duration),
3,093✔
298
                    };
3,093✔
299

300
                    // If active, look up its peers.
301
                    match get_info(node_conf).await {
3,093✔
302
                        Ok(_) => {
303
                            match get_peers_all(node_conf).await {
98✔
304
                                Ok(peers) => {
98✔
305
                                    // It's important to send this message before the `AddActiveNode`
306
                                    // message below, to ensure an accurate `count` variable in task 1;
307
                                    // see (*) above in `peer_discovery_inner`.
308
                                    tx_peer.infallible_send(Msg::CheckPeers(peers)).await;
98✔
309
                                    tx_peer
98✔
310
                                        .infallible_send(Msg::AddActiveNode(url.clone()))
98✔
311
                                        .await;
98✔
312
                                }
313
                                Err(_) => {
314
                                    #[allow(clippy::unwrap_used)]
UNCOV
315
                                    tx_peer.infallible_send(Msg::AddInactiveNode(url)).await;
×
316
                                }
317
                            }
318
                        }
319
                        Err(_) => {
320
                            #[allow(clippy::unwrap_used)]
321
                            tx_peer.infallible_send(Msg::AddInactiveNode(url)).await;
2,995✔
322
                        }
323
                    }
324
                });
3,093✔
325
            }
3,093✔
326
        })
3,093✔
327
        .buffer_unordered(max_parallel_requests.get() as usize); // Allow for parallel requests
4✔
328

329
    // Note: We need to define another binding to the spawn function to get around the Rust type
330
    // checker.
331
    #[cfg(not(target_arch = "wasm32"))]
332
    let spawn_fn_new = tokio::spawn;
4✔
333

334
    #[cfg(target_arch = "wasm32")]
335
    let spawn_fn_new = wasm_bindgen_futures::spawn_local;
336

337
    // (*) Run stream to completion.
338
    spawn_fn_new(mapped_stream.for_each(|_| async move {}));
3,093✔
339
}
4✔
340

341
/// Used in the implementation of `peer_discovery`
342
#[derive(Debug)]
343
pub(crate) enum Msg {
344
    /// Indicates that the ergo node at the given URL is active. This means that a GET request
345
    /// to the node's /info endpoint responds with code 200 OK.
346
    AddActiveNode(Url),
347
    /// Indicates that the ergo node at the given URL is inactive. This means that a GET request
348
    /// to the node's /info endpoint does not respond with code 200 OK.
349
    AddInactiveNode(Url),
350
    /// A list of peers of an active ergo node, returned from a GET on the /peers/all endpoint.
351
    CheckPeers(Vec<PeerInfo>),
352
}
353

354
/// This trait abstracts over the `send` method of channel senders, assuming no failure.
355
#[async_trait]
356
trait ChannelInfallibleSender<T> {
357
    /// A send that cannot fail.
358
    async fn infallible_send(&mut self, value: T);
359
}
360

361
#[cfg(not(target_arch = "wasm32"))]
362
#[async_trait]
363
impl<T: Debug + Send> ChannelInfallibleSender<T> for tokio::sync::mpsc::Sender<T> {
364
    async fn infallible_send(&mut self, value: T) {
3,247✔
365
        // If error results, just discard it.
366
        let _ = self.send(value).await;
367
    }
3,247✔
368
}
369

370
#[cfg(target_arch = "wasm32")]
371
#[async_trait]
372
impl<T: Debug + Send> ChannelInfallibleSender<T> for futures::channel::mpsc::Sender<T> {
373
    async fn infallible_send(&mut self, value: T) {
374
        use futures::sink::SinkExt;
375
        // If error results, just discard it.
376
        let _ = self.send(value).await;
377
    }
378
}
379

380
/// This trait abstracts over the `try_send` method of channel senders
trait ChannelTrySender<T> {
    /// Attempts to send `value`, reporting why the attempt failed through [`TrySendError`].
    fn try_send(&mut self, value: T) -> Result<(), TrySendError>;
}

/// Errors that can return from `try_send(..)` calls are converted into the following enum.
///
/// The enum carries no payload, so it derives the usual value-type traits; `Debug` in particular
/// lets it be logged and used in assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TrySendError {
    /// Receiver's buffer is full
    Full,
    /// Receiver is no longer active. Either it was specifically closed or dropped.
    Closed,
}
392

393
#[cfg(not(target_arch = "wasm32"))]
394
impl<T> ChannelTrySender<T> for tokio::sync::mpsc::Sender<T> {
395
    fn try_send(&mut self, value: T) -> Result<(), TrySendError> {
3,638✔
396
        use tokio::sync::mpsc::error::TrySendError as TokioTrySendError;
397
        match tokio::sync::mpsc::Sender::try_send(self, value) {
3,638✔
398
            Ok(()) => Ok(()),
3,037✔
399
            Err(TokioTrySendError::Full(_)) => Err(TrySendError::Full),
601✔
400
            Err(TokioTrySendError::Closed(_)) => Err(TrySendError::Closed),
×
401
        }
402
    }
3,638✔
403
}
404

405
#[cfg(target_arch = "wasm32")]
impl<T> ChannelTrySender<T> for futures::channel::mpsc::Sender<T> {
    /// Forwards to the `futures` sender's `try_send`. An error for which `is_full()` holds is
    /// reported as `TrySendError::Full`, any other error as `TrySendError::Closed`.
    fn try_send(&mut self, value: T) -> Result<(), TrySendError> {
        futures::channel::mpsc::Sender::try_send(self, value).map_err(|err| {
            if err.is_full() {
                TrySendError::Full
            } else {
                TrySendError::Closed
            }
        })
    }
}
420

421
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use std::str::FromStr;

    /// Requests the peers of a hard-coded node address and expects a non-empty list.
    /// NOTE(review): this contacts a live node, so it presumably needs network access and that
    /// node being reachable.
    #[test]
    fn test_get_peers_all() {
        let node_conf = NodeConf {
            addr: PeerAddr::from_str("213.239.193.208:9053").unwrap(),
            api_key: None,
            timeout: Some(Duration::from_secs(5)),
        };
        let runtime_inner = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        let res = runtime_inner.block_on(get_peers_all(node_conf)).unwrap();
        assert!(!res.is_empty())
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc