# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2010-2014 Intel Corporation
# Copyright(c) 2022 PANTHEON.tech s.r.o.
# Copyright(c) 2022 University of New Hampshire

import time

from pexpect import pxssh  # type: ignore

from framework.config import NodeConfiguration
from framework.exception import SSHConnectionError, SSHSessionDeadError, SSHTimeoutError
from framework.logger import DTSLOG
from framework.utils import GREEN, RED

from .remote_session import RemoteSession


class SSHSession(RemoteSession):
    """
    Module for creating Pexpect SSH sessions to a node.

    Wraps a ``pxssh.pxssh`` object and offers helpers to send a command,
    wait for a prompt and collect the output printed before that prompt.
    """

    # The underlying pexpect SSH session, (re)created in _connect().
    session: pxssh.pxssh
    # Prompt installed temporarily by _clean_session() while draining
    # pending output.
    magic_prompt: str

    def __init__(
        self,
        node_config: NodeConfiguration,
        session_name: str,
        logger: DTSLOG,
    ):
        """
        Initialize the session state and delegate to the base class.
        """
        # Assigned before the base initializer runs.
        # NOTE(review): presumably RemoteSession.__init__ triggers _connect(),
        # which needs magic_prompt -- confirm against the base class.
        self.magic_prompt = "MAGIC PROMPT"
        super().__init__(node_config, session_name, logger)

    def _connect(self) -> None:
        """
        Create connection to assigned node.

        Login is attempted up to 10 times with a 2 second pause after each
        failure. After a successful login the terminal is configured with
        ``stty -echo`` and ``stty columns 1000``.

        Raises:
            SSHConnectionError: if every login attempt fails or the terminal
                setup commands fail.
        """
        retry_attempts = 10
        login_timeout = 20 if self.port else 10
        # A single leading (?i) makes the whole pattern case-insensitive.
        # A second global flag in the middle of the pattern is rejected by
        # Python 3.11+ ("global flags not at the start of the expression").
        password_regex = (
            r"(?i)(?:password:)|(?:passphrase for key)|(password for .+:)"
        )
        try:
            for retry_attempt in range(retry_attempts):
                self.session = pxssh.pxssh(encoding="utf-8")
                try:
                    self.session.login(
                        self.ip,
                        self.username,
                        self.password,
                        original_prompt="[$#>]",
                        port=self.port,
                        login_timeout=login_timeout,
                        password_regex=password_regex,
                    )
                    break
                except Exception as e:
                    self.logger.warning(e)
                    time.sleep(2)
                    self.logger.info(
                        f"Retrying connection: retry number {retry_attempt + 1}."
                    )
            else:
                # The loop finished without a successful login (no break).
                raise Exception(f"Connection to {self.hostname} failed")

            self.send_expect("stty -echo", "#")
            self.send_expect("stty columns 1000", "#")
        except Exception as e:
            self.logger.error(RED(str(e)))
            if getattr(self, "port", None):
                suggestion = (
                    f"\nSuggestion: Check if the firewall on {self.hostname} is "
                    f"stopped.\n"
                )
                self.logger.info(GREEN(suggestion))

            # Chain the original failure so the root cause stays in the
            # traceback.
            raise SSHConnectionError(self.hostname) from e

    def send_expect(
        self, command: str, prompt: str, timeout: float = 15, verify: bool = False
    ) -> str | int:
        """
        Send a command and return the output printed before the prompt.

        Args:
            command: the command line to send.
            prompt: the prompt to wait for after sending the command.
            timeout: how long to wait for the prompt.
            verify: when True, ``echo $?`` is sent afterwards to read the
                exit status of the command.

        Returns:
            The command output. When ``verify`` is True and the exit status
            parses as a non-zero integer, that integer is returned instead.

        Raises:
            Exception: whatever the underlying send/wait raised; it is logged
                together with the current session output and re-raised.
        """
        try:
            output = self.send_expect_base(command, prompt, timeout)
            if not verify:
                return output

            status_text = self.send_expect_base("echo $?", prompt, timeout)
            try:
                status = int(status_text)
            except ValueError:
                # The status could not be parsed; fall back to the output.
                return output

            if status:
                self.logger.error(f"Command: {command} failure!")
                self.logger.error(output)
                return status
            return output
        except Exception:
            self.logger.error(
                f"Exception happened in [{command}] and output is "
                f"[{self._get_output()}]"
            )
            raise

    def send_expect_base(self, command: str, prompt: str, timeout: float) -> str:
        """
        Send a command, wait for ``prompt`` and return the preceding output.

        The session prompt is switched to ``prompt`` for the duration of the
        call and restored afterwards, including when waiting for the prompt
        fails.

        Raises:
            SSHSessionDeadError: if the session is not alive.
            SSHTimeoutError: if the prompt is not seen within ``timeout``.
        """
        self._clean_session()
        original_prompt = self.session.PROMPT
        self.session.PROMPT = prompt
        try:
            self._send_line(command)
            self._prompt(command, timeout)
            return self._get_output()
        finally:
            # Do not leave the caller-specific prompt installed on failure.
            self.session.PROMPT = original_prompt

    def _clean_session(self) -> None:
        """
        Discard pending session output and reset the prompt.

        The prompt is temporarily set to ``magic_prompt`` (presumably a
        string that never shows up in real output -- confirm) and output is
        read with a very short timeout, then the prompt is set back to the
        session's ``UNIQUE_PROMPT``.
        """
        self.session.PROMPT = self.magic_prompt
        self.get_output(timeout=0.01)
        self.session.PROMPT = self.session.UNIQUE_PROMPT

    def _send_line(self, command: str) -> None:
        """
        Send ``command`` to the session.

        A two-character command starting with ``^`` (e.g. ``^C``) is sent as
        a control character; anything else is sent as a regular line.

        Raises:
            SSHSessionDeadError: if the session is not alive.
        """
        if not self.is_alive():
            raise SSHSessionDeadError(self.hostname)
        if len(command) == 2 and command.startswith("^"):
            self.session.sendcontrol(command[1])
        else:
            self.session.sendline(command)

    def _prompt(self, command: str, timeout: float) -> None:
        """
        Wait for the current session prompt.

        Raises:
            SSHTimeoutError: if the prompt is not matched within ``timeout``;
                carries the command and the output collected so far.
        """
        if not self.session.prompt(timeout):
            raise SSHTimeoutError(command, self._get_output()) from None

    def get_output(self, timeout: float = 15) -> str:
        """
        Get all output before timeout

        Waits up to ``timeout`` for the prompt, returns the output collected
        so far and clears the session buffers.
        """
        try:
            self.session.prompt(timeout)
        except Exception:
            # Deliberately best-effort: whatever was collected is returned
            # below regardless of how the wait ended.
            pass

        before = self._get_output()
        self._flush()

        return before

    def _get_output(self) -> str:
        """
        Return the text captured before the last prompt match.

        Everything after the last ``\\r\\n`` is dropped, and a bare
        ``[PEXPECT]`` remainder is reported as an empty string.

        Raises:
            SSHSessionDeadError: if the session is not alive.
        """
        if not self.is_alive():
            raise SSHSessionDeadError(self.hostname)
        before = self.session.before.rsplit("\r\n", 1)[0]
        if before == "[PEXPECT]":
            return ""
        return before

    def _flush(self) -> None:
        """
        Clear all session buffer
        """
        self.session.buffer = ""
        self.session.before = ""

    def is_alive(self) -> bool:
        """
        Report whether the underlying pexpect session is alive.
        """
        return self.session.isalive()

    def _send_command(self, command: str, timeout: float) -> str:
        """
        Send a command and return whatever is output within ``timeout``.

        Afterwards the prompt is reset to ``UNIQUE_PROMPT`` and a short
        prompt wait is performed.

        Raises:
            SSHSessionDeadError: if the session is not alive.
        """
        self._clean_session()
        self._send_line(command)

        output = self.get_output(timeout=timeout)
        self.session.PROMPT = self.session.UNIQUE_PROMPT
        self.session.prompt(0.1)

        return output

    def _close(self, force: bool = False) -> None:
        """
        Close the session.

        Args:
            force: when True the session is closed directly; otherwise a
                logout is performed, and only if the session is still alive.
        """
        if force is True:
            self.session.close()
        else:
            if self.is_alive():
                self.session.logout()