• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tox-rs / tox / 6967930078

23 Nov 2023 09:37AM UTC coverage: 94.849% (-0.03%) from 94.876%
6967930078

Pull #477

github

web-flow
Merge 4d4ecdd60 into 5ab95a61f
Pull Request #477: Add functions for tox crate version information

0 of 10 new or added lines in 1 file covered. (0.0%)

13 existing lines in 7 files now uncovered.

16813 of 17726 relevant lines covered (94.85%)

1.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/tox_node/src/main.rs
1
#![type_length_limit = "65995950"]
2

3
#[macro_use]
4
extern crate clap;
5
#[macro_use]
6
extern crate log;
7

8
mod motd;
9
mod node_config;
10

11
use std::convert::TryInto;
12
use std::fs::File;
13
use std::io::{ErrorKind, Read, Write};
14
use std::net::SocketAddr;
15

16
use anyhow::Error;
17
use futures::{channel::mpsc, StreamExt};
18
use futures::{future, Future, FutureExt, TryFutureExt};
19
use itertools::Itertools;
20
use rand::thread_rng;
21
#[cfg(unix)]
22
use syslog::Facility;
23
use tokio::net::{TcpListener, UdpSocket};
24
use tokio::runtime;
25
use tox::core::dht::lan_discovery::LanDiscoverySender;
26
use tox::core::dht::server::Server as DhtServer;
27
use tox::core::dht::server_ext::dht_run_socket;
28
use tox::core::relay::server::{tcp_run, Server as TcpServer};
29
use tox::core::stats::Stats;
30
use tox::core::udp::Server as UdpServer;
31
use tox::crypto::*;
32
use tox::packet::onion::InnerOnionResponse;
33
use tox::packet::relay::OnionRequest;
34

35
use crate::motd::{Counters, Motd};
36
use crate::node_config::*;
37

38
/// Capacity of each channel that carries onion packets between the UDP
/// server and the TCP relay.
const ONION_CHANNEL_SIZE: usize = 32;
/// Capacity of the channel that carries outgoing DHT packets.
const DHT_CHANNEL_SIZE: usize = 32;
42

43
/// Get version in format 3AAABBBCCC, where A B and C are major, minor and patch
44
/// versions of node. `tox-bootstrapd` uses similar scheme but with leading 1.
45
/// Before it used format YYYYMMDDVV so the leading numeral was 2. To make a
46
/// difference with these schemes we use 3.
NEW
47
fn version() -> u32 {
×
NEW
48
    let major: u32 = env!("CARGO_PKG_VERSION_MAJOR").parse().expect("Invalid major version");
×
NEW
49
    let minor: u32 = env!("CARGO_PKG_VERSION_MINOR").parse().expect("Invalid minor version");
×
NEW
50
    let patch: u32 = env!("CARGO_PKG_VERSION_PATCH").parse().expect("Invalid patch version");
×
NEW
51
    assert!(major < 1000, "Invalid major version");
×
NEW
52
    assert!(minor < 1000, "Invalid minor version");
×
NEW
53
    assert!(patch < 1000, "Invalid patch version");
×
NEW
54
    3_000_000_000 + major * 1_000_000 + minor * 1000 + patch
×
55
}
56

57
/// Bind a UDP listener to the socket address.
NEW
58
async fn bind_socket(addr: SocketAddr) -> UdpSocket {
×
NEW
59
    let socket = UdpSocket::bind(&addr).await.expect("Failed to bind UDP socket");
×
60
    socket.set_broadcast(true).expect("set_broadcast call failed");
×
61
    if addr.is_ipv6() {
×
62
        socket
×
63
            .set_multicast_loop_v6(true)
64
            .expect("set_multicast_loop_v6 call failed");
65
    }
66
    socket
×
67
}
68

69
/// Save DHT keys to a binary file.
70
fn save_keys(keys_file: &str, pk: PublicKey, sk: &SecretKey) {
×
71
    #[cfg(unix)]
72
    use std::os::unix::fs::OpenOptionsExt;
73

74
    #[cfg(not(unix))]
75
    let mut file = File::create(keys_file).expect("Failed to create the keys file");
76

77
    #[cfg(unix)]
78
    let mut file = std::fs::OpenOptions::new()
×
79
        .create(true)
80
        .write(true)
81
        .mode(0o600)
82
        .open(keys_file)
83
        .expect("Failed to create the keys file");
84

85
    file.write_all(pk.as_ref())
×
86
        .expect("Failed to save public key to the keys file");
87
    file.write_all(sk.as_bytes())
×
88
        .expect("Failed to save secret key to the keys file");
89
}
90

91
/// Load DHT keys from a binary file.
92
fn load_keys(mut file: File) -> (PublicKey, SecretKey) {
×
93
    let mut buf = [0; crypto_box::KEY_SIZE * 2];
×
94
    file.read_exact(&mut buf)
×
95
        .expect("Failed to read keys from the keys file");
96
    let pk_bytes: [u8; crypto_box::KEY_SIZE] = buf[..crypto_box::KEY_SIZE]
×
97
        .try_into()
98
        .expect("Failed to read public key from the keys file");
99
    let sk_bytes: [u8; crypto_box::KEY_SIZE] = buf[crypto_box::KEY_SIZE..]
×
100
        .try_into()
101
        .expect("Failed to read secret key from the keys file");
102
    let pk = PublicKey::from(pk_bytes);
×
103
    let sk = SecretKey::from(sk_bytes);
×
104
    assert!(
×
105
        pk == sk.public_key(),
×
106
        "The loaded public key does not correspond to the loaded secret key"
107
    );
108
    (pk, sk)
×
109
}
110

111
/// Load DHT keys from a binary file or generate and save them if file does not
112
/// exist.
113
fn load_or_gen_keys(keys_file: &str) -> (PublicKey, SecretKey) {
×
114
    match File::open(keys_file) {
×
115
        Ok(file) => load_keys(file),
×
116
        Err(ref e) if e.kind() == ErrorKind::NotFound => {
×
117
            info!("Generating new DHT keys and storing them to '{}'", keys_file);
×
118
            let sk = SecretKey::generate(&mut thread_rng());
×
119
            let pk = sk.public_key();
×
120
            save_keys(keys_file, pk.clone(), &sk);
×
121
            (pk, sk)
×
122
        }
123
        Err(e) => panic!("Failed to read the keys file: {}", e),
×
124
    }
125
}
126

127
/// Run a future with the runtime specified by config.
128
fn run<F>(future: F, threads: Threads)
×
129
where
130
    F: Future<Output = Result<(), Error>> + 'static,
131
{
132
    if threads == Threads::N(1) {
×
133
        let runtime = runtime::Runtime::new().expect("Failed to create runtime");
×
134
        runtime.block_on(future).expect("Execution was terminated with error");
×
135
    } else {
136
        let mut builder = runtime::Builder::new_multi_thread();
×
137
        match threads {
×
138
            Threads::N(n) => {
×
139
                builder.worker_threads(n as usize);
×
140
            }
141
            Threads::Auto => {} // builder will detect number of cores automatically
142
        }
143
        let runtime = builder.build().expect("Failed to create runtime");
×
144
        runtime.block_on(future).expect("Execution was terminated with error");
×
145
    };
146
}
147

148
/// Onion sink and stream for TCP.
149
struct TcpOnion {
150
    /// Sink for onion packets from TCP to UDP.
151
    tx: mpsc::Sender<(OnionRequest, SocketAddr)>,
152
    /// Stream of onion packets from TCP to UDP.
153
    rx: mpsc::Receiver<(InnerOnionResponse, SocketAddr)>,
154
}
155

156
/// Onion sink and stream for UDP.
157
struct UdpOnion {
158
    /// Sink for onion packets from UDP to TCP.
159
    tx: mpsc::Sender<(InnerOnionResponse, SocketAddr)>,
160
    /// Stream of onion packets from TCP to UDP.
161
    rx: mpsc::Receiver<(OnionRequest, SocketAddr)>,
162
}
163

164
/// Create onion streams for TCP and UDP servers communication.
165
fn create_onion_streams() -> (TcpOnion, UdpOnion) {
×
166
    let (udp_onion_tx, udp_onion_rx) = mpsc::channel(ONION_CHANNEL_SIZE);
×
167
    let (tcp_onion_tx, tcp_onion_rx) = mpsc::channel(ONION_CHANNEL_SIZE);
×
168
    let tcp_onion = TcpOnion {
169
        tx: tcp_onion_tx,
170
        rx: udp_onion_rx,
171
    };
172
    let udp_onion = UdpOnion {
173
        tx: udp_onion_tx,
174
        rx: tcp_onion_rx,
175
    };
176
    (tcp_onion, udp_onion)
×
177
}
178

179
async fn run_tcp(config: &NodeConfig, dht_sk: SecretKey, mut tcp_onion: TcpOnion, stats: Stats) -> Result<(), Error> {
×
180
    if config.tcp_addrs.is_empty() {
×
181
        // If TCP address is not specified don't start TCP server and only drop
182
        // all onion packets from DHT server
183
        while tcp_onion.rx.next().await.is_some() {}
×
184

185
        return Ok(());
×
186
    }
187

188
    let onion_tx = tcp_onion.tx;
×
189
    let mut onion_rx = tcp_onion.rx;
×
190

191
    let mut tcp_server = TcpServer::new();
×
192
    tcp_server.set_udp_onion_sink(onion_tx);
×
193

194
    let tcp_server_c = tcp_server.clone();
×
195
    let tcp_server_futures = config.tcp_addrs.iter().map(move |&addr| {
×
196
        let tcp_server_c = tcp_server_c.clone();
×
197
        let stats = stats.clone();
×
198
        let dht_sk = dht_sk.clone();
×
199
        async move {
×
200
            let listener = TcpListener::bind(&addr).await.expect("Failed to bind TCP listener");
×
201
            tcp_run(
×
202
                &tcp_server_c,
×
203
                listener,
×
204
                dht_sk,
×
205
                stats.clone(),
×
206
                config.tcp_connections_limit,
×
207
            )
208
            .await
×
209
            .map_err(Error::from)
210
        }
211
        .boxed()
212
    });
213

214
    let tcp_server_future = async { future::select_all(tcp_server_futures).await.0 };
×
215

216
    // let tcp_onion_rx = tcp_onion.rx.clone()
217
    let tcp_onion_future = async {
×
218
        while let Some((onion_response, addr)) = onion_rx.next().await {
×
219
            let res = tcp_server
×
220
                .handle_udp_onion_response(addr.ip(), addr.port(), onion_response)
×
221
                .await;
×
222

223
            if let Err(err) = res {
×
224
                warn!("Failed to handle UDP onion response: {:?}", err);
×
225
            }
226
        }
227

228
        Ok(())
×
229
    };
230

231
    info!("Running TCP relay on {}", config.tcp_addrs.iter().format(","));
×
232

233
    futures::try_join!(tcp_server_future, tcp_onion_future)?;
×
234

235
    Ok(())
×
236
}
237

238
async fn run_udp(
×
239
    config: &NodeConfig,
240
    dht_pk: PublicKey,
241
    dht_sk: &SecretKey,
242
    mut udp_onion: UdpOnion,
243
    tcp_stats: Stats,
244
) -> Result<(), Error> {
245
    let udp_addr = if let Some(udp_addr) = config.udp_addr {
×
246
        udp_addr
×
247
    } else {
248
        // If UDP address is not specified don't start DHT server and only drop
249
        // all onion packets from TCP server
250
        while udp_onion.rx.next().await.is_some() {}
×
251

252
        return Ok(());
×
253
    };
254

255
    let socket = bind_socket(udp_addr).await;
×
256
    let udp_stats = Stats::new();
×
257

258
    // Create a channel for server to communicate with network
259
    let (tx, rx) = mpsc::channel(DHT_CHANNEL_SIZE);
×
260

261
    let tx_clone = tx.clone();
×
262
    let dht_pk_c = dht_pk.clone();
×
263
    let lan_discovery_future = async move {
×
264
        if config.lan_discovery_enabled {
×
265
            LanDiscoverySender::new(tx_clone, dht_pk_c, udp_addr.is_ipv6())
×
266
                .run()
267
                .map_err(Error::from)
268
                .await
×
269
        } else {
270
            Ok(())
×
271
        }
272
    };
273

274
    let (onion_tx, mut onion_rx) = (udp_onion.tx, udp_onion.rx);
×
275

276
    let mut dht_server = DhtServer::new(tx, dht_pk, dht_sk.clone());
×
277
    let counters = Counters::new(tcp_stats, udp_stats.clone());
×
278
    let motd = Motd::new(config.motd.clone(), counters);
×
279
    dht_server.set_bootstrap_info(version(), Box::new(move |_| motd.format().as_bytes().to_owned()));
×
280
    dht_server.enable_lan_discovery(config.lan_discovery_enabled);
×
281
    dht_server.set_tcp_onion_sink(onion_tx);
×
282
    dht_server.enable_ipv6_mode(udp_addr.is_ipv6());
×
283

284
    let dht_server_c = dht_server.clone();
×
285
    let udp_onion_future = async move {
×
286
        while let Some((onion_request, addr)) = onion_rx.next().await {
×
287
            let res = dht_server_c.handle_tcp_onion_request(onion_request, addr).await;
×
288

289
            if let Err(err) = res {
×
290
                warn!("Failed to handle TCP onion request: {:?}", err);
×
291
            }
292
        }
293

294
        Ok(())
×
295
    };
296

297
    if config.bootstrap_nodes.is_empty() {
×
298
        warn!("No bootstrap nodes!");
×
299
    }
300

301
    for node in config.bootstrap_nodes.iter().flat_map(|node| node.resolve()) {
×
302
        dht_server.add_initial_bootstrap(node);
×
303
    }
304

305
    let udp_server = UdpServer::new(dht_server);
×
306

307
    info!("Running DHT server on {}", udp_addr);
×
308

309
    let udp_server_future = dht_run_socket(&udp_server, socket, rx, udp_stats).map_err(Error::from);
×
310

311
    futures::try_join!(udp_server_future, lan_discovery_future, udp_onion_future)?;
×
312

313
    Ok(())
×
314
}
315

316
fn main() {
×
317
    let config = cli_parse();
×
318

319
    match config.log_type {
×
320
        LogType::Stderr => {
321
            let env = env_logger::Env::default().filter_or("RUST_LOG", "info");
×
322
            env_logger::Builder::from_env(env).init();
×
323
        }
324
        LogType::Stdout => {
325
            let env = env_logger::Env::default().filter_or("RUST_LOG", "info");
×
326
            env_logger::Builder::from_env(env)
×
327
                .target(env_logger::fmt::Target::Stdout)
×
328
                .init();
329
        }
330
        #[cfg(unix)]
331
        LogType::Syslog => {
332
            syslog::init(Facility::LOG_USER, log::LevelFilter::Info, None)
×
333
                .expect("Failed to initialize syslog backend.");
334
        }
335
        LogType::None => {}
336
    }
337

338
    for key in config.unused.keys() {
×
339
        warn!("Unused configuration key: {:?}", key);
×
340
    }
341

342
    let (dht_pk, dht_sk) = if let Some(ref sk) = config.sk {
×
343
        (sk.public_key(), sk.clone())
×
344
    } else if let Some(ref keys_file) = config.keys_file {
×
345
        load_or_gen_keys(keys_file)
×
346
    } else {
347
        panic!("Neither secret key nor keys file is specified")
×
348
    };
349

350
    if config.tcp_addrs.is_empty() && config.udp_addr.is_none() {
×
351
        panic!("Both TCP addresses and UDP address are not defined.")
×
352
    }
353

354
    if config.sk_passed_as_arg {
×
355
        warn!(
×
356
            "You should not pass the secret key via arguments due to \
357
               security reasons. Use the environment variable instead"
358
        );
359
    }
360

361
    info!("DHT public key: {}", hex::encode(dht_pk.as_ref()).to_uppercase());
×
362

363
    let (tcp_onion, udp_onion) = create_onion_streams();
×
364

365
    let udp_tcp_stats = Stats::new();
×
366
    let tcp_tcp_stats = udp_tcp_stats.clone();
×
367

368
    let udp_config = config.clone();
×
369
    let udp_dht_sk = dht_sk.clone();
×
370
    let udp_server_future =
×
371
        async move { run_udp(&udp_config, dht_pk, &udp_dht_sk, udp_onion, udp_tcp_stats.clone()).await };
372

373
    let tcp_config = config.clone();
×
374
    let tcp_dht_sk = dht_sk;
×
375
    let tcp_server_future = async move { run_tcp(&tcp_config, tcp_dht_sk, tcp_onion, tcp_tcp_stats).await };
×
376

377
    let future = async move {
×
378
        futures::select! {
×
379
            res = udp_server_future.fuse() => res,
×
380
            res = tcp_server_future.fuse() => res,
×
381
        }
382
    };
383

384
    run(future, config.threads);
×
385
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc