• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Red-M / RedSSH / 52347

pending completion
52347

push

coveralls-python

Red_M
Fix bug with pty argument to open a single use exec command channel. Add test for `redssh.RedSSH().execute_command()`. Version bump.

9 of 9 new or added lines in 4 files covered. (100.0%)

323 of 1605 relevant lines covered (20.12%)

0.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

18.0
/redssh/clients/libssh2/redsshlibssh2.py
1
# RedSSH
2
# Copyright (C) 2018 - 2022 Red_M ( http://bitbucket.com/Red_M )
3

4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation; either version 2 of the License, or
7
# (at your option) any later version.
8

9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU General Public License for more details.
13

14
# You should have received a copy of the GNU General Public License along
15
# with this program; if not, write to the Free Software Foundation, Inc.,
16
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17

18

19
import os
1✔
20
import re
1✔
21
import time
1✔
22
import hashlib
1✔
23
import threading
1✔
24
import multiprocessing
1✔
25
import socket
1✔
26
import select
1✔
27
import ssh2
1✔
28

29
from redssh.clients.base_client import BaseClient, BaseClientModules
1✔
30
from redssh.clients.libssh2 import libssh2
1✔
31
from redssh.clients.libssh2 import enums as client_enums
1✔
32
from redssh import exceptions
1✔
33
from redssh import enums
1✔
34
from redssh.clients.libssh2 import sftp
1✔
35
from redssh.clients.libssh2 import scp
1✔
36
from redssh.clients.libssh2 import tunneling
1✔
37
from redssh.clients.libssh2 import x11
1✔
38

39
class LibSSH2Modules(BaseClientModules):
1✔
40
    client_enums = client_enums
1✔
41
    scp = scp
1✔
42
    sftp = sftp
1✔
43
    tunneling = tunneling
1✔
44
    x11 = x11
1✔
45

46
class LibSSH2(BaseClient):
1✔
47
    '''
48
    Instances the start of an SSH connection.
49
    Extra options are available after :func:`redssh.RedSSH.connect` is called.
50

51
    :param encoding: Set the encoding to something other than the default of ``'utf8'`` when your target SSH server doesn't return UTF-8.
52
    :type encoding: ``str``
53
    :param terminal: Set the terminal sent to the remote server to something other than the default of ``'vt100'``.
54
    :type terminal: ``str``
55
    :param ssh_host_key_verification: Change the behaviour of remote host key verification. Can be set to one of the following values, ``strict``, ``warn``, ``auto_add`` or ``none``.
56
    :type ssh_host_key_verification: :class:`redssh.enums.SSHHostKeyVerify`
57
    :param ssh_keepalive_interval: Enable or disable SSH keepalive packets, value is interval in seconds, ``0`` is off.
58
    :type ssh_keepalive_interval: ``float``
59
    :param set_flags: Not supported in ssh2-python 0.18.0
60
    :type set_flags: ``dict``
61
    :param method_preferences: Not supported in ssh2-python 0.18.0
62
    :type method_preferences: ``dict``
63
    :param callbacks: Not supported yet
64
    :type callbacks: ``dict``
65
    :param auto_terminate_tunnels: Automatically terminate tunnels when errors are detected
66
    :type auto_terminate_tunnels: ``bool``
67
    :param tcp_nodelay: Set `TCP_NODELAY` for the underlying :func:`socket.socket`, by default this is off via `False`.
68
    :type tcp_nodelay: ``bool``
69
    '''
70
    def __init__(self,*args,set_flags={},method_preferences={},callbacks={},**kwargs):
1✔
71
        super().__init__(*args,**kwargs)
×
72

73
        self.set_flags = set_flags
×
74
        self.method_preferences = method_preferences
×
75
        self.callbacks = callbacks
×
76
        self._ssh_keepalive_thread = None
×
77
        self._ssh_keepalive_event = None
×
78
        self._modules = LibSSH2Modules
×
79
        self.enums = self._modules.client_enums
×
80

81
    def ssh_keepalive(self):
1✔
82
        timeout = 0.01
×
83
        while self.__check_for_attr__('channel')==False:
×
84
            time.sleep(timeout)
×
85
        while self._ssh_keepalive_event.is_set()==False and self.__check_for_attr__('channel')==True:
×
86
            timeout = self._block(self.session.keepalive_send,_select_timeout=self._select_timeout)
×
87
            self._ssh_keepalive_event.wait(timeout=timeout)
×
88

89
    def _block(self,func,*args,**kwargs):
1✔
90
        if self.__shutdown_all__.is_set()==False:
×
91
            default_str = 'sdkljfhklsdjf'
×
92
            _select_timeout = kwargs.get('_select_timeout',default_str)
×
93
            if _select_timeout==default_str:
×
94
                _select_timeout = None
×
95
            else:
96
                _select_timeout = float(_select_timeout)
×
97
                del kwargs['_select_timeout']
×
98
            out = libssh2.LIBSSH2_ERROR_EAGAIN
×
99
            while out==libssh2.LIBSSH2_ERROR_EAGAIN:
×
100
                self._block_select(_select_timeout)
×
101
                with self.session._block_lock:
×
102
                    out = func(*args,**kwargs)
×
103
            return(out)
×
104

105
    def _block_write(self,func,data,_select_timeout=None):
1✔
106
        data_len = len(data)
×
107
        total_written = 0
×
108
        while total_written<data_len:
×
109
            if self.__shutdown_all__.is_set()==False:
×
110
                self._block_select(_select_timeout)
×
111
                with self.session._block_lock:
×
112
                    (rc,bytes_written) = func(data[total_written:])
×
113
                total_written+=bytes_written
×
114
        return(total_written)
×
115

116
    def _read_iter(self,func,block=False,max_read=-1,_select_timeout=None):
1✔
117
        pos = 0
×
118
        remainder_len = 0
×
119
        remainder = b''
×
120
        if self.__shutdown_all__.is_set()==False:
×
121
            self._block_select(_select_timeout)
×
122
            with self.session._block_lock:
×
123
                (size,data) = func()
×
124
            while size==libssh2.LIBSSH2_ERROR_EAGAIN or size>0:
×
125
                if size==libssh2.LIBSSH2_ERROR_EAGAIN:
×
126
                    self._block_select(_select_timeout)
×
127
                    if self.__shutdown_all__.is_set()==False:
×
128
                        with self.session._block_lock:
×
129
                            (size,data) = func()
×
130
                # if timeout is not None and size==libssh2.LIBSSH2_ERROR_EAGAIN:
131
                if size==libssh2.LIBSSH2_ERROR_EAGAIN and (block==False or (max_read>size and max_read!=-1)):
×
132
                    return(b'')
×
133
                while size>0:
×
134
                    while pos<size:
×
135
                        if remainder_len>0:
×
136
                            yield(remainder+data[pos:size])
×
137
                            remainder = b''
×
138
                            remainder_len = 0
×
139
                        else:
140
                            yield(data[pos:size])
×
141
                        pos = size
×
142
                    self._block_select(_select_timeout)
×
143
                    with self.session._block_lock:
×
144
                        (size,data) = func()
×
145
                    pos = 0
×
146
            if remainder_len>0:
×
147
                yield(remainder)
×
148

149
    def _auth_attempt(self,func,*args,**kwargs):
1✔
150
        try:
×
151
            func(*args,**kwargs)
×
152
        except:
×
153
            pass
×
154
        return(self.session.userauth_authenticated())
×
155

156
    def _auth(self,username,password,allow_agent,host_based,key_filepath,passphrase,look_for_keys):
1✔
157
        auth_supported = self.session.userauth_list(username)
×
158
        if isinstance(auth_supported,type([])):
×
159
            auth_types_tried = []
×
160

161
            if 'publickey' in auth_supported:
×
162
                if allow_agent==True:
×
163
                    auth_types_tried.append('publickey')
×
164
                    if self._auth_attempt(self.session.agent_auth,username)==True:
×
165
                        return()
×
166
                if not key_filepath==None:
×
167
                    if isinstance(key_filepath,type(''))==True:
×
168
                        key_filepath = [key_filepath]
×
169
                    if isinstance(key_filepath,type([]))==True:
×
170
                        if passphrase==None:
×
171
                            passphrase = ''
×
172
                        for private_key in key_filepath:
×
173
                            if os.path.exists(private_key) and os.path.isfile(private_key):
×
174
                                auth_types_tried.append('publickey')
×
175
                                if self._auth_attempt(self.session.userauth_publickey_fromfile,username,private_key,passphrase)==True:
×
176
                                    return()
×
177
                # elif host_based==True:
178
                    # auth_types_tried.append('hostbased')
179
                    # if res==self._auth_attempt(self.session.userauth_hostbased_fromfile,username,private_key,hostname,passphrase=passphrase):
180
                        # return()
181
            if not password==None:
×
182
                if 'password' in auth_supported:
×
183
                    auth_types_tried.append('password')
×
184
                    if self._auth_attempt(self.session.userauth_password,username,password)==True:
×
185
                        return()
×
186
                if 'keyboard-interactive' in auth_supported:
×
187
                    auth_types_tried.append('keyboard-interactive')
×
188
                    if self._auth_attempt(self.session.userauth_keyboardinteractive,username,password)==True:
×
189
                        return()
×
190

191
        if self.session.userauth_authenticated()==False:
×
192
            raise(exceptions.AuthenticationFailedException(list(set(auth_types_tried))))
×
193

194
    def eof(self):
1✔
195
        '''
196
        Returns ``True`` or ``False`` when the main channel has recieved an ``EOF``.
197
        '''
198
        if self.__check_for_attr__('channel')==True:
×
199
            return(self._block(self.channel.eof))
×
200

201
    def methods(self, method):
1✔
202
        '''
203
        Returns what value was settled on during session negotiation.
204
        '''
205
        if self.__check_for_attr__('session')==True:
×
206
            return(self._block(self.session.methods,method))
×
207

208
    def setenv(self, varname, value):
1✔
209
        '''
210
        Set an environment variable on the main channel.
211

212
        :param varname: Name of environment variable to set on the remote channel.
213
        :type varname: ``str``
214
        :param value: Value to set ``varname`` to.
215
        :type value: ``str``
216
        :return: ``None``
217
        '''
218
        if self.past_login==True:
×
219
            self._block(self.channel.setenv,varname,value)
×
220

221
    def open_channel(self,shell=True,pty=False):
1✔
222
        channel = self._block(self.session.open_session)
×
223
        if self.request_pty==True and pty==True:
×
224
            self._block(channel.pty,self.terminal)
×
225
        if shell==True:
×
226
            self._block(channel.shell)
×
227
        return(channel)
×
228

229
    def check_host_key(self,hostname,port):
1✔
230
        if self.ssh_host_key_verification==enums.SSHHostKeyVerify.none:
×
231
            return(None)
×
232

233
        self.known_hosts = self.session.knownhost_init()
×
234
        if os.path.exists(self.known_hosts_path)==True:
×
235
            self.known_hosts.readfile(self.known_hosts_path)
×
236
        (host_key,host_key_type) = self.session.hostkey()
×
237

238
        if isinstance(hostname,type('')):
×
239
            hostname = hostname.encode(self.encoding)
×
240
        if host_key_type==libssh2.LIBSSH2_HOSTKEY_TYPE_RSA:
×
241
            server_key_type = libssh2.LIBSSH2_KNOWNHOST_KEY_SSHRSA
×
242
        else:
243
            server_key_type = libssh2.LIBSSH2_KNOWNHOST_KEY_SSHDSS
×
244
        key_bitmask = libssh2.LIBSSH2_KNOWNHOST_TYPE_PLAIN|libssh2.LIBSSH2_KNOWNHOST_KEYENC_RAW|server_key_type
×
245

246

247
        if self.ssh_host_key_verification==enums.SSHHostKeyVerify.strict:
×
248
            self.known_hosts.checkp(hostname,port,host_key,key_bitmask)
×
249

250
        if self.ssh_host_key_verification==enums.SSHHostKeyVerify.warn:
×
251
            try:
×
252
                self.known_hosts.checkp(hostname,port,host_key,key_bitmask)
×
253
            except Exception as e:
×
254
                print('WARN: '+str(e))
×
255

256
        if self.ssh_host_key_verification in [enums.SSHHostKeyVerify.auto_add,enums.SSHHostKeyVerify.warn_auto_add]:
×
257
            try:
×
258
                self.known_hosts.checkp(hostname,port,host_key,key_bitmask)
×
259
                return(None)
×
260
            except Exception as e:
×
261
                if self.ssh_host_key_verification==enums.SSHHostKeyVerify.warn_auto_add:
×
262
                    print('WARN: '+str(e))
×
263
            self.known_hosts.addc(hostname,host_key,key_bitmask)
×
264
            self.known_hosts.writefile(self.known_hosts_path)
×
265

266
    def connect(self,hostname,port=22,username='',password=None,
1✔
267
        allow_agent=False,host_based=None,key_filepath=None,passphrase=None,
268
        look_for_keys=False,sock=None,timeout=None):
269
        '''
270
        .. warning::
271
            Some authentication methods are not yet supported!
272

273
        :param hostname: Hostname to connect to.
274
        :type hostname: ``str``
275
        :param port: SSH port to connect to.
276
        :type port: ``int``
277
        :param username: Username to connect as to the remote server.
278
        :type username: ``str``
279
        :param password: Password to offer to the remote server for authentication.
280
        :type password: ``str``
281
        :param allow_agent: Allow the local SSH key agent to offer the keys held in it for authentication.
282
        :type allow_agent: ``bool``
283
        :param host_based: Allow the local SSH host keys to be used for authentication. NOT IMPLEMENTED!
284
        :type host_based: ``bool``
285
        :param key_filepath: Array of filenames to offer to the remote server. Can be a string for a single key.
286
        :type key_filepath: ``array``/``str``
287
        :param passphrase: Passphrase to decrypt any keys offered to the remote server.
288
        :type passphrase: ``str``
289
        :param look_for_keys: Enable offering keys in ``~/.ssh`` automatically. NOT IMPLEMENTED!
290
        :type look_for_keys: ``bool``
291
        :param sock: A pre-connected socket to the remote server. Useful if you have strange network requirements.
292
        :type sock: :func:`socket.socket`
293
        :param timeout: Timeout for the socket connection to the remote server.
294
        :type timeout: ``float``
295
        '''
296
        if password==None and allow_agent==False and host_based==None and key_filepath==None and look_for_keys==False:
×
297
            raise(exceptions.NoAuthenticationOfferedException())
×
298
        if self.past_login==False:
×
299
            if sock==None:
×
300
                self.sock = socket.create_connection((hostname,port),timeout)
×
301
                self.sock.setsockopt(socket.SOL_SOCKET,socket.SO_KEEPALIVE,1)
×
302
                self.sock.setsockopt(socket.IPPROTO_TCP,socket.TCP_NODELAY,self.tcp_nodelay)
×
303
            else:
304
                self.sock = sock
×
305

306
            self.session = libssh2.Session()
×
307
            # self.session.publickey_init()
308

309
            if not self.set_flags=={}:
×
310
                for flag in self.set_flags:
×
311
                    self.session.flag(flag, self.set_flags[flag])
×
312

313
            if not self.method_preferences=={}:
×
314
                for pref in self.method_preferences:
×
315
                    self.session.method_pref(pref, self.method_preferences[pref])
×
316

317
            # if 'callback_set' in dir(self.session):
318
                # if not self.callbacks=={}:
319
                    # for cbtype in self.callbacks:
320
                        # self.session.callback_set(cbtype, self.callbacks[cbtype])
321

322
            self.session.handshake(self.sock)
×
323

324
            __initial = time.time()
×
325
            self.session.keepalive_send()
×
326
            new_select_timeout = float(time.time()-__initial)
×
327
            if new_select_timeout>self._select_timeout and self._auto_select_timeout_enabled==True:
×
328
                self._select_timeout = new_select_timeout
×
329

330
            self.check_host_key(hostname,port) # segfault on real ssh server????
×
331

332
            self._auth(username,password,allow_agent,host_based,key_filepath,passphrase,look_for_keys)
×
333

334
            self.session.set_blocking(False)
×
335
            if self.ssh_keepalive_interval>0:
×
336
                self.session.keepalive_config(True, self.ssh_keepalive_interval)
×
337
                self._ssh_keepalive_thread = threading.Thread(target=self.ssh_keepalive)
×
338
                self._ssh_keepalive_event = threading.Event()
×
339
                self._ssh_keepalive_thread.start()
×
340
            self.channel = self.open_channel(True,self.request_pty)
×
341

342
            # if 'callback_set' in dir(self.session):
343
                # self._forward_x11()
344

345
            self.past_login = True
×
346

347
    def read(self,block=False):
1✔
348
        '''
349
        Recieve data from the remote session.
350
        Only works if the current session has made it past the login process.
351

352
        :param block: Block until data is received from the remote server. ``True``
353
            will block until data is recieved and ``False`` may return ``b''`` if no data is available from the remote server.
354
        :type block: ``bool``
355
        :return: ``generator`` - A generator of byte strings that has been recieved in the time given.
356
        '''
357
        if self.past_login==True:
×
358
            if self.past_login==True:
×
359
                return(self._read_iter(self.channel.read,block))
×
360
        return([])
×
361

362
    def send(self,string):
1✔
363
        '''
364
        Send data to the remote session.
365
        Only works if the current session has made it past the login process.
366

367
        :param string: String to send to the remote session.
368
        :type string: ``str``
369
        :return: ``int`` - Amount of bytes sent to remote machine.
370
        '''
371
        if self.past_login==True:
×
372
            if self.past_login==True:
×
373
                return(self._block_write(self.channel.write,string))
×
374
        return(0)
×
375

376
    def flush(self):
1✔
377
        '''
378
        Flush all data on the primary channel's stdin to the remote connection.
379
        Only works if connected, otherwise returns ``0``.
380

381
        :return: ``int`` - Amount of bytes sent to remote machine.
382
        '''
383
        if self.past_login==True:
×
384
            if self.past_login==True:
×
385
                return(self._block(self.channel.flush))
×
386
        return(0)
×
387

388
    def last_error(self):
1✔
389
        '''
390
        Get the last error from the current session.
391

392
        :return: ``str``
393
        '''
394
        return(self._block(self.session.last_error))
×
395

396
    def execute_command(self,command,env=None,channel=None,pty=False):
1✔
397
        '''
398
        Run a command. This will block as the command executes.
399

400
        :param command: Command to execute.
401
        :type command: ``str``
402
        :param env: Environment variables to set during ``command``.
403
        :type env: ``dict``
404
        :param channel: Use an existing SSH channel instead of spawning a new one.
405
        :type channel: ``redssh.RedSSH.channel``
406
        :param pty: Request a pty for the command to be executed via.
407
        :type pty: ``bool``
408
        :return: ``tuple (int, str)`` - of ``(return_code, command_output)``
409
        '''
410
        if env==None:
×
411
            env = {}
×
412
        if len(env)>0:
×
413
            for key in env:
×
414
                self.setenv(key,env[key])
×
415
        out = b''
×
416
        if channel==None:
×
417
            channel = self.open_channel(True,pty)
×
418
        self._block(channel.execute,command)
×
419
        iter = self._read_iter(channel.read,True)
×
420
        for data in iter:
×
421
            out+=data
×
422
        self._block(channel.wait_eof)
×
423
        self._block(channel.close)
×
424
        ret = self._block(channel.get_exit_status)
×
425
        del channel
×
426
        return(ret,out)
×
427

428
    def start_sftp(self):
1✔
429
        '''
430
        Start the SFTP client.
431
        The client will be available at `self.sftp` and will be an instance of `redssh.sftp.RedSFTP`
432

433
        :return: ``None``
434
        '''
435
        if self.past_login and self.__check_for_attr__('sftp')==False:
×
436
            self.sftp = sftp.RedSFTP(self)
×
437

438
    def start_scp(self):
1✔
439
        '''
440
        Start the SCP client.
441

442
        Note that openssh has deprecated SCP, if this fails to start the SCP client it will transparently start the SFTP client as an alternative if allowed.
443

444
        :return: ``None``
445
        '''
446
        if self.past_login and self.__check_for_attr__('scp')==False:
×
447
            self.scp = scp.RedSCP(self)
×
448

449

450
    # def _forward_x11(self):
451
        # if libssh2.LIBSSH2_CALLBACK_X11 in self.callbacks:
452
            # self.x11_channels = []
453
            # disp = 0
454
            # thread_terminate = threading.Event()
455
            # self._block(self.channel.x11_req, disp)
456
            # forward_thread = threading.Thread(target=x11.forward,args=(self,thread_terminate))
457
            # forward_thread.daemon = True
458
            # forward_thread.name = enums.TunnelType.x11+':'+str(disp)
459
            # forward_thread.start()
460
            # self.tunnels[enums.TunnelType.x11][disp] = (forward_thread,thread_terminate,None,None)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc