• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

google / alioth / 17185259678

24 Aug 2025 06:24AM UTC coverage: 13.868% (-0.02%) from 13.887%
17185259678

Pull #277

github

web-flow
Merge 2d53e34b0 into 861f19073
Pull Request #277: feat: Unix domain socket based vsock device

62 of 101 new or added lines in 9 files covered. (61.39%)

72 existing lines in 5 files now uncovered.

972 of 7009 relevant lines covered (13.87%)

17.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.84
/alioth/src/virtio/dev/dev.rs
1
// Copyright 2024 Google LLC
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     https://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
pub mod balloon;
16
pub mod blk;
17
pub mod entropy;
18
#[path = "fs/fs.rs"]
19
pub mod fs;
20
#[cfg(target_os = "linux")]
21
#[path = "net/net.rs"]
22
pub mod net;
23
#[cfg(target_os = "linux")]
24
#[path = "vsock/vsock.rs"]
25
pub mod vsock;
26

27
use std::fmt::Debug;
28
use std::sync::Arc;
29
use std::sync::atomic::{AtomicU8, AtomicU16, AtomicU32};
30
use std::sync::mpsc::{self, Receiver, Sender};
31
use std::thread::JoinHandle;
32

33
use bitflags::Flags;
34
use snafu::ResultExt;
35

36
use crate::hv::IoeventFd;
37
use crate::mem::emulated::Mmio;
38
use crate::mem::mapped::{Ram, RamBus};
39
use crate::mem::{LayoutChanged, LayoutUpdated, MemRegion};
40
use crate::virtio::queue::packed::PackedQueue;
41
use crate::virtio::queue::split::SplitQueue;
42
use crate::virtio::queue::{QUEUE_SIZE_MAX, Queue, QueueReg, VirtQueue};
43
#[cfg(target_os = "linux")]
44
use crate::virtio::vu::conn::VuChannel;
45
use crate::virtio::worker::Waker;
46
use crate::virtio::{DeviceId, IrqSender, Result, VirtioFeature, error};
47

48
pub trait Virtio: Debug + Send + Sync + 'static {
49
    type Config: Mmio;
50
    type Feature: Flags<Bits = u128> + Debug;
51

52
    fn name(&self) -> &str;
53
    fn id(&self) -> DeviceId;
54
    fn num_queues(&self) -> u16;
55
    fn config(&self) -> Arc<Self::Config>;
56
    fn feature(&self) -> u128;
57
    fn spawn_worker<S: IrqSender, E: IoeventFd>(
58
        self,
59
        event_rx: Receiver<WakeEvent<S, E>>,
60
        memory: Arc<RamBus>,
61
        queue_regs: Arc<[QueueReg]>,
62
    ) -> Result<(JoinHandle<()>, Arc<Waker>)>;
63
    fn shared_mem_regions(&self) -> Option<Arc<MemRegion>> {
×
64
        None
×
65
    }
66
    fn ioeventfd_offloaded(&self, _q_index: u16) -> Result<bool> {
×
67
        Ok(false)
×
68
    }
69
    fn mem_update_callback(&self) -> Option<Box<dyn LayoutUpdated>> {
×
70
        None
×
71
    }
72
    fn mem_change_callback(&self) -> Option<Box<dyn LayoutChanged>> {
×
73
        None
×
74
    }
75
    #[cfg(target_os = "linux")]
76
    fn set_vu_channel(&mut self, _channel: Arc<VuChannel>) {}
77
}
78

79
#[derive(Debug, Default)]
80
pub struct Register {
81
    pub device_feature: [u32; 4],
82
    pub driver_feature: [AtomicU32; 4],
83
    pub device_feature_sel: AtomicU8,
84
    pub driver_feature_sel: AtomicU8,
85
    pub queue_sel: AtomicU16,
86
    pub status: AtomicU8,
87
}
88

89
const TOKEN_WARKER: u64 = 1 << 63;
90

91
#[derive(Debug, Clone)]
92
pub struct StartParam<S, E>
93
where
94
    S: IrqSender,
95
    E: IoeventFd,
96
{
97
    pub(crate) feature: u128,
98
    pub(crate) irq_sender: Arc<S>,
99
    pub(crate) ioeventfds: Option<Arc<[E]>>,
100
}
101

102
#[derive(Debug, Clone)]
103
pub enum WakeEvent<S, E>
104
where
105
    S: IrqSender,
106
    E: IoeventFd,
107
{
108
    Notify {
109
        q_index: u16,
110
    },
111
    Shutdown,
112
    #[cfg(target_os = "linux")]
113
    VuChannel {
114
        channel: Arc<VuChannel>,
115
    },
116
    Start {
117
        param: StartParam<S, E>,
118
    },
119
    Reset,
120
}
121

122
#[derive(Debug, PartialEq, Eq)]
123
pub enum WorkerState {
124
    Pending,
125
    Running,
126
    Shutdown,
127
}
128

129
#[derive(Debug)]
130
pub struct Worker<D, S, E, B>
131
where
132
    S: IrqSender,
133
    E: IoeventFd,
134
{
135
    context: Context<D, S, E>,
136
    backend: B,
137
}
138

139
#[derive(Debug)]
140
pub struct VirtioDevice<S, E>
141
where
142
    S: IrqSender,
143
    E: IoeventFd,
144
{
145
    pub name: Arc<str>,
146
    pub id: DeviceId,
147
    pub device_config: Arc<dyn Mmio>,
148
    pub device_feature: u128,
149
    pub queue_regs: Arc<[QueueReg]>,
150
    pub shared_mem_regions: Option<Arc<MemRegion>>,
151
    pub waker: Arc<Waker>,
152
    pub event_tx: Sender<WakeEvent<S, E>>,
153
    worker_handle: Option<JoinHandle<()>>,
154
}
155

156
impl<S, E> VirtioDevice<S, E>
157
where
158
    S: IrqSender,
159
    E: IoeventFd,
160
{
161
    fn shutdown(&mut self) -> Result<(), Box<dyn std::error::Error>> {
×
162
        let Some(handle) = self.worker_handle.take() else {
×
163
            return Ok(());
×
164
        };
165
        self.event_tx.send(WakeEvent::Shutdown)?;
×
166
        self.waker.wake()?;
×
167
        if let Err(e) = handle.join() {
×
168
            log::error!("{}: failed to join worker thread: {e:?}", self.name)
×
169
        }
170
        Ok(())
×
171
    }
172

173
    pub fn new<D>(
×
174
        name: impl Into<Arc<str>>,
175
        dev: D,
176
        memory: Arc<RamBus>,
177
        restricted_memory: bool,
178
    ) -> Result<Self>
179
    where
180
        D: Virtio,
181
    {
182
        let name = name.into();
×
183
        let id = dev.id();
×
184
        let device_config = dev.config();
×
185
        let mut device_feature = dev.feature();
×
186
        if restricted_memory {
×
187
            device_feature |= VirtioFeature::ACCESS_PLATFORM.bits()
×
188
        } else {
189
            device_feature &= !VirtioFeature::ACCESS_PLATFORM.bits()
×
190
        }
191
        let num_queues = dev.num_queues();
×
192
        let queue_regs = (0..num_queues).map(|_| QueueReg {
×
193
            size: AtomicU16::new(QUEUE_SIZE_MAX),
×
194
            ..Default::default()
×
195
        });
196
        let queue_regs = queue_regs.collect::<Arc<_>>();
×
197

198
        let shared_mem_regions = dev.shared_mem_regions();
×
199
        let (event_tx, event_rx) = mpsc::channel();
×
200
        let (handle, waker) = dev.spawn_worker(event_rx, memory, queue_regs.clone())?;
×
201
        log::debug!(
×
202
            "{name}: created with {:x?} {:x?}",
×
203
            VirtioFeature::from_bits_retain(device_feature & !D::Feature::all().bits()),
×
204
            D::Feature::from_bits_truncate(device_feature)
×
205
        );
206
        let virtio_dev = VirtioDevice {
207
            name,
208
            id,
209
            device_feature,
210
            queue_regs,
211
            worker_handle: Some(handle),
×
212
            event_tx,
213
            waker,
214
            device_config,
215
            shared_mem_regions,
216
        };
217
        Ok(virtio_dev)
×
218
    }
219
}
220

221
impl<S, E> Drop for VirtioDevice<S, E>
222
where
223
    S: IrqSender,
224
    E: IoeventFd,
225
{
226
    fn drop(&mut self) {
×
227
        if let Err(e) = self.shutdown() {
×
228
            log::error!("{}: failed to shutdown: {e}", self.name);
×
229
        }
230
    }
231
}
232

233
pub trait Backend<D: Virtio>: Send + 'static {
234
    fn register_waker(&mut self, token: u64) -> Result<Arc<Waker>>;
235
    fn reset(&self, dev: &mut D) -> Result<()>;
236
    fn event_loop<'m, S, Q, E>(
237
        &mut self,
238
        memory: &'m Ram,
239
        context: &mut Context<D, S, E>,
240
        queues: &mut [Option<Queue<'_, 'm, Q>>],
241
        param: &StartParam<S, E>,
242
    ) -> Result<()>
243
    where
244
        S: IrqSender,
245
        Q: VirtQueue<'m>,
246
        E: IoeventFd;
247
}
248

249
pub trait BackendEvent {
250
    fn token(&self) -> u64;
251
}
252

253
pub trait ActiveBackend<D: Virtio> {
254
    type Event: BackendEvent;
255
    fn handle_event(&mut self, dev: &mut D, event: &Self::Event) -> Result<()>;
256
    fn handle_queue(&mut self, dev: &mut D, index: u16) -> Result<()>;
257
}
258

259
#[derive(Debug)]
260
pub struct Context<D, S, E>
261
where
262
    S: IrqSender,
263
    E: IoeventFd,
264
{
265
    pub dev: D,
266
    memory: Arc<RamBus>,
267
    event_rx: Receiver<WakeEvent<S, E>>,
268
    queue_regs: Arc<[QueueReg]>,
269
    pub state: WorkerState,
270
}
271

272
impl<D, S, E> Context<D, S, E>
273
where
274
    D: Virtio,
275
    S: IrqSender,
276
    E: IoeventFd,
277
{
278
    fn handle_wake_events<B>(&mut self, backend: &mut B) -> Result<()>
7✔
279
    where
280
        B: ActiveBackend<D>,
281
    {
282
        while let Ok(event) = self.event_rx.try_recv() {
22✔
283
            match event {
1✔
284
                WakeEvent::Notify { q_index } => backend.handle_queue(&mut self.dev, q_index)?,
8✔
285
                WakeEvent::Shutdown => {
×
286
                    self.state = WorkerState::Shutdown;
3✔
287
                    break;
3✔
288
                }
289
                WakeEvent::Start { .. } => {
×
290
                    log::error!("{}: device has already started", self.dev.name())
×
291
                }
292
                #[cfg(target_os = "linux")]
293
                WakeEvent::VuChannel { channel } => self.dev.set_vu_channel(channel),
×
294
                WakeEvent::Reset => {
×
295
                    log::info!("{}: guest requested reset", self.dev.name());
×
296
                    self.state = WorkerState::Pending;
×
297
                    break;
×
298
                }
299
            }
300
        }
301
        Ok(())
7✔
302
    }
303

304
    fn wait_start(&mut self) -> Option<StartParam<S, E>> {
3✔
305
        for wake_event in self.event_rx.iter() {
8✔
306
            match wake_event {
3✔
307
                WakeEvent::Reset => {}
×
308
                WakeEvent::Start { param } => {
3✔
309
                    self.state = WorkerState::Running;
3✔
310
                    return Some(param);
3✔
311
                }
312
                #[cfg(target_os = "linux")]
313
                WakeEvent::VuChannel { channel } => self.dev.set_vu_channel(channel),
×
314
                WakeEvent::Shutdown => break,
×
315
                WakeEvent::Notify { q_index } => {
×
316
                    log::error!(
×
317
                        "{}: driver notified queue {q_index} before device is ready",
318
                        self.dev.name()
×
319
                    )
320
                }
321
            }
322
        }
323
        self.state = WorkerState::Shutdown;
×
324
        None
×
325
    }
326

327
    pub fn handle_event<B>(&mut self, event: &B::Event, backend: &mut B) -> Result<()>
7✔
328
    where
329
        B: ActiveBackend<D>,
330
    {
331
        if event.token() == TOKEN_WARKER {
7✔
332
            self.handle_wake_events(backend)
19✔
333
        } else {
334
            backend.handle_event(&mut self.dev, event)
×
335
        }
336
    }
337
}
338

339
impl<D, S, E, B> Worker<D, S, E, B>
340
where
341
    D: Virtio,
342
    S: IrqSender,
343
    B: Backend<D>,
344
    E: IoeventFd,
345
{
346
    pub fn spawn(
3✔
347
        dev: D,
348
        mut backend: B,
349
        event_rx: Receiver<WakeEvent<S, E>>,
350
        memory: Arc<RamBus>,
351
        queue_regs: Arc<[QueueReg]>,
352
    ) -> Result<(JoinHandle<()>, Arc<Waker>)> {
353
        let waker = backend.register_waker(TOKEN_WARKER)?;
8✔
354
        let worker = Worker {
355
            context: Context {
1✔
356
                dev,
357
                event_rx,
358
                memory,
359
                queue_regs,
360
                state: WorkerState::Pending,
361
            },
362
            backend,
363
        };
364
        let name = worker.context.dev.name();
2✔
365
        let handle = std::thread::Builder::new()
5✔
366
            .name(name.to_owned())
2✔
367
            .spawn(move || worker.do_work())
7✔
368
            .context(error::WorkerThread)?;
1✔
369
        Ok((handle, waker))
1✔
370
    }
371

372
    fn event_loop<'m, Q>(
3✔
373
        &mut self,
374
        queues: &mut [Option<Queue<'_, 'm, Q>>],
375
        ram: &'m Ram,
376
        param: &StartParam<S, E>,
377
    ) -> Result<()>
378
    where
379
        Q: VirtQueue<'m>,
380
        E: IoeventFd,
381
    {
382
        log::debug!(
4✔
383
            "{}: activated with {:x?} {:x?}",
×
384
            self.context.dev.name(),
×
385
            VirtioFeature::from_bits_retain(param.feature & !D::Feature::all().bits()),
×
386
            D::Feature::from_bits_truncate(param.feature)
×
387
        );
388
        self.backend
3✔
389
            .event_loop(ram, &mut self.context, queues, param)
11✔
390
    }
391

392
    fn loop_until_reset(&mut self) -> Result<()> {
3✔
393
        let Some(param) = self.context.wait_start() else {
5✔
394
            return Ok(());
×
395
        };
396
        let memory = self.context.memory.clone();
2✔
397
        let ram = memory.lock_layout();
2✔
398
        let feature = param.feature & !VirtioFeature::ACCESS_PLATFORM.bits();
2✔
399
        let queue_regs = self.context.queue_regs.clone();
1✔
400
        let feature = VirtioFeature::from_bits_retain(feature);
2✔
401
        let event_idx = feature.contains(VirtioFeature::EVENT_IDX);
1✔
402
        if feature.contains(VirtioFeature::RING_PACKED) {
1✔
403
            let new_queue = |reg| {
×
404
                let Some(split_queue) = PackedQueue::new(reg, &ram, event_idx)? else {
×
405
                    return Ok(None);
×
406
                };
NEW
407
                Ok(Some(Queue::new(split_queue, reg, &ram)))
×
408
            };
409
            let queues: Result<Box<_>> = queue_regs.iter().map(new_queue).collect();
×
410
            self.event_loop(&mut (queues?), &ram, &param)?;
×
411
        } else {
412
            let new_queue = |reg| {
6✔
413
                let Some(split_queue) = SplitQueue::new(reg, &ram, event_idx)? else {
11✔
414
                    return Ok(None);
×
415
                };
NEW
416
                Ok(Some(Queue::new(split_queue, reg, &ram)))
1✔
417
            };
418
            let queues: Result<Box<_>> = queue_regs.iter().map(new_queue).collect();
2✔
419
            self.event_loop(&mut (queues?), &ram, &param)?;
3✔
420
        };
421
        self.backend.reset(&mut self.context.dev)?;
4✔
422
        Ok(())
3✔
423
    }
424

425
    fn do_work(mut self) {
3✔
426
        while self.context.state != WorkerState::Shutdown {
6✔
427
            if let Err(e) = self.loop_until_reset() {
4✔
428
                log::error!("worker {}: {e:?}", self.context.dev.name(),);
×
429
                return;
×
430
            }
431
        }
432
        log::debug!("worker {}: done", self.context.dev.name())
5✔
433
    }
434
}
435

436
pub trait DevParam {
437
    type Device;
438
    fn build(self, name: impl Into<Arc<str>>) -> Result<Self::Device>;
439
    fn needs_mem_shared_fd(&self) -> bool {
×
440
        false
×
441
    }
442
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc