diff --git a/tools/ResponseProvider.py b/tools/ResponseProvider.py new file mode 100644 index 0000000000..5b8bb25143 --- /dev/null +++ b/tools/ResponseProvider.py @@ -0,0 +1,197 @@ +from __future__ import print_function + + +BIG_FILE_SIZE = 47684 + + +class Payload: + def __init__(self, message, response_code=200, headers=dict()): + self.__response_code = response_code + self.__message = message + self.__headers = headers + + + def response_code(self): + """ + Response code to send to the client. + """ + return self.__response_code + + + def message(self): + """ + The message to send to the client. + """ + return self.__message + + + def length(self): + """ + The length of the response. + """ + return len(self.message()) + + + def headers(self): + """ + The headers to be sent to the client. Please, note, that these do not include + the Content-Length header, which you need to send separately. + """ + return self.__headers + + + def __repr__(self): + return "{}: {}: {}".format(self.response_code(), self.length(), self.message()) + + +class ResponseProviderMixin: + """ +A mixin (basically, an interface) that the web-server that we might use relies on. + +In this implementation, the job of the web-server is just to get the request +(the url and the headers), and to send the response as it knows how. It isn't +its job to decide how to respond to what request. It is the job of the +ResponseProvider. + +In your web-server you should initialize the ResponseProvider, and ask it for + +response_for_url_and_headers(url, headers) + +Which will return a Payload object that the server must send as response. + +The server might be notified when a particular request has been received: + +got_pinged(self) - someone sent a ping request. The Response provider will +respond with "pong" and call this method of the server. You might want to +increment the count of active users, for ping is the request that new instances +of servers send to check if other servers are currently serving. + +kill(self) - someone sent the kill request, which means that that someone +no longer needs this server to serve. You might want to decrement the count of +active users and/or stop the server. +""" + + def dispatch_response(self, payload): + """ + Define this mehtod to dispatch the response received from the ResponseProvider + """ + raise NotImplementedError() + + + def got_pinged(self): + """ + A ping request has been received. In most scenarios it means that the number of + users of this server has increased by 1. + """ + raise NotImplementedError() + + + def kill(self): + """ + Someone no longer needs this server. Decrement the number of users and stop + the server if the number fell to 0. + """ + raise NotImplementedError() + + + + +class ResponseProvider: + + def __init__(self, delegate): + self.headers = list() + self.delegate = delegate + self.byterange = None + self.is_chunked = False + self.response_code = 200 + + + def pong(self): + self.delegate.got_pinged() + return Payload("pong") + + def my_id(self): + return Payload(str(os.getpid())) + + + + def response_for_url_and_headers(self, url, headers): + self.headers = headers + self.chunk_requested() + try: + return { + + "/unit_tests/1.txt" : self.test1, + "/unit_tests/notexisting_unittest": self.test_404, + "/unit_tests/permanent" : self.test_301, + "/unit_tests/47kb.file" : self.test_47_kb, + "/ping" : self.pong, + "/kill" : self.kill, + "/id" :self.my_id, + }[url]() + except: + return self.test_404() + + def chunk_requested(self): + if "range" in self.headers: + self.is_chunked = True + self.response_code = 206 + meaningful_string = self.headers["range"][6:] + first, last = meaningful_string.split("-") + self.byterange = (int(first), int(last)) + + + def trim_message(self, message): + if not self.is_chunked: + return message + return message[self.byterange[0]: self.byterange[1] + 1] + + + def test1(self): + init_message = "Test1" + message = self.trim_message(init_message) + size = len(init_message) + self.check_byterange(size) + headers = self.chunked_response_header(size) + + return Payload(message, self.response_code, headers) + + + def test_404(self): + return Payload("", response_code=404) + + + def test_301(self): + return Payload("", 301, {"Location" : "google.com"}) + + + def check_byterange(self, size): + if self.byterange is None: + self.byterange = (0, size) + + def chunked_response_header(self, size): + return {"Content-Range" : "bytes {start}-{end}/{out_of}".format(start=self.byterange[0], + end=self.byterange[1], + out_of=size)} + + + def test_47_kb(self): + self.check_byterange(BIG_FILE_SIZE) + headers = self.chunked_response_header(BIG_FILE_SIZE) + message = self.trim_message(self.message_for_47kb_file()) + + return Payload(message, self.response_code, headers) + + + def message_for_47kb_file(self): + message = [] + for i in range(0, BIG_FILE_SIZE + 1): + message.append(chr(i / 256)) + message.append(chr(i % 256)) + + return "".join(message) + + + def kill(self): + self.delegate.kill() + return Payload("Bye...") diff --git a/tools/SiblingKiller.py b/tools/SiblingKiller.py new file mode 100644 index 0000000000..3be8068c37 --- /dev/null +++ b/tools/SiblingKiller.py @@ -0,0 +1,163 @@ +from __future__ import print_function + +import re +import os +import urllib2 +import socket +from subprocess import Popen, PIPE + +class SiblingKiller: + + def __init__(self, port, ping_timeout): + self.all_processes = self.ps_dash_w() + self.all_pids = self.all_process_ids() + self.__allow_serving = False + self.__my_pid = self.my_process_id() + self.port = port + self.ping_timeout = ping_timeout + + + def allow_serving(self): + return self.__allow_serving + + + def kill_siblings(self): + """ + The idea is to get the list of all processes by the current user, check which one of them is using the port. + If there is such a process, let's wait for 10 seconds for it to start serving, if it doesn't, kill it. + + If there is NO such process, let's see if there is a process with the same name as us but with a lower process id. + We shall wait for 10 seconds for it to start, if it doesn't, kill it. + + If we are the process with the same name as ours and with the lowest process id, let's start serving and kill everyone else. + """ + self.kill_process_on_port() # changes __allow_serving to True if the process was alive and serving + + sibs = self.siblings() + + if self.__allow_serving: + self.kill(pids=sibs) + return + + for sibling in sibs: + if self.wait_for_server(): + serving_pid = self.serving_process_id() + if serving_pid: + self.kill(pids=map(lambda x: x != serving_pid, sibs)) + self.__allow_serving = False + return + else: + self.kill(pid=sibling) + + + def kill(self, pid=0, pids=list()): + if not pid and not pids: + return + if pid and pids: + raise Exception("Use either one pid or multiple pids") + + hitlist = "" + if pid: + hitlist = str(pid) + if pids: + hitlist = " ".join(map(str, pids)) + + command = "kill -9 {hitlist}".format(hitlist=hitlist) + self.exec_command(command) + + + def siblings(self): + my_name = self.my_process_name() + return filter(lambda x: x < self.__my_pid, + map(lambda x: int(x.split(" ")[1]), + filter(lambda x: my_name in x, self.all_processes))) + + + def kill_process_on_port(self): + process_on_port = self.process_using_port(self.port) + + if not self.wait_for_server(): + self.kill(pid=process_on_port) + self.__allow_serving = True + + + def all_process_ids(self): + pid = lambda x: int(x.split(" ")[1]) + return map(pid, self.all_processes) + + + def process_using_port(self, port): + lsof = lambda x : (self.exec_command("lsof -a -p{process} -i4".format(process=str(x))), x) #ignore the return code + listenning_on_port = lambda (info_line, id) : info_line and info_line.endswith("(LISTEN)") and str(port) in info_line + ids = lambda (info_line, id) : id + + listening_process = map(ids, filter(listenning_on_port, map(lsof, self.all_pids))) + + if len(listening_process) > 1: + pass + # We should panic here + + if not listening_process: + return None + + return listening_process[0] + + + def my_process_id(self): + return os.getpid() + + + def my_process_name(self): + return map(lambda x : x[1], + filter(lambda x: x[0] == str(self.__my_pid), + map(lambda x: x.split(" ", 7)[1:8:6], self.all_processes)))[0] + + + def ps_dash_w(self): + not_header = lambda x: x and not x.startswith("UID") + return filter(not_header, self.gen_to_list(re.sub("\s{1,}", " ", x.strip()) for x in self.exec_command("ps -f").split("\n"))) + + + def wait_for_server(self): + for i in range(0, 2): + if self.ping(): # unsuccessful ping takes 5 seconds (look at PING_TIMEOUT) + return True + return False + + + def ping(self): + html = str() + try: + response = urllib2.urlopen('http://localhost:{port}/ping'.format(port=self.port), timeout=self.ping_timeout); + html = response.read() + except (urllib2.URLError, socket.timeout): + pass + + return html == "pong" + + + def serving_process_id(self): + resp = str() + try: + response = urllib2.urlopen('http://localhost:{port}/id'.format(port=PORT), timeout=self.ping_timeout); + resp = response.read() + id = int(resp) + return id + + except: + print("Couldn't get id of a serving process (the PID of the server that responded to pinging)") + return None + + + def gen_to_list(self, generator): + l = list() + for i in generator: + l.append(i) + return l + + + def exec_command(self, command): + p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE) + output, err = p.communicate() + p.wait() + return output diff --git a/tools/run_desktop_tests.py b/tools/run_desktop_tests.py index 0e5572c4b5..29081ad410 100755 --- a/tools/run_desktop_tests.py +++ b/tools/run_desktop_tests.py @@ -21,6 +21,7 @@ from __future__ import print_function from optparse import OptionParser from os import listdir, remove from os.path import isfile, join +from random import shuffle import socket import subprocess import sys @@ -109,6 +110,8 @@ class TestRunner: tests_to_run = filter(lambda x: x not in local_skiplist, test_files_in_dir) else: tests_to_run = filter(on_disk, self.runlist) + shuffle(tests_to_run) + not_found = filter(not_on_disk, self.runlist) return {TO_RUN:tests_to_run, SKIP:local_skiplist, NOT_FOUND:not_found} @@ -180,4 +183,3 @@ class TestRunner: runner = TestRunner() runner.execute() - diff --git a/tools/testserver.py b/tools/testserver.py index 36e08135df..bba5d3ec6d 100644 --- a/tools/testserver.py +++ b/tools/testserver.py @@ -1,7 +1,6 @@ """ This is a simple web-server that does very few things. It is necessary for -the downloader tests. Currently it works only for a subset of tests, it -doesn't yet work for chunked downloads. +the downloader tests. Here is the logic behind the initialization: Because several instances of the test can run simultaneously on the Build @@ -10,41 +9,44 @@ one is already running. However, there is a chance that a server will not terminate correctly, and will still hold the port, so we will not be able to initialize another server. -So, before initalizing a new server we "ping" it. If it replies with "pong", -we know that a working instance is already running, and we do not start a new -one. +So before initializing the server, we check if any processes are using the port +that we want to use. If we find such a process, we assume that it might be +working, and wait for about 10 seconds for it to start serving. If it does not, +we kill it. -If it doesn't reply with pong, it might mean that it is either not running -at all, or it is running, but dead. In this case, we try to init a server, -and if we catch an exception, we kill all other processes that have the -word "testserver.py" in the name, but whose ids are not our id, and then -we init the server once again. If that fails, there is nothing we can do -about it, so we don't catch any exceptions there. +Next, we check the name of our process and see if there are other processes +with the same name. If there are, we assume that they might start serving any +moment. So we iterate over the ones that have PID lower than ours, and wait +for them to start serving. If a process doesn't serve, we kill it. -Another thing to note is that you cannot stop the server from the same thread -as it was started from, and the only way I found possible to kill it is from a -timer. +If we have killed (or someone has) all the processes with PIDs lower than ours, +we try to start serving. If we succeed, we kill all other processes with the +same name as ours. If we don't someone else will kill us. """ + + from __future__ import print_function from BaseHTTPServer import BaseHTTPRequestHandler from BaseHTTPServer import HTTPServer -import cgi -from numpy.distutils.exec_command import exec_command -import os -import re -import socket -from subprocess import Popen, PIPE -import sys -import thread +from ResponseProvider import Payload +from ResponseProvider import ResponseProvider +from ResponseProvider import ResponseProviderMixin +from SiblingKiller import SiblingKiller from threading import Timer +import BaseHTTPServer +import os +import socket import threading -import time -import types -import urllib2 -from scipy.stats.stats import trim_mean +import traceback + +try: + from tornado_handler import MainHandler + USE_TORNADO = True +except: + USE_TORNADO = False PORT = 34568 @@ -56,15 +58,12 @@ PING_TIMEOUT = 5 # Nubmer of seconds to wait for ping response class InternalServer(HTTPServer): - - def kill_me(self): - print("The server's life has come to an end") self.shutdown() def reset_selfdestruct_timer(self): - if self.self_destruct_timer is not None: + if self.self_destruct_timer: self.self_destruct_timer.cancel() self.self_destruct_timer = Timer(LIFESPAN, self.kill_me) @@ -73,10 +72,12 @@ class InternalServer(HTTPServer): def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): - self.self_destruct_timer = None - self.clients = 1 + HTTPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=bind_and_activate) + + self.self_destruct_timer = None + self.clients = 1 self.reset_selfdestruct_timer() @@ -92,206 +93,114 @@ class InternalServer(HTTPServer): class TestServer: - def __init__(self): + + self.may_serve = False + + pid = os.getpid() + print("Init server. Pid: {}".format(pid)) + self.server = None - html = str() - try: - print("Pinging the server...") - response = urllib2.urlopen('http://localhost:{port}/ping'.format(port=PORT), timeout=PING_TIMEOUT); - html = response.read() - except (urllib2.URLError, socket.timeout): - print("The server does not currently serve...") - - if html != "pong": - print("html != pong") + + killer = SiblingKiller(PORT, PING_TIMEOUT) + killer.kill_siblings() + if killer.allow_serving(): try: self.init_server() - except socket.error: - print("Killing siblings") - self.kill_siblings() - time.sleep(1) - self.init_server() + print("Started server with pid: {}".format(pid)) + self.may_serve = True + except socket.error: + print("Failed to start the server: Port is in use") + except Exception as e: + print(e) + print("Failed to start serving for unknown reason") + traceback.print_exc() + else: + print("Not allowed to start serving for process: {}".format(pid)) def init_server(self): - self.server = InternalServer(('localhost', PORT), PostHandler) + + if USE_TORNADO: + MainHandler.init_server(PORT, LIFESPAN) + else: + print(""" +************* +WARNING: Using the python's built-in BaseHTTPServer! +It is all right if you run the tests on your local machine, but if you are running tests on a server, +please consider installing Tornado. It is a much more powerful web-server. Otherwise you will find +that some of your downloader tests either fail or hang. + +do + +sudo pip install tornado + +or go to http://www.tornadoweb.org/en/stable/ for more detail. +************* +""") + + self.server = InternalServer(('localhost', PORT), PostHandler) + def start_serving(self): - if self.server is not None: + if not self.may_serve: + return + + if USE_TORNADO: + MainHandler.start_serving() + + else: thread = threading.Thread(target=self.server.serve_forever) thread.deamon = True thread.start() - def exec_command(self, command): - print(command) - p = Popen(command.split(" "), shell=True, stdout=PIPE, stderr=PIPE) - output, err = p.communicate() - p.wait() - return output[0] - - - def kill_siblings(self): - output = gen_to_list(re.sub("\s{1,}", " ", x.strip()) for x in exec_command("ps -w")[1].split("\n")) - - my_pid = str(os.getpid()) - - my_name = map(lambda x: x.split(" ")[4], # the name of python script - filter(lambda x: x.startswith(my_pid), output))[0] - - siblings = filter(lambda x: x != my_pid, - map(lambda x: x.split(" ")[0], - filter(lambda x: my_name in x, output))) - - if len(siblings) > 0: - command = "kill {siblings}".format(siblings=" ".join(siblings)) - exec_command(command) - else: - print("The process has no siblings") - - -def gen_to_list(generator): - l = [] - for i in generator: - l.append(i) - return l - - -BIG_FILE_SIZE = 47684 - -class PostHandler(BaseHTTPRequestHandler): +class PostHandler(BaseHTTPRequestHandler, ResponseProviderMixin): + + def dispatch_response(self, payload): + + self.send_response(payload.response_code()) + for h in payload.headers(): + self.send_header(h, payload.headers()[h]) + self.send_header("Content-Length", payload.length()) + self.end_headers() + self.wfile.write(payload.message()) + def init_vars(self): - self.byterange = None - self.is_chunked = False - self.response_code = 200 + self.response_provider = ResponseProvider(self) def do_POST(self): self.init_vars() self.server.reset_selfdestruct_timer() - print("URL is: " + self.path) - ctype, pdict = cgi.parse_header(self.headers.getheader('content-type')) - - if ctype == 'multipart/form-data': - self.send_response(500) - self.end_headers() + length = int(self.headers.getheader('content-length')) + self.dispatch_response(Payload(self.rfile.read(length))) - elif ctype == 'application/json': - length = int(self.headers.getheader('content-length')) - data = self.rfile.read(length) - - self.send_response(200) - self.send_header("Content-Length", length) - self.end_headers() - - self.wfile.write(data) - - def do_GET(self): + headers = self.prepare_headers() self.init_vars() - self.chunk_requested() - - self.server.reset_selfdestruct_timer() - switch = {"/unit_tests/1.txt": self.test1, - "/unit_tests/notexisting_unittest": self.test_404, - "/unit_tests/permanent" : self.test_301, - "/unit_tests/47kb.file" : self.test_47_kb, - "/ping" : self.pong, - "/kill": self.kill, - } - switch[self.path]() + self.dispatch_response(self.response_provider.response_for_url_and_headers(self.path, headers)) - def chunk_requested(self): - the_range = self.headers.get('Range') - if the_range is not None: - self.is_chunked = True - self.response_code = 206 - meaningful_string = the_range[6:] - first, last = meaningful_string.split("-") - self.byterange = (int(first), int(last)) + def prepare_headers(self): + ret = dict() + for h in self.headers: + ret[h] = self.headers.get(h) + return ret + - - def trim_message(self, message): - if not self.is_chunked: - return message - m = message[self.byterange[0]: self.byterange[1] + 1] - return m - - - def pong(self): + def got_pinged(self): self.server.clients += 1 - self.send_response(200) - self.end_headers() - self.wfile.write("pong") - - - def message_for_47kb_file(self): - message = [] - for i in range(0, BIG_FILE_SIZE + 1): - message.append(chr(i / 256)) - message.append(chr(i % 256)) - - return "".join(message) - - - def test1(self): - message = "Test1" - message = self.trim_message(message) - - self.send_response(self.response_code) - self.send_header("Content-Length", len(message)) - self.end_headers() - self.wfile.write(message) - - - def test_404(self): - self.send_response(404) - self.end_headers() - - - def test_301(self): - self.send_response(301) - self.send_header("Location", "google.com") - self.end_headers() - - - def test_47_kb(self): - self.send_response(self.response_code) - length = BIG_FILE_SIZE - if self.byterange is None: - self.byterange = (0, BIG_FILE_SIZE) - else: - length = min([length, self.byterange[1] - self.byterange[0] + 1]) - - self.send_header("Content-Length", length) - if self.byterange is not None: - self.send_header("Content-Range", - "bytes {start}-{end}/{out_of}".format(start=self.byterange[0], - end=self.byterange[1], - out_of=BIG_FILE_SIZE)) - self.end_headers() - - message = self.message_for_47kb_file() - - if message is None: - print("The message is none for some reason") - self.wfile.write(self.message_for_47kb_file()[self.byterange[0] : self.byterange[1] + 1]) def kill(self): - message = "Bye..." - self.send_response(200) - self.send_header("Content-Length", len(message)) - self.end_headers() - self.wfile.write(message) self.server.suicide() if __name__ == '__main__': + server = TestServer() server.start_serving() diff --git a/tools/tornado_handler.py b/tools/tornado_handler.py new file mode 100644 index 0000000000..9ea17ff313 --- /dev/null +++ b/tools/tornado_handler.py @@ -0,0 +1,85 @@ +from __future__ import print_function + +from ResponseProvider import Payload +from ResponseProvider import ResponseProvider +from ResponseProvider import ResponseProviderMixin +from threading import Timer +import threading +import tornado.ioloop +import tornado.web + + +class MainHandler(tornado.web.RequestHandler, ResponseProviderMixin): + + ping_count = 1 + self_destruct_timer = None + + def got_pinged(self): + MainHandler.ping_count += 1 + + + def kill(self): + MainHandler.ping_count -= 1 + if MainHandler.ping_count <= 0: #so that if we decrease the value from several threads we still kill it. + tornado.ioloop.IOLoop.current().stop() + if MainHandler.self_destruct_timer: + MainHandler.self_destruct_timer.cancel() + + + def dispatch_response(self, payload): + self.set_status(payload.response_code()) + for h in payload.headers(): + self.add_header(h, payload.headers()[h]) + self.add_header("Content-Length", payload.length()) + self.write(payload.message()) + + + def prepare_headers(self): + ret = dict() + for h in self.request.headers: + ret[h.lower()] = self.request.headers.get(h) + + return ret + + + def init_vars(self): + self.response_provider = ResponseProvider(self) + self.headers = self.prepare_headers() + + + def prepare(self): + MainHandler.reset_self_destruct_timer() + self.init_vars() + + + def get(self, param): + self.dispatch_response(self.response_provider.response_for_url_and_headers(self.request.uri, self.headers)) + + + def post(self, param): + self.dispatch_response(Payload(self.request.body)) + + + @staticmethod + def reset_self_destruct_timer(): + if MainHandler.self_destruct_timer: + MainHandler.self_destruct_timer.cancel() + MainHandler.self_destruct_timer = Timer(MainHandler.lifespan, tornado.ioloop.IOLoop.current().stop) + MainHandler.self_destruct_timer.start() + + + @staticmethod + def start_serving(): + thread = threading.Thread(target=tornado.ioloop.IOLoop.current().start) + thread.deamon = True + thread.start() + + + @staticmethod + def init_server(port, lifespan): + MainHandler.lifespan = lifespan + MainHandler.reset_self_destruct_timer() + application = tornado.web.Application([ + (r"/(.*)", MainHandler), + ]) + application.listen(port)