black: re-format
black and isort are enabled in openbmc-build-scripts for Python files
to enforce consistent formatting. Re-run the formatters on the whole
repository.
Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: I944f1915ece753f72a3fa654902d445a9749d0f9
diff --git a/lib/gen_arg.py b/lib/gen_arg.py
index afa7b57..2d2ed68 100755
--- a/lib/gen_arg.py
+++ b/lib/gen_arg.py
@@ -4,11 +4,13 @@
This module provides valuable argument processing functions like gen_get_options and sprint_args.
"""
-import sys
import os
import re
+import sys
+
try:
import psutil
+
psutil_imported = True
except ImportError:
psutil_imported = False
@@ -16,15 +18,16 @@
import __builtin__
except ImportError:
import builtins as __builtin__
+
+import argparse
import atexit
import signal
-import argparse
import textwrap as textwrap
-import gen_print as gp
-import gen_valid as gv
import gen_cmd as gc
import gen_misc as gm
+import gen_print as gp
+import gen_valid as gv
class MultilineFormatter(argparse.HelpFormatter):
@@ -32,13 +35,20 @@
r"""
Split text into formatted lines for every "%%n" encountered in the text and return the result.
"""
- lines = self._whitespace_matcher.sub(' ', text).strip().split('%n')
- formatted_lines = \
- [textwrap.fill(x, width, initial_indent=indent, subsequent_indent=indent) + '\n' for x in lines]
- return ''.join(formatted_lines)
+ lines = self._whitespace_matcher.sub(" ", text).strip().split("%n")
+ formatted_lines = [
+ textwrap.fill(
+ x, width, initial_indent=indent, subsequent_indent=indent
+ )
+ + "\n"
+ for x in lines
+ ]
+ return "".join(formatted_lines)
-class ArgumentDefaultsHelpMultilineFormatter(MultilineFormatter, argparse.ArgumentDefaultsHelpFormatter):
+class ArgumentDefaultsHelpMultilineFormatter(
+ MultilineFormatter, argparse.ArgumentDefaultsHelpFormatter
+):
pass
@@ -46,8 +56,7 @@
module = sys.modules["__main__"]
-def gen_get_options(parser,
- stock_list=[]):
+def gen_get_options(parser, stock_list=[]):
r"""
Parse the command line arguments using the parser object passed and return True/False (i.e. pass/fail).
However, if gv.exit_on_error is set, simply exit the program on failure. Also set the following built in
@@ -75,11 +84,14 @@
# Process stock_list.
for ix in range(0, len(stock_list)):
if len(stock_list[ix]) < 1:
- error_message = "Programmer error - stock_list[" + str(ix) +\
- "] is supposed to be a tuple containing at" +\
- " least one element which is the name of" +\
- " the desired stock parameter:\n" +\
- gp.sprint_var(stock_list)
+ error_message = (
+ "Programmer error - stock_list["
+ + str(ix)
+ + "] is supposed to be a tuple containing at"
+ + " least one element which is the name of"
+ + " the desired stock parameter:\n"
+ + gp.sprint_var(stock_list)
+ )
return gv.process_error_message(error_message)
if isinstance(stock_list[ix], tuple):
arg_name = stock_list[ix][0]
@@ -89,65 +101,86 @@
default = None
if arg_name not in master_stock_list:
- error_message = "Programmer error - arg_name \"" + arg_name +\
- "\" not found found in stock list:\n" +\
- gp.sprint_var(master_stock_list)
+ error_message = (
+ 'Programmer error - arg_name "'
+ + arg_name
+ + '" not found found in stock list:\n'
+ + gp.sprint_var(master_stock_list)
+ )
return gv.process_error_message(error_message)
if arg_name == "quiet":
if default is None:
default = 0
parser.add_argument(
- '--quiet',
+ "--quiet",
default=default,
type=int,
choices=[1, 0],
help='If this parameter is set to "1", %(prog)s'
- + ' will print only essential information, i.e. it will'
- + ' not echo parameters, echo commands, print the total'
- + ' run time, etc.' + default_string)
+ + " will print only essential information, i.e. it will"
+ + " not echo parameters, echo commands, print the total"
+ + " run time, etc."
+ + default_string,
+ )
elif arg_name == "test_mode":
if default is None:
default = 0
parser.add_argument(
- '--test_mode',
+ "--test_mode",
default=default,
type=int,
choices=[1, 0],
- help='This means that %(prog)s should go through all the'
- + ' motions but not actually do anything substantial.'
- + ' This is mainly to be used by the developer of'
- + ' %(prog)s.' + default_string)
+ help="This means that %(prog)s should go through all the"
+ + " motions but not actually do anything substantial."
+ + " This is mainly to be used by the developer of"
+ + " %(prog)s."
+ + default_string,
+ )
elif arg_name == "debug":
if default is None:
default = 0
parser.add_argument(
- '--debug',
+ "--debug",
default=default,
type=int,
choices=[1, 0],
help='If this parameter is set to "1", %(prog)s will print'
- + ' additional debug information. This is mainly to be'
- + ' used by the developer of %(prog)s.' + default_string)
+ + " additional debug information. This is mainly to be"
+ + " used by the developer of %(prog)s."
+ + default_string,
+ )
elif arg_name == "loglevel":
if default is None:
default = "info"
parser.add_argument(
- '--loglevel',
+ "--loglevel",
default=default,
type=str,
- choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL',
- 'debug', 'info', 'warning', 'error', 'critical'],
+ choices=[
+ "DEBUG",
+ "INFO",
+ "WARNING",
+ "ERROR",
+ "CRITICAL",
+ "debug",
+ "info",
+ "warning",
+ "error",
+ "critical",
+ ],
help='If this parameter is set to "1", %(prog)s will print'
- + ' additional debug information. This is mainly to be'
- + ' used by the developer of %(prog)s.' + default_string)
+ + " additional debug information. This is mainly to be"
+ + " used by the developer of %(prog)s."
+ + default_string,
+ )
arg_obj = parser.parse_args()
__builtin__.quiet = 0
__builtin__.test_mode = 0
__builtin__.debug = 0
- __builtin__.loglevel = 'WARNING'
+ __builtin__.loglevel = "WARNING"
for ix in range(0, len(stock_list)):
if isinstance(stock_list[ix], tuple):
arg_name = stock_list[ix][0]
@@ -170,15 +203,14 @@
# For each command line parameter, create a corresponding global variable and assign it the appropriate
# value. For example, if the command line contained "--last_name='Smith', we'll create a global variable
# named "last_name" with the value "Smith".
- module = sys.modules['__main__']
+ module = sys.modules["__main__"]
for key in arg_obj.__dict__:
setattr(module, key, getattr(__builtin__.arg_obj, key))
return True
-def set_pgm_arg(var_value,
- var_name=None):
+def set_pgm_arg(var_value, var_name=None):
r"""
Set the value of the arg_obj.__dict__ entry named in var_name with the var_value provided. Also, set
corresponding global variable.
@@ -193,7 +225,7 @@
var_name = gp.get_arg_name(None, 1, 2)
arg_obj.__dict__[var_name] = var_value
- module = sys.modules['__main__']
+ module = sys.modules["__main__"]
setattr(module, var_name, var_value)
if var_name == "quiet":
__builtin__.quiet = var_value
@@ -203,8 +235,7 @@
__builtin__.test_mode = var_value
-def sprint_args(arg_obj,
- indent=0):
+def sprint_args(arg_obj, indent=0):
r"""
sprint_var all of the arguments found in arg_obj and return the result as a string.
@@ -218,8 +249,9 @@
buffer = ""
for key in arg_obj.__dict__:
- buffer += gp.sprint_varx(key, getattr(arg_obj, key), 0, indent,
- col1_width)
+ buffer += gp.sprint_varx(
+ key, getattr(arg_obj, key), 0, indent, col1_width
+ )
return buffer
@@ -238,7 +270,7 @@
# Set a default value for dir_path argument.
dir_path = gm.add_trailing_slash(gm.dft(dir_path, os.getcwd()))
"""
- module = sys.modules['__main__']
+ module = sys.modules["__main__"]
for key in arg_obj.__dict__:
arg_obj.__dict__[key] = getattr(module, key)
@@ -266,16 +298,17 @@
global term_options
# Validation:
arg_names = list(kwargs.keys())
- gv.valid_list(arg_names, ['term_requests'])
- if type(kwargs['term_requests']) is dict:
- keys = list(kwargs['term_requests'].keys())
- gv.valid_list(keys, ['pgm_names'])
+ gv.valid_list(arg_names, ["term_requests"])
+ if type(kwargs["term_requests"]) is dict:
+ keys = list(kwargs["term_requests"].keys())
+ gv.valid_list(keys, ["pgm_names"])
else:
- gv.valid_value(kwargs['term_requests'], ['children', 'descendants'])
+ gv.valid_value(kwargs["term_requests"], ["children", "descendants"])
term_options = kwargs
if psutil_imported:
+
def match_process_by_pgm_name(process, pgm_name):
r"""
Return True or False to indicate whether the process matches the program name.
@@ -325,9 +358,10 @@
# Because "sleep" is a compiled executable, it will appear in entry 0.
optional_dir_path_regex = "(.*/)?"
- cmdline = process.as_dict()['cmdline']
- return re.match(optional_dir_path_regex + pgm_name + '( |$)', cmdline[0]) \
- or re.match(optional_dir_path_regex + pgm_name + '( |$)', cmdline[1])
+ cmdline = process.as_dict()["cmdline"]
+ return re.match(
+ optional_dir_path_regex + pgm_name + "( |$)", cmdline[0]
+ ) or re.match(optional_dir_path_regex + pgm_name + "( |$)", cmdline[1])
def select_processes_by_pgm_name(processes, pgm_name):
r"""
@@ -340,7 +374,11 @@
object.
"""
- return [process for process in processes if match_process_by_pgm_name(process, pgm_name)]
+ return [
+ process
+ for process in processes
+ if match_process_by_pgm_name(process, pgm_name)
+ ]
def sprint_process_report(pids):
r"""
@@ -350,7 +388,10 @@
pids A list of process IDs for processes to be included in the report.
"""
report = "\n"
- cmd_buf = "echo ; ps wwo user,pgrp,pid,ppid,lstart,cmd --forest " + ' '.join(pids)
+ cmd_buf = (
+ "echo ; ps wwo user,pgrp,pid,ppid,lstart,cmd --forest "
+ + " ".join(pids)
+ )
report += gp.sprint_issuing(cmd_buf)
rc, outbuf = gc.shell_cmd(cmd_buf, quiet=1)
report += outbuf + "\n"
@@ -371,7 +412,9 @@
descendants = process.children(recursive=True)
descendant_pids = [str(process.pid) for process in descendants]
if descendants:
- process_report = sprint_process_report([str(process.pid)] + descendant_pids)
+ process_report = sprint_process_report(
+ [str(process.pid)] + descendant_pids
+ )
else:
process_report = ""
return descendants, descendant_pids, process_report
@@ -397,12 +440,15 @@
children it produces.
"""
- message = "\n" + gp.sprint_dashes(width=120) \
- + gp.sprint_executing() + "\n"
+ message = (
+ "\n" + gp.sprint_dashes(width=120) + gp.sprint_executing() + "\n"
+ )
current_process = psutil.Process()
- descendants, descendant_pids, process_report = get_descendant_info(current_process)
+ descendants, descendant_pids, process_report = get_descendant_info(
+ current_process
+ )
if not descendants:
# If there are no descendants, then we have nothing to do.
return
@@ -410,38 +456,50 @@
terminate_descendants_temp_file_path = gm.create_temp_file_path()
gp.print_vars(terminate_descendants_temp_file_path)
- message += gp.sprint_varx("pgm_name", gp.pgm_name) \
- + gp.sprint_vars(term_options) \
+ message += (
+ gp.sprint_varx("pgm_name", gp.pgm_name)
+ + gp.sprint_vars(term_options)
+ process_report
+ )
# Process the termination requests:
- if term_options['term_requests'] == 'children':
+ if term_options["term_requests"] == "children":
term_processes = current_process.children(recursive=False)
term_pids = [str(process.pid) for process in term_processes]
- elif term_options['term_requests'] == 'descendants':
+ elif term_options["term_requests"] == "descendants":
term_processes = descendants
term_pids = descendant_pids
else:
# Process term requests by pgm_names.
term_processes = []
- for pgm_name in term_options['term_requests']['pgm_names']:
- term_processes.extend(select_processes_by_pgm_name(descendants, pgm_name))
+ for pgm_name in term_options["term_requests"]["pgm_names"]:
+ term_processes.extend(
+ select_processes_by_pgm_name(descendants, pgm_name)
+ )
term_pids = [str(process.pid) for process in term_processes]
- message += gp.sprint_timen("Processes to be terminated:") \
- + gp.sprint_var(term_pids)
+ message += gp.sprint_timen(
+ "Processes to be terminated:"
+ ) + gp.sprint_var(term_pids)
for process in term_processes:
process.terminate()
- message += gp.sprint_timen("Waiting on the following pids: " + ' '.join(descendant_pids))
+ message += gp.sprint_timen(
+ "Waiting on the following pids: " + " ".join(descendant_pids)
+ )
gm.append_file(terminate_descendants_temp_file_path, message)
psutil.wait_procs(descendants)
# Checking after the fact to see whether any descendant processes are still alive. If so, a process
# report showing this will be included in the output.
- descendants, descendant_pids, process_report = get_descendant_info(current_process)
+ descendants, descendant_pids, process_report = get_descendant_info(
+ current_process
+ )
if descendants:
- message = "\n" + gp.sprint_timen("Not all of the processes terminated:") \
+ message = (
+ "\n"
+ + gp.sprint_timen("Not all of the processes terminated:")
+ process_report
+ )
gm.append_file(terminate_descendants_temp_file_path, message)
message = gp.sprint_dashes(width=120)
@@ -470,8 +528,7 @@
gp.qprint_pgm_footer()
-def gen_signal_handler(signal_number,
- frame):
+def gen_signal_handler(signal_number, frame):
r"""
Handle signals. Without a function to catch a SIGTERM or SIGINT, the program would terminate immediately
with return code 143 and without calling the exit_function.
@@ -486,8 +543,7 @@
exit(0)
-def gen_post_validation(exit_function=None,
- signal_handler=None):
+def gen_post_validation(exit_function=None, signal_handler=None):
r"""
Do generic post-validation processing. By "post", we mean that this is to be called from a validation
function after the caller has done any validation desired. If the calling program passes exit_function