• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Covertness / coap-rs / 25328171359

04 May 2026 03:36PM UTC coverage: 84.615% (-0.7%) from 85.288%
25328171359

Pull #130

github

web-flow
Merge f5a864f79 into 8b5e53778
Pull Request #130: Fix all Clippy warnings and enforce Clippy in CI

64 of 77 new or added lines in 5 files covered. (83.12%)

3 existing lines in 2 files now uncovered.

836 of 988 relevant lines covered (84.62%)

4.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.82
/src/server.rs
1
use async_trait::async_trait;
2
use coap_lite::{BlockHandler, BlockHandlerConfig, CoapOption, CoapRequest, CoapResponse, Packet};
3
use log::debug;
4
use std::{
5
    future::Future,
6
    net::{self, IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs},
7
    sync::Arc,
8
};
9
use tokio::{
10
    io,
11
    net::UdpSocket,
12
    select,
13
    sync::{
14
        mpsc::{self, UnboundedReceiver, UnboundedSender},
15
        Mutex,
16
    },
17
    task::JoinHandle,
18
};
19

20
use crate::observer::{encode_coap_uint, Observer};
21

22
/// Error categories exposed by the CoAP server module.
///
/// NOTE(review): none of these variants is constructed in the visible part of this
/// file, so their exact meaning is inferred from the names only; confirm against callers.
#[derive(Debug)]
23
pub enum CoAPServerError {
24
    NetworkError,
25
    EventLoopError,
26
    AnotherHandlerIsRunning,
27
    EventSendError,
28
}
29

30
use tokio::io::Error;
31

32
/// Asynchronously maps one incoming request to an optional response.
///
/// NOTE(review): the trait is not used in the visible part of this file; `None`
/// presumably means "send nothing back"; confirm against implementors.
#[async_trait]
33
pub trait Dispatcher: Send + Sync {
34
    async fn dispatch(&self, request: CoapRequest<SocketAddr>) -> Option<CoapResponse>;
35
}
36

37
#[async_trait]
38
/// This trait represents a generic way to respond to a listener. If you want to implement your own
39
/// listener, you have to implement this trait to be able to send responses back through the
40
/// correct transport
41
pub trait Responder: Sync + Send {
42
    /// Sends the already serialized response bytes back over the transport.
    async fn respond(&self, response: Vec<u8>);
43
    /// Address of the peer; the server uses it as the source of the decoded request.
    fn address(&self) -> SocketAddr;
44
}
45

46
/// channel to send new requests from a transport to the CoAP server
47
pub type TransportRequestSender = UnboundedSender<(Vec<u8>, Arc<dyn Responder>)>;
48

49
/// channel used by CoAP server to receive new requests
50
pub type TransportRequestReceiver = UnboundedReceiver<(Vec<u8>, Arc<dyn Responder>)>;
51

52
type UdpResponseReceiver = UnboundedReceiver<(Vec<u8>, SocketAddr)>;
53
type UdpResponseSender = UnboundedSender<(Vec<u8>, SocketAddr)>;
54

55
// listeners receive new connections
56
/// A transport that feeds raw requests into the server.
#[async_trait]
57
pub trait Listener: Send {
58
    /// Consumes the boxed listener and starts it.
    ///
    /// `sender` is the channel on which the listener hands each received datagram,
    /// together with a [`Responder`] for the peer, to the server. On success the
    /// returned handle belongs to the task that keeps the listener running (the UDP
    /// implementation in this file spawns its receive loop).
    async fn listen(
59
        self: Box<Self>,
60
        sender: TransportRequestSender,
61
    ) -> std::io::Result<JoinHandle<std::io::Result<()>>>;
62
}
63
/// listener for a UDP socket
64
pub struct UdpCoapListener {
65
    // socket used both to receive requests and to send responses
    socket: UdpSocket,
66
    // multicast groups recorded by `join_multicast` and removed by `leave_multicast`
    multicast_addresses: Vec<IpAddr>,
67
    // outgoing (bytes, destination) pairs, drained by `receive_loop`
    response_receiver: UdpResponseReceiver,
68
    // sending half of the channel above; cloned into every `UdpResponder`
    response_sender: UdpResponseSender,
69
}
70

71
#[async_trait]
72
/// A trait for handling incoming requests. Use this instead of a closure
73
/// if you want to modify some external state
74
pub trait RequestHandler: Send + Sync + 'static {
75
    /// Takes ownership of the boxed request and hands it back once processed.
    ///
    /// The server sends whatever `response` the returned request carries; if it
    /// carries none, nothing is sent.
    async fn handle_request(
76
        &self,
77
        mut request: Box<CoapRequest<SocketAddr>>,
78
    ) -> Box<CoapRequest<SocketAddr>>;
79
}
80

81
/// Blanket implementation: any `Fn(Box<CoapRequest<SocketAddr>>)` closure or async
/// function whose returned future yields the boxed request is a [`RequestHandler`].
#[async_trait]
82
impl<F, HandlerRet> RequestHandler for F
83
where
84
    F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
85
    HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
86
{
87
    /// Calls the wrapped function with the request and awaits its future.
    async fn handle_request(
35✔
88
        &self,
89
        request: Box<CoapRequest<SocketAddr>>,
90
    ) -> Box<CoapRequest<SocketAddr>> {
91
        self(request).await
22✔
92
    }
93
}
94

95
/// A listener for UDP packets. This listener can also subscribe to multicast addresses
96
impl UdpCoapListener {
97
    pub fn new<A: ToSocketAddrs>(addr: A) -> Result<Self, Error> {
1✔
98
        let std_socket = net::UdpSocket::bind(addr)?;
1✔
99
        std_socket.set_nonblocking(true)?;
2✔
100
        let socket = UdpSocket::from_std(std_socket)?;
1✔
101
        Ok(Self::from_socket(socket))
2✔
102
    }
103

104
    pub fn from_socket(socket: tokio::net::UdpSocket) -> Self {
1✔
105
        let (tx, rx) = mpsc::unbounded_channel();
2✔
106
        Self {
107
            socket,
108
            multicast_addresses: Vec::new(),
1✔
109
            response_receiver: rx,
110
            response_sender: tx,
111
        }
112
    }
113

114
    /// join multicast - adds the multicast addresses to the unicast listener
115
    /// - IPv4 multicast address range is '224.0.0.0/4'
116
    /// - IPv6 multicast address range is 'ff00::/8'
117
    ///
118
    /// Parameter segment is used with IPv6 to determine the first octet.
119
    /// - Its value can be between 0x0 and 0xf.
120
    /// - To join multiple segments, you have to call enable_all_coap once for each segment.
121
    ///
122
    /// Some Multicast address scope
123
    /// IPv6        IPv4 equivalent[16]            Scope                Purpose
124
    /// ffx1::/16    127.0.0.0/8                    Interface-local        Packets with this destination address may not be sent over any network link, but must remain within the current node; this is the multicast equivalent of the unicast loopback address.
125
    /// ffx2::/16    224.0.0.0/24                Link-local            Packets with this destination address may not be routed anywhere.
126
    /// ffx3::/16    239.255.0.0/16                IPv4 local scope
127
    /// ffx4::/16                                Admin-local            The smallest scope that must be administratively configured.
128
    /// ffx5::/16                                Site-local            Restricted to the local physical network.
129
    /// ffx8::/16    239.192.0.0/14                Organization-local    Restricted to networks used by the organization administering the local network. (For example, these addresses might be used over VPNs; when packets for this group are routed over the public internet (where these addresses are not valid), they would have to be encapsulated in some other protocol.)
130
    /// ffxe::/16    224.0.1.0-238.255.255.255    Global scope        Eligible to be routed over the public internet.
131
    ///
132
    /// Notable addresses:
133
    /// ff02::1        All nodes on the local network segment
134
    /// ff0x::c        Simple Service Discovery Protocol
135
    /// ff0x::fb    Multicast DNS
136
    /// ff0x::fd    Multicast CoAP
137
    /// ff0x::114    Used for experiments
138
    //    pub fn join_multicast(&mut self, addr: IpAddr) {
139
    //        self.udp_server.join_multicast(addr);
140
    //    }
141
    pub fn join_multicast(&mut self, addr: IpAddr) {
1✔
142
        assert!(addr.is_multicast());
1✔
143
        // determine wether IPv4 or IPv6 and
144
        // join the appropriate multicast address
145
        match self.socket.local_addr().unwrap() {
1✔
146
            SocketAddr::V4(val) => {
1✔
147
                match addr {
1✔
148
                    IpAddr::V4(ipv4) => {
1✔
149
                        let i = *val.ip();
1✔
150
                        self.socket.join_multicast_v4(ipv4, i).unwrap();
1✔
151
                        self.multicast_addresses.push(addr);
1✔
152
                    }
153
                    IpAddr::V6(_ipv6) => { /* handle IPv6 */ }
×
154
                }
155
            }
156
            SocketAddr::V6(_val) => {
×
157
                match addr {
×
158
                    IpAddr::V4(_ipv4) => { /* handle IPv4 */ }
×
159
                    IpAddr::V6(ipv6) => {
×
160
                        self.socket.join_multicast_v6(&ipv6, 0).unwrap();
×
161
                        self.multicast_addresses.push(addr);
×
162
                        //self.socket.set_only_v6(true)?;
163
                    }
164
                }
165
            }
166
        }
167
    }
168

169
    /// leave multicast - remove the multicast address from the listener
170
    pub fn leave_multicast(&mut self, addr: IpAddr) {
1✔
171
        assert!(addr.is_multicast());
1✔
172
        // determine wether IPv4 or IPv6 and
173
        // leave the appropriate multicast address
174
        match self.socket.local_addr().unwrap() {
1✔
175
            SocketAddr::V4(val) => {
1✔
176
                match addr {
1✔
177
                    IpAddr::V4(ipv4) => {
1✔
178
                        let i = *val.ip();
1✔
179
                        self.socket.leave_multicast_v4(ipv4, i).unwrap();
1✔
180
                        let index = self
1✔
181
                            .multicast_addresses
182
                            .iter()
183
                            .position(|&item| item == addr)
3✔
184
                            .unwrap();
185
                        self.multicast_addresses.remove(index);
1✔
186
                    }
187
                    IpAddr::V6(_ipv6) => { /* handle IPv6 */ }
×
188
                }
189
            }
190
            SocketAddr::V6(_val) => {
×
191
                match addr {
×
192
                    IpAddr::V4(_ipv4) => { /* handle IPv4 */ }
×
193
                    IpAddr::V6(ipv6) => {
×
194
                        self.socket.leave_multicast_v6(&ipv6, 0).unwrap();
×
195
                        let index = self
×
196
                            .multicast_addresses
197
                            .iter()
198
                            .position(|&item| item == addr)
×
199
                            .unwrap();
200
                        self.multicast_addresses.remove(index);
×
201
                    }
202
                }
203
            }
204
        }
205
    }
206
    /// enable AllCoAP multicasts - adds the AllCoap addresses to the listener
207
    /// - IPv4 AllCoAP multicast address is '224.0.1.187'
208
    /// - IPv6 AllCoAp multicast addresses are 'ff0?::fd'
209
    ///
210
    /// Parameter segment is used with IPv6 to determine the first octet.
211
    /// - Its value can be between 0x0 and 0xf.
212
    /// - To join multiple segments, you have to call enable_all_coap once for each segment.
213
    ///
214
    /// For further details see method join_multicast
215
    pub fn enable_all_coap(&mut self, segment: u8) {
1✔
216
        assert!(segment <= 0xf);
1✔
217
        let m = match self.socket.local_addr().unwrap() {
1✔
218
            SocketAddr::V4(_val) => IpAddr::V4(Ipv4Addr::new(224, 0, 1, 187)),
1✔
219
            SocketAddr::V6(_val) => IpAddr::V6(Ipv6Addr::new(
×
220
                0xff00 + segment as u16,
×
221
                0,
222
                0,
223
                0,
224
                0,
225
                0,
226
                0,
227
                0xfd,
228
            )),
229
        };
230
        self.join_multicast(m);
1✔
231
    }
232
}
233
#[async_trait]
234
impl Listener for UdpCoapListener {
235
    async fn listen(
5✔
236
        mut self: Box<Self>,
237
        sender: TransportRequestSender,
238
    ) -> std::io::Result<JoinHandle<std::io::Result<()>>> {
239
        return Ok(tokio::spawn(self.receive_loop(sender)));
3✔
240
    }
241
}
242

243
/// [`Responder`] for UDP peers: queues responses on the listener's outgoing channel.
#[derive(Clone)]
244
struct UdpResponder {
245
    address: SocketAddr, // this is the address we are sending to
246
    // channel on which responses are handed to the listener's receive loop
    tx: UdpResponseSender,
247
}
248

249
#[async_trait]
250
impl Responder for UdpResponder {
251
    /// Queues `response` for the stored peer address.
    async fn respond(&self, response: Vec<u8>) {
4✔
252
        // a send error (receiving half gone) is deliberately ignored
        let _ = self.tx.send((response, self.address));
2✔
253
    }
254
    /// Returns the peer address this responder sends to.
    fn address(&self) -> SocketAddr {
1✔
255
        self.address
1✔
256
    }
257
}
258

259
impl UdpCoapListener {
260
    pub async fn receive_loop(mut self, sender: TransportRequestSender) -> std::io::Result<()> {
4✔
261
        loop {
2✔
262
            let mut recv_vec = Vec::with_capacity(u16::MAX as usize);
2✔
263
            select! {
2✔
264
                message =self.socket.recv_buf_from(&mut recv_vec)=> {
265
                    match message {
266
                        Ok((_size, from)) => {
267
                            sender.send((recv_vec, Arc::new(UdpResponder{address: from, tx: self.response_sender.clone()}))).map_err( |_| std::io::Error::other("server channel error"))?;
268
                        }
269
                        Err(e) => {
270
                            return Err(e);
271
                        }
272
                    }
273
                },
274
                response = self.response_receiver.recv() => {
275
                    if let Some((bytes, to)) = response{
276
                        debug!("sending {:?} to {:?}", bytes, to);
277
                        self.socket.send_to(&bytes, to).await?;
278
                    }
279
                    else {
280
                        // in case nobody is listening to us, we can just terminate, though this
281
                        // should never happen for UDP
282
                        return Ok(());
283
                    }
284

285
                }
286
            }
287
        }
288
    }
289
}
290

291
/// A CoAP packet paired with a socket address.
///
/// NOTE(review): not used in the visible part of this file; whether `address` is the
/// sender or the destination cannot be told from here; confirm against callers.
#[derive(Debug)]
292
pub struct QueuedMessage {
293
    pub address: SocketAddr,
294
    pub message: Packet,
295
}
296

297
/// Protocol state shared between the server loop and the spawned handler tasks.
struct ServerCoapState {
298
    // observe bookkeeping; consulted unless `disable_observe` is set
    observer: Observer,
299
    // block-wise transfer handling applied to requests and responses
    block_handler: BlockHandler<SocketAddr>,
300
    // when true, `intercept_request` skips the observer and always forwards
    disable_observe: bool,
301
}
302

303
/// Outcome of request interception: whether the user handler still has to run.
pub enum ShouldForwardToHandler {
304
    /// Pass the request on to the user handler.
    True,
305
    /// The request was dealt with internally; respond without calling the handler.
    False,
306
}
307

308
impl ServerCoapState {
309
    pub async fn intercept_request(
1✔
310
        &mut self,
311
        request: &mut CoapRequest<SocketAddr>,
312
        responder: Arc<dyn Responder>,
313
    ) -> ShouldForwardToHandler {
314
        match self.block_handler.intercept_request(request) {
3✔
315
            Ok(true) => return ShouldForwardToHandler::False,
1✔
316
            Err(_err) => return ShouldForwardToHandler::False,
×
317
            Ok(false) => {}
318
        };
319

320
        if self.disable_observe {
1✔
321
            return ShouldForwardToHandler::True;
1✔
322
        }
323

324
        let should_be_forwarded = self.observer.request_handler(request, responder).await;
2✔
325
        if should_be_forwarded {
3✔
326
            ShouldForwardToHandler::True
1✔
327
        } else {
328
            ShouldForwardToHandler::False
2✔
329
        }
330
    }
331

332
    pub async fn intercept_response(&mut self, request: &mut CoapRequest<SocketAddr>) {
4✔
333
        let resource_path = request.get_path();
1✔
334

335
        let is_block_fetch_for_observer = request.message.get_option(CoapOption::Block2).is_some()
3✔
336
            && request.message.get_option(CoapOption::Observe).is_none()
1✔
337
            && request.source.is_some()
1✔
338
            && self
2✔
339
                .observer
340
                .is_observing(&request.source.unwrap(), &resource_path);
2✔
341

342
        if is_block_fetch_for_observer {
1✔
343
            if let Some((payload, etag)) =
1✔
344
                self.observer.get_resource_payload_and_etag(&resource_path)
345
            {
346
                if let Some(ref mut response) = request.response {
2✔
347
                    response.message.payload = payload.to_vec();
2✔
348
                    response.message.clear_option(CoapOption::ETag);
1✔
349
                    response.message.add_option(CoapOption::ETag, etag);
1✔
350
                    // Prevent duplicate Size2 options, clear first.
351
                    response.message.clear_option(CoapOption::Size2);
1✔
352
                    response
1✔
353
                        .message
354
                        .add_option(CoapOption::Size2, encode_coap_uint(payload.len()));
1✔
355
                }
356
            }
357
        }
358

359
        if let Err(err) = self.block_handler.intercept_response(request) {
3✔
360
            let _ = request.apply_from_error(err);
×
361
        }
362
    }
363

364
    /// Creates the state with a fresh observer, a block handler using the default
    /// configuration, and observe handling not disabled.
    pub fn new() -> Self {
1✔
365
        Self {
366
            observer: Observer::new(),
1✔
367
            block_handler: BlockHandler::new(BlockHandlerConfig::default()),
2✔
368
            disable_observe: false,
369
        }
370
    }
371
    /// Stores `value` in `disable_observe`; `true` makes `intercept_request` skip
    /// the observer and forward every request that block handling lets through.
    pub fn disable_observe_handling(&mut self, value: bool) {
1✔
372
        self.disable_observe = value
1✔
373
    }
374
}
375

376
/// CoAP server: owns its listeners and the protocol state shared with handler tasks.
pub struct Server {
377
    // transports started by `run`
    listeners: Vec<Box<dyn Listener>>,
378
    // observe / block-wise state, also locked by spawned handler tasks
    coap_state: Arc<Mutex<ServerCoapState>>,
379
    // raw requests arriving from all listeners
    new_packet_receiver: TransportRequestReceiver,
380
    // sending half of the channel above; a clone is given to every listener
    new_packet_sender: TransportRequestSender,
381
}
382

383
impl Server {
384
    /// Creates a CoAP server listening on the given address.
385
    pub fn new_udp<A: ToSocketAddrs>(addr: A) -> Result<Self, io::Error> {
×
386
        let listener: Vec<Box<dyn Listener>> = vec![Box::new(UdpCoapListener::new(addr)?)];
×
387
        Ok(Self::from_listeners(listener))
×
388
    }
389

390
    pub fn from_listeners(listeners: Vec<Box<dyn Listener>>) -> Self {
1✔
391
        let (tx, rx) = mpsc::unbounded_channel();
2✔
392
        Server {
393
            listeners,
394
            coap_state: Arc::new(Mutex::new(ServerCoapState::new())),
2✔
395
            new_packet_receiver: rx,
396
            new_packet_sender: tx,
397
        }
398
    }
399

400
    async fn spawn_handles(
1✔
401
        listeners: Vec<Box<dyn Listener>>,
402
        sender: TransportRequestSender,
403
    ) -> std::io::Result<Vec<JoinHandle<std::io::Result<()>>>> {
404
        let mut handles = vec![];
1✔
405
        for listener in listeners.into_iter() {
4✔
406
            let handle = listener.listen(sender.clone()).await?;
3✔
407
            handles.push(handle);
1✔
408
        }
409
        Ok(handles)
1✔
410
    }
411

412
    /// run the server.
413
    pub async fn run<Handler: RequestHandler>(mut self, handler: Handler) -> Result<(), io::Error> {
52✔
414
        let _handles = Self::spawn_handles(self.listeners, self.new_packet_sender.clone()).await?;
22✔
415

416
        let handler_arc = Arc::new(handler);
22✔
417
        // receive an input, sync our cache / states, then call custom handler
418
        loop {
419
            let (bytes, respond) = self
50✔
NEW
420
                .new_packet_receiver
×
421
                .recv()
422
                .await
61✔
423
                .ok_or_else(|| std::io::Error::other("listen channel closed"))?;
9✔
424
            if let Ok(packet) = Packet::from_bytes(&bytes) {
18✔
425
                let mut request = Box::new(CoapRequest::<SocketAddr>::from_packet(
27✔
426
                    packet,
9✔
427
                    respond.address(),
18✔
428
                ));
429
                let mut coap_state = self.coap_state.lock().await;
18✔
430
                let should_forward = coap_state
27✔
431
                    .intercept_request(&mut request, respond.clone())
9✔
432
                    .await;
36✔
433

434
                match should_forward {
9✔
435
                    ShouldForwardToHandler::True => {
436
                        let handler_clone = handler_arc.clone();
18✔
437
                        let coap_state_clone = self.coap_state.clone();
18✔
438
                        tokio::spawn(async move {
27✔
439
                            request = handler_clone.handle_request(request).await;
20✔
440
                            coap_state_clone
40✔
441
                                .lock()
8✔
442
                                .await
24✔
443
                                .intercept_response(request.as_mut())
8✔
444
                                .await;
16✔
445

446
                            Self::respond_to_request(request, respond).await;
8✔
447
                        });
448
                    }
449
                    ShouldForwardToHandler::False => {
450
                        Self::respond_to_request(request, respond).await;
12✔
451
                    }
452
                }
453
            }
454
        }
455
    }
456
    /// Serializes the request's response, if it has one, and sends it through
    /// `responder`. A missing response or a serialization failure sends nothing.
    async fn respond_to_request(req: Box<CoapRequest<SocketAddr>>, responder: Arc<dyn Responder>) {
        // if we have some response to send, send it
        let encoded = req
            .response
            .and_then(|resp| resp.message.to_bytes().ok());
        if let Some(bytes) = encoded {
            responder.respond(bytes).await;
        }
    }
462
    #[deprecated(
463
        since = "0.21.0",
464
        note = "Use 'coap::Server::automatic_observe_handling' instead."
465
    )]
466
    /// disable auto-observe handling in server
    ///
    /// Deprecated alias: forwards `value` unchanged to `automatic_observe_handling`.
UNCOV
467
    pub async fn disable_observe_handling(&mut self, value: bool) {
×
UNCOV
468
        self.automatic_observe_handling(value).await
×
469
    }
470
    /// set auto-observe handling in server, defaults to enabled
    ///
    /// NOTE(review): `value` is passed unchanged to
    /// `ServerCoapState::disable_observe_handling`, which stores it in the
    /// `disable_observe` flag. Passing `true` therefore turns the built-in observe
    /// handling OFF, which reads as inverted relative to this method's name; confirm
    /// the intended polarity before relying on it.
471
    pub async fn automatic_observe_handling(&mut self, value: bool) {
4✔
472
        let mut coap_state = self.coap_state.lock().await;
2✔
473
        coap_state.disable_observe_handling(value)
2✔
474
    }
475
}
476

477
#[cfg(test)]
478
pub mod test {
479
    use crate::request::RequestBuilder;
480

481
    use super::super::*;
482
    use super::*;
483
    use coap_lite::{block_handler::BlockValue, CoapOption, RequestType};
484
    use std::str;
485
    use std::time::Duration;
486

487
    pub fn spawn_server<
488
        F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
489
        HandlerRet,
490
    >(
491
        ip: &'static str,
492
        request_handler: F,
493
    ) -> mpsc::UnboundedReceiver<u16>
494
    where
495
        HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
496
    {
497
        let (tx, rx) = mpsc::unbounded_channel();
498
        let _task = tokio::spawn(async move {
499
            let sock = UdpSocket::bind(ip).await.unwrap();
500
            let addr = sock.local_addr().unwrap();
501
            let listener = Box::new(UdpCoapListener::from_socket(sock));
502
            let server = Server::from_listeners(vec![listener]);
503
            tx.send(addr.port()).unwrap();
504
            server.run(request_handler).await.unwrap();
505
        });
506

507
        rx
508
    }
509

510
    async fn request_handler(
511
        mut req: Box<CoapRequest<SocketAddr>>,
512
    ) -> Box<CoapRequest<SocketAddr>> {
513
        let uri_path_list = req.message.get_option(CoapOption::UriPath).unwrap().clone();
514
        assert_eq!(uri_path_list.len(), 1);
515

516
        if let Some(ref mut response) = req.response {
517
            response.message.payload = uri_path_list.front().unwrap().clone();
518
        }
519
        req
520
    }
521

522
    pub fn spawn_server_with_all_coap<
523
        F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
524
        HandlerRet,
525
    >(
526
        ip: &'static str,
527
        request_handler: F,
528
        segment: u8,
529
    ) -> mpsc::UnboundedReceiver<u16>
530
    where
531
        HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
532
    {
533
        let (tx, rx) = mpsc::unbounded_channel();
534

535
        std::thread::Builder::new()
536
            .name(String::from("v4-server"))
537
            .spawn(move || {
538
                tokio::runtime::Runtime::new()
539
                    .unwrap()
540
                    .block_on(async move {
541
                        // multicast needs a server on a real interface
542
                        let sock = UdpSocket::bind((ip, 0)).await.unwrap();
543
                        let addr = sock.local_addr().unwrap();
544
                        let mut listener = Box::new(UdpCoapListener::from_socket(sock));
545
                        listener.enable_all_coap(segment);
546
                        let server = Server::from_listeners(vec![listener]);
547
                        tx.send(addr.port()).unwrap();
548
                        server.run(request_handler).await.unwrap();
549
                    })
550
            })
551
            .unwrap();
552

553
        rx
554
    }
555

556
    pub fn spawn_server_disable_observe<
557
        F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
558
        HandlerRet,
559
    >(
560
        ip: &'static str,
561
        request_handler: F,
562
    ) -> mpsc::UnboundedReceiver<u16>
563
    where
564
        HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
565
    {
566
        let (tx, rx) = mpsc::unbounded_channel();
567
        let _task = tokio::spawn(async move {
568
            let sock = UdpSocket::bind(ip).await.unwrap();
569
            let addr = sock.local_addr().unwrap();
570
            let listener = Box::new(UdpCoapListener::from_socket(sock));
571
            let mut server = Server::from_listeners(vec![listener]);
572
            server.automatic_observe_handling(true).await;
573
            tx.send(addr.port()).unwrap();
574
            server.run(request_handler).await.unwrap();
575
        });
576

577
        rx
578
    }
579

580
    #[tokio::test]
581
    async fn test_listener_instantiation() {
582
        let listener = UdpCoapListener::new("127.0.0.1:0").unwrap();
583
        assert!(
584
            listener.socket.local_addr().unwrap().ip() == IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
585
        );
586
        // assert!(listener.socket.blocking() == false);
587

588
        let explicit_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
589
        let another_listener = UdpCoapListener::from_socket(explicit_socket);
590
        assert!(
591
            another_listener.socket.local_addr().unwrap().ip()
592
                == IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
593
        );
594
    }
595

596
    #[tokio::test]
597
    async fn test_echo_server() {
598
        let server_port = spawn_server("127.0.0.1:0", request_handler)
599
            .recv()
600
            .await
601
            .unwrap();
602

603
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
604
            .await
605
            .unwrap();
606
        let mut request = CoapRequest::new();
607
        request.message.header.set_version(1);
608
        request
609
            .message
610
            .header
611
            .set_type(coap_lite::MessageType::Confirmable);
612
        request.message.header.set_code("0.01");
613
        request.message.header.message_id = 1;
614
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
615
        request
616
            .message
617
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
618
        client.send_single_request(&request).await.unwrap();
619

620
        let recv_packet = client.send(request).await.unwrap();
621
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
622
    }
623

624
    #[tokio::test]
625
    async fn test_put_block() {
626
        let server_port = spawn_server("127.0.0.1:0", request_handler)
627
            .recv()
628
            .await
629
            .unwrap();
630
        let data = "hello this is a payload";
631
        let mut v = Vec::new();
632
        for _ in 0..1024 {
633
            v.extend_from_slice(data.as_bytes());
634
        }
635
        let payload_size = v.len();
636
        let server_string = format!("127.0.0.1:{}", server_port);
637
        let client = UdpCoAPClient::new(server_string.clone()).await.unwrap();
638

639
        let request = RequestBuilder::new("/large", RequestType::Put)
640
            .data(Some(v))
641
            .domain(server_string.clone())
642
            .build();
643

644
        let resp = client.send(request).await.unwrap();
645
        let block_opt = resp
646
            .message
647
            .get_first_option_as::<BlockValue>(CoapOption::Block1)
648
            .expect("expected block opt in response")
649
            .expect("could not decode block1 option");
650
        let expected_number = (payload_size as f32 / 1024.0).ceil() as u16 - 1;
651
        assert_eq!(
652
            block_opt.num, expected_number,
653
            "block not completely received!"
654
        );
655

656
        assert_eq!(resp.message.payload, b"large".to_vec());
657
    }
658

659
    #[tokio::test]
660
    #[ignore]
661
    async fn test_echo_server_v6() {
662
        let server_port = spawn_server("::1:0", request_handler).recv().await.unwrap();
663

664
        let client = UdpCoAPClient::new(format!("::1:{}", server_port))
665
            .await
666
            .unwrap();
667
        let mut request = CoapRequest::new();
668
        request.message.header.set_version(1);
669
        request
670
            .message
671
            .header
672
            .set_type(coap_lite::MessageType::Confirmable);
673
        request.message.header.set_code("0.01");
674
        request.message.header.message_id = 1;
675
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
676
        request
677
            .message
678
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
679

680
        let recv_packet = client.send(request).await.unwrap();
681
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
682
    }
683

684
    #[tokio::test]
685
    async fn test_echo_server_no_token() {
686
        let server_port = spawn_server("127.0.0.1:0", request_handler)
687
            .recv()
688
            .await
689
            .unwrap();
690

691
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
692
            .await
693
            .unwrap();
694
        let mut packet = CoapRequest::new();
695
        packet.message.header.set_version(1);
696
        packet
697
            .message
698
            .header
699
            .set_type(coap_lite::MessageType::Confirmable);
700
        packet.message.header.set_code("0.01");
701
        packet.message.header.message_id = 1;
702
        packet
703
            .message
704
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
705
        let recv_packet = client.send(packet).await.unwrap();
706
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
707
    }
708

709
    #[tokio::test]
710
    #[ignore]
711
    async fn test_echo_server_no_token_v6() {
712
        let server_port = spawn_server("::1:0", request_handler).recv().await.unwrap();
713

714
        let client = UdpCoAPClient::new(format!("::1:{}", server_port))
715
            .await
716
            .unwrap();
717
        let mut packet = CoapRequest::new();
718
        packet.message.header.set_version(1);
719
        packet
720
            .message
721
            .header
722
            .set_type(coap_lite::MessageType::Confirmable);
723
        packet.message.header.set_code("0.01");
724
        packet.message.header.message_id = 1;
725
        packet
726
            .message
727
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
728

729
        let recv_packet = client.send(packet).await.unwrap();
730
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
731
    }
732

733
    #[tokio::test]
734
    async fn test_update_resource() {
735
        let path = "/test";
736
        let payload1 = b"data1".to_vec();
737
        let payload2 = b"data2".to_vec();
738
        let (tx, mut rx) = mpsc::unbounded_channel();
739
        let (tx2, mut rx2) = mpsc::unbounded_channel();
740
        let mut step = 1;
741

742
        let server_port = spawn_server("127.0.0.1:0", request_handler)
743
            .recv()
744
            .await
745
            .unwrap();
746

747
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
748
            .await
749
            .unwrap();
750

751
        tx.send(step).unwrap();
752
        let mut request = CoapRequest::new();
753
        request.set_method(RequestType::Put);
754
        request.set_path(path);
755
        request.message.payload = payload1.clone();
756
        client.send(request.clone()).await.unwrap();
757

758
        let mut receive_step = 1;
759
        let payload1_clone = payload1.clone();
760
        let payload2_clone = payload2.clone();
761
        client
762
            .observe(path, move |msg| {
763
                if let Ok(n) = rx.try_recv() {
764
                    receive_step = n
765
                }
766

767
                match receive_step {
768
                    1 => assert_eq!(msg.payload, payload1_clone),
769
                    2 => {
770
                        assert_eq!(msg.payload, payload2_clone);
771
                        tx2.send(()).unwrap();
772
                    }
773
                    _ => panic!("unexpected step"),
774
                }
775
            })
776
            .await
777
            .unwrap();
778

779
        step = 2;
780
        tx.send(step).unwrap();
781
        request.message.payload = payload2.clone();
782
        let client2 = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
783
            .await
784
            .unwrap();
785
        let _ = client2.send(request).await.unwrap();
786
        assert_eq!(
787
            tokio::time::timeout(Duration::new(5, 0), rx2.recv())
788
                .await
789
                .unwrap(),
790
            Some(())
791
        );
792
    }
793

794
    #[tokio::test]
795
    async fn test_observe_transparent_transmission() {
796
        let path = "/test";
797
        let (tx, mut rx) = mpsc::unbounded_channel();
798

799
        let server_port = spawn_server_disable_observe("127.0.0.1:0", request_handler)
800
            .recv()
801
            .await
802
            .unwrap();
803

804
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
805
            .await
806
            .unwrap();
807

808
        client
809
            .observe(path, move |msg| {
810
                assert_eq!(msg.payload, b"test".to_vec());
811
                tx.send(()).unwrap();
812
            })
813
            .await
814
            .unwrap();
815

816
        assert_eq!(
817
            tokio::time::timeout(Duration::new(5, 0), rx.recv())
818
                .await
819
                .unwrap(),
820
            Some(())
821
        );
822
    }
823

824
    #[tokio::test]
825
    async fn multicast_server_all_coap() {
826
        // segment not relevant with IPv4
827
        let segment = 0x0;
828
        let server_port = spawn_server_with_all_coap("0.0.0.0", request_handler, segment)
829
            .recv()
830
            .await
831
            .unwrap();
832

833
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
834
            .await
835
            .unwrap();
836
        let mut request = CoapRequest::new();
837
        request.message.header.set_version(1);
838
        request
839
            .message
840
            .header
841
            .set_type(coap_lite::MessageType::Confirmable);
842
        request.message.header.set_code("0.01");
843
        request.message.header.message_id = 1;
844
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
845
        request
846
            .message
847
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
848
        let recv_packet = client.send(request).await.unwrap();
849

850
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
851

852
        let client = UdpCoAPClient::new(format!("224.0.1.187:{}", server_port))
853
            .await
854
            .unwrap();
855
        let mut request = RequestBuilder::new("test-echo", RequestType::Get)
856
            .data(Some(vec![0x51, 0x55, 0x77, 0xE8]))
857
            .confirmable(true)
858
            .build();
859

860
        let mut receiver = client.create_receiver_for(&request).await;
861
        client.send_all_coap(&mut request, segment).await.unwrap();
862
        let recv_packet = receiver.receive().await.unwrap();
863
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
864
    }
865

866
    //This test right now does not work on windows
867
    #[cfg(unix)]
868
    #[tokio::test]
869
    #[ignore]
870
    async fn multicast_server_all_coap_v6() {
871
        // use segment 0x04 which should be the smallest administered scope
872

873
        let segment = 0x04;
874
        let server_port = spawn_server_with_all_coap("::0", request_handler, segment)
875
            .recv()
876
            .await
877
            .unwrap();
878

879
        let client = UdpCoAPClient::new(format!("::1:{}", server_port))
880
            .await
881
            .unwrap();
882
        let mut request = CoapRequest::new();
883
        request.message.header.set_version(1);
884
        request
885
            .message
886
            .header
887
            .set_type(coap_lite::MessageType::Confirmable);
888
        request.message.header.set_code("0.01");
889
        request.message.header.message_id = 1;
890
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
891
        request
892
            .message
893
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
894
        client.send_single_request(&request).await.unwrap();
895

896
        let recv_packet = client.send(request).await.unwrap();
897
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
898

899
        // use 0xff02 to keep it within this network
900
        let client = UdpCoAPClient::new(format!("ff0{}::fd:{}", segment, server_port))
901
            .await
902
            .unwrap();
903
        let mut request = CoapRequest::new();
904
        request.message.header.set_version(1);
905
        request
906
            .message
907
            .header
908
            .set_type(coap_lite::MessageType::NonConfirmable);
909
        request.message.header.set_code("0.01");
910
        request.message.header.message_id = 2;
911
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
912
        request
913
            .message
914
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
915
        let mut receiver = client.create_receiver_for(&request).await;
916
        client.send_all_coap(&mut request, segment).await.unwrap();
917
        let recv_packet = receiver.receive().await.unwrap();
918
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
919
    }
920

921
    /// Spawns a thread named `v4-server` that joins and then leaves two IPv4
    /// multicast groups on a listener bound to `0.0.0.0`, and afterwards runs
    /// a server on that listener.
    ///
    /// The spawned thread is never joined; the test only sleeps for one second
    /// before returning.
    #[test]
    fn multicast_join_leave() {
        let server_thread = move || {
            let runtime = tokio::runtime::Runtime::new().unwrap();
            runtime.block_on(async move {
                // multicast needs a server on a real interface
                let sock = UdpSocket::bind(("0.0.0.0", 0)).await.unwrap();
                let mut listener = Box::new(UdpCoapListener::from_socket(sock));

                let groups = [Ipv4Addr::new(224, 0, 1, 1), Ipv4Addr::new(224, 1, 1, 1)];
                for group in groups {
                    listener.join_multicast(IpAddr::V4(group));
                }
                for group in groups {
                    listener.leave_multicast(IpAddr::V4(group));
                }

                let server = Server::from_listeners(vec![listener]);
                server.run(request_handler).await.unwrap();
            });
        };

        std::thread::Builder::new()
            .name(String::from("v4-server"))
            .spawn(server_thread)
            .unwrap();

        std::thread::sleep(std::time::Duration::from_secs(1));
    }
944
    /// Spawns a thread named `v6-server` that joins two IPv6 multicast groups
    /// on a listener, leaves both of them again, and then runs a server on
    /// that listener.
    ///
    /// The spawned thread is never joined; the test only sleeps for one second
    /// before returning.
    #[test]
    #[ignore]
    fn multicast_join_leave_v6() {
        std::thread::Builder::new()
            .name(String::from("v6-server"))
            .spawn(move || {
                tokio::runtime::Runtime::new()
                    .unwrap()
                    .block_on(async move {
                        // multicast needs a server on a real interface
                        // NOTE(review): this binds the IPv4 wildcard address
                        // although the groups below are IPv6 — confirm whether
                        // "::" was intended here.
                        let sock = UdpSocket::bind(("0.0.0.0", 0)).await.unwrap();
                        let mut listener = Box::new(UdpCoapListener::from_socket(sock));

                        let groups = [
                            Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 1, 0x1),
                            Ipv6Addr::new(0xff02, 0, 0, 0, 0, 1, 0, 0x2),
                        ];
                        for group in groups {
                            listener.join_multicast(IpAddr::V6(group));
                        }
                        // Every joined group is left again; the second group
                        // was previously joined a second time instead of left.
                        for group in groups {
                            listener.leave_multicast(IpAddr::V6(group));
                        }

                        let server = Server::from_listeners(vec![listener]);
                        server.run(request_handler).await.unwrap();
                    })
            })
            .unwrap();

        std::thread::sleep(std::time::Duration::from_secs(1));
    }
976

977
    /// Builds the reference payload for the block2 test: each ASCII letter
    /// from `a` to `z`, in order, repeated 1024 times (26 * 1024 bytes total).
    fn get_expected_response() -> Vec<u8> {
        (b'a'..=b'z')
            .flat_map(|letter| std::iter::repeat_n(letter, 1024))
            .collect()
    }
984
    async fn block2_responder(
985
        mut req: Box<CoapRequest<SocketAddr>>,
986
    ) -> Box<CoapRequest<SocketAddr>> {
987
        // vec should contain 'a' 1024 times, then 'b' 1024, up to ascii 'z'
988

989
        if let Some(ref mut response) = req.response {
990
            response.message.payload = get_expected_response();
991
        }
992
        req
993
    }
994
    #[tokio::test]
995
    async fn test_block2_server_response() {
996
        let server_port = spawn_server("127.0.0.1:0", block2_responder)
997
            .recv()
998
            .await
999
            .unwrap();
1000

1001
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
1002
            .await
1003
            .unwrap();
1004
        let resp = client
1005
            .send(RequestBuilder::new("/", RequestType::Get).build())
1006
            .await
1007
            .unwrap();
1008
        assert_eq!(
1009
            resp.message.payload,
1010
            get_expected_response(),
1011
            "responses do not match"
1012
        );
1013
    }
1014
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc