• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Covertness / coap-rs / 25325548906

04 May 2026 02:43PM UTC coverage: 84.615% (-0.7%) from 85.288%
25325548906

Pull #130

github

web-flow
Merge aa99ac8bd into 8b5e53778
Pull Request #130: Fix all Clippy warnings and enforce Clippy in CI

62 of 75 new or added lines in 5 files covered. (82.67%)

3 existing lines in 2 files now uncovered.

836 of 988 relevant lines covered (84.62%)

4.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.82
/src/server.rs
1
use async_trait::async_trait;
2
use coap_lite::{BlockHandler, BlockHandlerConfig, CoapOption, CoapRequest, CoapResponse, Packet};
3
use log::debug;
4
use std::{
5
    future::Future,
6
    net::{self, IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs},
7
    sync::Arc,
8
};
9
use tokio::{
10
    io,
11
    net::UdpSocket,
12
    select,
13
    sync::{
14
        mpsc::{self, UnboundedReceiver, UnboundedSender},
15
        Mutex,
16
    },
17
    task::JoinHandle,
18
};
19

20
use crate::observer::{encode_coap_uint, Observer};
21

22
#[derive(Debug)]
23
pub enum CoAPServerError {
24
    NetworkError,
25
    EventLoopError,
26
    AnotherHandlerIsRunning,
27
    EventSendError,
28
}
29

30
use tokio::io::Error;
31

32
#[async_trait]
33
pub trait Dispatcher: Send + Sync {
34
    async fn dispatch(&self, request: CoapRequest<SocketAddr>) -> Option<CoapResponse>;
35
}
36

37
#[async_trait]
38
/// This trait represents a generic way to respond to a listener. If you want to implement your own
39
/// listener, you have to implement this trait to be able to send responses back through the
40
/// correct transport
41
pub trait Responder: Sync + Send {
42
    async fn respond(&self, response: Vec<u8>);
43
    fn address(&self) -> SocketAddr;
44
}
45

46
/// channel to send new requests from a transport to the CoAP server
47
pub type TransportRequestSender = UnboundedSender<(Vec<u8>, Arc<dyn Responder>)>;
48

49
/// channel used by CoAP server to receive new requests
50
pub type TransportRequestReceiver = UnboundedReceiver<(Vec<u8>, Arc<dyn Responder>)>;
51

52
type UdpResponseReceiver = UnboundedReceiver<(Vec<u8>, SocketAddr)>;
53
type UdpResponseSender = UnboundedSender<(Vec<u8>, SocketAddr)>;
54

55
// listeners receive new connections
56
#[async_trait]
57
pub trait Listener: Send {
58
    async fn listen(
59
        self: Box<Self>,
60
        sender: TransportRequestSender,
61
    ) -> std::io::Result<JoinHandle<std::io::Result<()>>>;
62
}
63
/// listener for a UDP socket
64
pub struct UdpCoapListener {
65
    socket: UdpSocket,
66
    multicast_addresses: Vec<IpAddr>,
67
    response_receiver: UdpResponseReceiver,
68
    response_sender: UdpResponseSender,
69
}
70

71
#[async_trait]
72
/// A trait for handling incoming requests. Use this instead of a closure
73
/// if you want to modify some external state
74
pub trait RequestHandler: Send + Sync + 'static {
75
    async fn handle_request(
76
        &self,
77
        mut request: Box<CoapRequest<SocketAddr>>,
78
    ) -> Box<CoapRequest<SocketAddr>>;
79
}
80

81
#[async_trait]
82
impl<F, HandlerRet> RequestHandler for F
83
where
84
    F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
85
    HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
86
{
87
    async fn handle_request(
35✔
88
        &self,
89
        request: Box<CoapRequest<SocketAddr>>,
90
    ) -> Box<CoapRequest<SocketAddr>> {
91
        self(request).await
22✔
92
    }
93
}
94

95
/// A listener for UDP packets. This listener can also subscribe to multicast addresses
96
impl UdpCoapListener {
97
    pub fn new<A: ToSocketAddrs>(addr: A) -> Result<Self, Error> {
1✔
98
        let std_socket = net::UdpSocket::bind(addr)?;
1✔
99
        std_socket.set_nonblocking(true)?;
2✔
100
        let socket = UdpSocket::from_std(std_socket)?;
1✔
101
        Ok(Self::from_socket(socket))
2✔
102
    }
103

104
    pub fn from_socket(socket: tokio::net::UdpSocket) -> Self {
1✔
105
        let (tx, rx) = mpsc::unbounded_channel();
2✔
106
        Self {
107
            socket,
108
            multicast_addresses: Vec::new(),
1✔
109
            response_receiver: rx,
110
            response_sender: tx,
111
        }
112
    }
113

114
    /// join multicast - adds the multicast addresses to the unicast listener
115
    /// - IPv4 multicast address range is '224.0.0.0/4'
116
    /// - IPv6 AllCoAp multicast addresses are 'ff00::/8'
117
    ///
118
    /// Parameter segment is used with IPv6 to determine the first octet.
119
    /// - It's value can be between 0x0 and 0xf.
120
    /// - To join multiple segments, you have to call enable_discovery for each of the segments.
121
    ///
122
    /// Some Multicast address scope
123
    /// IPv6        IPv4 equivalent[16]            Scope                Purpose
124
    /// ffx1::/16    127.0.0.0/8                    Interface-local        Packets with this destination address may not be sent over any network link, but must remain within the current node; this is the multicast equivalent of the unicast loopback address.
125
    /// ffx2::/16    224.0.0.0/24                Link-local            Packets with this destination address may not be routed anywhere.
126
    /// ffx3::/16    239.255.0.0/16                IPv4 local scope
127
    /// ffx4::/16                                Admin-local            The smallest scope that must be administratively configured.
128
    /// ffx5::/16                                Site-local            Restricted to the local physical network.
129
    /// ffx8::/16    239.192.0.0/14                Organization-local    Restricted to networks used by the organization administering the local network. (For example, these addresses might be used over VPNs; when packets for this group are routed over the public internet (where these addresses are not valid), they would have to be encapsulated in some other protocol.)
130
    /// ffxe::/16    224.0.1.0-238.255.255.255    Global scope        Eligible to be routed over the public internet.
131
    ///
132
    /// Notable addresses:
133
    /// ff02::1        All nodes on the local network segment
134
    /// ff0x::c        Simple Service Discovery Protocol
135
    /// ff0x::fb    Multicast DNS
136
    /// ff0x::fb    Multicast CoAP
137
    /// ff0x::114    Used for experiments
138
    //    pub fn join_multicast(&mut self, addr: IpAddr) {
139
    //        self.udp_server.join_multicast(addr);
140
    //    }
141
    pub fn join_multicast(&mut self, addr: IpAddr) {
1✔
142
        assert!(addr.is_multicast());
1✔
143
        // determine wether IPv4 or IPv6 and
144
        // join the appropriate multicast address
145
        match self.socket.local_addr().unwrap() {
1✔
146
            SocketAddr::V4(val) => {
1✔
147
                match addr {
1✔
148
                    IpAddr::V4(ipv4) => {
1✔
149
                        let i = *val.ip();
1✔
150
                        self.socket.join_multicast_v4(ipv4, i).unwrap();
1✔
151
                        self.multicast_addresses.push(addr);
1✔
152
                    }
153
                    IpAddr::V6(_ipv6) => { /* handle IPv6 */ }
×
154
                }
155
            }
156
            SocketAddr::V6(_val) => {
×
157
                match addr {
×
158
                    IpAddr::V4(_ipv4) => { /* handle IPv4 */ }
×
159
                    IpAddr::V6(ipv6) => {
×
160
                        self.socket.join_multicast_v6(&ipv6, 0).unwrap();
×
161
                        self.multicast_addresses.push(addr);
×
162
                        //self.socket.set_only_v6(true)?;
163
                    }
164
                }
165
            }
166
        }
167
    }
168

169
    /// leave multicast - remove the multicast address from the listener
170
    pub fn leave_multicast(&mut self, addr: IpAddr) {
1✔
171
        assert!(addr.is_multicast());
1✔
172
        // determine wether IPv4 or IPv6 and
173
        // leave the appropriate multicast address
174
        match self.socket.local_addr().unwrap() {
1✔
175
            SocketAddr::V4(val) => {
1✔
176
                match addr {
1✔
177
                    IpAddr::V4(ipv4) => {
1✔
178
                        let i = *val.ip();
1✔
179
                        self.socket.leave_multicast_v4(ipv4, i).unwrap();
1✔
180
                        let index = self
1✔
181
                            .multicast_addresses
182
                            .iter()
183
                            .position(|&item| item == addr)
3✔
184
                            .unwrap();
185
                        self.multicast_addresses.remove(index);
1✔
186
                    }
187
                    IpAddr::V6(_ipv6) => { /* handle IPv6 */ }
×
188
                }
189
            }
190
            SocketAddr::V6(_val) => {
×
191
                match addr {
×
192
                    IpAddr::V4(_ipv4) => { /* handle IPv4 */ }
×
193
                    IpAddr::V6(ipv6) => {
×
194
                        self.socket.leave_multicast_v6(&ipv6, 0).unwrap();
×
195
                        let index = self
×
196
                            .multicast_addresses
197
                            .iter()
198
                            .position(|&item| item == addr)
×
199
                            .unwrap();
200
                        self.multicast_addresses.remove(index);
×
201
                    }
202
                }
203
            }
204
        }
205
    }
206
    /// enable AllCoAP multicasts - adds the AllCoap addresses to the listener
207
    /// - IPv4 AllCoAP multicast address is '224.0.1.187'
208
    /// - IPv6 AllCoAp multicast addresses are 'ff0?::fd'
209
    ///
210
    /// Parameter segment is used with IPv6 to determine the first octet.
211
    /// - It's value can be between 0x0 and 0xf.
212
    /// - To join multiple segments, you have to call enable_discovery for each of the segments.
213
    ///
214
    /// For further details see method join_multicast
215
    pub fn enable_all_coap(&mut self, segment: u8) {
1✔
216
        assert!(segment <= 0xf);
1✔
217
        let m = match self.socket.local_addr().unwrap() {
1✔
218
            SocketAddr::V4(_val) => IpAddr::V4(Ipv4Addr::new(224, 0, 1, 187)),
1✔
219
            SocketAddr::V6(_val) => IpAddr::V6(Ipv6Addr::new(
×
220
                0xff00 + segment as u16,
×
221
                0,
222
                0,
223
                0,
224
                0,
225
                0,
226
                0,
227
                0xfd,
228
            )),
229
        };
230
        self.join_multicast(m);
1✔
231
    }
232
}
233
#[async_trait]
234
impl Listener for UdpCoapListener {
235
    async fn listen(
5✔
236
        mut self: Box<Self>,
237
        sender: TransportRequestSender,
238
    ) -> std::io::Result<JoinHandle<std::io::Result<()>>> {
239
        return Ok(tokio::spawn(self.receive_loop(sender)));
3✔
240
    }
241
}
242

243
#[derive(Clone)]
244
struct UdpResponder {
245
    address: SocketAddr, // this is the address we are sending to
246
    tx: UdpResponseSender,
247
}
248

249
#[async_trait]
250
impl Responder for UdpResponder {
251
    async fn respond(&self, response: Vec<u8>) {
4✔
252
        let _ = self.tx.send((response, self.address));
2✔
253
    }
254
    fn address(&self) -> SocketAddr {
1✔
255
        self.address
1✔
256
    }
257
}
258

259
impl UdpCoapListener {
260
    pub async fn receive_loop(mut self, sender: TransportRequestSender) -> std::io::Result<()> {
4✔
261
        loop {
2✔
262
            let mut recv_vec = Vec::with_capacity(u16::MAX as usize);
2✔
263
            select! {
3✔
264
                message =self.socket.recv_buf_from(&mut recv_vec)=> {
265
                    match message {
266
                        Ok((_size, from)) => {
267
                            sender.send((recv_vec, Arc::new(UdpResponder{address: from, tx: self.response_sender.clone()}))).map_err( |_| std::io::Error::other("server channel error"))?;
268
                        }
269
                        Err(e) => {
270
                            return Err(e);
271
                        }
272
                    }
273
                },
274
                response = self.response_receiver.recv() => {
275
                    if let Some((bytes, to)) = response{
276
                        debug!("sending {:?} to {:?}", &bytes,  &to);
277
                        self.socket.send_to(&bytes, to).await?;
278
                    }
279
                    else {
280
                        // in case nobody is listening to us, we can just terminate, though this
281
                        // should never happen for UDP
282
                        return Ok(());
283
                    }
284

285
                }
286
            }
287
        }
288
    }
289
}
290

291
#[derive(Debug)]
292
pub struct QueuedMessage {
293
    pub address: SocketAddr,
294
    pub message: Packet,
295
}
296

297
struct ServerCoapState {
298
    observer: Observer,
299
    block_handler: BlockHandler<SocketAddr>,
300
    disable_observe: bool,
301
}
302

303
pub enum ShouldForwardToHandler {
304
    True,
305
    False,
306
}
307

308
impl ServerCoapState {
309
    pub async fn intercept_request(
1✔
310
        &mut self,
311
        request: &mut CoapRequest<SocketAddr>,
312
        responder: Arc<dyn Responder>,
313
    ) -> ShouldForwardToHandler {
314
        match self.block_handler.intercept_request(request) {
3✔
315
            Ok(true) => return ShouldForwardToHandler::False,
1✔
316
            Err(_err) => return ShouldForwardToHandler::False,
×
317
            Ok(false) => {}
318
        };
319

320
        if self.disable_observe {
1✔
321
            return ShouldForwardToHandler::True;
1✔
322
        }
323

324
        let should_be_forwarded = self.observer.request_handler(request, responder).await;
2✔
325
        if should_be_forwarded {
2✔
326
            ShouldForwardToHandler::True
1✔
327
        } else {
328
            ShouldForwardToHandler::False
1✔
329
        }
330
    }
331

332
    pub async fn intercept_response(&mut self, request: &mut CoapRequest<SocketAddr>) {
4✔
333
        let resource_path = request.get_path();
1✔
334

335
        let is_block_fetch_for_observer = request.message.get_option(CoapOption::Block2).is_some()
3✔
336
            && request.message.get_option(CoapOption::Observe).is_none()
1✔
337
            && request.source.is_some()
1✔
338
            && self
2✔
339
                .observer
340
                .is_observing(&request.source.unwrap(), &resource_path);
2✔
341

342
        if is_block_fetch_for_observer {
1✔
343
            if let Some((payload, etag)) =
1✔
344
                self.observer.get_resource_payload_and_etag(&resource_path)
345
            {
346
                if let Some(ref mut response) = request.response {
2✔
347
                    response.message.payload = payload.to_vec();
2✔
348
                    response.message.clear_option(CoapOption::ETag);
1✔
349
                    response.message.add_option(CoapOption::ETag, etag);
1✔
350
                    // Prevent duplicate Size2 options, clear first.
351
                    response.message.clear_option(CoapOption::Size2);
1✔
352
                    response
1✔
353
                        .message
354
                        .add_option(CoapOption::Size2, encode_coap_uint(payload.len()));
1✔
355
                }
356
            }
357
        }
358

359
        if let Err(err) = self.block_handler.intercept_response(request) {
3✔
360
            let _ = request.apply_from_error(err);
×
361
        }
362
    }
363

364
    pub fn new() -> Self {
1✔
365
        Self {
366
            observer: Observer::new(),
1✔
367
            block_handler: BlockHandler::new(BlockHandlerConfig::default()),
2✔
368
            disable_observe: false,
369
        }
370
    }
371
    pub fn disable_observe_handling(&mut self, value: bool) {
1✔
372
        self.disable_observe = value
1✔
373
    }
374
}
375

376
pub struct Server {
377
    listeners: Vec<Box<dyn Listener>>,
378
    coap_state: Arc<Mutex<ServerCoapState>>,
379
    new_packet_receiver: TransportRequestReceiver,
380
    new_packet_sender: TransportRequestSender,
381
}
382

383
impl Server {
384
    /// Creates a CoAP server listening on the given address.
385
    pub fn new_udp<A: ToSocketAddrs>(addr: A) -> Result<Self, io::Error> {
×
386
        let listener: Vec<Box<dyn Listener>> = vec![Box::new(UdpCoapListener::new(addr)?)];
×
387
        Ok(Self::from_listeners(listener))
×
388
    }
389

390
    pub fn from_listeners(listeners: Vec<Box<dyn Listener>>) -> Self {
1✔
391
        let (tx, rx) = mpsc::unbounded_channel();
2✔
392
        Server {
393
            listeners,
394
            coap_state: Arc::new(Mutex::new(ServerCoapState::new())),
2✔
395
            new_packet_receiver: rx,
396
            new_packet_sender: tx,
397
        }
398
    }
399

400
    async fn spawn_handles(
1✔
401
        listeners: Vec<Box<dyn Listener>>,
402
        sender: TransportRequestSender,
403
    ) -> std::io::Result<Vec<JoinHandle<std::io::Result<()>>>> {
404
        let mut handles = vec![];
1✔
405
        for listener in listeners.into_iter() {
4✔
406
            let handle = listener.listen(sender.clone()).await?;
3✔
407
            handles.push(handle);
1✔
408
        }
409
        Ok(handles)
1✔
410
    }
411

412
    /// run the server.
413
    pub async fn run<Handler: RequestHandler>(mut self, handler: Handler) -> Result<(), io::Error> {
49✔
414
        let _handles = Self::spawn_handles(self.listeners, self.new_packet_sender.clone()).await?;
20✔
415

416
        let handler_arc = Arc::new(handler);
20✔
417
        // receive an input, sync our cache / states, then call custom handler
418
        loop {
419
            let (bytes, respond) = self
47✔
NEW
420
                .new_packet_receiver
×
421
                .recv()
422
                .await
58✔
423
                .ok_or_else(|| std::io::Error::other("listen channel closed"))?;
9✔
424
            if let Ok(packet) = Packet::from_bytes(&bytes) {
18✔
425
                let mut request = Box::new(CoapRequest::<SocketAddr>::from_packet(
27✔
426
                    packet,
9✔
427
                    respond.address(),
18✔
428
                ));
429
                let mut coap_state = self.coap_state.lock().await;
18✔
430
                let should_forward = coap_state
27✔
431
                    .intercept_request(&mut request, respond.clone())
9✔
432
                    .await;
36✔
433

434
                match should_forward {
9✔
435
                    ShouldForwardToHandler::True => {
436
                        let handler_clone = handler_arc.clone();
18✔
437
                        let coap_state_clone = self.coap_state.clone();
18✔
438
                        tokio::spawn(async move {
27✔
439
                            request = handler_clone.handle_request(request).await;
20✔
440
                            coap_state_clone
40✔
441
                                .lock()
8✔
442
                                .await
24✔
443
                                .intercept_response(request.as_mut())
8✔
444
                                .await;
16✔
445

446
                            Self::respond_to_request(request, respond).await;
8✔
447
                        });
448
                    }
449
                    ShouldForwardToHandler::False => {
450
                        Self::respond_to_request(request, respond).await;
12✔
451
                    }
452
                }
453
            }
454
        }
455
    }
456
    async fn respond_to_request(req: Box<CoapRequest<SocketAddr>>, responder: Arc<dyn Responder>) {
4✔
457
        // if we have some reponse to send, send it
458
        if let Some(Ok(b)) = req.response.map(|resp| resp.message.to_bytes()) {
6✔
459
            responder.respond(b).await;
2✔
460
        }
461
    }
462
    #[deprecated(
463
        since = "0.21.0",
464
        note = "Use 'coap::Server::automatic_observe_handling' instead."
465
    )]
466
    /// disable auto-observe handling in server
UNCOV
467
    pub async fn disable_observe_handling(&mut self, value: bool) {
×
UNCOV
468
        self.automatic_observe_handling(value).await
×
469
    }
470
    /// set auto-observe handling in server, defaults to enabled
471
    pub async fn automatic_observe_handling(&mut self, value: bool) {
4✔
472
        let mut coap_state = self.coap_state.lock().await;
2✔
473
        coap_state.disable_observe_handling(value)
2✔
474
    }
475
}
476

477
#[cfg(test)]
478
pub mod test {
479
    use crate::request::RequestBuilder;
480

481
    use super::super::*;
482
    use super::*;
483
    use coap_lite::{block_handler::BlockValue, CoapOption, RequestType};
484
    use std::str;
485
    use std::time::Duration;
486

487
    pub fn spawn_server<
488
        F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
489
        HandlerRet,
490
    >(
491
        ip: &'static str,
492
        request_handler: F,
493
    ) -> mpsc::UnboundedReceiver<u16>
494
    where
495
        HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
496
    {
497
        let (tx, rx) = mpsc::unbounded_channel();
498
        let _task = tokio::spawn(async move {
499
            let sock = UdpSocket::bind(ip).await.unwrap();
500
            let addr = sock.local_addr().unwrap();
501
            let listener = Box::new(UdpCoapListener::from_socket(sock));
502
            let server = Server::from_listeners(vec![listener]);
503
            tx.send(addr.port()).unwrap();
504
            server.run(request_handler).await.unwrap();
505
        });
506

507
        rx
508
    }
509

510
    async fn request_handler(
511
        mut req: Box<CoapRequest<SocketAddr>>,
512
    ) -> Box<CoapRequest<SocketAddr>> {
513
        let uri_path_list = req.message.get_option(CoapOption::UriPath).unwrap().clone();
514
        assert_eq!(uri_path_list.len(), 1);
515

516
        match req.response {
517
            Some(ref mut response) => {
518
                response.message.payload = uri_path_list.front().unwrap().clone();
519
            }
520
            _ => {}
521
        }
522
        return req;
523
    }
524

525
    pub fn spawn_server_with_all_coap<
526
        F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
527
        HandlerRet,
528
    >(
529
        ip: &'static str,
530
        request_handler: F,
531
        segment: u8,
532
    ) -> mpsc::UnboundedReceiver<u16>
533
    where
534
        HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
535
    {
536
        let (tx, rx) = mpsc::unbounded_channel();
537

538
        std::thread::Builder::new()
539
            .name(String::from("v4-server"))
540
            .spawn(move || {
541
                tokio::runtime::Runtime::new()
542
                    .unwrap()
543
                    .block_on(async move {
544
                        // multicast needs a server on a real interface
545
                        let sock = UdpSocket::bind((ip, 0)).await.unwrap();
546
                        let addr = sock.local_addr().unwrap();
547
                        let mut listener = Box::new(UdpCoapListener::from_socket(sock));
548
                        listener.enable_all_coap(segment);
549
                        let server = Server::from_listeners(vec![listener]);
550
                        tx.send(addr.port()).unwrap();
551
                        server.run(request_handler).await.unwrap();
552
                    })
553
            })
554
            .unwrap();
555

556
        rx
557
    }
558

559
    pub fn spawn_server_disable_observe<
560
        F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
561
        HandlerRet,
562
    >(
563
        ip: &'static str,
564
        request_handler: F,
565
    ) -> mpsc::UnboundedReceiver<u16>
566
    where
567
        HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
568
    {
569
        let (tx, rx) = mpsc::unbounded_channel();
570
        let _task = tokio::spawn(async move {
571
            let sock = UdpSocket::bind(ip).await.unwrap();
572
            let addr = sock.local_addr().unwrap();
573
            let listener = Box::new(UdpCoapListener::from_socket(sock));
574
            let mut server = Server::from_listeners(vec![listener]);
575
            server.automatic_observe_handling(true).await;
576
            tx.send(addr.port()).unwrap();
577
            server.run(request_handler).await.unwrap();
578
        });
579

580
        rx
581
    }
582

583
    #[tokio::test]
584
    async fn test_listener_instantiation() {
585
        let listener = UdpCoapListener::new("127.0.0.1:0").unwrap();
586
        assert!(
587
            listener.socket.local_addr().unwrap().ip() == IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
588
        );
589
        // assert!(listener.socket.blocking() == false);
590

591
        let explicit_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
592
        let another_listener = UdpCoapListener::from_socket(explicit_socket);
593
        assert!(
594
            another_listener.socket.local_addr().unwrap().ip()
595
                == IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
596
        );
597
    }
598

599
    #[tokio::test]
600
    async fn test_echo_server() {
601
        let server_port = spawn_server("127.0.0.1:0", request_handler)
602
            .recv()
603
            .await
604
            .unwrap();
605

606
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
607
            .await
608
            .unwrap();
609
        let mut request = CoapRequest::new();
610
        request.message.header.set_version(1);
611
        request
612
            .message
613
            .header
614
            .set_type(coap_lite::MessageType::Confirmable);
615
        request.message.header.set_code("0.01");
616
        request.message.header.message_id = 1;
617
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
618
        request
619
            .message
620
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
621
        client.send_single_request(&request).await.unwrap();
622

623
        let recv_packet = client.send(request).await.unwrap();
624
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
625
    }
626

627
    #[tokio::test]
628
    async fn test_put_block() {
629
        let server_port = spawn_server("127.0.0.1:0", request_handler)
630
            .recv()
631
            .await
632
            .unwrap();
633
        let data = "hello this is a payload";
634
        let mut v = Vec::new();
635
        for _ in 0..1024 {
636
            v.extend_from_slice(data.as_bytes());
637
        }
638
        let payload_size = v.len();
639
        let server_string = format!("127.0.0.1:{}", server_port);
640
        let client = UdpCoAPClient::new(server_string.clone()).await.unwrap();
641

642
        let request = RequestBuilder::new("/large", RequestType::Put)
643
            .data(Some(v))
644
            .domain(server_string.clone())
645
            .build();
646

647
        let resp = client.send(request).await.unwrap();
648
        let block_opt = resp
649
            .message
650
            .get_first_option_as::<BlockValue>(CoapOption::Block1)
651
            .expect("expected block opt in response")
652
            .expect("could not decode block1 option");
653
        let expected_number = (payload_size as f32 / 1024.0).ceil() as u16 - 1;
654
        assert_eq!(
655
            block_opt.num, expected_number,
656
            "block not completely received!"
657
        );
658

659
        assert_eq!(resp.message.payload, b"large".to_vec());
660
    }
661

662
    #[tokio::test]
663
    #[ignore]
664
    async fn test_echo_server_v6() {
665
        let server_port = spawn_server("::1:0", request_handler).recv().await.unwrap();
666

667
        let client = UdpCoAPClient::new(format!("::1:{}", server_port))
668
            .await
669
            .unwrap();
670
        let mut request = CoapRequest::new();
671
        request.message.header.set_version(1);
672
        request
673
            .message
674
            .header
675
            .set_type(coap_lite::MessageType::Confirmable);
676
        request.message.header.set_code("0.01");
677
        request.message.header.message_id = 1;
678
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
679
        request
680
            .message
681
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
682

683
        let recv_packet = client.send(request).await.unwrap();
684
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
685
    }
686

687
    #[tokio::test]
688
    async fn test_echo_server_no_token() {
689
        let server_port = spawn_server("127.0.0.1:0", request_handler)
690
            .recv()
691
            .await
692
            .unwrap();
693

694
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
695
            .await
696
            .unwrap();
697
        let mut packet = CoapRequest::new();
698
        packet.message.header.set_version(1);
699
        packet
700
            .message
701
            .header
702
            .set_type(coap_lite::MessageType::Confirmable);
703
        packet.message.header.set_code("0.01");
704
        packet.message.header.message_id = 1;
705
        packet
706
            .message
707
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
708
        let recv_packet = client.send(packet).await.unwrap();
709
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
710
    }
711

712
    #[tokio::test]
713
    #[ignore]
714
    async fn test_echo_server_no_token_v6() {
715
        let server_port = spawn_server("::1:0", request_handler).recv().await.unwrap();
716

717
        let client = UdpCoAPClient::new(format!("::1:{}", server_port))
718
            .await
719
            .unwrap();
720
        let mut packet = CoapRequest::new();
721
        packet.message.header.set_version(1);
722
        packet
723
            .message
724
            .header
725
            .set_type(coap_lite::MessageType::Confirmable);
726
        packet.message.header.set_code("0.01");
727
        packet.message.header.message_id = 1;
728
        packet
729
            .message
730
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
731

732
        let recv_packet = client.send(packet).await.unwrap();
733
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
734
    }
735

736
    #[tokio::test]
737
    async fn test_update_resource() {
738
        let path = "/test";
739
        let payload1 = b"data1".to_vec();
740
        let payload2 = b"data2".to_vec();
741
        let (tx, mut rx) = mpsc::unbounded_channel();
742
        let (tx2, mut rx2) = mpsc::unbounded_channel();
743
        let mut step = 1;
744

745
        let server_port = spawn_server("127.0.0.1:0", request_handler)
746
            .recv()
747
            .await
748
            .unwrap();
749

750
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
751
            .await
752
            .unwrap();
753

754
        tx.send(step).unwrap();
755
        let mut request = CoapRequest::new();
756
        request.set_method(RequestType::Put);
757
        request.set_path(path);
758
        request.message.payload = payload1.clone();
759
        client.send(request.clone()).await.unwrap();
760

761
        let mut receive_step = 1;
762
        let payload1_clone = payload1.clone();
763
        let payload2_clone = payload2.clone();
764
        client
765
            .observe(path, move |msg| {
766
                match rx.try_recv() {
767
                    Ok(n) => receive_step = n,
768
                    _ => (),
769
                }
770

771
                match receive_step {
772
                    1 => assert_eq!(msg.payload, payload1_clone),
773
                    2 => {
774
                        assert_eq!(msg.payload, payload2_clone);
775
                        tx2.send(()).unwrap();
776
                    }
777
                    _ => panic!("unexpected step"),
778
                }
779
            })
780
            .await
781
            .unwrap();
782

783
        step = 2;
784
        tx.send(step).unwrap();
785
        request.message.payload = payload2.clone();
786
        let client2 = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
787
            .await
788
            .unwrap();
789
        let _ = client2.send(request).await.unwrap();
790
        assert_eq!(
791
            tokio::time::timeout(Duration::new(5, 0), rx2.recv())
792
                .await
793
                .unwrap(),
794
            Some(())
795
        );
796
    }
797

798
    #[tokio::test]
799
    async fn test_observe_transparent_transmission() {
800
        let path = "/test";
801
        let (tx, mut rx) = mpsc::unbounded_channel();
802

803
        let server_port = spawn_server_disable_observe("127.0.0.1:0", request_handler)
804
            .recv()
805
            .await
806
            .unwrap();
807

808
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
809
            .await
810
            .unwrap();
811

812
        client
813
            .observe(path, move |msg| {
814
                assert_eq!(msg.payload, b"test".to_vec());
815
                tx.send(()).unwrap();
816
            })
817
            .await
818
            .unwrap();
819

820
        assert_eq!(
821
            tokio::time::timeout(Duration::new(5, 0), rx.recv())
822
                .await
823
                .unwrap(),
824
            Some(())
825
        );
826
    }
827

828
    #[tokio::test]
829
    async fn multicast_server_all_coap() {
830
        // segment not relevant with IPv4
831
        let segment = 0x0;
832
        let server_port = spawn_server_with_all_coap("0.0.0.0", request_handler, segment)
833
            .recv()
834
            .await
835
            .unwrap();
836

837
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
838
            .await
839
            .unwrap();
840
        let mut request = CoapRequest::new();
841
        request.message.header.set_version(1);
842
        request
843
            .message
844
            .header
845
            .set_type(coap_lite::MessageType::Confirmable);
846
        request.message.header.set_code("0.01");
847
        request.message.header.message_id = 1;
848
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
849
        request
850
            .message
851
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
852
        let recv_packet = client.send(request).await.unwrap();
853

854
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
855

856
        let client = UdpCoAPClient::new(format!("224.0.1.187:{}", server_port))
857
            .await
858
            .unwrap();
859
        let mut request = RequestBuilder::new("test-echo", RequestType::Get)
860
            .data(Some(vec![0x51, 0x55, 0x77, 0xE8]))
861
            .confirmable(true)
862
            .build();
863

864
        let mut receiver = client.create_receiver_for(&request).await;
865
        client.send_all_coap(&mut request, segment).await.unwrap();
866
        let recv_packet = receiver.receive().await.unwrap();
867
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
868
    }
869

870
    //This test right now does not work on windows
871
    #[cfg(unix)]
872
    #[tokio::test]
873
    #[ignore]
874
    async fn multicast_server_all_coap_v6() {
875
        // use segment 0x04 which should be the smallest administered scope
876

877
        let segment = 0x04;
878
        let server_port = spawn_server_with_all_coap("::0", request_handler, segment)
879
            .recv()
880
            .await
881
            .unwrap();
882

883
        let client = UdpCoAPClient::new(format!("::1:{}", server_port))
884
            .await
885
            .unwrap();
886
        let mut request = CoapRequest::new();
887
        request.message.header.set_version(1);
888
        request
889
            .message
890
            .header
891
            .set_type(coap_lite::MessageType::Confirmable);
892
        request.message.header.set_code("0.01");
893
        request.message.header.message_id = 1;
894
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
895
        request
896
            .message
897
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
898
        client.send_single_request(&request).await.unwrap();
899

900
        let recv_packet = client.send(request).await.unwrap();
901
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
902

903
        // use 0xff02 to keep it within this network
904
        let client = UdpCoAPClient::new(format!("ff0{}::fd:{}", segment, server_port))
905
            .await
906
            .unwrap();
907
        let mut request = CoapRequest::new();
908
        request.message.header.set_version(1);
909
        request
910
            .message
911
            .header
912
            .set_type(coap_lite::MessageType::NonConfirmable);
913
        request.message.header.set_code("0.01");
914
        request.message.header.message_id = 2;
915
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
916
        request
917
            .message
918
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
919
        let mut receiver = client.create_receiver_for(&request).await;
920
        client.send_all_coap(&mut request, segment).await.unwrap();
921
        let recv_packet = receiver.receive().await.unwrap();
922
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
923
    }
924

925
    #[test]
926
    fn multicast_join_leave() {
927
        std::thread::Builder::new()
928
            .name(String::from("v4-server"))
929
            .spawn(move || {
930
                tokio::runtime::Runtime::new()
931
                    .unwrap()
932
                    .block_on(async move {
933
                        // multicast needs a server on a real interface
934
                        let sock = UdpSocket::bind(("0.0.0.0", 0)).await.unwrap();
935
                        let mut listener = Box::new(UdpCoapListener::from_socket(sock));
936
                        listener.join_multicast(IpAddr::V4(Ipv4Addr::new(224, 0, 1, 1)));
937
                        listener.join_multicast(IpAddr::V4(Ipv4Addr::new(224, 1, 1, 1)));
938
                        listener.leave_multicast(IpAddr::V4(Ipv4Addr::new(224, 0, 1, 1)));
939
                        listener.leave_multicast(IpAddr::V4(Ipv4Addr::new(224, 1, 1, 1)));
940
                        let server = Server::from_listeners(vec![listener]);
941
                        server.run(request_handler).await.unwrap();
942
                    })
943
            })
944
            .unwrap();
945

946
        std::thread::sleep(std::time::Duration::from_secs(1));
947
    }
948
    #[test]
949
    #[ignore]
950
    fn multicast_join_leave_v6() {
951
        std::thread::Builder::new()
952
            .name(String::from("v6-server"))
953
            .spawn(move || {
954
                tokio::runtime::Runtime::new()
955
                    .unwrap()
956
                    .block_on(async move {
957
                        // multicast needs a server on a real interface
958
                        let sock = UdpSocket::bind(("0.0.0.0", 0)).await.unwrap();
959
                        let mut listener = Box::new(UdpCoapListener::from_socket(sock));
960
                        listener.join_multicast(IpAddr::V6(Ipv6Addr::new(
961
                            0xff02, 0, 0, 0, 0, 0, 1, 0x1,
962
                        )));
963
                        listener.join_multicast(IpAddr::V6(Ipv6Addr::new(
964
                            0xff02, 0, 0, 0, 0, 1, 0, 0x2,
965
                        )));
966
                        listener.leave_multicast(IpAddr::V6(Ipv6Addr::new(
967
                            0xff02, 0, 0, 0, 0, 0, 1, 0x1,
968
                        )));
969
                        listener.join_multicast(IpAddr::V6(Ipv6Addr::new(
970
                            0xff02, 0, 0, 0, 0, 1, 0, 0x2,
971
                        )));
972
                        let server = Server::from_listeners(vec![listener]);
973
                        server.run(request_handler).await.unwrap();
974
                    })
975
            })
976
            .unwrap();
977

978
        std::thread::sleep(std::time::Duration::from_secs(1));
979
    }
980

981
    fn get_expected_response() -> Vec<u8> {
982
        let mut resp = vec![];
983
        for c in b'a'..=b'z' {
984
            resp.extend(std::iter::repeat(c).take(1024));
985
        }
986
        resp
987
    }
988
    async fn block2_responder(
989
        mut req: Box<CoapRequest<SocketAddr>>,
990
    ) -> Box<CoapRequest<SocketAddr>> {
991
        // vec should contain 'a' 1024 times, then 'b' 1024, up to ascii 'z'
992

993
        match req.response {
994
            Some(ref mut response) => {
995
                response.message.payload = get_expected_response();
996
            }
997
            _ => {}
998
        }
999
        return req;
1000
    }
1001
    #[tokio::test]
1002
    async fn test_block2_server_response() {
1003
        let server_port = spawn_server("127.0.0.1:0", block2_responder)
1004
            .recv()
1005
            .await
1006
            .unwrap();
1007

1008
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
1009
            .await
1010
            .unwrap();
1011
        let resp = client
1012
            .send(RequestBuilder::new("/", RequestType::Get).build())
1013
            .await
1014
            .unwrap();
1015
        assert_eq!(
1016
            resp.message.payload,
1017
            get_expected_response(),
1018
            "responses do not match"
1019
        );
1020
    }
1021
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc