• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

iggy-rs / iggy / 13087075133

01 Feb 2025 08:55AM UTC coverage: 75.978% (+0.07%) from 75.908%
13087075133

push

github

web-flow
Add cache metrics to `Stats` and p9999 latency tracking to `BenchmarkReport` (#1475)

This commit introduces cache metrics tracking and p9999 latency
measurement to improve performance monitoring and analysis. The
`human-repr` crate is added to enhance readability of large numbers in
logs and reports.

- Updated `Cargo.toml` and `Cargo.lock` to include `human-repr` dependency.
- Added cache metrics collection and serialization in both client and server.
- Implemented p9999 latency tracking in benchmark reports and analytics.
- Enhanced logging with human-readable counts and sizes using `human-repr`.
- Refactored code to accommodate new metrics and improve clarity.

161 of 176 new or added lines in 8 files covered. (91.48%)

7 existing lines in 4 files now uncovered.

25027 of 32940 relevant lines covered (75.98%)

11817.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.92
/sdk/src/binary/mapper.rs
1
use crate::bytes_serializable::BytesSerializable;
2
use crate::compression::compression_algorithm::CompressionAlgorithm;
3
use crate::error::IggyError;
4
use crate::models::client_info::{ClientInfo, ClientInfoDetails, ConsumerGroupInfo};
5
use crate::models::consumer_group::{ConsumerGroup, ConsumerGroupDetails, ConsumerGroupMember};
6
use crate::models::consumer_offset_info::ConsumerOffsetInfo;
7
use crate::models::identity_info::IdentityInfo;
8
use crate::models::messages::{MessageState, PolledMessage, PolledMessages};
9
use crate::models::partition::Partition;
10
use crate::models::permissions::Permissions;
11
use crate::models::personal_access_token::{PersonalAccessTokenInfo, RawPersonalAccessToken};
12
use crate::models::stats::{CacheMetrics, CacheMetricsKey, Stats};
13
use crate::models::stream::{Stream, StreamDetails};
14
use crate::models::topic::{Topic, TopicDetails};
15
use crate::models::user_info::{UserInfo, UserInfoDetails};
16
use crate::models::user_status::UserStatus;
17
use crate::utils::byte_size::IggyByteSize;
18
use crate::utils::expiry::IggyExpiry;
19
use crate::utils::topic_size::MaxTopicSize;
20
use bytes::Bytes;
21
use std::collections::HashMap;
22
use std::str::from_utf8;
23

24
const EMPTY_MESSAGES: Vec<PolledMessage> = vec![];
25
const EMPTY_TOPICS: Vec<Topic> = vec![];
26
const EMPTY_STREAMS: Vec<Stream> = vec![];
27
const EMPTY_CLIENTS: Vec<ClientInfo> = vec![];
28
const EMPTY_USERS: Vec<UserInfo> = vec![];
29
const EMPTY_PERSONAL_ACCESS_TOKENS: Vec<PersonalAccessTokenInfo> = vec![];
30
const EMPTY_CONSUMER_GROUPS: Vec<ConsumerGroup> = vec![];
31

32
pub fn map_stats(payload: Bytes) -> Result<Stats, IggyError> {
20✔
33
    let process_id = u32::from_le_bytes(
20✔
34
        payload[..4]
20✔
35
            .try_into()
20✔
36
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
37
    );
38
    let cpu_usage = f32::from_le_bytes(
20✔
39
        payload[4..8]
20✔
40
            .try_into()
20✔
41
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
42
    );
43
    let total_cpu_usage = f32::from_le_bytes(
20✔
44
        payload[8..12]
20✔
45
            .try_into()
20✔
46
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
47
    );
48
    let memory_usage = u64::from_le_bytes(
20✔
49
        payload[12..20]
20✔
50
            .try_into()
20✔
51
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
52
    )
53
    .into();
20✔
54
    let total_memory = u64::from_le_bytes(
20✔
55
        payload[20..28]
20✔
56
            .try_into()
20✔
57
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
58
    )
59
    .into();
20✔
60
    let available_memory = u64::from_le_bytes(
20✔
61
        payload[28..36]
20✔
62
            .try_into()
20✔
63
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
64
    )
65
    .into();
20✔
66
    let run_time = u64::from_le_bytes(
20✔
67
        payload[36..44]
20✔
68
            .try_into()
20✔
69
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
70
    )
71
    .into();
20✔
72
    let start_time = u64::from_le_bytes(
20✔
73
        payload[44..52]
20✔
74
            .try_into()
20✔
75
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
76
    )
77
    .into();
20✔
78
    let read_bytes = u64::from_le_bytes(
20✔
79
        payload[52..60]
20✔
80
            .try_into()
20✔
81
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
82
    )
83
    .into();
20✔
84
    let written_bytes = u64::from_le_bytes(
20✔
85
        payload[60..68]
20✔
86
            .try_into()
20✔
87
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
88
    )
89
    .into();
20✔
90
    let messages_size_bytes = u64::from_le_bytes(
20✔
91
        payload[68..76]
20✔
92
            .try_into()
20✔
93
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
94
    )
95
    .into();
20✔
96
    let streams_count = u32::from_le_bytes(
20✔
97
        payload[76..80]
20✔
98
            .try_into()
20✔
99
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
100
    );
101
    let topics_count = u32::from_le_bytes(
20✔
102
        payload[80..84]
20✔
103
            .try_into()
20✔
104
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
105
    );
106
    let partitions_count = u32::from_le_bytes(
20✔
107
        payload[84..88]
20✔
108
            .try_into()
20✔
109
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
110
    );
111
    let segments_count = u32::from_le_bytes(
20✔
112
        payload[88..92]
20✔
113
            .try_into()
20✔
114
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
115
    );
116
    let messages_count = u64::from_le_bytes(
20✔
117
        payload[92..100]
20✔
118
            .try_into()
20✔
119
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
120
    );
121
    let clients_count = u32::from_le_bytes(
20✔
122
        payload[100..104]
20✔
123
            .try_into()
20✔
124
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
125
    );
126
    let consumer_groups_count = u32::from_le_bytes(
20✔
127
        payload[104..108]
20✔
128
            .try_into()
20✔
129
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
130
    );
131

132
    let mut current_position = 108;
20✔
133

20✔
134
    //
20✔
135
    // Safely decode hostname
20✔
136
    //
20✔
137
    if current_position + 4 > payload.len() {
20✔
138
        return Err(IggyError::InvalidNumberEncoding);
×
139
    }
20✔
140
    let hostname_length = u32::from_le_bytes(
20✔
141
        payload[current_position..current_position + 4]
20✔
142
            .try_into()
20✔
143
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
144
    ) as usize;
145
    current_position += 4;
20✔
146
    if current_position + hostname_length > payload.len() {
20✔
147
        return Err(IggyError::InvalidNumberEncoding);
×
148
    }
20✔
149
    let hostname = from_utf8(&payload[current_position..current_position + hostname_length])
20✔
150
        .map_err(|_| IggyError::InvalidUtf8)?
20✔
151
        .to_string();
20✔
152
    current_position += hostname_length;
20✔
153

20✔
154
    //
20✔
155
    // Safely Decode OS name
20✔
156
    //
20✔
157
    if current_position + 4 > payload.len() {
20✔
158
        return Err(IggyError::InvalidNumberEncoding);
×
159
    }
20✔
160
    let os_name_length = u32::from_le_bytes(
20✔
161
        payload[current_position..current_position + 4]
20✔
162
            .try_into()
20✔
163
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
164
    ) as usize;
165
    current_position += 4;
20✔
166
    if current_position + os_name_length > payload.len() {
20✔
167
        return Err(IggyError::InvalidNumberEncoding);
×
168
    }
20✔
169
    let os_name = from_utf8(&payload[current_position..current_position + os_name_length])
20✔
170
        .map_err(|_| IggyError::InvalidUtf8)?
20✔
171
        .to_string();
20✔
172
    current_position += os_name_length;
20✔
173

20✔
174
    //
20✔
175
    // Safely decode OS version
20✔
176
    //
20✔
177
    if current_position + 4 > payload.len() {
20✔
178
        return Err(IggyError::InvalidNumberEncoding);
×
179
    }
20✔
180
    let os_version_length = u32::from_le_bytes(
20✔
181
        payload[current_position..current_position + 4]
20✔
182
            .try_into()
20✔
183
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
184
    ) as usize;
185
    current_position += 4;
20✔
186
    if current_position + os_version_length > payload.len() {
20✔
187
        return Err(IggyError::InvalidNumberEncoding);
×
188
    }
20✔
189
    let os_version = from_utf8(&payload[current_position..current_position + os_version_length])
20✔
190
        .map_err(|_| IggyError::InvalidUtf8)?
20✔
191
        .to_string();
20✔
192
    current_position += os_version_length;
20✔
193

20✔
194
    //
20✔
195
    // Safely decode kernel version (NEW) + server version (NEW) + server semver (NEW)
20✔
196
    // We'll check if there's enough bytes before reading each new field.
20✔
197
    //
20✔
198

20✔
199
    // Default them in case payload doesn't have them (older server)
20✔
200
    let mut kernel_version = String::new();
20✔
201
    let mut iggy_server_version = String::new();
20✔
202
    let mut iggy_server_semver: Option<u32> = None;
20✔
203

20✔
204
    // kernel_version (if it exists)
20✔
205
    if current_position + 4 <= payload.len() {
20✔
206
        let kernel_version_length = u32::from_le_bytes(
20✔
207
            payload[current_position..current_position + 4]
20✔
208
                .try_into()
20✔
209
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
210
        ) as usize;
211
        current_position += 4;
20✔
212
        if current_position + kernel_version_length <= payload.len() {
20✔
213
            let kv =
20✔
214
                from_utf8(&payload[current_position..current_position + kernel_version_length])
20✔
215
                    .map_err(|_| IggyError::InvalidUtf8)?
20✔
216
                    .to_string();
20✔
217
            kernel_version = kv;
20✔
218
            current_position += kernel_version_length;
20✔
219
        } else {
×
220
            // Not enough bytes for kernel version string, treat as empty or error out
×
221
            // return Err(IggyError::InvalidNumberEncoding);
×
222
            kernel_version = String::new(); // fallback
×
223
        }
×
224
    } else {
×
225
        // This means older server didn't send kernel_version, so remain empty
×
226
    }
×
227

228
    // iggy_server_version (if it exists)
229
    if current_position + 4 <= payload.len() {
20✔
230
        let iggy_version_length = u32::from_le_bytes(
20✔
231
            payload[current_position..current_position + 4]
20✔
232
                .try_into()
20✔
233
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
234
        ) as usize;
235
        current_position += 4;
20✔
236
        if current_position + iggy_version_length <= payload.len() {
20✔
237
            let iv = from_utf8(&payload[current_position..current_position + iggy_version_length])
20✔
238
                .map_err(|_| IggyError::InvalidUtf8)?
20✔
239
                .to_string();
20✔
240
            iggy_server_version = iv;
20✔
241
            current_position += iggy_version_length;
20✔
242
        } else {
×
243
            // Not enough bytes for iggy version string, treat as empty or error out
×
244
            // return Err(IggyError::InvalidNumberEncoding);
×
245
            iggy_server_version = String::new(); // fallback
×
246
        }
×
247
    } else {
×
248
        // older server didn't send iggy_server_version, so remain empty
×
249
    }
×
250

251
    // iggy_server_semver (if it exists)
252
    if current_position + 4 <= payload.len() {
20✔
253
        let semver = u32::from_le_bytes(
20✔
254
            payload[current_position..current_position + 4]
20✔
255
                .try_into()
20✔
256
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
257
        );
258
        current_position += 4; // Increment position after reading semver
20✔
259
        if semver != 0 {
20✔
260
            iggy_server_semver = Some(semver);
20✔
261
        }
20✔
262
    } else {
×
263
        // older server didn't send semver
×
264
    }
×
265

266
    // Read cache metrics (if they exist)
267
    let mut cache_metrics = HashMap::new();
20✔
268
    if current_position + 4 <= payload.len() {
20✔
269
        let metrics_count = u32::from_le_bytes(
20✔
270
            payload[current_position..current_position + 4]
20✔
271
                .try_into()
20✔
272
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
273
        ) as usize;
274
        current_position += 4;
20✔
275

20✔
276
        for _ in 0..metrics_count {
20✔
277
            // Read CacheMetricsKey
278
            let stream_id = u32::from_le_bytes(
161✔
279
                payload[current_position..current_position + 4]
161✔
280
                    .try_into()
161✔
281
                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
161✔
282
            );
283
            current_position += 4;
161✔
284

285
            let topic_id = u32::from_le_bytes(
161✔
286
                payload[current_position..current_position + 4]
161✔
287
                    .try_into()
161✔
288
                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
161✔
289
            );
290
            current_position += 4;
161✔
291

292
            let partition_id = u32::from_le_bytes(
161✔
293
                payload[current_position..current_position + 4]
161✔
294
                    .try_into()
161✔
295
                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
161✔
296
            );
297
            current_position += 4;
161✔
298

299
            // Read CacheMetrics
300
            let hits = u64::from_le_bytes(
161✔
301
                payload[current_position..current_position + 8]
161✔
302
                    .try_into()
161✔
303
                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
161✔
304
            );
305
            current_position += 8;
161✔
306

307
            let misses = u64::from_le_bytes(
161✔
308
                payload[current_position..current_position + 8]
161✔
309
                    .try_into()
161✔
310
                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
161✔
311
            );
312
            current_position += 8;
161✔
313

314
            let hit_ratio = f32::from_le_bytes(
161✔
315
                payload[current_position..current_position + 4]
161✔
316
                    .try_into()
161✔
317
                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
161✔
318
            );
319
            current_position += 4;
161✔
320

161✔
321
            let key = CacheMetricsKey {
161✔
322
                stream_id,
161✔
323
                topic_id,
161✔
324
                partition_id,
161✔
325
            };
161✔
326

161✔
327
            let metrics = CacheMetrics {
161✔
328
                hits,
161✔
329
                misses,
161✔
330
                hit_ratio,
161✔
331
            };
161✔
332

161✔
333
            cache_metrics.insert(key, metrics);
161✔
334
        }
NEW
335
    }
×
336

337
    Ok(Stats {
20✔
338
        process_id,
20✔
339
        cpu_usage,
20✔
340
        total_cpu_usage,
20✔
341
        memory_usage,
20✔
342
        total_memory,
20✔
343
        available_memory,
20✔
344
        run_time,
20✔
345
        start_time,
20✔
346
        read_bytes,
20✔
347
        written_bytes,
20✔
348
        messages_size_bytes,
20✔
349
        streams_count,
20✔
350
        topics_count,
20✔
351
        partitions_count,
20✔
352
        segments_count,
20✔
353
        messages_count,
20✔
354
        clients_count,
20✔
355
        consumer_groups_count,
20✔
356
        hostname,
20✔
357
        os_name,
20✔
358
        os_version,
20✔
359
        kernel_version,
20✔
360
        iggy_server_version,
20✔
361
        iggy_server_semver,
20✔
362
        cache_metrics,
20✔
363
    })
20✔
364
}
20✔
365

366
pub fn map_consumer_offset(payload: Bytes) -> Result<ConsumerOffsetInfo, IggyError> {
22✔
367
    let partition_id = u32::from_le_bytes(
22✔
368
        payload[..4]
22✔
369
            .try_into()
22✔
370
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
22✔
371
    );
372
    let current_offset = u64::from_le_bytes(
22✔
373
        payload[4..12]
22✔
374
            .try_into()
22✔
375
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
22✔
376
    );
377
    let stored_offset = u64::from_le_bytes(
22✔
378
        payload[12..20]
22✔
379
            .try_into()
22✔
380
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
22✔
381
    );
382
    Ok(ConsumerOffsetInfo {
22✔
383
        partition_id,
22✔
384
        current_offset,
22✔
385
        stored_offset,
22✔
386
    })
22✔
387
}
22✔
388

389
pub fn map_user(payload: Bytes) -> Result<UserInfoDetails, IggyError> {
88✔
390
    let (user, position) = map_to_user_info(payload.clone(), 0)?;
88✔
391
    let has_permissions = payload[position];
88✔
392
    let permissions = if has_permissions == 1 {
88✔
393
        let permissions_length = u32::from_le_bytes(
41✔
394
            payload[position + 1..position + 5]
41✔
395
                .try_into()
41✔
396
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
41✔
397
        ) as usize;
398
        let permissions = payload.slice(position + 5..position + 5 + permissions_length);
41✔
399
        Some(Permissions::from_bytes(permissions)?)
41✔
400
    } else {
401
        None
47✔
402
    };
403

404
    let user = UserInfoDetails {
88✔
405
        id: user.id,
88✔
406
        created_at: user.created_at,
88✔
407
        status: user.status,
88✔
408
        username: user.username,
88✔
409
        permissions,
88✔
410
    };
88✔
411
    Ok(user)
88✔
412
}
88✔
413

414
pub fn map_users(payload: Bytes) -> Result<Vec<UserInfo>, IggyError> {
24✔
415
    if payload.is_empty() {
24✔
416
        return Ok(EMPTY_USERS);
×
417
    }
24✔
418

24✔
419
    let mut users = Vec::new();
24✔
420
    let length = payload.len();
24✔
421
    let mut position = 0;
24✔
422
    while position < length {
54✔
423
        let (user, read_bytes) = map_to_user_info(payload.clone(), position)?;
30✔
424
        users.push(user);
30✔
425
        position += read_bytes;
30✔
426
    }
427
    users.sort_by(|x, y| x.id.cmp(&y.id));
24✔
428
    Ok(users)
24✔
429
}
24✔
430

431
pub fn map_personal_access_tokens(
24✔
432
    payload: Bytes,
24✔
433
) -> Result<Vec<PersonalAccessTokenInfo>, IggyError> {
24✔
434
    if payload.is_empty() {
24✔
435
        return Ok(EMPTY_PERSONAL_ACCESS_TOKENS);
10✔
436
    }
14✔
437

14✔
438
    let mut personal_access_tokens = Vec::new();
14✔
439
    let length = payload.len();
14✔
440
    let mut position = 0;
14✔
441
    while position < length {
30✔
442
        let (personal_access_token, read_bytes) = map_to_pat_info(payload.clone(), position)?;
16✔
443
        personal_access_tokens.push(personal_access_token);
16✔
444
        position += read_bytes;
16✔
445
    }
446
    personal_access_tokens.sort_by(|x, y| x.name.cmp(&y.name));
14✔
447
    Ok(personal_access_tokens)
14✔
448
}
24✔
449

450
pub fn map_identity_info(payload: Bytes) -> Result<IdentityInfo, IggyError> {
443✔
451
    let user_id = u32::from_le_bytes(
443✔
452
        payload[..4]
443✔
453
            .try_into()
443✔
454
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
443✔
455
    );
456
    Ok(IdentityInfo {
443✔
457
        user_id,
443✔
458
        access_token: None,
443✔
459
    })
443✔
460
}
443✔
461

462
pub fn map_raw_pat(payload: Bytes) -> Result<RawPersonalAccessToken, IggyError> {
18✔
463
    let token_length = payload[0];
18✔
464
    let token = from_utf8(&payload[1..1 + token_length as usize])
18✔
465
        .map_err(|_| IggyError::InvalidUtf8)?
18✔
466
        .to_string();
18✔
467
    Ok(RawPersonalAccessToken { token })
18✔
468
}
18✔
469

470
pub fn map_client(payload: Bytes) -> Result<ClientInfoDetails, IggyError> {
35✔
471
    let (client, mut position) = map_to_client_info(payload.clone(), 0)?;
35✔
472
    let mut consumer_groups = Vec::new();
35✔
473
    let length = payload.len();
35✔
474
    while position < length {
53✔
475
        for _ in 0..client.consumer_groups_count {
18✔
476
            let stream_id = u32::from_le_bytes(
18✔
477
                payload[position..position + 4]
18✔
478
                    .try_into()
18✔
479
                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
18✔
480
            );
481
            let topic_id = u32::from_le_bytes(
18✔
482
                payload[position + 4..position + 8]
18✔
483
                    .try_into()
18✔
484
                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
18✔
485
            );
486
            let group_id = u32::from_le_bytes(
18✔
487
                payload[position + 8..position + 12]
18✔
488
                    .try_into()
18✔
489
                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
18✔
490
            );
491
            let consumer_group = ConsumerGroupInfo {
18✔
492
                stream_id,
18✔
493
                topic_id,
18✔
494
                group_id,
18✔
495
            };
18✔
496
            consumer_groups.push(consumer_group);
18✔
497
            position += 12;
18✔
498
        }
499
    }
500

501
    consumer_groups.sort_by(|x, y| x.group_id.cmp(&y.group_id));
35✔
502
    let client = ClientInfoDetails {
35✔
503
        client_id: client.client_id,
35✔
504
        user_id: client.user_id,
35✔
505
        address: client.address,
35✔
506
        transport: client.transport,
35✔
507
        consumer_groups_count: client.consumer_groups_count,
35✔
508
        consumer_groups,
35✔
509
    };
35✔
510
    Ok(client)
35✔
511
}
35✔
512

513
pub fn map_clients(payload: Bytes) -> Result<Vec<ClientInfo>, IggyError> {
5✔
514
    if payload.is_empty() {
5✔
515
        return Ok(EMPTY_CLIENTS);
×
516
    }
5✔
517

5✔
518
    let mut clients = Vec::new();
5✔
519
    let length = payload.len();
5✔
520
    let mut position = 0;
5✔
521
    while position < length {
13✔
522
        let (client, read_bytes) = map_to_client_info(payload.clone(), position)?;
8✔
523
        clients.push(client);
8✔
524
        position += read_bytes;
8✔
525
    }
526
    clients.sort_by(|x, y| x.client_id.cmp(&y.client_id));
5✔
527
    Ok(clients)
5✔
528
}
5✔
529

530
pub fn map_polled_messages(payload: Bytes) -> Result<PolledMessages, IggyError> {
42,087✔
531
    if payload.is_empty() {
42,087✔
532
        return Ok(PolledMessages {
×
533
            messages: EMPTY_MESSAGES,
×
534
            partition_id: 0,
×
535
            current_offset: 0,
×
536
        });
×
537
    }
42,087✔
538

42,087✔
539
    let length = payload.len();
42,087✔
540
    let partition_id = u32::from_le_bytes(
42,087✔
541
        payload[..4]
42,087✔
542
            .try_into()
42,087✔
543
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
42,087✔
544
    );
545
    let current_offset = u64::from_le_bytes(
42,087✔
546
        payload[4..12]
42,087✔
547
            .try_into()
42,087✔
548
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
42,087✔
549
    );
550
    // Currently ignored
551
    let _messages_count = u32::from_le_bytes(
42,087✔
552
        payload[12..16]
42,087✔
553
            .try_into()
42,087✔
554
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
42,087✔
555
    );
556
    let mut position = 16;
42,087✔
557
    let mut messages = Vec::new();
42,087✔
558
    while position < length {
644,255✔
559
        let offset = u64::from_le_bytes(
624,237✔
560
            payload[position..position + 8]
624,237✔
561
                .try_into()
624,237✔
562
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
624,237✔
563
        );
564
        let state = MessageState::from_code(payload[position + 8])?;
624,237✔
565
        let timestamp = u64::from_le_bytes(
624,237✔
566
            payload[position + 9..position + 17]
624,237✔
567
                .try_into()
624,237✔
568
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
624,237✔
569
        );
570
        let id = u128::from_le_bytes(
624,237✔
571
            payload[position + 17..position + 33]
624,237✔
572
                .try_into()
624,237✔
573
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
624,237✔
574
        );
575
        let checksum = u32::from_le_bytes(
624,237✔
576
            payload[position + 33..position + 37]
624,237✔
577
                .try_into()
624,237✔
578
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
624,237✔
579
        );
580
        let headers_length = u32::from_le_bytes(
624,237✔
581
            payload[position + 37..position + 41]
624,237✔
582
                .try_into()
624,237✔
583
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
624,237✔
584
        );
585
        let headers = if headers_length > 0 {
624,237✔
586
            let headers_payload =
4,112✔
587
                payload.slice(position + 41..position + 41 + headers_length as usize);
4,112✔
588
            Some(HashMap::from_bytes(headers_payload)?)
4,112✔
589
        } else {
590
            None
620,125✔
591
        };
592
        position += headers_length as usize;
624,237✔
593
        let message_length = u32::from_le_bytes(
624,237✔
594
            payload[position + 41..position + 45]
624,237✔
595
                .try_into()
624,237✔
596
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
624,237✔
597
        );
598
        let payload_range = position + 45..position + 45 + message_length as usize;
624,237✔
599
        if payload_range.start > length || payload_range.end > length {
624,237✔
600
            break;
×
601
        }
624,237✔
602

624,237✔
603
        let payload = payload[payload_range].to_vec();
624,237✔
604
        let total_size = 45 + message_length as usize;
624,237✔
605
        position += total_size;
624,237✔
606
        messages.push(PolledMessage {
624,237✔
607
            offset,
624,237✔
608
            timestamp,
624,237✔
609
            state,
624,237✔
610
            checksum,
624,237✔
611
            id,
624,237✔
612
            headers,
624,237✔
613
            length: IggyByteSize::from(message_length as u64),
624,237✔
614
            payload: Bytes::from(payload),
624,237✔
615
        });
624,237✔
616

624,237✔
617
        if position + 45 >= length {
624,237✔
618
            break;
22,069✔
619
        }
602,168✔
620
    }
621

622
    messages.sort_by(|x, y| x.offset.cmp(&y.offset));
602,168✔
623
    Ok(PolledMessages {
42,087✔
624
        partition_id,
42,087✔
625
        current_offset,
42,087✔
626
        messages,
42,087✔
627
    })
42,087✔
628
}
42,087✔
629

630
pub fn map_streams(payload: Bytes) -> Result<Vec<Stream>, IggyError> {
43✔
631
    if payload.is_empty() {
43✔
632
        return Ok(EMPTY_STREAMS);
30✔
633
    }
13✔
634

13✔
635
    let mut streams = Vec::new();
13✔
636
    let length = payload.len();
13✔
637
    let mut position = 0;
13✔
638
    while position < length {
85✔
639
        let (stream, read_bytes) = map_to_stream(payload.clone(), position)?;
72✔
640
        streams.push(stream);
72✔
641
        position += read_bytes;
72✔
642
    }
643
    streams.sort_by(|x, y| x.id.cmp(&y.id));
150✔
644
    Ok(streams)
13✔
645
}
43✔
646

647
pub fn map_stream(payload: Bytes) -> Result<StreamDetails, IggyError> {
261✔
648
    let (stream, mut position) = map_to_stream(payload.clone(), 0)?;
261✔
649
    let mut topics = Vec::new();
261✔
650
    let length = payload.len();
261✔
651
    while position < length {
302✔
652
        let (topic, read_bytes) = map_to_topic(payload.clone(), position)?;
41✔
653
        topics.push(topic);
41✔
654
        position += read_bytes;
41✔
655
    }
656

657
    topics.sort_by(|x, y| x.id.cmp(&y.id));
261✔
658
    let stream = StreamDetails {
261✔
659
        id: stream.id,
261✔
660
        created_at: stream.created_at,
261✔
661
        topics_count: stream.topics_count,
261✔
662
        size: stream.size,
261✔
663
        messages_count: stream.messages_count,
261✔
664
        name: stream.name,
261✔
665
        topics,
261✔
666
    };
261✔
667
    Ok(stream)
261✔
668
}
261✔
669

670
fn map_to_stream(payload: Bytes, position: usize) -> Result<(Stream, usize), IggyError> {
333✔
671
    let id = u32::from_le_bytes(
333✔
672
        payload[position..position + 4]
333✔
673
            .try_into()
333✔
674
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
333✔
675
    );
676
    let created_at = u64::from_le_bytes(
333✔
677
        payload[position + 4..position + 12]
333✔
678
            .try_into()
333✔
679
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
333✔
680
    )
681
    .into();
333✔
682
    let topics_count = u32::from_le_bytes(
333✔
683
        payload[position + 12..position + 16]
333✔
684
            .try_into()
333✔
685
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
333✔
686
    );
687
    let size_bytes = u64::from_le_bytes(
333✔
688
        payload[position + 16..position + 24]
333✔
689
            .try_into()
333✔
690
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
333✔
691
    )
692
    .into();
333✔
693
    let messages_count = u64::from_le_bytes(
333✔
694
        payload[position + 24..position + 32]
333✔
695
            .try_into()
333✔
696
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
333✔
697
    );
698
    let name_length = payload[position + 32];
333✔
699
    let name = from_utf8(&payload[position + 33..position + 33 + name_length as usize])
333✔
700
        .map_err(|_| IggyError::InvalidUtf8)?
333✔
701
        .to_string();
333✔
702
    let read_bytes = 4 + 8 + 4 + 8 + 8 + 1 + name_length as usize;
333✔
703
    Ok((
333✔
704
        Stream {
333✔
705
            id,
333✔
706
            created_at,
333✔
707
            name,
333✔
708
            size: size_bytes,
333✔
709
            messages_count,
333✔
710
            topics_count,
333✔
711
        },
333✔
712
        read_bytes,
333✔
713
    ))
333✔
714
}
333✔
715

716
pub fn map_topics(payload: Bytes) -> Result<Vec<Topic>, IggyError> {
11✔
717
    if payload.is_empty() {
11✔
718
        return Ok(EMPTY_TOPICS);
6✔
719
    }
5✔
720

5✔
721
    let mut topics = Vec::new();
5✔
722
    let length = payload.len();
5✔
723
    let mut position = 0;
5✔
724
    while position < length {
10✔
725
        let (topic, read_bytes) = map_to_topic(payload.clone(), position)?;
5✔
726
        topics.push(topic);
5✔
727
        position += read_bytes;
5✔
728
    }
729
    topics.sort_by(|x, y| x.id.cmp(&y.id));
5✔
730
    Ok(topics)
5✔
731
}
11✔
732

733
pub fn map_topic(payload: Bytes) -> Result<TopicDetails, IggyError> {
302✔
734
    let (topic, mut position) = map_to_topic(payload.clone(), 0)?;
302✔
735
    let mut partitions = Vec::new();
302✔
736
    let length = payload.len();
302✔
737
    while position < length {
1,025✔
738
        let (partition, read_bytes) = map_to_partition(payload.clone(), position)?;
723✔
739
        partitions.push(partition);
723✔
740
        position += read_bytes;
723✔
741
    }
742

743
    partitions.sort_by(|x, y| x.id.cmp(&y.id));
856✔
744
    let topic = TopicDetails {
302✔
745
        id: topic.id,
302✔
746
        created_at: topic.created_at,
302✔
747
        name: topic.name,
302✔
748
        size: topic.size,
302✔
749
        messages_count: topic.messages_count,
302✔
750
        message_expiry: topic.message_expiry,
302✔
751
        compression_algorithm: topic.compression_algorithm,
302✔
752
        max_topic_size: topic.max_topic_size,
302✔
753
        replication_factor: topic.replication_factor,
302✔
754
        #[allow(clippy::cast_possible_truncation)]
302✔
755
        partitions_count: partitions.len() as u32,
302✔
756
        partitions,
302✔
757
    };
302✔
758
    Ok(topic)
302✔
759
}
302✔
760

761
fn map_to_topic(payload: Bytes, position: usize) -> Result<(Topic, usize), IggyError> {
348✔
762
    let id = u32::from_le_bytes(
348✔
763
        payload[position..position + 4]
348✔
764
            .try_into()
348✔
765
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
348✔
766
    );
767
    let created_at = u64::from_le_bytes(
348✔
768
        payload[position + 4..position + 12]
348✔
769
            .try_into()
348✔
770
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
348✔
771
    );
772
    let created_at = created_at.into();
348✔
773
    let partitions_count = u32::from_le_bytes(
348✔
774
        payload[position + 12..position + 16]
348✔
775
            .try_into()
348✔
776
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
348✔
777
    );
778
    let message_expiry = match u64::from_le_bytes(
348✔
779
        payload[position + 16..position + 24]
348✔
780
            .try_into()
348✔
781
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
348✔
782
    ) {
783
        0 => IggyExpiry::NeverExpire,
×
784
        message_expiry => message_expiry.into(),
348✔
785
    };
786
    let compression_algorithm = CompressionAlgorithm::from_code(payload[position + 24])?;
348✔
787
    let max_topic_size = u64::from_le_bytes(
348✔
788
        payload[position + 25..position + 33]
348✔
789
            .try_into()
348✔
790
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
348✔
791
    );
792
    let max_topic_size: MaxTopicSize = max_topic_size.into();
348✔
793
    let replication_factor = payload[position + 33];
348✔
794
    let size_bytes = IggyByteSize::from(u64::from_le_bytes(
348✔
795
        payload[position + 34..position + 42]
348✔
796
            .try_into()
348✔
797
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
348✔
798
    ));
799
    let messages_count = u64::from_le_bytes(
348✔
800
        payload[position + 42..position + 50]
348✔
801
            .try_into()
348✔
802
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
348✔
803
    );
804
    let name_length = payload[position + 50];
348✔
805
    let name = from_utf8(&payload[position + 51..position + 51 + name_length as usize])
348✔
806
        .map_err(|_| IggyError::InvalidUtf8)?
348✔
807
        .to_string();
348✔
808
    let read_bytes = 4 + 8 + 4 + 8 + 8 + 8 + 8 + 1 + 1 + 1 + name_length as usize;
348✔
809
    Ok((
348✔
810
        Topic {
348✔
811
            id,
348✔
812
            created_at,
348✔
813
            name,
348✔
814
            partitions_count,
348✔
815
            size: size_bytes,
348✔
816
            messages_count,
348✔
817
            message_expiry,
348✔
818
            compression_algorithm,
348✔
819
            max_topic_size,
348✔
820
            replication_factor,
348✔
821
        },
348✔
822
        read_bytes,
348✔
823
    ))
348✔
824
}
348✔
825

826
fn map_to_partition(payload: Bytes, position: usize) -> Result<(Partition, usize), IggyError> {
723✔
827
    let id = u32::from_le_bytes(
723✔
828
        payload[position..position + 4]
723✔
829
            .try_into()
723✔
830
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
723✔
831
    );
832
    let created_at = u64::from_le_bytes(
723✔
833
        payload[position + 4..position + 12]
723✔
834
            .try_into()
723✔
835
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
723✔
836
    );
837
    let created_at = created_at.into();
723✔
838
    let segments_count = u32::from_le_bytes(
723✔
839
        payload[position + 12..position + 16]
723✔
840
            .try_into()
723✔
841
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
723✔
842
    );
843
    let current_offset = u64::from_le_bytes(
723✔
844
        payload[position + 16..position + 24]
723✔
845
            .try_into()
723✔
846
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
723✔
847
    );
848
    let size_bytes = u64::from_le_bytes(
723✔
849
        payload[position + 24..position + 32]
723✔
850
            .try_into()
723✔
851
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
723✔
852
    )
853
    .into();
723✔
854
    let messages_count = u64::from_le_bytes(
723✔
855
        payload[position + 32..position + 40]
723✔
856
            .try_into()
723✔
857
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
723✔
858
    );
859
    let read_bytes = 4 + 8 + 4 + 8 + 8 + 8;
723✔
860
    Ok((
723✔
861
        Partition {
723✔
862
            id,
723✔
863
            created_at,
723✔
864
            segments_count,
723✔
865
            current_offset,
723✔
866
            size: size_bytes,
723✔
867
            messages_count,
723✔
868
        },
723✔
869
        read_bytes,
723✔
870
    ))
723✔
871
}
723✔
872

873
pub fn map_consumer_groups(payload: Bytes) -> Result<Vec<ConsumerGroup>, IggyError> {
24✔
874
    if payload.is_empty() {
24✔
875
        return Ok(EMPTY_CONSUMER_GROUPS);
10✔
876
    }
14✔
877

14✔
878
    let mut consumer_groups = Vec::new();
14✔
879
    let length = payload.len();
14✔
880
    let mut position = 0;
14✔
881
    while position < length {
28✔
882
        let (consumer_group, read_bytes) = map_to_consumer_group(payload.clone(), position)?;
14✔
883
        consumer_groups.push(consumer_group);
14✔
884
        position += read_bytes;
14✔
885
    }
886
    consumer_groups.sort_by(|x, y| x.id.cmp(&y.id));
14✔
887
    Ok(consumer_groups)
14✔
888
}
24✔
889

890
pub fn map_consumer_group(payload: Bytes) -> Result<ConsumerGroupDetails, IggyError> {
78✔
891
    let (consumer_group, mut position) = map_to_consumer_group(payload.clone(), 0)?;
78✔
892
    let mut members = Vec::new();
78✔
893
    let length = payload.len();
78✔
894
    while position < length {
114✔
895
        let (member, read_bytes) = map_to_consumer_group_member(payload.clone(), position)?;
36✔
896
        members.push(member);
36✔
897
        position += read_bytes;
36✔
898
    }
899
    members.sort_by(|x, y| x.id.cmp(&y.id));
78✔
900
    let consumer_group_details = ConsumerGroupDetails {
78✔
901
        id: consumer_group.id,
78✔
902
        name: consumer_group.name,
78✔
903
        partitions_count: consumer_group.partitions_count,
78✔
904
        members_count: consumer_group.members_count,
78✔
905
        members,
78✔
906
    };
78✔
907
    Ok(consumer_group_details)
78✔
908
}
78✔
909

910
fn map_to_consumer_group(
92✔
911
    payload: Bytes,
92✔
912
    position: usize,
92✔
913
) -> Result<(ConsumerGroup, usize), IggyError> {
92✔
914
    let id = u32::from_le_bytes(
92✔
915
        payload[position..position + 4]
92✔
916
            .try_into()
92✔
917
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
92✔
918
    );
919
    let partitions_count = u32::from_le_bytes(
92✔
920
        payload[position + 4..position + 8]
92✔
921
            .try_into()
92✔
922
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
92✔
923
    );
924
    let members_count = u32::from_le_bytes(
92✔
925
        payload[position + 8..position + 12]
92✔
926
            .try_into()
92✔
927
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
92✔
928
    );
929
    let name_length = payload[position + 12];
92✔
930
    let name = from_utf8(&payload[position + 13..position + 13 + name_length as usize])
92✔
931
        .map_err(|_| IggyError::InvalidUtf8)?
92✔
932
        .to_string();
92✔
933
    let read_bytes = 13 + name_length as usize;
92✔
934
    Ok((
92✔
935
        ConsumerGroup {
92✔
936
            id,
92✔
937
            partitions_count,
92✔
938
            members_count,
92✔
939
            name,
92✔
940
        },
92✔
941
        read_bytes,
92✔
942
    ))
92✔
943
}
92✔
944

945
fn map_to_consumer_group_member(
36✔
946
    payload: Bytes,
36✔
947
    position: usize,
36✔
948
) -> Result<(ConsumerGroupMember, usize), IggyError> {
36✔
949
    let id = u32::from_le_bytes(
36✔
950
        payload[position..position + 4]
36✔
951
            .try_into()
36✔
952
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
36✔
953
    );
954
    let partitions_count = u32::from_le_bytes(
36✔
955
        payload[position + 4..position + 8]
36✔
956
            .try_into()
36✔
957
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
36✔
958
    );
959
    let mut partitions = Vec::new();
36✔
960
    for i in 0..partitions_count {
54✔
961
        let partition_id = u32::from_le_bytes(
54✔
962
            payload[position + 8 + (i * 4) as usize..position + 8 + ((i + 1) * 4) as usize]
54✔
963
                .try_into()
54✔
964
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
54✔
965
        );
966
        partitions.push(partition_id);
54✔
967
    }
968

969
    let read_bytes = (4 + 4 + partitions_count * 4) as usize;
36✔
970
    Ok((
36✔
971
        ConsumerGroupMember {
36✔
972
            id,
36✔
973
            partitions_count,
36✔
974
            partitions,
36✔
975
        },
36✔
976
        read_bytes,
36✔
977
    ))
36✔
978
}
36✔
979

980
fn map_to_client_info(
43✔
981
    payload: Bytes,
43✔
982
    mut position: usize,
43✔
983
) -> Result<(ClientInfo, usize), IggyError> {
43✔
984
    let mut read_bytes;
985
    let client_id = u32::from_le_bytes(
43✔
986
        payload[position..position + 4]
43✔
987
            .try_into()
43✔
988
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
43✔
989
    );
990
    let user_id = u32::from_le_bytes(
43✔
991
        payload[position + 4..position + 8]
43✔
992
            .try_into()
43✔
993
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
43✔
994
    );
995
    let user_id = match user_id {
43✔
996
        0 => None,
×
997
        _ => Some(user_id),
43✔
998
    };
999

1000
    let transport = payload[position + 8];
43✔
1001
    let transport = match transport {
43✔
1002
        1 => "TCP",
30✔
1003
        2 => "QUIC",
13✔
1004
        _ => "Unknown",
×
1005
    }
1006
    .to_string();
43✔
1007

1008
    let address_length = u32::from_le_bytes(
43✔
1009
        payload[position + 9..position + 13]
43✔
1010
            .try_into()
43✔
1011
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
43✔
1012
    ) as usize;
1013
    let address = from_utf8(&payload[position + 13..position + 13 + address_length])
43✔
1014
        .map_err(|_| IggyError::InvalidUtf8)?
43✔
1015
        .to_string();
43✔
1016
    read_bytes = 4 + 4 + 1 + 4 + address_length;
43✔
1017
    position += read_bytes;
43✔
1018
    let consumer_groups_count = u32::from_le_bytes(
43✔
1019
        payload[position..position + 4]
43✔
1020
            .try_into()
43✔
1021
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
43✔
1022
    );
1023
    read_bytes += 4;
43✔
1024
    Ok((
43✔
1025
        ClientInfo {
43✔
1026
            client_id,
43✔
1027
            user_id,
43✔
1028
            address,
43✔
1029
            transport,
43✔
1030
            consumer_groups_count,
43✔
1031
        },
43✔
1032
        read_bytes,
43✔
1033
    ))
43✔
1034
}
43✔
1035

1036
fn map_to_user_info(payload: Bytes, position: usize) -> Result<(UserInfo, usize), IggyError> {
118✔
1037
    let id = u32::from_le_bytes(
118✔
1038
        payload[position..position + 4]
118✔
1039
            .try_into()
118✔
1040
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
118✔
1041
    );
1042
    let created_at = u64::from_le_bytes(
118✔
1043
        payload[position + 4..position + 12]
118✔
1044
            .try_into()
118✔
1045
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
118✔
1046
    );
1047
    let created_at = created_at.into();
118✔
1048
    let status = payload[position + 12];
118✔
1049
    let status = UserStatus::from_code(status)?;
118✔
1050
    let username_length = payload[position + 13];
118✔
1051
    let username = from_utf8(&payload[position + 14..position + 14 + username_length as usize])
118✔
1052
        .map_err(|_| IggyError::InvalidUtf8)?
118✔
1053
        .to_string();
118✔
1054
    let read_bytes = 4 + 8 + 1 + 1 + username_length as usize;
118✔
1055

118✔
1056
    Ok((
118✔
1057
        UserInfo {
118✔
1058
            id,
118✔
1059
            created_at,
118✔
1060
            status,
118✔
1061
            username,
118✔
1062
        },
118✔
1063
        read_bytes,
118✔
1064
    ))
118✔
1065
}
118✔
1066

1067
fn map_to_pat_info(
16✔
1068
    payload: Bytes,
16✔
1069
    position: usize,
16✔
1070
) -> Result<(PersonalAccessTokenInfo, usize), IggyError> {
16✔
1071
    let name_length = payload[position];
16✔
1072
    let name = from_utf8(&payload[position + 1..position + 1 + name_length as usize])
16✔
1073
        .map_err(|_| IggyError::InvalidUtf8)?
16✔
1074
        .to_string();
16✔
1075
    let position = position + 1 + name_length as usize;
16✔
1076
    let expiry_at = u64::from_le_bytes(
16✔
1077
        payload[position..position + 8]
16✔
1078
            .try_into()
16✔
1079
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
16✔
1080
    );
1081
    let expiry_at = match expiry_at {
16✔
1082
        0 => None,
7✔
1083
        value => Some(value.into()),
9✔
1084
    };
1085
    let read_bytes = 1 + name_length as usize + 8;
16✔
1086
    Ok((PersonalAccessTokenInfo { name, expiry_at }, read_bytes))
16✔
1087
}
16✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc