|  | #!/usr/bin/env python3 | 
|  |  | 
|  |  | 
|  | import logging | 
|  | import socket | 
|  | import telnetlib | 
|  | import time | 
|  | from collections import deque | 
|  |  | 
|  |  | 
|  | class TelnetRemoteclient: | 
|  | r""" | 
|  | Class to create telnet connection to remote host for command execution. | 
|  | """ | 
|  |  | 
|  | def __init__( | 
|  | self, hostname, username, password, port=23, read_timeout=None | 
|  | ): | 
|  | r""" | 
|  | Description of argument(s): | 
|  |  | 
|  | hostname        Name/IP of the remote (targeting) host | 
|  | username        User on the remote host with access to FFCD files | 
|  | password        Password for user on remote host | 
|  | read_timeout    New read timeout value to override default one | 
|  | """ | 
|  |  | 
|  | self.hostname = hostname | 
|  | self.username = username | 
|  | self.password = password | 
|  | self.tnclient = None | 
|  | self.port = port | 
|  | self.read_timeout = read_timeout | 
|  |  | 
|  | def tn_remoteclient_login(self): | 
|  | is_telnet = True | 
|  | try: | 
|  | self.tnclient = telnetlib.Telnet( | 
|  | self.hostname, self.port, timeout=15 | 
|  | ) | 
|  | if b"login:" in self.tnclient.read_until( | 
|  | b"login:", timeout=self.read_timeout | 
|  | ): | 
|  | self.tnclient.write(self.username.encode("utf-8") + b"\n") | 
|  |  | 
|  | if b"Password:" in self.tnclient.read_until( | 
|  | b"Password:", timeout=self.read_timeout | 
|  | ): | 
|  | self.tnclient.write(self.password.encode("utf-8") + b"\n") | 
|  |  | 
|  | n, match, pre_match = self.tnclient.expect( | 
|  | [ | 
|  | b"Login incorrect", | 
|  | b"invalid login name or password.", | 
|  | rb"\#", | 
|  | rb"\$", | 
|  | ], | 
|  | timeout=self.read_timeout, | 
|  | ) | 
|  | if n == 0 or n == 1: | 
|  | logging.error( | 
|  | "\n\tERROR: Telnet Authentication Failed.  Check" | 
|  | " userid and password.\n\n" | 
|  | ) | 
|  | is_telnet = False | 
|  | else: | 
|  | # login successful | 
|  | self.fifo = deque() | 
|  | else: | 
|  | # Anything else, telnet server is not running | 
|  | logging.error("\n\tERROR: Telnet Connection Refused.\n\n") | 
|  | is_telnet = False | 
|  | else: | 
|  | is_telnet = False | 
|  | except Exception: | 
|  | # Any kind of exception, skip telnet protocol | 
|  | is_telnet = False | 
|  |  | 
|  | return is_telnet | 
|  |  | 
|  | def __del__(self): | 
|  | self.tn_remoteclient_disconnect() | 
|  |  | 
|  | def tn_remoteclient_disconnect(self): | 
|  | try: | 
|  | self.tnclient.close() | 
|  | except Exception: | 
|  | # the telnet object might not exist yet, so ignore this one | 
|  | pass | 
|  |  | 
|  | def execute_command(self, cmd, i_timeout=120): | 
|  | r""" | 
|  | Executes commands on the remote host | 
|  |  | 
|  | Description of argument(s): | 
|  | cmd             Command to run on remote host | 
|  | i_timeout       Timeout for command output | 
|  | default is 120 seconds | 
|  | """ | 
|  |  | 
|  | # Wait time for command execution before reading the output. | 
|  | # Use user input wait time for command execution if one exists. | 
|  | # Else use the default 120 sec, | 
|  | if i_timeout != 120: | 
|  | execution_time = i_timeout | 
|  | else: | 
|  | execution_time = 120 | 
|  |  | 
|  | # Execute the command and read the command output. | 
|  | return_buffer = b"" | 
|  | try: | 
|  | # Do at least one non-blocking read. | 
|  | #  to flush whatever data is in the read buffer. | 
|  | while self.tnclient.read_very_eager(): | 
|  | continue | 
|  |  | 
|  | # Execute the command | 
|  | self.tnclient.write(cmd.encode("utf-8") + b"\n") | 
|  | time.sleep(execution_time) | 
|  |  | 
|  | local_buffer = b"" | 
|  | # Read the command output one block at a time. | 
|  | return_buffer = self.tnclient.read_very_eager() | 
|  | while return_buffer: | 
|  | local_buffer = b"".join([local_buffer, return_buffer]) | 
|  | time.sleep(3)  # let the buffer fill up a bit | 
|  | return_buffer = self.tnclient.read_very_eager() | 
|  | except (socket.error, EOFError) as e: | 
|  | self.tn_remoteclient_disconnect() | 
|  |  | 
|  | if str(e).__contains__("Connection reset by peer"): | 
|  | msg = e | 
|  | elif str(e).__contains__("telnet connection closed"): | 
|  | msg = "Telnet connection closed." | 
|  | else: | 
|  | msg = "Some other issue.%s %s %s\n\n" % (cmd, e.__class__, e) | 
|  |  | 
|  | logging.error("\t\t ERROR %s " % msg) | 
|  |  | 
|  | # Return ASCII string data with ending PROMPT stripped | 
|  | return local_buffer.decode("ascii", "ignore").replace("$ ", "\n") |