• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wboayue / rust-ibapi / 17230381016

26 Aug 2025 06:52AM UTC coverage: 77.105% (-0.3%) from 77.389%
17230381016

push

github

web-flow
Remove Clone trait from async Client to prevent shared ownership issues (#296)

31 of 84 new or added lines in 5 files covered. (36.9%)

5 existing lines in 1 file now uncovered.

5358 of 6949 relevant lines covered (77.1%)

63.02 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.33
/src/subscriptions/async.rs
1
//! Asynchronous subscription implementation
2

3
use std::sync::atomic::{AtomicBool, Ordering};
4
use std::sync::Arc;
5

6
use log::{debug, warn};
7
use tokio::sync::mpsc;
8

9
use super::common::{process_decode_result, ProcessingResult};
10
use super::{ResponseContext, StreamDecoder};
11
use crate::client::r#async::Client;
12
use crate::messages::{OutgoingMessages, RequestMessage, ResponseMessage};
13
use crate::transport::{AsyncInternalSubscription, AsyncMessageBus};
14
use crate::Error;
15

16
// Type aliases to reduce complexity
17
/// Builds the outgoing cancel request for a subscription.
///
/// In this file it is invoked as `cancel_fn(server_version, id, Some(&response_context))`,
/// where `id` is the subscription's request id or, failing that, its order id.
type CancelFn = Box<dyn Fn(i32, Option<i32>, Option<&ResponseContext>) -> Result<RequestMessage, Error> + Send + Sync>;
18
/// Shared decoder that turns a raw `ResponseMessage` into a `T`.
///
/// In this file it is invoked as `decoder(server_version, &mut message)`.
type DecoderFn<T> = Arc<dyn Fn(i32, &mut ResponseMessage) -> Result<T, Error> + Send + Sync>;
19

20
/// Asynchronous subscription for streaming data
///
/// Items are pulled with `next()`. A cancel message is sent (when one can be built)
/// either explicitly through `cancel()` or implicitly when the value is dropped.
21
pub struct Subscription<T> {
22
    /// Where items come from: a raw message stream plus decoder, or a channel of
    /// already decoded values.
    inner: SubscriptionInner<T>,
23
    /// Metadata for cancellation
    ///
    /// Request id; preferred over `order_id` when the cancel message is built.
24
    request_id: Option<i32>,
25
    /// Order id; used for the cancel message only when `request_id` is `None`.
    order_id: Option<i32>,
26
    /// Stored by the constructors but not read anywhere in this file.
    _message_type: Option<OutgoingMessages>,
27
    /// Passed by reference to the cancel-message generator.
    response_context: ResponseContext,
28
    /// Set once a cancel has been attempted (by `cancel()` or by `Drop`).
    /// The `Arc` is shared with every clone of this subscription.
    cancelled: Arc<AtomicBool>,
29
    /// Passed to the cancel-message generator; `0` for subscriptions built with `new`.
    server_version: i32,
30
    /// Bus used to send the cancel message; `None` for subscriptions built with `new`.
    message_bus: Option<Arc<dyn AsyncMessageBus>>,
31
    /// Cancel message generator
    ///
    /// Only `new_from_internal` sets this; every other constructor leaves it `None`,
    /// in which case no cancel message is sent.
32
    cancel_fn: Option<Arc<CancelFn>>,
33
}
34

35
/// The two item sources a `Subscription` can wrap.
enum SubscriptionInner<T> {
36
    /// Subscription with decoder - receives ResponseMessage and decodes to T
37
    WithDecoder {
38
        /// Stream of raw response messages.
        subscription: AsyncInternalSubscription,
39
        /// Decoder applied to every message received from `subscription`.
        decoder: DecoderFn<T>,
40
        /// Server version handed to `decoder` on every call.
        server_version: i32,
41
    },
42
    /// Pre-decoded subscription - receives T directly
    ///
    /// Items are forwarded from `receiver` unchanged. This variant cannot be cloned.
43
    PreDecoded { receiver: mpsc::UnboundedReceiver<Result<T, Error>> },
44
}
45

46
impl<T> Clone for SubscriptionInner<T> {
47
    fn clone(&self) -> Self {
×
48
        match self {
×
49
            SubscriptionInner::WithDecoder {
×
50
                subscription,
×
51
                decoder,
×
NEW
52
                server_version,
×
53
            } => SubscriptionInner::WithDecoder {
54
                subscription: subscription.clone(),
×
55
                decoder: decoder.clone(),
×
NEW
56
                server_version: *server_version,
×
57
            },
58
            SubscriptionInner::PreDecoded { .. } => {
×
59
                // Can't clone mpsc receivers
60
                panic!("Cannot clone pre-decoded subscriptions");
×
61
            }
62
        }
63
    }
64
}
65

66
impl<T> Clone for Subscription<T> {
67
    fn clone(&self) -> Self {
×
68
        Self {
69
            inner: self.inner.clone(),
×
70
            request_id: self.request_id,
×
71
            order_id: self.order_id,
×
72
            _message_type: self._message_type,
×
73
            response_context: self.response_context.clone(),
×
74
            cancelled: self.cancelled.clone(),
×
NEW
75
            server_version: self.server_version,
×
NEW
76
            message_bus: self.message_bus.clone(),
×
UNCOV
77
            cancel_fn: self.cancel_fn.clone(),
×
78
        }
79
    }
80
}
81

82
impl<T> Subscription<T> {
83
    /// Create a subscription from an internal subscription and a decoder
UNCOV
84
    pub fn with_decoder<D>(
×
85
        internal: AsyncInternalSubscription,
86
        client: Arc<Client>,
87
        decoder: D,
88
        request_id: Option<i32>,
89
        order_id: Option<i32>,
90
        message_type: Option<OutgoingMessages>,
91
        response_context: ResponseContext,
92
    ) -> Self
93
    where
94
        D: Fn(i32, &mut ResponseMessage) -> Result<T, Error> + Send + Sync + 'static,
95
    {
NEW
96
        let server_version = client.server_version();
×
NEW
97
        let message_bus = client.message_bus.clone();
×
98

99
        Self {
UNCOV
100
            inner: SubscriptionInner::WithDecoder {
×
101
                subscription: internal,
102
                decoder: Arc::new(decoder),
103
                server_version,
104
            },
105
            request_id,
106
            order_id,
107
            _message_type: message_type,
108
            response_context,
UNCOV
109
            cancelled: Arc::new(AtomicBool::new(false)),
×
110
            server_version,
NEW
111
            message_bus: Some(message_bus),
×
112
            cancel_fn: None,
113
        }
114
    }
115

116
    /// Create a subscription from an internal subscription with a decoder function
117
    pub fn new_with_decoder<F>(
×
118
        internal: AsyncInternalSubscription,
119
        client: Arc<Client>,
120
        decoder: F,
121
        request_id: Option<i32>,
122
        order_id: Option<i32>,
123
        message_type: Option<OutgoingMessages>,
124
        response_context: ResponseContext,
125
    ) -> Self
126
    where
127
        F: Fn(i32, &mut ResponseMessage) -> Result<T, Error> + Send + Sync + 'static,
128
    {
129
        Self::with_decoder(internal, client, decoder, request_id, order_id, message_type, response_context)
×
130
    }
131

132
    /// Create a subscription from components and a decoder
133
    #[allow(clippy::too_many_arguments)]
134
    pub fn with_decoder_components<D>(
96✔
135
        internal: AsyncInternalSubscription,
136
        server_version: i32,
137
        message_bus: Arc<dyn AsyncMessageBus>,
138
        decoder: D,
139
        request_id: Option<i32>,
140
        order_id: Option<i32>,
141
        message_type: Option<OutgoingMessages>,
142
        response_context: ResponseContext,
143
    ) -> Self
144
    where
145
        D: Fn(i32, &mut ResponseMessage) -> Result<T, Error> + Send + Sync + 'static,
146
    {
147
        Self {
148
            inner: SubscriptionInner::WithDecoder {
192✔
149
                subscription: internal,
150
                decoder: Arc::new(decoder),
151
                server_version,
152
            },
153
            request_id,
154
            order_id,
155
            _message_type: message_type,
156
            response_context,
157
            cancelled: Arc::new(AtomicBool::new(false)),
288✔
158
            server_version,
159
            message_bus: Some(message_bus),
96✔
160
            cancel_fn: None,
161
        }
162
    }
163

164
    /// Create a subscription from an internal subscription using the DataStream decoder
165
    pub(crate) fn new_from_internal<D>(
96✔
166
        internal: AsyncInternalSubscription,
167
        server_version: i32,
168
        message_bus: Arc<dyn AsyncMessageBus>,
169
        request_id: Option<i32>,
170
        order_id: Option<i32>,
171
        message_type: Option<OutgoingMessages>,
172
        response_context: ResponseContext,
173
    ) -> Self
174
    where
175
        D: StreamDecoder<T> + 'static,
176
        T: 'static,
177
    {
178
        let mut sub = Self::with_decoder_components(
179
            internal,
96✔
180
            server_version,
96✔
181
            message_bus,
96✔
NEW
182
            D::decode,
×
183
            request_id,
96✔
184
            order_id,
96✔
185
            message_type,
96✔
186
            response_context,
96✔
187
        );
188
        // Store the cancel function
189
        sub.cancel_fn = Some(Arc::new(Box::new(D::cancel_message)));
192✔
190
        sub
96✔
191
    }
192

193
    /// Create a subscription from internal subscription without explicit metadata
194
    pub(crate) fn new_from_internal_simple<D>(internal: AsyncInternalSubscription, server_version: i32, message_bus: Arc<dyn AsyncMessageBus>) -> Self
16✔
195
    where
196
        D: StreamDecoder<T> + 'static,
197
        T: 'static,
198
    {
199
        // The AsyncInternalSubscription already has cleanup logic, so we don't need cancel metadata
200
        Self::new_from_internal::<D>(internal, server_version, message_bus, None, None, None, ResponseContext::default())
128✔
201
    }
202

203
    /// Create subscription from existing receiver (for backward compatibility)
204
    pub fn new(receiver: mpsc::UnboundedReceiver<Result<T, Error>>) -> Self {
×
205
        // This creates a subscription that expects pre-decoded messages
206
        // Used for compatibility with existing code that manually decodes
207
        Self {
208
            inner: SubscriptionInner::PreDecoded { receiver },
×
209
            request_id: None,
210
            order_id: None,
211
            _message_type: None,
212
            response_context: ResponseContext::default(),
×
213
            cancelled: Arc::new(AtomicBool::new(false)),
×
214
            server_version: 0, // Default value for backward compatibility
215
            message_bus: None,
216
            cancel_fn: None,
217
        }
218
    }
219

220
    /// Get the next value from the subscription
221
    pub async fn next(&mut self) -> Option<Result<T, Error>>
166✔
222
    where
223
        T: 'static,
224
    {
225
        match &mut self.inner {
166✔
226
            SubscriptionInner::WithDecoder {
×
227
                subscription,
166✔
228
                decoder,
×
NEW
229
                server_version,
×
230
            } => loop {
×
231
                match subscription.next().await {
501✔
232
                    Some(mut message) => {
161✔
233
                        let result = decoder(*server_version, &mut message);
483✔
234
                        match process_decode_result(result) {
161✔
235
                            ProcessingResult::Success(val) => return Some(Ok(val)),
151✔
236
                            ProcessingResult::EndOfStream => return None,
9✔
237
                            ProcessingResult::Retry => continue,
1✔
238
                            ProcessingResult::Error(err) => return Some(Err(err)),
×
239
                        }
240
                    }
241
                    None => return None,
6✔
242
                }
243
            },
244
            SubscriptionInner::PreDecoded { receiver } => receiver.recv().await,
×
245
        }
246
    }
247
}
248

249
impl<T> Subscription<T> {
250
    /// Cancel the subscription
251
    pub async fn cancel(&self) {
10✔
252
        if self.cancelled.load(Ordering::Relaxed) {
10✔
253
            return;
×
254
        }
255

256
        self.cancelled.store(true, Ordering::Relaxed);
×
257

258
        if let (Some(message_bus), Some(cancel_fn)) = (&self.message_bus, &self.cancel_fn) {
5✔
259
            let id = self.request_id.or(self.order_id);
×
260
            if let Ok(message) = cancel_fn(self.server_version, id, Some(&self.response_context)) {
5✔
NEW
261
                if let Err(e) = message_bus.send_message(message).await {
×
UNCOV
262
                    warn!("error sending cancel message: {e}")
×
263
                }
264
            }
265
        }
266

267
        // The AsyncInternalSubscription's Drop will handle cleanup
268
    }
269
}
270

271
impl<T> Drop for Subscription<T> {
272
    fn drop(&mut self) {
96✔
273
        debug!("dropping async subscription");
96✔
274

275
        // Check if already cancelled
276
        if self.cancelled.load(Ordering::Relaxed) {
192✔
277
            return;
5✔
278
        }
279

280
        self.cancelled.store(true, Ordering::Relaxed);
×
281

282
        // Try to send cancel message if we have the necessary components
283
        if let (Some(message_bus), Some(cancel_fn)) = (&self.message_bus, &self.cancel_fn) {
91✔
NEW
284
            let message_bus = message_bus.clone();
×
285
            let id = self.request_id.or(self.order_id);
×
286
            let response_context = self.response_context.clone();
×
NEW
287
            let server_version = self.server_version;
×
288

289
            // Clone the cancel function for use in the spawned task
290
            if let Ok(message) = cancel_fn(server_version, id, Some(&response_context)) {
66✔
291
                // Spawn a task to send the cancel message since drop can't be async
292
                tokio::spawn(async move {
39✔
293
                    if let Err(e) = message_bus.send_message(message).await {
117✔
294
                        warn!("error sending cancel message in drop: {e}");
×
295
                    }
296
                });
297
            }
298
        }
299

300
        // The AsyncInternalSubscription's Drop will handle channel cleanup
301
    }
302
}
303

304
// Note: Stream trait implementation removed because tokio's broadcast::Receiver
305
// doesn't provide poll_recv. Users should use the async next() method instead.
306
// If Stream is needed, users can convert using futures::stream::unfold.
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc