diff --git a/.gitignore b/.gitignore index f2fb32f..129911a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ tmp_html/ html/ .idea .DS_Store +.venv *.log *.json *.geojson diff --git a/subway_io.py b/subway_io.py index cbd252a..4b02596 100644 --- a/subway_io.py +++ b/subway_io.py @@ -88,7 +88,7 @@ def dump_yaml(city, f): routes = [] for route in city: stations = OrderedDict( - [(sa.transfer or sa.id, sa.name) for sa in route.stop_areas()] + [(sa.transfer or sa.id, sa.name) for sa in route.stopareas()] ) rte = { "type": route.mode, diff --git a/subway_structure.py b/subway_structure.py index c7e7327..d486d90 100644 --- a/subway_structure.py +++ b/subway_structure.py @@ -5,6 +5,7 @@ import re from collections import Counter, defaultdict from collections.abc import Collection, Iterator from itertools import chain, islice +from typing import TypeVar from css_colours import normalize_colour @@ -49,6 +50,7 @@ START_END_TIMES_RE = re.compile(r".*?(\d{2}):(\d{2})-(\d{2}):(\d{2}).*") IdT = str # Type of feature ids TransferT = set[IdT] # A transfer is a set of StopArea IDs TransfersT = Collection[TransferT] +T = TypeVar("T") def get_start_end_times(opening_hours): @@ -626,7 +628,7 @@ class RouteStop: class Route: - """The longest route for a city with a unique ref.""" + """Corresponds to OSM "type=route" relation""" @staticmethod def is_route(el, modes): @@ -677,7 +679,12 @@ class Route: yield stoparea yielded_stopareas.add(stoparea) - def __init__(self, relation, city, master=None): + def __init__( + self, + relation: dict, + city: City, + master: dict | None = None, + ) -> None: assert Route.is_route( relation, city.modes ), f"The relation does not seem to be a route: {relation}" @@ -1440,7 +1447,8 @@ class Route: class RouteMaster: - def __init__(self, master=None): + def __init__(self, city: City, master: dict = None) -> None: + self.city = city self.routes = [] self.best = None self.id = el_id(master) @@ -1486,11 +1494,11 @@ class RouteMaster: yield stoparea yielded_stopareas.add(stoparea) - def add(self, route, city): + def add(self, route: Route) -> None: if not self.network: self.network = route.network elif route.network and route.network != self.network: - city.error( + self.city.error( 'Route has different network ("{}") from master "{}"'.format( route.network, self.network ), @@ -1500,7 +1508,7 @@ class RouteMaster: if not self.colour: self.colour = route.colour elif route.colour and route.colour != self.colour: - city.notice( + self.city.notice( 'Route "{}" has different colour from master "{}"'.format( route.colour, self.colour ), @@ -1510,7 +1518,7 @@ class RouteMaster: if not self.infill: self.infill = route.infill elif route.infill and route.infill != self.infill: - city.notice( + self.city.notice( ( f'Route "{route.infill}" has different infill colour ' f'from master "{self.infill}"' @@ -1521,7 +1529,7 @@ class RouteMaster: if not self.ref: self.ref = route.ref elif route.ref != self.ref: - city.notice( + self.city.notice( 'Route "{}" has different ref from master "{}"'.format( route.ref, self.ref ), @@ -1534,7 +1542,7 @@ class RouteMaster: if not self.mode: self.mode = route.mode elif route.mode != self.mode: - city.error( + self.city.error( "Incompatible PT mode: master has {} and route has {}".format( self.mode, route.mode ), @@ -1568,8 +1576,8 @@ class RouteMaster: return [route for route in self if len(route) >= 2] def find_twin_routes(self) -> dict[Route, Route]: - """Two routes are twins if they have the same end stations - and opposite directions, and the number of stations is + """Two non-circular routes are twins if they have the same end + stations and opposite directions, and the number of stations is the same or almost the same. We'll then find stops that are present in one direction and is missing in another direction - to warn. """ @@ -1581,8 +1589,6 @@ class RouteMaster: continue # Difficult to calculate. TODO(?) in the future if route in twin_routes: continue - if len(route) < 2: - continue route_transfer_ids = set(route.get_transfers_sequence()) ends = route.get_end_transfers() @@ -1617,15 +1623,253 @@ class RouteMaster: return twin_routes - def stop_areas(self): - """Returns a list of all stations on all route variants.""" - seen_ids = set() - for route in self.routes: - for stop in route: - st = stop.stoparea - if st.id not in seen_ids: - seen_ids.add(st.id) - yield st + def check_return_routes(self) -> None: + """Check if a route has return direction, and if twin routes + miss stations. + """ + meaningful_routes = self.get_meaningful_routes() + + if len(meaningful_routes) == 0: + self.city.error( + f"An empty route master {self.id}. " + "Please set construction:route if it is under construction" + ) + elif len(meaningful_routes) == 1: + log_function = ( + self.city.error + if not self.best.is_circular + else self.city.notice + ) + log_function( + "Only one route in route_master. " + "Please check if it needs a return route", + self.best.element, + ) + else: + self.check_return_circular_routes() + self.check_return_noncircular_routes() + + def check_return_noncircular_routes(self) -> None: + routes = [ + route + for route in self.get_meaningful_routes() + if not route.is_circular + ] + all_ends = {route.get_end_transfers(): route for route in routes} + for route in routes: + ends = route.get_end_transfers() + if ends[::-1] not in all_ends: + self.city.notice( + "Route does not have a return direction", route.element + ) + + twin_routes = self.find_twin_routes() + for route1, route2 in twin_routes.items(): + if route1.id > route2.id: + continue # to process a pair of routes only once + # and to ensure the order of routes in the pair + self.alert_twin_routes_differ(route1, route2) + + def check_return_circular_routes(self) -> None: + routes = { + route + for route in self.get_meaningful_routes() + if route.is_circular + } + routes_having_backward = set() + + for route in routes: + if route in routes_having_backward: + continue + transfer_sequence1 = [ + stop.stoparea.transfer or stop.stoparea.id for stop in route + ] + transfer_sequence1.pop() + for potential_backward_route in routes - {route}: + transfer_sequence2 = [ + stop.stoparea.transfer or stop.stoparea.id + for stop in potential_backward_route + ][ + -2::-1 + ] # truncate repeated first stop and reverse + common_subsequence = self.find_common_circular_subsequence( + transfer_sequence1, transfer_sequence2 + ) + if len(common_subsequence) >= 0.8 * min( + len(transfer_sequence1), len(transfer_sequence2) + ): + routes_having_backward.add(route) + routes_having_backward.add(potential_backward_route) + break + + for route in routes - routes_having_backward: + self.city.notice( + "Route does not have a return direction", route.element + ) + + @staticmethod + def find_common_circular_subsequence( + seq1: list[T], seq2: list[T] + ) -> list[T]: + """seq1 and seq2 are supposed to be stops of some circular routes. + Prerequisites to rely on the result: + - elements of each sequence are not repeated + - the order of stations is not violated. + Under these conditions we don't need LCS algorithm. Linear scan is + sufficient. + """ + i1, i2 = -1, -1 + for i1, x in enumerate(seq1): + try: + i2 = seq2.index(x) + except ValueError: + continue + else: + # x is found both in seq1 and seq2 + break + + if i2 == -1: + return [] + + # Shift cyclically so that the common element takes the first position + # both in seq1 and seq2 + seq1 = seq1[i1:] + seq1[:i1] + seq2 = seq2[i2:] + seq2[:i2] + + common_subsequence = [] + i2 = 0 + for x in seq1: + try: + i2 = seq2.index(x, i2) + except ValueError: + continue + common_subsequence.append(x) + i2 += 1 + if i2 >= len(seq2): + break + return common_subsequence + + def alert_twin_routes_differ(self, route1: Route, route2: Route) -> None: + """Arguments are that route1.id < route2.id""" + ( + stops_missing_from_route1, + stops_missing_from_route2, + stops_that_dont_match, + ) = self.calculate_twin_routes_diff(route1, route2) + + for st in stops_missing_from_route1: + if ( + not route1.are_tracks_complete() + or ( + projected_point := project_on_line( + st.stoparea.center, route1.tracks + )["projected_point"] + ) + is not None + and distance(st.stoparea.center, projected_point) + <= MAX_DISTANCE_STOP_TO_LINE + ): + self.city.notice( + f"Stop {st.stoparea.station.name} {st.stop} is included " + f"in the {route2.id} but not included in {route1.id}", + route1.element, + ) + + for st in stops_missing_from_route2: + if ( + not route2.are_tracks_complete() + or ( + projected_point := project_on_line( + st.stoparea.center, route2.tracks + )["projected_point"] + ) + is not None + and distance(st.stoparea.center, projected_point) + <= MAX_DISTANCE_STOP_TO_LINE + ): + self.city.notice( + f"Stop {st.stoparea.station.name} {st.stop} is included " + f"in the {route1.id} but not included in {route2.id}", + route2.element, + ) + + for st1, st2 in stops_that_dont_match: + if ( + st1.stoparea.station == st2.stoparea.station + or distance(st1.stop, st2.stop) < SUGGEST_TRANSFER_MIN_DISTANCE + ): + self.city.notice( + "Should there be one stoparea or a transfer between " + f"{st1.stoparea.station.name} {st1.stop} and " + f"{st2.stoparea.station.name} {st2.stop}?", + route1.element, + ) + + @staticmethod + def calculate_twin_routes_diff(route1: Route, route2: Route) -> tuple: + """Wagner–Fischer algorithm for stops diff in two twin routes.""" + + stops1 = route1.stops + stops2 = route2.stops[::-1] + + def stops_match(stop1: RouteStop, stop2: RouteStop) -> bool: + return ( + stop1.stoparea == stop2.stoparea + or stop1.stoparea.transfer is not None + and stop1.stoparea.transfer == stop2.stoparea.transfer + ) + + d = [[0] * (len(stops2) + 1) for _ in range(len(stops1) + 1)] + d[0] = list(range(len(stops2) + 1)) + for i in range(len(stops1) + 1): + d[i][0] = i + + for i in range(1, len(stops1) + 1): + for j in range(1, len(stops2) + 1): + d[i][j] = ( + d[i - 1][j - 1] + if stops_match(stops1[i - 1], stops2[j - 1]) + else min((d[i - 1][j], d[i][j - 1], d[i - 1][j - 1])) + 1 + ) + + stops_missing_from_route1: list[RouteStop] = [] + stops_missing_from_route2: list[RouteStop] = [] + stops_that_dont_match: list[tuple[RouteStop, RouteStop]] = [] + + i = len(stops1) + j = len(stops2) + while not (i == 0 and j == 0): + action = None + if i > 0 and j > 0: + match = stops_match(stops1[i - 1], stops2[j - 1]) + if match and d[i - 1][j - 1] == d[i][j]: + action = "no" + elif not match and d[i - 1][j - 1] + 1 == d[i][j]: + action = "change" + if not action and i > 0 and d[i - 1][j] + 1 == d[i][j]: + action = "add_2" + if not action and j > 0 and d[i][j - 1] + 1 == d[i][j]: + action = "add_1" + + match action: + case "add_1": + stops_missing_from_route1.append(stops2[j - 1]) + j -= 1 + case "add_2": + stops_missing_from_route2.append(stops1[i - 1]) + i -= 1 + case _: + if action == "change": + stops_that_dont_match.append( + (stops1[i - 1], stops2[j - 1]) + ) + i -= 1 + j -= 1 + return ( + stops_missing_from_route1, + stops_missing_from_route2, + stops_that_dont_match, + ) def __len__(self): return len(self.routes) @@ -1923,8 +2167,8 @@ class City: k = el_id(master) if master else route.ref if k not in self.routes: - self.routes[k] = RouteMaster(master) - self.routes[k].add(route, self) + self.routes[k] = RouteMaster(self, master) + self.routes[k].add(route) # Sometimes adding a route to a newly initialized RouteMaster # can fail @@ -2055,166 +2299,6 @@ class City: f"relations: {format_elid_list(not_in_sa)}" ) - def check_return_routes(self, rmaster: RouteMaster) -> None: - """Check if a route has return direction, and if twin routes - miss stations. - """ - meaningful_routes = rmaster.get_meaningful_routes() - - if len(meaningful_routes) == 0: - self.error( - f"An empty route master {rmaster.id}. " - "Please set construction:route if it is under construction" - ) - elif len(meaningful_routes) == 1: - log_function = ( - self.error if not rmaster.best.is_circular else self.notice - ) - log_function( - "Only one route in route_master. " - "Please check if it needs a return route", - rmaster.best.element, - ) - else: - all_ends = { - route.get_end_transfers(): route for route in meaningful_routes - } - for route in meaningful_routes: - ends = route.get_end_transfers() - if ends[::-1] not in all_ends: - self.notice( - "Route does not have a return direction", route.element - ) - - twin_routes = rmaster.find_twin_routes() - for route1, route2 in twin_routes.items(): - if route1.id > route2.id: - continue # to process a pair of routes only once - # and to ensure the order of routes in the pair - self.alert_twin_routes_differ(route1, route2) - - def alert_twin_routes_differ(self, route1: Route, route2: Route) -> None: - """Arguments are that route1.id < route2.id""" - ( - stops_missing_from_route1, - stops_missing_from_route2, - stops_that_dont_match, - ) = self.calculate_twin_routes_diff(route1, route2) - - for st in stops_missing_from_route1: - if ( - not route1.are_tracks_complete() - or ( - projected_point := project_on_line( - st.stoparea.center, route1.tracks - )["projected_point"] - ) - is not None - and distance(st.stoparea.center, projected_point) - <= MAX_DISTANCE_STOP_TO_LINE - ): - self.notice( - f"Stop {st.stoparea.station.name} {st.stop} is included " - f"in the {route2.id} but not included in {route1.id}", - route1.element, - ) - - for st in stops_missing_from_route2: - if ( - not route2.are_tracks_complete() - or ( - projected_point := project_on_line( - st.stoparea.center, route2.tracks - )["projected_point"] - ) - is not None - and distance(st.stoparea.center, projected_point) - <= MAX_DISTANCE_STOP_TO_LINE - ): - self.notice( - f"Stop {st.stoparea.station.name} {st.stop} is included " - f"in the {route1.id} but not included in {route2.id}", - route2.element, - ) - - for st1, st2 in stops_that_dont_match: - if ( - st1.stoparea.station == st2.stoparea.station - or distance(st1.stop, st2.stop) < SUGGEST_TRANSFER_MIN_DISTANCE - ): - self.notice( - "Should there be one stoparea or a transfer between " - f"{st1.stoparea.station.name} {st1.stop} and " - f"{st2.stoparea.station.name} {st2.stop}?", - route1.element, - ) - - @staticmethod - def calculate_twin_routes_diff(route1: Route, route2: Route) -> tuple: - """Wagner–Fischer algorithm for stops diff in two twin routes.""" - - stops1 = route1.stops - stops2 = route2.stops[::-1] - - def stops_match(stop1: RouteStop, stop2: RouteStop) -> bool: - return ( - stop1.stoparea == stop2.stoparea - or stop1.stoparea.transfer is not None - and stop1.stoparea.transfer == stop2.stoparea.transfer - ) - - d = [[0] * (len(stops2) + 1) for _ in range(len(stops1) + 1)] - d[0] = list(range(len(stops2) + 1)) - for i in range(len(stops1) + 1): - d[i][0] = i - - for i in range(1, len(stops1) + 1): - for j in range(1, len(stops2) + 1): - d[i][j] = ( - d[i - 1][j - 1] - if stops_match(stops1[i - 1], stops2[j - 1]) - else min((d[i - 1][j], d[i][j - 1], d[i - 1][j - 1])) + 1 - ) - - stops_missing_from_route1: list[RouteStop] = [] - stops_missing_from_route2: list[RouteStop] = [] - stops_that_dont_match: list[tuple[RouteStop, RouteStop]] = [] - - i = len(stops1) - j = len(stops2) - while not (i == 0 and j == 0): - action = None - if i > 0 and j > 0: - match = stops_match(stops1[i - 1], stops2[j - 1]) - if match and d[i - 1][j - 1] == d[i][j]: - action = "no" - elif not match and d[i - 1][j - 1] + 1 == d[i][j]: - action = "change" - if not action and i > 0 and d[i - 1][j] + 1 == d[i][j]: - action = "add_2" - if not action and j > 0 and d[i][j - 1] + 1 == d[i][j]: - action = "add_1" - - match action: - case "add_1": - stops_missing_from_route1.append(stops2[j - 1]) - j -= 1 - case "add_2": - stops_missing_from_route2.append(stops1[i - 1]) - i -= 1 - case _: - if action == "change": - stops_that_dont_match.append( - (stops1[i - 1], stops2[j - 1]) - ) - i -= 1 - j -= 1 - return ( - stops_missing_from_route1, - stops_missing_from_route2, - stops_that_dont_match, - ) - def validate_lines(self): self.found_light_lines = len( [x for x in self.routes.values() if x.mode != "subway"] @@ -2267,9 +2351,9 @@ class City: for rmaster in self.routes.values(): networks[str(rmaster.network)] += 1 if not self.overground: - self.check_return_routes(rmaster) + rmaster.check_return_routes() route_stations = set() - for sa in rmaster.stop_areas(): + for sa in rmaster.stopareas(): route_stations.add(sa.transfer or sa.id) unused_stations.discard(sa.station.id) self.found_stations += len(route_stations) diff --git a/tests/assets/route_masters.osm b/tests/assets/route_masters.osm index 0635a2b..1d466c8 100644 --- a/tests/assets/route_masters.osm +++ b/tests/assets/route_masters.osm @@ -194,7 +194,7 @@ - + @@ -202,8 +202,8 @@ - + @@ -272,12 +272,27 @@ - - - + + + + + - - + + + + + + + + + + + + + + + @@ -524,4 +539,20 @@ + + + + + + + + + + + + + + + + diff --git a/tests/assets/twin_routes.osm b/tests/assets/twin_routes.osm index e2e7f42..38cbe6c 100644 --- a/tests/assets/twin_routes.osm +++ b/tests/assets/twin_routes.osm @@ -288,10 +288,10 @@ - - - + + + diff --git a/tests/sample_data_for_error_messages.py b/tests/sample_data_for_error_messages.py index 2e20c73..0f5a434 100644 --- a/tests/sample_data_for_error_messages.py +++ b/tests/sample_data_for_error_messages.py @@ -326,9 +326,9 @@ metro_samples = [ "xml_file": "assets/route_masters.osm", "cities_info": [ { - "num_stations": (3 + 3 + 3 + 5 + 3 + 3 + 4) + "num_stations": (3 + 3 + 3 + 5 + 3 + 3 + 4 + 3) + (3 + 3 + 3 + 3 + 3 + 3 + 4), - "num_lines": 7 + 7, + "num_lines": 8 + 7, "num_interchanges": 0 + 1, }, ], @@ -350,6 +350,8 @@ metro_samples = [ 'Route does not have a return direction (relation 209, "5: 1-2-3")', # noqa: E501 'Route does not have a return direction (relation 210, "5: 2-1")', # noqa: E501 'Only one route in route_master. Please check if it needs a return route (relation 213, "C3: 1-2-3-8-1")', # noqa: E501 + 'Route does not have a return direction (relation 168, "C5: 1-3-5-1")', # noqa: E501 + 'Route does not have a return direction (relation 169, "C5: 3-5-1-3")', # noqa: E501 ], }, ] diff --git a/tests/test_route_master.py b/tests/test_route_master.py index 1bab617..22d2f8b 100644 --- a/tests/test_route_master.py +++ b/tests/test_route_master.py @@ -1,9 +1,97 @@ -from tests.util import TestCase - +from subway_structure import RouteMaster from tests.sample_data_for_twin_routes import metro_samples +from tests.util import TestCase class TestRouteMaster(TestCase): + def test__find_common_circular_subsequence(self) -> None: + cases = [ + { # the 1st sequence is empty + "sequence1": [], + "sequence2": [1, 2, 3, 4], + "answer": [], + }, + { # the 2nd sequence is empty + "sequence1": [1, 2, 3, 4], + "sequence2": [], + "answer": [], + }, + { # equal sequences + "sequence1": [1, 2, 3, 4], + "sequence2": [1, 2, 3, 4], + "answer": [1, 2, 3, 4], + }, + { # one sequence is a cyclic shift of the other + "sequence1": [1, 2, 3, 4], + "sequence2": [4, 1, 2, 3], + "answer": [1, 2, 3, 4], + }, + { # the 2nd sequence is a subsequence of the 1st; equal ends + "sequence1": [1, 2, 3, 4], + "sequence2": [1, 2, 4], + "answer": [1, 2, 4], + }, + { # the 1st sequence is a subsequence of the 2nd; equal ends + "sequence1": [1, 2, 4], + "sequence2": [1, 2, 3, 4], + "answer": [1, 2, 4], + }, + { # the 2nd sequence is an innter subsequence of the 1st + "sequence1": [1, 2, 3, 4], + "sequence2": [2, 3], + "answer": [2, 3], + }, + { # the 1st sequence is an inner subsequence of the 2nd + "sequence1": [2, 3], + "sequence2": [1, 2, 3, 4], + "answer": [2, 3], + }, + { # the 2nd sequence is a continuation of the 1st + "sequence1": [1, 2, 3, 4], + "sequence2": [4, 5, 6], + "answer": [4], + }, + { # the 1st sequence is a continuation of the 2nd + "sequence1": [4, 5, 6], + "sequence2": [1, 2, 3, 4], + "answer": [4], + }, + { # no common elements + "sequence1": [1, 2, 3, 4], + "sequence2": [5, 6, 7], + "answer": [], + }, + { # one sequence is the reversed other + "sequence1": [1, 2, 3, 4], + "sequence2": [4, 3, 2, 1], + "answer": [1, 2], + }, + { # the 2nd is a subsequence of shifted 1st + "sequence1": [1, 2, 3, 4], + "sequence2": [2, 4, 1], + "answer": [1, 2, 4], + }, + { # the 1st is a subsequence of shifted 2nd + "sequence1": [2, 4, 1], + "sequence2": [1, 2, 3, 4], + "answer": [2, 4, 1], + }, + { # mixed case: few common elements + "sequence1": [1, 2, 4], + "sequence2": [2, 3, 4], + "answer": [2, 4], + }, + ] + + for i, case in enumerate(cases): + with self.subTest(f"case#{i}"): + self.assertListEqual( + case["answer"], + RouteMaster.find_common_circular_subsequence( + case["sequence1"], case["sequence2"] + ), + ) + def _test_find_twin_routes_for_network(self, metro_sample: dict) -> None: cities, transfers = self.prepare_cities(metro_sample) city = cities[0]