| #!/usr/bin/env python3 |
| |
| r""" |
| This module provides valuable argument processing functions like gen_get_options and sprint_args. |
| """ |
| |
| import os |
| import re |
| import sys |
| |
| try: |
| import psutil |
| |
| psutil_imported = True |
| except ImportError: |
| psutil_imported = False |
| try: |
| import __builtin__ |
| except ImportError: |
| import builtins as __builtin__ |
| |
| import argparse |
| import atexit |
| import signal |
| import textwrap as textwrap |
| |
| import gen_cmd as gc |
| import gen_misc as gm |
| import gen_print as gp |
| import gen_valid as gv |
| |
| |
| class MultilineFormatter(argparse.HelpFormatter): |
| def _fill_text(self, text, width, indent): |
| r""" |
| Split text into formatted lines for every "%%n" encountered in the text and return the result. |
| """ |
| lines = self._whitespace_matcher.sub(" ", text).strip().split("%n") |
| formatted_lines = [ |
| textwrap.fill( |
| x, width, initial_indent=indent, subsequent_indent=indent |
| ) |
| + "\n" |
| for x in lines |
| ] |
| return "".join(formatted_lines) |
| |
| |
| class ArgumentDefaultsHelpMultilineFormatter( |
| MultilineFormatter, argparse.ArgumentDefaultsHelpFormatter |
| ): |
| pass |
| |
| |
| default_string = ' The default value is "%(default)s".' |
| module = sys.modules["__main__"] |
| |
| |
| def gen_get_options(parser, stock_list=[]): |
| r""" |
| Parse the command line arguments using the parser object passed and return True/False (i.e. pass/fail). |
| However, if gv.exit_on_error is set, simply exit the program on failure. Also set the following built in |
| values: |
| |
| __builtin__.quiet This value is used by the qprint functions. |
| __builtin__.test_mode This value is used by command processing functions. |
| __builtin__.debug This value is used by the dprint functions. |
| __builtin__.arg_obj This value is used by print_program_header, etc. |
| __builtin__.parser This value is used by print_program_header, etc. |
| |
| Description of arguments: |
| parser A parser object. See argparse module documentation for details. |
| stock_list The caller can use this parameter to request certain stock parameters |
| offered by this function. For example, this function will define a |
| "quiet" option upon request. This includes stop help text and parm |
| checking. The stock_list is a list of tuples each of which consists of |
| an arg_name and a default value. Example: stock_list = [("test_mode", |
| 0), ("quiet", 1), ("debug", 0)] |
| """ |
| |
| # This is a list of stock parms that we support. |
| master_stock_list = ["quiet", "test_mode", "debug", "loglevel"] |
| |
| # Process stock_list. |
| for ix in range(0, len(stock_list)): |
| if len(stock_list[ix]) < 1: |
| error_message = ( |
| "Programmer error - stock_list[" |
| + str(ix) |
| + "] is supposed to be a tuple containing at" |
| + " least one element which is the name of" |
| + " the desired stock parameter:\n" |
| + gp.sprint_var(stock_list) |
| ) |
| return gv.process_error_message(error_message) |
| if isinstance(stock_list[ix], tuple): |
| arg_name = stock_list[ix][0] |
| default = stock_list[ix][1] |
| else: |
| arg_name = stock_list[ix] |
| default = None |
| |
| if arg_name not in master_stock_list: |
| error_message = ( |
| 'Programmer error - arg_name "' |
| + arg_name |
| + '" not found found in stock list:\n' |
| + gp.sprint_var(master_stock_list) |
| ) |
| return gv.process_error_message(error_message) |
| |
| if arg_name == "quiet": |
| if default is None: |
| default = 0 |
| parser.add_argument( |
| "--quiet", |
| default=default, |
| type=int, |
| choices=[1, 0], |
| help='If this parameter is set to "1", %(prog)s' |
| + " will print only essential information, i.e. it will" |
| + " not echo parameters, echo commands, print the total" |
| + " run time, etc." |
| + default_string, |
| ) |
| elif arg_name == "test_mode": |
| if default is None: |
| default = 0 |
| parser.add_argument( |
| "--test_mode", |
| default=default, |
| type=int, |
| choices=[1, 0], |
| help="This means that %(prog)s should go through all the" |
| + " motions but not actually do anything substantial." |
| + " This is mainly to be used by the developer of" |
| + " %(prog)s." |
| + default_string, |
| ) |
| elif arg_name == "debug": |
| if default is None: |
| default = 0 |
| parser.add_argument( |
| "--debug", |
| default=default, |
| type=int, |
| choices=[1, 0], |
| help='If this parameter is set to "1", %(prog)s will print' |
| + " additional debug information. This is mainly to be" |
| + " used by the developer of %(prog)s." |
| + default_string, |
| ) |
| elif arg_name == "loglevel": |
| if default is None: |
| default = "info" |
| parser.add_argument( |
| "--loglevel", |
| default=default, |
| type=str, |
| choices=[ |
| "DEBUG", |
| "INFO", |
| "WARNING", |
| "ERROR", |
| "CRITICAL", |
| "debug", |
| "info", |
| "warning", |
| "error", |
| "critical", |
| ], |
| help='If this parameter is set to "1", %(prog)s will print' |
| + " additional debug information. This is mainly to be" |
| + " used by the developer of %(prog)s." |
| + default_string, |
| ) |
| |
| arg_obj = parser.parse_args() |
| |
| __builtin__.quiet = 0 |
| __builtin__.test_mode = 0 |
| __builtin__.debug = 0 |
| __builtin__.loglevel = "WARNING" |
| for ix in range(0, len(stock_list)): |
| if isinstance(stock_list[ix], tuple): |
| arg_name = stock_list[ix][0] |
| default = stock_list[ix][1] |
| else: |
| arg_name = stock_list[ix] |
| default = None |
| if arg_name == "quiet": |
| __builtin__.quiet = arg_obj.quiet |
| elif arg_name == "test_mode": |
| __builtin__.test_mode = arg_obj.test_mode |
| elif arg_name == "debug": |
| __builtin__.debug = arg_obj.debug |
| elif arg_name == "loglevel": |
| __builtin__.loglevel = arg_obj.loglevel |
| |
| __builtin__.arg_obj = arg_obj |
| __builtin__.parser = parser |
| |
| # For each command line parameter, create a corresponding global variable and assign it the appropriate |
| # value. For example, if the command line contained "--last_name='Smith', we'll create a global variable |
| # named "last_name" with the value "Smith". |
| module = sys.modules["__main__"] |
| for key in arg_obj.__dict__: |
| setattr(module, key, getattr(__builtin__.arg_obj, key)) |
| |
| return True |
| |
| |
| def set_pgm_arg(var_value, var_name=None): |
| r""" |
| Set the value of the arg_obj.__dict__ entry named in var_name with the var_value provided. Also, set |
| corresponding global variable. |
| |
| Description of arguments: |
| var_value The value to set in the variable. |
| var_name The name of the variable to set. This defaults to the name of the |
| variable used for var_value when calling this function. |
| """ |
| |
| if var_name is None: |
| var_name = gp.get_arg_name(None, 1, 2) |
| |
| arg_obj.__dict__[var_name] = var_value |
| module = sys.modules["__main__"] |
| setattr(module, var_name, var_value) |
| if var_name == "quiet": |
| __builtin__.quiet = var_value |
| elif var_name == "debug": |
| __builtin__.debug = var_value |
| elif var_name == "test_mode": |
| __builtin__.test_mode = var_value |
| |
| |
| def sprint_args(arg_obj, indent=0): |
| r""" |
| sprint_var all of the arguments found in arg_obj and return the result as a string. |
| |
| Description of arguments: |
| arg_obj An argument object such as is returned by the argparse parse_args() |
| method. |
| indent The number of spaces to indent each line of output. |
| """ |
| |
| col1_width = gp.dft_col1_width + indent |
| |
| buffer = "" |
| for key in arg_obj.__dict__: |
| buffer += gp.sprint_varx( |
| key, getattr(arg_obj, key), 0, indent, col1_width |
| ) |
| return buffer |
| |
| |
| def sync_args(): |
| r""" |
| Synchronize the argument values to match their corresponding global variable values. |
| |
| The user's validate_parms() function may manipulate global variables that correspond to program |
| arguments. After validate_parms() is called, sync_args is called to set the altered values back into the |
| arg_obj. This will ensure that the print-out of program arguments reflects the updated values. |
| |
| Example: |
| |
| def validate_parms(): |
| |
| # Set a default value for dir_path argument. |
| dir_path = gm.add_trailing_slash(gm.dft(dir_path, os.getcwd())) |
| """ |
| module = sys.modules["__main__"] |
| for key in arg_obj.__dict__: |
| arg_obj.__dict__[key] = getattr(module, key) |
| |
| |
| term_options = None |
| |
| |
| def set_term_options(**kwargs): |
| r""" |
| Set the global term_options. |
| |
| If the global term_options is not None, gen_exit_function() will call terminate_descendants(). |
| |
| Description of arguments(): |
| kwargs Supported keyword options follow: |
| term_requests Requests to terminate specified descendants of this program. The |
| following values for term_requests are supported: |
| children Terminate the direct children of this program. |
| descendants Terminate all descendants of this program. |
| <dictionary> A dictionary with support for the following keys: |
| pgm_names A list of program names which will be used to identify which descendant |
| processes should be terminated. |
| """ |
| |
| global term_options |
| # Validation: |
| arg_names = list(kwargs.keys()) |
| gv.valid_list(arg_names, ["term_requests"]) |
| if type(kwargs["term_requests"]) is dict: |
| keys = list(kwargs["term_requests"].keys()) |
| gv.valid_list(keys, ["pgm_names"]) |
| else: |
| gv.valid_value(kwargs["term_requests"], ["children", "descendants"]) |
| term_options = kwargs |
| |
| |
| if psutil_imported: |
| |
| def match_process_by_pgm_name(process, pgm_name): |
| r""" |
| Return True or False to indicate whether the process matches the program name. |
| |
| Description of argument(s): |
| process A psutil process object such as the one returned by psutil.Process(). |
| pgm_name The name of a program to look for in the cmdline field of the process |
| object. |
| """ |
| |
| # This function will examine elements 0 and 1 of the cmdline field of the process object. The |
| # following examples will illustrate the reasons for this: |
| |
| # Example 1: Suppose a process was started like this: |
| |
| # shell_cmd('python_pgm_template --quiet=0', fork=1) |
| |
| # And then this function is called as follows: |
| |
| # match_process_by_pgm_name(process, "python_pgm_template") |
| |
| # The process object might contain the following for its cmdline field: |
| |
| # cmdline: |
| # [0]: /usr/bin/python |
| # [1]: /my_path/python_pgm_template |
| # [2]: --quiet=0 |
| |
| # Because "python_pgm_template" is a python program, the python interpreter (e.g. "/usr/bin/python") |
| # will appear in entry 0 of cmdline and the python_pgm_template will appear in entry 1 (with a |
| # qualifying dir path). |
| |
| # Example 2: Suppose a process was started like this: |
| |
| # shell_cmd('sleep 5', fork=1) |
| |
| # And then this function is called as follows: |
| |
| # match_process_by_pgm_name(process, "sleep") |
| |
| # The process object might contain the following for its cmdline field: |
| |
| # cmdline: |
| # [0]: sleep |
| # [1]: 5 |
| |
| # Because "sleep" is a compiled executable, it will appear in entry 0. |
| |
| optional_dir_path_regex = "(.*/)?" |
| cmdline = process.as_dict()["cmdline"] |
| return re.match( |
| optional_dir_path_regex + pgm_name + "( |$)", cmdline[0] |
| ) or re.match(optional_dir_path_regex + pgm_name + "( |$)", cmdline[1]) |
| |
| def select_processes_by_pgm_name(processes, pgm_name): |
| r""" |
| Select the processes that match pgm_name and return the result as a list of process objects. |
| |
| Description of argument(s): |
| processes A list of psutil process objects such as the one returned by |
| psutil.Process(). |
| pgm_name The name of a program to look for in the cmdline field of each process |
| object. |
| """ |
| |
| return [ |
| process |
| for process in processes |
| if match_process_by_pgm_name(process, pgm_name) |
| ] |
| |
| def sprint_process_report(pids): |
| r""" |
| Create a process report for the given pids and return it as a string. |
| |
| Description of argument(s): |
| pids A list of process IDs for processes to be included in the report. |
| """ |
| report = "\n" |
| cmd_buf = ( |
| "echo ; ps wwo user,pgrp,pid,ppid,lstart,cmd --forest " |
| + " ".join(pids) |
| ) |
| report += gp.sprint_issuing(cmd_buf) |
| rc, outbuf = gc.shell_cmd(cmd_buf, quiet=1) |
| report += outbuf + "\n" |
| |
| return report |
| |
| def get_descendant_info(process=psutil.Process()): |
| r""" |
| Get info about the descendants of the given process and return as a tuple of descendants, |
| descendant_pids and process_report. |
| |
| descendants will be a list of process objects. descendant_pids will be a list of pids (in str form) |
| and process_report will be a report produced by a call to sprint_process_report(). |
| |
| Description of argument(s): |
| process A psutil process object such as the one returned by psutil.Process(). |
| """ |
| descendants = process.children(recursive=True) |
| descendant_pids = [str(process.pid) for process in descendants] |
| if descendants: |
| process_report = sprint_process_report( |
| [str(process.pid)] + descendant_pids |
| ) |
| else: |
| process_report = "" |
| return descendants, descendant_pids, process_report |
| |
| def terminate_descendants(): |
| r""" |
| Terminate descendants of the current process according to the requirements layed out in global |
| term_options variable. |
| |
| Note: If term_options is not null, gen_exit_function() will automatically call this function. |
| |
| When this function gets called, descendant processes may be running and may be printing to the same |
| stdout stream being used by this process. If this function writes directly to stdout, its output can |
| be interspersed with any output generated by descendant processes. This makes it very difficult to |
| interpret the output. In order solve this problem, the activity of this process will be logged to a |
| temporary file. After descendant processes have been terminated successfully, the temporary file |
| will be printed to stdout and then deleted. However, if this function should fail to complete (i.e. |
| get hung waiting for descendants to terminate gracefully), the temporary file will not be deleted and |
| can be used by the developer for debugging. If no descendant processes are found, this function will |
| return before creating the temporary file. |
| |
| Note that a general principal being observed here is that each process is responsible for the |
| children it produces. |
| """ |
| |
| message = ( |
| "\n" + gp.sprint_dashes(width=120) + gp.sprint_executing() + "\n" |
| ) |
| |
| current_process = psutil.Process() |
| |
| descendants, descendant_pids, process_report = get_descendant_info( |
| current_process |
| ) |
| if not descendants: |
| # If there are no descendants, then we have nothing to do. |
| return |
| |
| terminate_descendants_temp_file_path = gm.create_temp_file_path() |
| gp.print_vars(terminate_descendants_temp_file_path) |
| |
| message += ( |
| gp.sprint_varx("pgm_name", gp.pgm_name) |
| + gp.sprint_vars(term_options) |
| + process_report |
| ) |
| |
| # Process the termination requests: |
| if term_options["term_requests"] == "children": |
| term_processes = current_process.children(recursive=False) |
| term_pids = [str(process.pid) for process in term_processes] |
| elif term_options["term_requests"] == "descendants": |
| term_processes = descendants |
| term_pids = descendant_pids |
| else: |
| # Process term requests by pgm_names. |
| term_processes = [] |
| for pgm_name in term_options["term_requests"]["pgm_names"]: |
| term_processes.extend( |
| select_processes_by_pgm_name(descendants, pgm_name) |
| ) |
| term_pids = [str(process.pid) for process in term_processes] |
| |
| message += gp.sprint_timen( |
| "Processes to be terminated:" |
| ) + gp.sprint_var(term_pids) |
| for process in term_processes: |
| process.terminate() |
| message += gp.sprint_timen( |
| "Waiting on the following pids: " + " ".join(descendant_pids) |
| ) |
| gm.append_file(terminate_descendants_temp_file_path, message) |
| psutil.wait_procs(descendants) |
| |
| # Checking after the fact to see whether any descendant processes are still alive. If so, a process |
| # report showing this will be included in the output. |
| descendants, descendant_pids, process_report = get_descendant_info( |
| current_process |
| ) |
| if descendants: |
| message = ( |
| "\n" |
| + gp.sprint_timen("Not all of the processes terminated:") |
| + process_report |
| ) |
| gm.append_file(terminate_descendants_temp_file_path, message) |
| |
| message = gp.sprint_dashes(width=120) |
| gm.append_file(terminate_descendants_temp_file_path, message) |
| gp.print_file(terminate_descendants_temp_file_path) |
| os.remove(terminate_descendants_temp_file_path) |
| |
| |
| def gen_exit_function(): |
| r""" |
| Execute whenever the program ends normally or with the signals that we catch (i.e. TERM, INT). |
| """ |
| |
| # ignore_err influences the way shell_cmd processes errors. Since we're doing exit processing, we don't |
| # want to stop the program due to a shell_cmd failure. |
| ignore_err = 1 |
| |
| if psutil_imported and term_options: |
| terminate_descendants() |
| |
| # Call the main module's exit_function if it is defined. |
| exit_function = getattr(module, "exit_function", None) |
| if exit_function: |
| exit_function() |
| |
| gp.qprint_pgm_footer() |
| |
| |
| def gen_signal_handler(signal_number, frame): |
| r""" |
| Handle signals. Without a function to catch a SIGTERM or SIGINT, the program would terminate immediately |
| with return code 143 and without calling the exit_function. |
| """ |
| |
| # The convention is to set up exit_function with atexit.register() so there is no need to explicitly |
| # call exit_function from here. |
| |
| gp.qprint_executing() |
| |
| # Calling exit prevents control from returning to the code that was running when the signal was received. |
| exit(0) |
| |
| |
| def gen_post_validation(exit_function=None, signal_handler=None): |
| r""" |
| Do generic post-validation processing. By "post", we mean that this is to be called from a validation |
| function after the caller has done any validation desired. If the calling program passes exit_function |
| and signal_handler parms, this function will register them. In other words, it will make the |
| signal_handler functions get called for SIGINT and SIGTERM and will make the exit_function function run |
| prior to the termination of the program. |
| |
| Description of arguments: |
| exit_function A function object pointing to the caller's exit function. This defaults |
| to this module's gen_exit_function. |
| signal_handler A function object pointing to the caller's signal_handler function. This |
| defaults to this module's gen_signal_handler. |
| """ |
| |
| # Get defaults. |
| exit_function = exit_function or gen_exit_function |
| signal_handler = signal_handler or gen_signal_handler |
| |
| atexit.register(exit_function) |
| signal.signal(signal.SIGINT, signal_handler) |
| signal.signal(signal.SIGTERM, signal_handler) |
| |
| |
| def gen_setup(): |
| r""" |
| Do general setup for a program. |
| """ |
| |
| # Set exit_on_error for gen_valid functions. |
| gv.set_exit_on_error(True) |
| |
| # Get main module variable values. |
| parser = getattr(module, "parser") |
| stock_list = getattr(module, "stock_list") |
| validate_parms = getattr(module, "validate_parms", None) |
| |
| gen_get_options(parser, stock_list) |
| |
| if validate_parms: |
| validate_parms() |
| sync_args() |
| gen_post_validation() |
| |
| gp.qprint_pgm_header() |