• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

iggy-rs / iggy / 12875284241

20 Jan 2025 08:30PM UTC coverage: 75.442% (+0.05%) from 75.392%
12875284241

push

github

web-flow
Refactor: Replace dyn Trait with Enum for Encryptor and Persister Types (#1442)

Addresses part of #https://github.com/iggy-rs/iggy/issues/667

This PR refactors the codebase to replace the use of dyn Trait for
Encryptor and Persister with enum types (EncryptorKind and
PersisterKind)

Co-authored-by: Hubert Gruszecki <h.gruszecki@gmail.com>

41 of 61 new or added lines in 16 files covered. (67.21%)

17 existing lines in 4 files now uncovered.

24680 of 32714 relevant lines covered (75.44%)

25631.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.46
/sdk/src/binary/mapper.rs
1
use crate::bytes_serializable::BytesSerializable;
2
use crate::compression::compression_algorithm::CompressionAlgorithm;
3
use crate::error::IggyError;
4
use crate::models::client_info::{ClientInfo, ClientInfoDetails, ConsumerGroupInfo};
5
use crate::models::consumer_group::{ConsumerGroup, ConsumerGroupDetails, ConsumerGroupMember};
6
use crate::models::consumer_offset_info::ConsumerOffsetInfo;
7
use crate::models::identity_info::IdentityInfo;
8
use crate::models::messages::{MessageState, PolledMessage, PolledMessages};
9
use crate::models::partition::Partition;
10
use crate::models::permissions::Permissions;
11
use crate::models::personal_access_token::{PersonalAccessTokenInfo, RawPersonalAccessToken};
12
use crate::models::stats::Stats;
13
use crate::models::stream::{Stream, StreamDetails};
14
use crate::models::topic::{Topic, TopicDetails};
15
use crate::models::user_info::{UserInfo, UserInfoDetails};
16
use crate::models::user_status::UserStatus;
17
use crate::utils::byte_size::IggyByteSize;
18
use crate::utils::expiry::IggyExpiry;
19
use crate::utils::topic_size::MaxTopicSize;
20
use bytes::Bytes;
21
use std::collections::HashMap;
22
use std::str::from_utf8;
23

24
const EMPTY_MESSAGES: Vec<PolledMessage> = vec![];
25
const EMPTY_TOPICS: Vec<Topic> = vec![];
26
const EMPTY_STREAMS: Vec<Stream> = vec![];
27
const EMPTY_CLIENTS: Vec<ClientInfo> = vec![];
28
const EMPTY_USERS: Vec<UserInfo> = vec![];
29
const EMPTY_PERSONAL_ACCESS_TOKENS: Vec<PersonalAccessTokenInfo> = vec![];
30
const EMPTY_CONSUMER_GROUPS: Vec<ConsumerGroup> = vec![];
31

32
pub fn map_stats(payload: Bytes) -> Result<Stats, IggyError> {
9✔
33
    let process_id = u32::from_le_bytes(
9✔
34
        payload[..4]
9✔
35
            .try_into()
9✔
36
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
37
    );
38
    let cpu_usage = f32::from_le_bytes(
9✔
39
        payload[4..8]
9✔
40
            .try_into()
9✔
41
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
42
    );
43
    let total_cpu_usage = f32::from_le_bytes(
9✔
44
        payload[8..12]
9✔
45
            .try_into()
9✔
46
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
47
    );
48
    let memory_usage = u64::from_le_bytes(
9✔
49
        payload[12..20]
9✔
50
            .try_into()
9✔
51
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
52
    )
53
    .into();
9✔
54
    let total_memory = u64::from_le_bytes(
9✔
55
        payload[20..28]
9✔
56
            .try_into()
9✔
57
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
58
    )
59
    .into();
9✔
60
    let available_memory = u64::from_le_bytes(
9✔
61
        payload[28..36]
9✔
62
            .try_into()
9✔
63
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
64
    )
65
    .into();
9✔
66
    let run_time = u64::from_le_bytes(
9✔
67
        payload[36..44]
9✔
68
            .try_into()
9✔
69
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
70
    )
71
    .into();
9✔
72
    let start_time = u64::from_le_bytes(
9✔
73
        payload[44..52]
9✔
74
            .try_into()
9✔
75
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
76
    )
77
    .into();
9✔
78
    let read_bytes = u64::from_le_bytes(
9✔
79
        payload[52..60]
9✔
80
            .try_into()
9✔
81
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
82
    )
83
    .into();
9✔
84
    let written_bytes = u64::from_le_bytes(
9✔
85
        payload[60..68]
9✔
86
            .try_into()
9✔
87
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
88
    )
89
    .into();
9✔
90
    let messages_size_bytes = u64::from_le_bytes(
9✔
91
        payload[68..76]
9✔
92
            .try_into()
9✔
93
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
94
    )
95
    .into();
9✔
96
    let streams_count = u32::from_le_bytes(
9✔
97
        payload[76..80]
9✔
98
            .try_into()
9✔
99
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
100
    );
101
    let topics_count = u32::from_le_bytes(
9✔
102
        payload[80..84]
9✔
103
            .try_into()
9✔
104
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
105
    );
106
    let partitions_count = u32::from_le_bytes(
9✔
107
        payload[84..88]
9✔
108
            .try_into()
9✔
109
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
110
    );
111
    let segments_count = u32::from_le_bytes(
9✔
112
        payload[88..92]
9✔
113
            .try_into()
9✔
114
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
115
    );
116
    let messages_count = u64::from_le_bytes(
9✔
117
        payload[92..100]
9✔
118
            .try_into()
9✔
119
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
120
    );
121
    let clients_count = u32::from_le_bytes(
9✔
122
        payload[100..104]
9✔
123
            .try_into()
9✔
124
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
125
    );
126
    let consumer_groups_count = u32::from_le_bytes(
9✔
127
        payload[104..108]
9✔
128
            .try_into()
9✔
129
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
130
    );
131
    let mut current_position = 108;
9✔
132
    let hostname_length = u32::from_le_bytes(
9✔
133
        payload[current_position..current_position + 4]
9✔
134
            .try_into()
9✔
135
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
136
    ) as usize;
137
    let hostname =
9✔
138
        from_utf8(&payload[current_position + 4..current_position + 4 + hostname_length])
9✔
139
            .map_err(|_| IggyError::InvalidUtf8)?
9✔
140
            .to_string();
9✔
141
    current_position += 4 + hostname_length;
9✔
142
    let os_name_length = u32::from_le_bytes(
9✔
143
        payload[current_position..current_position + 4]
9✔
144
            .try_into()
9✔
145
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
146
    ) as usize;
147
    let os_name = from_utf8(&payload[current_position + 4..current_position + 4 + os_name_length])
9✔
148
        .map_err(|_| IggyError::InvalidUtf8)?
9✔
149
        .to_string();
9✔
150
    current_position += 4 + os_name_length;
9✔
151
    let os_version_length = u32::from_le_bytes(
9✔
152
        payload[current_position..current_position + 4]
9✔
153
            .try_into()
9✔
154
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
155
    ) as usize;
156
    let os_version =
9✔
157
        from_utf8(&payload[current_position + 4..current_position + 4 + os_version_length])
9✔
158
            .map_err(|_| IggyError::InvalidUtf8)?
9✔
159
            .to_string();
9✔
160
    current_position += 4 + os_version_length;
9✔
161
    let kernel_version_length = u32::from_le_bytes(
9✔
162
        payload[current_position..current_position + 4]
9✔
163
            .try_into()
9✔
164
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
165
    ) as usize;
166
    let kernel_version =
9✔
167
        from_utf8(&payload[current_position + 4..current_position + 4 + kernel_version_length])
9✔
168
            .map_err(|_| IggyError::InvalidUtf8)?
9✔
169
            .to_string();
9✔
170
    current_position += 4 + kernel_version_length;
9✔
171
    let iggy_version_length = u32::from_le_bytes(
9✔
172
        payload[current_position..current_position + 4]
9✔
173
            .try_into()
9✔
174
            .map_err(|_| IggyError::InvalidUtf8)?,
9✔
175
    ) as usize;
176
    let iggy_version =
9✔
177
        from_utf8(&payload[current_position + 4..current_position + 4 + iggy_version_length])
9✔
178
            .map_err(|_| IggyError::InvalidUtf8)?
9✔
179
            .to_string();
9✔
180
    current_position += 4 + iggy_version_length;
9✔
181
    let iggy_semver = u32::from_le_bytes(
9✔
182
        payload[current_position..current_position + 4]
9✔
183
            .try_into()
9✔
184
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
9✔
185
    );
186
    let iggy_semver = if iggy_semver == 0 {
9✔
UNCOV
187
        None
×
188
    } else {
189
        Some(iggy_semver)
9✔
190
    };
191

192
    Ok(Stats {
9✔
193
        process_id,
9✔
194
        cpu_usage,
9✔
195
        total_cpu_usage,
9✔
196
        memory_usage,
9✔
197
        total_memory,
9✔
198
        available_memory,
9✔
199
        run_time,
9✔
200
        start_time,
9✔
201
        read_bytes,
9✔
202
        written_bytes,
9✔
203
        messages_size_bytes,
9✔
204
        streams_count,
9✔
205
        topics_count,
9✔
206
        partitions_count,
9✔
207
        segments_count,
9✔
208
        messages_count,
9✔
209
        clients_count,
9✔
210
        consumer_groups_count,
9✔
211
        hostname,
9✔
212
        os_name,
9✔
213
        os_version,
9✔
214
        kernel_version,
9✔
215
        iggy_server_version: iggy_version,
9✔
216
        iggy_server_semver: iggy_semver,
9✔
217
    })
9✔
218
}
9✔
219

220
pub fn map_consumer_offset(payload: Bytes) -> Result<ConsumerOffsetInfo, IggyError> {
22✔
221
    let partition_id = u32::from_le_bytes(
22✔
222
        payload[..4]
22✔
223
            .try_into()
22✔
224
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
22✔
225
    );
226
    let current_offset = u64::from_le_bytes(
22✔
227
        payload[4..12]
22✔
228
            .try_into()
22✔
229
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
22✔
230
    );
231
    let stored_offset = u64::from_le_bytes(
22✔
232
        payload[12..20]
22✔
233
            .try_into()
22✔
234
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
22✔
235
    );
236
    Ok(ConsumerOffsetInfo {
22✔
237
        partition_id,
22✔
238
        current_offset,
22✔
239
        stored_offset,
22✔
240
    })
22✔
241
}
22✔
242

243
pub fn map_user(payload: Bytes) -> Result<UserInfoDetails, IggyError> {
88✔
244
    let (user, position) = map_to_user_info(payload.clone(), 0)?;
88✔
245
    let has_permissions = payload[position];
88✔
246
    let permissions = if has_permissions == 1 {
88✔
247
        let permissions_length = u32::from_le_bytes(
41✔
248
            payload[position + 1..position + 5]
41✔
249
                .try_into()
41✔
250
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
41✔
251
        ) as usize;
252
        let permissions = payload.slice(position + 5..position + 5 + permissions_length);
41✔
253
        Some(Permissions::from_bytes(permissions)?)
41✔
254
    } else {
255
        None
47✔
256
    };
257

258
    let user = UserInfoDetails {
88✔
259
        id: user.id,
88✔
260
        created_at: user.created_at,
88✔
261
        status: user.status,
88✔
262
        username: user.username,
88✔
263
        permissions,
88✔
264
    };
88✔
265
    Ok(user)
88✔
266
}
88✔
267

268
pub fn map_users(payload: Bytes) -> Result<Vec<UserInfo>, IggyError> {
24✔
269
    if payload.is_empty() {
24✔
UNCOV
270
        return Ok(EMPTY_USERS);
×
271
    }
24✔
272

24✔
273
    let mut users = Vec::new();
24✔
274
    let length = payload.len();
24✔
275
    let mut position = 0;
24✔
276
    while position < length {
54✔
277
        let (user, read_bytes) = map_to_user_info(payload.clone(), position)?;
30✔
278
        users.push(user);
30✔
279
        position += read_bytes;
30✔
280
    }
281
    users.sort_by(|x, y| x.id.cmp(&y.id));
24✔
282
    Ok(users)
24✔
283
}
24✔
284

285
pub fn map_personal_access_tokens(
24✔
286
    payload: Bytes,
24✔
287
) -> Result<Vec<PersonalAccessTokenInfo>, IggyError> {
24✔
288
    if payload.is_empty() {
24✔
289
        return Ok(EMPTY_PERSONAL_ACCESS_TOKENS);
10✔
290
    }
14✔
291

14✔
292
    let mut personal_access_tokens = Vec::new();
14✔
293
    let length = payload.len();
14✔
294
    let mut position = 0;
14✔
295
    while position < length {
30✔
296
        let (personal_access_token, read_bytes) = map_to_pat_info(payload.clone(), position)?;
16✔
297
        personal_access_tokens.push(personal_access_token);
16✔
298
        position += read_bytes;
16✔
299
    }
300
    personal_access_tokens.sort_by(|x, y| x.name.cmp(&y.name));
14✔
301
    Ok(personal_access_tokens)
14✔
302
}
24✔
303

304
pub fn map_identity_info(payload: Bytes) -> Result<IdentityInfo, IggyError> {
432✔
305
    let user_id = u32::from_le_bytes(
432✔
306
        payload[..4]
432✔
307
            .try_into()
432✔
308
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
432✔
309
    );
310
    Ok(IdentityInfo {
432✔
311
        user_id,
432✔
312
        access_token: None,
432✔
313
    })
432✔
314
}
432✔
315

316
pub fn map_raw_pat(payload: Bytes) -> Result<RawPersonalAccessToken, IggyError> {
18✔
317
    let token_length = payload[0];
18✔
318
    let token = from_utf8(&payload[1..1 + token_length as usize])
18✔
319
        .map_err(|_| IggyError::InvalidUtf8)?
18✔
320
        .to_string();
18✔
321
    Ok(RawPersonalAccessToken { token })
18✔
322
}
18✔
323

324
pub fn map_client(payload: Bytes) -> Result<ClientInfoDetails, IggyError> {
35✔
325
    let (client, mut position) = map_to_client_info(payload.clone(), 0)?;
35✔
326
    let mut consumer_groups = Vec::new();
35✔
327
    let length = payload.len();
35✔
328
    while position < length {
53✔
329
        for _ in 0..client.consumer_groups_count {
18✔
330
            let stream_id = u32::from_le_bytes(
18✔
331
                payload[position..position + 4]
18✔
332
                    .try_into()
18✔
333
                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
18✔
334
            );
335
            let topic_id = u32::from_le_bytes(
18✔
336
                payload[position + 4..position + 8]
18✔
337
                    .try_into()
18✔
338
                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
18✔
339
            );
340
            let group_id = u32::from_le_bytes(
18✔
341
                payload[position + 8..position + 12]
18✔
342
                    .try_into()
18✔
343
                    .map_err(|_| IggyError::InvalidNumberEncoding)?,
18✔
344
            );
345
            let consumer_group = ConsumerGroupInfo {
18✔
346
                stream_id,
18✔
347
                topic_id,
18✔
348
                group_id,
18✔
349
            };
18✔
350
            consumer_groups.push(consumer_group);
18✔
351
            position += 12;
18✔
352
        }
353
    }
354

355
    consumer_groups.sort_by(|x, y| x.group_id.cmp(&y.group_id));
35✔
356
    let client = ClientInfoDetails {
35✔
357
        client_id: client.client_id,
35✔
358
        user_id: client.user_id,
35✔
359
        address: client.address,
35✔
360
        transport: client.transport,
35✔
361
        consumer_groups_count: client.consumer_groups_count,
35✔
362
        consumer_groups,
35✔
363
    };
35✔
364
    Ok(client)
35✔
365
}
35✔
366

367
pub fn map_clients(payload: Bytes) -> Result<Vec<ClientInfo>, IggyError> {
5✔
368
    if payload.is_empty() {
5✔
UNCOV
369
        return Ok(EMPTY_CLIENTS);
×
370
    }
5✔
371

5✔
372
    let mut clients = Vec::new();
5✔
373
    let length = payload.len();
5✔
374
    let mut position = 0;
5✔
375
    while position < length {
13✔
376
        let (client, read_bytes) = map_to_client_info(payload.clone(), position)?;
8✔
377
        clients.push(client);
8✔
378
        position += read_bytes;
8✔
379
    }
380
    clients.sort_by(|x, y| x.client_id.cmp(&y.client_id));
5✔
381
    Ok(clients)
5✔
382
}
5✔
383

384
pub fn map_polled_messages(payload: Bytes) -> Result<PolledMessages, IggyError> {
68,618✔
385
    if payload.is_empty() {
68,618✔
UNCOV
386
        return Ok(PolledMessages {
×
UNCOV
387
            messages: EMPTY_MESSAGES,
×
UNCOV
388
            partition_id: 0,
×
UNCOV
389
            current_offset: 0,
×
UNCOV
390
        });
×
391
    }
68,618✔
392

68,618✔
393
    let length = payload.len();
68,618✔
394
    let partition_id = u32::from_le_bytes(
68,618✔
395
        payload[..4]
68,618✔
396
            .try_into()
68,618✔
397
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
68,618✔
398
    );
399
    let current_offset = u64::from_le_bytes(
68,618✔
400
        payload[4..12]
68,618✔
401
            .try_into()
68,618✔
402
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
68,618✔
403
    );
404
    // Currently ignored
405
    let _messages_count = u32::from_le_bytes(
68,618✔
406
        payload[12..16]
68,618✔
407
            .try_into()
68,618✔
408
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
68,618✔
409
    );
410
    let mut position = 16;
68,618✔
411
    let mut messages = Vec::new();
68,618✔
412
    while position < length {
1,952,539✔
413
        let offset = u64::from_le_bytes(
1,918,937✔
414
            payload[position..position + 8]
1,918,937✔
415
                .try_into()
1,918,937✔
416
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
1,918,937✔
417
        );
418
        let state = MessageState::from_code(payload[position + 8])?;
1,918,937✔
419
        let timestamp = u64::from_le_bytes(
1,918,937✔
420
            payload[position + 9..position + 17]
1,918,937✔
421
                .try_into()
1,918,937✔
422
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
1,918,937✔
423
        );
424
        let id = u128::from_le_bytes(
1,918,937✔
425
            payload[position + 17..position + 33]
1,918,937✔
426
                .try_into()
1,918,937✔
427
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
1,918,937✔
428
        );
429
        let checksum = u32::from_le_bytes(
1,918,937✔
430
            payload[position + 33..position + 37]
1,918,937✔
431
                .try_into()
1,918,937✔
432
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
1,918,937✔
433
        );
434
        let headers_length = u32::from_le_bytes(
1,918,937✔
435
            payload[position + 37..position + 41]
1,918,937✔
436
                .try_into()
1,918,937✔
437
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
1,918,937✔
438
        );
439
        let headers = if headers_length > 0 {
1,918,937✔
440
            let headers_payload =
4,112✔
441
                payload.slice(position + 41..position + 41 + headers_length as usize);
4,112✔
442
            Some(HashMap::from_bytes(headers_payload)?)
4,112✔
443
        } else {
444
            None
1,914,825✔
445
        };
446
        position += headers_length as usize;
1,918,937✔
447
        let message_length = u32::from_le_bytes(
1,918,937✔
448
            payload[position + 41..position + 45]
1,918,937✔
449
                .try_into()
1,918,937✔
450
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
1,918,937✔
451
        );
452
        let payload_range = position + 45..position + 45 + message_length as usize;
1,918,937✔
453
        if payload_range.start > length || payload_range.end > length {
1,918,937✔
UNCOV
454
            break;
×
455
        }
1,918,937✔
456

1,918,937✔
457
        let payload = payload[payload_range].to_vec();
1,918,937✔
458
        let total_size = 45 + message_length as usize;
1,918,937✔
459
        position += total_size;
1,918,937✔
460
        messages.push(PolledMessage {
1,918,937✔
461
            offset,
1,918,937✔
462
            timestamp,
1,918,937✔
463
            state,
1,918,937✔
464
            checksum,
1,918,937✔
465
            id,
1,918,937✔
466
            headers,
1,918,937✔
467
            length: IggyByteSize::from(message_length as u64),
1,918,937✔
468
            payload: Bytes::from(payload),
1,918,937✔
469
        });
1,918,937✔
470

1,918,937✔
471
        if position + 45 >= length {
1,918,937✔
472
            break;
35,016✔
473
        }
1,883,921✔
474
    }
475

476
    messages.sort_by(|x, y| x.offset.cmp(&y.offset));
1,883,921✔
477
    Ok(PolledMessages {
68,618✔
478
        partition_id,
68,618✔
479
        current_offset,
68,618✔
480
        messages,
68,618✔
481
    })
68,618✔
482
}
68,618✔
483

484
pub fn map_streams(payload: Bytes) -> Result<Vec<Stream>, IggyError> {
43✔
485
    if payload.is_empty() {
43✔
486
        return Ok(EMPTY_STREAMS);
30✔
487
    }
13✔
488

13✔
489
    let mut streams = Vec::new();
13✔
490
    let length = payload.len();
13✔
491
    let mut position = 0;
13✔
492
    while position < length {
85✔
493
        let (stream, read_bytes) = map_to_stream(payload.clone(), position)?;
72✔
494
        streams.push(stream);
72✔
495
        position += read_bytes;
72✔
496
    }
497
    streams.sort_by(|x, y| x.id.cmp(&y.id));
176✔
498
    Ok(streams)
13✔
499
}
43✔
500

501
pub fn map_stream(payload: Bytes) -> Result<StreamDetails, IggyError> {
261✔
502
    let (stream, mut position) = map_to_stream(payload.clone(), 0)?;
261✔
503
    let mut topics = Vec::new();
261✔
504
    let length = payload.len();
261✔
505
    while position < length {
302✔
506
        let (topic, read_bytes) = map_to_topic(payload.clone(), position)?;
41✔
507
        topics.push(topic);
41✔
508
        position += read_bytes;
41✔
509
    }
510

511
    topics.sort_by(|x, y| x.id.cmp(&y.id));
261✔
512
    let stream = StreamDetails {
261✔
513
        id: stream.id,
261✔
514
        created_at: stream.created_at,
261✔
515
        topics_count: stream.topics_count,
261✔
516
        size: stream.size,
261✔
517
        messages_count: stream.messages_count,
261✔
518
        name: stream.name,
261✔
519
        topics,
261✔
520
    };
261✔
521
    Ok(stream)
261✔
522
}
261✔
523

524
fn map_to_stream(payload: Bytes, position: usize) -> Result<(Stream, usize), IggyError> {
333✔
525
    let id = u32::from_le_bytes(
333✔
526
        payload[position..position + 4]
333✔
527
            .try_into()
333✔
528
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
333✔
529
    );
530
    let created_at = u64::from_le_bytes(
333✔
531
        payload[position + 4..position + 12]
333✔
532
            .try_into()
333✔
533
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
333✔
534
    )
535
    .into();
333✔
536
    let topics_count = u32::from_le_bytes(
333✔
537
        payload[position + 12..position + 16]
333✔
538
            .try_into()
333✔
539
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
333✔
540
    );
541
    let size_bytes = u64::from_le_bytes(
333✔
542
        payload[position + 16..position + 24]
333✔
543
            .try_into()
333✔
544
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
333✔
545
    )
546
    .into();
333✔
547
    let messages_count = u64::from_le_bytes(
333✔
548
        payload[position + 24..position + 32]
333✔
549
            .try_into()
333✔
550
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
333✔
551
    );
552
    let name_length = payload[position + 32];
333✔
553
    let name = from_utf8(&payload[position + 33..position + 33 + name_length as usize])
333✔
554
        .map_err(|_| IggyError::InvalidUtf8)?
333✔
555
        .to_string();
333✔
556
    let read_bytes = 4 + 8 + 4 + 8 + 8 + 1 + name_length as usize;
333✔
557
    Ok((
333✔
558
        Stream {
333✔
559
            id,
333✔
560
            created_at,
333✔
561
            name,
333✔
562
            size: size_bytes,
333✔
563
            messages_count,
333✔
564
            topics_count,
333✔
565
        },
333✔
566
        read_bytes,
333✔
567
    ))
333✔
568
}
333✔
569

570
pub fn map_topics(payload: Bytes) -> Result<Vec<Topic>, IggyError> {
11✔
571
    if payload.is_empty() {
11✔
572
        return Ok(EMPTY_TOPICS);
6✔
573
    }
5✔
574

5✔
575
    let mut topics = Vec::new();
5✔
576
    let length = payload.len();
5✔
577
    let mut position = 0;
5✔
578
    while position < length {
10✔
579
        let (topic, read_bytes) = map_to_topic(payload.clone(), position)?;
5✔
580
        topics.push(topic);
5✔
581
        position += read_bytes;
5✔
582
    }
583
    topics.sort_by(|x, y| x.id.cmp(&y.id));
5✔
584
    Ok(topics)
5✔
585
}
11✔
586

587
pub fn map_topic(payload: Bytes) -> Result<TopicDetails, IggyError> {
302✔
588
    let (topic, mut position) = map_to_topic(payload.clone(), 0)?;
302✔
589
    let mut partitions = Vec::new();
302✔
590
    let length = payload.len();
302✔
591
    while position < length {
1,025✔
592
        let (partition, read_bytes) = map_to_partition(payload.clone(), position)?;
723✔
593
        partitions.push(partition);
723✔
594
        position += read_bytes;
723✔
595
    }
596

597
    partitions.sort_by(|x, y| x.id.cmp(&y.id));
857✔
598
    let topic = TopicDetails {
302✔
599
        id: topic.id,
302✔
600
        created_at: topic.created_at,
302✔
601
        name: topic.name,
302✔
602
        size: topic.size,
302✔
603
        messages_count: topic.messages_count,
302✔
604
        message_expiry: topic.message_expiry,
302✔
605
        compression_algorithm: topic.compression_algorithm,
302✔
606
        max_topic_size: topic.max_topic_size,
302✔
607
        replication_factor: topic.replication_factor,
302✔
608
        #[allow(clippy::cast_possible_truncation)]
302✔
609
        partitions_count: partitions.len() as u32,
302✔
610
        partitions,
302✔
611
    };
302✔
612
    Ok(topic)
302✔
613
}
302✔
614

615
fn map_to_topic(payload: Bytes, position: usize) -> Result<(Topic, usize), IggyError> {
348✔
616
    let id = u32::from_le_bytes(
348✔
617
        payload[position..position + 4]
348✔
618
            .try_into()
348✔
619
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
348✔
620
    );
621
    let created_at = u64::from_le_bytes(
348✔
622
        payload[position + 4..position + 12]
348✔
623
            .try_into()
348✔
624
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
348✔
625
    );
626
    let created_at = created_at.into();
348✔
627
    let partitions_count = u32::from_le_bytes(
348✔
628
        payload[position + 12..position + 16]
348✔
629
            .try_into()
348✔
630
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
348✔
631
    );
632
    let message_expiry = match u64::from_le_bytes(
348✔
633
        payload[position + 16..position + 24]
348✔
634
            .try_into()
348✔
635
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
348✔
636
    ) {
UNCOV
637
        0 => IggyExpiry::NeverExpire,
×
638
        message_expiry => message_expiry.into(),
348✔
639
    };
640
    let compression_algorithm = CompressionAlgorithm::from_code(payload[position + 24])?;
348✔
641
    let max_topic_size = u64::from_le_bytes(
348✔
642
        payload[position + 25..position + 33]
348✔
643
            .try_into()
348✔
644
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
348✔
645
    );
646
    let max_topic_size: MaxTopicSize = max_topic_size.into();
348✔
647
    let replication_factor = payload[position + 33];
348✔
648
    let size_bytes = IggyByteSize::from(u64::from_le_bytes(
348✔
649
        payload[position + 34..position + 42]
348✔
650
            .try_into()
348✔
651
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
348✔
652
    ));
653
    let messages_count = u64::from_le_bytes(
348✔
654
        payload[position + 42..position + 50]
348✔
655
            .try_into()
348✔
656
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
348✔
657
    );
658
    let name_length = payload[position + 50];
348✔
659
    let name = from_utf8(&payload[position + 51..position + 51 + name_length as usize])
348✔
660
        .map_err(|_| IggyError::InvalidUtf8)?
348✔
661
        .to_string();
348✔
662
    let read_bytes = 4 + 8 + 4 + 8 + 8 + 8 + 8 + 1 + 1 + 1 + name_length as usize;
348✔
663
    Ok((
348✔
664
        Topic {
348✔
665
            id,
348✔
666
            created_at,
348✔
667
            name,
348✔
668
            partitions_count,
348✔
669
            size: size_bytes,
348✔
670
            messages_count,
348✔
671
            message_expiry,
348✔
672
            compression_algorithm,
348✔
673
            max_topic_size,
348✔
674
            replication_factor,
348✔
675
        },
348✔
676
        read_bytes,
348✔
677
    ))
348✔
678
}
348✔
679

680
fn map_to_partition(payload: Bytes, position: usize) -> Result<(Partition, usize), IggyError> {
723✔
681
    let id = u32::from_le_bytes(
723✔
682
        payload[position..position + 4]
723✔
683
            .try_into()
723✔
684
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
723✔
685
    );
686
    let created_at = u64::from_le_bytes(
723✔
687
        payload[position + 4..position + 12]
723✔
688
            .try_into()
723✔
689
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
723✔
690
    );
691
    let created_at = created_at.into();
723✔
692
    let segments_count = u32::from_le_bytes(
723✔
693
        payload[position + 12..position + 16]
723✔
694
            .try_into()
723✔
695
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
723✔
696
    );
697
    let current_offset = u64::from_le_bytes(
723✔
698
        payload[position + 16..position + 24]
723✔
699
            .try_into()
723✔
700
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
723✔
701
    );
702
    let size_bytes = u64::from_le_bytes(
723✔
703
        payload[position + 24..position + 32]
723✔
704
            .try_into()
723✔
705
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
723✔
706
    )
707
    .into();
723✔
708
    let messages_count = u64::from_le_bytes(
723✔
709
        payload[position + 32..position + 40]
723✔
710
            .try_into()
723✔
711
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
723✔
712
    );
713
    let read_bytes = 4 + 8 + 4 + 8 + 8 + 8;
723✔
714
    Ok((
723✔
715
        Partition {
723✔
716
            id,
723✔
717
            created_at,
723✔
718
            segments_count,
723✔
719
            current_offset,
723✔
720
            size: size_bytes,
723✔
721
            messages_count,
723✔
722
        },
723✔
723
        read_bytes,
723✔
724
    ))
723✔
725
}
723✔
726

727
pub fn map_consumer_groups(payload: Bytes) -> Result<Vec<ConsumerGroup>, IggyError> {
24✔
728
    if payload.is_empty() {
24✔
729
        return Ok(EMPTY_CONSUMER_GROUPS);
10✔
730
    }
14✔
731

14✔
732
    let mut consumer_groups = Vec::new();
14✔
733
    let length = payload.len();
14✔
734
    let mut position = 0;
14✔
735
    while position < length {
28✔
736
        let (consumer_group, read_bytes) = map_to_consumer_group(payload.clone(), position)?;
14✔
737
        consumer_groups.push(consumer_group);
14✔
738
        position += read_bytes;
14✔
739
    }
740
    consumer_groups.sort_by(|x, y| x.id.cmp(&y.id));
14✔
741
    Ok(consumer_groups)
14✔
742
}
24✔
743

744
pub fn map_consumer_group(payload: Bytes) -> Result<ConsumerGroupDetails, IggyError> {
78✔
745
    let (consumer_group, mut position) = map_to_consumer_group(payload.clone(), 0)?;
78✔
746
    let mut members = Vec::new();
78✔
747
    let length = payload.len();
78✔
748
    while position < length {
114✔
749
        let (member, read_bytes) = map_to_consumer_group_member(payload.clone(), position)?;
36✔
750
        members.push(member);
36✔
751
        position += read_bytes;
36✔
752
    }
753
    members.sort_by(|x, y| x.id.cmp(&y.id));
78✔
754
    let consumer_group_details = ConsumerGroupDetails {
78✔
755
        id: consumer_group.id,
78✔
756
        name: consumer_group.name,
78✔
757
        partitions_count: consumer_group.partitions_count,
78✔
758
        members_count: consumer_group.members_count,
78✔
759
        members,
78✔
760
    };
78✔
761
    Ok(consumer_group_details)
78✔
762
}
78✔
763

764
fn map_to_consumer_group(
92✔
765
    payload: Bytes,
92✔
766
    position: usize,
92✔
767
) -> Result<(ConsumerGroup, usize), IggyError> {
92✔
768
    let id = u32::from_le_bytes(
92✔
769
        payload[position..position + 4]
92✔
770
            .try_into()
92✔
771
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
92✔
772
    );
773
    let partitions_count = u32::from_le_bytes(
92✔
774
        payload[position + 4..position + 8]
92✔
775
            .try_into()
92✔
776
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
92✔
777
    );
778
    let members_count = u32::from_le_bytes(
92✔
779
        payload[position + 8..position + 12]
92✔
780
            .try_into()
92✔
781
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
92✔
782
    );
783
    let name_length = payload[position + 12];
92✔
784
    let name = from_utf8(&payload[position + 13..position + 13 + name_length as usize])
92✔
785
        .map_err(|_| IggyError::InvalidUtf8)?
92✔
786
        .to_string();
92✔
787
    let read_bytes = 13 + name_length as usize;
92✔
788
    Ok((
92✔
789
        ConsumerGroup {
92✔
790
            id,
92✔
791
            partitions_count,
92✔
792
            members_count,
92✔
793
            name,
92✔
794
        },
92✔
795
        read_bytes,
92✔
796
    ))
92✔
797
}
92✔
798

799
fn map_to_consumer_group_member(
36✔
800
    payload: Bytes,
36✔
801
    position: usize,
36✔
802
) -> Result<(ConsumerGroupMember, usize), IggyError> {
36✔
803
    let id = u32::from_le_bytes(
36✔
804
        payload[position..position + 4]
36✔
805
            .try_into()
36✔
806
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
36✔
807
    );
808
    let partitions_count = u32::from_le_bytes(
36✔
809
        payload[position + 4..position + 8]
36✔
810
            .try_into()
36✔
811
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
36✔
812
    );
813
    let mut partitions = Vec::new();
36✔
814
    for i in 0..partitions_count {
54✔
815
        let partition_id = u32::from_le_bytes(
54✔
816
            payload[position + 8 + (i * 4) as usize..position + 8 + ((i + 1) * 4) as usize]
54✔
817
                .try_into()
54✔
818
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
54✔
819
        );
820
        partitions.push(partition_id);
54✔
821
    }
822

823
    let read_bytes = (4 + 4 + partitions_count * 4) as usize;
36✔
824
    Ok((
36✔
825
        ConsumerGroupMember {
36✔
826
            id,
36✔
827
            partitions_count,
36✔
828
            partitions,
36✔
829
        },
36✔
830
        read_bytes,
36✔
831
    ))
36✔
832
}
36✔
833

834
fn map_to_client_info(
43✔
835
    payload: Bytes,
43✔
836
    mut position: usize,
43✔
837
) -> Result<(ClientInfo, usize), IggyError> {
43✔
838
    let mut read_bytes;
839
    let client_id = u32::from_le_bytes(
43✔
840
        payload[position..position + 4]
43✔
841
            .try_into()
43✔
842
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
43✔
843
    );
844
    let user_id = u32::from_le_bytes(
43✔
845
        payload[position + 4..position + 8]
43✔
846
            .try_into()
43✔
847
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
43✔
848
    );
849
    let user_id = match user_id {
43✔
UNCOV
850
        0 => None,
×
851
        _ => Some(user_id),
43✔
852
    };
853

854
    let transport = payload[position + 8];
43✔
855
    let transport = match transport {
43✔
856
        1 => "TCP",
30✔
857
        2 => "QUIC",
13✔
UNCOV
858
        _ => "Unknown",
×
859
    }
860
    .to_string();
43✔
861

862
    let address_length = u32::from_le_bytes(
43✔
863
        payload[position + 9..position + 13]
43✔
864
            .try_into()
43✔
865
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
43✔
866
    ) as usize;
867
    let address = from_utf8(&payload[position + 13..position + 13 + address_length])
43✔
868
        .map_err(|_| IggyError::InvalidUtf8)?
43✔
869
        .to_string();
43✔
870
    read_bytes = 4 + 4 + 1 + 4 + address_length;
43✔
871
    position += read_bytes;
43✔
872
    let consumer_groups_count = u32::from_le_bytes(
43✔
873
        payload[position..position + 4]
43✔
874
            .try_into()
43✔
875
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
43✔
876
    );
877
    read_bytes += 4;
43✔
878
    Ok((
43✔
879
        ClientInfo {
43✔
880
            client_id,
43✔
881
            user_id,
43✔
882
            address,
43✔
883
            transport,
43✔
884
            consumer_groups_count,
43✔
885
        },
43✔
886
        read_bytes,
43✔
887
    ))
43✔
888
}
43✔
889

890
fn map_to_user_info(payload: Bytes, position: usize) -> Result<(UserInfo, usize), IggyError> {
118✔
891
    let id = u32::from_le_bytes(
118✔
892
        payload[position..position + 4]
118✔
893
            .try_into()
118✔
894
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
118✔
895
    );
896
    let created_at = u64::from_le_bytes(
118✔
897
        payload[position + 4..position + 12]
118✔
898
            .try_into()
118✔
899
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
118✔
900
    );
901
    let created_at = created_at.into();
118✔
902
    let status = payload[position + 12];
118✔
903
    let status = UserStatus::from_code(status)?;
118✔
904
    let username_length = payload[position + 13];
118✔
905
    let username = from_utf8(&payload[position + 14..position + 14 + username_length as usize])
118✔
906
        .map_err(|_| IggyError::InvalidUtf8)?
118✔
907
        .to_string();
118✔
908
    let read_bytes = 4 + 8 + 1 + 1 + username_length as usize;
118✔
909

118✔
910
    Ok((
118✔
911
        UserInfo {
118✔
912
            id,
118✔
913
            created_at,
118✔
914
            status,
118✔
915
            username,
118✔
916
        },
118✔
917
        read_bytes,
118✔
918
    ))
118✔
919
}
118✔
920

921
fn map_to_pat_info(
16✔
922
    payload: Bytes,
16✔
923
    position: usize,
16✔
924
) -> Result<(PersonalAccessTokenInfo, usize), IggyError> {
16✔
925
    let name_length = payload[position];
16✔
926
    let name = from_utf8(&payload[position + 1..position + 1 + name_length as usize])
16✔
927
        .map_err(|_| IggyError::InvalidUtf8)?
16✔
928
        .to_string();
16✔
929
    let position = position + 1 + name_length as usize;
16✔
930
    let expiry_at = u64::from_le_bytes(
16✔
931
        payload[position..position + 8]
16✔
932
            .try_into()
16✔
933
            .map_err(|_| IggyError::InvalidNumberEncoding)?,
16✔
934
    );
935
    let expiry_at = match expiry_at {
16✔
936
        0 => None,
7✔
937
        value => Some(value.into()),
9✔
938
    };
939
    let read_bytes = 1 + name_length as usize + 8;
16✔
940
    Ok((PersonalAccessTokenInfo { name, expiry_at }, read_bytes))
16✔
941
}
16✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc