• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

google / alioth / 17114607742

21 Aug 2025 01:31AM UTC coverage: 10.411%. Remained the same
17114607742

Pull #273

github

web-flow
Merge 897d3fdbf into 7925c9625
Pull Request #273: feat: virtio packed queue

30 of 57 new or added lines in 9 files covered. (52.63%)

50 existing lines in 4 files now uncovered.

714 of 6858 relevant lines covered (10.41%)

16.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/alioth/src/virtio/dev/dev.rs
1
// Copyright 2024 Google LLC
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     https://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
pub mod balloon;
16
pub mod blk;
17
pub mod entropy;
18
#[path = "fs/fs.rs"]
19
pub mod fs;
20
#[cfg(target_os = "linux")]
21
#[path = "net/net.rs"]
22
pub mod net;
23
#[cfg(target_os = "linux")]
24
#[path = "vsock/vsock.rs"]
25
pub mod vsock;
26

27
use std::fmt::Debug;
28
use std::sync::Arc;
29
use std::sync::atomic::{AtomicU8, AtomicU16, AtomicU32};
30
use std::sync::mpsc::{self, Receiver, Sender};
31
use std::thread::JoinHandle;
32

33
use bitflags::Flags;
34
use snafu::ResultExt;
35

36
use crate::hv::IoeventFd;
37
use crate::mem::emulated::Mmio;
38
use crate::mem::mapped::{Ram, RamBus};
39
use crate::mem::{LayoutChanged, LayoutUpdated, MemRegion};
40
use crate::virtio::queue::split::SplitQueue;
41
use crate::virtio::queue::{QUEUE_SIZE_MAX, Queue, VirtQueue};
42
#[cfg(target_os = "linux")]
43
use crate::virtio::vu::conn::VuChannel;
44
use crate::virtio::worker::Waker;
45
use crate::virtio::{DeviceId, IrqSender, Result, VirtioFeature, error};
46

47
pub trait Virtio: Debug + Send + Sync + 'static {
48
    type Config: Mmio;
49
    type Feature: Flags<Bits = u128> + Debug;
50

51
    fn name(&self) -> &str;
52
    fn id(&self) -> DeviceId;
53
    fn num_queues(&self) -> u16;
54
    fn config(&self) -> Arc<Self::Config>;
55
    fn feature(&self) -> u128;
56
    fn spawn_worker<S: IrqSender, E: IoeventFd>(
57
        self,
58
        event_rx: Receiver<WakeEvent<S, E>>,
59
        memory: Arc<RamBus>,
60
        queue_regs: Arc<[Queue]>,
61
    ) -> Result<(JoinHandle<()>, Arc<Waker>)>;
62
    fn shared_mem_regions(&self) -> Option<Arc<MemRegion>> {
×
63
        None
×
64
    }
65
    fn ioeventfd_offloaded(&self, _q_index: u16) -> Result<bool> {
×
66
        Ok(false)
×
67
    }
68
    fn mem_update_callback(&self) -> Option<Box<dyn LayoutUpdated>> {
×
69
        None
×
70
    }
71
    fn mem_change_callback(&self) -> Option<Box<dyn LayoutChanged>> {
×
72
        None
×
73
    }
74
    #[cfg(target_os = "linux")]
75
    fn set_vu_channel(&mut self, _channel: Arc<VuChannel>) {}
76
}
77

78
#[derive(Debug, Default)]
79
pub struct Register {
80
    pub device_feature: [u32; 4],
81
    pub driver_feature: [AtomicU32; 4],
82
    pub device_feature_sel: AtomicU8,
83
    pub driver_feature_sel: AtomicU8,
84
    pub queue_sel: AtomicU16,
85
    pub status: AtomicU8,
86
}
87

88
const TOKEN_WARKER: u64 = 1 << 63;
89

90
#[derive(Debug, Clone)]
91
pub struct StartParam<S, E>
92
where
93
    S: IrqSender,
94
    E: IoeventFd,
95
{
96
    pub(crate) feature: u128,
97
    pub(crate) irq_sender: Arc<S>,
98
    pub(crate) ioeventfds: Option<Arc<[E]>>,
99
}
100

101
#[derive(Debug, Clone)]
102
pub enum WakeEvent<S, E>
103
where
104
    S: IrqSender,
105
    E: IoeventFd,
106
{
107
    Notify {
108
        q_index: u16,
109
    },
110
    Shutdown,
111
    #[cfg(target_os = "linux")]
112
    VuChannel {
113
        channel: Arc<VuChannel>,
114
    },
115
    Start {
116
        param: StartParam<S, E>,
117
    },
118
    Reset,
119
}
120

121
#[derive(Debug, PartialEq, Eq)]
122
pub enum WorkerState {
123
    Pending,
124
    Running,
125
    Shutdown,
126
}
127

128
#[derive(Debug)]
129
pub struct Worker<D, S, E, B>
130
where
131
    S: IrqSender,
132
    E: IoeventFd,
133
{
134
    context: Context<D, S, E>,
135
    backend: B,
136
}
137

138
#[derive(Debug)]
139
pub struct VirtioDevice<S, E>
140
where
141
    S: IrqSender,
142
    E: IoeventFd,
143
{
144
    pub name: Arc<str>,
145
    pub id: DeviceId,
146
    pub device_config: Arc<dyn Mmio>,
147
    pub device_feature: u128,
148
    pub queue_regs: Arc<[Queue]>,
149
    pub shared_mem_regions: Option<Arc<MemRegion>>,
150
    pub waker: Arc<Waker>,
151
    pub event_tx: Sender<WakeEvent<S, E>>,
152
    worker_handle: Option<JoinHandle<()>>,
153
}
154

155
impl<S, E> VirtioDevice<S, E>
156
where
157
    S: IrqSender,
158
    E: IoeventFd,
159
{
160
    fn shutdown(&mut self) -> Result<(), Box<dyn std::error::Error>> {
×
161
        let Some(handle) = self.worker_handle.take() else {
×
162
            return Ok(());
×
163
        };
164
        self.event_tx.send(WakeEvent::Shutdown)?;
×
165
        self.waker.wake()?;
×
166
        if let Err(e) = handle.join() {
×
167
            log::error!("{}: failed to join worker thread: {e:?}", self.name)
×
168
        }
169
        Ok(())
×
170
    }
171

172
    pub fn new<D>(
×
173
        name: impl Into<Arc<str>>,
174
        dev: D,
175
        memory: Arc<RamBus>,
176
        restricted_memory: bool,
177
    ) -> Result<Self>
178
    where
179
        D: Virtio,
180
    {
181
        let name = name.into();
×
182
        let id = dev.id();
×
183
        let device_config = dev.config();
×
184
        let mut device_feature = dev.feature();
×
185
        if restricted_memory {
×
186
            device_feature |= VirtioFeature::ACCESS_PLATFORM.bits()
×
187
        } else {
188
            device_feature &= !VirtioFeature::ACCESS_PLATFORM.bits()
×
189
        }
190
        let num_queues = dev.num_queues();
×
NEW
191
        let queue_regs = (0..num_queues).map(|_| Queue {
×
192
            size: AtomicU16::new(QUEUE_SIZE_MAX),
×
193
            ..Default::default()
×
194
        });
195
        let queue_regs = queue_regs.collect::<Arc<_>>();
×
196

197
        let shared_mem_regions = dev.shared_mem_regions();
×
198
        let (event_tx, event_rx) = mpsc::channel();
×
199
        let (handle, waker) = dev.spawn_worker(event_rx, memory, queue_regs.clone())?;
×
200
        log::debug!(
×
201
            "{name}: created with {:x?} {:x?}",
×
202
            VirtioFeature::from_bits_retain(device_feature & !D::Feature::all().bits()),
×
203
            D::Feature::from_bits_truncate(device_feature)
×
204
        );
205
        let virtio_dev = VirtioDevice {
206
            name,
207
            id,
208
            device_feature,
209
            queue_regs,
210
            worker_handle: Some(handle),
×
211
            event_tx,
212
            waker,
213
            device_config,
214
            shared_mem_regions,
215
        };
216
        Ok(virtio_dev)
×
217
    }
218
}
219

220
impl<S, E> Drop for VirtioDevice<S, E>
221
where
222
    S: IrqSender,
223
    E: IoeventFd,
224
{
225
    fn drop(&mut self) {
×
226
        if let Err(e) = self.shutdown() {
×
227
            log::error!("{}: failed to shutdown: {e}", self.name);
×
228
        }
229
    }
230
}
231

232
pub trait Backend<D: Virtio>: Send + 'static {
233
    fn register_waker(&mut self, token: u64) -> Result<Arc<Waker>>;
234
    fn reset(&self, dev: &mut D) -> Result<()>;
235
    fn event_loop<'m, S, Q, E>(
236
        &mut self,
237
        memory: &'m Ram,
238
        context: &mut Context<D, S, E>,
239
        queues: &mut [Option<Q>],
240
        param: &StartParam<S, E>,
241
    ) -> Result<()>
242
    where
243
        S: IrqSender,
244
        Q: VirtQueue<'m>,
245
        E: IoeventFd;
246
}
247

248
pub trait BackendEvent {
249
    fn token(&self) -> u64;
250
}
251

252
pub trait ActiveBackend<D: Virtio> {
253
    type Event: BackendEvent;
254
    fn handle_event(&mut self, dev: &mut D, event: &Self::Event) -> Result<()>;
255
    fn handle_queue(&mut self, dev: &mut D, index: u16) -> Result<()>;
256
}
257

258
#[derive(Debug)]
259
pub struct Context<D, S, E>
260
where
261
    S: IrqSender,
262
    E: IoeventFd,
263
{
264
    pub dev: D,
265
    memory: Arc<RamBus>,
266
    event_rx: Receiver<WakeEvent<S, E>>,
267
    queue_regs: Arc<[Queue]>,
268
    pub state: WorkerState,
269
}
270

271
impl<D, S, E> Context<D, S, E>
272
where
273
    D: Virtio,
274
    S: IrqSender,
275
    E: IoeventFd,
276
{
277
    fn handle_wake_events<B>(&mut self, backend: &mut B) -> Result<()>
×
278
    where
279
        B: ActiveBackend<D>,
280
    {
281
        while let Ok(event) = self.event_rx.try_recv() {
×
282
            match event {
×
283
                WakeEvent::Notify { q_index } => backend.handle_queue(&mut self.dev, q_index)?,
×
284
                WakeEvent::Shutdown => {
×
285
                    self.state = WorkerState::Shutdown;
×
286
                    break;
×
287
                }
288
                WakeEvent::Start { .. } => {
×
289
                    log::error!("{}: device has already started", self.dev.name())
×
290
                }
291
                #[cfg(target_os = "linux")]
292
                WakeEvent::VuChannel { channel } => self.dev.set_vu_channel(channel),
×
293
                WakeEvent::Reset => {
×
294
                    log::info!("{}: guest requested reset", self.dev.name());
×
295
                    self.state = WorkerState::Pending;
×
296
                    break;
×
297
                }
298
            }
299
        }
300
        Ok(())
×
301
    }
302

303
    fn wait_start(&mut self) -> Option<StartParam<S, E>> {
×
304
        for wake_event in self.event_rx.iter() {
×
305
            match wake_event {
×
306
                WakeEvent::Reset => {}
×
307
                WakeEvent::Start { param } => {
×
308
                    self.state = WorkerState::Running;
×
309
                    return Some(param);
×
310
                }
311
                #[cfg(target_os = "linux")]
312
                WakeEvent::VuChannel { channel } => self.dev.set_vu_channel(channel),
×
313
                WakeEvent::Shutdown => break,
×
314
                WakeEvent::Notify { q_index } => {
×
315
                    log::error!(
×
316
                        "{}: driver notified queue {q_index} before device is ready",
317
                        self.dev.name()
×
318
                    )
319
                }
320
            }
321
        }
322
        self.state = WorkerState::Shutdown;
×
323
        None
×
324
    }
325

326
    pub fn handle_event<B>(&mut self, event: &B::Event, backend: &mut B) -> Result<()>
×
327
    where
328
        B: ActiveBackend<D>,
329
    {
330
        if event.token() == TOKEN_WARKER {
×
331
            self.handle_wake_events(backend)
×
332
        } else {
333
            backend.handle_event(&mut self.dev, event)
×
334
        }
335
    }
336
}
337

338
impl<D, S, E, B> Worker<D, S, E, B>
339
where
340
    D: Virtio,
341
    S: IrqSender,
342
    B: Backend<D>,
343
    E: IoeventFd,
344
{
345
    pub fn spawn(
×
346
        dev: D,
347
        mut backend: B,
348
        event_rx: Receiver<WakeEvent<S, E>>,
349
        memory: Arc<RamBus>,
350
        queue_regs: Arc<[Queue]>,
351
    ) -> Result<(JoinHandle<()>, Arc<Waker>)> {
352
        let waker = backend.register_waker(TOKEN_WARKER)?;
×
353
        let worker = Worker {
354
            context: Context {
×
355
                dev,
356
                event_rx,
357
                memory,
358
                queue_regs,
359
                state: WorkerState::Pending,
360
            },
361
            backend,
362
        };
363
        let name = worker.context.dev.name();
×
364
        let handle = std::thread::Builder::new()
×
365
            .name(name.to_owned())
×
366
            .spawn(move || worker.do_work())
×
367
            .context(error::WorkerThread)?;
×
368
        Ok((handle, waker))
×
369
    }
370

371
    fn event_loop<'m, Q>(
×
372
        &mut self,
373
        queues: &mut [Option<Q>],
374
        ram: &'m Ram,
375
        param: &StartParam<S, E>,
376
    ) -> Result<()>
377
    where
378
        Q: VirtQueue<'m>,
379
        E: IoeventFd,
380
    {
381
        log::debug!(
×
382
            "{}: activated with {:x?} {:x?}",
×
383
            self.context.dev.name(),
×
384
            VirtioFeature::from_bits_retain(param.feature & !D::Feature::all().bits()),
×
385
            D::Feature::from_bits_truncate(param.feature)
×
386
        );
387
        self.backend
×
388
            .event_loop(ram, &mut self.context, queues, param)
×
389
    }
390

391
    fn loop_until_reset(&mut self) -> Result<()> {
×
392
        let Some(param) = self.context.wait_start() else {
×
393
            return Ok(());
×
394
        };
395
        let memory = self.context.memory.clone();
×
396
        let ram = memory.lock_layout();
×
397
        let feature = param.feature & !VirtioFeature::ACCESS_PLATFORM.bits();
×
398
        let queue_regs = self.context.queue_regs.clone();
×
399
        if VirtioFeature::from_bits_retain(feature).contains(VirtioFeature::RING_PACKED) {
×
400
            todo!()
401
        } else {
402
            let new_queue = |reg| SplitQueue::new(reg, &ram, feature);
×
403
            let queues: Result<Box<_>> = queue_regs.iter().map(new_queue).collect();
×
404
            self.event_loop(&mut (queues?), &ram, &param)?;
×
405
        };
406
        self.backend.reset(&mut self.context.dev)?;
×
407
        Ok(())
×
408
    }
409

410
    fn do_work(mut self) {
×
411
        while self.context.state != WorkerState::Shutdown {
×
412
            if let Err(e) = self.loop_until_reset() {
×
413
                log::error!("worker {}: {e:?}", self.context.dev.name(),);
×
414
                return;
×
415
            }
416
        }
417
        log::debug!("worker {}: done", self.context.dev.name())
×
418
    }
419
}
420

421
pub trait DevParam {
422
    type Device;
423
    fn build(self, name: impl Into<Arc<str>>) -> Result<Self::Device>;
424
    fn needs_mem_shared_fd(&self) -> bool {
×
425
        false
×
426
    }
427
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc