blob: 82b10fd45d27efe7a51819d8d5f279b249f2dd25 [file] [log] [blame]
#!/usr/bin/env python3
r"""
Network generic functions.
"""
import collections
import ipaddress
import json
import re
import socket
import subprocess
import bmc_ssh_utils as bsu
import gen_cmd as gc
import gen_misc as gm
import gen_print as gp
import var_funcs as vf
from robot.libraries.BuiltIn import BuiltIn
ip_regex = r"\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}"
def get_running_system_ip():
r"""
Get the IP address of server from which robot code is running.
"""
ip_list = list()
stdout = subprocess.check_output(["hostname", "--all-fqdns"], shell=True)
host_fqdns = stdout.decode("utf-8").strip()
ip_address = socket.gethostbyname(str(host_fqdns))
ip_list.append(ip_address)
return ip_list
def netmask_prefix_length(netmask):
r"""
Return the netmask prefix length.
Description of argument(s):
netmask Netmask value (e.g. "255.255.0.0", "255.255.255.0",
"255.252.0.0", etc.).
"""
# IP address netmask format: '0.0.0.0/255.255.252.0'
return ipaddress.ip_network("0.0.0.0/" + netmask).prefixlen
def get_netmask_address(prefix_len):
r"""
Return the netmask address.
Description of argument(s):
prefix_len Prefix length value (e.g. "24", "23", "22", etc.).
"""
# IP address netmask format: '0.0.0.0/24'
return ipaddress.ip_network("0.0.0.0/" + prefix_len).netmask
def parse_nping_output(output):
r"""
Parse the output from the nping command and return as a dictionary.
Example of output value:
Starting Nping 0.6.47 ( http://nmap.org/nping ) at 2019-08-07 22:05 IST
SENT (0.0181s) TCP Source IP:37577 >
Destination IP:80 S ttl=64 id=39113 iplen=40 seq=629782493 win=1480
SENT (0.2189s) TCP Source IP:37577 >
Destination IP:80 S ttl=64 id=39113 iplen=40 seq=629782493 win=1480
RCVD (0.4120s) TCP Destination IP:80 >
Source IP:37577 SA ttl=49 id=0 iplen=44 seq=1078301364 win=5840 <mss 1380>
Max rtt: 193.010ms | Min rtt: 193.010ms | Avg rtt: 193.010ms
Raw packets sent: 2 (80B) | Rcvd: 1 (46B) | Lost: 1 (50.00%)
TCP connection attempts: 5 | Successful connections: 5 | Failed: 0 (0.00%)
Nping done: 1 IP address pinged in 0.43 seconds
Example of data returned by this function:
nping_result:
[max_rtt]: 193.010ms
[min_rtt]: 193.010ms
[avg_rtt]: 193.010ms
[raw_packets_sent]: 2 (80B)
[rcvd]: 1 (46B)
[lost]: 1 (50.00%)
[percent_lost]: 50.00
[tcp_connection_attempts]: 5
[successful_connections]: 5
[failed]: 0 (0.00%)
[percent_failed]: 0.00
Description of argument(s):
output The output obtained by running an nping
command.
"""
lines = output.split("\n")
# Obtain only the lines of interest.
lines = list(
filter(
lambda x: re.match(r"(Max rtt|Raw packets|TCP connection)", x),
lines,
)
)
key_value_list = []
for line in lines:
key_value_list += line.split("|")
nping_result = vf.key_value_list_to_dict(key_value_list)
# Extract percent_lost/percent_failed value from lost/failed field.
if "lost" in nping_result:
nping_result["percent_lost"] = float(
nping_result["lost"].split(" ")[-1].strip("()%")
)
else:
nping_result["percent_failed"] = float(
nping_result["failed"].split(" ")[-1].strip("()%")
)
return nping_result
openbmc_host = BuiltIn().get_variable_value("${OPENBMC_HOST}")
def nping(host=openbmc_host, parse_results=1, **options):
r"""
Run the nping command and return the results either as a string or as a dictionary.
Do a 'man nping' for a complete description of the nping utility.
Note that any valid nping argument may be specified as a function argument.
Example robot code:
${nping_result}= Nping delay=${delay} count=${count} icmp=${None} icmp-type=echo
Rprint Vars nping_result
Resulting output:
nping_result:
[max_rtt]: 0.534ms
[min_rtt]: 0.441ms
[avg_rtt]: 0.487ms
[raw_packets_sent]: 4 (112B)
[rcvd]: 2 (92B)
[lost]: 2 (50.00%)
[percent_lost]: 50.0
Description of argument(s):
host The host name or IP of the target of the
nping command.
parse_results 1 or True indicates that this function
should parse the nping results and return
a dictionary rather than the raw nping
output. See the parse_nping_output()
function for details on the dictionary
structure.
options Zero or more options accepted by the nping
command. Do a 'man nping' for details.
"""
command_string = gc.create_command_string("nping", host, options)
rc, output = gc.shell_cmd(command_string, print_output=0, ignore_err=0)
if parse_results:
return parse_nping_output(output)
return output
def get_channel_config():
r"""
Get the channel config data and return as a dictionary.
Example:
channel_config = get_channel_config()
print_vars(channel_config)
channel_config:
[0]:
[name]: IPMB
[is_valid]: True
[active_sessions]: 0
[channel_info]:
[medium_type]: ipmb
[protocol_type]: ipmb-1.0
[session_supported]: session-less
[is_ipmi]: True
[1]:
[name]: eth0
[is_valid]: True
[active_sessions]: 0
[channel_info]:
[medium_type]: other-lan
[protocol_type]: ipmb-1.0
[session_supported]: multi-session
[is_ipmi]: True
[2]:
[name]: eth1
[is_valid]: True
[active_sessions]: 0
[channel_info]:
[medium_type]: lan-802.3
[protocol_type]: ipmb-1.0
[session_supported]: multi-session
[is_ipmi]: True
(etc.)
"""
stdout, stderr, rc = bsu.bmc_execute_command(
"cat /usr/share/ipmi-providers/channel_config.json"
)
return json.loads(stdout)
def get_active_channel_config():
r"""
Channel configs which medium_type are 'other-lan' or 'lan-802.3' returned by
this function.
"""
return vf.filter_struct(
get_channel_config(),
"[('medium_type', 'other-lan|lan-802.3')]",
regex=1,
)
def get_channel_access_config(file_name):
r"""
Get the channel access config data and return as a dictionary.
Description of argument:
file_name File name for channel access settings (e.g. '/run/ipmi/channel_access_volatile.json',
'/var/lib/ipmi/channel_access_nv.json'.).
Example:
channel_access_config = get_channel_access_config()
print_vars(channel_access_config)
channel_access_config:
[1]:
[priv_limit]: priv-admin
[per_msg_auth_disabled]: False
[access_mode]: always_available
[alerting_disabled]: False
[user_auth_disabled]: False
[2]:
[priv_limit]: priv-admin
[per_msg_auth_disabled]: False
[access_mode]: always_available
[alerting_disabled]: False
[user_auth_disabled]: False
"""
stdout, stderr, rc = bsu.bmc_execute_command("cat " + file_name)
return json.loads(stdout)