• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wboayue / rust-ibapi / 16259710877

14 Jul 2025 06:31AM UTC coverage: 73.863% (-0.6%) from 74.445%
16259710877

push

github

web-flow
Fix async race condition in shared channel handling (#277)

This PR fixes critical race conditions in the async implementation that were causing shared channel operations to hang or fail intermittently.

53 of 124 new or added lines in 11 files covered. (42.74%)

20 existing lines in 4 files now uncovered.

4889 of 6619 relevant lines covered (73.86%)

37.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

42.86
/src/subscriptions/async.rs
1
//! Asynchronous subscription implementation
2

3
use std::pin::Pin;
4
use std::sync::atomic::{AtomicBool, Ordering};
5
use std::sync::Arc;
6
use std::task::{Context, Poll};
7

8
use futures::stream::Stream;
9
use log::{debug, warn};
10
use tokio::sync::mpsc;
11

12
use super::common::{process_decode_result, ProcessingResult};
13
use super::{ResponseContext, StreamDecoder};
14
use crate::client::r#async::Client;
15
use crate::messages::{OutgoingMessages, RequestMessage, ResponseMessage};
16
use crate::transport::AsyncInternalSubscription;
17
use crate::Error;
18

19
// Type aliases to reduce complexity
20
type CancelFn = Box<dyn Fn(i32, Option<i32>, Option<&ResponseContext>) -> Result<RequestMessage, Error> + Send + Sync>;
21
type DecoderFn<T> = Box<dyn Fn(i32, &mut ResponseMessage) -> Result<T, Error> + Send>;
22

23
/// Asynchronous subscription for streaming data
24
pub struct Subscription<T> {
25
    inner: SubscriptionInner<T>,
26
    /// Metadata for cancellation
27
    request_id: Option<i32>,
28
    order_id: Option<i32>,
29
    _message_type: Option<OutgoingMessages>,
30
    response_context: ResponseContext,
31
    cancelled: Arc<AtomicBool>,
32
    client: Option<Arc<Client>>,
33
    /// Cancel message generator
34
    cancel_fn: Option<CancelFn>,
35
}
36

37
enum SubscriptionInner<T> {
38
    /// Subscription with decoder - receives ResponseMessage and decodes to T
39
    WithDecoder {
40
        subscription: AsyncInternalSubscription,
41
        decoder: DecoderFn<T>,
42
        client: Arc<Client>,
43
    },
44
    /// Pre-decoded subscription - receives T directly
45
    PreDecoded { receiver: mpsc::UnboundedReceiver<Result<T, Error>> },
46
}
47

48
impl<T> Subscription<T> {
49
    /// Create a subscription from an internal subscription and a decoder
50
    pub fn with_decoder<D>(
72✔
51
        internal: AsyncInternalSubscription,
52
        client: Arc<Client>,
53
        decoder: D,
54
        request_id: Option<i32>,
55
        order_id: Option<i32>,
56
        message_type: Option<OutgoingMessages>,
57
        response_context: ResponseContext,
58
    ) -> Self
59
    where
60
        D: Fn(i32, &mut ResponseMessage) -> Result<T, Error> + Send + 'static,
61
    {
62
        Self {
63
            inner: SubscriptionInner::WithDecoder {
144✔
64
                subscription: internal,
65
                decoder: Box::new(decoder),
66
                client: client.clone(),
67
            },
68
            request_id,
69
            order_id,
70
            _message_type: message_type,
71
            response_context,
72
            cancelled: Arc::new(AtomicBool::new(false)),
216✔
73
            client: Some(client),
72✔
74
            cancel_fn: None,
75
        }
76
    }
77

78
    /// Create a subscription from an internal subscription with a decoder function
79
    pub fn new_with_decoder<F>(
×
80
        internal: AsyncInternalSubscription,
81
        client: Arc<Client>,
82
        decoder: F,
83
        request_id: Option<i32>,
84
        order_id: Option<i32>,
85
        message_type: Option<OutgoingMessages>,
86
        response_context: ResponseContext,
87
    ) -> Self
88
    where
89
        F: Fn(i32, &mut ResponseMessage) -> Result<T, Error> + Send + 'static,
90
    {
91
        Self::with_decoder(internal, client, decoder, request_id, order_id, message_type, response_context)
×
92
    }
93

94
    /// Create a subscription from an internal subscription using the DataStream decoder
95
    pub(crate) fn new_from_internal<D>(
72✔
96
        internal: AsyncInternalSubscription,
97
        client: Arc<Client>,
98
        request_id: Option<i32>,
99
        order_id: Option<i32>,
100
        message_type: Option<OutgoingMessages>,
101
        response_context: ResponseContext,
102
    ) -> Self
103
    where
104
        D: StreamDecoder<T> + 'static,
105
        T: 'static,
106
    {
107
        let mut sub = Self::with_decoder(internal, client.clone(), D::decode, request_id, order_id, message_type, response_context);
648✔
108
        // Store the cancel function
109
        sub.cancel_fn = Some(Box::new(D::cancel_message));
144✔
110
        sub
72✔
111
    }
112

113
    /// Create a subscription from internal subscription without explicit metadata (for backward compatibility)
114
    pub(crate) fn new_from_internal_simple<D>(internal: AsyncInternalSubscription, client: Arc<Client>) -> Self
7✔
115
    where
116
        D: StreamDecoder<T> + 'static,
117
        T: 'static,
118
    {
119
        // The AsyncInternalSubscription already has cleanup logic, so we don't need cancel metadata
120
        Self::new_from_internal::<D>(internal, client, None, None, None, ResponseContext::default())
49✔
121
    }
122

123
    /// Create subscription from existing receiver (for backward compatibility)
124
    pub fn new(receiver: mpsc::UnboundedReceiver<Result<T, Error>>) -> Self {
×
125
        // This creates a subscription that expects pre-decoded messages
126
        // Used for compatibility with existing code that manually decodes
127
        Self {
128
            inner: SubscriptionInner::PreDecoded { receiver },
×
129
            request_id: None,
130
            order_id: None,
131
            _message_type: None,
132
            response_context: ResponseContext::default(),
×
133
            cancelled: Arc::new(AtomicBool::new(false)),
×
134
            client: None,
135
            cancel_fn: None,
136
        }
137
    }
138

139
    /// Get the next value from the subscription
140
    pub async fn next(&mut self) -> Option<Result<T, Error>>
76✔
141
    where
142
        T: 'static,
143
    {
144
        match &mut self.inner {
76✔
145
            SubscriptionInner::WithDecoder {
×
146
                subscription,
76✔
147
                decoder,
×
148
                client,
×
149
            } => loop {
×
150
                match subscription.next().await {
231✔
151
                    Some(mut message) => {
74✔
152
                        let result = decoder(client.server_version(), &mut message);
222✔
153
                        match process_decode_result(result) {
74✔
154
                            ProcessingResult::Success(val) => return Some(Ok(val)),
70✔
155
                            ProcessingResult::EndOfStream => return None,
3✔
156
                            ProcessingResult::Retry => continue,
1✔
157
                            ProcessingResult::Error(err) => return Some(Err(err)),
×
158
                        }
159
                    }
160
                    None => return None,
3✔
161
                }
162
            },
163
            SubscriptionInner::PreDecoded { receiver } => receiver.recv().await,
×
164
        }
165
    }
166
}
167

168
impl<T> Subscription<T> {
169
    /// Cancel the subscription
170
    pub async fn cancel(&self) {
10✔
171
        if self.cancelled.load(Ordering::Relaxed) {
10✔
172
            return;
×
173
        }
174

175
        self.cancelled.store(true, Ordering::Relaxed);
×
176

177
        if let (Some(client), Some(cancel_fn)) = (&self.client, &self.cancel_fn) {
5✔
178
            let id = self.request_id.or(self.order_id);
×
179
            if let Ok(message) = cancel_fn(client.server_version(), id, Some(&self.response_context)) {
5✔
NEW
180
                if let Err(e) = client.message_bus.send_message(message).await {
×
181
                    warn!("error sending cancel message: {e}")
×
182
                }
183
            }
184
        }
185

186
        // The AsyncInternalSubscription's Drop will handle cleanup
187
    }
188
}
189

190
impl<T> Drop for Subscription<T> {
191
    fn drop(&mut self) {
72✔
192
        debug!("dropping async subscription");
72✔
193

194
        // Check if already cancelled
195
        if self.cancelled.load(Ordering::Relaxed) {
144✔
196
            return;
5✔
197
        }
198

199
        self.cancelled.store(true, Ordering::Relaxed);
×
200

201
        // Try to send cancel message if we have the necessary components
202
        if let (Some(client), Some(cancel_fn)) = (&self.client, &self.cancel_fn) {
67✔
203
            let client = client.clone();
×
204
            let id = self.request_id.or(self.order_id);
×
205
            let response_context = self.response_context.clone();
×
206
            let server_version = client.server_version();
×
207

208
            // Clone the cancel function for use in the spawned task
209
            if let Ok(message) = cancel_fn(server_version, id, Some(&response_context)) {
53✔
210
                // Spawn a task to send the cancel message since drop can't be async
211
                tokio::spawn(async move {
39✔
212
                    if let Err(e) = client.message_bus.send_message(message).await {
117✔
213
                        warn!("error sending cancel message in drop: {e}");
×
214
                    }
215
                });
216
            }
217
        }
218

219
        // The AsyncInternalSubscription's Drop will handle channel cleanup
220
    }
221
}
222

223
impl<T: Unpin + 'static> Stream for Subscription<T> {
224
    type Item = Result<T, Error>;
225

226
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
×
227
        let this = self.get_mut();
×
228
        match &mut this.inner {
×
229
            SubscriptionInner::WithDecoder {
×
230
                subscription,
×
231
                decoder,
×
232
                client,
×
233
            } => {
×
234
                // Create a Pin for the subscription's receiver
235
                match Pin::new(&mut subscription.receiver).poll_recv(cx) {
×
236
                    Poll::Ready(Some(mut message)) => {
×
237
                        let result = decoder(client.server_version(), &mut message);
×
238
                        match process_decode_result(result) {
×
239
                            ProcessingResult::Success(val) => Poll::Ready(Some(Ok(val))),
×
240
                            ProcessingResult::EndOfStream => Poll::Ready(None),
×
241
                            ProcessingResult::Retry => {
×
242
                                // For retry, we need to re-poll
243
                                cx.waker().wake_by_ref();
×
244
                                Poll::Pending
×
245
                            }
246
                            ProcessingResult::Error(err) => Poll::Ready(Some(Err(err))),
×
247
                        }
248
                    }
249
                    Poll::Ready(None) => Poll::Ready(None),
×
250
                    Poll::Pending => Poll::Pending,
×
251
                }
252
            }
253
            SubscriptionInner::PreDecoded { receiver } => receiver.poll_recv(cx),
×
254
        }
255
    }
256
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc