• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wboayue / rust-ibapi / 17349946605

30 Aug 2025 11:47PM UTC coverage: 77.637% (+0.3%) from 77.37%
17349946605

push

github

web-flow
Add comprehensive integration testing infrastructure and fix async error propagation (#304)

- Added comprehensive MockGateway testing infrastructure for integration tests without live IB connections
- Fixed critical async error propagation bug affecting 18+ modules where subscription errors were silently dropped
- Expanded test coverage with 800+ lines of new integration tests for news, market data, and client functionality
- Fixed WSH async tests by implementing proper request channel handling
- Formalized testing patterns with professional documentation

49 of 93 new or added lines in 11 files covered. (52.69%)

13 existing lines in 6 files now uncovered.

5440 of 7007 relevant lines covered (77.64%)

71.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.54
/src/subscriptions/async.rs
1
//! Asynchronous subscription implementation
2

3
use std::sync::atomic::{AtomicBool, Ordering};
4
use std::sync::Arc;
5

6
use log::{debug, warn};
7
use tokio::sync::mpsc;
8

9
use super::common::{process_decode_result, ProcessingResult};
10
use super::{ResponseContext, StreamDecoder};
11
use crate::messages::{OutgoingMessages, RequestMessage, ResponseMessage};
12
use crate::transport::{AsyncInternalSubscription, AsyncMessageBus};
13
use crate::Error;
14

15
// Type aliases to reduce complexity
16
type CancelFn = Box<dyn Fn(i32, Option<i32>, Option<&ResponseContext>) -> Result<RequestMessage, Error> + Send + Sync>;
17
type DecoderFn<T> = Arc<dyn Fn(i32, &mut ResponseMessage) -> Result<T, Error> + Send + Sync>;
18

19
/// Asynchronous subscription for streaming data
20
pub struct Subscription<T> {
21
    inner: SubscriptionInner<T>,
22
    /// Metadata for cancellation
23
    request_id: Option<i32>,
24
    order_id: Option<i32>,
25
    _message_type: Option<OutgoingMessages>,
26
    response_context: ResponseContext,
27
    cancelled: Arc<AtomicBool>,
28
    server_version: i32,
29
    message_bus: Option<Arc<dyn AsyncMessageBus>>,
30
    /// Cancel message generator
31
    cancel_fn: Option<Arc<CancelFn>>,
32
}
33

34
enum SubscriptionInner<T> {
35
    /// Subscription with decoder - receives ResponseMessage and decodes to T
36
    WithDecoder {
37
        subscription: AsyncInternalSubscription,
38
        decoder: DecoderFn<T>,
39
        server_version: i32,
40
    },
41
    /// Pre-decoded subscription - receives T directly
42
    PreDecoded { receiver: mpsc::UnboundedReceiver<Result<T, Error>> },
43
}
44

45
impl<T> Clone for SubscriptionInner<T> {
46
    fn clone(&self) -> Self {
×
47
        match self {
×
48
            SubscriptionInner::WithDecoder {
×
49
                subscription,
×
50
                decoder,
×
51
                server_version,
×
52
            } => SubscriptionInner::WithDecoder {
53
                subscription: subscription.clone(),
×
54
                decoder: decoder.clone(),
×
55
                server_version: *server_version,
×
56
            },
57
            SubscriptionInner::PreDecoded { .. } => {
×
58
                // Can't clone mpsc receivers
59
                panic!("Cannot clone pre-decoded subscriptions");
×
60
            }
61
        }
62
    }
63
}
64

65
impl<T> Clone for Subscription<T> {
66
    fn clone(&self) -> Self {
×
67
        Self {
68
            inner: self.inner.clone(),
×
69
            request_id: self.request_id,
×
70
            order_id: self.order_id,
×
71
            _message_type: self._message_type,
×
72
            response_context: self.response_context.clone(),
×
73
            cancelled: self.cancelled.clone(),
×
74
            server_version: self.server_version,
×
75
            message_bus: self.message_bus.clone(),
×
76
            cancel_fn: self.cancel_fn.clone(),
×
77
        }
78
    }
79
}
80

81
impl<T> Subscription<T> {
82
    /// Create a subscription from an internal subscription and a decoder
83
    #[allow(clippy::too_many_arguments)]
84
    pub fn with_decoder<D>(
102✔
85
        internal: AsyncInternalSubscription,
86
        server_version: i32,
87
        message_bus: Arc<dyn AsyncMessageBus>,
88
        decoder: D,
89
        request_id: Option<i32>,
90
        order_id: Option<i32>,
91
        message_type: Option<OutgoingMessages>,
92
        response_context: ResponseContext,
93
    ) -> Self
94
    where
95
        D: Fn(i32, &mut ResponseMessage) -> Result<T, Error> + Send + Sync + 'static,
96
    {
97
        Self {
98
            inner: SubscriptionInner::WithDecoder {
204✔
99
                subscription: internal,
100
                decoder: Arc::new(decoder),
101
                server_version,
102
            },
103
            request_id,
104
            order_id,
105
            _message_type: message_type,
106
            response_context,
107
            cancelled: Arc::new(AtomicBool::new(false)),
306✔
108
            server_version,
109
            message_bus: Some(message_bus),
102✔
110
            cancel_fn: None,
111
        }
112
    }
113

114
    /// Create a subscription from an internal subscription with a decoder function
115
    #[allow(clippy::too_many_arguments)]
116
    pub fn new_with_decoder<F>(
×
117
        internal: AsyncInternalSubscription,
118
        server_version: i32,
119
        message_bus: Arc<dyn AsyncMessageBus>,
120
        decoder: F,
121
        request_id: Option<i32>,
122
        order_id: Option<i32>,
123
        message_type: Option<OutgoingMessages>,
124
        response_context: ResponseContext,
125
    ) -> Self
126
    where
127
        F: Fn(i32, &mut ResponseMessage) -> Result<T, Error> + Send + Sync + 'static,
128
    {
129
        Self::with_decoder(
130
            internal,
×
131
            server_version,
×
132
            message_bus,
×
133
            decoder,
×
134
            request_id,
×
135
            order_id,
×
136
            message_type,
×
137
            response_context,
×
138
        )
139
    }
140

141
    /// Create a subscription from components and a decoder (alias for with_decoder)
142
    #[allow(clippy::too_many_arguments)]
143
    pub fn with_decoder_components<D>(
102✔
144
        internal: AsyncInternalSubscription,
145
        server_version: i32,
146
        message_bus: Arc<dyn AsyncMessageBus>,
147
        decoder: D,
148
        request_id: Option<i32>,
149
        order_id: Option<i32>,
150
        message_type: Option<OutgoingMessages>,
151
        response_context: ResponseContext,
152
    ) -> Self
153
    where
154
        D: Fn(i32, &mut ResponseMessage) -> Result<T, Error> + Send + Sync + 'static,
155
    {
156
        Self::with_decoder(
157
            internal,
102✔
158
            server_version,
102✔
159
            message_bus,
102✔
160
            decoder,
102✔
161
            request_id,
102✔
162
            order_id,
102✔
163
            message_type,
102✔
164
            response_context,
102✔
165
        )
166
    }
167

168
    /// Create a subscription from an internal subscription using the DataStream decoder
169
    pub(crate) fn new_from_internal<D>(
102✔
170
        internal: AsyncInternalSubscription,
171
        server_version: i32,
172
        message_bus: Arc<dyn AsyncMessageBus>,
173
        request_id: Option<i32>,
174
        order_id: Option<i32>,
175
        message_type: Option<OutgoingMessages>,
176
        response_context: ResponseContext,
177
    ) -> Self
178
    where
179
        D: StreamDecoder<T> + 'static,
180
        T: 'static,
181
    {
182
        let mut sub = Self::with_decoder_components(
183
            internal,
102✔
184
            server_version,
102✔
185
            message_bus,
102✔
186
            D::decode,
×
187
            request_id,
102✔
188
            order_id,
102✔
189
            message_type,
102✔
190
            response_context,
102✔
191
        );
192
        // Store the cancel function
193
        sub.cancel_fn = Some(Arc::new(Box::new(D::cancel_message)));
204✔
194
        sub
102✔
195
    }
196

197
    /// Create a subscription from internal subscription without explicit metadata
198
    pub(crate) fn new_from_internal_simple<D>(internal: AsyncInternalSubscription, server_version: i32, message_bus: Arc<dyn AsyncMessageBus>) -> Self
16✔
199
    where
200
        D: StreamDecoder<T> + 'static,
201
        T: 'static,
202
    {
203
        // The AsyncInternalSubscription already has cleanup logic, so we don't need cancel metadata
204
        Self::new_from_internal::<D>(internal, server_version, message_bus, None, None, None, ResponseContext::default())
128✔
205
    }
206

207
    /// Create subscription from existing receiver (for backward compatibility)
208
    pub fn new(receiver: mpsc::UnboundedReceiver<Result<T, Error>>) -> Self {
×
209
        // This creates a subscription that expects pre-decoded messages
210
        // Used for compatibility with existing code that manually decodes
211
        Self {
212
            inner: SubscriptionInner::PreDecoded { receiver },
×
213
            request_id: None,
214
            order_id: None,
215
            _message_type: None,
216
            response_context: ResponseContext::default(),
×
217
            cancelled: Arc::new(AtomicBool::new(false)),
×
218
            server_version: 0, // Default value for backward compatibility
219
            message_bus: None,
220
            cancel_fn: None,
221
        }
222
    }
223

224
    /// Get the next value from the subscription
225
    pub async fn next(&mut self) -> Option<Result<T, Error>>
177✔
226
    where
227
        T: 'static,
228
    {
229
        match &mut self.inner {
177✔
230
            SubscriptionInner::WithDecoder {
×
231
                subscription,
177✔
232
                decoder,
×
233
                server_version,
×
234
            } => loop {
×
235
                match subscription.next().await {
534✔
236
                    Some(Ok(mut message)) => {
172✔
UNCOV
237
                        let result = decoder(*server_version, &mut message);
×
UNCOV
238
                        match process_decode_result(result) {
×
239
                            ProcessingResult::Success(val) => return Some(Ok(val)),
162✔
240
                            ProcessingResult::EndOfStream => return None,
9✔
241
                            ProcessingResult::Retry => continue,
1✔
242
                            ProcessingResult::Error(err) => return Some(Err(err)),
×
243
                        }
244
                    }
NEW
245
                    Some(Err(e)) => return Some(Err(e)),
×
246
                    None => return None,
6✔
247
                }
248
            },
249
            SubscriptionInner::PreDecoded { receiver } => receiver.recv().await,
×
250
        }
251
    }
252
}
253

254
impl<T> Subscription<T> {
255
    /// Cancel the subscription
256
    pub async fn cancel(&self) {
10✔
257
        if self.cancelled.load(Ordering::Relaxed) {
10✔
258
            return;
×
259
        }
260

261
        self.cancelled.store(true, Ordering::Relaxed);
×
262

263
        if let (Some(message_bus), Some(cancel_fn)) = (&self.message_bus, &self.cancel_fn) {
5✔
264
            let id = self.request_id.or(self.order_id);
×
265
            if let Ok(message) = cancel_fn(self.server_version, id, Some(&self.response_context)) {
5✔
266
                if let Err(e) = message_bus.send_message(message).await {
×
267
                    warn!("error sending cancel message: {e}")
×
268
                }
269
            }
270
        }
271

272
        // The AsyncInternalSubscription's Drop will handle cleanup
273
    }
274
}
275

276
impl<T> Drop for Subscription<T> {
277
    fn drop(&mut self) {
102✔
278
        debug!("dropping async subscription");
102✔
279

280
        // Check if already cancelled
281
        if self.cancelled.load(Ordering::Relaxed) {
204✔
282
            return;
5✔
283
        }
284

285
        self.cancelled.store(true, Ordering::Relaxed);
×
286

287
        // Try to send cancel message if we have the necessary components
288
        if let (Some(message_bus), Some(cancel_fn)) = (&self.message_bus, &self.cancel_fn) {
97✔
289
            let message_bus = message_bus.clone();
×
290
            let id = self.request_id.or(self.order_id);
×
291
            let response_context = self.response_context.clone();
×
292
            let server_version = self.server_version;
×
293

294
            // Clone the cancel function for use in the spawned task
295
            if let Ok(message) = cancel_fn(server_version, id, Some(&response_context)) {
71✔
296
                // Spawn a task to send the cancel message since drop can't be async
297
                tokio::spawn(async move {
39✔
298
                    if let Err(e) = message_bus.send_message(message).await {
117✔
299
                        warn!("error sending cancel message in drop: {e}");
×
300
                    }
301
                });
302
            }
303
        }
304

305
        // The AsyncInternalSubscription's Drop will handle channel cleanup
306
    }
307
}
308

309
// Note: Stream trait implementation removed because tokio's broadcast::Receiver
310
// doesn't provide poll_recv. Users should use the async next() method instead.
311
// If Stream is needed, users can convert using futures::stream::unfold.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc