import math import re from collections import Counter, defaultdict from css_colours import normalize_colour MAX_DISTANCE_TO_ENTRANCES = 300 # in meters MAX_DISTANCE_STOP_TO_LINE = 50 # in meters ALLOWED_STATIONS_MISMATCH = 0.02 # part of total station count ALLOWED_TRANSFERS_MISMATCH = 0.07 # part of total interchanges count ALLOWED_ANGLE_BETWEEN_STOPS = 45 # in degrees DISALLOWED_ANGLE_BETWEEN_STOPS = 20 # in degrees # If an object was moved not too far compared to previous script run, # it is likely the same object DISPLACEMENT_TOLERANCE = 300 # in meters MODES_RAPID = set(("subway", "light_rail", "monorail", "train")) MODES_OVERGROUND = set(("tram", "bus", "trolleybus", "aerialway", "ferry")) DEFAULT_MODES_RAPID = set(("subway", "light_rail")) DEFAULT_MODES_OVERGROUND = set(("tram",)) # TODO: bus and trolleybus? ALL_MODES = MODES_RAPID | MODES_OVERGROUND RAILWAY_TYPES = set( ( "rail", "light_rail", "subway", "narrow_gauge", "funicular", "monorail", "tram", ) ) CONSTRUCTION_KEYS = ( "construction", "proposed", "construction:railway", "proposed:railway", ) used_entrances = set() START_END_TIMES_RE = re.compile(r".*?(\d{2}):(\d{2})-(\d{2}):(\d{2}).*") def get_start_end_times(opening_hours): """Very simplified method to parse OSM opening_hours tag. We simply take the first HH:MM-HH:MM substring which is the most probable opening hours interval for the most of weekdays. """ start_time, end_time = None, None m = START_END_TIMES_RE.match(opening_hours) if m: ints = tuple(map(int, m.groups())) start_time = (ints[0], ints[1]) end_time = (ints[2], ints[3]) return start_time, end_time def osm_interval_to_seconds(interval_str): """Convert to int an OSM value for 'interval'/'headway' tag which may be in these formats: HH:MM:SS, HH:MM, MM, M (https://wiki.openstreetmap.org/wiki/Key:interval#Format) """ hours, minutes, seconds = 0, 0, 0 semicolon_count = interval_str.count(":") try: if semicolon_count == 0: minutes = int(interval_str) elif semicolon_count == 1: hours, minutes = map(int, interval_str.split(":")) elif semicolon_count == 2: hours, minutes, seconds = map(int, interval_str.split(":")) else: return None except ValueError: return None return seconds + 60 * minutes + 60 * 60 * hours class CriticalValidationError(Exception): """Is thrown if an error occurs that prevents further validation of a city.""" def el_id(el): if not el: return None if "type" not in el: raise Exception("What is this element? {}".format(el)) return el["type"][0] + str(el.get("id", el.get("ref", ""))) def el_center(el): if not el: return None if "lat" in el: return (el["lon"], el["lat"]) elif "center" in el: return (el["center"]["lon"], el["center"]["lat"]) return None def distance(p1, p2): if p1 is None or p2 is None: raise Exception( "One of arguments to distance({}, {}) is None".format(p1, p2) ) dx = math.radians(p1[0] - p2[0]) * math.cos( 0.5 * math.radians(p1[1] + p2[1]) ) dy = math.radians(p1[1] - p2[1]) return 6378137 * math.sqrt(dx * dx + dy * dy) def is_near(p1, p2): return ( p1[0] - 1e-8 <= p2[0] <= p1[0] + 1e-8 and p1[1] - 1e-8 <= p2[1] <= p1[1] + 1e-8 ) def project_on_segment(p, p1, p2): """Given three points, return u - the position of projection of point p onto segment p1p2 regarding point p1 and (p2-p1) direction vector """ dp = (p2[0] - p1[0], p2[1] - p1[1]) d2 = dp[0] * dp[0] + dp[1] * dp[1] if d2 < 1e-14: return None u = ((p[0] - p1[0]) * dp[0] + (p[1] - p1[1]) * dp[1]) / d2 if not 0 <= u <= 1: return None return u def project_on_line(p, line): result = { # In the first approximation, position on rails is the index of the # closest vertex of line to the point p. Fractional value means that # the projected point lies on a segment between two vertices. # More than one value can occur if a route follows the same tracks # more than once. "positions_on_line": None, "projected_point": None, # (lon, lat) } if len(line) < 2: return result d_min = MAX_DISTANCE_STOP_TO_LINE * 5 closest_to_vertex = False # First, check vertices in the line for i, vertex in enumerate(line): d = distance(p, vertex) if d < d_min: result["positions_on_line"] = [i] result["projected_point"] = vertex d_min = d closest_to_vertex = True elif vertex == result["projected_point"]: # Repeated occurrence of the track vertex in line, like Oslo Line 5 result["positions_on_line"].append(i) # And then calculate distances to each segment for seg in range(len(line) - 1): # Check bbox for speed if not ( ( min(line[seg][0], line[seg + 1][0]) - MAX_DISTANCE_STOP_TO_LINE <= p[0] <= max(line[seg][0], line[seg + 1][0]) + MAX_DISTANCE_STOP_TO_LINE ) and ( min(line[seg][1], line[seg + 1][1]) - MAX_DISTANCE_STOP_TO_LINE <= p[1] <= max(line[seg][1], line[seg + 1][1]) + MAX_DISTANCE_STOP_TO_LINE ) ): continue u = project_on_segment(p, line[seg], line[seg + 1]) if u: projected_point = ( line[seg][0] + u * (line[seg + 1][0] - line[seg][0]), line[seg][1] + u * (line[seg + 1][1] - line[seg][1]), ) d = distance(p, projected_point) if d < d_min: result["positions_on_line"] = [seg + u] result["projected_point"] = projected_point d_min = d closest_to_vertex = False elif projected_point == result["projected_point"]: # Repeated occurrence of the track segment in line, # like Oslo Line 5 if not closest_to_vertex: result["positions_on_line"].append(seg + u) return result def find_segment(p, line, start_vertex=0): """Returns index of a segment and a position inside it.""" EPS = 1e-9 for seg in range(start_vertex, len(line) - 1): if is_near(p, line[seg]): return seg, 0.0 if line[seg][0] == line[seg + 1][0]: if not (p[0] - EPS <= line[seg][0] <= p[0] + EPS): continue px = None else: px = (p[0] - line[seg][0]) / (line[seg + 1][0] - line[seg][0]) if px is None or (0 <= px <= 1): if line[seg][1] == line[seg + 1][1]: if not (p[1] - EPS <= line[seg][1] <= p[1] + EPS): continue py = None else: py = (p[1] - line[seg][1]) / (line[seg + 1][1] - line[seg][1]) if py is None or (0 <= py <= 1): if py is None or px is None or (px - EPS <= py <= px + EPS): return seg, px or py return None, None def distance_on_line(p1, p2, line, start_vertex=0): """Calculates distance via line between projections of points p1 and p2. Returns a TUPLE of (d, vertex): d is the distance and vertex is the number of the second vertex, to continue calculations for the next point.""" line_len = len(line) seg1, pos1 = find_segment(p1, line, start_vertex) if seg1 is None: # logging.warn('p1 %s is not projected, st=%s', p1, start_vertex) return None seg2, pos2 = find_segment(p2, line, seg1) if seg2 is None: if line[0] == line[-1]: line = line + line[1:] seg2, pos2 = find_segment(p2, line, seg1) if seg2 is None: # logging.warn('p2 %s is not projected, st=%s', p2, start_vertex) return None if seg1 == seg2: return distance(line[seg1], line[seg1 + 1]) * abs(pos2 - pos1), seg1 if seg2 < seg1: # Should not happen raise Exception("Pos1 %s is after pos2 %s", seg1, seg2) d = 0 if pos1 < 1: d += distance(line[seg1], line[seg1 + 1]) * (1 - pos1) for i in range(seg1 + 1, seg2): d += distance(line[i], line[i + 1]) if pos2 > 0: d += distance(line[seg2], line[seg2 + 1]) * pos2 return d, seg2 % line_len def angle_between(p1, c, p2): a = round( abs( math.degrees( math.atan2(p1[1] - c[1], p1[0] - c[0]) - math.atan2(p2[1] - c[1], p2[0] - c[0]) ) ) ) return a if a <= 180 else 360 - a def format_elid_list(ids): msg = ", ".join(sorted(ids)[:20]) if len(ids) > 20: msg += ", ..." return msg class Station: @staticmethod def get_modes(el): mode = el["tags"].get("station") modes = [] if not mode else [mode] for m in ALL_MODES: if el["tags"].get(m) == "yes": modes.append(m) return set(modes) @staticmethod def is_station(el, modes): # public_transport=station is too ambiguous and unspecific to use, # so we expect for it to be backed by railway=station. if ( "tram" in modes and el.get("tags", {}).get("railway") == "tram_stop" ): return True if el.get("tags", {}).get("railway") not in ("station", "halt"): return False for k in CONSTRUCTION_KEYS: if k in el["tags"]: return False # Not checking for station=train, obviously if "train" not in modes and Station.get_modes(el).isdisjoint(modes): return False return True def __init__(self, el, city): """Call this with a railway=station node.""" if not Station.is_station(el, city.modes): raise Exception( "Station object should be instantiated from a station node. " "Got: {}".format(el) ) self.id = el_id(el) self.element = el self.modes = Station.get_modes(el) self.name = el["tags"].get("name", "?") self.int_name = el["tags"].get( "int_name", el["tags"].get("name:en", None) ) try: self.colour = normalize_colour(el["tags"].get("colour", None)) except ValueError as e: self.colour = None city.warn(str(e), el) self.center = el_center(el) if self.center is None: raise Exception("Could not find center of {}".format(el)) def __repr__(self): return "Station(id={}, modes={}, name={}, center={})".format( self.id, ",".join(self.modes), self.name, self.center ) class StopArea: @staticmethod def is_stop(el): if "tags" not in el: return False if el["tags"].get("railway") == "stop": return True if el["tags"].get("public_transport") == "stop_position": return True return False @staticmethod def is_platform(el): if "tags" not in el: return False if el["tags"].get("railway") in ("platform", "platform_edge"): return True if el["tags"].get("public_transport") == "platform": return True return False @staticmethod def is_track(el): if el["type"] != "way" or "tags" not in el: return False return el["tags"].get("railway") in RAILWAY_TYPES def __init__(self, station, city, stop_area=None): """Call this with a Station object.""" self.element = stop_area or station.element self.id = el_id(self.element) self.station = station self.stops = set() # set of el_ids of stop_positions self.platforms = set() # set of el_ids of platforms self.exits = set() # el_id of subway_entrance for leaving the platform self.entrances = set() # el_id of subway_entrance for entering # the platform self.center = None # lon, lat of the station centre point self.centers = {} # el_id -> (lon, lat) for all elements self.transfer = None # el_id of a transfer relation self.modes = station.modes self.name = station.name self.int_name = station.int_name self.colour = station.colour if stop_area: self.name = stop_area["tags"].get("name", self.name) self.int_name = stop_area["tags"].get( "int_name", stop_area["tags"].get("name:en", self.int_name) ) try: self.colour = ( normalize_colour(stop_area["tags"].get("colour")) or self.colour ) except ValueError as e: city.warn(str(e), stop_area) # If we have a stop area, add all elements from it warned_about_tracks = False for m in stop_area["members"]: k = el_id(m) m_el = city.elements.get(k) if m_el and "tags" in m_el: if Station.is_station(m_el, city.modes): if k != station.id: city.error( "Stop area has multiple stations", stop_area ) elif StopArea.is_stop(m_el): self.stops.add(k) elif StopArea.is_platform(m_el): self.platforms.add(k) elif m_el["tags"].get("railway") == "subway_entrance": if m_el["type"] != "node": city.warn("Subway entrance is not a node", m_el) if ( m_el["tags"].get("entrance") != "exit" and m["role"] != "exit_only" ): self.entrances.add(k) if ( m_el["tags"].get("entrance") != "entrance" and m["role"] != "entry_only" ): self.exits.add(k) elif StopArea.is_track(m_el): if not warned_about_tracks: city.warn( "Tracks in a stop_area relation", stop_area ) warned_about_tracks = True else: # Otherwise add nearby entrances center = station.center for c_el in city.elements.values(): if c_el.get("tags", {}).get("railway") == "subway_entrance": c_id = el_id(c_el) if c_id not in city.stop_areas: c_center = el_center(c_el) if ( c_center and distance(center, c_center) <= MAX_DISTANCE_TO_ENTRANCES ): if c_el["type"] != "node": city.warn( "Subway entrance is not a node", c_el ) etag = c_el["tags"].get("entrance") if etag != "exit": self.entrances.add(c_id) if etag != "entrance": self.exits.add(c_id) if self.exits and not self.entrances: city.warn( "Only exits for a station, no entrances", stop_area or station.element, ) if self.entrances and not self.exits: city.warn("No exits for a station", stop_area or station.element) for el in self.get_elements(): self.centers[el] = el_center(city.elements[el]) """Calculate the center point of the station. This algorithm cannot rely on a station node, since many stop_areas can share one. Basically it averages center points of all platforms and stop positions.""" if len(self.stops) + len(self.platforms) == 0: self.center = station.center else: self.center = [0, 0] for sp in self.stops | self.platforms: spc = self.centers[sp] for i in range(2): self.center[i] += spc[i] for i in range(2): self.center[i] /= len(self.stops) + len(self.platforms) def get_elements(self): result = set([self.id, self.station.id]) result.update(self.entrances) result.update(self.exits) result.update(self.stops) result.update(self.platforms) return result def __repr__(self): return ( f"StopArea(id={self.id}, name={self.name}, station={self.station}," f" transfer={self.transfer}, center={self.center})" ) class RouteStop: def __init__(self, stoparea): self.stoparea = stoparea self.stop = None # Stop position (lon, lat), possibly projected self.distance = 0 # In meters from the start of the route self.platform_entry = None # Platform el_id self.platform_exit = None # Platform el_id self.can_enter = False self.can_exit = False self.seen_stop = False self.seen_platform_entry = False self.seen_platform_exit = False self.seen_station = False @property def seen_platform(self): return self.seen_platform_entry or self.seen_platform_exit @staticmethod def get_actual_role(el, role, modes): if StopArea.is_stop(el): return "stop" elif StopArea.is_platform(el): return "platform" elif Station.is_station(el, modes): if "platform" in role: return "platform" else: return "stop" return None def add(self, member, relation, city): el = city.elements[el_id(member)] role = member["role"] if StopArea.is_stop(el): if "platform" in role: city.warn("Stop position in a platform role in a route", el) if el["type"] != "node": city.error("Stop position is not a node", el) self.stop = el_center(el) if "entry_only" not in role: self.can_exit = True if "exit_only" not in role: self.can_enter = True elif Station.is_station(el, city.modes): if el["type"] != "node": city.notice("Station in route is not a node", el) if not self.seen_stop and not self.seen_platform: self.stop = el_center(el) self.can_enter = True self.can_exit = True elif StopArea.is_platform(el): if "stop" in role: city.warn("Platform in a stop role in a route", el) if "exit_only" not in role: self.platform_entry = el_id(el) self.can_enter = True if "entry_only" not in role: self.platform_exit = el_id(el) self.can_exit = True if not self.seen_stop: self.stop = el_center(el) multiple_check = False actual_role = RouteStop.get_actual_role(el, role, city.modes) if actual_role == "platform": if role == "platform_entry_only": multiple_check = self.seen_platform_entry self.seen_platform_entry = True elif role == "platform_exit_only": multiple_check = self.seen_platform_exit self.seen_platform_exit = True else: if role != "platform" and "stop" not in role: city.warn( f'Platform "{el["tags"].get("name", "")}" ' f'({el_id(el)}) with invalid role "{role}" in route', relation, ) multiple_check = self.seen_platform self.seen_platform_entry = True self.seen_platform_exit = True elif actual_role == "stop": multiple_check = self.seen_stop self.seen_stop = True if multiple_check: log_function = city.error if actual_role == "stop" else city.notice log_function( f'Multiple {actual_role}s for a station "' f'{el["tags"].get("name", "")} ' f"({el_id(el)}) in a route relation", relation, ) def __repr__(self): return ( "RouteStop(stop={}, pl_entry={}, pl_exit={}, stoparea={})".format( self.stop, self.platform_entry, self.platform_exit, self.stoparea, ) ) class Route: """The longest route for a city with a unique ref.""" @staticmethod def is_route(el, modes): if ( el["type"] != "relation" or el.get("tags", {}).get("type") != "route" ): return False if "members" not in el: return False if el["tags"].get("route") not in modes: return False for k in CONSTRUCTION_KEYS: if k in el["tags"]: return False if "ref" not in el["tags"] and "name" not in el["tags"]: return False return True @staticmethod def get_network(relation): for k in ("network:metro", "network", "operator"): if k in relation["tags"]: return relation["tags"][k] return None @staticmethod def get_interval(tags): v = None for k in ("interval", "headway"): if k in tags: v = tags[k] break else: for kk in tags: if kk.startswith(k + ":"): v = tags[kk] break if not v: return None return osm_interval_to_seconds(v) def __init__(self, relation, city, master=None): assert Route.is_route( relation, city.modes ), f"The relation does not seem to be a route: {relation}" self.city = city self.element = relation self.id = el_id(relation) self.ref = None self.name = None self.mode = None self.colour = None self.infill = None self.network = None self.interval = None self.start_time = None self.end_time = None self.is_circular = False self.stops = [] # List of RouteStop # Would be a list of (lon, lat) for the longest stretch. Can be empty. self.tracks = None # Index of the fist stop that is located on/near the self.tracks self.first_stop_on_rails_index = None # Index of the last stop that is located on/near the self.tracks self.last_stop_on_rails_index = None self.process_tags(master) stop_position_elements = self.process_stop_members() self.process_tracks(stop_position_elements) def build_longest_line(self): line_nodes = set() last_track = [] track = [] warned_about_holes = False for m in self.element["members"]: el = self.city.elements.get(el_id(m), None) if not el or not StopArea.is_track(el): continue if "nodes" not in el or len(el["nodes"]) < 2: self.city.error("Cannot find nodes in a railway", el) continue nodes = ["n{}".format(n) for n in el["nodes"]] if m["role"] == "backward": nodes.reverse() line_nodes.update(nodes) if not track: is_first = True track.extend(nodes) else: new_segment = list(nodes) # copying if new_segment[0] == track[-1]: track.extend(new_segment[1:]) elif new_segment[-1] == track[-1]: track.extend(reversed(new_segment[:-1])) elif is_first and track[0] in ( new_segment[0], new_segment[-1], ): # We can reverse the track and try again track.reverse() if new_segment[0] == track[-1]: track.extend(new_segment[1:]) else: track.extend(reversed(new_segment[:-1])) else: # Store the track if it is long and clean it if not warned_about_holes: self.city.warn( "Hole in route rails near node {}".format( track[-1] ), self.element, ) warned_about_holes = True if len(track) > len(last_track): last_track = track track = [] is_first = False if len(track) > len(last_track): last_track = track # Remove duplicate points last_track = [ last_track[i] for i in range(0, len(last_track)) if i == 0 or last_track[i - 1] != last_track[i] ] return last_track, line_nodes def get_stop_projections(self): projected = [project_on_line(x.stop, self.tracks) for x in self.stops] def stop_near_tracks_criterion(stop_index: int): return ( projected[stop_index]["projected_point"] is not None and distance( self.stops[stop_index].stop, projected[stop_index]["projected_point"], ) <= MAX_DISTANCE_STOP_TO_LINE ) return projected, stop_near_tracks_criterion def project_stops_on_line(self): projected, stop_near_tracks_criterion = self.get_stop_projections() projected_stops_data = { "first_stop_on_rails_index": None, "last_stop_on_rails_index": None, "stops_on_longest_line": [], # list [{'route_stop': RouteStop, # 'coords': (lon, lat), # 'positions_on_rails': [] } } first_index = 0 while first_index < len(self.stops) and not stop_near_tracks_criterion( first_index ): first_index += 1 projected_stops_data["first_stop_on_rails_index"] = first_index last_index = len(self.stops) - 1 while last_index > projected_stops_data[ "first_stop_on_rails_index" ] and not stop_near_tracks_criterion(last_index): last_index -= 1 projected_stops_data["last_stop_on_rails_index"] = last_index for i, route_stop in enumerate(self.stops): if not first_index <= i <= last_index: continue if projected[i]["projected_point"] is None: self.city.error( 'Stop "{}" {} is nowhere near the tracks'.format( route_stop.stoparea.name, route_stop.stop ), self.element, ) else: stop_data = { "route_stop": route_stop, "coords": None, "positions_on_rails": None, } projected_point = projected[i]["projected_point"] # We've got two separate stations with a good stretch of # railway tracks between them. Put these on tracks. d = round(distance(route_stop.stop, projected_point)) if d > MAX_DISTANCE_STOP_TO_LINE: self.city.notice( 'Stop "{}" {} is {} meters from the tracks'.format( route_stop.stoparea.name, route_stop.stop, d ), self.element, ) else: stop_data["coords"] = projected_point stop_data["positions_on_rails"] = projected[i][ "positions_on_line" ] projected_stops_data["stops_on_longest_line"].append(stop_data) return projected_stops_data def calculate_distances(self): dist = 0 vertex = 0 for i, stop in enumerate(self.stops): if i > 0: direct = distance(stop.stop, self.stops[i - 1].stop) d_line = None if ( self.first_stop_on_rails_index <= i <= self.last_stop_on_rails_index ): d_line = distance_on_line( self.stops[i - 1].stop, stop.stop, self.tracks, vertex ) if d_line and direct - 10 <= d_line[0] <= direct * 2: vertex = d_line[1] dist += round(d_line[0]) else: dist += round(direct) stop.distance = dist def process_tags(self, master): relation = self.element master_tags = {} if not master else master["tags"] if "ref" not in relation["tags"] and "ref" not in master_tags: self.city.notice("Missing ref on a route", relation) self.ref = relation["tags"].get( "ref", master_tags.get("ref", relation["tags"].get("name", None)) ) self.name = relation["tags"].get("name", None) self.mode = relation["tags"]["route"] if ( "colour" not in relation["tags"] and "colour" not in master_tags and self.mode != "tram" ): self.city.notice("Missing colour on a route", relation) try: self.colour = normalize_colour( relation["tags"].get("colour", master_tags.get("colour", None)) ) except ValueError as e: self.colour = None self.city.warn(str(e), relation) try: self.infill = normalize_colour( relation["tags"].get( "colour:infill", master_tags.get("colour:infill", None) ) ) except ValueError as e: self.infill = None self.city.warn(str(e), relation) self.network = Route.get_network(relation) self.interval = Route.get_interval( relation["tags"] ) or Route.get_interval(master_tags) self.start_time, self.end_time = get_start_end_times( relation["tags"].get( "opening_hours", master_tags.get("opening_hours", "") ) ) if relation["tags"].get("public_transport:version") == "1": self.city.warn( "Public transport version is 1, which means the route " "is an unsorted pile of objects", relation, ) def process_stop_members(self): stations = set() # temporary for recording stations seen_stops = False seen_platforms = False repeat_pos = None stop_position_elements = [] for m in self.element["members"]: if "inactive" in m["role"]: continue k = el_id(m) if k in self.city.stations: st_list = self.city.stations[k] st = st_list[0] if len(st_list) > 1: self.city.error( f"Ambiguous station {st.name} in route. Please " "use stop_position or split interchange stations", self.element, ) el = self.city.elements[k] actual_role = RouteStop.get_actual_role( el, m["role"], self.city.modes ) if actual_role: if m["role"] and actual_role not in m["role"]: self.city.warn( "Wrong role '{}' for {} {}".format( m["role"], actual_role, k ), self.element, ) if repeat_pos is None: if not self.stops or st not in stations: stop = RouteStop(st) self.stops.append(stop) stations.add(st) elif self.stops[-1].stoparea.id == st.id: stop = self.stops[-1] else: # We've got a repeat if ( (seen_stops and seen_platforms) or ( actual_role == "stop" and not seen_platforms ) or ( actual_role == "platform" and not seen_stops ) ): # Circular route! stop = RouteStop(st) self.stops.append(stop) stations.add(st) else: repeat_pos = 0 if repeat_pos is not None: if repeat_pos >= len(self.stops): continue # Check that the type matches if (actual_role == "stop" and seen_stops) or ( actual_role == "platform" and seen_platforms ): self.city.error( 'Found an out-of-place {}: "{}" ({})'.format( actual_role, el["tags"].get("name", ""), k ), self.element, ) continue # Find the matching stop starting with index repeat_pos while ( repeat_pos < len(self.stops) and self.stops[repeat_pos].stoparea.id != st.id ): repeat_pos += 1 if repeat_pos >= len(self.stops): self.city.error( "Incorrect order of {}s at {}".format( actual_role, k ), self.element, ) continue stop = self.stops[repeat_pos] stop.add(m, self.element, self.city) if repeat_pos is None: seen_stops |= stop.seen_stop or stop.seen_station seen_platforms |= stop.seen_platform if StopArea.is_stop(el): stop_position_elements.append(el) continue if k not in self.city.elements: if "stop" in m["role"] or "platform" in m["role"]: raise CriticalValidationError( f"{m['role']} {m['type']} {m['ref']} for route " f"relation {self.element['id']} is not in the dataset" ) continue el = self.city.elements[k] if "tags" not in el: self.city.error( f"Untagged object {k} in a route", self.element ) continue is_under_construction = False for ck in CONSTRUCTION_KEYS: if ck in el["tags"]: self.city.warn( f"Under construction {m['role'] or 'feature'} {k} " "in route. Consider setting 'inactive' role or " "removing construction attributes", self.element, ) is_under_construction = True break if is_under_construction: continue if Station.is_station(el, self.city.modes): # A station may be not included into this route due to previous # 'stop area has multiple stations' error. No other error # message is needed. pass elif el["tags"].get("railway") in ("station", "halt"): self.city.error( "Missing station={} on a {}".format(self.mode, m["role"]), el, ) else: actual_role = RouteStop.get_actual_role( el, m["role"], self.city.modes ) if actual_role: self.city.error( f"{actual_role} {m['type']} {m['ref']} is not " "connected to a station in route", self.element, ) elif not StopArea.is_track(el): self.city.warn( "Unknown member type for {} {} in route".format( m["type"], m["ref"] ), self.element, ) return stop_position_elements def process_tracks(self, stop_position_elements): tracks, line_nodes = self.build_longest_line() for stop_el in stop_position_elements: stop_id = el_id(stop_el) if stop_id not in line_nodes: self.city.warn( 'Stop position "{}" ({}) is not on tracks'.format( stop_el["tags"].get("name", ""), stop_id ), self.element, ) # self.tracks would be a list of (lon, lat) for the longest stretch. # Can be empty. self.tracks = [el_center(self.city.elements.get(k)) for k in tracks] if ( None in self.tracks ): # usually, extending BBOX for the city is needed self.tracks = [] for n in filter(lambda x: x not in self.city.elements, tracks): self.city.warn( f"The dataset is missing the railway tracks node {n}", self.element, ) break if len(self.stops) > 1: self.is_circular = ( self.stops[0].stoparea == self.stops[-1].stoparea ) if ( self.is_circular and self.tracks and self.tracks[0] != self.tracks[-1] ): self.city.warn( "Non-closed rail sequence in a circular route", self.element, ) projected_stops_data = self.project_stops_on_line() self.check_and_recover_stops_order(projected_stops_data) self.apply_projected_stops_data(projected_stops_data) self.calculate_distances() def apply_projected_stops_data(self, projected_stops_data: dict) -> None: """Store better stop coordinates and indexes of first/last stops that lie on a continuous track line, to the instance attributes. """ for attr in ("first_stop_on_rails_index", "last_stop_on_rails_index"): setattr(self, attr, projected_stops_data[attr]) for stop_data in projected_stops_data["stops_on_longest_line"]: route_stop = stop_data["route_stop"] route_stop.positions_on_rails = stop_data["positions_on_rails"] if stop_coords := stop_data["coords"]: route_stop.stop = stop_coords def get_extended_tracks(self): """Amend tracks with points of leading/trailing self.stops that were not projected onto the longest tracks line. Return a new array. """ if self.first_stop_on_rails_index >= len(self.stops): tracks = [route_stop.stop for route_stop in self.stops] else: tracks = ( [ route_stop.stop for i, route_stop in enumerate(self.stops) if i < self.first_stop_on_rails_index ] + self.tracks + [ route_stop.stop for i, route_stop in enumerate(self.stops) if i > self.last_stop_on_rails_index ] ) return tracks def get_truncated_tracks(self, tracks): """Truncate leading/trailing segments of `tracks` param that are beyond the first and last stop locations. Return a new array. """ if self.is_circular: return tracks.copy() first_stop_location = find_segment(self.stops[0].stop, tracks, 0) last_stop_location = find_segment(self.stops[-1].stop, tracks, 0) if last_stop_location != (None, None): seg2, u2 = last_stop_location if u2 == 0.0: # Make seg2 the segment the last_stop_location is # at the middle or end of seg2 -= 1 # u2 = 1.0 if seg2 + 2 < len(tracks): tracks = tracks[0 : seg2 + 2] # noqa E203 tracks[-1] = self.stops[-1].stop if first_stop_location != (None, None): seg1, u1 = first_stop_location if u1 == 1.0: # Make seg1 the segment the first_stop_location is # at the beginning or middle of seg1 += 1 # u1 = 0.0 if seg1 > 0: tracks = tracks[seg1:] tracks[0] = self.stops[0].stop return tracks def get_tracks_geometry(self): tracks = self.get_extended_tracks() tracks = self.get_truncated_tracks(tracks) return tracks def check_stops_order_by_angle(self): disorder_warnings = [] disorder_errors = [] for si in range(len(self.stops) - 2): angle = angle_between( self.stops[si].stop, self.stops[si + 1].stop, self.stops[si + 2].stop, ) if angle < ALLOWED_ANGLE_BETWEEN_STOPS: msg = ( f'Angle between stops around "{self.stops[si + 1]}" ' f"is too narrow, {angle} degrees" ) if angle < DISALLOWED_ANGLE_BETWEEN_STOPS: disorder_errors.append(msg) else: disorder_warnings.append(msg) return disorder_warnings, disorder_errors def check_stops_order_on_tracks_direct(self, stop_sequence): """Checks stops order on tracks, following stop_sequence in direct order only. :param stop_sequence: list of dict{'route_stop', 'positions_on_rails', 'coords'} for RouteStops that belong to the longest contiguous sequence of tracks in a route. :return: error message on the first order violation or None. """ allowed_order_violations = 1 if self.is_circular else 0 max_position_on_rails = -1 for stop_data in stop_sequence: positions_on_rails = stop_data["positions_on_rails"] suitable_occurrence = 0 while ( suitable_occurrence < len(positions_on_rails) and positions_on_rails[suitable_occurrence] < max_position_on_rails ): suitable_occurrence += 1 if suitable_occurrence == len(positions_on_rails): if allowed_order_violations > 0: suitable_occurrence -= 1 allowed_order_violations -= 1 else: route_stop = stop_data["route_stop"] return 'Stops on tracks are unordered near "{}" {}'.format( route_stop.stoparea.name, route_stop.stop ) max_position_on_rails = positions_on_rails[suitable_occurrence] def check_stops_order_on_tracks(self, projected_stops_data): """Checks stops order on tracks, trying direct and reversed order of stops in the stop_sequence. :param projected_stops_data: info about RouteStops that belong to the longest contiguous sequence of tracks in a route. May be changed if tracks reversing is performed. :return: error message on the first order violation or None. """ error_message = self.check_stops_order_on_tracks_direct( projected_stops_data["stops_on_longest_line"] ) if error_message: error_message_reversed = self.check_stops_order_on_tracks_direct( reversed(projected_stops_data["stops_on_longest_line"]) ) if error_message_reversed is None: error_message = None self.city.warn( "Tracks seem to go in the opposite direction to stops", self.element, ) self.tracks.reverse() new_projected_stops_data = self.project_stops_on_line() projected_stops_data.update(new_projected_stops_data) return error_message def check_stops_order(self, projected_stops_data): ( angle_disorder_warnings, angle_disorder_errors, ) = self.check_stops_order_by_angle() disorder_on_tracks_error = self.check_stops_order_on_tracks( projected_stops_data ) disorder_warnings = angle_disorder_warnings disorder_errors = angle_disorder_errors if disorder_on_tracks_error: disorder_errors.append(disorder_on_tracks_error) return disorder_warnings, disorder_errors def check_and_recover_stops_order(self, projected_stops_data: dict): """ :param projected_stops_data: may change if we need to reverse tracks """ disorder_warnings, disorder_errors = self.check_stops_order( projected_stops_data ) if disorder_warnings or disorder_errors: resort_success = False if self.city.recovery_data: resort_success = self.try_resort_stops() if resort_success: for msg in disorder_warnings: self.city.notice(msg, self.element) for msg in disorder_errors: self.city.warn( "Fixed with recovery data: " + msg, self.element ) if not resort_success: for msg in disorder_warnings: self.city.notice(msg, self.element) for msg in disorder_errors: self.city.error(msg, self.element) def try_resort_stops(self): """Precondition: self.city.recovery_data is not None. Return success of station order recovering.""" self_stops = {} # station name => RouteStop for stop in self.stops: station = stop.stoparea.station stop_name = station.name if stop_name == "?" and station.int_name: stop_name = station.int_name # We won't programmatically recover routes with repeating stations: # such cases are rare and deserves manual verification if stop_name in self_stops: return False self_stops[stop_name] = stop route_id = (self.colour, self.ref) if route_id not in self.city.recovery_data: return False stop_names = list(self_stops.keys()) suitable_itineraries = [] for itinerary in self.city.recovery_data[route_id]: itinerary_stop_names = [ stop["name"] for stop in itinerary["stations"] ] if not ( len(stop_names) == len(itinerary_stop_names) and sorted(stop_names) == sorted(itinerary_stop_names) ): continue big_station_displacement = False for it_stop in itinerary["stations"]: name = it_stop["name"] it_stop_center = it_stop["center"] self_stop_center = self_stops[name].stoparea.station.center if ( distance(it_stop_center, self_stop_center) > DISPLACEMENT_TOLERANCE ): big_station_displacement = True break if not big_station_displacement: suitable_itineraries.append(itinerary) if len(suitable_itineraries) == 0: return False elif len(suitable_itineraries) == 1: matching_itinerary = suitable_itineraries[0] else: from_tag = self.element["tags"].get("from") to_tag = self.element["tags"].get("to") if not from_tag and not to_tag: return False matching_itineraries = [ itin for itin in suitable_itineraries if from_tag and itin["from"] == from_tag or to_tag and itin["to"] == to_tag ] if len(matching_itineraries) != 1: return False matching_itinerary = matching_itineraries[0] self.stops = [ self_stops[stop["name"]] for stop in matching_itinerary["stations"] ] return True def __len__(self): return len(self.stops) def __getitem__(self, i): return self.stops[i] def __iter__(self): return iter(self.stops) def __repr__(self): return ( "Route(id={}, mode={}, ref={}, name={}, network={}, interval={}, " "circular={}, num_stops={}, line_length={} m, from={}, to={}" ).format( self.id, self.mode, self.ref, self.name, self.network, self.interval, self.is_circular, len(self.stops), self.stops[-1].distance, self.stops[0], self.stops[-1], ) class RouteMaster: def __init__(self, master=None): self.routes = [] self.best = None self.id = el_id(master) self.has_master = master is not None self.interval_from_master = False if master: self.ref = master["tags"].get( "ref", master["tags"].get("name", None) ) try: self.colour = normalize_colour( master["tags"].get("colour", None) ) except ValueError: self.colour = None try: self.infill = normalize_colour( master["tags"].get("colour:infill", None) ) except ValueError: self.infill = None self.network = Route.get_network(master) self.mode = master["tags"].get( "route_master", None ) # This tag is required, but okay self.name = master["tags"].get("name", None) self.interval = Route.get_interval(master["tags"]) self.interval_from_master = self.interval is not None else: self.ref = None self.colour = None self.infill = None self.network = None self.mode = None self.name = None self.interval = None def add(self, route, city): if not self.network: self.network = route.network elif route.network and route.network != self.network: city.error( 'Route has different network ("{}") from master "{}"'.format( route.network, self.network ), route.element, ) if not self.colour: self.colour = route.colour elif route.colour and route.colour != self.colour: city.notice( 'Route "{}" has different colour from master "{}"'.format( route.colour, self.colour ), route.element, ) if not self.infill: self.infill = route.infill elif route.infill and route.infill != self.infill: city.notice( ( f'Route "{route.infill}" has different infill colour ' f'from master "{self.infill}"' ), route.element, ) if not self.ref: self.ref = route.ref elif route.ref != self.ref: city.notice( 'Route "{}" has different ref from master "{}"'.format( route.ref, self.ref ), route.element, ) if not self.name: self.name = route.name if not self.mode: self.mode = route.mode elif route.mode != self.mode: city.error( "Incompatible PT mode: master has {} and route has {}".format( self.mode, route.mode ), route.element, ) return if not self.interval_from_master and route.interval: if not self.interval: self.interval = route.interval else: self.interval = min(self.interval, route.interval) if not self.has_master and (not self.id or self.id > route.id): self.id = route.id self.routes.append(route) if not self.best or len(route.stops) > len(self.best.stops): self.best = route def stop_areas(self): """Returns a list of all stations on all route variants.""" seen_ids = set() for route in self.routes: for stop in route: st = stop.stoparea if st.id not in seen_ids: seen_ids.add(st.id) yield st def __len__(self): return len(self.routes) def __getitem__(self, i): return self.routes[i] def __iter__(self): return iter(self.routes) def __repr__(self): return ( f"RouteMaster(id={self.id}, mode={self.mode}, ref={self.ref}, " f"name={self.name}, network={self.network}, " f"num_variants={len(self.routes)}" ) class City: route_class = Route def __init__(self, city_data, overground=False): self.validate_called = False self.errors = [] self.warnings = [] self.notices = [] self.id = int(city_data["id"]) self.name = city_data["name"] self.country = city_data["country"] self.continent = city_data["continent"] self.overground = overground if not overground: self.num_stations = int(city_data["num_stations"]) self.num_lines = int(city_data["num_lines"] or "0") self.num_light_lines = int(city_data["num_light_lines"] or "0") self.num_interchanges = int(city_data["num_interchanges"] or "0") else: self.num_tram_lines = int(city_data["num_tram_lines"] or "0") self.num_trolleybus_lines = int( city_data["num_trolleybus_lines"] or "0" ) self.num_bus_lines = int(city_data["num_bus_lines"] or "0") self.num_other_lines = int(city_data["num_other_lines"] or "0") # Aquiring list of networks and modes networks = ( None if not city_data["networks"] else city_data["networks"].split(":") ) if not networks or len(networks[-1]) == 0: self.networks = [] else: self.networks = set( filter(None, [x.strip() for x in networks[-1].split(";")]) ) if not networks or len(networks) < 2 or len(networks[0]) == 0: if self.overground: self.modes = DEFAULT_MODES_OVERGROUND else: self.modes = DEFAULT_MODES_RAPID else: self.modes = set([x.strip() for x in networks[0].split(",")]) # Reversing bbox so it is (xmin, ymin, xmax, ymax) bbox = city_data["bbox"].split(",") if len(bbox) == 4: self.bbox = [float(bbox[i]) for i in (1, 0, 3, 2)] else: self.bbox = None self.elements = {} # Dict el_id → el self.stations = defaultdict(list) # Dict el_id → list of StopAreas self.routes = {} # Dict route_master_ref → RouteMaster self.masters = {} # Dict el_id of route → route_master self.stop_areas = defaultdict( list ) # El_id → list of stop_area elements it belongs to self.transfers = [] # List of lists of stop areas self.station_ids = set() # Set of stations' uid self.stops_and_platforms = set() # Set of stops and platforms el_id self.recovery_data = None @staticmethod def log_message(message, el): if el: tags = el.get("tags", {}) message += ' ({} {}, "{}")'.format( el["type"], el.get("id", el.get("ref")), tags.get("name", tags.get("ref", "")), ) return message def notice(self, message, el=None): """This type of message may point to a potential problem.""" msg = City.log_message(message, el) self.notices.append(msg) def warn(self, message, el=None): """A warning is definitely a problem but is doesn't prevent from building a routing file and doesn't invalidate the city. """ msg = City.log_message(message, el) self.warnings.append(msg) def error(self, message, el=None): """Error if a critical problem that invalidates the city""" msg = City.log_message(message, el) self.errors.append(msg) def contains(self, el): center = el_center(el) if center: return ( self.bbox[0] <= center[1] <= self.bbox[2] and self.bbox[1] <= center[0] <= self.bbox[3] ) return False def add(self, el): if el["type"] == "relation" and "members" not in el: return self.elements[el_id(el)] = el if not (el["type"] == "relation" and "tags" in el): return relation_type = el["tags"].get("type") if relation_type == "route_master": for m in el["members"]: if m["type"] != "relation": continue if el_id(m) in self.masters: self.error("Route in two route_masters", m) self.masters[el_id(m)] = el elif el["tags"].get("public_transport") == "stop_area": if relation_type != "public_transport": self.warn( "stop_area relation with " f"type={relation_type}, needed type=public_transport", el, ) return warned_about_duplicates = False for m in el["members"]: stop_areas = self.stop_areas[el_id(m)] if el in stop_areas and not warned_about_duplicates: self.warn("Duplicate element in a stop area", el) warned_about_duplicates = True else: stop_areas.append(el) def make_transfer(self, sag): transfer = set() for m in sag["members"]: k = el_id(m) el = self.elements.get(k) if not el: # A sag member may validly not belong to the city while # the sag does - near the city bbox boundary continue if "tags" not in el: self.warn( "An untagged object {} in a stop_area_group".format(k), sag ) continue if ( el["type"] != "relation" or el["tags"].get("type") != "public_transport" or el["tags"].get("public_transport") != "stop_area" ): continue if k in self.stations: stoparea = self.stations[k][0] transfer.add(stoparea) if stoparea.transfer: # TODO: properly process such cases. # Counterexample 1: Paris, # Châtelet subway station <-> # "Châtelet - Les Halles" railway station <-> # Les Halles subway station # Counterexample 2: Saint-Petersburg, transfers # Витебский вокзал <-> # Пушкинская <-> # Звенигородская self.warn( "Stop area {} belongs to multiple interchanges".format( k ) ) stoparea.transfer = el_id(sag) if len(transfer) > 1: self.transfers.append(transfer) def extract_routes(self): # Extract stations processed_stop_areas = set() for el in self.elements.values(): if Station.is_station(el, self.modes): # See PR https://github.com/mapsme/subways/pull/98 if ( el["type"] == "relation" and el["tags"].get("type") != "multipolygon" ): rel_type = el["tags"].get("type") self.warn( "A railway station cannot be a relation of type " f"{rel_type}", el, ) continue st = Station(el, self) self.station_ids.add(st.id) if st.id in self.stop_areas: stations = [] for sa in self.stop_areas[st.id]: stations.append(StopArea(st, self, sa)) else: stations = [StopArea(st, self)] for station in stations: if station.id not in processed_stop_areas: processed_stop_areas.add(station.id) for st_el in station.get_elements(): self.stations[st_el].append(station) # Check that stops and platforms belong to # a single stop_area for sp in station.stops | station.platforms: if sp in self.stops_and_platforms: self.notice( f"A stop or a platform {sp} belongs to " "multiple stop areas, might be correct" ) else: self.stops_and_platforms.add(sp) # Extract routes for el in self.elements.values(): if Route.is_route(el, self.modes): if el["tags"].get("access") in ("no", "private"): continue route_id = el_id(el) master = self.masters.get(route_id, None) if self.networks: network = Route.get_network(el) if master: master_network = Route.get_network(master) else: master_network = None if ( network not in self.networks and master_network not in self.networks ): continue route = self.route_class(el, self, master) if not route.stops: self.warn("Route has no stops", el) continue elif len(route.stops) == 1: self.warn("Route has only one stop", el) continue k = el_id(master) if master else route.ref if k not in self.routes: self.routes[k] = RouteMaster(master) self.routes[k].add(route, self) # Sometimes adding a route to a newly initialized RouteMaster # can fail if len(self.routes[k]) == 0: del self.routes[k] # And while we're iterating over relations, find interchanges if ( el["type"] == "relation" and el.get("tags", {}).get("public_transport", None) == "stop_area_group" ): self.make_transfer(el) # Filter transfers, leaving only stations that belong to routes used_stop_areas = set() for rmaster in self.routes.values(): for route in rmaster: used_stop_areas.update([s.stoparea for s in route.stops]) new_transfers = [] for transfer in self.transfers: new_tr = [s for s in transfer if s in used_stop_areas] if len(new_tr) > 1: new_transfers.append(new_tr) self.transfers = new_transfers def __iter__(self): return iter(self.routes.values()) @property def is_good(self): if not (self.errors or self.validate_called): raise RuntimeError( "You mustn't refer to City.is_good property before calling " "the City.validate() method unless an error already occurred." ) return len(self.errors) == 0 def get_validation_result(self): result = { "name": self.name, "country": self.country, "continent": self.continent, "stations_found": getattr(self, "found_stations", 0), "transfers_found": getattr(self, "found_interchanges", 0), "unused_entrances": getattr(self, "unused_entrances", 0), "networks": getattr(self, "found_networks", 0), } if not self.overground: result.update( { "subwayl_expected": self.num_lines, "lightrl_expected": self.num_light_lines, "subwayl_found": getattr(self, "found_lines", 0), "lightrl_found": getattr(self, "found_light_lines", 0), "stations_expected": self.num_stations, "transfers_expected": self.num_interchanges, } ) else: result.update( { "stations_expected": 0, "transfers_expected": 0, "busl_expected": self.num_bus_lines, "trolleybusl_expected": self.num_trolleybus_lines, "traml_expected": self.num_tram_lines, "otherl_expected": self.num_other_lines, "busl_found": getattr(self, "found_bus_lines", 0), "trolleybusl_found": getattr( self, "found_trolleybus_lines", 0 ), "traml_found": getattr(self, "found_tram_lines", 0), "otherl_found": getattr(self, "found_other_lines", 0), } ) result["warnings"] = self.warnings result["errors"] = self.errors result["notices"] = self.notices return result def count_unused_entrances(self): global used_entrances stop_areas = set() for el in self.elements.values(): if ( el["type"] == "relation" and "tags" in el and el["tags"].get("public_transport") == "stop_area" and "members" in el ): stop_areas.update([el_id(m) for m in el["members"]]) unused = [] not_in_sa = [] for el in self.elements.values(): if ( el["type"] == "node" and "tags" in el and el["tags"].get("railway") == "subway_entrance" ): i = el_id(el) if i in self.stations: used_entrances.add(i) if i not in stop_areas: not_in_sa.append(i) if i not in self.stations: unused.append(i) self.unused_entrances = len(unused) self.entrances_not_in_stop_areas = len(not_in_sa) if unused: self.notice( f"{len(unused)} subway entrances are not connected to a " f"station: {format_elid_list(unused)}" ) if not_in_sa: self.notice( f"{len(not_in_sa)} subway entrances are not in stop_area " f"relations: {format_elid_list(not_in_sa)}" ) def check_return_routes(self, rmaster): variants = {} have_return = set() for variant in rmaster: if len(variant) < 2: continue # Using transfer ids because a train can arrive at different # stations within a transfer. But disregard transfer that may give # an impression of a circular route (for example, # Simonis / Elisabeth station and route 2 in Brussels) if variant[0].stoparea.transfer == variant[-1].stoparea.transfer: t = (variant[0].stoparea.id, variant[-1].stoparea.id) else: t = ( variant[0].stoparea.transfer or variant[0].stoparea.id, variant[-1].stoparea.transfer or variant[-1].stoparea.id, ) if t in variants: continue variants[t] = variant.element tr = (t[1], t[0]) if tr in variants: have_return.add(t) have_return.add(tr) if len(variants) == 0: self.error( "An empty route master {}. Please set construction:route " "if it is under construction".format(rmaster.id) ) elif len(variants) == 1: log_function = ( self.error if not rmaster.best.is_circular else self.notice ) log_function( "Only one route in route_master. " "Please check if it needs a return route", rmaster.best.element, ) else: for t, rel in variants.items(): if t not in have_return: self.notice("Route does not have a return direction", rel) def validate_lines(self): self.found_light_lines = len( [x for x in self.routes.values() if x.mode != "subway"] ) self.found_lines = len(self.routes) - self.found_light_lines if self.found_lines != self.num_lines: self.error( "Found {} subway lines, expected {}".format( self.found_lines, self.num_lines ) ) if self.found_light_lines != self.num_light_lines: self.error( "Found {} light rail lines, expected {}".format( self.found_light_lines, self.num_light_lines ) ) def validate_overground_lines(self): self.found_tram_lines = len( [x for x in self.routes.values() if x.mode == "tram"] ) self.found_bus_lines = len( [x for x in self.routes.values() if x.mode == "bus"] ) self.found_trolleybus_lines = len( [x for x in self.routes.values() if x.mode == "trolleybus"] ) self.found_other_lines = len( [ x for x in self.routes.values() if x.mode not in ("bus", "trolleybus", "tram") ] ) if self.found_tram_lines != self.num_tram_lines: log_function = ( self.error if self.found_tram_lines == 0 else self.notice ) log_function( "Found {} tram lines, expected {}".format( self.found_tram_lines, self.num_tram_lines ), ) def validate(self): networks = Counter() self.found_stations = 0 unused_stations = set(self.station_ids) for rmaster in self.routes.values(): networks[str(rmaster.network)] += 1 if not self.overground: self.check_return_routes(rmaster) route_stations = set() for sa in rmaster.stop_areas(): route_stations.add(sa.transfer or sa.id) unused_stations.discard(sa.station.id) self.found_stations += len(route_stations) if unused_stations: self.unused_stations = len(unused_stations) self.notice( "{} unused stations: {}".format( self.unused_stations, format_elid_list(unused_stations) ) ) self.count_unused_entrances() self.found_interchanges = len(self.transfers) if self.overground: self.validate_overground_lines() else: self.validate_lines() if self.found_stations != self.num_stations: msg = "Found {} stations in routes, expected {}".format( self.found_stations, self.num_stations ) log_function = ( self.error if not ( 0 <= (self.num_stations - self.found_stations) / self.num_stations <= ALLOWED_STATIONS_MISMATCH ) else self.warn ) log_function(msg) if self.found_interchanges != self.num_interchanges: msg = "Found {} interchanges, expected {}".format( self.found_interchanges, self.num_interchanges ) log_function = ( self.error if self.num_interchanges != 0 and not ( (self.num_interchanges - self.found_interchanges) / self.num_interchanges <= ALLOWED_TRANSFERS_MISMATCH ) else self.warn ) log_function(msg) self.found_networks = len(networks) if len(networks) > max(1, len(self.networks)): n_str = "; ".join( ["{} ({})".format(k, v) for k, v in networks.items()] ) self.notice("More than one network: {}".format(n_str)) self.validate_called = True def find_transfers(elements, cities): transfers = [] stop_area_groups = [] for el in elements: if ( el["type"] == "relation" and "members" in el and el.get("tags", {}).get("public_transport") == "stop_area_group" ): stop_area_groups.append(el) # StopArea.id uniquely identifies a StopArea. We must ensure StopArea # uniqueness since one stop_area relation may result in # several StopArea instances at inter-city interchanges. stop_area_ids = defaultdict(set) # el_id -> set of StopArea.id stop_area_objects = dict() # StopArea.id -> one of StopArea instances for city in cities: for el, st in city.stations.items(): stop_area_ids[el].update(sa.id for sa in st) stop_area_objects.update((sa.id, sa) for sa in st) for sag in stop_area_groups: transfer = set() for m in sag["members"]: k = el_id(m) if k not in stop_area_ids: continue transfer.update( stop_area_objects[sa_id] for sa_id in stop_area_ids[k] ) if len(transfer) > 1: transfers.append(transfer) return transfers def get_unused_entrances_geojson(elements): global used_entrances features = [] for el in elements: if ( el["type"] == "node" and "tags" in el and el["tags"].get("railway") == "subway_entrance" ): if el_id(el) not in used_entrances: geometry = {"type": "Point", "coordinates": el_center(el)} properties = { k: v for k, v in el["tags"].items() if k not in ("railway", "entrance") } features.append( { "type": "Feature", "geometry": geometry, "properties": properties, } ) return {"type": "FeatureCollection", "features": features}