• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

wboayue / rust-ibapi / 16251934242

13 Jul 2025 06:01PM UTC coverage: 73.375% (-3.0%) from 76.394%
16251934242

push

github

wboayue
fix: create target/coverage directory before writing merged report

The lcov tool was failing because the output directory didn't exist.
Added mkdir -p to ensure the directory is created before attempting
to write the merged coverage report.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

4809 of 6554 relevant lines covered (73.38%)

34.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

42.86
/src/subscriptions/async.rs
1
//! Asynchronous subscription implementation
2

3
use std::pin::Pin;
4
use std::sync::atomic::{AtomicBool, Ordering};
5
use std::sync::Arc;
6
use std::task::{Context, Poll};
7

8
use futures::stream::Stream;
9
use log::{debug, warn};
10
use tokio::sync::mpsc;
11

12
use super::common::{process_decode_result, ProcessingResult};
13
use super::{ResponseContext, StreamDecoder};
14
use crate::client::r#async::Client;
15
use crate::messages::{OutgoingMessages, RequestMessage, ResponseMessage};
16
use crate::transport::AsyncInternalSubscription;
17
use crate::Error;
18

19
// Type aliases to reduce complexity
20
type CancelFn = Box<dyn Fn(i32, Option<i32>, Option<&ResponseContext>) -> Result<RequestMessage, Error> + Send + Sync>;
21
type DecoderFn<T> = Box<dyn Fn(i32, &mut ResponseMessage) -> Result<T, Error> + Send>;
22

23
/// Asynchronous subscription for streaming data
24
pub struct Subscription<T> {
25
    inner: SubscriptionInner<T>,
26
    /// Metadata for cancellation
27
    request_id: Option<i32>,
28
    order_id: Option<i32>,
29
    _message_type: Option<OutgoingMessages>,
30
    response_context: ResponseContext,
31
    cancelled: Arc<AtomicBool>,
32
    client: Option<Arc<Client>>,
33
    /// Cancel message generator
34
    cancel_fn: Option<CancelFn>,
35
}
36

37
enum SubscriptionInner<T> {
38
    /// Subscription with decoder - receives ResponseMessage and decodes to T
39
    WithDecoder {
40
        subscription: AsyncInternalSubscription,
41
        decoder: DecoderFn<T>,
42
        client: Arc<Client>,
43
    },
44
    /// Pre-decoded subscription - receives T directly
45
    PreDecoded { receiver: mpsc::UnboundedReceiver<Result<T, Error>> },
46
}
47

48
impl<T> Subscription<T> {
49
    /// Create a subscription from an internal subscription and a decoder
50
    pub fn with_decoder<D>(
71✔
51
        internal: AsyncInternalSubscription,
52
        client: Arc<Client>,
53
        decoder: D,
54
        request_id: Option<i32>,
55
        order_id: Option<i32>,
56
        message_type: Option<OutgoingMessages>,
57
        response_context: ResponseContext,
58
    ) -> Self
59
    where
60
        D: Fn(i32, &mut ResponseMessage) -> Result<T, Error> + Send + 'static,
61
    {
62
        Self {
63
            inner: SubscriptionInner::WithDecoder {
142✔
64
                subscription: internal,
65
                decoder: Box::new(decoder),
66
                client: client.clone(),
67
            },
68
            request_id,
69
            order_id,
70
            _message_type: message_type,
71
            response_context,
72
            cancelled: Arc::new(AtomicBool::new(false)),
213✔
73
            client: Some(client),
71✔
74
            cancel_fn: None,
75
        }
76
    }
77

78
    /// Create a subscription from an internal subscription with a decoder function
79
    pub fn new_with_decoder<F>(
×
80
        internal: AsyncInternalSubscription,
81
        client: Arc<Client>,
82
        decoder: F,
83
        request_id: Option<i32>,
84
        order_id: Option<i32>,
85
        message_type: Option<OutgoingMessages>,
86
        response_context: ResponseContext,
87
    ) -> Self
88
    where
89
        F: Fn(i32, &mut ResponseMessage) -> Result<T, Error> + Send + 'static,
90
    {
91
        Self::with_decoder(internal, client, decoder, request_id, order_id, message_type, response_context)
×
92
    }
93

94
    /// Create a subscription from an internal subscription using the DataStream decoder
95
    pub(crate) fn new_from_internal<D>(
71✔
96
        internal: AsyncInternalSubscription,
97
        client: Arc<Client>,
98
        request_id: Option<i32>,
99
        order_id: Option<i32>,
100
        message_type: Option<OutgoingMessages>,
101
        response_context: ResponseContext,
102
    ) -> Self
103
    where
104
        D: StreamDecoder<T> + 'static,
105
        T: 'static,
106
    {
107
        let mut sub = Self::with_decoder(internal, client.clone(), D::decode, request_id, order_id, message_type, response_context);
639✔
108
        // Store the cancel function
109
        sub.cancel_fn = Some(Box::new(D::cancel_message));
142✔
110
        sub
71✔
111
    }
112

113
    /// Create a subscription from internal subscription without explicit metadata (for backward compatibility)
114
    pub(crate) fn new_from_internal_simple<D>(internal: AsyncInternalSubscription, client: Arc<Client>) -> Self
7✔
115
    where
116
        D: StreamDecoder<T> + 'static,
117
        T: 'static,
118
    {
119
        // The AsyncInternalSubscription already has cleanup logic, so we don't need cancel metadata
120
        Self::new_from_internal::<D>(internal, client, None, None, None, ResponseContext::default())
49✔
121
    }
122

123
    /// Create subscription from existing receiver (for backward compatibility)
124
    pub fn new(receiver: mpsc::UnboundedReceiver<Result<T, Error>>) -> Self {
×
125
        // This creates a subscription that expects pre-decoded messages
126
        // Used for compatibility with existing code that manually decodes
127
        Self {
128
            inner: SubscriptionInner::PreDecoded { receiver },
×
129
            request_id: None,
130
            order_id: None,
131
            _message_type: None,
132
            response_context: ResponseContext::default(),
×
133
            cancelled: Arc::new(AtomicBool::new(false)),
×
134
            client: None,
135
            cancel_fn: None,
136
        }
137
    }
138

139
    /// Get the next value from the subscription
140
    pub async fn next(&mut self) -> Option<Result<T, Error>>
74✔
141
    where
142
        T: 'static,
143
    {
144
        match &mut self.inner {
74✔
145
            SubscriptionInner::WithDecoder {
×
146
                subscription,
74✔
147
                decoder,
×
148
                client,
×
149
            } => loop {
×
150
                match subscription.next().await {
225✔
151
                    Some(mut message) => {
72✔
152
                        let result = decoder(client.server_version(), &mut message);
216✔
153
                        match process_decode_result(result) {
72✔
154
                            ProcessingResult::Success(val) => return Some(Ok(val)),
69✔
155
                            ProcessingResult::EndOfStream => return None,
2✔
156
                            ProcessingResult::Retry => continue,
1✔
157
                            ProcessingResult::Error(err) => return Some(Err(err)),
×
158
                        }
159
                    }
160
                    None => return None,
3✔
161
                }
162
            },
163
            SubscriptionInner::PreDecoded { receiver } => receiver.recv().await,
×
164
        }
165
    }
166
}
167

168
impl<T> Subscription<T> {
169
    /// Cancel the subscription
170
    pub async fn cancel(&self) {
10✔
171
        if self.cancelled.load(Ordering::Relaxed) {
10✔
172
            return;
×
173
        }
174

175
        self.cancelled.store(true, Ordering::Relaxed);
×
176

177
        if let (Some(client), Some(cancel_fn)) = (&self.client, &self.cancel_fn) {
5✔
178
            let id = self.request_id.or(self.order_id);
×
179
            if let Ok(message) = cancel_fn(client.server_version(), id, Some(&self.response_context)) {
5✔
180
                if let Err(e) = client.message_bus.send_request(message).await {
×
181
                    warn!("error sending cancel message: {e}")
×
182
                }
183
            }
184
        }
185

186
        // The AsyncInternalSubscription's Drop will handle cleanup
187
    }
188
}
189

190
impl<T> Drop for Subscription<T> {
191
    fn drop(&mut self) {
71✔
192
        debug!("dropping async subscription");
71✔
193

194
        // Check if already cancelled
195
        if self.cancelled.load(Ordering::Relaxed) {
142✔
196
            return;
5✔
197
        }
198

199
        self.cancelled.store(true, Ordering::Relaxed);
×
200

201
        // Try to send cancel message if we have the necessary components
202
        if let (Some(client), Some(cancel_fn)) = (&self.client, &self.cancel_fn) {
66✔
203
            let client = client.clone();
×
204
            let id = self.request_id.or(self.order_id);
×
205
            let response_context = self.response_context.clone();
×
206
            let server_version = client.server_version();
×
207

208
            // Clone the cancel function for use in the spawned task
209
            if let Ok(message) = cancel_fn(server_version, id, Some(&response_context)) {
53✔
210
                // Spawn a task to send the cancel message since drop can't be async
211
                tokio::spawn(async move {
39✔
212
                    if let Err(e) = client.message_bus.send_request(message).await {
117✔
213
                        warn!("error sending cancel message in drop: {e}");
×
214
                    }
215
                });
216
            }
217
        }
218

219
        // The AsyncInternalSubscription's Drop will handle channel cleanup
220
    }
221
}
222

223
impl<T: Unpin + 'static> Stream for Subscription<T> {
224
    type Item = Result<T, Error>;
225

226
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
×
227
        let this = self.get_mut();
×
228
        match &mut this.inner {
×
229
            SubscriptionInner::WithDecoder {
×
230
                subscription,
×
231
                decoder,
×
232
                client,
×
233
            } => {
×
234
                // Create a Pin for the subscription's receiver
235
                match Pin::new(&mut subscription.receiver).poll_recv(cx) {
×
236
                    Poll::Ready(Some(mut message)) => {
×
237
                        let result = decoder(client.server_version(), &mut message);
×
238
                        match process_decode_result(result) {
×
239
                            ProcessingResult::Success(val) => Poll::Ready(Some(Ok(val))),
×
240
                            ProcessingResult::EndOfStream => Poll::Ready(None),
×
241
                            ProcessingResult::Retry => {
×
242
                                // For retry, we need to re-poll
243
                                cx.waker().wake_by_ref();
×
244
                                Poll::Pending
×
245
                            }
246
                            ProcessingResult::Error(err) => Poll::Ready(Some(Err(err))),
×
247
                        }
248
                    }
249
                    Poll::Ready(None) => Poll::Ready(None),
×
250
                    Poll::Pending => Poll::Pending,
×
251
                }
252
            }
253
            SubscriptionInner::PreDecoded { receiver } => receiver.poll_recv(cx),
×
254
        }
255
    }
256
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc