• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wboayue / rust-ibapi / 17258774832

27 Aug 2025 06:08AM UTC coverage: 77.508% (+0.4%) from 77.105%
17258774832

push

github

web-flow
Refactor Subscription to reduce coupling with Client (#298)

74 of 91 new or added lines in 12 files covered. (81.32%)

5 existing lines in 3 files now uncovered.

5369 of 6927 relevant lines covered (77.51%)

63.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.23
/src/subscriptions/sync.rs
1
//! Synchronous subscription implementation
2

3
use std::marker::PhantomData;
4
use std::sync::atomic::{AtomicBool, Ordering};
5
use std::sync::{Arc, Mutex};
6
use std::time::Duration;
7

8
use log::{debug, error, warn};
9

10
use super::common::{process_decode_result, should_retry_error, should_store_error, ProcessingResult};
11
use super::{ResponseContext, StreamDecoder};
12
use crate::errors::Error;
13
use crate::messages::{OutgoingMessages, ResponseMessage};
14
use crate::transport::{InternalSubscription, MessageBus};
15

16
/// A [Subscription] is a stream of responses returned from TWS. A [Subscription] is normally returned when invoking an API that can return more than one value.
17
///
18
/// You can convert subscriptions into blocking or non-blocking iterators using the [iter](Subscription::iter), [try_iter](Subscription::try_iter) or [timeout_iter](Subscription::timeout_iter) methods.
19
///
20
/// Alternatively, you may poll subscriptions in a blocking or non-blocking manner using the [next](Subscription::next), [try_next](Subscription::try_next) or [next_timeout](Subscription::next_timeout) methods.
21
#[allow(private_bounds)]
22
pub struct Subscription<T: StreamDecoder<T>> {
23
    server_version: i32,
24
    message_bus: Arc<dyn MessageBus>,
25
    request_id: Option<i32>,
26
    order_id: Option<i32>,
27
    message_type: Option<OutgoingMessages>,
28
    phantom: PhantomData<T>,
29
    cancelled: AtomicBool,
30
    snapshot_ended: AtomicBool,
31
    subscription: InternalSubscription,
32
    response_context: Option<ResponseContext>,
33
    error: Mutex<Option<Error>>,
34
}
35

36
#[allow(private_bounds)]
37
impl<T: StreamDecoder<T>> Subscription<T> {
38
    pub(crate) fn new(
94✔
39
        server_version: i32,
40
        message_bus: Arc<dyn MessageBus>,
41
        subscription: InternalSubscription,
42
        context: Option<ResponseContext>,
43
    ) -> Self {
44
        let request_id = subscription.request_id;
188✔
45
        let order_id = subscription.order_id;
188✔
46
        let message_type = subscription.message_type;
188✔
47

48
        Subscription {
49
            server_version,
50
            message_bus,
51
            request_id,
52
            order_id,
53
            message_type,
54
            subscription,
55
            response_context: context,
56
            phantom: PhantomData,
57
            cancelled: AtomicBool::new(false),
188✔
58
            snapshot_ended: AtomicBool::new(false),
188✔
59
            error: Mutex::new(None),
94✔
60
        }
61
    }
62

63
    /// Cancel the subscription
64
    pub fn cancel(&self) {
97✔
65
        // Only cancel if snapshot hasn't ended (for market data snapshots)
66
        // For streaming subscriptions, snapshot_ended will remain false
67
        if self.snapshot_ended.load(Ordering::Relaxed) {
291✔
68
            return;
×
69
        }
70

71
        if self.cancelled.load(Ordering::Relaxed) {
×
72
            return;
3✔
73
        }
74

75
        self.cancelled.store(true, Ordering::Relaxed);
×
76

77
        if let Some(request_id) = self.request_id {
73✔
78
            if let Ok(message) = T::cancel_message(self.server_version, self.request_id, self.response_context.as_ref()) {
62✔
NEW
79
                if let Err(e) = self.message_bus.cancel_subscription(request_id, &message) {
×
UNCOV
80
                    warn!("error cancelling subscription: {e}")
×
81
                }
82
                self.subscription.cancel();
×
83
            }
84
        } else if let Some(order_id) = self.order_id {
24✔
NEW
85
            if let Ok(message) = T::cancel_message(self.server_version, self.request_id, self.response_context.as_ref()) {
×
NEW
86
                if let Err(e) = self.message_bus.cancel_order_subscription(order_id, &message) {
×
UNCOV
87
                    warn!("error cancelling order subscription: {e}")
×
88
                }
89
                self.subscription.cancel();
×
90
            }
91
        } else if let Some(message_type) = self.message_type {
33✔
92
            if let Ok(message) = T::cancel_message(self.server_version, self.request_id, self.response_context.as_ref()) {
7✔
NEW
93
                if let Err(e) = self.message_bus.cancel_shared_subscription(message_type, &message) {
×
UNCOV
94
                    warn!("error cancelling shared subscription: {e}")
×
95
                }
96
                self.subscription.cancel();
×
97
            }
98
        } else {
99
            debug!("Could not determine cancel method")
3✔
100
        }
101
    }
102

103
    /// Returns the next available value, blocking if necessary until a value becomes available.
104
    ///
105
    /// # Examples
106
    ///
107
    /// ```no_run
108
    /// use ibapi::Client;
109
    /// use ibapi::contracts::Contract;
110
    ///
111
    /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed");
112
    ///
113
    /// let contract = Contract::stock("AAPL");
114
    /// let subscription = client.market_data(&contract, &["233"], false, false).expect("market data request failed");
115
    ///
116
    /// // Process data blocking until the next value is available
117
    /// while let Some(data) = subscription.next() {
118
    ///     println!("Received data: {data:?}");
119
    /// }
120
    ///
121
    /// // When the loop exits, check if it was due to an error
122
    /// if let Some(err) = subscription.error() {
123
    ///     eprintln!("subscription error: {err}");
124
    /// }
125
    /// ```
126
    /// # Returns
127
    /// * `Some(T)` - The next available item from the subscription
128
    /// * `None` - If the subscription has ended or encountered an error
129
    pub fn next(&self) -> Option<T> {
156✔
130
        match self.process_response(self.subscription.next()) {
468✔
131
            Some(val) => Some(val),
284✔
132
            None => match self.error() {
14✔
133
                Some(ref err) if should_retry_error(err) => {
12✔
134
                    debug!("retrying after error: {err:?}");
×
135
                    self.next()
×
136
                }
137
                _ => None,
14✔
138
            },
139
        }
140
    }
141

142
    /// Returns the current error state of the subscription.
143
    ///
144
    /// This method allows checking if an error occurred during subscription processing.
145
    /// Errors are stored internally when they occur during `next()`, `try_next()`, or `next_timeout()` calls.
146
    ///
147
    /// # Returns
148
    /// * `Some(Error)` - If an error has occurred
149
    /// * `None` - If no error has occurred
150
    pub fn error(&self) -> Option<Error> {
15✔
151
        let mut error = self.error.lock().unwrap();
60✔
152
        error.take()
15✔
153
    }
154

155
    fn clear_error(&self) {
162✔
156
        let mut error = self.error.lock().unwrap();
648✔
157
        *error = None;
324✔
158
    }
159

160
    fn process_response(&self, response: Option<Result<ResponseMessage, Error>>) -> Option<T> {
162✔
161
        self.clear_error();
324✔
162

163
        match response {
159✔
164
            Some(Ok(message)) => self.process_message(message),
155✔
165
            Some(Err(e)) => {
4✔
166
                if should_store_error(&e) {
12✔
167
                    let mut error = self.error.lock().unwrap();
20✔
168
                    *error = Some(e);
4✔
169
                }
170
                None
4✔
171
            }
172
            None => None,
3✔
173
        }
174
    }
175

176
    fn process_message(&self, mut message: ResponseMessage) -> Option<T> {
155✔
177
        match process_decode_result(T::decode(self.server_version, &mut message)) {
465✔
178
            ProcessingResult::Success(val) => {
148✔
179
                // Check if this decoded value represents the end of a snapshot subscription
180
                if val.is_snapshot_end() {
×
181
                    self.snapshot_ended.store(true, Ordering::Relaxed);
×
182
                }
183
                Some(val)
×
184
            }
185
            ProcessingResult::EndOfStream => None,
7✔
186
            ProcessingResult::Retry => {
×
187
                // This case shouldn't happen here since UnexpectedResponse is handled at the next() level
188
                // but we handle it for completeness
189
                None
×
190
            }
191
            ProcessingResult::Error(err) => {
×
192
                error!("error decoding message: {err}");
×
193
                let mut error = self.error.lock().unwrap();
×
194
                *error = Some(err);
×
195
                None
×
196
            }
197
        }
198
    }
199

200
    /// Tries to return the next available value without blocking.
201
    ///
202
    /// Returns immediately with:
203
    /// - `Some(value)` if a value is available
204
    /// - `None` if no data is currently available
205
    ///
206
    /// Use this method when you want to poll for data without blocking.
207
    /// Check `error()` to determine if `None` was returned due to an error.
208
    ///
209
    /// # Examples
210
    ///
211
    /// ```no_run
212
    /// use ibapi::Client;
213
    /// use ibapi::contracts::Contract;
214
    /// use std::thread;
215
    /// use std::time::Duration;
216
    ///
217
    /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed");
218
    ///
219
    /// let contract = Contract::stock("AAPL");
220
    /// let subscription = client.market_data(&contract, &["233"], false, false).expect("market data request failed");
221
    ///
222
    /// // Poll for data without blocking
223
    /// loop {
224
    ///     if let Some(data) = subscription.try_next() {
225
    ///         println!("{data:?}");
226
    ///     } else if let Some(err) = subscription.error() {
227
    ///         eprintln!("Error: {err}");
228
    ///         break;
229
    ///     } else {
230
    ///         // No data available, do other work or sleep
231
    ///         thread::sleep(Duration::from_millis(100));
232
    ///     }
233
    /// }
234
    /// ```
235
    pub fn try_next(&self) -> Option<T> {
×
236
        self.process_response(self.subscription.try_next())
×
237
    }
238

239
    /// Waits for the next available value up to the specified timeout duration.
240
    ///
241
    /// Returns:
242
    /// - `Some(value)` if a value becomes available within the timeout
243
    /// - `None` if the timeout expires before data becomes available
244
    ///
245
    /// Check `error()` to determine if `None` was returned due to an error.
246
    ///
247
    /// # Examples
248
    ///
249
    /// ```no_run
250
    /// use ibapi::Client;
251
    /// use ibapi::contracts::Contract;
252
    /// use std::time::Duration;
253
    ///
254
    /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed");
255
    ///
256
    /// let contract = Contract::stock("AAPL");
257
    /// let subscription = client.market_data(&contract, &["233"], false, false).expect("market data request failed");
258
    ///
259
    /// // Wait up to 5 seconds for data
260
    /// if let Some(data) = subscription.next_timeout(Duration::from_secs(5)) {
261
    ///     println!("{data:?}");
262
    /// } else if let Some(err) = subscription.error() {
263
    ///     eprintln!("Error: {err}");
264
    /// } else {
265
    ///     eprintln!("Timeout: no data received within 5 seconds");
266
    /// }
267
    /// ```
268
    pub fn next_timeout(&self, timeout: Duration) -> Option<T> {
6✔
269
        self.process_response(self.subscription.next_timeout(timeout))
30✔
270
    }
271

272
    /// Creates a blocking iterator over the subscription data.
273
    ///
274
    /// The iterator will block waiting for the next value if none is immediately available.
275
    /// The iterator ends when the subscription is cancelled or an unrecoverable error occurs.
276
    ///
277
    /// # Examples
278
    ///
279
    /// ```no_run
280
    /// use ibapi::Client;
281
    ///
282
    /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed");
283
    ///
284
    /// let subscription = client.positions().expect("positions request failed");
285
    ///
286
    /// // Process all positions as they arrive
287
    /// for position in subscription.iter() {
288
    ///     println!("{position:?}");
289
    /// }
290
    ///
291
    /// // Check if iteration ended due to an error
292
    /// if let Some(err) = subscription.error() {
293
    ///     eprintln!("Subscription error: {err}");
294
    /// }
295
    /// ```
296
    pub fn iter(&self) -> SubscriptionIter<'_, T> {
9✔
297
        SubscriptionIter { subscription: self }
298
    }
299

300
    /// Creates a non-blocking iterator over the subscription data.
301
    ///
302
    /// The iterator will return immediately with `None` if no data is available.
303
    /// Use this when you want to process available data without blocking.
304
    ///
305
    /// # Examples
306
    ///
307
    /// ```no_run
308
    /// use ibapi::Client;
309
    /// use std::thread;
310
    /// use std::time::Duration;
311
    ///
312
    /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed");
313
    ///
314
    /// let subscription = client.positions().expect("positions request failed");
315
    ///
316
    /// // Process available positions without blocking
317
    /// loop {
318
    ///     let mut data_received = false;
319
    ///     for position in subscription.try_iter() {
320
    ///         data_received = true;
321
    ///         println!("{position:?}");
322
    ///     }
323
    ///     
324
    ///     if let Some(err) = subscription.error() {
325
    ///         eprintln!("Error: {err}");
326
    ///         break;
327
    ///     }
328
    ///     
329
    ///     if !data_received {
330
    ///         // No data available, do other work or sleep
331
    ///         thread::sleep(Duration::from_millis(100));
332
    ///     }
333
    /// }
334
    /// ```
335
    pub fn try_iter(&self) -> SubscriptionTryIter<'_, T> {
×
336
        SubscriptionTryIter { subscription: self }
337
    }
338

339
    /// Creates an iterator that waits up to the specified timeout for each value.
340
    ///
341
    /// The iterator will wait up to `timeout` duration for each value.
342
    /// If the timeout expires, the iterator ends.
343
    ///
344
    /// # Examples
345
    ///
346
    /// ```no_run
347
    /// use ibapi::Client;
348
    /// use std::time::Duration;
349
    ///
350
    /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed");
351
    ///
352
    /// let subscription = client.positions().expect("positions request failed");
353
    ///
354
    /// // Process positions with a 5 second timeout per item
355
    /// for position in subscription.timeout_iter(Duration::from_secs(5)) {
356
    ///     println!("{position:?}");
357
    /// }
358
    ///
359
    /// if let Some(err) = subscription.error() {
360
    ///     eprintln!("Error: {err}");
361
    /// } else {
362
    ///     println!("No more positions received within timeout");
363
    /// }
364
    /// ```
365
    pub fn timeout_iter(&self, timeout: Duration) -> SubscriptionTimeoutIter<'_, T> {
×
366
        SubscriptionTimeoutIter { subscription: self, timeout }
367
    }
368
}
369

370
impl<T: StreamDecoder<T>> Drop for Subscription<T> {
371
    /// Cancel subscription on drop
372
    fn drop(&mut self) {
94✔
373
        debug!("dropping subscription");
94✔
374
        self.cancel();
188✔
375
    }
376
}
377

378
/// An iterator that yields items as they become available, blocking if necessary.
379
#[allow(private_bounds)]
380
pub struct SubscriptionIter<'a, T: StreamDecoder<T>> {
381
    subscription: &'a Subscription<T>,
382
}
383

384
impl<T: StreamDecoder<T>> Iterator for SubscriptionIter<'_, T> {
385
    type Item = T;
386

387
    fn next(&mut self) -> Option<Self::Item> {
18✔
388
        self.subscription.next()
36✔
389
    }
390
}
391

392
impl<'a, T: StreamDecoder<T>> IntoIterator for &'a Subscription<T> {
393
    type Item = T;
394
    type IntoIter = SubscriptionIter<'a, T>;
395

396
    fn into_iter(self) -> Self::IntoIter {
×
397
        self.iter()
×
398
    }
399
}
400

401
/// An iterator that takes ownership and yields items as they become available, blocking if necessary.
402
#[allow(private_bounds)]
403
pub struct SubscriptionOwnedIter<T: StreamDecoder<T>> {
404
    subscription: Subscription<T>,
405
}
406

407
impl<T: StreamDecoder<T>> Iterator for SubscriptionOwnedIter<T> {
408
    type Item = T;
409

410
    fn next(&mut self) -> Option<Self::Item> {
91✔
411
        self.subscription.next()
182✔
412
    }
413
}
414

415
impl<T: StreamDecoder<T>> IntoIterator for Subscription<T> {
416
    type Item = T;
417
    type IntoIter = SubscriptionOwnedIter<T>;
418

419
    fn into_iter(self) -> Self::IntoIter {
25✔
420
        SubscriptionOwnedIter { subscription: self }
421
    }
422
}
423

424
/// An iterator that yields items as they become available without blocking.
425
#[allow(private_bounds)]
426
pub struct SubscriptionTryIter<'a, T: StreamDecoder<T>> {
427
    subscription: &'a Subscription<T>,
428
}
429

430
impl<T: StreamDecoder<T>> Iterator for SubscriptionTryIter<'_, T> {
431
    type Item = T;
432

433
    fn next(&mut self) -> Option<Self::Item> {
×
434
        self.subscription.try_next()
×
435
    }
436
}
437

438
/// An iterator that yields items with a timeout.
439
#[allow(private_bounds)]
440
pub struct SubscriptionTimeoutIter<'a, T: StreamDecoder<T>> {
441
    subscription: &'a Subscription<T>,
442
    timeout: Duration,
443
}
444

445
impl<T: StreamDecoder<T>> Iterator for SubscriptionTimeoutIter<'_, T> {
446
    type Item = T;
447

448
    fn next(&mut self) -> Option<Self::Item> {
×
449
        self.subscription.next_timeout(self.timeout)
×
450
    }
451
}
452

453
/// Marker trait for subscriptions that share a channel based on message type
454
pub trait SharesChannel {}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc