• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Covertness / coap-rs / 25931059598

15 May 2026 05:11PM UTC coverage: 84.871% (-0.4%) from 85.288%
25931059598

Pull #130

github

web-flow
Merge 936cb4815 into 094b60c74
Pull Request #130: Fix all Clippy warnings and enforce Clippy in CI

480 of 567 new or added lines in 19 files covered. (84.66%)

3 existing lines in 2 files now uncovered.

1251 of 1474 relevant lines covered (84.87%)

3.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.11
/src/server.rs
1
#[cfg(feature = "router")]
2
use crate::router::{request::Request, Router};
3
use async_trait::async_trait;
4
use coap_lite::{BlockHandler, BlockHandlerConfig, CoapOption, CoapRequest, CoapResponse, Packet};
5
use log::debug;
6
use std::{
7
    future::Future,
8
    net::{self, IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs},
9
    sync::Arc,
10
};
11
use tokio::{
12
    io,
13
    net::UdpSocket,
14
    select,
15
    sync::{
16
        mpsc::{self, UnboundedReceiver, UnboundedSender},
17
        Mutex,
18
    },
19
    task::JoinHandle,
20
};
21

22
use crate::observer::{encode_coap_uint, Observer};
23

24
/// Errors that can be reported by the CoAP server.
///
/// The type implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated with `?` into `Box<dyn std::error::Error>` and similar error containers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CoAPServerError {
    /// A network-level failure.
    NetworkError,
    /// A failure of the server's event loop.
    EventLoopError,
    /// A handler is already running.
    AnotherHandlerIsRunning,
    /// An event could not be sent.
    EventSendError,
}

impl std::fmt::Display for CoAPServerError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            CoAPServerError::NetworkError => "network error",
            CoAPServerError::EventLoopError => "event loop error",
            CoAPServerError::AnotherHandlerIsRunning => "another handler is running",
            CoAPServerError::EventSendError => "failed to send event",
        };
        f.write_str(description)
    }
}

// Spelled out in full because `Error` in this file refers to `tokio::io::Error`.
impl std::error::Error for CoAPServerError {}
31

32
use tokio::io::Error;
33

34
#[async_trait]
35
pub trait Dispatcher: Send + Sync {
36
    async fn dispatch(&self, request: CoapRequest<SocketAddr>) -> Option<CoapResponse>;
37
}
38

39
#[async_trait]
40
/// This trait represents a generic way to respond to a listener. If you want to implement your own
41
/// listener, you have to implement this trait to be able to send responses back through the
42
/// correct transport
43
pub trait Responder: Sync + Send {
44
    async fn respond(&self, response: Vec<u8>);
45
    fn address(&self) -> SocketAddr;
46
}
47

48
/// channel to send new requests from a transport to the CoAP server
49
pub type TransportRequestSender = UnboundedSender<(Vec<u8>, Arc<dyn Responder>)>;
50

51
/// channel used by CoAP server to receive new requests
52
pub type TransportRequestReceiver = UnboundedReceiver<(Vec<u8>, Arc<dyn Responder>)>;
53

54
type UdpResponseReceiver = UnboundedReceiver<(Vec<u8>, SocketAddr)>;
55
type UdpResponseSender = UnboundedSender<(Vec<u8>, SocketAddr)>;
56

57
// listeners receive new connections
58
#[async_trait]
59
pub trait Listener: Send {
60
    async fn listen(
61
        self: Box<Self>,
62
        sender: TransportRequestSender,
63
    ) -> std::io::Result<JoinHandle<std::io::Result<()>>>;
64
}
65
/// listener for a UDP socket
66
pub struct UdpCoapListener {
67
    socket: UdpSocket,
68
    multicast_addresses: Vec<IpAddr>,
69
    response_receiver: UdpResponseReceiver,
70
    response_sender: UdpResponseSender,
71
}
72

73
#[async_trait]
74
/// A trait for handling incoming requests. Use this instead of a closure
75
/// if you want to modify some external state
76
pub trait RequestHandler: Send + Sync + 'static {
77
    async fn handle_request(
78
        &self,
79
        mut request: Box<CoapRequest<SocketAddr>>,
80
    ) -> Box<CoapRequest<SocketAddr>>;
81
}
82

83
#[async_trait]
84
impl<F, HandlerRet> RequestHandler for F
85
where
86
    F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
87
    HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
88
{
89
    async fn handle_request(
47✔
90
        &self,
91
        request: Box<CoapRequest<SocketAddr>>,
92
    ) -> Box<CoapRequest<SocketAddr>> {
93
        self(request).await
28✔
94
    }
95
}
96

97
/// A listener for UDP packets. This listener can also subscribe to multicast addresses
98
impl UdpCoapListener {
99
    pub fn new<A: ToSocketAddrs>(addr: A) -> Result<Self, Error> {
2✔
100
        let std_socket = net::UdpSocket::bind(addr)?;
2✔
101
        std_socket.set_nonblocking(true)?;
4✔
102
        let socket = UdpSocket::from_std(std_socket)?;
2✔
103
        Ok(Self::from_socket(socket))
4✔
104
    }
105

106
    pub fn from_socket(socket: tokio::net::UdpSocket) -> Self {
1✔
107
        let (tx, rx) = mpsc::unbounded_channel();
2✔
108
        Self {
109
            socket,
110
            multicast_addresses: Vec::new(),
1✔
111
            response_receiver: rx,
112
            response_sender: tx,
113
        }
114
    }
115

116
    /// join multicast - adds the multicast addresses to the unicast listener
117
    /// - IPv4 multicast address range is '224.0.0.0/4'
118
    /// - IPv6 AllCoAp multicast addresses are 'ff00::/8'
119
    ///
120
    /// Parameter segment is used with IPv6 to determine the first octet.
121
    /// - It's value can be between 0x0 and 0xf.
122
    /// - To join multiple segments, you have to call enable_discovery for each of the segments.
123
    ///
124
    /// Some Multicast address scope
125
    /// IPv6        IPv4 equivalent[16]            Scope                Purpose
126
    /// ffx1::/16    127.0.0.0/8                    Interface-local        Packets with this destination address may not be sent over any network link, but must remain within the current node; this is the multicast equivalent of the unicast loopback address.
127
    /// ffx2::/16    224.0.0.0/24                Link-local            Packets with this destination address may not be routed anywhere.
128
    /// ffx3::/16    239.255.0.0/16                IPv4 local scope
129
    /// ffx4::/16                                Admin-local            The smallest scope that must be administratively configured.
130
    /// ffx5::/16                                Site-local            Restricted to the local physical network.
131
    /// ffx8::/16    239.192.0.0/14                Organization-local    Restricted to networks used by the organization administering the local network. (For example, these addresses might be used over VPNs; when packets for this group are routed over the public internet (where these addresses are not valid), they would have to be encapsulated in some other protocol.)
132
    /// ffxe::/16    224.0.1.0-238.255.255.255    Global scope        Eligible to be routed over the public internet.
133
    ///
134
    /// Notable addresses:
135
    /// ff02::1        All nodes on the local network segment
136
    /// ff0x::c        Simple Service Discovery Protocol
137
    /// ff0x::fb    Multicast DNS
138
    /// ff0x::fb    Multicast CoAP
139
    /// ff0x::114    Used for experiments
140
    //    pub fn join_multicast(&mut self, addr: IpAddr) {
141
    //        self.udp_server.join_multicast(addr);
142
    //    }
143
    pub fn join_multicast(&mut self, addr: IpAddr) {
1✔
144
        assert!(addr.is_multicast());
1✔
145
        // determine wether IPv4 or IPv6 and
146
        // join the appropriate multicast address
147
        match self.socket.local_addr().unwrap() {
1✔
148
            SocketAddr::V4(val) => {
1✔
149
                match addr {
1✔
150
                    IpAddr::V4(ipv4) => {
1✔
151
                        let i = *val.ip();
1✔
152
                        self.socket.join_multicast_v4(ipv4, i).unwrap();
1✔
153
                        self.multicast_addresses.push(addr);
1✔
154
                    }
155
                    IpAddr::V6(_ipv6) => { /* handle IPv6 */ }
×
156
                }
157
            }
158
            SocketAddr::V6(_val) => {
×
159
                match addr {
×
160
                    IpAddr::V4(_ipv4) => { /* handle IPv4 */ }
×
161
                    IpAddr::V6(ipv6) => {
×
162
                        self.socket.join_multicast_v6(&ipv6, 0).unwrap();
×
163
                        self.multicast_addresses.push(addr);
×
164
                        //self.socket.set_only_v6(true)?;
165
                    }
166
                }
167
            }
168
        }
169
    }
170

171
    /// leave multicast - remove the multicast address from the listener
172
    pub fn leave_multicast(&mut self, addr: IpAddr) {
1✔
173
        assert!(addr.is_multicast());
1✔
174
        // determine wether IPv4 or IPv6 and
175
        // leave the appropriate multicast address
176
        match self.socket.local_addr().unwrap() {
1✔
177
            SocketAddr::V4(val) => {
1✔
178
                match addr {
1✔
179
                    IpAddr::V4(ipv4) => {
1✔
180
                        let i = *val.ip();
1✔
181
                        self.socket.leave_multicast_v4(ipv4, i).unwrap();
1✔
182
                        let index = self
1✔
183
                            .multicast_addresses
184
                            .iter()
185
                            .position(|&item| item == addr)
3✔
186
                            .unwrap();
187
                        self.multicast_addresses.remove(index);
1✔
188
                    }
189
                    IpAddr::V6(_ipv6) => { /* handle IPv6 */ }
×
190
                }
191
            }
192
            SocketAddr::V6(_val) => {
×
193
                match addr {
×
194
                    IpAddr::V4(_ipv4) => { /* handle IPv4 */ }
×
195
                    IpAddr::V6(ipv6) => {
×
196
                        self.socket.leave_multicast_v6(&ipv6, 0).unwrap();
×
197
                        let index = self
×
198
                            .multicast_addresses
199
                            .iter()
200
                            .position(|&item| item == addr)
×
201
                            .unwrap();
202
                        self.multicast_addresses.remove(index);
×
203
                    }
204
                }
205
            }
206
        }
207
    }
208
    /// enable AllCoAP multicasts - adds the AllCoap addresses to the listener
209
    /// - IPv4 AllCoAP multicast address is '224.0.1.187'
210
    /// - IPv6 AllCoAp multicast addresses are 'ff0?::fd'
211
    ///
212
    /// Parameter segment is used with IPv6 to determine the first octet.
213
    /// - It's value can be between 0x0 and 0xf.
214
    /// - To join multiple segments, you have to call enable_discovery for each of the segments.
215
    ///
216
    /// For further details see method join_multicast
217
    pub fn enable_all_coap(&mut self, segment: u8) {
1✔
218
        assert!(segment <= 0xf);
1✔
219
        let m = match self.socket.local_addr().unwrap() {
1✔
220
            SocketAddr::V4(_val) => IpAddr::V4(Ipv4Addr::new(224, 0, 1, 187)),
1✔
221
            SocketAddr::V6(_val) => IpAddr::V6(Ipv6Addr::new(
×
222
                0xff00 + segment as u16,
×
223
                0,
224
                0,
225
                0,
226
                0,
227
                0,
228
                0,
229
                0xfd,
230
            )),
231
        };
232
        self.join_multicast(m);
1✔
233
    }
234
}
235
#[async_trait]
236
impl Listener for UdpCoapListener {
237
    async fn listen(
5✔
238
        mut self: Box<Self>,
239
        sender: TransportRequestSender,
240
    ) -> std::io::Result<JoinHandle<std::io::Result<()>>> {
241
        return Ok(tokio::spawn(self.receive_loop(sender)));
3✔
242
    }
243
}
244

245
#[derive(Clone)]
246
struct UdpResponder {
247
    address: SocketAddr, // this is the address we are sending to
248
    tx: UdpResponseSender,
249
}
250

251
#[async_trait]
252
impl Responder for UdpResponder {
253
    async fn respond(&self, response: Vec<u8>) {
4✔
254
        let _ = self.tx.send((response, self.address));
2✔
255
    }
256
    fn address(&self) -> SocketAddr {
1✔
257
        self.address
1✔
258
    }
259
}
260

261
impl UdpCoapListener {
262
    pub async fn receive_loop(mut self, sender: TransportRequestSender) -> std::io::Result<()> {
4✔
263
        loop {
2✔
264
            let mut recv_vec = Vec::with_capacity(u16::MAX as usize);
2✔
265
            select! {
3✔
266
                message =self.socket.recv_buf_from(&mut recv_vec)=> {
267
                    match message {
268
                        Ok((_size, from)) => {
269
                            sender.send((recv_vec, Arc::new(UdpResponder{address: from, tx: self.response_sender.clone()}))).map_err( |_| std::io::Error::other("server channel error"))?;
270
                        }
271
                        Err(e) => {
272
                            return Err(e);
273
                        }
274
                    }
275
                },
276
                response = self.response_receiver.recv() => {
277
                    if let Some((bytes, to)) = response{
278
                        debug!("sending {:?} to {:?}", bytes, to);
279
                        self.socket.send_to(&bytes, to).await?;
280
                    }
281
                    else {
282
                        // in case nobody is listening to us, we can just terminate, though this
283
                        // should never happen for UDP
284
                        return Ok(());
285
                    }
286

287
                }
288
            }
289
        }
290
    }
291
}
292

293
#[derive(Debug)]
294
pub struct QueuedMessage {
295
    pub address: SocketAddr,
296
    pub message: Packet,
297
}
298

299
struct ServerCoapState {
300
    observer: Observer,
301
    block_handler: BlockHandler<SocketAddr>,
302
    disable_observe: bool,
303
}
304

305
/// Outcome of the server's request interception: tells the server loop whether a request
/// still has to be passed to the user-supplied handler.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShouldForwardToHandler {
    /// The request must be passed on to the handler.
    True,
    /// The request was dealt with during interception; the handler is skipped.
    False,
}
309

310
impl ServerCoapState {
311
    pub async fn intercept_request(
1✔
312
        &mut self,
313
        request: &mut CoapRequest<SocketAddr>,
314
        responder: Arc<dyn Responder>,
315
    ) -> ShouldForwardToHandler {
316
        match self.block_handler.intercept_request(request) {
3✔
317
            Ok(true) => return ShouldForwardToHandler::False,
1✔
318
            Err(_err) => return ShouldForwardToHandler::False,
×
319
            Ok(false) => {}
320
        };
321

322
        if self.disable_observe {
1✔
323
            return ShouldForwardToHandler::True;
1✔
324
        }
325

326
        let should_be_forwarded = self.observer.request_handler(request, responder).await;
2✔
327
        if should_be_forwarded {
2✔
328
            ShouldForwardToHandler::True
1✔
329
        } else {
330
            ShouldForwardToHandler::False
1✔
331
        }
332
    }
333

334
    pub async fn intercept_response(&mut self, request: &mut CoapRequest<SocketAddr>) {
4✔
335
        let resource_path = request.get_path();
1✔
336

337
        let is_block_fetch_for_observer = request.message.get_option(CoapOption::Block2).is_some()
3✔
338
            && request.message.get_option(CoapOption::Observe).is_none()
1✔
339
            && request.source.is_some()
1✔
340
            && self
2✔
341
                .observer
342
                .is_observing(&request.source.unwrap(), &resource_path);
2✔
343

344
        if is_block_fetch_for_observer {
1✔
345
            if let Some((payload, etag)) =
1✔
346
                self.observer.get_resource_payload_and_etag(&resource_path)
347
            {
348
                if let Some(ref mut response) = request.response {
2✔
349
                    response.message.payload = payload.to_vec();
2✔
350
                    response.message.clear_option(CoapOption::ETag);
1✔
351
                    response.message.add_option(CoapOption::ETag, etag);
1✔
352
                    // Prevent duplicate Size2 options, clear first.
353
                    response.message.clear_option(CoapOption::Size2);
1✔
354
                    response
1✔
355
                        .message
356
                        .add_option(CoapOption::Size2, encode_coap_uint(payload.len()));
1✔
357
                }
358
            }
359
        }
360

361
        if let Err(err) = self.block_handler.intercept_response(request) {
3✔
362
            let _ = request.apply_from_error(err);
×
363
        }
364
    }
365

366
    pub fn new() -> Self {
1✔
367
        Self {
368
            observer: Observer::new(),
1✔
369
            block_handler: BlockHandler::new(BlockHandlerConfig::default()),
2✔
370
            disable_observe: false,
371
        }
372
    }
373
    pub fn disable_observe_handling(&mut self, value: bool) {
1✔
374
        self.disable_observe = value
1✔
375
    }
376
}
377

378
pub struct Server {
379
    listeners: Vec<Box<dyn Listener>>,
380
    coap_state: Arc<Mutex<ServerCoapState>>,
381
    new_packet_receiver: TransportRequestReceiver,
382
    new_packet_sender: TransportRequestSender,
383
}
384

385
impl Server {
386
    /// Creates a CoAP server listening on the given address.
387
    pub fn new_udp<A: ToSocketAddrs>(addr: A) -> Result<Self, io::Error> {
1✔
388
        let listener: Vec<Box<dyn Listener>> = vec![Box::new(UdpCoapListener::new(addr)?)];
2✔
389
        Ok(Self::from_listeners(listener))
2✔
390
    }
391

392
    pub fn from_listeners(listeners: Vec<Box<dyn Listener>>) -> Self {
1✔
393
        let (tx, rx) = mpsc::unbounded_channel();
2✔
394
        Server {
395
            listeners,
396
            coap_state: Arc::new(Mutex::new(ServerCoapState::new())),
2✔
397
            new_packet_receiver: rx,
398
            new_packet_sender: tx,
399
        }
400
    }
401

402
    async fn spawn_handles(
1✔
403
        listeners: Vec<Box<dyn Listener>>,
404
        sender: TransportRequestSender,
405
    ) -> std::io::Result<Vec<JoinHandle<std::io::Result<()>>>> {
406
        let mut handles = vec![];
1✔
407
        for listener in listeners.into_iter() {
4✔
408
            let handle = listener.listen(sender.clone()).await?;
3✔
409
            handles.push(handle);
1✔
410
        }
411
        Ok(handles)
1✔
412
    }
413

414
    /// run the server.
415
    pub async fn run<Handler: RequestHandler>(mut self, handler: Handler) -> Result<(), io::Error> {
64✔
416
        let _handles = Self::spawn_handles(self.listeners, self.new_packet_sender.clone()).await?;
26✔
417

418
        let handler_arc = Arc::new(handler);
26✔
419
        // receive an input, sync our cache / states, then call custom handler
420
        loop {
421
            let (bytes, respond) = self
62✔
NEW
422
                .new_packet_receiver
×
423
                .recv()
424
                .await
76✔
425
                .ok_or_else(|| std::io::Error::other("listen channel closed"))?;
12✔
426
            if let Ok(packet) = Packet::from_bytes(&bytes) {
24✔
427
                let mut request = Box::new(CoapRequest::<SocketAddr>::from_packet(
36✔
428
                    packet,
12✔
429
                    respond.address(),
24✔
430
                ));
431
                let mut coap_state = self.coap_state.lock().await;
24✔
432
                let should_forward = coap_state
36✔
433
                    .intercept_request(&mut request, respond.clone())
12✔
434
                    .await;
48✔
435

436
                match should_forward {
12✔
437
                    ShouldForwardToHandler::True => {
438
                        let handler_clone = handler_arc.clone();
24✔
439
                        let coap_state_clone = self.coap_state.clone();
24✔
440
                        tokio::spawn(async move {
37✔
441
                            request = handler_clone.handle_request(request).await;
26✔
442
                            coap_state_clone
55✔
443
                                .lock()
11✔
444
                                .await
34✔
445
                                .intercept_response(request.as_mut())
11✔
446
                                .await;
22✔
447

448
                            Self::respond_to_request(request, respond).await;
11✔
449
                        });
450
                    }
451
                    ShouldForwardToHandler::False => {
452
                        Self::respond_to_request(request, respond).await;
12✔
453
                    }
454
                }
455
            }
456
        }
457
    }
458

459
    #[cfg(feature = "router")]
460
    pub async fn serve<S>(self, router: Router<S>) -> Result<(), io::Error>
4✔
461
    where
462
        S: Clone + Send + Sync + 'static,
463
    {
464
        let router = Arc::new(router);
4✔
NEW
465
        let handler = {
×
466
            move |req| {
4✔
467
                let r = router.clone();
4✔
468
                let req = Request::new(req);
2✔
469
                async move { r.handle(req).await.req }
6✔
470
            }
471
        };
472
        self.run(handler).await
6✔
473
    }
474

475
    async fn respond_to_request(req: Box<CoapRequest<SocketAddr>>, responder: Arc<dyn Responder>) {
4✔
476
        // if we have some reponse to send, send it
477
        if let Some(Ok(b)) = req.response.map(|resp| resp.message.to_bytes()) {
6✔
478
            responder.respond(b).await;
2✔
479
        }
480
    }
481
    #[deprecated(
482
        since = "0.21.0",
483
        note = "Use 'coap::Server::automatic_observe_handling' instead."
484
    )]
485
    /// disable auto-observe handling in server
UNCOV
486
    pub async fn disable_observe_handling(&mut self, value: bool) {
×
UNCOV
487
        self.automatic_observe_handling(value).await
×
488
    }
489
    /// Controls whether the server automatically handles observe options.
490
    /// Automatic handling is on by default.
491
    ///
492
    /// Set `bypass` to `true` when your handler needs full control over
493
    /// observe — the server will skip its built-in processing.
494
    pub async fn automatic_observe_handling(&mut self, bypass: bool) {
4✔
495
        let mut coap_state = self.coap_state.lock().await;
2✔
496
        coap_state.disable_observe_handling(bypass)
2✔
497
    }
498
}
499

500
#[cfg(test)]
501
pub mod test {
502
    use crate::request::RequestBuilder;
503

504
    use super::super::*;
505
    use super::*;
506
    use coap_lite::{block_handler::BlockValue, CoapOption, RequestType};
507
    use std::str;
508
    use std::time::Duration;
509

510
    pub fn spawn_server<
511
        F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
512
        HandlerRet,
513
    >(
514
        ip: &'static str,
515
        request_handler: F,
516
    ) -> mpsc::UnboundedReceiver<u16>
517
    where
518
        HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
519
    {
520
        let (tx, rx) = mpsc::unbounded_channel();
521
        let _task = tokio::spawn(async move {
522
            let sock = UdpSocket::bind(ip).await.unwrap();
523
            let addr = sock.local_addr().unwrap();
524
            let listener = Box::new(UdpCoapListener::from_socket(sock));
525
            let server = Server::from_listeners(vec![listener]);
526
            tx.send(addr.port()).unwrap();
527
            server.run(request_handler).await.unwrap();
528
        });
529

530
        rx
531
    }
532

533
    async fn request_handler(
534
        mut req: Box<CoapRequest<SocketAddr>>,
535
    ) -> Box<CoapRequest<SocketAddr>> {
536
        let uri_path_list = req.message.get_option(CoapOption::UriPath).unwrap().clone();
537
        assert_eq!(uri_path_list.len(), 1);
538

539
        if let Some(ref mut response) = req.response {
540
            response.message.payload = uri_path_list.front().unwrap().clone();
541
        }
542
        req
543
    }
544

545
    pub fn spawn_server_with_all_coap<
546
        F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
547
        HandlerRet,
548
    >(
549
        ip: &'static str,
550
        request_handler: F,
551
        segment: u8,
552
    ) -> mpsc::UnboundedReceiver<u16>
553
    where
554
        HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
555
    {
556
        let (tx, rx) = mpsc::unbounded_channel();
557

558
        std::thread::Builder::new()
559
            .name(String::from("v4-server"))
560
            .spawn(move || {
561
                tokio::runtime::Runtime::new()
562
                    .unwrap()
563
                    .block_on(async move {
564
                        // multicast needs a server on a real interface
565
                        let sock = UdpSocket::bind((ip, 0)).await.unwrap();
566
                        let addr = sock.local_addr().unwrap();
567
                        let mut listener = Box::new(UdpCoapListener::from_socket(sock));
568
                        listener.enable_all_coap(segment);
569
                        let server = Server::from_listeners(vec![listener]);
570
                        tx.send(addr.port()).unwrap();
571
                        server.run(request_handler).await.unwrap();
572
                    })
573
            })
574
            .unwrap();
575

576
        rx
577
    }
578

579
    pub fn spawn_server_disable_observe<
580
        F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
581
        HandlerRet,
582
    >(
583
        ip: &'static str,
584
        request_handler: F,
585
    ) -> mpsc::UnboundedReceiver<u16>
586
    where
587
        HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
588
    {
589
        let (tx, rx) = mpsc::unbounded_channel();
590
        let _task = tokio::spawn(async move {
591
            let sock = UdpSocket::bind(ip).await.unwrap();
592
            let addr = sock.local_addr().unwrap();
593
            let listener = Box::new(UdpCoapListener::from_socket(sock));
594
            let mut server = Server::from_listeners(vec![listener]);
595
            // `bypass = true` sets the internal `disable_observe` flag,
596
            // so the server skips its built-in observe handling.
597
            server.automatic_observe_handling(true).await;
598
            tx.send(addr.port()).unwrap();
599
            server.run(request_handler).await.unwrap();
600
        });
601

602
        rx
603
    }
604

605
    #[tokio::test]
606
    async fn test_listener_instantiation() {
607
        let listener = UdpCoapListener::new("127.0.0.1:0").unwrap();
608
        assert!(
609
            listener.socket.local_addr().unwrap().ip() == IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
610
        );
611
        // assert!(listener.socket.blocking() == false);
612

613
        let explicit_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
614
        let another_listener = UdpCoapListener::from_socket(explicit_socket);
615
        assert!(
616
            another_listener.socket.local_addr().unwrap().ip()
617
                == IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
618
        );
619
    }
620

621
    #[tokio::test]
622
    async fn test_echo_server() {
623
        let server_port = spawn_server("127.0.0.1:0", request_handler)
624
            .recv()
625
            .await
626
            .unwrap();
627

628
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
629
            .await
630
            .unwrap();
631
        let mut request = CoapRequest::new();
632
        request.message.header.set_version(1);
633
        request
634
            .message
635
            .header
636
            .set_type(coap_lite::MessageType::Confirmable);
637
        request.message.header.set_code("0.01");
638
        request.message.header.message_id = 1;
639
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
640
        request
641
            .message
642
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
643
        client.send_single_request(&request).await.unwrap();
644

645
        let recv_packet = client.send(request).await.unwrap();
646
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
647
    }
648

649
    #[tokio::test]
650
    async fn test_put_block() {
651
        let server_port = spawn_server("127.0.0.1:0", request_handler)
652
            .recv()
653
            .await
654
            .unwrap();
655
        let data = "hello this is a payload";
656
        let mut v = Vec::new();
657
        for _ in 0..1024 {
658
            v.extend_from_slice(data.as_bytes());
659
        }
660
        let payload_size = v.len();
661
        let server_string = format!("127.0.0.1:{}", server_port);
662
        let client = UdpCoAPClient::new(server_string.clone()).await.unwrap();
663

664
        let request = RequestBuilder::new("/large", RequestType::Put)
665
            .data(Some(v))
666
            .domain(server_string.clone())
667
            .build();
668

669
        let resp = client.send(request).await.unwrap();
670
        let block_opt = resp
671
            .message
672
            .get_first_option_as::<BlockValue>(CoapOption::Block1)
673
            .expect("expected block opt in response")
674
            .expect("could not decode block1 option");
675
        let expected_number = (payload_size as f32 / 1024.0).ceil() as u16 - 1;
676
        assert_eq!(
677
            block_opt.num, expected_number,
678
            "block not completely received!"
679
        );
680

681
        assert_eq!(resp.message.payload, b"large".to_vec());
682
    }
683

684
    #[tokio::test]
685
    #[ignore]
686
    async fn test_echo_server_v6() {
687
        let server_port = spawn_server("::1:0", request_handler).recv().await.unwrap();
688

689
        let client = UdpCoAPClient::new(format!("::1:{}", server_port))
690
            .await
691
            .unwrap();
692
        let mut request = CoapRequest::new();
693
        request.message.header.set_version(1);
694
        request
695
            .message
696
            .header
697
            .set_type(coap_lite::MessageType::Confirmable);
698
        request.message.header.set_code("0.01");
699
        request.message.header.message_id = 1;
700
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
701
        request
702
            .message
703
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
704

705
        let recv_packet = client.send(request).await.unwrap();
706
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
707
    }
708

709
    #[tokio::test]
710
    async fn test_echo_server_no_token() {
711
        let server_port = spawn_server("127.0.0.1:0", request_handler)
712
            .recv()
713
            .await
714
            .unwrap();
715

716
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
717
            .await
718
            .unwrap();
719
        let mut packet = CoapRequest::new();
720
        packet.message.header.set_version(1);
721
        packet
722
            .message
723
            .header
724
            .set_type(coap_lite::MessageType::Confirmable);
725
        packet.message.header.set_code("0.01");
726
        packet.message.header.message_id = 1;
727
        packet
728
            .message
729
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
730
        let recv_packet = client.send(packet).await.unwrap();
731
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
732
    }
733

734
    #[tokio::test]
735
    #[ignore]
736
    async fn test_echo_server_no_token_v6() {
737
        let server_port = spawn_server("::1:0", request_handler).recv().await.unwrap();
738

739
        let client = UdpCoAPClient::new(format!("::1:{}", server_port))
740
            .await
741
            .unwrap();
742
        let mut packet = CoapRequest::new();
743
        packet.message.header.set_version(1);
744
        packet
745
            .message
746
            .header
747
            .set_type(coap_lite::MessageType::Confirmable);
748
        packet.message.header.set_code("0.01");
749
        packet.message.header.message_id = 1;
750
        packet
751
            .message
752
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
753

754
        let recv_packet = client.send(packet).await.unwrap();
755
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
756
    }
757

758
    #[tokio::test]
759
    async fn test_update_resource() {
760
        let path = "/test";
761
        let payload1 = b"data1".to_vec();
762
        let payload2 = b"data2".to_vec();
763
        let (tx, mut rx) = mpsc::unbounded_channel();
764
        let (tx2, mut rx2) = mpsc::unbounded_channel();
765
        let mut step = 1;
766

767
        let server_port = spawn_server("127.0.0.1:0", request_handler)
768
            .recv()
769
            .await
770
            .unwrap();
771

772
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
773
            .await
774
            .unwrap();
775

776
        tx.send(step).unwrap();
777
        let mut request = CoapRequest::new();
778
        request.set_method(RequestType::Put);
779
        request.set_path(path);
780
        request.message.payload = payload1.clone();
781
        client.send(request.clone()).await.unwrap();
782

783
        let mut receive_step = 1;
784
        let payload1_clone = payload1.clone();
785
        let payload2_clone = payload2.clone();
786
        client
787
            .observe(path, move |msg| {
788
                if let Ok(n) = rx.try_recv() {
789
                    receive_step = n
790
                }
791

792
                match receive_step {
793
                    1 => assert_eq!(msg.payload, payload1_clone),
794
                    2 => {
795
                        assert_eq!(msg.payload, payload2_clone);
796
                        tx2.send(()).unwrap();
797
                    }
798
                    _ => panic!("unexpected step"),
799
                }
800
            })
801
            .await
802
            .unwrap();
803

804
        step = 2;
805
        tx.send(step).unwrap();
806
        request.message.payload = payload2.clone();
807
        let client2 = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
808
            .await
809
            .unwrap();
810
        let _ = client2.send(request).await.unwrap();
811
        assert_eq!(
812
            tokio::time::timeout(Duration::new(5, 0), rx2.recv())
813
                .await
814
                .unwrap(),
815
            Some(())
816
        );
817
    }
818

819
    #[tokio::test]
820
    async fn test_observe_transparent_transmission() {
821
        let path = "/test";
822
        let (tx, mut rx) = mpsc::unbounded_channel();
823

824
        let server_port = spawn_server_disable_observe("127.0.0.1:0", request_handler)
825
            .recv()
826
            .await
827
            .unwrap();
828

829
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
830
            .await
831
            .unwrap();
832

833
        client
834
            .observe(path, move |msg| {
835
                assert_eq!(msg.payload, b"test".to_vec());
836
                tx.send(()).unwrap();
837
            })
838
            .await
839
            .unwrap();
840

841
        assert_eq!(
842
            tokio::time::timeout(Duration::new(5, 0), rx.recv())
843
                .await
844
                .unwrap(),
845
            Some(())
846
        );
847
    }
848

849
    #[tokio::test]
850
    async fn multicast_server_all_coap() {
851
        // segment not relevant with IPv4
852
        let segment = 0x0;
853
        let server_port = spawn_server_with_all_coap("0.0.0.0", request_handler, segment)
854
            .recv()
855
            .await
856
            .unwrap();
857

858
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
859
            .await
860
            .unwrap();
861
        let mut request = CoapRequest::new();
862
        request.message.header.set_version(1);
863
        request
864
            .message
865
            .header
866
            .set_type(coap_lite::MessageType::Confirmable);
867
        request.message.header.set_code("0.01");
868
        request.message.header.message_id = 1;
869
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
870
        request
871
            .message
872
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
873
        let recv_packet = client.send(request).await.unwrap();
874

875
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
876

877
        let client = UdpCoAPClient::new(format!("224.0.1.187:{}", server_port))
878
            .await
879
            .unwrap();
880
        let mut request = RequestBuilder::new("test-echo", RequestType::Get)
881
            .data(Some(vec![0x51, 0x55, 0x77, 0xE8]))
882
            .confirmable(true)
883
            .build();
884

885
        let mut receiver = client.create_receiver_for(&request).await;
886
        client.send_all_coap(&mut request, segment).await.unwrap();
887
        let recv_packet = receiver.receive().await.unwrap();
888
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
889
    }
890

891
    //This test right now does not work on windows
892
    #[cfg(unix)]
893
    #[tokio::test]
894
    #[ignore]
895
    async fn multicast_server_all_coap_v6() {
896
        // use segment 0x04 which should be the smallest administered scope
897

898
        let segment = 0x04;
899
        let server_port = spawn_server_with_all_coap("::0", request_handler, segment)
900
            .recv()
901
            .await
902
            .unwrap();
903

904
        let client = UdpCoAPClient::new(format!("::1:{}", server_port))
905
            .await
906
            .unwrap();
907
        let mut request = CoapRequest::new();
908
        request.message.header.set_version(1);
909
        request
910
            .message
911
            .header
912
            .set_type(coap_lite::MessageType::Confirmable);
913
        request.message.header.set_code("0.01");
914
        request.message.header.message_id = 1;
915
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
916
        request
917
            .message
918
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
919
        client.send_single_request(&request).await.unwrap();
920

921
        let recv_packet = client.send(request).await.unwrap();
922
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
923

924
        // use 0xff02 to keep it within this network
925
        let client = UdpCoAPClient::new(format!("ff0{}::fd:{}", segment, server_port))
926
            .await
927
            .unwrap();
928
        let mut request = CoapRequest::new();
929
        request.message.header.set_version(1);
930
        request
931
            .message
932
            .header
933
            .set_type(coap_lite::MessageType::NonConfirmable);
934
        request.message.header.set_code("0.01");
935
        request.message.header.message_id = 2;
936
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
937
        request
938
            .message
939
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
940
        let mut receiver = client.create_receiver_for(&request).await;
941
        client.send_all_coap(&mut request, segment).await.unwrap();
942
        let recv_packet = receiver.receive().await.unwrap();
943
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
944
    }
945

946
    /// Starts a server on a background thread whose listener joins two IPv4 multicast groups
    /// and then leaves them again before the server runs.
    ///
    /// The spawned thread is never joined; the test thread only waits one second after
    /// spawning it.
    #[test]
    fn multicast_join_leave() {
        std::thread::Builder::new()
            .name(String::from("v4-server"))
            .spawn(move || {
                let runtime = tokio::runtime::Runtime::new().unwrap();
                runtime.block_on(async move {
                    // multicast needs a server on a real interface
                    let sock = UdpSocket::bind(("0.0.0.0", 0)).await.unwrap();
                    let mut listener = Box::new(UdpCoapListener::from_socket(sock));

                    let groups = [Ipv4Addr::new(224, 0, 1, 1), Ipv4Addr::new(224, 1, 1, 1)];
                    // Join both groups first, then leave them in the same order.
                    for &group in &groups {
                        listener.join_multicast(IpAddr::V4(group));
                    }
                    for &group in &groups {
                        listener.leave_multicast(IpAddr::V4(group));
                    }

                    let server = Server::from_listeners(vec![listener]);
                    server.run(request_handler).await.unwrap();
                })
            })
            .unwrap();

        std::thread::sleep(std::time::Duration::from_secs(1));
    }
969
    /// Starts a server on a background thread whose listener joins two IPv6 multicast groups
    /// (`ff02::1:1` and `ff02::1:0:2`) and then leaves both again before the server runs.
    ///
    /// The spawned thread is never joined; the test thread only waits one second after
    /// spawning it. Marked `#[ignore]`, so it only runs when ignored tests are requested.
    #[test]
    #[ignore]
    fn multicast_join_leave_v6() {
        std::thread::Builder::new()
            .name(String::from("v6-server"))
            .spawn(move || {
                tokio::runtime::Runtime::new()
                    .unwrap()
                    .block_on(async move {
                        // multicast needs a server on a real interface; the groups below are
                        // IPv6, so bind the IPv6 wildcard address instead of "0.0.0.0"
                        let sock = UdpSocket::bind(("::", 0)).await.unwrap();
                        let mut listener = Box::new(UdpCoapListener::from_socket(sock));

                        let first_group = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 1, 0x1);
                        let second_group = Ipv6Addr::new(0xff02, 0, 0, 0, 0, 1, 0, 0x2);
                        listener.join_multicast(IpAddr::V6(first_group));
                        listener.join_multicast(IpAddr::V6(second_group));
                        listener.leave_multicast(IpAddr::V6(first_group));
                        // Leave (not re-join) the second group, mirroring the IPv4 test.
                        listener.leave_multicast(IpAddr::V6(second_group));

                        let server = Server::from_listeners(vec![listener]);
                        server.run(request_handler).await.unwrap();
                    })
            })
            .unwrap();

        std::thread::sleep(std::time::Duration::from_secs(1));
    }
1001

1002
    /// Builds the reference payload used by the block-wise transfer test.
    ///
    /// The result is 1024 bytes of `a`, followed by 1024 bytes of `b`, and so on up to `z`,
    /// for a total of 26 * 1024 bytes.
    fn get_expected_response() -> Vec<u8> {
        /// Number of times each letter is repeated.
        const RUN_LEN: usize = 1024;
        /// Number of letters in `a..=z`.
        const LETTERS: usize = (b'z' - b'a' + 1) as usize;

        // The final size is known up front, so allocate once instead of regrowing the
        // buffer as each run is appended.
        let mut resp = Vec::with_capacity(RUN_LEN * LETTERS);
        for c in b'a'..=b'z' {
            resp.resize(resp.len() + RUN_LEN, c);
        }
        resp
    }
1009
    async fn block2_responder(
1010
        mut req: Box<CoapRequest<SocketAddr>>,
1011
    ) -> Box<CoapRequest<SocketAddr>> {
1012
        // vec should contain 'a' 1024 times, then 'b' 1024, up to ascii 'z'
1013

1014
        if let Some(ref mut response) = req.response {
1015
            response.message.payload = get_expected_response();
1016
        }
1017
        req
1018
    }
1019
    #[tokio::test]
1020
    async fn test_block2_server_response() {
1021
        let server_port = spawn_server("127.0.0.1:0", block2_responder)
1022
            .recv()
1023
            .await
1024
            .unwrap();
1025

1026
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
1027
            .await
1028
            .unwrap();
1029
        let resp = client
1030
            .send(RequestBuilder::new("/", RequestType::Get).build())
1031
            .await
1032
            .unwrap();
1033
        assert_eq!(
1034
            resp.message.payload,
1035
            get_expected_response(),
1036
            "responses do not match"
1037
        );
1038
    }
1039
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc