blob: 2a462acb10dfa60ed1c3f4bc2a804ddf3c7109ae [file] [log] [blame]
#!/usr/bin/env python3
import logging
import socket
import telnetlib
import time
from collections import deque
class TelnetRemoteclient:
r"""
Class to create telnet connection to remote host for command execution.
"""
def __init__(
self, hostname, username, password, port=23, read_timeout=None
):
r"""
Initialize the TelnetRemoteClient object with the provided remote host
details.
This method initializes a TelnetRemoteClient object with the given
attributes, which represent the details of the remote (targeting) host.
The attributes include the hostname, username, password, and Telnet
port.
The method also accepts an optional read_timeout parameter, which
specifies a new read timeout value to override the default one.
Parameters:
hostname (str): Name or IP address of the remote
host.
username (str): User on the remote host with access
to files.
password (str): Password for the user on the remote
host.
port (int, optional): Telnet port value. Defaults to 23.
read_timeout (int, optional): New read timeout value to override
the default one. Defaults to None.
Returns:
None
"""
self.hostname = hostname
self.username = username
self.password = password
self.tnclient = None
self.port = port
self.read_timeout = read_timeout
def tn_remoteclient_login(self):
r"""
Establish a Telnet connection to the remote host and log in.
This method establishes a Telnet connection to the remote host using
the provided hostname, username, and password. If the connection and
login are successful, the method returns True. Otherwise, it returns
False.
Parameters:
None
Returns:
bool: True if the Telnet connection and login are successful,
False otherwise.
"""
is_telnet = True
try:
self.tnclient = telnetlib.Telnet(
self.hostname, self.port, timeout=15
)
if b"login:" in self.tnclient.read_until(
b"login:", timeout=self.read_timeout
):
self.tnclient.write(self.username.encode("utf-8") + b"\n")
if b"Password:" in self.tnclient.read_until(
b"Password:", timeout=self.read_timeout
):
self.tnclient.write(self.password.encode("utf-8") + b"\n")
n, match, pre_match = self.tnclient.expect(
[
b"Login incorrect",
b"invalid login name or password.",
rb"\#",
rb"\$",
],
timeout=self.read_timeout,
)
if n == 0 or n == 1:
logging.error(
"\n\tERROR: Telnet Authentication Failed. Check"
" userid and password.\n\n"
)
is_telnet = False
else:
# login successful
self.fifo = deque()
else:
# Anything else, telnet server is not running
logging.error("\n\tERROR: Telnet Connection Refused.\n\n")
is_telnet = False
else:
is_telnet = False
except Exception:
# Any kind of exception, skip telnet protocol
is_telnet = False
return is_telnet
def __del__(self):
r"""
Disconnect from the remote host when the object is deleted.
This method disconnects from the remote host when the
TelnetRemoteClient object is deleted.
"""
self.tn_remoteclient_disconnect()
def tn_remoteclient_disconnect(self):
r"""
Disconnect from the remote host using the Telnet client.
This method disconnects from the remote host using the Telnet client
established during the FFDC collection process.
The method attempts to close the Telnet connection. If the Telnet
client does not exist, the method ignores the exception.
"""
try:
self.tnclient.close()
except Exception:
# the telnet object might not exist yet, so ignore this one
pass
def execute_command(self, cmd, i_timeout=120):
r"""
Executes a command on the remote host using Telnet and returns the
output.
This method executes a provided command on the remote host using
Telnet. The method takes the cmd argument, which is expected to be a
valid command to execute, and an optional i_timeout parameter, which
specifies the timeout for the command output.
The method returns the output of the executed command as a string.
Parameters:
cmd (str): The command to be executed on the
remote host.
i_timeout (int, optional): The timeout for the command output.
Defaults to 120 seconds.
Returns:
str: The output of the executed command as a string.
"""
# Wait time for command execution before reading the output.
# Use user input wait time for command execution if one exists.
# Else use the default 120 sec,
if i_timeout != 120:
execution_time = i_timeout
else:
execution_time = 120
# Execute the command and read the command output.
return_buffer = b""
try:
# Do at least one non-blocking read.
# to flush whatever data is in the read buffer.
while self.tnclient.read_very_eager():
continue
# Execute the command
self.tnclient.write(cmd.encode("utf-8") + b"\n")
time.sleep(execution_time)
local_buffer = b""
# Read the command output one block at a time.
return_buffer = self.tnclient.read_very_eager()
while return_buffer:
local_buffer = b"".join([local_buffer, return_buffer])
time.sleep(3) # let the buffer fill up a bit
return_buffer = self.tnclient.read_very_eager()
except (socket.error, EOFError) as e:
self.tn_remoteclient_disconnect()
if str(e).__contains__("Connection reset by peer"):
msg = e
elif str(e).__contains__("telnet connection closed"):
msg = "Telnet connection closed."
else:
msg = "Some other issue.%s %s %s\n\n" % (cmd, e.__class__, e)
logging.error("\t\t ERROR %s " % msg)
# Return ASCII string data with ending PROMPT stripped
return local_buffer.decode("ascii", "ignore").replace("$ ", "\n")