• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wboayue / rust-ibapi / 16243597896

13 Jul 2025 12:39AM UTC coverage: 75.566% (+0.2%) from 75.325%
16243597896

push

github

web-flow
Unify DataStream trait for sync and async implementations (#267)

177 of 248 new or added lines in 23 files covered. (71.37%)

11 existing lines in 4 files now uncovered.

4741 of 6274 relevant lines covered (75.57%)

32.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.0
/src/subscriptions/sync.rs
1
//! Synchronous subscription implementation
2

3
use std::marker::PhantomData;
4
use std::sync::atomic::{AtomicBool, Ordering};
5
use std::sync::Mutex;
6
use std::time::Duration;
7

8
use log::{debug, error, warn};
9

10
use super::common::{process_decode_result, should_retry_error, should_store_error, ProcessingResult};
11
use super::{ResponseContext, StreamDecoder};
12
use crate::client::Client;
13
use crate::errors::Error;
14
use crate::messages::{OutgoingMessages, ResponseMessage};
15
use crate::transport::InternalSubscription;
16

17
/// A [Subscription] is a stream of responses returned from TWS. A [Subscription] is normally returned when invoking an API that can return more than one value.
18
///
19
/// You can convert subscriptions into blocking or non-blocking iterators using the [iter](Subscription::iter), [try_iter](Subscription::try_iter) or [timeout_iter](Subscription::timeout_iter) methods.
20
///
21
/// Alternatively, you may poll subscriptions in a blocking or non-blocking manner using the [next](Subscription::next), [try_next](Subscription::try_next) or [next_timeout](Subscription::next_timeout) methods.
22
#[allow(private_bounds)]
23
#[derive(Debug)]
24
pub struct Subscription<'a, T: StreamDecoder<T> + 'static> {
25
    client: &'a Client,
26
    request_id: Option<i32>,
27
    order_id: Option<i32>,
28
    message_type: Option<OutgoingMessages>,
29
    phantom: PhantomData<T>,
30
    cancelled: AtomicBool,
31
    snapshot_ended: AtomicBool,
32
    subscription: InternalSubscription,
33
    response_context: Option<ResponseContext>,
34
    error: Mutex<Option<Error>>,
35
}
36

37
#[allow(private_bounds)]
38
impl<'a, T: StreamDecoder<T> + 'static> Subscription<'a, T> {
39
    pub(crate) fn new(client: &'a Client, subscription: InternalSubscription, context: Option<ResponseContext>) -> Self {
42✔
40
        let request_id = subscription.request_id;
84✔
41
        let order_id = subscription.order_id;
84✔
42
        let message_type = subscription.message_type;
84✔
43

44
        Subscription {
45
            client,
46
            request_id,
47
            order_id,
48
            message_type,
49
            subscription,
50
            response_context: context,
51
            phantom: PhantomData,
52
            cancelled: AtomicBool::new(false),
84✔
53
            snapshot_ended: AtomicBool::new(false),
84✔
54
            error: Mutex::new(None),
42✔
55
        }
56
    }
57

58
    /// Cancel the subscription
59
    pub fn cancel(&self) {
45✔
60
        // Only cancel if snapshot hasn't ended (for market data snapshots)
61
        // For streaming subscriptions, snapshot_ended will remain false
62
        if self.snapshot_ended.load(Ordering::Relaxed) {
135✔
63
            return;
×
64
        }
65

66
        if self.cancelled.load(Ordering::Relaxed) {
×
67
            return;
3✔
68
        }
69

70
        self.cancelled.store(true, Ordering::Relaxed);
×
71

72
        if let Some(request_id) = self.request_id {
33✔
73
            if let Ok(message) = T::cancel_message(self.client.server_version(), self.request_id, self.response_context.as_ref()) {
24✔
74
                if let Err(e) = self.client.message_bus.cancel_subscription(request_id, &message) {
×
75
                    warn!("error cancelling subscription: {e}")
×
76
                }
77
                self.subscription.cancel();
×
78
            }
79
        } else if let Some(order_id) = self.order_id {
9✔
NEW
80
            if let Ok(message) = T::cancel_message(self.client.server_version(), self.request_id, self.response_context.as_ref()) {
×
81
                if let Err(e) = self.client.message_bus.cancel_order_subscription(order_id, &message) {
×
82
                    warn!("error cancelling order subscription: {e}")
×
83
                }
84
                self.subscription.cancel();
×
85
            }
86
        } else if let Some(message_type) = self.message_type {
16✔
87
            if let Ok(message) = T::cancel_message(self.client.server_version(), self.request_id, self.response_context.as_ref()) {
3✔
88
                if let Err(e) = self.client.message_bus.cancel_shared_subscription(message_type, &message) {
×
89
                    warn!("error cancelling shared subscription: {e}")
×
90
                }
91
                self.subscription.cancel();
×
92
            }
93
        } else {
94
            debug!("Could not determine cancel method")
2✔
95
        }
96
    }
97

98
    /// Returns the next available value, blocking if necessary until a value becomes available.
99
    ///
100
    /// # Examples
101
    ///
102
    /// ```no_run
103
    /// use ibapi::Client;
104
    /// use ibapi::contracts::Contract;
105
    ///
106
    /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed");
107
    ///
108
    /// let contract = Contract::stock("AAPL");
109
    /// let subscription = client.market_data(&contract, &["233"], false, false).expect("market data request failed");
110
    ///
111
    /// // Process data blocking until the next value is available
112
    /// while let Some(data) = subscription.next() {
113
    ///     println!("Received data: {data:?}");
114
    /// }
115
    ///
116
    /// // When the loop exits, check if it was due to an error
117
    /// if let Some(err) = subscription.error() {
118
    ///     eprintln!("subscription error: {err}");
119
    /// }
120
    /// ```
121
    /// # Returns
122
    /// * `Some(T)` - The next available item from the subscription
123
    /// * `None` - If the subscription has ended or encountered an error
124
    pub fn next(&self) -> Option<T> {
57✔
125
        match self.process_response(self.subscription.next()) {
171✔
126
            Some(val) => Some(val),
106✔
127
            None => match self.error() {
4✔
128
                Some(ref err) if should_retry_error(err) => {
×
129
                    debug!("retrying after error: {err:?}");
×
130
                    self.next()
×
131
                }
132
                _ => None,
4✔
133
            },
134
        }
135
    }
136

137
    /// Returns the current error state of the subscription.
138
    ///
139
    /// This method allows checking if an error occurred during subscription processing.
140
    /// Errors are stored internally when they occur during `next()`, `try_next()`, or `next_timeout()` calls.
141
    ///
142
    /// # Returns
143
    /// * `Some(Error)` - If an error has occurred
144
    /// * `None` - If no error has occurred
145
    pub fn error(&self) -> Option<Error> {
        // Callers receive an owned copy; the lock is held only for the clone.
        self.error.lock().unwrap().clone()
    }
149

150
    fn clear_error(&self) {
57✔
151
        let mut error = self.error.lock().unwrap();
228✔
152
        *error = None;
114✔
153
    }
154

155
    fn process_response(&self, response: Option<Result<ResponseMessage, Error>>) -> Option<T> {
57✔
156
        self.clear_error();
114✔
157

158
        match response {
54✔
159
            Some(Ok(message)) => self.process_message(message),
54✔
160
            Some(Err(e)) => {
×
161
                if should_store_error(&e) {
×
162
                    let mut error = self.error.lock().unwrap();
×
163
                    *error = Some(e);
×
164
                }
165
                None
×
166
            }
167
            None => None,
3✔
168
        }
169
    }
170

171
    fn process_message(&self, mut message: ResponseMessage) -> Option<T> {
54✔
172
        match process_decode_result(T::decode(self.client.server_version, &mut message)) {
162✔
173
            ProcessingResult::Success(val) => {
53✔
174
                // Check if this decoded value represents the end of a snapshot subscription
175
                if val.is_snapshot_end() {
×
176
                    self.snapshot_ended.store(true, Ordering::Relaxed);
×
177
                }
178
                Some(val)
×
179
            }
180
            ProcessingResult::EndOfStream => None,
1✔
181
            ProcessingResult::Retry => {
×
182
                // This case shouldn't happen here since UnexpectedResponse is handled at the next() level
183
                // but we handle it for completeness
184
                None
×
185
            }
186
            ProcessingResult::Error(err) => {
×
187
                error!("error decoding message: {err}");
×
188
                let mut error = self.error.lock().unwrap();
×
189
                *error = Some(err);
×
190
                None
×
191
            }
192
        }
193
    }
194

195
    /// Tries to return the next available value without blocking.
196
    ///
197
    /// Returns immediately with:
198
    /// - `Some(value)` if a value is available
199
    /// - `None` if no data is currently available
200
    ///
201
    /// Use this method when you want to poll for data without blocking.
202
    /// Check `error()` to determine if `None` was returned due to an error.
203
    ///
204
    /// # Examples
205
    ///
206
    /// ```no_run
207
    /// use ibapi::Client;
208
    /// use ibapi::contracts::Contract;
209
    /// use std::thread;
210
    /// use std::time::Duration;
211
    ///
212
    /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed");
213
    ///
214
    /// let contract = Contract::stock("AAPL");
215
    /// let subscription = client.market_data(&contract, &["233"], false, false).expect("market data request failed");
216
    ///
217
    /// // Poll for data without blocking
218
    /// loop {
219
    ///     if let Some(data) = subscription.try_next() {
220
    ///         println!("{data:?}");
221
    ///     } else if let Some(err) = subscription.error() {
222
    ///         eprintln!("Error: {err}");
223
    ///         break;
224
    ///     } else {
225
    ///         // No data available, do other work or sleep
226
    ///         thread::sleep(Duration::from_millis(100));
227
    ///     }
228
    /// }
229
    /// ```
230
    pub fn try_next(&self) -> Option<T> {
        // Poll the internal subscription without blocking, then decode what came back.
        let pending = self.subscription.try_next();
        self.process_response(pending)
    }
233

234
    /// Waits for the next available value up to the specified timeout duration.
235
    ///
236
    /// Returns:
237
    /// - `Some(value)` if a value becomes available within the timeout
238
    /// - `None` if the timeout expires before data becomes available
239
    ///
240
    /// Check `error()` to determine if `None` was returned due to an error.
241
    ///
242
    /// # Examples
243
    ///
244
    /// ```no_run
245
    /// use ibapi::Client;
246
    /// use ibapi::contracts::Contract;
247
    /// use std::time::Duration;
248
    ///
249
    /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed");
250
    ///
251
    /// let contract = Contract::stock("AAPL");
252
    /// let subscription = client.market_data(&contract, &["233"], false, false).expect("market data request failed");
253
    ///
254
    /// // Wait up to 5 seconds for data
255
    /// if let Some(data) = subscription.next_timeout(Duration::from_secs(5)) {
256
    ///     println!("{data:?}");
257
    /// } else if let Some(err) = subscription.error() {
258
    ///     eprintln!("Error: {err}");
259
    /// } else {
260
    ///     eprintln!("Timeout: no data received within 5 seconds");
261
    /// }
262
    /// ```
263
    pub fn next_timeout(&self, timeout: Duration) -> Option<T> {
        // Wait on the internal subscription for at most `timeout`, then decode the outcome.
        let pending = self.subscription.next_timeout(timeout);
        self.process_response(pending)
    }
266

267
    /// Creates a blocking iterator over the subscription data.
268
    ///
269
    /// The iterator will block waiting for the next value if none is immediately available.
270
    /// The iterator ends when the subscription is cancelled or an unrecoverable error occurs.
271
    ///
272
    /// # Examples
273
    ///
274
    /// ```no_run
275
    /// use ibapi::Client;
276
    ///
277
    /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed");
278
    ///
279
    /// let subscription = client.positions().expect("positions request failed");
280
    ///
281
    /// // Process all positions as they arrive
282
    /// for position in subscription.iter() {
283
    ///     println!("{position:?}");
284
    /// }
285
    ///
286
    /// // Check if iteration ended due to an error
287
    /// if let Some(err) = subscription.error() {
288
    ///     eprintln!("Subscription error: {err}");
289
    /// }
290
    /// ```
291
    pub fn iter(&self) -> SubscriptionIter<'_, T> {
10✔
292
        SubscriptionIter { subscription: self }
293
    }
294

295
    /// Creates a non-blocking iterator over the subscription data.
296
    ///
297
    /// The iterator will return immediately with `None` if no data is available.
298
    /// Use this when you want to process available data without blocking.
299
    ///
300
    /// # Examples
301
    ///
302
    /// ```no_run
303
    /// use ibapi::Client;
304
    /// use std::thread;
305
    /// use std::time::Duration;
306
    ///
307
    /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed");
308
    ///
309
    /// let subscription = client.positions().expect("positions request failed");
310
    ///
311
    /// // Process available positions without blocking
312
    /// loop {
313
    ///     let mut data_received = false;
314
    ///     for position in subscription.try_iter() {
315
    ///         data_received = true;
316
    ///         println!("{position:?}");
317
    ///     }
318
    ///     
319
    ///     if let Some(err) = subscription.error() {
320
    ///         eprintln!("Error: {err}");
321
    ///         break;
322
    ///     }
323
    ///     
324
    ///     if !data_received {
325
    ///         // No data available, do other work or sleep
326
    ///         thread::sleep(Duration::from_millis(100));
327
    ///     }
328
    /// }
329
    /// ```
330
    pub fn try_iter(&self) -> SubscriptionTryIter<'_, T> {
×
331
        SubscriptionTryIter { subscription: self }
332
    }
333

334
    /// Creates an iterator that waits up to the specified timeout for each value.
335
    ///
336
    /// The iterator will wait up to `timeout` duration for each value.
337
    /// If the timeout expires, the iterator ends.
338
    ///
339
    /// # Examples
340
    ///
341
    /// ```no_run
342
    /// use ibapi::Client;
343
    /// use std::time::Duration;
344
    ///
345
    /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed");
346
    ///
347
    /// let subscription = client.positions().expect("positions request failed");
348
    ///
349
    /// // Process positions with a 5 second timeout per item
350
    /// for position in subscription.timeout_iter(Duration::from_secs(5)) {
351
    ///     println!("{position:?}");
352
    /// }
353
    ///
354
    /// if let Some(err) = subscription.error() {
355
    ///     eprintln!("Error: {err}");
356
    /// } else {
357
    ///     println!("No more positions received within timeout");
358
    /// }
359
    /// ```
360
    pub fn timeout_iter(&self, timeout: Duration) -> SubscriptionTimeoutIter<'_, T> {
×
361
        SubscriptionTimeoutIter { subscription: self, timeout }
362
    }
363
}
364

365
impl<T: StreamDecoder<T> + 'static> Drop for Subscription<'_, T> {
366
    /// Cancel subscription on drop
367
    fn drop(&mut self) {
42✔
368
        debug!("dropping subscription");
42✔
369
        self.cancel();
84✔
370
    }
371
}
372

373
/// An iterator that yields items as they become available, blocking if necessary.
374
#[allow(private_bounds)]
375
#[derive(Debug)]
376
pub struct SubscriptionIter<'a, T: StreamDecoder<T> + 'static> {
377
    subscription: &'a Subscription<'a, T>,
378
}
379

380
impl<T: StreamDecoder<T> + 'static> Iterator for SubscriptionIter<'_, T> {
381
    type Item = T;
382

383
    fn next(&mut self) -> Option<Self::Item> {
20✔
384
        self.subscription.next()
40✔
385
    }
386
}
387

388
impl<'a, T: StreamDecoder<T> + 'static> IntoIterator for &'a Subscription<'a, T> {
389
    type Item = T;
390
    type IntoIter = SubscriptionIter<'a, T>;
391

392
    fn into_iter(self) -> Self::IntoIter {
1✔
393
        self.iter()
2✔
394
    }
395
}
396

397
/// An iterator that takes ownership and yields items as they become available, blocking if necessary.
398
#[allow(private_bounds)]
399
#[derive(Debug)]
400
pub struct SubscriptionOwnedIter<'a, T: StreamDecoder<T> + 'static> {
401
    subscription: Subscription<'a, T>,
402
}
403

404
impl<T: StreamDecoder<T> + 'static> Iterator for SubscriptionOwnedIter<'_, T> {
405
    type Item = T;
406

407
    fn next(&mut self) -> Option<Self::Item> {
×
408
        self.subscription.next()
×
409
    }
410
}
411

412
impl<'a, T: StreamDecoder<T> + 'static> IntoIterator for Subscription<'a, T> {
413
    type Item = T;
414
    type IntoIter = SubscriptionOwnedIter<'a, T>;
415

416
    fn into_iter(self) -> Self::IntoIter {
×
417
        SubscriptionOwnedIter { subscription: self }
418
    }
419
}
420

421
/// An iterator that yields items as they become available without blocking.
422
#[allow(private_bounds)]
423
#[derive(Debug)]
424
pub struct SubscriptionTryIter<'a, T: StreamDecoder<T> + 'static> {
425
    subscription: &'a Subscription<'a, T>,
426
}
427

428
impl<T: StreamDecoder<T> + 'static> Iterator for SubscriptionTryIter<'_, T> {
429
    type Item = T;
430

431
    fn next(&mut self) -> Option<Self::Item> {
×
432
        self.subscription.try_next()
×
433
    }
434
}
435

436
/// An iterator that yields items with a timeout.
437
#[allow(private_bounds)]
438
#[derive(Debug)]
439
pub struct SubscriptionTimeoutIter<'a, T: StreamDecoder<T> + 'static> {
440
    subscription: &'a Subscription<'a, T>,
441
    timeout: Duration,
442
}
443

444
impl<T: StreamDecoder<T> + 'static> Iterator for SubscriptionTimeoutIter<'_, T> {
445
    type Item = T;
446

447
    fn next(&mut self) -> Option<Self::Item> {
×
448
        self.subscription.next_timeout(self.timeout)
×
449
    }
450
}
451

452
/// Marker trait for subscriptions that share a channel based on message type.
///
/// The trait declares no methods; implementing it only tags a type.
pub trait SharesChannel {}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc