• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Covertness / coap-rs / 25617157971

10 May 2026 02:01AM UTC coverage: 85.218% (-0.07%) from 85.288%
25617157971

push

github

web-flow
Merge pull request #129 from SimenZhor/deregister-opts

Add options (from registration packet) to deregistration packet

4 of 4 new or added lines in 1 file covered. (100.0%)

1 existing line in 1 file now uncovered.

859 of 1008 relevant lines covered (85.22%)

4.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.69
/src/server.rs
1
use async_trait::async_trait;
2
use coap_lite::{BlockHandler, BlockHandlerConfig, CoapOption, CoapRequest, CoapResponse, Packet};
3
use log::debug;
4
use std::{
5
    future::Future,
6
    io::ErrorKind,
7
    net::{self, IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs},
8
    sync::Arc,
9
};
10
use tokio::{
11
    io,
12
    net::UdpSocket,
13
    select,
14
    sync::{
15
        mpsc::{self, UnboundedReceiver, UnboundedSender},
16
        Mutex,
17
    },
18
    task::JoinHandle,
19
};
20

21
use crate::observer::{encode_coap_uint, Observer};
22

23
#[derive(Debug)]
24
pub enum CoAPServerError {
25
    NetworkError,
26
    EventLoopError,
27
    AnotherHandlerIsRunning,
28
    EventSendError,
29
}
30

31
use tokio::io::Error;
32

33
#[async_trait]
34
pub trait Dispatcher: Send + Sync {
35
    async fn dispatch(&self, request: CoapRequest<SocketAddr>) -> Option<CoapResponse>;
36
}
37

38
#[async_trait]
39
/// This trait represents a generic way to respond to a listener. If you want to implement your own
40
/// listener, you have to implement this trait to be able to send responses back through the
41
/// correct transport
42
pub trait Responder: Sync + Send {
43
    async fn respond(&self, response: Vec<u8>);
44
    fn address(&self) -> SocketAddr;
45
}
46

47
/// channel to send new requests from a transport to the CoAP server
48
pub type TransportRequestSender = UnboundedSender<(Vec<u8>, Arc<dyn Responder>)>;
49

50
/// channel used by CoAP server to receive new requests
51
pub type TransportRequestReceiver = UnboundedReceiver<(Vec<u8>, Arc<dyn Responder>)>;
52

53
type UdpResponseReceiver = UnboundedReceiver<(Vec<u8>, SocketAddr)>;
54
type UdpResponseSender = UnboundedSender<(Vec<u8>, SocketAddr)>;
55

56
// listeners receive new connections
57
#[async_trait]
58
pub trait Listener: Send {
59
    async fn listen(
60
        self: Box<Self>,
61
        sender: TransportRequestSender,
62
    ) -> std::io::Result<JoinHandle<std::io::Result<()>>>;
63
}
64
/// listener for a UDP socket
65
pub struct UdpCoapListener {
66
    socket: UdpSocket,
67
    multicast_addresses: Vec<IpAddr>,
68
    response_receiver: UdpResponseReceiver,
69
    response_sender: UdpResponseSender,
70
}
71

72
#[async_trait]
73
/// A trait for handling incoming requests. Use this instead of a closure
74
/// if you want to modify some external state
75
pub trait RequestHandler: Send + Sync + 'static {
76
    async fn handle_request(
77
        &self,
78
        mut request: Box<CoapRequest<SocketAddr>>,
79
    ) -> Box<CoapRequest<SocketAddr>>;
80
}
81

82
#[async_trait]
83
impl<F, HandlerRet> RequestHandler for F
84
where
85
    F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
86
    HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
87
{
88
    async fn handle_request(
39✔
89
        &self,
90
        request: Box<CoapRequest<SocketAddr>>,
91
    ) -> Box<CoapRequest<SocketAddr>> {
92
        self(request).await
24✔
93
    }
94
}
95

96
/// A listener for UDP packets. This listener can also subscribe to multicast addresses
97
impl UdpCoapListener {
98
    pub fn new<A: ToSocketAddrs>(addr: A) -> Result<Self, Error> {
1✔
99
        let std_socket = net::UdpSocket::bind(addr)?;
1✔
100
        std_socket.set_nonblocking(true)?;
2✔
101
        let socket = UdpSocket::from_std(std_socket)?;
1✔
102
        Ok(Self::from_socket(socket))
2✔
103
    }
104

105
    pub fn from_socket(socket: tokio::net::UdpSocket) -> Self {
1✔
106
        let (tx, rx) = mpsc::unbounded_channel();
2✔
107
        Self {
108
            socket,
109
            multicast_addresses: Vec::new(),
1✔
110
            response_receiver: rx,
111
            response_sender: tx,
112
        }
113
    }
114

115
    /// join multicast - adds the multicast addresses to the unicast listener
116
    /// - IPv4 multicast address range is '224.0.0.0/4'
117
    /// - IPv6 AllCoAp multicast addresses are 'ff00::/8'
118
    ///
119
    /// Parameter segment is used with IPv6 to determine the first octet.
120
    /// - It's value can be between 0x0 and 0xf.
121
    /// - To join multiple segments, you have to call enable_discovery for each of the segments.
122
    ///
123
    /// Some Multicast address scope
124
    /// IPv6        IPv4 equivalent[16]                Scope                    Purpose
125
    /// ffx1::/16        127.0.0.0/8                        Interface-local            Packets with this destination address may not be sent over any network link, but must remain within the current node; this is the multicast equivalent of the unicast loopback address.
126
    /// ffx2::/16        224.0.0.0/24                    Link-local                Packets with this destination address may not be routed anywhere.
127
    /// ffx3::/16        239.255.0.0/16                    IPv4 local scope
128
    /// ffx4::/16                                        Admin-local                The smallest scope that must be administratively configured.
129
    /// ffx5::/16                                        Site-local                Restricted to the local physical network.
130
    /// ffx8::/16        239.192.0.0/14                    Organization-local        Restricted to networks used by the organization administering the local network. (For example, these addresses might be used over VPNs; when packets for this group are routed over the public internet (where these addresses are not valid), they would have to be encapsulated in some other protocol.)
131
    /// ffxe::/16        224.0.1.0-238.255.255.255        Global scope            Eligible to be routed over the public internet.
132
    ///
133
    /// Notable addresses:
134
    /// ff02::1            All nodes on the local network segment
135
    /// ff0x::c            Simple Service Discovery Protocol
136
    /// ff0x::fb        Multicast DNS
137
    /// ff0x::fb        Multicast CoAP
138
    /// ff0x::114        Used for experiments
139
    //    pub fn join_multicast(&mut self, addr: IpAddr) {
140
    //        self.udp_server.join_multicast(addr);
141
    //    }
142
    pub fn join_multicast(&mut self, addr: IpAddr) {
1✔
143
        assert!(addr.is_multicast());
1✔
144
        // determine wether IPv4 or IPv6 and
145
        // join the appropriate multicast address
146
        match self.socket.local_addr().unwrap() {
1✔
147
            SocketAddr::V4(val) => {
1✔
148
                match addr {
1✔
149
                    IpAddr::V4(ipv4) => {
1✔
150
                        let i = val.ip().clone();
1✔
151
                        self.socket.join_multicast_v4(ipv4, i).unwrap();
1✔
152
                        self.multicast_addresses.push(addr);
1✔
153
                    }
154
                    IpAddr::V6(_ipv6) => { /* handle IPv6 */ }
×
155
                }
156
            }
157
            SocketAddr::V6(_val) => {
×
158
                match addr {
×
159
                    IpAddr::V4(_ipv4) => { /* handle IPv4 */ }
×
160
                    IpAddr::V6(ipv6) => {
×
161
                        self.socket.join_multicast_v6(&ipv6, 0).unwrap();
×
162
                        self.multicast_addresses.push(addr);
×
163
                        //self.socket.set_only_v6(true)?;
164
                    }
165
                }
166
            }
167
        }
168
    }
169

170
    /// leave multicast - remove the multicast address from the listener
171
    pub fn leave_multicast(&mut self, addr: IpAddr) {
1✔
172
        assert!(addr.is_multicast());
1✔
173
        // determine wether IPv4 or IPv6 and
174
        // leave the appropriate multicast address
175
        match self.socket.local_addr().unwrap() {
1✔
176
            SocketAddr::V4(val) => {
1✔
177
                match addr {
1✔
178
                    IpAddr::V4(ipv4) => {
1✔
179
                        let i = val.ip().clone();
1✔
180
                        self.socket.leave_multicast_v4(ipv4, i).unwrap();
1✔
181
                        let index = self
1✔
182
                            .multicast_addresses
183
                            .iter()
184
                            .position(|&item| item == addr)
3✔
185
                            .unwrap();
186
                        self.multicast_addresses.remove(index);
1✔
187
                    }
188
                    IpAddr::V6(_ipv6) => { /* handle IPv6 */ }
×
189
                }
190
            }
191
            SocketAddr::V6(_val) => {
×
192
                match addr {
×
193
                    IpAddr::V4(_ipv4) => { /* handle IPv4 */ }
×
194
                    IpAddr::V6(ipv6) => {
×
195
                        self.socket.leave_multicast_v6(&ipv6, 0).unwrap();
×
196
                        let index = self
×
197
                            .multicast_addresses
198
                            .iter()
199
                            .position(|&item| item == addr)
×
200
                            .unwrap();
201
                        self.multicast_addresses.remove(index);
×
202
                    }
203
                }
204
            }
205
        }
206
    }
207
    /// enable AllCoAP multicasts - adds the AllCoap addresses to the listener
208
    /// - IPv4 AllCoAP multicast address is '224.0.1.187'
209
    /// - IPv6 AllCoAp multicast addresses are 'ff0?::fd'
210
    ///
211
    /// Parameter segment is used with IPv6 to determine the first octet.
212
    /// - It's value can be between 0x0 and 0xf.
213
    /// - To join multiple segments, you have to call enable_discovery for each of the segments.
214
    ///
215
    /// For further details see method join_multicast
216
    pub fn enable_all_coap(&mut self, segment: u8) {
1✔
217
        assert!(segment <= 0xf);
1✔
218
        let m = match self.socket.local_addr().unwrap() {
1✔
219
            SocketAddr::V4(_val) => IpAddr::V4(Ipv4Addr::new(224, 0, 1, 187)),
1✔
220
            SocketAddr::V6(_val) => IpAddr::V6(Ipv6Addr::new(
×
221
                0xff00 + segment as u16,
×
222
                0,
223
                0,
224
                0,
225
                0,
226
                0,
227
                0,
228
                0xfd,
229
            )),
230
        };
231
        self.join_multicast(m);
1✔
232
    }
233
}
234
#[async_trait]
235
impl Listener for UdpCoapListener {
236
    async fn listen(
5✔
237
        mut self: Box<Self>,
238
        sender: TransportRequestSender,
239
    ) -> std::io::Result<JoinHandle<std::io::Result<()>>> {
240
        return Ok(tokio::spawn(self.receive_loop(sender)));
3✔
241
    }
242
}
243

244
#[derive(Clone)]
245
struct UdpResponder {
246
    address: SocketAddr, // this is the address we are sending to
247
    tx: UdpResponseSender,
248
}
249

250
#[async_trait]
251
impl Responder for UdpResponder {
252
    async fn respond(&self, response: Vec<u8>) {
4✔
253
        let _ = self.tx.send((response, self.address));
2✔
254
    }
255
    fn address(&self) -> SocketAddr {
1✔
256
        self.address
1✔
257
    }
258
}
259

260
impl UdpCoapListener {
261
    pub async fn receive_loop(mut self, sender: TransportRequestSender) -> std::io::Result<()> {
4✔
262
        loop {
2✔
263
            let mut recv_vec = Vec::with_capacity(u16::MAX as usize);
2✔
UNCOV
264
            select! {
×
265
                message =self.socket.recv_buf_from(&mut recv_vec)=> {
266
                    match message {
267
                        Ok((_size, from)) => {
268
                            sender.send((recv_vec, Arc::new(UdpResponder{address: from, tx: self.response_sender.clone()}))).map_err( |_| std::io::Error::new(ErrorKind::Other, "server channel error"))?;
269
                        }
270
                        Err(e) => {
271
                            return Err(e);
272
                        }
273
                    }
274
                },
275
                response = self.response_receiver.recv() => {
276
                    if let Some((bytes, to)) = response{
277
                        debug!("sending {:?} to {:?}", &bytes,  &to);
278
                        self.socket.send_to(&bytes, to).await?;
279
                    }
280
                    else {
281
                        // in case nobody is listening to us, we can just terminate, though this
282
                        // should never happen for UDP
283
                        return Ok(());
284
                    }
285

286
                }
287
            }
288
        }
289
    }
290
}
291

292
#[derive(Debug)]
293
pub struct QueuedMessage {
294
    pub address: SocketAddr,
295
    pub message: Packet,
296
}
297

298
struct ServerCoapState {
299
    observer: Observer,
300
    block_handler: BlockHandler<SocketAddr>,
301
    disable_observe: bool,
302
}
303

304
pub enum ShouldForwardToHandler {
305
    True,
306
    False,
307
}
308

309
impl ServerCoapState {
310
    pub async fn intercept_request(
1✔
311
        &mut self,
312
        request: &mut CoapRequest<SocketAddr>,
313
        responder: Arc<dyn Responder>,
314
    ) -> ShouldForwardToHandler {
315
        match self.block_handler.intercept_request(request) {
3✔
316
            Ok(true) => return ShouldForwardToHandler::False,
1✔
317
            Err(_err) => return ShouldForwardToHandler::False,
×
318
            Ok(false) => {}
319
        };
320

321
        if self.disable_observe {
1✔
322
            return ShouldForwardToHandler::True;
1✔
323
        }
324

325
        let should_be_forwarded = self.observer.request_handler(request, responder).await;
2✔
326
        if should_be_forwarded {
1✔
327
            return ShouldForwardToHandler::True;
1✔
328
        } else {
329
            return ShouldForwardToHandler::False;
1✔
330
        }
331
    }
332

333
    pub async fn intercept_response(&mut self, request: &mut CoapRequest<SocketAddr>) {
4✔
334
        let resource_path = request.get_path();
1✔
335

336
        let is_block_fetch_for_observer = request.message.get_option(CoapOption::Block2).is_some()
3✔
337
            && request.message.get_option(CoapOption::Observe).is_none()
1✔
338
            && request.source.is_some()
1✔
339
            && self
2✔
340
                .observer
341
                .is_observing(&request.source.unwrap(), &resource_path);
2✔
342

343
        if is_block_fetch_for_observer {
1✔
344
            if let Some((payload, etag)) =
1✔
345
                self.observer.get_resource_payload_and_etag(&resource_path)
346
            {
347
                if let Some(ref mut response) = request.response {
2✔
348
                    response.message.payload = payload.to_vec();
2✔
349
                    response.message.clear_option(CoapOption::ETag);
1✔
350
                    response.message.add_option(CoapOption::ETag, etag);
1✔
351
                    // Prevent duplicate Size2 options, clear first.
352
                    response.message.clear_option(CoapOption::Size2);
1✔
353
                    response
1✔
354
                        .message
355
                        .add_option(CoapOption::Size2, encode_coap_uint(payload.len()));
1✔
356
                }
357
            }
358
        }
359

360
        if let Err(err) = self.block_handler.intercept_response(request) {
3✔
361
            let _ = request.apply_from_error(err);
×
362
        }
363
    }
364

365
    pub fn new() -> Self {
1✔
366
        Self {
367
            observer: Observer::new(),
1✔
368
            block_handler: BlockHandler::new(BlockHandlerConfig::default()),
2✔
369
            disable_observe: false,
370
        }
371
    }
372
    pub fn disable_observe_handling(&mut self, value: bool) {
1✔
373
        self.disable_observe = value
1✔
374
    }
375
}
376

377
pub struct Server {
378
    listeners: Vec<Box<dyn Listener>>,
379
    coap_state: Arc<Mutex<ServerCoapState>>,
380
    new_packet_receiver: TransportRequestReceiver,
381
    new_packet_sender: TransportRequestSender,
382
}
383

384
impl Server {
385
    /// Creates a CoAP server listening on the given address.
386
    pub fn new_udp<A: ToSocketAddrs>(addr: A) -> Result<Self, io::Error> {
×
387
        let listener: Vec<Box<dyn Listener>> = vec![Box::new(UdpCoapListener::new(addr)?)];
×
388
        Ok(Self::from_listeners(listener))
×
389
    }
390

391
    pub fn from_listeners(listeners: Vec<Box<dyn Listener>>) -> Self {
1✔
392
        let (tx, rx) = mpsc::unbounded_channel();
2✔
393
        Server {
394
            listeners,
395
            coap_state: Arc::new(Mutex::new(ServerCoapState::new())),
2✔
396
            new_packet_receiver: rx,
397
            new_packet_sender: tx,
398
        }
399
    }
400

401
    async fn spawn_handles(
1✔
402
        listeners: Vec<Box<dyn Listener>>,
403
        sender: TransportRequestSender,
404
    ) -> std::io::Result<Vec<JoinHandle<std::io::Result<()>>>> {
405
        let mut handles = vec![];
1✔
406
        for listener in listeners.into_iter() {
4✔
407
            let handle = listener.listen(sender.clone()).await?;
3✔
408
            handles.push(handle);
1✔
409
        }
410
        return Ok(handles);
1✔
411
    }
412

413
    /// run the server.
414
    pub async fn run<Handler: RequestHandler>(mut self, handler: Handler) -> Result<(), io::Error> {
54✔
415
        let _handles = Self::spawn_handles(self.listeners, self.new_packet_sender.clone()).await?;
22✔
416

417
        let handler_arc = Arc::new(handler);
22✔
418
        // receive an input, sync our cache / states, then call custom handler
419
        loop {
420
            let (bytes, respond) =
21✔
421
                self.new_packet_receiver.recv().await.ok_or_else(|| {
×
422
                    std::io::Error::new(ErrorKind::Other, "listen channel closed")
×
423
                })?;
424
            if let Ok(packet) = Packet::from_bytes(&bytes) {
20✔
425
                let mut request = Box::new(CoapRequest::<SocketAddr>::from_packet(
30✔
426
                    packet,
10✔
427
                    respond.address(),
20✔
428
                ));
429
                let mut coap_state = self.coap_state.lock().await;
20✔
430
                let should_forward = coap_state
30✔
431
                    .intercept_request(&mut request, respond.clone())
10✔
432
                    .await;
40✔
433

434
                match should_forward {
10✔
435
                    ShouldForwardToHandler::True => {
436
                        let handler_clone = handler_arc.clone();
20✔
437
                        let coap_state_clone = self.coap_state.clone();
20✔
438
                        tokio::spawn(async move {
30✔
439
                            request = handler_clone.handle_request(request).await;
22✔
440
                            coap_state_clone
45✔
441
                                .lock()
9✔
442
                                .await
27✔
443
                                .intercept_response(request.as_mut())
9✔
444
                                .await;
18✔
445

446
                            Self::respond_to_request(request, respond).await;
9✔
447
                        });
448
                    }
449
                    ShouldForwardToHandler::False => {
450
                        Self::respond_to_request(request, respond).await;
12✔
451
                    }
452
                }
453
            }
454
        }
455
    }
456
    async fn respond_to_request(req: Box<CoapRequest<SocketAddr>>, responder: Arc<dyn Responder>) {
4✔
457
        // if we have some reponse to send, send it
458
        if let Some(Ok(b)) = req.response.map(|resp| resp.message.to_bytes()) {
6✔
459
            responder.respond(b).await;
2✔
460
        }
461
    }
462
    #[deprecated(
463
        since = "0.21.0",
464
        note = "Use 'coap::Server::automatic_observe_handling' instead."
465
    )]
466
    /// disable auto-observe handling in server
467
    pub async fn disable_observe_handling(&mut self, value: bool) {
4✔
468
        self.automatic_observe_handling(value).await
2✔
469
    }
470
    /// set auto-observe handling in server, defaults to enabled
471
    pub async fn automatic_observe_handling(&mut self, value: bool) {
4✔
472
        let mut coap_state = self.coap_state.lock().await;
2✔
473
        coap_state.disable_observe_handling(value)
2✔
474
    }
475
}
476

477
#[cfg(test)]
478
pub mod test {
479
    use crate::request::RequestBuilder;
480

481
    use super::super::*;
482
    use super::*;
483
    use coap_lite::{block_handler::BlockValue, CoapOption, RequestType};
484
    use std::str;
485
    use std::time::Duration;
486

487
    pub fn spawn_server<
488
        F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
489
        HandlerRet,
490
    >(
491
        ip: &'static str,
492
        request_handler: F,
493
    ) -> mpsc::UnboundedReceiver<u16>
494
    where
495
        HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
496
    {
497
        let (tx, rx) = mpsc::unbounded_channel();
498
        let _task = tokio::spawn(async move {
499
            let sock = UdpSocket::bind(ip).await.unwrap();
500
            let addr = sock.local_addr().unwrap();
501
            let listener = Box::new(UdpCoapListener::from_socket(sock));
502
            let server = Server::from_listeners(vec![listener]);
503
            tx.send(addr.port()).unwrap();
504
            server.run(request_handler).await.unwrap();
505
        });
506

507
        rx
508
    }
509

510
    async fn request_handler(
511
        mut req: Box<CoapRequest<SocketAddr>>,
512
    ) -> Box<CoapRequest<SocketAddr>> {
513
        let uri_path_list = req.message.get_option(CoapOption::UriPath).unwrap().clone();
514
        assert_eq!(uri_path_list.len(), 1);
515

516
        match req.response {
517
            Some(ref mut response) => {
518
                response.message.payload = uri_path_list.front().unwrap().clone();
519
            }
520
            _ => {}
521
        }
522
        return req;
523
    }
524

525
    pub fn spawn_server_with_all_coap<
526
        F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
527
        HandlerRet,
528
    >(
529
        ip: &'static str,
530
        request_handler: F,
531
        segment: u8,
532
    ) -> mpsc::UnboundedReceiver<u16>
533
    where
534
        HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
535
    {
536
        let (tx, rx) = mpsc::unbounded_channel();
537

538
        std::thread::Builder::new()
539
            .name(String::from("v4-server"))
540
            .spawn(move || {
541
                tokio::runtime::Runtime::new()
542
                    .unwrap()
543
                    .block_on(async move {
544
                        // multicast needs a server on a real interface
545
                        let sock = UdpSocket::bind((ip, 0)).await.unwrap();
546
                        let addr = sock.local_addr().unwrap();
547
                        let mut listener = Box::new(UdpCoapListener::from_socket(sock));
548
                        listener.enable_all_coap(segment);
549
                        let server = Server::from_listeners(vec![listener]);
550
                        tx.send(addr.port()).unwrap();
551
                        server.run(request_handler).await.unwrap();
552
                    })
553
            })
554
            .unwrap();
555

556
        rx
557
    }
558

559
    pub fn spawn_server_disable_observe<
560
        F: Fn(Box<CoapRequest<SocketAddr>>) -> HandlerRet + Send + Sync + 'static,
561
        HandlerRet,
562
    >(
563
        ip: &'static str,
564
        request_handler: F,
565
    ) -> mpsc::UnboundedReceiver<u16>
566
    where
567
        HandlerRet: Future<Output = Box<CoapRequest<SocketAddr>>> + Send,
568
    {
569
        let (tx, rx) = mpsc::unbounded_channel();
570
        let _task = tokio::spawn(async move {
571
            let sock = UdpSocket::bind(ip).await.unwrap();
572
            let addr = sock.local_addr().unwrap();
573
            let listener = Box::new(UdpCoapListener::from_socket(sock));
574
            let mut server = Server::from_listeners(vec![listener]);
575
            server.disable_observe_handling(true).await;
576
            tx.send(addr.port()).unwrap();
577
            server.run(request_handler).await.unwrap();
578
        });
579

580
        rx
581
    }
582

583
    #[tokio::test]
584
    async fn test_listener_instantiation() {
585
        let listener = UdpCoapListener::new("127.0.0.1:0").unwrap();
586
        assert!(
587
            listener.socket.local_addr().unwrap().ip() == IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
588
        );
589
        // assert!(listener.socket.blocking() == false);
590

591
        let explicit_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
592
        let another_listener = UdpCoapListener::from_socket(explicit_socket);
593
        assert!(
594
            another_listener.socket.local_addr().unwrap().ip()
595
                == IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
596
        );
597
    }
598

599
    #[tokio::test]
600
    async fn test_echo_server() {
601
        let server_port = spawn_server("127.0.0.1:0", request_handler)
602
            .recv()
603
            .await
604
            .unwrap();
605

606
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
607
            .await
608
            .unwrap();
609
        let mut request = CoapRequest::new();
610
        request.message.header.set_version(1);
611
        request
612
            .message
613
            .header
614
            .set_type(coap_lite::MessageType::Confirmable);
615
        request.message.header.set_code("0.01");
616
        request.message.header.message_id = 1;
617
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
618
        request
619
            .message
620
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
621
        client.send_single_request(&request).await.unwrap();
622

623
        let recv_packet = client.send(request).await.unwrap();
624
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
625
    }
626

627
    #[tokio::test]
628
    async fn test_put_block() {
629
        let server_port = spawn_server("127.0.0.1:0", request_handler)
630
            .recv()
631
            .await
632
            .unwrap();
633
        let data = "hello this is a payload";
634
        let mut v = Vec::new();
635
        for _ in 0..1024 {
636
            v.extend_from_slice(data.as_bytes());
637
        }
638
        let payload_size = v.len();
639
        let server_string = format!("127.0.0.1:{}", server_port);
640
        let client = UdpCoAPClient::new(server_string.clone()).await.unwrap();
641

642
        let request = RequestBuilder::new("/large", RequestType::Put)
643
            .data(Some(v))
644
            .domain(server_string.clone())
645
            .build();
646

647
        let resp = client.send(request).await.unwrap();
648
        let block_opt = resp
649
            .message
650
            .get_first_option_as::<BlockValue>(CoapOption::Block1)
651
            .expect("expected block opt in response")
652
            .expect("could not decode block1 option");
653
        let expected_number = (payload_size as f32 / 1024.0).ceil() as u16 - 1;
654
        assert_eq!(
655
            block_opt.num, expected_number,
656
            "block not completely received!"
657
        );
658

659
        assert_eq!(resp.message.payload, b"large".to_vec());
660
    }
661

662
    #[tokio::test]
663
    #[ignore]
664
    async fn test_echo_server_v6() {
665
        let server_port = spawn_server("::1:0", request_handler).recv().await.unwrap();
666

667
        let client = UdpCoAPClient::new(format!("::1:{}", server_port))
668
            .await
669
            .unwrap();
670
        let mut request = CoapRequest::new();
671
        request.message.header.set_version(1);
672
        request
673
            .message
674
            .header
675
            .set_type(coap_lite::MessageType::Confirmable);
676
        request.message.header.set_code("0.01");
677
        request.message.header.message_id = 1;
678
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
679
        request
680
            .message
681
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
682

683
        let recv_packet = client.send(request).await.unwrap();
684
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
685
    }
686

687
    #[tokio::test]
688
    async fn test_echo_server_no_token() {
689
        let server_port = spawn_server("127.0.0.1:0", request_handler)
690
            .recv()
691
            .await
692
            .unwrap();
693

694
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
695
            .await
696
            .unwrap();
697
        let mut packet = CoapRequest::new();
698
        packet.message.header.set_version(1);
699
        packet
700
            .message
701
            .header
702
            .set_type(coap_lite::MessageType::Confirmable);
703
        packet.message.header.set_code("0.01");
704
        packet.message.header.message_id = 1;
705
        packet
706
            .message
707
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
708
        let recv_packet = client.send(packet).await.unwrap();
709
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
710
    }
711

712
    #[tokio::test]
713
    #[ignore]
714
    async fn test_echo_server_no_token_v6() {
715
        let server_port = spawn_server("::1:0", request_handler).recv().await.unwrap();
716

717
        let client = UdpCoAPClient::new(format!("::1:{}", server_port))
718
            .await
719
            .unwrap();
720
        let mut packet = CoapRequest::new();
721
        packet.message.header.set_version(1);
722
        packet
723
            .message
724
            .header
725
            .set_type(coap_lite::MessageType::Confirmable);
726
        packet.message.header.set_code("0.01");
727
        packet.message.header.message_id = 1;
728
        packet
729
            .message
730
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
731

732
        let recv_packet = client.send(packet).await.unwrap();
733
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
734
    }
735

736
    #[tokio::test]
737
    async fn test_update_resource() {
738
        let path = "/test";
739
        let payload1 = b"data1".to_vec();
740
        let payload2 = b"data2".to_vec();
741
        let (tx, mut rx) = mpsc::unbounded_channel();
742
        let (tx2, mut rx2) = mpsc::unbounded_channel();
743
        let mut step = 1;
744

745
        let server_port = spawn_server("127.0.0.1:0", request_handler)
746
            .recv()
747
            .await
748
            .unwrap();
749

750
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
751
            .await
752
            .unwrap();
753

754
        tx.send(step).unwrap();
755
        let mut request = CoapRequest::new();
756
        request.set_method(RequestType::Put);
757
        request.set_path(path);
758
        request.message.payload = payload1.clone();
759
        client.send(request.clone()).await.unwrap();
760

761
        let mut receive_step = 1;
762
        let payload1_clone = payload1.clone();
763
        let payload2_clone = payload2.clone();
764
        client
765
            .observe(path, move |msg| {
766
                match rx.try_recv() {
767
                    Ok(n) => receive_step = n,
768
                    _ => (),
769
                }
770

771
                match receive_step {
772
                    1 => assert_eq!(msg.payload, payload1_clone),
773
                    2 => {
774
                        assert_eq!(msg.payload, payload2_clone);
775
                        tx2.send(()).unwrap();
776
                    }
777
                    _ => panic!("unexpected step"),
778
                }
779
            })
780
            .await
781
            .unwrap();
782

783
        step = 2;
784
        tx.send(step).unwrap();
785
        request.message.payload = payload2.clone();
786
        let client2 = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
787
            .await
788
            .unwrap();
789
        let _ = client2.send(request).await.unwrap();
790
        assert_eq!(
791
            tokio::time::timeout(Duration::new(5, 0), rx2.recv())
792
                .await
793
                .unwrap(),
794
            Some(())
795
        );
796
    }
797

798
    #[tokio::test]
799
    async fn test_observe_transparent_transmission() {
800
        let path = "/test";
801
        let (tx, mut rx) = mpsc::unbounded_channel();
802

803
        let server_port = spawn_server_disable_observe("127.0.0.1:0", request_handler)
804
            .recv()
805
            .await
806
            .unwrap();
807

808
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
809
            .await
810
            .unwrap();
811

812
        client
813
            .observe(path, move |msg| {
814
                assert_eq!(msg.payload, b"test".to_vec());
815
                tx.send(()).unwrap();
816
            })
817
            .await
818
            .unwrap();
819

820
        assert_eq!(
821
            tokio::time::timeout(Duration::new(5, 0), rx.recv())
822
                .await
823
                .unwrap(),
824
            Some(())
825
        );
826
    }
827

828
    #[tokio::test]
829
    async fn multicast_server_all_coap() {
830
        // segment not relevant with IPv4
831
        let segment = 0x0;
832
        let server_port = spawn_server_with_all_coap("0.0.0.0", request_handler, segment)
833
            .recv()
834
            .await
835
            .unwrap();
836

837
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
838
            .await
839
            .unwrap();
840
        let mut request = CoapRequest::new();
841
        request.message.header.set_version(1);
842
        request
843
            .message
844
            .header
845
            .set_type(coap_lite::MessageType::Confirmable);
846
        request.message.header.set_code("0.01");
847
        request.message.header.message_id = 1;
848
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
849
        request
850
            .message
851
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
852
        let recv_packet = client.send(request).await.unwrap();
853

854
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
855

856
        let client = UdpCoAPClient::new(format!("224.0.1.187:{}", server_port))
857
            .await
858
            .unwrap();
859
        let mut request = RequestBuilder::new("test-echo", RequestType::Get)
860
            .data(Some(vec![0x51, 0x55, 0x77, 0xE8]))
861
            .confirmable(true)
862
            .build();
863

864
        let mut receiver = client.create_receiver_for(&request).await;
865
        client.send_all_coap(&mut request, segment).await.unwrap();
866
        let recv_packet = receiver.receive().await.unwrap();
867
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
868
    }
869

870
    //This test right now does not work on windows
871
    #[cfg(unix)]
872
    #[tokio::test]
873
    #[ignore]
874
    async fn multicast_server_all_coap_v6() {
875
        // use segment 0x04 which should be the smallest administered scope
876

877
        let segment = 0x04;
878
        let server_port = spawn_server_with_all_coap("::0", request_handler, segment)
879
            .recv()
880
            .await
881
            .unwrap();
882

883
        let client = UdpCoAPClient::new(format!("::1:{}", server_port))
884
            .await
885
            .unwrap();
886
        let mut request = CoapRequest::new();
887
        request.message.header.set_version(1);
888
        request
889
            .message
890
            .header
891
            .set_type(coap_lite::MessageType::Confirmable);
892
        request.message.header.set_code("0.01");
893
        request.message.header.message_id = 1;
894
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
895
        request
896
            .message
897
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
898
        client.send_single_request(&request).await.unwrap();
899

900
        let recv_packet = client.send(request).await.unwrap();
901
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
902

903
        // use 0xff02 to keep it within this network
904
        let client = UdpCoAPClient::new(format!("ff0{}::fd:{}", segment, server_port))
905
            .await
906
            .unwrap();
907
        let mut request = CoapRequest::new();
908
        request.message.header.set_version(1);
909
        request
910
            .message
911
            .header
912
            .set_type(coap_lite::MessageType::NonConfirmable);
913
        request.message.header.set_code("0.01");
914
        request.message.header.message_id = 2;
915
        request.message.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
916
        request
917
            .message
918
            .add_option(CoapOption::UriPath, b"test-echo".to_vec());
919
        let mut receiver = client.create_receiver_for(&request).await;
920
        client.send_all_coap(&mut request, segment).await.unwrap();
921
        let recv_packet = receiver.receive().await.unwrap();
922
        assert_eq!(recv_packet.message.payload, b"test-echo".to_vec());
923
    }
924

925
    #[test]
926
    fn multicast_join_leave() {
927
        std::thread::Builder::new()
928
            .name(String::from("v4-server"))
929
            .spawn(move || {
930
                tokio::runtime::Runtime::new()
931
                    .unwrap()
932
                    .block_on(async move {
933
                        // multicast needs a server on a real interface
934
                        let sock = UdpSocket::bind(("0.0.0.0", 0)).await.unwrap();
935
                        let mut listener = Box::new(UdpCoapListener::from_socket(sock));
936
                        listener.join_multicast(IpAddr::V4(Ipv4Addr::new(224, 0, 1, 1)));
937
                        listener.join_multicast(IpAddr::V4(Ipv4Addr::new(224, 1, 1, 1)));
938
                        listener.leave_multicast(IpAddr::V4(Ipv4Addr::new(224, 0, 1, 1)));
939
                        listener.leave_multicast(IpAddr::V4(Ipv4Addr::new(224, 1, 1, 1)));
940
                        let server = Server::from_listeners(vec![listener]);
941
                        server.run(request_handler).await.unwrap();
942
                    })
943
            })
944
            .unwrap();
945

946
        std::thread::sleep(std::time::Duration::from_secs(1));
947
    }
948
    #[test]
949
    #[ignore]
950
    fn multicast_join_leave_v6() {
951
        std::thread::Builder::new()
952
            .name(String::from("v6-server"))
953
            .spawn(move || {
954
                tokio::runtime::Runtime::new()
955
                    .unwrap()
956
                    .block_on(async move {
957
                        // multicast needs a server on a real interface
958
                        let sock = UdpSocket::bind(("0.0.0.0", 0)).await.unwrap();
959
                        let mut listener = Box::new(UdpCoapListener::from_socket(sock));
960
                        listener.join_multicast(IpAddr::V6(Ipv6Addr::new(
961
                            0xff02, 0, 0, 0, 0, 0, 1, 0x1,
962
                        )));
963
                        listener.join_multicast(IpAddr::V6(Ipv6Addr::new(
964
                            0xff02, 0, 0, 0, 0, 1, 0, 0x2,
965
                        )));
966
                        listener.leave_multicast(IpAddr::V6(Ipv6Addr::new(
967
                            0xff02, 0, 0, 0, 0, 0, 1, 0x1,
968
                        )));
969
                        listener.join_multicast(IpAddr::V6(Ipv6Addr::new(
970
                            0xff02, 0, 0, 0, 0, 1, 0, 0x2,
971
                        )));
972
                        let server = Server::from_listeners(vec![listener]);
973
                        server.run(request_handler).await.unwrap();
974
                    })
975
            })
976
            .unwrap();
977

978
        std::thread::sleep(std::time::Duration::from_secs(1));
979
    }
980

981
    fn get_expected_response() -> Vec<u8> {
982
        let mut resp = vec![];
983
        for c in b'a'..=b'z' {
984
            resp.extend(std::iter::repeat(c).take(1024));
985
        }
986
        resp
987
    }
988
    async fn block2_responder(
989
        mut req: Box<CoapRequest<SocketAddr>>,
990
    ) -> Box<CoapRequest<SocketAddr>> {
991
        // vec should contain 'a' 1024 times, then 'b' 1024, up to ascii 'z'
992

993
        match req.response {
994
            Some(ref mut response) => {
995
                response.message.payload = get_expected_response();
996
            }
997
            _ => {}
998
        }
999
        return req;
1000
    }
1001
    #[tokio::test]
1002
    async fn test_block2_server_response() {
1003
        let server_port = spawn_server("127.0.0.1:0", block2_responder)
1004
            .recv()
1005
            .await
1006
            .unwrap();
1007

1008
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
1009
            .await
1010
            .unwrap();
1011
        let resp = client
1012
            .send(RequestBuilder::new("/", RequestType::Get).build())
1013
            .await
1014
            .unwrap();
1015
        assert_eq!(
1016
            resp.message.payload,
1017
            get_expected_response(),
1018
            "responses do not match"
1019
        );
1020
    }
1021
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc