• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

oguzbilgener / noxious / 6020306411

30 Aug 2023 03:46AM UTC coverage: 81.141% (-0.1%) from 81.266%
6020306411

push

github

oguzbilgener
wip: work on release-please

1308 of 1612 relevant lines covered (81.14%)

1.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.02
/core/src/proxy.rs
1
use crate::socket::{SocketListener, SocketStream};
2
use crate::{
3
    error::NotFoundError,
4
    link::Link,
5
    signal::{Closer, Stop},
6
    state::{ProxyState, SharedProxyInfo, ToxicStateHolder},
7
    stream::{Read, Write},
8
    toxic::{update_toxic_list_in_place, StreamDirection, Toxic, ToxicEvent, ToxicEventResult},
9
};
10
use async_trait::async_trait;
11
use bmrng::{Payload, RequestReceiver};
12
use futures::{stream, StreamExt};
13
#[cfg(test)]
14
use mockall::automock;
15
use serde::{Deserialize, Serialize};
16
use std::io;
17
use std::net::SocketAddr;
18
use std::sync::Arc;
19
use thiserror::Error;
20
use tokio_util::codec::{BytesCodec, FramedRead, FramedWrite};
21
use tracing::{debug, error, info, instrument};
22

23
/// The default Go io.Copy buffer size is 32K, so also use 32K buffers here to imitate Toxiproxy.
24
const READ_BUFFER_SIZE: usize = 32768;
25

26
/// The immutable configuration for a proxy
27
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
28
pub struct ProxyConfig {
29
    /// An arbitrary name
30
    #[serde(default = "default_name")]
31
    pub name: String,
32
    /// The host name and the port the proxy listens on, like 127.0.0.1:5431
33
    pub listen: String,
34
    /// The host name and the port the proxy connects to, like 127.0.0:5432
35
    pub upstream: String,
36
    /// The client can set the enabled field to false to stop this proxy.
37
    /// Proxies are enabled by default
38
    #[serde(default = "default_enabled")]
39
    pub enabled: bool,
40
    /// A random seed. Not exposed in the API
41
    #[serde(skip)]
42
    pub rand_seed: Option<u64>,
43
}
44

45
fn default_name() -> String {
1✔
46
    "".to_owned()
1✔
47
}
48

49
fn default_enabled() -> bool {
1✔
50
    true
51
}
52

53
/// A holder for upstream and downstream links, as well as the per-connection state
54
#[derive(Debug)]
55
pub struct Links {
56
    upstream: Link,
57
    client: Link,
58
    /// Optional, connection-wide state for toxics that need such state (like LimitData)
59
    /// Toxic Name -> State
60
    state_holder: Option<Arc<ToxicStateHolder>>,
61
}
62

63
/// Toxics applied on a proxy connection
64
#[derive(Debug, Clone)]
65
pub struct Toxics {
66
    /// The toxics applied on the upstream link
67
    pub upstream: Vec<Toxic>,
68
    /// The toxics applied on the downstream link
69
    pub downstream: Vec<Toxic>,
70
}
71

72
/// The serializable API response
73
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
74
pub struct ProxyWithToxics {
75
    /// The proxy details
76
    #[serde(flatten)]
77
    pub proxy: ProxyConfig,
78
    /// Toxics installed on the proxy
79
    pub toxics: Vec<Toxic>,
80
}
81

82
impl ProxyConfig {
83
    /// Validate the proxy config, return `ProxyValidateError` if invalid
84
    pub fn validate(&self) -> Result<(), ProxyValidateError> {
2✔
85
        if self.name.is_empty() {
3✔
86
            Err(ProxyValidateError::MissingName)
1✔
87
        } else if self.upstream.is_empty() {
3✔
88
            Err(ProxyValidateError::MissingUpstream)
1✔
89
        } else if self.listen.is_empty() {
4✔
90
            Err(ProxyValidateError::MissingListen)
1✔
91
        } else {
92
            Ok(())
2✔
93
        }
94
    }
95
}
96

97
impl Toxics {
98
    /// Initialize an empty set up toxics
99
    pub fn empty() -> Self {
2✔
100
        Toxics {
101
            upstream: Vec::new(),
2✔
102
            downstream: Vec::new(),
2✔
103
        }
104
    }
105

106
    /// Consume this Toxics struct to combine upstream and downstream toxics in a flat unordered vec
107
    pub fn into_vec(mut self) -> Vec<Toxic> {
1✔
108
        self.upstream.append(&mut self.downstream);
1✔
109
        self.upstream
1✔
110
    }
111

112
    /// Find a toxic by name in upstream and downstream lists
113
    pub fn find_by_name(&self, toxic_name: &str) -> Option<Toxic> {
1✔
114
        self.upstream
3✔
115
            .iter()
116
            .find(|toxic| toxic.name == toxic_name)
3✔
117
            .or_else(|| {
2✔
118
                self.downstream
2✔
119
                    .iter()
×
120
                    .find(|toxic| toxic.name == toxic_name)
1✔
121
            })
122
            .map(|toxic| toxic.to_owned())
2✔
123
    }
124
}
125

126
impl ProxyWithToxics {
127
    /// Create the full ProxyWithToxics from SharedProxyInfo
128
    pub fn from_shared_proxy_info(info: SharedProxyInfo) -> Self {
1✔
129
        let proxy_state = info.state.lock();
2✔
130
        ProxyWithToxics {
131
            proxy: info.clone_config(),
1✔
132
            toxics: proxy_state.toxics.clone().into_vec(),
2✔
133
        }
134
    }
135

136
    /// Create a new ProxyWithToxics with empty toxics
137
    pub fn from_proxy_config(proxy_config: ProxyConfig) -> Self {
1✔
138
        ProxyWithToxics {
139
            proxy: proxy_config,
140
            toxics: Vec::new(),
1✔
141
        }
142
    }
143
}
144

145
struct Streams {
146
    client_read: Read,
147
    client_write: Write,
148
    upstream_read: Read,
149
    upstream_write: Write,
150
}
151

152
/// The proxy runner interface (defined for mocking, mainly)
153
#[cfg_attr(test, automock)]
154
#[async_trait]
155
pub trait Runner {
156
    /// Initialize a proxy, bind to a TCP port but don't start accepting clients
157
    async fn initialize_proxy<Listener>(
158
        config: ProxyConfig,
159
        initial_toxics: Toxics,
160
    ) -> io::Result<(Listener, SharedProxyInfo)>
161
    where
162
        Listener: SocketListener + 'static;
163

164
    /// Run the initialized proxy, accept clients, establish links
165
    async fn run_proxy<Listener>(
166
        listener: Listener,
167
        proxy_info: SharedProxyInfo,
168
        receiver: RequestReceiver<ToxicEvent, ToxicEventResult>,
169
        mut stop: Stop,
170
        closer: Closer,
171
    ) -> io::Result<()>
172
    where
173
        Listener: SocketListener + 'static;
174
}
175

176
/// The proxy runner
177
#[derive(Debug, Copy, Clone)]
178
pub struct ProxyRunner;
179

180
#[async_trait]
181
impl Runner for ProxyRunner {
182
    /// Initialize a proxy, bind to a TCP port but don't start accepting clients
183
    #[instrument(level = "debug")]
184
    async fn initialize_proxy<Listener>(
×
185
        config: ProxyConfig,
186
        initial_toxics: Toxics,
187
    ) -> io::Result<(Listener, SharedProxyInfo)>
188
    where
189
        Listener: SocketListener + 'static,
190
    {
191
        let listener = Listener::bind(&config.listen).await?;
4✔
192

193
        info!(name = ?config.name, proxy = ?config.listen, upstream = ?config.upstream, "Initialized proxy");
8✔
194

195
        let state = Arc::new(ProxyState::new(initial_toxics));
4✔
196

197
        let proxy_info = SharedProxyInfo {
198
            state,
199
            config: Arc::new(config),
2✔
200
        };
201

202
        Ok((listener, proxy_info))
2✔
203
    }
204

205
    /// Run the initialized proxy, accept clients, establish links
206
    #[instrument(level = "debug", skip(listener, receiver, stop, closer))]
207
    async fn run_proxy<Listener>(
×
208
        listener: Listener,
209
        proxy_info: SharedProxyInfo,
210
        receiver: RequestReceiver<ToxicEvent, ToxicEventResult>,
211
        mut stop: Stop,
212
        closer: Closer,
213
    ) -> io::Result<()>
214
    where
215
        Listener: SocketListener + 'static,
216
    {
217
        let state = proxy_info.state;
2✔
218
        let config = proxy_info.config;
2✔
219

220
        tokio::spawn(listen_toxic_events(
6✔
221
            state.clone(),
4✔
222
            receiver,
2✔
223
            stop.clone(),
4✔
224
            config.clone(),
2✔
225
        ));
226

227
        while !stop.stop_received() {
5✔
228
            let maybe_connection = tokio::select! {
14✔
229
                res = listener.accept() => {
5✔
230
                    Ok::<Option<(Listener::Stream, SocketAddr)>, io::Error>(Some(res?))
4✔
231
                },
232
                _ = stop.recv() => {
2✔
233
                    Ok(None)
×
234
                },
235
            }?;
236

237
            if let Some((client_stream, addr)) = maybe_connection {
2✔
238
                debug!(proxy = ?&config, addr = ?&addr, "Accepted client {}", addr);
4✔
239
                let upstream = match Listener::Stream::connect(&config.upstream).await {
4✔
240
                    Ok(upstream) => upstream,
1✔
241
                    Err(err) => {
×
242
                        error!(err = ?err, proxy = ?&config.name, upstream = ?&config.upstream, listen = ?&config.listen, "Unable to open connection to upstream");
×
243
                        // This is not a fatal error, can retry next time another client connects
244
                        continue;
×
245
                    }
246
                };
247

248
                let (client_read, client_write) = client_stream.into_split();
2✔
249
                let (upstream_read, upstream_write) = upstream.into_split();
2✔
250

251
                let client_read =
1✔
252
                    FramedRead::with_capacity(client_read, BytesCodec::new(), READ_BUFFER_SIZE);
×
253
                let client_write = FramedWrite::new(client_write, BytesCodec::new());
1✔
254
                let upstream_read =
1✔
255
                    FramedRead::with_capacity(upstream_read, BytesCodec::new(), READ_BUFFER_SIZE);
×
256
                let upstream_write = FramedWrite::new(upstream_write, BytesCodec::new());
1✔
257

258
                let toxics = state.lock().toxics.clone();
2✔
259

260
                let streams = Streams {
261
                    client_read,
262
                    client_write,
263
                    upstream_read,
264
                    upstream_write,
265
                };
266

267
                let res = create_links(
268
                    state.clone(),
2✔
269
                    addr,
1✔
270
                    &config,
1✔
271
                    &mut stop,
1✔
272
                    toxics,
1✔
273
                    streams,
1✔
274
                    None,
1✔
275
                );
276
                if let Err(err) = res {
1✔
277
                    error!(err = ?err, proxy = ?&config.name, listen = ?&config.listen, "Unable to establish link for proxy");
×
278
                    continue;
×
279
                }
280
            } else {
281
                break;
×
282
            }
283
        }
284
        drop(listener);
×
285
        let _ = closer.close();
×
286
        debug!(proxy = ?&config.name, listen = ?&config.listen, "Shutting down proxy");
×
287
        Ok(())
×
288
    }
289
}
290

291
#[instrument(level = "debug", skip(state, streams, stop))]
5✔
292
fn create_links(
293
    state: Arc<ProxyState>,
294
    addr: SocketAddr,
295
    config: &ProxyConfig,
296
    stop: &mut Stop,
297
    toxics: Toxics,
298
    streams: Streams,
299
    previous_toxic_state_holder: Option<Arc<ToxicStateHolder>>,
300
) -> io::Result<()> {
301
    let mut current_state = state.lock();
2✔
302

303
    if current_state.clients.contains_key(&addr) {
2✔
304
        return Err(io::Error::new(
×
305
            io::ErrorKind::AlreadyExists,
×
306
            format!(
×
307
                "State error: there is already a client connected with this address: {}",
308
                addr
309
            ),
310
        ));
311
    }
312

313
    let (links_stop, links_stopper) = stop.fork();
2✔
314

315
    let toxics_state_holder =
4✔
316
        previous_toxic_state_holder.or_else(|| ToxicStateHolder::for_toxics(&toxics));
317

318
    let mut upstream_link = Link::new(
319
        addr,
1✔
320
        StreamDirection::Upstream,
1✔
321
        config.clone(),
2✔
322
        links_stop.clone(),
1✔
323
    );
324
    let mut client_link = Link::new(
325
        addr,
1✔
326
        StreamDirection::Downstream,
1✔
327
        config.clone(),
1✔
328
        links_stop,
1✔
329
    );
330

331
    let upstream_handle = upstream_link.establish(
1✔
332
        streams.client_read,
1✔
333
        streams.upstream_write,
1✔
334
        toxics.upstream,
1✔
335
        toxics_state_holder.clone(),
1✔
336
    );
337
    let downstream_handle = client_link.establish(
1✔
338
        streams.upstream_read,
1✔
339
        streams.client_write,
1✔
340
        toxics.downstream,
1✔
341
        toxics_state_holder.clone(),
1✔
342
    );
343

344
    let state = state.clone();
2✔
345
    tokio::spawn(async move {
4✔
346
        // No need to listen for the stop signal here, we're ending as soon as one of the tasks have stopped.
347
        let _ = tokio::select! {
8✔
348
            up = upstream_handle => {
2✔
349
                debug!("Upstream joined first");
4✔
350
                up
1✔
351
            },
352
            down = downstream_handle => {
1✔
353
                debug!("Downstream joined first");
×
354
                down
×
355
            }
356
        };
357
        links_stopper.stop();
1✔
358
        let mut state = state.lock();
1✔
359
        state.clients.remove(&addr);
2✔
360
        debug!("Removed client {}", addr);
4✔
361
    });
362

363
    current_state.clients.insert(
2✔
364
        addr,
1✔
365
        Links {
1✔
366
            upstream: upstream_link,
1✔
367
            client: client_link,
1✔
368
            state_holder: toxics_state_holder,
1✔
369
        },
370
    );
371
    Ok(())
1✔
372
}
373

374
#[doc(hidden)]
375
pub async fn listen_toxic_events(
2✔
376
    state: Arc<ProxyState>,
377
    mut receiver: RequestReceiver<ToxicEvent, ToxicEventResult>,
378
    mut stop: Stop,
379
    config: Arc<ProxyConfig>,
380
) {
381
    while !stop.stop_received() {
7✔
382
        let maybe_payload: Option<Payload<ToxicEvent, ToxicEventResult>> = tokio::select! {
15✔
383
            res = receiver.recv() => {
3✔
384
                if let Ok(payload) = res {
3✔
385
                    Some(payload)
1✔
386
                } else {
387
                    None
×
388
                }
389
            },
390
            _ = stop.recv() => None,
3✔
391
        };
392
        if let Some(payload) = maybe_payload {
2✔
393
            process_toxic_event(state.clone(), config.clone(), stop.clone(), payload).await;
3✔
394
        } else {
395
            break;
396
        }
397
    }
398
}
399

400
async fn process_toxic_event(
1✔
401
    state: Arc<ProxyState>,
402
    config: Arc<ProxyConfig>,
403
    stop: Stop,
404
    (request, mut responder): Payload<ToxicEvent, ToxicEventResult>,
405
) {
406
    let new_toxics = {
407
        let mut current_state = state.lock();
2✔
408
        if let Err(err) = update_toxics(request, &mut current_state.toxics) {
1✔
409
            let _ = responder.respond(Err(err.into()));
×
410
            return;
411
        }
412
        current_state.toxics.clone()
2✔
413
    };
414

415
    let old_map = {
416
        let mut current_state = state.lock();
2✔
417
        std::mem::take(&mut current_state.clients)
2✔
418
    };
419

420
    let mut clients = stream::iter(old_map);
1✔
421
    while let Some((addr, links)) = clients.next().await {
4✔
422
        if let Err(err) = recreate_links(
423
            state.clone(),
×
424
            &config,
×
425
            stop.clone(),
×
426
            addr,
×
427
            links,
×
428
            new_toxics.clone(),
×
429
        )
430
        .await
×
431
        {
432
            error!(err = ?err, addr = ?addr, proxy = ?&config.name, "Failed to recreate links for client");
×
433
        }
434
    }
435
    let _ = responder.respond(Ok(()));
1✔
436
}
437

438
async fn recreate_links(
×
439
    state: Arc<ProxyState>,
440
    config: &ProxyConfig,
441
    stop: Stop,
442
    addr: SocketAddr,
443
    links: Links,
444
    new_toxics: Toxics,
445
) -> io::Result<()> {
446
    let (client_read, upstream_write) = links.client.disband().await?;
×
447
    let (upstream_read, client_write) = links.upstream.disband().await?;
×
448
    let streams = Streams {
449
        client_read,
450
        client_write,
451
        upstream_read,
452
        upstream_write,
453
    };
454
    create_links(
455
        state.clone(),
×
456
        addr,
×
457
        config,
×
458
        &mut stop.clone(),
×
459
        new_toxics,
×
460
        streams,
×
461
        links.state_holder,
×
462
    )
463
}
464

465
/// Update the toxics collection in place
466
fn update_toxics(event: ToxicEvent, toxics: &mut Toxics) -> Result<(), NotFoundError> {
1✔
467
    update_toxic_list_in_place(&mut toxics.upstream, event.kind, StreamDirection::Upstream)
3✔
468
        .or_else(|kind| {
2✔
469
            update_toxic_list_in_place(&mut toxics.downstream, kind, StreamDirection::Downstream)
1✔
470
        })
471
        .or(Err(NotFoundError))
1✔
472
}
473

474
/// Errors return when ProxyConfig validation fails
475
#[derive(Debug, Clone, Copy, Error, PartialEq)]
476
pub enum ProxyValidateError {
477
    /// The name field is empty
478
    #[error("name missing")]
479
    MissingName,
480
    /// The upstream field is empty
481
    #[error("upstream missing")]
482
    MissingUpstream,
483
    /// The listen field is empty
484
    #[error("listen address missing")]
485
    MissingListen,
486
}
487

488
#[cfg(test)]
489
mod serde_tests {
490
    use super::*;
491
    use serde_json::{from_str, to_string};
492

493
    #[test]
494
    fn test_ser_and_de() {
495
        let config = ProxyConfig {
496
            name: "foo".to_owned(),
497
            listen: "127.0.0.1:5431".to_owned(),
498
            upstream: "127.0.0.1:5432".to_owned(),
499
            enabled: false,
500
            rand_seed: Some(3),
501
        };
502
        let serialized = to_string(&config).unwrap();
503
        let expected = "{\"name\":\"foo\",\"listen\":\"127.0.0.1:5431\",\"upstream\":\"127.0.0.1:5432\",\"enabled\":false}";
504
        assert_eq!(expected, serialized);
505

506
        let expected = ProxyConfig {
507
            name: "foo".to_owned(),
508
            listen: "127.0.0.1:5431".to_owned(),
509
            upstream: "127.0.0.1:5432".to_owned(),
510
            enabled: false,
511
            rand_seed: None,
512
        };
513

514
        let deserialized = from_str(&serialized).unwrap();
515
        assert_eq!(expected, deserialized);
516
    }
517

518
    #[test]
519
    fn test_optional_enabled() {
520
        let expected = ProxyConfig {
521
            name: "foo".to_owned(),
522
            listen: "127.0.0.1:5431".to_owned(),
523
            upstream: "127.0.0.1:5432".to_owned(),
524
            enabled: true,
525
            rand_seed: None,
526
        };
527
        let input =
528
            "{\"name\":\"foo\",\"listen\":\"127.0.0.1:5431\",\"upstream\":\"127.0.0.1:5432\"}";
529
        let deserialized = from_str(input).unwrap();
530
        assert_eq!(expected, deserialized);
531
    }
532
}
533

534
#[cfg(test)]
535
mod config_tests {
536
    use super::*;
537

538
    #[test]
539
    fn validates_name() {
540
        let config = ProxyConfig {
541
            name: "".to_owned(),
542
            listen: "".to_owned(),
543
            upstream: "".to_owned(),
544
            enabled: true,
545
            rand_seed: None,
546
        };
547
        assert_eq!(config.validate(), Err(ProxyValidateError::MissingName))
548
    }
549

550
    #[test]
551
    fn validates_listen() {
552
        let config = ProxyConfig {
553
            name: "name".to_owned(),
554
            listen: "".to_owned(),
555
            upstream: "bogus_addr".to_owned(),
556
            enabled: true,
557
            rand_seed: None,
558
        };
559
        assert_eq!(config.validate(), Err(ProxyValidateError::MissingListen))
560
    }
561

562
    #[test]
563
    fn validates_upstream() {
564
        let config = ProxyConfig {
565
            name: "name".to_owned(),
566
            listen: "bogus_addr".to_owned(),
567
            upstream: "".to_owned(),
568
            enabled: true,
569
            rand_seed: None,
570
        };
571
        assert_eq!(config.validate(), Err(ProxyValidateError::MissingUpstream))
572
    }
573

574
    #[test]
575
    fn allows_invalid_addresses() {
576
        let config = ProxyConfig {
577
            name: "name".to_owned(),
578
            listen: "bogus_addr".to_owned(),
579
            upstream: "bogus_upstream".to_owned(),
580
            enabled: true,
581
            rand_seed: None,
582
        };
583
        assert_eq!(config.validate(), Ok(()))
584
    }
585
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc