• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rust-bio / rust-htslib / 20028781206

08 Dec 2025 12:53PM UTC coverage: 81.912% (-0.02%) from 81.935%
20028781206

Pull #495

github

web-flow
Merge ddf9527d9 into 8f1cdd75c
Pull Request #495: feat!: Use Arc instead of Rc in bam::buffer, such that the buffer can be used in a multithreaded context

5 of 7 new or added lines in 1 file covered. (71.43%)

20 existing lines in 6 files now uncovered.

2785 of 3400 relevant lines covered (81.91%)

27209.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.06
/src/bam/buffer.rs
1
// Copyright 2017 Johannes Köster.
2
// Licensed under the MIT license (http://opensource.org/licenses/MIT)
3
// This file may not be copied, modified, or distributed
4
// except according to those terms.
5

6
use std::collections::{vec_deque, VecDeque};
7
use std::mem;
8
use std::str;
9
use std::sync::Arc;
10

11
use crate::bam;
12
use crate::bam::Read;
13
use crate::errors::{Error, Result};
14
/// A buffer for BAM records. This allows access to regions in a sorted BAM file while iterating
15
/// over it in a single pass.
16
/// The buffer is implemented as a ringbuffer, such that extension or movement to the right has
17
/// linear complexity. The buffer makes use of indexed random access. Hence, when fetching a
18
/// region at the very end of the BAM, everything before is omitted without cost.
19
#[derive(Debug)]
20
pub struct RecordBuffer {
21
    /// Indexed BAM reader that `fetch` seeks in and reads records from.
    reader: bam::IndexedReader,
22
    /// Ring buffer holding the records retained by `fetch`, in the order they were read.
    inner: VecDeque<Arc<bam::Record>>,
23
    /// Record read by the previous `fetch` whose position was at or beyond that call's `end`.
    /// It is moved into `inner` at the beginning of the next `fetch`.
    overflow: Option<Arc<bam::Record>>,
24
    /// Whether `cache_cigar()` is called on every record that `fetch` keeps.
    cache_cigar: bool,
25
    /// Threshold compared against the gap between the last buffered record and the start of a
    /// new window; at or above it, `fetch` issues an indexed fetch instead of reading through.
    /// Initialised to 1 by `new`.
    min_refetch_distance: u64,
26
    /// Scratch record the reader reads into. When a record is kept it is swapped out for a
    /// freshly allocated one.
    buffer_record: Arc<bam::Record>,
27
    /// Position of the first buffered record after the last `fetch`, or that call's window
    /// start if the buffer was empty. Initialised to `Some(0)` by `new`.
    start_pos: Option<u64>,
28
}
29

30
// NOTE(review): manual `Sync` impl with no stated safety argument. It asserts that sharing a
// `&RecordBuffer` between threads is sound, which depends on `bam::IndexedReader` and
// `bam::Record` (defined elsewhere) — confirm and record the invariant as a `// SAFETY:` comment.
unsafe impl Sync for RecordBuffer {}
31
// NOTE(review): manual `Send` impl with no stated safety argument. It asserts that moving a
// `RecordBuffer` to another thread is sound, which depends on `bam::IndexedReader` and
// `bam::Record` (defined elsewhere) — confirm and record the invariant as a `// SAFETY:` comment.
unsafe impl Send for RecordBuffer {}
32

33
impl RecordBuffer {
34
    /// Create a new `RecordBuffer`.
35
    ///
36
    /// # Arguments
37
    ///
38
    /// * `bam` - BAM reader
39
    /// * `cache_cigar` - whether to call `bam::Record::cache_cigar()` for each record.
40
    pub fn new(bam: bam::IndexedReader, cache_cigar: bool) -> Self {
2✔
41
        RecordBuffer {
42
            reader: bam,
43
            inner: VecDeque::new(),
3✔
44
            overflow: None,
45
            cache_cigar,
46
            min_refetch_distance: 1,
47
            buffer_record: Arc::new(bam::Record::new()),
4✔
48
            start_pos: Some(0),
1✔
49
        }
50
    }
51

52
    /// maximum distance to previous fetch window such that a
53
    /// new fetch operation is performed. If the distance is smaller, buffer will simply
54
    /// read through until the start of the new fetch window (probably saving some time
55
    /// by avoiding the random access).
56
    pub fn set_min_refetch_distance(&mut self, min_refetch_distance: u64) {
×
57
        self.min_refetch_distance = min_refetch_distance;
×
58
    }
59

60
    /// Return start position of buffer
61
    pub fn start(&self) -> Option<u64> {
2✔
62
        self.inner.front().map(|rec| rec.pos() as u64)
7✔
63
    }
64

65
    /// Return end position of buffer.
66
    pub fn end(&self) -> Option<u64> {
×
67
        self.inner.back().map(|rec| rec.pos() as u64)
×
68
    }
69

70
    pub fn tid(&self) -> Option<i32> {
×
71
        self.inner.back().map(|rec| rec.tid())
×
72
    }
73

74
    /// Fill buffer at the given interval. If the start coordinate is left of
75
    /// the previous start coordinate, this will use an additional BAM fetch IO operation.
76
    /// Coordinates are 0-based, and end is exclusive.
77
    /// Returns tuple with numbers of added and deleted records since the previous fetch.
78
    #[allow(unused_assignments)] // TODO this is needed because rustc thinks that deleted is unused
79
    pub fn fetch(&mut self, chrom: &[u8], start: u64, end: u64) -> Result<(usize, usize)> {
2✔
80
        let mut added = 0;
3✔
81
        // move overflow from last fetch into ringbuffer
82
        if self.overflow.is_some() {
3✔
83
            added += 1;
×
84
            self.inner.push_back(self.overflow.take().unwrap());
×
85
        }
86

87
        if let Some(tid) = self.reader.header.tid(chrom) {
6✔
88
            let mut deleted = 0;
3✔
89
            let window_start = start;
3✔
90
            if self.inner.is_empty()
3✔
91
                || window_start.saturating_sub(self.end().unwrap()) >= self.min_refetch_distance
×
92
                || self.tid().unwrap() != tid as i32
×
93
                || self.start().unwrap() > self.start_pos.unwrap()
×
94
            {
95
                let end = self.reader.header.target_len(tid).unwrap();
5✔
96
                self.reader.fetch((tid, window_start, end))?;
5✔
97
                deleted = self.inner.len();
2✔
98
                self.inner.clear();
3✔
99
            } else {
100
                // remove records too far left
101
                let to_remove = self
×
UNCOV
102
                    .inner
103
                    .iter()
104
                    .take_while(|rec| rec.pos() < window_start as i64)
×
105
                    .count();
106
                for _ in 0..to_remove {
×
107
                    self.inner.pop_front();
×
108
                }
UNCOV
109
                deleted = to_remove;
×
110
            }
111

112
            // extend to the right
113
            loop {
1✔
114
                match self
15✔
115
                    .reader
14✔
116
                    .read(Arc::get_mut(&mut self.buffer_record).unwrap())
22✔
117
                {
118
                    None => break,
1✔
119
                    Some(res) => res?,
13✔
120
                }
121

122
                if self.buffer_record.is_unmapped() {
7✔
123
                    continue;
124
                }
125

126
                let pos = self.buffer_record.pos();
13✔
127

128
                // skip records before the start
129
                if pos < start as i64 {
7✔
130
                    continue;
131
                }
132

133
                // Record is kept, do not reuse it for next iteration
134
                // and thus create a new one.
135
                let mut record =
7✔
136
                    mem::replace(&mut self.buffer_record, Arc::new(bam::Record::new()));
24✔
137

138
                if self.cache_cigar {
7✔
NEW
139
                    Arc::get_mut(&mut record).unwrap().cache_cigar();
×
140
                }
141

142
                if pos >= end as i64 {
7✔
143
                    self.overflow = Some(record);
×
UNCOV
144
                    break;
145
                } else {
146
                    self.inner.push_back(record);
19✔
147
                    added += 1;
7✔
148
                }
149
            }
150
            self.start_pos = Some(self.start().unwrap_or(window_start));
4✔
151

152
            Ok((added, deleted))
2✔
153
        } else {
154
            Err(Error::UnknownSequence {
×
155
                sequence: str::from_utf8(chrom).unwrap().to_owned(),
×
156
            })
157
        }
158
    }
159

160
    /// Iterate over records that have been fetched with `fetch`.
161
    pub fn iter(&self) -> vec_deque::Iter<'_, Arc<bam::Record>> {
2✔
162
        self.inner.iter()
3✔
163
    }
164

165
    /// Iterate over mutable references to records that have been fetched with `fetch`.
NEW
166
    pub fn iter_mut(&mut self) -> vec_deque::IterMut<'_, Arc<bam::Record>> {
×
167
        self.inner.iter_mut()
×
168
    }
169

170
    pub fn len(&self) -> usize {
×
171
        self.inner.len()
×
172
    }
173

174
    pub fn is_empty(&self) -> bool {
×
175
        self.len() == 0
×
176
    }
177
}
178

179
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bam;

    /// Fetching `CHROMOSOME_I:1-5` from the test BAM must buffer exactly six records,
    /// all of them located at position 1.
    #[test]
    fn test_buffer() {
        let mut buffer = RecordBuffer::new(
            bam::IndexedReader::from_path("test/test.bam").unwrap(),
            false,
        );

        buffer.fetch(b"CHROMOSOME_I", 1, 5).unwrap();

        let positions: Vec<_> = buffer.iter().map(|rec| rec.pos()).collect();
        assert_eq!(positions.len(), 6);
        for pos in positions {
            assert_eq!(pos, 1);
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc