blob: 2d2ed6830fd5080a435c2cc792adb0ee5f485420 [file] [log] [blame]
#!/usr/bin/env python3
r"""
This module provides valuable argument processing functions like gen_get_options and sprint_args.
"""
import os
import re
import sys
try:
import psutil
psutil_imported = True
except ImportError:
psutil_imported = False
try:
import __builtin__
except ImportError:
import builtins as __builtin__
import argparse
import atexit
import signal
import textwrap as textwrap
import gen_cmd as gc
import gen_misc as gm
import gen_print as gp
import gen_valid as gv
class MultilineFormatter(argparse.HelpFormatter):
def _fill_text(self, text, width, indent):
r"""
Split text into formatted lines for every "%%n" encountered in the text and return the result.
"""
lines = self._whitespace_matcher.sub(" ", text).strip().split("%n")
formatted_lines = [
textwrap.fill(
x, width, initial_indent=indent, subsequent_indent=indent
)
+ "\n"
for x in lines
]
return "".join(formatted_lines)
class ArgumentDefaultsHelpMultilineFormatter(
MultilineFormatter, argparse.ArgumentDefaultsHelpFormatter
):
pass
default_string = ' The default value is "%(default)s".'
module = sys.modules["__main__"]
def gen_get_options(parser, stock_list=[]):
r"""
Parse the command line arguments using the parser object passed and return True/False (i.e. pass/fail).
However, if gv.exit_on_error is set, simply exit the program on failure. Also set the following built in
values:
__builtin__.quiet This value is used by the qprint functions.
__builtin__.test_mode This value is used by command processing functions.
__builtin__.debug This value is used by the dprint functions.
__builtin__.arg_obj This value is used by print_program_header, etc.
__builtin__.parser This value is used by print_program_header, etc.
Description of arguments:
parser A parser object. See argparse module documentation for details.
stock_list The caller can use this parameter to request certain stock parameters
offered by this function. For example, this function will define a
"quiet" option upon request. This includes stop help text and parm
checking. The stock_list is a list of tuples each of which consists of
an arg_name and a default value. Example: stock_list = [("test_mode",
0), ("quiet", 1), ("debug", 0)]
"""
# This is a list of stock parms that we support.
master_stock_list = ["quiet", "test_mode", "debug", "loglevel"]
# Process stock_list.
for ix in range(0, len(stock_list)):
if len(stock_list[ix]) < 1:
error_message = (
"Programmer error - stock_list["
+ str(ix)
+ "] is supposed to be a tuple containing at"
+ " least one element which is the name of"
+ " the desired stock parameter:\n"
+ gp.sprint_var(stock_list)
)
return gv.process_error_message(error_message)
if isinstance(stock_list[ix], tuple):
arg_name = stock_list[ix][0]
default = stock_list[ix][1]
else:
arg_name = stock_list[ix]
default = None
if arg_name not in master_stock_list:
error_message = (
'Programmer error - arg_name "'
+ arg_name
+ '" not found found in stock list:\n'
+ gp.sprint_var(master_stock_list)
)
return gv.process_error_message(error_message)
if arg_name == "quiet":
if default is None:
default = 0
parser.add_argument(
"--quiet",
default=default,
type=int,
choices=[1, 0],
help='If this parameter is set to "1", %(prog)s'
+ " will print only essential information, i.e. it will"
+ " not echo parameters, echo commands, print the total"
+ " run time, etc."
+ default_string,
)
elif arg_name == "test_mode":
if default is None:
default = 0
parser.add_argument(
"--test_mode",
default=default,
type=int,
choices=[1, 0],
help="This means that %(prog)s should go through all the"
+ " motions but not actually do anything substantial."
+ " This is mainly to be used by the developer of"
+ " %(prog)s."
+ default_string,
)
elif arg_name == "debug":
if default is None:
default = 0
parser.add_argument(
"--debug",
default=default,
type=int,
choices=[1, 0],
help='If this parameter is set to "1", %(prog)s will print'
+ " additional debug information. This is mainly to be"
+ " used by the developer of %(prog)s."
+ default_string,
)
elif arg_name == "loglevel":
if default is None:
default = "info"
parser.add_argument(
"--loglevel",
default=default,
type=str,
choices=[
"DEBUG",
"INFO",
"WARNING",
"ERROR",
"CRITICAL",
"debug",
"info",
"warning",
"error",
"critical",
],
help='If this parameter is set to "1", %(prog)s will print'
+ " additional debug information. This is mainly to be"
+ " used by the developer of %(prog)s."
+ default_string,
)
arg_obj = parser.parse_args()
__builtin__.quiet = 0
__builtin__.test_mode = 0
__builtin__.debug = 0
__builtin__.loglevel = "WARNING"
for ix in range(0, len(stock_list)):
if isinstance(stock_list[ix], tuple):
arg_name = stock_list[ix][0]
default = stock_list[ix][1]
else:
arg_name = stock_list[ix]
default = None
if arg_name == "quiet":
__builtin__.quiet = arg_obj.quiet
elif arg_name == "test_mode":
__builtin__.test_mode = arg_obj.test_mode
elif arg_name == "debug":
__builtin__.debug = arg_obj.debug
elif arg_name == "loglevel":
__builtin__.loglevel = arg_obj.loglevel
__builtin__.arg_obj = arg_obj
__builtin__.parser = parser
# For each command line parameter, create a corresponding global variable and assign it the appropriate
# value. For example, if the command line contained "--last_name='Smith', we'll create a global variable
# named "last_name" with the value "Smith".
module = sys.modules["__main__"]
for key in arg_obj.__dict__:
setattr(module, key, getattr(__builtin__.arg_obj, key))
return True
def set_pgm_arg(var_value, var_name=None):
r"""
Set the value of the arg_obj.__dict__ entry named in var_name with the var_value provided. Also, set
corresponding global variable.
Description of arguments:
var_value The value to set in the variable.
var_name The name of the variable to set. This defaults to the name of the
variable used for var_value when calling this function.
"""
if var_name is None:
var_name = gp.get_arg_name(None, 1, 2)
arg_obj.__dict__[var_name] = var_value
module = sys.modules["__main__"]
setattr(module, var_name, var_value)
if var_name == "quiet":
__builtin__.quiet = var_value
elif var_name == "debug":
__builtin__.debug = var_value
elif var_name == "test_mode":
__builtin__.test_mode = var_value
def sprint_args(arg_obj, indent=0):
r"""
sprint_var all of the arguments found in arg_obj and return the result as a string.
Description of arguments:
arg_obj An argument object such as is returned by the argparse parse_args()
method.
indent The number of spaces to indent each line of output.
"""
col1_width = gp.dft_col1_width + indent
buffer = ""
for key in arg_obj.__dict__:
buffer += gp.sprint_varx(
key, getattr(arg_obj, key), 0, indent, col1_width
)
return buffer
def sync_args():
r"""
Synchronize the argument values to match their corresponding global variable values.
The user's validate_parms() function may manipulate global variables that correspond to program
arguments. After validate_parms() is called, sync_args is called to set the altered values back into the
arg_obj. This will ensure that the print-out of program arguments reflects the updated values.
Example:
def validate_parms():
# Set a default value for dir_path argument.
dir_path = gm.add_trailing_slash(gm.dft(dir_path, os.getcwd()))
"""
module = sys.modules["__main__"]
for key in arg_obj.__dict__:
arg_obj.__dict__[key] = getattr(module, key)
term_options = None
def set_term_options(**kwargs):
r"""
Set the global term_options.
If the global term_options is not None, gen_exit_function() will call terminate_descendants().
Description of arguments():
kwargs Supported keyword options follow:
term_requests Requests to terminate specified descendants of this program. The
following values for term_requests are supported:
children Terminate the direct children of this program.
descendants Terminate all descendants of this program.
<dictionary> A dictionary with support for the following keys:
pgm_names A list of program names which will be used to identify which descendant
processes should be terminated.
"""
global term_options
# Validation:
arg_names = list(kwargs.keys())
gv.valid_list(arg_names, ["term_requests"])
if type(kwargs["term_requests"]) is dict:
keys = list(kwargs["term_requests"].keys())
gv.valid_list(keys, ["pgm_names"])
else:
gv.valid_value(kwargs["term_requests"], ["children", "descendants"])
term_options = kwargs
if psutil_imported:
def match_process_by_pgm_name(process, pgm_name):
r"""
Return True or False to indicate whether the process matches the program name.
Description of argument(s):
process A psutil process object such as the one returned by psutil.Process().
pgm_name The name of a program to look for in the cmdline field of the process
object.
"""
# This function will examine elements 0 and 1 of the cmdline field of the process object. The
# following examples will illustrate the reasons for this:
# Example 1: Suppose a process was started like this:
# shell_cmd('python_pgm_template --quiet=0', fork=1)
# And then this function is called as follows:
# match_process_by_pgm_name(process, "python_pgm_template")
# The process object might contain the following for its cmdline field:
# cmdline:
# [0]: /usr/bin/python
# [1]: /my_path/python_pgm_template
# [2]: --quiet=0
# Because "python_pgm_template" is a python program, the python interpreter (e.g. "/usr/bin/python")
# will appear in entry 0 of cmdline and the python_pgm_template will appear in entry 1 (with a
# qualifying dir path).
# Example 2: Suppose a process was started like this:
# shell_cmd('sleep 5', fork=1)
# And then this function is called as follows:
# match_process_by_pgm_name(process, "sleep")
# The process object might contain the following for its cmdline field:
# cmdline:
# [0]: sleep
# [1]: 5
# Because "sleep" is a compiled executable, it will appear in entry 0.
optional_dir_path_regex = "(.*/)?"
cmdline = process.as_dict()["cmdline"]
return re.match(
optional_dir_path_regex + pgm_name + "( |$)", cmdline[0]
) or re.match(optional_dir_path_regex + pgm_name + "( |$)", cmdline[1])
def select_processes_by_pgm_name(processes, pgm_name):
r"""
Select the processes that match pgm_name and return the result as a list of process objects.
Description of argument(s):
processes A list of psutil process objects such as the one returned by
psutil.Process().
pgm_name The name of a program to look for in the cmdline field of each process
object.
"""
return [
process
for process in processes
if match_process_by_pgm_name(process, pgm_name)
]
def sprint_process_report(pids):
r"""
Create a process report for the given pids and return it as a string.
Description of argument(s):
pids A list of process IDs for processes to be included in the report.
"""
report = "\n"
cmd_buf = (
"echo ; ps wwo user,pgrp,pid,ppid,lstart,cmd --forest "
+ " ".join(pids)
)
report += gp.sprint_issuing(cmd_buf)
rc, outbuf = gc.shell_cmd(cmd_buf, quiet=1)
report += outbuf + "\n"
return report
def get_descendant_info(process=psutil.Process()):
r"""
Get info about the descendants of the given process and return as a tuple of descendants,
descendant_pids and process_report.
descendants will be a list of process objects. descendant_pids will be a list of pids (in str form)
and process_report will be a report produced by a call to sprint_process_report().
Description of argument(s):
process A psutil process object such as the one returned by psutil.Process().
"""
descendants = process.children(recursive=True)
descendant_pids = [str(process.pid) for process in descendants]
if descendants:
process_report = sprint_process_report(
[str(process.pid)] + descendant_pids
)
else:
process_report = ""
return descendants, descendant_pids, process_report
def terminate_descendants():
r"""
Terminate descendants of the current process according to the requirements layed out in global
term_options variable.
Note: If term_options is not null, gen_exit_function() will automatically call this function.
When this function gets called, descendant processes may be running and may be printing to the same
stdout stream being used by this process. If this function writes directly to stdout, its output can
be interspersed with any output generated by descendant processes. This makes it very difficult to
interpret the output. In order solve this problem, the activity of this process will be logged to a
temporary file. After descendant processes have been terminated successfully, the temporary file
will be printed to stdout and then deleted. However, if this function should fail to complete (i.e.
get hung waiting for descendants to terminate gracefully), the temporary file will not be deleted and
can be used by the developer for debugging. If no descendant processes are found, this function will
return before creating the temporary file.
Note that a general principal being observed here is that each process is responsible for the
children it produces.
"""
message = (
"\n" + gp.sprint_dashes(width=120) + gp.sprint_executing() + "\n"
)
current_process = psutil.Process()
descendants, descendant_pids, process_report = get_descendant_info(
current_process
)
if not descendants:
# If there are no descendants, then we have nothing to do.
return
terminate_descendants_temp_file_path = gm.create_temp_file_path()
gp.print_vars(terminate_descendants_temp_file_path)
message += (
gp.sprint_varx("pgm_name", gp.pgm_name)
+ gp.sprint_vars(term_options)
+ process_report
)
# Process the termination requests:
if term_options["term_requests"] == "children":
term_processes = current_process.children(recursive=False)
term_pids = [str(process.pid) for process in term_processes]
elif term_options["term_requests"] == "descendants":
term_processes = descendants
term_pids = descendant_pids
else:
# Process term requests by pgm_names.
term_processes = []
for pgm_name in term_options["term_requests"]["pgm_names"]:
term_processes.extend(
select_processes_by_pgm_name(descendants, pgm_name)
)
term_pids = [str(process.pid) for process in term_processes]
message += gp.sprint_timen(
"Processes to be terminated:"
) + gp.sprint_var(term_pids)
for process in term_processes:
process.terminate()
message += gp.sprint_timen(
"Waiting on the following pids: " + " ".join(descendant_pids)
)
gm.append_file(terminate_descendants_temp_file_path, message)
psutil.wait_procs(descendants)
# Checking after the fact to see whether any descendant processes are still alive. If so, a process
# report showing this will be included in the output.
descendants, descendant_pids, process_report = get_descendant_info(
current_process
)
if descendants:
message = (
"\n"
+ gp.sprint_timen("Not all of the processes terminated:")
+ process_report
)
gm.append_file(terminate_descendants_temp_file_path, message)
message = gp.sprint_dashes(width=120)
gm.append_file(terminate_descendants_temp_file_path, message)
gp.print_file(terminate_descendants_temp_file_path)
os.remove(terminate_descendants_temp_file_path)
def gen_exit_function():
r"""
Execute whenever the program ends normally or with the signals that we catch (i.e. TERM, INT).
"""
# ignore_err influences the way shell_cmd processes errors. Since we're doing exit processing, we don't
# want to stop the program due to a shell_cmd failure.
ignore_err = 1
if psutil_imported and term_options:
terminate_descendants()
# Call the main module's exit_function if it is defined.
exit_function = getattr(module, "exit_function", None)
if exit_function:
exit_function()
gp.qprint_pgm_footer()
def gen_signal_handler(signal_number, frame):
r"""
Handle signals. Without a function to catch a SIGTERM or SIGINT, the program would terminate immediately
with return code 143 and without calling the exit_function.
"""
# The convention is to set up exit_function with atexit.register() so there is no need to explicitly
# call exit_function from here.
gp.qprint_executing()
# Calling exit prevents control from returning to the code that was running when the signal was received.
exit(0)
def gen_post_validation(exit_function=None, signal_handler=None):
r"""
Do generic post-validation processing. By "post", we mean that this is to be called from a validation
function after the caller has done any validation desired. If the calling program passes exit_function
and signal_handler parms, this function will register them. In other words, it will make the
signal_handler functions get called for SIGINT and SIGTERM and will make the exit_function function run
prior to the termination of the program.
Description of arguments:
exit_function A function object pointing to the caller's exit function. This defaults
to this module's gen_exit_function.
signal_handler A function object pointing to the caller's signal_handler function. This
defaults to this module's gen_signal_handler.
"""
# Get defaults.
exit_function = exit_function or gen_exit_function
signal_handler = signal_handler or gen_signal_handler
atexit.register(exit_function)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
def gen_setup():
r"""
Do general setup for a program.
"""
# Set exit_on_error for gen_valid functions.
gv.set_exit_on_error(True)
# Get main module variable values.
parser = getattr(module, "parser")
stock_list = getattr(module, "stock_list")
validate_parms = getattr(module, "validate_parms", None)
gen_get_options(parser, stock_list)
if validate_parms:
validate_parms()
sync_args()
gen_post_validation()
gp.qprint_pgm_header()