• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Covertness / coap-rs / 14806460246

03 May 2025 02:13AM UTC coverage: 81.7% (-0.5%) from 82.206%
14806460246

push

github

web-flow
Merge pull request #113 from RoastVeg/unlogged_bad_response_codes

Fix response code errors not getting logged in receive_loop

2 of 4 new or added lines in 1 file covered. (50.0%)

6 existing lines in 1 file now uncovered.

692 of 847 relevant lines covered (81.7%)

3.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.01
/src/observer.rs
1
use coap_lite::{
2
    CoapRequest, MessageClass, MessageType, ObserveOption, Packet, RequestType as Method,
3
    ResponseType as Status,
4
};
5
use futures::{
6
    stream::{Fuse, SelectNextSome},
7
    StreamExt,
8
};
9
use log::{debug, warn};
10
use std::{
11
    collections::{hash_map::Entry, HashMap, HashSet},
12
    net::SocketAddr,
13
    sync::Arc,
14
    time::Duration,
15
};
16
use tokio::time::interval;
17
use tokio_stream::wrappers::IntervalStream;
18

19
use crate::server::Responder;
20

21
/// Threshold compared against a pending notification's `try_times` counter:
/// once the counter exceeds this value the notification is dropped instead of
/// being sent again.
const DEFAULT_UNACKNOWLEDGE_MESSAGE_TRY_TIMES: usize = 10;
22

23
pub struct Observer {
24
    registers: HashMap<String, RegisterItem>,
25
    resources: HashMap<String, ResourceItem>,
26
    register_resources: HashMap<String, RegisterResourceItem>,
27
    unacknowledge_messages: HashMap<u16, UnacknowledgeMessageItem>,
28
    current_message_id: u16,
29
    timer: Fuse<IntervalStream>,
30
}
31

32
/// The registrations held by one registered address.
#[derive(Debug)]
struct RegisterItem {
    /// `address$path` keys of the registrations made from this address.
    register_resources: HashSet<String>,
}
36

37
/// Latest recorded state of an observable resource.
#[derive(Debug)]
struct ResourceItem {
    /// Payload most recently stored for the resource.
    payload: Vec<u8>,
    /// Keys of the registrations observing this resource.
    register_resources: HashSet<String>,
    /// Counter sent as the observe value; starts at 0 and is incremented each
    /// time the resource is recorded again.
    sequence: u32,
}
43

44
struct RegisterResourceItem {
45
    pub(crate) registered_responder: Arc<dyn Responder>,
46
    pub(crate) resource: String,
47
    pub(crate) token: Vec<u8>,
48
    pub(crate) unacknowledge_message: Option<u16>,
49
}
50

51
/// A notification that has been sent but not acknowledged yet.
#[derive(Debug)]
struct UnacknowledgeMessageItem {
    /// Key of the registration the notification was sent to.
    register_resource: String,
    /// Try counter; starts at 1 and is incremented on every retry.
    try_times: usize,
}
56

57
impl Observer {
58
    /// Creates an observer with channel to send message.
59
    pub fn new() -> Self {
1✔
60
        Self {
61
            registers: HashMap::new(),
1✔
62
            resources: HashMap::new(),
1✔
63
            register_resources: HashMap::new(),
1✔
64
            unacknowledge_messages: HashMap::new(),
1✔
65
            current_message_id: 0,
66
            timer: IntervalStream::new(interval(Duration::from_secs(1))).fuse(),
2✔
67
        }
68
    }
69

70
    /// poll the observer's timer.
71
    pub fn select_next_some(&mut self) -> SelectNextSome<Fuse<IntervalStream>> {
×
72
        self.timer.select_next_some()
×
73
    }
74

75
    /// filter the requests belong to the observer. store the responder in case it is needed
76
    /// returns whether the request should be forwarded to the handler
77
    pub async fn request_handler(
1✔
78
        &mut self,
79
        request: &mut CoapRequest<SocketAddr>,
80
        responder: Arc<dyn Responder>,
81
    ) -> bool {
82
        if request.message.header.get_type() == MessageType::Acknowledgement {
2✔
83
            self.acknowledge(request);
×
84
            return false;
×
85
        }
86

87
        match (request.get_method(), request.get_observe_flag()) {
3✔
88
            (&Method::Get, Some(observe_option)) => match observe_option {
1✔
89
                Ok(ObserveOption::Register) => {
90
                    self.register(request, responder).await;
2✔
91
                    return false;
1✔
92
                }
93
                Ok(ObserveOption::Deregister) => {
94
                    self.deregister(request);
1✔
95
                    return true;
1✔
96
                }
97
                _ => return true,
×
98
            },
99
            (&Method::Put, _) => {
100
                self.resource_changed(request).await;
2✔
101
                return true;
1✔
102
            }
103
            _ => return true,
1✔
104
        }
105
    }
106

107
    /// trigger send the unacknowledge messages.
108
    pub async fn timer_handler(&mut self) {
×
109
        let register_resource_keys: Vec<String>;
110
        {
UNCOV
111
            register_resource_keys = self
×
112
                .unacknowledge_messages
113
                .iter()
UNCOV
114
                .map(|(_, msg)| msg.register_resource.clone())
×
115
                .collect();
116
        }
117

UNCOV
118
        for register_resource_key in register_resource_keys {
×
UNCOV
119
            if self.try_unacknowledge_message(&register_resource_key) {
×
UNCOV
120
                self.notify_register_with_newest_resource(&register_resource_key)
×
UNCOV
121
                    .await;
×
122
            }
123
        }
124
    }
125

126
    async fn register(
1✔
127
        &mut self,
128
        request: &mut CoapRequest<SocketAddr>,
129
        responder: Arc<dyn Responder>,
130
    ) {
131
        let register_address = responder.address();
2✔
132
        let resource_path = request.get_path();
1✔
133

134
        debug!("register {} {}", register_address, resource_path);
3✔
135

136
        // reply NotFound if resource doesn't exist
137
        if !self.resources.contains_key(&resource_path) {
2✔
138
            if let Some(ref response) = request.response.take() {
2✔
139
                let mut response2 = response.clone();
1✔
140
                response2.set_status(Status::NotFound);
1✔
141
                let msg_serial = response2.message.to_bytes();
2✔
142
                if let Ok(b) = msg_serial {
2✔
143
                    responder.respond(b).await;
2✔
144
                }
145
            }
146
            return;
147
        }
148

149
        self.record_register_resource(
1✔
150
            responder.clone(),
2✔
151
            &resource_path,
1✔
152
            &request.message.get_token(),
2✔
153
        );
154

155
        let resource = self.resources.get(&resource_path).unwrap();
1✔
156

157
        if let Some(response) = request.response.take() {
2✔
158
            let mut response2 = response.clone();
1✔
159
            response2.message.payload = resource.payload.clone();
2✔
160
            response2.message.set_observe_value(resource.sequence);
1✔
161
            response2
2✔
162
                .message
163
                .header
164
                .set_type(MessageType::NonConfirmable);
1✔
165
            if let Ok(b) = response2.message.to_bytes() {
3✔
166
                responder.respond(b).await;
2✔
167
            }
168
        }
169
    }
170

171
    fn deregister(&mut self, request: &CoapRequest<SocketAddr>) {
1✔
172
        let register_address = request.source.unwrap();
1✔
173
        let resource_path = request.get_path();
1✔
174

175
        debug!("deregister {} {}", register_address, resource_path);
3✔
176

177
        self.remove_register_resource(
1✔
178
            &register_address,
179
            &resource_path,
180
            &request.message.get_token(),
2✔
181
        );
182
    }
183

184
    async fn resource_changed(&mut self, request: &CoapRequest<SocketAddr>) {
4✔
185
        let resource_path = request.get_path();
1✔
186
        let ref resource_payload = request.message.payload;
1✔
187

188
        debug!("resource_changed {} {:?}", resource_path, resource_payload);
3✔
189

190
        let register_resource_keys: Vec<String>;
191
        {
192
            let resource = self.record_resource(&resource_path, &resource_payload);
2✔
193
            register_resource_keys = resource
1✔
194
                .register_resources
195
                .iter()
196
                .map(|k| k.clone())
2✔
197
                .collect();
198
        }
199

200
        for register_resource_key in register_resource_keys {
3✔
201
            self.gen_message_id();
1✔
202
            self.notify_register_with_newest_resource(&register_resource_key)
2✔
203
                .await;
3✔
204
            self.record_unacknowledge_message(&register_resource_key);
1✔
205
        }
206
    }
207

208
    fn acknowledge(&mut self, request: &CoapRequest<SocketAddr>) {
×
209
        self.remove_unacknowledge_message(
×
210
            &request.message.header.message_id,
×
211
            &request.message.get_token(),
×
212
        );
213
    }
214

215
    fn record_register_resource(
1✔
216
        &mut self,
217
        responder: Arc<dyn Responder>,
218
        path: &String,
219
        token: &[u8],
220
    ) {
221
        let resource = self.resources.get_mut(path).unwrap();
2✔
222
        let register_key = responder;
1✔
223

224
        let register_resource_key = Self::format_register_resource(&register_key.address(), path);
2✔
225

226
        self.register_resources
3✔
227
            .entry(register_resource_key.clone())
2✔
228
            .or_insert(RegisterResourceItem {
1✔
229
                registered_responder: register_key.clone(),
2✔
230
                resource: path.clone(),
1✔
231
                token: token.into(),
1✔
232
                unacknowledge_message: None,
1✔
233
            });
234
        resource
2✔
235
            .register_resources
236
            .replace(register_resource_key.clone());
2✔
237
        match self.registers.entry(register_key.address().to_string()) {
1✔
238
            Entry::Occupied(register) => {
×
239
                register
×
240
                    .into_mut()
241
                    .register_resources
242
                    .replace(register_resource_key);
×
243
            }
244
            Entry::Vacant(v) => {
1✔
245
                let mut register = RegisterItem {
246
                    register_resources: HashSet::new(),
1✔
247
                };
248
                register.register_resources.insert(register_resource_key);
1✔
249

250
                v.insert(register);
1✔
251
            }
252
        };
253
    }
254

255
    fn remove_register_resource(
1✔
256
        &mut self,
257
        address: &SocketAddr,
258
        path: &String,
259
        token: &[u8],
260
    ) -> bool {
261
        let register_resource_key = Self::format_register_resource(&address, path);
1✔
262

263
        if let Some(register_resource) = self.register_resources.get(&register_resource_key) {
2✔
264
            if register_resource.token != *token {
2✔
265
                return false;
×
266
            }
267

268
            if let Some(unacknowledge_message) = register_resource.unacknowledge_message {
2✔
269
                self.unacknowledge_messages
2✔
270
                    .remove(&unacknowledge_message)
271
                    .unwrap();
272
            }
273

274
            assert_eq!(
1✔
275
                self.resources
2✔
276
                    .get_mut(path)
277
                    .unwrap()
278
                    .register_resources
279
                    .remove(&register_resource_key),
280
                true
281
            );
282

283
            let remove_register;
284
            {
285
                let register = self
3✔
286
                    .registers
287
                    .get_mut(&register_resource.registered_responder.address().to_string())
1✔
288
                    .unwrap();
289
                assert_eq!(
1✔
290
                    register.register_resources.remove(&register_resource_key),
1✔
291
                    true
292
                );
293
                remove_register = register.register_resources.len() == 0;
1✔
294
            }
295

296
            if remove_register {
1✔
297
                self.registers
2✔
298
                    .remove(&register_resource.registered_responder.address().to_string());
1✔
299
            }
300
        }
301

302
        self.register_resources.remove(&register_resource_key);
2✔
303
        return true;
1✔
304
    }
305

306
    fn record_resource(&mut self, path: &String, payload: &Vec<u8>) -> &ResourceItem {
1✔
307
        match self.resources.entry(path.clone()) {
1✔
308
            Entry::Occupied(resource) => {
1✔
309
                let r = resource.into_mut();
1✔
310
                r.sequence += 1;
2✔
311
                r.payload = payload.clone();
1✔
312
                return r;
1✔
313
            }
314
            Entry::Vacant(v) => {
1✔
315
                return v.insert(ResourceItem {
2✔
316
                    payload: payload.clone(),
1✔
317
                    register_resources: HashSet::new(),
1✔
318
                    sequence: 0,
319
                });
320
            }
321
        }
322
    }
323

324
    fn record_unacknowledge_message(&mut self, register_resource_key: &String) {
1✔
325
        let message_id = self.current_message_id;
1✔
326

327
        let register_resource = self
1✔
328
            .register_resources
329
            .get_mut(register_resource_key)
330
            .unwrap();
331
        if let Some(old_message_id) = register_resource.unacknowledge_message {
1✔
332
            self.unacknowledge_messages.remove(&old_message_id);
×
333
        }
334

335
        register_resource.unacknowledge_message = Some(message_id);
1✔
336
        self.unacknowledge_messages.insert(
2✔
337
            message_id,
338
            UnacknowledgeMessageItem {
1✔
339
                register_resource: register_resource_key.clone(),
1✔
340
                try_times: 1,
341
            },
342
        );
343
    }
344

345
    fn try_unacknowledge_message(&mut self, register_resource_key: &String) -> bool {
×
346
        let register_resource = self
×
347
            .register_resources
348
            .get_mut(register_resource_key)
349
            .unwrap();
350
        let ref message_id = register_resource.unacknowledge_message.unwrap();
×
351

352
        let try_again;
353
        {
354
            let unacknowledge_message = self.unacknowledge_messages.get_mut(message_id).unwrap();
×
355
            if unacknowledge_message.try_times > DEFAULT_UNACKNOWLEDGE_MESSAGE_TRY_TIMES {
×
356
                try_again = false;
×
357
            } else {
358
                unacknowledge_message.try_times += 1;
×
359
                try_again = true;
×
360
            }
361
        }
362

363
        if !try_again {
×
364
            warn!(
×
365
                "unacknowledge_message try times exceeded  {}",
366
                register_resource_key
367
            );
368

369
            register_resource.unacknowledge_message = None;
×
370
            self.unacknowledge_messages.remove(message_id);
×
371
        }
372

373
        return try_again;
×
374
    }
375

376
    fn remove_unacknowledge_message(&mut self, message_id: &u16, token: &[u8]) {
×
377
        if let Some(message) = self.unacknowledge_messages.get_mut(message_id) {
×
378
            let register_resource = self
×
379
                .register_resources
380
                .get_mut(&message.register_resource)
381
                .unwrap();
382
            if register_resource.token != *token {
×
383
                return;
384
            }
385

386
            register_resource.unacknowledge_message = None;
×
387
        }
388

389
        self.unacknowledge_messages.remove(message_id);
×
390
    }
391

392
    async fn notify_register_with_newest_resource(&mut self, register_resource_key: &String) {
4✔
393
        let message_id = self.current_message_id;
1✔
394

395
        debug!("notify {} {}", register_resource_key, message_id);
3✔
396

397
        let ref mut message = Packet::new();
2✔
398
        message.header.set_type(MessageType::Confirmable);
1✔
399
        message.header.code = MessageClass::Response(Status::Content);
1✔
400

401
        let register_resource = self.register_resources.get(register_resource_key).unwrap();
1✔
402
        let resource = self.resources.get(&register_resource.resource).unwrap();
1✔
403

404
        message.set_token(register_resource.token.clone());
1✔
405
        message.set_observe_value(resource.sequence);
1✔
406
        message.header.message_id = message_id;
1✔
407
        message.payload = resource.payload.clone();
1✔
408
        if let Ok(b) = message.to_bytes() {
3✔
409
            debug!("notify register with newest resource {:?}", &b);
3✔
410
            register_resource.registered_responder.respond(b).await;
2✔
411
        }
412
    }
413

414
    fn gen_message_id(&mut self) -> u16 {
1✔
415
        self.current_message_id += 1;
1✔
416
        return self.current_message_id;
1✔
417
    }
418

419
    /// Builds the key that identifies one registration: `<address>$<path>`.
    ///
    /// `path` is taken as `&str`, so callers may pass either a `&String` or a
    /// string slice.
    fn format_register_resource(address: &SocketAddr, path: &str) -> String {
        format!("{}${}", address, path)
    }
422
}
423

424
#[cfg(test)]
425
mod test {
426

427
    use crate::request::RequestBuilder;
428

429
    use super::super::*;
430
    use super::*;
431
    use std::io::ErrorKind;
432
    use tokio::sync::mpsc;
433

434
    async fn request_handler(
435
        mut req: Box<CoapRequest<SocketAddr>>,
436
    ) -> Box<CoapRequest<SocketAddr>> {
437
        match req.get_method() {
438
            &coap_lite::RequestType::Get => {
439
                let observe_option = req.get_observe_flag().unwrap().unwrap();
440
                assert_eq!(observe_option, ObserveOption::Deregister);
441
            }
442
            &coap_lite::RequestType::Put => {}
443
            _ => panic!("unexpected request"),
444
        }
445

446
        match req.response {
447
            Some(ref mut response) => {
448
                response.message.payload = b"OK".to_vec();
449
            }
450
            _ => {}
451
        };
452
        return req;
453
    }
454

455
    #[tokio::test]
456
    async fn test_observe() {
457
        let path = "/test";
458
        let payload1 = b"data1".to_vec();
459
        let payload2 = b"data2".to_vec();
460
        let (tx, mut rx) = mpsc::unbounded_channel();
461
        let (tx2, mut rx2) = mpsc::unbounded_channel();
462
        let mut step = 1;
463

464
        let server_port = server::test::spawn_server("127.0.0.1:0", request_handler)
465
            .recv()
466
            .await
467
            .unwrap();
468

469
        let server_address = &format!("127.0.0.1:{}", server_port);
470

471
        let client = UdpCoAPClient::new_udp(server_address).await.unwrap();
472

473
        tx.send(step).unwrap();
474
        let mut request = CoapRequest::new();
475

476
        request.set_method(coap_lite::RequestType::Put);
477
        request.set_path(path);
478
        request.message.set_token(vec![1]);
479

480
        request.message.payload = payload1.clone();
481
        let _ = client.send(request.clone()).await.unwrap();
482

483
        let payload1_clone = payload1.clone();
484
        let payload2_clone = payload2.clone();
485

486
        let client2 = client.clone();
487

488
        let mut receive_step = 1;
489
        client
490
            .observe(path, move |msg| {
491
                match rx.try_recv() {
492
                    Ok(n) => receive_step = n,
493
                    _ => debug!("receive_step rx error"),
494
                }
495
                debug!("receive on client: {:?}", &msg);
496

497
                match receive_step {
498
                    1 => assert_eq!(msg.payload, payload1_clone),
499
                    2 => {
500
                        assert_eq!(msg.payload, payload2_clone);
501
                        tx2.send(()).unwrap();
502
                    }
503
                    _ => panic!("unexpected step"),
504
                }
505
            })
506
            .await
507
            .unwrap();
508

509
        step = 2;
510
        debug!("on step 2");
511
        tx.send(step).unwrap();
512

513
        request.message.payload = payload2.clone();
514
        request.message.set_token(vec![2]);
515

516
        let _ = client2.send(request).await.unwrap();
517
        assert_eq!(
518
            tokio::time::timeout(Duration::new(5, 0), rx2.recv())
519
                .await
520
                .unwrap(),
521
            Some(())
522
        );
523
    }
524
    #[tokio::test]
525
    async fn test_unobserve() {
526
        let path = "/test";
527
        let payload1 = b"data1".to_vec();
528
        let payload2 = b"data2".to_vec();
529

530
        let server_port = server::test::spawn_server("127.0.0.1:0", request_handler)
531
            .recv()
532
            .await
533
            .unwrap();
534

535
        let server_address = &format!("127.0.0.1:{}", server_port);
536

537
        let client = UdpCoAPClient::new_udp(server_address).await.unwrap();
538

539
        let client3 = client.clone();
540

541
        let mut request = RequestBuilder::new(path, coap_lite::RequestType::Put)
542
            .token(Some(vec![1]))
543
            .data(Some(payload1.clone()))
544
            .build();
545
        let _ = client.send(request.clone()).await.unwrap();
546

547
        let payload1_clone = payload1.clone();
548
        let unobserve = client
549
            .observe(path, move |msg| {
550
                assert_eq!(msg.payload, payload1_clone);
551
            })
552
            .await
553
            .unwrap();
554

555
        unobserve.send(client::ObserveMessage::Terminate).unwrap();
556
        request.message.payload = payload2.clone();
557

558
        let _ = client3.send(request).await.unwrap();
559
    }
560

561
    #[tokio::test]
562
    async fn test_observe_without_resource() {
563
        let path = "/test";
564

565
        let server_port = server::test::spawn_server("127.0.0.1:0", request_handler)
566
            .recv()
567
            .await
568
            .unwrap();
569

570
        let client = UdpCoAPClient::new_udp(format!("127.0.0.1:{}", server_port))
571
            .await
572
            .unwrap();
573
        let error = client.observe(path, |_msg| {}).await.unwrap_err();
574
        assert_eq!(error.kind(), ErrorKind::NotFound);
575
    }
576
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc