• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

iggy-rs / iggy / 12770513340

14 Jan 2025 03:07PM UTC coverage: 75.399% (+0.05%) from 75.351%
12770513340

push

github

web-flow
Allow replaying messages for high-level consumer, add delete consumer offset (#1436)

291 of 398 new or added lines in 23 files covered. (73.12%)

1 existing line in 1 file now uncovered.

24626 of 32661 relevant lines covered (75.4%)

25528.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.83
/server/src/command.rs
1
use bytes::{BufMut, Bytes, BytesMut};
2
use iggy::command::*;
3
use iggy::consumer_groups::create_consumer_group::CreateConsumerGroup;
4
use iggy::consumer_groups::delete_consumer_group::DeleteConsumerGroup;
5
use iggy::consumer_groups::get_consumer_group::GetConsumerGroup;
6
use iggy::consumer_groups::get_consumer_groups::GetConsumerGroups;
7
use iggy::consumer_groups::join_consumer_group::JoinConsumerGroup;
8
use iggy::consumer_groups::leave_consumer_group::LeaveConsumerGroup;
9
use iggy::consumer_offsets::delete_consumer_offset::DeleteConsumerOffset;
10
use iggy::consumer_offsets::get_consumer_offset::GetConsumerOffset;
11
use iggy::consumer_offsets::store_consumer_offset::StoreConsumerOffset;
12
use iggy::error::IggyError;
13
use iggy::messages::poll_messages::PollMessages;
14
use iggy::messages::send_messages::SendMessages;
15
use iggy::partitions::create_partitions::CreatePartitions;
16
use iggy::partitions::delete_partitions::DeletePartitions;
17
use iggy::personal_access_tokens::create_personal_access_token::CreatePersonalAccessToken;
18
use iggy::personal_access_tokens::delete_personal_access_token::DeletePersonalAccessToken;
19
use iggy::personal_access_tokens::get_personal_access_tokens::GetPersonalAccessTokens;
20
use iggy::personal_access_tokens::login_with_personal_access_token::LoginWithPersonalAccessToken;
21
use iggy::streams::create_stream::CreateStream;
22
use iggy::streams::delete_stream::DeleteStream;
23
use iggy::streams::get_stream::GetStream;
24
use iggy::streams::get_streams::GetStreams;
25
use iggy::streams::purge_stream::PurgeStream;
26
use iggy::streams::update_stream::UpdateStream;
27
use iggy::system::get_client::GetClient;
28
use iggy::system::get_clients::GetClients;
29
use iggy::system::get_me::GetMe;
30
use iggy::system::get_snapshot::GetSnapshot;
31
use iggy::system::get_stats::GetStats;
32
use iggy::system::ping::Ping;
33
use iggy::topics::create_topic::CreateTopic;
34
use iggy::topics::delete_topic::DeleteTopic;
35
use iggy::topics::get_topic::GetTopic;
36
use iggy::topics::get_topics::GetTopics;
37
use iggy::topics::purge_topic::PurgeTopic;
38
use iggy::topics::update_topic::UpdateTopic;
39
use iggy::users::change_password::ChangePassword;
40
use iggy::users::create_user::CreateUser;
41
use iggy::users::delete_user::DeleteUser;
42
use iggy::users::get_user::GetUser;
43
use iggy::users::get_users::GetUsers;
44
use iggy::users::login_user::LoginUser;
45
use iggy::users::logout_user::LogoutUser;
46
use iggy::users::update_permissions::UpdatePermissions;
47
use iggy::users::update_user::UpdateUser;
48
use iggy::validatable::Validatable;
49
use iggy::{
50
    bytes_serializable::BytesSerializable, messages::flush_unsaved_buffer::FlushUnsavedBuffer,
51
};
52
use std::fmt::{Display, Formatter};
53
use strum::EnumString;
54

55
#[derive(Debug, PartialEq, EnumString)]
×
56
pub enum ServerCommand {
57
    Ping(Ping),
58
    GetStats(GetStats),
59
    GetMe(GetMe),
60
    GetClient(GetClient),
61
    GetClients(GetClients),
62
    GetUser(GetUser),
63
    GetUsers(GetUsers),
64
    CreateUser(CreateUser),
65
    DeleteUser(DeleteUser),
66
    UpdateUser(UpdateUser),
67
    UpdatePermissions(UpdatePermissions),
68
    ChangePassword(ChangePassword),
69
    LoginUser(LoginUser),
70
    LogoutUser(LogoutUser),
71
    GetPersonalAccessTokens(GetPersonalAccessTokens),
72
    CreatePersonalAccessToken(CreatePersonalAccessToken),
73
    DeletePersonalAccessToken(DeletePersonalAccessToken),
74
    LoginWithPersonalAccessToken(LoginWithPersonalAccessToken),
75
    SendMessages(SendMessages),
76
    PollMessages(PollMessages),
77
    FlushUnsavedBuffer(FlushUnsavedBuffer),
78
    GetConsumerOffset(GetConsumerOffset),
79
    StoreConsumerOffset(StoreConsumerOffset),
80
    DeleteConsumerOffset(DeleteConsumerOffset),
81
    GetStream(GetStream),
82
    GetStreams(GetStreams),
83
    CreateStream(CreateStream),
84
    DeleteStream(DeleteStream),
85
    UpdateStream(UpdateStream),
86
    PurgeStream(PurgeStream),
87
    GetTopic(GetTopic),
88
    GetTopics(GetTopics),
89
    CreateTopic(CreateTopic),
90
    DeleteTopic(DeleteTopic),
91
    UpdateTopic(UpdateTopic),
92
    PurgeTopic(PurgeTopic),
93
    CreatePartitions(CreatePartitions),
94
    DeletePartitions(DeletePartitions),
95
    GetConsumerGroup(GetConsumerGroup),
96
    GetConsumerGroups(GetConsumerGroups),
97
    CreateConsumerGroup(CreateConsumerGroup),
98
    DeleteConsumerGroup(DeleteConsumerGroup),
99
    JoinConsumerGroup(JoinConsumerGroup),
100
    LeaveConsumerGroup(LeaveConsumerGroup),
101
    GetSnapshotFile(GetSnapshot),
102
}
103

104
impl BytesSerializable for ServerCommand {
105
    fn to_bytes(&self) -> Bytes {
43✔
106
        match self {
43✔
107
            ServerCommand::Ping(payload) => as_bytes(payload),
1✔
108
            ServerCommand::GetStats(payload) => as_bytes(payload),
1✔
109
            ServerCommand::GetMe(payload) => as_bytes(payload),
1✔
110
            ServerCommand::GetClient(payload) => as_bytes(payload),
1✔
111
            ServerCommand::GetClients(payload) => as_bytes(payload),
1✔
112
            ServerCommand::GetUser(payload) => as_bytes(payload),
1✔
113
            ServerCommand::GetUsers(payload) => as_bytes(payload),
1✔
114
            ServerCommand::CreateUser(payload) => as_bytes(payload),
1✔
115
            ServerCommand::DeleteUser(payload) => as_bytes(payload),
1✔
116
            ServerCommand::UpdateUser(payload) => as_bytes(payload),
1✔
117
            ServerCommand::UpdatePermissions(payload) => as_bytes(payload),
1✔
118
            ServerCommand::ChangePassword(payload) => as_bytes(payload),
1✔
119
            ServerCommand::LoginUser(payload) => as_bytes(payload),
1✔
120
            ServerCommand::LogoutUser(payload) => as_bytes(payload),
1✔
121
            ServerCommand::GetPersonalAccessTokens(payload) => as_bytes(payload),
1✔
122
            ServerCommand::CreatePersonalAccessToken(payload) => as_bytes(payload),
1✔
123
            ServerCommand::DeletePersonalAccessToken(payload) => as_bytes(payload),
1✔
124
            ServerCommand::LoginWithPersonalAccessToken(payload) => as_bytes(payload),
1✔
125
            ServerCommand::SendMessages(payload) => as_bytes(payload),
1✔
126
            ServerCommand::PollMessages(payload) => as_bytes(payload),
1✔
127
            ServerCommand::StoreConsumerOffset(payload) => as_bytes(payload),
1✔
NEW
128
            ServerCommand::DeleteConsumerOffset(payload) => as_bytes(payload),
×
129
            ServerCommand::GetConsumerOffset(payload) => as_bytes(payload),
1✔
130
            ServerCommand::GetStream(payload) => as_bytes(payload),
1✔
131
            ServerCommand::GetStreams(payload) => as_bytes(payload),
1✔
132
            ServerCommand::CreateStream(payload) => as_bytes(payload),
1✔
133
            ServerCommand::DeleteStream(payload) => as_bytes(payload),
1✔
134
            ServerCommand::UpdateStream(payload) => as_bytes(payload),
1✔
135
            ServerCommand::PurgeStream(payload) => as_bytes(payload),
1✔
136
            ServerCommand::GetTopic(payload) => as_bytes(payload),
1✔
137
            ServerCommand::GetTopics(payload) => as_bytes(payload),
1✔
138
            ServerCommand::CreateTopic(payload) => as_bytes(payload),
1✔
139
            ServerCommand::DeleteTopic(payload) => as_bytes(payload),
1✔
140
            ServerCommand::UpdateTopic(payload) => as_bytes(payload),
1✔
141
            ServerCommand::PurgeTopic(payload) => as_bytes(payload),
1✔
142
            ServerCommand::CreatePartitions(payload) => as_bytes(payload),
1✔
143
            ServerCommand::DeletePartitions(payload) => as_bytes(payload),
1✔
144
            ServerCommand::GetConsumerGroup(payload) => as_bytes(payload),
1✔
145
            ServerCommand::GetConsumerGroups(payload) => as_bytes(payload),
1✔
146
            ServerCommand::CreateConsumerGroup(payload) => as_bytes(payload),
1✔
147
            ServerCommand::DeleteConsumerGroup(payload) => as_bytes(payload),
1✔
148
            ServerCommand::JoinConsumerGroup(payload) => as_bytes(payload),
1✔
149
            ServerCommand::LeaveConsumerGroup(payload) => as_bytes(payload),
1✔
150
            ServerCommand::FlushUnsavedBuffer(payload) => as_bytes(payload),
1✔
151
            ServerCommand::GetSnapshotFile(payload) => as_bytes(payload),
×
152
        }
153
    }
43✔
154

155
    fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> {
95,514✔
156
        let code = u32::from_le_bytes(
95,514✔
157
            bytes[..4]
95,514✔
158
                .try_into()
95,514✔
159
                .map_err(|_| IggyError::InvalidNumberEncoding)?,
95,514✔
160
        );
161
        let payload = bytes.slice(4..);
95,514✔
162
        match code {
95,514✔
163
            PING_CODE => Ok(ServerCommand::Ping(Ping::from_bytes(payload)?)),
147✔
164
            GET_STATS_CODE => Ok(ServerCommand::GetStats(GetStats::from_bytes(payload)?)),
10✔
165
            GET_ME_CODE => Ok(ServerCommand::GetMe(GetMe::from_bytes(payload)?)),
35✔
166
            GET_CLIENT_CODE => Ok(ServerCommand::GetClient(GetClient::from_bytes(payload)?)),
2✔
167
            GET_CLIENTS_CODE => Ok(ServerCommand::GetClients(GetClients::from_bytes(payload)?)),
6✔
168
            GET_USER_CODE => Ok(ServerCommand::GetUser(GetUser::from_bytes(payload)?)),
43✔
169
            GET_USERS_CODE => Ok(ServerCommand::GetUsers(GetUsers::from_bytes(payload)?)),
25✔
170
            CREATE_USER_CODE => Ok(ServerCommand::CreateUser(CreateUser::from_bytes(payload)?)),
51✔
171
            DELETE_USER_CODE => Ok(ServerCommand::DeleteUser(DeleteUser::from_bytes(payload)?)),
35✔
172
            UPDATE_USER_CODE => Ok(ServerCommand::UpdateUser(UpdateUser::from_bytes(payload)?)),
10✔
173
            UPDATE_PERMISSIONS_CODE => Ok(ServerCommand::UpdatePermissions(
174
                UpdatePermissions::from_bytes(payload)?,
7✔
175
            )),
176
            CHANGE_PASSWORD_CODE => Ok(ServerCommand::ChangePassword(ChangePassword::from_bytes(
9✔
177
                payload,
9✔
178
            )?)),
9✔
179
            LOGIN_USER_CODE => Ok(ServerCommand::LoginUser(LoginUser::from_bytes(payload)?)),
427✔
180
            LOGOUT_USER_CODE => Ok(ServerCommand::LogoutUser(LogoutUser::from_bytes(payload)?)),
204✔
181
            GET_PERSONAL_ACCESS_TOKENS_CODE => Ok(ServerCommand::GetPersonalAccessTokens(
182
                GetPersonalAccessTokens::from_bytes(payload)?,
25✔
183
            )),
184
            CREATE_PERSONAL_ACCESS_TOKEN_CODE => Ok(ServerCommand::CreatePersonalAccessToken(
185
                CreatePersonalAccessToken::from_bytes(payload)?,
19✔
186
            )),
187
            DELETE_PERSONAL_ACCESS_TOKEN_CODE => Ok(ServerCommand::DeletePersonalAccessToken(
188
                DeletePersonalAccessToken::from_bytes(payload)?,
18✔
189
            )),
190
            LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE => {
191
                Ok(ServerCommand::LoginWithPersonalAccessToken(
192
                    LoginWithPersonalAccessToken::from_bytes(payload)?,
12✔
193
                ))
194
            }
195
            SEND_MESSAGES_CODE => Ok(ServerCommand::SendMessages(SendMessages::from_bytes(
26,109✔
196
                payload,
26,109✔
197
            )?)),
26,109✔
198
            POLL_MESSAGES_CODE => Ok(ServerCommand::PollMessages(PollMessages::from_bytes(
67,127✔
199
                payload,
67,127✔
200
            )?)),
67,127✔
201
            FLUSH_UNSAVED_BUFFER_CODE => Ok(ServerCommand::FlushUnsavedBuffer(
202
                FlushUnsavedBuffer::from_bytes(payload)?,
19✔
203
            )),
204
            STORE_CONSUMER_OFFSET_CODE => Ok(ServerCommand::StoreConsumerOffset(
205
                StoreConsumerOffset::from_bytes(payload)?,
23✔
206
            )),
207
            DELETE_CONSUMER_OFFSET_CODE => Ok(ServerCommand::DeleteConsumerOffset(
208
                DeleteConsumerOffset::from_bytes(payload)?,
2✔
209
            )),
210
            GET_CONSUMER_OFFSET_CODE => Ok(ServerCommand::GetConsumerOffset(
211
                GetConsumerOffset::from_bytes(payload)?,
27✔
212
            )),
213
            GET_STREAM_CODE => Ok(ServerCommand::GetStream(GetStream::from_bytes(payload)?)),
51✔
214
            GET_STREAMS_CODE => Ok(ServerCommand::GetStreams(GetStreams::from_bytes(payload)?)),
44✔
215
            CREATE_STREAM_CODE => Ok(ServerCommand::CreateStream(CreateStream::from_bytes(
220✔
216
                payload,
220✔
217
            )?)),
220✔
218
            DELETE_STREAM_CODE => Ok(ServerCommand::DeleteStream(DeleteStream::from_bytes(
149✔
219
                payload,
149✔
220
            )?)),
149✔
221
            UPDATE_STREAM_CODE => Ok(ServerCommand::UpdateStream(UpdateStream::from_bytes(
5✔
222
                payload,
5✔
223
            )?)),
5✔
224
            PURGE_STREAM_CODE => Ok(ServerCommand::PurgeStream(PurgeStream::from_bytes(
7✔
225
                payload,
7✔
226
            )?)),
7✔
227
            GET_TOPIC_CODE => Ok(ServerCommand::GetTopic(GetTopic::from_bytes(payload)?)),
96✔
228
            GET_TOPICS_CODE => Ok(ServerCommand::GetTopics(GetTopics::from_bytes(payload)?)),
12✔
229
            CREATE_TOPIC_CODE => Ok(ServerCommand::CreateTopic(CreateTopic::from_bytes(
213✔
230
                payload,
213✔
231
            )?)),
213✔
232
            DELETE_TOPIC_CODE => Ok(ServerCommand::DeleteTopic(DeleteTopic::from_bytes(
128✔
233
                payload,
128✔
234
            )?)),
128✔
235
            UPDATE_TOPIC_CODE => Ok(ServerCommand::UpdateTopic(UpdateTopic::from_bytes(
9✔
236
                payload,
9✔
237
            )?)),
9✔
238
            PURGE_TOPIC_CODE => Ok(ServerCommand::PurgeTopic(PurgeTopic::from_bytes(payload)?)),
9✔
239
            CREATE_PARTITIONS_CODE => Ok(ServerCommand::CreatePartitions(
240
                CreatePartitions::from_bytes(payload)?,
7✔
241
            )),
242
            DELETE_PARTITIONS_CODE => Ok(ServerCommand::DeletePartitions(
243
                DeletePartitions::from_bytes(payload)?,
7✔
244
            )),
245
            GET_CONSUMER_GROUP_CODE => Ok(ServerCommand::GetConsumerGroup(
246
                GetConsumerGroup::from_bytes(payload)?,
35✔
247
            )),
248
            GET_CONSUMER_GROUPS_CODE => Ok(ServerCommand::GetConsumerGroups(
249
                GetConsumerGroups::from_bytes(payload)?,
25✔
250
            )),
251
            CREATE_CONSUMER_GROUP_CODE => Ok(ServerCommand::CreateConsumerGroup(
252
                CreateConsumerGroup::from_bytes(payload)?,
45✔
253
            )),
254
            DELETE_CONSUMER_GROUP_CODE => Ok(ServerCommand::DeleteConsumerGroup(
255
                DeleteConsumerGroup::from_bytes(payload)?,
31✔
256
            )),
257
            JOIN_CONSUMER_GROUP_CODE => Ok(ServerCommand::JoinConsumerGroup(
258
                JoinConsumerGroup::from_bytes(payload)?,
25✔
259
            )),
260
            LEAVE_CONSUMER_GROUP_CODE => Ok(ServerCommand::LeaveConsumerGroup(
261
                LeaveConsumerGroup::from_bytes(payload)?,
3✔
262
            )),
263
            GET_SNAPSHOT_FILE_CODE => Ok(ServerCommand::GetSnapshotFile(GetSnapshot::from_bytes(
1✔
264
                payload,
1✔
265
            )?)),
1✔
266
            _ => Err(IggyError::InvalidCommand),
×
267
        }
268
    }
95,514✔
269
}
270

271
fn as_bytes<T: Command>(command: &T) -> Bytes {
43✔
272
    let payload = command.to_bytes();
43✔
273
    let mut bytes = BytesMut::with_capacity(4 + payload.len());
43✔
274
    bytes.put_u32_le(command.code());
43✔
275
    bytes.put_slice(&payload);
43✔
276
    bytes.freeze()
43✔
277
}
43✔
278

279
impl Validatable<IggyError> for ServerCommand {
    /// Validates the command by delegating to the wrapped payload's own `validate`.
    fn validate(&self) -> Result<(), IggyError> {
        match self {
            Self::Ping(cmd) => cmd.validate(),
            Self::GetStats(cmd) => cmd.validate(),
            Self::GetMe(cmd) => cmd.validate(),
            Self::GetClient(cmd) => cmd.validate(),
            Self::GetClients(cmd) => cmd.validate(),
            Self::GetUser(cmd) => cmd.validate(),
            Self::GetUsers(cmd) => cmd.validate(),
            Self::CreateUser(cmd) => cmd.validate(),
            Self::DeleteUser(cmd) => cmd.validate(),
            Self::UpdateUser(cmd) => cmd.validate(),
            Self::UpdatePermissions(cmd) => cmd.validate(),
            Self::ChangePassword(cmd) => cmd.validate(),
            Self::LoginUser(cmd) => cmd.validate(),
            Self::LogoutUser(cmd) => cmd.validate(),
            Self::GetPersonalAccessTokens(cmd) => cmd.validate(),
            Self::CreatePersonalAccessToken(cmd) => cmd.validate(),
            Self::DeletePersonalAccessToken(cmd) => cmd.validate(),
            Self::LoginWithPersonalAccessToken(cmd) => cmd.validate(),
            Self::SendMessages(cmd) => cmd.validate(),
            Self::PollMessages(cmd) => cmd.validate(),
            Self::StoreConsumerOffset(cmd) => cmd.validate(),
            Self::DeleteConsumerOffset(cmd) => cmd.validate(),
            Self::GetConsumerOffset(cmd) => cmd.validate(),
            Self::GetStream(cmd) => cmd.validate(),
            Self::GetStreams(cmd) => cmd.validate(),
            Self::CreateStream(cmd) => cmd.validate(),
            Self::DeleteStream(cmd) => cmd.validate(),
            Self::UpdateStream(cmd) => cmd.validate(),
            Self::PurgeStream(cmd) => cmd.validate(),
            Self::GetTopic(cmd) => cmd.validate(),
            Self::GetTopics(cmd) => cmd.validate(),
            Self::CreateTopic(cmd) => cmd.validate(),
            Self::DeleteTopic(cmd) => cmd.validate(),
            Self::UpdateTopic(cmd) => cmd.validate(),
            Self::PurgeTopic(cmd) => cmd.validate(),
            Self::CreatePartitions(cmd) => cmd.validate(),
            Self::DeletePartitions(cmd) => cmd.validate(),
            Self::GetConsumerGroup(cmd) => cmd.validate(),
            Self::GetConsumerGroups(cmd) => cmd.validate(),
            Self::CreateConsumerGroup(cmd) => cmd.validate(),
            Self::DeleteConsumerGroup(cmd) => cmd.validate(),
            Self::JoinConsumerGroup(cmd) => cmd.validate(),
            Self::LeaveConsumerGroup(cmd) => cmd.validate(),
            Self::FlushUnsavedBuffer(cmd) => cmd.validate(),
            Self::GetSnapshotFile(cmd) => cmd.validate(),
        }
    }
}
330

331
impl Display for ServerCommand {
332
    fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result {
×
333
        match self {
×
334
            ServerCommand::Ping(_) => write!(formatter, "{PING}"),
×
335
            ServerCommand::GetStats(_) => write!(formatter, "{GET_STATS}"),
×
336
            ServerCommand::GetMe(_) => write!(formatter, "{GET_ME}"),
×
337
            ServerCommand::GetClient(payload) => write!(formatter, "{GET_CLIENT}|{payload}"),
×
338
            ServerCommand::GetClients(_) => write!(formatter, "{GET_CLIENTS}"),
×
339
            ServerCommand::GetUser(payload) => write!(formatter, "{GET_USER}|{payload}"),
×
340
            ServerCommand::GetUsers(_) => write!(formatter, "{GET_USERS}"),
×
341
            ServerCommand::CreateUser(payload) => write!(formatter, "{CREATE_USER}|{payload}"),
×
342
            ServerCommand::DeleteUser(payload) => write!(formatter, "{DELETE_USER}|{payload}"),
×
343
            ServerCommand::UpdateUser(payload) => write!(formatter, "{UPDATE_USER}|{payload}"),
×
344
            ServerCommand::UpdatePermissions(payload) => {
×
345
                write!(formatter, "{UPDATE_PERMISSIONS}|{payload}")
×
346
            }
347
            ServerCommand::ChangePassword(payload) => {
×
348
                write!(formatter, "{CHANGE_PASSWORD}|{payload}")
×
349
            }
350
            ServerCommand::LoginUser(payload) => write!(formatter, "{LOGIN_USER}|{payload}"),
×
351
            ServerCommand::LogoutUser(_) => write!(formatter, "{LOGOUT_USER}"),
×
352
            ServerCommand::GetPersonalAccessTokens(_) => {
353
                write!(formatter, "{GET_PERSONAL_ACCESS_TOKENS}")
×
354
            }
355
            ServerCommand::CreatePersonalAccessToken(payload) => {
×
356
                write!(formatter, "{CREATE_PERSONAL_ACCESS_TOKEN}|{payload}")
×
357
            }
358
            ServerCommand::DeletePersonalAccessToken(payload) => {
×
359
                write!(formatter, "{DELETE_PERSONAL_ACCESS_TOKEN}|{payload}")
×
360
            }
361
            ServerCommand::LoginWithPersonalAccessToken(payload) => {
×
362
                write!(formatter, "{LOGIN_WITH_PERSONAL_ACCESS_TOKEN}|{payload}")
×
363
            }
364
            ServerCommand::GetStream(payload) => write!(formatter, "{GET_STREAM}|{payload}"),
×
365
            ServerCommand::GetStreams(_) => write!(formatter, "{GET_STREAMS}"),
×
366
            ServerCommand::CreateStream(payload) => write!(formatter, "{CREATE_STREAM}|{payload}"),
×
367
            ServerCommand::DeleteStream(payload) => write!(formatter, "{DELETE_STREAM}|{payload}"),
×
368
            ServerCommand::UpdateStream(payload) => write!(formatter, "{UPDATE_STREAM}|{payload}"),
×
369
            ServerCommand::PurgeStream(payload) => write!(formatter, "{PURGE_STREAM}|{payload}"),
×
370
            ServerCommand::GetTopic(payload) => write!(formatter, "{GET_TOPIC}|{payload}"),
×
371
            ServerCommand::GetTopics(payload) => write!(formatter, "{GET_TOPICS}|{payload}"),
×
372
            ServerCommand::CreateTopic(payload) => write!(formatter, "{CREATE_TOPIC}|{payload}"),
×
373
            ServerCommand::DeleteTopic(payload) => write!(formatter, "{DELETE_TOPIC}|{payload}"),
×
374
            ServerCommand::UpdateTopic(payload) => write!(formatter, "{UPDATE_TOPIC}|{payload}"),
×
375
            ServerCommand::PurgeTopic(payload) => write!(formatter, "{PURGE_TOPIC}|{payload}"),
×
376
            ServerCommand::CreatePartitions(payload) => {
×
377
                write!(formatter, "{CREATE_PARTITIONS}|{payload}")
×
378
            }
379
            ServerCommand::DeletePartitions(payload) => {
×
380
                write!(formatter, "{DELETE_PARTITIONS}|{payload}")
×
381
            }
382
            ServerCommand::PollMessages(payload) => write!(formatter, "{POLL_MESSAGES}|{payload}"),
×
383
            ServerCommand::SendMessages(payload) => write!(formatter, "{SEND_MESSAGES}|{payload}"),
×
384
            ServerCommand::StoreConsumerOffset(payload) => {
×
385
                write!(formatter, "{STORE_CONSUMER_OFFSET}|{payload}")
×
386
            }
NEW
387
            ServerCommand::DeleteConsumerOffset(payload) => {
×
NEW
388
                write!(formatter, "{DELETE_CONSUMER_OFFSET}|{payload}")
×
389
            }
390
            ServerCommand::GetConsumerOffset(payload) => {
×
391
                write!(formatter, "{GET_CONSUMER_OFFSET}|{payload}")
×
392
            }
393
            ServerCommand::GetConsumerGroup(payload) => {
×
394
                write!(formatter, "{GET_CONSUMER_GROUP}|{payload}")
×
395
            }
396
            ServerCommand::GetConsumerGroups(payload) => {
×
397
                write!(formatter, "{GET_CONSUMER_GROUPS}|{payload}")
×
398
            }
399
            ServerCommand::CreateConsumerGroup(payload) => {
×
400
                write!(formatter, "{CREATE_CONSUMER_GROUP}|{payload}")
×
401
            }
402
            ServerCommand::DeleteConsumerGroup(payload) => {
×
403
                write!(formatter, "{DELETE_CONSUMER_GROUP}|{payload}")
×
404
            }
405
            ServerCommand::JoinConsumerGroup(payload) => {
×
406
                write!(formatter, "{JOIN_CONSUMER_GROUP}|{payload}")
×
407
            }
408
            ServerCommand::LeaveConsumerGroup(payload) => {
×
409
                write!(formatter, "{LEAVE_CONSUMER_GROUP}|{payload}")
×
410
            }
411
            ServerCommand::FlushUnsavedBuffer(payload) => {
×
412
                write!(formatter, "{FLUSH_UNSAVED_BUFFER}|{payload}")
×
413
            }
414
            ServerCommand::GetSnapshotFile(payload) => {
×
415
                write!(formatter, "{GET_SNAPSHOT_FILE}|{payload}")
×
416
            }
417
        }
418
    }
×
419
}
420

421
#[cfg(test)]
422
mod tests {
423
    use super::*;
424

425
    #[test]
426
    fn should_be_serialized_as_bytes_and_deserialized_from_bytes() {
1✔
427
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
428
            &ServerCommand::Ping(Ping::default()),
1✔
429
            PING_CODE,
1✔
430
            &Ping::default(),
1✔
431
        );
1✔
432
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
433
            &ServerCommand::GetStats(GetStats::default()),
1✔
434
            GET_STATS_CODE,
1✔
435
            &GetStats::default(),
1✔
436
        );
1✔
437
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
438
            &ServerCommand::GetMe(GetMe::default()),
1✔
439
            GET_ME_CODE,
1✔
440
            &GetMe::default(),
1✔
441
        );
1✔
442
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
443
            &ServerCommand::GetClient(GetClient::default()),
1✔
444
            GET_CLIENT_CODE,
1✔
445
            &GetClient::default(),
1✔
446
        );
1✔
447
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
448
            &ServerCommand::GetClients(GetClients::default()),
1✔
449
            GET_CLIENTS_CODE,
1✔
450
            &GetClients::default(),
1✔
451
        );
1✔
452
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
453
            &ServerCommand::GetUser(GetUser::default()),
1✔
454
            GET_USER_CODE,
1✔
455
            &GetUser::default(),
1✔
456
        );
1✔
457
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
458
            &ServerCommand::GetUsers(GetUsers::default()),
1✔
459
            GET_USERS_CODE,
1✔
460
            &GetUsers::default(),
1✔
461
        );
1✔
462
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
463
            &ServerCommand::CreateUser(CreateUser::default()),
1✔
464
            CREATE_USER_CODE,
1✔
465
            &CreateUser::default(),
1✔
466
        );
1✔
467
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
468
            &ServerCommand::DeleteUser(DeleteUser::default()),
1✔
469
            DELETE_USER_CODE,
1✔
470
            &DeleteUser::default(),
1✔
471
        );
1✔
472
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
473
            &ServerCommand::UpdateUser(UpdateUser::default()),
1✔
474
            UPDATE_USER_CODE,
1✔
475
            &UpdateUser::default(),
1✔
476
        );
1✔
477
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
478
            &ServerCommand::UpdatePermissions(UpdatePermissions::default()),
1✔
479
            UPDATE_PERMISSIONS_CODE,
1✔
480
            &UpdatePermissions::default(),
1✔
481
        );
1✔
482
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
483
            &ServerCommand::ChangePassword(ChangePassword::default()),
1✔
484
            CHANGE_PASSWORD_CODE,
1✔
485
            &ChangePassword::default(),
1✔
486
        );
1✔
487
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
488
            &ServerCommand::LoginUser(LoginUser::default()),
1✔
489
            LOGIN_USER_CODE,
1✔
490
            &LoginUser::default(),
1✔
491
        );
1✔
492
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
493
            &ServerCommand::LogoutUser(LogoutUser::default()),
1✔
494
            LOGOUT_USER_CODE,
1✔
495
            &LogoutUser::default(),
1✔
496
        );
1✔
497
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
498
            &ServerCommand::GetPersonalAccessTokens(GetPersonalAccessTokens::default()),
1✔
499
            GET_PERSONAL_ACCESS_TOKENS_CODE,
1✔
500
            &GetPersonalAccessTokens::default(),
1✔
501
        );
1✔
502
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
503
            &ServerCommand::CreatePersonalAccessToken(CreatePersonalAccessToken::default()),
1✔
504
            CREATE_PERSONAL_ACCESS_TOKEN_CODE,
1✔
505
            &CreatePersonalAccessToken::default(),
1✔
506
        );
1✔
507
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
508
            &ServerCommand::DeletePersonalAccessToken(DeletePersonalAccessToken::default()),
1✔
509
            DELETE_PERSONAL_ACCESS_TOKEN_CODE,
1✔
510
            &DeletePersonalAccessToken::default(),
1✔
511
        );
1✔
512
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
513
            &ServerCommand::LoginWithPersonalAccessToken(LoginWithPersonalAccessToken::default()),
1✔
514
            LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE,
1✔
515
            &LoginWithPersonalAccessToken::default(),
1✔
516
        );
1✔
517
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
518
            &ServerCommand::SendMessages(SendMessages::default()),
1✔
519
            SEND_MESSAGES_CODE,
1✔
520
            &SendMessages::default(),
1✔
521
        );
1✔
522
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
523
            &ServerCommand::PollMessages(PollMessages::default()),
1✔
524
            POLL_MESSAGES_CODE,
1✔
525
            &PollMessages::default(),
1✔
526
        );
1✔
527
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
528
            &ServerCommand::StoreConsumerOffset(StoreConsumerOffset::default()),
1✔
529
            STORE_CONSUMER_OFFSET_CODE,
1✔
530
            &StoreConsumerOffset::default(),
1✔
531
        );
1✔
532
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
533
            &ServerCommand::GetConsumerOffset(GetConsumerOffset::default()),
1✔
534
            GET_CONSUMER_OFFSET_CODE,
1✔
535
            &GetConsumerOffset::default(),
1✔
536
        );
1✔
537
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
538
            &ServerCommand::GetStream(GetStream::default()),
1✔
539
            GET_STREAM_CODE,
1✔
540
            &GetStream::default(),
1✔
541
        );
1✔
542
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
543
            &ServerCommand::GetStreams(GetStreams::default()),
1✔
544
            GET_STREAMS_CODE,
1✔
545
            &GetStreams::default(),
1✔
546
        );
1✔
547
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
548
            &ServerCommand::CreateStream(CreateStream::default()),
1✔
549
            CREATE_STREAM_CODE,
1✔
550
            &CreateStream::default(),
1✔
551
        );
1✔
552
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
553
            &ServerCommand::DeleteStream(DeleteStream::default()),
1✔
554
            DELETE_STREAM_CODE,
1✔
555
            &DeleteStream::default(),
1✔
556
        );
1✔
557
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
558
            &ServerCommand::UpdateStream(UpdateStream::default()),
1✔
559
            UPDATE_STREAM_CODE,
1✔
560
            &UpdateStream::default(),
1✔
561
        );
1✔
562
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
563
            &ServerCommand::PurgeStream(PurgeStream::default()),
1✔
564
            PURGE_STREAM_CODE,
1✔
565
            &PurgeStream::default(),
1✔
566
        );
1✔
567
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
568
            &ServerCommand::GetTopic(GetTopic::default()),
1✔
569
            GET_TOPIC_CODE,
1✔
570
            &GetTopic::default(),
1✔
571
        );
1✔
572
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
573
            &ServerCommand::GetTopics(GetTopics::default()),
1✔
574
            GET_TOPICS_CODE,
1✔
575
            &GetTopics::default(),
1✔
576
        );
1✔
577
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
578
            &ServerCommand::CreateTopic(CreateTopic::default()),
1✔
579
            CREATE_TOPIC_CODE,
1✔
580
            &CreateTopic::default(),
1✔
581
        );
1✔
582
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
583
            &ServerCommand::DeleteTopic(DeleteTopic::default()),
1✔
584
            DELETE_TOPIC_CODE,
1✔
585
            &DeleteTopic::default(),
1✔
586
        );
1✔
587
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
588
            &ServerCommand::UpdateTopic(UpdateTopic::default()),
1✔
589
            UPDATE_TOPIC_CODE,
1✔
590
            &UpdateTopic::default(),
1✔
591
        );
1✔
592
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
593
            &ServerCommand::PurgeTopic(PurgeTopic::default()),
1✔
594
            PURGE_TOPIC_CODE,
1✔
595
            &PurgeTopic::default(),
1✔
596
        );
1✔
597
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
598
            &ServerCommand::CreatePartitions(CreatePartitions::default()),
1✔
599
            CREATE_PARTITIONS_CODE,
1✔
600
            &CreatePartitions::default(),
1✔
601
        );
1✔
602
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
603
            &ServerCommand::DeletePartitions(DeletePartitions::default()),
1✔
604
            DELETE_PARTITIONS_CODE,
1✔
605
            &DeletePartitions::default(),
1✔
606
        );
1✔
607
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
608
            &ServerCommand::GetConsumerGroup(GetConsumerGroup::default()),
1✔
609
            GET_CONSUMER_GROUP_CODE,
1✔
610
            &GetConsumerGroup::default(),
1✔
611
        );
1✔
612
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
613
            &ServerCommand::GetConsumerGroups(GetConsumerGroups::default()),
1✔
614
            GET_CONSUMER_GROUPS_CODE,
1✔
615
            &GetConsumerGroups::default(),
1✔
616
        );
1✔
617
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
618
            &ServerCommand::CreateConsumerGroup(CreateConsumerGroup::default()),
1✔
619
            CREATE_CONSUMER_GROUP_CODE,
1✔
620
            &CreateConsumerGroup::default(),
1✔
621
        );
1✔
622
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
623
            &ServerCommand::DeleteConsumerGroup(DeleteConsumerGroup::default()),
1✔
624
            DELETE_CONSUMER_GROUP_CODE,
1✔
625
            &DeleteConsumerGroup::default(),
1✔
626
        );
1✔
627
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
628
            &ServerCommand::JoinConsumerGroup(JoinConsumerGroup::default()),
1✔
629
            JOIN_CONSUMER_GROUP_CODE,
1✔
630
            &JoinConsumerGroup::default(),
1✔
631
        );
1✔
632
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
633
            &ServerCommand::LeaveConsumerGroup(LeaveConsumerGroup::default()),
1✔
634
            LEAVE_CONSUMER_GROUP_CODE,
1✔
635
            &LeaveConsumerGroup::default(),
1✔
636
        );
1✔
637
        assert_serialized_as_bytes_and_deserialized_from_bytes(
1✔
638
            &ServerCommand::FlushUnsavedBuffer(FlushUnsavedBuffer::default()),
1✔
639
            FLUSH_UNSAVED_BUFFER_CODE,
1✔
640
            &FlushUnsavedBuffer::default(),
1✔
641
        );
1✔
642
    }
1✔
643

644
    fn assert_serialized_as_bytes_and_deserialized_from_bytes(
43✔
645
        command: &ServerCommand,
43✔
646
        code: u32,
43✔
647
        payload: &dyn Command,
43✔
648
    ) {
43✔
649
        assert_serialized_as_bytes(command, code, payload);
43✔
650
        assert_deserialized_from_bytes(command, code, payload);
43✔
651
    }
43✔
652

653
    fn assert_serialized_as_bytes(
43✔
654
        server_command: &ServerCommand,
43✔
655
        code: u32,
43✔
656
        command: &dyn Command,
43✔
657
    ) {
43✔
658
        let payload = command.to_bytes();
43✔
659
        let mut bytes = BytesMut::with_capacity(4 + payload.len());
43✔
660
        bytes.put_u32_le(code);
43✔
661
        bytes.put_slice(&payload);
43✔
662
        assert_eq!(server_command.to_bytes(), bytes);
43✔
663
    }
43✔
664

665
    fn assert_deserialized_from_bytes(
43✔
666
        command: &ServerCommand,
43✔
667
        command_id: u32,
43✔
668
        payload: &dyn Command,
43✔
669
    ) {
43✔
670
        let payload = payload.to_bytes();
43✔
671
        let mut bytes = BytesMut::with_capacity(4 + payload.len());
43✔
672
        bytes.put_u32_le(command_id);
43✔
673
        bytes.put_slice(&payload);
43✔
674
        let bytes = Bytes::from(bytes);
43✔
675
        assert_eq!(&ServerCommand::from_bytes(bytes).unwrap(), command);
43✔
676
    }
43✔
677
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc