• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Covertness / coap-rs / 25067091909

28 Apr 2026 05:11PM UTC coverage: 85.163% (+2.0%) from 83.134%
25067091909

Pull #128

github

web-flow
Merge 23a963bf6 into fd8429451
Pull Request #128: Add block-wise observe support for large resources

75 of 87 new or added lines in 3 files covered. (86.21%)

1 existing line in 1 file now uncovered.

838 of 984 relevant lines covered (85.16%)

4.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.01
/src/server.rs
1
use async_trait::async_trait;
2
use coap_lite::{BlockHandler, BlockHandlerConfig, CoapOption, CoapRequest, CoapResponse, Packet};
3
use log::debug;
4
use std::{
5
    future::Future,
6
    io::ErrorKind,
7
    net::{self, IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs},
8
    sync::Arc,
9
};
10
use tokio::{
11
    io,
12
    net::UdpSocket,
13
    select,
14
    sync::{
15
        mpsc::{self, UnboundedReceiver, UnboundedSender},
16
        Mutex,
17
    },
18
    task::JoinHandle,
19
};
20

21
use crate::observer::Observer;
22

23
#[derive(Debug)]
24
pub enum CoAPServerError {
25
    NetworkError,
26
    EventLoopError,
27
    AnotherHandlerIsRunning,
28
    EventSendError,
29
}
30

31
use tokio::io::Error;
32

33
#[async_trait]
34
pub trait Dispatcher: Send + Sync {
35
    async fn dispatch(&self, request: CoapRequest<SocketAddr>) -> Option<CoapResponse>;
36
}
37

38
#[async_trait]
39
/// This trait represents a generic way to respond to a listener. If you want to implement your own
40
/// listener, you have to implement this trait to be able to send responses back through the
41
/// correct transport
42
pub trait Responder: Sync + Send {
43
    async fn respond(&self, response: Vec<u8>);
44
    fn address(&self) -> SocketAddr;
45
}
46

47
/// channel to send new requests from a transport to the CoAP server
48
pub type TransportRequestSender = UnboundedSender<(Vec<u8>, Arc<dyn Responder>)>;
49

50
/// channel used by CoAP server to receive new requests
51
pub type TransportRequestReceiver = UnboundedReceiver<(Vec<u8>, Arc<dyn Responder>)>;
52

53
type UdpResponseReceiver = UnboundedReceiver<(Vec<u8>, SocketAddr)>;
54
type UdpResponseSender = UnboundedSender<(Vec<u8>, SocketAddr)>;
55

56
// listeners receive new connections
57
#[async_trait]
58
pub trait Listener: Send {
59
    async fn listen(
60
        self: Box<Self>,
61
        sender: TransportRequestSender,
62
    ) -> std::io::Result<JoinHandle<std::io::Result<()>>>;
63
}
64
/// listener for a UDP socket
65
pub struct UdpCoapListener {
66
    socket: UdpSocket,
67
    multicast_addresses: Vec<IpAddr>,
68
    response_receiver: UdpResponseReceiver,
69
    response_sender: UdpResponseSender,
70
}
71

72
#[async_trait]
73
/// A trait for handling incoming requests. Use this instead of a closure
74
/// if you want to modify some external state
75
pub trait RequestHandler: Send + Sync + 'static {
76
    async fn handle_request(
77
        &self,
78
        mut request: Box<CoapRequest<SocketAddr>>,
79
    ) -> Box<CoapRequest<SocketAddr>>;
80
}
81

82
#[async_trait]
83
impl<F, HandlerRet> RequestHandler for F
84
where
85
    F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
86
    HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
87
{
88
    async fn handle_request(
36✔
89
        &self,
90
        request: Box<CoapRequest<SocketAddr>>,
91
    ) -> Box<CoapRequest<SocketAddr>> {
92
        self(request).await
24✔
93
    }
94
}
95

96
/// A listener for UDP packets. This listener can also subscribe to multicast addresses
97
impl UdpCoapListener {
98
    pub fn new<A: ToSocketAddrs>(addr: A) -> Result<Self, Error> {
1✔
99
        let std_socket = net::UdpSocket::bind(addr)?;
1✔
100
        std_socket.set_nonblocking(true)?;
2✔
101
        let socket = UdpSocket::from_std(std_socket)?;
1✔
102
        Ok(Self::from_socket(socket))
2✔
103
    }
104

105
    pub fn from_socket(socket: tokio::net::UdpSocket) -> Self {
1✔
106
        let (tx, rx) = mpsc::unbounded_channel();
2✔
107
        Self {
108
            socket,
109
            multicast_addresses: Vec::new(),
1✔
110
            response_receiver: rx,
111
            response_sender: tx,
112
        }
113
    }
114

115
    /// join multicast - adds the multicast addresses to the unicast listener
116
    /// - IPv4 multicast address range is '224.0.0.0/4'
117
    /// - IPv6 AllCoAp multicast addresses are 'ff00::/8'
118
    ///
119
    /// Parameter segment is used with IPv6 to determine the first octet.
120
    /// - It's value can be between 0x0 and 0xf.
121
    /// - To join multiple segments, you have to call enable_discovery for each of the segments.
122
    ///
123
    /// Some Multicast address scope
124
    /// IPv6        IPv4 equivalent[16]                Scope                    Purpose
125
    /// ffx1::/16        127.0.0.0/8                        Interface-local            Packets with this destination address may not be sent over any network link, but must remain within the current node; this is the multicast equivalent of the unicast loopback address.
126
    /// ffx2::/16        224.0.0.0/24                    Link-local                Packets with this destination address may not be routed anywhere.
127
    /// ffx3::/16        239.255.0.0/16                    IPv4 local scope
128
    /// ffx4::/16                                        Admin-local                The smallest scope that must be administratively configured.
129
    /// ffx5::/16                                        Site-local                Restricted to the local physical network.
130
    /// ffx8::/16        239.192.0.0/14                    Organization-local        Restricted to networks used by the organization administering the local network. (For example, these addresses might be used over VPNs; when packets for this group are routed over the public internet (where these addresses are not valid), they would have to be encapsulated in some other protocol.)
131
    /// ffxe::/16        224.0.1.0-238.255.255.255        Global scope            Eligible to be routed over the public internet.
132
    ///
133
    /// Notable addresses:
134
    /// ff02::1            All nodes on the local network segment
135
    /// ff0x::c            Simple Service Discovery Protocol
136
    /// ff0x::fb        Multicast DNS
137
    /// ff0x::fb        Multicast CoAP
138
    /// ff0x::114        Used for experiments
139
    //    pub fn join_multicast(&mut self, addr: IpAddr) {
140
    //        self.udp_server.join_multicast(addr);
141
    //    }
142
    pub fn join_multicast(&mut self, addr: IpAddr) {
1✔
143
        assert!(addr.is_multicast());
1✔
144
        // determine wether IPv4 or IPv6 and
145
        // join the appropriate multicast address
146
        match self.socket.local_addr().unwrap() {
1✔
147
            SocketAddr::V4(val) => {
1✔
148
                match addr {
1✔
149
                    IpAddr::V4(ipv4) => {
1✔
150
                        let i = val.ip().clone();
1✔
151
                        self.socket.join_multicast_v4(ipv4, i).unwrap();
1✔
152
                        self.multicast_addresses.push(addr);
1✔
153
                    }
154
                    IpAddr::V6(_ipv6) => { /* handle IPv6 */ }
×
155
                }
156
            }
157
            SocketAddr::V6(_val) => {
×
158
                match addr {
×
159
                    IpAddr::V4(_ipv4) => { /* handle IPv4 */ }
×
160
                    IpAddr::V6(ipv6) => {
×
161
                        self.socket.join_multicast_v6(&ipv6, 0).unwrap();
×
162
                        self.multicast_addresses.push(addr);
×
163
                        //self.socket.set_only_v6(true)?;
164
                    }
165
                }
166
            }
167
        }
168
    }
169

170
    /// leave multicast - remove the multicast address from the listener
171
    pub fn leave_multicast(&mut self, addr: IpAddr) {
1✔
172
        assert!(addr.is_multicast());
1✔
173
        // determine wether IPv4 or IPv6 and
174
        // leave the appropriate multicast address
175
        match self.socket.local_addr().unwrap() {
1✔
176
            SocketAddr::V4(val) => {
1✔
177
                match addr {
1✔
178
                    IpAddr::V4(ipv4) => {
1✔
179
                        let i = val.ip().clone();
1✔
180
                        self.socket.leave_multicast_v4(ipv4, i).unwrap();
1✔
181
                        let index = self
1✔
182
                            .multicast_addresses
183
                            .iter()
184
                            .position(|&item| item == addr)
3✔
185
                            .unwrap();
186
                        self.multicast_addresses.remove(index);
1✔
187
                    }
188
                    IpAddr::V6(_ipv6) => { /* handle IPv6 */ }
×
189
                }
190
            }
191
            SocketAddr::V6(_val) => {
×
192
                match addr {
×
193
                    IpAddr::V4(_ipv4) => { /* handle IPv4 */ }
×
194
                    IpAddr::V6(ipv6) => {
×
195
                        self.socket.leave_multicast_v6(&ipv6, 0).unwrap();
×
196
                        let index = self
×
197
                            .multicast_addresses
198
                            .iter()
199
                            .position(|&item| item == addr)
×
200
                            .unwrap();
201
                        self.multicast_addresses.remove(index);
×
202
                    }
203
                }
204
            }
205
        }
206
    }
207
    /// enable AllCoAP multicasts - adds the AllCoap addresses to the listener
208
    /// - IPv4 AllCoAP multicast address is '224.0.1.187'
209
    /// - IPv6 AllCoAp multicast addresses are 'ff0?::fd'
210
    ///
211
    /// Parameter segment is used with IPv6 to determine the first octet.
212
    /// - It's value can be between 0x0 and 0xf.
213
    /// - To join multiple segments, you have to call enable_discovery for each of the segments.
214
    ///
215
    /// For further details see method join_multicast
216
    pub fn enable_all_coap(&mut self, segment: u8) {
1✔
217
        assert!(segment <= 0xf);
1✔
218
        let m = match self.socket.local_addr().unwrap() {
1✔
219
            SocketAddr::V4(_val) => IpAddr::V4(Ipv4Addr::new(224, 0, 1, 187)),
1✔
220
            SocketAddr::V6(_val) => IpAddr::V6(Ipv6Addr::new(
×
221
                0xff00 + segment as u16,
×
222
                0,
223
                0,
224
                0,
225
                0,
226
                0,
227
                0,
228
                0xfd,
229
            )),
230
        };
231
        self.join_multicast(m);
1✔
232
    }
233
}
234
#[async_trait]
235
impl Listener for UdpCoapListener {
236
    async fn listen(
5✔
237
        mut self: Box<Self>,
238
        sender: TransportRequestSender,
239
    ) -> std::io::Result<JoinHandle<std::io::Result<()>>> {
240
        return Ok(tokio::spawn(self.receive_loop(sender)));
3✔
241
    }
242
}
243

244
#[derive(Clone)]
245
struct UdpResponder {
246
    address: SocketAddr, // this is the address we are sending to
247
    tx: UdpResponseSender,
248
}
249

250
#[async_trait]
251
impl Responder for UdpResponder {
252
    async fn respond(&self, response: Vec<u8>) {
4✔
253
        let _ = self.tx.send((response, self.address));
2✔
254
    }
255
    fn address(&self) -> SocketAddr {
1✔
256
        self.address
1✔
257
    }
258
}
259

260
impl UdpCoapListener {
261
    pub async fn receive_loop(mut self, sender: TransportRequestSender) -> std::io::Result<()> {
4✔
262
        loop {
2✔
263
            let mut recv_vec = Vec::with_capacity(u16::MAX as usize);
2✔
264
            select! {
5✔
265
                message =self.socket.recv_buf_from(&mut recv_vec)=> {
266
                    match message {
267
                        Ok((_size, from)) => {
268
                            sender.send((recv_vec, Arc::new(UdpResponder{address: from, tx: self.response_sender.clone()}))).map_err( |_| std::io::Error::new(ErrorKind::Other, "server channel error"))?;
269
                        }
270
                        Err(e) => {
271
                            return Err(e);
272
                        }
273
                    }
274
                },
275
                response = self.response_receiver.recv() => {
276
                    if let Some((bytes, to)) = response{
277
                        debug!("sending {:?} to {:?}", &bytes,  &to);
278
                        self.socket.send_to(&bytes, to).await?;
279
                    }
280
                    else {
281
                        // in case nobody is listening to us, we can just terminate, though this
282
                        // should never happen for UDP
283
                        return Ok(());
284
                    }
285

286
                }
287
            }
288
        }
289
    }
290
}
291

292
#[derive(Debug)]
293
pub struct QueuedMessage {
294
    pub address: SocketAddr,
295
    pub message: Packet,
296
}
297

298
struct ServerCoapState {
299
    observer: Observer,
300
    block_handler: BlockHandler<SocketAddr>,
301
    disable_observe: bool,
302
}
303

304
pub enum ShouldForwardToHandler {
305
    True,
306
    False,
307
}
308

309
impl ServerCoapState {
310
    pub async fn intercept_request(
1✔
311
        &mut self,
312
        request: &mut CoapRequest<SocketAddr>,
313
        responder: Arc<dyn Responder>,
314
    ) -> ShouldForwardToHandler {
315
        match self.block_handler.intercept_request(request) {
3✔
316
            Ok(true) => return ShouldForwardToHandler::False,
1✔
317
            Err(_err) => return ShouldForwardToHandler::False,
×
318
            Ok(false) => {}
319
        };
320

321
        if self.disable_observe {
1✔
322
            return ShouldForwardToHandler::True;
1✔
323
        }
324

325
        let should_be_forwarded = self.observer.request_handler(request, responder).await;
2✔
326
        if should_be_forwarded {
1✔
327
            return ShouldForwardToHandler::True;
1✔
328
        } else {
329
            return ShouldForwardToHandler::False;
2✔
330
        }
331
    }
332

333
    pub async fn intercept_response(&mut self, request: &mut CoapRequest<SocketAddr>) {
4✔
334
        let resource_path = request.get_path();
1✔
335

336
        let is_block_fetch_for_observer = request.message.get_option(CoapOption::Block2).is_some()
3✔
337
            && request.message.get_option(CoapOption::Observe).is_none()
1✔
338
            && request.source.is_some()
1✔
339
            && self
2✔
340
                .observer
341
                .is_observing(&request.source.unwrap(), &resource_path);
2✔
342

343
        if is_block_fetch_for_observer {
1✔
344
            if let Some((payload, etag)) =
1✔
345
                self.observer.get_resource_payload_and_etag(&resource_path)
346
            {
347
                if let Some(ref mut response) = request.response {
2✔
348
                    response.message.payload = payload.to_vec();
2✔
349
                    response.message.clear_option(CoapOption::ETag);
1✔
350
                    response.message.add_option(CoapOption::ETag, etag);
1✔
351
                }
352
            }
353
        }
354

355
        if let Err(err) = self.block_handler.intercept_response(request) {
3✔
NEW
356
            let _ = request.apply_from_error(err);
×
357
        }
358
    }
359

360
    pub fn new() -> Self {
1✔
361
        Self {
362
            observer: Observer::new(),
1✔
363
            block_handler: BlockHandler::new(BlockHandlerConfig::default()),
2✔
364
            disable_observe: false,
365
        }
366
    }
367
    pub fn disable_observe_handling(&mut self, value: bool) {
1✔
368
        self.disable_observe = value
1✔
369
    }
370
}
371

372
pub struct Server {
373
    listeners: Vec<Box<dyn Listener>>,
374
    coap_state: Arc<Mutex<ServerCoapState>>,
375
    new_packet_receiver: TransportRequestReceiver,
376
    new_packet_sender: TransportRequestSender,
377
}
378

379
impl Server {
380
    /// Creates a CoAP server listening on the given address.
381
    pub fn new_udp<A: ToSocketAddrs>(addr: A) -> Result<Self, io::Error> {
×
382
        let listener: Vec<Box<dyn Listener>> = vec![Box::new(UdpCoapListener::new(addr)?)];
×
383
        Ok(Self::from_listeners(listener))
×
384
    }
385

386
    pub fn from_listeners(listeners: Vec<Box<dyn Listener>>) -> Self {
1✔
387
        let (tx, rx) = mpsc::unbounded_channel();
2✔
388
        Server {
389
            listeners,
390
            coap_state: Arc::new(Mutex::new(ServerCoapState::new())),
2✔
391
            new_packet_receiver: rx,
392
            new_packet_sender: tx,
393
        }
394
    }
395

396
    async fn spawn_handles(
1✔
397
        listeners: Vec<Box<dyn Listener>>,
398
        sender: TransportRequestSender,
399
    ) -> std::io::Result<Vec<JoinHandle<std::io::Result<()>>>> {
400
        let mut handles = vec![];
1✔
401
        for listener in listeners.into_iter() {
4✔
402
            let handle = listener.listen(sender.clone()).await?;
3✔
403
            handles.push(handle);
1✔
404
        }
405
        return Ok(handles);
1✔
406
    }
407

408
    /// run the server.
409
    pub async fn run<Handler: RequestHandler>(mut self, handler: Handler) -> Result<(), io::Error> {
49✔
410
        let _handles = Self::spawn_handles(self.listeners, self.new_packet_sender.clone()).await?;
20✔
411

412
        let handler_arc = Arc::new(handler);
20✔
413
        // receive an input, sync our cache / states, then call custom handler
414
        loop {
415
            let (bytes, respond) =
19✔
416
                self.new_packet_receiver.recv().await.ok_or_else(|| {
×
417
                    std::io::Error::new(ErrorKind::Other, "listen channel closed")
×
418
                })?;
419
            if let Ok(packet) = Packet::from_bytes(&bytes) {
18✔
420
                let mut request = Box::new(CoapRequest::<SocketAddr>::from_packet(
27✔
421
                    packet,
9✔
422
                    respond.address(),
18✔
423
                ));
424
                let mut coap_state = self.coap_state.lock().await;
18✔
425
                let should_forward = coap_state
27✔
426
                    .intercept_request(&mut request, respond.clone())
9✔
427
                    .await;
36✔
428

429
                match should_forward {
9✔
430
                    ShouldForwardToHandler::True => {
431
                        let handler_clone = handler_arc.clone();
18✔
432
                        let coap_state_clone = self.coap_state.clone();
18✔
433
                        tokio::spawn(async move {
27✔
434
                            request = handler_clone.handle_request(request).await;
20✔
435
                            coap_state_clone
40✔
436
                                .lock()
8✔
437
                                .await
24✔
438
                                .intercept_response(request.as_mut())
8✔
439
                                .await;
16✔
440

441
                            Self::respond_to_request(request, respond).await;
8✔
442
                        });
443
                    }
444
                    ShouldForwardToHandler::False => {
445
                        Self::respond_to_request(request, respond).await;
12✔
446
                    }
447
                }
448
            }
449
        }
450
    }
451
    async fn respond_to_request(req: Box<CoapRequest<SocketAddr>>, responder: Arc<dyn Responder>) {
4✔
452
        // if we have some reponse to send, send it
453
        if let Some(Ok(b)) = req.response.map(|resp| resp.message.to_bytes()) {
6✔
454
            responder.respond(b).await;
2✔
455
        }
456
    }
457
    #[deprecated(
458
        since = "0.21.0",
459
        note = "Use 'coap::Server::automatic_observe_handling' instead."
460
    )]
461
    /// disable auto-observe handling in server
462
    pub async fn disable_observe_handling(&mut self, value: bool) {
4✔
463
        self.automatic_observe_handling(value).await
2✔
464
    }
465
    /// set auto-observe handling in server, defaults to enabled
466
    pub async fn automatic_observe_handling(&mut self, value: bool) {
4✔
467
        let mut coap_state = self.coap_state.lock().await;
2✔
468
        coap_state.disable_observe_handling(value)
2✔
469
    }
470
}
471

472
#[cfg(test)]
473
pub mod test {
474
    use crate::request::RequestBuilder;
475

476
    use super::super::*;
477
    use super::*;
478
    use coap_lite::{block_handler::BlockValue, CoapOption, RequestType};
479
    use std::str;
480
    use std::time::Duration;
481

482
    pub fn spawn_server<
483
        F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
484
        HandlerRet,
485
    >(
486
        ip: &'static str,
487
        request_handler: F,
488
    ) -> mpsc::UnboundedReceiver<u16>
489
    where
490
        HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
491
    {
492
        let (tx, rx) = mpsc::unbounded_channel();
493
        let _task = tokio::spawn(async move {
494
            let sock = UdpSocket::bind(ip).await.unwrap();
495
            let addr = sock.local_addr().unwrap();
496
            let listener = Box::new(UdpCoapListener::from_socket(sock));
497
            let server = Server::from_listeners(vec![listener]);
498
            tx.send(addr.port()).unwrap();
499
            server.run(request_handler).await.unwrap();
500
        });
501

502
        rx
503
    }
504

505
    async fn request_handler(
506
        mut req: Box<CoapRequest<SocketAddr>>,
507
    ) -> Box<CoapRequest<SocketAddr>> {
508
        let uri_path_list = req.message.get_option(CoapOption::UriPath).unwrap().clone();
509
        assert_eq!(uri_path_list.len(), 1);
510

511
        match req.response {
512
            Some(ref mut response) => {
513
                response.message.payload = uri_path_list.front().unwrap().clone();
514
            }
515
            _ => {}
516
        }
517
        return req;
518
    }
519

520
    pub fn spawn_server_with_all_coap<
521
        F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
522
        HandlerRet,
523
    >(
524
        ip: &'static str,
525
        request_handler: F,
526
        segment: u8,
527
    ) -> mpsc::UnboundedReceiver<u16>
528
    where
529
        HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
530
    {
531
        let (tx, rx) = mpsc::unbounded_channel();
532

533
        std::thread::Builder::new()
534
            .name(String::from("v4-server"))
535
            .spawn(move || {
536
                tokio::runtime::Runtime::new()
537
                    .unwrap()
538
                    .block_on(async move {
539
                        // multicast needs a server on a real interface
540
                        let sock = UdpSocket::bind((ip, 0)).await.unwrap();
541
                        let addr = sock.local_addr().unwrap();
542
                        let mut listener = Box::new(UdpCoapListener::from_socket(sock));
543
                        listener.enable_all_coap(segment);
544
                        let server = Server::from_listeners(vec![listener]);
545
                        tx.send(addr.port()).unwrap();
546
                        server.run(request_handler).await.unwrap();
547
                    })
548
            })
549
            .unwrap();
550

551
        rx
552
    }
553

554
    pub fn spawn_server_disable_observe<
555
        F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
556
        HandlerRet,
557
    >(
558
        ip: &'static str,
559
        request_handler: F,
560
    ) -> mpsc::UnboundedReceiver<u16>
561
    where
562
        HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
563
    {
564
        let (tx, rx) = mpsc::unbounded_channel();
565
        let _task = tokio::spawn(async move {
566
            let sock = UdpSocket::bind(ip).await.unwrap();
567
            let addr = sock.local_addr().unwrap();
568
            let listener = Box::new(UdpCoapListener::from_socket(sock));
569
            let mut server = Server::from_listeners(vec![listener]);
570
            server.disable_observe_handling(true).await;
571
            tx.send(addr.port()).unwrap();
572
            server.run(request_handler).await.unwrap();
573
        });
574

575
        rx
576
    }
577

578
    #[tokio::test]
579
    async fn test_listener_instantiation() {
580
        let listener = UdpCoapListener::new("127.0.0.1:0").unwrap();
581
        assert!(
582
            listener.socket.local_addr().unwrap().ip() == IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
583
        );
584
        // assert!(listener.socket.blocking() == false);
585

586
        let explicit_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
587
        let another_listener = UdpCoapListener::from_socket(explicit_socket);
588
        assert!(
589
            another_listener.socket.local_addr().unwrap().ip()
590
                == IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
591
        );
592
    }
593

594
    #[tokio::test]
595
    async fn test_echo_server() {
596
        let server_port = spawn_server("127.0.0.1:0", request_handler)
597
            .recv()
598
            .await
599
            .unwrap();
600

601
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
602
            .await
603
            .unwrap();
604
        let mut request = CoapRequest::new();
605
        request.message.header.set_version(1);
606
        request
607
            .message
608
            .header
609
            .set_type(coap_lite::MessageType::Confirmable);
610
        request.message.header.set_code("0.01");
611
        request.message.header.message_id = 1;
612
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
613
        request
614
            .message
615
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
616
        client.send_single_request(&request).await.unwrap();
617

618
        let recv_packet = client.send(request).await.unwrap();
619
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
620
    }
621

622
    #[tokio::test]
623
    async fn test_put_block() {
624
        let server_port = spawn_server("127.0.0.1:0", request_handler)
625
            .recv()
626
            .await
627
            .unwrap();
628
        let data = "hello this is a payload";
629
        let mut v = Vec::new();
630
        for _ in 0..1024 {
631
            v.extend_from_slice(data.as_bytes());
632
        }
633
        let payload_size = v.len();
634
        let server_string = format!("127.0.0.1:{}", server_port);
635
        let client = UdpCoAPClient::new(server_string.clone()).await.unwrap();
636

637
        let request = RequestBuilder::new("/large", RequestType::Put)
638
            .data(Some(v))
639
            .domain(server_string.clone())
640
            .build();
641

642
        let resp = client.send(request).await.unwrap();
643
        let block_opt = resp
644
            .message
645
            .get_first_option_as::<BlockValue>(CoapOption::Block1)
646
            .expect("expected block opt in response")
647
            .expect("could not decode block1 option");
648
        let expected_number = (payload_size as f32 / 1024.0).ceil() as u16 - 1;
649
        assert_eq!(
650
            block_opt.num, expected_number,
651
            "block not completely received!"
652
        );
653

654
        assert_eq!(resp.message.payload, b"large".to_vec());
655
    }
656

657
    #[tokio::test]
658
    #[ignore]
659
    async fn test_echo_server_v6() {
660
        let server_port = spawn_server("::1:0", request_handler).recv().await.unwrap();
661

662
        let client = UdpCoAPClient::new(format!("::1:{}", server_port))
663
            .await
664
            .unwrap();
665
        let mut request = CoapRequest::new();
666
        request.message.header.set_version(1);
667
        request
668
            .message
669
            .header
670
            .set_type(coap_lite::MessageType::Confirmable);
671
        request.message.header.set_code("0.01");
672
        request.message.header.message_id = 1;
673
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
674
        request
675
            .message
676
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
677

678
        let recv_packet = client.send(request).await.unwrap();
679
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
680
    }
681

682
    #[tokio::test]
683
    async fn test_echo_server_no_token() {
684
        let server_port = spawn_server("127.0.0.1:0", request_handler)
685
            .recv()
686
            .await
687
            .unwrap();
688

689
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
690
            .await
691
            .unwrap();
692
        let mut packet = CoapRequest::new();
693
        packet.message.header.set_version(1);
694
        packet
695
            .message
696
            .header
697
            .set_type(coap_lite::MessageType::Confirmable);
698
        packet.message.header.set_code("0.01");
699
        packet.message.header.message_id = 1;
700
        packet
701
            .message
702
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
703
        let recv_packet = client.send(packet).await.unwrap();
704
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
705
    }
706

707
    #[tokio::test]
708
    #[ignore]
709
    async fn test_echo_server_no_token_v6() {
710
        let server_port = spawn_server("::1:0", request_handler).recv().await.unwrap();
711

712
        let client = UdpCoAPClient::new(format!("::1:{}", server_port))
713
            .await
714
            .unwrap();
715
        let mut packet = CoapRequest::new();
716
        packet.message.header.set_version(1);
717
        packet
718
            .message
719
            .header
720
            .set_type(coap_lite::MessageType::Confirmable);
721
        packet.message.header.set_code("0.01");
722
        packet.message.header.message_id = 1;
723
        packet
724
            .message
725
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
726

727
        let recv_packet = client.send(packet).await.unwrap();
728
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
729
    }
730

731
    #[tokio::test]
732
    async fn test_update_resource() {
733
        let path = "/test";
734
        let payload1 = b"data1".to_vec();
735
        let payload2 = b"data2".to_vec();
736
        let (tx, mut rx) = mpsc::unbounded_channel();
737
        let (tx2, mut rx2) = mpsc::unbounded_channel();
738
        let mut step = 1;
739

740
        let server_port = spawn_server("127.0.0.1:0", request_handler)
741
            .recv()
742
            .await
743
            .unwrap();
744

745
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
746
            .await
747
            .unwrap();
748

749
        tx.send(step).unwrap();
750
        let mut request = CoapRequest::new();
751
        request.set_method(RequestType::Put);
752
        request.set_path(path);
753
        request.message.payload = payload1.clone();
754
        client.send(request.clone()).await.unwrap();
755

756
        let mut receive_step = 1;
757
        let payload1_clone = payload1.clone();
758
        let payload2_clone = payload2.clone();
759
        client
760
            .observe(path, move |msg| {
761
                match rx.try_recv() {
762
                    Ok(n) => receive_step = n,
763
                    _ => (),
764
                }
765

766
                match receive_step {
767
                    1 => assert_eq!(msg.payload, payload1_clone),
768
                    2 => {
769
                        assert_eq!(msg.payload, payload2_clone);
770
                        tx2.send(()).unwrap();
771
                    }
772
                    _ => panic!("unexpected step"),
773
                }
774
            })
775
            .await
776
            .unwrap();
777

778
        step = 2;
779
        tx.send(step).unwrap();
780
        request.message.payload = payload2.clone();
781
        let client2 = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
782
            .await
783
            .unwrap();
784
        let _ = client2.send(request).await.unwrap();
785
        assert_eq!(
786
            tokio::time::timeout(Duration::new(5, 0), rx2.recv())
787
                .await
788
                .unwrap(),
789
            Some(())
790
        );
791
    }
792

793
    #[tokio::test]
794
    async fn test_observe_transparent_transmission() {
795
        let path = "/test";
796
        let (tx, mut rx) = mpsc::unbounded_channel();
797

798
        let server_port = spawn_server_disable_observe("127.0.0.1:0", request_handler)
799
            .recv()
800
            .await
801
            .unwrap();
802

803
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
804
            .await
805
            .unwrap();
806

807
        client
808
            .observe(path, move |msg| {
809
                assert_eq!(msg.payload, b"test".to_vec());
810
                tx.send(()).unwrap();
811
            })
812
            .await
813
            .unwrap();
814

815
        assert_eq!(
816
            tokio::time::timeout(Duration::new(5, 0), rx.recv())
817
                .await
818
                .unwrap(),
819
            Some(())
820
        );
821
    }
822

823
    #[tokio::test]
824
    async fn multicast_server_all_coap() {
825
        // segment not relevant with IPv4
826
        let segment = 0x0;
827
        let server_port = spawn_server_with_all_coap("0.0.0.0", request_handler, segment)
828
            .recv()
829
            .await
830
            .unwrap();
831

832
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
833
            .await
834
            .unwrap();
835
        let mut request = CoapRequest::new();
836
        request.message.header.set_version(1);
837
        request
838
            .message
839
            .header
840
            .set_type(coap_lite::MessageType::Confirmable);
841
        request.message.header.set_code("0.01");
842
        request.message.header.message_id = 1;
843
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
844
        request
845
            .message
846
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
847
        let recv_packet = client.send(request).await.unwrap();
848

849
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
850

851
        let client = UdpCoAPClient::new(format!("224.0.1.187:{}", server_port))
852
            .await
853
            .unwrap();
854
        let mut request = RequestBuilder::new("test-echo", RequestType::Get)
855
            .data(Some(vec![0x51, 0x55, 0x77, 0xE8]))
856
            .confirmable(true)
857
            .build();
858

859
        let mut receiver = client.create_receiver_for(&request).await;
860
        client.send_all_coap(&mut request, segment).await.unwrap();
861
        let recv_packet = receiver.receive().await.unwrap();
862
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
863
    }
864

865
    //This test right now does not work on windows
866
    #[cfg(unix)]
867
    #[tokio::test]
868
    #[ignore]
869
    async fn multicast_server_all_coap_v6() {
870
        // use segment 0x04 which should be the smallest administered scope
871

872
        let segment = 0x04;
873
        let server_port = spawn_server_with_all_coap("::0", request_handler, segment)
874
            .recv()
875
            .await
876
            .unwrap();
877

878
        let client = UdpCoAPClient::new(format!("::1:{}", server_port))
879
            .await
880
            .unwrap();
881
        let mut request = CoapRequest::new();
882
        request.message.header.set_version(1);
883
        request
884
            .message
885
            .header
886
            .set_type(coap_lite::MessageType::Confirmable);
887
        request.message.header.set_code("0.01");
888
        request.message.header.message_id = 1;
889
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
890
        request
891
            .message
892
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
893
        client.send_single_request(&request).await.unwrap();
894

895
        let recv_packet = client.send(request).await.unwrap();
896
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
897

898
        // use 0xff02 to keep it within this network
899
        let client = UdpCoAPClient::new(format!("ff0{}::fd:{}", segment, server_port))
900
            .await
901
            .unwrap();
902
        let mut request = CoapRequest::new();
903
        request.message.header.set_version(1);
904
        request
905
            .message
906
            .header
907
            .set_type(coap_lite::MessageType::NonConfirmable);
908
        request.message.header.set_code("0.01");
909
        request.message.header.message_id = 2;
910
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
911
        request
912
            .message
913
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
914
        let mut receiver = client.create_receiver_for(&request).await;
915
        client.send_all_coap(&mut request, segment).await.unwrap();
916
        let recv_packet = receiver.receive().await.unwrap();
917
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
918
    }
919

920
    #[test]
921
    fn multicast_join_leave() {
922
        std::thread::Builder::new()
923
            .name(String::from("v4-server"))
924
            .spawn(move || {
925
                tokio::runtime::Runtime::new()
926
                    .unwrap()
927
                    .block_on(async move {
928
                        // multicast needs a server on a real interface
929
                        let sock = UdpSocket::bind(("0.0.0.0", 0)).await.unwrap();
930
                        let mut listener = Box::new(UdpCoapListener::from_socket(sock));
931
                        listener.join_multicast(IpAddr::V4(Ipv4Addr::new(224, 0, 1, 1)));
932
                        listener.join_multicast(IpAddr::V4(Ipv4Addr::new(224, 1, 1, 1)));
933
                        listener.leave_multicast(IpAddr::V4(Ipv4Addr::new(224, 0, 1, 1)));
934
                        listener.leave_multicast(IpAddr::V4(Ipv4Addr::new(224, 1, 1, 1)));
935
                        let server = Server::from_listeners(vec![listener]);
936
                        server.run(request_handler).await.unwrap();
937
                    })
938
            })
939
            .unwrap();
940

941
        std::thread::sleep(std::time::Duration::from_secs(1));
942
    }
943
    #[test]
944
    #[ignore]
945
    fn multicast_join_leave_v6() {
946
        std::thread::Builder::new()
947
            .name(String::from("v6-server"))
948
            .spawn(move || {
949
                tokio::runtime::Runtime::new()
950
                    .unwrap()
951
                    .block_on(async move {
952
                        // multicast needs a server on a real interface
953
                        let sock = UdpSocket::bind(("0.0.0.0", 0)).await.unwrap();
954
                        let mut listener = Box::new(UdpCoapListener::from_socket(sock));
955
                        listener.join_multicast(IpAddr::V6(Ipv6Addr::new(
956
                            0xff02, 0, 0, 0, 0, 0, 1, 0x1,
957
                        )));
958
                        listener.join_multicast(IpAddr::V6(Ipv6Addr::new(
959
                            0xff02, 0, 0, 0, 0, 1, 0, 0x2,
960
                        )));
961
                        listener.leave_multicast(IpAddr::V6(Ipv6Addr::new(
962
                            0xff02, 0, 0, 0, 0, 0, 1, 0x1,
963
                        )));
964
                        listener.join_multicast(IpAddr::V6(Ipv6Addr::new(
965
                            0xff02, 0, 0, 0, 0, 1, 0, 0x2,
966
                        )));
967
                        let server = Server::from_listeners(vec![listener]);
968
                        server.run(request_handler).await.unwrap();
969
                    })
970
            })
971
            .unwrap();
972

973
        std::thread::sleep(std::time::Duration::from_secs(1));
974
    }
975

976
    fn get_expected_response() -> Vec<u8> {
977
        let mut resp = vec![];
978
        for c in b'a'..=b'z' {
979
            resp.extend(std::iter::repeat(c).take(1024));
980
        }
981
        resp
982
    }
983
    async fn block2_responder(
984
        mut req: Box<CoapRequest<SocketAddr>>,
985
    ) -> Box<CoapRequest<SocketAddr>> {
986
        // vec should contain 'a' 1024 times, then 'b' 1024, up to ascii 'z'
987

988
        match req.response {
989
            Some(ref mut response) => {
990
                response.message.payload = get_expected_response();
991
            }
992
            _ => {}
993
        }
994
        return req;
995
    }
996
    #[tokio::test]
997
    async fn test_block2_server_response() {
998
        let server_port = spawn_server("127.0.0.1:0", block2_responder)
999
            .recv()
1000
            .await
1001
            .unwrap();
1002

1003
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
1004
            .await
1005
            .unwrap();
1006
        let resp = client
1007
            .send(RequestBuilder::new("/", RequestType::Get).build())
1008
            .await
1009
            .unwrap();
1010
        assert_eq!(
1011
            resp.message.payload,
1012
            get_expected_response(),
1013
            "responses do not match"
1014
        );
1015
    }
1016
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc