• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wboayue / rust-ibapi / 21086366252

17 Jan 2026 01:43AM UTC coverage: 86.679% (+0.03%) from 86.651%
21086366252

push

github

web-flow
fix: prevent scanner infinite hang on error messages (#371)

* fix: prevent scanner infinite hang on error messages (#370)

- Add explicit handling for IncomingMessages::Error in scanner decode
  functions (async and sync) to return Error::Message instead of
  Error::UnexpectedResponse
- Add bounded retry logic (max 10 attempts) to subscription next()
  methods to prevent infinite loops from unexpected responses
- Log warnings on retry attempts and errors when max retries exceeded

Fixes #370

* refactor: extract shared scanner decode function

Move scanner message type matching logic to decode_scanner_message()
in the common decoders module to avoid duplication between async and
sync implementations.

* refactor: extract retry decision logic to common module

Move retry checking and logging to check_retry() function in the
common module. This consolidates the retry decision logic that was
duplicated between async and sync subscription implementations.

- Add RetryDecision enum to represent continue/stop decisions
- Add check_retry() function that handles counting and logging
- Update async and sync subscriptions to use the shared function
- Add test for check_retry()

* style: fix variable naming and format code

Remove underscore prefix from 'err' variable since it is actually used
in the should_retry_error() call.

29 of 42 new or added lines in 6 files covered. (69.05%)

4 existing lines in 1 file now uncovered.

8700 of 10037 relevant lines covered (86.68%)

119.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.83
/src/subscriptions/sync.rs
1
//! Synchronous subscription implementation
2

3
use std::marker::PhantomData;
4
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5
use std::sync::{Arc, Mutex};
6
use std::time::Duration;
7

8
use log::{debug, error, warn};
9

10
use super::common::{check_retry, process_decode_result, should_retry_error, should_store_error, ProcessingResult, RetryDecision};
11
use super::{ResponseContext, StreamDecoder};
12
use crate::errors::Error;
13
use crate::messages::{OutgoingMessages, ResponseMessage};
14
use crate::transport::{InternalSubscription, MessageBus};
15

16
/// A [Subscription] is a stream of responses returned from TWS. A [Subscription] is normally returned when invoking an API that can return more than one value.
17
///
18
/// You can convert subscriptions into blocking or non-blocking iterators using the [iter](Subscription::iter), [try_iter](Subscription::try_iter) or [timeout_iter](Subscription::timeout_iter) methods.
19
///
20
/// Alternatively, you may poll subscriptions in a blocking or non-blocking manner using the [next](Subscription::next), [try_next](Subscription::try_next) or [next_timeout](Subscription::next_timeout) methods.
21
#[allow(private_bounds)]
22
pub struct Subscription<T: StreamDecoder<T>> {
23
    server_version: i32,
24
    message_bus: Arc<dyn MessageBus>,
25
    request_id: Option<i32>,
26
    order_id: Option<i32>,
27
    message_type: Option<OutgoingMessages>,
28
    phantom: PhantomData<T>,
29
    cancelled: AtomicBool,
30
    snapshot_ended: AtomicBool,
31
    subscription: InternalSubscription,
32
    response_context: Option<ResponseContext>,
33
    error: Mutex<Option<Error>>,
34
    retry_count: AtomicUsize,
35
}
36

37
#[allow(private_bounds)]
38
impl<T: StreamDecoder<T>> Subscription<T> {
39
    pub(crate) fn new(
106✔
40
        server_version: i32,
41
        message_bus: Arc<dyn MessageBus>,
42
        subscription: InternalSubscription,
43
        context: Option<ResponseContext>,
44
    ) -> Self {
45
        let request_id = subscription.request_id;
212✔
46
        let order_id = subscription.order_id;
212✔
47
        let message_type = subscription.message_type;
212✔
48

49
        Subscription {
50
            server_version,
51
            message_bus,
52
            request_id,
53
            order_id,
54
            message_type,
55
            subscription,
56
            response_context: context,
57
            phantom: PhantomData,
58
            cancelled: AtomicBool::new(false),
212✔
59
            snapshot_ended: AtomicBool::new(false),
212✔
60
            error: Mutex::new(None),
212✔
61
            retry_count: AtomicUsize::new(0),
106✔
62
        }
63
    }
64

65
    /// Cancel the subscription
66
    pub fn cancel(&self) {
109✔
67
        // Only cancel if snapshot hasn't ended (for market data snapshots)
68
        // For streaming subscriptions, snapshot_ended will remain false
69
        if self.snapshot_ended.load(Ordering::Relaxed) {
327✔
70
            return;
1✔
71
        }
72

73
        if self.cancelled.load(Ordering::Relaxed) {
324✔
74
            return;
3✔
75
        }
76

77
        self.cancelled.store(true, Ordering::Relaxed);
315✔
78

79
        if let Some(request_id) = self.request_id {
188✔
80
            if let Ok(message) = T::cancel_message(self.server_version, self.request_id, self.response_context.as_ref()) {
401✔
81
                if let Err(e) = self.message_bus.cancel_subscription(request_id, &message) {
207✔
82
                    warn!("error cancelling subscription: {e}")
×
83
                }
84
                self.subscription.cancel();
138✔
85
            }
86
        } else if let Some(order_id) = self.order_id {
25✔
87
            if let Ok(message) = T::cancel_message(self.server_version, self.request_id, self.response_context.as_ref()) {
12✔
88
                if let Err(e) = self.message_bus.cancel_order_subscription(order_id, &message) {
×
89
                    warn!("error cancelling order subscription: {e}")
×
90
                }
91
                self.subscription.cancel();
×
92
            }
93
        } else if let Some(message_type) = self.message_type {
35✔
94
            if let Ok(message) = T::cancel_message(self.server_version, self.request_id, self.response_context.as_ref()) {
72✔
95
                if let Err(e) = self.message_bus.cancel_shared_subscription(message_type, &message) {
24✔
96
                    warn!("error cancelling shared subscription: {e}")
×
97
                }
98
                self.subscription.cancel();
16✔
99
            }
100
        } else {
101
            debug!("Could not determine cancel method")
3✔
102
        }
103
    }
104

105
    /// Returns the next available value, blocking if necessary until a value becomes available.
106
    ///
107
    /// # Examples
108
    ///
109
    /// ```no_run
110
    /// use ibapi::client::blocking::Client;
111
    /// use ibapi::contracts::Contract;
112
    ///
113
    /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed");
114
    ///
115
    /// let contract = Contract::stock("AAPL").build();
116
    /// let subscription = client.market_data(&contract)
117
    ///     .generic_ticks(&["233"])
118
    ///     .subscribe()
119
    ///     .expect("market data request failed");
120
    ///
121
    /// // Process data blocking until the next value is available
122
    /// while let Some(data) = subscription.next() {
123
    ///     println!("Received data: {data:?}");
124
    /// }
125
    ///
126
    /// // When the loop exits, check if it was due to an error
127
    /// if let Some(err) = subscription.error() {
128
    ///     eprintln!("subscription error: {err}");
129
    /// }
130
    /// ```
131
    /// # Returns
132
    /// * `Some(T)` - The next available item from the subscription
133
    /// * `None` - If the subscription has ended or encountered an error
134
    pub fn next(&self) -> Option<T> {
167✔
135
        match self.process_response(self.subscription.next()) {
501✔
136
            Some(val) => {
154✔
137
                self.retry_count.store(0, Ordering::Relaxed);
462✔
138
                Some(val)
154✔
139
            }
140
            None => match self.error() {
26✔
141
                Some(ref err) if should_retry_error(err) => {
9✔
NEW
142
                    let retries = self.retry_count.fetch_add(1, Ordering::Relaxed);
×
NEW
143
                    if check_retry(retries) == RetryDecision::Continue {
×
NEW
144
                        self.next()
×
145
                    } else {
NEW
146
                        self.retry_count.store(0, Ordering::Relaxed);
×
NEW
147
                        None
×
148
                    }
149
                }
NEW
150
                _ => {
×
151
                    self.retry_count.store(0, Ordering::Relaxed);
39✔
152
                    None
13✔
153
                }
154
            },
155
        }
156
    }
157

158
    /// Returns the current error state of the subscription.
159
    ///
160
    /// This method allows checking if an error occurred during subscription processing.
161
    /// Errors are stored internally when they occur during `next()`, `try_next()`, or `next_timeout()` calls.
162
    ///
163
    /// # Returns
164
    /// * `Some(Error)` - If an error has occurred
165
    /// * `None` - If no error has occurred
166
    pub fn error(&self) -> Option<Error> {
14✔
167
        let mut error = self.error.lock().unwrap();
56✔
168
        error.take()
14✔
169
    }
170

171
    fn clear_error(&self) {
173✔
172
        let mut error = self.error.lock().unwrap();
692✔
173
        *error = None;
346✔
174
    }
175

176
    fn process_response(&self, response: Option<Result<ResponseMessage, Error>>) -> Option<T> {
173✔
177
        self.clear_error();
346✔
178

179
        match response {
170✔
180
            Some(Ok(message)) => self.process_message(message),
668✔
181
            Some(Err(e)) => {
3✔
182
                if should_store_error(&e) {
9✔
183
                    let mut error = self.error.lock().unwrap();
15✔
184
                    *error = Some(e);
3✔
185
                }
186
                None
3✔
187
            }
188
            None => None,
3✔
189
        }
190
    }
191

192
    fn process_message(&self, mut message: ResponseMessage) -> Option<T> {
167✔
193
        match process_decode_result(T::decode(self.server_version, &mut message)) {
501✔
194
            ProcessingResult::Success(val) => {
160✔
195
                // Check if this decoded value represents the end of a snapshot subscription
196
                if val.is_snapshot_end() {
321✔
197
                    self.snapshot_ended.store(true, Ordering::Relaxed);
2✔
198
                }
199
                Some(val)
160✔
200
            }
201
            ProcessingResult::EndOfStream => None,
7✔
202
            ProcessingResult::Retry => {
×
203
                // This case shouldn't happen here since UnexpectedResponse is handled at the next() level
204
                // but we handle it for completeness
205
                None
×
206
            }
207
            ProcessingResult::Error(err) => {
×
208
                error!("error decoding message: {err}");
×
209
                let mut error = self.error.lock().unwrap();
×
210
                *error = Some(err);
×
211
                None
×
212
            }
213
        }
214
    }
215

216
    /// Tries to return the next available value without blocking.
217
    ///
218
    /// Returns immediately with:
219
    /// - `Some(value)` if a value is available
220
    /// - `None` if no data is currently available
221
    ///
222
    /// Use this method when you want to poll for data without blocking.
223
    /// Check `error()` to determine if `None` was returned due to an error.
224
    ///
225
    /// # Examples
226
    ///
227
    /// ```no_run
228
    /// use ibapi::client::blocking::Client;
229
    /// use ibapi::contracts::Contract;
230
    /// use std::thread;
231
    /// use std::time::Duration;
232
    ///
233
    /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed");
234
    ///
235
    /// let contract = Contract::stock("AAPL").build();
236
    /// let subscription = client.market_data(&contract)
237
    ///     .generic_ticks(&["233"])
238
    ///     .subscribe()
239
    ///     .expect("market data request failed");
240
    ///
241
    /// // Poll for data without blocking
242
    /// loop {
243
    ///     if let Some(data) = subscription.try_next() {
244
    ///         println!("{data:?}");
245
    ///     } else if let Some(err) = subscription.error() {
246
    ///         eprintln!("Error: {err}");
247
    ///         break;
248
    ///     } else {
249
    ///         // No data available, do other work or sleep
250
    ///         thread::sleep(Duration::from_millis(100));
251
    ///     }
252
    /// }
253
    /// ```
254
    pub fn try_next(&self) -> Option<T> {
        // Poll the underlying subscription, then run the shared response handling.
        let polled = self.subscription.try_next();
        self.process_response(polled)
    }
257

258
    /// Waits for the next available value up to the specified timeout duration.
259
    ///
260
    /// Returns:
261
    /// - `Some(value)` if a value becomes available within the timeout
262
    /// - `None` if the timeout expires before data becomes available
263
    ///
264
    /// Check `error()` to determine if `None` was returned due to an error.
265
    ///
266
    /// # Examples
267
    ///
268
    /// ```no_run
269
    /// use ibapi::client::blocking::Client;
270
    /// use ibapi::contracts::Contract;
271
    /// use std::time::Duration;
272
    ///
273
    /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed");
274
    ///
275
    /// let contract = Contract::stock("AAPL").build();
276
    /// let subscription = client.market_data(&contract)
277
    ///     .generic_ticks(&["233"])
278
    ///     .subscribe()
279
    ///     .expect("market data request failed");
280
    ///
281
    /// // Wait up to 5 seconds for data
282
    /// if let Some(data) = subscription.next_timeout(Duration::from_secs(5)) {
283
    ///     println!("{data:?}");
284
    /// } else if let Some(err) = subscription.error() {
285
    ///     eprintln!("Error: {err}");
286
    /// } else {
287
    ///     eprintln!("Timeout: no data received within 5 seconds");
288
    /// }
289
    /// ```
290
    pub fn next_timeout(&self, timeout: Duration) -> Option<T> {
        // Ask the underlying subscription with the given timeout, then run the shared
        // response handling.
        let awaited = self.subscription.next_timeout(timeout);
        self.process_response(awaited)
    }
293

294
    /// Creates a blocking iterator over the subscription data.
295
    ///
296
    /// The iterator will block waiting for the next value if none is immediately available.
297
    /// The iterator ends when the subscription is cancelled or an unrecoverable error occurs.
298
    ///
299
    /// # Examples
300
    ///
301
    /// ```no_run
302
    /// use ibapi::client::blocking::Client;
303
    ///
304
    /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed");
305
    ///
306
    /// let subscription = client.positions().expect("positions request failed");
307
    ///
308
    /// // Process all positions as they arrive
309
    /// for position in subscription.iter() {
310
    ///     println!("{position:?}");
311
    /// }
312
    ///
313
    /// // Check if iteration ended due to an error
314
    /// if let Some(err) = subscription.error() {
315
    ///     eprintln!("Subscription error: {err}");
316
    /// }
317
    /// ```
318
    pub fn iter(&self) -> SubscriptionIter<'_, T> {
9✔
319
        SubscriptionIter { subscription: self }
320
    }
321

322
    /// Creates a non-blocking iterator over the subscription data.
323
    ///
324
    /// The iterator will return immediately with `None` if no data is available.
325
    /// Use this when you want to process available data without blocking.
326
    ///
327
    /// # Examples
328
    ///
329
    /// ```no_run
330
    /// use ibapi::client::blocking::Client;
331
    /// use std::thread;
332
    /// use std::time::Duration;
333
    ///
334
    /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed");
335
    ///
336
    /// let subscription = client.positions().expect("positions request failed");
337
    ///
338
    /// // Process available positions without blocking
339
    /// loop {
340
    ///     let mut data_received = false;
341
    ///     for position in subscription.try_iter() {
342
    ///         data_received = true;
343
    ///         println!("{position:?}");
344
    ///     }
345
    ///     
346
    ///     if let Some(err) = subscription.error() {
347
    ///         eprintln!("Error: {err}");
348
    ///         break;
349
    ///     }
350
    ///     
351
    ///     if !data_received {
352
    ///         // No data available, do other work or sleep
353
    ///         thread::sleep(Duration::from_millis(100));
354
    ///     }
355
    /// }
356
    /// ```
357
    pub fn try_iter(&self) -> SubscriptionTryIter<'_, T> {
×
358
        SubscriptionTryIter { subscription: self }
359
    }
360

361
    /// Creates an iterator that waits up to the specified timeout for each value.
362
    ///
363
    /// The iterator will wait up to `timeout` duration for each value.
364
    /// If the timeout expires, the iterator ends.
365
    ///
366
    /// # Examples
367
    ///
368
    /// ```no_run
369
    /// use ibapi::client::blocking::Client;
370
    /// use std::time::Duration;
371
    ///
372
    /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed");
373
    ///
374
    /// let subscription = client.positions().expect("positions request failed");
375
    ///
376
    /// // Process positions with a 5 second timeout per item
377
    /// for position in subscription.timeout_iter(Duration::from_secs(5)) {
378
    ///     println!("{position:?}");
379
    /// }
380
    ///
381
    /// if let Some(err) = subscription.error() {
382
    ///     eprintln!("Error: {err}");
383
    /// } else {
384
    ///     println!("No more positions received within timeout");
385
    /// }
386
    /// ```
387
    pub fn timeout_iter(&self, timeout: Duration) -> SubscriptionTimeoutIter<'_, T> {
×
388
        SubscriptionTimeoutIter { subscription: self, timeout }
389
    }
390
}
391

392
impl<T: StreamDecoder<T>> Drop for Subscription<T> {
393
    /// Cancel subscription on drop
394
    fn drop(&mut self) {
106✔
395
        debug!("dropping subscription");
106✔
396
        self.cancel();
212✔
397
    }
398
}
399

400
/// An iterator that yields items as they become available, blocking if necessary.
401
#[allow(private_bounds)]
402
pub struct SubscriptionIter<'a, T: StreamDecoder<T>> {
403
    subscription: &'a Subscription<T>,
404
}
405

406
impl<T: StreamDecoder<T>> Iterator for SubscriptionIter<'_, T> {
407
    type Item = T;
408

409
    fn next(&mut self) -> Option<Self::Item> {
18✔
410
        self.subscription.next()
36✔
411
    }
412
}
413

414
impl<'a, T: StreamDecoder<T>> IntoIterator for &'a Subscription<T> {
415
    type Item = T;
416
    type IntoIter = SubscriptionIter<'a, T>;
417

418
    fn into_iter(self) -> Self::IntoIter {
×
419
        self.iter()
×
420
    }
421
}
422

423
/// An iterator that takes ownership and yields items as they become available, blocking if necessary.
424
#[allow(private_bounds)]
425
pub struct SubscriptionOwnedIter<T: StreamDecoder<T>> {
426
    subscription: Subscription<T>,
427
}
428

429
impl<T: StreamDecoder<T>> Iterator for SubscriptionOwnedIter<T> {
430
    type Item = T;
431

432
    fn next(&mut self) -> Option<Self::Item> {
102✔
433
        self.subscription.next()
204✔
434
    }
435
}
436

437
impl<T: StreamDecoder<T>> IntoIterator for Subscription<T> {
438
    type Item = T;
439
    type IntoIter = SubscriptionOwnedIter<T>;
440

441
    fn into_iter(self) -> Self::IntoIter {
31✔
442
        SubscriptionOwnedIter { subscription: self }
443
    }
444
}
445

446
/// An iterator that yields items as they become available without blocking.
447
#[allow(private_bounds)]
448
pub struct SubscriptionTryIter<'a, T: StreamDecoder<T>> {
449
    subscription: &'a Subscription<T>,
450
}
451

452
impl<T: StreamDecoder<T>> Iterator for SubscriptionTryIter<'_, T> {
453
    type Item = T;
454

455
    fn next(&mut self) -> Option<Self::Item> {
×
456
        self.subscription.try_next()
×
457
    }
458
}
459

460
/// An iterator that yields items with a timeout.
461
#[allow(private_bounds)]
462
pub struct SubscriptionTimeoutIter<'a, T: StreamDecoder<T>> {
463
    subscription: &'a Subscription<T>,
464
    timeout: Duration,
465
}
466

467
impl<T: StreamDecoder<T>> Iterator for SubscriptionTimeoutIter<'_, T> {
468
    type Item = T;
469

470
    fn next(&mut self) -> Option<Self::Item> {
×
471
        self.subscription.next_timeout(self.timeout)
×
472
    }
473
}
474

475
/// Marker trait for subscriptions that share a channel based on message type
476
// The trait declares no methods; implementing it only tags a type.
pub trait SharesChannel {}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc