2140 lines
79 KiB
Python
2140 lines
79 KiB
Python
import math
|
|
import re
|
|
from collections import Counter, defaultdict
|
|
from itertools import islice
|
|
|
|
from css_colours import normalize_colour
|
|
|
|
|
|
MAX_DISTANCE_TO_ENTRANCES = 300 # in meters
|
|
MAX_DISTANCE_STOP_TO_LINE = 50 # in meters
|
|
ALLOWED_STATIONS_MISMATCH = 0.02 # part of total station count
|
|
ALLOWED_TRANSFERS_MISMATCH = 0.07 # part of total interchanges count
|
|
ALLOWED_ANGLE_BETWEEN_STOPS = 45 # in degrees
|
|
DISALLOWED_ANGLE_BETWEEN_STOPS = 20 # in degrees
|
|
|
|
# If an object was moved not too far compared to previous script run,
|
|
# it is likely the same object
|
|
DISPLACEMENT_TOLERANCE = 300 # in meters
|
|
|
|
MODES_RAPID = set(("subway", "light_rail", "monorail", "train"))
|
|
MODES_OVERGROUND = set(("tram", "bus", "trolleybus", "aerialway", "ferry"))
|
|
DEFAULT_MODES_RAPID = set(("subway", "light_rail"))
|
|
DEFAULT_MODES_OVERGROUND = set(("tram",)) # TODO: bus and trolleybus?
|
|
ALL_MODES = MODES_RAPID | MODES_OVERGROUND
|
|
RAILWAY_TYPES = set(
|
|
(
|
|
"rail",
|
|
"light_rail",
|
|
"subway",
|
|
"narrow_gauge",
|
|
"funicular",
|
|
"monorail",
|
|
"tram",
|
|
)
|
|
)
|
|
CONSTRUCTION_KEYS = (
|
|
"construction",
|
|
"proposed",
|
|
"construction:railway",
|
|
"proposed:railway",
|
|
)
|
|
|
|
used_entrances = set()
|
|
|
|
|
|
START_END_TIMES_RE = re.compile(r".*?(\d{2}):(\d{2})-(\d{2}):(\d{2}).*")
|
|
|
|
|
|
def get_start_end_times(opening_hours):
|
|
"""Very simplified method to parse OSM opening_hours tag.
|
|
We simply take the first HH:MM-HH:MM substring which is the most probable
|
|
opening hours interval for the most of weekdays.
|
|
"""
|
|
start_time, end_time = None, None
|
|
m = START_END_TIMES_RE.match(opening_hours)
|
|
if m:
|
|
ints = tuple(map(int, m.groups()))
|
|
start_time = (ints[0], ints[1])
|
|
end_time = (ints[2], ints[3])
|
|
return start_time, end_time
|
|
|
|
|
|
def osm_interval_to_seconds(interval_str):
|
|
"""Convert to int an OSM value for 'interval'/'headway' tag
|
|
which may be in these formats:
|
|
HH:MM:SS,
|
|
HH:MM,
|
|
MM,
|
|
M
|
|
(https://wiki.openstreetmap.org/wiki/Key:interval#Format)
|
|
"""
|
|
hours, minutes, seconds = 0, 0, 0
|
|
semicolon_count = interval_str.count(":")
|
|
try:
|
|
if semicolon_count == 0:
|
|
minutes = int(interval_str)
|
|
elif semicolon_count == 1:
|
|
hours, minutes = map(int, interval_str.split(":"))
|
|
elif semicolon_count == 2:
|
|
hours, minutes, seconds = map(int, interval_str.split(":"))
|
|
else:
|
|
return None
|
|
except ValueError:
|
|
return None
|
|
return seconds + 60 * minutes + 60 * 60 * hours
|
|
|
|
|
|
class CriticalValidationError(Exception):
|
|
"""Is thrown if an error occurs
|
|
that prevents further validation of a city."""
|
|
|
|
|
|
def el_id(el):
|
|
if not el:
|
|
return None
|
|
if "type" not in el:
|
|
raise Exception("What is this element? {}".format(el))
|
|
return el["type"][0] + str(el.get("id", el.get("ref", "")))
|
|
|
|
|
|
def el_center(el):
|
|
if not el:
|
|
return None
|
|
if "lat" in el:
|
|
return (el["lon"], el["lat"])
|
|
elif "center" in el:
|
|
return (el["center"]["lon"], el["center"]["lat"])
|
|
return None
|
|
|
|
|
|
def distance(p1, p2):
|
|
if p1 is None or p2 is None:
|
|
raise Exception(
|
|
"One of arguments to distance({}, {}) is None".format(p1, p2)
|
|
)
|
|
dx = math.radians(p1[0] - p2[0]) * math.cos(
|
|
0.5 * math.radians(p1[1] + p2[1])
|
|
)
|
|
dy = math.radians(p1[1] - p2[1])
|
|
return 6378137 * math.sqrt(dx * dx + dy * dy)
|
|
|
|
|
|
def is_near(p1, p2):
|
|
return (
|
|
p1[0] - 1e-8 <= p2[0] <= p1[0] + 1e-8
|
|
and p1[1] - 1e-8 <= p2[1] <= p1[1] + 1e-8
|
|
)
|
|
|
|
|
|
def project_on_segment(p, p1, p2):
|
|
"""Given three points, return u - the position of projection of
|
|
point p onto segment p1p2 regarding point p1 and (p2-p1) direction vector
|
|
"""
|
|
dp = (p2[0] - p1[0], p2[1] - p1[1])
|
|
d2 = dp[0] * dp[0] + dp[1] * dp[1]
|
|
if d2 < 1e-14:
|
|
return None
|
|
u = ((p[0] - p1[0]) * dp[0] + (p[1] - p1[1]) * dp[1]) / d2
|
|
if not 0 <= u <= 1:
|
|
return None
|
|
return u
|
|
|
|
|
|
def project_on_line(p, line):
|
|
result = {
|
|
# In the first approximation, position on rails is the index of the
|
|
# closest vertex of line to the point p. Fractional value means that
|
|
# the projected point lies on a segment between two vertices.
|
|
# More than one value can occur if a route follows the same tracks
|
|
# more than once.
|
|
"positions_on_line": None,
|
|
"projected_point": None, # (lon, lat)
|
|
}
|
|
|
|
if len(line) < 2:
|
|
return result
|
|
d_min = MAX_DISTANCE_STOP_TO_LINE * 5
|
|
closest_to_vertex = False
|
|
# First, check vertices in the line
|
|
for i, vertex in enumerate(line):
|
|
d = distance(p, vertex)
|
|
if d < d_min:
|
|
result["positions_on_line"] = [i]
|
|
result["projected_point"] = vertex
|
|
d_min = d
|
|
closest_to_vertex = True
|
|
elif vertex == result["projected_point"]:
|
|
# Repeated occurrence of the track vertex in line, like Oslo Line 5
|
|
result["positions_on_line"].append(i)
|
|
# And then calculate distances to each segment
|
|
for seg in range(len(line) - 1):
|
|
# Check bbox for speed
|
|
if not (
|
|
(
|
|
min(line[seg][0], line[seg + 1][0]) - MAX_DISTANCE_STOP_TO_LINE
|
|
<= p[0]
|
|
<= max(line[seg][0], line[seg + 1][0])
|
|
+ MAX_DISTANCE_STOP_TO_LINE
|
|
)
|
|
and (
|
|
min(line[seg][1], line[seg + 1][1]) - MAX_DISTANCE_STOP_TO_LINE
|
|
<= p[1]
|
|
<= max(line[seg][1], line[seg + 1][1])
|
|
+ MAX_DISTANCE_STOP_TO_LINE
|
|
)
|
|
):
|
|
continue
|
|
u = project_on_segment(p, line[seg], line[seg + 1])
|
|
if u:
|
|
projected_point = (
|
|
line[seg][0] + u * (line[seg + 1][0] - line[seg][0]),
|
|
line[seg][1] + u * (line[seg + 1][1] - line[seg][1]),
|
|
)
|
|
d = distance(p, projected_point)
|
|
if d < d_min:
|
|
result["positions_on_line"] = [seg + u]
|
|
result["projected_point"] = projected_point
|
|
d_min = d
|
|
closest_to_vertex = False
|
|
elif projected_point == result["projected_point"]:
|
|
# Repeated occurrence of the track segment in line,
|
|
# like Oslo Line 5
|
|
if not closest_to_vertex:
|
|
result["positions_on_line"].append(seg + u)
|
|
return result
|
|
|
|
|
|
def find_segment(p, line, start_vertex=0):
|
|
"""Returns index of a segment and a position inside it."""
|
|
EPS = 1e-9
|
|
for seg in range(start_vertex, len(line) - 1):
|
|
if is_near(p, line[seg]):
|
|
return seg, 0.0
|
|
if line[seg][0] == line[seg + 1][0]:
|
|
if not (p[0] - EPS <= line[seg][0] <= p[0] + EPS):
|
|
continue
|
|
px = None
|
|
else:
|
|
px = (p[0] - line[seg][0]) / (line[seg + 1][0] - line[seg][0])
|
|
if px is None or (0 <= px <= 1):
|
|
if line[seg][1] == line[seg + 1][1]:
|
|
if not (p[1] - EPS <= line[seg][1] <= p[1] + EPS):
|
|
continue
|
|
py = None
|
|
else:
|
|
py = (p[1] - line[seg][1]) / (line[seg + 1][1] - line[seg][1])
|
|
if py is None or (0 <= py <= 1):
|
|
if py is None or px is None or (px - EPS <= py <= px + EPS):
|
|
return seg, px or py
|
|
return None, None
|
|
|
|
|
|
def distance_on_line(p1, p2, line, start_vertex=0):
|
|
"""Calculates distance via line between projections
|
|
of points p1 and p2. Returns a TUPLE of (d, vertex):
|
|
d is the distance and vertex is the number of the second
|
|
vertex, to continue calculations for the next point."""
|
|
line_len = len(line)
|
|
seg1, pos1 = find_segment(p1, line, start_vertex)
|
|
if seg1 is None:
|
|
# logging.warn('p1 %s is not projected, st=%s', p1, start_vertex)
|
|
return None
|
|
seg2, pos2 = find_segment(p2, line, seg1)
|
|
if seg2 is None:
|
|
if line[0] == line[-1]:
|
|
line = line + line[1:]
|
|
seg2, pos2 = find_segment(p2, line, seg1)
|
|
if seg2 is None:
|
|
# logging.warn('p2 %s is not projected, st=%s', p2, start_vertex)
|
|
return None
|
|
if seg1 == seg2:
|
|
return distance(line[seg1], line[seg1 + 1]) * abs(pos2 - pos1), seg1
|
|
if seg2 < seg1:
|
|
# Should not happen
|
|
raise Exception("Pos1 %s is after pos2 %s", seg1, seg2)
|
|
d = 0
|
|
if pos1 < 1:
|
|
d += distance(line[seg1], line[seg1 + 1]) * (1 - pos1)
|
|
for i in range(seg1 + 1, seg2):
|
|
d += distance(line[i], line[i + 1])
|
|
if pos2 > 0:
|
|
d += distance(line[seg2], line[seg2 + 1]) * pos2
|
|
return d, seg2 % line_len
|
|
|
|
|
|
def angle_between(p1, c, p2):
|
|
a = round(
|
|
abs(
|
|
math.degrees(
|
|
math.atan2(p1[1] - c[1], p1[0] - c[0])
|
|
- math.atan2(p2[1] - c[1], p2[0] - c[0])
|
|
)
|
|
)
|
|
)
|
|
return a if a <= 180 else 360 - a
|
|
|
|
|
|
def format_elid_list(ids):
|
|
msg = ", ".join(sorted(ids)[:20])
|
|
if len(ids) > 20:
|
|
msg += ", ..."
|
|
return msg
|
|
|
|
|
|
class Station:
|
|
@staticmethod
|
|
def get_modes(el):
|
|
mode = el["tags"].get("station")
|
|
modes = [] if not mode else [mode]
|
|
for m in ALL_MODES:
|
|
if el["tags"].get(m) == "yes":
|
|
modes.append(m)
|
|
return set(modes)
|
|
|
|
@staticmethod
|
|
def is_station(el, modes):
|
|
# public_transport=station is too ambiguous and unspecific to use,
|
|
# so we expect for it to be backed by railway=station.
|
|
if (
|
|
"tram" in modes
|
|
and el.get("tags", {}).get("railway") == "tram_stop"
|
|
):
|
|
return True
|
|
if el.get("tags", {}).get("railway") not in ("station", "halt"):
|
|
return False
|
|
for k in CONSTRUCTION_KEYS:
|
|
if k in el["tags"]:
|
|
return False
|
|
# Not checking for station=train, obviously
|
|
if "train" not in modes and Station.get_modes(el).isdisjoint(modes):
|
|
return False
|
|
return True
|
|
|
|
def __init__(self, el, city):
|
|
"""Call this with a railway=station node."""
|
|
if not Station.is_station(el, city.modes):
|
|
raise Exception(
|
|
"Station object should be instantiated from a station node. "
|
|
"Got: {}".format(el)
|
|
)
|
|
|
|
self.id = el_id(el)
|
|
self.element = el
|
|
self.modes = Station.get_modes(el)
|
|
self.name = el["tags"].get("name", "?")
|
|
self.int_name = el["tags"].get(
|
|
"int_name", el["tags"].get("name:en", None)
|
|
)
|
|
try:
|
|
self.colour = normalize_colour(el["tags"].get("colour", None))
|
|
except ValueError as e:
|
|
self.colour = None
|
|
city.warn(str(e), el)
|
|
self.center = el_center(el)
|
|
if self.center is None:
|
|
raise Exception("Could not find center of {}".format(el))
|
|
|
|
def __repr__(self):
|
|
return "Station(id={}, modes={}, name={}, center={})".format(
|
|
self.id, ",".join(self.modes), self.name, self.center
|
|
)
|
|
|
|
|
|
class StopArea:
|
|
@staticmethod
|
|
def is_stop(el):
|
|
if "tags" not in el:
|
|
return False
|
|
if el["tags"].get("railway") == "stop":
|
|
return True
|
|
if el["tags"].get("public_transport") == "stop_position":
|
|
return True
|
|
return False
|
|
|
|
@staticmethod
|
|
def is_platform(el):
|
|
if "tags" not in el:
|
|
return False
|
|
if el["tags"].get("railway") in ("platform", "platform_edge"):
|
|
return True
|
|
if el["tags"].get("public_transport") == "platform":
|
|
return True
|
|
return False
|
|
|
|
@staticmethod
|
|
def is_track(el):
|
|
if el["type"] != "way" or "tags" not in el:
|
|
return False
|
|
return el["tags"].get("railway") in RAILWAY_TYPES
|
|
|
|
def __init__(self, station, city, stop_area=None):
|
|
"""Call this with a Station object."""
|
|
|
|
self.element = stop_area or station.element
|
|
self.id = el_id(self.element)
|
|
self.station = station
|
|
self.stops = set() # set of el_ids of stop_positions
|
|
self.platforms = set() # set of el_ids of platforms
|
|
self.exits = set() # el_id of subway_entrance for leaving the platform
|
|
self.entrances = set() # el_id of subway_entrance for entering
|
|
# the platform
|
|
self.center = None # lon, lat of the station centre point
|
|
self.centers = {} # el_id -> (lon, lat) for all elements
|
|
self.transfer = None # el_id of a transfer relation
|
|
|
|
self.modes = station.modes
|
|
self.name = station.name
|
|
self.int_name = station.int_name
|
|
self.colour = station.colour
|
|
|
|
if stop_area:
|
|
self.name = stop_area["tags"].get("name", self.name)
|
|
self.int_name = stop_area["tags"].get(
|
|
"int_name", stop_area["tags"].get("name:en", self.int_name)
|
|
)
|
|
try:
|
|
self.colour = (
|
|
normalize_colour(stop_area["tags"].get("colour"))
|
|
or self.colour
|
|
)
|
|
except ValueError as e:
|
|
city.warn(str(e), stop_area)
|
|
|
|
# If we have a stop area, add all elements from it
|
|
warned_about_tracks = False
|
|
for m in stop_area["members"]:
|
|
k = el_id(m)
|
|
m_el = city.elements.get(k)
|
|
if m_el and "tags" in m_el:
|
|
if Station.is_station(m_el, city.modes):
|
|
if k != station.id:
|
|
city.error(
|
|
"Stop area has multiple stations", stop_area
|
|
)
|
|
elif StopArea.is_stop(m_el):
|
|
self.stops.add(k)
|
|
elif StopArea.is_platform(m_el):
|
|
self.platforms.add(k)
|
|
elif m_el["tags"].get("railway") == "subway_entrance":
|
|
if m_el["type"] != "node":
|
|
city.warn("Subway entrance is not a node", m_el)
|
|
if (
|
|
m_el["tags"].get("entrance") != "exit"
|
|
and m["role"] != "exit_only"
|
|
):
|
|
self.entrances.add(k)
|
|
if (
|
|
m_el["tags"].get("entrance") != "entrance"
|
|
and m["role"] != "entry_only"
|
|
):
|
|
self.exits.add(k)
|
|
elif StopArea.is_track(m_el):
|
|
if not warned_about_tracks:
|
|
city.warn(
|
|
"Tracks in a stop_area relation", stop_area
|
|
)
|
|
warned_about_tracks = True
|
|
else:
|
|
# Otherwise add nearby entrances
|
|
center = station.center
|
|
for c_el in city.elements.values():
|
|
if c_el.get("tags", {}).get("railway") == "subway_entrance":
|
|
c_id = el_id(c_el)
|
|
if c_id not in city.stop_areas:
|
|
c_center = el_center(c_el)
|
|
if (
|
|
c_center
|
|
and distance(center, c_center)
|
|
<= MAX_DISTANCE_TO_ENTRANCES
|
|
):
|
|
if c_el["type"] != "node":
|
|
city.warn(
|
|
"Subway entrance is not a node", c_el
|
|
)
|
|
etag = c_el["tags"].get("entrance")
|
|
if etag != "exit":
|
|
self.entrances.add(c_id)
|
|
if etag != "entrance":
|
|
self.exits.add(c_id)
|
|
|
|
if self.exits and not self.entrances:
|
|
city.warn(
|
|
"Only exits for a station, no entrances",
|
|
stop_area or station.element,
|
|
)
|
|
if self.entrances and not self.exits:
|
|
city.warn("No exits for a station", stop_area or station.element)
|
|
|
|
for el in self.get_elements():
|
|
self.centers[el] = el_center(city.elements[el])
|
|
|
|
"""Calculate the center point of the station. This algorithm
|
|
cannot rely on a station node, since many stop_areas can share one.
|
|
Basically it averages center points of all platforms
|
|
and stop positions."""
|
|
if len(self.stops) + len(self.platforms) == 0:
|
|
self.center = station.center
|
|
else:
|
|
self.center = [0, 0]
|
|
for sp in self.stops | self.platforms:
|
|
spc = self.centers[sp]
|
|
for i in range(2):
|
|
self.center[i] += spc[i]
|
|
for i in range(2):
|
|
self.center[i] /= len(self.stops) + len(self.platforms)
|
|
|
|
def get_elements(self):
|
|
result = set([self.id, self.station.id])
|
|
result.update(self.entrances)
|
|
result.update(self.exits)
|
|
result.update(self.stops)
|
|
result.update(self.platforms)
|
|
return result
|
|
|
|
def __repr__(self):
|
|
return (
|
|
f"StopArea(id={self.id}, name={self.name}, station={self.station},"
|
|
f" transfer={self.transfer}, center={self.center})"
|
|
)
|
|
|
|
|
|
class RouteStop:
|
|
def __init__(self, stoparea):
|
|
self.stoparea = stoparea
|
|
self.stop = None # Stop position (lon, lat), possibly projected
|
|
self.distance = 0 # In meters from the start of the route
|
|
self.platform_entry = None # Platform el_id
|
|
self.platform_exit = None # Platform el_id
|
|
self.can_enter = False
|
|
self.can_exit = False
|
|
self.seen_stop = False
|
|
self.seen_platform_entry = False
|
|
self.seen_platform_exit = False
|
|
self.seen_station = False
|
|
|
|
@property
|
|
def seen_platform(self):
|
|
return self.seen_platform_entry or self.seen_platform_exit
|
|
|
|
@staticmethod
|
|
def get_actual_role(el, role, modes):
|
|
if StopArea.is_stop(el):
|
|
return "stop"
|
|
elif StopArea.is_platform(el):
|
|
return "platform"
|
|
elif Station.is_station(el, modes):
|
|
if "platform" in role:
|
|
return "platform"
|
|
else:
|
|
return "stop"
|
|
return None
|
|
|
|
def add(self, member, relation, city):
|
|
el = city.elements[el_id(member)]
|
|
role = member["role"]
|
|
|
|
if StopArea.is_stop(el):
|
|
if "platform" in role:
|
|
city.warn("Stop position in a platform role in a route", el)
|
|
if el["type"] != "node":
|
|
city.error("Stop position is not a node", el)
|
|
self.stop = el_center(el)
|
|
if "entry_only" not in role:
|
|
self.can_exit = True
|
|
if "exit_only" not in role:
|
|
self.can_enter = True
|
|
|
|
elif Station.is_station(el, city.modes):
|
|
if el["type"] != "node":
|
|
city.notice("Station in route is not a node", el)
|
|
|
|
if not self.seen_stop and not self.seen_platform:
|
|
self.stop = el_center(el)
|
|
self.can_enter = True
|
|
self.can_exit = True
|
|
|
|
elif StopArea.is_platform(el):
|
|
if "stop" in role:
|
|
city.warn("Platform in a stop role in a route", el)
|
|
if "exit_only" not in role:
|
|
self.platform_entry = el_id(el)
|
|
self.can_enter = True
|
|
if "entry_only" not in role:
|
|
self.platform_exit = el_id(el)
|
|
self.can_exit = True
|
|
if not self.seen_stop:
|
|
self.stop = el_center(el)
|
|
|
|
multiple_check = False
|
|
actual_role = RouteStop.get_actual_role(el, role, city.modes)
|
|
if actual_role == "platform":
|
|
if role == "platform_entry_only":
|
|
multiple_check = self.seen_platform_entry
|
|
self.seen_platform_entry = True
|
|
elif role == "platform_exit_only":
|
|
multiple_check = self.seen_platform_exit
|
|
self.seen_platform_exit = True
|
|
else:
|
|
if role != "platform" and "stop" not in role:
|
|
city.warn(
|
|
f'Platform "{el["tags"].get("name", "")}" '
|
|
f'({el_id(el)}) with invalid role "{role}" in route',
|
|
relation,
|
|
)
|
|
multiple_check = self.seen_platform
|
|
self.seen_platform_entry = True
|
|
self.seen_platform_exit = True
|
|
elif actual_role == "stop":
|
|
multiple_check = self.seen_stop
|
|
self.seen_stop = True
|
|
if multiple_check:
|
|
log_function = city.error if actual_role == "stop" else city.notice
|
|
log_function(
|
|
f'Multiple {actual_role}s for a station "'
|
|
f'{el["tags"].get("name", "")} '
|
|
f"({el_id(el)}) in a route relation",
|
|
relation,
|
|
)
|
|
|
|
def __repr__(self):
|
|
return (
|
|
"RouteStop(stop={}, pl_entry={}, pl_exit={}, stoparea={})".format(
|
|
self.stop,
|
|
self.platform_entry,
|
|
self.platform_exit,
|
|
self.stoparea,
|
|
)
|
|
)
|
|
|
|
|
|
class Route:
|
|
"""The longest route for a city with a unique ref."""
|
|
|
|
@staticmethod
|
|
def is_route(el, modes):
|
|
if (
|
|
el["type"] != "relation"
|
|
or el.get("tags", {}).get("type") != "route"
|
|
):
|
|
return False
|
|
if "members" not in el:
|
|
return False
|
|
if el["tags"].get("route") not in modes:
|
|
return False
|
|
for k in CONSTRUCTION_KEYS:
|
|
if k in el["tags"]:
|
|
return False
|
|
if "ref" not in el["tags"] and "name" not in el["tags"]:
|
|
return False
|
|
return True
|
|
|
|
@staticmethod
|
|
def get_network(relation):
|
|
for k in ("network:metro", "network", "operator"):
|
|
if k in relation["tags"]:
|
|
return relation["tags"][k]
|
|
return None
|
|
|
|
@staticmethod
|
|
def get_interval(tags):
|
|
v = None
|
|
for k in ("interval", "headway"):
|
|
if k in tags:
|
|
v = tags[k]
|
|
break
|
|
else:
|
|
for kk in tags:
|
|
if kk.startswith(k + ":"):
|
|
v = tags[kk]
|
|
break
|
|
if not v:
|
|
return None
|
|
return osm_interval_to_seconds(v)
|
|
|
|
def __init__(self, relation, city, master=None):
|
|
assert Route.is_route(
|
|
relation, city.modes
|
|
), f"The relation does not seem to be a route: {relation}"
|
|
self.city = city
|
|
self.element = relation
|
|
self.id = el_id(relation)
|
|
|
|
self.ref = None
|
|
self.name = None
|
|
self.mode = None
|
|
self.colour = None
|
|
self.infill = None
|
|
self.network = None
|
|
self.interval = None
|
|
self.start_time = None
|
|
self.end_time = None
|
|
self.is_circular = False
|
|
self.stops = [] # List of RouteStop
|
|
# Would be a list of (lon, lat) for the longest stretch. Can be empty.
|
|
self.tracks = None
|
|
# Index of the fist stop that is located on/near the self.tracks
|
|
self.first_stop_on_rails_index = None
|
|
# Index of the last stop that is located on/near the self.tracks
|
|
self.last_stop_on_rails_index = None
|
|
|
|
self.process_tags(master)
|
|
stop_position_elements = self.process_stop_members()
|
|
self.process_tracks(stop_position_elements)
|
|
|
|
def build_longest_line(self):
|
|
line_nodes = set()
|
|
last_track = []
|
|
track = []
|
|
warned_about_holes = False
|
|
for m in self.element["members"]:
|
|
el = self.city.elements.get(el_id(m), None)
|
|
if not el or not StopArea.is_track(el):
|
|
continue
|
|
if "nodes" not in el or len(el["nodes"]) < 2:
|
|
self.city.error("Cannot find nodes in a railway", el)
|
|
continue
|
|
nodes = ["n{}".format(n) for n in el["nodes"]]
|
|
if m["role"] == "backward":
|
|
nodes.reverse()
|
|
line_nodes.update(nodes)
|
|
if not track:
|
|
is_first = True
|
|
track.extend(nodes)
|
|
else:
|
|
new_segment = list(nodes) # copying
|
|
if new_segment[0] == track[-1]:
|
|
track.extend(new_segment[1:])
|
|
elif new_segment[-1] == track[-1]:
|
|
track.extend(reversed(new_segment[:-1]))
|
|
elif is_first and track[0] in (
|
|
new_segment[0],
|
|
new_segment[-1],
|
|
):
|
|
# We can reverse the track and try again
|
|
track.reverse()
|
|
if new_segment[0] == track[-1]:
|
|
track.extend(new_segment[1:])
|
|
else:
|
|
track.extend(reversed(new_segment[:-1]))
|
|
else:
|
|
# Store the track if it is long and clean it
|
|
if not warned_about_holes:
|
|
self.city.warn(
|
|
"Hole in route rails near node {}".format(
|
|
track[-1]
|
|
),
|
|
self.element,
|
|
)
|
|
warned_about_holes = True
|
|
if len(track) > len(last_track):
|
|
last_track = track
|
|
track = []
|
|
is_first = False
|
|
if len(track) > len(last_track):
|
|
last_track = track
|
|
# Remove duplicate points
|
|
last_track = [
|
|
last_track[i]
|
|
for i in range(0, len(last_track))
|
|
if i == 0 or last_track[i - 1] != last_track[i]
|
|
]
|
|
return last_track, line_nodes
|
|
|
|
def get_stop_projections(self):
|
|
projected = [project_on_line(x.stop, self.tracks) for x in self.stops]
|
|
|
|
def stop_near_tracks_criterion(stop_index: int):
|
|
return (
|
|
projected[stop_index]["projected_point"] is not None
|
|
and distance(
|
|
self.stops[stop_index].stop,
|
|
projected[stop_index]["projected_point"],
|
|
)
|
|
<= MAX_DISTANCE_STOP_TO_LINE
|
|
)
|
|
|
|
return projected, stop_near_tracks_criterion
|
|
|
|
def project_stops_on_line(self):
|
|
projected, stop_near_tracks_criterion = self.get_stop_projections()
|
|
|
|
projected_stops_data = {
|
|
"first_stop_on_rails_index": None,
|
|
"last_stop_on_rails_index": None,
|
|
"stops_on_longest_line": [], # list [{'route_stop': RouteStop,
|
|
# 'coords': (lon, lat),
|
|
# 'positions_on_rails': [] }
|
|
}
|
|
first_index = 0
|
|
while first_index < len(self.stops) and not stop_near_tracks_criterion(
|
|
first_index
|
|
):
|
|
first_index += 1
|
|
projected_stops_data["first_stop_on_rails_index"] = first_index
|
|
|
|
last_index = len(self.stops) - 1
|
|
while last_index > projected_stops_data[
|
|
"first_stop_on_rails_index"
|
|
] and not stop_near_tracks_criterion(last_index):
|
|
last_index -= 1
|
|
projected_stops_data["last_stop_on_rails_index"] = last_index
|
|
|
|
for i, route_stop in enumerate(self.stops):
|
|
if not first_index <= i <= last_index:
|
|
continue
|
|
|
|
if projected[i]["projected_point"] is None:
|
|
self.city.error(
|
|
'Stop "{}" {} is nowhere near the tracks'.format(
|
|
route_stop.stoparea.name, route_stop.stop
|
|
),
|
|
self.element,
|
|
)
|
|
else:
|
|
stop_data = {
|
|
"route_stop": route_stop,
|
|
"coords": None,
|
|
"positions_on_rails": None,
|
|
}
|
|
projected_point = projected[i]["projected_point"]
|
|
# We've got two separate stations with a good stretch of
|
|
# railway tracks between them. Put these on tracks.
|
|
d = round(distance(route_stop.stop, projected_point))
|
|
if d > MAX_DISTANCE_STOP_TO_LINE:
|
|
self.city.notice(
|
|
'Stop "{}" {} is {} meters from the tracks'.format(
|
|
route_stop.stoparea.name, route_stop.stop, d
|
|
),
|
|
self.element,
|
|
)
|
|
else:
|
|
stop_data["coords"] = projected_point
|
|
stop_data["positions_on_rails"] = projected[i][
|
|
"positions_on_line"
|
|
]
|
|
projected_stops_data["stops_on_longest_line"].append(stop_data)
|
|
return projected_stops_data
|
|
|
|
def calculate_distances(self):
|
|
dist = 0
|
|
vertex = 0
|
|
for i, stop in enumerate(self.stops):
|
|
if i > 0:
|
|
direct = distance(stop.stop, self.stops[i - 1].stop)
|
|
d_line = None
|
|
if (
|
|
self.first_stop_on_rails_index
|
|
<= i
|
|
<= self.last_stop_on_rails_index
|
|
):
|
|
d_line = distance_on_line(
|
|
self.stops[i - 1].stop, stop.stop, self.tracks, vertex
|
|
)
|
|
if d_line and direct - 10 <= d_line[0] <= direct * 2:
|
|
vertex = d_line[1]
|
|
dist += round(d_line[0])
|
|
else:
|
|
dist += round(direct)
|
|
stop.distance = dist
|
|
|
|
def process_tags(self, master):
|
|
relation = self.element
|
|
master_tags = {} if not master else master["tags"]
|
|
if "ref" not in relation["tags"] and "ref" not in master_tags:
|
|
self.city.notice("Missing ref on a route", relation)
|
|
self.ref = relation["tags"].get(
|
|
"ref", master_tags.get("ref", relation["tags"].get("name", None))
|
|
)
|
|
self.name = relation["tags"].get("name", None)
|
|
self.mode = relation["tags"]["route"]
|
|
if (
|
|
"colour" not in relation["tags"]
|
|
and "colour" not in master_tags
|
|
and self.mode != "tram"
|
|
):
|
|
self.city.notice("Missing colour on a route", relation)
|
|
try:
|
|
self.colour = normalize_colour(
|
|
relation["tags"].get("colour", master_tags.get("colour", None))
|
|
)
|
|
except ValueError as e:
|
|
self.colour = None
|
|
self.city.warn(str(e), relation)
|
|
try:
|
|
self.infill = normalize_colour(
|
|
relation["tags"].get(
|
|
"colour:infill", master_tags.get("colour:infill", None)
|
|
)
|
|
)
|
|
except ValueError as e:
|
|
self.infill = None
|
|
self.city.warn(str(e), relation)
|
|
self.network = Route.get_network(relation)
|
|
self.interval = Route.get_interval(
|
|
relation["tags"]
|
|
) or Route.get_interval(master_tags)
|
|
self.start_time, self.end_time = get_start_end_times(
|
|
relation["tags"].get(
|
|
"opening_hours", master_tags.get("opening_hours", "")
|
|
)
|
|
)
|
|
if relation["tags"].get("public_transport:version") == "1":
|
|
self.city.warn(
|
|
"Public transport version is 1, which means the route "
|
|
"is an unsorted pile of objects",
|
|
relation,
|
|
)
|
|
|
|
def process_stop_members(self):
|
|
stations = set() # temporary for recording stations
|
|
seen_stops = False
|
|
seen_platforms = False
|
|
repeat_pos = None
|
|
stop_position_elements = []
|
|
for m in self.element["members"]:
|
|
if "inactive" in m["role"]:
|
|
continue
|
|
k = el_id(m)
|
|
if k in self.city.stations:
|
|
st_list = self.city.stations[k]
|
|
st = st_list[0]
|
|
if len(st_list) > 1:
|
|
self.city.error(
|
|
f"Ambiguous station {st.name} in route. Please "
|
|
"use stop_position or split interchange stations",
|
|
self.element,
|
|
)
|
|
el = self.city.elements[k]
|
|
actual_role = RouteStop.get_actual_role(
|
|
el, m["role"], self.city.modes
|
|
)
|
|
if actual_role:
|
|
if m["role"] and actual_role not in m["role"]:
|
|
self.city.warn(
|
|
"Wrong role '{}' for {} {}".format(
|
|
m["role"], actual_role, k
|
|
),
|
|
self.element,
|
|
)
|
|
if repeat_pos is None:
|
|
if not self.stops or st not in stations:
|
|
stop = RouteStop(st)
|
|
self.stops.append(stop)
|
|
stations.add(st)
|
|
elif self.stops[-1].stoparea.id == st.id:
|
|
stop = self.stops[-1]
|
|
else:
|
|
# We've got a repeat
|
|
if (
|
|
(seen_stops and seen_platforms)
|
|
or (
|
|
actual_role == "stop"
|
|
and not seen_platforms
|
|
)
|
|
or (
|
|
actual_role == "platform"
|
|
and not seen_stops
|
|
)
|
|
):
|
|
# Circular route!
|
|
stop = RouteStop(st)
|
|
self.stops.append(stop)
|
|
stations.add(st)
|
|
else:
|
|
repeat_pos = 0
|
|
if repeat_pos is not None:
|
|
if repeat_pos >= len(self.stops):
|
|
continue
|
|
# Check that the type matches
|
|
if (actual_role == "stop" and seen_stops) or (
|
|
actual_role == "platform" and seen_platforms
|
|
):
|
|
self.city.error(
|
|
'Found an out-of-place {}: "{}" ({})'.format(
|
|
actual_role, el["tags"].get("name", ""), k
|
|
),
|
|
self.element,
|
|
)
|
|
continue
|
|
# Find the matching stop starting with index repeat_pos
|
|
while (
|
|
repeat_pos < len(self.stops)
|
|
and self.stops[repeat_pos].stoparea.id != st.id
|
|
):
|
|
repeat_pos += 1
|
|
if repeat_pos >= len(self.stops):
|
|
self.city.error(
|
|
"Incorrect order of {}s at {}".format(
|
|
actual_role, k
|
|
),
|
|
self.element,
|
|
)
|
|
continue
|
|
stop = self.stops[repeat_pos]
|
|
|
|
stop.add(m, self.element, self.city)
|
|
if repeat_pos is None:
|
|
seen_stops |= stop.seen_stop or stop.seen_station
|
|
seen_platforms |= stop.seen_platform
|
|
|
|
if StopArea.is_stop(el):
|
|
stop_position_elements.append(el)
|
|
|
|
continue
|
|
|
|
if k not in self.city.elements:
|
|
if "stop" in m["role"] or "platform" in m["role"]:
|
|
raise CriticalValidationError(
|
|
f"{m['role']} {m['type']} {m['ref']} for route "
|
|
f"relation {self.element['id']} is not in the dataset"
|
|
)
|
|
continue
|
|
el = self.city.elements[k]
|
|
if "tags" not in el:
|
|
self.city.error(
|
|
f"Untagged object {k} in a route", self.element
|
|
)
|
|
continue
|
|
|
|
is_under_construction = False
|
|
for ck in CONSTRUCTION_KEYS:
|
|
if ck in el["tags"]:
|
|
self.city.warn(
|
|
f"Under construction {m['role'] or 'feature'} {k} "
|
|
"in route. Consider setting 'inactive' role or "
|
|
"removing construction attributes",
|
|
self.element,
|
|
)
|
|
is_under_construction = True
|
|
break
|
|
if is_under_construction:
|
|
continue
|
|
|
|
if Station.is_station(el, self.city.modes):
|
|
# A station may be not included into this route due to previous
|
|
# 'stop area has multiple stations' error. No other error
|
|
# message is needed.
|
|
pass
|
|
elif el["tags"].get("railway") in ("station", "halt"):
|
|
self.city.error(
|
|
"Missing station={} on a {}".format(self.mode, m["role"]),
|
|
el,
|
|
)
|
|
else:
|
|
actual_role = RouteStop.get_actual_role(
|
|
el, m["role"], self.city.modes
|
|
)
|
|
if actual_role:
|
|
self.city.error(
|
|
f"{actual_role} {m['type']} {m['ref']} is not "
|
|
"connected to a station in route",
|
|
self.element,
|
|
)
|
|
elif not StopArea.is_track(el):
|
|
self.city.warn(
|
|
"Unknown member type for {} {} in route".format(
|
|
m["type"], m["ref"]
|
|
),
|
|
self.element,
|
|
)
|
|
return stop_position_elements
|
|
|
|
def process_tracks(self, stop_position_elements: list[dict]) -> None:
|
|
tracks, line_nodes = self.build_longest_line()
|
|
|
|
for stop_el in stop_position_elements:
|
|
stop_id = el_id(stop_el)
|
|
if stop_id not in line_nodes:
|
|
self.city.warn(
|
|
'Stop position "{}" ({}) is not on tracks'.format(
|
|
stop_el["tags"].get("name", ""), stop_id
|
|
),
|
|
self.element,
|
|
)
|
|
|
|
# self.tracks would be a list of (lon, lat) for the longest stretch.
|
|
# Can be empty.
|
|
self.tracks = [el_center(self.city.elements.get(k)) for k in tracks]
|
|
if (
|
|
None in self.tracks
|
|
): # usually, extending BBOX for the city is needed
|
|
self.tracks = []
|
|
for n in filter(lambda x: x not in self.city.elements, tracks):
|
|
self.city.warn(
|
|
f"The dataset is missing the railway tracks node {n}",
|
|
self.element,
|
|
)
|
|
break
|
|
|
|
if len(self.stops) > 1:
|
|
self.is_circular = (
|
|
self.stops[0].stoparea == self.stops[-1].stoparea
|
|
)
|
|
if (
|
|
self.is_circular
|
|
and self.tracks
|
|
and self.tracks[0] != self.tracks[-1]
|
|
):
|
|
self.city.warn(
|
|
"Non-closed rail sequence in a circular route",
|
|
self.element,
|
|
)
|
|
|
|
projected_stops_data = self.project_stops_on_line()
|
|
self.check_and_recover_stops_order(projected_stops_data)
|
|
self.apply_projected_stops_data(projected_stops_data)
|
|
|
|
def apply_projected_stops_data(self, projected_stops_data: dict) -> None:
|
|
"""Store better stop coordinates and indexes of first/last stops
|
|
that lie on a continuous track line, to the instance attributes.
|
|
"""
|
|
for attr in ("first_stop_on_rails_index", "last_stop_on_rails_index"):
|
|
setattr(self, attr, projected_stops_data[attr])
|
|
|
|
for stop_data in projected_stops_data["stops_on_longest_line"]:
|
|
route_stop = stop_data["route_stop"]
|
|
route_stop.positions_on_rails = stop_data["positions_on_rails"]
|
|
if stop_coords := stop_data["coords"]:
|
|
route_stop.stop = stop_coords
|
|
|
|
def get_extended_tracks(self):
|
|
"""Amend tracks with points of leading/trailing self.stops
|
|
that were not projected onto the longest tracks line.
|
|
Return a new array.
|
|
"""
|
|
if self.first_stop_on_rails_index >= len(self.stops):
|
|
tracks = [route_stop.stop for route_stop in self.stops]
|
|
else:
|
|
tracks = (
|
|
[
|
|
route_stop.stop
|
|
for i, route_stop in enumerate(self.stops)
|
|
if i < self.first_stop_on_rails_index
|
|
]
|
|
+ self.tracks
|
|
+ [
|
|
route_stop.stop
|
|
for i, route_stop in enumerate(self.stops)
|
|
if i > self.last_stop_on_rails_index
|
|
]
|
|
)
|
|
return tracks
|
|
|
|
def get_truncated_tracks(self, tracks):
|
|
"""Truncate leading/trailing segments of `tracks` param
|
|
that are beyond the first and last stop locations.
|
|
Return a new array.
|
|
"""
|
|
if self.is_circular:
|
|
return tracks.copy()
|
|
|
|
first_stop_location = find_segment(self.stops[0].stop, tracks, 0)
|
|
last_stop_location = find_segment(self.stops[-1].stop, tracks, 0)
|
|
|
|
if last_stop_location != (None, None):
|
|
seg2, u2 = last_stop_location
|
|
if u2 == 0.0:
|
|
# Make seg2 the segment the last_stop_location is
|
|
# at the middle or end of
|
|
seg2 -= 1
|
|
# u2 = 1.0
|
|
if seg2 + 2 < len(tracks):
|
|
tracks = tracks[0 : seg2 + 2] # noqa E203
|
|
tracks[-1] = self.stops[-1].stop
|
|
|
|
if first_stop_location != (None, None):
|
|
seg1, u1 = first_stop_location
|
|
if u1 == 1.0:
|
|
# Make seg1 the segment the first_stop_location is
|
|
# at the beginning or middle of
|
|
seg1 += 1
|
|
# u1 = 0.0
|
|
if seg1 > 0:
|
|
tracks = tracks[seg1:]
|
|
tracks[0] = self.stops[0].stop
|
|
|
|
return tracks
|
|
|
|
def get_tracks_geometry(self):
|
|
tracks = self.get_extended_tracks()
|
|
tracks = self.get_truncated_tracks(tracks)
|
|
return tracks
|
|
|
|
def check_stops_order_by_angle(self) -> tuple[list, list]:
|
|
disorder_warnings = []
|
|
disorder_errors = []
|
|
for i, route_stop in enumerate(
|
|
islice(self.stops, 1, len(self.stops) - 1), start=1
|
|
):
|
|
angle = angle_between(
|
|
self.stops[i - 1].stop,
|
|
route_stop.stop,
|
|
self.stops[i + 1].stop,
|
|
)
|
|
if angle < ALLOWED_ANGLE_BETWEEN_STOPS:
|
|
msg = (
|
|
"Angle between stops around "
|
|
f'"{route_stop.stoparea.name}" {route_stop.stop} '
|
|
f"is too narrow, {angle} degrees"
|
|
)
|
|
if angle < DISALLOWED_ANGLE_BETWEEN_STOPS:
|
|
disorder_errors.append(msg)
|
|
else:
|
|
disorder_warnings.append(msg)
|
|
return disorder_warnings, disorder_errors
|
|
|
|
def check_stops_order_on_tracks_direct(self, stop_sequence) -> str | None:
|
|
"""Checks stops order on tracks, following stop_sequence
|
|
in direct order only.
|
|
:param stop_sequence: list of dict{'route_stop', 'positions_on_rails',
|
|
'coords'} for RouteStops that belong to the longest contiguous
|
|
sequence of tracks in a route.
|
|
:return: error message on the first order violation or None.
|
|
"""
|
|
allowed_order_violations = 1 if self.is_circular else 0
|
|
max_position_on_rails = -1
|
|
for stop_data in stop_sequence:
|
|
positions_on_rails = stop_data["positions_on_rails"]
|
|
suitable_occurrence = 0
|
|
while (
|
|
suitable_occurrence < len(positions_on_rails)
|
|
and positions_on_rails[suitable_occurrence]
|
|
< max_position_on_rails
|
|
):
|
|
suitable_occurrence += 1
|
|
if suitable_occurrence == len(positions_on_rails):
|
|
if allowed_order_violations > 0:
|
|
suitable_occurrence -= 1
|
|
allowed_order_violations -= 1
|
|
else:
|
|
route_stop = stop_data["route_stop"]
|
|
return (
|
|
"Stops on tracks are unordered near "
|
|
f'"{route_stop.stoparea.name}" {route_stop.stop}'
|
|
)
|
|
max_position_on_rails = positions_on_rails[suitable_occurrence]
|
|
|
|
def check_stops_order_on_tracks(self, projected_stops_data) -> str | None:
|
|
"""Checks stops order on tracks, trying direct and reversed
|
|
order of stops in the stop_sequence.
|
|
:param projected_stops_data: info about RouteStops that belong to the
|
|
longest contiguous sequence of tracks in a route. May be changed
|
|
if tracks reversing is performed.
|
|
:return: error message on the first order violation or None.
|
|
"""
|
|
error_message = self.check_stops_order_on_tracks_direct(
|
|
projected_stops_data["stops_on_longest_line"]
|
|
)
|
|
if error_message:
|
|
error_message_reversed = self.check_stops_order_on_tracks_direct(
|
|
reversed(projected_stops_data["stops_on_longest_line"])
|
|
)
|
|
if error_message_reversed is None:
|
|
error_message = None
|
|
self.city.warn(
|
|
"Tracks seem to go in the opposite direction to stops",
|
|
self.element,
|
|
)
|
|
self.tracks.reverse()
|
|
new_projected_stops_data = self.project_stops_on_line()
|
|
projected_stops_data.update(new_projected_stops_data)
|
|
|
|
return error_message
|
|
|
|
def check_stops_order(self, projected_stops_data):
|
|
(
|
|
angle_disorder_warnings,
|
|
angle_disorder_errors,
|
|
) = self.check_stops_order_by_angle()
|
|
disorder_on_tracks_error = self.check_stops_order_on_tracks(
|
|
projected_stops_data
|
|
)
|
|
disorder_warnings = angle_disorder_warnings
|
|
disorder_errors = angle_disorder_errors
|
|
if disorder_on_tracks_error:
|
|
disorder_errors.append(disorder_on_tracks_error)
|
|
return disorder_warnings, disorder_errors
|
|
|
|
def check_and_recover_stops_order(self, projected_stops_data: dict):
|
|
"""
|
|
:param projected_stops_data: may change if we need to reverse tracks
|
|
"""
|
|
disorder_warnings, disorder_errors = self.check_stops_order(
|
|
projected_stops_data
|
|
)
|
|
if disorder_warnings or disorder_errors:
|
|
resort_success = False
|
|
if self.city.recovery_data:
|
|
resort_success = self.try_resort_stops()
|
|
if resort_success:
|
|
for msg in disorder_warnings:
|
|
self.city.notice(msg, self.element)
|
|
for msg in disorder_errors:
|
|
self.city.warn(
|
|
"Fixed with recovery data: " + msg, self.element
|
|
)
|
|
|
|
if not resort_success:
|
|
for msg in disorder_warnings:
|
|
self.city.notice(msg, self.element)
|
|
for msg in disorder_errors:
|
|
self.city.error(msg, self.element)
|
|
|
|
def try_resort_stops(self):
|
|
"""Precondition: self.city.recovery_data is not None.
|
|
Return success of station order recovering."""
|
|
self_stops = {} # station name => RouteStop
|
|
for stop in self.stops:
|
|
station = stop.stoparea.station
|
|
stop_name = station.name
|
|
if stop_name == "?" and station.int_name:
|
|
stop_name = station.int_name
|
|
# We won't programmatically recover routes with repeating stations:
|
|
# such cases are rare and deserves manual verification
|
|
if stop_name in self_stops:
|
|
return False
|
|
self_stops[stop_name] = stop
|
|
|
|
route_id = (self.colour, self.ref)
|
|
if route_id not in self.city.recovery_data:
|
|
return False
|
|
|
|
stop_names = list(self_stops.keys())
|
|
suitable_itineraries = []
|
|
for itinerary in self.city.recovery_data[route_id]:
|
|
itinerary_stop_names = [
|
|
stop["name"] for stop in itinerary["stations"]
|
|
]
|
|
if not (
|
|
len(stop_names) == len(itinerary_stop_names)
|
|
and sorted(stop_names) == sorted(itinerary_stop_names)
|
|
):
|
|
continue
|
|
big_station_displacement = False
|
|
for it_stop in itinerary["stations"]:
|
|
name = it_stop["name"]
|
|
it_stop_center = it_stop["center"]
|
|
self_stop_center = self_stops[name].stoparea.station.center
|
|
if (
|
|
distance(it_stop_center, self_stop_center)
|
|
> DISPLACEMENT_TOLERANCE
|
|
):
|
|
big_station_displacement = True
|
|
break
|
|
if not big_station_displacement:
|
|
suitable_itineraries.append(itinerary)
|
|
|
|
if len(suitable_itineraries) == 0:
|
|
return False
|
|
elif len(suitable_itineraries) == 1:
|
|
matching_itinerary = suitable_itineraries[0]
|
|
else:
|
|
from_tag = self.element["tags"].get("from")
|
|
to_tag = self.element["tags"].get("to")
|
|
if not from_tag and not to_tag:
|
|
return False
|
|
matching_itineraries = [
|
|
itin
|
|
for itin in suitable_itineraries
|
|
if from_tag
|
|
and itin["from"] == from_tag
|
|
or to_tag
|
|
and itin["to"] == to_tag
|
|
]
|
|
if len(matching_itineraries) != 1:
|
|
return False
|
|
matching_itinerary = matching_itineraries[0]
|
|
self.stops = [
|
|
self_stops[stop["name"]] for stop in matching_itinerary["stations"]
|
|
]
|
|
return True
|
|
|
|
def __len__(self):
|
|
return len(self.stops)
|
|
|
|
def __getitem__(self, i):
|
|
return self.stops[i]
|
|
|
|
def __iter__(self):
|
|
return iter(self.stops)
|
|
|
|
def __repr__(self):
|
|
return (
|
|
"Route(id={}, mode={}, ref={}, name={}, network={}, interval={}, "
|
|
"circular={}, num_stops={}, line_length={} m, from={}, to={}"
|
|
).format(
|
|
self.id,
|
|
self.mode,
|
|
self.ref,
|
|
self.name,
|
|
self.network,
|
|
self.interval,
|
|
self.is_circular,
|
|
len(self.stops),
|
|
self.stops[-1].distance,
|
|
self.stops[0],
|
|
self.stops[-1],
|
|
)
|
|
|
|
|
|
class RouteMaster:
|
|
def __init__(self, master=None):
|
|
self.routes = []
|
|
self.best = None
|
|
self.id = el_id(master)
|
|
self.has_master = master is not None
|
|
self.interval_from_master = False
|
|
if master:
|
|
self.ref = master["tags"].get(
|
|
"ref", master["tags"].get("name", None)
|
|
)
|
|
try:
|
|
self.colour = normalize_colour(
|
|
master["tags"].get("colour", None)
|
|
)
|
|
except ValueError:
|
|
self.colour = None
|
|
try:
|
|
self.infill = normalize_colour(
|
|
master["tags"].get("colour:infill", None)
|
|
)
|
|
except ValueError:
|
|
self.infill = None
|
|
self.network = Route.get_network(master)
|
|
self.mode = master["tags"].get(
|
|
"route_master", None
|
|
) # This tag is required, but okay
|
|
self.name = master["tags"].get("name", None)
|
|
self.interval = Route.get_interval(master["tags"])
|
|
self.interval_from_master = self.interval is not None
|
|
else:
|
|
self.ref = None
|
|
self.colour = None
|
|
self.infill = None
|
|
self.network = None
|
|
self.mode = None
|
|
self.name = None
|
|
self.interval = None
|
|
|
|
def add(self, route, city):
|
|
if not self.network:
|
|
self.network = route.network
|
|
elif route.network and route.network != self.network:
|
|
city.error(
|
|
'Route has different network ("{}") from master "{}"'.format(
|
|
route.network, self.network
|
|
),
|
|
route.element,
|
|
)
|
|
|
|
if not self.colour:
|
|
self.colour = route.colour
|
|
elif route.colour and route.colour != self.colour:
|
|
city.notice(
|
|
'Route "{}" has different colour from master "{}"'.format(
|
|
route.colour, self.colour
|
|
),
|
|
route.element,
|
|
)
|
|
|
|
if not self.infill:
|
|
self.infill = route.infill
|
|
elif route.infill and route.infill != self.infill:
|
|
city.notice(
|
|
(
|
|
f'Route "{route.infill}" has different infill colour '
|
|
f'from master "{self.infill}"'
|
|
),
|
|
route.element,
|
|
)
|
|
|
|
if not self.ref:
|
|
self.ref = route.ref
|
|
elif route.ref != self.ref:
|
|
city.notice(
|
|
'Route "{}" has different ref from master "{}"'.format(
|
|
route.ref, self.ref
|
|
),
|
|
route.element,
|
|
)
|
|
|
|
if not self.name:
|
|
self.name = route.name
|
|
|
|
if not self.mode:
|
|
self.mode = route.mode
|
|
elif route.mode != self.mode:
|
|
city.error(
|
|
"Incompatible PT mode: master has {} and route has {}".format(
|
|
self.mode, route.mode
|
|
),
|
|
route.element,
|
|
)
|
|
return
|
|
|
|
if not self.interval_from_master and route.interval:
|
|
if not self.interval:
|
|
self.interval = route.interval
|
|
else:
|
|
self.interval = min(self.interval, route.interval)
|
|
|
|
if not self.has_master and (not self.id or self.id > route.id):
|
|
self.id = route.id
|
|
|
|
self.routes.append(route)
|
|
if not self.best or len(route.stops) > len(self.best.stops):
|
|
self.best = route
|
|
|
|
def stop_areas(self):
|
|
"""Returns a list of all stations on all route variants."""
|
|
seen_ids = set()
|
|
for route in self.routes:
|
|
for stop in route:
|
|
st = stop.stoparea
|
|
if st.id not in seen_ids:
|
|
seen_ids.add(st.id)
|
|
yield st
|
|
|
|
def __len__(self):
|
|
return len(self.routes)
|
|
|
|
def __getitem__(self, i):
|
|
return self.routes[i]
|
|
|
|
def __iter__(self):
|
|
return iter(self.routes)
|
|
|
|
def __repr__(self):
|
|
return (
|
|
f"RouteMaster(id={self.id}, mode={self.mode}, ref={self.ref}, "
|
|
f"name={self.name}, network={self.network}, "
|
|
f"num_variants={len(self.routes)}"
|
|
)
|
|
|
|
|
|
class City:
|
|
route_class = Route
|
|
|
|
def __init__(self, city_data, overground=False):
|
|
self.validate_called = False
|
|
self.errors = []
|
|
self.warnings = []
|
|
self.notices = []
|
|
self.try_fill_int_attribute(city_data, "id")
|
|
self.name = city_data["name"]
|
|
self.country = city_data["country"]
|
|
self.continent = city_data["continent"]
|
|
self.overground = overground
|
|
if not overground:
|
|
self.try_fill_int_attribute(city_data, "num_stations")
|
|
self.try_fill_int_attribute(city_data, "num_lines", "0")
|
|
self.try_fill_int_attribute(city_data, "num_light_lines", "0")
|
|
self.try_fill_int_attribute(city_data, "num_interchanges", "0")
|
|
else:
|
|
self.try_fill_int_attribute(city_data, "num_tram_lines", "0")
|
|
self.try_fill_int_attribute(city_data, "num_trolleybus_lines", "0")
|
|
self.try_fill_int_attribute(city_data, "num_bus_lines", "0")
|
|
self.try_fill_int_attribute(city_data, "num_other_lines", "0")
|
|
|
|
# Acquiring list of networks and modes
|
|
networks = (
|
|
None
|
|
if not city_data["networks"]
|
|
else city_data["networks"].split(":")
|
|
)
|
|
if not networks or len(networks[-1]) == 0:
|
|
self.networks = []
|
|
else:
|
|
self.networks = set(
|
|
filter(None, [x.strip() for x in networks[-1].split(";")])
|
|
)
|
|
if not networks or len(networks) < 2 or len(networks[0]) == 0:
|
|
if self.overground:
|
|
self.modes = DEFAULT_MODES_OVERGROUND
|
|
else:
|
|
self.modes = DEFAULT_MODES_RAPID
|
|
else:
|
|
self.modes = set([x.strip() for x in networks[0].split(",")])
|
|
|
|
# Reversing bbox so it is (xmin, ymin, xmax, ymax)
|
|
bbox = city_data["bbox"].split(",")
|
|
if len(bbox) == 4:
|
|
self.bbox = [float(bbox[i]) for i in (1, 0, 3, 2)]
|
|
else:
|
|
self.bbox = None
|
|
|
|
self.elements = {} # Dict el_id → el
|
|
self.stations = defaultdict(list) # Dict el_id → list of StopAreas
|
|
self.routes = {} # Dict route_master_ref → RouteMaster
|
|
self.masters = {} # Dict el_id of route → route_master
|
|
self.stop_areas = defaultdict(
|
|
list
|
|
) # El_id → list of stop_area elements it belongs to
|
|
self.transfers = [] # List of lists of stop areas
|
|
self.station_ids = set() # Set of stations' uid
|
|
self.stops_and_platforms = set() # Set of stops and platforms el_id
|
|
self.recovery_data = None
|
|
|
|
def try_fill_int_attribute(
|
|
self, city_data: dict, attr: str, default: str | None = None
|
|
) -> None:
|
|
"""Try to convert string value to int. Conversion is considered
|
|
to fail if one of the following is true:
|
|
* attr is not empty and data type casting fails;
|
|
* attr is empty and no default value is given.
|
|
In such cases the city is marked as bad by adding an error
|
|
to the city validation log.
|
|
"""
|
|
attr_value = city_data[attr]
|
|
if not attr_value and default is not None:
|
|
attr_value = default
|
|
|
|
try:
|
|
attr_int = int(attr_value)
|
|
except ValueError:
|
|
print_value = (
|
|
f"{city_data[attr]}" if city_data[attr] else "<empty>"
|
|
)
|
|
self.error(
|
|
f"Configuration error: wrong value for {attr}: {print_value}"
|
|
)
|
|
setattr(self, attr, 0)
|
|
else:
|
|
setattr(self, attr, attr_int)
|
|
|
|
@staticmethod
|
|
def log_message(message, el):
|
|
if el:
|
|
tags = el.get("tags", {})
|
|
message += ' ({} {}, "{}")'.format(
|
|
el["type"],
|
|
el.get("id", el.get("ref")),
|
|
tags.get("name", tags.get("ref", "")),
|
|
)
|
|
return message
|
|
|
|
def notice(self, message, el=None):
|
|
"""This type of message may point to a potential problem."""
|
|
msg = City.log_message(message, el)
|
|
self.notices.append(msg)
|
|
|
|
def warn(self, message, el=None):
|
|
"""A warning is definitely a problem but is doesn't prevent
|
|
from building a routing file and doesn't invalidate the city.
|
|
"""
|
|
msg = City.log_message(message, el)
|
|
self.warnings.append(msg)
|
|
|
|
def error(self, message, el=None):
|
|
"""Error if a critical problem that invalidates the city"""
|
|
msg = City.log_message(message, el)
|
|
self.errors.append(msg)
|
|
|
|
def contains(self, el):
|
|
center = el_center(el)
|
|
if center:
|
|
return (
|
|
self.bbox[0] <= center[1] <= self.bbox[2]
|
|
and self.bbox[1] <= center[0] <= self.bbox[3]
|
|
)
|
|
return False
|
|
|
|
def add(self, el):
|
|
if el["type"] == "relation" and "members" not in el:
|
|
return
|
|
|
|
self.elements[el_id(el)] = el
|
|
if not (el["type"] == "relation" and "tags" in el):
|
|
return
|
|
|
|
relation_type = el["tags"].get("type")
|
|
if relation_type == "route_master":
|
|
for m in el["members"]:
|
|
if m["type"] != "relation":
|
|
continue
|
|
|
|
if el_id(m) in self.masters:
|
|
self.error("Route in two route_masters", m)
|
|
self.masters[el_id(m)] = el
|
|
|
|
elif el["tags"].get("public_transport") == "stop_area":
|
|
if relation_type != "public_transport":
|
|
self.warn(
|
|
"stop_area relation with "
|
|
f"type={relation_type}, needed type=public_transport",
|
|
el,
|
|
)
|
|
return
|
|
|
|
warned_about_duplicates = False
|
|
for m in el["members"]:
|
|
stop_areas = self.stop_areas[el_id(m)]
|
|
if el in stop_areas and not warned_about_duplicates:
|
|
self.warn("Duplicate element in a stop area", el)
|
|
warned_about_duplicates = True
|
|
else:
|
|
stop_areas.append(el)
|
|
|
|
def make_transfer(self, sag):
|
|
transfer = set()
|
|
for m in sag["members"]:
|
|
k = el_id(m)
|
|
el = self.elements.get(k)
|
|
if not el:
|
|
# A sag member may validly not belong to the city while
|
|
# the sag does - near the city bbox boundary
|
|
continue
|
|
if "tags" not in el:
|
|
self.warn(
|
|
"An untagged object {} in a stop_area_group".format(k), sag
|
|
)
|
|
continue
|
|
if (
|
|
el["type"] != "relation"
|
|
or el["tags"].get("type") != "public_transport"
|
|
or el["tags"].get("public_transport") != "stop_area"
|
|
):
|
|
continue
|
|
if k in self.stations:
|
|
stoparea = self.stations[k][0]
|
|
transfer.add(stoparea)
|
|
if stoparea.transfer:
|
|
# TODO: properly process such cases.
|
|
# Counterexample 1: Paris,
|
|
# Châtelet subway station <->
|
|
# "Châtelet - Les Halles" railway station <->
|
|
# Les Halles subway station
|
|
# Counterexample 2: Saint-Petersburg, transfers
|
|
# Витебский вокзал <->
|
|
# Пушкинская <->
|
|
# Звенигородская
|
|
self.warn(
|
|
"Stop area {} belongs to multiple interchanges".format(
|
|
k
|
|
)
|
|
)
|
|
stoparea.transfer = el_id(sag)
|
|
if len(transfer) > 1:
|
|
self.transfers.append(transfer)
|
|
|
|
def extract_routes(self):
|
|
# Extract stations
|
|
processed_stop_areas = set()
|
|
for el in self.elements.values():
|
|
if Station.is_station(el, self.modes):
|
|
# See PR https://github.com/mapsme/subways/pull/98
|
|
if (
|
|
el["type"] == "relation"
|
|
and el["tags"].get("type") != "multipolygon"
|
|
):
|
|
rel_type = el["tags"].get("type")
|
|
self.warn(
|
|
"A railway station cannot be a relation of type "
|
|
f"{rel_type}",
|
|
el,
|
|
)
|
|
continue
|
|
st = Station(el, self)
|
|
self.station_ids.add(st.id)
|
|
if st.id in self.stop_areas:
|
|
stations = []
|
|
for sa in self.stop_areas[st.id]:
|
|
stations.append(StopArea(st, self, sa))
|
|
else:
|
|
stations = [StopArea(st, self)]
|
|
|
|
for station in stations:
|
|
if station.id not in processed_stop_areas:
|
|
processed_stop_areas.add(station.id)
|
|
for st_el in station.get_elements():
|
|
self.stations[st_el].append(station)
|
|
|
|
# Check that stops and platforms belong to
|
|
# a single stop_area
|
|
for sp in station.stops | station.platforms:
|
|
if sp in self.stops_and_platforms:
|
|
self.notice(
|
|
f"A stop or a platform {sp} belongs to "
|
|
"multiple stop areas, might be correct"
|
|
)
|
|
else:
|
|
self.stops_and_platforms.add(sp)
|
|
|
|
# Extract routes
|
|
for el in self.elements.values():
|
|
if Route.is_route(el, self.modes):
|
|
if el["tags"].get("access") in ("no", "private"):
|
|
continue
|
|
route_id = el_id(el)
|
|
master = self.masters.get(route_id, None)
|
|
if self.networks:
|
|
network = Route.get_network(el)
|
|
if master:
|
|
master_network = Route.get_network(master)
|
|
else:
|
|
master_network = None
|
|
if (
|
|
network not in self.networks
|
|
and master_network not in self.networks
|
|
):
|
|
continue
|
|
|
|
route = self.route_class(el, self, master)
|
|
if not route.stops:
|
|
self.warn("Route has no stops", el)
|
|
continue
|
|
elif len(route.stops) == 1:
|
|
self.warn("Route has only one stop", el)
|
|
continue
|
|
|
|
k = el_id(master) if master else route.ref
|
|
if k not in self.routes:
|
|
self.routes[k] = RouteMaster(master)
|
|
self.routes[k].add(route, self)
|
|
|
|
# Sometimes adding a route to a newly initialized RouteMaster
|
|
# can fail
|
|
if len(self.routes[k]) == 0:
|
|
del self.routes[k]
|
|
|
|
# And while we're iterating over relations, find interchanges
|
|
if (
|
|
el["type"] == "relation"
|
|
and el.get("tags", {}).get("public_transport", None)
|
|
== "stop_area_group"
|
|
):
|
|
self.make_transfer(el)
|
|
|
|
# Filter transfers, leaving only stations that belong to routes
|
|
used_stop_areas = set()
|
|
for rmaster in self.routes.values():
|
|
for route in rmaster:
|
|
used_stop_areas.update([s.stoparea for s in route.stops])
|
|
new_transfers = []
|
|
for transfer in self.transfers:
|
|
new_tr = [s for s in transfer if s in used_stop_areas]
|
|
if len(new_tr) > 1:
|
|
new_transfers.append(new_tr)
|
|
self.transfers = new_transfers
|
|
|
|
def __iter__(self):
|
|
return iter(self.routes.values())
|
|
|
|
@property
|
|
def is_good(self):
|
|
if not (self.errors or self.validate_called):
|
|
raise RuntimeError(
|
|
"You mustn't refer to City.is_good property before calling "
|
|
"the City.validate() method unless an error already occurred."
|
|
)
|
|
return len(self.errors) == 0
|
|
|
|
def get_validation_result(self):
|
|
result = {
|
|
"name": self.name,
|
|
"country": self.country,
|
|
"continent": self.continent,
|
|
"stations_found": getattr(self, "found_stations", 0),
|
|
"transfers_found": getattr(self, "found_interchanges", 0),
|
|
"unused_entrances": getattr(self, "unused_entrances", 0),
|
|
"networks": getattr(self, "found_networks", 0),
|
|
}
|
|
if not self.overground:
|
|
result.update(
|
|
{
|
|
"subwayl_expected": getattr(self, "num_lines", 0),
|
|
"lightrl_expected": getattr(self, "num_light_lines", 0),
|
|
"subwayl_found": getattr(self, "found_lines", 0),
|
|
"lightrl_found": getattr(self, "found_light_lines", 0),
|
|
"stations_expected": getattr(self, "num_stations", 0),
|
|
"transfers_expected": getattr(self, "num_interchanges", 0),
|
|
}
|
|
)
|
|
else:
|
|
result.update(
|
|
{
|
|
"stations_expected": 0,
|
|
"transfers_expected": 0,
|
|
"busl_expected": getattr(self, "num_bus_lines", 0),
|
|
"trolleybusl_expected": getattr(
|
|
self, "num_trolleybus_lines", 0
|
|
),
|
|
"traml_expected": getattr(self, "num_tram_lines", 0),
|
|
"otherl_expected": getattr(self, "num_other_lines", 0),
|
|
"busl_found": getattr(self, "found_bus_lines", 0),
|
|
"trolleybusl_found": getattr(
|
|
self, "found_trolleybus_lines", 0
|
|
),
|
|
"traml_found": getattr(self, "found_tram_lines", 0),
|
|
"otherl_found": getattr(self, "found_other_lines", 0),
|
|
}
|
|
)
|
|
result["warnings"] = self.warnings
|
|
result["errors"] = self.errors
|
|
result["notices"] = self.notices
|
|
return result
|
|
|
|
def count_unused_entrances(self):
|
|
global used_entrances
|
|
stop_areas = set()
|
|
for el in self.elements.values():
|
|
if (
|
|
el["type"] == "relation"
|
|
and "tags" in el
|
|
and el["tags"].get("public_transport") == "stop_area"
|
|
and "members" in el
|
|
):
|
|
stop_areas.update([el_id(m) for m in el["members"]])
|
|
unused = []
|
|
not_in_sa = []
|
|
for el in self.elements.values():
|
|
if (
|
|
el["type"] == "node"
|
|
and "tags" in el
|
|
and el["tags"].get("railway") == "subway_entrance"
|
|
):
|
|
i = el_id(el)
|
|
if i in self.stations:
|
|
used_entrances.add(i)
|
|
if i not in stop_areas:
|
|
not_in_sa.append(i)
|
|
if i not in self.stations:
|
|
unused.append(i)
|
|
self.unused_entrances = len(unused)
|
|
self.entrances_not_in_stop_areas = len(not_in_sa)
|
|
if unused:
|
|
self.notice(
|
|
f"{len(unused)} subway entrances are not connected to a "
|
|
f"station: {format_elid_list(unused)}"
|
|
)
|
|
if not_in_sa:
|
|
self.notice(
|
|
f"{len(not_in_sa)} subway entrances are not in stop_area "
|
|
f"relations: {format_elid_list(not_in_sa)}"
|
|
)
|
|
|
|
def check_return_routes(self, rmaster):
|
|
variants = {}
|
|
have_return = set()
|
|
for variant in rmaster:
|
|
if len(variant) < 2:
|
|
continue
|
|
# Using transfer ids because a train can arrive at different
|
|
# stations within a transfer. But disregard transfer that may give
|
|
# an impression of a circular route (for example,
|
|
# Simonis / Elisabeth station and route 2 in Brussels)
|
|
if variant[0].stoparea.transfer == variant[-1].stoparea.transfer:
|
|
t = (variant[0].stoparea.id, variant[-1].stoparea.id)
|
|
else:
|
|
t = (
|
|
variant[0].stoparea.transfer or variant[0].stoparea.id,
|
|
variant[-1].stoparea.transfer or variant[-1].stoparea.id,
|
|
)
|
|
if t in variants:
|
|
continue
|
|
variants[t] = variant.element
|
|
tr = (t[1], t[0])
|
|
if tr in variants:
|
|
have_return.add(t)
|
|
have_return.add(tr)
|
|
|
|
if len(variants) == 0:
|
|
self.error(
|
|
"An empty route master {}. Please set construction:route "
|
|
"if it is under construction".format(rmaster.id)
|
|
)
|
|
elif len(variants) == 1:
|
|
log_function = (
|
|
self.error if not rmaster.best.is_circular else self.notice
|
|
)
|
|
log_function(
|
|
"Only one route in route_master. "
|
|
"Please check if it needs a return route",
|
|
rmaster.best.element,
|
|
)
|
|
else:
|
|
for t, rel in variants.items():
|
|
if t not in have_return:
|
|
self.notice("Route does not have a return direction", rel)
|
|
|
|
def validate_lines(self):
|
|
self.found_light_lines = len(
|
|
[x for x in self.routes.values() if x.mode != "subway"]
|
|
)
|
|
self.found_lines = len(self.routes) - self.found_light_lines
|
|
if self.found_lines != self.num_lines:
|
|
self.error(
|
|
"Found {} subway lines, expected {}".format(
|
|
self.found_lines, self.num_lines
|
|
)
|
|
)
|
|
if self.found_light_lines != self.num_light_lines:
|
|
self.error(
|
|
"Found {} light rail lines, expected {}".format(
|
|
self.found_light_lines, self.num_light_lines
|
|
)
|
|
)
|
|
|
|
def validate_overground_lines(self):
|
|
self.found_tram_lines = len(
|
|
[x for x in self.routes.values() if x.mode == "tram"]
|
|
)
|
|
self.found_bus_lines = len(
|
|
[x for x in self.routes.values() if x.mode == "bus"]
|
|
)
|
|
self.found_trolleybus_lines = len(
|
|
[x for x in self.routes.values() if x.mode == "trolleybus"]
|
|
)
|
|
self.found_other_lines = len(
|
|
[
|
|
x
|
|
for x in self.routes.values()
|
|
if x.mode not in ("bus", "trolleybus", "tram")
|
|
]
|
|
)
|
|
if self.found_tram_lines != self.num_tram_lines:
|
|
log_function = (
|
|
self.error if self.found_tram_lines == 0 else self.notice
|
|
)
|
|
log_function(
|
|
"Found {} tram lines, expected {}".format(
|
|
self.found_tram_lines, self.num_tram_lines
|
|
),
|
|
)
|
|
|
|
def validate(self):
|
|
networks = Counter()
|
|
self.found_stations = 0
|
|
unused_stations = set(self.station_ids)
|
|
for rmaster in self.routes.values():
|
|
networks[str(rmaster.network)] += 1
|
|
if not self.overground:
|
|
self.check_return_routes(rmaster)
|
|
route_stations = set()
|
|
for sa in rmaster.stop_areas():
|
|
route_stations.add(sa.transfer or sa.id)
|
|
unused_stations.discard(sa.station.id)
|
|
self.found_stations += len(route_stations)
|
|
if unused_stations:
|
|
self.unused_stations = len(unused_stations)
|
|
self.notice(
|
|
"{} unused stations: {}".format(
|
|
self.unused_stations, format_elid_list(unused_stations)
|
|
)
|
|
)
|
|
self.count_unused_entrances()
|
|
self.found_interchanges = len(self.transfers)
|
|
|
|
if self.overground:
|
|
self.validate_overground_lines()
|
|
else:
|
|
self.validate_lines()
|
|
|
|
if self.found_stations != self.num_stations:
|
|
msg = "Found {} stations in routes, expected {}".format(
|
|
self.found_stations, self.num_stations
|
|
)
|
|
log_function = (
|
|
self.error
|
|
if self.num_stations > 0
|
|
and not (
|
|
0
|
|
<= (self.num_stations - self.found_stations)
|
|
/ self.num_stations
|
|
<= ALLOWED_STATIONS_MISMATCH
|
|
)
|
|
else self.warn
|
|
)
|
|
log_function(msg)
|
|
|
|
if self.found_interchanges != self.num_interchanges:
|
|
msg = "Found {} interchanges, expected {}".format(
|
|
self.found_interchanges, self.num_interchanges
|
|
)
|
|
log_function = (
|
|
self.error
|
|
if self.num_interchanges != 0
|
|
and not (
|
|
(self.num_interchanges - self.found_interchanges)
|
|
/ self.num_interchanges
|
|
<= ALLOWED_TRANSFERS_MISMATCH
|
|
)
|
|
else self.warn
|
|
)
|
|
log_function(msg)
|
|
|
|
self.found_networks = len(networks)
|
|
if len(networks) > max(1, len(self.networks)):
|
|
n_str = "; ".join(
|
|
["{} ({})".format(k, v) for k, v in networks.items()]
|
|
)
|
|
self.notice("More than one network: {}".format(n_str))
|
|
|
|
self.validate_called = True
|
|
|
|
def calculate_distances(self) -> None:
|
|
for route_master in self:
|
|
for route in route_master:
|
|
route.calculate_distances()
|
|
|
|
|
|
def find_transfers(elements, cities):
|
|
transfers = []
|
|
stop_area_groups = []
|
|
for el in elements:
|
|
if (
|
|
el["type"] == "relation"
|
|
and "members" in el
|
|
and el.get("tags", {}).get("public_transport") == "stop_area_group"
|
|
):
|
|
stop_area_groups.append(el)
|
|
|
|
# StopArea.id uniquely identifies a StopArea. We must ensure StopArea
|
|
# uniqueness since one stop_area relation may result in
|
|
# several StopArea instances at inter-city interchanges.
|
|
stop_area_ids = defaultdict(set) # el_id -> set of StopArea.id
|
|
stop_area_objects = dict() # StopArea.id -> one of StopArea instances
|
|
for city in cities:
|
|
for el, st in city.stations.items():
|
|
stop_area_ids[el].update(sa.id for sa in st)
|
|
stop_area_objects.update((sa.id, sa) for sa in st)
|
|
|
|
for sag in stop_area_groups:
|
|
transfer = set()
|
|
for m in sag["members"]:
|
|
k = el_id(m)
|
|
if k not in stop_area_ids:
|
|
continue
|
|
transfer.update(
|
|
stop_area_objects[sa_id] for sa_id in stop_area_ids[k]
|
|
)
|
|
if len(transfer) > 1:
|
|
transfers.append(transfer)
|
|
return transfers
|
|
|
|
|
|
def get_unused_entrances_geojson(elements):
|
|
global used_entrances
|
|
features = []
|
|
for el in elements:
|
|
if (
|
|
el["type"] == "node"
|
|
and "tags" in el
|
|
and el["tags"].get("railway") == "subway_entrance"
|
|
):
|
|
if el_id(el) not in used_entrances:
|
|
geometry = {"type": "Point", "coordinates": el_center(el)}
|
|
properties = {
|
|
k: v
|
|
for k, v in el["tags"].items()
|
|
if k not in ("railway", "entrance")
|
|
}
|
|
features.append(
|
|
{
|
|
"type": "Feature",
|
|
"geometry": geometry,
|
|
"properties": properties,
|
|
}
|
|
)
|
|
return {"type": "FeatureCollection", "features": features}
|