# subways/processors/mapsme.py
import functools
import json
import logging
import os
from collections import defaultdict

from subway_structure import (
    DISPLACEMENT_TOLERANCE,
    distance,
    el_center,
    Station,
)

from ._common import (
    DEFAULT_INTERVAL,
    format_colour,
    KMPH_TO_MPS,
    SPEED_ON_TRANSFER,
    TRANSFER_PENALTY,
)
# OSM element id prefix -> (2-bit type code packed into uids, full type name).
OSM_TYPES = {"n": (0, "node"), "w": (2, "way"), "r": (3, "relation")}
ENTRANCE_PENALTY = 60  # seconds added to every entrance/exit record
SPEED_TO_ENTRANCE = 5 * KMPH_TO_MPS  # walking speed to an entrance, m/s
SPEED_ON_LINE = 40 * KMPH_TO_MPS  # assumed train speed along a line, m/s
def uid(elid, typ=None):
    """Turn an OSM element id string (e.g. "n12345") into a numeric uid.

    When `typ` is falsy, the element's 2-bit OSM type code is packed
    into the number; otherwise the id prefix must equal `typ`.
    The result is always left-shifted by one bit.
    """
    prefix = elid[0]
    numeric = int(elid[1:])
    if not typ:
        numeric = (numeric << 2) + OSM_TYPES[prefix][0]
    elif typ != prefix:
        raise Exception("Got {}, expected {}".format(elid, typ))
    return numeric << 1
class DummyCache:
    """No-op stand-in used when all cache processing must be skipped."""

    def __init__(self, cache_path, cities):
        pass

    def __getattr__(self, name):
        """Resolve any attribute to a do-nothing callable, so that a call
        to an arbitrary method has no effect and raises no exception."""

        def noop(*args, **kwargs):
            return None

        return noop
def if_object_is_used(method):
    """Decorator to skip method execution under certain condition.

    Relies on the "is_used" attribute of the object: when it is falsy,
    the decorated method silently returns None instead of running.
    Used by MapsmeCache so a disabled cache is a transparent no-op.
    """

    # functools.wraps preserves __name__/__doc__ of the wrapped method,
    # which the bare inner function previously clobbered.
    @functools.wraps(method)
    def inner(self, *args, **kwargs):
        if not self.is_used:
            return
        return method(self, *args, **kwargs)

    return inner
class MapsmeCache:
    """Persistent per-city cache of generated MAPS.ME data.

    For every good city the cache stores its jsonified network, stops
    and transfers, so that a later run can reuse the data for cities
    that turned bad.  Every public method is a no-op when the cache is
    disabled (constructed with an empty cache_path) — see the
    if_object_is_used decorator.
    """

    def __init__(self, cache_path, cities):
        # cache_path: path to the json cache file, or None/"" to disable
        # cities: iterable of City instances of the current run
        if not cache_path:
            # Cache is not used,
            # all actions with cache must be silently skipped
            self.is_used = False
            return
        self.cache_path = cache_path
        self.is_used = True
        # city name -> {"network": ..., "stops": ..., "transfers": ...}
        self.cache = {}
        if os.path.exists(cache_path):
            try:
                with open(cache_path, "r", encoding="utf-8") as f:
                    self.cache = json.load(f)
            except json.decoder.JSONDecodeError:
                # A corrupt cache file is not fatal: rebuild from scratch.
                logging.warning(
                    "City cache '%s' is not a valid json file. "
                    "Building cache from scratch.",
                    cache_path,
                )
        # Names of bad cities whose data was recovered from the cache.
        self.recovered_city_names = set()
        # One stoparea may participate in routes of different cities
        self.stop_cities = defaultdict(set)  # stoparea id -> city names
        self.city_dict = {c.name: c for c in cities}
        self.good_city_names = {c.name for c in cities if c.is_good}

    def _is_cached_city_usable(self, city):
        """Check if cached stations still exist in osm data and
        not moved far away.
        """
        city_cache_data = self.cache[city.name]
        for stoparea_id, cached_stoparea in city_cache_data["stops"].items():
            # Rebuild the station el_id ("n123"/"w123"/"r123") from cache.
            station_id = cached_stoparea["osm_type"][0] + str(
                cached_stoparea["osm_id"]
            )
            city_station = city.elements.get(station_id)
            if not city_station or not Station.is_station(
                city_station, city.modes
            ):
                return False
            station_coords = el_center(city_station)
            cached_station_coords = tuple(
                cached_stoparea[coord] for coord in ("lon", "lat")
            )
            displacement = distance(station_coords, cached_station_coords)
            if displacement > DISPLACEMENT_TOLERANCE:
                return False
        return True

    @if_object_is_used
    def provide_stops_and_networks(self, stops, networks):
        """Put stops and networks for bad cities into containers
        passed as arguments."""
        for city in self.city_dict.values():
            if not city.is_good and city.name in self.cache:
                city_cached_data = self.cache[city.name]
                if self._is_cached_city_usable(city):
                    stops.update(city_cached_data["stops"])
                    networks.append(city_cached_data["network"])
                    logging.info("Taking %s from cache", city.name)
                    self.recovered_city_names.add(city.name)

    @if_object_is_used
    def provide_transfers(self, transfers):
        """Add transfers from usable cached cities to 'transfers' dict
        passed as argument."""
        for city_name in self.recovered_city_names:
            city_cached_transfers = self.cache[city_name]["transfers"]
            for stop1_uid, stop2_uid, transfer_time in city_cached_transfers:
                # Never overwrite a transfer computed in the current run.
                if (stop1_uid, stop2_uid) not in transfers:
                    transfers[(stop1_uid, stop2_uid)] = transfer_time

    @if_object_is_used
    def initialize_good_city(self, city_name, network):
        """Create/replace one cache element with new data container.
        This should be done for each good city."""
        self.cache[city_name] = {
            "network": network,
            "stops": {},  # stoparea el_id -> jsonified stop data
            "transfers": [],  # list of tuples
            # (stoparea1_uid, stoparea2_uid, time); uid1 < uid2
        }

    @if_object_is_used
    def link_stop_with_city(self, stoparea_id, city_name):
        """Remember that some stop_area is used in a city."""
        stoparea_uid = uid(stoparea_id)
        self.stop_cities[stoparea_uid].add(city_name)

    @if_object_is_used
    def add_stop(self, stoparea_id, st):
        """Add stoparea to the cache of each city the stoparea is in."""
        stoparea_uid = uid(stoparea_id)
        for city_name in self.stop_cities[stoparea_uid]:
            self.cache[city_name]["stops"][stoparea_id] = st

    @if_object_is_used
    def add_transfer(self, stoparea1_uid, stoparea2_uid, transfer_time):
        """If a transfer is inside a good city, add it to the city's cache."""
        for city_name in (
            self.good_city_names
            & self.stop_cities[stoparea1_uid]
            & self.stop_cities[stoparea2_uid]
        ):
            self.cache[city_name]["transfers"].append(
                (stoparea1_uid, stoparea2_uid, transfer_time)
            )

    @if_object_is_used
    def save(self):
        """Write the accumulated cache back to cache_path (best effort)."""
        try:
            with open(self.cache_path, "w", encoding="utf-8") as f:
                json.dump(self.cache, f, ensure_ascii=False)
        except Exception as e:
            # Failing to save the cache must not fail the whole run.
            logging.warning("Failed to save cache: %s", str(e))
def process(cities, transfers, filename, cache_path):
    """Generate all output and save to file.

    :param cities: List of City instances
    :param transfers: List of sets of StopArea.id
    :param filename: Path to file to save the result
    :param cache_path: Path to json-file with good cities cache or None.
    """

    def find_exits_for_platform(center, nodes):
        """Pick a well-spread subset of platform nodes to act as exits.

        The first node sets the minimum spacing (2/3 of its distance to
        the platform center); subsequent nodes are kept only when not
        too close to an already chosen exit.
        """
        exits = []
        min_distance = None
        for n in nodes:
            d = distance(center, (n["lon"], n["lat"]))
            if not min_distance:
                min_distance = d * 2 / 3
            elif d < min_distance:
                continue
            too_close = False
            for e in exits:
                d = distance((e["lon"], e["lat"]), (n["lon"], n["lat"]))
                if d < min_distance:
                    too_close = True
                    break
            if not too_close:
                exits.append(n)
        return exits

    cache = MapsmeCache(cache_path, cities)

    stop_areas = {}  # stoparea el_id -> StopArea instance
    stops = {}  # stoparea el_id -> stop jsonified data
    networks = []
    good_cities = [c for c in cities if c.is_good]
    platform_nodes = {}

    # Reuse cached data for cities that failed validation this run.
    cache.provide_stops_and_networks(stops, networks)

    for city in good_cities:
        network = {"network": city.name, "routes": [], "agency_id": city.id}
        cache.initialize_good_city(city.name, network)
        for route in city:
            routes = {
                "type": route.mode,
                "ref": route.ref,
                "name": route.name,
                "colour": format_colour(route.colour),
                "route_id": uid(route.id, "r"),
                "itineraries": [],
            }
            if route.infill:
                # The infill colour becomes the line colour; the base
                # colour is drawn around it as casing.
                routes["casing"] = routes["colour"]
                routes["colour"] = format_colour(route.infill)
            for i, variant in enumerate(route):
                itin = []
                for stop in variant:
                    stop_areas[stop.stoparea.id] = stop.stoparea
                    cache.link_stop_with_city(stop.stoparea.id, city.name)
                    itin.append(
                        [
                            uid(stop.stoparea.id),
                            round(stop.distance / SPEED_ON_LINE),
                        ]
                    )
                    # Make exits from platform nodes,
                    # if we don't have proper exits
                    if (
                        len(stop.stoparea.entrances) + len(stop.stoparea.exits)
                        == 0
                    ):
                        for pl in stop.stoparea.platforms:
                            pl_el = city.elements[pl]
                            if pl_el["type"] == "node":
                                pl_nodes = [pl_el]
                            elif pl_el["type"] == "way":
                                pl_nodes = [
                                    city.elements.get("n{}".format(n))
                                    for n in pl_el["nodes"]
                                ]
                            else:
                                # Platform is a relation: gather nodes of
                                # all its member ways present in the city.
                                pl_nodes = []
                                for m in pl_el["members"]:
                                    if m["type"] == "way":
                                        if (
                                            "{}{}".format(
                                                m["type"][0], m["ref"]
                                            )
                                            in city.elements
                                        ):
                                            pl_nodes.extend(
                                                [
                                                    city.elements.get(
                                                        "n{}".format(n)
                                                    )
                                                    for n in city.elements[
                                                        "{}{}".format(
                                                            m["type"][0],
                                                            m["ref"],
                                                        )
                                                    ]["nodes"]
                                                ]
                                            )
                            pl_nodes = [n for n in pl_nodes if n]
                            platform_nodes[pl] = find_exits_for_platform(
                                stop.stoparea.centers[pl], pl_nodes
                            )
                routes["itineraries"].append(
                    {
                        "stops": itin,
                        "interval": round(
                            variant.interval or DEFAULT_INTERVAL
                        ),
                    }
                )
            network["routes"].append(routes)
        networks.append(network)

    # Jsonify stop areas, attaching entrances/exits (real or synthesized).
    for stop_id, stop in stop_areas.items():
        st = {
            "name": stop.name,
            "int_name": stop.int_name,
            "lat": stop.center[1],
            "lon": stop.center[0],
            "osm_type": OSM_TYPES[stop.station.id[0]][1],
            "osm_id": int(stop.station.id[1:]),
            "id": uid(stop.id),
            "entrances": [],
            "exits": [],
        }
        for e_l, k in ((stop.entrances, "entrances"), (stop.exits, "exits")):
            for e in e_l:
                if e[0] == "n":
                    st[k].append(
                        {
                            "osm_type": "node",
                            "osm_id": int(e[1:]),
                            "lon": stop.centers[e][0],
                            "lat": stop.centers[e][1],
                            "distance": ENTRANCE_PENALTY
                            + round(
                                distance(stop.centers[e], stop.center)
                                / SPEED_TO_ENTRANCE
                            ),
                        }
                    )
        if len(stop.entrances) + len(stop.exits) == 0:
            if stop.platforms:
                # Fall back to the exits derived from platform nodes.
                for pl in stop.platforms:
                    for n in platform_nodes[pl]:
                        for k in ("entrances", "exits"):
                            st[k].append(
                                {
                                    "osm_type": n["type"],
                                    "osm_id": n["id"],
                                    "lon": n["lon"],
                                    "lat": n["lat"],
                                    "distance": ENTRANCE_PENALTY
                                    + round(
                                        distance(
                                            (n["lon"], n["lat"]), stop.center
                                        )
                                        / SPEED_TO_ENTRANCE
                                    ),
                                }
                            )
            else:
                # Last resort: the station itself doubles as entrance/exit.
                for k in ("entrances", "exits"):
                    st[k].append(
                        {
                            "osm_type": OSM_TYPES[stop.station.id[0]][1],
                            "osm_id": int(stop.station.id[1:]),
                            "lon": stop.centers[stop.id][0],
                            "lat": stop.centers[stop.id][1],
                            # Was a literal 60: same value as the named
                            # constant, no travel-time component here.
                            "distance": ENTRANCE_PENALTY,
                        }
                    )

        stops[stop_id] = st
        cache.add_stop(stop_id, st)

    # All unordered pairs within each transfer set, keyed with uid1 < uid2.
    pairwise_transfers = (
        {}
    )  # (stoparea1_uid, stoparea2_uid) -> time; uid1 < uid2
    for t_set in transfers:
        t = list(t_set)
        for t_first in range(len(t) - 1):
            for t_second in range(t_first + 1, len(t)):
                stoparea1 = t[t_first]
                stoparea2 = t[t_second]
                if stoparea1.id in stops and stoparea2.id in stops:
                    uid1 = uid(stoparea1.id)
                    uid2 = uid(stoparea2.id)
                    uid1, uid2 = sorted([uid1, uid2])
                    transfer_time = TRANSFER_PENALTY + round(
                        distance(stoparea1.center, stoparea2.center)
                        / SPEED_ON_TRANSFER
                    )
                    pairwise_transfers[(uid1, uid2)] = transfer_time
                    cache.add_transfer(uid1, uid2, transfer_time)

    cache.provide_transfers(pairwise_transfers)
    cache.save()

    pairwise_transfers = [
        (stop1_uid, stop2_uid, transfer_time)
        for (stop1_uid, stop2_uid), transfer_time in pairwise_transfers.items()
    ]

    result = {
        "stops": list(stops.values()),
        "transfers": pairwise_transfers,
        "networks": networks,
    }

    if not filename.lower().endswith("json"):
        # BUGFIX: the f-string previously contained no placeholder and
        # replaced the caller-supplied path with a fixed name; append
        # the extension to the given filename instead.
        filename = f"{filename}.json"

    with open(filename, "w", encoding="utf-8") as f:
        json.dump(
            result,
            f,
            indent=1,
            ensure_ascii=False,
        )