subways/subway_structure.py
2022-10-07 23:20:44 +02:00

1916 lines
71 KiB
Python

import csv
import logging
import math
import urllib.parse
import urllib.request
from css_colours import normalize_colour
from collections import Counter, defaultdict
SPREADSHEET_ID = '1SEW1-NiNOnA2qDwievcxYV1FOaQl1mb1fdeyqAxHu3k'
MAX_DISTANCE_TO_ENTRANCES = 300 # in meters
MAX_DISTANCE_STOP_TO_LINE = 50 # in meters
ALLOWED_STATIONS_MISMATCH = 0.02 # part of total station count
ALLOWED_TRANSFERS_MISMATCH = 0.07 # part of total interchanges count
ALLOWED_ANGLE_BETWEEN_STOPS = 45 # in degrees
DISALLOWED_ANGLE_BETWEEN_STOPS = 20 # in degrees
# If an object was moved not too far compared to previous script run,
# it is likely the same object
DISPLACEMENT_TOLERANCE = 300 # in meters
MODES_RAPID = set(('subway', 'light_rail', 'monorail', 'train'))
MODES_OVERGROUND = set(('tram', 'bus', 'trolleybus', 'aerialway', 'ferry'))
DEFAULT_MODES_RAPID = set(('subway', 'light_rail'))
DEFAULT_MODES_OVERGROUND = set(('tram',)) # TODO: bus and trolleybus?
ALL_MODES = MODES_RAPID | MODES_OVERGROUND
RAILWAY_TYPES = set(
(
'rail',
'light_rail',
'subway',
'narrow_gauge',
'funicular',
'monorail',
'tram',
)
)
CONSTRUCTION_KEYS = (
'construction',
'proposed',
'construction:railway',
'proposed:railway',
)
used_entrances = set()
class CriticalValidationError(Exception):
"""Is thrown if an error occurs
that prevents further validation of a city."""
def el_id(el):
if not el:
return None
if 'type' not in el:
raise Exception('What is this element? {}'.format(el))
return el['type'][0] + str(el.get('id', el.get('ref', '')))
def el_center(el):
if not el:
return None
if 'lat' in el:
return (el['lon'], el['lat'])
elif 'center' in el:
return (el['center']['lon'], el['center']['lat'])
return None
def distance(p1, p2):
if p1 is None or p2 is None:
raise Exception(
'One of arguments to distance({}, {}) is None'.format(p1, p2)
)
dx = math.radians(p1[0] - p2[0]) * math.cos(
0.5 * math.radians(p1[1] + p2[1])
)
dy = math.radians(p1[1] - p2[1])
return 6378137 * math.sqrt(dx * dx + dy * dy)
def is_near(p1, p2):
return (
p1[0] - 1e-8 <= p2[0] <= p1[0] + 1e-8
and p1[1] - 1e-8 <= p2[1] <= p1[1] + 1e-8
)
def project_on_line(p, line):
def project_on_segment(p, p1, p2):
dp = (p2[0] - p1[0], p2[1] - p1[1])
d2 = dp[0] * dp[0] + dp[1] * dp[1]
if d2 < 1e-14:
return None
# u is the position of projection of p point on line p1p2
# regarding point p1 and (p2-p1) direction vector
u = ((p[0] - p1[0]) * dp[0] + (p[1] - p1[1]) * dp[1]) / d2
if not 0 <= u <= 1:
return None
return u
result = {
# In the first approximation, position on rails is the index of the
# closest vertex of line to the point p. Fractional value means that
# the projected point lies on a segment between two vertices. More than
# one value can occur if a route follows the same tracks more than once.
'positions_on_line': None,
'projected_point': None, # (lon, lat)
}
if len(line) < 2:
return result
d_min = MAX_DISTANCE_STOP_TO_LINE * 5
closest_to_vertex = False
# First, check vertices in the line
for i, vertex in enumerate(line):
d = distance(p, vertex)
if d < d_min:
result['positions_on_line'] = [i]
result['projected_point'] = vertex
d_min = d
closest_to_vertex = True
elif vertex == result['projected_point']:
# Repeated occurrence of the track vertex in line, like Oslo Line 5
result['positions_on_line'].append(i)
# And then calculate distances to each segment
for seg in range(len(line) - 1):
# Check bbox for speed
if not (
(
min(line[seg][0], line[seg + 1][0]) - MAX_DISTANCE_STOP_TO_LINE
<= p[0]
<= max(line[seg][0], line[seg + 1][0])
+ MAX_DISTANCE_STOP_TO_LINE
)
and (
min(line[seg][1], line[seg + 1][1]) - MAX_DISTANCE_STOP_TO_LINE
<= p[1]
<= max(line[seg][1], line[seg + 1][1])
+ MAX_DISTANCE_STOP_TO_LINE
)
):
continue
u = project_on_segment(p, line[seg], line[seg + 1])
if u:
projected_point = (
line[seg][0] + u * (line[seg + 1][0] - line[seg][0]),
line[seg][1] + u * (line[seg + 1][1] - line[seg][1]),
)
d = distance(p, projected_point)
if d < d_min:
result['positions_on_line'] = [seg + u]
result['projected_point'] = projected_point
d_min = d
closest_to_vertex = False
elif projected_point == result['projected_point']:
# Repeated occurrence of the track segment in line, like Oslo Line 5
if not closest_to_vertex:
result['positions_on_line'].append(seg + u)
return result
def find_segment(p, line, start_vertex=0):
"""Returns index of a segment and a position inside it."""
EPS = 1e-9
for seg in range(start_vertex, len(line) - 1):
if is_near(p, line[seg]):
return seg, 0
if line[seg][0] == line[seg + 1][0]:
if not (p[0] - EPS <= line[seg][0] <= p[0] + EPS):
continue
px = None
else:
px = (p[0] - line[seg][0]) / (line[seg + 1][0] - line[seg][0])
if px is None or (0 <= px <= 1):
if line[seg][1] == line[seg + 1][1]:
if not (p[1] - EPS <= line[seg][1] <= p[1] + EPS):
continue
py = None
else:
py = (p[1] - line[seg][1]) / (line[seg + 1][1] - line[seg][1])
if py is None or (0 <= py <= 1):
if py is None or px is None or (px - EPS <= py <= px + EPS):
return seg, px or py
return None, None
def distance_on_line(p1, p2, line, start_vertex=0):
"""Calculates distance via line between projections
of points p1 and p2. Returns a TUPLE of (d, vertex):
d is the distance and vertex is the number of the second
vertex, to continue calculations for the next point."""
line_copy = line
seg1, pos1 = find_segment(p1, line, start_vertex)
if seg1 is None:
# logging.warn('p1 %s is not projected, st=%s', p1, start_vertex)
return None
seg2, pos2 = find_segment(p2, line, seg1)
if seg2 is None:
if line[0] == line[-1]:
line = line + line[1:]
seg2, pos2 = find_segment(p2, line, seg1)
if seg2 is None:
# logging.warn('p2 %s is not projected, st=%s', p2, start_vertex)
return None
if seg1 == seg2:
return distance(line[seg1], line[seg1 + 1]) * abs(pos2 - pos1), seg1
if seg2 < seg1:
# Should not happen
raise Exception('Pos1 %s is after pos2 %s', seg1, seg2)
d = 0
if pos1 < 1:
d += distance(line[seg1], line[seg1 + 1]) * (1 - pos1)
for i in range(seg1 + 1, seg2):
d += distance(line[i], line[i + 1])
if pos2 > 0:
d += distance(line[seg2], line[seg2 + 1]) * pos2
return d, seg2 % len(line_copy)
def angle_between(p1, c, p2):
a = round(
abs(
math.degrees(
math.atan2(p1[1] - c[1], p1[0] - c[0])
- math.atan2(p2[1] - c[1], p2[0] - c[0])
)
)
)
return a if a <= 180 else 360 - a
def format_elid_list(ids):
msg = ', '.join(sorted(ids)[:20])
if len(ids) > 20:
msg += ', ...'
return msg
class Station:
@staticmethod
def get_modes(el):
mode = el['tags'].get('station')
modes = [] if not mode else [mode]
for m in ALL_MODES:
if el['tags'].get(m) == 'yes':
modes.append(m)
return set(modes)
@staticmethod
def is_station(el, modes):
# public_transport=station is too ambiguous and unspecific to use,
# so we expect for it to be backed by railway=station.
if (
'tram' in modes
and el.get('tags', {}).get('railway') == 'tram_stop'
):
return True
if el.get('tags', {}).get('railway') not in ('station', 'halt'):
return False
for k in CONSTRUCTION_KEYS:
if k in el['tags']:
return False
# Not checking for station=train, obviously
if 'train' not in modes and Station.get_modes(el).isdisjoint(modes):
return False
return True
def __init__(self, el, city):
"""Call this with a railway=station node."""
if not Station.is_station(el, city.modes):
raise Exception(
'Station object should be instantiated from a station node. '
'Got: {}'.format(el)
)
self.id = el_id(el)
self.element = el
self.modes = Station.get_modes(el)
self.name = el['tags'].get('name', '?')
self.int_name = el['tags'].get(
'int_name', el['tags'].get('name:en', None)
)
try:
self.colour = normalize_colour(el['tags'].get('colour', None))
except ValueError as e:
self.colour = None
city.warn(str(e), el)
self.center = el_center(el)
if self.center is None:
raise Exception('Could not find center of {}'.format(el))
def __repr__(self):
return 'Station(id={}, modes={}, name={}, center={})'.format(
self.id, ','.join(self.modes), self.name, self.center
)
class StopArea:
@staticmethod
def is_stop(el):
if 'tags' not in el:
return False
if el['tags'].get('railway') == 'stop':
return True
if el['tags'].get('public_transport') == 'stop_position':
return True
return False
@staticmethod
def is_platform(el):
if 'tags' not in el:
return False
if el['tags'].get('railway') in ('platform', 'platform_edge'):
return True
if el['tags'].get('public_transport') == 'platform':
return True
return False
@staticmethod
def is_track(el):
if el['type'] != 'way' or 'tags' not in el:
return False
return el['tags'].get('railway') in RAILWAY_TYPES
def __init__(self, station, city, stop_area=None):
"""Call this with a Station object."""
self.element = stop_area or station.element
self.id = el_id(self.element)
self.station = station
self.stops = set() # set of el_ids of stop_positions
self.platforms = set() # set of el_ids of platforms
self.exits = set() # el_id of subway_entrance for leaving the platform
self.entrances = set() # el_id of subway_entrance for entering
# the platform
self.center = None # lon, lat of the station centre point
self.centers = {} # el_id -> (lon, lat) for all elements
self.transfer = None # el_id of a transfer relation
self.modes = station.modes
self.name = station.name
self.int_name = station.int_name
self.colour = station.colour
if stop_area:
self.name = stop_area['tags'].get('name', self.name)
self.int_name = stop_area['tags'].get(
'int_name', stop_area['tags'].get('name:en', self.int_name)
)
try:
self.colour = (
normalize_colour(stop_area['tags'].get('colour'))
or self.colour
)
except ValueError as e:
city.warn(str(e), stop_area)
# If we have a stop area, add all elements from it
warned_about_tracks = False
for m in stop_area['members']:
k = el_id(m)
m_el = city.elements.get(k)
if m_el and 'tags' in m_el:
if Station.is_station(m_el, city.modes):
if k != station.id:
city.error(
'Stop area has multiple stations', stop_area
)
elif StopArea.is_stop(m_el):
self.stops.add(k)
elif StopArea.is_platform(m_el):
self.platforms.add(k)
elif m_el['tags'].get('railway') == 'subway_entrance':
if m_el['type'] != 'node':
city.warn('Subway entrance is not a node', m_el)
if (
m_el['tags'].get('entrance') != 'exit'
and m['role'] != 'exit_only'
):
self.entrances.add(k)
if (
m_el['tags'].get('entrance') != 'entrance'
and m['role'] != 'entry_only'
):
self.exits.add(k)
elif StopArea.is_track(m_el):
if not warned_about_tracks:
city.warn(
'Tracks in a stop_area relation', stop_area
)
warned_about_tracks = True
else:
# Otherwise add nearby entrances
center = station.center
for c_el in city.elements.values():
if c_el.get('tags', {}).get('railway') == 'subway_entrance':
c_id = el_id(c_el)
if c_id not in city.stop_areas:
c_center = el_center(c_el)
if (
c_center
and distance(center, c_center)
<= MAX_DISTANCE_TO_ENTRANCES
):
if c_el['type'] != 'node':
city.warn(
'Subway entrance is not a node', c_el
)
etag = c_el['tags'].get('entrance')
if etag != 'exit':
self.entrances.add(c_id)
if etag != 'entrance':
self.exits.add(c_id)
if self.exits and not self.entrances:
city.warn(
'Only exits for a station, no entrances',
stop_area or station.element,
)
if self.entrances and not self.exits:
city.warn('No exits for a station', stop_area or station.element)
for el in self.get_elements():
self.centers[el] = el_center(city.elements[el])
"""Calculates the center point of the station. This algorithm
cannot rely on a station node, since many stop_areas can share one.
Basically it averages center points of all platforms
and stop positions."""
if len(self.stops) + len(self.platforms) == 0:
self.center = station.center
else:
self.center = [0, 0]
for sp in self.stops | self.platforms:
spc = self.centers[sp]
for i in range(2):
self.center[i] += spc[i]
for i in range(2):
self.center[i] /= len(self.stops) + len(self.platforms)
def get_elements(self):
result = set([self.id, self.station.id])
result.update(self.entrances)
result.update(self.exits)
result.update(self.stops)
result.update(self.platforms)
return result
def __repr__(self):
return 'StopArea(id={}, name={}, station={}, transfer={}, center={})'.format(
self.id, self.name, self.station, self.transfer, self.center
)
class RouteStop:
def __init__(self, stoparea):
self.stoparea = stoparea
self.stop = None # Stop position (lon, lat), possibly projected
self.distance = 0 # In meters from the start of the route
self.platform_entry = None # Platform el_id
self.platform_exit = None # Platform el_id
self.can_enter = False
self.can_exit = False
self.seen_stop = False
self.seen_platform_entry = False
self.seen_platform_exit = False
self.seen_station = False
@property
def seen_platform(self):
return self.seen_platform_entry or self.seen_platform_exit
@staticmethod
def get_actual_role(el, role, modes):
if StopArea.is_stop(el):
return 'stop'
elif StopArea.is_platform(el):
return 'platform'
elif Station.is_station(el, modes):
if 'platform' in role:
return 'platform'
else:
return 'stop'
return None
def add(self, member, relation, city):
el = city.elements[el_id(member)]
role = member['role']
if StopArea.is_stop(el):
if 'platform' in role:
city.warn('Stop position in a platform role in a route', el)
if el['type'] != 'node':
city.error('Stop position is not a node', el)
self.stop = el_center(el)
if 'entry_only' not in role:
self.can_exit = True
if 'exit_only' not in role:
self.can_enter = True
elif Station.is_station(el, city.modes):
if el['type'] != 'node':
city.notice('Station in route is not a node', el)
if not self.seen_stop and not self.seen_platform:
self.stop = el_center(el)
self.can_enter = True
self.can_exit = True
elif StopArea.is_platform(el):
if 'stop' in role:
city.warn('Platform in a stop role in a route', el)
if 'exit_only' not in role:
self.platform_entry = el_id(el)
self.can_enter = True
if 'entry_only' not in role:
self.platform_exit = el_id(el)
self.can_exit = True
if not self.seen_stop:
self.stop = el_center(el)
multiple_check = False
actual_role = RouteStop.get_actual_role(el, role, city.modes)
if actual_role == 'platform':
if role == 'platform_entry_only':
multiple_check = self.seen_platform_entry
self.seen_platform_entry = True
elif role == 'platform_exit_only':
multiple_check = self.seen_platform_exit
self.seen_platform_exit = True
else:
if role != 'platform' and 'stop' not in role:
city.warn(
"Platform with invalid role '{}' in a route".format(
role
),
el,
)
multiple_check = self.seen_platform
self.seen_platform_entry = True
self.seen_platform_exit = True
elif actual_role == 'stop':
multiple_check = self.seen_stop
self.seen_stop = True
if multiple_check:
log_function = city.error if actual_role == 'stop' else city.notice
log_function(
'Multiple {}s for a station "{}" ({}) in a route relation'.format(
actual_role, el['tags'].get('name', ''), el_id(el)
),
relation,
)
def __repr__(self):
return (
'RouteStop(stop={}, pl_entry={}, pl_exit={}, stoparea={})'.format(
self.stop,
self.platform_entry,
self.platform_exit,
self.stoparea,
)
)
class Route:
"""The longest route for a city with a unique ref."""
@staticmethod
def is_route(el, modes):
if (
el['type'] != 'relation'
or el.get('tags', {}).get('type') != 'route'
):
return False
if 'members' not in el:
return False
if el['tags'].get('route') not in modes:
return False
for k in CONSTRUCTION_KEYS:
if k in el['tags']:
return False
if 'ref' not in el['tags'] and 'name' not in el['tags']:
return False
return True
@staticmethod
def get_network(relation):
for k in ('network:metro', 'network', 'operator'):
if k in relation['tags']:
return relation['tags'][k]
return None
@staticmethod
def get_interval(tags):
v = None
for k in ('interval', 'headway'):
if k in tags:
v = tags[k]
break
else:
for kk in tags:
if kk.startswith(k + ':'):
v = tags[kk]
break
if not v:
return None
try:
return float(v)
except ValueError:
return None
def build_longest_line(self, relation):
line_nodes = set()
last_track = []
track = []
warned_about_holes = False
for m in relation['members']:
el = self.city.elements.get(el_id(m), None)
if not el or not StopArea.is_track(el):
continue
if 'nodes' not in el or len(el['nodes']) < 2:
self.city.error('Cannot find nodes in a railway', el)
continue
nodes = ['n{}'.format(n) for n in el['nodes']]
if m['role'] == 'backward':
nodes.reverse()
line_nodes.update(nodes)
if not track:
is_first = True
track.extend(nodes)
else:
new_segment = list(nodes) # copying
if new_segment[0] == track[-1]:
track.extend(new_segment[1:])
elif new_segment[-1] == track[-1]:
track.extend(reversed(new_segment[:-1]))
elif is_first and track[0] in (
new_segment[0],
new_segment[-1],
):
# We can reverse the track and try again
track.reverse()
if new_segment[0] == track[-1]:
track.extend(new_segment[1:])
else:
track.extend(reversed(new_segment[:-1]))
else:
# Store the track if it is long and clean it
if not warned_about_holes:
self.city.warn(
'Hole in route rails near node {}'.format(
track[-1]
),
relation,
)
warned_about_holes = True
if len(track) > len(last_track):
last_track = track
track = []
is_first = False
if len(track) > len(last_track):
last_track = track
# Remove duplicate points
last_track = [
last_track[i]
for i in range(0, len(last_track))
if i == 0 or last_track[i - 1] != last_track[i]
]
return last_track, line_nodes
def project_stops_on_line(self):
projected = [project_on_line(x.stop, self.tracks) for x in self.stops]
def is_stop_near_tracks(stop_index):
return (
projected[stop_index]['projected_point'] is not None
and distance(
self.stops[stop_index].stop,
projected[stop_index]['projected_point'],
)
<= MAX_DISTANCE_STOP_TO_LINE
)
start = 0
while start < len(self.stops) and not is_stop_near_tracks(start):
start += 1
end = len(self.stops) - 1
while end > start and not is_stop_near_tracks(end):
end -= 1
tracks_start = []
tracks_end = []
stops_on_longest_line = []
for i, route_stop in enumerate(self.stops):
if i < start:
tracks_start.append(route_stop.stop)
elif i > end:
tracks_end.append(route_stop.stop)
elif projected[i]['projected_point'] is None:
self.city.error(
'Stop "{}" {} is nowhere near the tracks'.format(
route_stop.stoparea.name, route_stop.stop
),
self.element,
)
else:
projected_point = projected[i]['projected_point']
# We've got two separate stations with a good stretch of
# railway tracks between them. Put these on tracks.
d = round(distance(route_stop.stop, projected_point))
if d > MAX_DISTANCE_STOP_TO_LINE:
self.city.notice(
'Stop "{}" {} is {} meters from the tracks'.format(
route_stop.stoparea.name, route_stop.stop, d
),
self.element,
)
else:
route_stop.stop = projected_point
route_stop.positions_on_rails = projected[i][
'positions_on_line'
]
stops_on_longest_line.append(route_stop)
if start >= len(self.stops):
self.tracks = tracks_start
elif tracks_start or tracks_end:
self.tracks = tracks_start + self.tracks + tracks_end
return stops_on_longest_line
def calculate_distances(self):
dist = 0
vertex = 0
for i, stop in enumerate(self.stops):
if i > 0:
direct = distance(stop.stop, self.stops[i - 1].stop)
d_line = distance_on_line(
self.stops[i - 1].stop, stop.stop, self.tracks, vertex
)
if d_line and direct - 10 <= d_line[0] <= direct * 2:
vertex = d_line[1]
dist += round(d_line[0])
else:
dist += round(direct)
stop.distance = dist
def __init__(self, relation, city, master=None):
if not Route.is_route(relation, city.modes):
raise Exception(
'The relation does not seem a route: {}'.format(relation)
)
master_tags = {} if not master else master['tags']
self.city = city
self.element = relation
self.id = el_id(relation)
if 'ref' not in relation['tags'] and 'ref' not in master_tags:
city.notice('Missing ref on a route', relation)
self.ref = relation['tags'].get(
'ref', master_tags.get('ref', relation['tags'].get('name', None))
)
self.name = relation['tags'].get('name', None)
self.mode = relation['tags']['route']
if (
'colour' not in relation['tags']
and 'colour' not in master_tags
and self.mode != 'tram'
):
city.notice('Missing colour on a route', relation)
try:
self.colour = normalize_colour(
relation['tags'].get('colour', master_tags.get('colour', None))
)
except ValueError as e:
self.colour = None
city.warn(str(e), relation)
try:
self.infill = normalize_colour(
relation['tags'].get(
'colour:infill', master_tags.get('colour:infill', None)
)
)
except ValueError as e:
self.infill = None
city.warn(str(e), relation)
self.network = Route.get_network(relation)
self.interval = Route.get_interval(
relation['tags']
) or Route.get_interval(master_tags)
if relation['tags'].get('public_transport:version') == '1':
city.warn(
'Public transport version is 1, which means the route '
'is an unsorted pile of objects',
relation,
)
self.is_circular = False
# self.tracks would be a list of (lon, lat) for the longest stretch. Can be empty
tracks, line_nodes = self.build_longest_line(relation)
self.tracks = [el_center(city.elements.get(k)) for k in tracks]
if (
None in self.tracks
): # usually, extending BBOX for the city is needed
self.tracks = []
for n in filter(lambda x: x not in city.elements, tracks):
city.warn(
'The dataset is missing the railway tracks node {}'.format(
n
),
relation,
)
break
self.stops = [] # List of RouteStop
stations = set() # temporary for recording stations
seen_stops = False
seen_platforms = False
repeat_pos = None
for m in relation['members']:
if 'inactive' in m['role']:
continue
k = el_id(m)
if k in city.stations:
st_list = city.stations[k]
st = st_list[0]
if len(st_list) > 1:
city.error(
'Ambiguous station {} in route. Please use stop_position or split '
'interchange stations'.format(st.name),
relation,
)
el = city.elements[k]
actual_role = RouteStop.get_actual_role(
el, m['role'], city.modes
)
if actual_role:
if m['role'] and actual_role not in m['role']:
city.warn(
"Wrong role '{}' for {} {}".format(
m['role'], actual_role, k
),
relation,
)
if repeat_pos is None:
if not self.stops or st not in stations:
stop = RouteStop(st)
self.stops.append(stop)
stations.add(st)
elif self.stops[-1].stoparea.id == st.id:
stop = self.stops[-1]
else:
# We've got a repeat
if (
(seen_stops and seen_platforms)
or (
actual_role == 'stop'
and not seen_platforms
)
or (
actual_role == 'platform'
and not seen_stops
)
):
# Circular route!
stop = RouteStop(st)
self.stops.append(stop)
stations.add(st)
else:
repeat_pos = 0
if repeat_pos is not None:
if repeat_pos >= len(self.stops):
continue
# Check that the type matches
if (actual_role == 'stop' and seen_stops) or (
actual_role == 'platform' and seen_platforms
):
city.error(
'Found an out-of-place {}: "{}" ({})'.format(
actual_role, el['tags'].get('name', ''), k
),
relation,
)
continue
# Find the matching stop starting with index repeat_pos
while (
repeat_pos < len(self.stops)
and self.stops[repeat_pos].stoparea.id != st.id
):
repeat_pos += 1
if repeat_pos >= len(self.stops):
city.error(
'Incorrect order of {}s at {}'.format(
actual_role, k
),
relation,
)
continue
stop = self.stops[repeat_pos]
stop.add(m, relation, city)
if repeat_pos is None:
seen_stops |= stop.seen_stop or stop.seen_station
seen_platforms |= stop.seen_platform
if StopArea.is_stop(el):
if k not in line_nodes:
city.warn(
'Stop position "{}" ({}) is not on tracks'.format(
el['tags'].get('name', ''), k
),
relation,
)
continue
if k not in city.elements:
if 'stop' in m['role'] or 'platform' in m['role']:
raise CriticalValidationError(
'{} {} {} for route relation {} is not in the dataset'.format(
m['role'], m['type'], m['ref'], relation['id']
)
)
continue
el = city.elements[k]
if 'tags' not in el:
city.error('Untagged object {} in a route'.format(k), relation)
continue
is_under_construction = False
for ck in CONSTRUCTION_KEYS:
if ck in el['tags']:
city.warn(
'Under construction {} {} in route. Consider '
'setting \'inactive\' role or removing construction attributes'.format(
m['role'] or 'feature', k
),
relation,
)
is_under_construction = True
break
if is_under_construction:
continue
if Station.is_station(el, city.modes):
# A station may be not included into this route due to previous
# 'stop area has multiple stations' error. No other error message is needed.
pass
elif el['tags'].get('railway') in ('station', 'halt'):
city.error(
'Missing station={} on a {}'.format(self.mode, m['role']),
el,
)
else:
actual_role = RouteStop.get_actual_role(
el, m['role'], city.modes
)
if actual_role:
city.error(
'{} {} {} is not connected to a station in route'.format(
actual_role, m['type'], m['ref']
),
relation,
)
elif not StopArea.is_track(el):
city.warn(
'Unknown member type for {} {} in route'.format(
m['type'], m['ref']
),
relation,
)
if len(self.stops) > 1:
self.is_circular = (
self.stops[0].stoparea == self.stops[-1].stoparea
)
stops_on_longest_line = self.project_stops_on_line()
self.check_and_recover_stops_order(stops_on_longest_line)
self.calculate_distances()
def check_stops_order_by_angle(self):
disorder_warnings = []
disorder_errors = []
for si in range(len(self.stops) - 2):
angle = angle_between(
self.stops[si].stop,
self.stops[si + 1].stop,
self.stops[si + 2].stop,
)
if angle < ALLOWED_ANGLE_BETWEEN_STOPS:
msg = 'Angle between stops around "{}" is too narrow, {} degrees'.format(
self.stops[si + 1].stoparea.name, angle
)
if angle < DISALLOWED_ANGLE_BETWEEN_STOPS:
disorder_errors.append(msg)
else:
disorder_warnings.append(msg)
return disorder_warnings, disorder_errors
def check_stops_order_on_tracks_direct(self, stop_sequence):
"""Checks stops order on tracks, following stop_sequence
in direct order only.
:param stop_sequence: list of RouteStop that belong to the
longest contiguous sequence of tracks in a route.
:return: error message on the first order violation or None.
"""
def make_assertion_error_msg(route_stop, error_type):
return (
"stop_area {} '{}' has {} 'positions_on_rails' "
"attribute in route {}".format(
route_stop.stoparea.id,
route_stop.stoparea.name,
"no" if error_type == 1 else "empty",
self.id,
)
)
allowed_order_violations = 1 if self.is_circular else 0
max_position_on_rails = -1
for route_stop in stop_sequence:
assert hasattr(
route_stop, 'positions_on_rails'
), make_assertion_error_msg(route_stop, error_type=1)
positions_on_rails = route_stop.positions_on_rails
assert positions_on_rails, make_assertion_error_msg(
route_stop, error_type=2
)
suitable_occurrence = 0
while (
suitable_occurrence < len(positions_on_rails)
and positions_on_rails[suitable_occurrence]
< max_position_on_rails
):
suitable_occurrence += 1
if suitable_occurrence == len(positions_on_rails):
if allowed_order_violations > 0:
suitable_occurrence -= 1
allowed_order_violations -= 1
else:
return 'Stops on tracks are unordered near "{}" {}'.format(
route_stop.stoparea.name, route_stop.stop
)
max_position_on_rails = positions_on_rails[suitable_occurrence]
def check_stops_order_on_tracks(self, stop_sequence):
"""Checks stops order on tracks, trying direct and reversed
order of stops in the stop_sequence.
:param stop_sequence: list of RouteStop that belong to the
longest contiguous sequence of tracks in a route.
:return: error message on the first order violation or None.
"""
error_message = self.check_stops_order_on_tracks_direct(stop_sequence)
if error_message:
error_message_reversed = self.check_stops_order_on_tracks_direct(
reversed(stop_sequence)
)
if error_message_reversed is None:
error_message = None
self.city.warn(
'Tracks seem to go in the opposite direction to stops',
self.element,
)
return error_message
def check_stops_order(self, stops_on_longest_line):
(
angle_disorder_warnings,
angle_disorder_errors,
) = self.check_stops_order_by_angle()
disorder_on_tracks_error = self.check_stops_order_on_tracks(
stops_on_longest_line
)
disorder_warnings = angle_disorder_warnings
disorder_errors = angle_disorder_errors
if disorder_on_tracks_error:
disorder_errors.append(disorder_on_tracks_error)
return disorder_warnings, disorder_errors
def check_and_recover_stops_order(self, stops_on_longest_line):
disorder_warnings, disorder_errors = self.check_stops_order(
stops_on_longest_line
)
if disorder_warnings or disorder_errors:
resort_success = False
if self.city.recovery_data:
resort_success = self.try_resort_stops()
if resort_success:
for msg in disorder_warnings:
self.city.notice(msg, self.element)
for msg in disorder_errors:
self.city.warn(
"Fixed with recovery data: " + msg, self.element
)
if not resort_success:
for msg in disorder_warnings:
self.city.notice(msg, self.element)
for msg in disorder_errors:
self.city.error(msg, self.element)
def try_resort_stops(self):
"""Precondition: self.city.recovery_data is not None.
Return success of station order recovering."""
self_stops = {} # station name => RouteStop
for stop in self.stops:
station = stop.stoparea.station
stop_name = station.name
if stop_name == '?' and station.int_name:
stop_name = station.int_name
# We won't programmatically recover routes with repeating stations:
# such cases are rare and deserves manual verification
if stop_name in self_stops:
return False
self_stops[stop_name] = stop
route_id = (self.colour, self.ref)
if route_id not in self.city.recovery_data:
return False
stop_names = list(self_stops.keys())
suitable_itineraries = []
for itinerary in self.city.recovery_data[route_id]:
itinerary_stop_names = [
stop['name'] for stop in itinerary['stations']
]
if not (
len(stop_names) == len(itinerary_stop_names)
and sorted(stop_names) == sorted(itinerary_stop_names)
):
continue
big_station_displacement = False
for it_stop in itinerary['stations']:
name = it_stop['name']
it_stop_center = it_stop['center']
self_stop_center = self_stops[name].stoparea.station.center
if (
distance(it_stop_center, self_stop_center)
> DISPLACEMENT_TOLERANCE
):
big_station_displacement = True
break
if not big_station_displacement:
suitable_itineraries.append(itinerary)
if len(suitable_itineraries) == 0:
return False
elif len(suitable_itineraries) == 1:
matching_itinerary = suitable_itineraries[0]
else:
from_tag = self.element['tags'].get('from')
to_tag = self.element['tags'].get('to')
if not from_tag and not to_tag:
return False
matching_itineraries = [
itin
for itin in suitable_itineraries
if from_tag
and itin['from'] == from_tag
or to_tag
and itin['to'] == to_tag
]
if len(matching_itineraries) != 1:
return False
matching_itinerary = matching_itineraries[0]
self.stops = [
self_stops[stop['name']] for stop in matching_itinerary['stations']
]
return True
def __len__(self):
return len(self.stops)
def __getitem__(self, i):
return self.stops[i]
def __iter__(self):
return iter(self.stops)
def __repr__(self):
return (
'Route(id={}, mode={}, ref={}, name={}, network={}, interval={}, '
'circular={}, num_stops={}, line_length={} m, from={}, to={}'
).format(
self.id,
self.mode,
self.ref,
self.name,
self.network,
self.interval,
self.is_circular,
len(self.stops),
self.stops[-1].distance,
self.stops[0],
self.stops[-1],
)
class RouteMaster:
def __init__(self, master=None):
self.routes = []
self.best = None
self.id = el_id(master)
self.has_master = master is not None
self.interval_from_master = False
if master:
self.ref = master['tags'].get(
'ref', master['tags'].get('name', None)
)
try:
self.colour = normalize_colour(
master['tags'].get('colour', None)
)
except ValueError:
self.colour = None
try:
self.infill = normalize_colour(
master['tags'].get('colour:infill', None)
)
except ValueError:
self.colour = None
self.network = Route.get_network(master)
self.mode = master['tags'].get(
'route_master', None
) # This tag is required, but okay
self.name = master['tags'].get('name', None)
self.interval = Route.get_interval(master['tags'])
self.interval_from_master = self.interval is not None
else:
self.ref = None
self.colour = None
self.infill = None
self.network = None
self.mode = None
self.name = None
self.interval = None
def add(self, route, city):
if not self.network:
self.network = route.network
elif route.network and route.network != self.network:
city.error(
'Route has different network ("{}") from master "{}"'.format(
route.network, self.network
),
route.element,
)
if not self.colour:
self.colour = route.colour
elif route.colour and route.colour != self.colour:
city.notice(
'Route "{}" has different colour from master "{}"'.format(
route.colour, self.colour
),
route.element,
)
if not self.infill:
self.infill = route.infill
elif route.infill and route.infill != self.infill:
city.notice(
'Route "{}" has different infill colour from master "{}"'.format(
route.infill, self.infill
),
route.element,
)
if not self.ref:
self.ref = route.ref
elif route.ref != self.ref:
city.notice(
'Route "{}" has different ref from master "{}"'.format(
route.ref, self.ref
),
route.element,
)
if not self.name:
self.name = route.name
if not self.mode:
self.mode = route.mode
elif route.mode != self.mode:
city.error(
'Incompatible PT mode: master has {} and route has {}'.format(
self.mode, route.mode
),
route.element,
)
return
if not self.interval_from_master and route.interval:
if not self.interval:
self.interval = route.interval
else:
self.interval = min(self.interval, route.interval)
if not self.has_master and (not self.id or self.id > route.id):
self.id = route.id
self.routes.append(route)
if not self.best or len(route.stops) > len(self.best.stops):
self.best = route
def stop_areas(self):
"""Returns a list of all stations on all route variants."""
seen_ids = set()
for route in self.routes:
for stop in route:
st = stop.stoparea
if st.id not in seen_ids:
seen_ids.add(st.id)
yield st
def __len__(self):
return len(self.routes)
def __getitem__(self, i):
return self.routes[i]
def __iter__(self):
return iter(self.routes)
def __repr__(self):
return 'RouteMaster(id={}, mode={}, ref={}, name={}, network={}, num_variants={}'.format(
self.id,
self.mode,
self.ref,
self.name,
self.network,
len(self.routes),
)
class City:
def __init__(self, row, overground=False):
self.errors = []
self.warnings = []
self.notices = []
self.name = row[1]
self.country = row[2]
self.continent = row[3]
if not row[0]:
self.error('City {} does not have an id'.format(self.name))
self.id = int(row[0] or '0')
self.overground = overground
if not overground:
self.num_stations = int(row[4])
self.num_lines = int(row[5] or '0')
self.num_light_lines = int(row[6] or '0')
self.num_interchanges = int(row[7] or '0')
else:
self.num_tram_lines = int(row[4] or '0')
self.num_trolleybus_lines = int(row[5] or '0')
self.num_bus_lines = int(row[6] or '0')
self.num_other_lines = int(row[7] or '0')
# Aquiring list of networks and modes
networks = None if len(row) <= 9 else row[9].split(':')
if not networks or len(networks[-1]) == 0:
self.networks = []
else:
self.networks = set(
filter(None, [x.strip() for x in networks[-1].split(';')])
)
if not networks or len(networks) < 2 or len(networks[0]) == 0:
if self.overground:
self.modes = DEFAULT_MODES_OVERGROUND
else:
self.modes = DEFAULT_MODES_RAPID
else:
self.modes = set([x.strip() for x in networks[0].split(',')])
# Reversing bbox so it is (xmin, ymin, xmax, ymax)
bbox = row[8].split(',')
if len(bbox) == 4:
self.bbox = [float(bbox[i]) for i in (1, 0, 3, 2)]
else:
self.bbox = None
self.elements = {} # Dict el_id → el
self.stations = defaultdict(list) # Dict el_id → list of StopAreas
self.routes = {} # Dict route_ref → route
self.masters = {} # Dict el_id of route → route_master
self.stop_areas = defaultdict(
list
) # El_id → list of el_id of stop_area
self.transfers = [] # List of lists of stop areas
self.station_ids = set() # Set of stations' uid
self.stops_and_platforms = set() # Set of stops and platforms el_id
self.recovery_data = None
@staticmethod
def log_message(message, el):
if el:
tags = el.get('tags', {})
message += ' ({} {}, "{}")'.format(
el['type'],
el.get('id', el.get('ref')),
tags.get('name', tags.get('ref', '')),
)
return message
def notice(self, message, el=None):
"""This type of message may point to a potential problem."""
msg = City.log_message(message, el)
self.notices.append(msg)
def warn(self, message, el=None):
"""A warning is definitely a problem but is doesn't prevent
from building a routing file and doesn't invalidate the city.
"""
msg = City.log_message(message, el)
self.warnings.append(msg)
def error(self, message, el=None):
"""Error if a critical problem that invalidates the city"""
msg = City.log_message(message, el)
self.errors.append(msg)
def contains(self, el):
center = el_center(el)
if center:
return (
self.bbox[0] <= center[1] <= self.bbox[2]
and self.bbox[1] <= center[0] <= self.bbox[3]
)
return False
def add(self, el):
if el['type'] == 'relation' and 'members' not in el:
return
self.elements[el_id(el)] = el
if el['type'] == 'relation' and 'tags' in el:
if el['tags'].get('type') == 'route_master':
for m in el['members']:
if m['type'] == 'relation':
if el_id(m) in self.masters:
self.error('Route in two route_masters', m)
self.masters[el_id(m)] = el
elif el['tags'].get('public_transport') == 'stop_area':
warned_about_duplicates = False
for m in el['members']:
stop_areas = self.stop_areas[el_id(m)]
if el in stop_areas:
if not warned_about_duplicates:
self.warn('Duplicate element in a stop area', el)
warned_about_duplicates = True
else:
stop_areas.append(el)
def make_transfer(self, sag):
transfer = set()
for m in sag['members']:
k = el_id(m)
el = self.elements.get(k)
if not el:
# A sag member may validly not belong to the city while
# the sag does - near the city bbox boundary
continue
if 'tags' not in el:
self.warn(
'An untagged object {} in a stop_area_group'.format(k), sag
)
continue
if (
el['type'] != 'relation'
or el['tags'].get('type') != 'public_transport'
or el['tags'].get('public_transport') != 'stop_area'
):
continue
if k in self.stations:
stoparea = self.stations[k][0]
transfer.add(stoparea)
if stoparea.transfer:
# TODO: properly process such cases.
# Counterexample 1: Paris,
# Châtelet subway station <->
# "Châtelet - Les Halles" railway station <->
# Les Halles subway station
# Counterexample 2: Saint-Petersburg, transfers Витебский вокзал <-> Пушкинская <-> Звенигородская
self.warn(
'Stop area {} belongs to multiple interchanges'.format(
k
)
)
stoparea.transfer = el_id(sag)
if len(transfer) > 1:
self.transfers.append(transfer)
def extract_routes(self):
# Extract stations
processed_stop_areas = set()
for el in self.elements.values():
if Station.is_station(el, self.modes):
# See PR https://github.com/mapsme/subways/pull/98
if (
el['type'] == 'relation'
and el['tags'].get('type') != 'multipolygon'
):
self.warn(
"A railway station cannot be a relation of type '{}'".format(
el['tags'].get('type')
),
el,
)
continue
st = Station(el, self)
self.station_ids.add(st.id)
if st.id in self.stop_areas:
stations = []
for sa in self.stop_areas[st.id]:
stations.append(StopArea(st, self, sa))
else:
stations = [StopArea(st, self)]
for station in stations:
if station.id not in processed_stop_areas:
processed_stop_areas.add(station.id)
for st_el in station.get_elements():
self.stations[st_el].append(station)
# Check that stops and platforms belong to single stop_area
for sp in station.stops | station.platforms:
if sp in self.stops_and_platforms:
self.notice(
'A stop or a platform {} belongs to multiple '
'stop areas, might be correct'.format(sp)
)
else:
self.stops_and_platforms.add(sp)
# Extract routes
for el in self.elements.values():
if Route.is_route(el, self.modes):
if el['tags'].get('access') in ('no', 'private'):
continue
route_id = el_id(el)
master = self.masters.get(route_id, None)
if self.networks:
network = Route.get_network(el)
if master:
master_network = Route.get_network(master)
else:
master_network = None
if (
network not in self.networks
and master_network not in self.networks
):
continue
route = Route(el, self, master)
if not route.stops:
self.warn('Route has no stops', el)
continue
elif len(route.stops) == 1:
self.warn('Route has only one stop', el)
continue
k = el_id(master) if master else route.ref
if k not in self.routes:
self.routes[k] = RouteMaster(master)
self.routes[k].add(route, self)
# Sometimes adding a route to a newly initialized RouteMaster can fail
if len(self.routes[k]) == 0:
del self.routes[k]
# And while we're iterating over relations, find interchanges
if (
el['type'] == 'relation'
and el.get('tags', {}).get('public_transport', None)
== 'stop_area_group'
):
self.make_transfer(el)
# Filter transfers, leaving only stations that belong to routes
used_stop_areas = set()
for rmaster in self.routes.values():
for route in rmaster:
used_stop_areas.update([s.stoparea for s in route.stops])
new_transfers = []
for transfer in self.transfers:
new_tr = [s for s in transfer if s in used_stop_areas]
if len(new_tr) > 1:
new_transfers.append(new_tr)
self.transfers = new_transfers
def __iter__(self):
return iter(self.routes.values())
def is_good(self):
return len(self.errors) == 0
def get_validation_result(self):
result = {
'name': self.name,
'country': self.country,
'continent': self.continent,
'stations_found': getattr(self, 'found_stations', 0),
'transfers_found': getattr(self, 'found_interchanges', 0),
'unused_entrances': getattr(self, 'unused_entrances', 0),
'networks': getattr(self, 'found_networks', 0),
}
if not self.overground:
result.update(
{
'subwayl_expected': self.num_lines,
'lightrl_expected': self.num_light_lines,
'subwayl_found': getattr(self, 'found_lines', 0),
'lightrl_found': getattr(self, 'found_light_lines', 0),
'stations_expected': self.num_stations,
'transfers_expected': self.num_interchanges,
}
)
else:
result.update(
{
'stations_expected': 0,
'transfers_expected': 0,
'busl_expected': self.num_bus_lines,
'trolleybusl_expected': self.num_trolleybus_lines,
'traml_expected': self.num_tram_lines,
'otherl_expected': self.num_other_lines,
'busl_found': getattr(self, 'found_bus_lines', 0),
'trolleybusl_found': getattr(
self, 'found_trolleybus_lines', 0
),
'traml_found': getattr(self, 'found_tram_lines', 0),
'otherl_found': getattr(self, 'found_other_lines', 0),
}
)
result['warnings'] = self.warnings
result['errors'] = self.errors
result['notices'] = self.notices
return result
def count_unused_entrances(self):
global used_entrances
stop_areas = set()
for el in self.elements.values():
if (
el['type'] == 'relation'
and 'tags' in el
and el['tags'].get('public_transport') == 'stop_area'
and 'members' in el
):
stop_areas.update([el_id(m) for m in el['members']])
unused = []
not_in_sa = []
for el in self.elements.values():
if (
el['type'] == 'node'
and 'tags' in el
and el['tags'].get('railway') == 'subway_entrance'
):
i = el_id(el)
if i in self.stations:
used_entrances.add(i)
if i not in stop_areas:
not_in_sa.append(i)
if i not in self.stations:
unused.append(i)
self.unused_entrances = len(unused)
self.entrances_not_in_stop_areas = len(not_in_sa)
if unused:
self.notice(
'{} subway entrances are not connected to a station: {}'.format(
len(unused), format_elid_list(unused)
)
)
if not_in_sa:
self.notice(
'{} subway entrances are not in stop_area relations: {}'.format(
len(not_in_sa), format_elid_list(not_in_sa)
)
)
def check_return_routes(self, rmaster):
variants = {}
have_return = set()
for variant in rmaster:
if len(variant) < 2:
continue
# Using transfer ids because a train can arrive at different stations within a transfer
# But disregard transfer that may give an impression of a circular route
# (for example, Simonis / Elisabeth station and route 2 in Brussels)
if variant[0].stoparea.transfer == variant[-1].stoparea.transfer:
t = (variant[0].stoparea.id, variant[-1].stoparea.id)
else:
t = (
variant[0].stoparea.transfer or variant[0].stoparea.id,
variant[-1].stoparea.transfer or variant[-1].stoparea.id,
)
if t in variants:
continue
variants[t] = variant.element
tr = (t[1], t[0])
if tr in variants:
have_return.add(t)
have_return.add(tr)
if len(variants) == 0:
self.error(
'An empty route master {}. Please set construction:route '
'if it is under construction'.format(rmaster.id)
)
elif len(variants) == 1:
log_function = self.error if not rmaster.best.is_circular else self.notice
log_function(
'Only one route in route_master. '
'Please check if it needs a return route',
rmaster.best.element,
)
else:
for t, rel in variants.items():
if t not in have_return:
self.notice('Route does not have a return direction', rel)
def validate_lines(self):
self.found_light_lines = len(
[x for x in self.routes.values() if x.mode != 'subway']
)
self.found_lines = len(self.routes) - self.found_light_lines
if self.found_lines != self.num_lines:
self.error(
'Found {} subway lines, expected {}'.format(
self.found_lines, self.num_lines
)
)
if self.found_light_lines != self.num_light_lines:
self.error(
'Found {} light rail lines, expected {}'.format(
self.found_light_lines, self.num_light_lines
)
)
def validate_overground_lines(self):
self.found_tram_lines = len(
[x for x in self.routes.values() if x.mode == 'tram']
)
self.found_bus_lines = len(
[x for x in self.routes.values() if x.mode == 'bus']
)
self.found_trolleybus_lines = len(
[x for x in self.routes.values() if x.mode == 'trolleybus']
)
self.found_other_lines = len(
[
x
for x in self.routes.values()
if x.mode not in ('bus', 'trolleybus', 'tram')
]
)
if self.found_tram_lines != self.num_tram_lines:
log_function = self.error if self.found_tram_lines == 0 else self.notice
log_function(
'Found {} tram lines, expected {}'.format(
self.found_tram_lines, self.num_tram_lines
),
)
def validate(self):
networks = Counter()
self.found_stations = 0
unused_stations = set(self.station_ids)
for rmaster in self.routes.values():
networks[str(rmaster.network)] += 1
if not self.overground:
self.check_return_routes(rmaster)
route_stations = set()
for sa in rmaster.stop_areas():
route_stations.add(sa.transfer or sa.id)
unused_stations.discard(sa.station.id)
self.found_stations += len(route_stations)
if unused_stations:
self.unused_stations = len(unused_stations)
self.notice(
'{} unused stations: {}'.format(
self.unused_stations, format_elid_list(unused_stations)
)
)
self.count_unused_entrances()
self.found_interchanges = len(self.transfers)
if self.overground:
self.validate_overground_lines()
else:
self.validate_lines()
if self.found_stations != self.num_stations:
msg = 'Found {} stations in routes, expected {}'.format(
self.found_stations, self.num_stations
)
log_function = (
self.error
if not (
0
<= (self.num_stations - self.found_stations)
/ self.num_stations
<= ALLOWED_STATIONS_MISMATCH
)
else self.warn
)
log_function(msg)
if self.found_interchanges != self.num_interchanges:
msg = 'Found {} interchanges, expected {}'.format(
self.found_interchanges, self.num_interchanges
)
log_function = (
self.error
if self.num_interchanges != 0
and not (
(self.num_interchanges - self.found_interchanges)
/ self.num_interchanges
<= ALLOWED_TRANSFERS_MISMATCH
)
else self.warn
)
log_function(msg)
self.found_networks = len(networks)
if len(networks) > max(1, len(self.networks)):
n_str = '; '.join(
['{} ({})'.format(k, v) for k, v in networks.items()]
)
self.notice('More than one network: {}'.format(n_str))
def find_transfers(elements, cities):
transfers = []
stop_area_groups = []
for el in elements:
if (
el['type'] == 'relation'
and 'members' in el
and el.get('tags', {}).get('public_transport') == 'stop_area_group'
):
stop_area_groups.append(el)
# StopArea.id uniquely identifies a StopArea.
# We must ensure StopArea uniqueness since one stop_area relation may result in
# several StopArea instances at inter-city interchanges.
stop_area_ids = defaultdict(set) # el_id -> set of StopArea.id
stop_area_objects = dict() # StopArea.id -> one of StopArea instances
for city in cities:
for el, st in city.stations.items():
stop_area_ids[el].update(sa.id for sa in st)
stop_area_objects.update((sa.id, sa) for sa in st)
for sag in stop_area_groups:
transfer = set()
for m in sag['members']:
k = el_id(m)
if k not in stop_area_ids:
continue
transfer.update(
stop_area_objects[sa_id] for sa_id in stop_area_ids[k]
)
if len(transfer) > 1:
transfers.append(transfer)
return transfers
def get_unused_entrances_geojson(elements):
global used_entrances
features = []
for el in elements:
if (
el['type'] == 'node'
and 'tags' in el
and el['tags'].get('railway') == 'subway_entrance'
):
if el_id(el) not in used_entrances:
geometry = {'type': 'Point', 'coordinates': el_center(el)}
properties = {
k: v
for k, v in el['tags'].items()
if k not in ('railway', 'entrance')
}
features.append(
{
'type': 'Feature',
'geometry': geometry,
'properties': properties,
}
)
return {'type': 'FeatureCollection', 'features': features}
def download_cities(overground=False):
url = (
'https://docs.google.com/spreadsheets/d/{}/export?format=csv{}'.format(
SPREADSHEET_ID, '&gid=1881416409' if overground else ''
)
)
response = urllib.request.urlopen(url)
if response.getcode() != 200:
raise Exception(
'Failed to download cities spreadsheet: HTTP {}'.format(
response.getcode()
)
)
data = response.read().decode('utf-8')
r = csv.reader(data.splitlines())
next(r) # skipping the header
names = set()
cities = []
for row in r:
if len(row) > 8 and row[8]:
cities.append(City(row, overground))
if row[0].strip() in names:
logging.warning(
'Duplicate city name in the google spreadsheet: %s', row[0]
)
names.add(row[0].strip())
return cities