Add VMI certificate stage 2 test cases and supporting library functions

Changes:
Added test cases:
- Get_Concurrent_Root_Certificate_Requests_From_Multiple_Admins
- Get_Concurrent_CSR_Requests_From_Multiple_Admins
- Get_Concurrent_Corrupted_CSR_Requests_From_Multiple_Admins
Added library functions (lib/jobs_processing.py):
- execute_keyword_args
- execute_process_multi_keyword
- get_current_date_time
Change-Id: I9cd8266ba4213eb6a476b9666beb512872f87dde
Signed-off-by: shrsuman123 <shrsuman@in.ibm.com>
diff --git a/lib/jobs_processing.py b/lib/jobs_processing.py
index ea9bc67..3368bd7 100644
--- a/lib/jobs_processing.py
+++ b/lib/jobs_processing.py
@@ -9,6 +9,7 @@
from robot.libraries.BuiltIn import BuiltIn
from multiprocessing import Process, Manager
import os
+import datetime
import gen_print as gp
@@ -58,3 +59,60 @@
# Return function return codes.
return return_dict
+
+
+def execute_keyword_args(keyword_name, args, return_dict):
+ r"""
+ Execute a robot keyword with arguments and record its pass/fail status.
+ In addition to running the caller's keyword, this function will:
+ - Add one start-time/status entry to the return_dict
+ Description of argument(s):
+ keyword_name Keyword name to be executed.
+ args Sequence of positional arguments to pass to the keyword.
+ return_dict A dictionary to which this function adds one entry whose
+ key is str() of this call's start time and whose value
+ is str() of the keyword's status.
+ """
+
+ execution_time = datetime.datetime.now()
+
+ status = BuiltIn().run_keyword_and_return_status(keyword_name, *args)
+
+ # Key the entry on the start time; key and value are stored as strings.
+ return_dict[str(execution_time)] = str(status)
+
+
+def execute_process_multi_keyword(number_args, *keyword_names):
+    r"""
+    Run each robot keyword in its own process; return the shared status dict.
+
+    Description of argument(s):
+    number_args      Number of trailing words in each entry that are args.
+    keyword_names    Entries of the form "<keyword name> <arg1> <arg2>...".
+    """
+
+    manager = Manager()
+    return_dict = manager.dict()
+    process_list = []
+    for keywords_data in keyword_names:
+        words = keywords_data.split(" ")
+        # Explicit index: with 0 args, words[-0:] would select every word.
+        name_len = max(len(words) - int(number_args), 0)
+        task = Process(target=execute_keyword_args, args=(
+            " ".join(words[:name_len]), tuple(words[name_len:]), return_dict))
+        process_list.append(task)
+        task.start()
+
+    # Wait for every process to complete.
+    for task in process_list:
+        task.join()
+    return return_dict
+
+
+def get_current_date_time():
+ r"""
+ Return the current time of day as an "HH:MM:SS.ffffff" string (no date).
+ """
+
+ current_time = datetime.datetime.now().strftime("%H:%M:%S.%f")
+ return current_time