• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

iggy-rs / iggy / 12875284241

20 Jan 2025 08:30PM UTC coverage: 75.442% (+0.05%) from 75.392%
12875284241

push

github

web-flow
Refactor: Replace dyn Trait with Enum for Encryptor and Persister Types (#1442)

Addresses part of #https://github.com/iggy-rs/iggy/issues/667

This PR refactors the codebase to replace the use of dyn Trait for
Encryptor and Persister with enum types (EncryptorKind and
PersisterKind)

Co-authored-by: Hubert Gruszecki <h.gruszecki@gmail.com>

41 of 61 new or added lines in 16 files covered. (67.21%)

17 existing lines in 4 files now uncovered.

24680 of 32714 relevant lines covered (75.44%)

25631.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.25
/cli/src/main.rs
1
mod args;
2
mod credentials;
3
mod error;
4
mod logging;
5

6
use crate::args::{
7
    client::ClientAction, consumer_group::ConsumerGroupAction,
8
    consumer_offset::ConsumerOffsetAction, permissions::PermissionsArgs,
9
    personal_access_token::PersonalAccessTokenAction, stream::StreamAction, topic::TopicAction,
10
    Command, IggyConsoleArgs,
11
};
12
use crate::credentials::IggyCredentials;
13
use crate::error::IggyCmdError;
14
use crate::logging::Logging;
15
use args::context::ContextAction;
16
use args::message::MessageAction;
17
use args::partition::PartitionAction;
18
use args::user::UserAction;
19
use args::{CliOptions, IggyMergedConsoleArgs};
20
use clap::Parser;
21
use iggy::args::Args;
22
use iggy::cli::context::common::ContextManager;
23
use iggy::cli::context::use_context::UseContextCmd;
24
use iggy::cli::system::snapshot::GetSnapshotCmd;
25
use iggy::cli::{
26
    client::{get_client::GetClientCmd, get_clients::GetClientsCmd},
27
    consumer_group::{
28
        create_consumer_group::CreateConsumerGroupCmd,
29
        delete_consumer_group::DeleteConsumerGroupCmd, get_consumer_group::GetConsumerGroupCmd,
30
        get_consumer_groups::GetConsumerGroupsCmd,
31
    },
32
    consumer_offset::{
33
        get_consumer_offset::GetConsumerOffsetCmd, set_consumer_offset::SetConsumerOffsetCmd,
34
    },
35
    context::get_contexts::GetContextsCmd,
36
    message::{
37
        flush_messages::FlushMessagesCmd, poll_messages::PollMessagesCmd,
38
        send_messages::SendMessagesCmd,
39
    },
40
    partitions::{create_partitions::CreatePartitionsCmd, delete_partitions::DeletePartitionsCmd},
41
    personal_access_tokens::{
42
        create_personal_access_token::CreatePersonalAccessTokenCmd,
43
        delete_personal_access_tokens::DeletePersonalAccessTokenCmd,
44
        get_personal_access_tokens::GetPersonalAccessTokensCmd,
45
    },
46
    streams::{
47
        create_stream::CreateStreamCmd, delete_stream::DeleteStreamCmd, get_stream::GetStreamCmd,
48
        get_streams::GetStreamsCmd, purge_stream::PurgeStreamCmd, update_stream::UpdateStreamCmd,
49
    },
50
    system::{me::GetMeCmd, ping::PingCmd, stats::GetStatsCmd},
51
    topics::{
52
        create_topic::CreateTopicCmd, delete_topic::DeleteTopicCmd, get_topic::GetTopicCmd,
53
        get_topics::GetTopicsCmd, purge_topic::PurgeTopicCmd, update_topic::UpdateTopicCmd,
54
    },
55
    users::{
56
        change_password::ChangePasswordCmd,
57
        create_user::CreateUserCmd,
58
        delete_user::DeleteUserCmd,
59
        get_user::GetUserCmd,
60
        get_users::GetUsersCmd,
61
        update_permissions::UpdatePermissionsCmd,
62
        update_user::{UpdateUserCmd, UpdateUserType},
63
    },
64
};
65
use iggy::cli_command::{CliCommand, PRINT_TARGET};
66
use iggy::client_provider::{self, ClientProviderConfig};
67
use iggy::clients::client::IggyClient;
68
use iggy::utils::crypto::{Aes256GcmEncryptor, EncryptorKind};
69
use iggy::utils::personal_access_token_expiry::PersonalAccessTokenExpiry;
70
use std::sync::Arc;
71
use tracing::{event, Level};
72

73
#[cfg(feature = "login-session")]
74
mod main_login_session {
75
    pub(crate) use iggy::cli::system::{login::LoginCmd, logout::LogoutCmd};
76
    pub(crate) use iggy::cli::utils::login_session_expiry::LoginSessionExpiry;
77
}
78

79
#[cfg(feature = "login-session")]
80
use main_login_session::*;
81

82
fn get_command(
208✔
83
    command: Command,
208✔
84
    cli_options: &CliOptions,
208✔
85
    iggy_args: &Args,
208✔
86
) -> Box<dyn CliCommand> {
208✔
87
    #[warn(clippy::let_and_return)]
208✔
88
    match command {
208✔
89
        Command::Stream(command) => match command {
13✔
90
            StreamAction::Create(args) => {
2✔
91
                Box::new(CreateStreamCmd::new(args.stream_id, args.name.clone()))
2✔
92
            }
93
            StreamAction::Delete(args) => Box::new(DeleteStreamCmd::new(args.stream_id.clone())),
2✔
94
            StreamAction::Update(args) => Box::new(UpdateStreamCmd::new(
2✔
95
                args.stream_id.clone(),
2✔
96
                args.name.clone(),
2✔
97
            )),
2✔
98
            StreamAction::Get(args) => Box::new(GetStreamCmd::new(args.stream_id.clone())),
2✔
99
            StreamAction::List(args) => Box::new(GetStreamsCmd::new(args.list_mode.into())),
3✔
100
            StreamAction::Purge(args) => Box::new(PurgeStreamCmd::new(args.stream_id.clone())),
2✔
101
        },
102
        Command::Topic(command) => match command {
25✔
103
            TopicAction::Create(args) => Box::new(CreateTopicCmd::new(
4✔
104
                args.stream_id.clone(),
4✔
105
                args.topic_id,
4✔
106
                args.partitions_count,
4✔
107
                args.compression_algorithm,
4✔
108
                args.name.clone(),
4✔
109
                args.message_expiry.clone().into(),
4✔
110
                args.max_topic_size,
4✔
111
                args.replication_factor,
4✔
112
            )),
4✔
113
            TopicAction::Delete(args) => Box::new(DeleteTopicCmd::new(
4✔
114
                args.stream_id.clone(),
4✔
115
                args.topic_id.clone(),
4✔
116
            )),
4✔
117
            TopicAction::Update(args) => Box::new(UpdateTopicCmd::new(
6✔
118
                args.stream_id.clone(),
6✔
119
                args.topic_id.clone(),
6✔
120
                args.compression_algorithm,
6✔
121
                args.name.clone(),
6✔
122
                args.message_expiry.clone().into(),
6✔
123
                args.max_topic_size,
6✔
124
                args.replication_factor,
6✔
125
            )),
6✔
126
            TopicAction::Get(args) => Box::new(GetTopicCmd::new(
4✔
127
                args.stream_id.clone(),
4✔
128
                args.topic_id.clone(),
4✔
129
            )),
4✔
130
            TopicAction::List(args) => Box::new(GetTopicsCmd::new(
3✔
131
                args.stream_id.clone(),
3✔
132
                args.list_mode.into(),
3✔
133
            )),
3✔
134
            TopicAction::Purge(args) => Box::new(PurgeTopicCmd::new(
4✔
135
                args.stream_id.clone(),
4✔
136
                args.topic_id.clone(),
4✔
137
            )),
4✔
138
        },
139
        Command::Partition(command) => match command {
8✔
140
            PartitionAction::Create(args) => Box::new(CreatePartitionsCmd::new(
4✔
141
                args.stream_id.clone(),
4✔
142
                args.topic_id.clone(),
4✔
143
                args.partitions_count,
4✔
144
            )),
4✔
145
            PartitionAction::Delete(args) => Box::new(DeletePartitionsCmd::new(
4✔
146
                args.stream_id.clone(),
4✔
147
                args.topic_id.clone(),
4✔
148
                args.partitions_count,
4✔
149
            )),
4✔
150
        },
151
        Command::Ping(args) => Box::new(PingCmd::new(args.count)),
2✔
152
        Command::Me => Box::new(GetMeCmd::new()),
13✔
153
        Command::Stats(args) => Box::new(GetStatsCmd::new(cli_options.quiet, args.output.into())),
5✔
154
        Command::Snapshot(args) => Box::new(GetSnapshotCmd::new(
1✔
155
            args.compression,
1✔
156
            args.snapshot_types,
1✔
157
            args.out_dir,
1✔
158
        )),
1✔
159
        Command::Pat(command) => match command {
8✔
160
            PersonalAccessTokenAction::Create(pat_create_args) => {
3✔
161
                Box::new(CreatePersonalAccessTokenCmd::new(
3✔
162
                    pat_create_args.name.clone(),
3✔
163
                    PersonalAccessTokenExpiry::new(pat_create_args.expiry.clone()),
3✔
164
                    cli_options.quiet,
3✔
165
                    pat_create_args.store_token,
3✔
166
                    iggy_args.get_server_address().unwrap(),
3✔
167
                ))
3✔
168
            }
169
            PersonalAccessTokenAction::Delete(pat_delete_args) => {
2✔
170
                Box::new(DeletePersonalAccessTokenCmd::new(
2✔
171
                    pat_delete_args.name.clone(),
2✔
172
                    iggy_args.get_server_address().unwrap(),
2✔
173
                ))
2✔
174
            }
175
            PersonalAccessTokenAction::List(pat_list_args) => Box::new(
3✔
176
                GetPersonalAccessTokensCmd::new(pat_list_args.list_mode.into()),
3✔
177
            ),
3✔
178
        },
179
        Command::User(command) => match command {
34✔
180
            UserAction::Create(create_args) => Box::new(CreateUserCmd::new(
7✔
181
                create_args.username.clone(),
7✔
182
                create_args.password.clone(),
7✔
183
                create_args.user_status.clone().into(),
7✔
184
                PermissionsArgs::new(
7✔
185
                    create_args.global_permissions.clone(),
7✔
186
                    create_args.stream_permissions.clone(),
7✔
187
                )
7✔
188
                .into(),
7✔
189
            )),
7✔
190
            UserAction::Delete(delete_args) => {
2✔
191
                Box::new(DeleteUserCmd::new(delete_args.user_id.clone()))
2✔
192
            }
193
            UserAction::Get(get_args) => Box::new(GetUserCmd::new(get_args.user_id.clone())),
7✔
194
            UserAction::List(list_args) => Box::new(GetUsersCmd::new(list_args.list_mode.into())),
3✔
195
            UserAction::Name(name_args) => Box::new(UpdateUserCmd::new(
3✔
196
                name_args.user_id.clone(),
3✔
197
                UpdateUserType::Name(name_args.username.clone()),
3✔
198
            )),
3✔
199
            UserAction::Status(status_args) => Box::new(UpdateUserCmd::new(
4✔
200
                status_args.user_id.clone(),
4✔
201
                UpdateUserType::Status(status_args.status.clone().into()),
4✔
202
            )),
4✔
203
            UserAction::Password(change_pwd_args) => Box::new(ChangePasswordCmd::new(
4✔
204
                change_pwd_args.user_id,
4✔
205
                change_pwd_args.current_password,
4✔
206
                change_pwd_args.new_password,
4✔
207
            )),
4✔
208
            UserAction::Permissions(permissions_args) => Box::new(UpdatePermissionsCmd::new(
4✔
209
                permissions_args.user_id.clone(),
4✔
210
                PermissionsArgs::new(
4✔
211
                    permissions_args.global_permissions.clone(),
4✔
212
                    permissions_args.stream_permissions.clone(),
4✔
213
                )
4✔
214
                .into(),
4✔
215
            )),
4✔
216
        },
217
        Command::Client(command) => match command {
4✔
218
            ClientAction::Get(get_args) => Box::new(GetClientCmd::new(get_args.client_id)),
1✔
219
            ClientAction::List(list_args) => {
3✔
220
                Box::new(GetClientsCmd::new(list_args.list_mode.into()))
3✔
221
            }
222
        },
223
        Command::ConsumerGroup(command) => match command {
32✔
224
            ConsumerGroupAction::Create(create_args) => Box::new(CreateConsumerGroupCmd::new(
4✔
225
                create_args.stream_id.clone(),
4✔
226
                create_args.topic_id.clone(),
4✔
227
                create_args.name.clone(),
4✔
228
                create_args.group_id,
4✔
229
            )),
4✔
230
            ConsumerGroupAction::Delete(delete_args) => Box::new(DeleteConsumerGroupCmd::new(
8✔
231
                delete_args.stream_id.clone(),
8✔
232
                delete_args.topic_id.clone(),
8✔
233
                delete_args.group_id.clone(),
8✔
234
            )),
8✔
235
            ConsumerGroupAction::Get(get_args) => Box::new(GetConsumerGroupCmd::new(
8✔
236
                get_args.stream_id.clone(),
8✔
237
                get_args.topic_id.clone(),
8✔
238
                get_args.group_id.clone(),
8✔
239
            )),
8✔
240
            ConsumerGroupAction::List(list_args) => Box::new(GetConsumerGroupsCmd::new(
12✔
241
                list_args.stream_id.clone(),
12✔
242
                list_args.topic_id.clone(),
12✔
243
                list_args.list_mode.into(),
12✔
244
            )),
12✔
245
        },
246
        Command::Message(command) => match command {
35✔
247
            MessageAction::Send(send_args) => Box::new(SendMessagesCmd::new(
14✔
248
                send_args.stream_id.clone(),
14✔
249
                send_args.topic_id.clone(),
14✔
250
                send_args.partition_id,
14✔
251
                send_args.message_key.clone(),
14✔
252
                send_args.messages.clone(),
14✔
253
                send_args.headers.clone(),
14✔
254
                send_args.input_file.clone(),
14✔
255
            )),
14✔
256
            MessageAction::Poll(poll_args) => Box::new(PollMessagesCmd::new(
13✔
257
                poll_args.stream_id.clone(),
13✔
258
                poll_args.topic_id.clone(),
13✔
259
                poll_args.partition_id,
13✔
260
                poll_args.message_count,
13✔
261
                poll_args.auto_commit,
13✔
262
                poll_args.offset,
13✔
263
                poll_args.first,
13✔
264
                poll_args.last,
13✔
265
                poll_args.next,
13✔
266
                poll_args.consumer.clone(),
13✔
267
                poll_args.show_headers,
13✔
268
                poll_args.output_file.clone(),
13✔
269
            )),
13✔
270
            MessageAction::Flush(flush_args) => Box::new(FlushMessagesCmd::new(
8✔
271
                flush_args.stream_id.clone(),
8✔
272
                flush_args.topic_id.clone(),
8✔
273
                flush_args.partition_id,
8✔
274
                flush_args.fsync,
8✔
275
            )),
8✔
276
        },
277
        Command::ConsumerOffset(command) => match command {
18✔
278
            ConsumerOffsetAction::Get(get_args) => Box::new(GetConsumerOffsetCmd::new(
9✔
279
                get_args.consumer_id.clone(),
9✔
280
                get_args.stream_id.clone(),
9✔
281
                get_args.topic_id.clone(),
9✔
282
                get_args.partition_id,
9✔
283
            )),
9✔
284
            ConsumerOffsetAction::Set(set_args) => Box::new(SetConsumerOffsetCmd::new(
9✔
285
                set_args.consumer_id.clone(),
9✔
286
                set_args.stream_id.clone(),
9✔
287
                set_args.topic_id.clone(),
9✔
288
                set_args.partition_id,
9✔
289
                set_args.offset,
9✔
290
            )),
9✔
291
        },
292
        Command::Context(command) => match command {
4✔
293
            ContextAction::List(list_args) => {
3✔
294
                Box::new(GetContextsCmd::new(list_args.list_mode.into()))
3✔
295
            }
296
            ContextAction::Use(use_args) => {
1✔
297
                Box::new(UseContextCmd::new(use_args.context_name.clone()))
1✔
298
            }
299
        },
300
        #[cfg(feature = "login-session")]
301
        Command::Login(login_args) => Box::new(LoginCmd::new(
4✔
302
            iggy_args.get_server_address().unwrap(),
4✔
303
            LoginSessionExpiry::new(login_args.expiry.clone()),
4✔
304
        )),
4✔
305
        #[cfg(feature = "login-session")]
306
        Command::Logout => Box::new(LogoutCmd::new(iggy_args.get_server_address().unwrap())),
2✔
307
    }
308
}
208✔
309

310
#[tokio::main]
311
async fn main() -> Result<(), IggyCmdError> {
306✔
312
    let args = IggyConsoleArgs::parse();
306✔
313

306✔
314
    if let Some(generator) = args.cli.generator {
306✔
315
        args.generate_completion(generator);
306✔
316
        return Ok(());
97✔
317
    }
306✔
318

209✔
319
    if args.command.is_none() {
209✔
320
        IggyConsoleArgs::print_overview();
306✔
321
        return Ok(());
1✔
322
    }
306✔
323

208✔
324
    let mut logging = Logging::new();
208✔
325
    logging.init(args.cli.quiet, &args.cli.debug);
208✔
326

208✔
327
    let command = args.command.clone().unwrap();
208✔
328

208✔
329
    let mut context_manager = ContextManager::default();
208✔
330
    let active_context = context_manager.get_active_context().await?;
306✔
331
    let merged_args = IggyMergedConsoleArgs::from_context(active_context, args);
306✔
332

208✔
333
    let iggy_args = merged_args.iggy;
208✔
334
    let cli_options = merged_args.cli;
208✔
335

208✔
336
    // Get command based on command line arguments
208✔
337
    let mut command = get_command(command, &cli_options, &iggy_args);
208✔
338

306✔
339
    // Create credentials based on command line arguments and command
306✔
340
    let mut credentials = IggyCredentials::new(&cli_options, &iggy_args, command.login_required())?;
306✔
341

306✔
342
    let encryptor = match iggy_args.encryption_key.is_empty() {
306✔
343
        true => None,
306✔
344
        false => Some(Arc::new(EncryptorKind::Aes256Gcm(
306✔
345
            Aes256GcmEncryptor::from_base64_key(&iggy_args.encryption_key).unwrap(),
×
NEW
346
        ))),
×
347
    };
306✔
348
    let client_provider_config = Arc::new(ClientProviderConfig::from_args_set_autologin(
306✔
349
        iggy_args.clone(),
206✔
350
        false,
206✔
351
    )?);
206✔
352

306✔
353
    let client =
306✔
354
        client_provider::get_raw_client(client_provider_config, command.connection_required())
306✔
355
            .await?;
206✔
356
    let client = IggyClient::create(client, None, encryptor);
306✔
357

206✔
358
    credentials.set_iggy_client(&client);
206✔
359
    credentials.login_user().await?;
206✔
360

306✔
361
    if command.use_tracing() {
306✔
362
        event!(target: PRINT_TARGET, Level::INFO, "Executing {}", command.explain());
306✔
363
    } else {
306✔
364
        println!("Executing {}", command.explain());
1✔
365
    }
1✔
366
    command.execute_cmd(&client).await?;
306✔
367

306✔
368
    credentials.logout_user().await?;
306✔
369

306✔
370
    Ok(())
306✔
371
}
306✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc