Special searching for backward counterparts for circular routes

This commit is contained in:
Alexey Zakharenkov 2024-02-21 23:33:07 +03:00
parent f7087a0c25
commit 28f4c0d139
7 changed files with 407 additions and 201 deletions

1
.gitignore vendored
View file

@ -3,6 +3,7 @@ tmp_html/
html/
.idea
.DS_Store
.venv
*.log
*.json
*.geojson

View file

@ -88,7 +88,7 @@ def dump_yaml(city, f):
routes = []
for route in city:
stations = OrderedDict(
[(sa.transfer or sa.id, sa.name) for sa in route.stop_areas()]
[(sa.transfer or sa.id, sa.name) for sa in route.stopareas()]
)
rte = {
"type": route.mode,

View file

@ -5,6 +5,7 @@ import re
from collections import Counter, defaultdict
from collections.abc import Collection, Iterator
from itertools import chain, islice
from typing import TypeVar
from css_colours import normalize_colour
@ -49,6 +50,7 @@ START_END_TIMES_RE = re.compile(r".*?(\d{2}):(\d{2})-(\d{2}):(\d{2}).*")
IdT = str # Type of feature ids
TransferT = set[IdT] # A transfer is a set of StopArea IDs
TransfersT = Collection[TransferT]
T = TypeVar("T")
def get_start_end_times(opening_hours):
@ -626,7 +628,7 @@ class RouteStop:
class Route:
"""The longest route for a city with a unique ref."""
"""Corresponds to OSM "type=route" relation"""
@staticmethod
def is_route(el, modes):
@ -677,7 +679,12 @@ class Route:
yield stoparea
yielded_stopareas.add(stoparea)
def __init__(self, relation, city, master=None):
def __init__(
self,
relation: dict,
city: City,
master: dict | None = None,
) -> None:
assert Route.is_route(
relation, city.modes
), f"The relation does not seem to be a route: {relation}"
@ -1440,7 +1447,8 @@ class Route:
class RouteMaster:
def __init__(self, master=None):
def __init__(self, city: City, master: dict = None) -> None:
self.city = city
self.routes = []
self.best = None
self.id = el_id(master)
@ -1486,11 +1494,11 @@ class RouteMaster:
yield stoparea
yielded_stopareas.add(stoparea)
def add(self, route, city):
def add(self, route: Route) -> None:
if not self.network:
self.network = route.network
elif route.network and route.network != self.network:
city.error(
self.city.error(
'Route has different network ("{}") from master "{}"'.format(
route.network, self.network
),
@ -1500,7 +1508,7 @@ class RouteMaster:
if not self.colour:
self.colour = route.colour
elif route.colour and route.colour != self.colour:
city.notice(
self.city.notice(
'Route "{}" has different colour from master "{}"'.format(
route.colour, self.colour
),
@ -1510,7 +1518,7 @@ class RouteMaster:
if not self.infill:
self.infill = route.infill
elif route.infill and route.infill != self.infill:
city.notice(
self.city.notice(
(
f'Route "{route.infill}" has different infill colour '
f'from master "{self.infill}"'
@ -1521,7 +1529,7 @@ class RouteMaster:
if not self.ref:
self.ref = route.ref
elif route.ref != self.ref:
city.notice(
self.city.notice(
'Route "{}" has different ref from master "{}"'.format(
route.ref, self.ref
),
@ -1534,7 +1542,7 @@ class RouteMaster:
if not self.mode:
self.mode = route.mode
elif route.mode != self.mode:
city.error(
self.city.error(
"Incompatible PT mode: master has {} and route has {}".format(
self.mode, route.mode
),
@ -1568,8 +1576,8 @@ class RouteMaster:
return [route for route in self if len(route) >= 2]
def find_twin_routes(self) -> dict[Route, Route]:
"""Two routes are twins if they have the same end stations
and opposite directions, and the number of stations is
"""Two non-circular routes are twins if they have the same end
stations and opposite directions, and the number of stations is
the same or almost the same. We'll then find stops that are present
in one direction and is missing in another direction - to warn.
"""
@ -1581,8 +1589,6 @@ class RouteMaster:
continue # Difficult to calculate. TODO(?) in the future
if route in twin_routes:
continue
if len(route) < 2:
continue
route_transfer_ids = set(route.get_transfers_sequence())
ends = route.get_end_transfers()
@ -1617,15 +1623,253 @@ class RouteMaster:
return twin_routes
def stop_areas(self):
"""Returns a list of all stations on all route variants."""
seen_ids = set()
for route in self.routes:
for stop in route:
st = stop.stoparea
if st.id not in seen_ids:
seen_ids.add(st.id)
yield st
def check_return_routes(self) -> None:
"""Check if a route has return direction, and if twin routes
miss stations.
"""
meaningful_routes = self.get_meaningful_routes()
if len(meaningful_routes) == 0:
self.city.error(
f"An empty route master {self.id}. "
"Please set construction:route if it is under construction"
)
elif len(meaningful_routes) == 1:
log_function = (
self.city.error
if not self.best.is_circular
else self.city.notice
)
log_function(
"Only one route in route_master. "
"Please check if it needs a return route",
self.best.element,
)
else:
self.check_return_circular_routes()
self.check_return_noncircular_routes()
def check_return_noncircular_routes(self) -> None:
routes = [
route
for route in self.get_meaningful_routes()
if not route.is_circular
]
all_ends = {route.get_end_transfers(): route for route in routes}
for route in routes:
ends = route.get_end_transfers()
if ends[::-1] not in all_ends:
self.city.notice(
"Route does not have a return direction", route.element
)
twin_routes = self.find_twin_routes()
for route1, route2 in twin_routes.items():
if route1.id > route2.id:
continue # to process a pair of routes only once
# and to ensure the order of routes in the pair
self.alert_twin_routes_differ(route1, route2)
def check_return_circular_routes(self) -> None:
routes = {
route
for route in self.get_meaningful_routes()
if route.is_circular
}
routes_having_backward = set()
for route in routes:
if route in routes_having_backward:
continue
transfer_sequence1 = [
stop.stoparea.transfer or stop.stoparea.id for stop in route
]
transfer_sequence1.pop()
for potential_backward_route in routes - {route}:
transfer_sequence2 = [
stop.stoparea.transfer or stop.stoparea.id
for stop in potential_backward_route
][
-2::-1
] # truncate repeated first stop and reverse
common_subsequence = self.find_common_circular_subsequence(
transfer_sequence1, transfer_sequence2
)
if len(common_subsequence) >= 0.8 * min(
len(transfer_sequence1), len(transfer_sequence2)
):
routes_having_backward.add(route)
routes_having_backward.add(potential_backward_route)
break
for route in routes - routes_having_backward:
self.city.notice(
"Route does not have a return direction", route.element
)
@staticmethod
def find_common_circular_subsequence(
seq1: list[T], seq2: list[T]
) -> list[T]:
"""seq1 and seq2 are supposed to be stops of some circular routes.
Prerequisites to rely on the result:
- elements of each sequence are not repeated
- the order of stations is not violated.
Under these conditions we don't need LCS algorithm. Linear scan is
sufficient.
"""
i1, i2 = -1, -1
for i1, x in enumerate(seq1):
try:
i2 = seq2.index(x)
except ValueError:
continue
else:
# x is found both in seq1 and seq2
break
if i2 == -1:
return []
# Shift cyclically so that the common element takes the first position
# both in seq1 and seq2
seq1 = seq1[i1:] + seq1[:i1]
seq2 = seq2[i2:] + seq2[:i2]
common_subsequence = []
i2 = 0
for x in seq1:
try:
i2 = seq2.index(x, i2)
except ValueError:
continue
common_subsequence.append(x)
i2 += 1
if i2 >= len(seq2):
break
return common_subsequence
def alert_twin_routes_differ(self, route1: Route, route2: Route) -> None:
"""Arguments are that route1.id < route2.id"""
(
stops_missing_from_route1,
stops_missing_from_route2,
stops_that_dont_match,
) = self.calculate_twin_routes_diff(route1, route2)
for st in stops_missing_from_route1:
if (
not route1.are_tracks_complete()
or (
projected_point := project_on_line(
st.stoparea.center, route1.tracks
)["projected_point"]
)
is not None
and distance(st.stoparea.center, projected_point)
<= MAX_DISTANCE_STOP_TO_LINE
):
self.city.notice(
f"Stop {st.stoparea.station.name} {st.stop} is included "
f"in the {route2.id} but not included in {route1.id}",
route1.element,
)
for st in stops_missing_from_route2:
if (
not route2.are_tracks_complete()
or (
projected_point := project_on_line(
st.stoparea.center, route2.tracks
)["projected_point"]
)
is not None
and distance(st.stoparea.center, projected_point)
<= MAX_DISTANCE_STOP_TO_LINE
):
self.city.notice(
f"Stop {st.stoparea.station.name} {st.stop} is included "
f"in the {route1.id} but not included in {route2.id}",
route2.element,
)
for st1, st2 in stops_that_dont_match:
if (
st1.stoparea.station == st2.stoparea.station
or distance(st1.stop, st2.stop) < SUGGEST_TRANSFER_MIN_DISTANCE
):
self.city.notice(
"Should there be one stoparea or a transfer between "
f"{st1.stoparea.station.name} {st1.stop} and "
f"{st2.stoparea.station.name} {st2.stop}?",
route1.element,
)
@staticmethod
def calculate_twin_routes_diff(route1: Route, route2: Route) -> tuple:
"""WagnerFischer algorithm for stops diff in two twin routes."""
stops1 = route1.stops
stops2 = route2.stops[::-1]
def stops_match(stop1: RouteStop, stop2: RouteStop) -> bool:
return (
stop1.stoparea == stop2.stoparea
or stop1.stoparea.transfer is not None
and stop1.stoparea.transfer == stop2.stoparea.transfer
)
d = [[0] * (len(stops2) + 1) for _ in range(len(stops1) + 1)]
d[0] = list(range(len(stops2) + 1))
for i in range(len(stops1) + 1):
d[i][0] = i
for i in range(1, len(stops1) + 1):
for j in range(1, len(stops2) + 1):
d[i][j] = (
d[i - 1][j - 1]
if stops_match(stops1[i - 1], stops2[j - 1])
else min((d[i - 1][j], d[i][j - 1], d[i - 1][j - 1])) + 1
)
stops_missing_from_route1: list[RouteStop] = []
stops_missing_from_route2: list[RouteStop] = []
stops_that_dont_match: list[tuple[RouteStop, RouteStop]] = []
i = len(stops1)
j = len(stops2)
while not (i == 0 and j == 0):
action = None
if i > 0 and j > 0:
match = stops_match(stops1[i - 1], stops2[j - 1])
if match and d[i - 1][j - 1] == d[i][j]:
action = "no"
elif not match and d[i - 1][j - 1] + 1 == d[i][j]:
action = "change"
if not action and i > 0 and d[i - 1][j] + 1 == d[i][j]:
action = "add_2"
if not action and j > 0 and d[i][j - 1] + 1 == d[i][j]:
action = "add_1"
match action:
case "add_1":
stops_missing_from_route1.append(stops2[j - 1])
j -= 1
case "add_2":
stops_missing_from_route2.append(stops1[i - 1])
i -= 1
case _:
if action == "change":
stops_that_dont_match.append(
(stops1[i - 1], stops2[j - 1])
)
i -= 1
j -= 1
return (
stops_missing_from_route1,
stops_missing_from_route2,
stops_that_dont_match,
)
def __len__(self):
return len(self.routes)
@ -1923,8 +2167,8 @@ class City:
k = el_id(master) if master else route.ref
if k not in self.routes:
self.routes[k] = RouteMaster(master)
self.routes[k].add(route, self)
self.routes[k] = RouteMaster(self, master)
self.routes[k].add(route)
# Sometimes adding a route to a newly initialized RouteMaster
# can fail
@ -2055,166 +2299,6 @@ class City:
f"relations: {format_elid_list(not_in_sa)}"
)
def check_return_routes(self, rmaster: RouteMaster) -> None:
"""Check if a route has return direction, and if twin routes
miss stations.
"""
meaningful_routes = rmaster.get_meaningful_routes()
if len(meaningful_routes) == 0:
self.error(
f"An empty route master {rmaster.id}. "
"Please set construction:route if it is under construction"
)
elif len(meaningful_routes) == 1:
log_function = (
self.error if not rmaster.best.is_circular else self.notice
)
log_function(
"Only one route in route_master. "
"Please check if it needs a return route",
rmaster.best.element,
)
else:
all_ends = {
route.get_end_transfers(): route for route in meaningful_routes
}
for route in meaningful_routes:
ends = route.get_end_transfers()
if ends[::-1] not in all_ends:
self.notice(
"Route does not have a return direction", route.element
)
twin_routes = rmaster.find_twin_routes()
for route1, route2 in twin_routes.items():
if route1.id > route2.id:
continue # to process a pair of routes only once
# and to ensure the order of routes in the pair
self.alert_twin_routes_differ(route1, route2)
def alert_twin_routes_differ(self, route1: Route, route2: Route) -> None:
"""Arguments are that route1.id < route2.id"""
(
stops_missing_from_route1,
stops_missing_from_route2,
stops_that_dont_match,
) = self.calculate_twin_routes_diff(route1, route2)
for st in stops_missing_from_route1:
if (
not route1.are_tracks_complete()
or (
projected_point := project_on_line(
st.stoparea.center, route1.tracks
)["projected_point"]
)
is not None
and distance(st.stoparea.center, projected_point)
<= MAX_DISTANCE_STOP_TO_LINE
):
self.notice(
f"Stop {st.stoparea.station.name} {st.stop} is included "
f"in the {route2.id} but not included in {route1.id}",
route1.element,
)
for st in stops_missing_from_route2:
if (
not route2.are_tracks_complete()
or (
projected_point := project_on_line(
st.stoparea.center, route2.tracks
)["projected_point"]
)
is not None
and distance(st.stoparea.center, projected_point)
<= MAX_DISTANCE_STOP_TO_LINE
):
self.notice(
f"Stop {st.stoparea.station.name} {st.stop} is included "
f"in the {route1.id} but not included in {route2.id}",
route2.element,
)
for st1, st2 in stops_that_dont_match:
if (
st1.stoparea.station == st2.stoparea.station
or distance(st1.stop, st2.stop) < SUGGEST_TRANSFER_MIN_DISTANCE
):
self.notice(
"Should there be one stoparea or a transfer between "
f"{st1.stoparea.station.name} {st1.stop} and "
f"{st2.stoparea.station.name} {st2.stop}?",
route1.element,
)
@staticmethod
def calculate_twin_routes_diff(route1: Route, route2: Route) -> tuple:
"""WagnerFischer algorithm for stops diff in two twin routes."""
stops1 = route1.stops
stops2 = route2.stops[::-1]
def stops_match(stop1: RouteStop, stop2: RouteStop) -> bool:
return (
stop1.stoparea == stop2.stoparea
or stop1.stoparea.transfer is not None
and stop1.stoparea.transfer == stop2.stoparea.transfer
)
d = [[0] * (len(stops2) + 1) for _ in range(len(stops1) + 1)]
d[0] = list(range(len(stops2) + 1))
for i in range(len(stops1) + 1):
d[i][0] = i
for i in range(1, len(stops1) + 1):
for j in range(1, len(stops2) + 1):
d[i][j] = (
d[i - 1][j - 1]
if stops_match(stops1[i - 1], stops2[j - 1])
else min((d[i - 1][j], d[i][j - 1], d[i - 1][j - 1])) + 1
)
stops_missing_from_route1: list[RouteStop] = []
stops_missing_from_route2: list[RouteStop] = []
stops_that_dont_match: list[tuple[RouteStop, RouteStop]] = []
i = len(stops1)
j = len(stops2)
while not (i == 0 and j == 0):
action = None
if i > 0 and j > 0:
match = stops_match(stops1[i - 1], stops2[j - 1])
if match and d[i - 1][j - 1] == d[i][j]:
action = "no"
elif not match and d[i - 1][j - 1] + 1 == d[i][j]:
action = "change"
if not action and i > 0 and d[i - 1][j] + 1 == d[i][j]:
action = "add_2"
if not action and j > 0 and d[i][j - 1] + 1 == d[i][j]:
action = "add_1"
match action:
case "add_1":
stops_missing_from_route1.append(stops2[j - 1])
j -= 1
case "add_2":
stops_missing_from_route2.append(stops1[i - 1])
i -= 1
case _:
if action == "change":
stops_that_dont_match.append(
(stops1[i - 1], stops2[j - 1])
)
i -= 1
j -= 1
return (
stops_missing_from_route1,
stops_missing_from_route2,
stops_that_dont_match,
)
def validate_lines(self):
self.found_light_lines = len(
[x for x in self.routes.values() if x.mode != "subway"]
@ -2267,9 +2351,9 @@ class City:
for rmaster in self.routes.values():
networks[str(rmaster.network)] += 1
if not self.overground:
self.check_return_routes(rmaster)
rmaster.check_return_routes()
route_stations = set()
for sa in rmaster.stop_areas():
for sa in rmaster.stopareas():
route_stations.add(sa.transfer or sa.id)
unused_stations.discard(sa.station.id)
self.found_stations += len(route_stations)

View file

@ -194,7 +194,7 @@
<member type='node' ref='5' role='' />
<member type='node' ref='1' role='' />
<tag k='name' v='C: 1-3-5-1' />
<tag k='note' v='Circular route without backward' />
<tag k='note' v='Circular route without backward and without master' />
<tag k='ref' v='C' />
<tag k='colour' v='gray' />
<tag k='route' v='subway' />
@ -202,8 +202,8 @@
</relation>
<relation id='160' visible='true' version='1'>
<member type='node' ref='1' role='' />
<member type='node' ref='3' role='' />
<member type='node' ref='5' role='' />
<member type='node' ref='3' role='' />
<member type='node' ref='1' role='' />
<tag k='name' v='C2: 1-5-3-1' />
<tag k='note' v='Circular route with backward' />
@ -272,12 +272,27 @@
<tag k='route' v='subway' />
<tag k='type' v='route' />
</relation>
<relation id='167' visible='true' version='1'>
<tag k='note' v='Empty route_master, so that it cannot be assigned to any city' />
<tag k='ref' v='06' />
<relation id='168' visible='true' version='1'>
<member type='node' ref='1' role='' />
<member type='node' ref='3' role='' />
<member type='node' ref='5' role='' />
<member type='node' ref='1' role='' />
<tag k='colour' v='gray' />
<tag k='route_master' v='subway' />
<tag k='type' v='route_master' />
<tag k='name' v='C5: 1-3-5-1' />
<tag k='ref' v='C5' />
<tag k='route' v='subway' />
<tag k='type' v='route' />
</relation>
<relation id='169' visible='true' version='1'>
<member type='node' ref='3' role='' />
<member type='node' ref='5' role='' />
<member type='node' ref='1' role='' />
<member type='node' ref='3' role='' />
<tag k='colour' v='gray' />
<tag k='name' v='C5: 3-5-1-3' />
<tag k='ref' v='C5' />
<tag k='route' v='subway' />
<tag k='type' v='route' />
</relation>
<relation id='201' visible='true' version='1'>
<member type='node' ref='21' role='' />
@ -524,4 +539,20 @@
<tag k='route_master' v='subway' />
<tag k='type' v='route_master' />
</relation>
<relation id='10025' visible='true' version='1'>
<member type='relation' ref='168' role='' />
<member type='relation' ref='169' role='' />
<tag k='colour' v='gray' />
<tag k='note' v='Two cirlucar routes with the same direction. Shloud generate notice &apos;no return direction&apos;' />
<tag k='ref' v='C5' />
<tag k='route_master' v='subway' />
<tag k='type' v='route_master' />
</relation>
<relation id='10026' visible='true' version='1'>
<tag k='colour' v='gray' />
<tag k='note' v='Empty route_master, so that it cannot be assigned to any city' />
<tag k='ref' v='06' />
<tag k='route_master' v='subway' />
<tag k='type' v='route_master' />
</relation>
</osm>

View file

@ -288,10 +288,10 @@
</relation>
<relation id='160' visible='true' version='1'>
<member type='node' ref='1' role='' />
<member type='node' ref='2' role='' />
<member type='node' ref='3' role='' />
<member type='node' ref='4' role='' />
<member type='node' ref='5' role='' />
<member type='node' ref='4' role='' />
<member type='node' ref='3' role='' />
<member type='node' ref='2' role='' />
<member type='node' ref='1' role='' />
<tag k='name' v='C2: 1-5-4-3-2-1' />
<tag k='note' v='Circular route with backward' />

View file

@ -326,9 +326,9 @@ metro_samples = [
"xml_file": "assets/route_masters.osm",
"cities_info": [
{
"num_stations": (3 + 3 + 3 + 5 + 3 + 3 + 4)
"num_stations": (3 + 3 + 3 + 5 + 3 + 3 + 4 + 3)
+ (3 + 3 + 3 + 3 + 3 + 3 + 4),
"num_lines": 7 + 7,
"num_lines": 8 + 7,
"num_interchanges": 0 + 1,
},
],
@ -350,6 +350,8 @@ metro_samples = [
'Route does not have a return direction (relation 209, "5: 1-2-3")', # noqa: E501
'Route does not have a return direction (relation 210, "5: 2-1")', # noqa: E501
'Only one route in route_master. Please check if it needs a return route (relation 213, "C3: 1-2-3-8-1")', # noqa: E501
'Route does not have a return direction (relation 168, "C5: 1-3-5-1")', # noqa: E501
'Route does not have a return direction (relation 169, "C5: 3-5-1-3")', # noqa: E501
],
},
]

View file

@ -1,9 +1,97 @@
from tests.util import TestCase
from subway_structure import RouteMaster
from tests.sample_data_for_twin_routes import metro_samples
from tests.util import TestCase
class TestRouteMaster(TestCase):
def test__find_common_circular_subsequence(self) -> None:
cases = [
{ # the 1st sequence is empty
"sequence1": [],
"sequence2": [1, 2, 3, 4],
"answer": [],
},
{ # the 2nd sequence is empty
"sequence1": [1, 2, 3, 4],
"sequence2": [],
"answer": [],
},
{ # equal sequences
"sequence1": [1, 2, 3, 4],
"sequence2": [1, 2, 3, 4],
"answer": [1, 2, 3, 4],
},
{ # one sequence is a cyclic shift of the other
"sequence1": [1, 2, 3, 4],
"sequence2": [4, 1, 2, 3],
"answer": [1, 2, 3, 4],
},
{ # the 2nd sequence is a subsequence of the 1st; equal ends
"sequence1": [1, 2, 3, 4],
"sequence2": [1, 2, 4],
"answer": [1, 2, 4],
},
{ # the 1st sequence is a subsequence of the 2nd; equal ends
"sequence1": [1, 2, 4],
"sequence2": [1, 2, 3, 4],
"answer": [1, 2, 4],
},
{ # the 2nd sequence is an innter subsequence of the 1st
"sequence1": [1, 2, 3, 4],
"sequence2": [2, 3],
"answer": [2, 3],
},
{ # the 1st sequence is an inner subsequence of the 2nd
"sequence1": [2, 3],
"sequence2": [1, 2, 3, 4],
"answer": [2, 3],
},
{ # the 2nd sequence is a continuation of the 1st
"sequence1": [1, 2, 3, 4],
"sequence2": [4, 5, 6],
"answer": [4],
},
{ # the 1st sequence is a continuation of the 2nd
"sequence1": [4, 5, 6],
"sequence2": [1, 2, 3, 4],
"answer": [4],
},
{ # no common elements
"sequence1": [1, 2, 3, 4],
"sequence2": [5, 6, 7],
"answer": [],
},
{ # one sequence is the reversed other
"sequence1": [1, 2, 3, 4],
"sequence2": [4, 3, 2, 1],
"answer": [1, 2],
},
{ # the 2nd is a subsequence of shifted 1st
"sequence1": [1, 2, 3, 4],
"sequence2": [2, 4, 1],
"answer": [1, 2, 4],
},
{ # the 1st is a subsequence of shifted 2nd
"sequence1": [2, 4, 1],
"sequence2": [1, 2, 3, 4],
"answer": [2, 4, 1],
},
{ # mixed case: few common elements
"sequence1": [1, 2, 4],
"sequence2": [2, 3, 4],
"answer": [2, 4],
},
]
for i, case in enumerate(cases):
with self.subTest(f"case#{i}"):
self.assertListEqual(
case["answer"],
RouteMaster.find_common_circular_subsequence(
case["sequence1"], case["sequence2"]
),
)
def _test_find_twin_routes_for_network(self, metro_sample: dict) -> None:
cities, transfers = self.prepare_cities(metro_sample)
city = cities[0]