• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Covertness / coap-rs / 17128430288

21 Aug 2025 01:30PM UTC coverage: 82.832% (+0.6%) from 82.27%
17128430288

push

github

web-flow
Merge pull request #119 from SimenZhor/client-blockwise-observe

Add client-side support for block-wise observe

69 of 78 new or added lines in 1 file covered. (88.46%)

6 existing lines in 3 files now uncovered.

743 of 897 relevant lines covered (82.83%)

3.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.03
/src/server.rs
1
use async_trait::async_trait;
2
use coap_lite::{BlockHandler, BlockHandlerConfig, CoapRequest, CoapResponse, Packet};
3
use log::debug;
4
use std::{
5
    future::Future,
6
    io::ErrorKind,
7
    net::{self, IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs},
8
    sync::Arc,
9
};
10
use tokio::{
11
    io,
12
    net::UdpSocket,
13
    select,
14
    sync::{
15
        mpsc::{self, UnboundedReceiver, UnboundedSender},
16
        Mutex,
17
    },
18
    task::JoinHandle,
19
};
20

21
use crate::observer::Observer;
22

23
/// Error categories declared for the CoAP server.
///
/// NOTE(review): none of these variants is constructed in the part of the file visible
/// here (the server functions shown return `std::io::Error`); the meaning of each variant
/// can only be guessed from its name — confirm against the callers before relying on it.
#[derive(Debug)]
24
pub enum CoAPServerError {
25
    NetworkError,
26
    EventLoopError,
27
    AnotherHandlerIsRunning,
28
    EventSendError,
29
}
30

31
use tokio::io::Error;
32

33
#[async_trait]
34
/// Asynchronous request dispatcher: takes ownership of a request whose endpoint type is
/// `SocketAddr` and optionally produces a response for it.
///
/// Implementors must be `Send + Sync`.
/// NOTE(review): nothing in the visible part of this file implements or calls this trait.
pub trait Dispatcher: Send + Sync {
35
    /// Handle `request`; returns `None` when there is no response to hand back.
    async fn dispatch(&self, request: CoapRequest<SocketAddr>) -> Option<CoapResponse>;
36
}
37

38
#[async_trait]
39
/// This trait represents a generic way to respond to a listener. If you want to implement your own
40
/// listener, you have to implement this trait to be able to send responses back through the
41
/// correct transport.
42
pub trait Responder: Sync + Send {
43
    /// Send the already serialized response bytes back to the peer. The method returns
    /// nothing, so a failed delivery cannot be reported to the caller through it.
    async fn respond(&self, response: Vec<u8>);
44
    /// Address of the remote peer. `Server::run` passes it to `CoapRequest::from_packet`
    /// as the source of the parsed request.
    fn address(&self) -> SocketAddr;
45
}
46

47
/// channel to send new requests from a transport to the CoAP server; every item is the raw
/// bytes that were received plus the `Responder` to answer through
48
pub type TransportRequestSender = UnboundedSender<(Vec<u8>, Arc<dyn Responder>)>;
49

50
/// channel used by CoAP server to receive new requests (the other half of
/// `TransportRequestSender`)
51
pub type TransportRequestReceiver = UnboundedReceiver<(Vec<u8>, Arc<dyn Responder>)>;
52

53
/// receiving half of the queue of outgoing datagrams `(bytes, destination)`;
/// `UdpCoapListener::receive_loop` drains it and writes each entry to the socket
type UdpResponseReceiver = UnboundedReceiver<(Vec<u8>, SocketAddr)>;
54
/// sending half of the outgoing datagram queue; every `UdpResponder` holds a clone of it
type UdpResponseSender = UnboundedSender<(Vec<u8>, SocketAddr)>;
55

56
// listeners receive new connections
57
#[async_trait]
58
/// A transport that feeds incoming requests into the server.
///
/// `Server::spawn_handles` calls `listen` once per listener, giving each one its own clone
/// of the server's request sender.
pub trait Listener: Send {
59
    /// Consume the boxed listener and start it.
    ///
    /// Requests the listener receives are to be pushed into `sender` together with a
    /// `Responder` for the reply. On success the `JoinHandle` of the listener's task is
    /// returned; that task itself finishes with an `io::Result<()>`.
    async fn listen(
60
        self: Box<Self>,
61
        sender: TransportRequestSender,
62
    ) -> std::io::Result<JoinHandle<std::io::Result<()>>>;
63
}
64
/// listener for a UDP socket
///
/// One socket is used in both directions: `receive_loop` reads incoming datagrams from it
/// and writes the queued responses to it.
65
pub struct UdpCoapListener {
66
    // tokio socket that receives requests and sends responses
    socket: UdpSocket,
67
    // groups recorded by `join_multicast`; `leave_multicast` removes them again
    multicast_addresses: Vec<IpAddr>,
68
    // queue of outgoing `(bytes, destination)` pairs, drained by `receive_loop`
    response_receiver: UdpResponseReceiver,
69
    // sender for the queue above; a clone is handed to every `UdpResponder` that is created
    response_sender: UdpResponseSender,
70
}
71

72
#[async_trait]
73
/// A trait for handling incoming requests. Use this instead of a closure
74
/// if you want to modify some external state
///
/// A blanket implementation in this file covers every `Fn(Box<CoapRequest<SocketAddr>>)`
/// that returns a `Send` future yielding the boxed request, so plain async functions can
/// be passed to `Server::run` directly.
75
pub trait RequestHandler: Send + Sync + 'static {
76
    /// Process one request and hand it back.
    ///
    /// The handler owns the boxed request while it runs. After it returns, `Server::run`
    /// sends `request.response` to the peer if it is present and can be serialized.
    async fn handle_request(
77
        &self,
78
        mut request: Box<CoapRequest<SocketAddr>>,
79
    ) -> Box<CoapRequest<SocketAddr>>;
80
}
81

82
#[async_trait]
83
/// Blanket implementation that lets closures and async functions act as request handlers:
/// any `Fn` that takes the boxed request and returns a `Send` future resolving to the boxed
/// request qualifies, provided the callable itself is `Send + Sync + 'static`.
impl<F, HandlerRet> RequestHandler for F
84
where
85
    F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
86
    HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
87
{
88
    /// Call the wrapped function with `request` and await the future it returns.
    async fn handle_request(
34✔
89
        &self,
90
        request: Box<CoapRequest<SocketAddr>>,
91
    ) -> Box<CoapRequest<SocketAddr>> {
92
        self(request).await
22✔
93
    }
94
}
95

96
/// A listener for UDP packets. This listener can also subscribe to multicast addresses
97
impl UdpCoapListener {
98
    /// Bind a UDP socket to `addr` and wrap it in a listener.
    ///
    /// The socket is created through `std::net`, switched to non-blocking mode and only then
    /// converted into a tokio socket.
    ///
    /// # Errors
    /// Returns the I/O error of whichever step fails: binding, switching to non-blocking
    /// mode, or the conversion into a tokio socket.
    ///
    /// NOTE(review): per the tokio documentation `UdpSocket::from_std` must be called from
    /// within a runtime with I/O enabled — confirm that all callers satisfy this.
    pub fn new<A: ToSocketAddrs>(addr: A) -> Result<Self, Error> {
1✔
99
        let std_socket = net::UdpSocket::bind(addr)?;
1✔
100
        // tokio leaves it to the caller to make the std socket non-blocking before `from_std`
        std_socket.set_nonblocking(true)?;
2✔
101
        let socket = UdpSocket::from_std(std_socket)?;
1✔
102
        Ok(Self::from_socket(socket))
2✔
103
    }
104

105
    /// Wrap an existing tokio UDP socket in a listener.
    ///
    /// Creates the unbounded channel that carries outgoing responses and starts with an
    /// empty list of joined multicast groups. The socket itself is taken as it is.
    pub fn from_socket(socket: tokio::net::UdpSocket) -> Self {
1✔
106
        // tx is later cloned into every `UdpResponder`, rx is drained by `receive_loop`
        let (tx, rx) = mpsc::unbounded_channel();
2✔
107
        Self {
108
            socket,
109
            multicast_addresses: Vec::new(),
1✔
110
            response_receiver: rx,
111
            response_sender: tx,
112
        }
113
    }
114

115
    /// join multicast - adds the multicast addresses to the unicast listener
116
    /// - IPv4 multicast address range is '224.0.0.0/4'
117
    /// - IPv6 multicast address range is 'ff00::/8'
118
    ///
119
    /// The address family of `addr` has to match the family of the bound socket: an IPv4
    /// group on an IPv6 socket (or the reverse) is silently ignored and nothing is recorded.
120
    /// - Panics if `addr` is not a multicast address, if the local address of the socket
    ///   cannot be read, or if the join call on the socket fails (all three are
    ///   asserted/unwrapped below).
121
    /// - To join the AllCoAP group use `enable_all_coap`, which builds the address and then
    ///   calls this method.
122
    ///
123
    /// Some Multicast address scope
124
    /// IPv6        IPv4 equivalent                    Scope                    Purpose
125
    /// ffx1::/16        127.0.0.0/8                        Interface-local            Packets with this destination address may not be sent over any network link, but must remain within the current node; this is the multicast equivalent of the unicast loopback address.
126
    /// ffx2::/16        224.0.0.0/24                    Link-local                Packets with this destination address may not be routed anywhere.
127
    /// ffx3::/16        239.255.0.0/16                    IPv4 local scope
128
    /// ffx4::/16                                        Admin-local                The smallest scope that must be administratively configured.
129
    /// ffx5::/16                                        Site-local                Restricted to the local physical network.
130
    /// ffx8::/16        239.192.0.0/14                    Organization-local        Restricted to networks used by the organization administering the local network. (For example, these addresses might be used over VPNs; when packets for this group are routed over the public internet (where these addresses are not valid), they would have to be encapsulated in some other protocol.)
131
    /// ffxe::/16        224.0.1.0-238.255.255.255        Global scope            Eligible to be routed over the public internet.
132
    ///
133
    /// Notable addresses:
134
    /// ff02::1            All nodes on the local network segment
135
    /// ff0x::c            Simple Service Discovery Protocol
136
    /// ff0x::fb        Multicast DNS
137
    /// ff0x::fd        Multicast CoAP
138
    /// ff0x::114        Used for experiments
139
    //    pub fn join_multicast(&mut self, addr: IpAddr) {
140
    //        self.udp_server.join_multicast(addr);
141
    //    }
142
    pub fn join_multicast(&mut self, addr: IpAddr) {
1✔
143
        assert!(addr.is_multicast());
1✔
144
        // determine whether IPv4 or IPv6 and
145
        // join the appropriate multicast address
146
        match self.socket.local_addr().unwrap() {
1✔
147
            SocketAddr::V4(val) => {
1✔
148
                match addr {
1✔
149
                    IpAddr::V4(ipv4) => {
1✔
150
                        // the IPv4 address the socket is bound to is passed as the interface
                        let i = val.ip().clone();
1✔
151
                        self.socket.join_multicast_v4(ipv4, i).unwrap();
1✔
152
                        self.multicast_addresses.push(addr);
2✔
153
                    }
154
                    // family mismatch: nothing is joined and nothing is recorded
                    IpAddr::V6(_ipv6) => { /* handle IPv6 */ }
×
155
                }
156
            }
157
            SocketAddr::V6(_val) => {
×
158
                match addr {
×
159
                    // family mismatch: nothing is joined and nothing is recorded
                    IpAddr::V4(_ipv4) => { /* handle IPv4 */ }
×
160
                    IpAddr::V6(ipv6) => {
×
161
                        // interface index 0 (documented by tokio as "any interface")
                        self.socket.join_multicast_v6(&ipv6, 0).unwrap();
×
162
                        self.multicast_addresses.push(addr);
×
163
                        //self.socket.set_only_v6(true)?;
164
                    }
165
                }
166
            }
167
        }
168
    }
169

170
    /// leave multicast - remove the multicast address from the listener
    ///
    /// Mirror image of `join_multicast`: the group is left on the socket and the first
    /// matching entry is removed from the recorded address list. An address whose family
    /// differs from the family of the bound socket is silently ignored.
    ///
    /// Panics if `addr` is not a multicast address, if the local address of the socket
    /// cannot be read, if the leave call on the socket fails, or if `addr` is not present
    /// in the recorded list (the `position(..)` lookup is unwrapped).
171
    pub fn leave_multicast(&mut self, addr: IpAddr) {
1✔
172
        assert!(addr.is_multicast());
1✔
173
        // determine whether IPv4 or IPv6 and
174
        // leave the appropriate multicast address
175
        match self.socket.local_addr().unwrap() {
1✔
176
            SocketAddr::V4(val) => {
1✔
177
                match addr {
1✔
178
                    IpAddr::V4(ipv4) => {
1✔
179
                        // same interface argument as used when joining: the bound IPv4 address
                        let i = val.ip().clone();
1✔
180
                        self.socket.leave_multicast_v4(ipv4, i).unwrap();
1✔
181
                        // drop the bookkeeping entry for this group
                        let index = self
1✔
182
                            .multicast_addresses
183
                            .iter()
184
                            .position(|&item| item == addr)
3✔
185
                            .unwrap();
186
                        self.multicast_addresses.remove(index);
1✔
187
                    }
188
                    // family mismatch: nothing happens
                    IpAddr::V6(_ipv6) => { /* handle IPv6 */ }
×
189
                }
190
            }
191
            SocketAddr::V6(_val) => {
×
192
                match addr {
×
193
                    // family mismatch: nothing happens
                    IpAddr::V4(_ipv4) => { /* handle IPv4 */ }
×
194
                    IpAddr::V6(ipv6) => {
×
195
                        // interface index 0, matching the value used in `join_multicast`
                        self.socket.leave_multicast_v6(&ipv6, 0).unwrap();
×
196
                        let index = self
×
197
                            .multicast_addresses
198
                            .iter()
199
                            .position(|&item| item == addr)
×
200
                            .unwrap();
201
                        self.multicast_addresses.remove(index);
×
202
                    }
203
                }
204
            }
205
        }
206
    }
207
    /// enable AllCoAP multicasts - adds the AllCoap addresses to the listener
208
    /// - IPv4 AllCoAP multicast address is '224.0.1.187'
209
    /// - IPv6 AllCoAp multicast addresses are 'ff0?::fd'
210
    ///
211
    /// Parameter segment is only used with IPv6: it is added to 0xff00 and so supplies the
    /// last hex digit of the first 16-bit group (the '?' in 'ff0?::fd').
212
    /// - Its value can be between 0x0 and 0xf; larger values trigger the assertion below.
213
    /// - To join multiple segments, you have to call enable_all_coap for each of the segments.
214
    ///
215
    /// For further details see method join_multicast
    ///
    /// Panics if `segment` is greater than 0xf, if the local address of the socket cannot
    /// be read, or in any case in which `join_multicast` panics.
216
    pub fn enable_all_coap(&mut self, segment: u8) {
1✔
217
        assert!(segment <= 0xf);
1✔
218
        // choose the group that matches the address family the socket is bound to
        let m = match self.socket.local_addr().unwrap() {
1✔
219
            SocketAddr::V4(_val) => IpAddr::V4(Ipv4Addr::new(224, 0, 1, 187)),
1✔
220
            SocketAddr::V6(_val) => IpAddr::V6(Ipv6Addr::new(
×
221
                0xff00 + segment as u16,
×
222
                0,
223
                0,
224
                0,
225
                0,
226
                0,
227
                0,
228
                0xfd,
229
            )),
230
        };
231
        self.join_multicast(m);
1✔
232
    }
233
}
234
#[async_trait]
235
/// `Listener` implementation for UDP: runs `receive_loop` as a separate tokio task.
impl Listener for UdpCoapListener {
236
    /// Spawn `receive_loop(sender)` and return the task's `JoinHandle` right away.
    ///
    /// This implementation always returns `Ok`; I/O errors that occur later surface as the
    /// result of the spawned task, i.e. through the returned handle.
    async fn listen(
5✔
237
        mut self: Box<Self>,
238
        sender: TransportRequestSender,
239
    ) -> std::io::Result<JoinHandle<std::io::Result<()>>> {
240
        return Ok(tokio::spawn(self.receive_loop(sender)));
3✔
241
    }
242
}
243

244
/// `Responder` for one UDP peer: it does not touch the socket itself but queues the
/// response, together with the peer address, for `UdpCoapListener::receive_loop`.
#[derive(Clone)]
245
struct UdpResponder {
246
    address: SocketAddr, // this is the address we are sending to
247
    // sending half of the listener's outgoing datagram queue
    tx: UdpResponseSender,
248
}
249

250
#[async_trait]
251
impl Responder for UdpResponder {
252
    /// Queue `response` for delivery to the stored peer address.
    ///
    /// The result of the channel send is discarded on purpose (`let _ =`), so a response
    /// is silently lost when the receiving side of the queue no longer exists.
    async fn respond(&self, response: Vec<u8>) {
4✔
253
        let _ = self.tx.send((response, self.address));
2✔
254
    }
255
    /// The peer address this responder sends to.
    fn address(&self) -> SocketAddr {
1✔
256
        self.address
1✔
257
    }
258
}
259

260
impl UdpCoapListener {
261
    /// Drive the socket until an error occurs: forward every received datagram to the
    /// server and write every queued response to the network.
    ///
    /// Each loop iteration waits on two events at once:
    /// - a datagram arrives: its bytes and a fresh `UdpResponder` for the sender address
    ///   are pushed into `sender`;
    /// - a response is queued by a `UdpResponder`: it is logged at debug level and sent
    ///   to its destination.
    ///
    /// # Errors
    /// Returns the socket error of a failed receive or send, or an `ErrorKind::Other`
    /// error with the text "server channel error" when `sender` can no longer deliver.
    /// Returns `Ok(())` only when the response queue reports that it is closed.
    pub async fn receive_loop(mut self, sender: TransportRequestSender) -> std::io::Result<()> {
4✔
262
        loop {
2✔
263
            // A new buffer with capacity u16::MAX is allocated per iteration because its
            // ownership moves into the channel on receipt; when the response branch wins
            // instead, the unused buffer is simply dropped.
            let mut recv_vec = Vec::with_capacity(u16::MAX as usize);
2✔
264
            select! {
2✔
265
                message =self.socket.recv_buf_from(&mut recv_vec)=> {
266
                    match message {
267
                        Ok((_size, from)) => {
268
                            sender.send((recv_vec, Arc::new(UdpResponder{address: from, tx: self.response_sender.clone()}))).map_err( |_| std::io::Error::new(ErrorKind::Other, "server channel error"))?;
269
                        }
270
                        Err(e) => {
271
                            return Err(e);
272
                        }
273
                    }
274
                },
275
                response = self.response_receiver.recv() => {
276
                    if let Some((bytes, to)) = response{
277
                        debug!("sending {:?} to {:?}", &bytes,  &to);
278
                        self.socket.send_to(&bytes, to).await?;
279
                    }
280
                    else {
281
                        // in case nobody is listening to us, we can just terminate, though this
282
                        // should never happen for UDP
                        // (this listener keeps `response_sender` itself, so a sender for
                        // the queue exists for as long as the loop is running)
283
                        return Ok(());
284
                    }
285

286
                }
287
            }
288
        }
289
    }
290
}
291

292
/// A CoAP packet paired with a socket address.
///
/// NOTE(review): the type is not used anywhere in the visible part of this file; whether
/// `address` is the destination or the origin of `message` has to be confirmed at the
/// use sites.
#[derive(Debug)]
293
pub struct QueuedMessage {
294
    pub address: SocketAddr,
295
    pub message: Packet,
296
}
297

298
/// Protocol state the server consults before and after the user handler runs.
/// `Server` keeps it behind an `Arc<Mutex<..>>` and shares it with spawned handler tasks.
struct ServerCoapState {
299
    // observe bookkeeping; consulted in `intercept_request` unless `disable_observe` is set
    observer: Observer,
300
    // block-wise transfer handling from coap_lite, applied to requests and responses
    block_handler: BlockHandler<SocketAddr>,
301
    // when true, `intercept_request` skips the observer and forwards to the handler
    disable_observe: bool,
302
}
303

304
/// Outcome of `ServerCoapState::intercept_request`: tells `Server::run` whether the user
/// supplied request handler has to be invoked for a request.
pub enum ShouldForwardToHandler {
305
    /// run the handler in its own task, then post-process and send the response
    True,
306
    /// skip the handler and send whatever response the request carries at that point
    False,
307
}
308

309
impl ServerCoapState {
310
    /// Pre-process a request and decide whether the user handler has to see it.
    ///
    /// Order of checks:
    /// 1. the block handler: if it reports `Ok(true)` or an error, the request is not
    ///    forwarded (the error value itself is discarded);
    /// 2. if observe handling is disabled, the request is forwarded;
    /// 3. otherwise the observer decides.
    ///
    /// `responder` is handed to the observer together with the request.
    pub async fn intercept_request(
1✔
311
        &mut self,
312
        request: &mut CoapRequest<SocketAddr>,
313
        responder: Arc<dyn Responder>,
314
    ) -> ShouldForwardToHandler {
315
        match self.block_handler.intercept_request(request) {
3✔
316
            // NOTE(review): presumably `true` means the block handler already dealt with
            // the request — confirm against the coap_lite documentation
            Ok(true) => return ShouldForwardToHandler::False,
1✔
317
            Err(_err) => return ShouldForwardToHandler::False,
×
318
            Ok(false) => {}
319
        };
320

321
        if self.disable_observe {
1✔
322
            return ShouldForwardToHandler::True;
1✔
323
        }
324

325
        let should_be_forwarded = self.observer.request_handler(request, responder).await;
2✔
326
        if should_be_forwarded {
1✔
327
            return ShouldForwardToHandler::True;
1✔
328
        } else {
329
            return ShouldForwardToHandler::False;
1✔
330
        }
331
    }
332

333
    /// Post-process a request after the user handler has run: the block handler gets to
    /// look at it, and if that fails the error is applied to the request through
    /// `apply_from_error`, whose own result is ignored.
    pub async fn intercept_response(&mut self, request: &mut CoapRequest<SocketAddr>) {
4✔
334
        match self.block_handler.intercept_response(request) {
2✔
335
            Err(err) => {
×
336
                let _ = request.apply_from_error(err);
×
337
            }
338
            _ => {}
1✔
339
        }
340
    }
341
    /// Fresh state: new observer, block handler with its default configuration, and
    /// observe handling switched on (`disable_observe` is false).
    pub fn new() -> Self {
1✔
342
        Self {
343
            observer: Observer::new(),
1✔
344
            block_handler: BlockHandler::new(BlockHandlerConfig::default()),
2✔
345
            disable_observe: false,
346
        }
347
    }
348
    /// Store `value` in `disable_observe`; `true` makes `intercept_request` bypass the
    /// observer.
    pub fn disable_observe_handling(&mut self, value: bool) {
1✔
349
        self.disable_observe = value
1✔
350
    }
351
}
352

353
/// CoAP server: owns a set of listeners plus the channel through which they deliver
/// incoming requests, and the protocol state shared with handler tasks.
pub struct Server {
354
    // transports to start; consumed when `run` is called
    listeners: Vec<Box<dyn Listener>>,
355
    // block-wise/observe state, shared with the tasks spawned per request
    coap_state: Arc<Mutex<ServerCoapState>>,
356
    // where `run` picks up `(bytes, responder)` pairs delivered by the listeners
    new_packet_receiver: TransportRequestReceiver,
357
    // sender for the channel above; every listener receives a clone of it
    new_packet_sender: TransportRequestSender,
358
}
359

360
impl Server {
361
    /// Creates a CoAP server listening on the given address.
    ///
    /// Shorthand for building a single `UdpCoapListener` bound to `addr` and passing it
    /// to `from_listeners`.
    ///
    /// # Errors
    /// Returns the I/O error reported by `UdpCoapListener::new`.
362
    pub fn new_udp<A: ToSocketAddrs>(addr: A) -> Result<Self, io::Error> {
×
363
        let listener: Vec<Box<dyn Listener>> = vec![Box::new(UdpCoapListener::new(addr)?)];
×
364
        Ok(Self::from_listeners(listener))
×
365
    }
366

367
    /// Build a server from already constructed listeners.
    ///
    /// Creates the unbounded channel through which the listeners will deliver requests
    /// and a fresh `ServerCoapState`. Nothing is started here; the listeners are only
    /// launched by `run`.
    pub fn from_listeners(listeners: Vec<Box<dyn Listener>>) -> Self {
1✔
368
        let (tx, rx) = mpsc::unbounded_channel();
2✔
369
        Server {
370
            listeners,
371
            coap_state: Arc::new(Mutex::new(ServerCoapState::new())),
2✔
372
            new_packet_receiver: rx,
373
            new_packet_sender: tx,
374
        }
375
    }
376

377
    /// Start every listener in order, giving each its own clone of `sender`, and collect
    /// the join handles they return.
    ///
    /// # Errors
    /// Stops at the first listener whose `listen` fails and returns that error; the
    /// handles collected up to that point are dropped.
    async fn spawn_handles(
1✔
378
        listeners: Vec<Box<dyn Listener>>,
379
        sender: TransportRequestSender,
380
    ) -> std::io::Result<Vec<JoinHandle<std::io::Result<()>>>> {
381
        let mut handles = vec![];
1✔
382
        for listener in listeners.into_iter() {
4✔
383
            let handle = listener.listen(sender.clone()).await?;
3✔
384
            handles.push(handle);
1✔
385
        }
386
        return Ok(handles);
1✔
387
    }
388

389
    /// run the server.
    ///
    /// Starts all listeners and then loops over the incoming `(bytes, responder)` pairs:
    /// 1. the bytes are parsed into a packet; input that does not parse is dropped
    ///    without any reply;
    /// 2. the shared state pre-processes the request (`intercept_request`);
    /// 3. if the request is to be forwarded, a new task runs `handler`, lets the state
    ///    post-process the result and sends the response; otherwise the response already
    ///    present in the request is sent directly.
    ///
    /// # Errors
    /// Returns the error of a listener that fails to start, or an `ErrorKind::Other`
    /// error with the text "listen channel closed" if the request channel yields `None`.
    /// The join handles of the listener tasks are only held, never awaited, so errors
    /// that end a listener task later are not observed here.
390
    pub async fn run<Handler: RequestHandler>(mut self, handler: Handler) -> Result<(), io::Error> {
36✔
391
        let _handles = Self::spawn_handles(self.listeners, self.new_packet_sender.clone()).await?;
18✔
392

393
        // shared so that every spawned task can hold its own reference to the handler
        let handler_arc = Arc::new(handler);
18✔
394
        // receive an input, sync our cache / states, then call custom handler
UNCOV
395
        loop {
×
396
            let (bytes, respond) =
17✔
397
                self.new_packet_receiver.recv().await.ok_or_else(|| {
×
398
                    std::io::Error::new(ErrorKind::Other, "listen channel closed")
×
399
                })?;
400
            if let Ok(packet) = Packet::from_bytes(&bytes) {
16✔
401
                let mut request = Box::new(CoapRequest::<SocketAddr>::from_packet(
24✔
402
                    packet,
8✔
403
                    respond.address(),
16✔
404
                ));
405
                // this guard lives until the end of the `if let` body, i.e. also while the
                // `False` branch below awaits sending its response
                let mut coap_state = self.coap_state.lock().await;
16✔
406
                let should_forward = coap_state
24✔
407
                    .intercept_request(&mut request, respond.clone())
8✔
408
                    .await;
32✔
409

410
                match should_forward {
9✔
411
                    ShouldForwardToHandler::True => {
×
412
                        let handler_clone = handler_arc.clone();
16✔
413
                        let coap_state_clone = self.coap_state.clone();
16✔
414
                        // the handler runs in its own task so the receive loop is not
                        // held up while it works
                        tokio::spawn(async move {
25✔
415
                            request = handler_clone.handle_request(request).await;
20✔
416
                            coap_state_clone
35✔
417
                                .lock()
7✔
418
                                .await
21✔
419
                                .intercept_response(request.as_mut())
8✔
420
                                .await;
14✔
421

422
                            Self::respond_to_request(request, respond).await;
7✔
423
                        });
424
                    }
425
                    ShouldForwardToHandler::False => {
×
426
                        Self::respond_to_request(request, respond).await;
9✔
427
                    }
428
                }
429
            }
430
        }
431
    }
432
    /// Send the response attached to `req`, if any, through `responder`.
    ///
    /// Nothing is sent when the request carries no response or when the response message
    /// cannot be serialized; the serialization error is dropped silently in that case.
    async fn respond_to_request(req: Box<CoapRequest<SocketAddr>>, responder: Arc<dyn Responder>) {
4✔
433
        // if we have some response to send, send it
434
        if let Some(Ok(b)) = req.response.map(|resp| resp.message.to_bytes()) {
6✔
435
            responder.respond(b).await;
2✔
436
        }
437
    }
438
    #[deprecated(since = "0.21.0", note = "Use 'coap::Server::automatic_observe_handling' instead.")]
439
    /// disable auto-observe handling in server
    ///
    /// Deprecated alias: forwards `value` unchanged to `automatic_observe_handling`, so
    /// passing `true` disables the automatic handling.
440
    pub async fn disable_observe_handling(&mut self, value: bool) {
4✔
441
        self.automatic_observe_handling(value).await
2✔
442
    }
443
    /// set auto-observe handling in server, defaults to enabled
    ///
    /// Locks the shared state and stores `value` in its `disable_observe` flag.
    ///
    /// NOTE(review): `value` is stored without inversion, so `true` switches automatic
    /// observe handling OFF and `false` switches it ON — the opposite of what the method
    /// name suggests. The deprecated `disable_observe_handling` forwards here and relies
    /// on exactly this polarity; confirm the intended meaning before changing either one.
444
    pub async fn automatic_observe_handling(&mut self, value: bool) {
4✔
445
        let mut coap_state = self.coap_state.lock().await;
2✔
446
        coap_state.disable_observe_handling(value)
2✔
447
    }
448
}
449

450
#[cfg(test)]
451
pub mod test {
452
    use crate::request::RequestBuilder;
453

454
    use super::super::*;
455
    use super::*;
456
    use coap_lite::{block_handler::BlockValue, CoapOption, RequestType};
457
    use std::str;
458
    use std::time::Duration;
459

460
    pub fn spawn_server<
461
        F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
462
        HandlerRet,
463
    >(
464
        ip: &'static str,
465
        request_handler: F,
466
    ) -> mpsc::UnboundedReceiver<u16>
467
    where
468
        HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
469
    {
470
        let (tx, rx) = mpsc::unbounded_channel();
471
        let _task = tokio::spawn(async move {
472
            let sock = UdpSocket::bind(ip).await.unwrap();
473
            let addr = sock.local_addr().unwrap();
474
            let listener = Box::new(UdpCoapListener::from_socket(sock));
475
            let server = Server::from_listeners(vec![listener]);
476
            tx.send(addr.port()).unwrap();
477
            server.run(request_handler).await.unwrap();
478
        });
479

480
        rx
481
    }
482

483
    async fn request_handler(
484
        mut req: Box<CoapRequest<SocketAddr>>,
485
    ) -> Box<CoapRequest<SocketAddr>> {
486
        let uri_path_list = req.message.get_option(CoapOption::UriPath).unwrap().clone();
487
        assert_eq!(uri_path_list.len(), 1);
488

489
        match req.response {
490
            Some(ref mut response) => {
491
                response.message.payload = uri_path_list.front().unwrap().clone();
492
            }
493
            _ => {}
494
        }
495
        return req;
496
    }
497

498
    pub fn spawn_server_with_all_coap<
499
        F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
500
        HandlerRet,
501
    >(
502
        ip: &'static str,
503
        request_handler: F,
504
        segment: u8,
505
    ) -> mpsc::UnboundedReceiver<u16>
506
    where
507
        HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
508
    {
509
        let (tx, rx) = mpsc::unbounded_channel();
510

511
        std::thread::Builder::new()
512
            .name(String::from("v4-server"))
513
            .spawn(move || {
514
                tokio::runtime::Runtime::new()
515
                    .unwrap()
516
                    .block_on(async move {
517
                        // multicast needs a server on a real interface
518
                        let sock = UdpSocket::bind((ip, 0)).await.unwrap();
519
                        let addr = sock.local_addr().unwrap();
520
                        let mut listener = Box::new(UdpCoapListener::from_socket(sock));
521
                        listener.enable_all_coap(segment);
522
                        let server = Server::from_listeners(vec![listener]);
523
                        tx.send(addr.port()).unwrap();
524
                        server.run(request_handler).await.unwrap();
525
                    })
526
            })
527
            .unwrap();
528

529
        rx
530
    }
531

532
    pub fn spawn_server_disable_observe<
533
        F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
534
        HandlerRet,
535
    >(
536
        ip: &'static str,
537
        request_handler: F,
538
    ) -> mpsc::UnboundedReceiver<u16>
539
    where
540
        HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
541
    {
542
        let (tx, rx) = mpsc::unbounded_channel();
543
        let _task = tokio::spawn(async move {
544
            let sock = UdpSocket::bind(ip).await.unwrap();
545
            let addr = sock.local_addr().unwrap();
546
            let listener = Box::new(UdpCoapListener::from_socket(sock));
547
            let mut server = Server::from_listeners(vec![listener]);
548
            server.disable_observe_handling(true).await;
549
            tx.send(addr.port()).unwrap();
550
            server.run(request_handler).await.unwrap();
551
        });
552

553
        rx
554
    }
555

556
    #[tokio::test]
557
    async fn test_listener_instantiation() {
558
        let listener = UdpCoapListener::new("127.0.0.1:0").unwrap();
559
        assert!(
560
            listener.socket.local_addr().unwrap().ip() == IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
561
        );
562
        // assert!(listener.socket.blocking() == false);
563

564
        let explicit_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
565
        let another_listener = UdpCoapListener::from_socket(explicit_socket);
566
        assert!(
567
            another_listener.socket.local_addr().unwrap().ip()
568
                == IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
569
        );
570
    }
571

572
    #[tokio::test]
573
    async fn test_echo_server() {
574
        let server_port = spawn_server("127.0.0.1:0", request_handler)
575
            .recv()
576
            .await
577
            .unwrap();
578

579
        let client = UdpCoAPClient::new_udp(format!("127.0.0.1:{}", server_port))
580
            .await
581
            .unwrap();
582
        let mut request = CoapRequest::new();
583
        request.message.header.set_version(1);
584
        request
585
            .message
586
            .header
587
            .set_type(coap_lite::MessageType::Confirmable);
588
        request.message.header.set_code("0.01");
589
        request.message.header.message_id = 1;
590
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
591
        request
592
            .message
593
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
594
        client.send_single_request(&request).await.unwrap();
595

596
        let recv_packet = client.send(request).await.unwrap();
597
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
598
    }
599

600
    #[tokio::test]
601
    async fn test_put_block() {
602
        let server_port = spawn_server("127.0.0.1:0", request_handler)
603
            .recv()
604
            .await
605
            .unwrap();
606
        let data = "hello this is a payload";
607
        let mut v = Vec::new();
608
        for _ in 0..1024 {
609
            v.extend_from_slice(data.as_bytes());
610
        }
611
        let payload_size = v.len();
612
        let server_string = format!("127.0.0.1:{}", server_port);
613
        let client = UdpCoAPClient::new_udp(server_string.clone()).await.unwrap();
614

615
        let request = RequestBuilder::new("/large", RequestType::Put)
616
            .data(Some(v))
617
            .domain(server_string.clone())
618
            .build();
619

620
        let resp = client.send(request).await.unwrap();
621
        let block_opt = resp
622
            .message
623
            .get_first_option_as::<BlockValue>(CoapOption::Block1)
624
            .expect("expected block opt in response")
625
            .expect("could not decode block1 option");
626
        let expected_number = (payload_size as f32 / 1024.0).ceil() as u16 - 1;
627
        assert_eq!(
628
            block_opt.num, expected_number,
629
            "block not completely received!"
630
        );
631

632
        assert_eq!(resp.message.payload, b"large".to_vec());
633
    }
634

635
    #[tokio::test]
636
    #[ignore]
637
    async fn test_echo_server_v6() {
638
        let server_port = spawn_server("::1:0", request_handler).recv().await.unwrap();
639

640
        let client = UdpCoAPClient::new_udp(format!("::1:{}", server_port))
641
            .await
642
            .unwrap();
643
        let mut request = CoapRequest::new();
644
        request.message.header.set_version(1);
645
        request
646
            .message
647
            .header
648
            .set_type(coap_lite::MessageType::Confirmable);
649
        request.message.header.set_code("0.01");
650
        request.message.header.message_id = 1;
651
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
652
        request
653
            .message
654
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
655

656
        let recv_packet = client.send(request).await.unwrap();
657
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
658
    }
659

660
    #[tokio::test]
661
    async fn test_echo_server_no_token() {
662
        let server_port = spawn_server("127.0.0.1:0", request_handler)
663
            .recv()
664
            .await
665
            .unwrap();
666

667
        let client = UdpCoAPClient::new_udp(format!("127.0.0.1:{}", server_port))
668
            .await
669
            .unwrap();
670
        let mut packet = CoapRequest::new();
671
        packet.message.header.set_version(1);
672
        packet
673
            .message
674
            .header
675
            .set_type(coap_lite::MessageType::Confirmable);
676
        packet.message.header.set_code("0.01");
677
        packet.message.header.message_id = 1;
678
        packet
679
            .message
680
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
681
        let recv_packet = client.send(packet).await.unwrap();
682
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
683
    }
684

685
    #[tokio::test]
686
    #[ignore]
687
    async fn test_echo_server_no_token_v6() {
688
        let server_port = spawn_server("::1:0", request_handler).recv().await.unwrap();
689

690
        let client = UdpCoAPClient::new_udp(format!("::1:{}", server_port))
691
            .await
692
            .unwrap();
693
        let mut packet = CoapRequest::new();
694
        packet.message.header.set_version(1);
695
        packet
696
            .message
697
            .header
698
            .set_type(coap_lite::MessageType::Confirmable);
699
        packet.message.header.set_code("0.01");
700
        packet.message.header.message_id = 1;
701
        packet
702
            .message
703
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
704

705
        let recv_packet = client.send(packet).await.unwrap();
706
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
707
    }
708

709
    #[tokio::test]
710
    async fn test_update_resource() {
711
        let path = "/test";
712
        let payload1 = b"data1".to_vec();
713
        let payload2 = b"data2".to_vec();
714
        let (tx, mut rx) = mpsc::unbounded_channel();
715
        let (tx2, mut rx2) = mpsc::unbounded_channel();
716
        let mut step = 1;
717

718
        let server_port = spawn_server("127.0.0.1:0", request_handler)
719
            .recv()
720
            .await
721
            .unwrap();
722

723
        let client = UdpCoAPClient::new_udp(format!("127.0.0.1:{}", server_port))
724
            .await
725
            .unwrap();
726

727
        tx.send(step).unwrap();
728
        let mut request = CoapRequest::new();
729
        request.set_method(RequestType::Put);
730
        request.set_path(path);
731
        request.message.payload = payload1.clone();
732
        client.send(request.clone()).await.unwrap();
733

734
        let mut receive_step = 1;
735
        let payload1_clone = payload1.clone();
736
        let payload2_clone = payload2.clone();
737
        client
738
            .observe(path, move |msg| {
739
                match rx.try_recv() {
740
                    Ok(n) => receive_step = n,
741
                    _ => (),
742
                }
743

744
                match receive_step {
745
                    1 => assert_eq!(msg.payload, payload1_clone),
746
                    2 => {
747
                        assert_eq!(msg.payload, payload2_clone);
748
                        tx2.send(()).unwrap();
749
                    }
750
                    _ => panic!("unexpected step"),
751
                }
752
            })
753
            .await
754
            .unwrap();
755

756
        step = 2;
757
        tx.send(step).unwrap();
758
        request.message.payload = payload2.clone();
759
        let client2 = UdpCoAPClient::new_udp(format!("127.0.0.1:{}", server_port))
760
            .await
761
            .unwrap();
762
        let _ = client2.send(request).await.unwrap();
763
        assert_eq!(
764
            tokio::time::timeout(Duration::new(5, 0), rx2.recv())
765
                .await
766
                .unwrap(),
767
            Some(())
768
        );
769
    }
770

771
    #[tokio::test]
772
    async fn test_observe_transparent_transmission() {
773
        let path = "/test";
774
        let (tx, mut rx) = mpsc::unbounded_channel();
775

776
        let server_port = spawn_server_disable_observe("127.0.0.1:0", request_handler)
777
            .recv()
778
            .await
779
            .unwrap();
780

781
        let client = UdpCoAPClient::new_udp(format!("127.0.0.1:{}", server_port))
782
            .await
783
            .unwrap();
784

785
        client
786
            .observe(path, move |msg| {
787
                assert_eq!(msg.payload, b"test".to_vec());
788
                tx.send(()).unwrap();
789
            })
790
            .await
791
            .unwrap();
792

793
        assert_eq!(
794
            tokio::time::timeout(Duration::new(5, 0), rx.recv())
795
                .await
796
                .unwrap(),
797
            Some(())
798
        );
799
    }
800

801
    #[tokio::test]
802
    async fn multicast_server_all_coap() {
803
        // segment not relevant with IPv4
804
        let segment = 0x0;
805
        let server_port = spawn_server_with_all_coap("0.0.0.0", request_handler, segment)
806
            .recv()
807
            .await
808
            .unwrap();
809

810
        let client = UdpCoAPClient::new_udp(format!("127.0.0.1:{}", server_port))
811
            .await
812
            .unwrap();
813
        let mut request = CoapRequest::new();
814
        request.message.header.set_version(1);
815
        request
816
            .message
817
            .header
818
            .set_type(coap_lite::MessageType::Confirmable);
819
        request.message.header.set_code("0.01");
820
        request.message.header.message_id = 1;
821
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
822
        request
823
            .message
824
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
825
        let recv_packet = client.send(request).await.unwrap();
826

827
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
828

829
        let client = UdpCoAPClient::new_udp(format!("224.0.1.187:{}", server_port))
830
            .await
831
            .unwrap();
832
        let mut request = RequestBuilder::new("test-echo", RequestType::Get)
833
            .data(Some(vec![0x51, 0x55, 0x77, 0xE8]))
834
            .confirmable(true)
835
            .build();
836

837
        let mut receiver = client.create_receiver_for(&request).await;
838
        client.send_all_coap(&mut request, segment).await.unwrap();
839
        let recv_packet = receiver.receive().await.unwrap();
840
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
841
    }
842

843
    //This test right now does not work on windows
844
    #[cfg(unix)]
845
    #[tokio::test]
846
    #[ignore]
847
    async fn multicast_server_all_coap_v6() {
848
        // use segment 0x04 which should be the smallest administered scope
849

850
        let segment = 0x04;
851
        let server_port = spawn_server_with_all_coap("::0", request_handler, segment)
852
            .recv()
853
            .await
854
            .unwrap();
855

856
        let client = UdpCoAPClient::new_udp(format!("::1:{}", server_port))
857
            .await
858
            .unwrap();
859
        let mut request = CoapRequest::new();
860
        request.message.header.set_version(1);
861
        request
862
            .message
863
            .header
864
            .set_type(coap_lite::MessageType::Confirmable);
865
        request.message.header.set_code("0.01");
866
        request.message.header.message_id = 1;
867
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
868
        request
869
            .message
870
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
871
        client.send_single_request(&request).await.unwrap();
872

873
        let recv_packet = client.send(request).await.unwrap();
874
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
875

876
        // use 0xff02 to keep it within this network
877
        let client = UdpCoAPClient::new_udp(format!("ff0{}::fd:{}", segment, server_port))
878
            .await
879
            .unwrap();
880
        let mut request = CoapRequest::new();
881
        request.message.header.set_version(1);
882
        request
883
            .message
884
            .header
885
            .set_type(coap_lite::MessageType::NonConfirmable);
886
        request.message.header.set_code("0.01");
887
        request.message.header.message_id = 2;
888
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
889
        request
890
            .message
891
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
892
        let mut receiver = client.create_receiver_for(&request).await;
893
        client.send_all_coap(&mut request, segment).await.unwrap();
894
        let recv_packet = receiver.receive().await.unwrap();
895
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
896
    }
897

898
    /// Smoke test: on a dedicated `v4-server` thread, joins and then leaves two IPv4 multicast
    /// groups on a UDP listener and runs a server built from it, giving the thread one second
    /// before the test returns.
    ///
    /// The thread's join handle is dropped, so a panic inside the thread is not propagated to
    /// the test itself.
    #[test]
    fn multicast_join_leave() {
        let spawned = std::thread::Builder::new()
            .name("v4-server".to_string())
            .spawn(move || {
                let runtime = tokio::runtime::Runtime::new().unwrap();
                runtime.block_on(async move {
                    // multicast needs a server on a real interface
                    let socket = UdpSocket::bind(("0.0.0.0", 0)).await.unwrap();
                    let mut listener = Box::new(UdpCoapListener::from_socket(socket));

                    let groups = [
                        IpAddr::V4(Ipv4Addr::new(224, 0, 1, 1)),
                        IpAddr::V4(Ipv4Addr::new(224, 1, 1, 1)),
                    ];
                    // Join both groups first, then leave them in the same order.
                    for &group in &groups {
                        listener.join_multicast(group);
                    }
                    for &group in &groups {
                        listener.leave_multicast(group);
                    }

                    let server = Server::from_listeners(vec![listener]);
                    server.run(request_handler).await.unwrap();
                })
            });
        spawned.unwrap();

        std::thread::sleep(std::time::Duration::from_secs(1));
    }
921
    /// IPv6 counterpart of `multicast_join_leave` (ignored by default): on a dedicated
    /// `v6-server` thread, joins two IPv6 multicast groups on a UDP listener, leaves both again,
    /// and runs a server built from it, giving the thread one second before the test returns.
    ///
    /// The thread's join handle is dropped, so a panic inside the thread is not propagated to
    /// the test itself.
    #[test]
    #[ignore]
    fn multicast_join_leave_v6() {
        std::thread::Builder::new()
            .name(String::from("v6-server"))
            .spawn(move || {
                tokio::runtime::Runtime::new()
                    .unwrap()
                    .block_on(async move {
                        // multicast needs a server on a real interface
                        // NOTE(review): the socket is bound to the IPv4 wildcard although the
                        // groups below are IPv6 — confirm whether `::` was intended.
                        let sock = UdpSocket::bind(("0.0.0.0", 0)).await.unwrap();
                        let mut listener = Box::new(UdpCoapListener::from_socket(sock));

                        let group_a = IpAddr::V6(Ipv6Addr::new(0xff02, 0, 0, 0, 0, 0, 1, 0x1));
                        let group_b = IpAddr::V6(Ipv6Addr::new(0xff02, 0, 0, 0, 0, 1, 0, 0x2));

                        listener.join_multicast(group_a);
                        listener.join_multicast(group_b);
                        listener.leave_multicast(group_a);
                        // Leave the second group as well (the IPv4 test does join, join, leave,
                        // leave); joining it a second time would never exercise this leave.
                        listener.leave_multicast(group_b);

                        let server = Server::from_listeners(vec![listener]);
                        server.run(request_handler).await.unwrap();
                    })
            })
            .unwrap();

        std::thread::sleep(std::time::Duration::from_secs(1));
    }
953

954
    /// Builds the reference payload: every lowercase ASCII letter from `a` through `z`, in
    /// order, each repeated 1024 times (26 * 1024 bytes in total).
    fn get_expected_response() -> Vec<u8> {
        (b'a'..=b'z')
            .flat_map(|letter| std::iter::repeat(letter).take(1024))
            .collect()
    }
961
    async fn block2_responder(
962
        mut req: Box<CoapRequest<SocketAddr>>,
963
    ) -> Box<CoapRequest<SocketAddr>> {
964
        // vec should contain 'a' 1024 times, then 'b' 1024, up to ascii 'z'
965

966
        match req.response {
967
            Some(ref mut response) => {
968
                response.message.payload = get_expected_response();
969
            }
970
            _ => {}
971
        }
972
        return req;
973
    }
974
    #[tokio::test]
975
    async fn test_block2_server_response() {
976
        let server_port = spawn_server("127.0.0.1:0", block2_responder)
977
            .recv()
978
            .await
979
            .unwrap();
980

981
        let client = UdpCoAPClient::new_udp(format!("127.0.0.1:{}", server_port))
982
            .await
983
            .unwrap();
984
        let resp = client
985
            .send(RequestBuilder::new("/", RequestType::Get).build())
986
            .await
987
            .unwrap();
988
        assert_eq!(
989
            resp.message.payload,
990
            get_expected_response(),
991
            "responses do not match"
992
        );
993
    }
994
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc