• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

projectfluent / fluent-rs / 13221860010

09 Feb 2025 03:13AM UTC coverage: 89.64% (-0.04%) from 89.675%
13221860010

Pull #378

github

web-flow
Merge ee3bfef45 into f2033ce83
Pull Request #378: deps: update `thiserror` to version 2

3963 of 4421 relevant lines covered (89.64%)

3602.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.57
/fluent-fallback/src/cache.rs
1
use std::{
2
    cell::{RefCell, UnsafeCell},
3
    cmp::Ordering,
4
    pin::Pin,
5
    task::Context,
6
    task::Poll,
7
    task::Waker,
8
};
9

10
use crate::generator::{BundleIterator, BundleStream};
11
use chunky_vec::ChunkyVec;
12
use futures::{ready, Stream};
13
use pin_cell::{PinCell, PinMut};
14

15
/// Caching wrapper around a synchronous iterator `I`.
///
/// Items pulled from the iterator are stored in `items`, so they can be
/// handed out again by reference (see `get` / `push_get`). `R` is only
/// carried as a marker type; no value of `R` is stored.
pub struct Cache<I, R>
16
where
17
    I: Iterator,
18
{
19
    // The wrapped source iterator; `RefCell` lets it be advanced through `&self`.
    iter: RefCell<I>,
20
    // Items produced so far. Accessed through a raw pointer so that pushing
    // is possible through `&self`.
    items: UnsafeCell<ChunkyVec<I::Item>>,
21
    // Marker for the otherwise unused type parameter `R`.
    res: std::marker::PhantomData<R>,
22
}
23

24
// Construction and shared-reference accessors for `Cache`.
//
// NOTE(review): every accessor below dereferences the `UnsafeCell` pointer
// without a `// SAFETY:` rationale. Soundness presumably relies on
// `ChunkyVec` never moving an element once it has been pushed, so references
// returned earlier stay valid across later pushes — confirm against the
// `chunky_vec` crate documentation.
impl<I, R> Cache<I, R>
25
where
26
    I: Iterator,
27
{
28
    /// Creates a cache around `iter` with a default-constructed item store.
    pub fn new(iter: I) -> Self {
10✔
29
        Self {
10✔
30
            iter: RefCell::new(iter),
10✔
31
            items: Default::default(),
10✔
32
            res: std::marker::PhantomData,
10✔
33
        }
10✔
34
    }
10✔
35

36
    /// Returns the number of items currently stored in the cache.
    pub fn len(&self) -> usize {
38✔
37
        unsafe {
38✔
38
            let items = self.items.get();
38✔
39
            (*items).len()
38✔
40
        }
38✔
41
    }
38✔
42

43
    /// Returns the cached item at `index`, or `None` when the item store
    /// has no entry for that index.
    pub fn get(&self, index: usize) -> Option<&I::Item> {
13✔
44
        unsafe {
13✔
45
            let items = self.items.get();
13✔
46
            (*items).get(index)
13✔
47
        }
13✔
48
    }
13✔
49

50
    /// Push, immediately getting a reference to the element
    ///
    /// Takes `&self`: the item store is modified through the `UnsafeCell`
    /// pointer rather than through a `&mut` borrow of the cache.
51
    pub fn push_get(&self, new_value: I::Item) -> &I::Item {
16✔
52
        unsafe {
16✔
53
            let items = self.items.get();
16✔
54
            (*items).push_get(new_value)
16✔
55
        }
16✔
56
    }
16✔
57
}
58

59
// Extra API available when the wrapped iterator is also a `BundleIterator`.
impl<I, R> Cache<I, R>
60
where
61
    I: BundleIterator + Iterator,
62
{
63
    /// Forwards to `prefetch_sync` on the wrapped iterator.
    ///
    /// # Panics
    ///
    /// Panics if the iterator's `RefCell` is already borrowed, since this
    /// takes a `borrow_mut()`.
    pub fn prefetch(&self) {
×
64
        self.iter.borrow_mut().prefetch_sync();
×
65
    }
×
66
}
67

68
/// Iterator over a shared `Cache`, yielding references to its items.
///
/// Each `CacheIter` keeps its own position, so several of them can walk the
/// same cache independently.
pub struct CacheIter<'a, I, R>
69
where
70
    I: Iterator,
71
{
72
    // The cache being iterated.
    cache: &'a Cache<I, R>,
73
    // Index of the next item this iterator will yield.
    curr: usize,
74
}
75

76
// Yields cached items first and pulls from the wrapped iterator only when
// this iterator has caught up with the end of the cache.
impl<'a, I, R> Iterator for CacheIter<'a, I, R>
77
where
78
    I: Iterator,
79
{
80
    type Item = &'a I::Item;
81

82
    /// Returns the item at the current position, fetching it from the
    /// wrapped iterator and caching it if it has not been produced yet.
    fn next(&mut self) -> Option<Self::Item> {
38✔
83
        let cache_len = self.cache.len();
38✔
84
        // Compare our position with the cache length to pick one of three cases.
        match self.curr.cmp(&cache_len) {
38✔
85
            Ordering::Less => {
86
                // Cached value
87
                self.curr += 1;
13✔
88
                self.cache.get(self.curr - 1)
13✔
89
            }
90
            Ordering::Equal => {
91
                // Get the next item from the iterator
92
                let item = self.cache.iter.borrow_mut().next();
25✔
93
                // The position advances even when the source returned `None`,
                // so later calls land in the `Greater` arm unless the cache
                // has grown in the meantime.
                self.curr += 1;
25✔
94
                if let Some(item) = item {
25✔
95
                    Some(self.cache.push_get(item))
16✔
96
                } else {
97
                    None
9✔
98
                }
99
            }
100
            Ordering::Greater => {
101
                // Ran off the end of the cache
102
                None
×
103
            }
104
        }
105
    }
38✔
106
}
107

108
// Allows `for item in &cache`; each call produces a fresh `CacheIter`.
impl<'a, I, R> IntoIterator for &'a Cache<I, R>
109
where
110
    I: Iterator,
111
{
112
    type Item = &'a I::Item;
113
    type IntoIter = CacheIter<'a, I, R>;
114

115
    /// Creates an iterator over this cache starting at index 0.
    fn into_iter(self) -> Self::IntoIter {
18✔
116
        CacheIter {
18✔
117
            cache: self,
18✔
118
            curr: 0,
18✔
119
        }
18✔
120
    }
18✔
121
}
122

123
////////////////////////////////////////////////////////////////////////////////
124

125
/// Caching wrapper around an asynchronous `Stream` `S`.
///
/// Items produced by the stream are stored in `items` and handed out by
/// reference. `R` is only carried as a marker type; no value of `R` is stored.
pub struct AsyncCache<S, R>
126
where
127
    S: Stream,
128
{
129
    // The wrapped stream; `PinCell` provides pinned mutable access through `&self`.
    stream: PinCell<S>,
130
    // Items produced so far. Accessed through a raw pointer so that pushing
    // is possible through `&self`.
    items: UnsafeCell<ChunkyVec<S::Item>>,
131
    // TODO: Should probably be a SmallVec<[Waker; 1]> or something? I guess
132
    // multiple pending wakes are not really all that common.
133
    // Wakers recorded while the wrapped stream was pending; they are woken
    // once a poll of the stream completes (see `poll_next_item`).
    pending_wakes: RefCell<Vec<Waker>>,
134
    // Marker for the otherwise unused type parameter `R`.
    res: std::marker::PhantomData<R>,
135
}
136

137
// Construction and shared-reference accessors for `AsyncCache`.
//
// NOTE(review): the accessors below dereference the `UnsafeCell` pointer
// without a `// SAFETY:` rationale. Soundness presumably relies on
// `ChunkyVec` never moving an element once it has been pushed — confirm
// against the `chunky_vec` crate documentation.
impl<S, R> AsyncCache<S, R>
138
where
139
    S: Stream,
140
{
141
    /// Creates a cache around `stream` with a default-constructed item
    /// store and no pending wakers.
    pub fn new(stream: S) -> Self {
1✔
142
        Self {
1✔
143
            stream: PinCell::new(stream),
1✔
144
            items: Default::default(),
1✔
145
            pending_wakes: Default::default(),
1✔
146
            res: std::marker::PhantomData,
1✔
147
        }
1✔
148
    }
1✔
149

150
    /// Returns the number of items currently stored in the cache.
    pub fn len(&self) -> usize {
2✔
151
        unsafe {
2✔
152
            let items = self.items.get();
2✔
153
            (*items).len()
2✔
154
        }
2✔
155
    }
2✔
156

157
    /// Looks up the cached item at `index` and wraps the result in
    /// `Poll::Ready` (the `.into()` conversion never yields `Poll::Pending`).
    pub fn get(&self, index: usize) -> Poll<Option<&S::Item>> {
×
158
        unsafe {
×
159
            let items = self.items.get();
×
160
            (*items).get(index).into()
×
161
        }
×
162
    }
×
163

164
    /// Push, immediately getting a reference to the element
    ///
    /// Takes `&self`: the item store is modified through the `UnsafeCell`
    /// pointer rather than through a `&mut` borrow of the cache.
165
    pub fn push_get(&self, new_value: S::Item) -> &S::Item {
1✔
166
        unsafe {
1✔
167
            let items = self.items.get();
1✔
168
            (*items).push_get(new_value)
1✔
169
        }
1✔
170
    }
1✔
171

172
    /// Creates a stream over this cache starting at index 0.
    pub fn stream(&self) -> AsyncCacheStream<'_, S, R> {
1✔
173
        AsyncCacheStream {
1✔
174
            cache: self,
1✔
175
            curr: 0,
1✔
176
        }
1✔
177
    }
1✔
178
}
179

180
// Extra API available when the wrapped stream is also a `BundleStream`.
impl<S, R> AsyncCache<S, R>
181
where
182
    S: BundleStream + Stream,
183
{
184
    /// Forwards to `prefetch_async` on the wrapped stream and awaits it.
    ///
    /// NOTE(review): the `borrow_mut()` guard appears to stay alive for the
    /// whole statement, including the `.await`; another borrow of the stream
    /// while this future is suspended would presumably panic — confirm.
    pub async fn prefetch(&self) {
×
185
        // NOTE(review): no `// SAFETY:` rationale is given; this presumably
        // relies on `self.stream` never being moved while `self` is borrowed
        // — confirm.
        let pin = unsafe { Pin::new_unchecked(&self.stream) };
×
186
        // `get_unchecked_mut` unwraps the pinned reference to a plain `&mut S`
        // so that `prefetch_async` can be called on it.
        unsafe { PinMut::as_mut(&mut pin.borrow_mut()).get_unchecked_mut() }
×
187
            .prefetch_async()
×
188
            .await;
×
189
    }
×
190
}
191

192
// Private polling helper shared by all `AsyncCacheStream`s of this cache.
impl<S, R> AsyncCache<S, R>
193
where
194
    S: Stream,
195
{
196
    // Helper function that gets the next value from wrapped stream.
    //
    // When the poll is ready, every waker recorded in `pending_wakes` is
    // removed and woken. When it is pending, the current task's waker is
    // appended to `pending_wakes` and `Poll::Pending` is returned.
197
    fn poll_next_item(&self, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2✔
198
        // NOTE(review): no `// SAFETY:` rationale is given; this presumably
        // relies on `self.stream` never being moved while `self` is borrowed
        // — confirm.
        let pin = unsafe { Pin::new_unchecked(&self.stream) };
2✔
199
        let poll = PinMut::as_mut(&mut pin.borrow_mut()).poll_next(cx);
2✔
200
        if poll.is_ready() {
2✔
201
            // Take the whole list first so the `RefCell` borrow is released
            // before any waker runs.
            let wakers = std::mem::take(&mut *self.pending_wakes.borrow_mut());
2✔
202
            for waker in wakers {
2✔
203
                waker.wake();
×
204
            }
×
205
        } else {
×
206
            // A clone is pushed on every pending poll; nothing here removes
            // duplicates for the same task.
            self.pending_wakes.borrow_mut().push(cx.waker().clone());
×
207
        }
×
208
        poll
2✔
209
    }
2✔
210
}
211

212
/// Stream over a shared `AsyncCache`, yielding references to its items.
///
/// Each `AsyncCacheStream` keeps its own position within the cache.
pub struct AsyncCacheStream<'a, S, R>
213
where
214
    S: Stream,
215
{
216
    // The cache being streamed.
    cache: &'a AsyncCache<S, R>,
217
    // Index of the next item this stream will yield.
    curr: usize,
218
}
219

220
// Yields cached items first and polls the wrapped stream only when this
// stream has caught up with the end of the cache.
impl<'a, S, R> Stream for AsyncCacheStream<'a, S, R>
221
where
222
    S: Stream,
223
{
224
    type Item = &'a S::Item;
225

226
    /// Returns the item at the current position, polling the wrapped stream
    /// and caching the result if that item has not been produced yet.
    fn poll_next(
2✔
227
        mut self: std::pin::Pin<&mut Self>,
2✔
228
        cx: &mut std::task::Context<'_>,
2✔
229
    ) -> Poll<Option<Self::Item>> {
2✔
230
        let cache_len = self.cache.len();
2✔
231
        // Compare our position with the cache length to pick one of three cases.
        match self.curr.cmp(&cache_len) {
2✔
232
            Ordering::Less => {
233
                // Cached value
234
                self.curr += 1;
×
235
                self.cache.get(self.curr - 1)
×
236
            }
237
            Ordering::Equal => {
238
                // Get the next item from the stream
                // `ready!` returns `Poll::Pending` from this function early,
                // leaving `curr` unchanged, when the wrapped stream is not ready.
239
                let item = ready!(self.cache.poll_next_item(cx));
2✔
240
                // The position advances even when the stream ended with `None`.
                self.curr += 1;
2✔
241
                if let Some(item) = item {
2✔
242
                    Some(self.cache.push_get(item)).into()
1✔
243
                } else {
244
                    None.into()
1✔
245
                }
246
            }
247
            Ordering::Greater => {
248
                // Ran off the end of the cache
249
                None.into()
×
250
            }
251
        }
252
    }
2✔
253
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc