• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Covertness / coap-rs / 16940033320

13 Aug 2025 02:19PM UTC coverage: 81.514% (-0.8%) from 82.27%
16940033320

Pull #119

github

web-flow
Merge f07f8aec3 into 3850dc22b
Pull Request #119: Add client-side support for block-wise observe

68 of 80 new or added lines in 1 file covered. (85.0%)

6 existing lines in 3 files now uncovered.

732 of 898 relevant lines covered (81.51%)

3.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.5
/src/client.rs
1
#[cfg(feature = "dtls")]
2
use crate::dtls::{DtlsConnection, UdpDtlsConfig};
3
use crate::request::RequestBuilder;
4
use coap_lite::{
5
    block_handler::{extending_splice, BlockValue},
6
    error::HandlingError,
7
    CoapOption, CoapRequest, CoapResponse, MessageClass, MessageType, ObserveOption,
8
    Packet as Message, RequestType as Method, ResponseType as Status,
9
};
10
use core::mem;
11

12
use futures::Future;
13
use log::*;
14

15
use regex::Regex;
16
use std::{
17
    collections::BTreeMap,
18
    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
19
    sync::{atomic::AtomicU16, Weak},
20
};
21
use std::{
22
    io::{Error, ErrorKind, Result as IoResult},
23
    pin::Pin,
24
};
25
use std::{sync::Arc, time::Duration};
26
use tokio::sync::{
27
    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
28
    oneshot, Mutex,
29
};
30
use tokio::time::timeout;
31
use tokio::{
32
    net::{lookup_host, ToSocketAddrs, UdpSocket},
33
    sync::RwLock,
34
};
35
use url::Url;
36
const DEFAULT_RECEIVE_TIMEOUT_SECONDS: u64 = 2; // 2s
37

38
#[derive(Debug, Clone)]
39
pub struct Packet {
40
    pub address: Option<SocketAddr>,
41
    pub message: Message,
42
}
43

44
#[derive(Debug)]
45
pub enum ObserveMessage {
46
    Terminate,
47
}
48
use async_trait::async_trait;
49

50
#[async_trait]
51
/// A basic interface for a transport on the client
52
/// representing a one-to-one connection between a client and server
53
/// timeouts and retries do not need to be implemented by the transport
54
/// if confirmable messages are sent
55
pub trait ClientTransport: Send + Sync {
56
    async fn recv(&self, buf: &mut [u8]) -> std::io::Result<(usize, Option<SocketAddr>)>;
57
    async fn send(&self, buf: &[u8]) -> std::io::Result<usize>;
58
}
59

60
trait TransportExt {
61
    async fn receive_packet(&self) -> IoResult<Option<Packet>>;
62
}
63

64
impl<T: ClientTransport> TransportExt for T {
65
    async fn receive_packet(&self) -> IoResult<Option<Packet>> {
20✔
66
        let mut buf = [0; 1500];
6✔
67
        let (nread, address) = self.recv(&mut buf).await?;
18✔
68
        return match Message::from_bytes(&buf[..nread]).ok() {
5✔
69
            Some(message) => Ok(Some(Packet { address, message })),
3✔
70
            None => Ok(None),
×
71
        };
72
    }
73
}
74

75
/// we only use the token as the identifier, and an empty token to represent empty requests
76
type Token = Vec<u8>;
77
type PacketRegistry = BTreeMap<Token, UnboundedSender<IoResult<Packet>>>;
78

79
#[derive(Clone)]
80
pub struct TransportSynchronizer {
81
    pub(crate) outgoing: Arc<Mutex<PacketRegistry>>,
82
    fail_error: Arc<RwLock<Option<std::io::Error>>>,
83
}
84

85
impl TransportSynchronizer {
86
    pub fn new() -> Self {
4✔
87
        Self {
88
            outgoing: Arc::new(Mutex::new(PacketRegistry::new())),
4✔
89
            fail_error: Arc::new(RwLock::new(None)),
8✔
90
        }
91
    }
92

93
    async fn check_for_error(&self, sender: &UnboundedSender<IoResult<Packet>>) -> Option<()> {
8✔
94
        self.fail_error.read().await.as_ref();
4✔
95
        if let Some(err) = self.fail_error.read().await.as_ref() {
3✔
96
            let _ = sender.send(Err(Self::clone_err(err)));
2✔
97
            return None;
1✔
98
        }
99
        Some(())
1✔
100
    }
101

102
    fn clone_err(error: &std::io::Error) -> std::io::Error {
1✔
103
        let s = error.to_string();
1✔
104
        let k = error.kind();
2✔
105
        std::io::Error::new(k, s)
1✔
106
    }
107

108
    pub async fn fail(self, error: std::io::Error) {
4✔
109
        let error_clone = Self::clone_err(&error);
2✔
110
        let _ = self.fail_error.write().await.insert(error_clone);
2✔
111
        let mut mutex = self.outgoing.lock().await;
1✔
112

113
        let keys: Vec<Vec<u8>> = mutex.keys().cloned().collect();
2✔
114
        for k in keys {
2✔
115
            let error_clone = Self::clone_err(&error);
2✔
116
            let _ = mutex.remove(&k).map(|resp| resp.send(Err(error_clone)));
4✔
117
        }
118
    }
119

120
    pub async fn get_sender(&self, key: &[u8]) -> Option<UnboundedSender<IoResult<Packet>>> {
8✔
121
        self.outgoing
12✔
122
            .lock()
123
            .await
3✔
124
            .get(key)
1✔
125
            .map(UnboundedSender::clone)
3✔
126
    }
127
    /// Sets the sender of a given key,
128
    /// returns the previous key if it was set
129
    pub async fn set_sender(
1✔
130
        &self,
131
        key: Vec<u8>,
132
        sender: UnboundedSender<IoResult<Packet>>,
133
    ) -> Option<UnboundedSender<IoResult<Packet>>> {
134
        self.check_for_error(&sender).await?;
5✔
135
        self.outgoing.lock().await.insert(key, sender)
4✔
136
    }
137
    pub async fn remove_sender(&self, key: &[u8]) -> Option<UnboundedSender<IoResult<Packet>>> {
8✔
138
        self.outgoing.lock().await.remove(key)
7✔
139
    }
140
}
141

142
async fn receive_loop<T: ClientTransport + 'static>(
7✔
143
    transport: Weak<T>,
144
    transport_sync: TransportSynchronizer,
145
) -> std::io::Result<()> {
146
    let err = loop {
5✔
147
        let Some(transport_instance) = transport.upgrade() else {
8✔
148
            // nobody else is listening so we can drop our reference
149
            return Ok(());
1✔
150
        };
151
        // we do a timeout here to ensure that we do not block forever
152
        let Ok(recv_res) = timeout(
153
            Duration::from_millis(300),
6✔
154
            transport_instance.receive_packet(),
4✔
155
        )
156
        .await
27✔
157
        else {
×
158
            continue;
×
159
        };
160
        let option_packet = match recv_res {
4✔
161
            Err(e) => break e,
1✔
162
            Ok(o) => o,
5✔
163
        };
164
        let Some(packet) = option_packet else {
3✔
165
            trace!("unexpected malformed packet received");
×
166
            continue;
×
167
        };
168
        if let Some(ack) = parse_for_ack(&packet) {
10✔
169
            transport_instance.send(&ack).await?;
3✔
170
        }
171

172
        match packet.message.header.code {
3✔
173
            MessageClass::Response(_) => {}
×
174
            m => {
×
175
                debug!("unknown message type {}", m);
×
176
                continue;
×
177
            }
178
        };
179

180
        let token = packet.message.get_token();
8✔
181
        let Some(sender) = transport_sync.get_sender(token).await else {
11✔
182
            info!("received unexpected response for token {:?}", &token);
×
183
            continue;
×
184
        };
185
        let Ok(_) = sender.send(Ok(packet)) else {
8✔
186
            debug!("unexpected drop of sender");
×
187
            continue;
×
188
        };
189
    };
190

191
    let e = Err(Error::new(err.kind(), err.to_string()));
2✔
192
    transport_sync.fail(err).await;
2✔
193
    return e;
1✔
194
}
195

196
pub fn parse_for_ack(packet: &Packet) -> Option<Vec<u8>> {
1✔
197
    match (packet.message.header.get_type(), packet.message.header.code) {
3✔
198
        (MessageType::Confirmable, MessageClass::Response(_)) => Some(make_ack(packet)),
1✔
199
        _ => None,
1✔
200
    }
201
}
202

203
pub fn make_ack(packet: &Packet) -> Vec<u8> {
1✔
204
    let mut ack = Message::new();
1✔
205
    ack.header.set_type(MessageType::Acknowledgement);
1✔
206
    ack.header.message_id = packet.message.header.message_id;
1✔
207
    ack.header.code = MessageClass::Empty;
1✔
208
    return ack.to_bytes().unwrap();
1✔
209
}
210

211
/// a wrapper for transports responsible for retries and timeouts
212
struct CoapClientTransport<T: ClientTransport> {
213
    pub(crate) transport: Arc<T>,
214
    pub(crate) synchronizer: TransportSynchronizer,
215
    pub(crate) retries: usize,
216
    pub(crate) timeout: Duration,
217
}
218

219
impl<T: ClientTransport> Clone for CoapClientTransport<T> {
220
    fn clone(&self) -> Self {
1✔
221
        Self {
222
            transport: self.transport.clone(),
1✔
223
            synchronizer: self.synchronizer.clone(),
2✔
224
            retries: self.retries.clone(),
1✔
225
            timeout: self.timeout.clone(),
1✔
226
        }
227
    }
228
}
229

230
impl<T: ClientTransport> CoapClientTransport<T> {
231
    pub const DEFAULT_NUM_RETRIES: usize = 5;
232
    async fn establish_receiver_for(&self, packet: &Packet) -> UnboundedReceiver<IoResult<Packet>> {
18✔
233
        let (tx, rx) = unbounded_channel();
9✔
234
        let token = packet.message.get_token().to_owned();
9✔
235
        self.synchronizer.set_sender(token, tx).await;
4✔
236
        return rx;
6✔
237
    }
238

239
    /// tries to send a confirmable message with retries and timeouts
240
    async fn try_send_confirmable_message(
5✔
241
        &self,
242
        msg: &Packet,
243
        receiver: &mut UnboundedReceiver<IoResult<Packet>>,
244
    ) -> IoResult<Packet> {
245
        let mut res = Err(Error::new(ErrorKind::InvalidData, "not enough retries"));
10✔
246
        for _ in 0..self.retries {
22✔
247
            res = self.try_send_non_confirmable_message(&msg, receiver).await;
26✔
248
            if res.is_ok() {
6✔
249
                return res;
3✔
250
            }
251
        }
252
        return res;
3✔
253
    }
254

255
    fn encode_message(message: &Message) -> IoResult<Vec<u8>> {
6✔
256
        message
×
257
            .to_bytes()
258
            .map_err(|e| std::io::Error::new(ErrorKind::InvalidData, e.to_string()))
4✔
259
    }
260

261
    async fn try_send_non_confirmable_message(
7✔
262
        &self,
263
        msg: &Packet,
264
        receiver: &mut UnboundedReceiver<IoResult<Packet>>,
265
    ) -> IoResult<Packet> {
266
        let bytes = Self::encode_message(&msg.message)?;
10✔
267
        self.transport.send(&bytes).await?;
14✔
268
        let try_receive: Result<Option<Result<Packet, Error>>, tokio::time::error::Elapsed> =
13✔
269
            timeout(self.timeout, receiver.recv()).await;
×
270
        if let Ok(Some(res)) = try_receive {
10✔
271
            return res;
4✔
272
        }
273
        Err(Error::new(ErrorKind::TimedOut, "send timeout"))
4✔
274
    }
275

276
    async fn do_request_response_for_packet_inner(
7✔
277
        &self,
278
        packet: &Packet,
279
        receiver: &mut UnboundedReceiver<IoResult<Packet>>,
280
    ) -> IoResult<Packet> {
281
        if packet.message.header.get_type() == MessageType::Confirmable {
10✔
282
            return self.try_send_confirmable_message(&packet, receiver).await;
14✔
283
        } else {
284
            return self
2✔
285
                .try_send_non_confirmable_message(&packet, receiver)
1✔
286
                .await;
3✔
287
        }
288
    }
289

290
    pub async fn do_request_response_for_packet(&self, packet: &Packet) -> IoResult<Packet> {
23✔
291
        let mut receiver = self.establish_receiver_for(packet).await;
9✔
292
        let result = self
15✔
293
            .do_request_response_for_packet_inner(packet, &mut receiver)
6✔
294
            .await;
19✔
295
        self.synchronizer
16✔
296
            .remove_sender(packet.message.get_token())
10✔
297
            .await;
14✔
298
        result
4✔
299
    }
300

301
    pub fn from_transport(transport: Arc<T>, synchronizer: TransportSynchronizer) -> Self {
7✔
302
        return Self {
7✔
303
            transport,
7✔
304
            synchronizer,
7✔
305
            retries: Self::DEFAULT_NUM_RETRIES,
×
306
            timeout: Duration::from_secs(DEFAULT_RECEIVE_TIMEOUT_SECONDS),
7✔
307
        };
308
    }
309
}
310

311
pub struct UdpTransport {
312
    pub socket: UdpSocket,
313
    pub peer_addr: SocketAddr,
314
}
315
#[async_trait]
316
impl ClientTransport for UdpTransport {
317
    async fn recv(&self, buf: &mut [u8]) -> std::io::Result<(usize, Option<SocketAddr>)> {
12✔
318
        let (read, addr) = self.socket.recv_from(buf).await?;
5✔
319
        return Ok((read, Some(addr)));
3✔
320
    }
321
    async fn send(&self, buf: &[u8]) -> std::io::Result<usize> {
11✔
322
        self.socket.send_to(buf, self.peer_addr).await
7✔
323
    }
324
}
325

326
/// A CoAP client over UDP. This client can send multicast and broadcasts
327
pub type UdpCoAPClient = CoAPClient<UdpTransport>;
328

329
pub struct CoAPClient<T: ClientTransport> {
330
    transport: CoapClientTransport<T>,
331
    block1_size: usize,
332
    message_id: Arc<AtomicU16>,
333
}
334

335
impl<T: ClientTransport> Clone for CoAPClient<T> {
336
    fn clone(&self) -> Self {
1✔
337
        Self {
338
            transport: self.transport.clone(),
1✔
339
            block1_size: self.block1_size.clone(),
1✔
340
            message_id: self.message_id.clone(),
1✔
341
        }
342
    }
343
}
344

345
/// a receiver used whenever you have a use case involving multiple responses to a single request
346
pub struct MessageReceiver {
347
    synchronizer: TransportSynchronizer,
348
    receiver: UnboundedReceiver<IoResult<Packet>>,
349
    token: Vec<u8>,
350
}
351

352
impl MessageReceiver {
353
    pub async fn receive(&mut self) -> IoResult<Packet> {
4✔
354
        match self.receiver.recv().await {
3✔
355
            Some(Ok(packet)) => Ok(packet),
1✔
356
            Some(Err(e)) => Err(e),
×
357
            None => Err(Error::new(
×
358
                ErrorKind::Other,
359
                "sender dropped by synchronizer",
360
            )),
361
        }
362
    }
363
    pub fn new(
1✔
364
        synchronizer: TransportSynchronizer,
365
        receiver: UnboundedReceiver<IoResult<Packet>>,
366
        token: &[u8],
367
    ) -> Self {
368
        Self {
369
            synchronizer,
370
            receiver,
371
            token: token.to_vec(),
1✔
372
        }
373
    }
374
}
375

376
impl Drop for MessageReceiver {
377
    fn drop(&mut self) {
1✔
378
        let sync = self.synchronizer.clone();
1✔
379
        let tok = std::mem::take(&mut self.token);
1✔
380
        tokio::spawn(async move { sync.remove_sender(&tok).await });
1✔
381
    }
382
}
383

384
impl UdpCoAPClient {
385
    pub async fn new_with_specific_source<A: ToSocketAddrs, B: ToSocketAddrs>(
4✔
386
        bind_addr: A,
387
        peer_addr: B,
388
    ) -> IoResult<Self> {
389
        let socket = UdpSocket::bind(bind_addr).await?;
8✔
390
        Self::new_with_tokio_socket(socket, peer_addr).await
4✔
391
    }
392

393
    pub async fn new_udp<A: ToSocketAddrs>(addr: A) -> IoResult<Self> {
14✔
394
        let sock_addr = lookup_host(addr).await?.next().ok_or(Error::new(
16✔
UNCOV
395
            ErrorKind::InvalidInput,
×
396
            "could not get socket address",
×
397
        ))?;
398
        Ok(match &sock_addr {
12✔
399
            SocketAddr::V4(_) => Self::new_with_specific_source("0.0.0.0:0", sock_addr).await?,
6✔
400
            SocketAddr::V6(_) => Self::new_with_specific_source(":::0", sock_addr).await?,
×
401
        })
402
    }
403

404
    /// Create a client with a `std::net` socket
405
    ///
406
    /// Using a standard socket is useful to get advanced features from socket2 crate
407
    ///
408
    /// # Examples
409
    ///
410
    /// ```
411
    /// # tokio_test::block_on(async {
412
    ///   use socket2::{Socket, Domain, Type};
413
    ///   use coap::UdpCoAPClient;
414
    ///
415
    ///   let socket = Socket::new(Domain::IPV6, Type::DGRAM, None).expect("Standard socket creation failed");
416
    ///   socket.set_multicast_hops_v6(16).expect("Setting multicast hops failed");
417
    ///   let client = UdpCoAPClient::new_with_std_socket(socket.into(), "[::1]:5683").await.expect("Client creation failed");
418
    /// # })
419
    /// ```
420
    pub async fn new_with_std_socket<A: ToSocketAddrs>(
421
        socket: std::net::UdpSocket,
422
        peer_addr: A,
423
    ) -> IoResult<Self> {
424
        socket.set_nonblocking(true)?;
×
425
        let socket = UdpSocket::from_std(socket)?;
×
426
        Self::new_with_tokio_socket(socket, peer_addr).await
×
427
    }
428

429
    async fn new_with_tokio_socket<A: ToSocketAddrs>(
4✔
430
        socket: UdpSocket,
431
        peer_addr: A,
432
    ) -> IoResult<Self> {
433
        let peer_addr = lookup_host(peer_addr).await?.next().ok_or(Error::new(
12✔
UNCOV
434
            ErrorKind::InvalidInput,
×
435
            "could not get socket address",
×
436
        ))?;
437

438
        let transport = UdpTransport { socket, peer_addr };
4✔
439
        Ok(UdpCoAPClient::from_transport(transport))
4✔
440
    }
441

442
    /// Send a request to all CoAP devices.
443
    /// - IPv4 AllCoAP multicast address is '224.0.1.187'
444
    /// - IPv6 AllCoAP multicast addresses are 'ff0?::fd'
445
    /// Parameter segment is used with IPv6 to determine the first octet.
446
    /// Its value can be between 0x0 and 0xf. To address multiple segments,
447
    /// you have to call send_all_coap for each of the segments.
448
    pub async fn send_all_coap(
1✔
449
        &self,
450
        request: &mut CoapRequest<SocketAddr>,
451
        segment: u8,
452
    ) -> IoResult<()> {
453
        assert!(segment <= 0xf);
1✔
454
        let addr = match self.transport.transport.peer_addr {
2✔
455
            SocketAddr::V4(val) => {
1✔
456
                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(224, 0, 1, 187)), val.port())
1✔
457
            }
458
            SocketAddr::V6(val) => SocketAddr::new(
459
                IpAddr::V6(Ipv6Addr::new(
×
460
                    0xff00 + segment as u16,
×
461
                    0,
462
                    0,
463
                    0,
464
                    0,
465
                    0,
466
                    0,
467
                    0xfd,
468
                )),
469
                val.port(),
×
470
            ),
471
        };
472

473
        self.send_multicast(request, &addr).await
3✔
474
    }
475

476
    /// Send a multicast request to multiple devices.
477
    pub async fn send_multicast(
1✔
478
        &self,
479
        request: &mut CoapRequest<SocketAddr>,
480
        addr: &SocketAddr,
481
    ) -> IoResult<()> {
482
        if 0 == request.message.header.message_id {
2✔
483
            request.message.header.message_id = self.gen_message_id();
2✔
484
        }
485
        match request.message.to_bytes() {
2✔
486
            Ok(bytes) => {
1✔
487
                let size = self
4✔
488
                    .transport
489
                    .transport
490
                    .socket
491
                    .send_to(&bytes[..], addr)
1✔
492
                    .await?;
4✔
493
                if size == bytes.len() {
2✔
494
                    Ok(())
1✔
495
                } else {
496
                    Err(Error::new(ErrorKind::Other, "send length error"))
×
497
                }
498
            }
499
            Err(_) => Err(Error::new(ErrorKind::InvalidInput, "packet error")),
×
500
        }
501
    }
502

503
    pub fn set_broadcast(&self, value: bool) -> IoResult<()> {
1✔
504
        self.transport.transport.socket.set_broadcast(value)
1✔
505
    }
506

507
    /// creates a receiver based on a specific request
508
    /// this method can be used if you send a multicast request and
509
    /// expect multiple responses.
510
    /// only use this method if you know what you are doing
511
    /// ```
512
    ///
513
    /// use coap_lite::{
514
    ///     RequestType
515
    /// };
516
    /// use coap::request::RequestBuilder;
517
    /// use coap::client::UdpCoAPClient;
518
    ///
519
    /// async fn foo() {
520
    ///   let segment = 0x0;
521
    ///   let client = UdpCoAPClient::new_udp("127.0.0.1:5683")
522
    ///          .await
523
    ///          .unwrap();
524
    ///   let mut request = RequestBuilder::new("test-echo", RequestType::Get)
525
    ///       .data(Some(vec![0x51, 0x55, 0x77, 0xE8]))
526
    ///       .confirmable(true)
527
    ///       .build();
528
    ///
529
    ///   let mut receiver = client.create_receiver_for(&request).await;
530
    ///   client.send_all_coap(&mut request, segment).await.unwrap();
531
    ///   loop {
532
    ///      let recv_packet = receiver.receive().await.unwrap();
533
    ///      assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
534
    ///   }
535
    /// }
536
    /// ```
537

538
    pub async fn create_receiver_for(&self, request: &CoapRequest<SocketAddr>) -> MessageReceiver {
4✔
539
        let (tx, rx) = unbounded_channel();
2✔
540
        let key = request.message.get_token().to_vec();
2✔
541
        self.transport.synchronizer.set_sender(key, tx).await;
1✔
542
        return MessageReceiver::new(
1✔
543
            self.transport.synchronizer.clone(),
2✔
544
            rx,
1✔
545
            request.message.get_token(),
1✔
546
        );
547
    }
548
}
549

550
#[cfg(feature = "dtls")]
impl CoAPClient<DtlsConnection> {
    /// Opens a DTLS connection from `config` and wraps it in a client.
    pub async fn from_udp_dtls_config(config: UdpDtlsConfig) -> IoResult<Self> {
        let connection = DtlsConnection::try_new(config).await?;
        Ok(CoAPClient::from_transport(connection))
    }
}
558

559
impl<T: ClientTransport + 'static> CoAPClient<T> {
560
    const MAX_PAYLOAD_BLOCK: usize = 1024;
561
    /// Create a CoAP client with a chosen transport type
562

563
    pub fn from_transport(transport: T) -> Self {
7✔
564
        let synchronizer = TransportSynchronizer::new();
14✔
565
        let transport_arc = Arc::new(transport);
14✔
566
        let message_id: u16 = rand::random();
14✔
567
        // spawn receive loop to handle responses
568
        tokio::spawn(receive_loop(
14✔
569
            Arc::downgrade(&transport_arc),
7✔
570
            synchronizer.clone(),
7✔
571
        ));
572
        CoAPClient {
573
            transport: CoapClientTransport::from_transport(transport_arc.clone(), synchronizer),
7✔
574
            block1_size: Self::MAX_PAYLOAD_BLOCK,
575
            message_id: Arc::new(AtomicU16::new(message_id)),
14✔
576
        }
577
    }
578
    /// Execute a single get request with a coap url
579
    pub async fn get(url: &str) -> IoResult<CoapResponse> {
4✔
580
        Self::request(url, Method::Get, None).await
3✔
581
    }
582

583
    /// Execute a single get request with a coap url and a specific timeout.
584
    pub async fn get_with_timeout(url: &str, timeout: Duration) -> IoResult<CoapResponse> {
4✔
585
        Self::request_with_timeout(url, Method::Get, None, timeout).await
3✔
586
    }
587

588
    /// Execute a single post request with a coap url using udp
589
    pub async fn post(url: &str, data: Vec<u8>) -> IoResult<CoapResponse> {
4✔
590
        Self::request(url, Method::Post, Some(data)).await
3✔
591
    }
592

593
    /// Execute a single post request with a coap url and a specific timeout using udp
594
    pub async fn post_with_timeout(
595
        url: &str,
596
        data: Vec<u8>,
597
        timeout: Duration,
598
    ) -> IoResult<CoapResponse> {
599
        Self::request_with_timeout(url, Method::Post, Some(data), timeout).await
×
600
    }
601

602
    /// Execute a put request with a coap url using udp
603
    pub async fn put(url: &str, data: Vec<u8>) -> IoResult<CoapResponse> {
4✔
604
        Self::request(url, Method::Put, Some(data)).await
3✔
605
    }
606

607
    /// Execute a single put request with a coap url and a specific timeout using udp
608
    pub async fn put_with_timeout(
609
        url: &str,
610
        data: Vec<u8>,
611
        timeout: Duration,
612
    ) -> IoResult<CoapResponse> {
613
        Self::request_with_timeout(url, Method::Put, Some(data), timeout).await
×
614
    }
615

616
    /// Execute a single delete request with a coap url using udp
617
    pub async fn delete(url: &str) -> IoResult<CoapResponse> {
4✔
618
        Self::request(url, Method::Delete, None).await
3✔
619
    }
620

621
    /// Execute a single delete request with a coap url and a specific timeout using udp
622
    pub async fn delete_with_timeout(url: &str, timeout: Duration) -> IoResult<CoapResponse> {
×
623
        Self::request_with_timeout(url, Method::Delete, None, timeout).await
×
624
    }
625

626
    /// Execute a single request (GET, POST, PUT, DELETE) with a coap url using udp
627
    pub async fn request(
1✔
628
        url: &str,
629
        method: Method,
630
        data: Option<Vec<u8>>,
631
    ) -> IoResult<CoapResponse> {
632
        let (domain, port, path, queries) = Self::parse_coap_url(url)?;
2✔
633
        let client = UdpCoAPClient::new_udp((domain.as_str(), port)).await?;
3✔
634
        let request = RequestBuilder::new(&path, method)
2✔
635
            .queries(queries)
1✔
636
            .domain(domain)
1✔
637
            .data(data)
1✔
638
            .build();
639
        client.send(request).await
3✔
640
    }
641

642
    /// Execute a single request (GET, POST, PUT, DELETE) with a coap url and a specific timeout
643
    /// using udp
644
    pub async fn request_with_timeout(
1✔
645
        url: &str,
646
        method: Method,
647
        data: Option<Vec<u8>>,
648
        timeout: Duration,
649
    ) -> IoResult<CoapResponse> {
650
        let (domain, port, path, queries) = Self::parse_coap_url(url)?;
2✔
651
        let mut client = UdpCoAPClient::new_udp((domain.as_str(), port)).await?;
2✔
652
        client.set_receive_timeout(timeout);
1✔
653
        let request = RequestBuilder::new(&path, method)
1✔
654
            .queries(queries)
1✔
655
            .domain(domain)
1✔
656
            .data(data)
1✔
657
            .build();
658

659
        client.send(request).await
3✔
660
    }
661

662
    /// Send a Request via the given transport, and receive a response.
663
    /// users are responsible for filling meaningful fields in the request
664
    /// this method supports blockwise requests
665
    pub async fn send(&self, mut request: CoapRequest<SocketAddr>) -> IoResult<CoapResponse> {
26✔
666
        let first_response = self.send_request(&mut request).await?;
14✔
667
        request.response = Some(first_response);
7✔
668
        self.receive(&mut request).await
8✔
669
    }
670

671
    pub async fn observe<H: FnMut(Message) + Send + 'static>(
8✔
672
        &self,
673
        resource_path: &str,
674
        handler: H,
675
    ) -> IoResult<oneshot::Sender<ObserveMessage>>
676
    where
677
        T: 'static + Send + Sync,
678
    {
679
        let register_packet = RequestBuilder::new(resource_path, Method::Get).build();
16✔
680
        self.observe_with(register_packet, handler).await
16✔
681
    }
682

683
    /// Observe a resource with the handler and specified timeout using the given transport.
684
    /// Use the oneshot sender to cancel observation. If this sender is dropped without explicitly
685
    /// cancelling it, the observation will continue forever.
686
    pub async fn observe_with_timeout<H: FnMut(Message) + Send + 'static>(
687
        &mut self,
688
        resource_path: &str,
689
        handler: H,
690
        timeout: Duration,
691
    ) -> IoResult<oneshot::Sender<ObserveMessage>>
692
    where
693
        T: 'static + Send + Sync,
694
    {
695
        self.set_receive_timeout(timeout);
×
696
        self.observe(resource_path, handler).await
×
697
    }
698

699
    /// observe a resource with a given transport using your own request
700
    /// Use this method if you need to set some specific options in your
701
    /// requests. This method will add observe flags and a message id as a fallback
702
    /// Use this method if you plan on re-using the same client for requests
703
    pub async fn observe_with<H: FnMut(Message) + Send + 'static>(
8✔
704
        &self,
705
        request: CoapRequest<SocketAddr>,
706
        mut handler: H,
707
    ) -> IoResult<oneshot::Sender<ObserveMessage>> {
708
        let this = self.clone();
16✔
709
        let mut register_packet = request.clone();
16✔
710
        if 0 == register_packet.message.header.message_id {
16✔
711
            register_packet.message.header.message_id = self.gen_message_id();
16✔
712
        }
713
        if register_packet.message.get_token().is_empty() {
16✔
714
            register_packet
16✔
NEW
715
                .message
×
716
                .set_token(self.gen_message_id().to_be_bytes().to_vec());
8✔
717
        }
718
        register_packet.set_observe_flag(ObserveOption::Register);
8✔
719

720
        let req_token = register_packet.message.get_token().to_vec();
16✔
721
        let resource_path = register_packet.get_path();
16✔
722

723
        let (tx_observe, mut rx_observe) = unbounded_channel();
16✔
724
        self.transport
24✔
725
            .synchronizer
×
726
            .set_sender(req_token.clone(), tx_observe)
16✔
727
            .await;
24✔
728

729
        // Ensure we clean up the long-lived observe mapping if registration fails
730
        if let Err(e) = self
32✔
731
            .send_observe_registration(&mut register_packet, &mut rx_observe, &mut handler)
8✔
732
            .await
32✔
733
        {
734
            self.transport.synchronizer.remove_sender(&req_token).await;
4✔
735
            return Err(e);
2✔
736
        }
737

738
        let (tx, rx) = oneshot::channel();
12✔
739
        let observe_path = String::from(resource_path);
6✔
740

741
        tokio::spawn(async move {
12✔
742
            // Template used to create a fresh continuation request per notification
743
            // to avoid cross-notification state leakage.
744
            let continuation_template = register_packet.clone();
3✔
745
            let mut rx_pinned: Pin<
×
746
                Box<
×
747
                    dyn Future<
×
748
                            Output = std::result::Result<ObserveMessage, oneshot::error::RecvError>,
×
749
                        > + Send,
×
750
                >,
×
751
            > = Box::pin(rx);
6✔
UNCOV
752
            loop {
×
753
                tokio::select! {
25✔
754
                    sock_rx = rx_observe.recv() => {
5✔
755
                        let mut blockwise_request = continuation_template.clone();
2✔
756
                        this.receive_and_handle_message_observe(&mut blockwise_request, sock_rx?, &mut handler).await;
4✔
757
                    }
758
                    observe = &mut rx_pinned => {
6✔
759
                        match observe {
3✔
760
                            Ok(ObserveMessage::Terminate) => {
×
761
                                this.terminate_observe(&observe_path, req_token).await;
2✔
762
                                break;
×
763
                            }
764
                            // if the receiver is dropped, we change the future to wait forever
765
                            Err(_) => {
×
766
                                debug!("observe continuing forever");
6✔
767
                                rx_pinned  = Box::pin(futures::future::pending())
4✔
768
                            },
769
                        }
770
                    }
771

772
                }
773
            }
774
            Some(())
1✔
775
        });
776
        return Ok(tx);
6✔
777
    }
778

779
    async fn send_observe_registration<H: FnMut(Message) + Send + 'static>(
8✔
780
        &self,
781
        register_packet: &mut CoapRequest<SocketAddr>,
782
        receiver: &mut UnboundedReceiver<IoResult<Packet>>,
783
        handler: &mut H,
784
    ) -> Result<(), std::io::Error> {
785
        // Bypass the first layer of "do_request_response_for_packet" to prevent the
786
        // long-lasting observe-receiver from being removed
787
        let response = self
32✔
NEW
788
            .transport
×
789
            .do_request_response_for_packet_inner(
790
                &Packet {
8✔
791
                    address: None,
8✔
792
                    message: register_packet.message.to_owned(),
8✔
793
                },
NEW
794
                receiver,
×
795
            )
796
            .await?;
32✔
797
        // Check if the server accepted the observe request
798
        let coap_response = CoapResponse {
799
            message: response.message.clone(),
8✔
800
        };
801
        if *coap_response.get_status() != Status::Content {
16✔
802
            return Err(Error::new(
4✔
NEW
803
                ErrorKind::NotFound,
×
NEW
804
                "the resource was not found",
×
805
            ));
806
        }
807
        // Finalize a potential block-wise transfer and pass result to handler
808
        self.receive_and_handle_message_observe(register_packet, Ok(response), handler)
12✔
809
            .await;
14✔
810
        Ok(())
6✔
811
    }
812

813
    async fn terminate_observe(&self, observe_path: &str, req_token: Vec<u8>) {
4✔
814
        let mut deregister_packet = CoapRequest::<SocketAddr>::new();
1✔
815
        deregister_packet.message.header.message_id = self.gen_message_id();
2✔
816
        deregister_packet.set_observe_flag(ObserveOption::Deregister);
1✔
817
        deregister_packet.set_path(observe_path);
1✔
818
        deregister_packet.message.set_token(req_token);
1✔
819

820
        let _ = self
3✔
821
            .transport
×
822
            .do_request_response_for_packet(&Packet {
1✔
823
                address: None,
1✔
824
                message: deregister_packet.message,
1✔
825
            })
826
            .await;
3✔
827
    }
828

829
    async fn receive_and_handle_message_observe<H: FnMut(Message) + Send + 'static>(
6✔
830
        &self,
831
        request: &mut CoapRequest<SocketAddr>,
832
        socket_result: IoResult<Packet>,
833
        handler: &mut H,
834
    ) {
835
        match socket_result {
6✔
836
            Ok(response) => {
6✔
837
                if let Some(block2) = CoAPClient::<T>::get_block2_option(&response.message) {
12✔
838
                    if CoAPClient::<T>::contains_more_blocks(&response.message) {
4✔
839
                        // Start of a new blockwise transfer.
840
                        // Receive the rest before passing it on to the user-defined handler.
841
                        request.response = Some(CoapResponse {
2✔
842
                            message: response.message.clone(),
2✔
843
                        });
844
                        // Use a different token for the short-lived blockwise continuation to avoid
845
                        // interfering with the long-lived observe mapping keyed by the original token.
NEW
846
                        request
×
NEW
847
                            .message
×
848
                            .set_token(self.gen_message_id().to_be_bytes().to_vec());
2✔
849
                        request.message.header.message_id = self.gen_message_id();
2✔
850
                        request.message.clear_option(CoapOption::Observe);
2✔
851
                        request.message.clear_option(CoapOption::Block2);
2✔
852

853
                        let mut next_block2 = block2.clone();
2✔
854
                        next_block2.num += 1;
2✔
855
                        next_block2.more = false;
2✔
856
                        request
2✔
NEW
857
                            .message
×
858
                            .add_option_as::<BlockValue>(CoapOption::Block2, next_block2);
4✔
859

860
                        let full_datagram = self.receive(request).await;
4✔
861
                        // Pass the message on to the user-defined handler
862
                        if let Ok(full_datagram) = full_datagram {
4✔
863
                            handler(full_datagram.message.clone());
4✔
864
                        }
865
                    } else {
866
                        // A full datagram has been received
867
                        // Pass the message on to the user-defined handler
NEW
868
                        handler(response.message);
×
869
                    }
870
                } else {
871
                    // A full datagram has been received
872
                    // Pass the message on to the user-defined handler
873
                    handler(response.message);
8✔
874
                }
875
            }
876
            Err(e) => match e.kind() {
×
877
                ErrorKind::WouldBlock => {
×
878
                    info!("Observe timeout");
×
879
                }
880
                _ => warn!("observe failed {:?}", e),
×
881
            },
882
        };
883
    }
884

885
    /// sends a request through the transport. If a request is confirmable, it will attempt
886
    /// retries until receiving a response. requests sent using a multicast-address should be non-confirmable
887
    /// the user is responsible for setting meaningful fields in the request
888
    /// Do not use this method unless you need low-level control over the protocol (e.g.,
889
    /// multicast), instead use send for client applications.
890
    pub async fn send_single_request(
5✔
891
        &self,
892
        request: &CoapRequest<SocketAddr>,
893
    ) -> IoResult<CoapResponse> {
894
        let response = self
26✔
895
            .transport
×
896
            .do_request_response_for_packet(&Packet {
6✔
897
                address: None,
4✔
898
                message: request.message.to_owned(),
5✔
899
            })
900
            .await?;
23✔
901
        Ok(CoapResponse {
3✔
902
            message: response.message,
5✔
903
        })
904
    }
905

906
    /// low-level method to send a a request supporting block1 option based on
907
    /// the block size set in the client
908
    async fn send_request(&self, request: &mut CoapRequest<SocketAddr>) -> IoResult<CoapResponse> {
20✔
909
        let request_length = request.message.payload.len();
10✔
910
        if request_length <= self.block1_size {
6✔
911
            if 0 == request.message.header.message_id {
9✔
912
                request.message.header.message_id = self.gen_message_id();
8✔
913
            }
914
            return self.send_single_request(request).await;
15✔
915
        }
916
        let payload = std::mem::take(&mut request.message.payload);
1✔
917
        let mut it = payload.chunks(self.block1_size).enumerate().peekable();
2✔
918
        let mut result = Err(Error::new(ErrorKind::Other, "unknown error occurred"));
2✔
919

920
        while let Some((idx, elem)) = it.next() {
3✔
921
            let more_blocks = it.peek().is_some();
2✔
922
            let block = BlockValue::new(idx, more_blocks, self.block1_size)
2✔
923
                .map_err(|_| Error::new(ErrorKind::Other, "could not set block size"))?;
1✔
924

925
            request.message.clear_option(CoapOption::Block1);
1✔
926
            request
2✔
927
                .message
×
928
                .add_option_as::<BlockValue>(CoapOption::Block1, block.clone());
2✔
929
            request.message.payload = elem.to_vec();
1✔
930

931
            request.message.header.message_id = self.gen_message_id();
1✔
932
            let resp = self.send_single_request(request).await?;
4✔
933
            // continue receiving responses until last element
934
            if it.peek().is_some() {
3✔
935
                let maybe_block1 = resp
5✔
936
                    .message
×
937
                    .get_first_option_as::<BlockValue>(CoapOption::Block1)
2✔
938
                    .ok_or(Error::new(
2✔
UNCOV
939
                        ErrorKind::Unsupported,
×
940
                        "endpoint does not support blockwise transfers. Try setting block1_size to a larger value",
×
941
                    ))?;
942
                let block1_resp = maybe_block1.map_err(|_| {
2✔
943
                    Error::new(
×
944
                        ErrorKind::InvalidData,
×
945
                        "endpoint responded with invalid block",
×
946
                    )
947
                })?;
948
                //TODO: negotiate smaller block size
949
                if block1_resp.size_exponent != block.size_exponent {
1✔
950
                    return Err(Error::new(
×
951
                        ErrorKind::Unsupported,
×
952
                        "negotiating block size is currently unsupported",
×
953
                    ));
954
                }
955
            }
956
            result = Ok(resp);
1✔
957
        }
958
        return result;
1✔
959
    }
960

961
    /// Receive a response support block-wise.
962
    async fn receive(&self, request: &mut CoapRequest<SocketAddr>) -> IoResult<CoapResponse> {
14✔
963
        let mut block2_state = BlockState::default();
7✔
964
        loop {
1✔
965
            match Self::intercept_response(request, &mut block2_state) {
11✔
966
                Ok(true) => {
×
967
                    request.message.header.message_id = self.gen_message_id();
2✔
968
                    let resp = self.send_single_request(request).await?;
4✔
969
                    request.response = Some(resp);
2✔
970
                }
971
                Err(err) => {
×
972
                    error!("intercept response error: {:?}", err);
×
973
                    return Err(Error::new(ErrorKind::Interrupted, "packet error"));
×
974
                }
975
                Ok(false) => {
×
976
                    break;
×
977
                }
978
            }
979
        }
980
        Ok(CoapResponse {
3✔
981
            message: request.response.as_ref().unwrap().message.clone(),
7✔
982
        })
983
    }
984

985
    /// Set the receive timeout.
986
    pub fn set_receive_timeout(&mut self, dur: Duration) {
1✔
987
        self.transport.timeout = dur;
1✔
988
    }
989

990
    pub fn set_transport_retries(&mut self, num_retries: usize) {
1✔
991
        self.transport.retries = num_retries;
1✔
992
    }
993

994
    /// Set the maximum size for a block1 request. Default is 1024 bytes
995
    pub fn set_block1_size(&mut self, block1_max_bytes: usize) {
1✔
996
        self.block1_size = block1_max_bytes;
1✔
997
    }
998

999
    fn parse_coap_url(url: &str) -> IoResult<(String, u16, String, Vec<Vec<u8>>)> {
1✔
1000
        let url_params = match Url::parse(url) {
1✔
1001
            Ok(url_params) => url_params,
1✔
1002
            Err(_) => return Err(Error::new(ErrorKind::InvalidInput, "url error")),
1✔
1003
        };
1004

1005
        let host = match url_params.host_str() {
2✔
1006
            Some("") => return Err(Error::new(ErrorKind::InvalidInput, "host error")),
2✔
1007
            Some(h) => h,
1✔
1008
            None => return Err(Error::new(ErrorKind::InvalidInput, "host error")),
1✔
1009
        };
1010
        let host = Regex::new(r"^\[(.*?)]$")
1✔
1011
            .unwrap()
1012
            .replace(&host, "$1")
1✔
1013
            .to_string();
1014

1015
        let port = match url_params.port() {
1✔
1016
            Some(p) => p,
1✔
1017
            None => 5683,
1✔
1018
        };
1019

1020
        let path = url_params.path().to_string();
1✔
1021

1022
        let queries = url_params
1✔
1023
            .query()
1024
            .map(|q| q.split("&").map(|qi| qi.as_bytes().to_vec()).collect())
5✔
1025
            .unwrap_or(vec![]);
2✔
1026

1027
        return Ok((host.to_string(), port, path, queries));
2✔
1028
    }
1029

1030
    fn gen_message_id(&self) -> u16 {
3✔
1031
        self.message_id
4✔
1032
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
3✔
1033
    }
1034

1035
    fn intercept_response(
3✔
1036
        request: &mut CoapRequest<SocketAddr>,
1037
        state: &mut BlockState,
1038
    ) -> std::result::Result<bool, HandlingError> {
1039
        let contains_more = Self::handle_blockwise(request, state)?;
4✔
1040
        if contains_more {
3✔
1041
            return Ok(true);
1✔
1042
        }
1043

1044
        Ok(false)
4✔
1045
    }
1046

1047
    // Handle blockwise transfers.
1048
    // Returns true if the datagram contains more blocks.
1049
    // Returns false if it is the last block or if the datagram is not part of a blockwise transfer.
1050
    fn handle_blockwise(
3✔
1051
        request: &mut CoapRequest<SocketAddr>,
1052
        state: &mut BlockState,
1053
    ) -> std::result::Result<bool, HandlingError> {
1054
        let packet = request.response.as_ref().unwrap().message.clone();
4✔
1055

1056
        if let Some(block2) = Self::get_block2_option(&packet) {
7✔
1057
            if state.cached_payload.is_none() {
3✔
1058
                state.cached_payload = Some(Vec::new());
1✔
1059
            }
1060
            let cached_payload = state.cached_payload.as_mut().unwrap();
2✔
1061
            let payload_offset = usize::from(block2.num) * block2.size();
1✔
1062
            extending_splice(
1063
                cached_payload,
×
1064
                payload_offset..payload_offset + block2.size(),
2✔
1065
                packet.payload.iter().copied(),
2✔
1066
                16 * 1024,
1✔
1067
            )
1068
            .map_err(HandlingError::internal)?;
2✔
1069

1070
            if Self::contains_more_blocks(&packet) {
1✔
1071
                // Prepare request for requesting next block
1072
                request.message.clear_option(CoapOption::Block2);
1✔
1073
                let mut next_block2 = block2.clone();
1✔
1074
                next_block2.num += 1;
1✔
1075
                next_block2.more = false;
1✔
1076
                request
1✔
1077
                    .message
×
1078
                    .add_option_as::<BlockValue>(CoapOption::Block2, next_block2);
2✔
1079
                return Ok(true);
1✔
1080
            } else {
1081
                let cached_payload = mem::take(&mut state.cached_payload).unwrap();
2✔
1082
                request.response.as_mut().unwrap().message.payload = cached_payload;
1✔
1083
            }
1084
        }
1085

1086
        Ok(false)
4✔
1087
    }
1088

1089
    fn get_block2_option(packet: &Message) -> Option<BlockValue> {
4✔
NEW
1090
        packet
×
1091
            .get_first_option_as::<BlockValue>(CoapOption::Block2)
3✔
1092
            .and_then(|x| x.ok())
6✔
1093
    }
1094

1095
    // Check the Block2 option in the packet and determine if the datagram contains more blocks or
1096
    // if it is the last block.
1097
    // All packets that are part of a blockwise transfer will be assembled in the BlockState.cached_payload.
1098
    fn contains_more_blocks(packet: &Message) -> bool {
1✔
1099
        if let Some(block2) = Self::get_block2_option(packet) {
1✔
1100
            if block2.more {
1✔
NEW
1101
                return true;
×
1102
            }
1103
        }
NEW
1104
        false
×
1105
    }
1106
}
1107

1108
/// Scratch state used while reassembling a block-wise response.
#[derive(Clone, Debug, Default)]
pub struct BlockState {
    /// Payload bytes collected so far; `None` when nothing is cached.
    cached_payload: Option<Vec<u8>>,
}
1112

1113
#[cfg(test)]
1114
mod test {
1115

1116
    use tokio::time;
1117

1118
    use crate::server::test::spawn_server;
1119

1120
    use super::super::*;
1121
    use super::*;
1122
    use std::ops::DerefMut;
1123
    use std::str;
1124
    use std::sync::atomic::{AtomicU32, Ordering};
1125

1126
    #[test]
1127
    fn test_parse_coap_url_good_url() {
1128
        assert!(UdpCoAPClient::parse_coap_url("coap://127.0.0.1").is_ok());
1129
        assert!(UdpCoAPClient::parse_coap_url("coap://127.0.0.1:5683").is_ok());
1130
        assert!(UdpCoAPClient::parse_coap_url("coap://[::1]").is_ok());
1131
        assert!(UdpCoAPClient::parse_coap_url("coap://[::1]:5683").is_ok());
1132
        assert!(UdpCoAPClient::parse_coap_url("coap://[bbbb::9329:f033:f558:7418]").is_ok());
1133
        assert!(UdpCoAPClient::parse_coap_url("coap://[bbbb::9329:f033:f558:7418]:5683").is_ok());
1134
        assert!(UdpCoAPClient::parse_coap_url("coap://127.0.0.1/?hello=world").is_ok());
1135
    }
1136

1137
    #[test]
1138
    fn test_parse_coap_url_bad_url() {
1139
        assert!(UdpCoAPClient::parse_coap_url("coap://127.0.0.1:65536").is_err());
1140
        assert!(UdpCoAPClient::parse_coap_url("coap://").is_err());
1141
        assert!(UdpCoAPClient::parse_coap_url("coap://:5683").is_err());
1142
        assert!(UdpCoAPClient::parse_coap_url("127.0.0.1").is_err());
1143
    }
1144

1145
    async fn request_handler(req: Box<CoapRequest<SocketAddr>>) -> Box<CoapRequest<SocketAddr>> {
1146
        tokio::time::sleep(Duration::from_secs(1)).await;
1147
        req
1148
    }
1149

1150
    /// The query part of a URL must be split at `&` into raw byte items.
    ///
    /// A parse failure fails the test instead of merely being logged.
    #[test]
    fn test_parse_queries() {
        let (_, _, _, queries) =
            UdpCoAPClient::parse_coap_url("coap://127.0.0.1/?hello=world&test1=test2")
                .expect("a well-formed CoAP URL must parse");
        assert_eq!(
            vec![
                "hello=world".as_bytes().to_vec(),
                "test1=test2".as_bytes().to_vec()
            ],
            queries
        );
    }
1166

1167
    #[tokio::test]
1168
    async fn test_get_url() {
1169
        let resp = UdpCoAPClient::get("coap://coap.me:5683/hello")
1170
            .await
1171
            .unwrap();
1172
        assert_eq!(resp.message.payload, b"world".to_vec());
1173
    }
1174

1175
    #[tokio::test]
1176
    async fn test_get_url_timeout() {
1177
        let server_port = server::test::spawn_server("127.0.0.1:0", request_handler)
1178
            .recv()
1179
            .await
1180
            .unwrap();
1181

1182
        let error = UdpCoAPClient::get_with_timeout(
1183
            &format!("coap://127.0.0.1:{}/Rust", server_port),
1184
            Duration::new(0, 0),
1185
        )
1186
        .await
1187
        .unwrap_err();
1188
        assert_eq!(error.kind(), ErrorKind::TimedOut);
1189
    }
1190

1191
    #[tokio::test]
1192
    async fn test_get() {
1193
        let domain = "coap.me";
1194
        let client = UdpCoAPClient::new_udp((domain, 5683)).await.unwrap();
1195
        let resp = client
1196
            .send(
1197
                RequestBuilder::request_path(
1198
                    "/hello",
1199
                    Method::Get,
1200
                    None,
1201
                    vec![],
1202
                    Some(domain.to_string()),
1203
                )
1204
                .build(),
1205
            )
1206
            .await
1207
            .unwrap();
1208
        assert_eq!(resp.message.payload, b"world".to_vec());
1209
    }
1210
    #[tokio::test]
1211
    async fn test_post_url() {
1212
        let resp = UdpCoAPClient::post("coap://coap.me:5683/validate", b"world".to_vec())
1213
            .await
1214
            .unwrap();
1215
        assert_eq!(resp.message.payload, b"POST OK".to_vec());
1216
        let resp = UdpCoAPClient::post("coap://coap.me:5683/validate", b"test".to_vec())
1217
            .await
1218
            .unwrap();
1219
        assert_eq!(resp.message.payload, b"POST OK".to_vec());
1220
    }
1221

1222
    #[tokio::test]
1223
    async fn test_post() {
1224
        let domain = "coap.me";
1225
        let client = UdpCoAPClient::new_udp((domain, 5683)).await.unwrap();
1226
        let resp = client
1227
            .send(
1228
                RequestBuilder::request_path(
1229
                    "/validate",
1230
                    Method::Post,
1231
                    Some(b"world".to_vec()),
1232
                    vec![],
1233
                    Some(domain.to_string()),
1234
                )
1235
                .build(),
1236
            )
1237
            .await
1238
            .unwrap();
1239
        assert_eq!(resp.message.payload, b"POST OK".to_vec());
1240
    }
1241

1242
    #[tokio::test]
1243
    async fn test_put_url() {
1244
        let resp = UdpCoAPClient::put("coap://coap.me:5683/create1", b"world".to_vec())
1245
            .await
1246
            .unwrap();
1247
        assert_eq!(resp.message.payload, b"Created".to_vec());
1248
        let resp = UdpCoAPClient::put("coap://coap.me:5683/create1", b"test".to_vec())
1249
            .await
1250
            .unwrap();
1251
        assert_eq!(resp.message.payload, b"Created".to_vec());
1252
    }
1253

1254
    #[tokio::test]
1255
    async fn test_put() {
1256
        let domain = "coap.me";
1257
        let client = UdpCoAPClient::new_udp((domain, 5683)).await.unwrap();
1258
        let resp = client
1259
            .send(
1260
                RequestBuilder::new("/create1", Method::Put)
1261
                    .data(Some(b"world".to_vec()))
1262
                    .domain(domain.to_string())
1263
                    .build(),
1264
            )
1265
            .await
1266
            .unwrap();
1267
        assert_eq!(resp.message.payload, b"Created".to_vec());
1268
    }
1269

1270
    #[tokio::test]
1271
    async fn test_delete_url() {
1272
        let resp = UdpCoAPClient::delete("coap://coap.me:5683/validate")
1273
            .await
1274
            .unwrap();
1275
        assert_eq!(resp.message.payload, b"DELETE OK".to_vec());
1276
        let resp = UdpCoAPClient::delete("coap://coap.me:5683/validate")
1277
            .await
1278
            .unwrap();
1279
        assert_eq!(resp.message.payload, b"DELETE OK".to_vec());
1280
    }
1281

1282
    #[tokio::test]
1283
    async fn test_delete() {
1284
        let domain = "coap.me";
1285
        let client = UdpCoAPClient::new_udp((domain, 5683)).await.unwrap();
1286
        let resp = client
1287
            .send(
1288
                RequestBuilder::new("/validate", Method::Delete)
1289
                    .domain(domain.to_string())
1290
                    .build(),
1291
            )
1292
            .await
1293
            .unwrap();
1294
        assert_eq!(resp.message.payload, b"DELETE OK".to_vec());
1295
    }
1296

1297
    #[tokio::test]
1298
    async fn test_set_broadcast() {
1299
        let client = UdpCoAPClient::new_udp(("127.0.0.1", 5683)).await.unwrap();
1300
        assert!(client.set_broadcast(true).is_ok());
1301
        assert!(client.set_broadcast(false).is_ok());
1302
    }
1303

1304
    #[tokio::test]
1305
    #[ignore]
1306
    async fn test_set_broadcast_v6() {
1307
        let client = UdpCoAPClient::new_udp(("::1", 5683)).await.unwrap();
1308
        assert!(client.set_broadcast(true).is_ok());
1309
        assert!(client.set_broadcast(false).is_ok());
1310
    }
1311

1312
    // Build a server that fakes observe behavior for a single response (it doesn't send regular notifications).
1313
    // Upon "/observe_me" registration, it returns a single notification split into two blocks.
1314
    // First block Block2 option: num=0 more=true, payload: "a" 1024 times, second block: num=1 more=false, payload: "b" 1024 times.
1315
    fn make_fake_blockwise_observe_server_handler() -> Box<
1316
        dyn Fn(
1317
                Box<CoapRequest<SocketAddr>>,
1318
            ) -> std::pin::Pin<
1319
                Box<dyn std::future::Future<Output = Box<CoapRequest<SocketAddr>>> + Send>,
1320
            > + Send
1321
            + Sync
1322
            + 'static,
1323
    > {
1324
        let prev_block_num = Arc::new(std::sync::Mutex::new(0u8));
1325
        Box::new(move |mut req: Box<CoapRequest<SocketAddr>>| {
1326
            let prev_block_num = prev_block_num.clone();
1327
            Box::pin(async move {
1328
                let path = req.get_path().to_string();
1329
                let method = req.get_method().clone();
1330

1331
                let mut send_block = |num: u16, more: bool, data: &[u8]| {
1332
                    if let Some(resp) = req.response.as_mut() {
1333
                        resp.message.header.code = MessageClass::Response(Status::Content);
1334
                        let block =
1335
                            BlockValue::new(num as usize, more, data.len()).expect("valid block");
1336
                        resp.message
1337
                            .add_option_as::<BlockValue>(CoapOption::Block2, block);
1338
                        resp.message.payload = data.to_vec();
1339
                    }
1340
                };
1341

1342
                match (path.as_str(), method) {
1343
                    ("ok", Method::Get) => {
1344
                        if let Some(resp) = req.response.as_mut() {
1345
                            resp.message.header.code = MessageClass::Response(Status::Content);
1346
                            resp.message.payload = b"ok".to_vec();
1347
                        }
1348
                    }
1349
                    ("observe_me", _) => {
1350
                        let has_observe = req.message.get_option(CoapOption::Observe).is_some();
1351
                        let maybe_block2_req = req
1352
                            .message
1353
                            .get_first_option_as::<BlockValue>(CoapOption::Block2)
1354
                            .and_then(|x| x.ok());
1355

1356
                        match (has_observe, maybe_block2_req) {
1357
                            (true, None) => {
1358
                                *prev_block_num.lock().unwrap() = 0;
1359
                                let payload_first_block = vec![b'a'; 1024];
1360
                                send_block(0, true, &payload_first_block);
1361
                            }
1362
                            (_, Some(block2)) if block2.num == 1 => {
1363
                                *prev_block_num.lock().unwrap() = 1;
1364
                                let payload_second_block = vec![b'b'; 1024];
1365
                                send_block(1, false, &payload_second_block);
1366
                            }
1367
                            _ => {
1368
                                if let Some(resp) = req.response.as_mut() {
1369
                                    resp.message.header.code =
1370
                                        MessageClass::Response(Status::NotFound);
1371
                                    resp.message.payload = b"not found".to_vec();
1372
                                }
1373
                            }
1374
                        }
1375
                    }
1376
                    _ => {
1377
                        if let Some(resp) = req.response.as_mut() {
1378
                            resp.message.header.code = MessageClass::Response(Status::NotFound);
1379
                            resp.message.payload = b"not found".to_vec();
1380
                        }
1381
                    }
1382
                }
1383

1384
                req
1385
            })
1386
        })
1387
    }
1388

1389
    #[tokio::test]
1390
    async fn test_observe_registration_failure_cleans_up_and_returns_error() {
1391
        // Spawn server with automatic observe handling disabled so that the observe-messages are passed to this handler.
1392
        // The server returns NotFound for observe registration
1393
        let server_port = server::test::spawn_server_disable_observe(
1394
            "127.0.0.1:0",
1395
            make_fake_blockwise_observe_server_handler(),
1396
        )
1397
        .recv()
1398
        .await
1399
        .unwrap();
1400

1401
        let mut client = UdpCoAPClient::new_udp(("127.0.0.1", server_port))
1402
            .await
1403
            .unwrap();
1404
        client.set_receive_timeout(Duration::from_secs(1));
1405

1406
        // Attempt to observe non-existing resource should fail with NotFound
1407
        let failed_observe_result = client.observe("/dont_observe_me", |_m| {}).await;
1408
        assert!(failed_observe_result.is_err());
1409

1410
        // The client should remain usable after the failed registration
1411
        let working_observe_result = client.observe("/observe_me", |_m| {}).await;
1412
        assert!(working_observe_result.is_ok());
1413

1414
        // Clean up the observe registration
1415
        let _ = working_observe_result
1416
            .unwrap()
1417
            .send(ObserveMessage::Terminate);
1418
    }
1419

1420
    #[tokio::test]
1421
    async fn test_observe_blockwise_notification_is_assembled() {
1422
        // Spawn server with automatic observe handling disabled so that we can write a specific test in the handler.
1423
        let server_port = server::test::spawn_server_disable_observe(
1424
            "127.0.0.1:0",
1425
            make_fake_blockwise_observe_server_handler(),
1426
        )
1427
        .recv()
1428
        .await
1429
        .unwrap();
1430

1431
        let client = UdpCoAPClient::new_udp(("127.0.0.1", server_port))
1432
            .await
1433
            .unwrap();
1434

1435
        // Set up arc to know when the handler is called
1436
        let client_handler_called = Arc::new(std::sync::Mutex::new(false));
1437
        let client_handler_called_clone = client_handler_called.clone();
1438

1439
        let handler = move |m: Message| {
1440
            let mut client_handler_called = client_handler_called_clone.lock().unwrap();
1441
            *client_handler_called = true;
1442
            assert!(m.payload.len() == 2048);
1443
        };
1444

1445
        let terminator = client.observe("/observe_me", handler).await.unwrap();
1446

1447
        //Wait for the handler to be called
1448
        while !*client_handler_called.lock().unwrap() {
1449
            tokio::time::sleep(Duration::from_millis(10)).await;
1450
        }
1451

1452
        // Terminate observation
1453
        let _ = terminator.send(ObserveMessage::Terminate);
1454
    }
1455

1456
    #[tokio::test]
1457
    async fn test_send_all_coap() {
1458
        // prepare the Non-confirmable request with the broadcast message
1459
        let mut request: CoapRequest<SocketAddr> = CoapRequest::new();
1460
        request.set_method(Method::Get);
1461
        request.set_path("/");
1462
        request
1463
            .message
1464
            .header
1465
            .set_type(coap_lite::MessageType::NonConfirmable);
1466
        request.message.payload = b"Discovery".to_vec();
1467

1468
        let client = UdpCoAPClient::new_udp(("127.0.0.1", 5683)).await.unwrap();
1469
        client.send_all_coap(&mut request, 0).await.unwrap();
1470
    }
1471
    #[tokio::test]
1472
    async fn test_change_block_option() {
1473
        // this test is a little finnicky because it relies on the configuration
1474
        // of the reception endpoint. It tries to send a payload larger than the
1475
        // default using a block option, this request is expected to fail because
1476
        // the endpoint does not support block requests. Afterwards, we change the
1477
        // maximum block size and thus expect the request to work.
1478
        const PAYLOAD_STR: &str = "this is a payload";
1479
        let mut large_payload = vec![];
1480
        while large_payload.len() < 1024 {
1481
            large_payload.extend_from_slice(PAYLOAD_STR.as_bytes());
1482
        }
1483
        let domain = "coap.me";
1484
        let mut client = UdpCoAPClient::new_udp((domain, 5683)).await.unwrap();
1485
        let resp = client
1486
            .send(
1487
                RequestBuilder::new("/large-create", Method::Put)
1488
                    .domain(domain.to_string())
1489
                    .data(Some(large_payload.clone()))
1490
                    .build(),
1491
            )
1492
            .await;
1493
        let err = resp.unwrap_err();
1494
        assert_eq!(err.kind(), ErrorKind::Unsupported);
1495
        //we now set the block size to make sure it is sent in a single request
1496
        client.set_block1_size(10_000_000);
1497

1498
        let resp = client
1499
            .send(
1500
                RequestBuilder::new("/large-create", Method::Post)
1501
                    .data(Some(large_payload.clone()))
1502
                    .domain(domain.to_string())
1503
                    .build(),
1504
            )
1505
            .await
1506
            .unwrap();
1507
        assert_eq!(*resp.get_status(), Status::Created);
1508
    }
1509
    #[tokio::test]
1510
    #[ignore]
1511
    async fn test_send_all_coap_v6() {
1512
        // prepare the Non-confirmable request with the broadcast message
1513
        let mut request: CoapRequest<SocketAddr> = CoapRequest::new();
1514
        request.set_method(Method::Get);
1515
        request.set_path("/");
1516
        request
1517
            .message
1518
            .header
1519
            .set_type(coap_lite::MessageType::NonConfirmable);
1520
        request.message.payload = b"Discovery".to_vec();
1521

1522
        let client = UdpCoAPClient::new_udp(("::1", 5683)).await.unwrap();
1523
        client.send_all_coap(&mut request, 0x4).await.unwrap();
1524
    }
1525

1526
    struct FaultyUdp {
1527
        pub udp: UdpTransport,
1528
        pub num_fails: u32,
1529
        pub current_fails: AtomicU32,
1530
    }
1531

1532
    #[async_trait]
1533
    impl ClientTransport for FaultyUdp {
1534
        async fn recv(&self, buf: &mut [u8]) -> std::io::Result<(usize, Option<SocketAddr>)> {
1535
            self.udp.recv(buf).await
1536
        }
1537

1538
        async fn send(&self, buf: &[u8]) -> std::io::Result<usize> {
1539
            self.current_fails.fetch_add(1, Ordering::Relaxed);
1540
            self.current_fails
1541
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
1542
                    Some(n % self.num_fails)
1543
                })
1544
                .unwrap();
1545
            if self.current_fails.load(Ordering::Relaxed) == 0 {
1546
                return self.udp.send(buf).await;
1547
            }
1548
            Err(Error::new(ErrorKind::Other, "fails this time"))
1549
        }
1550
    }
1551

1552
    async fn get_faulty_client(server_addr: &str, num_fails: u32) -> CoAPClient<FaultyUdp> {
1553
        let peer_addr = lookup_host(server_addr)
1554
            .await
1555
            .unwrap()
1556
            .next()
1557
            .ok_or(Error::new(
1558
                ErrorKind::InvalidInput,
1559
                "could not get socket address",
1560
            ))
1561
            .unwrap();
1562
        let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
1563
        let transport = UdpTransport { socket, peer_addr };
1564
        let transport = FaultyUdp {
1565
            udp: transport,
1566
            num_fails,
1567
            current_fails: 0.into(),
1568
        };
1569

1570
        return CoAPClient::from_transport(transport);
1571
    }
1572
    #[tokio::test]
1573
    async fn test_retries() {
1574
        let server_port = server::test::spawn_server("127.0.0.1:0", |mut req| async {
1575
            req.response.as_mut().unwrap().message.payload = b"Rust".to_vec();
1576
            return req;
1577
        })
1578
        .recv()
1579
        .await
1580
        .unwrap();
1581

1582
        let server_addr = format!("127.0.0.1:{}", server_port);
1583
        let mut client = get_faulty_client(
1584
            &server_addr,
1585
            CoapClientTransport::<FaultyUdp>::DEFAULT_NUM_RETRIES as u32 + 1,
1586
        )
1587
        .await;
1588
        let request_gen = || {
1589
            RequestBuilder::new("/Rust", Method::Get)
1590
                .domain(server_addr.clone())
1591
                .build()
1592
        };
1593
        let error = client.send(request_gen()).await.unwrap_err();
1594
        assert_eq!(error.kind(), ErrorKind::Other);
1595
        //this request will work, we do this to reset the state of the faulty udp
1596
        client.send(request_gen()).await.unwrap();
1597

1598
        client.set_transport_retries(CoapClientTransport::<UdpTransport>::DEFAULT_NUM_RETRIES + 2);
1599
        let resp = client.send(request_gen()).await.unwrap();
1600

1601
        assert_eq!(resp.message.payload, b"Rust".to_vec());
1602
    }
1603
    #[tokio::test]
1604
    async fn test_non_confirmable_no_retries() {
1605
        let server_port = server::test::spawn_server("127.0.0.1:0", |mut req| async {
1606
            req.response.as_mut().unwrap().message.payload = b"Rust".to_vec();
1607
            return req;
1608
        })
1609
        .recv()
1610
        .await
1611
        .unwrap();
1612

1613
        let server_addr = format!("127.0.0.1:{}", server_port);
1614
        let client = get_faulty_client(&server_addr, 2).await;
1615
        let mut request = CoapRequest::new();
1616
        request.set_method(Method::Get);
1617
        request.set_path("/Rust");
1618
        request.message.header.message_id = 123;
1619
        request.message.header.set_type(MessageType::NonConfirmable);
1620

1621
        let req = client.send(request).await;
1622
        assert!(req.is_err());
1623
    }
1624

1625
    async fn do_wait_request<T: ClientTransport + 'static>(
1626
        client: Arc<CoAPClient<T>>,
1627
        path: &str,
1628
        token: Vec<u8>,
1629
        wait_ms: u64,
1630
    ) -> IoResult<CoapResponse> {
1631
        let mut request = CoapRequest::new();
1632
        request.message.header.set_version(1);
1633
        request
1634
            .message
1635
            .header
1636
            .set_type(coap_lite::MessageType::Confirmable);
1637
        request.message.header.set_code("0.01");
1638
        request.message.header.message_id = 1;
1639
        request.message.set_token(token);
1640
        request
1641
            .message
1642
            .add_option(CoapOption::UriPath, path.as_bytes().to_vec());
1643
        request.message.payload = wait_ms.to_string().into();
1644

1645
        return client.send(request).await;
1646
    }
1647

1648
    async fn wait_handler(mut req: Box<CoapRequest<SocketAddr>>) -> Box<CoapRequest<SocketAddr>> {
1649
        let uri_path_list = req.message.get_option(CoapOption::UriPath).unwrap().clone();
1650
        let payload = str::from_utf8(&req.message.payload).unwrap();
1651
        let to_wait_ms: u64 = payload.parse().unwrap();
1652
        time::sleep(Duration::from_millis(to_wait_ms)).await;
1653

1654
        match req.response {
1655
            Some(ref mut response) => {
1656
                response.message.payload = uri_path_list.front().unwrap().clone();
1657
            }
1658
            _ => {}
1659
        }
1660
        return req;
1661
    }
1662
    /// run 2 clients using the same transport and receive an answer
1663
    /// in the expected order without interference
1664
    #[tokio::test]
1665
    async fn test_multiple_clients_same_socket() {
1666
        let server_port = spawn_server("127.0.0.1:0", wait_handler)
1667
            .recv()
1668
            .await
1669
            .unwrap();
1670

1671
        let client = Arc::new(
1672
            UdpCoAPClient::new_udp(format!("127.0.0.1:{}", server_port))
1673
                .await
1674
                .unwrap(),
1675
        );
1676
        let mut b = tokio::spawn(do_wait_request(client.clone(), "/bar", vec![1], 500));
1677
        let a = tokio::spawn(do_wait_request(client.clone(), "/foo", vec![2], 50));
1678

1679
        tokio::select! {
1680
            a_first = a => {
1681
            let a_first = a_first.unwrap().unwrap();
1682
            assert_eq!(a_first.message.payload, b"/foo".to_vec());
1683
            assert_eq!(a_first.message.get_token(), vec![2]);
1684
            },
1685
            _b_first = &mut b => {
1686
                panic!("should not happen");
1687

1688
            }
1689
        }
1690
        let b_end = b.await.unwrap().expect("should receive a response");
1691
        assert_eq!(b_end.message.payload, b"/bar".to_vec());
1692
        assert_eq!(b_end.message.get_token(), vec![1]);
1693
    }
1694

1695
    struct FaultyReceiver {
1696
        pub udp: UdpTransport,
1697
        pub should_fail: Mutex<oneshot::Receiver<std::io::Error>>,
1698
    }
1699
    #[async_trait]
1700
    impl ClientTransport for FaultyReceiver {
1701
        async fn recv(&self, buf: &mut [u8]) -> std::io::Result<(usize, Option<SocketAddr>)> {
1702
            let mut mutex = self.should_fail.lock().await;
1703
            tokio::select! {
1704
                e = mutex.deref_mut() => {
1705
                    return Err(e.unwrap());
1706
                }
1707
                result = self.udp.recv(buf) => {
1708
                    return result;
1709
                }
1710
            }
1711
        }
1712

1713
        async fn send(&self, buf: &[u8]) -> std::io::Result<usize> {
1714
            self.udp.send(buf).await
1715
        }
1716
    }
1717

1718
    async fn get_faulty_receiver_client(
1719
        server_addr: &str,
1720
    ) -> (oneshot::Sender<std::io::Error>, CoAPClient<FaultyReceiver>) {
1721
        let (tx, rx) = oneshot::channel();
1722
        let peer_addr = lookup_host(server_addr)
1723
            .await
1724
            .unwrap()
1725
            .next()
1726
            .ok_or(Error::new(
1727
                ErrorKind::InvalidInput,
1728
                "could not get socket address",
1729
            ))
1730
            .unwrap();
1731
        let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
1732
        let transport = UdpTransport { socket, peer_addr };
1733
        let transport = FaultyReceiver {
1734
            udp: transport,
1735
            should_fail: Mutex::new(rx),
1736
        };
1737

1738
        return (tx, CoAPClient::from_transport(transport));
1739
    }
1740
    #[tokio::test(flavor = "multi_thread")]
1741
    async fn test_synchronizer_receive_error() {
1742
        let server_port = server::test::spawn_server("127.0.0.1:0", wait_handler)
1743
            .recv()
1744
            .await
1745
            .unwrap();
1746

1747
        let server_addr = format!("127.0.0.1:{}", server_port);
1748
        let (flag, client) = get_faulty_receiver_client(&server_addr).await;
1749
        let mut handles = vec![];
1750
        let arc_client = Arc::new(client);
1751
        for i in 0..10 {
1752
            let c_clone = arc_client.clone();
1753
            handles.push(tokio::spawn(async move {
1754
                do_wait_request(c_clone, &format!("/{}", i), vec![i], 2000).await
1755
            }));
1756
        }
1757
        //wait for all futures to advance
1758
        tokio::time::sleep(Duration::from_millis(200)).await;
1759
        flag.send(Error::new(ErrorKind::Other, "fail")).unwrap();
1760

1761
        //all handles should fail now because of the error
1762
        for h in handles {
1763
            assert!(h.await.unwrap().is_err());
1764
        }
1765

1766
        assert!(
1767
            do_wait_request(arc_client.clone(), "/foo", vec![254], 1)
1768
                .await
1769
                .is_err(),
1770
            "failed transport should make all other requests fail"
1771
        )
1772
    }
1773
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc