• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Alan-Jowett / sonde / 24443141467

15 Apr 2026 03:05AM UTC coverage: 80.383% (+0.08%) from 80.306%
24443141467

push

github

web-flow
feat: store-and-forward radio protocol implementation (#708)

* feat: add optional blob field to WAKE and COMMAND messages

Implements protocol-level store-and-forward support (SF-2, SF-3):

- NodeMessage::Wake gains optional blob field (CBOR key 10) for
  piggybacked uplink data from send_async().
- GatewayMessage::Command gains optional blob field (CBOR key 10)
  for piggybacked downlink data on NOP commands only.
- Non-NOP commands ignore blob (key 10) per protocol.md section 5.2.
- Add get_bytes_optional() helper for decoding optional bstr fields.
- Add validation tests T-P120, T-P121, T-P122 (round-trip encoding,
  key absence verification, non-NOP blob rejection).
- Update all call sites across gateway, node, and test code with
  blob: None (construction) or blob: _ (pattern matching).

All 683 tests pass. Backward compatible: existing WAKE/COMMAND
messages without blob decode identically to before.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat: gateway store-and-forward support

Implements GW-0509 through GW-0512 and modifies GW-0102, GW-0103,
GW-0506 for the store-and-forward data channel:

handler.rs:
- DataReply gains delivery field (CBOR key 4, default 0).
  delivery=1 means deferred; delivery=0 means immediate.
- Encode omits key 4 when delivery=0 (backward compatible).

engine.rs:
- Add deferred_replies storage (HashMap<node_id, Vec<u8>>, RAM-only).
- handle_wake_core: extract WAKE blob, route to handler, store
  non-empty reply for NEXT cycle's COMMAND.
- handle_wake_core: inject stored deferred reply into NOP COMMAND
  blob, clear after delivery.
- handle_app_data_core: when handler replies with delivery=1 and
  non-empty data, store for deferred delivery instead of sending
  immediate APP_DATA_REPLY.
- Deferred data only delivered on NOP (non-NOP commands skip it).

All 683 tests pass across protocol, gateway, and node crates.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat(... (continued)

492 of 577 new or added lines in 11 files covered. (85.27%)

1 existing line in 1 file now uncovered.

23545 of 29291 relevant lines covered (80.38%)

186.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.3
/crates/sonde-node/src/async_queue.rs
1
// SPDX-License-Identifier: MIT
2
// Copyright (c) 2026 sonde contributors
3

4
//! Sleep-retained async send queue for store-and-forward.
5
//!
6
//! BPF programs call `send_async` (helper #17) to enqueue data blobs
7
//! that are transmitted after BPF execution completes — either
8
//! piggybacked on the next WAKE or sent as individual APP_DATA frames.
9
//!
10
//! On ESP32, the queue is backed by RTC slow SRAM (`.rtc.data`) and
11
//! survives deep sleep (ND-0609). On host/test builds, a heap-allocated
12
//! buffer provides the same fixed-size layout for unit testing.
13

14
/// Maximum number of queued messages per wake cycle.
15
const MAX_MESSAGES: usize = 10;
16

17
/// Maximum size of a single blob (mirrors [`sonde_protocol::MAX_APP_DATA_BLOB_SIZE`]).
18
const MAX_BLOB_SIZE: usize = sonde_protocol::MAX_APP_DATA_BLOB_SIZE;
19

20
/// Magic value to validate that the RTC region was initialized by this
21
/// firmware version. ASCII "QUEU".
22
const QUEUE_MAGIC: u32 = 0x5155_4555;
23

24
// ---------------------------------------------------------------------------
25
// Fixed-size RTC layout
26
// ---------------------------------------------------------------------------
27

28
/// A single slot in the RTC queue layout.
29
#[repr(C)]
30
#[derive(Clone, Copy)]
31
struct RtcQueueItem {
32
    len: u32,
33
    data: [u8; MAX_BLOB_SIZE],
34
}
35

36
/// Fixed-size layout stored in RTC slow SRAM (ESP) or on the heap (tests).
37
///
38
/// Uses a magic/count commit pattern identical to
39
/// [`MapStorage::write_rtc_layout`](crate::map_storage) — `count` is
40
/// written last so that a reset mid-write never leaves [`from_rtc()`]
41
/// with a valid-looking but inconsistent record.
42
#[repr(C)]
43
struct RtcQueueLayout {
44
    magic: u32,
45
    count: u32,
46
    items: [RtcQueueItem; MAX_MESSAGES],
47
}
48

49
impl RtcQueueLayout {
50
    const fn zero() -> Self {
73✔
51
        Self {
73✔
52
            magic: 0,
73✔
53
            count: 0,
73✔
54
            items: [RtcQueueItem {
73✔
55
                len: 0,
73✔
56
                data: [0u8; MAX_BLOB_SIZE],
73✔
57
            }; MAX_MESSAGES],
73✔
58
        }
73✔
59
    }
73✔
60
}
61

62
#[cfg(feature = "esp")]
63
#[link_section = ".rtc.data"]
64
static mut QUEUE_LAYOUT: RtcQueueLayout = RtcQueueLayout::zero();
65

66
// ---------------------------------------------------------------------------
67
// AsyncQueue
68
// ---------------------------------------------------------------------------
69

70
/// Sleep-retained queue of data blobs destined for the gateway.
71
///
72
/// **Persistence contract (ND-0609):** On ESP32, the queue is backed by
73
/// a static in RTC slow SRAM (`.rtc.data`). Data survives deep sleep
74
/// so that blobs queued in cycle N are available for piggybacking in
75
/// cycle N+1's WAKE. Data is lost on reboot (RTC SRAM is cleared on
76
/// power-on reset).
77
///
78
/// On host/test builds, the queue is backed by a heap-allocated
79
/// [`RtcQueueLayout`], which is sufficient for unit testing.
80
pub struct AsyncQueue {
81
    #[cfg(not(feature = "esp"))]
82
    backing: Box<RtcQueueLayout>,
83
}
84

85
impl AsyncQueue {
86
    /// Create a fresh empty queue.
87
    ///
88
    /// On ESP: writes the magic value and sets count to 0 in RTC SRAM.
89
    /// On host: allocates a zeroed layout on the heap and sets the magic.
90
    pub fn new() -> Self {
73✔
91
        #[cfg(not(feature = "esp"))]
92
        {
93
            let mut backing = Box::new(RtcQueueLayout::zero());
73✔
94
            backing.magic = QUEUE_MAGIC;
73✔
95
            Self { backing }
73✔
96
        }
97
        #[cfg(feature = "esp")]
98
        {
99
            use core::sync::atomic::{fence, Ordering};
100
            unsafe {
101
                // Invalidate first so from_rtc() returns None on partial init.
102
                core::ptr::write_volatile(&raw mut QUEUE_LAYOUT.count, 0);
103
                fence(Ordering::SeqCst);
104
                core::ptr::write_volatile(&raw mut QUEUE_LAYOUT.magic, QUEUE_MAGIC);
105
                fence(Ordering::SeqCst);
106
                // count stays 0 — empty queue
107
            }
108
            Self {}
109
        }
110
    }
73✔
111

112
    /// Recover the queue from RTC slow SRAM after deep sleep.
113
    ///
114
    /// On ESP: validates the magic and count in the RTC layout. Returns
115
    /// a queue reflecting the persisted state if valid, or a fresh empty
116
    /// queue on cold boot or corruption.
117
    ///
118
    /// On host: always returns a fresh empty queue (no RTC SRAM to
119
    /// recover from).
120
    pub fn from_rtc() -> Self {
1✔
121
        #[cfg(feature = "esp")]
122
        {
123
            let magic = unsafe { core::ptr::read_volatile(&raw const QUEUE_LAYOUT.magic) };
124
            if magic != QUEUE_MAGIC {
125
                return Self::new();
126
            }
127
            let count = unsafe { core::ptr::read_volatile(&raw const QUEUE_LAYOUT.count) } as usize;
128
            if count > MAX_MESSAGES {
129
                return Self::new();
130
            }
131
            // Validate item lengths to catch corruption.
132
            for i in 0..count {
133
                let len = unsafe { core::ptr::read_volatile(&raw const QUEUE_LAYOUT.items[i].len) }
134
                    as usize;
135
                if len > MAX_BLOB_SIZE {
136
                    return Self::new();
137
                }
138
            }
139
            Self {}
140
        }
141
        #[cfg(not(feature = "esp"))]
142
        {
143
            Self::new()
1✔
144
        }
145
    }
1✔
146

147
    // ------- Low-level accessors -------
148

149
    fn read_count(&self) -> usize {
134✔
150
        #[cfg(feature = "esp")]
151
        {
152
            unsafe { core::ptr::read_volatile(&raw const QUEUE_LAYOUT.count) as usize }
153
        }
154
        #[cfg(not(feature = "esp"))]
155
        {
156
            self.backing.count as usize
134✔
157
        }
158
    }
134✔
159

160
    fn read_item_len(&self, index: usize) -> usize {
9✔
161
        #[cfg(feature = "esp")]
162
        {
163
            unsafe { core::ptr::read_volatile(&raw const QUEUE_LAYOUT.items[index].len) as usize }
164
        }
165
        #[cfg(not(feature = "esp"))]
166
        {
167
            self.backing.items[index].len as usize
9✔
168
        }
169
    }
9✔
170

171
    /// Return a reference to the data in slot `index`.
172
    ///
173
    /// The caller must ensure `index < read_count()` and that the item
174
    /// length has been validated (≤ [`MAX_BLOB_SIZE`]).
175
    fn read_item_data(&self, index: usize) -> &[u8] {
6✔
176
        let len = self.read_item_len(index);
6✔
177
        #[cfg(feature = "esp")]
178
        {
179
            unsafe {
180
                let data_ptr = &raw const QUEUE_LAYOUT.items[index].data as *const u8;
181
                core::slice::from_raw_parts(data_ptr, len)
182
            }
183
        }
184
        #[cfg(not(feature = "esp"))]
185
        {
186
            &self.backing.items[index].data[..len]
6✔
187
        }
188
    }
6✔
189

190
    /// Write a blob into slot `index`.
191
    fn write_item(&mut self, index: usize, data: &[u8]) {
42✔
192
        #[cfg(feature = "esp")]
193
        {
194
            unsafe {
195
                let data_dst = &raw mut QUEUE_LAYOUT.items[index].data as *mut u8;
196
                core::ptr::copy_nonoverlapping(data.as_ptr(), data_dst, data.len());
197
                core::ptr::write_volatile(
198
                    &raw mut QUEUE_LAYOUT.items[index].len,
199
                    data.len() as u32,
200
                );
201
            }
202
        }
203
        #[cfg(not(feature = "esp"))]
204
        {
42✔
205
            self.backing.items[index].data[..data.len()].copy_from_slice(data);
42✔
206
            self.backing.items[index].len = data.len() as u32;
42✔
207
        }
42✔
208
    }
42✔
209

210
    /// Commit `count` to the RTC layout.
211
    ///
212
    /// On ESP, a fence ensures all preceding item writes are visible
213
    /// before the count is updated (matching the invalidate-write-commit
214
    /// pattern in [`MapStorage::write_rtc_layout`](crate::map_storage)).
215
    fn commit_count(&mut self, count: usize) {
89✔
216
        #[cfg(feature = "esp")]
217
        {
218
            use core::sync::atomic::{fence, Ordering};
219
            unsafe {
220
                fence(Ordering::SeqCst);
221
                core::ptr::write_volatile(&raw mut QUEUE_LAYOUT.count, count as u32);
222
            }
223
        }
224
        #[cfg(not(feature = "esp"))]
225
        {
89✔
226
            self.backing.count = count as u32;
89✔
227
        }
89✔
228
    }
89✔
229

230
    // ------- Public API -------
231

232
    /// Enqueue a blob for deferred transmission.
233
    ///
234
    /// Returns `0` on success, `-1` if the queue is full, or `-2` if
235
    /// the blob exceeds the APP_DATA payload budget.
236
    pub fn push(&mut self, blob: Vec<u8>) -> i64 {
46✔
237
        if blob.len() > MAX_BLOB_SIZE {
46✔
238
            return -2;
1✔
239
        }
45✔
240
        let count = self.read_count();
45✔
241
        if count >= MAX_MESSAGES {
45✔
242
            return -1;
3✔
243
        }
42✔
244
        self.write_item(count, &blob);
42✔
245
        self.commit_count(count + 1);
42✔
246
        0
42✔
247
    }
46✔
248

249
    /// Drain all queued messages, returning them and leaving the queue empty.
250
    pub fn drain(&mut self) -> Vec<Vec<u8>> {
34✔
251
        let count = self.read_count();
34✔
252
        let mut result = Vec::with_capacity(count);
34✔
253
        for i in 0..count {
34✔
254
            result.push(self.read_item_data(i).to_vec());
4✔
255
        }
4✔
256
        self.commit_count(0);
34✔
257
        result
34✔
258
    }
34✔
259

260
    pub fn is_empty(&self) -> bool {
7✔
261
        self.read_count() == 0
7✔
262
    }
7✔
263

264
    pub fn len(&self) -> usize {
8✔
265
        self.read_count()
8✔
266
    }
8✔
267

268
    /// If exactly one message is queued and it fits within `wake_budget`
269
    /// bytes, return a reference to it for WAKE piggybacking.
270
    pub fn single_for_piggyback(&self, wake_budget: usize) -> Option<&[u8]> {
40✔
271
        if self.read_count() == 1 {
40✔
272
            let len = self.read_item_len(0);
3✔
273
            if len <= wake_budget && len <= MAX_BLOB_SIZE {
3✔
274
                return Some(self.read_item_data(0));
2✔
275
            }
1✔
276
        }
37✔
277
        None
38✔
278
    }
40✔
279

280
    /// Clear the queue without returning messages.
281
    pub fn clear(&mut self) {
13✔
282
        self.commit_count(0);
13✔
283
    }
13✔
284
}
285

286
impl Default for AsyncQueue {
NEW
287
    fn default() -> Self {
×
NEW
288
        Self::new()
×
NEW
289
    }
×
290
}
291

292
#[cfg(test)]
293
mod tests {
294
    use super::*;
295

296
    #[test]
297
    fn push_and_drain() {
1✔
298
        let mut q = AsyncQueue::new();
1✔
299
        assert!(q.is_empty());
1✔
300
        assert_eq!(q.len(), 0);
1✔
301

302
        assert_eq!(q.push(vec![1, 2, 3]), 0);
1✔
303
        assert!(!q.is_empty());
1✔
304
        assert_eq!(q.len(), 1);
1✔
305

306
        let msgs = q.drain();
1✔
307
        assert_eq!(msgs.len(), 1);
1✔
308
        assert_eq!(msgs[0], vec![1, 2, 3]);
1✔
309
        assert!(q.is_empty());
1✔
310
    }
1✔
311

312
    #[test]
313
    fn queue_full_returns_neg1() {
1✔
314
        let mut q = AsyncQueue::new();
1✔
315
        for i in 0..10 {
10✔
316
            assert_eq!(q.push(vec![i]), 0);
10✔
317
        }
318
        assert_eq!(q.push(vec![99]), -1);
1✔
319
        assert_eq!(q.len(), 10);
1✔
320
    }
1✔
321

322
    #[test]
323
    fn oversized_blob_returns_neg2() {
1✔
324
        let mut q = AsyncQueue::new();
1✔
325
        let big = vec![0x42u8; sonde_protocol::MAX_APP_DATA_BLOB_SIZE + 1];
1✔
326
        assert_eq!(q.push(big), -2);
1✔
327
        assert!(q.is_empty());
1✔
328
    }
1✔
329

330
    #[test]
331
    fn max_size_blob_accepted() {
1✔
332
        let mut q = AsyncQueue::new();
1✔
333
        let blob = vec![0x42u8; sonde_protocol::MAX_APP_DATA_BLOB_SIZE];
1✔
334
        assert_eq!(q.push(blob), 0);
1✔
335
        assert_eq!(q.len(), 1);
1✔
336
    }
1✔
337

338
    #[test]
339
    fn single_for_piggyback_one_fits() {
1✔
340
        let mut q = AsyncQueue::new();
1✔
341
        let _ = q.push(vec![1, 2, 3]);
1✔
342
        assert!(q.single_for_piggyback(100).is_some());
1✔
343
        assert_eq!(q.single_for_piggyback(100).unwrap(), &[1, 2, 3]);
1✔
344
    }
1✔
345

346
    #[test]
347
    fn single_for_piggyback_one_too_large() {
1✔
348
        let mut q = AsyncQueue::new();
1✔
349
        let _ = q.push(vec![1, 2, 3]);
1✔
350
        assert!(q.single_for_piggyback(2).is_none());
1✔
351
    }
1✔
352

353
    #[test]
354
    fn single_for_piggyback_multiple() {
1✔
355
        let mut q = AsyncQueue::new();
1✔
356
        let _ = q.push(vec![1]);
1✔
357
        let _ = q.push(vec![2]);
1✔
358
        assert!(q.single_for_piggyback(100).is_none());
1✔
359
    }
1✔
360

361
    #[test]
362
    fn single_for_piggyback_empty() {
1✔
363
        let q = AsyncQueue::new();
1✔
364
        assert!(q.single_for_piggyback(100).is_none());
1✔
365
    }
1✔
366

367
    #[test]
368
    fn from_rtc_returns_empty_on_host() {
1✔
369
        let q = AsyncQueue::from_rtc();
1✔
370
        assert!(q.is_empty());
1✔
371
        assert_eq!(q.len(), 0);
1✔
372
    }
1✔
373

374
    #[test]
375
    fn clear_empties_queue() {
1✔
376
        let mut q = AsyncQueue::new();
1✔
377
        assert_eq!(q.push(vec![0x42, 0x43]), 0);
1✔
378
        assert_eq!(q.push(vec![0x44, 0x45]), 0);
1✔
379
        assert_eq!(q.len(), 2);
1✔
380

381
        q.clear();
1✔
382
        assert!(q.is_empty());
1✔
383
        assert_eq!(q.len(), 0);
1✔
384
    }
1✔
385

386
    #[test]
387
    fn drain_preserves_data_fidelity() {
1✔
388
        let mut q = AsyncQueue::new();
1✔
389
        let blob1 = vec![0x42u8; 100];
1✔
390
        let blob2 = vec![0x43u8; 50];
1✔
391
        assert_eq!(q.push(blob1.clone()), 0);
1✔
392
        assert_eq!(q.push(blob2.clone()), 0);
1✔
393

394
        let drained = q.drain();
1✔
395
        assert_eq!(drained.len(), 2);
1✔
396
        assert_eq!(drained[0], blob1);
1✔
397
        assert_eq!(drained[1], blob2);
1✔
398
        assert!(q.is_empty());
1✔
399
    }
1✔
400

401
    #[test]
402
    fn layout_size_within_budget() {
1✔
403
        // RtcQueueItem may include alignment padding from the u32 len field.
404
        let item_size = core::mem::size_of::<RtcQueueItem>();
1✔
405
        let size = core::mem::size_of::<RtcQueueLayout>();
1✔
406
        assert_eq!(size, 8 + MAX_MESSAGES * item_size);
1✔
407
        assert!(size <= 2300, "queue layout exceeds 2.3 KB: {size}");
1✔
408
    }
1✔
409

410
    #[test]
411
    fn push_after_clear_reuses_slots() {
1✔
412
        let mut q = AsyncQueue::new();
1✔
413
        for _ in 0..MAX_MESSAGES {
1✔
414
            assert_eq!(q.push(vec![0x42]), 0);
10✔
415
        }
416
        assert_eq!(q.push(vec![0x42]), -1);
1✔
417

418
        q.clear();
1✔
419
        assert_eq!(q.push(vec![0x43]), 0);
1✔
420
        assert_eq!(q.len(), 1);
1✔
421

422
        let drained = q.drain();
1✔
423
        assert_eq!(drained[0], vec![0x43]);
1✔
424
    }
1✔
425
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc