Now we can run tests in parallel.

However, we don't produce a parseable log yet; we need to add BEGIN: and END: ... | result: x markers around each test's output.
Timofey 2016-01-07 16:42:16 +03:00
parent 1823f88b3e
commit 1a1ee8a645
2 changed files with 60 additions and 76 deletions
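
For context, a log written with those BEGIN/END markers could be read back with a few lines of Python. This is only an illustrative sketch; the helper name parse_test_log and the regular expressions are assumptions, not part of the commit:

import re

BEGIN_RE = re.compile(r"^BEGIN: (?P<name>.+)$")
END_RE = re.compile(r"^END: (?P<name>.+) \| result: (?P<result>-?\d+)$")

def parse_test_log(path):
    # Collect (test_file, result_code, captured_output_lines) tuples
    # from a log written with the BEGIN/END markers described above.
    results, current, buf = [], None, []
    with open(path) as log:
        for line in log:
            line = line.rstrip("\n")
            begin, end = BEGIN_RE.match(line), END_RE.match(line)
            if begin:
                current, buf = begin.group("name"), []
            elif end and current is not None:
                results.append((current, int(end.group("result")), buf))
                current = None
            elif current is not None:
                buf.append(line)
    return results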

@@ -1,137 +1,117 @@
from __future__ import print_function
__author__ = 't.danshin'
from optparse import OptionParser
import subprocess
import multiprocessing
from threading import Lock
from threading import Thread
import traceback
import logging
# The idea is to run integration tests in parallel. To that end, we first collect all
# tests into a queue and feed them to executors as soon as those become free.
from run_desktop_tests import tests_on_disk


class IntegrationRunner:
    def __init__(self):
        self.process_cli()

        self.proc_count = multiprocessing.cpu_count()
        logging.info("Number of processors is: {}".format(self.proc_count))
        self.file_lock = Lock()
        self.queue_lock = Lock()

        # Queue of (exec_file, test_name) pairs, filled in run_tests().
        self.tests = list()

    def run_tests(self):
        for exec_file in self.runlist:
            tests = list(self.get_tests_from_exec_file(exec_file, "--list_tests")[0])[::-1]
            self.tests.extend(map(lambda x: (exec_file, x), tests))

        self.file = open(self.output, "w")
        self.run_parallel_tests()
        self.file.close()

    def run_parallel_tests(self):
        threads = list()

        for i in range(0, self.proc_count):
            thread = Thread(target=self.exec_tests_in_queue)
            thread.start()
            threads.append(thread)

        for thread in threads:
            thread.join()

    def exec_tests_in_queue(self):
        while True:
            try:
                self.queue_lock.acquire()
                if not len(self.tests):
                    return
                # Pop under the lock so two workers never take the same test.
                test_file, test = self.tests.pop()
                self.queue_lock.release()

                self.exec_test(test_file, test)
            except:
                logging.error(traceback.format_exc())
            finally:
                if self.queue_lock.locked():
                    self.queue_lock.release()

    def exec_test(self, test_file, test):
        out, err, result = self.get_tests_from_exec_file(test_file, '--filter={test}'.format(test=test))

        try:
            self.file_lock.acquire()
            self.file.write("BEGIN: {}\n".format(test_file))
            self.file.write(str(err))
            self.file.write("\nEND: {} | result: {}\n\n".format(test_file, result))
            self.file.flush()
        finally:
            self.file_lock.release()

    def get_tests_from_exec_file(self, test, keys):
        spell = "{tests_path}/{test} {keys}".format(tests_path=self.workspace_path, test=test, keys=keys)

        process = subprocess.Popen(spell.split(" "),
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE
                                   )
        out, err = process.communicate()
        result = process.returncode

        # Return the non-empty stdout lines, the raw stderr and the exit code.
        return filter(lambda x: x, out.split("\n")), err, result

    def process_cli(self):
        parser = OptionParser()
        parser.add_option("-o", "--output", dest="output", default="testlog.log", help="resulting log file. Default testlog.log")
        parser.add_option("-f", "--folder", dest="folder", default="omim-build-release/out/release", help="specify the folder where the tests reside (absolute path or relative to the location of this script)")
        parser.add_option("-i", "--include", dest="runlist", action="append", default=[], help="Include test into execution, comma separated list with no spaces or individual tests, or both. E.g.: -i one -i two -i three,four,five")

        (options, args) = parser.parse_args()

        if not options.runlist:
            logging.warn("You must provide the list of tests to run. This runner doesn't run all the tests it finds, only the ones you specify.")
            exit(2)

        self.workspace_path = options.folder
        self.runlist = filter(lambda x: x in tests_on_disk(self.workspace_path), options.runlist)
        self.output = options.output


def main():
    runner = IntegrationRunner()
    runner.run_tests()


if __name__ == "__main__":
    main()
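
For reference, with the options defined in process_cli above, an invocation could look like the line below. This is only a sketch: the script name integration_tests_runner.py is a placeholder (the commit does not show the file name), and the test binary is taken from the pedestrian_routing_tests example elsewhere in this file.

    python integration_tests_runner.py -f omim-build-release/out/release -i pedestrian_routing_tests -o testlog.log  # names are illustrative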


@@ -97,7 +97,6 @@ class TestRunner:
        except (urllib2.URLError, socket.timeout):
            logging.info("Failed to stop the server...")

    def categorize_tests(self):
        tests_to_run = list()
@@ -223,5 +222,10 @@ class TestRunner:
        self.print_pretty("not found", categorized_tests[NOT_FOUND])


def tests_on_disk(path):
    return filter(lambda x: x.endswith("_tests"), listdir(path))


if __name__ == "__main__":
    runner = TestRunner()
    runner.execute()