Refactoring run_desktop_tests

The goal of this refactoring is to make the code readable after it has grown to the current state.
Also, fixed the bug where unexpectedly quitting tests were ignored by Jenkins and fixed the test duration for Jenkins.
This commit is contained in:
Timofey 2015-07-22 20:14:39 +03:00 committed by Alex Zolotarev
parent f51f074991
commit df7871bade
2 changed files with 288 additions and 248 deletions

View file

@ -18,10 +18,9 @@ be found, i.e. the tests that were specified in the skip list, but do not exist.
from __future__ import print_function
import getopt
from os import listdir
from optparse import OptionParser
from os import listdir, remove
from os.path import isfile, join
import os
import socket
import subprocess
import sys
@ -29,16 +28,6 @@ import testserver
import urllib2
tests_path = ""
workspace_path = "omim-build-release/out/release"
skiplist = []
runlist = []
logfile = "testlog.log"
data_path = ""
user_resource_path = ""
TO_RUN = "to_run"
SKIP = "skip"
NOT_FOUND = "not_found"
@ -47,169 +36,148 @@ PASSED = "passed"
PORT = 34568
def print_pretty(result, tests):
if len(tests) == 0:
return
print("")
print(result.upper())
class TestRunner:
for test in tests:
print("- {test}".format(test=test))
def print_pretty(self, result, tests):
if not tests:
return
print("\n{result}".format(result=result.upper()))
for test in tests:
print("- {test}".format(test=test))
def usage():
print("""
Possbile options:
def set_global_vars(self):
-h --help : print this help
parser = OptionParser()
parser.add_option("-o", "--output", dest="output", default="testlog.log", help="resulting log file. Default testlog.log")
parser.add_option("-f", "--folder", dest="folder", default="omim-build-release/out/release", help="specify the folder where the tests reside (absolute path or relative to the location of this script)")
parser.add_option("-d", "--data_path", dest="data_path", help="Path to data files (passed to the test executables as --data_path=<value>)")
parser.add_option("-u", "--user_resource_path", dest="resource_path", help="Path to resources, styles and classificators (passed to the test executables as --user_resource_path=<value>)")
parser.add_option("-i", "--include", dest="runlist", action="append", default=[], help="Include test into execution, comma separated list with no spaces or individual tests, or both. E.g.: -i one -i two -i three,four,five")
parser.add_option("-e", "--exclude", dest="skiplist", action="append", default=[], help="Exclude test from execution, comma separated list with no spaces or individual tests, or both. E.g.: -i one -i two -i three,four,five")
(options, args) = parser.parse_args()
-f --folder : specify the folder where the tests reside (absolute path or relative to the location of this script)
-e --exclude: list of tests to exclude, comma separated, no spaces allowed
-i --include: list of tests to be run, overrides -e
-o --output : resulting log file. Default testlog.log
-d --data_path : Path to data files (passed to the test executables as --data_path=<value>)
-u --user_resource_path : Path to resources, styles and classificators (passed to the test executables as --user_resource_path=<value>)
Example
./run_desktop_tests.py -f /Users/Jenkins/Home/jobs/Multiplatform/workspace/omim-build-release/out/release -e drape_tests,some_other_tests -o my_log_file.log
""")
def set_global_vars():
global skiplist
global logfile
global runlist
global workspace_path
global data_path
global user_resource_path
try:
opts, args = getopt.getopt(sys.argv[1:], "he:f:o:i:d:u:",
["help", "exclude=", "include=", "folder=", "output=", "data_path=", "user_resource_path="])
except getopt.GetoptError as err:
print(str(err))
usage()
sys.exit(2)
for option, argument in opts:
if option in ("-h", "--help"):
usage()
sys.exit()
if option in ("-o", "--output"):
logfile = argument
elif option in ("-e", "--exclude"):
skiplist = list(set(argument.split(",")))
elif option in ("-i", "--include"):
print("\n-i option found, -e option will be ignored!")
runlist = argument.split(",")
elif option in ("-f", "--folder"):
workspace_path = argument
elif option in ("-d", "--data_path"):
data_path = " --data_path={argument} ".format(argument=argument)
elif option in ("-u", "--user_resource_path"):
user_resource_path = " --user_resource_path={argument} ".format(argument=argument)
self.skiplist = set()
self.runlist = list()
for tests in options.skiplist:
for test in tests.split(","):
self.skiplist.add(test)
for tests in options.runlist:
self.runlist.extend(tests.split(","))
if self.runlist:
print("WARNING: -i option found, the -e option will be ignored")
self.workspace_path = options.folder
self.logfile = options.output
self.data_path = (" --data_path={0}".format(options.data_path) if options.data_path else "")
self.user_resource_path = (" --user_resource_path={0}".format(options.resource_path) if options.resource_path else "")
def start_server(self):
server = testserver.TestServer()
server.start_serving()
def stop_server(self):
try:
urllib2.urlopen('http://localhost:{port}/kill'.format(port=PORT), timeout=5)
except (urllib2.URLError, socket.timeout):
print("Failed to stop the server...")
def categorize_tests(self):
tests_to_run = []
local_skiplist = []
not_found = []
test_files_in_dir = filter(lambda x: x.endswith("_tests"), listdir(self.workspace_path))
on_disk = lambda x: x in test_files_in_dir
not_on_disk = lambda x : not on_disk(x)
if not self.runlist:
local_skiplist = filter(on_disk, self.skiplist)
not_found = filter(not_on_disk, self.skiplist)
tests_to_run = filter(lambda x: x not in local_skiplist, test_files_in_dir)
else:
assert False, "unhandled option"
tests_to_run = filter(on_disk, self.runlist)
not_found = filter(not_on_disk, self.runlist)
def start_server():
server = testserver.TestServer()
server.start_serving()
def stop_server():
try:
urllib2.urlopen('http://localhost:{port}/kill'.format(port=PORT), timeout=5)
except (urllib2.URLError, socket.timeout):
print("Failed to stop the server...")
def categorize_tests():
global skiplist
tests_to_run = []
local_skiplist = []
not_found = []
test_files_in_dir = filter(lambda x: x.endswith("_tests"), listdir(workspace_path))
on_disk = lambda x: x in test_files_in_dir
not_on_disk = lambda x : not on_disk(x)
if len(runlist) == 0:
local_skiplist = filter(on_disk, skiplist)
not_found = filter(not_on_disk, skiplist)
tests_to_run = filter(lambda x: x not in local_skiplist, test_files_in_dir)
else:
tests_to_run = filter(on_disk, runlist)
not_found = filter(not_on_disk, runlist)
return {TO_RUN:tests_to_run, SKIP:local_skiplist, NOT_FOUND:not_found}
return {TO_RUN:tests_to_run, SKIP:local_skiplist, NOT_FOUND:not_found}
def run_tests(tests_to_run):
def run_tests(self, tests_to_run):
failed = []
passed = []
failed = []
passed = []
for test_file in tests_to_run:
self.log_exec_file(test_file)
server = None
for file in tests_to_run:
if file == "platform_tests":
start_server()
if test_file == "platform_tests":
self.start_server()
file="{file}{data}{resources}".format(file=file, data=data_path, resources=user_resource_path)
test_file_with_keys = "{test_file}{data}{resources}".format(test_file=test_file, data=self.data_path, resources=self.user_resource_path)
process = subprocess.Popen("{tests_path}/{file} 2>> {logfile}".
format(tests_path=workspace_path, file=file, logfile=logfile),
print(test_file_with_keys)
process = subprocess.Popen("{tests_path}/{test_file} 2>> {logfile}".
format(tests_path=self.workspace_path, test_file=test_file_with_keys, logfile=self.logfile),
shell=True,
stdout=subprocess.PIPE)
process.wait()
process.wait()
if file == "platform_tests":
stop_server()
if test_file == "platform_tests":
self.stop_server()
if process.returncode > 0:
failed.append(file)
else:
passed.append(file)
if process.returncode > 0:
failed.append(test_file)
else:
passed.append(test_file)
self.log_exec_file(test_file, result=process.returncode)
return {FAILED: failed, PASSED: passed}
return {FAILED: failed, PASSED: passed}
def rm_log_file():
try:
os.remove(logfile)
except OSError:
pass
def log_exec_file(self, filename, result=None):
logstring = ("END" if result else "BEGIN")
resstring = (" | result: {returncode}".format(returncode=result) if result else "")
with open(self.logfile, "a") as logf:
logf.write("\n{logstring}: {filename}{resstring}\n".format(logstring=logstring, filename=filename, resstring=resstring))
def main():
set_global_vars()
rm_log_file()
categorized_tests = categorize_tests()
results = run_tests(categorized_tests[TO_RUN])
print_pretty("failed", results[FAILED])
print_pretty("skipped", categorized_tests[SKIP])
print_pretty("passed", results[PASSED])
print_pretty("not found", categorized_tests[NOT_FOUND])
def rm_log_file(self):
try:
remove(self.logfile)
except OSError:
pass
if (__name__ == "__main__"):
main()
def __init__(self):
self.set_global_vars()
self.rm_log_file()
def execute(self):
categorized_tests = self.categorize_tests()
results = self.run_tests(categorized_tests[TO_RUN])
self.print_pretty("failed", results[FAILED])
self.print_pretty("skipped", categorized_tests[SKIP])
self.print_pretty("passed", results[PASSED])
self.print_pretty("not found", categorized_tests[NOT_FOUND])
runner = TestRunner()
runner.execute()

View file

@ -10,46 +10,77 @@ Created on May 13, 2015
'''
from __future__ import print_function
import getopt
import sys
import xml.etree.ElementTree as ElementTree
from optparse import OptionParser
import re
class PrefixesInLog:
OK = "OK"
FAILED = "FAILED"
BEGIN = "BEGIN: "
END = "END: "
RUNNING = "Running "
TEST_TOOK = "Test took "
RESULT = "result: "
class TestInfo:
EXE = "UNKNOWN_COMPILED_FILE"
NAME = "UNKNOWN_CPP_FILE"
FAILED = "FAILED"
PASSED = "PASSED"
def __init__(self):
self.obj_is_valid = False
self.test_name = None
self.test_name = TestInfo.NAME
self.test_suite = TestInfo.EXE
self.test_comment = None
self.test_result = None
self.test_result = TestInfo.FAILED
self.test_duration = 0.0
def set_name(self, test_name):
self.obj_is_valid = True
self.test_suite, name = test_name.split("::", 1)
self.test_suite = self.test_suite[0: -4]
name = name.replace("::", ".")
self.test_name = name
def is_valid(self):
return self.obj_is_valid
self.test_name = test_name.replace("::", ".")
def set_exe_name(self, exe):
self.test_suite = exe if exe else TestInfo.EXE
def set_duration(self, milis):
self.test_duration = float(milis) / 1000
def set_test_result(self, result_string):
if result_string.startswith(PrefixesInLog.FAILED):
self.test_result = TestInfo.FAILED
self.append_comment(string_after_prefix(result_string, PrefixesInLog.FAILED))
elif result_string.startswith(PrefixesInLog.OK):
self.test_result = TestInfo.PASSED
def append_comment(self, comment):
if self.test_comment is None:
self.test_comment = comment
if not self.test_comment:
if comment.strip(): # if we don't have a comment to test yet, and the line we got is not an empty string
self.test_comment = comment
else:
self.test_comment += "\n" + comment
try:
self.test_comment = u"{old_comment}\n{comment}".format(old_comment=self.test_comment, comment=comment)
except Exception as ex:
print(comment)
print(type(ex))
sys.exit(2)
def is_empty(self):
return self.test_name == TestInfo.NAME and self.test_suite == TestInfo.EXE and self.test_comment
def __repr__(self):
local_comment = ""
if self.test_comment is not None:
local_comment = self.test_comment
local_comment = self.test_comment if self.test_comment else str()
return "{suite}::{name}: {comment} -> {result}\n".format(suite=self.test_suite,
name=self.test_name,
comment=local_comment,
@ -60,112 +91,153 @@ class TestInfo:
d = ElementTree.Element("testcase", {"name":self.test_name,
"classname":self.test_suite,
"time":str(self.test_duration)})
if self.test_comment is not None:
if self.test_comment:
b = ElementTree.SubElement(d, "system-err")
b.text = self.test_comment
if self.test_result == "FAILED":
if self.test_result == TestInfo.FAILED:
ElementTree.SubElement(d, "failure")
return d
def set_duration(self, milis):
self.test_duration = float(milis)
class Parser:
def __init__(self, logfile, xml_file):
self.logfile = logfile if logfile else "testlog.log"
self.xml_file = xml_file if xml_file else "test_results.xml"
self.logfile = logfile
self.xml_file = xml_file
self.current_exe = None
self.test_info = TestInfo()
self.var_should_pass = False
self.root = ElementTree.Element("testsuite")
def parse_log_file(self):
with open(self.logfile) as f:
test_info = None
for line in f.readlines():
if test_info == None:
test_info = TestInfo()
line = line.rstrip().decode('utf-8')
if line.startswith("Running"):
test_info.set_name(line[len("Running "):])
elif line.startswith("Test took"):
test_info.set_duration(line[len("Test took "):-3])
if test_info.is_valid():
self.root.append(test_info.xml())
test_info = None
elif line == "OK" or line.startswith("FAILED"):
test_info.test_result = line
if line.startswith("FAILED"):
test_info.append_comment(line[len("FAILED"):])
else:
test_info.append_comment(line)
def write_xml_file(self):
ElementTree.ElementTree(self.root).write(self.xml_file, encoding="UTF-8")
def parse_log_file(self):
with open(self.logfile) as f:
PipeEach(f.readlines()).through_functions(
self.check_for_exe_boundaries,
self.check_for_testcase_boundaries,
self.check_test_result,
self.should_pass,
self.append_to_comment
)
def usage():
print("""
Possbile options:
-h --help : print this help
-i --input : the path of the original log file to parse
-o --output: the path of the output xml file
def should_pass(self, line):
return self.var_should_pass
Example
./testlog_to_xml_converter.py -i testlog.log -o /Users/t.danshin/Desktop/testxml.xml
""")
def check_for_exe_boundaries(self, line):
if line.startswith(PrefixesInLog.BEGIN):
if self.current_exe: #if we never had an End to a Beginning
self.test_info = TestInfo()
self.append_to_xml()
self.var_should_pass = False
self.current_exe = string_after_prefix(line, PrefixesInLog.BEGIN)
return True
elif line.startswith(PrefixesInLog.END):
self.var_should_pass = False
parts = line.split(" | ")
end_exe = string_after_prefix(parts[0], PrefixesInLog.END)
result = int(string_after_prefix(parts[1], PrefixesInLog.RESULT))
if result != 0:
if not self.test_info:
self.test_info = TestInfo()
self.test_info.set_exe_name(end_exe)
self.test_info.set_test_result(TestInfo.FAILED)
self.append_to_xml()
self.current_exe = None
return True
return False
def check_for_testcase_boundaries(self, line):
if line.startswith(PrefixesInLog.RUNNING):
if not self.test_info:
self.test_info = TestInfo()
self.test_info.set_name(string_after_prefix(line, PrefixesInLog.RUNNING))
self.test_info.set_exe_name(self.current_exe)
return True
elif line.startswith(PrefixesInLog.TEST_TOOK):
self.test_info.set_duration(string_after_prefix(line, PrefixesInLog.TEST_TOOK, end=-3))
self.append_to_xml()
self.test_info = None
return True
return False
def check_test_result(self, line):
if line == PrefixesInLog.OK or line.startswith(PrefixesInLog.FAILED):
self.test_info.set_test_result(line)
return True
return False
def append_to_xml(self):
if self.test_info:
self.test_info.set_exe_name(self.current_exe)
self.root.append(self.test_info.xml())
def append_to_comment(self, line):
if self.test_info:
if line == "All tests passed." or re.match("\d{1,} tests failed", line, re.IGNORECASE):
self.var_should_pass = True
return False
self.test_info.append_comment(line)
return False
class PipeEach:
def __init__(self, iterable_param):
self.iterable_param = iterable_param
def through_functions(self, *fns):
for param in self.iterable_param:
param = param.rstrip().decode('utf-8')
for fn in fns:
if fn(param):
break
def string_after_prefix(line, prefix, end=None):
return line[len(prefix):end] if end else line[len(prefix):]
def read_cl_options():
try:
opts, args = getopt.getopt(sys.argv[1:], "hi:o:",
["help", "input=", "output="])
except getopt.GetoptError as err:
print(str(err))
usage()
sys.exit(2)
ret = {}
for option, argument in opts:
if option in ("-h", "--help"):
usage()
sys.exit()
elif option in ("-i", "--input"):
ret["logfile"] = argument
elif option in ("-o", "--output"):
ret["xml_file"] = argument
else:
assert False, "unhandled option"
return ret
parser = OptionParser()
parser.add_option("-o", "--output", dest="output", default="test_results.xml", help="resulting log file. Default testlog.log")
parser.add_option("-i", "--include", dest="input", default="testlog.log", help="The path to the original log file to parse")
(options, args) = parser.parse_args()
return options
def main():
options = read_cl_options()
try:
parser = Parser(options["logfile"], options["xml_file"])
except:
usage()
exit(2)
parser = Parser(options.input, options.output)
parser.parse_log_file()
parser.write_xml_file()