From 4ee91f375465750c063897c6b4879a8e6e36ac0d Mon Sep 17 00:00:00 2001 From: Maksim Andrianov Date: Fri, 5 Apr 2019 19:26:27 +0300 Subject: [PATCH] [generator] Supported new booking api. --- tools/python/booking_hotels.py | 499 ++++++++++++++++++++------------- tools/unix/generate_planet.sh | 2 +- 2 files changed, 301 insertions(+), 200 deletions(-) diff --git a/tools/python/booking_hotels.py b/tools/python/booking_hotels.py index cac15b2443..023d26a2a6 100755 --- a/tools/python/booking_hotels.py +++ b/tools/python/booking_hotels.py @@ -1,247 +1,348 @@ #!/usr/bin/env python # coding: utf8 -from __future__ import print_function - -from collections import defaultdict -from datetime import datetime import argparse -import base64 -import eviltransform -import json +import datetime import logging import os -import pickle -import time -import urllib2 +import statistics +import sys +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor, as_completed +from functools import partial +from multiprocessing.pool import ThreadPool +from random import randint +from threading import Event +from time import sleep -# Initialize logging. -logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s] %(levelname)s: %(message)s') +import eviltransform +import math +import requests +from ratelimit import limits, sleep_and_retry +from tqdm import tqdm -# Names starting with '.' are calculated in get_hotel_field() below. -HOTEL_FIELDS = ('hotel_id', '.lat', '.lon', 'name', 'address', 'class', '.rate', 'ranking', 'review_score', 'url', 'hoteltype_id', '.trans') +LIMIT_REQUESTS_PER_MINUTE = 400 +ATTEMPTS_COUNT = 10 +MAX_LIMIT_WAIT_AFTER_429_ERROR_SECONDS = 120 +SUPPORTED_LANGUAGES = ("en", "ru", "ar", "cs", "da", "nl", "fi", "fr", "de", + "hu", "id", "it", "ja", "ko", "pl", "pt", "ro", "es", + "sv", "th", "tr", "uk", "vi", "zh", "he", "sk", "el") + + +class AppError(Exception): + pass + + +class HTTPError(AppError): + pass + + +class AttemptsSpentError(AppError): + pass class BookingApi: - def __init__(self, login, password): - self.login = login - self.password = password - self.baseConfig = { - "headers": { - "Content-Type": "application/json", - "Authorization": "Basic " + base64.encodestring( - "{login}:{password}".format(login=self.login, password=self.password)).replace('\n', '') - }, - "url": 'https://distribution-xml.booking.com/json/bookings'} - self.checkMinute = 0 - self.requestPerMinute = 0 - self.requestLimit = 15 # request per minute + _ENDPOINTS = ( + "countries", + "hotels" + ) - def call(self, function, params=None): - self.requestPerMinute += 1 - now = datetime.utcnow() + def __init__(self, login, password, version): + major_minor = version.split(".") + assert len(major_minor) == 2 + assert int(major_minor[0]) >= 2 + assert 0 <= int(major_minor[1]) <= 4 - if self.requestPerMinute >= self.requestLimit: - waittime = 60 - now.second - logging.info("Limit for request per minute exceeded. Waiting for: {0} sec.".format(waittime)) - time.sleep(waittime) - now = datetime.utcnow() + self._event = Event() + self._event.set() + self._login = login + self._password = password + self._base_url = f"https://distribution-xml.booking.com/{version}/json" + self._set_endpoints() - if self.checkMinute != now.minute: - self.requestPerMinute = 0 - self.checkMinute = now.minute - - payload = '' + @sleep_and_retry + @limits(calls=LIMIT_REQUESTS_PER_MINUTE, period=60) + def call_endpoint(self, endpoint, **params): + self._event.wait() try: - p = "" if not params else '?' + "&".join( - ["{key}={value}".format(key=k, value=v) for (k, v) in params.iteritems()]) - url = "{base}.{func}{params}".format(base=self.baseConfig["url"], func=function, params=p) - logging.debug("{0} {1} API call:{2}".format(self.checkMinute, self.requestPerMinute, url)) - request = urllib2.Request(url, None, self.baseConfig["headers"]) - stream = urllib2.urlopen(request) - payload = stream.read() - data = json.loads(payload) - if isinstance(data, dict) and 'message' in data and 'code' in data: - logging.error('Api call failed with error: {0} Code: {1}'.format(data['message'], data['code'])) - return None - return data - + attempts = ATTEMPTS_COUNT + while attempts: + attempts -= 1 + response = requests.post(f"{self._base_url}/{endpoint}", + auth=(self._login, self._password), + params=params) + if response.status_code == 200: + data = response.json() + return data["result"] + else: + self._handle_errors(response) + raise AttemptsSpentError(f"{ATTEMPTS_COUNT} attempts were spent.") except Exception as e: - logging.error('Error: {0} Context: {1}'.format(e, payload)) - return None + if not self._event.is_set(): + self._event.set() + raise e + + def _handle_errors(self, response, ): + error_message = "" + try: + data = response.json() + error_message = ",".join(x["message"] for x in data["errors"]) + except: + pass + if response.status_code == 429: + self._event.clear() + wait_seconds = randint(0, MAX_LIMIT_WAIT_AFTER_429_ERROR_SECONDS) + sleep(wait_seconds) + logging.warning(f"Http error {response.status_code}: {error_message}. " + f"It waits {wait_seconds} seconds and tries again.") + self._event.set() + else: + raise HTTPError( + f"Http error with code {response.status_code}: {error_message}.") + + def _set_endpoints(self): + for endpoint in BookingApi._ENDPOINTS: + setattr(self, endpoint, partial(self.call_endpoint, endpoint)) -def download(user, password, path): - ''' - Downloads all hotels from booking.com and stores them in a bunch of .pkl files. - ''' - api = BookingApi(user, password) +class BookingListApi: + _ROWS_BY_REQUEST = 1000 + _ENDPOINTS = ( + "countries", + "hotels" + ) - maxrows = 1000 - countries = api.call("getCountries", dict(languagecodes='en')) - for country in countries: - countrycode = country['countrycode'] - logging.info(u'Download[{0}]: {1}'.format(countrycode, country['name'])) + def __init__(self, api): + self.api = api + self._set_endpoints() - allhotels = {} - while True: - hotels = api.call('getHotels', - dict(new_hotel_type=1, offset=len(allhotels), rows=maxrows, countrycodes=countrycode)) + def call_endpoint(self, endpoint, **params): + return self._call_simple_endpoint(endpoint, **params) - # Check for error. - if hotels is None: - logging.critical('No hotels downloaded for country {0}'.format(country['name'])) - break - - for h in hotels: - allhotels[h['hotel_id']] = h - - # If hotels in answer less then maxrows, we reach end of data. - if len(hotels) < maxrows: - break - - if not hotels: - continue - - # Now the same for hotel translations + def _call_simple_endpoint(self, endpoint, **params): + result = [] offset = 0 while True: - hotels = api.call('getHotelTranslations', dict(offset=offset, rows=maxrows, countrycodes=countrycode)) - if hotels is None: - exit(1) - - # Add translations for each hotel - for h in hotels: - if h['hotel_id'] in allhotels: - if 'translations' not in allhotels[h['hotel_id']]: - allhotels[h['hotel_id']]['translations'] = {} - allhotels[h['hotel_id']]['translations'][h['languagecode']] = {'name': h['name'], 'address': h['address']} - - offset += len(hotels) - if len(hotels) < maxrows: + resp = self._call_endpoint_offset(offset, endpoint, **params) + result.extend(resp) + if len(resp) < BookingListApi._ROWS_BY_REQUEST: break + offset += BookingListApi._ROWS_BY_REQUEST + return result - logging.info('Num of hotels: {0}, translations: {1}'.format(len(allhotels), offset)) - filename = os.path.join(path, - '{0} - {1}.pkl'.format(country['area'].encode('utf8'), country['name'].encode('utf8'))) - with open(filename, 'wb') as fd: - pickle.dump(allhotels.values(), fd, pickle.HIGHEST_PROTOCOL) + def _call_endpoint_offset(self, offset, endpoint, **params): + r = self.api.call_endpoint(endpoint, **{ + "offset": offset, + "rows": BookingListApi._ROWS_BY_REQUEST, + **params + }) + if not isinstance(r, list): + raise TypeError(f"Result has unexpected type {type(r)}") + return r + + def _set_endpoints(self): + for endpoint in BookingListApi._ENDPOINTS: + setattr(self, endpoint, partial(self.call_endpoint, endpoint)) -def translate(source, output): - ''' - Reads *.pkl files and produces a single list of hotels as tab separated values. - ''' - files = [os.path.join(source, filename) - for filename in os.listdir(source) if filename.endswith('.pkl')] +class BookingGen: + def __init__(self, api, country): + self.api = api + self.country_code = country["country"] + self.country_name = country["name"] + logging.info(f"Download[{self.country_code}]: {self.country_name}") - data = [] - for filename in sorted(files): - logging.info('Processing {0}'.format(filename)) - with open(filename, 'rb') as fd: - data += pickle.load(fd) + extras = ["hotel_info", "hotel_description", "room_info"] + self.hotels = self._download_hotels(extras) + self.translations = self._download_translations() + self.currency_medians = self._currency_medians_by_cities() - # Fix chinese coordinates - for hotel in data: - if hotel['countrycode'] == 'cn' and 'location' in hotel: - try: - hotel['location']['latitude'], hotel['location']['longitude'] = eviltransform.gcj2wgs_exact( - float(hotel['location']['latitude']), float(hotel['location']['longitude'])) - except ValueError: - # We don't care if there were errors converting coordinates to float - pass + def generate_csv_rows(self, sep="\t"): + self._fix_hotels() + return (self._create_csv_hotel_line(hotel, sep) for hotel in self.hotels) - # Dict of dicts city_id -> { currency -> [prices] } - cities = defaultdict(lambda: defaultdict(list)) + @staticmethod + def _get_hotel_min_price(hotel): + prices = (float(x["room_info"]["min_price"]) for x in hotel["room_data"]) + flt = filter(lambda x: math.isclose(x, 0.0), + prices) + try: + return min(flt) + except ValueError: + return None - def valid(hotel): - return 'city_id' in hotel and 'currencycode' in hotel and 'minrate' in hotel and hotel['minrate'] is not None + @staticmethod + def _format_string(s): + s = s.strip() + for x in (("\t", " "), ("\n", " "), ("\r", "")): + s = s.replace(*x) + return s - # Collect prices - for hotel in data: - if valid(hotel): - cities[hotel['city_id']][hotel['currencycode']].append(float(hotel['minrate'])) + def _download_hotels(self, extras, lang="default"): + return self.api.hotels(country_ids=self.country_code, language=lang, + extras=extras) - # Replaces list of prices by a median price. - for city in cities: - for cur in cities[city]: - cities[city][cur] = sorted(cities[city][cur])[len(cities[city][cur]) / 2] + def _download_translations(self): + extras = ["hotel_info", ] + translations = defaultdict(dict) + with ThreadPoolExecutor(max_workers=len(SUPPORTED_LANGUAGES)) as executor: + m = {executor.submit(self._download_hotels, extras, lang): lang + for lang in SUPPORTED_LANGUAGES} + for future in as_completed(m): + lang = m[future] + hotels = future.result() + for hotel in hotels: + hotel_id = hotel["hotel_id"] + hotel_data = hotel["hotel_data"] + translations[hotel_id][lang] = { + "name": BookingGen._format_string(hotel_data["name"]), + "address": BookingGen._format_string(hotel_data["address"]) + } + return translations - # Price rate ranges, relative to the median price for a city - rates = (0.7, 1.3) + def _fix_hotels(self): + if self.country_code == "ch": + for hotel in self.hotels: + hotel_data = hotel["hotel_data"] + location = hotel_data["location"] + try: + location["latitude"], location["longitude"] = eviltransform.gcj2wgs_exact( + float(location["latitude"]), float(location["longitude"]) + ) + except ValueError: + logging.exception(f"Converting error {location}") - def get_hotel_field(hotel, field, rate): - if field == '.lat': - return hotel['location']['latitude'] - elif field == '.lon': - return hotel['location']['longitude'] - elif field == '.rate': - return rate - elif field == '.trans': - # Translations are packed into a single column: lang1|name1|address1|lang2|name2|address2|... - if 'translations' in hotel: - tr_list = [] - for tr_lang, tr_values in hotel['translations'].items(): - tr_list.append(tr_lang) - tr_list.extend([tr_values[e] for e in ('name', 'address')]) - return '|'.join([s.replace('|', ';') for s in tr_list]) - else: - return '' - elif field in hotel: - return hotel[field] - elif field == 'ranking': - # This field is not used yet, and booking.com sometimes blocks it. - return '' - logging.error('Unknown hotel field: {0}, URL: {1}'.format(field, hotel['url'])) - return '' + def _currency_medians_by_cities(self): + cities = defaultdict(lambda: defaultdict(list)) + for hotel in self.hotels: + hotel_data = hotel["hotel_data"] + city_id = hotel_data["city_id"] + currency = hotel_data["currency"] + price = BookingGen._get_hotel_min_price(hotel) + if price is not None: + cities[city_id][currency].append(price) - with open(output, 'w') as fd: - for hotel in data: - rate = 0 - if valid(hotel): - avg = cities[hotel['city_id']][hotel['currencycode']] - price = float(hotel['minrate']) - rate = 1 - # Find a range that contains the price - while rate <= len(rates) and price > avg * rates[rate - 1]: - rate += 1 - l = [get_hotel_field(hotel, e, rate) for e in HOTEL_FIELDS] - print('\t'.join([unicode(f).encode('utf8').replace('\t', ' ').replace('\n', ' ').replace('\r', '') for f in l]), file=fd) + for city in cities: + for currency in cities[city]: + cities[city][currency] = statistics.median(cities[city][currency]) + return cities + + def _get_rate(self, hotel): + # Price rate ranges, relative to the median price for a city + rates = (0.7, 1.3) + rate = 0 + hotel_data = hotel["hotel_data"] + city_id = hotel_data["city_id"] + currency = hotel_data["currency"] + price = BookingGen._get_hotel_min_price(hotel) + if price is not None: + avg = self.currency_medians[city_id][currency] + rate = 1 + # Find a range that contains the price + while rate <= len(rates) and price > avg * rates[rate - 1]: + rate += 1 + return rate + + def _get_translations(self, hotel): + try: + tr = self.translations[hotel["hotel_id"]] + except KeyError: + return "" + + hotel_data = hotel["hotel_data"] + name = hotel_data["name"] + address = hotel_data["address"] + tr_ = defaultdict(dict) + for k, v in tr.items(): + n = v["name"] if v["name"] != name else "" + a = v["address"] if v["address"] != address else "" + if a or n: + tr_[k]["name"] = n + tr_[k]["address"] = a + + tr_list = [] + for tr_lang, tr_values in tr_.items(): + tr_list.append(tr_lang) + tr_list.extend([tr_values[e] for e in ("name", "address")]) + return "|".join(s.replace("|", ";") for s in tr_list) + + def _create_csv_hotel_line(self, hotel, sep="\t"): + hotel_data = hotel["hotel_data"] + location = hotel_data["location"] + row = ( + hotel["hotel_id"], + f"{location['latitude']:.6f}", + f"{location['longitude']:.6f}", + hotel_data["name"], + hotel_data["address"], + hotel_data["class"], + self._get_rate(hotel), + hotel_data["ranking"], + hotel_data["review_score"], + hotel_data["url"], + hotel_data["hotel_type_id"], + self._get_translations(hotel) + ) + return sep.join(BookingGen._format_string(str(x)) for x in row) + + +def download_hotels_by_country(api, country): + generator = BookingGen(api, country) + rows = list(generator.generate_csv_rows()) + logging.info(f"For {country['name']} {len(rows)} lines were generated.") + return rows + + +def download(user, password, path, threads_count, bar): + api = BookingApi(user, password, "2.4") + list_api = BookingListApi(api) + countries = list_api.countries(languages="en") + logging.info(f"There is {len(countries)} countries.") + bar.total = len(countries) + with open(path, "w") as f: + with ThreadPool(threads_count) as pool: + for lines in pool.imap_unordered(partial(download_hotels_by_country, list_api), + countries): + f.writelines([f"{x}\n" for x in lines]) + bar.update() + logging.info(f"Hotels were saved to {path}.") def process_options(): - parser = argparse.ArgumentParser(description='Download and process booking hotels.') - parser.add_argument("-v", "--verbose", action="store_true", dest="verbose") + parser = argparse.ArgumentParser(description="Download and process booking hotels.") + parser.add_argument("-v", "--verbose", action="store_true", + dest="verbose") parser.add_argument("-q", "--quiet", action="store_false", dest="verbose") - - parser.add_argument("--password", dest="password", help="Booking.com account password") - parser.add_argument("--user", dest="user", help="Booking.com account user name") - - parser.add_argument("--path", dest="path", help="Path to data files") - parser.add_argument("--output", dest="output", help="Name and destination for output file") - - parser.add_argument("--download", action="store_true", dest="download", default=False) - parser.add_argument("--translate", action="store_true", dest="translate", default=False) - + parser.add_argument("--logfile", default="", + help="Name and destination for log file") + parser.add_argument("--password", required=True, dest="password", + help="Booking.com account password") + parser.add_argument("--user", required=True, dest="user", + help="Booking.com account user name") + parser.add_argument("--threads_count", default=1, type=int, + help="The number of threads for processing countries.") + parser.add_argument("--output", required=True, dest="output", + help="Name and destination for output file") options = parser.parse_args() - - if not options.download and not options.translate: - parser.print_help() - - # TODO(mgsergio): implpement it with argparse facilities. - if options.translate and not options.output: - print("--output isn't set") - parser.print_help() - exit() - return options def main(): options = process_options() - if options.download: - download(options.user, options.password, options.path) - if options.translate: - translate(options.path, options.output) + logfile = "" + if options.logfile: + logfile = options.logfile + else: + now = datetime.datetime.now() + name = f"{now.strftime('%d_%m_%Y-%H_%M_%S')}_booking_hotels.log" + logfile = os.path.join(os.path.dirname(os.path.realpath(__file__)), name) + print(f"Logs saved to {logfile}.", file=sys.stdout) + if options.threads_count > 1: + print(f"Limit requests per minute is {LIMIT_REQUESTS_PER_MINUTE}.", file=sys.stdout) + logging.basicConfig(level=logging.DEBUG, filename=logfile, + format="%(thread)d [%(asctime)s] %(levelname)s: %(message)s") + with tqdm(disable=not options.verbose) as bar: + download(options.user, options.password, options.output, + options.threads_count, bar) if __name__ == "__main__": diff --git a/tools/unix/generate_planet.sh b/tools/unix/generate_planet.sh index 8183021ce5..d95aeb8167 100755 --- a/tools/unix/generate_planet.sh +++ b/tools/unix/generate_planet.sh @@ -319,7 +319,7 @@ fi if [ ! -f "$BOOKING_FILE" -a -n "${BOOKING_USER-}" -a -n "${BOOKING_PASS-}" ]; then log "STATUS" "Step S1: Starting background hotels downloading" ( - $PYTHON $BOOKING_SCRIPT --user $BOOKING_USER --password $BOOKING_PASS --path "$INTDIR" --download --translate --output "$BOOKING_FILE" 2>"$LOG_PATH"/booking.log || true + $PYTHON $BOOKING_SCRIPT --user $BOOKING_USER --password $BOOKING_PASS --output "$BOOKING_FILE" --logfile="$LOG_PATH"/booking.log || true if [ -f "$BOOKING_FILE" -a "$(wc -l < "$BOOKING_FILE" || echo 0)" -gt 100 ]; then echo "Hotels have been downloaded. Please ensure this line is before Step 4." >> "$PLANET_LOG" else