• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

unrenamed / chatd / 9928640664

14 Jul 2024 02:31PM UTC coverage: 16.1% (+3.8%) from 12.265%
9928640664

push

github

unrenamed
fix: `UserConfig` methods visibility

0 of 7 new or added lines in 1 file covered. (0.0%)

472 existing lines in 7 files now uncovered.

379 of 2354 relevant lines covered (16.1%)

0.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/server/session/repository.rs
1
use std::fmt::Debug;
2
use std::sync::Arc;
3

4
use log::{error, info, trace, warn};
5
use terminal_keycode::KeyCode;
6
use tokio::spawn;
7
use tokio::sync::mpsc::{self, Receiver};
8
use tokio::sync::{watch, Mutex};
9

10
use crate::auth::Auth;
11
use crate::chat::ChatRoom;
12
use crate::pubkey::PubKey;
13
use crate::server::session_workflow::*;
14
use crate::terminal::{keyboard_decoder, Terminal, TerminalHandle};
15

16
/// Identifier of a session; it is handed to the chat room when joining, leaving and when
/// looking up the session's member.
type SessionId = usize;
17
/// String identifier delivered with a new session and forwarded to the room's `join` call.
// NOTE(review): presumably the id assigned by the SSH layer — confirm against the producer.
type SessionSshId = String;
18
/// Username delivered with a new session and forwarded to the room's `join` call.
type SessionConnectUsername = String;
19

20
/// Events consumed by the per-session event-processing task.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
21
pub enum SessionEvent {
22
    /// Raw input bytes; the session task decodes them into key codes.
    Data(Vec<u8>),
23
    /// End of the session; the session task signals the render task and stops.
    Disconnect,
24
    /// New terminal dimensions, applied as `set_size(width, height)`.
    WindowResize(u16, u16),
25
    /// A `(name, value)` pair that is passed to the env workflow.
    Env(String, String),
26
}
27

28
/// Events received by [`SessionRepository`] through its event channel.
pub enum SessionRepositoryEvent {
29
    /// A new session is available. The fields are consumed positionally as
    /// `(id, ssh_id, username, pk, handle, event_rx)`.
    NewSession(
30
        /// Session id.
        SessionId,
31
        /// SSH-related id forwarded to the room on join.
        SessionSshId,
32
        /// Username forwarded to the room on join.
        SessionConnectUsername,
33
        /// Public key forwarded to the room on join.
        PubKey,
34
        /// Handle from which the session's `Terminal` is constructed.
        TerminalHandle,
35
        /// Stream of [`SessionEvent`]s belonging to this session.
        Receiver<SessionEvent>,
36
    ),
37
}
38

39
impl Debug for SessionRepositoryEvent {
UNCOV
40
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
41
        match self {
42
            Self::NewSession(arg0, arg1, arg2, arg3, _arg4, _arg5) => f
×
43
                .debug_tuple("NewSession")
44
                .field(arg0)
45
                .field(arg1)
46
                .field(arg2)
47
                .field(arg3)
48
                .finish(),
49
        }
50
    }
51
}
52

53
/// Listens for [`SessionRepositoryEvent`]s and drives the tasks of every announced session.
pub struct SessionRepository {
54
    /// Channel from which repository events are read.
    repo_event_receiver: Receiver<SessionRepositoryEvent>,
55
}
56

57
impl SessionRepository {
UNCOV
58
    pub fn new(repo_event_receiver: Receiver<SessionRepositoryEvent>) -> Self {
×
59
        Self {
60
            repo_event_receiver,
61
        }
62
    }
63

UNCOV
64
    pub async fn wait_for_sessions(&mut self, room: Arc<Mutex<ChatRoom>>, auth: Arc<Mutex<Auth>>) {
×
UNCOV
65
        while let Some(event) = self.repo_event_receiver.recv().await {
×
66
            match event {
67
                SessionRepositoryEvent::NewSession(id, ssh_id, username, pk, handle, event_rx) => {
×
68
                    let room = room.clone();
×
UNCOV
69
                    let auth = auth.clone();
×
70
                    let mut terminal = Terminal::new(handle);
×
UNCOV
71
                    let (message_tx, message_rx) = mpsc::channel(100);
×
UNCOV
72
                    let (exit_tx, exit_rx) = watch::channel(());
×
73

UNCOV
74
                    spawn(async move {
×
75
                        {
UNCOV
76
                            let mut room = room.lock().await;
×
UNCOV
77
                            let join_result = room
×
UNCOV
78
                                .join(id, username, pk, ssh_id, message_tx, exit_tx)
×
79
                                .await;
×
80
                            if let Ok(user) = join_result {
×
81
                                terminal.set_prompt(&user.config.display_name());
×
82
                            }
83
                        }
UNCOV
84
                        Self::handle_session(
×
85
                            id, room, auth, terminal, event_rx, message_rx, exit_rx,
×
86
                        )
87
                        .await;
×
88
                    });
89
                }
90
            }
91
        }
92
    }
93

UNCOV
94
    async fn handle_session(
×
95
        id: SessionId,
96
        room: Arc<Mutex<ChatRoom>>,
97
        auth: Arc<Mutex<Auth>>,
98
        terminal: Terminal,
99
        event_rx: Receiver<SessionEvent>,
100
        message_rx: Receiver<String>,
101
        exit_rx: watch::Receiver<()>,
102
    ) {
UNCOV
103
        let terminal = Arc::new(Mutex::new(terminal));
×
UNCOV
104
        let (disconnect_tx, disconnect_rx) = watch::channel(());
×
105

UNCOV
106
        let session_handle = spawn(Self::process_session_events(
×
107
            id,
UNCOV
108
            room.clone(),
×
UNCOV
109
            auth,
×
UNCOV
110
            terminal.clone(),
×
UNCOV
111
            event_rx,
×
UNCOV
112
            disconnect_tx,
×
113
        ));
114

115
        let room_handle = spawn(Self::process_room_events(
×
116
            id,
117
            room.clone(),
×
UNCOV
118
            terminal,
×
119
            message_rx,
×
120
            exit_rx,
×
121
            disconnect_rx,
×
122
        ));
123

UNCOV
124
        let _ = session_handle.await;
×
UNCOV
125
        let _ = room_handle.await;
×
126

UNCOV
127
        trace!("Fell through the session tasks, indicating disconnection on session. Threads are closed");
×
128
    }
129

130
    async fn process_session_events(
×
131
        id: SessionId,
132
        room: Arc<Mutex<ChatRoom>>,
133
        auth: Arc<Mutex<Auth>>,
134
        terminal: Arc<Mutex<Terminal>>,
135
        mut event_rx: Receiver<SessionEvent>,
136
        disconnect_tx: watch::Sender<()>,
137
    ) {
138
        info!("Session events processing task for id={id} is started");
×
139

UNCOV
140
        while let Some(event) = event_rx.recv().await {
×
141
            match event {
×
UNCOV
142
                SessionEvent::Data(data) => {
×
UNCOV
143
                    let mut room = room.lock().await;
×
UNCOV
144
                    let mut auth = auth.lock().await;
×
UNCOV
145
                    let mut term = terminal.lock().await;
×
146

UNCOV
147
                    let user = room.find_member_by_id(id).user.clone();
×
UNCOV
148
                    let mut ctx = WorkflowContext::new(user);
×
149

UNCOV
150
                    let codes = keyboard_decoder::decode_bytes_to_codes(&data);
×
151
                    for code in codes {
×
152
                        match code {
×
153
                            KeyCode::Tab => {
154
                                let mut autocomplete = Autocomplete::default();
×
155
                                if let Err(err) = autocomplete
×
156
                                    .execute(&mut ctx, &mut term, &mut room, &mut auth)
×
UNCOV
157
                                    .await
×
158
                                {
159
                                    error!(
×
160
                                        "Failed to execute autocomplete workflow for user {}: {}",
161
                                        id, err
162
                                    );
163
                                }
164
                            }
165
                            KeyCode::Enter => {
166
                                let command_executor = CommandExecutor::default();
×
167
                                let command_parser = CommandParser::new(command_executor);
×
168
                                let input_validator = InputValidator::new(command_parser);
×
UNCOV
169
                                let mut rate_checker = InputRateChecker::new(input_validator);
×
170
                                if let Err(err) = rate_checker
×
UNCOV
171
                                    .execute(&mut ctx, &mut term, &mut room, &mut auth)
×
UNCOV
172
                                    .await
×
173
                                {
UNCOV
174
                                    error!(
×
175
                                        "Failed to execute command workflow for user {}: {}",
176
                                        id, err
177
                                    );
178
                                }
179
                            }
180
                            _ => {
181
                                let mut key_mapper = TerminalKeyMapper::new(code);
×
182
                                if let Err(err) = key_mapper
×
183
                                    .execute(&mut ctx, &mut term, &mut room, &mut auth)
×
UNCOV
184
                                    .await
×
185
                                {
UNCOV
186
                                    error!(
×
187
                                        "Failed to execute terminal workflow for user {}: {}",
188
                                        id, err
189
                                    );
190
                                }
191
                            }
192
                        }
193
                    }
194
                }
195
                SessionEvent::Env(name, value) => {
×
UNCOV
196
                    let mut room = room.lock().await;
×
197
                    let mut auth = auth.lock().await;
×
UNCOV
198
                    let mut term = terminal.lock().await;
×
199

UNCOV
200
                    let user = room.find_member_by_id(id).user.clone();
×
UNCOV
201
                    let mut ctx = WorkflowContext::new(user);
×
202

UNCOV
203
                    let command_executor = CommandExecutor::default();
×
UNCOV
204
                    let command_parser = CommandParser::new(command_executor);
×
UNCOV
205
                    let mut env_parser = EnvParser::new(name, value, command_parser);
×
206
                    if let Err(err) = env_parser
×
207
                        .execute(&mut ctx, &mut term, &mut room, &mut auth)
×
208
                        .await
×
209
                    {
UNCOV
210
                        error!("Failed to execute env workflow for user {}: {}", id, err);
×
211
                    }
212
                }
213
                SessionEvent::Disconnect => {
214
                    let _ = disconnect_tx.send(());
×
215
                    info!("Session events processing task for id={id} is finished");
×
216
                    return;
217
                }
218
                SessionEvent::WindowResize(width, height) => {
×
219
                    let mut terminal = terminal.lock().await;
×
UNCOV
220
                    terminal.set_size(width, height);
×
221
                }
222
            }
223
        }
224
    }
225

226
    async fn process_room_events(
×
227
        id: SessionId,
228
        room: Arc<Mutex<ChatRoom>>,
229
        terminal: Arc<Mutex<Terminal>>,
230
        mut message_rx: Receiver<String>,
231
        mut exit_rx: watch::Receiver<()>,
232
        mut disconnect_rx: watch::Receiver<()>,
233
    ) {
UNCOV
234
        info!("Render task for id={id} is started");
×
235

UNCOV
236
        tokio::select! {
×
237
            _ = exit_rx.changed() => {
238
                terminal.lock().await.exit();
239
                if let Err(err) = room.lock().await.leave(&id).await {
240
                    error!("Failed to exit the server by user {}: {}", id, err);
241
                }
242
                info!("Render task for id={id} aborted because session is closed by a user");
243
                return;
244
            }
245
            _ = disconnect_rx.changed() => {
246
                if let Err(err) = room.lock().await.leave(&id).await {
247
                    error!("Failed to disconnect user {} from the server: {}", id, err);
248
                }
249
                info!("Render task for id={id} aborted because session is disconnected");
250
                return;
251
            }
252
            _ = async {
253
                while let Some(msg) = message_rx.recv().await {
254
                    let _ = terminal.lock().await.print_message(&msg);
255
                }
256
            } => {
257
                // Warning: This situation is uncommon and should not occur under normal circumstances.
258
                warn!("Render task for id={id} finished its work");
259
            }
260
        }
261
    }
262
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc