subways/processors/mapsme.py
2021-08-02 15:45:56 +03:00

389 lines
15 KiB
Python
Executable file

import json
import os
import logging
from collections import defaultdict
from subway_structure import (
distance,
el_center,
Station,
DISPLACEMENT_TOLERANCE,
)
OSM_TYPES = {'n': (0, 'node'), 'w': (2, 'way'), 'r': (3, 'relation')}
ENTRANCE_PENALTY = 60 # seconds
TRANSFER_PENALTY = 30 # seconds
KMPH_TO_MPS = 1 / 3.6 # km/h to m/s conversion multiplier
SPEED_TO_ENTRANCE = 5 * KMPH_TO_MPS # m/s
SPEED_ON_TRANSFER = 3.5 * KMPH_TO_MPS # m/s
SPEED_ON_LINE = 40 * KMPH_TO_MPS # m/s
DEFAULT_INTERVAL = 2.5 # minutes
def uid(elid, typ=None):
t = elid[0]
osm_id = int(elid[1:])
if not typ:
osm_id = (osm_id << 2) + OSM_TYPES[t][0]
elif typ != t:
raise Exception('Got {}, expected {}'.format(elid, typ))
return osm_id << 1
class DummyCache:
"""This class may be used when you need to omit all cache processing"""
def __init__(self, cache_path, cities):
pass
def __getattr__(self, name):
"""This results in that a call to any method effectively does nothing
and does not generate exceptions."""
def method(*args, **kwargs):
return None
return method
def if_object_is_used(method):
"""Decorator to skip method execution under certain condition.
Relies on "is_used" object property."""
def inner(self, *args, **kwargs):
if not self.is_used:
return
return method(self, *args, **kwargs)
return inner
class MapsmeCache:
def __init__(self, cache_path, cities):
if not cache_path:
# cache is not used, all actions with cache must be silently skipped
self.is_used = False
return
self.cache_path = cache_path
self.is_used = True
self.cache = {}
if os.path.exists(cache_path):
try:
with open(cache_path, 'r', encoding='utf-8') as f:
self.cache = json.load(f)
except json.decoder.JSONDecodeError:
logging.warning(
"City cache '%s' is not a valid json file. "
"Building cache from scratch.",
cache_path,
)
self.recovered_city_names = set()
# One stoparea may participate in routes of different cities
self.stop_cities = defaultdict(set) # stoparea id -> city names
self.city_dict = {c.name: c for c in cities}
self.good_city_names = {c.name for c in cities if c.is_good()}
def _is_cached_city_usable(self, city):
"""Check if cached stations still exist in osm data and
not moved far away.
"""
city_cache_data = self.cache[city.name]
for stoparea_id, cached_stoparea in city_cache_data['stops'].items():
station_id = cached_stoparea['osm_type'][0] + str(
cached_stoparea['osm_id']
)
city_station = city.elements.get(station_id)
if not city_station or not Station.is_station(
city_station, city.modes
):
return False
station_coords = el_center(city_station)
cached_station_coords = tuple(
cached_stoparea[coord] for coord in ('lon', 'lat')
)
displacement = distance(station_coords, cached_station_coords)
if displacement > DISPLACEMENT_TOLERANCE:
return False
return True
@if_object_is_used
def provide_stops_and_networks(self, stops, networks):
"""Put stops and networks for bad cities into containers
passed as arguments."""
for city in self.city_dict.values():
if not city.is_good() and city.name in self.cache:
city_cached_data = self.cache[city.name]
if self._is_cached_city_usable(city):
stops.update(city_cached_data['stops'])
networks.append(city_cached_data['network'])
logging.info("Taking %s from cache", city.name)
self.recovered_city_names.add(city.name)
@if_object_is_used
def provide_transfers(self, transfers):
"""Add transfers from usable cached cities to 'transfers' dict
passed as argument."""
for city_name in self.recovered_city_names:
city_cached_transfers = self.cache[city_name]['transfers']
for stop1_uid, stop2_uid, transfer_time in city_cached_transfers:
if (stop1_uid, stop2_uid) not in transfers:
transfers[(stop1_uid, stop2_uid)] = transfer_time
@if_object_is_used
def initialize_good_city(self, city_name, network):
"""Create/replace one cache element with new data container.
This should be done for each good city."""
self.cache[city_name] = {
'network': network,
'stops': {}, # stoparea el_id -> jsonified stop data
'transfers': [], # list of tuples (stoparea1_uid, stoparea2_uid, time); uid1 < uid2
}
@if_object_is_used
def link_stop_with_city(self, stoparea_id, city_name):
"""Remember that some stop_area is used in a city."""
stoparea_uid = uid(stoparea_id)
self.stop_cities[stoparea_uid].add(city_name)
@if_object_is_used
def add_stop(self, stoparea_id, st):
"""Add stoparea to the cache of each city the stoparea is in."""
stoparea_uid = uid(stoparea_id)
for city_name in self.stop_cities[stoparea_uid]:
self.cache[city_name]['stops'][stoparea_id] = st
@if_object_is_used
def add_transfer(self, stoparea1_uid, stoparea2_uid, transfer_time):
"""If a transfer is inside a good city, add it to the city's cache."""
for city_name in (
self.good_city_names
& self.stop_cities[stoparea1_uid]
& self.stop_cities[stoparea2_uid]
):
self.cache[city_name]['transfers'].append(
(stoparea1_uid, stoparea2_uid, transfer_time)
)
@if_object_is_used
def save(self):
try:
with open(self.cache_path, 'w', encoding='utf-8') as f:
json.dump(self.cache, f, ensure_ascii=False)
except Exception as e:
logging.warning("Failed to save cache: %s", str(e))
def process(cities, transfers, cache_path):
"""cities - list of City instances;
transfers - list of sets of StopArea.id;
cache_path - path to json-file with good cities cache or None.
"""
def format_colour(c):
return c[1:] if c else None
def find_exits_for_platform(center, nodes):
exits = []
min_distance = None
for n in nodes:
d = distance(center, (n['lon'], n['lat']))
if not min_distance:
min_distance = d * 2 / 3
elif d < min_distance:
continue
too_close = False
for e in exits:
d = distance((e['lon'], e['lat']), (n['lon'], n['lat']))
if d < min_distance:
too_close = True
break
if not too_close:
exits.append(n)
return exits
cache = MapsmeCache(cache_path, cities)
stop_areas = {} # stoparea el_id -> StopArea instance
stops = {} # stoparea el_id -> stop jsonified data
networks = []
good_cities = [c for c in cities if c.is_good()]
platform_nodes = {}
cache.provide_stops_and_networks(stops, networks)
for city in good_cities:
network = {'network': city.name, 'routes': [], 'agency_id': city.id}
cache.initialize_good_city(city.name, network)
for route in city:
routes = {
'type': route.mode,
'ref': route.ref,
'name': route.name,
'colour': format_colour(route.colour),
'route_id': uid(route.id, 'r'),
'itineraries': [],
}
if route.infill:
routes['casing'] = routes['colour']
routes['colour'] = format_colour(route.infill)
for i, variant in enumerate(route):
itin = []
for stop in variant:
stop_areas[stop.stoparea.id] = stop.stoparea
cache.link_stop_with_city(stop.stoparea.id, city.name)
itin.append(
[
uid(stop.stoparea.id),
round(stop.distance / SPEED_ON_LINE),
]
)
# Make exits from platform nodes, if we don't have proper exits
if (
len(stop.stoparea.entrances) + len(stop.stoparea.exits)
== 0
):
for pl in stop.stoparea.platforms:
pl_el = city.elements[pl]
if pl_el['type'] == 'node':
pl_nodes = [pl_el]
elif pl_el['type'] == 'way':
pl_nodes = [
city.elements.get('n{}'.format(n))
for n in pl_el['nodes']
]
else:
pl_nodes = []
for m in pl_el['members']:
if m['type'] == 'way':
if (
'{}{}'.format(
m['type'][0], m['ref']
)
in city.elements
):
pl_nodes.extend(
[
city.elements.get(
'n{}'.format(n)
)
for n in city.elements[
'{}{}'.format(
m['type'][0],
m['ref'],
)
]['nodes']
]
)
pl_nodes = [n for n in pl_nodes if n]
platform_nodes[pl] = find_exits_for_platform(
stop.stoparea.centers[pl], pl_nodes
)
routes['itineraries'].append(
{
'stops': itin,
'interval': round(
(variant.interval or DEFAULT_INTERVAL) * 60
),
}
)
network['routes'].append(routes)
networks.append(network)
for stop_id, stop in stop_areas.items():
st = {
'name': stop.name,
'int_name': stop.int_name,
'lat': stop.center[1],
'lon': stop.center[0],
'osm_type': OSM_TYPES[stop.station.id[0]][1],
'osm_id': int(stop.station.id[1:]),
'id': uid(stop.id),
'entrances': [],
'exits': [],
}
for e_l, k in ((stop.entrances, 'entrances'), (stop.exits, 'exits')):
for e in e_l:
if e[0] == 'n':
st[k].append(
{
'osm_type': 'node',
'osm_id': int(e[1:]),
'lon': stop.centers[e][0],
'lat': stop.centers[e][1],
'distance': ENTRANCE_PENALTY
+ round(
distance(stop.centers[e], stop.center)
/ SPEED_TO_ENTRANCE
),
}
)
if len(stop.entrances) + len(stop.exits) == 0:
if stop.platforms:
for pl in stop.platforms:
for n in platform_nodes[pl]:
for k in ('entrances', 'exits'):
st[k].append(
{
'osm_type': n['type'],
'osm_id': n['id'],
'lon': n['lon'],
'lat': n['lat'],
'distance': ENTRANCE_PENALTY
+ round(
distance(
(n['lon'], n['lat']), stop.center
)
/ SPEED_TO_ENTRANCE
),
}
)
else:
for k in ('entrances', 'exits'):
st[k].append(
{
'osm_type': OSM_TYPES[stop.station.id[0]][1],
'osm_id': int(stop.station.id[1:]),
'lon': stop.centers[stop.id][0],
'lat': stop.centers[stop.id][1],
'distance': 60,
}
)
stops[stop_id] = st
cache.add_stop(stop_id, st)
pairwise_transfers = (
{}
) # (stoparea1_uid, stoparea2_uid) -> time; uid1 < uid2
for t_set in transfers:
t = list(t_set)
for t_first in range(len(t) - 1):
for t_second in range(t_first + 1, len(t)):
stoparea1 = t[t_first]
stoparea2 = t[t_second]
if stoparea1.id in stops and stoparea2.id in stops:
uid1 = uid(stoparea1.id)
uid2 = uid(stoparea2.id)
uid1, uid2 = sorted([uid1, uid2])
transfer_time = TRANSFER_PENALTY + round(
distance(stoparea1.center, stoparea2.center)
/ SPEED_ON_TRANSFER
)
pairwise_transfers[(uid1, uid2)] = transfer_time
cache.add_transfer(uid1, uid2, transfer_time)
cache.provide_transfers(pairwise_transfers)
cache.save()
pairwise_transfers = [
(stop1_uid, stop2_uid, transfer_time)
for (stop1_uid, stop2_uid), transfer_time in pairwise_transfers.items()
]
result = {
'stops': list(stops.values()),
'transfers': pairwise_transfers,
'networks': networks,
}
return result