xref: /dpdk/dts/framework/remote_session/ssh_session.py (revision 02d36ef6a9528e0f4a3403956e66bcea5fadbf8c)
1# SPDX-License-Identifier: BSD-3-Clause
2# Copyright(c) 2010-2014 Intel Corporation
3# Copyright(c) 2022 PANTHEON.tech s.r.o.
4# Copyright(c) 2022 University of New Hampshire
5
6import time
7
8from pexpect import pxssh  # type: ignore
9
10from framework.config import NodeConfiguration
11from framework.exception import SSHConnectionError, SSHSessionDeadError, SSHTimeoutError
12from framework.logger import DTSLOG
13from framework.utils import GREEN, RED
14
15from .remote_session import RemoteSession
16
17
class SSHSession(RemoteSession):
    """
    Pexpect (pxssh) based SSH session to a node.

    Commands are written to an interactive shell and their output is collected
    by waiting for a prompt pattern on the underlying pxssh session.
    """

    # The pxssh connection; a fresh object is created for every login attempt.
    session: pxssh.pxssh
    # Prompt pattern installed by _clean_session() while draining stale output.
    # NOTE(review): presumably chosen because it never shows up in real
    # output, so the short wait simply times out - confirm.
    magic_prompt: str

    def __init__(
        self,
        node_config: NodeConfiguration,
        session_name: str,
        logger: DTSLOG,
    ):
        """
        Set the session-specific attributes and delegate to the base class.

        magic_prompt is assigned before the base initializer runs.
        NOTE(review): presumably RemoteSession.__init__ triggers _connect(),
        which already needs it - confirm against the base class.
        """
        self.magic_prompt = "MAGIC PROMPT"
        super().__init__(node_config, session_name, logger)

    def _connect(self) -> None:
        """
        Create connection to assigned node.

        Up to 10 login attempts are made, with a 2 second pause after each
        failed one. After a successful login, terminal echo is turned off and
        the terminal width is set to 1000 columns.

        Raises:
            SSHConnectionError: If every login attempt fails or the terminal
                setup commands fail. The underlying error is chained as the
                cause.
        """
        retry_attempts = 10
        login_timeout = 20 if self.port else 10
        # A single global (?i) at the very start of the pattern. Python 3.11+
        # raises re.error for a global inline flag that is not at the start;
        # on older versions a mid-pattern (?i) applied to the whole pattern
        # anyway, so the matched text is unchanged.
        password_regex = (
            r"(?i)(?:password:)|(?:passphrase for key)|(password for .+:)"
        )
        try:
            for retry_attempt in range(retry_attempts):
                self.session = pxssh.pxssh(encoding="utf-8")
                try:
                    self.session.login(
                        self.ip,
                        self.username,
                        self.password,
                        original_prompt="[$#>]",
                        port=self.port,
                        login_timeout=login_timeout,
                        password_regex=password_regex,
                    )
                    break
                except Exception as e:
                    self.logger.warning(e)
                    time.sleep(2)
                    self.logger.info(
                        f"Retrying connection: retry number {retry_attempt + 1}."
                    )
            else:
                # The loop ran out of attempts without reaching `break`.
                raise Exception(f"Connection to {self.hostname} failed")

            self.send_expect("stty -echo", "#")
            self.send_expect("stty columns 1000", "#")
        except Exception as e:
            self.logger.error(RED(str(e)))
            if self.port:
                suggestion = (
                    f"\nSuggestion: Check if the firewall on {self.hostname} is "
                    f"stopped.\n"
                )
                self.logger.info(GREEN(suggestion))

            raise SSHConnectionError(self.hostname) from e

    def send_expect(
        self, command: str, prompt: str, timeout: float = 15, verify: bool = False
    ) -> str | int:
        """
        Send a command and wait for the given prompt.

        Args:
            command: The command line to send.
            prompt: The prompt pattern that marks the end of the output.
            timeout: Seconds to wait for the prompt.
            verify: When True, "echo $?" is sent afterwards to read the exit
                status of the command.

        Returns:
            The command output. When verify is True and the exit status is a
            non-zero integer, that integer is returned instead.

        Raises:
            Exception: Whatever the underlying send/expect raised, re-raised
                after being logged.
        """
        try:
            ret = self.send_expect_base(command, prompt, timeout)
            if not verify:
                return ret

            ret_status = self.send_expect_base("echo $?", prompt, timeout)
            try:
                retval = int(ret_status)
            except ValueError:
                # The status output is not a number; fall back to the output.
                return ret

            if retval:
                self.logger.error(f"Command: {command} failure!")
                self.logger.error(ret)
                return retval
            return ret
        except Exception:
            self.logger.error(
                f"Exception happened in [{command}] and output is "
                f"[{self._get_output()}]"
            )
            raise

    def send_expect_base(self, command: str, prompt: str, timeout: float) -> str:
        """
        Send a command, wait for the given prompt and return the output.

        The session prompt is switched to the given one only for the duration
        of this call; the previous one is restored even when sending or
        waiting fails.

        Raises:
            SSHSessionDeadError: If the session is not alive.
            SSHTimeoutError: If the prompt does not appear within timeout.
        """
        self._clean_session()
        original_prompt = self.session.PROMPT
        self.session.PROMPT = prompt
        try:
            self._send_line(command)
            self._prompt(command, timeout)
            return self._get_output()
        finally:
            # Do not leave the caller's temporary prompt installed on failure.
            self.session.PROMPT = original_prompt

    def _clean_session(self) -> None:
        """
        Drain and discard pending output, then reset the session prompt.

        The prompt is set to magic_prompt for a 0.01 second read whose result
        is thrown away, and is then set to the session's UNIQUE_PROMPT.
        """
        self.session.PROMPT = self.magic_prompt
        self.get_output(timeout=0.01)
        self.session.PROMPT = self.session.UNIQUE_PROMPT

    def _send_line(self, command: str) -> None:
        """
        Send the command to the session.

        A two-character command starting with "^" (such as "^C") is sent as
        the corresponding control character; anything else is sent as a line.

        Raises:
            SSHSessionDeadError: If the session is not alive.
        """
        if not self.is_alive():
            raise SSHSessionDeadError(self.hostname)
        if len(command) == 2 and command.startswith("^"):
            self.session.sendcontrol(command[1])
        else:
            self.session.sendline(command)

    def _prompt(self, command: str, timeout: float) -> None:
        """
        Wait for the current session prompt.

        Raises:
            SSHTimeoutError: If the prompt is not seen within timeout; carries
                the command and the output collected so far.
        """
        if not self.session.prompt(timeout):
            raise SSHTimeoutError(command, self._get_output()) from None

    def get_output(self, timeout: float = 15) -> str:
        """
        Get all output before timeout

        Waits up to timeout seconds for the current prompt, returns what was
        collected and clears the session buffers.
        """
        try:
            self.session.prompt(timeout)
        except Exception:
            # Best effort: whatever has been collected so far is still returned.
            pass

        before = self._get_output()
        self._flush()

        return before

    def _get_output(self) -> str:
        """
        Return the text collected before the last prompt match.

        Everything after the last "\\r\\n" is dropped. If what remains is
        exactly "[PEXPECT]", an empty string is returned.
        NOTE(review): "[PEXPECT]" looks like a leftover of the pxssh unique
        prompt - confirm.

        Raises:
            SSHSessionDeadError: If the session is not alive.
        """
        if not self.is_alive():
            raise SSHSessionDeadError(self.hostname)
        before = self.session.before.rsplit("\r\n", 1)[0]
        if before == "[PEXPECT]":
            return ""
        return before

    def _flush(self) -> None:
        """
        Clear all session buffer
        """
        self.session.buffer = ""
        self.session.before = ""

    def is_alive(self) -> bool:
        """
        Report whether the underlying pxssh session is alive.
        """
        return self.session.isalive()

    def _send_command(self, command: str, timeout: float) -> str:
        """
        Send a command and return the output collected within timeout.

        Afterwards the prompt is reset to the session's UNIQUE_PROMPT and
        waited for briefly (0.1 seconds).

        Raises:
            SSHSessionDeadError: If the session is not alive.
        """
        self._clean_session()
        self._send_line(command)

        output = self.get_output(timeout=timeout)
        self.session.PROMPT = self.session.UNIQUE_PROMPT
        self.session.prompt(0.1)

        return output

    def _close(self, force: bool = False) -> None:
        """
        Close the session.

        Args:
            force: When True, close the connection right away. Otherwise log
                out, and only if the session is still alive.
        """
        if force:
            self.session.close()
        elif self.is_alive():
            self.session.logout()
185