• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

iggy-rs / iggy / 13008523553

28 Jan 2025 10:33AM UTC coverage: 75.951% (-0.02%) from 75.97%
13008523553

push

github

web-flow
Improve `Stats` payload decoding, make `format_params` public, improve scripts (#1465)

This commit enhances the `map_stats` function in `mapper.rs` by adding
safe decoding
for hostname, OS name, OS version, kernel version, server version, and
server semver.
The changes ensure that the function checks for sufficient bytes in the
payload
before attempting to decode each field, preventing potential errors due
to
insufficient data. Default values are used for fields that may not be
present
in older server payloads, ensuring backward compatibility.

Besdies that, method format_params is now public and performance suite
scripts
have better env variables handling.

84 of 108 new or added lines in 1 file covered. (77.78%)

1 existing line in 1 file now uncovered.

24883 of 32762 relevant lines covered (75.95%)

23819.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.77
/sdk/src/binary/mapper.rs
1
use crate::bytes_serializable::BytesSerializable;
2
use crate::compression::compression_algorithm::CompressionAlgorithm;
3
use crate::error::IggyError;
4
use crate::models::client_info::{ClientInfo, ClientInfoDetails, ConsumerGroupInfo};
5
use crate::models::consumer_group::{ConsumerGroup, ConsumerGroupDetails, ConsumerGroupMember};
6
use crate::models::consumer_offset_info::ConsumerOffsetInfo;
7
use crate::models::identity_info::IdentityInfo;
8
use crate::models::messages::{MessageState, PolledMessage, PolledMessages};
9
use crate::models::partition::Partition;
10
use crate::models::permissions::Permissions;
11
use crate::models::personal_access_token::{PersonalAccessTokenInfo, RawPersonalAccessToken};
12
use crate::models::stats::Stats;
13
use crate::models::stream::{Stream, StreamDetails};
14
use crate::models::topic::{Topic, TopicDetails};
15
use crate::models::user_info::{UserInfo, UserInfoDetails};
16
use crate::models::user_status::UserStatus;
17
use crate::utils::byte_size::IggyByteSize;
18
use crate::utils::expiry::IggyExpiry;
19
use crate::utils::topic_size::MaxTopicSize;
20
use bytes::Bytes;
21
use std::collections::HashMap;
22
use std::str::from_utf8;
23

24
const EMPTY_MESSAGES: Vec<PolledMessage> = vec![];
25
const EMPTY_TOPICS: Vec<Topic> = vec![];
26
const EMPTY_STREAMS: Vec<Stream> = vec![];
27
const EMPTY_CLIENTS: Vec<ClientInfo> = vec![];
28
const EMPTY_USERS: Vec<UserInfo> = vec![];
29
const EMPTY_PERSONAL_ACCESS_TOKENS: Vec<PersonalAccessTokenInfo> = vec![];
30
const EMPTY_CONSUMER_GROUPS: Vec<ConsumerGroup> = vec![];
31

32
pub fn map_stats(payload: Bytes) -> Result<Stats, IggyError> {
20✔
33
    let process_id = u32::from_le_bytes(
20✔
34
        payload[..4]
20✔
35
            .try_into()
20✔
36
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
37
    );
38
    let cpu_usage = f32::from_le_bytes(
20✔
39
        payload[4..8]
20✔
40
            .try_into()
20✔
41
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
42
    );
43
    let total_cpu_usage = f32::from_le_bytes(
20✔
44
        payload[8..12]
20✔
45
            .try_into()
20✔
46
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
47
    );
48
    let memory_usage = u64::from_le_bytes(
20✔
49
        payload[12..20]
20✔
50
            .try_into()
20✔
51
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
52
    )
53
    .into();
20✔
54
    let total_memory = u64::from_le_bytes(
20✔
55
        payload[20..28]
20✔
56
            .try_into()
20✔
57
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
58
    )
59
    .into();
20✔
60
    let available_memory = u64::from_le_bytes(
20✔
61
        payload[28..36]
20✔
62
            .try_into()
20✔
63
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
64
    )
65
    .into();
20✔
66
    let run_time = u64::from_le_bytes(
20✔
67
        payload[36..44]
20✔
68
            .try_into()
20✔
69
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
70
    )
71
    .into();
20✔
72
    let start_time = u64::from_le_bytes(
20✔
73
        payload[44..52]
20✔
74
            .try_into()
20✔
75
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
76
    )
77
    .into();
20✔
78
    let read_bytes = u64::from_le_bytes(
20✔
79
        payload[52..60]
20✔
80
            .try_into()
20✔
81
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
82
    )
83
    .into();
20✔
84
    let written_bytes = u64::from_le_bytes(
20✔
85
        payload[60..68]
20✔
86
            .try_into()
20✔
87
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
88
    )
89
    .into();
20✔
90
    let messages_size_bytes = u64::from_le_bytes(
20✔
91
        payload[68..76]
20✔
92
            .try_into()
20✔
93
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
94
    )
95
    .into();
20✔
96
    let streams_count = u32::from_le_bytes(
20✔
97
        payload[76..80]
20✔
98
            .try_into()
20✔
99
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
100
    );
101
    let topics_count = u32::from_le_bytes(
20✔
102
        payload[80..84]
20✔
103
            .try_into()
20✔
104
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
105
    );
106
    let partitions_count = u32::from_le_bytes(
20✔
107
        payload[84..88]
20✔
108
            .try_into()
20✔
109
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
110
    );
111
    let segments_count = u32::from_le_bytes(
20✔
112
        payload[88..92]
20✔
113
            .try_into()
20✔
114
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
115
    );
116
    let messages_count = u64::from_le_bytes(
20✔
117
        payload[92..100]
20✔
118
            .try_into()
20✔
119
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
120
    );
121
    let clients_count = u32::from_le_bytes(
20✔
122
        payload[100..104]
20✔
123
            .try_into()
20✔
124
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
125
    );
126
    let consumer_groups_count = u32::from_le_bytes(
20✔
127
        payload[104..108]
20✔
128
            .try_into()
20✔
129
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
130
    );
131

132
    let mut current_position = 108;
20✔
133

20✔
134
    //
20✔
135
    // Safely decode hostname
20✔
136
    //
20✔
137
    if current_position + 4 > payload.len() {
20✔
NEW
138
        return Err(IggyError::InvalidNumberEncoding);
×
139
    }
20✔
140
    let hostname_length = u32::from_le_bytes(
20✔
141
        payload[current_position..current_position + 4]
20✔
142
            .try_into()
20✔
143
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
144
    ) as usize;
145
    current_position += 4;
20✔
146
    if current_position + hostname_length > payload.len() {
20✔
NEW
147
        return Err(IggyError::InvalidNumberEncoding);
×
148
    }
20✔
149
    let hostname = from_utf8(&payload[current_position..current_position + hostname_length])
20✔
150
        .map_err(|_| IggyError::InvalidUtf8)?
20✔
151
        .to_string();
20✔
152
    current_position += hostname_length;
20✔
153

20✔
154
    //
20✔
155
    // Safely Decode OS name
20✔
156
    //
20✔
157
    if current_position + 4 > payload.len() {
20✔
NEW
158
        return Err(IggyError::InvalidNumberEncoding);
×
159
    }
20✔
160
    let os_name_length = u32::from_le_bytes(
20✔
161
        payload[current_position..current_position + 4]
20✔
162
            .try_into()
20✔
163
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
164
    ) as usize;
165
    current_position += 4;
20✔
166
    if current_position + os_name_length > payload.len() {
20✔
NEW
167
        return Err(IggyError::InvalidNumberEncoding);
×
168
    }
20✔
169
    let os_name = from_utf8(&payload[current_position..current_position + os_name_length])
20✔
170
        .map_err(|_| IggyError::InvalidUtf8)?
20✔
171
        .to_string();
20✔
172
    current_position += os_name_length;
20✔
173

20✔
174
    //
20✔
175
    // Safely decode OS version
20✔
176
    //
20✔
177
    if current_position + 4 > payload.len() {
20✔
NEW
178
        return Err(IggyError::InvalidNumberEncoding);
×
179
    }
20✔
180
    let os_version_length = u32::from_le_bytes(
20✔
181
        payload[current_position..current_position + 4]
20✔
182
            .try_into()
20✔
183
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
184
    ) as usize;
185
    current_position += 4;
20✔
186
    if current_position + os_version_length > payload.len() {
20✔
NEW
187
        return Err(IggyError::InvalidNumberEncoding);
×
188
    }
20✔
189
    let os_version = from_utf8(&payload[current_position..current_position + os_version_length])
20✔
190
        .map_err(|_| IggyError::InvalidUtf8)?
20✔
191
        .to_string();
20✔
192
    current_position += os_version_length;
20✔
193

20✔
194
    //
20✔
195
    // Safely decode kernel version (NEW) + server version (NEW) + server semver (NEW)
20✔
196
    // We'll check if there's enough bytes before reading each new field.
20✔
197
    //
20✔
198

20✔
199
    // Default them in case payload doesn't have them (older server)
20✔
200
    let mut kernel_version = String::new();
20✔
201
    let mut iggy_server_version = String::new();
20✔
202
    let mut iggy_server_semver: Option<u32> = None;
20✔
203

20✔
204
    // kernel_version (if it exists)
20✔
205
    if current_position + 4 <= payload.len() {
20✔
206
        let kernel_version_length = u32::from_le_bytes(
20✔
207
            payload[current_position..current_position + 4]
20✔
208
                .try_into()
20✔
209
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
210
        ) as usize;
211
        current_position += 4;
20✔
212
        if current_position + kernel_version_length <= payload.len() {
20✔
213
            let kv =
20✔
214
                from_utf8(&payload[current_position..current_position + kernel_version_length])
20✔
215
                    .map_err(|_| IggyError::InvalidUtf8)?
20✔
216
                    .to_string();
20✔
217
            kernel_version = kv;
20✔
218
            current_position += kernel_version_length;
20✔
NEW
219
        } else {
×
NEW
220
            // Not enough bytes for kernel version string, treat as empty or error out
×
NEW
221
            // return Err(IggyError::InvalidNumberEncoding);
×
NEW
222
            kernel_version = String::new(); // fallback
×
NEW
223
        }
×
UNCOV
224
    } else {
×
NEW
225
        // This means older server didn't send kernel_version, so remain empty
×
NEW
226
    }
×
227

228
    // iggy_server_version (if it exists)
229
    if current_position + 4 <= payload.len() {
20✔
230
        let iggy_version_length = u32::from_le_bytes(
20✔
231
            payload[current_position..current_position + 4]
20✔
232
                .try_into()
20✔
233
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
234
        ) as usize;
235
        current_position += 4;
20✔
236
        if current_position + iggy_version_length <= payload.len() {
20✔
237
            let iv = from_utf8(&payload[current_position..current_position + iggy_version_length])
20✔
238
                .map_err(|_| IggyError::InvalidUtf8)?
20✔
239
                .to_string();
20✔
240
            iggy_server_version = iv;
20✔
241
            current_position += iggy_version_length;
20✔
NEW
242
        } else {
×
NEW
243
            // Not enough bytes for iggy version string, treat as empty or error out
×
NEW
244
            // return Err(IggyError::InvalidNumberEncoding);
×
NEW
245
            iggy_server_version = String::new(); // fallback
×
NEW
246
        }
×
NEW
247
    } else {
×
NEW
248
        // older server didn't send iggy_server_version, so remain empty
×
NEW
249
    }
×
250

251
    // iggy_server_semver (if it exists)
252
    if current_position + 4 <= payload.len() {
20✔
253
        let semver = u32::from_le_bytes(
20✔
254
            payload[current_position..current_position + 4]
20✔
255
                .try_into()
20✔
256
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
20✔
257
        );
258
        // current_position += 4; // uncomment this when adding new fields
259
        if semver != 0 {
20✔
260
            iggy_server_semver = Some(semver);
20✔
261
        }
20✔
NEW
262
    } else {
×
NEW
263
        // older server didn't send semver
×
NEW
264
    }
×
265

266
    Ok(Stats {
20✔
267
        process_id,
20✔
268
        cpu_usage,
20✔
269
        total_cpu_usage,
20✔
270
        memory_usage,
20✔
271
        total_memory,
20✔
272
        available_memory,
20✔
273
        run_time,
20✔
274
        start_time,
20✔
275
        read_bytes,
20✔
276
        written_bytes,
20✔
277
        messages_size_bytes,
20✔
278
        streams_count,
20✔
279
        topics_count,
20✔
280
        partitions_count,
20✔
281
        segments_count,
20✔
282
        messages_count,
20✔
283
        clients_count,
20✔
284
        consumer_groups_count,
20✔
285
        hostname,
20✔
286
        os_name,
20✔
287
        os_version,
20✔
288
        kernel_version,
20✔
289
        iggy_server_version,
20✔
290
        iggy_server_semver,
20✔
291
    })
20✔
292
}
20✔
293

294
pub fn map_consumer_offset(payload: Bytes) -> Result<ConsumerOffsetInfo, IggyError> {
22✔
295
    let partition_id = u32::from_le_bytes(
22✔
296
        payload[..4]
22✔
297
            .try_into()
22✔
298
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
22✔
299
    );
300
    let current_offset = u64::from_le_bytes(
22✔
301
        payload[4..12]
22✔
302
            .try_into()
22✔
303
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
22✔
304
    );
305
    let stored_offset = u64::from_le_bytes(
22✔
306
        payload[12..20]
22✔
307
            .try_into()
22✔
308
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
22✔
309
    );
310
    Ok(ConsumerOffsetInfo {
22✔
311
        partition_id,
22✔
312
        current_offset,
22✔
313
        stored_offset,
22✔
314
    })
22✔
315
}
22✔
316

317
pub fn map_user(payload: Bytes) -> Result<UserInfoDetails, IggyError> {
88✔
318
    let (user, position) = map_to_user_info(payload.clone(), 0)?;
88✔
319
    let has_permissions = payload[position];
88✔
320
    let permissions = if has_permissions == 1 {
88✔
321
        let permissions_length = u32::from_le_bytes(
41✔
322
            payload[position + 1..position + 5]
41✔
323
                .try_into()
41✔
324
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
41✔
325
        ) as usize;
326
        let permissions = payload.slice(position + 5..position + 5 + permissions_length);
41✔
327
        Some(Permissions::from_bytes(permissions)?)
41✔
328
    } else {
329
        None
47✔
330
    };
331

332
    let user = UserInfoDetails {
88✔
333
        id: user.id,
88✔
334
        created_at: user.created_at,
88✔
335
        status: user.status,
88✔
336
        username: user.username,
88✔
337
        permissions,
88✔
338
    };
88✔
339
    Ok(user)
88✔
340
}
88✔
341

342
pub fn map_users(payload: Bytes) -> Result<Vec<UserInfo>, IggyError> {
24✔
343
    if payload.is_empty() {
24✔
344
        return Ok(EMPTY_USERS);
×
345
    }
24✔
346

24✔
347
    let mut users = Vec::new();
24✔
348
    let length = payload.len();
24✔
349
    let mut position = 0;
24✔
350
    while position < length {
54✔
351
        let (user, read_bytes) = map_to_user_info(payload.clone(), position)?;
30✔
352
        users.push(user);
30✔
353
        position += read_bytes;
30✔
354
    }
355
    users.sort_by(|x, y| x.id.cmp(&y.id));
24✔
356
    Ok(users)
24✔
357
}
24✔
358

359
pub fn map_personal_access_tokens(
24✔
360
    payload: Bytes,
24✔
361
) -> Result<Vec<PersonalAccessTokenInfo>, IggyError> {
24✔
362
    if payload.is_empty() {
24✔
363
        return Ok(EMPTY_PERSONAL_ACCESS_TOKENS);
10✔
364
    }
14✔
365

14✔
366
    let mut personal_access_tokens = Vec::new();
14✔
367
    let length = payload.len();
14✔
368
    let mut position = 0;
14✔
369
    while position < length {
30✔
370
        let (personal_access_token, read_bytes) = map_to_pat_info(payload.clone(), position)?;
16✔
371
        personal_access_tokens.push(personal_access_token);
16✔
372
        position += read_bytes;
16✔
373
    }
374
    personal_access_tokens.sort_by(|x, y| x.name.cmp(&y.name));
14✔
375
    Ok(personal_access_tokens)
14✔
376
}
24✔
377

378
pub fn map_identity_info(payload: Bytes) -> Result<IdentityInfo, IggyError> {
443✔
379
    let user_id = u32::from_le_bytes(
443✔
380
        payload[..4]
443✔
381
            .try_into()
443✔
382
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
443✔
383
    );
384
    Ok(IdentityInfo {
443✔
385
        user_id,
443✔
386
        access_token: None,
443✔
387
    })
443✔
388
}
443✔
389

390
pub fn map_raw_pat(payload: Bytes) -> Result<RawPersonalAccessToken, IggyError> {
18✔
391
    let token_length = payload[0];
18✔
392
    let token = from_utf8(&payload[1..1 + token_length as usize])
18✔
393
        .map_err(|_| IggyError::InvalidUtf8)?
18✔
394
        .to_string();
18✔
395
    Ok(RawPersonalAccessToken { token })
18✔
396
}
18✔
397

398
pub fn map_client(payload: Bytes) -> Result<ClientInfoDetails, IggyError> {
35✔
399
    let (client, mut position) = map_to_client_info(payload.clone(), 0)?;
35✔
400
    let mut consumer_groups = Vec::new();
35✔
401
    let length = payload.len();
35✔
402
    while position < length {
53✔
403
        for _ in 0..client.consumer_groups_count {
18✔
404
            let stream_id = u32::from_le_bytes(
18✔
405
                payload[position..position + 4]
18✔
406
                    .try_into()
18✔
407
                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
18✔
408
            );
409
            let topic_id = u32::from_le_bytes(
18✔
410
                payload[position + 4..position + 8]
18✔
411
                    .try_into()
18✔
412
                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
18✔
413
            );
414
            let group_id = u32::from_le_bytes(
18✔
415
                payload[position + 8..position + 12]
18✔
416
                    .try_into()
18✔
417
                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
18✔
418
            );
419
            let consumer_group = ConsumerGroupInfo {
18✔
420
                stream_id,
18✔
421
                topic_id,
18✔
422
                group_id,
18✔
423
            };
18✔
424
            consumer_groups.push(consumer_group);
18✔
425
            position += 12;
18✔
426
        }
427
    }
428

429
    consumer_groups.sort_by(|x, y| x.group_id.cmp(&y.group_id));
35✔
430
    let client = ClientInfoDetails {
35✔
431
        client_id: client.client_id,
35✔
432
        user_id: client.user_id,
35✔
433
        address: client.address,
35✔
434
        transport: client.transport,
35✔
435
        consumer_groups_count: client.consumer_groups_count,
35✔
436
        consumer_groups,
35✔
437
    };
35✔
438
    Ok(client)
35✔
439
}
35✔
440

441
pub fn map_clients(payload: Bytes) -> Result<Vec<ClientInfo>, IggyError> {
5✔
442
    if payload.is_empty() {
5✔
443
        return Ok(EMPTY_CLIENTS);
×
444
    }
5✔
445

5✔
446
    let mut clients = Vec::new();
5✔
447
    let length = payload.len();
5✔
448
    let mut position = 0;
5✔
449
    while position < length {
13✔
450
        let (client, read_bytes) = map_to_client_info(payload.clone(), position)?;
8✔
451
        clients.push(client);
8✔
452
        position += read_bytes;
8✔
453
    }
454
    clients.sort_by(|x, y| x.client_id.cmp(&y.client_id));
5✔
455
    Ok(clients)
5✔
456
}
5✔
457

458
pub fn map_polled_messages(payload: Bytes) -> Result<PolledMessages, IggyError> {
67,013✔
459
    if payload.is_empty() {
67,013✔
460
        return Ok(PolledMessages {
×
461
            messages: EMPTY_MESSAGES,
×
462
            partition_id: 0,
×
463
            current_offset: 0,
×
464
        });
×
465
    }
67,013✔
466

67,013✔
467
    let length = payload.len();
67,013✔
468
    let partition_id = u32::from_le_bytes(
67,013✔
469
        payload[..4]
67,013✔
470
            .try_into()
67,013✔
471
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
67,013✔
472
    );
473
    let current_offset = u64::from_le_bytes(
67,013✔
474
        payload[4..12]
67,013✔
475
            .try_into()
67,013✔
476
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
67,013✔
477
    );
478
    // Currently ignored
479
    let _messages_count = u32::from_le_bytes(
67,013✔
480
        payload[12..16]
67,013✔
481
            .try_into()
67,013✔
482
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
67,013✔
483
    );
484
    let mut position = 16;
67,013✔
485
    let mut messages = Vec::new();
67,013✔
486
    while position < length {
1,798,276✔
487
        let offset = u64::from_le_bytes(
1,764,737✔
488
            payload[position..position + 8]
1,764,737✔
489
                .try_into()
1,764,737✔
490
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
1,764,737✔
491
        );
492
        let state = MessageState::from_code(payload[position + 8])?;
1,764,737✔
493
        let timestamp = u64::from_le_bytes(
1,764,737✔
494
            payload[position + 9..position + 17]
1,764,737✔
495
                .try_into()
1,764,737✔
496
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
1,764,737✔
497
        );
498
        let id = u128::from_le_bytes(
1,764,737✔
499
            payload[position + 17..position + 33]
1,764,737✔
500
                .try_into()
1,764,737✔
501
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
1,764,737✔
502
        );
503
        let checksum = u32::from_le_bytes(
1,764,737✔
504
            payload[position + 33..position + 37]
1,764,737✔
505
                .try_into()
1,764,737✔
506
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
1,764,737✔
507
        );
508
        let headers_length = u32::from_le_bytes(
1,764,737✔
509
            payload[position + 37..position + 41]
1,764,737✔
510
                .try_into()
1,764,737✔
511
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
1,764,737✔
512
        );
513
        let headers = if headers_length > 0 {
1,764,737✔
514
            let headers_payload =
4,112✔
515
                payload.slice(position + 41..position + 41 + headers_length as usize);
4,112✔
516
            Some(HashMap::from_bytes(headers_payload)?)
4,112✔
517
        } else {
518
            None
1,760,625✔
519
        };
520
        position += headers_length as usize;
1,764,737✔
521
        let message_length = u32::from_le_bytes(
1,764,737✔
522
            payload[position + 41..position + 45]
1,764,737✔
523
                .try_into()
1,764,737✔
524
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
1,764,737✔
525
        );
526
        let payload_range = position + 45..position + 45 + message_length as usize;
1,764,737✔
527
        if payload_range.start > length || payload_range.end > length {
1,764,737✔
528
            break;
×
529
        }
1,764,737✔
530

1,764,737✔
531
        let payload = payload[payload_range].to_vec();
1,764,737✔
532
        let total_size = 45 + message_length as usize;
1,764,737✔
533
        position += total_size;
1,764,737✔
534
        messages.push(PolledMessage {
1,764,737✔
535
            offset,
1,764,737✔
536
            timestamp,
1,764,737✔
537
            state,
1,764,737✔
538
            checksum,
1,764,737✔
539
            id,
1,764,737✔
540
            headers,
1,764,737✔
541
            length: IggyByteSize::from(message_length as u64),
1,764,737✔
542
            payload: Bytes::from(payload),
1,764,737✔
543
        });
1,764,737✔
544

1,764,737✔
545
        if position + 45 >= length {
1,764,737✔
546
            break;
33,474✔
547
        }
1,731,263✔
548
    }
549

550
    messages.sort_by(|x, y| x.offset.cmp(&y.offset));
1,731,263✔
551
    Ok(PolledMessages {
67,013✔
552
        partition_id,
67,013✔
553
        current_offset,
67,013✔
554
        messages,
67,013✔
555
    })
67,013✔
556
}
67,013✔
557

558
pub fn map_streams(payload: Bytes) -> Result<Vec<Stream>, IggyError> {
43✔
559
    if payload.is_empty() {
43✔
560
        return Ok(EMPTY_STREAMS);
30✔
561
    }
13✔
562

13✔
563
    let mut streams = Vec::new();
13✔
564
    let length = payload.len();
13✔
565
    let mut position = 0;
13✔
566
    while position < length {
85✔
567
        let (stream, read_bytes) = map_to_stream(payload.clone(), position)?;
72✔
568
        streams.push(stream);
72✔
569
        position += read_bytes;
72✔
570
    }
571
    streams.sort_by(|x, y| x.id.cmp(&y.id));
162✔
572
    Ok(streams)
13✔
573
}
43✔
574

575
pub fn map_stream(payload: Bytes) -> Result<StreamDetails, IggyError> {
261✔
576
    let (stream, mut position) = map_to_stream(payload.clone(), 0)?;
261✔
577
    let mut topics = Vec::new();
261✔
578
    let length = payload.len();
261✔
579
    while position < length {
302✔
580
        let (topic, read_bytes) = map_to_topic(payload.clone(), position)?;
41✔
581
        topics.push(topic);
41✔
582
        position += read_bytes;
41✔
583
    }
584

585
    topics.sort_by(|x, y| x.id.cmp(&y.id));
261✔
586
    let stream = StreamDetails {
261✔
587
        id: stream.id,
261✔
588
        created_at: stream.created_at,
261✔
589
        topics_count: stream.topics_count,
261✔
590
        size: stream.size,
261✔
591
        messages_count: stream.messages_count,
261✔
592
        name: stream.name,
261✔
593
        topics,
261✔
594
    };
261✔
595
    Ok(stream)
261✔
596
}
261✔
597

598
fn map_to_stream(payload: Bytes, position: usize) -> Result<(Stream, usize), IggyError> {
333✔
599
    let id = u32::from_le_bytes(
333✔
600
        payload[position..position + 4]
333✔
601
            .try_into()
333✔
602
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
333✔
603
    );
604
    let created_at = u64::from_le_bytes(
333✔
605
        payload[position + 4..position + 12]
333✔
606
            .try_into()
333✔
607
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
333✔
608
    )
609
    .into();
333✔
610
    let topics_count = u32::from_le_bytes(
333✔
611
        payload[position + 12..position + 16]
333✔
612
            .try_into()
333✔
613
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
333✔
614
    );
615
    let size_bytes = u64::from_le_bytes(
333✔
616
        payload[position + 16..position + 24]
333✔
617
            .try_into()
333✔
618
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
333✔
619
    )
620
    .into();
333✔
621
    let messages_count = u64::from_le_bytes(
333✔
622
        payload[position + 24..position + 32]
333✔
623
            .try_into()
333✔
624
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
333✔
625
    );
626
    let name_length = payload[position + 32];
333✔
627
    let name = from_utf8(&payload[position + 33..position + 33 + name_length as usize])
333✔
628
        .map_err(|_| IggyError::InvalidUtf8)?
333✔
629
        .to_string();
333✔
630
    let read_bytes = 4 + 8 + 4 + 8 + 8 + 1 + name_length as usize;
333✔
631
    Ok((
333✔
632
        Stream {
333✔
633
            id,
333✔
634
            created_at,
333✔
635
            name,
333✔
636
            size: size_bytes,
333✔
637
            messages_count,
333✔
638
            topics_count,
333✔
639
        },
333✔
640
        read_bytes,
333✔
641
    ))
333✔
642
}
333✔
643

644
pub fn map_topics(payload: Bytes) -> Result<Vec<Topic>, IggyError> {
11✔
645
    if payload.is_empty() {
11✔
646
        return Ok(EMPTY_TOPICS);
6✔
647
    }
5✔
648

5✔
649
    let mut topics = Vec::new();
5✔
650
    let length = payload.len();
5✔
651
    let mut position = 0;
5✔
652
    while position < length {
10✔
653
        let (topic, read_bytes) = map_to_topic(payload.clone(), position)?;
5✔
654
        topics.push(topic);
5✔
655
        position += read_bytes;
5✔
656
    }
657
    topics.sort_by(|x, y| x.id.cmp(&y.id));
5✔
658
    Ok(topics)
5✔
659
}
11✔
660

661
pub fn map_topic(payload: Bytes) -> Result<TopicDetails, IggyError> {
302✔
662
    let (topic, mut position) = map_to_topic(payload.clone(), 0)?;
302✔
663
    let mut partitions = Vec::new();
302✔
664
    let length = payload.len();
302✔
665
    while position < length {
1,025✔
666
        let (partition, read_bytes) = map_to_partition(payload.clone(), position)?;
723✔
667
        partitions.push(partition);
723✔
668
        position += read_bytes;
723✔
669
    }
670

671
    partitions.sort_by(|x, y| x.id.cmp(&y.id));
831✔
672
    let topic = TopicDetails {
302✔
673
        id: topic.id,
302✔
674
        created_at: topic.created_at,
302✔
675
        name: topic.name,
302✔
676
        size: topic.size,
302✔
677
        messages_count: topic.messages_count,
302✔
678
        message_expiry: topic.message_expiry,
302✔
679
        compression_algorithm: topic.compression_algorithm,
302✔
680
        max_topic_size: topic.max_topic_size,
302✔
681
        replication_factor: topic.replication_factor,
302✔
682
        #[allow(clippy::cast_possible_truncation)]
302✔
683
        partitions_count: partitions.len() as u32,
302✔
684
        partitions,
302✔
685
    };
302✔
686
    Ok(topic)
302✔
687
}
302✔
688

689
fn map_to_topic(payload: Bytes, position: usize) -> Result<(Topic, usize), IggyError> {
348✔
690
    let id = u32::from_le_bytes(
348✔
691
        payload[position..position + 4]
348✔
692
            .try_into()
348✔
693
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
348✔
694
    );
695
    let created_at = u64::from_le_bytes(
348✔
696
        payload[position + 4..position + 12]
348✔
697
            .try_into()
348✔
698
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
348✔
699
    );
700
    let created_at = created_at.into();
348✔
701
    let partitions_count = u32::from_le_bytes(
348✔
702
        payload[position + 12..position + 16]
348✔
703
            .try_into()
348✔
704
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
348✔
705
    );
706
    let message_expiry = match u64::from_le_bytes(
348✔
707
        payload[position + 16..position + 24]
348✔
708
            .try_into()
348✔
709
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
348✔
710
    ) {
711
        0 => IggyExpiry::NeverExpire,
×
712
        message_expiry => message_expiry.into(),
348✔
713
    };
714
    let compression_algorithm = CompressionAlgorithm::from_code(payload[position + 24])?;
348✔
715
    let max_topic_size = u64::from_le_bytes(
348✔
716
        payload[position + 25..position + 33]
348✔
717
            .try_into()
348✔
718
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
348✔
719
    );
720
    let max_topic_size: MaxTopicSize = max_topic_size.into();
348✔
721
    let replication_factor = payload[position + 33];
348✔
722
    let size_bytes = IggyByteSize::from(u64::from_le_bytes(
348✔
723
        payload[position + 34..position + 42]
348✔
724
            .try_into()
348✔
725
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
348✔
726
    ));
727
    let messages_count = u64::from_le_bytes(
348✔
728
        payload[position + 42..position + 50]
348✔
729
            .try_into()
348✔
730
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
348✔
731
    );
732
    let name_length = payload[position + 50];
348✔
733
    let name = from_utf8(&payload[position + 51..position + 51 + name_length as usize])
348✔
734
        .map_err(|_| IggyError::InvalidUtf8)?
348✔
735
        .to_string();
348✔
736
    let read_bytes = 4 + 8 + 4 + 8 + 8 + 8 + 8 + 1 + 1 + 1 + name_length as usize;
348✔
737
    Ok((
348✔
738
        Topic {
348✔
739
            id,
348✔
740
            created_at,
348✔
741
            name,
348✔
742
            partitions_count,
348✔
743
            size: size_bytes,
348✔
744
            messages_count,
348✔
745
            message_expiry,
348✔
746
            compression_algorithm,
348✔
747
            max_topic_size,
348✔
748
            replication_factor,
348✔
749
        },
348✔
750
        read_bytes,
348✔
751
    ))
348✔
752
}
348✔
753

754
fn map_to_partition(payload: Bytes, position: usize) -> Result<(Partition, usize), IggyError> {
723✔
755
    let id = u32::from_le_bytes(
723✔
756
        payload[position..position + 4]
723✔
757
            .try_into()
723✔
758
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
723✔
759
    );
760
    let created_at = u64::from_le_bytes(
723✔
761
        payload[position + 4..position + 12]
723✔
762
            .try_into()
723✔
763
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
723✔
764
    );
765
    let created_at = created_at.into();
723✔
766
    let segments_count = u32::from_le_bytes(
723✔
767
        payload[position + 12..position + 16]
723✔
768
            .try_into()
723✔
769
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
723✔
770
    );
771
    let current_offset = u64::from_le_bytes(
723✔
772
        payload[position + 16..position + 24]
723✔
773
            .try_into()
723✔
774
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
723✔
775
    );
776
    let size_bytes = u64::from_le_bytes(
723✔
777
        payload[position + 24..position + 32]
723✔
778
            .try_into()
723✔
779
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
723✔
780
    )
781
    .into();
723✔
782
    let messages_count = u64::from_le_bytes(
723✔
783
        payload[position + 32..position + 40]
723✔
784
            .try_into()
723✔
785
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
723✔
786
    );
787
    let read_bytes = 4 + 8 + 4 + 8 + 8 + 8;
723✔
788
    Ok((
723✔
789
        Partition {
723✔
790
            id,
723✔
791
            created_at,
723✔
792
            segments_count,
723✔
793
            current_offset,
723✔
794
            size: size_bytes,
723✔
795
            messages_count,
723✔
796
        },
723✔
797
        read_bytes,
723✔
798
    ))
723✔
799
}
723✔
800

801
pub fn map_consumer_groups(payload: Bytes) -> Result<Vec<ConsumerGroup>, IggyError> {
24✔
802
    if payload.is_empty() {
24✔
803
        return Ok(EMPTY_CONSUMER_GROUPS);
10✔
804
    }
14✔
805

14✔
806
    let mut consumer_groups = Vec::new();
14✔
807
    let length = payload.len();
14✔
808
    let mut position = 0;
14✔
809
    while position < length {
28✔
810
        let (consumer_group, read_bytes) = map_to_consumer_group(payload.clone(), position)?;
14✔
811
        consumer_groups.push(consumer_group);
14✔
812
        position += read_bytes;
14✔
813
    }
814
    consumer_groups.sort_by(|x, y| x.id.cmp(&y.id));
14✔
815
    Ok(consumer_groups)
14✔
816
}
24✔
817

818
pub fn map_consumer_group(payload: Bytes) -> Result<ConsumerGroupDetails, IggyError> {
78✔
819
    let (consumer_group, mut position) = map_to_consumer_group(payload.clone(), 0)?;
78✔
820
    let mut members = Vec::new();
78✔
821
    let length = payload.len();
78✔
822
    while position < length {
114✔
823
        let (member, read_bytes) = map_to_consumer_group_member(payload.clone(), position)?;
36✔
824
        members.push(member);
36✔
825
        position += read_bytes;
36✔
826
    }
827
    members.sort_by(|x, y| x.id.cmp(&y.id));
78✔
828
    let consumer_group_details = ConsumerGroupDetails {
78✔
829
        id: consumer_group.id,
78✔
830
        name: consumer_group.name,
78✔
831
        partitions_count: consumer_group.partitions_count,
78✔
832
        members_count: consumer_group.members_count,
78✔
833
        members,
78✔
834
    };
78✔
835
    Ok(consumer_group_details)
78✔
836
}
78✔
837

838
fn map_to_consumer_group(
92✔
839
    payload: Bytes,
92✔
840
    position: usize,
92✔
841
) -> Result<(ConsumerGroup, usize), IggyError> {
92✔
842
    let id = u32::from_le_bytes(
92✔
843
        payload[position..position + 4]
92✔
844
            .try_into()
92✔
845
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
92✔
846
    );
847
    let partitions_count = u32::from_le_bytes(
92✔
848
        payload[position + 4..position + 8]
92✔
849
            .try_into()
92✔
850
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
92✔
851
    );
852
    let members_count = u32::from_le_bytes(
92✔
853
        payload[position + 8..position + 12]
92✔
854
            .try_into()
92✔
855
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
92✔
856
    );
857
    let name_length = payload[position + 12];
92✔
858
    let name = from_utf8(&payload[position + 13..position + 13 + name_length as usize])
92✔
859
        .map_err(|_| IggyError::InvalidUtf8)?
92✔
860
        .to_string();
92✔
861
    let read_bytes = 13 + name_length as usize;
92✔
862
    Ok((
92✔
863
        ConsumerGroup {
92✔
864
            id,
92✔
865
            partitions_count,
92✔
866
            members_count,
92✔
867
            name,
92✔
868
        },
92✔
869
        read_bytes,
92✔
870
    ))
92✔
871
}
92✔
872

873
fn map_to_consumer_group_member(
36✔
874
    payload: Bytes,
36✔
875
    position: usize,
36✔
876
) -> Result<(ConsumerGroupMember, usize), IggyError> {
36✔
877
    let id = u32::from_le_bytes(
36✔
878
        payload[position..position + 4]
36✔
879
            .try_into()
36✔
880
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
36✔
881
    );
882
    let partitions_count = u32::from_le_bytes(
36✔
883
        payload[position + 4..position + 8]
36✔
884
            .try_into()
36✔
885
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
36✔
886
    );
887
    let mut partitions = Vec::new();
36✔
888
    for i in 0..partitions_count {
54✔
889
        let partition_id = u32::from_le_bytes(
54✔
890
            payload[position + 8 + (i * 4) as usize..position + 8 + ((i + 1) * 4) as usize]
54✔
891
                .try_into()
54✔
892
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
54✔
893
        );
894
        partitions.push(partition_id);
54✔
895
    }
896

897
    let read_bytes = (4 + 4 + partitions_count * 4) as usize;
36✔
898
    Ok((
36✔
899
        ConsumerGroupMember {
36✔
900
            id,
36✔
901
            partitions_count,
36✔
902
            partitions,
36✔
903
        },
36✔
904
        read_bytes,
36✔
905
    ))
36✔
906
}
36✔
907

908
fn map_to_client_info(
43✔
909
    payload: Bytes,
43✔
910
    mut position: usize,
43✔
911
) -> Result<(ClientInfo, usize), IggyError> {
43✔
912
    let mut read_bytes;
913
    let client_id = u32::from_le_bytes(
43✔
914
        payload[position..position + 4]
43✔
915
            .try_into()
43✔
916
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
43✔
917
    );
918
    let user_id = u32::from_le_bytes(
43✔
919
        payload[position + 4..position + 8]
43✔
920
            .try_into()
43✔
921
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
43✔
922
    );
923
    let user_id = match user_id {
43✔
924
        0 => None,
×
925
        _ => Some(user_id),
43✔
926
    };
927

928
    let transport = payload[position + 8];
43✔
929
    let transport = match transport {
43✔
930
        1 => "TCP",
30✔
931
        2 => "QUIC",
13✔
932
        _ => "Unknown",
×
933
    }
934
    .to_string();
43✔
935

936
    let address_length = u32::from_le_bytes(
43✔
937
        payload[position + 9..position + 13]
43✔
938
            .try_into()
43✔
939
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
43✔
940
    ) as usize;
941
    let address = from_utf8(&payload[position + 13..position + 13 + address_length])
43✔
942
        .map_err(|_| IggyError::InvalidUtf8)?
43✔
943
        .to_string();
43✔
944
    read_bytes = 4 + 4 + 1 + 4 + address_length;
43✔
945
    position += read_bytes;
43✔
946
    let consumer_groups_count = u32::from_le_bytes(
43✔
947
        payload[position..position + 4]
43✔
948
            .try_into()
43✔
949
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
43✔
950
    );
951
    read_bytes += 4;
43✔
952
    Ok((
43✔
953
        ClientInfo {
43✔
954
            client_id,
43✔
955
            user_id,
43✔
956
            address,
43✔
957
            transport,
43✔
958
            consumer_groups_count,
43✔
959
        },
43✔
960
        read_bytes,
43✔
961
    ))
43✔
962
}
43✔
963

964
fn map_to_user_info(payload: Bytes, position: usize) -> Result<(UserInfo, usize), IggyError> {
118✔
965
    let id = u32::from_le_bytes(
118✔
966
        payload[position..position + 4]
118✔
967
            .try_into()
118✔
968
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
118✔
969
    );
970
    let created_at = u64::from_le_bytes(
118✔
971
        payload[position + 4..position + 12]
118✔
972
            .try_into()
118✔
973
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
118✔
974
    );
975
    let created_at = created_at.into();
118✔
976
    let status = payload[position + 12];
118✔
977
    let status = UserStatus::from_code(status)?;
118✔
978
    let username_length = payload[position + 13];
118✔
979
    let username = from_utf8(&payload[position + 14..position + 14 + username_length as usize])
118✔
980
        .map_err(|_| IggyError::InvalidUtf8)?
118✔
981
        .to_string();
118✔
982
    let read_bytes = 4 + 8 + 1 + 1 + username_length as usize;
118✔
983

118✔
984
    Ok((
118✔
985
        UserInfo {
118✔
986
            id,
118✔
987
            created_at,
118✔
988
            status,
118✔
989
            username,
118✔
990
        },
118✔
991
        read_bytes,
118✔
992
    ))
118✔
993
}
118✔
994

995
fn map_to_pat_info(
16✔
996
    payload: Bytes,
16✔
997
    position: usize,
16✔
998
) -> Result<(PersonalAccessTokenInfo, usize), IggyError> {
16✔
999
    let name_length = payload[position];
16✔
1000
    let name = from_utf8(&payload[position + 1..position + 1 + name_length as usize])
16✔
1001
        .map_err(|_| IggyError::InvalidUtf8)?
16✔
1002
        .to_string();
16✔
1003
    let position = position + 1 + name_length as usize;
16✔
1004
    let expiry_at = u64::from_le_bytes(
16✔
1005
        payload[position..position + 8]
16✔
1006
            .try_into()
16✔
1007
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
16✔
1008
    );
1009
    let expiry_at = match expiry_at {
16✔
1010
        0 => None,
7✔
1011
        value => Some(value.into()),
9✔
1012
    };
1013
    let read_bytes = 1 + name_length as usize + 8;
16✔
1014
    Ok((PersonalAccessTokenInfo { name, expiry_at }, read_bytes))
16✔
1015
}
16✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc