[generator] Supported new booking api.

This commit is contained in:
Maksim Andrianov 2019-04-05 19:26:27 +03:00 committed by mpimenov
parent d044e4c0d6
commit 4ee91f3754
2 changed files with 301 additions and 200 deletions

View file

@ -1,247 +1,348 @@
#!/usr/bin/env python
# coding: utf8
from __future__ import print_function
from collections import defaultdict
from datetime import datetime
import argparse
import base64
import eviltransform
import json
import datetime
import logging
import os
import pickle
import time
import urllib2
import statistics
import sys
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial
from multiprocessing.pool import ThreadPool
from random import randint
from threading import Event
from time import sleep
# Initialize logging.
logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s] %(levelname)s: %(message)s')
import eviltransform
import math
import requests
from ratelimit import limits, sleep_and_retry
from tqdm import tqdm
# Names starting with '.' are calculated in get_hotel_field() below.
HOTEL_FIELDS = ('hotel_id', '.lat', '.lon', 'name', 'address', 'class', '.rate', 'ranking', 'review_score', 'url', 'hoteltype_id', '.trans')
LIMIT_REQUESTS_PER_MINUTE = 400
ATTEMPTS_COUNT = 10
MAX_LIMIT_WAIT_AFTER_429_ERROR_SECONDS = 120
SUPPORTED_LANGUAGES = ("en", "ru", "ar", "cs", "da", "nl", "fi", "fr", "de",
"hu", "id", "it", "ja", "ko", "pl", "pt", "ro", "es",
"sv", "th", "tr", "uk", "vi", "zh", "he", "sk", "el")
class AppError(Exception):
pass
class HTTPError(AppError):
pass
class AttemptsSpentError(AppError):
pass
class BookingApi:
def __init__(self, login, password):
self.login = login
self.password = password
self.baseConfig = {
"headers": {
"Content-Type": "application/json",
"Authorization": "Basic " + base64.encodestring(
"{login}:{password}".format(login=self.login, password=self.password)).replace('\n', '')
},
"url": 'https://distribution-xml.booking.com/json/bookings'}
self.checkMinute = 0
self.requestPerMinute = 0
self.requestLimit = 15 # request per minute
_ENDPOINTS = (
"countries",
"hotels"
)
def call(self, function, params=None):
self.requestPerMinute += 1
now = datetime.utcnow()
def __init__(self, login, password, version):
major_minor = version.split(".")
assert len(major_minor) == 2
assert int(major_minor[0]) >= 2
assert 0 <= int(major_minor[1]) <= 4
if self.requestPerMinute >= self.requestLimit:
waittime = 60 - now.second
logging.info("Limit for request per minute exceeded. Waiting for: {0} sec.".format(waittime))
time.sleep(waittime)
now = datetime.utcnow()
self._event = Event()
self._event.set()
self._login = login
self._password = password
self._base_url = f"https://distribution-xml.booking.com/{version}/json"
self._set_endpoints()
if self.checkMinute != now.minute:
self.requestPerMinute = 0
self.checkMinute = now.minute
payload = ''
@sleep_and_retry
@limits(calls=LIMIT_REQUESTS_PER_MINUTE, period=60)
def call_endpoint(self, endpoint, **params):
self._event.wait()
try:
p = "" if not params else '?' + "&".join(
["{key}={value}".format(key=k, value=v) for (k, v) in params.iteritems()])
url = "{base}.{func}{params}".format(base=self.baseConfig["url"], func=function, params=p)
logging.debug("{0} {1} API call:{2}".format(self.checkMinute, self.requestPerMinute, url))
request = urllib2.Request(url, None, self.baseConfig["headers"])
stream = urllib2.urlopen(request)
payload = stream.read()
data = json.loads(payload)
if isinstance(data, dict) and 'message' in data and 'code' in data:
logging.error('Api call failed with error: {0} Code: {1}'.format(data['message'], data['code']))
return None
return data
attempts = ATTEMPTS_COUNT
while attempts:
attempts -= 1
response = requests.post(f"{self._base_url}/{endpoint}",
auth=(self._login, self._password),
params=params)
if response.status_code == 200:
data = response.json()
return data["result"]
else:
self._handle_errors(response)
raise AttemptsSpentError(f"{ATTEMPTS_COUNT} attempts were spent.")
except Exception as e:
logging.error('Error: {0} Context: {1}'.format(e, payload))
return None
if not self._event.is_set():
self._event.set()
raise e
def _handle_errors(self, response, ):
error_message = ""
try:
data = response.json()
error_message = ",".join(x["message"] for x in data["errors"])
except:
pass
if response.status_code == 429:
self._event.clear()
wait_seconds = randint(0, MAX_LIMIT_WAIT_AFTER_429_ERROR_SECONDS)
sleep(wait_seconds)
logging.warning(f"Http error {response.status_code}: {error_message}. "
f"It waits {wait_seconds} seconds and tries again.")
self._event.set()
else:
raise HTTPError(
f"Http error with code {response.status_code}: {error_message}.")
def _set_endpoints(self):
for endpoint in BookingApi._ENDPOINTS:
setattr(self, endpoint, partial(self.call_endpoint, endpoint))
def download(user, password, path):
'''
Downloads all hotels from booking.com and stores them in a bunch of .pkl files.
'''
api = BookingApi(user, password)
class BookingListApi:
_ROWS_BY_REQUEST = 1000
_ENDPOINTS = (
"countries",
"hotels"
)
maxrows = 1000
countries = api.call("getCountries", dict(languagecodes='en'))
for country in countries:
countrycode = country['countrycode']
logging.info(u'Download[{0}]: {1}'.format(countrycode, country['name']))
def __init__(self, api):
self.api = api
self._set_endpoints()
allhotels = {}
while True:
hotels = api.call('getHotels',
dict(new_hotel_type=1, offset=len(allhotels), rows=maxrows, countrycodes=countrycode))
def call_endpoint(self, endpoint, **params):
return self._call_simple_endpoint(endpoint, **params)
# Check for error.
if hotels is None:
logging.critical('No hotels downloaded for country {0}'.format(country['name']))
break
for h in hotels:
allhotels[h['hotel_id']] = h
# If hotels in answer less then maxrows, we reach end of data.
if len(hotels) < maxrows:
break
if not hotels:
continue
# Now the same for hotel translations
def _call_simple_endpoint(self, endpoint, **params):
result = []
offset = 0
while True:
hotels = api.call('getHotelTranslations', dict(offset=offset, rows=maxrows, countrycodes=countrycode))
if hotels is None:
exit(1)
# Add translations for each hotel
for h in hotels:
if h['hotel_id'] in allhotels:
if 'translations' not in allhotels[h['hotel_id']]:
allhotels[h['hotel_id']]['translations'] = {}
allhotels[h['hotel_id']]['translations'][h['languagecode']] = {'name': h['name'], 'address': h['address']}
offset += len(hotels)
if len(hotels) < maxrows:
resp = self._call_endpoint_offset(offset, endpoint, **params)
result.extend(resp)
if len(resp) < BookingListApi._ROWS_BY_REQUEST:
break
offset += BookingListApi._ROWS_BY_REQUEST
return result
logging.info('Num of hotels: {0}, translations: {1}'.format(len(allhotels), offset))
filename = os.path.join(path,
'{0} - {1}.pkl'.format(country['area'].encode('utf8'), country['name'].encode('utf8')))
with open(filename, 'wb') as fd:
pickle.dump(allhotels.values(), fd, pickle.HIGHEST_PROTOCOL)
def _call_endpoint_offset(self, offset, endpoint, **params):
r = self.api.call_endpoint(endpoint, **{
"offset": offset,
"rows": BookingListApi._ROWS_BY_REQUEST,
**params
})
if not isinstance(r, list):
raise TypeError(f"Result has unexpected type {type(r)}")
return r
def _set_endpoints(self):
for endpoint in BookingListApi._ENDPOINTS:
setattr(self, endpoint, partial(self.call_endpoint, endpoint))
def translate(source, output):
'''
Reads *.pkl files and produces a single list of hotels as tab separated values.
'''
files = [os.path.join(source, filename)
for filename in os.listdir(source) if filename.endswith('.pkl')]
class BookingGen:
def __init__(self, api, country):
self.api = api
self.country_code = country["country"]
self.country_name = country["name"]
logging.info(f"Download[{self.country_code}]: {self.country_name}")
data = []
for filename in sorted(files):
logging.info('Processing {0}'.format(filename))
with open(filename, 'rb') as fd:
data += pickle.load(fd)
extras = ["hotel_info", "hotel_description", "room_info"]
self.hotels = self._download_hotels(extras)
self.translations = self._download_translations()
self.currency_medians = self._currency_medians_by_cities()
# Fix chinese coordinates
for hotel in data:
if hotel['countrycode'] == 'cn' and 'location' in hotel:
try:
hotel['location']['latitude'], hotel['location']['longitude'] = eviltransform.gcj2wgs_exact(
float(hotel['location']['latitude']), float(hotel['location']['longitude']))
except ValueError:
# We don't care if there were errors converting coordinates to float
pass
def generate_csv_rows(self, sep="\t"):
self._fix_hotels()
return (self._create_csv_hotel_line(hotel, sep) for hotel in self.hotels)
# Dict of dicts city_id -> { currency -> [prices] }
cities = defaultdict(lambda: defaultdict(list))
@staticmethod
def _get_hotel_min_price(hotel):
prices = (float(x["room_info"]["min_price"]) for x in hotel["room_data"])
flt = filter(lambda x: math.isclose(x, 0.0),
prices)
try:
return min(flt)
except ValueError:
return None
def valid(hotel):
return 'city_id' in hotel and 'currencycode' in hotel and 'minrate' in hotel and hotel['minrate'] is not None
@staticmethod
def _format_string(s):
s = s.strip()
for x in (("\t", " "), ("\n", " "), ("\r", "")):
s = s.replace(*x)
return s
# Collect prices
for hotel in data:
if valid(hotel):
cities[hotel['city_id']][hotel['currencycode']].append(float(hotel['minrate']))
def _download_hotels(self, extras, lang="default"):
return self.api.hotels(country_ids=self.country_code, language=lang,
extras=extras)
# Replaces list of prices by a median price.
for city in cities:
for cur in cities[city]:
cities[city][cur] = sorted(cities[city][cur])[len(cities[city][cur]) / 2]
def _download_translations(self):
extras = ["hotel_info", ]
translations = defaultdict(dict)
with ThreadPoolExecutor(max_workers=len(SUPPORTED_LANGUAGES)) as executor:
m = {executor.submit(self._download_hotels, extras, lang): lang
for lang in SUPPORTED_LANGUAGES}
for future in as_completed(m):
lang = m[future]
hotels = future.result()
for hotel in hotels:
hotel_id = hotel["hotel_id"]
hotel_data = hotel["hotel_data"]
translations[hotel_id][lang] = {
"name": BookingGen._format_string(hotel_data["name"]),
"address": BookingGen._format_string(hotel_data["address"])
}
return translations
# Price rate ranges, relative to the median price for a city
rates = (0.7, 1.3)
def _fix_hotels(self):
if self.country_code == "ch":
for hotel in self.hotels:
hotel_data = hotel["hotel_data"]
location = hotel_data["location"]
try:
location["latitude"], location["longitude"] = eviltransform.gcj2wgs_exact(
float(location["latitude"]), float(location["longitude"])
)
except ValueError:
logging.exception(f"Converting error {location}")
def get_hotel_field(hotel, field, rate):
if field == '.lat':
return hotel['location']['latitude']
elif field == '.lon':
return hotel['location']['longitude']
elif field == '.rate':
return rate
elif field == '.trans':
# Translations are packed into a single column: lang1|name1|address1|lang2|name2|address2|...
if 'translations' in hotel:
tr_list = []
for tr_lang, tr_values in hotel['translations'].items():
tr_list.append(tr_lang)
tr_list.extend([tr_values[e] for e in ('name', 'address')])
return '|'.join([s.replace('|', ';') for s in tr_list])
else:
return ''
elif field in hotel:
return hotel[field]
elif field == 'ranking':
# This field is not used yet, and booking.com sometimes blocks it.
return ''
logging.error('Unknown hotel field: {0}, URL: {1}'.format(field, hotel['url']))
return ''
def _currency_medians_by_cities(self):
cities = defaultdict(lambda: defaultdict(list))
for hotel in self.hotels:
hotel_data = hotel["hotel_data"]
city_id = hotel_data["city_id"]
currency = hotel_data["currency"]
price = BookingGen._get_hotel_min_price(hotel)
if price is not None:
cities[city_id][currency].append(price)
with open(output, 'w') as fd:
for hotel in data:
rate = 0
if valid(hotel):
avg = cities[hotel['city_id']][hotel['currencycode']]
price = float(hotel['minrate'])
rate = 1
# Find a range that contains the price
while rate <= len(rates) and price > avg * rates[rate - 1]:
rate += 1
l = [get_hotel_field(hotel, e, rate) for e in HOTEL_FIELDS]
print('\t'.join([unicode(f).encode('utf8').replace('\t', ' ').replace('\n', ' ').replace('\r', '') for f in l]), file=fd)
for city in cities:
for currency in cities[city]:
cities[city][currency] = statistics.median(cities[city][currency])
return cities
def _get_rate(self, hotel):
# Price rate ranges, relative to the median price for a city
rates = (0.7, 1.3)
rate = 0
hotel_data = hotel["hotel_data"]
city_id = hotel_data["city_id"]
currency = hotel_data["currency"]
price = BookingGen._get_hotel_min_price(hotel)
if price is not None:
avg = self.currency_medians[city_id][currency]
rate = 1
# Find a range that contains the price
while rate <= len(rates) and price > avg * rates[rate - 1]:
rate += 1
return rate
def _get_translations(self, hotel):
try:
tr = self.translations[hotel["hotel_id"]]
except KeyError:
return ""
hotel_data = hotel["hotel_data"]
name = hotel_data["name"]
address = hotel_data["address"]
tr_ = defaultdict(dict)
for k, v in tr.items():
n = v["name"] if v["name"] != name else ""
a = v["address"] if v["address"] != address else ""
if a or n:
tr_[k]["name"] = n
tr_[k]["address"] = a
tr_list = []
for tr_lang, tr_values in tr_.items():
tr_list.append(tr_lang)
tr_list.extend([tr_values[e] for e in ("name", "address")])
return "|".join(s.replace("|", ";") for s in tr_list)
def _create_csv_hotel_line(self, hotel, sep="\t"):
hotel_data = hotel["hotel_data"]
location = hotel_data["location"]
row = (
hotel["hotel_id"],
f"{location['latitude']:.6f}",
f"{location['longitude']:.6f}",
hotel_data["name"],
hotel_data["address"],
hotel_data["class"],
self._get_rate(hotel),
hotel_data["ranking"],
hotel_data["review_score"],
hotel_data["url"],
hotel_data["hotel_type_id"],
self._get_translations(hotel)
)
return sep.join(BookingGen._format_string(str(x)) for x in row)
def download_hotels_by_country(api, country):
generator = BookingGen(api, country)
rows = list(generator.generate_csv_rows())
logging.info(f"For {country['name']} {len(rows)} lines were generated.")
return rows
def download(user, password, path, threads_count, bar):
api = BookingApi(user, password, "2.4")
list_api = BookingListApi(api)
countries = list_api.countries(languages="en")
logging.info(f"There is {len(countries)} countries.")
bar.total = len(countries)
with open(path, "w") as f:
with ThreadPool(threads_count) as pool:
for lines in pool.imap_unordered(partial(download_hotels_by_country, list_api),
countries):
f.writelines([f"{x}\n" for x in lines])
bar.update()
logging.info(f"Hotels were saved to {path}.")
def process_options():
parser = argparse.ArgumentParser(description='Download and process booking hotels.')
parser.add_argument("-v", "--verbose", action="store_true", dest="verbose")
parser = argparse.ArgumentParser(description="Download and process booking hotels.")
parser.add_argument("-v", "--verbose", action="store_true",
dest="verbose")
parser.add_argument("-q", "--quiet", action="store_false", dest="verbose")
parser.add_argument("--password", dest="password", help="Booking.com account password")
parser.add_argument("--user", dest="user", help="Booking.com account user name")
parser.add_argument("--path", dest="path", help="Path to data files")
parser.add_argument("--output", dest="output", help="Name and destination for output file")
parser.add_argument("--download", action="store_true", dest="download", default=False)
parser.add_argument("--translate", action="store_true", dest="translate", default=False)
parser.add_argument("--logfile", default="",
help="Name and destination for log file")
parser.add_argument("--password", required=True, dest="password",
help="Booking.com account password")
parser.add_argument("--user", required=True, dest="user",
help="Booking.com account user name")
parser.add_argument("--threads_count", default=1, type=int,
help="The number of threads for processing countries.")
parser.add_argument("--output", required=True, dest="output",
help="Name and destination for output file")
options = parser.parse_args()
if not options.download and not options.translate:
parser.print_help()
# TODO(mgsergio): implpement it with argparse facilities.
if options.translate and not options.output:
print("--output isn't set")
parser.print_help()
exit()
return options
def main():
options = process_options()
if options.download:
download(options.user, options.password, options.path)
if options.translate:
translate(options.path, options.output)
logfile = ""
if options.logfile:
logfile = options.logfile
else:
now = datetime.datetime.now()
name = f"{now.strftime('%d_%m_%Y-%H_%M_%S')}_booking_hotels.log"
logfile = os.path.join(os.path.dirname(os.path.realpath(__file__)), name)
print(f"Logs saved to {logfile}.", file=sys.stdout)
if options.threads_count > 1:
print(f"Limit requests per minute is {LIMIT_REQUESTS_PER_MINUTE}.", file=sys.stdout)
logging.basicConfig(level=logging.DEBUG, filename=logfile,
format="%(thread)d [%(asctime)s] %(levelname)s: %(message)s")
with tqdm(disable=not options.verbose) as bar:
download(options.user, options.password, options.output,
options.threads_count, bar)
if __name__ == "__main__":

View file

@ -319,7 +319,7 @@ fi
if [ ! -f "$BOOKING_FILE" -a -n "${BOOKING_USER-}" -a -n "${BOOKING_PASS-}" ]; then
log "STATUS" "Step S1: Starting background hotels downloading"
(
$PYTHON $BOOKING_SCRIPT --user $BOOKING_USER --password $BOOKING_PASS --path "$INTDIR" --download --translate --output "$BOOKING_FILE" 2>"$LOG_PATH"/booking.log || true
$PYTHON $BOOKING_SCRIPT --user $BOOKING_USER --password $BOOKING_PASS --output "$BOOKING_FILE" --logfile="$LOG_PATH"/booking.log || true
if [ -f "$BOOKING_FILE" -a "$(wc -l < "$BOOKING_FILE" || echo 0)" -gt 100 ]; then
echo "Hotels have been downloaded. Please ensure this line is before Step 4." >> "$PLANET_LOG"
else