• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Covertness / coap-rs / 25931059598

15 May 2026 05:11PM UTC coverage: 84.871% (-0.4%) from 85.288%
25931059598

Pull #130

github

web-flow
Merge 936cb4815 into 094b60c74
Pull Request #130: Fix all Clippy warnings and enforce Clippy in CI

480 of 567 new or added lines in 19 files covered. (84.66%)

3 existing lines in 2 files now uncovered.

1251 of 1474 relevant lines covered (84.87%)

3.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.48
/src/observer.rs
1
use coap_lite::{
2
    block_handler::BlockValue, CoapOption, CoapRequest, MessageClass, MessageType, ObserveOption,
3
    Packet, RequestType as Method, ResponseType as Status,
4
};
5
use futures::{
6
    stream::{Fuse, SelectNextSome},
7
    StreamExt,
8
};
9
use log::{debug, warn};
10
use std::{
11
    collections::{hash_map::Entry, HashMap, HashSet},
12
    net::SocketAddr,
13
    sync::Arc,
14
    time::Duration,
15
};
16
use tokio::time::interval;
17
use tokio_stream::wrappers::IntervalStream;
18

19
use crate::server::Responder;
20

21
const DEFAULT_UNACKNOWLEDGE_MESSAGE_TRY_TIMES: usize = 10;
22

23
pub struct Observer {
24
    registers: HashMap<String, RegisterItem>,
25
    resources: HashMap<String, ResourceItem>,
26
    register_resources: HashMap<String, RegisterResourceItem>,
27
    unacknowledge_messages: HashMap<u16, UnacknowledgeMessageItem>,
28
    current_message_id: u16,
29
    timer: Fuse<IntervalStream>,
30
}
31

32
#[derive(Debug)]
33
struct RegisterItem {
34
    register_resources: HashSet<String>,
35
}
36

37
#[derive(Debug)]
38
struct ResourceItem {
39
    payload: Arc<Vec<u8>>,
40
    register_resources: HashSet<String>,
41
    sequence: u32,
42
    etag: Vec<u8>,
43
}
44

45
struct RegisterResourceItem {
46
    pub(crate) registered_responder: Arc<dyn Responder>,
47
    pub(crate) resource: String,
48
    pub(crate) token: Vec<u8>,
49
    pub(crate) unacknowledge_message: Option<u16>,
50
    pub(crate) preferred_block_size: Option<usize>,
51
}
52

53
#[derive(Debug)]
54
struct UnacknowledgeMessageItem {
55
    register_resource: String,
56
    try_times: usize,
57
}
58

59
/// Encodes a usize as a CoAP uint (big-endian, variable length up to 4 bytes).
60
/// The value 0 is encoded as an empty byte slice. Values larger than 2^32-1 are saturated.
61
pub(crate) fn encode_coap_uint(value: usize) -> Vec<u8> {
1✔
62
    (value.min(u32::MAX as usize) as u32)
1✔
63
        .to_be_bytes()
64
        .iter()
65
        .skip_while(|&&b| b == 0)
3✔
66
        .copied()
67
        .collect()
68
}
69

70
impl Default for Observer {
NEW
71
    fn default() -> Self {
×
NEW
72
        Self::new()
×
73
    }
74
}
75

76
impl Observer {
77
    /// Creates an observer with channel to send message.
78
    pub fn new() -> Self {
1✔
79
        Self {
80
            registers: HashMap::new(),
1✔
81
            resources: HashMap::new(),
1✔
82
            register_resources: HashMap::new(),
1✔
83
            unacknowledge_messages: HashMap::new(),
1✔
84
            current_message_id: 0,
85
            timer: IntervalStream::new(interval(Duration::from_secs(1))).fuse(),
2✔
86
        }
87
    }
88

89
    /// poll the observer's timer.
90
    pub fn select_next_some(&mut self) -> SelectNextSome<'_, Fuse<IntervalStream>> {
×
91
        self.timer.select_next_some()
×
92
    }
93

94
    /// Checks if the given client endpoint is currently observing the specified resource.
95
    pub fn is_observing(&self, addr: &SocketAddr, path: &String) -> bool {
1✔
96
        let key = Self::format_register_resource(addr, path);
1✔
97
        self.register_resources.contains_key(&key)
1✔
98
    }
99

100
    /// Returns a shared reference to the resource payload and its current ETag.
101
    /// Used by the server to serve remaining blocks consistently during a block-wise transfer.
102
    pub fn get_resource_payload_and_etag(&self, path: &String) -> Option<(Arc<Vec<u8>>, Vec<u8>)> {
1✔
103
        self.resources
1✔
104
            .get(path)
1✔
105
            .map(|r| (r.payload.clone(), r.etag.clone()))
3✔
106
    }
107

108
    /// Computes a non-cryptographic ETag using FNV-1a hash for change detection only.
109
    /// This ETag is not suitable for security purposes (e.g., integrity validation against attackers).
110
    fn compute_etag(payload: &[u8]) -> Vec<u8> {
1✔
111
        let mut hash: u64 = 0xcbf29ce484222325;
1✔
112
        for &byte in payload {
2✔
113
            hash ^= byte as u64;
1✔
114
            hash = hash.wrapping_mul(0x100000001b3);
1✔
115
        }
116
        hash.to_be_bytes()[..4].to_vec()
1✔
117
    }
118

119
    /// filter the requests belong to the observer. store the responder in case it is needed
120
    /// returns whether the request should be forwarded to the handler
121
    pub async fn request_handler(
1✔
122
        &mut self,
123
        request: &mut CoapRequest<SocketAddr>,
124
        responder: Arc<dyn Responder>,
125
    ) -> bool {
126
        match request.message.header.get_type() {
2✔
127
            MessageType::Acknowledgement => {
128
                self.acknowledge(request);
1✔
129
                return false;
1✔
130
            }
131
            MessageType::Reset => {
132
                self.reset_notification(request);
1✔
133
                return false;
1✔
134
            }
135
            _ => {}
136
        }
137

138
        match (request.get_method(), request.get_observe_flag()) {
3✔
139
            (&Method::Get, Some(observe_option)) => match observe_option {
2✔
140
                Ok(ObserveOption::Register) => {
141
                    self.register(request, responder).await;
1✔
142
                    false
1✔
143
                }
144
                Ok(ObserveOption::Deregister) => {
145
                    self.deregister(request);
1✔
146
                    true
1✔
147
                }
NEW
148
                _ => true,
×
149
            },
150
            (&Method::Put, _) => {
151
                self.resource_changed(request).await;
2✔
152
                true
1✔
153
            }
154
            _ => true,
1✔
155
        }
156
    }
157

158
    /// trigger send the unacknowledge messages.
159
    pub async fn timer_handler(&mut self) {
×
160
        let register_resource_keys: Vec<String>;
161
        {
162
            register_resource_keys = self
×
163
                .unacknowledge_messages
NEW
164
                .values()
×
NEW
165
                .map(|msg| msg.register_resource.clone())
×
UNCOV
166
                .collect();
×
167
        }
168

169
        for register_resource_key in register_resource_keys {
×
170
            if self.try_unacknowledge_message(&register_resource_key) {
×
171
                self.notify_register_with_newest_resource(&register_resource_key)
×
172
                    .await;
×
173
            }
174
        }
175
    }
176

177
    async fn register(
1✔
178
        &mut self,
179
        request: &mut CoapRequest<SocketAddr>,
180
        responder: Arc<dyn Responder>,
181
    ) {
182
        let register_address = responder.address();
2✔
183
        let resource_path = request.get_path();
1✔
184

185
        debug!("register {} {}", register_address, resource_path);
3✔
186

187
        // reply NotFound if resource doesn't exist
188
        if !self.resources.contains_key(&resource_path) {
2✔
189
            if let Some(ref response) = request.response.take() {
2✔
190
                let mut response2 = response.clone();
1✔
191
                response2.set_status(Status::NotFound);
1✔
192
                let msg_serial = response2.message.to_bytes();
1✔
193
                if let Ok(b) = msg_serial {
3✔
194
                    responder.respond(b).await;
2✔
195
                }
196
            }
197
            return;
198
        }
199

200
        let preferred_block_size = request
2✔
201
            .message
202
            .get_first_option_as::<BlockValue>(CoapOption::Block2)
1✔
203
            .and_then(|x: Result<BlockValue, _>| x.ok())
3✔
204
            .map(|b: BlockValue| b.size());
3✔
205

206
        self.record_register_resource(
1✔
207
            responder.clone(),
2✔
208
            &resource_path,
1✔
209
            request.message.get_token(),
1✔
210
            preferred_block_size,
211
        );
212

213
        let resource = self.resources.get(&resource_path).unwrap();
1✔
214

215
        if let Some(response) = request.response.take() {
3✔
216
            let mut response2 = response.clone();
1✔
217
            response2.message.set_observe_value(resource.sequence);
1✔
218
            response2
1✔
219
                .message
220
                .header
221
                .set_type(MessageType::NonConfirmable);
1✔
222
            response2
1✔
223
                .message
224
                .add_option(CoapOption::ETag, resource.etag.clone());
1✔
225

226
            let total_size = resource.payload.len();
1✔
227
            response2
1✔
228
                .message
229
                .add_option(CoapOption::Size2, encode_coap_uint(total_size));
1✔
230

231
            if let Some(block_size) = preferred_block_size {
2✔
232
                if resource.payload.len() > block_size {
3✔
233
                    let block = BlockValue::new(0, true, block_size).expect("valid block size");
2✔
234
                    response2
1✔
235
                        .message
236
                        .add_option_as::<BlockValue>(CoapOption::Block2, block);
1✔
237
                    response2.message.payload = resource.payload[..block_size].to_vec();
1✔
238
                } else {
239
                    response2.message.payload = resource.payload.to_vec();
2✔
240
                }
241
            } else {
242
                response2.message.payload = resource.payload.to_vec();
2✔
243
            }
244

245
            if let Ok(b) = response2.message.to_bytes() {
3✔
246
                responder.respond(b).await;
2✔
247
            }
248
        }
249
    }
250

251
    fn deregister(&mut self, request: &CoapRequest<SocketAddr>) {
1✔
252
        let register_address = request.source.unwrap();
1✔
253
        let resource_path = request.get_path();
1✔
254

255
        debug!("deregister {} {}", register_address, resource_path);
3✔
256

257
        self.remove_register_resource(
1✔
258
            &register_address,
259
            &resource_path,
260
            request.message.get_token(),
1✔
261
        );
262
    }
263

264
    /// handle reset message from client.
265
    /// according to rfc 7641 section 4.5, if a client rejects a notification with a reset message,
266
    /// the server must remove the associated entry from the list of observers.
267
    fn reset_notification(&mut self, request: &CoapRequest<SocketAddr>) {
1✔
268
        let message_id = request.message.header.message_id;
1✔
269

270
        // Validate Message ID
271
        let Some(unack_item) = self.unacknowledge_messages.get(&message_id) else {
1✔
272
            debug!("Reset received for unknown Message ID: {}", message_id);
×
273
            return;
274
        };
275

276
        // Verify source endpoint
277
        let key = &unack_item.register_resource;
1✔
278
        let Some(reg_item) = self.register_resources.get(key) else {
2✔
279
            debug!("Reset received for unknown registration key: {}", key);
×
280
            return;
281
        };
282

283
        let expected_address = reg_item.registered_responder.address();
1✔
284
        if request.source != Some(expected_address) {
1✔
285
            warn!(
×
286
                "Received RST for MID {} from unexpected source {:?}, expected {}. Ignoring.",
287
                message_id, request.source, expected_address
288
            );
289
            return;
290
        }
291

292
        // Extract necessary information for cleanup
293
        // We clone data here to release the immutable borrows on self before mutation.
294
        let address = expected_address;
1✔
295
        let resource_path = reg_item.resource.clone();
1✔
296
        let token = reg_item.token.clone();
1✔
297
        let register_resource_key = key.clone();
1✔
298

299
        // Remove the mapping between MID and resource
300
        self.unacknowledge_messages.remove(&message_id);
2✔
301

302
        // Clear the pending message reference in the registration to avoid double-deletion
303
        if let Some(item) = self.register_resources.get_mut(&register_resource_key) {
1✔
304
            item.unacknowledge_message = None;
1✔
305
        }
306

307
        debug!(
2✔
308
            "Reset received from {} for resource {}, removing observer",
309
            address, resource_path
310
        );
311

312
        // Remove the observer from the registry
313
        self.remove_register_resource(&address, &resource_path, &token);
2✔
314
    }
315

316
    async fn resource_changed(&mut self, request: &CoapRequest<SocketAddr>) {
4✔
317
        let resource_path = request.get_path();
1✔
318
        let resource_payload = &request.message.payload;
1✔
319

320
        debug!("resource_changed {} {:?}", resource_path, resource_payload);
3✔
321

322
        let register_resource_keys: Vec<String>;
323
        {
324
            let resource = self.record_resource(&resource_path, resource_payload);
2✔
325
            register_resource_keys = resource.register_resources.iter().cloned().collect();
1✔
326
        }
327

328
        for register_resource_key in register_resource_keys {
3✔
329
            self.gen_message_id();
1✔
330
            self.notify_register_with_newest_resource(&register_resource_key)
2✔
331
                .await;
3✔
332
            self.record_unacknowledge_message(&register_resource_key);
1✔
333
        }
334
    }
335

336
    fn acknowledge(&mut self, request: &CoapRequest<SocketAddr>) {
1✔
337
        self.remove_unacknowledge_message(
1✔
338
            &request.message.header.message_id,
1✔
339
            request.message.get_token(),
1✔
340
        );
341
    }
342

343
    fn record_register_resource(
1✔
344
        &mut self,
345
        responder: Arc<dyn Responder>,
346
        path: &String,
347
        token: &[u8],
348
        preferred_block_size: Option<usize>,
349
    ) {
350
        let resource = self.resources.get_mut(path).unwrap();
2✔
351
        let register_key = responder;
1✔
352

353
        let register_resource_key = Self::format_register_resource(&register_key.address(), path);
2✔
354

355
        self.register_resources
1✔
356
            .entry(register_resource_key.clone())
2✔
357
            .and_modify(|existing| {
2✔
358
                // Clear any pending unacknowledged message to prevent stale timeout logic
359
                if let Some(old_msg_id) = existing.unacknowledge_message.take() {
1✔
360
                    self.unacknowledge_messages.remove(&old_msg_id);
×
361
                }
362

363
                // Refresh the observation state with new parameters
364
                existing.registered_responder = register_key.clone();
1✔
365
                existing.token = token.into();
1✔
366
                existing.preferred_block_size = preferred_block_size;
1✔
367
            })
368
            .or_insert_with(|| RegisterResourceItem {
3✔
369
                registered_responder: register_key.clone(),
1✔
370
                resource: path.clone(),
1✔
371
                token: token.into(),
1✔
372
                unacknowledge_message: None,
373
                preferred_block_size,
1✔
374
            });
375

376
        resource
1✔
377
            .register_resources
378
            .replace(register_resource_key.clone());
1✔
379
        match self.registers.entry(register_key.address().to_string()) {
1✔
380
            Entry::Occupied(register) => {
1✔
381
                register
382
                    .into_mut()
1✔
383
                    .register_resources
384
                    .replace(register_resource_key);
1✔
385
            }
386
            Entry::Vacant(v) => {
1✔
387
                let mut register = RegisterItem {
388
                    register_resources: HashSet::new(),
1✔
389
                };
390
                register.register_resources.insert(register_resource_key);
1✔
391

392
                v.insert(register);
1✔
393
            }
394
        };
395
    }
396

397
    fn remove_register_resource(
1✔
398
        &mut self,
399
        address: &SocketAddr,
400
        path: &String,
401
        token: &[u8],
402
    ) -> bool {
403
        let register_resource_key = Self::format_register_resource(address, path);
1✔
404

405
        if let Some(register_resource) = self.register_resources.get(&register_resource_key) {
2✔
406
            if register_resource.token != *token {
2✔
407
                return false;
×
408
            }
409

410
            if let Some(unacknowledge_message) = register_resource.unacknowledge_message {
2✔
411
                self.unacknowledge_messages
1✔
412
                    .remove(&unacknowledge_message)
1✔
413
                    .unwrap();
414
            }
415

416
            assert!(self
1✔
417
                .resources
418
                .get_mut(path)
419
                .unwrap()
420
                .register_resources
421
                .remove(&register_resource_key));
422

423
            let remove_register;
424
            {
425
                let register = self
2✔
426
                    .registers
427
                    .get_mut(&register_resource.registered_responder.address().to_string())
1✔
428
                    .unwrap();
429
                assert!(register.register_resources.remove(&register_resource_key));
1✔
430
                remove_register = register.register_resources.is_empty();
1✔
431
            }
432

433
            if remove_register {
1✔
434
                self.registers
1✔
435
                    .remove(&register_resource.registered_responder.address().to_string());
1✔
436
            }
437
        }
438

439
        self.register_resources.remove(&register_resource_key);
2✔
440
        true
1✔
441
    }
442

443
    fn record_resource(&mut self, path: &str, payload: &[u8]) -> &ResourceItem {
1✔
444
        match self.resources.entry(path.to_owned()) {
2✔
445
            Entry::Occupied(resource) => {
1✔
446
                let r = resource.into_mut();
1✔
447
                r.sequence += 1;
1✔
448
                r.payload = Arc::new(payload.to_owned());
1✔
449
                r.etag = Self::compute_etag(payload);
1✔
450
                r
1✔
451
            }
452
            Entry::Vacant(v) => v.insert(ResourceItem {
2✔
453
                payload: Arc::new(payload.to_owned()),
2✔
454
                register_resources: HashSet::new(),
1✔
455
                sequence: 0,
456
                etag: Self::compute_etag(payload),
1✔
457
            }),
458
        }
459
    }
460

461
    fn record_unacknowledge_message(&mut self, register_resource_key: &String) {
1✔
462
        let message_id = self.current_message_id;
1✔
463

464
        let register_resource = self
1✔
465
            .register_resources
466
            .get_mut(register_resource_key)
1✔
467
            .unwrap();
468
        if let Some(old_message_id) = register_resource.unacknowledge_message {
1✔
469
            self.unacknowledge_messages.remove(&old_message_id);
1✔
470
        }
471

472
        register_resource.unacknowledge_message = Some(message_id);
1✔
473
        self.unacknowledge_messages.insert(
2✔
474
            message_id,
475
            UnacknowledgeMessageItem {
1✔
476
                register_resource: register_resource_key.clone(),
1✔
477
                try_times: 1,
478
            },
479
        );
480
    }
481

482
    fn try_unacknowledge_message(&mut self, register_resource_key: &String) -> bool {
×
483
        let register_resource = self
×
484
            .register_resources
485
            .get_mut(register_resource_key)
×
486
            .unwrap();
NEW
487
        let message_id = &register_resource.unacknowledge_message.unwrap();
×
488

489
        let try_again;
490
        {
491
            let unacknowledge_message = self.unacknowledge_messages.get_mut(message_id).unwrap();
×
492
            if unacknowledge_message.try_times > DEFAULT_UNACKNOWLEDGE_MESSAGE_TRY_TIMES {
×
493
                try_again = false;
×
494
            } else {
495
                unacknowledge_message.try_times += 1;
×
496
                try_again = true;
×
497
            }
498
        }
499

500
        if !try_again {
×
501
            warn!(
×
502
                "unacknowledge_message try times exceeded  {}",
503
                register_resource_key
504
            );
505

506
            register_resource.unacknowledge_message = None;
×
507
            self.unacknowledge_messages.remove(message_id);
×
508
        }
509

NEW
510
        try_again
×
511
    }
512

513
    fn remove_unacknowledge_message(&mut self, message_id: &u16, token: &[u8]) {
1✔
514
        if let Some(message) = self.unacknowledge_messages.get_mut(message_id) {
1✔
515
            let register_resource = self
1✔
516
                .register_resources
517
                .get_mut(&message.register_resource)
1✔
518
                .unwrap();
519
            if register_resource.token != *token {
1✔
520
                return;
521
            }
522

523
            register_resource.unacknowledge_message = None;
×
524
        }
525

526
        self.unacknowledge_messages.remove(message_id);
×
527
    }
528

529
    /// Notifies a specific registered observer about a resource change.
530
    ///
531
    /// Note on Architecture: The payload truncation and Block2 option injection for the
532
    /// first block are handled directly within `Observer` rather than the generic
533
    /// `intercept_response` in `server.rs`. This is a deliberate design choice to maintain
534
    /// single-responsibility in a push-based context: asynchronous notifications bypass the
535
    /// main server request/response loop and are sent directly via the `Responder`.
536
    /// Centralizing the "first block generation" logic here avoids duplicating the truncation
537
    /// code for both synchronous registrations and asynchronous notifications.
538
    async fn notify_register_with_newest_resource(&mut self, register_resource_key: &String) {
4✔
539
        let message_id = self.current_message_id;
1✔
540

541
        debug!("notify {} {}", register_resource_key, message_id);
3✔
542

543
        let message = &mut Packet::new();
2✔
544
        message.header.set_type(MessageType::Confirmable);
1✔
545
        message.header.code = MessageClass::Response(Status::Content);
1✔
546

547
        let register_resource = self.register_resources.get(register_resource_key).unwrap();
1✔
548
        let resource = self.resources.get(&register_resource.resource).unwrap();
1✔
549

550
        message.set_token(register_resource.token.clone());
1✔
551
        message.set_observe_value(resource.sequence);
1✔
552
        message.header.message_id = message_id;
1✔
553
        message.add_option(CoapOption::ETag, resource.etag.clone());
1✔
554

555
        let total_size = resource.payload.len();
1✔
556
        message.add_option(CoapOption::Size2, encode_coap_uint(total_size));
1✔
557

558
        if let Some(block_size) = register_resource.preferred_block_size {
2✔
559
            if resource.payload.len() > block_size {
3✔
560
                let block = BlockValue::new(0, true, block_size).expect("valid block size");
2✔
561
                message.add_option_as::<BlockValue>(CoapOption::Block2, block);
1✔
562
                message.payload = resource.payload[..block_size].to_vec();
1✔
563
            } else {
564
                message.payload = resource.payload.to_vec();
×
565
            }
566
        } else {
567
            message.payload = resource.payload.to_vec();
2✔
568
        }
569

570
        if let Ok(b) = message.to_bytes() {
3✔
571
            debug!("notify register with newest resource {:?}", b);
3✔
572
            register_resource.registered_responder.respond(b).await;
2✔
573
        }
574
    }
575

576
    fn gen_message_id(&mut self) -> u16 {
1✔
577
        self.current_message_id += 1;
1✔
578
        self.current_message_id
1✔
579
    }
580

581
    fn format_register_resource(address: &SocketAddr, path: &String) -> String {
1✔
582
        format!("{}${}", address, path)
1✔
583
    }
584
}
585

586
#[cfg(test)]
587
mod test {
588

589
    use crate::request::RequestBuilder;
590

591
    use super::super::*;
592
    use super::*;
593
    use async_trait::async_trait;
594
    use coap_lite::CoapResponse;
595
    use std::io::ErrorKind;
596
    use tokio::sync::mpsc;
597

598
    async fn request_handler(
599
        mut req: Box<CoapRequest<SocketAddr>>,
600
    ) -> Box<CoapRequest<SocketAddr>> {
601
        match *req.get_method() {
602
            coap_lite::RequestType::Get => {
603
                let observe_option = req.get_observe_flag().unwrap().unwrap();
604
                assert_eq!(observe_option, ObserveOption::Deregister);
605
            }
606
            coap_lite::RequestType::Put => {}
607
            _ => panic!("unexpected request"),
608
        }
609

610
        if let Some(ref mut response) = req.response {
611
            response.message.payload = b"OK".to_vec();
612
        };
613
        req
614
    }
615

616
    fn decode_uint(data: &[u8]) -> usize {
617
        data.iter().fold(0, |acc, &b| (acc << 8) | b as usize)
618
    }
619

620
    #[tokio::test]
621
    async fn test_observe() {
622
        let path = "/test";
623
        let payload1 = b"data1".to_vec();
624
        let payload2 = b"data2".to_vec();
625
        let (tx, mut rx) = mpsc::unbounded_channel();
626
        let (tx2, mut rx2) = mpsc::unbounded_channel();
627
        let mut step = 1;
628

629
        let server_port = server::test::spawn_server("127.0.0.1:0", request_handler)
630
            .recv()
631
            .await
632
            .unwrap();
633

634
        let server_address = &format!("127.0.0.1:{}", server_port);
635

636
        let client = UdpCoAPClient::new(server_address).await.unwrap();
637

638
        tx.send(step).unwrap();
639
        let mut request = CoapRequest::new();
640

641
        request.set_method(coap_lite::RequestType::Put);
642
        request.set_path(path);
643
        request.message.set_token(vec![1]);
644

645
        request.message.payload = payload1.clone();
646
        let _ = client.send(request.clone()).await.unwrap();
647

648
        let payload1_clone = payload1.clone();
649
        let payload2_clone = payload2.clone();
650

651
        let client2 = client.clone();
652

653
        let mut receive_step = 1;
654
        client
655
            .observe(path, move |msg| {
656
                match rx.try_recv() {
657
                    Ok(n) => receive_step = n,
658
                    _ => debug!("receive_step rx error"),
659
                }
660
                debug!("receive on client: {:?}", msg);
661

662
                match receive_step {
663
                    1 => assert_eq!(msg.payload, payload1_clone),
664
                    2 => {
665
                        assert_eq!(msg.payload, payload2_clone);
666
                        tx2.send(()).unwrap();
667
                    }
668
                    _ => panic!("unexpected step"),
669
                }
670
            })
671
            .await
672
            .unwrap();
673

674
        step = 2;
675
        debug!("on step 2");
676
        tx.send(step).unwrap();
677

678
        request.message.payload = payload2.clone();
679
        request.message.set_token(vec![2]);
680

681
        let _ = client2.send(request).await.unwrap();
682
        assert_eq!(
683
            tokio::time::timeout(Duration::new(5, 0), rx2.recv())
684
                .await
685
                .unwrap(),
686
            Some(())
687
        );
688
    }
689
    #[tokio::test]
690
    async fn test_unobserve() {
691
        let path = "/test";
692
        let payload1 = b"data1".to_vec();
693
        let payload2 = b"data2".to_vec();
694

695
        let server_port = server::test::spawn_server("127.0.0.1:0", request_handler)
696
            .recv()
697
            .await
698
            .unwrap();
699

700
        let server_address = &format!("127.0.0.1:{}", server_port);
701

702
        let client = UdpCoAPClient::new(server_address).await.unwrap();
703

704
        let client3 = client.clone();
705

706
        let mut request = RequestBuilder::new(path, coap_lite::RequestType::Put)
707
            .token(Some(vec![1]))
708
            .data(Some(payload1.clone()))
709
            .build();
710
        let _ = client.send(request.clone()).await.unwrap();
711

712
        let payload1_clone = payload1.clone();
713
        let unobserve = client
714
            .observe(path, move |msg| {
715
                assert_eq!(msg.payload, payload1_clone);
716
            })
717
            .await
718
            .unwrap();
719

720
        unobserve.send(client::ObserveMessage::Terminate).unwrap();
721
        request.message.payload = payload2.clone();
722

723
        let _ = client3.send(request).await.unwrap();
724
    }
725

726
    #[tokio::test]
727
    async fn test_observe_without_resource() {
728
        let path = "/test";
729

730
        let server_port = server::test::spawn_server("127.0.0.1:0", request_handler)
731
            .recv()
732
            .await
733
            .unwrap();
734

735
        let client = UdpCoAPClient::new(format!("127.0.0.1:{}", server_port))
736
            .await
737
            .unwrap();
738
        let error = client.observe(path, |_msg| {}).await.unwrap_err();
739
        assert_eq!(error.kind(), ErrorKind::NotFound);
740
    }
741

742
    #[tokio::test]
743
    async fn test_observe_cancelled_by_rst() {
744
        use async_trait::async_trait;
745
        use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
746

747
        let mut observer = Observer::new();
748
        let path = "test";
749
        let addr: SocketAddr = "127.0.0.1:5683".parse().unwrap();
750

751
        // Mock Responder to intercept outgoing packets (notifications)
752
        struct MockResponder {
753
            addr: SocketAddr,
754
            tx: UnboundedSender<Vec<u8>>,
755
        }
756

757
        #[async_trait]
758
        impl Responder for MockResponder {
759
            async fn respond(&self, bytes: Vec<u8>) {
760
                self.tx.send(bytes).unwrap();
761
            }
762
            fn address(&self) -> SocketAddr {
763
                self.addr
764
            }
765
        }
766

767
        // Setup: Create a resource and a channel to capture notifications
768
        observer.resources.insert(
769
            path.to_string(),
770
            ResourceItem {
771
                payload: Arc::new(vec![]),
772
                register_resources: HashSet::new(),
773
                sequence: 0,
774
                etag: vec![],
775
            },
776
        );
777

778
        let (tx, mut rx) = unbounded_channel();
779
        let responder = Arc::new(MockResponder { addr, tx });
780

781
        // Register the observer
782
        let mut register_req = CoapRequest::<SocketAddr>::new();
783
        register_req.set_path(path);
784
        register_req.set_method(Method::Get);
785
        register_req.message.set_token(vec![1, 2, 3, 4]);
786
        register_req.set_observe_flag(ObserveOption::Register);
787
        register_req.source = Some(addr);
788

789
        observer
790
            .request_handler(&mut register_req, responder.clone())
791
            .await;
792

793
        let key = format!("{}${}", addr, path);
794
        assert!(observer.register_resources.contains_key(&key));
795

796
        // Trigger a resource change (PUT) to cause a notification
797
        let mut put_req = CoapRequest::<SocketAddr>::new();
798
        put_req.set_path(path);
799
        put_req.set_method(Method::Put);
800
        put_req.message.payload = b"data1".to_vec();
801
        put_req.source = Some(addr);
802

803
        observer
804
            .request_handler(&mut put_req, responder.clone())
805
            .await;
806

807
        // Intercept the notification to get its Message ID
808
        let notification_bytes = rx.recv().await.unwrap();
809
        let notification_pkt = Packet::from_bytes(&notification_bytes).unwrap();
810
        assert_eq!(notification_pkt.header.get_type(), MessageType::Confirmable);
811
        let mid = notification_pkt.header.message_id;
812

813
        // Simulate the client rejecting the notification with a Reset message
814
        let mut rst_req = CoapRequest::<SocketAddr>::new();
815
        rst_req.message.header.set_type(MessageType::Reset);
816
        rst_req.message.header.set_code("0.00");
817
        rst_req.message.header.message_id = mid;
818
        rst_req.source = Some(addr);
819

820
        observer
821
            .request_handler(&mut rst_req, responder.clone())
822
            .await;
823

824
        // Verify that the observer has been removed from the list
825
        assert!(!observer.register_resources.contains_key(&key));
826

827
        // Trigger another resource change and ensure no new notification is sent
828
        let mut put_req2 = CoapRequest::<SocketAddr>::new();
829
        put_req2.set_path(path);
830
        put_req2.set_method(Method::Put);
831
        put_req2.message.payload = b"data2".to_vec();
832
        put_req2.source = Some(addr);
833

834
        observer
835
            .request_handler(&mut put_req2, responder.clone())
836
            .await;
837

838
        // Use timeout to verify the channel is empty (no notification sent)
839
        let result = tokio::time::timeout(Duration::from_millis(100), rx.recv()).await;
840
        assert!(
841
            result.is_err(),
842
            "Expected no notification after RST cancellation"
843
        );
844
    }
845

846
    use tokio::sync::Mutex;
847

848
    struct MockResponder {
849
        addr: SocketAddr,
850
        last_sent: Arc<Mutex<Option<Vec<u8>>>>,
851
    }
852

853
    impl MockResponder {
854
        fn new(addr: SocketAddr) -> Self {
855
            Self {
856
                addr,
857
                last_sent: Arc::new(Mutex::new(None)),
858
            }
859
        }
860

861
        async fn get_last_sent(&self) -> Option<Vec<u8>> {
862
            self.last_sent.lock().await.clone()
863
        }
864
    }
865

866
    #[async_trait]
867
    impl Responder for MockResponder {
868
        async fn respond(&self, bytes: Vec<u8>) {
869
            *self.last_sent.lock().await = Some(bytes);
870
        }
871
        fn address(&self) -> SocketAddr {
872
            self.addr
873
        }
874
    }
875

876
    #[tokio::test]
877
    async fn test_observer_block_size_boundaries_and_preferences() {
878
        let mut observer = Observer::new();
879
        let path = "test";
880
        let addr1: SocketAddr = "127.0.0.1:5001".parse().unwrap();
881
        let addr2: SocketAddr = "127.0.0.1:5002".parse().unwrap();
882

883
        // 1. Prepare a resource exactly equal to block_size (1024 bytes)
884
        let payload_exact = vec![0xAA; 1024];
885
        let mut put_req = CoapRequest::<SocketAddr>::new();
886
        put_req.set_method(Method::Put);
887
        put_req.set_path(path);
888
        put_req.message.payload = payload_exact.clone();
889
        put_req.source = Some(addr1);
890
        observer
891
            .request_handler(&mut put_req, Arc::new(MockResponder::new(addr1)))
892
            .await;
893

894
        // 2. Client 1 registers, expecting block size 1024
895
        let mut reg_req1 = CoapRequest::<SocketAddr>::new();
896
        reg_req1.set_method(Method::Get);
897
        reg_req1.set_path(path);
898
        reg_req1.set_observe_flag(ObserveOption::Register);
899
        reg_req1.message.add_option_as::<BlockValue>(
900
            CoapOption::Block2,
901
            BlockValue::new(0, false, 1024).unwrap(),
902
        );
903
        reg_req1.source = Some(addr1);
904
        reg_req1.response = Some(CoapResponse {
905
            message: Packet::new(),
906
        });
907

908
        let responder1 = Arc::new(MockResponder::new(addr1));
909
        observer
910
            .request_handler(&mut reg_req1, responder1.clone())
911
            .await;
912

913
        let resp1_bytes = responder1.get_last_sent().await.unwrap();
914
        let resp1_pkt = Packet::from_bytes(&resp1_bytes).unwrap();
915

916
        assert_eq!(resp1_pkt.payload.len(), 1024);
917
        assert!(
918
            resp1_pkt.get_option(CoapOption::Block2).is_none(),
919
            "Should not add Block2 if exactly equal"
920
        );
921

922
        // Size2 assertion for Client 1 registration
923
        let size2_opt = resp1_pkt.get_first_option(CoapOption::Size2);
924
        assert!(
925
            size2_opt.is_some(),
926
            "Size2 missing in register response (no block2)"
927
        );
928
        let size2_val = decode_uint(size2_opt.unwrap());
929
        assert_eq!(size2_val, 1024, "Size2 value mismatch");
930

931
        // 3. Prepare larger resource (1500 bytes)
932
        let payload_large = vec![0xBB; 1500];
933
        let mut put_req2 = CoapRequest::<SocketAddr>::new();
934
        put_req2.set_method(Method::Put);
935
        put_req2.set_path(path);
936
        put_req2.message.payload = payload_large.clone();
937
        put_req2.source = Some(addr1);
938
        observer
939
            .request_handler(&mut put_req2, Arc::new(MockResponder::new(addr1)))
940
            .await;
941

942
        // 4. Client 2 registers, expecting block size 512
943
        let mut reg_req2 = CoapRequest::<SocketAddr>::new();
944
        reg_req2.set_method(Method::Get);
945
        reg_req2.set_path(path);
946
        reg_req2.set_observe_flag(ObserveOption::Register);
947
        reg_req2.message.add_option_as::<BlockValue>(
948
            CoapOption::Block2,
949
            BlockValue::new(0, false, 512).unwrap(),
950
        );
951
        reg_req2.message.set_token(vec![2]);
952
        reg_req2.source = Some(addr2);
953
        reg_req2.response = Some(CoapResponse {
954
            message: Packet::new(),
955
        });
956

957
        let responder2 = Arc::new(MockResponder::new(addr2));
958
        observer
959
            .request_handler(&mut reg_req2, responder2.clone())
960
            .await;
961

962
        let resp2_bytes = responder2.get_last_sent().await.unwrap();
963
        let resp2_pkt = Packet::from_bytes(&resp2_bytes).unwrap();
964

965
        assert_eq!(resp2_pkt.payload.len(), 512);
966
        let block2_opt = resp2_pkt
967
            .get_first_option_as::<BlockValue>(CoapOption::Block2)
968
            .unwrap()
969
            .unwrap();
970
        assert_eq!(block2_opt.size(), 512);
971
        assert!(block2_opt.more, "Should have more blocks");
972

973
        // Size2 assertion for Client 2 registration
974
        let size2_opt2 = resp2_pkt.get_first_option(CoapOption::Size2);
975
        assert!(
976
            size2_opt2.is_some(),
977
            "Size2 missing in register response with block2"
978
        );
979
        let size2_val2 = decode_uint(size2_opt2.unwrap());
980
        assert_eq!(
981
            size2_val2, 1500,
982
            "Size2 value mismatch in block2 registration"
983
        );
984

985
        // 5. Trigger a resource change (PUT) to cause notification to Client 1
986
        let mut put_req3 = CoapRequest::<SocketAddr>::new();
987
        put_req3.set_method(Method::Put);
988
        put_req3.set_path(path);
989
        put_req3.message.payload = vec![0xCC; 1500];
990
        put_req3.source = Some(addr1);
991
        observer
992
            .request_handler(&mut put_req3, responder1.clone())
993
            .await;
994

995
        let resp1_notify_bytes = responder1.get_last_sent().await.unwrap();
996
        let resp1_notify_pkt = Packet::from_bytes(&resp1_notify_bytes).unwrap();
997

998
        assert_eq!(resp1_notify_pkt.payload.len(), 1024);
999
        let notify_block2 = resp1_notify_pkt
1000
            .get_first_option_as::<BlockValue>(CoapOption::Block2)
1001
            .unwrap()
1002
            .unwrap();
1003
        assert_eq!(notify_block2.size(), 1024);
1004

1005
        // Size2 assertion for notification
1006
        let size2_opt_notify = resp1_notify_pkt.get_first_option(CoapOption::Size2);
1007
        assert!(size2_opt_notify.is_some(), "Size2 missing in notification");
1008
        let size2_val_notify = decode_uint(size2_opt_notify.unwrap());
1009
        assert_eq!(
1010
            size2_val_notify, 1500,
1011
            "Size2 value mismatch in notification"
1012
        );
1013
    }
1014

1015
    #[test]
1016
    fn test_encode_coap_uint() {
1017
        assert_eq!(encode_coap_uint(0), vec![] as Vec<u8>);
1018
        assert_eq!(encode_coap_uint(1), vec![0x01]);
1019
        assert_eq!(encode_coap_uint(255), vec![0xFF]);
1020
        assert_eq!(encode_coap_uint(256), vec![0x01, 0x00]);
1021
        assert_eq!(encode_coap_uint(0xFFFFFFFF), vec![0xFF, 0xFF, 0xFF, 0xFF]);
1022
        #[cfg(target_pointer_width = "64")]
1023
        {
1024
            assert_eq!(encode_coap_uint(0x100000000), vec![0xFF, 0xFF, 0xFF, 0xFF]);
1025
        }
1026
    }
1027

1028
    #[tokio::test]
1029
    async fn test_observe_empty_resource_includes_size2() {
1030
        let mut observer = Observer::new();
1031
        let path = "empty";
1032
        let addr: SocketAddr = "127.0.0.1:5003".parse().unwrap();
1033

1034
        // Create an empty resource via PUT
1035
        let empty_payload = vec![];
1036
        let mut put_req = CoapRequest::<SocketAddr>::new();
1037
        put_req.set_method(Method::Put);
1038
        put_req.set_path(path);
1039
        put_req.message.payload = empty_payload.clone();
1040
        put_req.source = Some(addr);
1041
        observer
1042
            .request_handler(&mut put_req, Arc::new(MockResponder::new(addr)))
1043
            .await;
1044

1045
        // Register to observe the empty resource
1046
        let mut reg_req = CoapRequest::<SocketAddr>::new();
1047
        reg_req.set_method(Method::Get);
1048
        reg_req.set_path(path);
1049
        reg_req.set_observe_flag(ObserveOption::Register);
1050
        reg_req.source = Some(addr);
1051
        reg_req.response = Some(CoapResponse {
1052
            message: Packet::new(),
1053
        });
1054
        let responder = Arc::new(MockResponder::new(addr));
1055
        observer
1056
            .request_handler(&mut reg_req, responder.clone())
1057
            .await;
1058

1059
        let resp_bytes = responder.get_last_sent().await.unwrap();
1060
        let resp_pkt = Packet::from_bytes(&resp_bytes).unwrap();
1061

1062
        // Verify Size2 option is present and value is 0
1063
        let size2_opt = resp_pkt.get_first_option(CoapOption::Size2);
1064
        assert!(
1065
            size2_opt.is_some(),
1066
            "Size2 option missing for empty resource"
1067
        );
1068
        let size2_val = decode_uint(size2_opt.unwrap());
1069
        assert_eq!(size2_val, 0, "Size2 value should be 0 for empty payload");
1070
    }
1071

1072
    #[tokio::test]
1073
    async fn test_observe_reregistration_updates_token_and_block_size() {
1074
        let mut observer = Observer::new();
1075
        let path = "test_update";
1076
        let addr: SocketAddr = "127.0.0.1:6001".parse().unwrap();
1077

1078
        // Create a resource larger than block sizes to trigger block-wise transfer
1079
        let payload = vec![0xAA; 1500];
1080
        let mut put_req = CoapRequest::<SocketAddr>::new();
1081
        put_req.set_method(Method::Put);
1082
        put_req.set_path(path);
1083
        put_req.message.payload = payload.clone();
1084
        put_req.source = Some(addr);
1085
        observer
1086
            .request_handler(&mut put_req, Arc::new(MockResponder::new(addr)))
1087
            .await;
1088

1089
        // Initial registration with Token A and Block2 preference 1024
1090
        let mut reg_req1 = CoapRequest::<SocketAddr>::new();
1091
        reg_req1.set_method(Method::Get);
1092
        reg_req1.set_path(path);
1093
        reg_req1.set_observe_flag(ObserveOption::Register);
1094
        reg_req1.message.set_token(vec![1]); // Token A
1095
        reg_req1.message.add_option_as::<BlockValue>(
1096
            CoapOption::Block2,
1097
            BlockValue::new(0, false, 1024).unwrap(),
1098
        );
1099
        reg_req1.source = Some(addr);
1100
        reg_req1.response = Some(CoapResponse {
1101
            message: Packet::new(),
1102
        });
1103

1104
        let responder1 = Arc::new(MockResponder::new(addr));
1105
        observer
1106
            .request_handler(&mut reg_req1, responder1.clone())
1107
            .await;
1108

1109
        // Re-registration from the same client with Token B and Block2 preference 512
1110
        let mut reg_req2 = CoapRequest::<SocketAddr>::new();
1111
        reg_req2.set_method(Method::Get);
1112
        reg_req2.set_path(path);
1113
        reg_req2.set_observe_flag(ObserveOption::Register);
1114
        reg_req2.message.set_token(vec![2]); // Token B
1115
        reg_req2.message.add_option_as::<BlockValue>(
1116
            CoapOption::Block2,
1117
            BlockValue::new(0, false, 512).unwrap(), // New preference
1118
        );
1119
        reg_req2.source = Some(addr);
1120
        reg_req2.response = Some(CoapResponse {
1121
            message: Packet::new(),
1122
        });
1123

1124
        let responder2 = Arc::new(MockResponder::new(addr));
1125
        observer
1126
            .request_handler(&mut reg_req2, responder2.clone())
1127
            .await;
1128

1129
        // Trigger a resource change to force a notification
1130
        let mut put_req2 = CoapRequest::<SocketAddr>::new();
1131
        put_req2.set_method(Method::Put);
1132
        put_req2.set_path(path);
1133
        put_req2.message.payload = vec![0xBB; 1500];
1134
        put_req2.source = Some(addr);
1135
        observer
1136
            .request_handler(&mut put_req2, responder2.clone())
1137
            .await;
1138

1139
        // Fetch the notification sent by the observer
1140
        let resp_bytes = responder2.get_last_sent().await.unwrap();
1141
        let resp_pkt = Packet::from_bytes(&resp_bytes).unwrap();
1142

1143
        // Verify the server used the updated state (Token B and Block size 512)
1144
        assert_eq!(
1145
            resp_pkt.get_token(),
1146
            vec![2],
1147
            "Server should use the updated token from re-registration"
1148
        );
1149
        assert_eq!(
1150
            resp_pkt.payload.len(),
1151
            512,
1152
            "Server should use the updated block size preference from re-registration"
1153
        );
1154

1155
        let block2_opt = resp_pkt
1156
            .get_first_option_as::<BlockValue>(CoapOption::Block2)
1157
            .unwrap()
1158
            .unwrap();
1159
        assert_eq!(block2_opt.size(), 512);
1160
    }
1161
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc