From 1823f88b3e3002e013fc9d9054301c74a89344de Mon Sep 17 00:00:00 2001 From: Timofey Date: Tue, 5 Jan 2016 18:25:51 +0300 Subject: [PATCH 1/3] Doing the test parallelization in python. Now we need to be able to lock a file to make sure the log is in a consistent state. --- tools/integration_tests_runner.py | 137 ++++++++++++++++++++++++++++++ 1 file changed, 137 insertions(+) create mode 100644 tools/integration_tests_runner.py diff --git a/tools/integration_tests_runner.py b/tools/integration_tests_runner.py new file mode 100644 index 0000000000..d456b57a38 --- /dev/null +++ b/tools/integration_tests_runner.py @@ -0,0 +1,137 @@ +from __future__ import print_function + + +__author__ = 't.danshin' + +import subprocess +import multiprocessing +from threading import Lock +from threading import Thread +import traceback + +#The idea is that we should run integration tests in parallel. To that end, we need to first get all tests, create a queue and feed tests to it as soon as executors in that queue become free. + + + +class IntegrationRunner: + def __init__(self, exec_file): + self.exec_file = exec_file + print("Exec file " + exec_file) + self.workspace_path = "/Users/t.danshin/Documents/projects/omim-build-release/out/release" + self.proc_count = multiprocessing.cpu_count() + + self.file_lock = Lock() + self.queue_lock = Lock() + + self.tests = list(self.get_tests_from_exec_file(exec_file, "--list_tests")[0]) + self.tests.reverse() + print("Self.tests are: ") + print(self.tests) + + # self.buckets = self.split_tests_into_buckets() + + # print("Bucket 1 = ") + # print(self.buckets[1]) + + self.file = open("python_log.log", "w") + + + # self.exec_tests_in_bucket(self.buckets[1]) + + threads = list() + + for i in range(0, self.proc_count): + thread = Thread(target=self.exec_tests_in_bucket) + thread.start() + threads.append(thread) + + # for bucket in self.buckets: + # thread = Thread(target=self.exec_tests_in_bucket, args=(bucket,)) + # thread.start() + # threads.append(thread) + + for thread in threads: + thread.join() + + + self.file.close() + + # def split_tests_into_buckets(self): + # test_buckets = list() + # for i in range(0, self.proc_count): + # test_buckets.append(list()) + # i = 0 + # for test in self.tests: + # test_bucket = i % len(test_buckets) + # print(">> " + str(test_bucket)) + # test_buckets[test_bucket].append(test) + # i += 1 + # + # + # return test_buckets + + def exec_tests_in_bucket(self): + while True: + try: + self.queue_lock.acquire() + if not len(self.tests): + print("Len of tests is: " + str(len(self.tests))) + print("Returning because nothing is left in the queue") + return + + test = self.tests.pop() + self.queue_lock.release() + print("Added test: " + test) + + out, err = self.get_tests_from_exec_file(self.exec_file, '--filter={test}'.format(test=test)) + + print("Finished " + test) + print("Err: >> " + str(err)) + + print("Out: >> " + str(out)) + + try: + self.file_lock.acquire() + self.file.write(str(err)) + finally: + self.file_lock.release() + + except: + traceback.print_exc() + + finally: + if self.queue_lock.locked(): + self.queue_lock.release() + # return + + + + + + + def get_tests_from_exec_file(self, test, keys): + spell = "{tests_path}/{test} {keys}".format(tests_path=self.workspace_path, test=test, keys=keys) + + print("Spell = " + spell) + process = subprocess.Popen(spell.split(" "), + # shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + + out, err = process.communicate() +# process.wait() + + print("out = " + str(out)) + print("err = " + str(err)) + + return (filter(lambda x: x, out.split("\n")), err) + + +def main(): + exec_file = "pedestrian_routing_tests" + runner = IntegrationRunner(exec_file) + + +if __name__ == "__main__": + main() From 1a1ee8a64502a16ec77fdb18ef8fed467e9bab76 Mon Sep 17 00:00:00 2001 From: Timofey Date: Thu, 7 Jan 2016 16:42:16 +0300 Subject: [PATCH 2/3] Now we can run tests in parallel. However, we don't produce a parseable log so far. Need to add BEGIN: and END: | result: x. --- tools/integration_tests_runner.py | 126 +++++++++++++----------------- tools/run_desktop_tests.py | 10 ++- 2 files changed, 60 insertions(+), 76 deletions(-) diff --git a/tools/integration_tests_runner.py b/tools/integration_tests_runner.py index d456b57a38..780325efcc 100644 --- a/tools/integration_tests_runner.py +++ b/tools/integration_tests_runner.py @@ -1,137 +1,117 @@ from __future__ import print_function - - -__author__ = 't.danshin' - +from optparse import OptionParser import subprocess import multiprocessing from threading import Lock from threading import Thread import traceback +import logging -#The idea is that we should run integration tests in parallel. To that end, we need to first get all tests, create a queue and feed tests to it as soon as executors in that queue become free. +from run_desktop_tests import tests_on_disk +__author__ = 't.danshin' class IntegrationRunner: - def __init__(self, exec_file): - self.exec_file = exec_file - print("Exec file " + exec_file) - self.workspace_path = "/Users/t.danshin/Documents/projects/omim-build-release/out/release" + def __init__(self): + self.process_cli() + self.proc_count = multiprocessing.cpu_count() + logging.info("Number of processors is: {}".format(self.proc_count)) self.file_lock = Lock() self.queue_lock = Lock() - self.tests = list(self.get_tests_from_exec_file(exec_file, "--list_tests")[0]) - self.tests.reverse() - print("Self.tests are: ") - print(self.tests) + self.tests = list() - # self.buckets = self.split_tests_into_buckets() + def run_tests(self): + for exec_file in self.runlist: + tests = list(self.get_tests_from_exec_file(exec_file, "--list_tests")[0])[::-1] + self.tests.extend(map(lambda x: (exec_file, x), tests)) - # print("Bucket 1 = ") - # print(self.buckets[1]) - - self.file = open("python_log.log", "w") + self.file = open(self.output, "w") + self.run_parallel_tests() + self.file.close() - # self.exec_tests_in_bucket(self.buckets[1]) - + def run_parallel_tests(self): threads = list() for i in range(0, self.proc_count): - thread = Thread(target=self.exec_tests_in_bucket) + thread = Thread(target=self.exec_tests_in_queue) thread.start() threads.append(thread) - # for bucket in self.buckets: - # thread = Thread(target=self.exec_tests_in_bucket, args=(bucket,)) - # thread.start() - # threads.append(thread) - for thread in threads: thread.join() - self.file.close() - - # def split_tests_into_buckets(self): - # test_buckets = list() - # for i in range(0, self.proc_count): - # test_buckets.append(list()) - # i = 0 - # for test in self.tests: - # test_bucket = i % len(test_buckets) - # print(">> " + str(test_bucket)) - # test_buckets[test_bucket].append(test) - # i += 1 - # - # - # return test_buckets - - def exec_tests_in_bucket(self): + def exec_tests_in_queue(self): while True: try: self.queue_lock.acquire() if not len(self.tests): - print("Len of tests is: " + str(len(self.tests))) - print("Returning because nothing is left in the queue") return - test = self.tests.pop() + test_file, test = self.tests.pop() + self.queue_lock.release() - print("Added test: " + test) - - out, err = self.get_tests_from_exec_file(self.exec_file, '--filter={test}'.format(test=test)) - - print("Finished " + test) - print("Err: >> " + str(err)) - - print("Out: >> " + str(out)) - - try: - self.file_lock.acquire() - self.file.write(str(err)) - finally: - self.file_lock.release() - + self.exec_test(test_file, test) except: - traceback.print_exc() + logging.error(traceback.format_exc()) finally: if self.queue_lock.locked(): self.queue_lock.release() - # return - + def exec_test(self, test_file, test): + out, err, result = self.get_tests_from_exec_file(test_file, '--filter={test}'.format(test=test)) + try: + self.file_lock.acquire() + self.file.write("BEGIN: {}\n".format(test_file)) + self.file.write(str(err)) + self.file.write("\nEND: {} | result: {}\n\n".format(test_file, result)) + self.file.flush() + finally: + self.file_lock.release() def get_tests_from_exec_file(self, test, keys): spell = "{tests_path}/{test} {keys}".format(tests_path=self.workspace_path, test=test, keys=keys) - print("Spell = " + spell) process = subprocess.Popen(spell.split(" "), - # shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) out, err = process.communicate() -# process.wait() + result = process.returncode - print("out = " + str(out)) - print("err = " + str(err)) + return filter(lambda x: x, out.split("\n")), err, result - return (filter(lambda x: x, out.split("\n")), err) + + def process_cli(self): + parser = OptionParser() + parser.add_option("-o", "--output", dest="output", default="testlog.log", help="resulting log file. Default testlog.log") + parser.add_option("-f", "--folder", dest="folder", default="omim-build-release/out/release", help="specify the folder where the tests reside (absolute path or relative to the location of this script)") + parser.add_option("-i", "--include", dest="runlist", action="append", default=[], help="Include test into execution, comma separated list with no spaces or individual tests, or both. E.g.: -i one -i two -i three,four,five") + + (options, args) = parser.parse_args() + + if not options.runlist: + logging.warn("You must provide the list of tests to run. This runner doesn't run all the tests it finds, only the ones you specify.") + exit(2) + + self.workspace_path = options.folder + self.runlist = filter(lambda x: x in tests_on_disk(self.workspace_path), options.runlist) + self.output = options.output def main(): - exec_file = "pedestrian_routing_tests" - runner = IntegrationRunner(exec_file) - + runner = IntegrationRunner() + runner.run_tests() if __name__ == "__main__": main() diff --git a/tools/run_desktop_tests.py b/tools/run_desktop_tests.py index 55c4d6d304..cd5ed08cc2 100755 --- a/tools/run_desktop_tests.py +++ b/tools/run_desktop_tests.py @@ -97,7 +97,6 @@ class TestRunner: except (urllib2.URLError, socket.timeout): logging.info("Failed to stop the server...") - def categorize_tests(self): tests_to_run = list() @@ -223,5 +222,10 @@ class TestRunner: self.print_pretty("not found", categorized_tests[NOT_FOUND]) -runner = TestRunner() -runner.execute() +def tests_on_disk(path): + return filter(lambda x: x.endswith("_tests"), listdir(path)) + + +if __name__ == "__main__": + runner = TestRunner() + runner.execute() From 949fcc1e39e2ad76a56ce01d29164c759e0790e9 Mon Sep 17 00:00:00 2001 From: Timofey Date: Thu, 7 Jan 2016 18:53:51 +0300 Subject: [PATCH 3/3] PR fixes --- tools/integration_tests_runner.py | 37 ++++++++++++------------------- 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/tools/integration_tests_runner.py b/tools/integration_tests_runner.py index 780325efcc..2bcf234cf5 100644 --- a/tools/integration_tests_runner.py +++ b/tools/integration_tests_runner.py @@ -6,6 +6,11 @@ from threading import Lock from threading import Thread import traceback import logging +from os import path + + +from Queue import Queue + from run_desktop_tests import tests_on_disk @@ -20,14 +25,14 @@ class IntegrationRunner: logging.info("Number of processors is: {}".format(self.proc_count)) self.file_lock = Lock() - self.queue_lock = Lock() - self.tests = list() + self.tests = Queue() def run_tests(self): for exec_file in self.runlist: - tests = list(self.get_tests_from_exec_file(exec_file, "--list_tests")[0])[::-1] - self.tests.extend(map(lambda x: (exec_file, x), tests)) + tests = self.get_tests_from_exec_file(exec_file, "--list_tests")[0] + for test in tests: + self.tests.put((exec_file, test)) self.file = open(self.output, "w") self.run_parallel_tests() @@ -49,37 +54,26 @@ class IntegrationRunner: def exec_tests_in_queue(self): while True: try: - self.queue_lock.acquire() - if not len(self.tests): + if self.tests.empty(): return - test_file, test = self.tests.pop() - - self.queue_lock.release() + test_file, test = self.tests.get() self.exec_test(test_file, test) except: logging.error(traceback.format_exc()) - finally: - if self.queue_lock.locked(): - self.queue_lock.release() - def exec_test(self, test_file, test): out, err, result = self.get_tests_from_exec_file(test_file, '--filter={test}'.format(test=test)) - - try: - self.file_lock.acquire() + with self.file_lock: self.file.write("BEGIN: {}\n".format(test_file)) self.file.write(str(err)) self.file.write("\nEND: {} | result: {}\n\n".format(test_file, result)) self.file.flush() - finally: - self.file_lock.release() def get_tests_from_exec_file(self, test, keys): - spell = "{tests_path}/{test} {keys}".format(tests_path=self.workspace_path, test=test, keys=keys) + spell = "{test} {keys}".format(test=path.join(self.workspace_path, test), keys=keys) process = subprocess.Popen(spell.split(" "), stdout=subprocess.PIPE, @@ -109,9 +103,6 @@ class IntegrationRunner: self.output = options.output -def main(): +if __name__ == "__main__": runner = IntegrationRunner() runner.run_tests() - -if __name__ == "__main__": - main()